Compare commits

..

14 Commits

Author SHA1 Message Date
agentson
09e6eef3bf feat: unify domestic scanner and sizing; update docs
Some checks failed
CI / test (pull_request) Has been cancelled
2026-02-17 06:15:20 +09:00
agentson
10b15a4563 docs: reflect overseas ranking integration and volatility-first selection 2026-02-17 05:57:56 +09:00
agentson
a6693560c1 feat: prioritize overseas volatility scoring over raw rankings 2026-02-17 05:54:46 +09:00
agentson
16bb8b6dc6 feat: add overseas ranking integration with dynamic fallback 2026-02-17 05:50:10 +09:00
0424c78f6c Merge pull request 'feat: US market code 정합성, Telegram 명령 4종, 손절 모니터링 (#132)' (#135) from feature/issue-132-us-market-telegram-gaps into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #135
2026-02-16 20:25:43 +09:00
agentson
3fdb7a29d4 feat: US market code 정합성, Telegram 명령 4종, 손절 모니터링 (#132)
Some checks failed
CI / test (pull_request) Has been cancelled
- MARKET_SHORTHAND + expand_market_codes()로 config "US" → schedule "US_NASDAQ/NYSE/AMEX" 자동 확장
- /report, /scenarios, /review, /dashboard 텔레그램 명령 추가
- price_change_pct를 trading_cycle과 run_daily_session에 주입
- HOLD시 get_open_position 기반 손절 모니터링 및 자동 SELL 오버라이드
- 대시보드 /api/status 동적 market 조회로 변경

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 20:24:01 +09:00
31b4d0bf1e Merge pull request 'fix: daily_review 테스트 날짜 불일치 수정 (#129)' (#130) from feature/issue-129-fix-daily-review-test-date into main
Some checks failed
CI / test (push) Has been cancelled
CI / test (pull_request) Has been cancelled
Reviewed-on: #130
2026-02-16 11:30:20 +09:00
agentson
e2275a23b1 fix: daily_review 테스트에서 날짜 불일치로 인한 실패 수정 (#129)
Some checks failed
CI / test (pull_request) Has been cancelled
DecisionLogger와 log_trade가 datetime.now(UTC)로 현재 날짜를 저장하는데,
테스트에서 하드코딩된 '2026-02-14'로 조회하여 0건이 반환되던 문제 수정.
generate_scorecard 호출 시 TODAY 변수를 사용하도록 변경.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 10:05:17 +09:00
7522bb7e66 Merge pull request 'feat: 대시보드 실행 통합 - CLI + 환경변수 (issue #97)' (#128) from feature/issue-97-dashboard-integration into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #128
2026-02-15 00:01:57 +09:00
agentson
63fa6841a2 feat: dashboard background thread with CLI flag (issue #97)
Some checks failed
CI / test (pull_request) Has been cancelled
Add --dashboard CLI flag and DASHBOARD_ENABLED env var to start
FastAPI dashboard in a daemon thread alongside the trading loop.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 00:01:29 +09:00
ece3c5597b Merge pull request 'feat: FastAPI 읽기 전용 대시보드 (issue #96)' (#127) from feature/issue-96-evolution-main-integration into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #127
2026-02-14 23:57:17 +09:00
agentson
63f4e49d88 feat: read-only FastAPI dashboard with 7 API endpoints (issue #96)
Some checks failed
CI / test (pull_request) Has been cancelled
Add observability dashboard: status, playbook, scorecard, performance,
context browser, decisions, and active scenarios endpoints.
SQLite read-only on separate connections from trading loop.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 23:56:10 +09:00
agentson
e0a6b307a2 fix: add error handling to evolution loop telegram notification
Wrap evolution notification in try/except so telegram failures don't
crash the evolution loop. Add integration tests for market close flow.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 23:56:04 +09:00
75320eb587 Merge pull request 'feat: 전략 진화 루프 연결 (issue #95)' (#126) from feature/issue-95-evolution-loop into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #126
2026-02-14 23:42:30 +09:00
19 changed files with 2378 additions and 288 deletions

View File

@@ -68,6 +68,10 @@ High-frequency trading with individual stock analysis:
- `fetch_market_rankings()` — Fetch volume surge rankings from KIS API - `fetch_market_rankings()` — Fetch volume surge rankings from KIS API
- `get_daily_prices()` — Fetch OHLCV history for technical analysis - `get_daily_prices()` — Fetch OHLCV history for technical analysis
**Overseas Ranking API Methods** (added in v0.10.x):
- `fetch_overseas_rankings()` — Fetch overseas ranking universe (fluctuation / volume)
- Ranking endpoint paths and TR_IDs are configurable via environment variables
### 2. Analysis (`src/analysis/`) ### 2. Analysis (`src/analysis/`)
**VolatilityAnalyzer** (`volatility.py`) — Technical indicator calculations **VolatilityAnalyzer** (`volatility.py`) — Technical indicator calculations
@@ -81,20 +85,24 @@ High-frequency trading with individual stock analysis:
**SmartVolatilityScanner** (`smart_scanner.py`) — Python-first filtering pipeline **SmartVolatilityScanner** (`smart_scanner.py`) — Python-first filtering pipeline
- **Step 1**: Fetch volume rankings from KIS API (top 30 stocks) - **Domestic (KR)**:
- **Step 2**: Calculate RSI and volume ratio for each stock - **Step 1**: Fetch domestic fluctuation ranking as primary universe
- **Step 3**: Apply filters: - **Step 2**: Fetch domestic volume ranking for liquidity bonus
- Volume ratio >= `VOL_MULTIPLIER` (default 2.0x previous day) - **Step 3**: Compute volatility-first score (max of daily change% and intraday range%)
- RSI < `RSI_OVERSOLD_THRESHOLD` (30) OR RSI > `RSI_MOMENTUM_THRESHOLD` (70) - **Step 4**: Apply liquidity bonus and return top N candidates
- **Step 4**: Score candidates by RSI extremity (60%) + volume surge (40%) - **Overseas (US/JP/HK/CN/VN)**:
- **Step 5**: Return top N candidates (default 3) for AI analysis - **Step 1**: Fetch overseas ranking universe (fluctuation rank + volume rank bonus)
- **Fallback**: Uses static watchlist if ranking API unavailable - **Step 2**: Compute volatility-first score (max of daily change% and intraday range%)
- **Step 3**: Apply liquidity bonus from volume ranking
- **Step 4**: Return top N candidates (default 3)
- **Fallback (overseas only)**: If ranking API is unavailable, uses dynamic universe
from runtime active symbols + recent traded symbols + current holdings (no static watchlist)
- **Realtime mode only**: Daily mode uses batch processing for API efficiency - **Realtime mode only**: Daily mode uses batch processing for API efficiency
**Benefits:** **Benefits:**
- Reduces Gemini API calls from 20-30 stocks to 1-3 qualified candidates - Reduces Gemini API calls from 20-30 stocks to 1-3 qualified candidates
- Fast Python-based filtering before expensive AI judgment - Fast Python-based filtering before expensive AI judgment
- Logs selection context (RSI, volume_ratio, signal, score) for Evolution system - Logs selection context (RSI-compatible proxy, volume_ratio, signal, score) for Evolution system
### 3. Brain (`src/brain/gemini_client.py`) ### 3. Brain (`src/brain/gemini_client.py`)
@@ -167,10 +175,12 @@ High-frequency trading with individual stock analysis:
┌──────────────────────────────────┐ ┌──────────────────────────────────┐
│ Smart Scanner (Python-first) │ │ Smart Scanner (Python-first) │
│ - Fetch volume rankings (KIS) │ - Domestic: fluctuation rank
- Get 20d price history per stock + volume rank bonus
- Calculate RSI(14) + vol ratio + volatility-first scoring
│ - Filter: vol>2x AND RSI extreme │ - Overseas: ranking universe
│ + volatility-first scoring │
│ - Fallback: dynamic universe │
│ - Return top 3 qualified stocks │ │ - Return top 3 qualified stocks │
└──────────────────┬────────────────┘ └──────────────────┬────────────────┘
@@ -303,10 +313,23 @@ TELEGRAM_CHAT_ID=123456789
TELEGRAM_ENABLED=true TELEGRAM_ENABLED=true
# Smart Scanner (optional, realtime mode only) # Smart Scanner (optional, realtime mode only)
RSI_OVERSOLD_THRESHOLD=30 # 0-50, oversold threshold
RSI_MOMENTUM_THRESHOLD=70 # 50-100, momentum threshold
VOL_MULTIPLIER=2.0 # Minimum volume ratio (2.0 = 200%)
SCANNER_TOP_N=3 # Max qualified candidates per scan SCANNER_TOP_N=3 # Max qualified candidates per scan
POSITION_SIZING_ENABLED=true
POSITION_BASE_ALLOCATION_PCT=5.0
POSITION_MIN_ALLOCATION_PCT=1.0
POSITION_MAX_ALLOCATION_PCT=10.0
POSITION_VOLATILITY_TARGET_SCORE=50.0
# Legacy/compat scanner thresholds (kept for backward compatibility)
RSI_OVERSOLD_THRESHOLD=30
RSI_MOMENTUM_THRESHOLD=70
VOL_MULTIPLIER=2.0
# Overseas Ranking API (optional override; account-dependent)
OVERSEAS_RANKING_ENABLED=true
OVERSEAS_RANKING_FLUCT_TR_ID=HHDFS76200100
OVERSEAS_RANKING_VOLUME_TR_ID=HHDFS76200200
OVERSEAS_RANKING_FLUCT_PATH=/uapi/overseas-price/v1/quotations/inquire-updown-rank
OVERSEAS_RANKING_VOLUME_PATH=/uapi/overseas-price/v1/quotations/inquire-volume-rank
``` ```
Tests use in-memory SQLite (`DB_PATH=":memory:"`) and dummy credentials via `tests/conftest.py`. Tests use in-memory SQLite (`DB_PATH=":memory:"`) and dummy credentials via `tests/conftest.py`.

View File

@@ -86,3 +86,61 @@
- Plan Consistency (필수), Safety & Constraints, Quality, Workflow 4개 카테고리 - Plan Consistency (필수), Safety & Constraints, Quality, Workflow 4개 카테고리
**이슈/PR:** #114 **이슈/PR:** #114
---
## 2026-02-16
### 해외 스캐너 개선: 랭킹 연동 + 변동성 우선 선별
**배경:**
- `run_overnight` 실운영에서 미국장 동안 거래가 0건 지속
- 원인: 해외 시장에서도 국내 랭킹/일봉 API 경로를 사용하던 구조적 불일치
**요구사항:**
1. 해외 시장도 랭킹 API 기반 유니버스 탐색 지원
2. 단순 상승률/거래대금 상위가 아니라, **변동성이 큰 종목**을 우선 선별
3. 고정 티커 fallback 금지
**구현 결과:**
- `src/broker/overseas.py`
- `fetch_overseas_rankings()` 추가 (fluctuation / volume)
- 해외 랭킹 API 경로/TR_ID를 설정값으로 오버라이드 가능하게 구현
- `src/analysis/smart_scanner.py`
- market-aware 스캔(국내/해외 분리)
- 해외: 랭킹 API 유니버스 + 변동성 우선 점수(일변동률 vs 장중 고저폭)
- 거래대금/거래량 랭킹은 유동성 보정 점수로 활용
- 랭킹 실패 시에는 동적 유니버스(active/recent/holdings)만 사용
- `src/config.py`
- `OVERSEAS_RANKING_*` 설정 추가
**효과:**
- 해외 시장에서 스캐너 후보 0개로 정지되는 상황 완화
- 종목 선정 기준이 단순 상승률 중심에서 변동성 중심으로 개선
- 고정 티커 없이도 시장 주도 변동 종목 탐지 가능
### 국내 스캐너/주문수량 정렬: 변동성 우선 + 리스크 타기팅
**배경:**
- 해외만 변동성 우선으로 동작하고, 국내는 RSI/거래량 필터 중심으로 동작해 시장 간 전략 일관성이 낮았음
- 매수 수량이 고정 1주라서 변동성 구간별 익스포저 관리가 어려웠음
**요구사항:**
1. 국내 스캐너도 변동성 우선 선별로 해외와 통일
2. 고변동 종목일수록 포지션 크기를 줄이는 수량 산식 적용
**구현 결과:**
- `src/analysis/smart_scanner.py`
- 국내: `fluctuation ranking + volume ranking bonus` 기반 점수화로 전환
- 점수는 `max(abs(change_rate), intraday_range_pct)` 중심으로 계산
- 국내 랭킹 응답 스키마 키(`price`, `change_rate`, `volume`) 파싱 보강
- `src/main.py`
- `_determine_order_quantity()` 추가
- BUY 시 변동성 점수 기반 동적 수량 산정 적용
- `trading_cycle`, `run_daily_session` 경로 모두 동일 수량 로직 사용
- `src/config.py`
- `POSITION_SIZING_*` 설정 추가
**효과:**
- 국내/해외 스캐너 기준이 변동성 중심으로 일관화
- 고변동 구간에서 자동 익스포저 축소, 저변동 구간에서 과소진입 완화

View File

@@ -9,6 +9,8 @@ dependencies = [
"pydantic-settings>=2.1,<3", "pydantic-settings>=2.1,<3",
"google-genai>=1.0,<2", "google-genai>=1.0,<2",
"scipy>=1.11,<2", "scipy>=1.11,<2",
"fastapi>=0.110,<1",
"uvicorn>=0.29,<1",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View File

@@ -1,8 +1,4 @@
"""Smart Volatility Scanner with RSI and volume filters. """Smart Volatility Scanner with volatility-first market ranking logic."""
Fetches market rankings from KIS API and applies technical filters
to identify high-probability trading candidates.
"""
from __future__ import annotations from __future__ import annotations
@@ -12,7 +8,9 @@ from typing import Any
from src.analysis.volatility import VolatilityAnalyzer from src.analysis.volatility import VolatilityAnalyzer
from src.broker.kis_api import KISBroker from src.broker.kis_api import KISBroker
from src.broker.overseas import OverseasBroker
from src.config import Settings from src.config import Settings
from src.markets.schedule import MarketInfo
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -32,19 +30,19 @@ class ScanCandidate:
class SmartVolatilityScanner: class SmartVolatilityScanner:
"""Scans market rankings and applies RSI/volume filters. """Scans market rankings and applies volatility-first filters.
Flow: Flow:
1. Fetch volume rankings from KIS API 1. Fetch fluctuation rankings as primary universe
2. For each ranked stock, fetch daily prices 2. Fetch volume rankings for liquidity bonus
3. Calculate RSI and volume ratio 3. Score by volatility first, liquidity second
4. Apply filters: volume > VOL_MULTIPLIER AND (RSI < 30 OR RSI > 70) 4. Return top N qualified candidates
5. Return top N qualified candidates
""" """
def __init__( def __init__(
self, self,
broker: KISBroker, broker: KISBroker,
overseas_broker: OverseasBroker | None,
volatility_analyzer: VolatilityAnalyzer, volatility_analyzer: VolatilityAnalyzer,
settings: Settings, settings: Settings,
) -> None: ) -> None:
@@ -56,6 +54,7 @@ class SmartVolatilityScanner:
settings: Application settings settings: Application settings
""" """
self.broker = broker self.broker = broker
self.overseas_broker = overseas_broker
self.analyzer = volatility_analyzer self.analyzer = volatility_analyzer
self.settings = settings self.settings = settings
@@ -67,107 +66,129 @@ class SmartVolatilityScanner:
async def scan( async def scan(
self, self,
market: MarketInfo | None = None,
fallback_stocks: list[str] | None = None, fallback_stocks: list[str] | None = None,
) -> list[ScanCandidate]: ) -> list[ScanCandidate]:
"""Execute smart scan and return qualified candidates. """Execute smart scan and return qualified candidates.
Args: Args:
market: Target market info (domestic vs overseas behavior)
fallback_stocks: Stock codes to use if ranking API fails fallback_stocks: Stock codes to use if ranking API fails
Returns: Returns:
List of ScanCandidate, sorted by score, up to top_n items List of ScanCandidate, sorted by score, up to top_n items
""" """
# Step 1: Fetch rankings if market and not market.is_domestic:
return await self._scan_overseas(market, fallback_stocks)
return await self._scan_domestic(fallback_stocks)
async def _scan_domestic(
self,
fallback_stocks: list[str] | None = None,
) -> list[ScanCandidate]:
"""Scan domestic market using volatility-first ranking + liquidity bonus."""
# 1) Primary universe from fluctuation ranking.
try: try:
rankings = await self.broker.fetch_market_rankings( fluct_rows = await self.broker.fetch_market_rankings(
ranking_type="volume", ranking_type="fluctuation",
limit=30, # Fetch more than needed for filtering limit=50,
) )
logger.info("Fetched %d stocks from volume rankings", len(rankings))
except ConnectionError as exc: except ConnectionError as exc:
logger.warning("Ranking API failed, using fallback: %s", exc) logger.warning("Domestic fluctuation ranking failed: %s", exc)
if fallback_stocks: fluct_rows = []
# Create minimal ranking data for fallback
rankings = [ # 2) Liquidity bonus from volume ranking.
{ try:
"stock_code": code, volume_rows = await self.broker.fetch_market_rankings(
"name": code, ranking_type="volume",
"price": 0, limit=50,
"volume": 0, )
"change_rate": 0, except ConnectionError as exc:
"volume_increase_rate": 0, logger.warning("Domestic volume ranking failed: %s", exc)
} volume_rows = []
for code in fallback_stocks
] if not fluct_rows and fallback_stocks:
else: logger.info(
return [] "Domestic ranking unavailable; using fallback symbols (%d)",
len(fallback_stocks),
)
fluct_rows = [
{
"stock_code": code,
"name": code,
"price": 0.0,
"volume": 0.0,
"change_rate": 0.0,
"volume_increase_rate": 0.0,
}
for code in fallback_stocks
]
if not fluct_rows:
return []
volume_rank_bonus: dict[str, float] = {}
for idx, row in enumerate(volume_rows):
code = _extract_stock_code(row)
if not code:
continue
volume_rank_bonus[code] = max(0.0, 15.0 - idx * 0.3)
# Step 2: Analyze each stock
candidates: list[ScanCandidate] = [] candidates: list[ScanCandidate] = []
for stock in fluct_rows:
for stock in rankings: stock_code = _extract_stock_code(stock)
stock_code = stock["stock_code"]
if not stock_code: if not stock_code:
continue continue
try: try:
# Fetch daily prices for RSI calculation price = _extract_last_price(stock)
daily_prices = await self.broker.get_daily_prices(stock_code, days=20) change_rate = _extract_change_rate_pct(stock)
volume = _extract_volume(stock)
if len(daily_prices) < 15: # Need at least 14+1 for RSI intraday_range_pct = 0.0
logger.debug("Insufficient price history for %s", stock_code) volume_ratio = _safe_float(stock.get("volume_increase_rate"), 0.0) / 100.0 + 1.0
# Use daily chart to refine range/volume when available.
daily_prices = await self.broker.get_daily_prices(stock_code, days=2)
if daily_prices:
latest = daily_prices[-1]
latest_close = _safe_float(latest.get("close"), default=price)
if price <= 0:
price = latest_close
latest_high = _safe_float(latest.get("high"))
latest_low = _safe_float(latest.get("low"))
if latest_close > 0 and latest_high > 0 and latest_low > 0 and latest_high >= latest_low:
intraday_range_pct = (latest_high - latest_low) / latest_close * 100.0
if volume <= 0:
volume = _safe_float(latest.get("volume"))
if len(daily_prices) >= 2:
prev_day_volume = _safe_float(daily_prices[-2].get("volume"))
if prev_day_volume > 0:
volume_ratio = max(volume_ratio, volume / prev_day_volume)
volatility_pct = max(abs(change_rate), intraday_range_pct)
if price <= 0 or volatility_pct < 0.8:
continue continue
# Calculate RSI volatility_score = min(volatility_pct / 10.0, 1.0) * 85.0
close_prices = [p["close"] for p in daily_prices] liquidity_score = volume_rank_bonus.get(stock_code, 0.0)
rsi = self.analyzer.calculate_rsi(close_prices, period=14) score = min(100.0, volatility_score + liquidity_score)
signal = "momentum" if change_rate >= 0 else "oversold"
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0)))
# Calculate volume ratio (today vs previous day avg) candidates.append(
if len(daily_prices) >= 2: ScanCandidate(
prev_day_volume = daily_prices[-2]["volume"] stock_code=stock_code,
current_volume = stock.get("volume", 0) or daily_prices[-1]["volume"] name=stock.get("name", stock_code),
volume_ratio = ( price=price,
current_volume / prev_day_volume if prev_day_volume > 0 else 1.0 volume=volume,
) volume_ratio=max(1.0, volume_ratio, volatility_pct / 2.0),
else: rsi=implied_rsi,
volume_ratio = stock.get("volume_increase_rate", 0) / 100 + 1 # Fallback signal=signal,
score=score,
# Apply filters
volume_qualified = volume_ratio >= self.vol_multiplier
rsi_oversold = rsi < self.rsi_oversold
rsi_momentum = rsi > self.rsi_momentum
if volume_qualified and (rsi_oversold or rsi_momentum):
signal = "oversold" if rsi_oversold else "momentum"
# Calculate composite score
# Higher score for: extreme RSI + high volume
rsi_extremity = abs(rsi - 50) / 50 # 0-1 scale
volume_score = min(volume_ratio / 5, 1.0) # Cap at 5x
score = (rsi_extremity * 0.6 + volume_score * 0.4) * 100
candidates.append(
ScanCandidate(
stock_code=stock_code,
name=stock.get("name", stock_code),
price=stock.get("price", daily_prices[-1]["close"]),
volume=current_volume,
volume_ratio=volume_ratio,
rsi=rsi,
signal=signal,
score=score,
)
)
logger.info(
"Qualified: %s (%s) RSI=%.1f vol=%.1fx signal=%s score=%.1f",
stock_code,
stock.get("name", ""),
rsi,
volume_ratio,
signal,
score,
) )
)
except ConnectionError as exc: except ConnectionError as exc:
logger.warning("Failed to analyze %s: %s", stock_code, exc) logger.warning("Failed to analyze %s: %s", stock_code, exc)
@@ -176,10 +197,161 @@ class SmartVolatilityScanner:
logger.error("Unexpected error analyzing %s: %s", stock_code, exc) logger.error("Unexpected error analyzing %s: %s", stock_code, exc)
continue continue
# Sort by score and return top N logger.info("Domestic ranking scan found %d candidates", len(candidates))
candidates.sort(key=lambda c: c.score, reverse=True) candidates.sort(key=lambda c: c.score, reverse=True)
return candidates[: self.top_n] return candidates[: self.top_n]
async def _scan_overseas(
self,
market: MarketInfo,
fallback_stocks: list[str] | None = None,
) -> list[ScanCandidate]:
"""Scan overseas symbols using ranking API first, then fallback universe."""
if self.overseas_broker is None:
logger.warning(
"Overseas scanner unavailable for %s: overseas broker not configured",
market.name,
)
return []
candidates = await self._scan_overseas_from_rankings(market)
if not candidates:
candidates = await self._scan_overseas_from_symbols(market, fallback_stocks)
candidates.sort(key=lambda c: c.score, reverse=True)
return candidates[: self.top_n]
async def _scan_overseas_from_rankings(
self,
market: MarketInfo,
) -> list[ScanCandidate]:
"""Build overseas candidates from ranking APIs using volatility-first scoring."""
assert self.overseas_broker is not None
try:
fluct_rows = await self.overseas_broker.fetch_overseas_rankings(
exchange_code=market.exchange_code,
ranking_type="fluctuation",
limit=50,
)
except Exception as exc:
logger.warning(
"Overseas fluctuation ranking failed for %s: %s", market.code, exc
)
fluct_rows = []
if not fluct_rows:
return []
volume_rank_bonus: dict[str, float] = {}
try:
volume_rows = await self.overseas_broker.fetch_overseas_rankings(
exchange_code=market.exchange_code,
ranking_type="volume",
limit=50,
)
except Exception as exc:
logger.warning(
"Overseas volume ranking failed for %s: %s", market.code, exc
)
volume_rows = []
for idx, row in enumerate(volume_rows):
code = _extract_stock_code(row)
if not code:
continue
# Top-ranked by traded value/volume gets higher liquidity bonus.
volume_rank_bonus[code] = max(0.0, 15.0 - idx * 0.3)
candidates: list[ScanCandidate] = []
for row in fluct_rows:
stock_code = _extract_stock_code(row)
if not stock_code:
continue
price = _extract_last_price(row)
change_rate = _extract_change_rate_pct(row)
volume = _extract_volume(row)
intraday_range_pct = _extract_intraday_range_pct(row, price)
volatility_pct = max(abs(change_rate), intraday_range_pct)
# Volatility-first filter (not simple gainers/value ranking).
if price <= 0 or volatility_pct < 0.8:
continue
volatility_score = min(volatility_pct / 10.0, 1.0) * 85.0
liquidity_score = volume_rank_bonus.get(stock_code, 0.0)
score = min(100.0, volatility_score + liquidity_score)
signal = "momentum" if change_rate >= 0 else "oversold"
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0)))
candidates.append(
ScanCandidate(
stock_code=stock_code,
name=str(row.get("name") or row.get("ovrs_item_name") or stock_code),
price=price,
volume=volume,
volume_ratio=max(1.0, volatility_pct / 2.0),
rsi=implied_rsi,
signal=signal,
score=score,
)
)
if candidates:
logger.info(
"Overseas ranking scan found %d candidates for %s",
len(candidates),
market.name,
)
return candidates
async def _scan_overseas_from_symbols(
self,
market: MarketInfo,
symbols: list[str] | None,
) -> list[ScanCandidate]:
"""Fallback overseas scan from dynamic symbol universe."""
assert self.overseas_broker is not None
if not symbols:
logger.info("Overseas scanner: no symbol universe for %s", market.name)
return []
candidates: list[ScanCandidate] = []
for stock_code in symbols:
try:
price_data = await self.overseas_broker.get_overseas_price(
market.exchange_code, stock_code
)
output = price_data.get("output", {})
price = _extract_last_price(output)
change_rate = _extract_change_rate_pct(output)
volume = _extract_volume(output)
intraday_range_pct = _extract_intraday_range_pct(output, price)
volatility_pct = max(abs(change_rate), intraday_range_pct)
if price <= 0 or volatility_pct < 0.8:
continue
score = min(volatility_pct / 10.0, 1.0) * 100.0
signal = "momentum" if change_rate >= 0 else "oversold"
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0)))
candidates.append(
ScanCandidate(
stock_code=stock_code,
name=stock_code,
price=price,
volume=volume,
volume_ratio=max(1.0, volatility_pct / 2.0),
rsi=implied_rsi,
signal=signal,
score=score,
)
)
except ConnectionError as exc:
logger.warning("Failed to analyze overseas %s: %s", stock_code, exc)
except Exception as exc:
logger.error("Unexpected error analyzing overseas %s: %s", stock_code, exc)
return candidates
def get_stock_codes(self, candidates: list[ScanCandidate]) -> list[str]: def get_stock_codes(self, candidates: list[ScanCandidate]) -> list[str]:
"""Extract stock codes from candidates for watchlist update. """Extract stock codes from candidates for watchlist update.
@@ -190,3 +362,78 @@ class SmartVolatilityScanner:
List of stock codes List of stock codes
""" """
return [c.stock_code for c in candidates] return [c.stock_code for c in candidates]
def _safe_float(value: Any, default: float = 0.0) -> float:
"""Convert arbitrary values to float safely."""
if value in (None, ""):
return default
try:
return float(value)
except (TypeError, ValueError):
return default
def _extract_stock_code(row: dict[str, Any]) -> str:
"""Extract normalized stock code from various API schemas."""
return (
str(
row.get("symb")
or row.get("ovrs_pdno")
or row.get("stock_code")
or row.get("pdno")
or ""
)
.strip()
.upper()
)
def _extract_last_price(row: dict[str, Any]) -> float:
"""Extract last/close-like price from API schema variants."""
return _safe_float(
row.get("last")
or row.get("ovrs_nmix_prpr")
or row.get("stck_prpr")
or row.get("price")
or row.get("close")
)
def _extract_change_rate_pct(row: dict[str, Any]) -> float:
"""Extract daily change rate (%) from API schema variants."""
return _safe_float(
row.get("rate")
or row.get("change_rate")
or row.get("prdy_ctrt")
or row.get("evlu_pfls_rt")
or row.get("chg_rt")
)
def _extract_volume(row: dict[str, Any]) -> float:
"""Extract volume/traded-amount proxy from schema variants."""
return _safe_float(
row.get("tvol") or row.get("acml_vol") or row.get("vol") or row.get("volume")
)
def _extract_intraday_range_pct(row: dict[str, Any], price: float) -> float:
"""Estimate intraday range percentage from high/low fields."""
if price <= 0:
return 0.0
high = _safe_float(
row.get("high")
or row.get("ovrs_hgpr")
or row.get("stck_hgpr")
or row.get("day_hgpr")
)
low = _safe_float(
row.get("low")
or row.get("ovrs_lwpr")
or row.get("stck_lwpr")
or row.get("day_lwpr")
)
if high <= 0 or low <= 0 or high < low:
return 0.0
return (high - low) / price * 100.0

