feat(supervise): TUI dashboard for approve/modify/reject (PRD 0013)

Phase 4 of PRD 0013. Adds `claude-bottle dashboard` subcommand:

- discover_pending() walks ~/.claude-bottle/queue/* and gathers
  pending proposals across all bottles, sorted FIFO by arrival.
- approve / approve-with-final-file / reject helpers write the
  Response file the sidecar polls, and append an AuditEntry for
  cred-proxy and pipelock tools. capability-block proposals don't
  write to an audit log here (PRD 0016 captures via rebuild record).
- Stdlib-curses TUI: list view, detail view, $EDITOR shellout for
  modify-then-approve, inline prompt for reject reason.
- `dashboard --once` dumps pending proposals to stdout without
  bringing up curses — useful for scripted checks and tests.

For 0013 the audit entry's diff field is render_diff("", proposed)
because we don't yet have access to the live on-disk current file;
PRDs 0014 / 0015 fill in real before→after diffs once they own the
host-side config writes.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 04:14:45 -04:00
parent 4b2dbcdefd
commit 0aecb41e33
3 changed files with 628 additions and 1 deletions
+4 -1
View File
@@ -1,6 +1,6 @@
"""Main CLI dispatcher.
Commands: cleanup, edit, info, init, list, start
Commands: cleanup, dashboard, edit, info, init, list, start
"""
from __future__ import annotations
@@ -11,6 +11,7 @@ from ..log import Die, die
from ._common import PROG
from . import list as _list_mod
from .cleanup import cmd_cleanup
from .dashboard import cmd_dashboard
from .edit import cmd_edit
from .info import cmd_info
from .init import cmd_init
@@ -20,6 +21,7 @@ cmd_list = _list_mod.cmd_list
COMMANDS = {
"cleanup": cmd_cleanup,
"dashboard": cmd_dashboard,
"edit": cmd_edit,
"info": cmd_info,
"init": cmd_init,
@@ -32,6 +34,7 @@ def usage() -> None:
sys.stderr.write(f"usage: {PROG} <command> [args...]\n\n")
sys.stderr.write("Commands:\n")
sys.stderr.write(" cleanup stop and remove all active claude-bottle containers\n")
sys.stderr.write(" dashboard view + approve/modify/reject pending supervise proposals (PRD 0013)\n")
sys.stderr.write(" edit open an agent in vim for editing\n")
sys.stderr.write(" info print env, skills, and prompt details for a named agent\n")
sys.stderr.write(" init interactively create a new agent and add it to claude-bottle.json\n")
+397
View File
@@ -0,0 +1,397 @@
"""dashboard: list pending supervise proposals across all bottles and
act on them (approve / modify / reject). PRD 0013 v1.
Curses-based TUI; modify-then-approve shells out to $EDITOR. For
0013 the approval handlers are no-ops on the supervisor side: the
response file is written (and the sidecar returns it to the agent),
and an audit entry is appended, but no host-side config change runs.
PRDs 0014 (cred-proxy) and 0015 (pipelock) wire in the actual
writes.
"""
from __future__ import annotations
import argparse
import curses
import os
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from .. import supervise as _supervise
from ..log import info
from ..supervise import (
ACTION_OPERATOR_EDIT,
COMPONENT_FOR_TOOL,
AuditEntry,
Proposal,
Response,
STATUS_APPROVED,
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
list_pending_proposals,
render_diff,
write_audit_entry,
write_response,
)
from ._common import PROG
# --- Discovery -------------------------------------------------------------
@dataclass(frozen=True)
class QueuedProposal:
"""A pending proposal plus the queue dir it was found in."""
proposal: Proposal
queue_dir: Path
def discover_pending() -> list[QueuedProposal]:
"""Walk ~/.claude-bottle/queue/* and collect pending proposals
from every bottle's queue. Sorted by arrival time across the
union — the operator works the global FIFO."""
queue_root = _supervise.claude_bottle_root() / "queue"
if not queue_root.is_dir():
return []
out: list[QueuedProposal] = []
for slug_dir in sorted(queue_root.iterdir()):
if not slug_dir.is_dir():
continue
for proposal in list_pending_proposals(slug_dir):
out.append(QueuedProposal(proposal=proposal, queue_dir=slug_dir))
out.sort(key=lambda q: q.proposal.arrival_timestamp)
return out
# --- Operator actions ------------------------------------------------------
def approve(
qp: QueuedProposal,
*,
notes: str = "",
final_file: str | None = None,
) -> None:
"""Write an approval response and an audit entry. If `final_file`
is provided the status is `modified`; otherwise `approved`."""
status = STATUS_MODIFIED if final_file is not None else STATUS_APPROVED
response = Response(
proposal_id=qp.proposal.id,
status=status,
notes=notes,
final_file=final_file,
)
write_response(qp.queue_dir, response)
_write_audit(qp, action=status, notes=notes, final_file=final_file)
def reject(qp: QueuedProposal, *, reason: str) -> None:
"""Write a rejection response and an audit entry."""
response = Response(
proposal_id=qp.proposal.id,
status=STATUS_REJECTED,
notes=reason,
final_file=None,
)
write_response(qp.queue_dir, response)
_write_audit(qp, action=STATUS_REJECTED, notes=reason, final_file=None)
def _write_audit(
qp: QueuedProposal,
*,
action: str,
notes: str,
final_file: str | None,
) -> None:
"""Audit log for cred-proxy / pipelock tools. capability-block has
no audit log (its changes are captured by the bottle's rebuild
record + git history per PRD 0016)."""
component = COMPONENT_FOR_TOOL.get(qp.proposal.tool)
if component is None:
# capability-block: skip audit log; 0016 records via rebuild.
return
# v1 audit diff is empty: 0013's no-op handler doesn't have the
# actual current-on-disk file to diff against, only the agent's
# proposed file. 0014 / 0015 fill in the real diff against the
# live routes.json / allowlist after writing the change.
write_audit_entry(AuditEntry(
timestamp=datetime.now(timezone.utc).isoformat(),
bottle_slug=qp.proposal.bottle_slug,
component=component,
operator_action=action,
operator_notes=notes,
justification=qp.proposal.justification,
diff=render_diff(
"",
final_file if final_file is not None else qp.proposal.proposed_file,
label=component,
),
))
# --- $EDITOR integration --------------------------------------------------
def edit_in_editor(content: str, *, suffix: str = ".tmp") -> str | None:
"""Suspend curses (caller is responsible for that), drop `content`
to a temp file, exec $EDITOR on it, return the edited content.
Returns None if the edit was a no-op."""
editor = os.environ.get("EDITOR", "vim")
with tempfile.NamedTemporaryFile(
mode="w", suffix=suffix, delete=False, prefix="supervise-modify.",
) as f:
f.write(content)
path = f.name
try:
subprocess.run([editor, path], check=False)
with open(path) as f:
edited = f.read()
return edited if edited != content else None
finally:
try:
os.unlink(path)
except OSError:
pass
# --- TUI -------------------------------------------------------------------
def cmd_dashboard(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} dashboard", add_help=True)
parser.add_argument(
"--once", action="store_true",
help="list pending proposals once and exit (no TUI)",
)
args = parser.parse_args(argv)
if args.once:
return _list_once()
try:
curses.wrapper(_main_loop)
except KeyboardInterrupt:
return 130
return 0
def _list_once() -> int:
pending = discover_pending()
if not pending:
info("no pending proposals")
return 0
for qp in pending:
sys.stdout.write(
f"{qp.proposal.arrival_timestamp} "
f"[{qp.proposal.bottle_slug}] "
f"{qp.proposal.tool} "
f"{qp.proposal.id}\n"
)
sys.stdout.write(f" {qp.proposal.justification}\n")
return 0
def _main_loop(stdscr: "curses._CursesWindow") -> None:
curses.curs_set(0)
stdscr.nodelay(False)
selected = 0
status_line = ""
while True:
pending = discover_pending()
if selected >= len(pending):
selected = max(0, len(pending) - 1)
_render(stdscr, pending, selected, status_line)
status_line = ""
try:
key = stdscr.getch()
except KeyboardInterrupt:
return
if key in (ord("q"), 27): # q or ESC
return
if not pending:
continue
qp = pending[selected]
if key in (curses.KEY_DOWN, ord("j")):
selected = min(selected + 1, len(pending) - 1)
elif key in (curses.KEY_UP, ord("k")):
selected = max(selected - 1, 0)
elif key in (curses.KEY_ENTER, 10, 13, ord("v")):
_detail_view(stdscr, qp)
elif key == ord("a"):
approve(qp)
status_line = f"approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
elif key == ord("m"):
edited = _modify(stdscr, qp)
if edited is None:
status_line = "modify aborted (no change)"
else:
approve(qp, final_file=edited, notes="operator modified before approving")
status_line = f"modified+approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
elif key == ord("r"):
reason = _prompt(stdscr, "reject reason: ")
if reason:
reject(qp, reason=reason)
status_line = f"rejected {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
else:
status_line = "reject aborted (empty reason)"
def _render(
stdscr: "curses._CursesWindow",
pending: list[QueuedProposal],
selected: int,
status_line: str,
) -> None:
stdscr.erase()
h, w = stdscr.getmaxyx()
header = f"claude-bottle dashboard ({len(pending)} pending)"
stdscr.addnstr(0, 0, header, w - 1, curses.A_BOLD)
stdscr.hline(1, 0, curses.ACS_HLINE, w)
if not pending:
stdscr.addnstr(
3, 2,
"no pending proposals; agents will queue here when they call a "
"supervise tool",
w - 4,
)
else:
for i, qp in enumerate(pending):
row = 2 + i
if row >= h - 2:
break
p = qp.proposal
ts_short = p.arrival_timestamp.split("T", 1)[1][:8] if "T" in p.arrival_timestamp else p.arrival_timestamp
line = (
f"{'> ' if i == selected else ' '}"
f"[{p.bottle_slug}] {p.tool:<20} {ts_short} "
f"{p.justification[:60]}"
)
attr = curses.A_REVERSE if i == selected else curses.A_NORMAL
stdscr.addnstr(row, 0, line, w - 1, attr)
footer = "[Enter] view [a] approve [m] modify [r] reject [j/k] move [q] quit"
stdscr.hline(h - 2, 0, curses.ACS_HLINE, w)
stdscr.addnstr(h - 1, 0, footer, w - 1, curses.A_DIM)
if status_line:
stdscr.addnstr(h - 3, 0, status_line, w - 1, curses.A_BOLD)
stdscr.refresh()
def _detail_view(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> None:
"""Render the full proposal: header, justification, proposed file
contents. Scrollable. Press q to return."""
lines = _detail_lines(qp)
offset = 0
while True:
stdscr.erase()
h, w = stdscr.getmaxyx()
for i, line in enumerate(lines[offset:offset + h - 1]):
stdscr.addnstr(i, 0, line, w - 1)
stdscr.addnstr(
h - 1, 0,
"[j/k] scroll [g/G] top/bottom [a] approve [m] modify [r] reject [q] back",
w - 1, curses.A_DIM,
)
stdscr.refresh()
key = stdscr.getch()
if key in (ord("q"), 27):
return
if key in (curses.KEY_DOWN, ord("j")):
offset = min(offset + 1, max(0, len(lines) - 1))
elif key in (curses.KEY_UP, ord("k")):
offset = max(offset - 1, 0)
elif key == ord("g"):
offset = 0
elif key == ord("G"):
offset = max(0, len(lines) - 1)
elif key == ord("a"):
approve(qp)
return
elif key == ord("m"):
edited = _modify(stdscr, qp)
if edited is not None:
approve(qp, final_file=edited, notes="operator modified before approving")
return
elif key == ord("r"):
reason = _prompt(stdscr, "reject reason: ")
if reason:
reject(qp, reason=reason)
return
def _detail_lines(qp: QueuedProposal) -> list[str]:
p = qp.proposal
out = [
f"bottle: {p.bottle_slug}",
f"tool: {p.tool}",
f"id: {p.id}",
f"arrived: {p.arrival_timestamp}",
f"queue: {qp.queue_dir}",
"",
"justification:",
]
out.extend(" " + line for line in p.justification.splitlines() or [""])
out.extend([
"",
"proposed file:",
])
out.extend(p.proposed_file.splitlines() or [""])
return out
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
"""Suspend curses, open $EDITOR on the proposed file, return the
edited content (or None if unchanged)."""
suffix = _suffix_for_tool(qp.proposal.tool)
curses.endwin()
try:
edited = edit_in_editor(qp.proposal.proposed_file, suffix=suffix)
finally:
stdscr.refresh()
return edited
def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
# cred-proxy-block / pipelock-block: JSON-ish + plain.
return ".txt"
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:
"""One-line input at the bottom of the screen."""
curses.curs_set(1)
h, _ = stdscr.getmaxyx()
stdscr.move(h - 2, 0)
stdscr.clrtoeol()
stdscr.addstr(h - 2, 0, label)
stdscr.refresh()
curses.echo()
try:
raw = stdscr.getstr(h - 2, len(label), 200)
finally:
curses.noecho()
curses.curs_set(0)
return raw.decode("utf-8", errors="replace").strip()
__all__ = [
"ACTION_OPERATOR_EDIT", # re-exported for 0014/0015 to write operator-initiated audit entries
"QueuedProposal",
"approve",
"cmd_dashboard",
"discover_pending",
"edit_in_editor",
"reject",
]
+227
View File
@@ -0,0 +1,227 @@
"""Unit: dashboard headless paths (PRD 0013 phase 4).
The curses TUI itself isn't exercised here — these tests cover the
discovery + approve/reject + audit-write paths that the TUI's key
handlers call into.
"""
import os
import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path
from claude_bottle import supervise
from claude_bottle.cli import dashboard
from claude_bottle.supervise import (
Proposal,
STATUS_APPROVED,
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_CRED_PROXY_BLOCK,
TOOL_PIPELOCK_BLOCK,
read_audit_entries,
read_response,
sha256_hex,
)
FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_CRED_PROXY_BLOCK) -> Proposal:
return Proposal.new(
bottle_slug=slug, tool=tool,
proposed_file='{"routes": []}\n',
justification=f"needed for {slug}",
current_file_hash=sha256_hex("{}"),
now=FIXED,
)
class _FakeHomeMixin:
"""Patch supervise.claude_bottle_root to a temp dir for the test."""
def _setup_fake_home(self):
self._tmp = tempfile.TemporaryDirectory(prefix="dashboard-test.")
original = supervise.claude_bottle_root
def fake_root() -> Path:
return Path(self._tmp.name) / ".claude-bottle"
supervise.claude_bottle_root = fake_root # type: ignore[assignment]
self._restore_home = lambda: setattr(supervise, "claude_bottle_root", original)
def _teardown_fake_home(self):
self._restore_home()
self._tmp.cleanup()
class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def test_empty_when_no_queues(self):
self.assertEqual([], dashboard.discover_pending())
def test_walks_all_slug_subdirs(self):
for slug in ("dev", "api"):
qdir = supervise.queue_dir_for_slug(slug)
qdir.mkdir(parents=True)
supervise.write_proposal(qdir, _proposal(slug=slug))
pending = dashboard.discover_pending()
self.assertEqual({"dev", "api"}, {qp.proposal.bottle_slug for qp in pending})
def test_sorted_by_arrival_across_bottles(self):
early = Proposal.new(
bottle_slug="api", tool=TOOL_CRED_PROXY_BLOCK,
proposed_file="{}", justification="early",
current_file_hash="h",
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
)
late = Proposal.new(
bottle_slug="dev", tool=TOOL_CRED_PROXY_BLOCK,
proposed_file="{}", justification="late",
current_file_hash="h",
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
)
for p in (late, early):
qdir = supervise.queue_dir_for_slug(p.bottle_slug)
qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p)
pending = dashboard.discover_pending()
self.assertEqual([early.id, late.id], [qp.proposal.id for qp in pending])
def test_excludes_already_responded(self):
p = _proposal()
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True)
supervise.write_proposal(qdir, p)
supervise.write_response(qdir, supervise.Response(
proposal_id=p.id, status=STATUS_APPROVED, notes="",
))
self.assertEqual([], dashboard.discover_pending())
class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def _enqueue(self, tool: str = TOOL_CRED_PROXY_BLOCK):
p = _proposal(tool=tool)
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p)
return dashboard.QueuedProposal(proposal=p, queue_dir=qdir)
def test_approve_writes_response_and_audit(self):
qp = self._enqueue()
dashboard.approve(qp)
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status)
self.assertIsNone(resp.final_file)
entries = read_audit_entries("cred-proxy", "dev")
self.assertEqual(1, len(entries))
self.assertEqual("approved", entries[0].operator_action)
def test_approve_with_final_file_marks_modified(self):
qp = self._enqueue()
dashboard.approve(qp, final_file='{"routes": [{"path": "/x/"}]}\n', notes="tweaked")
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_MODIFIED, resp.status)
self.assertEqual('{"routes": [{"path": "/x/"}]}\n', resp.final_file)
self.assertEqual("tweaked", resp.notes)
entries = read_audit_entries("cred-proxy", "dev")
self.assertEqual("modified", entries[0].operator_action)
def test_reject_writes_rejection(self):
qp = self._enqueue()
dashboard.reject(qp, reason="nope")
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_REJECTED, resp.status)
self.assertEqual("nope", resp.notes)
entries = read_audit_entries("cred-proxy", "dev")
self.assertEqual("rejected", entries[0].operator_action)
self.assertEqual("nope", entries[0].operator_notes)
def test_capability_block_skips_audit_log(self):
qp = self._enqueue(tool=TOOL_CAPABILITY_BLOCK)
dashboard.approve(qp)
# No audit log for capability-block (per PRD 0013 / 0016).
# cred-proxy and pipelock logs both empty.
self.assertEqual([], read_audit_entries("cred-proxy", "dev"))
self.assertEqual([], read_audit_entries("pipelock", "dev"))
def test_pipelock_audit_distinct_from_cred_proxy(self):
qp = self._enqueue(tool=TOOL_PIPELOCK_BLOCK)
dashboard.approve(qp)
self.assertEqual(1, len(read_audit_entries("pipelock", "dev")))
self.assertEqual(0, len(read_audit_entries("cred-proxy", "dev")))
class TestEditInEditor(unittest.TestCase):
def test_runs_editor_returns_edited_content(self):
# Fake "editor" is /bin/sh -c 'cat <<EOF > $1 ... EOF'
original_editor = os.environ.get("EDITOR")
try:
# Use a fake editor that overwrites the file with a known
# marker. EDITOR is split with shlex equivalence by
# subprocess.run when invoked as a list — keep it as a
# single program path that takes the file as argv[1].
os.environ["EDITOR"] = (
"/bin/sh -c 'printf %s \"edited\" > \"$0\"'"
)
# subprocess.run with the str as the first list element
# would try to find a binary literally named "/bin/sh -c ..."
# — that won't work. Use shell mode trick: wrap in a script.
# Easier: build a tiny helper script.
with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False, prefix="fake-editor.",
) as script:
script.write('#!/bin/sh\nprintf "%s" "edited" > "$1"\n')
editor_script = script.name
os.chmod(editor_script, 0o755)
os.environ["EDITOR"] = editor_script
try:
result = dashboard.edit_in_editor("original")
self.assertEqual("edited", result)
finally:
os.unlink(editor_script)
finally:
if original_editor is None:
os.environ.pop("EDITOR", None)
else:
os.environ["EDITOR"] = original_editor
def test_returns_none_when_unchanged(self):
original_editor = os.environ.get("EDITOR")
try:
# No-op editor: touch the file (leaves it unchanged).
with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False, prefix="noop-editor.",
) as script:
script.write('#!/bin/sh\n: $1\n')
editor_script = script.name
os.chmod(editor_script, 0o755)
os.environ["EDITOR"] = editor_script
try:
result = dashboard.edit_in_editor("original")
self.assertIsNone(result)
finally:
os.unlink(editor_script)
finally:
if original_editor is None:
os.environ.pop("EDITOR", None)
else:
os.environ["EDITOR"] = original_editor
if __name__ == "__main__":
unittest.main()