feat: implement Smart Volatility Scanner with RSI/volume filters (issue #76)
Some checks failed
CI / test (pull_request) Has been cancelled

Add Python-first scanning pipeline that reduces Gemini API calls by filtering
stocks before AI analysis: KIS rankings API -> RSI/volume filter -> AI judgment.

## Implementation
- Add RSI calculation (Wilder's smoothing method) to VolatilityAnalyzer
- Add KIS API methods: fetch_market_rankings() and get_daily_prices()
- Create SmartVolatilityScanner with configurable thresholds
- Integrate scanner into main.py realtime mode
- Add selection_context logging to trades table for Evolution system

## Configuration
- RSI_OVERSOLD_THRESHOLD: 30 (configurable 0-50)
- RSI_MOMENTUM_THRESHOLD: 70 (configurable 50-100)
- VOL_MULTIPLIER: 2.0 (minimum volume ratio, configurable 1-10)
- SCANNER_TOP_N: 3 (max candidates per scan, configurable 1-10)

## Benefits
- Reduces Gemini API calls (process 1-3 qualified stocks vs 20-30 ranked)
- Python-based technical filtering before expensive AI judgment
- Tracks selection criteria (RSI, volume_ratio, signal, score) for strategy optimization
- Graceful fallback to static watchlist if ranking API fails

## Tests
- 13 new tests for SmartVolatilityScanner and RSI calculation
- All existing tests updated and passing
- Coverage maintained at 73%

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
agentson
2026-02-06 00:48:23 +09:00
parent 27f581f17d
commit f0ae25c533
9 changed files with 861 additions and 24 deletions

View File

@@ -15,6 +15,7 @@ from datetime import UTC, datetime
from typing import Any
from src.analysis.scanner import MarketScanner
from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner
from src.analysis.volatility import VolatilityAnalyzer
from src.brain.gemini_client import GeminiClient
from src.broker.kis_api import KISBroker
@@ -100,6 +101,7 @@ async def trading_cycle(
telegram: TelegramClient,
market: MarketInfo,
stock_code: str,
scan_candidates: dict[str, ScanCandidate],
) -> None:
"""Execute one trading cycle for a single stock."""
cycle_start_time = asyncio.get_event_loop().time()
@@ -292,7 +294,17 @@ async def trading_cycle(
except Exception as exc:
logger.warning("Telegram notification failed: %s", exc)
# 6. Log trade
# 6. Log trade with selection context
selection_context = None
if stock_code in scan_candidates:
candidate = scan_candidates[stock_code]
selection_context = {
"rsi": candidate.rsi,
"volume_ratio": candidate.volume_ratio,
"signal": candidate.signal,
"score": candidate.score,
}
log_trade(
conn=db_conn,
stock_code=stock_code,
@@ -301,6 +313,7 @@ async def trading_cycle(
rationale=decision.rationale,
market=market.code,
exchange_code=market.exchange_code,
selection_context=selection_context,
)
# 7. Latency monitoring
@@ -722,6 +735,16 @@ async def run(settings: Settings) -> None:
max_concurrent_scans=1, # Fully serialized to avoid EGW00201
)
# Initialize smart scanner (Python-first, AI-last pipeline)
smart_scanner = SmartVolatilityScanner(
broker=broker,
volatility_analyzer=volatility_analyzer,
settings=settings,
)
# Track scan candidates for selection context logging
scan_candidates: dict[str, ScanCandidate] = {} # stock_code -> candidate
# Initialize latency control system
criticality_assessor = CriticalityAssessor(
critical_pnl_threshold=-2.5, # Near circuit breaker at -3.0%
@@ -867,38 +890,46 @@ async def run(settings: Settings) -> None:
logger.warning("Market open notification failed: %s", exc)
_market_states[market.code] = True
# Volatility Hunter: Scan market periodically to update watchlist
# Smart Scanner: Python-first filtering (RSI + volume) before AI
now_timestamp = asyncio.get_event_loop().time()
last_scan = last_scan_time.get(market.code, 0.0)
if now_timestamp - last_scan >= SCAN_INTERVAL_SECONDS:
try:
# Scan all stocks in the universe
stock_universe = STOCK_UNIVERSE.get(market.code, [])
if stock_universe:
logger.info("Volatility Hunter: Scanning %s market", market.name)
scan_result = await market_scanner.scan_market(
market, stock_universe
)
logger.info("Smart Scanner: Scanning %s market", market.name)
# Update watchlist with top movers
# Run smart scan with fallback to static universe
fallback_universe = STOCK_UNIVERSE.get(market.code, [])
candidates = await smart_scanner.scan(fallback_stocks=fallback_universe)
if candidates:
# Update watchlist with qualified candidates
qualified_codes = smart_scanner.get_stock_codes(candidates)
# Merge with existing watchlist (keep some continuity)
current_watchlist = WATCHLISTS.get(market.code, [])
updated_watchlist = market_scanner.get_updated_watchlist(
current_watchlist,
scan_result,
max_replacements=2,
)
WATCHLISTS[market.code] = updated_watchlist
# Keep up to 2 from existing, add new qualified
merged = qualified_codes + [
c for c in current_watchlist if c not in qualified_codes
][:2]
WATCHLISTS[market.code] = merged[:5] # Cap at 5
# Store candidates for later selection context logging
for candidate in candidates:
scan_candidates[candidate.stock_code] = candidate
logger.info(
"Volatility Hunter: Watchlist updated for %s (%d top movers, %d breakouts)",
"Smart Scanner: Found %d qualified candidates for %s: %s",
len(candidates),
market.name,
len(scan_result.top_movers),
len(scan_result.breakouts),
[f"{c.stock_code}(RSI={c.rsi:.0f})" for c in candidates],
)
else:
logger.info("Smart Scanner: No qualified candidates for %s", market.name)
last_scan_time[market.code] = now_timestamp
except Exception as exc:
logger.error("Volatility Hunter scan failed for %s: %s", market.name, exc)
logger.error("Smart Scanner failed for %s: %s", market.name, exc)
# Get watchlist for this market
watchlist = WATCHLISTS.get(market.code, [])
@@ -928,6 +959,7 @@ async def run(settings: Settings) -> None:
telegram,
market,
stock_code,
scan_candidates,
)
break # Success — exit retry loop
except CircuitBreakerTripped as exc: