feat: implement daily trading mode with batch decisions (issue #57)
Some checks failed
CI / test (pull_request) Has been cancelled

Add API-efficient daily trading mode for Gemini Free tier compatibility:

## Features

- **Batch Decisions**: GeminiClient.decide_batch() analyzes multiple stocks
  in a single API call using compressed JSON format
- **Daily Trading Mode**: run_daily_session() executes N sessions per day
  at configurable intervals (default: 4 sessions, 6 hours apart)
- **Mode Selection**: TRADE_MODE env var switches between daily (batch)
  and realtime (per-stock) modes
- **Requirements Log**: docs/requirements-log.md tracks user feedback
  chronologically for project evolution

## Configuration

- TRADE_MODE: "daily" (default) | "realtime"
- DAILY_SESSIONS: 1-10 (default: 4)
- SESSION_INTERVAL_HOURS: 1-24 (default: 6)

## API Efficiency

- 2 markets × 4 sessions = 8 API calls/day (within Free tier 20 calls)
- 3 markets × 4 sessions = 12 API calls/day (within Free tier 20 calls)

## Testing

- 9 new batch decision tests (all passing)
- All existing tests maintained (298 passed)

## Documentation

- docs/architecture.md: Trading Modes section with daily vs realtime
- CLAUDE.md: Requirements Management section
- docs/requirements-log.md: Initial entries for API efficiency needs

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
agentson
2026-02-05 09:28:10 +09:00
parent 71ac59794e
commit 0057de4d12
7 changed files with 861 additions and 150 deletions

View File

@@ -74,6 +74,10 @@ TRADE_INTERVAL_SECONDS = 60
SCAN_INTERVAL_SECONDS = 60 # Scan markets every 60 seconds
MAX_CONNECTION_RETRIES = 3
# Daily trading mode constants (for Free tier API efficiency)
DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day
TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions
# Full stock universe per market (for scanning)
# In production, this would be loaded from a database or API
STOCK_UNIVERSE = {
@@ -321,6 +325,239 @@ async def trading_cycle(
)
async def run_daily_session(
broker: KISBroker,
overseas_broker: OverseasBroker,
brain: GeminiClient,
risk: RiskManager,
db_conn: Any,
decision_logger: DecisionLogger,
context_store: ContextStore,
criticality_assessor: CriticalityAssessor,
telegram: TelegramClient,
settings: Settings,
) -> None:
"""Execute one daily trading session.
Designed for API efficiency with Gemini Free tier:
- Batch decision making (1 API call per market)
- Runs N times per day at fixed intervals
- Minimizes API usage while maintaining trading capability
"""
# Get currently open markets
open_markets = get_open_markets(settings.enabled_market_list)
if not open_markets:
logger.info("No markets open for this session")
return
logger.info("Starting daily trading session for %d markets", len(open_markets))
# Process each open market
for market in open_markets:
# Get watchlist for this market
watchlist = WATCHLISTS.get(market.code, [])
if not watchlist:
logger.debug("No watchlist for market %s", market.code)
continue
logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist))
# Collect market data for all stocks in the watchlist
stocks_data = []
for stock_code in watchlist:
try:
if market.is_domestic:
orderbook = await broker.get_orderbook(stock_code)
current_price = safe_float(orderbook.get("output1", {}).get("stck_prpr", "0"))
foreigner_net = safe_float(
orderbook.get("output1", {}).get("frgn_ntby_qty", "0")
)
else:
price_data = await overseas_broker.get_overseas_price(
market.exchange_code, stock_code
)
current_price = safe_float(price_data.get("output", {}).get("last", "0"))
foreigner_net = 0.0
stocks_data.append(
{
"stock_code": stock_code,
"market_name": market.name,
"current_price": current_price,
"foreigner_net": foreigner_net,
}
)
except Exception as exc:
logger.error("Failed to fetch data for %s: %s", stock_code, exc)
continue
if not stocks_data:
logger.warning("No valid stock data for market %s", market.code)
continue
# Get batch decisions (1 API call for all stocks in this market)
logger.info("Requesting batch decision for %d stocks in %s", len(stocks_data), market.name)
decisions = await brain.decide_batch(stocks_data)
# Get balance data once for the market
if market.is_domestic:
balance_data = await broker.get_balance()
output2 = balance_data.get("output2", [{}])
total_eval = safe_float(output2[0].get("tot_evlu_amt", "0")) if output2 else 0
total_cash = safe_float(output2[0].get("dnca_tot_amt", "0")) if output2 else 0
purchase_total = safe_float(output2[0].get("pchs_amt_smtl_amt", "0")) if output2 else 0
else:
balance_data = await overseas_broker.get_overseas_balance(market.exchange_code)
output2 = balance_data.get("output2", [{}])
if isinstance(output2, list) and output2:
balance_info = output2[0]
elif isinstance(output2, dict):
balance_info = output2
else:
balance_info = {}
total_eval = safe_float(balance_info.get("frcr_evlu_tota", "0") or "0")
total_cash = safe_float(balance_info.get("frcr_dncl_amt_2", "0") or "0")
purchase_total = safe_float(balance_info.get("frcr_buy_amt_smtl", "0") or "0")
# Calculate daily P&L %
pnl_pct = (
((total_eval - purchase_total) / purchase_total * 100) if purchase_total > 0 else 0.0
)
# Execute decisions for each stock
for stock_data in stocks_data:
stock_code = stock_data["stock_code"]
decision = decisions.get(stock_code)
if not decision:
logger.warning("No decision for %s — skipping", stock_code)
continue
logger.info(
"Decision for %s (%s): %s (confidence=%d)",
stock_code,
market.name,
decision.action,
decision.confidence,
)
# Log decision
context_snapshot = {
"L1": {
"current_price": stock_data["current_price"],
"foreigner_net": stock_data["foreigner_net"],
},
"L2": {
"total_eval": total_eval,
"total_cash": total_cash,
"purchase_total": purchase_total,
"pnl_pct": pnl_pct,
},
}
input_data = {
"current_price": stock_data["current_price"],
"foreigner_net": stock_data["foreigner_net"],
"total_eval": total_eval,
"total_cash": total_cash,
"pnl_pct": pnl_pct,
}
decision_logger.log_decision(
stock_code=stock_code,
market=market.code,
exchange_code=market.exchange_code,
action=decision.action,
confidence=decision.confidence,
rationale=decision.rationale,
context_snapshot=context_snapshot,
input_data=input_data,
)
# Execute if actionable
if decision.action in ("BUY", "SELL"):
quantity = 1
order_amount = stock_data["current_price"] * quantity
# Risk check
try:
risk.validate_order(
current_pnl_pct=pnl_pct,
order_amount=order_amount,
total_cash=total_cash,
)
except FatFingerRejected as exc:
try:
await telegram.notify_fat_finger(
stock_code=stock_code,
order_amount=exc.order_amount,
total_cash=exc.total_cash,
max_pct=exc.max_pct,
)
except Exception as notify_exc:
logger.warning("Fat finger notification failed: %s", notify_exc)
continue # Skip this order
except CircuitBreakerTripped as exc:
logger.critical("Circuit breaker tripped — stopping session")
try:
await telegram.notify_circuit_breaker(
pnl_pct=exc.pnl_pct,
threshold=exc.threshold,
)
except Exception as notify_exc:
logger.warning("Circuit breaker notification failed: %s", notify_exc)
raise
# Send order
try:
if market.is_domestic:
result = await broker.send_order(
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=0, # market order
)
else:
result = await overseas_broker.send_overseas_order(
exchange_code=market.exchange_code,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=0.0, # market order
)
logger.info("Order result: %s", result.get("msg1", "OK"))
# Notify trade execution
try:
await telegram.notify_trade_execution(
stock_code=stock_code,
market=market.name,
action=decision.action,
quantity=quantity,
price=stock_data["current_price"],
confidence=decision.confidence,
)
except Exception as exc:
logger.warning("Telegram notification failed: %s", exc)
except Exception as exc:
logger.error("Order execution failed for %s: %s", stock_code, exc)
continue
# Log trade
log_trade(
conn=db_conn,
stock_code=stock_code,
action=decision.action,
confidence=decision.confidence,
rationale=decision.rationale,
market=market.code,
exchange_code=market.exchange_code,
)
logger.info("Daily trading session completed")
async def run(settings: Settings) -> None:
"""Main async loop — iterate over open markets on a timer."""
broker = KISBroker(settings)
@@ -375,7 +612,7 @@ async def run(settings: Settings) -> None:
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, _signal_handler)
logger.info("The Ouroboros is alive. Mode: %s", settings.MODE)
logger.info("The Ouroboros is alive. Mode: %s, Trading: %s", settings.MODE, settings.TRADE_MODE)
logger.info("Enabled markets: %s", settings.enabled_market_list)
# Notify system startup
@@ -385,170 +622,213 @@ async def run(settings: Settings) -> None:
logger.warning("System startup notification failed: %s", exc)
try:
while not shutdown.is_set():
# Get currently open markets
open_markets = get_open_markets(settings.enabled_market_list)
# Branch based on trading mode
if settings.TRADE_MODE == "daily":
# Daily trading mode: batch decisions at fixed intervals
logger.info(
"Daily trading mode: %d sessions every %d hours",
settings.DAILY_SESSIONS,
settings.SESSION_INTERVAL_HOURS,
)
if not open_markets:
# Notify market close for any markets that were open
for market_code, is_open in list(_market_states.items()):
if is_open:
try:
from src.markets.schedule import MARKETS
session_interval = settings.SESSION_INTERVAL_HOURS * 3600 # Convert to seconds
market_info = MARKETS.get(market_code)
if market_info:
await telegram.notify_market_close(market_info.name, 0.0)
except Exception as exc:
logger.warning("Market close notification failed: %s", exc)
_market_states[market_code] = False
# No markets open — wait until next market opens
while not shutdown.is_set():
try:
next_market, next_open_time = get_next_market_open(
settings.enabled_market_list
await run_daily_session(
broker,
overseas_broker,
brain,
risk,
db_conn,
decision_logger,
context_store,
criticality_assessor,
telegram,
settings,
)
now = datetime.now(UTC)
wait_seconds = (next_open_time - now).total_seconds()
logger.info(
"No markets open. Next market: %s, opens in %.1f hours",
next_market.name,
wait_seconds / 3600,
)
await asyncio.wait_for(shutdown.wait(), timeout=wait_seconds)
except TimeoutError:
continue # Market should be open now
except ValueError as exc:
logger.error("Failed to find next market open: %s", exc)
await asyncio.sleep(TRADE_INTERVAL_SECONDS)
continue
# Process each open market
for market in open_markets:
if shutdown.is_set():
except CircuitBreakerTripped:
logger.critical("Circuit breaker tripped — shutting down")
shutdown.set()
break
except Exception as exc:
logger.exception("Daily session error: %s", exc)
# Notify market open if it just opened
if not _market_states.get(market.code, False):
# Wait for next session or shutdown
logger.info("Next session in %.1f hours", session_interval / 3600)
try:
await asyncio.wait_for(shutdown.wait(), timeout=session_interval)
except TimeoutError:
pass # Normal — time for next session
else:
# Realtime trading mode: original per-stock loop
logger.info("Realtime trading mode: 60s interval per stock")
while not shutdown.is_set():
# Get currently open markets
open_markets = get_open_markets(settings.enabled_market_list)
if not open_markets:
# Notify market close for any markets that were open
for market_code, is_open in list(_market_states.items()):
if is_open:
try:
from src.markets.schedule import MARKETS
market_info = MARKETS.get(market_code)
if market_info:
await telegram.notify_market_close(market_info.name, 0.0)
except Exception as exc:
logger.warning("Market close notification failed: %s", exc)
_market_states[market_code] = False
# No markets open — wait until next market opens
try:
await telegram.notify_market_open(market.name)
except Exception as exc:
logger.warning("Market open notification failed: %s", exc)
_market_states[market.code] = True
# Volatility Hunter: Scan market periodically to update watchlist
now_timestamp = asyncio.get_event_loop().time()
last_scan = last_scan_time.get(market.code, 0.0)
if now_timestamp - last_scan >= SCAN_INTERVAL_SECONDS:
try:
# Scan all stocks in the universe
stock_universe = STOCK_UNIVERSE.get(market.code, [])
if stock_universe:
logger.info("Volatility Hunter: Scanning %s market", market.name)
scan_result = await market_scanner.scan_market(
market, stock_universe
)
# Update watchlist with top movers
current_watchlist = WATCHLISTS.get(market.code, [])
updated_watchlist = market_scanner.get_updated_watchlist(
current_watchlist,
scan_result,
max_replacements=2,
)
WATCHLISTS[market.code] = updated_watchlist
logger.info(
"Volatility Hunter: Watchlist updated for %s (%d top movers, %d breakouts)",
market.name,
len(scan_result.top_movers),
len(scan_result.breakouts),
)
last_scan_time[market.code] = now_timestamp
except Exception as exc:
logger.error("Volatility Hunter scan failed for %s: %s", market.name, exc)
# Get watchlist for this market
watchlist = WATCHLISTS.get(market.code, [])
if not watchlist:
logger.debug("No watchlist for market %s", market.code)
next_market, next_open_time = get_next_market_open(
settings.enabled_market_list
)
now = datetime.now(UTC)
wait_seconds = (next_open_time - now).total_seconds()
logger.info(
"No markets open. Next market: %s, opens in %.1f hours",
next_market.name,
wait_seconds / 3600,
)
await asyncio.wait_for(shutdown.wait(), timeout=wait_seconds)
except TimeoutError:
continue # Market should be open now
except ValueError as exc:
logger.error("Failed to find next market open: %s", exc)
await asyncio.sleep(TRADE_INTERVAL_SECONDS)
continue
logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist))
# Process each stock in the watchlist
for stock_code in watchlist:
# Process each open market
for market in open_markets:
if shutdown.is_set():
break
# Retry logic for connection errors
for attempt in range(1, MAX_CONNECTION_RETRIES + 1):
# Notify market open if it just opened
if not _market_states.get(market.code, False):
try:
await trading_cycle(
broker,
overseas_broker,
brain,
risk,
db_conn,
decision_logger,
context_store,
criticality_assessor,
telegram,
market,
stock_code,
)
break # Success — exit retry loop
except CircuitBreakerTripped as exc:
logger.critical("Circuit breaker tripped — shutting down")
try:
await telegram.notify_circuit_breaker(
pnl_pct=exc.pnl_pct,
threshold=exc.threshold,
)
except Exception as notify_exc:
logger.warning(
"Circuit breaker notification failed: %s", notify_exc
)
raise
except ConnectionError as exc:
if attempt < MAX_CONNECTION_RETRIES:
logger.warning(
"Connection error for %s (attempt %d/%d): %s",
stock_code,
attempt,
MAX_CONNECTION_RETRIES,
exc,
)
await asyncio.sleep(2**attempt) # Exponential backoff
else:
logger.error(
"Connection error for %s (all retries exhausted): %s",
stock_code,
exc,
)
break # Give up on this stock
await telegram.notify_market_open(market.name)
except Exception as exc:
logger.exception("Unexpected error for %s: %s", stock_code, exc)
break # Don't retry on unexpected errors
logger.warning("Market open notification failed: %s", exc)
_market_states[market.code] = True
# Log priority queue metrics periodically
metrics = await priority_queue.get_metrics()
if metrics.total_enqueued > 0:
logger.info(
"Priority queue metrics: enqueued=%d, dequeued=%d, size=%d, timeouts=%d, errors=%d",
metrics.total_enqueued,
metrics.total_dequeued,
metrics.current_size,
metrics.total_timeouts,
metrics.total_errors,
)
# Volatility Hunter: Scan market periodically to update watchlist
now_timestamp = asyncio.get_event_loop().time()
last_scan = last_scan_time.get(market.code, 0.0)
if now_timestamp - last_scan >= SCAN_INTERVAL_SECONDS:
try:
# Scan all stocks in the universe
stock_universe = STOCK_UNIVERSE.get(market.code, [])
if stock_universe:
logger.info("Volatility Hunter: Scanning %s market", market.name)
scan_result = await market_scanner.scan_market(
market, stock_universe
)
# Wait for next cycle or shutdown
try:
await asyncio.wait_for(shutdown.wait(), timeout=TRADE_INTERVAL_SECONDS)
except TimeoutError:
pass # Normal — timeout means it's time for next cycle
# Update watchlist with top movers
current_watchlist = WATCHLISTS.get(market.code, [])
updated_watchlist = market_scanner.get_updated_watchlist(
current_watchlist,
scan_result,
max_replacements=2,
)
WATCHLISTS[market.code] = updated_watchlist
logger.info(
"Volatility Hunter: Watchlist updated for %s (%d top movers, %d breakouts)",
market.name,
len(scan_result.top_movers),
len(scan_result.breakouts),
)
last_scan_time[market.code] = now_timestamp
except Exception as exc:
logger.error("Volatility Hunter scan failed for %s: %s", market.name, exc)
# Get watchlist for this market
watchlist = WATCHLISTS.get(market.code, [])
if not watchlist:
logger.debug("No watchlist for market %s", market.code)
continue
logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist))
# Process each stock in the watchlist
for stock_code in watchlist:
if shutdown.is_set():
break
# Retry logic for connection errors
for attempt in range(1, MAX_CONNECTION_RETRIES + 1):
try:
await trading_cycle(
broker,
overseas_broker,
brain,
risk,
db_conn,
decision_logger,
context_store,
criticality_assessor,
telegram,
market,
stock_code,
)
break # Success — exit retry loop
except CircuitBreakerTripped as exc:
logger.critical("Circuit breaker tripped — shutting down")
try:
await telegram.notify_circuit_breaker(
pnl_pct=exc.pnl_pct,
threshold=exc.threshold,
)
except Exception as notify_exc:
logger.warning(
"Circuit breaker notification failed: %s", notify_exc
)
raise
except ConnectionError as exc:
if attempt < MAX_CONNECTION_RETRIES:
logger.warning(
"Connection error for %s (attempt %d/%d): %s",
stock_code,
attempt,
MAX_CONNECTION_RETRIES,
exc,
)
await asyncio.sleep(2**attempt) # Exponential backoff
else:
logger.error(
"Connection error for %s (all retries exhausted): %s",
stock_code,
exc,
)
break # Give up on this stock
except Exception as exc:
logger.exception("Unexpected error for %s: %s", stock_code, exc)
break # Don't retry on unexpected errors
# Log priority queue metrics periodically
metrics = await priority_queue.get_metrics()
if metrics.total_enqueued > 0:
logger.info(
"Priority queue metrics: enqueued=%d, dequeued=%d, size=%d, timeouts=%d, errors=%d",
metrics.total_enqueued,
metrics.total_dequeued,
metrics.current_size,
metrics.total_timeouts,
metrics.total_errors,
)
# Wait for next cycle or shutdown
try:
await asyncio.wait_for(shutdown.wait(), timeout=TRADE_INTERVAL_SECONDS)
except TimeoutError:
pass # Normal — timeout means it's time for next cycle
finally:
# Clean up resources
await broker.close()