Compare commits

...

10 Commits

65 changed files with 1743 additions and 934 deletions

View File

@ -2,4 +2,7 @@ DATABASE_HOST=
DATABASE_PORT=
DATABASE_NAME=
DATABASE_USER=
DATABASE_PASSWORD=
DATABASE_PASSWORD=
TRANSACTION_LEVEL=
VERBOSITY=

Binary file not shown.

5
src/assets/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from .asset_manager import *
__all__ = [
*asset_manager.__all__
]

View File

@ -0,0 +1,13 @@
import os
ASSETS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "assets")
def get_asset(name: str) -> str:
absolute_path = os.path.join(ASSETS_DIR, name)
if not os.path.exists(absolute_path):
raise FileNotFoundError(f"Asset not found: {absolute_path}")
return os.path.abspath(absolute_path)
__all__ = ["get_asset"]

View File

@ -6,9 +6,8 @@
<xs:element name="book" maxOccurs="unbounded">
<xs:complexType>
<xs:sequence>
<xs:element name="title"> <!-- Book title-->
<xs:element name="title"> <!-- Book title -->
<xs:simpleType>
<!-- Allow for a string 2-100 characters long -->
<xs:restriction base="xs:string">
<xs:minLength value="2" />
<xs:maxLength value="100" />
@ -21,14 +20,14 @@
<xs:element name="first_name">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:maxLength value="50"/>
<xs:maxLength value="50" />
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="last_name">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:maxLength value="50"/>
<xs:maxLength value="50" />
</xs:restriction>
</xs:simpleType>
</xs:element>
@ -50,6 +49,16 @@
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="price"> <!-- Price -->
<xs:simpleType>
<xs:restriction base="xs:decimal">
<xs:totalDigits value="7" /> <!-- Max 99999.99 -->
<xs:fractionDigits value="2" />
<xs:minInclusive value="0.00" />
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="is_damaged" type="xs:boolean" /> <!-- Is damaged -->
<xs:element name="categories"> <!-- Categories list -->
<xs:complexType>
<xs:sequence>
@ -64,5 +73,4 @@
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
</xs:schema>

View File

@ -1,9 +1,17 @@
from .manager import *
from .book import *
from .book_category import *
from .book_category_statistics import *
from .book_category_statistics_overview import *
from .member import *
from .book_overview import *
__all__ = [
*manager.__all__,
*book.__all__,
*book_category.__all__,
*book_category_statistics.__all__,
*book_category_statistics_overview.__all__,
*book_overview.__all__,
*member.__all__,
]

32
src/database/author.py Normal file
View File

@ -0,0 +1,32 @@
import logging
from typing import Dict
from sqlalchemy.orm import Session
from models import Author
logger = logging.getLogger(__name__)
def get_or_create_author(session: Session, author_data: Dict[str, str]) -> Author:
"""
Checks if an author exists in the database, creates one if not.
:param session: SQLAlchemy session object.
:param author_data: Dictionary containing author's first and last name.
:return: An Author instance (either existing or newly created).
"""
existing_author = session.query(Author).filter_by(
first_name=author_data["first_name"],
last_name=author_data["last_name"]
).one_or_none()
if existing_author is not None:
logger.debug(f"Author {author_data['first_name']} {author_data['last_name']} already exists. Reusing.")
return existing_author
logger.debug(f"Creating new author: {author_data['first_name']} {author_data['last_name']}")
author = Author(first_name=author_data["first_name"], last_name=author_data["last_name"])
session.add(author)
return author
__all__ = ["get_or_create_author"]

View File

@ -1,66 +1,132 @@
from typing import Dict, List
import logging
from sqlalchemy.exc import IntegrityError, SQLAlchemyError, DatabaseError as SqlAlchemyDatabaseError
from sqlalchemy.orm import joinedload
from sqlalchemy import delete
from utils.errors.database import DatabaseError, DuplicateEntryError, DatabaseConnectionError
from models import Book
from database.manager import DatabaseManager
from .author import get_or_create_author
from .book_category import get_or_create_categories
from .book_category_statistics import update_category_statistics
import logging
logger = logging.getLogger(__name__)
def fetch_all():
def fetch_all_books() -> List[Book]:
with DatabaseManager.get_session() as session:
try:
return session.query(Book).all()
return session.query(Book) \
.options(
joinedload(Book.author),
joinedload(Book.categories)
) \
.all()
except SqlAlchemyDatabaseError as e:
logger.critical("Connection with database interrupted")
raise DatabaseConnectionError(
"Connection with database interrupted") from e
raise DatabaseConnectionError("Connection with database interrupted") from e
except SQLAlchemyError as e:
logger.error(f"An error occured when fetching all books: {e}")
raise DatabaseError(
"An error occured when fetching all books") from e
logger.error(f"An error occurred when fetching all books: {e}")
raise DatabaseError("An error occurred when fetching all books") from e
def create_book(book: Dict[str, object], skip_existing: bool = True) -> None:
create_books([book], skip_existing)
def create_new_book(book: Book):
pass
def update_book(book: Book):
session = DatabaseManager.get_session()
def create_books(books: List[Dict[str, object]], skip_existing: bool = True) -> None:
try:
with session:
logger.debug(f"Updating book {book.title}")
existing_book = session.query(Book).get(book.id)
with DatabaseManager.get_session() as session:
for book in books:
logger.debug(f"Attempting to create a new book: {book['title']}")
if not existing_book:
logger.warning(f"Book with id {book.id} not found")
raise DatabaseError("Book not found in the database")
existing_book = session.query(Book).filter_by(isbn=book["isbn"]).first()
existing_book.title = book.title
existing_book.description = book.description
existing_book.year_published = book.year_published
existing_book.isbn = book.isbn
if existing_book:
if skip_existing:
logger.warning(f"Book with ISBN {book['isbn']} already exists. Skipping.")
continue
else:
logger.error(f"Book with ISBN {book['isbn']} already exists.")
raise DuplicateEntryError(f"Book with ISBN {book['isbn']} already exists.")
author = get_or_create_author(session, book["author"])
categories = get_or_create_categories(session, book["categories"])
new_book = Book(
title=book["title"],
description=book["description"],
year_published=book["year_published"],
isbn=book["isbn"],
price=book["price"],
is_damaged=book["is_damaged"],
author=author,
categories=categories
)
session.add(new_book)
update_category_statistics(session)
session.commit()
logger.info("Book successfully updated")
except IntegrityError as e:
logger.warning("Data already exists")
session.rollback()
raise DuplicateEntryError(
"Data already exists in the database") from e
raise DuplicateEntryError("Data already exists in the database") from e
except SqlAlchemyDatabaseError as e:
session.rollback()
logger.critical("Connection with database interrupted")
raise DatabaseConnectionError(
"Connection with database interrupted") from e
raise DatabaseConnectionError("Connection with database interrupted") from e
except SQLAlchemyError as e:
logger.error(f"An error occured when saving book: {e}")
session.rollback()
raise DatabaseError(
"An error occured when updating the book") from e
logger.error(f"An error occurred when creating the book: {e}")
raise DatabaseError("An error occurred when creating the book") from e
def update_book(book: Dict[str, object]) -> None:
try:
with DatabaseManager.get_session() as session:
logger.debug(f"Updating book {book['title']}")
__all__ = ["create_new_book", "update_book"]
existing_book = session.query(Book).filter_by(isbn=book["isbn"]).first()
if not existing_book:
logger.warning(f"Book with ISBN {book['isbn']} not found")
raise DatabaseError("Book not found in the database")
author = get_or_create_author(session, book["author"])
categories = get_or_create_categories(session, book["categories"])
session.commit()
existing_book.title = book["title"]
existing_book.description = book["description"]
existing_book.year_published = book["year_published"]
existing_book.isbn = book["isbn"]
existing_book.price = book["price"]
existing_book.is_damaged = book["is_damaged"]
existing_book.status = book["status"]
existing_book.author = author
existing_book.categories = categories
update_category_statistics(session, ignore_config=True)
session.commit()
logger.info(f"{book['title']} successfully updated.")
except IntegrityError as e:
logger.warning("Data already exists")
raise DuplicateEntryError("Data already exists in the database") from e
except SqlAlchemyDatabaseError as e:
logger.critical("Connection with database interrupted")
raise DatabaseConnectionError("Connection with database interrupted") from e
except SQLAlchemyError as e:
logger.error(f"An error occurred when updating the book: {e}")
raise DatabaseError("An error occurred when updating the book") from e
def delete_book(book_id: int) -> None:
try:
with DatabaseManager.get_session() as session:
logger.debug(f"Deleting book id {book_id}")
stmt = delete(Book).where(Book.id == book_id)
session.execute(stmt)
update_category_statistics(session, ignore_config=True)
session.commit()
logger.info(f"Successfully deleted book with id {book_id}")
except SqlAlchemyDatabaseError as e:
logger.critical("Connection with database interrupted")
raise DatabaseConnectionError("Connection with database interrupted") from e
except SQLAlchemyError as e:
logger.error(f"An error occurred when updating the book: {e}")
raise DatabaseError("An error occurred when updating the book") from e
__all__ = ["create_book", "create_books", "update_book", "fetch_all_books", "delete_book"]

View File

@ -0,0 +1,48 @@
from typing import Dict, List, Optional
import logging
from models import BookCategory
from database.manager import DatabaseManager
from sqlalchemy.orm import Session
from sqlalchemy import func
logger = logging.getLogger(__name__)
def get_or_create_categories(session, category_names: List[str]) -> List[BookCategory]:
"""
Checks if categories exist in the database, creates ones that don't.
:param session: SQLAlchemy session object.
:param category_names: List of category names.
:return: List of BookCategory instances (existing or newly created).
"""
processed_categories = {} # Cache for already processed categories
filtered_categories = []
for category_name in category_names:
if category_name in processed_categories:
filtered_categories.append(processed_categories[category_name])
continue
existing_category = session.query(BookCategory).filter_by(name=category_name).one_or_none()
if existing_category is not None:
logger.debug(f"Category {category_name} already exists. Reusing.")
processed_categories[category_name] = existing_category
filtered_categories.append(existing_category)
else:
logger.debug(f"Adding new category: {category_name}")
new_category = BookCategory(name=category_name)
session.add(new_category)
processed_categories[category_name] = new_category
filtered_categories.append(new_category)
return filtered_categories
def get_total_count(session: Session) -> int:
return session.query(func.count(BookCategory.id)).scalar()
__all__ = ["get_total_count", "get_or_create_categories"]

View File

@ -0,0 +1,56 @@
from sqlalchemy import func, insert
from sqlalchemy.orm import Session
from models import BookCategory, BookCategoryStatistics, BookCategoryLink
import logging
logger = logging.getLogger(__name__)
def update_category_statistics(session: Session, ignore_config: bool = False) -> None:
"""
Updates category statistics by calculating the count of books in each category.
:param session: SQLAlchemy session object.
"""
# Fetch book counts per category using a join between book_category and book_category_link
category_counts = (
session.query(
BookCategory.id.label('book_category_id'),
func.count(BookCategoryLink.book_id).label('book_count')
)
.join(BookCategoryLink, BookCategoryLink.book_category_id == BookCategory.id, isouter=True)
.group_by(BookCategory.id)
.all()
)
# Iterate over the results and update or insert the category statistics
for category_id, book_count in category_counts:
# Try to get the existing statistics or create a new one if it doesn't exist
existing_statistics = (
session.query(BookCategoryStatistics)
.filter(BookCategoryStatistics.book_category_id == category_id)
.one_or_none()
)
if existing_statistics:
# If statistics exist, update the count
existing_statistics.book_count = book_count
logger.debug(f"Updated category {category_id} with count {book_count}")
else:
# If statistics don't exist, create a new one
new_statistics = BookCategoryStatistics(
book_category_id=category_id,
book_count=book_count
)
session.add(new_statistics)
logger.debug(f"Inserted new statistics for category {category_id} with count {book_count}")
try:
# Commit the transaction
session.commit()
logger.debug("Category statistics updated successfully")
except Exception as e:
# In case of error, rollback the transaction
logger.error(f"An error occurred while updating category statistics: {e}")
session.rollback()
__all__ = ["update_category_statistics"]

View File

