API_logistics/app/core/usecases/city_service.py
2024-10-05 13:12:08 +05:00

63 lines
2.1 KiB
Python

from typing import List, Optional
from sqlalchemy.orm import Session
from app.core.entities.city import CityEntity
from app.infrastructure.database.models.cities import City
from app.infrastructure.database.repository.city_repository import CitiesRepository
class CitiesService:
    """Use-case layer for cities.

    Delegates persistence to ``CitiesRepository`` and converts the ``City``
    ORM rows it returns into ``CityEntity`` objects for callers.
    """

    def __init__(self, db: Session):
        """Create the service with a ``CitiesRepository`` built on *db*."""
        self.repository = CitiesRepository(db)

    @staticmethod
    def _to_entity(city: City, include_district_name: bool = False) -> CityEntity:
        """Build a ``CityEntity`` from a ``City`` row.

        Args:
            city: The ORM row to convert.
            include_district_name: When true, also copy the name of the
                related federal district into ``federal_district_name``.

        Returns:
            A ``CityEntity`` carrying ``id``, ``name`` and
            ``federal_district_id``, plus ``federal_district_name`` when
            requested and available.
        """
        fields = {
            "id": city.id,
            "name": city.name,
            "federal_district_id": city.federal_district_id,
        }
        if include_district_name:
            district = city.federal_district
            # A city without a related district must not break the whole
            # listing: leave the field out so CityEntity falls back to its
            # own default, exactly as the single-city methods construct it.
            if district is not None:
                fields["federal_district_name"] = district.name
        return CityEntity(**fields)

    def get_all_cities(self) -> List[CityEntity]:
        """Return every city, including its federal district name.

        Rows come from ``repository.get_all_with_federal_district()``.
        Cities whose ``federal_district`` relationship is ``None`` are still
        returned, just without ``federal_district_name``.
        """
        rows = self.repository.get_all_with_federal_district()
        return [self._to_entity(row, include_district_name=True) for row in rows]

    def get_city_by_id(self, city_id: int) -> Optional[CityEntity]:
        """Return the city with *city_id*, or ``None`` if the repository finds none.

        The returned entity does not carry ``federal_district_name``.
        """
        city = self.repository.get_by_id(city_id)
        if not city:
            return None
        return self._to_entity(city)

    def create_city(self, entity: CityEntity) -> CityEntity:
        """Persist a new city from *entity* and return it as stored.

        Only ``name`` and ``federal_district_id`` are taken from *entity*;
        the returned entity is built from whatever ``repository.create``
        hands back (including its ``id``).
        """
        new_row = City(
            name=entity.name,
            federal_district_id=entity.federal_district_id,
        )
        stored = self.repository.create(new_row)
        return self._to_entity(stored)

    def update_city(self, city_id: int, entity: CityEntity) -> Optional[CityEntity]:
        """Overwrite name and federal district of an existing city.

        Returns:
            The updated city as a ``CityEntity``, or ``None`` when the
            repository has no city with *city_id* (nothing is written then).
        """
        city = self.repository.get_by_id(city_id)
        if not city:
            return None
        city.name = entity.name
        city.federal_district_id = entity.federal_district_id
        self.repository.update(city)
        # The entity is built from the row we mutated, not from the return
        # value of repository.update().
        return self._to_entity(city)

    def delete_city(self, city_id: int) -> bool:
        """Delete the city with *city_id*.

        Returns:
            ``True`` when ``repository.delete`` returns anything other than
            ``None``, ``False`` otherwise (presumably "nothing to delete" --
            confirm against the repository implementation).
        """
        return self.repository.delete(city_id) is not None