From afc9db32e4f5388dedff17a28844cbe502584e9d Mon Sep 17 00:00:00 2001 From: Thastertyn Date: Sun, 10 Mar 2024 22:58:04 +0100 Subject: [PATCH] Use try ... with statements everywhere --- app/extensions.py | 2 - app/services/user_service.py | 113 ++++++++++++++++++++--------------- 2 files changed, 66 insertions(+), 49 deletions(-) diff --git a/app/extensions.py b/app/extensions.py index be72693..7813fb2 100644 --- a/app/extensions.py +++ b/app/extensions.py @@ -12,8 +12,6 @@ db_connection = mysql.connector.connect( database=MySqlConfig.MYSQL_DATABASE, ) -db_cursor = db_connection.cursor(dictionary=True) - jwt_redis_blocklist = redis.StrictRedis( host=RedisConfig.REDIS_HOST, port=RedisConfig.REDIS_PORT, diff --git a/app/services/user_service.py b/app/services/user_service.py index 0e06184..c27ad7e 100644 --- a/app/services/user_service.py +++ b/app/services/user_service.py @@ -6,7 +6,7 @@ from typing import Tuple, Union from mysql.connector import Error from flask_jwt_extended import create_access_token -from app.extensions import db_cursor, db_connection +from app.extensions import db_connection from app.extensions import jwt_redis_blocklist from app.mail_utils import send_mail @@ -40,23 +40,24 @@ class UserService: :rtype: Tuple[Union[dict, str], int] """ - if not UserService.__verify_username(username): - return {"Failed": "Failed to verify username. Try another username"}, 400 - - if not UserService.__verify_displayname(displayname): - return {"Failed": "Failed to verify display name. Try another name"}, 400 - - if not UserService.__verify_email(email): - return {"Failed": "Failed to verify email. Try another email"}, 400 - - if not UserService.__verify_password(password): - return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400 - - hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) - try: - db_cursor.execute("insert into user (username, displayname, email, password) values (%s, %s, %s, %s)", (username, displayname, email, hashed_password)) - db_connection.commit() + if not UserService.__verify_username(username): + return {"Failed": "Failed to verify username. Try another username"}, 400 + + if not UserService.__verify_displayname(displayname): + return {"Failed": "Failed to verify display name. Try another name"}, 400 + + if not UserService.__verify_email(email): + return {"Failed": "Failed to verify email. Try another email"}, 400 + + if not UserService.__verify_password(password): + return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400 + + hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) + + with db_connection.cursor() as cursor: + cursor.execute("insert into user (username, displayname, email, password) values (%s, %s, %s, %s)", (username, displayname, email, hashed_password)) + db_connection.commit() except Error as e: print(f"Error: {e}") return {"Failed": "Failed to insert into database. Username or email are likely in use already"}, 500 @@ -80,24 +81,29 @@ class UserService: :return: Tuple containing a dictionary with a token and an HTTP status code. :rtype: Tuple[Union[dict, str], int] """ + try: + with db_connection.cursor(dictionary=True) as cursor: - db_cursor.execute("select id, password from user where username = %s", (username,)) - result = db_cursor.fetchone() + cursor.execute("select id, password from user where username = %s", (username,)) + result = db_cursor.fetchone() - user_id = result['id'] - password_hash = result['password'] + user_id = result['id'] + password_hash = result['password'] - if user_id is None: - return {"Failed": "Username not found"}, 400 + if user_id is None: + return {"Failed": "Username not found"}, 400 - if not bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8')): - return {"Failed": "Incorrect password"}, 401 + if not bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8')): + return {"Failed": "Incorrect password"}, 401 - expire = datetime.timedelta(hours=1) + expire = datetime.timedelta(hours=1) - token = create_access_token(identity=user_id, expires_delta=expire) + token = create_access_token(identity=user_id, expires_delta=expire) - return {"token": token}, 200 + return {"token": token}, 200 + + except Error as e: + return {"Failed": f"Failed to login. Error: {e}"}, 500 @staticmethod def logout(jti, exp) -> Tuple[Union[dict, str], int]: @@ -117,9 +123,19 @@ class UserService: @staticmethod def delete_user(user_id: str) -> Tuple[Union[dict, str], int]: + """ + Deletes a user account. + + :param user_id: User ID. + :type user_id: str + :return: Tuple containing a dictionary and an HTTP status code. + :rtype: Tuple[Union[dict, str], int] + """ + try: - db_cursor.execute("delete from user where id = %s", (user_id,)) - db_connection.commit() + with db_connection.cursor() as cursor: + cursor.execute("delete from user where id = %s", (user_id,)) + db_connection.commit() except Error as e: return {"Failed": f"Failed to delete user. {e}"}, 500 @@ -138,12 +154,13 @@ class UserService: :rtype: Tuple[Union[dict, str], int] """ - if not UserService.__verify_email(new_email): - return {"Failed": "Failed to verify email. Try another email"}, 400 - try: - db_cursor.execute("update user set email = %s where id = %s", (new_email, user_id)) - db_connection.commit() + if not UserService.__verify_email(new_email): + return {"Failed": "Failed to verify email. Try another email"}, 400 + + with db_connection.cursor() as cursor: + cursor.execute("update user set email = %s where id = %s", (new_email, user_id)) + db_connection.commit() except Error as e: return {"Failed": f"Failed to update email. Email is likely in use already. Error: {e}"}, 500 @@ -162,12 +179,13 @@ class UserService: :rtype: Tuple[Union[dict, str], int] """ - if not UserService.__verify_name(new_username): - return {"Failed": "Failed to verify username. Try another one"}, 400 - try: - db_cursor.execute("update user set username = %s where id = %s", (new_username, user_id)) - db_connection.commit() + if not UserService.__verify_name(new_username): + return {"Failed": "Failed to verify username. Try another one"}, 400 + + with db_connection.cursor() as cursor: + cursor.execute("update user set username = %s where id = %s", (new_username, user_id)) + db_connection.commit() except Error as e: return {"Failed": f"Failed to update username. Username is likely in use already. Error: {e}"}, 500 @@ -186,14 +204,15 @@ class UserService: :rtype: Tuple[Union[dict, str], int] """ - if not UserService.__verify_password(new_password): - return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400 - - hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()) - try: - db_cursor.execute("update user set password = %s where id = %s", (new_username, user_id)) - db_connection.commit() + if not UserService.__verify_password(new_password): + return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400 + + hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()) + + with db_connection.cursor() as cursor: + cursor.execute("update user set password = %s where id = %s", (new_username, user_id)) + db_connection.commit() except Error as e: return {"Failed": f"Failed to update password. Error: {e}"}, 500