Compare commits

..

3 Commits

Author SHA1 Message Date
c018d2fb52 Massive rewrite of user and product logic. Still WIP 2024-06-02 22:41:14 +02:00
be7b8064d6 Update .vscode for convenience 2024-06-02 16:47:18 +02:00
80dd89212f Unify json naming 2024-05-06 00:52:03 +02:00
50 changed files with 1477 additions and 994 deletions

View File

@ -10,5 +10,10 @@
"lastrowid", "lastrowid",
"rtype", "rtype",
"flasgger" "flasgger"
] ],
"files.exclude": {
"**/__pycache__/**": true,
},
"editor.tabSize": 4,
"editor.insertSpaces": true,
} }

View File

@ -7,14 +7,17 @@ from app.doc.main_swag import main_swagger
app = Flask(__name__) app = Flask(__name__)
from app.config import FlaskTesting, FlaskProduction from app.config import FlaskTesting, FlaskProduction
app.config.from_object(FlaskTesting) app.config.from_object(FlaskTesting)
flask_mail = Mail(app) flask_mail = Mail(app)
jwt_manager = JWTManager(app) jwt_manager = JWTManager(app)
swag = Swagger(app, template=main_swagger) swag = Swagger(app, template=main_swagger)
def create_app(): def create_app():
from app.api import bp, bp_errors, bp_product, bp_user, bp_cart from app.api import bp, bp_errors, bp_product, bp_user, bp_cart
app.register_blueprint(bp) app.register_blueprint(bp)
app.register_blueprint(bp_errors) app.register_blueprint(bp_errors)
app.register_blueprint(bp_product) app.register_blueprint(bp_product)

View File

@ -1 +1,15 @@
from app.api.routes import main_routes, error_routes, product_routes, user_routes, cart_routes from app.api.routes.user import (
register_route,
login_route,
logout_route,
update_route,
delete_route,
)
from app.api.routes.product import (
product_create_route,
product_delete_route,
product_info_route,
product_page_route,
)
from app.api.routes import main_routes, error_routes, cart_routes

View File

@ -1,7 +1,13 @@
from flask import jsonify, abort, request from flask import jsonify, abort, request
from flask_jwt_extended import jwt_required, get_jwt_identity from flask_jwt_extended import jwt_required, get_jwt_identity
from app.doc.cart_swag import show_cart_swagger, add_to_cart_swagger, remove_from_cart_swagger, update_count_in_cart_swagger, purchase_swagger from app.doc.cart_swag import (
show_cart_swagger,
add_to_cart_swagger,
remove_from_cart_swagger,
update_count_in_cart_swagger,
purchase_swagger,
)
from flasgger import swag_from from flasgger import swag_from
@ -9,7 +15,8 @@ from app.api import bp_cart
from app.services.cart_service import CartService from app.services.cart_service import CartService
@bp_cart.route('', methods=['GET'])
@bp_cart.route("", methods=["GET"])
@jwt_required() @jwt_required()
@swag_from(show_cart_swagger) @swag_from(show_cart_swagger)
def show_cart(): def show_cart():
@ -19,12 +26,13 @@ def show_cart():
return result, status_code return result, status_code
@bp_cart.route('/add/<int:product_id>', methods=['PUT'])
@bp_cart.route("/add/<int:product_id>", methods=["PUT"])
@jwt_required() @jwt_required()
@swag_from(add_to_cart_swagger) @swag_from(add_to_cart_swagger)
def add_to_cart(product_id: int): def add_to_cart(product_id: int):
user_id = get_jwt_identity() user_id = get_jwt_identity()
count = request.args.get('count', default=1, type=int) count = request.args.get("count", default=1, type=int)
if count < 1: if count < 1:
return abort(400) return abort(400)
@ -33,7 +41,8 @@ def add_to_cart(product_id: int):
return result, status_code return result, status_code
@bp_cart.route('/remove/<int:product_id>', methods=['DELETE'])
@bp_cart.route("/remove/<int:product_id>", methods=["DELETE"])
@jwt_required() @jwt_required()
@swag_from(remove_from_cart_swagger) @swag_from(remove_from_cart_swagger)
def remove_from_cart(product_id: int): def remove_from_cart(product_id: int):
@ -43,12 +52,13 @@ def remove_from_cart(product_id: int):
return result, status_code return result, status_code
@bp_cart.route('/update/<int:product_id>', methods=['PUT'])
@bp_cart.route("/update/<int:product_id>", methods=["PUT"])
@jwt_required() @jwt_required()
@swag_from(update_count_in_cart_swagger) @swag_from(update_count_in_cart_swagger)
def update_count_in_cart(product_id: int): def update_count_in_cart(product_id: int):
user_id = get_jwt_identity() user_id = get_jwt_identity()
count = request.args.get('count', type=int) count = request.args.get("count", type=int)
if not count: if not count:
return abort(400) return abort(400)
@ -57,7 +67,8 @@ def update_count_in_cart(product_id: int):
return result, status_code return result, status_code
@bp_cart.route('/purchase', methods=['GET'])
@bp_cart.route("/purchase", methods=["GET"])
@jwt_required() @jwt_required()
@swag_from(purchase_swagger) @swag_from(purchase_swagger)
def purchase(): def purchase():

View File

@ -1,29 +1,38 @@
from app.api import bp_errors from app.api import bp_errors
@bp_errors.app_errorhandler(400) @bp_errors.app_errorhandler(400)
def bad_request(e): def bad_request(e):
return {"Bad Request": "The request was incorrectly formatted, or contained invalid data"}, 400 return {
"msg": "The request was incorrectly formatted, or contained invalid data"
}, 400
@bp_errors.app_errorhandler(401) @bp_errors.app_errorhandler(401)
def unauthorized(e): def unauthorized(e):
return {"Unauthorized": "Failed to authorize the request"}, 401 return {"msg": "Failed to authorize the request"}, 401
@bp_errors.app_errorhandler(403) @bp_errors.app_errorhandler(403)
def forbidden(e): def forbidden(e):
return {"Forbidden": "You shall not pass"}, 403 return {"msg": "You shall not pass"}, 403
@bp_errors.app_errorhandler(404) @bp_errors.app_errorhandler(404)
def not_found(e): def not_found(e):
return {"Not Found": "The requested resource was not found"}, 404 return {"msg": "The requested resource was not found"}, 404
@bp_errors.app_errorhandler(405) @bp_errors.app_errorhandler(405)
def method_not_allowed(e): def method_not_allowed(e):
return {"Method Not Allowed": "The method used is not allowed in current context"}, 405 return {"msg": "The method used is not allowed in current context"}, 405
@bp_errors.app_errorhandler(500) @bp_errors.app_errorhandler(500)
def internal_error(e): def internal_error(e):
return {"Internal Server Error": "An error occurred on he server"}, 500 return {"msg": "An error occurred on he server"}, 500
@bp_errors.app_errorhandler(501) @bp_errors.app_errorhandler(501)
def internal_error(e): def unimplemented_error(e):
return {"Not Implemented": "This function has not been implemented yet. Check back soon!"}, 501 return {"msg": "This function has not been implemented yet. Check back soon!"}, 501

View File

@ -1,11 +1,12 @@
from flask import jsonify, abort from flask import jsonify
from flasgger import swag_from from flasgger import swag_from
from app.doc.root_swag import root_swagger from app.doc.root_swag import root_swagger
from app.api import bp from app.api import bp
@bp.route('/')
@bp.route("/")
@swag_from(root_swagger) @swag_from(root_swagger)
def hello(): def hello():
return jsonify({'message': 'Hello, Flask!'}) return jsonify({"message": "Hello, Flask!"})

View File

@ -0,0 +1,33 @@
from flask import jsonify, abort, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from app.doc.product_swag import create_product_swagger
from flasgger import swag_from
from app.api import bp_product
from app.services.product import product_create_service
@bp_product.route("/create", methods=["POST"])
@swag_from(create_product_swagger)
@jwt_required()
def create_product_listing():
user_id = get_jwt_identity()
name = request.json.get("name")
price = request.json.get("price")
if name is None or price is None:
return abort(400)
float_price = float(price)
if not isinstance(float_price, float):
return abort(400)
result, status_code = product_create_service.create_product(
user_id, name, float_price
)
return jsonify(result), status_code

View File

