diff --git a/app/__init__.py b/app/__init__.py index 5ab3565..61ef994 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -7,20 +7,23 @@ from app.doc.main_swag import main_swagger app = Flask(__name__) from app.config import FlaskTesting, FlaskProduction + app.config.from_object(FlaskTesting) flask_mail = Mail(app) jwt_manager = JWTManager(app) swag = Swagger(app, template=main_swagger) + def create_app(): - from app.api import bp, bp_errors, bp_product, bp_user, bp_cart - app.register_blueprint(bp) - app.register_blueprint(bp_errors) - app.register_blueprint(bp_product) - app.register_blueprint(bp_user) - app.register_blueprint(bp_cart) + from app.api import bp, bp_errors, bp_product, bp_user, bp_cart - from . import jwt_utils + app.register_blueprint(bp) + app.register_blueprint(bp_errors) + app.register_blueprint(bp_product) + app.register_blueprint(bp_user) + app.register_blueprint(bp_cart) - return app \ No newline at end of file + from . import jwt_utils + + return app diff --git a/app/api/routes/__init__.py b/app/api/routes/__init__.py index a342a91..7fd9edf 100644 --- a/app/api/routes/__init__.py +++ b/app/api/routes/__init__.py @@ -1 +1,15 @@ -from app.api.routes import main_routes, error_routes, product_routes, user_routes, cart_routes \ No newline at end of file +from app.api.routes.user import ( + register_route, + login_route, + logout_route, + update_route, + delete_route, +) +from app.api.routes.product import ( + product_create_route, + product_delete_route, + product_info_route, + product_page_route, +) + +from app.api.routes import main_routes, error_routes, cart_routes diff --git a/app/api/routes/cart_routes.py b/app/api/routes/cart_routes.py index d388100..d30db8a 100644 --- a/app/api/routes/cart_routes.py +++ b/app/api/routes/cart_routes.py @@ -1,7 +1,13 @@ from flask import jsonify, abort, request from flask_jwt_extended import jwt_required, get_jwt_identity -from app.doc.cart_swag import show_cart_swagger, add_to_cart_swagger, remove_from_cart_swagger, update_count_in_cart_swagger, purchase_swagger +from app.doc.cart_swag import ( + show_cart_swagger, + add_to_cart_swagger, + remove_from_cart_swagger, + update_count_in_cart_swagger, + purchase_swagger, +) from flasgger import swag_from @@ -9,60 +15,65 @@ from app.api import bp_cart from app.services.cart_service import CartService -@bp_cart.route('', methods=['GET']) + +@bp_cart.route("", methods=["GET"]) @jwt_required() @swag_from(show_cart_swagger) def show_cart(): - user_id = get_jwt_identity() + user_id = get_jwt_identity() - result, status_code = CartService.show_cart(user_id) + result, status_code = CartService.show_cart(user_id) - return result, status_code + return result, status_code -@bp_cart.route('/add/', methods=['PUT']) + +@bp_cart.route("/add/", methods=["PUT"]) @jwt_required() @swag_from(add_to_cart_swagger) def add_to_cart(product_id: int): - user_id = get_jwt_identity() - count = request.args.get('count', default=1, type=int) + user_id = get_jwt_identity() + count = request.args.get("count", default=1, type=int) - if count < 1: - return abort(400) - - result, status_code = CartService.add_to_cart(user_id, product_id, count) + if count < 1: + return abort(400) - return result, status_code + result, status_code = CartService.add_to_cart(user_id, product_id, count) -@bp_cart.route('/remove/', methods=['DELETE']) + return result, status_code + + +@bp_cart.route("/remove/", methods=["DELETE"]) @jwt_required() @swag_from(remove_from_cart_swagger) def remove_from_cart(product_id: int): - user_id = get_jwt_identity() + user_id = get_jwt_identity() - result, status_code = CartService.delete_from_cart(user_id, product_id) + result, status_code = CartService.delete_from_cart(user_id, product_id) - return result, status_code + return result, status_code -@bp_cart.route('/update/', methods=['PUT']) + +@bp_cart.route("/update/", methods=["PUT"]) @jwt_required() @swag_from(update_count_in_cart_swagger) def update_count_in_cart(product_id: int): - user_id = get_jwt_identity() - count = request.args.get('count', type=int) + user_id = get_jwt_identity() + count = request.args.get("count", type=int) - if not count: - return abort(400) + if not count: + return abort(400) - result, status_code = CartService.update_count(user_id, product_id, count) + result, status_code = CartService.update_count(user_id, product_id, count) - return result, status_code + return result, status_code -@bp_cart.route('/purchase', methods=['GET']) + +@bp_cart.route("/purchase", methods=["GET"]) @jwt_required() @swag_from(purchase_swagger) def purchase(): - user_id = get_jwt_identity() + user_id = get_jwt_identity() - result, status_code = CartService.purchase(user_id) + result, status_code = CartService.purchase(user_id) - return result, status_code \ No newline at end of file + return result, status_code diff --git a/app/api/routes/error_routes.py b/app/api/routes/error_routes.py index 53672ea..e974e4d 100644 --- a/app/api/routes/error_routes.py +++ b/app/api/routes/error_routes.py @@ -1,29 +1,38 @@ from app.api import bp_errors + @bp_errors.app_errorhandler(400) def bad_request(e): - return {"msg": "The request was incorrectly formatted, or contained invalid data"}, 400 + return { + "msg": "The request was incorrectly formatted, or contained invalid data" + }, 400 + @bp_errors.app_errorhandler(401) def unauthorized(e): - return {"msg": "Failed to authorize the request"}, 401 + return {"msg": "Failed to authorize the request"}, 401 + @bp_errors.app_errorhandler(403) def forbidden(e): - return {"msg": "You shall not pass"}, 403 + return {"msg": "You shall not pass"}, 403 + @bp_errors.app_errorhandler(404) def not_found(e): - return {"msg": "The requested resource was not found"}, 404 + return {"msg": "The requested resource was not found"}, 404 + @bp_errors.app_errorhandler(405) def method_not_allowed(e): - return {"msg": "The method used is not allowed in current context"}, 405 + return {"msg": "The method used is not allowed in current context"}, 405 + @bp_errors.app_errorhandler(500) def internal_error(e): - return {"msg": "An error occurred on he server"}, 500 + return {"msg": "An error occurred on he server"}, 500 + @bp_errors.app_errorhandler(501) -def internal_error(e): - return {"msg": "This function has not been implemented yet. Check back soon!"}, 501 \ No newline at end of file +def unimplemented_error(e): + return {"msg": "This function has not been implemented yet. Check back soon!"}, 501 diff --git a/app/api/routes/main_routes.py b/app/api/routes/main_routes.py index f891e61..bc54eed 100644 --- a/app/api/routes/main_routes.py +++ b/app/api/routes/main_routes.py @@ -1,11 +1,12 @@ -from flask import jsonify, abort +from flask import jsonify from flasgger import swag_from from app.doc.root_swag import root_swagger from app.api import bp -@bp.route('/') + +@bp.route("/") @swag_from(root_swagger) def hello(): - return jsonify({'message': 'Hello, Flask!'}) \ No newline at end of file + return jsonify({"message": "Hello, Flask!"}) diff --git a/app/api/routes/product/product_create_route.py b/app/api/routes/product/product_create_route.py new file mode 100644 index 0000000..848f5fc --- /dev/null +++ b/app/api/routes/product/product_create_route.py @@ -0,0 +1,33 @@ +from flask import jsonify, abort, request +from flask_jwt_extended import jwt_required, get_jwt_identity + +from app.doc.product_swag import create_product_swagger + +from flasgger import swag_from + +from app.api import bp_product + +from app.services.product import product_create_service + + +@bp_product.route("/create", methods=["POST"]) +@swag_from(create_product_swagger) +@jwt_required() +def create_product_listing(): + user_id = get_jwt_identity() + name = request.json.get("name") + price = request.json.get("price") + + if name is None or price is None: + return abort(400) + + float_price = float(price) + + if not isinstance(float_price, float): + return abort(400) + + result, status_code = product_create_service.create_product( + user_id, name, float_price + ) + + return jsonify(result), status_code diff --git a/app/api/routes/product/product_delete_route.py b/app/api/routes/product/product_delete_route.py new file mode 100644 index 0000000..be6e4b5 --- /dev/null +++ b/app/api/routes/product/product_delete_route.py @@ -0,0 +1,19 @@ +from flask import jsonify, abort, request +from flask_jwt_extended import jwt_required, get_jwt_identity + +from flasgger import swag_from + +from app.api import bp_product + +from app.services.product import product_delete_service + + +@bp_product.route("//delete", methods=["DELETE"]) +@jwt_required() +def delete_product(product_id: int): + user_id = get_jwt_identity() + + + result, status_code = product_delete_service.delete_product(user_id, product_id) + + return jsonify(result), status_code diff --git a/app/api/routes/product/product_info_route.py b/app/api/routes/product/product_info_route.py new file mode 100644 index 0000000..dd36ee7 --- /dev/null +++ b/app/api/routes/product/product_info_route.py @@ -0,0 +1,25 @@ +from flask import jsonify, request + +from app.doc.product_swag import get_product_info_swagger + +from flasgger import swag_from + +from app.api import bp_product + +from app.services.product import product_info_service + + +@bp_product.route("/", methods=["GET"]) +@swag_from(get_product_info_swagger) +def get_product_info(product_id: int): + fields = ["name", "price", "image", "image_name", "seller"] + + fields_param = request.args.get("fields") + + fields_param_list = fields_param.split(",") if fields_param else fields + + common_fields = list(set(fields) & set(fields_param_list)) + + result, status_code = product_info_service.product_info(product_id) + + return jsonify(result), status_code diff --git a/app/api/routes/product/product_page_route.py b/app/api/routes/product/product_page_route.py new file mode 100644 index 0000000..b2cc2b4 --- /dev/null +++ b/app/api/routes/product/product_page_route.py @@ -0,0 +1,22 @@ +from flask import jsonify, abort, request + +from app.doc.product_swag import get_products_swagger + +from flasgger import swag_from + +from app.api import bp_product + +from app.services.product import product_list_service + + +@bp_product.route("", methods=["GET"]) +@swag_from(get_products_swagger) +def get_products(): + page = request.args.get("page", default=0, type=int) + + if page < 0: + return abort(400) + + result, status_code = product_list_service.product_list(page) + + return jsonify(result), status_code diff --git a/app/api/routes/product_routes.py b/app/api/routes/product_routes.py deleted file mode 100644 index 0ab213a..0000000 --- a/app/api/routes/product_routes.py +++ /dev/null @@ -1,59 +0,0 @@ -from flask import jsonify, abort, request -from flask_jwt_extended import jwt_required, get_jwt_identity - -from app.doc.product_swag import get_products_swagger, get_product_info_swagger, create_product_swagger - -from flasgger import swag_from - -from app.api import bp_product - -from app.services.product_service import ProductService - -@bp_product.route('', methods=['GET']) -@swag_from(get_products_swagger) -def get_products(): - page = request.args.get('page', default=0, type=int) - - if page < 0: - return abort(400) - - result, status_code = ProductService.get_products(page) - - return result, status_code - -@bp_product.route('/', methods=['GET']) -@swag_from(get_product_info_swagger) -def get_product_info(id: int): - fields = ['name', 'price', 'image', 'image_name', 'seller'] - - fields_param = request.args.get('fields') - - fields_param_list = fields_param.split(',') if fields_param else fields - - common_fields = list(set(fields) & set(fields_param_list)) - - result, status_code = ProductService.get_product_info(common_fields, id) - - return result, status_code - - - -@bp_product.route('/create', methods=['POST']) -@swag_from(create_product_swagger) -@jwt_required() -def create_product_listing(): - user_id = get_jwt_identity() - name = request.json.get('name') - price = request.json.get('price') - - if name is None or price is None: - return abort(400) - - float_price = float(price) - - if not isinstance(float_price, float): - return abort(400) - - result, status_code = ProductService.create_listing(user_id, name, float_price) - - return result, status_code \ No newline at end of file diff --git a/app/api/routes/user/delete_route.py b/app/api/routes/user/delete_route.py new file mode 100644 index 0000000..512cb9b --- /dev/null +++ b/app/api/routes/user/delete_route.py @@ -0,0 +1,22 @@ +from app.api import bp_user +from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt +from flask import request, abort + +from flasgger import swag_from + +from app.doc.user_swag import delete_swagger +from app.services.user import delete_service, logout_service + + +@bp_user.route("/delete", methods=["DELETE"]) +@swag_from(delete_swagger) +@jwt_required() +def delete_user(): + user_id = get_jwt_identity() + + result, status_code = delete_service.delete_user(user_id) + + jwt = get_jwt() + logout_service.logout(jwt, user_id, True) + + return result, status_code diff --git a/app/api/routes/user/login_route.py b/app/api/routes/user/login_route.py new file mode 100644 index 0000000..527fcb8 --- /dev/null +++ b/app/api/routes/user/login_route.py @@ -0,0 +1,33 @@ +from app.api import bp_user +from flask import request, jsonify + +from flasgger import swag_from + +import app.messages.api_responses.user_responses as response +import app.messages.api_errors as errors +from app.doc.user_swag import login_swagger + +from app.services.user import login_service + +@bp_user.route("/login", methods=["POST"]) +@swag_from(login_swagger) +def login(): + data = request.get_json() + + if not data: + result, status_code = errors.NOT_JSON + return jsonify(result), status_code + + required_fields = ["username", "password"] + missing_fields = [field for field in required_fields if field not in data] + + if missing_fields: + result, status_code = errors.MISSING_FIELDS(missing_fields) + return jsonify(result), status_code + + username = data["username"] + password = data["password"] + + result, status_code = login_service.login(username, password) + + return result, status_code diff --git a/app/api/routes/user/logout_route.py b/app/api/routes/user/logout_route.py new file mode 100644 index 0000000..4c1b3fc --- /dev/null +++ b/app/api/routes/user/logout_route.py @@ -0,0 +1,20 @@ +from app.api import bp_user + +from flasgger import swag_from + +from flask_jwt_extended import get_jwt_identity, jwt_required, get_jwt + +from app.doc.user_swag import logout_swagger +from app.services.user import logout_service + + +@bp_user.route("/logout", methods=["DELETE"]) +@swag_from(logout_swagger) +@jwt_required() +def logout(): + jwt = get_jwt() + user_id = get_jwt_identity() + + result, status_code = logout_service.logout(jwt, user_id, True) + + return result, status_code diff --git a/app/api/routes/user/register_route.py b/app/api/routes/user/register_route.py new file mode 100644 index 0000000..6d0107d --- /dev/null +++ b/app/api/routes/user/register_route.py @@ -0,0 +1,39 @@ +from app.api import bp_user +from flask import request, jsonify + +from app.services.user import register_service + +from app.doc.user_swag import register_swagger +import app.messages.api_responses.user_responses as response +import app.messages.api_errors as errors + + +from flasgger import swag_from + + +@bp_user.route("/register", methods=["POST"]) +@swag_from(register_swagger) +def register(): + data = request.get_json() + + if not data: + result, status_code = errors.NOT_JSON + return jsonify(result), status_code + + required_fields = ["username", "displayname", "email", "password"] + missing_fields = [field for field in required_fields if field not in data] + + if missing_fields: + result, status_code = errors.MISSING_FIELDS(missing_fields) + return jsonify(result), status_code + + username = data["username"] + displayname = data["displayname"] + email = data["email"] + password = data["password"] + + result, status_code = register_service.register( + username, displayname, email, password + ) + + return jsonify(result), status_code diff --git a/app/api/routes/user/update_route.py b/app/api/routes/user/update_route.py new file mode 100644 index 0000000..5eb5db3 --- /dev/null +++ b/app/api/routes/user/update_route.py @@ -0,0 +1,40 @@ +from app.api import bp_user +from flask import request, jsonify +from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt + +from flasgger import swag_from + +import app.messages.api_errors as errors +from app.doc.user_swag import update_swagger + +from app.services.user import logout_service, update_user_service + + +@bp_user.route("/update", methods=["PUT"]) +@swag_from(update_swagger) +@jwt_required() +def update_user(): + data = request.get_json() + + possible_fields = ["new_username", "new_displayname", "new_email", "new_password"] + selected_fields = [field for field in possible_fields if field in data] + + if not selected_fields: + result, status_code = errors.NO_FIELD_PROVIDED(possible_fields) + return jsonify(result), status_code + + user_id = get_jwt_identity() + + new_username = data.get("new_username") + new_displayname = data.get("new_displayname") + new_email = data.get("new_email") + new_password = data.get("new_password") + + result, status_code = update_user_service.update_user(user_id, new_username, new_displayname, new_email, new_password) + + if status_code < 300: + jwt = get_jwt() + logout_service.logout(jwt, user_id, False) + + return result, status_code + diff --git a/app/api/routes/user_routes.py b/app/api/routes/user_routes.py deleted file mode 100644 index d53b3f0..0000000 --- a/app/api/routes/user_routes.py +++ /dev/null @@ -1,126 +0,0 @@ -from app.api import bp_user -from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt -from flask import request, abort - -from flasgger import swag_from - -from app.doc.user_swag import login_swagger, logout_swagger, delete_swagger, register_swagger - -from app.services.user_service import UserService - -@bp_user.route('/register', methods=['POST']) -@swag_from(register_swagger) -def register(): - username = request.json.get('username') - displayname = request.json.get('displayname') - email = request.json.get('email') - password = request.json.get('password') - - if username is None or email is None or password is None or displayname is None: - return abort(400) - - result, status_code = UserService.register(username, displayname, email, password) - - return result, status_code - -@bp_user.route('/login', methods=['POST']) -@swag_from(login_swagger) -def login(): - username = request.json.get('username') - password = request.json.get('password') - - if username is None or password is None: - return abort(400) - - result, status_code = UserService.login(username, password) - - return result, status_code - -@bp_user.route('/logout', methods=['DELETE']) -@swag_from(logout_swagger) -@jwt_required() -def logout(): - jwt = get_jwt() - user_id = get_jwt_identity() - - result, status_code = UserService.logout(jwt, user_id) - - return result, status_code - -@bp_user.route('/update/username', methods=['PUT']) -@jwt_required() -def update_username(): - user_id = get_jwt_identity() - new_username = request.json.get('new_username') - - if new_username is None: - return abort(400) - - result, status_code = UserService.update_username(user_id, new_username) - - jwt = get_jwt() - UserService.logout(jwt, user_id) - - return result, status_code - -@bp_user.route('/update/displayname', methods=['PUT']) -@jwt_required() -def update_displayname(): - user_id = get_jwt_identity() - new_displayname = request.json.get('new_displayname') - - if new_displayname is None: - return abort(400) - - result, status_code = UserService.update_username(user_id, new_displayname) - - jwt = get_jwt() - UserService.logout(jwt, user_id) - - return result, status_code - -@bp_user.route('/update/email', methods=['PUT']) -@jwt_required() -def update_email(): - username = get_jwt_identity() - new_mail = request.json.get('new_email') - - if new_mail is None: - return abort(400) - - result, status_code = UserService.update_email(username, new_mail) - - jwt = get_jwt() - UserService.logout(jwt, username) - - return result, status_code - - -@bp_user.route('/update/password', methods=['PUT']) -@jwt_required() -def update_password(): - username = get_jwt_identity() - new_password = request.json.get('new_password') - - if new_password is None: - return abort(400) - - result, status_code = UserService.update_password(username, new_password) - - jwt = get_jwt() - UserService.logout(jwt, username) - - return result, status_code - -@bp_user.route('/delete', methods=['DELETE']) -@swag_from(delete_swagger) -@jwt_required() -def delete_user(): - user_id = get_jwt_identity() - - result, status_code = UserService.delete_user(user_id) - - jwt = get_jwt() - UserService.logout(jwt, user_id) - - return result, status_code \ No newline at end of file diff --git a/app/config.py b/app/config.py index 101f7ea..e09b396 100644 --- a/app/config.py +++ b/app/config.py @@ -1,30 +1,42 @@ import os + class MySqlConfig: - MYSQL_USER = os.environ.get('MYSQL_USER') - MYSQL_DATABASE = os.environ.get('MYSQL_DATABASE') - MYSQL_HOST = os.environ.get('MYSQL_HOST') - MYSQL_PORT = os.environ.get('MYSQL_PORT') - MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD') + MYSQL_USER = os.environ.get("MYSQL_USER") + MYSQL_DATABASE = os.environ.get("MYSQL_DATABASE") + MYSQL_HOST = os.environ.get("MYSQL_HOST") + MYSQL_PORT = os.environ.get("MYSQL_PORT") + MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD") + class RedisConfig: - REDIS_HOST = os.environ.get('REDIS_HOST') - REDIS_PORT = os.environ.get('REDIS_PORT') - REDIS_PASSWORD = os.environ.get('REDIS_PASSWORD') + REDIS_HOST = os.environ.get("REDIS_HOST") + REDIS_PORT = os.environ.get("REDIS_PORT") + REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD") + class FlaskProduction: - DEBUG = False - JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') - SERVER_NAME = os.environ.get('HOST') + ':' + os.environ.get('PORT') + DEBUG = False + JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY") + SERVER_NAME = os.environ.get("HOST") + ":" + os.environ.get("PORT") + + MAIL_SERVER = os.environ.get("MAIL_SERVER") + MAIL_PORT = os.environ.get("MAIL_PORT") + MAIL_USERNAME = os.environ.get("MAIL_USERNAME") + MAIL_PASSWORD = os.environ.get("MAIL_PASSWORD") + MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS") + MAIL_DEFAULT_SENDER = os.environ.get("MAIL_DEFAULT_SENDER") + class FlaskTesting: - DEBUG = True - JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') - SERVER_NAME = os.environ.get('HOST') + ':' + os.environ.get('PORT') + DEBUG = True + TESTING = True + JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY") + SERVER_NAME = os.environ.get("HOST") + ":" + os.environ.get("PORT") - MAIL_SERVER = os.environ.get('MAIL_SERVER') - MAIL_PORT = os.environ.get('MAIL_PORT') - MAIL_USERNAME = os.environ.get('MAIL_USERNAME') - MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD') - MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS') - MAIL_DEFAULT_SENDER = os.environ.get('MAIL_DEFAULT_SENDER') \ No newline at end of file + MAIL_SERVER = os.environ.get("MAIL_SERVER") + MAIL_PORT = os.environ.get("MAIL_PORT") + MAIL_USERNAME = os.environ.get("MAIL_USERNAME") + MAIL_PASSWORD = os.environ.get("MAIL_PASSWORD") + MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS") + MAIL_DEFAULT_SENDER = os.environ.get("MAIL_DEFAULT_SENDER") diff --git a/app/db/cart_db.py b/app/db/cart_db.py new file mode 100644 index 0000000..e69de29 diff --git a/app/db/product_db.py b/app/db/product_db.py new file mode 100644 index 0000000..37a87a4 --- /dev/null +++ b/app/db/product_db.py @@ -0,0 +1,118 @@ +from typing import Optional + +from app.extensions import db_connection + +from app.models.product_model import Product + + +def fetch_products(page: int = 0) -> Optional[list[Product]]: + cursor = db_connection.cursor(dictionary=True) + + offset = 10 * page + cursor.execute( + "select product.id, user.displayname as seller, product.name, product.price_pc from product inner join user on user.id = product.seller_id order by product.id limit 10 offset %s", + (offset,), + ) + results = cursor.fetchall() + + if len(results) < 1: + return None + + result_products: list[Product] = [] + + for row in results: + result_products.append( + Product( + product_id=row["id"], + seller_id=row["seller_id"], + name=row["name"], + price=row["price"], + creation_date=row["creation_date"], + ) + ) + + return result_products + +def fetch_product_by_id(product_id: int) -> Optional[Product]: + """ + Fetches specific product info + + :param product_id: ID of product to be updated. + :type product_id: int + """ + + cursor = db_connection.cursor(dictionary=True) + + cursor.execute("select * from product where id = %s", (product_id,)) + result = cursor.fetchone() + + if cursor.rowcount != 1: + return None + + result_product = Product( + product_id=result["id"], + seller_id=result["seller_id"], + name=result["name"], + price=result["price"], + creation_date=result["creation_date"], + ) + + return result_product + + +def fetch_product_extended_by_id(product_id: int) -> Optional[Product]: + """ + Fetches specific product info including the seller n + + :param product_id: ID of product to be updated. + :type product_id: int + """ + + cursor = db_connection.cursor(dictionary=True) + + cursor.execute("select * from product inner join user on user.id = product.seller_id where product.id = %s", (product_id,)) + result = cursor.fetchone() + + if cursor.rowcount != 1: + return None + + result_product = Product( + product_id=result["id"], + seller_id=result["seller_id"], + seller_name=result["displayname"], + name=result["name"], + price=result["price"], + creation_date=result["creation_date"], + ) + + return result_product + +def insert_product(product: Product): + """ + Creates a new product listing + + :param seller_id: User ID + :type seller_id: str + :param name: New product's name + :type name: str + :param price: New product's price + :type price: float + """ + + cursor = db_connection.cursor() + + cursor.execute( + "insert into product(seller_id, name, price_pc) values (%s, %s, %s)", + (product.seller_id, product.name, round(product.price, 2)), + ) + db_connection.commit() + + +def delete_product(product: Product): + cursor = db_connection.cursor() + + cursor.execute( + "delete from product where id = %s", + (product.product_id,), + ) + db_connection.commit() diff --git a/app/db/user_db.py b/app/db/user_db.py new file mode 100644 index 0000000..e4cf4f3 --- /dev/null +++ b/app/db/user_db.py @@ -0,0 +1,79 @@ +from typing import Optional + +from app.extensions import db_connection + +from app.models.user_model import User + + +def fetch_by_username(username: str) -> Optional[User]: + cursor = db_connection.cursor(dictionary=True) + + cursor.execute("select * from user where username = %s", (username,)) + + result = cursor.fetchone() + + result_user = ( + User( + user_id=result["id"], + username=result["username"], + displayname=result["displayname"], + email=result["email"], + password=result["password"], + role_id=result["role_id"], + creation_date=result["creation_date"], + ) + if result + else None + ) + + return result_user + + +def fetch_by_id(user_id: int) -> Optional[User]: + cursor = db_connection.cursor(dictionary=True) + + cursor.execute("select * from user where id = %s", (user_id,)) + + result = cursor.fetchone() + result_user = ( + User( + user_id=result["id"], + username=result["username"], + displayname=result["displayname"], + email=result["email"], + password=result["password"], + role_id=result["role_id"], + creation_date=result["creation_date"], + ) + if result + else None + ) + + return result_user + + +def insert_user(new_user: User): + cursor = db_connection.cursor(dictionary=True) + + cursor.execute( + "insert into user (username, displayname, email, password) values (%s, %s, %s, %s)", + (new_user.username, new_user.displayname, new_user.email, new_user.password), + ) + db_connection.commit() + + +def delete_user(user: User): + cursor = db_connection.cursor(dictionary=True) + + cursor.execute("delete from user where id = %s", (user.user_id,)) + db_connection.commit() + + +def update_user(user: User): + cursor = db_connection.cursor(dictionary=True) + + cursor.execute( + "update user set username=%s, displayname=%s, email=%s, password=%s where id = %s", + (user.username, user.displayname, user.email, user.password, user.user_id), + ) + db_connection.commit() diff --git a/app/doc/main_swag.py b/app/doc/main_swag.py index 652bb22..428443b 100644 --- a/app/doc/main_swag.py +++ b/app/doc/main_swag.py @@ -1,6 +1,6 @@ main_swagger = { "info": { - "title": "Shop API", + "title": "Swag Shop", "version": "0.1", "description": "Simple shop API using flask and co.\nFeatures include:\n- Not working\n- Successful registration of users\n- Adding items to cart\n- I don't know", }, diff --git a/app/doc/user_swag.py b/app/doc/user_swag.py index 0daf96b..26dcf32 100644 --- a/app/doc/user_swag.py +++ b/app/doc/user_swag.py @@ -1,101 +1,116 @@ register_swagger = { - "methods": ["POST"], - "tags": ["User"], - "description": "Registers a new user in the app. Also sends a notification to the user via the provided email", - "parameters": - [ - { - "in": "body", - "name": "body", - "description": "Username, displayname and password of the new user\n- Username can be only lowercase and up to 64 characters\n- Displayname can contain special characters (. _ -) and lower and upper characters\n- Password must be at least 8 characters long, contain both lower and upper characters, numbers and special characters\n- Email has to be in format \"name@domain.tld\" and up to 64 characters long in total", - "required": True, - "schema": - { - "type": "object", - "properties": - { - "username": {"type": "string", "example": "mycoolusername"}, - "email": {"type": "string", "example": "mymail@dot.com"}, - "displayname": {"type": "string", "example": "MyCoolDisplayName"}, - "password": {"type": "string", "example": "My5tr0ngP@55w0rd"} - } - } - } - ], + "methods": ["POST"], + "tags": ["User"], + "description": "Registers a new user in the app. Also sends a notification to the user via the provided email", + "parameters": [ + { + "in": "body", + "name": "body", + "description": 'Username, displayname and password of the new user\n- Username can be only lowercase and up to 64 characters\n- Displayname can contain special characters (. _ -) and lower and upper characters\n- Password must be at least 8 characters long, contain both lower and upper characters, numbers and special characters\n- Email has to be in format "name@domain.tld" and up to 64 characters long in total', + "required": True, + "schema": { + "type": "object", + "properties": { + "username": {"type": "string", "example": "mycoolusername"}, + "email": {"type": "string", "example": "mymail@dot.com"}, + "displayname": {"type": "string", "example": "MyCoolDisplayName"}, + "password": {"type": "string", "example": "My5tr0ngP@55w0rd"}, + }, + }, + } + ], } login_swagger = { - "methods": ["POST"], - "tags": ["User"], - "description": "Logs in using username and password and returns a JWT token for further authorization of requests.\n**The token is valid for 1 hour**", - "parameters": - [ - { - "in": "body", - "name": "body", - "description": "Username and password payload", - "required": True, - "schema": - { - "type": "object", - "properties": - { - "username": {"type": "string", "example": "mycoolusername"}, - "password": {"type": "string", "example": "MyStrongPassword123"} - } - } - } - ], - "responses": - { - "200": - { - "description": "Returns a fresh token", - "schema": - { - "type": "object", - "properties": { - "token": {"type": "string", "example": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTcxMDMyMjkyOCwianRpIjoiZDFhYzQxZDktZjA4NC00MmYzLThlMWUtZWFmZjJiNGU1MDAyIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6MjMwMDEsIm5iZiI6MTcxMDMyMjkyOCwiZXhwIjoxNzEwMzI2NTI4fQ.SW7LAi1j5vDOEIvzeN-sy0eHPP9PFJFkXYY029O35w0"} - } - } - }, - "400": - { - "description": "Possible causes:\n- Missing username or password from request.\n- Nonexistent username" - }, - "401": - { - "description": "Password is incorrect" - } - } + "methods": ["POST"], + "tags": ["User"], + "description": "Logs in using username and password and returns a JWT token for further authorization of requests.\n**The token is valid for 1 hour**", + "parameters": [ + { + "in": "body", + "name": "body", + "description": "Username and password payload", + "required": True, + "schema": { + "type": "object", + "properties": { + "username": {"type": "string", "example": "mycoolusername"}, + "password": {"type": "string", "example": "MyStrongPassword123"}, + }, + }, + } + ], + "responses": { + "200": { + "description": "Returns a fresh token", + "schema": { + "type": "object", + "properties": { + "token": { + "type": "string", + "example": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTcxMDMyMjkyOCwianRpIjoiZDFhYzQxZDktZjA4NC00MmYzLThlMWUtZWFmZjJiNGU1MDAyIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6MjMwMDEsIm5iZiI6MTcxMDMyMjkyOCwiZXhwIjoxNzEwMzI2NTI4fQ.SW7LAi1j5vDOEIvzeN-sy0eHPP9PFJFkXYY029O35w0", + } + }, + }, + }, + "400": { + "description": "Possible causes:\n- Missing username or password from request.\n- Nonexistent username" + }, + "401": {"description": "Password is incorrect"}, + }, } logout_swagger = { - "methods": ["DELETE"], - "tags": ["User"], - "security": [{"JWT": []}], - "description": "Logs out the user via provided JWT token", - "parameters": [], - "responses": - { - "200": - { - "description": "User successfully logged out" - } - } + "methods": ["DELETE"], + "tags": ["User"], + "security": [{"JWT": []}], + "description": "Logs out the user via provided JWT token", + "parameters": [], + "responses": {"200": {"description": "User successfully logged out"}}, } +update_swagger = { + "methods": ["PUT"], + "tags": ["User"], + "security": [{"JWT": []}], + "description": "Updates user attributes.", + "parameters": [ + { + "in": "body", + "name": "body", + "description": "Attributes to update for the user.", + "required": True, + "schema": { + "type": "object", + "properties": { + "new_username": {"type": "string", "example": "mycoolusername"}, + "new_email": {"type": "string", "example": "mymail@dot.com"}, + "new_displayname": { + "type": "string", + "example": "MyCoolDisplayName", + }, + "new_password": {"type": "string", "example": "My5tr0ngP@55w0rd"}, + }, + }, + } + ], + "responses": { + "200": {"description": "User attributes updated successfully."}, + "400": {"description": "Bad request. Check the request body for errors."}, + "401": {"description": "Unauthorized. User must be logged in."}, + "409": {"description": "Conflict. Check the response message for details."}, + "500": { + "description": "Internal server error. Contact the system administrator." + }, + }, +} + + delete_swagger = { - "methods": ["DELETE"], - "tags": ["User"], - "security": [{"JWT": []}], - "description": "Deletes a user via JWT token", - "parameters": [], - "responses": - { - "200": - { - "description": "User successfully deleted" - } - } -} \ No newline at end of file + "methods": ["DELETE"], + "tags": ["User"], + "security": [{"JWT": []}], + "description": "Deletes a user via JWT token", + "parameters": [], + "responses": {"200": {"description": "User successfully deleted"}}, +} diff --git a/app/extensions.py b/app/extensions.py index 7a370e2..dd4d11b 100644 --- a/app/extensions.py +++ b/app/extensions.py @@ -6,16 +6,16 @@ from app.config import RedisConfig from app.config import MySqlConfig db_connection = mysql.connector.connect( - host=MySqlConfig.MYSQL_HOST, - user=MySqlConfig.MYSQL_USER, - password=MySqlConfig.MYSQL_PASSWORD, - database=MySqlConfig.MYSQL_DATABASE, + host=MySqlConfig.MYSQL_HOST, + user=MySqlConfig.MYSQL_USER, + password=MySqlConfig.MYSQL_PASSWORD, + database=MySqlConfig.MYSQL_DATABASE, ) jwt_redis_blocklist = redis.StrictRedis( - host=RedisConfig.REDIS_HOST, - port=RedisConfig.REDIS_PORT, - password=RedisConfig.REDIS_PASSWORD, - db=0, - decode_responses=True + host=RedisConfig.REDIS_HOST, + port=RedisConfig.REDIS_PORT, + password=RedisConfig.REDIS_PASSWORD, + db=0, + decode_responses=True, ) diff --git a/app/jwt_utils.py b/app/jwt_utils.py index 96f050c..3fff27c 100644 --- a/app/jwt_utils.py +++ b/app/jwt_utils.py @@ -4,9 +4,10 @@ from . import jwt_manager from app import app + @jwt_manager.token_in_blocklist_loader def check_if_token_is_revoked(jwt_header, jwt_payload: dict) -> bool: - jti = jwt_payload["jti"] - token_in_redis = jwt_redis_blocklist.get(jti) + jti = jwt_payload["jti"] + token_in_redis = jwt_redis_blocklist.get(jti) - return token_in_redis is not None + return token_in_redis is not None diff --git a/app/mail/mail.py b/app/mail/mail.py index 487f948..6fe2e96 100644 --- a/app/mail/mail.py +++ b/app/mail/mail.py @@ -2,19 +2,16 @@ from flask_mail import Message from app import flask_mail -from app.mail.messages import messages +from app.mail.message_content import MessageContent -def send_mail(message: str, recipient: str): +def send_mail(message: MessageContent, recipient: str): - body = messages[message]["body"] - subject = messages[message]["subject"] + msg = Message(subject=message.subject, recipients=[recipient], body=message.body) - msg = Message(subject, recipients=[recipient], body=body) - - try: - flask_mail.send(msg) - return True - except Exception as e: - print(f"Failed to send email. Error: {e}") - return False \ No newline at end of file + try: + flask_mail.send(msg) + return True + except Exception as e: + print(f"Failed to send email. Error: {e}") + return False diff --git a/app/mail/message_content.py b/app/mail/message_content.py new file mode 100644 index 0000000..1d787e9 --- /dev/null +++ b/app/mail/message_content.py @@ -0,0 +1,4 @@ +class MessageContent: + def __init__(self, subject, body): + self.subject = subject + self.body = body diff --git a/app/mail/messages.py b/app/mail/messages.py deleted file mode 100644 index 895c810..0000000 --- a/app/mail/messages.py +++ /dev/null @@ -1,24 +0,0 @@ -import datetime - -messages = { - "register": { - "subject": "Successfully registered!", - "body": "Congratulations! Your account has been successfully created.\nThis mail also serves as a test that the email address is correct" - }, - "login": { - "subject": "New Login detected!", - "body": "A new login token has been created" - }, - "logout": { - "subject": "Successfully logged out", - "body": "A login has been revoked. No further action is needed." - }, - "update": { - "subject": "Account updated", - "body": "Your account has been successfully updated. This also means you have been logged out of everywhere" - }, - "delete": { - "subject": "Account Deleted!", - "body": "Your account has been deleted. No further action needed" - } -} \ No newline at end of file diff --git a/app/messages/api_errors.py b/app/messages/api_errors.py new file mode 100644 index 0000000..c95f7f9 --- /dev/null +++ b/app/messages/api_errors.py @@ -0,0 +1,15 @@ +NOT_JSON = {"msg": "Request body must be JSON"}, 400 + + +def UNKNOWN_DATABASE_ERROR(e): + return {"msg": f"An unknown error occurred within the database. {e}"}, 500 + + +def MISSING_FIELDS(fields): + return {"msg": f"Missing required fields: {', '.join(fields)}"}, 400 + + +def NO_FIELD_PROVIDED(possible_fields): + return { + "msg": f"No field was provided. At least one of the following is required: {', '.join(possible_fields)}" + }, 400 diff --git a/app/messages/api_responses/product_responses.py b/app/messages/api_responses/product_responses.py new file mode 100644 index 0000000..117db98 --- /dev/null +++ b/app/messages/api_responses/product_responses.py @@ -0,0 +1,6 @@ +PRODUCT_LISTING_CREATED_SUCCESSFULLY = {"msg": "Successfully created a brand new product."}, 201 + +NOT_OWNER_OF_PRODUCT = {"msg": "You don't own this product, therefore you cannot delete it!"}, 400 +UNKNOWN_PRODUCT = {"msg": "The product you tried fetching is not known. Try a different product ID."}, 400 + +SCROLLED_TOO_FAR = {"msg": "You scrolled too far in the pages. Try going back a little again."}, 400 \ No newline at end of file diff --git a/app/messages/api_responses/user_responses.py b/app/messages/api_responses/user_responses.py new file mode 100644 index 0000000..6749600 --- /dev/null +++ b/app/messages/api_responses/user_responses.py @@ -0,0 +1,26 @@ +USER_CREATED_SUCCESSFULLY = {"msg": "User created successfully."}, 201 +USER_LOGGED_OUT_SUCCESSFULLY = {"msg": "Successfully logged out"}, 200 +USER_DELETED_SUCCESSFULLY = {"msg": "User successfully deleted"}, 200 + + +def USER_ACCOUNT_UPDATED_SUCCESSFULLY(updated_attributes): + return {"msg": f"Successfully updated your accounts {', '.join(updated_attributes)}"}, 200 + +INVALID_USERNAME_FORMAT = { + "msg": "Username is in incorrect format. It must be between 1 and 64 lowercase characters." +}, 400 +INVALID_DISPLAYNAME_FORMAT = { + "msg": "Display name is in incorrect format. It must contain only letters, '.', '-', or '_' and be between 1 and 64 characters." +}, 400 +INVALID_EMAIL_FORMAT = {"msg": "Email is in incorrect format."}, 400 +INVALID_PASSWORD_FORMAT = { + "msg": "Password is in incorrect format. It must be between 8 and 64 characters and contain at least one uppercase letter, one lowercase letter, one digit, and one special character" +}, 400 + +EMAIL_ALREADY_IN_USE = {"msg": "Email already in use."}, 409 +USERNAME_ALREADY_IN_USE = {"msg": "Username already in use."}, 409 + +USERNAME_NOT_FOUND = {"msg": "Username not found"}, 400 +INCORRECT_PASSWORD = {"msg": "Incorrect password"}, 401 + +UNKNOWN_ERROR = {"msg": "An unknown error occurred with user"}, 500 diff --git a/app/messages/mail_responses/user_email.py b/app/messages/mail_responses/user_email.py new file mode 100644 index 0000000..9772315 --- /dev/null +++ b/app/messages/mail_responses/user_email.py @@ -0,0 +1,26 @@ +from app.mail.message_content import MessageContent + +USER_EMAIL_SUCCESSFULLY_REGISTERED = MessageContent( + subject="Successfully registered!", + body="Congratulations! Your account has been successfully created.\nThis mail also serves as a test that the email address is correct", +) + +USER_EMAIL_SUCCESSFULLY_LOGGED_IN = MessageContent( + subject="New Login detected!", + body="A new login token has been created", +) + +USER_EMAIL_SUCCESSFULLY_LOGGED_OUT = MessageContent( + subject="Successfully logged out", + body="A login has been revoked. No further action is needed.", +) + +USER_EMAIL_SUCCESSFULLY_UPDATED_ACCOUNT = MessageContent( + subject="Account updated", + body="Your account has been successfully updated. This also means you have been logged out of everywhere", +) + +USER_EMAIL_SUCCESSFULLY_DELETED_ACCOUNT = MessageContent( + subject="Account Deleted!", + body="Your account has been deleted. No further action needed", +) \ No newline at end of file diff --git a/app/models/cart_model.py b/app/models/cart_model.py new file mode 100644 index 0000000..b61802c --- /dev/null +++ b/app/models/cart_model.py @@ -0,0 +1,63 @@ +from datetime import datetime + +class Cart: + """ + Represents a cart in the system. + + :param id: The unique identifier of the cart. + :type id: int + :param price_total: The total price of the cart. + :type price_total: float + :param item_count: The count of items in the cart. + :type item_count: int + """ + + def __init__( + self, + cart_id: int = None, + price_total: float = 0.00, + item_count: int = 0, + ): + self.id = cart_id + self.price_total = price_total + self.item_count = item_count + + def __repr__(self): + return f"Cart(id={self.id}, price_total={self.price_total}, item_count={self.item_count})" + +class CartItem: + """ + Represents a cart item in the system. + + :param id: The unique identifier of the cart item. + :type id: int + :param cart_id: The identifier of the cart. + :type cart_id: int + :param product_id: The identifier of the product. + :type product_id: int + :param count: The count of the product in the cart. + :type count: int + :param price_subtotal: The subtotal price of the product in the cart. + :type price_subtotal: float + :param date_added: The date and time when the item was added to the cart. + :type date_added: datetime + """ + + def __init__( + self, + cart_item_id: int = None, + cart_id: int = None, + product_id: int = None, + count: int = 0, + price_subtotal: float = 0.00, + date_added: datetime = None, + ): + self.id = cart_item_id + self.cart_id = cart_id + self.product_id = product_id + self.count = count + self.price_subtotal = price_subtotal + self.date_added = date_added or datetime.now() + + def __repr__(self): + return f"CartItem(id={self.id}, cart_id={self.cart_id}, product_id={self.product_id}, count={self.count}, price_subtotal={self.price_subtotal}, date_added={self.date_added})" \ No newline at end of file diff --git a/app/models/product_model.py b/app/models/product_model.py new file mode 100644 index 0000000..eac7e6e --- /dev/null +++ b/app/models/product_model.py @@ -0,0 +1,38 @@ +from datetime import datetime +from decimal import Decimal + + +class Product: + """ + Represents a product in the system. + + :param id: The unique identifier of the product. + :type id: int + :param seller_id: The user ID of the seller. + :type seller_id: int + :param name: The name of the product. + :type name: str + :param price: The price of the product. + :type price: Decimal + :param creation_date: The date and time when the product was created. + :type creation_date: datetime + """ + + def __init__( + self, + product_id: int = None, + seller_id: int = None, + seller_name: str = None, + name: str = None, + price: Decimal = None, + creation_date: datetime = None, + ): + self.product_id = product_id + self.seller_id = seller_id + self.seller_name = seller_name + self.name = name + self.price = price + self.creation_date = creation_date + + def __repr__(self): + return f"Product(product_id={self.product_id}, seller_id={self.seller_id}, seller_name={self.seller_name}, name='{self.name}', price={self.price}, creation_date={self.creation_date!r})" diff --git a/app/models/user_model.py b/app/models/user_model.py new file mode 100644 index 0000000..99b8612 --- /dev/null +++ b/app/models/user_model.py @@ -0,0 +1,44 @@ +from datetime import datetime + + +class User: + """ + Represents a user in the system. + + :param user_id: The unique identifier of the user. + :type user_id: int + :param username: The username of the user. + :type username: str + :param displayname: The display name of the user. + :type displayname: str + :param email: The email address of the user. + :type email: str + :param password: The hashed password of the user. + :type password: str + :param role_id: The role ID of the user. Defaults to 1. + :type role_id: int + :param creation_date: The date and time when the user was created. + :type creation_date: datetime + + """ + + def __init__( + self, + user_id: str = None, + username: str = None, + displayname: str = None, + email: str = None, + password: str = None, + role_id: int = 1, + creation_date: datetime = None, + ): + self.user_id = user_id + self.username = username + self.displayname = displayname + self.email = email + self.password = password + self.role_id = role_id + self.creation_date = creation_date + + def __repr__(self): + return f"User(id={self.user_id}, username={self.username}, displayname={self.displayname}, email={self.email}, password={self.password}, role_id={self.role_id}, creation_date={self.creation_date})" diff --git a/app/services/cart_service.py b/app/services/cart_service.py index 887e047..5d96ee3 100644 --- a/app/services/cart_service.py +++ b/app/services/cart_service.py @@ -3,180 +3,160 @@ from typing import Tuple, Union from app.extensions import db_connection + class CartService: - @staticmethod - def add_to_cart(user_id: str, product_id: int, count: int) -> Tuple[Union[dict, str], int]: - """ - Adds a product to a user's cart. + @staticmethod + @staticmethod + def update_count( + user_id: str, product_id: int, count: int + ) -> Tuple[Union[dict, str], int]: + """ + Updates count of products in user's cart - :param user_id: User ID. - :type user_id: str - :param product_id: ID of product to be added. - :type product_id: int - :return: Tuple containing a dictionary with a token and an HTTP status code. - :rtype: Tuple[Union[dict, str], int] - """ + :param user_id: User ID. + :type user_id: str + :param product_id: ID of product to be updated. + :type product_id: int + :param count: New count of products + :type count: int + :return: Tuple containing a dictionary with a token and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ - try: - with db_connection.cursor(dictionary=True) as cursor: - cursor.execute("select count from cart_item where cart_id = %s and product_id = %s", (user_id, product_id)) - result = cursor.fetchone() + try: + if count <= 0: + return CartService.delete_from_cart(user_id, product_id) - if cursor.rowcount == 1: - cursor.execute("update cart_item set count = count + %s where cart_id = %s and product_id = %s", (count, user_id, product_id)) - else: - cursor.execute("insert into cart_item(cart_id, product_id, count) values (%s, %s, %s)", (user_id, product_id, count)) + with db_connection.cursor(dictionary=True) as cursor: + cursor.execute( + "update cart_item set count = %s where cart_id = %s and product_id = %s", + (count, user_id, product_id), + ) + db_connection.commit() - db_connection.commit() + return {"Success": "Successfully added to cart"}, 200 + except Error as e: + return {"Failed": f"Failed to update item count in cart. Reason: {e}"}, 500 - except Error as e: - return {"Failed": f"Failed to add item to cart. Reason: {e}"}, 500 + @staticmethod + def delete_from_cart(user_id: str, product_id: int) -> Tuple[Union[dict, str], int]: + """ + Completely deletes an item from a user's cart - return {"Success": "Successfully added to cart"}, 200 + :param user_id: User ID. + :type user_id: str + :param product_id: ID of product to be updated. + :type product_id: int + :return: Tuple containing a dictionary with a token and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ - @staticmethod - def update_count(user_id: str, product_id: int, count: int) -> Tuple[Union[dict, str], int]: - """ - Updates count of products in user's cart + try: + with db_connection.cursor() as cursor: + cursor.execute( + "delete from cart_item where cart_id = %s and product_id = %s", + (user_id, product_id), + ) + db_connection.commit() - :param user_id: User ID. - :type user_id: str - :param product_id: ID of product to be updated. - :type product_id: int - :param count: New count of products - :type count: int - :return: Tuple containing a dictionary with a token and an HTTP status code. - :rtype: Tuple[Union[dict, str], int] - """ + return {"Success": "Successfully removed item from cart"}, 200 + except Error as e: + return {"Failed": f"Failed to remove item from cart. Reason: {e}"}, 500 - try: - if count <= 0: - return CartService.delete_from_cart(user_id, product_id) + @staticmethod + def show_cart(user_id: str) -> Tuple[Union[dict, str], int]: + """ + Gives the user the content of their cart - with db_connection.cursor(dictionary=True) as cursor: - cursor.execute("update cart_item set count = %s where cart_id = %s and product_id = %s", (count, user_id, product_id)) - db_connection.commit() + :param user_id: User ID. + :type user_id: str + :return: Tuple containing a dictionary with a token and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ - return {"Success": "Successfully added to cart"}, 200 + try: + with db_connection.cursor(dictionary=True) as cursor: + cursor.execute( + "select product.name as product_name, count, price_subtotal, date_added from cart_item inner join product on cart_item.product_id = product.id where cart_item.cart_id = %s", + (user_id,), + ) + rows = cursor.fetchall() - except Error as e: - return {"Failed": f"Failed to update item count in cart. Reason: {e}"}, 500 + results = [] - @staticmethod - def delete_from_cart(user_id: str, product_id: int) -> Tuple[Union[dict, str], int]: - """ - Completely deletes an item from a user's cart + for row in rows: + mid_result = { + "name": row["product_name"], + "count": row["count"], + "price_subtotal": row["price_subtotal"], + "date_added": row["date_added"], + } - :param user_id: User ID. - :type user_id: str - :param product_id: ID of product to be updated. - :type product_id: int - :return: Tuple containing a dictionary with a token and an HTTP status code. - :rtype: Tuple[Union[dict, str], int] - """ + results.append(mid_result) + return results, 200 - try: - with db_connection.cursor() as cursor: - cursor.execute("delete from cart_item where cart_id = %s and product_id = %s", (user_id, product_id)) - db_connection.commit() + except Error as e: + return {"Failed": f"Failed to load cart. Reason: {e}"}, 500 - return {"Success": "Successfully removed item from cart"}, 200 - except Error as e: - return {"Failed": f"Failed to remove item from cart. Reason: {e}"}, 500 + @staticmethod + def purchase(user_id: str) -> Tuple[Union[dict, str], int]: + """ + "Purchases" the contents of user's cart + :param user_id: User ID. + :type user_id: str + :return: Tuple containing a dictionary with a token and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ - @staticmethod - def show_cart(user_id: str) -> Tuple[Union[dict, str], int]: - """ - Gives the user the content of their cart - - :param user_id: User ID. - :type user_id: str - :return: Tuple containing a dictionary with a token and an HTTP status code. - :rtype: Tuple[Union[dict, str], int] - """ + try: + with db_connection.cursor(dictionary=True) as cursor: + # get all cart items + cursor.execute( + "select id, product_id, count, price_subtotal from cart_item where cart_id = %s", + (user_id,), + ) + results = cursor.fetchall() - try: - with db_connection.cursor(dictionary=True) as cursor: - cursor.execute("select product.name as product_name, count, price_subtotal, date_added from cart_item inner join product on cart_item.product_id = product.id where cart_item.cart_id = %s", (user_id,)) - rows = cursor.fetchall() + if len(results) < 1: + return {"Failed": "Failed to purchase. Cart is Empty"}, 400 - results = [] + # create a purchase + cursor.execute("insert into purchase(user_id) values (%s)", (user_id,)) - for row in rows: - mid_result = { - "name": row['product_name'], - "count": row['count'], - "price_subtotal": row['price_subtotal'], - "date_added": row['date_added'] - } + last_id = cursor.lastrowid - results.append(mid_result) + parsed = [] + ids = [] - return results, 200 + for row in results: + mid_row = ( + last_id, + row["product_id"], + row["count"], + row["price_subtotal"], + ) - except Error as e: - return {"Failed": f"Failed to load cart. Reason: {e}"}, 500 + row_id = row["id"] + parsed.append(mid_row) + ids.append(row_id) - @staticmethod - def purchase(user_id: str) -> Tuple[Union[dict, str], int]: - """ - "Purchases" the contents of user's cart + insert_query = "INSERT INTO purchase_item (purchase_id, product_id, count, price_subtotal) VALUES (%s, %s, %s, %s)" + for row in parsed: + cursor.execute(insert_query, row) - :param user_id: User ID. - :type user_id: str - :return: Tuple containing a dictionary with a token and an HTTP status code. - :rtype: Tuple[Union[dict, str], int] - """ - - try: - with db_connection.cursor(dictionary=True) as cursor: - # get all cart items - cursor.execute("select id, product_id, count, price_subtotal from cart_item where cart_id = %s", (user_id,)) - results = cursor.fetchall() + delete_query = "delete from cart_item where id = %s" + for one_id in ids: + cursor.execute(delete_query, (one_id,)) - if len(results) < 1: - return {"Failed": "Failed to purchase. Cart is Empty"}, 400 + db_connection.commit() - # create a purchase - cursor.execute("insert into purchase(user_id) values (%s)", (user_id,)) + # clear cart + except Error as e: + return {"msg": f"Failed to load cart. Reason: {e}"}, 500 - last_id = cursor.lastrowid - - parsed = [] - ids = [] - - for row in results: - mid_row = ( - last_id, - row['product_id'], - row['count'], - row['price_subtotal'] - ) - - row_id = row['id'] - - parsed.append(mid_row) - ids.append(row_id) - - insert_query = "INSERT INTO purchase_item (purchase_id, product_id, count, price_subtotal) VALUES (%s, %s, %s, %s)" - for row in parsed: - cursor.execute(insert_query, row) - - delete_query = "delete from cart_item where id = %s" - for one_id in ids: - cursor.execute(delete_query, (one_id,)) - - db_connection.commit() - - - # clear cart - except Error as e: - return {"msg": f"Failed to load cart. Reason: {e}"}, 500 - - return {"msg": "Successfully purchased"}, 200 - + return {"msg": "Successfully purchased"}, 200 diff --git a/app/services/product/product_create_service.py b/app/services/product/product_create_service.py new file mode 100644 index 0000000..2ef070a --- /dev/null +++ b/app/services/product/product_create_service.py @@ -0,0 +1,30 @@ +from mysql.connector import Error as mysqlError + +from app.messages.api_responses import product_responses as response +import app.messages.api_errors as errors + +from app.db import product_db + +from app.models.product_model import Product + + +def create_product(seller_id: str, name: str, price: float): + """ + Creates a new product listing + + :param seller_id: User ID + :type seller_id: str + :param name: New product's name + :type name: str + :param price: New product's price + :type price: float + """ + + product: Product = Product(seller_id=seller_id, name=name, price=price) + try: + product_db.insert_product(product) + + except mysqlError as e: + return errors.UNKNOWN_DATABASE_ERROR(e) + + return response.PRODUCT_LISTING_CREATED_SUCCESSFULLY diff --git a/app/services/product/product_delete_service.py b/app/services/product/product_delete_service.py new file mode 100644 index 0000000..cc4e912 --- /dev/null +++ b/app/services/product/product_delete_service.py @@ -0,0 +1,23 @@ +from mysql.connector import Error as mysqlError + +from app.messages.api_responses import product_responses as response +import app.messages.api_errors as errors + +from app.db import product_db + +from app.models.product_model import Product + + +def delete_product(seller_id: str, product_id: str): + product: Product = product_db.fetch_product_by_id(product_id) + + if product.seller_id != seller_id: + return response.NOT_OWNER_OF_PRODUCT + + try: + product_db.delete_product(product) + + except mysqlError as e: + return errors.UNKNOWN_DATABASE_ERROR(e) + + return response.PRODUCT_LISTING_CREATED_SUCCESSFULLY diff --git a/app/services/product/product_helper.py b/app/services/product/product_helper.py new file mode 100644 index 0000000..55bc48e --- /dev/null +++ b/app/services/product/product_helper.py @@ -0,0 +1,8 @@ +import imghdr + +def is_base64_jpg(decoded_string) -> bool: + try: + image_type = imghdr.what(None, decoded_string) + return image_type == "jpeg" + except Exception: + return False diff --git a/app/services/product/product_info_service.py b/app/services/product/product_info_service.py new file mode 100644 index 0000000..5f13688 --- /dev/null +++ b/app/services/product/product_info_service.py @@ -0,0 +1,20 @@ +from mysql.connector import Error as mysqlError + +from app.messages.api_responses import product_responses as response +import app.messages.api_errors as errors + +from app.db import product_db + +from app.models.product_model import Product + + +def product_info(product_id: int): + try: + product: Product = product_db.fetch_product_extended_by_id(product_id) + + if product is None: + return response.UNKNOWN_PRODUCT + + return product, 200 + except mysqlError as e: + return errors.UNKNOWN_DATABASE_ERROR(e) diff --git a/app/services/product/product_list_service.py b/app/services/product/product_list_service.py new file mode 100644 index 0000000..ad0ee77 --- /dev/null +++ b/app/services/product/product_list_service.py @@ -0,0 +1,29 @@ +from mysql.connector import Error as mysqlError + +from app.messages.api_responses import product_responses as response +import app.messages.api_errors as errors + +from app.db import product_db + +def product_list(page: int): + try: + result_products = product_db.fetch_products(page) + + if result_products is None: + return response.SCROLLED_TOO_FAR + + result_obj = [] + for product in result_products: + mid_result = { + "id": product.product_id, + "seller": product.seller_id, + "name": product.name, + "price": product.price, + } + + result_obj.append(mid_result) + + return result_obj, 200 + + except mysqlError as e: + errors.UNKNOWN_DATABASE_ERROR(e) diff --git a/app/services/product_service.py b/app/services/product_service.py deleted file mode 100644 index a60dab0..0000000 --- a/app/services/product_service.py +++ /dev/null @@ -1,127 +0,0 @@ -import base64 - -from flask import abort -from mysql.connector import Error - -from decimal import Decimal - -from app.extensions import db_connection - -class ProductService: - - @staticmethod - def get_products(page: int): - """ - Fetches 10 products - - :param page: Page, aka offset of fetching - :type page: int - """ - - try: - with db_connection.cursor(dictionary=True) as cursor: - - offset = 10 * page - cursor.execute("select product.id, user.displayname as seller, product.name, product.price_pc from product inner join user on user.id = product.seller_id order by product.id limit 10 offset %s", (offset,)) - results = cursor.fetchall() - - if len(results) < 1: - return {"msg": "Failed to fetch products. You've probably selected too far with pages"}, 400 - - result_obj = [] - for row in results: - mid_result = { - "id": row['id'], - "seller": row['seller'], - "name": row['name'], - "price": row['price_pc'] - } - - result_obj.append(mid_result) - - return result_obj, 200 - - except Error as e: - return {"msg": f"Failed to fetch products. Error: {e}"}, 500 - - @staticmethod - def get_product_info(fields: list[str], product_id: int): - """ - Fetches specific product info - - :param fields: array of fields that can be fetched - :type fields: list[str] - :param product_id: ID of product to be updated. - :type product_id: int - """ - - try: - with db_connection.cursor(dictionary=True) as cursor: - fields_to_sql = { - "name": "product.name", - "price": "product.price_pc as price", - "image": "product.image", - "image_name": "product.image_name", - "seller": "user.displayname as seller" - } - field_sql = [] - - for field in fields: - field_sql.append(fields_to_sql[field]) - - select_params = ", ".join(field_sql) - - if "seller" in fields: - cursor.execute(f"select {select_params} from product inner join user on user.id = product.seller_id where product.id = %s", (product_id,)) - else: - cursor.execute(f"select {select_params} from product where id = %s", (product_id,)) - - result = cursor.fetchone() - - if cursor.rowcount != 1: - return {"msg": "Failed to fetch product. Product likely doesn't exist"}, 400 - - result_obj = {} - - for field in fields: - if field == "image": # Encode image into base64 - result_obj[field] = base64.b64encode(result[field]).decode('utf-8') - else: - result_obj[field] = result[field] - - return result_obj, 200 - except Error as e: - return {"msg": f"Failed to fetch product info. Error: {e}"}, 500 - - @staticmethod - def create_listing(seller_id: str, name: str, price: float): - """ - Creates a new product listing - - :param seller_id: User ID - :type seller_id: str - :param name: New product's name - :type name: str - :param price: New product's price - :type price: float - """ - - try: - with db_connection.cursor() as cursor: - rounded_price = round(price, 2) - - cursor.execute("insert into product(seller_id, name, price_pc) values (%s, %s, %s)", (seller_id, name, Decimal(str(rounded_price)))) - db_connection.commit() - except Error as e: - return {"msg": f"Failed to create product. {e}"}, 400 - - return {"msg": "Successfully created new product listing"}, 200 - - @staticmethod - def __is_base64_jpg(decoded_string): - try: - # Checking if the decoded data represents a valid JPEG image - image_type = imghdr.what(None, decoded_data) - return image_type == 'jpeg' - except Exception: - return False \ No newline at end of file diff --git a/app/services/user/delete_service.py b/app/services/user/delete_service.py new file mode 100644 index 0000000..ecee684 --- /dev/null +++ b/app/services/user/delete_service.py @@ -0,0 +1,31 @@ +from typing import Tuple, Union +from mysql.connector import Error as mysqlError + +import app.db.user_db as user_db +from app.models.user_model import User + +import app.messages.api_responses.user_responses as response +import app.messages.api_errors as errors +from app.mail.mail import send_mail + + +from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_DELETED_ACCOUNT + +def delete_user(user_id: str) -> Tuple[Union[dict, str], int]: + """ + Deletes a user account. + + :param user_id: User ID. + :type user_id: str + :return: Tuple containing a dictionary and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ + + try: + user: User = user_db.fetch_by_id(user_id=user_id) + user_db.delete_user(user) + send_mail(USER_EMAIL_SUCCESSFULLY_DELETED_ACCOUNT, user.email) + except mysqlError as e: + return errors.UNKNOWN_DATABASE_ERROR(e) + + return response.USER_DELETED_SUCCESSFULLY diff --git a/app/services/user/login_service.py b/app/services/user/login_service.py new file mode 100644 index 0000000..c57b01b --- /dev/null +++ b/app/services/user/login_service.py @@ -0,0 +1,45 @@ +import datetime +from typing import Tuple, Union + +import bcrypt +from mysql.connector import Error as mysqlError +from flask_jwt_extended import create_access_token + +import app.messages.api_responses.user_responses as response +import app.messages.api_errors as errors +from app.db import user_db +from app.mail.mail import send_mail +from app.models.user_model import User +from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_LOGGED_IN + + + +def login(username: str, password: str) -> Tuple[Union[dict, str], int]: + """ + Authenticates a user with the provided username and password. + + :param username: User's username. + :type username: str + :param password: User's password. + :type password: str + :return: Tuple containing a dictionary with a token and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ + try: + user: User = user_db.fetch_by_username(username) + + if user is None: + return response.USERNAME_NOT_FOUND + + if not bcrypt.checkpw(password.encode("utf-8"), user.password.encode("utf-8")): + return response.INCORRECT_PASSWORD + + expire = datetime.timedelta(hours=1) + token = create_access_token(identity=user.user_id, expires_delta=expire) + + send_mail(USER_EMAIL_SUCCESSFULLY_LOGGED_IN, user.email) + + return {"token": token}, 200 + + except mysqlError as e: + return errors.UNKNOWN_DATABASE_ERROR(e) diff --git a/app/services/user/logout_service.py b/app/services/user/logout_service.py new file mode 100644 index 0000000..05cf185 --- /dev/null +++ b/app/services/user/logout_service.py @@ -0,0 +1,31 @@ +from typing import Tuple, Union + +import app.messages.api_responses.user_responses as response +from app.db import user_db +from app.models.user_model import User +from app.mail.mail import send_mail +from app.services.user import user_helper as helper +from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_LOGGED_OUT + + +def logout(jwt_token, user_id, send_notif: bool) -> Tuple[Union[dict, str], int]: + """ + Logs out a user by invalidating the provided JWT. + + :param jti: JWT ID. + :type jti: str + :param exp: JWT expiration timestamp. + :type exp: int + :return: Tuple containing a dictionary and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ + + jti = jwt_token["jti"] + exp = jwt_token["exp"] + + user: User = user_db.fetch_by_id(user_id) + + helper.invalidate_token(jti, exp) + if send_notif: + send_mail(USER_EMAIL_SUCCESSFULLY_LOGGED_OUT, user.email) + return response.USER_LOGGED_OUT_SUCCESSFULLY diff --git a/app/services/user/register_service.py b/app/services/user/register_service.py new file mode 100644 index 0000000..c050caf --- /dev/null +++ b/app/services/user/register_service.py @@ -0,0 +1,64 @@ +import bcrypt +from typing import Tuple, Union +from mysql.connector import Error as mysqlError + +import app.messages.api_responses.user_responses as response +import app.messages.api_errors as errors +from app.db import user_db +from app.mail.mail import send_mail +from app.models.user_model import User +from app.services.user import user_helper as helper +from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_REGISTERED + + +def register( + username: str, displayname: str, email: str, password: str +) -> Tuple[Union[dict, str], int]: + """ + Registers a new user with the provided username, email, and password. + + :param username: User's username. + :type username: str + :param email: User's email address. + :type email: str + :param password: User's password. + :type password: str + :return: Tuple containing a dictionary and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ + + try: + if not helper.verify_username(username): + return response.INVALID_USERNAME_FORMAT + + if not helper.verify_displayname(displayname): + return response.INVALID_DISPLAYNAME_FORMAT + + if not helper.verify_email(email): + return response.INVALID_EMAIL_FORMAT + + if not helper.verify_password(password): + return response.INVALID_PASSWORD_FORMAT + + hashed_password = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()) + + new_user: User = User( + username=username, + displayname=displayname, + email=email, + password=hashed_password, + ) + + user_db.insert_user(new_user) + + except mysqlError as e: + if "email" in e.msg: + return response.EMAIL_ALREADY_IN_USE + if "username" in e.msg: + return response.USERNAME_ALREADY_IN_USE + + return errors.UNKNOWN_DATABASE_ERROR(e) + + send_mail(USER_EMAIL_SUCCESSFULLY_REGISTERED, new_user.email) + + return response.USER_CREATED_SUCCESSFULLY diff --git a/app/services/user/update_user_service.py b/app/services/user/update_user_service.py new file mode 100644 index 0000000..0e35288 --- /dev/null +++ b/app/services/user/update_user_service.py @@ -0,0 +1,72 @@ +import bcrypt +from typing import Tuple, Union +from mysql.connector import Error as mysqlError + +from app.db import user_db +from app.mail.mail import send_mail +from app.models.user_model import User +from app.services.user import user_helper as helper +from app.messages.api_responses import user_responses as response +import app.messages.api_errors as errors +from app.messages.mail_responses.user_email import ( + USER_EMAIL_SUCCESSFULLY_UPDATED_ACCOUNT, +) + + +def update_user( + user_id: str, + new_username: str = None, + new_displayname: str = None, + new_email: str = None, + new_password: str = None, +) -> Tuple[Union[dict, str], int]: + user: User = user_db.fetch_by_id(user_id) + + updated_attributes = [] + + if user is None: + return response.UNKNOWN_ERROR + + if new_username: + if not helper.verify_username(new_username): + return response.INVALID_USERNAME_FORMAT + + user.username = new_username + updated_attributes.append("username") + + if new_displayname: + if not helper.verify_displayname(new_displayname): + return response.INVALID_DISPLAYNAME_FORMAT + + user.displayname = new_displayname + updated_attributes.append("displayname") + + if new_email: + if not helper.verify_email(new_email): + return response.INVALID_EMAIL_FORMAT + + user.email = new_email + updated_attributes.append("email") + + if new_password: + if not helper.verify_password(new_password): + return response.INVALID_PASSWORD_FORMAT + + hashed_password = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt()) + + user.password = hashed_password + updated_attributes.append("password") + + try: + user_db.update_user(user) + + except mysqlError as e: + if "username" in e.msg: + return response.USERNAME_ALREADY_IN_USE + if "email" in e.msg: + return response.EMAIL_ALREADY_IN_USE + + return errors.UNKNOWN_DATABASE_ERROR(e) + + send_mail(USER_EMAIL_SUCCESSFULLY_UPDATED_ACCOUNT, user.email) + return response.USER_ACCOUNT_UPDATED_SUCCESSFULLY(updated_attributes) diff --git a/app/services/user/user_helper.py b/app/services/user/user_helper.py new file mode 100644 index 0000000..b64b1a4 --- /dev/null +++ b/app/services/user/user_helper.py @@ -0,0 +1,74 @@ +import re +from datetime import datetime + +from app.extensions import jwt_redis_blocklist + + +def invalidate_token(jti: str, exp: int): + """ + Invalidates a JWT by adding its JTI to the Redis blocklist. + + :param jti: JWT ID. + :type jti: str + :param exp: JWT expiration timestamp. + :type exp: int + """ + expiration = datetime.fromtimestamp(exp) + now = datetime.now() + + delta = expiration - now + jwt_redis_blocklist.set(jti, "", ex=delta) + + +def verify_email(email: str) -> bool: + """ + Verifies a given email string against a regular expression. + + :param email: Email string. + :type email: str + :return: Boolean indicating whether the email successfully passed the check. + :rtype: bool + """ + email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" + return re.match(email_regex, email) and len(email) <= 64 + + +def verify_displayname(displayname: str) -> bool: + """ + Verifies a given display name string against a regular expression. + + :param displayname: Display name string. + :type displayname: str + :return: Boolean indicating whether the display name successfully passed the check. + :rtype: bool + """ + displayname_regex = r"^[a-zA-Z.-_]{1,64}$" + return re.match(displayname_regex, displayname) + + +def verify_username(username: str) -> bool: + """ + Verifies a given username string against a regular expression. + + :param username: Username string. + :type username: str + :return: Boolean indicating whether the username successfully passed the check. + :rtype: bool + """ + username_regex = r"^[a-z]{1,64}$" + return re.match(username_regex, username) + + +def verify_password(password: str) -> bool: + """ + Verifies a given password string against a regular expression. + + :param password: Password string. + :type password: str + :return: Boolean indicating whether the password successfully passed the check. + :rtype: bool + """ + password_regex = ( + r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$" + ) + return re.match(password_regex, password) diff --git a/app/services/user_service.py b/app/services/user_service.py deleted file mode 100644 index 10b23ca..0000000 --- a/app/services/user_service.py +++ /dev/null @@ -1,326 +0,0 @@ -import bcrypt -import re -import jwt -import datetime -from typing import Tuple, Union -from mysql.connector import Error -from flask_jwt_extended import create_access_token - -from app.extensions import db_connection -from app.extensions import jwt_redis_blocklist - -from app.mail.mail import send_mail - - -class UserService: - """ - UserService class provides methods for user-related operations. - - Methods: - - register(username: str, email: str, password: str) -> Tuple[Union[dict, str], int] - - login(username: str, password: str) -> Tuple[Union[dict, str], int] - - logout(jti, exp) -> Tuple[Union[dict, str], int] - - update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int] - - update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int] - - update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int] - """ - - @staticmethod - def register(username: str, displayname: str, email: str, password: str) -> Tuple[Union[dict, str], int]: - """ - Registers a new user with the provided username, email, and password. - - :param username: User's username. - :type username: str - :param email: User's email address. - :type email: str - :param password: User's password. - :type password: str - :return: Tuple containing a dictionary and an HTTP status code. - :rtype: Tuple[Union[dict, str], int] - """ - - try: - if not UserService.__verify_username(username): - return {"msg": "Failed to verify username. Try another username"}, 400 - - if not UserService.__verify_displayname(displayname): - return {"msg": "Failed to verify display name. Try another name"}, 400 - - if not UserService.__verify_email(email): - return {"msg": "Failed to verify email. Try another email"}, 400 - - if not UserService.__verify_password(password): - return {"msg": "Failed to verify password. Try another (stronger) password"}, 400 - - hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) - - with db_connection.cursor() as cursor: - cursor.execute("insert into user (username, displayname, email, password) values (%s, %s, %s, %s)", (username, displayname, email, hashed_password)) - db_connection.commit() - except Error as e: - print(f"Error: {e}") - return {"msg": "Failed to insert into database. Username or email are likely in use already"}, 500 - - UserService.__send_email("register", email=email) - - return {"msg": "User created successfully"}, 200 - - @staticmethod - def login(username: str, password: str) -> Tuple[Union[dict, str], int]: - """ - Authenticates a user with the provided username and password. - - :param username: User's username. - :type username: str - :param password: User's password. - :type password: str - :return: Tuple containing a dictionary with a token and an HTTP status code. - :rtype: Tuple[Union[dict, str], int] - """ - try: - with db_connection.cursor(dictionary=True) as cursor: - - cursor.execute("select id, email, password from user where username = %s", (username,)) - result = cursor.fetchone() - - user_id = result['id'] - email = result['email'] - password_hash = result['password'] - - if user_id is None: - return {"msg": "Username not found"}, 400 - - if not bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8')): - return {"msg": "Incorrect password"}, 401 - - expire = datetime.timedelta(hours=1) - - token = create_access_token(identity=user_id, expires_delta=expire) - - UserService.__send_email("login", email=email) - - return {"token": token}, 200 - - except Error as e: - return {"msg": f"Failed to login. Error: {e}"}, 500 - - @staticmethod - def logout(jwt_token, user_id) -> Tuple[Union[dict, str], int]: - """ - Logs out a user by invalidating the provided JWT. - - :param jti: JWT ID. - :type jti: str - :param exp: JWT expiration timestamp. - :type exp: int - :return: Tuple containing a dictionary and an HTTP status code. - :rtype: Tuple[Union[dict, str], int] - """ - - jti = jwt['jti'] - exp = jwt['exp'] - - UserService.__invalidate_token(jti, exp) - UserService.__send_email("logout", id=user_id) - - return {"msg": "Successfully logged out"}, 200 - - @staticmethod - def delete_user(user_id: str) -> Tuple[Union[dict, str], int]: - """ - Deletes a user account. - - :param user_id: User ID. - :type user_id: str - :return: Tuple containing a dictionary and an HTTP status code. - :rtype: Tuple[Union[dict, str], int] - """ - - UserService.__send_email("delete", id=user_id) - - try: - with db_connection.cursor() as cursor: - cursor.execute("delete from user where id = %s", (user_id,)) - db_connection.commit() - except Error as e: - return {"msg": f"Failed to delete user. {e}"}, 500 - - return {"msg": "User successfully deleted"}, 200 - - @staticmethod - def update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]: - """ - Updates the email address for a user with the provided user ID. - - :param user_id: User's ID. - :type user_id: str - :param new_email: New email address. - :type new_email: str - :return: Tuple containing a dictionary and an HTTP status code. - :rtype: Tuple[Union[dict, str], int] - """ - - try: - if not UserService.__verify_email(new_email): - return {"msg": "Failed to verify email. Try another email"}, 400 - - with db_connection.cursor() as cursor: - cursor.execute("update user set email = %s where id = %s", (new_email, user_id)) - db_connection.commit() - except Error as e: - return {"msg": f"Failed to update email. Email is likely in use already. Error: {e}"}, 500 - - return {"msg": "Email successfully updated"}, 200 - - @staticmethod - def update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]: - """ - Updates the username for a user with the provided user ID. - - :param user_id: User's ID. - :type user_id: str - :param new_username: New username. - :type new_username: str - :return: Tuple containing a dictionary and an HTTP status code. - :rtype: Tuple[Union[dict, str], int] - """ - - try: - if not UserService.__verify_name(new_username): - return {"msg": "Failed to verify username. Try another one"}, 400 - - with db_connection.cursor() as cursor: - cursor.execute("update user set username = %s where id = %s", (new_username, user_id)) - db_connection.commit() - except Error as e: - return {"msg": f"Failed to update username. Username is likely in use already. Error: {e}"}, 500 - - return {"msg": "Username successfully updated"}, 200 - - @staticmethod - def update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]: - """ - Updates the password for a user with the provided user ID. - - :param user_id: User's ID. - :type user_id: str - :param new_password: New password. - :type new_password: str - :return: Tuple containing a dictionary and an HTTP status code. - :rtype: Tuple[Union[dict, str], int] - """ - - try: - if not UserService.__verify_password(new_password): - return {"msg": "Failed to verify password. Try another (stronger) one"}, 400 - - hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()) - - with db_connection.cursor() as cursor: - cursor.execute("update user set password = %s where id = %s", (new_username, user_id)) - db_connection.commit() - except Error as e: - return {"msg": f"Failed to update password. Error: {e}"}, 500 - - return {"msg": "Password successfully updated"}, 200 - - @staticmethod - def __send_email(message: str, username: str = None, id: str = None, email: str = None): - if email is not None: - send_mail(message, email) - return - - if username is not None: - try: - with db_connection.cursor(dictionary=True) as cursor: - cursor.execute("select email from user where username = %s", (username,)) - result = cursor.fetchone() - email = result['email'] - send_mail(message, email) - - except Error as e: - return {"msg": f"Failed to fetch some data. Error: {e}"}, 500 - return - - if id is not None: - try: - with db_connection.cursor(dictionary=True) as cursor: - cursor.execute("select email from user where id = %s", (id,)) - result = cursor.fetchone() - email = result['email'] - send_mail(message, email) - - except Error as e: - return {"msg": f"Failed to fetch some data. Error: {e}"}, 500 - return - - raise ValueError("Invalid input data to send mail") - - @staticmethod - def __invalidate_token(jti: str, exp: int): - """ - Invalidates a JWT by adding its JTI to the Redis blocklist. - - :param jti: JWT ID. - :type jti: str - :param exp: JWT expiration timestamp. - :type exp: int - """ - expiration = datetime.datetime.fromtimestamp(exp) - now = datetime.datetime.now() - - delta = expiration - now - jwt_redis_blocklist.set(jti, "", ex=delta) - - @staticmethod - def __verify_email(email: str) -> bool: - """ - Verifies a given email string against a regular expression. - - :param email: Email string. - :type email: str - :return: Boolean indicating whether the email successfully passed the check. - :rtype: bool - """ - email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" - return re.match(email_regex, email) and len(email) <= 64 - - @staticmethod - def __verify_displayname(displayname: str) -> bool: - """ - Verifies a given display name string against a regular expression. - - :param displayname: Display name string. - :type displayname: str - :return: Boolean indicating whether the display name successfully passed the check. - :rtype: bool - """ - displayname_regex = r"^[a-zA-Z.-_]{1,64}$" - return re.match(displayname_regex, displayname) - - @staticmethod - def __verify_username(username: str) -> bool: - """ - Verifies a given username string against a regular expression. - - :param username: Username string. - :type username: str - :return: Boolean indicating whether the username successfully passed the check. - :rtype: bool - """ - username_regex = r"^[a-z]{1,64}$" - return re.match(username_regex, username) - - @staticmethod - def __verify_password(password: str) -> bool: - """ - Verifies a given password string against a regular expression. - - :param password: Password string. - :type password: str - :return: Boolean indicating whether the password successfully passed the check. - :rtype: bool - """ - password_regex = r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$" - return re.match(password_regex, password) diff --git a/main.py b/main.py index 3f9dfc6..d778a18 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,4 @@ from dotenv import load_dotenv -from flask_jwt_extended import JWTManager -import os from app import create_app @@ -9,5 +7,5 @@ load_dotenv() app = create_app() if __name__ == "__main__": - print("Hello, Flask") - app.run(use_reloader=False) + print("Hello, Flask") + app.run(use_reloader=False)