Cleanup and documentation of user management code

This commit is contained in:
Thastertyn 2024-03-07 07:52:27 +01:00
parent 63ef1179fc
commit 0202a40228
2 changed files with 188 additions and 48 deletions

View File

@ -1,10 +1,21 @@
from app.api import bp_user from app.api import bp_user
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
from flask import request, abort, jsonify from flask import request, abort
from datetime import timedelta
from app.services.user_service import UserService from app.services.user_service import UserService
from app.extensions import jwt_redis_blocklist
@bp_user.route('/register', methods=['POST'])
def register():
username = request.json.get('username')
email = request.json.get('email')
password = request.json.get('password')
if username is None or email is None or password is None:
return abort(400)
result, status_code = UserService.register(username, email, password)
return result, status_code
@bp_user.route('/login', methods=['POST']) @bp_user.route('/login', methods=['POST'])
def login(): def login():
@ -16,43 +27,20 @@ def login():
result, status_code = UserService.login(username, password) result, status_code = UserService.login(username, password)
return jsonify(**result), status_code return result, status_code
@bp_user.route('/logout', methods=['DELETE']) @bp_user.route('/logout', methods=['DELETE'])
@jwt_required() @jwt_required()
def logout(): def logout():
jti = get_jwt()["jti"] jwt = get_jwt()
jwt_redis_blocklist.set(jti, "", ex=timedelta(days=1))
return {"Success": "Successfully logged out"}, 200 jti = jwt['jti']
exp = jwt['exp']
result, status_code = UserService.logout(jti, exp)
@bp_user.route('/create', methods=['POST']) return result, status_code
def create_user():
username = request.json.get('username')
email = request.json.get('email')
password = request.json.get('password')
if username is None or email is None or password is None: @bp_user.route('/update/username', methods=['PUT'])
return abort(400)
result, status_code = UserService.create_user(username, email, password)
return jsonify(**result), status_code
@bp_user.route('/update/email', methods=['POST'])
@jwt_required()
def update_email():
username = get_jwt_identity()
new_mail = request.json.get('new_email')
if new_mail is None:
return abort(400)
result, status_code = UserService.update_email(username, new_mail)
return jsonify(**result), status_code
@bp_user.route('/update/username', methods=['POST'])
@jwt_required() @jwt_required()
def update_username(): def update_username():
username = get_jwt_identity() username = get_jwt_identity()
@ -63,9 +51,35 @@ def update_username():
result, status_code = UserService.update_username(username, new_username) result, status_code = UserService.update_username(username, new_username)
return jsonify(**result), status_code jwt = get_jwt()
@bp_user.route('/update/password', methods=['POST']) jti = jwt['jti']
exp = jwt['exp']
UserService.logout(jti, exp)
return result, status_code
@bp_user.route('/update/email', methods=['PUT'])
@jwt_required()
def update_email():
username = get_jwt_identity()
new_mail = request.json.get('new_email')
if new_mail is None:
return abort(400)
result, status_code = UserService.update_email(username, new_mail)
jwt = get_jwt()
jti = jwt['jti']
exp = jwt['exp']
UserService.logout(jti, exp)
return result, status_code
@bp_user.route('/update/password', methods=['PUT'])
@jwt_required() @jwt_required()
def update_password(): def update_password():
username = get_jwt_identity() username = get_jwt_identity()
@ -76,4 +90,15 @@ def update_password():
result, status_code = UserService.update_password(username, new_password) result, status_code = UserService.update_password(username, new_password)
return jsonify(**result), status_code jwt = get_jwt()
jti = jwt['jti']
exp = jwt['exp']
UserService.logout(jti, exp)
return result, status_code
@bp_user.route('/delete', methods=['DELETE'])
@jwt_required()
def delete_user():
return abort(501)

View File

