Files
bot-bottle/bot_bottle/cli/supervise.py
T
didericis-claude c12fc1c098
lint / lint (push) Successful in 1m22s
test / unit (pull_request) Successful in 30s
test / integration (pull_request) Successful in 41s
feat(supervise)!: remove egress-block MCP tool and runtime route-mutation
Drops `egress-block` from the supervise sidecar, removes
`_merge_single_route`, `add_route`, and `apply_routes_change` from
egress_apply.py, and strips the proposal/approve/reject flow for egress
from the supervise CLI. The list-egress-routes and capability-block tools
are unaffected. Tests updated throughout.

Closes #198
2026-06-06 16:19:55 -04:00

516 lines
16 KiB
Python

"""supervise: list pending supervise proposals across all bottles and
act on them (approve / modify / reject).
Curses-based TUI; modify-then-approve shells out to $EDITOR. The
approval handler wires to PRD 0016 (capability-block), which rebuilds
the bottle Dockerfile. The egress-block tool was removed in issue #198.
"""
from __future__ import annotations
import argparse
import curses
import os
import subprocess
import sys
import tempfile
import traceback
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from .. import supervise as _supervise
from ..backend.docker.bottle_state import read_metadata
from ..backend.docker.capability_apply import (
CapabilityApplyError,
apply_capability_change,
)
from ..log import Die, error, info
from ..supervise import (
COMPONENT_FOR_TOOL,
AuditEntry,
Proposal,
Response,
STATUS_APPROVED,
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
archive_proposal,
list_pending_proposals,
render_diff,
write_audit_entry,
write_response,
)
from ._common import PROG
# Milliseconds handed to stdscr.timeout() by _main_loop: getch() gives up
# after this long so the loop can re-scan the queue and redraw.
_REFRESH_INTERVAL_MS = 1000
@dataclass(frozen=True)
class QueuedProposal:
    """A pending proposal plus the queue dir it was found in."""
    # The pending proposal, as yielded by list_pending_proposals().
    proposal: Proposal
    # Per-bottle queue directory the proposal came from; responses and
    # archiving for this proposal are addressed to the same directory.
    queue_dir: Path
# Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses.
# Kept as a tuple so it can be used directly as `except ApplyError:`.
ApplyError = (CapabilityApplyError,)
def discover_pending() -> list[QueuedProposal]:
    """Collect every pending proposal under `<bot-bottle root>/queue`.

    Each sub-directory of the queue root is scanned (in sorted order) with
    `list_pending_proposals`; non-directories are skipped.

    Returns:
        The proposals, each paired with its queue directory, ordered by
        arrival timestamp. Empty when the queue root does not exist.
    """
    root = _supervise.bot_bottle_root() / "queue"
    if not root.is_dir():
        return []
    found = [
        QueuedProposal(proposal=item, queue_dir=bottle_dir)
        for bottle_dir in sorted(root.iterdir())
        if bottle_dir.is_dir()
        for item in list_pending_proposals(bottle_dir)
    ]
    # sorted() is stable, so ties keep the directory scan order.
    return sorted(found, key=lambda entry: entry.proposal.arrival_timestamp)
def _approval_status(qp: QueuedProposal, verb: str) -> str:
    """Build the status-line message shown after a successful approval.

    Args:
        qp: The proposal that was approved.
        verb: Leading word(s) describing the action, e.g. "approved".

    Returns:
        The message, ending with the resume command hint for the bottle.
    """
    slug = qp.proposal.bottle_slug
    return (
        f"{verb} {qp.proposal.tool} for [{slug}]"
        f"; resume: ./cli.py resume {slug}"
    )
