Massive rewrite of user and product logic. Still WIP

This commit is contained in:
Thastertyn 2024-06-02 22:41:14 +02:00
parent be7b8064d6
commit c018d2fb52
49 changed files with 1471 additions and 993 deletions

View File

@ -7,20 +7,23 @@ from app.doc.main_swag import main_swagger
app = Flask(__name__)
from app.config import FlaskTesting, FlaskProduction
app.config.from_object(FlaskTesting)
flask_mail = Mail(app)
jwt_manager = JWTManager(app)
swag = Swagger(app, template=main_swagger)
def create_app():
from app.api import bp, bp_errors, bp_product, bp_user, bp_cart
app.register_blueprint(bp)
app.register_blueprint(bp_errors)
app.register_blueprint(bp_product)
app.register_blueprint(bp_user)
app.register_blueprint(bp_cart)
from app.api import bp, bp_errors, bp_product, bp_user, bp_cart
from . import jwt_utils
app.register_blueprint(bp)
app.register_blueprint(bp_errors)
app.register_blueprint(bp_product)
app.register_blueprint(bp_user)
app.register_blueprint(bp_cart)
return app
from . import jwt_utils
return app

View File

@ -1 +1,15 @@
from app.api.routes import main_routes, error_routes, product_routes, user_routes, cart_routes
from app.api.routes.user import (
register_route,
login_route,
logout_route,
update_route,
delete_route,
)
from app.api.routes.product import (
product_create_route,
product_delete_route,
product_info_route,
product_page_route,
)
from app.api.routes import main_routes, error_routes, cart_routes

View File

@ -1,7 +1,13 @@
from flask import jsonify, abort, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from app.doc.cart_swag import show_cart_swagger, add_to_cart_swagger, remove_from_cart_swagger, update_count_in_cart_swagger, purchase_swagger
from app.doc.cart_swag import (
show_cart_swagger,
add_to_cart_swagger,
remove_from_cart_swagger,
update_count_in_cart_swagger,
purchase_swagger,
)
from flasgger import swag_from
@ -9,60 +15,65 @@ from app.api import bp_cart
from app.services.cart_service import CartService
@bp_cart.route('', methods=['GET'])
@bp_cart.route("", methods=["GET"])
@jwt_required()
@swag_from(show_cart_swagger)
def show_cart():
user_id = get_jwt_identity()
user_id = get_jwt_identity()
result, status_code = CartService.show_cart(user_id)
result, status_code = CartService.show_cart(user_id)
return result, status_code
return result, status_code
@bp_cart.route('/add/<int:product_id>', methods=['PUT'])
@bp_cart.route("/add/<int:product_id>", methods=["PUT"])
@jwt_required()
@swag_from(add_to_cart_swagger)
def add_to_cart(product_id: int):
user_id = get_jwt_identity()
count = request.args.get('count', default=1, type=int)
user_id = get_jwt_identity()
count = request.args.get("count", default=1, type=int)
if count < 1:
return abort(400)
result, status_code = CartService.add_to_cart(user_id, product_id, count)
if count < 1:
return abort(400)
return result, status_code
result, status_code = CartService.add_to_cart(user_id, product_id, count)
@bp_cart.route('/remove/<int:product_id>', methods=['DELETE'])
return result, status_code
@bp_cart.route("/remove/<int:product_id>", methods=["DELETE"])
@jwt_required()
@swag_from(remove_from_cart_swagger)
def remove_from_cart(product_id: int):
user_id = get_jwt_identity()
user_id = get_jwt_identity()
result, status_code = CartService.delete_from_cart(user_id, product_id)
result, status_code = CartService.delete_from_cart(user_id, product_id)
return result, status_code
return result, status_code
@bp_cart.route('/update/<int:product_id>', methods=['PUT'])
@bp_cart.route("/update/<int:product_id>", methods=["PUT"])
@jwt_required()
@swag_from(update_count_in_cart_swagger)
def update_count_in_cart(product_id: int):
user_id = get_jwt_identity()
count = request.args.get('count', type=int)
user_id = get_jwt_identity()
count = request.args.get("count", type=int)
if not count:
return abort(400)
if not count:
return abort(400)
result, status_code = CartService.update_count(user_id, product_id, count)
result, status_code = CartService.update_count(user_id, product_id, count)
return result, status_code
return result, status_code
@bp_cart.route('/purchase', methods=['GET'])
@bp_cart.route("/purchase", methods=["GET"])
@jwt_required()
@swag_from(purchase_swagger)
def purchase():
user_id = get_jwt_identity()
user_id = get_jwt_identity()
result, status_code = CartService.purchase(user_id)
result, status_code = CartService.purchase(user_id)
return result, status_code
return result, status_code

View File

@ -1,29 +1,38 @@
from app.api import bp_errors
@bp_errors.app_errorhandler(400)
def bad_request(e):
return {"msg": "The request was incorrectly formatted, or contained invalid data"}, 400
return {
"msg": "The request was incorrectly formatted, or contained invalid data"
}, 400
@bp_errors.app_errorhandler(401)
def unauthorized(e):
return {"msg": "Failed to authorize the request"}, 401
return {"msg": "Failed to authorize the request"}, 401
@bp_errors.app_errorhandler(403)
def forbidden(e):
return {"msg": "You shall not pass"}, 403
return {"msg": "You shall not pass"}, 403
@bp_errors.app_errorhandler(404)
def not_found(e):
return {"msg": "The requested resource was not found"}, 404
return {"msg": "The requested resource was not found"}, 404
@bp_errors.app_errorhandler(405)
def method_not_allowed(e):
return {"msg": "The method used is not allowed in current context"}, 405
return {"msg": "The method used is not allowed in current context"}, 405
@bp_errors.app_errorhandler(500)
def internal_error(e):
return {"msg": "An error occurred on he server"}, 500
return {"msg": "An error occurred on he server"}, 500
@bp_errors.app_errorhandler(501)
def internal_error(e):
return {"msg": "This function has not been implemented yet. Check back soon!"}, 501
def unimplemented_error(e):
return {"msg": "This function has not been implemented yet. Check back soon!"}, 501

View File

@ -1,11 +1,12 @@
from flask import jsonify, abort
from flask import jsonify
from flasgger import swag_from
from app.doc.root_swag import root_swagger
from app.api import bp
@bp.route('/')
@bp.route("/")
@swag_from(root_swagger)
def hello():
return jsonify({'message': 'Hello, Flask!'})
return jsonify({"message": "Hello, Flask!"})

View File

@ -0,0 +1,33 @@
from flask import jsonify, abort, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from app.doc.product_swag import create_product_swagger
from flasgger import swag_from
from app.api import bp_product
from app.services.product import product_create_service
@bp_product.route("/create", methods=["POST"])
@swag_from(create_product_swagger)
@jwt_required()
def create_product_listing():
user_id = get_jwt_identity()
name = request.json.get("name")
price = request.json.get("price")
if name is None or price is None:
return abort(400)
float_price = float(price)
if not isinstance(float_price, float):
return abort(400)
result, status_code = product_create_service.create_product(
user_id, name, float_price
)
return jsonify(result), status_code

View File

@ -0,0 +1,19 @@
from flask import jsonify, abort, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from flasgger import swag_from
from app.api import bp_product
from app.services.product import product_delete_service
@bp_product.route("/<int:product_id>/delete", methods=["DELETE"])
@jwt_required()
def delete_product(product_id: int):
user_id = get_jwt_identity()
result, status_code = product_delete_service.delete_product(user_id, product_id)
return jsonify(result), status_code

View File

@ -0,0 +1,25 @@
from flask import jsonify, request
from app.doc.product_swag import get_product_info_swagger
from flasgger import swag_from
from app.api import bp_product
from app.services.product import product_info_service
@bp_product.route("/<int:product_id>", methods=["GET"])
@swag_from(get_product_info_swagger)
def get_product_info(product_id: int):
fields = ["name", "price", "image", "image_name", "seller"]
fields_param = request.args.get("fields")
fields_param_list = fields_param.split(",") if fields_param else fields
common_fields = list(set(fields) & set(fields_param_list))
result, status_code = product_info_service.product_info(product_id)
return jsonify(result), status_code

View File

@ -0,0 +1,22 @@
from flask import jsonify, abort, request
from app.doc.product_swag import get_products_swagger
from flasgger import swag_from
from app.api import bp_product
from app.services.product import product_list_service
@bp_product.route("", methods=["GET"])
@swag_from(get_products_swagger)
def get_products():
page = request.args.get("page", default=0, type=int)
if page < 0:
return abort(400)
result, status_code = product_list_service.product_list(page)
return jsonify(result), status_code

