diff --git a/claude_bottle/pipelock.py b/claude_bottle/pipelock.py index 4d9967f..9661c93 100644 --- a/claude_bottle/pipelock.py +++ b/claude_bottle/pipelock.py @@ -15,6 +15,7 @@ from __future__ import annotations from abc import ABC, abstractmethod from dataclasses import dataclass from pathlib import Path +from typing import cast from .manifest import Bottle from .util import is_ipv4_literal @@ -85,6 +86,72 @@ def pipelock_allowlist_summary(bottle: Bottle) -> str: +# --- Config build + YAML render -------------------------------------------- + + +def pipelock_build_config(bottle: Bottle) -> dict[str, object]: + """Build the structured pipelock config dict the sidecar will load. + + Deliberately carries no env values, no secrets, no per-agent + customization beyond the resolved hostname list. The shape mirrors + the YAML pipelock expects on disk; `pipelock_render_yaml` serializes + it. Tests assert on this dict; production code renders it.""" + cfg: dict[str, object] = { + "version": 1, + "mode": "strict", + "enforce": True, + "api_allowlist": pipelock_effective_allowlist(bottle), + "forward_proxy": {"enabled": True}, + } + trusted = pipelock_bottle_ssh_trusted_domains(bottle) + if trusted: + cfg["trusted_domains"] = trusted + ip_cidrs = pipelock_bottle_ssh_ip_cidrs(bottle) + if ip_cidrs: + cfg["ssrf"] = {"ip_allowlist": ip_cidrs} + cfg["dlp"] = {"include_defaults": True, "scan_env": True} + return cfg + + +def pipelock_render_yaml(cfg: dict[str, object]) -> str: + """Render a pipelock config dict (as produced by + `pipelock_build_config`) as YAML. Hand-rolled so we don't take a + YAML-parser dependency for a fixed, narrow shape.""" + def _bool(b: object) -> str: + return "true" if b else "false" + + lines: list[str] = [] + lines.append(f"version: {cfg['version']}") + lines.append(f"mode: {cfg['mode']}") + lines.append(f"enforce: {_bool(cfg['enforce'])}") + lines.append("") + lines.append("api_allowlist:") + for h in cast(list[str], cfg["api_allowlist"]): + lines.append(f' - "{h}"') + lines.append("") + lines.append("forward_proxy:") + fp = cast(dict[str, object], cfg["forward_proxy"]) + lines.append(f" enabled: {_bool(fp['enabled'])}") + lines.append("") + if "trusted_domains" in cfg: + lines.append("trusted_domains:") + for td in cast(list[str], cfg["trusted_domains"]): + lines.append(f' - "{td}"') + lines.append("") + if "ssrf" in cfg: + lines.append("ssrf:") + ssrf = cast(dict[str, object], cfg["ssrf"]) + lines.append(" ip_allowlist:") + for cidr in cast(list[str], ssrf["ip_allowlist"]): + lines.append(f' - "{cidr}"') + lines.append("") + lines.append("dlp:") + dlp = cast(dict[str, object], cfg["dlp"]) + lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}") + lines.append(f" scan_env: {_bool(dlp['scan_env'])}") + return "\n".join(lines) + "\n" + + # --- Proxy class ----------------------------------------------------------- @@ -125,47 +192,8 @@ class PipelockProxy(ABC): return PipelockProxyPlan(yaml_path=yaml_path, slug=slug) def _build_pipelock_yaml(self, bottle: Bottle, yaml_path: Path): - """Write the pipelock yaml config (mode 600) to `yaml_path` - for the sidecar to consume when it boots. Carries the - effective allowlist (bottle.egress.allowlist UNION - claude-bottle defaults UNION ssh hostnames), a fixed listen - port, strict mode + forward_proxy + DLP defaults + scan_env. - Deliberately contains no env values, no secrets, no per-agent - customization beyond the hostname list.""" - allowlist = pipelock_effective_allowlist(bottle) - trusted = pipelock_bottle_ssh_trusted_domains(bottle) - ip_cidrs = pipelock_bottle_ssh_ip_cidrs(bottle) - - lines: list[str] = [] - lines.append("version: 1") - lines.append("mode: strict") - lines.append("enforce: true") - lines.append("") - lines.append("# Hostnames the agent is allowed to reach. Effective list is") - lines.append("# claude-bottle defaults UNION bottle.egress.allowlist (sorted, deduped).") - lines.append("api_allowlist:") - for h in allowlist: - lines.append(f' - "{h}"') - lines.append("") - lines.append("forward_proxy:") - lines.append(" enabled: true") - lines.append("") - if trusted: - lines.append("trusted_domains:") - for td in trusted: - lines.append(f' - "{td}"') - lines.append("") - if ip_cidrs: - lines.append("ssrf:") - lines.append(" ip_allowlist:") - for cidr in ip_cidrs: - lines.append(f' - "{cidr}"') - lines.append("") - lines.append("dlp:") - lines.append(" include_defaults: true") - lines.append(" scan_env: true") - - yaml_path.write_text("\n".join(lines) + "\n") + """Write the pipelock yaml config (mode 600) to `yaml_path`.""" + yaml_path.write_text(pipelock_render_yaml(pipelock_build_config(bottle))) yaml_path.chmod(0o600) @abstractmethod diff --git a/tests/unit/test_pipelock_yaml.py b/tests/unit/test_pipelock_yaml.py index ae6a80b..2ed6aa7 100644 --- a/tests/unit/test_pipelock_yaml.py +++ b/tests/unit/test_pipelock_yaml.py @@ -1,6 +1,10 @@ -"""Unit: PipelockProxy.prepare — produces a pipelock YAML config -containing the expected top-level keys and per-bottle entries. We -don't fully parse YAML; we grep for content shape.""" +"""Unit: pipelock config building and YAML rendering. + +`pipelock_build_config` produces the structured config dict pipelock +will load; tests assert on that dict so they don't break on cosmetic +YAML changes. A small set of tests still hit the rendered output for +properties that only make sense on disk (file mode, no-secret-leakage). +""" import os import tempfile @@ -9,49 +13,66 @@ from pathlib import Path from claude_bottle.backend.docker.pipelock import DockerPipelockProxy from claude_bottle.manifest import Manifest +from claude_bottle.pipelock import pipelock_build_config, pipelock_render_yaml from tests.fixtures import fixture_minimal, fixture_with_ssh -class TestPipelockProxyPrepare(unittest.TestCase): +class TestBuildConfig(unittest.TestCase): + def test_minimal_shape(self): + cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) + self.assertEqual("strict", cfg["mode"]) + self.assertEqual(True, cfg["enforce"]) + self.assertEqual({"enabled": True}, cfg["forward_proxy"]) + self.assertEqual( + {"include_defaults": True, "scan_env": True}, cfg["dlp"] + ) + # Baked defaults always present. + self.assertIn("api.anthropic.com", cfg["api_allowlist"]) + self.assertIn("raw.githubusercontent.com", cfg["api_allowlist"]) + # No SSH entries → no trusted_domains, no ssrf. + self.assertNotIn("trusted_domains", cfg) + self.assertNotIn("ssrf", cfg) + + def test_ssh_shape(self): + cfg = pipelock_build_config(fixture_with_ssh().bottles["dev"]) + self.assertIn("github.com", cfg["trusted_domains"]) + self.assertNotIn("100.78.141.42", cfg["trusted_domains"]) + self.assertIn("100.78.141.42/32", cfg["ssrf"]["ip_allowlist"]) + # Strict mode: IPv4 host is also in the api_allowlist union. + self.assertIn("100.78.141.42", cfg["api_allowlist"]) + + +class TestRenderAndWrite(unittest.TestCase): def setUp(self): self.out_dir = Path(tempfile.mkdtemp()) - self.proxy = DockerPipelockProxy() def tearDown(self): import shutil shutil.rmtree(self.out_dir, ignore_errors=True) - def test_minimal(self): + def test_render_emits_required_top_level_keys(self): + """One render-level smoke check: the serialized YAML is plausibly + the shape pipelock expects. We don't grep every key here — that's + what TestBuildConfig is for.""" + cfg = pipelock_build_config(fixture_with_ssh().bottles["dev"]) + text = pipelock_render_yaml(cfg) + for required in ( + "api_allowlist:", + "forward_proxy:", + "trusted_domains:", + "ssrf:", + "dlp:", + ): + self.assertIn(required, text) + + def test_prepare_writes_file_at_mode_600(self): yaml_path = self.out_dir / "min.yaml" - self.proxy.prepare(fixture_minimal().bottles["dev"], "demo", yaml_path) - content = yaml_path.read_text() - self.assertIn("mode: strict", content) - self.assertIn("enforce: true", content) - self.assertIn("api_allowlist:", content) - self.assertIn("api.anthropic.com", content) - self.assertIn("raw.githubusercontent.com", content) - self.assertIn("forward_proxy:", content) - self.assertIn("enabled: true", content) - self.assertIn("dlp:", content) - self.assertIn("include_defaults: true", content) - self.assertIn("scan_env: true", content) - # No ssh entries → no trusted_domains nor ssrf block. - self.assertNotIn("trusted_domains:", content) - self.assertNotIn("ssrf:", content) + DockerPipelockProxy().prepare( + fixture_minimal().bottles["dev"], "demo", yaml_path + ) + self.assertEqual(0o600, os.stat(yaml_path).st_mode & 0o777) - def test_ssh_blocks(self): - yaml_path = self.out_dir / "ssh.yaml" - self.proxy.prepare(fixture_with_ssh().bottles["dev"], "demo", yaml_path) - content = yaml_path.read_text() - self.assertIn("trusted_domains:", content) - self.assertIn("github.com", content) - self.assertIn("ssrf:", content) - self.assertIn("ip_allowlist:", content) - self.assertIn("100.78.141.42/32", content) - # ipv4 host should also be in api_allowlist (strict mode requires both). - self.assertIn("100.78.141.42", content) - - def test_secret_hygiene(self): + def test_prepare_does_not_leak_env_names_or_values(self): manifest = Manifest.from_json_obj({ "bottles": { "dev": { @@ -65,18 +86,12 @@ class TestPipelockProxyPrepare(unittest.TestCase): "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, }) yaml_path = self.out_dir / "secret.yaml" - self.proxy.prepare(manifest.bottles["dev"], "demo", yaml_path) + DockerPipelockProxy().prepare(manifest.bottles["dev"], "demo", yaml_path) content = yaml_path.read_text() self.assertNotIn("literal-value-should-not-appear", content) self.assertNotIn("MY_SECRET", content) self.assertNotIn("prompt-message", content) - def test_file_mode_is_600(self): - yaml_path = self.out_dir / "min.yaml" - self.proxy.prepare(fixture_minimal().bottles["dev"], "demo", yaml_path) - mode = os.stat(yaml_path).st_mode & 0o777 - self.assertEqual(0o600, mode) - if __name__ == "__main__": unittest.main()