View File

@@ -64,6 +64,65 @@ class OverseasBroker:
f"Network error fetching overseas price: {exc}" f"Network error fetching overseas price: {exc}"
) from exc ) from exc
async def fetch_overseas_rankings(
self,
exchange_code: str,
ranking_type: str = "fluctuation",
limit: int = 30,
) -> list[dict[str, Any]]:
"""Fetch overseas rankings (price change or volume amount).
Ranking API specs may differ by account/product. Endpoint paths and
TR_IDs are configurable via settings and can be overridden in .env.
"""
if not self._broker._settings.OVERSEAS_RANKING_ENABLED:
return []
await self._broker._rate_limiter.acquire()
session = self._broker._get_session()
if ranking_type == "volume":
tr_id = self._broker._settings.OVERSEAS_RANKING_VOLUME_TR_ID
path = self._broker._settings.OVERSEAS_RANKING_VOLUME_PATH
else:
tr_id = self._broker._settings.OVERSEAS_RANKING_FLUCT_TR_ID
path = self._broker._settings.OVERSEAS_RANKING_FLUCT_PATH
headers = await self._broker._auth_headers(tr_id)
url = f"{self._broker._base_url}{path}"
# Try common param variants used by KIS overseas quotation APIs.
param_variants = [
{"AUTH": "", "EXCD": exchange_code, "NREC": str(max(limit, 30))},
{"AUTH": "", "OVRS_EXCG_CD": exchange_code, "NREC": str(max(limit, 30))},
{"AUTH": "", "EXCD": exchange_code},
{"AUTH": "", "OVRS_EXCG_CD": exchange_code},
]
last_error: str | None = None
for params in param_variants:
try:
async with session.get(url, headers=headers, params=params) as resp:
text = await resp.text()
if resp.status != 200:
last_error = f"HTTP {resp.status}: {text}"
continue
data = await resp.json()
rows = self._extract_ranking_rows(data)
if rows:
return rows[:limit]
# keep trying another param variant if response has no usable rows
last_error = f"empty output (keys={list(data.keys())})"
except (TimeoutError, aiohttp.ClientError) as exc:
last_error = str(exc)
continue
raise ConnectionError(
f"fetch_overseas_rankings failed for {exchange_code}/{ranking_type}: {last_error}"
)
async def get_overseas_balance(self, exchange_code: str) -> dict[str, Any]: async def get_overseas_balance(self, exchange_code: str) -> dict[str, Any]:
""" """
Fetch overseas account balance. Fetch overseas account balance.
@@ -198,3 +257,11 @@ class OverseasBroker:
"HSX": "VND", "HSX": "VND",
} }
return currency_map.get(exchange_code, "USD") return currency_map.get(exchange_code, "USD")
def _extract_ranking_rows(self, data: dict[str, Any]) -> list[dict[str, Any]]:
"""Extract list rows from ranking response across schema variants."""
candidates = [data.get("output"), data.get("output1"), data.get("output2")]
for value in candidates:
if isinstance(value, list):
return [row for row in value if isinstance(row, dict)]
return []

