feat(pipelock-block): tool sends failed URL, supervisor merges host
test / unit (pull_request) Successful in 16s
test / integration (pull_request) Successful in 1m32s

Reshape the pipelock-block MCP tool around what the agent actually
knows at the moment of failure (the URL pipelock just refused), not
what the operator needs (a full allowlist file).

Before: agent had to read /etc/claude-bottle/current-config/allowlist,
copy the whole file, append their host, send back. Lots of work,
easy to get wrong, and the operator's diff was noisy because the
proposal contained every host the agent saw — most of which weren't
the change.

After: agent calls
  pipelock-block(failed_url="https://api.github.com/repos/foo/bar",
                 justification="...")
supervisor extracts api.github.com, fetches the running allowlist,
adds the host if not already present, applies the merged content.

Path is captured as operator context (the detail view labels it
"failed URL" instead of "proposed file") but isn't enforced —
pipelock's api_allowlist is hostname-only, so the path can't
become an allow rule.

- supervise_server: pipelock-block input schema gains `failed_url`
  (replaces `allowlist`); validate_proposed_file checks for
  http/https + hostname.
- PROPOSED_FILE_FIELD updated; tool description rewritten.
- dashboard._apply_pipelock_url: extract host, fetch current,
  merge, apply.
- _proposed_payload_label: detail view renders "failed URL" for
  pipelock-block, "proposed file" otherwise.
- Tests updated end-to-end; new url-host-merge + idempotent-merge +
  invalid-url cases added.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 08:02:53 -04:00
