6 Commits

10 changed files with 268 additions and 66 deletions

View File

@@ -6,8 +6,8 @@ SLACK_APP_TOKEN=xapp-your-app-token
SLACK_ALLOWED_USER_ID=U0XXXXXXXXX SLACK_ALLOWED_USER_ID=U0XXXXXXXXX
SLACK_ALLOWED_CHANNEL_ID=C0XXXXXXXXX SLACK_ALLOWED_CHANNEL_ID=C0XXXXXXXXX
# PTY 설정 # PTY / tmux 연결 설정
DEFAULT_SHELL=claude TMUX_SESSION_NAME=claude
PTY_READ_TIMEOUT=5 PTY_READ_TIMEOUT=5
# 출력 버퍼 설정 # 출력 버퍼 설정

View File

@@ -5,11 +5,12 @@
## 동작 방식 ## 동작 방식
1. Slack에서 `/start-claude` 실행 1. 로컬에서 Claude를 tmux 세션으로 미리 실행 (`tmux new -s claude claude`)
2. 로컬에서 `claude` 프로세스(기본값)가 PTY로 시작됨 2. Slack에서 `/start-claude` 실행
3. Slack 채널 메시지가 CLI 입력으로 전달됨 3. 브릿지가 기존 tmux 세션에 연결해 pane 출력/입력을 중계
4. CLI 출력이 Slack으로 다시 4. Slack 채널 메시지가 CLI 입력으로
5. `/stop-claude`로 세션 종료 5. CLI 출력이 Slack으로 다시 전송됨
6. `/stop-claude`로 브릿지 연결 해제 (tmux 세션은 유지)
## 빠른 시작 ## 빠른 시작
@@ -34,7 +35,7 @@ cp .env.example .env
- `SLACK_ALLOWED_CHANNEL_ID` - `SLACK_ALLOWED_CHANNEL_ID`
선택 환경 변수: 선택 환경 변수:
- `DEFAULT_SHELL` (기본: `claude`) - `TMUX_SESSION_NAME` (기본: `claude`)
- `PTY_READ_TIMEOUT` (기본: `5`) - `PTY_READ_TIMEOUT` (기본: `5`)
- `OUTPUT_BUFFER_INTERVAL` (기본: `2.0`) - `OUTPUT_BUFFER_INTERVAL` (기본: `2.0`)
- `MAX_MESSAGE_LENGTH` (기본: `3000`) - `MAX_MESSAGE_LENGTH` (기본: `3000`)
@@ -55,6 +56,14 @@ cp .env.example .env
## 실행 ## 실행
먼저 로컬에서 Claude tmux 세션을 실행:
```bash
tmux new -s claude claude
```
그 다음 브릿지 실행:
```bash ```bash
lazy-enter lazy-enter
# 또는 # 또는
@@ -62,9 +71,9 @@ python -m lazy_enter
``` ```
실행 후 Slack의 허용된 채널에서: 실행 후 Slack의 허용된 채널에서:
- `/start-claude`: 세션 시작 - `/start-claude`: 기존 세션에 연결
- 일반 메시지 전송: Claude CLI로 입력 전달 - 일반 메시지 전송: Claude CLI로 입력 전달
- `/stop-claude`: 세션 종료 - `/stop-claude`: 브릿지 연결 해제 (세션 유지)
## 테스트 및 품질 점검 ## 테스트 및 품질 점검

View File

