"""Bridge Agent - Slack과 PTY 프로세스를 연결하는 핵심 중계 로직.""" from __future__ import annotations import logging import threading import time from lazy_enter.config import Config from lazy_enter.pty_manager import PtyManager from lazy_enter.slack_handler import SlackHandler logger = logging.getLogger(__name__) class Bridge: """Slack ↔ CLI 프로세스 간의 중계기.""" def __init__(self, config: Config | None = None) -> None: self.config = config or Config() self.slack = SlackHandler(self.config) self.pty: PtyManager | None = None self._output_thread: threading.Thread | None = None self._running = False self._channel: str = self.config.allowed_channel_id self.slack.on_message(self._handle_message) self.slack.on_command(self._handle_command) def _handle_message(self, text: str, channel: str) -> None: """Slack 메시지를 PTY 프로세스로 전달한다.""" if self.pty and self.pty.is_alive: self.pty.send(text) logger.info("입력 전달: %s", text) else: self.slack.send_message(channel, ":warning: 실행 중인 세션이 없습니다.") def _handle_command(self, command: str, channel: str) -> None: """슬래시 커맨드를 처리한다.""" if command == "start": self._start_session(channel) elif command == "stop": self._stop_session(channel) def _start_session(self, channel: str) -> None: """Claude Code 세션을 시작한다.""" if self.pty and self.pty.is_alive: self.slack.send_message( channel, ":information_source: 이미 세션이 실행 중입니다." ) return self._channel = channel self.pty = PtyManager(self.config.default_shell) self.pty.start() self._running = True self._output_thread = threading.Thread(target=self._poll_output, daemon=True) self._output_thread.start() self.slack.send_message(channel, ":rocket: Claude Code 세션이 시작되었습니다.") def _stop_session(self, channel: str) -> None: """Claude Code 세션을 종료한다.""" self._running = False if self.pty: self.pty.stop() self.pty = None self.slack.send_message(channel, ":stop_sign: 세션이 종료되었습니다.") def _poll_output(self) -> None: """PTY 출력을 주기적으로 읽어 Slack으로 전송한다.""" buffer = "" while self._running and self.pty and self.pty.is_alive: output = self.pty.read_output(timeout=self.config.pty_read_timeout) if output: buffer += output if buffer: # 메시지 길이 제한 적용 message = buffer[: self.config.max_message_length] if len(buffer) > self.config.max_message_length: message += "\n... (truncated)" self.slack.send_message(self._channel, f"```\n{message}\n```") buffer = "" time.sleep(self.config.output_buffer_interval) if not self._running: return # 프로세스가 예기치 않게 종료된 경우 self.slack.send_message(self._channel, ":warning: 프로세스가 종료되었습니다.") def run(self) -> None: """브릿지를 시작한다.""" logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", ) logger.info("LazyEnter Bridge 시작") try: self.slack.start() except KeyboardInterrupt: logger.info("종료 신호 수신") finally: self._running = False if self.pty: self.pty.stop() self.slack.stop()