From c05457fbef66c07d16ff26b28a79f32869628055 Mon Sep 17 00:00:00 2001 From: didericis Date: Mon, 25 May 2026 04:59:13 -0400 Subject: [PATCH] feat(pipelock): host-side apply_allowlist_change helper (PRD 0015) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 of PRD 0015. New module claude_bottle/backend/docker/pipelock_apply.py: - fetch_current_yaml(slug): docker exec cat of the live /etc/pipelock.yaml. - fetch_current_allowlist(slug): parses the yaml, extracts api_allowlist, renders as one-per-line for the operator/agent. - parse_allowlist_content / render_allowlist_content: one-per-line with `#` comments + blank-line tolerance, conservative hostname validation. - apply_allowlist_change(slug, new): parses new hosts, fetches + parses current yaml, swaps api_allowlist, re-renders via pipelock_render_yaml, docker cp into sidecar, docker restart. Returns (before, after) as one-per-line strings for the audit diff. - PipelockApplyError: caller surfaces to operator without crashing the dashboard. v1 uses restart, not SIGHUP — pipelock has no in-process reload hook; adding one is the PRD's open question. Restart drops in-flight outbound calls and the agent retries pick up the restarted proxy. Yaml roundtrip is covered by tests: parse(render(cfg)) preserves all fields pipelock_render_yaml emits, including tls_interception + passthrough_domains. Co-Authored-By: Claude Opus 4.7 --- .../backend/docker/pipelock_apply.py | 166 ++++++++++++++++++ tests/unit/test_pipelock_apply.py | 115 ++++++++++++ 2 files changed, 281 insertions(+) create mode 100644 claude_bottle/backend/docker/pipelock_apply.py create mode 100644 tests/unit/test_pipelock_apply.py diff --git a/claude_bottle/backend/docker/pipelock_apply.py b/claude_bottle/backend/docker/pipelock_apply.py new file mode 100644 index 0000000..641c5ca --- /dev/null +++ b/claude_bottle/backend/docker/pipelock_apply.py @@ -0,0 +1,166 @@ +"""pipelock_apply — host-side helper to apply an api_allowlist +change to a running pipelock sidecar (PRD 0015). + +Used by the supervise dashboard when the operator approves a +pipelock-block proposal (or runs the operator-initiated `pipelock +edit ` verb). Fetches the current pipelock.yaml via `docker +exec`, parses it, swaps the api_allowlist with the proposed hosts, +re-renders, writes back via `docker cp`, then `docker restart` so +pipelock picks up the new config. + +v1 uses restart, not SIGHUP — pipelock has no in-process reload +hook and adding one is the "SIGHUP reload for pipelock" open +question in PRD 0015. Restart drops in-flight outbound calls; the +agent's HTTP client retries pick up against the restarted proxy. +""" + +from __future__ import annotations + +import os +import re +import subprocess +import tempfile +from pathlib import Path + +from ...pipelock import pipelock_render_yaml +from ...yaml_subset import parse_yaml_subset +from .pipelock import pipelock_container_name + + +PIPELOCK_YAML_IN_CONTAINER = "/etc/pipelock.yaml" + +# Allowlist proposals are one-hostname-per-line. Blank lines and +# `#`-prefixed comments are ignored. The character set matches the +# supervise sidecar's syntactic check on the agent's pipelock-block +# proposal (alphanumerics + dot/dash/underscore). +_HOST_OK = re.compile(r"^[A-Za-z0-9_.-]+$") + + +class PipelockApplyError(RuntimeError): + """Raised when fetch / parse / apply fails. The dashboard renders + the message and keeps the proposal pending — never crashes.""" + + +def parse_allowlist_content(content: str) -> list[str]: + """One hostname per line. Blanks and `#` comments are ignored. + Raises PipelockApplyError if a line has a disallowed character.""" + hosts: list[str] = [] + for i, raw_line in enumerate(content.splitlines(), start=1): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + if not _HOST_OK.match(line): + raise PipelockApplyError( + f"allowlist line {i}: {line!r} has disallowed characters" + ) + hosts.append(line) + return hosts + + +def render_allowlist_content(hosts: list[str]) -> str: + """Hosts → one-per-line string (the operator-facing format).""" + if not hosts: + return "" + return "\n".join(hosts) + "\n" + + +def fetch_current_yaml(slug: str) -> str: + """Read the live /etc/pipelock.yaml from the running pipelock + sidecar. Raises PipelockApplyError if the read fails.""" + container = pipelock_container_name(slug) + r = subprocess.run( + ["docker", "exec", container, "cat", PIPELOCK_YAML_IN_CONTAINER], + capture_output=True, text=True, check=False, + ) + if r.returncode != 0: + raise PipelockApplyError( + f"could not read pipelock.yaml from {container}: " + f"{(r.stderr or '').strip() or 'container not running?'}" + ) + return r.stdout + + +def fetch_current_allowlist(slug: str) -> str: + """Fetch the live yaml, extract api_allowlist, render as one-per- + line — the operator-facing format for the TUI / agent's + current-config mount.""" + yaml = fetch_current_yaml(slug) + cfg = parse_yaml_subset(yaml) + hosts = cfg.get("api_allowlist", []) + if not isinstance(hosts, list): + raise PipelockApplyError( + "running pipelock yaml: api_allowlist is not a list" + ) + return render_allowlist_content([str(h) for h in hosts]) + + +def apply_allowlist_change( + slug: str, new_allowlist_content: str, +) -> tuple[str, str]: + """Apply `new_allowlist_content` to the pipelock sidecar: + 1. Parse the proposed hosts (one per line). + 2. Fetch + parse current pipelock.yaml. + 3. Replace api_allowlist with the proposed hosts; re-render. + 4. docker cp the new yaml into the sidecar. + 5. docker restart so pipelock reloads. + + Returns (before, after) where both are one-per-line allowlist + strings (operator-facing format). Raises PipelockApplyError on + any failure; the sidecar's existing config stays in place until + docker cp succeeds, and the restart is what makes it live.""" + new_hosts = parse_allowlist_content(new_allowlist_content) + container = pipelock_container_name(slug) + current_yaml = fetch_current_yaml(slug) + cfg = parse_yaml_subset(current_yaml) + current_hosts = cfg.get("api_allowlist", []) + if not isinstance(current_hosts, list): + raise PipelockApplyError( + "running pipelock yaml: api_allowlist is not a list" + ) + + before = render_allowlist_content([str(h) for h in current_hosts]) + after = render_allowlist_content(new_hosts) + + cfg["api_allowlist"] = new_hosts + rendered = pipelock_render_yaml(cfg) + + fd, tmp_path = tempfile.mkstemp(prefix="cb-pipelock-yaml.", suffix=".yaml") + try: + with os.fdopen(fd, "w") as f: + f.write(rendered) + cp = subprocess.run( + ["docker", "cp", tmp_path, f"{container}:{PIPELOCK_YAML_IN_CONTAINER}"], + capture_output=True, text=True, check=False, + ) + if cp.returncode != 0: + raise PipelockApplyError( + f"failed to copy pipelock.yaml into {container}: " + f"{(cp.stderr or '').strip()}" + ) + restart = subprocess.run( + ["docker", "restart", container], + capture_output=True, text=True, check=False, + ) + if restart.returncode != 0: + raise PipelockApplyError( + f"failed to restart {container}: " + f"{(restart.stderr or '').strip()}" + ) + finally: + try: + Path(tmp_path).unlink() + except OSError: + pass + + return before, after + + +__all__ = [ + "PIPELOCK_YAML_IN_CONTAINER", + "PipelockApplyError", + "apply_allowlist_change", + "fetch_current_allowlist", + "fetch_current_yaml", + "parse_allowlist_content", + "render_allowlist_content", +] diff --git a/tests/unit/test_pipelock_apply.py b/tests/unit/test_pipelock_apply.py new file mode 100644 index 0000000..1f48226 --- /dev/null +++ b/tests/unit/test_pipelock_apply.py @@ -0,0 +1,115 @@ +"""Unit: pipelock_apply parsers + helpers (PRD 0015 Phase 1). + +docker exec / cp / restart paths are covered by the integration +test in Phase 4. Here we cover the host-side parsing + yaml roundtrip. +""" + +import unittest + +from claude_bottle.backend.docker.pipelock_apply import ( + PipelockApplyError, + parse_allowlist_content, + render_allowlist_content, +) +from claude_bottle.pipelock import pipelock_render_yaml +from claude_bottle.yaml_subset import parse_yaml_subset + + +class TestParseAllowlistContent(unittest.TestCase): + def test_one_per_line(self): + self.assertEqual( + ["a.example", "b.example"], + parse_allowlist_content("a.example\nb.example\n"), + ) + + def test_blank_lines_ignored(self): + self.assertEqual( + ["a", "b"], + parse_allowlist_content("a\n\n \nb\n"), + ) + + def test_comments_ignored(self): + self.assertEqual( + ["a"], + parse_allowlist_content("# top comment\na\n# trailing\n"), + ) + + def test_invalid_char_raises(self): + with self.assertRaises(PipelockApplyError) as cm: + parse_allowlist_content("host with space\n") + self.assertIn("disallowed characters", str(cm.exception)) + + def test_empty_input_returns_empty_list(self): + self.assertEqual([], parse_allowlist_content("")) + + +class TestRenderAllowlistContent(unittest.TestCase): + def test_one_per_line_with_trailing_newline(self): + self.assertEqual("a\nb\n", render_allowlist_content(["a", "b"])) + + def test_empty_renders_empty(self): + self.assertEqual("", render_allowlist_content([])) + + def test_roundtrip(self): + original = ["api.example.com", "ghcr.io", "example.org"] + self.assertEqual( + original, + parse_allowlist_content(render_allowlist_content(original)), + ) + + +class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase): + """The apply path parses the running pipelock.yaml, swaps + api_allowlist, re-renders. Verify that parse(render(cfg)) == + cfg for the fields pipelock_render_yaml emits — otherwise + the apply would silently drop config.""" + + def test_minimal_config_roundtrips(self): + cfg = { + "version": 1, + "mode": "strict", + "enforce": True, + "api_allowlist": ["a.example", "b.example"], + "forward_proxy": {"enabled": True}, + "dlp": {"include_defaults": True, "scan_env": True}, + "request_body_scanning": {"action": "block"}, + } + rendered = pipelock_render_yaml(cfg) + parsed = parse_yaml_subset(rendered) + self.assertEqual(["a.example", "b.example"], parsed["api_allowlist"]) + self.assertEqual(1, parsed["version"]) + self.assertEqual("strict", parsed["mode"]) + self.assertEqual(True, parsed["enforce"]) + + def test_swap_allowlist_then_render_preserves_other_fields(self): + cfg = { + "version": 1, + "mode": "strict", + "enforce": True, + "api_allowlist": ["old.example"], + "forward_proxy": {"enabled": True}, + "dlp": {"include_defaults": True, "scan_env": True}, + "request_body_scanning": {"action": "block"}, + "tls_interception": { + "enabled": True, + "ca_cert": "/etc/pipelock-ca.pem", + "ca_key": "/etc/pipelock-ca-key.pem", + "passthrough_domains": ["api.anthropic.com"], + }, + } + parsed = parse_yaml_subset(pipelock_render_yaml(cfg)) + parsed["api_allowlist"] = ["new.example"] + rerendered = pipelock_render_yaml(parsed) + roundtripped = parse_yaml_subset(rerendered) + self.assertEqual(["new.example"], roundtripped["api_allowlist"]) + # Non-allowlist fields stay put. + self.assertEqual("strict", roundtripped["mode"]) + tls = roundtripped["tls_interception"] + self.assertIsInstance(tls, dict) + assert isinstance(tls, dict) # type-narrowing + self.assertEqual("/etc/pipelock-ca.pem", tls["ca_cert"]) + self.assertEqual(["api.anthropic.com"], tls["passthrough_domains"]) + + +if __name__ == "__main__": + unittest.main()