Merge pull request 'feat: integrate v2 backtest validation pipeline (#305)' (#313) from feature/issue-305-backtest-pipeline-integration into feature/v3-session-policy-stream
Some checks failed
Gitea CI / test (push) Has been cancelled
This commit was merged in pull request #313.
@@ -355,3 +355,36 @@ Order result: 모의투자 매수주문이 완료 되었습니다. ✓
- `TestOverseasGhostPositionClose` (2 tests): verify the ghost-close log; ignore ordinary errors

**Issue/PR:** #235, PR #236

---

## 2026-02-27

### v2 backtest pipeline integration (#305)

**Background:**
- `TripleBarrier`, `WalkForward`, and `BacktestCostGuard` existed as separate modules, but
  no pipeline connected them into a single execution path, so integrated validation was impossible.

**Implementation:**

1. `src/analysis/backtest_pipeline.py`
   - Added `run_v2_backtest_pipeline()`:
     - Pre-validates the cost model with `validate_backtest_cost_model()` (fail-fast)
     - Generates entry labels with `label_with_triple_barrier()`
     - Generates folds with `generate_walk_forward_splits()`
     - Computes baseline (`B0`, `B1`, `M1`) scores per fold
   - Defines the result artifact contract structure (`BacktestPipelineResult`)
   - Provides the leakage-check utility `fold_has_leakage()`

2. `tests/test_backtest_pipeline_integration.py` (new)
   - Happy-path integration check
   - Fail-fast check on cost guard failure
   - Leakage-prevention check based on purge/embargo
   - Determinism check when re-running with identical input

**Verification:**
- `pytest -q tests/test_backtest_pipeline_integration.py tests/test_triple_barrier.py tests/test_walk_forward_split.py tests/test_backtest_cost_guard.py tests/test_backtest_execution_model.py`
- `ruff check src/analysis/backtest_pipeline.py tests/test_backtest_pipeline_integration.py`

**Issue/PR:** #305
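
The purge/embargo leakage check in the changelog entry above can be pictured with a small standalone sketch. This is not the code in `src/analysis/walk_forward_split.py`, which this diff does not show. The names `SketchFold`, `sketch_walk_forward`, and `has_overlap` are hypothetical, and the purge and embargo semantics (a gap before each test window, and dropped training samples right after the previous test window) are assumptions that the real module may implement differently.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class SketchFold:
    """Hypothetical stand-in for WalkForwardFold (only the two index lists)."""

    train_indices: list[int]
    test_indices: list[int]


def sketch_walk_forward(
    n_samples: int,
    train_size: int,
    test_size: int,
    step_size: int,
    purge_size: int = 0,
    embargo_size: int = 0,
) -> list[SketchFold]:
    """Rolling train/test splits under the assumed purge/embargo semantics."""
    if step_size <= 0:
        raise ValueError("step_size must be positive")
    folds: list[SketchFold] = []
    prev_test_end: int | None = None
    test_start = train_size + purge_size
    while test_start + test_size <= n_samples:
        # Purge (assumed): leave purge_size samples unused between train and test.
        train_end = test_start - purge_size
        train = list(range(max(0, train_end - train_size), train_end))
        if prev_test_end is not None:
            # Embargo (assumed): drop training samples that directly follow
            # the previous fold's test window.
            embargoed = set(range(prev_test_end, prev_test_end + embargo_size))
            train = [i for i in train if i not in embargoed]
        test = list(range(test_start, test_start + test_size))
        folds.append(SketchFold(train, test))
        prev_test_end = test_start + test_size
        test_start += step_size
    return folds


def has_overlap(fold: SketchFold) -> bool:
    """Same idea as fold_has_leakage(): any shared index counts as leakage."""
    return bool(set(fold.train_indices).intersection(fold.test_indices))


folds = sketch_walk_forward(
    16, train_size=6, test_size=2, step_size=4, purge_size=1, embargo_size=1
)
for fold in folds:
    print(fold.train_indices, fold.test_indices, has_overlap(fold))
```

With these toy parameters the first fold trains on indices 0 to 5 and tests on 7 and 8, so index 6 is the purged gap. The second fold loses index 9 from its training window because it falls inside the assumed embargo after the first test window.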
187
src/analysis/backtest_pipeline.py
Normal file
@@ -0,0 +1,187 @@
"""Integrated v2 backtest pipeline.

Wires TripleBarrier labeling + WalkForward split + CostGuard validation
into a single deterministic orchestration path.
"""

from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass
from statistics import mean
from typing import Literal

from src.analysis.backtest_cost_guard import BacktestCostModel, validate_backtest_cost_model
from src.analysis.triple_barrier import TripleBarrierSpec, label_with_triple_barrier
from src.analysis.walk_forward_split import WalkForwardFold, generate_walk_forward_splits


@dataclass(frozen=True)
class BacktestBar:
    high: float
    low: float
    close: float
    session_id: str


@dataclass(frozen=True)
class WalkForwardConfig:
    train_size: int
    test_size: int
    step_size: int | None = None
    purge_size: int = 0
    embargo_size: int = 0
    min_train_size: int = 1


@dataclass(frozen=True)
class BaselineScore:
    name: Literal["B0", "B1", "M1"]
    accuracy: float


@dataclass(frozen=True)
class BacktestFoldResult:
    fold_index: int
    train_indices: list[int]
    test_indices: list[int]
    train_label_distribution: dict[int, int]
    test_label_distribution: dict[int, int]
    baseline_scores: list[BaselineScore]


@dataclass(frozen=True)
class BacktestPipelineResult:
    run_id: str
    n_bars: int
    n_entries: int
    required_sessions: list[str]
    label_distribution: dict[int, int]
    folds: list[BacktestFoldResult]


def run_v2_backtest_pipeline(
    *,
    bars: Sequence[BacktestBar],
    entry_indices: Sequence[int],
    side: int,
    triple_barrier_spec: TripleBarrierSpec,
    walk_forward: WalkForwardConfig,
    cost_model: BacktestCostModel,
    required_sessions: list[str] | None = None,
) -> BacktestPipelineResult:
    """Run v2 integrated pipeline (cost guard -> labels -> walk-forward baselines)."""
    if not bars:
        raise ValueError("bars must not be empty")
    if not entry_indices:
        raise ValueError("entry_indices must not be empty")

    resolved_sessions = (
        sorted(set(required_sessions))
        if required_sessions is not None
        else sorted({bar.session_id for bar in bars})
    )
    validate_backtest_cost_model(model=cost_model, required_sessions=resolved_sessions)

    highs = [float(bar.high) for bar in bars]
    lows = [float(bar.low) for bar in bars]
    closes = [float(bar.close) for bar in bars]
    normalized_entries = sorted(set(int(i) for i in entry_indices))
    if normalized_entries[0] < 0 or normalized_entries[-1] >= len(bars):
        raise IndexError("entry index out of range")

    labels_by_bar_index: dict[int, int] = {}
    for idx in normalized_entries:
        labels_by_bar_index[idx] = label_with_triple_barrier(
            highs=highs,
            lows=lows,
            closes=closes,
            entry_index=idx,
            side=side,
            spec=triple_barrier_spec,
        ).label

    ordered_labels = [labels_by_bar_index[idx] for idx in normalized_entries]
    folds = generate_walk_forward_splits(
        n_samples=len(normalized_entries),
        train_size=walk_forward.train_size,
        test_size=walk_forward.test_size,
        step_size=walk_forward.step_size,
        purge_size=walk_forward.purge_size,
        embargo_size=walk_forward.embargo_size,
        min_train_size=walk_forward.min_train_size,
    )

    fold_results: list[BacktestFoldResult] = []
    for fold_idx, fold in enumerate(folds):
        train_labels = [ordered_labels[i] for i in fold.train_indices]
        test_labels = [ordered_labels[i] for i in fold.test_indices]
        if not test_labels:
            continue
        fold_results.append(
            BacktestFoldResult(
                fold_index=fold_idx,
                train_indices=fold.train_indices,
                test_indices=fold.test_indices,
                train_label_distribution=_label_dist(train_labels),
                test_label_distribution=_label_dist(test_labels),
                baseline_scores=[
                    BaselineScore(name="B0", accuracy=_baseline_b0(train_labels, test_labels)),
                    BaselineScore(name="B1", accuracy=_score_constant(1, test_labels)),
                    BaselineScore(
                        name="M1",
                        accuracy=_score_constant(_m1_pred(train_labels), test_labels),
                    ),
                ],
            )
        )

    return BacktestPipelineResult(
        run_id=_build_run_id(
            n_entries=len(normalized_entries),
            n_folds=len(fold_results),
            sessions=resolved_sessions,
        ),
        n_bars=len(bars),
        n_entries=len(normalized_entries),
        required_sessions=resolved_sessions,
        label_distribution=_label_dist(ordered_labels),
        folds=fold_results,
    )


def _label_dist(labels: Sequence[int]) -> dict[int, int]:
    dist: dict[int, int] = {-1: 0, 0: 0, 1: 0}
    for val in labels:
        if val in dist:
            dist[val] += 1
    return dist


def _score_constant(pred: int, actual: Sequence[int]) -> float:
    return mean(1.0 if pred == label else 0.0 for label in actual)


def _baseline_b0(train_labels: Sequence[int], test_labels: Sequence[int]) -> float:
    if not train_labels:
        return _score_constant(0, test_labels)
    # Majority-class baseline from training fold.
    choices = (-1, 0, 1)
    pred = max(choices, key=lambda c: train_labels.count(c))
    return _score_constant(pred, test_labels)


def _m1_pred(train_labels: Sequence[int]) -> int:
    if not train_labels:
        return 0
    return train_labels[-1]


def _build_run_id(*, n_entries: int, n_folds: int, sessions: Sequence[str]) -> str:
    sess_key = "_".join(sessions)
    return f"v2p-e{n_entries}-f{n_folds}-s{sess_key}"


def fold_has_leakage(fold: WalkForwardFold) -> bool:
    """Utility for tests/verification: True when train/test overlap exists."""
    return bool(set(fold.train_indices).intersection(fold.test_indices))
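
To make the three per-fold baselines concrete, here is a small worked example on toy labels. It re-implements the scoring helpers from the diff under public names (`score_constant` and `majority_label` are illustrative names, not part of the module), and the label lists are made up for the illustration.

```python
from statistics import mean


def score_constant(pred: int, actual: list[int]) -> float:
    """Accuracy of always predicting `pred` against the actual labels."""
    return mean(1.0 if pred == label else 0.0 for label in actual)


def majority_label(train_labels: list[int]) -> int:
    """Most frequent label in the training fold.

    max() returns the first maximal item, so ties resolve in the
    order -1, 0, 1.
    """
    return max((-1, 0, 1), key=train_labels.count)


# Toy labels (invented for this example).
train_labels = [-1, -1, 1, 0]
test_labels = [-1, -1, -1, -1, 0, 0, 1, 0]

b0 = score_constant(majority_label(train_labels), test_labels)  # majority class
b1 = score_constant(1, test_labels)                             # always predict +1
m1 = score_constant(train_labels[-1], test_labels)              # last training label

print(b0, b1, m1)  # 0.5 0.125 0.375
```

Here `B0` predicts -1 (the training majority) and matches 4 of 8 test labels, `B1` always predicts +1 and matches 1 of 8, and `M1` carries the last training label (0) forward and matches 3 of 8.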
136
tests/test_backtest_pipeline_integration.py
Normal file
@@ -0,0 +1,136 @@
from __future__ import annotations

from src.analysis.backtest_cost_guard import BacktestCostModel
from src.analysis.backtest_pipeline import (
    BacktestBar,
    WalkForwardConfig,
    fold_has_leakage,
    run_v2_backtest_pipeline,
)
from src.analysis.triple_barrier import TripleBarrierSpec
from src.analysis.walk_forward_split import generate_walk_forward_splits


def _bars() -> list[BacktestBar]:
    closes = [100.0, 101.0, 102.0, 101.5, 103.0, 102.5, 104.0, 103.5, 105.0, 104.5, 106.0, 105.5]
    bars: list[BacktestBar] = []
    for i, close in enumerate(closes):
        bars.append(
            BacktestBar(
                high=close + 1.0,
                low=close - 1.0,
                close=close,
                session_id="KRX_REG" if i % 2 == 0 else "US_PRE",
            )
        )
    return bars


def _cost_model() -> BacktestCostModel:
    return BacktestCostModel(
        commission_bps=3.0,
        slippage_bps_by_session={"KRX_REG": 10.0, "US_PRE": 50.0},
        failure_rate_by_session={"KRX_REG": 0.01, "US_PRE": 0.08},
        unfavorable_fill_required=True,
    )


def test_pipeline_happy_path_returns_fold_and_artifact_contract() -> None:
    out = run_v2_backtest_pipeline(
        bars=_bars(),
        entry_indices=[0, 1, 2, 3, 4, 5, 6, 7],
        side=1,
        triple_barrier_spec=TripleBarrierSpec(
            take_profit_pct=0.02,
            stop_loss_pct=0.01,
            max_holding_bars=3,
        ),
        walk_forward=WalkForwardConfig(
            train_size=4,
            test_size=2,
            step_size=2,
            purge_size=1,
            embargo_size=1,
            min_train_size=3,
        ),
        cost_model=_cost_model(),
    )

    assert out.run_id.startswith("v2p-e8-f")
    assert out.n_bars == 12
    assert out.n_entries == 8
    assert out.required_sessions == ["KRX_REG", "US_PRE"]
    assert len(out.folds) > 0
    assert set(out.label_distribution) == {-1, 0, 1}
    for fold in out.folds:
        names = {score.name for score in fold.baseline_scores}
        assert names == {"B0", "B1", "M1"}
        for score in fold.baseline_scores:
            assert 0.0 <= score.accuracy <= 1.0


def test_pipeline_cost_guard_fail_fast() -> None:
    bad = BacktestCostModel(
        commission_bps=3.0,
        slippage_bps_by_session={"KRX_REG": 10.0},
        failure_rate_by_session={"KRX_REG": 0.01},
        unfavorable_fill_required=True,
    )
    try:
        run_v2_backtest_pipeline(
            bars=_bars(),
            entry_indices=[0, 1, 2, 3],
            side=1,
            triple_barrier_spec=TripleBarrierSpec(
                take_profit_pct=0.02,
                stop_loss_pct=0.01,
                max_holding_bars=3,
            ),
            walk_forward=WalkForwardConfig(train_size=2, test_size=1),
            cost_model=bad,
            required_sessions=["KRX_REG", "US_PRE"],
        )
    except ValueError as exc:
        assert "missing slippage_bps_by_session" in str(exc)
    else:
        raise AssertionError("expected cost guard validation error")


def test_pipeline_fold_leakage_guard() -> None:
    folds = generate_walk_forward_splits(
        n_samples=12,
        train_size=6,
        test_size=2,
        step_size=2,
        purge_size=1,
        embargo_size=1,
        min_train_size=5,
    )
    assert folds
    for fold in folds:
        assert not fold_has_leakage(fold)


def test_pipeline_deterministic_seed_free_deterministic_result() -> None:
    cfg = dict(
        bars=_bars(),
        entry_indices=[0, 1, 2, 3, 4, 5, 6, 7],
        side=1,
        triple_barrier_spec=TripleBarrierSpec(
            take_profit_pct=0.02,
            stop_loss_pct=0.01,
            max_holding_bars=3,
        ),
        walk_forward=WalkForwardConfig(
            train_size=4,
            test_size=2,
            step_size=2,
            purge_size=1,
            embargo_size=1,
            min_train_size=3,
        ),
        cost_model=_cost_model(),
    )
    out1 = run_v2_backtest_pipeline(**cfg)
    out2 = run_v2_backtest_pipeline(**cfg)
    assert out1 == out2
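
The tests above pass a `TripleBarrierSpec` with `take_profit_pct`, `stop_loss_pct`, and `max_holding_bars`, but `src/analysis/triple_barrier.py` itself is not part of this diff. As a rough illustration of what a label in {-1, 0, 1} could mean, the following sketch applies one common reading of triple-barrier labeling. The function `sketch_triple_barrier_label` is hypothetical, and its rules are assumptions: the entry price is the close of the entry bar, scanning starts on the next bar, and the stop is checked before the target when both are touched in the same bar. The real `label_with_triple_barrier()` may behave differently.

```python
from __future__ import annotations

from collections.abc import Sequence


def sketch_triple_barrier_label(
    highs: Sequence[float],
    lows: Sequence[float],
    closes: Sequence[float],
    entry_index: int,
    side: int,
    take_profit_pct: float,
    stop_loss_pct: float,
    max_holding_bars: int,
) -> int:
    """Return 1 (target hit), -1 (stop hit), or 0 (time barrier reached)."""
    entry_price = closes[entry_index]
    if side == 1:  # long: profit when price rises
        take_profit = entry_price * (1 + take_profit_pct)
        stop_loss = entry_price * (1 - stop_loss_pct)
    else:  # short: profit when price falls
        take_profit = entry_price * (1 - take_profit_pct)
        stop_loss = entry_price * (1 + stop_loss_pct)

    last = min(entry_index + max_holding_bars, len(closes) - 1)
    for i in range(entry_index + 1, last + 1):
        if side == 1:
            hit_stop = lows[i] <= stop_loss
            hit_target = highs[i] >= take_profit
        else:
            hit_stop = highs[i] >= stop_loss
            hit_target = lows[i] <= take_profit
        if hit_stop:  # assumed conservative tie-break: stop first
            return -1
        if hit_target:
            return 1
    return 0  # neither price barrier touched within the holding window


# Toy bars (invented): the target of 102.0 is first reached on bar 2.
highs = [100.5, 101.0, 103.5]
lows = [99.5, 100.0, 102.0]
closes = [100.0, 100.5, 103.0]

label_target = sketch_triple_barrier_label(highs, lows, closes, 0, 1, 0.02, 0.01, 3)
label_timeout = sketch_triple_barrier_label(highs, lows, closes, 0, 1, 0.02, 0.01, 1)
label_stop = sketch_triple_barrier_label(highs, [99.5, 98.0, 102.0], closes, 0, 1, 0.02, 0.01, 3)
print(label_target, label_timeout, label_stop)  # 1 0 -1
```

Under these assumptions the three outcomes map directly onto the `{-1, 0, 1}` keys that the happy-path test expects in `label_distribution`.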
@@ -49,3 +49,19 @@
- next_ticket: #304
- process_gate_checked: process_ticket=#306,#308 merged_to_feature_branch=yes
- risks_or_notes: session entry added because of a handover restart request; untracked artifacts (AMS/NAS/NYS, DB, lock, xlsx) need to be split into a separate commit

### 2026-02-27 | session=codex-issue305-start
- branch: feature/v3-session-policy-stream
- docs_checked: docs/workflow.md, docs/commands.md, docs/agent-constraints.md
- open_issues_reviewed: #305
- next_ticket: #305
- process_gate_checked: process_ticket=#306,#308 merged_to_feature_branch=yes
- risks_or_notes: started reviewing the integration path of the analysis/backtest modules in order to implement #305

### 2026-02-27 | session=codex-issue305-ticket-branch
- branch: feature/issue-305-backtest-pipeline-integration
- docs_checked: docs/workflow.md, docs/commands.md, docs/agent-constraints.md
- open_issues_reviewed: #305
- next_ticket: #305
- process_gate_checked: process_ticket=#306,#308 merged_to_feature_branch=yes
- risks_or_notes: entry added so the strict gate passes again after the ticket branch was created