Files
The-Ouroboros/tests/test_backup.py
agentson 5730f0db2a
Some checks failed
Gitea CI / test (push) Failing after 5s
Gitea CI / test (pull_request) Failing after 5s
ci: fix lint baseline and stabilize failing main tests
2026-03-01 20:17:13 +09:00

756 lines
28 KiB
Python

"""Tests for backup and disaster recovery system."""
from __future__ import annotations
import sqlite3
import sys
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from src.backup.exporter import BackupExporter, ExportFormat
from src.backup.health_monitor import HealthMonitor, HealthStatus
from src.backup.scheduler import BackupPolicy, BackupScheduler
@pytest.fixture
def temp_db(tmp_path: Path) -> Path:
"""Create a temporary test database."""
db_path = tmp_path / "test_trades.db"
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# Create trades table
cursor.execute("""
CREATE TABLE trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
stock_code TEXT NOT NULL,
action TEXT NOT NULL,
quantity INTEGER NOT NULL,
price REAL NOT NULL,
confidence INTEGER NOT NULL,
rationale TEXT,
pnl REAL DEFAULT 0.0
)
""")
# Insert test data
test_trades = [
("2024-01-01T10:00:00Z", "005930", "BUY", 10, 70000.0, 85, "Test buy", 0.0),
("2024-01-01T11:00:00Z", "005930", "SELL", 10, 71000.0, 90, "Test sell", 10000.0),
("2024-01-02T10:00:00Z", "AAPL", "BUY", 5, 180.0, 88, "Tech buy", 0.0),
]
cursor.executemany(
"""
INSERT INTO trades (
timestamp, stock_code, action, quantity, price, confidence, rationale, pnl
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
test_trades,
)
conn.commit()
conn.close()
return db_path
class TestBackupExporter:
"""Test BackupExporter functionality."""
def test_exporter_init(self, temp_db: Path) -> None:
"""Test exporter initialization."""
exporter = BackupExporter(str(temp_db))
assert exporter.db_path == str(temp_db)
def test_export_json(self, temp_db: Path, tmp_path: Path) -> None:
"""Test JSON export."""
exporter = BackupExporter(str(temp_db))
output_dir = tmp_path / "exports"
results = exporter.export_all(output_dir, formats=[ExportFormat.JSON], compress=False)
assert ExportFormat.JSON in results
assert results[ExportFormat.JSON].exists()
assert results[ExportFormat.JSON].suffix == ".json"
def test_export_json_compressed(self, temp_db: Path, tmp_path: Path) -> None:
"""Test compressed JSON export."""
exporter = BackupExporter(str(temp_db))
output_dir = tmp_path / "exports"
results = exporter.export_all(output_dir, formats=[ExportFormat.JSON], compress=True)
assert ExportFormat.JSON in results
assert results[ExportFormat.JSON].suffix == ".gz"
def test_export_csv(self, temp_db: Path, tmp_path: Path) -> None:
"""Test CSV export."""
exporter = BackupExporter(str(temp_db))
output_dir = tmp_path / "exports"
results = exporter.export_all(output_dir, formats=[ExportFormat.CSV], compress=False)
assert ExportFormat.CSV in results
assert results[ExportFormat.CSV].exists()
# Verify CSV content
with open(results[ExportFormat.CSV]) as f:
lines = f.readlines()
assert len(lines) == 4 # Header + 3 rows
def test_export_all_formats(self, temp_db: Path, tmp_path: Path) -> None:
"""Test exporting all formats."""
exporter = BackupExporter(str(temp_db))
output_dir = tmp_path / "exports"
# Skip Parquet if pyarrow not available
try:
import pyarrow # noqa: F401
formats = [ExportFormat.JSON, ExportFormat.CSV, ExportFormat.PARQUET]
except ImportError:
formats = [ExportFormat.JSON, ExportFormat.CSV]
results = exporter.export_all(output_dir, formats=formats, compress=False)
for fmt in formats:
assert fmt in results
assert results[fmt].exists()
def test_incremental_export(self, temp_db: Path, tmp_path: Path) -> None:
"""Test incremental export."""
exporter = BackupExporter(str(temp_db))
output_dir = tmp_path / "exports"
# Export only trades after Jan 2
cutoff = datetime(2024, 1, 2, tzinfo=UTC)
results = exporter.export_all(
output_dir,
formats=[ExportFormat.JSON],
compress=False,
incremental_since=cutoff,
)
# Should only have 1 trade (AAPL on Jan 2)
import json
with open(results[ExportFormat.JSON]) as f:
data = json.load(f)
assert data["record_count"] == 1
assert data["trades"][0]["stock_code"] == "AAPL"
def test_get_export_stats(self, temp_db: Path) -> None:
"""Test export statistics."""
exporter = BackupExporter(str(temp_db))
stats = exporter.get_export_stats()
assert stats["total_trades"] == 3
assert "date_range" in stats
assert "db_size_bytes" in stats
class TestBackupScheduler:
"""Test BackupScheduler functionality."""
def test_scheduler_init(self, temp_db: Path, tmp_path: Path) -> None:
"""Test scheduler initialization."""
backup_dir = tmp_path / "backups"
scheduler = BackupScheduler(str(temp_db), backup_dir)
assert scheduler.db_path == temp_db
assert (backup_dir / "daily").exists()
assert (backup_dir / "weekly").exists()
assert (backup_dir / "monthly").exists()
def test_create_daily_backup(self, temp_db: Path, tmp_path: Path) -> None:
"""Test daily backup creation."""
backup_dir = tmp_path / "backups"
scheduler = BackupScheduler(str(temp_db), backup_dir)
metadata = scheduler.create_backup(BackupPolicy.DAILY, verify=True)
assert metadata.policy == BackupPolicy.DAILY
assert metadata.file_path.exists()
assert metadata.size_bytes > 0
assert metadata.checksum is not None
def test_create_weekly_backup(self, temp_db: Path, tmp_path: Path) -> None:
"""Test weekly backup creation."""
backup_dir = tmp_path / "backups"
scheduler = BackupScheduler(str(temp_db), backup_dir)
metadata = scheduler.create_backup(BackupPolicy.WEEKLY, verify=False)
assert metadata.policy == BackupPolicy.WEEKLY
assert metadata.file_path.exists()
assert metadata.checksum is None # verify=False
def test_list_backups(self, temp_db: Path, tmp_path: Path) -> None:
"""Test listing backups."""
backup_dir = tmp_path / "backups"
scheduler = BackupScheduler(str(temp_db), backup_dir)
scheduler.create_backup(BackupPolicy.DAILY)
scheduler.create_backup(BackupPolicy.WEEKLY)
backups = scheduler.list_backups()
assert len(backups) == 2
daily_backups = scheduler.list_backups(BackupPolicy.DAILY)
assert len(daily_backups) == 1
assert daily_backups[0].policy == BackupPolicy.DAILY
def test_cleanup_old_backups(self, temp_db: Path, tmp_path: Path) -> None:
"""Test cleanup of old backups."""
backup_dir = tmp_path / "backups"
scheduler = BackupScheduler(str(temp_db), backup_dir, daily_retention_days=0)
# Create a backup
scheduler.create_backup(BackupPolicy.DAILY)
# Cleanup should remove it (0 day retention)
removed = scheduler.cleanup_old_backups()
assert removed[BackupPolicy.DAILY] >= 1
def test_backup_stats(self, temp_db: Path, tmp_path: Path) -> None:
"""Test backup statistics."""
backup_dir = tmp_path / "backups"
scheduler = BackupScheduler(str(temp_db), backup_dir)
scheduler.create_backup(BackupPolicy.DAILY)
scheduler.create_backup(BackupPolicy.MONTHLY)
stats = scheduler.get_backup_stats()
assert stats["daily"]["count"] == 1
assert stats["monthly"]["count"] == 1
assert stats["daily"]["total_size_bytes"] > 0
def test_restore_backup(self, temp_db: Path, tmp_path: Path) -> None:
"""Test backup restoration."""
backup_dir = tmp_path / "backups"
scheduler = BackupScheduler(str(temp_db), backup_dir)
# Create backup
metadata = scheduler.create_backup(BackupPolicy.DAILY)
# Modify database
conn = sqlite3.connect(str(temp_db))
conn.execute("DELETE FROM trades")
conn.commit()
conn.close()
# Restore
scheduler.restore_backup(metadata, verify=True)
# Verify restoration
conn = sqlite3.connect(str(temp_db))
cursor = conn.execute("SELECT COUNT(*) FROM trades")
count = cursor.fetchone()[0]
conn.close()
assert count == 3 # Original 3 trades restored
class TestHealthMonitor:
"""Test HealthMonitor functionality."""
def test_monitor_init(self, temp_db: Path, tmp_path: Path) -> None:
"""Test monitor initialization."""
backup_dir = tmp_path / "backups"
monitor = HealthMonitor(str(temp_db), backup_dir)
assert monitor.db_path == temp_db
def test_check_database_health_ok(self, temp_db: Path, tmp_path: Path) -> None:
"""Test database health check (healthy)."""
monitor = HealthMonitor(str(temp_db), tmp_path / "backups")
result = monitor.check_database_health()
assert result.status == HealthStatus.HEALTHY
assert "healthy" in result.message.lower()
assert result.details is not None
assert result.details["trade_count"] == 3
def test_check_database_health_missing(self, tmp_path: Path) -> None:
"""Test database health check (missing file)."""
non_existent = tmp_path / "missing.db"
monitor = HealthMonitor(str(non_existent), tmp_path / "backups")
result = monitor.check_database_health()
assert result.status == HealthStatus.UNHEALTHY
assert "not found" in result.message.lower()
def test_check_disk_space(self, temp_db: Path, tmp_path: Path) -> None:
"""Test disk space check."""
monitor = HealthMonitor(str(temp_db), tmp_path, min_disk_space_gb=0.001)
result = monitor.check_disk_space()
# Should be healthy with minimal requirement
assert result.status in [HealthStatus.HEALTHY, HealthStatus.DEGRADED]
assert result.details is not None
assert "free_gb" in result.details
def test_check_backup_recency_no_backups(self, temp_db: Path, tmp_path: Path) -> None:
"""Test backup recency check (no backups)."""
backup_dir = tmp_path / "backups"
backup_dir.mkdir()
(backup_dir / "daily").mkdir()
monitor = HealthMonitor(str(temp_db), backup_dir)
result = monitor.check_backup_recency()
assert result.status == HealthStatus.UNHEALTHY
assert "no" in result.message.lower()
def test_check_backup_recency_recent(self, temp_db: Path, tmp_path: Path) -> None:
"""Test backup recency check (recent backup)."""
backup_dir = tmp_path / "backups"
scheduler = BackupScheduler(str(temp_db), backup_dir)
scheduler.create_backup(BackupPolicy.DAILY)
monitor = HealthMonitor(str(temp_db), backup_dir)
result = monitor.check_backup_recency()
assert result.status == HealthStatus.HEALTHY
assert "recent" in result.message.lower()
def test_run_all_checks(self, temp_db: Path, tmp_path: Path) -> None:
"""Test running all health checks."""
backup_dir = tmp_path / "backups"
scheduler = BackupScheduler(str(temp_db), backup_dir)
scheduler.create_backup(BackupPolicy.DAILY)
monitor = HealthMonitor(str(temp_db), backup_dir, min_disk_space_gb=0.001)
checks = monitor.run_all_checks()
assert "database" in checks
assert "disk_space" in checks
assert "backup_recency" in checks
assert checks["database"].status == HealthStatus.HEALTHY
def test_get_overall_status(self, temp_db: Path, tmp_path: Path) -> None:
"""Test overall health status."""
backup_dir = tmp_path / "backups"
scheduler = BackupScheduler(str(temp_db), backup_dir)
scheduler.create_backup(BackupPolicy.DAILY)
monitor = HealthMonitor(str(temp_db), backup_dir, min_disk_space_gb=0.001)
status = monitor.get_overall_status()
assert status in [HealthStatus.HEALTHY, HealthStatus.DEGRADED]
def test_get_health_report(self, temp_db: Path, tmp_path: Path) -> None:
"""Test health report generation."""
backup_dir = tmp_path / "backups"
scheduler = BackupScheduler(str(temp_db), backup_dir)
scheduler.create_backup(BackupPolicy.DAILY)
monitor = HealthMonitor(str(temp_db), backup_dir, min_disk_space_gb=0.001)
report = monitor.get_health_report()
assert "overall_status" in report
assert "timestamp" in report
assert "checks" in report
assert len(report["checks"]) == 3
# ---------------------------------------------------------------------------
# BackupExporter — additional coverage for previously uncovered branches
# ---------------------------------------------------------------------------
@pytest.fixture
def empty_db(tmp_path: Path) -> Path:
"""Create a temporary database with NO trade records."""
db_path = tmp_path / "empty_trades.db"
conn = sqlite3.connect(str(db_path))
conn.execute(
"""CREATE TABLE trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
stock_code TEXT NOT NULL,
action TEXT NOT NULL,
quantity INTEGER NOT NULL,
price REAL NOT NULL,
confidence INTEGER NOT NULL,
rationale TEXT,
pnl REAL DEFAULT 0.0
)"""
)
conn.commit()
conn.close()
return db_path
class TestBackupExporterAdditional:
"""Cover branches missed in the original TestBackupExporter suite."""
def test_export_all_default_formats(self, temp_db: Path, tmp_path: Path) -> None:
"""export_all with formats=None must default to JSON+CSV+Parquet path."""
exporter = BackupExporter(str(temp_db))
# formats=None triggers the default list assignment (line 62)
results = exporter.export_all(tmp_path / "out", formats=None, compress=False)
# JSON and CSV must always succeed; Parquet needs pyarrow
assert ExportFormat.JSON in results
assert ExportFormat.CSV in results
def test_export_all_logs_error_on_failure(self, temp_db: Path, tmp_path: Path) -> None:
"""export_all must log an error and continue when one format fails."""
exporter = BackupExporter(str(temp_db))
# Patch _export_format to raise on JSON, succeed on CSV
original = exporter._export_format
def failing_export(fmt, *args, **kwargs): # type: ignore[no-untyped-def]
if fmt == ExportFormat.JSON:
raise RuntimeError("simulated failure")
return original(fmt, *args, **kwargs)
exporter._export_format = failing_export # type: ignore[method-assign]
results = exporter.export_all(
tmp_path / "out",
formats=[ExportFormat.JSON, ExportFormat.CSV],
compress=False,
)
# JSON failed → not in results; CSV succeeded → in results
assert ExportFormat.JSON not in results
assert ExportFormat.CSV in results
def test_export_csv_empty_trades_no_compress(self, empty_db: Path, tmp_path: Path) -> None:
"""CSV export with no trades and compress=False must write header row only."""
exporter = BackupExporter(str(empty_db))
results = exporter.export_all(
tmp_path / "out",
formats=[ExportFormat.CSV],
compress=False,
)
assert ExportFormat.CSV in results
out = results[ExportFormat.CSV]
assert out.exists()
content = out.read_text()
assert "timestamp" in content
def test_export_csv_empty_trades_compressed(self, empty_db: Path, tmp_path: Path) -> None:
"""CSV export with no trades and compress=True must write gzipped header."""
import gzip
exporter = BackupExporter(str(empty_db))
results = exporter.export_all(
tmp_path / "out",
formats=[ExportFormat.CSV],
compress=True,
)
assert ExportFormat.CSV in results
out = results[ExportFormat.CSV]
assert out.suffix == ".gz"
with gzip.open(out, "rt", encoding="utf-8") as f:
content = f.read()
assert "timestamp" in content
def test_export_csv_with_data_compressed(self, temp_db: Path, tmp_path: Path) -> None:
"""CSV export with data and compress=True must write gzipped rows."""
import gzip
exporter = BackupExporter(str(temp_db))
results = exporter.export_all(
tmp_path / "out",
formats=[ExportFormat.CSV],
compress=True,
)
assert ExportFormat.CSV in results
out = results[ExportFormat.CSV]
with gzip.open(out, "rt", encoding="utf-8") as f:
lines = f.readlines()
# Header + 3 data rows
assert len(lines) == 4
def test_export_parquet_raises_import_error_without_pyarrow(
self, temp_db: Path, tmp_path: Path
) -> None:
"""Parquet export must raise ImportError when pyarrow is not installed."""
exporter = BackupExporter(str(temp_db))
with patch.dict(sys.modules, {"pyarrow": None, "pyarrow.parquet": None}):
try:
import pyarrow # noqa: F401
pytest.skip("pyarrow is installed; cannot test ImportError path")
except ImportError:
pass
results = exporter.export_all(
tmp_path / "out",
formats=[ExportFormat.PARQUET],
compress=False,
)
# Parquet export fails gracefully; result dict should not contain it
assert ExportFormat.PARQUET not in results
# ---------------------------------------------------------------------------
# CloudStorage — mocked boto3 tests
# ---------------------------------------------------------------------------
@pytest.fixture
def mock_boto3_module():
"""Inject a fake boto3 into sys.modules for the duration of the test."""
mock = MagicMock()
with patch.dict(sys.modules, {"boto3": mock}):
yield mock
@pytest.fixture
def s3_config():
"""Minimal S3Config for tests."""
from src.backup.cloud_storage import S3Config
return S3Config(
endpoint_url="http://localhost:9000",
access_key="minioadmin",
secret_key="minioadmin",
bucket_name="test-bucket",
region="us-east-1",
)
class TestCloudStorage:
"""Test CloudStorage using mocked boto3."""
def test_init_creates_s3_client(self, mock_boto3_module, s3_config) -> None:
"""CloudStorage.__init__ must call boto3.client with the correct args."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
mock_boto3_module.client.assert_called_once()
call_kwargs = mock_boto3_module.client.call_args[1]
assert call_kwargs["aws_access_key_id"] == "minioadmin"
assert call_kwargs["aws_secret_access_key"] == "minioadmin"
assert storage.config == s3_config
def test_init_raises_if_boto3_missing(self, s3_config) -> None:
"""CloudStorage.__init__ must raise ImportError when boto3 is absent."""
with patch.dict(sys.modules, {"boto3": None}): # type: ignore[dict-item]
with pytest.raises((ImportError, TypeError)):
# Re-import to trigger the try/except inside __init__
import importlib
import src.backup.cloud_storage as m
importlib.reload(m)
m.CloudStorage(s3_config)
def test_upload_file_success(self, mock_boto3_module, s3_config, tmp_path: Path) -> None:
"""upload_file must call client.upload_file and return the object key."""
from src.backup.cloud_storage import CloudStorage
test_file = tmp_path / "backup.json.gz"
test_file.write_bytes(b"data")
storage = CloudStorage(s3_config)
key = storage.upload_file(test_file, object_key="backups/backup.json.gz")
assert key == "backups/backup.json.gz"
storage.client.upload_file.assert_called_once()
def test_upload_file_default_key(self, mock_boto3_module, s3_config, tmp_path: Path) -> None:
"""upload_file without object_key must use the filename as key."""
from src.backup.cloud_storage import CloudStorage
test_file = tmp_path / "myfile.gz"
test_file.write_bytes(b"data")
storage = CloudStorage(s3_config)
key = storage.upload_file(test_file)
assert key == "myfile.gz"
def test_upload_file_not_found(self, mock_boto3_module, s3_config, tmp_path: Path) -> None:
"""upload_file must raise FileNotFoundError for missing files."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
with pytest.raises(FileNotFoundError):
storage.upload_file(tmp_path / "nonexistent.gz")
def test_upload_file_propagates_client_error(
self, mock_boto3_module, s3_config, tmp_path: Path
) -> None:
"""upload_file must re-raise exceptions from the boto3 client."""
from src.backup.cloud_storage import CloudStorage
test_file = tmp_path / "backup.gz"
test_file.write_bytes(b"data")
storage = CloudStorage(s3_config)
storage.client.upload_file.side_effect = RuntimeError("network error")
with pytest.raises(RuntimeError, match="network error"):
storage.upload_file(test_file)
def test_download_file_success(self, mock_boto3_module, s3_config, tmp_path: Path) -> None:
"""download_file must call client.download_file and return local path."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
dest = tmp_path / "downloads" / "backup.gz"
result = storage.download_file("backups/backup.gz", dest)
assert result == dest
storage.client.download_file.assert_called_once()
def test_download_file_propagates_error(
self, mock_boto3_module, s3_config, tmp_path: Path
) -> None:
"""download_file must re-raise exceptions from the boto3 client."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.download_file.side_effect = RuntimeError("timeout")
with pytest.raises(RuntimeError, match="timeout"):
storage.download_file("key", tmp_path / "dest.gz")
def test_list_files_returns_objects(self, mock_boto3_module, s3_config) -> None:
"""list_files must return parsed file metadata from S3 response."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.list_objects_v2.return_value = {
"Contents": [
{
"Key": "backups/a.gz",
"Size": 1024,
"LastModified": datetime(2026, 1, 1, tzinfo=UTC),
"ETag": '"abc123"',
}
]
}
files = storage.list_files(prefix="backups/")
assert len(files) == 1
assert files[0]["key"] == "backups/a.gz"
assert files[0]["size_bytes"] == 1024
def test_list_files_empty_bucket(self, mock_boto3_module, s3_config) -> None:
"""list_files must return empty list when bucket has no objects."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.list_objects_v2.return_value = {}
files = storage.list_files()
assert files == []
def test_list_files_propagates_error(self, mock_boto3_module, s3_config) -> None:
"""list_files must re-raise exceptions from the boto3 client."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.list_objects_v2.side_effect = RuntimeError("auth error")
with pytest.raises(RuntimeError):
storage.list_files()
def test_delete_file_success(self, mock_boto3_module, s3_config) -> None:
"""delete_file must call client.delete_object with the correct key."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.delete_file("backups/old.gz")
storage.client.delete_object.assert_called_once_with(
Bucket="test-bucket", Key="backups/old.gz"
)
def test_delete_file_propagates_error(self, mock_boto3_module, s3_config) -> None:
"""delete_file must re-raise exceptions from the boto3 client."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.delete_object.side_effect = RuntimeError("permission denied")
with pytest.raises(RuntimeError):
storage.delete_file("backups/old.gz")
def test_get_storage_stats_success(self, mock_boto3_module, s3_config) -> None:
"""get_storage_stats must aggregate file sizes correctly."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.list_objects_v2.return_value = {
"Contents": [
{
"Key": "a.gz",
"Size": 1024 * 1024,
"LastModified": datetime(2026, 1, 1, tzinfo=UTC),
"ETag": '"x"',
},
{
"Key": "b.gz",
"Size": 1024 * 1024,
"LastModified": datetime(2026, 1, 2, tzinfo=UTC),
"ETag": '"y"',
},
]
}
stats = storage.get_storage_stats()
assert stats["total_files"] == 2
assert stats["total_size_bytes"] == 2 * 1024 * 1024
assert stats["total_size_mb"] == pytest.approx(2.0)
def test_get_storage_stats_on_error(self, mock_boto3_module, s3_config) -> None:
"""get_storage_stats must return error dict without raising on failure."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.list_objects_v2.side_effect = RuntimeError("no connection")
stats = storage.get_storage_stats()
assert "error" in stats
assert stats["total_files"] == 0
def test_verify_connection_success(self, mock_boto3_module, s3_config) -> None:
"""verify_connection must return True when head_bucket succeeds."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
result = storage.verify_connection()
assert result is True
def test_verify_connection_failure(self, mock_boto3_module, s3_config) -> None:
"""verify_connection must return False when head_bucket raises."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.head_bucket.side_effect = RuntimeError("no such bucket")
result = storage.verify_connection()
assert result is False
def test_enable_versioning(self, mock_boto3_module, s3_config) -> None:
"""enable_versioning must call put_bucket_versioning."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.enable_versioning()
storage.client.put_bucket_versioning.assert_called_once()
def test_enable_versioning_propagates_error(self, mock_boto3_module, s3_config) -> None:
"""enable_versioning must re-raise exceptions from the boto3 client."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.put_bucket_versioning.side_effect = RuntimeError("denied")
with pytest.raises(RuntimeError):
storage.enable_versioning()