"""Integrated v2 backtest pipeline.
|
|
|
|
Wires TripleBarrier labeling + WalkForward split + CostGuard validation
|
|
into a single deterministic orchestration path.
|
|
"""
|
|
|
|
from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass
from statistics import mean
from typing import Literal

from src.analysis.backtest_cost_guard import BacktestCostModel, validate_backtest_cost_model
from src.analysis.triple_barrier import TripleBarrierSpec, label_with_triple_barrier
from src.analysis.walk_forward_split import WalkForwardFold, generate_walk_forward_splits


@dataclass(frozen=True)
class BacktestBar:
    """A single price bar consumed by the v2 backtest pipeline.

    The pipeline reads high/low/close as floats for triple-barrier labeling,
    and uses session_id to resolve the default set of required cost-model
    sessions.
    """

    high: float
    low: float
    close: float
    # Distinct session_id values across all bars form the default
    # required_sessions passed to cost-model validation.
    session_id: str

@dataclass(frozen=True)
class WalkForwardConfig:
    """Split sizing parameters forwarded verbatim to generate_walk_forward_splits."""

    train_size: int
    test_size: int
    # None delegates the step choice to the splitter — presumably it defaults
    # to test_size; confirm in generate_walk_forward_splits.
    step_size: int | None = None
    purge_size: int = 0
    embargo_size: int = 0
    min_train_size: int = 1

@dataclass(frozen=True)
class BaselineScore:
    """Accuracy of one baseline model on a single fold's test labels."""

    # B0 = train-majority class, B1 = always predict 1,
    # M1 = repeat the last training label.
    name: Literal["B0", "B1", "M1"]
    accuracy: float

@dataclass(frozen=True)
class BacktestFoldResult:
    """Per-fold output: index sets, label distributions, and baseline scores."""

    # Position of the fold in the full generated split sequence; folds with an
    # empty test slice are skipped, so indices need not be contiguous.
    fold_index: int
    train_indices: list[int]
    test_indices: list[int]
    # Each distribution maps label -> count with keys {-1, 0, 1} always present.
    train_label_distribution: dict[int, int]
    test_label_distribution: dict[int, int]
    baseline_scores: list[BaselineScore]

@dataclass(frozen=True)
class BacktestPipelineResult:
    """Aggregate result of run_v2_backtest_pipeline."""

    # Deterministic identifier built from entry/fold counts and session names.
    run_id: str
    n_bars: int
    # Count of distinct, range-validated entry indices actually labeled.
    n_entries: int
    required_sessions: list[str]
    # Label counts over all labeled entries; keys {-1, 0, 1} always present.
    label_distribution: dict[int, int]
    folds: list[BacktestFoldResult]

def run_v2_backtest_pipeline(
    *,
    bars: Sequence[BacktestBar],
    entry_indices: Sequence[int],
    side: int,
    triple_barrier_spec: TripleBarrierSpec,
    walk_forward: WalkForwardConfig,
    cost_model: BacktestCostModel,
    required_sessions: list[str] | None = None,
) -> BacktestPipelineResult:
    """Run v2 integrated pipeline (cost guard -> labels -> walk-forward baselines).

    Args:
        bars: Ordered price bars; must be non-empty.
        entry_indices: Bar indices of candidate entries; deduplicated/sorted here.
        side: Trade direction forwarded to the triple-barrier labeler.
        triple_barrier_spec: Barrier configuration for labeling.
        walk_forward: Split sizing / purge / embargo configuration.
        cost_model: Cost model validated against the resolved sessions.
        required_sessions: Sessions the cost model must cover; defaults to the
            distinct session ids observed in *bars*.

    Returns:
        BacktestPipelineResult with per-fold label distributions and baseline
        scores. Folds whose test slice is empty are dropped.

    Raises:
        ValueError: If *bars* or *entry_indices* is empty.
        IndexError: If any entry index falls outside *bars*.
    """
    if not bars:
        raise ValueError("bars must not be empty")
    if not entry_indices:
        raise ValueError("entry_indices must not be empty")

    resolved_sessions = (
        sorted(set(required_sessions))
        if required_sessions is not None
        else sorted({bar.session_id for bar in bars})
    )
    # Fail fast before doing any labeling work if the cost model is incomplete.
    validate_backtest_cost_model(model=cost_model, required_sessions=resolved_sessions)

    normalized_entries = sorted({int(i) for i in entry_indices})
    # Entries are sorted, so checking the two extremes covers every index.
    if normalized_entries[0] < 0 or normalized_entries[-1] >= len(bars):
        raise IndexError("entry index out of range")

    ordered_labels = _label_entries(
        bars=bars,
        entries=normalized_entries,
        side=side,
        spec=triple_barrier_spec,
    )
    fold_results = _evaluate_folds(ordered_labels, walk_forward)

    return BacktestPipelineResult(
        run_id=_build_run_id(
            n_entries=len(normalized_entries),
            n_folds=len(fold_results),
            sessions=resolved_sessions,
        ),
        n_bars=len(bars),
        n_entries=len(normalized_entries),
        required_sessions=resolved_sessions,
        label_distribution=_label_dist(ordered_labels),
        folds=fold_results,
    )


