visus-plus/api/app/application/lenses_repository.py

113 lines
4.1 KiB
Python

from typing import Sequence, Optional, Tuple, Literal
from sqlalchemy import select, desc, or_, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.models import Lens
class LensesRepository:
def __init__(self, db: AsyncSession):
self.db = db
async def get_all(
self,
skip: int = 0,
limit: int = 10,
search: str = None,
sort_order: Literal["asc", "desc"] = "desc",
tor: float = None,
diameter: float = None,
preset_refraction: float = None,
periphery_toricity: float = None,
side: str = "all",
issued: bool = None,
trial: float = None,
) -> Tuple[Sequence[Lens], int]:
stmt = select(Lens)
count_stmt = select(func.count()).select_from(Lens)
if search:
search = f"%{search}%"
stmt = stmt.filter(
or_(
Lens.tor.cast(str).ilike(search),
Lens.diameter.cast(str).ilike(search),
Lens.preset_refraction.cast(str).ilike(search),
Lens.periphery_toricity.cast(str).ilike(search),
Lens.side.ilike(search),
Lens.fvc.cast(str).ilike(search),
Lens.trial.cast(str).ilike(search),
)
)
count_stmt = count_stmt.filter(
or_(
Lens.tor.cast(str).ilike(search),
Lens.diameter.cast(str).ilike(search),
Lens.preset_refraction.cast(str).ilike(search),
Lens.periphery_toricity.cast(str).ilike(search),
Lens.side.ilike(search),
Lens.fvc.cast(str).ilike(search),
Lens.trial.cast(str).ilike(search),
)
)
if tor is not None:
stmt = stmt.filter(Lens.tor == tor)
count_stmt = count_stmt.filter(Lens.tor == tor)
if diameter is not None:
stmt = stmt.filter(Lens.diameter == diameter)
count_stmt = count_stmt.filter(Lens.diameter == diameter)
if preset_refraction is not None:
stmt = stmt.filter(Lens.preset_refraction == preset_refraction)
count_stmt = count_stmt.filter(Lens.preset_refraction == preset_refraction)
if periphery_toricity is not None:
stmt = stmt.filter(Lens.periphery_toricity == periphery_toricity)
count_stmt = count_stmt.filter(Lens.periphery_toricity == periphery_toricity)
if side != "all":
stmt = stmt.filter(Lens.side == side)
count_stmt = count_stmt.filter(Lens.side == side)
if issued is not None:
stmt = stmt.filter(Lens.issued == issued)
count_stmt = count_stmt.filter(Lens.issued == issued)
if trial is not None:
stmt = stmt.filter(Lens.trial == trial)
count_stmt = count_stmt.filter(Lens.trial == trial)
stmt = stmt.order_by(Lens.id.desc() if sort_order == "desc" else Lens.id.asc())
stmt = stmt.offset(skip).limit(limit)
result = await self.db.execute(stmt)
lenses = result.scalars().all()
count_result = await self.db.execute(count_stmt)
total_count = count_result.scalar()
return lenses, total_count
async def get_all_not_issued(self) -> Sequence[Lens]:
stmt = select(Lens).filter_by(issued=False).order_by(desc(Lens.id))
result = await self.db.execute(stmt)
return result.scalars().all()
async def get_by_id(self, lens_id: int) -> Optional[Lens]:
stmt = select(Lens).filter_by(id=lens_id)
result = await self.db.execute(stmt)
return result.scalars().first()
async def create(self, lens: Lens) -> Lens:
self.db.add(lens)
await self.db.commit()
await self.db.refresh(lens)
return lens
async def update(self, lens: Lens) -> Lens:
await self.db.merge(lens)
await self.db.commit()
return lens
async def delete(self, lens: Lens) -> Lens:
await self.db.delete(lens)
await self.db.commit()
return lens