from datetime import date from typing import Optional, Sequence, Tuple, Literal from sqlalchemy import select, desc, or_, func, asc from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload from app.domain.models import LensIssue, Patient, User class LensIssuesRepository: def __init__(self, db: AsyncSession): self.db = db async def get_all( self, skip: int = 0, limit: int = 10, search: Optional[str] = None, sort_order: Literal["asc", "desc"] = "desc", start_date: Optional[date] = None, end_date: Optional[date] = None ) -> Tuple[Sequence[LensIssue], int]: stmt = ( select(LensIssue) .options(joinedload(LensIssue.lens)) .options(joinedload(LensIssue.patient)) .options(joinedload(LensIssue.doctor)) .join(Patient) .join(User) ) if search: search = f"%{search}%" stmt = stmt.filter( or_( Patient.last_name.ilike(search), Patient.first_name.ilike(search), User.last_name.ilike(search), User.first_name.ilike(search) ) ) if start_date: stmt = stmt.filter(LensIssue.issue_date >= start_date) if end_date: stmt = stmt.filter(LensIssue.issue_date <= end_date) stmt = stmt.order_by( desc(LensIssue.issue_date) if sort_order == "desc" else asc(LensIssue.issue_date) ) count_stmt = select(func.count()).select_from(LensIssue).join(Patient).join(User) if search: search = f"%{search}%" count_stmt = count_stmt.filter( or_( Patient.last_name.ilike(search), Patient.first_name.ilike(search), User.last_name.ilike(search), User.first_name.ilike(search) ) ) if start_date: count_stmt = count_stmt.filter(LensIssue.issue_date >= start_date) if end_date: count_stmt = count_stmt.filter(LensIssue.issue_date <= end_date) stmt = stmt.offset(skip).limit(limit) result = await self.db.execute(stmt) issues = result.scalars().all() count_result = await self.db.execute(count_stmt) total_count = count_result.scalar() return issues, total_count async def get_by_patient_id(self, patient_id: int) -> Sequence[LensIssue]: stmt = ( select(LensIssue) .filter_by(patient_id=patient_id) .options(joinedload(LensIssue.lens)) ) result = await self.db.execute(stmt) return result.scalars().all() async def get_by_lens_id(self, lens_id: int) -> Optional[LensIssue]: stmt = ( select(LensIssue) .filter_by(lens_id=lens_id) .options(joinedload(LensIssue.patient)) ) result = await self.db.execute(stmt) return result.scalars().first() async def get_by_id(self, lens_issue_id: int) -> Optional[LensIssue]: stmt = select(LensIssue).filter_by(id=lens_issue_id) result = await self.db.execute(stmt) return result.scalars().first() async def create(self, lens_issue: LensIssue) -> LensIssue: self.db.add(lens_issue) await self.db.commit() await self.db.refresh(lens_issue) return lens_issue