feat: add overseas ranking integration with dynamic fallback
This commit is contained in:
@@ -12,7 +12,9 @@ from typing import Any
|
||||
|
||||
from src.analysis.volatility import VolatilityAnalyzer
|
||||
from src.broker.kis_api import KISBroker
|
||||
from src.broker.overseas import OverseasBroker
|
||||
from src.config import Settings
|
||||
from src.markets.schedule import MarketInfo
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -45,6 +47,7 @@ class SmartVolatilityScanner:
|
||||
def __init__(
|
||||
self,
|
||||
broker: KISBroker,
|
||||
overseas_broker: OverseasBroker | None,
|
||||
volatility_analyzer: VolatilityAnalyzer,
|
||||
settings: Settings,
|
||||
) -> None:
|
||||
@@ -56,6 +59,7 @@ class SmartVolatilityScanner:
|
||||
settings: Application settings
|
||||
"""
|
||||
self.broker = broker
|
||||
self.overseas_broker = overseas_broker
|
||||
self.analyzer = volatility_analyzer
|
||||
self.settings = settings
|
||||
|
||||
@@ -67,16 +71,28 @@ class SmartVolatilityScanner:
|
||||
|
||||
async def scan(
|
||||
self,
|
||||
market: MarketInfo | None = None,
|
||||
fallback_stocks: list[str] | None = None,
|
||||
) -> list[ScanCandidate]:
|
||||
"""Execute smart scan and return qualified candidates.
|
||||
|
||||
Args:
|
||||
market: Target market info (domestic vs overseas behavior)
|
||||
fallback_stocks: Stock codes to use if ranking API fails
|
||||
|
||||
Returns:
|
||||
List of ScanCandidate, sorted by score, up to top_n items
|
||||
"""
|
||||
if market and not market.is_domestic:
|
||||
return await self._scan_overseas(market, fallback_stocks)
|
||||
|
||||
return await self._scan_domestic(fallback_stocks)
|
||||
|
||||
async def _scan_domestic(
|
||||
self,
|
||||
fallback_stocks: list[str] | None = None,
|
||||
) -> list[ScanCandidate]:
|
||||
"""Scan domestic market using ranking API + RSI/volume filters."""
|
||||
# Step 1: Fetch rankings
|
||||
try:
|
||||
rankings = await self.broker.fetch_market_rankings(
|
||||
@@ -180,6 +196,157 @@ class SmartVolatilityScanner:
|
||||
candidates.sort(key=lambda c: c.score, reverse=True)
|
||||
return candidates[: self.top_n]
|
||||
|
||||
async def _scan_overseas(
|
||||
self,
|
||||
market: MarketInfo,
|
||||
fallback_stocks: list[str] | None = None,
|
||||
) -> list[ScanCandidate]:
|
||||
"""Scan overseas symbols using ranking API first, then fallback universe."""
|
||||
if self.overseas_broker is None:
|
||||
logger.warning(
|
||||
"Overseas scanner unavailable for %s: overseas broker not configured",
|
||||
market.name,
|
||||
)
|
||||
return []
|
||||
|
||||
candidates = await self._scan_overseas_from_rankings(market)
|
||||
if not candidates:
|
||||
candidates = await self._scan_overseas_from_symbols(market, fallback_stocks)
|
||||
|
||||
candidates.sort(key=lambda c: c.score, reverse=True)
|
||||
return candidates[: self.top_n]
|
||||
|
||||
async def _scan_overseas_from_rankings(
|
||||
self,
|
||||
market: MarketInfo,
|
||||
) -> list[ScanCandidate]:
|
||||
"""Build overseas candidates from ranking APIs."""
|
||||
assert self.overseas_broker is not None
|
||||
try:
|
||||
fluct_rows = await self.overseas_broker.fetch_overseas_rankings(
|
||||
exchange_code=market.exchange_code,
|
||||
ranking_type="fluctuation",
|
||||
limit=50,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Overseas fluctuation ranking failed for %s: %s", market.code, exc
|
||||
)
|
||||
fluct_rows = []
|
||||
|
||||
if not fluct_rows:
|
||||
return []
|
||||
|
||||
candidates: list[ScanCandidate] = []
|
||||
for row in fluct_rows:
|
||||
stock_code = (
|
||||
str(
|
||||
row.get("symb")
|
||||
or row.get("ovrs_pdno")
|
||||
or row.get("stock_code")
|
||||
or row.get("pdno")
|
||||
or ""
|
||||
)
|
||||
.strip()
|
||||
.upper()
|
||||
)
|
||||
if not stock_code:
|
||||
continue
|
||||
|
||||
price = _safe_float(
|
||||
row.get("last")
|
||||
or row.get("ovrs_nmix_prpr")
|
||||
or row.get("stck_prpr")
|
||||
)
|
||||
change_rate = _safe_float(
|
||||
row.get("rate")
|
||||
or row.get("prdy_ctrt")
|
||||
or row.get("evlu_pfls_rt")
|
||||
or row.get("chg_rt")
|
||||
)
|
||||
volume = _safe_float(row.get("tvol") or row.get("acml_vol") or row.get("vol"))
|
||||
|
||||
if price <= 0 or abs(change_rate) < 0.8:
|
||||
continue
|
||||
|
||||
score = min(abs(change_rate) / 8.0, 1.0) * 100.0
|
||||
signal = "momentum" if change_rate >= 0 else "oversold"
|
||||
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0)))
|
||||
candidates.append(
|
||||
ScanCandidate(
|
||||
stock_code=stock_code,
|
||||
name=str(row.get("name") or row.get("ovrs_item_name") or stock_code),
|
||||
price=price,
|
||||
volume=volume,
|
||||
volume_ratio=max(1.0, abs(change_rate) / 2.0),
|
||||
rsi=implied_rsi,
|
||||
signal=signal,
|
||||
score=score,
|
||||
)
|
||||
)
|
||||
|
||||
if candidates:
|
||||
logger.info(
|
||||
"Overseas ranking scan found %d candidates for %s",
|
||||
len(candidates),
|
||||
market.name,
|
||||
)
|
||||
return candidates
|
||||
|
||||
async def _scan_overseas_from_symbols(
|
||||
self,
|
||||
market: MarketInfo,
|
||||
symbols: list[str] | None,
|
||||
) -> list[ScanCandidate]:
|
||||
"""Fallback overseas scan from dynamic symbol universe."""
|
||||
assert self.overseas_broker is not None
|
||||
if not symbols:
|
||||
logger.info("Overseas scanner: no symbol universe for %s", market.name)
|
||||
return []
|
||||
|
||||
candidates: list[ScanCandidate] = []
|
||||
for stock_code in symbols:
|
||||
try:
|
||||
price_data = await self.overseas_broker.get_overseas_price(
|
||||
market.exchange_code, stock_code
|
||||
)
|
||||
output = price_data.get("output", {})
|
||||
price = _safe_float(
|
||||
output.get("last")
|
||||
or output.get("ovrs_nmix_prpr")
|
||||
or output.get("stck_prpr")
|
||||
)
|
||||
change_rate = _safe_float(
|
||||
output.get("rate")
|
||||
or output.get("prdy_ctrt")
|
||||
or output.get("evlu_pfls_rt")
|
||||
)
|
||||
volume = _safe_float(output.get("tvol") or output.get("acml_vol"))
|
||||
|
||||
if price <= 0 or abs(change_rate) < 0.8:
|
||||
continue
|
||||
|
||||
score = min(abs(change_rate) / 8.0, 1.0) * 100.0
|
||||
signal = "momentum" if change_rate >= 0 else "oversold"
|
||||
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0)))
|
||||
candidates.append(
|
||||
ScanCandidate(
|
||||
stock_code=stock_code,
|
||||
name=stock_code,
|
||||
price=price,
|
||||
volume=volume,
|
||||
volume_ratio=max(1.0, abs(change_rate) / 2.0),
|
||||
rsi=implied_rsi,
|
||||
signal=signal,
|
||||
score=score,
|
||||
)
|
||||
)
|
||||
except ConnectionError as exc:
|
||||
logger.warning("Failed to analyze overseas %s: %s", stock_code, exc)
|
||||
except Exception as exc:
|
||||
logger.error("Unexpected error analyzing overseas %s: %s", stock_code, exc)
|
||||
return candidates
|
||||
|
||||
def get_stock_codes(self, candidates: list[ScanCandidate]) -> list[str]:
|
||||
"""Extract stock codes from candidates for watchlist update.
|
||||
|
||||
@@ -190,3 +357,13 @@ class SmartVolatilityScanner:
|
||||
List of stock codes
|
||||
"""
|
||||
return [c.stock_code for c in candidates]
|
||||
|
||||
|
||||
def _safe_float(value: Any, default: float = 0.0) -> float:
|
||||
"""Convert arbitrary values to float safely."""
|
||||
if value in (None, ""):
|
||||
return default
|
||||
try:
|
||||
return float(value)
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
|
||||
@@ -64,6 +64,65 @@ class OverseasBroker:
|
||||
f"Network error fetching overseas price: {exc}"
|
||||
) from exc
|
||||
|
||||
async def fetch_overseas_rankings(
|
||||
self,
|
||||
exchange_code: str,
|
||||
ranking_type: str = "fluctuation",
|
||||
limit: int = 30,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Fetch overseas rankings (price change or volume amount).
|
||||
|
||||
Ranking API specs may differ by account/product. Endpoint paths and
|
||||
TR_IDs are configurable via settings and can be overridden in .env.
|
||||
"""
|
||||
if not self._broker._settings.OVERSEAS_RANKING_ENABLED:
|
||||
return []
|
||||
|
||||
await self._broker._rate_limiter.acquire()
|
||||
session = self._broker._get_session()
|
||||
|
||||
if ranking_type == "volume":
|
||||
tr_id = self._broker._settings.OVERSEAS_RANKING_VOLUME_TR_ID
|
||||
path = self._broker._settings.OVERSEAS_RANKING_VOLUME_PATH
|
||||
else:
|
||||
tr_id = self._broker._settings.OVERSEAS_RANKING_FLUCT_TR_ID
|
||||
path = self._broker._settings.OVERSEAS_RANKING_FLUCT_PATH
|
||||
|
||||
headers = await self._broker._auth_headers(tr_id)
|
||||
url = f"{self._broker._base_url}{path}"
|
||||
|
||||
# Try common param variants used by KIS overseas quotation APIs.
|
||||
param_variants = [
|
||||
{"AUTH": "", "EXCD": exchange_code, "NREC": str(max(limit, 30))},
|
||||
{"AUTH": "", "OVRS_EXCG_CD": exchange_code, "NREC": str(max(limit, 30))},
|
||||
{"AUTH": "", "EXCD": exchange_code},
|
||||
{"AUTH": "", "OVRS_EXCG_CD": exchange_code},
|
||||
]
|
||||
|
||||
last_error: str | None = None
|
||||
for params in param_variants:
|
||||
try:
|
||||
async with session.get(url, headers=headers, params=params) as resp:
|
||||
text = await resp.text()
|
||||
if resp.status != 200:
|
||||
last_error = f"HTTP {resp.status}: {text}"
|
||||
continue
|
||||
|
||||
data = await resp.json()
|
||||
rows = self._extract_ranking_rows(data)
|
||||
if rows:
|
||||
return rows[:limit]
|
||||
|
||||
# keep trying another param variant if response has no usable rows
|
||||
last_error = f"empty output (keys={list(data.keys())})"
|
||||
except (TimeoutError, aiohttp.ClientError) as exc:
|
||||
last_error = str(exc)
|
||||
continue
|
||||
|
||||
raise ConnectionError(
|
||||
f"fetch_overseas_rankings failed for {exchange_code}/{ranking_type}: {last_error}"
|
||||
)
|
||||
|
||||
async def get_overseas_balance(self, exchange_code: str) -> dict[str, Any]:
|
||||
"""
|
||||
Fetch overseas account balance.
|
||||
@@ -198,3 +257,11 @@ class OverseasBroker:
|
||||
"HSX": "VND",
|
||||
}
|
||||
return currency_map.get(exchange_code, "USD")
|
||||
|
||||
def _extract_ranking_rows(self, data: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
"""Extract list rows from ranking response across schema variants."""
|
||||
candidates = [data.get("output"), data.get("output1"), data.get("output2")]
|
||||
for value in candidates:
|
||||
if isinstance(value, list):
|
||||
return [row for row in value if isinstance(row, dict)]
|
||||
return []
|
||||
|
||||
@@ -83,6 +83,18 @@ class Settings(BaseSettings):
|
||||
TELEGRAM_COMMANDS_ENABLED: bool = True
|
||||
TELEGRAM_POLLING_INTERVAL: float = 1.0 # seconds
|
||||
|
||||
# Overseas ranking API (KIS endpoint/TR_ID may vary by account/product)
|
||||
# Override these from .env if your account uses different specs.
|
||||
OVERSEAS_RANKING_ENABLED: bool = True
|
||||
OVERSEAS_RANKING_FLUCT_TR_ID: str = "HHDFS76200100"
|
||||
OVERSEAS_RANKING_VOLUME_TR_ID: str = "HHDFS76200200"
|
||||
OVERSEAS_RANKING_FLUCT_PATH: str = (
|
||||
"/uapi/overseas-price/v1/quotations/inquire-updown-rank"
|
||||
)
|
||||
OVERSEAS_RANKING_VOLUME_PATH: str = (
|
||||
"/uapi/overseas-price/v1/quotations/inquire-volume-rank"
|
||||
)
|
||||
|
||||
# Dashboard (optional)
|
||||
DASHBOARD_ENABLED: bool = False
|
||||
DASHBOARD_HOST: str = "127.0.0.1"
|
||||
|
||||
18
src/db.py
18
src/db.py
@@ -235,3 +235,21 @@ def get_open_position(
|
||||
if not row or row[0] != "BUY":
|
||||
return None
|
||||
return {"decision_id": row[1], "price": row[2], "quantity": row[3]}
|
||||
|
||||
|
||||
def get_recent_symbols(
|
||||
conn: sqlite3.Connection, market: str, limit: int = 30
|
||||
) -> list[str]:
|
||||
"""Return recent unique symbols for a market, newest first."""
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
SELECT stock_code, MAX(timestamp) AS last_ts
|
||||
FROM trades
|
||||
WHERE market = ?
|
||||
GROUP BY stock_code
|
||||
ORDER BY last_ts DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(market, limit),
|
||||
)
|
||||
return [row[0] for row in cursor.fetchall() if row and row[0]]
|
||||
|
||||
112
src/main.py
112
src/main.py
@@ -29,7 +29,13 @@ from src.context.store import ContextStore
|
||||
from src.core.criticality import CriticalityAssessor
|
||||
from src.core.priority_queue import PriorityTaskQueue
|
||||
from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected, RiskManager
|
||||
from src.db import get_latest_buy_trade, get_open_position, init_db, log_trade
|
||||
from src.db import (
|
||||
get_latest_buy_trade,
|
||||
get_open_position,
|
||||
get_recent_symbols,
|
||||
init_db,
|
||||
log_trade,
|
||||
)
|
||||
from src.evolution.daily_review import DailyReviewer
|
||||
from src.evolution.optimizer import EvolutionOptimizer
|
||||
from src.logging.decision_logger import DecisionLogger
|
||||
@@ -81,6 +87,67 @@ DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day
|
||||
TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions
|
||||
|
||||
|
||||
def _extract_symbol_from_holding(item: dict[str, Any]) -> str:
|
||||
"""Extract symbol from overseas holding payload variants."""
|
||||
for key in (
|
||||
"ovrs_pdno",
|
||||
"pdno",
|
||||
"ovrs_item_name",
|
||||
"prdt_name",
|
||||
"symb",
|
||||
"symbol",
|
||||
"stock_code",
|
||||
):
|
||||
value = item.get(key)
|
||||
if isinstance(value, str):
|
||||
symbol = value.strip().upper()
|
||||
if symbol and symbol.replace(".", "").replace("-", "").isalnum():
|
||||
return symbol
|
||||
return ""
|
||||
|
||||
|
||||
async def build_overseas_symbol_universe(
|
||||
db_conn: Any,
|
||||
overseas_broker: OverseasBroker,
|
||||
market: MarketInfo,
|
||||
active_stocks: dict[str, list[str]],
|
||||
) -> list[str]:
|
||||
"""Build dynamic overseas symbol universe from runtime, DB, and holdings."""
|
||||
symbols: list[str] = []
|
||||
|
||||
# 1) Keep current active stocks first to avoid sudden churn between cycles.
|
||||
symbols.extend(active_stocks.get(market.code, []))
|
||||
|
||||
# 2) Add recent symbols from own trading history (no fixed list).
|
||||
symbols.extend(get_recent_symbols(db_conn, market.code, limit=30))
|
||||
|
||||
# 3) Add current overseas holdings from broker balance if available.
|
||||
try:
|
||||
balance_data = await overseas_broker.get_overseas_balance(market.exchange_code)
|
||||
output1 = balance_data.get("output1", [])
|
||||
if isinstance(output1, dict):
|
||||
output1 = [output1]
|
||||
if isinstance(output1, list):
|
||||
for row in output1:
|
||||
if not isinstance(row, dict):
|
||||
continue
|
||||
symbol = _extract_symbol_from_holding(row)
|
||||
if symbol:
|
||||
symbols.append(symbol)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to build overseas holdings universe for %s: %s", market.code, exc)
|
||||
|
||||
seen: set[str] = set()
|
||||
ordered_unique: list[str] = []
|
||||
for symbol in symbols:
|
||||
normalized = symbol.strip().upper()
|
||||
if not normalized or normalized in seen:
|
||||
continue
|
||||
seen.add(normalized)
|
||||
ordered_unique.append(normalized)
|
||||
return ordered_unique
|
||||
|
||||
|
||||
async def trading_cycle(
|
||||
broker: KISBroker,
|
||||
overseas_broker: OverseasBroker,
|
||||
@@ -482,8 +549,28 @@ async def run_daily_session(
|
||||
|
||||
# Dynamic stock discovery via scanner (no static watchlists)
|
||||
candidates_list: list[ScanCandidate] = []
|
||||
fallback_stocks: list[str] | None = None
|
||||
if not market.is_domestic:
|
||||
fallback_stocks = await build_overseas_symbol_universe(
|
||||
db_conn=db_conn,
|
||||
overseas_broker=overseas_broker,
|
||||
market=market,
|
||||
active_stocks={},
|
||||
)
|
||||
if not fallback_stocks:
|
||||
logger.warning(
|
||||
"No dynamic overseas symbol universe for %s; scanner cannot run",
|
||||
market.code,
|
||||
)
|
||||
try:
|
||||
candidates_list = await smart_scanner.scan() if smart_scanner else []
|
||||
candidates_list = (
|
||||
await smart_scanner.scan(
|
||||
market=market,
|
||||
fallback_stocks=fallback_stocks,
|
||||
)
|
||||
if smart_scanner
|
||||
else []
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.error("Smart Scanner failed for %s: %s", market.name, exc)
|
||||
|
||||
@@ -1263,6 +1350,7 @@ async def run(settings: Settings) -> None:
|
||||
# Initialize smart scanner (Python-first, AI-last pipeline)
|
||||
smart_scanner = SmartVolatilityScanner(
|
||||
broker=broker,
|
||||
overseas_broker=overseas_broker,
|
||||
volatility_analyzer=volatility_analyzer,
|
||||
settings=settings,
|
||||
)
|
||||
@@ -1442,7 +1530,25 @@ async def run(settings: Settings) -> None:
|
||||
try:
|
||||
logger.info("Smart Scanner: Scanning %s market", market.name)
|
||||
|
||||
candidates = await smart_scanner.scan()
|
||||
fallback_stocks: list[str] | None = None
|
||||
if not market.is_domestic:
|
||||
fallback_stocks = await build_overseas_symbol_universe(
|
||||
db_conn=db_conn,
|
||||
overseas_broker=overseas_broker,
|
||||
market=market,
|
||||
active_stocks=active_stocks,
|
||||
)
|
||||
if not fallback_stocks:
|
||||
logger.warning(
|
||||
"No dynamic overseas symbol universe for %s;"
|
||||
" scanner cannot run",
|
||||
market.code,
|
||||
)
|
||||
|
||||
candidates = await smart_scanner.scan(
|
||||
market=market,
|
||||
fallback_stocks=fallback_stocks,
|
||||
)
|
||||
|
||||
if candidates:
|
||||
# Use scanner results directly as trading candidates
|
||||
|
||||
Reference in New Issue
Block a user