добавил таблицу для запланированных приемов

This commit is contained in:
Андрей Дувакин 2025-03-16 16:05:53 +05:00
parent 42a46af783
commit 51dfe2f508
10 changed files with 269 additions and 4 deletions

View File

@ -1,4 +1,4 @@
from typing import Sequence from typing import Sequence, Optional
from sqlalchemy import select, desc from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
@ -16,38 +16,42 @@ class AppointmentsRepository:
select(Appointment) select(Appointment)
.options(joinedload(Appointment.type)) .options(joinedload(Appointment.type))
.options(joinedload(Appointment.patient)) .options(joinedload(Appointment.patient))
.options(joinedload(Appointment.doctor))
.order_by(desc(Appointment.appointment_datetime)) .order_by(desc(Appointment.appointment_datetime))
) )
result = await self.db.execute(stmt) result = await self.db.execute(stmt)
return result.scalars().all() return result.scalars().all()
async def get_by_doctor_id(self, doctor_id: int): async def get_by_doctor_id(self, doctor_id: int) -> Sequence[Appointment]:
stmt = ( stmt = (
select(Appointment) select(Appointment)
.options(joinedload(Appointment.type)) .options(joinedload(Appointment.type))
.options(joinedload(Appointment.patient)) .options(joinedload(Appointment.patient))
.options(joinedload(Appointment.doctor))
.filter(Appointment.doctor_id == doctor_id) .filter(Appointment.doctor_id == doctor_id)
.order_by(desc(Appointment.appointment_datetime)) .order_by(desc(Appointment.appointment_datetime))
) )
result = await self.db.execute(stmt) result = await self.db.execute(stmt)
return result.scalars().all() return result.scalars().all()
async def get_by_patient_id(self, patient_id: int): async def get_by_patient_id(self, patient_id: int) -> Sequence[Appointment]:
stmt = ( stmt = (
select(Appointment) select(Appointment)
.options(joinedload(Appointment.type)) .options(joinedload(Appointment.type))
.options(joinedload(Appointment.patient)) .options(joinedload(Appointment.patient))
.options(joinedload(Appointment.doctor))
.filter(Appointment.patient_id == patient_id) .filter(Appointment.patient_id == patient_id)
.order_by(desc(Appointment.appointment_datetime)) .order_by(desc(Appointment.appointment_datetime))
) )
result = await self.db.execute(stmt) result = await self.db.execute(stmt)
return result.scalars().all() return result.scalars().all()
async def get_by_id(self, appointment_id: int): async def get_by_id(self, appointment_id: int) -> Optional[Appointment]:
stmt = ( stmt = (
select(Appointment) select(Appointment)
.options(joinedload(Appointment.type)) .options(joinedload(Appointment.type))
.options(joinedload(Appointment.patient)) .options(joinedload(Appointment.patient))
.options(joinedload(Appointment.doctor))
.filter(Appointment.id == appointment_id) .filter(Appointment.id == appointment_id)
) )
result = await self.db.execute(stmt) result = await self.db.execute(stmt)

View File

@ -0,0 +1,74 @@
from typing import Sequence
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from app.domain.models import ScheduledAppointment
class ScheduledAppointmentsRepository:
def __init__(self, db: AsyncSession):
self.db = db
async def get_all(self) -> Sequence[ScheduledAppointment]:
stmt = (
select(ScheduledAppointment)
.options(joinedload(ScheduledAppointment.type))
.options(joinedload(ScheduledAppointment.patient))
.options(joinedload(ScheduledAppointment.doctor))
.order_by(desc(ScheduledAppointment.appointment_datetime))
)
result = await self.db.execute(stmt)
return result.scalars().all()
async def get_by_doctor_id(self, doctor_id: int) -> Sequence[ScheduledAppointment]:
stmt = (
select(ScheduledAppointment)
.options(joinedload(ScheduledAppointment.type))
.options(joinedload(ScheduledAppointment.patient))
.options(joinedload(ScheduledAppointment.doctor))
.filter(ScheduledAppointment.doctor_id == doctor_id)
.order_by(desc(ScheduledAppointment.appointment_datetime))
)
result = await self.db.execute(stmt)
return result.scalars().all()
async def get_by_patient_id(self, patient_id: int) -> Sequence[ScheduledAppointment]:
stmt = (
select(ScheduledAppointment)
.options(joinedload(ScheduledAppointment.type))
.options(joinedload(ScheduledAppointment.patient))
.options(joinedload(ScheduledAppointment.doctor))
.filter(ScheduledAppointment.patient_id == patient_id)
.order_by(desc(ScheduledAppointment.appointment_datetime))
)
result = await self.db.execute(stmt)
return result.scalars().all()
async def get_by_id(self, scheduled_appointment_id: int) -> ScheduledAppointment:
stmt = (
select(ScheduledAppointment)
.options(joinedload(ScheduledAppointment.type))
.options(joinedload(ScheduledAppointment.patient))
.options(joinedload(ScheduledAppointment.doctor))
.filter(ScheduledAppointment.id == scheduled_appointment_id)
)
result = await self.db.execute(stmt)
return result.scalars().first()
async def create(self, scheduled_appointment: ScheduledAppointment) -> ScheduledAppointment:
self.db.add(scheduled_appointment)
await self.db.commit()
await self.db.refresh(scheduled_appointment)
return scheduled_appointment
async def update(self, scheduled_appointment: ScheduledAppointment) -> ScheduledAppointment:
await self.db.merge(scheduled_appointment)
await self.db.commit()
return scheduled_appointment
async def delete(self, scheduled_appointment: ScheduledAppointment) -> ScheduledAppointment:
await self.db.delete(scheduled_appointment)
await self.db.commit()
return scheduled_appointment

