visus-plus/api/app/infrastructure/auth_service.py

66 lines
2.8 KiB
Python

import datetime
from typing import Optional, List
import jwt
from fastapi import HTTPException, Request
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status
from app.application.users_repository import UsersRepository
from app.application.sessions_repository import SessionsRepository
from app.domain.entities.responses.session import SessionEntity
from app.domain.models.sessions import Session
from app.settings import get_auth_data
class AuthService:
    """Authenticate users, issue JWT access tokens and manage their sessions.

    Every database interaction is delegated to ``UsersRepository`` and
    ``SessionsRepository``, both built on the ``AsyncSession`` handed to the
    constructor.
    """

    # Single source of truth for how long an issued token (and the session
    # row that records it) stays valid.
    TOKEN_LIFETIME = datetime.timedelta(days=30)

    def __init__(self, db: AsyncSession):
        """Create the repositories this service works through.

        Args:
            db: Async SQLAlchemy session passed to both repositories.
        """
        self.users_repository = UsersRepository(db)
        self.sessions_repository = SessionsRepository(db)

    async def authenticate_user(self, login: str, password: str, request: Request) -> Optional[dict]:
        """Verify credentials, persist a new session and return its token.

        Args:
            login: Login used to look the user up.
            password: Password handed to ``user.check_password``.
            request: Incoming request; its ``User-Agent`` header is stored as
                the session's device info (``"Unknown"`` when absent).

        Returns:
            A dict with the keys ``"access_token"`` and ``"user_id"``. The
            method never returns ``None``; failures are reported by raising.

        Raises:
            HTTPException: 403 when the lookup yields no user or the password
                check fails.
        """
        user = await self.users_repository.get_by_login(login)
        if not user or not user.check_password(password):
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Неправильный логин или пароль")

        access_token = self.create_access_token({"user_id": user.id})

        # Read the clock once so expires_at is exactly created_at + lifetime.
        # NOTE(review): these stay naive local datetimes on purpose - the
        # column types are not visible here and an aware value may be rejected
        # by a timezone-less column. Confirm before switching them to UTC.
        created_at = datetime.datetime.now()
        session = Session(
            user_id=user.id,
            token=access_token,
            device_info=request.headers.get("User-Agent", "Unknown"),
            created_at=created_at,
            expires_at=created_at + self.TOKEN_LIFETIME,
            is_active=True,
        )
        await self.sessions_repository.create(session)

        return {
            "access_token": access_token,
            "user_id": user.id,
        }

    async def get_user_sessions(self, user_id: int) -> List[SessionEntity]:
        """Return the sessions of ``user_id`` converted to ``SessionEntity``.

        Args:
            user_id: Identifier passed to ``sessions_repository.get_by_user_id``.

        Returns:
            One ``SessionEntity`` per record yielded by the repository.
        """
        sessions = await self.sessions_repository.get_by_user_id(user_id)
        return [SessionEntity.from_orm(session) for session in sessions]

    async def deactivate_session(self, session_id: int, user_id: int) -> None:
        """Deactivate one session after checking that ``user_id`` owns it.

        Args:
            session_id: Identifier of the session to deactivate.
            user_id: Identifier of the user requesting the deactivation.

        Raises:
            HTTPException: 403 when the session is not found or belongs to a
                different user.
        """
        session = await self.sessions_repository.get_by_id(session_id)
        if not session or session.user_id != user_id:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Сессия не найдена или доступ запрещен")
        await self.sessions_repository.deactivate_session(session_id)

    async def deactivate_all_sessions(self, user_id: int) -> None:
        """Ask the sessions repository to deactivate all sessions of ``user_id``."""
        await self.sessions_repository.deactivate_all_sessions(user_id)

    async def cleanup_expired_sessions(self) -> None:
        """Ask the sessions repository to clean up expired sessions."""
        await self.sessions_repository.cleanup_expired_sessions()

    @staticmethod
    def create_access_token(data: dict, expires_delta: Optional[datetime.timedelta] = None) -> str:
        """Encode ``data`` plus an ``exp`` claim into a signed JWT.

        Args:
            data: Claims to embed; the mapping is copied, not mutated.
            expires_delta: Optional token lifetime; defaults to
                ``AuthService.TOKEN_LIFETIME`` (30 days).

        Returns:
            The encoded token produced by ``jwt.encode``.
        """
        lifetime = AuthService.TOKEN_LIFETIME if expires_delta is None else expires_delta
        # The expiry must be an aware UTC instant: PyJWT converts datetime
        # claims through their UTC time tuple, so a naive local datetime would
        # be read as UTC and shift the expiry by the server's UTC offset.
        expire = datetime.datetime.now(datetime.timezone.utc) + lifetime
        to_encode = {**data, "exp": expire}
        auth_data = get_auth_data()
        return jwt.encode(to_encode, auth_data["secret_key"], algorithm=auth_data["algorithm"])