parent a9bb34cb77
commit f3f2e3e9ab
4 changed files with 163 additions and 57 deletions
+35 -2
View File
@@ -36,6 +36,8 @@ from ..backend.docker.pipelock_apply import (
PipelockApplyError,
apply_allowlist_change,
fetch_current_allowlist,
parse_allowlist_content,
render_allowlist_content,
)
from ..log import info
from ..supervise import (
@@ -168,7 +170,7 @@ def approve(
qp.proposal.bottle_slug, file_to_apply,
)
elif qp.proposal.tool == TOOL_PIPELOCK_BLOCK:
diff_before, diff_after = apply_allowlist_change(
diff_before, diff_after = _apply_pipelock_url(
qp.proposal.bottle_slug, file_to_apply,
)
elif qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
@@ -230,6 +232,27 @@ def operator_edit_routes(slug: str, new_content: str) -> tuple[str, str]:
return before, after
def _apply_pipelock_url(slug: str, failed_url: str) -> tuple[str, str]:
"""pipelock-block proposals carry a single failed URL, not a
full allowlist. Extract the host, merge into the running
allowlist, and hand the merged content to apply_allowlist_change.
The full URL (with path) is preserved on the proposal for the
operator's read; only the host ends up in pipelock's allowlist
(pipelock can't enforce path-level rules)."""
import urllib.parse
parsed = urllib.parse.urlsplit(failed_url.strip())
host = parsed.hostname or ""
if not host:
raise PipelockApplyError(
f"proposed failed_url has no extractable host: {failed_url!r}"
)
current = fetch_current_allowlist(slug)
hosts = parse_allowlist_content(current)
if host not in hosts:
hosts.append(host)
return apply_allowlist_change(slug, render_allowlist_content(hosts))
def operator_edit_allowlist(slug: str, new_content: str) -> tuple[str, str]:
"""Apply an operator-initiated pipelock allowlist change (no
agent proposal). Used by the `pipelock edit <bottle>` TUI verb
@@ -580,12 +603,22 @@ def _detail_lines(qp: QueuedProposal) -> list[str]:
out.extend(" " + line for line in p.justification.splitlines() or [""])
out.extend([
"",
"proposed file:",
_proposed_payload_label(p.tool) + ":",
])
out.extend(p.proposed_file.splitlines() or [""])
return out
def _proposed_payload_label(tool: str) -> str:
"""The detail-view section heading for the proposal's payload —
`proposed_file` is what the dataclass calls it, but for
pipelock-block the payload is a single URL not a file. Render
the label per tool so the operator's eye matches."""
if tool == TOOL_PIPELOCK_BLOCK:
return "failed URL"
return "proposed file"
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
"""Suspend curses, open $EDITOR on the proposed file, return the
edited content (or None if unchanged)."""
+46 -24
View File
@@ -36,6 +36,7 @@ import os
import socketserver
import sys
import typing
import urllib.parse
from dataclasses import dataclass
from pathlib import Path
@@ -160,27 +161,34 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"name": _sv.TOOL_PIPELOCK_BLOCK,
"description": (
"Call when pipelock refused your outbound request — host "
"not in the allowlist, protocol blocked, connection "
"refused at the egress layer. Read the current allowlist "
"from /etc/claude-bottle/current-config/allowlist, compose "
"a modified version, and pass the full new file plus a "
"justification. On approval the supervisor writes the new "
"allowlist and restarts pipelock (wired in PRD 0015; v1 "
"acknowledges only)."
"not in the allowlist, connection refused at the egress "
"layer. Pass the full URL you tried to hit (scheme + "
"host + path) plus a justification. The supervisor "
"extracts the hostname and merges it into the bottle's "
"current pipelock allowlist; the path is captured as "
"context for the operator to review (pipelock's allowlist "
"is hostname-only — it can't enforce path-level rules). "
"On approval the supervisor restarts pipelock with the "
"merged allowlist."
),
"inputSchema": {
"type": "object",
"properties": {
"allowlist": {
"failed_url": {
"type": "string",
"description": "Full proposed pipelock allowlist (one hostname per line).",
"description": (
"The full URL pipelock blocked, e.g. "
"https://api.github.com/repos/foo/bar. Scheme "
"and hostname are required; path is recorded "
"as operator context."
),
},
"justification": {
"type": "string",
"description": "Why the new host(s) should be allowed.",
"description": "Why the new host should be allowed.",
},
},
"required": ["allowlist", "justification"],
"required": ["failed_url", "justification"],
},
},
{
@@ -214,10 +222,20 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
]
# Map each tool to the input field that carries the proposed file.
# Map each tool to the input field that carries the agent's
# tool-specific payload (stored in Proposal.proposed_file as
# free-form text the apply path interprets per tool).
#
# cred-proxy-block: full proposed routes.json
# pipelock-block: the full failed URL (scheme + host + path) —
# supervisor extracts the host, merges into the
# bottle's current allowlist; the path is shown
# to the operator for context (pipelock doesn't
# do path-level matching).
# capability-block: full proposed Dockerfile
PROPOSED_FILE_FIELD: dict[str, str] = {
_sv.TOOL_CRED_PROXY_BLOCK: "routes",
_sv.TOOL_PIPELOCK_BLOCK: "allowlist",
_sv.TOOL_PIPELOCK_BLOCK: "failed_url",
_sv.TOOL_CAPABILITY_BLOCK: "dockerfile",
}
@@ -245,17 +263,21 @@ def validate_proposed_file(tool: str, content: str) -> None:
f"{tool}: proposed routes.json must be an object with a 'routes' array",
)
elif tool == _sv.TOOL_PIPELOCK_BLOCK:
for i, line in enumerate(content.splitlines()):
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
# Hostnames are conservative: letters/digits/dots/dashes only.
for ch in stripped:
if not (ch.isalnum() or ch in ".-_"):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: allowlist line {i + 1} has invalid character {ch!r}",
)
# `content` is the full failed URL. Require scheme + host so
# the supervisor can extract a hostname for the allowlist
# merge; the path is preserved for operator context.
parsed = urllib.parse.urlsplit(content.strip())
if parsed.scheme not in ("http", "https"):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: failed_url must start with http:// or https:// "
f"(got {content!r})",
)
if not parsed.hostname:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: failed_url is missing a hostname (got {content!r})",
)
elif tool == _sv.TOOL_CAPABILITY_BLOCK:
# Dockerfiles are too varied to validate syntactically beyond
# non-empty. The operator reads the diff in the TUI.
+64 -25
View File
@@ -38,11 +38,21 @@ FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_CRED_PROXY_BLOCK) -> Proposal:
# Per-tool payload shape: cred-proxy gets routes.json, pipelock
# gets a failed URL (PR #25 follow-up), capability gets a
# Dockerfile-ish blob. Match the production dispatch in
# PROPOSED_FILE_FIELD.
payloads = {
TOOL_CRED_PROXY_BLOCK: '{"routes": []}\n',
TOOL_PIPELOCK_BLOCK: "https://example.com/path",
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
}
payload = payloads.get(tool, "")
return Proposal.new(
bottle_slug=slug, tool=tool,
proposed_file='{"routes": []}\n',
proposed_file=payload,
justification=f"needed for {slug}",
current_file_hash=sha256_hex("{}"),
current_file_hash=sha256_hex(payload),
now=FIXED,
)
@@ -119,6 +129,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
self._setup_fake_home()
self._original_apply_routes = dashboard.apply_routes_change
self._original_apply_allowlist = dashboard.apply_allowlist_change
self._original_fetch_allowlist = dashboard.fetch_current_allowlist
self._original_apply_capability = dashboard.apply_capability_change
# Default stubs: succeed with deterministic before/after so the
# audit log shows a non-empty diff.
@@ -128,6 +139,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
dashboard.apply_allowlist_change = lambda slug, content: (
"old.example\n", content,
)
dashboard.fetch_current_allowlist = lambda slug: "old.example\n"
dashboard.apply_capability_change = lambda slug, content: (
"FROM old\n", content,
)
@@ -135,6 +147,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def tearDown(self):
dashboard.apply_routes_change = self._original_apply_routes
dashboard.apply_allowlist_change = self._original_apply_allowlist
dashboard.fetch_current_allowlist = self._original_fetch_allowlist
dashboard.apply_capability_change = self._original_apply_capability
self._teardown_fake_home()
@@ -281,23 +294,27 @@ class TestCredProxyApplyWiring(_FakeHomeMixin, unittest.TestCase):
class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
"""PRD 0015 Phase 2: approve() on a pipelock-block proposal
must call apply_allowlist_change and surface its failures."""
"""PRD 0015 Phase 2 + PR #25 follow-up: approve() on a
pipelock-block proposal carries the failed URL; the dashboard
extracts the host, merges it into the running allowlist, and
calls apply_allowlist_change with the merged content."""
def setUp(self):
self._setup_fake_home()
self._original = dashboard.apply_allowlist_change
self._original_apply = dashboard.apply_allowlist_change
self._original_fetch = dashboard.fetch_current_allowlist
def tearDown(self):
dashboard.apply_allowlist_change = self._original
dashboard.apply_allowlist_change = self._original_apply
dashboard.fetch_current_allowlist = self._original_fetch
self._teardown_fake_home()
def _enqueue_pipelock(self, proposed: str = "host.example\n"):
def _enqueue_pipelock(self, failed_url: str = "https://api.github.com/repos/foo/bar"):
p = Proposal.new(
bottle_slug="dev", tool=TOOL_PIPELOCK_BLOCK,
proposed_file=proposed,
justification="need new host",
current_file_hash=sha256_hex(proposed),
proposed_file=failed_url,
justification="need to read PR metadata",
current_file_hash=sha256_hex(failed_url),
now=FIXED,
)
qdir = supervise.queue_dir_for_slug("dev")
@@ -305,16 +322,41 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
supervise.write_proposal(qdir, p)
return dashboard.QueuedProposal(proposal=p, queue_dir=qdir)
def test_pipelock_block_calls_apply_with_proposed_file(self):
calls = []
def test_url_host_merged_into_current_allowlist(self):
dashboard.fetch_current_allowlist = lambda slug: "existing.example\n"
applied = []
dashboard.apply_allowlist_change = lambda slug, content: (
calls.append((slug, content)) or ("before", content)
applied.append((slug, content))
or ("existing.example\n", content)
)
qp = self._enqueue_pipelock("new.example\n")
qp = self._enqueue_pipelock("https://api.github.com/repos/foo/bar")
dashboard.approve(qp)
self.assertEqual([("dev", "new.example\n")], calls)
# apply_allowlist_change was called with the merged content:
# existing host + the URL's host (no path, since pipelock is
# hostname-only).
self.assertEqual(1, len(applied))
slug, content = applied[0]
self.assertEqual("dev", slug)
self.assertIn("existing.example", content)
self.assertIn("api.github.com", content)
self.assertNotIn("/repos/foo/bar", content) # path stripped
def test_host_already_in_allowlist_is_idempotent(self):
dashboard.fetch_current_allowlist = lambda slug: "api.github.com\n"
applied = []
dashboard.apply_allowlist_change = lambda slug, content: (
applied.append(content)
or ("api.github.com\n", content)
)
qp = self._enqueue_pipelock("https://api.github.com/some/path")
dashboard.approve(qp)
# Still applied, but the content is unchanged from current —
# before/after diff is empty.
self.assertEqual(1, len(applied))
self.assertEqual("api.github.com\n", applied[0])
def test_apply_failure_blocks_response_and_audit(self):
dashboard.fetch_current_allowlist = lambda slug: "existing.example\n"
dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw(
PipelockApplyError("docker exec failed")
)
@@ -327,16 +369,13 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
)
self.assertEqual([], read_audit_entries("pipelock", "dev"))
def test_real_diff_lands_in_audit(self):
dashboard.apply_allowlist_change = lambda slug, content: (
"old.example\n",
"old.example\nnew.example\n",
)
qp = self._enqueue_pipelock("old.example\nnew.example\n")
dashboard.approve(qp)
entries = read_audit_entries("pipelock", "dev")
self.assertEqual(1, len(entries))
self.assertIn("+new.example", entries[0].diff)
def test_url_without_host_raises(self):
dashboard.fetch_current_allowlist = lambda slug: ""
# supervise_server's validator would catch this; if a broken
# URL ever makes it through, the dashboard surfaces it too.
qp = self._enqueue_pipelock("https:///nohost")
with self.assertRaises(PipelockApplyError):
dashboard.approve(qp)
class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
+18 -6
View File
@@ -61,15 +61,27 @@ class TestValidation(unittest.TestCase):
'{"routes": [{"path": "/x/", "upstream": "https://example.com"}]}',
)
def test_pipelock_block_accepts_clean_hostnames(self):
def test_pipelock_block_accepts_https_url(self):
validate_proposed_file(
_sv.TOOL_PIPELOCK_BLOCK,
"api.example.com\n# comment\nfoo.bar.baz\n",
"https://api.github.com/repos/foo/bar",
)
def test_pipelock_block_rejects_invalid_char(self):
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_PIPELOCK_BLOCK, "host with space.com\n")
def test_pipelock_block_accepts_http_url(self):
validate_proposed_file(
_sv.TOOL_PIPELOCK_BLOCK,
"http://internal.example/path/to/thing",
)
def test_pipelock_block_rejects_missing_scheme(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file(_sv.TOOL_PIPELOCK_BLOCK, "api.github.com/foo")
self.assertIn("http://", str(cm.exception.message))
def test_pipelock_block_rejects_missing_host(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file(_sv.TOOL_PIPELOCK_BLOCK, "https:///just-a-path")
self.assertIn("hostname", str(cm.exception.message))
def test_capability_block_accepts_anything_nonempty(self):
validate_proposed_file(
@@ -235,7 +247,7 @@ class TestHandleToolsCall(unittest.TestCase):
{
"name": _sv.TOOL_PIPELOCK_BLOCK,
"arguments": {
"allowlist": "example.com\n",
"failed_url": "https://example.com/path",
"justification": "needed for tests",
},
},