Compare commits

...

7 Commits

Author SHA1 Message Date
agentson
ae7195c829 feat: implement evolution engine for self-improving strategies
Some checks failed
CI / test (pull_request) Has been cancelled
Complete Pillar 4 implementation with comprehensive testing and analysis.

Components:
- EvolutionOptimizer: Analyzes losing decisions from DecisionLogger,
  identifies failure patterns (time, market, action), and uses Gemini
  to generate improved strategies with auto-deployment capability
- ABTester: A/B testing framework with statistical significance testing
  (two-sample t-test), performance comparison, and deployment criteria
  (>60% win rate, >20 trades minimum)
- PerformanceTracker: Tracks strategy win rates, monitors improvement
  trends over time, generates comprehensive dashboards with daily/weekly
  metrics and trend analysis

Key Features:
- Uses DecisionLogger.get_losing_decisions() for failure identification
- Pattern analysis: market distribution, action types, time-of-day patterns
- Gemini integration for AI-powered strategy generation
- Statistical validation using scipy.stats.ttest_ind
- Sharpe ratio calculation for risk-adjusted returns
- Auto-deploy strategies meeting 60% win rate threshold
- Performance dashboard with JSON export capability

Testing:
- 24 comprehensive tests covering all evolution components
- 90% coverage of evolution module (304 lines, 31 missed)
- Integration tests for full evolution pipeline
- All 105 project tests passing with 72% overall coverage

Dependencies:
- Added scipy>=1.11,<2 for statistical analysis

Closes #19

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 16:34:10 +09:00
agentson
2f9efdad64 feat: integrate decision logger with main trading loop
Some checks failed
CI / test (pull_request) Has been cancelled
- Add DecisionLogger to main.py trading cycle
- Log all decisions with context snapshot (L1-L2 layers)
- Capture market data and balance info in context
- Add comprehensive tests (9 tests, 100% coverage)
- All tests passing (63 total)

