visus-plus/api/app/infrastructure/users_service.py

140 lines
4.5 KiB
Python

import re
from typing import Optional
from fastapi import HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.application.roles_repository import RolesRepository
from app.application.users_repository import UsersRepository
from app.domain.entities.register import RegisterEntity
from app.domain.entities.user import UserEntity
from app.domain.models import User
class UsersService:
def __init__(self, db: AsyncSession):
self.users_repository = UsersRepository(db)
self.roles_repository = RolesRepository(db)
async def get_by_id(self, user_id: int) -> Optional[UserEntity]:
user = await self.users_repository.get_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='User was not found',
)
return self.model_to_entity(user)
async def change_password(self, user_id: int, new_password: str, current_user_id: int) -> Optional[UserEntity]:
user = await self.users_repository.get_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='User was not found',
)
current_user = await self.users_repository.get_by_id(current_user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='User was not found',
)
if user.id != current_user.id and current_user.role.title != 'Администратор':
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='Permission denied',
)
user.set_password(new_password)
user = await self.users_repository.update(user)
return self.model_to_entity(user)
async def register_user(self, register_entity: RegisterEntity) -> Optional[UserEntity]:
role = await self.roles_repository.get_by_id(register_entity.role_id)
if not role:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='The role with this ID was not found'
)
user = await self.users_repository.get_by_login(register_entity.login)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Such a login already exists'
)
if not self.is_strong_password(register_entity.password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Password is too weak. It must contain at least 8 characters, including an uppercase letter, a lowercase letter, a digit, and a special character."
)
user_model = User(
first_name=register_entity.first_name,
last_name=register_entity.last_name,
patronymic=register_entity.patronymic,
login=register_entity.login,
role_id=register_entity.role_id,
)
user_model.set_password(register_entity.password)
created_user = await self.users_repository.create(user_model)
return UserEntity(
id=created_user.id,
first_name=created_user.first_name,
last_name=created_user.last_name,
patronymic=created_user.patronymic,
login=created_user.login,
role_id=created_user.role_id,
)
@staticmethod
def is_strong_password(password: str) -> bool:
if len(password) < 8:
return False
if not re.search(r"[A-Z]", password):
return False
if not re.search(r"[a-z]", password):
return False
if not re.search(r"\d", password):
return False
if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password):
return False
return True
@staticmethod
def entity_to_model(user: UserEntity) -> User:
user_model = User(
first_name=user.first_name,
last_name=user.last_name,
patronymic=user.patronymic,
login=user.login,
)
if user.id is not None:
user_model.id = user.id
return user_model
@staticmethod
def model_to_entity(user: User) -> UserEntity:
return UserEntity(
id=user.id,
first_name=user.first_name,
last_name=user.last_name,
patronymic=user.patronymic,
login=user.login,
role_id=user.role_id,
)