Some checks failed
CI / test (pull_request) Has been cancelled
신규/추가 테스트: - tests/test_logging_config.py: JSONFormatter, setup_logging 전체 커버 (14줄) - tests/test_strategies_base.py: BaseStrategy 추상 클래스 커버 (6줄) - tests/test_backup.py: BackupExporter 미커버 경로(빈 CSV, compress=True CSV, 포맷 실패 로깅, 기본 formats) + CloudStorage boto3 모킹 테스트 20개 (113줄) - tests/test_context.py: ContextSummarizer 전체 커버 22개 테스트 (50줄) 총 815개 테스트 통과, TOTAL 커버리지 80% (1046줄 미커버 / 5225줄 전체) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
800 lines
28 KiB
Python
800 lines
28 KiB
Python
"""Tests for backup and disaster recovery system."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
import sys
|
|
import tempfile
|
|
from datetime import UTC, datetime, timedelta
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from src.backup.exporter import BackupExporter, ExportFormat
|
|
from src.backup.health_monitor import HealthMonitor, HealthStatus
|
|
from src.backup.scheduler import BackupPolicy, BackupScheduler
|
|
|
|
|
|
@pytest.fixture
def temp_db(tmp_path: Path) -> Path:
    """Build a throwaway SQLite file containing a populated ``trades`` table.

    Three rows are inserted: a BUY and a SELL of 005930 on 2024-01-01 and a
    BUY of AAPL on 2024-01-02.

    Returns:
        Path to the database file created under ``tmp_path``.
    """
    create_table_sql = """
        CREATE TABLE trades (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            stock_code TEXT NOT NULL,
            action TEXT NOT NULL,
            quantity INTEGER NOT NULL,
            price REAL NOT NULL,
            confidence INTEGER NOT NULL,
            rationale TEXT,
            pnl REAL DEFAULT 0.0
        )
    """
    insert_sql = """
        INSERT INTO trades (timestamp, stock_code, action, quantity, price, confidence, rationale, pnl)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """
    # (timestamp, stock_code, action, quantity, price, confidence, rationale, pnl)
    seed_rows = [
        ("2024-01-01T10:00:00Z", "005930", "BUY", 10, 70000.0, 85, "Test buy", 0.0),
        ("2024-01-01T11:00:00Z", "005930", "SELL", 10, 71000.0, 90, "Test sell", 10000.0),
        ("2024-01-02T10:00:00Z", "AAPL", "BUY", 5, 180.0, 88, "Tech buy", 0.0),
    ]

    db_path = tmp_path / "test_trades.db"
    connection = sqlite3.connect(str(db_path))

    # Schema first, then the seed data, committed in one go.
    connection.execute(create_table_sql)
    connection.executemany(insert_sql, seed_rows)
    connection.commit()
    connection.close()

    return db_path
|
|
|
|
|
|
class TestBackupExporter:
    """Test BackupExporter functionality against the three-row ``temp_db``."""

    def test_exporter_init(self, temp_db: Path) -> None:
        """The exporter stores the database path it was constructed with."""
        exporter = BackupExporter(str(temp_db))
        assert exporter.db_path == str(temp_db)

    def test_export_json(self, temp_db: Path, tmp_path: Path) -> None:
        """Uncompressed JSON export produces an existing ``.json`` file."""
        exporter = BackupExporter(str(temp_db))
        output_dir = tmp_path / "exports"

        results = exporter.export_all(
            output_dir, formats=[ExportFormat.JSON], compress=False
        )

        assert ExportFormat.JSON in results
        assert results[ExportFormat.JSON].exists()
        assert results[ExportFormat.JSON].suffix == ".json"

    def test_export_json_compressed(self, temp_db: Path, tmp_path: Path) -> None:
        """Compressed JSON export produces an existing ``.gz`` file."""
        exporter = BackupExporter(str(temp_db))
        output_dir = tmp_path / "exports"

        results = exporter.export_all(
            output_dir, formats=[ExportFormat.JSON], compress=True
        )

        assert ExportFormat.JSON in results
        # Mirror the uncompressed test: the reported path must really exist.
        assert results[ExportFormat.JSON].exists()
        assert results[ExportFormat.JSON].suffix == ".gz"

    def test_export_csv(self, temp_db: Path, tmp_path: Path) -> None:
        """CSV export writes a header line plus one line per trade."""
        exporter = BackupExporter(str(temp_db))
        output_dir = tmp_path / "exports"

        results = exporter.export_all(
            output_dir, formats=[ExportFormat.CSV], compress=False
        )

        assert ExportFormat.CSV in results
        assert results[ExportFormat.CSV].exists()

        # Verify CSV content.  The encoding is pinned so the read does not
        # depend on the platform's locale default.
        with open(results[ExportFormat.CSV], encoding="utf-8") as f:
            lines = f.readlines()
        assert len(lines) == 4  # Header + 3 rows

    def test_export_all_formats(self, temp_db: Path, tmp_path: Path) -> None:
        """Every requested format shows up in the results and exists on disk."""
        exporter = BackupExporter(str(temp_db))
        output_dir = tmp_path / "exports"

        # Skip Parquet if pyarrow is not available.  Only the import itself
        # lives in the ``try`` so nothing else can be mistaken for its failure.
        try:
            import pyarrow  # noqa: F401
        except ImportError:
            formats = [ExportFormat.JSON, ExportFormat.CSV]
        else:
            formats = [ExportFormat.JSON, ExportFormat.CSV, ExportFormat.PARQUET]

        results = exporter.export_all(output_dir, formats=formats, compress=False)

        for fmt in formats:
            assert fmt in results
            assert results[fmt].exists()

    def test_incremental_export(self, temp_db: Path, tmp_path: Path) -> None:
        """``incremental_since`` restricts the export to later trades."""
        import json

        exporter = BackupExporter(str(temp_db))
        output_dir = tmp_path / "exports"

        # Export only trades from Jan 2 (UTC) onwards.
        cutoff = datetime(2024, 1, 2, tzinfo=UTC)
        results = exporter.export_all(
            output_dir,
            formats=[ExportFormat.JSON],
            compress=False,
            incremental_since=cutoff,
        )

        # Should only have 1 trade (AAPL on Jan 2).
        with open(results[ExportFormat.JSON], encoding="utf-8") as f:
            data = json.load(f)
        assert data["record_count"] == 1
        assert data["trades"][0]["stock_code"] == "AAPL"

    def test_get_export_stats(self, temp_db: Path) -> None:
        """Export statistics report the trade count, date range and DB size."""
        exporter = BackupExporter(str(temp_db))
        stats = exporter.get_export_stats()

        assert stats["total_trades"] == 3
        assert "date_range" in stats
        assert "db_size_bytes" in stats
|
|
|
|
|
|
class TestBackupScheduler:
    """Checks for BackupScheduler: creation, listing, cleanup, stats, restore."""

    def test_scheduler_init(self, temp_db: Path, tmp_path: Path) -> None:
        """Constructing a scheduler records the DB path and creates policy dirs."""
        root = tmp_path / "backups"
        sched = BackupScheduler(str(temp_db), root)

        assert sched.db_path == temp_db
        for policy_dir in ("daily", "weekly", "monthly"):
            assert (root / policy_dir).exists()

    def test_create_daily_backup(self, temp_db: Path, tmp_path: Path) -> None:
        """A verified daily backup yields a non-empty file and a checksum."""
        sched = BackupScheduler(str(temp_db), tmp_path / "backups")

        meta = sched.create_backup(BackupPolicy.DAILY, verify=True)

        assert meta.policy == BackupPolicy.DAILY
        assert meta.file_path.exists()
        assert meta.size_bytes > 0
        assert meta.checksum is not None

    def test_create_weekly_backup(self, temp_db: Path, tmp_path: Path) -> None:
        """An unverified weekly backup has a file but no checksum."""
        sched = BackupScheduler(str(temp_db), tmp_path / "backups")

        meta = sched.create_backup(BackupPolicy.WEEKLY, verify=False)

        assert meta.policy == BackupPolicy.WEEKLY
        assert meta.file_path.exists()
        assert meta.checksum is None  # verify=False

    def test_list_backups(self, temp_db: Path, tmp_path: Path) -> None:
        """Listing returns every backup, or only those of the given policy."""
        sched = BackupScheduler(str(temp_db), tmp_path / "backups")
        for policy in (BackupPolicy.DAILY, BackupPolicy.WEEKLY):
            sched.create_backup(policy)

        everything = sched.list_backups()
        assert len(everything) == 2

        only_daily = sched.list_backups(BackupPolicy.DAILY)
        assert len(only_daily) == 1
        assert only_daily[0].policy == BackupPolicy.DAILY

    def test_cleanup_old_backups(self, temp_db: Path, tmp_path: Path) -> None:
        """With zero-day daily retention, cleanup removes the fresh backup."""
        sched = BackupScheduler(
            str(temp_db), tmp_path / "backups", daily_retention_days=0
        )

        # One daily backup that the 0-day retention makes eligible for removal.
        sched.create_backup(BackupPolicy.DAILY)

        removed_by_policy = sched.cleanup_old_backups()
        assert removed_by_policy[BackupPolicy.DAILY] >= 1

    def test_backup_stats(self, temp_db: Path, tmp_path: Path) -> None:
        """Stats report per-policy counts and a positive daily size."""
        sched = BackupScheduler(str(temp_db), tmp_path / "backups")
        for policy in (BackupPolicy.DAILY, BackupPolicy.MONTHLY):
            sched.create_backup(policy)

        summary = sched.get_backup_stats()

        assert summary["daily"]["count"] == 1
        assert summary["monthly"]["count"] == 1
        assert summary["daily"]["total_size_bytes"] > 0

    def test_restore_backup(self, temp_db: Path, tmp_path: Path) -> None:
        """Restoring a backup brings back rows deleted after it was taken."""
        sched = BackupScheduler(str(temp_db), tmp_path / "backups")

        # Snapshot the database while it still holds the three seed trades.
        snapshot = sched.create_backup(BackupPolicy.DAILY)

        # Wipe the live table so a successful restore is observable.
        db = sqlite3.connect(str(temp_db))
        db.execute("DELETE FROM trades")
        db.commit()
        db.close()

        sched.restore_backup(snapshot, verify=True)

        # Count the rows present after the restore.
        db = sqlite3.connect(str(temp_db))
        (remaining,) = db.execute("SELECT COUNT(*) FROM trades").fetchone()
        db.close()

        assert remaining == 3  # Original 3 trades restored
|
|
|
|
|
|
class TestHealthMonitor:
    """Checks for HealthMonitor: database, disk space, backup recency, reports."""

    @staticmethod
    def _seed_daily_backup(temp_db: Path, tmp_path: Path) -> Path:
        """Create one daily backup under ``tmp_path / "backups"``; return that dir."""
        target = tmp_path / "backups"
        BackupScheduler(str(temp_db), target).create_backup(BackupPolicy.DAILY)
        return target

    def test_monitor_init(self, temp_db: Path, tmp_path: Path) -> None:
        """The monitor exposes the database path it was given."""
        mon = HealthMonitor(str(temp_db), tmp_path / "backups")

        assert mon.db_path == temp_db

    def test_check_database_health_ok(self, temp_db: Path, tmp_path: Path) -> None:
        """A populated database is reported healthy with its trade count."""
        outcome = HealthMonitor(
            str(temp_db), tmp_path / "backups"
        ).check_database_health()

        assert outcome.status == HealthStatus.HEALTHY
        assert "healthy" in outcome.message.lower()
        assert outcome.details is not None
        assert outcome.details["trade_count"] == 3

    def test_check_database_health_missing(self, tmp_path: Path) -> None:
        """A missing database file is reported unhealthy as 'not found'."""
        absent_db = tmp_path / "missing.db"
        outcome = HealthMonitor(
            str(absent_db), tmp_path / "backups"
        ).check_database_health()

        assert outcome.status == HealthStatus.UNHEALTHY
        assert "not found" in outcome.message.lower()

    def test_check_disk_space(self, temp_db: Path, tmp_path: Path) -> None:
        """Disk-space check yields a usable status and reports ``free_gb``."""
        mon = HealthMonitor(str(temp_db), tmp_path, min_disk_space_gb=0.001)
        outcome = mon.check_disk_space()

        # With such a tiny requirement either non-failing status is accepted.
        assert outcome.status in (HealthStatus.HEALTHY, HealthStatus.DEGRADED)
        assert outcome.details is not None
        assert "free_gb" in outcome.details

    def test_check_backup_recency_no_backups(self, temp_db: Path, tmp_path: Path) -> None:
        """An empty daily backup directory is reported unhealthy."""
        target = tmp_path / "backups"
        target.mkdir()
        (target / "daily").mkdir()

        outcome = HealthMonitor(str(temp_db), target).check_backup_recency()

        assert outcome.status == HealthStatus.UNHEALTHY
        assert "no" in outcome.message.lower()

    def test_check_backup_recency_recent(self, temp_db: Path, tmp_path: Path) -> None:
        """A just-created daily backup is reported as recent and healthy."""
        target = self._seed_daily_backup(temp_db, tmp_path)

        outcome = HealthMonitor(str(temp_db), target).check_backup_recency()

        assert outcome.status == HealthStatus.HEALTHY
        assert "recent" in outcome.message.lower()

    def test_run_all_checks(self, temp_db: Path, tmp_path: Path) -> None:
        """All three named checks are present and the database one is healthy."""
        target = self._seed_daily_backup(temp_db, tmp_path)

        mon = HealthMonitor(str(temp_db), target, min_disk_space_gb=0.001)
        all_checks = mon.run_all_checks()

        for check_name in ("database", "disk_space", "backup_recency"):
            assert check_name in all_checks
        assert all_checks["database"].status == HealthStatus.HEALTHY

    def test_get_overall_status(self, temp_db: Path, tmp_path: Path) -> None:
        """Overall status is healthy or degraded once a backup exists."""
        target = self._seed_daily_backup(temp_db, tmp_path)

        mon = HealthMonitor(str(temp_db), target, min_disk_space_gb=0.001)
        overall = mon.get_overall_status()

        assert overall in (HealthStatus.HEALTHY, HealthStatus.DEGRADED)

    def test_get_health_report(self, temp_db: Path, tmp_path: Path) -> None:
        """The report carries status, timestamp and exactly three checks."""
        target = self._seed_daily_backup(temp_db, tmp_path)

        mon = HealthMonitor(str(temp_db), target, min_disk_space_gb=0.001)
        report = mon.get_health_report()

        for field in ("overall_status", "timestamp", "checks"):
            assert field in report
        assert len(report["checks"]) == 3
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# BackupExporter — additional coverage for previously uncovered branches
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
def empty_db(tmp_path: Path) -> Path:
    """Build a throwaway SQLite file whose ``trades`` table has no rows.

    Returns:
        Path to the database file created under ``tmp_path``.
    """
    create_table_sql = """CREATE TABLE trades (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            stock_code TEXT NOT NULL,
            action TEXT NOT NULL,
            quantity INTEGER NOT NULL,
            price REAL NOT NULL,
            confidence INTEGER NOT NULL,
            rationale TEXT,
            pnl REAL DEFAULT 0.0
        )"""

    target = tmp_path / "empty_trades.db"
    connection = sqlite3.connect(str(target))
    connection.execute(create_table_sql)  # schema only, no inserts
    connection.commit()
    connection.close()
    return target
|
|
|
|
|
|
class TestBackupExporterAdditional:
    """Cover branches missed in the original TestBackupExporter suite."""

    def test_export_all_default_formats(self, temp_db: Path, tmp_path: Path) -> None:
        """export_all with formats=None must default to JSON+CSV+Parquet path."""
        exporter = BackupExporter(str(temp_db))
        # formats=None triggers the exporter's default format list.
        results = exporter.export_all(tmp_path / "out", formats=None, compress=False)
        # JSON and CSV must always succeed; Parquet needs pyarrow, so it is
        # deliberately not asserted here.
        assert ExportFormat.JSON in results
        assert ExportFormat.CSV in results

    def test_export_all_logs_error_on_failure(
        self, temp_db: Path, tmp_path: Path
    ) -> None:
        """export_all must continue with other formats when one format fails."""
        exporter = BackupExporter(str(temp_db))
        # Replace _export_format on this instance: raise on JSON, delegate to
        # the real implementation for everything else.
        original = exporter._export_format

        def failing_export(fmt, *args, **kwargs):  # type: ignore[no-untyped-def]
            if fmt == ExportFormat.JSON:
                raise RuntimeError("simulated failure")
            return original(fmt, *args, **kwargs)

        exporter._export_format = failing_export  # type: ignore[method-assign]
        results = exporter.export_all(
            tmp_path / "out",
            formats=[ExportFormat.JSON, ExportFormat.CSV],
            compress=False,
        )
        # JSON failed → not in results; CSV succeeded → in results
        assert ExportFormat.JSON not in results
        assert ExportFormat.CSV in results

    def test_export_csv_empty_trades_no_compress(
        self, empty_db: Path, tmp_path: Path
    ) -> None:
        """CSV export with no trades and compress=False must still write a header."""
        exporter = BackupExporter(str(empty_db))
        results = exporter.export_all(
            tmp_path / "out",
            formats=[ExportFormat.CSV],
            compress=False,
        )
        assert ExportFormat.CSV in results
        out = results[ExportFormat.CSV]
        assert out.exists()
        # Explicit encoding keeps this consistent with the gzip-based tests.
        content = out.read_text(encoding="utf-8")
        assert "timestamp" in content

    def test_export_csv_empty_trades_compressed(
        self, empty_db: Path, tmp_path: Path
    ) -> None:
        """CSV export with no trades and compress=True must write gzipped header."""
        import gzip

        exporter = BackupExporter(str(empty_db))
        results = exporter.export_all(
            tmp_path / "out",
            formats=[ExportFormat.CSV],
            compress=True,
        )
        assert ExportFormat.CSV in results
        out = results[ExportFormat.CSV]
        assert out.suffix == ".gz"
        with gzip.open(out, "rt", encoding="utf-8") as f:
            content = f.read()
        assert "timestamp" in content

    def test_export_csv_with_data_compressed(
        self, temp_db: Path, tmp_path: Path
    ) -> None:
        """CSV export with data and compress=True must write gzipped rows."""
        import gzip

        exporter = BackupExporter(str(temp_db))
        results = exporter.export_all(
            tmp_path / "out",
            formats=[ExportFormat.CSV],
            compress=True,
        )
        assert ExportFormat.CSV in results
        out = results[ExportFormat.CSV]
        with gzip.open(out, "rt", encoding="utf-8") as f:
            lines = f.readlines()
        # Header + 3 data rows
        assert len(lines) == 4

    def test_export_parquet_raises_import_error_without_pyarrow(
        self, temp_db: Path, tmp_path: Path
    ) -> None:
        """Parquet export must be dropped from the results when pyarrow is unimportable."""
        exporter = BackupExporter(str(temp_db))
        # A ``None`` entry in sys.modules makes any ``import pyarrow`` raise
        # ImportError, whether or not the package is installed, so no
        # skip-if-installed probe is needed.  The export must run while the
        # patch is active for the failure path to be exercised.
        with patch.dict(sys.modules, {"pyarrow": None, "pyarrow.parquet": None}):
            results = exporter.export_all(
                tmp_path / "out",
                formats=[ExportFormat.PARQUET],
                compress=False,
            )
        # Parquet export fails gracefully; result dict should not contain it
        assert ExportFormat.PARQUET not in results
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CloudStorage — mocked boto3 tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
def mock_boto3_module():
    """Yield a MagicMock registered as ``boto3`` in ``sys.modules`` for one test.

    The ``sys.modules`` entry is restored when the fixture is torn down.
    """
    fake_boto3 = MagicMock()
    patcher = patch.dict(sys.modules, {"boto3": fake_boto3})
    patcher.start()
    try:
        yield fake_boto3
    finally:
        patcher.stop()
|
|
|
|
|
|
@pytest.fixture
def s3_config():
    """Return an S3Config pointing at a local endpoint with test credentials."""
    from src.backup.cloud_storage import S3Config

    settings = {
        "endpoint_url": "http://localhost:9000",
        "access_key": "minioadmin",
        "secret_key": "minioadmin",
        "bucket_name": "test-bucket",
        "region": "us-east-1",
    }
    return S3Config(**settings)
|
|
|
|
|
|
class TestCloudStorage:
    """Test CloudStorage using mocked boto3.

    Most tests request ``mock_boto3_module`` so that a MagicMock stands in for
    ``boto3`` in ``sys.modules`` while ``CloudStorage`` is constructed;
    ``storage.client`` is then a mock whose calls can be inspected or made to
    raise.
    """

    def test_init_creates_s3_client(self, mock_boto3_module, s3_config) -> None:
        """CloudStorage.__init__ must call boto3.client with the correct args."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        mock_boto3_module.client.assert_called_once()
        call_kwargs = mock_boto3_module.client.call_args[1]
        assert call_kwargs["aws_access_key_id"] == "minioadmin"
        assert call_kwargs["aws_secret_access_key"] == "minioadmin"
        assert storage.config == s3_config

    def test_init_raises_if_boto3_missing(self, s3_config) -> None:
        """CloudStorage.__init__ must raise ImportError when boto3 is absent."""
        # Resolve the imports up front so that only the reload and the
        # constructor call can satisfy the ``pytest.raises`` expectation.
        # The module is already loaded: the ``s3_config`` fixture imports it.
        import importlib

        import src.backup.cloud_storage as m

        with patch.dict(sys.modules, {"boto3": None}):  # type: ignore[dict-item]
            with pytest.raises((ImportError, TypeError)):
                # Re-import to trigger the try/except inside __init__
                importlib.reload(m)
                m.CloudStorage(s3_config)

    def test_upload_file_success(
        self, mock_boto3_module, s3_config, tmp_path: Path
    ) -> None:
        """upload_file must call client.upload_file and return the object key."""
        from src.backup.cloud_storage import CloudStorage

        test_file = tmp_path / "backup.json.gz"
        test_file.write_bytes(b"data")

        storage = CloudStorage(s3_config)
        key = storage.upload_file(test_file, object_key="backups/backup.json.gz")

        assert key == "backups/backup.json.gz"
        storage.client.upload_file.assert_called_once()

    def test_upload_file_default_key(
        self, mock_boto3_module, s3_config, tmp_path: Path
    ) -> None:
        """upload_file without object_key must use the filename as key."""
        from src.backup.cloud_storage import CloudStorage

        test_file = tmp_path / "myfile.gz"
        test_file.write_bytes(b"data")

        storage = CloudStorage(s3_config)
        key = storage.upload_file(test_file)

        assert key == "myfile.gz"

    def test_upload_file_not_found(
        self, mock_boto3_module, s3_config, tmp_path: Path
    ) -> None:
        """upload_file must raise FileNotFoundError for missing files."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        with pytest.raises(FileNotFoundError):
            storage.upload_file(tmp_path / "nonexistent.gz")

    def test_upload_file_propagates_client_error(
        self, mock_boto3_module, s3_config, tmp_path: Path
    ) -> None:
        """upload_file must re-raise exceptions from the boto3 client."""
        from src.backup.cloud_storage import CloudStorage

        test_file = tmp_path / "backup.gz"
        test_file.write_bytes(b"data")

        storage = CloudStorage(s3_config)
        storage.client.upload_file.side_effect = RuntimeError("network error")

        with pytest.raises(RuntimeError, match="network error"):
            storage.upload_file(test_file)

    def test_download_file_success(
        self, mock_boto3_module, s3_config, tmp_path: Path
    ) -> None:
        """download_file must call client.download_file and return local path."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        dest = tmp_path / "downloads" / "backup.gz"

        result = storage.download_file("backups/backup.gz", dest)

        assert result == dest
        storage.client.download_file.assert_called_once()

    def test_download_file_propagates_error(
        self, mock_boto3_module, s3_config, tmp_path: Path
    ) -> None:
        """download_file must re-raise exceptions from the boto3 client."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        storage.client.download_file.side_effect = RuntimeError("timeout")

        with pytest.raises(RuntimeError, match="timeout"):
            storage.download_file("key", tmp_path / "dest.gz")

    def test_list_files_returns_objects(self, mock_boto3_module, s3_config) -> None:
        """list_files must return parsed file metadata from S3 response."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        # Canned list_objects_v2 payload with a single object.  ``UTC`` is the
        # module-level import this file already uses.
        storage.client.list_objects_v2.return_value = {
            "Contents": [
                {
                    "Key": "backups/a.gz",
                    "Size": 1024,
                    "LastModified": datetime(2026, 1, 1, tzinfo=UTC),
                    "ETag": '"abc123"',
                }
            ]
        }

        files = storage.list_files(prefix="backups/")
        assert len(files) == 1
        assert files[0]["key"] == "backups/a.gz"
        assert files[0]["size_bytes"] == 1024

    def test_list_files_empty_bucket(self, mock_boto3_module, s3_config) -> None:
        """list_files must return empty list when bucket has no objects."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        # A response without a "Contents" key stands for an empty bucket.
        storage.client.list_objects_v2.return_value = {}

        files = storage.list_files()
        assert files == []

    def test_list_files_propagates_error(self, mock_boto3_module, s3_config) -> None:
        """list_files must re-raise exceptions from the boto3 client."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        storage.client.list_objects_v2.side_effect = RuntimeError("auth error")

        with pytest.raises(RuntimeError):
            storage.list_files()

    def test_delete_file_success(self, mock_boto3_module, s3_config) -> None:
        """delete_file must call client.delete_object with the correct key."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        storage.delete_file("backups/old.gz")
        storage.client.delete_object.assert_called_once_with(
            Bucket="test-bucket", Key="backups/old.gz"
        )

    def test_delete_file_propagates_error(self, mock_boto3_module, s3_config) -> None:
        """delete_file must re-raise exceptions from the boto3 client."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        storage.client.delete_object.side_effect = RuntimeError("permission denied")

        with pytest.raises(RuntimeError):
            storage.delete_file("backups/old.gz")

    def test_get_storage_stats_success(self, mock_boto3_module, s3_config) -> None:
        """get_storage_stats must aggregate file sizes correctly."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        # Two objects of exactly 1 MiB each, so the expected totals are exact.
        storage.client.list_objects_v2.return_value = {
            "Contents": [
                {
                    "Key": "a.gz",
                    "Size": 1024 * 1024,
                    "LastModified": datetime(2026, 1, 1, tzinfo=UTC),
                    "ETag": '"x"',
                },
                {
                    "Key": "b.gz",
                    "Size": 1024 * 1024,
                    "LastModified": datetime(2026, 1, 2, tzinfo=UTC),
                    "ETag": '"y"',
                },
            ]
        }

        stats = storage.get_storage_stats()
        assert stats["total_files"] == 2
        assert stats["total_size_bytes"] == 2 * 1024 * 1024
        assert stats["total_size_mb"] == pytest.approx(2.0)

    def test_get_storage_stats_on_error(self, mock_boto3_module, s3_config) -> None:
        """get_storage_stats must return error dict without raising on failure."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        storage.client.list_objects_v2.side_effect = RuntimeError("no connection")

        stats = storage.get_storage_stats()
        assert "error" in stats
        assert stats["total_files"] == 0

    def test_verify_connection_success(self, mock_boto3_module, s3_config) -> None:
        """verify_connection must return True when head_bucket succeeds."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        result = storage.verify_connection()
        assert result is True

    def test_verify_connection_failure(self, mock_boto3_module, s3_config) -> None:
        """verify_connection must return False when head_bucket raises."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        storage.client.head_bucket.side_effect = RuntimeError("no such bucket")

        result = storage.verify_connection()
        assert result is False

    def test_enable_versioning(self, mock_boto3_module, s3_config) -> None:
        """enable_versioning must call put_bucket_versioning."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        storage.enable_versioning()
        storage.client.put_bucket_versioning.assert_called_once()

    def test_enable_versioning_propagates_error(
        self, mock_boto3_module, s3_config
    ) -> None:
        """enable_versioning must re-raise exceptions from the boto3 client."""
        from src.backup.cloud_storage import CloudStorage

        storage = CloudStorage(s3_config)
        storage.client.put_bucket_versioning.side_effect = RuntimeError("denied")

        with pytest.raises(RuntimeError):
            storage.enable_versioning()
|