swag-shop/app/services/user_service.py
2024-03-05 16:02:13 +01:00

130 lines
4.3 KiB
Python

import bcrypt
import re
import jwt
import datetime
from typing import Tuple, Union
from app.extensions import db_cursor, db_connection
from mysql.connector import Error
from flask_jwt_extended import create_access_token
class UserService:
    """Account operations backed by the shared MySQL cursor/connection.

    Every public method returns a ``(payload, http_status)`` tuple where
    ``payload`` is a dict keyed by ``"Success"``, ``"Failed"`` or ``"token"``.
    """

    @staticmethod
    def create_user(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]:
        """Validate the supplied credentials and insert a new normal user.

        Args:
            username: Desired username; must pass ``__verify_username``.
            email: Contact e-mail; must pass ``__verify_email``.
            password: Plain-text password; must pass ``__verify_password``.

        Returns:
            ``(payload, status)``: 200 on success, 400 when a field fails
            validation, 500 when the id lookup or the insert fails.
        """
        if not UserService.__verify_username(username):
            return {"Failed": "Failed to verify username. Try another username"}, 400
        if not UserService.__verify_email(email):
            return {"Failed": "Failed to verify email. Try another email"}, 400
        if not UserService.__verify_password(password):
            return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400

        # Role ID 1 => Normal user
        # Role ID 2 => Seller
        # Role ID 3 => Admin
        hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        try:
            db_cursor.execute("select max(user_id) from user")
            row = db_cursor.fetchone()
            # max() yields NULL on an empty table, so guard against None
            # before comparing with the lowest accepted id.
            last_id = row[0] if row else None
            if last_id is None or last_id < 23000:
                return {"Failed": "Error occured when fetching last user id"}, 500
            # NOTE(review): max(user_id) + 1 is not atomic; two concurrent
            # registrations can compute the same id. Consider AUTO_INCREMENT.
            new_id = last_id + 1
            db_cursor.execute(
                "insert into user (username, email, password, user_id, role_id) values (%s, %s, %s, %s, 1)",
                (username, email, hashed_password, new_id),
            )
            db_connection.commit()
        except Error as e:
            print(f"Error: {e}")
            return {"Failed": "Failed to insert into database. Username or email are likely in use already"}, 500
        return {"Success": "User created successfully"}, 200

    @staticmethod
    def login(username: str, password: str) -> Tuple[Union[dict, str], int]:
        """Check a username/password pair and issue an access token.

        Args:
            username: Username to look up.
            password: Plain-text password to compare with the stored hash.

        Returns:
            ``({"token": ...}, 200)`` on success, 400 when the username is
            unknown, 401 when the password does not match. The token expires
            after one day and carries the row's ``last_change`` value in the
            ``"lm"`` claim.
        """
        db_cursor.execute("select user_id, password, last_change from user where username = %s", (username,))
        result = db_cursor.fetchone()
        # fetchone() returns None when no row matched; check before unpacking.
        if result is None or result[0] is None:
            return {"Failed": "Username not found"}, 400
        user_id, password_hash, last_change = result[0], result[1], result[2]

        # Presumably the driver returns the hash as str or as bytes/bytearray
        # depending on the column type; normalise to bytes for bcrypt.
        if isinstance(password_hash, str):
            stored_hash = password_hash.encode('utf-8')
        else:
            stored_hash = bytes(password_hash)

        if not bcrypt.checkpw(password.encode('utf-8'), stored_hash):
            return {"Failed": "Incorrect password"}, 401

        expire = datetime.timedelta(days=1)
        token = create_access_token(identity=user_id, expires_delta=expire, additional_claims={"lm": last_change})
        return {"token": token}, 200

    @staticmethod
    def update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]:
        """Replace the e-mail address of ``user_id``.

        Returns:
            200 on success, 400 when ``new_email`` fails validation, 500 when
            the database rejects the update.
        """
        if not UserService.__verify_email(new_email):
            return {"Failed": "Failed to verify email. Try another email"}, 400
        try:
            db_cursor.execute("update user set email = %s where user_id = %s", (new_email, user_id))
            db_connection.commit()
        except Error as e:
            return {"Failed": f"Failed to update email. Email is likely in use already. Error: {e}"}, 500
        return {"Success": "Email successfully updated"}, 200

    @staticmethod
    def update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]:
        """Replace the username of ``user_id``.

        Returns:
            200 on success, 400 when ``new_username`` fails validation, 500
            when the database rejects the update.
        """
        if not UserService.__verify_username(new_username):
            return {"Failed": "Failed to verify username. Try another one"}, 400
        try:
            db_cursor.execute("update user set username = %s where user_id = %s", (new_username, user_id))
            db_connection.commit()
        except Error as e:
            return {"Failed": f"Failed to update username. Username is likely in use already. Error: {e}"}, 500
        return {"Success": "Username successfully updated"}, 200

    @staticmethod
    def update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]:
        """Hash ``new_password`` with bcrypt and store it for ``user_id``.

        Returns:
            200 on success, 400 when ``new_password`` fails validation, 500
            when the database rejects the update.
        """
        if not UserService.__verify_password(new_password):
            return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400
        hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
        try:
            # Persist the freshly computed hash, never the plain-text value.
            db_cursor.execute("update user set password = %s where user_id = %s", (hashed_password, user_id))
            db_connection.commit()
        except Error as e:
            return {"Failed": f"Failed to update password. Error: {e}"}, 500
        return {"Success": "Password successfully updated"}, 200

    @staticmethod
    def __verify_email(email: str) -> bool:
        """Return True when ``email`` looks like an address of at most 64 characters."""
        email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
        # fullmatch rejects a trailing newline, which "$" alone would allow.
        return len(email) <= 64 and re.fullmatch(email_regex, email) is not None

    @staticmethod
    def __verify_username(username: str) -> bool:
        """Return True when ``username`` is 1-64 letters, digits, ``.``, ``_`` or ``-``."""
        # The hyphen sits last in the class so it is a literal character and
        # not a range operator (".-_" would span every code point from "." to
        # "_", admitting characters such as "@", "/" and "<").
        username_regex = r"^[a-zA-Z0-9._-]{1,64}$"
        return re.fullmatch(username_regex, username) is not None

    @staticmethod
    def __verify_password(password: str) -> bool:
        """Return True when ``password`` meets the strength policy.

        The policy is 8-64 characters containing at least one upper-case
        letter, one lower-case letter, one digit and one of ``#?!@$ %^&*-``.
        """
        password_regex = r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$"
        return re.fullmatch(password_regex, password) is not None