# business-card-site/API/app/infrastructure/contest_carousel_photos_service.py
#
# 113 lines
# 4.1 KiB
# Python

import os
import uuid
import aiofiles
import magic
from fastapi import HTTPException, UploadFile, status
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.responses import FileResponse
from werkzeug.utils import secure_filename
from app.application.contest_carousel_photos_repository import ContestCarouselPhotosRepository
from app.domain.entities.contest_carousel_photo import ContestCarouselPhotoEntity
from app.domain.models import ContestCarouselPhoto
class ContestCarouselPhotosService:
    """Store, serve and delete the carousel photos that belong to a contest.

    Image files are written to a directory on local disk; one
    ``ContestCarouselPhoto`` row per file is persisted through
    ``ContestCarouselPhotosRepository``.
    """

    # MIME types (as detected from file content) that may be uploaded.
    ALLOWED_MIME_TYPES = ("image/jpeg", "image/png")

    # File extension (lower-case, with leading dot) -> media type used when serving.
    _MEDIA_TYPES_BY_EXTENSION = {
        ".jpeg": "image/jpeg",
        ".jpg": "image/jpeg",
        ".png": "image/png",
    }
    _DEFAULT_MEDIA_TYPE = "application/octet-stream"

    # libmagic may misidentify content when given fewer than ~2 KiB.
    _SNIFF_SIZE = 2048

    # Uploads are copied to disk in pieces of this size instead of being
    # loaded into memory in one go.
    _COPY_CHUNK_SIZE = 1024 * 1024

    def __init__(self, db: AsyncSession):
        """Create the service on top of the given database session."""
        self.contest_carousel_photos_repository = ContestCarouselPhotosRepository(db)

    async def get_photo_file_by_id(self, photo_id: int) -> FileResponse:
        """Return the stored image for ``photo_id`` as a file response.

        Raises:
            HTTPException: 404 when there is no such photo row, or when the
                row exists but its file is missing from disk.
        """
        photo = await self.contest_carousel_photos_repository.get_by_id(photo_id)
        if not photo:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Photo not found")

        # isfile (not exists): a directory at that path cannot be served either.
        if not os.path.isfile(photo.file_path):
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found on disk")

        return FileResponse(
            photo.file_path,
            media_type=self.get_media_type(photo.file_path),
            filename=os.path.basename(photo.file_path),
        )

    async def get_photos_by_contest_id(self, contest_id: int) -> list[ContestCarouselPhotoEntity]:
        """Return every photo of the contest, converted to entities."""
        photos = await self.contest_carousel_photos_repository.get_by_contest_id(contest_id)
        return [self.model_to_entity(photo) for photo in photos]

    async def upload_photo(self, contest_id: int, file: UploadFile) -> ContestCarouselPhotoEntity:
        """Validate ``file``, write it to disk and persist a photo row for it.

        Raises:
            HTTPException: 400 when the content is neither JPEG nor PNG.
        """
        self.validate_file_type(file)
        filename = self.generate_filename(file)
        file_path = await self.save_file(file, filename=filename)

        photo = ContestCarouselPhoto(
            file_path=file_path,
            number=0,
            contest_id=contest_id,
        )
        try:
            created = await self.contest_carousel_photos_repository.create(photo)
        except Exception:
            # The row was not stored, so nothing will ever reference the file
            # just written; remove it instead of leaving an orphan on disk.
            self._remove_file_if_present(file_path)
            raise
        return self.model_to_entity(created)

    async def delete_photo(self, photo_id: int) -> ContestCarouselPhotoEntity:
        """Delete the photo row and its file; return the deleted photo as an entity.

        Raises:
            HTTPException: 404 when there is no photo with ``photo_id``.
        """
        photo = await self.contest_carousel_photos_repository.get_by_id(photo_id)
        if not photo:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Photo not found")

        # Read the path up front so the file can be removed after the row is gone.
        file_path = photo.file_path

        # Delete the row first: if that fails, the file is still in place and
        # the remaining row keeps pointing at something that exists.
        deleted = await self.contest_carousel_photos_repository.delete(photo)
        self._remove_file_if_present(file_path)
        return self.model_to_entity(deleted)

    async def save_file(self, file: UploadFile, filename: str, upload_dir: str = "uploads/contest_carousel_photos") -> str:
        """Write the upload to ``upload_dir/filename`` and return that path.

        The directory is created when missing. Content is copied chunk by
        chunk, starting from the upload's current read position.
        """
        os.makedirs(upload_dir, exist_ok=True)
        file_path = os.path.join(upload_dir, filename)
        try:
            async with aiofiles.open(file_path, "wb") as out_file:
                while chunk := await file.read(self._COPY_CHUNK_SIZE):
                    await out_file.write(chunk)
        except Exception:
            # Do not leave a truncated image behind when the copy fails midway.
            self._remove_file_if_present(file_path)
            raise
        return file_path

    @staticmethod
    def _remove_file_if_present(file_path: str) -> None:
        """Remove ``file_path``; a file that is already gone is not an error."""
        try:
            os.remove(file_path)
        except FileNotFoundError:
            # Deliberately ignored: the desired end state (no file) already holds.
            pass

    @staticmethod
    def validate_file_type(file: UploadFile):
        """Reject uploads whose content is not a JPEG or PNG image.

        The type is detected from the leading bytes of the content, not from
        the client-supplied name or content type. The stream is rewound to
        the start afterwards so the file can still be read in full.

        Raises:
            HTTPException: 400 for any other detected type.
        """
        file.file.seek(0)  # sniff the real start even if the stream was read before
        header = file.file.read(ContestCarouselPhotosService._SNIFF_SIZE)
        file.file.seek(0)

        mime = magic.Magic(mime=True)
        file_type = mime.from_buffer(header)
        if file_type not in ContestCarouselPhotosService.ALLOWED_MIME_TYPES:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid file type. Only JPEG and PNG images are allowed.")

    @staticmethod
    def generate_filename(file: UploadFile):
        """Return ``"<uuid4>_<sanitised original name>"`` for the upload.

        An upload without a filename yields just the UUID prefix followed by
        an underscore.
        """
        # UploadFile.filename may be None; secure_filename needs a string.
        original_filename = secure_filename(file.filename or "")
        return f"{uuid.uuid4()}_{original_filename}"

    @staticmethod
    def model_to_entity(photo_model: ContestCarouselPhoto) -> ContestCarouselPhotoEntity:
        """Copy id, file_path, number and contest_id from the model into an entity."""
        return ContestCarouselPhotoEntity(
            id=photo_model.id,
            file_path=photo_model.file_path,
            number=photo_model.number,
            contest_id=photo_model.contest_id,
        )

    @staticmethod
    def get_media_type(file_path: str) -> str:
        """Map the path's extension to a media type (case-insensitive).

        Returns ``image/jpeg`` for ``.jpg``/``.jpeg``, ``image/png`` for
        ``.png`` and ``application/octet-stream`` for anything else,
        including names that have no extension at all.
        """
        # splitext only looks at the final path component, so a bare name
        # such as "png" is not mistaken for an extension.
        extension = os.path.splitext(file_path)[1].lower()
        return ContestCarouselPhotosService._MEDIA_TYPES_BY_EXTENSION.get(
            extension,
            ContestCarouselPhotosService._DEFAULT_MEDIA_TYPE,
        )