View File

@@ -38,6 +38,11 @@ class Settings(BaseSettings):
RSI_MOMENTUM_THRESHOLD: int = Field(default=70, ge=50, le=100) RSI_MOMENTUM_THRESHOLD: int = Field(default=70, ge=50, le=100)
VOL_MULTIPLIER: float = Field(default=2.0, gt=1.0, le=10.0) VOL_MULTIPLIER: float = Field(default=2.0, gt=1.0, le=10.0)
SCANNER_TOP_N: int = Field(default=3, ge=1, le=10) SCANNER_TOP_N: int = Field(default=3, ge=1, le=10)
POSITION_SIZING_ENABLED: bool = True
POSITION_BASE_ALLOCATION_PCT: float = Field(default=5.0, gt=0.0, le=30.0)
POSITION_MIN_ALLOCATION_PCT: float = Field(default=1.0, gt=0.0, le=20.0)
POSITION_MAX_ALLOCATION_PCT: float = Field(default=10.0, gt=0.0, le=50.0)
POSITION_VOLATILITY_TARGET_SCORE: float = Field(default=50.0, gt=0.0, le=100.0)
# Database # Database
DB_PATH: str = "data/trade_logs.db" DB_PATH: str = "data/trade_logs.db"
@@ -83,6 +88,23 @@ class Settings(BaseSettings):
TELEGRAM_COMMANDS_ENABLED: bool = True TELEGRAM_COMMANDS_ENABLED: bool = True
TELEGRAM_POLLING_INTERVAL: float = 1.0 # seconds TELEGRAM_POLLING_INTERVAL: float = 1.0 # seconds
# Overseas ranking API (KIS endpoint/TR_ID may vary by account/product)
# Override these from .env if your account uses different specs.
OVERSEAS_RANKING_ENABLED: bool = True
OVERSEAS_RANKING_FLUCT_TR_ID: str = "HHDFS76200100"
OVERSEAS_RANKING_VOLUME_TR_ID: str = "HHDFS76200200"
OVERSEAS_RANKING_FLUCT_PATH: str = (
"/uapi/overseas-price/v1/quotations/inquire-updown-rank"
)
OVERSEAS_RANKING_VOLUME_PATH: str = (
"/uapi/overseas-price/v1/quotations/inquire-volume-rank"
)
# Dashboard (optional)
DASHBOARD_ENABLED: bool = False
DASHBOARD_HOST: str = "127.0.0.1"
DASHBOARD_PORT: int = Field(default=8080, ge=1, le=65535)
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"} model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
@property @property
@@ -96,4 +118,7 @@ class Settings(BaseSettings):
@property @property
def enabled_market_list(self) -> list[str]: def enabled_market_list(self) -> list[str]:
"""Parse ENABLED_MARKETS into list of market codes.""" """Parse ENABLED_MARKETS into list of market codes."""
return [m.strip() for m in self.ENABLED_MARKETS.split(",") if m.strip()] from src.markets.schedule import expand_market_codes
raw = [m.strip() for m in self.ENABLED_MARKETS.split(",") if m.strip()]
return expand_market_codes(raw)

View File

@@ -0,0 +1,5 @@
"""FastAPI dashboard package for observability APIs."""
from src.dashboard.app import create_dashboard_app
__all__ = ["create_dashboard_app"]

361
src/dashboard/app.py Normal file
View File

@@ -0,0 +1,361 @@
"""FastAPI application for observability dashboard endpoints."""
from __future__ import annotations
import json
import sqlite3
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import FileResponse
def create_dashboard_app(db_path: str) -> FastAPI:
"""Create dashboard FastAPI app bound to a SQLite database path."""
app = FastAPI(title="The Ouroboros Dashboard", version="1.0.0")
app.state.db_path = db_path
@app.get("/")
def index() -> FileResponse:
index_path = Path(__file__).parent / "static" / "index.html"
return FileResponse(index_path)
@app.get("/api/status")
def get_status() -> dict[str, Any]:
today = datetime.now(UTC).date().isoformat()
with _connect(db_path) as conn:
market_rows = conn.execute(
"""
SELECT DISTINCT market FROM (
SELECT market FROM trades WHERE DATE(timestamp) = ?
UNION
SELECT market FROM decision_logs WHERE DATE(timestamp) = ?
UNION
SELECT market FROM playbooks WHERE date = ?
) ORDER BY market
""",
(today, today, today),
).fetchall()
markets = [row[0] for row in market_rows] if market_rows else []
market_status: dict[str, Any] = {}
total_trades = 0
total_pnl = 0.0
total_decisions = 0
for market in markets:
trade_row = conn.execute(
"""
SELECT COUNT(*) AS c, COALESCE(SUM(pnl), 0.0) AS p
FROM trades
WHERE DATE(timestamp) = ? AND market = ?
""",
(today, market),
).fetchone()
decision_row = conn.execute(
"""
SELECT COUNT(*) AS c
FROM decision_logs
WHERE DATE(timestamp) = ? AND market = ?
""",
(today, market),
).fetchone()
playbook_row = conn.execute(
"""
SELECT status
FROM playbooks
WHERE date = ? AND market = ?
LIMIT 1
""",
(today, market),
).fetchone()
market_status[market] = {
"trade_count": int(trade_row["c"] if trade_row else 0),
"total_pnl": float(trade_row["p"] if trade_row else 0.0),
"decision_count": int(decision_row["c"] if decision_row else 0),
"playbook_status": playbook_row["status"] if playbook_row else None,
}
total_trades += market_status[market]["trade_count"]
total_pnl += market_status[market]["total_pnl"]
total_decisions += market_status[market]["decision_count"]
return {
"date": today,
"markets": market_status,
"totals": {
"trade_count": total_trades,
"total_pnl": round(total_pnl, 2),
"decision_count": total_decisions,
},
}
@app.get("/api/playbook/{date_str}")
def get_playbook(date_str: str, market: str = Query("KR")) -> dict[str, Any]:
with _connect(db_path) as conn:
row = conn.execute(
"""
SELECT date, market, status, playbook_json, generated_at,
token_count, scenario_count, match_count
FROM playbooks
WHERE date = ? AND market = ?
""",
(date_str, market),
).fetchone()
if row is None:
raise HTTPException(status_code=404, detail="playbook not found")
return {
"date": row["date"],
"market": row["market"],
"status": row["status"],
"playbook": json.loads(row["playbook_json"]),
"generated_at": row["generated_at"],
"token_count": row["token_count"],
"scenario_count": row["scenario_count"],
"match_count": row["match_count"],
}
@app.get("/api/scorecard/{date_str}")
def get_scorecard(date_str: str, market: str = Query("KR")) -> dict[str, Any]:
key = f"scorecard_{market}"
with _connect(db_path) as conn:
row = conn.execute(
"""
SELECT value
FROM contexts
WHERE layer = 'L6_DAILY' AND timeframe = ? AND key = ?
""",
(date_str, key),
).fetchone()
if row is None:
raise HTTPException(status_code=404, detail="scorecard not found")
return {"date": date_str, "market": market, "scorecard": json.loads(row["value"])}
@app.get("/api/performance")
def get_performance(market: str = Query("all")) -> dict[str, Any]:
with _connect(db_path) as conn:
if market == "all":
by_market_rows = conn.execute(
"""
SELECT market,
COUNT(*) AS total_trades,
SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) AS wins,
SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END) AS losses,
COALESCE(SUM(pnl), 0.0) AS total_pnl,
COALESCE(AVG(confidence), 0.0) AS avg_confidence
FROM trades
GROUP BY market
ORDER BY market
"""
).fetchall()
combined = _performance_from_rows(by_market_rows)
return {
"market": "all",
"combined": combined,
"by_market": [
_row_to_performance(row)
for row in by_market_rows
],
}
row = conn.execute(
"""
SELECT market,
COUNT(*) AS total_trades,
SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) AS wins,
SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END) AS losses,
COALESCE(SUM(pnl), 0.0) AS total_pnl,
COALESCE(AVG(confidence), 0.0) AS avg_confidence
FROM trades
WHERE market = ?
GROUP BY market
""",
(market,),
).fetchone()
if row is None:
return {"market": market, "metrics": _empty_performance(market)}
return {"market": market, "metrics": _row_to_performance(row)}
@app.get("/api/context/{layer}")
def get_context_layer(
layer: str,
timeframe: str | None = Query(default=None),
limit: int = Query(default=100, ge=1, le=1000),
) -> dict[str, Any]:
with _connect(db_path) as conn:
if timeframe is None:
rows = conn.execute(
"""
SELECT timeframe, key, value, updated_at
FROM contexts
WHERE layer = ?
ORDER BY updated_at DESC
LIMIT ?
""",
(layer, limit),
).fetchall()
else:
rows = conn.execute(
"""
SELECT timeframe, key, value, updated_at
FROM contexts
WHERE layer = ? AND timeframe = ?
ORDER BY key
LIMIT ?
""",
(layer, timeframe, limit),
).fetchall()
entries = [
{
"timeframe": row["timeframe"],
"key": row["key"],
"value": json.loads(row["value"]),
"updated_at": row["updated_at"],
}
for row in rows
]
return {
"layer": layer,
"timeframe": timeframe,
"count": len(entries),
"entries": entries,
}
@app.get("/api/decisions")
def get_decisions(
market: str = Query("KR"),
limit: int = Query(default=50, ge=1, le=500),
) -> dict[str, Any]:
with _connect(db_path) as conn:
rows = conn.execute(
"""
SELECT decision_id, timestamp, stock_code, market, exchange_code,
action, confidence, rationale, context_snapshot, input_data,
outcome_pnl, outcome_accuracy
FROM decision_logs
WHERE market = ?
ORDER BY timestamp DESC
LIMIT ?
""",
(market, limit),
).fetchall()
decisions = []
for row in rows:
decisions.append(
{
"decision_id": row["decision_id"],
"timestamp": row["timestamp"],
"stock_code": row["stock_code"],
"market": row["market"],
"exchange_code": row["exchange_code"],
"action": row["action"],
"confidence": row["confidence"],
"rationale": row["rationale"],
"context_snapshot": json.loads(row["context_snapshot"]),
"input_data": json.loads(row["input_data"]),
"outcome_pnl": row["outcome_pnl"],
"outcome_accuracy": row["outcome_accuracy"],
}
)
return {"market": market, "count": len(decisions), "decisions": decisions}
@app.get("/api/scenarios/active")
def get_active_scenarios(
market: str = Query("US"),
date_str: str | None = Query(default=None),
limit: int = Query(default=50, ge=1, le=500),
) -> dict[str, Any]:
if date_str is None:
date_str = datetime.now(UTC).date().isoformat()
with _connect(db_path) as conn:
rows = conn.execute(
"""
SELECT timestamp, stock_code, action, confidence, rationale, context_snapshot
FROM decision_logs
WHERE market = ? AND DATE(timestamp) = ?
ORDER BY timestamp DESC
LIMIT ?
""",
(market, date_str, limit),
).fetchall()
matches: list[dict[str, Any]] = []
for row in rows:
snapshot = json.loads(row["context_snapshot"])
scenario_match = snapshot.get("scenario_match", {})
if not isinstance(scenario_match, dict) or not scenario_match:
continue
matches.append(
{
"timestamp": row["timestamp"],
"stock_code": row["stock_code"],
"action": row["action"],
"confidence": row["confidence"],
"rationale": row["rationale"],
"scenario_match": scenario_match,
}
)
return {"market": market, "date": date_str, "count": len(matches), "matches": matches}
return app
def _connect(db_path: str) -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
return conn
def _row_to_performance(row: sqlite3.Row) -> dict[str, Any]:
wins = int(row["wins"] or 0)
losses = int(row["losses"] or 0)
total = int(row["total_trades"] or 0)
win_rate = round((wins / (wins + losses) * 100), 2) if (wins + losses) > 0 else 0.0
return {
"market": row["market"],
"total_trades": total,
"wins": wins,
"losses": losses,
"win_rate": win_rate,
"total_pnl": round(float(row["total_pnl"] or 0.0), 2),
"avg_confidence": round(float(row["avg_confidence"] or 0.0), 2),
}
def _performance_from_rows(rows: list[sqlite3.Row]) -> dict[str, Any]:
total_trades = 0
wins = 0
losses = 0
total_pnl = 0.0
confidence_weighted = 0.0
for row in rows:
market_total = int(row["total_trades"] or 0)
market_conf = float(row["avg_confidence"] or 0.0)
total_trades += market_total
wins += int(row["wins"] or 0)
losses += int(row["losses"] or 0)
total_pnl += float(row["total_pnl"] or 0.0)
confidence_weighted += market_total * market_conf
win_rate = round((wins / (wins + losses) * 100), 2) if (wins + losses) > 0 else 0.0
avg_confidence = round(confidence_weighted / total_trades, 2) if total_trades > 0 else 0.0
return {
"market": "all",
"total_trades": total_trades,
"wins": wins,
"losses": losses,
"win_rate": win_rate,
"total_pnl": round(total_pnl, 2),
"avg_confidence": avg_confidence,
}
def _empty_performance(market: str) -> dict[str, Any]:
return {
"market": market,
"total_trades": 0,
"wins": 0,
"losses": 0,
"win_rate": 0.0,
"total_pnl": 0.0,
"avg_confidence": 0.0,
}

