visus-plus/api/app/application/appointments_repository.py

132 lines
5.2 KiB
Python

from datetime import date, datetime, timedelta
from typing import Optional, Sequence

from sqlalchemy import desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload

from app.domain.models import Appointment
class AppointmentsRepository:
    """Async SQLAlchemy data-access layer for ``Appointment`` rows.

    Every read query issued by this class joined-loads the ``type``,
    ``patient`` and ``doctor`` relationships of ``Appointment``.
    """

    # (upper bound on days_until_the_next_appointment, reminder half-window in days).
    # Checked in order; intervals above the last bound use the fallback.
    _REMINDER_WINDOW_TIERS = ((90, 7), (180, 14), (365, 30))
    _REMINDER_WINDOW_FALLBACK_DAYS = 60

    def __init__(self, db: AsyncSession):
        """Store the async session used for every query and write."""
        self.db = db

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _base_query():
        """Return ``select(Appointment)`` with type/patient/doctor joined-loaded."""
        return (
            select(Appointment)
            .options(joinedload(Appointment.type))
            .options(joinedload(Appointment.patient))
            .options(joinedload(Appointment.doctor))
        )

    @staticmethod
    def _apply_date_range(stmt, start_date: date | None, end_date: date | None):
        """Restrict *stmt* to appointments inside ``[start_date, end_date]``.

        Either bound may be ``None`` to leave that side open. A plain ``date``
        end bound covers that whole calendar day; a ``datetime`` end bound is
        compared as-is.
        """
        if start_date is not None:
            stmt = stmt.filter(Appointment.appointment_datetime >= start_date)
        if end_date is not None:
            if isinstance(end_date, datetime):
                stmt = stmt.filter(Appointment.appointment_datetime <= end_date)
            else:
                # A bare date compares as midnight, so ``<= end_date`` would drop
                # every appointment later on the end day. Use an exclusive bound
                # on the following day instead.
                stmt = stmt.filter(
                    Appointment.appointment_datetime < end_date + timedelta(days=1)
                )
        return stmt

    async def _fetch_all(self, stmt) -> Sequence[Appointment]:
        """Execute *stmt* and return all resulting ``Appointment`` objects."""
        result = await self.db.execute(stmt)
        return result.scalars().all()

    @classmethod
    def _reminder_window_days(cls, days_until: int) -> int:
        """Return the reminder half-window (in days) for a follow-up interval."""
        for max_days, window_days in cls._REMINDER_WINDOW_TIERS:
            if days_until <= max_days:
                return window_days
        return cls._REMINDER_WINDOW_FALLBACK_DAYS

    @classmethod
    def _is_reminder_due(cls, appointment: Appointment, current_date: date) -> bool:
        """Tell whether *current_date* is inside the appointment's reminder window.

        The window is centred on ``appointment_datetime`` plus
        ``days_until_the_next_appointment`` days and extends the tiered number
        of days to both sides (inclusive).
        """
        days_until = appointment.days_until_the_next_appointment
        follow_up_date = (
            appointment.appointment_datetime + timedelta(days=days_until)
        ).date()
        half_window = timedelta(days=cls._reminder_window_days(days_until))
        return follow_up_date - half_window <= current_date <= follow_up_date + half_window

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    async def get_all(
        self, start_date: date | None = None, end_date: date | None = None
    ) -> Sequence[Appointment]:
        """Return all appointments, newest first, optionally limited to a date range."""
        stmt = self._base_query().order_by(desc(Appointment.appointment_datetime))
        stmt = self._apply_date_range(stmt, start_date, end_date)
        return await self._fetch_all(stmt)

    async def get_by_doctor_id(
        self, doctor_id: int, start_date: date | None = None, end_date: date | None = None
    ) -> Sequence[Appointment]:
        """Return one doctor's appointments, newest first, optionally date-limited."""
        stmt = (
            self._base_query()
            .filter_by(doctor_id=doctor_id)
            .order_by(desc(Appointment.appointment_datetime))
        )
        stmt = self._apply_date_range(stmt, start_date, end_date)
        return await self._fetch_all(stmt)

    async def get_reminders(self, current_date: date) -> Sequence[Appointment]:
        """Return appointments whose follow-up reminder window contains *current_date*.

        Only appointments with a non-null ``days_until_the_next_appointment``
        are considered. They are all loaded and the window check is done in
        Python.
        """
        stmt = self._base_query().filter(
            Appointment.days_until_the_next_appointment.is_not(None)
        )
        candidates = await self._fetch_all(stmt)
        return [
            appointment
            for appointment in candidates
            if self._is_reminder_due(appointment, current_date)
        ]

    async def get_upcoming_by_doctor_id(
        self, doctor_id: int, *, limit: int = 5
    ) -> Sequence[Appointment]:
        """Return a doctor's next appointments, soonest first.

        "Upcoming" means ``appointment_datetime`` is at or after the database's
        ``now()``. At most *limit* rows are returned (5 by default).
        """
        stmt = (
            self._base_query()
            .filter_by(doctor_id=doctor_id)
            .filter(Appointment.appointment_datetime >= func.now())
            .order_by(Appointment.appointment_datetime)
            .limit(limit)
        )
        return await self._fetch_all(stmt)

    async def get_by_patient_id(
        self, patient_id: int, start_date: date | None = None, end_date: date | None = None
    ) -> Sequence[Appointment]:
        """Return one patient's appointments, newest first, optionally date-limited."""
        stmt = (
            self._base_query()
            .filter_by(patient_id=patient_id)
            .order_by(desc(Appointment.appointment_datetime))
        )
        stmt = self._apply_date_range(stmt, start_date, end_date)
        return await self._fetch_all(stmt)

    async def get_by_id(self, appointment_id: int) -> Optional[Appointment]:
        """Return the appointment with the given id, or ``None`` if there is none."""
        stmt = self._base_query().filter_by(id=appointment_id)
        result = await self.db.execute(stmt)
        return result.scalars().first()

    # ------------------------------------------------------------------
    # Writes
    # ------------------------------------------------------------------

    async def create(self, appointment: Appointment) -> Appointment:
        """Add *appointment* to the session, commit, refresh it and return it."""
        self.db.add(appointment)
        await self.db.commit()
        await self.db.refresh(appointment)
        return appointment

    async def update(self, appointment: Appointment) -> Appointment:
        """Merge *appointment* into the session, commit and return it.

        The object that was passed in is returned; the value returned by
        ``merge()`` is discarded.
        """
        await self.db.merge(appointment)
        await self.db.commit()
        return appointment

    async def delete(self, appointment: Appointment) -> Appointment:
        """Delete *appointment*, commit and return the deleted object."""
        await self.db.delete(appointment)
        await self.db.commit()
        return appointment