[main] Finally fixed book importing and dynamic category and author creation
This commit is contained in:
parent
723be8ae16
commit
dffd140d1a
@ -1,4 +1,4 @@
|
||||
from typing import List
|
||||
from typing import List, Dict
|
||||
import os
|
||||
import logging
|
||||
from xml.etree import ElementTree as ET
|
||||
@ -14,6 +14,7 @@ from sqlalchemy.exc import IntegrityError
|
||||
|
||||
class BookImporter:
|
||||
def __init__(self):
|
||||
# Initialize the logger and schema
|
||||
self.logger = logging.getLogger(__name__)
|
||||
try:
|
||||
self.logger.debug("Opening XSD scheme in ./")
|
||||
@ -23,7 +24,7 @@ class BookImporter:
|
||||
self.logger.error("Failed to load XSD scheme")
|
||||
raise XsdSchemeNotFoundError(f"Failed to load XSD schema: {e}")
|
||||
|
||||
def parse_xml(self, file_path: str) -> List[Book]:
|
||||
def parse_xml(self, file_path: str) -> List[Dict[str, object]]:
|
||||
"""Parses the XML file and validates it against the XSD schema."""
|
||||
try:
|
||||
tree = ET.parse(file_path)
|
||||
@ -41,96 +42,94 @@ class BookImporter:
|
||||
|
||||
# Parse author
|
||||
author_element = book_element.find("author")
|
||||
first_name = author_element.find("first_name").text
|
||||
last_name = author_element.find("last_name").text
|
||||
author = Author(first_name=first_name, last_name=last_name)
|
||||
author = {
|
||||
"first_name": author_element.find("first_name").text,
|
||||
"last_name": author_element.find("last_name").text
|
||||
}
|
||||
|
||||
# Parse categories
|
||||
category_elements = book_element.find("categories").findall("category")
|
||||
categories = [BookCategory(name=category_element.text) for category_element in category_elements]
|
||||
categories = [category_element.text for category_element in category_elements]
|
||||
|
||||
# Create a Book object
|
||||
book = Book(
|
||||
title=title,
|
||||
description=description,
|
||||
year_published=year_published,
|
||||
isbn=isbn,
|
||||
author=author,
|
||||
categories=categories
|
||||
)
|
||||
# Create a book dictionary with explicit types
|
||||
book = {
|
||||
"title": title,
|
||||
"description": description,
|
||||
"year_published": year_published,
|
||||
"isbn": isbn,
|
||||
"author": author,
|
||||
"categories": categories
|
||||
}
|
||||
books.append(book)
|
||||
|
||||
return books
|
||||
except ET.ParseError as e:
|
||||
raise ImportError(f"Failed to parse XML file: {e}")
|
||||
|
||||
def filter_new_books(self, books: List[Book]) -> List[Book]:
|
||||
"""Filters out books that already exist in the database."""
|
||||
new_books = []
|
||||
with DatabaseManager.get_session() as session:
|
||||
for book in books:
|
||||
existing_book = session.query(Book).filter(
|
||||
Book.isbn == book.isbn,
|
||||
).first()
|
||||
if existing_book is None:
|
||||
new_books.append(book)
|
||||
return new_books
|
||||
|
||||
def save_books(self, books: List[Book]):
|
||||
def save_books(self, books: List[Dict[str, object]]):
|
||||
"""Saves a list of books to the database."""
|
||||
try:
|
||||
# Get a session instance
|
||||
session = DatabaseManager.get_session()
|
||||
|
||||
# Use no_autoflush as a context manager
|
||||
with session.no_autoflush:
|
||||
with DatabaseManager.get_session() as session:
|
||||
processed_categories = {} # Cache for processed categories by name
|
||||
|
||||
for book in books:
|
||||
self.logger.debug(f"Attempting to save {book.title}")
|
||||
for book_dict in books:
|
||||
self.logger.debug(f"Attempting to save {book_dict['title']}")
|
||||
|
||||
# Check if the author exists, otherwise add
|
||||
# Check if the book already exists
|
||||
existing_book = session.query(Book).filter_by(isbn=book_dict["isbn"]).first()
|
||||
if existing_book:
|
||||
self.logger.warning(f"ISBN {book_dict['isbn']} already exists. Skipping.")
|
||||
continue
|
||||
|
||||
# Check or add the author
|
||||
existing_author = session.query(Author).filter_by(
|
||||
first_name=book.author.first_name,
|
||||
last_name=book.author.last_name
|
||||
first_name=book_dict["author"]["first_name"],
|
||||
last_name=book_dict["author"]["last_name"]
|
||||
).one_or_none()
|
||||
|
||||
if existing_author is not None:
|
||||
self.logger.debug(f"Author {existing_author.first_name} {existing_author.last_name} already exists. Reusing")
|
||||
book.author = existing_author
|
||||
else:
|
||||
self.logger.debug(f"Creating new author: {book.author.first_name} {book.author.last_name}")
|
||||
session.add(book.author)
|
||||
self.logger.debug(f"Author {existing_author.first_name} {existing_author.last_name} already exists. Reusing.")
|
||||
author = existing_author
|
||||
else:
|
||||
self.logger.debug(f"Creating new author: {book_dict['author']['first_name']} {book_dict['author']['last_name']}")
|
||||
author = Author(
|
||||
first_name=book_dict["author"]["first_name"],
|
||||
last_name=book_dict["author"]["last_name"]
|
||||
)
|
||||
session.add(author)
|
||||
|
||||
# Handle categories
|
||||
filtered_categories = []
|
||||
for category in book.categories:
|
||||
existing_category = session.query(BookCategory).filter_by(name=category.name).one_or_none()
|
||||
for category_name in book_dict["categories"]:
|
||||
if category_name in processed_categories:
|
||||
filtered_categories.append(processed_categories[category_name])
|
||||
continue
|
||||
|
||||
existing_category = session.query(BookCategory).filter_by(name=category_name).one_or_none()
|
||||
if existing_category is not None:
|
||||
self.logger.debug(f"Category {existing_category.name} already exists. Reusing")
|
||||
filtered_categories.append(session.merge(existing_category))
|
||||
|
||||
self.logger.debug(f"Category {category_name} already exists. Reusing.")
|
||||
processed_categories[category_name] = existing_category
|
||||
filtered_categories.append(existing_category)
|
||||
else:
|
||||
self.logger.debug(f"Adding new category: {category.name}")
|
||||
session.add(category) # Add new category to the session
|
||||
filtered_categories.append(category) # Use the new category
|
||||
|
||||
book.categories = filtered_categories
|
||||
|
||||
|
||||
# Check if the book already exists
|
||||
existing_book = session.query(Book).filter_by(isbn=book.isbn).first()
|
||||
if not existing_book:
|
||||
session.add(book)
|
||||
else:
|
||||
self.logger.warning(f"ISBN {book.isbn} already exists. Skipping.")
|
||||
continue
|
||||
self.logger.debug(f"Adding new category: {category_name}")
|
||||
new_category = BookCategory(name=category_name)
|
||||
session.add(new_category)
|
||||
processed_categories[category_name] = new_category
|
||||
filtered_categories.append(new_category)
|
||||
|
||||
book = Book(
|
||||
title=book_dict["title"],
|
||||
description=book_dict["description"],
|
||||
year_published=book_dict["year_published"],
|
||||
isbn=book_dict["isbn"],
|
||||
author=author,
|
||||
categories=filtered_categories
|
||||
)
|
||||
session.add(book)
|
||||
# Commit all changes
|
||||
session.commit()
|
||||
except IntegrityError as e:
|
||||
except e:
|
||||
session.rollback()
|
||||
raise ImportError(f"An error occurred when importing books: {e}") from e
|
||||
finally:
|
||||
# Clean up the session
|
||||
session.close()
|
||||
|
@ -10,6 +10,8 @@ from models.book_overview import BooksOverview
|
||||
|
||||
from utils.database import DatabaseManager
|
||||
|
||||
from sqlalchemy import delete
|
||||
|
||||
STATUS_TO_COLOR_MAP = {
|
||||
BookStatusEnum.available: "#3c702e",
|
||||
BookStatusEnum.borrowed: "#702525",
|
||||
@ -90,9 +92,11 @@ class BookCard(QWidget):
|
||||
action_edit_book = context_menu.addAction("Edit Book")
|
||||
action_edit_author = context_menu.addAction("Edit Author")
|
||||
action_mark_returned = context_menu.addAction("Mark as Returned")
|
||||
action_remove_reservation = context_menu.addAction("Remove reservation")
|
||||
action_remove_reservation = context_menu.addAction(
|
||||
"Remove reservation")
|
||||
context_menu.addSeparator()
|
||||
delete_book_action = context_menu.addAction("Delete Book")
|
||||
delete_book_action.triggered.connect(self.delete_book)
|
||||
|
||||
if self.book_overview.status != BookStatusEnum.borrowed:
|
||||
action_mark_returned.setVisible(False)
|
||||
@ -123,5 +127,15 @@ class BookCard(QWidget):
|
||||
print("Mark as Returned selected")
|
||||
elif action == action_remove_reservation:
|
||||
print("Remove reservation selected")
|
||||
elif action == delete_book_action:
|
||||
print("Delete book")
|
||||
|
||||
def delete_book(self):
|
||||
print("Delete")
|
||||
with DatabaseManager.get_session() as session:
|
||||
try:
|
||||
stmt = delete(Book).where(Book.id == self.book_overview.id)
|
||||
session.execute(stmt)
|
||||
session.commit()
|
||||
self.setVisible(False)
|
||||
except Exception as e:
|
||||
session.rollback
|
||||
print(e)
|
||||
|
@ -66,7 +66,12 @@ class LibraryDashboard(QWidget):
|
||||
def filter_books(self, text):
|
||||
"""Filter the cards based on the search input."""
|
||||
for card, book in zip(self.book_cards, self.books):
|
||||
card.setVisible(text.lower() in book.title.lower())
|
||||
|
||||
title_contains_text = text.lower() in book.title.lower()
|
||||
author_name_contains_text = text.lower() in book.author_name.lower()
|
||||
isbn_contains_text = text.lower() in book.isbn
|
||||
|
||||
card.setVisible(title_contains_text or author_name_contains_text or isbn_contains_text)
|
||||
|
||||
def register_member(self):
|
||||
QMessageBox.information(
|
||||
|
@ -1,7 +1,8 @@
|
||||
from typing import List, Dict
|
||||
from PySide6.QtWidgets import QDialog, QVBoxLayout, QTableWidget, QTableWidgetItem, QPushButton, QHeaderView
|
||||
|
||||
class PreviewDialog(QDialog):
|
||||
def __init__(self, books, parent=None):
|
||||
def __init__(self, books: List[Dict], parent=None):
|
||||
super().__init__(parent)
|
||||
|
||||
self.setWindowTitle("Preview Books to Import")
|
||||
@ -15,10 +16,10 @@ class PreviewDialog(QDialog):
|
||||
table.setHorizontalHeaderLabels(["Title", "Author", "Year", "ISBN"])
|
||||
|
||||
for row, book in enumerate(books):
|
||||
table.setItem(row, 0, QTableWidgetItem(book.title))
|
||||
table.setItem(row, 1, QTableWidgetItem(f"{book.author.first_name} {book.author.last_name}"))
|
||||
table.setItem(row, 2, QTableWidgetItem(book.year_published))
|
||||
table.setItem(row, 3, QTableWidgetItem(book.isbn))
|
||||
table.setItem(row, 0, QTableWidgetItem(book["title"]))
|
||||
table.setItem(row, 1, QTableWidgetItem(f"{book["author"]["first_name"]} {book["author"]["last_name"]}"))
|
||||
table.setItem(row, 2, QTableWidgetItem(book["year_published"]))
|
||||
table.setItem(row, 3, QTableWidgetItem(book["isbn"]))
|
||||
|
||||
header = table.horizontalHeader()
|
||||
header.setSectionResizeMode(QHeaderView.Stretch)
|
||||
|
@ -186,18 +186,17 @@ class LibraryWindow(QtWidgets.QMainWindow):
|
||||
|
||||
importer = BookImporter()
|
||||
books = importer.parse_xml(file_path)
|
||||
new_books = importer.filter_new_books(books)
|
||||
|
||||
if not new_books:
|
||||
if not books:
|
||||
QMessageBox.information(
|
||||
self, "No New Books", "No new books to import.", QMessageBox.Ok)
|
||||
return
|
||||
|
||||
# Show preview dialog
|
||||
dialog = PreviewDialog(new_books, self)
|
||||
dialog = PreviewDialog(books, self)
|
||||
if dialog.exec() == QtWidgets.QDialog.Accepted:
|
||||
# User confirmed, proceed with importing
|
||||
importer.save_books(new_books)
|
||||
importer.save_books(books)
|
||||
QMessageBox.information(
|
||||
self, "Success", "Books imported successfully!", QMessageBox.Ok)
|
||||
self.dashboard.redraw_cards()
|
||||
|
Loading…
x
Reference in New Issue
Block a user