Compare commits

...

10 Commits

Author SHA1 Message Date
cfd5351b58 Merge pull request '[FX-ACCOUNTING] TKT-P1-001 USD/KRW 버퍼 진입 제한' (#288) from feature/issue-tkt-p1-001-fx-buffer-guard into feature/v3-session-policy-stream 2026-02-27 00:53:21 +09:00
agentson
b206c23fc9 fix: scope USD buffer guard to US markets and add boundary tests 2026-02-27 00:52:44 +09:00
agentson
4d9f3e2cfc feat: enforce overseas buy guard with USD buffer threshold (TASK-V3-014) 2026-02-27 00:50:12 +09:00
a93a5c616b Merge pull request '[BACKTEST-MODEL] TKT-P1-003 Triple Barrier 라벨러 구현' (#286) from feature/issue-tkt-p1-003-triple-barrier-labeler into feature/v3-session-policy-stream 2026-02-27 00:47:37 +09:00
agentson
9f64c9944a fix: correct short-side tie-break semantics in triple barrier 2026-02-27 00:47:09 +09:00
agentson
bb391d502c feat: add triple barrier labeler with first-touch logic (TASK-CODE-004) 2026-02-27 00:45:18 +09:00
b0100fde10 Merge pull request '[RISK-EMERGENCY][SCN-FAIL-003] TKT-P0-002 Kill Switch 순서 강제 검증 자동화' (#284) from feature/issue-tkt-p0-002-killswitch-ordering into feature/v3-session-policy-stream 2026-02-27 00:42:16 +09:00
agentson
0a4e69d40c fix: record kill switch cancel failures and add failure-path tests 2026-02-27 00:41:13 +09:00
agentson
25401ac132 feat: enforce operational kill switch callbacks in runtime flow (TASK-CODE-003) 2026-02-27 00:38:26 +09:00
1381b140ab Merge pull request '[EXEC-POLICY][SCN-FAIL-001] TKT-P0-001 블랙아웃 차단/큐/복구 재검증' (#282) from feature/issue-tkt-p0-001-blackout-queue-revalidate into feature/v3-session-policy-stream 2026-02-27 00:32:59 +09:00
6 changed files with 737 additions and 26 deletions

View File

@@ -0,0 +1,111 @@
"""Triple barrier labeler utilities.
Implements first-touch labeling with upper/lower/time barriers.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal, Sequence
TieBreakMode = Literal["stop_first", "take_first"]
@dataclass(frozen=True)
class TripleBarrierSpec:
take_profit_pct: float
stop_loss_pct: float
max_holding_bars: int
tie_break: TieBreakMode = "stop_first"
@dataclass(frozen=True)
class TripleBarrierLabel:
label: int # +1 take-profit first, -1 stop-loss first, 0 timeout
touched: Literal["take_profit", "stop_loss", "time"]
touch_bar: int
entry_price: float
upper_barrier: float
lower_barrier: float
def label_with_triple_barrier(
*,
highs: Sequence[float],
lows: Sequence[float],
closes: Sequence[float],
entry_index: int,
side: int,
spec: TripleBarrierSpec,
) -> TripleBarrierLabel:
"""Label one entry using triple-barrier first-touch rule.
Args:
highs/lows/closes: OHLC components with identical length.
entry_index: Entry bar index in the sequences.
side: +1 for long, -1 for short.
spec: Barrier specification.
"""
if side not in {1, -1}:
raise ValueError("side must be +1 or -1")
if len(highs) != len(lows) or len(highs) != len(closes):
raise ValueError("highs, lows, closes lengths must match")
if entry_index < 0 or entry_index >= len(closes):
raise IndexError("entry_index out of range")
if spec.max_holding_bars <= 0:
raise ValueError("max_holding_bars must be positive")
entry_price = float(closes[entry_index])
if entry_price <= 0:
raise ValueError("entry price must be positive")
if side == 1:
upper = entry_price * (1.0 + spec.take_profit_pct)
lower = entry_price * (1.0 - spec.stop_loss_pct)
else:
# For short side, favorable move is down.
upper = entry_price * (1.0 + spec.stop_loss_pct)
lower = entry_price * (1.0 - spec.take_profit_pct)
last_index = min(len(closes) - 1, entry_index + spec.max_holding_bars)
for idx in range(entry_index + 1, last_index + 1):
h = float(highs[idx])
l = float(lows[idx])
up_touch = h >= upper
down_touch = l <= lower
if not up_touch and not down_touch:
continue
if up_touch and down_touch:
if spec.tie_break == "stop_first":
touched = "stop_loss"
label = -1
else:
touched = "take_profit"
label = 1
elif up_touch:
touched = "take_profit" if side == 1 else "stop_loss"
label = 1 if side == 1 else -1
else:
touched = "stop_loss" if side == 1 else "take_profit"
label = -1 if side == 1 else 1
return TripleBarrierLabel(
label=label,
touched=touched,
touch_bar=idx,
entry_price=entry_price,
upper_barrier=upper,
lower_barrier=lower,
)
return TripleBarrierLabel(
label=0,
touched="time",
touch_bar=last_index,
entry_price=entry_price,
upper_barrier=upper,
lower_barrier=lower,
)

View File

@@ -59,6 +59,7 @@ class Settings(BaseSettings):
# KIS VTS overseas balance API returns errors for most accounts.
# This value is used as a fallback when the balance API returns 0 in paper mode.
PAPER_OVERSEAS_CASH: float = Field(default=50000.0, ge=0.0)
USD_BUFFER_MIN: float = Field(default=1000.0, ge=0.0)
# Trading frequency mode (daily = batch API calls, realtime = per-stock calls)
TRADE_MODE: str = Field(default="daily", pattern="^(daily|realtime)$")

View File

@@ -98,3 +98,8 @@ class BlackoutOrderManager:
def requeue(self, intent: QueuedOrderIntent) -> None:
if len(self._queue) < self._max_queue_size:
self._queue.append(intent)
def clear(self) -> int:
count = len(self._queue)
self._queue.clear()
return count

View File

@@ -429,6 +429,26 @@ def _determine_order_quantity(
return quantity
def _should_block_overseas_buy_for_fx_buffer(
*,
market: MarketInfo,
action: str,
total_cash: float,
order_amount: float,
settings: Settings | None,
) -> tuple[bool, float, float]:
if (
market.is_domestic
or not market.code.startswith("US")
or action != "BUY"
or settings is None
):
return False, total_cash - order_amount, 0.0
remaining = total_cash - order_amount
required = settings.USD_BUFFER_MIN
return remaining < required, remaining, required
async def build_overseas_symbol_universe(
db_conn: Any,
overseas_broker: OverseasBroker,
@@ -636,6 +656,187 @@ async def process_blackout_recovery_orders(
BLACKOUT_ORDER_MANAGER.requeue(intent)
def _resolve_kill_switch_markets(
*,
settings: Settings | None,
current_market: MarketInfo | None,
) -> list[MarketInfo]:
if settings is not None:
markets: list[MarketInfo] = []
seen: set[str] = set()
for market_code in settings.enabled_market_list:
market = MARKETS.get(market_code)
if market is None or market.code in seen:
continue
markets.append(market)
seen.add(market.code)
if markets:
return markets
if current_market is not None:
return [current_market]
return []
async def _cancel_pending_orders_for_kill_switch(
*,
broker: KISBroker,
overseas_broker: OverseasBroker,
markets: list[MarketInfo],
) -> None:
failures: list[str] = []
domestic = [m for m in markets if m.is_domestic]
overseas = [m for m in markets if not m.is_domestic]
if domestic:
try:
orders = await broker.get_domestic_pending_orders()
except Exception as exc:
logger.warning("KillSwitch: failed to fetch domestic pending orders: %s", exc)
orders = []
for order in orders:
stock_code = str(order.get("pdno", ""))
try:
orgn_odno = order.get("orgn_odno", "")
krx_fwdg_ord_orgno = order.get("ord_gno_brno", "")
psbl_qty = int(order.get("psbl_qty", "0") or "0")
if not stock_code or not orgn_odno or psbl_qty <= 0:
continue
cancel_result = await broker.cancel_domestic_order(
stock_code=stock_code,
orgn_odno=orgn_odno,
krx_fwdg_ord_orgno=krx_fwdg_ord_orgno,
qty=psbl_qty,
)
if cancel_result.get("rt_cd") != "0":
failures.append(
"domestic cancel failed for"
f" {stock_code}: rt_cd={cancel_result.get('rt_cd')}"
f" msg={cancel_result.get('msg1')}"
)
except Exception as exc:
logger.warning("KillSwitch: domestic cancel failed: %s", exc)
failures.append(f"domestic cancel exception for {stock_code}: {exc}")
us_exchanges = frozenset({"NASD", "NYSE", "AMEX"})
exchange_codes: list[str] = []
seen_us = False
for market in overseas:
exc_code = market.exchange_code
if exc_code in us_exchanges:
if not seen_us:
exchange_codes.append("NASD")
seen_us = True
elif exc_code not in exchange_codes:
exchange_codes.append(exc_code)
for exchange_code in exchange_codes:
try:
orders = await overseas_broker.get_overseas_pending_orders(exchange_code)
except Exception as exc:
logger.warning(
"KillSwitch: failed to fetch overseas pending orders for %s: %s",
exchange_code,
exc,
)
continue
for order in orders:
stock_code = str(order.get("pdno", ""))
order_exchange = str(order.get("ovrs_excg_cd") or exchange_code)
try:
odno = order.get("odno", "")
nccs_qty = int(order.get("nccs_qty", "0") or "0")
if not stock_code or not odno or nccs_qty <= 0:
continue
cancel_result = await overseas_broker.cancel_overseas_order(
exchange_code=order_exchange,
stock_code=stock_code,
odno=odno,
qty=nccs_qty,
)
if cancel_result.get("rt_cd") != "0":
failures.append(
"overseas cancel failed for"
f" {order_exchange}/{stock_code}: rt_cd={cancel_result.get('rt_cd')}"
f" msg={cancel_result.get('msg1')}"
)
except Exception as exc:
logger.warning("KillSwitch: overseas cancel failed: %s", exc)
failures.append(
f"overseas cancel exception for {order_exchange}/{stock_code}: {exc}"
)
if failures:
raise RuntimeError("; ".join(failures[:3]))
async def _refresh_order_state_for_kill_switch(
*,
broker: KISBroker,
overseas_broker: OverseasBroker,
markets: list[MarketInfo],
) -> None:
seen_overseas: set[str] = set()
for market in markets:
try:
if market.is_domestic:
await broker.get_balance()
elif market.exchange_code not in seen_overseas:
seen_overseas.add(market.exchange_code)
await overseas_broker.get_overseas_balance(market.exchange_code)
except Exception as exc:
logger.warning(
"KillSwitch: refresh state failed for %s/%s: %s",
market.code,
market.exchange_code,
exc,
)
def _reduce_risk_for_kill_switch() -> None:
dropped = BLACKOUT_ORDER_MANAGER.clear()
logger.critical("KillSwitch: reduced queued order risk by clearing %d queued intents", dropped)
async def _trigger_emergency_kill_switch(
*,
reason: str,
broker: KISBroker,
overseas_broker: OverseasBroker,
telegram: TelegramClient,
settings: Settings | None,
current_market: MarketInfo | None,
stock_code: str,
pnl_pct: float,
threshold: float,
) -> Any:
markets = _resolve_kill_switch_markets(settings=settings, current_market=current_market)
return await KILL_SWITCH.trigger(
reason=reason,
cancel_pending_orders=lambda: _cancel_pending_orders_for_kill_switch(
broker=broker,
overseas_broker=overseas_broker,
markets=markets,
),
refresh_order_state=lambda: _refresh_order_state_for_kill_switch(
broker=broker,
overseas_broker=overseas_broker,
markets=markets,
),
reduce_risk=_reduce_risk_for_kill_switch,
snapshot_state=lambda: logger.critical(
"KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f",
current_market.code if current_market else "UNKNOWN",
stock_code,
pnl_pct,
threshold,
),
notify=lambda: telegram.notify_circuit_breaker(
pnl_pct=pnl_pct,
threshold=threshold,
),
)
async def trading_cycle(
broker: KISBroker,
overseas_broker: OverseasBroker,
@@ -1111,6 +1312,24 @@ async def trading_cycle(
)
return
order_amount = current_price * quantity
fx_blocked, remaining_cash, required_buffer = _should_block_overseas_buy_for_fx_buffer(
market=market,
action=decision.action,
total_cash=total_cash,
order_amount=order_amount,
settings=settings,
)
if fx_blocked:
logger.warning(
"Skip BUY %s (%s): FX buffer guard (remaining=%.2f, required=%.2f, cash=%.2f, order=%.2f)",
stock_code,
market.name,
remaining_cash,
required_buffer,
total_cash,
order_amount,
)
return
# 4. Check BUY cooldown (set when a prior BUY failed due to insufficient balance)
if decision.action == "BUY" and buy_cooldown is not None:
@@ -1151,15 +1370,16 @@ async def trading_cycle(
logger.warning("Fat finger notification failed: %s", notify_exc)
raise # Re-raise to prevent trade
except CircuitBreakerTripped as exc:
ks_report = await KILL_SWITCH.trigger(
ks_report = await _trigger_emergency_kill_switch(
reason=f"circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}",
snapshot_state=lambda: logger.critical(
"KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f",
market.code,
stock_code,
exc.pnl_pct,
exc.threshold,
),
broker=broker,
overseas_broker=overseas_broker,
telegram=telegram,
settings=settings,
current_market=market,
stock_code=stock_code,
pnl_pct=exc.pnl_pct,
threshold=exc.threshold,
)
if ks_report.errors:
logger.critical(
@@ -2178,6 +2398,24 @@ async def run_daily_session(
)
continue
order_amount = stock_data["current_price"] * quantity
fx_blocked, remaining_cash, required_buffer = _should_block_overseas_buy_for_fx_buffer(
market=market,
action=decision.action,
total_cash=total_cash,
order_amount=order_amount,
settings=settings,
)
if fx_blocked:
logger.warning(
"Skip BUY %s (%s): FX buffer guard (remaining=%.2f, required=%.2f, cash=%.2f, order=%.2f)",
stock_code,
market.name,
remaining_cash,
required_buffer,
total_cash,
order_amount,
)
continue
# Check BUY cooldown (insufficient balance)
if decision.action == "BUY":
@@ -2218,26 +2456,18 @@ async def run_daily_session(
logger.warning("Fat finger notification failed: %s", notify_exc)
continue # Skip this order
except CircuitBreakerTripped as exc:
ks_report = await KILL_SWITCH.trigger(
ks_report = await _trigger_emergency_kill_switch(
reason=f"daily_circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}",
snapshot_state=lambda: logger.critical(
"Daily KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f",
market.code,
stock_code,
exc.pnl_pct,
exc.threshold,
),
broker=broker,
overseas_broker=overseas_broker,
telegram=telegram,
settings=settings,
current_market=market,
stock_code=stock_code,
pnl_pct=exc.pnl_pct,
threshold=exc.threshold,
)
logger.critical("Circuit breaker tripped — stopping session")
try:
await telegram.notify_circuit_breaker(
pnl_pct=exc.pnl_pct,
threshold=exc.threshold,
)
except Exception as notify_exc:
logger.warning(
"Circuit breaker notification failed: %s", notify_exc
)
if ks_report.errors:
logger.critical(
"Daily KillSwitch step errors for %s/%s: %s",

View File

@@ -15,6 +15,8 @@ from src.evolution.scorecard import DailyScorecard
from src.logging.decision_logger import DecisionLogger
from src.main import (
KILL_SWITCH,
_should_block_overseas_buy_for_fx_buffer,
_trigger_emergency_kill_switch,
_apply_dashboard_flag,
_determine_order_quantity,
_extract_avg_price_from_balance,
@@ -3689,6 +3691,81 @@ class TestOverseasBrokerIntegration:
# DB도 브로커도 보유 없음 → BUY 주문이 실행되어야 함 (회귀 테스트)
overseas_broker.send_overseas_order.assert_called_once()
@pytest.mark.asyncio
async def test_overseas_buy_blocked_by_usd_buffer_guard(self) -> None:
"""Overseas BUY must be blocked when USD buffer would be breached."""
db_conn = init_db(":memory:")
overseas_broker = MagicMock()
overseas_broker.get_overseas_price = AsyncMock(
return_value={"output": {"last": "182.50"}}
)
overseas_broker.get_overseas_balance = AsyncMock(
return_value={
"output1": [],
"output2": [
{
"frcr_evlu_tota": "50000.00",
"frcr_buy_amt_smtl": "0.00",
}
],
}
)
overseas_broker.get_overseas_buying_power = AsyncMock(
return_value={"output": {"ovrs_ord_psbl_amt": "50000.00"}}
)
overseas_broker.send_overseas_order = AsyncMock(return_value={"msg1": "주문접수"})
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_buy_match("AAPL"))
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NASD"
market.is_domestic = False
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
decision_logger = MagicMock()
decision_logger.log_decision = MagicMock(return_value="decision-id")
settings = MagicMock()
settings.POSITION_SIZING_ENABLED = False
settings.CONFIDENCE_THRESHOLD = 80
settings.USD_BUFFER_MIN = 49900.0
settings.MODE = "paper"
settings.PAPER_OVERSEAS_CASH = 50000.0
await trading_cycle(
broker=MagicMock(),
overseas_broker=overseas_broker,
scenario_engine=engine,
playbook=_make_playbook(market="US"),
risk=MagicMock(),
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(
get_latest_timeframe=MagicMock(return_value=None),
set_context=MagicMock(),
),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=telegram,
market=market,
stock_code="AAPL",
scan_candidates={},
settings=settings,
)
overseas_broker.send_overseas_order.assert_not_called()
# ---------------------------------------------------------------------------
# _retry_connection — unit tests (issue #209)
@@ -3722,7 +3799,6 @@ class TestRetryConnection:
with patch("src.main.asyncio.sleep") as mock_sleep:
mock_sleep.return_value = None
result = await _retry_connection(flaky, label="flaky")
assert result == "ok"
assert call_count == 2
mock_sleep.assert_called_once()
@@ -3777,6 +3853,48 @@ class TestRetryConnection:
assert call_count == 1 # No retry for non-ConnectionError
def test_fx_buffer_guard_applies_only_to_us_and_respects_boundary() -> None:
settings = MagicMock()
settings.USD_BUFFER_MIN = 1000.0
us_market = MagicMock()
us_market.is_domestic = False
us_market.code = "US_NASDAQ"
blocked, remaining, required = _should_block_overseas_buy_for_fx_buffer(
market=us_market,
action="BUY",
total_cash=5000.0,
order_amount=4001.0,
settings=settings,
)
assert blocked
assert remaining == 999.0
assert required == 1000.0
blocked_eq, _, _ = _should_block_overseas_buy_for_fx_buffer(
market=us_market,
action="BUY",
total_cash=5000.0,
order_amount=4000.0,
settings=settings,
)
assert not blocked_eq
jp_market = MagicMock()
jp_market.is_domestic = False
jp_market.code = "JP"
blocked_jp, _, required_jp = _should_block_overseas_buy_for_fx_buffer(
market=jp_market,
action="BUY",
total_cash=5000.0,
order_amount=4500.0,
settings=settings,
)
assert not blocked_jp
assert required_jp == 0.0
# run_daily_session — daily CB baseline (daily_start_eval) tests (issue #207)
# ---------------------------------------------------------------------------
@@ -5349,3 +5467,118 @@ async def test_process_blackout_recovery_drops_policy_rejected_intent() -> None:
broker.send_order.assert_not_called()
blackout_manager.requeue.assert_not_called()
@pytest.mark.asyncio
async def test_trigger_emergency_kill_switch_executes_operational_steps() -> None:
"""Emergency kill switch should execute cancel/refresh/reduce/notify callbacks."""
broker = MagicMock()
broker.get_domestic_pending_orders = AsyncMock(
return_value=[
{
"pdno": "005930",
"orgn_odno": "1",
"ord_gno_brno": "01",
"psbl_qty": "3",
}
]
)
broker.cancel_domestic_order = AsyncMock(return_value={"rt_cd": "0"})
broker.get_balance = AsyncMock(return_value={"output1": [], "output2": []})
overseas_broker = MagicMock()
overseas_broker.get_overseas_pending_orders = AsyncMock(return_value=[])
overseas_broker.get_overseas_balance = AsyncMock(return_value={"output1": [], "output2": []})
telegram = MagicMock()
telegram.notify_circuit_breaker = AsyncMock()
settings = MagicMock()
settings.enabled_market_list = ["KR"]
market = MagicMock()
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
with (
patch("src.main.MARKETS", {"KR": market}),
patch("src.main.BLACKOUT_ORDER_MANAGER.clear", return_value=2),
):
report = await _trigger_emergency_kill_switch(
reason="test",
broker=broker,
overseas_broker=overseas_broker,
telegram=telegram,
settings=settings,
current_market=market,
stock_code="005930",
pnl_pct=-3.2,
threshold=-3.0,
)
assert report.steps == [
"block_new_orders",
"cancel_pending_orders",
"refresh_order_state",
"reduce_risk",
"snapshot_state",
"notify",
]
broker.cancel_domestic_order.assert_called_once()
broker.get_balance.assert_called_once()
telegram.notify_circuit_breaker.assert_called_once_with(
pnl_pct=-3.2,
threshold=-3.0,
)
@pytest.mark.asyncio
async def test_trigger_emergency_kill_switch_records_cancel_failure() -> None:
"""Cancel API rejection should be captured in kill switch errors."""
broker = MagicMock()
broker.get_domestic_pending_orders = AsyncMock(
return_value=[
{
"pdno": "005930",
"orgn_odno": "1",
"ord_gno_brno": "01",
"psbl_qty": "3",
}
]
)
broker.cancel_domestic_order = AsyncMock(return_value={"rt_cd": "1", "msg1": "fail"})
broker.get_balance = AsyncMock(return_value={"output1": [], "output2": []})
overseas_broker = MagicMock()
overseas_broker.get_overseas_pending_orders = AsyncMock(return_value=[])
overseas_broker.get_overseas_balance = AsyncMock(return_value={"output1": [], "output2": []})
telegram = MagicMock()
telegram.notify_circuit_breaker = AsyncMock()
settings = MagicMock()
settings.enabled_market_list = ["KR"]
market = MagicMock()
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
with (
patch("src.main.MARKETS", {"KR": market}),
patch("src.main.BLACKOUT_ORDER_MANAGER.clear", return_value=0),
):
report = await _trigger_emergency_kill_switch(
reason="test-fail",
broker=broker,
overseas_broker=overseas_broker,
telegram=telegram,
settings=settings,
current_market=market,
stock_code="005930",
pnl_pct=-3.2,
threshold=-3.0,
)
assert any(err.startswith("cancel_pending_orders:") for err in report.errors)

View File

@@ -0,0 +1,131 @@
from __future__ import annotations
from src.analysis.triple_barrier import TripleBarrierSpec, label_with_triple_barrier
def test_long_take_profit_first() -> None:
highs = [100, 101, 103]
lows = [100, 99.6, 100]
closes = [100, 100, 102]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == 1
assert out.touched == "take_profit"
assert out.touch_bar == 2
def test_long_stop_loss_first() -> None:
highs = [100, 100.5, 101]
lows = [100, 98.8, 99]
closes = [100, 99.5, 100]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == -1
assert out.touched == "stop_loss"
assert out.touch_bar == 1
def test_time_barrier_timeout() -> None:
highs = [100, 100.8, 100.7]
lows = [100, 99.3, 99.4]
closes = [100, 100, 100]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.02, max_holding_bars=2)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == 0
assert out.touched == "time"
assert out.touch_bar == 2
def test_tie_break_stop_first_default() -> None:
highs = [100, 102.1]
lows = [100, 98.9]
closes = [100, 100]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=1)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == -1
assert out.touched == "stop_loss"
def test_short_side_inverts_barrier_semantics() -> None:
highs = [100, 100.5, 101.2]
lows = [100, 97.8, 98.0]
closes = [100, 99, 99]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=-1,
spec=spec,
)
assert out.label == 1
assert out.touched == "take_profit"
def test_short_tie_break_modes() -> None:
highs = [100, 101.1]
lows = [100, 97.9]
closes = [100, 100]
stop_first = TripleBarrierSpec(
take_profit_pct=0.02,
stop_loss_pct=0.01,
max_holding_bars=1,
tie_break="stop_first",
)
out_stop = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=-1,
spec=stop_first,
)
assert out_stop.label == -1
assert out_stop.touched == "stop_loss"
take_first = TripleBarrierSpec(
take_profit_pct=0.02,
stop_loss_pct=0.01,
max_holding_bars=1,
tie_break="take_first",
)
out_take = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=-1,
spec=take_first,
)
assert out_take.label == 1
assert out_take.touched == "take_profit"