feat: integrate ContextScheduler into main loop (issue #89)
Some checks failed
CI / test (pull_request) Has been cancelled

Wire up periodic context rollups (weekly/monthly/quarterly/annual/legacy)
in both daily and realtime trading loops with dedup-safe scheduling.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
agentson
2026-02-14 23:37:30 +09:00
parent ab7f0444b2
commit d9763def85
2 changed files with 49 additions and 2 deletions

View File

@@ -22,6 +22,7 @@ from src.broker.overseas import OverseasBroker
from src.config import Settings
from src.context.aggregator import ContextAggregator
from src.context.layer import ContextLayer
from src.context.scheduler import ContextScheduler
from src.context.store import ContextStore
from src.core.criticality import CriticalityAssessor
from src.core.priority_queue import PriorityTaskQueue
@@ -772,6 +773,35 @@ async def _handle_market_close(
)
def _run_context_scheduler(
scheduler: ContextScheduler, now: datetime | None = None,
) -> None:
"""Run periodic context scheduler tasks and log when anything executes."""
result = scheduler.run_if_due(now=now)
if any(
[
result.weekly,
result.monthly,
result.quarterly,
result.annual,
result.legacy,
result.cleanup,
]
):
logger.info(
(
"Context scheduler ran (weekly=%s, monthly=%s, quarterly=%s, "
"annual=%s, legacy=%s, cleanup=%s)"
),
result.weekly,
result.monthly,
result.quarterly,
result.annual,
result.legacy,
result.cleanup,
)
async def run(settings: Settings) -> None:
"""Main async loop — iterate over open markets on a timer."""
broker = KISBroker(settings)
@@ -782,6 +812,10 @@ async def run(settings: Settings) -> None:
decision_logger = DecisionLogger(db_conn)
context_store = ContextStore(db_conn)
context_aggregator = ContextAggregator(db_conn)
context_scheduler = ContextScheduler(
aggregator=context_aggregator,
store=context_store,
)
# V2 proactive strategy components
context_selector = ContextSelector(context_store)
@@ -1015,6 +1049,7 @@ async def run(settings: Settings) -> None:
while not shutdown.is_set():
# Wait for trading to be unpaused
await pause_trading.wait()
_run_context_scheduler(context_scheduler, now=datetime.now(UTC))
try:
await run_daily_session(
@@ -1053,6 +1088,7 @@ async def run(settings: Settings) -> None:
while not shutdown.is_set():
# Wait for trading to be unpaused
await pause_trading.wait()
_run_context_scheduler(context_scheduler, now=datetime.now(UTC))
# Get currently open markets
open_markets = get_open_markets(settings.enabled_market_list)