146 lines
5.0 KiB
Python

from typing import Optional
from fastapi import HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.application.profiles_repository import ProfilesRepository
from app.application.roles_repository import RolesRepository
from app.application.teams_repository import TeamsRepository
from app.application.users_repository import UsersRepository
from app.domain.entities.profile import ProfileEntity
from app.domain.entities.register import RegisterEntity
from app.domain.entities.user import UserEntity
from app.domain.models import User, Profile
class UsersService:
def __init__(self, db: AsyncSession):
self.users_repository = UsersRepository(db)
self.teams_repository = TeamsRepository(db)
self.roles_repository = RolesRepository(db)
self.profiles_repository = ProfilesRepository(db)
async def register_user(self, register_entity: RegisterEntity) -> Optional[UserEntity]:
team = await self.teams_repository.get_by_id(register_entity.team_id)
if team is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="The team with this ID was not found",
)
role = await self.roles_repository.get_by_id(register_entity.role_id)
if role is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="The role with this ID was not found",
)
if not self.is_strong_password(register_entity.password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="The password is too weak",
)
user_model, profile_model = self.register_entity_to_models(register_entity)
profile_model = await self.profiles_repository.create(profile_model)
user_model.profile_id = profile_model.id
user_model = await self.users_repository.create(user_model)
user_entity = self.user_model_to_entity(user_model)
user_entity.profile = self.profile_model_to_entity(profile_model)
return user_entity
async def change_user_password(self, user_id: int, new_password: str, user: User) -> Optional[UserEntity]:
user_model = await self.users_repository.get_by_id(user_id)
if user_model is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="The user with this ID was not found",
)
if user_model.id != user_id and user.profile.role.title != 'Администратор':
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Permission denied",
)
if not self.is_strong_password(new_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="The password is too weak",
)
user_model.set_password(new_password)
user_model = await self.users_repository.update(user_model)
return self.user_model_to_entity(user_model)
@staticmethod
def is_strong_password(password):
if len(password) < 8:
return False
if not any(char.isupper() for char in password):
return False
if not any(char.islower() for char in password):
return False
if not any(char.isdigit() for char in password):
return False
if not any(char in "!@#$%^&*()_+" for char in password):
return False
if not any(char.isalpha() for char in password):
return False
return True
@staticmethod
def register_entity_to_models(register_entity: RegisterEntity) -> tuple[User, Profile]:
user = User(
login=register_entity.login,
)
user.set_password(register_entity.password)
pofile = Profile(
first_name=register_entity.first_name,
last_name=register_entity.last_name,
patronymic=register_entity.patronymic,
birthday=register_entity.birthday,
email=register_entity.email,
phone=register_entity.phone,
role_id=register_entity.role_id,
team_id=register_entity.team_id
)
return user, pofile
@staticmethod
def profile_model_to_entity(profile_model: Profile) -> ProfileEntity:
return ProfileEntity(
id=profile_model.id,
first_name=profile_model.first_name,
last_name=profile_model.last_name,
patronymic=profile_model.patronymic,
birthday=profile_model.birthday,
email=profile_model.email,
phone=profile_model.phone,
role_id=profile_model.role_id,
team_id=profile_model.team_id,
)
@staticmethod
def user_model_to_entity(user_model: User) -> UserEntity:
return UserEntity(
id=user_model.id,
login=user_model.login,
profile_id=user_model.profile_id,
)