@ -0,0 +1,55 @@
from typing import List, Tuple
import time
import logging
from sqlalchemy.exc import IntegrityError, SQLAlchemyError, DatabaseError as SqlAlchemyDatabaseError
from utils.errors.database import DatabaseError, DuplicateEntryError, DatabaseConnectionError
from models import BookCategoryStatisticsOverview
from database.manager import DatabaseManager
from database import get_total_count
from utils.config import UserConfig
logger = logging.getLogger(__name__)
def fetch_all_book_category_statistics_overviews() -> List[BookCategoryStatisticsOverview]:
with DatabaseManager.get_session() as session:
try:
return session.query(BookCategoryStatisticsOverview).all()
except SqlAlchemyDatabaseError as e:
logger.critical("Connection with database interrupted")
raise DatabaseConnectionError("Connection with database interrupted") from e
except SQLAlchemyError as e:
logger.error(f"An error occurred when fetching all category overviews: {e}")
raise DatabaseError("An error occurred when fetching all category overviews") from e
def fetch_all_book_category_statistics_overviews_with_count() -> Tuple[List[BookCategoryStatisticsOverview], int]:
with DatabaseManager.get_session() as session:
try:
category_list = session.query(BookCategoryStatisticsOverview).all()
user_config = UserConfig()
if user_config.simulate_slowdown:
logger.debug("Simulating slowdown after fetching statistics for 10 seconds")
time.sleep(10)
else:
logger.debug("Performing category statistics update normally")
count = get_total_count(session)
return (category_list, count)
except SqlAlchemyDatabaseError as e:
logger.critical("Connection with database interrupted")
raise DatabaseConnectionError("Connection with database interrupted") from e
except SQLAlchemyError as e:
logger.error(f"An error occurred when fetching all category overviews: {e}")
raise DatabaseError("An error occurred when fetching all category overviews") from e
__all__ = ["fetch_all_book_category_statistics_overviews", "fetch_all_book_category_statistics_overviews_with_count"]

View File

@ -0,0 +1,25 @@
from typing import Dict, List
import logging
from sqlalchemy.exc import IntegrityError, SQLAlchemyError, DatabaseError as SqlAlchemyDatabaseError
from utils.errors.database import DatabaseError, DuplicateEntryError, DatabaseConnectionError
from models import BooksOverview
from database.manager import DatabaseManager
logger = logging.getLogger(__name__)
def fetch_all_book_overviews() -> List[BooksOverview]:
with DatabaseManager.get_session() as session:
try:
return session.query(BooksOverview).all()
except SqlAlchemyDatabaseError as e:
logger.critical("Connection with database interrupted")
raise DatabaseConnectionError("Connection with database interrupted") from e
except SQLAlchemyError as e:
logger.error(f"An error occurred when fetching all books: {e}")
raise DatabaseError("An error occurred when fetching all books") from e
__all__ = ["fetch_all_book_overviews"]

View File

@ -5,7 +5,7 @@ from sqlalchemy import create_engine, text
from sqlalchemy.exc import DatabaseError
from utils.config import DatabaseConfig
from utils.config import DatabaseConfig, UserConfig
from utils.errors.database import DatabaseConnectionError
@ -22,6 +22,7 @@ class DatabaseManager():
self.logger = logging.getLogger(__name__)
self.logger.info("Reading database config")
database_config = DatabaseConfig()
user_config = UserConfig()
self.engine = create_engine('mysql+mysqlconnector://%s:%s@%s:%s/%s' % (
database_config.user,
database_config.password,
@ -29,27 +30,34 @@ class DatabaseManager():
database_config.port,
database_config.name),
pool_pre_ping=True,
echo=True)
if self.test_connection():
self.Session = sessionmaker(bind=self.engine)
isolation_level=user_config.transaction_level.value)
self.test_connection()
self.Session = sessionmaker(bind=self.engine)
def cleanup(self) -> None:
self.logger.debug("Closing connection")
self.engine.dispose()
def test_connection(self) -> bool:
def test_connection(self):
self.logger.debug("Testing database connection")
try:
with self.engine.connect() as connection:
connection.execute(text("select 1"))
self.logger.debug("Database connection successful")
return True
except DatabaseError as e:
self.logger.critical(f"Database connection failed: {e}")
raise DatabaseConnectionError("Database connection failed") from e
@classmethod
def get_session(cls) -> Session:
return DatabaseManager._instance.Session()
user_config = UserConfig()
# Get the transaction level as a string (e.g., "READ COMMITTED", "SERIALIZABLE")
isolation_level = user_config.transaction_level.value
# Create a session with the appropriate transaction isolation level
session = cls._instance.Session()
session.connection(execution_options={"isolation_level": isolation_level})
return session
__all__ = ["DatabaseManager"]

View File

@ -1,6 +1,8 @@
import logging
from typing import List, Dict
from sqlalchemy.exc import IntegrityError, SQLAlchemyError, DatabaseError
from sqlalchemy.exc import IntegrityError, SQLAlchemyError, DatabaseError as SqlAlchemyDatabaseError
from sqlalchemy import delete
from utils.errors.database import DatabaseError, DuplicateEntryError, DatabaseConnectionError
from models import Member
@ -8,26 +10,113 @@ from database.manager import DatabaseManager
logger = logging.getLogger(__name__)
def fetch_all_members() -> List[Member]:
"""
Fetches all members from the database.
def create_new_member(new_member: Member):
:return: A list of all members in the database.
:raises DatabaseConnectionError: If the connection to the database is interrupted.
:raises DatabaseError: If any other error occurs while fetching members.
"""
with DatabaseManager.get_session() as session:
try:
return session.query(Member).all()
except SqlAlchemyDatabaseError as e:
logger.critical("Connection with database interrupted")
raise DatabaseConnectionError("Connection with database interrupted") from e
except SQLAlchemyError as e:
logger.error(f"An error occurred when fetching all members: {e}")
raise DatabaseError("An error occurred when fetching all members") from e
def create_member(new_member: Dict[str, str]):
create_members([new_member])
def create_members(members: List[Dict[str, str]]):
try:
with DatabaseManager.get_session() as session:
session.add(new_member)
for member_dict in members:
member = Member(
first_name=member_dict["first_name"],
last_name=member_dict["last_name"],
email=member_dict["email"],
phone=member_dict["phone_number"]
)
session.add(member)
session.commit()
except IntegrityError as e:
logger.warning("Data already exists")
session.rollback()
raise DuplicateEntryError("Data already exists in the database") from e
if "email" in str(e.orig):
logger.warning("Email is already in use")
raise DuplicateEntryError("Email", "Email is already in use") from e
elif "phone" in str(e.orig):
logger.warning("Phone number is already in use")
raise DuplicateEntryError("Phone number", "Phone number is already in use") from e
else:
logger.error("Member exists already in the database")
raise DatabaseError("Member exists already") from e
except DatabaseError as e:
session.rollback()
logger.critical("Connection with database interrupted")
raise DatabaseConnectionError(
"Connection with database interrupted") from e
raise DatabaseConnectionError("Connection with database interrupted") from e
except SQLAlchemyError as e:
logger.error(f"An error occured when saving member: {e}")
session.rollback()
raise DatabaseError(
"An error occured when creating a new member") from e
logger.error(f"An error occurred when saving member: {e}")
raise DatabaseError("An error occurred when creating a new member") from e
def update_member(member: Dict[str, str]):
try:
with DatabaseManager.get_session() as session:
logger.debug(f"Editing member {member['first_name']} {member['last_name']}")
existing_member = session.query(Member).get(member["id"])
if not existing_member:
logger.warning(f"Member with ID {member['id']} not found")
raise DatabaseError("Member not found in database")
existing_member.first_name = member["first_name"]
existing_member.last_name = member["last_name"]
existing_member.email = member["email"]
existing_member.phone = member["phone"]
session.commit()
except IntegrityError as e:
session.rollback()
if "email" in str(e.orig):
logger.warning("Email is already in use")
raise DuplicateEntryError("Email", "Email is already in use") from e
elif "phone" in str(e.orig):
logger.warning("Phone number is already in use")
raise DuplicateEntryError("Phone number", "Phone number is already in use") from e
else:
logger.error("An error occurred when updating member")
raise DatabaseError("An error occurred when updating member") from e
except DatabaseError as e:
session.rollback()
logger.critical("Connection with database interrupted")
raise DatabaseConnectionError("Connection with database interrupted") from e
except SQLAlchemyError as e:
session.rollback()
logger.error(f"An error occurred when saving member: {e}")
raise DatabaseError("An error occurred when creating a new member") from e
__all__ = ["create_new_member"]
def delete_member(member_id: int) -> None:
try:
with DatabaseManager.get_session() as session:
stmt = delete(Member).where(Member.id == member_id)
session.execute(stmt)
session.commit()
except SqlAlchemyDatabaseError as e:
logger.critical("Connection with database interrupted")
raise DatabaseConnectionError("Connection with database interrupted") from e
except SQLAlchemyError as e:
logger.error(f"An error occurred when deleting member: {e}")
raise DatabaseError("An error occurred when deleting member") from e
__all__ = ["create_member", "create_members", "fetch_all_members", "delete_member"]

View File

@ -1,76 +0,0 @@
from typing import Optional
import xml.etree.ElementTree as ET
from xml.dom import minidom
from database.manager import DatabaseManager
from utils.errors.export_error import ExportError
from utils.errors.no_export_entity_error import NoExportEntityError
from models import Book
class BookExporter():
def save_xml(self, file_path: str):
xml = self._get_full_xml()
with open(file_path, "w", encoding="utf-8") as file:
file.write(xml)
def _get_full_xml(self) -> str:
root = ET.Element("books")
with DatabaseManager.get_session() as session:
self.books = session.query(Book).all()
if not self.books:
raise NoExportEntityError("No books found to export")
for book in self.books:
# Create a <book> element
book_element = ET.SubElement(root, "book")
# Add <title>
title_element = ET.SubElement(book_element, "title")
title_element.text = book.title
# Add <author>
author_element = ET.SubElement(book_element, "author")
# Add <first_name>
author_first_name_element = ET.SubElement(
author_element, "first_name")
author_first_name_element.text = book.author.first_name
author_last_name_element = ET.SubElement(
author_element, "last_name")
author_last_name_element.text = book.author.last_name
# Add <description>
description_element = ET.SubElement(
book_element, "description")
description_element.text = book.description
# Add <year_published>
year_published_element = ET.SubElement(
book_element, "year_published")
year_published_element.text = book.year_published
# Add <isbn>
isbn_element = ET.SubElement(book_element, "isbn")
isbn_element.text = book.isbn
# Add <categories>
categories_element = ET.SubElement(book_element, "categories")
for category in book.categories:
category_element = ET.SubElement(
categories_element, "category")
category_element.text = category.name
# Convert the tree to a string
tree_str = ET.tostring(root, encoding="unicode")
# Pretty print the XML
pretty_xml = minidom.parseString(
tree_str).toprettyxml(indent=(" " * 4))
return pretty_xml

View File

@ -1,133 +0,0 @@
from typing import List, Dict
import os
import logging
from xml.etree import ElementTree as ET
from xmlschema import XMLSchema
from database.manager import DatabaseManager
from utils.errors.import_error.xsd_scheme_not_found import XsdSchemeNotFoundError
from utils.errors.import_error.invalid_contents_error import InvalidContentsError
from utils.errors.import_error.import_error import ImportError
from models import Book, Author, BookCategory
from sqlalchemy.exc import IntegrityError
class BookImporter:
def __init__(self):
# Initialize the logger and schema
self.logger = logging.getLogger(__name__)
try:
self.logger.debug("Opening XSD scheme in ./")
scheme_path = os.path.join(os.path.dirname(__file__), "book_import_scheme.xsd")
self.schema = XMLSchema(scheme_path)
except Exception as e:
self.logger.error("Failed to load XSD scheme")
raise XsdSchemeNotFoundError(f"Failed to load XSD schema: {e}")
def parse_xml(self, file_path: str) -> List[Dict[str, object]]:
"""Parses the XML file and validates it against the XSD schema."""
try:
tree = ET.parse(file_path)
root = tree.getroot()
if not self.schema.is_valid(file_path):
raise InvalidContentsError("XML file is not valid according to XSD schema.")
books = []
for book_element in root.findall("book"):
title = book_element.find("title").text
year_published = book_element.find("year_published").text
description = book_element.find("description").text
isbn = book_element.find("isbn").text
# Parse author
author_element = book_element.find("author")
author = {
"first_name": author_element.find("first_name").text,
"last_name": author_element.find("last_name").text
}
# Parse categories
category_elements = book_element.find("categories").findall("category")
categories = [category_element.text for category_element in category_elements]
# Create a book dictionary with explicit types
book = {
"title": title,
"description": description,
"year_published": year_published,
"isbn": isbn,
"author": author,
"categories": categories
}
books.append(book)
return books
except ET.ParseError as e:
raise ImportError(f"Failed to parse XML file: {e}")
def save_books(self, books: List[Dict[str, object]]):
"""Saves a list of books to the database."""
try:
with DatabaseManager.get_session() as session:
processed_categories = {} # Cache for processed categories by name
for book_dict in books:
self.logger.debug(f"Attempting to save {book_dict['title']}")
# Check if the book already exists
existing_book = session.query(Book).filter_by(isbn=book_dict["isbn"]).first()
if existing_book:
self.logger.warning(f"ISBN {book_dict['isbn']} already exists. Skipping.")
continue
# Check or add the author
existing_author = session.query(Author).filter_by(
first_name=book_dict["author"]["first_name"],
last_name=book_dict["author"]["last_name"]
).one_or_none()
if existing_author is not None:
self.logger.debug(f"Author {existing_author.first_name} {existing_author.last_name} already exists. Reusing.")
author = existing_author
else:
self.logger.debug(f"Creating new author: {book_dict['author']['first_name']} {book_dict['author']['last_name']}")
author = Author(
first_name=book_dict["author"]["first_name"],
last_name=book_dict["author"]["last_name"]
)
session.add(author)
# Handle categories
filtered_categories = []
for category_name in book_dict["categories"]:
if category_name in processed_categories:
filtered_categories.append(processed_categories[category_name])
continue
existing_category = session.query(BookCategory).filter_by(name=category_name).one_or_none()
if existing_category is not None:
self.logger.debug(f"Category {category_name} already exists. Reusing.")
processed_categories[category_name] = existing_category
filtered_categories.append(existing_category)
else:
self.logger.debug(f"Adding new category: {category_name}")
new_category = BookCategory(name=category_name)
session.add(new_category)
processed_categories[category_name] = new_category
filtered_categories.append(new_category)
book = Book(
title=book_dict["title"],
description=book_dict["description"],
year_published=book_dict["year_published"],
isbn=book_dict["isbn"],
author=author,
categories=filtered_categories
)
session.add(book)
# Commit all changes
session.commit()
except e:
session.rollback()
raise ImportError(f"An error occurred when importing books: {e}") from e
finally:
session.close()

