сделал аутентификацию в апи

This commit is contained in:
Андрей Дувакин 2025-11-26 22:02:57 +05:00
parent 3947cdfc79
commit a66ca3a5e3
15 changed files with 406 additions and 1 deletions

View File

@ -0,0 +1,27 @@
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.models.roles import Role
class RolesRepository:
def __init__(self, db: AsyncSession) -> None:
self.db = db
async def get_by_id(self, role_id: int) -> Optional[Role]:
query = (
select(Role)
.filter_by(role_id=role_id)
)
result = await self.db.execute(query)
return result.scalars().first()
async def get_by_title(self, title: str) -> Optional[Role]:
query = (
select(Role)
.filter_by(title=title)
)
result = await self.db.execute(query)
return result.scalars().first()

View File

@ -0,0 +1,27 @@
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.models.statuses import Status
class StatusesRepository:
def __init__(self, db: AsyncSession) -> None:
self.db = db
async def get_by_id(self, status_id: int) -> Optional[Status]:
query = (
select(Status)
.filter_by(id=status_id)
)
result = await self.db.execute(query)
return result.scalars().first()
async def get_by_title(self, title: str) -> Optional[Status]:
query = (
select(Status)
.filter_by(title=title)
)
result = await self.db.execute(query)
return result.scalars().first()

View File

@ -0,0 +1,38 @@
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.models.users import User
class UsersRepository:
def __init__(self, db: AsyncSession) -> None:
self.db = db
async def get_by_id(self, user_id: int) -> Optional[User]:
query = (
select(User)
.filter_by(id=user_id)
)
result = await self.db.execute(query)
return result.scalars().first()
async def get_by_login(self, login: str) -> Optional[User]:
query = (
select(User)
.filter_by(login=login)
)
result = await self.db.execute(query)
return result.scalars().first()
async def create(self, user: User) -> User:
self.db.add(user)
await self.db.commit()
await self.db.refresh(user)
return user
async def update(self, user: User) -> User:
await self.db.merge(user)
await self.db.commit()
return user

View File

@ -0,0 +1,23 @@
from fastapi import APIRouter, Depends, Response
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_db
from app.domain.entities.auth import LoginRequest, LoginResponse
from app.infrastructure.auth_service import AuthService
auth_router = APIRouter()
@auth_router.post(
'/login/',
response_model=LoginResponse,
summary='User authentication',
description='Logs in the user and outputs the `access_token` ',
)
async def auth_user(
response: Response,
user_data: LoginRequest,
db: AsyncSession = Depends(get_db)
):
auth_service = AuthService(db)
return await auth_service.authenticate_user(user_data)

0
api/app/core/__init__.py Normal file
View File

View File

@ -0,0 +1,9 @@
class UserStatuses:
ACTIVE = 'active'
BLOCKED = 'blocked'
class UserRoles:
STUDENT = 'student'
TEACHER = 'teacher'
ADMIN = 'root'

View File

@ -0,0 +1,11 @@
from pydantic import BaseModel, EmailStr, Field
class LoginRequest(BaseModel):
login: str = Field(max_length=250)
password: str = Field(min_length=8)
class LoginResponse(BaseModel):
user_id: int
access_token: str

View File

@ -0,0 +1,30 @@
from datetime import date
from typing import Optional
from pydantic import BaseModel, EmailStr, Field
class UserCreate(BaseModel):
first_name: str = Field(max_length=250)
last_name: str = Field(max_length=250)
patronymic: Optional[str] = Field(default=None, max_length=250)
login: str = Field(max_length=250)
email: Optional[EmailStr] = None
birthdate: date
password: str = Field(min_length=8)
role_id: Optional[int] = Field(default=None)
repeat_password: str = Field(min_length=8)
class UserRead(BaseModel):
first_name: str
last_name: str
patronymic: Optional[str]
login: str
email: Optional[EmailStr]
birthdate: date
status_id: int
role_id: int
class Config:
orm_mode = True

View File

@ -4,6 +4,7 @@ from typing import List
from sqlalchemy import String, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from werkzeug.security import check_password_hash, generate_password_hash
from app.domain.models.base import PhotoAbstract
@ -43,3 +44,9 @@ class User(PhotoAbstract):
back_populates='student',
foreign_keys=['student_id'],
)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
def set_password(self, password):
self.password_hash = generate_password_hash(password)

View File

@ -0,0 +1,37 @@
import datetime
from typing import Optional
import jwt
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status
from app.application.users_repository import UsersRepository
from app.domain.entities.auth import LoginRequest
from app.settings import get_auth_data
class AuthService:
def __init__(self, db: AsyncSession):
self.users_repository = UsersRepository(db)
async def authenticate_user(self, user_data: LoginRequest) -> Optional[dict]:
user = await self.users_repository.get_by_login(user_data.login)
if user and user.check_password(user_data.password):
access_token = self.create_access_token({"user_id": user.id})
return {
"access_token": access_token,
"user_id": user.id
}
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Неправильный логин или пароль")
@staticmethod
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=30)
to_encode.update({"exp": expire})
auth_data = get_auth_data()
encode_jwt = jwt.encode(to_encode, auth_data['secret_key'], algorithm=auth_data['algorithm'])
return encode_jwt

