diff --git a/claude_bottle/cli/dashboard.py b/claude_bottle/cli/dashboard.py index 3aae60c..227bb34 100644 --- a/claude_bottle/cli/dashboard.py +++ b/claude_bottle/cli/dashboard.py @@ -36,6 +36,8 @@ from ..backend.docker.pipelock_apply import ( PipelockApplyError, apply_allowlist_change, fetch_current_allowlist, + parse_allowlist_content, + render_allowlist_content, ) from ..log import info from ..supervise import ( @@ -168,7 +170,7 @@ def approve( qp.proposal.bottle_slug, file_to_apply, ) elif qp.proposal.tool == TOOL_PIPELOCK_BLOCK: - diff_before, diff_after = apply_allowlist_change( + diff_before, diff_after = _apply_pipelock_url( qp.proposal.bottle_slug, file_to_apply, ) elif qp.proposal.tool == TOOL_CAPABILITY_BLOCK: @@ -230,6 +232,27 @@ def operator_edit_routes(slug: str, new_content: str) -> tuple[str, str]: return before, after +def _apply_pipelock_url(slug: str, failed_url: str) -> tuple[str, str]: + """pipelock-block proposals carry a single failed URL, not a + full allowlist. Extract the host, merge into the running + allowlist, and hand the merged content to apply_allowlist_change. + The full URL (with path) is preserved on the proposal for the + operator's read; only the host ends up in pipelock's allowlist + (pipelock can't enforce path-level rules).""" + import urllib.parse + parsed = urllib.parse.urlsplit(failed_url.strip()) + host = parsed.hostname or "" + if not host: + raise PipelockApplyError( + f"proposed failed_url has no extractable host: {failed_url!r}" + ) + current = fetch_current_allowlist(slug) + hosts = parse_allowlist_content(current) + if host not in hosts: + hosts.append(host) + return apply_allowlist_change(slug, render_allowlist_content(hosts)) + + def operator_edit_allowlist(slug: str, new_content: str) -> tuple[str, str]: """Apply an operator-initiated pipelock allowlist change (no agent proposal). Used by the `pipelock edit ` TUI verb @@ -580,12 +603,22 @@ def _detail_lines(qp: QueuedProposal) -> list[str]: out.extend(" " + line for line in p.justification.splitlines() or [""]) out.extend([ "", - "proposed file:", + _proposed_payload_label(p.tool) + ":", ]) out.extend(p.proposed_file.splitlines() or [""]) return out +def _proposed_payload_label(tool: str) -> str: + """The detail-view section heading for the proposal's payload — + `proposed_file` is what the dataclass calls it, but for + pipelock-block the payload is a single URL not a file. Render + the label per tool so the operator's eye matches.""" + if tool == TOOL_PIPELOCK_BLOCK: + return "failed URL" + return "proposed file" + + def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: """Suspend curses, open $EDITOR on the proposed file, return the edited content (or None if unchanged).""" diff --git a/claude_bottle/supervise_server.py b/claude_bottle/supervise_server.py index a384c08..e98b70a 100644 --- a/claude_bottle/supervise_server.py +++ b/claude_bottle/supervise_server.py @@ -36,6 +36,7 @@ import os import socketserver import sys import typing +import urllib.parse from dataclasses import dataclass from pathlib import Path @@ -160,27 +161,34 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [ "name": _sv.TOOL_PIPELOCK_BLOCK, "description": ( "Call when pipelock refused your outbound request — host " - "not in the allowlist, protocol blocked, connection " - "refused at the egress layer. Read the current allowlist " - "from /etc/claude-bottle/current-config/allowlist, compose " - "a modified version, and pass the full new file plus a " - "justification. On approval the supervisor writes the new " - "allowlist and restarts pipelock (wired in PRD 0015; v1 " - "acknowledges only)." + "not in the allowlist, connection refused at the egress " + "layer. Pass the full URL you tried to hit (scheme + " + "host + path) plus a justification. The supervisor " + "extracts the hostname and merges it into the bottle's " + "current pipelock allowlist; the path is captured as " + "context for the operator to review (pipelock's allowlist " + "is hostname-only — it can't enforce path-level rules). " + "On approval the supervisor restarts pipelock with the " + "merged allowlist." ), "inputSchema": { "type": "object", "properties": { - "allowlist": { + "failed_url": { "type": "string", - "description": "Full proposed pipelock allowlist (one hostname per line).", + "description": ( + "The full URL pipelock blocked, e.g. " + "https://api.github.com/repos/foo/bar. Scheme " + "and hostname are required; path is recorded " + "as operator context." + ), }, "justification": { "type": "string", - "description": "Why the new host(s) should be allowed.", + "description": "Why the new host should be allowed.", }, }, - "required": ["allowlist", "justification"], + "required": ["failed_url", "justification"], }, }, { @@ -214,10 +222,20 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [ ] -# Map each tool to the input field that carries the proposed file. +# Map each tool to the input field that carries the agent's +# tool-specific payload (stored in Proposal.proposed_file as +# free-form text the apply path interprets per tool). +# +# cred-proxy-block: full proposed routes.json +# pipelock-block: the full failed URL (scheme + host + path) — +# supervisor extracts the host, merges into the +# bottle's current allowlist; the path is shown +# to the operator for context (pipelock doesn't +# do path-level matching). +# capability-block: full proposed Dockerfile PROPOSED_FILE_FIELD: dict[str, str] = { _sv.TOOL_CRED_PROXY_BLOCK: "routes", - _sv.TOOL_PIPELOCK_BLOCK: "allowlist", + _sv.TOOL_PIPELOCK_BLOCK: "failed_url", _sv.TOOL_CAPABILITY_BLOCK: "dockerfile", } @@ -245,17 +263,21 @@ def validate_proposed_file(tool: str, content: str) -> None: f"{tool}: proposed routes.json must be an object with a 'routes' array", ) elif tool == _sv.TOOL_PIPELOCK_BLOCK: - for i, line in enumerate(content.splitlines()): - stripped = line.strip() - if not stripped or stripped.startswith("#"): - continue - # Hostnames are conservative: letters/digits/dots/dashes only. - for ch in stripped: - if not (ch.isalnum() or ch in ".-_"): - raise _RpcError( - ERR_INVALID_PARAMS, - f"{tool}: allowlist line {i + 1} has invalid character {ch!r}", - ) + # `content` is the full failed URL. Require scheme + host so + # the supervisor can extract a hostname for the allowlist + # merge; the path is preserved for operator context. + parsed = urllib.parse.urlsplit(content.strip()) + if parsed.scheme not in ("http", "https"): + raise _RpcError( + ERR_INVALID_PARAMS, + f"{tool}: failed_url must start with http:// or https:// " + f"(got {content!r})", + ) + if not parsed.hostname: + raise _RpcError( + ERR_INVALID_PARAMS, + f"{tool}: failed_url is missing a hostname (got {content!r})", + ) elif tool == _sv.TOOL_CAPABILITY_BLOCK: # Dockerfiles are too varied to validate syntactically beyond # non-empty. The operator reads the diff in the TUI. diff --git a/tests/unit/test_dashboard.py b/tests/unit/test_dashboard.py index b6f1911..3d0ce60 100644 --- a/tests/unit/test_dashboard.py +++ b/tests/unit/test_dashboard.py @@ -38,11 +38,21 @@ FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc) def _proposal(slug: str = "dev", tool: str = TOOL_CRED_PROXY_BLOCK) -> Proposal: + # Per-tool payload shape: cred-proxy gets routes.json, pipelock + # gets a failed URL (PR #25 follow-up), capability gets a + # Dockerfile-ish blob. Match the production dispatch in + # PROPOSED_FILE_FIELD. + payloads = { + TOOL_CRED_PROXY_BLOCK: '{"routes": []}\n', + TOOL_PIPELOCK_BLOCK: "https://example.com/path", + TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n", + } + payload = payloads.get(tool, "") return Proposal.new( bottle_slug=slug, tool=tool, - proposed_file='{"routes": []}\n', + proposed_file=payload, justification=f"needed for {slug}", - current_file_hash=sha256_hex("{}"), + current_file_hash=sha256_hex(payload), now=FIXED, ) @@ -119,6 +129,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): self._setup_fake_home() self._original_apply_routes = dashboard.apply_routes_change self._original_apply_allowlist = dashboard.apply_allowlist_change + self._original_fetch_allowlist = dashboard.fetch_current_allowlist self._original_apply_capability = dashboard.apply_capability_change # Default stubs: succeed with deterministic before/after so the # audit log shows a non-empty diff. @@ -128,6 +139,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): dashboard.apply_allowlist_change = lambda slug, content: ( "old.example\n", content, ) + dashboard.fetch_current_allowlist = lambda slug: "old.example\n" dashboard.apply_capability_change = lambda slug, content: ( "FROM old\n", content, ) @@ -135,6 +147,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): def tearDown(self): dashboard.apply_routes_change = self._original_apply_routes dashboard.apply_allowlist_change = self._original_apply_allowlist + dashboard.fetch_current_allowlist = self._original_fetch_allowlist dashboard.apply_capability_change = self._original_apply_capability self._teardown_fake_home() @@ -281,23 +294,27 @@ class TestCredProxyApplyWiring(_FakeHomeMixin, unittest.TestCase): class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase): - """PRD 0015 Phase 2: approve() on a pipelock-block proposal - must call apply_allowlist_change and surface its failures.""" + """PRD 0015 Phase 2 + PR #25 follow-up: approve() on a + pipelock-block proposal carries the failed URL; the dashboard + extracts the host, merges it into the running allowlist, and + calls apply_allowlist_change with the merged content.""" def setUp(self): self._setup_fake_home() - self._original = dashboard.apply_allowlist_change + self._original_apply = dashboard.apply_allowlist_change + self._original_fetch = dashboard.fetch_current_allowlist def tearDown(self): - dashboard.apply_allowlist_change = self._original + dashboard.apply_allowlist_change = self._original_apply + dashboard.fetch_current_allowlist = self._original_fetch self._teardown_fake_home() - def _enqueue_pipelock(self, proposed: str = "host.example\n"): + def _enqueue_pipelock(self, failed_url: str = "https://api.github.com/repos/foo/bar"): p = Proposal.new( bottle_slug="dev", tool=TOOL_PIPELOCK_BLOCK, - proposed_file=proposed, - justification="need new host", - current_file_hash=sha256_hex(proposed), + proposed_file=failed_url, + justification="need to read PR metadata", + current_file_hash=sha256_hex(failed_url), now=FIXED, ) qdir = supervise.queue_dir_for_slug("dev") @@ -305,16 +322,41 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase): supervise.write_proposal(qdir, p) return dashboard.QueuedProposal(proposal=p, queue_dir=qdir) - def test_pipelock_block_calls_apply_with_proposed_file(self): - calls = [] + def test_url_host_merged_into_current_allowlist(self): + dashboard.fetch_current_allowlist = lambda slug: "existing.example\n" + applied = [] dashboard.apply_allowlist_change = lambda slug, content: ( - calls.append((slug, content)) or ("before", content) + applied.append((slug, content)) + or ("existing.example\n", content) ) - qp = self._enqueue_pipelock("new.example\n") + qp = self._enqueue_pipelock("https://api.github.com/repos/foo/bar") dashboard.approve(qp) - self.assertEqual([("dev", "new.example\n")], calls) + # apply_allowlist_change was called with the merged content: + # existing host + the URL's host (no path, since pipelock is + # hostname-only). + self.assertEqual(1, len(applied)) + slug, content = applied[0] + self.assertEqual("dev", slug) + self.assertIn("existing.example", content) + self.assertIn("api.github.com", content) + self.assertNotIn("/repos/foo/bar", content) # path stripped + + def test_host_already_in_allowlist_is_idempotent(self): + dashboard.fetch_current_allowlist = lambda slug: "api.github.com\n" + applied = [] + dashboard.apply_allowlist_change = lambda slug, content: ( + applied.append(content) + or ("api.github.com\n", content) + ) + qp = self._enqueue_pipelock("https://api.github.com/some/path") + dashboard.approve(qp) + # Still applied, but the content is unchanged from current — + # before/after diff is empty. + self.assertEqual(1, len(applied)) + self.assertEqual("api.github.com\n", applied[0]) def test_apply_failure_blocks_response_and_audit(self): + dashboard.fetch_current_allowlist = lambda slug: "existing.example\n" dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw( PipelockApplyError("docker exec failed") ) @@ -327,16 +369,13 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase): ) self.assertEqual([], read_audit_entries("pipelock", "dev")) - def test_real_diff_lands_in_audit(self): - dashboard.apply_allowlist_change = lambda slug, content: ( - "old.example\n", - "old.example\nnew.example\n", - ) - qp = self._enqueue_pipelock("old.example\nnew.example\n") - dashboard.approve(qp) - entries = read_audit_entries("pipelock", "dev") - self.assertEqual(1, len(entries)) - self.assertIn("+new.example", entries[0].diff) + def test_url_without_host_raises(self): + dashboard.fetch_current_allowlist = lambda slug: "" + # supervise_server's validator would catch this; if a broken + # URL ever makes it through, the dashboard surfaces it too. + qp = self._enqueue_pipelock("https:///nohost") + with self.assertRaises(PipelockApplyError): + dashboard.approve(qp) class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase): diff --git a/tests/unit/test_supervise_server.py b/tests/unit/test_supervise_server.py index 2b48db6..e77f9bc 100644 --- a/tests/unit/test_supervise_server.py +++ b/tests/unit/test_supervise_server.py @@ -61,15 +61,27 @@ class TestValidation(unittest.TestCase): '{"routes": [{"path": "/x/", "upstream": "https://example.com"}]}', ) - def test_pipelock_block_accepts_clean_hostnames(self): + def test_pipelock_block_accepts_https_url(self): validate_proposed_file( _sv.TOOL_PIPELOCK_BLOCK, - "api.example.com\n# comment\nfoo.bar.baz\n", + "https://api.github.com/repos/foo/bar", ) - def test_pipelock_block_rejects_invalid_char(self): - with self.assertRaises(_RpcError): - validate_proposed_file(_sv.TOOL_PIPELOCK_BLOCK, "host with space.com\n") + def test_pipelock_block_accepts_http_url(self): + validate_proposed_file( + _sv.TOOL_PIPELOCK_BLOCK, + "http://internal.example/path/to/thing", + ) + + def test_pipelock_block_rejects_missing_scheme(self): + with self.assertRaises(_RpcError) as cm: + validate_proposed_file(_sv.TOOL_PIPELOCK_BLOCK, "api.github.com/foo") + self.assertIn("http://", str(cm.exception.message)) + + def test_pipelock_block_rejects_missing_host(self): + with self.assertRaises(_RpcError) as cm: + validate_proposed_file(_sv.TOOL_PIPELOCK_BLOCK, "https:///just-a-path") + self.assertIn("hostname", str(cm.exception.message)) def test_capability_block_accepts_anything_nonempty(self): validate_proposed_file( @@ -235,7 +247,7 @@ class TestHandleToolsCall(unittest.TestCase): { "name": _sv.TOOL_PIPELOCK_BLOCK, "arguments": { - "allowlist": "example.com\n", + "failed_url": "https://example.com/path", "justification": "needed for tests", }, },