View File

@ -1,19 +1,19 @@
from .author import *
from .book import *
from .book_category import *
from .book_category_link import *
from .book_overview import *
from .member import *
from .librarian import *
from .loan import *
from .author_model import *
from .book_model import *
from .book_category_model import *
from .book_category_link_model import *
from .book_category_statistics_model import *
from .book_category_statistics_overview_model import *
from .book_overview_model import *
from .member_model import *
__all__ = [
*author.__all__,
*book.__all__,
*book_category.__all__,
*book_category_link.__all__,
*book_overview.__all__,
*member.__all__,
*librarian.__all__,
*loan.__all__
*author_model.__all__,
*book_model.__all__,
*book_category_model.__all__,
*book_category_link_model.__all__,
*book_category_statistics_model.__all__,
*book_category_statistics_overview_model.__all__,
*book_overview_model.__all__,
*member_model.__all__,
]

View File

@ -1,7 +1,7 @@
from sqlalchemy import Column, Integer, String, TIMESTAMP, UniqueConstraint, func
from sqlalchemy.orm import relationship
from .base import Base
from .base_model import Base
class Author(Base):
@ -16,5 +16,8 @@ class Author(Base):
# Reference 'Book' as a string to avoid direct import
books = relationship('Book', back_populates='author')
def to_dict(self):
return {col.name: getattr(self, col.name) for col in self.__table__.columns}
__all__ = ["Author"]

View File

@ -1,9 +1,9 @@
from sqlalchemy import Column, Integer, ForeignKey
from sqlalchemy.orm import relationship
from .book import Book
from .book_category import BookCategory
from .base import Base
from .book_model import Book
from .book_category_model import BookCategory
from .base_model import Base
class BookCategoryLink(Base):

View File

@ -1,25 +1,23 @@
from sqlalchemy import Column, Integer, String, TIMESTAMP, ForeignKey, UniqueConstraint, func
from sqlalchemy.orm import relationship
from .base import Base
from .base_model import Base
class BookCategory(Base):
__tablename__ = 'book_category'
__table_args__ = (UniqueConstraint('name'),)
id = Column(Integer, primary_key=True, autoincrement=True)
parent_category_id = Column(Integer, ForeignKey('book_category.id'), nullable=True)
name = Column(String(100), nullable=False)
mature_content = Column(Integer, nullable=False, default=0)
last_updated = Column(TIMESTAMP, nullable=False, server_default=func.now())
parent_category = relationship('BookCategory', remote_side=[id])
books = relationship(
'Book',
secondary='book_category_link', # Junction table
back_populates='categories' # For bidirectional relationship
secondary='book_category_link',
back_populates='categories',
)
book_category_statistics = relationship('BookCategoryStatistics', backref='book_category_statistics')
__all__ = ["BookCategory"]

View File

@ -0,0 +1,21 @@
from sqlalchemy import Column, Integer, ForeignKey
from sqlalchemy.dialects.mysql import INTEGER
from sqlalchemy.orm import relationship
from .base_model import Base
class BookCategoryStatistics(Base):
__tablename__ = 'book_category_statistics'
book_category_id = Column(Integer, ForeignKey('book_category.id', ondelete="cascade"), primary_key=True)
book_count = Column(INTEGER(unsigned=True), nullable=False, default=0)
category = relationship(
'BookCategory',
back_populates='book_category_statistics',
overlaps="book_category_statistics"
)
__all__ = ["BookCategoryStatistics"]

View File

@ -0,0 +1,19 @@
from sqlalchemy import Column, String, TIMESTAMP, Integer, Text, Enum
from sqlalchemy.dialects.mysql import INTEGER
from .base_model import Base
class BookCategoryStatisticsOverview(Base):
__tablename__ = 'book_category_statistics_overview'
__table_args__ = {'extend_existing': True}
id = Column(Integer, primary_key=True)
name = Column(String)
book_count = Column(INTEGER(unsigned=True), default=0)
def __repr__(self):
return (f"<BookCategoryStatisticsOverview(book_category_id={self.id}, book_count={self.book_count})>")
__all__ = ["BookCategoryStatisticsOverview"]

View File

@ -1,9 +1,9 @@
import enum
from sqlalchemy import Column, Integer, String, TIMESTAMP, Text, ForeignKey, Enum, UniqueConstraint, func
from sqlalchemy import Column, Integer, String, TIMESTAMP, Text, ForeignKey, Enum, UniqueConstraint, func, DECIMAL,Boolean
from sqlalchemy.orm import relationship
from .base import Base
from .base_model import Base
class BookStatusEnum(enum.Enum):
@ -22,12 +22,25 @@ class Book(Base):
description = Column(Text, nullable=False)
year_published = Column(String(4), nullable=False)
isbn = Column(String(13), nullable=False, unique=True)
price = Column(DECIMAL(5, 2), nullable=False, default=0)
is_damaged = Column(Boolean, nullable=False, default=False)
status = Column(Enum(BookStatusEnum), nullable=False, default=BookStatusEnum.available)
created_at = Column(TIMESTAMP, nullable=False, server_default=func.now())
last_updated = Column(TIMESTAMP, nullable=False, server_default=func.now())
author = relationship('Author', back_populates='books')
categories = relationship('BookCategory',secondary='book_category_link',back_populates='books')
author = relationship('Author', back_populates='books')
categories = relationship('BookCategory',secondary='book_category_link',back_populates='books')
def to_dict(self):
book_dict = {col.name: getattr(self, col.name) for col in self.__table__.columns}
book_dict['author'] = {
'first_name': self.author.first_name,
'last_name': self.author.last_name
}
book_dict['categories'] = [category.name for category in self.categories]
return book_dict
__all__ = ["Book", "BookStatusEnum"]

View File

@ -1,8 +1,7 @@
from sqlalchemy import Column, String, TIMESTAMP, Integer, Text, Enum
from sqlalchemy import Column, String, TIMESTAMP, Integer, Text, Enum, DECIMAL, Boolean
from .base import Base
from models.book import BookStatusEnum
from .base_model import Base
from .book_model import BookStatusEnum
class BooksOverview(Base):
__tablename__ = 'books_overview'
@ -16,12 +15,10 @@ class BooksOverview(Base):
categories = Column(Text, nullable=True)
year_published = Column(Integer, nullable=True)
isbn = Column(String, nullable=True)
price = Column(DECIMAL(5, 2), nullable=False, default=0)
is_damaged = Column(Boolean, nullable=False, default=False)
status = Column(Enum(BookStatusEnum), nullable=False)
created_at = Column(TIMESTAMP, nullable=False)
borrower_name = Column(String, nullable=True)
member_id = Column(Integer, nullable=True)
librarian_name = Column(String, nullable=True)
librarian_id = Column(Integer, nullable=True)
# This prevents accidental updates/deletes as it's a view
def __repr__(self):

View File

@ -1,33 +0,0 @@
import enum
from sqlalchemy import Column, Integer, String, TIMESTAMP, Text, ForeignKey, Enum, UniqueConstraint, func
from sqlalchemy.orm import relationship
from .base import Base
class LibrarianStatusEnum(enum.Enum):
active = 'active'
inactive = 'inactive'
class LibrarianRoleEnum(enum.Enum):
staff = 'staff'
admin = 'admin'
class Librarian(Base):
__tablename__ = 'librarian'
__table_args__ = (UniqueConstraint('id'),)
id = Column(Integer, primary_key=True, autoincrement=True)
first_name = Column(String(50), nullable=False)
last_name = Column(String(50), nullable=False)
email = Column(String(100), nullable=False, unique=True)
phone = Column(String(20), nullable=False)
hire_date = Column(TIMESTAMP, nullable=False, server_default=func.now())
status = Column(Enum(LibrarianStatusEnum), nullable=False, default=LibrarianStatusEnum.active)
role = Column(Enum(LibrarianRoleEnum), nullable=False)
last_updated = Column(TIMESTAMP, nullable=False, server_default=func.now())
__all__ = ["Librarian", "LibrarianRoleEnum", "LibrarianStatusEnum"]

View File

@ -1,39 +0,0 @@
import enum
from sqlalchemy import Column, Integer, String, TIMESTAMP, Text, ForeignKey, Enum, UniqueConstraint, Float, func
from sqlalchemy.orm import relationship
from .base import Base
from .book import Book
from .member import Member
from .librarian import Librarian
class LoanStatusEnum(enum.Enum):
borrowed = 'borrowed'
returned = 'returned'
overdue = 'overdue'
reserved = 'reserved'
class Loan(Base):
__tablename__ = 'loan'
__table_args__ = (UniqueConstraint('id'),)
id = Column(Integer, primary_key=True, autoincrement=True)
book_id = Column(Integer, ForeignKey('book.id'), nullable=False)
member_id = Column(Integer, ForeignKey('member.id'), nullable=False)
librarian_id = Column(Integer, ForeignKey('librarian.id'), nullable=False)
loan_date = Column(TIMESTAMP, nullable=False, server_default=func.now())
due_date = Column(TIMESTAMP, nullable=False)
return_date = Column(TIMESTAMP, nullable=True)
status = Column(Enum(LoanStatusEnum), nullable=False, default=LoanStatusEnum.borrowed)
overdue_fee = Column(Float, nullable=True)
last_updated = Column(TIMESTAMP, nullable=False, server_default=func.now())
book = relationship('Book', backref='loans')
member = relationship('Member', backref='loans')
librarian = relationship('Librarian', backref='loans')
__all__ = ["Loan", "LoanStatusEnum"]

View File

@ -3,7 +3,7 @@ import enum
from sqlalchemy import Column, Integer, String, TIMESTAMP, Text, ForeignKey, Enum, UniqueConstraint, func
from sqlalchemy.orm import relationship
from .base import Base
from .base_model import Base
class MemberStatusEnum(enum.Enum):
@ -13,7 +13,7 @@ class MemberStatusEnum(enum.Enum):
class Member(Base):
__tablename__ = 'member'
__table_args__ = (UniqueConstraint('id'),)
__table_args__ = (UniqueConstraint('id'), UniqueConstraint('email'), UniqueConstraint('phone'))
id = Column(Integer, primary_key=True, autoincrement=True)
first_name = Column(String(50), nullable=False)
@ -24,5 +24,8 @@ class Member(Base):
status = Column(Enum(MemberStatusEnum), nullable=True, default=MemberStatusEnum.active)
last_updated = Column(TIMESTAMP, nullable=True)
def to_dict(self):
return {col.name: getattr(self, col.name) for col in self.__table__.columns}
__all__ = ["Member", "MemberStatusEnum"]

View File

@ -5,3 +5,4 @@ PySide6_Essentials==6.8.1
python-dotenv==1.0.1
SQLAlchemy==2.0.36
xmlschema==3.4.3
mysql-connector-python==9.1.0

View File

