swag-shop/app/services/user_service.py

324 lines
10 KiB
Python
Raw Normal View History

2024-03-05 16:01:26 +01:00
import bcrypt
import re
import jwt
import datetime
from typing import Tuple, Union
from mysql.connector import Error
from flask_jwt_extended import create_access_token
2024-03-10 22:58:04 +01:00
from app.extensions import db_connection
from app.extensions import jwt_redis_blocklist
from app.mail.mail import send_mail
2024-03-10 21:41:48 +01:00
2024-03-05 16:01:26 +01:00
class UserService:
    """
    UserService class provides methods for user-related operations.

    Every public method returns a ``(body, http_status)`` tuple so that the
    calling route can hand it straight back to the client.

    Methods:
    - register(username: str, displayname: str, email: str, password: str) -> Tuple[Union[dict, str], int]
    - login(username: str, password: str) -> Tuple[Union[dict, str], int]
    - logout(jti, exp, user_id) -> Tuple[Union[dict, str], int]
    - delete_user(user_id: str) -> Tuple[Union[dict, str], int]
    - update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]
    - update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]
    - update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]
    """

    @staticmethod
    def register(username: str, displayname: str, email: str, password: str) -> Tuple[Union[dict, str], int]:
        """
        Registers a new user with the provided username, display name, email, and password.

        All four fields are validated first; the password is stored as a bcrypt hash.
        A "register" mail is sent once the row has been committed.

        :param username: User's username.
        :type username: str
        :param displayname: User's display name.
        :type displayname: str
        :param email: User's email address.
        :type email: str
        :param password: User's password.
        :type password: str
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        if not UserService.__verify_username(username):
            return {"Failed": "Failed to verify username. Try another username"}, 400

        if not UserService.__verify_displayname(displayname):
            return {"Failed": "Failed to verify display name. Try another name"}, 400

        if not UserService.__verify_email(email):
            return {"Failed": "Failed to verify email. Try another email"}, 400

        if not UserService.__verify_password(password):
            return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400

        hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())

        try:
            with db_connection.cursor() as cursor:
                cursor.execute(
                    "insert into user (username, displayname, email, password) values (%s, %s, %s, %s)",
                    (username, displayname, email, hashed_password),
                )
                db_connection.commit()
        except Error as e:
            print(f"Error: {e}")
            return {"Failed": "Failed to insert into database. Username or email are likely in use already"}, 500

        send_mail("register", email)

        return {"Success": "User created successfully"}, 200

    @staticmethod
    def login(username: str, password: str) -> Tuple[Union[dict, str], int]:
        """
        Authenticates a user with the provided username and password.

        On success a JWT valid for one hour is issued and a "login" mail is sent.

        :param username: User's username.
        :type username: str
        :param password: User's password.
        :type password: str
        :return: Tuple containing a dictionary with a token and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        try:
            with db_connection.cursor(dictionary=True) as cursor:
                cursor.execute("select id, email, password from user where username = %s", (username,))
                result = cursor.fetchone()
        except Error as e:
            return {"Failed": f"Failed to login. Error: {e}"}, 500

        # fetchone() yields None when no row matched; check before subscripting.
        if result is None or result['id'] is None:
            return {"Failed": "Username not found"}, 400

        password_hash = result['password']
        # NOTE(review): depending on the column type the driver may return the
        # hash as str or as a bytes-like value; bcrypt needs bytes either way.
        if isinstance(password_hash, str):
            password_hash = password_hash.encode('utf-8')
        else:
            password_hash = bytes(password_hash)

        if not bcrypt.checkpw(password.encode('utf-8'), password_hash):
            return {"Failed": "Incorrect password"}, 401

        expire = datetime.timedelta(hours=1)
        token = create_access_token(identity=result['id'], expires_delta=expire)

        send_mail("login", result['email'])

        return {"token": token}, 200

    @staticmethod
    def logout(jti, exp, user_id) -> Tuple[Union[dict, str], int]:
        """
        Logs out a user by invalidating the provided JWT and sending a "logout" mail.

        :param jti: JWT ID.
        :type jti: str
        :param exp: JWT expiration timestamp.
        :type exp: int
        :param user_id: ID of the user the token belongs to; used to look up the mail address.
        :type user_id: str
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        UserService.__invalidate_token(jti, exp)
        UserService.__send_email("logout", id=user_id)
        return {"Success": "Successfully logged out"}, 200

    @staticmethod
    def delete_user(user_id: str) -> Tuple[Union[dict, str], int]:
        """
        Deletes a user account.

        :param user_id: User ID.
        :type user_id: str
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        # The mail address is looked up by id, so the mail has to go out
        # before the row is removed.
        UserService.__send_email("delete", id=user_id)

        try:
            with db_connection.cursor() as cursor:
                cursor.execute("delete from user where id = %s", (user_id,))
                db_connection.commit()
        except Error as e:
            return {"Failed": f"Failed to delete user. {e}"}, 500

        return {"Success": "User successfully deleted"}, 200

    @staticmethod
    def update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]:
        """
        Updates the email address for a user with the provided user ID.

        :param user_id: User's ID.
        :type user_id: str
        :param new_email: New email address.
        :type new_email: str
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        if not UserService.__verify_email(new_email):
            return {"Failed": "Failed to verify email. Try another email"}, 400

        try:
            with db_connection.cursor() as cursor:
                cursor.execute("update user set email = %s where id = %s", (new_email, user_id))
                db_connection.commit()
        except Error as e:
            return {"Failed": f"Failed to update email. Email is likely in use already. Error: {e}"}, 500

        return {"Success": "Email successfully updated"}, 200

    @staticmethod
    def update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]:
        """
        Updates the username for a user with the provided user ID.

        :param user_id: User's ID.
        :type user_id: str
        :param new_username: New username.
        :type new_username: str
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        # Same validator that register() applies to usernames.
        if not UserService.__verify_username(new_username):
            return {"Failed": "Failed to verify username. Try another one"}, 400

        try:
            with db_connection.cursor() as cursor:
                cursor.execute("update user set username = %s where id = %s", (new_username, user_id))
                db_connection.commit()
        except Error as e:
            return {"Failed": f"Failed to update username. Username is likely in use already. Error: {e}"}, 500

        return {"Success": "Username successfully updated"}, 200

    @staticmethod
    def update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]:
        """
        Updates the password for a user with the provided user ID.

        The new password is validated and stored as a bcrypt hash.

        :param user_id: User's ID.
        :type user_id: str
        :param new_password: New password.
        :type new_password: str
        :return: Tuple containing a dictionary and an HTTP status code.
        :rtype: Tuple[Union[dict, str], int]
        """
        if not UserService.__verify_password(new_password):
            return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400

        hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())

        try:
            with db_connection.cursor() as cursor:
                # Store the hash, never the plain-text password.
                cursor.execute("update user set password = %s where id = %s", (hashed_password, user_id))
                db_connection.commit()
        except Error as e:
            return {"Failed": f"Failed to update password. Error: {e}"}, 500

        return {"Success": "Password successfully updated"}, 200

    @staticmethod
    def __send_email(message: str, username: Union[str, None] = None, id: Union[str, None] = None, email: Union[str, None] = None):
        """
        Sends the mail identified by ``message`` to a user.

        The recipient is resolved in this order: an explicit ``email``, then a
        database lookup by ``username``, then a database lookup by ``id``.
        If the lookup finds no matching user, nothing is sent.

        :param message: Mail kind passed through to send_mail (e.g. "logout", "delete").
        :type message: str
        :param username: Username to look the address up by.
        :type username: str
        :param id: User ID to look the address up by.
        :type id: str
        :param email: Recipient address; skips the database lookup when given.
        :type email: str
        :return: None on success or when no user matched; a (dict, 500) tuple when the lookup fails.
        :raises ValueError: If none of username, id and email is provided.
        """
        if email is not None:
            send_mail(message, email)
            return

        if username is not None:
            query, key = "select email from user where username = %s", username
        elif id is not None:
            query, key = "select email from user where id = %s", id
        else:
            raise ValueError("Invalid input data to send mail")

        try:
            with db_connection.cursor(dictionary=True) as cursor:
                cursor.execute(query, (key,))
                result = cursor.fetchone()
        except Error as e:
            return {"Failed": f"Failed to fetch some data. Error: {e}"}, 500

        # No matching user means there is no address to notify.
        if result is None:
            return

        send_mail(message, result['email'])

    @staticmethod
    def __invalidate_token(jti: str, exp: int):
        """
        Invalidates a JWT by adding its JTI to the Redis blocklist.

        The blocklist entry expires together with the token, so the blocklist
        does not grow without bound.

        :param jti: JWT ID.
        :type jti: str
        :param exp: JWT expiration timestamp.
        :type exp: int
        """
        expiration = datetime.datetime.fromtimestamp(exp)
        remaining = expiration - datetime.datetime.now()

        # A token with less than a second left is (about to be) expired anyway.
        # NOTE(review): the expiry is presumably truncated to whole seconds and
        # Redis rejects a non-positive expire time - confirm against redis-py.
        if remaining < datetime.timedelta(seconds=1):
            return

        jwt_redis_blocklist.set(jti, "", ex=remaining)

    @staticmethod
    def __verify_email(email: str) -> bool:
        """
        Verifies a given email string against a regular expression.

        The whole string must match and may be at most 64 characters long.

        :param email: Email string.
        :type email: str
        :return: Boolean indicating whether the email successfully passed the check.
        :rtype: bool
        """
        email_regex = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"
        # fullmatch (unlike match + "$") also rejects a trailing newline.
        return len(email) <= 64 and re.fullmatch(email_regex, email) is not None

    @staticmethod
    def __verify_displayname(displayname: str) -> bool:
        """
        Verifies a given display name string against a regular expression.

        Accepts 1 to 64 characters out of ASCII letters, ".", "_" and "-".

        :param displayname: Display name string.
        :type displayname: str
        :return: Boolean indicating whether the display name successfully passed the check.
        :rtype: bool
        """
        # "-" is placed last so it is a literal hyphen, not a range operator.
        displayname_regex = r"[a-zA-Z._-]{1,64}"
        return re.fullmatch(displayname_regex, displayname) is not None

    @staticmethod
    def __verify_username(username: str) -> bool:
        """
        Verifies a given username string against a regular expression.

        Accepts 1 to 64 lowercase ASCII letters.

        :param username: Username string.
        :type username: str
        :return: Boolean indicating whether the username successfully passed the check.
        :rtype: bool
        """
        username_regex = r"[a-z]{1,64}"
        return re.fullmatch(username_regex, username) is not None

    @staticmethod
    def __verify_password(password: str) -> bool:
        """
        Verifies a given password string against a regular expression.

        Requires 8 to 64 characters with at least one uppercase letter, one
        lowercase letter, one digit and one of the characters "#?!@$ %^&*-".

        :param password: Password string.
        :type password: str
        :return: Boolean indicating whether the password successfully passed the check.
        :rtype: bool
        """
        password_regex = r"(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}"
        return re.fullmatch(password_regex, password) is not None