Compare commits
15 Commits
feature/is
...
feature/is
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2331d80915 | ||
|
|
7d72669cb8 | ||
| 74a4784b7a | |||
|
|
dc70311aed | ||
|
|
e56819e9e2 | ||
| cfd5351b58 | |||
|
|
b206c23fc9 | ||
|
|
4d9f3e2cfc | ||
| a93a5c616b | |||
|
|
9f64c9944a | ||
|
|
bb391d502c | ||
| b0100fde10 | |||
|
|
0a4e69d40c | ||
|
|
25401ac132 | ||
| 1381b140ab |
@@ -149,6 +149,7 @@ TPM 티켓 운영 규칙:
|
||||
- TPM은 합의된 변경을 이슈로 등록하고 우선순위(`P0/P1/P2`)를 지정한다.
|
||||
- PR 본문에는 TPM이 지정한 우선순위와 범위가 그대로 반영되어야 한다.
|
||||
- 우선순위 변경은 TPM 제안 + Main Agent 승인으로만 가능하다.
|
||||
- PM/TPM/Dev/Reviewer/Verifier/Runtime Verifier는 주요 의사결정 시점마다 PR 코멘트를 남겨 결정 근거를 추적 가능 상태로 유지한다.
|
||||
|
||||
브랜치 운영 규칙:
|
||||
- TPM은 각 티켓에 대해 `ticket temp branch -> program feature branch` PR 경로를 지정한다.
|
||||
|
||||
@@ -50,6 +50,7 @@ Updated: 2026-02-26
|
||||
- PR 본문에 `REQ-*`, `TASK-*`, `TEST-*` 매핑 표 존재
|
||||
- `src/core/risk_manager.py` 변경 없음
|
||||
- 주요 의사결정 체크포인트(DCP-01~04) 중 해당 단계 Main Agent 확인 기록 존재
|
||||
- 주요 의사결정(리뷰 지적/수정 합의/검증 승인)에 대한 에이전트 PR 코멘트 존재
|
||||
- 티켓 PR의 base가 `main`이 아닌 program feature branch인지 확인
|
||||
|
||||
자동 점검:
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
- Ticket-level development happens only on **ticket temp branches** cut from the program feature branch.
|
||||
- Ticket PR merges into program feature branch are allowed after verifier approval.
|
||||
- Until final user sign-off, `main` merge is prohibited.
|
||||
- 각 에이전트는 주요 의사결정(리뷰 지적, 수정 방향, 검증 승인)마다 PR 코멘트를 적극 작성해 의사결정 과정을 남긴다.
|
||||
|
||||
## Gitea CLI Formatting Troubleshooting
|
||||
|
||||
|
||||
52
src/analysis/backtest_cost_guard.py
Normal file
52
src/analysis/backtest_cost_guard.py
Normal file
@@ -0,0 +1,52 @@
|
||||
"""Backtest cost/slippage/failure validation guard."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
import math
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class BacktestCostModel:
|
||||
commission_bps: float | None = None
|
||||
slippage_bps_by_session: dict[str, float] | None = None
|
||||
failure_rate_by_session: dict[str, float] | None = None
|
||||
unfavorable_fill_required: bool = True
|
||||
|
||||
|
||||
def validate_backtest_cost_model(
|
||||
*,
|
||||
model: BacktestCostModel,
|
||||
required_sessions: list[str],
|
||||
) -> None:
|
||||
"""Raise ValueError when required cost assumptions are missing/invalid."""
|
||||
if (
|
||||
model.commission_bps is None
|
||||
or not math.isfinite(model.commission_bps)
|
||||
or model.commission_bps < 0
|
||||
):
|
||||
raise ValueError("commission_bps must be provided and >= 0")
|
||||
if not model.unfavorable_fill_required:
|
||||
raise ValueError("unfavorable_fill_required must be True")
|
||||
|
||||
slippage = model.slippage_bps_by_session or {}
|
||||
failure = model.failure_rate_by_session or {}
|
||||
|
||||
missing_slippage = [s for s in required_sessions if s not in slippage]
|
||||
if missing_slippage:
|
||||
raise ValueError(
|
||||
f"missing slippage_bps_by_session for sessions: {', '.join(missing_slippage)}"
|
||||
)
|
||||
|
||||
missing_failure = [s for s in required_sessions if s not in failure]
|
||||
if missing_failure:
|
||||
raise ValueError(
|
||||
f"missing failure_rate_by_session for sessions: {', '.join(missing_failure)}"
|
||||
)
|
||||
|
||||
for sess, bps in slippage.items():
|
||||
if not math.isfinite(bps) or bps < 0:
|
||||
raise ValueError(f"slippage bps must be >= 0 for session={sess}")
|
||||
for sess, rate in failure.items():
|
||||
if not math.isfinite(rate) or rate < 0 or rate > 1:
|
||||
raise ValueError(f"failure rate must be within [0,1] for session={sess}")
|
||||
111
src/analysis/triple_barrier.py
Normal file
111
src/analysis/triple_barrier.py
Normal file
@@ -0,0 +1,111 @@
|
||||
"""Triple barrier labeler utilities.
|
||||
|
||||
Implements first-touch labeling with upper/lower/time barriers.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal, Sequence
|
||||
|
||||
|
||||
TieBreakMode = Literal["stop_first", "take_first"]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TripleBarrierSpec:
|
||||
take_profit_pct: float
|
||||
stop_loss_pct: float
|
||||
max_holding_bars: int
|
||||
tie_break: TieBreakMode = "stop_first"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TripleBarrierLabel:
|
||||
label: int # +1 take-profit first, -1 stop-loss first, 0 timeout
|
||||
touched: Literal["take_profit", "stop_loss", "time"]
|
||||
touch_bar: int
|
||||
entry_price: float
|
||||
upper_barrier: float
|
||||
lower_barrier: float
|
||||
|
||||
|
||||
def label_with_triple_barrier(
|
||||
*,
|
||||
highs: Sequence[float],
|
||||
lows: Sequence[float],
|
||||
closes: Sequence[float],
|
||||
entry_index: int,
|
||||
side: int,
|
||||
spec: TripleBarrierSpec,
|
||||
) -> TripleBarrierLabel:
|
||||
"""Label one entry using triple-barrier first-touch rule.
|
||||
|
||||
Args:
|
||||
highs/lows/closes: OHLC components with identical length.
|
||||
entry_index: Entry bar index in the sequences.
|
||||
side: +1 for long, -1 for short.
|
||||
spec: Barrier specification.
|
||||
"""
|
||||
if side not in {1, -1}:
|
||||
raise ValueError("side must be +1 or -1")
|
||||
if len(highs) != len(lows) or len(highs) != len(closes):
|
||||
raise ValueError("highs, lows, closes lengths must match")
|
||||
if entry_index < 0 or entry_index >= len(closes):
|
||||
raise IndexError("entry_index out of range")
|
||||
if spec.max_holding_bars <= 0:
|
||||
raise ValueError("max_holding_bars must be positive")
|
||||
|
||||
entry_price = float(closes[entry_index])
|
||||
if entry_price <= 0:
|
||||
raise ValueError("entry price must be positive")
|
||||
|
||||
if side == 1:
|
||||
upper = entry_price * (1.0 + spec.take_profit_pct)
|
||||
lower = entry_price * (1.0 - spec.stop_loss_pct)
|
||||
else:
|
||||
# For short side, favorable move is down.
|
||||
upper = entry_price * (1.0 + spec.stop_loss_pct)
|
||||
lower = entry_price * (1.0 - spec.take_profit_pct)
|
||||
|
||||
last_index = min(len(closes) - 1, entry_index + spec.max_holding_bars)
|
||||
for idx in range(entry_index + 1, last_index + 1):
|
||||
h = float(highs[idx])
|
||||
l = float(lows[idx])
|
||||
|
||||
up_touch = h >= upper
|
||||
down_touch = l <= lower
|
||||
if not up_touch and not down_touch:
|
||||
continue
|
||||
|
||||
if up_touch and down_touch:
|
||||
if spec.tie_break == "stop_first":
|
||||
touched = "stop_loss"
|
||||
label = -1
|
||||
else:
|
||||
touched = "take_profit"
|
||||
label = 1
|
||||
elif up_touch:
|
||||
touched = "take_profit" if side == 1 else "stop_loss"
|
||||
label = 1 if side == 1 else -1
|
||||
else:
|
||||
touched = "stop_loss" if side == 1 else "take_profit"
|
||||
label = -1 if side == 1 else 1
|
||||
|
||||
return TripleBarrierLabel(
|
||||
label=label,
|
||||
touched=touched,
|
||||
touch_bar=idx,
|
||||
entry_price=entry_price,
|
||||
upper_barrier=upper,
|
||||
lower_barrier=lower,
|
||||
)
|
||||
|
||||
return TripleBarrierLabel(
|
||||
label=0,
|
||||
touched="time",
|
||||
touch_bar=last_index,
|
||||
entry_price=entry_price,
|
||||
upper_barrier=upper,
|
||||
lower_barrier=lower,
|
||||
)
|
||||
74
src/analysis/walk_forward_split.py
Normal file
74
src/analysis/walk_forward_split.py
Normal file
@@ -0,0 +1,74 @@
|
||||
"""Walk-forward splitter with purge/embargo controls."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class WalkForwardFold:
|
||||
train_indices: list[int]
|
||||
test_indices: list[int]
|
||||
|
||||
@property
|
||||
def train_size(self) -> int:
|
||||
return len(self.train_indices)
|
||||
|
||||
@property
|
||||
def test_size(self) -> int:
|
||||
return len(self.test_indices)
|
||||
|
||||
|
||||
def generate_walk_forward_splits(
|
||||
*,
|
||||
n_samples: int,
|
||||
train_size: int,
|
||||
test_size: int,
|
||||
step_size: int | None = None,
|
||||
purge_size: int = 0,
|
||||
embargo_size: int = 0,
|
||||
min_train_size: int = 1,
|
||||
) -> list[WalkForwardFold]:
|
||||
"""Generate chronological folds with purge/embargo leakage controls."""
|
||||
if n_samples <= 0:
|
||||
raise ValueError("n_samples must be positive")
|
||||
if train_size <= 0 or test_size <= 0:
|
||||
raise ValueError("train_size and test_size must be positive")
|
||||
if purge_size < 0 or embargo_size < 0:
|
||||
raise ValueError("purge_size and embargo_size must be >= 0")
|
||||
if min_train_size <= 0:
|
||||
raise ValueError("min_train_size must be positive")
|
||||
|
||||
step = step_size if step_size is not None else test_size
|
||||
if step <= 0:
|
||||
raise ValueError("step_size must be positive")
|
||||
|
||||
folds: list[WalkForwardFold] = []
|
||||
prev_test_end: int | None = None
|
||||
test_start = train_size + purge_size
|
||||
|
||||
while test_start + test_size <= n_samples:
|
||||
test_end = test_start + test_size - 1
|
||||
train_end = test_start - purge_size - 1
|
||||
if train_end < 0:
|
||||
break
|
||||
|
||||
train_start = max(0, train_end - train_size + 1)
|
||||
train_indices = list(range(train_start, train_end + 1))
|
||||
|
||||
if prev_test_end is not None and embargo_size > 0:
|
||||
emb_from = prev_test_end + 1
|
||||
emb_to = prev_test_end + embargo_size
|
||||
train_indices = [i for i in train_indices if i < emb_from or i > emb_to]
|
||||
|
||||
if len(train_indices) >= min_train_size:
|
||||
folds.append(
|
||||
WalkForwardFold(
|
||||
train_indices=train_indices,
|
||||
test_indices=list(range(test_start, test_end + 1)),
|
||||
)
|
||||
)
|
||||
prev_test_end = test_end
|
||||
test_start += step
|
||||
|
||||
return folds
|
||||
@@ -59,6 +59,7 @@ class Settings(BaseSettings):
|
||||
# KIS VTS overseas balance API returns errors for most accounts.
|
||||
# This value is used as a fallback when the balance API returns 0 in paper mode.
|
||||
PAPER_OVERSEAS_CASH: float = Field(default=50000.0, ge=0.0)
|
||||
USD_BUFFER_MIN: float = Field(default=1000.0, ge=0.0)
|
||||
|
||||
# Trading frequency mode (daily = batch API calls, realtime = per-stock calls)
|
||||
TRADE_MODE: str = Field(default="daily", pattern="^(daily|realtime)$")
|
||||
|
||||
@@ -98,3 +98,8 @@ class BlackoutOrderManager:
|
||||
def requeue(self, intent: QueuedOrderIntent) -> None:
|
||||
if len(self._queue) < self._max_queue_size:
|
||||
self._queue.append(intent)
|
||||
|
||||
def clear(self) -> int:
|
||||
count = len(self._queue)
|
||||
self._queue.clear()
|
||||
return count
|
||||
|
||||
280
src/main.py
280
src/main.py
@@ -429,6 +429,26 @@ def _determine_order_quantity(
|
||||
return quantity
|
||||
|
||||
|
||||
def _should_block_overseas_buy_for_fx_buffer(
|
||||
*,
|
||||
market: MarketInfo,
|
||||
action: str,
|
||||
total_cash: float,
|
||||
order_amount: float,
|
||||
settings: Settings | None,
|
||||
) -> tuple[bool, float, float]:
|
||||
if (
|
||||
market.is_domestic
|
||||
or not market.code.startswith("US")
|
||||
or action != "BUY"
|
||||
or settings is None
|
||||
):
|
||||
return False, total_cash - order_amount, 0.0
|
||||
remaining = total_cash - order_amount
|
||||
required = settings.USD_BUFFER_MIN
|
||||
return remaining < required, remaining, required
|
||||
|
||||
|
||||
async def build_overseas_symbol_universe(
|
||||
db_conn: Any,
|
||||
overseas_broker: OverseasBroker,
|
||||
@@ -636,6 +656,187 @@ async def process_blackout_recovery_orders(
|
||||
BLACKOUT_ORDER_MANAGER.requeue(intent)
|
||||
|
||||
|
||||
def _resolve_kill_switch_markets(
|
||||
*,
|
||||
settings: Settings | None,
|
||||
current_market: MarketInfo | None,
|
||||
) -> list[MarketInfo]:
|
||||
if settings is not None:
|
||||
markets: list[MarketInfo] = []
|
||||
seen: set[str] = set()
|
||||
for market_code in settings.enabled_market_list:
|
||||
market = MARKETS.get(market_code)
|
||||
if market is None or market.code in seen:
|
||||
continue
|
||||
markets.append(market)
|
||||
seen.add(market.code)
|
||||
if markets:
|
||||
return markets
|
||||
if current_market is not None:
|
||||
return [current_market]
|
||||
return []
|
||||
|
||||
|
||||
async def _cancel_pending_orders_for_kill_switch(
|
||||
*,
|
||||
broker: KISBroker,
|
||||
overseas_broker: OverseasBroker,
|
||||
markets: list[MarketInfo],
|
||||
) -> None:
|
||||
failures: list[str] = []
|
||||
domestic = [m for m in markets if m.is_domestic]
|
||||
overseas = [m for m in markets if not m.is_domestic]
|
||||
|
||||
if domestic:
|
||||
try:
|
||||
orders = await broker.get_domestic_pending_orders()
|
||||
except Exception as exc:
|
||||
logger.warning("KillSwitch: failed to fetch domestic pending orders: %s", exc)
|
||||
orders = []
|
||||
for order in orders:
|
||||
stock_code = str(order.get("pdno", ""))
|
||||
try:
|
||||
orgn_odno = order.get("orgn_odno", "")
|
||||
krx_fwdg_ord_orgno = order.get("ord_gno_brno", "")
|
||||
psbl_qty = int(order.get("psbl_qty", "0") or "0")
|
||||
if not stock_code or not orgn_odno or psbl_qty <= 0:
|
||||
continue
|
||||
cancel_result = await broker.cancel_domestic_order(
|
||||
stock_code=stock_code,
|
||||
orgn_odno=orgn_odno,
|
||||
krx_fwdg_ord_orgno=krx_fwdg_ord_orgno,
|
||||
qty=psbl_qty,
|
||||
)
|
||||
if cancel_result.get("rt_cd") != "0":
|
||||
failures.append(
|
||||
"domestic cancel failed for"
|
||||
f" {stock_code}: rt_cd={cancel_result.get('rt_cd')}"
|
||||
f" msg={cancel_result.get('msg1')}"
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("KillSwitch: domestic cancel failed: %s", exc)
|
||||
failures.append(f"domestic cancel exception for {stock_code}: {exc}")
|
||||
|
||||
us_exchanges = frozenset({"NASD", "NYSE", "AMEX"})
|
||||
exchange_codes: list[str] = []
|
||||
seen_us = False
|
||||
for market in overseas:
|
||||
exc_code = market.exchange_code
|
||||
if exc_code in us_exchanges:
|
||||
if not seen_us:
|
||||
exchange_codes.append("NASD")
|
||||
seen_us = True
|
||||
elif exc_code not in exchange_codes:
|
||||
exchange_codes.append(exc_code)
|
||||
|
||||
for exchange_code in exchange_codes:
|
||||
try:
|
||||
orders = await overseas_broker.get_overseas_pending_orders(exchange_code)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"KillSwitch: failed to fetch overseas pending orders for %s: %s",
|
||||
exchange_code,
|
||||
exc,
|
||||
)
|
||||
continue
|
||||
for order in orders:
|
||||
stock_code = str(order.get("pdno", ""))
|
||||
order_exchange = str(order.get("ovrs_excg_cd") or exchange_code)
|
||||
try:
|
||||
odno = order.get("odno", "")
|
||||
nccs_qty = int(order.get("nccs_qty", "0") or "0")
|
||||
if not stock_code or not odno or nccs_qty <= 0:
|
||||
continue
|
||||
cancel_result = await overseas_broker.cancel_overseas_order(
|
||||
exchange_code=order_exchange,
|
||||
stock_code=stock_code,
|
||||
odno=odno,
|
||||
qty=nccs_qty,
|
||||
)
|
||||
if cancel_result.get("rt_cd") != "0":
|
||||
failures.append(
|
||||
"overseas cancel failed for"
|
||||
f" {order_exchange}/{stock_code}: rt_cd={cancel_result.get('rt_cd')}"
|
||||
f" msg={cancel_result.get('msg1')}"
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("KillSwitch: overseas cancel failed: %s", exc)
|
||||
failures.append(
|
||||
f"overseas cancel exception for {order_exchange}/{stock_code}: {exc}"
|
||||
)
|
||||
|
||||
if failures:
|
||||
raise RuntimeError("; ".join(failures[:3]))
|
||||
|
||||
|
||||
async def _refresh_order_state_for_kill_switch(
|
||||
*,
|
||||
broker: KISBroker,
|
||||
overseas_broker: OverseasBroker,
|
||||
markets: list[MarketInfo],
|
||||
) -> None:
|
||||
seen_overseas: set[str] = set()
|
||||
for market in markets:
|
||||
try:
|
||||
if market.is_domestic:
|
||||
await broker.get_balance()
|
||||
elif market.exchange_code not in seen_overseas:
|
||||
seen_overseas.add(market.exchange_code)
|
||||
await overseas_broker.get_overseas_balance(market.exchange_code)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"KillSwitch: refresh state failed for %s/%s: %s",
|
||||
market.code,
|
||||
market.exchange_code,
|
||||
exc,
|
||||
)
|
||||
|
||||
|
||||
def _reduce_risk_for_kill_switch() -> None:
|
||||
dropped = BLACKOUT_ORDER_MANAGER.clear()
|
||||
logger.critical("KillSwitch: reduced queued order risk by clearing %d queued intents", dropped)
|
||||
|
||||
|
||||
async def _trigger_emergency_kill_switch(
|
||||
*,
|
||||
reason: str,
|
||||
broker: KISBroker,
|
||||
overseas_broker: OverseasBroker,
|
||||
telegram: TelegramClient,
|
||||
settings: Settings | None,
|
||||
current_market: MarketInfo | None,
|
||||
stock_code: str,
|
||||
pnl_pct: float,
|
||||
threshold: float,
|
||||
) -> Any:
|
||||
markets = _resolve_kill_switch_markets(settings=settings, current_market=current_market)
|
||||
return await KILL_SWITCH.trigger(
|
||||
reason=reason,
|
||||
cancel_pending_orders=lambda: _cancel_pending_orders_for_kill_switch(
|
||||
broker=broker,
|
||||
overseas_broker=overseas_broker,
|
||||
markets=markets,
|
||||
),
|
||||
refresh_order_state=lambda: _refresh_order_state_for_kill_switch(
|
||||
broker=broker,
|
||||
overseas_broker=overseas_broker,
|
||||
markets=markets,
|
||||
),
|
||||
reduce_risk=_reduce_risk_for_kill_switch,
|
||||
snapshot_state=lambda: logger.critical(
|
||||
"KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f",
|
||||
current_market.code if current_market else "UNKNOWN",
|
||||
stock_code,
|
||||
pnl_pct,
|
||||
threshold,
|
||||
),
|
||||
notify=lambda: telegram.notify_circuit_breaker(
|
||||
pnl_pct=pnl_pct,
|
||||
threshold=threshold,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
async def trading_cycle(
|
||||
broker: KISBroker,
|
||||
overseas_broker: OverseasBroker,
|
||||
@@ -1111,6 +1312,24 @@ async def trading_cycle(
|
||||
)
|
||||
return
|
||||
order_amount = current_price * quantity
|
||||
fx_blocked, remaining_cash, required_buffer = _should_block_overseas_buy_for_fx_buffer(
|
||||
market=market,
|
||||
action=decision.action,
|
||||
total_cash=total_cash,
|
||||
order_amount=order_amount,
|
||||
settings=settings,
|
||||
)
|
||||
if fx_blocked:
|
||||
logger.warning(
|
||||
"Skip BUY %s (%s): FX buffer guard (remaining=%.2f, required=%.2f, cash=%.2f, order=%.2f)",
|
||||
stock_code,
|
||||
market.name,
|
||||
remaining_cash,
|
||||
required_buffer,
|
||||
total_cash,
|
||||
order_amount,
|
||||
)
|
||||
return
|
||||
|
||||
# 4. Check BUY cooldown (set when a prior BUY failed due to insufficient balance)
|
||||
if decision.action == "BUY" and buy_cooldown is not None:
|
||||
@@ -1151,15 +1370,16 @@ async def trading_cycle(
|
||||
logger.warning("Fat finger notification failed: %s", notify_exc)
|
||||
raise # Re-raise to prevent trade
|
||||
except CircuitBreakerTripped as exc:
|
||||
ks_report = await KILL_SWITCH.trigger(
|
||||
ks_report = await _trigger_emergency_kill_switch(
|
||||
reason=f"circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}",
|
||||
snapshot_state=lambda: logger.critical(
|
||||
"KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f",
|
||||
market.code,
|
||||
stock_code,
|
||||
exc.pnl_pct,
|
||||
exc.threshold,
|
||||
),
|
||||
broker=broker,
|
||||
overseas_broker=overseas_broker,
|
||||
telegram=telegram,
|
||||
settings=settings,
|
||||
current_market=market,
|
||||
stock_code=stock_code,
|
||||
pnl_pct=exc.pnl_pct,
|
||||
threshold=exc.threshold,
|
||||
)
|
||||
if ks_report.errors:
|
||||
logger.critical(
|
||||
@@ -2178,6 +2398,24 @@ async def run_daily_session(
|
||||
)
|
||||
continue
|
||||
order_amount = stock_data["current_price"] * quantity
|
||||
fx_blocked, remaining_cash, required_buffer = _should_block_overseas_buy_for_fx_buffer(
|
||||
market=market,
|
||||
action=decision.action,
|
||||
total_cash=total_cash,
|
||||
order_amount=order_amount,
|
||||
settings=settings,
|
||||
)
|
||||
if fx_blocked:
|
||||
logger.warning(
|
||||
"Skip BUY %s (%s): FX buffer guard (remaining=%.2f, required=%.2f, cash=%.2f, order=%.2f)",
|
||||
stock_code,
|
||||
market.name,
|
||||
remaining_cash,
|
||||
required_buffer,
|
||||
total_cash,
|
||||
order_amount,
|
||||
)
|
||||
continue
|
||||
|
||||
# Check BUY cooldown (insufficient balance)
|
||||
if decision.action == "BUY":
|
||||
@@ -2218,26 +2456,18 @@ async def run_daily_session(
|
||||
logger.warning("Fat finger notification failed: %s", notify_exc)
|
||||
continue # Skip this order
|
||||
except CircuitBreakerTripped as exc:
|
||||
ks_report = await KILL_SWITCH.trigger(
|
||||
ks_report = await _trigger_emergency_kill_switch(
|
||||
reason=f"daily_circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}",
|
||||
snapshot_state=lambda: logger.critical(
|
||||
"Daily KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f",
|
||||
market.code,
|
||||
stock_code,
|
||||
exc.pnl_pct,
|
||||
exc.threshold,
|
||||
),
|
||||
broker=broker,
|
||||
overseas_broker=overseas_broker,
|
||||
telegram=telegram,
|
||||
settings=settings,
|
||||
current_market=market,
|
||||
stock_code=stock_code,
|
||||
pnl_pct=exc.pnl_pct,
|
||||
threshold=exc.threshold,
|
||||
)
|
||||
logger.critical("Circuit breaker tripped — stopping session")
|
||||
try:
|
||||
await telegram.notify_circuit_breaker(
|
||||
pnl_pct=exc.pnl_pct,
|
||||
threshold=exc.threshold,
|
||||
)
|
||||
except Exception as notify_exc:
|
||||
logger.warning(
|
||||
"Circuit breaker notification failed: %s", notify_exc
|
||||
)
|
||||
if ks_report.errors:
|
||||
logger.critical(
|
||||
"Daily KillSwitch step errors for %s/%s: %s",
|
||||
|
||||
83
tests/test_backtest_cost_guard.py
Normal file
83
tests/test_backtest_cost_guard.py
Normal file
@@ -0,0 +1,83 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from src.analysis.backtest_cost_guard import BacktestCostModel, validate_backtest_cost_model
|
||||
|
||||
|
||||
def test_valid_backtest_cost_model_passes() -> None:
|
||||
model = BacktestCostModel(
|
||||
commission_bps=5.0,
|
||||
slippage_bps_by_session={"KRX_REG": 10.0, "US_PRE": 50.0},
|
||||
failure_rate_by_session={"KRX_REG": 0.01, "US_PRE": 0.08},
|
||||
unfavorable_fill_required=True,
|
||||
)
|
||||
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG", "US_PRE"])
|
||||
|
||||
|
||||
def test_missing_required_slippage_session_raises() -> None:
|
||||
model = BacktestCostModel(
|
||||
commission_bps=5.0,
|
||||
slippage_bps_by_session={"KRX_REG": 10.0},
|
||||
failure_rate_by_session={"KRX_REG": 0.01, "US_PRE": 0.08},
|
||||
unfavorable_fill_required=True,
|
||||
)
|
||||
with pytest.raises(ValueError, match="missing slippage_bps_by_session.*US_PRE"):
|
||||
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG", "US_PRE"])
|
||||
|
||||
|
||||
def test_missing_required_failure_rate_session_raises() -> None:
|
||||
model = BacktestCostModel(
|
||||
commission_bps=5.0,
|
||||
slippage_bps_by_session={"KRX_REG": 10.0, "US_PRE": 50.0},
|
||||
failure_rate_by_session={"KRX_REG": 0.01},
|
||||
unfavorable_fill_required=True,
|
||||
)
|
||||
with pytest.raises(ValueError, match="missing failure_rate_by_session.*US_PRE"):
|
||||
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG", "US_PRE"])
|
||||
|
||||
|
||||
def test_invalid_failure_rate_range_raises() -> None:
|
||||
model = BacktestCostModel(
|
||||
commission_bps=5.0,
|
||||
slippage_bps_by_session={"KRX_REG": 10.0},
|
||||
failure_rate_by_session={"KRX_REG": 1.2},
|
||||
unfavorable_fill_required=True,
|
||||
)
|
||||
with pytest.raises(ValueError, match="failure rate must be within"):
|
||||
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG"])
|
||||
|
||||
|
||||
def test_unfavorable_fill_requirement_cannot_be_disabled() -> None:
|
||||
model = BacktestCostModel(
|
||||
commission_bps=5.0,
|
||||
slippage_bps_by_session={"KRX_REG": 10.0},
|
||||
failure_rate_by_session={"KRX_REG": 0.02},
|
||||
unfavorable_fill_required=False,
|
||||
)
|
||||
with pytest.raises(ValueError, match="unfavorable_fill_required must be True"):
|
||||
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG"])
|
||||
|
||||
|
||||
@pytest.mark.parametrize("bad_commission", [float("nan"), float("inf"), float("-inf")])
|
||||
def test_non_finite_commission_rejected(bad_commission: float) -> None:
|
||||
model = BacktestCostModel(
|
||||
commission_bps=bad_commission,
|
||||
slippage_bps_by_session={"KRX_REG": 10.0},
|
||||
failure_rate_by_session={"KRX_REG": 0.02},
|
||||
unfavorable_fill_required=True,
|
||||
)
|
||||
with pytest.raises(ValueError, match="commission_bps"):
|
||||
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG"])
|
||||
|
||||
|
||||
@pytest.mark.parametrize("bad_slippage", [float("nan"), float("inf"), float("-inf")])
|
||||
def test_non_finite_slippage_rejected(bad_slippage: float) -> None:
|
||||
model = BacktestCostModel(
|
||||
commission_bps=5.0,
|
||||
slippage_bps_by_session={"KRX_REG": bad_slippage},
|
||||
failure_rate_by_session={"KRX_REG": 0.02},
|
||||
unfavorable_fill_required=True,
|
||||
)
|
||||
with pytest.raises(ValueError, match="slippage bps"):
|
||||
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG"])
|
||||
@@ -15,6 +15,8 @@ from src.evolution.scorecard import DailyScorecard
|
||||
from src.logging.decision_logger import DecisionLogger
|
||||
from src.main import (
|
||||
KILL_SWITCH,
|
||||
_should_block_overseas_buy_for_fx_buffer,
|
||||
_trigger_emergency_kill_switch,
|
||||
_apply_dashboard_flag,
|
||||
_determine_order_quantity,
|
||||
_extract_avg_price_from_balance,
|
||||
@@ -3689,6 +3691,81 @@ class TestOverseasBrokerIntegration:
|
||||
# DB도 브로커도 보유 없음 → BUY 주문이 실행되어야 함 (회귀 테스트)
|
||||
overseas_broker.send_overseas_order.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_overseas_buy_blocked_by_usd_buffer_guard(self) -> None:
|
||||
"""Overseas BUY must be blocked when USD buffer would be breached."""
|
||||
db_conn = init_db(":memory:")
|
||||
|
||||
overseas_broker = MagicMock()
|
||||
overseas_broker.get_overseas_price = AsyncMock(
|
||||
return_value={"output": {"last": "182.50"}}
|
||||
)
|
||||
overseas_broker.get_overseas_balance = AsyncMock(
|
||||
return_value={
|
||||
"output1": [],
|
||||
"output2": [
|
||||
{
|
||||
"frcr_evlu_tota": "50000.00",
|
||||
"frcr_buy_amt_smtl": "0.00",
|
||||
}
|
||||
],
|
||||
}
|
||||
)
|
||||
overseas_broker.get_overseas_buying_power = AsyncMock(
|
||||
return_value={"output": {"ovrs_ord_psbl_amt": "50000.00"}}
|
||||
)
|
||||
overseas_broker.send_overseas_order = AsyncMock(return_value={"msg1": "주문접수"})
|
||||
|
||||
engine = MagicMock(spec=ScenarioEngine)
|
||||
engine.evaluate = MagicMock(return_value=_make_buy_match("AAPL"))
|
||||
|
||||
market = MagicMock()
|
||||
market.name = "NASDAQ"
|
||||
market.code = "US_NASDAQ"
|
||||
market.exchange_code = "NASD"
|
||||
market.is_domestic = False
|
||||
|
||||
telegram = MagicMock()
|
||||
telegram.notify_trade_execution = AsyncMock()
|
||||
telegram.notify_fat_finger = AsyncMock()
|
||||
telegram.notify_circuit_breaker = AsyncMock()
|
||||
telegram.notify_scenario_matched = AsyncMock()
|
||||
|
||||
decision_logger = MagicMock()
|
||||
decision_logger.log_decision = MagicMock(return_value="decision-id")
|
||||
|
||||
settings = MagicMock()
|
||||
settings.POSITION_SIZING_ENABLED = False
|
||||
settings.CONFIDENCE_THRESHOLD = 80
|
||||
settings.USD_BUFFER_MIN = 49900.0
|
||||
settings.MODE = "paper"
|
||||
settings.PAPER_OVERSEAS_CASH = 50000.0
|
||||
|
||||
await trading_cycle(
|
||||
broker=MagicMock(),
|
||||
overseas_broker=overseas_broker,
|
||||
scenario_engine=engine,
|
||||
playbook=_make_playbook(market="US"),
|
||||
risk=MagicMock(),
|
||||
db_conn=db_conn,
|
||||
decision_logger=decision_logger,
|
||||
context_store=MagicMock(
|
||||
get_latest_timeframe=MagicMock(return_value=None),
|
||||
set_context=MagicMock(),
|
||||
),
|
||||
criticality_assessor=MagicMock(
|
||||
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
|
||||
get_timeout=MagicMock(return_value=5.0),
|
||||
),
|
||||
telegram=telegram,
|
||||
market=market,
|
||||
stock_code="AAPL",
|
||||
scan_candidates={},
|
||||
settings=settings,
|
||||
)
|
||||
|
||||
overseas_broker.send_overseas_order.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _retry_connection — unit tests (issue #209)
|
||||
@@ -3722,7 +3799,6 @@ class TestRetryConnection:
|
||||
with patch("src.main.asyncio.sleep") as mock_sleep:
|
||||
mock_sleep.return_value = None
|
||||
result = await _retry_connection(flaky, label="flaky")
|
||||
|
||||
assert result == "ok"
|
||||
assert call_count == 2
|
||||
mock_sleep.assert_called_once()
|
||||
@@ -3777,6 +3853,48 @@ class TestRetryConnection:
|
||||
assert call_count == 1 # No retry for non-ConnectionError
|
||||
|
||||
|
||||
def test_fx_buffer_guard_applies_only_to_us_and_respects_boundary() -> None:
|
||||
settings = MagicMock()
|
||||
settings.USD_BUFFER_MIN = 1000.0
|
||||
|
||||
us_market = MagicMock()
|
||||
us_market.is_domestic = False
|
||||
us_market.code = "US_NASDAQ"
|
||||
|
||||
blocked, remaining, required = _should_block_overseas_buy_for_fx_buffer(
|
||||
market=us_market,
|
||||
action="BUY",
|
||||
total_cash=5000.0,
|
||||
order_amount=4001.0,
|
||||
settings=settings,
|
||||
)
|
||||
assert blocked
|
||||
assert remaining == 999.0
|
||||
assert required == 1000.0
|
||||
|
||||
blocked_eq, _, _ = _should_block_overseas_buy_for_fx_buffer(
|
||||
market=us_market,
|
||||
action="BUY",
|
||||
total_cash=5000.0,
|
||||
order_amount=4000.0,
|
||||
settings=settings,
|
||||
)
|
||||
assert not blocked_eq
|
||||
|
||||
jp_market = MagicMock()
|
||||
jp_market.is_domestic = False
|
||||
jp_market.code = "JP"
|
||||
blocked_jp, _, required_jp = _should_block_overseas_buy_for_fx_buffer(
|
||||
market=jp_market,
|
||||
action="BUY",
|
||||
total_cash=5000.0,
|
||||
order_amount=4500.0,
|
||||
settings=settings,
|
||||
)
|
||||
assert not blocked_jp
|
||||
assert required_jp == 0.0
|
||||
|
||||
|
||||
# run_daily_session — daily CB baseline (daily_start_eval) tests (issue #207)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -5349,3 +5467,118 @@ async def test_process_blackout_recovery_drops_policy_rejected_intent() -> None:
|
||||
|
||||
broker.send_order.assert_not_called()
|
||||
blackout_manager.requeue.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_trigger_emergency_kill_switch_executes_operational_steps() -> None:
|
||||
"""Emergency kill switch should execute cancel/refresh/reduce/notify callbacks."""
|
||||
broker = MagicMock()
|
||||
broker.get_domestic_pending_orders = AsyncMock(
|
||||
return_value=[
|
||||
{
|
||||
"pdno": "005930",
|
||||
"orgn_odno": "1",
|
||||
"ord_gno_brno": "01",
|
||||
"psbl_qty": "3",
|
||||
}
|
||||
]
|
||||
)
|
||||
broker.cancel_domestic_order = AsyncMock(return_value={"rt_cd": "0"})
|
||||
broker.get_balance = AsyncMock(return_value={"output1": [], "output2": []})
|
||||
|
||||
overseas_broker = MagicMock()
|
||||
overseas_broker.get_overseas_pending_orders = AsyncMock(return_value=[])
|
||||
overseas_broker.get_overseas_balance = AsyncMock(return_value={"output1": [], "output2": []})
|
||||
|
||||
telegram = MagicMock()
|
||||
telegram.notify_circuit_breaker = AsyncMock()
|
||||
|
||||
settings = MagicMock()
|
||||
settings.enabled_market_list = ["KR"]
|
||||
|
||||
market = MagicMock()
|
||||
market.code = "KR"
|
||||
market.exchange_code = "KRX"
|
||||
market.is_domestic = True
|
||||
|
||||
with (
|
||||
patch("src.main.MARKETS", {"KR": market}),
|
||||
patch("src.main.BLACKOUT_ORDER_MANAGER.clear", return_value=2),
|
||||
):
|
||||
report = await _trigger_emergency_kill_switch(
|
||||
reason="test",
|
||||
broker=broker,
|
||||
overseas_broker=overseas_broker,
|
||||
telegram=telegram,
|
||||
settings=settings,
|
||||
current_market=market,
|
||||
stock_code="005930",
|
||||
pnl_pct=-3.2,
|
||||
threshold=-3.0,
|
||||
)
|
||||
|
||||
assert report.steps == [
|
||||
"block_new_orders",
|
||||
"cancel_pending_orders",
|
||||
"refresh_order_state",
|
||||
"reduce_risk",
|
||||
"snapshot_state",
|
||||
"notify",
|
||||
]
|
||||
broker.cancel_domestic_order.assert_called_once()
|
||||
broker.get_balance.assert_called_once()
|
||||
telegram.notify_circuit_breaker.assert_called_once_with(
|
||||
pnl_pct=-3.2,
|
||||
threshold=-3.0,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_trigger_emergency_kill_switch_records_cancel_failure() -> None:
|
||||
"""Cancel API rejection should be captured in kill switch errors."""
|
||||
broker = MagicMock()
|
||||
broker.get_domestic_pending_orders = AsyncMock(
|
||||
return_value=[
|
||||
{
|
||||
"pdno": "005930",
|
||||
"orgn_odno": "1",
|
||||
"ord_gno_brno": "01",
|
||||
"psbl_qty": "3",
|
||||
}
|
||||
]
|
||||
)
|
||||
broker.cancel_domestic_order = AsyncMock(return_value={"rt_cd": "1", "msg1": "fail"})
|
||||
broker.get_balance = AsyncMock(return_value={"output1": [], "output2": []})
|
||||
|
||||
overseas_broker = MagicMock()
|
||||
overseas_broker.get_overseas_pending_orders = AsyncMock(return_value=[])
|
||||
overseas_broker.get_overseas_balance = AsyncMock(return_value={"output1": [], "output2": []})
|
||||
|
||||
telegram = MagicMock()
|
||||
telegram.notify_circuit_breaker = AsyncMock()
|
||||
|
||||
settings = MagicMock()
|
||||
settings.enabled_market_list = ["KR"]
|
||||
|
||||
market = MagicMock()
|
||||
market.code = "KR"
|
||||
market.exchange_code = "KRX"
|
||||
market.is_domestic = True
|
||||
|
||||
with (
|
||||
patch("src.main.MARKETS", {"KR": market}),
|
||||
patch("src.main.BLACKOUT_ORDER_MANAGER.clear", return_value=0),
|
||||
):
|
||||
report = await _trigger_emergency_kill_switch(
|
||||
reason="test-fail",
|
||||
broker=broker,
|
||||
overseas_broker=overseas_broker,
|
||||
telegram=telegram,
|
||||
settings=settings,
|
||||
current_market=market,
|
||||
stock_code="005930",
|
||||
pnl_pct=-3.2,
|
||||
threshold=-3.0,
|
||||
)
|
||||
|
||||
assert any(err.startswith("cancel_pending_orders:") for err in report.errors)
|
||||
|
||||
131
tests/test_triple_barrier.py
Normal file
131
tests/test_triple_barrier.py
Normal file
@@ -0,0 +1,131 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from src.analysis.triple_barrier import TripleBarrierSpec, label_with_triple_barrier
|
||||
|
||||
|
||||
def test_long_take_profit_first() -> None:
|
||||
highs = [100, 101, 103]
|
||||
lows = [100, 99.6, 100]
|
||||
closes = [100, 100, 102]
|
||||
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
|
||||
out = label_with_triple_barrier(
|
||||
highs=highs,
|
||||
lows=lows,
|
||||
closes=closes,
|
||||
entry_index=0,
|
||||
side=1,
|
||||
spec=spec,
|
||||
)
|
||||
assert out.label == 1
|
||||
assert out.touched == "take_profit"
|
||||
assert out.touch_bar == 2
|
||||
|
||||
|
||||
def test_long_stop_loss_first() -> None:
|
||||
highs = [100, 100.5, 101]
|
||||
lows = [100, 98.8, 99]
|
||||
closes = [100, 99.5, 100]
|
||||
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
|
||||
out = label_with_triple_barrier(
|
||||
highs=highs,
|
||||
lows=lows,
|
||||
closes=closes,
|
||||
entry_index=0,
|
||||
side=1,
|
||||
spec=spec,
|
||||
)
|
||||
assert out.label == -1
|
||||
assert out.touched == "stop_loss"
|
||||
assert out.touch_bar == 1
|
||||
|
||||
|
||||
def test_time_barrier_timeout() -> None:
|
||||
highs = [100, 100.8, 100.7]
|
||||
lows = [100, 99.3, 99.4]
|
||||
closes = [100, 100, 100]
|
||||
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.02, max_holding_bars=2)
|
||||
out = label_with_triple_barrier(
|
||||
highs=highs,
|
||||
lows=lows,
|
||||
closes=closes,
|
||||
entry_index=0,
|
||||
side=1,
|
||||
spec=spec,
|
||||
)
|
||||
assert out.label == 0
|
||||
assert out.touched == "time"
|
||||
assert out.touch_bar == 2
|
||||
|
||||
|
||||
def test_tie_break_stop_first_default() -> None:
|
||||
highs = [100, 102.1]
|
||||
lows = [100, 98.9]
|
||||
closes = [100, 100]
|
||||
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=1)
|
||||
out = label_with_triple_barrier(
|
||||
highs=highs,
|
||||
lows=lows,
|
||||
closes=closes,
|
||||
entry_index=0,
|
||||
side=1,
|
||||
spec=spec,
|
||||
)
|
||||
assert out.label == -1
|
||||
assert out.touched == "stop_loss"
|
||||
|
||||
|
||||
def test_short_side_inverts_barrier_semantics() -> None:
|
||||
highs = [100, 100.5, 101.2]
|
||||
lows = [100, 97.8, 98.0]
|
||||
closes = [100, 99, 99]
|
||||
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
|
||||
out = label_with_triple_barrier(
|
||||
highs=highs,
|
||||
lows=lows,
|
||||
closes=closes,
|
||||
entry_index=0,
|
||||
side=-1,
|
||||
spec=spec,
|
||||
)
|
||||
assert out.label == 1
|
||||
assert out.touched == "take_profit"
|
||||
|
||||
|
||||
def test_short_tie_break_modes() -> None:
|
||||
highs = [100, 101.1]
|
||||
lows = [100, 97.9]
|
||||
closes = [100, 100]
|
||||
|
||||
stop_first = TripleBarrierSpec(
|
||||
take_profit_pct=0.02,
|
||||
stop_loss_pct=0.01,
|
||||
max_holding_bars=1,
|
||||
tie_break="stop_first",
|
||||
)
|
||||
out_stop = label_with_triple_barrier(
|
||||
highs=highs,
|
||||
lows=lows,
|
||||
closes=closes,
|
||||
entry_index=0,
|
||||
side=-1,
|
||||
spec=stop_first,
|
||||
)
|
||||
assert out_stop.label == -1
|
||||
assert out_stop.touched == "stop_loss"
|
||||
|
||||
take_first = TripleBarrierSpec(
|
||||
take_profit_pct=0.02,
|
||||
stop_loss_pct=0.01,
|
||||
max_holding_bars=1,
|
||||
tie_break="take_first",
|
||||
)
|
||||
out_take = label_with_triple_barrier(
|
||||
highs=highs,
|
||||
lows=lows,
|
||||
closes=closes,
|
||||
entry_index=0,
|
||||
side=-1,
|
||||
spec=take_first,
|
||||
)
|
||||
assert out_take.label == 1
|
||||
assert out_take.touched == "take_profit"
|
||||
92
tests/test_walk_forward_split.py
Normal file
92
tests/test_walk_forward_split.py
Normal file
@@ -0,0 +1,92 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from src.analysis.walk_forward_split import generate_walk_forward_splits
|
||||
|
||||
|
||||
def test_generates_sequential_folds() -> None:
|
||||
folds = generate_walk_forward_splits(
|
||||
n_samples=30,
|
||||
train_size=10,
|
||||
test_size=5,
|
||||
)
|
||||
assert len(folds) == 4
|
||||
assert folds[0].train_indices == list(range(0, 10))
|
||||
assert folds[0].test_indices == list(range(10, 15))
|
||||
assert folds[1].train_indices == list(range(5, 15))
|
||||
assert folds[1].test_indices == list(range(15, 20))
|
||||
|
||||
|
||||
def test_purge_removes_boundary_samples_before_test() -> None:
|
||||
folds = generate_walk_forward_splits(
|
||||
n_samples=25,
|
||||
train_size=8,
|
||||
test_size=4,
|
||||
purge_size=2,
|
||||
)
|
||||
first = folds[0]
|
||||
# test starts at 10, purge=2 => train end must be 7
|
||||
assert first.train_indices == list(range(0, 8))
|
||||
assert first.test_indices == list(range(10, 14))
|
||||
|
||||
|
||||
def test_embargo_excludes_post_test_samples_from_next_train() -> None:
|
||||
folds = generate_walk_forward_splits(
|
||||
n_samples=45,
|
||||
train_size=15,
|
||||
test_size=5,
|
||||
step_size=10,
|
||||
embargo_size=3,
|
||||
)
|
||||
assert len(folds) >= 2
|
||||
# Fold1 test: 15..19, next fold train window: 10..24.
|
||||
# embargo_size=3 should remove 20,21,22 from fold2 train.
|
||||
second_train = folds[1].train_indices
|
||||
assert 20 not in second_train
|
||||
assert 21 not in second_train
|
||||
assert 22 not in second_train
|
||||
assert 23 in second_train
|
||||
|
||||
|
||||
def test_respects_min_train_size_and_returns_empty_when_impossible() -> None:
|
||||
folds = generate_walk_forward_splits(
|
||||
n_samples=15,
|
||||
train_size=5,
|
||||
test_size=5,
|
||||
min_train_size=6,
|
||||
)
|
||||
assert folds == []
|
||||
|
||||
|
||||
def test_embargo_uses_last_accepted_fold_when_intermediate_fold_skips() -> None:
|
||||
folds = generate_walk_forward_splits(
|
||||
n_samples=30,
|
||||
train_size=5,
|
||||
test_size=3,
|
||||
step_size=5,
|
||||
embargo_size=1,
|
||||
min_train_size=5,
|
||||
)
|
||||
# 1st fold accepted, 2nd skipped by min_train_size, subsequent folds still generated.
|
||||
assert len(folds) == 3
|
||||
assert folds[0].test_indices == [5, 6, 7]
|
||||
assert folds[1].test_indices == [15, 16, 17]
|
||||
assert folds[2].test_indices == [25, 26, 27]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("n_samples", "train_size", "test_size"),
|
||||
[
|
||||
(0, 10, 2),
|
||||
(10, 0, 2),
|
||||
(10, 5, 0),
|
||||
],
|
||||
)
|
||||
def test_invalid_args_raise(n_samples: int, train_size: int, test_size: int) -> None:
|
||||
with pytest.raises(ValueError):
|
||||
generate_walk_forward_splits(
|
||||
n_samples=n_samples,
|
||||
train_size=train_size,
|
||||
test_size=test_size,
|
||||
)
|
||||
Reference in New Issue
Block a user