[rewrite] Implemented login and register

This commit is contained in:
Thastertyn 2025-03-12 14:01:13 +01:00
parent cd8fdb9c21
commit 20ef2aa4e8
29 changed files with 183 additions and 722 deletions

View File

@ -1,11 +1,11 @@
from fastapi import APIRouter from fastapi import APIRouter
from app.api.routes import cart_routes, user_routes, utils_routes, shop from app.api.routes import cart_routes, login_routes, shop, user_routes, utils_routes
api_router = APIRouter() api_router = APIRouter()
api_router.include_router(cart_routes.router) api_router.include_router(cart_routes.router)
api_router.include_router(user_routes.router) api_router.include_router(user_routes.router)
api_router.include_router(utils_routes.router) api_router.include_router(utils_routes.router)
api_router.include_router(login_routes.router)
api_router.include_router(shop.shop_router) api_router.include_router(shop.shop_router)

View File

@ -1,23 +1,18 @@
from typing import Annotated from typing import Annotated
from sqlmodel import Session
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
import jwt import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError from jwt.exceptions import InvalidTokenError
from pydantic import ValidationError from pydantic import ValidationError
from sqlmodel import Session
from app.core.config import settings
from app.core import security from app.core import security
from app.core.config import settings
from app.database.manager import get_session from app.database.manager import get_session
from app.database.models.user_model import User from app.database.models.user_model import User, UserRole
from app.schemas.user_schemas import TokenPayload from app.schemas.user_schemas import TokenPayload
reusable_oauth2 = OAuth2PasswordBearer( reusable_oauth2 = OAuth2PasswordBearer(
tokenUrl="/login/access-token" tokenUrl="/login/access-token"
) )
@ -46,3 +41,10 @@ def get_current_user(session: SessionDep, token: TokenDep) -> User:
CurrentUser = Annotated[User, Depends(get_current_user)] CurrentUser = Annotated[User, Depends(get_current_user)]
def get_owner_user(current_user: CurrentUser) -> User:
if current_user.user_role != UserRole.OWNER:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You must be an owner")
return current_user
CurrentOwnerUser = Annotated[User, Depends(get_owner_user)]

View File

@ -1,20 +1,16 @@
from typing import Annotated
from datetime import timedelta from datetime import timedelta
from typing import Annotated
from fastapi import APIRouter, HTTPException, Depends from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm from fastapi.security import OAuth2PasswordRequestForm
from app.api.dependencies import SessionDep from app.api.dependencies import SessionDep
from app.core import security from app.core import security
from app.core.config import settings from app.core.config import settings
from app.crud import user_crud
from app.schemas.user_schemas import Token from app.schemas.user_schemas import Token
from app.crud import user_crud router = APIRouter(tags=["Login"])
router = APIRouter(tags=["login"])
@router.post("/login/access-token") @router.post("/login/access-token")

View File

