import datetime
import re
from typing import Optional, Tuple, Union

import bcrypt
import jwt
from flask_jwt_extended import create_access_token
from mysql.connector import Error

from app.extensions import db_connection
from app.extensions import jwt_redis_blocklist
from app.mail.mail import send_mail


class UserService:
    """
    UserService class provides methods for user-related operations.

    Every public method returns a ``(body, http_status)`` tuple.

    Methods:
    - register(username: str, displayname: str, email: str, password: str) -> Tuple[Union[dict, str], int]
    - login(username: str, password: str) -> Tuple[Union[dict, str], int]
    - logout(jwt_token, user_id) -> Tuple[Union[dict, str], int]
    - delete_user(user_id: str) -> Tuple[Union[dict, str], int]
    - update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]
    - update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]
    - update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]
    """

    # Patterns are applied with ``fullmatch`` so no ``^``/``$`` anchors are
    # needed and a trailing newline cannot slip through.
    _EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
    # The hyphen is placed last so it is a literal character; written as
    # ``.-_`` it forms a range that also admits digits and punctuation.
    _DISPLAYNAME_RE = re.compile(r"[a-zA-Z._-]{1,64}")
    _USERNAME_RE = re.compile(r"[a-z]{1,64}")
    _PASSWORD_RE = re.compile(
        r"(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}"
    )

    @staticmethod
    def register(username: str, displayname: str, email: str, password: str) -> Tuple[Union[dict, str], int]:
        """
        Registers a new user with the provided username, display name, email, and password.

        :param username: User's username.
        :type username: str
        :param displayname: User's display name.
        :type displayname: str
        :param email: User's email address.
        :type email: str
        :param password: User's plain-text password; it is stored as a bcrypt hash.
        :type password: str
        :return: Tuple containing a dictionary and an HTTP status code
                 (400 on validation failure, 500 on database failure, 200 on success).
        :rtype: Tuple[Union[dict, str], int]
        """
        if not UserService.__verify_username(username):
            return {"msg": "Failed to verify username. Try another username"}, 400
        if not UserService.__verify_displayname(displayname):
            return {"msg": "Failed to verify display name. Try another name"}, 400
        if not UserService.__verify_email(email):
            return {"msg": "Failed to verify email. Try another email"}, 400
        if not UserService.__verify_password(password):
            return {"msg": "Failed to verify password. Try another (stronger) password"}, 400

        hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        try:
            with db_connection.cursor() as cursor:
                cursor.execute(
                    "insert into user (username, displayname, email, password) values (%s, %s, %s, %s)",
                    (username, displayname, email, hashed_password),
                )
                db_connection.commit()
        except Error as e:
            print(f"Error: {e}")
            return {"msg": "Failed to insert into database. Username or email are likely in use already"}, 500

        UserService.__send_email("register", email=email)
        return {"msg": "User created successfully"}, 200

    @staticmethod
    def login(username: str, password: str) -> Tuple[Union[dict, str], int]:
        """
        Authenticates a user with the provided username and password.

        :param username: User's username.
        :type username: str
        :param password: User's plain-text password.
        :type password: str
        :return: Tuple containing a dictionary with a token (valid for one hour) and an
                 HTTP status code (400 unknown username, 401 wrong password,
                 500 database failure, 200 on success).
        :rtype: Tuple[Union[dict, str], int]
        """
        try:
            with db_connection.cursor(dictionary=True) as cursor:
                cursor.execute("select id, email, password from user where username = %s", (username,))
                result = cursor.fetchone()
        except Error as e:
            return {"msg": f"Failed to login. Error: {e}"}, 500

        # fetchone() yields None when no row matches; check before subscripting.
        if result is None or result['id'] is None:
            return {"msg": "Username not found"}, 400

        # The stored hash may come back as str, bytes or bytearray depending on
        # the column type; bcrypt.checkpw needs bytes.
        stored_hash = result['password']
        if isinstance(stored_hash, str):
            stored_hash = stored_hash.encode('utf-8')
        else:
            stored_hash = bytes(stored_hash)

        if not bcrypt.checkpw(password.encode('utf-8'), stored_hash):
            return {"msg": "Incorrect password"}, 401

        expire = datetime.timedelta(hours=1)
        token = create_access_token(identity=result['id'], expires_delta=expire)
        UserService.__send_email("login", email=result['email'])
        return {"token": token}, 200

    @staticmethod
    def logout(jwt_token, user_id) -> Tuple[Union[dict, str], int]:
        """
        Logs out a user by invalidating the provided JWT.

        :param jwt_token: Decoded JWT payload; must provide the ``jti`` and ``exp``
                          entries (presumably the mapping returned by
                          flask_jwt_extended's ``get_jwt()`` - confirm against the caller).
        :param user_id: ID of the user logging out, used to look up the notification address.
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        # Read the claims from the token argument, not from the imported
        # ``jwt`` module.
        jti = jwt_token['jti']
        exp = jwt_token['exp']
        UserService.__invalidate_token(jti, exp)
        UserService.__send_email("logout", id=user_id)
        return {"msg": "Successfully logged out"}, 200

    @staticmethod
    def delete_user(user_id: str) -> Tuple[Union[dict, str], int]:
        """
        Deletes a user account.

        :param user_id: User ID.
        :type user_id: str
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        # The notification goes out first: the address is looked up by id and
        # is no longer available once the row is deleted.
        UserService.__send_email("delete", id=user_id)
        try:
            with db_connection.cursor() as cursor:
                cursor.execute("delete from user where id = %s", (user_id,))
                db_connection.commit()
        except Error as e:
            return {"msg": f"Failed to delete user. {e}"}, 500
        return {"msg": "User successfully deleted"}, 200

    @staticmethod
    def update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]:
        """
        Updates the email address for a user with the provided user ID.

        :param user_id: User's ID.
        :type user_id: str
        :param new_email: New email address.
        :type new_email: str
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        if not UserService.__verify_email(new_email):
            return {"msg": "Failed to verify email. Try another email"}, 400
        try:
            with db_connection.cursor() as cursor:
                cursor.execute("update user set email = %s where id = %s", (new_email, user_id))
                db_connection.commit()
        except Error as e:
            return {"msg": f"Failed to update email. Email is likely in use already. Error: {e}"}, 500
        return {"msg": "Email successfully updated"}, 200

    @staticmethod
    def update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]:
        """
        Updates the username for a user with the provided user ID.

        :param user_id: User's ID.
        :type user_id: str
        :param new_username: New username.
        :type new_username: str
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        # Uses the same validator as register().
        if not UserService.__verify_username(new_username):
            return {"msg": "Failed to verify username. Try another one"}, 400
        try:
            with db_connection.cursor() as cursor:
                cursor.execute("update user set username = %s where id = %s", (new_username, user_id))
                db_connection.commit()
        except Error as e:
            return {"msg": f"Failed to update username. Username is likely in use already. Error: {e}"}, 500
        return {"msg": "Username successfully updated"}, 200

    @staticmethod
    def update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]:
        """
        Updates the password for a user with the provided user ID.

        :param user_id: User's ID.
        :type user_id: str
        :param new_password: New plain-text password; it is stored as a bcrypt hash.
        :type new_password: str
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        if not UserService.__verify_password(new_password):
            return {"msg": "Failed to verify password. Try another (stronger) one"}, 400

        hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
        try:
            with db_connection.cursor() as cursor:
                # Persist the freshly computed hash, never the plain-text password.
                cursor.execute("update user set password = %s where id = %s", (hashed_password, user_id))
                db_connection.commit()
        except Error as e:
            return {"msg": f"Failed to update password. Error: {e}"}, 500
        return {"msg": "Password successfully updated"}, 200

    @staticmethod
    def __send_email(
        message: str,
        username: Optional[str] = None,
        id: Optional[str] = None,  # keyword kept as-is for existing call sites
        email: Optional[str] = None,
    ) -> Optional[Tuple[dict, int]]:
        """
        Sends a notification mail, resolving the recipient address if necessary.

        The recipient is taken from ``email`` when given; otherwise it is looked up
        by ``username``, and failing that by ``id``. When the lookup finds no
        matching user, nothing is sent.

        :param message: Message identifier passed through to ``send_mail``.
        :type message: str
        :param username: Username used to look up the address.
        :type username: Optional[str]
        :param id: User ID used to look up the address.
        :type id: Optional[str]
        :param email: Recipient address; skips the database lookup.
        :type email: Optional[str]
        :return: ``None`` normally, or an error tuple when the lookup query fails.
        :rtype: Optional[Tuple[dict, int]]
        :raises ValueError: If none of ``username``, ``id`` or ``email`` is provided.
        """
        if email is not None:
            send_mail(message, email)
            return None

        if username is not None:
            query, key = "select email from user where username = %s", username
        elif id is not None:
            query, key = "select email from user where id = %s", id
        else:
            raise ValueError("Invalid input data to send mail")

        try:
            with db_connection.cursor(dictionary=True) as cursor:
                cursor.execute(query, (key,))
                result = cursor.fetchone()
        except Error as e:
            return {"msg": f"Failed to fetch some data. Error: {e}"}, 500

        # No matching user: there is nobody to notify.
        if result is None:
            return None

        send_mail(message, result['email'])
        return None

    @staticmethod
    def __invalidate_token(jti: str, exp: int):
        """
        Invalidates a JWT by adding its JTI to the Redis blocklist.

        The blocklist entry expires together with the token.

        :param jti: JWT ID.
        :type jti: str
        :param exp: JWT expiration timestamp.
        :type exp: int
        """
        expiration = datetime.datetime.fromtimestamp(exp)
        remaining = expiration - datetime.datetime.now()
        # Redis rejects a non-positive TTL, which would otherwise occur for a
        # token that is (almost) expired; clamp to one second.
        ttl = max(remaining, datetime.timedelta(seconds=1))
        jwt_redis_blocklist.set(jti, "", ex=ttl)

    @staticmethod
    def __verify_email(email: str) -> bool:
        """
        Verifies a given email string against a regular expression.

        :param email: Email string.
        :type email: str
        :return: Boolean indicating whether the email successfully passed the check
                 (pattern match and at most 64 characters).
        :rtype: bool
        """
        return (
            isinstance(email, str)
            and len(email) <= 64
            and UserService._EMAIL_RE.fullmatch(email) is not None
        )

    @staticmethod
    def __verify_displayname(displayname: str) -> bool:
        """
        Verifies a given display name string against a regular expression.

        Accepts 1-64 characters out of ASCII letters, ``.``, ``_`` and ``-``.

        :param displayname: Display name string.
        :type displayname: str
        :return: Boolean indicating whether the display name successfully passed the check.
        :rtype: bool
        """
        return (
            isinstance(displayname, str)
            and UserService._DISPLAYNAME_RE.fullmatch(displayname) is not None
        )

    @staticmethod
    def __verify_username(username: str) -> bool:
        """
        Verifies a given username string against a regular expression.

        Accepts 1-64 lowercase ASCII letters.

        :param username: Username string.
        :type username: str
        :return: Boolean indicating whether the username successfully passed the check.
        :rtype: bool
        """
        return (
            isinstance(username, str)
            and UserService._USERNAME_RE.fullmatch(username) is not None
        )

    @staticmethod
    def __verify_password(password: str) -> bool:
        """
        Verifies a given password string against a regular expression.

        Requires 8-64 characters with at least one uppercase letter, one lowercase
        letter, one digit and one of the special characters ``#?!@$ %^&*-``.

        :param password: Password string.
        :type password: str
        :return: Boolean indicating whether the password successfully passed the check.
        :rtype: bool
        """
        return (
            isinstance(password, str)
            and UserService._PASSWORD_RE.fullmatch(password) is not None
        )