134 lines
5.0 KiB
Python
134 lines
5.0 KiB
Python
import os
|
|
import uuid
|
|
|
|
import aiofiles
|
|
import magic
|
|
from fastapi import HTTPException, UploadFile
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from starlette.responses import FileResponse
|
|
from werkzeug.utils import secure_filename
|
|
|
|
from app.application.contest_files_repository import ContestFilesRepository
|
|
from app.application.contests_repository import ContestsRepository
|
|
from app.domain.entities.contest_file import ContestFileEntity
|
|
from app.domain.models import ContestFile, User
|
|
|
|
|
|
class ContestFilesService:
    """Service for uploading, serving and deleting files attached to contests.

    File contents are written to a directory on local disk, while one
    ``ContestFile`` row per file is kept through ``ContestFilesRepository``.
    """

    # MIME types (as detected from the file content) that may be uploaded.
    _ALLOWED_MIME_TYPES = frozenset({
        "application/pdf",
        "image/jpeg",
        "image/png",
        "application/zip",
        "application/msword",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/vnd.ms-excel",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.ms-powerpoint",
        "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "text/plain",
    })

    # Content-Type used for downloads, keyed by lower-cased extension (no dot).
    _MEDIA_TYPES_BY_EXTENSION = {
        "jpg": "image/jpeg",
        "jpeg": "image/jpeg",
        "png": "image/png",
        "pdf": "application/pdf",
        "zip": "application/zip",
        "doc": "application/msword",
        "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "xls": "application/vnd.ms-excel",
        "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "ppt": "application/vnd.ms-powerpoint",
        "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "txt": "text/plain",
    }

    # Number of leading bytes handed to `magic` for type detection; the
    # python-magic README recommends at least 2048 for reliable results.
    _SNIFF_BYTES = 2048

    # Uploads are copied to disk in pieces of this size instead of being
    # loaded into memory as a whole.
    _COPY_CHUNK_BYTES = 1024 * 1024

    def __init__(self, db: AsyncSession):
        """Create the repositories this service uses on top of ``db``."""
        self.contest_files_repository = ContestFilesRepository(db)
        self.contests_repository = ContestsRepository(db)

    async def get_file_by_id(self, file_id: int) -> FileResponse:
        """Return a download response for the stored file ``file_id``.

        Raises:
            HTTPException: 404 when there is no such record, or when the
                record exists but its path is not a regular file on disk.
        """
        contest_file = await self.contest_files_repository.get_by_id(file_id)
        if not contest_file:
            raise HTTPException(404, "Файл не найден")

        stored_path = contest_file.file_path
        # isfile() rather than exists(): a directory cannot be served.
        if not os.path.isfile(stored_path):
            raise HTTPException(404, "Файл не найден на диске")

        return FileResponse(
            stored_path,
            media_type=self.get_media_type(stored_path),
            filename=os.path.basename(stored_path),
        )

    async def get_files_by_contest_id(self, contest_id: int) -> list[ContestFileEntity]:
        """Return the entities of all files recorded for ``contest_id``."""
        files = await self.contest_files_repository.get_by_contest_id(contest_id)
        return [self.model_to_entity(contest_file) for contest_file in files]

    async def upload_file(self, contest_id: int, file: UploadFile, user: User) -> ContestFileEntity:
        """Validate ``file``, store it on disk and record it for the contest.

        ``user`` is accepted for interface compatibility; this method does not
        consult it, so no permission check happens here.

        Raises:
            HTTPException: 404 when the contest does not exist, 400 when the
                detected file type is not allowed.
        """
        contest = await self.contests_repository.get_by_id(contest_id)
        if not contest:
            raise HTTPException(404, "Конкурс не найден")

        self.validate_file_type(file)
        file_path = await self.save_file(file)

        contest_file = ContestFile(
            filename=file.filename,
            file_path=file_path,
            contest_id=contest_id,
        )
        try:
            created = await self.contest_files_repository.create(contest_file)
        except Exception:
            # The record was not stored, so do not leave an orphan on disk.
            self._remove_if_exists(file_path)
            raise

        return self.model_to_entity(created)

    async def delete_file(self, file_id: int, user: User) -> ContestFileEntity:
        """Delete the stored file ``file_id`` from disk and from the repository.

        ``user`` is accepted for interface compatibility; this method does not
        consult it, so no permission check happens here.

        Raises:
            HTTPException: 404 when there is no such record.
        """
        contest_file = await self.contest_files_repository.get_by_id(file_id)
        if not contest_file:
            raise HTTPException(404, "Файл не найден")

        # A file that is already gone from disk is not an error.
        self._remove_if_exists(contest_file.file_path)

        deleted = await self.contest_files_repository.delete(contest_file)
        return self.model_to_entity(deleted)

    async def save_file(self, file: UploadFile, upload_dir: str = "uploads/contest_files") -> str:
        """Write ``file`` into ``upload_dir`` and return the path it was saved to.

        The directory is created when missing. The upload is copied in chunks,
        and a partially written file is removed if the copy fails.
        """
        os.makedirs(upload_dir, exist_ok=True)
        file_path = os.path.join(upload_dir, self.generate_filename(file))

        try:
            async with aiofiles.open(file_path, "wb") as out_file:
                while chunk := await file.read(self._COPY_CHUNK_BYTES):
                    await out_file.write(chunk)
        except Exception:
            self._remove_if_exists(file_path)
            raise

        return file_path

    @staticmethod
    def validate_file_type(file: UploadFile):
        """Reject ``file`` unless its content-detected MIME type is allowed.

        The type is detected from the leading bytes of the content, not from
        the client-supplied name or header. The stream is rewound afterwards so
        the file can still be read from the start.

        Raises:
            HTTPException: 400 when the detected type is not in the allow-list.
        """
        try:
            head = file.file.read(ContestFilesService._SNIFF_BYTES)
        finally:
            file.file.seek(0)

        file_type = magic.Magic(mime=True).from_buffer(head)
        if file_type not in ContestFilesService._ALLOWED_MIME_TYPES:
            raise HTTPException(400, f"Недопустимый тип файла: {file_type}")

    @staticmethod
    def generate_filename(file: UploadFile) -> str:
        """Return a sanitized storage name: a random UUID plus the upload's name."""
        # A missing filename must not end up as the literal text "None".
        original_name = file.filename or ""
        return secure_filename(f"{uuid.uuid4()}_{original_name}")

    @staticmethod
    def model_to_entity(contest_file_model: ContestFile) -> ContestFileEntity:
        """Copy the fields of a ``ContestFile`` model into a ``ContestFileEntity``."""
        return ContestFileEntity(
            id=contest_file_model.id,
            filename=contest_file_model.filename,
            file_path=contest_file_model.file_path,
            contest_id=contest_file_model.contest_id,
        )

    @staticmethod
    def get_media_type(filename: str) -> str:
        """Return the Content-Type for ``filename`` based on its extension.

        Unknown or missing extensions yield ``application/octet-stream``.
        """
        # splitext() only looks at the last path component, so a dot in a
        # directory name or a name without any extension is not misread.
        extension = os.path.splitext(filename)[1].lstrip(".").lower()
        return ContestFilesService._MEDIA_TYPES_BY_EXTENSION.get(
            extension, "application/octet-stream"
        )

    @staticmethod
    def _remove_if_exists(path: str) -> None:
        """Remove ``path`` from disk, ignoring the case where it is already gone."""
        try:
            os.remove(path)
        except FileNotFoundError:
            # Nothing to clean up.
            pass