@ -3,17 +3,40 @@ import re
import jwt import jwt
import datetime import datetime
from typing import Tuple, Union from typing import Tuple, Union
from mysql.connector import Error
from flask_jwt_extended import create_access_token
from app.extensions import db_cursor, db_connection from app.extensions import db_cursor, db_connection
from mysql.connector import Error from app.extensions import jwt_redis_blocklist
from flask_jwt_extended import create_access_token
class UserService: class UserService:
"""
UserService class provides methods for user-related operations.
Methods:
- register(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]
- login(username: str, password: str) -> Tuple[Union[dict, str], int]
- logout(jti, exp) -> Tuple[Union[dict, str], int]
- update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]
- update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]
- update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]
"""
@staticmethod @staticmethod
def create_user(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]: def register(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]:
"""
Registers a new user with the provided username, email, and password.
:param username: User's username.
:type username: str
:param email: User's email address.
:type email: str
:param password: User's password.
:type password: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
if not UserService.__verify_username(username): if not UserService.__verify_username(username):
return {"Failed": "Failed to verify username. Try another username"}, 400 return {"Failed": "Failed to verify username. Try another username"}, 400
@ -24,10 +47,6 @@ class UserService:
if not UserService.__verify_password(password): if not UserService.__verify_password(password):
return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400 return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400
# Role ID 1 => Normal user
# Role ID 2 => Seller
# Role ID 3 => Admin
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
try: try:
@ -49,6 +68,16 @@ class UserService:
@staticmethod @staticmethod
def login(username: str, password: str) -> Tuple[Union[dict, str], int]: def login(username: str, password: str) -> Tuple[Union[dict, str], int]:
"""
Authenticates a user with the provided username and password.
:param username: User's username.
:type username: str
:param password: User's password.
:type password: str
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
db_cursor.execute("select user_id, password, last_change from user where username = %s", (username,)) db_cursor.execute("select user_id, password, last_change from user where username = %s", (username,))
result = db_cursor.fetchone() result = db_cursor.fetchone()
@ -63,14 +92,40 @@ class UserService:
if not bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8')): if not bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8')):
return {"Failed": "Incorrect password"}, 401 return {"Failed": "Incorrect password"}, 401
expire = datetime.timedelta(days=1) expire = datetime.timedelta(hours=1)
token = create_access_token(identity=user_id, expires_delta=expire,additional_claims={"lm": last_change}) token = create_access_token(identity=user_id, expires_delta=expire,additional_claims={"lm": last_change})
return {"token": token}, 200 return {"token": token}, 200
@staticmethod
def logout(jti, exp) -> Tuple[Union[dict, str], int]:
"""
Logs out a user by invalidating the provided JWT.
:param jti: JWT ID.
:type jti: str
:param exp: JWT expiration timestamp.
:type exp: int
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
UserService.__invalidate_token(jti, exp)
return {"Success": "Successfully logged out"}, 200
@staticmethod @staticmethod
def update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]: def update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]:
"""
Updates the email address for a user with the provided user ID.
:param user_id: User's ID.
:type user_id: str
:param new_email: New email address.
:type new_email: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
if not UserService.__verify_email(new_email): if not UserService.__verify_email(new_email):
return {"Failed": "Failed to verify email. Try another email"}, 400 return {"Failed": "Failed to verify email. Try another email"}, 400
@ -85,6 +140,16 @@ class UserService:
@staticmethod @staticmethod
def update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]: def update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]:
"""
Updates the username for a user with the provided user ID.
:param user_id: User's ID.
:type user_id: str
:param new_username: New username.
:type new_username: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
if not UserService.__verify_username(new_username): if not UserService.__verify_username(new_username):
return {"Failed": "Failed to verify username. Try another one"}, 400 return {"Failed": "Failed to verify username. Try another one"}, 400
@ -99,6 +164,16 @@ class UserService:
@staticmethod @staticmethod
def update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]: def update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]:
"""
Updates the password for a user with the provided user ID.
:param user_id: User's ID.
:type user_id: str
:param new_password: New password.
:type new_password: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
if not UserService.__verify_password(new_password): if not UserService.__verify_password(new_password):
return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400 return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400
@ -113,17 +188,57 @@ class UserService:
return {"Success": "Password successfully updated"}, 200 return {"Success": "Password successfully updated"}, 200
@staticmethod
def __invalidate_token(jti: str, exp: int):
"""
Invalidates a JWT by adding its JTI to the Redis blocklist.
:param jti: JWT ID.
:type jti: str
:param exp: JWT expiration timestamp.
:type exp: int
"""
expiration = datetime.datetime.fromtimestamp(exp)
now = datetime.datetime.now()
delta = expiration - now
jwt_redis_blocklist.set(jti, "", ex=delta)
@staticmethod @staticmethod
def __verify_email(email: str) -> bool: def __verify_email(email: str) -> bool:
"""
Verifies a given email string against a regular expression.
:param email: Email string.
:type email: str
:return: Boolean indicating whether the email successfully passed the check.
:rtype: bool
"""
email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
return re.match(email_regex, email) and len(email) <= 64 return re.match(email_regex, email) and len(email) <= 64
@staticmethod @staticmethod
def __verify_username(username: str) -> bool: def __verify_username(username: str) -> bool:
"""
Verifies a given username string against a regular expression.
:param username: Username string.
:type username: str
:return: Boolean indicating whether the username successfully passed the check.
:rtype: bool
"""
username_regex = r"^[a-zA-Z.-_]{1,64}$" username_regex = r"^[a-zA-Z.-_]{1,64}$"
return re.match(username_regex, username) return re.match(username_regex, username)
@staticmethod @staticmethod
def __verify_password(password: str) -> bool: def __verify_password(password: str) -> bool:
"""
Verifies a given password string against a regular expression.
:param password: Password string.
:type password: str
:return: Boolean indicating whether the password successfully passed the check.
:rtype: bool
"""
password_regex = r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$" password_regex = r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$"
return re.match(password_regex, password) return re.match(password_regex, password)