@ -1,23 +1,19 @@
from typing import Annotated
from datetime import timedelta from datetime import timedelta
from typing import Annotated
from fastapi import APIRouter, HTTPException, Depends from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm from fastapi.security import OAuth2PasswordRequestForm
from app.api.dependencies import SessionDep from app.api.dependencies import SessionDep
from app.core import security from app.core import security
from app.core.config import settings from app.core.config import settings
from app.crud import user_crud
from app.schemas.user_schemas import Token from app.schemas.user_schemas import Token
from app.crud import user_crud router = APIRouter(prefix="/login", tags=["Login"])
router = APIRouter(tags=["Shop-Login"]) @router.post("/access-token")
@router.post("/login/access-token")
def login_access_token( def login_access_token(
session: SessionDep, session: SessionDep,
form_data: Annotated[OAuth2PasswordRequestForm, Depends()] form_data: Annotated[OAuth2PasswordRequestForm, Depends()]

View File

@ -3,12 +3,13 @@ from typing import Annotated
from fastapi import APIRouter, Body, Path from fastapi import APIRouter, Body, Path
from app.api.dependencies import SessionDep
from app.crud.user_crud import create_user
from app.database.models.user_model import UserRole
from app.schemas.user_schemas import UserRegister from app.schemas.user_schemas import UserRegister
router = APIRouter( router = APIRouter(
prefix="/user", prefix="/user"
tags=["Shop-Users"]
) )
@ -23,8 +24,8 @@ async def logout():
@router.post("/register", summary="Register new user") @router.post("/register", summary="Register new user")
async def register(user_data: UserRegister): async def register(session: SessionDep, user_data: UserRegister, shop_uuid=Annotated[uuid.UUID, Path(title="UUID of the shop")]):
raise NotImplementedError() create_user(session, user_data, shop_uuid, UserRole.CUSTOMER)
@router.put("/update", summary="Update user details") @router.put("/update", summary="Update user details")

View File

@ -1,9 +1,16 @@
from fastapi import APIRouter, Body import logging
import re
from app.schemas.user_schemas import UserRegister from fastapi import APIRouter, Body, HTTPException, status
from sqlalchemy.exc import IntegrityError
from app.api.dependencies import SessionDep from app.api.dependencies import SessionDep
from app.crud import user_crud from app.crud import user_crud
from app.database.models.user_model import UserRole
from app.schemas.user_schemas import UserRegister
logger = logging.getLogger(__name__)
router = APIRouter( router = APIRouter(
prefix="/user", prefix="/user",
@ -24,10 +31,26 @@ async def logout():
@router.post("/register", summary="Register new user") @router.post("/register", summary="Register new user")
async def register(session: SessionDep, user_data: UserRegister): async def register(session: SessionDep, user_data: UserRegister):
try: try:
user_crud.create_user(session, user_data) user_crud.create_user(session, user_data, None, UserRole.OWNER)
return {"message": "User registered successfully"} return {"detail": "Registered succeesfully"}
except BaseException: except IntegrityError as e:
return {"message": "An error occurred"} field_mapping = {"uuid": "email"} # If a UUID is duplicate, it means email is in use
constraint = e.orig.diag.constraint_name
column_name = re.sub(r"^user_(\w+)_key$", r"\1", constraint) if constraint else None
if column_name == "uuid":
column_name = "email"
detail = f"{field_mapping.get(column_name, column_name or 'Entry').capitalize()} already in use"
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=detail
)
except Exception as e:
logger.error(e)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to register")
@router.put("/update", summary="Update user details") @router.put("/update", summary="Update user details")

View File

@ -9,5 +9,5 @@ class RepositoryError(SwagShopError):
"""Generic error occuring in a repository""" """Generic error occuring in a repository"""
class ServiceError(SwagShopError): class CrudError(SwagShopError):
"""Generic error occuring in a service""" """Generic error occuring in a service"""

View File

@ -1,162 +0,0 @@
from mysql.connector import Error
from typing import Tuple, Union
from app.extensions import db_connection
class CartService:
@staticmethod
@staticmethod
def update_count(
user_id: str, product_id: int, count: int
) -> Tuple[Union[dict, str], int]:
"""
Updates count of products in user's cart
:param user_id: User ID.
:type user_id: str
:param product_id: ID of product to be updated.
:type product_id: int
:param count: New count of products
:type count: int
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
if count <= 0:
return CartService.delete_from_cart(user_id, product_id)
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute(
"update cart_item set count = %s where cart_id = %s and product_id = %s",
(count, user_id, product_id),
)
db_connection.commit()
return {"Success": "Successfully added to cart"}, 200
except Error as e:
return {"Failed": f"Failed to update item count in cart. Reason: {e}"}, 500
@staticmethod
def delete_from_cart(user_id: str, product_id: int) -> Tuple[Union[dict, str], int]:
"""
Completely deletes an item from a user's cart
:param user_id: User ID.
:type user_id: str
:param product_id: ID of product to be updated.
:type product_id: int
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
with db_connection.cursor() as cursor:
cursor.execute(
"delete from cart_item where cart_id = %s and product_id = %s",
(user_id, product_id),
)
db_connection.commit()
return {"Success": "Successfully removed item from cart"}, 200
except Error as e:
return {"Failed": f"Failed to remove item from cart. Reason: {e}"}, 500
@staticmethod
def show_cart(user_id: str) -> Tuple[Union[dict, str], int]:
"""
Gives the user the content of their cart
:param user_id: User ID.
:type user_id: str
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
with db_connection.cursor(dictionary=True) as cursor:
cursor.execute(
"select product.name as product_name, count, price_subtotal, date_added from cart_item inner join product on cart_item.product_id = product.id where cart_item.cart_id = %s",
(user_id,),
)
rows = cursor.fetchall()
results = []
for row in rows:
mid_result = {
"name": row["product_name"],
"count": row["count"],
"price_subtotal": row["price_subtotal"],
"date_added": row["date_added"],
}
results.append(mid_result)
return results, 200
except Error as e:
return {"Failed": f"Failed to load cart. Reason: {e}"}, 500
@staticmethod
def purchase(user_id: str) -> Tuple[Union[dict, str], int]:
"""
"Purchases" the contents of user's cart
:param user_id: User ID.
:type user_id: str
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
with db_connection.cursor(dictionary=True) as cursor:
# get all cart items
cursor.execute(
"select id, product_id, count, price_subtotal from cart_item where cart_id = %s",
(user_id,),
)
results = cursor.fetchall()
if len(results) < 1:
return {"Failed": "Failed to purchase. Cart is Empty"}, 400
# create a purchase
cursor.execute("insert into purchase(user_id) values (%s)", (user_id,))
last_id = cursor.lastrowid
parsed = []
ids = []
for row in results:
mid_row = (
last_id,
row["product_id"],
row["count"],
row["price_subtotal"],
)
row_id = row["id"]
parsed.append(mid_row)
ids.append(row_id)
insert_query = "INSERT INTO purchase_item (purchase_id, product_id, count, price_subtotal) VALUES (%s, %s, %s, %s)"
for row in parsed:
cursor.execute(insert_query, row)
delete_query = "delete from cart_item where id = %s"
for one_id in ids:
cursor.execute(delete_query, (one_id,))
db_connection.commit()
# clear cart
except Error as e:
return {"msg": f"Failed to load cart. Reason: {e}"}, 500
return {"msg": "Successfully purchased"}, 200

View File

@ -1,30 +0,0 @@
from mysql.connector import Error as mysqlError
from app.messages.api_responses import product_responses as response
import app.messages.api_errors as errors
from app.db import product_db
from app.models.product_model import Product
def create_product(seller_id: str, name: str, price: float):
"""
Creates a new product listing
:param seller_id: User ID
:type seller_id: str
:param name: New product's name
:type name: str
:param price: New product's price
:type price: float
"""
product: Product = Product(seller_id=seller_id, name=name, price=price)
try:
product_db.insert_product(product)
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)
return response.PRODUCT_LISTING_CREATED_SUCCESSFULLY

View File

@ -1,23 +0,0 @@
from mysql.connector import Error as mysqlError
from app.messages.api_responses import product_responses as response
import app.messages.api_errors as errors
from app.db import product_db
from app.models.product_model import Product
def delete_product(seller_id: str, product_id: str):
product: Product = product_db.fetch_product_by_id(product_id)
if product.seller_id != seller_id:
return response.NOT_OWNER_OF_PRODUCT
try:
product_db.delete_product(product)
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)
return response.PRODUCT_LISTING_CREATED_SUCCESSFULLY

View File

@ -1,9 +0,0 @@
import imghdr
def is_base64_jpg(decoded_string) -> bool:
try:
image_type = imghdr.what(None, decoded_string)
return image_type == "jpeg"
except Exception:
return False

View File

@ -1,20 +0,0 @@
from mysql.connector import Error as mysqlError
from app.messages.api_responses import product_responses as response
import app.messages.api_errors as errors
from app.db import product_db
from app.models.product_model import Product
def product_info(product_id: int):
try:
product: Product = product_db.fetch_product_extended_by_id(product_id)
if product is None:
return response.UNKNOWN_PRODUCT
return product, 200
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)

