# business-card-site/API/app/infrastructure/contest_files_service.py
# 134 lines, 5.0 KiB, Python

import os
import uuid
import aiofiles
import magic
from fastapi import HTTPException, UploadFile
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.responses import FileResponse
from werkzeug.utils import secure_filename
from app.application.contest_files_repository import ContestFilesRepository
from app.application.contests_repository import ContestsRepository
from app.domain.entities.contest_file import ContestFileEntity
from app.domain.models import ContestFile, User
class ContestFilesService:
    """Service for files attached to contests.

    Uploaded files are written to a directory on local disk and a matching
    ``ContestFile`` row is stored through ``ContestFilesRepository``.
    """

    # MIME types (as detected from the file content) accepted for upload.
    ALLOWED_MIME_TYPES = frozenset({
        "application/pdf",
        "image/jpeg",
        "image/png",
        "application/zip",
        "application/msword",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/vnd.ms-excel",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.ms-powerpoint",
        "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "text/plain",
    })

    # File extension -> Content-Type used when a stored file is served.
    # Kept in line with ALLOWED_MIME_TYPES: the OOXML formats get their own
    # types instead of the legacy .doc/.xls/.ppt ones, and both JPEG
    # extensions map to the registered "image/jpeg".
    MEDIA_TYPES_BY_EXTENSION = {
        "jpeg": "image/jpeg",
        "jpg": "image/jpeg",
        "png": "image/png",
        "pdf": "application/pdf",
        "zip": "application/zip",
        "doc": "application/msword",
        "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "xls": "application/vnd.ms-excel",
        "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "ppt": "application/vnd.ms-powerpoint",
        "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "txt": "text/plain",
    }

    # Number of leading bytes handed to libmagic; python-magic recommends at
    # least 2048 bytes, since fewer can produce incorrect identification.
    MIME_SNIFF_BYTES = 2048

    # Uploads are copied to disk in chunks of this size so that a large file
    # is never held in memory as a whole.
    UPLOAD_CHUNK_SIZE = 1024 * 1024

    def __init__(self, db: AsyncSession):
        """Create the repositories this service uses on top of ``db``."""
        self.contest_files_repository = ContestFilesRepository(db)
        self.contests_repository = ContestsRepository(db)

    async def get_file_by_id(self, file_id: int) -> FileResponse:
        """Return the stored file with the given id as a download response.

        Raises:
            HTTPException: 404 when there is no such record, or when the
                record exists but its file is missing on disk.
        """
        contest_file = await self.contest_files_repository.get_by_id(file_id)
        if not contest_file:
            raise HTTPException(404, "Файл не найден")

        # isfile() rather than exists(): a directory at that path cannot be
        # served and should be reported the same way as a missing file.
        if not os.path.isfile(contest_file.file_path):
            raise HTTPException(404, "Файл не найден на диске")

        return FileResponse(
            contest_file.file_path,
            media_type=self.get_media_type(contest_file.file_path),
            filename=os.path.basename(contest_file.file_path),
        )

    async def get_files_by_contest_id(self, contest_id: int) -> list[ContestFileEntity]:
        """Return entities for all files recorded for ``contest_id``."""
        files = await self.contest_files_repository.get_by_contest_id(contest_id)
        return [self.model_to_entity(file) for file in files]

    async def upload_file(self, contest_id: int, file: UploadFile, user: User) -> ContestFileEntity:
        """Validate ``file``, store it on disk and record it for the contest.

        Raises:
            HTTPException: 404 when the contest does not exist, 400 when the
                detected file type is not allowed.
        """
        # NOTE(review): `user` is accepted but not used in this method;
        # presumably authorization is enforced by the caller - confirm.
        contest = await self.contests_repository.get_by_id(contest_id)
        if not contest:
            raise HTTPException(404, "Конкурс не найден")

        self.validate_file_type(file)
        file_path = await self.save_file(file)

        contest_file = ContestFile(
            filename=file.filename,
            file_path=file_path,
            contest_id=contest_id,
        )
        try:
            created = await self.contest_files_repository.create(contest_file)
        except Exception:
            # The file is already on disk; without a DB record nothing would
            # ever reference it, so remove it before propagating the error.
            try:
                self._remove_file_if_exists(file_path)
            except OSError:
                pass  # best-effort cleanup; the original error matters more
            raise
        return self.model_to_entity(created)

    async def delete_file(self, file_id: int, user: User) -> ContestFileEntity:
        """Delete the file record and its file on disk; return the entity.

        Raises:
            HTTPException: 404 when there is no record with ``file_id``.
        """
        # NOTE(review): `user` is accepted but not used in this method;
        # presumably authorization is enforced by the caller - confirm.
        contest_file = await self.contest_files_repository.get_by_id(file_id)
        if not contest_file:
            raise HTTPException(404, "Файл не найден")

        # Remember the path before the row is deleted, then delete the row
        # first: if the DB operation fails the file is still intact, whereas
        # the opposite order would leave a record pointing at a missing file.
        file_path = contest_file.file_path
        deleted = await self.contest_files_repository.delete(contest_file)
        self._remove_file_if_exists(file_path)
        return self.model_to_entity(deleted)

    async def save_file(self, file: UploadFile, upload_dir: str = "uploads/contest_files") -> str:
        """Write ``file`` into ``upload_dir`` under a generated name.

        The directory is created if needed. Returns the path of the written
        file.
        """
        os.makedirs(upload_dir, exist_ok=True)
        file_path = os.path.join(upload_dir, self.generate_filename(file))

        async with aiofiles.open(file_path, 'wb') as out_file:
            # Copy chunk by chunk instead of reading the whole upload at once.
            while chunk := await file.read(self.UPLOAD_CHUNK_SIZE):
                await out_file.write(chunk)
        return file_path

    @staticmethod
    def validate_file_type(file: UploadFile):
        """Check the upload's content-detected MIME type against the allow-list.

        The stream position is reset to the start afterwards so the file can
        still be read in full.

        Raises:
            HTTPException: 400 when the detected type is not allowed.
        """
        mime = magic.Magic(mime=True)
        try:
            head = file.file.read(ContestFilesService.MIME_SNIFF_BYTES)
        finally:
            # Always rewind, even if reading the header failed.
            file.file.seek(0)

        file_type = mime.from_buffer(head)
        if file_type not in ContestFilesService.ALLOWED_MIME_TYPES:
            raise HTTPException(400, f"Недопустимый тип файла: {file_type}")

    @staticmethod
    def generate_filename(file: UploadFile) -> str:
        """Return a sanitized storage name: a random UUID plus the upload's name."""
        # A missing filename would otherwise be embedded as the text "None".
        original_name = file.filename or ""
        return secure_filename(f"{uuid.uuid4()}_{original_name}")

    @staticmethod
    def model_to_entity(contest_file_model: ContestFile) -> ContestFileEntity:
        """Copy the fields of a ``ContestFile`` model into a ``ContestFileEntity``."""
        return ContestFileEntity(
            id=contest_file_model.id,
            filename=contest_file_model.filename,
            file_path=contest_file_model.file_path,
            contest_id=contest_file_model.contest_id,
        )

    @staticmethod
    def get_media_type(filename: str) -> str:
        """Return the Content-Type for ``filename`` based on its extension.

        The lookup is case-insensitive. Names without an extension, or with
        an unknown one, yield ``application/octet-stream``.
        """
        # splitext() only looks at the last path component, so dots in a
        # directory name or an extension-less file are not misread.
        extension = os.path.splitext(filename)[1].lstrip(".").lower()
        return ContestFilesService.MEDIA_TYPES_BY_EXTENSION.get(
            extension, "application/octet-stream"
        )

    @staticmethod
    def _remove_file_if_exists(path: str) -> None:
        """Remove ``path`` from disk, ignoring the case where it is already gone."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass  # nothing left to remove