import datetime
import io
import os
import subprocess
import tarfile
from typing import Any, Coroutine, Optional

import aiofiles
from fastapi import HTTPException, UploadFile
from magic import magic
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.responses import FileResponse
from werkzeug.utils import secure_filename

from app.application.backups_repository import BackupsRepository
from app.domain.entities.responses.backup import BackupResponseEntity
from app.domain.models import Backup, User


class BackupService:
    """Create, upload, serve and delete application backups.

    A backup is a gzip-compressed tar archive that contains the application
    files directory (stored under its base name) and a ``pg_dump`` output
    stored under the member name ``db_dump.sql``. Every archive on disk is
    tracked by a ``Backup`` record persisted through ``BackupsRepository``.
    """

    def __init__(
        self,
        db: AsyncSession,
        db_url: Optional[str] = None,
        app_files_dir: Optional[str] = None,
        backup_dir: Optional[str] = None,
        pg_dump_path: Optional[str] = None,
    ):
        """Store configuration and make sure the backup directory exists.

        Args:
            db: Session handed to the backups repository.
            db_url: Connection string passed to ``pg_dump -d``.
            app_files_dir: Directory whose contents are archived.
            backup_dir: Directory where archives are written; created here
                when it is set and does not exist yet.
            pg_dump_path: Path to the ``pg_dump`` executable.
        """
        self.backup_repository = BackupsRepository(db)
        self.db_url = db_url
        self.app_files_dir = app_files_dir
        self.backup_dir = backup_dir
        self.pg_dump_path = pg_dump_path

        if backup_dir:
            os.makedirs(backup_dir, exist_ok=True)

    async def get_all_backups(self) -> list[BackupResponseEntity]:
        """Return every backup known to the repository as response entities."""
        backups = await self.backup_repository.get_all()
        return [self.model_to_entity(backup) for backup in backups]

    async def create_backup(self, user_id: int) -> BackupResponseEntity:
        """Dump the database, archive it with the app files and record it.

        Args:
            user_id: Id stored on the created ``Backup`` record.

        Returns:
            The response entity built from the new record.

        Raises:
            HTTPException: 500 when ``pg_dump`` exits with a non-zero status.
        """
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"backup_{timestamp}.tar.gz"
        backup_path = os.path.join(self.backup_dir, backup_name)
        # abspath gives the same result as prefixing the cwd for a relative
        # backup_dir, and stays correct when backup_dir is already absolute.
        db_dump_path = os.path.abspath(f"{backup_path}.sql")

        # An argument list without a shell: neither the connection string
        # nor the paths are ever interpreted by a shell.
        dump_cmd = [
            self.pg_dump_path,
            "-Fc",
            "-d",
            self.db_url,
            "-f",
            db_dump_path,
        ]

        # NOTE(review): subprocess.run and tarfile are blocking calls made
        # from a coroutine; consider moving them to a worker thread.
        try:
            subprocess.run(dump_cmd, check=True)

            with tarfile.open(backup_path, "w:gz") as tar:
                tar.add(
                    self.app_files_dir,
                    arcname=os.path.basename(self.app_files_dir),
                )
                tar.add(db_dump_path, arcname="db_dump.sql")

            backup_record = Backup(
                filename=backup_name,
                path=backup_path,
                user_id=user_id,
            )
            await self.backup_repository.create(backup_record)
            return self.model_to_entity(backup_record)
        except subprocess.CalledProcessError as e:
            # str(e) contains the whole command line, including the database
            # URL; only the exit code is exposed to the client.
            raise HTTPException(
                500,
                f"Ошибка создания бэкапа: pg_dump завершился с кодом {e.returncode}",
            ) from e
        finally:
            # The intermediate dump is only needed while the archive is
            # built; remove it on success and on failure alike.
            try:
                os.remove(db_dump_path)
            except FileNotFoundError:
                pass

    async def get_backup_file_by_id(self, backup_id: int) -> FileResponse:
        """Return the archive of the given backup as a downloadable response.

        Raises:
            HTTPException: 404 when the record or the file on disk is missing.
        """
        backup = await self.backup_repository.get_by_id(backup_id)
        if not backup:
            raise HTTPException(404, 'Резервная копия с таким id не найдена')

        if not os.path.exists(backup.path):
            raise HTTPException(404, 'Файл не найден на диске')

        return FileResponse(
            backup.path,
            media_type="application/gzip",
            filename=backup.filename,
        )

    async def upload_backup(self, file: UploadFile, user: User) -> BackupResponseEntity:
        """Validate an uploaded archive, store it on disk and record it.

        Args:
            file: Uploaded archive.
            user: Uploading user; ``user.id`` is stored on the record.

        Returns:
            The response entity built from the new record.

        Raises:
            HTTPException: 400 when the MIME type is not accepted or the
                archive does not have the expected structure.
        """
        file_bytes = await file.read()
        # Rewind so that the type check reads from the start of the stream.
        file.file.seek(0)

        self.validate_file_type(file)

        if not self.validate_backup_archive(
            file_bytes,
            expected_app_files_dir_name=os.path.basename(self.app_files_dir),
        ):
            raise HTTPException(400, "Неверная структура архива резервной копии")

        filename = self.generate_filename(file)
        backup_path = os.path.join(self.backup_dir, filename)

        async with aiofiles.open(backup_path, 'wb') as out_file:
            await out_file.write(file_bytes)

        backup_record = Backup(
            filename=filename,
            path=backup_path,
            user_id=user.id,
        )
        await self.backup_repository.create(backup_record)
        return self.model_to_entity(backup_record)

    async def delete_backup(self, backup_id: int) -> Optional[BackupResponseEntity]:
        """Delete the archive from disk and then its repository record.

        Raises:
            HTTPException: 404 when the record or the file on disk is missing.
        """
        backup = await self.backup_repository.get_by_id(backup_id)
        if not backup:
            raise HTTPException(404, 'Резервная копия с таким id не найдена')

        # Remove directly instead of checking for existence first: a single
        # operation, and no window between the check and the removal.
        try:
            os.remove(backup.path)
        except FileNotFoundError:
            raise HTTPException(404, 'Файл не найден на диске')

        return self.model_to_entity(
            await self.backup_repository.delete(backup)
        )

    @staticmethod
    def generate_filename(file: UploadFile) -> str:
        """Build a sanitized, timestamped file name for an uploaded archive."""
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        return secure_filename(f"uploaded_{timestamp}_{file.filename}")

    @staticmethod
    def validate_backup_archive(file_bytes: bytes, expected_app_files_dir_name: str) -> bool:
        """Check that the bytes are a gzip tar with the expected members.

        The archive must contain a member named ``db_dump.sql`` and at least
        one member whose name starts with ``expected_app_files_dir_name``.

        Returns:
            ``True`` when both conditions hold; ``False`` otherwise, including
            when the data cannot be opened as a gzip-compressed tar archive.
        """
        try:
            with tarfile.open(fileobj=io.BytesIO(file_bytes), mode="r:gz") as tar:
                members = tar.getnames()
        except Exception:
            # Validation boundary: anything that prevents reading the archive
            # means "not a valid backup", not a server error.
            return False

        if "db_dump.sql" not in members:
            return False
        return any(name.startswith(expected_app_files_dir_name) for name in members)

    @staticmethod
    def model_to_entity(backup: Backup) -> BackupResponseEntity:
        """Copy the fields of a ``Backup`` model into a response entity."""
        return BackupResponseEntity(
            id=backup.id,
            timestamp=backup.timestamp,
            path=backup.path,
            filename=backup.filename,
            is_by_user=backup.is_by_user,
            user_id=backup.user_id,
        )

    @staticmethod
    def validate_file_type(file: UploadFile):
        """Reject uploads whose detected MIME type is not zip or gzip.

        Only the first 1024 bytes are inspected, and the stream is rewound
        afterwards so the caller can keep reading from the start.

        Raises:
            HTTPException: 400 when the detected type is not accepted.
        """
        mime = magic.Magic(mime=True)
        file_type = mime.from_buffer(file.file.read(1024))
        file.file.seek(0)

        if file_type not in ["application/zip", "application/gzip"]:
            raise HTTPException(400, "Неправильный формат файла")

    @staticmethod
    def get_media_type(filename: str) -> str:
        """Map a file name to a media type based on its extension.

        Returns ``application/gzip`` for ``.tar.gz``, ``application/zip`` for
        ``zip`` and ``application/octet-stream`` for everything else.
        """
        lowered = filename.lower()
        # The compound suffix has to be tested on the whole name: the text
        # after the last dot is only "gz".
        if lowered.endswith(".tar.gz"):
            return "application/gzip"
        if lowered.rsplit(".", 1)[-1] == "zip":
            return "application/zip"
        return "application/octet-stream"