View File

@ -0,0 +1,47 @@
import jwt
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status
from app.application.users_repository import UsersRepository
from app.core.constants import UserStatuses
from app.database.session import get_db
from app.domain.models.users import User
from app.settings import get_auth_data, Settings
security = HTTPBearer()
async def require_auth_user(
credentials: HTTPAuthorizationCredentials = Security(security),
db: AsyncSession = Depends(get_db)
):
auth_data = get_auth_data()
try:
payload = jwt.decode(credentials.credentials, auth_data['secret_key'], algorithms=[auth_data['algorithm']])
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Ошибка авторизации')
except jwt.InvalidTokenError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Ошибка авторизации')
user_id = payload.get('user_id')
if user_id is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Ошибка авторизации')
user = await UsersRepository(db).get_by_id(user_id)
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Ошибка авторизации')
if user.status != UserStatuses.ACTIVE:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Ошибка авторизации')
return user
def require_admin(user: User = Depends(require_auth_user)):
if user.role.title != Settings().root_role_name:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Ошибка доступа')
return user

View File

@ -0,0 +1,102 @@
import re
from typing import Optional
from fastapi import HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.application.roles_repository import RolesRepository
from app.application.statuses_repository import StatusesRepository
from app.application.users_repository import UsersRepository
from app.domain.entities.users import UserCreate, UserRead
from app.domain.models.users import User
from app.settings import Settings
class UsersService:
def __init__(self, db: AsyncSession):
self.users_repository = UsersRepository(db)
self.roles_repository = RolesRepository(db)
self.statuses_repository = StatusesRepository(db)
self.settings = Settings()
async def create_user(self, create_user_entity: UserCreate) -> Optional[UserRead]:
user = await self.users_repository.get_by_login(create_user_entity.login)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пользователь с таким логином уже существует',
)
if create_user_entity.password != create_user_entity.repeat_password:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пароли не совпадают',
)
if not self.is_strong_password(create_user_entity.repeat_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пароль слишком слабый. Пароль должен содержать не менее 8 символов, включая хотя бы одну букву и одну цифру и один специальный символ.'
)
default_status = await self.statuses_repository.get_by_title(self.settings.default_status)
if default_status is None:
raise HTTPException(
status_code=status.HTTP_424_FAILED_DEPENDENCY,
detail='Статус по умолчанию не найден',
)
user = User(
first_name=create_user_entity.first_name,
last_name=create_user_entity.last_name,
patronymic=create_user_entity.patronymic,
login=create_user_entity.login,
email=create_user_entity.email,
birthdate=create_user_entity.birthdate,
status_id=default_status.status_id,
)
if create_user_entity.role_id is None:
default_role = await self.roles_repository.get_by_title(self.settings.default_role_name)
if default_role is None:
raise HTTPException(
status_code=status.HTTP_424_FAILED_DEPENDENCY,
detail='Роль по умолчанию не найдена',
)
user.role_id = default_role.id
else:
role = await self.roles_repository.get_by_id(create_user_entity.role_id)
if role is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Роль с таким ID не найдена',
)
user.role_id = role.id
user.set_password(create_user_entity.password)
user = await self.users_repository.create(user)
return UserRead.model_validate(user)
@staticmethod
def is_strong_password(password: str) -> bool:
if len(password) < 8:
return False
if not re.search(r'[A-Z]', password):
return False
if not re.search(r'[a-z]', password):
return False
if not re.search(r'\d', password):
return False
if not re.search(r'[!@#$%^&*(),.?\':{}|<>]', password):
return False
return True

View File

@ -0,0 +1,30 @@
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from app.controllers.auth_router import auth_router
from app.settings import Settings
def start_app():
api_app = FastAPI()
settings = Settings()
api_app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'],
)
api_app.include_router(auth_router, prefix=f'{settings.prefix}/auth', tags=['auth'])
return api_app
app = start_app()
@app.get('/', tags=['root'])
async def root():
return {'message': 'Hello :з'}

View File

@ -13,7 +13,21 @@ class Settings(BaseSettings):
db_name: str = Field(alias='DB_NAME')
db_schema: str = Field(alias='DB_SCHEMA')
secret_key: str = Field(alias='SECRET_KEY')
algorithm: str = Field(alias='ALGORITHM', default='sha256')
default_role_name: str = Field(alias='DEFAULT_ROLE_NAME', default='student')
root_role_name: str = Field(alias='ROOT_ROLE_NAME', default='root')
default_status: str = Field(alias='DEFAULT_STATUS', default='active')
prefix: str = Field(alias='PREFIX', default='/api/v1')
def get_db_url() -> str:
settings = Settings()
return f'{settings.db_driver}://{settings.db_user}:{settings.db_password}@{settings.db_host}:{settings.db_port}/{settings.db_name}'
def get_auth_data() -> dict:
settings = Settings()
return {'secret_key': settings.secret_key, 'algorithm': settings.algorithm}

View File

@ -3,3 +3,6 @@ pydantic-settings==2.12.0
alembic==1.17.2
asyncpg==0.31.0
greenlet==3.2.4
werkzeug==3.1.3
pyjwt==2.9.0
fastapi==0.115.0