def _detail_lines(
    qp: QueuedProposal,
    *,
    green_attr: int = 0,
) -> list[tuple[str, int]]:
    """Build the detail-view body as (text, curses-attr) rows.

    Args:
        qp: The proposal to describe.
        green_attr: Accepted for interface compatibility; not used here,
            every row is emitted with attribute 0.

    Returns:
        Header rows, then the justification (one row per line, indented),
        then the proposed file (one row per line). An empty justification
        or file still contributes a single row.
    """
    proposal = qp.proposal
    texts = [
        f"bottle: {proposal.bottle_slug}",
        f"tool: {proposal.tool}",
        f"id: {proposal.id}",
        f"arrived: {proposal.arrival_timestamp}",
        f"queue: {qp.queue_dir}",
        "",
        "justification:",
    ]
    for row in proposal.justification.splitlines() or [""]:
        texts.append(" " + row)
    texts += ["", "proposed file:"]
    texts += proposal.proposed_file.splitlines() or [""]
    return [(text, 0) for text in texts]
def _suffix_for_tool(tool: str) -> str:
    """Return the temp-file suffix to use when editing a proposal for `tool`.

    capability-block proposals get ".dockerfile"; anything else gets ".txt".
    """
    return ".dockerfile" if tool == TOOL_CAPABILITY_BLOCK else ".txt"
# --- Operator actions ------------------------------------------------------
def approve(
    qp: QueuedProposal,
    *,
    notes: str = "",
    final_file: str | None = None,
) -> None:
    """Apply a proposal, answer the waiting agent, and audit the decision.

    Args:
        qp: The proposal being approved.
        notes: Operator notes stored in the response and the audit entry.
        final_file: Operator-edited replacement for the proposed file. When
            given, it is what gets applied and the decision is recorded as
            "modified" instead of "approved".

    Raises:
        CapabilityApplyError: For a capability-block proposal whose bottle
            metadata exists but has no compose project, or (presumably) when
            apply_capability_change itself fails. Nothing is written in
            either case, so the proposal stays pending.
    """
    proposal = qp.proposal
    if final_file is None:
        status, file_to_apply = STATUS_APPROVED, proposal.proposed_file
    else:
        status, file_to_apply = STATUS_MODIFIED, final_file

    is_capability = proposal.tool == TOOL_CAPABILITY_BLOCK
    diff_before = diff_after = ""
    if is_capability:
        meta = read_metadata(proposal.bottle_slug)
        unsupported = meta is not None and not meta.compose_project
        if unsupported:
            raise CapabilityApplyError(
                "capability-block remediation is not supported for smolmachines "
                "bottles. Reject this proposal or handle the capability change "
                "manually, then restart the bottle."
            )
        diff_before, diff_after = apply_capability_change(
            proposal.bottle_slug, file_to_apply,
        )

    # Only reached once the change has been applied without raising.
    write_response(
        qp.queue_dir,
        Response(
            proposal_id=proposal.id,
            status=status,
            notes=notes,
            final_file=final_file,
        ),
    )
    _write_audit(
        qp,
        action=status,
        notes=notes,
        diff_before=diff_before,
        diff_after=diff_after,
    )
    if is_capability:
        archive_proposal(qp.queue_dir, proposal.id)
def reject(qp: QueuedProposal, *, reason: str) -> None:
    """Answer a proposal with a rejection and record it in the audit log.

    Args:
        qp: The proposal being rejected.
        reason: Operator's reason; stored as the response notes and the
            audit notes.
    """
    write_response(
        qp.queue_dir,
        Response(
            proposal_id=qp.proposal.id,
            status=STATUS_REJECTED,
            notes=reason,
            final_file=None,
        ),
    )
    # Nothing was applied, so the audit entry carries an empty diff.
    _write_audit(
        qp,
        action=STATUS_REJECTED,
        notes=reason,
        diff_before="",
        diff_after="",
    )
def _write_audit(
    qp: QueuedProposal,
    *,
    action: str,
    notes: str,
    diff_before: str,
    diff_after: str,
) -> None:
    """Record an operator decision in the audit log.

    Tools without an entry in COMPONENT_FOR_TOOL are not audited; the call
    is a no-op for them.

    Args:
        qp: The proposal the decision was made on.
        action: The decision status (approved / modified / rejected).
        notes: Operator notes to store with the entry.
        diff_before: Content before the change ("" when nothing changed).
        diff_after: Content after the change ("" when nothing changed).
    """
    proposal = qp.proposal
    component = COMPONENT_FOR_TOOL.get(proposal.tool)
    if component is not None:
        entry = AuditEntry(
            timestamp=datetime.now(timezone.utc).isoformat(),
            bottle_slug=proposal.bottle_slug,
            component=component,
            operator_action=action,
            operator_notes=notes,
            justification=proposal.justification,
            diff=render_diff(diff_before, diff_after, label=component),
        )
        write_audit_entry(entry)
