import datetime
import io
import os
import shutil
import subprocess
import tarfile
from typing import Optional

import aiofiles
from fastapi import HTTPException, UploadFile
from fastapi_maintenance import maintenance_mode_on
from magic import magic
from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine
from starlette.responses import FileResponse
from werkzeug.utils import secure_filename

from app.application.backups_repository import BackupsRepository
from app.domain.entities.responses.backup import BackupResponseEntity
from app.domain.models import Backup, User


class BackupService:
    """Create, restore, upload, download and delete application backups.

    A backup is a ``.tar.gz`` archive holding two things: the application
    files directory (stored under its base name) and a ``pg_dump`` custom
    format dump stored as ``db_dump.sql``.
    """

    def __init__(
            self,
            db: AsyncSession,
            db_url: Optional[str] = None,
            app_files_dir: Optional[str] = None,
            backup_dir: Optional[str] = None,
            pg_dump_path: Optional[str] = None
    ):
        """Store the configuration and make sure the backup directory exists.

        Args:
            db: Session handed to ``BackupsRepository``.
            db_url: Connection string passed to the PostgreSQL tools via ``-d``.
            app_files_dir: Directory with application files to archive/restore.
            backup_dir: Directory where archives are written; created if given.
            pg_dump_path: Path to the ``pg_dump`` executable; ``psql`` and
                ``pg_restore`` are looked up next to it.
        """
        self.backup_repository = BackupsRepository(db)
        self.db_url = db_url
        self.app_files_dir = app_files_dir
        self.backup_dir = backup_dir
        self.pg_dump_path = pg_dump_path

        if backup_dir:
            os.makedirs(backup_dir, exist_ok=True)

    async def get_all_backups(self) -> list[BackupResponseEntity]:
        """Return every backup record converted to a response entity."""
        backups = await self.backup_repository.get_all()
        return [self.model_to_entity(backup) for backup in backups]

    async def create_backup(self, user_id: int) -> BackupResponseEntity:
        """Dump the database, archive it with the app files, and record it.

        Args:
            user_id: Id stored on the created ``Backup`` record.

        Returns:
            The response entity for the created backup record.

        Raises:
            HTTPException: 500 when ``pg_dump`` exits with a non-zero status.
        """
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"backup_{timestamp}.tar.gz"
        backup_path = os.path.join(self.backup_dir, backup_name)
        # abspath also works when backup_dir is already absolute, unlike
        # gluing os.getcwd() in front of it.
        db_dump_path = os.path.abspath(f"{backup_path}.sql")

        # Argument list without a shell: nothing in db_url or the paths can be
        # interpreted as shell syntax.
        dump_cmd = [self.pg_dump_path, "-Fc", "-d", self.db_url, "-f", db_dump_path]

        try:
            subprocess.run(dump_cmd, check=True)

            with tarfile.open(backup_path, "w:gz") as tar:
                tar.add(self.app_files_dir, arcname=os.path.basename(self.app_files_dir))
                tar.add(db_dump_path, arcname="db_dump.sql")

            backup_record = Backup(
                filename=backup_name,
                path=backup_path,
                user_id=user_id,
            )
            await self.backup_repository.create(backup_record)

            return self.model_to_entity(backup_record)
        except subprocess.CalledProcessError as e:
            # str(e) contains the full command line, including db_url and any
            # credentials in it, so only the exit code goes to the client.
            raise HTTPException(
                500, f"Ошибка создания бэкапа: код завершения {e.returncode}"
            ) from e
        finally:
            # The raw dump is only an intermediate file; remove it on every path.
            if os.path.exists(db_dump_path):
                os.remove(db_dump_path)

    async def restore_backup(self, backup_id: int, engine: AsyncEngine) -> BackupResponseEntity:
        """Replace the application files and database with a backup's contents.

        The archive is validated and unpacked into a temporary directory
        before anything is deleted, so a broken archive cannot leave the
        application without both its files and its database.

        Args:
            backup_id: Id of the backup record to restore.
            engine: Engine that is disposed before the schema is dropped.

        Returns:
            The response entity of the restored backup.

        Raises:
            HTTPException: 404 when the record or its file is missing, 400
                when the archive layout is wrong, 500 for tool or other
                failures.
        """
        temp_dir = None
        try:
            async with maintenance_mode_on():
                backup = await self.backup_repository.get_by_id(backup_id)
                if not backup:
                    raise HTTPException(404, 'Резервная копия с таким id не найдена')

                if not os.path.exists(backup.path):
                    raise HTTPException(404, 'Файл не найден на диске')

                app_dir_name = os.path.basename(self.app_files_dir)

                with tarfile.open(backup.path, "r:gz") as tar:
                    members = tar.getnames()
                if "db_dump.sql" not in members or not any(
                        name.startswith(app_dir_name) for name in members):
                    raise HTTPException(400, "Неверная структура архива резервной копии")

                # Unpack first; destructive steps only start once this succeeded.
                temp_dir = os.path.join(self.backup_dir, f"temp_restore_{backup_id}")
                # Drop leftovers of an earlier interrupted restore of the same id.
                shutil.rmtree(temp_dir, ignore_errors=True)
                os.makedirs(temp_dir, exist_ok=True)
                with tarfile.open(backup.path, "r:gz") as tar:
                    self._safe_extract(tar, temp_dir)

                if os.path.exists(self.app_files_dir):
                    shutil.rmtree(self.app_files_dir)
                os.makedirs(self.app_files_dir, exist_ok=True)

                await engine.dispose()

                drop_cmd = [
                    self._pg_tool_path("psql"),
                    "-d", self.db_url,
                    "-c", "DROP SCHEMA public CASCADE; CREATE SCHEMA public;",
                ]
                subprocess.run(drop_cmd, check=True)

                db_dump_path = os.path.join(temp_dir, "db_dump.sql")
                restore_cmd = [
                    self._pg_tool_path("pg_restore"),
                    "-d", self.db_url,
                    "--no-owner",
                    "--no-privileges",
                    "-Fc",
                    db_dump_path,
                ]
                subprocess.run(restore_cmd, check=True)

                extracted_app_files_dir = os.path.join(temp_dir, app_dir_name)
                if os.path.exists(extracted_app_files_dir):
                    shutil.copytree(extracted_app_files_dir, self.app_files_dir, dirs_exist_ok=True)

                return self.model_to_entity(backup)
        except HTTPException:
            # Keep the intended 4xx status instead of turning it into a 500 below.
            raise
        except subprocess.CalledProcessError as e:
            # See create_backup: the command line may contain credentials.
            raise HTTPException(
                500, f"Ошибка восстановления бэкапа: код завершения {e.returncode}"
            ) from e
        except Exception as e:
            raise HTTPException(500, f"Ошибка сервера: {str(e)}") from e
        finally:
            if temp_dir:
                shutil.rmtree(temp_dir, ignore_errors=True)

    async def get_backup_file_by_id(self, backup_id: int) -> FileResponse:
        """Return the archive of a backup as a downloadable file response.

        Raises:
            HTTPException: 404 when the record or the file on disk is missing.
        """
        backup = await self.backup_repository.get_by_id(backup_id)

        if not backup:
            raise HTTPException(404, 'Резервная копия с таким id не найдена')

        if not os.path.exists(backup.path):
            raise HTTPException(404, 'Файл не найден на диске')

        return FileResponse(
            backup.path,
            media_type="application/gzip",
            filename=backup.filename,
        )

    async def upload_backup(self, file: UploadFile, user: User) -> BackupResponseEntity:
        """Validate an uploaded archive, store it, and record it as user-made.

        Raises:
            HTTPException: 400 when the MIME type or the archive layout is
                not accepted.
        """
        file_bytes = await file.read()
        # Rewind so validate_file_type can sniff the header from the start.
        file.file.seek(0)

        self.validate_file_type(file)

        if not self.validate_backup_archive(file_bytes, self.app_files_dir):
            raise HTTPException(400, "Неверная структура архива резервной копии")

        filename = self.generate_filename(file)
        backup_path = os.path.join(self.backup_dir, filename)

        async with aiofiles.open(backup_path, 'wb') as out_file:
            await out_file.write(file_bytes)

        backup_record = Backup(
            filename=filename,
            path=backup_path,
            user_id=user.id,
            is_by_user=True,
        )
        await self.backup_repository.create(backup_record)

        return self.model_to_entity(backup_record)

    async def delete_backup(self, backup_id: int) -> Optional[BackupResponseEntity]:
        """Remove a backup's archive from disk and delete its record.

        Raises:
            HTTPException: 404 when the record or the file on disk is missing.
        """
        backup = await self.backup_repository.get_by_id(backup_id)

        if not backup:
            raise HTTPException(404, 'Резервная копия с таким id не найдена')

        if not os.path.exists(backup.path):
            raise HTTPException(404, 'Файл не найден на диске')

        os.remove(backup.path)

        return self.model_to_entity(
            await self.backup_repository.delete(backup)
        )

    def _pg_tool_path(self, tool: str) -> str:
        """Return the path of a PostgreSQL tool located next to ``pg_dump``.

        Only the executable name is rewritten, so a directory that happens to
        contain ``pg_dump`` in its name is left untouched.
        """
        directory, executable = os.path.split(self.pg_dump_path)
        return os.path.join(directory, executable.replace("pg_dump", tool))

    @staticmethod
    def _safe_extract(tar: tarfile.TarFile, destination: str) -> None:
        """Extract ``tar`` into ``destination`` refusing entries that escape it.

        Archives can come from user uploads, so member paths are untrusted.
        """
        if hasattr(tarfile, "data_filter"):
            # Extraction filters are available: let tarfile reject unsafe members.
            tar.extractall(destination, filter="data")
            return

        # Fallback for interpreters without extraction filters: check that
        # every member name resolves inside the destination directory.
        # NOTE(review): link targets are not inspected in this fallback.
        root = os.path.realpath(destination)
        for member in tar.getmembers():
            target = os.path.realpath(os.path.join(root, member.name))
            if os.path.commonpath([root, target]) != root:
                raise HTTPException(400, "Неверная структура архива резервной копии")
        tar.extractall(destination)

    @staticmethod
    def generate_filename(file: UploadFile) -> str:
        """Build a sanitized, timestamp-prefixed name for an uploaded file."""
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        return secure_filename(f"uploaded_{timestamp}_{file.filename}")

    @staticmethod
    def validate_backup_archive(file_bytes: bytes, expected_app_files_dir_name: str) -> bool:
        """Check that ``file_bytes`` is a gzip tar with the expected layout.

        Args:
            file_bytes: Raw archive content.
            expected_app_files_dir_name: Name or path of the app files
                directory; only its base name is compared, matching how
                archives are written by ``create_backup``.

        Returns:
            True when the archive contains ``db_dump.sql`` and at least one
            member starting with the directory's base name; False otherwise,
            including when the archive cannot be read at all.
        """
        dir_name = os.path.basename(expected_app_files_dir_name)
        try:
            with tarfile.open(fileobj=io.BytesIO(file_bytes), mode="r:gz") as tar:
                members = tar.getnames()
        except Exception:
            # Any read failure simply means "not a valid backup archive".
            return False

        if "db_dump.sql" not in members:
            return False
        return any(name.startswith(dir_name) for name in members)

    @staticmethod
    def model_to_entity(backup: Backup) -> BackupResponseEntity:
        """Copy the fields of a ``Backup`` model into a response entity."""
        return BackupResponseEntity(
            id=backup.id,
            timestamp=backup.timestamp,
            path=backup.path,
            filename=backup.filename,
            is_by_user=backup.is_by_user,
            user_id=backup.user_id,
        )

    @staticmethod
    def validate_file_type(file: UploadFile):
        """Reject uploads whose sniffed MIME type is not zip or gzip.

        Reads the first 1024 bytes of the underlying file and rewinds it.

        Raises:
            HTTPException: 400 when the detected type is not accepted.
        """
        mime = magic.Magic(mime=True)
        file_type = mime.from_buffer(file.file.read(1024))
        file.file.seek(0)

        if file_type not in ["application/zip", "application/gzip", "application/x-gzip"]:
            raise HTTPException(400, "Неправильный формат файла")

    @staticmethod
    def get_media_type(filename: str) -> str:
        """Map a file name to a media type based on its extension."""
        lowered = filename.lower()
        # A two-part suffix cannot be found by splitting on the last dot.
        if lowered.endswith(".tar.gz"):
            return "application/gzip"
        if lowered.split('.')[-1] == 'zip':
            return "application/zip"
        return "application/octet-stream"