PRD 0015: pipelock block remediation #21
@@ -0,0 +1,183 @@
|
|||||||
|
"""pipelock_apply — host-side helper to apply an api_allowlist
|
||||||
|
change to a running pipelock sidecar (PRD 0015).
|
||||||
|
|
||||||
|
Used by the supervise dashboard when the operator approves a
|
||||||
|
pipelock-block proposal (or runs the operator-initiated `pipelock
|
||||||
|
edit <bottle>` verb). Fetches the current pipelock.yaml via `docker
|
||||||
|
exec`, parses it, swaps the api_allowlist with the proposed hosts,
|
||||||
|
re-renders, writes back via `docker cp`, then `docker restart` so
|
||||||
|
pipelock picks up the new config.
|
||||||
|
|
||||||
|
v1 uses restart, not SIGHUP — pipelock has no in-process reload
|
||||||
|
hook and adding one is the "SIGHUP reload for pipelock" open
|
||||||
|
question in PRD 0015. Restart drops in-flight outbound calls; the
|
||||||
|
agent's HTTP client retries pick up against the restarted proxy.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from ...pipelock import pipelock_render_yaml
|
||||||
|
from ...yaml_subset import parse_yaml_subset
|
||||||
|
from .pipelock import pipelock_container_name
|
||||||
|
|
||||||
|
|
||||||
|
PIPELOCK_YAML_IN_CONTAINER = "/etc/pipelock.yaml"
|
||||||
|
|
||||||
|
# Allowlist proposals are one-hostname-per-line. Blank lines and
|
||||||
|
# `#`-prefixed comments are ignored. The character set matches the
|
||||||
|
# supervise sidecar's syntactic check on the agent's pipelock-block
|
||||||
|
# proposal (alphanumerics + dot/dash/underscore).
|
||||||
|
_HOST_OK = re.compile(r"^[A-Za-z0-9_.-]+$")
|
||||||
|
|
||||||
|
|
||||||
|
class PipelockApplyError(RuntimeError):
|
||||||
|
"""Raised when fetch / parse / apply fails. The dashboard renders
|
||||||
|
the message and keeps the proposal pending — never crashes."""
|
||||||
|
|
||||||
|
|
||||||
|
def parse_allowlist_content(content: str) -> list[str]:
|
||||||
|
"""One hostname per line. Blanks and `#` comments are ignored.
|
||||||
|
Raises PipelockApplyError if a line has a disallowed character."""
|
||||||
|
hosts: list[str] = []
|
||||||
|
for i, raw_line in enumerate(content.splitlines(), start=1):
|
||||||
|
line = raw_line.strip()
|
||||||
|
if not line or line.startswith("#"):
|
||||||
|
continue
|
||||||
|
if not _HOST_OK.match(line):
|
||||||
|
raise PipelockApplyError(
|
||||||
|
f"allowlist line {i}: {line!r} has disallowed characters"
|
||||||
|
)
|
||||||
|
hosts.append(line)
|
||||||
|
return hosts
|
||||||
|
|
||||||
|
|
||||||
|
def render_allowlist_content(hosts: list[str]) -> str:
|
||||||
|
"""Hosts → one-per-line string (the operator-facing format)."""
|
||||||
|
if not hosts:
|
||||||
|
return ""
|
||||||
|
return "\n".join(hosts) + "\n"
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_current_yaml(slug: str) -> str:
|
||||||
|
"""Read the live /etc/pipelock.yaml from the pipelock sidecar.
|
||||||
|
|
||||||
|
Uses `docker cp` (not `docker exec cat`) because the pipelock
|
||||||
|
image is distroless and has no shell utilities. `docker cp` is a
|
||||||
|
daemon-API tarball copy — works on stopped containers too, and
|
||||||
|
doesn't need anything in the container's PATH.
|
||||||
|
|
||||||
|
Raises PipelockApplyError if the read fails."""
|
||||||
|
container = pipelock_container_name(slug)
|
||||||
|
fd, tmp_path = tempfile.mkstemp(prefix="cb-pipelock-fetch.", suffix=".yaml")
|
||||||
|
os.close(fd)
|
||||||
|
try:
|
||||||
|
r = subprocess.run(
|
||||||
|
[
|
||||||
|
"docker", "cp",
|
||||||
|
f"{container}:{PIPELOCK_YAML_IN_CONTAINER}", tmp_path,
|
||||||
|
],
|
||||||
|
capture_output=True, text=True, check=False,
|
||||||
|
)
|
||||||
|
if r.returncode != 0:
|
||||||
|
raise PipelockApplyError(
|
||||||
|
f"could not fetch pipelock.yaml from {container}: "
|
||||||
|
f"{(r.stderr or '').strip() or 'container not running?'}"
|
||||||
|
)
|
||||||
|
return Path(tmp_path).read_text()
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
Path(tmp_path).unlink()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_current_allowlist(slug: str) -> str:
|
||||||
|
"""Fetch the live yaml, extract api_allowlist, render as one-per-
|
||||||
|
line — the operator-facing format for the TUI / agent's
|
||||||
|
current-config mount."""
|
||||||
|
yaml = fetch_current_yaml(slug)
|
||||||
|
cfg = parse_yaml_subset(yaml)
|
||||||
|
hosts = cfg.get("api_allowlist", [])
|
||||||
|
if not isinstance(hosts, list):
|
||||||
|
raise PipelockApplyError(
|
||||||
|
"running pipelock yaml: api_allowlist is not a list"
|
||||||
|
)
|
||||||
|
return render_allowlist_content([str(h) for h in hosts])
|
||||||
|
|
||||||
|
|
||||||
|
def apply_allowlist_change(
|
||||||
|
slug: str, new_allowlist_content: str,
|
||||||
|
) -> tuple[str, str]:
|
||||||
|
"""Apply `new_allowlist_content` to the pipelock sidecar:
|
||||||
|
1. Parse the proposed hosts (one per line).
|
||||||
|
2. Fetch + parse current pipelock.yaml.
|
||||||
|
3. Replace api_allowlist with the proposed hosts; re-render.
|
||||||
|
4. docker cp the new yaml into the sidecar.
|
||||||
|
5. docker restart so pipelock reloads.
|
||||||
|
|
||||||
|
Returns (before, after) where both are one-per-line allowlist
|
||||||
|
strings (operator-facing format). Raises PipelockApplyError on
|
||||||
|
any failure; the sidecar's existing config stays in place until
|
||||||
|
docker cp succeeds, and the restart is what makes it live."""
|
||||||
|
new_hosts = parse_allowlist_content(new_allowlist_content)
|
||||||
|
container = pipelock_container_name(slug)
|
||||||
|
current_yaml = fetch_current_yaml(slug)
|
||||||
|
cfg = parse_yaml_subset(current_yaml)
|
||||||
|
current_hosts = cfg.get("api_allowlist", [])
|
||||||
|
if not isinstance(current_hosts, list):
|
||||||
|
raise PipelockApplyError(
|
||||||
|
"running pipelock yaml: api_allowlist is not a list"
|
||||||
|
)
|
||||||
|
|
||||||
|
before = render_allowlist_content([str(h) for h in current_hosts])
|
||||||
|
after = render_allowlist_content(new_hosts)
|
||||||
|
|
||||||
|
cfg["api_allowlist"] = new_hosts
|
||||||
|
rendered = pipelock_render_yaml(cfg)
|
||||||
|
|
||||||
|
fd, tmp_path = tempfile.mkstemp(prefix="cb-pipelock-yaml.", suffix=".yaml")
|
||||||
|
try:
|
||||||
|
with os.fdopen(fd, "w") as f:
|
||||||
|
f.write(rendered)
|
||||||
|
cp = subprocess.run(
|
||||||
|
["docker", "cp", tmp_path, f"{container}:{PIPELOCK_YAML_IN_CONTAINER}"],
|
||||||
|
capture_output=True, text=True, check=False,
|
||||||
|
)
|
||||||
|
if cp.returncode != 0:
|
||||||
|
raise PipelockApplyError(
|
||||||
|
f"failed to copy pipelock.yaml into {container}: "
|
||||||
|
f"{(cp.stderr or '').strip()}"
|
||||||
|
)
|
||||||
|
restart = subprocess.run(
|
||||||
|
["docker", "restart", container],
|
||||||
|
capture_output=True, text=True, check=False,
|
||||||
|
)
|
||||||
|
if restart.returncode != 0:
|
||||||
|
raise PipelockApplyError(
|
||||||
|
f"failed to restart {container}: "
|
||||||
|
f"{(restart.stderr or '').strip()}"
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
Path(tmp_path).unlink()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return before, after
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"PIPELOCK_YAML_IN_CONTAINER",
|
||||||
|
"PipelockApplyError",
|
||||||
|
"apply_allowlist_change",
|
||||||
|
"fetch_current_allowlist",
|
||||||
|
"fetch_current_yaml",
|
||||||
|
"parse_allowlist_content",
|
||||||
|
"render_allowlist_content",
|
||||||
|
]
|
||||||
+114
-26
@@ -27,6 +27,11 @@ from ..backend.docker.cred_proxy_apply import (
|
|||||||
apply_routes_change,
|
apply_routes_change,
|
||||||
fetch_current_routes,
|
fetch_current_routes,
|
||||||
)
|
)
|
||||||
|
from ..backend.docker.pipelock_apply import (
|
||||||
|
PipelockApplyError,
|
||||||
|
apply_allowlist_change,
|
||||||
|
fetch_current_allowlist,
|
||||||
|
)
|
||||||
from ..log import info
|
from ..log import info
|
||||||
from ..supervise import (
|
from ..supervise import (
|
||||||
ACTION_OPERATOR_EDIT,
|
ACTION_OPERATOR_EDIT,
|
||||||
@@ -39,6 +44,7 @@ from ..supervise import (
|
|||||||
STATUS_REJECTED,
|
STATUS_REJECTED,
|
||||||
TOOL_CAPABILITY_BLOCK,
|
TOOL_CAPABILITY_BLOCK,
|
||||||
TOOL_CRED_PROXY_BLOCK,
|
TOOL_CRED_PROXY_BLOCK,
|
||||||
|
TOOL_PIPELOCK_BLOCK,
|
||||||
list_pending_proposals,
|
list_pending_proposals,
|
||||||
render_diff,
|
render_diff,
|
||||||
write_audit_entry,
|
write_audit_entry,
|
||||||
@@ -47,6 +53,12 @@ from ..supervise import (
|
|||||||
from ._common import PROG
|
from ._common import PROG
|
||||||
|
|
||||||
|
|
||||||
|
# Errors any remediation engine may raise. Caught by the TUI key
|
||||||
|
# handlers and surfaced in the status line so a failed apply keeps
|
||||||
|
# the proposal pending rather than crashing curses.
|
||||||
|
ApplyError = (CredProxyApplyError, PipelockApplyError)
|
||||||
|
|
||||||
|
|
||||||
# --- Discovery -------------------------------------------------------------
|
# --- Discovery -------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
@@ -58,16 +70,15 @@ class QueuedProposal:
|
|||||||
queue_dir: Path
|
queue_dir: Path
|
||||||
|
|
||||||
|
|
||||||
def discover_cred_proxy_slugs() -> list[str]:
|
def _discover_sidecar_slugs(name_prefix: str) -> list[str]:
|
||||||
"""Slugs of bottles with a running cred-proxy sidecar. Used by
|
"""Slugs of bottles whose sidecar container names start with
|
||||||
the operator-initiated `routes edit` verb to know which bottles
|
`name_prefix`. Empty list if docker isn't reachable or not
|
||||||
are editable. Empty list if docker isn't reachable or not
|
|
||||||
installed."""
|
installed."""
|
||||||
try:
|
try:
|
||||||
r = subprocess.run(
|
r = subprocess.run(
|
||||||
[
|
[
|
||||||
"docker", "ps",
|
"docker", "ps",
|
||||||
"--filter", "name=^claude-bottle-cred-proxy-",
|
"--filter", f"name=^{name_prefix}",
|
||||||
"--format", "{{.Names}}",
|
"--format", "{{.Names}}",
|
||||||
],
|
],
|
||||||
capture_output=True, text=True, check=False,
|
capture_output=True, text=True, check=False,
|
||||||
@@ -76,15 +87,26 @@ def discover_cred_proxy_slugs() -> list[str]:
|
|||||||
return []
|
return []
|
||||||
if r.returncode != 0:
|
if r.returncode != 0:
|
||||||
return []
|
return []
|
||||||
prefix = "claude-bottle-cred-proxy-"
|
|
||||||
out: list[str] = []
|
out: list[str] = []
|
||||||
for line in (r.stdout or "").splitlines():
|
for line in (r.stdout or "").splitlines():
|
||||||
line = line.strip()
|
line = line.strip()
|
||||||
if line.startswith(prefix):
|
if line.startswith(name_prefix):
|
||||||
out.append(line[len(prefix):])
|
out.append(line[len(name_prefix):])
|
||||||
return sorted(out)
|
return sorted(out)
|
||||||
|
|
||||||
|
|
||||||
|
def discover_cred_proxy_slugs() -> list[str]:
|
||||||
|
"""Slugs of bottles with a running cred-proxy sidecar. Used by
|
||||||
|
the operator-initiated `routes edit` verb."""
|
||||||
|
return _discover_sidecar_slugs("claude-bottle-cred-proxy-")
|
||||||
|
|
||||||
|
|
||||||
|
def discover_pipelock_slugs() -> list[str]:
|
||||||
|
"""Slugs of bottles with a running pipelock sidecar. Used by
|
||||||
|
the operator-initiated `pipelock edit` verb."""
|
||||||
|
return _discover_sidecar_slugs("claude-bottle-pipelock-")
|
||||||
|
|
||||||
|
|
||||||
def discover_pending() -> list[QueuedProposal]:
|
def discover_pending() -> list[QueuedProposal]:
|
||||||
"""Walk ~/.claude-bottle/queue/* and collect pending proposals
|
"""Walk ~/.claude-bottle/queue/* and collect pending proposals
|
||||||
from every bottle's queue. Sorted by arrival time across the
|
from every bottle's queue. Sorted by arrival time across the
|
||||||
@@ -129,9 +151,13 @@ def approve(
|
|||||||
diff_before, diff_after = apply_routes_change(
|
diff_before, diff_after = apply_routes_change(
|
||||||
qp.proposal.bottle_slug, file_to_apply,
|
qp.proposal.bottle_slug, file_to_apply,
|
||||||
)
|
)
|
||||||
# pipelock-block + capability-block remediation lands in PRDs
|
elif qp.proposal.tool == TOOL_PIPELOCK_BLOCK:
|
||||||
# 0015 + 0016; for 0014 they remain no-op approvals and the
|
diff_before, diff_after = apply_allowlist_change(
|
||||||
# audit diff stays empty.
|
qp.proposal.bottle_slug, file_to_apply,
|
||||||
|
)
|
||||||
|
# capability-block remediation lands in PRD 0016; until then
|
||||||
|
# it stays a no-op approval and the audit (none for capability)
|
||||||
|
# is skipped.
|
||||||
|
|
||||||
response = Response(
|
response = Response(
|
||||||
proposal_id=qp.proposal.id,
|
proposal_id=qp.proposal.id,
|
||||||
@@ -181,6 +207,27 @@ def operator_edit_routes(slug: str, new_content: str) -> tuple[str, str]:
|
|||||||
return before, after
|
return before, after
|
||||||
|
|
||||||
|
|
||||||
|
def operator_edit_allowlist(slug: str, new_content: str) -> tuple[str, str]:
|
||||||
|
"""Apply an operator-initiated pipelock allowlist change (no
|
||||||
|
agent proposal). Used by the `pipelock edit <bottle>` TUI verb
|
||||||
|
and available for scripted use. Returns (before, after) like
|
||||||
|
apply_allowlist_change. Writes an audit entry tagged
|
||||||
|
ACTION_OPERATOR_EDIT to distinguish from tool-call approvals.
|
||||||
|
|
||||||
|
Raises PipelockApplyError on failure."""
|
||||||
|
before, after = apply_allowlist_change(slug, new_content)
|
||||||
|
write_audit_entry(AuditEntry(
|
||||||
|
timestamp=datetime.now(timezone.utc).isoformat(),
|
||||||
|
bottle_slug=slug,
|
||||||
|
component="pipelock",
|
||||||
|
operator_action=ACTION_OPERATOR_EDIT,
|
||||||
|
operator_notes="",
|
||||||
|
justification="",
|
||||||
|
diff=render_diff(before, after, label="pipelock"),
|
||||||
|
))
|
||||||
|
return before, after
|
||||||
|
|
||||||
|
|
||||||
def _write_audit(
|
def _write_audit(
|
||||||
qp: QueuedProposal,
|
qp: QueuedProposal,
|
||||||
*,
|
*,
|
||||||
@@ -296,6 +343,9 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
|
|||||||
if key == ord("e"):
|
if key == ord("e"):
|
||||||
status_line = _operator_edit_routes_flow(stdscr)
|
status_line = _operator_edit_routes_flow(stdscr)
|
||||||
continue
|
continue
|
||||||
|
if key == ord("p"):
|
||||||
|
status_line = _operator_edit_allowlist_flow(stdscr)
|
||||||
|
continue
|
||||||
if not pending:
|
if not pending:
|
||||||
continue
|
continue
|
||||||
qp = pending[selected]
|
qp = pending[selected]
|
||||||
@@ -310,7 +360,7 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
|
|||||||
try:
|
try:
|
||||||
approve(qp)
|
approve(qp)
|
||||||
status_line = f"approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
|
status_line = f"approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
|
||||||
except CredProxyApplyError as e:
|
except ApplyError as e:
|
||||||
status_line = f"apply failed: {e}"
|
status_line = f"apply failed: {e}"
|
||||||
elif key == ord("m"):
|
elif key == ord("m"):
|
||||||
edited = _modify(stdscr, qp)
|
edited = _modify(stdscr, qp)
|
||||||
@@ -320,7 +370,7 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
|
|||||||
try:
|
try:
|
||||||
approve(qp, final_file=edited, notes="operator modified before approving")
|
approve(qp, final_file=edited, notes="operator modified before approving")
|
||||||
status_line = f"modified+approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
|
status_line = f"modified+approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
|
||||||
except CredProxyApplyError as e:
|
except ApplyError as e:
|
||||||
status_line = f"apply failed: {e}"
|
status_line = f"apply failed: {e}"
|
||||||
elif key == ord("r"):
|
elif key == ord("r"):
|
||||||
reason = _prompt(stdscr, "reject reason: ")
|
reason = _prompt(stdscr, "reject reason: ")
|
||||||
@@ -365,7 +415,10 @@ def _render(
|
|||||||
attr = curses.A_REVERSE if i == selected else curses.A_NORMAL
|
attr = curses.A_REVERSE if i == selected else curses.A_NORMAL
|
||||||
stdscr.addnstr(row, 0, line, w - 1, attr)
|
stdscr.addnstr(row, 0, line, w - 1, attr)
|
||||||
|
|
||||||
footer = "[Enter] view [a] approve [m] modify [r] reject [e] routes edit [j/k] move [q] quit"
|
footer = (
|
||||||
|
"[Enter] view [a] approve [m] modify [r] reject "
|
||||||
|
"[e] routes edit [p] pipelock edit [j/k] move [q] quit"
|
||||||
|
)
|
||||||
stdscr.hline(h - 2, 0, curses.ACS_HLINE, w)
|
stdscr.hline(h - 2, 0, curses.ACS_HLINE, w)
|
||||||
stdscr.addnstr(h - 1, 0, footer, w - 1, curses.A_DIM)
|
stdscr.addnstr(h - 1, 0, footer, w - 1, curses.A_DIM)
|
||||||
if status_line:
|
if status_line:
|
||||||
@@ -403,7 +456,7 @@ def _detail_view(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> None:
|
|||||||
elif key == ord("a"):
|
elif key == ord("a"):
|
||||||
try:
|
try:
|
||||||
approve(qp)
|
approve(qp)
|
||||||
except CredProxyApplyError:
|
except ApplyError:
|
||||||
pass # Status surfaces back in the list view's render.
|
pass # Status surfaces back in the list view's render.
|
||||||
return
|
return
|
||||||
elif key == ord("m"):
|
elif key == ord("m"):
|
||||||
@@ -411,7 +464,7 @@ def _detail_view(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> None:
|
|||||||
if edited is not None:
|
if edited is not None:
|
||||||
try:
|
try:
|
||||||
approve(qp, final_file=edited, notes="operator modified before approving")
|
approve(qp, final_file=edited, notes="operator modified before approving")
|
||||||
except CredProxyApplyError:
|
except ApplyError:
|
||||||
pass
|
pass
|
||||||
return
|
return
|
||||||
elif key == ord("r"):
|
elif key == ord("r"):
|
||||||
@@ -465,33 +518,68 @@ def _operator_edit_routes_flow(stdscr: "curses._CursesWindow") -> str:
|
|||||||
cred-proxy sidecars, pick one (single → use directly; multi →
|
cred-proxy sidecars, pick one (single → use directly; multi →
|
||||||
prompt), fetch the current routes, open in $EDITOR, apply on
|
prompt), fetch the current routes, open in $EDITOR, apply on
|
||||||
save. Returns a status-line message."""
|
save. Returns a status-line message."""
|
||||||
slugs = discover_cred_proxy_slugs()
|
return _operator_edit_flow(
|
||||||
|
stdscr,
|
||||||
|
label="routes",
|
||||||
|
discover=discover_cred_proxy_slugs,
|
||||||
|
fetch=fetch_current_routes,
|
||||||
|
apply=operator_edit_routes,
|
||||||
|
suffix=".json",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _operator_edit_allowlist_flow(stdscr: "curses._CursesWindow") -> str:
|
||||||
|
"""Operator-initiated pipelock allowlist edit."""
|
||||||
|
return _operator_edit_flow(
|
||||||
|
stdscr,
|
||||||
|
label="pipelock",
|
||||||
|
discover=discover_pipelock_slugs,
|
||||||
|
fetch=fetch_current_allowlist,
|
||||||
|
apply=operator_edit_allowlist,
|
||||||
|
suffix=".txt",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _operator_edit_flow(
|
||||||
|
stdscr: "curses._CursesWindow",
|
||||||
|
*,
|
||||||
|
label: str,
|
||||||
|
discover,
|
||||||
|
fetch,
|
||||||
|
apply,
|
||||||
|
suffix: str,
|
||||||
|
) -> str:
|
||||||
|
"""Shared scaffolding for the routes-edit + pipelock-edit verbs.
|
||||||
|
`discover` returns running-sidecar slugs; `fetch(slug)` returns
|
||||||
|
the current operator-facing config; `apply(slug, new)` does the
|
||||||
|
write + restart/SIGHUP and writes the audit entry."""
|
||||||
|
slugs = discover()
|
||||||
if not slugs:
|
if not slugs:
|
||||||
return "no running cred-proxy sidecars to edit"
|
return f"no running {label} sidecars to edit"
|
||||||
if len(slugs) == 1:
|
if len(slugs) == 1:
|
||||||
slug = slugs[0]
|
slug = slugs[0]
|
||||||
else:
|
else:
|
||||||
slug = _prompt(stdscr, f"bottle ({', '.join(slugs)}): ")
|
slug = _prompt(stdscr, f"bottle ({', '.join(slugs)}): ")
|
||||||
if not slug:
|
if not slug:
|
||||||
return "routes edit aborted"
|
return f"{label} edit aborted"
|
||||||
if slug not in slugs:
|
if slug not in slugs:
|
||||||
return f"unknown bottle {slug!r}"
|
return f"unknown bottle {slug!r}"
|
||||||
try:
|
try:
|
||||||
current = fetch_current_routes(slug)
|
current = fetch(slug)
|
||||||
except CredProxyApplyError as e:
|
except ApplyError as e:
|
||||||
return f"fetch failed: {e}"
|
return f"fetch failed: {e}"
|
||||||
curses.endwin()
|
curses.endwin()
|
||||||
try:
|
try:
|
||||||
edited = edit_in_editor(current, suffix=".json")
|
edited = edit_in_editor(current, suffix=suffix)
|
||||||
finally:
|
finally:
|
||||||
stdscr.refresh()
|
stdscr.refresh()
|
||||||
if edited is None:
|
if edited is None:
|
||||||
return f"routes for [{slug}] unchanged"
|
return f"{label} for [{slug}] unchanged"
|
||||||
try:
|
try:
|
||||||
operator_edit_routes(slug, edited)
|
apply(slug, edited)
|
||||||
except CredProxyApplyError as e:
|
except ApplyError as e:
|
||||||
return f"apply failed: {e}"
|
return f"apply failed: {e}"
|
||||||
return f"updated routes for [{slug}]"
|
return f"updated {label} for [{slug}]"
|
||||||
|
|
||||||
|
|
||||||
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:
|
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:
|
||||||
|
|||||||
@@ -0,0 +1,67 @@
|
|||||||
|
# PRD 0015: pipelock block remediation
|
||||||
|
|
||||||
|
- **Status:** Draft
|
||||||
|
- **Author:** didericis
|
||||||
|
- **Created:** 2026-05-25
|
||||||
|
- **Parent:** PRD 0012
|
||||||
|
- **Depends on:** PRD 0013
|
||||||
|
|
||||||
|
## Summary
|
||||||
|
|
||||||
|
Wires the **pipelock block** path (PRD 0012 *Stuck categories*) end-to-end. The supervisor, on approval of a `pipelock-block` proposal, writes the new pipelock allowlist to the host and restarts pipelock; the agent's in-flight outbound calls may drop and rely on retry. The TUI gains a proactive `pipelock edit <bottle>` verb for operator-initiated edits unrelated to a tool call. The pipelock audit log (format defined in PRD 0013) is filled in with real entries on every edit.
|
||||||
|
|
||||||
|
## Problem
|
||||||
|
|
||||||
|
See PRD 0012. This PRD specifically addresses: with 0013 in place, the operator can approve a `pipelock-block` proposal but nothing happens — the allowlist doesn't change and pipelock doesn't notice. This PRD closes the loop, using restart-based reload. SIGHUP for pipelock is deferred to a follow-up (see Open questions).
|
||||||
|
|
||||||
|
## Goals / Success Criteria
|
||||||
|
|
||||||
|
A real pipelock block recovers end-to-end: the agent's outbound HTTP request fails with a connection-refused (host not in allowlist), the agent calls `pipelock-block` with a proposed allowlist and justification, the operator approves in the TUI, the supervisor writes the new allowlist and restarts pipelock, the agent retries and proceeds.
|
||||||
|
|
||||||
|
## Non-goals
|
||||||
|
|
||||||
|
- SIGHUP / hot reload for pipelock. v1 ships restart-based reload only.
|
||||||
|
- One-off gitlock / pipelock exceptions (e.g. waive a specific commit SHA, not a permanent allowlist entry) — see Open questions.
|
||||||
|
- cred-proxy or capability handling (covered by 0014 and 0016).
|
||||||
|
|
||||||
|
## Scope
|
||||||
|
|
||||||
|
### In scope
|
||||||
|
|
||||||
|
- Supervisor write path: on operator approval of a `pipelock-block` proposal, write the proposed allowlist to the host-side path pipelock reads, then restart the pipelock container.
|
||||||
|
- `pipelock edit <bottle>` TUI verb: open the bottle's current pipelock allowlist in `$EDITOR`, write + restart pipelock on save. Not gated on a pending proposal.
|
||||||
|
- pipelock audit log entries: every allowlist edit (from a tool-call approval or from a proactive `pipelock edit`) appends an entry with timestamp, diff, justification (if from tool call), and operator action.
|
||||||
|
|
||||||
|
### Out of scope
|
||||||
|
|
||||||
|
- SIGHUP reload (deferred — see Open questions).
|
||||||
|
- cred-proxy equivalents (PRD 0014).
|
||||||
|
|
||||||
|
## Proposed Design
|
||||||
|
|
||||||
|
### New services / components
|
||||||
|
|
||||||
|
- **`pipelock edit <bottle>` TUI verb.** Opens the bottle's current pipelock allowlist in `$EDITOR`. On save, the supervisor writes the new file and restarts pipelock.
|
||||||
|
|
||||||
|
### Existing code touched
|
||||||
|
|
||||||
|
- **pipelock** — gains a clean restart path that picks up the new allowlist on container restart. No pipelock code changes likely needed if pipelock already reads its config on startup; the orchestration is supervisor-side.
|
||||||
|
- **MCP sidecar** (PRD 0013) — the `pipelock-block` approval handler stops being a no-op; on approval, calls the supervisor's write+restart path.
|
||||||
|
- **`cli.py`** — dashboard subcommand gains the `pipelock edit` verb.
|
||||||
|
|
||||||
|
### Data model changes
|
||||||
|
|
||||||
|
None beyond PRD 0013.
|
||||||
|
|
||||||
|
## Open questions
|
||||||
|
|
||||||
|
- **SIGHUP reload for pipelock.** v1 ships restart-based reload, which drops in-flight outbound calls. Should pipelock gain SIGHUP support so **pipelock block** is as cheap as **cred-proxy block**? Depends on how often the operator edits the allowlist mid-task and how disruptive a pipelock bounce actually is.
|
||||||
|
- **Gitlock / pipelock one-off exceptions.** Some pipelock denials don't want a permanent allowlist entry — e.g. a commit that includes docs with intentionally-bogus tokens that the secret scanner correctly flags. The shape (agent blocked → tool call → operator decides → result) is the same, but the *resolution* is a per-operation override or a scoped allowlist entry, not a permanent edit. Is this a fourth tool (`exception-block`?), or does it fold into `pipelock-block` with a scoped one-shot allowlist entry? Either way, the approval must be auditable so a future reader can see what was waived and why. See `docs/research/git-gate-commit-approval.md` for a survey of gitleaks's native allowlist primitives and a recommendation.
|
||||||
|
|
||||||
|
## References
|
||||||
|
|
||||||
|
- PRD 0001 — per-agent egress proxy via pipelock.
|
||||||
|
- PRD 0008 — git-gate.
|
||||||
|
- PRD 0012 — stuck-agent recovery flow overview.
|
||||||
|
- PRD 0013 — supervise plane foundation (prerequisite).
|
||||||
|
- `docs/research/git-gate-commit-approval.md` — gitleaks allowlist primitives.
|
||||||
@@ -0,0 +1,152 @@
|
|||||||
|
"""Integration: drive `apply_allowlist_change` against a real
|
||||||
|
pipelock sidecar (PRD 0015).
|
||||||
|
|
||||||
|
Brings up a real pipelock sidecar (via the production DockerPipelockProxy
|
||||||
|
bring-up), calls apply_allowlist_change to swap the api_allowlist,
|
||||||
|
restarts pipelock, and verifies the running container now serves the
|
||||||
|
new yaml.
|
||||||
|
|
||||||
|
Setup uses pipelock_tls_init which bind-mounts a host path into a
|
||||||
|
one-shot pipelock container — that doesn't work in DinD, so the test
|
||||||
|
skips under GITEA_ACTIONS the same way the existing pipelock smoke
|
||||||
|
test does.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import dataclasses
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from claude_bottle.backend.docker.network import (
|
||||||
|
network_create_egress,
|
||||||
|
network_create_internal,
|
||||||
|
network_remove,
|
||||||
|
)
|
||||||
|
from claude_bottle.backend.docker.pipelock import (
|
||||||
|
DockerPipelockProxy,
|
||||||
|
pipelock_container_name,
|
||||||
|
pipelock_tls_init,
|
||||||
|
)
|
||||||
|
from claude_bottle.backend.docker.pipelock_apply import (
|
||||||
|
PipelockApplyError,
|
||||||
|
apply_allowlist_change,
|
||||||
|
fetch_current_allowlist,
|
||||||
|
fetch_current_yaml,
|
||||||
|
)
|
||||||
|
from claude_bottle.yaml_subset import parse_yaml_subset
|
||||||
|
from tests._docker import skip_unless_docker
|
||||||
|
from tests.fixtures import fixture_minimal
|
||||||
|
|
||||||
|
|
||||||
|
@skip_unless_docker()
|
||||||
|
@unittest.skipIf(
|
||||||
|
os.environ.get("GITEA_ACTIONS") == "true",
|
||||||
|
"skipped under act_runner: pipelock_tls_init uses a host bind mount "
|
||||||
|
"that doesn't share fs with the runner container",
|
||||||
|
)
|
||||||
|
class TestPipelockApply(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.slug = f"cb-test-pla-{os.getpid()}-{int(time.time())}"
|
||||||
|
self.sidecar_name = ""
|
||||||
|
self.internal_net = ""
|
||||||
|
self.egress_net = ""
|
||||||
|
self.work_dir = Path(tempfile.mkdtemp(prefix="pipelock-apply."))
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if self.sidecar_name:
|
||||||
|
DockerPipelockProxy().stop(self.sidecar_name)
|
||||||
|
for n in (self.internal_net, self.egress_net):
|
||||||
|
if n:
|
||||||
|
network_remove(n)
|
||||||
|
shutil.rmtree(self.work_dir, ignore_errors=True)
|
||||||
|
|
||||||
|
def _bring_up(self) -> None:
|
||||||
|
proxy = DockerPipelockProxy()
|
||||||
|
prep = proxy.prepare(fixture_minimal().bottles["dev"], self.slug, self.work_dir)
|
||||||
|
self.internal_net = network_create_internal(self.slug)
|
||||||
|
self.egress_net = network_create_egress(self.slug)
|
||||||
|
ca_cert_host, ca_key_host = pipelock_tls_init(self.work_dir)
|
||||||
|
plan = dataclasses.replace(
|
||||||
|
prep,
|
||||||
|
internal_network=self.internal_net,
|
||||||
|
egress_network=self.egress_net,
|
||||||
|
ca_cert_host_path=ca_cert_host,
|
||||||
|
ca_key_host_path=ca_key_host,
|
||||||
|
)
|
||||||
|
self.sidecar_name = proxy.start(plan)
|
||||||
|
self.assertEqual(pipelock_container_name(self.slug), self.sidecar_name)
|
||||||
|
# Wait until docker exec succeeds — the container is up but
|
||||||
|
# pipelock may still be initializing. fetch_current_yaml is
|
||||||
|
# itself a docker exec, so retrying it doubles as a readiness
|
||||||
|
# probe.
|
||||||
|
deadline = time.monotonic() + 15.0
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
try:
|
||||||
|
fetch_current_yaml(self.slug)
|
||||||
|
return
|
||||||
|
except PipelockApplyError:
|
||||||
|
pass
|
||||||
|
time.sleep(0.25)
|
||||||
|
raise AssertionError("pipelock sidecar never became reachable")
|
||||||
|
|
||||||
|
def _wait_for_yaml(self, contains: str, *, deadline_s: float = 15.0) -> str:
|
||||||
|
"""Poll docker exec until /etc/pipelock.yaml contains `contains`,
|
||||||
|
returning the yaml. Used to bridge the docker-restart window."""
|
||||||
|
deadline = time.monotonic() + deadline_s
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
try:
|
||||||
|
yaml = fetch_current_yaml(self.slug)
|
||||||
|
if contains in yaml:
|
||||||
|
return yaml
|
||||||
|
except PipelockApplyError:
|
||||||
|
pass
|
||||||
|
time.sleep(0.25)
|
||||||
|
self.fail(f"never saw {contains!r} in /etc/pipelock.yaml")
|
||||||
|
|
||||||
|
def test_apply_swaps_api_allowlist(self):
|
||||||
|
self._bring_up()
|
||||||
|
|
||||||
|
initial_yaml = fetch_current_yaml(self.slug)
|
||||||
|
# fixture_minimal yields the baked-in DEFAULT_ALLOWLIST in
|
||||||
|
# pipelock.py; api.anthropic.com is in there.
|
||||||
|
self.assertIn("api.anthropic.com", initial_yaml)
|
||||||
|
|
||||||
|
new_content = "api.anthropic.com\nnew-host.example\n"
|
||||||
|
before, after = apply_allowlist_change(self.slug, new_content)
|
||||||
|
self.assertIn("api.anthropic.com", before)
|
||||||
|
self.assertNotIn("new-host.example", before)
|
||||||
|
self.assertIn("new-host.example", after)
|
||||||
|
|
||||||
|
updated = self._wait_for_yaml("new-host.example")
|
||||||
|
cfg = parse_yaml_subset(updated)
|
||||||
|
self.assertIn("new-host.example", cfg["api_allowlist"]) # type: ignore[operator]
|
||||||
|
self.assertIn("api.anthropic.com", cfg["api_allowlist"]) # type: ignore[operator]
|
||||||
|
# tls_interception block (set up by the production prepare
|
||||||
|
# via pipelock_build_config) is preserved across the swap.
|
||||||
|
self.assertIn("tls_interception", cfg)
|
||||||
|
|
||||||
|
def test_apply_with_invalid_host_raises(self):
|
||||||
|
self._bring_up()
|
||||||
|
with self.assertRaises(PipelockApplyError):
|
||||||
|
apply_allowlist_change(self.slug, "host with space.example\n")
|
||||||
|
|
||||||
|
def test_fetch_current_allowlist_renders_one_per_line(self):
|
||||||
|
self._bring_up()
|
||||||
|
listing = fetch_current_allowlist(self.slug)
|
||||||
|
self.assertTrue(listing.endswith("\n"))
|
||||||
|
self.assertIn("api.anthropic.com\n", listing)
|
||||||
|
|
||||||
|
def test_apply_against_missing_sidecar_raises(self):
|
||||||
|
# Don't bring up — the slug points at nothing.
|
||||||
|
with self.assertRaises(PipelockApplyError):
|
||||||
|
apply_allowlist_change(self.slug, "x.example\n")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
@@ -17,6 +17,7 @@ from pathlib import Path
|
|||||||
|
|
||||||
from claude_bottle import supervise
|
from claude_bottle import supervise
|
||||||
from claude_bottle.backend.docker.cred_proxy_apply import CredProxyApplyError
|
from claude_bottle.backend.docker.cred_proxy_apply import CredProxyApplyError
|
||||||
|
from claude_bottle.backend.docker.pipelock_apply import PipelockApplyError
|
||||||
from claude_bottle.cli import dashboard
|
from claude_bottle.cli import dashboard
|
||||||
from claude_bottle.supervise import (
|
from claude_bottle.supervise import (
|
||||||
Proposal,
|
Proposal,
|
||||||
@@ -115,15 +116,20 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
|
|||||||
class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
|
class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self._setup_fake_home()
|
self._setup_fake_home()
|
||||||
self._original_apply = dashboard.apply_routes_change
|
self._original_apply_routes = dashboard.apply_routes_change
|
||||||
# Default stub: succeed with deterministic before/after so the
|
self._original_apply_allowlist = dashboard.apply_allowlist_change
|
||||||
|
# Default stubs: succeed with deterministic before/after so the
|
||||||
# audit log shows a non-empty diff.
|
# audit log shows a non-empty diff.
|
||||||
dashboard.apply_routes_change = lambda slug, content: (
|
dashboard.apply_routes_change = lambda slug, content: (
|
||||||
'{"routes": []}\n', content,
|
'{"routes": []}\n', content,
|
||||||
)
|
)
|
||||||
|
dashboard.apply_allowlist_change = lambda slug, content: (
|
||||||
|
"old.example\n", content,
|
||||||
|
)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
dashboard.apply_routes_change = self._original_apply
|
dashboard.apply_routes_change = self._original_apply_routes
|
||||||
|
dashboard.apply_allowlist_change = self._original_apply_allowlist
|
||||||
self._teardown_fake_home()
|
self._teardown_fake_home()
|
||||||
|
|
||||||
def _enqueue(self, tool: str = TOOL_CRED_PROXY_BLOCK):
|
def _enqueue(self, tool: str = TOOL_CRED_PROXY_BLOCK):
|
||||||
@@ -268,6 +274,65 @@ class TestCredProxyApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
|||||||
self.assertEqual("", entries[0].diff)
|
self.assertEqual("", entries[0].diff)
|
||||||
|
|
||||||
|
|
||||||
|
class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||||
|
"""PRD 0015 Phase 2: approve() on a pipelock-block proposal
|
||||||
|
must call apply_allowlist_change and surface its failures."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self._setup_fake_home()
|
||||||
|
self._original = dashboard.apply_allowlist_change
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
dashboard.apply_allowlist_change = self._original
|
||||||
|
self._teardown_fake_home()
|
||||||
|
|
||||||
|
def _enqueue_pipelock(self, proposed: str = "host.example\n"):
|
||||||
|
p = Proposal.new(
|
||||||
|
bottle_slug="dev", tool=TOOL_PIPELOCK_BLOCK,
|
||||||
|
proposed_file=proposed,
|
||||||
|
justification="need new host",
|
||||||
|
current_file_hash=sha256_hex(proposed),
|
||||||
|
now=FIXED,
|
||||||
|
)
|
||||||
|
qdir = supervise.queue_dir_for_slug("dev")
|
||||||
|
qdir.mkdir(parents=True, exist_ok=True)
|
||||||
|
supervise.write_proposal(qdir, p)
|
||||||
|
return dashboard.QueuedProposal(proposal=p, queue_dir=qdir)
|
||||||
|
|
||||||
|
def test_pipelock_block_calls_apply_with_proposed_file(self):
|
||||||
|
calls = []
|
||||||
|
dashboard.apply_allowlist_change = lambda slug, content: (
|
||||||
|
calls.append((slug, content)) or ("before", content)
|
||||||
|
)
|
||||||
|
qp = self._enqueue_pipelock("new.example\n")
|
||||||
|
dashboard.approve(qp)
|
||||||
|
self.assertEqual([("dev", "new.example\n")], calls)
|
||||||
|
|
||||||
|
def test_apply_failure_blocks_response_and_audit(self):
|
||||||
|
dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw(
|
||||||
|
PipelockApplyError("docker exec failed")
|
||||||
|
)
|
||||||
|
qp = self._enqueue_pipelock()
|
||||||
|
with self.assertRaises(PipelockApplyError):
|
||||||
|
dashboard.approve(qp)
|
||||||
|
self.assertEqual(
|
||||||
|
[qp.proposal.id],
|
||||||
|
[p.id for p in supervise.list_pending_proposals(qp.queue_dir)],
|
||||||
|
)
|
||||||
|
self.assertEqual([], read_audit_entries("pipelock", "dev"))
|
||||||
|
|
||||||
|
def test_real_diff_lands_in_audit(self):
|
||||||
|
dashboard.apply_allowlist_change = lambda slug, content: (
|
||||||
|
"old.example\n",
|
||||||
|
"old.example\nnew.example\n",
|
||||||
|
)
|
||||||
|
qp = self._enqueue_pipelock("old.example\nnew.example\n")
|
||||||
|
dashboard.approve(qp)
|
||||||
|
entries = read_audit_entries("pipelock", "dev")
|
||||||
|
self.assertEqual(1, len(entries))
|
||||||
|
self.assertIn("+new.example", entries[0].diff)
|
||||||
|
|
||||||
|
|
||||||
class TestOperatorEditRoutes(_FakeHomeMixin, unittest.TestCase):
|
class TestOperatorEditRoutes(_FakeHomeMixin, unittest.TestCase):
|
||||||
"""PRD 0014 Phase 4: operator-initiated routes edit (not gated
|
"""PRD 0014 Phase 4: operator-initiated routes edit (not gated
|
||||||
on a pending proposal)."""
|
on a pending proposal)."""
|
||||||
@@ -313,10 +378,41 @@ class TestDiscoverCredProxySlugs(unittest.TestCase):
|
|||||||
os.environ["PATH"] = "/nonexistent-no-docker-here"
|
os.environ["PATH"] = "/nonexistent-no-docker-here"
|
||||||
try:
|
try:
|
||||||
self.assertEqual([], dashboard.discover_cred_proxy_slugs())
|
self.assertEqual([], dashboard.discover_cred_proxy_slugs())
|
||||||
|
self.assertEqual([], dashboard.discover_pipelock_slugs())
|
||||||
finally:
|
finally:
|
||||||
os.environ["PATH"] = original
|
os.environ["PATH"] = original
|
||||||
|
|
||||||
|
|
||||||
|
class TestOperatorEditAllowlist(_FakeHomeMixin, unittest.TestCase):
|
||||||
|
"""PRD 0015 Phase 3: operator-initiated pipelock allowlist edit."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self._setup_fake_home()
|
||||||
|
self._original = dashboard.apply_allowlist_change
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
dashboard.apply_allowlist_change = self._original
|
||||||
|
self._teardown_fake_home()
|
||||||
|
|
||||||
|
def test_writes_audit_with_operator_edit_action(self):
|
||||||
|
dashboard.apply_allowlist_change = lambda slug, content: (
|
||||||
|
"old.example\n", content,
|
||||||
|
)
|
||||||
|
dashboard.operator_edit_allowlist("dev", "old.example\nnew.example\n")
|
||||||
|
entries = read_audit_entries("pipelock", "dev")
|
||||||
|
self.assertEqual(1, len(entries))
|
||||||
|
self.assertEqual(supervise.ACTION_OPERATOR_EDIT, entries[0].operator_action)
|
||||||
|
self.assertIn("+new.example", entries[0].diff)
|
||||||
|
|
||||||
|
def test_failure_does_not_write_audit(self):
|
||||||
|
dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw(
|
||||||
|
PipelockApplyError("nope")
|
||||||
|
)
|
||||||
|
with self.assertRaises(PipelockApplyError):
|
||||||
|
dashboard.operator_edit_allowlist("dev", "x.example\n")
|
||||||
|
self.assertEqual([], read_audit_entries("pipelock", "dev"))
|
||||||
|
|
||||||
|
|
||||||
class TestEditInEditor(unittest.TestCase):
|
class TestEditInEditor(unittest.TestCase):
|
||||||
def test_runs_editor_returns_edited_content(self):
|
def test_runs_editor_returns_edited_content(self):
|
||||||
# Fake "editor" is /bin/sh -c 'cat <<EOF > $1 ... EOF'
|
# Fake "editor" is /bin/sh -c 'cat <<EOF > $1 ... EOF'
|
||||||
|
|||||||
@@ -0,0 +1,115 @@
|
|||||||
|
"""Unit: pipelock_apply parsers + helpers (PRD 0015 Phase 1).
|
||||||
|
|
||||||
|
docker exec / cp / restart paths are covered by the integration
|
||||||
|
test in Phase 4. Here we cover the host-side parsing + yaml roundtrip.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from claude_bottle.backend.docker.pipelock_apply import (
|
||||||
|
PipelockApplyError,
|
||||||
|
parse_allowlist_content,
|
||||||
|
render_allowlist_content,
|
||||||
|
)
|
||||||
|
from claude_bottle.pipelock import pipelock_render_yaml
|
||||||
|
from claude_bottle.yaml_subset import parse_yaml_subset
|
||||||
|
|
||||||
|
|
||||||
|
class TestParseAllowlistContent(unittest.TestCase):
|
||||||
|
def test_one_per_line(self):
|
||||||
|
self.assertEqual(
|
||||||
|
["a.example", "b.example"],
|
||||||
|
parse_allowlist_content("a.example\nb.example\n"),
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_blank_lines_ignored(self):
|
||||||
|
self.assertEqual(
|
||||||
|
["a", "b"],
|
||||||
|
parse_allowlist_content("a\n\n \nb\n"),
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_comments_ignored(self):
|
||||||
|
self.assertEqual(
|
||||||
|
["a"],
|
||||||
|
parse_allowlist_content("# top comment\na\n# trailing\n"),
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_invalid_char_raises(self):
|
||||||
|
with self.assertRaises(PipelockApplyError) as cm:
|
||||||
|
parse_allowlist_content("host with space\n")
|
||||||
|
self.assertIn("disallowed characters", str(cm.exception))
|
||||||
|
|
||||||
|
def test_empty_input_returns_empty_list(self):
|
||||||
|
self.assertEqual([], parse_allowlist_content(""))
|
||||||
|
|
||||||
|
|
||||||
|
class TestRenderAllowlistContent(unittest.TestCase):
|
||||||
|
def test_one_per_line_with_trailing_newline(self):
|
||||||
|
self.assertEqual("a\nb\n", render_allowlist_content(["a", "b"]))
|
||||||
|
|
||||||
|
def test_empty_renders_empty(self):
|
||||||
|
self.assertEqual("", render_allowlist_content([]))
|
||||||
|
|
||||||
|
def test_roundtrip(self):
|
||||||
|
original = ["api.example.com", "ghcr.io", "example.org"]
|
||||||
|
self.assertEqual(
|
||||||
|
original,
|
||||||
|
parse_allowlist_content(render_allowlist_content(original)),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase):
|
||||||
|
"""The apply path parses the running pipelock.yaml, swaps
|
||||||
|
api_allowlist, re-renders. Verify that parse(render(cfg)) ==
|
||||||
|
cfg for the fields pipelock_render_yaml emits — otherwise
|
||||||
|
the apply would silently drop config."""
|
||||||
|
|
||||||
|
def test_minimal_config_roundtrips(self):
|
||||||
|
cfg = {
|
||||||
|
"version": 1,
|
||||||
|
"mode": "strict",
|
||||||
|
"enforce": True,
|
||||||
|
"api_allowlist": ["a.example", "b.example"],
|
||||||
|
"forward_proxy": {"enabled": True},
|
||||||
|
"dlp": {"include_defaults": True, "scan_env": True},
|
||||||
|
"request_body_scanning": {"action": "block"},
|
||||||
|
}
|
||||||
|
rendered = pipelock_render_yaml(cfg)
|
||||||
|
parsed = parse_yaml_subset(rendered)
|
||||||
|
self.assertEqual(["a.example", "b.example"], parsed["api_allowlist"])
|
||||||
|
self.assertEqual(1, parsed["version"])
|
||||||
|
self.assertEqual("strict", parsed["mode"])
|
||||||
|
self.assertEqual(True, parsed["enforce"])
|
||||||
|
|
||||||
|
def test_swap_allowlist_then_render_preserves_other_fields(self):
|
||||||
|
cfg = {
|
||||||
|
"version": 1,
|
||||||
|
"mode": "strict",
|
||||||
|
"enforce": True,
|
||||||
|
"api_allowlist": ["old.example"],
|
||||||
|
"forward_proxy": {"enabled": True},
|
||||||
|
"dlp": {"include_defaults": True, "scan_env": True},
|
||||||
|
"request_body_scanning": {"action": "block"},
|
||||||
|
"tls_interception": {
|
||||||
|
"enabled": True,
|
||||||
|
"ca_cert": "/etc/pipelock-ca.pem",
|
||||||
|
"ca_key": "/etc/pipelock-ca-key.pem",
|
||||||
|
"passthrough_domains": ["api.anthropic.com"],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
parsed = parse_yaml_subset(pipelock_render_yaml(cfg))
|
||||||
|
parsed["api_allowlist"] = ["new.example"]
|
||||||
|
rerendered = pipelock_render_yaml(parsed)
|
||||||
|
roundtripped = parse_yaml_subset(rerendered)
|
||||||
|
self.assertEqual(["new.example"], roundtripped["api_allowlist"])
|
||||||
|
# Non-allowlist fields stay put.
|
||||||
|
self.assertEqual("strict", roundtripped["mode"])
|
||||||
|
tls = roundtripped["tls_interception"]
|
||||||
|
self.assertIsInstance(tls, dict)
|
||||||
|
assert isinstance(tls, dict) # type-narrowing
|
||||||
|
self.assertEqual("/etc/pipelock-ca.pem", tls["ca_cert"])
|
||||||
|
self.assertEqual(["api.anthropic.com"], tls["passthrough_domains"])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user