visus-plus/api/app/infrastructure/backup_service.py

108 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import datetime
import os
import subprocess
import tarfile
from typing import Any, Coroutine, Optional

from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.responses import FileResponse

from app.application.backups_repository import BackupsRepository
from app.domain.entities.responses.backup import BackupResponseEntity
from app.domain.models import Backup
class BackupService:
    """Create, list, download and delete application backups.

    A backup is a gzip-compressed tar archive written to ``backup_dir``. It
    contains the application files directory and a ``pg_dump`` dump of the
    database. Every archive is tracked by a ``Backup`` record handled through
    ``BackupsRepository``.
    """

    def __init__(
        self,
        db: AsyncSession,
        db_url: Optional[str] = None,
        app_files_dir: Optional[str] = None,
        backup_dir: Optional[str] = None,
        pg_dump_path: Optional[str] = None,
    ):
        """Store the configuration and make sure the backup directory exists.

        Args:
            db: Session handed to the backups repository.
            db_url: Connection string passed to ``pg_dump -d``.
            app_files_dir: Directory that is added to every archive.
            backup_dir: Directory where archives are written; it is created
                here when given and missing.
            pg_dump_path: Path to the ``pg_dump`` executable.
        """
        self.backup_repository = BackupsRepository(db)
        self.db_url = db_url
        self.app_files_dir = app_files_dir
        self.backup_dir = backup_dir
        self.pg_dump_path = pg_dump_path
        if backup_dir:
            os.makedirs(backup_dir, exist_ok=True)

    async def get_all_backups(self) -> list[BackupResponseEntity]:
        """Return every backup known to the repository as response entities."""
        backups = await self.backup_repository.get_all()
        return [self.model_to_entity(backup) for backup in backups]

    async def create_backup(self, user_id: int) -> BackupResponseEntity:
        """Create a new backup archive and register it for *user_id*.

        The archive is named ``backup_<YYYYmmdd_HHMMSS>.tar.gz`` after the
        current local time and is written into ``backup_dir``.

        Args:
            user_id: Identifier stored on the new ``Backup`` record.

        Returns:
            The response entity built from the stored ``Backup`` record.

        Raises:
            HTTPException: 500 when the service is not fully configured, when
                ``pg_dump`` cannot be started or exits with an error, or when
                the archive cannot be written.
        """
        required = (self.db_url, self.app_files_dir, self.backup_dir, self.pg_dump_path)
        if not all(required):
            # Fail with a clear message instead of a TypeError from os.path.join
            # or a pg_dump call against the literal string "None".
            raise HTTPException(
                500,
                "Ошибка создания бэкапа: сервис резервного копирования не настроен",
            )

        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"backup_{timestamp}.tar.gz"
        backup_path = os.path.join(self.backup_dir, backup_name)

        try:
            # pg_dump and tar/gzip are blocking; run them in a worker thread so
            # the event loop stays responsive while the backup is built.
            await asyncio.to_thread(self._write_archive, backup_path)
        except subprocess.CalledProcessError as e:
            # str(e) echoes the full command line, including db_url (which may
            # carry credentials), so only the exit code is reported.
            raise HTTPException(
                500,
                f"Ошибка создания бэкапа: pg_dump завершился с кодом {e.returncode}",
            ) from e
        except (OSError, tarfile.TarError) as e:
            # OSError covers a missing pg_dump executable (no shell is involved
            # any more) as well as file-system failures while archiving.
            raise HTTPException(500, f"Ошибка создания бэкапа: {e}") from e

        backup_record = Backup(
            filename=backup_name,
            path=backup_path,
            user_id=user_id,
        )
        # NOTE(review): model_to_entity reads id/timestamp from the record, so
        # the repository presumably populates them on create -- confirm.
        await self.backup_repository.create(backup_record)
        return self.model_to_entity(backup_record)

    def _write_archive(self, backup_path: str) -> None:
        """Dump the database and pack it with the app files into *backup_path*.

        Blocking helper. The temporary dump file is always removed, and a
        partially written archive is removed when archiving fails.
        """
        # abspath keeps the dump next to the archive for both relative and
        # absolute backup_dir values.
        dump_path = os.path.abspath(f"{backup_path}.sql")
        # Argument list without a shell: no quoting problems or injection via
        # db_url or paths containing spaces or shell metacharacters.
        dump_cmd = [self.pg_dump_path, "-Fc", "-d", self.db_url, "-f", dump_path]
        try:
            subprocess.run(dump_cmd, check=True)
            try:
                with tarfile.open(backup_path, "w:gz") as tar:
                    # normpath drops a trailing separator, which would
                    # otherwise make basename() return an empty member name.
                    files_root = os.path.basename(os.path.normpath(self.app_files_dir))
                    tar.add(self.app_files_dir, arcname=files_root)
                    # The member keeps its historical ".sql" name although the
                    # dump is written with -Fc (pg_dump custom format).
                    tar.add(dump_path, arcname="db_dump.sql")
            except Exception:
                self._remove_if_exists(backup_path)
                raise
        finally:
            self._remove_if_exists(dump_path)

    @staticmethod
    def _remove_if_exists(path: str) -> None:
        """Delete *path*, ignoring the case where it is already gone."""
        try:
            os.remove(path)
        except FileNotFoundError:
            # Nothing to clean up; this is the expected state after a failure
            # that happened before the file was created.
            pass

    async def get_backup_file_by_id(self, backup_id: int) -> FileResponse:
        """Return the archive of backup *backup_id* as a downloadable file.

        Raises:
            HTTPException: 404 when no such backup record exists or when its
                file is missing on disk.
        """
        backup = await self.backup_repository.get_by_id(backup_id)
        if not backup:
            raise HTTPException(404, 'Резервная копия с таким id не найдена')
        if not os.path.exists(backup.path):
            raise HTTPException(404, 'Файл не найден на диске')
        return FileResponse(
            backup.path,
            media_type="application/gzip",
            filename=backup.filename,
        )

    async def delete_backup(self, backup_id: int) -> Optional[BackupResponseEntity]:
        """Delete the archive file and the record of backup *backup_id*.

        Returns:
            The response entity built from what the repository's ``delete``
            returns.

        Raises:
            HTTPException: 404 when no such backup record exists or when its
                file is missing on disk.
        """
        backup = await self.backup_repository.get_by_id(backup_id)
        if not backup:
            raise HTTPException(404, 'Резервная копия с таким id не найдена')
        try:
            # Remove directly instead of exists()+remove(): one system call and
            # no window in which the file can disappear between the two.
            os.remove(backup.path)
        except FileNotFoundError:
            raise HTTPException(404, 'Файл не найден на диске') from None
        deleted = await self.backup_repository.delete(backup)
        return self.model_to_entity(deleted)

    @staticmethod
    def model_to_entity(backup: Backup) -> BackupResponseEntity:
        """Copy the fields of a ``Backup`` model into a response entity."""
        return BackupResponseEntity(
            id=backup.id,
            timestamp=backup.timestamp,
            path=backup.path,
            filename=backup.filename,
            user_id=backup.user_id,
        )