visus-plus/api/app/infrastructure/backup_service.py

177 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import io
import os
import shutil
import subprocess
import tarfile
from typing import Optional
from fastapi_maintenance import maintenance_mode_on
import aiofiles
from fastapi import HTTPException, UploadFile
from magic import magic
from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine
from starlette.responses import FileResponse
from werkzeug.utils import secure_filename
from app.application.backups_repository import BackupsRepository
from app.domain.entities.responses.backup import BackupResponseEntity
from app.domain.models import Backup, User
class BackupService:
def __init__(
self,
db: AsyncSession,
db_url: str = None,
app_files_dir: str = None,
backup_dir: str = None,
pg_dump_path: str = None
):
self.backup_repository = BackupsRepository(db)
self.db_url = db_url
self.app_files_dir = app_files_dir
self.backup_dir = backup_dir
self.pg_dump_path = pg_dump_path
if backup_dir:
os.makedirs(backup_dir, exist_ok=True)
async def get_all_backups(self) -> list[BackupResponseEntity]:
backups = await self.backup_repository.get_all()
return [
self.model_to_entity(backup)
for backup in backups
]
async def create_backup(self, user_id: int) -> BackupResponseEntity:
try:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"backup_{timestamp}.tar.gz"
backup_path = os.path.join(self.backup_dir, backup_name)
db_dump_path = f"{os.getcwd()}/{backup_path}.sql"
dump_cmd = f'"{self.pg_dump_path}" -Fc -d {self.db_url} -f "{db_dump_path}"'
subprocess.run(dump_cmd, shell=True, check=True)
with tarfile.open(backup_path, "w:gz") as tar:
tar.add(self.app_files_dir, arcname=os.path.basename(self.app_files_dir))
tar.add(db_dump_path, arcname="db_dump.sql")
backup_record = Backup(
filename=backup_name,
path=backup_path,
user_id=user_id,
)
await self.backup_repository.create(backup_record)
os.remove(db_dump_path)
return self.model_to_entity(backup_record)
except subprocess.CalledProcessError as e:
raise HTTPException(500, f"Ошибка создания бэкапа: {e}")
async def get_backup_file_by_id(self, backup_id: int) -> FileResponse:
backup = await self.backup_repository.get_by_id(backup_id)
if not backup:
raise HTTPException(404, 'Резервная копия с таким id не найдена')
if not os.path.exists(backup.path):
raise HTTPException(404, 'Файл не найден на диске')
return FileResponse(
backup.path,
media_type="application/gzip",
filename=backup.filename,
)
async def upload_backup(self, file: UploadFile, user: User) -> BackupResponseEntity:
file_bytes = await file.read()
file.file.seek(0)
self.validate_file_type(file)
if not self.validate_backup_archive(file_bytes, self.app_files_dir):
raise HTTPException(400, "Неверная структура архива резервной копии")
filename = self.generate_filename(file)
backup_path = os.path.join(self.backup_dir, filename)
async with aiofiles.open(backup_path, 'wb') as out_file:
await out_file.write(file_bytes)
backup_record = Backup(
filename=filename,
path=backup_path,
user_id=user.id,
is_by_user=True,
)
await self.backup_repository.create(backup_record)
return self.model_to_entity(backup_record)
async def delete_backup(self, backup_id: int) -> Optional[BackupResponseEntity]:
backup = await self.backup_repository.get_by_id(backup_id)
if not backup:
raise HTTPException(404, 'Резервная копия с таким id не найдена')
if not os.path.exists(backup.path):
raise HTTPException(404, 'Файл не найден на диске')
if os.path.exists(backup.path):
os.remove(backup.path)
return self.model_to_entity(
await self.backup_repository.delete(backup)
)
@staticmethod
def generate_filename(file: UploadFile) -> str:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
return secure_filename(f"uploaded_{timestamp}_{file.filename}")
@staticmethod
def validate_backup_archive(file_bytes: bytes, expected_app_files_dir_name: str) -> bool:
try:
with tarfile.open(fileobj=io.BytesIO(file_bytes), mode="r:gz") as tar:
members = tar.getnames()
if "db_dump.sql" not in members:
return False
if not any(name.startswith(expected_app_files_dir_name) for name in members):
return False
return True
except Exception as e:
return False
@staticmethod
def model_to_entity(backup: Backup):
return BackupResponseEntity(
id=backup.id,
timestamp=backup.timestamp,
path=backup.path,
filename=backup.filename,
is_by_user=backup.is_by_user,
user_id=backup.user_id,
)
@staticmethod
def validate_file_type(file: UploadFile):
mime = magic.Magic(mime=True)
file_type = mime.from_buffer(file.file.read(1024))
file.file.seek(0)
if file_type not in ["application/zip", "application/gzip", "application/x-gzip"]:
raise HTTPException(400, "Неправильный формат файла")
@staticmethod
def get_media_type(filename: str) -> str:
extension = filename.split('.')[-1].lower()
if extension in ['zip']:
return "application/zip"
if extension == 'tar.gz':
return "application/gzip"
return "application/octet-stream"