@ -0,0 +1,19 @@
from flask import jsonify, abort, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from flasgger import swag_from
from app.api import bp_product
from app.services.product import product_delete_service
@bp_product.route("/<int:product_id>/delete", methods=["DELETE"])
@jwt_required()
def delete_product(product_id: int):
user_id = get_jwt_identity()
result, status_code = product_delete_service.delete_product(user_id, product_id)
return jsonify(result), status_code

View File

@ -0,0 +1,25 @@
from flask import jsonify, request
from app.doc.product_swag import get_product_info_swagger
from flasgger import swag_from
from app.api import bp_product
from app.services.product import product_info_service
@bp_product.route("/<int:product_id>", methods=["GET"])
@swag_from(get_product_info_swagger)
def get_product_info(product_id: int):
fields = ["name", "price", "image", "image_name", "seller"]
fields_param = request.args.get("fields")
fields_param_list = fields_param.split(",") if fields_param else fields
common_fields = list(set(fields) & set(fields_param_list))
result, status_code = product_info_service.product_info(product_id)
return jsonify(result), status_code

View File

@ -0,0 +1,22 @@
from flask import jsonify, abort, request
from app.doc.product_swag import get_products_swagger
from flasgger import swag_from
from app.api import bp_product
from app.services.product import product_list_service
@bp_product.route("", methods=["GET"])
@swag_from(get_products_swagger)
def get_products():
page = request.args.get("page", default=0, type=int)
if page < 0:
return abort(400)
result, status_code = product_list_service.product_list(page)
return jsonify(result), status_code

View File

@ -1,59 +0,0 @@
from flask import jsonify, abort, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from app.doc.product_swag import get_products_swagger, get_product_info_swagger, create_product_swagger
from flasgger import swag_from
from app.api import bp_product
from app.services.product_service import ProductService
@bp_product.route('', methods=['GET'])
@swag_from(get_products_swagger)
def get_products():
page = request.args.get('page', default=0, type=int)
if page < 0:
return abort(400)
result, status_code = ProductService.get_products(page)
return result, status_code
@bp_product.route('/<int:id>', methods=['GET'])
@swag_from(get_product_info_swagger)
def get_product_info(id: int):
fields = ['name', 'price', 'image', 'image_name', 'seller']
fields_param = request.args.get('fields')
fields_param_list = fields_param.split(',') if fields_param else fields
common_fields = list(set(fields) & set(fields_param_list))
result, status_code = ProductService.get_product_info(common_fields, id)
return result, status_code
@bp_product.route('/create', methods=['POST'])
@swag_from(create_product_swagger)
@jwt_required()
def create_product_listing():
user_id = get_jwt_identity()
name = request.json.get('name')
price = request.json.get('price')
if name is None or price is None:
return abort(400)
float_price = float(price)
if not isinstance(float_price, float):
return abort(400)
result, status_code = ProductService.create_listing(user_id, name, float_price)
return result, status_code

View File

@ -0,0 +1,22 @@
from app.api import bp_user
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
from flask import request, abort
from flasgger import swag_from
from app.doc.user_swag import delete_swagger
from app.services.user import delete_service, logout_service
@bp_user.route("/delete", methods=["DELETE"])
@swag_from(delete_swagger)
@jwt_required()
def delete_user():
user_id = get_jwt_identity()
result, status_code = delete_service.delete_user(user_id)
jwt = get_jwt()
logout_service.logout(jwt, user_id, True)
return result, status_code

View File

@ -0,0 +1,33 @@
from app.api import bp_user
from flask import request, jsonify
from flasgger import swag_from
import app.messages.api_responses.user_responses as response
import app.messages.api_errors as errors
from app.doc.user_swag import login_swagger
from app.services.user import login_service
@bp_user.route("/login", methods=["POST"])
@swag_from(login_swagger)
def login():
data = request.get_json()
if not data:
result, status_code = errors.NOT_JSON
return jsonify(result), status_code
required_fields = ["username", "password"]
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
result, status_code = errors.MISSING_FIELDS(missing_fields)
return jsonify(result), status_code
username = data["username"]
password = data["password"]
result, status_code = login_service.login(username, password)
return result, status_code

View File

@ -0,0 +1,20 @@
from app.api import bp_user
from flasgger import swag_from
from flask_jwt_extended import get_jwt_identity, jwt_required, get_jwt
from app.doc.user_swag import logout_swagger
from app.services.user import logout_service
@bp_user.route("/logout", methods=["DELETE"])
@swag_from(logout_swagger)
@jwt_required()
def logout():
jwt = get_jwt()
user_id = get_jwt_identity()
result, status_code = logout_service.logout(jwt, user_id, True)
return result, status_code

View File

@ -0,0 +1,39 @@
from app.api import bp_user
from flask import request, jsonify
from app.services.user import register_service
from app.doc.user_swag import register_swagger
import app.messages.api_responses.user_responses as response
import app.messages.api_errors as errors
from flasgger import swag_from
@bp_user.route("/register", methods=["POST"])
@swag_from(register_swagger)
def register():
data = request.get_json()
if not data:
result, status_code = errors.NOT_JSON
return jsonify(result), status_code
required_fields = ["username", "displayname", "email", "password"]
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
result, status_code = errors.MISSING_FIELDS(missing_fields)
return jsonify(result), status_code
username = data["username"]
displayname = data["displayname"]
email = data["email"]
password = data["password"]
result, status_code = register_service.register(
username, displayname, email, password
)
return jsonify(result), status_code

View File

@ -0,0 +1,40 @@
from app.api import bp_user
from flask import request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
from flasgger import swag_from
import app.messages.api_errors as errors
from app.doc.user_swag import update_swagger
from app.services.user import logout_service, update_user_service
@bp_user.route("/update", methods=["PUT"])
@swag_from(update_swagger)
@jwt_required()
def update_user():
data = request.get_json()
possible_fields = ["new_username", "new_displayname", "new_email", "new_password"]
selected_fields = [field for field in possible_fields if field in data]
if not selected_fields:
result, status_code = errors.NO_FIELD_PROVIDED(possible_fields)
return jsonify(result), status_code
user_id = get_jwt_identity()
new_username = data.get("new_username")
new_displayname = data.get("new_displayname")
new_email = data.get("new_email")
new_password = data.get("new_password")
result, status_code = update_user_service.update_user(user_id, new_username, new_displayname, new_email, new_password)
if status_code < 300:
jwt = get_jwt()
logout_service.logout(jwt, user_id, False)
return result, status_code

View File

@ -1,126 +0,0 @@
from app.api import bp_user
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
from flask import request, abort
from flasgger import swag_from
from app.doc.user_swag import login_swagger, logout_swagger, delete_swagger, register_swagger
from app.services.user_service import UserService
@bp_user.route('/register', methods=['POST'])
@swag_from(register_swagger)
def register():
username = request.json.get('username')
displayname = request.json.get('displayname')
email = request.json.get('email')
password = request.json.get('password')
if username is None or email is None or password is None or displayname is None:
return abort(400)
result, status_code = UserService.register(username, displayname, email, password)
return result, status_code
@bp_user.route('/login', methods=['POST'])
@swag_from(login_swagger)
def login():
username = request.json.get('username')
password = request.json.get('password')
if username is None or password is None:
return abort(400)
result, status_code = UserService.login(username, password)
return result, status_code
@bp_user.route('/logout', methods=['DELETE'])
@swag_from(logout_swagger)
@jwt_required()
def logout():
jwt = get_jwt()
user_id = get_jwt_identity()
result, status_code = UserService.logout(jwt, user_id)
return result, status_code
@bp_user.route('/update/username', methods=['PUT'])
@jwt_required()
def update_username():
user_id = get_jwt_identity()
new_username = request.json.get('new_username')
if new_username is None:
return abort(400)
result, status_code = UserService.update_username(user_id, new_username)
jwt = get_jwt()
UserService.logout(jwt, user_id)
return result, status_code
@bp_user.route('/update/displayname', methods=['PUT'])
@jwt_required()
def update_displayname():
user_id = get_jwt_identity()
new_displayname = request.json.get('new_displayname')
if new_displayname is None:
return abort(400)
result, status_code = UserService.update_username(user_id, new_displayname)
jwt = get_jwt()
UserService.logout(jwt, user_id)
return result, status_code
@bp_user.route('/update/email', methods=['PUT'])
@jwt_required()
def update_email():
username = get_jwt_identity()
new_mail = request.json.get('new_email')
if new_mail is None:
return abort(400)
result, status_code = UserService.update_email(username, new_mail)
jwt = get_jwt()
UserService.logout(jwt, username)
return result, status_code
@bp_user.route('/update/password', methods=['PUT'])
@jwt_required()
def update_password():
username = get_jwt_identity()
new_password = request.json.get('new_password')
if new_password is None:
return abort(400)
result, status_code = UserService.update_password(username, new_password)
jwt = get_jwt()
UserService.logout(jwt, username)
return result, status_code
@bp_user.route('/delete', methods=['DELETE'])
@swag_from(delete_swagger)
@jwt_required()
def delete_user():
user_id = get_jwt_identity()
result, status_code = UserService.delete_user(user_id)
jwt = get_jwt()
UserService.logout(jwt, user_id)
return result, status_code

