import bcrypt import re import jwt import datetime from typing import Tuple, Union from app.extensions import db_cursor, db_connection from mysql.connector import Error from flask_jwt_extended import create_access_token class UserService: @staticmethod def create_user(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]: if not UserService.__verify_username(username): return {"Failed": "Failed to verify username. Try another username"}, 400 if not UserService.__verify_email(email): return {"Failed": "Failed to verify email. Try another email"}, 400 if not UserService.__verify_password(password): return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400 # Role ID 1 => Normal user # Role ID 2 => Seller # Role ID 3 => Admin hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) try: db_cursor.execute("select max(user_id) as max_id from user") last_id = db_cursor.fetchone()['max_id'] if last_id < 23000: return {"Failed": "Error occurred when fetching last user id"} new_id = last_id + 1 db_cursor.execute("insert into user (username, email, password, user_id, role_id) values (%s, %s, %s, %s, 1)", (username, email, hashed_password, new_id)) db_connection.commit() except Error as e: print(f"Error: {e}") return {"Failed": "Failed to insert into database. Username or email are likely in use already"}, 500 return {"Success": "User created successfully"}, 200 @staticmethod def login(username: str, password: str) -> Tuple[Union[dict, str], int]: db_cursor.execute("select user_id, password, last_change from user where username = %s", (username,)) result = db_cursor.fetchone() user_id = result['user_id'] password_hash = result['password'] last_change = result['last_change'] if user_id is None: return {"Failed": "Username not found"}, 400 if not bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8')): return {"Failed": "Incorrect password"}, 401 expire = datetime.timedelta(days=1) token = create_access_token(identity=user_id, expires_delta=expire,additional_claims={"lm": last_change}) return {"token": token}, 200 @staticmethod def update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]: if not UserService.__verify_email(new_email): return {"Failed": "Failed to verify email. Try another email"}, 400 try: db_cursor.execute("update user set email = %s where user_id = %s", (new_email, user_id)) db_connection.commit() except Error as e: return {"Failed": f"Failed to update email. Email is likely in use already. Error: {e}"}, 500 return {"Success": "Email successfully updated"}, 200 @staticmethod def update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]: if not UserService.__verify_username(new_username): return {"Failed": "Failed to verify username. Try another one"}, 400 try: db_cursor.execute("update user set username = %s where user_id = %s", (new_username, user_id)) db_connection.commit() except Error as e: return {"Failed": f"Failed to update username. Username is likely in use already. Error: {e}"}, 500 return {"Success": "Username successfully updated"}, 200 @staticmethod def update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]: if not UserService.__verify_password(new_password): return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400 hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()) try: db_cursor.execute("update user set password = %s where user_id = %s", (new_username, user_id)) db_connection.commit() except Error as e: return {"Failed": f"Failed to update password. Error: {e}"}, 500 return {"Success": "Password successfully updated"}, 200 @staticmethod def __verify_email(email: str) -> bool: email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" return re.match(email_regex ,email) and len(email) <= 64 @staticmethod def __verify_username(username: str) -> bool: username_regex = r"^[a-zA-Z.-_]{1,64}$" return re.match(username_regex, username) @staticmethod def __verify_password(password: str) -> bool: password_regex = r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$" return re.match(password_regex, password)