Compare commits

...

8 Commits

Author SHA1 Message Date
agentson
05e8986ff5 refactor: split CLAUDE.md into focused documentation structure
Some checks failed
CI / test (pull_request) Has been cancelled
- Restructure docs into topic-specific files to minimize context
- Create docs/workflow.md (Git + Agent workflow)
- Create docs/commands.md (Common failures + build commands)
- Create docs/architecture.md (System design + data flow)
- Create docs/testing.md (Test structure + guidelines)
- Rewrite CLAUDE.md as concise hub with links to detailed docs
- Update .gitignore to exclude data/ directory

Benefits:
- Reduced context size for AI assistants
- Faster reference lookups
- Better maintainability
- Topic-focused documentation

Closes #13

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 10:13:48 +09:00
3c676c2b8d Merge pull request 'docs: add common command failures and solutions' (#12) from feature/issue-11-command-failures into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #12
2026-02-04 10:03:29 +09:00
agentson
3dd222bd3b docs: add common command failures and solutions
Some checks failed
CI / test (pull_request) Has been cancelled
- Document tea CLI TTY errors and YES="" workaround
- Record wrong parameter names (--body vs --description)
- Add Gitea API hostname corrections
- Include Git configuration errors
- Document Python/pytest common issues

Each failure includes:
-  Failing command
- 💡 Failure reason
-  Working solution
- 📝 Additional notes

Closes #11

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 10:02:41 +09:00
f4e6b609a4 Merge pull request 'docs: add agent workflow and parallel execution strategy' (#10) from feature/issue-9-agent-workflow into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #10
2026-02-04 09:51:01 +09:00
agentson
9c5bd254b5 docs: add agent workflow and parallel execution strategy
Some checks failed
CI / test (pull_request) Has been cancelled
- Document modern AI development workflow using specialized agents
- Add guidelines for when to use git worktree/subagents vs main conversation
- Define agent roles: ticket mgmt, design, code, test, docs, review
- Include implementation examples with Task tool
- Update test count (35 → 54) with new market_schedule tests

Closes #9

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 09:47:14 +09:00
5c9261ce5b Merge pull request 'feat: implement timezone-based global market auto-selection' (#7) from feature/issue-5-global-market-auto-selection into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #7
2026-02-04 09:33:39 +09:00
ef4305cfc3 Merge pull request 'docs: add Git workflow policy to CLAUDE.md' (#6) from feature/issue-4-add-git-workflow-policy into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #6
2026-02-04 09:33:16 +09:00
agentson
b26ff0c1b8 feat: implement timezone-based global market auto-selection
Some checks failed
CI / test (pull_request) Has been cancelled
Implement comprehensive multi-market trading system with automatic
market selection based on timezone and trading hours.

## New Features
- Market schedule module with 10 global markets (KR, US, JP, HK, CN, VN)
- Overseas broker for KIS API international stock trading
- Automatic market detection based on current time and timezone
- Next market open waiting logic when all markets closed
- ConnectionError retry with exponential backoff (max 3 attempts)

## Architecture Changes
- Market-aware trading cycle with domestic/overseas broker routing
- Market context in AI prompts for better decision making
- Database schema extended with market and exchange_code columns
- Config setting ENABLED_MARKETS for market selection

## Testing
- 19 new tests for market schedule (timezone, DST, lunch breaks)
- All 54 tests passing
- Lint fixes with ruff

## Files Added
- src/markets/schedule.py - Market schedule and timezone logic
- src/broker/overseas.py - KIS overseas stock API client
- tests/test_market_schedule.py - Market schedule test suite

## Files Modified
- src/main.py - Multi-market main loop with retry logic
- src/config.py - ENABLED_MARKETS setting
- src/db.py - market/exchange_code columns with migration
- src/brain/gemini_client.py - Dynamic market context in prompts

Resolves #5

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 09:29:25 +09:00
22 changed files with 1588 additions and 136 deletions

1
.gitignore vendored
View File

@@ -174,3 +174,4 @@ cython_debug/
# PyPI configuration file
.pypirc
data/

132
CLAUDE.md
View File

@@ -1,79 +1,97 @@
# CLAUDE.md
# The Ouroboros
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
AI-powered trading agent for global stock markets with self-evolution capabilities.
## Git Workflow Policy
**CRITICAL: All code changes MUST follow this workflow. Direct pushes to `main` are ABSOLUTELY PROHIBITED.**
1. **Create Gitea Issue First** — All features, bug fixes, and policy changes require a Gitea issue before any code is written
2. **Create Feature Branch** — Branch from `main` using format `feature/issue-{N}-{short-description}`
3. **Implement Changes** — Write code, tests, and documentation on the feature branch
4. **Create Pull Request** — Submit PR to `main` branch referencing the issue number
5. **Review & Merge** — After approval, merge via PR (squash or merge commit)
**Never commit directly to `main`.** This policy applies to all changes, no exceptions.
## Build & Test Commands
## Quick Start
```bash
# Install all dependencies (production + dev)
pip install ".[dev]"
# Setup
pip install -e ".[dev]"
cp .env.example .env
# Edit .env with your KIS and Gemini API credentials
# Run full test suite with coverage
pytest -v --cov=src --cov-report=term-missing
# Test
pytest -v --cov=src
# Run a single test file
pytest tests/test_risk.py -v
# Run a single test by name
pytest tests/test_brain.py -k "test_parse_valid_json" -v
# Lint
ruff check src/ tests/
# Type check (strict mode, non-blocking in CI)
mypy src/ --strict
# Run the trading agent
# Run (paper trading)
python -m src.main --mode=paper
# Docker
docker compose up -d ouroboros # Run agent
docker compose --profile test up test # Run tests in container
```
## Architecture
## Documentation
Self-evolving AI trading agent for Korean stock markets (KIS API). The main loop in `src/main.py` orchestrates four components in a 60-second cycle per stock:
- **[Workflow Guide](docs/workflow.md)** — Git workflow policy and agent-based development
- **[Command Reference](docs/commands.md)** — Common failures, build commands, troubleshooting
- **[Architecture](docs/architecture.md)** — System design, components, data flow
- **[Testing](docs/testing.md)** — Test structure, coverage requirements, writing tests
- **[Agent Policies](docs/agents.md)** — Prime directives, constraints, prohibited actions
1. **Broker** (`src/broker/kis_api.py`) — Async KIS API client with automatic OAuth token refresh, leaky-bucket rate limiter (10 RPS), and POST body hash-key signing. Uses a custom SSL context with disabled hostname verification for the VTS (virtual trading) endpoint due to a known certificate mismatch.
## Core Principles
2. **Brain** (`src/brain/gemini_client.py`) — Sends structured prompts to Google Gemini, parses JSON responses into `TradeDecision` objects. Forces HOLD when confidence < threshold (default 80). Falls back to safe HOLD on any parse/API error.
1. **Safety First** — Risk manager is READ-ONLY and enforces circuit breakers
2. **Test Everything** — 80% coverage minimum, all changes require tests
3. **Issue-Driven Development** — All work goes through Gitea issues → feature branches → PRs
4. **Agent Specialization** — Use dedicated agents for design, coding, testing, docs, review
3. **Risk Manager** (`src/core/risk_manager.py`) — **READ-ONLY by policy** (see `docs/agents.md`). Circuit breaker halts all trading via `SystemExit` when daily P&L drops below -3.0%. Fat-finger check rejects orders exceeding 30% of available cash.
## Project Structure
4. **Evolution** (`src/evolution/optimizer.py`) — Analyzes high-confidence losing trades from SQLite, asks Gemini to generate new `BaseStrategy` subclasses, validates them by running the full pytest suite, and simulates PR creation.
```
src/
├── broker/ # KIS API client (domestic + overseas)
├── brain/ # Gemini AI decision engine
├── core/ # Risk manager (READ-ONLY)
├── evolution/ # Self-improvement optimizer
├── markets/ # Market schedules and timezone handling
├── db.py # SQLite trade logging
├── main.py # Trading loop orchestrator
└── config.py # Settings (from .env)
**Data flow per cycle:** Fetch orderbook + balance → calculate P&L → get Gemini decision → validate with risk manager → execute order → log to SQLite (`src/db.py`).
tests/ # 54 tests across 4 files
docs/ # Extended documentation
```
## Key Constraints (from `docs/agents.md`)
## Key Commands
- `core/risk_manager.py` is **READ-ONLY**. Changes require human approval.
- Circuit breaker threshold (-3.0%) may only be made stricter, never relaxed.
- Fat-finger protection (30% max order size) must always be enforced.
- Confidence < 80 **must** force HOLD — this rule cannot be weakened.
- All code changes require corresponding tests. Coverage must stay >= 80%.
- Generated strategies must pass the full test suite before activation.
```bash
pytest -v --cov=src # Run tests with coverage
ruff check src/ tests/ # Lint
mypy src/ --strict # Type check
## Configuration
python -m src.main --mode=paper # Paper trading
python -m src.main --mode=live # Live trading (⚠️ real money)
Pydantic Settings loaded from `.env` (see `.env.example`). Required vars: `KIS_APP_KEY`, `KIS_APP_SECRET`, `KIS_ACCOUNT_NO` (format `XXXXXXXX-XX`), `GEMINI_API_KEY`. Tests use in-memory SQLite (`DB_PATH=":memory:"`) and dummy credentials via `tests/conftest.py`.
# Gitea workflow (requires tea CLI)
YES="" ~/bin/tea issues create --repo jihoson/The-Ouroboros --title "..." --description "..."
YES="" ~/bin/tea pulls create --head feature-branch --base main --title "..." --description "..."
```
## Test Structure
## Markets Supported
35 tests across three files. `asyncio_mode = "auto"` in pyproject.toml — async tests need no special decorator. The `settings` fixture in `conftest.py` provides safe defaults with test credentials and in-memory DB.
- 🇰🇷 Korea (KRX)
- 🇺🇸 United States (NASDAQ, NYSE, AMEX)
- 🇯🇵 Japan (TSE)
- 🇭🇰 Hong Kong (SEHK)
- 🇨🇳 China (Shanghai, Shenzhen)
- 🇻🇳 Vietnam (Hanoi, HCM)
- `test_risk.py` (11) — Circuit breaker boundaries, fat-finger edge cases
- `test_broker.py` (6) — Token lifecycle, rate limiting, hash keys, network errors
- `test_brain.py` (18) — JSON parsing, confidence threshold, malformed responses, prompt construction
Markets auto-detected based on timezone and enabled in `ENABLED_MARKETS` env variable.
## Critical Constraints
⚠️ **Non-Negotiable Rules** (see [docs/agents.md](docs/agents.md)):
- `src/core/risk_manager.py` is **READ-ONLY** — changes require human approval
- Circuit breaker at -3.0% P&L — may only be made **stricter**
- Fat-finger protection: max 30% of cash per order — always enforced
- Confidence < 80 → force HOLD — cannot be weakened
- All code changes → corresponding tests → coverage ≥ 80%
## Contributing
See [docs/workflow.md](docs/workflow.md) for the complete development process.
**TL;DR:**
1. Create issue in Gitea
2. Create feature branch: `feature/issue-N-description`
3. Implement with tests
4. Open PR
5. Merge after review

191
docs/architecture.md Normal file
View File

@@ -0,0 +1,191 @@
# System Architecture
## Overview
Self-evolving AI trading agent for global stock markets via KIS (Korea Investment & Securities) API. The main loop in `src/main.py` orchestrates four components in a 60-second cycle per stock across multiple markets.
## Core Components
### 1. Broker (`src/broker/`)
**KISBroker** (`kis_api.py`) — Async KIS API client for domestic Korean market
- Automatic OAuth token refresh (valid for 24 hours)
- Leaky-bucket rate limiter (10 requests per second)
- POST body hash-key signing for order authentication
- Custom SSL context with disabled hostname verification for VTS (virtual trading) endpoint due to known certificate mismatch
**OverseasBroker** (`overseas.py`) — KIS overseas stock API wrapper
- Reuses KISBroker infrastructure (session, token, rate limiter) via composition
- Supports 9 global markets: US (NASDAQ/NYSE/AMEX), Japan, Hong Kong, China (Shanghai/Shenzhen), Vietnam (Hanoi/HCM)
- Different API endpoints for overseas price/balance/order operations
**Market Schedule** (`src/markets/schedule.py`) — Timezone-aware market management
- `MarketInfo` dataclass with timezone, trading hours, lunch breaks
- Automatic DST handling via `zoneinfo.ZoneInfo`
- `is_market_open()` checks weekends, trading hours, lunch breaks
- `get_open_markets()` returns currently active markets
- `get_next_market_open()` finds next market to open and when
### 2. Brain (`src/brain/gemini_client.py`)
**GeminiClient** — AI decision engine powered by Google Gemini
- Constructs structured prompts from market data
- Parses JSON responses into `TradeDecision` objects (`action`, `confidence`, `rationale`)
- Forces HOLD when confidence < threshold (default 80)
- Falls back to safe HOLD on any parse/API error
- Handles markdown-wrapped JSON, malformed responses, invalid actions
### 3. Risk Manager (`src/core/risk_manager.py`)
**RiskManager** — Safety circuit breaker and order validation
⚠️ **READ-ONLY by policy** (see [`docs/agents.md`](./agents.md))
- **Circuit Breaker**: Halts all trading via `SystemExit` when daily P&L drops below -3.0%
- Threshold may only be made stricter, never relaxed
- Calculated as `(total_eval - purchase_total) / purchase_total * 100`
- **Fat-Finger Protection**: Rejects orders exceeding 30% of available cash
- Must always be enforced, cannot be disabled
### 4. Evolution (`src/evolution/optimizer.py`)
**StrategyOptimizer** — Self-improvement loop
- Analyzes high-confidence losing trades from SQLite
- Asks Gemini to generate new `BaseStrategy` subclasses
- Validates generated strategies by running full pytest suite
- Simulates PR creation for human review
- Only activates strategies that pass all tests
## Data Flow
```
┌─────────────────────────────────────────────────────────────┐
│ Main Loop (60s cycle per stock, per market) │
└─────────────────────────────────────────────────────────────┘
┌──────────────────────────────────┐
│ Market Schedule Check │
│ - Get open markets │
│ - Filter by enabled markets │
│ - Wait if all closed │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Broker: Fetch Market Data │
│ - Domestic: orderbook + balance │
│ - Overseas: price + balance │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Calculate P&L │
│ pnl_pct = (eval - cost) / cost │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Brain: Get Decision │
│ - Build prompt with market data │
│ - Call Gemini API │
│ - Parse JSON response │
│ - Return TradeDecision │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Risk Manager: Validate Order │
│ - Check circuit breaker │
│ - Check fat-finger limit │
│ - Raise if validation fails │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Broker: Execute Order │
│ - Domestic: send_order() │
│ - Overseas: send_overseas_order() │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Database: Log Trade │
│ - SQLite (data/trades.db) │
│ - Track: action, confidence, │
│ rationale, market, exchange │
└───────────────────────────────────┘
```
## Database Schema
**SQLite** (`src/db.py`)
```sql
CREATE TABLE trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
stock_code TEXT NOT NULL,
action TEXT NOT NULL, -- BUY | SELL | HOLD
confidence INTEGER NOT NULL, -- 0-100
rationale TEXT,
quantity INTEGER,
price REAL,
pnl REAL DEFAULT 0.0,
market TEXT DEFAULT 'KR', -- KR | US_NASDAQ | JP | etc.
exchange_code TEXT DEFAULT 'KRX' -- KRX | NASD | NYSE | etc.
);
```
Auto-migration: Adds `market` and `exchange_code` columns if missing for backward compatibility.
## Configuration
**Pydantic Settings** (`src/config.py`)
Loaded from `.env` file:
```bash
# Required
KIS_APP_KEY=your_app_key
KIS_APP_SECRET=your_app_secret
KIS_ACCOUNT_NO=XXXXXXXX-XX
GEMINI_API_KEY=your_gemini_key
# Optional
MODE=paper # paper | live
DB_PATH=data/trades.db
CONFIDENCE_THRESHOLD=80
MAX_LOSS_PCT=3.0
MAX_ORDER_PCT=30.0
ENABLED_MARKETS=KR,US_NASDAQ # Comma-separated market codes
```
Tests use in-memory SQLite (`DB_PATH=":memory:"`) and dummy credentials via `tests/conftest.py`.
## Error Handling
### Connection Errors (Broker API)
- Retry with exponential backoff (2^attempt seconds)
- Max 3 retries per stock
- After exhaustion, skip stock and continue with next
### API Quota Errors (Gemini)
- Return safe HOLD decision with confidence=0
- Log error but don't crash
- Agent continues trading on next cycle
### Circuit Breaker Tripped
- Immediately halt via `SystemExit`
- Log critical message
- Requires manual intervention to restart
### Market Closed
- Wait until next market opens
- Use `get_next_market_open()` to calculate wait time
- Sleep until market open time

156
docs/commands.md Normal file
View File

@@ -0,0 +1,156 @@
# Command Reference
## Common Command Failures
**Critical: Learn from failures. Never repeat the same failed command without modification.**
### tea CLI (Gitea Command Line Tool)
#### ❌ TTY Error - Interactive Confirmation Fails
```bash
~/bin/tea issues create --repo X --title "Y" --description "Z"
# Error: huh: could not open a new TTY: open /dev/tty: no such device or address
```
**💡 Reason:** tea tries to open `/dev/tty` for interactive confirmation prompts, which is unavailable in non-interactive environments.
**✅ Solution:** Use `YES=""` environment variable to bypass confirmation
```bash
YES="" ~/bin/tea issues create --repo jihoson/The-Ouroboros --title "Title" --description "Body"
YES="" ~/bin/tea issues edit <number> --repo jihoson/The-Ouroboros --description "Updated body"
YES="" ~/bin/tea pulls create --repo jihoson/The-Ouroboros --head feature-branch --base main --title "Title" --description "Body"
```
**📝 Notes:**
- Always set default login: `~/bin/tea login default local`
- Use `--repo jihoson/The-Ouroboros` when outside repo directory
- tea is preferred over direct Gitea API calls for consistency
#### ❌ Wrong Parameter Name
```bash
tea issues create --body "text"
# Error: flag provided but not defined: -body
```
**💡 Reason:** Parameter is `--description`, not `--body`.
**✅ Solution:** Use correct parameter name
```bash
YES="" ~/bin/tea issues create --description "text"
```
### Gitea API (Direct HTTP Calls)
#### ❌ Wrong Hostname
```bash
curl http://gitea.local:3000/api/v1/...
# Error: Could not resolve host: gitea.local
```
**💡 Reason:** Gitea instance runs on `localhost:3000`, not `gitea.local`.
**✅ Solution:** Use correct hostname (but prefer tea CLI)
```bash
curl http://localhost:3000/api/v1/repos/jihoson/The-Ouroboros/issues \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d '{"title":"...", "body":"..."}'
```
**📝 Notes:**
- Prefer `tea` CLI over direct API calls
- Only use curl for operations tea doesn't support
### Git Commands
#### ❌ User Not Configured
```bash
git commit -m "message"
# Error: Author identity unknown
```
**💡 Reason:** Git user.name and user.email not set.
**✅ Solution:** Configure git user
```bash
git config user.name "agentson"
git config user.email "agentson@localhost"
```
#### ❌ Permission Denied on Push
```bash
git push origin branch
# Error: User permission denied for writing
```
**💡 Reason:** Repository access token lacks write permissions or user lacks repo write access.
**✅ Solution:**
1. Verify user has write access to repository (admin grants this)
2. Ensure git credential has correct token with `write:repository` scope
3. Check remote URL uses correct authentication
### Python/Pytest
#### ❌ Module Import Error
```bash
pytest tests/test_foo.py
# ModuleNotFoundError: No module named 'src'
```
**💡 Reason:** Package not installed in development mode.
**✅ Solution:** Install package with dev dependencies
```bash
pip install -e ".[dev]"
```
#### ❌ Async Test Hangs
```python
async def test_something(): # Hangs forever
result = await async_function()
```
**💡 Reason:** Missing pytest-asyncio or wrong configuration.
**✅ Solution:** Already configured in pyproject.toml
```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
```
No decorator needed for async tests.
## Build & Test Commands
```bash
# Install all dependencies (production + dev)
pip install -e ".[dev]"
# Run full test suite with coverage
pytest -v --cov=src --cov-report=term-missing
# Run a single test file
pytest tests/test_risk.py -v
# Run a single test by name
pytest tests/test_brain.py -k "test_parse_valid_json" -v
# Lint
ruff check src/ tests/
# Type check (strict mode, non-blocking in CI)
mypy src/ --strict
# Run the trading agent
python -m src.main --mode=paper
# Docker
docker compose up -d ouroboros # Run agent
docker compose --profile test up test # Run tests in container
```
## Environment Setup
```bash
# Create .env file from example
cp .env.example .env
# Edit .env with your credentials
# Required: KIS_APP_KEY, KIS_APP_SECRET, KIS_ACCOUNT_NO, GEMINI_API_KEY
# Verify configuration
python -c "from src.config import Settings; print(Settings())"
```

213
docs/testing.md Normal file
View File

@@ -0,0 +1,213 @@
# Testing Guidelines
## Test Structure
**54 tests** across four files. `asyncio_mode = "auto"` in pyproject.toml — async tests need no special decorator.
The `settings` fixture in `conftest.py` provides safe defaults with test credentials and in-memory DB.
### Test Files
#### `tests/test_risk.py` (11 tests)
- Circuit breaker boundaries
- Fat-finger edge cases
- P&L calculation edge cases
- Order validation logic
**Example:**
```python
def test_circuit_breaker_exact_threshold(risk_manager):
"""Circuit breaker should trip at exactly -3.0%."""
with pytest.raises(CircuitBreakerTripped):
risk_manager.validate_order(
current_pnl_pct=-3.0,
order_amount=1000,
total_cash=10000
)
```
#### `tests/test_broker.py` (6 tests)
- OAuth token lifecycle
- Rate limiting enforcement
- Hash key generation
- Network error handling
- SSL context configuration
**Example:**
```python
async def test_rate_limiter(broker):
"""Rate limiter should delay requests to stay under 10 RPS."""
start = time.monotonic()
for _ in range(15): # 15 requests
await broker._rate_limiter.acquire()
elapsed = time.monotonic() - start
assert elapsed >= 1.0 # Should take at least 1 second
```
#### `tests/test_brain.py` (18 tests)
- Valid JSON parsing
- Markdown-wrapped JSON handling
- Malformed JSON fallback
- Missing fields handling
- Invalid action validation
- Confidence threshold enforcement
- Empty response handling
- Prompt construction for different markets
**Example:**
```python
async def test_confidence_below_threshold_forces_hold(brain):
"""Decisions below confidence threshold should force HOLD."""
decision = brain.parse_response('{"action":"BUY","confidence":70,"rationale":"test"}')
assert decision.action == "HOLD"
assert decision.confidence == 70
```
#### `tests/test_market_schedule.py` (19 tests)
- Market open/close logic
- Timezone handling (UTC, Asia/Seoul, America/New_York, etc.)
- DST (Daylight Saving Time) transitions
- Weekend handling
- Lunch break logic
- Multiple market filtering
- Next market open calculation
**Example:**
```python
def test_is_market_open_during_trading_hours():
"""Market should be open during regular trading hours."""
# KRX: 9:00-15:30 KST, no lunch break
market = MARKETS["KR"]
trading_time = datetime(2026, 2, 3, 10, 0, tzinfo=ZoneInfo("Asia/Seoul")) # Monday 10:00
assert is_market_open(market, trading_time) is True
```
## Coverage Requirements
**Minimum coverage: 80%**
Check coverage:
```bash
pytest -v --cov=src --cov-report=term-missing
```
Expected output:
```
Name Stmts Miss Cover Missing
-----------------------------------------------------------
src/brain/gemini_client.py 85 5 94% 165-169
src/broker/kis_api.py 120 12 90% ...
src/core/risk_manager.py 35 2 94% ...
src/db.py 25 1 96% ...
src/main.py 150 80 47% (excluded from CI)
src/markets/schedule.py 95 3 97% ...
-----------------------------------------------------------
TOTAL 510 103 80%
```
**Note:** `main.py` has lower coverage as it contains the main loop which is tested via integration/manual testing.
## Test Configuration
### `pyproject.toml`
```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
```
### `tests/conftest.py`
```python
@pytest.fixture
def settings() -> Settings:
"""Provide test settings with safe defaults."""
return Settings(
KIS_APP_KEY="test_key",
KIS_APP_SECRET="test_secret",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test_gemini_key",
MODE="paper",
DB_PATH=":memory:", # In-memory SQLite
CONFIDENCE_THRESHOLD=80,
ENABLED_MARKETS="KR",
)
```
## Writing New Tests
### Naming Convention
- Test files: `test_<module>.py`
- Test functions: `test_<feature>_<scenario>()`
- Use descriptive names that explain what is being tested
### Good Test Example
```python
async def test_send_order_with_market_price(broker, settings):
"""Market orders should use price=0 and ORD_DVSN='01'."""
# Arrange
stock_code = "005930"
order_type = "BUY"
quantity = 10
# Act
with patch.object(broker._session, 'post') as mock_post:
mock_post.return_value.__aenter__.return_value.status = 200
mock_post.return_value.__aenter__.return_value.json = AsyncMock(
return_value={"rt_cd": "0", "msg1": "OK"}
)
await broker.send_order(stock_code, order_type, quantity, price=0)
# Assert
call_args = mock_post.call_args
body = call_args.kwargs['json']
assert body['ORD_DVSN'] == '01' # Market order
assert body['ORD_UNPR'] == '0' # Price 0
```
### Test Checklist
- [ ] Test passes in isolation (`pytest tests/test_foo.py::test_bar -v`)
- [ ] Test has clear docstring explaining what it tests
- [ ] Arrange-Act-Assert structure
- [ ] Uses appropriate fixtures from conftest.py
- [ ] Mocks external dependencies (API calls, network)
- [ ] Tests edge cases and error conditions
- [ ] Doesn't rely on test execution order
## Running Tests
```bash
# All tests
pytest -v
# Specific file
pytest tests/test_risk.py -v
# Specific test
pytest tests/test_brain.py::test_parse_valid_json -v
# With coverage
pytest -v --cov=src --cov-report=term-missing
# Stop on first failure
pytest -x
# Verbose output with print statements
pytest -v -s
```
## CI/CD Integration
Tests run automatically on:
- Every commit to feature branches
- Every PR to main
- Scheduled daily runs
**Blocking conditions:**
- Test failures → PR blocked
- Coverage < 80% → PR blocked (warning only for main.py)
**Non-blocking:**
- `mypy --strict` errors (type hints encouraged but not enforced)
- `ruff check` warnings (must be acknowledged)

75
docs/workflow.md Normal file
View File

@@ -0,0 +1,75 @@
# Development Workflow
## Git Workflow Policy
**CRITICAL: All code changes MUST follow this workflow. Direct pushes to `main` are ABSOLUTELY PROHIBITED.**
1. **Create Gitea Issue First** — All features, bug fixes, and policy changes require a Gitea issue before any code is written
2. **Create Feature Branch** — Branch from `main` using format `feature/issue-{N}-{short-description}`
3. **Implement Changes** — Write code, tests, and documentation on the feature branch
4. **Create Pull Request** — Submit PR to `main` branch referencing the issue number
5. **Review & Merge** — After approval, merge via PR (squash or merge commit)
**Never commit directly to `main`.** This policy applies to all changes, no exceptions.
## Agent Workflow
**Modern AI development leverages specialized agents for concurrent, efficient task execution.**
### Parallel Execution Strategy
Use **git worktree** or **subagents** (via the Task tool) to handle multiple requirements simultaneously:
- Each task runs in independent context
- Parallel branches for concurrent features
- Isolated test environments prevent interference
- Faster iteration with distributed workload
### Specialized Agent Roles
Deploy task-specific agents as needed instead of handling everything in the main conversation:
- **Conversational Agent** (main) — Interface with user, coordinate other agents
- **Ticket Management Agent** — Create/update Gitea issues, track task status
- **Design Agent** — Architectural planning, RFC documents, API design
- **Code Writing Agent** — Implementation following specs
- **Testing Agent** — Write tests, verify coverage, run test suites
- **Documentation Agent** — Update docs, docstrings, CLAUDE.md, README
- **Review Agent** — Code review, lint checks, security audits
- **Custom Agents** — Created dynamically for specialized tasks (performance analysis, migration scripts, etc.)
### When to Use Agents
**Prefer spawning specialized agents for:**
1. Complex multi-file changes requiring exploration
2. Tasks with clear, isolated scope (e.g., "write tests for module X")
3. Parallel work streams (feature A + bugfix B simultaneously)
4. Long-running analysis (codebase search, dependency audit)
5. Tasks requiring different contexts (multiple git worktrees)
**Use the main conversation for:**
1. User interaction and clarification
2. Quick single-file edits
3. Coordinating agent work
4. High-level decision making
### Implementation
```python
# Example: Spawn parallel test and documentation agents
task_tool(
subagent_type="general-purpose",
prompt="Write comprehensive tests for src/markets/schedule.py",
description="Write schedule tests"
)
task_tool(
subagent_type="general-purpose",
prompt="Update README.md with global market feature documentation",
description="Update README"
)
```
Use `run_in_background=True` for independent tasks that don't block subsequent work.

View File

@@ -49,15 +49,40 @@ class GeminiClient:
The prompt instructs Gemini to return valid JSON with action,
confidence, and rationale fields.
"""
market_name = market_data.get("market_name", "Korean stock market")
# Build market data section dynamically based on available fields
market_info_lines = [
f"Market: {market_name}",
f"Stock Code: {market_data['stock_code']}",
f"Current Price: {market_data['current_price']}",
]
# Add orderbook if available (domestic markets)
if "orderbook" in market_data:
market_info_lines.append(
f"Orderbook: {json.dumps(market_data['orderbook'], ensure_ascii=False)}"
)
# Add foreigner net if non-zero
if market_data.get("foreigner_net", 0) != 0:
market_info_lines.append(
f"Foreigner Net Buy/Sell: {market_data['foreigner_net']}"
)
market_info = "\n".join(market_info_lines)
json_format = (
'{"action": "BUY"|"SELL"|"HOLD", '
'"confidence": <int 0-100>, "rationale": "<string>"}'
)
return (
"You are a professional Korean stock market trading analyst.\n"
"Analyze the following market data and decide whether to BUY, SELL, or HOLD.\n\n"
f"Stock Code: {market_data['stock_code']}\n"
f"Current Price: {market_data['current_price']}\n"
f"Orderbook: {json.dumps(market_data['orderbook'], ensure_ascii=False)}\n"
f"Foreigner Net Buy/Sell: {market_data['foreigner_net']}\n\n"
f"You are a professional {market_name} trading analyst.\n"
"Analyze the following market data and decide whether to "
"BUY, SELL, or HOLD.\n\n"
f"{market_info}\n\n"
"You MUST respond with ONLY valid JSON in the following format:\n"
'{"action": "BUY"|"SELL"|"HOLD", "confidence": <int 0-100>, "rationale": "<string>"}\n\n'
f"{json_format}\n\n"
"Rules:\n"
"- action must be exactly one of: BUY, SELL, HOLD\n"
"- confidence must be an integer from 0 to 100\n"

View File

@@ -6,11 +6,8 @@ Handles token refresh, rate limiting (leaky bucket), and hash key generation.
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import ssl
import time
from typing import Any
import aiohttp
@@ -168,7 +165,7 @@ class KISBroker:
f"get_orderbook failed ({resp.status}): {text}"
)
return await resp.json()
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError(f"Network error fetching orderbook: {exc}") from exc
async def get_balance(self) -> dict[str, Any]:
@@ -200,7 +197,7 @@ class KISBroker:
f"get_balance failed ({resp.status}): {text}"
)
return await resp.json()
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError(f"Network error fetching balance: {exc}") from exc
async def send_order(
@@ -253,5 +250,5 @@ class KISBroker:
},
)
return data
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError(f"Network error sending order: {exc}") from exc

200
src/broker/overseas.py Normal file
View File

@@ -0,0 +1,200 @@
"""KIS Overseas Stock API client."""
from __future__ import annotations
import logging
from typing import Any
import aiohttp
from src.broker.kis_api import KISBroker
logger = logging.getLogger(__name__)
class OverseasBroker:
"""KIS Overseas Stock API wrapper that reuses KISBroker infrastructure."""
def __init__(self, kis_broker: KISBroker) -> None:
"""
Initialize overseas broker.
Args:
kis_broker: Domestic KIS broker instance to reuse session/token/rate limiter
"""
self._broker = kis_broker
async def get_overseas_price(
self, exchange_code: str, stock_code: str
) -> dict[str, Any]:
"""
Fetch overseas stock price.
Args:
exchange_code: Exchange code (e.g., "NASD", "NYSE", "TSE")
stock_code: Stock ticker symbol
Returns:
API response with price data
Raises:
ConnectionError: On network or API errors
"""
await self._broker._rate_limiter.acquire()
session = self._broker._get_session()
headers = await self._broker._auth_headers("HHDFS00000300")
params = {
"AUTH": "",
"EXCD": exchange_code,
"SYMB": stock_code,
}
url = f"{self._broker._base_url}/uapi/overseas-price/v1/quotations/price"
try:
async with session.get(url, headers=headers, params=params) as resp:
if resp.status != 200:
text = await resp.text()
raise ConnectionError(
f"get_overseas_price failed ({resp.status}): {text}"
)
return await resp.json()
except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError(
f"Network error fetching overseas price: {exc}"
) from exc
async def get_overseas_balance(self, exchange_code: str) -> dict[str, Any]:
"""
Fetch overseas account balance.
Args:
exchange_code: Exchange code (e.g., "NASD", "NYSE")
Returns:
API response with balance data
Raises:
ConnectionError: On network or API errors
"""
await self._broker._rate_limiter.acquire()
session = self._broker._get_session()
# Virtual trading TR_ID for overseas balance inquiry
headers = await self._broker._auth_headers("VTTS3012R")
params = {
"CANO": self._broker._account_no,
"ACNT_PRDT_CD": self._broker._product_cd,
"OVRS_EXCG_CD": exchange_code,
"TR_CRCY_CD": self._get_currency_code(exchange_code),
"CTX_AREA_FK200": "",
"CTX_AREA_NK200": "",
}
url = (
f"{self._broker._base_url}/uapi/overseas-stock/v1/trading/inquire-balance"
)
try:
async with session.get(url, headers=headers, params=params) as resp:
if resp.status != 200:
text = await resp.text()
raise ConnectionError(
f"get_overseas_balance failed ({resp.status}): {text}"
)
return await resp.json()
except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError(
f"Network error fetching overseas balance: {exc}"
) from exc
async def send_overseas_order(
self,
exchange_code: str,
stock_code: str,
order_type: str, # "BUY" or "SELL"
quantity: int,
price: float = 0.0,
) -> dict[str, Any]:
"""
Submit overseas stock order.
Args:
exchange_code: Exchange code (e.g., "NASD", "NYSE")
stock_code: Stock ticker symbol
order_type: "BUY" or "SELL"
quantity: Number of shares
price: Order price (0 for market order)
Returns:
API response with order result
Raises:
ConnectionError: On network or API errors
"""
await self._broker._rate_limiter.acquire()
session = self._broker._get_session()
# Virtual trading TR_IDs for overseas orders
tr_id = "VTTT1002U" if order_type == "BUY" else "VTTT1006U"
body = {
"CANO": self._broker._account_no,
"ACNT_PRDT_CD": self._broker._product_cd,
"OVRS_EXCG_CD": exchange_code,
"PDNO": stock_code,
"ORD_DVSN": "00" if price > 0 else "01", # 00=지정가, 01=시장가
"ORD_QTY": str(quantity),
"OVRS_ORD_UNPR": str(price) if price > 0 else "0",
"ORD_SVR_DVSN_CD": "0", # 0=해외주문
}
hash_key = await self._broker._get_hash_key(body)
headers = await self._broker._auth_headers(tr_id)
headers["hashkey"] = hash_key
url = f"{self._broker._base_url}/uapi/overseas-stock/v1/trading/order"
try:
async with session.post(url, headers=headers, json=body) as resp:
if resp.status != 200:
text = await resp.text()
raise ConnectionError(
f"send_overseas_order failed ({resp.status}): {text}"
)
data = await resp.json()
logger.info(
"Overseas order submitted",
extra={
"exchange": exchange_code,
"stock_code": stock_code,
"action": order_type,
},
)
return data
except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError(
f"Network error sending overseas order: {exc}"
) from exc
def _get_currency_code(self, exchange_code: str) -> str:
"""
Map exchange code to currency code.
Args:
exchange_code: Exchange code
Returns:
Currency code (e.g., "USD", "JPY")
"""
currency_map = {
"NASD": "USD",
"NYSE": "USD",
"AMEX": "USD",
"TSE": "JPY",
"SEHK": "HKD",
"SHAA": "CNY",
"SZAA": "CNY",
"HNX": "VND",
"HSX": "VND",
}
return currency_map.get(exchange_code, "USD")

View File

@@ -33,6 +33,9 @@ class Settings(BaseSettings):
# Trading mode
MODE: str = Field(default="paper", pattern="^(paper|live)$")
# Market selection (comma-separated market codes)
ENABLED_MARKETS: str = "KR"
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
@property
@@ -42,3 +45,8 @@ class Settings(BaseSettings):
@property
def account_product_code(self) -> str:
return self.KIS_ACCOUNT_NO.split("-")[1]
@property
def enabled_market_list(self) -> list[str]:
"""Parse ENABLED_MARKETS into list of market codes."""
return [m.strip() for m in self.ENABLED_MARKETS.split(",") if m.strip()]

View File

@@ -7,7 +7,6 @@ Changes require human approval and two passing test suites.
from __future__ import annotations
import logging
from dataclasses import dataclass
from src.config import Settings

View File

@@ -3,9 +3,8 @@
from __future__ import annotations
import sqlite3
from datetime import datetime, timezone
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
def init_db(db_path: str) -> sqlite3.Connection:
@@ -24,10 +23,22 @@ def init_db(db_path: str) -> sqlite3.Connection:
rationale TEXT,
quantity INTEGER,
price REAL,
pnl REAL DEFAULT 0.0
pnl REAL DEFAULT 0.0,
market TEXT DEFAULT 'KR',
exchange_code TEXT DEFAULT 'KRX'
)
"""
)
# Migration: Add market and exchange_code columns if they don't exist
cursor = conn.execute("PRAGMA table_info(trades)")
columns = {row[1] for row in cursor.fetchall()}
if "market" not in columns:
conn.execute("ALTER TABLE trades ADD COLUMN market TEXT DEFAULT 'KR'")
if "exchange_code" not in columns:
conn.execute("ALTER TABLE trades ADD COLUMN exchange_code TEXT DEFAULT 'KRX'")
conn.commit()
return conn
@@ -41,15 +52,20 @@ def log_trade(
quantity: int = 0,
price: float = 0.0,
pnl: float = 0.0,
market: str = "KR",
exchange_code: str = "KRX",
) -> None:
"""Insert a trade record into the database."""
conn.execute(
"""
INSERT INTO trades (timestamp, stock_code, action, confidence, rationale, quantity, price, pnl)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO trades (
timestamp, stock_code, action, confidence, rationale,
quantity, price, pnl, market, exchange_code
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
datetime.now(timezone.utc).isoformat(),
datetime.now(UTC).isoformat(),
stock_code,
action,
confidence,
@@ -57,6 +73,8 @@ def log_trade(
quantity,
price,
pnl,
market,
exchange_code,
),
)
conn.commit()

View File

@@ -14,7 +14,7 @@ import logging
import sqlite3
import subprocess
import textwrap
from datetime import datetime, timezone
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
@@ -136,7 +136,7 @@ class EvolutionOptimizer:
body = "\n".join(lines[1:-1])
# Create strategy file
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
version = f"v{timestamp}"
class_name = f"Strategy_{version}"
file_name = f"{version}_evolved.py"
@@ -149,7 +149,7 @@ class EvolutionOptimizer:
content = STRATEGY_TEMPLATE.format(
name=version,
timestamp=datetime.now(timezone.utc).isoformat(),
timestamp=datetime.now(UTC).isoformat(),
rationale="Auto-evolved from failure analysis",
class_name=class_name,
body=indented_body.strip(),

View File

@@ -2,20 +2,19 @@
from __future__ import annotations
import json
import logging
import sys
from datetime import datetime, timezone
from datetime import UTC, datetime
from typing import Any
import json
class JSONFormatter(logging.Formatter):
"""Emit log records as single-line JSON objects."""
def format(self, record: logging.LogRecord) -> str:
log_entry: dict[str, Any] = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"timestamp": datetime.now(UTC).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),

View File

@@ -10,33 +10,44 @@ import argparse
import asyncio
import logging
import signal
import sys
from datetime import UTC, datetime
from typing import Any
from src.brain.gemini_client import GeminiClient
from src.broker.kis_api import KISBroker
from src.broker.overseas import OverseasBroker
from src.config import Settings
from src.core.risk_manager import CircuitBreakerTripped, RiskManager
from src.db import init_db, log_trade
from src.logging_config import setup_logging
from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets
logger = logging.getLogger(__name__)
# Target stock codes to monitor
WATCHLIST = ["005930", "000660", "035420"] # Samsung, SK Hynix, NAVER
# Target stock codes to monitor per market
WATCHLISTS = {
"KR": ["005930", "000660", "035420"], # Samsung, SK Hynix, NAVER
"US_NASDAQ": ["AAPL", "MSFT", "GOOGL"], # Example US stocks
"US_NYSE": ["JPM", "BAC"], # Example NYSE stocks
"JP": ["7203", "6758"], # Toyota, Sony
}
TRADE_INTERVAL_SECONDS = 60
MAX_CONNECTION_RETRIES = 3
async def trading_cycle(
broker: KISBroker,
overseas_broker: OverseasBroker,
brain: GeminiClient,
risk: RiskManager,
db_conn: Any,
market: MarketInfo,
stock_code: str,
) -> None:
"""Execute one trading cycle for a single stock."""
# 1. Fetch market data
if market.is_domestic:
orderbook = await broker.get_orderbook(stock_code)
balance_data = await broker.get_balance()
@@ -49,27 +60,43 @@ async def trading_cycle(
)
purchase_total = float(output2[0].get("pchs_amt_smtl_amt", "0")) if output2 else 0
# Calculate daily P&L %
pnl_pct = ((total_eval - purchase_total) / purchase_total * 100) if purchase_total > 0 else 0.0
current_price = float(orderbook.get("output1", {}).get("stck_prpr", "0"))
foreigner_net = float(orderbook.get("output1", {}).get("frgn_ntby_qty", "0"))
else:
# Overseas market
price_data = await overseas_broker.get_overseas_price(
market.exchange_code, stock_code
)
balance_data = await overseas_broker.get_overseas_balance(market.exchange_code)
current_price = float(
orderbook.get("output1", {}).get("stck_prpr", "0")
output2 = balance_data.get("output2", [{}])
total_eval = float(output2[0].get("frcr_evlu_tota", "0")) if output2 else 0
total_cash = float(output2[0].get("frcr_dncl_amt_2", "0")) if output2 else 0
purchase_total = float(output2[0].get("frcr_buy_amt_smtl", "0")) if output2 else 0
current_price = float(price_data.get("output", {}).get("last", "0"))
foreigner_net = 0.0 # Not available for overseas
# Calculate daily P&L %
pnl_pct = (
((total_eval - purchase_total) / purchase_total * 100)
if purchase_total > 0
else 0.0
)
market_data = {
"stock_code": stock_code,
"market_name": market.name,
"current_price": current_price,
"orderbook": orderbook.get("output1", {}),
"foreigner_net": float(
orderbook.get("output1", {}).get("frgn_ntby_qty", "0")
),
"foreigner_net": foreigner_net,
}
# 2. Ask the brain for a decision
decision = await brain.decide(market_data)
logger.info(
"Decision for %s: %s (confidence=%d)",
"Decision for %s (%s): %s (confidence=%d)",
stock_code,
market.name,
decision.action,
decision.confidence,
)
@@ -88,12 +115,21 @@ async def trading_cycle(
)
# 5. Send order
if market.is_domestic:
result = await broker.send_order(
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=0, # market order
)
else:
result = await overseas_broker.send_overseas_order(
exchange_code=market.exchange_code,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=0.0, # market order
)
logger.info("Order result: %s", result.get("msg1", "OK"))
# 6. Log trade
@@ -103,12 +139,15 @@ async def trading_cycle(
action=decision.action,
confidence=decision.confidence,
rationale=decision.rationale,
market=market.code,
exchange_code=market.exchange_code,
)
async def run(settings: Settings) -> None:
"""Main async loop — iterate over watchlist on a timer."""
"""Main async loop — iterate over open markets on a timer."""
broker = KISBroker(settings)
overseas_broker = OverseasBroker(broker)
brain = GeminiClient(settings)
risk = RiskManager(settings)
db_conn = init_db(settings.DB_PATH)
@@ -124,27 +163,93 @@ async def run(settings: Settings) -> None:
loop.add_signal_handler(sig, _signal_handler)
logger.info("The Ouroboros is alive. Mode: %s", settings.MODE)
logger.info("Watchlist: %s", WATCHLIST)
logger.info("Enabled markets: %s", settings.enabled_market_list)
try:
while not shutdown.is_set():
for code in WATCHLIST:
# Get currently open markets
open_markets = get_open_markets(settings.enabled_market_list)
if not open_markets:
# No markets open — wait until next market opens
try:
next_market, next_open_time = get_next_market_open(
settings.enabled_market_list
)
now = datetime.now(UTC)
wait_seconds = (next_open_time - now).total_seconds()
logger.info(
"No markets open. Next market: %s, opens in %.1f hours",
next_market.name,
wait_seconds / 3600,
)
await asyncio.wait_for(shutdown.wait(), timeout=wait_seconds)
except TimeoutError:
continue # Market should be open now
except ValueError as exc:
logger.error("Failed to find next market open: %s", exc)
await asyncio.sleep(TRADE_INTERVAL_SECONDS)
continue
# Process each open market
for market in open_markets:
if shutdown.is_set():
break
# Get watchlist for this market
watchlist = WATCHLISTS.get(market.code, [])
if not watchlist:
logger.debug("No watchlist for market %s", market.code)
continue
logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist))
# Process each stock in the watchlist
for stock_code in watchlist:
if shutdown.is_set():
break
# Retry logic for connection errors
for attempt in range(1, MAX_CONNECTION_RETRIES + 1):
try:
await trading_cycle(broker, brain, risk, db_conn, code)
await trading_cycle(
broker,
overseas_broker,
brain,
risk,
db_conn,
market,
stock_code,
)
break # Success — exit retry loop
except CircuitBreakerTripped:
logger.critical("Circuit breaker tripped — shutting down")
raise
except ConnectionError as exc:
logger.error("Connection error for %s: %s", code, exc)
if attempt < MAX_CONNECTION_RETRIES:
logger.warning(
"Connection error for %s (attempt %d/%d): %s",
stock_code,
attempt,
MAX_CONNECTION_RETRIES,
exc,
)
await asyncio.sleep(2**attempt) # Exponential backoff
else:
logger.error(
"Connection error for %s (all retries exhausted): %s",
stock_code,
exc,
)
break # Give up on this stock
except Exception as exc:
logger.exception("Unexpected error for %s: %s", code, exc)
logger.exception("Unexpected error for %s: %s", stock_code, exc)
break # Don't retry on unexpected errors
# Wait for next cycle or shutdown
try:
await asyncio.wait_for(shutdown.wait(), timeout=TRADE_INTERVAL_SECONDS)
except asyncio.TimeoutError:
except TimeoutError:
pass # Normal — timeout means it's time for next cycle
finally:
await broker.close()

1
src/markets/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Global market scheduling and timezone management."""

252
src/markets/schedule.py Normal file
View File

@@ -0,0 +1,252 @@
"""Market schedule management with timezone support."""
from dataclasses import dataclass
from datetime import datetime, time, timedelta
from zoneinfo import ZoneInfo
@dataclass(frozen=True)
class MarketInfo:
"""Information about a trading market."""
code: str # Market code for internal use (e.g., "KR", "US_NASDAQ")
exchange_code: str # KIS API exchange code (e.g., "NASD", "NYSE")
name: str # Human-readable name
timezone: ZoneInfo # Market timezone
open_time: time # Market open time in local timezone
close_time: time # Market close time in local timezone
is_domestic: bool # True for Korean market, False for overseas
lunch_break: tuple[time, time] | None = None # (start, end) or None
# 10 global markets with their schedules
MARKETS: dict[str, MarketInfo] = {
"KR": MarketInfo(
code="KR",
exchange_code="KRX",
name="Korea Exchange",
timezone=ZoneInfo("Asia/Seoul"),
open_time=time(9, 0),
close_time=time(15, 30),
is_domestic=True,
lunch_break=None, # KRX removed lunch break
),
"US_NASDAQ": MarketInfo(
code="US_NASDAQ",
exchange_code="NASD",
name="NASDAQ",
timezone=ZoneInfo("America/New_York"),
open_time=time(9, 30),
close_time=time(16, 0),
is_domestic=False,
lunch_break=None,
),
"US_NYSE": MarketInfo(
code="US_NYSE",
exchange_code="NYSE",
name="New York Stock Exchange",
timezone=ZoneInfo("America/New_York"),
open_time=time(9, 30),
close_time=time(16, 0),
is_domestic=False,
lunch_break=None,
),
"US_AMEX": MarketInfo(
code="US_AMEX",
exchange_code="AMEX",
name="NYSE American",
timezone=ZoneInfo("America/New_York"),
open_time=time(9, 30),
close_time=time(16, 0),
is_domestic=False,
lunch_break=None,
),
"JP": MarketInfo(
code="JP",
exchange_code="TSE",
name="Tokyo Stock Exchange",
timezone=ZoneInfo("Asia/Tokyo"),
open_time=time(9, 0),
close_time=time(15, 0),
is_domestic=False,
lunch_break=(time(11, 30), time(12, 30)),
),
"HK": MarketInfo(
code="HK",
exchange_code="SEHK",
name="Hong Kong Stock Exchange",
timezone=ZoneInfo("Asia/Hong_Kong"),
open_time=time(9, 30),
close_time=time(16, 0),
is_domestic=False,
lunch_break=(time(12, 0), time(13, 0)),
),
"CN_SHA": MarketInfo(
code="CN_SHA",
exchange_code="SHAA",
name="Shanghai Stock Exchange",
timezone=ZoneInfo("Asia/Shanghai"),
open_time=time(9, 30),
close_time=time(15, 0),
is_domestic=False,
lunch_break=(time(11, 30), time(13, 0)),
),
"CN_SZA": MarketInfo(
code="CN_SZA",
exchange_code="SZAA",
name="Shenzhen Stock Exchange",
timezone=ZoneInfo("Asia/Shanghai"),
open_time=time(9, 30),
close_time=time(15, 0),
is_domestic=False,
lunch_break=(time(11, 30), time(13, 0)),
),
"VN_HAN": MarketInfo(
code="VN_HAN",
exchange_code="HNX",
name="Hanoi Stock Exchange",
timezone=ZoneInfo("Asia/Ho_Chi_Minh"),
open_time=time(9, 0),
close_time=time(15, 0),
is_domestic=False,
lunch_break=(time(11, 30), time(13, 0)),
),
"VN_HCM": MarketInfo(
code="VN_HCM",
exchange_code="HSX",
name="Ho Chi Minh Stock Exchange",
timezone=ZoneInfo("Asia/Ho_Chi_Minh"),
open_time=time(9, 0),
close_time=time(15, 0),
is_domestic=False,
lunch_break=(time(11, 30), time(13, 0)),
),
}
def is_market_open(market: MarketInfo, now: datetime | None = None) -> bool:
"""
Check if a market is currently open for trading.
Args:
market: Market information
now: Current time (defaults to datetime.now(UTC) for testing)
Returns:
True if market is open, False otherwise
Note:
Does not account for holidays (KIS API will reject orders on holidays)
"""
if now is None:
now = datetime.now(ZoneInfo("UTC"))
# Convert to market's local timezone
local_now = now.astimezone(market.timezone)
# Check if it's a weekend
if local_now.weekday() >= 5: # Saturday=5, Sunday=6
return False
current_time = local_now.time()
# Check if within trading hours
if current_time < market.open_time or current_time >= market.close_time:
return False
# Check lunch break
if market.lunch_break:
lunch_start, lunch_end = market.lunch_break
if lunch_start <= current_time < lunch_end:
return False
return True
def get_open_markets(
enabled_markets: list[str] | None = None, now: datetime | None = None
) -> list[MarketInfo]:
"""
Get list of currently open markets.
Args:
enabled_markets: List of market codes to check (defaults to all markets)
now: Current time (defaults to datetime.now(UTC) for testing)
Returns:
List of open markets, sorted by market code
"""
if enabled_markets is None:
enabled_markets = list(MARKETS.keys())
open_markets = [
MARKETS[code]
for code in enabled_markets
if code in MARKETS and is_market_open(MARKETS[code], now)
]
return sorted(open_markets, key=lambda m: m.code)
def get_next_market_open(
enabled_markets: list[str] | None = None, now: datetime | None = None
) -> tuple[MarketInfo, datetime]:
"""
Find the next market that will open and when.
Args:
enabled_markets: List of market codes to check (defaults to all markets)
now: Current time (defaults to datetime.now(UTC) for testing)
Returns:
Tuple of (market, open_datetime) for the next market to open
Raises:
ValueError: If no enabled markets are configured
"""
if now is None:
now = datetime.now(ZoneInfo("UTC"))
if enabled_markets is None:
enabled_markets = list(MARKETS.keys())
if not enabled_markets:
raise ValueError("No enabled markets configured")
next_open_time: datetime | None = None
next_market: MarketInfo | None = None
for code in enabled_markets:
if code not in MARKETS:
continue
market = MARKETS[code]
market_now = now.astimezone(market.timezone)
# Calculate next open time for this market
for days_ahead in range(7): # Check next 7 days
check_date = market_now.date() + timedelta(days=days_ahead)
check_datetime = datetime.combine(
check_date, market.open_time, tzinfo=market.timezone
)
# Skip weekends
if check_datetime.weekday() >= 5:
continue
# Skip if this open time already passed today
if check_datetime <= market_now:
continue
# Convert to UTC for comparison
check_datetime_utc = check_datetime.astimezone(ZoneInfo("UTC"))
if next_open_time is None or check_datetime_utc < next_open_time:
next_open_time = check_datetime_utc
next_market = market
break
if next_market is None or next_open_time is None:
raise ValueError("Could not find next market open time")
return next_market, next_open_time

View File

@@ -20,4 +20,5 @@ def settings() -> Settings:
FAT_FINGER_PCT=30.0,
CONFIDENCE_THRESHOLD=80,
DB_PATH=":memory:",
ENABLED_MARKETS="KR",
)

View File

@@ -2,12 +2,7 @@
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.brain.gemini_client import GeminiClient, TradeDecision
from src.brain.gemini_client import GeminiClient
# ---------------------------------------------------------------------------
# Response Parsing

View File

@@ -3,14 +3,12 @@
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, patch
import aiohttp
import pytest
from src.broker.kis_api import KISBroker
# ---------------------------------------------------------------------------
# Token Management
# ---------------------------------------------------------------------------
@@ -68,7 +66,7 @@ class TestNetworkErrorHandling:
with patch(
"aiohttp.ClientSession.get",
side_effect=asyncio.TimeoutError(),
side_effect=TimeoutError(),
):
with pytest.raises(ConnectionError):
await broker.get_orderbook("005930")

View File

@@ -0,0 +1,201 @@
"""Tests for market schedule management."""
from datetime import datetime
from zoneinfo import ZoneInfo
import pytest
from src.markets.schedule import (
MARKETS,
get_next_market_open,
get_open_markets,
is_market_open,
)
class TestMarketInfo:
"""Test MarketInfo dataclass."""
def test_market_info_immutable(self) -> None:
"""MarketInfo should be frozen."""
market = MARKETS["KR"]
with pytest.raises(AttributeError):
market.code = "US" # type: ignore[misc]
def test_all_markets_defined(self) -> None:
"""All 10 markets should be defined."""
expected_markets = {
"KR",
"US_NASDAQ",
"US_NYSE",
"US_AMEX",
"JP",
"HK",
"CN_SHA",
"CN_SZA",
"VN_HAN",
"VN_HCM",
}
assert set(MARKETS.keys()) == expected_markets
class TestIsMarketOpen:
"""Test is_market_open function."""
def test_kr_market_open_weekday(self) -> None:
"""KR market should be open during trading hours on weekday."""
# Monday 2026-02-02 10:00 KST
test_time = datetime(2026, 2, 2, 10, 0, tzinfo=ZoneInfo("Asia/Seoul"))
assert is_market_open(MARKETS["KR"], test_time)
def test_kr_market_closed_before_open(self) -> None:
"""KR market should be closed before 9:00."""
# Monday 2026-02-02 08:30 KST
test_time = datetime(2026, 2, 2, 8, 30, tzinfo=ZoneInfo("Asia/Seoul"))
assert not is_market_open(MARKETS["KR"], test_time)
def test_kr_market_closed_after_close(self) -> None:
"""KR market should be closed after 15:30."""
# Monday 2026-02-02 15:30 KST (exact close time)
test_time = datetime(2026, 2, 2, 15, 30, tzinfo=ZoneInfo("Asia/Seoul"))
assert not is_market_open(MARKETS["KR"], test_time)
def test_kr_market_closed_weekend(self) -> None:
"""KR market should be closed on weekends."""
# Saturday 2026-02-07 10:00 KST
test_time = datetime(2026, 2, 7, 10, 0, tzinfo=ZoneInfo("Asia/Seoul"))
assert not is_market_open(MARKETS["KR"], test_time)
# Sunday 2026-02-08 10:00 KST
test_time = datetime(2026, 2, 8, 10, 0, tzinfo=ZoneInfo("Asia/Seoul"))
assert not is_market_open(MARKETS["KR"], test_time)
def test_us_nasdaq_open_with_dst(self) -> None:
"""US markets should respect DST."""
# Monday 2026-06-01 10:00 EDT (DST in effect)
test_time = datetime(2026, 6, 1, 10, 0, tzinfo=ZoneInfo("America/New_York"))
assert is_market_open(MARKETS["US_NASDAQ"], test_time)
# Monday 2026-12-07 10:00 EST (no DST)
test_time = datetime(2026, 12, 7, 10, 0, tzinfo=ZoneInfo("America/New_York"))
assert is_market_open(MARKETS["US_NASDAQ"], test_time)
def test_jp_market_lunch_break(self) -> None:
"""JP market should be closed during lunch break."""
# Monday 2026-02-02 12:00 JST (lunch break)
test_time = datetime(2026, 2, 2, 12, 0, tzinfo=ZoneInfo("Asia/Tokyo"))
assert not is_market_open(MARKETS["JP"], test_time)
# Before lunch
test_time = datetime(2026, 2, 2, 11, 0, tzinfo=ZoneInfo("Asia/Tokyo"))
assert is_market_open(MARKETS["JP"], test_time)
# After lunch
test_time = datetime(2026, 2, 2, 13, 0, tzinfo=ZoneInfo("Asia/Tokyo"))
assert is_market_open(MARKETS["JP"], test_time)
def test_hk_market_lunch_break(self) -> None:
"""HK market should be closed during lunch break."""
# Monday 2026-02-02 12:30 HKT (lunch break)
test_time = datetime(2026, 2, 2, 12, 30, tzinfo=ZoneInfo("Asia/Hong_Kong"))
assert not is_market_open(MARKETS["HK"], test_time)
def test_timezone_conversion(self) -> None:
"""Should correctly convert timezones."""
# 2026-02-02 10:00 KST = 2026-02-02 01:00 UTC
test_time = datetime(2026, 2, 2, 1, 0, tzinfo=ZoneInfo("UTC"))
assert is_market_open(MARKETS["KR"], test_time)
class TestGetOpenMarkets:
"""Test get_open_markets function."""
def test_get_open_markets_all_closed(self) -> None:
"""Should return empty list when all markets closed."""
# Sunday 2026-02-08 12:00 UTC (all markets closed)
test_time = datetime(2026, 2, 8, 12, 0, tzinfo=ZoneInfo("UTC"))
assert get_open_markets(now=test_time) == []
def test_get_open_markets_kr_only(self) -> None:
"""Should return only KR when filtering enabled markets."""
# Monday 2026-02-02 10:00 KST = 01:00 UTC
test_time = datetime(2026, 2, 2, 1, 0, tzinfo=ZoneInfo("UTC"))
open_markets = get_open_markets(enabled_markets=["KR"], now=test_time)
assert len(open_markets) == 1
assert open_markets[0].code == "KR"
def test_get_open_markets_multiple(self) -> None:
"""Should return multiple markets when open."""
# Monday 2026-02-02 14:30 EST = 19:30 UTC
# US markets: 9:30-16:00 EST → 14:30-21:00 UTC (open)
test_time = datetime(2026, 2, 2, 19, 30, tzinfo=ZoneInfo("UTC"))
open_markets = get_open_markets(
enabled_markets=["US_NASDAQ", "US_NYSE", "US_AMEX"], now=test_time
)
assert len(open_markets) == 3
codes = {m.code for m in open_markets}
assert codes == {"US_NASDAQ", "US_NYSE", "US_AMEX"}
def test_get_open_markets_sorted(self) -> None:
"""Should return markets sorted by code."""
# Monday 2026-02-02 14:30 EST
test_time = datetime(2026, 2, 2, 19, 30, tzinfo=ZoneInfo("UTC"))
open_markets = get_open_markets(
enabled_markets=["US_NYSE", "US_AMEX", "US_NASDAQ"], now=test_time
)
codes = [m.code for m in open_markets]
assert codes == sorted(codes)
class TestGetNextMarketOpen:
"""Test get_next_market_open function."""
def test_get_next_market_open_weekend(self) -> None:
"""Should find next Monday opening when called on weekend."""
# Saturday 2026-02-07 12:00 UTC
test_time = datetime(2026, 2, 7, 12, 0, tzinfo=ZoneInfo("UTC"))
market, open_time = get_next_market_open(
enabled_markets=["KR"], now=test_time
)
assert market.code == "KR"
# Monday 2026-02-09 09:00 KST
expected = datetime(2026, 2, 9, 9, 0, tzinfo=ZoneInfo("Asia/Seoul"))
assert open_time == expected.astimezone(ZoneInfo("UTC"))
def test_get_next_market_open_after_close(self) -> None:
"""Should find next day opening when called after market close."""
# Monday 2026-02-02 16:00 KST (after close)
test_time = datetime(2026, 2, 2, 16, 0, tzinfo=ZoneInfo("Asia/Seoul"))
market, open_time = get_next_market_open(
enabled_markets=["KR"], now=test_time
)
assert market.code == "KR"
# Tuesday 2026-02-03 09:00 KST
expected = datetime(2026, 2, 3, 9, 0, tzinfo=ZoneInfo("Asia/Seoul"))
assert open_time == expected.astimezone(ZoneInfo("UTC"))
def test_get_next_market_open_multiple_markets(self) -> None:
"""Should find earliest opening market among multiple."""
# Saturday 2026-02-07 12:00 UTC
test_time = datetime(2026, 2, 7, 12, 0, tzinfo=ZoneInfo("UTC"))
market, open_time = get_next_market_open(
enabled_markets=["KR", "US_NASDAQ"], now=test_time
)
# Monday 2026-02-09: KR opens at 09:00 KST = 00:00 UTC
# Monday 2026-02-09: US opens at 09:30 EST = 14:30 UTC
# KR opens first
assert market.code == "KR"
def test_get_next_market_open_no_markets(self) -> None:
"""Should raise ValueError when no markets enabled."""
test_time = datetime(2026, 2, 7, 12, 0, tzinfo=ZoneInfo("UTC"))
with pytest.raises(ValueError, match="No enabled markets"):
get_next_market_open(enabled_markets=[], now=test_time)
def test_get_next_market_open_invalid_market(self) -> None:
"""Should skip invalid market codes."""
test_time = datetime(2026, 2, 7, 12, 0, tzinfo=ZoneInfo("UTC"))
market, _ = get_next_market_open(
enabled_markets=["INVALID", "KR"], now=test_time
)
assert market.code == "KR"

View File

@@ -10,7 +10,6 @@ from src.core.risk_manager import (
RiskManager,
)
# ---------------------------------------------------------------------------
# Circuit Breaker Tests
# ---------------------------------------------------------------------------