229 lines
8.3 KiB
Python
229 lines
8.3 KiB
Python
import re
|
||
from typing import Optional
|
||
|
||
from fastapi import HTTPException, status
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.application.roles_repository import RolesRepository
|
||
from app.application.users_repository import UsersRepository
|
||
from app.domain.entities.change_password import ChangePasswordEntity
|
||
from app.domain.entities.register import RegisterEntity
|
||
from app.domain.entities.user import UserEntity
|
||
from app.domain.models import User
|
||
from app.infrastructure.roles_service import RolesService
|
||
|
||
|
||
class UsersService:
    """Service layer for user accounts.

    Wraps ``UsersRepository`` and ``RolesRepository`` and exposes lookup,
    registration, password change, blocking and profile update operations.
    Every validation or authorization failure is reported by raising
    ``fastapi.HTTPException``.
    """

    # Title of the role that may act on other users' accounts.
    _ADMIN_ROLE_TITLE = 'Администратор'

    _USER_NOT_FOUND_DETAIL = 'Пользователь не найден'

    # Must describe exactly what ``is_strong_password`` enforces.
    _WEAK_PASSWORD_DETAIL = (
        "Пароль слишком слабый. Пароль должен содержать не менее 8 символов, "
        "включая хотя бы одну заглавную и одну строчную латинскую букву, "
        "одну цифру и один специальный символ."
    )

    _MIN_PASSWORD_LENGTH = 8

    # A password must match every one of these patterns at least once.
    _REQUIRED_PASSWORD_PATTERNS = (
        re.compile(r"[A-Z]"),
        re.compile(r"[a-z]"),
        re.compile(r"\d"),
        re.compile(r"[!@#$%^&*(),.?\":{}|<>]"),
    )

    def __init__(self, db: AsyncSession):
        """Create the repositories this service works with on top of ``db``."""
        self.users_repository = UsersRepository(db)
        self.roles_repository = RolesRepository(db)

    async def _get_user_or_400(self, user_id: int) -> User:
        """Return the user with ``user_id`` or raise HTTP 400 if the repository returns nothing."""
        user = await self.users_repository.get_by_id(user_id)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=self._USER_NOT_FOUND_DETAIL,
            )
        return user

    @classmethod
    def _is_admin(cls, user: User) -> bool:
        """Tell whether ``user`` holds the administrator role (compared by role title)."""
        # NOTE(review): reads ``user.role``; assumes the repository returns
        # users with the role relationship already loaded - confirm.
        return user.role.title == cls._ADMIN_ROLE_TITLE

    def _ensure_strong_password(self, password: str) -> None:
        """Raise HTTP 400 when ``password`` fails ``is_strong_password``."""
        if not self.is_strong_password(password):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=self._WEAK_PASSWORD_DETAIL,
            )

    def _to_entity_with_role(self, user: User) -> UserEntity:
        """Convert ``user`` to an entity and attach its role entity."""
        user_entity = self.model_to_entity(user)
        user_entity.role = RolesService.model_to_entity(user.role)
        return user_entity

    async def get_by_id(self, user_id: int) -> Optional[UserEntity]:
        """Return the user with ``user_id`` together with its role.

        Raises:
            HTTPException: 400 when no such user exists.
        """
        user = await self._get_user_or_400(user_id)
        return self._to_entity_with_role(user)

    async def get_all_users(self) -> list[UserEntity]:
        """Return every user from the repository, each with its role attached."""
        users = await self.users_repository.get_all()
        return [self._to_entity_with_role(user) for user in users]

    async def change_password(self, data: ChangePasswordEntity, current_user_id: int) -> Optional[UserEntity]:
        """Set a new password for the user referenced by ``data.user_id``.

        A user may change their own password; changing somebody else's
        password requires the administrator role.

        Raises:
            HTTPException: 400 when either user is not found, when the new
                password and its confirmation differ, or when the new
                password is too weak; 403 when a non-administrator targets
                another user.
        """
        user = await self._get_user_or_400(data.user_id)
        current_user = await self._get_user_or_400(current_user_id)

        if user.id != current_user.id and not self._is_admin(current_user):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail='Доступ запрещен',
            )

        if data.new_password != data.confirm_password:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail='Пароли не совпали',
            )

        # Apply the same policy as registration so that it cannot be
        # sidestepped by changing the password afterwards.
        self._ensure_strong_password(data.confirm_password)

        user.set_password(data.confirm_password)
        user = await self.users_repository.update(user)

        return self.model_to_entity(user)

    async def register_user(self, register_entity: RegisterEntity) -> Optional[UserEntity]:
        """Create a new user from ``register_entity``.

        Checks are performed in this order: the role exists, the login is
        free, the password matches its confirmation, the password is strong.

        Raises:
            HTTPException: 400 when any of the checks above fails.
        """
        role = await self.roles_repository.get_by_id(register_entity.role_id)
        if not role:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail='Роль с таким ID не найдена',
            )

        existing_user = await self.users_repository.get_by_login(register_entity.login)
        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail='Пользователь с таким логином уже существует',
            )

        if register_entity.password != register_entity.confirm_password:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail='Пароли не совпали',
            )

        self._ensure_strong_password(register_entity.confirm_password)

        user_model = User(
            first_name=register_entity.first_name,
            last_name=register_entity.last_name,
            patronymic=register_entity.patronymic,
            login=register_entity.login,
            role_id=register_entity.role_id,
        )
        user_model.set_password(register_entity.confirm_password)

        created_user = await self.users_repository.create(user_model)

        # Built field by field (without ``is_blocked``) rather than through
        # ``model_to_entity``; kept as in the original implementation.
        return UserEntity(
            id=created_user.id,
            first_name=created_user.first_name,
            last_name=created_user.last_name,
            patronymic=created_user.patronymic,
            login=created_user.login,
            role_id=created_user.role_id,
        )

    async def set_is_blocked(self, user_id: int, is_blocked: bool, current_user_id: int) -> Optional[UserEntity]:
        """Block or unblock the user with ``user_id``.

        NOTE(review): this method does not check the acting user's role;
        presumably the caller restricts it to administrators - confirm.

        Raises:
            HTTPException: 400 when either user is not found or when the
                acting user targets their own account.
        """
        user_model = await self._get_user_or_400(user_id)
        # The acting user is looked up only to verify that it exists.
        await self._get_user_or_400(current_user_id)

        if current_user_id == user_id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail='Администратор не может заблокировать и разблокировать сам себя',
            )

        user_model.is_blocked = is_blocked
        user_model = await self.users_repository.update(user_model)

        return self.model_to_entity(user_model)

    async def update_user(self, user: UserEntity, user_id: int, current_user_id: int) -> Optional[UserEntity]:
        """Update the name fields (and possibly the role) of the user with ``user_id``.

        A user may edit their own profile; editing another user requires the
        administrator role. The role is changed only when an administrator
        edits somebody else, so nobody can change their own role here.

        Raises:
            HTTPException: 400 when either user is not found; 403 when a
                non-administrator targets another user.
        """
        user_model = await self._get_user_or_400(user_id)
        current_user = await self._get_user_or_400(current_user_id)

        is_admin = self._is_admin(current_user)
        edits_other_user = user_id != current_user.id

        # The original guard tested the truthiness of ``user_id`` and thereby
        # rejected every non-administrator, including self-edits.
        if edits_other_user and not is_admin:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail='Доступ запрещен',
            )

        user_model.first_name = user.first_name
        user_model.last_name = user.last_name
        user_model.patronymic = user.patronymic

        if is_admin and edits_other_user:
            user_model.role_id = user.role_id

        user_model = await self.users_repository.update(user_model)

        return self.model_to_entity(user_model)

    @staticmethod
    def is_strong_password(password: str) -> bool:
        """Return ``True`` when ``password`` satisfies the password policy.

        The policy: at least 8 characters, containing at least one ASCII
        uppercase letter, one ASCII lowercase letter, one digit and one
        character from ``!@#$%^&*(),.?":{}|<>``.
        """
        if len(password) < UsersService._MIN_PASSWORD_LENGTH:
            return False

        return all(
            pattern.search(password)
            for pattern in UsersService._REQUIRED_PASSWORD_PATTERNS
        )

    @staticmethod
    def entity_to_model(user: UserEntity) -> User:
        """Build a ``User`` model from ``user``; the id is copied only when it is set."""
        # NOTE(review): ``role_id`` is not copied here although
        # ``model_to_entity`` maps it - confirm whether that is intended.
        user_model = User(
            first_name=user.first_name,
            last_name=user.last_name,
            patronymic=user.patronymic,
            login=user.login,
            is_blocked=user.is_blocked,
        )

        if user.id is not None:
            user_model.id = user.id

        return user_model

    @staticmethod
    def model_to_entity(user: User) -> UserEntity:
        """Build a ``UserEntity`` from the scalar fields of ``user`` (the role object is not included)."""
        return UserEntity(
            id=user.id,
            first_name=user.first_name,
            last_name=user.last_name,
            patronymic=user.patronymic,
            login=user.login,
            role_id=user.role_id,
            is_blocked=user.is_blocked,
        )
|