View File

@ -1,59 +0,0 @@
from flask import jsonify, abort, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from app.doc.product_swag import get_products_swagger, get_product_info_swagger, create_product_swagger
from flasgger import swag_from
from app.api import bp_product
from app.services.product_service import ProductService
@bp_product.route('', methods=['GET'])
@swag_from(get_products_swagger)
def get_products():
page = request.args.get('page', default=0, type=int)
if page < 0:
return abort(400)
result, status_code = ProductService.get_products(page)
return result, status_code
@bp_product.route('/<int:id>', methods=['GET'])
@swag_from(get_product_info_swagger)
def get_product_info(id: int):
fields = ['name', 'price', 'image', 'image_name', 'seller']
fields_param = request.args.get('fields')
fields_param_list = fields_param.split(',') if fields_param else fields
common_fields = list(set(fields) & set(fields_param_list))
result, status_code = ProductService.get_product_info(common_fields, id)
return result, status_code
@bp_product.route('/create', methods=['POST'])
@swag_from(create_product_swagger)
@jwt_required()
def create_product_listing():
user_id = get_jwt_identity()
name = request.json.get('name')
price = request.json.get('price')
if name is None or price is None:
return abort(400)
float_price = float(price)
if not isinstance(float_price, float):
return abort(400)
result, status_code = ProductService.create_listing(user_id, name, float_price)
return result, status_code

View File

@ -0,0 +1,22 @@
from app.api import bp_user
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
from flask import request, abort
from flasgger import swag_from
from app.doc.user_swag import delete_swagger
from app.services.user import delete_service, logout_service
@bp_user.route("/delete", methods=["DELETE"])
@swag_from(delete_swagger)
@jwt_required()
def delete_user():
user_id = get_jwt_identity()
result, status_code = delete_service.delete_user(user_id)
jwt = get_jwt()
logout_service.logout(jwt, user_id, True)
return result, status_code

View File

@ -0,0 +1,33 @@
from app.api import bp_user
from flask import request, jsonify
from flasgger import swag_from
import app.messages.api_responses.user_responses as response
import app.messages.api_errors as errors
from app.doc.user_swag import login_swagger
from app.services.user import login_service
@bp_user.route("/login", methods=["POST"])
@swag_from(login_swagger)
def login():
data = request.get_json()
if not data:
result, status_code = errors.NOT_JSON
return jsonify(result), status_code
required_fields = ["username", "password"]
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
result, status_code = errors.MISSING_FIELDS(missing_fields)
return jsonify(result), status_code
username = data["username"]
password = data["password"]
result, status_code = login_service.login(username, password)
return result, status_code

View File

@ -0,0 +1,20 @@
from app.api import bp_user
from flasgger import swag_from
from flask_jwt_extended import get_jwt_identity, jwt_required, get_jwt
from app.doc.user_swag import logout_swagger
from app.services.user import logout_service
@bp_user.route("/logout", methods=["DELETE"])
@swag_from(logout_swagger)
@jwt_required()
def logout():
jwt = get_jwt()
user_id = get_jwt_identity()
result, status_code = logout_service.logout(jwt, user_id, True)
return result, status_code

View File

@ -0,0 +1,39 @@
from app.api import bp_user
from flask import request, jsonify
from app.services.user import register_service
from app.doc.user_swag import register_swagger
import app.messages.api_responses.user_responses as response
import app.messages.api_errors as errors
from flasgger import swag_from
@bp_user.route("/register", methods=["POST"])
@swag_from(register_swagger)
def register():
data = request.get_json()
if not data:
result, status_code = errors.NOT_JSON
return jsonify(result), status_code
required_fields = ["username", "displayname", "email", "password"]
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
result, status_code = errors.MISSING_FIELDS(missing_fields)
return jsonify(result), status_code
username = data["username"]
displayname = data["displayname"]
email = data["email"]
password = data["password"]
result, status_code = register_service.register(
username, displayname, email, password
)
return jsonify(result), status_code

View File

@ -0,0 +1,40 @@
from app.api import bp_user
from flask import request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
from flasgger import swag_from
import app.messages.api_errors as errors
from app.doc.user_swag import update_swagger
from app.services.user import logout_service, update_user_service
@bp_user.route("/update", methods=["PUT"])
@swag_from(update_swagger)
@jwt_required()
def update_user():
data = request.get_json()
possible_fields = ["new_username", "new_displayname", "new_email", "new_password"]
selected_fields = [field for field in possible_fields if field in data]
if not selected_fields:
result, status_code = errors.NO_FIELD_PROVIDED(possible_fields)
return jsonify(result), status_code
user_id = get_jwt_identity()
new_username = data.get("new_username")
new_displayname = data.get("new_displayname")
new_email = data.get("new_email")
new_password = data.get("new_password")
result, status_code = update_user_service.update_user(user_id, new_username, new_displayname, new_email, new_password)
if status_code < 300:
jwt = get_jwt()
logout_service.logout(jwt, user_id, False)
return result, status_code

View File

@ -1,126 +0,0 @@
from app.api import bp_user
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
from flask import request, abort
from flasgger import swag_from
from app.doc.user_swag import login_swagger, logout_swagger, delete_swagger, register_swagger
from app.services.user_service import UserService
@bp_user.route('/register', methods=['POST'])
@swag_from(register_swagger)
def register():
username = request.json.get('username')
displayname = request.json.get('displayname')
email = request.json.get('email')
password = request.json.get('password')
if username is None or email is None or password is None or displayname is None:
return abort(400)
result, status_code = UserService.register(username, displayname, email, password)
return result, status_code
@bp_user.route('/login', methods=['POST'])
@swag_from(login_swagger)
def login():
username = request.json.get('username')
password = request.json.get('password')
if username is None or password is None:
return abort(400)
result, status_code = UserService.login(username, password)
return result, status_code
@bp_user.route('/logout', methods=['DELETE'])
@swag_from(logout_swagger)
@jwt_required()
def logout():
jwt = get_jwt()
user_id = get_jwt_identity()
result, status_code = UserService.logout(jwt, user_id)
return result, status_code
@bp_user.route('/update/username', methods=['PUT'])
@jwt_required()
def update_username():
user_id = get_jwt_identity()
new_username = request.json.get('new_username')
if new_username is None:
return abort(400)
result, status_code = UserService.update_username(user_id, new_username)
jwt = get_jwt()
UserService.logout(jwt, user_id)
return result, status_code
@bp_user.route('/update/displayname', methods=['PUT'])
@jwt_required()
def update_displayname():
user_id = get_jwt_identity()
new_displayname = request.json.get('new_displayname')
if new_displayname is None:
return abort(400)
result, status_code = UserService.update_username(user_id, new_displayname)
jwt = get_jwt()
UserService.logout(jwt, user_id)
return result, status_code
@bp_user.route('/update/email', methods=['PUT'])
@jwt_required()
def update_email():
username = get_jwt_identity()
new_mail = request.json.get('new_email')
if new_mail is None:
return abort(400)
result, status_code = UserService.update_email(username, new_mail)
jwt = get_jwt()
UserService.logout(jwt, username)
return result, status_code
@bp_user.route('/update/password', methods=['PUT'])
@jwt_required()
def update_password():
username = get_jwt_identity()
new_password = request.json.get('new_password')
if new_password is None:
return abort(400)
result, status_code = UserService.update_password(username, new_password)
jwt = get_jwt()
UserService.logout(jwt, username)
return result, status_code
@bp_user.route('/delete', methods=['DELETE'])
@swag_from(delete_swagger)
@jwt_required()
def delete_user():
user_id = get_jwt_identity()
result, status_code = UserService.delete_user(user_id)
jwt = get_jwt()
UserService.logout(jwt, user_id)
return result, status_code

View File

