Use try ... with statements everywhere

This commit is contained in:
Thastertyn 2024-03-10 22:58:04 +01:00
parent 031ed358f1
commit afc9db32e4
2 changed files with 66 additions and 49 deletions

View File

@ -12,8 +12,6 @@ db_connection = mysql.connector.connect(
database=MySqlConfig.MYSQL_DATABASE,
)
db_cursor = db_connection.cursor(dictionary=True)
jwt_redis_blocklist = redis.StrictRedis(
host=RedisConfig.REDIS_HOST,
port=RedisConfig.REDIS_PORT,

View File

@ -6,7 +6,7 @@ from typing import Tuple, Union
from mysql.connector import Error
from flask_jwt_extended import create_access_token
from app.extensions import db_cursor, db_connection
from app.extensions import db_connection
from app.extensions import jwt_redis_blocklist
from app.mail_utils import send_mail
@ -40,23 +40,24 @@ class UserService:
:rtype: Tuple[Union[dict, str], int]
"""
if not UserService.__verify_username(username):
return {"Failed": "Failed to verify username. Try another username"}, 400
if not UserService.__verify_displayname(displayname):
return {"Failed": "Failed to verify display name. Try another name"}, 400
if not UserService.__verify_email(email):
return {"Failed": "Failed to verify email. Try another email"}, 400
if not UserService.__verify_password(password):
return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
try:
db_cursor.execute("insert into user (username, displayname, email, password) values (%s, %s, %s, %s)", (username, displayname, email, hashed_password))
db_connection.commit()
if not UserService.__verify_username(username):
return {"Failed": "Failed to verify username. Try another username"}, 400
if not UserService.__verify_displayname(displayname):
return {"Failed": "Failed to verify display name. Try another name"}, 400
if not UserService.__verify_email(email):
return {"Failed": "Failed to verify email. Try another email"}, 400
if not UserService.__verify_password(password):
return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
with db_connection.cursor() as cursor:
cursor.execute("insert into user (username, displayname, email, password) values (%s, %s, %s, %s)", (username, displayname, email, hashed_password))
db_connection.commit()
except Error as e:
print(f"Error: {e}")
return {"Failed": "Failed to insert into database. Username or email are likely in use already"}, 500
@ -80,24 +81,29 @@ class UserService:
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
with db_connection.cursor(dictionary=True) as cursor:
db_cursor.execute("select id, password from user where username = %s", (username,))
result = db_cursor.fetchone()
cursor.execute("select id, password from user where username = %s", (username,))
result = db_cursor.fetchone()
user_id = result['id']
password_hash = result['password']
user_id = result['id']
password_hash = result['password']
if user_id is None:
return {"Failed": "Username not found"}, 400
if user_id is None:
return {"Failed": "Username not found"}, 400
if not bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8')):
return {"Failed": "Incorrect password"}, 401
if not bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8')):
return {"Failed": "Incorrect password"}, 401
expire = datetime.timedelta(hours=1)
expire = datetime.timedelta(hours=1)
token = create_access_token(identity=user_id, expires_delta=expire)
token = create_access_token(identity=user_id, expires_delta=expire)
return {"token": token}, 200
return {"token": token}, 200
except Error as e:
return {"Failed": f"Failed to login. Error: {e}"}, 500
@staticmethod
def logout(jti, exp) -> Tuple[Union[dict, str], int]:
@ -117,9 +123,19 @@ class UserService:
@staticmethod
def delete_user(user_id: str) -> Tuple[Union[dict, str], int]:
"""
Deletes a user account.
:param user_id: User ID.
:type user_id: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
db_cursor.execute("delete from user where id = %s", (user_id,))
db_connection.commit()
with db_connection.cursor() as cursor:
cursor.execute("delete from user where id = %s", (user_id,))
db_connection.commit()
except Error as e:
return {"Failed": f"Failed to delete user. {e}"}, 500
@ -138,12 +154,13 @@ class UserService:
:rtype: Tuple[Union[dict, str], int]
"""
if not UserService.__verify_email(new_email):
return {"Failed": "Failed to verify email. Try another email"}, 400
try:
db_cursor.execute("update user set email = %s where id = %s", (new_email, user_id))
db_connection.commit()
if not UserService.__verify_email(new_email):
return {"Failed": "Failed to verify email. Try another email"}, 400
with db_connection.cursor() as cursor:
cursor.execute("update user set email = %s where id = %s", (new_email, user_id))
db_connection.commit()
except Error as e:
return {"Failed": f"Failed to update email. Email is likely in use already. Error: {e}"}, 500
@ -162,12 +179,13 @@ class UserService:
:rtype: Tuple[Union[dict, str], int]
"""
if not UserService.__verify_name(new_username):
return {"Failed": "Failed to verify username. Try another one"}, 400
try:
db_cursor.execute("update user set username = %s where id = %s", (new_username, user_id))
db_connection.commit()
if not UserService.__verify_name(new_username):
return {"Failed": "Failed to verify username. Try another one"}, 400
with db_connection.cursor() as cursor:
cursor.execute("update user set username = %s where id = %s", (new_username, user_id))
db_connection.commit()
except Error as e:
return {"Failed": f"Failed to update username. Username is likely in use already. Error: {e}"}, 500
@ -186,14 +204,15 @@ class UserService:
:rtype: Tuple[Union[dict, str], int]
"""
if not UserService.__verify_password(new_password):
return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400
hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
try:
db_cursor.execute("update user set password = %s where id = %s", (new_username, user_id))
db_connection.commit()
if not UserService.__verify_password(new_password):
return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400
hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
with db_connection.cursor() as cursor:
cursor.execute("update user set password = %s where id = %s", (new_username, user_id))
db_connection.commit()
except Error as e:
return {"Failed": f"Failed to update password. Error: {e}"}, 500