View File

@ -1,30 +1,42 @@
import os import os
class MySqlConfig: class MySqlConfig:
MYSQL_USER = os.environ.get('MYSQL_USER') MYSQL_USER = os.environ.get("MYSQL_USER")
MYSQL_DATABASE = os.environ.get('MYSQL_DATABASE') MYSQL_DATABASE = os.environ.get("MYSQL_DATABASE")
MYSQL_HOST = os.environ.get('MYSQL_HOST') MYSQL_HOST = os.environ.get("MYSQL_HOST")
MYSQL_PORT = os.environ.get('MYSQL_PORT') MYSQL_PORT = os.environ.get("MYSQL_PORT")
MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD') MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD")
class RedisConfig: class RedisConfig:
REDIS_HOST = os.environ.get('REDIS_HOST') REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get('REDIS_PORT') REDIS_PORT = os.environ.get("REDIS_PORT")
REDIS_PASSWORD = os.environ.get('REDIS_PASSWORD') REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")
class FlaskProduction: class FlaskProduction:
DEBUG = False DEBUG = False
JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY")
SERVER_NAME = os.environ.get('HOST') + ':' + os.environ.get('PORT') SERVER_NAME = os.environ.get("HOST") + ":" + os.environ.get("PORT")
MAIL_SERVER = os.environ.get("MAIL_SERVER")
MAIL_PORT = os.environ.get("MAIL_PORT")
MAIL_USERNAME = os.environ.get("MAIL_USERNAME")
MAIL_PASSWORD = os.environ.get("MAIL_PASSWORD")
MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS")
MAIL_DEFAULT_SENDER = os.environ.get("MAIL_DEFAULT_SENDER")
class FlaskTesting: class FlaskTesting:
DEBUG = True DEBUG = True
JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') TESTING = True
SERVER_NAME = os.environ.get('HOST') + ':' + os.environ.get('PORT') JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY")
SERVER_NAME = os.environ.get("HOST") + ":" + os.environ.get("PORT")
MAIL_SERVER = os.environ.get('MAIL_SERVER') MAIL_SERVER = os.environ.get("MAIL_SERVER")
MAIL_PORT = os.environ.get('MAIL_PORT') MAIL_PORT = os.environ.get("MAIL_PORT")
MAIL_USERNAME = os.environ.get('MAIL_USERNAME') MAIL_USERNAME = os.environ.get("MAIL_USERNAME")
MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD') MAIL_PASSWORD = os.environ.get("MAIL_PASSWORD")
MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS') MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS")
MAIL_DEFAULT_SENDER = os.environ.get('MAIL_DEFAULT_SENDER') MAIL_DEFAULT_SENDER = os.environ.get("MAIL_DEFAULT_SENDER")

0
app/db/cart_db.py Normal file
View File

118
app/db/product_db.py Normal file
View File

@ -0,0 +1,118 @@
from typing import Optional
from app.extensions import db_connection
from app.models.product_model import Product
def fetch_products(page: int = 0) -> Optional[list[Product]]:
cursor = db_connection.cursor(dictionary=True)
offset = 10 * page
cursor.execute(
"select product.id, user.displayname as seller, product.name, product.price_pc from product inner join user on user.id = product.seller_id order by product.id limit 10 offset %s",
(offset,),
)
results = cursor.fetchall()
if len(results) < 1:
return None
result_products: list[Product] = []
for row in results:
result_products.append(
Product(
product_id=row["id"],
seller_id=row["seller_id"],
name=row["name"],
price=row["price"],
creation_date=row["creation_date"],
)
)
return result_products
def fetch_product_by_id(product_id: int) -> Optional[Product]:
"""
Fetches specific product info
:param product_id: ID of product to be updated.
:type product_id: int
"""
cursor = db_connection.cursor(dictionary=True)
cursor.execute("select * from product where id = %s", (product_id,))
result = cursor.fetchone()
if cursor.rowcount != 1:
return None
result_product = Product(
product_id=result["id"],
seller_id=result["seller_id"],
name=result["name"],
price=result["price"],
creation_date=result["creation_date"],
)
return result_product
def fetch_product_extended_by_id(product_id: int) -> Optional[Product]:
"""
Fetches specific product info including the seller n
:param product_id: ID of product to be updated.
:type product_id: int
"""
cursor = db_connection.cursor(dictionary=True)
cursor.execute("select * from product inner join user on user.id = product.seller_id where product.id = %s", (product_id,))
result = cursor.fetchone()
if cursor.rowcount != 1:
return None
result_product = Product(
product_id=result["id"],
seller_id=result["seller_id"],
seller_name=result["displayname"],
name=result["name"],
price=result["price"],
creation_date=result["creation_date"],
)
return result_product
def insert_product(product: Product):
"""
Creates a new product listing
:param seller_id: User ID
:type seller_id: str
:param name: New product's name
:type name: str
:param price: New product's price
:type price: float
"""
cursor = db_connection.cursor()
cursor.execute(
"insert into product(seller_id, name, price_pc) values (%s, %s, %s)",
(product.seller_id, product.name, round(product.price, 2)),
)
db_connection.commit()
def delete_product(product: Product):
cursor = db_connection.cursor()
cursor.execute(
"delete from product where id = %s",
(product.product_id,),
)
db_connection.commit()

79
app/db/user_db.py Normal file
View File

@ -0,0 +1,79 @@
from typing import Optional
from app.extensions import db_connection
from app.models.user_model import User
def fetch_by_username(username: str) -> Optional[User]:
cursor = db_connection.cursor(dictionary=True)
cursor.execute("select * from user where username = %s", (username,))
result = cursor.fetchone()
result_user = (
User(
user_id=result["id"],
username=result["username"],
displayname=result["displayname"],
email=result["email"],
password=result["password"],
role_id=result["role_id"],
creation_date=result["creation_date"],
)
if result
else None
)
return result_user
def fetch_by_id(user_id: int) -> Optional[User]:
cursor = db_connection.cursor(dictionary=True)
cursor.execute("select * from user where id = %s", (user_id,))
result = cursor.fetchone()
result_user = (
User(
user_id=result["id"],
username=result["username"],
displayname=result["displayname"],
email=result["email"],
password=result["password"],
role_id=result["role_id"],
creation_date=result["creation_date"],
)
if result
else None
)
return result_user
def insert_user(new_user: User):
cursor = db_connection.cursor(dictionary=True)
cursor.execute(
"insert into user (username, displayname, email, password) values (%s, %s, %s, %s)",
(new_user.username, new_user.displayname, new_user.email, new_user.password),
)
db_connection.commit()
def delete_user(user: User):
cursor = db_connection.cursor(dictionary=True)
cursor.execute("delete from user where id = %s", (user.user_id,))
db_connection.commit()
def update_user(user: User):
cursor = db_connection.cursor(dictionary=True)
cursor.execute(
"update user set username=%s, displayname=%s, email=%s, password=%s where id = %s",
(user.username, user.displayname, user.email, user.password, user.user_id),
)
db_connection.commit()

View File