@ -0,0 +1,61 @@
from typing import List
import logging
import xml.etree.ElementTree as ET
from xml.dom import minidom
from xmlschema import XMLSchema
from utils.errors import (
NoExportEntityError,
ExportError,
ExportFileError,
InvalidContentsError,
XsdSchemeNotFoundError,
ImportError,
)
from models import BookCategoryStatisticsOverview
from assets import asset_manager
from utils.errors import DatabaseConnectionError, DatabaseError
from database import fetch_all_book_category_statistics_overviews_with_count
logger = logging.getLogger(__name__)
def export_to_xml(file_path: str) -> None:
try:
category_list, count = fetch_all_book_category_statistics_overviews_with_count()
if not category_list:
raise NoExportEntityError("No categories found to export")
xml = category_statistics_to_xml(category_list, count)
with open(file_path, "w", encoding="utf-8") as file:
file.write(xml)
except OSError as e:
raise ExportFileError("Failed to save to a file") from e
except DatabaseConnectionError as e:
raise ExportError("An error with database occurred. Try again later") from e
except DatabaseError as e:
raise ExportError("Unknown error occurred") from e
def category_statistics_to_xml(category_statistics: List[BookCategoryStatisticsOverview], total_count: int):
root = ET.Element("statistics")
for statistic in category_statistics:
category_element = ET.SubElement(root, "category")
name_element = ET.SubElement(category_element, "name")
name_element.text = statistic.name
count_element = ET.SubElement(category_element, "count")
count_element.text = str(statistic.book_count)
total_count_element = ET.SubElement(root, "total_category_count")
total_count_element.text = str(total_count)
tree_str = ET.tostring(root, encoding="unicode")
pretty_xml = minidom.parseString(tree_str).toprettyxml(indent=(" " * 4))
return pretty_xml

View File

@ -0,0 +1,78 @@
import os
import logging
from typing import Optional, List, Dict
import xml.etree.ElementTree as ET
from xml.dom import minidom
from xmlschema import XMLSchema
from utils.errors import (
NoExportEntityError,
ExportError,
ExportFileError,
InvalidContentsError,
XsdSchemeNotFoundError,
ImportError,
)
# Initialize logger and XML Schema
logger = logging.getLogger(__name__)
from models import BooksOverview
from database import fetch_all_book_overviews
def export_to_xml(file_path: str) -> None:
logger.debug("Attempting to export book overview")
all_books = fetch_all_book_overviews()
if not all_books:
logger.warning("No books found to export")
raise NoExportEntityError("No books found to export")
xml = overviews_to_xml(all_books)
try:
with open(file_path, "w", encoding="utf-8") as file:
file.write(xml)
logger.info("Successfully saved book overview export")
except OSError as e:
raise ExportFileError("Failed to save to a file") from e
def overviews_to_xml(overview_list: List[BooksOverview]) -> str:
root = ET.Element("book_overview")
for book_overview in overview_list:
# Create a <book_entry> element
book_element = ET.SubElement(root, "book_entry")
# Add <title>
title_element = ET.SubElement(book_element, "title")
title_element.text = book_overview.title
# Add <author>
author_element = ET.SubElement(book_element, "author")
author_element.text = book_overview.author_name
# Add <year_published>
year_published_element = ET.SubElement(book_element, "year_published")
year_published_element.text = book_overview.year_published
# Add <isbn>
isbn_element = ET.SubElement(book_element, "isbn")
isbn_element.text = book_overview.isbn
# Add <borrower_name>
borrower_name = ET.SubElement(book_element, "borrower_name")
borrower_name.text = book_overview.borrower_name
# Add <librarian_name>
librarian_name = ET.SubElement(book_element, "librarian_name")
librarian_name.text = book_overview.librarian_name
# Convert the tree to a string
tree_str = ET.tostring(root, encoding="unicode")
# Pretty print the XML
pretty_xml = minidom.parseString(tree_str).toprettyxml(indent=(" " * 4))
return pretty_xml

View File

@ -0,0 +1,146 @@
import os
import logging
from typing import Optional, List, Dict
import xml.etree.ElementTree as ET
from xml.dom import minidom
from xmlschema import XMLSchema
from utils.errors import (
NoExportEntityError,
ExportError,
ExportFileError,
InvalidContentsError,
XsdSchemeNotFoundError,
ImportError,
)
from models import Book
from database import fetch_all_books, create_books
from assets import asset_manager
logger = logging.getLogger(__name__)
try:
logger.debug("Loading XSD schema")
SCHEMA = XMLSchema(asset_manager.get_asset("book_import_scheme.xsd"))
except Exception as e:
logger.error("Failed to load XSD schema")
raise XsdSchemeNotFoundError(f"Failed to load XSD schema: {e}")
def export_to_xml(file_path: str) -> None:
all_books = fetch_all_books()
if not all_books:
raise NoExportEntityError("No books found to export")
xml = books_to_xml(all_books)
try:
with open(file_path, "w", encoding="utf-8") as file:
file.write(xml)
except OSError as e:
raise ExportFileError("Failed to save to a file") from e
def save_books(books: List[Dict[str, object]]):
create_books(books)
def parse_from_xml(file_path: str) -> List[Dict[str, object]]:
if not SCHEMA.is_valid(file_path):
raise InvalidContentsError("XML file is not valid according to XSD schema.")
try:
tree = ET.parse(file_path)
root = tree.getroot()
books = []
for book_element in root.findall("book"):
title = book_element.find("title").text
year_published = book_element.find("year_published").text
description = book_element.find("description").text
isbn = book_element.find("isbn").text
price = float(book_element.find("price").text)
is_damaged = bool(book_element.find("is_damaged").text)
# Parse author
author_element = book_element.find("author")
author = {
"first_name": author_element.find("first_name").text,
"last_name": author_element.find("last_name").text,
}
# Parse categories
category_elements = book_element.find("categories").findall("category")
categories = [category_element.text for category_element in category_elements]
# Create a book dictionary
book = {
"title" : title,
"description" : description,
"year_published" : year_published,
"isbn" : isbn,
"author" : author,
"price": price,
"is_damaged" : is_damaged,
"categories" : categories,
}
books.append(book)
return books
except ET.ParseError as e:
raise ImportError(f"Failed to parse XML file: {e}")
def books_to_xml(books: List[Book]) -> str:
root = ET.Element("books")
for book in books:
# Create a <book> element
book_element = ET.SubElement(root, "book")
# Add <title>
title_element = ET.SubElement(book_element, "title")
title_element.text = book.title
# Add <author>
author_element = ET.SubElement(book_element, "author")
# Add <first_name>
author_first_name_element = ET.SubElement(author_element, "first_name")
author_first_name_element.text = book.author.first_name
author_last_name_element = ET.SubElement(author_element, "last_name")
author_last_name_element.text = book.author.last_name
# Add <description>
description_element = ET.SubElement(book_element, "description")
description_element.text = book.description
# Add <year_published>
year_published_element = ET.SubElement(book_element, "year_published")
year_published_element.text = book.year_published
# Add <isbn>
isbn_element = ET.SubElement(book_element, "isbn")
isbn_element.text = book.isbn
price_element = ET.SubElement(book_element, "price")
price_element.text = str(book.price)
damaged_element = ET.SubElement(book_element, "is_damaged")
damaged_element.text = str(book.is_damaged).lower()
# Add <categories>
categories_element = ET.SubElement(book_element, "categories")
for category in book.categories:
category_element = ET.SubElement(categories_element, "category")
category_element.text = category.name
# Convert the tree to a string
tree_str = ET.tostring(root, encoding="unicode")
# Pretty print the XML
pretty_xml = minidom.parseString(tree_str).toprettyxml(indent=(" " * 4))
return pretty_xml
__all__ = ["export_to_xml", "parse_from_xml"]

View File

@ -1,156 +0,0 @@
from PySide6.QtGui import QGuiApplication, QAction, Qt
from PySide6.QtQml import QQmlApplicationEngine
from PySide6.QtWidgets import QHBoxLayout, QVBoxLayout, QLabel, QWidget, QMenu, QSizePolicy, QLayout, QMessageBox
from PySide6.QtCore import qDebug
from ui.editor import BookEditor
from models import BooksOverview, Book, BookStatusEnum
from database.manager import DatabaseManager
from sqlalchemy import delete
STATUS_TO_COLOR_MAP = {
BookStatusEnum.available: "#3c702e",
BookStatusEnum.borrowed: "#702525",
BookStatusEnum.reserved: "#bc7613"
}
class BookCard(QWidget):
def __init__(self, book_overview: BooksOverview):
super().__init__()
self.book_overview = book_overview
self.setAttribute(Qt.WidgetAttribute.WA_Hover,
True) # Enable hover events
# Enable styling for background
self.setAttribute(Qt.WidgetAttribute.WA_StyledBackground, True)
# Set initial stylesheet with hover behavior
self.setStyleSheet("""
BookCard:hover {
background-color: palette(highlight);
}
""")
# Layout setup
layout = QHBoxLayout(self)
layout.setSizeConstraint(QLayout.SizeConstraint.SetMinimumSize)
self.setSizePolicy(QSizePolicy.Policy.Preferred,
QSizePolicy.Policy.Fixed)
layout.setContentsMargins(10, 10, 10, 10)
layout.setSpacing(10)
# Left-side content
left_side = QVBoxLayout()
layout.addLayout(left_side)
title_label = QLabel(book_overview.title)
title_label.setStyleSheet("font-size: 20px; font-weight: bold;")
author_label = QLabel("By: " + book_overview.author_name)
isbn_label = QLabel("ISBN: " + (book_overview.isbn or "Not Available"))
left_side.addWidget(title_label)
left_side.addWidget(author_label)
left_side.addWidget(isbn_label)
# Right-side content
right_side = QVBoxLayout()
layout.addLayout(right_side)
status_label = QLabel(str(book_overview.status.value.capitalize()))
status_label.setStyleSheet(f"color: {
STATUS_TO_COLOR_MAP[book_overview.status]}; font-size: 20px; font-weight: bold;")
status_label.setAlignment(Qt.AlignmentFlag.AlignRight)
right_side.addWidget(status_label)
if book_overview.librarian_name and book_overview.borrower_name:
borrower_label = QLabel("Borrowed: " + book_overview.borrower_name)
borrower_label.setAlignment(Qt.AlignmentFlag.AlignRight)
librarian_label = QLabel("By: " + book_overview.librarian_name)
librarian_label.setAlignment(Qt.AlignmentFlag.AlignRight)
right_side.addWidget(borrower_label)
right_side.addWidget(librarian_label)
self.setLayout(layout)
def mousePressEvent(self, event):
if event.button() == Qt.MouseButton.LeftButton:
self.contextMenuEvent(event)
else:
super().mousePressEvent(event)
def contextMenuEvent(self, event):
context_menu = QMenu(self)
action_edit_book = context_menu.addAction("Edit Book")
action_edit_author = context_menu.addAction("Edit Author")
action_mark_returned = context_menu.addAction("Mark as Returned")
action_remove_reservation = context_menu.addAction(
"Remove reservation")
context_menu.addSeparator()
delete_book_action = context_menu.addAction("Delete Book")
delete_book_action.triggered.connect(self.delete_book)
if self.book_overview.status != BookStatusEnum.borrowed:
action_mark_returned.setVisible(False)
if self.book_overview.status != BookStatusEnum.reserved:
action_remove_reservation.setVisible(False)
action = context_menu.exec_(self.mapToGlobal(event.pos()))
if action == action_edit_book:
with DatabaseManager.get_session() as session:
book_id = self.book_overview.id
book = session.query(Book).filter(
Book.id == book_id).one_or_none()
if book:
BookEditor(book).exec()
else:
QMessageBox.critical(self,
"Error",
"The book you requested could not be found. Try again later",
QMessageBox.StandardButton.Ok,
QMessageBox.StandardButton.NoButton)
elif action == action_edit_author:
print("Edit Author selected")
elif action == action_mark_returned:
print("Mark as Returned selected")
elif action == action_remove_reservation:
print("Remove reservation selected")
def delete_book(self):
if not self.make_sure():
return
with DatabaseManager.get_session() as session:
try:
stmt = delete(Book).where(Book.id == self.book_overview.id)
session.execute(stmt)
session.commit()
self.setVisible(False)
except Exception as e:
session.rollback
print(e)
def make_sure(self) -> bool:
are_you_sure_box = QMessageBox()
are_you_sure_box.setIcon(QMessageBox.Question)
are_you_sure_box.setWindowTitle("Are you sure?")
are_you_sure_box.setText(f"Are you sure you want to delete {self.book_overview.title}?")
are_you_sure_box.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
are_you_sure_box.setDefaultButton(QMessageBox.No)
# Show the message box and capture the user's response
response = are_you_sure_box.exec()
# Handle the response
return response == QMessageBox.Yes

View File

