from typing import Sequence, Optional, Tuple, Literal from sqlalchemy import select, desc, or_, func, String from sqlalchemy.ext.asyncio import AsyncSession from app.domain.models import Lens class LensesRepository: def __init__(self, db: AsyncSession): self.db = db async def get_all( self, skip: int = 0, limit: int = 10, search: str = None, sort_order: Literal["asc", "desc"] = "desc", tor: float = None, diameter: float = None, preset_refraction: float = None, periphery_toricity: float = None, side: str = "all", issued: bool = None, trial: float = None, ) -> Tuple[Sequence[Lens], int]: stmt = select(Lens) count_stmt = select(func.count()).select_from(Lens) if search: search = f"%{search}%" stmt = stmt.filter( or_( Lens.tor.cast(String).ilike(search), Lens.diameter.cast(String).ilike(search), Lens.preset_refraction.cast(String).ilike(search), Lens.periphery_toricity.cast(String).ilike(search), Lens.side.cast(String).ilike(search), Lens.fvc.cast(String).ilike(search), Lens.trial.cast(String).ilike(search), Lens.esa.cast(String).ilike(search), ) ) count_stmt = count_stmt.filter( or_( Lens.tor.cast(String).ilike(search), Lens.diameter.cast(String).ilike(search), Lens.preset_refraction.cast(String).ilike(search), Lens.periphery_toricity.cast(String).ilike(search), Lens.side.cast(String).ilike(search), Lens.fvc.cast(String).ilike(search), Lens.trial.cast(String).ilike(search), Lens.esa.cast(String).ilike(search), ) ) if tor is not None: stmt = stmt.filter(Lens.tor == tor) count_stmt = count_stmt.filter(Lens.tor == tor) if diameter is not None: stmt = stmt.filter(Lens.diameter == diameter) count_stmt = count_stmt.filter(Lens.diameter == diameter) if preset_refraction is not None: stmt = stmt.filter(Lens.preset_refraction == preset_refraction) count_stmt = count_stmt.filter(Lens.preset_refraction == preset_refraction) if periphery_toricity is not None: stmt = stmt.filter(Lens.periphery_toricity == periphery_toricity) count_stmt = count_stmt.filter(Lens.periphery_toricity == periphery_toricity) if side != "all": stmt = stmt.filter(Lens.side == side) count_stmt = count_stmt.filter(Lens.side == side) if issued is not None: stmt = stmt.filter(Lens.issued == issued) count_stmt = count_stmt.filter(Lens.issued == issued) if trial is not None: stmt = stmt.filter(Lens.trial == trial) count_stmt = count_stmt.filter(Lens.trial == trial) stmt = stmt.order_by(Lens.id.desc() if sort_order == "desc" else Lens.id.asc()) stmt = stmt.offset(skip).limit(limit) result = await self.db.execute(stmt) lenses = result.scalars().all() count_result = await self.db.execute(count_stmt) total_count = count_result.scalar() return lenses, total_count async def get_all_not_issued(self) -> Sequence[Lens]: stmt = select(Lens).filter_by(issued=False).order_by(desc(Lens.id)) result = await self.db.execute(stmt) return result.scalars().all() async def get_by_id(self, lens_id: int) -> Optional[Lens]: stmt = select(Lens).filter_by(id=lens_id) result = await self.db.execute(stmt) return result.scalars().first() async def create(self, lens: Lens) -> Lens: self.db.add(lens) await self.db.commit() await self.db.refresh(lens) return lens async def update(self, lens: Lens) -> Lens: await self.db.merge(lens) await self.db.commit() return lens async def delete(self, lens: Lens) -> Lens: await self.db.delete(lens) await self.db.commit() return lens