@ -1,6 +1,6 @@
main_swagger = { main_swagger = {
"info": { "info": {
"title": "Shop API", "title": "Swag Shop",
"version": "0.1", "version": "0.1",
"description": "Simple shop API using flask and co.\nFeatures include:\n- Not working\n- Successful registration of users\n- Adding items to cart\n- I don't know", "description": "Simple shop API using flask and co.\nFeatures include:\n- Not working\n- Successful registration of users\n- Adding items to cart\n- I don't know",
}, },

View File

@ -2,24 +2,21 @@ register_swagger = {
"methods": ["POST"], "methods": ["POST"],
"tags": ["User"], "tags": ["User"],
"description": "Registers a new user in the app. Also sends a notification to the user via the provided email", "description": "Registers a new user in the app. Also sends a notification to the user via the provided email",
"parameters": "parameters": [
[
{ {
"in": "body", "in": "body",
"name": "body", "name": "body",
"description": "Username, displayname and password of the new user\n- Username can be only lowercase and up to 64 characters\n- Displayname can contain special characters (. _ -) and lower and upper characters\n- Password must be at least 8 characters long, contain both lower and upper characters, numbers and special characters\n- Email has to be in format \"name@domain.tld\" and up to 64 characters long in total", "description": 'Username, displayname and password of the new user\n- Username can be only lowercase and up to 64 characters\n- Displayname can contain special characters (. _ -) and lower and upper characters\n- Password must be at least 8 characters long, contain both lower and upper characters, numbers and special characters\n- Email has to be in format "name@domain.tld" and up to 64 characters long in total',
"required": True, "required": True,
"schema": "schema": {
{
"type": "object", "type": "object",
"properties": "properties": {
{
"username": {"type": "string", "example": "mycoolusername"}, "username": {"type": "string", "example": "mycoolusername"},
"email": {"type": "string", "example": "mymail@dot.com"}, "email": {"type": "string", "example": "mymail@dot.com"},
"displayname": {"type": "string", "example": "MyCoolDisplayName"}, "displayname": {"type": "string", "example": "MyCoolDisplayName"},
"password": {"type": "string", "example": "My5tr0ngP@55w0rd"} "password": {"type": "string", "example": "My5tr0ngP@55w0rd"},
} },
} },
} }
], ],
} }
@ -28,46 +25,39 @@ login_swagger = {
"methods": ["POST"], "methods": ["POST"],
"tags": ["User"], "tags": ["User"],
"description": "Logs in using username and password and returns a JWT token for further authorization of requests.\n**The token is valid for 1 hour**", "description": "Logs in using username and password and returns a JWT token for further authorization of requests.\n**The token is valid for 1 hour**",
"parameters": "parameters": [
[
{ {
"in": "body", "in": "body",
"name": "body", "name": "body",
"description": "Username and password payload", "description": "Username and password payload",
"required": True, "required": True,
"schema": "schema": {
{
"type": "object",
"properties":
{
"username": {"type": "string", "example": "mycoolusername"},
"password": {"type": "string", "example": "MyStrongPassword123"}
}
}
}
],
"responses":
{
"200":
{
"description": "Returns a fresh token",
"schema":
{
"type": "object", "type": "object",
"properties": { "properties": {
"token": {"type": "string", "example": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTcxMDMyMjkyOCwianRpIjoiZDFhYzQxZDktZjA4NC00MmYzLThlMWUtZWFmZjJiNGU1MDAyIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6MjMwMDEsIm5iZiI6MTcxMDMyMjkyOCwiZXhwIjoxNzEwMzI2NTI4fQ.SW7LAi1j5vDOEIvzeN-sy0eHPP9PFJFkXYY029O35w0"} "username": {"type": "string", "example": "mycoolusername"},
"password": {"type": "string", "example": "MyStrongPassword123"},
},
},
} }
],
"responses": {
"200": {
"description": "Returns a fresh token",
"schema": {
"type": "object",
"properties": {
"token": {
"type": "string",
"example": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTcxMDMyMjkyOCwianRpIjoiZDFhYzQxZDktZjA4NC00MmYzLThlMWUtZWFmZjJiNGU1MDAyIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6MjMwMDEsIm5iZiI6MTcxMDMyMjkyOCwiZXhwIjoxNzEwMzI2NTI4fQ.SW7LAi1j5vDOEIvzeN-sy0eHPP9PFJFkXYY029O35w0",
} }
}, },
"400": },
{ },
"400": {
"description": "Possible causes:\n- Missing username or password from request.\n- Nonexistent username" "description": "Possible causes:\n- Missing username or password from request.\n- Nonexistent username"
}, },
"401": "401": {"description": "Password is incorrect"},
{ },
"description": "Password is incorrect"
}
}
} }
logout_swagger = { logout_swagger = {
@ -76,26 +66,51 @@ logout_swagger = {
"security": [{"JWT": []}], "security": [{"JWT": []}],
"description": "Logs out the user via provided JWT token", "description": "Logs out the user via provided JWT token",
"parameters": [], "parameters": [],
"responses": "responses": {"200": {"description": "User successfully logged out"}},
}
update_swagger = {
"methods": ["PUT"],
"tags": ["User"],
"security": [{"JWT": []}],
"description": "Updates user attributes.",
"parameters": [
{ {
"200": "in": "body",
{ "name": "body",
"description": "User successfully logged out" "description": "Attributes to update for the user.",
} "required": True,
"schema": {
"type": "object",
"properties": {
"new_username": {"type": "string", "example": "mycoolusername"},
"new_email": {"type": "string", "example": "mymail@dot.com"},
"new_displayname": {
"type": "string",
"example": "MyCoolDisplayName",
},
"new_password": {"type": "string", "example": "My5tr0ngP@55w0rd"},
},
},
} }
],
"responses": {
"200": {"description": "User attributes updated successfully."},
"400": {"description": "Bad request. Check the request body for errors."},
"401": {"description": "Unauthorized. User must be logged in."},
"409": {"description": "Conflict. Check the response message for details."},
"500": {
"description": "Internal server error. Contact the system administrator."
},
},
} }
delete_swagger = { delete_swagger = {
"methods": ["DELETE"], "methods": ["DELETE"],
"tags": ["User"], "tags": ["User"],
"security": [{"JWT": []}], "security": [{"JWT": []}],
"description": "Deletes a user via JWT token", "description": "Deletes a user via JWT token",
"parameters": [], "parameters": [],
"responses": "responses": {"200": {"description": "User successfully deleted"}},
{
"200":
{
"description": "User successfully deleted"
}
}
} }

View File

@ -17,5 +17,5 @@ jwt_redis_blocklist = redis.StrictRedis(
port=RedisConfig.REDIS_PORT, port=RedisConfig.REDIS_PORT,
password=RedisConfig.REDIS_PASSWORD, password=RedisConfig.REDIS_PASSWORD,
db=0, db=0,
decode_responses=True decode_responses=True,
) )

View File

@ -4,6 +4,7 @@ from . import jwt_manager
from app import app from app import app
@jwt_manager.token_in_blocklist_loader @jwt_manager.token_in_blocklist_loader
def check_if_token_is_revoked(jwt_header, jwt_payload: dict) -> bool: def check_if_token_is_revoked(jwt_header, jwt_payload: dict) -> bool:
jti = jwt_payload["jti"] jti = jwt_payload["jti"]

View File

@ -2,15 +2,12 @@ from flask_mail import Message
from app import flask_mail from app import flask_mail
from app.mail.messages import messages from app.mail.message_content import MessageContent
def send_mail(message: str, recipient: str): def send_mail(message: MessageContent, recipient: str):
body = messages[message]["body"] msg = Message(subject=message.subject, recipients=[recipient], body=message.body)
subject = messages[message]["subject"]
msg = Message(subject, recipients=[recipient], body=body)
try: try:
flask_mail.send(msg) flask_mail.send(msg)

View File

@ -0,0 +1,4 @@
class MessageContent:
def __init__(self, subject, body):
self.subject = subject
self.body = body

View File

@ -1,24 +0,0 @@
import datetime
messages = {
"register": {
"subject": "Successfully registered!",
"body": "Congratulations! Your account has been successfully created.\nThis mail also serves as a test that the email address is correct"
},
"login": {
"subject": "New Login detected!",
"body": "A new login token has been created"
},
"logout": {
"subject": "Successfully logged out",
"body": "A login has been revoked. No further action is needed."
},
"update": {
"subject": "Account updated",
"body": "Your account has been successfully updated. This also means you have been logged out of everywhere"
},
"delete": {
"subject": "Account Deleted!",
"body": "Your account has been deleted. No further action needed"
}
}

View File

@ -0,0 +1,15 @@
NOT_JSON = {"msg": "Request body must be JSON"}, 400
def UNKNOWN_DATABASE_ERROR(e):
return {"msg": f"An unknown error occurred within the database. {e}"}, 500
def MISSING_FIELDS(fields):
return {"msg": f"Missing required fields: {', '.join(fields)}"}, 400
def NO_FIELD_PROVIDED(possible_fields):
return {
"msg": f"No field was provided. At least one of the following is required: {', '.join(possible_fields)}"
}, 400

View File

@ -0,0 +1,6 @@
PRODUCT_LISTING_CREATED_SUCCESSFULLY = {"msg": "Successfully created a brand new product."}, 201
NOT_OWNER_OF_PRODUCT = {"msg": "You don't own this product, therefore you cannot delete it!"}, 400
UNKNOWN_PRODUCT = {"msg": "The product you tried fetching is not known. Try a different product ID."}, 400
SCROLLED_TOO_FAR = {"msg": "You scrolled too far in the pages. Try going back a little again."}, 400

View File

@ -0,0 +1,26 @@
USER_CREATED_SUCCESSFULLY = {"msg": "User created successfully."}, 201
USER_LOGGED_OUT_SUCCESSFULLY = {"msg": "Successfully logged out"}, 200
USER_DELETED_SUCCESSFULLY = {"msg": "User successfully deleted"}, 200
def USER_ACCOUNT_UPDATED_SUCCESSFULLY(updated_attributes):
return {"msg": f"Successfully updated your accounts {', '.join(updated_attributes)}"}, 200
INVALID_USERNAME_FORMAT = {
"msg": "Username is in incorrect format. It must be between 1 and 64 lowercase characters."
}, 400
INVALID_DISPLAYNAME_FORMAT = {
"msg": "Display name is in incorrect format. It must contain only letters, '.', '-', or '_' and be between 1 and 64 characters."
}, 400
INVALID_EMAIL_FORMAT = {"msg": "Email is in incorrect format."}, 400
INVALID_PASSWORD_FORMAT = {
"msg": "Password is in incorrect format. It must be between 8 and 64 characters and contain at least one uppercase letter, one lowercase letter, one digit, and one special character"
}, 400
EMAIL_ALREADY_IN_USE = {"msg": "Email already in use."}, 409
USERNAME_ALREADY_IN_USE = {"msg": "Username already in use."}, 409
USERNAME_NOT_FOUND = {"msg": "Username not found"}, 400
INCORRECT_PASSWORD = {"msg": "Incorrect password"}, 401
UNKNOWN_ERROR = {"msg": "An unknown error occurred with user"}, 500

View File

@ -0,0 +1,26 @@
from app.mail.message_content import MessageContent
USER_EMAIL_SUCCESSFULLY_REGISTERED = MessageContent(
subject="Successfully registered!",
body="Congratulations! Your account has been successfully created.\nThis mail also serves as a test that the email address is correct",
)
USER_EMAIL_SUCCESSFULLY_LOGGED_IN = MessageContent(
subject="New Login detected!",
body="A new login token has been created",
)
USER_EMAIL_SUCCESSFULLY_LOGGED_OUT = MessageContent(
subject="Successfully logged out",
body="A login has been revoked. No further action is needed.",
)
USER_EMAIL_SUCCESSFULLY_UPDATED_ACCOUNT = MessageContent(
subject="Account updated",
body="Your account has been successfully updated. This also means you have been logged out of everywhere",
)
USER_EMAIL_SUCCESSFULLY_DELETED_ACCOUNT = MessageContent(
subject="Account Deleted!",
body="Your account has been deleted. No further action needed",
)

63
app/models/cart_model.py Normal file
View File

@ -0,0 +1,63 @@
from datetime import datetime
class Cart:
"""
Represents a cart in the system.
:param id: The unique identifier of the cart.
:type id: int
:param price_total: The total price of the cart.
:type price_total: float
:param item_count: The count of items in the cart.
:type item_count: int
"""
def __init__(
self,
cart_id: int = None,
price_total: float = 0.00,
item_count: int = 0,
):
self.id = cart_id
self.price_total = price_total
self.item_count = item_count
def __repr__(self):
return f"Cart(id={self.id}, price_total={self.price_total}, item_count={self.item_count})"
class CartItem:
"""
Represents a cart item in the system.
:param id: The unique identifier of the cart item.
:type id: int
:param cart_id: The identifier of the cart.
:type cart_id: int
:param product_id: The identifier of the product.
:type product_id: int
:param count: The count of the product in the cart.
:type count: int
:param price_subtotal: The subtotal price of the product in the cart.
:type price_subtotal: float
:param date_added: The date and time when the item was added to the cart.
:type date_added: datetime
"""
def __init__(
self,
cart_item_id: int = None,
cart_id: int = None,
product_id: int = None,
count: int = 0,
price_subtotal: float = 0.00,
date_added: datetime = None,
):
self.id = cart_item_id
self.cart_id = cart_id
self.product_id = product_id
self.count = count
self.price_subtotal = price_subtotal
self.date_added = date_added or datetime.now()
def __repr__(self):
return f"CartItem(id={self.id}, cart_id={self.cart_id}, product_id={self.product_id}, count={self.count}, price_subtotal={self.price_subtotal}, date_added={self.date_added})"

View File

@ -0,0 +1,38 @@
from datetime import datetime
from decimal import Decimal
class Product:
"""
Represents a product in the system.
:param id: The unique identifier of the product.
:type id: int
:param seller_id: The user ID of the seller.
:type seller_id: int
:param name: The name of the product.
:type name: str
:param price: The price of the product.
:type price: Decimal
:param creation_date: The date and time when the product was created.
:type creation_date: datetime
"""
def __init__(
self,
product_id: int = None,
seller_id: int = None,
seller_name: str = None,
name: str = None,
price: Decimal = None,
creation_date: datetime = None,
):
self.product_id = product_id
self.seller_id = seller_id
self.seller_name = seller_name
self.name = name
self.price = price
self.creation_date = creation_date
def __repr__(self):
return f"Product(product_id={self.product_id}, seller_id={self.seller_id}, seller_name={self.seller_name}, name='{self.name}', price={self.price}, creation_date={self.creation_date!r})"

44
app/models/user_model.py Normal file
View File

@ -0,0 +1,44 @@
from datetime import datetime
class User:
"""
Represents a user in the system.
:param user_id: The unique identifier of the user.
:type user_id: int
:param username: The username of the user.
:type username: str
:param displayname: The display name of the user.
:type displayname: str
:param email: The email address of the user.
:type email: str
:param password: The hashed password of the user.
:type password: str
:param role_id: The role ID of the user. Defaults to 1.
:type role_id: int
:param creation_date: The date and time when the user was created.
:type creation_date: datetime
"""
def __init__(
self,
user_id: str = None,
username: str = None,
displayname: str = None,
email: str = None,
password: str = None,
role_id: int = 1,
creation_date: datetime = None,
):
self.user_id = user_id
self.username = username
self.displayname = displayname
self.email = email
self.password = password
self.role_id = role_id
self.creation_date = creation_date
def __repr__(self):
return f"User(id={self.user_id}, username={self.username}, displayname={self.displayname}, email={self.email}, password={self.password}, role_id={self.role_id}, creation_date={self.creation_date})"

View File

@ -3,41 +3,14 @@ from typing import Tuple, Union
from app.extensions import db_connection from app.extensions import db_connection
class CartService: class CartService:
@staticmethod @staticmethod
def add_to_cart(user_id: str, product_id: int, count: int) -> Tuple[Union[dict, str], int]:
"""
Adds a product to a user's cart.
:param user_id: User ID.
:type user_id: str
:param product_id: ID of product to be added.
:type product_id: int
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute("select count from cart_item where cart_id = %s and product_id = %s", (user_id, product_id))
result = cursor.fetchone()
if cursor.rowcount == 1:
cursor.execute("update cart_item set count = count + %s where cart_id = %s and product_id = %s", (count, user_id, product_id))
else:
cursor.execute("insert into cart_item(cart_id, product_id, count) values (%s, %s, %s)", (user_id, product_id, count))
db_connection.commit()
except Error as e:
return {"Failed": f"Failed to add item to cart. Reason: {e}"}, 500
return {"Success": "Successfully added to cart"}, 200
@staticmethod @staticmethod
def update_count(user_id: str, product_id: int, count: int) -> Tuple[Union[dict, str], int]: def update_count(
user_id: str, product_id: int, count: int
) -> Tuple[Union[dict, str], int]:
""" """
Updates count of products in user's cart Updates count of products in user's cart
@ -56,7 +29,10 @@ class CartService:
return CartService.delete_from_cart(user_id, product_id) return CartService.delete_from_cart(user_id, product_id)
with db_connection.cursor(dictionary=True) as cursor: with db_connection.cursor(dictionary=True) as cursor:
cursor.execute("update cart_item set count = %s where cart_id = %s and product_id = %s", (count, user_id, product_id)) cursor.execute(
"update cart_item set count = %s where cart_id = %s and product_id = %s",
(count, user_id, product_id),
)
db_connection.commit() db_connection.commit()
return {"Success": "Successfully added to cart"}, 200 return {"Success": "Successfully added to cart"}, 200
@ -77,17 +53,18 @@ class CartService:
:rtype: Tuple[Union[dict, str], int] :rtype: Tuple[Union[dict, str], int]
""" """
try: try:
with db_connection.cursor() as cursor: with db_connection.cursor() as cursor:
cursor.execute("delete from cart_item where cart_id = %s and product_id = %s", (user_id, product_id)) cursor.execute(
"delete from cart_item where cart_id = %s and product_id = %s",
(user_id, product_id),
)
db_connection.commit() db_connection.commit()
return {"Success": "Successfully removed item from cart"}, 200 return {"Success": "Successfully removed item from cart"}, 200
except Error as e: except Error as e:
return {"Failed": f"Failed to remove item from cart. Reason: {e}"}, 500 return {"Failed": f"Failed to remove item from cart. Reason: {e}"}, 500
@staticmethod @staticmethod
def show_cart(user_id: str) -> Tuple[Union[dict, str], int]: def show_cart(user_id: str) -> Tuple[Union[dict, str], int]:
""" """
@ -101,17 +78,20 @@ class CartService:
try: try:
with db_connection.cursor(dictionary=True) as cursor: with db_connection.cursor(dictionary=True) as cursor:
cursor.execute("select product.name as product_name, count, price_subtotal, date_added from cart_item inner join product on cart_item.product_id = product.id where cart_item.cart_id = %s", (user_id,)) cursor.execute(
"select product.name as product_name, count, price_subtotal, date_added from cart_item inner join product on cart_item.product_id = product.id where cart_item.cart_id = %s",
(user_id,),
)
rows = cursor.fetchall() rows = cursor.fetchall()
results = [] results = []
for row in rows: for row in rows:
mid_result = { mid_result = {
"name": row['product_name'], "name": row["product_name"],
"count": row['count'], "count": row["count"],
"price_subtotal": row['price_subtotal'], "price_subtotal": row["price_subtotal"],
"date_added": row['date_added'] "date_added": row["date_added"],
} }
results.append(mid_result) results.append(mid_result)
@ -121,7 +101,6 @@ class CartService:
except Error as e: except Error as e:
return {"Failed": f"Failed to load cart. Reason: {e}"}, 500 return {"Failed": f"Failed to load cart. Reason: {e}"}, 500
@staticmethod @staticmethod
def purchase(user_id: str) -> Tuple[Union[dict, str], int]: def purchase(user_id: str) -> Tuple[Union[dict, str], int]:
""" """
@ -136,7 +115,10 @@ class CartService:
try: try:
with db_connection.cursor(dictionary=True) as cursor: with db_connection.cursor(dictionary=True) as cursor:
# get all cart items # get all cart items
cursor.execute("select id, product_id, count, price_subtotal from cart_item where cart_id = %s", (user_id,)) cursor.execute(
"select id, product_id, count, price_subtotal from cart_item where cart_id = %s",
(user_id,),
)
results = cursor.fetchall() results = cursor.fetchall()
if len(results) < 1: if len(results) < 1:
@ -153,12 +135,12 @@ class CartService:
for row in results: for row in results:
mid_row = ( mid_row = (
last_id, last_id,
row['product_id'], row["product_id"],
row['count'], row["count"],
row['price_subtotal'] row["price_subtotal"],
) )
row_id = row['id'] row_id = row["id"]
parsed.append(mid_row) parsed.append(mid_row)
ids.append(row_id) ids.append(row_id)
@ -173,10 +155,8 @@ class CartService:
db_connection.commit() db_connection.commit()
# clear cart # clear cart
except Error as e: except Error as e:
return {"Failed": f"Failed to load cart. Reason: {e}"}, 500 return {"msg": f"Failed to load cart. Reason: {e}"}, 500
return {"Success": "Successfully purchased"}, 200
return {"msg": "Successfully purchased"}, 200

View File

@ -0,0 +1,30 @@
from mysql.connector import Error as mysqlError
from app.messages.api_responses import product_responses as response
import app.messages.api_errors as errors
from app.db import product_db
from app.models.product_model import Product
def create_product(seller_id: str, name: str, price: float):
"""
Creates a new product listing
:param seller_id: User ID
:type seller_id: str
:param name: New product's name
:type name: str
:param price: New product's price
:type price: float
"""
product: Product = Product(seller_id=seller_id, name=name, price=price)
try:
product_db.insert_product(product)
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)
return response.PRODUCT_LISTING_CREATED_SUCCESSFULLY

View File

@ -0,0 +1,23 @@
from mysql.connector import Error as mysqlError
from app.messages.api_responses import product_responses as response
import app.messages.api_errors as errors
from app.db import product_db
from app.models.product_model import Product
def delete_product(seller_id: str, product_id: str):
product: Product = product_db.fetch_product_by_id(product_id)
if product.seller_id != seller_id:
return response.NOT_OWNER_OF_PRODUCT
try:
product_db.delete_product(product)
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)
return response.PRODUCT_LISTING_CREATED_SUCCESSFULLY

View File

@ -0,0 +1,8 @@
import imghdr
def is_base64_jpg(decoded_string) -> bool:
try:
image_type = imghdr.what(None, decoded_string)
return image_type == "jpeg"
except Exception:
return False

View File

@ -0,0 +1,20 @@
from mysql.connector import Error as mysqlError
from app.messages.api_responses import product_responses as response
import app.messages.api_errors as errors
from app.db import product_db
from app.models.product_model import Product
def product_info(product_id: int):
try:
product: Product = product_db.fetch_product_extended_by_id(product_id)
if product is None:
return response.UNKNOWN_PRODUCT
return product, 200
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)

View File

@ -0,0 +1,29 @@
from mysql.connector import Error as mysqlError
from app.messages.api_responses import product_responses as response
import app.messages.api_errors as errors
from app.db import product_db
def product_list(page: int):
try:
result_products = product_db.fetch_products(page)
if result_products is None:
return response.SCROLLED_TOO_FAR
result_obj = []
for product in result_products:
mid_result = {
"id": product.product_id,
"seller": product.seller_id,
"name": product.name,
"price": product.price,
}
result_obj.append(mid_result)
return result_obj, 200
except mysqlError as e:
errors.UNKNOWN_DATABASE_ERROR(e)

View File

@ -1,127 +0,0 @@
import base64
from flask import abort
from mysql.connector import Error
from decimal import Decimal
from app.extensions import db_connection
class ProductService:
@staticmethod
def get_products(page: int):
"""
Fetches 10 products
:param page: Page, aka offset of fetching
:type page: int
"""
try:
with db_connection.cursor(dictionary=True) as cursor:
offset = 10 * page
cursor.execute("select product.id, user.displayname as seller, product.name, product.price_pc from product inner join user on user.id = product.seller_id order by product.id limit 10 offset %s", (offset,))
results = cursor.fetchall()
if len(results) < 1:
return {"Failed": "Failed to fetch products. You've probably selected too far with pages"}, 400
result_obj = []
for row in results:
mid_result = {
"id": row['id'],
"seller": row['seller'],
"name": row['name'],
"price": row['price_pc']
}
result_obj.append(mid_result)
return result_obj, 200
except Error as e:
return {"Failed": f"Failed to fetch products. Error: {e}"}, 500
@staticmethod
def get_product_info(fields: list[str], product_id: int):
"""
Fetches specific product info
:param fields: array of fields that can be fetched
:type fields: list[str]
:param product_id: ID of product to be updated.
:type product_id: int
"""
try:
with db_connection.cursor(dictionary=True) as cursor:
fields_to_sql = {
"name": "product.name",
"price": "product.price_pc as price",
"image": "product.image",
"image_name": "product.image_name",
"seller": "user.displayname as seller"
}
field_sql = []
for field in fields:
field_sql.append(fields_to_sql[field])
select_params = ", ".join(field_sql)
if "seller" in fields:
cursor.execute(f"select {select_params} from product inner join user on user.id = product.seller_id where product.id = %s", (product_id,))
else:
cursor.execute(f"select {select_params} from product where id = %s", (product_id,))
result = cursor.fetchone()
if cursor.rowcount != 1:
return {"Failed": "Failed to fetch product. Product likely doesn't exist"}, 400
result_obj = {}
for field in fields:
if field == "image": # Encode image into base64
result_obj[field] = base64.b64encode(result[field]).decode('utf-8')
else:
result_obj[field] = result[field]
return result_obj, 200
except Error as e:
return {"Failed": f"Failed to fetch product info. Error: {e}"}, 500
@staticmethod
def create_listing(seller_id: str, name: str, price: float):
"""
Creates a new product listing
:param seller_id: User ID
:type seller_id: str
:param name: New product's name
:type name: str
:param price: New product's price
:type price: float
"""
try:
with db_connection.cursor() as cursor:
rounded_price = round(price, 2)
cursor.execute("insert into product(seller_id, name, price_pc) values (%s, %s, %s)", (seller_id, name, Decimal(str(rounded_price))))
db_connection.commit()
except Error as e:
return {"Failed": f"Failed to create product. {e}"}, 400
return {"Success": "Successfully created new product listing"}, 200
@staticmethod
def __is_base64_jpg(decoded_string):
try:
# Checking if the decoded data represents a valid JPEG image
image_type = imghdr.what(None, decoded_data)
return image_type == 'jpeg'
except Exception:
return False

View File

@ -0,0 +1,31 @@
from typing import Tuple, Union
from mysql.connector import Error as mysqlError
import app.db.user_db as user_db
from app.models.user_model import User
import app.messages.api_responses.user_responses as response
import app.messages.api_errors as errors
from app.mail.mail import send_mail
from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_DELETED_ACCOUNT
def delete_user(user_id: str) -> Tuple[Union[dict, str], int]:
"""
Deletes a user account.
:param user_id: User ID.
:type user_id: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
user: User = user_db.fetch_by_id(user_id=user_id)
user_db.delete_user(user)
send_mail(USER_EMAIL_SUCCESSFULLY_DELETED_ACCOUNT, user.email)
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)
return response.USER_DELETED_SUCCESSFULLY

