feat: 컨텍스트 집계 스케줄러 (issue #87) #119

Merged
jihoson merged 1 commits from feature/issue-87-context-scheduler into main 2026-02-10 04:28:42 +09:00
3 changed files with 241 additions and 1 deletions

View File

@@ -5,6 +5,7 @@ The context tree implements Pillar 2: hierarchical memory management across
"""
from src.context.layer import ContextLayer
from src.context.scheduler import ContextScheduler
from src.context.store import ContextStore
__all__ = ["ContextLayer", "ContextStore"]
__all__ = ["ContextLayer", "ContextScheduler", "ContextStore"]

135
src/context/scheduler.py Normal file
View File

@@ -0,0 +1,135 @@
"""Context aggregation scheduler for periodic rollups and cleanup."""
from __future__ import annotations
import sqlite3
from calendar import monthrange
from dataclasses import dataclass
from datetime import UTC, datetime
from src.context.aggregator import ContextAggregator
from src.context.store import ContextStore
@dataclass(frozen=True)
class ScheduleResult:
"""Represents which scheduled tasks ran."""
weekly: bool = False
monthly: bool = False
quarterly: bool = False
annual: bool = False
legacy: bool = False
cleanup: bool = False
class ContextScheduler:
"""Run periodic context aggregations and cleanup when due."""
def __init__(
self,
conn: sqlite3.Connection | None = None,
aggregator: ContextAggregator | None = None,
store: ContextStore | None = None,
) -> None:
if aggregator is None:
if conn is None:
raise ValueError("conn is required when aggregator is not provided")
aggregator = ContextAggregator(conn)
self.aggregator = aggregator
if store is None:
store = getattr(aggregator, "store", None)
if store is None:
if conn is None:
raise ValueError("conn is required when store is not provided")
store = ContextStore(conn)
self.store = store
self._last_run: dict[str, str] = {}
def run_if_due(self, now: datetime | None = None) -> ScheduleResult:
"""Run scheduled aggregations if their schedule is due.
Args:
now: Current datetime (UTC). If None, uses current time.
Returns:
ScheduleResult indicating which tasks ran.
"""
if now is None:
now = datetime.now(UTC)
today = now.date().isoformat()
result = ScheduleResult()
if self._should_run("cleanup", today):
self.store.cleanup_expired_contexts()
result = self._with(result, cleanup=True)
if self._is_sunday(now) and self._should_run("weekly", today):
week = now.strftime("%Y-W%V")
self.aggregator.aggregate_weekly_from_daily(week)
result = self._with(result, weekly=True)
if self._is_last_day_of_month(now) and self._should_run("monthly", today):
month = now.strftime("%Y-%m")
self.aggregator.aggregate_monthly_from_weekly(month)
result = self._with(result, monthly=True)
if self._is_last_day_of_quarter(now) and self._should_run("quarterly", today):
quarter = self._current_quarter(now)
self.aggregator.aggregate_quarterly_from_monthly(quarter)
result = self._with(result, quarterly=True)
if self._is_last_day_of_year(now) and self._should_run("annual", today):
year = str(now.year)
self.aggregator.aggregate_annual_from_quarterly(year)
result = self._with(result, annual=True)
# Legacy rollup runs after annual aggregation.
self.aggregator.aggregate_legacy_from_annual()
result = self._with(result, legacy=True)
return result
def _should_run(self, key: str, date_str: str) -> bool:
if self._last_run.get(key) == date_str:
return False
self._last_run[key] = date_str
return True
@staticmethod
def _is_sunday(now: datetime) -> bool:
return now.weekday() == 6
@staticmethod
def _is_last_day_of_month(now: datetime) -> bool:
last_day = monthrange(now.year, now.month)[1]
return now.day == last_day
@classmethod
def _is_last_day_of_quarter(cls, now: datetime) -> bool:
if now.month not in (3, 6, 9, 12):
return False
return cls._is_last_day_of_month(now)
@staticmethod
def _is_last_day_of_year(now: datetime) -> bool:
return now.month == 12 and now.day == 31
@staticmethod
def _current_quarter(now: datetime) -> str:
quarter = (now.month - 1) // 3 + 1
return f"{now.year}-Q{quarter}"
@staticmethod
def _with(result: ScheduleResult, **kwargs: bool) -> ScheduleResult:
return ScheduleResult(
weekly=kwargs.get("weekly", result.weekly),
monthly=kwargs.get("monthly", result.monthly),
quarterly=kwargs.get("quarterly", result.quarterly),
annual=kwargs.get("annual", result.annual),
legacy=kwargs.get("legacy", result.legacy),
cleanup=kwargs.get("cleanup", result.cleanup),
)

View File

@@ -0,0 +1,104 @@
"""Tests for ContextScheduler."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime
from src.context.scheduler import ContextScheduler
@dataclass
class StubAggregator:
"""Stub aggregator that records calls."""
weekly_calls: list[str]
monthly_calls: list[str]
quarterly_calls: list[str]
annual_calls: list[str]
legacy_calls: int
def aggregate_weekly_from_daily(self, week: str) -> None:
self.weekly_calls.append(week)
def aggregate_monthly_from_weekly(self, month: str) -> None:
self.monthly_calls.append(month)
def aggregate_quarterly_from_monthly(self, quarter: str) -> None:
self.quarterly_calls.append(quarter)
def aggregate_annual_from_quarterly(self, year: str) -> None:
self.annual_calls.append(year)
def aggregate_legacy_from_annual(self) -> None:
self.legacy_calls += 1
@dataclass
class StubStore:
"""Stub store that records cleanup calls."""
cleanup_calls: int = 0
def cleanup_expired_contexts(self) -> None:
self.cleanup_calls += 1
def make_scheduler() -> tuple[ContextScheduler, StubAggregator, StubStore]:
aggregator = StubAggregator([], [], [], [], 0)
store = StubStore()
scheduler = ContextScheduler(aggregator=aggregator, store=store)
return scheduler, aggregator, store
def test_run_if_due_weekly() -> None:
scheduler, aggregator, store = make_scheduler()
now = datetime(2026, 2, 8, 10, 0, tzinfo=UTC) # Sunday
result = scheduler.run_if_due(now)
assert result.weekly is True
assert aggregator.weekly_calls == ["2026-W06"]
assert store.cleanup_calls == 1
def test_run_if_due_monthly() -> None:
scheduler, aggregator, _store = make_scheduler()
now = datetime(2026, 2, 28, 12, 0, tzinfo=UTC) # Last day of month
result = scheduler.run_if_due(now)
assert result.monthly is True
assert aggregator.monthly_calls == ["2026-02"]
def test_run_if_due_quarterly() -> None:
scheduler, aggregator, _store = make_scheduler()
now = datetime(2026, 3, 31, 12, 0, tzinfo=UTC) # Last day of Q1
result = scheduler.run_if_due(now)
assert result.quarterly is True
assert aggregator.quarterly_calls == ["2026-Q1"]
def test_run_if_due_annual_and_legacy() -> None:
scheduler, aggregator, _store = make_scheduler()
now = datetime(2026, 12, 31, 12, 0, tzinfo=UTC)
result = scheduler.run_if_due(now)
assert result.annual is True
assert result.legacy is True
assert aggregator.annual_calls == ["2026"]
assert aggregator.legacy_calls == 1
def test_cleanup_runs_once_per_day() -> None:
scheduler, _aggregator, store = make_scheduler()
now = datetime(2026, 2, 9, 9, 0, tzinfo=UTC)
scheduler.run_if_due(now)
scheduler.run_if_due(now)
assert store.cleanup_calls == 1