352 lines
13 KiB
Python
352 lines
13 KiB
Python
"""Bridge Agent - Slack과 PTY 프로세스를 연결하는 핵심 중계 로직."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
import threading
|
|
import time
|
|
|
|
from lazy_enter.config import Config
|
|
from lazy_enter.output_filter import clean_terminal_output
|
|
from lazy_enter.pty_manager import PtyManager
|
|
from lazy_enter.slack_handler import SlackHandler
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Bridge:
    """Relay between Slack and a CLI process attached through a PTY.

    Slack messages are forwarded to the PTY as keyboard input, and PTY output
    is read on a background daemon thread, buffered, split to the configured
    message length and posted back to Slack.

    Slack callbacks and the polling thread share this object's state without
    a lock. Each of them therefore captures ``self.pty`` in a local variable
    before using it, so a concurrent stop cannot turn a checked reference
    into ``None`` halfway through an operation.
    """

    # Messages that mean "press Enter" instead of being typed literally.
    ENTER_COMMANDS = {"!e", "!enter"}

    # Targets accepted by the slash command.
    _SUPPORTED_TARGETS = frozenset({"claude", "codex"})

    # Substrings that cause an input to be rejected (matched after lowercasing
    # and collapsing whitespace runs to a single space).
    _BLOCKED_PATTERNS = (
        "rm -rf /",
        "rm -rf /*",
        "mkfs",
        ":(){:|:&};:",
        "shutdown -h",
        "reboot",
        "poweroff",
    )

    # The fork bomb is usually typed with spaces, so it is additionally
    # matched against the input with all whitespace removed.
    _FORK_BOMB = ":(){:|:&};:"

    # tmux status-line clock/date rows can change on every frame, so they are
    # left out of the duplicate-detection fingerprint.
    _STATUS_LINE_RE = re.compile(
        r'\s*\w*odex\]\s+\d+:[^\[]*'
        r'\[\d+,\d+\]\s+".+"\s+\d{2}:\d{2}\s+\d{2}-[A-Za-z]{3}-\d{2}'
    )

    def __init__(self, config: Config | None = None) -> None:
        """Create the bridge and register its Slack callbacks.

        Args:
            config: Settings to use; a default ``Config()`` is created when
                omitted.
        """
        self.config = config or Config()
        self.slack = SlackHandler(self.config)
        self.pty: PtyManager | None = None
        self._output_thread: threading.Thread | None = None
        self._running = False
        self._channel: str = self.config.allowed_channel_id
        self._active_target: str | None = None
        self._last_sent_output: str = ""
        self._last_sent_fingerprint: str | None = None
        self._last_input_at = time.monotonic()
        self._last_output_at = time.monotonic()
        self._output_buffer_started_at: float | None = None
        self._input_idle_reported = False
        self._output_idle_reported = False

        self.slack.on_message(self._handle_message)
        self.slack.on_command(self._handle_command)

    def _handle_message(self, text: str, channel: str) -> None:
        """Forward a Slack message to the PTY process.

        Args:
            text: Message text received from Slack.
            channel: Channel to reply to when the message cannot be forwarded.
        """
        # Work on a local reference: another thread may clear ``self.pty``
        # between the liveness check and the send.
        pty = self.pty
        if not pty or not pty.is_alive:
            self.slack.send_message(channel, ":warning: 연결된 세션이 없습니다.")
            return

        if text.strip().lower() in self.ENTER_COMMANDS:
            pty.send_enter()
            self._note_input_forwarded()
            logger.info("엔터 입력 전달")
            return

        if self._is_blocked_input(text):
            self.slack.send_message(
                channel, ":no_entry: 차단된 명령 패턴이 감지되었습니다."
            )
            return

        pty.send(text)
        self._note_input_forwarded()
        logger.info("입력 전달(엔터 미포함): %s", text)

    def _note_input_forwarded(self) -> None:
        """Update bookkeeping after input has been sent to the PTY."""
        # Output that follows an input is delivered once more even when it is
        # identical to the previously sent text, so forget the last snapshot.
        self._last_sent_output = ""
        self._last_sent_fingerprint = None
        self._last_input_at = time.monotonic()
        self._input_idle_reported = False

    @staticmethod
    def _is_blocked_input(text: str) -> bool:
        """Return True when ``text`` contains a destructive shell pattern.

        This is a simple substring filter, not a parser: the text is
        lowercased and whitespace runs are collapsed before matching. The
        fork bomb is also matched with all whitespace removed so that the
        common spaced spelling (``:(){ :|:& };:``) is caught as well.
        """
        lowered = text.lower()
        normalized = re.sub(r"\s+", " ", lowered).strip()
        if any(pattern in normalized for pattern in Bridge._BLOCKED_PATTERNS):
            return True
        compact = re.sub(r"\s+", "", lowered)
        return Bridge._FORK_BOMB in compact

    @staticmethod
    def _display_name(target: str) -> str:
        """Return the human-readable name for ``target``.

        Anything other than ``"codex"`` is shown as ``"Claude"``.
        """
        return "Codex" if target == "codex" else "Claude"

    def _session_name_for_target(self, target: str) -> str:
        """Return the configured tmux session name for ``target``."""
        if target == "codex":
            return self.config.codex_tmux_session_name
        return self.config.tmux_session_name

    def _handle_command(self, action: str, target: str, channel: str) -> None:
        """Handle a slash command.

        Args:
            action: ``"start"`` or ``"stop"``; other values are ignored.
            target: ``"claude"`` or ``"codex"``; anything else is rejected
                with a warning message.
            channel: Channel the command came from.
        """
        if target not in self._SUPPORTED_TARGETS:
            self.slack.send_message(channel, ":warning: 지원하지 않는 대상입니다.")
            return

        if action == "start":
            self._start_session(channel, target)
        elif action == "stop":
            self._stop_session(channel)

    def _start_session(self, channel: str, target: str) -> None:
        """Attach to the tmux session of ``target`` and start polling output.

        Args:
            channel: Channel that receives status messages and output.
            target: Which CLI to attach to.
        """
        current = self.pty
        if current and current.is_alive:
            self.slack.send_message(
                channel, ":information_source: 이미 세션에 연결되어 있습니다."
            )
            return

        pty = PtyManager(
            self._session_name_for_target(target),
            cli_name=target,
        )
        try:
            pty.start()
        except RuntimeError as exc:
            # Nothing was published yet, so a failed attach leaves no
            # half-initialised session state behind.
            self.pty = None
            self.slack.send_message(channel, f":warning: {exc}")
            return

        # Commit the session only after the attach succeeded.
        self._channel = channel
        self._active_target = target
        self._last_sent_output = ""
        self._last_sent_fingerprint = None
        started_at = time.monotonic()
        self._last_input_at = started_at
        self._last_output_at = started_at
        self._output_buffer_started_at = None
        self._input_idle_reported = False
        self._output_idle_reported = False
        self.pty = pty
        self._running = True

        self._output_thread = threading.Thread(target=self._poll_output, daemon=True)
        self._output_thread.start()

        display_name = self._display_name(target)
        self.slack.send_message(
            channel, f":link: {display_name} 세션에 연결되었습니다."
        )

    def _stop_session(self, channel: str) -> None:
        """Detach the bridge from the session and notify ``channel``."""
        self._running = False
        pty = self.pty
        # Clearing the reference first tells the polling thread that it no
        # longer owns a session.
        self.pty = None
        if pty:
            pty.stop()
        self._active_target = None
        self._last_sent_output = ""
        self._last_sent_fingerprint = None
        self._output_buffer_started_at = None
        self.slack.send_message(channel, ":electric_plug: 세션 연결이 해제되었습니다.")

    def _poll_output(self) -> None:
        """Read PTY output periodically and forward it to Slack.

        Runs on the background thread started by ``_start_session``. The loop
        ends when the bridge is stopped, when the session it was started for
        is detached or replaced, or when the attached process dies. Only the
        last case produces a warning message.
        """
        # Bind this thread to the session that was active when it started so
        # it never dereferences a cleared ``self.pty`` and never latches onto
        # a session created later.
        pty = self.pty
        if not pty:
            return

        buffer = ""
        while self._running and self.pty is pty and pty.is_alive:
            now = time.monotonic()
            if buffer and self._should_flush_output_buffer(now):
                self._send_output_chunks(buffer)
                buffer = ""
                self._output_buffer_started_at = None

            read_timeout = self._next_read_timeout(now, has_buffer=bool(buffer))
            output = pty.read_output(timeout=read_timeout)
            now = time.monotonic()
            if output:
                cleaned = clean_terminal_output(output)
                if cleaned:
                    buffer += f"{cleaned}\n"
                    self._last_output_at = now
                    if self._output_buffer_started_at is None:
                        self._output_buffer_started_at = now
                    self._output_idle_reported = False

            if buffer and self._should_flush_output_buffer(now):
                self._send_output_chunks(buffer)
                buffer = ""
                self._output_buffer_started_at = None

            self._report_idle(now)
            time.sleep(self.config.output_buffer_interval)

        if self.pty is not pty:
            # Detached or replaced: whoever did that owns the shared state now.
            return

        if not self._running:
            self._output_buffer_started_at = None
            return

        if buffer:
            self._send_output_chunks(buffer)
        self._output_buffer_started_at = None

        # The attach process ended unexpectedly.
        self.slack.send_message(self._channel, ":warning: 세션 연결이 종료되었습니다.")

    def _report_idle(self, now: float) -> None:
        """Post a one-time notice when output or input has been idle too long.

        A threshold of zero or less in the configuration disables the
        corresponding notice.
        """
        output_limit = self.config.output_idle_report_seconds
        output_idle = now - self._last_output_at
        if (
            output_limit > 0
            and not self._output_idle_reported
            and output_idle >= output_limit
        ):
            self.slack.send_message(
                self._channel,
                (
                    f":hourglass_flowing_sand: 출력이 {int(output_idle)}초 동안 "
                    "없습니다. 세션 상태를 확인해주세요."
                ),
            )
            self._output_idle_reported = True

        input_limit = self.config.input_idle_report_seconds
        input_idle = now - self._last_input_at
        if (
            input_limit > 0
            and not self._input_idle_reported
            and input_idle >= input_limit
        ):
            self.slack.send_message(
                self._channel,
                f":information_source: 입력이 {int(input_idle)}초 동안 없습니다.",
            )
            self._input_idle_reported = True

    def _next_read_timeout(self, now: float, has_buffer: bool) -> float:
        """Compute the timeout for the next PTY read.

        With nothing buffered the configured read timeout is used. With
        pending output the timeout is shortened so the read returns no later
        than the next flush deadline.
        """
        base_timeout = max(0.0, float(self.config.pty_read_timeout))
        if not has_buffer:
            return base_timeout

        deadline = self._next_output_flush_deadline()
        if deadline is None:
            return base_timeout

        remaining = max(0.0, deadline - now)
        return min(base_timeout, remaining)

    def _next_output_flush_deadline(self) -> float | None:
        """Return the earliest moment at which the buffer should be flushed.

        Two deadlines are considered: the last output time plus the settle
        period, and the time the buffer was started plus the flush interval.
        A flush interval of zero means a started buffer is due immediately.
        Returns ``None`` when neither deadline applies.
        """
        deadlines: list[float] = []
        settle_seconds = max(0.0, self.config.output_settle_seconds)
        if settle_seconds > 0:
            deadlines.append(self._last_output_at + settle_seconds)

        flush_interval_seconds = max(0.0, self.config.output_flush_interval_seconds)
        started_at = self._output_buffer_started_at
        if started_at is not None:
            if flush_interval_seconds == 0:
                return started_at
            deadlines.append(started_at + flush_interval_seconds)

        return min(deadlines) if deadlines else None

    def _should_flush_output_buffer(self, now: float) -> bool:
        """Return True when a flush deadline exists and has been reached."""
        deadline = self._next_output_flush_deadline()
        return deadline is not None and now >= deadline

    @staticmethod
    def _split_message(text: str, max_length: int) -> list[str]:
        """Split ``text`` into chunks of at most ``max_length`` characters.

        Splitting happens on line boundaries where possible; a single line
        longer than ``max_length`` is cut into fixed-size pieces. A
        non-positive ``max_length`` disables splitting. Empty text yields an
        empty list.
        """
        if max_length <= 0:
            return [text] if text else []

        chunks: list[str] = []
        current = ""
        for line in text.splitlines(keepends=True):
            if len(line) > max_length:
                # Emit what has been collected so far, then hard-split the
                # oversized line.
                if current:
                    chunks.append(current)
                    current = ""
                chunks.extend(
                    line[start : start + max_length]
                    for start in range(0, len(line), max_length)
                )
                continue

            if current and len(current) + len(line) > max_length:
                chunks.append(current)
                current = ""
            current += line

        if current:
            chunks.append(current)
        return chunks

    def _send_output_chunks(self, text: str) -> None:
        """Send ``text`` to Slack as one code-block message per chunk.

        Nothing is sent when the text is empty or when its fingerprint equals
        that of the previously sent output.
        """
        chunks = self._split_message(text, self.config.max_message_length)
        if not chunks:
            return

        snapshot = "\n\x00".join(chunks)
        fingerprint = self._output_fingerprint(snapshot)
        if (
            self._last_sent_fingerprint is not None
            and fingerprint == self._last_sent_fingerprint
        ):
            return

        for chunk in chunks:
            # Make sure the closing fence always starts on its own line.
            if chunk.endswith("\n"):
                message = f"```\n{chunk}```"
            else:
                message = f"```\n{chunk}\n```"
            self.slack.send_message(self._channel, message)
        self._last_sent_output = snapshot
        self._last_sent_fingerprint = fingerprint

    @staticmethod
    def _output_fingerprint(text: str) -> str:
        """Build a normalised fingerprint used to suppress duplicate output.

        Trailing whitespace is stripped, blank lines are dropped, and lines
        matching the tmux status-line pattern are skipped.
        """
        normalized_lines: list[str] = []
        for raw_line in text.splitlines():
            line = raw_line.rstrip()
            if not line.strip():
                continue
            if Bridge._STATUS_LINE_RE.fullmatch(line):
                continue
            normalized_lines.append(line)
        return "\n".join(normalized_lines)

    def run(self) -> None:
        """Start the bridge and block until the Slack handler returns.

        On exit, including ``KeyboardInterrupt``, polling is stopped and the
        PTY and Slack handler are shut down.
        """
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
        )
        logger.info("LazyEnter Bridge 시작")
        try:
            self.slack.start()
        except KeyboardInterrupt:
            logger.info("종료 신호 수신")
        finally:
            self._running = False
            pty = self.pty
            if pty:
                pty.stop()
            self.slack.stop()
|