@ -1,30 +1,42 @@
import os
class MySqlConfig:
MYSQL_USER = os.environ.get('MYSQL_USER')
MYSQL_DATABASE = os.environ.get('MYSQL_DATABASE')
MYSQL_HOST = os.environ.get('MYSQL_HOST')
MYSQL_PORT = os.environ.get('MYSQL_PORT')
MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD')
MYSQL_USER = os.environ.get("MYSQL_USER")
MYSQL_DATABASE = os.environ.get("MYSQL_DATABASE")
MYSQL_HOST = os.environ.get("MYSQL_HOST")
MYSQL_PORT = os.environ.get("MYSQL_PORT")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD")
class RedisConfig:
REDIS_HOST = os.environ.get('REDIS_HOST')
REDIS_PORT = os.environ.get('REDIS_PORT')
REDIS_PASSWORD = os.environ.get('REDIS_PASSWORD')
REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT")
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")
class FlaskProduction:
DEBUG = False
JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY')
SERVER_NAME = os.environ.get('HOST') + ':' + os.environ.get('PORT')
DEBUG = False
JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY")
SERVER_NAME = os.environ.get("HOST") + ":" + os.environ.get("PORT")
MAIL_SERVER = os.environ.get("MAIL_SERVER")
MAIL_PORT = os.environ.get("MAIL_PORT")
MAIL_USERNAME = os.environ.get("MAIL_USERNAME")
MAIL_PASSWORD = os.environ.get("MAIL_PASSWORD")
MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS")
MAIL_DEFAULT_SENDER = os.environ.get("MAIL_DEFAULT_SENDER")
class FlaskTesting:
DEBUG = True
JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY')
SERVER_NAME = os.environ.get('HOST') + ':' + os.environ.get('PORT')
DEBUG = True
TESTING = True
JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY")
SERVER_NAME = os.environ.get("HOST") + ":" + os.environ.get("PORT")
MAIL_SERVER = os.environ.get('MAIL_SERVER')
MAIL_PORT = os.environ.get('MAIL_PORT')
MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS')
MAIL_DEFAULT_SENDER = os.environ.get('MAIL_DEFAULT_SENDER')
MAIL_SERVER = os.environ.get("MAIL_SERVER")
MAIL_PORT = os.environ.get("MAIL_PORT")
MAIL_USERNAME = os.environ.get("MAIL_USERNAME")
MAIL_PASSWORD = os.environ.get("MAIL_PASSWORD")
MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS")
MAIL_DEFAULT_SENDER = os.environ.get("MAIL_DEFAULT_SENDER")

0
app/db/cart_db.py Normal file
View File

118
app/db/product_db.py Normal file
View File

@ -0,0 +1,118 @@
from typing import Optional
from app.extensions import db_connection
from app.models.product_model import Product
def fetch_products(page: int = 0) -> Optional[list[Product]]:
cursor = db_connection.cursor(dictionary=True)
offset = 10 * page
cursor.execute(
"select product.id, user.displayname as seller, product.name, product.price_pc from product inner join user on user.id = product.seller_id order by product.id limit 10 offset %s",
(offset,),
)
results = cursor.fetchall()
if len(results) < 1:
return None
result_products: list[Product] = []
for row in results:
result_products.append(
Product(
product_id=row["id"],
seller_id=row["seller_id"],
name=row["name"],
price=row["price"],
creation_date=row["creation_date"],
)
)
return result_products
def fetch_product_by_id(product_id: int) -> Optional[Product]:
"""
Fetches specific product info
:param product_id: ID of product to be updated.
:type product_id: int
"""
cursor = db_connection.cursor(dictionary=True)
cursor.execute("select * from product where id = %s", (product_id,))
result = cursor.fetchone()
if cursor.rowcount != 1:
return None
result_product = Product(
product_id=result["id"],
seller_id=result["seller_id"],
name=result["name"],
price=result["price"],
creation_date=result["creation_date"],
)
return result_product
def fetch_product_extended_by_id(product_id: int) -> Optional[Product]:
"""
Fetches specific product info including the seller n
:param product_id: ID of product to be updated.
:type product_id: int
"""
cursor = db_connection.cursor(dictionary=True)
cursor.execute("select * from product inner join user on user.id = product.seller_id where product.id = %s", (product_id,))
result = cursor.fetchone()
if cursor.rowcount != 1:
return None
result_product = Product(
product_id=result["id"],
seller_id=result["seller_id"],
seller_name=result["displayname"],
name=result["name"],
price=result["price"],
creation_date=result["creation_date"],
)
return result_product
def insert_product(product: Product):
"""
Creates a new product listing
:param seller_id: User ID
:type seller_id: str
:param name: New product's name
:type name: str
:param price: New product's price
:type price: float
"""
cursor = db_connection.cursor()
cursor.execute(
"insert into product(seller_id, name, price_pc) values (%s, %s, %s)",
(product.seller_id, product.name, round(product.price, 2)),
)
db_connection.commit()
def delete_product(product: Product):
cursor = db_connection.cursor()
cursor.execute(
"delete from product where id = %s",
(product.product_id,),
)
db_connection.commit()

79
app/db/user_db.py Normal file
View File

@ -0,0 +1,79 @@
from typing import Optional
from app.extensions import db_connection
from app.models.user_model import User
def fetch_by_username(username: str) -> Optional[User]:
cursor = db_connection.cursor(dictionary=True)
cursor.execute("select * from user where username = %s", (username,))
result = cursor.fetchone()
result_user = (
User(
user_id=result["id"],
username=result["username"],
displayname=result["displayname"],
email=result["email"],
password=result["password"],
role_id=result["role_id"],
creation_date=result["creation_date"],
)
if result
else None
)
return result_user
def fetch_by_id(user_id: int) -> Optional[User]:
cursor = db_connection.cursor(dictionary=True)
cursor.execute("select * from user where id = %s", (user_id,))
result = cursor.fetchone()
result_user = (
User(
user_id=result["id"],
username=result["username"],
displayname=result["displayname"],
email=result["email"],
password=result["password"],
role_id=result["role_id"],
creation_date=result["creation_date"],
)
if result
else None
)
return result_user
def insert_user(new_user: User):
cursor = db_connection.cursor(dictionary=True)
cursor.execute(
"insert into user (username, displayname, email, password) values (%s, %s, %s, %s)",
(new_user.username, new_user.displayname, new_user.email, new_user.password),
)
db_connection.commit()
def delete_user(user: User):
cursor = db_connection.cursor(dictionary=True)
cursor.execute("delete from user where id = %s", (user.user_id,))
db_connection.commit()
def update_user(user: User):
cursor = db_connection.cursor(dictionary=True)
cursor.execute(
"update user set username=%s, displayname=%s, email=%s, password=%s where id = %s",
(user.username, user.displayname, user.email, user.password, user.user_id),
)
db_connection.commit()

View File

