Compare commits
10 Commits
516639ef9d
...
104d42201d
Author | SHA1 | Date | |
---|---|---|---|
104d42201d | |||
e452822ffc | |||
a129d88575 | |||
9c827a04f5 | |||
85382ee616 | |||
d7ee79f7e9 | |||
153da38f89 | |||
6a42e522f9 | |||
ec55394d79 | |||
79ca9f2e6f |
@ -2,4 +2,7 @@ DATABASE_HOST=
|
||||
DATABASE_PORT=
|
||||
DATABASE_NAME=
|
||||
DATABASE_USER=
|
||||
DATABASE_PASSWORD=
|
||||
DATABASE_PASSWORD=
|
||||
|
||||
TRANSACTION_LEVEL=
|
||||
VERBOSITY=
|
BIN
db/Model.mwb
BIN
db/Model.mwb
Binary file not shown.
5
src/assets/__init__.py
Normal file
5
src/assets/__init__.py
Normal file
@ -0,0 +1,5 @@
|
||||
from .asset_manager import *
|
||||
|
||||
__all__ = [
|
||||
*asset_manager.__all__
|
||||
]
|
13
src/assets/asset_manager.py
Normal file
13
src/assets/asset_manager.py
Normal file
@ -0,0 +1,13 @@
|
||||
import os
|
||||
|
||||
ASSETS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "assets")
|
||||
|
||||
|
||||
def get_asset(name: str) -> str:
|
||||
absolute_path = os.path.join(ASSETS_DIR, name)
|
||||
if not os.path.exists(absolute_path):
|
||||
raise FileNotFoundError(f"Asset not found: {absolute_path}")
|
||||
return os.path.abspath(absolute_path)
|
||||
|
||||
|
||||
__all__ = ["get_asset"]
|
@ -6,9 +6,8 @@
|
||||
<xs:element name="book" maxOccurs="unbounded">
|
||||
<xs:complexType>
|
||||
<xs:sequence>
|
||||
<xs:element name="title"> <!-- Book title-->
|
||||
<xs:element name="title"> <!-- Book title -->
|
||||
<xs:simpleType>
|
||||
<!-- Allow for a string 2-100 characters long -->
|
||||
<xs:restriction base="xs:string">
|
||||
<xs:minLength value="2" />
|
||||
<xs:maxLength value="100" />
|
||||
@ -21,14 +20,14 @@
|
||||
<xs:element name="first_name">
|
||||
<xs:simpleType>
|
||||
<xs:restriction base="xs:string">
|
||||
<xs:maxLength value="50"/>
|
||||
<xs:maxLength value="50" />
|
||||
</xs:restriction>
|
||||
</xs:simpleType>
|
||||
</xs:element>
|
||||
<xs:element name="last_name">
|
||||
<xs:simpleType>
|
||||
<xs:restriction base="xs:string">
|
||||
<xs:maxLength value="50"/>
|
||||
<xs:maxLength value="50" />
|
||||
</xs:restriction>
|
||||
</xs:simpleType>
|
||||
</xs:element>
|
||||
@ -50,6 +49,16 @@
|
||||
</xs:restriction>
|
||||
</xs:simpleType>
|
||||
</xs:element>
|
||||
<xs:element name="price"> <!-- Price -->
|
||||
<xs:simpleType>
|
||||
<xs:restriction base="xs:decimal">
|
||||
<xs:totalDigits value="7" /> <!-- Max 99999.99 -->
|
||||
<xs:fractionDigits value="2" />
|
||||
<xs:minInclusive value="0.00" />
|
||||
</xs:restriction>
|
||||
</xs:simpleType>
|
||||
</xs:element>
|
||||
<xs:element name="is_damaged" type="xs:boolean" /> <!-- Is damaged -->
|
||||
<xs:element name="categories"> <!-- Categories list -->
|
||||
<xs:complexType>
|
||||
<xs:sequence>
|
||||
@ -64,5 +73,4 @@
|
||||
</xs:sequence>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
|
||||
</xs:schema>
|
||||
</xs:schema>
|
@ -1,9 +1,17 @@
|
||||
from .manager import *
|
||||
from .book import *
|
||||
from .book_category import *
|
||||
from .book_category_statistics import *
|
||||
from .book_category_statistics_overview import *
|
||||
from .member import *
|
||||
from .book_overview import *
|
||||
|
||||
__all__ = [
|
||||
*manager.__all__,
|
||||
*book.__all__,
|
||||
*book_category.__all__,
|
||||
*book_category_statistics.__all__,
|
||||
*book_category_statistics_overview.__all__,
|
||||
*book_overview.__all__,
|
||||
*member.__all__,
|
||||
]
|
32
src/database/author.py
Normal file
32
src/database/author.py
Normal file
@ -0,0 +1,32 @@
|
||||
import logging
|
||||
from typing import Dict
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from models import Author
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def get_or_create_author(session: Session, author_data: Dict[str, str]) -> Author:
|
||||
"""
|
||||
Checks if an author exists in the database, creates one if not.
|
||||
|
||||
:param session: SQLAlchemy session object.
|
||||
:param author_data: Dictionary containing author's first and last name.
|
||||
:return: An Author instance (either existing or newly created).
|
||||
"""
|
||||
existing_author = session.query(Author).filter_by(
|
||||
first_name=author_data["first_name"],
|
||||
last_name=author_data["last_name"]
|
||||
).one_or_none()
|
||||
|
||||
if existing_author is not None:
|
||||
logger.debug(f"Author {author_data['first_name']} {author_data['last_name']} already exists. Reusing.")
|
||||
return existing_author
|
||||
|
||||
logger.debug(f"Creating new author: {author_data['first_name']} {author_data['last_name']}")
|
||||
author = Author(first_name=author_data["first_name"], last_name=author_data["last_name"])
|
||||
session.add(author)
|
||||
return author
|
||||
|
||||
__all__ = ["get_or_create_author"]
|
@ -1,66 +1,132 @@
|
||||
from typing import Dict, List
|
||||
import logging
|
||||
|
||||
from sqlalchemy.exc import IntegrityError, SQLAlchemyError, DatabaseError as SqlAlchemyDatabaseError
|
||||
|
||||
from sqlalchemy.orm import joinedload
|
||||
from sqlalchemy import delete
|
||||
from utils.errors.database import DatabaseError, DuplicateEntryError, DatabaseConnectionError
|
||||
from models import Book
|
||||
from database.manager import DatabaseManager
|
||||
from .author import get_or_create_author
|
||||
from .book_category import get_or_create_categories
|
||||
from .book_category_statistics import update_category_statistics
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def fetch_all():
|
||||
def fetch_all_books() -> List[Book]:
|
||||
with DatabaseManager.get_session() as session:
|
||||
try:
|
||||
return session.query(Book).all()
|
||||
return session.query(Book) \
|
||||
.options(
|
||||
joinedload(Book.author),
|
||||
joinedload(Book.categories)
|
||||
) \
|
||||
.all()
|
||||
except SqlAlchemyDatabaseError as e:
|
||||
logger.critical("Connection with database interrupted")
|
||||
raise DatabaseConnectionError(
|
||||
"Connection with database interrupted") from e
|
||||
raise DatabaseConnectionError("Connection with database interrupted") from e
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"An error occured when fetching all books: {e}")
|
||||
raise DatabaseError(
|
||||
"An error occured when fetching all books") from e
|
||||
logger.error(f"An error occurred when fetching all books: {e}")
|
||||
raise DatabaseError("An error occurred when fetching all books") from e
|
||||
|
||||
def create_book(book: Dict[str, object], skip_existing: bool = True) -> None:
|
||||
create_books([book], skip_existing)
|
||||
|
||||
def create_new_book(book: Book):
|
||||
pass
|
||||
|
||||
|
||||
def update_book(book: Book):
|
||||
session = DatabaseManager.get_session()
|
||||
def create_books(books: List[Dict[str, object]], skip_existing: bool = True) -> None:
|
||||
try:
|
||||
with session:
|
||||
logger.debug(f"Updating book {book.title}")
|
||||
existing_book = session.query(Book).get(book.id)
|
||||
with DatabaseManager.get_session() as session:
|
||||
for book in books:
|
||||
logger.debug(f"Attempting to create a new book: {book['title']}")
|
||||
|
||||
if not existing_book:
|
||||
logger.warning(f"Book with id {book.id} not found")
|
||||
raise DatabaseError("Book not found in the database")
|
||||
existing_book = session.query(Book).filter_by(isbn=book["isbn"]).first()
|
||||
|
||||
existing_book.title = book.title
|
||||
existing_book.description = book.description
|
||||
existing_book.year_published = book.year_published
|
||||
existing_book.isbn = book.isbn
|
||||
if existing_book:
|
||||
if skip_existing:
|
||||
logger.warning(f"Book with ISBN {book['isbn']} already exists. Skipping.")
|
||||
continue
|
||||
else:
|
||||
logger.error(f"Book with ISBN {book['isbn']} already exists.")
|
||||
raise DuplicateEntryError(f"Book with ISBN {book['isbn']} already exists.")
|
||||
|
||||
author = get_or_create_author(session, book["author"])
|
||||
categories = get_or_create_categories(session, book["categories"])
|
||||
|
||||
new_book = Book(
|
||||
title=book["title"],
|
||||
description=book["description"],
|
||||
year_published=book["year_published"],
|
||||
isbn=book["isbn"],
|
||||
price=book["price"],
|
||||
is_damaged=book["is_damaged"],
|
||||
author=author,
|
||||
categories=categories
|
||||
)
|
||||
session.add(new_book)
|
||||
|
||||
update_category_statistics(session)
|
||||
session.commit()
|
||||
logger.info("Book successfully updated")
|
||||
except IntegrityError as e:
|
||||
logger.warning("Data already exists")
|
||||
session.rollback()
|
||||
raise DuplicateEntryError(
|
||||
"Data already exists in the database") from e
|
||||
raise DuplicateEntryError("Data already exists in the database") from e
|
||||
except SqlAlchemyDatabaseError as e:
|
||||
session.rollback()
|
||||
logger.critical("Connection with database interrupted")
|
||||
raise DatabaseConnectionError(
|
||||
"Connection with database interrupted") from e
|
||||
raise DatabaseConnectionError("Connection with database interrupted") from e
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"An error occured when saving book: {e}")
|
||||
session.rollback()
|
||||
raise DatabaseError(
|
||||
"An error occured when updating the book") from e
|
||||
logger.error(f"An error occurred when creating the book: {e}")
|
||||
raise DatabaseError("An error occurred when creating the book") from e
|
||||
|
||||
def update_book(book: Dict[str, object]) -> None:
|
||||
try:
|
||||
with DatabaseManager.get_session() as session:
|
||||
logger.debug(f"Updating book {book['title']}")
|
||||
|
||||
__all__ = ["create_new_book", "update_book"]
|
||||
existing_book = session.query(Book).filter_by(isbn=book["isbn"]).first()
|
||||
|
||||
if not existing_book:
|
||||
logger.warning(f"Book with ISBN {book['isbn']} not found")
|
||||
raise DatabaseError("Book not found in the database")
|
||||
|
||||
author = get_or_create_author(session, book["author"])
|
||||
categories = get_or_create_categories(session, book["categories"])
|
||||
session.commit()
|
||||
|
||||
existing_book.title = book["title"]
|
||||
existing_book.description = book["description"]
|
||||
existing_book.year_published = book["year_published"]
|
||||
existing_book.isbn = book["isbn"]
|
||||
existing_book.price = book["price"]
|
||||
existing_book.is_damaged = book["is_damaged"]
|
||||
existing_book.status = book["status"]
|
||||
existing_book.author = author
|
||||
existing_book.categories = categories
|
||||
|
||||
update_category_statistics(session, ignore_config=True)
|
||||
session.commit()
|
||||
|
||||
logger.info(f"{book['title']} successfully updated.")
|
||||
except IntegrityError as e:
|
||||
logger.warning("Data already exists")
|
||||
raise DuplicateEntryError("Data already exists in the database") from e
|
||||
except SqlAlchemyDatabaseError as e:
|
||||
logger.critical("Connection with database interrupted")
|
||||
raise DatabaseConnectionError("Connection with database interrupted") from e
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"An error occurred when updating the book: {e}")
|
||||
raise DatabaseError("An error occurred when updating the book") from e
|
||||
|
||||
def delete_book(book_id: int) -> None:
|
||||
try:
|
||||
with DatabaseManager.get_session() as session:
|
||||
logger.debug(f"Deleting book id {book_id}")
|
||||
stmt = delete(Book).where(Book.id == book_id)
|
||||
session.execute(stmt)
|
||||
update_category_statistics(session, ignore_config=True)
|
||||
session.commit()
|
||||
logger.info(f"Successfully deleted book with id {book_id}")
|
||||
except SqlAlchemyDatabaseError as e:
|
||||
logger.critical("Connection with database interrupted")
|
||||
raise DatabaseConnectionError("Connection with database interrupted") from e
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"An error occurred when updating the book: {e}")
|
||||
raise DatabaseError("An error occurred when updating the book") from e
|
||||
|
||||
__all__ = ["create_book", "create_books", "update_book", "fetch_all_books", "delete_book"]
|
||||
|
48
src/database/book_category.py
Normal file
48
src/database/book_category.py
Normal file
@ -0,0 +1,48 @@
|
||||
from typing import Dict, List, Optional
|
||||
import logging
|
||||
|
||||
from models import BookCategory
|
||||
from database.manager import DatabaseManager
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import func
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_or_create_categories(session, category_names: List[str]) -> List[BookCategory]:
|
||||
"""
|
||||
Checks if categories exist in the database, creates ones that don't.
|
||||
|
||||
:param session: SQLAlchemy session object.
|
||||
:param category_names: List of category names.
|
||||
:return: List of BookCategory instances (existing or newly created).
|
||||
"""
|
||||
processed_categories = {} # Cache for already processed categories
|
||||
filtered_categories = []
|
||||
|
||||
for category_name in category_names:
|
||||
if category_name in processed_categories:
|
||||
filtered_categories.append(processed_categories[category_name])
|
||||
continue
|
||||
|
||||
existing_category = session.query(BookCategory).filter_by(name=category_name).one_or_none()
|
||||
|
||||
if existing_category is not None:
|
||||
logger.debug(f"Category {category_name} already exists. Reusing.")
|
||||
processed_categories[category_name] = existing_category
|
||||
filtered_categories.append(existing_category)
|
||||
else:
|
||||
logger.debug(f"Adding new category: {category_name}")
|
||||
new_category = BookCategory(name=category_name)
|
||||
session.add(new_category)
|
||||
processed_categories[category_name] = new_category
|
||||
filtered_categories.append(new_category)
|
||||
|
||||
return filtered_categories
|
||||
|
||||
|
||||
def get_total_count(session: Session) -> int:
|
||||
return session.query(func.count(BookCategory.id)).scalar()
|
||||
|
||||
__all__ = ["get_total_count", "get_or_create_categories"]
|
56
src/database/book_category_statistics.py
Normal file
56
src/database/book_category_statistics.py
Normal file
@ -0,0 +1,56 @@
|
||||
from sqlalchemy import func, insert
|
||||
from sqlalchemy.orm import Session
|
||||
from models import BookCategory, BookCategoryStatistics, BookCategoryLink
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def update_category_statistics(session: Session, ignore_config: bool = False) -> None:
|
||||
"""
|
||||
Updates category statistics by calculating the count of books in each category.
|
||||
|
||||
:param session: SQLAlchemy session object.
|
||||
"""
|
||||
# Fetch book counts per category using a join between book_category and book_category_link
|
||||
category_counts = (
|
||||
session.query(
|
||||
BookCategory.id.label('book_category_id'),
|
||||
func.count(BookCategoryLink.book_id).label('book_count')
|
||||
)
|
||||
.join(BookCategoryLink, BookCategoryLink.book_category_id == BookCategory.id, isouter=True)
|
||||
.group_by(BookCategory.id)
|
||||
.all()
|
||||
)
|
||||
|
||||
# Iterate over the results and update or insert the category statistics
|
||||
for category_id, book_count in category_counts:
|
||||
# Try to get the existing statistics or create a new one if it doesn't exist
|
||||
existing_statistics = (
|
||||
session.query(BookCategoryStatistics)
|
||||
.filter(BookCategoryStatistics.book_category_id == category_id)
|
||||
.one_or_none()
|
||||
)
|
||||
|
||||
if existing_statistics:
|
||||
# If statistics exist, update the count
|
||||
existing_statistics.book_count = book_count
|
||||
logger.debug(f"Updated category {category_id} with count {book_count}")
|
||||
else:
|
||||
# If statistics don't exist, create a new one
|
||||
new_statistics = BookCategoryStatistics(
|
||||
book_category_id=category_id,
|
||||
book_count=book_count
|
||||
)
|
||||
session.add(new_statistics)
|
||||
logger.debug(f"Inserted new statistics for category {category_id} with count {book_count}")
|
||||
|
||||
try:
|
||||
# Commit the transaction
|
||||
session.commit()
|
||||
logger.debug("Category statistics updated successfully")
|
||||
except Exception as e:
|
||||
# In case of error, rollback the transaction
|
||||
logger.error(f"An error occurred while updating category statistics: {e}")
|
||||
session.rollback()
|
||||
|
||||
__all__ = ["update_category_statistics"]
|
55
src/database/book_category_statistics_overview.py
Normal file
55
src/database/book_category_statistics_overview.py
Normal file
@ -0,0 +1,55 @@
|
||||
from typing import List, Tuple
|
||||
import time
|
||||
|
||||
import logging
|
||||
from sqlalchemy.exc import IntegrityError, SQLAlchemyError, DatabaseError as SqlAlchemyDatabaseError
|
||||
|
||||
from utils.errors.database import DatabaseError, DuplicateEntryError, DatabaseConnectionError
|
||||
from models import BookCategoryStatisticsOverview
|
||||
from database.manager import DatabaseManager
|
||||
from database import get_total_count
|
||||
|
||||
from utils.config import UserConfig
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def fetch_all_book_category_statistics_overviews() -> List[BookCategoryStatisticsOverview]:
|
||||
with DatabaseManager.get_session() as session:
|
||||
try:
|
||||
return session.query(BookCategoryStatisticsOverview).all()
|
||||
except SqlAlchemyDatabaseError as e:
|
||||
logger.critical("Connection with database interrupted")
|
||||
raise DatabaseConnectionError("Connection with database interrupted") from e
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"An error occurred when fetching all category overviews: {e}")
|
||||
raise DatabaseError("An error occurred when fetching all category overviews") from e
|
||||
|
||||
|
||||
def fetch_all_book_category_statistics_overviews_with_count() -> Tuple[List[BookCategoryStatisticsOverview], int]:
|
||||
with DatabaseManager.get_session() as session:
|
||||
try:
|
||||
category_list = session.query(BookCategoryStatisticsOverview).all()
|
||||
|
||||
user_config = UserConfig()
|
||||
|
||||
if user_config.simulate_slowdown:
|
||||
logger.debug("Simulating slowdown after fetching statistics for 10 seconds")
|
||||
time.sleep(10)
|
||||
else:
|
||||
logger.debug("Performing category statistics update normally")
|
||||
|
||||
count = get_total_count(session)
|
||||
|
||||
return (category_list, count)
|
||||
|
||||
except SqlAlchemyDatabaseError as e:
|
||||
logger.critical("Connection with database interrupted")
|
||||
raise DatabaseConnectionError("Connection with database interrupted") from e
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"An error occurred when fetching all category overviews: {e}")
|
||||
raise DatabaseError("An error occurred when fetching all category overviews") from e
|
||||
|
||||
|
||||
__all__ = ["fetch_all_book_category_statistics_overviews", "fetch_all_book_category_statistics_overviews_with_count"]
|
25
src/database/book_overview.py
Normal file
25
src/database/book_overview.py
Normal file
@ -0,0 +1,25 @@
|
||||
from typing import Dict, List
|
||||
import logging
|
||||
|
||||
from sqlalchemy.exc import IntegrityError, SQLAlchemyError, DatabaseError as SqlAlchemyDatabaseError
|
||||
|
||||
from utils.errors.database import DatabaseError, DuplicateEntryError, DatabaseConnectionError
|
||||
from models import BooksOverview
|
||||
from database.manager import DatabaseManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def fetch_all_book_overviews() -> List[BooksOverview]:
|
||||
with DatabaseManager.get_session() as session:
|
||||
try:
|
||||
return session.query(BooksOverview).all()
|
||||
except SqlAlchemyDatabaseError as e:
|
||||
logger.critical("Connection with database interrupted")
|
||||
raise DatabaseConnectionError("Connection with database interrupted") from e
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"An error occurred when fetching all books: {e}")
|
||||
raise DatabaseError("An error occurred when fetching all books") from e
|
||||
|
||||
|
||||
__all__ = ["fetch_all_book_overviews"]
|
@ -5,7 +5,7 @@ from sqlalchemy import create_engine, text
|
||||
|
||||
from sqlalchemy.exc import DatabaseError
|
||||
|
||||
from utils.config import DatabaseConfig
|
||||
from utils.config import DatabaseConfig, UserConfig
|
||||
from utils.errors.database import DatabaseConnectionError
|
||||
|
||||
|
||||
@ -22,6 +22,7 @@ class DatabaseManager():
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.logger.info("Reading database config")
|
||||
database_config = DatabaseConfig()
|
||||
user_config = UserConfig()
|
||||
self.engine = create_engine('mysql+mysqlconnector://%s:%s@%s:%s/%s' % (
|
||||
database_config.user,
|
||||
database_config.password,
|
||||
@ -29,27 +30,34 @@ class DatabaseManager():
|
||||
database_config.port,
|
||||
database_config.name),
|
||||
pool_pre_ping=True,
|
||||
echo=True)
|
||||
if self.test_connection():
|
||||
self.Session = sessionmaker(bind=self.engine)
|
||||
isolation_level=user_config.transaction_level.value)
|
||||
self.test_connection()
|
||||
self.Session = sessionmaker(bind=self.engine)
|
||||
|
||||
def cleanup(self) -> None:
|
||||
self.logger.debug("Closing connection")
|
||||
self.engine.dispose()
|
||||
|
||||
def test_connection(self) -> bool:
|
||||
def test_connection(self):
|
||||
self.logger.debug("Testing database connection")
|
||||
try:
|
||||
with self.engine.connect() as connection:
|
||||
connection.execute(text("select 1"))
|
||||
self.logger.debug("Database connection successful")
|
||||
return True
|
||||
except DatabaseError as e:
|
||||
self.logger.critical(f"Database connection failed: {e}")
|
||||
raise DatabaseConnectionError("Database connection failed") from e
|
||||
|
||||
@classmethod
|
||||
def get_session(cls) -> Session:
|
||||
return DatabaseManager._instance.Session()
|
||||
user_config = UserConfig()
|
||||
|
||||
# Get the transaction level as a string (e.g., "READ COMMITTED", "SERIALIZABLE")
|
||||
isolation_level = user_config.transaction_level.value
|
||||
|
||||
# Create a session with the appropriate transaction isolation level
|
||||
session = cls._instance.Session()
|
||||
session.connection(execution_options={"isolation_level": isolation_level})
|
||||
return session
|
||||
|
||||
__all__ = ["DatabaseManager"]
|
@ -1,6 +1,8 @@
|
||||
import logging
|
||||
from typing import List, Dict
|
||||
|
||||
from sqlalchemy.exc import IntegrityError, SQLAlchemyError, DatabaseError
|
||||
from sqlalchemy.exc import IntegrityError, SQLAlchemyError, DatabaseError as SqlAlchemyDatabaseError
|
||||
from sqlalchemy import delete
|
||||
|
||||
from utils.errors.database import DatabaseError, DuplicateEntryError, DatabaseConnectionError
|
||||
from models import Member
|
||||
@ -8,26 +10,113 @@ from database.manager import DatabaseManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def fetch_all_members() -> List[Member]:
|
||||
"""
|
||||
Fetches all members from the database.
|
||||
|
||||
def create_new_member(new_member: Member):
|
||||
:return: A list of all members in the database.
|
||||
:raises DatabaseConnectionError: If the connection to the database is interrupted.
|
||||
:raises DatabaseError: If any other error occurs while fetching members.
|
||||
"""
|
||||
with DatabaseManager.get_session() as session:
|
||||
try:
|
||||
return session.query(Member).all()
|
||||
except SqlAlchemyDatabaseError as e:
|
||||
logger.critical("Connection with database interrupted")
|
||||
raise DatabaseConnectionError("Connection with database interrupted") from e
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"An error occurred when fetching all members: {e}")
|
||||
raise DatabaseError("An error occurred when fetching all members") from e
|
||||
|
||||
def create_member(new_member: Dict[str, str]):
|
||||
create_members([new_member])
|
||||
|
||||
def create_members(members: List[Dict[str, str]]):
|
||||
try:
|
||||
with DatabaseManager.get_session() as session:
|
||||
session.add(new_member)
|
||||
for member_dict in members:
|
||||
member = Member(
|
||||
first_name=member_dict["first_name"],
|
||||
last_name=member_dict["last_name"],
|
||||
email=member_dict["email"],
|
||||
phone=member_dict["phone_number"]
|
||||
)
|
||||
session.add(member)
|
||||
|
||||
session.commit()
|
||||
except IntegrityError as e:
|
||||
logger.warning("Data already exists")
|
||||
session.rollback()
|
||||
raise DuplicateEntryError("Data already exists in the database") from e
|
||||
|
||||
if "email" in str(e.orig):
|
||||
logger.warning("Email is already in use")
|
||||
raise DuplicateEntryError("Email", "Email is already in use") from e
|
||||
elif "phone" in str(e.orig):
|
||||
logger.warning("Phone number is already in use")
|
||||
raise DuplicateEntryError("Phone number", "Phone number is already in use") from e
|
||||
else:
|
||||
logger.error("Member exists already in the database")
|
||||
raise DatabaseError("Member exists already") from e
|
||||
|
||||
except DatabaseError as e:
|
||||
session.rollback()
|
||||
logger.critical("Connection with database interrupted")
|
||||
raise DatabaseConnectionError(
|
||||
"Connection with database interrupted") from e
|
||||
raise DatabaseConnectionError("Connection with database interrupted") from e
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"An error occured when saving member: {e}")
|
||||
session.rollback()
|
||||
raise DatabaseError(
|
||||
"An error occured when creating a new member") from e
|
||||
logger.error(f"An error occurred when saving member: {e}")
|
||||
raise DatabaseError("An error occurred when creating a new member") from e
|
||||
|
||||
def update_member(member: Dict[str, str]):
|
||||
try:
|
||||
with DatabaseManager.get_session() as session:
|
||||
logger.debug(f"Editing member {member['first_name']} {member['last_name']}")
|
||||
|
||||
existing_member = session.query(Member).get(member["id"])
|
||||
|
||||
if not existing_member:
|
||||
logger.warning(f"Member with ID {member['id']} not found")
|
||||
raise DatabaseError("Member not found in database")
|
||||
|
||||
existing_member.first_name = member["first_name"]
|
||||
existing_member.last_name = member["last_name"]
|
||||
existing_member.email = member["email"]
|
||||
existing_member.phone = member["phone"]
|
||||
|
||||
session.commit()
|
||||
except IntegrityError as e:
|
||||
session.rollback()
|
||||
|
||||
if "email" in str(e.orig):
|
||||
logger.warning("Email is already in use")
|
||||
raise DuplicateEntryError("Email", "Email is already in use") from e
|
||||
elif "phone" in str(e.orig):
|
||||
logger.warning("Phone number is already in use")
|
||||
raise DuplicateEntryError("Phone number", "Phone number is already in use") from e
|
||||
else:
|
||||
logger.error("An error occurred when updating member")
|
||||
raise DatabaseError("An error occurred when updating member") from e
|
||||
|
||||
except DatabaseError as e:
|
||||
session.rollback()
|
||||
logger.critical("Connection with database interrupted")
|
||||
raise DatabaseConnectionError("Connection with database interrupted") from e
|
||||
except SQLAlchemyError as e:
|
||||
session.rollback()
|
||||
logger.error(f"An error occurred when saving member: {e}")
|
||||
raise DatabaseError("An error occurred when creating a new member") from e
|
||||
|
||||
|
||||
__all__ = ["create_new_member"]
|
||||
def delete_member(member_id: int) -> None:
|
||||
try:
|
||||
with DatabaseManager.get_session() as session:
|
||||
stmt = delete(Member).where(Member.id == member_id)
|
||||
session.execute(stmt)
|
||||
session.commit()
|
||||
except SqlAlchemyDatabaseError as e:
|
||||
logger.critical("Connection with database interrupted")
|
||||
raise DatabaseConnectionError("Connection with database interrupted") from e
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"An error occurred when deleting member: {e}")
|
||||
raise DatabaseError("An error occurred when deleting member") from e
|
||||
|
||||
__all__ = ["create_member", "create_members", "fetch_all_members", "delete_member"]
|
||||
|
@ -1,76 +0,0 @@
|
||||
from typing import Optional
|
||||
import xml.etree.ElementTree as ET
|
||||
from xml.dom import minidom
|
||||
|
||||
from database.manager import DatabaseManager
|
||||
from utils.errors.export_error import ExportError
|
||||
from utils.errors.no_export_entity_error import NoExportEntityError
|
||||
|
||||
from models import Book
|
||||
|
||||
|
||||
class BookExporter():
|
||||
def save_xml(self, file_path: str):
|
||||
|
||||
xml = self._get_full_xml()
|
||||
|
||||
with open(file_path, "w", encoding="utf-8") as file:
|
||||
file.write(xml)
|
||||
|
||||
def _get_full_xml(self) -> str:
|
||||
root = ET.Element("books")
|
||||
|
||||
with DatabaseManager.get_session() as session:
|
||||
self.books = session.query(Book).all()
|
||||
|
||||
if not self.books:
|
||||
raise NoExportEntityError("No books found to export")
|
||||
|
||||
for book in self.books:
|
||||
# Create a <book> element
|
||||
book_element = ET.SubElement(root, "book")
|
||||
|
||||
# Add <title>
|
||||
title_element = ET.SubElement(book_element, "title")
|
||||
title_element.text = book.title
|
||||
|
||||
# Add <author>
|
||||
author_element = ET.SubElement(book_element, "author")
|
||||
|
||||
# Add <first_name>
|
||||
author_first_name_element = ET.SubElement(
|
||||
author_element, "first_name")
|
||||
author_first_name_element.text = book.author.first_name
|
||||
|
||||
author_last_name_element = ET.SubElement(
|
||||
author_element, "last_name")
|
||||
author_last_name_element.text = book.author.last_name
|
||||
|
||||
# Add <description>
|
||||
description_element = ET.SubElement(
|
||||
book_element, "description")
|
||||
description_element.text = book.description
|
||||
|
||||
# Add <year_published>
|
||||
year_published_element = ET.SubElement(
|
||||
book_element, "year_published")
|
||||
year_published_element.text = book.year_published
|
||||
|
||||
# Add <isbn>
|
||||
isbn_element = ET.SubElement(book_element, "isbn")
|
||||
isbn_element.text = book.isbn
|
||||
|
||||
# Add <categories>
|
||||
categories_element = ET.SubElement(book_element, "categories")
|
||||
for category in book.categories:
|
||||
category_element = ET.SubElement(
|
||||
categories_element, "category")
|
||||
category_element.text = category.name
|
||||
|
||||
# Convert the tree to a string
|
||||
tree_str = ET.tostring(root, encoding="unicode")
|
||||
|
||||
# Pretty print the XML
|
||||
pretty_xml = minidom.parseString(
|
||||
tree_str).toprettyxml(indent=(" " * 4))
|
||||
return pretty_xml
|
@ -1,133 +0,0 @@
|
||||
from typing import List, Dict
|
||||
import os
|
||||
import logging
|
||||
from xml.etree import ElementTree as ET
|
||||
from xmlschema import XMLSchema
|
||||
from database.manager import DatabaseManager
|
||||
from utils.errors.import_error.xsd_scheme_not_found import XsdSchemeNotFoundError
|
||||
from utils.errors.import_error.invalid_contents_error import InvalidContentsError
|
||||
from utils.errors.import_error.import_error import ImportError
|
||||
from models import Book, Author, BookCategory
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
class BookImporter:
|
||||
def __init__(self):
|
||||
# Initialize the logger and schema
|
||||
self.logger = logging.getLogger(__name__)
|
||||
try:
|
||||
self.logger.debug("Opening XSD scheme in ./")
|
||||
scheme_path = os.path.join(os.path.dirname(__file__), "book_import_scheme.xsd")
|
||||
self.schema = XMLSchema(scheme_path)
|
||||
except Exception as e:
|
||||
self.logger.error("Failed to load XSD scheme")
|
||||
raise XsdSchemeNotFoundError(f"Failed to load XSD schema: {e}")
|
||||
|
||||
def parse_xml(self, file_path: str) -> List[Dict[str, object]]:
|
||||
"""Parses the XML file and validates it against the XSD schema."""
|
||||
try:
|
||||
tree = ET.parse(file_path)
|
||||
root = tree.getroot()
|
||||
|
||||
if not self.schema.is_valid(file_path):
|
||||
raise InvalidContentsError("XML file is not valid according to XSD schema.")
|
||||
|
||||
books = []
|
||||
for book_element in root.findall("book"):
|
||||
title = book_element.find("title").text
|
||||
year_published = book_element.find("year_published").text
|
||||
description = book_element.find("description").text
|
||||
isbn = book_element.find("isbn").text
|
||||
|
||||
# Parse author
|
||||
author_element = book_element.find("author")
|
||||
author = {
|
||||
"first_name": author_element.find("first_name").text,
|
||||
"last_name": author_element.find("last_name").text
|
||||
}
|
||||
|
||||
# Parse categories
|
||||
category_elements = book_element.find("categories").findall("category")
|
||||
categories = [category_element.text for category_element in category_elements]
|
||||
|
||||
# Create a book dictionary with explicit types
|
||||
book = {
|
||||
"title": title,
|
||||
"description": description,
|
||||
"year_published": year_published,
|
||||
"isbn": isbn,
|
||||
"author": author,
|
||||
"categories": categories
|
||||
}
|
||||
books.append(book)
|
||||
|
||||
return books
|
||||
except ET.ParseError as e:
|
||||
raise ImportError(f"Failed to parse XML file: {e}")
|
||||
|
||||
def save_books(self, books: List[Dict[str, object]]):
|
||||
"""Saves a list of books to the database."""
|
||||
try:
|
||||
with DatabaseManager.get_session() as session:
|
||||
processed_categories = {} # Cache for processed categories by name
|
||||
|
||||
for book_dict in books:
|
||||
self.logger.debug(f"Attempting to save {book_dict['title']}")
|
||||
|
||||
# Check if the book already exists
|
||||
existing_book = session.query(Book).filter_by(isbn=book_dict["isbn"]).first()
|
||||
if existing_book:
|
||||
self.logger.warning(f"ISBN {book_dict['isbn']} already exists. Skipping.")
|
||||
continue
|
||||
|
||||
# Check or add the author
|
||||
existing_author = session.query(Author).filter_by(
|
||||
first_name=book_dict["author"]["first_name"],
|
||||
last_name=book_dict["author"]["last_name"]
|
||||
).one_or_none()
|
||||
|
||||
if existing_author is not None:
|
||||
self.logger.debug(f"Author {existing_author.first_name} {existing_author.last_name} already exists. Reusing.")
|
||||
author = existing_author
|
||||
else:
|
||||
self.logger.debug(f"Creating new author: {book_dict['author']['first_name']} {book_dict['author']['last_name']}")
|
||||
author = Author(
|
||||
first_name=book_dict["author"]["first_name"],
|
||||
last_name=book_dict["author"]["last_name"]
|
||||
)
|
||||
session.add(author)
|
||||
|
||||
# Handle categories
|
||||
filtered_categories = []
|
||||
for category_name in book_dict["categories"]:
|
||||
if category_name in processed_categories:
|
||||
filtered_categories.append(processed_categories[category_name])
|
||||
continue
|
||||
|
||||
existing_category = session.query(BookCategory).filter_by(name=category_name).one_or_none()
|
||||
if existing_category is not None:
|
||||
self.logger.debug(f"Category {category_name} already exists. Reusing.")
|
||||
processed_categories[category_name] = existing_category
|
||||
filtered_categories.append(existing_category)
|
||||
else:
|
||||
self.logger.debug(f"Adding new category: {category_name}")
|
||||
new_category = BookCategory(name=category_name)
|
||||
session.add(new_category)
|
||||
processed_categories[category_name] = new_category
|
||||
filtered_categories.append(new_category)
|
||||
|
||||
book = Book(
|
||||
title=book_dict["title"],
|
||||
description=book_dict["description"],
|
||||
year_published=book_dict["year_published"],
|
||||
isbn=book_dict["isbn"],
|
||||
author=author,
|
||||
categories=filtered_categories
|
||||
)
|
||||
session.add(book)
|
||||
# Commit all changes
|
||||
session.commit()
|
||||
except e:
|
||||
session.rollback()
|
||||
raise ImportError(f"An error occurred when importing books: {e}") from e
|
||||
finally:
|
||||
session.close()
|
@ -1,19 +1,19 @@
|
||||
from .author import *
|
||||
from .book import *
|
||||
from .book_category import *
|
||||
from .book_category_link import *
|
||||
from .book_overview import *
|
||||
from .member import *
|
||||
from .librarian import *
|
||||
from .loan import *
|
||||
from .author_model import *
|
||||
from .book_model import *
|
||||
from .book_category_model import *
|
||||
from .book_category_link_model import *
|
||||
from .book_category_statistics_model import *
|
||||
from .book_category_statistics_overview_model import *
|
||||
from .book_overview_model import *
|
||||
from .member_model import *
|
||||
|
||||
__all__ = [
|
||||
*author.__all__,
|
||||
*book.__all__,
|
||||
*book_category.__all__,
|
||||
*book_category_link.__all__,
|
||||
*book_overview.__all__,
|
||||
*member.__all__,
|
||||
*librarian.__all__,
|
||||
*loan.__all__
|
||||
*author_model.__all__,
|
||||
*book_model.__all__,
|
||||
*book_category_model.__all__,
|
||||
*book_category_link_model.__all__,
|
||||
*book_category_statistics_model.__all__,
|
||||
*book_category_statistics_overview_model.__all__,
|
||||
*book_overview_model.__all__,
|
||||
*member_model.__all__,
|
||||
]
|
||||
|
@ -1,7 +1,7 @@
|
||||
from sqlalchemy import Column, Integer, String, TIMESTAMP, UniqueConstraint, func
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base
|
||||
from .base_model import Base
|
||||
|
||||
|
||||
class Author(Base):
|
||||
@ -16,5 +16,8 @@ class Author(Base):
|
||||
# Reference 'Book' as a string to avoid direct import
|
||||
books = relationship('Book', back_populates='author')
|
||||
|
||||
def to_dict(self):
|
||||
return {col.name: getattr(self, col.name) for col in self.__table__.columns}
|
||||
|
||||
|
||||
__all__ = ["Author"]
|
@ -1,9 +1,9 @@
|
||||
from sqlalchemy import Column, Integer, ForeignKey
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .book import Book
|
||||
from .book_category import BookCategory
|
||||
from .base import Base
|
||||
from .book_model import Book
|
||||
from .book_category_model import BookCategory
|
||||
from .base_model import Base
|
||||
|
||||
|
||||
class BookCategoryLink(Base):
|
@ -1,25 +1,23 @@
|
||||
from sqlalchemy import Column, Integer, String, TIMESTAMP, ForeignKey, UniqueConstraint, func
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base
|
||||
from .base_model import Base
|
||||
|
||||
class BookCategory(Base):
|
||||
__tablename__ = 'book_category'
|
||||
__table_args__ = (UniqueConstraint('name'),)
|
||||
|
||||
id = Column(Integer, primary_key=True, autoincrement=True)
|
||||
parent_category_id = Column(Integer, ForeignKey('book_category.id'), nullable=True)
|
||||
name = Column(String(100), nullable=False)
|
||||
mature_content = Column(Integer, nullable=False, default=0)
|
||||
last_updated = Column(TIMESTAMP, nullable=False, server_default=func.now())
|
||||
|
||||
parent_category = relationship('BookCategory', remote_side=[id])
|
||||
|
||||
books = relationship(
|
||||
'Book',
|
||||
secondary='book_category_link', # Junction table
|
||||
back_populates='categories' # For bidirectional relationship
|
||||
secondary='book_category_link',
|
||||
back_populates='categories',
|
||||
)
|
||||
book_category_statistics = relationship('BookCategoryStatistics', backref='book_category_statistics')
|
||||
|
||||
|
||||
|
||||
__all__ = ["BookCategory"]
|
21
src/models/book_category_statistics_model.py
Normal file
21
src/models/book_category_statistics_model.py
Normal file
@ -0,0 +1,21 @@
|
||||
from sqlalchemy import Column, Integer, ForeignKey
|
||||
|
||||
from sqlalchemy.dialects.mysql import INTEGER
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base_model import Base
|
||||
|
||||
class BookCategoryStatistics(Base):
|
||||
__tablename__ = 'book_category_statistics'
|
||||
|
||||
book_category_id = Column(Integer, ForeignKey('book_category.id', ondelete="cascade"), primary_key=True)
|
||||
book_count = Column(INTEGER(unsigned=True), nullable=False, default=0)
|
||||
|
||||
category = relationship(
|
||||
'BookCategory',
|
||||
back_populates='book_category_statistics',
|
||||
overlaps="book_category_statistics"
|
||||
)
|
||||
|
||||
|
||||
__all__ = ["BookCategoryStatistics"]
|
19
src/models/book_category_statistics_overview_model.py
Normal file
19
src/models/book_category_statistics_overview_model.py
Normal file
@ -0,0 +1,19 @@
|
||||
from sqlalchemy import Column, String, TIMESTAMP, Integer, Text, Enum
|
||||
from sqlalchemy.dialects.mysql import INTEGER
|
||||
|
||||
from .base_model import Base
|
||||
|
||||
|
||||
class BookCategoryStatisticsOverview(Base):
|
||||
__tablename__ = 'book_category_statistics_overview'
|
||||
__table_args__ = {'extend_existing': True}
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
name = Column(String)
|
||||
book_count = Column(INTEGER(unsigned=True), default=0)
|
||||
|
||||
def __repr__(self):
|
||||
return (f"<BookCategoryStatisticsOverview(book_category_id={self.id}, book_count={self.book_count})>")
|
||||
|
||||
|
||||
__all__ = ["BookCategoryStatisticsOverview"]
|
@ -1,9 +1,9 @@
|
||||
import enum
|
||||
|
||||
from sqlalchemy import Column, Integer, String, TIMESTAMP, Text, ForeignKey, Enum, UniqueConstraint, func
|
||||
from sqlalchemy import Column, Integer, String, TIMESTAMP, Text, ForeignKey, Enum, UniqueConstraint, func, DECIMAL,Boolean
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base
|
||||
from .base_model import Base
|
||||
|
||||
|
||||
class BookStatusEnum(enum.Enum):
|
||||
@ -22,12 +22,25 @@ class Book(Base):
|
||||
description = Column(Text, nullable=False)
|
||||
year_published = Column(String(4), nullable=False)
|
||||
isbn = Column(String(13), nullable=False, unique=True)
|
||||
price = Column(DECIMAL(5, 2), nullable=False, default=0)
|
||||
is_damaged = Column(Boolean, nullable=False, default=False)
|
||||
status = Column(Enum(BookStatusEnum), nullable=False, default=BookStatusEnum.available)
|
||||
created_at = Column(TIMESTAMP, nullable=False, server_default=func.now())
|
||||
last_updated = Column(TIMESTAMP, nullable=False, server_default=func.now())
|
||||
|
||||
author = relationship('Author', back_populates='books')
|
||||
categories = relationship('BookCategory',secondary='book_category_link',back_populates='books')
|
||||
author = relationship('Author', back_populates='books')
|
||||
categories = relationship('BookCategory',secondary='book_category_link',back_populates='books')
|
||||
|
||||
def to_dict(self):
|
||||
book_dict = {col.name: getattr(self, col.name) for col in self.__table__.columns}
|
||||
|
||||
book_dict['author'] = {
|
||||
'first_name': self.author.first_name,
|
||||
'last_name': self.author.last_name
|
||||
}
|
||||
|
||||
book_dict['categories'] = [category.name for category in self.categories]
|
||||
|
||||
return book_dict
|
||||
|
||||
__all__ = ["Book", "BookStatusEnum"]
|
@ -1,8 +1,7 @@
|
||||
from sqlalchemy import Column, String, TIMESTAMP, Integer, Text, Enum
|
||||
from sqlalchemy import Column, String, TIMESTAMP, Integer, Text, Enum, DECIMAL, Boolean
|
||||
|
||||
from .base import Base
|
||||
|
||||
from models.book import BookStatusEnum
|
||||
from .base_model import Base
|
||||
from .book_model import BookStatusEnum
|
||||
|
||||
class BooksOverview(Base):
|
||||
__tablename__ = 'books_overview'
|
||||
@ -16,12 +15,10 @@ class BooksOverview(Base):
|
||||
categories = Column(Text, nullable=True)
|
||||
year_published = Column(Integer, nullable=True)
|
||||
isbn = Column(String, nullable=True)
|
||||
price = Column(DECIMAL(5, 2), nullable=False, default=0)
|
||||
is_damaged = Column(Boolean, nullable=False, default=False)
|
||||
status = Column(Enum(BookStatusEnum), nullable=False)
|
||||
created_at = Column(TIMESTAMP, nullable=False)
|
||||
borrower_name = Column(String, nullable=True)
|
||||
member_id = Column(Integer, nullable=True)
|
||||
librarian_name = Column(String, nullable=True)
|
||||
librarian_id = Column(Integer, nullable=True)
|
||||
|
||||
# This prevents accidental updates/deletes as it's a view
|
||||
def __repr__(self):
|
@ -1,33 +0,0 @@
|
||||
import enum
|
||||
|
||||
from sqlalchemy import Column, Integer, String, TIMESTAMP, Text, ForeignKey, Enum, UniqueConstraint, func
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base
|
||||
|
||||
|
||||
class LibrarianStatusEnum(enum.Enum):
|
||||
active = 'active'
|
||||
inactive = 'inactive'
|
||||
|
||||
class LibrarianRoleEnum(enum.Enum):
|
||||
staff = 'staff'
|
||||
admin = 'admin'
|
||||
|
||||
|
||||
class Librarian(Base):
|
||||
__tablename__ = 'librarian'
|
||||
__table_args__ = (UniqueConstraint('id'),)
|
||||
|
||||
id = Column(Integer, primary_key=True, autoincrement=True)
|
||||
first_name = Column(String(50), nullable=False)
|
||||
last_name = Column(String(50), nullable=False)
|
||||
email = Column(String(100), nullable=False, unique=True)
|
||||
phone = Column(String(20), nullable=False)
|
||||
hire_date = Column(TIMESTAMP, nullable=False, server_default=func.now())
|
||||
status = Column(Enum(LibrarianStatusEnum), nullable=False, default=LibrarianStatusEnum.active)
|
||||
role = Column(Enum(LibrarianRoleEnum), nullable=False)
|
||||
last_updated = Column(TIMESTAMP, nullable=False, server_default=func.now())
|
||||
|
||||
|
||||
__all__ = ["Librarian", "LibrarianRoleEnum", "LibrarianStatusEnum"]
|
@ -1,39 +0,0 @@
|
||||
import enum
|
||||
|
||||
from sqlalchemy import Column, Integer, String, TIMESTAMP, Text, ForeignKey, Enum, UniqueConstraint, Float, func
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base
|
||||
from .book import Book
|
||||
from .member import Member
|
||||
from .librarian import Librarian
|
||||
|
||||
|
||||
class LoanStatusEnum(enum.Enum):
|
||||
borrowed = 'borrowed'
|
||||
returned = 'returned'
|
||||
overdue = 'overdue'
|
||||
reserved = 'reserved'
|
||||
|
||||
|
||||
class Loan(Base):
|
||||
__tablename__ = 'loan'
|
||||
__table_args__ = (UniqueConstraint('id'),)
|
||||
|
||||
id = Column(Integer, primary_key=True, autoincrement=True)
|
||||
book_id = Column(Integer, ForeignKey('book.id'), nullable=False)
|
||||
member_id = Column(Integer, ForeignKey('member.id'), nullable=False)
|
||||
librarian_id = Column(Integer, ForeignKey('librarian.id'), nullable=False)
|
||||
loan_date = Column(TIMESTAMP, nullable=False, server_default=func.now())
|
||||
due_date = Column(TIMESTAMP, nullable=False)
|
||||
return_date = Column(TIMESTAMP, nullable=True)
|
||||
status = Column(Enum(LoanStatusEnum), nullable=False, default=LoanStatusEnum.borrowed)
|
||||
overdue_fee = Column(Float, nullable=True)
|
||||
last_updated = Column(TIMESTAMP, nullable=False, server_default=func.now())
|
||||
|
||||
book = relationship('Book', backref='loans')
|
||||
member = relationship('Member', backref='loans')
|
||||
librarian = relationship('Librarian', backref='loans')
|
||||
|
||||
|
||||
__all__ = ["Loan", "LoanStatusEnum"]
|
@ -3,7 +3,7 @@ import enum
|
||||
from sqlalchemy import Column, Integer, String, TIMESTAMP, Text, ForeignKey, Enum, UniqueConstraint, func
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base
|
||||
from .base_model import Base
|
||||
|
||||
|
||||
class MemberStatusEnum(enum.Enum):
|
||||
@ -13,7 +13,7 @@ class MemberStatusEnum(enum.Enum):
|
||||
|
||||
class Member(Base):
|
||||
__tablename__ = 'member'
|
||||
__table_args__ = (UniqueConstraint('id'),)
|
||||
__table_args__ = (UniqueConstraint('id'), UniqueConstraint('email'), UniqueConstraint('phone'))
|
||||
|
||||
id = Column(Integer, primary_key=True, autoincrement=True)
|
||||
first_name = Column(String(50), nullable=False)
|
||||
@ -24,5 +24,8 @@ class Member(Base):
|
||||
status = Column(Enum(MemberStatusEnum), nullable=True, default=MemberStatusEnum.active)
|
||||
last_updated = Column(TIMESTAMP, nullable=True)
|
||||
|
||||
def to_dict(self):
|
||||
return {col.name: getattr(self, col.name) for col in self.__table__.columns}
|
||||
|
||||
|
||||
__all__ = ["Member", "MemberStatusEnum"]
|
@ -5,3 +5,4 @@ PySide6_Essentials==6.8.1
|
||||
python-dotenv==1.0.1
|
||||
SQLAlchemy==2.0.36
|
||||
xmlschema==3.4.3
|
||||
mysql-connector-python==9.1.0
|
61
src/services/book_category_statistics_service.py
Normal file
61
src/services/book_category_statistics_service.py
Normal file
@ -0,0 +1,61 @@
|
||||
from typing import List
|
||||
import logging
|
||||
|
||||
import xml.etree.ElementTree as ET
|
||||
from xml.dom import minidom
|
||||
from xmlschema import XMLSchema
|
||||
|
||||
from utils.errors import (
|
||||
NoExportEntityError,
|
||||
ExportError,
|
||||
ExportFileError,
|
||||
InvalidContentsError,
|
||||
XsdSchemeNotFoundError,
|
||||
ImportError,
|
||||
)
|
||||
from models import BookCategoryStatisticsOverview
|
||||
from assets import asset_manager
|
||||
|
||||
from utils.errors import DatabaseConnectionError, DatabaseError
|
||||
|
||||
from database import fetch_all_book_category_statistics_overviews_with_count
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def export_to_xml(file_path: str) -> None:
|
||||
try:
|
||||
category_list, count = fetch_all_book_category_statistics_overviews_with_count()
|
||||
|
||||
if not category_list:
|
||||
raise NoExportEntityError("No categories found to export")
|
||||
|
||||
xml = category_statistics_to_xml(category_list, count)
|
||||
|
||||
with open(file_path, "w", encoding="utf-8") as file:
|
||||
file.write(xml)
|
||||
except OSError as e:
|
||||
raise ExportFileError("Failed to save to a file") from e
|
||||
except DatabaseConnectionError as e:
|
||||
raise ExportError("An error with database occurred. Try again later") from e
|
||||
except DatabaseError as e:
|
||||
raise ExportError("Unknown error occurred") from e
|
||||
|
||||
def category_statistics_to_xml(category_statistics: List[BookCategoryStatisticsOverview], total_count: int):
|
||||
root = ET.Element("statistics")
|
||||
|
||||
for statistic in category_statistics:
|
||||
category_element = ET.SubElement(root, "category")
|
||||
|
||||
name_element = ET.SubElement(category_element, "name")
|
||||
name_element.text = statistic.name
|
||||
|
||||
count_element = ET.SubElement(category_element, "count")
|
||||
count_element.text = str(statistic.book_count)
|
||||
|
||||
total_count_element = ET.SubElement(root, "total_category_count")
|
||||
total_count_element.text = str(total_count)
|
||||
|
||||
tree_str = ET.tostring(root, encoding="unicode")
|
||||
|
||||
pretty_xml = minidom.parseString(tree_str).toprettyxml(indent=(" " * 4))
|
||||
return pretty_xml
|
78
src/services/book_overview_service.py
Normal file
78
src/services/book_overview_service.py
Normal file
@ -0,0 +1,78 @@
|
||||
import os
|
||||
import logging
|
||||
from typing import Optional, List, Dict
|
||||
import xml.etree.ElementTree as ET
|
||||
from xml.dom import minidom
|
||||
from xmlschema import XMLSchema
|
||||
|
||||
from utils.errors import (
|
||||
NoExportEntityError,
|
||||
ExportError,
|
||||
ExportFileError,
|
||||
InvalidContentsError,
|
||||
XsdSchemeNotFoundError,
|
||||
ImportError,
|
||||
)
|
||||
|
||||
# Initialize logger and XML Schema
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
from models import BooksOverview
|
||||
|
||||
from database import fetch_all_book_overviews
|
||||
|
||||
def export_to_xml(file_path: str) -> None:
|
||||
logger.debug("Attempting to export book overview")
|
||||
all_books = fetch_all_book_overviews()
|
||||
|
||||
if not all_books:
|
||||
logger.warning("No books found to export")
|
||||
raise NoExportEntityError("No books found to export")
|
||||
|
||||
xml = overviews_to_xml(all_books)
|
||||
|
||||
try:
|
||||
with open(file_path, "w", encoding="utf-8") as file:
|
||||
file.write(xml)
|
||||
logger.info("Successfully saved book overview export")
|
||||
except OSError as e:
|
||||
raise ExportFileError("Failed to save to a file") from e
|
||||
|
||||
|
||||
def overviews_to_xml(overview_list: List[BooksOverview]) -> str:
|
||||
root = ET.Element("book_overview")
|
||||
|
||||
for book_overview in overview_list:
|
||||
# Create a <book_entry> element
|
||||
book_element = ET.SubElement(root, "book_entry")
|
||||
|
||||
# Add <title>
|
||||
title_element = ET.SubElement(book_element, "title")
|
||||
title_element.text = book_overview.title
|
||||
|
||||
# Add <author>
|
||||
author_element = ET.SubElement(book_element, "author")
|
||||
author_element.text = book_overview.author_name
|
||||
|
||||
# Add <year_published>
|
||||
year_published_element = ET.SubElement(book_element, "year_published")
|
||||
year_published_element.text = book_overview.year_published
|
||||
|
||||
# Add <isbn>
|
||||
isbn_element = ET.SubElement(book_element, "isbn")
|
||||
isbn_element.text = book_overview.isbn
|
||||
|
||||
# Add <borrower_name>
|
||||
borrower_name = ET.SubElement(book_element, "borrower_name")
|
||||
borrower_name.text = book_overview.borrower_name
|
||||
|
||||
# Add <librarian_name>
|
||||
librarian_name = ET.SubElement(book_element, "librarian_name")
|
||||
librarian_name.text = book_overview.librarian_name
|
||||
|
||||
# Convert the tree to a string
|
||||
tree_str = ET.tostring(root, encoding="unicode")
|
||||
|
||||
# Pretty print the XML
|
||||
pretty_xml = minidom.parseString(tree_str).toprettyxml(indent=(" " * 4))
|
||||
return pretty_xml
|
146
src/services/book_service.py
Normal file
146
src/services/book_service.py
Normal file
@ -0,0 +1,146 @@
|
||||
import os
|
||||
import logging
|
||||
from typing import Optional, List, Dict
|
||||
import xml.etree.ElementTree as ET
|
||||
from xml.dom import minidom
|
||||
from xmlschema import XMLSchema
|
||||
|
||||
from utils.errors import (
|
||||
NoExportEntityError,
|
||||
ExportError,
|
||||
ExportFileError,
|
||||
InvalidContentsError,
|
||||
XsdSchemeNotFoundError,
|
||||
ImportError,
|
||||
)
|
||||
from models import Book
|
||||
from database import fetch_all_books, create_books
|
||||
from assets import asset_manager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
logger.debug("Loading XSD schema")
|
||||
SCHEMA = XMLSchema(asset_manager.get_asset("book_import_scheme.xsd"))
|
||||
except Exception as e:
|
||||
logger.error("Failed to load XSD schema")
|
||||
raise XsdSchemeNotFoundError(f"Failed to load XSD schema: {e}")
|
||||
|
||||
|
||||
def export_to_xml(file_path: str) -> None:
|
||||
all_books = fetch_all_books()
|
||||
|
||||
if not all_books:
|
||||
raise NoExportEntityError("No books found to export")
|
||||
|
||||
xml = books_to_xml(all_books)
|
||||
|
||||
try:
|
||||
with open(file_path, "w", encoding="utf-8") as file:
|
||||
file.write(xml)
|
||||
except OSError as e:
|
||||
raise ExportFileError("Failed to save to a file") from e
|
||||
|
||||
def save_books(books: List[Dict[str, object]]):
|
||||
create_books(books)
|
||||
|
||||
def parse_from_xml(file_path: str) -> List[Dict[str, object]]:
|
||||
if not SCHEMA.is_valid(file_path):
|
||||
raise InvalidContentsError("XML file is not valid according to XSD schema.")
|
||||
|
||||
try:
|
||||
tree = ET.parse(file_path)
|
||||
root = tree.getroot()
|
||||
|
||||
books = []
|
||||
for book_element in root.findall("book"):
|
||||
title = book_element.find("title").text
|
||||
year_published = book_element.find("year_published").text
|
||||
description = book_element.find("description").text
|
||||
isbn = book_element.find("isbn").text
|
||||
price = float(book_element.find("price").text)
|
||||
is_damaged = bool(book_element.find("is_damaged").text)
|
||||
|
||||
# Parse author
|
||||
author_element = book_element.find("author")
|
||||
author = {
|
||||
"first_name": author_element.find("first_name").text,
|
||||
"last_name": author_element.find("last_name").text,
|
||||
}
|
||||
|
||||
# Parse categories
|
||||
category_elements = book_element.find("categories").findall("category")
|
||||
categories = [category_element.text for category_element in category_elements]
|
||||
|
||||
# Create a book dictionary
|
||||
book = {
|
||||
"title" : title,
|
||||
"description" : description,
|
||||
"year_published" : year_published,
|
||||
"isbn" : isbn,
|
||||
"author" : author,
|
||||
"price": price,
|
||||
"is_damaged" : is_damaged,
|
||||
"categories" : categories,
|
||||
}
|
||||
books.append(book)
|
||||
|
||||
return books
|
||||
except ET.ParseError as e:
|
||||
raise ImportError(f"Failed to parse XML file: {e}")
|
||||
|
||||
|
||||
def books_to_xml(books: List[Book]) -> str:
|
||||
root = ET.Element("books")
|
||||
|
||||
for book in books:
|
||||
# Create a <book> element
|
||||
book_element = ET.SubElement(root, "book")
|
||||
|
||||
# Add <title>
|
||||
title_element = ET.SubElement(book_element, "title")
|
||||
title_element.text = book.title
|
||||
|
||||
# Add <author>
|
||||
author_element = ET.SubElement(book_element, "author")
|
||||
|
||||
# Add <first_name>
|
||||
author_first_name_element = ET.SubElement(author_element, "first_name")
|
||||
author_first_name_element.text = book.author.first_name
|
||||
|
||||
author_last_name_element = ET.SubElement(author_element, "last_name")
|
||||
author_last_name_element.text = book.author.last_name
|
||||
|
||||
# Add <description>
|
||||
description_element = ET.SubElement(book_element, "description")
|
||||
description_element.text = book.description
|
||||
|
||||
# Add <year_published>
|
||||
year_published_element = ET.SubElement(book_element, "year_published")
|
||||
year_published_element.text = book.year_published
|
||||
|
||||
# Add <isbn>
|
||||
isbn_element = ET.SubElement(book_element, "isbn")
|
||||
isbn_element.text = book.isbn
|
||||
|
||||
price_element = ET.SubElement(book_element, "price")
|
||||
price_element.text = str(book.price)
|
||||
|
||||
damaged_element = ET.SubElement(book_element, "is_damaged")
|
||||
damaged_element.text = str(book.is_damaged).lower()
|
||||
|
||||
# Add <categories>
|
||||
categories_element = ET.SubElement(book_element, "categories")
|
||||
for category in book.categories:
|
||||
category_element = ET.SubElement(categories_element, "category")
|
||||
category_element.text = category.name
|
||||
|
||||
# Convert the tree to a string
|
||||
tree_str = ET.tostring(root, encoding="unicode")
|
||||
|
||||
# Pretty print the XML
|
||||
pretty_xml = minidom.parseString(tree_str).toprettyxml(indent=(" " * 4))
|
||||
return pretty_xml
|
||||
|
||||
|
||||
__all__ = ["export_to_xml", "parse_from_xml"]
|
@ -1,156 +0,0 @@
|
||||
from PySide6.QtGui import QGuiApplication, QAction, Qt
|
||||
from PySide6.QtQml import QQmlApplicationEngine
|
||||
from PySide6.QtWidgets import QHBoxLayout, QVBoxLayout, QLabel, QWidget, QMenu, QSizePolicy, QLayout, QMessageBox
|
||||
from PySide6.QtCore import qDebug
|
||||
|
||||
from ui.editor import BookEditor
|
||||
|
||||
from models import BooksOverview, Book, BookStatusEnum
|
||||
|
||||
from database.manager import DatabaseManager
|
||||
|
||||
from sqlalchemy import delete
|
||||
|
||||
STATUS_TO_COLOR_MAP = {
|
||||
BookStatusEnum.available: "#3c702e",
|
||||
BookStatusEnum.borrowed: "#702525",
|
||||
BookStatusEnum.reserved: "#bc7613"
|
||||
}
|
||||
|
||||
|
||||
class BookCard(QWidget):
|
||||
def __init__(self, book_overview: BooksOverview):
|
||||
super().__init__()
|
||||
|
||||
self.book_overview = book_overview
|
||||
|
||||
self.setAttribute(Qt.WidgetAttribute.WA_Hover,
|
||||
True) # Enable hover events
|
||||
# Enable styling for background
|
||||
self.setAttribute(Qt.WidgetAttribute.WA_StyledBackground, True)
|
||||
|
||||
# Set initial stylesheet with hover behavior
|
||||
self.setStyleSheet("""
|
||||
BookCard:hover {
|
||||
background-color: palette(highlight);
|
||||
}
|
||||
""")
|
||||
|
||||
# Layout setup
|
||||
layout = QHBoxLayout(self)
|
||||
layout.setSizeConstraint(QLayout.SizeConstraint.SetMinimumSize)
|
||||
self.setSizePolicy(QSizePolicy.Policy.Preferred,
|
||||
QSizePolicy.Policy.Fixed)
|
||||
|
||||
layout.setContentsMargins(10, 10, 10, 10)
|
||||
layout.setSpacing(10)
|
||||
|
||||
# Left-side content
|
||||
left_side = QVBoxLayout()
|
||||
layout.addLayout(left_side)
|
||||
title_label = QLabel(book_overview.title)
|
||||
title_label.setStyleSheet("font-size: 20px; font-weight: bold;")
|
||||
author_label = QLabel("By: " + book_overview.author_name)
|
||||
isbn_label = QLabel("ISBN: " + (book_overview.isbn or "Not Available"))
|
||||
left_side.addWidget(title_label)
|
||||
left_side.addWidget(author_label)
|
||||
left_side.addWidget(isbn_label)
|
||||
|
||||
# Right-side content
|
||||
right_side = QVBoxLayout()
|
||||
layout.addLayout(right_side)
|
||||
|
||||
status_label = QLabel(str(book_overview.status.value.capitalize()))
|
||||
status_label.setStyleSheet(f"color: {
|
||||
STATUS_TO_COLOR_MAP[book_overview.status]}; font-size: 20px; font-weight: bold;")
|
||||
status_label.setAlignment(Qt.AlignmentFlag.AlignRight)
|
||||
|
||||
right_side.addWidget(status_label)
|
||||
|
||||
if book_overview.librarian_name and book_overview.borrower_name:
|
||||
borrower_label = QLabel("Borrowed: " + book_overview.borrower_name)
|
||||
borrower_label.setAlignment(Qt.AlignmentFlag.AlignRight)
|
||||
|
||||
librarian_label = QLabel("By: " + book_overview.librarian_name)
|
||||
librarian_label.setAlignment(Qt.AlignmentFlag.AlignRight)
|
||||
|
||||
right_side.addWidget(borrower_label)
|
||||
right_side.addWidget(librarian_label)
|
||||
|
||||
self.setLayout(layout)
|
||||
|
||||
def mousePressEvent(self, event):
|
||||
if event.button() == Qt.MouseButton.LeftButton:
|
||||
self.contextMenuEvent(event)
|
||||
else:
|
||||
super().mousePressEvent(event)
|
||||
|
||||
def contextMenuEvent(self, event):
|
||||
context_menu = QMenu(self)
|
||||
|
||||
action_edit_book = context_menu.addAction("Edit Book")
|
||||
action_edit_author = context_menu.addAction("Edit Author")
|
||||
action_mark_returned = context_menu.addAction("Mark as Returned")
|
||||
action_remove_reservation = context_menu.addAction(
|
||||
"Remove reservation")
|
||||
context_menu.addSeparator()
|
||||
delete_book_action = context_menu.addAction("Delete Book")
|
||||
delete_book_action.triggered.connect(self.delete_book)
|
||||
|
||||
if self.book_overview.status != BookStatusEnum.borrowed:
|
||||
action_mark_returned.setVisible(False)
|
||||
|
||||
if self.book_overview.status != BookStatusEnum.reserved:
|
||||
action_remove_reservation.setVisible(False)
|
||||
|
||||
action = context_menu.exec_(self.mapToGlobal(event.pos()))
|
||||
|
||||
if action == action_edit_book:
|
||||
with DatabaseManager.get_session() as session:
|
||||
book_id = self.book_overview.id
|
||||
|
||||
book = session.query(Book).filter(
|
||||
Book.id == book_id).one_or_none()
|
||||
|
||||
if book:
|
||||
BookEditor(book).exec()
|
||||
else:
|
||||
QMessageBox.critical(self,
|
||||
"Error",
|
||||
"The book you requested could not be found. Try again later",
|
||||
QMessageBox.StandardButton.Ok,
|
||||
QMessageBox.StandardButton.NoButton)
|
||||
elif action == action_edit_author:
|
||||
print("Edit Author selected")
|
||||
elif action == action_mark_returned:
|
||||
print("Mark as Returned selected")
|
||||
elif action == action_remove_reservation:
|
||||
print("Remove reservation selected")
|
||||
|
||||
def delete_book(self):
|
||||
if not self.make_sure():
|
||||
return
|
||||
|
||||
with DatabaseManager.get_session() as session:
|
||||
try:
|
||||
stmt = delete(Book).where(Book.id == self.book_overview.id)
|
||||
session.execute(stmt)
|
||||
session.commit()
|
||||
self.setVisible(False)
|
||||
except Exception as e:
|
||||
session.rollback
|
||||
print(e)
|
||||
|
||||
def make_sure(self) -> bool:
|
||||
are_you_sure_box = QMessageBox()
|
||||
are_you_sure_box.setIcon(QMessageBox.Question)
|
||||
are_you_sure_box.setWindowTitle("Are you sure?")
|
||||
are_you_sure_box.setText(f"Are you sure you want to delete {self.book_overview.title}?")
|
||||
are_you_sure_box.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
|
||||
are_you_sure_box.setDefaultButton(QMessageBox.No)
|
||||
|
||||
# Show the message box and capture the user's response
|
||||
response = are_you_sure_box.exec()
|
||||
|
||||
# Handle the response
|
||||
return response == QMessageBox.Yes
|
@ -1,29 +1,33 @@
|
||||
from typing import Dict, Callable
|
||||
|
||||
import logging
|
||||
|
||||
from PySide6.QtWidgets import (
|
||||
QDialog, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QTextEdit, QPushButton, QComboBox, QFormLayout, QMessageBox
|
||||
QDialog, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QTextEdit, QPushButton, QComboBox, QFormLayout, QMessageBox, QCheckBox
|
||||
)
|
||||
from PySide6.QtGui import QRegularExpressionValidator
|
||||
from PySide6.QtCore import QRegularExpression
|
||||
from models import Book, BookStatusEnum
|
||||
from models import Book, BookStatusEnum, BookCategory
|
||||
|
||||
from database import update_book
|
||||
from database import update_book, create_book
|
||||
|
||||
from utils.errors.database import DatabaseError, DatabaseConnectionError, DuplicateEntryError
|
||||
|
||||
|
||||
class BookEditor(QDialog):
|
||||
def __init__(self, book: Book = None, parent=None):
|
||||
def __init__(self, book: Book = None, parent=None, refresh_callback: Callable[[Dict[str, object]], None] = None):
|
||||
super().__init__(parent)
|
||||
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.create_layout()
|
||||
|
||||
self.refresh_callback = refresh_callback
|
||||
|
||||
if book:
|
||||
self.logger.debug(f"Editing existing book {book.title}")
|
||||
self.book = book
|
||||
self.book_id = book.id
|
||||
self.logger.debug(f"Editing book {book.title}")
|
||||
self.create_new = False
|
||||
self.fill_with_existing_data()
|
||||
self.fill_with_existing_data(book)
|
||||
else:
|
||||
self.logger.debug("Editing a new book")
|
||||
self.create_new = True
|
||||
@ -43,7 +47,7 @@ class BookEditor(QDialog):
|
||||
form_layout.addRow("Title:", self.title_input)
|
||||
|
||||
# Author field
|
||||
self.author_label = QLabel()
|
||||
self.author_label = QLineEdit()
|
||||
form_layout.addRow("Author: ", self.author_label)
|
||||
|
||||
# Description field
|
||||
@ -52,20 +56,33 @@ class BookEditor(QDialog):
|
||||
|
||||
# Year published field
|
||||
self.year_input = QLineEdit()
|
||||
# self.year_input.setValidator
|
||||
form_layout.addRow("Year Published:", self.year_input)
|
||||
|
||||
# ISBN field
|
||||
self.isbn_input = QLineEdit()
|
||||
self.isbn_expression = QRegularExpression("\d{10}|\d{13}")
|
||||
self.isbn_expression = QRegularExpression("(\d{13})|(\d{10})")
|
||||
self.isbn_validator = QRegularExpressionValidator(self.isbn_expression)
|
||||
self.isbn_input.setValidator(self.isbn_validator)
|
||||
form_layout.addRow("ISBN:", self.isbn_input)
|
||||
|
||||
# Categories field
|
||||
self.categories_input = QLineEdit()
|
||||
self.categories_input.setEnabled(False)
|
||||
form_layout.addRow("Categories: ", self.categories_input)
|
||||
|
||||
# Damage field
|
||||
self.damage_input = QCheckBox()
|
||||
form_layout.addRow("Damage:", self.damage_input)
|
||||
|
||||
# Price field
|
||||
self.price_input = QLineEdit()
|
||||
self.price_input.setValidator(QRegularExpressionValidator(QRegularExpression(r"^\d+(\.\d{1,2})?$")))
|
||||
form_layout.addRow("Price:", self.price_input)
|
||||
|
||||
# Status field
|
||||
self.status_input = QComboBox()
|
||||
self.status_input.addItems([status.value for status in BookStatusEnum])
|
||||
form_layout.addRow("Status:", self.status_input)
|
||||
|
||||
layout.addLayout(form_layout)
|
||||
|
||||
# Buttons
|
||||
@ -81,50 +98,128 @@ class BookEditor(QDialog):
|
||||
|
||||
layout.addLayout(button_layout)
|
||||
|
||||
def fill_with_existing_data(self):
|
||||
self.title_input.setText(self.book.title)
|
||||
self.description_input.setText(self.book.description)
|
||||
self.year_input.setText(self.book.year_published)
|
||||
self.isbn_input.setText(self.book.isbn)
|
||||
def fill_with_existing_data(self, book: Book):
|
||||
self.title_input.setText(book.title)
|
||||
self.description_input.setText(book.description)
|
||||
self.year_input.setText(book.year_published)
|
||||
self.isbn_input.setText(book.isbn)
|
||||
|
||||
def save_book(self):
|
||||
# Update book object with input values
|
||||
self.book.title = self.title_input.text()
|
||||
full_author_name = f"{self.book.author.first_name} {
|
||||
self.book.author.last_name}"
|
||||
full_author_name = f"{book.author.first_name} {book.author.last_name}"
|
||||
self.author_label.setText(full_author_name)
|
||||
self.book.description = self.description_input.toPlainText()
|
||||
self.book.year_published = self.year_input.text()
|
||||
self.book.isbn = self.isbn_input.text()
|
||||
|
||||
all_categories = ", ".join(
|
||||
category.name for category in self.book.categories)
|
||||
all_categories = ", ".join(category.name for category in book.categories)
|
||||
self.categories_input.setText(all_categories)
|
||||
|
||||
self.damage_input.setChecked(book.is_damaged)
|
||||
self.price_input.setText(str(book.price))
|
||||
self.status_input.setCurrentText(book.status.value)
|
||||
|
||||
def save_book(self):
|
||||
try:
|
||||
book_object = self.parse_inputs()
|
||||
|
||||
if self.create_new:
|
||||
pass
|
||||
create_book(book_object, skip_existing=False)
|
||||
else:
|
||||
update_book(self.book)
|
||||
book_object["id"] = self.book_id
|
||||
update_book(book_object)
|
||||
|
||||
QMessageBox.information(None,
|
||||
"Success",
|
||||
"Book updated successfully",
|
||||
QMessageBox.StandardButton.Ok)
|
||||
|
||||
if self.refresh_callback:
|
||||
self.refresh_callback(book_object)
|
||||
|
||||
self.accept()
|
||||
except ValueError as e:
|
||||
QMessageBox.critical(None,
|
||||
"Invalid Input",
|
||||
f"Input validation failed: {e}",
|
||||
QMessageBox.StandardButton.Ok)
|
||||
except DuplicateEntryError as e:
|
||||
QMessageBox.critical(None,
|
||||
"ISBN is already in use",
|
||||
"The ISBN provided is already in use",
|
||||
QMessageBox.StandardButton.Ok,
|
||||
QMessageBox.StandardButton.NoButton)
|
||||
QMessageBox.StandardButton.Ok)
|
||||
except DatabaseConnectionError as e:
|
||||
QMessageBox.critical(None,
|
||||
"Failed to save",
|
||||
"Could not connect to the database",
|
||||
QMessageBox.StandardButton.Ok,
|
||||
QMessageBox.StandardButton.NoButton)
|
||||
QMessageBox.StandardButton.Ok)
|
||||
except DatabaseError as e:
|
||||
QMessageBox.critical(self.parent,
|
||||
"An error occured",
|
||||
QMessageBox.critical(None,
|
||||
"An error occurred",
|
||||
f"Could not save the book because of the following error: {e}",
|
||||
QMessageBox.StandardButton.Ok,
|
||||
QMessageBox.StandardButton.NoButton)
|
||||
QMessageBox.StandardButton.Ok)
|
||||
|
||||
def parse_inputs(self) -> Dict[str, object]:
|
||||
# Title validation
|
||||
title = self.title_input.text().strip()
|
||||
if not title or len(title) > 100:
|
||||
raise ValueError("Title must be non-empty and at most 100 characters long.")
|
||||
|
||||
# Author validation
|
||||
author_name = self.author_label.text().strip()
|
||||
if not author_name or len(author_name.split()) < 2:
|
||||
raise ValueError("Author must include at least a first and last name.")
|
||||
|
||||
# Split author name into first and last names
|
||||
author_parts = author_name.split()
|
||||
first_name = author_parts[0]
|
||||
last_name = " ".join(author_parts[1:])
|
||||
|
||||
# Description validation
|
||||
description = self.description_input.toPlainText().strip()
|
||||
if not description:
|
||||
raise ValueError("Description cannot be empty.")
|
||||
|
||||
# Year published validation
|
||||
year_published = self.year_input.text().strip()
|
||||
if not year_published.isdigit() or len(year_published) != 4 or int(year_published) < 0:
|
||||
raise ValueError("Year published must be a 4-digit positive number.")
|
||||
|
||||
# ISBN validation
|
||||
isbn = self.isbn_input.text().strip()
|
||||
if not isbn or len(isbn) not in (10, 13):
|
||||
raise ValueError("ISBN must be either 10 or 13 characters long.")
|
||||
|
||||
# Categories validation
|
||||
category_text = self.categories_input.text().strip()
|
||||
categories = [category.strip() for category in category_text.split(",") if category.strip()]
|
||||
if not categories:
|
||||
raise ValueError("At least one category must be specified.")
|
||||
|
||||
# Damage validation
|
||||
damage = self.damage_input.isChecked()
|
||||
|
||||
# Price validation
|
||||
price = self.price_input.text().strip()
|
||||
try:
|
||||
price = float(price)
|
||||
if price < 0:
|
||||
raise ValueError("Price must be a non-negative number.")
|
||||
except ValueError:
|
||||
raise ValueError("Price must be a valid decimal number.")
|
||||
|
||||
# Status validation
|
||||
status = self.status_input.currentText()
|
||||
|
||||
# Map parsed values to dictionary format for saving
|
||||
return {
|
||||
"title": title,
|
||||
"author": {
|
||||
"first_name": first_name,
|
||||
"last_name": last_name
|
||||
},
|
||||
"description": description,
|
||||
"year_published": year_published,
|
||||
"isbn": isbn,
|
||||
"categories": categories,
|
||||
"is_damaged": damage,
|
||||
"price": price,
|
||||
"status": status
|
||||
}
|
||||
|
||||
__all__ = ["BookEditor"]
|
||||
|
@ -1,4 +1,6 @@
|
||||
import logging
|
||||
import re
|
||||
from typing import Dict, Callable
|
||||
|
||||
from PySide6.QtGui import QGuiApplication, QAction
|
||||
from PySide6.QtQml import QQmlApplicationEngine
|
||||
@ -7,24 +9,27 @@ from PySide6.QtWidgets import QVBoxLayout, QFormLayout, QLineEdit, QHBoxLayout,
|
||||
|
||||
from models import Member
|
||||
|
||||
from database.member import create_new_member
|
||||
from database.member import create_member, update_member
|
||||
|
||||
from utils.errors.database import DatabaseError, DatabaseConnectionError, DuplicateEntryError
|
||||
|
||||
|
||||
class MemberEditor(QDialog):
|
||||
def __init__(self, member: Member = None):
|
||||
def __init__(self, member: Member = None, refresh_callback: Callable[[Dict[str, object]], None] = None):
|
||||
super().__init__()
|
||||
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.create_layout()
|
||||
|
||||
self.refresh_callback = refresh_callback
|
||||
|
||||
if member:
|
||||
self.member = member
|
||||
self.fill_with_existing_data()
|
||||
self.member_id = member.id
|
||||
self.logger.debug(f"Editing member {member.first_name} {member.last_name}")
|
||||
self.fill_with_existing_data(member)
|
||||
self.create_new = False
|
||||
else:
|
||||
self.member = Member()
|
||||
self.logger.debug("Editing a new member")
|
||||
self.create_new = True
|
||||
|
||||
def create_layout(self):
|
||||
@ -68,27 +73,83 @@ class MemberEditor(QDialog):
|
||||
|
||||
self.layout.addLayout(self.button_layout)
|
||||
|
||||
def fill_with_existing_data(self):
|
||||
self.first_name_input.setText(self.member.first_name)
|
||||
self.last_name_input.setText(self.member.last_name)
|
||||
self.email_input.setText(self.member.email)
|
||||
self.phone_number_input.setText(self.member.phone)
|
||||
def fill_with_existing_data(self, member: Member):
|
||||
self.first_name_input.setText(member.first_name)
|
||||
self.last_name_input.setText(member.last_name)
|
||||
self.email_input.setText(member.email)
|
||||
self.phone_number_input.setText(member.phone)
|
||||
|
||||
def save_member(self):
|
||||
self.member.first_name = self.first_name_input.text()
|
||||
self.member.last_name = self.last_name_input.text()
|
||||
self.member.email = self.email_input.text()
|
||||
self.member.phone = self.phone_number_input.text()
|
||||
|
||||
try:
|
||||
member_object = self.parse_inputs()
|
||||
|
||||
if self.create_new:
|
||||
self.logger.debug("Creating new member")
|
||||
create_new_member(self.member)
|
||||
except DuplicateEntryError:
|
||||
QMessageBox.critical(None, "Details already in use", "Cannot create a new user",
|
||||
QMessageBox.StandardButton.Ok, QMessageBox.StandardButtons.NoButton)
|
||||
create_member(member_object)
|
||||
QMessageBox.information(None,
|
||||
"Success",
|
||||
"Member created successfully",
|
||||
QMessageBox.StandardButton.Ok,
|
||||
QMessageBox.StandardButton.NoButton)
|
||||
else:
|
||||
member_object["id"] = self.member_id
|
||||
update_member(member_object)
|
||||
QMessageBox.information(None,
|
||||
"Success",
|
||||
"Member updated successfully",
|
||||
QMessageBox.StandardButton.Ok,
|
||||
QMessageBox.StandardButton.NoButton)
|
||||
|
||||
self.accept()
|
||||
if self.refresh_callback:
|
||||
self.refresh_callback(member_object)
|
||||
|
||||
self.accept()
|
||||
except ValueError as e:
|
||||
QMessageBox.critical(None,
|
||||
"Invalid Input",
|
||||
f"Input validation failed: {e}",
|
||||
QMessageBox.StandardButton.Ok,
|
||||
QMessageBox.StandardButton.NoButton)
|
||||
except DuplicateEntryError as e:
|
||||
QMessageBox.critical(None,
|
||||
f"Duplicate {e.duplicate_entry_name}",
|
||||
f"The {e.duplicate_entry_name} is already in use",
|
||||
QMessageBox.StandardButton.Ok,
|
||||
QMessageBox.StandardButton.NoButton)
|
||||
except DatabaseConnectionError as e:
|
||||
QMessageBox.critical(None,
|
||||
"Connection error",
|
||||
"Could not connect to the database",
|
||||
QMessageBox.StandardButton.Ok)
|
||||
except DatabaseError as e:
|
||||
QMessageBox.critical(None,
|
||||
"Unknown database error",
|
||||
f"Could not save the book because of the following error: {e}",
|
||||
QMessageBox.StandardButton.Ok)
|
||||
|
||||
def parse_inputs(self) -> Dict:
|
||||
first_name = self.first_name_input.text().strip()
|
||||
if not first_name or len(first_name) > 50:
|
||||
raise ValueError("First name must be non-empty and at most 50 characters long.")
|
||||
|
||||
last_name = self.last_name_input.text().strip()
|
||||
if not last_name or len(last_name) > 50:
|
||||
raise ValueError("Last name must be non-empty and at most 50 characters long.")
|
||||
|
||||
email = self.email_input.text().strip()
|
||||
email_regex = r"^[\w\-\.]+@([\w\-]+\.)+[\w\-]{2,}$"
|
||||
if not re.match(email_regex, email):
|
||||
raise ValueError("E-mail address is not in a valid format.")
|
||||
|
||||
phone_number = self.phone_number_input.text().strip()
|
||||
phone_number_regex = r"(\+\d{1,3})?\d{9}"
|
||||
if not re.match(phone_number_regex, phone_number):
|
||||
raise ValueError("Phone number is not in valid format.")
|
||||
|
||||
return {
|
||||
"first_name": first_name,
|
||||
"last_name": last_name,
|
||||
"email": email,
|
||||
"phone": phone_number
|
||||
}
|
||||
|
||||
__all__ = ["MemberEditor"]
|
||||
|
3
src/ui/main_tabs/__init__.py
Normal file
3
src/ui/main_tabs/__init__.py
Normal file
@ -0,0 +1,3 @@
|
||||
from .book_overview_list import BookOverviewList
|
||||
from .member_list import MemberList
|
||||
from .category_statistics_overview_list import BookCategoryStatisticsOverview
|
7
src/ui/main_tabs/book_overview_list/__init__.py
Normal file
7
src/ui/main_tabs/book_overview_list/__init__.py
Normal file
@ -0,0 +1,7 @@
|
||||
from .overview_list import *
|
||||
from .book_card import *
|
||||
|
||||
__all__ = [
|
||||
*overview_list.__all__,
|
||||
*book_card.__all__
|
||||
]
|
148
src/ui/main_tabs/book_overview_list/book_card.py
Normal file
148
src/ui/main_tabs/book_overview_list/book_card.py
Normal file
@ -0,0 +1,148 @@
|
||||
from PySide6.QtGui import QAction, Qt
|
||||
from PySide6.QtWidgets import QHBoxLayout, QVBoxLayout, QLabel, QWidget, QMenu, QSizePolicy, QLayout, QMessageBox, QDialog
|
||||
from models import BooksOverview, Book, BookStatusEnum
|
||||
from ui.editor import BookEditor
|
||||
from database.manager import DatabaseManager
|
||||
from database import delete_book
|
||||
from utils.errors import DatabaseConnectionError, DatabaseError
|
||||
|
||||
STATUS_TO_COLOR_MAP = {
|
||||
BookStatusEnum.available: "#3c702e",
|
||||
BookStatusEnum.borrowed: "#702525",
|
||||
BookStatusEnum.reserved: "#bc7613"
|
||||
}
|
||||
|
||||
|
||||
class BookCard(QWidget):
|
||||
def __init__(self, book_overview: BooksOverview):
|
||||
super().__init__()
|
||||
|
||||
self.book_overview = book_overview
|
||||
|
||||
self.setAttribute(Qt.WidgetAttribute.WA_Hover, True) # Enable hover events
|
||||
self.setAttribute(Qt.WidgetAttribute.WA_StyledBackground, True) # Enable styling for background
|
||||
|
||||
# Set initial stylesheet with hover behavior
|
||||
self.setStyleSheet("""
|
||||
BookCard:hover {
|
||||
background-color: palette(highlight);
|
||||
}
|
||||
""")
|
||||
|
||||
# Layout setup
|
||||
layout = QHBoxLayout(self)
|
||||
layout.setSizeConstraint(QLayout.SizeConstraint.SetMinimumSize)
|
||||
self.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed)
|
||||
|
||||
layout.setContentsMargins(10, 10, 10, 10)
|
||||
layout.setSpacing(10)
|
||||
|
||||
# Initialize UI components
|
||||
self.title_label = QLabel()
|
||||
self.author_label = QLabel()
|
||||
self.isbn_label = QLabel()
|
||||
self.status_label = QLabel()
|
||||
self.price_label = QLabel()
|
||||
self.is_damaged_label = QLabel()
|
||||
|
||||
# Left-side content
|
||||
left_side = QVBoxLayout()
|
||||
layout.addLayout(left_side)
|
||||
left_side.addWidget(self.title_label)
|
||||
left_side.addWidget(self.author_label)
|
||||
left_side.addWidget(self.isbn_label)
|
||||
|
||||
# Right-side content
|
||||
right_side = QVBoxLayout()
|
||||
layout.addLayout(right_side)
|
||||
right_side.addWidget(self.status_label)
|
||||
right_side.addWidget(self.price_label)
|
||||
right_side.addWidget(self.is_damaged_label)
|
||||
|
||||
self.setLayout(layout)
|
||||
self.update_display()
|
||||
|
||||
def update_display(self):
|
||||
"""Refreshes the display of the book card based on its current data."""
|
||||
self.title_label.setText(self.book_overview.title)
|
||||
self.title_label.setStyleSheet("font-size: 20px; font-weight: bold;")
|
||||
|
||||
self.author_label.setText("By: " + self.book_overview.author_name)
|
||||
self.isbn_label.setText("ISBN: " + (self.book_overview.isbn or "Not Available"))
|
||||
|
||||
self.status_label.setText(str(self.book_overview.status.value.capitalize()))
|
||||
self.status_label.setStyleSheet(f"color: {STATUS_TO_COLOR_MAP[self.book_overview.status]}; font-size: 20px; font-weight: bold;")
|
||||
self.status_label.setAlignment(Qt.AlignmentFlag.AlignRight)
|
||||
|
||||
self.price_label.setText("Price: " + str(self.book_overview.price))
|
||||
self.price_label.setAlignment(Qt.AlignmentFlag.AlignRight)
|
||||
|
||||
self.is_damaged_label.setText("Damaged: " + str(self.book_overview.is_damaged))
|
||||
self.is_damaged_label.setAlignment(Qt.AlignmentFlag.AlignRight)
|
||||
|
||||
def contextMenuEvent(self, event):
|
||||
context_menu = QMenu(self)
|
||||
|
||||
action_edit_book = context_menu.addAction("Edit Book")
|
||||
delete_book_action = context_menu.addAction("Delete Book")
|
||||
delete_book_action.triggered.connect(self.delete_book)
|
||||
|
||||
action = context_menu.exec_(self.mapToGlobal(event.pos()))
|
||||
|
||||
if action == action_edit_book:
|
||||
self.open_editor()
|
||||
|
||||
def open_editor(self):
|
||||
"""Opens the BookEditor and updates the card if changes are made."""
|
||||
with DatabaseManager.get_session() as session:
|
||||
book_id = self.book_overview.id
|
||||
book = session.query(Book).filter(Book.id == book_id).one_or_none()
|
||||
|
||||
if book:
|
||||
editor = BookEditor(book)
|
||||
if editor.exec() == QDialog.DialogCode.Accepted:
|
||||
updated_data = editor.parse_inputs()
|
||||
self.refresh(updated_data)
|
||||
else:
|
||||
QMessageBox.critical(self,
|
||||
"Error",
|
||||
"The book you requested could not be found. Try again later",
|
||||
QMessageBox.StandardButton.Ok)
|
||||
|
||||
def refresh(self, updated_data):
|
||||
"""Updates the card's data and refreshes the display."""
|
||||
self.book_overview.title = updated_data["title"]
|
||||
self.book_overview.author_name = f"{updated_data['author']['first_name']} {updated_data['author']['last_name']}"
|
||||
self.book_overview.isbn = updated_data["isbn"]
|
||||
self.book_overview.status = BookStatusEnum(updated_data["status"])
|
||||
self.book_overview.price = updated_data["price"]
|
||||
self.book_overview.is_damaged = updated_data["is_damaged"]
|
||||
self.update_display()
|
||||
|
||||
def delete_book(self):
|
||||
if not self.make_sure():
|
||||
return
|
||||
|
||||
try:
|
||||
delete_book(self.book_overview.id)
|
||||
self.setVisible(False)
|
||||
except DatabaseConnectionError as e:
|
||||
QMessageBox.critical(None, "Failed", "Connection with database failed", QMessageBox.StandardButton.Ok)
|
||||
except DatabaseError as e:
|
||||
QMessageBox.critical(None, "Failed", f"An error occurred when deleting book: {e}", QMessageBox.StandardButton.Ok)
|
||||
|
||||
def make_sure(self) -> bool:
|
||||
are_you_sure_box = QMessageBox()
|
||||
are_you_sure_box.setIcon(QMessageBox.Question)
|
||||
are_you_sure_box.setWindowTitle("Are you sure?")
|
||||
are_you_sure_box.setText(f"Are you sure you want to delete {self.book_overview.title}?")
|
||||
are_you_sure_box.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
|
||||
are_you_sure_box.setDefaultButton(QMessageBox.No)
|
||||
|
||||
# Show the message box and capture the user's response
|
||||
response = are_you_sure_box.exec()
|
||||
|
||||
# Handle the response
|
||||
return response == QMessageBox.Yes
|
||||
|
||||
__all__ = ["BookCard"]
|
@ -9,13 +9,15 @@ from .book_card import BookCard
|
||||
from models import BooksOverview
|
||||
|
||||
from database.manager import DatabaseManager
|
||||
from database.book_overview import fetch_all_book_overviews
|
||||
|
||||
from ui.editor import MemberEditor
|
||||
|
||||
|
||||
class LibraryDashboard(QWidget):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
class BookOverviewList(QWidget):
|
||||
def __init__(self, parent = None):
|
||||
self.parent = parent
|
||||
super().__init__(parent=parent)
|
||||
|
||||
# Central widget and layout
|
||||
main_layout = QVBoxLayout(self)
|
||||
@ -59,10 +61,6 @@ class LibraryDashboard(QWidget):
|
||||
register_member_button.clicked.connect(self.register_member)
|
||||
button_layout.addWidget(register_member_button)
|
||||
|
||||
add_borrow_record_button = QPushButton("Add Borrow Record")
|
||||
add_borrow_record_button.clicked.connect(self.add_borrow_record)
|
||||
button_layout.addWidget(add_borrow_record_button)
|
||||
|
||||
main_layout.addLayout(button_layout)
|
||||
|
||||
def filter_books(self, text):
|
||||
@ -77,6 +75,7 @@ class LibraryDashboard(QWidget):
|
||||
|
||||
def register_member(self):
|
||||
MemberEditor().exec()
|
||||
self.parent.refresh_member_cards()
|
||||
|
||||
def add_borrow_record(self):
|
||||
QMessageBox.information(self, "Add Borrow Record",
|
||||
@ -98,7 +97,7 @@ class LibraryDashboard(QWidget):
|
||||
self.clear_layout(self.scroll_layout)
|
||||
self.book_cards = []
|
||||
|
||||
self.books = self.fetch_books_from_db()
|
||||
self.books = fetch_all_book_overviews()
|
||||
|
||||
for book in self.books:
|
||||
card = BookCard(book)
|
||||
@ -106,13 +105,5 @@ class LibraryDashboard(QWidget):
|
||||
self.scroll_layout.addWidget(card)
|
||||
self.book_cards.append(card)
|
||||
|
||||
def fetch_books_from_db(self):
|
||||
"""Fetch all books from the database."""
|
||||
try:
|
||||
with DatabaseManager.get_session() as session:
|
||||
books = session.query(BooksOverview).all()
|
||||
return books
|
||||
except Exception as e:
|
||||
QMessageBox.critical(self, "Database Error",
|
||||
f"Failed to fetch books: {e}")
|
||||
return []
|
||||
|
||||
__all__ = ["BookOverviewList"]
|
@ -0,0 +1,7 @@
|
||||
from .category_overview_list import *
|
||||
from .category_overview_card import *
|
||||
|
||||
__all__ = [
|
||||
*category_overview_list.__all__,
|
||||
*category_overview_card.__all__
|
||||
]
|
@ -0,0 +1,58 @@
|
||||
from PySide6.QtGui import QGuiApplication, QAction, Qt
|
||||
from PySide6.QtQml import QQmlApplicationEngine
|
||||
from PySide6.QtWidgets import QHBoxLayout, QVBoxLayout, QLabel, QWidget, QMenu, QSizePolicy, QLayout, QMessageBox
|
||||
from PySide6.QtCore import qDebug
|
||||
|
||||
from ui.editor import BookEditor
|
||||
|
||||
from models import BookCategoryStatisticsOverview
|
||||
|
||||
from database.manager import DatabaseManager
|
||||
|
||||
|
||||
class BookCategoryStatisticsOverviewCard(QWidget):
|
||||
def __init__(self, book_category_statistics_overview: BookCategoryStatisticsOverview):
|
||||
super().__init__()
|
||||
|
||||
self.book_category_statistics_overview = book_category_statistics_overview
|
||||
|
||||
self.setAttribute(Qt.WidgetAttribute.WA_Hover, True) # Enable hover events
|
||||
# Enable styling for background
|
||||
self.setAttribute(Qt.WidgetAttribute.WA_StyledBackground, True)
|
||||
|
||||
# Set initial stylesheet with hover behavior
|
||||
self.setStyleSheet("""
|
||||
BookCategoryStatisticsOverviewCard:hover {
|
||||
background-color: palette(highlight);
|
||||
}
|
||||
""")
|
||||
|
||||
# Layout setup
|
||||
layout = QHBoxLayout(self)
|
||||
layout.setSizeConstraint(QLayout.SizeConstraint.SetMinimumSize)
|
||||
self.setSizePolicy(QSizePolicy.Policy.Preferred,
|
||||
QSizePolicy.Policy.Fixed)
|
||||
|
||||
layout.setContentsMargins(10, 10, 10, 10)
|
||||
layout.setSpacing(10)
|
||||
|
||||
# Left-side content
|
||||
left_side = QVBoxLayout()
|
||||
layout.addLayout(left_side)
|
||||
category_name_label = QLabel(book_category_statistics_overview.name)
|
||||
category_name_label.setStyleSheet("font-size: 20px; font-weight: bold;")
|
||||
left_side.addWidget(category_name_label)
|
||||
|
||||
# Right-side content
|
||||
right_side = QVBoxLayout()
|
||||
layout.addLayout(right_side)
|
||||
|
||||
status_label = QLabel(str(book_category_statistics_overview.book_count))
|
||||
status_label.setStyleSheet("font-size: 20px; font-weight: bold;")
|
||||
status_label.setAlignment(Qt.AlignmentFlag.AlignRight)
|
||||
|
||||
right_side.addWidget(status_label)
|
||||
|
||||
self.setLayout(layout)
|
||||
|
||||
__all__ = ["BookCategoryStatisticsOverviewCard"]
|
@ -0,0 +1,89 @@
|
||||
from PySide6.QtGui import QAction
|
||||
from PySide6.QtWidgets import (
|
||||
QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QScrollArea,
|
||||
QFrame, QPushButton, QMessageBox, QVBoxLayout
|
||||
)
|
||||
from PySide6.QtCore import Qt
|
||||
|
||||
from .category_overview_card import BookCategoryStatisticsOverviewCard
|
||||
from models import BookCategoryStatisticsOverview
|
||||
|
||||
from database.manager import DatabaseManager
|
||||
from database import fetch_all_book_category_statistics_overviews
|
||||
|
||||
class BookCategoryStatisticsOverview(QWidget):
|
||||
def __init__(self, parent = None):
|
||||
self.parent = parent
|
||||
super().__init__(parent=parent)
|
||||
|
||||
# Central widget and layout
|
||||
main_layout = QVBoxLayout(self)
|
||||
|
||||
# Title label
|
||||
title_label = QLabel("Category statistics", self)
|
||||
title_label.setAlignment(Qt.AlignCenter)
|
||||
title_label.setStyleSheet(
|
||||
"font-size: 20px; font-weight: bold; color: #0078D4;")
|
||||
main_layout.addWidget(title_label)
|
||||
|
||||
# Search bar
|
||||
self.search_input = QLineEdit()
|
||||
self.search_input.setPlaceholderText("Search in categories...")
|
||||
self.search_input.textChanged.connect(self.filter_categories)
|
||||
main_layout.addWidget(self.search_input)
|
||||
|
||||
# Scrollable area for cards
|
||||
self.scroll_area = QScrollArea()
|
||||
self.scroll_area.setWidgetResizable(True)
|
||||
|
||||
# Container widget for the scroll area
|
||||
self.scroll_widget = QWidget()
|
||||
self.scroll_layout = QVBoxLayout(self.scroll_widget)
|
||||
self.scroll_layout.setSpacing(5) # Set gap between individual cards
|
||||
self.scroll_layout.setContentsMargins(0, 0, 0, 0) # Remove spacing from all sides which is present by default
|
||||
|
||||
# Align the cards to the top
|
||||
self.scroll_layout.setAlignment(Qt.AlignTop)
|
||||
self.category_overviews = []
|
||||
self.category_overview_cards = []
|
||||
self.redraw_cards()
|
||||
|
||||
self.scroll_widget.setLayout(self.scroll_layout)
|
||||
self.scroll_area.setWidget(self.scroll_widget)
|
||||
main_layout.addWidget(self.scroll_area)
|
||||
|
||||
def filter_categories(self, text):
|
||||
"""Filter the cards based on the search input."""
|
||||
for card, category in zip(self.category_overview_cards, self.category_overviews):
|
||||
|
||||
name_contains_text = text.lower() in category.name.lower()
|
||||
|
||||
card.setVisible(name_contains_text)
|
||||
|
||||
|
||||
def clear_layout(self, layout):
|
||||
while layout.count():
|
||||
item = layout.takeAt(0)
|
||||
widget = item.widget()
|
||||
if widget is not None:
|
||||
widget.deleteLater()
|
||||
else:
|
||||
sub_layout = item.layout()
|
||||
if sub_layout is not None:
|
||||
self.clear_layout(sub_layout)
|
||||
del item
|
||||
|
||||
def redraw_cards(self):
|
||||
self.clear_layout(self.scroll_layout)
|
||||
self.category_overview_cards = []
|
||||
|
||||
self.category_overviews = fetch_all_book_category_statistics_overviews()
|
||||
|
||||
for category in self.category_overviews:
|
||||
card = BookCategoryStatisticsOverviewCard(category)
|
||||
|
||||
self.scroll_layout.addWidget(card)
|
||||
self.category_overview_cards.append(card)
|
||||
|
||||
|
||||
__all__ = ["BookCategoryStatisticsOverview"]
|
7
src/ui/main_tabs/member_list/__init__.py
Normal file
7
src/ui/main_tabs/member_list/__init__.py
Normal file
@ -0,0 +1,7 @@
|
||||
from .member_list import *
|
||||
from .member_card import *
|
||||
|
||||
__all__ = [
|
||||
*member_list.__all__,
|
||||
*member_card.__all__
|
||||
]
|
@ -2,11 +2,11 @@ from PySide6.QtWidgets import (
|
||||
QHBoxLayout, QVBoxLayout, QLabel, QWidget, QMenu, QSizePolicy, QLayout, QMessageBox
|
||||
)
|
||||
from PySide6.QtGui import Qt
|
||||
from PySide6.QtCore import qDebug
|
||||
|
||||
from models import Member, MemberStatusEnum
|
||||
from database.manager import DatabaseManager
|
||||
from sqlalchemy import delete
|
||||
from database import delete_member
|
||||
from ui.editor import MemberEditor
|
||||
from utils.errors import DatabaseConnectionError, DatabaseError
|
||||
|
||||
STATUS_TO_COLOR_MAP = {
|
||||
MemberStatusEnum.active: "#3c702e",
|
||||
@ -38,32 +38,43 @@ class MemberCard(QWidget):
|
||||
layout.setContentsMargins(10, 10, 10, 10)
|
||||
layout.setSpacing(10)
|
||||
|
||||
# Initialize UI components
|
||||
self.name_label = QLabel()
|
||||
self.email_label = QLabel()
|
||||
self.phone_label = QLabel()
|
||||
self.status_label = QLabel()
|
||||
self.register_date_label = QLabel()
|
||||
|
||||
# Left-side content
|
||||
left_side = QVBoxLayout()
|
||||
layout.addLayout(left_side)
|
||||
name_label = QLabel(f"{member.first_name} {member.last_name}")
|
||||
name_label.setStyleSheet("font-size: 20px; font-weight: bold;")
|
||||
email_label = QLabel(f"Email: {member.email}")
|
||||
phone_label = QLabel(f"Phone: {member.phone or 'Not Available'}")
|
||||
left_side.addWidget(name_label)
|
||||
left_side.addWidget(email_label)
|
||||
left_side.addWidget(phone_label)
|
||||
left_side.addWidget(self.name_label)
|
||||
left_side.addWidget(self.email_label)
|
||||
left_side.addWidget(self.phone_label)
|
||||
|
||||
# Right-side content
|
||||
right_side = QVBoxLayout()
|
||||
layout.addLayout(right_side)
|
||||
|
||||
status_label = QLabel(str(member.status.value.capitalize()))
|
||||
status_label.setStyleSheet(f"color: {STATUS_TO_COLOR_MAP[member.status]}; font-size: 20px; font-weight: bold;")
|
||||
status_label.setAlignment(Qt.AlignmentFlag.AlignRight)
|
||||
|
||||
register_date_label = QLabel(f"Registered: {member.register_date}")
|
||||
register_date_label.setAlignment(Qt.AlignmentFlag.AlignRight)
|
||||
|
||||
right_side.addWidget(status_label)
|
||||
right_side.addWidget(register_date_label)
|
||||
right_side.addWidget(self.status_label)
|
||||
right_side.addWidget(self.register_date_label)
|
||||
|
||||
self.setLayout(layout)
|
||||
self.update_display()
|
||||
|
||||
def update_display(self):
|
||||
"""Refreshes the display of the member card based on its current data."""
|
||||
self.name_label.setText(f"{self.member.first_name} {self.member.last_name}")
|
||||
self.name_label.setStyleSheet("font-size: 20px; font-weight: bold;")
|
||||
|
||||
self.email_label.setText(f"Email: {self.member.email}")
|
||||
self.phone_label.setText(f"Phone: {self.member.phone or 'Not Available'}")
|
||||
|
||||
self.status_label.setText(str(self.member.status.value.capitalize()))
|
||||
self.status_label.setStyleSheet(f"color: {STATUS_TO_COLOR_MAP[self.member.status]}; font-size: 20px; font-weight: bold;")
|
||||
self.status_label.setAlignment(Qt.AlignmentFlag.AlignRight)
|
||||
|
||||
self.register_date_label.setText(f"Registered: {self.member.register_date}")
|
||||
self.register_date_label.setAlignment(Qt.AlignmentFlag.AlignRight)
|
||||
|
||||
def mousePressEvent(self, event):
|
||||
if event.button() == Qt.MouseButton.LeftButton:
|
||||
@ -89,26 +100,24 @@ class MemberCard(QWidget):
|
||||
action = context_menu.exec_(self.mapToGlobal(event.pos()))
|
||||
|
||||
if action == action_edit_member:
|
||||
print("Edit Member selected") # Implement editor logic here
|
||||
editor = MemberEditor(self.member, refresh_callback=self.refresh)
|
||||
editor.exec()
|
||||
elif action == action_deactivate_member:
|
||||
self.update_member_status(MemberStatusEnum.inactive)
|
||||
elif action == action_activate_member:
|
||||
self.update_member_status(MemberStatusEnum.active)
|
||||
|
||||
def delete_member(self):
|
||||
self.make_sure()
|
||||
# if not self.make_sure():
|
||||
# return
|
||||
|
||||
# with DatabaseManager.get_session() as session:
|
||||
# try:
|
||||
# stmt = delete(Member).where(Member.id == self.member.id)
|
||||
# session.execute(stmt)
|
||||
# session.commit()
|
||||
# self.setVisible(False)
|
||||
# except Exception as e:
|
||||
# session.rollback()
|
||||
# print(e)
|
||||
if not self.make_sure():
|
||||
return
|
||||
|
||||
try:
|
||||
delete_member(self.member.id)
|
||||
self.setVisible(False)
|
||||
except DatabaseConnectionError as e:
|
||||
QMessageBox.critical(None, "Failed", "Connection with database failed", QMessageBox.StandardButton.Ok)
|
||||
except DatabaseError as e:
|
||||
QMessageBox.critical(None, "Failed", f"An error occurred when deleting member: {e}", QMessageBox.StandardButton.Ok)
|
||||
|
||||
def update_member_status(self, new_status):
|
||||
with DatabaseManager.get_session() as session:
|
||||
@ -119,17 +128,20 @@ class MemberCard(QWidget):
|
||||
session.commit()
|
||||
QMessageBox.information(self, "Status Updated", f"Member status updated to {new_status.value.capitalize()}.")
|
||||
self.member.status = new_status
|
||||
self.update_status_label()
|
||||
self.update_display()
|
||||
else:
|
||||
QMessageBox.critical(self, "Error", "The member you requested could not be found.", QMessageBox.StandardButton.Ok)
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
print(e)
|
||||
|
||||
def update_status_label(self):
|
||||
self.findChild(QLabel, self.member.status.value).setStyleSheet(
|
||||
f"color: {STATUS_TO_COLOR_MAP[self.member.status]}; font-size: 20px; font-weight: bold;"
|
||||
)
|
||||
def refresh(self, updated_data):
|
||||
"""Updates the card's data and refreshes the display."""
|
||||
self.member.first_name = updated_data["first_name"]
|
||||
self.member.last_name = updated_data["last_name"]
|
||||
self.member.email = updated_data["email"]
|
||||
self.member.phone = updated_data["phone"]
|
||||
self.update_display()
|
||||
|
||||
def make_sure(self) -> bool:
|
||||
are_you_sure_box = QMessageBox()
|
||||
@ -141,3 +153,5 @@ class MemberCard(QWidget):
|
||||
|
||||
response = are_you_sure_box.exec()
|
||||
return response == QMessageBox.Yes
|
||||
|
||||
__all__ = ["MemberCard"]
|
@ -57,27 +57,19 @@ class MemberList(QWidget):
|
||||
register_member_button.clicked.connect(self.register_member)
|
||||
button_layout.addWidget(register_member_button)
|
||||
|
||||
delete_member_button = QPushButton("Delete Member")
|
||||
delete_member_button.clicked.connect(self.delete_member)
|
||||
button_layout.addWidget(delete_member_button)
|
||||
|
||||
main_layout.addLayout(button_layout)
|
||||
|
||||
def filter_members(self, text):
|
||||
"""Filter the cards based on the search input."""
|
||||
for card, member in zip(self.member_cards, self.members):
|
||||
name_contains_text = text.lower() in member.name.lower()
|
||||
id_contains_text = text.lower() in str(member.id)
|
||||
first_name_contains_text = text.lower() in member.first_name.lower()
|
||||
last_name_contains_text = text.lower() in member.last_name.lower()
|
||||
|
||||
card.setVisible(name_contains_text or id_contains_text)
|
||||
card.setVisible(first_name_contains_text or last_name_contains_text)
|
||||
|
||||
def register_member(self):
|
||||
MemberEditor().exec()
|
||||
|
||||
def delete_member(self):
|
||||
QMessageBox.information(self, "Delete Member",
|
||||
"Open dialog to delete a member.")
|
||||
|
||||
def clear_layout(self, layout):
|
||||
while layout.count():
|
||||
item = layout.takeAt(0)
|
||||
@ -112,3 +104,6 @@ class MemberList(QWidget):
|
||||
QMessageBox.critical(self, "Database Error",
|
||||
f"Failed to fetch members: {e}")
|
||||
return []
|
||||
|
||||
|
||||
__all__ = ["MemberList"]
|
172
src/ui/menu_bar.py
Normal file
172
src/ui/menu_bar.py
Normal file
@ -0,0 +1,172 @@
|
||||
from PySide6.QtGui import QAction
|
||||
from PySide6.QtWidgets import QMessageBox, QFileDialog, QMenuBar, QMenu, QDialog
|
||||
from PySide6.QtCore import QStandardPaths
|
||||
|
||||
from ui.settings import SettingsDialog
|
||||
from ui.import_preview import PreviewDialog
|
||||
from ui.editor import BookEditor, MemberEditor
|
||||
|
||||
from utils.errors import ExportError, ExportFileError, InvalidContentsError
|
||||
from services import book_service, book_overview_service, book_category_statistics_service
|
||||
|
||||
|
||||
class MenuBar(QMenuBar):
|
||||
def __init__(self, parent):
|
||||
super().__init__(parent)
|
||||
|
||||
self.parent = parent
|
||||
self.file_types = {
|
||||
"XML files (*.xml)": ".xml",
|
||||
"Any file type (*)": ""
|
||||
}
|
||||
|
||||
self.create_file_menu()
|
||||
self.create_edit_menu()
|
||||
self.create_help_menu()
|
||||
|
||||
def create_file_menu(self):
|
||||
# File menu
|
||||
file_menu = self.addMenu("File")
|
||||
|
||||
# New submenu
|
||||
new_submenu = QMenu("New", self)
|
||||
file_menu.addMenu(new_submenu)
|
||||
|
||||
# New book action
|
||||
new_book_action = QAction("New book", self)
|
||||
new_book_action.triggered.connect(self.new_book)
|
||||
new_submenu.addAction(new_book_action)
|
||||
|
||||
# New member action
|
||||
new_member_action = QAction("New member", self)
|
||||
new_member_action.triggered.connect(self.new_member)
|
||||
new_submenu.addAction(new_member_action)
|
||||
|
||||
# Import submenu
|
||||
import_submenu = QMenu("Import", self)
|
||||
file_menu.addMenu(import_submenu)
|
||||
|
||||
import_books_action = QAction("Import books", self)
|
||||
import_books_action.triggered.connect(self.import_books)
|
||||
import_submenu.addAction(import_books_action)
|
||||
|
||||
# Export submenu
|
||||
export_submenu = QMenu("Export", self)
|
||||
file_menu.addMenu(export_submenu)
|
||||
|
||||
export_books_action = QAction("Export books", self)
|
||||
export_books_action.triggered.connect(self.export_books)
|
||||
export_submenu.addAction(export_books_action)
|
||||
|
||||
export_category_statistics = QAction("Export category statistics", self)
|
||||
export_category_statistics.triggered.connect(self.export_category_statistics)
|
||||
export_submenu.addAction(export_category_statistics)
|
||||
|
||||
file_menu.addSeparator()
|
||||
|
||||
exit_action = QAction("Exit", self)
|
||||
exit_action.setShortcut("Ctrl+Q")
|
||||
exit_action.triggered.connect(self.parent.close)
|
||||
file_menu.addAction(exit_action)
|
||||
|
||||
def create_edit_menu(self):
|
||||
# Edit menu
|
||||
edit_menu = self.addMenu("Edit")
|
||||
|
||||
# Preferences menu
|
||||
preferences_action = QAction("Preferences", self)
|
||||
preferences_action.setShortcut("Ctrl+,")
|
||||
preferences_action.triggered.connect(self.edit_preferences)
|
||||
edit_menu.addAction(preferences_action)
|
||||
|
||||
def create_help_menu(self):
|
||||
# Help menu
|
||||
help_menu = self.addMenu("Help")
|
||||
|
||||
about_action = QAction("About", self)
|
||||
about_action.triggered.connect(self.about)
|
||||
help_menu.addAction(about_action)
|
||||
|
||||
def edit_preferences(self):
|
||||
SettingsDialog(parent=self).exec()
|
||||
|
||||
def new_book(self):
|
||||
BookEditor().exec()
|
||||
self.parent.refresh_book_cards()
|
||||
|
||||
def new_member(self):
|
||||
MemberEditor().exec()
|
||||
self.parent.refresh_member_cards()
|
||||
|
||||
def import_books(self):
|
||||
self.import_data("Book", None, book_service)
|
||||
|
||||
def export_books(self):
|
||||
self.export_data("Book", book_service)
|
||||
|
||||
|
||||
def export_category_statistics(self):
|
||||
self.export_data("Category statistics", book_category_statistics_service)
|
||||
|
||||
def about(self):
|
||||
QMessageBox.information(
|
||||
self, "About", "Library app demonstrating the phantom read problem")
|
||||
|
||||
def import_data(self, import_name: str, preview_dialog, service):
|
||||
try:
|
||||
home_dir = QStandardPaths.writableLocation(QStandardPaths.HomeLocation)
|
||||
file_path, _ = QFileDialog.getOpenFileName(self, "Choose import file", home_dir, ";;".join(self.file_types.keys()))
|
||||
|
||||
if not file_path:
|
||||
return # User canceled
|
||||
|
||||
parsed_data = service.parse_from_xml(file_path)
|
||||
if not parsed_data:
|
||||
QMessageBox.warning(self, f"No New {import_name}s", f"No new {import_name}s to import.", QMessageBox.Ok)
|
||||
return
|
||||
|
||||
# Show preview dialog
|
||||
dialog = PreviewDialog(parsed_data, self)
|
||||
if dialog.exec() == QDialog.Accepted:
|
||||
# User confirmed, proceed with importing
|
||||
book_service.create_books(parsed_data)
|
||||
QMessageBox.information(self, "Success", "Books imported successfully!", QMessageBox.Ok)
|
||||
self.parent.refresh_book_cards()
|
||||
else:
|
||||
QMessageBox.information(self, "Canceled", "Import was canceled.", QMessageBox.Ok)
|
||||
except InvalidContentsError as e:
|
||||
QMessageBox.critical(self,
|
||||
"Invalid file",
|
||||
"The file you selected is invalid",
|
||||
QMessageBox.StandardButton.Ok)
|
||||
except ImportError as e:
|
||||
QMessageBox.critical(self,
|
||||
"Error importing books",
|
||||
f"An error occurred when importing books from the file provided: {e}",
|
||||
QMessageBox.StandardButton.Ok)
|
||||
|
||||
def export_data(self, export_name: str, service):
|
||||
try:
|
||||
home_dir = QStandardPaths.writableLocation(QStandardPaths.HomeLocation)
|
||||
file_path, selected_filter = QFileDialog.getSaveFileName(self,
|
||||
f"Save {export_name} export",
|
||||
home_dir,
|
||||
";;".join(self.file_types.keys()))
|
||||
if file_path:
|
||||
selected_filetype = self.file_types[selected_filter]
|
||||
|
||||
if file_path.endswith(selected_filetype):
|
||||
selected_filetype = ""
|
||||
|
||||
service.export_to_xml(file_path + selected_filetype)
|
||||
|
||||
except ExportFileError as e:
|
||||
QMessageBox.critical(self,
|
||||
"Error saving file",
|
||||
f"Error occurred when saving the exported data: {e}",
|
||||
QMessageBox.StandardButton.Ok)
|
||||
except ExportError as e:
|
||||
QMessageBox.critical(self,
|
||||
f"Error exporting {export_name}s",
|
||||
f"An error occurred when exporting {export_name}s: {e}",
|
||||
QMessageBox.StandardButton.Ok)
|
@ -24,37 +24,26 @@ class SettingsDialog(QDialog):
|
||||
self.data_mode_label = QtWidgets.QLabel(UserConfig.get_friendly_name("transaction_level") + ":")
|
||||
data_mode_layout.addWidget(self.data_mode_label)
|
||||
|
||||
self.data_mode_dropdown = QtWidgets.QComboBox()
|
||||
for tl in TransactionLevel:
|
||||
self.data_mode_dropdown.addItem(tl.name.capitalize(), tl)
|
||||
self.data_mode_dropdown.setCurrentIndex(
|
||||
list(TransactionLevel).index(self.user_config.transaction_level)
|
||||
)
|
||||
self.data_mode_selected = QtWidgets.QLabel(self.user_config.transaction_level.name.capitalize())
|
||||
data_mode_layout.addWidget(self.data_mode_selected)
|
||||
|
||||
data_mode_layout.addWidget(self.data_mode_dropdown)
|
||||
|
||||
# Slowdown simulation
|
||||
self.slowdown_layout = QtWidgets.QHBoxLayout()
|
||||
|
||||
self.slowdown_label = QtWidgets.QLabel(UserConfig.get_friendly_name("simulate_slowdown") + ":")
|
||||
data_mode_layout.addWidget(self.slowdown_label)
|
||||
self.slowdown_layout.addWidget(self.slowdown_label)
|
||||
|
||||
self.slowdown_checkbox = QtWidgets.QCheckBox()
|
||||
self.slowdown_checkbox.setChecked(self.user_config.simulate_slowdown)
|
||||
|
||||
data_mode_layout.addWidget(self.slowdown_checkbox)
|
||||
self.slowdown_layout.addWidget(self.slowdown_checkbox)
|
||||
|
||||
layout.addLayout(data_mode_layout)
|
||||
|
||||
# Set the currently selected mode to the mode in UserConfig
|
||||
config = UserConfig()
|
||||
|
||||
# Transaction level
|
||||
current_level = config.transaction_level
|
||||
index = self.data_mode_dropdown.findData(current_level)
|
||||
if index != -1:
|
||||
self.data_mode_dropdown.setCurrentIndex(index)
|
||||
layout.addLayout(self.slowdown_layout)
|
||||
|
||||
# Slowdown simulation
|
||||
simulate_slowdown = config.simulate_slowdown
|
||||
simulate_slowdown = self.user_config.simulate_slowdown
|
||||
self.slowdown_checkbox.setChecked(simulate_slowdown)
|
||||
|
||||
# Buttons
|
||||
@ -71,15 +60,8 @@ class SettingsDialog(QDialog):
|
||||
layout.addLayout(button_layout)
|
||||
|
||||
def save_settings(self):
|
||||
data_mode = self.data_mode_dropdown.currentData()
|
||||
simulate_slowdown = self.slowdown_checkbox.isChecked()
|
||||
try:
|
||||
self.logger.debug("Saving user configuration")
|
||||
config = UserConfig()
|
||||
config.transaction_level = data_mode
|
||||
config.simulate_slowdown = simulate_slowdown
|
||||
self.accept()
|
||||
except TypeError as e:
|
||||
self.logger.error("Invalid user configuration found")
|
||||
QMessageBox.critical(None, "Invalid config detected", "Double check your configuration", QMessageBox.StandardButton.Ok, QMessageBox.StandardBUttons.NoButton)
|
||||
|
||||
self.logger.debug("Saving user configuration")
|
||||
config = UserConfig()
|
||||
config.simulate_slowdown = simulate_slowdown
|
||||
self.accept()
|
||||
|
207
src/ui/window.py
207
src/ui/window.py
@ -1,25 +1,11 @@
|
||||
from PySide6.QtGui import QGuiApplication, QAction, QIcon
|
||||
from PySide6.QtQml import QQmlApplicationEngine
|
||||
from PySide6 import QtWidgets, QtCore
|
||||
from PySide6.QtWidgets import QMessageBox, QFileDialog
|
||||
from PySide6.QtCore import QStandardPaths
|
||||
from PySide6.QtGui import QGuiApplication, QIcon
|
||||
from PySide6.QtWidgets import QMainWindow, QApplication, QTabWidget
|
||||
|
||||
from ui.dashboard.dashboard import LibraryDashboard
|
||||
from ui.main_window_tabs.member_list.member_list import MemberList
|
||||
from ui.editor import BookEditor, MemberEditor
|
||||
|
||||
from ui.settings import SettingsDialog
|
||||
|
||||
from ui.import_preview import PreviewDialog
|
||||
|
||||
from export.book_exporter import BookExporter
|
||||
from importer.book.book_importer import BookImporter
|
||||
|
||||
from utils.errors.export_error import ExportError
|
||||
from utils.errors.import_error.import_error import ImportError
|
||||
from ui.main_tabs import BookOverviewList, MemberList, BookCategoryStatisticsOverview
|
||||
from ui.menu_bar import MenuBar
|
||||
|
||||
|
||||
class LibraryWindow(QtWidgets.QMainWindow):
|
||||
class LibraryWindow(QMainWindow):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
@ -30,26 +16,22 @@ class LibraryWindow(QtWidgets.QMainWindow):
|
||||
|
||||
self.center_window()
|
||||
|
||||
# Set up menu bar
|
||||
self.create_menu_bar()
|
||||
self.setMenuBar(MenuBar(self))
|
||||
|
||||
# Central widget and layout
|
||||
central_widget = QtWidgets.QTabWidget()
|
||||
central_widget = QTabWidget()
|
||||
self.setCentralWidget(central_widget)
|
||||
|
||||
|
||||
self.dashboard = LibraryDashboard()
|
||||
self.member_list = MemberList()
|
||||
self.dashboard = BookOverviewList(self)
|
||||
self.member_list = MemberList()
|
||||
self.category_statistics_overview_list = BookCategoryStatisticsOverview()
|
||||
central_widget.addTab(self.dashboard, "Dashboard")
|
||||
central_widget.addTab(self.member_list, "Members")
|
||||
|
||||
self.file_types = {
|
||||
"XML files (*.xml)": ".xml",
|
||||
"Any file type (*)": ""}
|
||||
central_widget.addTab(self.category_statistics_overview_list, "Category stats")
|
||||
|
||||
def center_window(self):
|
||||
# Get the screen geometry
|
||||
screen = QtWidgets.QApplication.primaryScreen()
|
||||
screen = QApplication.primaryScreen()
|
||||
screen_geometry = screen.geometry()
|
||||
|
||||
# Get the dimensions of the window
|
||||
@ -64,164 +46,9 @@ class LibraryWindow(QtWidgets.QMainWindow):
|
||||
# Move the window to the calculated geometry
|
||||
self.move(window_geometry.topLeft())
|
||||
|
||||
def create_menu_bar(self):
|
||||
# Create the menu bar
|
||||
menu_bar = self.menuBar()
|
||||
def refresh_book_cards(self):
|
||||
self.dashboard.redraw_cards()
|
||||
self.category_statistics_overview_list.redraw_cards()
|
||||
|
||||
# File menu
|
||||
file_menu = menu_bar.addMenu("File")
|
||||
|
||||
# New submenu
|
||||
new_submenu = QtWidgets.QMenu(self)
|
||||
new_submenu.setTitle("New")
|
||||
file_menu.addMenu(new_submenu)
|
||||
|
||||
# New book action
|
||||
new_book_action = QAction("New book", self)
|
||||
new_book_action.triggered.connect(self.new_book)
|
||||
new_submenu.addAction(new_book_action)
|
||||
|
||||
# New book action
|
||||
new_member_action = QAction("New member", self)
|
||||
new_member_action.triggered.connect(self.new_member)
|
||||
new_submenu.addAction(new_member_action)
|
||||
|
||||
# Import submenu
|
||||
import_submenu = QtWidgets.QMenu(self)
|
||||
import_submenu.setTitle("Import")
|
||||
file_menu.addMenu(import_submenu)
|
||||
|
||||
import_books_action = QAction("Import books", self)
|
||||
import_books_action.triggered.connect(self.import_books)
|
||||
import_submenu.addAction(import_books_action)
|
||||
|
||||
import_members_action = QAction("Import members", self)
|
||||
import_members_action.triggered.connect(self.import_data)
|
||||
import_submenu.addAction(import_members_action)
|
||||
|
||||
# Export submenu
|
||||
export_submenu = QtWidgets.QMenu(self)
|
||||
export_submenu.setTitle("Export")
|
||||
file_menu.addMenu(export_submenu)
|
||||
|
||||
# Export overview
|
||||
export_overview_action = QAction("Export overview", self)
|
||||
export_overview_action.triggered.connect(self.export_data)
|
||||
export_submenu.addAction(export_overview_action)
|
||||
|
||||
# Export books
|
||||
export_books_action = QAction("Export books", self)
|
||||
export_books_action.triggered.connect(self.export_books)
|
||||
export_submenu.addAction(export_books_action)
|
||||
|
||||
# Export members
|
||||
export_members_action = QAction("Export members", self)
|
||||
export_members_action.triggered.connect(self.export_data)
|
||||
export_submenu.addAction(export_members_action)
|
||||
|
||||
file_menu.addSeparator()
|
||||
|
||||
exit_action = QAction("Exit", self)
|
||||
exit_action.setShortcut("Ctrl+Q")
|
||||
exit_action.triggered.connect(self.close)
|
||||
file_menu.addAction(exit_action)
|
||||
|
||||
# Edit menu
|
||||
edit_menu = menu_bar.addMenu("Edit")
|
||||
|
||||
# Preferences menu
|
||||
preferences_action = QAction("Preferences", self)
|
||||
preferences_action.setShortcut("Ctrl+,")
|
||||
preferences_action.triggered.connect(self.edit_preferences)
|
||||
edit_menu.addAction(preferences_action)
|
||||
|
||||
# Help menu
|
||||
help_menu = menu_bar.addMenu("Help")
|
||||
about_action = QAction("About", self)
|
||||
about_action.triggered.connect(self.about)
|
||||
help_menu.addAction(about_action)
|
||||
|
||||
# Menu action slots
|
||||
def edit_preferences(self):
|
||||
SettingsDialog(parent=self).exec()
|
||||
|
||||
# region Menu Actions
|
||||
|
||||
def export_books(self):
|
||||
try:
|
||||
home_dir = QStandardPaths.writableLocation(
|
||||
QStandardPaths.HomeLocation)
|
||||
file_path, selected_filter = QFileDialog.getSaveFileName(self,
|
||||
"Save book export",
|
||||
home_dir,
|
||||
";;".join(self.file_types.keys()))
|
||||
if file_path:
|
||||
selected_filetype = self.file_types[selected_filter]
|
||||
|
||||
if file_path.endswith(selected_filetype):
|
||||
selected_filetype = ""
|
||||
|
||||
book_exporter = BookExporter()
|
||||
book_exporter.save_xml(file_path + selected_filetype)
|
||||
|
||||
except OSError as e:
|
||||
QMessageBox.critical(self, "Error saving file", f"Error occurred when saving the exported data: {
|
||||
e}", QMessageBox.StandardButton.Ok, QMessageBox.StandardButton.NoButton)
|
||||
except ExportError as e:
|
||||
QMessageBox.critical(self, "Error exporting books", f"An error occurred when exporting books: {
|
||||
e}", QMessageBox.StandardButton.Ok, QMessageBox.StandardButton.NoButton)
|
||||
|
||||
def import_books(self):
|
||||
try:
|
||||
home_dir = QStandardPaths.writableLocation(
|
||||
QStandardPaths.HomeLocation)
|
||||
file_path, _ = QFileDialog.getOpenFileName(
|
||||
self, "Choose import file", home_dir, ";;".join(
|
||||
self.file_types.keys())
|
||||
)
|
||||
|
||||
if not file_path:
|
||||
return # User canceled
|
||||
|
||||
importer = BookImporter()
|
||||
books = importer.parse_xml(file_path)
|
||||
|
||||
if not books:
|
||||
QMessageBox.information(
|
||||
self, "No New Books", "No new books to import.", QMessageBox.Ok)
|
||||
return
|
||||
|
||||
# Show preview dialog
|
||||
dialog = PreviewDialog(books, self)
|
||||
if dialog.exec() == QtWidgets.QDialog.Accepted:
|
||||
# User confirmed, proceed with importing
|
||||
importer.save_books(books)
|
||||
QMessageBox.information(
|
||||
self, "Success", "Books imported successfully!", QMessageBox.Ok)
|
||||
self.dashboard.redraw_cards()
|
||||
else:
|
||||
QMessageBox.information(
|
||||
self, "Canceled", "Import was canceled.", QMessageBox.Ok)
|
||||
except ImportError as e:
|
||||
QMessageBox.critical(self, "Error importing books", f"An error occurred when importing books from the file provided: {
|
||||
e}", QMessageBox.StandardButton.Ok, QMessageBox.StandardButton.NoButton)
|
||||
|
||||
def new_book(self):
|
||||
# BookEditor()
|
||||
pass
|
||||
|
||||
def new_member(self):
|
||||
MemberEditor().exec()
|
||||
|
||||
|
||||
def import_data(self):
|
||||
pass
|
||||
|
||||
def export_data(self):
|
||||
pass
|
||||
|
||||
def about(self):
|
||||
QtWidgets.QMessageBox.information(
|
||||
self, "About", "Library app demonstrating the phantom read problem")
|
||||
|
||||
# endregion
|
||||
def refresh_member_cards(self):
|
||||
self.member_list.redraw_cards()
|
@ -60,11 +60,12 @@ class DatabaseConfig():
|
||||
|
||||
|
||||
class TransactionLevel(enum.Enum):
|
||||
insecure = "READ UNCOMMITTED"
|
||||
insecure = "READ COMMITTED"
|
||||
secure = "SERIALIZABLE"
|
||||
|
||||
|
||||
class UserConfig:
|
||||
|
||||
_instance = None
|
||||
|
||||
_metadata = {
|
||||
@ -78,21 +79,23 @@ class UserConfig:
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
self._transaction_level = TransactionLevel.insecure
|
||||
self._simulate_slowdown = False
|
||||
if not hasattr(self, "logger"):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
if not hasattr(self, "_transaction_level"):
|
||||
env_level = os.getenv("TRANSACTION_LEVEL", "INSECURE").upper()
|
||||
if env_level == "SECURE":
|
||||
self.logger.debug("Running in SECURE mode")
|
||||
self._transaction_level = TransactionLevel.secure
|
||||
else:
|
||||
self.logger.debug("Running in INSECURE mode")
|
||||
self._transaction_level = TransactionLevel.insecure
|
||||
if not hasattr(self, "_simulate_slowdown"):
|
||||
self._simulate_slowdown = False
|
||||
|
||||
@property
|
||||
def transaction_level(self) -> TransactionLevel:
|
||||
return self._transaction_level
|
||||
|
||||
@transaction_level.setter
|
||||
def transaction_level(self, value: Any):
|
||||
if not isinstance(value, TransactionLevel):
|
||||
raise TypeError(
|
||||
f"Invalid value for 'transaction_level'. Must be a TransactionLevel enum, got {type(value).__name__}."
|
||||
)
|
||||
self._transaction_level = value
|
||||
|
||||
@property
|
||||
def simulate_slowdown(self) -> bool:
|
||||
return self._simulate_slowdown
|
||||
@ -103,14 +106,9 @@ class UserConfig:
|
||||
raise TypeError(
|
||||
f"Invalid value for 'simulate_slowdown'. Must be a boolean, got {type(value).__name__}."
|
||||
)
|
||||
self.logger.debug(f"Slowdown simulation set to: {value}")
|
||||
self._simulate_slowdown = value
|
||||
|
||||
@classmethod
|
||||
def get_friendly_name(cls, option: str) -> str:
|
||||
return cls._metadata.get(option, {}).get("friendly_name", option)
|
||||
|
||||
def __dict__(self) -> dict:
|
||||
return {
|
||||
"transaction_level": self.transaction_level,
|
||||
"simulate_slowdown": self.simulate_slowdown,
|
||||
}
|
||||
|
@ -0,0 +1,9 @@
|
||||
from .import_error import *
|
||||
from .export_error import *
|
||||
from .database import *
|
||||
|
||||
__all__ = [
|
||||
*import_error.__all__,
|
||||
*export_error.__all__,
|
||||
*database.__all__
|
||||
]
|
@ -20,6 +20,10 @@ class DatabaseConnectionError(DatabaseError):
|
||||
|
||||
|
||||
class DuplicateEntryError(DatabaseError):
|
||||
def __init__(self, message: str):
|
||||
def __init__(self, duplicate_entry_name: str, message: str = ""):
|
||||
super().__init__(message)
|
||||
self.duplicate_entry_name = duplicate_entry_name
|
||||
self.message = message
|
||||
|
||||
|
||||
__all__ = ["DatabaseError", "DatabaseConfigError", "DatabaseConnectionError", "DuplicateEntryError"]
|
||||
|
@ -3,3 +3,19 @@ class ExportError(Exception):
|
||||
super().__init__(message)
|
||||
|
||||
self.message = message
|
||||
|
||||
|
||||
class NoExportEntityError(ExportError):
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
||||
|
||||
self.message = message
|
||||
|
||||
class ExportFileError(ExportError):
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
||||
|
||||
self.message = message
|
||||
|
||||
|
||||
__all__ = ["ExportError", "NoExportEntityError", "ExportFileError"]
|
22
src/utils/errors/import_error.py
Normal file
22
src/utils/errors/import_error.py
Normal file
@ -0,0 +1,22 @@
|
||||
class ImportError(Exception):
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
||||
|
||||
self.message = message
|
||||
|
||||
|
||||
class InvalidContentsError(ImportError):
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
||||
|
||||
self.message = message
|
||||
|
||||
|
||||
class XsdSchemeNotFoundError(ImportError):
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
||||
|
||||
self.message = message
|
||||
|
||||
|
||||
__all__ = ["ImportError", "InvalidContentsError", "XsdSchemeNotFoundError"]
|
@ -1,5 +0,0 @@
|
||||
class ImportError(Exception):
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
||||
|
||||
self.message = message
|
@ -1,8 +0,0 @@
|
||||
from .import_error import ImportError
|
||||
|
||||
|
||||
class InvalidContentsError(ImportError):
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
||||
|
||||
self.message = message
|
@ -1,8 +0,0 @@
|
||||
from .import_error import ImportError
|
||||
|
||||
|
||||
class XsdSchemeNotFoundError(ImportError):
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
||||
|
||||
self.message = message
|
@ -1,6 +0,0 @@
|
||||
from .export_error import ExportError
|
||||
class NoExportEntityError(ExportError):
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
||||
|
||||
self.message = message
|
@ -1,12 +1,24 @@
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
|
||||
def setup_logger():
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
verbosity = os.getenv("VERBOSITY", "DEBUG").upper()
|
||||
level_map = {
|
||||
"DEBUG": logging.DEBUG,
|
||||
"INFO": logging.INFO,
|
||||
"WARNING": logging.WARNING,
|
||||
"ERROR": logging.ERROR,
|
||||
"CRITICAL": logging.CRITICAL,
|
||||
}
|
||||
log_level = level_map.get(verbosity, logging.DEBUG) # Default to DEBUG if invalid
|
||||
|
||||
logger.setLevel(log_level)
|
||||
|
||||
handler = logging.StreamHandler(sys.stdout)
|
||||
handler.setLevel(logging.DEBUG)
|
||||
handler.setLevel(log_level)
|
||||
|
||||
formatter = logging.Formatter("[%(levelname)s] - %(name)s:%(lineno)d - %(message)s")
|
||||
handler.setFormatter(formatter)
|
||||
|
Loading…
x
Reference in New Issue
Block a user