import re from typing import Optional from fastapi import HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from app.application.roles_repository import RolesRepository from app.application.users_repository import UsersRepository from app.domain.entities.change_password import ChangePasswordEntity from app.domain.entities.register import RegisterEntity from app.domain.entities.user import UserEntity from app.domain.models import User from app.infrastructure.roles_service import RolesService class UsersService: def __init__(self, db: AsyncSession): self.users_repository = UsersRepository(db) self.roles_repository = RolesRepository(db) async def get_by_id(self, user_id: int) -> Optional[UserEntity]: user = await self.users_repository.get_by_id(user_id) if not user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='Пользователь не найден', ) user_entity = self.model_to_entity(user) user_entity.role = RolesService.model_to_entity(user.role) return user_entity async def get_all_users(self) -> list[UserEntity]: users = await self.users_repository.get_all() response = [] for user in users: user_entity = self.model_to_entity(user) user_entity.role = RolesService.model_to_entity(user.role) response.append(user_entity) return response async def change_password(self, data: ChangePasswordEntity, current_user_id: int) -> Optional[UserEntity]: user = await self.users_repository.get_by_id(data.user_id) if not user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='Пользователь не найден', ) current_user = await self.users_repository.get_by_id(current_user_id) if not current_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='Пользователь не найден', ) if user.id != current_user.id and current_user.role.title != 'Администратор': raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail='Доступ запрещен', ) if not user.check_password(data.current_password): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail='Доступ запрещен', ) if data.new_password != data.confirm_password: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='Пароли не совпали', ) user.set_password(data.confirm_password) user = await self.users_repository.update(user) return self.model_to_entity(user) async def register_user(self, register_entity: RegisterEntity) -> Optional[UserEntity]: role = await self.roles_repository.get_by_id(register_entity.role_id) if not role: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='Роль с таким ID не найдена' ) user = await self.users_repository.get_by_login(register_entity.login) if user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='Пользователь с таким логином уже существует' ) if register_entity.password != register_entity.confirm_password: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='Пароли не совпали', ) if not self.is_strong_password(register_entity.confirm_password): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Пароль слишком слабый. Пароль должен содержать не менее 8 символов, включая хотя бы одну букву и одну цифру и один специальный символ." ) user_model = User( first_name=register_entity.first_name, last_name=register_entity.last_name, patronymic=register_entity.patronymic, login=register_entity.login, role_id=register_entity.role_id, ) user_model.set_password(register_entity.confirm_password) created_user = await self.users_repository.create(user_model) return UserEntity( id=created_user.id, first_name=created_user.first_name, last_name=created_user.last_name, patronymic=created_user.patronymic, login=created_user.login, role_id=created_user.role_id, ) async def update_user(self, user: UserEntity, user_id: int, current_user_id: int) -> Optional[UserEntity]: user_model = await self.users_repository.get_by_id(user_id) if not user_model: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='Пользователь не найден', ) current_user = await self.users_repository.get_by_id(current_user_id) if not current_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='Пользователь не найден', ) if user.id != current_user.id and current_user.role.title != 'Администратор': raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail='Доступ запрещен', ) user_model.first_name = user.first_name user_model.last_name = user.last_name user_model.patronymic = user.patronymic user_model = await self.users_repository.update(user_model) return self.model_to_entity(user_model) @staticmethod def is_strong_password(password: str) -> bool: if len(password) < 8: return False if not re.search(r"[A-Z]", password): return False if not re.search(r"[a-z]", password): return False if not re.search(r"\d", password): return False if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password): return False return True @staticmethod def entity_to_model(user: UserEntity) -> User: user_model = User( first_name=user.first_name, last_name=user.last_name, patronymic=user.patronymic, login=user.login, ) if user.id is not None: user_model.id = user.id return user_model @staticmethod def model_to_entity(user: User) -> UserEntity: return UserEntity( id=user.id, first_name=user.first_name, last_name=user.last_name, patronymic=user.patronymic, login=user.login, role_id=user.role_id, )