feat(dashboard): pipelock edit TUI verb (PRD 0015)
Phase 3 of PRD 0015. Adds the proactive `pipelock edit` path, mirroring routes edit from PRD 0014: - discover_pipelock_slugs() lists running pipelock sidecars. - operator_edit_allowlist(slug, new) wraps apply_allowlist_change and writes an audit entry tagged ACTION_OPERATOR_EDIT. - New 'p' keybinding in the main TUI: discover slugs, prompt if multiple, fetch current allowlist, open in $EDITOR, apply on save. - Extracts shared scaffolding into _operator_edit_flow used by both routes-edit and pipelock-edit — DRY without sacrificing the per-verb status-line copy. - Footer updated. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -70,16 +70,15 @@ class QueuedProposal:
|
|||||||
queue_dir: Path
|
queue_dir: Path
|
||||||
|
|
||||||
|
|
||||||
def discover_cred_proxy_slugs() -> list[str]:
|
def _discover_sidecar_slugs(name_prefix: str) -> list[str]:
|
||||||
"""Slugs of bottles with a running cred-proxy sidecar. Used by
|
"""Slugs of bottles whose sidecar container names start with
|
||||||
the operator-initiated `routes edit` verb to know which bottles
|
`name_prefix`. Empty list if docker isn't reachable or not
|
||||||
are editable. Empty list if docker isn't reachable or not
|
|
||||||
installed."""
|
installed."""
|
||||||
try:
|
try:
|
||||||
r = subprocess.run(
|
r = subprocess.run(
|
||||||
[
|
[
|
||||||
"docker", "ps",
|
"docker", "ps",
|
||||||
"--filter", "name=^claude-bottle-cred-proxy-",
|
"--filter", f"name=^{name_prefix}",
|
||||||
"--format", "{{.Names}}",
|
"--format", "{{.Names}}",
|
||||||
],
|
],
|
||||||
capture_output=True, text=True, check=False,
|
capture_output=True, text=True, check=False,
|
||||||
@@ -88,15 +87,26 @@ def discover_cred_proxy_slugs() -> list[str]:
|
|||||||
return []
|
return []
|
||||||
if r.returncode != 0:
|
if r.returncode != 0:
|
||||||
return []
|
return []
|
||||||
prefix = "claude-bottle-cred-proxy-"
|
|
||||||
out: list[str] = []
|
out: list[str] = []
|
||||||
for line in (r.stdout or "").splitlines():
|
for line in (r.stdout or "").splitlines():
|
||||||
line = line.strip()
|
line = line.strip()
|
||||||
if line.startswith(prefix):
|
if line.startswith(name_prefix):
|
||||||
out.append(line[len(prefix):])
|
out.append(line[len(name_prefix):])
|
||||||
return sorted(out)
|
return sorted(out)
|
||||||
|
|
||||||
|
|
||||||
|
def discover_cred_proxy_slugs() -> list[str]:
|
||||||
|
"""Slugs of bottles with a running cred-proxy sidecar. Used by
|
||||||
|
the operator-initiated `routes edit` verb."""
|
||||||
|
return _discover_sidecar_slugs("claude-bottle-cred-proxy-")
|
||||||
|
|
||||||
|
|
||||||
|
def discover_pipelock_slugs() -> list[str]:
|
||||||
|
"""Slugs of bottles with a running pipelock sidecar. Used by
|
||||||
|
the operator-initiated `pipelock edit` verb."""
|
||||||
|
return _discover_sidecar_slugs("claude-bottle-pipelock-")
|
||||||
|
|
||||||
|
|
||||||
def discover_pending() -> list[QueuedProposal]:
|
def discover_pending() -> list[QueuedProposal]:
|
||||||
"""Walk ~/.claude-bottle/queue/* and collect pending proposals
|
"""Walk ~/.claude-bottle/queue/* and collect pending proposals
|
||||||
from every bottle's queue. Sorted by arrival time across the
|
from every bottle's queue. Sorted by arrival time across the
|
||||||
@@ -197,6 +207,27 @@ def operator_edit_routes(slug: str, new_content: str) -> tuple[str, str]:
|
|||||||
return before, after
|
return before, after
|
||||||
|
|
||||||
|
|
||||||
|
def operator_edit_allowlist(slug: str, new_content: str) -> tuple[str, str]:
|
||||||
|
"""Apply an operator-initiated pipelock allowlist change (no
|
||||||
|
agent proposal). Used by the `pipelock edit <bottle>` TUI verb
|
||||||
|
and available for scripted use. Returns (before, after) like
|
||||||
|
apply_allowlist_change. Writes an audit entry tagged
|
||||||
|
ACTION_OPERATOR_EDIT to distinguish from tool-call approvals.
|
||||||
|
|
||||||
|
Raises PipelockApplyError on failure."""
|
||||||
|
before, after = apply_allowlist_change(slug, new_content)
|
||||||
|
write_audit_entry(AuditEntry(
|
||||||
|
timestamp=datetime.now(timezone.utc).isoformat(),
|
||||||
|
bottle_slug=slug,
|
||||||
|
component="pipelock",
|
||||||
|
operator_action=ACTION_OPERATOR_EDIT,
|
||||||
|
operator_notes="",
|
||||||
|
justification="",
|
||||||
|
diff=render_diff(before, after, label="pipelock"),
|
||||||
|
))
|
||||||
|
return before, after
|
||||||
|
|
||||||
|
|
||||||
def _write_audit(
|
def _write_audit(
|
||||||
qp: QueuedProposal,
|
qp: QueuedProposal,
|
||||||
*,
|
*,
|
||||||
@@ -312,6 +343,9 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
|
|||||||
if key == ord("e"):
|
if key == ord("e"):
|
||||||
status_line = _operator_edit_routes_flow(stdscr)
|
status_line = _operator_edit_routes_flow(stdscr)
|
||||||
continue
|
continue
|
||||||
|
if key == ord("p"):
|
||||||
|
status_line = _operator_edit_allowlist_flow(stdscr)
|
||||||
|
continue
|
||||||
if not pending:
|
if not pending:
|
||||||
continue
|
continue
|
||||||
qp = pending[selected]
|
qp = pending[selected]
|
||||||
@@ -381,7 +415,10 @@ def _render(
|
|||||||
attr = curses.A_REVERSE if i == selected else curses.A_NORMAL
|
attr = curses.A_REVERSE if i == selected else curses.A_NORMAL
|
||||||
stdscr.addnstr(row, 0, line, w - 1, attr)
|
stdscr.addnstr(row, 0, line, w - 1, attr)
|
||||||
|
|
||||||
footer = "[Enter] view [a] approve [m] modify [r] reject [e] routes edit [j/k] move [q] quit"
|
footer = (
|
||||||
|
"[Enter] view [a] approve [m] modify [r] reject "
|
||||||
|
"[e] routes edit [p] pipelock edit [j/k] move [q] quit"
|
||||||
|
)
|
||||||
stdscr.hline(h - 2, 0, curses.ACS_HLINE, w)
|
stdscr.hline(h - 2, 0, curses.ACS_HLINE, w)
|
||||||
stdscr.addnstr(h - 1, 0, footer, w - 1, curses.A_DIM)
|
stdscr.addnstr(h - 1, 0, footer, w - 1, curses.A_DIM)
|
||||||
if status_line:
|
if status_line:
|
||||||
@@ -481,33 +518,68 @@ def _operator_edit_routes_flow(stdscr: "curses._CursesWindow") -> str:
|
|||||||
cred-proxy sidecars, pick one (single → use directly; multi →
|
cred-proxy sidecars, pick one (single → use directly; multi →
|
||||||
prompt), fetch the current routes, open in $EDITOR, apply on
|
prompt), fetch the current routes, open in $EDITOR, apply on
|
||||||
save. Returns a status-line message."""
|
save. Returns a status-line message."""
|
||||||
slugs = discover_cred_proxy_slugs()
|
return _operator_edit_flow(
|
||||||
|
stdscr,
|
||||||
|
label="routes",
|
||||||
|
discover=discover_cred_proxy_slugs,
|
||||||
|
fetch=fetch_current_routes,
|
||||||
|
apply=operator_edit_routes,
|
||||||
|
suffix=".json",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _operator_edit_allowlist_flow(stdscr: "curses._CursesWindow") -> str:
|
||||||
|
"""Operator-initiated pipelock allowlist edit."""
|
||||||
|
return _operator_edit_flow(
|
||||||
|
stdscr,
|
||||||
|
label="pipelock",
|
||||||
|
discover=discover_pipelock_slugs,
|
||||||
|
fetch=fetch_current_allowlist,
|
||||||
|
apply=operator_edit_allowlist,
|
||||||
|
suffix=".txt",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _operator_edit_flow(
|
||||||
|
stdscr: "curses._CursesWindow",
|
||||||
|
*,
|
||||||
|
label: str,
|
||||||
|
discover,
|
||||||
|
fetch,
|
||||||
|
apply,
|
||||||
|
suffix: str,
|
||||||
|
) -> str:
|
||||||
|
"""Shared scaffolding for the routes-edit + pipelock-edit verbs.
|
||||||
|
`discover` returns running-sidecar slugs; `fetch(slug)` returns
|
||||||
|
the current operator-facing config; `apply(slug, new)` does the
|
||||||
|
write + restart/SIGHUP and writes the audit entry."""
|
||||||
|
slugs = discover()
|
||||||
if not slugs:
|
if not slugs:
|
||||||
return "no running cred-proxy sidecars to edit"
|
return f"no running {label} sidecars to edit"
|
||||||
if len(slugs) == 1:
|
if len(slugs) == 1:
|
||||||
slug = slugs[0]
|
slug = slugs[0]
|
||||||
else:
|
else:
|
||||||
slug = _prompt(stdscr, f"bottle ({', '.join(slugs)}): ")
|
slug = _prompt(stdscr, f"bottle ({', '.join(slugs)}): ")
|
||||||
if not slug:
|
if not slug:
|
||||||
return "routes edit aborted"
|
return f"{label} edit aborted"
|
||||||
if slug not in slugs:
|
if slug not in slugs:
|
||||||
return f"unknown bottle {slug!r}"
|
return f"unknown bottle {slug!r}"
|
||||||
try:
|
try:
|
||||||
current = fetch_current_routes(slug)
|
current = fetch(slug)
|
||||||
except CredProxyApplyError as e:
|
except ApplyError as e:
|
||||||
return f"fetch failed: {e}"
|
return f"fetch failed: {e}"
|
||||||
curses.endwin()
|
curses.endwin()
|
||||||
try:
|
try:
|
||||||
edited = edit_in_editor(current, suffix=".json")
|
edited = edit_in_editor(current, suffix=suffix)
|
||||||
finally:
|
finally:
|
||||||
stdscr.refresh()
|
stdscr.refresh()
|
||||||
if edited is None:
|
if edited is None:
|
||||||
return f"routes for [{slug}] unchanged"
|
return f"{label} for [{slug}] unchanged"
|
||||||
try:
|
try:
|
||||||
operator_edit_routes(slug, edited)
|
apply(slug, edited)
|
||||||
except CredProxyApplyError as e:
|
except ApplyError as e:
|
||||||
return f"apply failed: {e}"
|
return f"apply failed: {e}"
|
||||||
return f"updated routes for [{slug}]"
|
return f"updated {label} for [{slug}]"
|
||||||
|
|
||||||
|
|
||||||
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:
|
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:
|
||||||
|
|||||||
@@ -378,10 +378,41 @@ class TestDiscoverCredProxySlugs(unittest.TestCase):
|
|||||||
os.environ["PATH"] = "/nonexistent-no-docker-here"
|
os.environ["PATH"] = "/nonexistent-no-docker-here"
|
||||||
try:
|
try:
|
||||||
self.assertEqual([], dashboard.discover_cred_proxy_slugs())
|
self.assertEqual([], dashboard.discover_cred_proxy_slugs())
|
||||||
|
self.assertEqual([], dashboard.discover_pipelock_slugs())
|
||||||
finally:
|
finally:
|
||||||
os.environ["PATH"] = original
|
os.environ["PATH"] = original
|
||||||
|
|
||||||
|
|
||||||
|
class TestOperatorEditAllowlist(_FakeHomeMixin, unittest.TestCase):
|
||||||
|
"""PRD 0015 Phase 3: operator-initiated pipelock allowlist edit."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self._setup_fake_home()
|
||||||
|
self._original = dashboard.apply_allowlist_change
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
dashboard.apply_allowlist_change = self._original
|
||||||
|
self._teardown_fake_home()
|
||||||
|
|
||||||
|
def test_writes_audit_with_operator_edit_action(self):
|
||||||
|
dashboard.apply_allowlist_change = lambda slug, content: (
|
||||||
|
"old.example\n", content,
|
||||||
|
)
|
||||||
|
dashboard.operator_edit_allowlist("dev", "old.example\nnew.example\n")
|
||||||
|
entries = read_audit_entries("pipelock", "dev")
|
||||||
|
self.assertEqual(1, len(entries))
|
||||||
|
self.assertEqual(supervise.ACTION_OPERATOR_EDIT, entries[0].operator_action)
|
||||||
|
self.assertIn("+new.example", entries[0].diff)
|
||||||
|
|
||||||
|
def test_failure_does_not_write_audit(self):
|
||||||
|
dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw(
|
||||||
|
PipelockApplyError("nope")
|
||||||
|
)
|
||||||
|
with self.assertRaises(PipelockApplyError):
|
||||||
|
dashboard.operator_edit_allowlist("dev", "x.example\n")
|
||||||
|
self.assertEqual([], read_audit_entries("pipelock", "dev"))
|
||||||
|
|
||||||
|
|
||||||
class TestEditInEditor(unittest.TestCase):
|
class TestEditInEditor(unittest.TestCase):
|
||||||
def test_runs_editor_returns_edited_content(self):
|
def test_runs_editor_returns_edited_content(self):
|
||||||
# Fake "editor" is /bin/sh -c 'cat <<EOF > $1 ... EOF'
|
# Fake "editor" is /bin/sh -c 'cat <<EOF > $1 ... EOF'
|
||||||
|
|||||||
Reference in New Issue
Block a user