51 lines
1.7 KiB
Python
51 lines
1.7 KiB
Python
from typing import Annotated
|
|
|
|
import jwt
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
|
from jwt.exceptions import InvalidTokenError
|
|
from pydantic import ValidationError
|
|
from sqlmodel import Session, select
|
|
|
|
from app.core import security
|
|
from app.core.config import settings
|
|
from app.database.manager import get_session
|
|
from app.database.models.user_model import User, UserRole
|
|
from app.schemas.user_schemas import TokenPayload
|
|
|
|
# OAuth2 bearer-token scheme; tokenUrl is the path advertised for obtaining a token.
reusable_oauth2 = OAuth2PasswordBearer(tokenUrl="/login/access-token")
|
|
|
|
# Annotated dependency aliases for use in endpoint/dependency signatures.
# SessionDep: a Session provided by get_session.
SessionDep = Annotated[Session, Depends(get_session)]
# TokenDep: the token string provided by the reusable_oauth2 scheme.
TokenDep = Annotated[str, Depends(reusable_oauth2)]
# LoginDep: the parsed OAuth2PasswordRequestForm.
LoginDep = Annotated[
    OAuth2PasswordRequestForm,
    Depends(OAuth2PasswordRequestForm),
]
|
|
|
|
|
|
def get_current_user(session: SessionDep, token: TokenDep) -> User:
    """Resolve the bearer token to the matching ``User`` row.

    The token is decoded with ``settings.SECRET_KEY`` and
    ``security.ALGORITHM``, its claims are validated through ``TokenPayload``,
    and the user whose ``uuid`` equals the ``sub`` claim is loaded via
    ``session``.

    Args:
        session: Database session used to look the user up.
        token: Encoded JWT taken from the request.

    Returns:
        The ``User`` identified by the token's ``sub`` claim.

    Raises:
        HTTPException: 403 when the token cannot be decoded or its payload
            fails validation; 404 when no user matches the ``sub`` claim.
    """
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
        )
        token_data = TokenPayload(**payload)
    except (InvalidTokenError, ValidationError) as exc:
        # Chain the underlying error so tracebacks show why validation failed.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        ) from exc

    stmt = select(User).where(User.uuid == token_data.sub)
    user = session.exec(stmt).one_or_none()
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return user
|
|
|
|
|
|
# Dependency alias: the User returned by get_current_user for the request's token.
CurrentUser = Annotated[User, Depends(get_current_user)]
|
|
|
|
def get_owner_user(current_user: CurrentUser) -> User:
    """Return ``current_user`` only when their role is ``UserRole.OWNER``.

    Raises:
        HTTPException: 403 when the user's ``user_role`` is not
            ``UserRole.OWNER``.
    """
    is_owner = not (current_user.user_role != UserRole.OWNER)
    if is_owner:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="You must be an owner",
    )
|
|
|
|
# Dependency alias: the User returned by get_owner_user (role must be UserRole.OWNER).
CurrentOwnerUser = Annotated[User, Depends(get_owner_user)]
|