сделал регистрацию пользователя и выдачу токена

This commit is contained in:
Андрей Дувакин 2025-02-07 10:52:06 +05:00
parent d2d3bf8c21
commit 73bffcc8f7
8 changed files with 123 additions and 12 deletions

View File

@ -0,0 +1,19 @@
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.domain.models import Role
class RolesRepository:
def __init__(self, db: AsyncSession):
self.db = db
async def get_all(self):
stmt = select(Role)
result = await self.db.execute(stmt)
return result.scalars().all()
async def get_by_id(self, role_id: int):
stmt = select(Role).filter(Role.id == role_id)
result = await self.db.execute(stmt)
return result.scalars().first()

View File

@ -28,7 +28,7 @@ class UsersRepository:
result = await self.db.execute(stmt) result = await self.db.execute(stmt)
return result.scalars().first() return result.scalars().first()
async def create(self, user: User): async def create(self, user: User) -> User:
self.db.add(user) self.db.add(user)
await self.db.commit() await self.db.commit()
await self.db.refresh(user) await self.db.refresh(user)

View File

@ -11,9 +11,9 @@ router = APIRouter()
@router.post( @router.post(
"/login/", "/login/",
response_model=dict, response_model=dict,
responses={401: {"description": "Неверный логин или пароль"}}, responses={401: {"description": "Invalid username or password"}},
summary="Аутентификация пользователя", summary="User authentication",
description="Производит вход пользователя и выдает `access_token` в `cookie`.", description="Logs in the user and outputs the `access_token` in the `cookie'",
) )
async def auth_user( async def auth_user(
response: Response, response: Response,
@ -29,7 +29,7 @@ async def auth_user(
if check is None: if check is None:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверный логин или пароль", detail="Invalid username or password",
) )
access_token = auth_service.create_access_token({"sub": str(check.id)}) access_token = auth_service.create_access_token({"sub": str(check.id)})

View File

@ -4,6 +4,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_db from app.database.session import get_db
from app.domain.entities.register import RegisterEntity from app.domain.entities.register import RegisterEntity
from app.infrastructure.auth_service import AuthService from app.infrastructure.auth_service import AuthService
from app.infrastructure.user_service import UserService
router = APIRouter() router = APIRouter()
@ -11,12 +12,13 @@ router = APIRouter()
@router.post( @router.post(
"/register/", "/register/",
response_model=dict, response_model=dict,
summary="Регистрация пользователя", summary="User Registration",
description="Производит регистрацию пользователя в системе.", description="Performs user registration in the system",
) )
async def register_user( async def register_user(
response: Response,
user_data: RegisterEntity, user_data: RegisterEntity,
db: AsyncSession = Depends(get_db) db: AsyncSession = Depends(get_db)
): ):
return {} user_service = UserService(db)
user = await user_service.register_user(user_data)
return user.model_dump()

View File

@ -3,7 +3,7 @@ from sqlalchemy.orm import sessionmaker
from app.settings import settings from app.settings import settings
engine = create_async_engine(settings.DATABASE_URL, echo=True) engine = create_async_engine(settings.DATABASE_URL, echo=False)
async_session_maker = sessionmaker( async_session_maker = sessionmaker(
bind=engine, class_=AsyncSession, expire_on_commit=False bind=engine, class_=AsyncSession, expire_on_commit=False

View File

@ -0,0 +1,14 @@
from typing import Optional
from pydantic import BaseModel, Field
class UserEntity(BaseModel):
id: int = Field(..., example=1)
first_name: str = Field(..., example='Ivan')
last_name: str = Field(..., example='Ivanov')
patronymic: Optional[str] = Field(None, example='Ivanov')
login: str = Field(..., example='user@example.com')
role_id: int = Field(..., example=1)

View File

@ -9,10 +9,10 @@ from app.settings import get_auth_data
class AuthService: class AuthService:
def __init__(self, db: AsyncSession): def __init__(self, db: AsyncSession):
self.repository = UsersRepository(db) self.user_repository = UsersRepository(db)
async def authenticate_user(self, login: str, password: str): async def authenticate_user(self, login: str, password: str):
user = await self.repository.get_by_login(login) user = await self.user_repository.get_by_login(login)
if not user: if not user:
return None return None

View File

@ -0,0 +1,76 @@
import re
from fastapi import HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.application.roles_repository import RolesRepository
from app.application.users_repository import UsersRepository
from app.domain.entities.register import RegisterEntity
from app.domain.entities.user import UserEntity
from app.domain.models import User
class UserService:
def __init__(self, db: AsyncSession):
self.user_repository = UsersRepository(db)
self.role_repository = RolesRepository(db)
async def register_user(self, register_entity: RegisterEntity) -> UserEntity:
role = await self.role_repository.get_by_id(register_entity.role_id)
if not role:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='The role with this ID was not found'
)
user = await self.user_repository.get_by_login(register_entity.login)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Such a login already exists'
)
if not self.is_strong_password(register_entity.password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Password is too weak. It must contain at least 8 characters, including an uppercase letter, a lowercase letter, a digit, and a special character."
)
user_model = User(
first_name=register_entity.first_name,
last_name=register_entity.last_name,
patronymic=register_entity.patronymic,
login=register_entity.login,
role_id=register_entity.role_id,
)
user_model.set_password(register_entity.password)
created_user = await self.user_repository.create(user_model)
return UserEntity(
id=created_user.id,
first_name=created_user.first_name,
last_name=created_user.last_name,
patronymic=created_user.patronymic,
login=created_user.login,
role_id=created_user.role_id,
)
@staticmethod
def is_strong_password(password: str) -> bool:
if len(password) < 8:
return False
if not re.search(r"[A-Z]", password):
return False
if not re.search(r"[a-z]", password):
return False
if not re.search(r"\d", password):
return False
if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password):
return False
return True