visus-plus/api/app/application/scheduled_appointments_repository.py

105 lines
4.6 KiB
Python

from typing import Sequence
from sqlalchemy import select, desc, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from datetime import date
from app.domain.models import ScheduledAppointment
class ScheduledAppointmentsRepository:
    """Data-access layer for ``ScheduledAppointment`` rows on an async session.

    Every read issued here skips rows whose ``is_canceled`` flag is set and
    eagerly joins the ``type``, ``patient`` and ``doctor`` relationships.
    """

    def __init__(self, db: AsyncSession):
        """Keep the session that all queries and writes will go through."""
        self.db = db

    @staticmethod
    def _select_active(**criteria: object):
        """Build the SELECT shared by every read method.

        ``criteria`` are extra equality filters (for example ``doctor_id=3``)
        applied together with ``is_canceled=False``.
        """
        return (
            select(ScheduledAppointment)
            .options(
                joinedload(ScheduledAppointment.type),
                joinedload(ScheduledAppointment.patient),
                joinedload(ScheduledAppointment.doctor),
            )
            .filter_by(**criteria, is_canceled=False)
        )

    @staticmethod
    def _within_dates(stmt, start_date: date | None, end_date: date | None):
        """Restrict ``stmt`` to the given bounds; ``None`` leaves a side open.

        NOTE(review): the bounds are plain dates compared directly against
        ``scheduled_datetime``. If the database treats a bare date as midnight,
        appointments later on ``end_date`` itself are excluded - confirm that
        callers expect this.
        """
        if start_date is not None:
            stmt = stmt.filter(ScheduledAppointment.scheduled_datetime >= start_date)
        if end_date is not None:
            stmt = stmt.filter(ScheduledAppointment.scheduled_datetime <= end_date)
        return stmt

    async def _fetch_all(self, stmt) -> Sequence[ScheduledAppointment]:
        """Execute ``stmt`` and return every resulting entity."""
        result = await self.db.execute(stmt)
        return result.scalars().all()

    async def _list_newest_first(
        self,
        start_date: date | None,
        end_date: date | None,
        **criteria: object,
    ) -> Sequence[ScheduledAppointment]:
        """List active appointments matching ``criteria``, latest first."""
        stmt = self._select_active(**criteria).order_by(
            desc(ScheduledAppointment.scheduled_datetime)
        )
        return await self._fetch_all(self._within_dates(stmt, start_date, end_date))

    async def get_all(
        self,
        start_date: date | None = None,
        end_date: date | None = None,
    ) -> Sequence[ScheduledAppointment]:
        """Return all non-cancelled appointments, newest first.

        ``start_date`` / ``end_date`` optionally bound ``scheduled_datetime``.
        """
        return await self._list_newest_first(start_date, end_date)

    async def get_by_doctor_id(
        self,
        doctor_id: int,
        start_date: date | None = None,
        end_date: date | None = None,
    ) -> Sequence[ScheduledAppointment]:
        """Return one doctor's non-cancelled appointments, newest first.

        ``start_date`` / ``end_date`` optionally bound ``scheduled_datetime``.
        """
        return await self._list_newest_first(start_date, end_date, doctor_id=doctor_id)

    async def get_upcoming_by_doctor_id(self, doctor_id: int) -> Sequence[ScheduledAppointment]:
        """Return at most five of a doctor's upcoming appointments, soonest first.

        "Upcoming" means ``scheduled_datetime`` is at or after the database's
        ``now()`` at query time.
        """
        stmt = (
            self._select_active(doctor_id=doctor_id)
            .filter(ScheduledAppointment.scheduled_datetime >= func.now())
            .order_by(ScheduledAppointment.scheduled_datetime)
            .limit(5)
        )
        return await self._fetch_all(stmt)

    async def get_by_patient_id(
        self,
        patient_id: int,
        start_date: date | None = None,
        end_date: date | None = None,
    ) -> Sequence[ScheduledAppointment]:
        """Return one patient's non-cancelled appointments, newest first.

        ``start_date`` / ``end_date`` optionally bound ``scheduled_datetime``.
        """
        return await self._list_newest_first(start_date, end_date, patient_id=patient_id)

    async def get_by_id(self, scheduled_appointment_id: int) -> ScheduledAppointment | None:
        """Return the non-cancelled appointment with this id, or ``None``.

        ``None`` is returned both when the id does not exist and when the
        matching row is cancelled, because the cancelled filter still applies.
        """
        result = await self.db.execute(self._select_active(id=scheduled_appointment_id))
        return result.scalars().first()

    async def create(self, scheduled_appointment: ScheduledAppointment) -> ScheduledAppointment:
        """Add the appointment, commit, refresh it from the database, return it."""
        self.db.add(scheduled_appointment)
        await self.db.commit()
        # Reload after commit so the returned object reflects the stored row.
        await self.db.refresh(scheduled_appointment)
        return scheduled_appointment

    async def update(self, scheduled_appointment: ScheduledAppointment) -> ScheduledAppointment:
        """Merge the appointment's state into the session, commit, return it.

        NOTE(review): the instance returned by ``merge()`` is discarded and the
        caller's own object is handed back without a refresh - confirm this is
        what callers rely on.
        """
        await self.db.merge(scheduled_appointment)
        await self.db.commit()
        return scheduled_appointment

    async def delete(self, scheduled_appointment: ScheduledAppointment) -> ScheduledAppointment:
        """Delete the appointment through the session, commit, return the object."""
        await self.db.delete(scheduled_appointment)
        await self.db.commit()
        return scheduled_appointment