@ -1,29 +1,33 @@
from typing import Dict, Callable
import logging
from PySide6.QtWidgets import (
QDialog, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QTextEdit, QPushButton, QComboBox, QFormLayout, QMessageBox
QDialog, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QTextEdit, QPushButton, QComboBox, QFormLayout, QMessageBox, QCheckBox
)
from PySide6.QtGui import QRegularExpressionValidator
from PySide6.QtCore import QRegularExpression
from models import Book, BookStatusEnum
from models import Book, BookStatusEnum, BookCategory
from database import update_book
from database import update_book, create_book
from utils.errors.database import DatabaseError, DatabaseConnectionError, DuplicateEntryError
class BookEditor(QDialog):
def __init__(self, book: Book = None, parent=None):
def __init__(self, book: Book = None, parent=None, refresh_callback: Callable[[Dict[str, object]], None] = None):
super().__init__(parent)
self.logger = logging.getLogger(__name__)
self.create_layout()
self.refresh_callback = refresh_callback
if book:
self.logger.debug(f"Editing existing book {book.title}")
self.book = book
self.book_id = book.id
self.logger.debug(f"Editing book {book.title}")
self.create_new = False
self.fill_with_existing_data()
self.fill_with_existing_data(book)
else:
self.logger.debug("Editing a new book")
self.create_new = True
@ -43,7 +47,7 @@ class BookEditor(QDialog):
form_layout.addRow("Title:", self.title_input)
# Author field
self.author_label = QLabel()
self.author_label = QLineEdit()
form_layout.addRow("Author: ", self.author_label)
# Description field
@ -52,20 +56,33 @@ class BookEditor(QDialog):
# Year published field
self.year_input = QLineEdit()
# self.year_input.setValidator
form_layout.addRow("Year Published:", self.year_input)
# ISBN field
self.isbn_input = QLineEdit()
self.isbn_expression = QRegularExpression("\d{10}|\d{13}")
self.isbn_expression = QRegularExpression("(\d{13})|(\d{10})")
self.isbn_validator = QRegularExpressionValidator(self.isbn_expression)
self.isbn_input.setValidator(self.isbn_validator)
form_layout.addRow("ISBN:", self.isbn_input)
# Categories field
self.categories_input = QLineEdit()
self.categories_input.setEnabled(False)
form_layout.addRow("Categories: ", self.categories_input)
# Damage field
self.damage_input = QCheckBox()
form_layout.addRow("Damage:", self.damage_input)
# Price field
self.price_input = QLineEdit()
self.price_input.setValidator(QRegularExpressionValidator(QRegularExpression(r"^\d+(\.\d{1,2})?$")))
form_layout.addRow("Price:", self.price_input)
# Status field
self.status_input = QComboBox()
self.status_input.addItems([status.value for status in BookStatusEnum])
form_layout.addRow("Status:", self.status_input)
layout.addLayout(form_layout)
# Buttons
@ -81,50 +98,128 @@ class BookEditor(QDialog):
layout.addLayout(button_layout)
def fill_with_existing_data(self):
self.title_input.setText(self.book.title)
self.description_input.setText(self.book.description)
self.year_input.setText(self.book.year_published)
self.isbn_input.setText(self.book.isbn)
def fill_with_existing_data(self, book: Book):
self.title_input.setText(book.title)
self.description_input.setText(book.description)
self.year_input.setText(book.year_published)
self.isbn_input.setText(book.isbn)
def save_book(self):
# Update book object with input values
self.book.title = self.title_input.text()
full_author_name = f"{self.book.author.first_name} {
self.book.author.last_name}"
full_author_name = f"{book.author.first_name} {book.author.last_name}"
self.author_label.setText(full_author_name)
self.book.description = self.description_input.toPlainText()
self.book.year_published = self.year_input.text()
self.book.isbn = self.isbn_input.text()
all_categories = ", ".join(
category.name for category in self.book.categories)
all_categories = ", ".join(category.name for category in book.categories)
self.categories_input.setText(all_categories)
self.damage_input.setChecked(book.is_damaged)
self.price_input.setText(str(book.price))
self.status_input.setCurrentText(book.status.value)
def save_book(self):
try:
book_object = self.parse_inputs()
if self.create_new:
pass
create_book(book_object, skip_existing=False)
else:
update_book(self.book)
book_object["id"] = self.book_id
update_book(book_object)
QMessageBox.information(None,
"Success",
"Book updated successfully",
QMessageBox.StandardButton.Ok)
if self.refresh_callback:
self.refresh_callback(book_object)
self.accept()
except ValueError as e:
QMessageBox.critical(None,
"Invalid Input",
f"Input validation failed: {e}",
QMessageBox.StandardButton.Ok)
except DuplicateEntryError as e:
QMessageBox.critical(None,
"ISBN is already in use",
"The ISBN provided is already in use",
QMessageBox.StandardButton.Ok,
QMessageBox.StandardButton.NoButton)
QMessageBox.StandardButton.Ok)
except DatabaseConnectionError as e:
QMessageBox.critical(None,
"Failed to save",
"Could not connect to the database",
QMessageBox.StandardButton.Ok,
QMessageBox.StandardButton.NoButton)
QMessageBox.StandardButton.Ok)
except DatabaseError as e:
QMessageBox.critical(self.parent,
"An error occured",
QMessageBox.critical(None,
"An error occurred",
f"Could not save the book because of the following error: {e}",
QMessageBox.StandardButton.Ok,
QMessageBox.StandardButton.NoButton)
QMessageBox.StandardButton.Ok)
def parse_inputs(self) -> Dict[str, object]:
# Title validation
title = self.title_input.text().strip()
if not title or len(title) > 100:
raise ValueError("Title must be non-empty and at most 100 characters long.")
# Author validation
author_name = self.author_label.text().strip()
if not author_name or len(author_name.split()) < 2:
raise ValueError("Author must include at least a first and last name.")
# Split author name into first and last names
author_parts = author_name.split()
first_name = author_parts[0]
last_name = " ".join(author_parts[1:])
# Description validation
description = self.description_input.toPlainText().strip()
if not description:
raise ValueError("Description cannot be empty.")
# Year published validation
year_published = self.year_input.text().strip()
if not year_published.isdigit() or len(year_published) != 4 or int(year_published) < 0:
raise ValueError("Year published must be a 4-digit positive number.")
# ISBN validation
isbn = self.isbn_input.text().strip()
if not isbn or len(isbn) not in (10, 13):
raise ValueError("ISBN must be either 10 or 13 characters long.")
# Categories validation
category_text = self.categories_input.text().strip()
categories = [category.strip() for category in category_text.split(",") if category.strip()]
if not categories:
raise ValueError("At least one category must be specified.")
# Damage validation
damage = self.damage_input.isChecked()
# Price validation
price = self.price_input.text().strip()
try:
price = float(price)
if price < 0:
raise ValueError("Price must be a non-negative number.")
except ValueError:
raise ValueError("Price must be a valid decimal number.")
# Status validation
status = self.status_input.currentText()
# Map parsed values to dictionary format for saving
return {
"title": title,
"author": {
"first_name": first_name,
"last_name": last_name
},
"description": description,
"year_published": year_published,
"isbn": isbn,
"categories": categories,
"is_damaged": damage,
"price": price,
"status": status
}
__all__ = ["BookEditor"]

View File

@ -1,4 +1,6 @@
import logging
import re
from typing import Dict, Callable
from PySide6.QtGui import QGuiApplication, QAction
from PySide6.QtQml import QQmlApplicationEngine
@ -7,24 +9,27 @@ from PySide6.QtWidgets import QVBoxLayout, QFormLayout, QLineEdit, QHBoxLayout,
from models import Member
from database.member import create_new_member
from database.member import create_member, update_member
from utils.errors.database import DatabaseError, DatabaseConnectionError, DuplicateEntryError
class MemberEditor(QDialog):
def __init__(self, member: Member = None):
def __init__(self, member: Member = None, refresh_callback: Callable[[Dict[str, object]], None] = None):
super().__init__()
self.logger = logging.getLogger(__name__)
self.create_layout()
self.refresh_callback = refresh_callback
if member:
self.member = member
self.fill_with_existing_data()
self.member_id = member.id
self.logger.debug(f"Editing member {member.first_name} {member.last_name}")
self.fill_with_existing_data(member)
self.create_new = False
else:
self.member = Member()
self.logger.debug("Editing a new member")
self.create_new = True
def create_layout(self):
@ -68,27 +73,83 @@ class MemberEditor(QDialog):
self.layout.addLayout(self.button_layout)
def fill_with_existing_data(self):
self.first_name_input.setText(self.member.first_name)
self.last_name_input.setText(self.member.last_name)
self.email_input.setText(self.member.email)
self.phone_number_input.setText(self.member.phone)
def fill_with_existing_data(self, member: Member):
self.first_name_input.setText(member.first_name)
self.last_name_input.setText(member.last_name)
self.email_input.setText(member.email)
self.phone_number_input.setText(member.phone)
def save_member(self):
self.member.first_name = self.first_name_input.text()
self.member.last_name = self.last_name_input.text()
self.member.email = self.email_input.text()
self.member.phone = self.phone_number_input.text()
try:
member_object = self.parse_inputs()
if self.create_new:
self.logger.debug("Creating new member")
create_new_member(self.member)
except DuplicateEntryError:
QMessageBox.critical(None, "Details already in use", "Cannot create a new user",
QMessageBox.StandardButton.Ok, QMessageBox.StandardButtons.NoButton)
create_member(member_object)
QMessageBox.information(None,
"Success",
"Member created successfully",
QMessageBox.StandardButton.Ok,
QMessageBox.StandardButton.NoButton)
else:
member_object["id"] = self.member_id
update_member(member_object)
QMessageBox.information(None,
"Success",
"Member updated successfully",
QMessageBox.StandardButton.Ok,
QMessageBox.StandardButton.NoButton)
self.accept()
if self.refresh_callback:
self.refresh_callback(member_object)
self.accept()
except ValueError as e:
QMessageBox.critical(None,
"Invalid Input",
f"Input validation failed: {e}",
QMessageBox.StandardButton.Ok,
QMessageBox.StandardButton.NoButton)
except DuplicateEntryError as e:
QMessageBox.critical(None,
f"Duplicate {e.duplicate_entry_name}",
f"The {e.duplicate_entry_name} is already in use",
QMessageBox.StandardButton.Ok,
QMessageBox.StandardButton.NoButton)
except DatabaseConnectionError as e:
QMessageBox.critical(None,
"Connection error",
"Could not connect to the database",
QMessageBox.StandardButton.Ok)
except DatabaseError as e:
QMessageBox.critical(None,
"Unknown database error",
f"Could not save the book because of the following error: {e}",
QMessageBox.StandardButton.Ok)
def parse_inputs(self) -> Dict:
first_name = self.first_name_input.text().strip()
if not first_name or len(first_name) > 50:
raise ValueError("First name must be non-empty and at most 50 characters long.")
last_name = self.last_name_input.text().strip()
if not last_name or len(last_name) > 50:
raise ValueError("Last name must be non-empty and at most 50 characters long.")
email = self.email_input.text().strip()
email_regex = r"^[\w\-\.]+@([\w\-]+\.)+[\w\-]{2,}$"
if not re.match(email_regex, email):
raise ValueError("E-mail address is not in a valid format.")
phone_number = self.phone_number_input.text().strip()
phone_number_regex = r"(\+\d{1,3})?\d{9}"
if not re.match(phone_number_regex, phone_number):
raise ValueError("Phone number is not in valid format.")
return {
"first_name": first_name,
"last_name": last_name,
"email": email,
"phone": phone_number
}
__all__ = ["MemberEditor"]

View File

@ -0,0 +1,3 @@
from .book_overview_list import BookOverviewList
from .member_list import MemberList
from .category_statistics_overview_list import BookCategoryStatisticsOverview

View File

@ -0,0 +1,7 @@
from .overview_list import *
from .book_card import *
__all__ = [
*overview_list.__all__,
*book_card.__all__
]

View File