View File

@ -0,0 +1,45 @@
import datetime
from typing import Tuple, Union
import bcrypt
from mysql.connector import Error as mysqlError
from flask_jwt_extended import create_access_token
import app.messages.api_responses.user_responses as response
import app.messages.api_errors as errors
from app.db import user_db
from app.mail.mail import send_mail
from app.models.user_model import User
from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_LOGGED_IN
def login(username: str, password: str) -> Tuple[Union[dict, str], int]:
"""
Authenticates a user with the provided username and password.
:param username: User's username.
:type username: str
:param password: User's password.
:type password: str
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
user: User = user_db.fetch_by_username(username)
if user is None:
return response.USERNAME_NOT_FOUND
if not bcrypt.checkpw(password.encode("utf-8"), user.password.encode("utf-8")):
return response.INCORRECT_PASSWORD
expire = datetime.timedelta(hours=1)
token = create_access_token(identity=user.user_id, expires_delta=expire)
send_mail(USER_EMAIL_SUCCESSFULLY_LOGGED_IN, user.email)
return {"token": token}, 200
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)

View File

@ -0,0 +1,31 @@
from typing import Tuple, Union
import app.messages.api_responses.user_responses as response
from app.db import user_db
from app.models.user_model import User
from app.mail.mail import send_mail
from app.services.user import user_helper as helper
from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_LOGGED_OUT
def logout(jwt_token, user_id, send_notif: bool) -> Tuple[Union[dict, str], int]:
"""
Logs out a user by invalidating the provided JWT.
:param jti: JWT ID.
:type jti: str
:param exp: JWT expiration timestamp.
:type exp: int
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
jti = jwt_token["jti"]
exp = jwt_token["exp"]
user: User = user_db.fetch_by_id(user_id)
helper.invalidate_token(jti, exp)
if send_notif:
send_mail(USER_EMAIL_SUCCESSFULLY_LOGGED_OUT, user.email)
return response.USER_LOGGED_OUT_SUCCESSFULLY

