"""Bridge Agent - Slack과 PTY 프로세스를 연결하는 핵심 중계 로직.""" from __future__ import annotations import logging import re import threading import time from lazy_enter.config import Config from lazy_enter.output_filter import clean_terminal_output from lazy_enter.pty_manager import PtyManager from lazy_enter.slack_handler import SlackHandler logger = logging.getLogger(__name__) class Bridge: """Slack ↔ CLI 프로세스 간의 중계기.""" ENTER_COMMANDS = {"!e", "!enter"} def __init__(self, config: Config | None = None) -> None: self.config = config or Config() self.slack = SlackHandler(self.config) self.pty: PtyManager | None = None self._output_thread: threading.Thread | None = None self._running = False self._channel: str = self.config.allowed_channel_id self._active_target: str | None = None self._last_sent_output: str = "" self._last_sent_fingerprint: str | None = None self._last_input_at = time.monotonic() self._last_output_at = time.monotonic() self._output_buffer_started_at: float | None = None self._input_idle_reported = False self._output_idle_reported = False self.slack.on_message(self._handle_message) self.slack.on_command(self._handle_command) def _handle_message(self, text: str, channel: str) -> None: """Slack 메시지를 PTY 프로세스로 전달한다.""" if not self.pty or not self.pty.is_alive: self.slack.send_message(channel, ":warning: 연결된 세션이 없습니다.") return if text.strip().lower() in self.ENTER_COMMANDS: self.pty.send_enter() self._last_sent_output = "" self._last_sent_fingerprint = None self._last_input_at = time.monotonic() self._input_idle_reported = False logger.info("엔터 입력 전달") return if self._is_blocked_input(text): self.slack.send_message( channel, ":no_entry: 차단된 명령 패턴이 감지되었습니다." ) return self.pty.send(text) # 입력 이후 출력은 동일 문자열이어도 한 번 더 전달한다. 
self._last_sent_output = "" self._last_sent_fingerprint = None self._last_input_at = time.monotonic() self._input_idle_reported = False logger.info("입력 전달(엔터 미포함): %s", text) @staticmethod def _is_blocked_input(text: str) -> bool: """치명적 쉘 명령 패턴을 단순 차단한다.""" normalized = re.sub(r"\s+", " ", text.lower()).strip() blocked_patterns = ( "rm -rf /", "rm -rf /*", "mkfs", ":(){:|:&};:", "shutdown -h", "reboot", "poweroff", ) return any(pattern in normalized for pattern in blocked_patterns) @staticmethod def _display_name(target: str) -> str: if target == "codex": return "Codex" return "Claude" def _session_name_for_target(self, target: str) -> str: if target == "codex": return self.config.codex_tmux_session_name return self.config.tmux_session_name def _handle_command(self, action: str, target: str, channel: str) -> None: """슬래시 커맨드를 처리한다.""" if action == "start": if target not in {"claude", "codex"}: self.slack.send_message(channel, ":warning: 지원하지 않는 대상입니다.") return self._start_session(channel, target) elif action == "stop": self._stop_session(channel) def _start_session(self, channel: str, target: str) -> None: """지정한 대상의 tmux 세션에 연결한다.""" if self.pty and self.pty.is_alive: self.slack.send_message( channel, ":information_source: 이미 세션에 연결되어 있습니다." 
) return self._channel = channel self._active_target = target self._last_sent_output = "" self._last_sent_fingerprint = None self.pty = PtyManager( self._session_name_for_target(target), cli_name=target, ) try: self.pty.start() except RuntimeError as exc: self.pty = None self.slack.send_message(channel, f":warning: {exc}") return self._running = True self._last_input_at = time.monotonic() self._last_output_at = time.monotonic() self._output_buffer_started_at = None self._input_idle_reported = False self._output_idle_reported = False self._output_thread = threading.Thread(target=self._poll_output, daemon=True) self._output_thread.start() display_name = self._display_name(target) self.slack.send_message( channel, f":link: {display_name} 세션에 연결되었습니다." ) def _stop_session(self, channel: str) -> None: """브릿지 연결만 해제한다.""" self._running = False if self.pty: self.pty.stop() self.pty = None self._active_target = None self._last_sent_output = "" self._last_sent_fingerprint = None self._output_buffer_started_at = None self.slack.send_message(channel, ":electric_plug: 세션 연결이 해제되었습니다.") def _poll_output(self) -> None: """PTY 출력을 주기적으로 읽어 Slack으로 전송한다.""" buffer = "" while self._running and self.pty and self.pty.is_alive: now = time.monotonic() if buffer and self._should_flush_output_buffer(now): self._send_output_chunks(buffer) buffer = "" self._output_buffer_started_at = None read_timeout = self._next_read_timeout(now, has_buffer=bool(buffer)) output = self.pty.read_output(timeout=read_timeout) now = time.monotonic() if output: cleaned = clean_terminal_output(output) if cleaned: buffer += f"{cleaned}\n" self._last_output_at = now if self._output_buffer_started_at is None: self._output_buffer_started_at = now self._output_idle_reported = False if buffer and self._should_flush_output_buffer(now): self._send_output_chunks(buffer) buffer = "" self._output_buffer_started_at = None output_idle = now - self._last_output_at if ( self.config.output_idle_report_seconds > 0 and not 
self._output_idle_reported and output_idle >= self.config.output_idle_report_seconds ): self.slack.send_message( self._channel, ( f":hourglass_flowing_sand: 출력이 {int(output_idle)}초 동안 " "없습니다. 세션 상태를 확인해주세요." ), ) self._output_idle_reported = True input_idle = now - self._last_input_at if ( self.config.input_idle_report_seconds > 0 and not self._input_idle_reported and input_idle >= self.config.input_idle_report_seconds ): self.slack.send_message( self._channel, f":information_source: 입력이 {int(input_idle)}초 동안 없습니다.", ) self._input_idle_reported = True time.sleep(self.config.output_buffer_interval) if not self._running: self._output_buffer_started_at = None return if buffer: self._send_output_chunks(buffer) self._output_buffer_started_at = None # attach 프로세스가 예기치 않게 종료된 경우 self.slack.send_message(self._channel, ":warning: 세션 연결이 종료되었습니다.") def _next_read_timeout(self, now: float, has_buffer: bool) -> float: """다음 PTY 읽기 타임아웃을 계산한다.""" base_timeout = max(0.0, float(self.config.pty_read_timeout)) if not has_buffer: return base_timeout deadline = self._next_output_flush_deadline() if deadline is None: return base_timeout remaining = max(0.0, deadline - now) return min(base_timeout, remaining) def _next_output_flush_deadline(self) -> float | None: """버퍼 flush의 가장 이른 데드라인을 반환한다.""" deadlines: list[float] = [] settle_seconds = max(0.0, self.config.output_settle_seconds) if settle_seconds > 0: deadlines.append(self._last_output_at + settle_seconds) flush_interval_seconds = max(0.0, self.config.output_flush_interval_seconds) if self._output_buffer_started_at is not None: if flush_interval_seconds == 0: return self._output_buffer_started_at deadlines.append(self._output_buffer_started_at + flush_interval_seconds) if not deadlines: return None return min(deadlines) def _should_flush_output_buffer(self, now: float) -> bool: """버퍼를 Slack으로 전송할 시점을 계산한다.""" deadline = self._next_output_flush_deadline() return deadline is not None and now >= deadline @staticmethod def 
_split_message(text: str, max_length: int) -> list[str]: """긴 텍스트를 메시지 길이 제한에 맞게 분할한다.""" if max_length <= 0: return [text] if text else [] lines = text.splitlines(keepends=True) chunks: list[str] = [] current = "" def flush() -> None: nonlocal current if current: chunks.append(current) current = "" for line in lines: if len(line) > max_length: flush() start = 0 while start < len(line): piece = line[start : start + max_length] chunks.append(piece) start += max_length continue if len(current) + len(line) > max_length: flush() current += line flush() return chunks def _send_output_chunks(self, text: str) -> None: """출력을 잘라서 Slack으로 순차 전송한다.""" chunks = self._split_message(text, self.config.max_message_length) snapshot = "\n\x00".join(chunks) fingerprint = self._output_fingerprint(snapshot) if not chunks: return if ( self._last_sent_fingerprint is not None and fingerprint == self._last_sent_fingerprint ): return for chunk in chunks: if chunk.endswith("\n"): message = f"```\n{chunk}```" else: message = f"```\n{chunk}\n```" self.slack.send_message(self._channel, message) self._last_sent_output = snapshot self._last_sent_fingerprint = fingerprint @staticmethod def _output_fingerprint(text: str) -> str: """중복 전송 억제를 위한 정규화 지문을 생성한다.""" normalized_lines: list[str] = [] for raw_line in text.splitlines(): line = raw_line.rstrip() if not line.strip(): continue # tmux 상태줄의 시계/날짜 라인은 프레임마다 변할 수 있어 제외한다. if re.fullmatch( ( r'\s*\w*odex\]\s+\d+:[^\[]*' r'\[\d+,\d+\]\s+".+"\s+\d{2}:\d{2}\s+\d{2}-[A-Za-z]{3}-\d{2}' ), line, ): continue normalized_lines.append(line) return "\n".join(normalized_lines) def run(self) -> None: """브릿지를 시작한다.""" logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", ) logger.info("LazyEnter Bridge 시작") try: self.slack.start() except KeyboardInterrupt: logger.info("종료 신호 수신") finally: self._running = False if self.pty: self.pty.stop() self.slack.stop()
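

# ---------------------------------------------------------------------------
# Illustrative sketch only; nothing in Bridge calls it.
#
# The flush rule in Bridge._next_output_flush_deadline combines two timers: a
# "settle" window measured from the last output and a maximum hold time
# measured from when the buffer first received data. The function below
# restates that rule as a pure function so it can be tried without Slack or a
# PTY. The name sketch_next_flush_deadline and the sample timestamps are
# assumptions made for this example; only the rule mirrors the method above.
# ---------------------------------------------------------------------------
from typing import Optional


def sketch_next_flush_deadline(
    last_output_at: float,
    buffer_started_at: Optional[float],
    settle_seconds: float,
    flush_interval_seconds: float,
) -> Optional[float]:
    """Return the earliest time at which buffered output should be flushed."""
    deadlines = []
    settle_seconds = max(0.0, settle_seconds)
    if settle_seconds > 0:
        # Flush once the output has been quiet for settle_seconds.
        deadlines.append(last_output_at + settle_seconds)

    flush_interval_seconds = max(0.0, flush_interval_seconds)
    if buffer_started_at is not None:
        if flush_interval_seconds == 0:
            # A zero interval means the buffer is due as soon as it has data.
            return buffer_started_at
        # Never hold a buffer longer than flush_interval_seconds.
        deadlines.append(buffer_started_at + flush_interval_seconds)

    return min(deadlines) if deadlines else None


# Usage: buffering began at t=10.0 and the last output arrived at t=10.5.
# With a 2 s settle window and a 5 s maximum hold, the settle deadline
# (10.5 + 2.0) is earlier than the hold deadline (10.0 + 5.0):
#   sketch_next_flush_deadline(10.5, 10.0, 2.0, 5.0)  # -> 12.5
# If output keeps arriving, the hold deadline eventually wins:
#   sketch_next_flush_deadline(14.0, 10.0, 2.0, 5.0)  # -> 15.0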