import re from typing import Optional from fastapi import HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from app.application.roles_repository import RolesRepository from app.application.users_repository import UsersRepository from app.domain.entities.change_password import ChangePasswordEntity from app.domain.entities.register import RegisterEntity from app.domain.entities.role import RoleEntity from app.domain.entities.user import UserEntity from app.domain.models import User, Role class UsersService: def __init__(self, db: AsyncSession): self.users_repository = UsersRepository(db) self.roles_repository = RolesRepository(db) async def get_by_id(self, user_id: int) -> Optional[UserEntity]: user = await self.users_repository.get_by_id(user_id) if not user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='User was not found', ) user_entity = self.model_to_entity(user) user_entity.role = self.role_model_to_entity(user.role) return user_entity async def change_password(self, data: ChangePasswordEntity, current_user_id: int) -> Optional[UserEntity]: user = await self.users_repository.get_by_id(data.user_id) if not user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='User was not found', ) current_user = await self.users_repository.get_by_id(current_user_id) if not current_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='User was not found', ) if user.id != current_user.id and current_user.role.title != 'Администратор': raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail='Permission denied', ) if not user.check_password(data.current_password): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail='Permission denied', ) if data.new_password != data.confirm_password: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='Password not matched', ) user.set_password(data.confirm_password) user = await self.users_repository.update(user) return self.model_to_entity(user) async def register_user(self, register_entity: RegisterEntity) -> Optional[UserEntity]: role = await self.roles_repository.get_by_id(register_entity.role_id) if not role: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='The role with this ID was not found' ) user = await self.users_repository.get_by_login(register_entity.login) if user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='Such a login already exists' ) if not self.is_strong_password(register_entity.password): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Password is too weak. It must contain at least 8 characters, including an uppercase letter, a lowercase letter, a digit, and a special character." ) user_model = User( first_name=register_entity.first_name, last_name=register_entity.last_name, patronymic=register_entity.patronymic, login=register_entity.login, role_id=register_entity.role_id, ) user_model.set_password(register_entity.password) created_user = await self.users_repository.create(user_model) return UserEntity( id=created_user.id, first_name=created_user.first_name, last_name=created_user.last_name, patronymic=created_user.patronymic, login=created_user.login, role_id=created_user.role_id, ) async def update_user(self, user: UserEntity, user_id: int, current_user_id: int) -> Optional[UserEntity]: user_model = await self.users_repository.get_by_id(user_id) if not user_model: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='User was not found', ) current_user = await self.users_repository.get_by_id(current_user_id) if not current_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail='User was not found', ) if user.id != current_user.id and current_user.role.title != 'Администратор': raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail='Permission denied', ) user_model.first_name = user.first_name user_model.last_name = user.last_name user_model.patronymic = user.patronymic user_model = await self.users_repository.update(user_model) return self.model_to_entity(user_model) @staticmethod def role_model_to_entity(role: Role) -> RoleEntity: return RoleEntity( id=role.id, title=role.title, ) @staticmethod def is_strong_password(password: str) -> bool: if len(password) < 8: return False if not re.search(r"[A-Z]", password): return False if not re.search(r"[a-z]", password): return False if not re.search(r"\d", password): return False if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password): return False return True @staticmethod def entity_to_model(user: UserEntity) -> User: user_model = User( first_name=user.first_name, last_name=user.last_name, patronymic=user.patronymic, login=user.login, ) if user.id is not None: user_model.id = user.id return user_model @staticmethod def model_to_entity(user: User) -> UserEntity: return UserEntity( id=user.id, first_name=user.first_name, last_name=user.last_name, patronymic=user.patronymic, login=user.login, role_id=user.role_id, )