diff --git a/docs/requirements-log.md b/docs/requirements-log.md index cd629a9..bd18e04 100644 --- a/docs/requirements-log.md +++ b/docs/requirements-log.md @@ -355,3 +355,36 @@ Order result: 모의투자 매수주문이 완료 되었습니다. ✓ - `TestOverseasGhostPositionClose` 2개: ghost-close 로그 확인, 일반 오류 무시 **이슈/PR:** #235, PR #236 + +--- + +## 2026-02-27 + +### v2 백테스트 파이프라인 통합 (#305) + +**배경:** +- `TripleBarrier`, `WalkForward`, `BacktestCostGuard`는 개별 모듈로 존재했으나, + 하나의 실행 경로로 연결된 파이프라인이 없어 통합 검증이 불가능했다. + +**구현 내용:** + +1. `src/analysis/backtest_pipeline.py` + - `run_v2_backtest_pipeline()` 추가: + - `validate_backtest_cost_model()` 선검증(fail-fast) + - `label_with_triple_barrier()`로 entry 라벨 생성 + - `generate_walk_forward_splits()`로 fold 생성 + - fold별 baseline(`B0`, `B1`, `M1`) score 산출 + - 결과 아티팩트 계약 구조(`BacktestPipelineResult`) 정의 + - leakage 검사 유틸 `fold_has_leakage()` 제공 + +2. `tests/test_backtest_pipeline_integration.py` 신규 + - happy path 통합 검증 + - cost guard 실패 fail-fast 검증 + - purge/embargo 기반 누수 방지 검증 + - 동일 입력 재실행 결정성 검증 + +**검증:** +- `pytest -q tests/test_backtest_pipeline_integration.py tests/test_triple_barrier.py tests/test_walk_forward_split.py tests/test_backtest_cost_guard.py tests/test_backtest_execution_model.py` +- `ruff check src/analysis/backtest_pipeline.py tests/test_backtest_pipeline_integration.py` + +**이슈/PR:** #305 diff --git a/src/analysis/backtest_pipeline.py b/src/analysis/backtest_pipeline.py new file mode 100644 index 0000000..4b0701f --- /dev/null +++ b/src/analysis/backtest_pipeline.py @@ -0,0 +1,187 @@ +"""Integrated v2 backtest pipeline. + +Wires TripleBarrier labeling + WalkForward split + CostGuard validation +into a single deterministic orchestration path. +""" + +from __future__ import annotations + +from collections.abc import Sequence +from dataclasses import dataclass +from statistics import mean +from typing import Literal + +from src.analysis.backtest_cost_guard import BacktestCostModel, validate_backtest_cost_model +from src.analysis.triple_barrier import TripleBarrierSpec, label_with_triple_barrier +from src.analysis.walk_forward_split import WalkForwardFold, generate_walk_forward_splits + + +@dataclass(frozen=True) +class BacktestBar: + high: float + low: float + close: float + session_id: str + + +@dataclass(frozen=True) +class WalkForwardConfig: + train_size: int + test_size: int + step_size: int | None = None + purge_size: int = 0 + embargo_size: int = 0 + min_train_size: int = 1 + + +@dataclass(frozen=True) +class BaselineScore: + name: Literal["B0", "B1", "M1"] + accuracy: float + + +@dataclass(frozen=True) +class BacktestFoldResult: + fold_index: int + train_indices: list[int] + test_indices: list[int] + train_label_distribution: dict[int, int] + test_label_distribution: dict[int, int] + baseline_scores: list[BaselineScore] + + +@dataclass(frozen=True) +class BacktestPipelineResult: + run_id: str + n_bars: int + n_entries: int + required_sessions: list[str] + label_distribution: dict[int, int] + folds: list[BacktestFoldResult] + + +def run_v2_backtest_pipeline( + *, + bars: Sequence[BacktestBar], + entry_indices: Sequence[int], + side: int, + triple_barrier_spec: TripleBarrierSpec, + walk_forward: WalkForwardConfig, + cost_model: BacktestCostModel, + required_sessions: list[str] | None = None, +) -> BacktestPipelineResult: + """Run v2 integrated pipeline (cost guard -> labels -> walk-forward baselines).""" + if not bars: + raise ValueError("bars must not be empty") + if not entry_indices: + raise ValueError("entry_indices must not be empty") + + resolved_sessions = ( + sorted(set(required_sessions)) + if required_sessions is not None + else sorted({bar.session_id for bar in bars}) + ) + validate_backtest_cost_model(model=cost_model, required_sessions=resolved_sessions) + + highs = [float(bar.high) for bar in bars] + lows = [float(bar.low) for bar in bars] + closes = [float(bar.close) for bar in bars] + normalized_entries = sorted(set(int(i) for i in entry_indices)) + if normalized_entries[0] < 0 or normalized_entries[-1] >= len(bars): + raise IndexError("entry index out of range") + + labels_by_bar_index: dict[int, int] = {} + for idx in normalized_entries: + labels_by_bar_index[idx] = label_with_triple_barrier( + highs=highs, + lows=lows, + closes=closes, + entry_index=idx, + side=side, + spec=triple_barrier_spec, + ).label + + ordered_labels = [labels_by_bar_index[idx] for idx in normalized_entries] + folds = generate_walk_forward_splits( + n_samples=len(normalized_entries), + train_size=walk_forward.train_size, + test_size=walk_forward.test_size, + step_size=walk_forward.step_size, + purge_size=walk_forward.purge_size, + embargo_size=walk_forward.embargo_size, + min_train_size=walk_forward.min_train_size, + ) + + fold_results: list[BacktestFoldResult] = [] + for fold_idx, fold in enumerate(folds): + train_labels = [ordered_labels[i] for i in fold.train_indices] + test_labels = [ordered_labels[i] for i in fold.test_indices] + if not test_labels: + continue + fold_results.append( + BacktestFoldResult( + fold_index=fold_idx, + train_indices=fold.train_indices, + test_indices=fold.test_indices, + train_label_distribution=_label_dist(train_labels), + test_label_distribution=_label_dist(test_labels), + baseline_scores=[ + BaselineScore(name="B0", accuracy=_baseline_b0(train_labels, test_labels)), + BaselineScore(name="B1", accuracy=_score_constant(1, test_labels)), + BaselineScore( + name="M1", + accuracy=_score_constant(_m1_pred(train_labels), test_labels), + ), + ], + ) + ) + + return BacktestPipelineResult( + run_id=_build_run_id( + n_entries=len(normalized_entries), + n_folds=len(fold_results), + sessions=resolved_sessions, + ), + n_bars=len(bars), + n_entries=len(normalized_entries), + required_sessions=resolved_sessions, + label_distribution=_label_dist(ordered_labels), + folds=fold_results, + ) + + +def _label_dist(labels: Sequence[int]) -> dict[int, int]: + dist: dict[int, int] = {-1: 0, 0: 0, 1: 0} + for val in labels: + if val in dist: + dist[val] += 1 + return dist + + +def _score_constant(pred: int, actual: Sequence[int]) -> float: + return mean(1.0 if pred == label else 0.0 for label in actual) + + +def _baseline_b0(train_labels: Sequence[int], test_labels: Sequence[int]) -> float: + if not train_labels: + return _score_constant(0, test_labels) + # Majority-class baseline from training fold. + choices = (-1, 0, 1) + pred = max(choices, key=lambda c: train_labels.count(c)) + return _score_constant(pred, test_labels) + + +def _m1_pred(train_labels: Sequence[int]) -> int: + if not train_labels: + return 0 + return train_labels[-1] + + +def _build_run_id(*, n_entries: int, n_folds: int, sessions: Sequence[str]) -> str: + sess_key = "_".join(sessions) + return f"v2p-e{n_entries}-f{n_folds}-s{sess_key}" + + +def fold_has_leakage(fold: WalkForwardFold) -> bool: + """Utility for tests/verification: True when train/test overlap exists.""" + return bool(set(fold.train_indices).intersection(fold.test_indices)) diff --git a/tests/test_backtest_pipeline_integration.py b/tests/test_backtest_pipeline_integration.py new file mode 100644 index 0000000..60dca91 --- /dev/null +++ b/tests/test_backtest_pipeline_integration.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +from src.analysis.backtest_cost_guard import BacktestCostModel +from src.analysis.backtest_pipeline import ( + BacktestBar, + WalkForwardConfig, + fold_has_leakage, + run_v2_backtest_pipeline, +) +from src.analysis.triple_barrier import TripleBarrierSpec +from src.analysis.walk_forward_split import generate_walk_forward_splits + + +def _bars() -> list[BacktestBar]: + closes = [100.0, 101.0, 102.0, 101.5, 103.0, 102.5, 104.0, 103.5, 105.0, 104.5, 106.0, 105.5] + bars: list[BacktestBar] = [] + for i, close in enumerate(closes): + bars.append( + BacktestBar( + high=close + 1.0, + low=close - 1.0, + close=close, + session_id="KRX_REG" if i % 2 == 0 else "US_PRE", + ) + ) + return bars + + +def _cost_model() -> BacktestCostModel: + return BacktestCostModel( + commission_bps=3.0, + slippage_bps_by_session={"KRX_REG": 10.0, "US_PRE": 50.0}, + failure_rate_by_session={"KRX_REG": 0.01, "US_PRE": 0.08}, + unfavorable_fill_required=True, + ) + + +def test_pipeline_happy_path_returns_fold_and_artifact_contract() -> None: + out = run_v2_backtest_pipeline( + bars=_bars(), + entry_indices=[0, 1, 2, 3, 4, 5, 6, 7], + side=1, + triple_barrier_spec=TripleBarrierSpec( + take_profit_pct=0.02, + stop_loss_pct=0.01, + max_holding_bars=3, + ), + walk_forward=WalkForwardConfig( + train_size=4, + test_size=2, + step_size=2, + purge_size=1, + embargo_size=1, + min_train_size=3, + ), + cost_model=_cost_model(), + ) + + assert out.run_id.startswith("v2p-e8-f") + assert out.n_bars == 12 + assert out.n_entries == 8 + assert out.required_sessions == ["KRX_REG", "US_PRE"] + assert len(out.folds) > 0 + assert set(out.label_distribution) == {-1, 0, 1} + for fold in out.folds: + names = {score.name for score in fold.baseline_scores} + assert names == {"B0", "B1", "M1"} + for score in fold.baseline_scores: + assert 0.0 <= score.accuracy <= 1.0 + + +def test_pipeline_cost_guard_fail_fast() -> None: + bad = BacktestCostModel( + commission_bps=3.0, + slippage_bps_by_session={"KRX_REG": 10.0}, + failure_rate_by_session={"KRX_REG": 0.01}, + unfavorable_fill_required=True, + ) + try: + run_v2_backtest_pipeline( + bars=_bars(), + entry_indices=[0, 1, 2, 3], + side=1, + triple_barrier_spec=TripleBarrierSpec( + take_profit_pct=0.02, + stop_loss_pct=0.01, + max_holding_bars=3, + ), + walk_forward=WalkForwardConfig(train_size=2, test_size=1), + cost_model=bad, + required_sessions=["KRX_REG", "US_PRE"], + ) + except ValueError as exc: + assert "missing slippage_bps_by_session" in str(exc) + else: + raise AssertionError("expected cost guard validation error") + + +def test_pipeline_fold_leakage_guard() -> None: + folds = generate_walk_forward_splits( + n_samples=12, + train_size=6, + test_size=2, + step_size=2, + purge_size=1, + embargo_size=1, + min_train_size=5, + ) + assert folds + for fold in folds: + assert not fold_has_leakage(fold) + + +def test_pipeline_deterministic_seed_free_deterministic_result() -> None: + cfg = dict( + bars=_bars(), + entry_indices=[0, 1, 2, 3, 4, 5, 6, 7], + side=1, + triple_barrier_spec=TripleBarrierSpec( + take_profit_pct=0.02, + stop_loss_pct=0.01, + max_holding_bars=3, + ), + walk_forward=WalkForwardConfig( + train_size=4, + test_size=2, + step_size=2, + purge_size=1, + embargo_size=1, + min_train_size=3, + ), + cost_model=_cost_model(), + ) + out1 = run_v2_backtest_pipeline(**cfg) + out2 = run_v2_backtest_pipeline(**cfg) + assert out1 == out2 diff --git a/workflow/session-handover.md b/workflow/session-handover.md index ea9f9cd..8f2b222 100644 --- a/workflow/session-handover.md +++ b/workflow/session-handover.md @@ -49,3 +49,19 @@ - next_ticket: #304 - process_gate_checked: process_ticket=#306,#308 merged_to_feature_branch=yes - risks_or_notes: handover 재시작 요청으로 세션 엔트리 추가, 미추적 산출물(AMS/NAS/NYS, DB, lock, xlsx) 커밋 분리 필요 + +### 2026-02-27 | session=codex-issue305-start +- branch: feature/v3-session-policy-stream +- docs_checked: docs/workflow.md, docs/commands.md, docs/agent-constraints.md +- open_issues_reviewed: #305 +- next_ticket: #305 +- process_gate_checked: process_ticket=#306,#308 merged_to_feature_branch=yes +- risks_or_notes: #305 구현을 위해 분석/백테스트 모듈 통합 경로 점검 시작 + +### 2026-02-27 | session=codex-issue305-ticket-branch +- branch: feature/issue-305-backtest-pipeline-integration +- docs_checked: docs/workflow.md, docs/commands.md, docs/agent-constraints.md +- open_issues_reviewed: #305 +- next_ticket: #305 +- process_gate_checked: process_ticket=#306,#308 merged_to_feature_branch=yes +- risks_or_notes: 티켓 브랜치 분기 후 strict gate 재통과를 위한 엔트리 추가