Добавлены фильтрация, пагинация и поиск. Улучшен UI. Удален useIssuesUI.js. Добавлен PaginatedLensIssuesResponseEntity.
87 lines
2.8 KiB
Python
87 lines
2.8 KiB
Python
from typing import Optional, Sequence, Tuple, Literal
|
|
|
|
from sqlalchemy import select, desc, or_, func, asc
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
from app.domain.models import LensIssue, Patient, User
|
|
|
|
|
|
class LensIssuesRepository:
|
|
def __init__(self, db: AsyncSession):
|
|
self.db = db
|
|
|
|
async def get_all(
|
|
self,
|
|
skip: int = 0,
|
|
limit: int = 10,
|
|
search: Optional[str] = None,
|
|
sort_order: Literal["asc", "desc"] = "desc",
|
|
start_date: Optional[str] = None,
|
|
end_date: Optional[str] = None
|
|
) -> Tuple[Sequence[LensIssue], int]:
|
|
stmt = (
|
|
select(LensIssue)
|
|
.options(joinedload(LensIssue.lens))
|
|
.options(joinedload(LensIssue.patient))
|
|
.options(joinedload(LensIssue.doctor))
|
|
.join(Patient)
|
|
.join(User)
|
|
)
|
|
|
|
if search:
|
|
search = f"%{search}%"
|
|
stmt = stmt.filter(
|
|
or_(
|
|
Patient.last_name.ilike(search),
|
|
Patient.first_name.ilike(search),
|
|
User.last_name.ilike(search),
|
|
User.first_name.ilike(search)
|
|
)
|
|
)
|
|
|
|
if start_date:
|
|
stmt = stmt.filter(LensIssue.issue_date >= start_date)
|
|
if end_date:
|
|
stmt = stmt.filter(LensIssue.issue_date <= end_date)
|
|
|
|
stmt = stmt.order_by(
|
|
desc(LensIssue.issue_date) if sort_order == "desc" else asc(LensIssue.issue_date)
|
|
)
|
|
|
|
count_stmt = select(func.count()).select_from(LensIssue).join(Patient).join(User)
|
|
if search:
|
|
search = f"%{search}%"
|
|
count_stmt = count_stmt.filter(
|
|
or_(
|
|
Patient.last_name.ilike(search),
|
|
Patient.first_name.ilike(search),
|
|
User.last_name.ilike(search),
|
|
User.first_name.ilike(search)
|
|
)
|
|
)
|
|
if start_date:
|
|
count_stmt = count_stmt.filter(LensIssue.issue_date >= start_date)
|
|
if end_date:
|
|
count_stmt = count_stmt.filter(LensIssue.issue_date <= end_date)
|
|
|
|
stmt = stmt.offset(skip).limit(limit)
|
|
result = await self.db.execute(stmt)
|
|
issues = result.scalars().all()
|
|
|
|
count_result = await self.db.execute(count_stmt)
|
|
total_count = count_result.scalar()
|
|
|
|
return issues, total_count
|
|
|
|
async def get_by_id(self, lens_issue_id: int) -> Optional[LensIssue]:
|
|
stmt = select(LensIssue).filter_by(id=lens_issue_id)
|
|
result = await self.db.execute(stmt)
|
|
return result.scalars().first()
|
|
|
|
async def create(self, lens_issue: LensIssue) -> LensIssue:
|
|
self.db.add(lens_issue)
|
|
await self.db.commit()
|
|
await self.db.refresh(lens_issue)
|
|
return lens_issue
|