From ec5d656fdf60708b0f6561dc811c5f7ebb17a09b Mon Sep 17 00:00:00 2001 From: agentson Date: Wed, 4 Mar 2026 03:12:23 +0900 Subject: [PATCH] feat: process active markets in parallel with fail-fast semantics (#401) --- src/main.py | 61 ++++++++++++++++++++++++++++------------------ tests/test_main.py | 27 ++++++++++++++++++++ 2 files changed, 64 insertions(+), 24 deletions(-) diff --git a/src/main.py b/src/main.py index 081ecb4..ecde4f6 100644 --- a/src/main.py +++ b/src/main.py @@ -12,6 +12,7 @@ import json import logging import signal import threading +from collections.abc import Awaitable, Callable from datetime import UTC, datetime from typing import Any @@ -3564,6 +3565,33 @@ def _should_rescan_market( return session_changed or (now_timestamp - last_scan >= rescan_interval) +async def _run_markets_in_parallel( + markets: list[Any], processor: Callable[[Any], Awaitable[None]] +) -> None: + """Run market processors in parallel and fail fast on the first exception.""" + if not markets: + return + + tasks = [asyncio.create_task(processor(market)) for market in markets] + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) + + first_exc: BaseException | None = None + for task in done: + exc = task.exception() + if exc is not None and first_exc is None: + first_exc = exc + + if first_exc is not None: + for task in pending: + task.cancel() + if pending: + await asyncio.gather(*pending, return_exceptions=True) + raise first_exc + + if pending: + await asyncio.gather(*pending) + + async def _run_evolution_loop( evolution_optimizer: EvolutionOptimizer, telegram: TelegramClient, @@ -4238,10 +4266,9 @@ async def run(settings: Settings) -> None: await asyncio.sleep(TRADE_INTERVAL_SECONDS) continue - # Process each open market - for market in open_markets: + async def _process_realtime_market(market: MarketInfo) -> None: if shutdown.is_set(): - break + return session_info = get_session_info(market) _session_risk_overrides(market=market, settings=settings) @@ -4330,12 +4357,9 @@ async def run(settings: Settings) -> None: ) if candidates: - # Use scanner results directly as trading candidates active_stocks[market.code] = smart_scanner.get_stock_codes( candidates ) - - # Store candidates per market for selection context logging scan_candidates[market.code] = {c.stock_code: c for c in candidates} logger.info( @@ -4345,12 +4369,8 @@ async def run(settings: Settings) -> None: [f"{c.stock_code}(RSI={c.rsi:.0f})" for c in candidates], ) - # Get market-local date for playbook keying market_today = datetime.now(market.timezone).date() - - # Load or generate playbook (1 Gemini call per market per day) if market.code not in playbooks: - # Try DB first (survives process restart) stored_pb = playbook_store.load(market_today, market.code) if stored_pb is not None: playbooks[market.code] = stored_pb @@ -4410,12 +4430,6 @@ async def run(settings: Settings) -> None: except Exception as exc: logger.error("Smart Scanner failed for %s: %s", market.name, exc) - # Get active stocks from scanner (dynamic, no static fallback). - # Also include currently-held positions so stop-loss / - # take-profit can fire even when a holding drops off the - # scanner. Broker balance is the source of truth here — - # unlike the local DB it reflects actual fills and any - # manual trades done outside the bot. scanner_codes = active_stocks.get(market.code, []) try: if market.is_domestic: @@ -4446,16 +4460,14 @@ async def run(settings: Settings) -> None: if not stock_codes: logger.debug("No active stocks for market %s", market.code) - continue + return logger.info("Processing market: %s (%d stocks)", market.name, len(stock_codes)) - # Process each stock from scanner results for stock_code in stock_codes: if shutdown.is_set(): break - # Get playbook for this market market_playbook = playbooks.get( market.code, PreMarketPlanner._empty_playbook( @@ -4463,7 +4475,6 @@ async def run(settings: Settings) -> None: ), ) - # Retry logic for connection errors for attempt in range(1, MAX_CONNECTION_RETRIES + 1): try: await trading_cycle( @@ -4483,7 +4494,7 @@ async def run(settings: Settings) -> None: settings, buy_cooldown, ) - break # Success — exit retry loop + break except CircuitBreakerTripped as exc: logger.critical("Circuit breaker tripped — shutting down") try: @@ -4505,17 +4516,19 @@ async def run(settings: Settings) -> None: MAX_CONNECTION_RETRIES, exc, ) - await asyncio.sleep(2**attempt) # Exponential backoff + await asyncio.sleep(2**attempt) else: logger.error( "Connection error for %s (all retries exhausted): %s", stock_code, exc, ) - break # Give up on this stock + break except Exception as exc: logger.exception("Unexpected error for %s: %s", stock_code, exc) - break # Don't retry on unexpected errors + break + + await _run_markets_in_parallel(open_markets, _process_realtime_market) # Log priority queue metrics periodically metrics = await priority_queue.get_metrics() diff --git a/tests/test_main.py b/tests/test_main.py index efa08f8..65f61db 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,5 +1,6 @@ """Tests for main trading loop integration.""" +import asyncio from datetime import UTC, date, datetime from typing import Any from unittest.mock import ANY, AsyncMock, MagicMock, patch @@ -42,6 +43,7 @@ from src.main import ( _retry_connection, _run_context_scheduler, _run_evolution_loop, + _run_markets_in_parallel, _should_block_overseas_buy_for_fx_buffer, _should_force_exit_for_overnight, _should_rescan_market, @@ -174,6 +176,31 @@ class TestRealtimeSessionStateHelpers: session_changed=False, ) + +class TestMarketParallelRunner: + """Tests for market-level parallel processing helper.""" + + @pytest.mark.asyncio + async def test_run_markets_in_parallel_runs_all_markets(self) -> None: + processed: list[str] = [] + + async def _processor(market: str) -> None: + await asyncio.sleep(0.01) + processed.append(market) + + await _run_markets_in_parallel(["KR", "US_NASDAQ", "US_NYSE"], _processor) + assert set(processed) == {"KR", "US_NASDAQ", "US_NYSE"} + + @pytest.mark.asyncio + async def test_run_markets_in_parallel_propagates_errors(self) -> None: + async def _processor(market: str) -> None: + if market == "US_NASDAQ": + raise RuntimeError("boom") + await asyncio.sleep(0.01) + + with pytest.raises(RuntimeError, match="boom"): + await _run_markets_in_parallel(["KR", "US_NASDAQ"], _processor) + def test_returns_zero_when_field_absent(self) -> None: """Returns 0.0 when pchs_avg_pric key is missing entirely.""" balance = {"output1": [{"pdno": "005930", "ord_psbl_qty": "5"}]} -- 2.49.1