2024-03-05 16:01:26 +01:00
import bcrypt
import re
import jwt
import datetime
from typing import Tuple , Union
from mysql . connector import Error
from flask_jwt_extended import create_access_token
2024-03-07 07:52:27 +01:00
from app . extensions import db_cursor , db_connection
from app . extensions import jwt_redis_blocklist
2024-03-10 21:41:48 +01:00
from app . mail_utils import send_mail
2024-03-05 16:01:26 +01:00
class UserService :
2024-03-07 07:52:27 +01:00
"""
UserService class provides methods for user - related operations .
Methods :
- register ( username : str , email : str , password : str ) - > Tuple [ Union [ dict , str ] , int ]
- login ( username : str , password : str ) - > Tuple [ Union [ dict , str ] , int ]
- logout ( jti , exp ) - > Tuple [ Union [ dict , str ] , int ]
- update_email ( user_id : str , new_email : str ) - > Tuple [ Union [ dict , str ] , int ]
- update_username ( user_id : str , new_username : str ) - > Tuple [ Union [ dict , str ] , int ]
- update_password ( user_id : str , new_password : str ) - > Tuple [ Union [ dict , str ] , int ]
"""
2024-03-05 16:01:26 +01:00
@staticmethod
2024-03-07 22:16:29 +01:00
def register ( username : str , displayname : str , email : str , password : str ) - > Tuple [ Union [ dict , str ] , int ] :
2024-03-07 07:52:27 +01:00
"""
Registers a new user with the provided username , email , and password .
: param username : User ' s username.
: type username : str
: param email : User ' s email address.
: type email : str
: param password : User ' s password.
: type password : str
: return : Tuple containing a dictionary and an HTTP status code .
: rtype : Tuple [ Union [ dict , str ] , int ]
"""
2024-03-05 16:01:26 +01:00
if not UserService . __verify_username ( username ) :
2024-03-07 22:16:29 +01:00
return { " Failed " : " Failed to verify username. Try another username " } , 400
if not UserService . __verify_displayname ( displayname ) :
return { " Failed " : " Failed to verify display name. Try another name " } , 400
2024-03-05 16:01:26 +01:00
if not UserService . __verify_email ( email ) :
2024-03-07 22:16:29 +01:00
return { " Failed " : " Failed to verify email. Try another email " } , 400
2024-03-05 16:01:26 +01:00
if not UserService . __verify_password ( password ) :
2024-03-07 22:16:29 +01:00
return { " Failed " : " Failed to verify password. Try another (stronger) password " } , 400
2024-03-05 16:01:26 +01:00
hashed_password = bcrypt . hashpw ( password . encode ( ' utf-8 ' ) , bcrypt . gensalt ( ) )
try :
2024-03-07 22:16:29 +01:00
db_cursor . execute ( " insert into user (username, displayname, email, password) values ( %s , %s , %s , %s ) " , ( username , displayname , email , hashed_password ) )
2024-03-05 16:01:26 +01:00
db_connection . commit ( )
except Error as e :
print ( f " Error: { e } " )
return { " Failed " : " Failed to insert into database. Username or email are likely in use already " } , 500
2024-03-10 21:41:48 +01:00
# TODO Implement mail sending
# Currently throws error - connection refused
# send_mail("Successfully registered!", email, "Congratulations! Your account has been successfully created.\nThis mail also serves as a test that the email address is correct")
2024-03-05 16:01:26 +01:00
return { " Success " : " User created successfully " } , 200
@staticmethod
def login ( username : str , password : str ) - > Tuple [ Union [ dict , str ] , int ] :
2024-03-07 07:52:27 +01:00
"""
Authenticates a user with the provided username and password .
: param username : User ' s username.
: type username : str
: param password : User ' s password.
: type password : str
: return : Tuple containing a dictionary with a token and an HTTP status code .
: rtype : Tuple [ Union [ dict , str ] , int ]
"""
2024-03-05 16:01:26 +01:00
2024-03-07 22:16:29 +01:00
db_cursor . execute ( " select id, password from user where username = %s " , ( username , ) )
2024-03-05 16:01:26 +01:00
result = db_cursor . fetchone ( )
2024-03-07 22:16:29 +01:00
user_id = result [ ' id ' ]
2024-03-05 21:35:58 +01:00
password_hash = result [ ' password ' ]
2024-03-05 16:01:26 +01:00
if user_id is None :
return { " Failed " : " Username not found " } , 400
if not bcrypt . checkpw ( password . encode ( ' utf-8 ' ) , password_hash . encode ( ' utf-8 ' ) ) :
return { " Failed " : " Incorrect password " } , 401
2024-03-07 07:52:27 +01:00
expire = datetime . timedelta ( hours = 1 )
2024-03-05 16:01:26 +01:00
2024-03-07 15:04:34 +01:00
token = create_access_token ( identity = user_id , expires_delta = expire )
2024-03-05 16:01:26 +01:00
return { " token " : token } , 200
2024-03-07 07:52:27 +01:00
@staticmethod
def logout ( jti , exp ) - > Tuple [ Union [ dict , str ] , int ] :
"""
Logs out a user by invalidating the provided JWT .
: param jti : JWT ID .
: type jti : str
: param exp : JWT expiration timestamp .
: type exp : int
: return : Tuple containing a dictionary and an HTTP status code .
: rtype : Tuple [ Union [ dict , str ] , int ]
"""
UserService . __invalidate_token ( jti , exp )
return { " Success " : " Successfully logged out " } , 200
2024-03-07 15:04:34 +01:00
@staticmethod
def delete_user ( user_id : str ) - > Tuple [ Union [ dict , str ] , int ] :
try :
2024-03-07 22:16:29 +01:00
db_cursor . execute ( " delete from user where id = %s " , ( user_id , ) )
2024-03-07 15:04:34 +01:00
db_connection . commit ( )
except Error as e :
return { " Failed " : f " Failed to delete user. { e } " } , 500
return { " Success " : " User successfully deleted " } , 200
2024-03-05 16:01:26 +01:00
@staticmethod
def update_email ( user_id : str , new_email : str ) - > Tuple [ Union [ dict , str ] , int ] :
2024-03-07 07:52:27 +01:00
"""
Updates the email address for a user with the provided user ID .
: param user_id : User ' s ID.
: type user_id : str
: param new_email : New email address .
: type new_email : str
: return : Tuple containing a dictionary and an HTTP status code .
: rtype : Tuple [ Union [ dict , str ] , int ]
"""
2024-03-05 16:01:26 +01:00
if not UserService . __verify_email ( new_email ) :
2024-03-05 21:35:58 +01:00
return { " Failed " : " Failed to verify email. Try another email " } , 400
2024-03-05 16:01:26 +01:00
try :
2024-03-07 22:16:29 +01:00
db_cursor . execute ( " update user set email = %s where id = %s " , ( new_email , user_id ) )
2024-03-05 16:01:26 +01:00
db_connection . commit ( )
except Error as e :
return { " Failed " : f " Failed to update email. Email is likely in use already. Error: { e } " } , 500
return { " Success " : " Email successfully updated " } , 200
@staticmethod
def update_username ( user_id : str , new_username : str ) - > Tuple [ Union [ dict , str ] , int ] :
2024-03-07 07:52:27 +01:00
"""
Updates the username for a user with the provided user ID .
: param user_id : User ' s ID.
: type user_id : str
: param new_username : New username .
: type new_username : str
: return : Tuple containing a dictionary and an HTTP status code .
: rtype : Tuple [ Union [ dict , str ] , int ]
"""
2024-03-07 22:16:29 +01:00
if not UserService . __verify_name ( new_username ) :
2024-03-05 16:01:26 +01:00
return { " Failed " : " Failed to verify username. Try another one " } , 400
try :
2024-03-07 22:16:29 +01:00
db_cursor . execute ( " update user set username = %s where id = %s " , ( new_username , user_id ) )
2024-03-05 16:01:26 +01:00
db_connection . commit ( )
except Error as e :
return { " Failed " : f " Failed to update username. Username is likely in use already. Error: { e } " } , 500
return { " Success " : " Username successfully updated " } , 200
@staticmethod
def update_password ( user_id : str , new_password : str ) - > Tuple [ Union [ dict , str ] , int ] :
2024-03-07 07:52:27 +01:00
"""
Updates the password for a user with the provided user ID .
: param user_id : User ' s ID.
: type user_id : str
: param new_password : New password .
: type new_password : str
: return : Tuple containing a dictionary and an HTTP status code .
: rtype : Tuple [ Union [ dict , str ] , int ]
"""
2024-03-05 16:01:26 +01:00
if not UserService . __verify_password ( new_password ) :
return { " Failed " : " Failed to verify password. Try another (stronger) one " } , 400
hashed_password = bcrypt . hashpw ( new_password . encode ( ' utf-8 ' ) , bcrypt . gensalt ( ) )
try :
2024-03-07 22:16:29 +01:00
db_cursor . execute ( " update user set password = %s where id = %s " , ( new_username , user_id ) )
2024-03-05 16:01:26 +01:00
db_connection . commit ( )
except Error as e :
return { " Failed " : f " Failed to update password. Error: { e } " } , 500
return { " Success " : " Password successfully updated " } , 200
2024-03-07 07:52:27 +01:00
@staticmethod
def __invalidate_token ( jti : str , exp : int ) :
"""
Invalidates a JWT by adding its JTI to the Redis blocklist .
: param jti : JWT ID .
: type jti : str
: param exp : JWT expiration timestamp .
: type exp : int
"""
expiration = datetime . datetime . fromtimestamp ( exp )
now = datetime . datetime . now ( )
delta = expiration - now
jwt_redis_blocklist . set ( jti , " " , ex = delta )
2024-03-05 16:01:26 +01:00
@staticmethod
def __verify_email ( email : str ) - > bool :
2024-03-07 07:52:27 +01:00
"""
Verifies a given email string against a regular expression .
: param email : Email string .
: type email : str
: return : Boolean indicating whether the email successfully passed the check .
: rtype : bool
"""
2024-03-05 16:01:26 +01:00
email_regex = r " ^[a-zA-Z0-9._ % +-]+@[a-zA-Z0-9.-]+ \ .[a-zA-Z] { 2,}$ "
2024-03-07 07:52:27 +01:00
return re . match ( email_regex , email ) and len ( email ) < = 64
2024-03-05 16:01:26 +01:00
2024-03-07 22:16:29 +01:00
@staticmethod
def __verify_displayname ( displayname : str ) - > bool :
"""
Verifies a given display name string against a regular expression .
: param displayname : Display name string .
: type displayname : str
: return : Boolean indicating whether the display name successfully passed the check .
: rtype : bool
"""
displayname_regex = r " ^[a-zA-Z.-_] { 1,64}$ "
2024-03-07 22:37:49 +01:00
return re . match ( displayname_regex , displayname )
2024-03-07 22:16:29 +01:00
2024-03-05 16:01:26 +01:00
@staticmethod
def __verify_username ( username : str ) - > bool :
2024-03-07 07:52:27 +01:00
"""
Verifies a given username string against a regular expression .
: param username : Username string .
: type username : str
: return : Boolean indicating whether the username successfully passed the check .
: rtype : bool
"""
2024-03-07 22:16:29 +01:00
username_regex = r " ^[a-z] { 1,64}$ "
2024-03-05 16:01:26 +01:00
return re . match ( username_regex , username )
@staticmethod
def __verify_password ( password : str ) - > bool :
2024-03-07 07:52:27 +01:00
"""
Verifies a given password string against a regular expression .
: param password : Password string .
: type password : str
: return : Boolean indicating whether the password successfully passed the check .
: rtype : bool
"""
2024-03-05 16:01:26 +01:00
password_regex = r " ^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ % ^&*-]). { 8,64}$ "
return re . match ( password_regex , password )