View File

@ -1,30 +0,0 @@
from mysql.connector import Error as mysqlError
from app.messages.api_responses import product_responses as response
import app.messages.api_errors as errors
from app.db import product_db
def product_list(page: int):
try:
result_products = product_db.fetch_products(page)
if result_products is None:
return response.SCROLLED_TOO_FAR
result_obj = []
for product in result_products:
mid_result = {
"id": product.product_id,
"seller": product.seller_id,
"name": product.name,
"price": product.price,
}
result_obj.append(mid_result)
return result_obj, 200
except mysqlError as e:
errors.UNKNOWN_DATABASE_ERROR(e)

View File

@ -0,0 +1,11 @@
from typing import Optional
from uuid import UUID
from sqlmodel import Session, select
from app.database.models.shop_model import Shop
def get_shop_id_from_uuid(session: Session, shop_id: int) -> Optional[UUID]:
stmt = select(Shop).where(Shop.id == shop_id)
db_shop = session.exec(stmt).one_or_none()
return db_shop

View File

@ -1,32 +0,0 @@
from typing import Tuple, Union
from mysql.connector import Error as mysqlError
import app.db.user_db as user_db
from app.models.user_model import User
import app.messages.api_responses.user_responses as response
import app.messages.api_errors as errors
from app.mail.mail import send_mail
from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_DELETED_ACCOUNT
def delete_user(user_id: str) -> Tuple[Union[dict, str], int]:
"""
Deletes a user account.
:param user_id: User ID.
:type user_id: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
user: User = user_db.fetch_by_id(user_id=user_id)
user_db.delete_user(user)
send_mail(USER_EMAIL_SUCCESSFULLY_DELETED_ACCOUNT, user.email)
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)
return response.USER_DELETED_SUCCESSFULLY

View File

@ -1,44 +0,0 @@
import datetime
from typing import Tuple, Union
import bcrypt
from mysql.connector import Error as mysqlError
from flask_jwt_extended import create_access_token
import app.messages.api_responses.user_responses as response
import app.messages.api_errors as errors
from app.db import user_db
from app.mail.mail import send_mail
from app.models.user_model import User
from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_LOGGED_IN
def login(username: str, password: str) -> Tuple[Union[dict, str], int]:
"""
Authenticates a user with the provided username and password.
:param username: User's username.
:type username: str
:param password: User's password.
:type password: str
:return: Tuple containing a dictionary with a token and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
user: User = user_db.fetch_by_username(username)
if user is None:
return response.USERNAME_NOT_FOUND
if not bcrypt.checkpw(password.encode("utf-8"), user.password.encode("utf-8")):
return response.INCORRECT_PASSWORD
expire = datetime.timedelta(hours=1)
token = create_access_token(identity=user.user_id, expires_delta=expire)
send_mail(USER_EMAIL_SUCCESSFULLY_LOGGED_IN, user.email)
return {"token": token}, 200
except mysqlError as e:
return errors.UNKNOWN_DATABASE_ERROR(e)

