From 30b4f1228845be4f7363ceae2f5c230492749cfc Mon Sep 17 00:00:00 2001 From: didericis Date: Mon, 11 May 2026 16:23:12 -0400 Subject: [PATCH] refactor(pipelock): expose structured config; assert on dict in tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split pipelock config building from YAML rendering: pipelock_build_config returns a dict, pipelock_render_yaml serializes it, and _build_pipelock_yaml chains the two onto disk. Unchanged behavior — pipelock loads the same YAML. The yaml test now asserts on the structured config dict, which is robust to cosmetic YAML changes (key order, quoting). The two checks that only make sense on the rendered output — file mode 0600 and no-secret-leakage — stay against the on-disk content. Co-Authored-By: Claude Opus 4.7 --- claude_bottle/pipelock.py | 110 +++++++++++++++++++------------ tests/unit/test_pipelock_yaml.py | 97 +++++++++++++++------------ 2 files changed, 125 insertions(+), 82 deletions(-) diff --git a/claude_bottle/pipelock.py b/claude_bottle/pipelock.py index 4d9967f..9661c93 100644 --- a/claude_bottle/pipelock.py +++ b/claude_bottle/pipelock.py @@ -15,6 +15,7 @@ from __future__ import annotations from abc import ABC, abstractmethod from dataclasses import dataclass from pathlib import Path +from typing import cast from .manifest import Bottle from .util import is_ipv4_literal @@ -85,6 +86,72 @@ def pipelock_allowlist_summary(bottle: Bottle) -> str: +# --- Config build + YAML render -------------------------------------------- + + +def pipelock_build_config(bottle: Bottle) -> dict[str, object]: + """Build the structured pipelock config dict the sidecar will load. + + Deliberately carries no env values, no secrets, no per-agent + customization beyond the resolved hostname list. The shape mirrors + the YAML pipelock expects on disk; `pipelock_render_yaml` serializes + it. Tests assert on this dict; production code renders it.""" + cfg: dict[str, object] = { + "version": 1, + "mode": "strict", + "enforce": True, + "api_allowlist": pipelock_effective_allowlist(bottle), + "forward_proxy": {"enabled": True}, + } + trusted = pipelock_bottle_ssh_trusted_domains(bottle) + if trusted: + cfg["trusted_domains"] = trusted + ip_cidrs = pipelock_bottle_ssh_ip_cidrs(bottle) + if ip_cidrs: + cfg["ssrf"] = {"ip_allowlist": ip_cidrs} + cfg["dlp"] = {"include_defaults": True, "scan_env": True} + return cfg + + +def pipelock_render_yaml(cfg: dict[str, object]) -> str: + """Render a pipelock config dict (as produced by + `pipelock_build_config`) as YAML. Hand-rolled so we don't take a + YAML-parser dependency for a fixed, narrow shape.""" + def _bool(b: object) -> str: + return "true" if b else "false" + + lines: list[str] = [] + lines.append(f"version: {cfg['version']}") + lines.append(f"mode: {cfg['mode']}") + lines.append(f"enforce: {_bool(cfg['enforce'])}") + lines.append("") + lines.append("api_allowlist:") + for h in cast(list[str], cfg["api_allowlist"]): + lines.append(f' - "{h}"') + lines.append("") + lines.append("forward_proxy:") + fp = cast(dict[str, object], cfg["forward_proxy"]) + lines.append(f" enabled: {_bool(fp['enabled'])}") + lines.append("") + if "trusted_domains" in cfg: + lines.append("trusted_domains:") + for td in cast(list[str], cfg["trusted_domains"]): + lines.append(f' - "{td}"') + lines.append("") + if "ssrf" in cfg: + lines.append("ssrf:") + ssrf = cast(dict[str, object], cfg["ssrf"]) + lines.append(" ip_allowlist:") + for cidr in cast(list[str], ssrf["ip_allowlist"]): + lines.append(f' - "{cidr}"') + lines.append("") + lines.append("dlp:") + dlp = cast(dict[str, object], cfg["dlp"]) + lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}") + lines.append(f" scan_env: {_bool(dlp['scan_env'])}") + return "\n".join(lines) + "\n" + + # --- Proxy class ----------------------------------------------------------- @@ -125,47 +192,8 @@ class PipelockProxy(ABC): return PipelockProxyPlan(yaml_path=yaml_path, slug=slug) def _build_pipelock_yaml(self, bottle: Bottle, yaml_path: Path): - """Write the pipelock yaml config (mode 600) to `yaml_path` - for the sidecar to consume when it boots. Carries the - effective allowlist (bottle.egress.allowlist UNION - claude-bottle defaults UNION ssh hostnames), a fixed listen - port, strict mode + forward_proxy + DLP defaults + scan_env. - Deliberately contains no env values, no secrets, no per-agent - customization beyond the hostname list.""" - allowlist = pipelock_effective_allowlist(bottle) - trusted = pipelock_bottle_ssh_trusted_domains(bottle) - ip_cidrs = pipelock_bottle_ssh_ip_cidrs(bottle) - - lines: list[str] = [] - lines.append("version: 1") - lines.append("mode: strict") - lines.append("enforce: true") - lines.append("") - lines.append("# Hostnames the agent is allowed to reach. Effective list is") - lines.append("# claude-bottle defaults UNION bottle.egress.allowlist (sorted, deduped).") - lines.append("api_allowlist:") - for h in allowlist: - lines.append(f' - "{h}"') - lines.append("") - lines.append("forward_proxy:") - lines.append(" enabled: true") - lines.append("") - if trusted: - lines.append("trusted_domains:") - for td in trusted: - lines.append(f' - "{td}"') - lines.append("") - if ip_cidrs: - lines.append("ssrf:") - lines.append(" ip_allowlist:") - for cidr in ip_cidrs: - lines.append(f' - "{cidr}"') - lines.append("") - lines.append("dlp:") - lines.append(" include_defaults: true") - lines.append(" scan_env: true") - - yaml_path.write_text("\n".join(lines) + "\n") + """Write the pipelock yaml config (mode 600) to `yaml_path`.""" + yaml_path.write_text(pipelock_render_yaml(pipelock_build_config(bottle))) yaml_path.chmod(0o600) @abstractmethod diff --git a/tests/unit/test_pipelock_yaml.py b/tests/unit/test_pipelock_yaml.py index ae6a80b..2ed6aa7 100644 --- a/tests/unit/test_pipelock_yaml.py +++ b/tests/unit/test_pipelock_yaml.py @@ -1,6 +1,10 @@ -"""Unit: PipelockProxy.prepare — produces a pipelock YAML config -containing the expected top-level keys and per-bottle entries. We -don't fully parse YAML; we grep for content shape.""" +"""Unit: pipelock config building and YAML rendering. + +`pipelock_build_config` produces the structured config dict pipelock +will load; tests assert on that dict so they don't break on cosmetic +YAML changes. A small set of tests still hit the rendered output for +properties that only make sense on disk (file mode, no-secret-leakage). +""" import os import tempfile @@ -9,49 +13,66 @@ from pathlib import Path from claude_bottle.backend.docker.pipelock import DockerPipelockProxy from claude_bottle.manifest import Manifest +from claude_bottle.pipelock import pipelock_build_config, pipelock_render_yaml from tests.fixtures import fixture_minimal, fixture_with_ssh -class TestPipelockProxyPrepare(unittest.TestCase): +class TestBuildConfig(unittest.TestCase): + def test_minimal_shape(self): + cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) + self.assertEqual("strict", cfg["mode"]) + self.assertEqual(True, cfg["enforce"]) + self.assertEqual({"enabled": True}, cfg["forward_proxy"]) + self.assertEqual( + {"include_defaults": True, "scan_env": True}, cfg["dlp"] + ) + # Baked defaults always present. + self.assertIn("api.anthropic.com", cfg["api_allowlist"]) + self.assertIn("raw.githubusercontent.com", cfg["api_allowlist"]) + # No SSH entries → no trusted_domains, no ssrf. + self.assertNotIn("trusted_domains", cfg) + self.assertNotIn("ssrf", cfg) + + def test_ssh_shape(self): + cfg = pipelock_build_config(fixture_with_ssh().bottles["dev"]) + self.assertIn("github.com", cfg["trusted_domains"]) + self.assertNotIn("100.78.141.42", cfg["trusted_domains"]) + self.assertIn("100.78.141.42/32", cfg["ssrf"]["ip_allowlist"]) + # Strict mode: IPv4 host is also in the api_allowlist union. + self.assertIn("100.78.141.42", cfg["api_allowlist"]) + + +class TestRenderAndWrite(unittest.TestCase): def setUp(self): self.out_dir = Path(tempfile.mkdtemp()) - self.proxy = DockerPipelockProxy() def tearDown(self): import shutil shutil.rmtree(self.out_dir, ignore_errors=True) - def test_minimal(self): + def test_render_emits_required_top_level_keys(self): + """One render-level smoke check: the serialized YAML is plausibly + the shape pipelock expects. We don't grep every key here — that's + what TestBuildConfig is for.""" + cfg = pipelock_build_config(fixture_with_ssh().bottles["dev"]) + text = pipelock_render_yaml(cfg) + for required in ( + "api_allowlist:", + "forward_proxy:", + "trusted_domains:", + "ssrf:", + "dlp:", + ): + self.assertIn(required, text) + + def test_prepare_writes_file_at_mode_600(self): yaml_path = self.out_dir / "min.yaml" - self.proxy.prepare(fixture_minimal().bottles["dev"], "demo", yaml_path) - content = yaml_path.read_text() - self.assertIn("mode: strict", content) - self.assertIn("enforce: true", content) - self.assertIn("api_allowlist:", content) - self.assertIn("api.anthropic.com", content) - self.assertIn("raw.githubusercontent.com", content) - self.assertIn("forward_proxy:", content) - self.assertIn("enabled: true", content) - self.assertIn("dlp:", content) - self.assertIn("include_defaults: true", content) - self.assertIn("scan_env: true", content) - # No ssh entries → no trusted_domains nor ssrf block. - self.assertNotIn("trusted_domains:", content) - self.assertNotIn("ssrf:", content) + DockerPipelockProxy().prepare( + fixture_minimal().bottles["dev"], "demo", yaml_path + ) + self.assertEqual(0o600, os.stat(yaml_path).st_mode & 0o777) - def test_ssh_blocks(self): - yaml_path = self.out_dir / "ssh.yaml" - self.proxy.prepare(fixture_with_ssh().bottles["dev"], "demo", yaml_path) - content = yaml_path.read_text() - self.assertIn("trusted_domains:", content) - self.assertIn("github.com", content) - self.assertIn("ssrf:", content) - self.assertIn("ip_allowlist:", content) - self.assertIn("100.78.141.42/32", content) - # ipv4 host should also be in api_allowlist (strict mode requires both). - self.assertIn("100.78.141.42", content) - - def test_secret_hygiene(self): + def test_prepare_does_not_leak_env_names_or_values(self): manifest = Manifest.from_json_obj({ "bottles": { "dev": { @@ -65,18 +86,12 @@ class TestPipelockProxyPrepare(unittest.TestCase): "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, }) yaml_path = self.out_dir / "secret.yaml" - self.proxy.prepare(manifest.bottles["dev"], "demo", yaml_path) + DockerPipelockProxy().prepare(manifest.bottles["dev"], "demo", yaml_path) content = yaml_path.read_text() self.assertNotIn("literal-value-should-not-appear", content) self.assertNotIn("MY_SECRET", content) self.assertNotIn("prompt-message", content) - def test_file_mode_is_600(self): - yaml_path = self.out_dir / "min.yaml" - self.proxy.prepare(fixture_minimal().bottles["dev"], "demo", yaml_path) - mode = os.stat(yaml_path).st_mode & 0o777 - self.assertEqual(0o600, mode) - if __name__ == "__main__": unittest.main()