Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 815fb89585 | |||
|
|
79e51b8ece | ||
| c29e5125bc | |||
|
|
4afae017a2 | ||
|
|
650d464da5 | ||
| b1f5f3e888 | |||
|
|
ec5d656fdf | ||
| bacb0d2037 |
BIN
docs/한국투자증권_오픈API_전체문서_20260221_030000.xlsx
Normal file
BIN
docs/한국투자증권_오픈API_전체문서_20260221_030000.xlsx
Normal file
Binary file not shown.
61
src/main.py
61
src/main.py
@@ -12,6 +12,7 @@ import json
|
||||
import logging
|
||||
import signal
|
||||
import threading
|
||||
from collections.abc import Awaitable, Callable
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
@@ -3564,6 +3565,33 @@ def _should_rescan_market(
|
||||
return session_changed or (now_timestamp - last_scan >= rescan_interval)
|
||||
|
||||
|
||||
async def _run_markets_in_parallel(
|
||||
markets: list[Any], processor: Callable[[Any], Awaitable[None]]
|
||||
) -> None:
|
||||
"""Run market processors in parallel and fail fast on the first exception."""
|
||||
if not markets:
|
||||
return
|
||||
|
||||
tasks = [asyncio.create_task(processor(market)) for market in markets]
|
||||
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
|
||||
|
||||
first_exc: BaseException | None = None
|
||||
for task in done:
|
||||
exc = task.exception()
|
||||
if exc is not None and first_exc is None:
|
||||
first_exc = exc
|
||||
|
||||
if first_exc is not None:
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
if pending:
|
||||
await asyncio.gather(*pending, return_exceptions=True)
|
||||
raise first_exc
|
||||
|
||||
if pending:
|
||||
await asyncio.gather(*pending)
|
||||
|
||||
|
||||
async def _run_evolution_loop(
|
||||
evolution_optimizer: EvolutionOptimizer,
|
||||
telegram: TelegramClient,
|
||||
@@ -4238,10 +4266,9 @@ async def run(settings: Settings) -> None:
|
||||
await asyncio.sleep(TRADE_INTERVAL_SECONDS)
|
||||
continue
|
||||
|
||||
# Process each open market
|
||||
for market in open_markets:
|
||||
async def _process_realtime_market(market: MarketInfo) -> None:
|
||||
if shutdown.is_set():
|
||||
break
|
||||
return
|
||||
|
||||
session_info = get_session_info(market)
|
||||
_session_risk_overrides(market=market, settings=settings)
|
||||
@@ -4330,12 +4357,9 @@ async def run(settings: Settings) -> None:
|
||||
)
|
||||
|
||||
if candidates:
|
||||
# Use scanner results directly as trading candidates
|
||||
active_stocks[market.code] = smart_scanner.get_stock_codes(
|
||||
candidates
|
||||
)
|
||||
|
||||
# Store candidates per market for selection context logging
|
||||
scan_candidates[market.code] = {c.stock_code: c for c in candidates}
|
||||
|
||||
logger.info(
|
||||
@@ -4345,12 +4369,8 @@ async def run(settings: Settings) -> None:
|
||||
[f"{c.stock_code}(RSI={c.rsi:.0f})" for c in candidates],
|
||||
)
|
||||
|
||||
# Get market-local date for playbook keying
|
||||
market_today = datetime.now(market.timezone).date()
|
||||
|
||||
# Load or generate playbook (1 Gemini call per market per day)
|
||||
if market.code not in playbooks:
|
||||
# Try DB first (survives process restart)
|
||||
stored_pb = playbook_store.load(market_today, market.code)
|
||||
if stored_pb is not None:
|
||||
playbooks[market.code] = stored_pb
|
||||
@@ -4410,12 +4430,6 @@ async def run(settings: Settings) -> None:
|
||||
except Exception as exc:
|
||||
logger.error("Smart Scanner failed for %s: %s", market.name, exc)
|
||||
|
||||
# Get active stocks from scanner (dynamic, no static fallback).
|
||||
# Also include currently-held positions so stop-loss /
|
||||
# take-profit can fire even when a holding drops off the
|
||||
# scanner. Broker balance is the source of truth here —
|
||||
# unlike the local DB it reflects actual fills and any
|
||||
# manual trades done outside the bot.
|
||||
scanner_codes = active_stocks.get(market.code, [])
|
||||
try:
|
||||
if market.is_domestic:
|
||||
@@ -4446,16 +4460,14 @@ async def run(settings: Settings) -> None:
|
||||
|
||||
if not stock_codes:
|
||||
logger.debug("No active stocks for market %s", market.code)
|
||||
continue
|
||||
return
|
||||
|
||||
logger.info("Processing market: %s (%d stocks)", market.name, len(stock_codes))
|
||||
|
||||
# Process each stock from scanner results
|
||||
for stock_code in stock_codes:
|
||||
if shutdown.is_set():
|
||||
break
|
||||
|
||||
# Get playbook for this market
|
||||
market_playbook = playbooks.get(
|
||||
market.code,
|
||||
PreMarketPlanner._empty_playbook(
|
||||
@@ -4463,7 +4475,6 @@ async def run(settings: Settings) -> None:
|
||||
),
|
||||
)
|
||||
|
||||
# Retry logic for connection errors
|
||||
for attempt in range(1, MAX_CONNECTION_RETRIES + 1):
|
||||
try:
|
||||
await trading_cycle(
|
||||
@@ -4483,7 +4494,7 @@ async def run(settings: Settings) -> None:
|
||||
settings,
|
||||
buy_cooldown,
|
||||
)
|
||||
break # Success — exit retry loop
|
||||
break
|
||||
except CircuitBreakerTripped as exc:
|
||||
logger.critical("Circuit breaker tripped — shutting down")
|
||||
try:
|
||||
@@ -4505,17 +4516,19 @@ async def run(settings: Settings) -> None:
|
||||
MAX_CONNECTION_RETRIES,
|
||||
exc,
|
||||
)
|
||||
await asyncio.sleep(2**attempt) # Exponential backoff
|
||||
await asyncio.sleep(2**attempt)
|
||||
else:
|
||||
logger.error(
|
||||
"Connection error for %s (all retries exhausted): %s",
|
||||
stock_code,
|
||||
exc,
|
||||
)
|
||||
break # Give up on this stock
|
||||
break
|
||||
except Exception as exc:
|
||||
logger.exception("Unexpected error for %s: %s", stock_code, exc)
|
||||
break # Don't retry on unexpected errors
|
||||
break
|
||||
|
||||
await _run_markets_in_parallel(open_markets, _process_realtime_market)
|
||||
|
||||
# Log priority queue metrics periodically
|
||||
metrics = await priority_queue.get_metrics()
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Tests for main trading loop integration."""
|
||||
|
||||
import asyncio
|
||||
from datetime import UTC, date, datetime
|
||||
from typing import Any
|
||||
from unittest.mock import ANY, AsyncMock, MagicMock, patch
|
||||
@@ -42,6 +43,7 @@ from src.main import (
|
||||
_retry_connection,
|
||||
_run_context_scheduler,
|
||||
_run_evolution_loop,
|
||||
_run_markets_in_parallel,
|
||||
_should_block_overseas_buy_for_fx_buffer,
|
||||
_should_force_exit_for_overnight,
|
||||
_should_rescan_market,
|
||||
@@ -174,6 +176,31 @@ class TestRealtimeSessionStateHelpers:
|
||||
session_changed=False,
|
||||
)
|
||||
|
||||
|
||||
class TestMarketParallelRunner:
|
||||
"""Tests for market-level parallel processing helper."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_markets_in_parallel_runs_all_markets(self) -> None:
|
||||
processed: list[str] = []
|
||||
|
||||
async def _processor(market: str) -> None:
|
||||
await asyncio.sleep(0.01)
|
||||
processed.append(market)
|
||||
|
||||
await _run_markets_in_parallel(["KR", "US_NASDAQ", "US_NYSE"], _processor)
|
||||
assert set(processed) == {"KR", "US_NASDAQ", "US_NYSE"}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_markets_in_parallel_propagates_errors(self) -> None:
|
||||
async def _processor(market: str) -> None:
|
||||
if market == "US_NASDAQ":
|
||||
raise RuntimeError("boom")
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
with pytest.raises(RuntimeError, match="boom"):
|
||||
await _run_markets_in_parallel(["KR", "US_NASDAQ"], _processor)
|
||||
|
||||
def test_returns_zero_when_field_absent(self) -> None:
|
||||
"""Returns 0.0 when pchs_avg_pric key is missing entirely."""
|
||||
balance = {"output1": [{"pdno": "005930", "ord_psbl_qty": "5"}]}
|
||||
|
||||
Reference in New Issue
Block a user