Separate input from enter submission in Slack bridge

This commit is contained in:
2026-02-17 05:55:49 +09:00
parent 51e63a0f85
commit 271376a7da
5 changed files with 88 additions and 14 deletions

View File

@@ -18,6 +18,8 @@ logger = logging.getLogger(__name__)
class Bridge:
"""Slack ↔ CLI 프로세스 간의 중계기."""
ENTER_COMMAND = "!enter"
def __init__(self, config: Config | None = None) -> None:
self.config = config or Config()
self.slack = SlackHandler(self.config)
@@ -39,22 +41,32 @@ class Bridge:
def _handle_message(self, text: str, channel: str) -> None:
"""Slack 메시지를 PTY 프로세스로 전달한다."""
if not self.pty or not self.pty.is_alive:
self.slack.send_message(channel, ":warning: 연결된 세션이 없습니다.")
return
if text.strip().lower() == self.ENTER_COMMAND:
self.pty.send_enter()
self._last_sent_output = ""
self._last_sent_fingerprint = None
self._last_input_at = time.monotonic()
self._input_idle_reported = False
logger.info("엔터 입력 전달")
return
if self._is_blocked_input(text):
self.slack.send_message(
channel, ":no_entry: 차단된 명령 패턴이 감지되었습니다."
)
return
if self.pty and self.pty.is_alive:
self.pty.send(text)
# 입력 이후 출력은 동일 문자열이어도 한 번 더 전달한다.
self._last_sent_output = ""
self._last_sent_fingerprint = None
self._last_input_at = time.monotonic()
self._input_idle_reported = False
logger.info("입력 전달: %s", text)
else:
self.slack.send_message(channel, ":warning: 연결된 세션이 없습니다.")
self.pty.send(text)
# 입력 이후 출력은 동일 문자열이어도 한 번 더 전달한다.
self._last_sent_output = ""
self._last_sent_fingerprint = None
self._last_input_at = time.monotonic()
self._input_idle_reported = False
logger.info("입력 전달(엔터 미포함): %s", text)
@staticmethod
def _is_blocked_input(text: str) -> bool:

View File

@@ -47,13 +47,20 @@ class PtyManager:
timeout=None,
)
def send(self, text: str) -> None:
def send(self, text: str, submit: bool = False) -> None:
"""프로세스에 텍스트 입력을 전달한다."""
if not self.is_alive:
raise RuntimeError("프로세스가 실행 중이 아닙니다.")
assert self._process is not None
logger.debug("입력 전송: %s", text)
self._process.sendline(text)
if submit:
self._process.sendline(text)
return
self._process.send(text)
def send_enter(self) -> None:
"""엔터 키 입력만 전송한다."""
self.send("", submit=True)
def read_output(self, timeout: float = 5) -> str:
"""프로세스의 출력을 읽는다."""