# --- $EDITOR integration --------------------------------------------------
def edit_in_editor(content: str, *, suffix: str = ".tmp") -> str | None:
    """Open `content` in $EDITOR and return the edited text, if changed.

    The content is written to a temporary file (UTF-8), the editor is run
    on it, and the file is read back (UTF-8) and removed.

    Args:
        content: Initial text to present to the operator.
        suffix: Temp-file suffix, so editors can pick a syntax mode.

    Returns:
        The edited text, or None when it is identical to `content`.

    Raises:
        OSError: If the editor cannot be launched or the temp file cannot
            be written/read.
    """
    import shlex  # local import: only needed on this path

    # Treat an unset *or empty* $EDITOR as "use the default".
    editor = os.environ.get("EDITOR") or "vim"
    try:
        # $EDITOR commonly carries arguments ("code -w", "emacsclient -t").
        command = shlex.split(editor)
    except ValueError:
        # Unbalanced quotes: fall back to treating it as a bare executable.
        command = [editor]
    if not command:
        command = ["vim"]

    # Write with the same encoding we read back with, so non-ASCII content
    # round-trips regardless of the locale's preferred encoding.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=suffix, delete=False, prefix="supervise-modify.",
        encoding="utf-8",
    ) as f:
        path = f.name
        try:
            f.write(content)
        except BaseException:
            # Don't leave a half-written temp file behind.
            try:
                os.unlink(path)
            except OSError:
                pass
            raise
    try:
        # The editor's exit status is deliberately ignored (check=False);
        # only the resulting file content matters.
        subprocess.run([*command, path], check=False)
        with open(path, encoding="utf-8") as f:
            edited = f.read()
        return edited if edited != content else None
    finally:
        try:
            os.unlink(path)
        except OSError:
            pass
# --- TUI -------------------------------------------------------------------
def cmd_supervise(argv: list[str]) -> int:
    """Entry point for the `supervise` subcommand.

    With `--once`, prints the pending proposals and exits. Otherwise runs
    the curses TUI until the operator quits.

    Args:
        argv: Command-line arguments following the subcommand name.

    Returns:
        Process exit code: 0 on a normal exit, 130 on Ctrl-C, the code
        carried by a `Die` (1 if it is not an int), or 1 after an
        unexpected crash (whose traceback is written to a crash log).
    """
    parser = argparse.ArgumentParser(prog=f"{PROG} supervise", add_help=True)
    parser.add_argument(
        "--once",
        action="store_true",
        help="list pending proposals once and exit (no TUI)",
    )
    if parser.parse_args(argv).once:
        return _list_once()

    try:
        curses.wrapper(_main_loop)
    except KeyboardInterrupt:
        return 130
    except Die as die:
        error(
            die.message
            or "supervise exited on a fatal error (no detail captured)."
        )
        return die.code if isinstance(die.code, int) else 1
    except Exception as exc:  # noqa: W0718 — catch supervise crash for logging
        crash_log = _write_crash_log(exc)
        error(f"supervise crashed: {type(exc).__name__}: {exc}")
        error(f"full traceback written to {crash_log}")
        return 1
    else:
        return 0
