from typing import Sequence from sqlalchemy import select, desc, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload from datetime import date from app.domain.models import ScheduledAppointment class ScheduledAppointmentsRepository: def __init__(self, db: AsyncSession): self.db = db async def get_all(self, start_date: date | None = None, end_date: date | None = None) -> Sequence[ ScheduledAppointment]: stmt = ( select(ScheduledAppointment) .options(joinedload(ScheduledAppointment.type)) .options(joinedload(ScheduledAppointment.patient)) .options(joinedload(ScheduledAppointment.doctor)) .filter_by(is_canceled=False) .order_by(desc(ScheduledAppointment.scheduled_datetime)) ) if start_date: stmt = stmt.filter(ScheduledAppointment.scheduled_datetime >= start_date) if end_date: stmt = stmt.filter(ScheduledAppointment.scheduled_datetime <= end_date) result = await self.db.execute(stmt) return result.scalars().all() async def get_by_doctor_id(self, doctor_id: int, start_date: date | None = None, end_date: date | None = None) -> \ Sequence[ScheduledAppointment]: stmt = ( select(ScheduledAppointment) .options(joinedload(ScheduledAppointment.type)) .options(joinedload(ScheduledAppointment.patient)) .options(joinedload(ScheduledAppointment.doctor)) .filter_by(doctor_id=doctor_id, is_canceled=False) .order_by(desc(ScheduledAppointment.scheduled_datetime)) ) if start_date: stmt = stmt.filter(ScheduledAppointment.scheduled_datetime >= start_date) if end_date: stmt = stmt.filter(ScheduledAppointment.scheduled_datetime <= end_date) result = await self.db.execute(stmt) return result.scalars().all() async def get_upcoming_by_doctor_id(self, doctor_id: int) -> Sequence[ScheduledAppointment]: stmt = ( select(ScheduledAppointment) .options(joinedload(ScheduledAppointment.type)) .options(joinedload(ScheduledAppointment.patient)) .options(joinedload(ScheduledAppointment.doctor)) .filter_by(doctor_id=doctor_id, is_canceled=False) .filter(ScheduledAppointment.scheduled_datetime >= func.now()) .order_by(ScheduledAppointment.scheduled_datetime) .limit(5) ) result = await self.db.execute(stmt) return result.scalars().all() async def get_by_patient_id(self, patient_id: int, start_date: date | None = None, end_date: date | None = None) -> \ Sequence[ScheduledAppointment]: stmt = ( select(ScheduledAppointment) .options(joinedload(ScheduledAppointment.type)) .options(joinedload(ScheduledAppointment.patient)) .options(joinedload(ScheduledAppointment.doctor)) .filter_by(patient_id=patient_id, is_canceled=False) .order_by(desc(ScheduledAppointment.scheduled_datetime)) ) if start_date: stmt = stmt.filter(ScheduledAppointment.scheduled_datetime >= start_date) if end_date: stmt = stmt.filter(ScheduledAppointment.scheduled_datetime <= end_date) result = await self.db.execute(stmt) return result.scalars().all() async def get_by_id(self, scheduled_appointment_id: int) -> ScheduledAppointment: stmt = ( select(ScheduledAppointment) .options(joinedload(ScheduledAppointment.type)) .options(joinedload(ScheduledAppointment.patient)) .options(joinedload(ScheduledAppointment.doctor)) .filter_by(id=scheduled_appointment_id, is_canceled=False) ) result = await self.db.execute(stmt) return result.scalars().first() async def create(self, scheduled_appointment: ScheduledAppointment) -> ScheduledAppointment: self.db.add(scheduled_appointment) await self.db.commit() await self.db.refresh(scheduled_appointment) return scheduled_appointment async def update(self, scheduled_appointment: ScheduledAppointment) -> ScheduledAppointment: await self.db.merge(scheduled_appointment) await self.db.commit() return scheduled_appointment async def delete(self, scheduled_appointment: ScheduledAppointment) -> ScheduledAppointment: await self.db.delete(scheduled_appointment) await self.db.commit() return scheduled_appointment