From 16bb8b6dc604263e73162b0dc3899bfcfefec7a5 Mon Sep 17 00:00:00 2001 From: agentson Date: Tue, 17 Feb 2026 05:50:10 +0900 Subject: [PATCH] feat: add overseas ranking integration with dynamic fallback --- src/analysis/smart_scanner.py | 177 ++++++++++++++++++++++++++++++++++ src/broker/overseas.py | 67 +++++++++++++ src/config.py | 12 +++ src/db.py | 18 ++++ src/main.py | 112 ++++++++++++++++++++- tests/test_smart_scanner.py | 95 ++++++++++++++++++ 6 files changed, 478 insertions(+), 3 deletions(-) diff --git a/src/analysis/smart_scanner.py b/src/analysis/smart_scanner.py index b25f15a..fae55b0 100644 --- a/src/analysis/smart_scanner.py +++ b/src/analysis/smart_scanner.py @@ -12,7 +12,9 @@ from typing import Any from src.analysis.volatility import VolatilityAnalyzer from src.broker.kis_api import KISBroker +from src.broker.overseas import OverseasBroker from src.config import Settings +from src.markets.schedule import MarketInfo logger = logging.getLogger(__name__) @@ -45,6 +47,7 @@ class SmartVolatilityScanner: def __init__( self, broker: KISBroker, + overseas_broker: OverseasBroker | None, volatility_analyzer: VolatilityAnalyzer, settings: Settings, ) -> None: @@ -56,6 +59,7 @@ class SmartVolatilityScanner: settings: Application settings """ self.broker = broker + self.overseas_broker = overseas_broker self.analyzer = volatility_analyzer self.settings = settings @@ -67,16 +71,28 @@ class SmartVolatilityScanner: async def scan( self, + market: MarketInfo | None = None, fallback_stocks: list[str] | None = None, ) -> list[ScanCandidate]: """Execute smart scan and return qualified candidates. Args: + market: Target market info (domestic vs overseas behavior) fallback_stocks: Stock codes to use if ranking API fails Returns: List of ScanCandidate, sorted by score, up to top_n items """ + if market and not market.is_domestic: + return await self._scan_overseas(market, fallback_stocks) + + return await self._scan_domestic(fallback_stocks) + + async def _scan_domestic( + self, + fallback_stocks: list[str] | None = None, + ) -> list[ScanCandidate]: + """Scan domestic market using ranking API + RSI/volume filters.""" # Step 1: Fetch rankings try: rankings = await self.broker.fetch_market_rankings( @@ -180,6 +196,157 @@ class SmartVolatilityScanner: candidates.sort(key=lambda c: c.score, reverse=True) return candidates[: self.top_n] + async def _scan_overseas( + self, + market: MarketInfo, + fallback_stocks: list[str] | None = None, + ) -> list[ScanCandidate]: + """Scan overseas symbols using ranking API first, then fallback universe.""" + if self.overseas_broker is None: + logger.warning( + "Overseas scanner unavailable for %s: overseas broker not configured", + market.name, + ) + return [] + + candidates = await self._scan_overseas_from_rankings(market) + if not candidates: + candidates = await self._scan_overseas_from_symbols(market, fallback_stocks) + + candidates.sort(key=lambda c: c.score, reverse=True) + return candidates[: self.top_n] + + async def _scan_overseas_from_rankings( + self, + market: MarketInfo, + ) -> list[ScanCandidate]: + """Build overseas candidates from ranking APIs.""" + assert self.overseas_broker is not None + try: + fluct_rows = await self.overseas_broker.fetch_overseas_rankings( + exchange_code=market.exchange_code, + ranking_type="fluctuation", + limit=50, + ) + except Exception as exc: + logger.warning( + "Overseas fluctuation ranking failed for %s: %s", market.code, exc + ) + fluct_rows = [] + + if not fluct_rows: + return [] + + candidates: list[ScanCandidate] = [] + for row in fluct_rows: + stock_code = ( + str( + row.get("symb") + or row.get("ovrs_pdno") + or row.get("stock_code") + or row.get("pdno") + or "" + ) + .strip() + .upper() + ) + if not stock_code: + continue + + price = _safe_float( + row.get("last") + or row.get("ovrs_nmix_prpr") + or row.get("stck_prpr") + ) + change_rate = _safe_float( + row.get("rate") + or row.get("prdy_ctrt") + or row.get("evlu_pfls_rt") + or row.get("chg_rt") + ) + volume = _safe_float(row.get("tvol") or row.get("acml_vol") or row.get("vol")) + + if price <= 0 or abs(change_rate) < 0.8: + continue + + score = min(abs(change_rate) / 8.0, 1.0) * 100.0 + signal = "momentum" if change_rate >= 0 else "oversold" + implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0))) + candidates.append( + ScanCandidate( + stock_code=stock_code, + name=str(row.get("name") or row.get("ovrs_item_name") or stock_code), + price=price, + volume=volume, + volume_ratio=max(1.0, abs(change_rate) / 2.0), + rsi=implied_rsi, + signal=signal, + score=score, + ) + ) + + if candidates: + logger.info( + "Overseas ranking scan found %d candidates for %s", + len(candidates), + market.name, + ) + return candidates + + async def _scan_overseas_from_symbols( + self, + market: MarketInfo, + symbols: list[str] | None, + ) -> list[ScanCandidate]: + """Fallback overseas scan from dynamic symbol universe.""" + assert self.overseas_broker is not None + if not symbols: + logger.info("Overseas scanner: no symbol universe for %s", market.name) + return [] + + candidates: list[ScanCandidate] = [] + for stock_code in symbols: + try: + price_data = await self.overseas_broker.get_overseas_price( + market.exchange_code, stock_code + ) + output = price_data.get("output", {}) + price = _safe_float( + output.get("last") + or output.get("ovrs_nmix_prpr") + or output.get("stck_prpr") + ) + change_rate = _safe_float( + output.get("rate") + or output.get("prdy_ctrt") + or output.get("evlu_pfls_rt") + ) + volume = _safe_float(output.get("tvol") or output.get("acml_vol")) + + if price <= 0 or abs(change_rate) < 0.8: + continue + + score = min(abs(change_rate) / 8.0, 1.0) * 100.0 + signal = "momentum" if change_rate >= 0 else "oversold" + implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0))) + candidates.append( + ScanCandidate( + stock_code=stock_code, + name=stock_code, + price=price, + volume=volume, + volume_ratio=max(1.0, abs(change_rate) / 2.0), + rsi=implied_rsi, + signal=signal, + score=score, + ) + ) + except ConnectionError as exc: + logger.warning("Failed to analyze overseas %s: %s", stock_code, exc) + except Exception as exc: + logger.error("Unexpected error analyzing overseas %s: %s", stock_code, exc) + return candidates + def get_stock_codes(self, candidates: list[ScanCandidate]) -> list[str]: """Extract stock codes from candidates for watchlist update. @@ -190,3 +357,13 @@ class SmartVolatilityScanner: List of stock codes """ return [c.stock_code for c in candidates] + + +def _safe_float(value: Any, default: float = 0.0) -> float: + """Convert arbitrary values to float safely.""" + if value in (None, ""): + return default + try: + return float(value) + except (TypeError, ValueError): + return default diff --git a/src/broker/overseas.py b/src/broker/overseas.py index 874df83..541d2da 100644 --- a/src/broker/overseas.py +++ b/src/broker/overseas.py @@ -64,6 +64,65 @@ class OverseasBroker: f"Network error fetching overseas price: {exc}" ) from exc + async def fetch_overseas_rankings( + self, + exchange_code: str, + ranking_type: str = "fluctuation", + limit: int = 30, + ) -> list[dict[str, Any]]: + """Fetch overseas rankings (price change or volume amount). + + Ranking API specs may differ by account/product. Endpoint paths and + TR_IDs are configurable via settings and can be overridden in .env. + """ + if not self._broker._settings.OVERSEAS_RANKING_ENABLED: + return [] + + await self._broker._rate_limiter.acquire() + session = self._broker._get_session() + + if ranking_type == "volume": + tr_id = self._broker._settings.OVERSEAS_RANKING_VOLUME_TR_ID + path = self._broker._settings.OVERSEAS_RANKING_VOLUME_PATH + else: + tr_id = self._broker._settings.OVERSEAS_RANKING_FLUCT_TR_ID + path = self._broker._settings.OVERSEAS_RANKING_FLUCT_PATH + + headers = await self._broker._auth_headers(tr_id) + url = f"{self._broker._base_url}{path}" + + # Try common param variants used by KIS overseas quotation APIs. + param_variants = [ + {"AUTH": "", "EXCD": exchange_code, "NREC": str(max(limit, 30))}, + {"AUTH": "", "OVRS_EXCG_CD": exchange_code, "NREC": str(max(limit, 30))}, + {"AUTH": "", "EXCD": exchange_code}, + {"AUTH": "", "OVRS_EXCG_CD": exchange_code}, + ] + + last_error: str | None = None + for params in param_variants: + try: + async with session.get(url, headers=headers, params=params) as resp: + text = await resp.text() + if resp.status != 200: + last_error = f"HTTP {resp.status}: {text}" + continue + + data = await resp.json() + rows = self._extract_ranking_rows(data) + if rows: + return rows[:limit] + + # keep trying another param variant if response has no usable rows + last_error = f"empty output (keys={list(data.keys())})" + except (TimeoutError, aiohttp.ClientError) as exc: + last_error = str(exc) + continue + + raise ConnectionError( + f"fetch_overseas_rankings failed for {exchange_code}/{ranking_type}: {last_error}" + ) + async def get_overseas_balance(self, exchange_code: str) -> dict[str, Any]: """ Fetch overseas account balance. @@ -198,3 +257,11 @@ class OverseasBroker: "HSX": "VND", } return currency_map.get(exchange_code, "USD") + + def _extract_ranking_rows(self, data: dict[str, Any]) -> list[dict[str, Any]]: + """Extract list rows from ranking response across schema variants.""" + candidates = [data.get("output"), data.get("output1"), data.get("output2")] + for value in candidates: + if isinstance(value, list): + return [row for row in value if isinstance(row, dict)] + return [] diff --git a/src/config.py b/src/config.py index d5c1aa4..8516cb8 100644 --- a/src/config.py +++ b/src/config.py @@ -83,6 +83,18 @@ class Settings(BaseSettings): TELEGRAM_COMMANDS_ENABLED: bool = True TELEGRAM_POLLING_INTERVAL: float = 1.0 # seconds + # Overseas ranking API (KIS endpoint/TR_ID may vary by account/product) + # Override these from .env if your account uses different specs. + OVERSEAS_RANKING_ENABLED: bool = True + OVERSEAS_RANKING_FLUCT_TR_ID: str = "HHDFS76200100" + OVERSEAS_RANKING_VOLUME_TR_ID: str = "HHDFS76200200" + OVERSEAS_RANKING_FLUCT_PATH: str = ( + "/uapi/overseas-price/v1/quotations/inquire-updown-rank" + ) + OVERSEAS_RANKING_VOLUME_PATH: str = ( + "/uapi/overseas-price/v1/quotations/inquire-volume-rank" + ) + # Dashboard (optional) DASHBOARD_ENABLED: bool = False DASHBOARD_HOST: str = "127.0.0.1" diff --git a/src/db.py b/src/db.py index 619bb3c..d798a45 100644 --- a/src/db.py +++ b/src/db.py @@ -235,3 +235,21 @@ def get_open_position( if not row or row[0] != "BUY": return None return {"decision_id": row[1], "price": row[2], "quantity": row[3]} + + +def get_recent_symbols( + conn: sqlite3.Connection, market: str, limit: int = 30 +) -> list[str]: + """Return recent unique symbols for a market, newest first.""" + cursor = conn.execute( + """ + SELECT stock_code, MAX(timestamp) AS last_ts + FROM trades + WHERE market = ? + GROUP BY stock_code + ORDER BY last_ts DESC + LIMIT ? + """, + (market, limit), + ) + return [row[0] for row in cursor.fetchall() if row and row[0]] diff --git a/src/main.py b/src/main.py index eea2dc1..da6f658 100644 --- a/src/main.py +++ b/src/main.py @@ -29,7 +29,13 @@ from src.context.store import ContextStore from src.core.criticality import CriticalityAssessor from src.core.priority_queue import PriorityTaskQueue from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected, RiskManager -from src.db import get_latest_buy_trade, get_open_position, init_db, log_trade +from src.db import ( + get_latest_buy_trade, + get_open_position, + get_recent_symbols, + init_db, + log_trade, +) from src.evolution.daily_review import DailyReviewer from src.evolution.optimizer import EvolutionOptimizer from src.logging.decision_logger import DecisionLogger @@ -81,6 +87,67 @@ DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions +def _extract_symbol_from_holding(item: dict[str, Any]) -> str: + """Extract symbol from overseas holding payload variants.""" + for key in ( + "ovrs_pdno", + "pdno", + "ovrs_item_name", + "prdt_name", + "symb", + "symbol", + "stock_code", + ): + value = item.get(key) + if isinstance(value, str): + symbol = value.strip().upper() + if symbol and symbol.replace(".", "").replace("-", "").isalnum(): + return symbol + return "" + + +async def build_overseas_symbol_universe( + db_conn: Any, + overseas_broker: OverseasBroker, + market: MarketInfo, + active_stocks: dict[str, list[str]], +) -> list[str]: + """Build dynamic overseas symbol universe from runtime, DB, and holdings.""" + symbols: list[str] = [] + + # 1) Keep current active stocks first to avoid sudden churn between cycles. + symbols.extend(active_stocks.get(market.code, [])) + + # 2) Add recent symbols from own trading history (no fixed list). + symbols.extend(get_recent_symbols(db_conn, market.code, limit=30)) + + # 3) Add current overseas holdings from broker balance if available. + try: + balance_data = await overseas_broker.get_overseas_balance(market.exchange_code) + output1 = balance_data.get("output1", []) + if isinstance(output1, dict): + output1 = [output1] + if isinstance(output1, list): + for row in output1: + if not isinstance(row, dict): + continue + symbol = _extract_symbol_from_holding(row) + if symbol: + symbols.append(symbol) + except Exception as exc: + logger.warning("Failed to build overseas holdings universe for %s: %s", market.code, exc) + + seen: set[str] = set() + ordered_unique: list[str] = [] + for symbol in symbols: + normalized = symbol.strip().upper() + if not normalized or normalized in seen: + continue + seen.add(normalized) + ordered_unique.append(normalized) + return ordered_unique + + async def trading_cycle( broker: KISBroker, overseas_broker: OverseasBroker, @@ -482,8 +549,28 @@ async def run_daily_session( # Dynamic stock discovery via scanner (no static watchlists) candidates_list: list[ScanCandidate] = [] + fallback_stocks: list[str] | None = None + if not market.is_domestic: + fallback_stocks = await build_overseas_symbol_universe( + db_conn=db_conn, + overseas_broker=overseas_broker, + market=market, + active_stocks={}, + ) + if not fallback_stocks: + logger.warning( + "No dynamic overseas symbol universe for %s; scanner cannot run", + market.code, + ) try: - candidates_list = await smart_scanner.scan() if smart_scanner else [] + candidates_list = ( + await smart_scanner.scan( + market=market, + fallback_stocks=fallback_stocks, + ) + if smart_scanner + else [] + ) except Exception as exc: logger.error("Smart Scanner failed for %s: %s", market.name, exc) @@ -1263,6 +1350,7 @@ async def run(settings: Settings) -> None: # Initialize smart scanner (Python-first, AI-last pipeline) smart_scanner = SmartVolatilityScanner( broker=broker, + overseas_broker=overseas_broker, volatility_analyzer=volatility_analyzer, settings=settings, ) @@ -1442,7 +1530,25 @@ async def run(settings: Settings) -> None: try: logger.info("Smart Scanner: Scanning %s market", market.name) - candidates = await smart_scanner.scan() + fallback_stocks: list[str] | None = None + if not market.is_domestic: + fallback_stocks = await build_overseas_symbol_universe( + db_conn=db_conn, + overseas_broker=overseas_broker, + market=market, + active_stocks=active_stocks, + ) + if not fallback_stocks: + logger.warning( + "No dynamic overseas symbol universe for %s;" + " scanner cannot run", + market.code, + ) + + candidates = await smart_scanner.scan( + market=market, + fallback_stocks=fallback_stocks, + ) if candidates: # Use scanner results directly as trading candidates diff --git a/tests/test_smart_scanner.py b/tests/test_smart_scanner.py index fc380d7..03d2991 100644 --- a/tests/test_smart_scanner.py +++ b/tests/test_smart_scanner.py @@ -8,6 +8,7 @@ from unittest.mock import AsyncMock, MagicMock from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner from src.analysis.volatility import VolatilityAnalyzer from src.broker.kis_api import KISBroker +from src.broker.overseas import OverseasBroker from src.config import Settings @@ -43,11 +44,21 @@ def scanner(mock_broker: MagicMock, mock_settings: Settings) -> SmartVolatilityS analyzer = VolatilityAnalyzer() return SmartVolatilityScanner( broker=mock_broker, + overseas_broker=None, volatility_analyzer=analyzer, settings=mock_settings, ) +@pytest.fixture +def mock_overseas_broker() -> MagicMock: + """Create mock overseas broker.""" + broker = MagicMock(spec=OverseasBroker) + broker.get_overseas_price = AsyncMock() + broker.fetch_overseas_rankings = AsyncMock(return_value=[]) + return broker + + class TestSmartVolatilityScanner: """Test suite for SmartVolatilityScanner.""" @@ -323,6 +334,90 @@ class TestSmartVolatilityScanner: assert codes == ["005930", "035420"] + @pytest.mark.asyncio + async def test_scan_overseas_uses_dynamic_symbols( + self, mock_broker: MagicMock, mock_overseas_broker: MagicMock, mock_settings: Settings + ) -> None: + """Overseas scan should use provided dynamic universe symbols.""" + analyzer = VolatilityAnalyzer() + scanner = SmartVolatilityScanner( + broker=mock_broker, + overseas_broker=mock_overseas_broker, + volatility_analyzer=analyzer, + settings=mock_settings, + ) + + market = MagicMock() + market.name = "NASDAQ" + market.code = "US_NASDAQ" + market.exchange_code = "NASD" + market.is_domestic = False + + mock_overseas_broker.get_overseas_price.side_effect = [ + {"output": {"last": "210.5", "rate": "1.6", "tvol": "1500000"}}, + {"output": {"last": "330.1", "rate": "0.2", "tvol": "900000"}}, + ] + + candidates = await scanner.scan( + market=market, + fallback_stocks=["AAPL", "MSFT"], + ) + + assert [c.stock_code for c in candidates] == ["AAPL"] + assert candidates[0].signal == "momentum" + assert candidates[0].price == 210.5 + + @pytest.mark.asyncio + async def test_scan_overseas_uses_ranking_api_first( + self, mock_broker: MagicMock, mock_overseas_broker: MagicMock, mock_settings: Settings + ) -> None: + """Overseas scan should prioritize ranking API when available.""" + analyzer = VolatilityAnalyzer() + scanner = SmartVolatilityScanner( + broker=mock_broker, + overseas_broker=mock_overseas_broker, + volatility_analyzer=analyzer, + settings=mock_settings, + ) + market = MagicMock() + market.name = "NASDAQ" + market.code = "US_NASDAQ" + market.exchange_code = "NASD" + market.is_domestic = False + + mock_overseas_broker.fetch_overseas_rankings.return_value = [ + {"symb": "NVDA", "last": "780.2", "rate": "2.4", "tvol": "1200000"}, + {"symb": "MSFT", "last": "420.0", "rate": "0.3", "tvol": "900000"}, + ] + + candidates = await scanner.scan(market=market, fallback_stocks=["AAPL", "TSLA"]) + + mock_overseas_broker.fetch_overseas_rankings.assert_called_once() + mock_overseas_broker.get_overseas_price.assert_not_called() + assert [c.stock_code for c in candidates] == ["NVDA"] + + @pytest.mark.asyncio + async def test_scan_overseas_without_symbols_returns_empty( + self, mock_broker: MagicMock, mock_overseas_broker: MagicMock, mock_settings: Settings + ) -> None: + """Overseas scan should return empty list when no symbol universe exists.""" + analyzer = VolatilityAnalyzer() + scanner = SmartVolatilityScanner( + broker=mock_broker, + overseas_broker=mock_overseas_broker, + volatility_analyzer=analyzer, + settings=mock_settings, + ) + market = MagicMock() + market.name = "NASDAQ" + market.code = "US_NASDAQ" + market.exchange_code = "NASD" + market.is_domestic = False + + candidates = await scanner.scan(market=market, fallback_stocks=[]) + + assert candidates == [] + class TestRSICalculation: """Test RSI calculation in VolatilityAnalyzer."""