feat: implement Latency Control with criticality-based prioritization (Pillar 1) #27

Merged
jihoson merged 1 commits from feature/issue-21-latency-control into main 2026-02-04 17:02:41 +09:00
Collaborator

Summary

Implements Pillar 1: 속도와 시의성의 최적화

Urgency-based response system that reacts faster in critical market situations, with prioritized task execution based on real-time market conditions.

  • 4-level criticality system (CRITICAL/HIGH/NORMAL/LOW)
  • Priority-based task queue with timeout enforcement
  • Integration with Volatility Hunter for real-time assessment
  • Fast-path execution for critical situations
  • Comprehensive latency monitoring
  • 30 tests, 98% coverage

Components

1. CriticalityAssessor (src/core/criticality.py)

Evaluates market conditions to determine response urgency:

Criticality Levels & Timeouts:

  • CRITICAL (<5s): Emergency conditions
  • HIGH (<30s): Elevated volatility
  • NORMAL (<60s): Standard trading
  • LOW (batch): Quiet markets

Auto-elevation to CRITICAL when:

  • P&L < -2.5% (near circuit breaker at -3.0%)
  • Stock moves >5% in 1 minute
  • Volume surge >10x average

Key Methods:

  • assess_market_conditions() - Evaluates criticality
  • get_timeout_for_level() - Returns timeout per level
  • Fully configurable thresholds

Coverage: 100%

2. PriorityTaskQueue (src/core/priority_queue.py)

Thread-safe priority queue with timeout enforcement:

Features:

  • Priority-based ordering (CRITICAL tasks bypass queue)
  • FIFO within same priority level
  • Timeout enforcement per criticality
  • Comprehensive metrics (enqueued, dequeued, timeouts, errors)
  • Wait time monitoring (average & P95)
  • Graceful degradation when full
  • Asyncio lock for concurrent operations

Key Methods:

  • enqueue() - Add task with priority
  • dequeue() - Get highest priority task
  • get_metrics() - Queue statistics
  • calculate_wait_times() - Latency analysis

Coverage: 96%

3. Integration with main.py

Criticality Assessment (start of trading_cycle):

  • Fetches volatility from Context Tree L7_REALTIME
  • Uses VolatilityAnalyzer momentum scores
  • Evaluates P&L, price changes, volume surges

Latency Monitoring (end of trading_cycle):

  • Measures cycle execution time
  • Compares against timeout threshold
  • Logs warnings on SLA violations

Queue Metrics: Periodic reporting of statistics

4. Volatility Hunter Integration

  • Uses calculate_momentum() for assessment
  • Pulls data from Context Tree L7_REALTIME
  • Auto-detects market conditions in real-time

Tests (tests/test_latency_control.py)

30 comprehensive tests covering:

CriticalityAssessor (11 tests):

  • Market closed/quiet → LOW
  • P&L threshold → CRITICAL
  • Price change threshold → CRITICAL
  • Volume surge → CRITICAL
  • High volatility → HIGH
  • Normal conditions → NORMAL
  • Custom configurations
  • Timeout mappings

PriorityTaskQueue (15 tests):

  • Enqueue/dequeue operations
  • Priority ordering
  • FIFO within priority
  • Queue full handling
  • Timeout enforcement
  • Error propagation
  • Metrics tracking
  • Concurrent operations
  • Wait time calculations

Integration (4 tests):

  • Critical task bypass
  • Timeout enforcement
  • Fast-path execution
  • Graceful degradation

Coverage: 98% (criticality: 100%, queue: 96%)
All 157 tests passing

Performance Specifications

CRITICAL timeout: <5s
HIGH timeout: <30s
NORMAL timeout: <60s
LOW: no timeout (batch)
Queue is thread-safe
Graceful degradation
P&L threshold: -2.5%
Price change: 5% in 1min
Volume surge: 10x

Integration Points

Uses existing infrastructure:

  • VolatilityAnalyzer for momentum scores
  • Context Tree L7_REALTIME for volatility data
  • RiskManager for P&L calculation
  • main.py trading_cycle for latency tracking

Usage Example

# Initialize
assessor = CriticalityAssessor(settings)
queue = PriorityTaskQueue()

# Assess criticality
level = assessor.assess_market_conditions(
    pnl_pct=-2.8,  # Near circuit breaker
    volatility_score=75,
    volume_surge=12.0
)  # Returns CRITICAL

# Queue task with priority
await queue.enqueue(level, task_data)

# Dequeue highest priority
priority, task = await queue.dequeue(timeout=5.0)

# Get metrics
metrics = queue.get_metrics()
wait_times = queue.calculate_wait_times()

Closes #21
Part of The 4 Pillars: Pillar 1 (Latency Control)
Depends on: #20 (Volatility Hunter)
Synergy with: #15 (Context Tree), #19 (Evolution)

