From 0202a4022845af53d1fff0b8373ba68037dbe1d0 Mon Sep 17 00:00:00 2001 From: Thastertyn Date: Thu, 7 Mar 2024 07:52:27 +0100 Subject: [PATCH] Cleanup and documentation of user management code --- app/api/routes/user_routes.py | 97 +++++++++++++++--------- app/services/user_service.py | 139 +++++++++++++++++++++++++++++++--- 2 files changed, 188 insertions(+), 48 deletions(-) diff --git a/app/api/routes/user_routes.py b/app/api/routes/user_routes.py index b33d9b8..34a012f 100644 --- a/app/api/routes/user_routes.py +++ b/app/api/routes/user_routes.py @@ -1,10 +1,21 @@ from app.api import bp_user from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt -from flask import request, abort, jsonify -from datetime import timedelta +from flask import request, abort from app.services.user_service import UserService -from app.extensions import jwt_redis_blocklist + +@bp_user.route('/register', methods=['POST']) +def register(): + username = request.json.get('username') + email = request.json.get('email') + password = request.json.get('password') + + if username is None or email is None or password is None: + return abort(400) + + result, status_code = UserService.register(username, email, password) + + return result, status_code @bp_user.route('/login', methods=['POST']) def login(): @@ -16,43 +27,20 @@ def login(): result, status_code = UserService.login(username, password) - return jsonify(**result), status_code + return result, status_code @bp_user.route('/logout', methods=['DELETE']) @jwt_required() def logout(): - jti = get_jwt()["jti"] - jwt_redis_blocklist.set(jti, "", ex=timedelta(days=1)) + jwt = get_jwt() - return {"Success": "Successfully logged out"}, 200 + jti = jwt['jti'] + exp = jwt['exp'] + result, status_code = UserService.logout(jti, exp) -@bp_user.route('/create', methods=['POST']) -def create_user(): - username = request.json.get('username') - email = request.json.get('email') - password = request.json.get('password') + return result, status_code - if username is None or email is None or password is None: - return abort(400) - - result, status_code = UserService.create_user(username, email, password) - - return jsonify(**result), status_code - -@bp_user.route('/update/email', methods=['POST']) -@jwt_required() -def update_email(): - username = get_jwt_identity() - new_mail = request.json.get('new_email') - - if new_mail is None: - return abort(400) - - result, status_code = UserService.update_email(username, new_mail) - - return jsonify(**result), status_code - -@bp_user.route('/update/username', methods=['POST']) +@bp_user.route('/update/username', methods=['PUT']) @jwt_required() def update_username(): username = get_jwt_identity() @@ -63,9 +51,35 @@ def update_username(): result, status_code = UserService.update_username(username, new_username) - return jsonify(**result), status_code + jwt = get_jwt() + + jti = jwt['jti'] + exp = jwt['exp'] + UserService.logout(jti, exp) -@bp_user.route('/update/password', methods=['POST']) + return result, status_code + +@bp_user.route('/update/email', methods=['PUT']) +@jwt_required() +def update_email(): + username = get_jwt_identity() + new_mail = request.json.get('new_email') + + if new_mail is None: + return abort(400) + + result, status_code = UserService.update_email(username, new_mail) + + jwt = get_jwt() + + jti = jwt['jti'] + exp = jwt['exp'] + UserService.logout(jti, exp) + + return result, status_code + + +@bp_user.route('/update/password', methods=['PUT']) @jwt_required() def update_password(): username = get_jwt_identity() @@ -76,4 +90,15 @@ def update_password(): result, status_code = UserService.update_password(username, new_password) - return jsonify(**result), status_code \ No newline at end of file + jwt = get_jwt() + + jti = jwt['jti'] + exp = jwt['exp'] + UserService.logout(jti, exp) + + return result, status_code + +@bp_user.route('/delete', methods=['DELETE']) +@jwt_required() +def delete_user(): + return abort(501) \ No newline at end of file diff --git a/app/services/user_service.py b/app/services/user_service.py index 26dc84a..cfb7144 100644 --- a/app/services/user_service.py +++ b/app/services/user_service.py @@ -3,17 +3,40 @@ import re import jwt import datetime from typing import Tuple, Union +from mysql.connector import Error +from flask_jwt_extended import create_access_token from app.extensions import db_cursor, db_connection -from mysql.connector import Error - -from flask_jwt_extended import create_access_token +from app.extensions import jwt_redis_blocklist class UserService: + """ + UserService class provides methods for user-related operations. + + Methods: + - register(username: str, email: str, password: str) -> Tuple[Union[dict, str], int] + - login(username: str, password: str) -> Tuple[Union[dict, str], int] + - logout(jti, exp) -> Tuple[Union[dict, str], int] + - update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int] + - update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int] + - update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int] + """ @staticmethod - def create_user(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]: + def register(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]: + """ + Registers a new user with the provided username, email, and password. + + :param username: User's username. + :type username: str + :param email: User's email address. + :type email: str + :param password: User's password. + :type password: str + :return: Tuple containing a dictionary and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ if not UserService.__verify_username(username): return {"Failed": "Failed to verify username. Try another username"}, 400 @@ -24,10 +47,6 @@ class UserService: if not UserService.__verify_password(password): return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400 - # Role ID 1 => Normal user - # Role ID 2 => Seller - # Role ID 3 => Admin - hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) try: @@ -49,6 +68,16 @@ class UserService: @staticmethod def login(username: str, password: str) -> Tuple[Union[dict, str], int]: + """ + Authenticates a user with the provided username and password. + + :param username: User's username. + :type username: str + :param password: User's password. + :type password: str + :return: Tuple containing a dictionary with a token and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ db_cursor.execute("select user_id, password, last_change from user where username = %s", (username,)) result = db_cursor.fetchone() @@ -63,15 +92,41 @@ class UserService: if not bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8')): return {"Failed": "Incorrect password"}, 401 - expire = datetime.timedelta(days=1) + expire = datetime.timedelta(hours=1) token = create_access_token(identity=user_id, expires_delta=expire,additional_claims={"lm": last_change}) return {"token": token}, 200 + @staticmethod + def logout(jti, exp) -> Tuple[Union[dict, str], int]: + """ + Logs out a user by invalidating the provided JWT. + + :param jti: JWT ID. + :type jti: str + :param exp: JWT expiration timestamp. + :type exp: int + :return: Tuple containing a dictionary and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ + UserService.__invalidate_token(jti, exp) + + return {"Success": "Successfully logged out"}, 200 + @staticmethod def update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]: - + """ + Updates the email address for a user with the provided user ID. + + :param user_id: User's ID. + :type user_id: str + :param new_email: New email address. + :type new_email: str + :return: Tuple containing a dictionary and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ + if not UserService.__verify_email(new_email): return {"Failed": "Failed to verify email. Try another email"}, 400 @@ -85,7 +140,17 @@ class UserService: @staticmethod def update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]: - + """ + Updates the username for a user with the provided user ID. + + :param user_id: User's ID. + :type user_id: str + :param new_username: New username. + :type new_username: str + :return: Tuple containing a dictionary and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ + if not UserService.__verify_username(new_username): return {"Failed": "Failed to verify username. Try another one"}, 400 @@ -99,6 +164,16 @@ class UserService: @staticmethod def update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]: + """ + Updates the password for a user with the provided user ID. + + :param user_id: User's ID. + :type user_id: str + :param new_password: New password. + :type new_password: str + :return: Tuple containing a dictionary and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ if not UserService.__verify_password(new_password): return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400 @@ -113,17 +188,57 @@ class UserService: return {"Success": "Password successfully updated"}, 200 + @staticmethod + def __invalidate_token(jti: str, exp: int): + """ + Invalidates a JWT by adding its JTI to the Redis blocklist. + + :param jti: JWT ID. + :type jti: str + :param exp: JWT expiration timestamp. + :type exp: int + """ + expiration = datetime.datetime.fromtimestamp(exp) + now = datetime.datetime.now() + + delta = expiration - now + jwt_redis_blocklist.set(jti, "", ex=delta) + @staticmethod def __verify_email(email: str) -> bool: + """ + Verifies a given email string against a regular expression. + + :param email: Email string. + :type email: str + :return: Boolean indicating whether the email successfully passed the check. + :rtype: bool + """ email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" - return re.match(email_regex ,email) and len(email) <= 64 + return re.match(email_regex, email) and len(email) <= 64 @staticmethod def __verify_username(username: str) -> bool: + """ + Verifies a given username string against a regular expression. + + :param username: Username string. + :type username: str + :return: Boolean indicating whether the username successfully passed the check. + :rtype: bool + """ username_regex = r"^[a-zA-Z.-_]{1,64}$" return re.match(username_regex, username) @staticmethod def __verify_password(password: str) -> bool: + """ + Verifies a given password string against a regular expression. + + :param password: Password string. + :type password: str + :return: Boolean indicating whether the password successfully passed the check. + :rtype: bool + """ password_regex = r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$" return re.match(password_regex, password)