visus-plus/api/app/infrastructure/patients_service.py
andrei 44c7f031cc feat: Добавлена страница рассылок
Добавлена страница для отправки email рассылок пациентам.
2025-07-03 14:42:54 +05:00

104 lines
3.8 KiB
Python

from typing import Optional, Tuple, Literal
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status
from app.application.patients_repository import PatientsRepository
from app.domain.entities.patient import PatientEntity
from app.domain.models import Patient
class PatientsService:
def __init__(self, db: AsyncSession):
self.patient_repository = PatientsRepository(db)
async def get_all_patients(self, page: int = 1, page_size: int = 10, search: str = None,
sort_order: Literal["asc", "desc"] = "asc", all_params: bool = False) -> Tuple[
list[PatientEntity], int]:
skip = (page - 1) * page_size
patients, total_count = await self.patient_repository.get_all(skip=skip, limit=page_size, search=search,
sort_order=sort_order, all_params=all_params)
return (
[self.model_to_entity(patient) for patient in patients],
total_count
)
async def get_patients_with_email(self) -> list[PatientEntity]:
patients = await self.patient_repository.get_width_email()
return [
self.model_to_entity(patient)
for patient in patients
]
async def create_patient(self, patient: PatientEntity) -> PatientEntity:
patient_model = self.entity_to_model(patient)
await self.patient_repository.create(patient_model)
return self.model_to_entity(patient_model)
async def update_patient(self, patient_id: int, patient: PatientEntity) -> Optional[PatientEntity]:
patient_model = await self.patient_repository.get_by_id(patient_id)
if not patient_model:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Пациент не найден")
patient_model.first_name = patient.first_name
patient_model.last_name = patient.last_name
patient_model.patronymic = patient.patronymic
patient_model.birthday = patient.birthday
patient_model.address = patient.address
patient_model.email = patient.email
patient_model.phone = patient.phone
patient_model.diagnosis = patient.diagnosis
patient_model.correction = patient.correction
await self.patient_repository.update(patient_model)
return self.model_to_entity(patient_model)
async def delete_patient(self, patient_id: int) -> Optional[PatientEntity]:
patient = await self.patient_repository.get_by_id(patient_id)
if not patient:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Пациент не найден")
result = await self.patient_repository.delete(patient)
return self.model_to_entity(result)
@staticmethod
def model_to_entity(patient: Patient) -> PatientEntity:
return PatientEntity(
id=patient.id,
first_name=patient.first_name,
last_name=patient.last_name,
patronymic=patient.patronymic,
birthday=patient.birthday,
address=patient.address,
email=patient.email,
phone=patient.phone,
diagnosis=patient.diagnosis,
correction=patient.correction,
)
@staticmethod
def entity_to_model(patient: PatientEntity) -> Patient:
patient_model = Patient(
first_name=patient.first_name,
last_name=patient.last_name,
patronymic=patient.patronymic,
birthday=patient.birthday,
address=patient.address,
email=patient.email,
phone=patient.phone,
diagnosis=patient.diagnosis,
correction=patient.correction,
)
if patient.id is not None:
patient_model.id = patient.id
return patient_model