swag-shop/backend/app/services/auth_service.py

74 lines
2.5 KiB
Python

import os
from datetime import datetime, timedelta, timezone

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
# NOTE(review): PyJWT's jwt.exceptions exposes PyJWTError / InvalidTokenError
# but no JWTException; confirm this import resolves against the installed
# jwt package.
from jwt.exceptions import JWTException
from passlib.context import CryptContext

from app.models.user_model import User
from app.database.manager import DatabaseManager
# Security settings
# The signing key is taken from the SECRET_KEY environment variable so that a
# deployment does not have to rely on a value committed to source control.
# The literal below is kept only as a backward-compatible fallback: anyone who
# can read this file can forge tokens signed with it, so set SECRET_KEY in any
# real environment. `or` (rather than a .get() default) also covers the case
# where the variable is set but empty.
SECRET_KEY = (
    os.environ.get("SECRET_KEY")
    or "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
)
# Algorithm name handed to jwt.encode / jwt.decode below.
ALGORITHM = "HS256"
# Not referenced in this part of the file; presumably the token lifetime that
# callers pass to create_access_token -- verify against the login route.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Password hashing setup
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2 for token-based authentication
# tokenUrl names the endpoint that issues tokens; that route is not defined in
# this part of the file.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string generated by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a previously stored hash.

    Args:
        plain_password: The password as supplied by the user.
        hashed_password: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the two values.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """Generate a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is shallow-copied, so
            the caller's dict is not modified.
        expires_delta: Lifetime of the token, measured from the current UTC
            time. When ``None``, the token expires 15 minutes from now.

    Returns:
        The encoded token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so a truthiness
    # test would silently replace a requested zero lifetime with the default.
    if expires_delta is None:
        # NOTE(review): this fallback differs from ACCESS_TOKEN_EXPIRE_MINUTES
        # (30); kept at 15 to preserve existing behaviour.
        expires_delta = timedelta(minutes=15)

    to_encode = data.copy()
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_user_by_username(username: str) -> User | None:
    """Look up a user record by its username.

    Args:
        username: The username to match against ``User.username``.

    Returns:
        The first matching ``User`` as returned by the query's ``first()``,
        which the signature allows to be ``None``.
    """
    with DatabaseManager.get_session() as db:
        matching = db.query(User).filter(User.username == username)
        return matching.first()
def authenticate_user(username: str, password: str) -> User | bool:
    """Validate a username/password pair.

    Args:
        username: The username to look up.
        password: The plaintext password to check against the user's stored
            ``password`` value.

    Returns:
        The ``User`` when the lookup succeeds and the password verifies;
        ``False`` otherwise.
    """
    candidate = get_user_by_username(username)
    # Unknown user: reject before touching the password check.
    if not candidate:
        return False
    if not verify_password(password, candidate.password):
        return False
    return candidate
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """Resolve a bearer token to the user it was issued for.

    Args:
        token: Encoded JWT, supplied through the ``oauth2_scheme`` dependency.

    Returns:
        The ``User`` whose username equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, carries no ``sub`` claim, or names a
            user that ``get_user_by_username`` does not find.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode call can raise a JWT error, so keep the try body minimal.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError as exc:
        # PyJWTError is the base class of PyJWT's exceptions. It is reached
        # through the jwt module because jwt.exceptions defines no
        # JWTException.
        raise credentials_exception from exc

    username: str | None = payload.get("sub")
    if username is None:
        raise credentials_exception

    # NOTE(review): get_user_by_username is a synchronous function called from
    # a coroutine; confirm the lookup is acceptable on the event loop.
    user = get_user_by_username(username)
    if user is None:
        raise credentials_exception
    return user