Attach to existing tmux Claude session

This commit is contained in:
2026-02-17 04:04:28 +09:00
parent e616bebdb9
commit b83929fac2
7 changed files with 133 additions and 39 deletions

View File

@@ -48,7 +48,7 @@ class Bridge:
self._input_idle_reported = False
logger.info("입력 전달: %s", text)
else:
self.slack.send_message(channel, ":warning: 실행 중인 세션이 없습니다.")
self.slack.send_message(channel, ":warning: 연결된 세션이 없습니다.")
@staticmethod
def _is_blocked_input(text: str) -> bool:
@@ -73,17 +73,23 @@ class Bridge:
self._stop_session(channel)
def _start_session(self, channel: str) -> None:
"""Claude Code 세션을 시작한다."""
"""기존 Claude tmux 세션에 연결한다."""
if self.pty and self.pty.is_alive:
self.slack.send_message(
channel, ":information_source: 이미 세션이 실행 중입니다."
channel, ":information_source: 이미 세션에 연결되어 있습니다."
)
return
self._channel = channel
self._last_sent_output = ""
self.pty = PtyManager(self.config.default_shell)
self.pty.start()
self.pty = PtyManager(self.config.tmux_session_name)
try:
self.pty.start()
except RuntimeError as exc:
self.pty = None
self.slack.send_message(channel, f":warning: {exc}")
return
self._running = True
self._last_input_at = time.monotonic()
self._last_output_at = time.monotonic()
@@ -92,16 +98,16 @@ class Bridge:
self._output_thread = threading.Thread(target=self._poll_output, daemon=True)
self._output_thread.start()
self.slack.send_message(channel, ":rocket: Claude Code 세션이 시작되었습니다.")
self.slack.send_message(channel, ":link: Claude 세션에 연결되었습니다.")
def _stop_session(self, channel: str) -> None:
"""Claude Code 세션을 종료한다."""
"""브릿지 연결만 해제한다."""
self._running = False
if self.pty:
self.pty.stop()
self.pty = None
self._last_sent_output = ""
self.slack.send_message(channel, ":stop_sign: 세션이 종료되었습니다.")
self.slack.send_message(channel, ":electric_plug: 세션 연결이 해제되었습니다.")
def _poll_output(self) -> None:
"""PTY 출력을 주기적으로 읽어 Slack으로 전송한다."""
@@ -158,8 +164,8 @@ class Bridge:
if not self._running:
return
# 프로세스가 예기치 않게 종료된 경우
self.slack.send_message(self._channel, ":warning: 프로세스가 종료되었습니다.")
# attach 프로세스가 예기치 않게 종료된 경우
self.slack.send_message(self._channel, ":warning: 세션 연결이 종료되었습니다.")
def run(self) -> None:
"""브릿지를 시작한다."""

View File

@@ -18,8 +18,8 @@ class Config:
allowed_user_id: str = os.getenv("SLACK_ALLOWED_USER_ID", "")
allowed_channel_id: str = os.getenv("SLACK_ALLOWED_CHANNEL_ID", "")
# PTY
default_shell: str = os.getenv("DEFAULT_SHELL", "claude")
# PTY / tmux attach
tmux_session_name: str = os.getenv("TMUX_SESSION_NAME", "claude")
pty_read_timeout: int = int(os.getenv("PTY_READ_TIMEOUT", "5"))
# Buffer

View File

@@ -1,8 +1,9 @@
"""pexpect 기반 PTY 프로세스 관리."""
"""기존 tmux 세션에 attach하여 CLI 입출력을 중계한다."""
from __future__ import annotations
import logging
import subprocess
import pexpect
@@ -10,21 +11,37 @@ logger = logging.getLogger(__name__)
class PtyManager:
"""가상 터미널에서 CLI 프로세스를 생성하고 입출력을 제어한다."""
"""기존 tmux 세션에 attach하고 입출력을 제어한다."""
def __init__(self, command: str = "claude") -> None:
self.command = command
def __init__(self, session_name: str = "claude") -> None:
self.session_name = session_name
self._process: pexpect.spawn | None = None
@property
def is_alive(self) -> bool:
return self._process is not None and self._process.isalive()
def _ensure_session_exists(self) -> None:
"""attach 대상 tmux 세션 존재 여부를 검증한다."""
result = subprocess.run(
["tmux", "has-session", "-t", self.session_name],
check=False,
capture_output=True,
text=True,
)
if result.returncode != 0:
raise RuntimeError(
"tmux 세션이 없습니다. 먼저 실행하세요: "
f"tmux new -s {self.session_name} claude"
)
def start(self) -> None:
"""프로세스를 시작한다."""
logger.info("프로세스 시작: %s", self.command)
"""기존 tmux 세션에 attach한다."""
self._ensure_session_exists()
logger.info("tmux 세션 attach: %s", self.session_name)
self._process = pexpect.spawn(
self.command,
"tmux",
["attach-session", "-t", self.session_name],
encoding="utf-8",
timeout=None,
)
@@ -49,8 +66,8 @@ class PtyManager:
return self._process.before or ""
def stop(self) -> None:
"""프로세스를 종료한다."""
"""attach 연결만 종료한다(원격 세션은 유지)."""
if self._process is not None:
logger.info("프로세스 종료")
logger.info("tmux attach 종료")
self._process.close(force=True)
self._process = None