View File

@ -0,0 +1,64 @@
import bcrypt
from typing import Tuple, Union
from mysql.connector import Error as mysqlError
import app.messages.api_responses.user_responses as response
import app.messages.api_errors as errors
from app.db import user_db
from app.mail.mail import send_mail
from app.models.user_model import User
from app.services.user import user_helper as helper
from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_REGISTERED
def register(
username: str, displayname: str, email: str, password: str
) -> Tuple[Union[dict, str], int]:
"""
Registers a new user with the provided username, email, and password.
:param username: User's username.
:type username: str
:param email: User's email address.
:type email: str
:param password: User's password.
:type password: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
if not helper.verify_username(username):
return response.INVALID_USERNAME_FORMAT
if not helper.verify_displayname(displayname):
return response.INVALID_DISPLAYNAME_FORMAT
if not helper.verify_email(email):
return response.INVALID_EMAIL_FORMAT
if not helper.verify_password(password):
return response.INVALID_PASSWORD_FORMAT
hashed_password = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
new_user: User = User(
username=username,
displayname=displayname,
email=email,
password=hashed_password,
)
user_db.insert_user(new_user)
except mysqlError as e:
if "email" in e.msg:
return response.EMAIL_ALREADY_IN_USE
if "username" in e.msg:
return response.USERNAME_ALREADY_IN_USE
return errors.UNKNOWN_DATABASE_ERROR(e)
send_mail(USER_EMAIL_SUCCESSFULLY_REGISTERED, new_user.email)
return response.USER_CREATED_SUCCESSFULLY

View File

@ -0,0 +1,72 @@
import bcrypt
from typing import Tuple, Union
from mysql.connector import Error as mysqlError
from app.db import user_db
from app.mail.mail import send_mail
from app.models.user_model import User
from app.services.user import user_helper as helper
from app.messages.api_responses import user_responses as response
import app.messages.api_errors as errors
from app.messages.mail_responses.user_email import (
USER_EMAIL_SUCCESSFULLY_UPDATED_ACCOUNT,
)
def update_user(
user_id: str,
new_username: str = None,
new_displayname: str = None,
new_email: str = None,
new_password: str = None,
) -> Tuple[Union[dict, str], int]:
user: User = user_db.fetch_by_id(user_id)
updated_attributes = []
if user is None:
return response.UNKNOWN_ERROR
if new_username:
if not helper.verify_username(new_username):
return response.INVALID_USERNAME_FORMAT
user.username = new_username
updated_attributes.append("username")
if new_displayname:
if not helper.verify_displayname(new_displayname):
return response.INVALID_DISPLAYNAME_FORMAT
user.displayname = new_displayname
updated_attributes.append("displayname")
if new_email:
if not helper.verify_email(new_email):
return response.INVALID_EMAIL_FORMAT
user.email = new_email
updated_attributes.append("email")
if new_password:
if not helper.verify_password(new_password):
return response.INVALID_PASSWORD_FORMAT
hashed_password = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt())
user.password = hashed_password
updated_attributes.append("password")
try:
user_db.update_user(user)
except mysqlError as e:
if "username" in e.msg:
return response.USERNAME_ALREADY_IN_USE
if "email" in e.msg:
return response.EMAIL_ALREADY_IN_USE
return errors.UNKNOWN_DATABASE_ERROR(e)
send_mail(USER_EMAIL_SUCCESSFULLY_UPDATED_ACCOUNT, user.email)
return response.USER_ACCOUNT_UPDATED_SUCCESSFULLY(updated_attributes)

View File

@ -0,0 +1,74 @@
import re
from datetime import datetime
from app.extensions import jwt_redis_blocklist
def invalidate_token(jti: str, exp: int):
"""
Invalidates a JWT by adding its JTI to the Redis blocklist.
:param jti: JWT ID.
:type jti: str
:param exp: JWT expiration timestamp.
:type exp: int
"""
expiration = datetime.fromtimestamp(exp)
now = datetime.now()
delta = expiration - now
jwt_redis_blocklist.set(jti, "", ex=delta)
def verify_email(email: str) -> bool:
"""
Verifies a given email string against a regular expression.
:param email: Email string.
:type email: str
:return: Boolean indicating whether the email successfully passed the check.
:rtype: bool
"""
email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
return re.match(email_regex, email) and len(email) <= 64
def verify_displayname(displayname: str) -> bool:
"""
Verifies a given display name string against a regular expression.
:param displayname: Display name string.
:type displayname: str
:return: Boolean indicating whether the display name successfully passed the check.
:rtype: bool
"""
displayname_regex = r"^[a-zA-Z.-_]{1,64}$"
return re.match(displayname_regex, displayname)
def verify_username(username: str) -> bool:
"""
Verifies a given username string against a regular expression.
:param username: Username string.
:type username: str
:return: Boolean indicating whether the username successfully passed the check.
:rtype: bool
"""
username_regex = r"^[a-z]{1,64}$"
return re.match(username_regex, username)
def verify_password(password: str) -> bool:
"""
Verifies a given password string against a regular expression.
:param password: Password string.
:type password: str
:return: Boolean indicating whether the password successfully passed the check.
:rtype: bool
"""
password_regex = (
r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$"
)
return re.match(password_regex, password)

View File

@ -1,326 +0,0 @@
import bcrypt
import re
import jwt
import datetime
from typing import Tuple, Union
from mysql.connector import Error
from flask_jwt_extended import create_access_token
from app.extensions import db_connection
from app.extensions import jwt_redis_blocklist
from app.mail.mail import send_mail
class UserService:
"""
UserService class provides methods for user-related operations.
Methods:
- register(username: str, email: str, password: str) -> Tuple[Union[dict, str], int]
- login(username: str, password: str) -> Tuple[Union[dict, str], int]
- logout(jti, exp) -> Tuple[Union[dict, str], int]
- update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]
- update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]
- update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]
"""
@staticmethod
def register(username: str, displayname: str, email: str, password: str) -> Tuple[Union[dict, str], int]:
"""
Registers a new user with the provided username, email, and password.
:param username: User's username.
:type username: str
:param email: User's email address.
:type email: str
:param password: User's password.
:type password: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
if not UserService.__verify_username(username):
return {"Failed": "Failed to verify username. Try another username"}, 400
if not UserService.__verify_displayname(displayname):
return {"Failed": "Failed to verify display name. Try another name"}, 400
if not UserService.__verify_email(email):
return {"Failed": "Failed to verify email. Try another email"}, 400
if not UserService.__verify_password(password):
return {"Failed": "Failed to verify password. Try another (stronger) password"}, 400
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
with db_connection.cursor() as cursor:
cursor.execute("insert into user (username, displayname, email, password) values (%s, %s, %s, %s)", (username, displayname, email, hashed_password))
db_connection.commit()
except Error as e:
print(f"Error: {e}")
return {"Failed": "Failed to insert into database. Username or email are likely in use already"}, 500
UserService.__send_email("register", email=email)
return {"Success": "User created successfully"}, 200
@staticmethod
def login(username: str, password: str) -> Tuple[Union[dict, str], int]:
"""
Authenticates a user with the provided username and password.
:param username: User's username.
:type username: str
:param password: User's password.
:type password: str
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute("select id, email, password from user where username = %s", (username,))
result = cursor.fetchone()
user_id = result['id']
email = result['email']
password_hash = result['password']
if user_id is None:
return {"Failed": "Username not found"}, 400
if not bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8')):
return {"Failed": "Incorrect password"}, 401
expire = datetime.timedelta(hours=1)
token = create_access_token(identity=user_id, expires_delta=expire)
UserService.__send_email("login", email=email)
return {"token": token}, 200
except Error as e:
return {"Failed": f"Failed to login. Error: {e}"}, 500
@staticmethod
def logout(jwt_token, user_id) -> Tuple[Union[dict, str], int]:
"""
Logs out a user by invalidating the provided JWT.
:param jti: JWT ID.
:type jti: str
:param exp: JWT expiration timestamp.
:type exp: int
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
jti = jwt['jti']
exp = jwt['exp']
UserService.__invalidate_token(jti, exp)
UserService.__send_email("logout", id=user_id)
return {"Success": "Successfully logged out"}, 200
@staticmethod
def delete_user(user_id: str) -> Tuple[Union[dict, str], int]:
"""
Deletes a user account.
:param user_id: User ID.
:type user_id: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
UserService.__send_email("delete", id=user_id)
try:
with db_connection.cursor() as cursor:
cursor.execute("delete from user where id = %s", (user_id,))
db_connection.commit()
except Error as e:
return {"Failed": f"Failed to delete user. {e}"}, 500
return {"Success": "User successfully deleted"}, 200
@staticmethod
def update_email(user_id: str, new_email: str) -> Tuple[Union[dict, str], int]:
"""
Updates the email address for a user with the provided user ID.
:param user_id: User's ID.
:type user_id: str
:param new_email: New email address.
:type new_email: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
if not UserService.__verify_email(new_email):
return {"Failed": "Failed to verify email. Try another email"}, 400
with db_connection.cursor() as cursor:
cursor.execute("update user set email = %s where id = %s", (new_email, user_id))
db_connection.commit()
except Error as e:
return {"Failed": f"Failed to update email. Email is likely in use already. Error: {e}"}, 500
return {"Success": "Email successfully updated"}, 200
@staticmethod
def update_username(user_id: str, new_username: str) -> Tuple[Union[dict, str], int]:
"""
Updates the username for a user with the provided user ID.
:param user_id: User's ID.
:type user_id: str
:param new_username: New username.
:type new_username: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
if not UserService.__verify_name(new_username):
return {"Failed": "Failed to verify username. Try another one"}, 400
with db_connection.cursor() as cursor:
cursor.execute("update user set username = %s where id = %s", (new_username, user_id))
db_connection.commit()
except Error as e:
return {"Failed": f"Failed to update username. Username is likely in use already. Error: {e}"}, 500
return {"Success": "Username successfully updated"}, 200
@staticmethod
def update_password(user_id: str, new_password: str) -> Tuple[Union[dict, str], int]:
"""
Updates the password for a user with the provided user ID.
:param user_id: User's ID.
:type user_id: str
:param new_password: New password.
:type new_password: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
if not UserService.__verify_password(new_password):
return {"Failed": "Failed to verify password. Try another (stronger) one"}, 400
hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
with db_connection.cursor() as cursor:
cursor.execute("update user set password = %s where id = %s", (new_username, user_id))
db_connection.commit()
except Error as e:
return {"Failed": f"Failed to update password. Error: {e}"}, 500
return {"Success": "Password successfully updated"}, 200
@staticmethod
def __send_email(message: str, username: str = None, id: str = None, email: str = None):
if email is not None:
send_mail(message, email)
return
if username is not None:
try:
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute("select email from user where username = %s", (username,))
result = cursor.fetchone()
email = result['email']
send_mail(message, email)
except Error as e:
return {"Failed": f"Failed to fetch some data. Error: {e}"}, 500
return
if id is not None:
try:
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute("select email from user where id = %s", (id,))
result = cursor.fetchone()
email = result['email']
send_mail(message, email)
except Error as e:
return {"Failed": f"Failed to fetch some data. Error: {e}"}, 500
return
raise ValueError("Invalid input data to send mail")
@staticmethod
def __invalidate_token(jti: str, exp: int):
"""
Invalidates a JWT by adding its JTI to the Redis blocklist.
:param jti: JWT ID.
:type jti: str
:param exp: JWT expiration timestamp.
:type exp: int
"""
expiration = datetime.datetime.fromtimestamp(exp)
now = datetime.datetime.now()
delta = expiration - now
jwt_redis_blocklist.set(jti, "", ex=delta)
@staticmethod
def __verify_email(email: str) -> bool:
"""
Verifies a given email string against a regular expression.
:param email: Email string.
:type email: str
:return: Boolean indicating whether the email successfully passed the check.
:rtype: bool
"""
email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
return re.match(email_regex, email) and len(email) <= 64
@staticmethod
def __verify_displayname(displayname: str) -> bool:
"""
Verifies a given display name string against a regular expression.
:param displayname: Display name string.
:type displayname: str
:return: Boolean indicating whether the display name successfully passed the check.
:rtype: bool
"""
displayname_regex = r"^[a-zA-Z.-_]{1,64}$"
return re.match(displayname_regex, displayname)
@staticmethod
def __verify_username(username: str) -> bool:
"""
Verifies a given username string against a regular expression.
:param username: Username string.
:type username: str
:return: Boolean indicating whether the username successfully passed the check.
:rtype: bool
"""
username_regex = r"^[a-z]{1,64}$"
return re.match(username_regex, username)
@staticmethod
def __verify_password(password: str) -> bool:
"""
Verifies a given password string against a regular expression.
:param password: Password string.
:type password: str
:return: Boolean indicating whether the password successfully passed the check.
:rtype: bool
"""
password_regex = r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$"
return re.match(password_regex, password)

View File

@ -1,6 +1,4 @@
from dotenv import load_dotenv from dotenv import load_dotenv
from flask_jwt_extended import JWTManager
import os
from app import create_app from app import create_app