@ -0,0 +1,148 @@
from PySide6.QtGui import QAction, Qt
from PySide6.QtWidgets import QHBoxLayout, QVBoxLayout, QLabel, QWidget, QMenu, QSizePolicy, QLayout, QMessageBox, QDialog
from models import BooksOverview, Book, BookStatusEnum
from ui.editor import BookEditor
from database.manager import DatabaseManager
from database import delete_book
from utils.errors import DatabaseConnectionError, DatabaseError
STATUS_TO_COLOR_MAP = {
BookStatusEnum.available: "#3c702e",
BookStatusEnum.borrowed: "#702525",
BookStatusEnum.reserved: "#bc7613"
}
class BookCard(QWidget):
def __init__(self, book_overview: BooksOverview):
super().__init__()
self.book_overview = book_overview
self.setAttribute(Qt.WidgetAttribute.WA_Hover, True) # Enable hover events
self.setAttribute(Qt.WidgetAttribute.WA_StyledBackground, True) # Enable styling for background
# Set initial stylesheet with hover behavior
self.setStyleSheet("""
BookCard:hover {
background-color: palette(highlight);
}
""")
# Layout setup
layout = QHBoxLayout(self)
layout.setSizeConstraint(QLayout.SizeConstraint.SetMinimumSize)
self.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed)
layout.setContentsMargins(10, 10, 10, 10)
layout.setSpacing(10)
# Initialize UI components
self.title_label = QLabel()
self.author_label = QLabel()
self.isbn_label = QLabel()
self.status_label = QLabel()
self.price_label = QLabel()
self.is_damaged_label = QLabel()
# Left-side content
left_side = QVBoxLayout()
layout.addLayout(left_side)
left_side.addWidget(self.title_label)
left_side.addWidget(self.author_label)
left_side.addWidget(self.isbn_label)
# Right-side content
right_side = QVBoxLayout()
layout.addLayout(right_side)
right_side.addWidget(self.status_label)
right_side.addWidget(self.price_label)
right_side.addWidget(self.is_damaged_label)
self.setLayout(layout)
self.update_display()
def update_display(self):
"""Refreshes the display of the book card based on its current data."""
self.title_label.setText(self.book_overview.title)
self.title_label.setStyleSheet("font-size: 20px; font-weight: bold;")
self.author_label.setText("By: " + self.book_overview.author_name)
self.isbn_label.setText("ISBN: " + (self.book_overview.isbn or "Not Available"))
self.status_label.setText(str(self.book_overview.status.value.capitalize()))
self.status_label.setStyleSheet(f"color: {STATUS_TO_COLOR_MAP[self.book_overview.status]}; font-size: 20px; font-weight: bold;")
self.status_label.setAlignment(Qt.AlignmentFlag.AlignRight)
self.price_label.setText("Price: " + str(self.book_overview.price))
self.price_label.setAlignment(Qt.AlignmentFlag.AlignRight)
self.is_damaged_label.setText("Damaged: " + str(self.book_overview.is_damaged))
self.is_damaged_label.setAlignment(Qt.AlignmentFlag.AlignRight)
def contextMenuEvent(self, event):
context_menu = QMenu(self)
action_edit_book = context_menu.addAction("Edit Book")
delete_book_action = context_menu.addAction("Delete Book")
delete_book_action.triggered.connect(self.delete_book)
action = context_menu.exec_(self.mapToGlobal(event.pos()))
if action == action_edit_book:
self.open_editor()
def open_editor(self):
"""Opens the BookEditor and updates the card if changes are made."""
with DatabaseManager.get_session() as session:
book_id = self.book_overview.id
book = session.query(Book).filter(Book.id == book_id).one_or_none()
if book:
editor = BookEditor(book)
if editor.exec() == QDialog.DialogCode.Accepted:
updated_data = editor.parse_inputs()
self.refresh(updated_data)
else:
QMessageBox.critical(self,
"Error",
"The book you requested could not be found. Try again later",
QMessageBox.StandardButton.Ok)
def refresh(self, updated_data):
"""Updates the card's data and refreshes the display."""
self.book_overview.title = updated_data["title"]
self.book_overview.author_name = f"{updated_data['author']['first_name']} {updated_data['author']['last_name']}"
self.book_overview.isbn = updated_data["isbn"]
self.book_overview.status = BookStatusEnum(updated_data["status"])
self.book_overview.price = updated_data["price"]
self.book_overview.is_damaged = updated_data["is_damaged"]
self.update_display()
def delete_book(self):
if not self.make_sure():
return
try:
delete_book(self.book_overview.id)
self.setVisible(False)
except DatabaseConnectionError as e:
QMessageBox.critical(None, "Failed", "Connection with database failed", QMessageBox.StandardButton.Ok)
except DatabaseError as e:
QMessageBox.critical(None, "Failed", f"An error occurred when deleting book: {e}", QMessageBox.StandardButton.Ok)
def make_sure(self) -> bool:
are_you_sure_box = QMessageBox()
are_you_sure_box.setIcon(QMessageBox.Question)
are_you_sure_box.setWindowTitle("Are you sure?")
are_you_sure_box.setText(f"Are you sure you want to delete {self.book_overview.title}?")
are_you_sure_box.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
are_you_sure_box.setDefaultButton(QMessageBox.No)
# Show the message box and capture the user's response
response = are_you_sure_box.exec()
# Handle the response
return response == QMessageBox.Yes
__all__ = ["BookCard"]

View File

@ -9,13 +9,15 @@ from .book_card import BookCard
from models import BooksOverview
from database.manager import DatabaseManager
from database.book_overview import fetch_all_book_overviews
from ui.editor import MemberEditor
class LibraryDashboard(QWidget):
def __init__(self):
super().__init__()
class BookOverviewList(QWidget):
def __init__(self, parent = None):
self.parent = parent
super().__init__(parent=parent)
# Central widget and layout
main_layout = QVBoxLayout(self)
@ -59,10 +61,6 @@ class LibraryDashboard(QWidget):
register_member_button.clicked.connect(self.register_member)
button_layout.addWidget(register_member_button)
add_borrow_record_button = QPushButton("Add Borrow Record")
add_borrow_record_button.clicked.connect(self.add_borrow_record)
button_layout.addWidget(add_borrow_record_button)
main_layout.addLayout(button_layout)
def filter_books(self, text):
@ -77,6 +75,7 @@ class LibraryDashboard(QWidget):
def register_member(self):
MemberEditor().exec()
self.parent.refresh_member_cards()
def add_borrow_record(self):
QMessageBox.information(self, "Add Borrow Record",
@ -98,7 +97,7 @@ class LibraryDashboard(QWidget):
self.clear_layout(self.scroll_layout)
self.book_cards = []
self.books = self.fetch_books_from_db()
self.books = fetch_all_book_overviews()
for book in self.books:
card = BookCard(book)
@ -106,13 +105,5 @@ class LibraryDashboard(QWidget):
self.scroll_layout.addWidget(card)
self.book_cards.append(card)
def fetch_books_from_db(self):
"""Fetch all books from the database."""
try:
with DatabaseManager.get_session() as session:
books = session.query(BooksOverview).all()
return books
except Exception as e:
QMessageBox.critical(self, "Database Error",
f"Failed to fetch books: {e}")
return []
__all__ = ["BookOverviewList"]

View File

@ -0,0 +1,7 @@
from .category_overview_list import *
from .category_overview_card import *
__all__ = [
*category_overview_list.__all__,
*category_overview_card.__all__
]

View File

@ -0,0 +1,58 @@
from PySide6.QtGui import QGuiApplication, QAction, Qt
from PySide6.QtQml import QQmlApplicationEngine
from PySide6.QtWidgets import QHBoxLayout, QVBoxLayout, QLabel, QWidget, QMenu, QSizePolicy, QLayout, QMessageBox
from PySide6.QtCore import qDebug
from ui.editor import BookEditor
from models import BookCategoryStatisticsOverview
from database.manager import DatabaseManager
class BookCategoryStatisticsOverviewCard(QWidget):
def __init__(self, book_category_statistics_overview: BookCategoryStatisticsOverview):
super().__init__()
self.book_category_statistics_overview = book_category_statistics_overview
self.setAttribute(Qt.WidgetAttribute.WA_Hover, True) # Enable hover events
# Enable styling for background
self.setAttribute(Qt.WidgetAttribute.WA_StyledBackground, True)
# Set initial stylesheet with hover behavior
self.setStyleSheet("""
BookCategoryStatisticsOverviewCard:hover {
background-color: palette(highlight);
}
""")
# Layout setup
layout = QHBoxLayout(self)
layout.setSizeConstraint(QLayout.SizeConstraint.SetMinimumSize)
self.setSizePolicy(QSizePolicy.Policy.Preferred,
QSizePolicy.Policy.Fixed)
layout.setContentsMargins(10, 10, 10, 10)
layout.setSpacing(10)
# Left-side content
left_side = QVBoxLayout()
layout.addLayout(left_side)
category_name_label = QLabel(book_category_statistics_overview.name)
category_name_label.setStyleSheet("font-size: 20px; font-weight: bold;")
left_side.addWidget(category_name_label)
# Right-side content
right_side = QVBoxLayout()
layout.addLayout(right_side)
status_label = QLabel(str(book_category_statistics_overview.book_count))
status_label.setStyleSheet("font-size: 20px; font-weight: bold;")
status_label.setAlignment(Qt.AlignmentFlag.AlignRight)
right_side.addWidget(status_label)
self.setLayout(layout)
__all__ = ["BookCategoryStatisticsOverviewCard"]

View File

@ -0,0 +1,89 @@
from PySide6.QtGui import QAction
from PySide6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QScrollArea,
QFrame, QPushButton, QMessageBox, QVBoxLayout
)
from PySide6.QtCore import Qt
from .category_overview_card import BookCategoryStatisticsOverviewCard
from models import BookCategoryStatisticsOverview
from database.manager import DatabaseManager
from database import fetch_all_book_category_statistics_overviews
class BookCategoryStatisticsOverview(QWidget):
def __init__(self, parent = None):
self.parent = parent
super().__init__(parent=parent)
# Central widget and layout
main_layout = QVBoxLayout(self)
# Title label
title_label = QLabel("Category statistics", self)
title_label.setAlignment(Qt.AlignCenter)
title_label.setStyleSheet(
"font-size: 20px; font-weight: bold; color: #0078D4;")
main_layout.addWidget(title_label)
# Search bar
self.search_input = QLineEdit()
self.search_input.setPlaceholderText("Search in categories...")
self.search_input.textChanged.connect(self.filter_categories)
main_layout.addWidget(self.search_input)
# Scrollable area for cards
self.scroll_area = QScrollArea()
self.scroll_area.setWidgetResizable(True)
# Container widget for the scroll area
self.scroll_widget = QWidget()
self.scroll_layout = QVBoxLayout(self.scroll_widget)
self.scroll_layout.setSpacing(5) # Set gap between individual cards
self.scroll_layout.setContentsMargins(0, 0, 0, 0) # Remove spacing from all sides which is present by default
# Align the cards to the top
self.scroll_layout.setAlignment(Qt.AlignTop)
self.category_overviews = []
self.category_overview_cards = []
self.redraw_cards()
self.scroll_widget.setLayout(self.scroll_layout)
self.scroll_area.setWidget(self.scroll_widget)
main_layout.addWidget(self.scroll_area)
def filter_categories(self, text):
"""Filter the cards based on the search input."""
for card, category in zip(self.category_overview_cards, self.category_overviews):
name_contains_text = text.lower() in category.name.lower()
card.setVisible(name_contains_text)
def clear_layout(self, layout):
while layout.count():
item = layout.takeAt(0)
widget = item.widget()
if widget is not None:
widget.deleteLater()
else:
sub_layout = item.layout()
if sub_layout is not None:
self.clear_layout(sub_layout)
del item
def redraw_cards(self):
self.clear_layout(self.scroll_layout)
self.category_overview_cards = []
self.category_overviews = fetch_all_book_category_statistics_overviews()
for category in self.category_overviews:
card = BookCategoryStatisticsOverviewCard(category)
self.scroll_layout.addWidget(card)
self.category_overview_cards.append(card)
__all__ = ["BookCategoryStatisticsOverview"]

View File

@ -0,0 +1,7 @@
from .member_list import *
from .member_card import *
__all__ = [
*member_list.__all__,
*member_card.__all__
]

View File

