Throttle and schedule buffered output delivery #10
@@ -13,6 +13,8 @@ PTY_READ_TIMEOUT=5
|
|||||||
|
|
||||||
# 출력 버퍼 설정
|
# 출력 버퍼 설정
|
||||||
OUTPUT_BUFFER_INTERVAL=2.0
|
OUTPUT_BUFFER_INTERVAL=2.0
|
||||||
|
OUTPUT_SETTLE_SECONDS=4.0
|
||||||
|
OUTPUT_FLUSH_INTERVAL_SECONDS=15.0
|
||||||
MAX_MESSAGE_LENGTH=3000
|
MAX_MESSAGE_LENGTH=3000
|
||||||
|
|
||||||
# 상태 보고 / 재연결 설정
|
# 상태 보고 / 재연결 설정
|
||||||
|
|||||||
@@ -41,6 +41,8 @@ cp .env.example .env
|
|||||||
- `CODEX_TMUX_SESSION_NAME` (기본: `codex`, `/start-codex` 대상)
|
- `CODEX_TMUX_SESSION_NAME` (기본: `codex`, `/start-codex` 대상)
|
||||||
- `PTY_READ_TIMEOUT` (기본: `5`)
|
- `PTY_READ_TIMEOUT` (기본: `5`)
|
||||||
- `OUTPUT_BUFFER_INTERVAL` (기본: `2.0`)
|
- `OUTPUT_BUFFER_INTERVAL` (기본: `2.0`)
|
||||||
|
- `OUTPUT_SETTLE_SECONDS` (기본: `4.0`, 출력이 잠잠해진 뒤 전송 대기 시간)
|
||||||
|
- `OUTPUT_FLUSH_INTERVAL_SECONDS` (기본: `15.0`, 출력이 계속 이어질 때 강제 전송 주기)
|
||||||
- `MAX_MESSAGE_LENGTH` (기본: `3000`)
|
- `MAX_MESSAGE_LENGTH` (기본: `3000`)
|
||||||
- `RECONNECT_DELAY_SECONDS` (기본: `5.0`, Socket Mode 재연결 대기 시간)
|
- `RECONNECT_DELAY_SECONDS` (기본: `5.0`, Socket Mode 재연결 대기 시간)
|
||||||
- `OUTPUT_IDLE_REPORT_SECONDS` (기본: `120`, 출력 정지 보고 임계값)
|
- `OUTPUT_IDLE_REPORT_SECONDS` (기본: `120`, 출력 정지 보고 임계값)
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ class Bridge:
|
|||||||
self._last_sent_fingerprint: str | None = None
|
self._last_sent_fingerprint: str | None = None
|
||||||
self._last_input_at = time.monotonic()
|
self._last_input_at = time.monotonic()
|
||||||
self._last_output_at = time.monotonic()
|
self._last_output_at = time.monotonic()
|
||||||
|
self._output_buffer_started_at: float | None = None
|
||||||
self._input_idle_reported = False
|
self._input_idle_reported = False
|
||||||
self._output_idle_reported = False
|
self._output_idle_reported = False
|
||||||
|
|
||||||
@@ -118,6 +119,7 @@ class Bridge:
|
|||||||
self._running = True
|
self._running = True
|
||||||
self._last_input_at = time.monotonic()
|
self._last_input_at = time.monotonic()
|
||||||
self._last_output_at = time.monotonic()
|
self._last_output_at = time.monotonic()
|
||||||
|
self._output_buffer_started_at = None
|
||||||
self._input_idle_reported = False
|
self._input_idle_reported = False
|
||||||
self._output_idle_reported = False
|
self._output_idle_reported = False
|
||||||
self._output_thread = threading.Thread(target=self._poll_output, daemon=True)
|
self._output_thread = threading.Thread(target=self._poll_output, daemon=True)
|
||||||
@@ -137,6 +139,7 @@ class Bridge:
|
|||||||
self._active_target = None
|
self._active_target = None
|
||||||
self._last_sent_output = ""
|
self._last_sent_output = ""
|
||||||
self._last_sent_fingerprint = None
|
self._last_sent_fingerprint = None
|
||||||
|
self._output_buffer_started_at = None
|
||||||
self.slack.send_message(channel, ":electric_plug: 세션 연결이 해제되었습니다.")
|
self.slack.send_message(channel, ":electric_plug: 세션 연결이 해제되었습니다.")
|
||||||
|
|
||||||
def _poll_output(self) -> None:
|
def _poll_output(self) -> None:
|
||||||
@@ -144,17 +147,27 @@ class Bridge:
|
|||||||
buffer = ""
|
buffer = ""
|
||||||
while self._running and self.pty and self.pty.is_alive:
|
while self._running and self.pty and self.pty.is_alive:
|
||||||
now = time.monotonic()
|
now = time.monotonic()
|
||||||
output = self.pty.read_output(timeout=self.config.pty_read_timeout)
|
if buffer and self._should_flush_output_buffer(now):
|
||||||
|
self._send_output_chunks(buffer)
|
||||||
|
buffer = ""
|
||||||
|
self._output_buffer_started_at = None
|
||||||
|
|
||||||
|
read_timeout = self._next_read_timeout(now, has_buffer=bool(buffer))
|
||||||
|
output = self.pty.read_output(timeout=read_timeout)
|
||||||
|
now = time.monotonic()
|
||||||
if output:
|
if output:
|
||||||
cleaned = clean_terminal_output(output)
|
cleaned = clean_terminal_output(output)
|
||||||
if cleaned:
|
if cleaned:
|
||||||
buffer += f"{cleaned}\n"
|
buffer += f"{cleaned}\n"
|
||||||
self._last_output_at = now
|
self._last_output_at = now
|
||||||
|
if self._output_buffer_started_at is None:
|
||||||
|
self._output_buffer_started_at = now
|
||||||
self._output_idle_reported = False
|
self._output_idle_reported = False
|
||||||
|
|
||||||
if buffer:
|
if buffer and self._should_flush_output_buffer(now):
|
||||||
self._send_output_chunks(buffer)
|
self._send_output_chunks(buffer)
|
||||||
buffer = ""
|
buffer = ""
|
||||||
|
self._output_buffer_started_at = None
|
||||||
|
|
||||||
output_idle = now - self._last_output_at
|
output_idle = now - self._last_output_at
|
||||||
if (
|
if (
|
||||||
@@ -186,11 +199,51 @@ class Bridge:
|
|||||||
time.sleep(self.config.output_buffer_interval)
|
time.sleep(self.config.output_buffer_interval)
|
||||||
|
|
||||||
if not self._running:
|
if not self._running:
|
||||||
|
self._output_buffer_started_at = None
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if buffer:
|
||||||
|
self._send_output_chunks(buffer)
|
||||||
|
self._output_buffer_started_at = None
|
||||||
|
|
||||||
# attach 프로세스가 예기치 않게 종료된 경우
|
# attach 프로세스가 예기치 않게 종료된 경우
|
||||||
self.slack.send_message(self._channel, ":warning: 세션 연결이 종료되었습니다.")
|
self.slack.send_message(self._channel, ":warning: 세션 연결이 종료되었습니다.")
|
||||||
|
|
||||||
|
def _next_read_timeout(self, now: float, has_buffer: bool) -> float:
|
||||||
|
"""다음 PTY 읽기 타임아웃을 계산한다."""
|
||||||
|
base_timeout = max(0.0, float(self.config.pty_read_timeout))
|
||||||
|
if not has_buffer:
|
||||||
|
return base_timeout
|
||||||
|
|
||||||
|
deadline = self._next_output_flush_deadline()
|
||||||
|
if deadline is None:
|
||||||
|
return base_timeout
|
||||||
|
|
||||||
|
remaining = max(0.0, deadline - now)
|
||||||
|
return min(base_timeout, remaining)
|
||||||
|
|
||||||
|
def _next_output_flush_deadline(self) -> float | None:
|
||||||
|
"""버퍼 flush의 가장 이른 데드라인을 반환한다."""
|
||||||
|
deadlines: list[float] = []
|
||||||
|
settle_seconds = max(0.0, self.config.output_settle_seconds)
|
||||||
|
if settle_seconds > 0:
|
||||||
|
deadlines.append(self._last_output_at + settle_seconds)
|
||||||
|
|
||||||
|
flush_interval_seconds = max(0.0, self.config.output_flush_interval_seconds)
|
||||||
|
if self._output_buffer_started_at is not None:
|
||||||
|
if flush_interval_seconds == 0:
|
||||||
|
return self._output_buffer_started_at
|
||||||
|
deadlines.append(self._output_buffer_started_at + flush_interval_seconds)
|
||||||
|
|
||||||
|
if not deadlines:
|
||||||
|
return None
|
||||||
|
return min(deadlines)
|
||||||
|
|
||||||
|
def _should_flush_output_buffer(self, now: float) -> bool:
|
||||||
|
"""버퍼를 Slack으로 전송할 시점을 계산한다."""
|
||||||
|
deadline = self._next_output_flush_deadline()
|
||||||
|
return deadline is not None and now >= deadline
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _split_message(text: str, max_length: int) -> list[str]:
|
def _split_message(text: str, max_length: int) -> list[str]:
|
||||||
"""긴 텍스트를 메시지 길이 제한에 맞게 분할한다."""
|
"""긴 텍스트를 메시지 길이 제한에 맞게 분할한다."""
|
||||||
|
|||||||
@@ -25,6 +25,10 @@ class Config:
|
|||||||
|
|
||||||
# Buffer
|
# Buffer
|
||||||
output_buffer_interval: float = float(os.getenv("OUTPUT_BUFFER_INTERVAL", "2.0"))
|
output_buffer_interval: float = float(os.getenv("OUTPUT_BUFFER_INTERVAL", "2.0"))
|
||||||
|
output_settle_seconds: float = float(os.getenv("OUTPUT_SETTLE_SECONDS", "4.0"))
|
||||||
|
output_flush_interval_seconds: float = float(
|
||||||
|
os.getenv("OUTPUT_FLUSH_INTERVAL_SECONDS", "15.0")
|
||||||
|
)
|
||||||
max_message_length: int = int(os.getenv("MAX_MESSAGE_LENGTH", "3000"))
|
max_message_length: int = int(os.getenv("MAX_MESSAGE_LENGTH", "3000"))
|
||||||
|
|
||||||
# Status reporting / reconnect
|
# Status reporting / reconnect
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ class PtyManager:
|
|||||||
logger.debug("입력 전송: %s", text)
|
logger.debug("입력 전송: %s", text)
|
||||||
self._process.sendline(text)
|
self._process.sendline(text)
|
||||||
|
|
||||||
def read_output(self, timeout: int = 5) -> str:
|
def read_output(self, timeout: float = 5) -> str:
|
||||||
"""프로세스의 출력을 읽는다."""
|
"""프로세스의 출력을 읽는다."""
|
||||||
if not self.is_alive:
|
if not self.is_alive:
|
||||||
raise RuntimeError("프로세스가 실행 중이 아닙니다.")
|
raise RuntimeError("프로세스가 실행 중이 아닙니다.")
|
||||||
|
|||||||
@@ -61,7 +61,7 @@ class FakePtyManager:
|
|||||||
def send(self, text: str) -> None:
|
def send(self, text: str) -> None:
|
||||||
self.sent_inputs.append(text)
|
self.sent_inputs.append(text)
|
||||||
|
|
||||||
def read_output(self, timeout: int = 5) -> str:
|
def read_output(self, timeout: float = 5) -> str:
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
|
||||||
@@ -297,3 +297,105 @@ def test_send_output_chunks_keeps_non_tmux_status_like_lines(monkeypatch) -> Non
|
|||||||
("C1", "```\n[2,3] \"job-runner\" 05:12 17-Feb-26\n```"),
|
("C1", "```\n[2,3] \"job-runner\" 05:12 17-Feb-26\n```"),
|
||||||
("C1", "```\n[2,3] \"job-runner\" 05:13 17-Feb-26\n```"),
|
("C1", "```\n[2,3] \"job-runner\" 05:13 17-Feb-26\n```"),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_should_flush_output_buffer_when_settled(monkeypatch) -> None:
|
||||||
|
bridge = _make_bridge(monkeypatch)
|
||||||
|
bridge.config.output_settle_seconds = 4.0
|
||||||
|
bridge.config.output_flush_interval_seconds = 15.0
|
||||||
|
bridge._last_output_at = 10.0
|
||||||
|
bridge._output_buffer_started_at = 2.0
|
||||||
|
|
||||||
|
assert bridge._should_flush_output_buffer(14.1) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_should_flush_output_buffer_when_flush_interval_elapsed(monkeypatch) -> None:
|
||||||
|
bridge = _make_bridge(monkeypatch)
|
||||||
|
bridge.config.output_settle_seconds = 4.0
|
||||||
|
bridge.config.output_flush_interval_seconds = 15.0
|
||||||
|
bridge._last_output_at = 20.0
|
||||||
|
bridge._output_buffer_started_at = 2.0
|
||||||
|
|
||||||
|
assert bridge._should_flush_output_buffer(17.1) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_should_flush_output_buffer_false_during_active_stream(monkeypatch) -> None:
|
||||||
|
bridge = _make_bridge(monkeypatch)
|
||||||
|
bridge.config.output_settle_seconds = 4.0
|
||||||
|
bridge.config.output_flush_interval_seconds = 15.0
|
||||||
|
bridge._last_output_at = 19.0
|
||||||
|
bridge._output_buffer_started_at = 10.0
|
||||||
|
|
||||||
|
assert bridge._should_flush_output_buffer(20.0) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_poll_output_skips_final_flush_after_intentional_stop(monkeypatch) -> None:
|
||||||
|
bridge = _make_bridge(monkeypatch)
|
||||||
|
bridge._channel = "C1"
|
||||||
|
bridge.config.output_settle_seconds = 9999.0
|
||||||
|
bridge.config.output_flush_interval_seconds = 9999.0
|
||||||
|
bridge.config.output_buffer_interval = 0.0
|
||||||
|
|
||||||
|
pty = FakePtyManager("codex-room", cli_name="codex")
|
||||||
|
pty._alive = True
|
||||||
|
bridge.pty = pty
|
||||||
|
bridge._running = True
|
||||||
|
|
||||||
|
sent_buffers: list[str] = []
|
||||||
|
monkeypatch.setattr(bridge, "_send_output_chunks", sent_buffers.append)
|
||||||
|
|
||||||
|
def _read_output(timeout: float = 5) -> str:
|
||||||
|
bridge._running = False
|
||||||
|
return "planning update"
|
||||||
|
|
||||||
|
monkeypatch.setattr(pty, "read_output", _read_output)
|
||||||
|
bridge._poll_output()
|
||||||
|
|
||||||
|
assert sent_buffers == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_next_read_timeout_is_capped_by_flush_deadline(monkeypatch) -> None:
|
||||||
|
bridge = _make_bridge(monkeypatch)
|
||||||
|
bridge.config.pty_read_timeout = 5
|
||||||
|
bridge.config.output_settle_seconds = 4.0
|
||||||
|
bridge.config.output_flush_interval_seconds = 15.0
|
||||||
|
bridge._last_output_at = 100.0
|
||||||
|
bridge._output_buffer_started_at = 95.0
|
||||||
|
|
||||||
|
timeout = bridge._next_read_timeout(103.6, has_buffer=True)
|
||||||
|
assert 0.39 <= timeout <= 0.41
|
||||||
|
|
||||||
|
|
||||||
|
def test_poll_output_uses_shorter_timeout_near_settle_deadline(monkeypatch) -> None:
|
||||||
|
bridge = _make_bridge(monkeypatch)
|
||||||
|
bridge._channel = "C1"
|
||||||
|
bridge.config.pty_read_timeout = 5
|
||||||
|
bridge.config.output_settle_seconds = 4.0
|
||||||
|
bridge.config.output_flush_interval_seconds = 15.0
|
||||||
|
bridge.config.output_buffer_interval = 0.0
|
||||||
|
bridge.config.output_idle_report_seconds = 0
|
||||||
|
bridge.config.input_idle_report_seconds = 0
|
||||||
|
|
||||||
|
pty = FakePtyManager("codex-room", cli_name="codex")
|
||||||
|
pty._alive = True
|
||||||
|
bridge.pty = pty
|
||||||
|
bridge._running = True
|
||||||
|
|
||||||
|
observed_timeouts: list[float] = []
|
||||||
|
call_count = 0
|
||||||
|
|
||||||
|
def _read_output(timeout: float = 5) -> str:
|
||||||
|
nonlocal call_count
|
||||||
|
observed_timeouts.append(timeout)
|
||||||
|
if call_count == 0:
|
||||||
|
call_count += 1
|
||||||
|
return "first chunk"
|
||||||
|
bridge._running = False
|
||||||
|
return ""
|
||||||
|
|
||||||
|
monkeypatch.setattr(pty, "read_output", _read_output)
|
||||||
|
bridge._poll_output()
|
||||||
|
|
||||||
|
assert len(observed_timeouts) == 2
|
||||||
|
assert observed_timeouts[0] == 5
|
||||||
|
assert 0.0 <= observed_timeouts[1] < 5
|
||||||
|
|||||||
@@ -11,6 +11,8 @@ def test_config_defaults():
|
|||||||
assert config.codex_tmux_session_name == "codex"
|
assert config.codex_tmux_session_name == "codex"
|
||||||
assert config.pty_read_timeout == 5
|
assert config.pty_read_timeout == 5
|
||||||
assert config.output_buffer_interval == 2.0
|
assert config.output_buffer_interval == 2.0
|
||||||
|
assert config.output_settle_seconds == 4.0
|
||||||
|
assert config.output_flush_interval_seconds == 15.0
|
||||||
assert config.max_message_length == 3000
|
assert config.max_message_length == 3000
|
||||||
assert config.reconnect_delay_seconds == 5.0
|
assert config.reconnect_delay_seconds == 5.0
|
||||||
assert config.output_idle_report_seconds == 120
|
assert config.output_idle_report_seconds == 120
|
||||||
|
|||||||
Reference in New Issue
Block a user