diff --git a/claude_bottle/backend/docker/pipelock_apply.py b/claude_bottle/backend/docker/pipelock_apply.py new file mode 100644 index 0000000..9a27cd2 --- /dev/null +++ b/claude_bottle/backend/docker/pipelock_apply.py @@ -0,0 +1,183 @@ +"""pipelock_apply — host-side helper to apply an api_allowlist +change to a running pipelock sidecar (PRD 0015). + +Used by the supervise dashboard when the operator approves a +pipelock-block proposal (or runs the operator-initiated `pipelock +edit ` verb). Fetches the current pipelock.yaml via `docker +exec`, parses it, swaps the api_allowlist with the proposed hosts, +re-renders, writes back via `docker cp`, then `docker restart` so +pipelock picks up the new config. + +v1 uses restart, not SIGHUP — pipelock has no in-process reload +hook and adding one is the "SIGHUP reload for pipelock" open +question in PRD 0015. Restart drops in-flight outbound calls; the +agent's HTTP client retries pick up against the restarted proxy. +""" + +from __future__ import annotations + +import os +import re +import subprocess +import tempfile +from pathlib import Path + +from ...pipelock import pipelock_render_yaml +from ...yaml_subset import parse_yaml_subset +from .pipelock import pipelock_container_name + + +PIPELOCK_YAML_IN_CONTAINER = "/etc/pipelock.yaml" + +# Allowlist proposals are one-hostname-per-line. Blank lines and +# `#`-prefixed comments are ignored. The character set matches the +# supervise sidecar's syntactic check on the agent's pipelock-block +# proposal (alphanumerics + dot/dash/underscore). +_HOST_OK = re.compile(r"^[A-Za-z0-9_.-]+$") + + +class PipelockApplyError(RuntimeError): + """Raised when fetch / parse / apply fails. The dashboard renders + the message and keeps the proposal pending — never crashes.""" + + +def parse_allowlist_content(content: str) -> list[str]: + """One hostname per line. Blanks and `#` comments are ignored. + Raises PipelockApplyError if a line has a disallowed character.""" + hosts: list[str] = [] + for i, raw_line in enumerate(content.splitlines(), start=1): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + if not _HOST_OK.match(line): + raise PipelockApplyError( + f"allowlist line {i}: {line!r} has disallowed characters" + ) + hosts.append(line) + return hosts + + +def render_allowlist_content(hosts: list[str]) -> str: + """Hosts → one-per-line string (the operator-facing format).""" + if not hosts: + return "" + return "\n".join(hosts) + "\n" + + +def fetch_current_yaml(slug: str) -> str: + """Read the live /etc/pipelock.yaml from the pipelock sidecar. + + Uses `docker cp` (not `docker exec cat`) because the pipelock + image is distroless and has no shell utilities. `docker cp` is a + daemon-API tarball copy — works on stopped containers too, and + doesn't need anything in the container's PATH. + + Raises PipelockApplyError if the read fails.""" + container = pipelock_container_name(slug) + fd, tmp_path = tempfile.mkstemp(prefix="cb-pipelock-fetch.", suffix=".yaml") + os.close(fd) + try: + r = subprocess.run( + [ + "docker", "cp", + f"{container}:{PIPELOCK_YAML_IN_CONTAINER}", tmp_path, + ], + capture_output=True, text=True, check=False, + ) + if r.returncode != 0: + raise PipelockApplyError( + f"could not fetch pipelock.yaml from {container}: " + f"{(r.stderr or '').strip() or 'container not running?'}" + ) + return Path(tmp_path).read_text() + finally: + try: + Path(tmp_path).unlink() + except OSError: + pass + + +def fetch_current_allowlist(slug: str) -> str: + """Fetch the live yaml, extract api_allowlist, render as one-per- + line — the operator-facing format for the TUI / agent's + current-config mount.""" + yaml = fetch_current_yaml(slug) + cfg = parse_yaml_subset(yaml) + hosts = cfg.get("api_allowlist", []) + if not isinstance(hosts, list): + raise PipelockApplyError( + "running pipelock yaml: api_allowlist is not a list" + ) + return render_allowlist_content([str(h) for h in hosts]) + + +def apply_allowlist_change( + slug: str, new_allowlist_content: str, +) -> tuple[str, str]: + """Apply `new_allowlist_content` to the pipelock sidecar: + 1. Parse the proposed hosts (one per line). + 2. Fetch + parse current pipelock.yaml. + 3. Replace api_allowlist with the proposed hosts; re-render. + 4. docker cp the new yaml into the sidecar. + 5. docker restart so pipelock reloads. + + Returns (before, after) where both are one-per-line allowlist + strings (operator-facing format). Raises PipelockApplyError on + any failure; the sidecar's existing config stays in place until + docker cp succeeds, and the restart is what makes it live.""" + new_hosts = parse_allowlist_content(new_allowlist_content) + container = pipelock_container_name(slug) + current_yaml = fetch_current_yaml(slug) + cfg = parse_yaml_subset(current_yaml) + current_hosts = cfg.get("api_allowlist", []) + if not isinstance(current_hosts, list): + raise PipelockApplyError( + "running pipelock yaml: api_allowlist is not a list" + ) + + before = render_allowlist_content([str(h) for h in current_hosts]) + after = render_allowlist_content(new_hosts) + + cfg["api_allowlist"] = new_hosts + rendered = pipelock_render_yaml(cfg) + + fd, tmp_path = tempfile.mkstemp(prefix="cb-pipelock-yaml.", suffix=".yaml") + try: + with os.fdopen(fd, "w") as f: + f.write(rendered) + cp = subprocess.run( + ["docker", "cp", tmp_path, f"{container}:{PIPELOCK_YAML_IN_CONTAINER}"], + capture_output=True, text=True, check=False, + ) + if cp.returncode != 0: + raise PipelockApplyError( + f"failed to copy pipelock.yaml into {container}: " + f"{(cp.stderr or '').strip()}" + ) + restart = subprocess.run( + ["docker", "restart", container], + capture_output=True, text=True, check=False, + ) + if restart.returncode != 0: + raise PipelockApplyError( + f"failed to restart {container}: " + f"{(restart.stderr or '').strip()}" + ) + finally: + try: + Path(tmp_path).unlink() + except OSError: + pass + + return before, after + + +__all__ = [ + "PIPELOCK_YAML_IN_CONTAINER", + "PipelockApplyError", + "apply_allowlist_change", + "fetch_current_allowlist", + "fetch_current_yaml", + "parse_allowlist_content", + "render_allowlist_content", +] diff --git a/claude_bottle/cli/dashboard.py b/claude_bottle/cli/dashboard.py index f39d401..c379fb6 100644 --- a/claude_bottle/cli/dashboard.py +++ b/claude_bottle/cli/dashboard.py @@ -27,6 +27,11 @@ from ..backend.docker.cred_proxy_apply import ( apply_routes_change, fetch_current_routes, ) +from ..backend.docker.pipelock_apply import ( + PipelockApplyError, + apply_allowlist_change, + fetch_current_allowlist, +) from ..log import info from ..supervise import ( ACTION_OPERATOR_EDIT, @@ -39,6 +44,7 @@ from ..supervise import ( STATUS_REJECTED, TOOL_CAPABILITY_BLOCK, TOOL_CRED_PROXY_BLOCK, + TOOL_PIPELOCK_BLOCK, list_pending_proposals, render_diff, write_audit_entry, @@ -47,6 +53,12 @@ from ..supervise import ( from ._common import PROG +# Errors any remediation engine may raise. Caught by the TUI key +# handlers and surfaced in the status line so a failed apply keeps +# the proposal pending rather than crashing curses. +ApplyError = (CredProxyApplyError, PipelockApplyError) + + # --- Discovery ------------------------------------------------------------- @@ -58,16 +70,15 @@ class QueuedProposal: queue_dir: Path -def discover_cred_proxy_slugs() -> list[str]: - """Slugs of bottles with a running cred-proxy sidecar. Used by - the operator-initiated `routes edit` verb to know which bottles - are editable. Empty list if docker isn't reachable or not +def _discover_sidecar_slugs(name_prefix: str) -> list[str]: + """Slugs of bottles whose sidecar container names start with + `name_prefix`. Empty list if docker isn't reachable or not installed.""" try: r = subprocess.run( [ "docker", "ps", - "--filter", "name=^claude-bottle-cred-proxy-", + "--filter", f"name=^{name_prefix}", "--format", "{{.Names}}", ], capture_output=True, text=True, check=False, @@ -76,15 +87,26 @@ def discover_cred_proxy_slugs() -> list[str]: return [] if r.returncode != 0: return [] - prefix = "claude-bottle-cred-proxy-" out: list[str] = [] for line in (r.stdout or "").splitlines(): line = line.strip() - if line.startswith(prefix): - out.append(line[len(prefix):]) + if line.startswith(name_prefix): + out.append(line[len(name_prefix):]) return sorted(out) +def discover_cred_proxy_slugs() -> list[str]: + """Slugs of bottles with a running cred-proxy sidecar. Used by + the operator-initiated `routes edit` verb.""" + return _discover_sidecar_slugs("claude-bottle-cred-proxy-") + + +def discover_pipelock_slugs() -> list[str]: + """Slugs of bottles with a running pipelock sidecar. Used by + the operator-initiated `pipelock edit` verb.""" + return _discover_sidecar_slugs("claude-bottle-pipelock-") + + def discover_pending() -> list[QueuedProposal]: """Walk ~/.claude-bottle/queue/* and collect pending proposals from every bottle's queue. Sorted by arrival time across the @@ -129,9 +151,13 @@ def approve( diff_before, diff_after = apply_routes_change( qp.proposal.bottle_slug, file_to_apply, ) - # pipelock-block + capability-block remediation lands in PRDs - # 0015 + 0016; for 0014 they remain no-op approvals and the - # audit diff stays empty. + elif qp.proposal.tool == TOOL_PIPELOCK_BLOCK: + diff_before, diff_after = apply_allowlist_change( + qp.proposal.bottle_slug, file_to_apply, + ) + # capability-block remediation lands in PRD 0016; until then + # it stays a no-op approval and the audit (none for capability) + # is skipped. response = Response( proposal_id=qp.proposal.id, @@ -181,6 +207,27 @@ def operator_edit_routes(slug: str, new_content: str) -> tuple[str, str]: return before, after +def operator_edit_allowlist(slug: str, new_content: str) -> tuple[str, str]: + """Apply an operator-initiated pipelock allowlist change (no + agent proposal). Used by the `pipelock edit ` TUI verb + and available for scripted use. Returns (before, after) like + apply_allowlist_change. Writes an audit entry tagged + ACTION_OPERATOR_EDIT to distinguish from tool-call approvals. + + Raises PipelockApplyError on failure.""" + before, after = apply_allowlist_change(slug, new_content) + write_audit_entry(AuditEntry( + timestamp=datetime.now(timezone.utc).isoformat(), + bottle_slug=slug, + component="pipelock", + operator_action=ACTION_OPERATOR_EDIT, + operator_notes="", + justification="", + diff=render_diff(before, after, label="pipelock"), + )) + return before, after + + def _write_audit( qp: QueuedProposal, *, @@ -296,6 +343,9 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: if key == ord("e"): status_line = _operator_edit_routes_flow(stdscr) continue + if key == ord("p"): + status_line = _operator_edit_allowlist_flow(stdscr) + continue if not pending: continue qp = pending[selected] @@ -310,7 +360,7 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: try: approve(qp) status_line = f"approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]" - except CredProxyApplyError as e: + except ApplyError as e: status_line = f"apply failed: {e}" elif key == ord("m"): edited = _modify(stdscr, qp) @@ -320,7 +370,7 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: try: approve(qp, final_file=edited, notes="operator modified before approving") status_line = f"modified+approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]" - except CredProxyApplyError as e: + except ApplyError as e: status_line = f"apply failed: {e}" elif key == ord("r"): reason = _prompt(stdscr, "reject reason: ") @@ -365,7 +415,10 @@ def _render( attr = curses.A_REVERSE if i == selected else curses.A_NORMAL stdscr.addnstr(row, 0, line, w - 1, attr) - footer = "[Enter] view [a] approve [m] modify [r] reject [e] routes edit [j/k] move [q] quit" + footer = ( + "[Enter] view [a] approve [m] modify [r] reject " + "[e] routes edit [p] pipelock edit [j/k] move [q] quit" + ) stdscr.hline(h - 2, 0, curses.ACS_HLINE, w) stdscr.addnstr(h - 1, 0, footer, w - 1, curses.A_DIM) if status_line: @@ -403,7 +456,7 @@ def _detail_view(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> None: elif key == ord("a"): try: approve(qp) - except CredProxyApplyError: + except ApplyError: pass # Status surfaces back in the list view's render. return elif key == ord("m"): @@ -411,7 +464,7 @@ def _detail_view(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> None: if edited is not None: try: approve(qp, final_file=edited, notes="operator modified before approving") - except CredProxyApplyError: + except ApplyError: pass return elif key == ord("r"): @@ -465,33 +518,68 @@ def _operator_edit_routes_flow(stdscr: "curses._CursesWindow") -> str: cred-proxy sidecars, pick one (single → use directly; multi → prompt), fetch the current routes, open in $EDITOR, apply on save. Returns a status-line message.""" - slugs = discover_cred_proxy_slugs() + return _operator_edit_flow( + stdscr, + label="routes", + discover=discover_cred_proxy_slugs, + fetch=fetch_current_routes, + apply=operator_edit_routes, + suffix=".json", + ) + + +def _operator_edit_allowlist_flow(stdscr: "curses._CursesWindow") -> str: + """Operator-initiated pipelock allowlist edit.""" + return _operator_edit_flow( + stdscr, + label="pipelock", + discover=discover_pipelock_slugs, + fetch=fetch_current_allowlist, + apply=operator_edit_allowlist, + suffix=".txt", + ) + + +def _operator_edit_flow( + stdscr: "curses._CursesWindow", + *, + label: str, + discover, + fetch, + apply, + suffix: str, +) -> str: + """Shared scaffolding for the routes-edit + pipelock-edit verbs. + `discover` returns running-sidecar slugs; `fetch(slug)` returns + the current operator-facing config; `apply(slug, new)` does the + write + restart/SIGHUP and writes the audit entry.""" + slugs = discover() if not slugs: - return "no running cred-proxy sidecars to edit" + return f"no running {label} sidecars to edit" if len(slugs) == 1: slug = slugs[0] else: slug = _prompt(stdscr, f"bottle ({', '.join(slugs)}): ") if not slug: - return "routes edit aborted" + return f"{label} edit aborted" if slug not in slugs: return f"unknown bottle {slug!r}" try: - current = fetch_current_routes(slug) - except CredProxyApplyError as e: + current = fetch(slug) + except ApplyError as e: return f"fetch failed: {e}" curses.endwin() try: - edited = edit_in_editor(current, suffix=".json") + edited = edit_in_editor(current, suffix=suffix) finally: stdscr.refresh() if edited is None: - return f"routes for [{slug}] unchanged" + return f"{label} for [{slug}] unchanged" try: - operator_edit_routes(slug, edited) - except CredProxyApplyError as e: + apply(slug, edited) + except ApplyError as e: return f"apply failed: {e}" - return f"updated routes for [{slug}]" + return f"updated {label} for [{slug}]" def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: diff --git a/docs/prds/0015-pipelock-block-remediation.md b/docs/prds/0015-pipelock-block-remediation.md new file mode 100644 index 0000000..1534fa4 --- /dev/null +++ b/docs/prds/0015-pipelock-block-remediation.md @@ -0,0 +1,67 @@ +# PRD 0015: pipelock block remediation + +- **Status:** Draft +- **Author:** didericis +- **Created:** 2026-05-25 +- **Parent:** PRD 0012 +- **Depends on:** PRD 0013 + +## Summary + +Wires the **pipelock block** path (PRD 0012 *Stuck categories*) end-to-end. The supervisor, on approval of a `pipelock-block` proposal, writes the new pipelock allowlist to the host and restarts pipelock; the agent's in-flight outbound calls may drop and rely on retry. The TUI gains a proactive `pipelock edit ` verb for operator-initiated edits unrelated to a tool call. The pipelock audit log (format defined in PRD 0013) is filled in with real entries on every edit. + +## Problem + +See PRD 0012. This PRD specifically addresses: with 0013 in place, the operator can approve a `pipelock-block` proposal but nothing happens — the allowlist doesn't change and pipelock doesn't notice. This PRD closes the loop, using restart-based reload. SIGHUP for pipelock is deferred to a follow-up (see Open questions). + +## Goals / Success Criteria + +A real pipelock block recovers end-to-end: the agent's outbound HTTP request fails with a connection-refused (host not in allowlist), the agent calls `pipelock-block` with a proposed allowlist and justification, the operator approves in the TUI, the supervisor writes the new allowlist and restarts pipelock, the agent retries and proceeds. + +## Non-goals + +- SIGHUP / hot reload for pipelock. v1 ships restart-based reload only. +- One-off gitlock / pipelock exceptions (e.g. waive a specific commit SHA, not a permanent allowlist entry) — see Open questions. +- cred-proxy or capability handling (covered by 0014 and 0016). + +## Scope + +### In scope + +- Supervisor write path: on operator approval of a `pipelock-block` proposal, write the proposed allowlist to the host-side path pipelock reads, then restart the pipelock container. +- `pipelock edit ` TUI verb: open the bottle's current pipelock allowlist in `$EDITOR`, write + restart pipelock on save. Not gated on a pending proposal. +- pipelock audit log entries: every allowlist edit (from a tool-call approval or from a proactive `pipelock edit`) appends an entry with timestamp, diff, justification (if from tool call), and operator action. + +### Out of scope + +- SIGHUP reload (deferred — see Open questions). +- cred-proxy equivalents (PRD 0014). + +## Proposed Design + +### New services / components + +- **`pipelock edit ` TUI verb.** Opens the bottle's current pipelock allowlist in `$EDITOR`. On save, the supervisor writes the new file and restarts pipelock. + +### Existing code touched + +- **pipelock** — gains a clean restart path that picks up the new allowlist on container restart. No pipelock code changes likely needed if pipelock already reads its config on startup; the orchestration is supervisor-side. +- **MCP sidecar** (PRD 0013) — the `pipelock-block` approval handler stops being a no-op; on approval, calls the supervisor's write+restart path. +- **`cli.py`** — dashboard subcommand gains the `pipelock edit` verb. + +### Data model changes + +None beyond PRD 0013. + +## Open questions + +- **SIGHUP reload for pipelock.** v1 ships restart-based reload, which drops in-flight outbound calls. Should pipelock gain SIGHUP support so **pipelock block** is as cheap as **cred-proxy block**? Depends on how often the operator edits the allowlist mid-task and how disruptive a pipelock bounce actually is. +- **Gitlock / pipelock one-off exceptions.** Some pipelock denials don't want a permanent allowlist entry — e.g. a commit that includes docs with intentionally-bogus tokens that the secret scanner correctly flags. The shape (agent blocked → tool call → operator decides → result) is the same, but the *resolution* is a per-operation override or a scoped allowlist entry, not a permanent edit. Is this a fourth tool (`exception-block`?), or does it fold into `pipelock-block` with a scoped one-shot allowlist entry? Either way, the approval must be auditable so a future reader can see what was waived and why. See `docs/research/git-gate-commit-approval.md` for a survey of gitleaks's native allowlist primitives and a recommendation. + +## References + +- PRD 0001 — per-agent egress proxy via pipelock. +- PRD 0008 — git-gate. +- PRD 0012 — stuck-agent recovery flow overview. +- PRD 0013 — supervise plane foundation (prerequisite). +- `docs/research/git-gate-commit-approval.md` — gitleaks allowlist primitives. diff --git a/tests/integration/test_pipelock_apply.py b/tests/integration/test_pipelock_apply.py new file mode 100644 index 0000000..3f0181b --- /dev/null +++ b/tests/integration/test_pipelock_apply.py @@ -0,0 +1,152 @@ +"""Integration: drive `apply_allowlist_change` against a real +pipelock sidecar (PRD 0015). + +Brings up a real pipelock sidecar (via the production DockerPipelockProxy +bring-up), calls apply_allowlist_change to swap the api_allowlist, +restarts pipelock, and verifies the running container now serves the +new yaml. + +Setup uses pipelock_tls_init which bind-mounts a host path into a +one-shot pipelock container — that doesn't work in DinD, so the test +skips under GITEA_ACTIONS the same way the existing pipelock smoke +test does. +""" + +from __future__ import annotations + +import dataclasses +import os +import shutil +import subprocess +import tempfile +import time +import unittest +from pathlib import Path + +from claude_bottle.backend.docker.network import ( + network_create_egress, + network_create_internal, + network_remove, +) +from claude_bottle.backend.docker.pipelock import ( + DockerPipelockProxy, + pipelock_container_name, + pipelock_tls_init, +) +from claude_bottle.backend.docker.pipelock_apply import ( + PipelockApplyError, + apply_allowlist_change, + fetch_current_allowlist, + fetch_current_yaml, +) +from claude_bottle.yaml_subset import parse_yaml_subset +from tests._docker import skip_unless_docker +from tests.fixtures import fixture_minimal + + +@skip_unless_docker() +@unittest.skipIf( + os.environ.get("GITEA_ACTIONS") == "true", + "skipped under act_runner: pipelock_tls_init uses a host bind mount " + "that doesn't share fs with the runner container", +) +class TestPipelockApply(unittest.TestCase): + def setUp(self): + self.slug = f"cb-test-pla-{os.getpid()}-{int(time.time())}" + self.sidecar_name = "" + self.internal_net = "" + self.egress_net = "" + self.work_dir = Path(tempfile.mkdtemp(prefix="pipelock-apply.")) + + def tearDown(self): + if self.sidecar_name: + DockerPipelockProxy().stop(self.sidecar_name) + for n in (self.internal_net, self.egress_net): + if n: + network_remove(n) + shutil.rmtree(self.work_dir, ignore_errors=True) + + def _bring_up(self) -> None: + proxy = DockerPipelockProxy() + prep = proxy.prepare(fixture_minimal().bottles["dev"], self.slug, self.work_dir) + self.internal_net = network_create_internal(self.slug) + self.egress_net = network_create_egress(self.slug) + ca_cert_host, ca_key_host = pipelock_tls_init(self.work_dir) + plan = dataclasses.replace( + prep, + internal_network=self.internal_net, + egress_network=self.egress_net, + ca_cert_host_path=ca_cert_host, + ca_key_host_path=ca_key_host, + ) + self.sidecar_name = proxy.start(plan) + self.assertEqual(pipelock_container_name(self.slug), self.sidecar_name) + # Wait until docker exec succeeds — the container is up but + # pipelock may still be initializing. fetch_current_yaml is + # itself a docker exec, so retrying it doubles as a readiness + # probe. + deadline = time.monotonic() + 15.0 + while time.monotonic() < deadline: + try: + fetch_current_yaml(self.slug) + return + except PipelockApplyError: + pass + time.sleep(0.25) + raise AssertionError("pipelock sidecar never became reachable") + + def _wait_for_yaml(self, contains: str, *, deadline_s: float = 15.0) -> str: + """Poll docker exec until /etc/pipelock.yaml contains `contains`, + returning the yaml. Used to bridge the docker-restart window.""" + deadline = time.monotonic() + deadline_s + while time.monotonic() < deadline: + try: + yaml = fetch_current_yaml(self.slug) + if contains in yaml: + return yaml + except PipelockApplyError: + pass + time.sleep(0.25) + self.fail(f"never saw {contains!r} in /etc/pipelock.yaml") + + def test_apply_swaps_api_allowlist(self): + self._bring_up() + + initial_yaml = fetch_current_yaml(self.slug) + # fixture_minimal yields the baked-in DEFAULT_ALLOWLIST in + # pipelock.py; api.anthropic.com is in there. + self.assertIn("api.anthropic.com", initial_yaml) + + new_content = "api.anthropic.com\nnew-host.example\n" + before, after = apply_allowlist_change(self.slug, new_content) + self.assertIn("api.anthropic.com", before) + self.assertNotIn("new-host.example", before) + self.assertIn("new-host.example", after) + + updated = self._wait_for_yaml("new-host.example") + cfg = parse_yaml_subset(updated) + self.assertIn("new-host.example", cfg["api_allowlist"]) # type: ignore[operator] + self.assertIn("api.anthropic.com", cfg["api_allowlist"]) # type: ignore[operator] + # tls_interception block (set up by the production prepare + # via pipelock_build_config) is preserved across the swap. + self.assertIn("tls_interception", cfg) + + def test_apply_with_invalid_host_raises(self): + self._bring_up() + with self.assertRaises(PipelockApplyError): + apply_allowlist_change(self.slug, "host with space.example\n") + + def test_fetch_current_allowlist_renders_one_per_line(self): + self._bring_up() + listing = fetch_current_allowlist(self.slug) + self.assertTrue(listing.endswith("\n")) + self.assertIn("api.anthropic.com\n", listing) + + def test_apply_against_missing_sidecar_raises(self): + # Don't bring up — the slug points at nothing. + with self.assertRaises(PipelockApplyError): + apply_allowlist_change(self.slug, "x.example\n") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_dashboard.py b/tests/unit/test_dashboard.py index c897cf9..6e00994 100644 --- a/tests/unit/test_dashboard.py +++ b/tests/unit/test_dashboard.py @@ -17,6 +17,7 @@ from pathlib import Path from claude_bottle import supervise from claude_bottle.backend.docker.cred_proxy_apply import CredProxyApplyError +from claude_bottle.backend.docker.pipelock_apply import PipelockApplyError from claude_bottle.cli import dashboard from claude_bottle.supervise import ( Proposal, @@ -115,15 +116,20 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase): class TestApproveReject(_FakeHomeMixin, unittest.TestCase): def setUp(self): self._setup_fake_home() - self._original_apply = dashboard.apply_routes_change - # Default stub: succeed with deterministic before/after so the + self._original_apply_routes = dashboard.apply_routes_change + self._original_apply_allowlist = dashboard.apply_allowlist_change + # Default stubs: succeed with deterministic before/after so the # audit log shows a non-empty diff. dashboard.apply_routes_change = lambda slug, content: ( '{"routes": []}\n', content, ) + dashboard.apply_allowlist_change = lambda slug, content: ( + "old.example\n", content, + ) def tearDown(self): - dashboard.apply_routes_change = self._original_apply + dashboard.apply_routes_change = self._original_apply_routes + dashboard.apply_allowlist_change = self._original_apply_allowlist self._teardown_fake_home() def _enqueue(self, tool: str = TOOL_CRED_PROXY_BLOCK): @@ -268,6 +274,65 @@ class TestCredProxyApplyWiring(_FakeHomeMixin, unittest.TestCase): self.assertEqual("", entries[0].diff) +class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase): + """PRD 0015 Phase 2: approve() on a pipelock-block proposal + must call apply_allowlist_change and surface its failures.""" + + def setUp(self): + self._setup_fake_home() + self._original = dashboard.apply_allowlist_change + + def tearDown(self): + dashboard.apply_allowlist_change = self._original + self._teardown_fake_home() + + def _enqueue_pipelock(self, proposed: str = "host.example\n"): + p = Proposal.new( + bottle_slug="dev", tool=TOOL_PIPELOCK_BLOCK, + proposed_file=proposed, + justification="need new host", + current_file_hash=sha256_hex(proposed), + now=FIXED, + ) + qdir = supervise.queue_dir_for_slug("dev") + qdir.mkdir(parents=True, exist_ok=True) + supervise.write_proposal(qdir, p) + return dashboard.QueuedProposal(proposal=p, queue_dir=qdir) + + def test_pipelock_block_calls_apply_with_proposed_file(self): + calls = [] + dashboard.apply_allowlist_change = lambda slug, content: ( + calls.append((slug, content)) or ("before", content) + ) + qp = self._enqueue_pipelock("new.example\n") + dashboard.approve(qp) + self.assertEqual([("dev", "new.example\n")], calls) + + def test_apply_failure_blocks_response_and_audit(self): + dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw( + PipelockApplyError("docker exec failed") + ) + qp = self._enqueue_pipelock() + with self.assertRaises(PipelockApplyError): + dashboard.approve(qp) + self.assertEqual( + [qp.proposal.id], + [p.id for p in supervise.list_pending_proposals(qp.queue_dir)], + ) + self.assertEqual([], read_audit_entries("pipelock", "dev")) + + def test_real_diff_lands_in_audit(self): + dashboard.apply_allowlist_change = lambda slug, content: ( + "old.example\n", + "old.example\nnew.example\n", + ) + qp = self._enqueue_pipelock("old.example\nnew.example\n") + dashboard.approve(qp) + entries = read_audit_entries("pipelock", "dev") + self.assertEqual(1, len(entries)) + self.assertIn("+new.example", entries[0].diff) + + class TestOperatorEditRoutes(_FakeHomeMixin, unittest.TestCase): """PRD 0014 Phase 4: operator-initiated routes edit (not gated on a pending proposal).""" @@ -313,10 +378,41 @@ class TestDiscoverCredProxySlugs(unittest.TestCase): os.environ["PATH"] = "/nonexistent-no-docker-here" try: self.assertEqual([], dashboard.discover_cred_proxy_slugs()) + self.assertEqual([], dashboard.discover_pipelock_slugs()) finally: os.environ["PATH"] = original +class TestOperatorEditAllowlist(_FakeHomeMixin, unittest.TestCase): + """PRD 0015 Phase 3: operator-initiated pipelock allowlist edit.""" + + def setUp(self): + self._setup_fake_home() + self._original = dashboard.apply_allowlist_change + + def tearDown(self): + dashboard.apply_allowlist_change = self._original + self._teardown_fake_home() + + def test_writes_audit_with_operator_edit_action(self): + dashboard.apply_allowlist_change = lambda slug, content: ( + "old.example\n", content, + ) + dashboard.operator_edit_allowlist("dev", "old.example\nnew.example\n") + entries = read_audit_entries("pipelock", "dev") + self.assertEqual(1, len(entries)) + self.assertEqual(supervise.ACTION_OPERATOR_EDIT, entries[0].operator_action) + self.assertIn("+new.example", entries[0].diff) + + def test_failure_does_not_write_audit(self): + dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw( + PipelockApplyError("nope") + ) + with self.assertRaises(PipelockApplyError): + dashboard.operator_edit_allowlist("dev", "x.example\n") + self.assertEqual([], read_audit_entries("pipelock", "dev")) + + class TestEditInEditor(unittest.TestCase): def test_runs_editor_returns_edited_content(self): # Fake "editor" is /bin/sh -c 'cat < $1 ... EOF' diff --git a/tests/unit/test_pipelock_apply.py b/tests/unit/test_pipelock_apply.py new file mode 100644 index 0000000..1f48226 --- /dev/null +++ b/tests/unit/test_pipelock_apply.py @@ -0,0 +1,115 @@ +"""Unit: pipelock_apply parsers + helpers (PRD 0015 Phase 1). + +docker exec / cp / restart paths are covered by the integration +test in Phase 4. Here we cover the host-side parsing + yaml roundtrip. +""" + +import unittest + +from claude_bottle.backend.docker.pipelock_apply import ( + PipelockApplyError, + parse_allowlist_content, + render_allowlist_content, +) +from claude_bottle.pipelock import pipelock_render_yaml +from claude_bottle.yaml_subset import parse_yaml_subset + + +class TestParseAllowlistContent(unittest.TestCase): + def test_one_per_line(self): + self.assertEqual( + ["a.example", "b.example"], + parse_allowlist_content("a.example\nb.example\n"), + ) + + def test_blank_lines_ignored(self): + self.assertEqual( + ["a", "b"], + parse_allowlist_content("a\n\n \nb\n"), + ) + + def test_comments_ignored(self): + self.assertEqual( + ["a"], + parse_allowlist_content("# top comment\na\n# trailing\n"), + ) + + def test_invalid_char_raises(self): + with self.assertRaises(PipelockApplyError) as cm: + parse_allowlist_content("host with space\n") + self.assertIn("disallowed characters", str(cm.exception)) + + def test_empty_input_returns_empty_list(self): + self.assertEqual([], parse_allowlist_content("")) + + +class TestRenderAllowlistContent(unittest.TestCase): + def test_one_per_line_with_trailing_newline(self): + self.assertEqual("a\nb\n", render_allowlist_content(["a", "b"])) + + def test_empty_renders_empty(self): + self.assertEqual("", render_allowlist_content([])) + + def test_roundtrip(self): + original = ["api.example.com", "ghcr.io", "example.org"] + self.assertEqual( + original, + parse_allowlist_content(render_allowlist_content(original)), + ) + + +class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase): + """The apply path parses the running pipelock.yaml, swaps + api_allowlist, re-renders. Verify that parse(render(cfg)) == + cfg for the fields pipelock_render_yaml emits — otherwise + the apply would silently drop config.""" + + def test_minimal_config_roundtrips(self): + cfg = { + "version": 1, + "mode": "strict", + "enforce": True, + "api_allowlist": ["a.example", "b.example"], + "forward_proxy": {"enabled": True}, + "dlp": {"include_defaults": True, "scan_env": True}, + "request_body_scanning": {"action": "block"}, + } + rendered = pipelock_render_yaml(cfg) + parsed = parse_yaml_subset(rendered) + self.assertEqual(["a.example", "b.example"], parsed["api_allowlist"]) + self.assertEqual(1, parsed["version"]) + self.assertEqual("strict", parsed["mode"]) + self.assertEqual(True, parsed["enforce"]) + + def test_swap_allowlist_then_render_preserves_other_fields(self): + cfg = { + "version": 1, + "mode": "strict", + "enforce": True, + "api_allowlist": ["old.example"], + "forward_proxy": {"enabled": True}, + "dlp": {"include_defaults": True, "scan_env": True}, + "request_body_scanning": {"action": "block"}, + "tls_interception": { + "enabled": True, + "ca_cert": "/etc/pipelock-ca.pem", + "ca_key": "/etc/pipelock-ca-key.pem", + "passthrough_domains": ["api.anthropic.com"], + }, + } + parsed = parse_yaml_subset(pipelock_render_yaml(cfg)) + parsed["api_allowlist"] = ["new.example"] + rerendered = pipelock_render_yaml(parsed) + roundtripped = parse_yaml_subset(rerendered) + self.assertEqual(["new.example"], roundtripped["api_allowlist"]) + # Non-allowlist fields stay put. + self.assertEqual("strict", roundtripped["mode"]) + tls = roundtripped["tls_interception"] + self.assertIsInstance(tls, dict) + assert isinstance(tls, dict) # type-narrowing + self.assertEqual("/etc/pipelock-ca.pem", tls["ca_cert"]) + self.assertEqual(["api.anthropic.com"], tls["passthrough_domains"]) + + +if __name__ == "__main__": + unittest.main()