psb_hack/api/app/infrastructure/solution_files_service.py

119 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import contextlib
import os
import uuid
from typing import List

import aiofiles
from fastapi import UploadFile, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.responses import FileResponse
from werkzeug.utils import secure_filename

from app.application.solution_files_repository import SolutionFilesRepository
from app.application.solutions_repository import SolutionsRepository
from app.domain.entities.solution_files import ReadSolutionFile
from app.domain.models import SolutionFile


class SolutionFilesService:
    """Service for files attached to a solution.

    Uploaded files are written to local disk under
    ``uploads/solutions/<solution_id>`` and described by a ``SolutionFile``
    record that is created, read and deleted through the repositories.
    """

    # Number of bytes read from an upload per iteration when writing to disk.
    _UPLOAD_CHUNK_SIZE = 1024 * 1024

    # Returned for unknown or missing extensions.
    _DEFAULT_MEDIA_TYPE = "application/octet-stream"

    # Lower-case extension (without the dot) -> media type.
    _MEDIA_TYPES = {
        "jpeg": "image/jpeg",
        "jpg": "image/jpeg",
        "png": "image/png",
        "pdf": "application/pdf",
        "zip": "application/zip",
        "doc": "application/msword",
        "docx": (
            "application/vnd.openxmlformats-officedocument"
            ".wordprocessingml.document"
        ),
        "xls": "application/vnd.ms-excel",
        "xlsx": (
            "application/vnd.openxmlformats-officedocument"
            ".spreadsheetml.sheet"
        ),
        "ppt": "application/vnd.ms-powerpoint",
        "pptx": (
            "application/vnd.openxmlformats-officedocument"
            ".presentationml.presentation"
        ),
        "txt": "text/plain",
    }

    def __init__(self, db: AsyncSession):
        """Build the solution and solution-file repositories on top of ``db``."""
        self.solution_files_repository = SolutionFilesRepository(db)
        self.solutions_repository = SolutionsRepository(db)

    async def get_file_by_id(self, file_id: int) -> FileResponse:
        """Return the stored file as a download response.

        Raises:
            HTTPException: 404 when no record exists for ``file_id`` or when
                the recorded path no longer points to a file on disk.
        """
        solution_file = await self.solution_files_repository.get_by_id(file_id)
        if not solution_file:
            raise HTTPException(404, "Файл с таким ID не найден")
        # FileResponse only discovers a missing file while sending, which
        # surfaces as a server error; report it as 404 up front instead.
        if not os.path.isfile(solution_file.file_path):
            raise HTTPException(404, "Файл не найден на диске")
        return FileResponse(
            solution_file.file_path,
            media_type=self.get_media_type(solution_file.filename),
            filename=os.path.basename(solution_file.filename),
        )

    async def get_files_list_by_solution(self, solution_id: int) -> List[ReadSolutionFile]:
        """Return every file record attached to the given solution.

        Raises:
            HTTPException: 404 when the solution does not exist.
        """
        solution = await self.solutions_repository.get_by_id(solution_id)
        if solution is None:
            raise HTTPException(404, "Решение не найдено")
        solution_files = await self.solution_files_repository.get_by_solution_id(solution_id)
        return [
            ReadSolutionFile.model_validate(solution_file)
            for solution_file in solution_files
        ]

    async def upload_file(self, solution_id: int, file: UploadFile) -> ReadSolutionFile:
        """Store ``file`` on disk and create its record for the solution.

        Raises:
            HTTPException: 404 when the solution does not exist.
        """
        solution = await self.solutions_repository.get_by_id(solution_id)
        if solution is None:
            raise HTTPException(404, "Решение не найдено")
        file_path = await self.save_file(file, f'uploads/solutions/{solution.id}')
        solution_file_model = SolutionFile(
            filename=file.filename,
            file_path=file_path,
            solution_id=solution.id,
        )
        try:
            solution_file_model = await self.solution_files_repository.create(
                solution_file_model
            )
        except Exception:
            # The record was not created, so nothing references the file that
            # was just written; remove it (best effort) and re-raise.
            with contextlib.suppress(OSError):
                os.remove(file_path)
            raise
        return ReadSolutionFile.model_validate(solution_file_model)

    async def delete_file(self, file_id: int) -> ReadSolutionFile:
        """Remove the file from disk, then delete its record.

        Raises:
            HTTPException: 404 when the record does not exist or the file is
                already missing on disk (the record is kept in that case).
        """
        solution_file = await self.solution_files_repository.get_by_id(file_id)
        if solution_file is None:
            raise HTTPException(404, "Файл не найден")
        # Attempt the removal directly instead of checking existence first, so
        # a file that vanishes between check and removal is still a clean 404.
        try:
            os.remove(solution_file.file_path)
        except FileNotFoundError as err:
            raise HTTPException(404, "Файл не найден на диске") from err
        solution_file = await self.solution_files_repository.delete(solution_file)
        return ReadSolutionFile.model_validate(solution_file)

    async def save_file(self, file: UploadFile, upload_dir: str = 'uploads/solutions') -> str:
        """Write the upload into ``upload_dir`` and return the resulting path.

        The directory is created when missing. The upload is copied in chunks
        so that its size does not determine memory usage.
        """
        os.makedirs(upload_dir, exist_ok=True)
        file_path = os.path.join(upload_dir, self.generate_filename(file))
        async with aiofiles.open(file_path, 'wb') as out_file:
            while chunk := await file.read(self._UPLOAD_CHUNK_SIZE):
                await out_file.write(chunk)
        return file_path

    @staticmethod
    def generate_filename(file: UploadFile) -> str:
        """Return a sanitized on-disk name: a random UUID plus the upload's name."""
        # The UUID prefix keeps names unique; secure_filename sanitizes the
        # client-supplied part before it is used in a path.
        return secure_filename(f"{uuid.uuid4()}_{file.filename}")

    @staticmethod
    def get_media_type(filename: str) -> str:
        """Return the media type for ``filename`` based on its extension.

        The extension is the text after the last dot, compared
        case-insensitively. Names without a dot, empty names and unknown
        extensions map to ``application/octet-stream``.
        """
        if not filename:
            return SolutionFilesService._DEFAULT_MEDIA_TYPE
        _, dot, extension = filename.rpartition('.')
        if not dot:
            # No dot at all: the bare name must not be treated as an extension.
            return SolutionFilesService._DEFAULT_MEDIA_TYPE
        return SolutionFilesService._MEDIA_TYPES.get(
            extension.lower(), SolutionFilesService._DEFAULT_MEDIA_TYPE
        )