View File

@ -0,0 +1,42 @@
"""0003_сделал_таблицу_для_запланированных_приемов
Revision ID: 1e122f5b8727
Revises: f28580d2d60f
Create Date: 2025-03-15 10:11:07.192382
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '1e122f5b8727'
down_revision: Union[str, None] = 'f28580d2d60f'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('scheduled_appointments',
sa.Column('scheduled_datetime', sa.DateTime(), server_default=sa.text('now()'), nullable=False),
sa.Column('patient_id', sa.Integer(), nullable=False),
sa.Column('doctor_id', sa.Integer(), nullable=False),
sa.Column('type_id', sa.Integer(), nullable=False),
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False),
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False),
sa.ForeignKeyConstraint(['doctor_id'], ['users.id'], ),
sa.ForeignKeyConstraint(['patient_id'], ['patients.id'], ),
sa.ForeignKeyConstraint(['type_id'], ['appointment_types.id'], ),
sa.PrimaryKeyConstraint('id')
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('scheduled_appointments')
# ### end Alembic commands ###

View File

@ -0,0 +1,21 @@
import datetime
from typing import Optional
from pydantic import BaseModel
from app.domain.entities.appointment_type import AppointmentTypeEntity
from app.domain.entities.patient import PatientEntity
from app.domain.entities.user import UserEntity
class ScheduledAppointmentEntity(BaseModel):
id: Optional[int] = None
scheduled_datetime: datetime.datetime
patient_id: int
doctor_id: int
type_id: int
patient: Optional[PatientEntity] = None
doctor: Optional[UserEntity] = None
type: Optional[AppointmentTypeEntity] = None

View File

@ -14,6 +14,7 @@ from app.domain.models.mailing import Mailing
from app.domain.models.patients import Patient from app.domain.models.patients import Patient
from app.domain.models.recipients import Recipient from app.domain.models.recipients import Recipient
from app.domain.models.roles import Role from app.domain.models.roles import Role
from app.domain.models.scheduled_appointments import ScheduledAppointment
from app.domain.models.set_contents import SetContent from app.domain.models.set_contents import SetContent
from app.domain.models.sets import Set from app.domain.models.sets import Set
from app.domain.models.users import User from app.domain.models.users import User

View File

@ -10,3 +10,4 @@ class AppointmentType(BaseModel):
title = Column(VARCHAR(150), nullable=False, unique=True) title = Column(VARCHAR(150), nullable=False, unique=True)
appointments = relationship('Appointment', back_populates='type') appointments = relationship('Appointment', back_populates='type')
scheduled_appointments = relationship('ScheduledAppointment', back_populates='type')

View File

@ -20,3 +20,4 @@ class Patient(BaseModel):
lens_issues = relationship('LensIssue', back_populates='patient') lens_issues = relationship('LensIssue', back_populates='patient')
appointments = relationship('Appointment', back_populates='patient') appointments = relationship('Appointment', back_populates='patient')
mailing = relationship('Recipient', back_populates='patient') mailing = relationship('Recipient', back_populates='patient')
scheduled_appointments = relationship('ScheduledAppointment', back_populates='patient')

View File

@ -0,0 +1,19 @@
from sqlalchemy import Column, Integer, ForeignKey, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.domain.models.base import BaseModel
class ScheduledAppointment(BaseModel):
__tablename__ = 'scheduled_appointments'
scheduled_datetime = Column(DateTime, nullable=False, server_default=func.now())
patient_id = Column(Integer, ForeignKey('patients.id'), nullable=False)
doctor_id = Column(Integer, ForeignKey('users.id'), nullable=False)
type_id = Column(Integer, ForeignKey('appointment_types.id'), nullable=False)
patient = relationship('Patient', back_populates='scheduled_appointments')
doctor = relationship('User', back_populates='scheduled_appointments')
type = relationship('AppointmentType', back_populates='scheduled_appointments')

View File

@ -21,6 +21,7 @@ class User(BaseModel):
lens_issues = relationship('LensIssue', back_populates='doctor') lens_issues = relationship('LensIssue', back_populates='doctor')
appointments = relationship('Appointment', back_populates='doctor') appointments = relationship('Appointment', back_populates='doctor')
mailing = relationship('Mailing', back_populates='user') mailing = relationship('Mailing', back_populates='user')
scheduled_appointments = relationship('ScheduledAppointment', back_populates='doctor')
def check_password(self, password): def check_password(self, password):
return check_password_hash(self.password, password) return check_password_hash(self.password, password)

View File

@ -0,0 +1,101 @@
from typing import Optional
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status
from app.application.appointment_types_repository import AppointmentTypesRepository
from app.application.patients_repository import PatientsRepository
from app.application.scheduled_appointments_repository import ScheduledAppointmentsRepository
from app.application.users_repository import UsersRepository
from app.domain.entities.scheduled_appointment import ScheduledAppointmentEntity
from app.domain.models import ScheduledAppointment
class ScheduledAppointmentsService:
def __init__(self, db: AsyncSession):
self.scheduled_appointment_repository = ScheduledAppointmentsRepository(db)
self.appointment_types_repository = AppointmentTypesRepository(db)
self.users_repository = UsersRepository(db)
self.patients_repository = PatientsRepository(db)
async def get_all_scheduled_appointments(self) -> list[ScheduledAppointmentEntity]:
scheduled_appointments = await self.scheduled_appointment_repository.get_all()
return [
self.model_to_entity(scheduled_appointment)
for scheduled_appointment in scheduled_appointments
]
async def get_scheduled_appointments_by_doctor_id(self, doctor_id: int) -> Optional[
list[ScheduledAppointmentEntity]
]:
doctor = self.users_repository.get_by_id(doctor_id)
if not doctor:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='The doctor with this ID was not found',
)
scheduled_appointments = await self.scheduled_appointment_repository.get_by_doctor_id(doctor_id)
return [
self.model_to_entity(scheduled_appointment)
for scheduled_appointment in scheduled_appointments
]
async def get_scheduled_appointments_by_patient_id(self, patient_id: int) -> Optional[
list[ScheduledAppointmentEntity]
]:
patient = await self.patients_repository.get_by_id(patient_id)
if not patient:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='The patient with this ID was not found',
)
scheduled_appointments = await self.scheduled_appointment_repository.get_by_patient_id(patient_id)
return [
self.model_to_entity(scheduled_appointment)
for scheduled_appointment in scheduled_appointments
]
@staticmethod
def entity_to_model(scheduled_appointment: ScheduledAppointmentEntity) -> ScheduledAppointment:
scheduled_appointment_model = ScheduledAppointment(
scheduled_datetime=scheduled_appointment.scheduled_datetime,
patient_id=scheduled_appointment.patient_id,
doctor_id=scheduled_appointment.doctor_id,
type_id=scheduled_appointment.type_id,
)
if scheduled_appointment.id:
scheduled_appointment_model.id = scheduled_appointment.id
return scheduled_appointment_model
@staticmethod
def model_to_entity(scheduled_appointment: ScheduledAppointment) -> ScheduledAppointmentEntity:
scheduled_appointment_entity = ScheduledAppointmentEntity(
id=scheduled_appointment.id,
scheduled_datetime=scheduled_appointment.scheduled_datetime,
patient_id=scheduled_appointment.patient_id,
doctor_id=scheduled_appointment.doctor_id,
type_id=scheduled_appointment.type_id,
)
if scheduled_appointment.patient is not None:
scheduled_appointment_entity.patient = scheduled_appointment.patient
if scheduled_appointment.doctor is not None:
scheduled_appointment_entity.doctor = scheduled_appointment.doctor
if scheduled_appointment.type is not None:
scheduled_appointment_entity.type = scheduled_appointment.type
return scheduled_appointment_entity