visus-plus/api/app/infrastructure/users_service.py
andrei aadc4bf5bd feat: Админ панель, блокировка пользователей
Добавлена возможность блокировки/разблокировки пользователей администратором.
2025-06-29 10:40:02 +05:00

229 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from typing import Optional
from fastapi import HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.application.roles_repository import RolesRepository
from app.application.users_repository import UsersRepository
from app.domain.entities.change_password import ChangePasswordEntity
from app.domain.entities.register import RegisterEntity
from app.domain.entities.user import UserEntity
from app.domain.models import User
from app.infrastructure.roles_service import RolesService
class UsersService:
def __init__(self, db: AsyncSession):
self.users_repository = UsersRepository(db)
self.roles_repository = RolesRepository(db)
async def get_by_id(self, user_id: int) -> Optional[UserEntity]:
user = await self.users_repository.get_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пользователь не найден',
)
user_entity = self.model_to_entity(user)
user_entity.role = RolesService.model_to_entity(user.role)
return user_entity
async def get_all_users(self) -> list[UserEntity]:
users = await self.users_repository.get_all()
response = []
for user in users:
user_entity = self.model_to_entity(user)
user_entity.role = RolesService.model_to_entity(user.role)
response.append(user_entity)
return response
async def change_password(self, data: ChangePasswordEntity, current_user_id: int) -> Optional[UserEntity]:
user = await self.users_repository.get_by_id(data.user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пользователь не найден',
)
current_user = await self.users_repository.get_by_id(current_user_id)
if not current_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пользователь не найден',
)
if user.id != current_user.id and current_user.role.title != 'Администратор':
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='Доступ запрещен',
)
if data.new_password != data.confirm_password:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пароли не совпали',
)
user.set_password(data.confirm_password)
user = await self.users_repository.update(user)
return self.model_to_entity(user)
async def register_user(self, register_entity: RegisterEntity) -> Optional[UserEntity]:
role = await self.roles_repository.get_by_id(register_entity.role_id)
if not role:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Роль с таким ID не найдена'
)
user = await self.users_repository.get_by_login(register_entity.login)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пользователь с таким логином уже существует'
)
if register_entity.password != register_entity.confirm_password:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пароли не совпали',
)
if not self.is_strong_password(register_entity.confirm_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пароль слишком слабый. Пароль должен содержать не менее 8 символов, включая хотя бы одну букву и одну цифру и один специальный символ."
)
user_model = User(
first_name=register_entity.first_name,
last_name=register_entity.last_name,
patronymic=register_entity.patronymic,
login=register_entity.login,
role_id=register_entity.role_id,
)
user_model.set_password(register_entity.confirm_password)
created_user = await self.users_repository.create(user_model)
return UserEntity(
id=created_user.id,
first_name=created_user.first_name,
last_name=created_user.last_name,
patronymic=created_user.patronymic,
login=created_user.login,
role_id=created_user.role_id,
)
async def set_is_blocked(self, user_id: int, is_blocked: bool, current_user_id: int) -> Optional[UserEntity]:
user_model = await self.users_repository.get_by_id(user_id)
if not user_model:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пользователь не найден',
)
current_user = await self.users_repository.get_by_id(current_user_id)
if not current_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пользователь не найден',
)
if current_user_id == user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Администратор не может заблокировать и разблокировать сам себя',
)
user_model.is_blocked = is_blocked
user_model = await self.users_repository.update(user_model)
return self.model_to_entity(user_model)
async def update_user(self, user: UserEntity, user_id: int, current_user_id: int) -> Optional[UserEntity]:
user_model = await self.users_repository.get_by_id(user_id)
if not user_model:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пользователь не найден',
)
current_user = await self.users_repository.get_by_id(current_user_id)
if not current_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Пользователь не найден',
)
if user_id and current_user.role.title != 'Администратор':
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='Доступ запрещен',
)
user_model.first_name = user.first_name
user_model.last_name = user.last_name
user_model.patronymic = user.patronymic
if current_user.role.title == 'Администратор' and user_id != current_user.id:
user_model.role_id = user.role_id
user_model = await self.users_repository.update(user_model)
return self.model_to_entity(user_model)
@staticmethod
def is_strong_password(password: str) -> bool:
if len(password) < 8:
return False
if not re.search(r"[A-Z]", password):
return False
if not re.search(r"[a-z]", password):
return False
if not re.search(r"\d", password):
return False
if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password):
return False
return True
@staticmethod
def entity_to_model(user: UserEntity) -> User:
user_model = User(
first_name=user.first_name,
last_name=user.last_name,
patronymic=user.patronymic,
login=user.login,
is_blocked=user.is_blocked,
)
if user.id is not None:
user_model.id = user.id
return user_model
@staticmethod
def model_to_entity(user: User) -> UserEntity:
return UserEntity(
id=user.id,
first_name=user.first_name,
last_name=user.last_name,
patronymic=user.patronymic,
login=user.login,
role_id=user.role_id,
is_blocked=user.is_blocked,
)