def _write_crash_log(exc: BaseException) -> Path:
    """Persist `exc`'s traceback and return the file it was written to.

    The entry is appended to `logs/supervise-crash.log` under the
    bot-bottle root. If that fails with OSError, the entry is written to a
    freshly created temp file instead.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    trace_text = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    entry = f"=== supervise crash {stamp} ===\n{trace_text}\n"
    try:
        log_dir = _supervise.bot_bottle_root() / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        path = log_dir / "supervise-crash.log"
        with path.open("a", encoding="utf-8") as fh:
            fh.write(entry)
    except OSError:
        # Stable location unavailable: fall back to a temp file so the
        # traceback is still captured somewhere.
        fd, tmp = tempfile.mkstemp(
            prefix="bot-bottle-supervise-crash-", suffix=".log",
        )
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(entry)
        return Path(tmp)
    return path
def _list_once() -> int:
    """Print pending proposals to stdout (non-interactive `--once` mode).

    Each proposal produces a summary line followed by its justification.
    When nothing is pending, an info message is logged instead.

    Returns:
        Always 0.
    """
    pending = discover_pending()
    if not pending:
        info("no pending proposals")
        return 0
    for entry in pending:
        p = entry.proposal
        summary = f"{p.arrival_timestamp} [{p.bottle_slug}] {p.tool} {p.id}\n"
        sys.stdout.write(summary)
        sys.stdout.write(f" {p.justification}\n")
    return 0
def _try_init_green() -> int:
    """Set up a green-on-default colour pair and return its attribute.

    Returns:
        The curses attribute for the pair, or 0 if any step of the colour
        setup raises `curses.error`.
    """
    pair_id = 1
    try:
        curses.start_color()
        curses.use_default_colors()
        # -1 selects the terminal's default background.
        curses.init_pair(pair_id, curses.COLOR_GREEN, -1)
        attr = curses.color_pair(pair_id)
    except curses.error:
        attr = 0
    return attr
def _main_loop(stdscr: "curses._CursesWindow") -> None:  # type: ignore
    """Run the list view: poll the queue, redraw, and dispatch key presses.

    The queue is re-scanned on every pass; `getch()` times out after
    `_REFRESH_INTERVAL_MS` so new proposals show up without a key press.
    When proposals arrive after the first scan, the terminal beeps and the
    selection jumps to the first new one.

    Keys: j/k or arrows move, Enter opens the detail view, a approves,
    m edits then approves, r rejects, q or Esc quits.

    Args:
        stdscr: The curses window supplied by `curses.wrapper`.
    """
    curses.curs_set(0)
    stdscr.timeout(_REFRESH_INTERVAL_MS)
    green_attr = _try_init_green()
    selected = 0
    status_line = ""
    # None until the first scan has completed. An empty set cannot serve as
    # the "first pass" marker: the queue is legitimately empty most of the
    # time, and a proposal arriving into an empty queue must still alert.
    seen_ids: set[str] | None = None
    while True:
        pending = discover_pending()
        # Keep the cursor in range when the list shrinks.
        if selected >= len(pending):
            selected = max(0, len(pending) - 1)
        live_ids = {qp.proposal.id for qp in pending}
        if seen_ids is not None:
            newly_arrived = live_ids - seen_ids
            if newly_arrived:
                try:
                    curses.beep()
                except curses.error:
                    # Best effort: not every terminal can beep.
                    pass
                for i, qp in enumerate(pending):
                    if qp.proposal.id in newly_arrived:
                        selected = i
                        break
        seen_ids = live_ids
        _render(
            stdscr, pending, selected, status_line,
            green_attr=green_attr,
        )
        try:
            key = stdscr.getch()
        except KeyboardInterrupt:
            return
        if key == -1:
            # Timeout tick: nothing pressed, just refresh.
            continue
        # Any key press clears the previous action's message.
        status_line = ""
        if key in (ord("q"), 27):
            return
        if not pending:
            continue
        qp = pending[selected]
        if key in (curses.KEY_DOWN, ord("j")):
            selected = min(selected + 1, len(pending) - 1)
        elif key in (curses.KEY_UP, ord("k")):
            selected = max(selected - 1, 0)
        elif key in (curses.KEY_ENTER, 10, 13):
            _detail_view(stdscr, qp, green_attr=green_attr)
        elif key == ord("a"):
            try:
                approve(qp)
                status_line = _approval_status(qp, "approved")
            except ApplyError as e:
                status_line = f"apply failed: {e}"
        elif key == ord("m"):
            edited = _modify(stdscr, qp)
            if edited is None:
                status_line = "modify aborted (no change)"
            else:
                try:
                    approve(qp, final_file=edited, notes="operator modified before approving")
                    status_line = _approval_status(qp, "modified+approved")
                except ApplyError as e:
                    status_line = f"apply failed: {e}"
        elif key == ord("r"):
            reason = _prompt(stdscr, "reject reason: ")
            if reason:
                reject(qp, reason=reason)
                status_line = f"rejected {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
            else:
                status_line = "reject aborted (empty reason)"
def _render(
    stdscr: "curses._CursesWindow",  # type: ignore
    pending: list[QueuedProposal],
    selected: int,
    status_line: str,
    *,
    green_attr: int = 0,  # noqa: F841 — unused, but required by interface
) -> None:
    """Draw the list view: header, proposals, status line, and key footer.

    Each proposal takes a summary row and, when it has a justification, a
    one-line preview below it. Rows that do not fit above the status line
    are not drawn.

    Args:
        stdscr: Window to draw on.
        pending: Proposals to list, in display order.
        selected: Index into `pending` of the highlighted row.
        status_line: Message shown above the footer; "" for none.
        green_attr: Unused; kept for interface compatibility.
    """
    stdscr.erase()
    h, w = stdscr.getmaxyx()
    # The layout addresses rows down to h - 3 and columns up to 4; on a
    # smaller window those coordinates/lengths go negative, so draw nothing.
    if h < 3 or w < 5:
        stdscr.refresh()
        return
    header = f"bot-bottle supervise ({len(pending)} pending)"
    stdscr.addnstr(0, 0, header, w - 1, curses.A_BOLD)
    stdscr.hline(1, 0, curses.ACS_HLINE, w)
    row = 2
    if not pending:
        stdscr.addnstr(
            row, 2,
            "no pending proposals; agents will queue here when they call a "
            "supervise tool",
            w - 4,
        )
    else:
        for i, qp in enumerate(pending):
            if row >= h - 3:
                break
            p = qp.proposal
            ts_short = (
                p.arrival_timestamp.split("T", 1)[1][:8]
                if "T" in p.arrival_timestamp else p.arrival_timestamp
            )
            # Same width for both markers so the columns stay aligned.
            cursor = "> " if i == selected else "  "
            line = (
                f"{cursor}{ts_short} "
                f"[{p.bottle_slug}] {p.tool:<18} {p.id[:8]}"
            )
            attr = curses.A_REVERSE if i == selected else curses.A_NORMAL
            stdscr.addnstr(row, 0, line, w - 1, attr)
            row += 1
            if row >= h - 3:
                break
            # Justifications may span several lines; an embedded newline
            # would make curses spill onto the following rows, so collapse
            # all whitespace runs into single spaces for the preview.
            preview = " ".join(p.justification.split())
            if preview:
                stdscr.addnstr(row, 4, preview[: max(0, w - 5)], w - 5)
                row += 1
    footer = "[j/k] move [Enter] view [a] approve [m] modify [r] reject [q] quit"
    stdscr.hline(h - 2, 0, curses.ACS_HLINE, w)
    stdscr.addnstr(h - 1, 0, footer, w - 1, curses.A_DIM)
    if status_line:
        # Error messages can be multi-line too; keep the status on one row.
        flat_status = " ".join(status_line.split())
        stdscr.addnstr(h - 3, 0, flat_status, w - 1, curses.A_BOLD)
    stdscr.refresh()
def _detail_view(
    stdscr: "curses._CursesWindow",  # type: ignore
    qp: QueuedProposal,
    *,
    green_attr: int = 0,
) -> None:
    """Render the full proposal. Scrollable. Press q to return.

    Keys: j/k or arrows scroll, g/G jump to top/bottom, a approves,
    m edits then approves, r rejects, q or Esc goes back. A successful
    approve/reject returns to the caller. A failed apply keeps the view
    open and shows the error above the footer, so the failure is visible
    and the (still pending) proposal can be retried or rejected.

    Args:
        stdscr: Window to draw on.
        qp: The proposal to display and act on.
        green_attr: Forwarded to `_detail_lines`.
    """
    lines = _detail_lines(qp, green_attr=green_attr)
    offset = 0
    status = ""
    while True:
        stdscr.erase()
        h, w = stdscr.getmaxyx()
        # Reserve one extra row above the footer while a message is shown.
        body_rows = max(0, h - 2 if status else h - 1)
        for i, (text, attr) in enumerate(lines[offset:offset + body_rows]):
            stdscr.addnstr(i, 0, text, w - 1, attr)
        if status and h >= 2:
            stdscr.addnstr(h - 2, 0, status, w - 1, curses.A_BOLD)
        stdscr.addnstr(
            h - 1, 0,
            "[j/k] scroll [g/G] top/bottom [a] approve [m] modify [r] reject [q] back",
            w - 1, curses.A_DIM,
        )
        stdscr.refresh()
        key = stdscr.getch()
        if key == -1:
            # getch() timed out with no key; keep any message on screen.
            continue
        status = ""
        if key in (ord("q"), 27):
            return
        if key in (curses.KEY_DOWN, ord("j")):
            offset = min(offset + 1, max(0, len(lines) - 1))
        elif key in (curses.KEY_UP, ord("k")):
            offset = max(offset - 1, 0)
        elif key == ord("g"):
            offset = 0
        elif key == ord("G"):
            offset = max(0, len(lines) - 1)
        elif key == ord("a"):
            try:
                approve(qp)
            except ApplyError as e:
                # Flatten to one line: the message may contain newlines.
                status = " ".join(f"apply failed: {e}".split())
            else:
                return
        elif key == ord("m"):
            edited = _modify(stdscr, qp)
            if edited is None:
                status = "modify aborted (no change)"
            else:
                try:
                    approve(qp, final_file=edited, notes="operator modified before approving")
                except ApplyError as e:
                    status = " ".join(f"apply failed: {e}".split())
                else:
                    return
        elif key == ord("r"):
            reason = _prompt(stdscr, "reject reason: ")
            if reason:
                reject(qp, reason=reason)
                return
            status = "reject aborted (empty reason)"
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:  # type: ignore
    """Let the operator edit the proposed file in $EDITOR.

    Curses is suspended with `endwin()` while the editor runs and the
    screen is refreshed afterwards, even if the editor step raises.

    Returns:
        The edited content, or None when the operator made no change.
    """
    file_suffix = _suffix_for_tool(qp.proposal.tool)
    curses.endwin()
    try:
        return edit_in_editor(qp.proposal.proposed_file, suffix=file_suffix)
    finally:
        # Bring the curses screen back after the editor released the tty.
        stdscr.refresh()
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:  # type: ignore
    """One-line input at the bottom of the screen.

    Shows `label` on the second-to-last row with the cursor and echo
    enabled, reads up to 200 bytes, then restores the TUI's input settings.

    Args:
        stdscr: Window to prompt on.
        label: Text shown before the input field.

    Returns:
        The entered text, decoded as UTF-8 (undecodable bytes replaced)
        and stripped of surrounding whitespace; "" if nothing was entered.
    """
    h, _ = stdscr.getmaxyx()
    stdscr.move(h - 2, 0)
    stdscr.clrtoeol()
    stdscr.addstr(h - 2, 0, label)
    stdscr.refresh()
    try:
        curses.curs_set(1)
        curses.echo()
        # The TUI runs this window with a getch() timeout so it can poll
        # the queue. getstr() honours that timeout too and would give up
        # (returning nothing) as soon as the operator paused typing, so
        # switch to blocking reads for the duration of the prompt.
        stdscr.timeout(-1)
        raw = stdscr.getstr(h - 2, len(label), 200)
    finally:
        # Put back the settings the list/detail views rely on.
        stdscr.timeout(_REFRESH_INTERVAL_MS)
        curses.noecho()
        curses.curs_set(0)
    return raw.decode("utf-8", errors="replace").strip()
# Names exported by `from ... import *`; the underscore-prefixed TUI
# helpers are left out.
__all__ = [
    "QueuedProposal",
    "approve",
    "cmd_supervise",
    "discover_pending",
    "edit_in_editor",
    "reject",
]