87 lines
2.8 KiB
Python
87 lines
2.8 KiB
Python
from typing import Sequence, Optional, Tuple, Literal
|
|
|
|
from sqlalchemy import select, func, or_
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.domain.models import Patient
|
|
|
|
|
|
class PatientsRepository:
    """Data-access helper that reads and writes ``Patient`` rows.

    Every method runs against the ``AsyncSession`` supplied at construction
    time. The write methods (``create``, ``update``, ``delete``) commit the
    session themselves.
    """

    def __init__(self, db: AsyncSession):
        """Store the session that all repository calls will use."""
        self.db = db

    @staticmethod
    def _search_condition(search: str):
        """Build the OR-condition used by ``get_all`` for free-text search.

        The term is wrapped in ``%`` on both sides and matched
        case-insensitively (``ILIKE``) against the last name, first name,
        patronymic, e-mail and phone columns.
        """
        pattern = f"%{search}%"
        return or_(
            Patient.last_name.ilike(pattern),
            Patient.first_name.ilike(pattern),
            Patient.patronymic.ilike(pattern),
            Patient.email.ilike(pattern),
            Patient.phone.ilike(pattern),
        )

    async def get_all(
            self,
            skip: int = 0,
            limit: int = 100,
            search: Optional[str] = None,
            sort_order: Literal["asc", "desc"] = "asc",
            all_params: bool = False,
    ) -> Tuple[Sequence[Patient], int]:
        """Return patients sorted by last name together with the total count.

        Args:
            skip: Number of rows to skip; ignored when ``all_params`` is true.
            limit: Maximum number of rows to return; ignored when
                ``all_params`` is true.
            search: Optional substring to look for. ``None`` or an empty
                string disables filtering.
            sort_order: ``"desc"`` sorts by last name descending; any other
                value sorts ascending.
            all_params: When true, no OFFSET/LIMIT is applied and every
                matching row is returned.

        Returns:
            A ``(patients, total_count)`` tuple. ``total_count`` is the number
            of rows matching ``search`` regardless of pagination.
        """
        stmt = select(Patient)
        count_stmt = select(func.count()).select_from(Patient)

        if search:
            # Build the condition once so the row query and the count query
            # are guaranteed to filter on exactly the same pattern.
            condition = self._search_condition(search)
            stmt = stmt.filter(condition)
            count_stmt = count_stmt.filter(condition)

        if sort_order == "desc":
            last_name_order = Patient.last_name.desc()
        else:
            last_name_order = Patient.last_name.asc()
        # The primary key breaks ties between equal last names; without it
        # OFFSET/LIMIT pages may repeat or skip rows between requests.
        stmt = stmt.order_by(last_name_order, Patient.id)

        # Pagination is the only part that depends on ``all_params``.
        if not all_params:
            stmt = stmt.offset(skip).limit(limit)

        result = await self.db.execute(stmt)
        patients = result.scalars().all()

        count_result = await self.db.execute(count_stmt)
        # An aggregate SELECT always yields exactly one row.
        total_count = count_result.scalar_one()

        return patients, total_count

    async def get_width_email(self) -> Sequence[Patient]:
        """Return every patient whose ``email`` column is not NULL.

        NOTE(review): the method name looks like a typo for "with"; it is
        kept unchanged because callers outside this file may use it.
        """
        stmt = select(Patient).filter(Patient.email.is_not(None))
        result = await self.db.execute(stmt)
        return result.scalars().all()

    async def get_by_id(self, patient_id: int) -> Optional[Patient]:
        """Return the patient with the given ``id``, or ``None`` if absent."""
        stmt = select(Patient).filter_by(id=patient_id)
        result = await self.db.execute(stmt)
        return result.scalars().first()

    async def create(self, patient: Patient) -> Patient:
        """Add ``patient`` to the session, commit, and return it refreshed."""
        self.db.add(patient)
        await self.db.commit()
        # Reload the row so values produced during the flush/commit are
        # present on the returned object.
        await self.db.refresh(patient)
        return patient

    async def update(self, patient: Patient) -> Patient:
        """Merge ``patient`` into the session and commit.

        The object passed in is returned as-is; the instance produced by
        ``merge`` is not used and no refresh is performed.
        """
        await self.db.merge(patient)
        await self.db.commit()
        return patient

    async def delete(self, patient: Patient) -> Patient:
        """Delete ``patient``, commit, and return the same object."""
        await self.db.delete(patient)
        await self.db.commit()
        return patient
|