diff --git a/claude_bottle/backend/docker/backend.py b/claude_bottle/backend/docker/backend.py index f66972a..6d2f5c4 100644 --- a/claude_bottle/backend/docker/backend.py +++ b/claude_bottle/backend/docker/backend.py @@ -102,7 +102,7 @@ class DockerBottleBackend(BottleBackend): prompt_file.write_text("") prompt_file.chmod(0o600) - pipelock.pipelock_write_yaml(manifest, bottle_name, pipelock_yaml) + self.prepare_proxy(spec, pipelock_yaml) env_resolve(manifest, spec.agent_name, env_file, args_file) prompt_file.write_text(agent.prompt) @@ -127,6 +127,51 @@ class DockerBottleBackend(BottleBackend): use_runsc=use_runsc, ) + def prepare_proxy(self, spec: BottleSpec, yaml_path: Path) -> None: + """Write the pipelock proxy's yaml config (mode 600) to + `yaml_path` for the sidecar to consume when it boots in + `launch`. Carries the effective allowlist (bottle.egress.allowlist + UNION claude-bottle defaults UNION ssh hostnames), a fixed + listen port, strict mode + forward_proxy + DLP defaults + + scan_env. Deliberately contains no env values, no secrets, no + per-agent customization beyond the hostname list.""" + bottle_name = spec.manifest.agents[spec.agent_name].bottle + allowlist = pipelock.pipelock_effective_allowlist(spec.manifest, bottle_name) + trusted = pipelock.pipelock_bottle_ssh_trusted_domains(spec.manifest, bottle_name) + ip_cidrs = pipelock.pipelock_bottle_ssh_ip_cidrs(spec.manifest, bottle_name) + + lines: list[str] = [] + lines.append("version: 1") + lines.append("mode: strict") + lines.append("enforce: true") + lines.append("") + lines.append("# Hostnames the agent is allowed to reach. Effective list is") + lines.append("# claude-bottle defaults UNION bottle.egress.allowlist (sorted, deduped).") + lines.append("api_allowlist:") + for h in allowlist: + lines.append(f' - "{h}"') + lines.append("") + lines.append("forward_proxy:") + lines.append(" enabled: true") + lines.append("") + if trusted: + lines.append("trusted_domains:") + for td in trusted: + lines.append(f' - "{td}"') + lines.append("") + if ip_cidrs: + lines.append("ssrf:") + lines.append(" ip_allowlist:") + for cidr in ip_cidrs: + lines.append(f' - "{cidr}"') + lines.append("") + lines.append("dlp:") + lines.append(" include_defaults: true") + lines.append(" scan_env: true") + + yaml_path.write_text("\n".join(lines) + "\n") + yaml_path.chmod(0o600) + @contextmanager def launch(self, plan: BottlePlan) -> Iterator[DockerBottle]: """Build, launch, and provision a Docker bottle. Teardown on exit.""" diff --git a/claude_bottle/pipelock.py b/claude_bottle/pipelock.py index e6f2bb9..ba94416 100644 --- a/claude_bottle/pipelock.py +++ b/claude_bottle/pipelock.py @@ -119,53 +119,6 @@ def pipelock_allowlist_summary(manifest: Manifest, bottle_name: str) -> str: return f"{count} hosts allowed ({joined})" -# --- YAML generation ------------------------------------------------------- - - -def pipelock_write_yaml(manifest: Manifest, bottle_name: str, out_path: Path) -> None: - """Write a pipelock YAML config (mode 600) carrying: - - the effective allowlist (hostnames), - - a fixed listen port, - - strict mode + forward_proxy.enabled + DLP defaults + scan_env. - - Deliberately contains no env values, no secrets, no per-agent - customization beyond the hostname list.""" - allowlist = pipelock_effective_allowlist(manifest, bottle_name) - trusted = pipelock_bottle_ssh_trusted_domains(manifest, bottle_name) - ip_cidrs = pipelock_bottle_ssh_ip_cidrs(manifest, bottle_name) - - lines: list[str] = [] - lines.append("version: 1") - lines.append("mode: strict") - lines.append("enforce: true") - lines.append("") - lines.append("# Hostnames the agent is allowed to reach. Effective list is") - lines.append("# claude-bottle defaults UNION bottle.egress.allowlist (sorted, deduped).") - lines.append("api_allowlist:") - for h in allowlist: - lines.append(f' - "{h}"') - lines.append("") - lines.append("forward_proxy:") - lines.append(" enabled: true") - lines.append("") - if trusted: - lines.append("trusted_domains:") - for td in trusted: - lines.append(f' - "{td}"') - lines.append("") - if ip_cidrs: - lines.append("ssrf:") - lines.append(" ip_allowlist:") - for cidr in ip_cidrs: - lines.append(f' - "{cidr}"') - lines.append("") - lines.append("dlp:") - lines.append(" include_defaults: true") - lines.append(" scan_env: true") - - out_path.write_text("\n".join(lines) + "\n") - out_path.chmod(0o600) - # --- Sidecar lifecycle ----------------------------------------------------- @@ -188,7 +141,7 @@ def pipelock_start( name = pipelock_container_name(slug) host_yaml = yaml_dir / yaml_filename if not host_yaml.is_file(): - die(f"pipelock yaml not found at {host_yaml}; pipelock_write_yaml must run first") + die(f"pipelock yaml not found at {host_yaml}; backend.prepare_proxy must run first") info(f"starting pipelock sidecar {name} on network {internal_network}") diff --git a/tests/test_pipelock_sidecar_smoke.py b/tests/test_pipelock_sidecar_smoke.py index 8fe5101..873e1c2 100644 --- a/tests/test_pipelock_sidecar_smoke.py +++ b/tests/test_pipelock_sidecar_smoke.py @@ -12,7 +12,9 @@ import unittest import urllib.request from pathlib import Path -from claude_bottle.pipelock import PIPELOCK_IMAGE, pipelock_write_yaml +from claude_bottle.backend import BottleSpec +from claude_bottle.backend.docker import DockerBottleBackend +from claude_bottle.pipelock import PIPELOCK_IMAGE from tests._docker import skip_unless_docker from tests.fixtures import fixture_minimal @@ -38,7 +40,14 @@ class TestPipelockSidecarSmoke(unittest.TestCase): ) def test_smoke(self): yaml_path = self.work_dir / "pipelock.yaml" - pipelock_write_yaml(fixture_minimal(), "dev", yaml_path) + spec = BottleSpec( + manifest=fixture_minimal(), + agent_name="demo", + copy_cwd=False, + user_cwd="/tmp", + forward_oauth_token=False, + ) + DockerBottleBackend().prepare_proxy(spec, yaml_path) create = subprocess.run( [ diff --git a/tests/test_pipelock_yaml.py b/tests/test_pipelock_yaml.py index 9602458..3647921 100644 --- a/tests/test_pipelock_yaml.py +++ b/tests/test_pipelock_yaml.py @@ -1,20 +1,34 @@ -"""Unit: pipelock_write_yaml — produces a YAML config containing the -expected top-level keys and per-bottle entries. We don't fully parse -YAML; we grep for content shape.""" +"""Unit: DockerBottleBackend.prepare_proxy — produces a pipelock YAML +config containing the expected top-level keys and per-bottle entries. +We don't fully parse YAML; we grep for content shape.""" import os import tempfile import unittest from pathlib import Path +from claude_bottle.backend import BottleSpec +from claude_bottle.backend.docker import DockerBottleBackend from claude_bottle.manifest import Manifest -from claude_bottle.pipelock import pipelock_write_yaml from tests.fixtures import fixture_minimal, fixture_with_ssh -class TestPipelockYaml(unittest.TestCase): +def _spec(manifest: Manifest) -> BottleSpec: + """Construct a minimal BottleSpec around a fixture manifest. The + fixtures all define an agent named 'demo' on a bottle named 'dev'.""" + return BottleSpec( + manifest=manifest, + agent_name="demo", + copy_cwd=False, + user_cwd="/tmp", + forward_oauth_token=False, + ) + + +class TestPrepareProxyYaml(unittest.TestCase): def setUp(self): self.out_dir = Path(tempfile.mkdtemp()) + self.backend = DockerBottleBackend() def tearDown(self): import shutil @@ -22,7 +36,7 @@ class TestPipelockYaml(unittest.TestCase): def test_minimal(self): yaml_path = self.out_dir / "min.yaml" - pipelock_write_yaml(fixture_minimal(), "dev", yaml_path) + self.backend.prepare_proxy(_spec(fixture_minimal()), yaml_path) content = yaml_path.read_text() self.assertIn("mode: strict", content) self.assertIn("enforce: true", content) @@ -40,7 +54,7 @@ class TestPipelockYaml(unittest.TestCase): def test_ssh_blocks(self): yaml_path = self.out_dir / "ssh.yaml" - pipelock_write_yaml(fixture_with_ssh(), "dev", yaml_path) + self.backend.prepare_proxy(_spec(fixture_with_ssh()), yaml_path) content = yaml_path.read_text() self.assertIn("trusted_domains:", content) self.assertIn("github.com", content) @@ -64,7 +78,7 @@ class TestPipelockYaml(unittest.TestCase): "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, }) yaml_path = self.out_dir / "secret.yaml" - pipelock_write_yaml(manifest, "dev", yaml_path) + self.backend.prepare_proxy(_spec(manifest), yaml_path) content = yaml_path.read_text() self.assertNotIn("literal-value-should-not-appear", content) self.assertNotIn("MY_SECRET", content) @@ -72,7 +86,7 @@ class TestPipelockYaml(unittest.TestCase): def test_file_mode_is_600(self): yaml_path = self.out_dir / "min.yaml" - pipelock_write_yaml(fixture_minimal(), "dev", yaml_path) + self.backend.prepare_proxy(_spec(fixture_minimal()), yaml_path) mode = os.stat(yaml_path).st_mode & 0o777 self.assertEqual(0o600, mode)