def _label_entries(
    *,
    bars: Sequence[BacktestBar],
    entries: Sequence[int],
    side: int,
    spec: TripleBarrierSpec,
) -> list[int]:
    """Label each entry index with its triple-barrier outcome, in entry order."""
    highs = [float(bar.high) for bar in bars]
    lows = [float(bar.low) for bar in bars]
    closes = [float(bar.close) for bar in bars]
    return [
        label_with_triple_barrier(
            highs=highs,
            lows=lows,
            closes=closes,
            entry_index=idx,
            side=side,
            spec=spec,
        ).label
        for idx in entries
    ]


def _evaluate_folds(
    ordered_labels: Sequence[int],
    walk_forward: WalkForwardConfig,
) -> list[BacktestFoldResult]:
    """Score the B0/B1/M1 baselines on every walk-forward fold.

    Folds whose test slice is empty are skipped; ``fold_index`` always refers
    to the position in the full generated split sequence.
    """
    folds = generate_walk_forward_splits(
        n_samples=len(ordered_labels),
        train_size=walk_forward.train_size,
        test_size=walk_forward.test_size,
        step_size=walk_forward.step_size,
        purge_size=walk_forward.purge_size,
        embargo_size=walk_forward.embargo_size,
        min_train_size=walk_forward.min_train_size,
    )
    results: list[BacktestFoldResult] = []
    for fold_idx, fold in enumerate(folds):
        train_labels = [ordered_labels[i] for i in fold.train_indices]
        test_labels = [ordered_labels[i] for i in fold.test_indices]
        if not test_labels:
            continue
        results.append(
            BacktestFoldResult(
                fold_index=fold_idx,
                train_indices=fold.train_indices,
                test_indices=fold.test_indices,
                train_label_distribution=_label_dist(train_labels),
                test_label_distribution=_label_dist(test_labels),
                baseline_scores=[
                    BaselineScore(name="B0", accuracy=_baseline_b0(train_labels, test_labels)),
                    BaselineScore(name="B1", accuracy=_score_constant(1, test_labels)),
                    BaselineScore(
                        name="M1",
                        accuracy=_score_constant(_m1_pred(train_labels), test_labels),
                    ),
                ],
            )
        )
    return results

def _label_dist(labels: Sequence[int]) -> dict[int, int]:
|
|
dist: dict[int, int] = {-1: 0, 0: 0, 1: 0}
|
|
for val in labels:
|
|
if val in dist:
|
|
dist[val] += 1
|
|
return dist
|
|
|
|
|
|
def _score_constant(pred: int, actual: Sequence[int]) -> float:
|
|
return mean(1.0 if pred == label else 0.0 for label in actual)
|
|
|
|
|
|
def _baseline_b0(train_labels: Sequence[int], test_labels: Sequence[int]) -> float:
    """Majority-class baseline (B0): predict the most frequent training label.

    Predicts 0 when the training fold is empty. Ties break toward -1, then 0,
    then 1, following the fixed candidate order.
    """
    if not train_labels:
        return _score_constant(0, test_labels)
    majority = max((-1, 0, 1), key=train_labels.count)
    return _score_constant(majority, test_labels)

def _m1_pred(train_labels: Sequence[int]) -> int:
|
|
if not train_labels:
|
|
return 0
|
|
return train_labels[-1]
|
|
|
|
|
|
def _build_run_id(*, n_entries: int, n_folds: int, sessions: Sequence[str]) -> str:
|
|
sess_key = "_".join(sessions)
|
|
return f"v2p-e{n_entries}-f{n_folds}-s{sess_key}"
|
|
|
|
|
|
def fold_has_leakage(fold: WalkForwardFold) -> bool:
    """Utility for tests/verification: True when train/test overlap exists."""
    return not set(fold.train_indices).isdisjoint(fold.test_indices)
