psb_hack/api/app/infrastructure/users_service.py

106 lines
4.1 KiB
Python

from typing import Optional, List
from fastapi import HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.application.users_repository import UsersRepository
from app.domain.entities.users import UserRead, UserUpdate, PasswordChangeRequest
from app.domain.models import User
from app.infrastructure.register_service import RegisterService
from app.settings import Settings
class UsersService:
def __init__(self, db: AsyncSession):
self.users_repository = UsersRepository(db)
self.settings = Settings()
async def get_all(self) -> List[UserRead]:
users = await self.users_repository.get_all()
response = []
for user in users:
response.append(UserRead.model_validate(user))
return response
async def get_by_id(self, user_id: int) -> UserRead:
user = await self.users_repository.get_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пользователь не найден',
)
return UserRead.model_validate(user)
async def update(self, user_id: int, user: UserUpdate, current_user: User) -> Optional[UserRead]:
user_model = await self.users_repository.get_by_id(user_id)
if not user_model:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пользователь не найден',
)
if current_user.id != user_model.id and current_user.role.title != self.settings.root_role_name:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='Доступ запрещен',
)
user_model.first_name = user.first_name
user_model.last_name = user.last_name
user_model.patronymic = user.patronymic
user_model.email = user.email
user_model.birthdate = user.birthdate
if current_user.role.title == self.settings.root_role_name and user.role_id is not None:
user_model.role_id = user.role_id
user_model.status_id = user.status_id
user_model = await self.users_repository.update(user_model)
return UserRead.model_validate(user_model)
async def change_password(
self, user_id: int, password_data: PasswordChangeRequest, current_user: User
) -> Optional[UserRead]:
user_model = await self.users_repository.get_by_id(user_id)
if not user_model:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пользователь не найден',
)
if current_user.id != user_model.id and current_user.role.title != self.settings.root_role_name:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='Доступ запрещен',
)
if password_data.password != password_data.repeat_password:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пароли не совпадают',
)
if not RegisterService.is_strong_password(password_data.repeat_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пароль слишком слабый. Пароль должен содержать не менее 8 символов, включая хотя бы одну букву и одну цифру и один специальный символ.'
)
user_model.set_password(password_data.password)
user_model = await self.users_repository.update(user_model)
return UserRead.model_validate(user_model)
async def get_by_role_name(self, role_name: str) -> List[UserRead]:
users = await self.users_repository.get_by_role_name(role_name)
response = []
for user in users:
response.append(UserRead.model_validate(user))
return response