View File

@ -1,31 +0,0 @@
from typing import Tuple, Union
import app.messages.api_responses.user_responses as response
from app.db import user_db
from app.models.user_model import User
from app.mail.mail import send_mail
from app.services.user import user_helper as helper
from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_LOGGED_OUT
def logout(jwt_token, user_id, send_notif: bool) -> Tuple[Union[dict, str], int]:
"""
Logs out a user by invalidating the provided JWT.
:param jti: JWT ID.
:type jti: str
:param exp: JWT expiration timestamp.
:type exp: int
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
jti = jwt_token["jti"]
exp = jwt_token["exp"]
user: User = user_db.fetch_by_id(user_id)
helper.invalidate_token(jti, exp)
if send_notif:
send_mail(USER_EMAIL_SUCCESSFULLY_LOGGED_OUT, user.email)
return response.USER_LOGGED_OUT_SUCCESSFULLY

View File

@ -1,64 +0,0 @@
import bcrypt
from typing import Tuple, Union
from mysql.connector import Error as mysqlError
import app.messages.api_responses.user_responses as response
import app.messages.api_errors as errors
from app.db import user_db
from app.mail.mail import send_mail
from app.models.user_model import User
from app.services.user import user_helper as helper
from app.messages.mail_responses.user_email import USER_EMAIL_SUCCESSFULLY_REGISTERED
def register(
username: str, displayname: str, email: str, password: str
) -> Tuple[Union[dict, str], int]:
"""
Registers a new user with the provided username, email, and password.
:param username: User's username.
:type username: str
:param email: User's email address.
:type email: str
:param password: User's password.
:type password: str
:return: Tuple containing a dictionary and an HTTP status code.
:rtype: Tuple[Union[dict, str], int]
"""
try:
if not helper.verify_username(username):
return response.INVALID_USERNAME_FORMAT
if not helper.verify_displayname(displayname):
return response.INVALID_DISPLAYNAME_FORMAT
if not helper.verify_email(email):
return response.INVALID_EMAIL_FORMAT
if not helper.verify_password(password):
return response.INVALID_PASSWORD_FORMAT
hashed_password = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
new_user: User = User(
username=username,
displayname=displayname,
email=email,
password=hashed_password,
)
user_db.insert_user(new_user)
except mysqlError as e:
if "email" in e.msg:
return response.EMAIL_ALREADY_IN_USE
if "username" in e.msg:
return response.USERNAME_ALREADY_IN_USE
return errors.UNKNOWN_DATABASE_ERROR(e)
send_mail(USER_EMAIL_SUCCESSFULLY_REGISTERED, new_user.email)
return response.USER_CREATED_SUCCESSFULLY

View File

@ -1,72 +0,0 @@
import bcrypt
from typing import Tuple, Union
from mysql.connector import Error as mysqlError
from app.db import user_db
from app.mail.mail import send_mail
from app.models.user_model import User
from app.services.user import user_helper as helper
from app.messages.api_responses import user_responses as response
import app.messages.api_errors as errors
from app.messages.mail_responses.user_email import (
USER_EMAIL_SUCCESSFULLY_UPDATED_ACCOUNT,
)
def update_user(
user_id: str,
new_username: str = None,
new_displayname: str = None,
new_email: str = None,
new_password: str = None,
) -> Tuple[Union[dict, str], int]:
user: User = user_db.fetch_by_id(user_id)
updated_attributes = []
if user is None:
return response.UNKNOWN_ERROR
if new_username:
if not helper.verify_username(new_username):
return response.INVALID_USERNAME_FORMAT
user.username = new_username
updated_attributes.append("username")
if new_displayname:
if not helper.verify_displayname(new_displayname):
return response.INVALID_DISPLAYNAME_FORMAT
user.displayname = new_displayname
updated_attributes.append("displayname")
if new_email:
if not helper.verify_email(new_email):
return response.INVALID_EMAIL_FORMAT
user.email = new_email
updated_attributes.append("email")
if new_password:
if not helper.verify_password(new_password):
return response.INVALID_PASSWORD_FORMAT
hashed_password = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt())
user.password = hashed_password
updated_attributes.append("password")
try:
user_db.update_user(user)
except mysqlError as e:
if "username" in e.msg:
return response.USERNAME_ALREADY_IN_USE
if "email" in e.msg:
return response.EMAIL_ALREADY_IN_USE
return errors.UNKNOWN_DATABASE_ERROR(e)
send_mail(USER_EMAIL_SUCCESSFULLY_UPDATED_ACCOUNT, user.email)
return response.USER_ACCOUNT_UPDATED_SUCCESSFULLY(updated_attributes)

View File

@ -1,74 +0,0 @@
import re
from datetime import datetime
from app.extensions import jwt_redis_blocklist
def invalidate_token(jti: str, exp: int):
"""
Invalidates a JWT by adding its JTI to the Redis blocklist.
:param jti: JWT ID.
:type jti: str
:param exp: JWT expiration timestamp.
:type exp: int
"""
expiration = datetime.fromtimestamp(exp)
now = datetime.now()
delta = expiration - now
jwt_redis_blocklist.set(jti, "", ex=delta)
def verify_email(email: str) -> bool:
"""
Verifies a given email string against a regular expression.
:param email: Email string.
:type email: str
:return: Boolean indicating whether the email successfully passed the check.
:rtype: bool
"""
email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
return re.match(email_regex, email) and len(email) <= 64
def verify_displayname(displayname: str) -> bool:
"""
Verifies a given display name string against a regular expression.
:param displayname: Display name string.
:type displayname: str
:return: Boolean indicating whether the display name successfully passed the check.
:rtype: bool
"""
displayname_regex = r"^[a-zA-Z.-_]{1,64}$"
return re.match(displayname_regex, displayname)
def verify_username(username: str) -> bool:
"""
Verifies a given username string against a regular expression.
:param username: Username string.
:type username: str
:return: Boolean indicating whether the username successfully passed the check.
:rtype: bool
"""
username_regex = r"^[a-z]{1,64}$"
return re.match(username_regex, username)
def verify_password(password: str) -> bool:
"""
Verifies a given password string against a regular expression.
:param password: Password string.
:type password: str
:return: Boolean indicating whether the password successfully passed the check.
:rtype: bool
"""
password_regex = (
r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,64}$"
)
return re.match(password_regex, password)

View File

@ -1,36 +1,58 @@
import uuid import logging
from typing import Optional from typing import Optional
from uuid import UUID
from sqlmodel import Session, select from sqlmodel import Session, select
from app.database.models.user_model import User from app.core.security import get_password_hash, verify_password
from app.crud.shop_crud import get_shop_id_from_uuid
from app.database.models.user_model import User, UserRole
from app.schemas.user_schemas import UserRegister from app.schemas.user_schemas import UserRegister
from app.core.security import verify_password, get_password_hash
from app.utils.models import generate_user_uuid5 from app.utils.models import generate_user_uuid5
def get_user_by_generated_uuid(session: Session, email: str, shop_id: Optional[int]) -> Optional[User]: logger = logging.getLogger(__name__)
def get_user_by_generated_uuid(session: Session, email: str, shop_uuid: Optional[UUID]) -> Optional[User]:
logger.debug("Getting shop id by UUID - %s", shop_uuid)
shop_id = get_shop_id_from_uuid(session, shop_uuid)
logger.debug("Generating user UUID5")
user_uuid = generate_user_uuid5(email, shop_id) user_uuid = generate_user_uuid5(email, shop_id)
stmt = select(User).where(User.uuid == user_uuid) stmt = select(User).where(User.uuid == user_uuid)
logger.debug("Executing select query")
db_user = session.exec(stmt).one_or_none() db_user = session.exec(stmt).one_or_none()
return db_user return db_user
def create_user(session: Session, user_register: UserRegister, shop_id: Optional[int], user_role: str): def create_user(session: Session, user_register: UserRegister, shop_uuid: Optional[UUID], user_role: UserRole):
logger.debug("Getting shop id by UUID - %s", shop_uuid)
shop_id = get_shop_id_from_uuid(session, shop_uuid)
logger.debug("Generating user UUID5")
user_uuid = generate_user_uuid5(user_register.email, shop_id) user_uuid = generate_user_uuid5(user_register.email, shop_id)
password = get_password_hash(user_register.password) logger.debug("Hashing password")
hashed_password = get_password_hash(user_register.password)
new_user = User( new_user = User(
uuid=user_uuid, uuid=user_uuid,
shop_id=shop_id, shop_id=shop_id,
email=user_register.email, email=user_register.email,
username=user_register.username, username=user_register.username,
phone_number=user_register.phone_number phone_number=user_register.phone_number,
user_role=user_role,
password=hashed_password
) )
logger.debug("Inserting new user")
session.add(new_user) session.add(new_user)
session.commit() session.commit()
def authenticate(session: Session, email: str, password: str, shop_id: Optional[int]) -> Optional[User]: def authenticate(session: Session, email: str, password: str, shop_uuid: Optional[int]) -> Optional[User]:
logger.debug("Getting shop id by UUID - %s", shop_uuid)
shop_id = get_shop_id_from_uuid(session, shop_uuid)
logger.debug("Fetching user from db by email - %s", email)
db_user = get_user_by_generated_uuid(session, email, shop_id) db_user = get_user_by_generated_uuid(session, email, shop_id)
if db_user is None: if db_user is None:
logger.warn("Didn't find User with email=%s for shop=%s", email, shop_uuid)
return None return None
if not verify_password(plain_password=password, hashed_password=db_user.password): if not verify_password(plain_password=password, hashed_password=db_user.password):
logger.warn("Found user with email=%s for shop=%s", email, shop_uuid)
return None return None
return db_user return db_user

View File

@ -1,5 +0,0 @@
from sqlalchemy.orm import declarative_base
Base = declarative_base()
__all__ = ["Base"]

View File

@ -1,13 +1,12 @@
from uuid import UUID
from enum import Enum as PyEnum
from typing import Optional, List
from datetime import datetime, time from datetime import datetime, time
from enum import Enum as PyEnum
from typing import Optional
from uuid import UUID
from sqlalchemy import Column, CheckConstraint
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import SQLModel, Field, Relationship, Enum
from pydantic import BaseModel, constr from pydantic import BaseModel, constr
from sqlalchemy import CheckConstraint, Column
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import Enum, Field, Relationship, SQLModel
class ShopStatus(PyEnum): class ShopStatus(PyEnum):
@ -22,7 +21,7 @@ class ShopLinkEntry(BaseModel):
class ShopLinks(BaseModel): class ShopLinks(BaseModel):
links: List[ShopLinkEntry] links: list[ShopLinkEntry]
class ShopBusinessHourEntry(BaseModel): class ShopBusinessHourEntry(BaseModel):
@ -32,7 +31,7 @@ class ShopBusinessHourEntry(BaseModel):
class ShopBusinessHours(BaseModel): class ShopBusinessHours(BaseModel):
hours: List[ShopBusinessHourEntry] hours: list[ShopBusinessHourEntry]
class Shop(SQLModel, table=True): class Shop(SQLModel, table=True):
@ -59,8 +58,15 @@ class Shop(SQLModel, table=True):
sa_column=Column(JSONB, nullable=False, default=lambda: {}) sa_column=Column(JSONB, nullable=False, default=lambda: {})
) )
owner: Optional["User"] = Relationship(back_populates='owned_shops') owner: Optional["User"] = Relationship(
registered_users: List["User"] = Relationship(back_populates='registered_shop') back_populates='owned_shop',
sa_relationship_kwargs={"foreign_keys": "[Shop.owner_id]"}
)
registered_users: list["User"] = Relationship(
back_populates='registered_shop',
sa_relationship_kwargs={"foreign_keys": "[User.shop_id]"}
)
__table_args__ = ( __table_args__ = (
CheckConstraint("business_hours ? 'hours'", name="check_business_hours_keys"), CheckConstraint("business_hours ? 'hours'", name="check_business_hours_keys"),

View File

@ -1,17 +1,28 @@
from typing import Optional, List
from uuid import UUID
from datetime import datetime from datetime import datetime
from enum import Enum as PyEnum
from typing import Optional
from uuid import UUID
from sqlmodel import SQLModel, Field, Relationship from sqlalchemy import Column
from sqlmodel import Enum, Field, Relationship, SQLModel
class UserRole(PyEnum):
OWNER = "owner"
CUSTOMER = "customer"
EMPLOYEE = "employee"
MANAGER = "manager"
ADMIN = "admin"
class User(SQLModel, table=True): class User(SQLModel, table=True):
__tablename__ = 'user' __tablename__ = "user"
id: int = Field(primary_key=True) id: int = Field(primary_key=True)
uuid: UUID = Field(nullable=False, unique=True) uuid: UUID = Field(nullable=False, unique=True)
user_role_id: int = Field(foreign_key="user_role.id", nullable=False) user_role: UserRole = Field(sa_column=Column(Enum(UserRole), nullable=False))
shop_id: Optional[int] = Field(foreign_key="shop.id") shop_id: Optional[int] = Field(foreign_key="shop.id")
status: UserRole = Field(sa_column=Column(Enum(UserRole)))
username: str = Field(max_length=64, nullable=False, unique=True) username: str = Field(max_length=64, nullable=False, unique=True)
email: str = Field(max_length=128, nullable=False, unique=True) email: str = Field(max_length=128, nullable=False, unique=True)
password: str = Field(max_length=60, nullable=False) password: str = Field(max_length=60, nullable=False)
@ -23,31 +34,29 @@ class User(SQLModel, table=True):
updated_at: datetime = Field(default_factory=datetime.utcnow, nullable=False) updated_at: datetime = Field(default_factory=datetime.utcnow, nullable=False)
last_login: Optional[datetime] = Field(default_factory=datetime.utcnow) last_login: Optional[datetime] = Field(default_factory=datetime.utcnow)
owned_shops: List["Shop"] = Relationship(back_populates="owner") owned_shop: "Shop" = Relationship(
registered_shop: List["Shop"] = Relationship(back_populates="registered_users") back_populates="owner",
sa_relationship_kwargs={"foreign_keys": "[Shop.owner_id]"}
)
registered_shop: Optional["Shop"] = Relationship(
back_populates="registered_users",
sa_relationship_kwargs={"foreign_keys": "[User.shop_id]"}
)
user_assigned_role: Optional["UserRole"] = Relationship(back_populates="role_users", sa_relationship_kwargs={"foreign_keys": "[User.user_role_id]"})
preferences: Optional["UserPreferences"] = Relationship(back_populates="user") preferences: Optional["UserPreferences"] = Relationship(back_populates="user")
statistics: Optional["UserStatistics"] = Relationship(back_populates="user_statistics") statistics: Optional["UserStatistics"] = Relationship(back_populates="user")
class UserPreferences(SQLModel, table=True): class UserPreferences(SQLModel, table=True):
__tablename__ = 'user_preferences' __tablename__ = "user_preferences"
user_id: int = Field(foreign_key="user.id", primary_key=True) user_id: int = Field(foreign_key="user.id", primary_key=True)
user: Optional["User"] = Relationship(back_populates="preferences") user: Optional["User"] = Relationship(back_populates="preferences")
class UserRole(SQLModel, table=True):
__tablename__ = 'user_role'
id: int = Field(primary_key=True)
name: str = Field(nullable=False, unique=True, max_length=50)
role_users: List["User"] = Relationship(back_populates="user_assigned_role")
class UserStatistics(SQLModel, table=True): class UserStatistics(SQLModel, table=True):
__tablename__ = "user_statistics" __tablename__ = "user_statistics"
@ -56,4 +65,5 @@ class UserStatistics(SQLModel, table=True):
user: Optional["User"] = Relationship(back_populates="statistics") user: Optional["User"] = Relationship(back_populates="statistics")
__all__ = ["User", "UserPreferences", "UserRole"] __all__ = ["User", "UserPreferences", "UserRole"]

View File

@ -1,37 +0,0 @@
from typing import Annotated
from pydantic import BaseModel
from fastapi import Header, HTTPException, Request, Body
async def get_token_header(x_token: Annotated[str, Header()]):
if x_token != "fake-super-secret-token":
raise HTTPException(status_code=400, detail="X-Token header invalid")
class LoginRequest(BaseModel):
username: str
password: str
async def get_jsession_id(request: Request):
jsessionid = (
request.headers.get("JSESSIONID")
or request.cookies.get("JSESSIONID")
or request.query_params.get("jsessionid")
)
if not jsessionid:
raise HTTPException(status_code=400, detail="JSESSIONID is required for this operation")
return jsessionid
async def get_credentials(credentials: LoginRequest = Body(...)):
return credentials.dict()
async def get_query_token(token: str):
if token != "jessica":
raise HTTPException(status_code=400, detail="No Jessica token provided")

View File

@ -1,12 +1,10 @@
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.routing import APIRoute from fastapi.routing import APIRoute
from starlette.middleware.cors import CORSMiddleware from starlette.middleware.cors import CORSMiddleware
from app.utils import logger
from app.api import api_router from app.api import api_router
from app.core.config import settings from app.core.config import settings
from app.utils import logger
logger.setup_logger() logger.setup_logger()

View File

@ -1,13 +1,24 @@
from sqlmodel import Field as SqlModelField, SQLModel import re
from pydantic import EmailStr, Field from pydantic import EmailStr, model_validator
from sqlmodel import Field, SQLModel
class UserRegister(SQLModel): class UserRegister(SQLModel):
username: str = Field(..., min_length=3, max_length=64) username: str = Field(min_length=3, max_length=64)
email: EmailStr = Field(...) email: EmailStr = Field()
phone_number: str = Field(..., min_length=2, max_length=16, pattern=r'^\+[1-9]\d{1,14}$') phone_number: str = Field(min_length=2, max_length=16, schema_extra={"pattern": r'^\+[1-9]\d{1,14}$'})
password: str = Field(..., min_length=6, max_length=128) password: str = Field(min_length=8, max_length=128,
shop_id: int = 0 description="Password must be at least 8 and at most 128 characters long, contain a lower case, upper case letter, a number and a special character (#?!@$ %^&*-)")
@model_validator(mode="after")
def validate_using_regex(self):
self.__validate_password()
return self
def __validate_password(self):
password_regex = r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$ %^&*-]).{8,128}$"
if not re.match(password_regex, self.password):
raise ValueError("Password is too weak")
class Token(SQLModel): class Token(SQLModel):

View File

@ -18,6 +18,24 @@ pydantic-settings = "^2.8.1"
sqlmodel = "^0.0.24" sqlmodel = "^0.0.24"
psycopg2-binary = "^2.9.10" psycopg2-binary = "^2.9.10"
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
"ARG001", # unused arguments in functions
]
ignore = [
"E501", # line too long, handled by black
"B008", # do not perform function calls in argument defaults
"W191", # indentation contains tabs
"B904", # Allow raising exceptions without from e, for HTTPException
]
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
pylint = "^3.3.4" pylint = "^3.3.4"