From e3b1ecc5722bb22fe2c416c2fb8ea90e11abda0c Mon Sep 17 00:00:00 2001 From: agentson Date: Tue, 10 Feb 2026 04:26:51 +0900 Subject: [PATCH] feat: context aggregation scheduler (issue #87) - Add ContextScheduler with run_if_due() for periodic rollups - Weekly (Sunday), monthly (last day), quarterly, annual, legacy schedules - Daily cleanup of expired contexts via ContextStore - Dedup guard: each task runs at most once per day Co-Authored-By: Claude Opus 4.6 --- src/context/__init__.py | 3 +- src/context/scheduler.py | 135 ++++++++++++++++++++++++++++++++ tests/test_context_scheduler.py | 104 ++++++++++++++++++++++++ 3 files changed, 241 insertions(+), 1 deletion(-) create mode 100644 src/context/scheduler.py create mode 100644 tests/test_context_scheduler.py diff --git a/src/context/__init__.py b/src/context/__init__.py index 1301ce9..c620635 100644 --- a/src/context/__init__.py +++ b/src/context/__init__.py @@ -5,6 +5,7 @@ The context tree implements Pillar 2: hierarchical memory management across """ from src.context.layer import ContextLayer +from src.context.scheduler import ContextScheduler from src.context.store import ContextStore -__all__ = ["ContextLayer", "ContextStore"] +__all__ = ["ContextLayer", "ContextScheduler", "ContextStore"] diff --git a/src/context/scheduler.py b/src/context/scheduler.py new file mode 100644 index 0000000..4d153da --- /dev/null +++ b/src/context/scheduler.py @@ -0,0 +1,135 @@ +"""Context aggregation scheduler for periodic rollups and cleanup.""" + +from __future__ import annotations + +import sqlite3 +from calendar import monthrange +from dataclasses import dataclass +from datetime import UTC, datetime + +from src.context.aggregator import ContextAggregator +from src.context.store import ContextStore + + +@dataclass(frozen=True) +class ScheduleResult: + """Represents which scheduled tasks ran.""" + + weekly: bool = False + monthly: bool = False + quarterly: bool = False + annual: bool = False + legacy: bool = False + cleanup: bool = False + + +class ContextScheduler: + """Run periodic context aggregations and cleanup when due.""" + + def __init__( + self, + conn: sqlite3.Connection | None = None, + aggregator: ContextAggregator | None = None, + store: ContextStore | None = None, + ) -> None: + if aggregator is None: + if conn is None: + raise ValueError("conn is required when aggregator is not provided") + aggregator = ContextAggregator(conn) + self.aggregator = aggregator + + if store is None: + store = getattr(aggregator, "store", None) + if store is None: + if conn is None: + raise ValueError("conn is required when store is not provided") + store = ContextStore(conn) + self.store = store + + self._last_run: dict[str, str] = {} + + def run_if_due(self, now: datetime | None = None) -> ScheduleResult: + """Run scheduled aggregations if their schedule is due. + + Args: + now: Current datetime (UTC). If None, uses current time. + + Returns: + ScheduleResult indicating which tasks ran. + """ + if now is None: + now = datetime.now(UTC) + + today = now.date().isoformat() + result = ScheduleResult() + + if self._should_run("cleanup", today): + self.store.cleanup_expired_contexts() + result = self._with(result, cleanup=True) + + if self._is_sunday(now) and self._should_run("weekly", today): + week = now.strftime("%Y-W%V") + self.aggregator.aggregate_weekly_from_daily(week) + result = self._with(result, weekly=True) + + if self._is_last_day_of_month(now) and self._should_run("monthly", today): + month = now.strftime("%Y-%m") + self.aggregator.aggregate_monthly_from_weekly(month) + result = self._with(result, monthly=True) + + if self._is_last_day_of_quarter(now) and self._should_run("quarterly", today): + quarter = self._current_quarter(now) + self.aggregator.aggregate_quarterly_from_monthly(quarter) + result = self._with(result, quarterly=True) + + if self._is_last_day_of_year(now) and self._should_run("annual", today): + year = str(now.year) + self.aggregator.aggregate_annual_from_quarterly(year) + result = self._with(result, annual=True) + + # Legacy rollup runs after annual aggregation. + self.aggregator.aggregate_legacy_from_annual() + result = self._with(result, legacy=True) + + return result + + def _should_run(self, key: str, date_str: str) -> bool: + if self._last_run.get(key) == date_str: + return False + self._last_run[key] = date_str + return True + + @staticmethod + def _is_sunday(now: datetime) -> bool: + return now.weekday() == 6 + + @staticmethod + def _is_last_day_of_month(now: datetime) -> bool: + last_day = monthrange(now.year, now.month)[1] + return now.day == last_day + + @classmethod + def _is_last_day_of_quarter(cls, now: datetime) -> bool: + if now.month not in (3, 6, 9, 12): + return False + return cls._is_last_day_of_month(now) + + @staticmethod + def _is_last_day_of_year(now: datetime) -> bool: + return now.month == 12 and now.day == 31 + + @staticmethod + def _current_quarter(now: datetime) -> str: + quarter = (now.month - 1) // 3 + 1 + return f"{now.year}-Q{quarter}" + + @staticmethod + def _with(result: ScheduleResult, **kwargs: bool) -> ScheduleResult: + return ScheduleResult( + weekly=kwargs.get("weekly", result.weekly), + monthly=kwargs.get("monthly", result.monthly), + quarterly=kwargs.get("quarterly", result.quarterly), + annual=kwargs.get("annual", result.annual), + legacy=kwargs.get("legacy", result.legacy), + cleanup=kwargs.get("cleanup", result.cleanup), + ) diff --git a/tests/test_context_scheduler.py b/tests/test_context_scheduler.py new file mode 100644 index 0000000..189a033 --- /dev/null +++ b/tests/test_context_scheduler.py @@ -0,0 +1,104 @@ +"""Tests for ContextScheduler.""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import UTC, datetime + +from src.context.scheduler import ContextScheduler + + +@dataclass +class StubAggregator: + """Stub aggregator that records calls.""" + + weekly_calls: list[str] + monthly_calls: list[str] + quarterly_calls: list[str] + annual_calls: list[str] + legacy_calls: int + + def aggregate_weekly_from_daily(self, week: str) -> None: + self.weekly_calls.append(week) + + def aggregate_monthly_from_weekly(self, month: str) -> None: + self.monthly_calls.append(month) + + def aggregate_quarterly_from_monthly(self, quarter: str) -> None: + self.quarterly_calls.append(quarter) + + def aggregate_annual_from_quarterly(self, year: str) -> None: + self.annual_calls.append(year) + + def aggregate_legacy_from_annual(self) -> None: + self.legacy_calls += 1 + + +@dataclass +class StubStore: + """Stub store that records cleanup calls.""" + + cleanup_calls: int = 0 + + def cleanup_expired_contexts(self) -> None: + self.cleanup_calls += 1 + + +def make_scheduler() -> tuple[ContextScheduler, StubAggregator, StubStore]: + aggregator = StubAggregator([], [], [], [], 0) + store = StubStore() + scheduler = ContextScheduler(aggregator=aggregator, store=store) + return scheduler, aggregator, store + + +def test_run_if_due_weekly() -> None: + scheduler, aggregator, store = make_scheduler() + now = datetime(2026, 2, 8, 10, 0, tzinfo=UTC) # Sunday + + result = scheduler.run_if_due(now) + + assert result.weekly is True + assert aggregator.weekly_calls == ["2026-W06"] + assert store.cleanup_calls == 1 + + +def test_run_if_due_monthly() -> None: + scheduler, aggregator, _store = make_scheduler() + now = datetime(2026, 2, 28, 12, 0, tzinfo=UTC) # Last day of month + + result = scheduler.run_if_due(now) + + assert result.monthly is True + assert aggregator.monthly_calls == ["2026-02"] + + +def test_run_if_due_quarterly() -> None: + scheduler, aggregator, _store = make_scheduler() + now = datetime(2026, 3, 31, 12, 0, tzinfo=UTC) # Last day of Q1 + + result = scheduler.run_if_due(now) + + assert result.quarterly is True + assert aggregator.quarterly_calls == ["2026-Q1"] + + +def test_run_if_due_annual_and_legacy() -> None: + scheduler, aggregator, _store = make_scheduler() + now = datetime(2026, 12, 31, 12, 0, tzinfo=UTC) + + result = scheduler.run_if_due(now) + + assert result.annual is True + assert result.legacy is True + assert aggregator.annual_calls == ["2026"] + assert aggregator.legacy_calls == 1 + + +def test_cleanup_runs_once_per_day() -> None: + scheduler, _aggregator, store = make_scheduler() + now = datetime(2026, 2, 9, 9, 0, tzinfo=UTC) + + scheduler.run_if_due(now) + scheduler.run_if_due(now) + + assert store.cleanup_calls == 1