feat(supervise): TUI dashboard for approve/modify/reject (PRD 0013)
Phase 4 of PRD 0013. Adds `claude-bottle dashboard` subcommand:
- discover_pending() walks ~/.claude-bottle/queue/* and gathers
pending proposals across all bottles, sorted FIFO by arrival.
- approve / approve-with-final-file / reject helpers write the
Response file the sidecar polls, and append an AuditEntry for
cred-proxy and pipelock tools. capability-block proposals don't
write to an audit log here (PRD 0016 captures via rebuild record).
- Stdlib-curses TUI: list view, detail view, $EDITOR shellout for
modify-then-approve, inline prompt for reject reason.
- `dashboard --once` dumps pending proposals to stdout without
bringing up curses — useful for scripted checks and tests.
For 0013 the audit entry's diff field is render_diff("", proposed)
because we don't yet have access to the live on-disk current file;
PRDs 0014 / 0015 fill in real before→after diffs once they own the
host-side config writes.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
"""Main CLI dispatcher.
|
||||
|
||||
Commands: cleanup, edit, info, init, list, start
|
||||
Commands: cleanup, dashboard, edit, info, init, list, start
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -11,6 +11,7 @@ from ..log import Die, die
|
||||
from ._common import PROG
|
||||
from . import list as _list_mod
|
||||
from .cleanup import cmd_cleanup
|
||||
from .dashboard import cmd_dashboard
|
||||
from .edit import cmd_edit
|
||||
from .info import cmd_info
|
||||
from .init import cmd_init
|
||||
@@ -20,6 +21,7 @@ cmd_list = _list_mod.cmd_list
|
||||
|
||||
COMMANDS = {
|
||||
"cleanup": cmd_cleanup,
|
||||
"dashboard": cmd_dashboard,
|
||||
"edit": cmd_edit,
|
||||
"info": cmd_info,
|
||||
"init": cmd_init,
|
||||
@@ -32,6 +34,7 @@ def usage() -> None:
|
||||
sys.stderr.write(f"usage: {PROG} <command> [args...]\n\n")
|
||||
sys.stderr.write("Commands:\n")
|
||||
sys.stderr.write(" cleanup stop and remove all active claude-bottle containers\n")
|
||||
sys.stderr.write(" dashboard view + approve/modify/reject pending supervise proposals (PRD 0013)\n")
|
||||
sys.stderr.write(" edit open an agent in vim for editing\n")
|
||||
sys.stderr.write(" info print env, skills, and prompt details for a named agent\n")
|
||||
sys.stderr.write(" init interactively create a new agent and add it to claude-bottle.json\n")
|
||||
|
||||
@@ -0,0 +1,397 @@
|
||||
"""dashboard: list pending supervise proposals across all bottles and
|
||||
act on them (approve / modify / reject). PRD 0013 v1.
|
||||
|
||||
Curses-based TUI; modify-then-approve shells out to $EDITOR. For
|
||||
0013 the approval handlers are no-ops on the supervisor side: the
|
||||
response file is written (and the sidecar returns it to the agent),
|
||||
and an audit entry is appended, but no host-side config change runs.
|
||||
PRDs 0014 (cred-proxy) and 0015 (pipelock) wire in the actual
|
||||
writes.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import curses
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from .. import supervise as _supervise
|
||||
from ..log import info
|
||||
from ..supervise import (
|
||||
ACTION_OPERATOR_EDIT,
|
||||
COMPONENT_FOR_TOOL,
|
||||
AuditEntry,
|
||||
Proposal,
|
||||
Response,
|
||||
STATUS_APPROVED,
|
||||
STATUS_MODIFIED,
|
||||
STATUS_REJECTED,
|
||||
TOOL_CAPABILITY_BLOCK,
|
||||
list_pending_proposals,
|
||||
render_diff,
|
||||
write_audit_entry,
|
||||
write_response,
|
||||
)
|
||||
from ._common import PROG
|
||||
|
||||
|
||||
# --- Discovery -------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class QueuedProposal:
|
||||
"""A pending proposal plus the queue dir it was found in."""
|
||||
|
||||
proposal: Proposal
|
||||
queue_dir: Path
|
||||
|
||||
|
||||
def discover_pending() -> list[QueuedProposal]:
|
||||
"""Walk ~/.claude-bottle/queue/* and collect pending proposals
|
||||
from every bottle's queue. Sorted by arrival time across the
|
||||
union — the operator works the global FIFO."""
|
||||
queue_root = _supervise.claude_bottle_root() / "queue"
|
||||
if not queue_root.is_dir():
|
||||
return []
|
||||
out: list[QueuedProposal] = []
|
||||
for slug_dir in sorted(queue_root.iterdir()):
|
||||
if not slug_dir.is_dir():
|
||||
continue
|
||||
for proposal in list_pending_proposals(slug_dir):
|
||||
out.append(QueuedProposal(proposal=proposal, queue_dir=slug_dir))
|
||||
out.sort(key=lambda q: q.proposal.arrival_timestamp)
|
||||
return out
|
||||
|
||||
|
||||
# --- Operator actions ------------------------------------------------------
|
||||
|
||||
|
||||
def approve(
|
||||
qp: QueuedProposal,
|
||||
*,
|
||||
notes: str = "",
|
||||
final_file: str | None = None,
|
||||
) -> None:
|
||||
"""Write an approval response and an audit entry. If `final_file`
|
||||
is provided the status is `modified`; otherwise `approved`."""
|
||||
status = STATUS_MODIFIED if final_file is not None else STATUS_APPROVED
|
||||
response = Response(
|
||||
proposal_id=qp.proposal.id,
|
||||
status=status,
|
||||
notes=notes,
|
||||
final_file=final_file,
|
||||
)
|
||||
write_response(qp.queue_dir, response)
|
||||
_write_audit(qp, action=status, notes=notes, final_file=final_file)
|
||||
|
||||
|
||||
def reject(qp: QueuedProposal, *, reason: str) -> None:
|
||||
"""Write a rejection response and an audit entry."""
|
||||
response = Response(
|
||||
proposal_id=qp.proposal.id,
|
||||
status=STATUS_REJECTED,
|
||||
notes=reason,
|
||||
final_file=None,
|
||||
)
|
||||
write_response(qp.queue_dir, response)
|
||||
_write_audit(qp, action=STATUS_REJECTED, notes=reason, final_file=None)
|
||||
|
||||
|
||||
def _write_audit(
|
||||
qp: QueuedProposal,
|
||||
*,
|
||||
action: str,
|
||||
notes: str,
|
||||
final_file: str | None,
|
||||
) -> None:
|
||||
"""Audit log for cred-proxy / pipelock tools. capability-block has
|
||||
no audit log (its changes are captured by the bottle's rebuild
|
||||
record + git history per PRD 0016)."""
|
||||
component = COMPONENT_FOR_TOOL.get(qp.proposal.tool)
|
||||
if component is None:
|
||||
# capability-block: skip audit log; 0016 records via rebuild.
|
||||
return
|
||||
# v1 audit diff is empty: 0013's no-op handler doesn't have the
|
||||
# actual current-on-disk file to diff against, only the agent's
|
||||
# proposed file. 0014 / 0015 fill in the real diff against the
|
||||
# live routes.json / allowlist after writing the change.
|
||||
write_audit_entry(AuditEntry(
|
||||
timestamp=datetime.now(timezone.utc).isoformat(),
|
||||
bottle_slug=qp.proposal.bottle_slug,
|
||||
component=component,
|
||||
operator_action=action,
|
||||
operator_notes=notes,
|
||||
justification=qp.proposal.justification,
|
||||
diff=render_diff(
|
||||
"",
|
||||
final_file if final_file is not None else qp.proposal.proposed_file,
|
||||
label=component,
|
||||
),
|
||||
))
|
||||
|
||||
|
||||
# --- $EDITOR integration --------------------------------------------------
|
||||
|
||||
|
||||
def edit_in_editor(content: str, *, suffix: str = ".tmp") -> str | None:
|
||||
"""Suspend curses (caller is responsible for that), drop `content`
|
||||
to a temp file, exec $EDITOR on it, return the edited content.
|
||||
Returns None if the edit was a no-op."""
|
||||
editor = os.environ.get("EDITOR", "vim")
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=suffix, delete=False, prefix="supervise-modify.",
|
||||
) as f:
|
||||
f.write(content)
|
||||
path = f.name
|
||||
try:
|
||||
subprocess.run([editor, path], check=False)
|
||||
with open(path) as f:
|
||||
edited = f.read()
|
||||
return edited if edited != content else None
|
||||
finally:
|
||||
try:
|
||||
os.unlink(path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
# --- TUI -------------------------------------------------------------------
|
||||
|
||||
|
||||
def cmd_dashboard(argv: list[str]) -> int:
|
||||
parser = argparse.ArgumentParser(prog=f"{PROG} dashboard", add_help=True)
|
||||
parser.add_argument(
|
||||
"--once", action="store_true",
|
||||
help="list pending proposals once and exit (no TUI)",
|
||||
)
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
if args.once:
|
||||
return _list_once()
|
||||
try:
|
||||
curses.wrapper(_main_loop)
|
||||
except KeyboardInterrupt:
|
||||
return 130
|
||||
return 0
|
||||
|
||||
|
||||
def _list_once() -> int:
|
||||
pending = discover_pending()
|
||||
if not pending:
|
||||
info("no pending proposals")
|
||||
return 0
|
||||
for qp in pending:
|
||||
sys.stdout.write(
|
||||
f"{qp.proposal.arrival_timestamp} "
|
||||
f"[{qp.proposal.bottle_slug}] "
|
||||
f"{qp.proposal.tool} "
|
||||
f"{qp.proposal.id}\n"
|
||||
)
|
||||
sys.stdout.write(f" {qp.proposal.justification}\n")
|
||||
return 0
|
||||
|
||||
|
||||
def _main_loop(stdscr: "curses._CursesWindow") -> None:
|
||||
curses.curs_set(0)
|
||||
stdscr.nodelay(False)
|
||||
selected = 0
|
||||
status_line = ""
|
||||
while True:
|
||||
pending = discover_pending()
|
||||
if selected >= len(pending):
|
||||
selected = max(0, len(pending) - 1)
|
||||
|
||||
_render(stdscr, pending, selected, status_line)
|
||||
status_line = ""
|
||||
|
||||
try:
|
||||
key = stdscr.getch()
|
||||
except KeyboardInterrupt:
|
||||
return
|
||||
|
||||
if key in (ord("q"), 27): # q or ESC
|
||||
return
|
||||
if not pending:
|
||||
continue
|
||||
qp = pending[selected]
|
||||
|
||||
if key in (curses.KEY_DOWN, ord("j")):
|
||||
selected = min(selected + 1, len(pending) - 1)
|
||||
elif key in (curses.KEY_UP, ord("k")):
|
||||
selected = max(selected - 1, 0)
|
||||
elif key in (curses.KEY_ENTER, 10, 13, ord("v")):
|
||||
_detail_view(stdscr, qp)
|
||||
elif key == ord("a"):
|
||||
approve(qp)
|
||||
status_line = f"approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
|
||||
elif key == ord("m"):
|
||||
edited = _modify(stdscr, qp)
|
||||
if edited is None:
|
||||
status_line = "modify aborted (no change)"
|
||||
else:
|
||||
approve(qp, final_file=edited, notes="operator modified before approving")
|
||||
status_line = f"modified+approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
|
||||
elif key == ord("r"):
|
||||
reason = _prompt(stdscr, "reject reason: ")
|
||||
if reason:
|
||||
reject(qp, reason=reason)
|
||||
status_line = f"rejected {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
|
||||
else:
|
||||
status_line = "reject aborted (empty reason)"
|
||||
|
||||
|
||||
def _render(
|
||||
stdscr: "curses._CursesWindow",
|
||||
pending: list[QueuedProposal],
|
||||
selected: int,
|
||||
status_line: str,
|
||||
) -> None:
|
||||
stdscr.erase()
|
||||
h, w = stdscr.getmaxyx()
|
||||
header = f"claude-bottle dashboard ({len(pending)} pending)"
|
||||
stdscr.addnstr(0, 0, header, w - 1, curses.A_BOLD)
|
||||
stdscr.hline(1, 0, curses.ACS_HLINE, w)
|
||||
|
||||
if not pending:
|
||||
stdscr.addnstr(
|
||||
3, 2,
|
||||
"no pending proposals; agents will queue here when they call a "
|
||||
"supervise tool",
|
||||
w - 4,
|
||||
)
|
||||
else:
|
||||
for i, qp in enumerate(pending):
|
||||
row = 2 + i
|
||||
if row >= h - 2:
|
||||
break
|
||||
p = qp.proposal
|
||||
ts_short = p.arrival_timestamp.split("T", 1)[1][:8] if "T" in p.arrival_timestamp else p.arrival_timestamp
|
||||
line = (
|
||||
f"{'> ' if i == selected else ' '}"
|
||||
f"[{p.bottle_slug}] {p.tool:<20} {ts_short} "
|
||||
f"{p.justification[:60]}"
|
||||
)
|
||||
attr = curses.A_REVERSE if i == selected else curses.A_NORMAL
|
||||
stdscr.addnstr(row, 0, line, w - 1, attr)
|
||||
|
||||
footer = "[Enter] view [a] approve [m] modify [r] reject [j/k] move [q] quit"
|
||||
stdscr.hline(h - 2, 0, curses.ACS_HLINE, w)
|
||||
stdscr.addnstr(h - 1, 0, footer, w - 1, curses.A_DIM)
|
||||
if status_line:
|
||||
stdscr.addnstr(h - 3, 0, status_line, w - 1, curses.A_BOLD)
|
||||
stdscr.refresh()
|
||||
|
||||
|
||||
def _detail_view(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> None:
|
||||
"""Render the full proposal: header, justification, proposed file
|
||||
contents. Scrollable. Press q to return."""
|
||||
lines = _detail_lines(qp)
|
||||
offset = 0
|
||||
while True:
|
||||
stdscr.erase()
|
||||
h, w = stdscr.getmaxyx()
|
||||
for i, line in enumerate(lines[offset:offset + h - 1]):
|
||||
stdscr.addnstr(i, 0, line, w - 1)
|
||||
stdscr.addnstr(
|
||||
h - 1, 0,
|
||||
"[j/k] scroll [g/G] top/bottom [a] approve [m] modify [r] reject [q] back",
|
||||
w - 1, curses.A_DIM,
|
||||
)
|
||||
stdscr.refresh()
|
||||
key = stdscr.getch()
|
||||
if key in (ord("q"), 27):
|
||||
return
|
||||
if key in (curses.KEY_DOWN, ord("j")):
|
||||
offset = min(offset + 1, max(0, len(lines) - 1))
|
||||
elif key in (curses.KEY_UP, ord("k")):
|
||||
offset = max(offset - 1, 0)
|
||||
elif key == ord("g"):
|
||||
offset = 0
|
||||
elif key == ord("G"):
|
||||
offset = max(0, len(lines) - 1)
|
||||
elif key == ord("a"):
|
||||
approve(qp)
|
||||
return
|
||||
elif key == ord("m"):
|
||||
edited = _modify(stdscr, qp)
|
||||
if edited is not None:
|
||||
approve(qp, final_file=edited, notes="operator modified before approving")
|
||||
return
|
||||
elif key == ord("r"):
|
||||
reason = _prompt(stdscr, "reject reason: ")
|
||||
if reason:
|
||||
reject(qp, reason=reason)
|
||||
return
|
||||
|
||||
|
||||
def _detail_lines(qp: QueuedProposal) -> list[str]:
|
||||
p = qp.proposal
|
||||
out = [
|
||||
f"bottle: {p.bottle_slug}",
|
||||
f"tool: {p.tool}",
|
||||
f"id: {p.id}",
|
||||
f"arrived: {p.arrival_timestamp}",
|
||||
f"queue: {qp.queue_dir}",
|
||||
"",
|
||||
"justification:",
|
||||
]
|
||||
out.extend(" " + line for line in p.justification.splitlines() or [""])
|
||||
out.extend([
|
||||
"",
|
||||
"proposed file:",
|
||||
])
|
||||
out.extend(p.proposed_file.splitlines() or [""])
|
||||
return out
|
||||
|
||||
|
||||
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
|
||||
"""Suspend curses, open $EDITOR on the proposed file, return the
|
||||
edited content (or None if unchanged)."""
|
||||
suffix = _suffix_for_tool(qp.proposal.tool)
|
||||
curses.endwin()
|
||||
try:
|
||||
edited = edit_in_editor(qp.proposal.proposed_file, suffix=suffix)
|
||||
finally:
|
||||
stdscr.refresh()
|
||||
return edited
|
||||
|
||||
|
||||
def _suffix_for_tool(tool: str) -> str:
|
||||
if tool == TOOL_CAPABILITY_BLOCK:
|
||||
return ".dockerfile"
|
||||
# cred-proxy-block / pipelock-block: JSON-ish + plain.
|
||||
return ".txt"
|
||||
|
||||
|
||||
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:
|
||||
"""One-line input at the bottom of the screen."""
|
||||
curses.curs_set(1)
|
||||
h, _ = stdscr.getmaxyx()
|
||||
stdscr.move(h - 2, 0)
|
||||
stdscr.clrtoeol()
|
||||
stdscr.addstr(h - 2, 0, label)
|
||||
stdscr.refresh()
|
||||
curses.echo()
|
||||
try:
|
||||
raw = stdscr.getstr(h - 2, len(label), 200)
|
||||
finally:
|
||||
curses.noecho()
|
||||
curses.curs_set(0)
|
||||
return raw.decode("utf-8", errors="replace").strip()
|
||||
|
||||
|
||||
__all__ = [
|
||||
"ACTION_OPERATOR_EDIT", # re-exported for 0014/0015 to write operator-initiated audit entries
|
||||
"QueuedProposal",
|
||||
"approve",
|
||||
"cmd_dashboard",
|
||||
"discover_pending",
|
||||
"edit_in_editor",
|
||||
"reject",
|
||||
]
|
||||
@@ -0,0 +1,227 @@
|
||||
"""Unit: dashboard headless paths (PRD 0013 phase 4).
|
||||
|
||||
The curses TUI itself isn't exercised here — these tests cover the
|
||||
discovery + approve/reject + audit-write paths that the TUI's key
|
||||
handlers call into.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from claude_bottle import supervise
|
||||
from claude_bottle.cli import dashboard
|
||||
from claude_bottle.supervise import (
|
||||
Proposal,
|
||||
STATUS_APPROVED,
|
||||
STATUS_MODIFIED,
|
||||
STATUS_REJECTED,
|
||||
TOOL_CAPABILITY_BLOCK,
|
||||
TOOL_CRED_PROXY_BLOCK,
|
||||
TOOL_PIPELOCK_BLOCK,
|
||||
read_audit_entries,
|
||||
read_response,
|
||||
sha256_hex,
|
||||
)
|
||||
|
||||
|
||||
FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
|
||||
|
||||
|
||||
def _proposal(slug: str = "dev", tool: str = TOOL_CRED_PROXY_BLOCK) -> Proposal:
|
||||
return Proposal.new(
|
||||
bottle_slug=slug, tool=tool,
|
||||
proposed_file='{"routes": []}\n',
|
||||
justification=f"needed for {slug}",
|
||||
current_file_hash=sha256_hex("{}"),
|
||||
now=FIXED,
|
||||
)
|
||||
|
||||
|
||||
class _FakeHomeMixin:
|
||||
"""Patch supervise.claude_bottle_root to a temp dir for the test."""
|
||||
|
||||
def _setup_fake_home(self):
|
||||
self._tmp = tempfile.TemporaryDirectory(prefix="dashboard-test.")
|
||||
original = supervise.claude_bottle_root
|
||||
|
||||
def fake_root() -> Path:
|
||||
return Path(self._tmp.name) / ".claude-bottle"
|
||||
|
||||
supervise.claude_bottle_root = fake_root # type: ignore[assignment]
|
||||
self._restore_home = lambda: setattr(supervise, "claude_bottle_root", original)
|
||||
|
||||
def _teardown_fake_home(self):
|
||||
self._restore_home()
|
||||
self._tmp.cleanup()
|
||||
|
||||
|
||||
class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._setup_fake_home()
|
||||
|
||||
def tearDown(self):
|
||||
self._teardown_fake_home()
|
||||
|
||||
def test_empty_when_no_queues(self):
|
||||
self.assertEqual([], dashboard.discover_pending())
|
||||
|
||||
def test_walks_all_slug_subdirs(self):
|
||||
for slug in ("dev", "api"):
|
||||
qdir = supervise.queue_dir_for_slug(slug)
|
||||
qdir.mkdir(parents=True)
|
||||
supervise.write_proposal(qdir, _proposal(slug=slug))
|
||||
pending = dashboard.discover_pending()
|
||||
self.assertEqual({"dev", "api"}, {qp.proposal.bottle_slug for qp in pending})
|
||||
|
||||
def test_sorted_by_arrival_across_bottles(self):
|
||||
early = Proposal.new(
|
||||
bottle_slug="api", tool=TOOL_CRED_PROXY_BLOCK,
|
||||
proposed_file="{}", justification="early",
|
||||
current_file_hash="h",
|
||||
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
|
||||
)
|
||||
late = Proposal.new(
|
||||
bottle_slug="dev", tool=TOOL_CRED_PROXY_BLOCK,
|
||||
proposed_file="{}", justification="late",
|
||||
current_file_hash="h",
|
||||
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
|
||||
)
|
||||
for p in (late, early):
|
||||
qdir = supervise.queue_dir_for_slug(p.bottle_slug)
|
||||
qdir.mkdir(parents=True, exist_ok=True)
|
||||
supervise.write_proposal(qdir, p)
|
||||
pending = dashboard.discover_pending()
|
||||
self.assertEqual([early.id, late.id], [qp.proposal.id for qp in pending])
|
||||
|
||||
def test_excludes_already_responded(self):
|
||||
p = _proposal()
|
||||
qdir = supervise.queue_dir_for_slug("dev")
|
||||
qdir.mkdir(parents=True)
|
||||
supervise.write_proposal(qdir, p)
|
||||
supervise.write_response(qdir, supervise.Response(
|
||||
proposal_id=p.id, status=STATUS_APPROVED, notes="",
|
||||
))
|
||||
self.assertEqual([], dashboard.discover_pending())
|
||||
|
||||
|
||||
class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._setup_fake_home()
|
||||
|
||||
def tearDown(self):
|
||||
self._teardown_fake_home()
|
||||
|
||||
def _enqueue(self, tool: str = TOOL_CRED_PROXY_BLOCK):
|
||||
p = _proposal(tool=tool)
|
||||
qdir = supervise.queue_dir_for_slug("dev")
|
||||
qdir.mkdir(parents=True, exist_ok=True)
|
||||
supervise.write_proposal(qdir, p)
|
||||
return dashboard.QueuedProposal(proposal=p, queue_dir=qdir)
|
||||
|
||||
def test_approve_writes_response_and_audit(self):
|
||||
qp = self._enqueue()
|
||||
dashboard.approve(qp)
|
||||
resp = read_response(qp.queue_dir, qp.proposal.id)
|
||||
self.assertEqual(STATUS_APPROVED, resp.status)
|
||||
self.assertIsNone(resp.final_file)
|
||||
entries = read_audit_entries("cred-proxy", "dev")
|
||||
self.assertEqual(1, len(entries))
|
||||
self.assertEqual("approved", entries[0].operator_action)
|
||||
|
||||
def test_approve_with_final_file_marks_modified(self):
|
||||
qp = self._enqueue()
|
||||
dashboard.approve(qp, final_file='{"routes": [{"path": "/x/"}]}\n', notes="tweaked")
|
||||
resp = read_response(qp.queue_dir, qp.proposal.id)
|
||||
self.assertEqual(STATUS_MODIFIED, resp.status)
|
||||
self.assertEqual('{"routes": [{"path": "/x/"}]}\n', resp.final_file)
|
||||
self.assertEqual("tweaked", resp.notes)
|
||||
entries = read_audit_entries("cred-proxy", "dev")
|
||||
self.assertEqual("modified", entries[0].operator_action)
|
||||
|
||||
def test_reject_writes_rejection(self):
|
||||
qp = self._enqueue()
|
||||
dashboard.reject(qp, reason="nope")
|
||||
resp = read_response(qp.queue_dir, qp.proposal.id)
|
||||
self.assertEqual(STATUS_REJECTED, resp.status)
|
||||
self.assertEqual("nope", resp.notes)
|
||||
entries = read_audit_entries("cred-proxy", "dev")
|
||||
self.assertEqual("rejected", entries[0].operator_action)
|
||||
self.assertEqual("nope", entries[0].operator_notes)
|
||||
|
||||
def test_capability_block_skips_audit_log(self):
|
||||
qp = self._enqueue(tool=TOOL_CAPABILITY_BLOCK)
|
||||
dashboard.approve(qp)
|
||||
# No audit log for capability-block (per PRD 0013 / 0016).
|
||||
# cred-proxy and pipelock logs both empty.
|
||||
self.assertEqual([], read_audit_entries("cred-proxy", "dev"))
|
||||
self.assertEqual([], read_audit_entries("pipelock", "dev"))
|
||||
|
||||
def test_pipelock_audit_distinct_from_cred_proxy(self):
|
||||
qp = self._enqueue(tool=TOOL_PIPELOCK_BLOCK)
|
||||
dashboard.approve(qp)
|
||||
self.assertEqual(1, len(read_audit_entries("pipelock", "dev")))
|
||||
self.assertEqual(0, len(read_audit_entries("cred-proxy", "dev")))
|
||||
|
||||
|
||||
class TestEditInEditor(unittest.TestCase):
|
||||
def test_runs_editor_returns_edited_content(self):
|
||||
# Fake "editor" is /bin/sh -c 'cat <<EOF > $1 ... EOF'
|
||||
original_editor = os.environ.get("EDITOR")
|
||||
try:
|
||||
# Use a fake editor that overwrites the file with a known
|
||||
# marker. EDITOR is split with shlex equivalence by
|
||||
# subprocess.run when invoked as a list — keep it as a
|
||||
# single program path that takes the file as argv[1].
|
||||
os.environ["EDITOR"] = (
|
||||
"/bin/sh -c 'printf %s \"edited\" > \"$0\"'"
|
||||
)
|
||||
# subprocess.run with the str as the first list element
|
||||
# would try to find a binary literally named "/bin/sh -c ..."
|
||||
# — that won't work. Use shell mode trick: wrap in a script.
|
||||
# Easier: build a tiny helper script.
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".sh", delete=False, prefix="fake-editor.",
|
||||
) as script:
|
||||
script.write('#!/bin/sh\nprintf "%s" "edited" > "$1"\n')
|
||||
editor_script = script.name
|
||||
os.chmod(editor_script, 0o755)
|
||||
os.environ["EDITOR"] = editor_script
|
||||
try:
|
||||
result = dashboard.edit_in_editor("original")
|
||||
self.assertEqual("edited", result)
|
||||
finally:
|
||||
os.unlink(editor_script)
|
||||
finally:
|
||||
if original_editor is None:
|
||||
os.environ.pop("EDITOR", None)
|
||||
else:
|
||||
os.environ["EDITOR"] = original_editor
|
||||
|
||||
def test_returns_none_when_unchanged(self):
|
||||
original_editor = os.environ.get("EDITOR")
|
||||
try:
|
||||
# No-op editor: touch the file (leaves it unchanged).
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".sh", delete=False, prefix="noop-editor.",
|
||||
) as script:
|
||||
script.write('#!/bin/sh\n: $1\n')
|
||||
editor_script = script.name
|
||||
os.chmod(editor_script, 0o755)
|
||||
os.environ["EDITOR"] = editor_script
|
||||
try:
|
||||
result = dashboard.edit_in_editor("original")
|
||||
self.assertIsNone(result)
|
||||
finally:
|
||||
os.unlink(editor_script)
|
||||
finally:
|
||||
if original_editor is None:
|
||||
os.environ.pop("EDITOR", None)
|
||||
else:
|
||||
os.environ["EDITOR"] = original_editor
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user