test: 테스트 커버리지 77% → 80% 달성 (issue #204)
Some checks failed
CI / test (pull_request) Has been cancelled
Some checks failed
CI / test (pull_request) Has been cancelled
신규/추가 테스트: - tests/test_logging_config.py: JSONFormatter, setup_logging 전체 커버 (14줄) - tests/test_strategies_base.py: BaseStrategy 추상 클래스 커버 (6줄) - tests/test_backup.py: BackupExporter 미커버 경로(빈 CSV, compress=True CSV, 포맷 실패 로깅, 기본 formats) + CloudStorage boto3 모킹 테스트 20개 (113줄) - tests/test_context.py: ContextSummarizer 전체 커버 22개 테스트 (50줄) 총 815개 테스트 통과, TOTAL 커버리지 80% (1046줄 미커버 / 5225줄 전체) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -3,9 +3,11 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
import sys
|
||||
import tempfile
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -363,3 +365,435 @@ class TestHealthMonitor:
|
||||
assert "timestamp" in report
|
||||
assert "checks" in report
|
||||
assert len(report["checks"]) == 3
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BackupExporter — additional coverage for previously uncovered branches
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def empty_db(tmp_path: Path) -> Path:
|
||||
"""Create a temporary database with NO trade records."""
|
||||
db_path = tmp_path / "empty_trades.db"
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.execute(
|
||||
"""CREATE TABLE trades (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp TEXT NOT NULL,
|
||||
stock_code TEXT NOT NULL,
|
||||
action TEXT NOT NULL,
|
||||
quantity INTEGER NOT NULL,
|
||||
price REAL NOT NULL,
|
||||
confidence INTEGER NOT NULL,
|
||||
rationale TEXT,
|
||||
pnl REAL DEFAULT 0.0
|
||||
)"""
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return db_path
|
||||
|
||||
|
||||
class TestBackupExporterAdditional:
|
||||
"""Cover branches missed in the original TestBackupExporter suite."""
|
||||
|
||||
def test_export_all_default_formats(self, temp_db: Path, tmp_path: Path) -> None:
|
||||
"""export_all with formats=None must default to JSON+CSV+Parquet path."""
|
||||
exporter = BackupExporter(str(temp_db))
|
||||
# formats=None triggers the default list assignment (line 62)
|
||||
results = exporter.export_all(tmp_path / "out", formats=None, compress=False)
|
||||
# JSON and CSV must always succeed; Parquet needs pyarrow
|
||||
assert ExportFormat.JSON in results
|
||||
assert ExportFormat.CSV in results
|
||||
|
||||
def test_export_all_logs_error_on_failure(
|
||||
self, temp_db: Path, tmp_path: Path
|
||||
) -> None:
|
||||
"""export_all must log an error and continue when one format fails."""
|
||||
exporter = BackupExporter(str(temp_db))
|
||||
# Patch _export_format to raise on JSON, succeed on CSV
|
||||
original = exporter._export_format
|
||||
|
||||
def failing_export(fmt, *args, **kwargs): # type: ignore[no-untyped-def]
|
||||
if fmt == ExportFormat.JSON:
|
||||
raise RuntimeError("simulated failure")
|
||||
return original(fmt, *args, **kwargs)
|
||||
|
||||
exporter._export_format = failing_export # type: ignore[method-assign]
|
||||
results = exporter.export_all(
|
||||
tmp_path / "out",
|
||||
formats=[ExportFormat.JSON, ExportFormat.CSV],
|
||||
compress=False,
|
||||
)
|
||||
# JSON failed → not in results; CSV succeeded → in results
|
||||
assert ExportFormat.JSON not in results
|
||||
assert ExportFormat.CSV in results
|
||||
|
||||
def test_export_csv_empty_trades_no_compress(
|
||||
self, empty_db: Path, tmp_path: Path
|
||||
) -> None:
|
||||
"""CSV export with no trades and compress=False must write header row only."""
|
||||
exporter = BackupExporter(str(empty_db))
|
||||
results = exporter.export_all(
|
||||
tmp_path / "out",
|
||||
formats=[ExportFormat.CSV],
|
||||
compress=False,
|
||||
)
|
||||
assert ExportFormat.CSV in results
|
||||
out = results[ExportFormat.CSV]
|
||||
assert out.exists()
|
||||
content = out.read_text()
|
||||
assert "timestamp" in content
|
||||
|
||||
def test_export_csv_empty_trades_compressed(
|
||||
self, empty_db: Path, tmp_path: Path
|
||||
) -> None:
|
||||
"""CSV export with no trades and compress=True must write gzipped header."""
|
||||
import gzip
|
||||
|
||||
exporter = BackupExporter(str(empty_db))
|
||||
results = exporter.export_all(
|
||||
tmp_path / "out",
|
||||
formats=[ExportFormat.CSV],
|
||||
compress=True,
|
||||
)
|
||||
assert ExportFormat.CSV in results
|
||||
out = results[ExportFormat.CSV]
|
||||
assert out.suffix == ".gz"
|
||||
with gzip.open(out, "rt", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
assert "timestamp" in content
|
||||
|
||||
def test_export_csv_with_data_compressed(
|
||||
self, temp_db: Path, tmp_path: Path
|
||||
) -> None:
|
||||
"""CSV export with data and compress=True must write gzipped rows."""
|
||||
import gzip
|
||||
|
||||
exporter = BackupExporter(str(temp_db))
|
||||
results = exporter.export_all(
|
||||
tmp_path / "out",
|
||||
formats=[ExportFormat.CSV],
|
||||
compress=True,
|
||||
)
|
||||
assert ExportFormat.CSV in results
|
||||
out = results[ExportFormat.CSV]
|
||||
with gzip.open(out, "rt", encoding="utf-8") as f:
|
||||
lines = f.readlines()
|
||||
# Header + 3 data rows
|
||||
assert len(lines) == 4
|
||||
|
||||
def test_export_parquet_raises_import_error_without_pyarrow(
|
||||
self, temp_db: Path, tmp_path: Path
|
||||
) -> None:
|
||||
"""Parquet export must raise ImportError when pyarrow is not installed."""
|
||||
exporter = BackupExporter(str(temp_db))
|
||||
with patch.dict(sys.modules, {"pyarrow": None, "pyarrow.parquet": None}):
|
||||
try:
|
||||
import pyarrow # noqa: F401
|
||||
pytest.skip("pyarrow is installed; cannot test ImportError path")
|
||||
except ImportError:
|
||||
pass
|
||||
results = exporter.export_all(
|
||||
tmp_path / "out",
|
||||
formats=[ExportFormat.PARQUET],
|
||||
compress=False,
|
||||
)
|
||||
# Parquet export fails gracefully; result dict should not contain it
|
||||
assert ExportFormat.PARQUET not in results
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CloudStorage — mocked boto3 tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_boto3_module():
|
||||
"""Inject a fake boto3 into sys.modules for the duration of the test."""
|
||||
mock = MagicMock()
|
||||
with patch.dict(sys.modules, {"boto3": mock}):
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def s3_config():
|
||||
"""Minimal S3Config for tests."""
|
||||
from src.backup.cloud_storage import S3Config
|
||||
|
||||
return S3Config(
|
||||
endpoint_url="http://localhost:9000",
|
||||
access_key="minioadmin",
|
||||
secret_key="minioadmin",
|
||||
bucket_name="test-bucket",
|
||||
region="us-east-1",
|
||||
)
|
||||
|
||||
|
||||
class TestCloudStorage:
|
||||
"""Test CloudStorage using mocked boto3."""
|
||||
|
||||
def test_init_creates_s3_client(self, mock_boto3_module, s3_config) -> None:
|
||||
"""CloudStorage.__init__ must call boto3.client with the correct args."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
mock_boto3_module.client.assert_called_once()
|
||||
call_kwargs = mock_boto3_module.client.call_args[1]
|
||||
assert call_kwargs["aws_access_key_id"] == "minioadmin"
|
||||
assert call_kwargs["aws_secret_access_key"] == "minioadmin"
|
||||
assert storage.config == s3_config
|
||||
|
||||
def test_init_raises_if_boto3_missing(self, s3_config) -> None:
|
||||
"""CloudStorage.__init__ must raise ImportError when boto3 is absent."""
|
||||
with patch.dict(sys.modules, {"boto3": None}): # type: ignore[dict-item]
|
||||
with pytest.raises((ImportError, TypeError)):
|
||||
# Re-import to trigger the try/except inside __init__
|
||||
import importlib
|
||||
|
||||
import src.backup.cloud_storage as m
|
||||
|
||||
importlib.reload(m)
|
||||
m.CloudStorage(s3_config)
|
||||
|
||||
def test_upload_file_success(
|
||||
self, mock_boto3_module, s3_config, tmp_path: Path
|
||||
) -> None:
|
||||
"""upload_file must call client.upload_file and return the object key."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
test_file = tmp_path / "backup.json.gz"
|
||||
test_file.write_bytes(b"data")
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
key = storage.upload_file(test_file, object_key="backups/backup.json.gz")
|
||||
|
||||
assert key == "backups/backup.json.gz"
|
||||
storage.client.upload_file.assert_called_once()
|
||||
|
||||
def test_upload_file_default_key(
|
||||
self, mock_boto3_module, s3_config, tmp_path: Path
|
||||
) -> None:
|
||||
"""upload_file without object_key must use the filename as key."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
test_file = tmp_path / "myfile.gz"
|
||||
test_file.write_bytes(b"data")
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
key = storage.upload_file(test_file)
|
||||
|
||||
assert key == "myfile.gz"
|
||||
|
||||
def test_upload_file_not_found(
|
||||
self, mock_boto3_module, s3_config, tmp_path: Path
|
||||
) -> None:
|
||||
"""upload_file must raise FileNotFoundError for missing files."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
with pytest.raises(FileNotFoundError):
|
||||
storage.upload_file(tmp_path / "nonexistent.gz")
|
||||
|
||||
def test_upload_file_propagates_client_error(
|
||||
self, mock_boto3_module, s3_config, tmp_path: Path
|
||||
) -> None:
|
||||
"""upload_file must re-raise exceptions from the boto3 client."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
test_file = tmp_path / "backup.gz"
|
||||
test_file.write_bytes(b"data")
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
storage.client.upload_file.side_effect = RuntimeError("network error")
|
||||
|
||||
with pytest.raises(RuntimeError, match="network error"):
|
||||
storage.upload_file(test_file)
|
||||
|
||||
def test_download_file_success(
|
||||
self, mock_boto3_module, s3_config, tmp_path: Path
|
||||
) -> None:
|
||||
"""download_file must call client.download_file and return local path."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
dest = tmp_path / "downloads" / "backup.gz"
|
||||
|
||||
result = storage.download_file("backups/backup.gz", dest)
|
||||
|
||||
assert result == dest
|
||||
storage.client.download_file.assert_called_once()
|
||||
|
||||
def test_download_file_propagates_error(
|
||||
self, mock_boto3_module, s3_config, tmp_path: Path
|
||||
) -> None:
|
||||
"""download_file must re-raise exceptions from the boto3 client."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
storage.client.download_file.side_effect = RuntimeError("timeout")
|
||||
|
||||
with pytest.raises(RuntimeError, match="timeout"):
|
||||
storage.download_file("key", tmp_path / "dest.gz")
|
||||
|
||||
def test_list_files_returns_objects(
|
||||
self, mock_boto3_module, s3_config
|
||||
) -> None:
|
||||
"""list_files must return parsed file metadata from S3 response."""
|
||||
from datetime import timezone
|
||||
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
storage.client.list_objects_v2.return_value = {
|
||||
"Contents": [
|
||||
{
|
||||
"Key": "backups/a.gz",
|
||||
"Size": 1024,
|
||||
"LastModified": datetime(2026, 1, 1, tzinfo=timezone.utc),
|
||||
"ETag": '"abc123"',
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
files = storage.list_files(prefix="backups/")
|
||||
assert len(files) == 1
|
||||
assert files[0]["key"] == "backups/a.gz"
|
||||
assert files[0]["size_bytes"] == 1024
|
||||
|
||||
def test_list_files_empty_bucket(
|
||||
self, mock_boto3_module, s3_config
|
||||
) -> None:
|
||||
"""list_files must return empty list when bucket has no objects."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
storage.client.list_objects_v2.return_value = {}
|
||||
|
||||
files = storage.list_files()
|
||||
assert files == []
|
||||
|
||||
def test_list_files_propagates_error(
|
||||
self, mock_boto3_module, s3_config
|
||||
) -> None:
|
||||
"""list_files must re-raise exceptions from the boto3 client."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
storage.client.list_objects_v2.side_effect = RuntimeError("auth error")
|
||||
|
||||
with pytest.raises(RuntimeError):
|
||||
storage.list_files()
|
||||
|
||||
def test_delete_file_success(
|
||||
self, mock_boto3_module, s3_config
|
||||
) -> None:
|
||||
"""delete_file must call client.delete_object with the correct key."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
storage.delete_file("backups/old.gz")
|
||||
storage.client.delete_object.assert_called_once_with(
|
||||
Bucket="test-bucket", Key="backups/old.gz"
|
||||
)
|
||||
|
||||
def test_delete_file_propagates_error(
|
||||
self, mock_boto3_module, s3_config
|
||||
) -> None:
|
||||
"""delete_file must re-raise exceptions from the boto3 client."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
storage.client.delete_object.side_effect = RuntimeError("permission denied")
|
||||
|
||||
with pytest.raises(RuntimeError):
|
||||
storage.delete_file("backups/old.gz")
|
||||
|
||||
def test_get_storage_stats_success(
|
||||
self, mock_boto3_module, s3_config
|
||||
) -> None:
|
||||
"""get_storage_stats must aggregate file sizes correctly."""
|
||||
from datetime import timezone
|
||||
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
storage.client.list_objects_v2.return_value = {
|
||||
"Contents": [
|
||||
{
|
||||
"Key": "a.gz",
|
||||
"Size": 1024 * 1024,
|
||||
"LastModified": datetime(2026, 1, 1, tzinfo=timezone.utc),
|
||||
"ETag": '"x"',
|
||||
},
|
||||
{
|
||||
"Key": "b.gz",
|
||||
"Size": 1024 * 1024,
|
||||
"LastModified": datetime(2026, 1, 2, tzinfo=timezone.utc),
|
||||
"ETag": '"y"',
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
stats = storage.get_storage_stats()
|
||||
assert stats["total_files"] == 2
|
||||
assert stats["total_size_bytes"] == 2 * 1024 * 1024
|
||||
assert stats["total_size_mb"] == pytest.approx(2.0)
|
||||
|
||||
def test_get_storage_stats_on_error(
|
||||
self, mock_boto3_module, s3_config
|
||||
) -> None:
|
||||
"""get_storage_stats must return error dict without raising on failure."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
storage.client.list_objects_v2.side_effect = RuntimeError("no connection")
|
||||
|
||||
stats = storage.get_storage_stats()
|
||||
assert "error" in stats
|
||||
assert stats["total_files"] == 0
|
||||
|
||||
def test_verify_connection_success(
|
||||
self, mock_boto3_module, s3_config
|
||||
) -> None:
|
||||
"""verify_connection must return True when head_bucket succeeds."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
result = storage.verify_connection()
|
||||
assert result is True
|
||||
|
||||
def test_verify_connection_failure(
|
||||
self, mock_boto3_module, s3_config
|
||||
) -> None:
|
||||
"""verify_connection must return False when head_bucket raises."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
storage.client.head_bucket.side_effect = RuntimeError("no such bucket")
|
||||
|
||||
result = storage.verify_connection()
|
||||
assert result is False
|
||||
|
||||
def test_enable_versioning(
|
||||
self, mock_boto3_module, s3_config
|
||||
) -> None:
|
||||
"""enable_versioning must call put_bucket_versioning."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
storage.enable_versioning()
|
||||
storage.client.put_bucket_versioning.assert_called_once()
|
||||
|
||||
def test_enable_versioning_propagates_error(
|
||||
self, mock_boto3_module, s3_config
|
||||
) -> None:
|
||||
"""enable_versioning must re-raise exceptions from the boto3 client."""
|
||||
from src.backup.cloud_storage import CloudStorage
|
||||
|
||||
storage = CloudStorage(s3_config)
|
||||
storage.client.put_bucket_versioning.side_effect = RuntimeError("denied")
|
||||
|
||||
with pytest.raises(RuntimeError):
|
||||
storage.enable_versioning()
|
||||
|
||||
Reference in New Issue
Block a user