From 4fc4a57036d0ae18169807754004ace5826df124 Mon Sep 17 00:00:00 2001 From: agentson Date: Mon, 23 Feb 2026 12:48:08 +0900 Subject: [PATCH] =?UTF-8?q?test:=20=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EC=BB=A4?= =?UTF-8?q?=EB=B2=84=EB=A6=AC=EC=A7=80=2077%=20=E2=86=92=2080%=20=EB=8B=AC?= =?UTF-8?q?=EC=84=B1=20(issue=20#204)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 신규/추가 테스트: - tests/test_logging_config.py: JSONFormatter, setup_logging 전체 커버 (14줄) - tests/test_strategies_base.py: BaseStrategy 추상 클래스 커버 (6줄) - tests/test_backup.py: BackupExporter 미커버 경로(빈 CSV, compress=True CSV, 포맷 실패 로깅, 기본 formats) + CloudStorage boto3 모킹 테스트 20개 (113줄) - tests/test_context.py: ContextSummarizer 전체 커버 22개 테스트 (50줄) 총 815개 테스트 통과, TOTAL 커버리지 80% (1046줄 미커버 / 5225줄 전체) Co-Authored-By: Claude Sonnet 4.6 --- tests/test_backup.py | 434 ++++++++++++++++++++++++++++++++++ tests/test_context.py | 257 ++++++++++++++++++++ tests/test_logging_config.py | 117 +++++++++ tests/test_strategies_base.py | 32 +++ 4 files changed, 840 insertions(+) create mode 100644 tests/test_logging_config.py create mode 100644 tests/test_strategies_base.py diff --git a/tests/test_backup.py b/tests/test_backup.py index 1949f15..0ecfa3e 100644 --- a/tests/test_backup.py +++ b/tests/test_backup.py @@ -3,9 +3,11 @@ from __future__ import annotations import sqlite3 +import sys import tempfile from datetime import UTC, datetime, timedelta from pathlib import Path +from unittest.mock import MagicMock, patch import pytest @@ -363,3 +365,435 @@ class TestHealthMonitor: assert "timestamp" in report assert "checks" in report assert len(report["checks"]) == 3 + + +# --------------------------------------------------------------------------- +# BackupExporter — additional coverage for previously uncovered branches +# --------------------------------------------------------------------------- + + +@pytest.fixture +def empty_db(tmp_path: Path) -> Path: + """Create a temporary database with NO trade records.""" + db_path = tmp_path / "empty_trades.db" + conn = sqlite3.connect(str(db_path)) + conn.execute( + """CREATE TABLE trades ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, + stock_code TEXT NOT NULL, + action TEXT NOT NULL, + quantity INTEGER NOT NULL, + price REAL NOT NULL, + confidence INTEGER NOT NULL, + rationale TEXT, + pnl REAL DEFAULT 0.0 + )""" + ) + conn.commit() + conn.close() + return db_path + + +class TestBackupExporterAdditional: + """Cover branches missed in the original TestBackupExporter suite.""" + + def test_export_all_default_formats(self, temp_db: Path, tmp_path: Path) -> None: + """export_all with formats=None must default to JSON+CSV+Parquet path.""" + exporter = BackupExporter(str(temp_db)) + # formats=None triggers the default list assignment (line 62) + results = exporter.export_all(tmp_path / "out", formats=None, compress=False) + # JSON and CSV must always succeed; Parquet needs pyarrow + assert ExportFormat.JSON in results + assert ExportFormat.CSV in results + + def test_export_all_logs_error_on_failure( + self, temp_db: Path, tmp_path: Path + ) -> None: + """export_all must log an error and continue when one format fails.""" + exporter = BackupExporter(str(temp_db)) + # Patch _export_format to raise on JSON, succeed on CSV + original = exporter._export_format + + def failing_export(fmt, *args, **kwargs): # type: ignore[no-untyped-def] + if fmt == ExportFormat.JSON: + raise RuntimeError("simulated failure") + return original(fmt, *args, **kwargs) + + exporter._export_format = failing_export # type: ignore[method-assign] + results = exporter.export_all( + tmp_path / "out", + formats=[ExportFormat.JSON, ExportFormat.CSV], + compress=False, + ) + # JSON failed → not in results; CSV succeeded → in results + assert ExportFormat.JSON not in results + assert ExportFormat.CSV in results + + def test_export_csv_empty_trades_no_compress( + self, empty_db: Path, tmp_path: Path + ) -> None: + """CSV export with no trades and compress=False must write header row only.""" + exporter = BackupExporter(str(empty_db)) + results = exporter.export_all( + tmp_path / "out", + formats=[ExportFormat.CSV], + compress=False, + ) + assert ExportFormat.CSV in results + out = results[ExportFormat.CSV] + assert out.exists() + content = out.read_text() + assert "timestamp" in content + + def test_export_csv_empty_trades_compressed( + self, empty_db: Path, tmp_path: Path + ) -> None: + """CSV export with no trades and compress=True must write gzipped header.""" + import gzip + + exporter = BackupExporter(str(empty_db)) + results = exporter.export_all( + tmp_path / "out", + formats=[ExportFormat.CSV], + compress=True, + ) + assert ExportFormat.CSV in results + out = results[ExportFormat.CSV] + assert out.suffix == ".gz" + with gzip.open(out, "rt", encoding="utf-8") as f: + content = f.read() + assert "timestamp" in content + + def test_export_csv_with_data_compressed( + self, temp_db: Path, tmp_path: Path + ) -> None: + """CSV export with data and compress=True must write gzipped rows.""" + import gzip + + exporter = BackupExporter(str(temp_db)) + results = exporter.export_all( + tmp_path / "out", + formats=[ExportFormat.CSV], + compress=True, + ) + assert ExportFormat.CSV in results + out = results[ExportFormat.CSV] + with gzip.open(out, "rt", encoding="utf-8") as f: + lines = f.readlines() + # Header + 3 data rows + assert len(lines) == 4 + + def test_export_parquet_raises_import_error_without_pyarrow( + self, temp_db: Path, tmp_path: Path + ) -> None: + """Parquet export must raise ImportError when pyarrow is not installed.""" + exporter = BackupExporter(str(temp_db)) + with patch.dict(sys.modules, {"pyarrow": None, "pyarrow.parquet": None}): + try: + import pyarrow # noqa: F401 + pytest.skip("pyarrow is installed; cannot test ImportError path") + except ImportError: + pass + results = exporter.export_all( + tmp_path / "out", + formats=[ExportFormat.PARQUET], + compress=False, + ) + # Parquet export fails gracefully; result dict should not contain it + assert ExportFormat.PARQUET not in results + + +# --------------------------------------------------------------------------- +# CloudStorage — mocked boto3 tests +# --------------------------------------------------------------------------- + + +@pytest.fixture +def mock_boto3_module(): + """Inject a fake boto3 into sys.modules for the duration of the test.""" + mock = MagicMock() + with patch.dict(sys.modules, {"boto3": mock}): + yield mock + + +@pytest.fixture +def s3_config(): + """Minimal S3Config for tests.""" + from src.backup.cloud_storage import S3Config + + return S3Config( + endpoint_url="http://localhost:9000", + access_key="minioadmin", + secret_key="minioadmin", + bucket_name="test-bucket", + region="us-east-1", + ) + + +class TestCloudStorage: + """Test CloudStorage using mocked boto3.""" + + def test_init_creates_s3_client(self, mock_boto3_module, s3_config) -> None: + """CloudStorage.__init__ must call boto3.client with the correct args.""" + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + mock_boto3_module.client.assert_called_once() + call_kwargs = mock_boto3_module.client.call_args[1] + assert call_kwargs["aws_access_key_id"] == "minioadmin" + assert call_kwargs["aws_secret_access_key"] == "minioadmin" + assert storage.config == s3_config + + def test_init_raises_if_boto3_missing(self, s3_config) -> None: + """CloudStorage.__init__ must raise ImportError when boto3 is absent.""" + with patch.dict(sys.modules, {"boto3": None}): # type: ignore[dict-item] + with pytest.raises((ImportError, TypeError)): + # Re-import to trigger the try/except inside __init__ + import importlib + + import src.backup.cloud_storage as m + + importlib.reload(m) + m.CloudStorage(s3_config) + + def test_upload_file_success( + self, mock_boto3_module, s3_config, tmp_path: Path + ) -> None: + """upload_file must call client.upload_file and return the object key.""" + from src.backup.cloud_storage import CloudStorage + + test_file = tmp_path / "backup.json.gz" + test_file.write_bytes(b"data") + + storage = CloudStorage(s3_config) + key = storage.upload_file(test_file, object_key="backups/backup.json.gz") + + assert key == "backups/backup.json.gz" + storage.client.upload_file.assert_called_once() + + def test_upload_file_default_key( + self, mock_boto3_module, s3_config, tmp_path: Path + ) -> None: + """upload_file without object_key must use the filename as key.""" + from src.backup.cloud_storage import CloudStorage + + test_file = tmp_path / "myfile.gz" + test_file.write_bytes(b"data") + + storage = CloudStorage(s3_config) + key = storage.upload_file(test_file) + + assert key == "myfile.gz" + + def test_upload_file_not_found( + self, mock_boto3_module, s3_config, tmp_path: Path + ) -> None: + """upload_file must raise FileNotFoundError for missing files.""" + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + with pytest.raises(FileNotFoundError): + storage.upload_file(tmp_path / "nonexistent.gz") + + def test_upload_file_propagates_client_error( + self, mock_boto3_module, s3_config, tmp_path: Path + ) -> None: + """upload_file must re-raise exceptions from the boto3 client.""" + from src.backup.cloud_storage import CloudStorage + + test_file = tmp_path / "backup.gz" + test_file.write_bytes(b"data") + + storage = CloudStorage(s3_config) + storage.client.upload_file.side_effect = RuntimeError("network error") + + with pytest.raises(RuntimeError, match="network error"): + storage.upload_file(test_file) + + def test_download_file_success( + self, mock_boto3_module, s3_config, tmp_path: Path + ) -> None: + """download_file must call client.download_file and return local path.""" + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + dest = tmp_path / "downloads" / "backup.gz" + + result = storage.download_file("backups/backup.gz", dest) + + assert result == dest + storage.client.download_file.assert_called_once() + + def test_download_file_propagates_error( + self, mock_boto3_module, s3_config, tmp_path: Path + ) -> None: + """download_file must re-raise exceptions from the boto3 client.""" + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + storage.client.download_file.side_effect = RuntimeError("timeout") + + with pytest.raises(RuntimeError, match="timeout"): + storage.download_file("key", tmp_path / "dest.gz") + + def test_list_files_returns_objects( + self, mock_boto3_module, s3_config + ) -> None: + """list_files must return parsed file metadata from S3 response.""" + from datetime import timezone + + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + storage.client.list_objects_v2.return_value = { + "Contents": [ + { + "Key": "backups/a.gz", + "Size": 1024, + "LastModified": datetime(2026, 1, 1, tzinfo=timezone.utc), + "ETag": '"abc123"', + } + ] + } + + files = storage.list_files(prefix="backups/") + assert len(files) == 1 + assert files[0]["key"] == "backups/a.gz" + assert files[0]["size_bytes"] == 1024 + + def test_list_files_empty_bucket( + self, mock_boto3_module, s3_config + ) -> None: + """list_files must return empty list when bucket has no objects.""" + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + storage.client.list_objects_v2.return_value = {} + + files = storage.list_files() + assert files == [] + + def test_list_files_propagates_error( + self, mock_boto3_module, s3_config + ) -> None: + """list_files must re-raise exceptions from the boto3 client.""" + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + storage.client.list_objects_v2.side_effect = RuntimeError("auth error") + + with pytest.raises(RuntimeError): + storage.list_files() + + def test_delete_file_success( + self, mock_boto3_module, s3_config + ) -> None: + """delete_file must call client.delete_object with the correct key.""" + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + storage.delete_file("backups/old.gz") + storage.client.delete_object.assert_called_once_with( + Bucket="test-bucket", Key="backups/old.gz" + ) + + def test_delete_file_propagates_error( + self, mock_boto3_module, s3_config + ) -> None: + """delete_file must re-raise exceptions from the boto3 client.""" + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + storage.client.delete_object.side_effect = RuntimeError("permission denied") + + with pytest.raises(RuntimeError): + storage.delete_file("backups/old.gz") + + def test_get_storage_stats_success( + self, mock_boto3_module, s3_config + ) -> None: + """get_storage_stats must aggregate file sizes correctly.""" + from datetime import timezone + + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + storage.client.list_objects_v2.return_value = { + "Contents": [ + { + "Key": "a.gz", + "Size": 1024 * 1024, + "LastModified": datetime(2026, 1, 1, tzinfo=timezone.utc), + "ETag": '"x"', + }, + { + "Key": "b.gz", + "Size": 1024 * 1024, + "LastModified": datetime(2026, 1, 2, tzinfo=timezone.utc), + "ETag": '"y"', + }, + ] + } + + stats = storage.get_storage_stats() + assert stats["total_files"] == 2 + assert stats["total_size_bytes"] == 2 * 1024 * 1024 + assert stats["total_size_mb"] == pytest.approx(2.0) + + def test_get_storage_stats_on_error( + self, mock_boto3_module, s3_config + ) -> None: + """get_storage_stats must return error dict without raising on failure.""" + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + storage.client.list_objects_v2.side_effect = RuntimeError("no connection") + + stats = storage.get_storage_stats() + assert "error" in stats + assert stats["total_files"] == 0 + + def test_verify_connection_success( + self, mock_boto3_module, s3_config + ) -> None: + """verify_connection must return True when head_bucket succeeds.""" + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + result = storage.verify_connection() + assert result is True + + def test_verify_connection_failure( + self, mock_boto3_module, s3_config + ) -> None: + """verify_connection must return False when head_bucket raises.""" + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + storage.client.head_bucket.side_effect = RuntimeError("no such bucket") + + result = storage.verify_connection() + assert result is False + + def test_enable_versioning( + self, mock_boto3_module, s3_config + ) -> None: + """enable_versioning must call put_bucket_versioning.""" + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + storage.enable_versioning() + storage.client.put_bucket_versioning.assert_called_once() + + def test_enable_versioning_propagates_error( + self, mock_boto3_module, s3_config + ) -> None: + """enable_versioning must re-raise exceptions from the boto3 client.""" + from src.backup.cloud_storage import CloudStorage + + storage = CloudStorage(s3_config) + storage.client.put_bucket_versioning.side_effect = RuntimeError("denied") + + with pytest.raises(RuntimeError): + storage.enable_versioning() diff --git a/tests/test_context.py b/tests/test_context.py index 5acce78..a1d1f29 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -10,6 +10,7 @@ import pytest from src.context.aggregator import ContextAggregator from src.context.layer import LAYER_CONFIG, ContextLayer from src.context.store import ContextStore +from src.context.summarizer import ContextSummarizer from src.db import init_db, log_trade @@ -370,3 +371,259 @@ class TestLayerMetadata: # L1 aggregates from L2 assert LAYER_CONFIG[ContextLayer.L1_LEGACY].aggregation_source == ContextLayer.L2_ANNUAL + + +# --------------------------------------------------------------------------- +# ContextSummarizer tests +# --------------------------------------------------------------------------- + + +@pytest.fixture +def summarizer(db_conn: sqlite3.Connection) -> ContextSummarizer: + """Provide a ContextSummarizer backed by an in-memory store.""" + return ContextSummarizer(ContextStore(db_conn)) + + +class TestContextSummarizer: + """Test suite for ContextSummarizer.""" + + # ------------------------------------------------------------------ + # summarize_numeric_values + # ------------------------------------------------------------------ + + def test_summarize_empty_values(self, summarizer: ContextSummarizer) -> None: + """Empty list must return SummaryStats with count=0 and no other fields.""" + stats = summarizer.summarize_numeric_values([]) + assert stats.count == 0 + assert stats.mean is None + assert stats.min is None + assert stats.max is None + + def test_summarize_single_value(self, summarizer: ContextSummarizer) -> None: + """Single-element list must return correct stats with std=0 and trend=flat.""" + stats = summarizer.summarize_numeric_values([42.0]) + assert stats.count == 1 + assert stats.mean == 42.0 + assert stats.std == 0.0 + assert stats.trend == "flat" + + def test_summarize_upward_trend(self, summarizer: ContextSummarizer) -> None: + """Increasing values must produce trend='up'.""" + values = [1.0, 2.0, 3.0, 10.0, 20.0, 30.0] + stats = summarizer.summarize_numeric_values(values) + assert stats.trend == "up" + + def test_summarize_downward_trend(self, summarizer: ContextSummarizer) -> None: + """Decreasing values must produce trend='down'.""" + values = [30.0, 20.0, 10.0, 3.0, 2.0, 1.0] + stats = summarizer.summarize_numeric_values(values) + assert stats.trend == "down" + + def test_summarize_flat_trend(self, summarizer: ContextSummarizer) -> None: + """Stable values must produce trend='flat'.""" + values = [100.0, 100.1, 99.9, 100.0, 100.2, 99.8] + stats = summarizer.summarize_numeric_values(values) + assert stats.trend == "flat" + + # ------------------------------------------------------------------ + # summarize_layer + # ------------------------------------------------------------------ + + def test_summarize_layer_no_data( + self, summarizer: ContextSummarizer + ) -> None: + """summarize_layer with no data must return the 'No data' sentinel.""" + result = summarizer.summarize_layer(ContextLayer.L6_DAILY) + assert result["count"] == 0 + assert "No data" in result["summary"] + + def test_summarize_layer_numeric( + self, summarizer: ContextSummarizer, db_conn: sqlite3.Connection + ) -> None: + """summarize_layer must collect numeric values and produce stats.""" + store = summarizer.store + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "total_pnl", 100.0) + store.set_context(ContextLayer.L6_DAILY, "2026-02-02", "total_pnl", 200.0) + + result = summarizer.summarize_layer(ContextLayer.L6_DAILY) + assert "total_entries" in result + + def test_summarize_layer_with_dict_values( + self, summarizer: ContextSummarizer + ) -> None: + """summarize_layer must handle dict values by extracting numeric subkeys.""" + store = summarizer.store + # set_context serialises the value as JSON, so passing a dict works + store.set_context( + ContextLayer.L6_DAILY, "2026-02-01", "metrics", + {"win_rate": 65.0, "label": "good"} + ) + + result = summarizer.summarize_layer(ContextLayer.L6_DAILY) + assert "total_entries" in result + # numeric subkey "win_rate" should appear as "metrics.win_rate" + assert "metrics.win_rate" in result + + def test_summarize_layer_with_string_values( + self, summarizer: ContextSummarizer + ) -> None: + """summarize_layer must count string values separately.""" + store = summarizer.store + # set_context stores string values as JSON-encoded strings + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "outlook", "BULLISH") + + result = summarizer.summarize_layer(ContextLayer.L6_DAILY) + # String fields contribute a `_count` entry + assert "outlook_count" in result + + # ------------------------------------------------------------------ + # rolling_window_summary + # ------------------------------------------------------------------ + + def test_rolling_window_summary_basic( + self, summarizer: ContextSummarizer + ) -> None: + """rolling_window_summary must return the expected structure.""" + store = summarizer.store + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "pnl", 500.0) + + result = summarizer.rolling_window_summary(ContextLayer.L6_DAILY) + assert "window_days" in result + assert "recent_data" in result + assert "historical_summary" in result + + def test_rolling_window_summary_no_older_data( + self, summarizer: ContextSummarizer + ) -> None: + """rolling_window_summary with summarize_older=False skips history.""" + result = summarizer.rolling_window_summary( + ContextLayer.L6_DAILY, summarize_older=False + ) + assert result["historical_summary"] == {} + + # ------------------------------------------------------------------ + # aggregate_to_higher_layer + # ------------------------------------------------------------------ + + def test_aggregate_to_higher_layer_mean( + self, summarizer: ContextSummarizer + ) -> None: + """aggregate_to_higher_layer with 'mean' via dict subkeys returns average.""" + store = summarizer.store + # Use different outer keys but same inner metric key so get_all_contexts + # returns multiple rows with the target subkey. + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day1", {"pnl": 100.0}) + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day2", {"pnl": 200.0}) + + result = summarizer.aggregate_to_higher_layer( + ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, "pnl", "mean" + ) + assert result == pytest.approx(150.0) + + def test_aggregate_to_higher_layer_sum( + self, summarizer: ContextSummarizer + ) -> None: + """aggregate_to_higher_layer with 'sum' must return the total.""" + store = summarizer.store + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day1", {"pnl": 100.0}) + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day2", {"pnl": 200.0}) + + result = summarizer.aggregate_to_higher_layer( + ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, "pnl", "sum" + ) + assert result == pytest.approx(300.0) + + def test_aggregate_to_higher_layer_max( + self, summarizer: ContextSummarizer + ) -> None: + """aggregate_to_higher_layer with 'max' must return the maximum.""" + store = summarizer.store + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day1", {"pnl": 100.0}) + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day2", {"pnl": 200.0}) + + result = summarizer.aggregate_to_higher_layer( + ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, "pnl", "max" + ) + assert result == pytest.approx(200.0) + + def test_aggregate_to_higher_layer_min( + self, summarizer: ContextSummarizer + ) -> None: + """aggregate_to_higher_layer with 'min' must return the minimum.""" + store = summarizer.store + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day1", {"pnl": 100.0}) + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day2", {"pnl": 200.0}) + + result = summarizer.aggregate_to_higher_layer( + ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, "pnl", "min" + ) + assert result == pytest.approx(100.0) + + def test_aggregate_to_higher_layer_no_data( + self, summarizer: ContextSummarizer + ) -> None: + """aggregate_to_higher_layer with no matching key must return None.""" + result = summarizer.aggregate_to_higher_layer( + ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, "nonexistent", "mean" + ) + assert result is None + + def test_aggregate_to_higher_layer_unknown_func_defaults_to_mean( + self, summarizer: ContextSummarizer + ) -> None: + """Unknown aggregation function must fall back to mean.""" + store = summarizer.store + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day1", {"pnl": 100.0}) + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day2", {"pnl": 200.0}) + + result = summarizer.aggregate_to_higher_layer( + ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, "pnl", "unknown_func" + ) + assert result == pytest.approx(150.0) + + # ------------------------------------------------------------------ + # create_compact_summary + format_summary_for_prompt + # ------------------------------------------------------------------ + + def test_create_compact_summary( + self, summarizer: ContextSummarizer + ) -> None: + """create_compact_summary must produce a dict keyed by layer value.""" + store = summarizer.store + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "pnl", 100.0) + + result = summarizer.create_compact_summary([ContextLayer.L6_DAILY]) + assert ContextLayer.L6_DAILY.value in result + + def test_format_summary_for_prompt_with_numeric_metrics( + self, summarizer: ContextSummarizer + ) -> None: + """format_summary_for_prompt must render avg/trend fields.""" + store = summarizer.store + store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "pnl", 100.0) + store.set_context(ContextLayer.L6_DAILY, "2026-02-02", "pnl", 200.0) + + compact = summarizer.create_compact_summary([ContextLayer.L6_DAILY]) + text = summarizer.format_summary_for_prompt(compact) + assert isinstance(text, str) + + def test_format_summary_for_prompt_skips_empty_layers( + self, summarizer: ContextSummarizer + ) -> None: + """format_summary_for_prompt must skip layers with no metrics.""" + summary = {ContextLayer.L6_DAILY.value: {}} + text = summarizer.format_summary_for_prompt(summary) + assert text == "" + + def test_format_summary_non_dict_value( + self, summarizer: ContextSummarizer + ) -> None: + """format_summary_for_prompt must render non-dict values as plain text.""" + summary = { + "daily": { + "plain_count": 42, + } + } + text = summarizer.format_summary_for_prompt(summary) + assert "plain_count" in text + assert "42" in text diff --git a/tests/test_logging_config.py b/tests/test_logging_config.py new file mode 100644 index 0000000..526f692 --- /dev/null +++ b/tests/test_logging_config.py @@ -0,0 +1,117 @@ +"""Tests for JSON structured logging configuration.""" + +from __future__ import annotations + +import json +import logging +import sys + +from src.logging_config import JSONFormatter, setup_logging + + +class TestJSONFormatter: + """Test JSONFormatter output.""" + + def test_basic_log_record(self) -> None: + """JSONFormatter must emit valid JSON with required fields.""" + formatter = JSONFormatter() + record = logging.LogRecord( + name="test.logger", + level=logging.INFO, + pathname="", + lineno=0, + msg="Hello %s", + args=("world",), + exc_info=None, + ) + output = formatter.format(record) + data = json.loads(output) + assert data["level"] == "INFO" + assert data["logger"] == "test.logger" + assert data["message"] == "Hello world" + assert "timestamp" in data + + def test_includes_exception_info(self) -> None: + """JSONFormatter must include exception info when present.""" + formatter = JSONFormatter() + try: + raise ValueError("test error") + except ValueError: + exc_info = sys.exc_info() + record = logging.LogRecord( + name="test", + level=logging.ERROR, + pathname="", + lineno=0, + msg="oops", + args=(), + exc_info=exc_info, + ) + output = formatter.format(record) + data = json.loads(output) + assert "exception" in data + assert "ValueError" in data["exception"] + + def test_extra_trading_fields_included(self) -> None: + """Extra trading fields attached to the record must appear in JSON.""" + formatter = JSONFormatter() + record = logging.LogRecord( + name="test", + level=logging.INFO, + pathname="", + lineno=0, + msg="trade", + args=(), + exc_info=None, + ) + record.stock_code = "005930" # type: ignore[attr-defined] + record.action = "BUY" # type: ignore[attr-defined] + record.confidence = 85 # type: ignore[attr-defined] + record.pnl_pct = -1.5 # type: ignore[attr-defined] + record.order_amount = 1_000_000 # type: ignore[attr-defined] + output = formatter.format(record) + data = json.loads(output) + assert data["stock_code"] == "005930" + assert data["action"] == "BUY" + assert data["confidence"] == 85 + assert data["pnl_pct"] == -1.5 + assert data["order_amount"] == 1_000_000 + + def test_none_extra_fields_excluded(self) -> None: + """Extra fields that are None must not appear in JSON output.""" + formatter = JSONFormatter() + record = logging.LogRecord( + name="test", + level=logging.INFO, + pathname="", + lineno=0, + msg="no extras", + args=(), + exc_info=None, + ) + output = formatter.format(record) + data = json.loads(output) + assert "stock_code" not in data + assert "action" not in data + assert "confidence" not in data + + +class TestSetupLogging: + """Test setup_logging function.""" + + def test_configures_root_logger(self) -> None: + """setup_logging must attach a JSON handler to the root logger.""" + setup_logging(level=logging.DEBUG) + root = logging.getLogger() + json_handlers = [ + h for h in root.handlers if isinstance(h.formatter, JSONFormatter) + ] + assert len(json_handlers) == 1 + assert root.level == logging.DEBUG + + def test_avoids_duplicate_handlers(self) -> None: + """Calling setup_logging twice must not add duplicate handlers.""" + setup_logging() + setup_logging() + root = logging.getLogger() + assert len(root.handlers) == 1 diff --git a/tests/test_strategies_base.py b/tests/test_strategies_base.py new file mode 100644 index 0000000..30d5abd --- /dev/null +++ b/tests/test_strategies_base.py @@ -0,0 +1,32 @@ +"""Tests for BaseStrategy abstract class.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from src.strategies.base import BaseStrategy + + +class ConcreteStrategy(BaseStrategy): + """Minimal concrete strategy for testing.""" + + def evaluate(self, market_data: dict[str, Any]) -> dict[str, Any]: + return {"action": "HOLD", "confidence": 50, "rationale": "test"} + + +def test_base_strategy_cannot_be_instantiated() -> None: + """BaseStrategy cannot be instantiated directly (it's abstract).""" + with pytest.raises(TypeError): + BaseStrategy() # type: ignore[abstract] + + +def test_concrete_strategy_evaluate_returns_decision() -> None: + """Concrete subclass must implement evaluate and return a dict.""" + strategy = ConcreteStrategy() + result = strategy.evaluate({"close": [100.0, 101.0]}) + assert isinstance(result, dict) + assert result["action"] == "HOLD" + assert result["confidence"] == 50 + assert "rationale" in result