API_logistics/app/core/usecases/auth_service.py
2024-10-04 19:26:11 +05:00

51 lines
1.6 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
import jwt
from sqlalchemy.orm import Session
from dotenv import load_dotenv
import os
from app.infrastructure.database.dependencies import get_db
from app.infrastructure.database.repository.user_repository import UsersRepository
# Load variables from a .env file (when one is found) into the process
# environment so that the os.getenv() lookup below can see them.
load_dotenv()
# Key used to sign and verify JWTs in this module.
# NOTE(review): os.getenv returns None when SECRET_KEY is unset and nothing
# here checks for that — confirm the deployment always provides it.
SECRET_KEY = os.getenv("SECRET_KEY")
# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# NOTE(review): not referenced in the visible part of this module
# (create_access_token falls back to 15 minutes on its own); presumably
# consumed by the login route — confirm against callers.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# OAuth2 bearer-token scheme; "token" is the URL advertised to clients for
# obtaining a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is never modified.
        expires_delta: Lifetime of the token, counted from the current UTC
            time. When omitted (``None``) the token lives for 15 minutes.

    Returns:
        The encoded token as produced by ``jwt.encode`` using the module's
        ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so a plain
    # truthiness test would silently replace an explicit zero lifetime with
    # the 15-minute default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)

    to_encode = data.copy()
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str, db: Session = Depends(get_db)):
    """Decode *token* and return the user it identifies.

    Args:
        token: Encoded JWT whose ``sub`` claim holds the user's id.
        db: Database session handed to ``UsersRepository``.

    Returns:
        The object returned by ``UsersRepository.get_by_id`` for the id
        found in the ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token cannot be decoded, has no ``sub`` claim, has a ``sub``
            that is not an integer, or refers to a user that does not exist.
    """
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode step can raise PyJWT errors, so keep the try minimal.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError as exc:  # Handle exceptions raised by PyJWT
        raise credentials_exception from exc

    user_id = payload.get("sub")
    if user_id is None:
        raise credentials_exception

    # A validly signed token may still carry a non-numeric subject; treat
    # that as bad credentials instead of letting int() escape as a 500.
    try:
        numeric_id = int(user_id)
    except (TypeError, ValueError) as exc:
        raise credentials_exception from exc

    user = UsersRepository(db).get_by_id(numeric_id)
    if user is None:
        raise credentials_exception
    return user