Files
The-Ouroboros/src/evolution/optimizer.py
agentson b26ff0c1b8
Some checks failed
CI / test (pull_request) Has been cancelled
feat: implement timezone-based global market auto-selection
Implement comprehensive multi-market trading system with automatic
market selection based on timezone and trading hours.

## New Features
- Market schedule module with 10 global markets (KR, US, JP, HK, CN, VN)
- Overseas broker for KIS API international stock trading
- Automatic market detection based on current time and timezone
- Next market open waiting logic when all markets closed
- ConnectionError retry with exponential backoff (max 3 attempts)

## Architecture Changes
- Market-aware trading cycle with domestic/overseas broker routing
- Market context in AI prompts for better decision making
- Database schema extended with market and exchange_code columns
- Config setting ENABLED_MARKETS for market selection

## Testing
- 19 new tests for market schedule (timezone, DST, lunch breaks)
- All 54 tests passing
- Lint fixes with ruff

## Files Added
- src/markets/schedule.py - Market schedule and timezone logic
- src/broker/overseas.py - KIS overseas stock API client
- tests/test_market_schedule.py - Market schedule test suite

## Files Modified
- src/main.py - Multi-market main loop with retry logic
- src/config.py - ENABLED_MARKETS setting
- src/db.py - market/exchange_code columns with migration
- src/brain/gemini_client.py - Dynamic market context in prompts

Resolves #5

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 09:29:25 +09:00

232 lines
7.8 KiB
Python

"""Evolution Engine — analyzes trade logs and generates new strategies.
This module:
1. Reads trade_logs.db to identify failing patterns
2. Asks Gemini to generate a new strategy class
3. Runs pytest on the generated file
4. Creates a simulated PR if tests pass
"""
from __future__ import annotations
import json
import logging
import sqlite3
import subprocess
import textwrap
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from google import genai
from src.config import Settings
logger = logging.getLogger(__name__)
STRATEGIES_DIR = Path("src/strategies")
STRATEGY_TEMPLATE = textwrap.dedent("""\
\"\"\"Auto-generated strategy: {name}
Generated at: {timestamp}
Rationale: {rationale}
\"\"\"
from __future__ import annotations
from typing import Any
from src.strategies.base import BaseStrategy
class {class_name}(BaseStrategy):
\"\"\"Strategy: {name}\"\"\"
def evaluate(self, market_data: dict[str, Any]) -> dict[str, Any]:
{body}
""")
class EvolutionOptimizer:
"""Analyzes trade history and evolves trading strategies."""
def __init__(self, settings: Settings) -> None:
self._settings = settings
self._db_path = settings.DB_PATH
self._client = genai.Client(api_key=settings.GEMINI_API_KEY)
self._model_name = settings.GEMINI_MODEL
# ------------------------------------------------------------------
# Analysis
# ------------------------------------------------------------------
def analyze_failures(self, limit: int = 50) -> list[dict[str, Any]]:
"""Find trades where high confidence led to losses."""
conn = sqlite3.connect(self._db_path)
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(
"""
SELECT stock_code, action, confidence, pnl, rationale, timestamp
FROM trades
WHERE confidence >= 80 AND pnl < 0
ORDER BY pnl ASC
LIMIT ?
""",
(limit,),
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
def get_performance_summary(self) -> dict[str, Any]:
"""Return aggregate performance metrics from trade logs."""
conn = sqlite3.connect(self._db_path)
try:
row = conn.execute(
"""
SELECT
COUNT(*) as total_trades,
SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) as wins,
SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END) as losses,
COALESCE(AVG(pnl), 0) as avg_pnl,
COALESCE(SUM(pnl), 0) as total_pnl
FROM trades
"""
).fetchone()
return {
"total_trades": row[0],
"wins": row[1] or 0,
"losses": row[2] or 0,
"avg_pnl": round(row[3], 2),
"total_pnl": round(row[4], 2),
}
finally:
conn.close()
# ------------------------------------------------------------------
# Strategy Generation
# ------------------------------------------------------------------
async def generate_strategy(self, failures: list[dict[str, Any]]) -> Path | None:
"""Ask Gemini to generate a new strategy based on failure analysis.
Returns the path to the generated strategy file, or None on failure.
"""
prompt = (
"You are a quantitative trading strategy developer.\n"
"Analyze these failed trades and generate an improved strategy.\n\n"
f"Failed trades:\n{json.dumps(failures, indent=2, default=str)}\n\n"
"Generate a Python class that inherits from BaseStrategy.\n"
"The class must have an `evaluate(self, market_data: dict) -> dict` method.\n"
"The method must return a dict with keys: action, confidence, rationale.\n"
"Respond with ONLY the method body (Python code), no class definition.\n"
)
try:
response = await self._client.aio.models.generate_content(
model=self._model_name, contents=prompt,
)
body = response.text.strip()
except Exception as exc:
logger.error("Failed to generate strategy: %s", exc)
return None
# Clean up code fences
if body.startswith("```"):
lines = body.split("\n")
body = "\n".join(lines[1:-1])
# Create strategy file
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
version = f"v{timestamp}"
class_name = f"Strategy_{version}"
file_name = f"{version}_evolved.py"
STRATEGIES_DIR.mkdir(parents=True, exist_ok=True)
file_path = STRATEGIES_DIR / file_name
# Indent the body for the class method
indented_body = textwrap.indent(body, " ")
content = STRATEGY_TEMPLATE.format(
name=version,
timestamp=datetime.now(UTC).isoformat(),
rationale="Auto-evolved from failure analysis",
class_name=class_name,
body=indented_body.strip(),
)
file_path.write_text(content)
logger.info("Generated strategy file: %s", file_path)
return file_path
# ------------------------------------------------------------------
# Validation
# ------------------------------------------------------------------
def validate_strategy(self, strategy_path: Path) -> bool:
"""Run pytest on the generated strategy. Returns True if all tests pass."""
logger.info("Validating strategy: %s", strategy_path)
result = subprocess.run(
["python", "-m", "pytest", "tests/", "-v", "--tb=short"],
capture_output=True,
text=True,
timeout=120,
)
if result.returncode == 0:
logger.info("Strategy validation PASSED")
return True
else:
logger.warning(
"Strategy validation FAILED:\n%s", result.stdout + result.stderr
)
# Clean up failing strategy
strategy_path.unlink(missing_ok=True)
return False
# ------------------------------------------------------------------
# PR Simulation
# ------------------------------------------------------------------
def create_pr_simulation(self, strategy_path: Path) -> dict[str, str]:
"""Simulate creating a pull request for the new strategy."""
pr = {
"title": f"[Evolution] New strategy: {strategy_path.stem}",
"branch": f"evolution/{strategy_path.stem}",
"body": (
f"Auto-generated strategy from evolution engine.\n"
f"File: {strategy_path}\n"
f"All tests passed."
),
"status": "ready_for_review",
}
logger.info("PR simulation created: %s", pr["title"])
return pr
# ------------------------------------------------------------------
# Full Pipeline
# ------------------------------------------------------------------
async def evolve(self) -> dict[str, Any] | None:
"""Run the full evolution pipeline.
1. Analyze failures
2. Generate new strategy
3. Validate with tests
4. Create PR simulation
Returns PR info on success, None on failure.
"""
failures = self.analyze_failures()
if not failures:
logger.info("No failure patterns found — skipping evolution")
return None
strategy_path = await self.generate_strategy(failures)
if strategy_path is None:
return None
if not self.validate_strategy(strategy_path):
return None
return self.create_pr_simulation(strategy_path)