refactor(pipelock): expose structured config; assert on dict in tests

Split pipelock config building from YAML rendering: pipelock_build_config
returns a dict, pipelock_render_yaml serializes it, and _build_pipelock_yaml
chains the two onto disk. Unchanged behavior — pipelock loads the same YAML.

The yaml test now asserts on the structured config dict, which is
robust to cosmetic YAML changes (key order, quoting). The two checks
that only make sense on the rendered output — file mode 0600 and
no-secret-leakage — stay against the on-disk content.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-11 16:23:12 -04:00
parent 4462863d56
commit 30b4f12288
2 changed files with 125 additions and 82 deletions
+69 -41
View File
@@ -15,6 +15,7 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import cast
from .manifest import Bottle
from .util import is_ipv4_literal
@@ -85,6 +86,72 @@ def pipelock_allowlist_summary(bottle: Bottle) -> str:
# --- Config build + YAML render --------------------------------------------
def pipelock_build_config(bottle: Bottle) -> dict[str, object]:
"""Build the structured pipelock config dict the sidecar will load.
Deliberately carries no env values, no secrets, no per-agent
customization beyond the resolved hostname list. The shape mirrors
the YAML pipelock expects on disk; `pipelock_render_yaml` serializes
it. Tests assert on this dict; production code renders it."""
cfg: dict[str, object] = {
"version": 1,
"mode": "strict",
"enforce": True,
"api_allowlist": pipelock_effective_allowlist(bottle),
"forward_proxy": {"enabled": True},
}
trusted = pipelock_bottle_ssh_trusted_domains(bottle)
if trusted:
cfg["trusted_domains"] = trusted
ip_cidrs = pipelock_bottle_ssh_ip_cidrs(bottle)
if ip_cidrs:
cfg["ssrf"] = {"ip_allowlist": ip_cidrs}
cfg["dlp"] = {"include_defaults": True, "scan_env": True}
return cfg
def pipelock_render_yaml(cfg: dict[str, object]) -> str:
"""Render a pipelock config dict (as produced by
`pipelock_build_config`) as YAML. Hand-rolled so we don't take a
YAML-parser dependency for a fixed, narrow shape."""
def _bool(b: object) -> str:
return "true" if b else "false"
lines: list[str] = []
lines.append(f"version: {cfg['version']}")
lines.append(f"mode: {cfg['mode']}")
lines.append(f"enforce: {_bool(cfg['enforce'])}")
lines.append("")
lines.append("api_allowlist:")
for h in cast(list[str], cfg["api_allowlist"]):
lines.append(f' - "{h}"')
lines.append("")
lines.append("forward_proxy:")
fp = cast(dict[str, object], cfg["forward_proxy"])
lines.append(f" enabled: {_bool(fp['enabled'])}")
lines.append("")
if "trusted_domains" in cfg:
lines.append("trusted_domains:")
for td in cast(list[str], cfg["trusted_domains"]):
lines.append(f' - "{td}"')
lines.append("")
if "ssrf" in cfg:
lines.append("ssrf:")
ssrf = cast(dict[str, object], cfg["ssrf"])
lines.append(" ip_allowlist:")
for cidr in cast(list[str], ssrf["ip_allowlist"]):
lines.append(f' - "{cidr}"')
lines.append("")
lines.append("dlp:")
dlp = cast(dict[str, object], cfg["dlp"])
lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}")
lines.append(f" scan_env: {_bool(dlp['scan_env'])}")
return "\n".join(lines) + "\n"
# --- Proxy class -----------------------------------------------------------
@@ -125,47 +192,8 @@ class PipelockProxy(ABC):
return PipelockProxyPlan(yaml_path=yaml_path, slug=slug)
def _build_pipelock_yaml(self, bottle: Bottle, yaml_path: Path):
"""Write the pipelock yaml config (mode 600) to `yaml_path`
for the sidecar to consume when it boots. Carries the
effective allowlist (bottle.egress.allowlist UNION
claude-bottle defaults UNION ssh hostnames), a fixed listen
port, strict mode + forward_proxy + DLP defaults + scan_env.
Deliberately contains no env values, no secrets, no per-agent
customization beyond the hostname list."""
allowlist = pipelock_effective_allowlist(bottle)
trusted = pipelock_bottle_ssh_trusted_domains(bottle)
ip_cidrs = pipelock_bottle_ssh_ip_cidrs(bottle)
lines: list[str] = []
lines.append("version: 1")
lines.append("mode: strict")
lines.append("enforce: true")
lines.append("")
lines.append("# Hostnames the agent is allowed to reach. Effective list is")
lines.append("# claude-bottle defaults UNION bottle.egress.allowlist (sorted, deduped).")
lines.append("api_allowlist:")
for h in allowlist:
lines.append(f' - "{h}"')
lines.append("")
lines.append("forward_proxy:")
lines.append(" enabled: true")
lines.append("")
if trusted:
lines.append("trusted_domains:")
for td in trusted:
lines.append(f' - "{td}"')
lines.append("")
if ip_cidrs:
lines.append("ssrf:")
lines.append(" ip_allowlist:")
for cidr in ip_cidrs:
lines.append(f' - "{cidr}"')
lines.append("")
lines.append("dlp:")
lines.append(" include_defaults: true")
lines.append(" scan_env: true")
yaml_path.write_text("\n".join(lines) + "\n")
"""Write the pipelock yaml config (mode 600) to `yaml_path`."""
yaml_path.write_text(pipelock_render_yaml(pipelock_build_config(bottle)))
yaml_path.chmod(0o600)
@abstractmethod
+56 -41
View File
@@ -1,6 +1,10 @@
"""Unit: PipelockProxy.prepare — produces a pipelock YAML config
containing the expected top-level keys and per-bottle entries. We
don't fully parse YAML; we grep for content shape."""
"""Unit: pipelock config building and YAML rendering.
`pipelock_build_config` produces the structured config dict pipelock
will load; tests assert on that dict so they don't break on cosmetic
YAML changes. A small set of tests still hit the rendered output for
properties that only make sense on disk (file mode, no-secret-leakage).
"""
import os
import tempfile
@@ -9,49 +13,66 @@ from pathlib import Path
from claude_bottle.backend.docker.pipelock import DockerPipelockProxy
from claude_bottle.manifest import Manifest
from claude_bottle.pipelock import pipelock_build_config, pipelock_render_yaml
from tests.fixtures import fixture_minimal, fixture_with_ssh
class TestPipelockProxyPrepare(unittest.TestCase):
class TestBuildConfig(unittest.TestCase):
def test_minimal_shape(self):
cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
self.assertEqual("strict", cfg["mode"])
self.assertEqual(True, cfg["enforce"])
self.assertEqual({"enabled": True}, cfg["forward_proxy"])
self.assertEqual(
{"include_defaults": True, "scan_env": True}, cfg["dlp"]
)
# Baked defaults always present.
self.assertIn("api.anthropic.com", cfg["api_allowlist"])
self.assertIn("raw.githubusercontent.com", cfg["api_allowlist"])
# No SSH entries → no trusted_domains, no ssrf.
self.assertNotIn("trusted_domains", cfg)
self.assertNotIn("ssrf", cfg)
def test_ssh_shape(self):
cfg = pipelock_build_config(fixture_with_ssh().bottles["dev"])
self.assertIn("github.com", cfg["trusted_domains"])
self.assertNotIn("100.78.141.42", cfg["trusted_domains"])
self.assertIn("100.78.141.42/32", cfg["ssrf"]["ip_allowlist"])
# Strict mode: IPv4 host is also in the api_allowlist union.
self.assertIn("100.78.141.42", cfg["api_allowlist"])
class TestRenderAndWrite(unittest.TestCase):
def setUp(self):
self.out_dir = Path(tempfile.mkdtemp())
self.proxy = DockerPipelockProxy()
def tearDown(self):
import shutil
shutil.rmtree(self.out_dir, ignore_errors=True)
def test_minimal(self):
def test_render_emits_required_top_level_keys(self):
"""One render-level smoke check: the serialized YAML is plausibly
the shape pipelock expects. We don't grep every key here — that's
what TestBuildConfig is for."""
cfg = pipelock_build_config(fixture_with_ssh().bottles["dev"])
text = pipelock_render_yaml(cfg)
for required in (
"api_allowlist:",
"forward_proxy:",
"trusted_domains:",
"ssrf:",
"dlp:",
):
self.assertIn(required, text)
def test_prepare_writes_file_at_mode_600(self):
yaml_path = self.out_dir / "min.yaml"
self.proxy.prepare(fixture_minimal().bottles["dev"], "demo", yaml_path)
content = yaml_path.read_text()
self.assertIn("mode: strict", content)
self.assertIn("enforce: true", content)
self.assertIn("api_allowlist:", content)
self.assertIn("api.anthropic.com", content)
self.assertIn("raw.githubusercontent.com", content)
self.assertIn("forward_proxy:", content)
self.assertIn("enabled: true", content)
self.assertIn("dlp:", content)
self.assertIn("include_defaults: true", content)
self.assertIn("scan_env: true", content)
# No ssh entries → no trusted_domains nor ssrf block.
self.assertNotIn("trusted_domains:", content)
self.assertNotIn("ssrf:", content)
DockerPipelockProxy().prepare(
fixture_minimal().bottles["dev"], "demo", yaml_path
)
self.assertEqual(0o600, os.stat(yaml_path).st_mode & 0o777)
def test_ssh_blocks(self):
yaml_path = self.out_dir / "ssh.yaml"
self.proxy.prepare(fixture_with_ssh().bottles["dev"], "demo", yaml_path)
content = yaml_path.read_text()
self.assertIn("trusted_domains:", content)
self.assertIn("github.com", content)
self.assertIn("ssrf:", content)
self.assertIn("ip_allowlist:", content)
self.assertIn("100.78.141.42/32", content)
# ipv4 host should also be in api_allowlist (strict mode requires both).
self.assertIn("100.78.141.42", content)
def test_secret_hygiene(self):
def test_prepare_does_not_leak_env_names_or_values(self):
manifest = Manifest.from_json_obj({
"bottles": {
"dev": {
@@ -65,18 +86,12 @@ class TestPipelockProxyPrepare(unittest.TestCase):
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
yaml_path = self.out_dir / "secret.yaml"
self.proxy.prepare(manifest.bottles["dev"], "demo", yaml_path)
DockerPipelockProxy().prepare(manifest.bottles["dev"], "demo", yaml_path)
content = yaml_path.read_text()
self.assertNotIn("literal-value-should-not-appear", content)
self.assertNotIn("MY_SECRET", content)
self.assertNotIn("prompt-message", content)
def test_file_mode_is_600(self):
yaml_path = self.out_dir / "min.yaml"
self.proxy.prepare(fixture_minimal().bottles["dev"], "demo", yaml_path)
mode = os.stat(yaml_path).st_mode & 0o777
self.assertEqual(0o600, mode)
if __name__ == "__main__":
unittest.main()