Files
The-Ouroboros/src/evolution/optimizer.py
agentson 61f5aaf4a3
Some checks failed
CI / test (pull_request) Has been cancelled
fix: resolve linting issues in token efficiency implementation
- Fix ambiguous variable names (l → layer)
- Remove unused imports and variables
- Organize import statements

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 18:35:55 +09:00

327 lines
12 KiB
Python

"""Evolution Engine — analyzes trade logs and generates new strategies.
This module:
1. Uses DecisionLogger.get_losing_decisions() to identify failing patterns
2. Analyzes failure patterns by time, market conditions, stock characteristics
3. Asks Gemini to generate improved strategy recommendations
4. Generates new strategy classes with enhanced decision-making logic
"""
from __future__ import annotations
import json
import logging
import sqlite3
import subprocess
import textwrap
from collections import Counter
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from google import genai
from src.config import Settings
from src.db import init_db
from src.logging.decision_logger import DecisionLogger
logger = logging.getLogger(__name__)
STRATEGIES_DIR = Path("src/strategies")
STRATEGY_TEMPLATE = textwrap.dedent("""\
\"\"\"Auto-generated strategy: {name}
Generated at: {timestamp}
Rationale: {rationale}
\"\"\"
from __future__ import annotations
from typing import Any
from src.strategies.base import BaseStrategy
class {class_name}(BaseStrategy):
\"\"\"Strategy: {name}\"\"\"
def evaluate(self, market_data: dict[str, Any]) -> dict[str, Any]:
{body}
""")
class EvolutionOptimizer:
"""Analyzes trade history and evolves trading strategies."""
def __init__(self, settings: Settings) -> None:
self._settings = settings
self._db_path = settings.DB_PATH
self._client = genai.Client(api_key=settings.GEMINI_API_KEY)
self._model_name = settings.GEMINI_MODEL
self._conn = init_db(self._db_path)
self._decision_logger = DecisionLogger(self._conn)
# ------------------------------------------------------------------
# Analysis
# ------------------------------------------------------------------
def analyze_failures(self, limit: int = 50) -> list[dict[str, Any]]:
"""Find high-confidence decisions that resulted in losses.
Uses DecisionLogger.get_losing_decisions() to retrieve failures.
"""
losing_decisions = self._decision_logger.get_losing_decisions(
min_confidence=80, min_loss=-100.0
)
# Limit results
if len(losing_decisions) > limit:
losing_decisions = losing_decisions[:limit]
# Convert to dict format for analysis
failures = []
for decision in losing_decisions:
failures.append({
"decision_id": decision.decision_id,
"timestamp": decision.timestamp,
"stock_code": decision.stock_code,
"market": decision.market,
"exchange_code": decision.exchange_code,
"action": decision.action,
"confidence": decision.confidence,
"rationale": decision.rationale,
"outcome_pnl": decision.outcome_pnl,
"outcome_accuracy": decision.outcome_accuracy,
"context_snapshot": decision.context_snapshot,
"input_data": decision.input_data,
})
return failures
def identify_failure_patterns(
self, failures: list[dict[str, Any]]
) -> dict[str, Any]:
"""Identify patterns in losing decisions.
Analyzes:
- Time patterns (hour of day, day of week)
- Market conditions (volatility, volume)
- Stock characteristics (price range, market)
- Common failure modes in rationale
"""
if not failures:
return {"pattern_count": 0, "patterns": {}}
patterns = {
"markets": Counter(),
"actions": Counter(),
"hours": Counter(),
"avg_confidence": 0.0,
"avg_loss": 0.0,
"total_failures": len(failures),
}
total_confidence = 0
total_loss = 0.0
for failure in failures:
# Market distribution
patterns["markets"][failure.get("market", "UNKNOWN")] += 1
# Action distribution
patterns["actions"][failure.get("action", "UNKNOWN")] += 1
# Time pattern (extract hour from ISO timestamp)
timestamp = failure.get("timestamp", "")
if timestamp:
try:
dt = datetime.fromisoformat(timestamp)
patterns["hours"][dt.hour] += 1
except (ValueError, AttributeError):
pass
# Aggregate metrics
total_confidence += failure.get("confidence", 0)
total_loss += failure.get("outcome_pnl", 0.0)
patterns["avg_confidence"] = (
round(total_confidence / len(failures), 2) if failures else 0.0
)
patterns["avg_loss"] = (
round(total_loss / len(failures), 2) if failures else 0.0
)
# Convert Counters to regular dicts for JSON serialization
patterns["markets"] = dict(patterns["markets"])
patterns["actions"] = dict(patterns["actions"])
patterns["hours"] = dict(patterns["hours"])
return patterns
def get_performance_summary(self) -> dict[str, Any]:
"""Return aggregate performance metrics from trade logs."""
conn = sqlite3.connect(self._db_path)
try:
row = conn.execute(
"""
SELECT
COUNT(*) as total_trades,
SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) as wins,
SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END) as losses,
COALESCE(AVG(pnl), 0) as avg_pnl,
COALESCE(SUM(pnl), 0) as total_pnl
FROM trades
"""
).fetchone()
return {
"total_trades": row[0],
"wins": row[1] or 0,
"losses": row[2] or 0,
"avg_pnl": round(row[3], 2),
"total_pnl": round(row[4], 2),
}
finally:
conn.close()
# ------------------------------------------------------------------
# Strategy Generation
# ------------------------------------------------------------------
async def generate_strategy(self, failures: list[dict[str, Any]]) -> Path | None:
"""Ask Gemini to generate a new strategy based on failure analysis.
Integrates failure patterns and market conditions to create improved strategies.
Returns the path to the generated strategy file, or None on failure.
"""
# Identify failure patterns first
patterns = self.identify_failure_patterns(failures)
prompt = (
"You are a quantitative trading strategy developer.\n"
"Analyze these failed trades and their patterns, then generate an improved strategy.\n\n"
f"Failure Patterns:\n{json.dumps(patterns, indent=2)}\n\n"
f"Sample Failed Trades (first 5):\n"
f"{json.dumps(failures[:5], indent=2, default=str)}\n\n"
"Based on these patterns, generate an improved trading strategy.\n"
"The strategy should:\n"
"1. Avoid the identified failure patterns\n"
"2. Consider market-specific conditions\n"
"3. Adjust confidence based on historical performance\n\n"
"Generate a Python method body that inherits from BaseStrategy.\n"
"The method signature is: evaluate(self, market_data: dict) -> dict\n"
"The method must return a dict with keys: action, confidence, rationale.\n"
"Respond with ONLY the method body (Python code), no class definition.\n"
)
try:
response = await self._client.aio.models.generate_content(
model=self._model_name, contents=prompt,
)
body = response.text.strip()
except Exception as exc:
logger.error("Failed to generate strategy: %s", exc)
return None
# Clean up code fences
if body.startswith("```"):
lines = body.split("\n")
body = "\n".join(lines[1:-1])
# Create strategy file
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
version = f"v{timestamp}"
class_name = f"Strategy_{version}"
file_name = f"{version}_evolved.py"
STRATEGIES_DIR.mkdir(parents=True, exist_ok=True)
file_path = STRATEGIES_DIR / file_name
# Indent the body for the class method
indented_body = textwrap.indent(body, " ")
# Generate rationale from patterns
rationale = f"Auto-evolved from {len(failures)} failures. "
rationale += f"Primary failure markets: {list(patterns.get('markets', {}).keys())}. "
rationale += f"Average loss: {patterns.get('avg_loss', 0.0)}"
content = STRATEGY_TEMPLATE.format(
name=version,
timestamp=datetime.now(UTC).isoformat(),
rationale=rationale,
class_name=class_name,
body=indented_body.strip(),
)
file_path.write_text(content)
logger.info("Generated strategy file: %s", file_path)
return file_path
# ------------------------------------------------------------------
# Validation
# ------------------------------------------------------------------
def validate_strategy(self, strategy_path: Path) -> bool:
"""Run pytest on the generated strategy. Returns True if all tests pass."""
logger.info("Validating strategy: %s", strategy_path)
result = subprocess.run(
["python", "-m", "pytest", "tests/", "-v", "--tb=short"],
capture_output=True,
text=True,
timeout=120,
)
if result.returncode == 0:
logger.info("Strategy validation PASSED")
return True
else:
logger.warning(
"Strategy validation FAILED:\n%s", result.stdout + result.stderr
)
# Clean up failing strategy
strategy_path.unlink(missing_ok=True)
return False
# ------------------------------------------------------------------
# PR Simulation
# ------------------------------------------------------------------
def create_pr_simulation(self, strategy_path: Path) -> dict[str, str]:
"""Simulate creating a pull request for the new strategy."""
pr = {
"title": f"[Evolution] New strategy: {strategy_path.stem}",
"branch": f"evolution/{strategy_path.stem}",
"body": (
f"Auto-generated strategy from evolution engine.\n"
f"File: {strategy_path}\n"
f"All tests passed."
),
"status": "ready_for_review",
}
logger.info("PR simulation created: %s", pr["title"])
return pr
# ------------------------------------------------------------------
# Full Pipeline
# ------------------------------------------------------------------
async def evolve(self) -> dict[str, Any] | None:
"""Run the full evolution pipeline.
1. Analyze failures
2. Generate new strategy
3. Validate with tests
4. Create PR simulation
Returns PR info on success, None on failure.
"""
failures = self.analyze_failures()
if not failures:
logger.info("No failure patterns found — skipping evolution")
return None
strategy_path = await self.generate_strategy(failures)
if strategy_path is None:
return None
if not self.validate_strategy(strategy_path):
return None
return self.create_pr_simulation(strategy_path)