Compare commits
No commits in common. "0202a4022845af53d1fff0b8373ba68037dbe1d0" and "39d69ee0cacbeadda3523932f9fb8369fc134c74" have entirely different histories.
0202a40228
...
39d69ee0ca
5
.vscode/launch.json
vendored
5
.vscode/launch.json
vendored
@ -15,9 +15,10 @@
|
|||||||
"FLASK_APP": "main.py",
|
"FLASK_APP": "main.py",
|
||||||
"FLASK_ENV": "development"
|
"FLASK_ENV": "development"
|
||||||
},
|
},
|
||||||
|
"envFile": "${workspaceFolder}/.env",
|
||||||
|
"stopOnEntry": false,
|
||||||
"console": "internalConsole",
|
"console": "internalConsole",
|
||||||
"autoReload": {"enable": true},
|
"autoReload": {"enable": true}
|
||||||
"justMyCode": true
|
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
3
.vscode/settings.json
vendored
3
.vscode/settings.json
vendored
@ -5,7 +5,6 @@
|
|||||||
"gensalt",
|
"gensalt",
|
||||||
"hashpw",
|
"hashpw",
|
||||||
"checkpw",
|
"checkpw",
|
||||||
"jsonify",
|
"jsonify"
|
||||||
"rtype"
|
|
||||||
]
|
]
|
||||||
}
|
}
|
@ -14,6 +14,4 @@ def create_app():
|
|||||||
from app.config import FlaskTesting, FlaskProduction
|
from app.config import FlaskTesting, FlaskProduction
|
||||||
app.config.from_object(FlaskTesting)
|
app.config.from_object(FlaskTesting)
|
||||||
|
|
||||||
from . import jwt_utils
|
|
||||||
|
|
||||||
return app
|
return app
|
@ -1,21 +1,10 @@
|
|||||||
from app.api import bp_user
|
from app.api import bp_user
|
||||||
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
|
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
|
||||||
from flask import request, abort
|
from flask import request, abort, jsonify
|
||||||
|
from datetime import timedelta
|
||||||
|
|
||||||
from app.services.user_service import UserService
|
from app.services.user_service import UserService
|
||||||
|
from app.extensions import jwt_redis_blocklist
|
||||||
@bp_user.route('/register', methods=['POST'])
|
|
||||||
def register():
|
|
||||||
username = request.json.get('username')
|
|
||||||
email = request.json.get('email')
|
|
||||||
password = request.json.get('password')
|
|
||||||
|
|
||||||
if username is None or email is None or password is None:
|
|
||||||
return abort(400)
|
|
||||||
|
|
||||||
result, status_code = UserService.register(username, email, password)
|
|
||||||
|
|
||||||
return result, status_code
|
|
||||||
|
|
||||||
@bp_user.route('/login', methods=['POST'])
|
@bp_user.route('/login', methods=['POST'])
|
||||||
def login():
|
def login():
|
||||||
@ -27,39 +16,30 @@ def login():
|
|||||||
|
|
||||||
result, status_code = UserService.login(username, password)
|
result, status_code = UserService.login(username, password)
|
||||||
|
|
||||||
return result, status_code
|
return jsonify(**result), status_code
|
||||||
|
|
||||||
@bp_user.route('/logout', methods=['DELETE'])
|
@bp_user.route('/logout', methods=['DELETE'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
def logout():
|
def logout():
|
||||||
jwt = get_jwt()
|
jti = get_jwt()["jti"]
|
||||||
|
jwt_redis_blocklist.set(jti, "", ex=timedelta(days=1))
|
||||||
|
|
||||||
jti = jwt['jti']
|
return {"Success": "Successfully logged out"}, 200
|
||||||
exp = jwt['exp']
|
|
||||||
result, status_code = UserService.logout(jti, exp)
|
|
||||||
|
|
||||||
return result, status_code
|
@bp_user.route('/create', methods=['POST'])
|
||||||
|
def create_user():
|
||||||
|
username = request.json.get('username')
|
||||||
|
email = request.json.get('email')
|
||||||
|
password = request.json.get('password')
|
||||||
|
|
||||||
@bp_user.route('/update/username', methods=['PUT'])
|
if username is None or email is None or password is None:
|
||||||
@jwt_required()
|
|
||||||
def update_username():
|
|
||||||
username = get_jwt_identity()
|
|
||||||
new_username = request.json.get('new_username')
|
|
||||||
|
|
||||||
if new_username is None:
|
|
||||||
return abort(400)
|
return abort(400)
|
||||||
|
|
||||||
result, status_code = UserService.update_username(username, new_username)
|
result, status_code = UserService.create_user(username, email, password)
|
||||||
|
|
||||||
jwt = get_jwt()
|
return jsonify(**result), status_code
|
||||||
|
|
||||||
jti = jwt['jti']
|
|
||||||
exp = jwt['exp']
|
|
||||||
UserService.logout(jti, exp)
|
|
||||||
|
|
||||||
return result, status_code
|
@bp_user.route('/update/email', methods=['POST'])
|
||||||
|
|
||||||
@bp_user.route('/update/email', methods=['PUT'])
|
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
def update_email():
|
def update_email():
|
||||||
username = get_jwt_identity()
|
username = get_jwt_identity()
|
||||||
@ -70,16 +50,22 @@ def update_email():
|
|||||||
|
|
||||||
result, status_code = UserService.update_email(username, new_mail)
|
result, status_code = UserService.update_email(username, new_mail)
|
||||||
|
|
||||||
jwt = get_jwt()
|
return jsonify(**result), status_code
|
||||||
|
|
||||||
jti = jwt['jti']
|
|
||||||
exp = jwt['exp']
|
|
||||||
UserService.logout(jti, exp)
|
|
||||||
|
|
||||||
return result, status_code
|
@bp_user.route('/update/username', methods=['POST'])
|
||||||
|
@jwt_required()
|
||||||
|
def update_username():
|
||||||
|
username = get_jwt_identity()
|
||||||
|
new_username = request.json.get('new_username')
|
||||||
|
|
||||||
|
if new_username is None:
|
||||||
|
return abort(400)
|
||||||
|
|
||||||
@bp_user.route('/update/password', methods=['PUT'])
|
result, status_code = UserService.update_username(username, new_username)
|
||||||
|
|
||||||
|
return jsonify(**result), status_code
|
||||||
|
|
||||||
|
@bp_user.route('/update/password', methods=['POST'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
def update_password():
|
def update_password():
|
||||||
username = get_jwt_identity()
|
username = get_jwt_identity()
|
||||||
@ -90,15 +76,4 @@ def update_password():
|
|||||||
|
|
||||||
result, status_code = UserService.update_password(username, new_password)
|
result, status_code = UserService.update_password(username, new_password)
|
||||||
|
|
||||||
jwt = get_jwt()
|
return jsonify(**result), status_code
|
||||||
|
|
||||||
jti = jwt['jti']
|
|
||||||
exp = jwt['exp']
|
|
||||||
UserService.logout(jti, exp)
|
|
||||||
|
|
||||||
return result, status_code
|
|
||||||
|
|
||||||
@bp_user.route('/delete', methods=['DELETE'])
|
|
||||||
@jwt_required()
|
|
||||||
def delete_user():
|
|
||||||
return abort(501)
|
|
@ -2,11 +2,11 @@ from app.extensions import jwt_redis_blocklist
|
|||||||
|
|
||||||
from . import jwt_manager
|
from . import jwt_manager
|
||||||
|
|
||||||
from app import app
|
@jwt.token_in_blocklist_loader
|
||||||
|
|
||||||
@jwt_manager.token_in_blocklist_loader
|
|
||||||
def check_if_token_is_revoked(jwt_header, jwt_payload: dict) -> bool:
|
def check_if_token_is_revoked(jwt_header, jwt_payload: dict) -> bool:
|
||||||
jti = jwt_payload["jti"]
|
jti = jwt_payload["jti"]
|
||||||
token_in_redis = jwt_redis_blocklist.get(jti)
|
token_in_redis = jwt_redis_blocklist.get(jti)
|
||||||
|
|
||||||
|
print(token_in_redis)
|
||||||
|
|
||||||
return token_in_redis is not None
|
return token_in_redis is not None
|
||||||
|
@ -3,40 +3,17 @@ import re
|
|||||||
import jwt
|
import jwt
|
||||||
import datetime
|
import datetime
|
||||||
from typing import Tuple, Union
|
from typing import Tuple, Union
|
||||||
from mysql.connector import Error
|
|
||||||
from flask_jwt_extended import create_access_token
|
|
||||||
|
|
||||||
from app.extensions import db_cursor, db_connection
|
from app.extensions import db_cursor, db_connection
|
||||||
from app.extensions import jwt_redis_blocklist
|
from mysql.connector import Error
|
||||||
|
|
||||||
|
from flask_jwt_extended import create_access_token
|
||||||
|
|
||||||
|
|
||||||
class UserService:
|
class UserService:
|
||||||
"""
|
|
||||||
UserService class provides methods for user-related operations.
|
|
||||||
|
|
||||||
Methods:
|
|
||||||
- register(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]
|
|
||||||
- login(username: str, password: str) -> Tuple[Union[dict, str], int]
|
|
||||||
- logout(jti, exp) -> Tuple[Union[dict, str], int]
|
|
||||||
- update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]
|
|
||||||
- update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]
|
|
||||||
- update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]
|
|
||||||
"""
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def register(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]:
|
def create_user(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]:
|
||||||
"""
|
|
||||||
Registers a new user with the provided username, email, and password.
|
|
||||||
|
|
||||||
:param username: User's username.
|
|
||||||
:type username: str
|
|
||||||
:param email: User's email address.
|
|
||||||
:type email: str
|
|
||||||
:param password: User's password.
|
|
||||||
:type password: str
|
|
||||||
:return: Tuple containing a dictionary and an HTTP status code.
|
|
||||||
:rtype: Tuple[Union[dict, str], int]
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not UserService.__verify_username(username):
|
if not UserService.__verify_username(username):
|
||||||
return {"Failed": "Failed to verify username. Try another username"}, 400
|
return {"Failed": "Failed to verify username. Try another username"}, 400
|
||||||
@ -47,6 +24,10 @@ class UserService:
|
|||||||
if not UserService.__verify_password(password):
|
if not UserService.__verify_password(password):
|
||||||
return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400
|
return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400
|
||||||
|
|
||||||
|
# Role ID 1 => Normal user
|
||||||
|
# Role ID 2 => Seller
|
||||||
|
# Role ID 3 => Admin
|
||||||
|
|
||||||
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
|
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -68,16 +49,6 @@ class UserService:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def login(username: str, password: str) -> Tuple[Union[dict, str], int]:
|
def login(username: str, password: str) -> Tuple[Union[dict, str], int]:
|
||||||
"""
|
|
||||||
Authenticates a user with the provided username and password.
|
|
||||||
|
|
||||||
:param username: User's username.
|
|
||||||
:type username: str
|
|
||||||
:param password: User's password.
|
|
||||||
:type password: str
|
|
||||||
:return: Tuple containing a dictionary with a token and an HTTP status code.
|
|
||||||
:rtype: Tuple[Union[dict, str], int]
|
|
||||||
"""
|
|
||||||
|
|
||||||
db_cursor.execute("select user_id, password, last_change from user where username = %s", (username,))
|
db_cursor.execute("select user_id, password, last_change from user where username = %s", (username,))
|
||||||
result = db_cursor.fetchone()
|
result = db_cursor.fetchone()
|
||||||
@ -92,41 +63,15 @@ class UserService:
|
|||||||
if not bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8')):
|
if not bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8')):
|
||||||
return {"Failed": "Incorrect password"}, 401
|
return {"Failed": "Incorrect password"}, 401
|
||||||
|
|
||||||
expire = datetime.timedelta(hours=1)
|
expire = datetime.timedelta(days=1)
|
||||||
|
|
||||||
token = create_access_token(identity=user_id, expires_delta=expire,additional_claims={"lm": last_change})
|
token = create_access_token(identity=user_id, expires_delta=expire,additional_claims={"lm": last_change})
|
||||||
|
|
||||||
return {"token": token}, 200
|
return {"token": token}, 200
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def logout(jti, exp) -> Tuple[Union[dict, str], int]:
|
|
||||||
"""
|
|
||||||
Logs out a user by invalidating the provided JWT.
|
|
||||||
|
|
||||||
:param jti: JWT ID.
|
|
||||||
:type jti: str
|
|
||||||
:param exp: JWT expiration timestamp.
|
|
||||||
:type exp: int
|
|
||||||
:return: Tuple containing a dictionary and an HTTP status code.
|
|
||||||
:rtype: Tuple[Union[dict, str], int]
|
|
||||||
"""
|
|
||||||
UserService.__invalidate_token(jti, exp)
|
|
||||||
|
|
||||||
return {"Success": "Successfully logged out"}, 200
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]:
|
def update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]:
|
||||||
"""
|
|
||||||
Updates the email address for a user with the provided user ID.
|
|
||||||
|
|
||||||
:param user_id: User's ID.
|
|
||||||
:type user_id: str
|
|
||||||
:param new_email: New email address.
|
|
||||||
:type new_email: str
|
|
||||||
:return: Tuple containing a dictionary and an HTTP status code.
|
|
||||||
:rtype: Tuple[Union[dict, str], int]
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not UserService.__verify_email(new_email):
|
if not UserService.__verify_email(new_email):
|
||||||
return {"Failed": "Failed to verify email. Try another email"}, 400
|
return {"Failed": "Failed to verify email. Try another email"}, 400
|
||||||
|
|
||||||
@ -140,17 +85,7 @@ class UserService:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]:
|
def update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]:
|
||||||
"""
|
|
||||||
Updates the username for a user with the provided user ID.
|
|
||||||
|
|
||||||
:param user_id: User's ID.
|
|
||||||
:type user_id: str
|
|
||||||
:param new_username: New username.
|
|
||||||
:type new_username: str
|
|
||||||
:return: Tuple containing a dictionary and an HTTP status code.
|
|
||||||
:rtype: Tuple[Union[dict, str], int]
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not UserService.__verify_username(new_username):
|
if not UserService.__verify_username(new_username):
|
||||||
return {"Failed": "Failed to verify username. Try another one"}, 400
|
return {"Failed": "Failed to verify username. Try another one"}, 400
|
||||||
|
|
||||||
@ -164,16 +99,6 @@ class UserService:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]:
|
def update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]:
|
||||||
"""
|
|
||||||
Updates the password for a user with the provided user ID.
|
|
||||||
|
|
||||||
:param user_id: User's ID.
|
|
||||||
:type user_id: str
|
|
||||||
:param new_password: New password.
|
|
||||||
:type new_password: str
|
|
||||||
:return: Tuple containing a dictionary and an HTTP status code.
|
|
||||||
:rtype: Tuple[Union[dict, str], int]
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not UserService.__verify_password(new_password):
|
if not UserService.__verify_password(new_password):
|
||||||
return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400
|
return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400
|
||||||
@ -188,57 +113,17 @@ class UserService:
|
|||||||
|
|
||||||
return {"Success": "Password successfully updated"}, 200
|
return {"Success": "Password successfully updated"}, 200
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __invalidate_token(jti: str, exp: int):
|
|
||||||
"""
|
|
||||||
Invalidates a JWT by adding its JTI to the Redis blocklist.
|
|
||||||
|
|
||||||
:param jti: JWT ID.
|
|
||||||
:type jti: str
|
|
||||||
:param exp: JWT expiration timestamp.
|
|
||||||
:type exp: int
|
|
||||||
"""
|
|
||||||
expiration = datetime.datetime.fromtimestamp(exp)
|
|
||||||
now = datetime.datetime.now()
|
|
||||||
|
|
||||||
delta = expiration - now
|
|
||||||
jwt_redis_blocklist.set(jti, "", ex=delta)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __verify_email(email: str) -> bool:
|
def __verify_email(email: str) -> bool:
|
||||||
"""
|
|
||||||
Verifies a given email string against a regular expression.
|
|
||||||
|
|
||||||
:param email: Email string.
|
|
||||||
:type email: str
|
|
||||||
:return: Boolean indicating whether the email successfully passed the check.
|
|
||||||
:rtype: bool
|
|
||||||
"""
|
|
||||||
email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
|
email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
|
||||||
return re.match(email_regex, email) and len(email) <= 64
|
return re.match(email_regex ,email) and len(email) <= 64
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __verify_username(username: str) -> bool:
|
def __verify_username(username: str) -> bool:
|
||||||
"""
|
|
||||||
Verifies a given username string against a regular expression.
|
|
||||||
|
|
||||||
:param username: Username string.
|
|
||||||
:type username: str
|
|
||||||
:return: Boolean indicating whether the username successfully passed the check.
|
|
||||||
:rtype: bool
|
|
||||||
"""
|
|
||||||
username_regex = r"^[a-zA-Z.-_]{1,64}$"
|
username_regex = r"^[a-zA-Z.-_]{1,64}$"
|
||||||
return re.match(username_regex, username)
|
return re.match(username_regex, username)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __verify_password(password: str) -> bool:
|
def __verify_password(password: str) -> bool:
|
||||||
"""
|
|
||||||
Verifies a given password string against a regular expression.
|
|
||||||
|
|
||||||
:param password: Password string.
|
|
||||||
:type password: str
|
|
||||||
:return: Boolean indicating whether the password successfully passed the check.
|
|
||||||
:rtype: bool
|
|
||||||
"""
|
|
||||||
password_regex = r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$"
|
password_regex = r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$"
|
||||||
return re.match(password_regex, password)
|
return re.match(password_regex, password)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user