diff --git a/claude_bottle/cli/dashboard.py b/claude_bottle/cli/dashboard.py index 3a6e360..c379fb6 100644 --- a/claude_bottle/cli/dashboard.py +++ b/claude_bottle/cli/dashboard.py @@ -70,16 +70,15 @@ class QueuedProposal: queue_dir: Path -def discover_cred_proxy_slugs() -> list[str]: - """Slugs of bottles with a running cred-proxy sidecar. Used by - the operator-initiated `routes edit` verb to know which bottles - are editable. Empty list if docker isn't reachable or not +def _discover_sidecar_slugs(name_prefix: str) -> list[str]: + """Slugs of bottles whose sidecar container names start with + `name_prefix`. Empty list if docker isn't reachable or not installed.""" try: r = subprocess.run( [ "docker", "ps", - "--filter", "name=^claude-bottle-cred-proxy-", + "--filter", f"name=^{name_prefix}", "--format", "{{.Names}}", ], capture_output=True, text=True, check=False, @@ -88,15 +87,26 @@ def discover_cred_proxy_slugs() -> list[str]: return [] if r.returncode != 0: return [] - prefix = "claude-bottle-cred-proxy-" out: list[str] = [] for line in (r.stdout or "").splitlines(): line = line.strip() - if line.startswith(prefix): - out.append(line[len(prefix):]) + if line.startswith(name_prefix): + out.append(line[len(name_prefix):]) return sorted(out) +def discover_cred_proxy_slugs() -> list[str]: + """Slugs of bottles with a running cred-proxy sidecar. Used by + the operator-initiated `routes edit` verb.""" + return _discover_sidecar_slugs("claude-bottle-cred-proxy-") + + +def discover_pipelock_slugs() -> list[str]: + """Slugs of bottles with a running pipelock sidecar. Used by + the operator-initiated `pipelock edit` verb.""" + return _discover_sidecar_slugs("claude-bottle-pipelock-") + + def discover_pending() -> list[QueuedProposal]: """Walk ~/.claude-bottle/queue/* and collect pending proposals from every bottle's queue. Sorted by arrival time across the @@ -197,6 +207,27 @@ def operator_edit_routes(slug: str, new_content: str) -> tuple[str, str]: return before, after +def operator_edit_allowlist(slug: str, new_content: str) -> tuple[str, str]: + """Apply an operator-initiated pipelock allowlist change (no + agent proposal). Used by the `pipelock edit ` TUI verb + and available for scripted use. Returns (before, after) like + apply_allowlist_change. Writes an audit entry tagged + ACTION_OPERATOR_EDIT to distinguish from tool-call approvals. + + Raises PipelockApplyError on failure.""" + before, after = apply_allowlist_change(slug, new_content) + write_audit_entry(AuditEntry( + timestamp=datetime.now(timezone.utc).isoformat(), + bottle_slug=slug, + component="pipelock", + operator_action=ACTION_OPERATOR_EDIT, + operator_notes="", + justification="", + diff=render_diff(before, after, label="pipelock"), + )) + return before, after + + def _write_audit( qp: QueuedProposal, *, @@ -312,6 +343,9 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: if key == ord("e"): status_line = _operator_edit_routes_flow(stdscr) continue + if key == ord("p"): + status_line = _operator_edit_allowlist_flow(stdscr) + continue if not pending: continue qp = pending[selected] @@ -381,7 +415,10 @@ def _render( attr = curses.A_REVERSE if i == selected else curses.A_NORMAL stdscr.addnstr(row, 0, line, w - 1, attr) - footer = "[Enter] view [a] approve [m] modify [r] reject [e] routes edit [j/k] move [q] quit" + footer = ( + "[Enter] view [a] approve [m] modify [r] reject " + "[e] routes edit [p] pipelock edit [j/k] move [q] quit" + ) stdscr.hline(h - 2, 0, curses.ACS_HLINE, w) stdscr.addnstr(h - 1, 0, footer, w - 1, curses.A_DIM) if status_line: @@ -481,33 +518,68 @@ def _operator_edit_routes_flow(stdscr: "curses._CursesWindow") -> str: cred-proxy sidecars, pick one (single → use directly; multi → prompt), fetch the current routes, open in $EDITOR, apply on save. Returns a status-line message.""" - slugs = discover_cred_proxy_slugs() + return _operator_edit_flow( + stdscr, + label="routes", + discover=discover_cred_proxy_slugs, + fetch=fetch_current_routes, + apply=operator_edit_routes, + suffix=".json", + ) + + +def _operator_edit_allowlist_flow(stdscr: "curses._CursesWindow") -> str: + """Operator-initiated pipelock allowlist edit.""" + return _operator_edit_flow( + stdscr, + label="pipelock", + discover=discover_pipelock_slugs, + fetch=fetch_current_allowlist, + apply=operator_edit_allowlist, + suffix=".txt", + ) + + +def _operator_edit_flow( + stdscr: "curses._CursesWindow", + *, + label: str, + discover, + fetch, + apply, + suffix: str, +) -> str: + """Shared scaffolding for the routes-edit + pipelock-edit verbs. + `discover` returns running-sidecar slugs; `fetch(slug)` returns + the current operator-facing config; `apply(slug, new)` does the + write + restart/SIGHUP and writes the audit entry.""" + slugs = discover() if not slugs: - return "no running cred-proxy sidecars to edit" + return f"no running {label} sidecars to edit" if len(slugs) == 1: slug = slugs[0] else: slug = _prompt(stdscr, f"bottle ({', '.join(slugs)}): ") if not slug: - return "routes edit aborted" + return f"{label} edit aborted" if slug not in slugs: return f"unknown bottle {slug!r}" try: - current = fetch_current_routes(slug) - except CredProxyApplyError as e: + current = fetch(slug) + except ApplyError as e: return f"fetch failed: {e}" curses.endwin() try: - edited = edit_in_editor(current, suffix=".json") + edited = edit_in_editor(current, suffix=suffix) finally: stdscr.refresh() if edited is None: - return f"routes for [{slug}] unchanged" + return f"{label} for [{slug}] unchanged" try: - operator_edit_routes(slug, edited) - except CredProxyApplyError as e: + apply(slug, edited) + except ApplyError as e: return f"apply failed: {e}" - return f"updated routes for [{slug}]" + return f"updated {label} for [{slug}]" def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: diff --git a/tests/unit/test_dashboard.py b/tests/unit/test_dashboard.py index a77b3a9..6e00994 100644 --- a/tests/unit/test_dashboard.py +++ b/tests/unit/test_dashboard.py @@ -378,10 +378,41 @@ class TestDiscoverCredProxySlugs(unittest.TestCase): os.environ["PATH"] = "/nonexistent-no-docker-here" try: self.assertEqual([], dashboard.discover_cred_proxy_slugs()) + self.assertEqual([], dashboard.discover_pipelock_slugs()) finally: os.environ["PATH"] = original +class TestOperatorEditAllowlist(_FakeHomeMixin, unittest.TestCase): + """PRD 0015 Phase 3: operator-initiated pipelock allowlist edit.""" + + def setUp(self): + self._setup_fake_home() + self._original = dashboard.apply_allowlist_change + + def tearDown(self): + dashboard.apply_allowlist_change = self._original + self._teardown_fake_home() + + def test_writes_audit_with_operator_edit_action(self): + dashboard.apply_allowlist_change = lambda slug, content: ( + "old.example\n", content, + ) + dashboard.operator_edit_allowlist("dev", "old.example\nnew.example\n") + entries = read_audit_entries("pipelock", "dev") + self.assertEqual(1, len(entries)) + self.assertEqual(supervise.ACTION_OPERATOR_EDIT, entries[0].operator_action) + self.assertIn("+new.example", entries[0].diff) + + def test_failure_does_not_write_audit(self): + dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw( + PipelockApplyError("nope") + ) + with self.assertRaises(PipelockApplyError): + dashboard.operator_edit_allowlist("dev", "x.example\n") + self.assertEqual([], read_audit_entries("pipelock", "dev")) + + class TestEditInEditor(unittest.TestCase): def test_runs_editor_returns_edited_content(self): # Fake "editor" is /bin/sh -c 'cat < $1 ... EOF'