Implements issue #17 acceptance criteria:
-  decision_logs table with proper schema
-  DecisionLogger class with all required methods
-  Automatic logging in trading loop
-  Tests achieve 100% coverage of decision_logger.py
- ⚠️  Context snapshot uses L1-L2 data (L3-L7 pending issue #15)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 15:47:53 +09:00
agentson
6551d7af79 WIP: Add decision logging infrastructure
- Add decision_logs table to database schema
- Create decision logger module with comprehensive logging
- Prepare for decision tracking and audit trail

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 15:47:53 +09:00
7515a5a314 Merge pull request 'feat: implement L1-L7 context tree for multi-layered memory management' (#16) from feature/issue-15-context-tree into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #16
2026-02-04 15:40:00 +09:00
agentson
254b543c89 Merge main into feature/issue-15-context-tree
Some checks failed
CI / test (pull_request) Has been cancelled
Resolved conflicts in CLAUDE.md by:
- Keeping main's refactored structure (docs split into separate files)
- Added Context Tree documentation link to docs section
- Preserved all constraints and guidelines from main

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 15:25:13 +09:00
2becbddb4a Merge pull request 'refactor: split CLAUDE.md into focused documentation structure' (#14) from feature/issue-13-docs-refactor into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #14
2026-02-04 10:15:09 +09:00
agentson
05e8986ff5 refactor: split CLAUDE.md into focused documentation structure
Some checks failed
CI / test (pull_request) Has been cancelled
- Restructure docs into topic-specific files to minimize context
- Create docs/workflow.md (Git + Agent workflow)
- Create docs/commands.md (Common failures + build commands)
- Create docs/architecture.md (System design + data flow)
- Create docs/testing.md (Test structure + guidelines)
- Rewrite CLAUDE.md as concise hub with links to detailed docs
- Update .gitignore to exclude data/ directory

Benefits:
- Reduced context size for AI assistants
- Faster reference lookups
- Better maintainability
- Topic-focused documentation

Closes #13

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 10:13:48 +09:00
17 changed files with 2662 additions and 261 deletions

1
.gitignore vendored
View File

@@ -174,3 +174,4 @@ cython_debug/
# PyPI configuration file
.pypirc
data/

310
CLAUDE.md
View File

@@ -1,258 +1,98 @@
# CLAUDE.md
# The Ouroboros
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
AI-powered trading agent for global stock markets with self-evolution capabilities.
## Git Workflow Policy
## Quick Start
**CRITICAL: All code changes MUST follow this workflow. Direct pushes to `main` are ABSOLUTELY PROHIBITED.**
1. **Create Gitea Issue First** — All features, bug fixes, and policy changes require a Gitea issue before any code is written
2. **Create Feature Branch** — Branch from `main` using format `feature/issue-{N}-{short-description}`
3. **Implement Changes** — Write code, tests, and documentation on the feature branch
4. **Create Pull Request** — Submit PR to `main` branch referencing the issue number
5. **Review & Merge** — After approval, merge via PR (squash or merge commit)
**Never commit directly to `main`.** This policy applies to all changes, no exceptions.
## Agent Workflow
**Modern AI development leverages specialized agents for concurrent, efficient task execution.**
### Parallel Execution Strategy
Use **git worktree** or **subagents** (via the Task tool) to handle multiple requirements simultaneously:
- Each task runs in independent context
- Parallel branches for concurrent features
- Isolated test environments prevent interference
- Faster iteration with distributed workload
### Specialized Agent Roles
Deploy task-specific agents as needed instead of handling everything in the main conversation:
- **Conversational Agent** (main) — Interface with user, coordinate other agents
- **Ticket Management Agent** — Create/update Gitea issues, track task status
- **Design Agent** — Architectural planning, RFC documents, API design
- **Code Writing Agent** — Implementation following specs
- **Testing Agent** — Write tests, verify coverage, run test suites
- **Documentation Agent** — Update docs, docstrings, CLAUDE.md, README
- **Review Agent** — Code review, lint checks, security audits
- **Custom Agents** — Created dynamically for specialized tasks (performance analysis, migration scripts, etc.)
### When to Use Agents
**Prefer spawning specialized agents for:**
1. Complex multi-file changes requiring exploration
2. Tasks with clear, isolated scope (e.g., "write tests for module X")
3. Parallel work streams (feature A + bugfix B simultaneously)
4. Long-running analysis (codebase search, dependency audit)
5. Tasks requiring different contexts (multiple git worktrees)
**Use the main conversation for:**
1. User interaction and clarification
2. Quick single-file edits
3. Coordinating agent work
4. High-level decision making
### Implementation
```python
# Example: Spawn parallel test and documentation agents
task_tool(
subagent_type="general-purpose",
prompt="Write comprehensive tests for src/markets/schedule.py",
description="Write schedule tests"
)
task_tool(
subagent_type="general-purpose",
prompt="Update README.md with global market feature documentation",
description="Update README"
)
```
Use `run_in_background=True` for independent tasks that don't block subsequent work.
## Common Command Failures
**Critical: Learn from failures. Never repeat the same failed command without modification.**
### tea CLI (Gitea Command Line Tool)
#### ❌ TTY Error - Interactive Confirmation Fails
```bash
~/bin/tea issues create --repo X --title "Y" --description "Z"
# Error: huh: could not open a new TTY: open /dev/tty: no such device or address
```
**💡 Reason:** tea tries to open `/dev/tty` for interactive confirmation prompts, which is unavailable in non-interactive environments.
**✅ Solution:** Use `YES=""` environment variable to bypass confirmation
```bash
YES="" ~/bin/tea issues create --repo jihoson/The-Ouroboros --title "Title" --description "Body"
YES="" ~/bin/tea issues edit <number> --repo jihoson/The-Ouroboros --description "Updated body"
YES="" ~/bin/tea pulls create --repo jihoson/The-Ouroboros --head feature-branch --base main --title "Title" --description "Body"
```
**📝 Notes:**
- Always set default login: `~/bin/tea login default local`
- Use `--repo jihoson/The-Ouroboros` when outside repo directory
- tea is preferred over direct Gitea API calls for consistency
#### ❌ Wrong Parameter Name
```bash
tea issues create --body "text"
# Error: flag provided but not defined: -body
```
**💡 Reason:** Parameter is `--description`, not `--body`.
**✅ Solution:** Use correct parameter name
```bash
YES="" ~/bin/tea issues create --description "text"
```
### Gitea API (Direct HTTP Calls)
#### ❌ Wrong Hostname
```bash
curl http://gitea.local:3000/api/v1/...
# Error: Could not resolve host: gitea.local
```
**💡 Reason:** Gitea instance runs on `localhost:3000`, not `gitea.local`.
**✅ Solution:** Use correct hostname (but prefer tea CLI)
```bash
curl http://localhost:3000/api/v1/repos/jihoson/The-Ouroboros/issues \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d '{"title":"...", "body":"..."}'
```
**📝 Notes:**
- Prefer `tea` CLI over direct API calls
- Only use curl for operations tea doesn't support
### Git Commands
#### ❌ User Not Configured
```bash
git commit -m "message"
# Error: Author identity unknown
```
**💡 Reason:** Git user.name and user.email not set.
**✅ Solution:** Configure git user
```bash
git config user.name "agentson"
git config user.email "agentson@localhost"
```
#### ❌ Permission Denied on Push
```bash
git push origin branch
# Error: User permission denied for writing
```
**💡 Reason:** Repository access token lacks write permissions or user lacks repo write access.
**✅ Solution:**
1. Verify user has write access to repository (admin grants this)
2. Ensure git credential has correct token with `write:repository` scope
3. Check remote URL uses correct authentication
### Python/Pytest
#### ❌ Module Import Error
```bash
pytest tests/test_foo.py
# ModuleNotFoundError: No module named 'src'
```
**💡 Reason:** Package not installed in development mode.
**✅ Solution:** Install package with dev dependencies
```bash
# Setup
pip install -e ".[dev]"
cp .env.example .env
# Edit .env with your KIS and Gemini API credentials
# Test
pytest -v --cov=src
# Run (paper trading)
python -m src.main --mode=paper
```
#### ❌ Async Test Hangs
```python
async def test_something(): # Hangs forever
result = await async_function()
```
**💡 Reason:** Missing pytest-asyncio or wrong configuration.
## Documentation
**✅ Solution:** Already configured in pyproject.toml
```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
```
No decorator needed for async tests.
- **[Workflow Guide](docs/workflow.md)** — Git workflow policy and agent-based development
- **[Command Reference](docs/commands.md)** — Common failures, build commands, troubleshooting
- **[Architecture](docs/architecture.md)** — System design, components, data flow
- **[Context Tree](docs/context-tree.md)** — L1-L7 hierarchical memory system
- **[Testing](docs/testing.md)** — Test structure, coverage requirements, writing tests
- **[Agent Policies](docs/agents.md)** — Prime directives, constraints, prohibited actions
## Build & Test Commands
## Core Principles
1. **Safety First** — Risk manager is READ-ONLY and enforces circuit breakers
2. **Test Everything** — 80% coverage minimum, all changes require tests
3. **Issue-Driven Development** — All work goes through Gitea issues → feature branches → PRs
4. **Agent Specialization** — Use dedicated agents for design, coding, testing, docs, review
## Project Structure
```
src/
├── broker/ # KIS API client (domestic + overseas)
├── brain/ # Gemini AI decision engine
├── core/ # Risk manager (READ-ONLY)
├── evolution/ # Self-improvement optimizer
├── markets/ # Market schedules and timezone handling
├── db.py # SQLite trade logging
├── main.py # Trading loop orchestrator
└── config.py # Settings (from .env)
tests/ # 54 tests across 4 files
docs/ # Extended documentation
```
## Key Commands
```bash
# Install all dependencies (production + dev)
pip install ".[dev]"
pytest -v --cov=src # Run tests with coverage
ruff check src/ tests/ # Lint
mypy src/ --strict # Type check
# Run full test suite with coverage
pytest -v --cov=src --cov-report=term-missing
python -m src.main --mode=paper # Paper trading
python -m src.main --mode=live # Live trading (⚠️ real money)
# Run a single test file
pytest tests/test_risk.py -v
# Run a single test by name
pytest tests/test_brain.py -k "test_parse_valid_json" -v
# Lint
ruff check src/ tests/
# Type check (strict mode, non-blocking in CI)
mypy src/ --strict
# Run the trading agent
python -m src.main --mode=paper
# Docker
docker compose up -d ouroboros # Run agent
docker compose --profile test up test # Run tests in container
# Gitea workflow (requires tea CLI)
YES="" ~/bin/tea issues create --repo jihoson/The-Ouroboros --title "..." --description "..."
YES="" ~/bin/tea pulls create --head feature-branch --base main --title "..." --description "..."
```
## Architecture
## Markets Supported
Self-evolving AI trading agent for Korean stock markets (KIS API). The main loop in `src/main.py` orchestrates five components in a 60-second cycle per stock:
- 🇰🇷 Korea (KRX)
- 🇺🇸 United States (NASDAQ, NYSE, AMEX)
- 🇯🇵 Japan (TSE)
- 🇭🇰 Hong Kong (SEHK)
- 🇨🇳 China (Shanghai, Shenzhen)
- 🇻🇳 Vietnam (Hanoi, HCM)
1. **Broker** (`src/broker/kis_api.py`) — Async KIS API client with automatic OAuth token refresh, leaky-bucket rate limiter (10 RPS), and POST body hash-key signing. Uses a custom SSL context with disabled hostname verification for the VTS (virtual trading) endpoint due to a known certificate mismatch.
Markets auto-detected based on timezone and enabled in `ENABLED_MARKETS` env variable.
2. **Brain** (`src/brain/gemini_client.py`) — Sends structured prompts to Google Gemini, parses JSON responses into `TradeDecision` objects. Forces HOLD when confidence < threshold (default 80). Falls back to safe HOLD on any parse/API error.
## Critical Constraints
3. **Risk Manager** (`src/core/risk_manager.py`) — **READ-ONLY by policy** (see `docs/agents.md`). Circuit breaker halts all trading via `SystemExit` when daily P&L drops below -3.0%. Fat-finger check rejects orders exceeding 30% of available cash.
⚠️ **Non-Negotiable Rules** (see [docs/agents.md](docs/agents.md)):
4. **Context Tree** (`src/context/`) — **NEW: Pillar 2 implementation.** 7-tier hierarchical memory (L1-L7) from real-time quotes to generational wisdom. Auto-aggregates daily → weekly → monthly → quarterly → annual → legacy. See [`docs/context-tree.md`](docs/context-tree.md) for details.
- `src/core/risk_manager.py` is **READ-ONLY** — changes require human approval
- Circuit breaker at -3.0% P&L — may only be made **stricter**
- Fat-finger protection: max 30% of cash per order — always enforced
- Confidence < 80 → force HOLD — cannot be weakened
- All code changes → corresponding tests → coverage ≥ 80%
5. **Evolution** (`src/evolution/optimizer.py`) — Analyzes high-confidence losing trades from SQLite, asks Gemini to generate new `BaseStrategy` subclasses, validates them by running the full pytest suite, and simulates PR creation.
## Contributing
**Data flow per cycle:** Fetch orderbook + balance → calculate P&L → query context tree → get Gemini decision → validate with risk manager → execute order → log to SQLite + context layers (`src/db.py`).
See [docs/workflow.md](docs/workflow.md) for the complete development process.
## Key Constraints (from `docs/agents.md`)
- `core/risk_manager.py` is **READ-ONLY**. Changes require human approval.
- Circuit breaker threshold (-3.0%) may only be made stricter, never relaxed.
- Fat-finger protection (30% max order size) must always be enforced.
- Confidence < 80 **must** force HOLD — this rule cannot be weakened.
- All code changes require corresponding tests. Coverage must stay >= 80%.
- Generated strategies must pass the full test suite before activation.
## Configuration
Pydantic Settings loaded from `.env` (see `.env.example`). Required vars: `KIS_APP_KEY`, `KIS_APP_SECRET`, `KIS_ACCOUNT_NO` (format `XXXXXXXX-XX`), `GEMINI_API_KEY`. Tests use in-memory SQLite (`DB_PATH=":memory:"`) and dummy credentials via `tests/conftest.py`.
## Test Structure
72 tests across five files. `asyncio_mode = "auto"` in pyproject.toml — async tests need no special decorator. The `settings` fixture in `conftest.py` provides safe defaults with test credentials and in-memory DB.
- `test_risk.py` (11) — Circuit breaker boundaries, fat-finger edge cases
- `test_broker.py` (6) — Token lifecycle, rate limiting, hash keys, network errors
- `test_brain.py` (18) — JSON parsing, confidence threshold, malformed responses, prompt construction
- `test_market_schedule.py` (19) — Market open/close logic, timezone handling, DST, lunch breaks
- `test_context.py` (18) — **NEW:** Context tree CRUD, aggregation logic, retention policies, layer metadata
**TL;DR:**
1. Create issue in Gitea
2. Create feature branch: `feature/issue-N-description`
3. Implement with tests
4. Open PR
5. Merge after review

191
docs/architecture.md Normal file
View File

@@ -0,0 +1,191 @@
# System Architecture
## Overview
Self-evolving AI trading agent for global stock markets via KIS (Korea Investment & Securities) API. The main loop in `src/main.py` orchestrates four components in a 60-second cycle per stock across multiple markets.
## Core Components
### 1. Broker (`src/broker/`)
**KISBroker** (`kis_api.py`) — Async KIS API client for domestic Korean market
- Automatic OAuth token refresh (valid for 24 hours)
- Leaky-bucket rate limiter (10 requests per second)
- POST body hash-key signing for order authentication
- Custom SSL context with disabled hostname verification for VTS (virtual trading) endpoint due to known certificate mismatch
**OverseasBroker** (`overseas.py`) — KIS overseas stock API wrapper
- Reuses KISBroker infrastructure (session, token, rate limiter) via composition
- Supports 9 global markets: US (NASDAQ/NYSE/AMEX), Japan, Hong Kong, China (Shanghai/Shenzhen), Vietnam (Hanoi/HCM)
- Different API endpoints for overseas price/balance/order operations
**Market Schedule** (`src/markets/schedule.py`) — Timezone-aware market management
- `MarketInfo` dataclass with timezone, trading hours, lunch breaks
- Automatic DST handling via `zoneinfo.ZoneInfo`
- `is_market_open()` checks weekends, trading hours, lunch breaks
- `get_open_markets()` returns currently active markets
- `get_next_market_open()` finds next market to open and when
### 2. Brain (`src/brain/gemini_client.py`)
**GeminiClient** — AI decision engine powered by Google Gemini
- Constructs structured prompts from market data
- Parses JSON responses into `TradeDecision` objects (`action`, `confidence`, `rationale`)
- Forces HOLD when confidence < threshold (default 80)
- Falls back to safe HOLD on any parse/API error
- Handles markdown-wrapped JSON, malformed responses, invalid actions
### 3. Risk Manager (`src/core/risk_manager.py`)
**RiskManager** — Safety circuit breaker and order validation
⚠️ **READ-ONLY by policy** (see [`docs/agents.md`](./agents.md))
- **Circuit Breaker**: Halts all trading via `SystemExit` when daily P&L drops below -3.0%
- Threshold may only be made stricter, never relaxed
- Calculated as `(total_eval - purchase_total) / purchase_total * 100`
- **Fat-Finger Protection**: Rejects orders exceeding 30% of available cash
- Must always be enforced, cannot be disabled
### 4. Evolution (`src/evolution/optimizer.py`)
**StrategyOptimizer** — Self-improvement loop
- Analyzes high-confidence losing trades from SQLite
- Asks Gemini to generate new `BaseStrategy` subclasses
- Validates generated strategies by running full pytest suite
- Simulates PR creation for human review
- Only activates strategies that pass all tests
## Data Flow
```
┌─────────────────────────────────────────────────────────────┐
│ Main Loop (60s cycle per stock, per market) │
└─────────────────────────────────────────────────────────────┘
┌──────────────────────────────────┐
│ Market Schedule Check │
│ - Get open markets │
│ - Filter by enabled markets │
│ - Wait if all closed │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Broker: Fetch Market Data │
│ - Domestic: orderbook + balance │
│ - Overseas: price + balance │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Calculate P&L │
│ pnl_pct = (eval - cost) / cost │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Brain: Get Decision │
│ - Build prompt with market data │
│ - Call Gemini API │
│ - Parse JSON response │
│ - Return TradeDecision │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Risk Manager: Validate Order │
│ - Check circuit breaker │
│ - Check fat-finger limit │
│ - Raise if validation fails │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Broker: Execute Order │
│ - Domestic: send_order() │
│ - Overseas: send_overseas_order() │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Database: Log Trade │
│ - SQLite (data/trades.db) │
│ - Track: action, confidence, │
│ rationale, market, exchange │
└───────────────────────────────────┘
```
## Database Schema
**SQLite** (`src/db.py`)
```sql
CREATE TABLE trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
stock_code TEXT NOT NULL,
action TEXT NOT NULL, -- BUY | SELL | HOLD
confidence INTEGER NOT NULL, -- 0-100
rationale TEXT,
quantity INTEGER,
price REAL,
pnl REAL DEFAULT 0.0,
market TEXT DEFAULT 'KR', -- KR | US_NASDAQ | JP | etc.
exchange_code TEXT DEFAULT 'KRX' -- KRX | NASD | NYSE | etc.
);
```
Auto-migration: Adds `market` and `exchange_code` columns if missing for backward compatibility.
## Configuration
**Pydantic Settings** (`src/config.py`)
Loaded from `.env` file:
```bash
# Required
KIS_APP_KEY=your_app_key
KIS_APP_SECRET=your_app_secret
KIS_ACCOUNT_NO=XXXXXXXX-XX
GEMINI_API_KEY=your_gemini_key
# Optional
MODE=paper # paper | live
DB_PATH=data/trades.db
CONFIDENCE_THRESHOLD=80
MAX_LOSS_PCT=3.0
MAX_ORDER_PCT=30.0
ENABLED_MARKETS=KR,US_NASDAQ # Comma-separated market codes
```
Tests use in-memory SQLite (`DB_PATH=":memory:"`) and dummy credentials via `tests/conftest.py`.
## Error Handling
### Connection Errors (Broker API)
- Retry with exponential backoff (2^attempt seconds)
- Max 3 retries per stock
- After exhaustion, skip stock and continue with next
### API Quota Errors (Gemini)
- Return safe HOLD decision with confidence=0
- Log error but don't crash
- Agent continues trading on next cycle
### Circuit Breaker Tripped
- Immediately halt via `SystemExit`
- Log critical message
- Requires manual intervention to restart
### Market Closed
- Wait until next market opens
- Use `get_next_market_open()` to calculate wait time
- Sleep until market open time

156
docs/commands.md Normal file
View File

@@ -0,0 +1,156 @@
# Command Reference
## Common Command Failures
**Critical: Learn from failures. Never repeat the same failed command without modification.**
### tea CLI (Gitea Command Line Tool)
#### ❌ TTY Error - Interactive Confirmation Fails
```bash
~/bin/tea issues create --repo X --title "Y" --description "Z"
# Error: huh: could not open a new TTY: open /dev/tty: no such device or address
```
**💡 Reason:** tea tries to open `/dev/tty` for interactive confirmation prompts, which is unavailable in non-interactive environments.
**✅ Solution:** Use `YES=""` environment variable to bypass confirmation
```bash
YES="" ~/bin/tea issues create --repo jihoson/The-Ouroboros --title "Title" --description "Body"
YES="" ~/bin/tea issues edit <number> --repo jihoson/The-Ouroboros --description "Updated body"
YES="" ~/bin/tea pulls create --repo jihoson/The-Ouroboros --head feature-branch --base main --title "Title" --description "Body"
```
**📝 Notes:**
- Always set default login: `~/bin/tea login default local`
- Use `--repo jihoson/The-Ouroboros` when outside repo directory
- tea is preferred over direct Gitea API calls for consistency
#### ❌ Wrong Parameter Name
```bash
tea issues create --body "text"
# Error: flag provided but not defined: -body
```
**💡 Reason:** Parameter is `--description`, not `--body`.
**✅ Solution:** Use correct parameter name
```bash
YES="" ~/bin/tea issues create --description "text"
```
### Gitea API (Direct HTTP Calls)
#### ❌ Wrong Hostname
```bash
curl http://gitea.local:3000/api/v1/...
# Error: Could not resolve host: gitea.local
```
**💡 Reason:** Gitea instance runs on `localhost:3000`, not `gitea.local`.
**✅ Solution:** Use correct hostname (but prefer tea CLI)
```bash
curl http://localhost:3000/api/v1/repos/jihoson/The-Ouroboros/issues \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d '{"title":"...", "body":"..."}'
```
**📝 Notes:**
- Prefer `tea` CLI over direct API calls
- Only use curl for operations tea doesn't support
### Git Commands
#### ❌ User Not Configured
```bash
git commit -m "message"
# Error: Author identity unknown
```
**💡 Reason:** Git user.name and user.email not set.
**✅ Solution:** Configure git user
```bash
git config user.name "agentson"
git config user.email "agentson@localhost"
```
#### ❌ Permission Denied on Push
```bash
git push origin branch
# Error: User permission denied for writing
```
**💡 Reason:** Repository access token lacks write permissions or user lacks repo write access.
**✅ Solution:**
1. Verify user has write access to repository (admin grants this)
2. Ensure git credential has correct token with `write:repository` scope
3. Check remote URL uses correct authentication
### Python/Pytest
#### ❌ Module Import Error
```bash
pytest tests/test_foo.py
# ModuleNotFoundError: No module named 'src'
```
**💡 Reason:** Package not installed in development mode.
**✅ Solution:** Install package with dev dependencies
```bash
pip install -e ".[dev]"
```
#### ❌ Async Test Hangs
```python
async def test_something(): # Hangs forever
result = await async_function()
```
**💡 Reason:** Missing pytest-asyncio or wrong configuration.
**✅ Solution:** Already configured in pyproject.toml
```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
```
No decorator needed for async tests.
## Build & Test Commands
```bash
# Install all dependencies (production + dev)
pip install -e ".[dev]"
# Run full test suite with coverage
pytest -v --cov=src --cov-report=term-missing
# Run a single test file
pytest tests/test_risk.py -v
# Run a single test by name
pytest tests/test_brain.py -k "test_parse_valid_json" -v
# Lint
ruff check src/ tests/
# Type check (strict mode, non-blocking in CI)
mypy src/ --strict
# Run the trading agent
python -m src.main --mode=paper
# Docker
docker compose up -d ouroboros # Run agent
docker compose --profile test up test # Run tests in container
```
## Environment Setup
```bash
# Create .env file from example
cp .env.example .env
# Edit .env with your credentials
# Required: KIS_APP_KEY, KIS_APP_SECRET, KIS_ACCOUNT_NO, GEMINI_API_KEY
# Verify configuration
python -c "from src.config import Settings; print(Settings())"
```

213
docs/testing.md Normal file
View File

@@ -0,0 +1,213 @@
# Testing Guidelines
## Test Structure
**54 tests** across four files. `asyncio_mode = "auto"` in pyproject.toml — async tests need no special decorator.
The `settings` fixture in `conftest.py` provides safe defaults with test credentials and in-memory DB.
### Test Files
#### `tests/test_risk.py` (11 tests)
- Circuit breaker boundaries
- Fat-finger edge cases
- P&L calculation edge cases
- Order validation logic
**Example:**
```python
def test_circuit_breaker_exact_threshold(risk_manager):
"""Circuit breaker should trip at exactly -3.0%."""
with pytest.raises(CircuitBreakerTripped):
risk_manager.validate_order(
current_pnl_pct=-3.0,
order_amount=1000,
total_cash=10000
)
```
#### `tests/test_broker.py` (6 tests)
- OAuth token lifecycle
- Rate limiting enforcement
- Hash key generation
- Network error handling
- SSL context configuration
**Example:**
```python
async def test_rate_limiter(broker):
"""Rate limiter should delay requests to stay under 10 RPS."""
start = time.monotonic()
for _ in range(15): # 15 requests
await broker._rate_limiter.acquire()
elapsed = time.monotonic() - start
assert elapsed >= 1.0 # Should take at least 1 second
```
#### `tests/test_brain.py` (18 tests)
- Valid JSON parsing
- Markdown-wrapped JSON handling
- Malformed JSON fallback
- Missing fields handling
- Invalid action validation
- Confidence threshold enforcement
- Empty response handling
- Prompt construction for different markets
**Example:**
```python
async def test_confidence_below_threshold_forces_hold(brain):
"""Decisions below confidence threshold should force HOLD."""
decision = brain.parse_response('{"action":"BUY","confidence":70,"rationale":"test"}')
assert decision.action == "HOLD"
assert decision.confidence == 70
```
#### `tests/test_market_schedule.py` (19 tests)
- Market open/close logic
- Timezone handling (UTC, Asia/Seoul, America/New_York, etc.)
- DST (Daylight Saving Time) transitions
- Weekend handling
- Lunch break logic
- Multiple market filtering
- Next market open calculation
**Example:**
```python
def test_is_market_open_during_trading_hours():
"""Market should be open during regular trading hours."""
# KRX: 9:00-15:30 KST, no lunch break
market = MARKETS["KR"]
trading_time = datetime(2026, 2, 3, 10, 0, tzinfo=ZoneInfo("Asia/Seoul")) # Monday 10:00
assert is_market_open(market, trading_time) is True
```
## Coverage Requirements
**Minimum coverage: 80%**
Check coverage:
```bash
pytest -v --cov=src --cov-report=term-missing
```
Expected output:
```
Name Stmts Miss Cover Missing
-----------------------------------------------------------
src/brain/gemini_client.py 85 5 94% 165-169
src/broker/kis_api.py 120 12 90% ...
src/core/risk_manager.py 35 2 94% ...
src/db.py 25 1 96% ...
src/main.py 150 80 47% (excluded from CI)
src/markets/schedule.py 95 3 97% ...
-----------------------------------------------------------
TOTAL 510 103 80%
```
**Note:** `main.py` has lower coverage as it contains the main loop which is tested via integration/manual testing.
## Test Configuration
### `pyproject.toml`
```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
```
### `tests/conftest.py`
```python
@pytest.fixture
def settings() -> Settings:
"""Provide test settings with safe defaults."""
return Settings(
KIS_APP_KEY="test_key",
KIS_APP_SECRET="test_secret",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test_gemini_key",
MODE="paper",
DB_PATH=":memory:", # In-memory SQLite
CONFIDENCE_THRESHOLD=80,
ENABLED_MARKETS="KR",
)
```
## Writing New Tests
### Naming Convention
- Test files: `test_<module>.py`
- Test functions: `test_<feature>_<scenario>()`
- Use descriptive names that explain what is being tested
### Good Test Example
```python
async def test_send_order_with_market_price(broker, settings):
"""Market orders should use price=0 and ORD_DVSN='01'."""
# Arrange
stock_code = "005930"
order_type = "BUY"
quantity = 10
# Act
with patch.object(broker._session, 'post') as mock_post:
mock_post.return_value.__aenter__.return_value.status = 200
mock_post.return_value.__aenter__.return_value.json = AsyncMock(
return_value={"rt_cd": "0", "msg1": "OK"}
)
await broker.send_order(stock_code, order_type, quantity, price=0)
# Assert
call_args = mock_post.call_args
body = call_args.kwargs['json']
assert body['ORD_DVSN'] == '01' # Market order
assert body['ORD_UNPR'] == '0' # Price 0
```
### Test Checklist
- [ ] Test passes in isolation (`pytest tests/test_foo.py::test_bar -v`)
- [ ] Test has clear docstring explaining what it tests
- [ ] Arrange-Act-Assert structure
- [ ] Uses appropriate fixtures from conftest.py
- [ ] Mocks external dependencies (API calls, network)
- [ ] Tests edge cases and error conditions
- [ ] Doesn't rely on test execution order
## Running Tests
```bash
# All tests
pytest -v
# Specific file
pytest tests/test_risk.py -v
# Specific test
pytest tests/test_brain.py::test_parse_valid_json -v
# With coverage
pytest -v --cov=src --cov-report=term-missing
# Stop on first failure
pytest -x
# Verbose output with print statements
pytest -v -s
```
## CI/CD Integration
Tests run automatically on:
- Every commit to feature branches
- Every PR to main
- Scheduled daily runs
**Blocking conditions:**
- Test failures → PR blocked
- Coverage < 80% → PR blocked (warning only for main.py)
**Non-blocking:**
- `mypy --strict` errors (type hints encouraged but not enforced)
- `ruff check` warnings (must be acknowledged)

75
docs/workflow.md Normal file
View File

@@ -0,0 +1,75 @@
# Development Workflow
## Git Workflow Policy
**CRITICAL: All code changes MUST follow this workflow. Direct pushes to `main` are ABSOLUTELY PROHIBITED.**
1. **Create Gitea Issue First** — All features, bug fixes, and policy changes require a Gitea issue before any code is written
2. **Create Feature Branch** — Branch from `main` using format `feature/issue-{N}-{short-description}`
3. **Implement Changes** — Write code, tests, and documentation on the feature branch
4. **Create Pull Request** — Submit PR to `main` branch referencing the issue number
5. **Review & Merge** — After approval, merge via PR (squash or merge commit)
**Never commit directly to `main`.** This policy applies to all changes, no exceptions.
## Agent Workflow
**Modern AI development leverages specialized agents for concurrent, efficient task execution.**
### Parallel Execution Strategy
Use **git worktree** or **subagents** (via the Task tool) to handle multiple requirements simultaneously:
- Each task runs in independent context
- Parallel branches for concurrent features
- Isolated test environments prevent interference
- Faster iteration with distributed workload
### Specialized Agent Roles
Deploy task-specific agents as needed instead of handling everything in the main conversation:
- **Conversational Agent** (main) — Interface with user, coordinate other agents
- **Ticket Management Agent** — Create/update Gitea issues, track task status
- **Design Agent** — Architectural planning, RFC documents, API design
- **Code Writing Agent** — Implementation following specs
- **Testing Agent** — Write tests, verify coverage, run test suites
- **Documentation Agent** — Update docs, docstrings, CLAUDE.md, README
- **Review Agent** — Code review, lint checks, security audits
- **Custom Agents** — Created dynamically for specialized tasks (performance analysis, migration scripts, etc.)
### When to Use Agents
**Prefer spawning specialized agents for:**
1. Complex multi-file changes requiring exploration
2. Tasks with clear, isolated scope (e.g., "write tests for module X")
3. Parallel work streams (feature A + bugfix B simultaneously)
4. Long-running analysis (codebase search, dependency audit)
5. Tasks requiring different contexts (multiple git worktrees)
**Use the main conversation for:**
1. User interaction and clarification
2. Quick single-file edits
3. Coordinating agent work
4. High-level decision making
### Implementation
```python
# Example: Spawn parallel test and documentation agents
task_tool(
subagent_type="general-purpose",
prompt="Write comprehensive tests for src/markets/schedule.py",
description="Write schedule tests"
)
task_tool(
subagent_type="general-purpose",
prompt="Update README.md with global market feature documentation",
description="Update README"
)
```
Use `run_in_background=True` for independent tasks that don't block subsequent work.

View File

@@ -8,6 +8,7 @@ dependencies = [
"pydantic>=2.5,<3",
"pydantic-settings>=2.1,<3",
"google-genai>=1.0,<2",
"scipy>=1.11,<2",
]
[project.optional-dependencies]

View File

@@ -55,6 +55,28 @@ def init_db(db_path: str) -> sqlite3.Connection:
"""
)
# Decision logging table for comprehensive audit trail
conn.execute(
"""
CREATE TABLE IF NOT EXISTS decision_logs (
decision_id TEXT PRIMARY KEY,
timestamp TEXT NOT NULL,
stock_code TEXT NOT NULL,
market TEXT NOT NULL,
exchange_code TEXT NOT NULL,
action TEXT NOT NULL,
confidence INTEGER NOT NULL,
rationale TEXT NOT NULL,
context_snapshot TEXT NOT NULL,
input_data TEXT NOT NULL,
outcome_pnl REAL,
outcome_accuracy INTEGER,
reviewed INTEGER DEFAULT 0,
review_notes TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS context_metadata (
@@ -71,6 +93,16 @@ def init_db(db_path: str) -> sqlite3.Connection:
conn.execute("CREATE INDEX IF NOT EXISTS idx_contexts_timeframe ON contexts(timeframe)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_contexts_updated ON contexts(updated_at)")
# Create indices for efficient decision log queries
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_decision_logs_timestamp ON decision_logs(timestamp)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_decision_logs_reviewed ON decision_logs(reviewed)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_decision_logs_confidence ON decision_logs(confidence)"
)
conn.commit()
return conn

View File

@@ -0,0 +1,19 @@
"""Evolution engine for self-improving trading strategies."""
from src.evolution.ab_test import ABTester, ABTestResult, StrategyPerformance
from src.evolution.optimizer import EvolutionOptimizer
from src.evolution.performance_tracker import (
PerformanceDashboard,
PerformanceTracker,
StrategyMetrics,
)
__all__ = [
"EvolutionOptimizer",
"ABTester",
"ABTestResult",
"StrategyPerformance",
"PerformanceTracker",
"PerformanceDashboard",
"StrategyMetrics",
]

220
src/evolution/ab_test.py Normal file
View File

@@ -0,0 +1,220 @@
"""A/B Testing framework for strategy comparison.
Runs multiple strategies in parallel, tracks their performance,
and uses statistical significance testing to determine winners.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
import scipy.stats as stats
logger = logging.getLogger(__name__)
@dataclass
class StrategyPerformance:
"""Performance metrics for a single strategy."""
strategy_name: str
total_trades: int
wins: int
losses: int
total_pnl: float
avg_pnl: float
win_rate: float
sharpe_ratio: float | None = None
@dataclass
class ABTestResult:
"""Result of an A/B test between two strategies."""
strategy_a: str
strategy_b: str
winner: str | None
p_value: float
confidence_level: float
is_significant: bool
performance_a: StrategyPerformance
performance_b: StrategyPerformance
class ABTester:
"""A/B testing framework for comparing trading strategies."""
def __init__(self, significance_level: float = 0.05) -> None:
"""Initialize A/B tester.
Args:
significance_level: P-value threshold for statistical significance (default 0.05)
"""
self._significance_level = significance_level
def calculate_performance(
self, trades: list[dict[str, Any]], strategy_name: str
) -> StrategyPerformance:
"""Calculate performance metrics for a strategy.
Args:
trades: List of trade records with pnl values
strategy_name: Name of the strategy
Returns:
StrategyPerformance object with calculated metrics
"""
if not trades:
return StrategyPerformance(
strategy_name=strategy_name,
total_trades=0,
wins=0,
losses=0,
total_pnl=0.0,
avg_pnl=0.0,
win_rate=0.0,
sharpe_ratio=None,
)
total_trades = len(trades)
wins = sum(1 for t in trades if t.get("pnl", 0) > 0)
losses = sum(1 for t in trades if t.get("pnl", 0) < 0)
pnls = [t.get("pnl", 0.0) for t in trades]
total_pnl = sum(pnls)
avg_pnl = total_pnl / total_trades if total_trades > 0 else 0.0
win_rate = (wins / total_trades * 100) if total_trades > 0 else 0.0
# Calculate Sharpe ratio (risk-adjusted return)
sharpe_ratio = None
if len(pnls) > 1:
mean_return = avg_pnl
std_return = (
sum((p - mean_return) ** 2 for p in pnls) / (len(pnls) - 1)
) ** 0.5
if std_return > 0:
sharpe_ratio = mean_return / std_return
return StrategyPerformance(
strategy_name=strategy_name,
total_trades=total_trades,
wins=wins,
losses=losses,
total_pnl=round(total_pnl, 2),
avg_pnl=round(avg_pnl, 2),
win_rate=round(win_rate, 2),
sharpe_ratio=round(sharpe_ratio, 4) if sharpe_ratio else None,
)
def compare_strategies(
self,
trades_a: list[dict[str, Any]],
trades_b: list[dict[str, Any]],
strategy_a_name: str = "Strategy A",
strategy_b_name: str = "Strategy B",
) -> ABTestResult:
"""Compare two strategies using statistical testing.
Uses a two-sample t-test to determine if performance difference is significant.
Args:
trades_a: List of trades from strategy A
trades_b: List of trades from strategy B
strategy_a_name: Name of strategy A
strategy_b_name: Name of strategy B
Returns:
ABTestResult with comparison details
"""
perf_a = self.calculate_performance(trades_a, strategy_a_name)
perf_b = self.calculate_performance(trades_b, strategy_b_name)
# Extract PnL arrays for statistical testing
pnls_a = [t.get("pnl", 0.0) for t in trades_a]
pnls_b = [t.get("pnl", 0.0) for t in trades_b]
# Perform two-sample t-test
if len(pnls_a) > 1 and len(pnls_b) > 1:
t_stat, p_value = stats.ttest_ind(pnls_a, pnls_b, equal_var=False)
is_significant = p_value < self._significance_level
confidence_level = (1 - p_value) * 100
else:
# Not enough data for statistical test
p_value = 1.0
is_significant = False
confidence_level = 0.0
# Determine winner based on average PnL
winner = None
if is_significant:
if perf_a.avg_pnl > perf_b.avg_pnl:
winner = strategy_a_name
elif perf_b.avg_pnl > perf_a.avg_pnl:
winner = strategy_b_name
return ABTestResult(
strategy_a=strategy_a_name,
strategy_b=strategy_b_name,
winner=winner,
p_value=round(p_value, 4),
confidence_level=round(confidence_level, 2),
is_significant=is_significant,
performance_a=perf_a,
performance_b=perf_b,
)
def should_deploy(
self,
result: ABTestResult,
min_win_rate: float = 60.0,
min_trades: int = 20,
) -> bool:
"""Determine if a winning strategy should be deployed.
Args:
result: A/B test result
min_win_rate: Minimum win rate percentage for deployment (default 60%)
min_trades: Minimum number of trades required (default 20)
Returns:
True if the winning strategy meets deployment criteria
"""
if not result.is_significant or result.winner is None:
return False
# Get performance of winning strategy
if result.winner == result.strategy_a:
winning_perf = result.performance_a
else:
winning_perf = result.performance_b
# Check deployment criteria
has_enough_trades = winning_perf.total_trades >= min_trades
has_good_win_rate = winning_perf.win_rate >= min_win_rate
is_profitable = winning_perf.avg_pnl > 0
meets_criteria = has_enough_trades and has_good_win_rate and is_profitable
if meets_criteria:
logger.info(
"Strategy '%s' meets deployment criteria: "
"win_rate=%.2f%%, trades=%d, avg_pnl=%.2f",
result.winner,
winning_perf.win_rate,
winning_perf.total_trades,
winning_perf.avg_pnl,
)
else:
logger.info(
"Strategy '%s' does NOT meet deployment criteria: "
"win_rate=%.2f%% (min %.2f%%), trades=%d (min %d), avg_pnl=%.2f",
result.winner if result.winner else "unknown",
winning_perf.win_rate if result.winner else 0.0,
min_win_rate,
winning_perf.total_trades if result.winner else 0,
min_trades,
winning_perf.avg_pnl if result.winner else 0.0,
)
return meets_criteria

View File

@@ -1,10 +1,10 @@
"""Evolution Engine — analyzes trade logs and generates new strategies.
This module:
1. Reads trade_logs.db to identify failing patterns
2. Asks Gemini to generate a new strategy class
3. Runs pytest on the generated file
4. Creates a simulated PR if tests pass
1. Uses DecisionLogger.get_losing_decisions() to identify failing patterns
2. Analyzes failure patterns by time, market conditions, stock characteristics
3. Asks Gemini to generate improved strategy recommendations
4. Generates new strategy classes with enhanced decision-making logic
"""
from __future__ import annotations
@@ -14,6 +14,7 @@ import logging
import sqlite3
import subprocess
import textwrap
from collections import Counter
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
@@ -21,6 +22,8 @@ from typing import Any
from google import genai
from src.config import Settings
from src.db import init_db
from src.logging.decision_logger import DecisionLog, DecisionLogger
logger = logging.getLogger(__name__)
@@ -53,29 +56,105 @@ class EvolutionOptimizer:
self._db_path = settings.DB_PATH
self._client = genai.Client(api_key=settings.GEMINI_API_KEY)
self._model_name = settings.GEMINI_MODEL
self._conn = init_db(self._db_path)
self._decision_logger = DecisionLogger(self._conn)
# ------------------------------------------------------------------
# Analysis
# ------------------------------------------------------------------
def analyze_failures(self, limit: int = 50) -> list[dict[str, Any]]:
"""Find trades where high confidence led to losses."""
conn = sqlite3.connect(self._db_path)
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(
"""
SELECT stock_code, action, confidence, pnl, rationale, timestamp
FROM trades
WHERE confidence >= 80 AND pnl < 0
ORDER BY pnl ASC
LIMIT ?
""",
(limit,),
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
"""Find high-confidence decisions that resulted in losses.
Uses DecisionLogger.get_losing_decisions() to retrieve failures.
"""
losing_decisions = self._decision_logger.get_losing_decisions(
min_confidence=80, min_loss=-100.0
)
# Limit results
if len(losing_decisions) > limit:
losing_decisions = losing_decisions[:limit]
# Convert to dict format for analysis
failures = []
for decision in losing_decisions:
failures.append({
"decision_id": decision.decision_id,
"timestamp": decision.timestamp,
"stock_code": decision.stock_code,
"market": decision.market,
"exchange_code": decision.exchange_code,
"action": decision.action,
"confidence": decision.confidence,
"rationale": decision.rationale,
"outcome_pnl": decision.outcome_pnl,
"outcome_accuracy": decision.outcome_accuracy,
"context_snapshot": decision.context_snapshot,
"input_data": decision.input_data,
})
return failures
def identify_failure_patterns(
self, failures: list[dict[str, Any]]
) -> dict[str, Any]:
"""Identify patterns in losing decisions.
Analyzes:
- Time patterns (hour of day, day of week)
- Market conditions (volatility, volume)
- Stock characteristics (price range, market)
- Common failure modes in rationale
"""
if not failures:
return {"pattern_count": 0, "patterns": {}}
patterns = {
"markets": Counter(),
"actions": Counter(),
"hours": Counter(),
"avg_confidence": 0.0,
"avg_loss": 0.0,
"total_failures": len(failures),
}
total_confidence = 0
total_loss = 0.0
for failure in failures:
# Market distribution
patterns["markets"][failure.get("market", "UNKNOWN")] += 1
# Action distribution
patterns["actions"][failure.get("action", "UNKNOWN")] += 1
# Time pattern (extract hour from ISO timestamp)
timestamp = failure.get("timestamp", "")
if timestamp:
try:
dt = datetime.fromisoformat(timestamp)
patterns["hours"][dt.hour] += 1
except (ValueError, AttributeError):
pass
# Aggregate metrics
total_confidence += failure.get("confidence", 0)
total_loss += failure.get("outcome_pnl", 0.0)
patterns["avg_confidence"] = (
round(total_confidence / len(failures), 2) if failures else 0.0
)
patterns["avg_loss"] = (
round(total_loss / len(failures), 2) if failures else 0.0
)
# Convert Counters to regular dicts for JSON serialization
patterns["markets"] = dict(patterns["markets"])
patterns["actions"] = dict(patterns["actions"])
patterns["hours"] = dict(patterns["hours"])
return patterns
def get_performance_summary(self) -> dict[str, Any]:
"""Return aggregate performance metrics from trade logs."""
@@ -109,14 +188,25 @@ class EvolutionOptimizer:
async def generate_strategy(self, failures: list[dict[str, Any]]) -> Path | None:
"""Ask Gemini to generate a new strategy based on failure analysis.
Integrates failure patterns and market conditions to create improved strategies.
Returns the path to the generated strategy file, or None on failure.
"""
# Identify failure patterns first
patterns = self.identify_failure_patterns(failures)
prompt = (
"You are a quantitative trading strategy developer.\n"
"Analyze these failed trades and generate an improved strategy.\n\n"
f"Failed trades:\n{json.dumps(failures, indent=2, default=str)}\n\n"
"Generate a Python class that inherits from BaseStrategy.\n"
"The class must have an `evaluate(self, market_data: dict) -> dict` method.\n"
"Analyze these failed trades and their patterns, then generate an improved strategy.\n\n"
f"Failure Patterns:\n{json.dumps(patterns, indent=2)}\n\n"
f"Sample Failed Trades (first 5):\n"
f"{json.dumps(failures[:5], indent=2, default=str)}\n\n"
"Based on these patterns, generate an improved trading strategy.\n"
"The strategy should:\n"
"1. Avoid the identified failure patterns\n"
"2. Consider market-specific conditions\n"
"3. Adjust confidence based on historical performance\n\n"
"Generate a Python method body that inherits from BaseStrategy.\n"
"The method signature is: evaluate(self, market_data: dict) -> dict\n"
"The method must return a dict with keys: action, confidence, rationale.\n"
"Respond with ONLY the method body (Python code), no class definition.\n"
)
@@ -147,10 +237,15 @@ class EvolutionOptimizer:
# Indent the body for the class method
indented_body = textwrap.indent(body, " ")
# Generate rationale from patterns
rationale = f"Auto-evolved from {len(failures)} failures. "
rationale += f"Primary failure markets: {list(patterns.get('markets', {}).keys())}. "
rationale += f"Average loss: {patterns.get('avg_loss', 0.0)}"
content = STRATEGY_TEMPLATE.format(
name=version,
timestamp=datetime.now(UTC).isoformat(),
rationale="Auto-evolved from failure analysis",
rationale=rationale,
class_name=class_name,
body=indented_body.strip(),
)

View File

@@ -0,0 +1,303 @@
"""Performance tracking system for strategy monitoring.
Tracks win rates, monitors improvement over time,
and provides performance metrics dashboard.
"""
from __future__ import annotations
import json
import logging
import sqlite3
from dataclasses import asdict, dataclass
from datetime import UTC, datetime, timedelta
from typing import Any
logger = logging.getLogger(__name__)
@dataclass
class StrategyMetrics:
"""Performance metrics for a strategy over a time period."""
strategy_name: str
period_start: str
period_end: str
total_trades: int
wins: int
losses: int
holds: int
win_rate: float
avg_pnl: float
total_pnl: float
best_trade: float
worst_trade: float
avg_confidence: float
@dataclass
class PerformanceDashboard:
"""Comprehensive performance dashboard."""
generated_at: str
overall_metrics: StrategyMetrics
daily_metrics: list[StrategyMetrics]
weekly_metrics: list[StrategyMetrics]
improvement_trend: dict[str, Any]
class PerformanceTracker:
"""Tracks and monitors strategy performance over time."""
def __init__(self, db_path: str) -> None:
"""Initialize performance tracker.
Args:
db_path: Path to the trade logs database
"""
self._db_path = db_path
def get_strategy_metrics(
self,
strategy_name: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
) -> StrategyMetrics:
"""Get performance metrics for a strategy over a time period.
Args:
strategy_name: Name of the strategy (None = all strategies)
start_date: Start date in ISO format (None = beginning of time)
end_date: End date in ISO format (None = now)
Returns:
StrategyMetrics object with performance data
"""
conn = sqlite3.connect(self._db_path)
conn.row_factory = sqlite3.Row
try:
# Build query with optional filters
query = """
SELECT
COUNT(*) as total_trades,
SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) as wins,
SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END) as losses,
SUM(CASE WHEN action = 'HOLD' THEN 1 ELSE 0 END) as holds,
COALESCE(AVG(CASE WHEN pnl IS NOT NULL THEN pnl END), 0) as avg_pnl,
COALESCE(SUM(CASE WHEN pnl IS NOT NULL THEN pnl ELSE 0 END), 0) as total_pnl,
COALESCE(MAX(pnl), 0) as best_trade,
COALESCE(MIN(pnl), 0) as worst_trade,
COALESCE(AVG(confidence), 0) as avg_confidence,
MIN(timestamp) as period_start,
MAX(timestamp) as period_end
FROM trades
WHERE 1=1
"""
params: list[Any] = []
if start_date:
query += " AND timestamp >= ?"
params.append(start_date)
if end_date:
query += " AND timestamp <= ?"
params.append(end_date)
# Note: Currently trades table doesn't have strategy_name column
# This is a placeholder for future extension
row = conn.execute(query, params).fetchone()
total_trades = row["total_trades"] or 0
wins = row["wins"] or 0
win_rate = (wins / total_trades * 100) if total_trades > 0 else 0.0
return StrategyMetrics(
strategy_name=strategy_name or "default",
period_start=row["period_start"] or "",
period_end=row["period_end"] or "",
total_trades=total_trades,
wins=wins,
losses=row["losses"] or 0,
holds=row["holds"] or 0,
win_rate=round(win_rate, 2),
avg_pnl=round(row["avg_pnl"], 2),
total_pnl=round(row["total_pnl"], 2),
best_trade=round(row["best_trade"], 2),
worst_trade=round(row["worst_trade"], 2),
avg_confidence=round(row["avg_confidence"], 2),
)
finally:
conn.close()
def get_daily_metrics(
self, days: int = 7, strategy_name: str | None = None
) -> list[StrategyMetrics]:
"""Get daily performance metrics for the last N days.
Args:
days: Number of days to retrieve (default 7)
strategy_name: Name of the strategy (None = all strategies)
Returns:
List of StrategyMetrics, one per day
"""
metrics = []
end_date = datetime.now(UTC)
for i in range(days):
day_end = end_date - timedelta(days=i)
day_start = day_end - timedelta(days=1)
day_metrics = self.get_strategy_metrics(
strategy_name=strategy_name,
start_date=day_start.isoformat(),
end_date=day_end.isoformat(),
)
metrics.append(day_metrics)
return metrics
def get_weekly_metrics(
self, weeks: int = 4, strategy_name: str | None = None
) -> list[StrategyMetrics]:
"""Get weekly performance metrics for the last N weeks.
Args:
weeks: Number of weeks to retrieve (default 4)
strategy_name: Name of the strategy (None = all strategies)
Returns:
List of StrategyMetrics, one per week
"""
metrics = []
end_date = datetime.now(UTC)
for i in range(weeks):
week_end = end_date - timedelta(weeks=i)
week_start = week_end - timedelta(weeks=1)
week_metrics = self.get_strategy_metrics(
strategy_name=strategy_name,
start_date=week_start.isoformat(),
end_date=week_end.isoformat(),
)
metrics.append(week_metrics)
return metrics
def calculate_improvement_trend(
self, metrics_history: list[StrategyMetrics]
) -> dict[str, Any]:
"""Calculate improvement trend from historical metrics.
Args:
metrics_history: List of StrategyMetrics ordered from oldest to newest
Returns:
Dictionary with trend analysis
"""
if len(metrics_history) < 2:
return {
"trend": "insufficient_data",
"win_rate_change": 0.0,
"pnl_change": 0.0,
"confidence_change": 0.0,
}
oldest = metrics_history[0]
newest = metrics_history[-1]
win_rate_change = newest.win_rate - oldest.win_rate
pnl_change = newest.avg_pnl - oldest.avg_pnl
confidence_change = newest.avg_confidence - oldest.avg_confidence
# Determine overall trend
if win_rate_change > 5.0 and pnl_change > 0:
trend = "improving"
elif win_rate_change < -5.0 or pnl_change < 0:
trend = "declining"
else:
trend = "stable"
return {
"trend": trend,
"win_rate_change": round(win_rate_change, 2),
"pnl_change": round(pnl_change, 2),
"confidence_change": round(confidence_change, 2),
"period_count": len(metrics_history),
}
def generate_dashboard(
self, strategy_name: str | None = None
) -> PerformanceDashboard:
"""Generate a comprehensive performance dashboard.
Args:
strategy_name: Name of the strategy (None = all strategies)
Returns:
PerformanceDashboard with all metrics
"""
# Get overall metrics
overall_metrics = self.get_strategy_metrics(strategy_name=strategy_name)
# Get daily metrics (last 7 days)
daily_metrics = self.get_daily_metrics(days=7, strategy_name=strategy_name)
# Get weekly metrics (last 4 weeks)
weekly_metrics = self.get_weekly_metrics(weeks=4, strategy_name=strategy_name)
# Calculate improvement trend
improvement_trend = self.calculate_improvement_trend(weekly_metrics[::-1])
return PerformanceDashboard(
generated_at=datetime.now(UTC).isoformat(),
overall_metrics=overall_metrics,
daily_metrics=daily_metrics,
weekly_metrics=weekly_metrics,
improvement_trend=improvement_trend,
)
def export_dashboard_json(
self, dashboard: PerformanceDashboard
) -> str:
"""Export dashboard as JSON string.
Args:
dashboard: PerformanceDashboard object
Returns:
JSON string representation
"""
data = {
"generated_at": dashboard.generated_at,
"overall_metrics": asdict(dashboard.overall_metrics),
"daily_metrics": [asdict(m) for m in dashboard.daily_metrics],
"weekly_metrics": [asdict(m) for m in dashboard.weekly_metrics],
"improvement_trend": dashboard.improvement_trend,
}
return json.dumps(data, indent=2)
def log_dashboard(self, dashboard: PerformanceDashboard) -> None:
"""Log dashboard summary to logger.
Args:
dashboard: PerformanceDashboard object
"""
logger.info("=" * 60)
logger.info("PERFORMANCE DASHBOARD")
logger.info("=" * 60)
logger.info("Generated: %s", dashboard.generated_at)
logger.info("")
logger.info("Overall Performance:")
logger.info(" Total Trades: %d", dashboard.overall_metrics.total_trades)
logger.info(" Win Rate: %.2f%%", dashboard.overall_metrics.win_rate)
logger.info(" Average P&L: %.2f", dashboard.overall_metrics.avg_pnl)
logger.info(" Total P&L: %.2f", dashboard.overall_metrics.total_pnl)
logger.info("")
logger.info("Improvement Trend (%s):", dashboard.improvement_trend["trend"])
logger.info(" Win Rate Change: %+.2f%%", dashboard.improvement_trend["win_rate_change"])
logger.info(" P&L Change: %+.2f", dashboard.improvement_trend["pnl_change"])
logger.info("=" * 60)

5
src/logging/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""Decision logging and audit trail for trade decisions."""
from src.logging.decision_logger import DecisionLog, DecisionLogger
__all__ = ["DecisionLog", "DecisionLogger"]

View File

@@ -0,0 +1,235 @@
"""Decision logging system with context snapshots for comprehensive audit trail."""
from __future__ import annotations
import json
import sqlite3
import uuid
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any
@dataclass
class DecisionLog:
"""A logged trading decision with context and outcome."""
decision_id: str
timestamp: str
stock_code: str
market: str
exchange_code: str
action: str
confidence: int
rationale: str
context_snapshot: dict[str, Any]
input_data: dict[str, Any]
outcome_pnl: float | None = None
outcome_accuracy: int | None = None
reviewed: bool = False
review_notes: str | None = None
class DecisionLogger:
"""Logs trading decisions with full context for review and evolution."""
def __init__(self, conn: sqlite3.Connection) -> None:
"""Initialize the decision logger with a database connection."""
self.conn = conn
def log_decision(
self,
stock_code: str,
market: str,
exchange_code: str,
action: str,
confidence: int,
rationale: str,
context_snapshot: dict[str, Any],
input_data: dict[str, Any],
) -> str:
"""Log a trading decision with full context.
Args:
stock_code: Stock symbol
market: Market code (e.g., "KR", "US_NASDAQ")
exchange_code: Exchange code (e.g., "KRX", "NASDAQ")
action: Trading action (BUY/SELL/HOLD)
confidence: Confidence level (0-100)
rationale: Reasoning for the decision
context_snapshot: L1-L7 context snapshot at decision time
input_data: Market data inputs (price, volume, orderbook, etc.)
Returns:
decision_id: Unique identifier for this decision
"""
decision_id = str(uuid.uuid4())
timestamp = datetime.now(UTC).isoformat()
self.conn.execute(
"""
INSERT INTO decision_logs (
decision_id, timestamp, stock_code, market, exchange_code,
action, confidence, rationale, context_snapshot, input_data
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
decision_id,
timestamp,
stock_code,
market,
exchange_code,
action,
confidence,
rationale,
json.dumps(context_snapshot),
json.dumps(input_data),
),
)
self.conn.commit()
return decision_id
def get_unreviewed_decisions(
self, min_confidence: int = 80, limit: int | None = None
) -> list[DecisionLog]:
"""Get unreviewed decisions with high confidence.
Args:
min_confidence: Minimum confidence threshold (default 80)
limit: Maximum number of results (None = unlimited)
Returns:
List of unreviewed DecisionLog objects
"""
query = """
SELECT
decision_id, timestamp, stock_code, market, exchange_code,
action, confidence, rationale, context_snapshot, input_data,
outcome_pnl, outcome_accuracy, reviewed, review_notes
FROM decision_logs
WHERE reviewed = 0 AND confidence >= ?
ORDER BY timestamp DESC
"""
if limit is not None:
query += f" LIMIT {limit}"
cursor = self.conn.execute(query, (min_confidence,))
return [self._row_to_decision_log(row) for row in cursor.fetchall()]
def mark_reviewed(self, decision_id: str, notes: str) -> None:
"""Mark a decision as reviewed with notes.
Args:
decision_id: Decision identifier
notes: Review notes and insights
"""
self.conn.execute(
"""
UPDATE decision_logs
SET reviewed = 1, review_notes = ?
WHERE decision_id = ?
""",
(notes, decision_id),
)
self.conn.commit()
def update_outcome(
self, decision_id: str, pnl: float, accuracy: int
) -> None:
"""Update the outcome of a decision after trade execution.
Args:
decision_id: Decision identifier
pnl: Actual profit/loss realized
accuracy: 1 if decision was correct, 0 if wrong
"""
self.conn.execute(
"""
UPDATE decision_logs
SET outcome_pnl = ?, outcome_accuracy = ?
WHERE decision_id = ?
""",
(pnl, accuracy, decision_id),
)
self.conn.commit()
def get_decision_by_id(self, decision_id: str) -> DecisionLog | None:
"""Get a specific decision by ID.
Args:
decision_id: Decision identifier
Returns:
DecisionLog object or None if not found
"""
cursor = self.conn.execute(
"""
SELECT
decision_id, timestamp, stock_code, market, exchange_code,
action, confidence, rationale, context_snapshot, input_data,
outcome_pnl, outcome_accuracy, reviewed, review_notes
FROM decision_logs
WHERE decision_id = ?
""",
(decision_id,),
)
row = cursor.fetchone()
return self._row_to_decision_log(row) if row else None
def get_losing_decisions(
self, min_confidence: int = 80, min_loss: float = -100.0
) -> list[DecisionLog]:
"""Get high-confidence decisions that resulted in losses.
Useful for identifying patterns in failed predictions.
Args:
min_confidence: Minimum confidence threshold (default 80)
min_loss: Minimum loss amount (default -100.0, i.e., loss >= 100)
Returns:
List of losing DecisionLog objects
"""
cursor = self.conn.execute(
"""
SELECT
decision_id, timestamp, stock_code, market, exchange_code,
action, confidence, rationale, context_snapshot, input_data,
outcome_pnl, outcome_accuracy, reviewed, review_notes
FROM decision_logs
WHERE confidence >= ?
AND outcome_pnl IS NOT NULL
AND outcome_pnl <= ?
ORDER BY outcome_pnl ASC
""",
(min_confidence, min_loss),
)
return [self._row_to_decision_log(row) for row in cursor.fetchall()]
def _row_to_decision_log(self, row: tuple[Any, ...]) -> DecisionLog:
"""Convert a database row to a DecisionLog object.
Args:
row: Database row tuple
Returns:
DecisionLog object
"""
return DecisionLog(
decision_id=row[0],
timestamp=row[1],
stock_code=row[2],
market=row[3],
exchange_code=row[4],
action=row[5],
confidence=row[6],
rationale=row[7],
context_snapshot=json.loads(row[8]),
input_data=json.loads(row[9]),
outcome_pnl=row[10],
outcome_accuracy=row[11],
reviewed=bool(row[12]),
review_notes=row[13],
)

View File

@@ -19,6 +19,7 @@ from src.broker.overseas import OverseasBroker
from src.config import Settings
from src.core.risk_manager import CircuitBreakerTripped, RiskManager
from src.db import init_db, log_trade
from src.logging.decision_logger import DecisionLogger
from src.logging_config import setup_logging
from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets
@@ -42,6 +43,7 @@ async def trading_cycle(
brain: GeminiClient,
risk: RiskManager,
db_conn: Any,
decision_logger: DecisionLogger,
market: MarketInfo,
stock_code: str,
) -> None:
@@ -101,6 +103,39 @@ async def trading_cycle(
decision.confidence,
)
# 2.5. Log decision with context snapshot
context_snapshot = {
"L1": {
"current_price": current_price,
"foreigner_net": foreigner_net,
},
"L2": {
"total_eval": total_eval,
"total_cash": total_cash,
"purchase_total": purchase_total,
"pnl_pct": pnl_pct,
},
# L3-L7 will be populated when context tree is implemented
}
input_data = {
"current_price": current_price,
"foreigner_net": foreigner_net,
"total_eval": total_eval,
"total_cash": total_cash,
"pnl_pct": pnl_pct,
}
decision_logger.log_decision(
stock_code=stock_code,
market=market.code,
exchange_code=market.exchange_code,
action=decision.action,
confidence=decision.confidence,
rationale=decision.rationale,
context_snapshot=context_snapshot,
input_data=input_data,
)
# 3. Execute if actionable
if decision.action in ("BUY", "SELL"):
# Determine order size (simplified: 1 lot)
@@ -151,6 +186,7 @@ async def run(settings: Settings) -> None:
brain = GeminiClient(settings)
risk = RiskManager(settings)
db_conn = init_db(settings.DB_PATH)
decision_logger = DecisionLogger(db_conn)
shutdown = asyncio.Event()
@@ -218,6 +254,7 @@ async def run(settings: Settings) -> None:
brain,
risk,
db_conn,
decision_logger,
market,
stock_code,
)

View File

@@ -0,0 +1,292 @@
"""Tests for decision logging and audit trail."""
from __future__ import annotations
import sqlite3
from datetime import UTC, datetime
import pytest
from src.db import init_db
from src.logging.decision_logger import DecisionLog, DecisionLogger
@pytest.fixture
def db_conn() -> sqlite3.Connection:
"""Provide an in-memory database with initialized schema."""
conn = init_db(":memory:")
return conn
@pytest.fixture
def logger(db_conn: sqlite3.Connection) -> DecisionLogger:
"""Provide a DecisionLogger instance."""
return DecisionLogger(db_conn)
def test_log_decision_creates_record(logger: DecisionLogger, db_conn: sqlite3.Connection) -> None:
"""Test that log_decision creates a database record."""
context_snapshot = {
"L1": {"quote": {"price": 100.0, "volume": 1000}},
"L2": {"orderbook": {"bid": [99.0], "ask": [101.0]}},
}
input_data = {"price": 100.0, "volume": 1000, "foreigner_net": 500}
decision_id = logger.log_decision(
stock_code="005930",
market="KR",
exchange_code="KRX",
action="BUY",
confidence=85,
rationale="Strong upward momentum",
context_snapshot=context_snapshot,
input_data=input_data,
)
# Verify decision_id is a valid UUID
assert decision_id is not None
assert len(decision_id) == 36 # UUID v4 format
# Verify record exists in database
cursor = db_conn.execute(
"SELECT decision_id, action, confidence FROM decision_logs WHERE decision_id = ?",
(decision_id,),
)
row = cursor.fetchone()
assert row is not None
assert row[0] == decision_id
assert row[1] == "BUY"
assert row[2] == 85
def test_log_decision_stores_context_snapshot(logger: DecisionLogger) -> None:
"""Test that context snapshot is stored as JSON."""
context_snapshot = {
"L1": {"real_time": "data"},
"L3": {"daily": "aggregate"},
"L7": {"legacy": "wisdom"},
}
input_data = {"price": 50000.0, "volume": 2000}
decision_id = logger.log_decision(
stock_code="035420",
market="KR",
exchange_code="KRX",
action="HOLD",
confidence=75,
rationale="Waiting for clearer signal",
context_snapshot=context_snapshot,
input_data=input_data,
)
# Retrieve and verify context snapshot
decision = logger.get_decision_by_id(decision_id)
assert decision is not None
assert decision.context_snapshot == context_snapshot
assert decision.input_data == input_data
def test_get_unreviewed_decisions(logger: DecisionLogger) -> None:
"""Test retrieving unreviewed decisions with confidence filter."""
# Log multiple decisions with varying confidence
logger.log_decision(
stock_code="005930",
market="KR",
exchange_code="KRX",
action="BUY",
confidence=90,
rationale="High confidence buy",
context_snapshot={},
input_data={},
)
logger.log_decision(
stock_code="000660",
market="KR",
exchange_code="KRX",
action="SELL",
confidence=75,
rationale="Low confidence sell",
context_snapshot={},
input_data={},
)
logger.log_decision(
stock_code="035420",
market="KR",
exchange_code="KRX",
action="HOLD",
confidence=85,
rationale="Medium confidence hold",
context_snapshot={},
input_data={},
)
# Get unreviewed decisions with default threshold (80)
unreviewed = logger.get_unreviewed_decisions()
assert len(unreviewed) == 2 # Only confidence >= 80
assert all(d.confidence >= 80 for d in unreviewed)
assert all(not d.reviewed for d in unreviewed)
# Get with lower threshold
unreviewed_all = logger.get_unreviewed_decisions(min_confidence=70)
assert len(unreviewed_all) == 3
def test_mark_reviewed(logger: DecisionLogger) -> None:
"""Test marking a decision as reviewed."""
decision_id = logger.log_decision(
stock_code="005930",
market="KR",
exchange_code="KRX",
action="BUY",
confidence=85,
rationale="Test decision",
context_snapshot={},
input_data={},
)
# Initially unreviewed
decision = logger.get_decision_by_id(decision_id)
assert decision is not None
assert not decision.reviewed
assert decision.review_notes is None
# Mark as reviewed
review_notes = "Good decision, captured bullish momentum correctly"
logger.mark_reviewed(decision_id, review_notes)
# Verify updated
decision = logger.get_decision_by_id(decision_id)
assert decision is not None
assert decision.reviewed
assert decision.review_notes == review_notes
# Should not appear in unreviewed list
unreviewed = logger.get_unreviewed_decisions()
assert all(d.decision_id != decision_id for d in unreviewed)
def test_update_outcome(logger: DecisionLogger) -> None:
"""Test updating decision outcome with P&L and accuracy."""
decision_id = logger.log_decision(
stock_code="005930",
market="KR",
exchange_code="KRX",
action="BUY",
confidence=90,
rationale="Expecting price increase",
context_snapshot={},
input_data={},
)
# Initially no outcome
decision = logger.get_decision_by_id(decision_id)
assert decision is not None
assert decision.outcome_pnl is None
assert decision.outcome_accuracy is None
# Update outcome (profitable trade)
logger.update_outcome(decision_id, pnl=5000.0, accuracy=1)
# Verify updated
decision = logger.get_decision_by_id(decision_id)
assert decision is not None
assert decision.outcome_pnl == 5000.0
assert decision.outcome_accuracy == 1
def test_get_losing_decisions(logger: DecisionLogger) -> None:
"""Test retrieving high-confidence losing decisions."""
# Profitable decision
id1 = logger.log_decision(
stock_code="005930",
market="KR",
exchange_code="KRX",
action="BUY",
confidence=85,
rationale="Correct prediction",
context_snapshot={},
input_data={},
)
logger.update_outcome(id1, pnl=3000.0, accuracy=1)
# High-confidence loss
id2 = logger.log_decision(
stock_code="000660",
market="KR",
exchange_code="KRX",
action="SELL",
confidence=90,
rationale="Wrong prediction",
context_snapshot={},
input_data={},
)
logger.update_outcome(id2, pnl=-2000.0, accuracy=0)
# Low-confidence loss (should be ignored)
id3 = logger.log_decision(
stock_code="035420",
market="KR",
exchange_code="KRX",
action="BUY",
confidence=70,
rationale="Low confidence, wrong",
context_snapshot={},
input_data={},
)
logger.update_outcome(id3, pnl=-1500.0, accuracy=0)
# Get high-confidence losing decisions
losers = logger.get_losing_decisions(min_confidence=80, min_loss=-1000.0)
assert len(losers) == 1
assert losers[0].decision_id == id2
assert losers[0].outcome_pnl == -2000.0
assert losers[0].confidence == 90
def test_get_decision_by_id_not_found(logger: DecisionLogger) -> None:
"""Test that get_decision_by_id returns None for non-existent ID."""
decision = logger.get_decision_by_id("non-existent-uuid")
assert decision is None
def test_unreviewed_limit(logger: DecisionLogger) -> None:
"""Test that get_unreviewed_decisions respects limit parameter."""
# Create 5 unreviewed decisions
for i in range(5):
logger.log_decision(
stock_code=f"00{i}",
market="KR",
exchange_code="KRX",
action="HOLD",
confidence=85,
rationale=f"Decision {i}",
context_snapshot={},
input_data={},
)
# Get only 3
unreviewed = logger.get_unreviewed_decisions(limit=3)
assert len(unreviewed) == 3
def test_decision_log_dataclass() -> None:
"""Test DecisionLog dataclass creation."""
now = datetime.now(UTC).isoformat()
log = DecisionLog(
decision_id="test-uuid",
timestamp=now,
stock_code="005930",
market="KR",
exchange_code="KRX",
action="BUY",
confidence=85,
rationale="Test",
context_snapshot={"L1": "data"},
input_data={"price": 100.0},
)
assert log.decision_id == "test-uuid"
assert log.action == "BUY"
assert log.confidence == 85
assert log.reviewed is False
assert log.outcome_pnl is None

686
tests/test_evolution.py Normal file
View File

@@ -0,0 +1,686 @@
"""Tests for the Evolution Engine components.
Tests cover:
- EvolutionOptimizer: failure analysis and strategy generation
- ABTester: A/B testing and statistical comparison
- PerformanceTracker: metrics tracking and dashboard
"""
from __future__ import annotations
import json
import sqlite3
import tempfile
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, Mock, patch
import pytest
from src.config import Settings
from src.db import init_db, log_trade
from src.evolution.ab_test import ABTester, ABTestResult, StrategyPerformance
from src.evolution.optimizer import EvolutionOptimizer
from src.evolution.performance_tracker import (
PerformanceDashboard,
PerformanceTracker,
StrategyMetrics,
)
from src.logging.decision_logger import DecisionLogger
# ------------------------------------------------------------------
# Fixtures
# ------------------------------------------------------------------
@pytest.fixture
def db_conn() -> sqlite3.Connection:
"""Provide an in-memory database with initialized schema."""
return init_db(":memory:")
@pytest.fixture
def settings() -> Settings:
"""Provide test settings."""
return Settings(
KIS_APP_KEY="test_key",
KIS_APP_SECRET="test_secret",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test_gemini_key",
GEMINI_MODEL="gemini-pro",
DB_PATH=":memory:",
)
@pytest.fixture
def optimizer(settings: Settings) -> EvolutionOptimizer:
"""Provide an EvolutionOptimizer instance."""
return EvolutionOptimizer(settings)
@pytest.fixture
def decision_logger(db_conn: sqlite3.Connection) -> DecisionLogger:
"""Provide a DecisionLogger instance."""
return DecisionLogger(db_conn)
@pytest.fixture
def ab_tester() -> ABTester:
"""Provide an ABTester instance."""
return ABTester(significance_level=0.05)
@pytest.fixture
def performance_tracker(settings: Settings) -> PerformanceTracker:
"""Provide a PerformanceTracker instance."""
return PerformanceTracker(db_path=":memory:")
# ------------------------------------------------------------------
# EvolutionOptimizer Tests
# ------------------------------------------------------------------
def test_analyze_failures_uses_decision_logger(optimizer: EvolutionOptimizer) -> None:
"""Test that analyze_failures uses DecisionLogger.get_losing_decisions()."""
# Add some losing decisions to the database
logger = optimizer._decision_logger
# High-confidence loss
id1 = logger.log_decision(
stock_code="005930",
market="KR",
exchange_code="KRX",
action="BUY",
confidence=85,
rationale="Expected growth",
context_snapshot={"L1": {"price": 70000}},
input_data={"price": 70000, "volume": 1000},
)
logger.update_outcome(id1, pnl=-2000.0, accuracy=0)
# Another high-confidence loss
id2 = logger.log_decision(
stock_code="000660",
market="KR",
exchange_code="KRX",
action="SELL",
confidence=90,
rationale="Expected drop",
context_snapshot={"L1": {"price": 100000}},
input_data={"price": 100000, "volume": 500},
)
logger.update_outcome(id2, pnl=-1500.0, accuracy=0)
# Low-confidence loss (should be ignored)
id3 = logger.log_decision(
stock_code="035420",
market="KR",
exchange_code="KRX",
action="HOLD",
confidence=70,
rationale="Uncertain",
context_snapshot={},
input_data={},
)
logger.update_outcome(id3, pnl=-500.0, accuracy=0)
# Analyze failures
failures = optimizer.analyze_failures(limit=10)
# Should get 2 failures (confidence >= 80)
assert len(failures) == 2
assert all(f["confidence"] >= 80 for f in failures)
assert all(f["outcome_pnl"] <= -100.0 for f in failures)
def test_analyze_failures_empty_database(optimizer: EvolutionOptimizer) -> None:
"""Test analyze_failures with no losing decisions."""
failures = optimizer.analyze_failures()
assert failures == []
def test_identify_failure_patterns(optimizer: EvolutionOptimizer) -> None:
"""Test identification of failure patterns."""
failures = [
{
"decision_id": "1",
"timestamp": "2024-01-15T09:30:00+00:00",
"stock_code": "005930",
"market": "KR",
"exchange_code": "KRX",
"action": "BUY",
"confidence": 85,
"rationale": "Test",
"outcome_pnl": -1000.0,
"outcome_accuracy": 0,
"context_snapshot": {},
"input_data": {},
},
{
"decision_id": "2",
"timestamp": "2024-01-15T14:30:00+00:00",
"stock_code": "000660",
"market": "KR",
"exchange_code": "KRX",
"action": "SELL",
"confidence": 90,
"rationale": "Test",
"outcome_pnl": -2000.0,
"outcome_accuracy": 0,
"context_snapshot": {},
"input_data": {},
},
{
"decision_id": "3",
"timestamp": "2024-01-15T09:45:00+00:00",
"stock_code": "035420",
"market": "US_NASDAQ",
"exchange_code": "NASDAQ",
"action": "BUY",
"confidence": 80,
"rationale": "Test",
"outcome_pnl": -500.0,
"outcome_accuracy": 0,
"context_snapshot": {},
"input_data": {},
},
]
patterns = optimizer.identify_failure_patterns(failures)
assert patterns["total_failures"] == 3
assert patterns["markets"]["KR"] == 2
assert patterns["markets"]["US_NASDAQ"] == 1
assert patterns["actions"]["BUY"] == 2
assert patterns["actions"]["SELL"] == 1
assert 9 in patterns["hours"] # 09:30 and 09:45
assert 14 in patterns["hours"] # 14:30
assert patterns["avg_confidence"] == 85.0
assert patterns["avg_loss"] == -1166.67
def test_identify_failure_patterns_empty(optimizer: EvolutionOptimizer) -> None:
"""Test pattern identification with no failures."""
patterns = optimizer.identify_failure_patterns([])
assert patterns["pattern_count"] == 0
assert patterns["patterns"] == {}
@pytest.mark.asyncio
async def test_generate_strategy_creates_file(optimizer: EvolutionOptimizer, tmp_path: Path) -> None:
"""Test that generate_strategy creates a strategy file."""
failures = [
{
"decision_id": "1",
"timestamp": "2024-01-15T09:30:00+00:00",
"stock_code": "005930",
"market": "KR",
"action": "BUY",
"confidence": 85,
"outcome_pnl": -1000.0,
"context_snapshot": {},
"input_data": {},
}
]
# Mock Gemini response
mock_response = Mock()
mock_response.text = """
# Simple strategy
price = market_data.get("current_price", 0)
if price > 50000:
return {"action": "BUY", "confidence": 70, "rationale": "Price above threshold"}
return {"action": "HOLD", "confidence": 50, "rationale": "Waiting"}
"""
with patch.object(optimizer._client.aio.models, "generate_content", new=AsyncMock(return_value=mock_response)):
with patch("src.evolution.optimizer.STRATEGIES_DIR", tmp_path):
strategy_path = await optimizer.generate_strategy(failures)
assert strategy_path is not None
assert strategy_path.exists()
assert strategy_path.suffix == ".py"
assert "class Strategy_" in strategy_path.read_text()
assert "def evaluate" in strategy_path.read_text()
@pytest.mark.asyncio
async def test_generate_strategy_handles_api_error(optimizer: EvolutionOptimizer) -> None:
"""Test that generate_strategy handles Gemini API errors gracefully."""
failures = [{"decision_id": "1", "timestamp": "2024-01-15T09:30:00+00:00"}]
with patch.object(
optimizer._client.aio.models,
"generate_content",
side_effect=Exception("API Error"),
):
strategy_path = await optimizer.generate_strategy(failures)
assert strategy_path is None
def test_get_performance_summary() -> None:
"""Test getting performance summary from trades table."""
# Create a temporary database with trades
import tempfile
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
tmp_path = tmp.name
conn = init_db(tmp_path)
log_trade(conn, "005930", "BUY", 85, "Test win", quantity=10, price=70000, pnl=1000.0)
log_trade(conn, "000660", "SELL", 90, "Test loss", quantity=5, price=100000, pnl=-500.0)
log_trade(conn, "035420", "BUY", 80, "Test win", quantity=8, price=50000, pnl=800.0)
conn.close()
# Create settings with temp database path
settings = Settings(
KIS_APP_KEY="test_key",
KIS_APP_SECRET="test_secret",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test_gemini_key",
GEMINI_MODEL="gemini-pro",
DB_PATH=tmp_path,
)
optimizer = EvolutionOptimizer(settings)
summary = optimizer.get_performance_summary()
assert summary["total_trades"] == 3
assert summary["wins"] == 2
assert summary["losses"] == 1
assert summary["total_pnl"] == 1300.0
assert summary["avg_pnl"] == 433.33
# Clean up
Path(tmp_path).unlink()
def test_validate_strategy_success(optimizer: EvolutionOptimizer, tmp_path: Path) -> None:
"""Test strategy validation when tests pass."""
strategy_file = tmp_path / "test_strategy.py"
strategy_file.write_text("# Valid strategy file")
with patch("subprocess.run") as mock_run:
mock_run.return_value = Mock(returncode=0, stdout="", stderr="")
result = optimizer.validate_strategy(strategy_file)
assert result is True
assert strategy_file.exists()
def test_validate_strategy_failure(optimizer: EvolutionOptimizer, tmp_path: Path) -> None:
"""Test strategy validation when tests fail."""
strategy_file = tmp_path / "test_strategy.py"
strategy_file.write_text("# Invalid strategy file")
with patch("subprocess.run") as mock_run:
mock_run.return_value = Mock(returncode=1, stdout="FAILED", stderr="")
result = optimizer.validate_strategy(strategy_file)
assert result is False
# File should be deleted on failure
assert not strategy_file.exists()
# ------------------------------------------------------------------
# ABTester Tests
# ------------------------------------------------------------------
def test_calculate_performance_basic(ab_tester: ABTester) -> None:
"""Test basic performance calculation."""
trades = [
{"pnl": 1000.0},
{"pnl": -500.0},
{"pnl": 800.0},
{"pnl": 200.0},
]
perf = ab_tester.calculate_performance(trades, "TestStrategy")
assert perf.strategy_name == "TestStrategy"
assert perf.total_trades == 4
assert perf.wins == 3
assert perf.losses == 1
assert perf.total_pnl == 1500.0
assert perf.avg_pnl == 375.0
assert perf.win_rate == 75.0
assert perf.sharpe_ratio is not None
def test_calculate_performance_empty(ab_tester: ABTester) -> None:
"""Test performance calculation with no trades."""
perf = ab_tester.calculate_performance([], "EmptyStrategy")
assert perf.total_trades == 0
assert perf.wins == 0
assert perf.losses == 0
assert perf.total_pnl == 0.0
assert perf.avg_pnl == 0.0
assert perf.win_rate == 0.0
assert perf.sharpe_ratio is None
def test_compare_strategies_significant_difference(ab_tester: ABTester) -> None:
"""Test strategy comparison with significant performance difference."""
# Strategy A: consistently profitable
trades_a = [{"pnl": 1000.0} for _ in range(30)]
# Strategy B: consistently losing
trades_b = [{"pnl": -500.0} for _ in range(30)]
result = ab_tester.compare_strategies(trades_a, trades_b, "Strategy A", "Strategy B")
# scipy returns np.True_ instead of Python bool
assert bool(result.is_significant) is True
assert result.winner == "Strategy A"
assert result.p_value < 0.05
assert result.performance_a.avg_pnl > result.performance_b.avg_pnl
def test_compare_strategies_no_difference(ab_tester: ABTester) -> None:
"""Test strategy comparison with no significant difference."""
# Both strategies have similar performance
trades_a = [{"pnl": 100.0}, {"pnl": -50.0}, {"pnl": 80.0}]
trades_b = [{"pnl": 90.0}, {"pnl": -60.0}, {"pnl": 85.0}]
result = ab_tester.compare_strategies(trades_a, trades_b, "Strategy A", "Strategy B")
# With small samples and similar performance, likely not significant
assert result.winner is None or not result.is_significant
def test_should_deploy_meets_criteria(ab_tester: ABTester) -> None:
"""Test deployment decision when criteria are met."""
# Create a winning result that meets criteria
trades_a = [{"pnl": 1000.0} for _ in range(25)] # 100% win rate
trades_b = [{"pnl": -500.0} for _ in range(25)]
result = ab_tester.compare_strategies(trades_a, trades_b, "Winner", "Loser")
should_deploy = ab_tester.should_deploy(result, min_win_rate=60.0, min_trades=20)
assert should_deploy is True
def test_should_deploy_insufficient_trades(ab_tester: ABTester) -> None:
"""Test deployment decision with insufficient trades."""
trades_a = [{"pnl": 1000.0} for _ in range(10)] # Only 10 trades
trades_b = [{"pnl": -500.0} for _ in range(10)]
result = ab_tester.compare_strategies(trades_a, trades_b, "Winner", "Loser")
should_deploy = ab_tester.should_deploy(result, min_win_rate=60.0, min_trades=20)
assert should_deploy is False
def test_should_deploy_low_win_rate(ab_tester: ABTester) -> None:
"""Test deployment decision with low win rate."""
# Mix of wins and losses, below 60% win rate
trades_a = [{"pnl": 100.0}] * 10 + [{"pnl": -100.0}] * 15 # 40% win rate
trades_b = [{"pnl": -500.0} for _ in range(25)]
result = ab_tester.compare_strategies(trades_a, trades_b, "LowWinner", "Loser")
should_deploy = ab_tester.should_deploy(result, min_win_rate=60.0, min_trades=20)
assert should_deploy is False
def test_should_deploy_not_significant(ab_tester: ABTester) -> None:
"""Test deployment decision when difference is not significant."""
# Use more varied data to ensure statistical insignificance
trades_a = [{"pnl": 100.0}, {"pnl": -50.0}] * 12 + [{"pnl": 100.0}]
trades_b = [{"pnl": 95.0}, {"pnl": -45.0}] * 12 + [{"pnl": 95.0}]
result = ab_tester.compare_strategies(trades_a, trades_b, "A", "B")
should_deploy = ab_tester.should_deploy(result, min_win_rate=60.0, min_trades=20)
# Not significant or not profitable enough
# Even if significant, win rate is 50% which is below 60% threshold
assert should_deploy is False
# ------------------------------------------------------------------
# PerformanceTracker Tests
# ------------------------------------------------------------------
def test_get_strategy_metrics(db_conn: sqlite3.Connection) -> None:
"""Test getting strategy metrics."""
# Add some trades
log_trade(db_conn, "005930", "BUY", 85, "Win 1", quantity=10, price=70000, pnl=1000.0)
log_trade(db_conn, "000660", "SELL", 90, "Loss 1", quantity=5, price=100000, pnl=-500.0)
log_trade(db_conn, "035420", "BUY", 80, "Win 2", quantity=8, price=50000, pnl=800.0)
log_trade(db_conn, "005930", "HOLD", 75, "Hold", quantity=0, price=70000, pnl=0.0)
tracker = PerformanceTracker(db_path=":memory:")
# Manually set connection for testing
tracker._db_path = db_conn
# Need to use the same connection
with patch("sqlite3.connect", return_value=db_conn):
metrics = tracker.get_strategy_metrics()
assert metrics.total_trades == 4
assert metrics.wins == 2
assert metrics.losses == 1
assert metrics.holds == 1
assert metrics.win_rate == 50.0
assert metrics.total_pnl == 1300.0
def test_calculate_improvement_trend_improving(performance_tracker: PerformanceTracker) -> None:
"""Test improvement trend calculation for improving strategy."""
metrics = [
StrategyMetrics(
strategy_name="test",
period_start="2024-01-01",
period_end="2024-01-07",
total_trades=10,
wins=5,
losses=5,
holds=0,
win_rate=50.0,
avg_pnl=100.0,
total_pnl=1000.0,
best_trade=500.0,
worst_trade=-300.0,
avg_confidence=75.0,
),
StrategyMetrics(
strategy_name="test",
period_start="2024-01-08",
period_end="2024-01-14",
total_trades=10,
wins=7,
losses=3,
holds=0,
win_rate=70.0,
avg_pnl=200.0,
total_pnl=2000.0,
best_trade=600.0,
worst_trade=-200.0,
avg_confidence=80.0,
),
]
trend = performance_tracker.calculate_improvement_trend(metrics)
assert trend["trend"] == "improving"
assert trend["win_rate_change"] == 20.0
assert trend["pnl_change"] == 100.0
assert trend["confidence_change"] == 5.0
def test_calculate_improvement_trend_declining(performance_tracker: PerformanceTracker) -> None:
"""Test improvement trend calculation for declining strategy."""
metrics = [
StrategyMetrics(
strategy_name="test",
period_start="2024-01-01",
period_end="2024-01-07",
total_trades=10,
wins=7,
losses=3,
holds=0,
win_rate=70.0,
avg_pnl=200.0,
total_pnl=2000.0,
best_trade=600.0,
worst_trade=-200.0,
avg_confidence=80.0,
),
StrategyMetrics(
strategy_name="test",
period_start="2024-01-08",
period_end="2024-01-14",
total_trades=10,
wins=4,
losses=6,
holds=0,
win_rate=40.0,
avg_pnl=-50.0,
total_pnl=-500.0,
best_trade=300.0,
worst_trade=-400.0,
avg_confidence=70.0,
),
]
trend = performance_tracker.calculate_improvement_trend(metrics)
assert trend["trend"] == "declining"
assert trend["win_rate_change"] == -30.0
assert trend["pnl_change"] == -250.0
def test_calculate_improvement_trend_insufficient_data(performance_tracker: PerformanceTracker) -> None:
"""Test improvement trend with insufficient data."""
metrics = [
StrategyMetrics(
strategy_name="test",
period_start="2024-01-01",
period_end="2024-01-07",
total_trades=10,
wins=5,
losses=5,
holds=0,
win_rate=50.0,
avg_pnl=100.0,
total_pnl=1000.0,
best_trade=500.0,
worst_trade=-300.0,
avg_confidence=75.0,
)
]
trend = performance_tracker.calculate_improvement_trend(metrics)
assert trend["trend"] == "insufficient_data"
assert trend["win_rate_change"] == 0.0
assert trend["pnl_change"] == 0.0
def test_export_dashboard_json(performance_tracker: PerformanceTracker) -> None:
"""Test exporting dashboard as JSON."""
overall_metrics = StrategyMetrics(
strategy_name="test",
period_start="2024-01-01",
period_end="2024-01-31",
total_trades=100,
wins=60,
losses=40,
holds=10,
win_rate=60.0,
avg_pnl=150.0,
total_pnl=15000.0,
best_trade=1000.0,
worst_trade=-500.0,
avg_confidence=80.0,
)
dashboard = PerformanceDashboard(
generated_at=datetime.now(UTC).isoformat(),
overall_metrics=overall_metrics,
daily_metrics=[],
weekly_metrics=[],
improvement_trend={"trend": "improving", "win_rate_change": 10.0},
)
json_output = performance_tracker.export_dashboard_json(dashboard)
# Verify it's valid JSON
data = json.loads(json_output)
assert "generated_at" in data
assert "overall_metrics" in data
assert data["overall_metrics"]["total_trades"] == 100
assert data["overall_metrics"]["win_rate"] == 60.0
def test_generate_dashboard() -> None:
"""Test generating a complete dashboard."""
# Create tracker with temp database
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
tmp_path = tmp.name
# Initialize with data
conn = init_db(tmp_path)
log_trade(conn, "005930", "BUY", 85, "Win", quantity=10, price=70000, pnl=1000.0)
log_trade(conn, "000660", "SELL", 90, "Loss", quantity=5, price=100000, pnl=-500.0)
conn.close()
tracker = PerformanceTracker(db_path=tmp_path)
dashboard = tracker.generate_dashboard()
assert isinstance(dashboard, PerformanceDashboard)
assert dashboard.overall_metrics.total_trades == 2
assert len(dashboard.daily_metrics) == 7
assert len(dashboard.weekly_metrics) == 4
assert "trend" in dashboard.improvement_trend
# Clean up
Path(tmp_path).unlink()
# ------------------------------------------------------------------
# Integration Tests
# ------------------------------------------------------------------
@pytest.mark.asyncio
async def test_full_evolution_pipeline(optimizer: EvolutionOptimizer, tmp_path: Path) -> None:
"""Test the complete evolution pipeline."""
# Add losing decisions
logger = optimizer._decision_logger
id1 = logger.log_decision(
stock_code="005930",
market="KR",
exchange_code="KRX",
action="BUY",
confidence=85,
rationale="Expected growth",
context_snapshot={},
input_data={},
)
logger.update_outcome(id1, pnl=-2000.0, accuracy=0)
# Mock Gemini and subprocess
mock_response = Mock()
mock_response.text = 'return {"action": "HOLD", "confidence": 50, "rationale": "Test"}'
with patch.object(optimizer._client.aio.models, "generate_content", new=AsyncMock(return_value=mock_response)):
with patch("src.evolution.optimizer.STRATEGIES_DIR", tmp_path):
with patch("subprocess.run") as mock_run:
mock_run.return_value = Mock(returncode=0, stdout="", stderr="")
result = await optimizer.evolve()
assert result is not None
assert "title" in result
assert "branch" in result
assert "status" in result