View File

@@ -0,0 +1,61 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>The Ouroboros Dashboard</title>
<style>
:root {
--bg: #0b1724;
--panel: #12263a;
--fg: #e6eef7;
--muted: #9fb3c8;
--accent: #3cb371;
}
body {
margin: 0;
font-family: ui-monospace, SFMono-Regular, Menlo, monospace;
background: radial-gradient(circle at top left, #173b58, var(--bg));
color: var(--fg);
}
.wrap {
max-width: 900px;
margin: 48px auto;
padding: 0 16px;
}
.card {
background: color-mix(in oklab, var(--panel), black 12%);
border: 1px solid #28455f;
border-radius: 12px;
padding: 20px;
}
h1 {
margin-top: 0;
}
code {
color: var(--accent);
}
li {
margin: 6px 0;
color: var(--muted);
}
</style>
</head>
<body>
<div class="wrap">
<div class="card">
<h1>The Ouroboros Dashboard API</h1>
<p>Use the following endpoints:</p>
<ul>
<li><code>/api/status</code></li>
<li><code>/api/playbook/{date}?market=KR</code></li>
<li><code>/api/scorecard/{date}?market=KR</code></li>
<li><code>/api/performance?market=all</code></li>
<li><code>/api/context/{layer}</code></li>
<li><code>/api/decisions?market=KR</code></li>
<li><code>/api/scenarios/active?market=US</code></li>
</ul>
</div>
</div>
</body>
</html>

View File

@@ -214,3 +214,42 @@ def get_latest_buy_trade(
if not row: if not row:
return None return None
return {"decision_id": row[0], "price": row[1], "quantity": row[2]} return {"decision_id": row[0], "price": row[1], "quantity": row[2]}
def get_open_position(
conn: sqlite3.Connection, stock_code: str, market: str
) -> dict[str, Any] | None:
"""Return open position if latest trade is BUY, else None."""
cursor = conn.execute(
"""
SELECT action, decision_id, price, quantity
FROM trades
WHERE stock_code = ?
AND market = ?
ORDER BY timestamp DESC
LIMIT 1
""",
(stock_code, market),
)
row = cursor.fetchone()
if not row or row[0] != "BUY":
return None
return {"decision_id": row[1], "price": row[2], "quantity": row[3]}
def get_recent_symbols(
conn: sqlite3.Connection, market: str, limit: int = 30
) -> list[str]:
"""Return recent unique symbols for a market, newest first."""
cursor = conn.execute(
"""
SELECT stock_code, MAX(timestamp) AS last_ts
FROM trades
WHERE market = ?
GROUP BY stock_code
ORDER BY last_ts DESC
LIMIT ?
""",
(market, limit),
)
return [row[0] for row in cursor.fetchall() if row and row[0]]

View File

@@ -8,8 +8,10 @@ from __future__ import annotations
import argparse import argparse
import asyncio import asyncio
import json
import logging import logging
import signal import signal
import threading
from datetime import UTC, datetime from datetime import UTC, datetime
from typing import Any from typing import Any
@@ -27,7 +29,13 @@ from src.context.store import ContextStore
from src.core.criticality import CriticalityAssessor from src.core.criticality import CriticalityAssessor
from src.core.priority_queue import PriorityTaskQueue from src.core.priority_queue import PriorityTaskQueue
from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected, RiskManager from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected, RiskManager
from src.db import get_latest_buy_trade, init_db, log_trade from src.db import (
get_latest_buy_trade,
get_open_position,
get_recent_symbols,
init_db,
log_trade,
)
from src.evolution.daily_review import DailyReviewer from src.evolution.daily_review import DailyReviewer
from src.evolution.optimizer import EvolutionOptimizer from src.evolution.optimizer import EvolutionOptimizer
from src.logging.decision_logger import DecisionLogger from src.logging.decision_logger import DecisionLogger
@@ -79,6 +87,102 @@ DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day
TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions
def _extract_symbol_from_holding(item: dict[str, Any]) -> str:
"""Extract symbol from overseas holding payload variants."""
for key in (
"ovrs_pdno",
"pdno",
"ovrs_item_name",
"prdt_name",
"symb",
"symbol",
"stock_code",
):
value = item.get(key)
if isinstance(value, str):
symbol = value.strip().upper()
if symbol and symbol.replace(".", "").replace("-", "").isalnum():
return symbol
return ""
def _determine_order_quantity(
*,
action: str,
current_price: float,
total_cash: float,
candidate: ScanCandidate | None,
settings: Settings | None,
) -> int:
"""Determine order quantity using volatility-aware position sizing."""
if action != "BUY":
return 1
if current_price <= 0 or total_cash <= 0:
return 0
if settings is None or not settings.POSITION_SIZING_ENABLED:
return 1
target_score = max(1.0, settings.POSITION_VOLATILITY_TARGET_SCORE)
observed_score = candidate.score if candidate else target_score
observed_score = max(1.0, min(100.0, observed_score))
# Higher observed volatility score => smaller allocation.
scaled_pct = settings.POSITION_BASE_ALLOCATION_PCT * (target_score / observed_score)
allocation_pct = min(
settings.POSITION_MAX_ALLOCATION_PCT,
max(settings.POSITION_MIN_ALLOCATION_PCT, scaled_pct),
)
budget = total_cash * (allocation_pct / 100.0)
quantity = int(budget // current_price)
if quantity <= 0:
return 0
return quantity
async def build_overseas_symbol_universe(
db_conn: Any,
overseas_broker: OverseasBroker,
market: MarketInfo,
active_stocks: dict[str, list[str]],
) -> list[str]:
"""Build dynamic overseas symbol universe from runtime, DB, and holdings."""
symbols: list[str] = []
# 1) Keep current active stocks first to avoid sudden churn between cycles.
symbols.extend(active_stocks.get(market.code, []))
# 2) Add recent symbols from own trading history (no fixed list).
symbols.extend(get_recent_symbols(db_conn, market.code, limit=30))
# 3) Add current overseas holdings from broker balance if available.
try:
balance_data = await overseas_broker.get_overseas_balance(market.exchange_code)
output1 = balance_data.get("output1", [])
if isinstance(output1, dict):
output1 = [output1]
if isinstance(output1, list):
for row in output1:
if not isinstance(row, dict):
continue
symbol = _extract_symbol_from_holding(row)
if symbol:
symbols.append(symbol)
except Exception as exc:
logger.warning("Failed to build overseas holdings universe for %s: %s", market.code, exc)
seen: set[str] = set()
ordered_unique: list[str] = []
for symbol in symbols:
normalized = symbol.strip().upper()
if not normalized or normalized in seen:
continue
seen.add(normalized)
ordered_unique.append(normalized)
return ordered_unique
async def trading_cycle( async def trading_cycle(
broker: KISBroker, broker: KISBroker,
overseas_broker: OverseasBroker, overseas_broker: OverseasBroker,
@@ -93,6 +197,7 @@ async def trading_cycle(
market: MarketInfo, market: MarketInfo,
stock_code: str, stock_code: str,
scan_candidates: dict[str, dict[str, ScanCandidate]], scan_candidates: dict[str, dict[str, ScanCandidate]],
settings: Settings | None = None,
) -> None: ) -> None:
"""Execute one trading cycle for a single stock.""" """Execute one trading cycle for a single stock."""
cycle_start_time = asyncio.get_event_loop().time() cycle_start_time = asyncio.get_event_loop().time()
@@ -113,6 +218,7 @@ async def trading_cycle(
current_price = safe_float(orderbook.get("output1", {}).get("stck_prpr", "0")) current_price = safe_float(orderbook.get("output1", {}).get("stck_prpr", "0"))
foreigner_net = safe_float(orderbook.get("output1", {}).get("frgn_ntby_qty", "0")) foreigner_net = safe_float(orderbook.get("output1", {}).get("frgn_ntby_qty", "0"))
price_change_pct = safe_float(orderbook.get("output1", {}).get("prdy_ctrt", "0"))
else: else:
# Overseas market # Overseas market
price_data = await overseas_broker.get_overseas_price( price_data = await overseas_broker.get_overseas_price(
@@ -135,6 +241,7 @@ async def trading_cycle(
current_price = safe_float(price_data.get("output", {}).get("last", "0")) current_price = safe_float(price_data.get("output", {}).get("last", "0"))
foreigner_net = 0.0 # Not available for overseas foreigner_net = 0.0 # Not available for overseas
price_change_pct = safe_float(price_data.get("output", {}).get("rate", "0"))
# Calculate daily P&L % # Calculate daily P&L %
pnl_pct = ( pnl_pct = (
@@ -148,6 +255,7 @@ async def trading_cycle(
"market_name": market.name, "market_name": market.name,
"current_price": current_price, "current_price": current_price,
"foreigner_net": foreigner_net, "foreigner_net": foreigner_net,
"price_change_pct": price_change_pct,
} }
# Enrich market_data with scanner metrics for scenario engine # Enrich market_data with scanner metrics for scenario engine
@@ -239,6 +347,34 @@ async def trading_cycle(
confidence=match.confidence, confidence=match.confidence,
rationale=match.rationale, rationale=match.rationale,
) )
stock_playbook = playbook.get_stock_playbook(stock_code)
if decision.action == "HOLD":
open_position = get_open_position(db_conn, stock_code, market.code)
if open_position:
entry_price = safe_float(open_position.get("price"), 0.0)
if entry_price > 0:
loss_pct = (current_price - entry_price) / entry_price * 100
stop_loss_threshold = -2.0
if stock_playbook and stock_playbook.scenarios:
stop_loss_threshold = stock_playbook.scenarios[0].stop_loss_pct
if loss_pct <= stop_loss_threshold:
decision = TradeDecision(
action="SELL",
confidence=95,
rationale=(
f"Stop-loss triggered ({loss_pct:.2f}% <= "
f"{stop_loss_threshold:.2f}%)"
),
)
logger.info(
"Stop-loss override for %s (%s): %.2f%% <= %.2f%%",
stock_code,
market.name,
loss_pct,
stop_loss_threshold,
)
logger.info( logger.info(
"Decision for %s (%s): %s (confidence=%d)", "Decision for %s (%s): %s (confidence=%d)",
stock_code, stock_code,
@@ -277,6 +413,7 @@ async def trading_cycle(
input_data = { input_data = {
"current_price": current_price, "current_price": current_price,
"foreigner_net": foreigner_net, "foreigner_net": foreigner_net,
"price_change_pct": price_change_pct,
"total_eval": total_eval, "total_eval": total_eval,
"total_cash": total_cash, "total_cash": total_cash,
"pnl_pct": pnl_pct, "pnl_pct": pnl_pct,
@@ -298,8 +435,23 @@ async def trading_cycle(
trade_price = current_price trade_price = current_price
trade_pnl = 0.0 trade_pnl = 0.0
if decision.action in ("BUY", "SELL"): if decision.action in ("BUY", "SELL"):
# Determine order size (simplified: 1 lot) quantity = _determine_order_quantity(
quantity = 1 action=decision.action,
current_price=current_price,
total_cash=total_cash,
candidate=candidate,
settings=settings,
)
if quantity <= 0:
logger.info(
"Skip %s %s (%s): no affordable quantity (cash=%.2f, price=%.2f)",
decision.action,
stock_code,
market.name,
total_cash,
current_price,
)
return
order_amount = current_price * quantity order_amount = current_price * quantity
# 4. Risk check BEFORE order # 4. Risk check BEFORE order
@@ -448,8 +600,28 @@ async def run_daily_session(
# Dynamic stock discovery via scanner (no static watchlists) # Dynamic stock discovery via scanner (no static watchlists)
candidates_list: list[ScanCandidate] = [] candidates_list: list[ScanCandidate] = []
fallback_stocks: list[str] | None = None
if not market.is_domestic:
fallback_stocks = await build_overseas_symbol_universe(
db_conn=db_conn,
overseas_broker=overseas_broker,
market=market,
active_stocks={},
)
if not fallback_stocks:
logger.warning(
"No dynamic overseas symbol universe for %s; scanner cannot run",
market.code,
)
try: try:
candidates_list = await smart_scanner.scan() if smart_scanner else [] candidates_list = (
await smart_scanner.scan(
market=market,
fallback_stocks=fallback_stocks,
)
if smart_scanner
else []
)
except Exception as exc: except Exception as exc:
logger.error("Smart Scanner failed for %s: %s", market.name, exc) logger.error("Smart Scanner failed for %s: %s", market.name, exc)
@@ -506,6 +678,9 @@ async def run_daily_session(
foreigner_net = safe_float( foreigner_net = safe_float(
orderbook.get("output1", {}).get("frgn_ntby_qty", "0") orderbook.get("output1", {}).get("frgn_ntby_qty", "0")
) )
price_change_pct = safe_float(
orderbook.get("output1", {}).get("prdy_ctrt", "0")
)
else: else:
price_data = await overseas_broker.get_overseas_price( price_data = await overseas_broker.get_overseas_price(
market.exchange_code, stock_code market.exchange_code, stock_code
@@ -514,12 +689,16 @@ async def run_daily_session(
price_data.get("output", {}).get("last", "0") price_data.get("output", {}).get("last", "0")
) )
foreigner_net = 0.0 foreigner_net = 0.0
price_change_pct = safe_float(
price_data.get("output", {}).get("rate", "0")
)
stock_data: dict[str, Any] = { stock_data: dict[str, Any] = {
"stock_code": stock_code, "stock_code": stock_code,
"market_name": market.name, "market_name": market.name,
"current_price": current_price, "current_price": current_price,
"foreigner_net": foreigner_net, "foreigner_net": foreigner_net,
"price_change_pct": price_change_pct,
} }
# Enrich with scanner metrics # Enrich with scanner metrics
cand = candidate_map.get(stock_code) cand = candidate_map.get(stock_code)
@@ -638,7 +817,23 @@ async def run_daily_session(
trade_price = stock_data["current_price"] trade_price = stock_data["current_price"]
trade_pnl = 0.0 trade_pnl = 0.0
if decision.action in ("BUY", "SELL"): if decision.action in ("BUY", "SELL"):
quantity = 1 quantity = _determine_order_quantity(
action=decision.action,
current_price=stock_data["current_price"],
total_cash=total_cash,
candidate=candidate_map.get(stock_code),
settings=settings,
)
if quantity <= 0:
logger.info(
"Skip %s %s (%s): no affordable quantity (cash=%.2f, price=%.2f)",
decision.action,
stock_code,
market.name,
total_cash,
stock_data["current_price"],
)
continue
order_amount = stock_data["current_price"] * quantity order_amount = stock_data["current_price"] * quantity
# Risk check # Risk check
@@ -819,7 +1014,7 @@ async def _run_evolution_loop(
market_date: str, market_date: str,
) -> None: ) -> None:
"""Run evolution loop once at US close (end of trading day).""" """Run evolution loop once at US close (end of trading day)."""
if market_code != "US": if not market_code.startswith("US"):
return return
try: try:
@@ -832,13 +1027,58 @@ async def _run_evolution_loop(
logger.info("Evolution loop skipped on %s (no actionable failures)", market_date) logger.info("Evolution loop skipped on %s (no actionable failures)", market_date)
return return
await telegram.send_message( try:
"<b>Evolution Update</b>\n" await telegram.send_message(
f"Date: {market_date}\n" "<b>Evolution Update</b>\n"
f"PR: {pr_info.get('title', 'N/A')}\n" f"Date: {market_date}\n"
f"Branch: {pr_info.get('branch', 'N/A')}\n" f"PR: {pr_info.get('title', 'N/A')}\n"
f"Status: {pr_info.get('status', 'N/A')}" f"Branch: {pr_info.get('branch', 'N/A')}\n"
f"Status: {pr_info.get('status', 'N/A')}"
)
except Exception as exc:
logger.warning("Evolution notification failed on %s: %s", market_date, exc)
def _start_dashboard_server(settings: Settings) -> threading.Thread | None:
"""Start FastAPI dashboard in a daemon thread when enabled."""
if not settings.DASHBOARD_ENABLED:
return None
def _serve() -> None:
try:
import uvicorn
from src.dashboard import create_dashboard_app
app = create_dashboard_app(settings.DB_PATH)
uvicorn.run(
app,
host=settings.DASHBOARD_HOST,
port=settings.DASHBOARD_PORT,
log_level="info",
)
except Exception as exc:
logger.warning("Dashboard server failed to start: %s", exc)
thread = threading.Thread(
target=_serve,
name="dashboard-server",
daemon=True,
) )
thread.start()
logger.info(
"Dashboard server started at http://%s:%d",
settings.DASHBOARD_HOST,
settings.DASHBOARD_PORT,
)
return thread
def _apply_dashboard_flag(settings: Settings, dashboard_flag: bool) -> Settings:
"""Apply CLI dashboard flag over environment settings."""
if dashboard_flag and not settings.DASHBOARD_ENABLED:
return settings.model_copy(update={"DASHBOARD_ENABLED": True})
return settings
async def run(settings: Settings) -> None: async def run(settings: Settings) -> None:
@@ -890,6 +1130,10 @@ async def run(settings: Settings) -> None:
"/help - Show available commands\n" "/help - Show available commands\n"
"/status - Trading status (mode, markets, P&L)\n" "/status - Trading status (mode, markets, P&L)\n"
"/positions - Current holdings\n" "/positions - Current holdings\n"
"/report - Daily summary report\n"
"/scenarios - Today's playbook scenarios\n"
"/review - Recent scorecards\n"
"/dashboard - Dashboard URL/status\n"
"/stop - Pause trading\n" "/stop - Pause trading\n"
"/resume - Resume trading" "/resume - Resume trading"
) )
@@ -1009,17 +1253,171 @@ async def run(settings: Settings) -> None:
"<b>⚠️ Error</b>\n\nFailed to retrieve positions." "<b>⚠️ Error</b>\n\nFailed to retrieve positions."
) )
async def handle_report() -> None:
"""Handle /report command - show daily summary metrics."""
try:
today = datetime.now(UTC).date().isoformat()
trade_row = db_conn.execute(
"""
SELECT COUNT(*) AS trade_count,
COALESCE(SUM(pnl), 0.0) AS total_pnl,
SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) AS wins
FROM trades
WHERE DATE(timestamp) = ?
""",
(today,),
).fetchone()
decision_row = db_conn.execute(
"""
SELECT COUNT(*) AS decision_count,
COALESCE(AVG(confidence), 0.0) AS avg_confidence
FROM decision_logs
WHERE DATE(timestamp) = ?
""",
(today,),
).fetchone()
trade_count = int(trade_row[0] if trade_row else 0)
total_pnl = float(trade_row[1] if trade_row else 0.0)
wins = int(trade_row[2] if trade_row and trade_row[2] is not None else 0)
decision_count = int(decision_row[0] if decision_row else 0)
avg_confidence = float(decision_row[1] if decision_row else 0.0)
win_rate = (wins / trade_count * 100.0) if trade_count > 0 else 0.0
await telegram.send_message(
"<b>📈 Daily Report</b>\n\n"
f"<b>Date:</b> {today}\n"
f"<b>Trades:</b> {trade_count}\n"
f"<b>Total P&L:</b> {total_pnl:+.2f}\n"
f"<b>Win Rate:</b> {win_rate:.2f}%\n"
f"<b>Decisions:</b> {decision_count}\n"
f"<b>Avg Confidence:</b> {avg_confidence:.2f}"
)
except Exception as exc:
logger.error("Error in /report handler: %s", exc)
await telegram.send_message(
"<b>⚠️ Error</b>\n\nFailed to generate daily report."
)
async def handle_scenarios() -> None:
"""Handle /scenarios command - show today's playbook scenarios."""
try:
today = datetime.now(UTC).date().isoformat()
rows = db_conn.execute(
"""
SELECT market, playbook_json
FROM playbooks
WHERE date = ?
ORDER BY market
""",
(today,),
).fetchall()
if not rows:
await telegram.send_message(
"<b>🧠 Today's Scenarios</b>\n\nNo playbooks found for today."
)
return
lines = ["<b>🧠 Today's Scenarios</b>", ""]
for market, playbook_json in rows:
lines.append(f"<b>{market}</b>")
playbook_data = {}
try:
playbook_data = json.loads(playbook_json)
except Exception:
playbook_data = {}
stock_playbooks = playbook_data.get("stock_playbooks", [])
if not stock_playbooks:
lines.append("- No scenarios")
lines.append("")
continue
for stock_pb in stock_playbooks:
stock_code = stock_pb.get("stock_code", "N/A")
scenarios = stock_pb.get("scenarios", [])
for sc in scenarios:
action = sc.get("action", "HOLD")
confidence = sc.get("confidence", 0)
lines.append(f"- {stock_code}: {action} ({confidence})")
lines.append("")
await telegram.send_message("\n".join(lines).strip())
except Exception as exc:
logger.error("Error in /scenarios handler: %s", exc)
await telegram.send_message(
"<b>⚠️ Error</b>\n\nFailed to retrieve scenarios."
)
async def handle_review() -> None:
"""Handle /review command - show recent scorecards."""
try:
rows = db_conn.execute(
"""
SELECT timeframe, key, value
FROM contexts
WHERE layer = 'L6_DAILY' AND key LIKE 'scorecard_%'
ORDER BY updated_at DESC
LIMIT 5
"""
).fetchall()
if not rows:
await telegram.send_message(
"<b>📝 Recent Reviews</b>\n\nNo scorecards available."
)
return
lines = ["<b>📝 Recent Reviews</b>", ""]
for timeframe, key, value in rows:
scorecard = json.loads(value)
market = key.replace("scorecard_", "")
total_pnl = float(scorecard.get("total_pnl", 0.0))
win_rate = float(scorecard.get("win_rate", 0.0))
decisions = int(scorecard.get("total_decisions", 0))
lines.append(
f"- {timeframe} {market}: P&L {total_pnl:+.2f}, "
f"Win {win_rate:.2f}%, Decisions {decisions}"
)
await telegram.send_message("\n".join(lines))
except Exception as exc:
logger.error("Error in /review handler: %s", exc)
await telegram.send_message(
"<b>⚠️ Error</b>\n\nFailed to retrieve reviews."
)
async def handle_dashboard() -> None:
"""Handle /dashboard command - show dashboard URL if enabled."""
if not settings.DASHBOARD_ENABLED:
await telegram.send_message(
"<b>🖥️ Dashboard</b>\n\nDashboard is not enabled."
)
return
url = f"http://{settings.DASHBOARD_HOST}:{settings.DASHBOARD_PORT}"
await telegram.send_message(
"<b>🖥️ Dashboard</b>\n\n"
f"<b>URL:</b> {url}"
)
command_handler.register_command("help", handle_help) command_handler.register_command("help", handle_help)
command_handler.register_command("stop", handle_stop) command_handler.register_command("stop", handle_stop)
command_handler.register_command("resume", handle_resume) command_handler.register_command("resume", handle_resume)
command_handler.register_command("status", handle_status) command_handler.register_command("status", handle_status)
command_handler.register_command("positions", handle_positions) command_handler.register_command("positions", handle_positions)
command_handler.register_command("report", handle_report)
command_handler.register_command("scenarios", handle_scenarios)
command_handler.register_command("review", handle_review)
command_handler.register_command("dashboard", handle_dashboard)
# Initialize volatility hunter # Initialize volatility hunter
volatility_analyzer = VolatilityAnalyzer(min_volume_surge=2.0, min_price_change=1.0) volatility_analyzer = VolatilityAnalyzer(min_volume_surge=2.0, min_price_change=1.0)
# Initialize smart scanner (Python-first, AI-last pipeline) # Initialize smart scanner (Python-first, AI-last pipeline)
smart_scanner = SmartVolatilityScanner( smart_scanner = SmartVolatilityScanner(
broker=broker, broker=broker,
overseas_broker=overseas_broker,
volatility_analyzer=volatility_analyzer, volatility_analyzer=volatility_analyzer,
settings=settings, settings=settings,
) )
@@ -1039,6 +1437,7 @@ async def run(settings: Settings) -> None:
low_volatility_threshold=30.0, low_volatility_threshold=30.0,
) )
priority_queue = PriorityTaskQueue(max_size=1000) priority_queue = PriorityTaskQueue(max_size=1000)
_start_dashboard_server(settings)
# Track last scan time for each market # Track last scan time for each market
last_scan_time: dict[str, float] = {} last_scan_time: dict[str, float] = {}
@@ -1198,7 +1597,25 @@ async def run(settings: Settings) -> None:
try: try:
logger.info("Smart Scanner: Scanning %s market", market.name) logger.info("Smart Scanner: Scanning %s market", market.name)
candidates = await smart_scanner.scan() fallback_stocks: list[str] | None = None
if not market.is_domestic:
fallback_stocks = await build_overseas_symbol_universe(
db_conn=db_conn,
overseas_broker=overseas_broker,
market=market,
active_stocks=active_stocks,
)
if not fallback_stocks:
logger.warning(
"No dynamic overseas symbol universe for %s;"
" scanner cannot run",
market.code,
)
candidates = await smart_scanner.scan(
market=market,
fallback_stocks=fallback_stocks,
)
if candidates: if candidates:
# Use scanner results directly as trading candidates # Use scanner results directly as trading candidates
@@ -1322,6 +1739,7 @@ async def run(settings: Settings) -> None:
market, market,
stock_code, stock_code,
scan_candidates, scan_candidates,
settings,
) )
break # Success — exit retry loop break # Success — exit retry loop
except CircuitBreakerTripped as exc: except CircuitBreakerTripped as exc:
@@ -1392,10 +1810,16 @@ def main() -> None:
default="paper", default="paper",
help="Trading mode (default: paper)", help="Trading mode (default: paper)",
) )
parser.add_argument(
"--dashboard",
action="store_true",
help="Enable FastAPI dashboard server in background thread",
)
args = parser.parse_args() args = parser.parse_args()
setup_logging() setup_logging()
settings = Settings(MODE=args.mode) # type: ignore[call-arg] settings = Settings(MODE=args.mode) # type: ignore[call-arg]
settings = _apply_dashboard_flag(settings, args.dashboard)
asyncio.run(run(settings)) asyncio.run(run(settings))

View File

@@ -123,6 +123,23 @@ MARKETS: dict[str, MarketInfo] = {
), ),
} }
MARKET_SHORTHAND: dict[str, list[str]] = {
"US": ["US_NASDAQ", "US_NYSE", "US_AMEX"],
"CN": ["CN_SHA", "CN_SZA"],
"VN": ["VN_HAN", "VN_HCM"],
}
def expand_market_codes(codes: list[str]) -> list[str]:
"""Expand shorthand market codes into concrete exchange market codes."""
expanded: list[str] = []
for code in codes:
if code in MARKET_SHORTHAND:
expanded.extend(MARKET_SHORTHAND[code])
else:
expanded.append(code)
return expanded
def is_market_open(market: MarketInfo, now: datetime | None = None) -> bool: def is_market_open(market: MarketInfo, now: datetime | None = None) -> bool:
""" """

View File

@@ -16,6 +16,10 @@ from src.evolution.daily_review import DailyReviewer
from src.evolution.scorecard import DailyScorecard from src.evolution.scorecard import DailyScorecard
from src.logging.decision_logger import DecisionLogger from src.logging.decision_logger import DecisionLogger
from datetime import UTC, datetime
TODAY = datetime.now(UTC).strftime("%Y-%m-%d")
@pytest.fixture @pytest.fixture
def db_conn() -> sqlite3.Connection: def db_conn() -> sqlite3.Connection:
@@ -116,7 +120,7 @@ def test_generate_scorecard_market_scoped(
exchange_code="NASDAQ", exchange_code="NASDAQ",
) )
scorecard = reviewer.generate_scorecard("2026-02-14", "KR") scorecard = reviewer.generate_scorecard(TODAY, "KR")
assert scorecard.market == "KR" assert scorecard.market == "KR"
assert scorecard.total_decisions == 2 assert scorecard.total_decisions == 2
@@ -158,7 +162,7 @@ def test_generate_scorecard_top_winners_and_losers(
decision_id=decision_id, decision_id=decision_id,
) )
scorecard = reviewer.generate_scorecard("2026-02-14", "KR") scorecard = reviewer.generate_scorecard(TODAY, "KR")
assert scorecard.top_winners == ["005930", "000660"] assert scorecard.top_winners == ["005930", "000660"]
assert scorecard.top_losers == ["035420", "051910"] assert scorecard.top_losers == ["035420", "051910"]
@@ -167,7 +171,7 @@ def test_generate_scorecard_empty_day(
db_conn: sqlite3.Connection, context_store: ContextStore, db_conn: sqlite3.Connection, context_store: ContextStore,
) -> None: ) -> None:
reviewer = DailyReviewer(db_conn, context_store) reviewer = DailyReviewer(db_conn, context_store)
scorecard = reviewer.generate_scorecard("2026-02-14", "KR") scorecard = reviewer.generate_scorecard(TODAY, "KR")
assert scorecard.total_decisions == 0 assert scorecard.total_decisions == 0
assert scorecard.total_pnl == 0.0 assert scorecard.total_pnl == 0.0

298
tests/test_dashboard.py Normal file
View File

@@ -0,0 +1,298 @@
"""Tests for dashboard endpoint handlers."""
from __future__ import annotations
import json
import sqlite3
from collections.abc import Callable
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import pytest
from fastapi import HTTPException
from fastapi.responses import FileResponse
from src.dashboard.app import create_dashboard_app
from src.db import init_db
def _seed_db(conn: sqlite3.Connection) -> None:
today = datetime.now(UTC).date().isoformat()
conn.execute(
"""
INSERT INTO playbooks (
date, market, status, playbook_json, generated_at,
token_count, scenario_count, match_count
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"2026-02-14",
"KR",
"ready",
json.dumps({"market": "KR", "stock_playbooks": []}),
"2026-02-14T08:30:00+00:00",
123,
2,
1,
),
)
conn.execute(
"""
INSERT INTO playbooks (
date, market, status, playbook_json, generated_at,
token_count, scenario_count, match_count
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
today,
"US_NASDAQ",
"ready",
json.dumps({"market": "US_NASDAQ", "stock_playbooks": []}),
f"{today}T08:30:00+00:00",
100,
1,
0,
),
)
conn.execute(
"""
INSERT INTO contexts (layer, timeframe, key, value, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
"L6_DAILY",
"2026-02-14",
"scorecard_KR",
json.dumps({"market": "KR", "total_pnl": 1.5, "win_rate": 60.0}),
"2026-02-14T15:30:00+00:00",
"2026-02-14T15:30:00+00:00",
),
)
conn.execute(
"""
INSERT INTO contexts (layer, timeframe, key, value, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
"L7_REALTIME",
"2026-02-14T10:00:00+00:00",
"volatility_KR_005930",
json.dumps({"momentum_score": 70.0}),
"2026-02-14T10:00:00+00:00",
"2026-02-14T10:00:00+00:00",
),
)
conn.execute(
"""
INSERT INTO decision_logs (
decision_id, timestamp, stock_code, market, exchange_code,
action, confidence, rationale, context_snapshot, input_data
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"d-kr-1",
f"{today}T09:10:00+00:00",
"005930",
"KR",
"KRX",
"BUY",
85,
"signal matched",
json.dumps({"scenario_match": {"rsi": 28.0}}),
json.dumps({"current_price": 70000}),
),
)
conn.execute(
"""
INSERT INTO decision_logs (
decision_id, timestamp, stock_code, market, exchange_code,
action, confidence, rationale, context_snapshot, input_data
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"d-us-1",
f"{today}T21:10:00+00:00",
"AAPL",
"US_NASDAQ",
"NASDAQ",
"SELL",
80,
"no match",
json.dumps({"scenario_match": {}}),
json.dumps({"current_price": 200}),
),
)
conn.execute(
"""
INSERT INTO trades (
timestamp, stock_code, action, confidence, rationale,
quantity, price, pnl, market, exchange_code, selection_context, decision_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
f"{today}T09:11:00+00:00",
"005930",
"BUY",
85,
"buy",
1,
70000,
2.0,
"KR",
"KRX",
None,
"d-kr-1",
),
)
conn.execute(
"""
INSERT INTO trades (
timestamp, stock_code, action, confidence, rationale,
quantity, price, pnl, market, exchange_code, selection_context, decision_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
f"{today}T21:11:00+00:00",
"AAPL",
"SELL",
80,
"sell",
1,
200,
-1.0,
"US_NASDAQ",
"NASDAQ",
None,
"d-us-1",
),
)
conn.commit()
def _app(tmp_path: Path) -> Any:
db_path = tmp_path / "dashboard_test.db"
conn = init_db(str(db_path))
_seed_db(conn)
conn.close()
return create_dashboard_app(str(db_path))
def _endpoint(app: Any, path: str) -> Callable[..., Any]:
for route in app.routes:
if getattr(route, "path", None) == path:
return route.endpoint
raise AssertionError(f"route not found: {path}")
def test_index_serves_html(tmp_path: Path) -> None:
app = _app(tmp_path)
index = _endpoint(app, "/")
resp = index()
assert isinstance(resp, FileResponse)
assert "index.html" in str(resp.path)
def test_status_endpoint(tmp_path: Path) -> None:
app = _app(tmp_path)
get_status = _endpoint(app, "/api/status")
body = get_status()
assert "KR" in body["markets"]
assert "US_NASDAQ" in body["markets"]
assert "totals" in body
def test_playbook_found(tmp_path: Path) -> None:
app = _app(tmp_path)
get_playbook = _endpoint(app, "/api/playbook/{date_str}")
body = get_playbook("2026-02-14", market="KR")
assert body["market"] == "KR"
def test_playbook_not_found(tmp_path: Path) -> None:
app = _app(tmp_path)
get_playbook = _endpoint(app, "/api/playbook/{date_str}")
with pytest.raises(HTTPException, match="playbook not found"):
get_playbook("2026-02-15", market="KR")
def test_scorecard_found(tmp_path: Path) -> None:
app = _app(tmp_path)
get_scorecard = _endpoint(app, "/api/scorecard/{date_str}")
body = get_scorecard("2026-02-14", market="KR")
assert body["scorecard"]["total_pnl"] == 1.5
def test_scorecard_not_found(tmp_path: Path) -> None:
app = _app(tmp_path)
get_scorecard = _endpoint(app, "/api/scorecard/{date_str}")
with pytest.raises(HTTPException, match="scorecard not found"):
get_scorecard("2026-02-15", market="KR")
def test_performance_all(tmp_path: Path) -> None:
app = _app(tmp_path)
get_performance = _endpoint(app, "/api/performance")
body = get_performance(market="all")
assert body["market"] == "all"
assert body["combined"]["total_trades"] == 2
assert len(body["by_market"]) == 2
def test_performance_market_filter(tmp_path: Path) -> None:
app = _app(tmp_path)
get_performance = _endpoint(app, "/api/performance")
body = get_performance(market="KR")
assert body["market"] == "KR"
assert body["metrics"]["total_trades"] == 1
def test_performance_empty_market(tmp_path: Path) -> None:
app = _app(tmp_path)
get_performance = _endpoint(app, "/api/performance")
body = get_performance(market="JP")
assert body["metrics"]["total_trades"] == 0
def test_context_layer_all(tmp_path: Path) -> None:
app = _app(tmp_path)
get_context_layer = _endpoint(app, "/api/context/{layer}")
body = get_context_layer("L7_REALTIME", timeframe=None, limit=100)
assert body["layer"] == "L7_REALTIME"
assert body["count"] == 1
def test_context_layer_timeframe_filter(tmp_path: Path) -> None:
app = _app(tmp_path)
get_context_layer = _endpoint(app, "/api/context/{layer}")
body = get_context_layer("L6_DAILY", timeframe="2026-02-14", limit=100)
assert body["count"] == 1
assert body["entries"][0]["key"] == "scorecard_KR"
def test_decisions_endpoint(tmp_path: Path) -> None:
app = _app(tmp_path)
get_decisions = _endpoint(app, "/api/decisions")
body = get_decisions(market="KR", limit=50)
assert body["count"] == 1
assert body["decisions"][0]["decision_id"] == "d-kr-1"
def test_scenarios_active_filters_non_matched(tmp_path: Path) -> None:
app = _app(tmp_path)
get_active_scenarios = _endpoint(app, "/api/scenarios/active")
body = get_active_scenarios(
market="KR",
date_str=datetime.now(UTC).date().isoformat(),
limit=50,
)
assert body["count"] == 1
assert body["matches"][0]["stock_code"] == "005930"
def test_scenarios_active_empty_when_no_matches(tmp_path: Path) -> None:
app = _app(tmp_path)
get_active_scenarios = _endpoint(app, "/api/scenarios/active")
body = get_active_scenarios(market="US", date_str="2026-02-14", limit=50)
assert body["count"] == 0

60
tests/test_db.py Normal file
View File

@@ -0,0 +1,60 @@
"""Tests for database helper functions."""
from src.db import get_open_position, init_db, log_trade
def test_get_open_position_returns_latest_buy() -> None:
conn = init_db(":memory:")
log_trade(
conn=conn,
stock_code="005930",
action="BUY",
confidence=90,
rationale="entry",
quantity=2,
price=70000.0,
market="KR",
exchange_code="KRX",
decision_id="d-buy-1",
)
position = get_open_position(conn, "005930", "KR")
assert position is not None
assert position["decision_id"] == "d-buy-1"
assert position["price"] == 70000.0
assert position["quantity"] == 2
def test_get_open_position_returns_none_when_latest_is_sell() -> None:
conn = init_db(":memory:")
log_trade(
conn=conn,
stock_code="005930",
action="BUY",
confidence=90,
rationale="entry",
quantity=1,
price=70000.0,
market="KR",
exchange_code="KRX",
decision_id="d-buy-1",
)
log_trade(
conn=conn,
stock_code="005930",
action="SELL",
confidence=95,
rationale="exit",
quantity=1,
price=71000.0,
market="KR",
exchange_code="KRX",
decision_id="d-sell-1",
)
assert get_open_position(conn, "005930", "KR") is None
def test_get_open_position_returns_none_when_no_trades() -> None:
conn = init_db(":memory:")
assert get_open_position(conn, "AAPL", "US_NASDAQ") is None

View File

@@ -5,6 +5,7 @@ from unittest.mock import ANY, AsyncMock, MagicMock, patch
import pytest import pytest
from src.config import Settings
from src.context.layer import ContextLayer from src.context.layer import ContextLayer
from src.context.scheduler import ScheduleResult from src.context.scheduler import ScheduleResult
from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected
@@ -12,9 +13,11 @@ from src.db import init_db, log_trade
from src.evolution.scorecard import DailyScorecard from src.evolution.scorecard import DailyScorecard
from src.logging.decision_logger import DecisionLogger from src.logging.decision_logger import DecisionLogger
from src.main import ( from src.main import (
_apply_dashboard_flag,
_handle_market_close, _handle_market_close,
_run_context_scheduler, _run_context_scheduler,
_run_evolution_loop, _run_evolution_loop,
_start_dashboard_server,
safe_float, safe_float,
trading_cycle, trading_cycle,
) )
@@ -113,6 +116,7 @@ class TestTradingCycleTelegramIntegration:
"output1": { "output1": {
"stck_prpr": "50000", "stck_prpr": "50000",
"frgn_ntby_qty": "100", "frgn_ntby_qty": "100",
"prdy_ctrt": "1.23",
} }
} }
) )
@@ -744,7 +748,7 @@ class TestScenarioEngineIntegration:
broker = MagicMock() broker = MagicMock()
broker.get_orderbook = AsyncMock( broker.get_orderbook = AsyncMock(
return_value={ return_value={
"output1": {"stck_prpr": "50000", "frgn_ntby_qty": "100"} "output1": {"stck_prpr": "50000", "frgn_ntby_qty": "100", "prdy_ctrt": "2.50"}
} }
) )
broker.get_balance = AsyncMock( broker.get_balance = AsyncMock(
@@ -827,6 +831,7 @@ class TestScenarioEngineIntegration:
assert market_data["rsi"] == 25.0 assert market_data["rsi"] == 25.0
assert market_data["volume_ratio"] == 3.5 assert market_data["volume_ratio"] == 3.5
assert market_data["current_price"] == 50000.0 assert market_data["current_price"] == 50000.0
assert market_data["price_change_pct"] == 2.5
# Portfolio data should include pnl # Portfolio data should include pnl
assert "portfolio_pnl_pct" in portfolio_data assert "portfolio_pnl_pct" in portfolio_data
@@ -1229,6 +1234,107 @@ async def test_sell_updates_original_buy_decision_outcome() -> None:
assert updated_buy.outcome_accuracy == 1 assert updated_buy.outcome_accuracy == 1
@pytest.mark.asyncio
async def test_hold_overridden_to_sell_when_stop_loss_triggered() -> None:
"""HOLD decision should be overridden to SELL when stop-loss threshold is breached."""
db_conn = init_db(":memory:")
decision_logger = DecisionLogger(db_conn)
buy_decision_id = decision_logger.log_decision(
stock_code="005930",
market="KR",
exchange_code="KRX",
action="BUY",
confidence=90,
rationale="entry",
context_snapshot={},
input_data={},
)
log_trade(
conn=db_conn,
stock_code="005930",
action="BUY",
confidence=90,
rationale="entry",
quantity=1,
price=100.0,
market="KR",
exchange_code="KRX",
decision_id=buy_decision_id,
)
broker = MagicMock()
broker.get_orderbook = AsyncMock(
return_value={"output1": {"stck_prpr": "95", "frgn_ntby_qty": "0", "prdy_ctrt": "-5.0"}}
)
broker.get_balance = AsyncMock(
return_value={
"output2": [
{
"tot_evlu_amt": "100000",
"dnca_tot_amt": "10000",
"pchs_amt_smtl_amt": "90000",
}
]
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
scenario = StockScenario(
condition=StockCondition(rsi_below=30),
action=ScenarioAction.BUY,
confidence=88,
stop_loss_pct=-2.0,
rationale="stop loss policy",
)
playbook = DayPlaybook(
date=date(2026, 2, 8),
market="KR",
stock_playbooks=[
{"stock_code": "005930", "stock_name": "Samsung", "scenarios": [scenario]}
],
)
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_hold_match())
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
await trading_cycle(
broker=broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=playbook,
risk=MagicMock(),
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(
get_latest_timeframe=MagicMock(return_value=None),
set_context=MagicMock(),
),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=telegram,
market=market,
stock_code="005930",
scan_candidates={},
)
broker.send_order.assert_called_once()
assert broker.send_order.call_args.kwargs["order_type"] == "SELL"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_handle_market_close_runs_daily_review_flow() -> None: async def test_handle_market_close_runs_daily_review_flow() -> None:
"""Market close should aggregate, create scorecard, lessons, and notify.""" """Market close should aggregate, create scorecard, lessons, and notify."""
@@ -1304,6 +1410,82 @@ async def test_handle_market_close_without_lessons_stores_once() -> None:
assert reviewer.store_scorecard_in_context.call_count == 1 assert reviewer.store_scorecard_in_context.call_count == 1
@pytest.mark.asyncio
async def test_handle_market_close_triggers_evolution_for_us() -> None:
telegram = MagicMock()
telegram.notify_market_close = AsyncMock()
telegram.send_message = AsyncMock()
context_aggregator = MagicMock()
reviewer = MagicMock()
reviewer.generate_scorecard.return_value = DailyScorecard(
date="2026-02-14",
market="US",
total_decisions=2,
buys=1,
sells=1,
holds=0,
total_pnl=3.0,
win_rate=50.0,
avg_confidence=80.0,
scenario_match_rate=100.0,
)
reviewer.generate_lessons = AsyncMock(return_value=[])
evolution_optimizer = MagicMock()
evolution_optimizer.evolve = AsyncMock(return_value=None)
await _handle_market_close(
market_code="US",
market_name="United States",
market_timezone=UTC,
telegram=telegram,
context_aggregator=context_aggregator,
daily_reviewer=reviewer,
evolution_optimizer=evolution_optimizer,
)
evolution_optimizer.evolve.assert_called_once()
@pytest.mark.asyncio
async def test_handle_market_close_skips_evolution_for_kr() -> None:
telegram = MagicMock()
telegram.notify_market_close = AsyncMock()
telegram.send_message = AsyncMock()
context_aggregator = MagicMock()
reviewer = MagicMock()
reviewer.generate_scorecard.return_value = DailyScorecard(
date="2026-02-14",
market="KR",
total_decisions=1,
buys=1,
sells=0,
holds=0,
total_pnl=1.0,
win_rate=100.0,
avg_confidence=90.0,
scenario_match_rate=100.0,
)
reviewer.generate_lessons = AsyncMock(return_value=[])
evolution_optimizer = MagicMock()
evolution_optimizer.evolve = AsyncMock(return_value=None)
await _handle_market_close(
market_code="KR",
market_name="Korea",
market_timezone=UTC,
telegram=telegram,
context_aggregator=context_aggregator,
daily_reviewer=reviewer,
evolution_optimizer=evolution_optimizer,
)
evolution_optimizer.evolve.assert_not_called()
def test_run_context_scheduler_invokes_scheduler() -> None: def test_run_context_scheduler_invokes_scheduler() -> None:
"""Scheduler helper should call run_if_due with provided datetime.""" """Scheduler helper should call run_if_due with provided datetime."""
scheduler = MagicMock() scheduler = MagicMock()
@@ -1348,9 +1530,74 @@ async def test_run_evolution_loop_notifies_when_pr_generated() -> None:
await _run_evolution_loop( await _run_evolution_loop(
evolution_optimizer=optimizer, evolution_optimizer=optimizer,
telegram=telegram, telegram=telegram,
market_code="US", market_code="US_NASDAQ",
market_date="2026-02-14", market_date="2026-02-14",
) )
optimizer.evolve.assert_called_once() optimizer.evolve.assert_called_once()
telegram.send_message.assert_called_once() telegram.send_message.assert_called_once()
@pytest.mark.asyncio
async def test_run_evolution_loop_notification_error_is_ignored() -> None:
optimizer = MagicMock()
optimizer.evolve = AsyncMock(
return_value={
"title": "[Evolution] New strategy: v20260214_050000",
"branch": "evolution/v20260214_050000",
"status": "ready_for_review",
}
)
telegram = MagicMock()
telegram.send_message = AsyncMock(side_effect=RuntimeError("telegram down"))
await _run_evolution_loop(
evolution_optimizer=optimizer,
telegram=telegram,
market_code="US_NYSE",
market_date="2026-02-14",
)
optimizer.evolve.assert_called_once()
telegram.send_message.assert_called_once()
def test_apply_dashboard_flag_enables_dashboard() -> None:
settings = Settings(
KIS_APP_KEY="test_key",
KIS_APP_SECRET="test_secret",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test_gemini_key",
DASHBOARD_ENABLED=False,
)
updated = _apply_dashboard_flag(settings, dashboard_flag=True)
assert updated.DASHBOARD_ENABLED is True
def test_start_dashboard_server_disabled_returns_none() -> None:
settings = Settings(
KIS_APP_KEY="test_key",
KIS_APP_SECRET="test_secret",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test_gemini_key",
DASHBOARD_ENABLED=False,
)
thread = _start_dashboard_server(settings)
assert thread is None
def test_start_dashboard_server_enabled_starts_thread() -> None:
settings = Settings(
KIS_APP_KEY="test_key",
KIS_APP_SECRET="test_secret",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test_gemini_key",
DASHBOARD_ENABLED=True,
)
mock_thread = MagicMock()
with patch("src.main.threading.Thread", return_value=mock_thread) as mock_thread_cls:
thread = _start_dashboard_server(settings)
assert thread == mock_thread
mock_thread_cls.assert_called_once()
mock_thread.start.assert_called_once()

View File

@@ -7,6 +7,7 @@ import pytest
from src.markets.schedule import ( from src.markets.schedule import (
MARKETS, MARKETS,
expand_market_codes,
get_next_market_open, get_next_market_open,
get_open_markets, get_open_markets,
is_market_open, is_market_open,
@@ -199,3 +200,28 @@ class TestGetNextMarketOpen:
enabled_markets=["INVALID", "KR"], now=test_time enabled_markets=["INVALID", "KR"], now=test_time
) )
assert market.code == "KR" assert market.code == "KR"
class TestExpandMarketCodes:
"""Test shorthand market expansion."""
def test_expand_us_shorthand(self) -> None:
assert expand_market_codes(["US"]) == ["US_NASDAQ", "US_NYSE", "US_AMEX"]
def test_expand_cn_shorthand(self) -> None:
assert expand_market_codes(["CN"]) == ["CN_SHA", "CN_SZA"]
def test_expand_vn_shorthand(self) -> None:
assert expand_market_codes(["VN"]) == ["VN_HAN", "VN_HCM"]
def test_expand_mixed_codes(self) -> None:
assert expand_market_codes(["KR", "US", "JP"]) == [
"KR",
"US_NASDAQ",
"US_NYSE",
"US_AMEX",
"JP",
]
def test_expand_preserves_unknown_code(self) -> None:
assert expand_market_codes(["KR", "UNKNOWN"]) == ["KR", "UNKNOWN"]

View File

@@ -8,6 +8,7 @@ from unittest.mock import AsyncMock, MagicMock
from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner
from src.analysis.volatility import VolatilityAnalyzer from src.analysis.volatility import VolatilityAnalyzer
from src.broker.kis_api import KISBroker from src.broker.kis_api import KISBroker
from src.broker.overseas import OverseasBroker
from src.config import Settings from src.config import Settings
@@ -43,61 +44,70 @@ def scanner(mock_broker: MagicMock, mock_settings: Settings) -> SmartVolatilityS
analyzer = VolatilityAnalyzer() analyzer = VolatilityAnalyzer()
return SmartVolatilityScanner( return SmartVolatilityScanner(
broker=mock_broker, broker=mock_broker,
overseas_broker=None,
volatility_analyzer=analyzer, volatility_analyzer=analyzer,
settings=mock_settings, settings=mock_settings,
) )
@pytest.fixture
def mock_overseas_broker() -> MagicMock:
"""Create mock overseas broker."""
broker = MagicMock(spec=OverseasBroker)
broker.get_overseas_price = AsyncMock()
broker.fetch_overseas_rankings = AsyncMock(return_value=[])
return broker
class TestSmartVolatilityScanner: class TestSmartVolatilityScanner:
"""Test suite for SmartVolatilityScanner.""" """Test suite for SmartVolatilityScanner."""
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_scan_finds_oversold_candidates( async def test_scan_domestic_prefers_volatility_with_liquidity_bonus(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None: ) -> None:
"""Test that scanner identifies oversold stocks with high volume.""" """Domestic scan should score by volatility first and volume rank second."""
# Mock rankings fluctuation_rows = [
mock_broker.fetch_market_rankings.return_value = [
{ {
"stock_code": "005930", "stock_code": "005930",
"name": "Samsung", "name": "Samsung",
"price": 70000, "price": 70000,
"volume": 5000000, "volume": 5000000,
"change_rate": -3.5, "change_rate": -5.0,
"volume_increase_rate": 250, "volume_increase_rate": 250,
}, },
{
"stock_code": "035420",
"name": "NAVER",
"price": 250000,
"volume": 3000000,
"change_rate": 3.0,
"volume_increase_rate": 200,
},
]
volume_rows = [
{"stock_code": "035420", "name": "NAVER", "price": 250000, "volume": 3000000},
{"stock_code": "005930", "name": "Samsung", "price": 70000, "volume": 5000000},
]
mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, volume_rows]
mock_broker.get_daily_prices.return_value = [
{"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000},
{"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000},
] ]
# Mock daily prices - trending down (oversold)
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 75000 - i * 200,
"high": 75500 - i * 200,
"low": 74500 - i * 200,
"close": 75000 - i * 250, # Steady decline
"volume": 2000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan() candidates = await scanner.scan()
# Should find at least one candidate (depending on exact RSI calculation) assert len(candidates) >= 1
mock_broker.fetch_market_rankings.assert_called_once() # Samsung has higher absolute move, so it should lead despite lower volume rank bonus.
mock_broker.get_daily_prices.assert_called_once_with("005930", days=20) assert candidates[0].stock_code == "005930"
assert candidates[0].signal == "oversold"
# If qualified, should have oversold signal
if candidates:
assert candidates[0].signal in ["oversold", "momentum"]
assert candidates[0].volume_ratio >= scanner.vol_multiplier
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_scan_finds_momentum_candidates( async def test_scan_domestic_finds_momentum_candidate(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None: ) -> None:
"""Test that scanner identifies momentum stocks with high volume.""" """Positive change should be represented as momentum signal."""
mock_broker.fetch_market_rankings.return_value = [ fluctuation_rows = [
{ {
"stock_code": "035420", "stock_code": "035420",
"name": "NAVER", "name": "NAVER",
@@ -107,124 +117,67 @@ class TestSmartVolatilityScanner:
"volume_increase_rate": 300, "volume_increase_rate": 300,
}, },
] ]
mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, fluctuation_rows]
# Mock daily prices - trending up (momentum) mock_broker.get_daily_prices.return_value = [
prices = [] {"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000},
for i in range(20): {"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000},
prices.append({ ]
"date": f"2026020{i:02d}",
"open": 230000 + i * 500,
"high": 231000 + i * 500,
"low": 229000 + i * 500,
"close": 230500 + i * 500, # Steady rise
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan() candidates = await scanner.scan()
mock_broker.fetch_market_rankings.assert_called_once() assert [c.stock_code for c in candidates] == ["035420"]
assert candidates[0].signal == "momentum"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_scan_filters_low_volume( async def test_scan_domestic_filters_low_volatility(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None: ) -> None:
"""Test that stocks with low volume ratio are filtered out.""" """Domestic scan should drop symbols below volatility threshold."""
mock_broker.fetch_market_rankings.return_value = [ fluctuation_rows = [
{ {
"stock_code": "000660", "stock_code": "000660",
"name": "SK Hynix", "name": "SK Hynix",
"price": 150000, "price": 150000,
"volume": 500000, "volume": 500000,
"change_rate": -5.0, "change_rate": 0.2,
"volume_increase_rate": 50, # Only 50% increase (< 200%) "volume_increase_rate": 50,
}, },
] ]
mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, fluctuation_rows]
# Low volume mock_broker.get_daily_prices.return_value = [
prices = [] {"open": 1, "high": 150100, "low": 149900, "close": 150000, "volume": 1000000},
for i in range(20): {"open": 1, "high": 150100, "low": 149900, "close": 150000, "volume": 1000000},
prices.append({ ]
"date": f"2026020{i:02d}",
"open": 150000 - i * 100,
"high": 151000 - i * 100,
"low": 149000 - i * 100,
"close": 150000 - i * 150, # Declining (would be oversold)
"volume": 1000000, # Current 500k < 2x prev day 1M
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan() candidates = await scanner.scan()
# Should be filtered out due to low volume ratio
assert len(candidates) == 0
@pytest.mark.asyncio
async def test_scan_filters_neutral_rsi(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that stocks with neutral RSI are filtered out."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "051910",
"name": "LG Chem",
"price": 500000,
"volume": 3000000,
"change_rate": 0.5,
"volume_increase_rate": 300, # High volume
},
]
# Flat prices (neutral RSI ~50)
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 500000 + (i % 2) * 100, # Small oscillation
"high": 500500,
"low": 499500,
"close": 500000 + (i % 2) * 50,
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
# Should be filtered out (RSI ~50, not < 30 or > 70)
assert len(candidates) == 0 assert len(candidates) == 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_scan_uses_fallback_on_api_error( async def test_scan_uses_fallback_on_api_error(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None: ) -> None:
"""Test fallback to static list when ranking API fails.""" """Domestic scan should remain operational using fallback symbols."""
mock_broker.fetch_market_rankings.side_effect = ConnectionError("API unavailable") mock_broker.fetch_market_rankings.side_effect = [
ConnectionError("API unavailable"),
# Fallback stocks should still be analyzed ConnectionError("API unavailable"),
prices = [] ]
for i in range(20): mock_broker.get_daily_prices.return_value = [
prices.append({ {"open": 1, "high": 103, "low": 97, "close": 100, "volume": 1000000},
"date": f"2026020{i:02d}", {"open": 1, "high": 103, "low": 97, "close": 100, "volume": 800000},
"open": 50000 - i * 50, ]
"high": 51000 - i * 50,
"low": 49000 - i * 50,
"close": 50000 - i * 75, # Declining
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan(fallback_stocks=["005930", "000660"]) candidates = await scanner.scan(fallback_stocks=["005930", "000660"])
# Should not crash
assert isinstance(candidates, list) assert isinstance(candidates, list)
assert len(candidates) >= 1
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_scan_returns_top_n_only( async def test_scan_returns_top_n_only(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None: ) -> None:
"""Test that scan returns at most top_n candidates.""" """Test that scan returns at most top_n candidates."""
# Return many stocks fluctuation_rows = [
mock_broker.fetch_market_rankings.return_value = [
{ {
"stock_code": f"00{i}000", "stock_code": f"00{i}000",
"name": f"Stock{i}", "name": f"Stock{i}",
@@ -235,62 +188,17 @@ class TestSmartVolatilityScanner:
} }
for i in range(1, 10) for i in range(1, 10)
] ]
mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, fluctuation_rows]
# All oversold with high volume mock_broker.get_daily_prices.return_value = [
def make_prices(code: str) -> list[dict]: {"open": 1, "high": 105, "low": 95, "close": 100, "volume": 1000000},
prices = [] {"open": 1, "high": 105, "low": 95, "close": 100, "volume": 900000},
for i in range(20): ]
prices.append({
"date": f"2026020{i:02d}",
"open": 10000 - i * 100,
"high": 10500 - i * 100,
"low": 9500 - i * 100,
"close": 10000 - i * 150,
"volume": 1000000,
})
return prices
mock_broker.get_daily_prices.side_effect = make_prices
candidates = await scanner.scan() candidates = await scanner.scan()
# Should respect top_n limit (3) # Should respect top_n limit (3)
assert len(candidates) <= scanner.top_n assert len(candidates) <= scanner.top_n
@pytest.mark.asyncio
async def test_scan_skips_insufficient_price_history(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that stocks with insufficient history are skipped."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "005930",
"name": "Samsung",
"price": 70000,
"volume": 5000000,
"change_rate": -5.0,
"volume_increase_rate": 300,
},
]
# Only 5 days of data (need 15+ for RSI)
mock_broker.get_daily_prices.return_value = [
{
"date": f"2026020{i:02d}",
"open": 70000,
"high": 71000,
"low": 69000,
"close": 70000,
"volume": 2000000,
}
for i in range(5)
]
candidates = await scanner.scan()
# Should skip due to insufficient data
assert len(candidates) == 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_stock_codes( async def test_get_stock_codes(
self, scanner: SmartVolatilityScanner self, scanner: SmartVolatilityScanner
@@ -323,6 +231,124 @@ class TestSmartVolatilityScanner:
assert codes == ["005930", "035420"] assert codes == ["005930", "035420"]
@pytest.mark.asyncio
async def test_scan_overseas_uses_dynamic_symbols(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock, mock_settings: Settings
) -> None:
"""Overseas scan should use provided dynamic universe symbols."""
analyzer = VolatilityAnalyzer()
scanner = SmartVolatilityScanner(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
volatility_analyzer=analyzer,
settings=mock_settings,
)
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NASD"
market.is_domestic = False
mock_overseas_broker.get_overseas_price.side_effect = [
{"output": {"last": "210.5", "rate": "1.6", "tvol": "1500000"}},
{"output": {"last": "330.1", "rate": "0.2", "tvol": "900000"}},
]
candidates = await scanner.scan(
market=market,
fallback_stocks=["AAPL", "MSFT"],
)
assert [c.stock_code for c in candidates] == ["AAPL"]
assert candidates[0].signal == "momentum"
assert candidates[0].price == 210.5
@pytest.mark.asyncio
async def test_scan_overseas_uses_ranking_api_first(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock, mock_settings: Settings
) -> None:
"""Overseas scan should prioritize ranking API when available."""
analyzer = VolatilityAnalyzer()
scanner = SmartVolatilityScanner(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
volatility_analyzer=analyzer,
settings=mock_settings,
)
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NASD"
market.is_domestic = False
mock_overseas_broker.fetch_overseas_rankings.return_value = [
{"symb": "NVDA", "last": "780.2", "rate": "2.4", "tvol": "1200000"},
{"symb": "MSFT", "last": "420.0", "rate": "0.3", "tvol": "900000"},
]
candidates = await scanner.scan(market=market, fallback_stocks=["AAPL", "TSLA"])
assert mock_overseas_broker.fetch_overseas_rankings.call_count >= 1
mock_overseas_broker.get_overseas_price.assert_not_called()
assert [c.stock_code for c in candidates] == ["NVDA"]
@pytest.mark.asyncio
async def test_scan_overseas_without_symbols_returns_empty(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock, mock_settings: Settings
) -> None:
"""Overseas scan should return empty list when no symbol universe exists."""
analyzer = VolatilityAnalyzer()
scanner = SmartVolatilityScanner(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
volatility_analyzer=analyzer,
settings=mock_settings,
)
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NASD"
market.is_domestic = False
candidates = await scanner.scan(market=market, fallback_stocks=[])
assert candidates == []
@pytest.mark.asyncio
async def test_scan_overseas_picks_high_intraday_range_even_with_low_change(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock, mock_settings: Settings
) -> None:
"""Volatility selection should consider intraday range, not only change rate."""
analyzer = VolatilityAnalyzer()
scanner = SmartVolatilityScanner(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
volatility_analyzer=analyzer,
settings=mock_settings,
)
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NASD"
market.is_domestic = False
# change rate is tiny, but high-low range is large (15%).
mock_overseas_broker.fetch_overseas_rankings.return_value = [
{
"symb": "ABCD",
"last": "100",
"rate": "0.2",
"high": "110",
"low": "95",
"tvol": "800000",
}
]
candidates = await scanner.scan(market=market, fallback_stocks=[])
assert [c.stock_code for c in candidates] == ["ABCD"]
class TestRSICalculation: class TestRSICalculation:
"""Test RSI calculation in VolatilityAnalyzer.""" """Test RSI calculation in VolatilityAnalyzer."""

View File

@@ -682,6 +682,10 @@ class TestBasicCommands:
"/help - Show available commands\n" "/help - Show available commands\n"
"/status - Trading status (mode, markets, P&L)\n" "/status - Trading status (mode, markets, P&L)\n"
"/positions - Current holdings\n" "/positions - Current holdings\n"
"/report - Daily summary report\n"
"/scenarios - Today's playbook scenarios\n"
"/review - Recent scorecards\n"
"/dashboard - Dashboard URL/status\n"
"/stop - Pause trading\n" "/stop - Pause trading\n"
"/resume - Resume trading" "/resume - Resume trading"
) )
@@ -707,10 +711,106 @@ class TestBasicCommands:
assert "/help" in payload["text"] assert "/help" in payload["text"]
assert "/status" in payload["text"] assert "/status" in payload["text"]
assert "/positions" in payload["text"] assert "/positions" in payload["text"]
assert "/report" in payload["text"]
assert "/scenarios" in payload["text"]
assert "/review" in payload["text"]
assert "/dashboard" in payload["text"]
assert "/stop" in payload["text"] assert "/stop" in payload["text"]
assert "/resume" in payload["text"] assert "/resume" in payload["text"]
class TestExtendedCommands:
"""Test additional bot commands."""
@pytest.mark.asyncio
async def test_report_command(self) -> None:
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_report() -> None:
await client.send_message("<b>📈 Daily Report</b>\n\nTrades: 1")
handler.register_command("report", mock_report)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await handler._handle_update(
{"update_id": 1, "message": {"chat": {"id": 456}, "text": "/report"}}
)
payload = mock_post.call_args.kwargs["json"]
assert "Daily Report" in payload["text"]
@pytest.mark.asyncio
async def test_scenarios_command(self) -> None:
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_scenarios() -> None:
await client.send_message("<b>🧠 Today's Scenarios</b>\n\n- AAPL: BUY (85)")
handler.register_command("scenarios", mock_scenarios)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await handler._handle_update(
{"update_id": 1, "message": {"chat": {"id": 456}, "text": "/scenarios"}}
)
payload = mock_post.call_args.kwargs["json"]
assert "Today's Scenarios" in payload["text"]
@pytest.mark.asyncio
async def test_review_command(self) -> None:
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_review() -> None:
await client.send_message("<b>📝 Recent Reviews</b>\n\n- 2026-02-14 KR")
handler.register_command("review", mock_review)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await handler._handle_update(
{"update_id": 1, "message": {"chat": {"id": 456}, "text": "/review"}}
)
payload = mock_post.call_args.kwargs["json"]
assert "Recent Reviews" in payload["text"]
@pytest.mark.asyncio
async def test_dashboard_command(self) -> None:
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_dashboard() -> None:
await client.send_message("<b>🖥️ Dashboard</b>\n\nURL: http://127.0.0.1:8080")
handler.register_command("dashboard", mock_dashboard)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await handler._handle_update(
{"update_id": 1, "message": {"chat": {"id": 456}, "text": "/dashboard"}}
)
payload = mock_post.call_args.kwargs["json"]
assert "Dashboard" in payload["text"]
class TestGetUpdates: class TestGetUpdates:
"""Test getUpdates API interaction.""" """Test getUpdates API interaction."""