fix: add dual-listing spread routing and session propagation
Some checks failed
Gitea CI / test (push) Successful in 36s
Gitea CI / test (pull_request) Failing after 5s

This commit is contained in:
agentson
2026-03-04 10:16:28 +09:00
parent c80f3daad7
commit 9fd9c552f3
4 changed files with 142 additions and 7 deletions

View File

@@ -350,4 +350,3 @@ tea issues create -t "bug: runtime anomaly during #409 monitor" -d "$ISSUE_BODY"
**Step 5: Post monitoring summary to #409/#318/#325**
- Include PASS/FAIL/NOT_OBSERVED matrix and exact timestamps.
- Do not close #318/#325 without concrete acceptance evidence.

View File

@@ -218,12 +218,21 @@ class KISBroker:
async def get_orderbook(self, stock_code: str) -> dict[str, Any]:
"""Fetch the current orderbook for a given stock code."""
return await self.get_orderbook_by_market(stock_code, market_div_code="J")
async def get_orderbook_by_market(
self,
stock_code: str,
*,
market_div_code: str,
) -> dict[str, Any]:
"""Fetch orderbook for a specific domestic market division code."""
await self._rate_limiter.acquire()
session = self._get_session()
headers = await self._auth_headers("FHKST01010200")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_COND_MRKT_DIV_CODE": market_div_code,
"FID_INPUT_ISCD": stock_code,
}
url = f"{self._base_url}/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn"
@@ -237,6 +246,76 @@ class KISBroker:
except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError(f"Network error fetching orderbook: {exc}") from exc
@staticmethod
def _extract_orderbook_metrics(payload: dict[str, Any]) -> tuple[float | None, float | None]:
output = payload.get("output1") or payload.get("output") or {}
if not isinstance(output, dict):
return None, None
def _float(*keys: str) -> float | None:
for key in keys:
raw = output.get(key)
if raw in (None, ""):
continue
try:
return float(cast(str | int | float, raw))
except (ValueError, TypeError):
continue
return None
ask = _float("askp1", "stck_askp1")
bid = _float("bidp1", "stck_bidp1")
if ask is not None and bid is not None and ask > 0 and bid > 0 and ask >= bid:
mid = (ask + bid) / 2
if mid > 0:
spread = (ask - bid) / mid
else:
spread = None
else:
spread = None
ask_qty = _float("askp_rsqn1", "ask_qty1")
bid_qty = _float("bidp_rsqn1", "bid_qty1")
if ask_qty is not None and bid_qty is not None and ask_qty >= 0 and bid_qty >= 0:
liquidity = ask_qty + bid_qty
else:
liquidity = None
return spread, liquidity
async def _load_dual_listing_metrics(
self,
stock_code: str,
) -> tuple[bool, float | None, float | None, float | None, float | None]:
"""Try KRX/NXT orderbooks and derive spread/liquidity metrics."""
spread_krx: float | None = None
spread_nxt: float | None = None
liquidity_krx: float | None = None
liquidity_nxt: float | None = None
for market_div_code, exchange in (("J", "KRX"), ("NX", "NXT")):
try:
payload = await self.get_orderbook_by_market(
stock_code,
market_div_code=market_div_code,
)
except ConnectionError:
continue
spread, liquidity = self._extract_orderbook_metrics(payload)
if exchange == "KRX":
spread_krx = spread
liquidity_krx = liquidity
else:
spread_nxt = spread
liquidity_nxt = liquidity
is_dual_listed = (
(spread_krx is not None and spread_nxt is not None)
or (liquidity_krx is not None and liquidity_nxt is not None)
)
return is_dual_listed, spread_krx, spread_nxt, liquidity_krx, liquidity_nxt
async def get_current_price(self, stock_code: str) -> tuple[float, float, float]:
"""Fetch current price data for a domestic stock.
@@ -318,7 +397,7 @@ class KISBroker:
stock_code: str,
order_type: str, # "BUY" or "SELL"
quantity: int,
price: int = 0,
price: float = 0,
session_id: str | None = None,
) -> dict[str, Any]:
"""Submit a buy or sell order.
@@ -350,9 +429,24 @@ class KISBroker:
ord_price = 0
resolved_session = session_id or classify_session_id(MARKETS["KR"])
if session_id is not None:
is_dual_listed, spread_krx, spread_nxt, liquidity_krx, liquidity_nxt = (
await self._load_dual_listing_metrics(stock_code)
)
else:
is_dual_listed = False
spread_krx = None
spread_nxt = None
liquidity_krx = None
liquidity_nxt = None
resolution = self._kr_router.resolve_for_order(
stock_code=stock_code,
session_id=resolved_session,
is_dual_listed=is_dual_listed,
spread_krx=spread_krx,
spread_nxt=spread_nxt,
liquidity_krx=liquidity_krx,
liquidity_nxt=liquidity_nxt,
)
body = {

View File

@@ -35,6 +35,7 @@ from src.core.criticality import CriticalityAssessor
from src.core.kill_switch import KillSwitchOrchestrator
from src.core.order_policy import (
OrderPolicyRejected,
classify_session_id,
get_session_info,
validate_order_policy,
)
@@ -224,23 +225,27 @@ def _compute_kr_dynamic_stop_loss_pct(
key="KR_ATR_STOP_MULTIPLIER_K",
default=2.0,
)
min_pct = _resolve_market_setting(
min_pct = float(
_resolve_market_setting(
market=market,
settings=settings,
key="KR_ATR_STOP_MIN_PCT",
default=-2.0,
)
)
max_pct = _resolve_market_setting(
max_pct = float(
_resolve_market_setting(
market=market,
settings=settings,
key="KR_ATR_STOP_MAX_PCT",
default=-7.0,
)
)
if max_pct > min_pct:
min_pct, max_pct = max_pct, min_pct
dynamic_stop_pct = -((k * atr_value) / entry_price) * 100.0
return max(max_pct, min(min_pct, dynamic_stop_pct))
return float(max(max_pct, min(min_pct, dynamic_stop_pct)))
def _stoploss_cooldown_key(*, market: MarketInfo, stock_code: str) -> str:
@@ -1200,6 +1205,7 @@ async def process_blackout_recovery_orders(
order_type=intent.order_type,
quantity=intent.quantity,
price=intent.price,
session_id=intent.session_id,
)
else:
result = await overseas_broker.send_overseas_order(
@@ -2083,6 +2089,7 @@ async def trading_cycle(
order_type=decision.action,
quantity=quantity,
price=order_price,
session_id=runtime_session_id,
)
else:
# For overseas orders, always use limit orders (지정가):
@@ -2417,6 +2424,7 @@ async def handle_domestic_pending_orders(
order_type="SELL",
quantity=psbl_qty,
price=new_price,
session_id=classify_session_id(MARKETS["KR"]),
)
sell_resubmit_counts[key] = sell_resubmit_counts.get(key, 0) + 1
try:
@@ -3292,6 +3300,7 @@ async def run_daily_session(
order_type=decision.action,
quantity=quantity,
price=order_price,
session_id=runtime_session_id,
)
else:
# KIS VTS only accepts limit orders; use 0.5% premium for BUY

View File

@@ -615,7 +615,40 @@ class TestSendOrderTickRounding:
mock_order.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", side_effect=[mock_hash, mock_order]) as mock_post:
await broker.send_order("005930", "BUY", 1, price=50000, session_id="NXT_PRE")
with patch.object(
broker,
"_load_dual_listing_metrics",
new=AsyncMock(return_value=(False, None, None, None, None)),
):
await broker.send_order("005930", "BUY", 1, price=50000, session_id="NXT_PRE")
order_call = mock_post.call_args_list[1]
body = order_call[1].get("json", {})
assert body["EXCG_ID_DVSN_CD"] == "NXT"
@pytest.mark.asyncio
async def test_send_order_prefers_nxt_when_dual_listing_spread_is_tighter(
self, broker: KISBroker
) -> None:
mock_hash = AsyncMock()
mock_hash.status = 200
mock_hash.json = AsyncMock(return_value={"HASH": "h"})
mock_hash.__aenter__ = AsyncMock(return_value=mock_hash)
mock_hash.__aexit__ = AsyncMock(return_value=False)
mock_order = AsyncMock()
mock_order.status = 200
mock_order.json = AsyncMock(return_value={"rt_cd": "0"})
mock_order.__aenter__ = AsyncMock(return_value=mock_order)
mock_order.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", side_effect=[mock_hash, mock_order]) as mock_post:
with patch.object(
broker,
"_load_dual_listing_metrics",
new=AsyncMock(return_value=(True, 0.004, 0.002, 100000.0, 90000.0)),
):
await broker.send_order("005930", "BUY", 1, price=50000, session_id="KRX_REG")
order_call = mock_post.call_args_list[1]
body = order_call[1].get("json", {})