19 Commits

Author SHA1 Message Date
7bdeb23678 Merge pull request 'Allow /stop without target argument' (#13) from feat/unify-start-stop-args into main
Reviewed-on: #13
2026-02-17 06:19:47 +09:00
fa2dc0f295 Allow /stop without target argument 2026-02-17 06:17:36 +09:00
21b01ca055 Merge pull request 'Unify start/stop slash commands by target' (#12) from feat/unify-start-stop-args into main
Reviewed-on: #12
2026-02-17 06:12:59 +09:00
48c79c88c8 Unify start/stop slash commands by target 2026-02-17 06:08:43 +09:00
70020e3f71 Merge pull request 'Separate Slack text input from Enter submission' (#11) from fix/output-throttle into main
Reviewed-on: #11
2026-02-17 06:04:02 +09:00
6049ba7a8a Drop bang-only enter alias 2026-02-17 06:02:09 +09:00
b3ce2622e3 Add short enter aliases for Slack input 2026-02-17 06:00:34 +09:00
271376a7da Separate input from enter submission in Slack bridge 2026-02-17 05:55:49 +09:00
272c840913 Merge pull request 'Throttle and schedule buffered output delivery' (#10) from fix/output-throttle into main
Reviewed-on: #10
2026-02-17 05:52:36 +09:00
51e63a0f85 Throttle and schedule buffered output delivery 2026-02-17 05:46:43 +09:00
3effedf07d Merge pull request 'Fix Slack output dedup and chunk forwarding' (#9) from fix/slack-input-drop-logs into main
Reviewed-on: #9
2026-02-17 05:35:51 +09:00
6747ab703a Fix Slack output dedup and chunk forwarding 2026-02-17 05:34:26 +09:00
cfbe781210 Merge pull request 'Filter Codex TUI redraw noise in Slack output' (#8) from fix/codex-tui-noise-filter into main
Reviewed-on: #8
2026-02-17 04:45:45 +09:00
c2fe922105 Filter Codex TUI redraw noise in Slack output 2026-02-17 04:44:53 +09:00
0a49032f29 Merge pull request 'Document safer PR workflow rules' (#7) from chore/record-pr-workflow-safety into main
Reviewed-on: #7
2026-02-17 04:31:22 +09:00
cf1a6cb620 Resolve AGENTS.md merge conflict with main 2026-02-17 04:30:50 +09:00
909ef08e37 Document safe PR body flow and post-merge main sync 2026-02-17 04:28:54 +09:00
2fddb1ca24 Merge pull request 'Add Codex bridge target support' (#6) from feat/add-codex-bridge-support into main
Reviewed-on: #6
2026-02-17 04:26:25 +09:00
96188680fb Merge pull request 'Document Gitea workflow and forbid direct main commits' (#5) from chore/gitea-workflow-notes into main
Reviewed-on: #5
2026-02-17 04:20:07 +09:00
13 changed files with 860 additions and 78 deletions

View File

@@ -13,6 +13,8 @@ PTY_READ_TIMEOUT=5
# 출력 버퍼 설정
OUTPUT_BUFFER_INTERVAL=2.0
OUTPUT_SETTLE_SECONDS=4.0
OUTPUT_FLUSH_INTERVAL_SECONDS=15.0
MAX_MESSAGE_LENGTH=3000
# 상태 보고 / 재연결 설정

View File

@@ -67,9 +67,16 @@ Hard rule:
- `git branch --set-upstream-to=origin/<branch> <branch>`
- Create PR on Gitea:
- `tea pr create --base main --head <branch> --title "<title>" --description "<body>"`
- If PR body includes backticks (`` ` ``), slash commands, or markdown that can be shell-expanded, do not pass it directly in double quotes.
- Preferred safe flow:
- `cat > /tmp/pr_body.md <<'EOF'` ... `EOF`
- `tea pr create --base main --head <branch> --title "<title>" --description "$(cat /tmp/pr_body.md)"`
- If the body is malformed after creation, patch it with API:
- `tea api -X PATCH repos/{owner}/{repo}/pulls/<number> -F body=@/tmp/pr_body.md`
- Sync local main:
- `git checkout main`
- `git pull --ff-only`
- When user confirms the PR is merged, always run this sync immediately to keep local `main` up to date before any next task.
Safety:
- Do not commit or print tokens in logs/docs.

View File

@@ -8,11 +8,11 @@
1. 로컬에서 Claude 또는 Codex를 tmux 세션으로 미리 실행
- Claude: `tmux new -s claude claude`
- Codex: `tmux new -s codex codex`
2. Slack에서 `/start-claude` 또는 `/start-codex` 실행
2. Slack에서 `/start claude` 또는 `/start codex` 실행
3. 브릿지가 기존 tmux 세션에 attach
4. Slack 채널 메시지가 CLI 입력으로 전달됨
4. Slack 채널 메시지가 CLI 입력으로 전달됨 (기본: 엔터 미포함)
5. CLI 출력이 Slack으로 다시 전송됨
6. `/stop-claude`로 브릿지 연결 해제 (tmux 세션은 유지)
6. `/stop claude` 또는 `/stop codex`로 브릿지 연결 해제 (tmux 세션은 유지)
## 빠른 시작
@@ -37,10 +37,12 @@ cp .env.example .env
- `SLACK_ALLOWED_CHANNEL_ID`
선택 환경 변수:
- `TMUX_SESSION_NAME` (기본: `claude`, `/start-claude` 대상)
- `CODEX_TMUX_SESSION_NAME` (기본: `codex`, `/start-codex` 대상)
- `TMUX_SESSION_NAME` (기본: `claude`, `/start claude` 대상)
- `CODEX_TMUX_SESSION_NAME` (기본: `codex`, `/start codex` 대상)
- `PTY_READ_TIMEOUT` (기본: `5`)
- `OUTPUT_BUFFER_INTERVAL` (기본: `2.0`)
- `OUTPUT_SETTLE_SECONDS` (기본: `4.0`, 출력이 잠잠해진 뒤 전송 대기 시간)
- `OUTPUT_FLUSH_INTERVAL_SECONDS` (기본: `15.0`, 출력이 계속 이어질 때 강제 전송 주기)
- `MAX_MESSAGE_LENGTH` (기본: `3000`)
- `RECONNECT_DELAY_SECONDS` (기본: `5.0`, Socket Mode 재연결 대기 시간)
- `OUTPUT_IDLE_REPORT_SECONDS` (기본: `120`, 출력 정지 보고 임계값)
@@ -53,10 +55,8 @@ cp .env.example .env
- Socket Mode 활성화
- Bot Token Scopes에 `chat:write`, `commands` 추가
- Slash Commands 생성:
- `/start-claude`
- `/stop-claude`
- `/start-codex`
- `/stop-codex`
- `/start`
- `/stop`
- 앱을 워크스페이스에 설치 후 토큰을 `.env`에 반영
## 실행
@@ -78,9 +78,10 @@ python -m lazy_enter
```
실행 후 Slack의 허용된 채널에서:
- `/start-claude`, `/start-codex`: 기존 세션에 연결
- 일반 메시지 전송: 현재 연결된 CLI(Claude/Codex)로 입력 전달
- `/stop-claude`, `/stop-codex`: 브릿지 연결 해제 (세션 유지)
- `/start claude`, `/start codex`: 기존 세션에 연결
- 일반 메시지 전송: 현재 연결된 CLI(Claude/Codex)로 입력 전달 (엔터 미포함)
- `!e`, `!enter` 전송: 엔터 키만 전달 (현재 프롬프트 제출)
- `/stop claude`, `/stop codex`: 브릿지 연결 해제 (세션 유지)
## 테스트 및 품질 점검

View File

@@ -18,6 +18,8 @@ logger = logging.getLogger(__name__)
class Bridge:
"""Slack ↔ CLI 프로세스 간의 중계기."""
ENTER_COMMANDS = {"!e", "!enter"}
def __init__(self, config: Config | None = None) -> None:
self.config = config or Config()
self.slack = SlackHandler(self.config)
@@ -27,8 +29,10 @@ class Bridge:
self._channel: str = self.config.allowed_channel_id
self._active_target: str | None = None
self._last_sent_output: str = ""
self._last_sent_fingerprint: str | None = None
self._last_input_at = time.monotonic()
self._last_output_at = time.monotonic()
self._output_buffer_started_at: float | None = None
self._input_idle_reported = False
self._output_idle_reported = False
@@ -37,19 +41,32 @@ class Bridge:
def _handle_message(self, text: str, channel: str) -> None:
"""Slack 메시지를 PTY 프로세스로 전달한다."""
if not self.pty or not self.pty.is_alive:
self.slack.send_message(channel, ":warning: 연결된 세션이 없습니다.")
return
if text.strip().lower() in self.ENTER_COMMANDS:
self.pty.send_enter()
self._last_sent_output = ""
self._last_sent_fingerprint = None
self._last_input_at = time.monotonic()
self._input_idle_reported = False
logger.info("엔터 입력 전달")
return
if self._is_blocked_input(text):
self.slack.send_message(
channel, ":no_entry: 차단된 명령 패턴이 감지되었습니다."
)
return
if self.pty and self.pty.is_alive:
self.pty.send(text)
# 입력 이후 출력은 동일 문자열이어도 한 번 더 전달한다.
self._last_sent_output = ""
self._last_sent_fingerprint = None
self._last_input_at = time.monotonic()
self._input_idle_reported = False
logger.info("입력 전달: %s", text)
else:
self.slack.send_message(channel, ":warning: 연결된 세션이 없습니다.")
logger.info("입력 전달(엔터 미포함): %s", text)
@staticmethod
def _is_blocked_input(text: str) -> bool:
@@ -79,11 +96,10 @@ class Bridge:
def _handle_command(self, action: str, target: str, channel: str) -> None:
"""슬래시 커맨드를 처리한다."""
if action == "start":
if target not in {"claude", "codex"}:
self.slack.send_message(channel, ":warning: 지원하지 않는 대상입니다.")
return
if action == "start":
self._start_session(channel, target)
elif action == "stop":
self._stop_session(channel)
@@ -99,6 +115,7 @@ class Bridge:
self._channel = channel
self._active_target = target
self._last_sent_output = ""
self._last_sent_fingerprint = None
self.pty = PtyManager(
self._session_name_for_target(target),
cli_name=target,
@@ -113,6 +130,7 @@ class Bridge:
self._running = True
self._last_input_at = time.monotonic()
self._last_output_at = time.monotonic()
self._output_buffer_started_at = None
self._input_idle_reported = False
self._output_idle_reported = False
self._output_thread = threading.Thread(target=self._poll_output, daemon=True)
@@ -131,6 +149,8 @@ class Bridge:
self.pty = None
self._active_target = None
self._last_sent_output = ""
self._last_sent_fingerprint = None
self._output_buffer_started_at = None
self.slack.send_message(channel, ":electric_plug: 세션 연결이 해제되었습니다.")
def _poll_output(self) -> None:
@@ -138,23 +158,27 @@ class Bridge:
buffer = ""
while self._running and self.pty and self.pty.is_alive:
now = time.monotonic()
output = self.pty.read_output(timeout=self.config.pty_read_timeout)
if buffer and self._should_flush_output_buffer(now):
self._send_output_chunks(buffer)
buffer = ""
self._output_buffer_started_at = None
read_timeout = self._next_read_timeout(now, has_buffer=bool(buffer))
output = self.pty.read_output(timeout=read_timeout)
now = time.monotonic()
if output:
cleaned = clean_terminal_output(output)
if cleaned:
buffer += f"{cleaned}\n"
self._last_output_at = now
if self._output_buffer_started_at is None:
self._output_buffer_started_at = now
self._output_idle_reported = False
if buffer:
# 메시지 길이 제한 적용
message = buffer[: self.config.max_message_length].rstrip()
if len(buffer) > self.config.max_message_length:
message += "\n... (truncated)"
if message and message != self._last_sent_output:
self.slack.send_message(self._channel, f"```\n{message}\n```")
self._last_sent_output = message
if buffer and self._should_flush_output_buffer(now):
self._send_output_chunks(buffer)
buffer = ""
self._output_buffer_started_at = None
output_idle = now - self._last_output_at
if (
@@ -186,11 +210,128 @@ class Bridge:
time.sleep(self.config.output_buffer_interval)
if not self._running:
self._output_buffer_started_at = None
return
if buffer:
self._send_output_chunks(buffer)
self._output_buffer_started_at = None
# attach 프로세스가 예기치 않게 종료된 경우
self.slack.send_message(self._channel, ":warning: 세션 연결이 종료되었습니다.")
def _next_read_timeout(self, now: float, has_buffer: bool) -> float:
"""다음 PTY 읽기 타임아웃을 계산한다."""
base_timeout = max(0.0, float(self.config.pty_read_timeout))
if not has_buffer:
return base_timeout
deadline = self._next_output_flush_deadline()
if deadline is None:
return base_timeout
remaining = max(0.0, deadline - now)
return min(base_timeout, remaining)
def _next_output_flush_deadline(self) -> float | None:
"""버퍼 flush의 가장 이른 데드라인을 반환한다."""
deadlines: list[float] = []
settle_seconds = max(0.0, self.config.output_settle_seconds)
if settle_seconds > 0:
deadlines.append(self._last_output_at + settle_seconds)
flush_interval_seconds = max(0.0, self.config.output_flush_interval_seconds)
if self._output_buffer_started_at is not None:
if flush_interval_seconds == 0:
return self._output_buffer_started_at
deadlines.append(self._output_buffer_started_at + flush_interval_seconds)
if not deadlines:
return None
return min(deadlines)
def _should_flush_output_buffer(self, now: float) -> bool:
"""버퍼를 Slack으로 전송할 시점을 계산한다."""
deadline = self._next_output_flush_deadline()
return deadline is not None and now >= deadline
@staticmethod
def _split_message(text: str, max_length: int) -> list[str]:
"""긴 텍스트를 메시지 길이 제한에 맞게 분할한다."""
if max_length <= 0:
return [text] if text else []
lines = text.splitlines(keepends=True)
chunks: list[str] = []
current = ""
def flush() -> None:
nonlocal current
if current:
chunks.append(current)
current = ""
for line in lines:
if len(line) > max_length:
flush()
start = 0
while start < len(line):
piece = line[start : start + max_length]
chunks.append(piece)
start += max_length
continue
if len(current) + len(line) > max_length:
flush()
current += line
flush()
return chunks
def _send_output_chunks(self, text: str) -> None:
"""출력을 잘라서 Slack으로 순차 전송한다."""
chunks = self._split_message(text, self.config.max_message_length)
snapshot = "\n\x00".join(chunks)
fingerprint = self._output_fingerprint(snapshot)
if not chunks:
return
if (
self._last_sent_fingerprint is not None
and fingerprint == self._last_sent_fingerprint
):
return
for chunk in chunks:
if chunk.endswith("\n"):
message = f"```\n{chunk}```"
else:
message = f"```\n{chunk}\n```"
self.slack.send_message(self._channel, message)
self._last_sent_output = snapshot
self._last_sent_fingerprint = fingerprint
@staticmethod
def _output_fingerprint(text: str) -> str:
"""중복 전송 억제를 위한 정규화 지문을 생성한다."""
normalized_lines: list[str] = []
for raw_line in text.splitlines():
line = raw_line.rstrip()
if not line.strip():
continue
# tmux 상태줄의 시계/날짜 라인은 프레임마다 변할 수 있어 제외한다.
if re.fullmatch(
(
r'\s*\w*odex\]\s+\d+:[^\[]*'
r'\[\d+,\d+\]\s+".+"\s+\d{2}:\d{2}\s+\d{2}-[A-Za-z]{3}-\d{2}'
),
line,
):
continue
normalized_lines.append(line)
return "\n".join(normalized_lines)
def run(self) -> None:
"""브릿지를 시작한다."""
logging.basicConfig(

View File

@@ -25,6 +25,10 @@ class Config:
# Buffer
output_buffer_interval: float = float(os.getenv("OUTPUT_BUFFER_INTERVAL", "2.0"))
output_settle_seconds: float = float(os.getenv("OUTPUT_SETTLE_SECONDS", "4.0"))
output_flush_interval_seconds: float = float(
os.getenv("OUTPUT_FLUSH_INTERVAL_SECONDS", "15.0")
)
max_message_length: int = int(os.getenv("MAX_MESSAGE_LENGTH", "3000"))
# Status reporting / reconnect

View File

@@ -15,18 +15,67 @@ _ANSI_ESCAPE_RE = re.compile(
# ESC 문자가 유실된 상태로 남는 CSI 토큰(예: [38;2;153;153;153m) 제거.
_BROKEN_CSI_RE = re.compile(r"\[(?:[0-9;<=>?]|(?:\d+;))*\d*[A-Za-z]")
# ESC(B 같은 문자셋 지정 시퀀스가 ESC 유실 후 남긴 토큰 제거.
_BROKEN_CHARSET_RE = re.compile(r"[\(\)][B0]")
# C0 제어문자 중 개행/탭/캐리지리턴을 제외하고 제거.
_CONTROL_RE = re.compile(r"[\x00-\x08\x0B-\x1F\x7F]")
_CODEX_TUI_LINE_PATTERNS = (
re.compile(r"openai codex", re.IGNORECASE),
re.compile(r"/model to change", re.IGNORECASE),
re.compile(r"^\s*[|│]\s*model:", re.IGNORECASE),
re.compile(r"^\s*[|│]\s*directory:", re.IGNORECASE),
re.compile(r"^\s*tip:\s*new\s+codex", re.IGNORECASE),
re.compile(r"find and fix a bug in @filename", re.IGNORECASE),
re.compile(r"\?\s*for shortcuts", re.IGNORECASE),
re.compile(r"\bcontext left\b", re.IGNORECASE),
re.compile(r"^\s*\w*odex\]\s+\d+:", re.IGNORECASE),
)
def _is_box_border_line(line: str) -> bool:
stripped = line.strip()
if not stripped:
return False
# ESC(B 유실 잔재가 앞에 붙는 경우(예: =╭────╮)도 border로 본다.
stripped = stripped.lstrip("= ")
return all(ch in "╭╮╰╯─│┌┐└┘├┤┬┴┼" for ch in stripped)
def _is_box_empty_line(line: str) -> bool:
stripped = line.strip()
if not stripped:
return False
return bool(re.fullmatch(r"[│|]\s+[│|]", stripped))
def _is_noise_line(line: str) -> bool:
if _is_box_border_line(line):
return True
if _is_box_empty_line(line):
return True
return any(pattern.search(line) for pattern in _CODEX_TUI_LINE_PATTERNS)
def clean_terminal_output(text: str) -> str:
"""Slack 전송용 텍스트를 정리한다."""
cleaned = _ANSI_ESCAPE_RE.sub("", text)
cleaned = _BROKEN_CSI_RE.sub("", cleaned)
cleaned = _BROKEN_CHARSET_RE.sub("", cleaned)
cleaned = cleaned.replace("\r", "")
cleaned = _CONTROL_RE.sub("", cleaned)
# 제어 코드 제거 후 남는 빈 줄을 축소한다.
lines = [line.rstrip() for line in cleaned.splitlines()]
lines: list[str] = []
for raw_line in cleaned.splitlines():
line = raw_line.rstrip()
if not line:
continue
if _is_noise_line(line):
continue
if lines and lines[-1] == line:
continue
lines.append(line)
compact = "\n".join(lines).strip()
return compact

View File

@@ -47,15 +47,22 @@ class PtyManager:
timeout=None,
)
def send(self, text: str) -> None:
def send(self, text: str, submit: bool = False) -> None:
"""프로세스에 텍스트 입력을 전달한다."""
if not self.is_alive:
raise RuntimeError("프로세스가 실행 중이 아닙니다.")
assert self._process is not None
logger.debug("입력 전송: %s", text)
if submit:
self._process.sendline(text)
return
self._process.send(text)
def read_output(self, timeout: int = 5) -> str:
def send_enter(self) -> None:
"""엔터 키 입력만 전송한다."""
self.send("", submit=True)
def read_output(self, timeout: float = 5) -> str:
"""프로세스의 출력을 읽는다."""
if not self.is_alive:
raise RuntimeError("프로세스가 실행 중이 아닙니다.")

View File

@@ -58,8 +58,31 @@ class SlackHandler:
if self._on_message_callback:
self._on_message_callback(text, channel_id)
@self.app.command("/start-claude")
def handle_start_claude(ack: Callable[..., None], body: dict) -> None:
@self.app.command("/start")
def handle_start(ack: Callable[..., None], body: dict) -> None:
ack()
user_id = body.get("user_id", "")
channel_id = body.get("channel_id", "")
target = self._parse_target(body.get("text", ""))
if not self._is_authorized(user_id, channel_id):
return
if target is None:
self.send_message(
channel_id,
(
":warning: 대상은 `claude` 또는 `codex`여야 합니다. "
"예: `/start codex`"
),
)
return
if self._on_command_callback:
self._on_command_callback("start", target, channel_id)
@self.app.command("/stop")
def handle_stop(ack: Callable[..., None], body: dict) -> None:
ack()
user_id = body.get("user_id", "")
channel_id = body.get("channel_id", "")
@@ -68,43 +91,15 @@ class SlackHandler:
return
if self._on_command_callback:
self._on_command_callback("start", "claude", channel_id)
self._on_command_callback("stop", "", channel_id)
@self.app.command("/stop-claude")
def handle_stop_claude(ack: Callable[..., None], body: dict) -> None:
ack()
user_id = body.get("user_id", "")
channel_id = body.get("channel_id", "")
if not self._is_authorized(user_id, channel_id):
return
if self._on_command_callback:
self._on_command_callback("stop", "claude", channel_id)
@self.app.command("/start-codex")
def handle_start_codex(ack: Callable[..., None], body: dict) -> None:
ack()
user_id = body.get("user_id", "")
channel_id = body.get("channel_id", "")
if not self._is_authorized(user_id, channel_id):
return
if self._on_command_callback:
self._on_command_callback("start", "codex", channel_id)
@self.app.command("/stop-codex")
def handle_stop_codex(ack: Callable[..., None], body: dict) -> None:
ack()
user_id = body.get("user_id", "")
channel_id = body.get("channel_id", "")
if not self._is_authorized(user_id, channel_id):
return
if self._on_command_callback:
self._on_command_callback("stop", "codex", channel_id)
@staticmethod
def _parse_target(text: str) -> str | None:
"""슬래시 커맨드 인자에서 대상을 파싱한다."""
first_token = text.strip().split(maxsplit=1)[0].lower() if text.strip() else ""
if first_token in {"claude", "codex"}:
return first_token
return None
def on_message(self, callback: Callable[[str, str], None]) -> None:
"""메시지 수신 콜백을 등록한다."""

View File

@@ -45,6 +45,8 @@ class FakePtyManager:
self.session_name = session_name
self.cli_name = cli_name
self._alive = False
self.sent_inputs: list[str] = []
self.enter_count = 0
FakePtyManager.instances.append(self)
@property
@@ -57,10 +59,13 @@ class FakePtyManager:
def stop(self) -> None:
self._alive = False
def send(self, _text: str) -> None:
return None
def send(self, text: str) -> None:
self.sent_inputs.append(text)
def read_output(self, timeout: int = 5) -> str:
def send_enter(self) -> None:
self.enter_count += 1
def read_output(self, timeout: float = 5) -> str:
return ""
@@ -104,6 +109,18 @@ def test_start_codex_routes_to_codex_session(monkeypatch) -> None:
)
def test_start_session_resets_output_dedup_state(monkeypatch) -> None:
FakePtyManager.instances.clear()
bridge = _make_bridge(monkeypatch)
bridge._last_sent_output = "stale"
bridge._last_sent_fingerprint = "stale-fp"
bridge._handle_command("start", "codex", "C1")
assert bridge._last_sent_output == ""
assert bridge._last_sent_fingerprint is None
def test_unknown_target_is_rejected(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
@@ -113,3 +130,330 @@ def test_unknown_target_is_rejected(monkeypatch) -> None:
"C1",
":warning: 지원하지 않는 대상입니다.",
)
def test_stop_command_ignores_empty_target(monkeypatch) -> None:
FakePtyManager.instances.clear()
bridge = _make_bridge(monkeypatch)
bridge._handle_command("start", "codex", "C1")
bridge._handle_command("stop", "", "C1")
assert bridge.pty is None
assert bridge._active_target is None
assert bridge.slack.sent_messages[-1] == (
"C1",
":electric_plug: 세션 연결이 해제되었습니다.",
)
def test_handle_message_resets_last_sent_output_after_input(monkeypatch) -> None:
FakePtyManager.instances.clear()
bridge = _make_bridge(monkeypatch)
bridge._handle_command("start", "codex", "C1")
pty = FakePtyManager.instances[-1]
bridge._last_sent_output = "previous output"
bridge._handle_message("status", "C1")
assert pty.sent_inputs[-1] == "status"
assert bridge._last_sent_output == ""
assert bridge._last_sent_fingerprint is None
def test_handle_message_enter_command_sends_enter_only(monkeypatch) -> None:
FakePtyManager.instances.clear()
bridge = _make_bridge(monkeypatch)
bridge._handle_command("start", "codex", "C1")
pty = FakePtyManager.instances[-1]
bridge._handle_message("!enter", "C1")
assert pty.sent_inputs == []
assert pty.enter_count == 1
def test_handle_message_short_enter_alias_sends_enter_only(monkeypatch) -> None:
FakePtyManager.instances.clear()
bridge = _make_bridge(monkeypatch)
bridge._handle_command("start", "codex", "C1")
pty = FakePtyManager.instances[-1]
bridge._handle_message("!e", "C1")
assert pty.sent_inputs == []
assert pty.enter_count == 1
def test_handle_message_bang_is_plain_input(monkeypatch) -> None:
FakePtyManager.instances.clear()
bridge = _make_bridge(monkeypatch)
bridge._handle_command("start", "codex", "C1")
pty = FakePtyManager.instances[-1]
bridge._handle_message("!", "C1")
assert pty.sent_inputs == ["!"]
assert pty.enter_count == 0
def test_split_message_preserves_all_content(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
chunks = bridge._split_message("line1\nline2\nline3", max_length=7)
assert "".join(chunks) == "line1\nline2\nline3"
def test_split_message_preserves_trailing_whitespace_and_blank_line(
monkeypatch,
) -> None:
bridge = _make_bridge(monkeypatch)
text = "ab \n\ncd "
chunks = bridge._split_message(text, max_length=4)
assert "".join(chunks) == text
def test_send_output_chunks_sends_multiple_messages(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
bridge._channel = "C1"
bridge._send_output_chunks("1234567890")
assert bridge.slack.sent_messages == [
("C1", "```\n1234567890\n```"),
]
bridge.config.max_message_length = 4
bridge.slack.sent_messages.clear()
bridge._send_output_chunks("abcdefghij")
assert bridge.slack.sent_messages == [
("C1", "```\nabcd\n```"),
("C1", "```\nefgh\n```"),
("C1", "```\nij\n```"),
]
def test_send_output_chunks_skips_duplicate_snapshot(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
bridge._channel = "C1"
bridge.config.max_message_length = 4
bridge._send_output_chunks("abcdefghij")
first = list(bridge.slack.sent_messages)
bridge._send_output_chunks("abcdefghij")
assert bridge.slack.sent_messages == first
def test_send_output_chunks_skips_duplicate_with_volatile_status_line(
monkeypatch,
) -> None:
bridge = _make_bridge(monkeypatch)
bridge._channel = "C1"
bridge.config.max_message_length = 1000
first_output = (
"Would you like to make the following edits?\n"
"src/main.py (+68 -1)\n"
'odex] 0:node* [0,0] "jihoson-home" 05:12 17-Feb-26\n'
"Press enter to confirm or esc to cancel\n"
)
second_output = (
"Would you like to make the following edits?\n"
"src/main.py (+68 -1)\n"
'odex] 0:node* [0,0] "jihoson-home" 05:13 17-Feb-26\n'
"Press enter to confirm or esc to cancel\n"
)
bridge._send_output_chunks(first_output)
first = list(bridge.slack.sent_messages)
bridge._send_output_chunks(second_output)
assert bridge.slack.sent_messages == first
def test_send_output_chunks_sends_first_when_fingerprint_is_empty(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
bridge._channel = "C1"
volatile_only = 'odex] 0:node* [0,0] "jihoson-home" 05:12 17-Feb-26\n'
bridge._send_output_chunks(volatile_only)
assert bridge.slack.sent_messages == [("C1", f"```\n{volatile_only.rstrip()}\n```")]
bridge._send_output_chunks(volatile_only)
assert bridge.slack.sent_messages == [("C1", f"```\n{volatile_only.rstrip()}\n```")]
def test_send_output_chunks_keeps_non_tmux_timestamp_lines(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
bridge._channel = "C1"
bridge.config.max_message_length = 1000
first_output = (
"report generated at 05:12 17-Feb-26\n"
"count=10\n"
)
second_output = (
"report generated at 05:13 17-Feb-26\n"
"count=10\n"
)
bridge._send_output_chunks(first_output)
bridge._send_output_chunks(second_output)
assert bridge.slack.sent_messages == [
("C1", f"```\n{first_output.rstrip()}\n```"),
("C1", f"```\n{second_output.rstrip()}\n```"),
]
def test_send_output_chunks_preserves_whitespace_signals(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
bridge._channel = "C1"
bridge.config.max_message_length = 1000
first_output = "def f():\n return 1\n"
second_output = "def f():\n return 1\n"
bridge._send_output_chunks(first_output)
bridge._send_output_chunks(second_output)
assert bridge.slack.sent_messages == [
("C1", f"```\n{first_output.rstrip()}\n```"),
("C1", f"```\n{second_output.rstrip()}\n```"),
]
def test_send_output_chunks_keeps_standalone_time_lines(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
bridge._channel = "C1"
bridge.config.max_message_length = 1000
bridge._send_output_chunks("[5:12 AM]\n")
bridge._send_output_chunks("[5:13 AM]\n")
assert bridge.slack.sent_messages == [
("C1", "```\n[5:12 AM]\n```"),
("C1", "```\n[5:13 AM]\n```"),
]
def test_send_output_chunks_keeps_non_tmux_status_like_lines(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
bridge._channel = "C1"
bridge.config.max_message_length = 1000
first_output = '[2,3] "job-runner" 05:12 17-Feb-26\n'
second_output = '[2,3] "job-runner" 05:13 17-Feb-26\n'
bridge._send_output_chunks(first_output)
bridge._send_output_chunks(second_output)
assert bridge.slack.sent_messages == [
("C1", "```\n[2,3] \"job-runner\" 05:12 17-Feb-26\n```"),
("C1", "```\n[2,3] \"job-runner\" 05:13 17-Feb-26\n```"),
]
def test_should_flush_output_buffer_when_settled(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
bridge.config.output_settle_seconds = 4.0
bridge.config.output_flush_interval_seconds = 15.0
bridge._last_output_at = 10.0
bridge._output_buffer_started_at = 2.0
assert bridge._should_flush_output_buffer(14.1) is True
def test_should_flush_output_buffer_when_flush_interval_elapsed(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
bridge.config.output_settle_seconds = 4.0
bridge.config.output_flush_interval_seconds = 15.0
bridge._last_output_at = 20.0
bridge._output_buffer_started_at = 2.0
assert bridge._should_flush_output_buffer(17.1) is True
def test_should_flush_output_buffer_false_during_active_stream(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
bridge.config.output_settle_seconds = 4.0
bridge.config.output_flush_interval_seconds = 15.0
bridge._last_output_at = 19.0
bridge._output_buffer_started_at = 10.0
assert bridge._should_flush_output_buffer(20.0) is False
def test_poll_output_skips_final_flush_after_intentional_stop(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
bridge._channel = "C1"
bridge.config.output_settle_seconds = 9999.0
bridge.config.output_flush_interval_seconds = 9999.0
bridge.config.output_buffer_interval = 0.0
pty = FakePtyManager("codex-room", cli_name="codex")
pty._alive = True
bridge.pty = pty
bridge._running = True
sent_buffers: list[str] = []
monkeypatch.setattr(bridge, "_send_output_chunks", sent_buffers.append)
def _read_output(timeout: float = 5) -> str:
bridge._running = False
return "planning update"
monkeypatch.setattr(pty, "read_output", _read_output)
bridge._poll_output()
assert sent_buffers == []
def test_next_read_timeout_is_capped_by_flush_deadline(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
bridge.config.pty_read_timeout = 5
bridge.config.output_settle_seconds = 4.0
bridge.config.output_flush_interval_seconds = 15.0
bridge._last_output_at = 100.0
bridge._output_buffer_started_at = 95.0
timeout = bridge._next_read_timeout(103.6, has_buffer=True)
assert 0.39 <= timeout <= 0.41
def test_poll_output_uses_shorter_timeout_near_settle_deadline(monkeypatch) -> None:
bridge = _make_bridge(monkeypatch)
bridge._channel = "C1"
bridge.config.pty_read_timeout = 5
bridge.config.output_settle_seconds = 4.0
bridge.config.output_flush_interval_seconds = 15.0
bridge.config.output_buffer_interval = 0.0
bridge.config.output_idle_report_seconds = 0
bridge.config.input_idle_report_seconds = 0
pty = FakePtyManager("codex-room", cli_name="codex")
pty._alive = True
bridge.pty = pty
bridge._running = True
observed_timeouts: list[float] = []
call_count = 0
def _read_output(timeout: float = 5) -> str:
nonlocal call_count
observed_timeouts.append(timeout)
if call_count == 0:
call_count += 1
return "first chunk"
bridge._running = False
return ""
monkeypatch.setattr(pty, "read_output", _read_output)
bridge._poll_output()
assert len(observed_timeouts) == 2
assert observed_timeouts[0] == 5
assert 0.0 <= observed_timeouts[1] < 5

View File

@@ -11,6 +11,8 @@ def test_config_defaults():
assert config.codex_tmux_session_name == "codex"
assert config.pty_read_timeout == 5
assert config.output_buffer_interval == 2.0
assert config.output_settle_seconds == 4.0
assert config.output_flush_interval_seconds == 15.0
assert config.max_message_length == 3000
assert config.reconnect_delay_seconds == 5.0
assert config.output_idle_report_seconds == 120

View File

@@ -16,3 +16,28 @@ def test_clean_terminal_output_removes_broken_csi_tokens():
def test_clean_terminal_output_preserves_plain_text():
raw = "line1\nline2"
assert clean_terminal_output(raw) == "line1\nline2"
def test_clean_terminal_output_removes_broken_charset_tokens():
raw = "=(Bhello(B"
assert clean_terminal_output(raw) == "=hello"
def test_clean_terminal_output_filters_codex_tui_redraw_noise():
raw = """=(B╭─────────────────────────────────────────────╮(B
│ >_ OpenAI Codex (v0.101.0) │(B
│ model: gpt-5.3-codex /model to change │(B
│ directory: ~/repos/The-Ouroboros │(B
╰─────────────────────────────────────────────╯(B
Tip: New Codex is included in your plan for free
Find and fix a bug in @filename
? for shortcuts
100% context left
Assistant: 작업을 시작합니다.
"""
assert clean_terminal_output(raw) == "Assistant: 작업을 시작합니다."
def test_clean_terminal_output_removes_box_empty_line_noise():
raw = "│ │\n Explain this codebase\n"
assert clean_terminal_output(raw) == " Explain this codebase"

View File

@@ -13,11 +13,18 @@ class FakeSpawn:
def __init__(self, *_args: object, **_kwargs: object) -> None:
self.before = ""
self._alive = True
self.sent: list[str] = []
self.sentline: list[str] = []
def isalive(self) -> bool:
return self._alive
def sendline(self, _text: str) -> None:
self.sentline.append(_text)
return None
def send(self, _text: str) -> None:
self.sent.append(_text)
return None
def expect(self, *_args: object, **_kwargs: object) -> None:
@@ -82,3 +89,33 @@ def test_start_and_stop_attach(monkeypatch: pytest.MonkeyPatch):
pty.stop()
assert not pty.is_alive
def test_send_and_send_enter_are_separated(monkeypatch: pytest.MonkeyPatch):
def fake_run(
_cmd: Sequence[str],
check: bool,
capture_output: bool,
text: bool,
):
assert check is False
assert capture_output is True
assert text is True
class Result:
returncode = 0
return Result()
monkeypatch.setattr("lazy_enter.pty_manager.subprocess.run", fake_run)
monkeypatch.setattr("lazy_enter.pty_manager.pexpect.spawn", FakeSpawn)
pty = PtyManager("claude")
pty.start()
assert pty._process is not None
pty.send("status")
pty.send_enter()
assert pty._process.sent == ["status"]
assert pty._process.sentline == [""]

168
tests/test_slack_handler.py Normal file
View File

@@ -0,0 +1,168 @@
"""Slack 슬래시 커맨드 파싱 테스트."""
from __future__ import annotations
from collections.abc import Callable
from lazy_enter.config import Config
class FakeClient:
def __init__(self) -> None:
self.messages: list[dict[str, str | None]] = []
def chat_postMessage( # noqa: N802
self, channel: str, text: str, thread_ts: str | None = None
) -> None:
self.messages.append(
{
"channel": channel,
"text": text,
"thread_ts": thread_ts,
}
)
class FakeApp:
def __init__(self, token: str) -> None:
self.token = token
self.client = FakeClient()
self.command_handlers: dict[str, Callable[..., None]] = {}
self.event_handlers: dict[str, Callable[..., None]] = {}
def event(self, name: str) -> Callable[[Callable[..., None]], Callable[..., None]]:
def decorator(func: Callable[..., None]) -> Callable[..., None]:
self.event_handlers[name] = func
return func
return decorator
def command(
self, name: str
) -> Callable[[Callable[..., None]], Callable[..., None]]:
def decorator(func: Callable[..., None]) -> Callable[..., None]:
self.command_handlers[name] = func
return func
return decorator
class FakeSocketModeHandler:
def __init__(self, app: FakeApp, app_token: str) -> None:
self.app = app
self.app_token = app_token
def start(self) -> None:
return None
def close(self) -> None:
return None
def _build_handler(monkeypatch):
from lazy_enter import slack_handler as slack_handler_module
monkeypatch.setattr(slack_handler_module, "App", FakeApp)
monkeypatch.setattr(
slack_handler_module, "SocketModeHandler", FakeSocketModeHandler
)
config = Config()
config.allowed_user_id = "U1"
config.allowed_channel_id = "C1"
return slack_handler_module.SlackHandler(config)
def _capture_commands(handler) -> list[tuple[str, str, str]]:
called: list[tuple[str, str, str]] = []
def callback(action: str, target: str, channel: str) -> None:
called.append((action, target, channel))
handler.on_command(callback)
return called
def test_start_command_routes_target_from_text(monkeypatch) -> None:
handler = _build_handler(monkeypatch)
called = _capture_commands(handler)
acked: list[bool] = []
start_handler = handler.app.command_handlers["/start"]
start_handler(
lambda: acked.append(True),
{"user_id": "U1", "channel_id": "C1", "text": "codex"},
)
assert acked == [True]
assert called == [("start", "codex", "C1")]
def test_start_command_normalizes_target(monkeypatch) -> None:
handler = _build_handler(monkeypatch)
called = _capture_commands(handler)
start_handler = handler.app.command_handlers["/start"]
start_handler(
lambda: None,
{"user_id": "U1", "channel_id": "C1", "text": " Claude now "},
)
assert called == [("start", "claude", "C1")]
def test_stop_command_ignores_target_text(monkeypatch) -> None:
handler = _build_handler(monkeypatch)
called = _capture_commands(handler)
stop_handler = handler.app.command_handlers["/stop"]
stop_handler(
lambda: None,
{"user_id": "U1", "channel_id": "C1", "text": "claude"},
)
assert called == [("stop", "", "C1")]
def test_stop_command_without_target_routes_immediately(monkeypatch) -> None:
handler = _build_handler(monkeypatch)
called = _capture_commands(handler)
stop_handler = handler.app.command_handlers["/stop"]
stop_handler(
lambda: None,
{"user_id": "U1", "channel_id": "C1", "text": ""},
)
assert called == [("stop", "", "C1")]
assert handler.app.client.messages == []
def test_invalid_target_sends_warning_message(monkeypatch) -> None:
handler = _build_handler(monkeypatch)
called = _capture_commands(handler)
start_handler = handler.app.command_handlers["/start"]
start_handler(
lambda: None,
{"user_id": "U1", "channel_id": "C1", "text": "gpt"},
)
assert called == []
assert handler.app.client.messages[-1]["channel"] == "C1"
assert "claude" in str(handler.app.client.messages[-1]["text"])
assert "codex" in str(handler.app.client.messages[-1]["text"])
def test_unauthorized_command_is_ignored(monkeypatch) -> None:
handler = _build_handler(monkeypatch)
called = _capture_commands(handler)
start_handler = handler.app.command_handlers["/start"]
start_handler(
lambda: None,
{"user_id": "U2", "channel_id": "C1", "text": "codex"},
)
assert called == []
assert handler.app.client.messages == []