- Обновлена логика фильтрации и сортировки линз - Добавлена возможность поиска - Добавлены переменные окружения - Удален secrets-example.yaml
115 lines
4.3 KiB
Python
115 lines
4.3 KiB
Python
from typing import Sequence, Optional, Tuple, Literal
|
|
|
|
from sqlalchemy import select, desc, or_, func, String
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.domain.models import Lens
|
|
|
|
|
|
class LensesRepository:
|
|
def __init__(self, db: AsyncSession):
|
|
self.db = db
|
|
|
|
async def get_all(
|
|
self,
|
|
skip: int = 0,
|
|
limit: int = 10,
|
|
search: str = None,
|
|
sort_order: Literal["asc", "desc"] = "desc",
|
|
tor: float = None,
|
|
diameter: float = None,
|
|
preset_refraction: float = None,
|
|
periphery_toricity: float = None,
|
|
side: str = "all",
|
|
issued: bool = None,
|
|
trial: float = None,
|
|
) -> Tuple[Sequence[Lens], int]:
|
|
stmt = select(Lens)
|
|
count_stmt = select(func.count()).select_from(Lens)
|
|
|
|
if search:
|
|
search = f"%{search}%"
|
|
stmt = stmt.filter(
|
|
or_(
|
|
Lens.tor.cast(String).ilike(search),
|
|
Lens.diameter.cast(String).ilike(search),
|
|
Lens.preset_refraction.cast(String).ilike(search),
|
|
Lens.periphery_toricity.cast(String).ilike(search),
|
|
Lens.side.cast(String).ilike(search),
|
|
Lens.fvc.cast(String).ilike(search),
|
|
Lens.trial.cast(String).ilike(search),
|
|
Lens.esa.cast(String).ilike(search),
|
|
)
|
|
)
|
|
count_stmt = count_stmt.filter(
|
|
or_(
|
|
Lens.tor.cast(String).ilike(search),
|
|
Lens.diameter.cast(String).ilike(search),
|
|
Lens.preset_refraction.cast(String).ilike(search),
|
|
Lens.periphery_toricity.cast(String).ilike(search),
|
|
Lens.side.cast(String).ilike(search),
|
|
Lens.fvc.cast(String).ilike(search),
|
|
Lens.trial.cast(String).ilike(search),
|
|
Lens.esa.cast(String).ilike(search),
|
|
)
|
|
)
|
|
|
|
if tor is not None:
|
|
stmt = stmt.filter(Lens.tor == tor)
|
|
count_stmt = count_stmt.filter(Lens.tor == tor)
|
|
if diameter is not None:
|
|
stmt = stmt.filter(Lens.diameter == diameter)
|
|
count_stmt = count_stmt.filter(Lens.diameter == diameter)
|
|
if preset_refraction is not None:
|
|
stmt = stmt.filter(Lens.preset_refraction == preset_refraction)
|
|
count_stmt = count_stmt.filter(Lens.preset_refraction == preset_refraction)
|
|
if periphery_toricity is not None:
|
|
stmt = stmt.filter(Lens.periphery_toricity == periphery_toricity)
|
|
count_stmt = count_stmt.filter(Lens.periphery_toricity == periphery_toricity)
|
|
if side != "all":
|
|
stmt = stmt.filter(Lens.side == side)
|
|
count_stmt = count_stmt.filter(Lens.side == side)
|
|
if issued is not None:
|
|
stmt = stmt.filter(Lens.issued == issued)
|
|
count_stmt = count_stmt.filter(Lens.issued == issued)
|
|
if trial is not None:
|
|
stmt = stmt.filter(Lens.trial == trial)
|
|
count_stmt = count_stmt.filter(Lens.trial == trial)
|
|
|
|
stmt = stmt.order_by(Lens.id.desc() if sort_order == "desc" else Lens.id.asc())
|
|
|
|
stmt = stmt.offset(skip).limit(limit)
|
|
result = await self.db.execute(stmt)
|
|
lenses = result.scalars().all()
|
|
|
|
count_result = await self.db.execute(count_stmt)
|
|
total_count = count_result.scalar()
|
|
|
|
return lenses, total_count
|
|
|
|
async def get_all_not_issued(self) -> Sequence[Lens]:
|
|
stmt = select(Lens).filter_by(issued=False).order_by(desc(Lens.id))
|
|
result = await self.db.execute(stmt)
|
|
return result.scalars().all()
|
|
|
|
async def get_by_id(self, lens_id: int) -> Optional[Lens]:
|
|
stmt = select(Lens).filter_by(id=lens_id)
|
|
result = await self.db.execute(stmt)
|
|
return result.scalars().first()
|
|
|
|
async def create(self, lens: Lens) -> Lens:
|
|
self.db.add(lens)
|
|
await self.db.commit()
|
|
await self.db.refresh(lens)
|
|
return lens
|
|
|
|
async def update(self, lens: Lens) -> Lens:
|
|
await self.db.merge(lens)
|
|
await self.db.commit()
|
|
return lens
|
|
|
|
async def delete(self, lens: Lens) -> Lens:
|
|
await self.db.delete(lens)
|
|
await self.db.commit()
|
|
return lens
|