@ -1,6 +1,6 @@
main_swagger = {
"info": {
"title": "Shop API",
"title": "Swag Shop",
"version": "0.1",
"description": "Simple shop API using flask and co.\nFeatures include:\n- Not working\n- Successful registration of users\n- Adding items to cart\n- I don't know",
},

View File

@ -1,101 +1,116 @@
register_swagger = {
"methods": ["POST"],
"tags": ["User"],
"description": "Registers a new user in the app. Also sends a notification to the user via the provided email",
"parameters":
[
{
"in": "body",
"name": "body",
"description": "Username, displayname and password of the new user\n- Username can be only lowercase and up to 64 characters\n- Displayname can contain special characters (. _ -) and lower and upper characters\n- Password must be at least 8 characters long, contain both lower and upper characters, numbers and special characters\n- Email has to be in format \"name@domain.tld\" and up to 64 characters long in total",
"required": True,
"schema":
{
"type": "object",
"properties":
{
"username": {"type": "string", "example": "mycoolusername"},
"email": {"type": "string", "example": "mymail@dot.com"},
"displayname": {"type": "string", "example": "MyCoolDisplayName"},
"password": {"type": "string", "example": "My5tr0ngP@55w0rd"}
}
}
}
],
"methods": ["POST"],
"tags": ["User"],
"description": "Registers a new user in the app. Also sends a notification to the user via the provided email",
"parameters": [
{
"in": "body",
"name": "body",
"description": 'Username, displayname and password of the new user\n- Username can be only lowercase and up to 64 characters\n- Displayname can contain special characters (. _ -) and lower and upper characters\n- Password must be at least 8 characters long, contain both lower and upper characters, numbers and special characters\n- Email has to be in format "name@domain.tld" and up to 64 characters long in total',
"required": True,
"schema": {
"type": "object",
"properties": {
"username": {"type": "string", "example": "mycoolusername"},
"email": {"type": "string", "example": "mymail@dot.com"},
"displayname": {"type": "string", "example": "MyCoolDisplayName"},
"password": {"type": "string", "example": "My5tr0ngP@55w0rd"},
},
},
}
],
}
login_swagger = {
"methods": ["POST"],
"tags": ["User"],
"description": "Logs in using username and password and returns a JWT token for further authorization of requests.\n**The token is valid for 1 hour**",
"parameters":
[
{
"in": "body",
"name": "body",
"description": "Username and password payload",
"required": True,
"schema":
{
"type": "object",
"properties":
{
"username": {"type": "string", "example": "mycoolusername"},
"password": {"type": "string", "example": "MyStrongPassword123"}
}
}
}
],
"responses":
{
"200":
{
"description": "Returns a fresh token",
"schema":
{
"type": "object",
"properties": {
"token": {"type": "string", "example": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTcxMDMyMjkyOCwianRpIjoiZDFhYzQxZDktZjA4NC00MmYzLThlMWUtZWFmZjJiNGU1MDAyIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6MjMwMDEsIm5iZiI6MTcxMDMyMjkyOCwiZXhwIjoxNzEwMzI2NTI4fQ.SW7LAi1j5vDOEIvzeN-sy0eHPP9PFJFkXYY029O35w0"}
}
}
},
"400":
{
"description": "Possible causes:\n- Missing username or password from request.\n- Nonexistent username"
},
"401":
{
"description": "Password is incorrect"
}
}
"methods": ["POST"],
"tags": ["User"],
"description": "Logs in using username and password and returns a JWT token for further authorization of requests.\n**The token is valid for 1 hour**",
"parameters": [
{
"in": "body",
"name": "body",
"description": "Username and password payload",
"required": True,
"schema": {
"type": "object",
"properties": {
"username": {"type": "string", "example": "mycoolusername"},
"password": {"type": "string", "example": "MyStrongPassword123"},
},
},
}
],
"responses": {
"200": {
"description": "Returns a fresh token",
"schema": {
"type": "object",
"properties": {
"token": {
"type": "string",
"example": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTcxMDMyMjkyOCwianRpIjoiZDFhYzQxZDktZjA4NC00MmYzLThlMWUtZWFmZjJiNGU1MDAyIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6MjMwMDEsIm5iZiI6MTcxMDMyMjkyOCwiZXhwIjoxNzEwMzI2NTI4fQ.SW7LAi1j5vDOEIvzeN-sy0eHPP9PFJFkXYY029O35w0",
}
},
},
},
"400": {
"description": "Possible causes:\n- Missing username or password from request.\n- Nonexistent username"
},
"401": {"description": "Password is incorrect"},
},
}
logout_swagger = {
"methods": ["DELETE"],
"tags": ["User"],
"security": [{"JWT": []}],
"description": "Logs out the user via provided JWT token",
"parameters": [],
"responses":
{
"200":
{
"description": "User successfully logged out"
}
}
"methods": ["DELETE"],
"tags": ["User"],
"security": [{"JWT": []}],
"description": "Logs out the user via provided JWT token",
"parameters": [],
"responses": {"200": {"description": "User successfully logged out"}},
}
update_swagger = {
"methods": ["PUT"],
"tags": ["User"],
"security": [{"JWT": []}],
"description": "Updates user attributes.",
"parameters": [
{
"in": "body",
"name": "body",
"description": "Attributes to update for the user.",
"required": True,
"schema": {
"type": "object",
"properties": {
"new_username": {"type": "string", "example": "mycoolusername"},
"new_email": {"type": "string", "example": "mymail@dot.com"},
"new_displayname": {
"type": "string",
"example": "MyCoolDisplayName",
},
"new_password": {"type": "string", "example": "My5tr0ngP@55w0rd"},
},
},
}
],
"responses": {
"200": {"description": "User attributes updated successfully."},
"400": {"description": "Bad request. Check the request body for errors."},
"401": {"description": "Unauthorized. User must be logged in."},
"409": {"description": "Conflict. Check the response message for details."},
"500": {
"description": "Internal server error. Contact the system administrator."
},
},
}
delete_swagger = {
"methods": ["DELETE"],
"tags": ["User"],
"security": [{"JWT": []}],
"description": "Deletes a user via JWT token",
"parameters": [],
"responses":
{
"200":
{
"description": "User successfully deleted"
}
}
}
"methods": ["DELETE"],
"tags": ["User"],
"security": [{"JWT": []}],
"description": "Deletes a user via JWT token",
"parameters": [],
"responses": {"200": {"description": "User successfully deleted"}},
}

View File

@ -6,16 +6,16 @@ from app.config import RedisConfig
from app.config import MySqlConfig
db_connection = mysql.connector.connect(
host=MySqlConfig.MYSQL_HOST,
user=MySqlConfig.MYSQL_USER,
password=MySqlConfig.MYSQL_PASSWORD,
database=MySqlConfig.MYSQL_DATABASE,
host=MySqlConfig.MYSQL_HOST,
user=MySqlConfig.MYSQL_USER,
password=MySqlConfig.MYSQL_PASSWORD,
database=MySqlConfig.MYSQL_DATABASE,
)
jwt_redis_blocklist = redis.StrictRedis(
host=RedisConfig.REDIS_HOST,
port=RedisConfig.REDIS_PORT,
password=RedisConfig.REDIS_PASSWORD,
db=0,
decode_responses=True
host=RedisConfig.REDIS_HOST,
port=RedisConfig.REDIS_PORT,
password=RedisConfig.REDIS_PASSWORD,
db=0,
decode_responses=True,
)

View File

@ -4,9 +4,10 @@ from . import jwt_manager
from app import app
@jwt_manager.token_in_blocklist_loader
def check_if_token_is_revoked(jwt_header, jwt_payload: dict) -> bool:
jti = jwt_payload["jti"]
token_in_redis = jwt_redis_blocklist.get(jti)
jti = jwt_payload["jti"]
token_in_redis = jwt_redis_blocklist.get(jti)
return token_in_redis is not None
return token_in_redis is not None

View File

@ -2,19 +2,16 @@ from flask_mail import Message
from app import flask_mail
from app.mail.messages import messages
from app.mail.message_content import MessageContent
def send_mail(message: str, recipient: str):
def send_mail(message: MessageContent, recipient: str):
body = messages[message]["body"]
subject = messages[message]["subject"]
msg = Message(subject=message.subject, recipients=[recipient], body=message.body)
msg = Message(subject, recipients=[recipient], body=body)
try:
flask_mail.send(msg)
return True
except Exception as e:
print(f"Failed to send email. Error: {e}")
return False
try:
flask_mail.send(msg)
return True
except Exception as e:
print(f"Failed to send email. Error: {e}")
return False

View File

@ -0,0 +1,4 @@
class MessageContent:
def __init__(self, subject, body):
self.subject = subject
self.body = body

View File

@ -1,24 +0,0 @@
import datetime
messages = {
"register": {
"subject": "Successfully registered!",
"body": "Congratulations! Your account has been successfully created.\nThis mail also serves as a test that the email address is correct"
},
"login": {
"subject": "New Login detected!",
"body": "A new login token has been created"
},
"logout": {
"subject": "Successfully logged out",
"body": "A login has been revoked. No further action is needed."
},
"update": {
"subject": "Account updated",
"body": "Your account has been successfully updated. This also means you have been logged out of everywhere"
},
"delete": {
"subject": "Account Deleted!",
"body": "Your account has been deleted. No further action needed"
}
}

View File

@ -0,0 +1,15 @@
NOT_JSON = {"msg": "Request body must be JSON"}, 400
def UNKNOWN_DATABASE_ERROR(e):
return {"msg": f"An unknown error occurred within the database. {e}"}, 500
def MISSING_FIELDS(fields):
return {"msg": f"Missing required fields: {', '.join(fields)}"}, 400
def NO_FIELD_PROVIDED(possible_fields):
return {
"msg": f"No field was provided. At least one of the following is required: {', '.join(possible_fields)}"
}, 400

View File

@ -0,0 +1,6 @@
PRODUCT_LISTING_CREATED_SUCCESSFULLY = {"msg": "Successfully created a brand new product."}, 201
NOT_OWNER_OF_PRODUCT = {"msg": "You don't own this product, therefore you cannot delete it!"}, 400
UNKNOWN_PRODUCT = {"msg": "The product you tried fetching is not known. Try a different product ID."}, 400
SCROLLED_TOO_FAR = {"msg": "You scrolled too far in the pages. Try going back a little again."}, 400

View File

@ -0,0 +1,26 @@
USER_CREATED_SUCCESSFULLY = {"msg": "User created successfully."}, 201
USER_LOGGED_OUT_SUCCESSFULLY = {"msg": "Successfully logged out"}, 200
USER_DELETED_SUCCESSFULLY = {"msg": "User successfully deleted"}, 200
def USER_ACCOUNT_UPDATED_SUCCESSFULLY(updated_attributes):
return {"msg": f"Successfully updated your accounts {', '.join(updated_attributes)}"}, 200
INVALID_USERNAME_FORMAT = {
"msg": "Username is in incorrect format. It must be between 1 and 64 lowercase characters."
}, 400
INVALID_DISPLAYNAME_FORMAT = {
"msg": "Display name is in incorrect format. It must contain only letters, '.', '-', or '_' and be between 1 and 64 characters."
}, 400
INVALID_EMAIL_FORMAT = {"msg": "Email is in incorrect format."}, 400
INVALID_PASSWORD_FORMAT = {
"msg": "Password is in incorrect format. It must be between 8 and 64 characters and contain at least one uppercase letter, one lowercase letter, one digit, and one special character"
}, 400
EMAIL_ALREADY_IN_USE = {"msg": "Email already in use."}, 409
USERNAME_ALREADY_IN_USE = {"msg": "Username already in use."}, 409
USERNAME_NOT_FOUND = {"msg": "Username not found"}, 400
INCORRECT_PASSWORD = {"msg": "Incorrect password"}, 401
UNKNOWN_ERROR = {"msg": "An unknown error occurred with user"}, 500

View File

@ -0,0 +1,26 @@
from app.mail.message_content import MessageContent
USER_EMAIL_SUCCESSFULLY_REGISTERED = MessageContent(
subject="Successfully registered!",
body="Congratulations! Your account has been successfully created.\nThis mail also serves as a test that the email address is correct",
)
USER_EMAIL_SUCCESSFULLY_LOGGED_IN = MessageContent(
subject="New Login detected!",
body="A new login token has been created",
)
USER_EMAIL_SUCCESSFULLY_LOGGED_OUT = MessageContent(
subject="Successfully logged out",
body="A login has been revoked. No further action is needed.",
)
USER_EMAIL_SUCCESSFULLY_UPDATED_ACCOUNT = MessageContent(
subject="Account updated",
body="Your account has been successfully updated. This also means you have been logged out of everywhere",
)
USER_EMAIL_SUCCESSFULLY_DELETED_ACCOUNT = MessageContent(
subject="Account Deleted!",
body="Your account has been deleted. No further action needed",
)

63
app/models/cart_model.py Normal file
View File

@ -0,0 +1,63 @@
from datetime import datetime
class Cart:
"""
Represents a cart in the system.
:param id: The unique identifier of the cart.
:type id: int
:param price_total: The total price of the cart.
:type price_total: float
:param item_count: The count of items in the cart.
:type item_count: int
"""
def __init__(
self,
cart_id: int = None,
price_total: float = 0.00,
item_count: int = 0,
):
self.id = cart_id
self.price_total = price_total
self.item_count = item_count
def __repr__(self):
return f"Cart(id={self.id}, price_total={self.price_total}, item_count={self.item_count})"
class CartItem:
"""
Represents a cart item in the system.
:param id: The unique identifier of the cart item.
:type id: int
:param cart_id: The identifier of the cart.
:type cart_id: int
:param product_id: The identifier of the product.
:type product_id: int
:param count: The count of the product in the cart.
:type count: int
:param price_subtotal: The subtotal price of the product in the cart.
:type price_subtotal: float
:param date_added: The date and time when the item was added to the cart.
:type date_added: datetime
"""
def __init__(
self,
cart_item_id: int = None,
cart_id: int = None,
product_id: int = None,
count: int = 0,
price_subtotal: float = 0.00,
date_added: datetime = None,
):
self.id = cart_item_id
self.cart_id = cart_id
self.product_id = product_id
self.count = count
self.price_subtotal = price_subtotal
self.date_added = date_added or datetime.now()
def __repr__(self):
return f"CartItem(id={self.id}, cart_id={self.cart_id}, product_id={self.product_id}, count={self.count}, price_subtotal={self.price_subtotal}, date_added={self.date_added})"

View File

@ -0,0 +1,38 @@
from datetime import datetime
from decimal import Decimal
class Product:
"""
Represents a product in the system.
:param id: The unique identifier of the product.
:type id: int
:param seller_id: The user ID of the seller.
:type seller_id: int
:param name: The name of the product.
:type name: str
:param price: The price of the product.
:type price: Decimal
:param creation_date: The date and time when the product was created.
:type creation_date: datetime
"""
def __init__(
self,
product_id: int = None,
seller_id: int = None,
seller_name: str = None,
name: str = None,
price: Decimal = None,
creation_date: datetime = None,
):
self.product_id = product_id
self.seller_id = seller_id
self.seller_name = seller_name
self.name = name
self.price = price
self.creation_date = creation_date
def __repr__(self):
return f"Product(product_id={self.product_id}, seller_id={self.seller_id}, seller_name={self.seller_name}, name='{self.name}', price={self.price}, creation_date={self.creation_date!r})"

44
app/models/user_model.py Normal file
View File

@ -0,0 +1,44 @@
from datetime import datetime
class User:
"""
Represents a user in the system.
:param user_id: The unique identifier of the user.
:type user_id: int
:param username: The username of the user.
:type username: str
:param displayname: The display name of the user.
:type displayname: str
:param email: The email address of the user.
:type email: str
:param password: The hashed password of the user.
:type password: str
:param role_id: The role ID of the user. Defaults to 1.
:type role_id: int
:param creation_date: The date and time when the user was created.
:type creation_date: datetime
"""
def __init__(
self,
user_id: str = None,
username: str = None,
displayname: str = None,
email: str = None,
password: str = None,
role_id: int = 1,
creation_date: datetime = None,
):
self.user_id = user_id
self.username = username
self.displayname = displayname
self.email = email
self.password = password
self.role_id = role_id
self.creation_date = creation_date
def __repr__(self):
return f"User(id={self.user_id}, username={self.username}, displayname={self.displayname}, email={self.email}, password={self.password}, role_id={self.role_id}, creation_date={self.creation_date})"

View File

@ -3,180 +3,160 @@ from typing import Tuple, Union
from app.extensions import db_connection
class CartService:
@staticmethod
def add_to_cart(user_id: str, product_id: int, count: int) -> Tuple[Union[dict, str], int]:
"""
Adds a product to a user's cart.
@staticmethod
@staticmethod
def update_count(
user_id: str, product_id: int, count: int
) -> Tuple[Union[dict, str], int]:
"""
Updates count of products in user's cart
:param user_id: User ID.
:type user_id: str
:param product_id: ID of product to be added.
:type product_id: int
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
:param user_id: User ID.
:type user_id: str
:param product_id: ID of product to be updated.
:type product_id: int
:param count: New count of products
:type count: int
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute("select count from cart_item where cart_id = %s and product_id = %s", (user_id, product_id))
result = cursor.fetchone()
try:
if count <= 0:
return CartService.delete_from_cart(user_id, product_id)
if cursor.rowcount == 1:
cursor.execute("update cart_item set count = count + %s where cart_id = %s and product_id = %s", (count, user_id, product_id))
else:
cursor.execute("insert into cart_item(cart_id, product_id, count) values (%s, %s, %s)", (user_id, product_id, count))
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute(
"update cart_item set count = %s where cart_id = %s and product_id = %s",
(count, user_id, product_id),
)
db_connection.commit()
db_connection.commit()
return {"Success": "Successfully added to cart"}, 200
except Error as e:
return {"Failed": f"Failed to update item count in cart. Reason: {e}"}, 500
except Error as e:
return {"Failed": f"Failed to add item to cart. Reason: {e}"}, 500
@staticmethod
def delete_from_cart(user_id: str, product_id: int) -> Tuple[Union[dict, str], int]:
"""
Completely deletes an item from a user's cart
return {"Success": "Successfully added to cart"}, 200
:param user_id: User ID.
:type user_id: str
:param product_id: ID of product to be updated.
:type product_id: int
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
@staticmethod
def update_count(user_id: str, product_id: int, count: int) -> Tuple[Union[dict, str], int]:
"""
Updates count of products in user's cart
try:
with db_connection.cursor() as cursor:
cursor.execute(
"delete from cart_item where cart_id = %s and product_id = %s",
(user_id, product_id),
)
db_connection.commit()
:param user_id: User ID.
:type user_id: str
:param product_id: ID of product to be updated.
:type product_id: int
:param count: New count of products
:type count: int
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
return {"Success": "Successfully removed item from cart"}, 200
except Error as e:
return {"Failed": f"Failed to remove item from cart. Reason: {e}"}, 500
try:
if count <= 0:
return CartService.delete_from_cart(user_id, product_id)
@staticmethod
def show_cart(user_id: str) -> Tuple[Union[dict, str], int]:
"""
Gives the user the content of their cart
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute("update cart_item set count = %s where cart_id = %s and product_id = %s", (count, user_id, product_id))
db_connection.commit()
:param user_id: User ID.
:type user_id: str
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
return {"Success": "Successfully added to cart"}, 200
try:
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute(
"select product.name as product_name, count, price_subtotal, date_added from cart_item inner join product on cart_item.product_id = product.id where cart_item.cart_id = %s",
(user_id,),
)
rows = cursor.fetchall()
except Error as e:
return {"Failed": f"Failed to update item count in cart. Reason: {e}"}, 500
results = []
@staticmethod
def delete_from_cart(user_id: str, product_id: int) -> Tuple[Union[dict, str], int]:
"""
Completely deletes an item from a user's cart
for row in rows:
mid_result = {
"name": row["product_name"],
"count": row["count"],
"price_subtotal": row["price_subtotal"],
"date_added": row["date_added"],
}
:param user_id: User ID.
:type user_id: str
:param product_id: ID of product to be updated.
:type product_id: int
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
results.append(mid_result)
return results, 200
try:
with db_connection.cursor() as cursor:
cursor.execute("delete from cart_item where cart_id = %s and product_id = %s", (user_id, product_id))
db_connection.commit()
except Error as e:
return {"Failed": f"Failed to load cart. Reason: {e}"}, 500
return {"Success": "Successfully removed item from cart"}, 200
except Error as e:
return {"Failed": f"Failed to remove item from cart. Reason: {e}"}, 500
@staticmethod
def purchase(user_id: str) -> Tuple[Union[dict, str], int]:
"""
"Purchases" the contents of user's cart
:param user_id: User ID.
:type user_id: str
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
@staticmethod
def show_cart(user_id: str) -> Tuple[Union[dict, str], int]:
"""
Gives the user the content of their cart
:param user_id: User ID.
:type user_id: str
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
with db_connection.cursor(dictionary=True) as cursor:
# get all cart items
cursor.execute(
"select id, product_id, count, price_subtotal from cart_item where cart_id = %s",
(user_id,),
)
results = cursor.fetchall()
try:
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute("select product.name as product_name, count, price_subtotal, date_added from cart_item inner join product on cart_item.product_id = product.id where cart_item.cart_id = %s", (user_id,))
rows = cursor.fetchall()
if len(results) < 1:
return {"Failed": "Failed to purchase. Cart is Empty"}, 400
results = []
# create a purchase
cursor.execute("insert into purchase(user_id) values (%s)", (user_id,))
for row in rows:
mid_result = {
"name": row['product_name'],
"count": row['count'],
"price_subtotal": row['price_subtotal'],
"date_added": row['date_added']
}
last_id = cursor.lastrowid
results.append(mid_result)
parsed = []
ids = []
return results, 200
for row in results:
mid_row = (
last_id,
row["product_id"],
row["count"],
row["price_subtotal"],
)
except Error as e:
return {"Failed": f"Failed to load cart. Reason: {e}"}, 500
row_id = row["id"]
parsed.append(mid_row)
ids.append(row_id)
@staticmethod
def purchase(user_id: str) -> Tuple[Union[dict, str], int]:
"""
"Purchases" the contents of user's cart
insert_query = "INSERT INTO purchase_item (purchase_id, product_id, count, price_subtotal) VALUES (%s, %s, %s, %s)"
for row in parsed:
cursor.execute(insert_query, row)
:param user_id: User ID.
:type user_id: str
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
with db_connection.cursor(dictionary=True) as cursor:
# get all cart items
cursor.execute("select id, product_id, count, price_subtotal from cart_item where cart_id = %s", (user_id,))
results = cursor.fetchall()
delete_query = "delete from cart_item where id = %s"
for one_id in ids:
cursor.execute(delete_query, (one_id,))
if len(results) < 1:
return {"Failed": "Failed to purchase. Cart is Empty"}, 400
db_connection.commit()
# create a purchase
cursor.execute("insert into purchase(user_id) values (%s)", (user_id,))
# clear cart
except Error as e:
return {"msg": f"Failed to load cart. Reason: {e}"}, 500
last_id = cursor.lastrowid
parsed = []
ids = []
for row in results:
mid_row = (
last_id,
row['product_id'],
row['count'],
row['price_subtotal']
)
row_id = row['id']
parsed.append(mid_row)
ids.append(row_id)
insert_query = "INSERT INTO purchase_item (purchase_id, product_id, count, price_subtotal) VALUES (%s, %s, %s, %s)"
for row in parsed:
cursor.execute(insert_query, row)
delete_query = "delete from cart_item where id = %s"
for one_id in ids:
cursor.execute(delete_query, (one_id,))
db_connection.commit()
# clear cart
except Error as e:
return {"msg": f"Failed to load cart. Reason: {e}"}, 500
return {"msg": "Successfully purchased"}, 200
return {"msg": "Successfully purchased"}, 200

View File

@ -0,0 +1,30 @@
from mysql.connector import Error as mysqlError
from app.messages.api_responses import product_responses as response
import app.messages.api_errors as errors
from app.db import product_db
from app.models.product_model import Product
def create_product(seller_id: str, name: str, price: float):
"""
Creates a new product listing
:param seller_id: User ID
:type seller_id: str
:param name: New product's name
:type name: str
:param price: New product's price
:type price: float
"""
product: Product = Product(seller_id=seller_id, name=name, price=price)
try:
product_db.insert_product(product)
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)
return response.PRODUCT_LISTING_CREATED_SUCCESSFULLY

View File

@ -0,0 +1,23 @@
from mysql.connector import Error as mysqlError
from app.messages.api_responses import product_responses as response
import app.messages.api_errors as errors
from app.db import product_db
from app.models.product_model import Product
def delete_product(seller_id: str, product_id: str):
product: Product = product_db.fetch_product_by_id(product_id)
if product.seller_id != seller_id:
return response.NOT_OWNER_OF_PRODUCT
try:
product_db.delete_product(product)
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)
return response.PRODUCT_LISTING_CREATED_SUCCESSFULLY

View File

@ -0,0 +1,8 @@
import imghdr
def is_base64_jpg(decoded_string) -> bool:
try:
image_type = imghdr.what(None, decoded_string)
return image_type == "jpeg"
except Exception:
return False

View File

@ -0,0 +1,20 @@
from mysql.connector import Error as mysqlError
from app.messages.api_responses import product_responses as response
import app.messages.api_errors as errors
from app.db import product_db
from app.models.product_model import Product
def product_info(product_id: int):
try:
product: Product = product_db.fetch_product_extended_by_id(product_id)
if product is None:
return response.UNKNOWN_PRODUCT
return product, 200
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)

View File

@ -0,0 +1,29 @@
from mysql.connector import Error as mysqlError
from app.messages.api_responses import product_responses as response
import app.messages.api_errors as errors
from app.db import product_db
def product_list(page: int):
try:
result_products = product_db.fetch_products(page)
if result_products is None:
return response.SCROLLED_TOO_FAR
result_obj = []
for product in result_products:
mid_result = {
"id": product.product_id,
"seller": product.seller_id,
"name": product.name,
"price": product.price,
}
result_obj.append(mid_result)
return result_obj, 200
except mysqlError as e:
errors.UNKNOWN_DATABASE_ERROR(e)

View File

@ -1,127 +0,0 @@
import base64
from flask import abort
from mysql.connector import Error
from decimal import Decimal
from app.extensions import db_connection
class ProductService:
@staticmethod
def get_products(page: int):
"""
Fetches 10 products
:param page: Page, aka offset of fetching
:type page: int
"""
try:
with db_connection.cursor(dictionary=True) as cursor:
offset = 10 * page
cursor.execute("select product.id, user.displayname as seller, product.name, product.price_pc from product inner join user on user.id = product.seller_id order by product.id limit 10 offset %s", (offset,))
results = cursor.fetchall()
if len(results) < 1:
return {"msg": "Failed to fetch products. You've probably selected too far with pages"}, 400
result_obj = []
for row in results:
mid_result = {
"id": row['id'],
"seller": row['seller'],
"name": row['name'],
"price": row['price_pc']
}
result_obj.append(mid_result)
return result_obj, 200
except Error as e:
return {"msg": f"Failed to fetch products. Error: {e}"}, 500
@staticmethod
def get_product_info(fields: list[str], product_id: int):
"""
Fetches specific product info
:param fields: array of fields that can be fetched
:type fields: list[str]
:param product_id: ID of product to be updated.
:type product_id: int
"""
try:
with db_connection.cursor(dictionary=True) as cursor:
fields_to_sql = {
"name": "product.name",
"price": "product.price_pc as price",
"image": "product.image",
"image_name": "product.image_name",
"seller": "user.displayname as seller"
}
field_sql = []
for field in fields:
field_sql.append(fields_to_sql[field])
select_params = ", ".join(field_sql)
if "seller" in fields:
cursor.execute(f"select {select_params} from product inner join user on user.id = product.seller_id where product.id = %s", (product_id,))
else:
cursor.execute(f"select {select_params} from product where id = %s", (product_id,))
result = cursor.fetchone()
if cursor.rowcount != 1:
return {"msg": "Failed to fetch product. Product likely doesn't exist"}, 400
result_obj = {}
for field in fields:
if field == "image": # Encode image into base64
result_obj[field] = base64.b64encode(result[field]).decode('utf-8')
else:
result_obj[field] = result[field]
return result_obj, 200
except Error as e:
return {"msg": f"Failed to fetch product info. Error: {e}"}, 500
@staticmethod
def create_listing(seller_id: str, name: str, price: float):
"""
Creates a new product listing
:param seller_id: User ID
:type seller_id: str
:param name: New product's name
:type name: str
:param price: New product's price
:type price: float
"""
try:
with db_connection.cursor() as cursor:
rounded_price = round(price, 2)
cursor.execute("insert into product(seller_id, name, price_pc) values (%s, %s, %s)", (seller_id, name, Decimal(str(rounded_price))))
db_connection.commit()
except Error as e:
return {"msg": f"Failed to create product. {e}"}, 400
return {"msg": "Successfully created new product listing"}, 200
@staticmethod
def __is_base64_jpg(decoded_string):
try:
# Checking if the decoded data represents a valid JPEG image
image_type = imghdr.what(None, decoded_data)
return image_type == 'jpeg'
except Exception:
return False

View File

@ -0,0 +1,31 @@
from typing import Tuple, Union
from mysql.connector import Error as mysqlError
import app.db.user_db as user_db
from app.models.user_model import User
import app.messages.api_responses.user_responses as response
import app.messages.api_errors as errors
from app.mail.mail import send_mail
from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_DELETED_ACCOUNT
def delete_user(user_id: str) -> Tuple[Union[dict, str], int]:
"""
Deletes a user account.
:param user_id: User ID.
:type user_id: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
user: User = user_db.fetch_by_id(user_id=user_id)
user_db.delete_user(user)
send_mail(USER_EMAIL_SUCCESSFULLY_DELETED_ACCOUNT, user.email)
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)
return response.USER_DELETED_SUCCESSFULLY