@@ -11,7 +11,6 @@ authors = [
dependencies = [ dependencies = [
"slack-bolt>=1.21.0", "slack-bolt>=1.21.0",
"slack-sdk>=3.33.0", "slack-sdk>=3.33.0",
"pexpect>=4.9.0",
"python-dotenv>=1.0.0", "python-dotenv>=1.0.0",
] ]

View File

@@ -8,6 +8,7 @@ import threading
import time import time
from lazy_enter.config import Config from lazy_enter.config import Config
from lazy_enter.output_filter import clean_terminal_output
from lazy_enter.pty_manager import PtyManager from lazy_enter.pty_manager import PtyManager
from lazy_enter.slack_handler import SlackHandler from lazy_enter.slack_handler import SlackHandler
@@ -24,6 +25,7 @@ class Bridge:
self._output_thread: threading.Thread | None = None self._output_thread: threading.Thread | None = None
self._running = False self._running = False
self._channel: str = self.config.allowed_channel_id self._channel: str = self.config.allowed_channel_id
self._last_sent_output: str = ""
self._last_input_at = time.monotonic() self._last_input_at = time.monotonic()
self._last_output_at = time.monotonic() self._last_output_at = time.monotonic()
self._input_idle_reported = False self._input_idle_reported = False
@@ -46,7 +48,7 @@ class Bridge:
self._input_idle_reported = False self._input_idle_reported = False
logger.info("입력 전달: %s", text) logger.info("입력 전달: %s", text)
else: else:
self.slack.send_message(channel, ":warning: 실행 중인 세션이 없습니다.") self.slack.send_message(channel, ":warning: 연결된 세션이 없습니다.")
@staticmethod @staticmethod
def _is_blocked_input(text: str) -> bool: def _is_blocked_input(text: str) -> bool:
@@ -71,16 +73,23 @@ class Bridge:
self._stop_session(channel) self._stop_session(channel)
def _start_session(self, channel: str) -> None: def _start_session(self, channel: str) -> None:
"""Claude Code 세션을 시작한다.""" """기존 Claude tmux 세션에 연결한다."""
if self.pty and self.pty.is_alive: if self.pty and self.pty.is_alive:
self.slack.send_message( self.slack.send_message(
channel, ":information_source: 이미 세션이 실행 중입니다." channel, ":information_source: 이미 세션에 연결되어 있습니다."
) )
return return
self._channel = channel self._channel = channel
self.pty = PtyManager(self.config.default_shell) self._last_sent_output = ""
self.pty = PtyManager(self.config.tmux_session_name)
try:
self.pty.start() self.pty.start()
except RuntimeError as exc:
self.pty = None
self.slack.send_message(channel, f":warning: {exc}")
return
self._running = True self._running = True
self._last_input_at = time.monotonic() self._last_input_at = time.monotonic()
self._last_output_at = time.monotonic() self._last_output_at = time.monotonic()
@@ -89,15 +98,16 @@ class Bridge:
self._output_thread = threading.Thread(target=self._poll_output, daemon=True) self._output_thread = threading.Thread(target=self._poll_output, daemon=True)
self._output_thread.start() self._output_thread.start()
self.slack.send_message(channel, ":rocket: Claude Code 세션이 시작되었습니다.") self.slack.send_message(channel, ":link: Claude 세션에 연결되었습니다.")
def _stop_session(self, channel: str) -> None: def _stop_session(self, channel: str) -> None:
"""Claude Code 세션을 종료한다.""" """브릿지 연결만 해제한다."""
self._running = False self._running = False
if self.pty: if self.pty:
self.pty.stop() self.pty.stop()
self.pty = None self.pty = None
self.slack.send_message(channel, ":stop_sign: 세션이 종료되었습니다.") self._last_sent_output = ""
self.slack.send_message(channel, ":electric_plug: 세션 연결이 해제되었습니다.")
def _poll_output(self) -> None: def _poll_output(self) -> None:
"""PTY 출력을 주기적으로 읽어 Slack으로 전송한다.""" """PTY 출력을 주기적으로 읽어 Slack으로 전송한다."""
@@ -106,16 +116,20 @@ class Bridge:
now = time.monotonic() now = time.monotonic()
output = self.pty.read_output(timeout=self.config.pty_read_timeout) output = self.pty.read_output(timeout=self.config.pty_read_timeout)
if output: if output:
buffer += output cleaned = clean_terminal_output(output)
if cleaned:
buffer += f"{cleaned}\n"
self._last_output_at = now self._last_output_at = now
self._output_idle_reported = False self._output_idle_reported = False
if buffer: if buffer:
# 메시지 길이 제한 적용 # 메시지 길이 제한 적용
message = buffer[: self.config.max_message_length] message = buffer[: self.config.max_message_length].rstrip()
if len(buffer) > self.config.max_message_length: if len(buffer) > self.config.max_message_length:
message += "\n... (truncated)" message += "\n... (truncated)"
if message and message != self._last_sent_output:
self.slack.send_message(self._channel, f"```\n{message}\n```") self.slack.send_message(self._channel, f"```\n{message}\n```")
self._last_sent_output = message
buffer = "" buffer = ""
output_idle = now - self._last_output_at output_idle = now - self._last_output_at
@@ -150,8 +164,8 @@ class Bridge:
if not self._running: if not self._running:
return return
# 프로세스가 예기치 않게 종료된 경우 # attach 프로세스가 예기치 않게 종료된 경우
self.slack.send_message(self._channel, ":warning: 프로세스가 종료되었습니다.") self.slack.send_message(self._channel, ":warning: 세션 연결이 종료되었습니다.")
def run(self) -> None: def run(self) -> None:
"""브릿지를 시작한다.""" """브릿지를 시작한다."""

View File

@@ -18,8 +18,8 @@ class Config:
allowed_user_id: str = os.getenv("SLACK_ALLOWED_USER_ID", "") allowed_user_id: str = os.getenv("SLACK_ALLOWED_USER_ID", "")
allowed_channel_id: str = os.getenv("SLACK_ALLOWED_CHANNEL_ID", "") allowed_channel_id: str = os.getenv("SLACK_ALLOWED_CHANNEL_ID", "")
# PTY # PTY / tmux attach
default_shell: str = os.getenv("DEFAULT_SHELL", "claude") tmux_session_name: str = os.getenv("TMUX_SESSION_NAME", "claude")
pty_read_timeout: int = int(os.getenv("PTY_READ_TIMEOUT", "5")) pty_read_timeout: int = int(os.getenv("PTY_READ_TIMEOUT", "5"))
# Buffer # Buffer

View File

@@ -0,0 +1,32 @@
"""PTY 출력에서 터미널 제어 시퀀스를 제거한다."""
from __future__ import annotations
import re
# ANSI CSI / OSC / DCS / 단일 ESC 시퀀스를 폭넓게 제거한다.
_ANSI_ESCAPE_RE = re.compile(
r"(?:\x1B\[[0-?]*[ -/]*[@-~])"
r"|(?:\x1B\][^\x1B\x07]*(?:\x07|\x1B\\))"
r"|(?:\x1BP[^\x1B]*(?:\x1B\\))"
r"|(?:\x1B[@-_])"
)
# ESC 문자가 유실된 상태로 남는 CSI 토큰(예: [38;2;153;153;153m) 제거.
_BROKEN_CSI_RE = re.compile(r"\[(?:[0-9;<=>?]|(?:\d+;))*\d*[A-Za-z]")
# C0 제어문자 중 개행/탭/캐리지리턴을 제외하고 제거.
_CONTROL_RE = re.compile(r"[\x00-\x08\x0B-\x1F\x7F]")
def clean_terminal_output(text: str) -> str:
"""Slack 전송용 텍스트를 정리한다."""
cleaned = _ANSI_ESCAPE_RE.sub("", text)
cleaned = _BROKEN_CSI_RE.sub("", cleaned)
cleaned = cleaned.replace("\r", "")
cleaned = _CONTROL_RE.sub("", cleaned)
# 제어 코드 제거 후 남는 빈 줄을 축소한다.
lines = [line.rstrip() for line in cleaned.splitlines()]
compact = "\n".join(lines).strip()
return compact

View File

@@ -1,56 +1,105 @@
"""pexpect 기반 PTY 프로세스 관리.""" """기존 tmux 세션의 pane을 캡처/입력하여 CLI 입출력을 중계한다."""
from __future__ import annotations from __future__ import annotations
import logging import logging
import subprocess
import pexpect
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PtyManager: class PtyManager:
"""가상 터미널에서 CLI 프로세스를 생성하고 입력을 제어한다.""" """기존 tmux 세션의 pane을 읽고 입력을 보낸다."""
def __init__(self, command: str = "claude") -> None: def __init__(self, session_name: str = "claude") -> None:
self.command = command self.session_name = session_name
self._process: pexpect.spawn | None = None self._connected = False
self._last_capture: list[str] = []
@property @property
def is_alive(self) -> bool: def is_alive(self) -> bool:
return self._process is not None and self._process.isalive() return self._connected and self._session_exists()
def start(self) -> None: def _run_tmux(self, args: list[str]) -> subprocess.CompletedProcess[str]:
"""프로세스를 시작한다.""" return subprocess.run(
logger.info("프로세스 시작: %s", self.command) ["tmux", *args],
self._process = pexpect.spawn( check=False,
self.command, capture_output=True,
encoding="utf-8", text=True,
timeout=None,
) )
def _session_exists(self) -> bool:
return self._run_tmux(["has-session", "-t", self.session_name]).returncode == 0
def _ensure_session_exists(self) -> None:
"""대상 tmux 세션 존재 여부를 검증한다."""
result = self._run_tmux(["has-session", "-t", self.session_name])
if result.returncode != 0:
raise RuntimeError(
"tmux 세션이 없습니다. 먼저 실행하세요: "
f"tmux new -s {self.session_name} claude"
)
def _capture_lines(self) -> list[str]:
"""pane의 최근 출력 스냅샷을 줄 단위로 가져온다."""
result = self._run_tmux(
["capture-pane", "-p", "-t", self.session_name, "-S", "-200"]
)
if result.returncode != 0:
return []
return result.stdout.splitlines()
@staticmethod
def _diff_lines(previous: list[str], current: list[str]) -> list[str]:
"""이전 스냅샷 대비 새로 추가된 줄만 계산한다."""
if not previous:
return []
prefix = 0
for prev, curr in zip(previous, current):
if prev != curr:
break
prefix += 1
if prefix > 0:
return current[prefix:]
# 화면 스크롤/리프레시로 앞부분이 달라진 경우 tail overlap으로 보정.
max_overlap = min(len(previous), len(current))
for overlap in range(max_overlap, 0, -1):
if previous[-overlap:] == current[:overlap]:
return current[overlap:]
return current
def start(self) -> None:
"""기존 tmux 세션에 연결한다."""
self._ensure_session_exists()
logger.info("tmux 세션 연결: %s", self.session_name)
self._connected = True
self._last_capture = self._capture_lines()
def send(self, text: str) -> None: def send(self, text: str) -> None:
"""프로세스에 텍스트 입력을 전달한다.""" """세션에 텍스트 입력을 전달한다."""
if not self.is_alive: if not self.is_alive:
raise RuntimeError("프로세스가 실행 중이 아닙니다.") raise RuntimeError("프로세스가 실행 중이 아닙니다.")
assert self._process is not None
logger.debug("입력 전송: %s", text) logger.debug("입력 전송: %s", text)
self._process.sendline(text) result = self._run_tmux(["send-keys", "-t", self.session_name, text, "C-m"])
if result.returncode != 0:
raise RuntimeError("tmux 입력 전송에 실패했습니다.")
def read_output(self, timeout: int = 5) -> str: def read_output(self, timeout: int = 5) -> str:
"""프로세스의 출력을 읽는다.""" """세션 pane 출력을 읽는다."""
_ = timeout
if not self.is_alive: if not self.is_alive:
raise RuntimeError("프로세스가 실행 중이 아닙니다.") raise RuntimeError("프로세스가 실행 중이 아닙니다.")
assert self._process is not None current = self._capture_lines()
try: delta_lines = self._diff_lines(self._last_capture, current)
self._process.expect(pexpect.TIMEOUT, timeout=timeout) self._last_capture = current
except pexpect.TIMEOUT: return "\n".join(delta_lines).strip()
pass
return self._process.before or ""
def stop(self) -> None: def stop(self) -> None:
"""프로세스를 종료한다.""" """브릿지 연결만 종료한다(원격 세션은 유지)."""
if self._process is not None: if self._connected:
logger.info("프로세스 종료") logger.info("tmux 세션 연결 해제")
self._process.close(force=True) self._connected = False
self._process = None self._last_capture = []

View File

@@ -7,7 +7,7 @@ from lazy_enter.config import Config
def test_config_defaults(): def test_config_defaults():
config = Config() config = Config()
assert config.default_shell == "claude" assert config.tmux_session_name == "claude"
assert config.pty_read_timeout == 5 assert config.pty_read_timeout == 5
assert config.output_buffer_interval == 2.0 assert config.output_buffer_interval == 2.0
assert config.max_message_length == 3000 assert config.max_message_length == 3000

View File

@@ -0,0 +1,18 @@
"""output_filter 모듈 테스트."""
from lazy_enter.output_filter import clean_terminal_output
def test_clean_terminal_output_removes_ansi_sequences():
raw = "\x1b[?2004h\x1b[38;2;153;153;153mClaude Code\x1b[39m"
assert clean_terminal_output(raw) == "Claude Code"
def test_clean_terminal_output_removes_broken_csi_tokens():
raw = "[?2004h[38;2;153;153;153mhello[39m"
assert clean_terminal_output(raw) == "hello"
def test_clean_terminal_output_preserves_plain_text():
raw = "line1\nline2"
assert clean_terminal_output(raw) == "line1\nline2"

View File

@@ -1,22 +1,103 @@
"""PtyManager 테스트.""" """PtyManager 테스트."""
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass
import pytest import pytest
from lazy_enter.pty_manager import PtyManager from lazy_enter.pty_manager import PtyManager
@dataclass
class Result:
returncode: int
stdout: str = ""
def test_initial_state(): def test_initial_state():
pty = PtyManager("echo hello") pty = PtyManager("claude")
assert not pty.is_alive assert not pty.is_alive
def test_start_and_stop(): def test_start_raises_when_tmux_session_missing(monkeypatch: pytest.MonkeyPatch):
pty = PtyManager("cat") def fake_run(
try: cmd: Sequence[str],
pty.start() check: bool,
except OSError as exc: capture_output: bool,
pytest.skip(f"PTY unavailable in this environment: {exc}") text: bool,
):
assert list(cmd) == ["tmux", "has-session", "-t", "claude"]
assert check is False
assert capture_output is True
assert text is True
return Result(returncode=1)
monkeypatch.setattr("lazy_enter.pty_manager.subprocess.run", fake_run)
pty = PtyManager("claude")
with pytest.raises(RuntimeError):
pty.start()
def test_start_send_read_and_stop(monkeypatch: pytest.MonkeyPatch):
calls: list[list[str]] = []
def fake_run(
cmd: Sequence[str],
check: bool,
capture_output: bool,
text: bool,
):
calls.append(list(cmd))
assert check is False
assert capture_output is True
assert text is True
if cmd[:4] == ["tmux", "has-session", "-t", "claude"]:
return Result(returncode=0)
if cmd[:7] == ["tmux", "capture-pane", "-p", "-t", "claude", "-S", "-200"]:
# 첫 캡처는 baseline, 두 번째 캡처에서 한 줄 증가.
if calls.count(list(cmd)) == 1:
return Result(returncode=0, stdout="line1\nline2\n")
return Result(returncode=0, stdout="line1\nline2\nline3\n")
if cmd[:6] == ["tmux", "send-keys", "-t", "claude", "hello", "C-m"]:
return Result(returncode=0)
return Result(returncode=1)
monkeypatch.setattr("lazy_enter.pty_manager.subprocess.run", fake_run)
pty = PtyManager("claude")
pty.start()
assert pty.is_alive assert pty.is_alive
pty.send("hello")
assert pty.read_output() == "line3"
pty.stop() pty.stop()
assert not pty.is_alive assert not pty.is_alive
def test_read_output_returns_empty_when_unchanged(monkeypatch: pytest.MonkeyPatch):
def fake_run(
cmd: Sequence[str],
check: bool,
capture_output: bool,
text: bool,
):
assert check is False
assert capture_output is True
assert text is True
if cmd[:4] == ["tmux", "has-session", "-t", "claude"]:
return Result(returncode=0)
if cmd[:7] == ["tmux", "capture-pane", "-p", "-t", "claude", "-S", "-200"]:
return Result(returncode=0, stdout="same\n")
return Result(returncode=1)
monkeypatch.setattr("lazy_enter.pty_manager.subprocess.run", fake_run)
pty = PtyManager("claude")
pty.start()
assert pty.read_output() == ""