diff --git a/docs/한국투자증권_오픈API_전체문서_20260221_030000.xlsx b/docs/한국투자증권_오픈API_전체문서_20260221_030000.xlsx new file mode 100644 index 0000000..1a5e7cf Binary files /dev/null and b/docs/한국투자증권_오픈API_전체문서_20260221_030000.xlsx differ diff --git a/src/main.py b/src/main.py index a4ba69a..ecde4f6 100644 --- a/src/main.py +++ b/src/main.py @@ -12,6 +12,7 @@ import json import logging import signal import threading +from collections.abc import Awaitable, Callable from datetime import UTC, datetime from typing import Any @@ -2084,6 +2085,15 @@ async def trading_cycle( quantity=quantity, price=order_price, ) + if result.get("rt_cd", "0") != "0": + order_succeeded = False + msg1 = result.get("msg1") or "" + logger.warning( + "KR order not accepted for %s: rt_cd=%s msg=%s", + stock_code, + result.get("rt_cd"), + msg1, + ) else: # For overseas orders, always use limit orders (지정가): # - KIS market orders (ORD_DVSN=01) calculate quantity based on upper limit @@ -3293,6 +3303,15 @@ async def run_daily_session( quantity=quantity, price=order_price, ) + if result.get("rt_cd", "0") != "0": + order_succeeded = False + daily_msg1 = result.get("msg1") or "" + logger.warning( + "KR order not accepted for %s: rt_cd=%s msg=%s", + stock_code, + result.get("rt_cd"), + daily_msg1, + ) else: # KIS VTS only accepts limit orders; use 0.5% premium for BUY if decision.action == "BUY": @@ -3532,6 +3551,47 @@ def _run_context_scheduler( ) +def _has_market_session_transition( + market_states: dict[str, str], market_code: str, session_id: str +) -> bool: + """Return True when market session changed (or market has no prior state).""" + return market_states.get(market_code) != session_id + + +def _should_rescan_market( + *, last_scan: float, now_timestamp: float, rescan_interval: float, session_changed: bool +) -> bool: + """Force rescan on session transition; otherwise follow interval cadence.""" + return session_changed or (now_timestamp - last_scan >= rescan_interval) + + +async def _run_markets_in_parallel( + markets: list[Any], processor: Callable[[Any], Awaitable[None]] +) -> None: + """Run market processors in parallel and fail fast on the first exception.""" + if not markets: + return + + tasks = [asyncio.create_task(processor(market)) for market in markets] + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) + + first_exc: BaseException | None = None + for task in done: + exc = task.exception() + if exc is not None and first_exc is None: + first_exc = exc + + if first_exc is not None: + for task in pending: + task.cancel() + if pending: + await asyncio.gather(*pending, return_exceptions=True) + raise first_exc + + if pending: + await asyncio.gather(*pending) + + async def _run_evolution_loop( evolution_optimizer: EvolutionOptimizer, telegram: TelegramClient, @@ -4045,7 +4105,7 @@ async def run(settings: Settings) -> None: last_scan_time: dict[str, float] = {} # Track market open/close state for notifications - _market_states: dict[str, bool] = {} # market_code -> is_open + _market_states: dict[str, str] = {} # market_code -> session_id # Trading control events shutdown = asyncio.Event() @@ -4163,8 +4223,8 @@ async def run(settings: Settings) -> None: if not open_markets: # Notify market close for any markets that were open - for market_code, is_open in list(_market_states.items()): - if is_open: + for market_code, session_id in list(_market_states.items()): + if session_id: try: from src.markets.schedule import MARKETS @@ -4181,7 +4241,7 @@ async def run(settings: Settings) -> None: ) except Exception as exc: logger.warning("Market close notification failed: %s", exc) - _market_states[market_code] = False + _market_states.pop(market_code, None) # Clear playbook for closed market (new one generated next open) playbooks.pop(market_code, None) @@ -4206,10 +4266,9 @@ async def run(settings: Settings) -> None: await asyncio.sleep(TRADE_INTERVAL_SECONDS) continue - # Process each open market - for market in open_markets: + async def _process_realtime_market(market: MarketInfo) -> None: if shutdown.is_set(): - break + return session_info = get_session_info(market) _session_risk_overrides(market=market, settings=settings) @@ -4227,13 +4286,16 @@ async def run(settings: Settings) -> None: settings=settings, ) - # Notify market open if it just opened - if not _market_states.get(market.code, False): + # Notify on market/session transition (e.g., US_PRE -> US_REG) + session_changed = _has_market_session_transition( + _market_states, market.code, session_info.session_id + ) + if session_changed: try: await telegram.notify_market_open(market.name) except Exception as exc: logger.warning("Market open notification failed: %s", exc) - _market_states[market.code] = True + _market_states[market.code] = session_info.session_id # Check and handle domestic pending (unfilled) limit orders. if market.is_domestic: @@ -4265,7 +4327,12 @@ async def run(settings: Settings) -> None: now_timestamp = asyncio.get_event_loop().time() last_scan = last_scan_time.get(market.code, 0.0) rescan_interval = settings.RESCAN_INTERVAL_SECONDS - if now_timestamp - last_scan >= rescan_interval: + if _should_rescan_market( + last_scan=last_scan, + now_timestamp=now_timestamp, + rescan_interval=rescan_interval, + session_changed=session_changed, + ): try: logger.info("Smart Scanner: Scanning %s market", market.name) @@ -4290,12 +4357,9 @@ async def run(settings: Settings) -> None: ) if candidates: - # Use scanner results directly as trading candidates active_stocks[market.code] = smart_scanner.get_stock_codes( candidates ) - - # Store candidates per market for selection context logging scan_candidates[market.code] = {c.stock_code: c for c in candidates} logger.info( @@ -4305,12 +4369,8 @@ async def run(settings: Settings) -> None: [f"{c.stock_code}(RSI={c.rsi:.0f})" for c in candidates], ) - # Get market-local date for playbook keying market_today = datetime.now(market.timezone).date() - - # Load or generate playbook (1 Gemini call per market per day) if market.code not in playbooks: - # Try DB first (survives process restart) stored_pb = playbook_store.load(market_today, market.code) if stored_pb is not None: playbooks[market.code] = stored_pb @@ -4370,12 +4430,6 @@ async def run(settings: Settings) -> None: except Exception as exc: logger.error("Smart Scanner failed for %s: %s", market.name, exc) - # Get active stocks from scanner (dynamic, no static fallback). - # Also include currently-held positions so stop-loss / - # take-profit can fire even when a holding drops off the - # scanner. Broker balance is the source of truth here — - # unlike the local DB it reflects actual fills and any - # manual trades done outside the bot. scanner_codes = active_stocks.get(market.code, []) try: if market.is_domestic: @@ -4406,16 +4460,14 @@ async def run(settings: Settings) -> None: if not stock_codes: logger.debug("No active stocks for market %s", market.code) - continue + return logger.info("Processing market: %s (%d stocks)", market.name, len(stock_codes)) - # Process each stock from scanner results for stock_code in stock_codes: if shutdown.is_set(): break - # Get playbook for this market market_playbook = playbooks.get( market.code, PreMarketPlanner._empty_playbook( @@ -4423,7 +4475,6 @@ async def run(settings: Settings) -> None: ), ) - # Retry logic for connection errors for attempt in range(1, MAX_CONNECTION_RETRIES + 1): try: await trading_cycle( @@ -4443,7 +4494,7 @@ async def run(settings: Settings) -> None: settings, buy_cooldown, ) - break # Success — exit retry loop + break except CircuitBreakerTripped as exc: logger.critical("Circuit breaker tripped — shutting down") try: @@ -4465,17 +4516,19 @@ async def run(settings: Settings) -> None: MAX_CONNECTION_RETRIES, exc, ) - await asyncio.sleep(2**attempt) # Exponential backoff + await asyncio.sleep(2**attempt) else: logger.error( "Connection error for %s (all retries exhausted): %s", stock_code, exc, ) - break # Give up on this stock + break except Exception as exc: logger.exception("Unexpected error for %s: %s", stock_code, exc) - break # Don't retry on unexpected errors + break + + await _run_markets_in_parallel(open_markets, _process_realtime_market) # Log priority queue metrics periodically metrics = await priority_queue.get_metrics() diff --git a/src/markets/schedule.py b/src/markets/schedule.py index a87408e..9b7b421 100644 --- a/src/markets/schedule.py +++ b/src/markets/schedule.py @@ -207,7 +207,7 @@ def get_open_markets( from src.core.order_policy import classify_session_id session_id = classify_session_id(market, now) - return session_id not in {"KR_OFF", "US_OFF"} + return session_id not in {"KR_OFF", "US_OFF", "US_DAY"} return is_market_open(market, now) open_markets = [ @@ -254,10 +254,10 @@ def get_next_market_open( from src.core.order_policy import classify_session_id ts = start_utc.astimezone(ZoneInfo("UTC")).replace(second=0, microsecond=0) - prev_active = classify_session_id(market, ts) not in {"KR_OFF", "US_OFF"} + prev_active = classify_session_id(market, ts) not in {"KR_OFF", "US_OFF", "US_DAY"} for _ in range(7 * 24 * 60): ts = ts + timedelta(minutes=1) - active = classify_session_id(market, ts) not in {"KR_OFF", "US_OFF"} + active = classify_session_id(market, ts) not in {"KR_OFF", "US_OFF", "US_DAY"} if active and not prev_active: return ts prev_active = active diff --git a/tests/test_main.py b/tests/test_main.py index 47c2ad3..65f61db 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,5 +1,6 @@ """Tests for main trading loop integration.""" +import asyncio from datetime import UTC, date, datetime from typing import Any from unittest.mock import ANY, AsyncMock, MagicMock, patch @@ -34,6 +35,7 @@ from src.main import ( _extract_held_codes_from_balance, _extract_held_qty_from_balance, _handle_market_close, + _has_market_session_transition, _inject_staged_exit_features, _maybe_queue_order_intent, _resolve_market_setting, @@ -41,8 +43,10 @@ from src.main import ( _retry_connection, _run_context_scheduler, _run_evolution_loop, + _run_markets_in_parallel, _should_block_overseas_buy_for_fx_buffer, _should_force_exit_for_overnight, + _should_rescan_market, _split_trade_pnl_components, _start_dashboard_server, _stoploss_cooldown_minutes, @@ -140,6 +144,63 @@ class TestExtractAvgPriceFromBalance: result = _extract_avg_price_from_balance(balance, "AAPL", is_domestic=False) assert result == 170.5 + +class TestRealtimeSessionStateHelpers: + """Tests for realtime loop session-transition/rescan helper logic.""" + + def test_has_market_session_transition_when_state_missing(self) -> None: + states: dict[str, str] = {} + assert _has_market_session_transition(states, "US_NASDAQ", "US_REG") + + def test_has_market_session_transition_when_session_changes(self) -> None: + states = {"US_NASDAQ": "US_PRE"} + assert _has_market_session_transition(states, "US_NASDAQ", "US_REG") + + def test_has_market_session_transition_false_when_same_session(self) -> None: + states = {"US_NASDAQ": "US_REG"} + assert not _has_market_session_transition(states, "US_NASDAQ", "US_REG") + + def test_should_rescan_market_forces_on_session_transition(self) -> None: + assert _should_rescan_market( + last_scan=1000.0, + now_timestamp=1050.0, + rescan_interval=300.0, + session_changed=True, + ) + + def test_should_rescan_market_uses_interval_without_transition(self) -> None: + assert not _should_rescan_market( + last_scan=1000.0, + now_timestamp=1050.0, + rescan_interval=300.0, + session_changed=False, + ) + + +class TestMarketParallelRunner: + """Tests for market-level parallel processing helper.""" + + @pytest.mark.asyncio + async def test_run_markets_in_parallel_runs_all_markets(self) -> None: + processed: list[str] = [] + + async def _processor(market: str) -> None: + await asyncio.sleep(0.01) + processed.append(market) + + await _run_markets_in_parallel(["KR", "US_NASDAQ", "US_NYSE"], _processor) + assert set(processed) == {"KR", "US_NASDAQ", "US_NYSE"} + + @pytest.mark.asyncio + async def test_run_markets_in_parallel_propagates_errors(self) -> None: + async def _processor(market: str) -> None: + if market == "US_NASDAQ": + raise RuntimeError("boom") + await asyncio.sleep(0.01) + + with pytest.raises(RuntimeError, match="boom"): + await _run_markets_in_parallel(["KR", "US_NASDAQ"], _processor) + def test_returns_zero_when_field_absent(self) -> None: """Returns 0.0 when pchs_avg_pric key is missing entirely.""" balance = {"output1": [{"pdno": "005930", "ord_psbl_qty": "5"}]} @@ -913,6 +974,46 @@ class TestTradingCycleTelegramIntegration: # Verify notification was attempted mock_telegram.notify_trade_execution.assert_called_once() + @pytest.mark.asyncio + async def test_kr_rejected_order_does_not_notify_or_log_trade( + self, + mock_broker: MagicMock, + mock_overseas_broker: MagicMock, + mock_scenario_engine: MagicMock, + mock_playbook: DayPlaybook, + mock_risk: MagicMock, + mock_db: MagicMock, + mock_decision_logger: MagicMock, + mock_context_store: MagicMock, + mock_criticality_assessor: MagicMock, + mock_telegram: MagicMock, + mock_market: MagicMock, + ) -> None: + """KR orders rejected by KIS should not trigger success side effects.""" + mock_broker.send_order = AsyncMock( + return_value={"rt_cd": "1", "msg1": "장운영시간이 아닙니다."} + ) + + with patch("src.main.log_trade") as mock_log_trade: + await trading_cycle( + broker=mock_broker, + overseas_broker=mock_overseas_broker, + scenario_engine=mock_scenario_engine, + playbook=mock_playbook, + risk=mock_risk, + db_conn=mock_db, + decision_logger=mock_decision_logger, + context_store=mock_context_store, + criticality_assessor=mock_criticality_assessor, + telegram=mock_telegram, + market=mock_market, + stock_code="005930", + scan_candidates={}, + ) + + mock_telegram.notify_trade_execution.assert_not_called() + mock_log_trade.assert_not_called() + @pytest.mark.asyncio async def test_fat_finger_notification_sent( self, diff --git a/tests/test_market_schedule.py b/tests/test_market_schedule.py index 8723c2f..38d035b 100644 --- a/tests/test_market_schedule.py +++ b/tests/test_market_schedule.py @@ -165,6 +165,17 @@ class TestGetOpenMarkets: ) assert {m.code for m in extended} == {"US_NASDAQ", "US_NYSE", "US_AMEX"} + def test_get_open_markets_excludes_us_day_when_extended_enabled(self) -> None: + """US_DAY should be treated as non-tradable even in extended-session lookup.""" + # Monday 2026-02-02 10:30 KST = 01:30 UTC (US_DAY by session classification) + test_time = datetime(2026, 2, 2, 1, 30, tzinfo=ZoneInfo("UTC")) + extended = get_open_markets( + enabled_markets=["US_NASDAQ", "US_NYSE", "US_AMEX"], + now=test_time, + include_extended_sessions=True, + ) + assert extended == [] + class TestGetNextMarketOpen: """Test get_next_market_open function.""" @@ -214,8 +225,8 @@ class TestGetNextMarketOpen: def test_get_next_market_open_prefers_extended_session(self) -> None: """Extended lookup should return premarket open time before regular open.""" # Monday 2026-02-02 07:00 EST = 12:00 UTC - # By v3 KST session rules, US is OFF only in KST 07:00-10:00 (UTC 22:00-01:00). - # At 12:00 UTC market is active, so next OFF->ON transition is 01:00 UTC next day. + # US_DAY is treated as non-tradable in extended lookup, so after entering + # US_DAY the next tradable OFF->ON transition is US_PRE at 09:00 UTC next day. test_time = datetime(2026, 2, 2, 12, 0, tzinfo=ZoneInfo("UTC")) market, next_open = get_next_market_open( enabled_markets=["US_NASDAQ"], @@ -223,7 +234,7 @@ class TestGetNextMarketOpen: include_extended_sessions=True, ) assert market.code == "US_NASDAQ" - assert next_open == datetime(2026, 2, 3, 1, 0, tzinfo=ZoneInfo("UTC")) + assert next_open == datetime(2026, 2, 3, 9, 0, tzinfo=ZoneInfo("UTC")) class TestExpandMarketCodes: