import bcrypt
import re
import jwt
import datetime
from typing import Tuple, Union
from mysql.connector import Error
from flask_jwt_extended import create_access_token
from app.extensions import db_cursor, db_connection
from app.extensions import jwt_redis_blocklist


class UserService:
    """
    UserService class provides methods for user-related operations.

    Every public method returns a ``(payload, http_status)`` tuple where the
    payload is a dictionary keyed by ``"Success"``, ``"Failed"`` or ``"token"``.

    Methods:
    - register(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]
    - login(username: str, password: str) -> Tuple[Union[dict, str], int]
    - logout(jti, exp) -> Tuple[Union[dict, str], int]
    - delete_user(user_id: str) -> Tuple[Union[dict, str], int]
    - update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]
    - update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]
    - update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]
    """

    @staticmethod
    def register(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]:
        """
        Registers a new user with the provided username, email, and password.

        The three inputs are validated first; the password is then hashed with
        bcrypt and the row is inserted with ``role_id`` 1.

        :param username: User's username.
        :type username: str
        :param email: User's email address.
        :type email: str
        :param password: User's password.
        :type password: str
        :return: Tuple containing a dictionary and an HTTP status code
            (400 on validation failure, 500 on database error, 200 on success).
        :rtype: Tuple[Union[dict, str], int]
        """
        if not UserService.__verify_username(username):
            return {"Failed": "Failed to verify username. Try another username"}, 400
        if not UserService.__verify_email(email):
            return {"Failed": "Failed to verify email. Try another email"}, 400
        if not UserService.__verify_password(password):
            return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400

        hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        try:
            db_cursor.execute(
                "insert into user (username, email, password, role_id) values (%s, %s, %s, 1)",
                (username, email, hashed_password),
            )
            db_connection.commit()
        except Error as e:
            print(f"Error: {e}")
            return {
                "Failed": "Failed to insert into database. "
                          "Username or email are likely in use already"
            }, 500
        return {"Success": "User created successfully"}, 200

    @staticmethod
    def login(username: str, password: str) -> Tuple[Union[dict, str], int]:
        """
        Authenticates a user with the provided username and password.

        On success an access token valid for one hour is issued with the
        user's ID as its identity.

        :param username: User's username.
        :type username: str
        :param password: User's password.
        :type password: str
        :return: Tuple containing a dictionary with a token and an HTTP status
            code (400 when the username is unknown, 401 on a wrong password).
        :rtype: Tuple[Union[dict, str], int]
        """
        db_cursor.execute("select user_id, password from user where username = %s", (username,))
        result = db_cursor.fetchone()

        # fetchone() yields None when no row matches, so the row itself must be
        # checked before any column is read from it.
        if result is None or result['user_id'] is None:
            return {"Failed": "Username not found"}, 400

        user_id = result['user_id']
        password_hash = result['password']

        # bcrypt.checkpw needs bytes; the stored hash may come back as str or
        # as a bytes-like value depending on the column type.
        if isinstance(password_hash, str):
            password_hash = password_hash.encode('utf-8')
        else:
            password_hash = bytes(password_hash)

        if not bcrypt.checkpw(password.encode('utf-8'), password_hash):
            return {"Failed": "Incorrect password"}, 401

        expire = datetime.timedelta(hours=1)
        token = create_access_token(identity=user_id, expires_delta=expire)
        return {"token": token}, 200

    @staticmethod
    def logout(jti: str, exp: int) -> Tuple[Union[dict, str], int]:
        """
        Logs out a user by invalidating the provided JWT.

        :param jti: JWT ID.
        :type jti: str
        :param exp: JWT expiration timestamp.
        :type exp: int
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        UserService.__invalidate_token(jti, exp)
        return {"Success": "Successfully logged out"}, 200

    @staticmethod
    def delete_user(user_id: str) -> Tuple[Union[dict, str], int]:
        """
        Deletes the user with the provided user ID.

        :param user_id: User's ID.
        :type user_id: str
        :return: Tuple containing a dictionary and an HTTP status code
            (500 on database error, 200 otherwise).
        :rtype: Tuple[Union[dict, str], int]
        """
        try:
            db_cursor.execute("delete from user where user_id = %s", (user_id,))
            db_connection.commit()
        except Error as e:
            return {"Failed": f"Failed to delete user. {e}"}, 500
        return {"Success": "User successfully deleted"}, 200

    @staticmethod
    def update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]:
        """
        Updates the email address for a user with the provided user ID.

        :param user_id: User's ID.
        :type user_id: str
        :param new_email: New email address.
        :type new_email: str
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        if not UserService.__verify_email(new_email):
            return {"Failed": "Failed to verify email. Try another email"}, 400
        try:
            db_cursor.execute("update user set email = %s where user_id = %s", (new_email, user_id))
            db_connection.commit()
        except Error as e:
            return {"Failed": f"Failed to update email. Email is likely in use already. Error: {e}"}, 500
        return {"Success": "Email successfully updated"}, 200

    @staticmethod
    def update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]:
        """
        Updates the username for a user with the provided user ID.

        :param user_id: User's ID.
        :type user_id: str
        :param new_username: New username.
        :type new_username: str
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        if not UserService.__verify_username(new_username):
            return {"Failed": "Failed to verify username. Try another one"}, 400
        try:
            db_cursor.execute("update user set username = %s where user_id = %s", (new_username, user_id))
            db_connection.commit()
        except Error as e:
            return {"Failed": f"Failed to update username. Username is likely in use already. Error: {e}"}, 500
        return {"Success": "Username successfully updated"}, 200

    @staticmethod
    def update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]:
        """
        Updates the password for a user with the provided user ID.

        The new password is validated, hashed with bcrypt, and the hash is
        stored in place of the previous one.

        :param user_id: User's ID.
        :type user_id: str
        :param new_password: New password.
        :type new_password: str
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        if not UserService.__verify_password(new_password):
            return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400

        hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
        try:
            # Bind the bcrypt hash: the plaintext must never reach the table.
            db_cursor.execute(
                "update user set password = %s where user_id = %s",
                (hashed_password, user_id),
            )
            db_connection.commit()
        except Error as e:
            return {"Failed": f"Failed to update password. Error: {e}"}, 500
        return {"Success": "Password successfully updated"}, 200

    @staticmethod
    def __invalidate_token(jti: str, exp: int) -> None:
        """
        Invalidates a JWT by adding its JTI to the Redis blocklist.

        The blocklist entry is given a time-to-live covering the token's
        remaining lifetime. A token that has already expired is not stored.

        :param jti: JWT ID.
        :type jti: str
        :param exp: JWT expiration timestamp.
        :type exp: int
        """
        expiration = datetime.datetime.fromtimestamp(exp)
        remaining = expiration - datetime.datetime.now()

        # An expired token can no longer be used, and Redis rejects a
        # non-positive expiry, so there is nothing to record.
        if remaining <= datetime.timedelta(0):
            return

        # Round up to whole seconds so the blocklist entry never disappears
        # before the token itself expires.
        ttl_seconds = int(remaining.total_seconds()) + 1
        jwt_redis_blocklist.set(jti, "", ex=ttl_seconds)

    @staticmethod
    def __verify_email(email: str) -> bool:
        """
        Verifies a given email string against a regular expression.

        :param email: Email string.
        :type email: str
        :return: Boolean indicating whether the email successfully passed the
            check (pattern match and at most 64 characters).
        :rtype: bool
        """
        email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
        # fullmatch (unlike match with "$") does not tolerate a trailing newline.
        return len(email) <= 64 and re.fullmatch(email_regex, email) is not None

    @staticmethod
    def __verify_username(username: str) -> bool:
        """
        Verifies a given username string against a regular expression.

        Accepted usernames are 1 to 64 characters drawn from ASCII letters,
        digits, ``.``, ``_`` and ``-``.

        :param username: Username string.
        :type username: str
        :return: Boolean indicating whether the username successfully passed the check.
        :rtype: bool
        """
        # The hyphen is placed last so it is a literal. Written as ".-_" it
        # forms a range from '.' to '_' that also admits characters such as
        # '/', ':', '<' and '@'. Digits are therefore listed explicitly.
        username_regex = r"^[a-zA-Z0-9._-]{1,64}$"
        return re.fullmatch(username_regex, username) is not None

    @staticmethod
    def __verify_password(password: str) -> bool:
        """
        Verifies a given password string against a regular expression.

        The pattern requires 8 to 64 characters including at least one
        uppercase letter, one lowercase letter, one digit and one character
        from the special set in the pattern.

        :param password: Password string.
        :type password: str
        :return: Boolean indicating whether the password successfully passed the check.
        :rtype: bool
        """
        password_regex = r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$"
        return re.fullmatch(password_regex, password) is not None