View File

@ -0,0 +1,45 @@
import datetime
from typing import Tuple, Union
import bcrypt
from mysql.connector import Error as mysqlError
from flask_jwt_extended import create_access_token
import app.messages.api_responses.user_responses as response
import app.messages.api_errors as errors
from app.db import user_db
from app.mail.mail import send_mail
from app.models.user_model import User
from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_LOGGED_IN
def login(username: str, password: str) -> Tuple[Union[dict, str], int]:
"""
Authenticates a user with the provided username and password.
:param username: User's username.
:type username: str
:param password: User's password.
:type password: str
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
user: User = user_db.fetch_by_username(username)
if user is None:
return response.USERNAME_NOT_FOUND
if not bcrypt.checkpw(password.encode("utf-8"), user.password.encode("utf-8")):
return response.INCORRECT_PASSWORD
expire = datetime.timedelta(hours=1)
token = create_access_token(identity=user.user_id, expires_delta=expire)
send_mail(USER_EMAIL_SUCCESSFULLY_LOGGED_IN, user.email)
return {"token": token}, 200
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)

View File

@ -0,0 +1,31 @@
from typing import Tuple, Union
import app.messages.api_responses.user_responses as response
from app.db import user_db
from app.models.user_model import User
from app.mail.mail import send_mail
from app.services.user import user_helper as helper
from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_LOGGED_OUT
def logout(jwt_token, user_id, send_notif: bool) -> Tuple[Union[dict, str], int]:
"""
Logs out a user by invalidating the provided JWT.
:param jti: JWT ID.
:type jti: str
:param exp: JWT expiration timestamp.
:type exp: int
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
jti = jwt_token["jti"]
exp = jwt_token["exp"]
user: User = user_db.fetch_by_id(user_id)
helper.invalidate_token(jti, exp)
if send_notif:
send_mail(USER_EMAIL_SUCCESSFULLY_LOGGED_OUT, user.email)
return response.USER_LOGGED_OUT_SUCCESSFULLY

