Merge pull request 'PRD 0015: pipelock block remediation' (#21) from prd-0015-pipelock-block into main
test / unit (push) Successful in 17s
test / integration (push) Successful in 1m9s

This commit was merged in pull request #21.
This commit is contained in:
2026-05-25 05:15:16 -04:00
6 changed files with 730 additions and 29 deletions
@@ -0,0 +1,183 @@
"""pipelock_apply — host-side helper to apply an api_allowlist
change to a running pipelock sidecar (PRD 0015).
Used by the supervise dashboard when the operator approves a
pipelock-block proposal (or runs the operator-initiated `pipelock
edit <bottle>` verb). Fetches the current pipelock.yaml via `docker
exec`, parses it, swaps the api_allowlist with the proposed hosts,
re-renders, writes back via `docker cp`, then `docker restart` so
pipelock picks up the new config.
v1 uses restart, not SIGHUP — pipelock has no in-process reload
hook and adding one is the "SIGHUP reload for pipelock" open
question in PRD 0015. Restart drops in-flight outbound calls; the
agent's HTTP client retries pick up against the restarted proxy.
"""
from __future__ import annotations
import os
import re
import subprocess
import tempfile
from pathlib import Path
from ...pipelock import pipelock_render_yaml
from ...yaml_subset import parse_yaml_subset
from .pipelock import pipelock_container_name
PIPELOCK_YAML_IN_CONTAINER = "/etc/pipelock.yaml"
# Allowlist proposals are one-hostname-per-line. Blank lines and
# `#`-prefixed comments are ignored. The character set matches the
# supervise sidecar's syntactic check on the agent's pipelock-block
# proposal (alphanumerics + dot/dash/underscore).
_HOST_OK = re.compile(r"^[A-Za-z0-9_.-]+$")
class PipelockApplyError(RuntimeError):
"""Raised when fetch / parse / apply fails. The dashboard renders
the message and keeps the proposal pending — never crashes."""
def parse_allowlist_content(content: str) -> list[str]:
"""One hostname per line. Blanks and `#` comments are ignored.
Raises PipelockApplyError if a line has a disallowed character."""
hosts: list[str] = []
for i, raw_line in enumerate(content.splitlines(), start=1):
line = raw_line.strip()
if not line or line.startswith("#"):
continue
if not _HOST_OK.match(line):
raise PipelockApplyError(
f"allowlist line {i}: {line!r} has disallowed characters"
)
hosts.append(line)
return hosts
def render_allowlist_content(hosts: list[str]) -> str:
"""Hosts → one-per-line string (the operator-facing format)."""
if not hosts:
return ""
return "\n".join(hosts) + "\n"
def fetch_current_yaml(slug: str) -> str:
"""Read the live /etc/pipelock.yaml from the pipelock sidecar.
Uses `docker cp` (not `docker exec cat`) because the pipelock
image is distroless and has no shell utilities. `docker cp` is a
daemon-API tarball copy — works on stopped containers too, and
doesn't need anything in the container's PATH.
Raises PipelockApplyError if the read fails."""
container = pipelock_container_name(slug)
fd, tmp_path = tempfile.mkstemp(prefix="cb-pipelock-fetch.", suffix=".yaml")
os.close(fd)
try:
r = subprocess.run(
[
"docker", "cp",
f"{container}:{PIPELOCK_YAML_IN_CONTAINER}", tmp_path,
],
capture_output=True, text=True, check=False,
)
if r.returncode != 0:
raise PipelockApplyError(
f"could not fetch pipelock.yaml from {container}: "
f"{(r.stderr or '').strip() or 'container not running?'}"
)
return Path(tmp_path).read_text()
finally:
try:
Path(tmp_path).unlink()
except OSError:
pass
def fetch_current_allowlist(slug: str) -> str:
"""Fetch the live yaml, extract api_allowlist, render as one-per-
line — the operator-facing format for the TUI / agent's
current-config mount."""
yaml = fetch_current_yaml(slug)
cfg = parse_yaml_subset(yaml)
hosts = cfg.get("api_allowlist", [])
if not isinstance(hosts, list):
raise PipelockApplyError(
"running pipelock yaml: api_allowlist is not a list"
)
return render_allowlist_content([str(h) for h in hosts])
def apply_allowlist_change(
slug: str, new_allowlist_content: str,
) -> tuple[str, str]:
"""Apply `new_allowlist_content` to the pipelock sidecar:
1. Parse the proposed hosts (one per line).
2. Fetch + parse current pipelock.yaml.
3. Replace api_allowlist with the proposed hosts; re-render.
4. docker cp the new yaml into the sidecar.
5. docker restart so pipelock reloads.
Returns (before, after) where both are one-per-line allowlist
strings (operator-facing format). Raises PipelockApplyError on
any failure; the sidecar's existing config stays in place until
docker cp succeeds, and the restart is what makes it live."""
new_hosts = parse_allowlist_content(new_allowlist_content)
container = pipelock_container_name(slug)
current_yaml = fetch_current_yaml(slug)
cfg = parse_yaml_subset(current_yaml)
current_hosts = cfg.get("api_allowlist", [])
if not isinstance(current_hosts, list):
raise PipelockApplyError(
"running pipelock yaml: api_allowlist is not a list"
)
before = render_allowlist_content([str(h) for h in current_hosts])
after = render_allowlist_content(new_hosts)
cfg["api_allowlist"] = new_hosts
rendered = pipelock_render_yaml(cfg)
fd, tmp_path = tempfile.mkstemp(prefix="cb-pipelock-yaml.", suffix=".yaml")
try:
with os.fdopen(fd, "w") as f:
f.write(rendered)
cp = subprocess.run(
["docker", "cp", tmp_path, f"{container}:{PIPELOCK_YAML_IN_CONTAINER}"],
capture_output=True, text=True, check=False,
)
if cp.returncode != 0:
raise PipelockApplyError(
f"failed to copy pipelock.yaml into {container}: "
f"{(cp.stderr or '').strip()}"
)
restart = subprocess.run(
["docker", "restart", container],
capture_output=True, text=True, check=False,
)
if restart.returncode != 0:
raise PipelockApplyError(
f"failed to restart {container}: "
f"{(restart.stderr or '').strip()}"
)
finally:
try:
Path(tmp_path).unlink()
except OSError:
pass
return before, after
__all__ = [
"PIPELOCK_YAML_IN_CONTAINER",
"PipelockApplyError",
"apply_allowlist_change",
"fetch_current_allowlist",
"fetch_current_yaml",
"parse_allowlist_content",
"render_allowlist_content",
]
+114 -26
View File
@@ -27,6 +27,11 @@ from ..backend.docker.cred_proxy_apply import (
apply_routes_change,
fetch_current_routes,
)
from ..backend.docker.pipelock_apply import (
PipelockApplyError,
apply_allowlist_change,
fetch_current_allowlist,
)
from ..log import info
from ..supervise import (
ACTION_OPERATOR_EDIT,
@@ -39,6 +44,7 @@ from ..supervise import (
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_CRED_PROXY_BLOCK,
TOOL_PIPELOCK_BLOCK,
list_pending_proposals,
render_diff,
write_audit_entry,
@@ -47,6 +53,12 @@ from ..supervise import (
from ._common import PROG
# Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses.
ApplyError = (CredProxyApplyError, PipelockApplyError)
# --- Discovery -------------------------------------------------------------
@@ -58,16 +70,15 @@ class QueuedProposal:
queue_dir: Path
def discover_cred_proxy_slugs() -> list[str]:
"""Slugs of bottles with a running cred-proxy sidecar. Used by
the operator-initiated `routes edit` verb to know which bottles
are editable. Empty list if docker isn't reachable or not
def _discover_sidecar_slugs(name_prefix: str) -> list[str]:
"""Slugs of bottles whose sidecar container names start with
`name_prefix`. Empty list if docker isn't reachable or not
installed."""
try:
r = subprocess.run(
[
"docker", "ps",
"--filter", "name=^claude-bottle-cred-proxy-",
"--filter", f"name=^{name_prefix}",
"--format", "{{.Names}}",
],
capture_output=True, text=True, check=False,
@@ -76,15 +87,26 @@ def discover_cred_proxy_slugs() -> list[str]:
return []
if r.returncode != 0:
return []
prefix = "claude-bottle-cred-proxy-"
out: list[str] = []
for line in (r.stdout or "").splitlines():
line = line.strip()
if line.startswith(prefix):
out.append(line[len(prefix):])
if line.startswith(name_prefix):
out.append(line[len(name_prefix):])
return sorted(out)
def discover_cred_proxy_slugs() -> list[str]:
"""Slugs of bottles with a running cred-proxy sidecar. Used by
the operator-initiated `routes edit` verb."""
return _discover_sidecar_slugs("claude-bottle-cred-proxy-")
def discover_pipelock_slugs() -> list[str]:
"""Slugs of bottles with a running pipelock sidecar. Used by
the operator-initiated `pipelock edit` verb."""
return _discover_sidecar_slugs("claude-bottle-pipelock-")
def discover_pending() -> list[QueuedProposal]:
"""Walk ~/.claude-bottle/queue/* and collect pending proposals
from every bottle's queue. Sorted by arrival time across the
@@ -129,9 +151,13 @@ def approve(
diff_before, diff_after = apply_routes_change(
qp.proposal.bottle_slug, file_to_apply,
)
# pipelock-block + capability-block remediation lands in PRDs
# 0015 + 0016; for 0014 they remain no-op approvals and the
# audit diff stays empty.
elif qp.proposal.tool == TOOL_PIPELOCK_BLOCK:
diff_before, diff_after = apply_allowlist_change(
qp.proposal.bottle_slug, file_to_apply,
)
# capability-block remediation lands in PRD 0016; until then
# it stays a no-op approval and the audit (none for capability)
# is skipped.
response = Response(
proposal_id=qp.proposal.id,
@@ -181,6 +207,27 @@ def operator_edit_routes(slug: str, new_content: str) -> tuple[str, str]:
return before, after
def operator_edit_allowlist(slug: str, new_content: str) -> tuple[str, str]:
"""Apply an operator-initiated pipelock allowlist change (no
agent proposal). Used by the `pipelock edit <bottle>` TUI verb
and available for scripted use. Returns (before, after) like
apply_allowlist_change. Writes an audit entry tagged
ACTION_OPERATOR_EDIT to distinguish from tool-call approvals.
Raises PipelockApplyError on failure."""
before, after = apply_allowlist_change(slug, new_content)
write_audit_entry(AuditEntry(
timestamp=datetime.now(timezone.utc).isoformat(),
bottle_slug=slug,
component="pipelock",
operator_action=ACTION_OPERATOR_EDIT,
operator_notes="",
justification="",
diff=render_diff(before, after, label="pipelock"),
))
return before, after
def _write_audit(
qp: QueuedProposal,
*,
@@ -296,6 +343,9 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
if key == ord("e"):
status_line = _operator_edit_routes_flow(stdscr)
continue
if key == ord("p"):
status_line = _operator_edit_allowlist_flow(stdscr)
continue
if not pending:
continue
qp = pending[selected]
@@ -310,7 +360,7 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
try:
approve(qp)
status_line = f"approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
except CredProxyApplyError as e:
except ApplyError as e:
status_line = f"apply failed: {e}"
elif key == ord("m"):
edited = _modify(stdscr, qp)
@@ -320,7 +370,7 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
try:
approve(qp, final_file=edited, notes="operator modified before approving")
status_line = f"modified+approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
except CredProxyApplyError as e:
except ApplyError as e:
status_line = f"apply failed: {e}"
elif key == ord("r"):
reason = _prompt(stdscr, "reject reason: ")
@@ -365,7 +415,10 @@ def _render(
attr = curses.A_REVERSE if i == selected else curses.A_NORMAL
stdscr.addnstr(row, 0, line, w - 1, attr)
footer = "[Enter] view [a] approve [m] modify [r] reject [e] routes edit [j/k] move [q] quit"
footer = (
"[Enter] view [a] approve [m] modify [r] reject "
"[e] routes edit [p] pipelock edit [j/k] move [q] quit"
)
stdscr.hline(h - 2, 0, curses.ACS_HLINE, w)
stdscr.addnstr(h - 1, 0, footer, w - 1, curses.A_DIM)
if status_line:
@@ -403,7 +456,7 @@ def _detail_view(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> None:
elif key == ord("a"):
try:
approve(qp)
except CredProxyApplyError:
except ApplyError:
pass # Status surfaces back in the list view's render.
return
elif key == ord("m"):
@@ -411,7 +464,7 @@ def _detail_view(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> None:
if edited is not None:
try:
approve(qp, final_file=edited, notes="operator modified before approving")
except CredProxyApplyError:
except ApplyError:
pass
return
elif key == ord("r"):
@@ -465,33 +518,68 @@ def _operator_edit_routes_flow(stdscr: "curses._CursesWindow") -> str:
cred-proxy sidecars, pick one (single → use directly; multi →
prompt), fetch the current routes, open in $EDITOR, apply on
save. Returns a status-line message."""
slugs = discover_cred_proxy_slugs()
return _operator_edit_flow(
stdscr,
label="routes",
discover=discover_cred_proxy_slugs,
fetch=fetch_current_routes,
apply=operator_edit_routes,
suffix=".json",
)
def _operator_edit_allowlist_flow(stdscr: "curses._CursesWindow") -> str:
"""Operator-initiated pipelock allowlist edit."""
return _operator_edit_flow(
stdscr,
label="pipelock",
discover=discover_pipelock_slugs,
fetch=fetch_current_allowlist,
apply=operator_edit_allowlist,
suffix=".txt",
)
def _operator_edit_flow(
stdscr: "curses._CursesWindow",
*,
label: str,
discover,
fetch,
apply,
suffix: str,
) -> str:
"""Shared scaffolding for the routes-edit + pipelock-edit verbs.
`discover` returns running-sidecar slugs; `fetch(slug)` returns
the current operator-facing config; `apply(slug, new)` does the
write + restart/SIGHUP and writes the audit entry."""
slugs = discover()
if not slugs:
return "no running cred-proxy sidecars to edit"
return f"no running {label} sidecars to edit"
if len(slugs) == 1:
slug = slugs[0]
else:
slug = _prompt(stdscr, f"bottle ({', '.join(slugs)}): ")
if not slug:
return "routes edit aborted"
return f"{label} edit aborted"
if slug not in slugs:
return f"unknown bottle {slug!r}"
try:
current = fetch_current_routes(slug)
except CredProxyApplyError as e:
current = fetch(slug)
except ApplyError as e:
return f"fetch failed: {e}"
curses.endwin()
try:
edited = edit_in_editor(current, suffix=".json")
edited = edit_in_editor(current, suffix=suffix)
finally:
stdscr.refresh()
if edited is None:
return f"routes for [{slug}] unchanged"
return f"{label} for [{slug}] unchanged"
try:
operator_edit_routes(slug, edited)
except CredProxyApplyError as e:
apply(slug, edited)
except ApplyError as e:
return f"apply failed: {e}"
return f"updated routes for [{slug}]"
return f"updated {label} for [{slug}]"
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:
@@ -0,0 +1,67 @@
# PRD 0015: pipelock block remediation
- **Status:** Draft
- **Author:** didericis
- **Created:** 2026-05-25
- **Parent:** PRD 0012
- **Depends on:** PRD 0013
## Summary
Wires the **pipelock block** path (PRD 0012 *Stuck categories*) end-to-end. The supervisor, on approval of a `pipelock-block` proposal, writes the new pipelock allowlist to the host and restarts pipelock; the agent's in-flight outbound calls may drop and rely on retry. The TUI gains a proactive `pipelock edit <bottle>` verb for operator-initiated edits unrelated to a tool call. The pipelock audit log (format defined in PRD 0013) is filled in with real entries on every edit.
## Problem
See PRD 0012. This PRD specifically addresses: with 0013 in place, the operator can approve a `pipelock-block` proposal but nothing happens — the allowlist doesn't change and pipelock doesn't notice. This PRD closes the loop, using restart-based reload. SIGHUP for pipelock is deferred to a follow-up (see Open questions).
## Goals / Success Criteria
A real pipelock block recovers end-to-end: the agent's outbound HTTP request fails with a connection-refused (host not in allowlist), the agent calls `pipelock-block` with a proposed allowlist and justification, the operator approves in the TUI, the supervisor writes the new allowlist and restarts pipelock, the agent retries and proceeds.
## Non-goals
- SIGHUP / hot reload for pipelock. v1 ships restart-based reload only.
- One-off gitlock / pipelock exceptions (e.g. waive a specific commit SHA, not a permanent allowlist entry) — see Open questions.
- cred-proxy or capability handling (covered by 0014 and 0016).
## Scope
### In scope
- Supervisor write path: on operator approval of a `pipelock-block` proposal, write the proposed allowlist to the host-side path pipelock reads, then restart the pipelock container.
- `pipelock edit <bottle>` TUI verb: open the bottle's current pipelock allowlist in `$EDITOR`, write + restart pipelock on save. Not gated on a pending proposal.
- pipelock audit log entries: every allowlist edit (from a tool-call approval or from a proactive `pipelock edit`) appends an entry with timestamp, diff, justification (if from tool call), and operator action.
### Out of scope
- SIGHUP reload (deferred — see Open questions).
- cred-proxy equivalents (PRD 0014).
## Proposed Design
### New services / components
- **`pipelock edit <bottle>` TUI verb.** Opens the bottle's current pipelock allowlist in `$EDITOR`. On save, the supervisor writes the new file and restarts pipelock.
### Existing code touched
- **pipelock** — gains a clean restart path that picks up the new allowlist on container restart. No pipelock code changes likely needed if pipelock already reads its config on startup; the orchestration is supervisor-side.
- **MCP sidecar** (PRD 0013) — the `pipelock-block` approval handler stops being a no-op; on approval, calls the supervisor's write+restart path.
- **`cli.py`** — dashboard subcommand gains the `pipelock edit` verb.
### Data model changes
None beyond PRD 0013.
## Open questions
- **SIGHUP reload for pipelock.** v1 ships restart-based reload, which drops in-flight outbound calls. Should pipelock gain SIGHUP support so **pipelock block** is as cheap as **cred-proxy block**? Depends on how often the operator edits the allowlist mid-task and how disruptive a pipelock bounce actually is.
- **Gitlock / pipelock one-off exceptions.** Some pipelock denials don't want a permanent allowlist entry — e.g. a commit that includes docs with intentionally-bogus tokens that the secret scanner correctly flags. The shape (agent blocked → tool call → operator decides → result) is the same, but the *resolution* is a per-operation override or a scoped allowlist entry, not a permanent edit. Is this a fourth tool (`exception-block`?), or does it fold into `pipelock-block` with a scoped one-shot allowlist entry? Either way, the approval must be auditable so a future reader can see what was waived and why. See `docs/research/git-gate-commit-approval.md` for a survey of gitleaks's native allowlist primitives and a recommendation.
## References
- PRD 0001 — per-agent egress proxy via pipelock.
- PRD 0008 — git-gate.
- PRD 0012 — stuck-agent recovery flow overview.
- PRD 0013 — supervise plane foundation (prerequisite).
- `docs/research/git-gate-commit-approval.md` — gitleaks allowlist primitives.
+152
View File
@@ -0,0 +1,152 @@
"""Integration: drive `apply_allowlist_change` against a real
pipelock sidecar (PRD 0015).
Brings up a real pipelock sidecar (via the production DockerPipelockProxy
bring-up), calls apply_allowlist_change to swap the api_allowlist,
restarts pipelock, and verifies the running container now serves the
new yaml.
Setup uses pipelock_tls_init which bind-mounts a host path into a
one-shot pipelock container — that doesn't work in DinD, so the test
skips under GITEA_ACTIONS the same way the existing pipelock smoke
test does.
"""
from __future__ import annotations
import dataclasses
import os
import shutil
import subprocess
import tempfile
import time
import unittest
from pathlib import Path
from claude_bottle.backend.docker.network import (
network_create_egress,
network_create_internal,
network_remove,
)
from claude_bottle.backend.docker.pipelock import (
DockerPipelockProxy,
pipelock_container_name,
pipelock_tls_init,
)
from claude_bottle.backend.docker.pipelock_apply import (
PipelockApplyError,
apply_allowlist_change,
fetch_current_allowlist,
fetch_current_yaml,
)
from claude_bottle.yaml_subset import parse_yaml_subset
from tests._docker import skip_unless_docker
from tests.fixtures import fixture_minimal
@skip_unless_docker()
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: pipelock_tls_init uses a host bind mount "
"that doesn't share fs with the runner container",
)
class TestPipelockApply(unittest.TestCase):
def setUp(self):
self.slug = f"cb-test-pla-{os.getpid()}-{int(time.time())}"
self.sidecar_name = ""
self.internal_net = ""
self.egress_net = ""
self.work_dir = Path(tempfile.mkdtemp(prefix="pipelock-apply."))
def tearDown(self):
if self.sidecar_name:
DockerPipelockProxy().stop(self.sidecar_name)
for n in (self.internal_net, self.egress_net):
if n:
network_remove(n)
shutil.rmtree(self.work_dir, ignore_errors=True)
def _bring_up(self) -> None:
proxy = DockerPipelockProxy()
prep = proxy.prepare(fixture_minimal().bottles["dev"], self.slug, self.work_dir)
self.internal_net = network_create_internal(self.slug)
self.egress_net = network_create_egress(self.slug)
ca_cert_host, ca_key_host = pipelock_tls_init(self.work_dir)
plan = dataclasses.replace(
prep,
internal_network=self.internal_net,
egress_network=self.egress_net,
ca_cert_host_path=ca_cert_host,
ca_key_host_path=ca_key_host,
)
self.sidecar_name = proxy.start(plan)
self.assertEqual(pipelock_container_name(self.slug), self.sidecar_name)
# Wait until docker exec succeeds — the container is up but
# pipelock may still be initializing. fetch_current_yaml is
# itself a docker exec, so retrying it doubles as a readiness
# probe.
deadline = time.monotonic() + 15.0
while time.monotonic() < deadline:
try:
fetch_current_yaml(self.slug)
return
except PipelockApplyError:
pass
time.sleep(0.25)
raise AssertionError("pipelock sidecar never became reachable")
def _wait_for_yaml(self, contains: str, *, deadline_s: float = 15.0) -> str:
"""Poll docker exec until /etc/pipelock.yaml contains `contains`,
returning the yaml. Used to bridge the docker-restart window."""
deadline = time.monotonic() + deadline_s
while time.monotonic() < deadline:
try:
yaml = fetch_current_yaml(self.slug)
if contains in yaml:
return yaml
except PipelockApplyError:
pass
time.sleep(0.25)
self.fail(f"never saw {contains!r} in /etc/pipelock.yaml")
def test_apply_swaps_api_allowlist(self):
self._bring_up()
initial_yaml = fetch_current_yaml(self.slug)
# fixture_minimal yields the baked-in DEFAULT_ALLOWLIST in
# pipelock.py; api.anthropic.com is in there.
self.assertIn("api.anthropic.com", initial_yaml)
new_content = "api.anthropic.com\nnew-host.example\n"
before, after = apply_allowlist_change(self.slug, new_content)
self.assertIn("api.anthropic.com", before)
self.assertNotIn("new-host.example", before)
self.assertIn("new-host.example", after)
updated = self._wait_for_yaml("new-host.example")
cfg = parse_yaml_subset(updated)
self.assertIn("new-host.example", cfg["api_allowlist"]) # type: ignore[operator]
self.assertIn("api.anthropic.com", cfg["api_allowlist"]) # type: ignore[operator]
# tls_interception block (set up by the production prepare
# via pipelock_build_config) is preserved across the swap.
self.assertIn("tls_interception", cfg)
def test_apply_with_invalid_host_raises(self):
self._bring_up()
with self.assertRaises(PipelockApplyError):
apply_allowlist_change(self.slug, "host with space.example\n")
def test_fetch_current_allowlist_renders_one_per_line(self):
self._bring_up()
listing = fetch_current_allowlist(self.slug)
self.assertTrue(listing.endswith("\n"))
self.assertIn("api.anthropic.com\n", listing)
def test_apply_against_missing_sidecar_raises(self):
# Don't bring up — the slug points at nothing.
with self.assertRaises(PipelockApplyError):
apply_allowlist_change(self.slug, "x.example\n")
if __name__ == "__main__":
unittest.main()
+99 -3
View File
@@ -17,6 +17,7 @@ from pathlib import Path
from claude_bottle import supervise
from claude_bottle.backend.docker.cred_proxy_apply import CredProxyApplyError
from claude_bottle.backend.docker.pipelock_apply import PipelockApplyError
from claude_bottle.cli import dashboard
from claude_bottle.supervise import (
Proposal,
@@ -115,15 +116,20 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
self._original_apply = dashboard.apply_routes_change
# Default stub: succeed with deterministic before/after so the
self._original_apply_routes = dashboard.apply_routes_change
self._original_apply_allowlist = dashboard.apply_allowlist_change
# Default stubs: succeed with deterministic before/after so the
# audit log shows a non-empty diff.
dashboard.apply_routes_change = lambda slug, content: (
'{"routes": []}\n', content,
)
dashboard.apply_allowlist_change = lambda slug, content: (
"old.example\n", content,
)
def tearDown(self):
dashboard.apply_routes_change = self._original_apply
dashboard.apply_routes_change = self._original_apply_routes
dashboard.apply_allowlist_change = self._original_apply_allowlist
self._teardown_fake_home()
def _enqueue(self, tool: str = TOOL_CRED_PROXY_BLOCK):
@@ -268,6 +274,65 @@ class TestCredProxyApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual("", entries[0].diff)
class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
"""PRD 0015 Phase 2: approve() on a pipelock-block proposal
must call apply_allowlist_change and surface its failures."""
def setUp(self):
self._setup_fake_home()
self._original = dashboard.apply_allowlist_change
def tearDown(self):
dashboard.apply_allowlist_change = self._original
self._teardown_fake_home()
def _enqueue_pipelock(self, proposed: str = "host.example\n"):
p = Proposal.new(
bottle_slug="dev", tool=TOOL_PIPELOCK_BLOCK,
proposed_file=proposed,
justification="need new host",
current_file_hash=sha256_hex(proposed),
now=FIXED,
)
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p)
return dashboard.QueuedProposal(proposal=p, queue_dir=qdir)
def test_pipelock_block_calls_apply_with_proposed_file(self):
calls = []
dashboard.apply_allowlist_change = lambda slug, content: (
calls.append((slug, content)) or ("before", content)
)
qp = self._enqueue_pipelock("new.example\n")
dashboard.approve(qp)
self.assertEqual([("dev", "new.example\n")], calls)
def test_apply_failure_blocks_response_and_audit(self):
dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw(
PipelockApplyError("docker exec failed")
)
qp = self._enqueue_pipelock()
with self.assertRaises(PipelockApplyError):
dashboard.approve(qp)
self.assertEqual(
[qp.proposal.id],
[p.id for p in supervise.list_pending_proposals(qp.queue_dir)],
)
self.assertEqual([], read_audit_entries("pipelock", "dev"))
def test_real_diff_lands_in_audit(self):
dashboard.apply_allowlist_change = lambda slug, content: (
"old.example\n",
"old.example\nnew.example\n",
)
qp = self._enqueue_pipelock("old.example\nnew.example\n")
dashboard.approve(qp)
entries = read_audit_entries("pipelock", "dev")
self.assertEqual(1, len(entries))
self.assertIn("+new.example", entries[0].diff)
class TestOperatorEditRoutes(_FakeHomeMixin, unittest.TestCase):
"""PRD 0014 Phase 4: operator-initiated routes edit (not gated
on a pending proposal)."""
@@ -313,10 +378,41 @@ class TestDiscoverCredProxySlugs(unittest.TestCase):
os.environ["PATH"] = "/nonexistent-no-docker-here"
try:
self.assertEqual([], dashboard.discover_cred_proxy_slugs())
self.assertEqual([], dashboard.discover_pipelock_slugs())
finally:
os.environ["PATH"] = original
class TestOperatorEditAllowlist(_FakeHomeMixin, unittest.TestCase):
"""PRD 0015 Phase 3: operator-initiated pipelock allowlist edit."""
def setUp(self):
self._setup_fake_home()
self._original = dashboard.apply_allowlist_change
def tearDown(self):
dashboard.apply_allowlist_change = self._original
self._teardown_fake_home()
def test_writes_audit_with_operator_edit_action(self):
dashboard.apply_allowlist_change = lambda slug, content: (
"old.example\n", content,
)
dashboard.operator_edit_allowlist("dev", "old.example\nnew.example\n")
entries = read_audit_entries("pipelock", "dev")
self.assertEqual(1, len(entries))
self.assertEqual(supervise.ACTION_OPERATOR_EDIT, entries[0].operator_action)
self.assertIn("+new.example", entries[0].diff)
def test_failure_does_not_write_audit(self):
dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw(
PipelockApplyError("nope")
)
with self.assertRaises(PipelockApplyError):
dashboard.operator_edit_allowlist("dev", "x.example\n")
self.assertEqual([], read_audit_entries("pipelock", "dev"))
class TestEditInEditor(unittest.TestCase):
def test_runs_editor_returns_edited_content(self):
# Fake "editor" is /bin/sh -c 'cat <<EOF > $1 ... EOF'
+115
View File
@@ -0,0 +1,115 @@
"""Unit: pipelock_apply parsers + helpers (PRD 0015 Phase 1).
docker exec / cp / restart paths are covered by the integration
test in Phase 4. Here we cover the host-side parsing + yaml roundtrip.
"""
import unittest
from claude_bottle.backend.docker.pipelock_apply import (
PipelockApplyError,
parse_allowlist_content,
render_allowlist_content,
)
from claude_bottle.pipelock import pipelock_render_yaml
from claude_bottle.yaml_subset import parse_yaml_subset
class TestParseAllowlistContent(unittest.TestCase):
def test_one_per_line(self):
self.assertEqual(
["a.example", "b.example"],
parse_allowlist_content("a.example\nb.example\n"),
)
def test_blank_lines_ignored(self):
self.assertEqual(
["a", "b"],
parse_allowlist_content("a\n\n \nb\n"),
)
def test_comments_ignored(self):
self.assertEqual(
["a"],
parse_allowlist_content("# top comment\na\n# trailing\n"),
)
def test_invalid_char_raises(self):
with self.assertRaises(PipelockApplyError) as cm:
parse_allowlist_content("host with space\n")
self.assertIn("disallowed characters", str(cm.exception))
def test_empty_input_returns_empty_list(self):
self.assertEqual([], parse_allowlist_content(""))
class TestRenderAllowlistContent(unittest.TestCase):
def test_one_per_line_with_trailing_newline(self):
self.assertEqual("a\nb\n", render_allowlist_content(["a", "b"]))
def test_empty_renders_empty(self):
self.assertEqual("", render_allowlist_content([]))
def test_roundtrip(self):
original = ["api.example.com", "ghcr.io", "example.org"]
self.assertEqual(
original,
parse_allowlist_content(render_allowlist_content(original)),
)
class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase):
"""The apply path parses the running pipelock.yaml, swaps
api_allowlist, re-renders. Verify that parse(render(cfg)) ==
cfg for the fields pipelock_render_yaml emits otherwise
the apply would silently drop config."""
def test_minimal_config_roundtrips(self):
cfg = {
"version": 1,
"mode": "strict",
"enforce": True,
"api_allowlist": ["a.example", "b.example"],
"forward_proxy": {"enabled": True},
"dlp": {"include_defaults": True, "scan_env": True},
"request_body_scanning": {"action": "block"},
}
rendered = pipelock_render_yaml(cfg)
parsed = parse_yaml_subset(rendered)
self.assertEqual(["a.example", "b.example"], parsed["api_allowlist"])
self.assertEqual(1, parsed["version"])
self.assertEqual("strict", parsed["mode"])
self.assertEqual(True, parsed["enforce"])
def test_swap_allowlist_then_render_preserves_other_fields(self):
cfg = {
"version": 1,
"mode": "strict",
"enforce": True,
"api_allowlist": ["old.example"],
"forward_proxy": {"enabled": True},
"dlp": {"include_defaults": True, "scan_env": True},
"request_body_scanning": {"action": "block"},
"tls_interception": {
"enabled": True,
"ca_cert": "/etc/pipelock-ca.pem",
"ca_key": "/etc/pipelock-ca-key.pem",
"passthrough_domains": ["api.anthropic.com"],
},
}
parsed = parse_yaml_subset(pipelock_render_yaml(cfg))
parsed["api_allowlist"] = ["new.example"]
rerendered = pipelock_render_yaml(parsed)
roundtripped = parse_yaml_subset(rerendered)
self.assertEqual(["new.example"], roundtripped["api_allowlist"])
# Non-allowlist fields stay put.
self.assertEqual("strict", roundtripped["mode"])
tls = roundtripped["tls_interception"]
self.assertIsInstance(tls, dict)
assert isinstance(tls, dict) # type-narrowing
self.assertEqual("/etc/pipelock-ca.pem", tls["ca_cert"])
self.assertEqual(["api.anthropic.com"], tls["passthrough_domains"])
if __name__ == "__main__":
unittest.main()