import os
import uuid
from typing import Optional, Any, Coroutine

import aiofiles
from fastapi import HTTPException, status, UploadFile
from magic import magic
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.responses import FileResponse
from werkzeug.utils import secure_filename

from app.application.teams_repository import TeamsRepository
from app.domain.entities.team import TeamEntity
from app.domain.entities.team_logo import TeamLogoEntity
from app.domain.models import Team

# MIME types accepted for an uploaded logo (matched against the sniffed type).
_ALLOWED_UPLOAD_MIME_TYPES = ("image/jpeg", "image/png")

# File extension -> media type used when serving a stored logo.
# "jpg" must map to "image/jpeg": "image/jpg" is not a registered media type.
_MEDIA_TYPES_BY_EXTENSION = {
    "jpeg": "image/jpeg",
    "jpg": "image/jpeg",
    "png": "image/png",
}

# Uploads are copied to disk in pieces of this size instead of all at once.
_UPLOAD_CHUNK_SIZE = 1024 * 1024


class TeamsService:
    """Application service for teams and their logo files.

    Wraps a ``TeamsRepository`` and converts between ``Team`` models and
    ``TeamEntity`` objects.

    Error convention visible in this class: ``update_team``,
    ``set_active_team`` and ``delete_team`` report an unknown team as
    HTTP 400, while the logo methods report it as HTTP 404.
    """

    def __init__(self, db: AsyncSession):
        """Create the service with a repository bound to *db*."""
        self.teams_repository = TeamsRepository(db)

    async def get_all_teams(self) -> list[TeamEntity]:
        """Return every team the repository yields, as entities."""
        teams = await self.teams_repository.get_all()
        return [self.model_to_entity(team) for team in teams]

    async def get_active_team(self) -> Optional[TeamEntity]:
        """Return the active team as an entity, or ``None`` if there is none."""
        team = await self.teams_repository.get_active_team()
        if not team:
            return None
        return self.model_to_entity(team)

    async def create_team(self, team: TeamEntity) -> Optional[TeamEntity]:
        """Create a team from *team* and return it as an entity.

        The entity is built from the same model instance that was handed to
        the repository's ``create``.
        """
        team_model = self.entity_to_model(team)
        await self.teams_repository.create(team_model)
        return self.model_to_entity(team_model)

    async def update_team(self, team_id: int, team: TeamEntity) -> Optional[TeamEntity]:
        """Overwrite title, description and git URL of team *team_id*.

        Only those three fields are copied from *team*; logo fields and the
        active flag are left untouched.

        Raises:
            HTTPException: 400 if no team with *team_id* exists.
        """
        team_model = await self.teams_repository.get_by_id(team_id)
        if not team_model:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Team not found")

        team_model.title = team.title
        team_model.description = team.description
        team_model.git_url = team.git_url
        await self.teams_repository.update(team_model)
        return self.model_to_entity(team_model)

    async def set_active_team(self, team_id: int) -> TeamEntity:
        """Mark team *team_id* as active, deactivating the current active team.

        Raises:
            HTTPException: 400 if no team with *team_id* exists.
        """
        team_model = await self.teams_repository.get_by_id(team_id)
        if not team_model:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Team not found")

        # Clear the flag on whichever team currently holds it before setting
        # it on the requested team.
        active_team = await self.teams_repository.get_active_team()
        if active_team:
            active_team.is_active = False
            await self.teams_repository.update(active_team)

        team_model.is_active = True
        team_model = await self.teams_repository.update(team_model)
        return self.model_to_entity(team_model)

    async def delete_team(self, team_id: int) -> Optional[TeamEntity]:
        """Delete team *team_id* and return what the repository's delete gave back.

        Raises:
            HTTPException: 400 if no team with *team_id* exists.
        """
        team_model = await self.teams_repository.get_by_id(team_id)
        if not team_model:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Team not found")

        result = await self.teams_repository.delete(team_model)
        return self.model_to_entity(result)

    async def get_photo_file_by_team_id(self, team_id: int) -> FileResponse:
        """Return the stored logo of team *team_id* as a file response.

        Raises:
            HTTPException: 404 if the team does not exist, has no logo path,
                or the logo path does not exist on disk.
        """
        team = await self.teams_repository.get_by_id(team_id)
        if not team:
            raise HTTPException(404, "Команда с таким ID не найдена")

        # A team without a logo has no path to test; os.path.exists(None)
        # would raise TypeError, so treat it as "logo file not found".
        if not team.logo or not os.path.exists(team.logo):
            raise HTTPException(404, "Файл логотипа не найден")

        return FileResponse(
            team.logo,
            media_type=self.get_media_type(team.logo),
            filename=team.logo_filename,
        )

    async def upload_photo(self, team_id: int, file: UploadFile) -> TeamLogoEntity:
        """Validate and store *file* as the logo of team *team_id*.

        Raises:
            HTTPException: 404 if the team does not exist; 400 if the file
                is not a JPEG or PNG image.
        """
        team = await self.teams_repository.get_by_id(team_id)
        # Reject unknown teams before anything is written to disk.
        if not team:
            raise HTTPException(404, "Команда с таким ID не найдена")

        self.validate_file_type(file)

        team.logo = await self.save_file(file)
        # Record the name the file was actually saved under, rather than
        # generating a second name with a different UUID.
        team.logo_filename = os.path.basename(team.logo)
        team = await self.teams_repository.update(team)

        return TeamLogoEntity(
            filename=team.logo_filename,
            file_path=team.logo,
        )

    @staticmethod
    def model_to_entity(team_model: Team) -> TeamEntity:
        """Copy the fields of a ``Team`` model into a new ``TeamEntity``."""
        return TeamEntity(
            id=team_model.id,
            title=team_model.title,
            description=team_model.description,
            logo=team_model.logo,
            logo_filename=team_model.logo_filename,
            git_url=team_model.git_url,
            is_active=team_model.is_active,
        )

    @staticmethod
    def entity_to_model(team_entity: TeamEntity) -> Team:
        """Build a ``Team`` model from *team_entity*.

        The model's ``id`` is only assigned when the entity carries a truthy
        id; otherwise it is left as constructed.
        """
        team_model = Team(
            title=team_entity.title,
            description=team_entity.description,
            logo=team_entity.logo,
            logo_filename=team_entity.logo_filename,
            git_url=team_entity.git_url,
            is_active=team_entity.is_active,
        )
        if team_entity.id:
            team_model.id = team_entity.id
        return team_model

    async def save_file(self, file: UploadFile, upload_dir: str = "uploads/team_logos") -> str:
        """Write *file* into *upload_dir* under a generated name.

        The directory is created if it does not exist.

        Returns:
            The path of the written file (``upload_dir`` joined with the
            generated filename).
        """
        os.makedirs(upload_dir, exist_ok=True)
        file_path = os.path.join(upload_dir, self.generate_filename(file))

        async with aiofiles.open(file_path, 'wb') as out_file:
            # Copy in bounded chunks so a large upload is never held in
            # memory as a single bytes object.
            while chunk := await file.read(_UPLOAD_CHUNK_SIZE):
                await out_file.write(chunk)

        return file_path

    @staticmethod
    def validate_file_type(file: UploadFile) -> None:
        """Reject uploads whose sniffed content type is not JPEG or PNG.

        The type is detected from the first 1024 bytes of the underlying
        file object, which is then rewound to the start so the upload can
        still be read in full afterwards.

        Raises:
            HTTPException: 400 if the detected type is not allowed.
        """
        mime = magic.Magic(mime=True)
        file_type = mime.from_buffer(file.file.read(1024))
        file.file.seek(0)

        if file_type not in _ALLOWED_UPLOAD_MIME_TYPES:
            raise HTTPException(400, "Invalid file type")

    @staticmethod
    def generate_filename(file: UploadFile) -> str:
        """Return a sanitized ``<uuid4>_<original name>`` filename for *file*.

        A new UUID is drawn on every call, so two calls for the same upload
        return different names.
        """
        return secure_filename(f"{uuid.uuid4()}_{file.filename}")

    @staticmethod
    def get_media_type(filename: str) -> str:
        """Return the media type for *filename* based on its extension.

        ``.jpeg`` and ``.jpg`` map to ``image/jpeg`` and ``.png`` to
        ``image/png`` (case-insensitive). Anything else, including a name
        without a dot, yields ``application/octet-stream``.
        """
        _, dot, extension = filename.rpartition('.')
        if not dot:
            # No extension at all: do not mistake the whole name for one.
            return "application/octet-stream"
        return _MEDIA_TYPES_BY_EXTENSION.get(extension.lower(), "application/octet-stream")