import datetime from typing import Optional, List import jwt from fastapi import HTTPException, Request from sqlalchemy.ext.asyncio import AsyncSession from starlette import status from app.application.users_repository import UsersRepository from app.application.sessions_repository import SessionsRepository from app.domain.entities.responses.session import SessionEntity from app.domain.models.sessions import Session from app.settings import get_auth_data class AuthService: def __init__(self, db: AsyncSession): self.users_repository = UsersRepository(db) self.sessions_repository = SessionsRepository(db) async def authenticate_user(self, login: str, password: str, request: Request) -> Optional[dict]: user = await self.users_repository.get_by_login(login) if user and user.check_password(password): access_token = self.create_access_token({"user_id": user.id}) # Создаем сессию session = Session( user_id=user.id, token=access_token, device_info=request.headers.get("User-Agent", "Unknown"), created_at=datetime.datetime.now(), # Naive datetime expires_at=datetime.datetime.now() + datetime.timedelta(days=30), is_active=True ) await self.sessions_repository.create(session) return { "access_token": access_token, "user_id": user.id } raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Неправильный логин или пароль") async def get_user_sessions(self, user_id: int) -> List[SessionEntity]: sessions = await self.sessions_repository.get_by_user_id(user_id) return [SessionEntity.from_orm(session) for session in sessions] async def deactivate_session(self, session_id: int, user_id: int) -> None: session = await self.sessions_repository.get_by_id(session_id) if not session or session.user_id != user_id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Сессия не найдена или доступ запрещен") await self.sessions_repository.deactivate_session(session_id) async def deactivate_all_sessions(self, user_id: int) -> None: await self.sessions_repository.deactivate_all_sessions(user_id) async def cleanup_expired_sessions(self) -> None: await self.sessions_repository.cleanup_expired_sessions() @staticmethod def create_access_token(data: dict) -> str: to_encode = data.copy() expire = datetime.datetime.now() + datetime.timedelta(days=30) # Naive datetime to_encode.update({"exp": expire}) auth_data = get_auth_data() encode_jwt = jwt.encode(to_encode, auth_data['secret_key'], algorithm=auth_data['algorithm']) return encode_jwt