## Summary Implements Pillar 1: 속도와 시의성의 최적화 Urgency-based response system that reacts faster in critical market situations, with prioritized task execution based on real-time market conditions. - ✅ 4-level criticality system (CRITICAL/HIGH/NORMAL/LOW) - ✅ Priority-based task queue with timeout enforcement - ✅ Integration with Volatility Hunter for real-time assessment - ✅ Fast-path execution for critical situations - ✅ Comprehensive latency monitoring - ✅ 30 tests, 98% coverage ## Components ### 1. CriticalityAssessor (src/core/criticality.py) Evaluates market conditions to determine response urgency: **Criticality Levels & Timeouts**: - CRITICAL (<5s): Emergency conditions - HIGH (<30s): Elevated volatility - NORMAL (<60s): Standard trading - LOW (batch): Quiet markets **Auto-elevation to CRITICAL when**: - P&L < -2.5% (near circuit breaker at -3.0%) - Stock moves >5% in 1 minute - Volume surge >10x average **Key Methods**: - assess_market_conditions() - Evaluates criticality - get_timeout_for_level() - Returns timeout per level - Fully configurable thresholds **Coverage**: 100% ### 2. PriorityTaskQueue (src/core/priority_queue.py) Thread-safe priority queue with timeout enforcement: **Features**: - Priority-based ordering (CRITICAL tasks bypass queue) - FIFO within same priority level - Timeout enforcement per criticality - Comprehensive metrics (enqueued, dequeued, timeouts, errors) - Wait time monitoring (average & P95) - Graceful degradation when full - Asyncio lock for concurrent operations **Key Methods**: - enqueue() - Add task with priority - dequeue() - Get highest priority task - get_metrics() - Queue statistics - calculate_wait_times() - Latency analysis **Coverage**: 96% ### 3. Integration with main.py **Criticality Assessment** (start of trading_cycle): - Fetches volatility from Context Tree L7_REALTIME - Uses VolatilityAnalyzer momentum scores - Evaluates P&L, price changes, volume surges **Latency Monitoring** (end of trading_cycle): - Measures cycle execution time - Compares against timeout threshold - Logs warnings on SLA violations **Queue Metrics**: Periodic reporting of statistics ### 4. Volatility Hunter Integration - Uses calculate_momentum() for assessment - Pulls data from Context Tree L7_REALTIME - Auto-detects market conditions in real-time ## Tests (tests/test_latency_control.py) 30 comprehensive tests covering: **CriticalityAssessor (11 tests)**: - Market closed/quiet → LOW - P&L threshold → CRITICAL - Price change threshold → CRITICAL - Volume surge → CRITICAL - High volatility → HIGH - Normal conditions → NORMAL - Custom configurations - Timeout mappings **PriorityTaskQueue (15 tests)**: - Enqueue/dequeue operations - Priority ordering - FIFO within priority - Queue full handling - Timeout enforcement - Error propagation - Metrics tracking - Concurrent operations - Wait time calculations **Integration (4 tests)**: - Critical task bypass - Timeout enforcement - Fast-path execution - Graceful degradation **Coverage**: 98% (criticality: 100%, queue: 96%) All 157 tests passing ## Performance Specifications ✅ CRITICAL timeout: <5s ✅ HIGH timeout: <30s ✅ NORMAL timeout: <60s ✅ LOW: no timeout (batch) ✅ Queue is thread-safe ✅ Graceful degradation ✅ P&L threshold: -2.5% ✅ Price change: 5% in 1min ✅ Volume surge: 10x ## Integration Points Uses existing infrastructure: - VolatilityAnalyzer for momentum scores - Context Tree L7_REALTIME for volatility data - RiskManager for P&L calculation - main.py trading_cycle for latency tracking ## Usage Example ```python # Initialize assessor = CriticalityAssessor(settings) queue = PriorityTaskQueue() # Assess criticality level = assessor.assess_market_conditions( pnl_pct=-2.8, # Near circuit breaker volatility_score=75, volume_surge=12.0 ) # Returns CRITICAL # Queue task with priority await queue.enqueue(level, task_data) # Dequeue highest priority priority, task = await queue.dequeue(timeout=5.0) # Get metrics metrics = queue.get_metrics() wait_times = queue.calculate_wait_times() ``` ## Related Closes #21 Part of The 4 Pillars: Pillar 1 (Latency Control) Depends on: #20 (Volatility Hunter) Synergy with: #15 (Context Tree), #19 (Evolution)
agentson added 1 commit 2026-02-04 16:48:21 +09:00
feat: implement latency control system with criticality-based prioritization
Some checks failed
CI / test (pull_request) Has been cancelled
ce952d97b2
Add urgency-based response system to react faster in critical market situations.

Components:
- CriticalityAssessor: Evaluates market conditions (P&L, volatility, volume surge)
  and assigns urgency levels (CRITICAL <5s, HIGH <30s, NORMAL <60s, LOW batch)
- PriorityTaskQueue: Thread-safe priority queue with timeout enforcement,
  metrics tracking, and graceful degradation when full
- Integration with main.py: Assess criticality at trading cycle start,
  monitor latency per criticality level, log queue metrics

Auto-elevate to CRITICAL when:
- P&L < -2.5% (near circuit breaker at -3.0%)
- Stock moves >5% in 1 minute
- Volume surge >10x average

Integration with Volatility Hunter:
- Uses VolatilityAnalyzer.calculate_momentum() for assessment
- Pulls volatility scores from Context Tree L7_REALTIME
- Auto-detects market conditions for criticality

Tests:
- 30 comprehensive tests covering criticality assessment, priority queue,
  timeout enforcement, metrics tracking, and integration scenarios
- Coverage: criticality.py 100%, priority_queue.py 96%
- All 157 tests pass

Resolves issue #21 - Pillar 1: 속도와 시의성의 최적화

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
jihoson merged commit f40f19e735 into main 2026-02-04 17:02:41 +09:00
Sign in to join this conversation.
No Reviewers
1 Participants
Notifications
Due Date
No due date set.
Dependencies

No dependencies set.

Reference: jihoson/The-Ouroboros#27