View File

@ -0,0 +1,64 @@
import bcrypt
from typing import Tuple, Union
from mysql.connector import Error as mysqlError
import app.messages.api_responses.user_responses as response
import app.messages.api_errors as errors
from app.db import user_db
from app.mail.mail import send_mail
from app.models.user_model import User
from app.services.user import user_helper as helper
from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_REGISTERED
def register(
username: str, displayname: str, email: str, password: str
) -> Tuple[Union[dict, str], int]:
"""
Registers a new user with the provided username, email, and password.
:param username: User's username.
:type username: str
:param email: User's email address.
:type email: str
:param password: User's password.
:type password: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
if not helper.verify_username(username):
return response.INVALID_USERNAME_FORMAT
if not helper.verify_displayname(displayname):
return response.INVALID_DISPLAYNAME_FORMAT
if not helper.verify_email(email):
return response.INVALID_EMAIL_FORMAT
if not helper.verify_password(password):
return response.INVALID_PASSWORD_FORMAT
hashed_password = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
new_user: User = User(
username=username,
displayname=displayname,
email=email,
password=hashed_password,
)
user_db.insert_user(new_user)
except mysqlError as e:
if "email" in e.msg:
return response.EMAIL_ALREADY_IN_USE
if "username" in e.msg:
return response.USERNAME_ALREADY_IN_USE
return errors.UNKNOWN_DATABASE_ERROR(e)
send_mail(USER_EMAIL_SUCCESSFULLY_REGISTERED, new_user.email)
return response.USER_CREATED_SUCCESSFULLY

View File

@ -0,0 +1,72 @@
import bcrypt
from typing import Tuple, Union
from mysql.connector import Error as mysqlError
from app.db import user_db
from app.mail.mail import send_mail
from app.models.user_model import User
from app.services.user import user_helper as helper
from app.messages.api_responses import user_responses as response
import app.messages.api_errors as errors
from app.messages.mail_responses.user_email import (
USER_EMAIL_SUCCESSFULLY_UPDATED_ACCOUNT,
)
def update_user(
user_id: str,
new_username: str = None,
new_displayname: str = None,
new_email: str = None,
new_password: str = None,
) -> Tuple[Union[dict, str], int]:
user: User = user_db.fetch_by_id(user_id)
updated_attributes = []
if user is None:
return response.UNKNOWN_ERROR
if new_username:
if not helper.verify_username(new_username):
return response.INVALID_USERNAME_FORMAT
user.username = new_username
updated_attributes.append("username")
if new_displayname:
if not helper.verify_displayname(new_displayname):
return response.INVALID_DISPLAYNAME_FORMAT
user.displayname = new_displayname
updated_attributes.append("displayname")
if new_email:
if not helper.verify_email(new_email):
return response.INVALID_EMAIL_FORMAT
user.email = new_email
updated_attributes.append("email")
if new_password:
if not helper.verify_password(new_password):
return response.INVALID_PASSWORD_FORMAT
hashed_password = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt())
user.password = hashed_password
updated_attributes.append("password")
try:
user_db.update_user(user)
except mysqlError as e:
if "username" in e.msg:
return response.USERNAME_ALREADY_IN_USE
if "email" in e.msg:
return response.EMAIL_ALREADY_IN_USE
return errors.UNKNOWN_DATABASE_ERROR(e)
send_mail(USER_EMAIL_SUCCESSFULLY_UPDATED_ACCOUNT, user.email)
return response.USER_ACCOUNT_UPDATED_SUCCESSFULLY(updated_attributes)

View File

@ -0,0 +1,74 @@
import re
from datetime import datetime
from app.extensions import jwt_redis_blocklist
def invalidate_token(jti: str, exp: int):
"""
Invalidates a JWT by adding its JTI to the Redis blocklist.
:param jti: JWT ID.
:type jti: str
:param exp: JWT expiration timestamp.
:type exp: int
"""
expiration = datetime.fromtimestamp(exp)
now = datetime.now()
delta = expiration - now
jwt_redis_blocklist.set(jti, "", ex=delta)
def verify_email(email: str) -> bool:
"""
Verifies a given email string against a regular expression.
:param email: Email string.
:type email: str
:return: Boolean indicating whether the email successfully passed the check.
:rtype: bool
"""
email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
return re.match(email_regex, email) and len(email) <= 64
def verify_displayname(displayname: str) -> bool:
"""
Verifies a given display name string against a regular expression.
:param displayname: Display name string.
:type displayname: str
:return: Boolean indicating whether the display name successfully passed the check.
:rtype: bool
"""
displayname_regex = r"^[a-zA-Z.-_]{1,64}$"
return re.match(displayname_regex, displayname)
def verify_username(username: str) -> bool:
"""
Verifies a given username string against a regular expression.
:param username: Username string.
:type username: str
:return: Boolean indicating whether the username successfully passed the check.
:rtype: bool
"""
username_regex = r"^[a-z]{1,64}$"
return re.match(username_regex, username)
def verify_password(password: str) -> bool:
"""
Verifies a given password string against a regular expression.
:param password: Password string.
:type password: str
:return: Boolean indicating whether the password successfully passed the check.
:rtype: bool
"""
password_regex = (
r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$"
)
return re.match(password_regex, password)

View File

@ -1,326 +0,0 @@
import bcrypt
import re
import jwt
import datetime
from typing import Tuple, Union
from mysql.connector import Error
from flask_jwt_extended import create_access_token
from app.extensions import db_connection
from app.extensions import jwt_redis_blocklist
from app.mail.mail import send_mail
class UserService:
"""
UserService class provides methods for user-related operations.
Methods:
- register(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]
- login(username: str, password: str) -> Tuple[Union[dict, str], int]
- logout(jti, exp) -> Tuple[Union[dict, str], int]
- update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]
- update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]
- update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]
"""
@staticmethod
def register(username: str, displayname: str, email: str, password: str) -> Tuple[Union[dict, str], int]:
"""
Registers a new user with the provided username, email, and password.
:param username: User's username.
:type username: str
:param email: User's email address.
:type email: str
:param password: User's password.
:type password: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
if not UserService.__verify_username(username):
return {"msg": "Failed to verify username. Try another username"}, 400
if not UserService.__verify_displayname(displayname):
return {"msg": "Failed to verify display name. Try another name"}, 400
if not UserService.__verify_email(email):
return {"msg": "Failed to verify email. Try another email"}, 400
if not UserService.__verify_password(password):
return {"msg": "Failed to verify password. Try another (stronger) password"}, 400
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
with db_connection.cursor() as cursor:
cursor.execute("insert into user (username, displayname, email, password) values (%s, %s, %s, %s)", (username, displayname, email, hashed_password))
db_connection.commit()
except Error as e:
print(f"Error: {e}")
return {"msg": "Failed to insert into database. Username or email are likely in use already"}, 500
UserService.__send_email("register", email=email)
return {"msg": "User created successfully"}, 200
@staticmethod
def login(username: str, password: str) -> Tuple[Union[dict, str], int]:
"""
Authenticates a user with the provided username and password.
:param username: User's username.
:type username: str
:param password: User's password.
:type password: str
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute("select id, email, password from user where username = %s", (username,))
result = cursor.fetchone()
user_id = result['id']
email = result['email']
password_hash = result['password']
if user_id is None:
return {"msg": "Username not found"}, 400
if not bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8')):
return {"msg": "Incorrect password"}, 401
expire = datetime.timedelta(hours=1)
token = create_access_token(identity=user_id, expires_delta=expire)
UserService.__send_email("login", email=email)
return {"token": token}, 200
except Error as e:
return {"msg": f"Failed to login. Error: {e}"}, 500
@staticmethod
def logout(jwt_token, user_id) -> Tuple[Union[dict, str], int]:
"""
Logs out a user by invalidating the provided JWT.
:param jti: JWT ID.
:type jti: str
:param exp: JWT expiration timestamp.
:type exp: int
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
jti = jwt['jti']
exp = jwt['exp']
UserService.__invalidate_token(jti, exp)
UserService.__send_email("logout", id=user_id)
return {"msg": "Successfully logged out"}, 200
@staticmethod
def delete_user(user_id: str) -> Tuple[Union[dict, str], int]:
"""
Deletes a user account.
:param user_id: User ID.
:type user_id: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
UserService.__send_email("delete", id=user_id)
try:
with db_connection.cursor() as cursor:
cursor.execute("delete from user where id = %s", (user_id,))
db_connection.commit()
except Error as e:
return {"msg": f"Failed to delete user. {e}"}, 500
return {"msg": "User successfully deleted"}, 200
@staticmethod
def update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]:
"""
Updates the email address for a user with the provided user ID.
:param user_id: User's ID.
:type user_id: str
:param new_email: New email address.
:type new_email: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
if not UserService.__verify_email(new_email):
return {"msg": "Failed to verify email. Try another email"}, 400
with db_connection.cursor() as cursor:
cursor.execute("update user set email = %s where id = %s", (new_email, user_id))
db_connection.commit()
except Error as e:
return {"msg": f"Failed to update email. Email is likely in use already. Error: {e}"}, 500
return {"msg": "Email successfully updated"}, 200
@staticmethod
def update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]:
"""
Updates the username for a user with the provided user ID.
:param user_id: User's ID.
:type user_id: str
:param new_username: New username.
:type new_username: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
if not UserService.__verify_name(new_username):
return {"msg": "Failed to verify username. Try another one"}, 400
with db_connection.cursor() as cursor:
cursor.execute("update user set username = %s where id = %s", (new_username, user_id))
db_connection.commit()
except Error as e:
return {"msg": f"Failed to update username. Username is likely in use already. Error: {e}"}, 500
return {"msg": "Username successfully updated"}, 200
@staticmethod
def update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]:
"""
Updates the password for a user with the provided user ID.
:param user_id: User's ID.
:type user_id: str
:param new_password: New password.
:type new_password: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
if not UserService.__verify_password(new_password):
return {"msg": "Failed to verify password. Try another (stronger) one"}, 400
hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
with db_connection.cursor() as cursor:
cursor.execute("update user set password = %s where id = %s", (new_username, user_id))
db_connection.commit()
except Error as e:
return {"msg": f"Failed to update password. Error: {e}"}, 500
return {"msg": "Password successfully updated"}, 200
@staticmethod
def __send_email(message: str, username: str = None, id: str = None, email: str = None):
if email is not None:
send_mail(message, email)
return
if username is not None:
try:
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute("select email from user where username = %s", (username,))
result = cursor.fetchone()
email = result['email']
send_mail(message, email)
except Error as e:
return {"msg": f"Failed to fetch some data. Error: {e}"}, 500
return
if id is not None:
try:
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute("select email from user where id = %s", (id,))
result = cursor.fetchone()
email = result['email']
send_mail(message, email)
except Error as e:
return {"msg": f"Failed to fetch some data. Error: {e}"}, 500
return
raise ValueError("Invalid input data to send mail")
@staticmethod
def __invalidate_token(jti: str, exp: int):
"""
Invalidates a JWT by adding its JTI to the Redis blocklist.
:param jti: JWT ID.
:type jti: str
:param exp: JWT expiration timestamp.
:type exp: int
"""
expiration = datetime.datetime.fromtimestamp(exp)
now = datetime.datetime.now()
delta = expiration - now
jwt_redis_blocklist.set(jti, "", ex=delta)
@staticmethod
def __verify_email(email: str) -> bool:
"""
Verifies a given email string against a regular expression.
:param email: Email string.
:type email: str
:return: Boolean indicating whether the email successfully passed the check.
:rtype: bool
"""
email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
return re.match(email_regex, email) and len(email) <= 64
@staticmethod
def __verify_displayname(displayname: str) -> bool:
"""
Verifies a given display name string against a regular expression.
:param displayname: Display name string.
:type displayname: str
:return: Boolean indicating whether the display name successfully passed the check.
:rtype: bool
"""
displayname_regex = r"^[a-zA-Z.-_]{1,64}$"
return re.match(displayname_regex, displayname)
@staticmethod
def __verify_username(username: str) -> bool:
"""
Verifies a given username string against a regular expression.
:param username: Username string.
:type username: str
:return: Boolean indicating whether the username successfully passed the check.
:rtype: bool
"""
username_regex = r"^[a-z]{1,64}$"
return re.match(username_regex, username)
@staticmethod
def __verify_password(password: str) -> bool:
"""
Verifies a given password string against a regular expression.
:param password: Password string.
:type password: str
:return: Boolean indicating whether the password successfully passed the check.
:rtype: bool
"""
password_regex = r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$"
return re.match(password_regex, password)

View File

@ -1,6 +1,4 @@
from dotenv import load_dotenv
from flask_jwt_extended import JWTManager
import os
from app import create_app
@ -9,5 +7,5 @@ load_dotenv()
app = create_app()
if __name__ == "__main__":
print("Hello, Flask")
app.run(use_reloader=False)
print("Hello, Flask")
app.run(use_reloader=False)