@ -2,11 +2,11 @@ from PySide6.QtWidgets import (
QHBoxLayout, QVBoxLayout, QLabel, QWidget, QMenu, QSizePolicy, QLayout, QMessageBox
)
from PySide6.QtGui import Qt
from PySide6.QtCore import qDebug
from models import Member, MemberStatusEnum
from database.manager import DatabaseManager
from sqlalchemy import delete
from database import delete_member
from ui.editor import MemberEditor
from utils.errors import DatabaseConnectionError, DatabaseError
STATUS_TO_COLOR_MAP = {
MemberStatusEnum.active: "#3c702e",
@ -38,32 +38,43 @@ class MemberCard(QWidget):
layout.setContentsMargins(10, 10, 10, 10)
layout.setSpacing(10)
# Initialize UI components
self.name_label = QLabel()
self.email_label = QLabel()
self.phone_label = QLabel()
self.status_label = QLabel()
self.register_date_label = QLabel()
# Left-side content
left_side = QVBoxLayout()
layout.addLayout(left_side)
name_label = QLabel(f"{member.first_name} {member.last_name}")
name_label.setStyleSheet("font-size: 20px; font-weight: bold;")
email_label = QLabel(f"Email: {member.email}")
phone_label = QLabel(f"Phone: {member.phone or 'Not Available'}")
left_side.addWidget(name_label)
left_side.addWidget(email_label)
left_side.addWidget(phone_label)
left_side.addWidget(self.name_label)
left_side.addWidget(self.email_label)
left_side.addWidget(self.phone_label)
# Right-side content
right_side = QVBoxLayout()
layout.addLayout(right_side)
status_label = QLabel(str(member.status.value.capitalize()))
status_label.setStyleSheet(f"color: {STATUS_TO_COLOR_MAP[member.status]}; font-size: 20px; font-weight: bold;")
status_label.setAlignment(Qt.AlignmentFlag.AlignRight)
register_date_label = QLabel(f"Registered: {member.register_date}")
register_date_label.setAlignment(Qt.AlignmentFlag.AlignRight)
right_side.addWidget(status_label)
right_side.addWidget(register_date_label)
right_side.addWidget(self.status_label)
right_side.addWidget(self.register_date_label)
self.setLayout(layout)
self.update_display()
def update_display(self):
"""Refreshes the display of the member card based on its current data."""
self.name_label.setText(f"{self.member.first_name} {self.member.last_name}")
self.name_label.setStyleSheet("font-size: 20px; font-weight: bold;")
self.email_label.setText(f"Email: {self.member.email}")
self.phone_label.setText(f"Phone: {self.member.phone or 'Not Available'}")
self.status_label.setText(str(self.member.status.value.capitalize()))
self.status_label.setStyleSheet(f"color: {STATUS_TO_COLOR_MAP[self.member.status]}; font-size: 20px; font-weight: bold;")
self.status_label.setAlignment(Qt.AlignmentFlag.AlignRight)
self.register_date_label.setText(f"Registered: {self.member.register_date}")
self.register_date_label.setAlignment(Qt.AlignmentFlag.AlignRight)
def mousePressEvent(self, event):
if event.button() == Qt.MouseButton.LeftButton:
@ -89,26 +100,24 @@ class MemberCard(QWidget):
action = context_menu.exec_(self.mapToGlobal(event.pos()))
if action == action_edit_member:
print("Edit Member selected") # Implement editor logic here
editor = MemberEditor(self.member, refresh_callback=self.refresh)
editor.exec()
elif action == action_deactivate_member:
self.update_member_status(MemberStatusEnum.inactive)
elif action == action_activate_member:
self.update_member_status(MemberStatusEnum.active)
def delete_member(self):
self.make_sure()
# if not self.make_sure():
# return
# with DatabaseManager.get_session() as session:
# try:
# stmt = delete(Member).where(Member.id == self.member.id)
# session.execute(stmt)
# session.commit()
# self.setVisible(False)
# except Exception as e:
# session.rollback()
# print(e)
if not self.make_sure():
return
try:
delete_member(self.member.id)
self.setVisible(False)
except DatabaseConnectionError as e:
QMessageBox.critical(None, "Failed", "Connection with database failed", QMessageBox.StandardButton.Ok)
except DatabaseError as e:
QMessageBox.critical(None, "Failed", f"An error occurred when deleting member: {e}", QMessageBox.StandardButton.Ok)
def update_member_status(self, new_status):
with DatabaseManager.get_session() as session:
@ -119,17 +128,20 @@ class MemberCard(QWidget):
session.commit()
QMessageBox.information(self, "Status Updated", f"Member status updated to {new_status.value.capitalize()}.")
self.member.status = new_status
self.update_status_label()
self.update_display()
else:
QMessageBox.critical(self, "Error", "The member you requested could not be found.", QMessageBox.StandardButton.Ok)
except Exception as e:
session.rollback()
print(e)
def update_status_label(self):
self.findChild(QLabel, self.member.status.value).setStyleSheet(
f"color: {STATUS_TO_COLOR_MAP[self.member.status]}; font-size: 20px; font-weight: bold;"
)
def refresh(self, updated_data):
"""Updates the card's data and refreshes the display."""
self.member.first_name = updated_data["first_name"]
self.member.last_name = updated_data["last_name"]
self.member.email = updated_data["email"]
self.member.phone = updated_data["phone"]
self.update_display()
def make_sure(self) -> bool:
are_you_sure_box = QMessageBox()
@ -141,3 +153,5 @@ class MemberCard(QWidget):
response = are_you_sure_box.exec()
return response == QMessageBox.Yes
__all__ = ["MemberCard"]

View File

@ -57,27 +57,19 @@ class MemberList(QWidget):
register_member_button.clicked.connect(self.register_member)
button_layout.addWidget(register_member_button)
delete_member_button = QPushButton("Delete Member")
delete_member_button.clicked.connect(self.delete_member)
button_layout.addWidget(delete_member_button)
main_layout.addLayout(button_layout)
def filter_members(self, text):
"""Filter the cards based on the search input."""
for card, member in zip(self.member_cards, self.members):
name_contains_text = text.lower() in member.name.lower()
id_contains_text = text.lower() in str(member.id)
first_name_contains_text = text.lower() in member.first_name.lower()
last_name_contains_text = text.lower() in member.last_name.lower()
card.setVisible(name_contains_text or id_contains_text)
card.setVisible(first_name_contains_text or last_name_contains_text)
def register_member(self):
MemberEditor().exec()
def delete_member(self):
QMessageBox.information(self, "Delete Member",
"Open dialog to delete a member.")
def clear_layout(self, layout):
while layout.count():
item = layout.takeAt(0)
@ -112,3 +104,6 @@ class MemberList(QWidget):
QMessageBox.critical(self, "Database Error",
f"Failed to fetch members: {e}")
return []
__all__ = ["MemberList"]

172
src/ui/menu_bar.py Normal file
View File

@ -0,0 +1,172 @@
from PySide6.QtGui import QAction
from PySide6.QtWidgets import QMessageBox, QFileDialog, QMenuBar, QMenu, QDialog
from PySide6.QtCore import QStandardPaths
from ui.settings import SettingsDialog
from ui.import_preview import PreviewDialog
from ui.editor import BookEditor, MemberEditor
from utils.errors import ExportError, ExportFileError, InvalidContentsError
from services import book_service, book_overview_service, book_category_statistics_service
class MenuBar(QMenuBar):
def __init__(self, parent):
super().__init__(parent)
self.parent = parent
self.file_types = {
"XML files (*.xml)": ".xml",
"Any file type (*)": ""
}
self.create_file_menu()
self.create_edit_menu()
self.create_help_menu()
def create_file_menu(self):
# File menu
file_menu = self.addMenu("File")
# New submenu
new_submenu = QMenu("New", self)
file_menu.addMenu(new_submenu)
# New book action
new_book_action = QAction("New book", self)
new_book_action.triggered.connect(self.new_book)
new_submenu.addAction(new_book_action)
# New member action
new_member_action = QAction("New member", self)
new_member_action.triggered.connect(self.new_member)
new_submenu.addAction(new_member_action)
# Import submenu
import_submenu = QMenu("Import", self)
file_menu.addMenu(import_submenu)
import_books_action = QAction("Import books", self)
import_books_action.triggered.connect(self.import_books)
import_submenu.addAction(import_books_action)
# Export submenu
export_submenu = QMenu("Export", self)
file_menu.addMenu(export_submenu)
export_books_action = QAction("Export books", self)
export_books_action.triggered.connect(self.export_books)
export_submenu.addAction(export_books_action)
export_category_statistics = QAction("Export category statistics", self)
export_category_statistics.triggered.connect(self.export_category_statistics)
export_submenu.addAction(export_category_statistics)
file_menu.addSeparator()
exit_action = QAction("Exit", self)
exit_action.setShortcut("Ctrl+Q")
exit_action.triggered.connect(self.parent.close)
file_menu.addAction(exit_action)
def create_edit_menu(self):
# Edit menu
edit_menu = self.addMenu("Edit")
# Preferences menu
preferences_action = QAction("Preferences", self)
preferences_action.setShortcut("Ctrl+,")
preferences_action.triggered.connect(self.edit_preferences)
edit_menu.addAction(preferences_action)
def create_help_menu(self):
# Help menu
help_menu = self.addMenu("Help")
about_action = QAction("About", self)
about_action.triggered.connect(self.about)
help_menu.addAction(about_action)
def edit_preferences(self):
SettingsDialog(parent=self).exec()
def new_book(self):
BookEditor().exec()
self.parent.refresh_book_cards()
def new_member(self):
MemberEditor().exec()
self.parent.refresh_member_cards()
def import_books(self):
self.import_data("Book", None, book_service)
def export_books(self):
self.export_data("Book", book_service)
def export_category_statistics(self):
self.export_data("Category statistics", book_category_statistics_service)
def about(self):
QMessageBox.information(
self, "About", "Library app demonstrating the phantom read problem")
def import_data(self, import_name: str, preview_dialog, service):
try:
home_dir = QStandardPaths.writableLocation(QStandardPaths.HomeLocation)
file_path, _ = QFileDialog.getOpenFileName(self, "Choose import file", home_dir, ";;".join(self.file_types.keys()))
if not file_path:
return # User canceled
parsed_data = service.parse_from_xml(file_path)
if not parsed_data:
QMessageBox.warning(self, f"No New {import_name}s", f"No new {import_name}s to import.", QMessageBox.Ok)
return
# Show preview dialog
dialog = PreviewDialog(parsed_data, self)
if dialog.exec() == QDialog.Accepted:
# User confirmed, proceed with importing
book_service.create_books(parsed_data)
QMessageBox.information(self, "Success", "Books imported successfully!", QMessageBox.Ok)
self.parent.refresh_book_cards()
else:
QMessageBox.information(self, "Canceled", "Import was canceled.", QMessageBox.Ok)
except InvalidContentsError as e:
QMessageBox.critical(self,
"Invalid file",
"The file you selected is invalid",
QMessageBox.StandardButton.Ok)
except ImportError as e:
QMessageBox.critical(self,
"Error importing books",
f"An error occurred when importing books from the file provided: {e}",
QMessageBox.StandardButton.Ok)
def export_data(self, export_name: str, service):
try:
home_dir = QStandardPaths.writableLocation(QStandardPaths.HomeLocation)
file_path, selected_filter = QFileDialog.getSaveFileName(self,
f"Save {export_name} export",
home_dir,
";;".join(self.file_types.keys()))
if file_path:
selected_filetype = self.file_types[selected_filter]
if file_path.endswith(selected_filetype):
selected_filetype = ""
service.export_to_xml(file_path + selected_filetype)
except ExportFileError as e:
QMessageBox.critical(self,
"Error saving file",
f"Error occurred when saving the exported data: {e}",
QMessageBox.StandardButton.Ok)
except ExportError as e:
QMessageBox.critical(self,
f"Error exporting {export_name}s",
f"An error occurred when exporting {export_name}s: {e}",
QMessageBox.StandardButton.Ok)

View File

@ -24,37 +24,26 @@ class SettingsDialog(QDialog):
self.data_mode_label = QtWidgets.QLabel(UserConfig.get_friendly_name("transaction_level") + ":")
data_mode_layout.addWidget(self.data_mode_label)
self.data_mode_dropdown = QtWidgets.QComboBox()
for tl in TransactionLevel:
self.data_mode_dropdown.addItem(tl.name.capitalize(), tl)
self.data_mode_dropdown.setCurrentIndex(
list(TransactionLevel).index(self.user_config.transaction_level)
)
self.data_mode_selected = QtWidgets.QLabel(self.user_config.transaction_level.name.capitalize())
data_mode_layout.addWidget(self.data_mode_selected)
data_mode_layout.addWidget(self.data_mode_dropdown)
# Slowdown simulation
self.slowdown_layout = QtWidgets.QHBoxLayout()
self.slowdown_label = QtWidgets.QLabel(UserConfig.get_friendly_name("simulate_slowdown") + ":")
data_mode_layout.addWidget(self.slowdown_label)
self.slowdown_layout.addWidget(self.slowdown_label)
self.slowdown_checkbox = QtWidgets.QCheckBox()
self.slowdown_checkbox.setChecked(self.user_config.simulate_slowdown)
data_mode_layout.addWidget(self.slowdown_checkbox)
self.slowdown_layout.addWidget(self.slowdown_checkbox)
layout.addLayout(data_mode_layout)
# Set the currently selected mode to the mode in UserConfig
config = UserConfig()
# Transaction level
current_level = config.transaction_level
index = self.data_mode_dropdown.findData(current_level)
if index != -1:
self.data_mode_dropdown.setCurrentIndex(index)
layout.addLayout(self.slowdown_layout)
# Slowdown simulation
simulate_slowdown = config.simulate_slowdown
simulate_slowdown = self.user_config.simulate_slowdown
self.slowdown_checkbox.setChecked(simulate_slowdown)
# Buttons
@ -71,15 +60,8 @@ class SettingsDialog(QDialog):
layout.addLayout(button_layout)
def save_settings(self):
data_mode = self.data_mode_dropdown.currentData()
simulate_slowdown = self.slowdown_checkbox.isChecked()
try:
self.logger.debug("Saving user configuration")
config = UserConfig()
config.transaction_level = data_mode
config.simulate_slowdown = simulate_slowdown
self.accept()
except TypeError as e:
self.logger.error("Invalid user configuration found")
QMessageBox.critical(None, "Invalid config detected", "Double check your configuration", QMessageBox.StandardButton.Ok, QMessageBox.StandardBUttons.NoButton)
self.logger.debug("Saving user configuration")
config = UserConfig()
config.simulate_slowdown = simulate_slowdown
self.accept()

View File

@ -1,25 +1,11 @@
from PySide6.QtGui import QGuiApplication, QAction, QIcon
from PySide6.QtQml import QQmlApplicationEngine
from PySide6 import QtWidgets, QtCore
from PySide6.QtWidgets import QMessageBox, QFileDialog
from PySide6.QtCore import QStandardPaths
from PySide6.QtGui import QGuiApplication, QIcon
from PySide6.QtWidgets import QMainWindow, QApplication, QTabWidget
from ui.dashboard.dashboard import LibraryDashboard
from ui.main_window_tabs.member_list.member_list import MemberList
from ui.editor import BookEditor, MemberEditor
from ui.settings import SettingsDialog
from ui.import_preview import PreviewDialog
from export.book_exporter import BookExporter
from importer.book.book_importer import BookImporter
from utils.errors.export_error import ExportError
from utils.errors.import_error.import_error import ImportError
from ui.main_tabs import BookOverviewList, MemberList, BookCategoryStatisticsOverview
from ui.menu_bar import MenuBar
class LibraryWindow(QtWidgets.QMainWindow):
class LibraryWindow(QMainWindow):
def __init__(self):
super().__init__()
@ -30,26 +16,22 @@ class LibraryWindow(QtWidgets.QMainWindow):
self.center_window()
# Set up menu bar
self.create_menu_bar()
self.setMenuBar(MenuBar(self))
# Central widget and layout
central_widget = QtWidgets.QTabWidget()
central_widget = QTabWidget()
self.setCentralWidget(central_widget)
self.dashboard = LibraryDashboard()
self.member_list = MemberList()
self.dashboard = BookOverviewList(self)
self.member_list = MemberList()
self.category_statistics_overview_list = BookCategoryStatisticsOverview()
central_widget.addTab(self.dashboard, "Dashboard")
central_widget.addTab(self.member_list, "Members")
self.file_types = {
"XML files (*.xml)": ".xml",
"Any file type (*)": ""}
central_widget.addTab(self.category_statistics_overview_list, "Category stats")
def center_window(self):
# Get the screen geometry
screen = QtWidgets.QApplication.primaryScreen()
screen = QApplication.primaryScreen()
screen_geometry = screen.geometry()
# Get the dimensions of the window
@ -64,164 +46,9 @@ class LibraryWindow(QtWidgets.QMainWindow):
# Move the window to the calculated geometry
self.move(window_geometry.topLeft())
def create_menu_bar(self):
# Create the menu bar
menu_bar = self.menuBar()
def refresh_book_cards(self):
self.dashboard.redraw_cards()
self.category_statistics_overview_list.redraw_cards()
# File menu
file_menu = menu_bar.addMenu("File")
# New submenu
new_submenu = QtWidgets.QMenu(self)
new_submenu.setTitle("New")
file_menu.addMenu(new_submenu)
# New book action
new_book_action = QAction("New book", self)
new_book_action.triggered.connect(self.new_book)
new_submenu.addAction(new_book_action)
# New book action
new_member_action = QAction("New member", self)
new_member_action.triggered.connect(self.new_member)
new_submenu.addAction(new_member_action)
# Import submenu
import_submenu = QtWidgets.QMenu(self)
import_submenu.setTitle("Import")
file_menu.addMenu(import_submenu)
import_books_action = QAction("Import books", self)
import_books_action.triggered.connect(self.import_books)
import_submenu.addAction(import_books_action)
import_members_action = QAction("Import members", self)
import_members_action.triggered.connect(self.import_data)
import_submenu.addAction(import_members_action)
# Export submenu
export_submenu = QtWidgets.QMenu(self)
export_submenu.setTitle("Export")
file_menu.addMenu(export_submenu)
# Export overview
export_overview_action = QAction("Export overview", self)
export_overview_action.triggered.connect(self.export_data)
export_submenu.addAction(export_overview_action)
# Export books
export_books_action = QAction("Export books", self)
export_books_action.triggered.connect(self.export_books)
export_submenu.addAction(export_books_action)
# Export members
export_members_action = QAction("Export members", self)
export_members_action.triggered.connect(self.export_data)
export_submenu.addAction(export_members_action)
file_menu.addSeparator()
exit_action = QAction("Exit", self)
exit_action.setShortcut("Ctrl+Q")
exit_action.triggered.connect(self.close)
file_menu.addAction(exit_action)
# Edit menu
edit_menu = menu_bar.addMenu("Edit")
# Preferences menu
preferences_action = QAction("Preferences", self)
preferences_action.setShortcut("Ctrl+,")
preferences_action.triggered.connect(self.edit_preferences)
edit_menu.addAction(preferences_action)
# Help menu
help_menu = menu_bar.addMenu("Help")
about_action = QAction("About", self)
about_action.triggered.connect(self.about)
help_menu.addAction(about_action)
# Menu action slots
def edit_preferences(self):
SettingsDialog(parent=self).exec()
# region Menu Actions
def export_books(self):
try:
home_dir = QStandardPaths.writableLocation(
QStandardPaths.HomeLocation)
file_path, selected_filter = QFileDialog.getSaveFileName(self,
"Save book export",
home_dir,
";;".join(self.file_types.keys()))
if file_path:
selected_filetype = self.file_types[selected_filter]
if file_path.endswith(selected_filetype):
selected_filetype = ""
book_exporter = BookExporter()
book_exporter.save_xml(file_path + selected_filetype)
except OSError as e:
QMessageBox.critical(self, "Error saving file", f"Error occurred when saving the exported data: {
e}", QMessageBox.StandardButton.Ok, QMessageBox.StandardButton.NoButton)
except ExportError as e:
QMessageBox.critical(self, "Error exporting books", f"An error occurred when exporting books: {
e}", QMessageBox.StandardButton.Ok, QMessageBox.StandardButton.NoButton)
def import_books(self):
try:
home_dir = QStandardPaths.writableLocation(
QStandardPaths.HomeLocation)
file_path, _ = QFileDialog.getOpenFileName(
self, "Choose import file", home_dir, ";;".join(
self.file_types.keys())
)
if not file_path:
return # User canceled
importer = BookImporter()
books = importer.parse_xml(file_path)
if not books:
QMessageBox.information(
self, "No New Books", "No new books to import.", QMessageBox.Ok)
return
# Show preview dialog
dialog = PreviewDialog(books, self)
if dialog.exec() == QtWidgets.QDialog.Accepted:
# User confirmed, proceed with importing
importer.save_books(books)
QMessageBox.information(
self, "Success", "Books imported successfully!", QMessageBox.Ok)
self.dashboard.redraw_cards()
else:
QMessageBox.information(
self, "Canceled", "Import was canceled.", QMessageBox.Ok)
except ImportError as e:
QMessageBox.critical(self, "Error importing books", f"An error occurred when importing books from the file provided: {
e}", QMessageBox.StandardButton.Ok, QMessageBox.StandardButton.NoButton)
def new_book(self):
# BookEditor()
pass
def new_member(self):
MemberEditor().exec()
def import_data(self):
pass
def export_data(self):
pass
def about(self):
QtWidgets.QMessageBox.information(
self, "About", "Library app demonstrating the phantom read problem")
# endregion
def refresh_member_cards(self):
self.member_list.redraw_cards()

View File

@ -60,11 +60,12 @@ class DatabaseConfig():
class TransactionLevel(enum.Enum):
insecure = "READ UNCOMMITTED"
insecure = "READ COMMITTED"
secure = "SERIALIZABLE"
class UserConfig:
_instance = None
_metadata = {
@ -78,21 +79,23 @@ class UserConfig:
return cls._instance
def __init__(self):
self._transaction_level = TransactionLevel.insecure
self._simulate_slowdown = False
if not hasattr(self, "logger"):
self.logger = logging.getLogger(__name__)
if not hasattr(self, "_transaction_level"):
env_level = os.getenv("TRANSACTION_LEVEL", "INSECURE").upper()
if env_level == "SECURE":
self.logger.debug("Running in SECURE mode")
self._transaction_level = TransactionLevel.secure
else:
self.logger.debug("Running in INSECURE mode")
self._transaction_level = TransactionLevel.insecure
if not hasattr(self, "_simulate_slowdown"):
self._simulate_slowdown = False
@property
def transaction_level(self) -> TransactionLevel:
return self._transaction_level
@transaction_level.setter
def transaction_level(self, value: Any):
if not isinstance(value, TransactionLevel):
raise TypeError(
f"Invalid value for 'transaction_level'. Must be a TransactionLevel enum, got {type(value).__name__}."
)
self._transaction_level = value
@property
def simulate_slowdown(self) -> bool:
return self._simulate_slowdown
@ -103,14 +106,9 @@ class UserConfig:
raise TypeError(
f"Invalid value for 'simulate_slowdown'. Must be a boolean, got {type(value).__name__}."
)
self.logger.debug(f"Slowdown simulation set to: {value}")
self._simulate_slowdown = value
@classmethod
def get_friendly_name(cls, option: str) -> str:
return cls._metadata.get(option, {}).get("friendly_name", option)
def __dict__(self) -> dict:
return {
"transaction_level": self.transaction_level,
"simulate_slowdown": self.simulate_slowdown,
}

View File

View File

@ -0,0 +1,9 @@
from .import_error import *
from .export_error import *
from .database import *
__all__ = [
*import_error.__all__,
*export_error.__all__,
*database.__all__
]

View File

@ -20,6 +20,10 @@ class DatabaseConnectionError(DatabaseError):
class DuplicateEntryError(DatabaseError):
def __init__(self, message: str):
def __init__(self, duplicate_entry_name: str, message: str = ""):
super().__init__(message)
self.duplicate_entry_name = duplicate_entry_name
self.message = message
__all__ = ["DatabaseError", "DatabaseConfigError", "DatabaseConnectionError", "DuplicateEntryError"]

View File

@ -3,3 +3,19 @@ class ExportError(Exception):
super().__init__(message)
self.message = message
class NoExportEntityError(ExportError):
def __init__(self, message: str):
super().__init__(message)
self.message = message
class ExportFileError(ExportError):
def __init__(self, message: str):
super().__init__(message)
self.message = message
__all__ = ["ExportError", "NoExportEntityError", "ExportFileError"]

View File

@ -0,0 +1,22 @@
class ImportError(Exception):
def __init__(self, message: str):
super().__init__(message)
self.message = message
class InvalidContentsError(ImportError):
def __init__(self, message: str):
super().__init__(message)
self.message = message
class XsdSchemeNotFoundError(ImportError):
def __init__(self, message: str):
super().__init__(message)
self.message = message
__all__ = ["ImportError", "InvalidContentsError", "XsdSchemeNotFoundError"]

View File

@ -1,5 +0,0 @@
class ImportError(Exception):
def __init__(self, message: str):
super().__init__(message)
self.message = message

View File

@ -1,8 +0,0 @@
from .import_error import ImportError
class InvalidContentsError(ImportError):
def __init__(self, message: str):
super().__init__(message)
self.message = message

View File

@ -1,8 +0,0 @@
from .import_error import ImportError
class XsdSchemeNotFoundError(ImportError):
def __init__(self, message: str):
super().__init__(message)
self.message = message

View File

@ -1,6 +0,0 @@
from .export_error import ExportError
class NoExportEntityError(ExportError):
def __init__(self, message: str):
super().__init__(message)
self.message = message

View File

@ -1,12 +1,24 @@
import sys
import os
import logging
def setup_logger():
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
verbosity = os.getenv("VERBOSITY", "DEBUG").upper()
level_map = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}
log_level = level_map.get(verbosity, logging.DEBUG) # Default to DEBUG if invalid
logger.setLevel(log_level)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
handler.setLevel(log_level)
formatter = logging.Formatter("[%(levelname)s] - %(name)s:%(lineno)d - %(message)s")
handler.setFormatter(formatter)