diff --git a/claude_bottle/backend/docker/backend.py b/claude_bottle/backend/docker/backend.py index 3e1d4a8..8b053ac 100644 --- a/claude_bottle/backend/docker/backend.py +++ b/claude_bottle/backend/docker/backend.py @@ -52,7 +52,6 @@ class DockerBottleBackend(BottleBackend): manifest.require_agent(spec.agent_name) agent = manifest.agents[spec.agent_name] bottle = manifest.bottle_for(spec.agent_name) - bottle_name = agent.bottle slug = docker_mod.slugify(spec.agent_name) @@ -106,7 +105,7 @@ class DockerBottleBackend(BottleBackend): env_resolve(manifest, spec.agent_name, env_file, args_file) prompt_file.write_text(agent.prompt) - allowlist_summary = pipelock.pipelock_allowlist_summary(manifest, bottle_name) + allowlist_summary = pipelock.pipelock_allowlist_summary(bottle) use_runsc = docker_mod.runsc_available() return DockerBottlePlan( diff --git a/claude_bottle/pipelock.py b/claude_bottle/pipelock.py index b19bd1a..f8454ab 100644 --- a/claude_bottle/pipelock.py +++ b/claude_bottle/pipelock.py @@ -18,7 +18,7 @@ from dataclasses import dataclass from pathlib import Path from .log import die, info, warn -from .manifest import Manifest +from .manifest import Bottle, Manifest from .util import is_ipv4_literal # Pipelock image, pinned by digest. The digest is the multi-arch image @@ -58,42 +58,42 @@ def pipelock_proxy_host_port(slug: str) -> str: # --- Allowlist resolution -------------------------------------------------- -def pipelock_bottle_allowlist(manifest: Manifest, bottle_name: str) -> list[str]: - """Hostnames in bottles[].egress.allowlist.""" - return list(manifest.bottles[bottle_name].egress.allowlist) +def pipelock_bottle_allowlist(bottle: Bottle) -> list[str]: + """Hostnames in bottle.egress.allowlist.""" + return list(bottle.egress.allowlist) -def pipelock_bottle_ssh_hostnames(manifest: Manifest, bottle_name: str) -> list[str]: - return [e.Hostname for e in manifest.bottles[bottle_name].ssh if e.Hostname] +def pipelock_bottle_ssh_hostnames(bottle: Bottle) -> list[str]: + return [e.Hostname for e in bottle.ssh if e.Hostname] -def pipelock_bottle_ssh_trusted_domains(manifest: Manifest, bottle_name: str) -> list[str]: - return [h for h in pipelock_bottle_ssh_hostnames(manifest, bottle_name) if not is_ipv4_literal(h)] +def pipelock_bottle_ssh_trusted_domains(bottle: Bottle) -> list[str]: + return [h for h in pipelock_bottle_ssh_hostnames(bottle) if not is_ipv4_literal(h)] -def pipelock_bottle_ssh_ip_cidrs(manifest: Manifest, bottle_name: str) -> list[str]: - return [f"{h}/32" for h in pipelock_bottle_ssh_hostnames(manifest, bottle_name) if is_ipv4_literal(h)] +def pipelock_bottle_ssh_ip_cidrs(bottle: Bottle) -> list[str]: + return [f"{h}/32" for h in pipelock_bottle_ssh_hostnames(bottle) if is_ipv4_literal(h)] -def pipelock_effective_allowlist(manifest: Manifest, bottle_name: str) -> list[str]: +def pipelock_effective_allowlist(bottle: Bottle) -> list[str]: """Deduplicated union of: baked-in defaults, bottle.egress.allowlist, bottle.ssh[].Hostname. Sorted for stability.""" seen: dict[str, None] = {} for h in DEFAULT_ALLOWLIST: seen.setdefault(h, None) - for h in pipelock_bottle_allowlist(manifest, bottle_name): + for h in pipelock_bottle_allowlist(bottle): if h: seen.setdefault(h, None) - for h in pipelock_bottle_ssh_hostnames(manifest, bottle_name): + for h in pipelock_bottle_ssh_hostnames(bottle): if h: seen.setdefault(h, None) return sorted(seen.keys()) -def pipelock_allowlist_summary(manifest: Manifest, bottle_name: str) -> str: +def pipelock_allowlist_summary(bottle: Bottle) -> str: """One-line summary for the y/N preflight display: " hosts allowed (host1, host2, host3, +M more)".""" - hosts = pipelock_effective_allowlist(manifest, bottle_name) + hosts = pipelock_effective_allowlist(bottle) count = len(hosts) if count == 0: return "0 hosts allowed (none)" @@ -142,9 +142,15 @@ class PipelockProxy: port, strict mode + forward_proxy + DLP defaults + scan_env. Deliberately contains no env values, no secrets, no per-agent customization beyond the hostname list.""" - allowlist = pipelock_effective_allowlist(manifest, bottle_name) - trusted = pipelock_bottle_ssh_trusted_domains(manifest, bottle_name) - ip_cidrs = pipelock_bottle_ssh_ip_cidrs(manifest, bottle_name) + return self._build_pipelock_yaml(manifest, bottle_name, slug, yaml_path) + + def _build_pipelock_yaml( + self, manifest: Manifest, bottle_name: str, slug: str, yaml_path: Path + ) -> PipelockProxyPlan: + bottle = manifest.bottles[bottle_name] + allowlist = pipelock_effective_allowlist(bottle) + trusted = pipelock_bottle_ssh_trusted_domains(bottle) + ip_cidrs = pipelock_bottle_ssh_ip_cidrs(bottle) lines: list[str] = [] lines.append("version: 1") diff --git a/tests/test_pipelock_allowlist.py b/tests/test_pipelock_allowlist.py index 887cfcf..90847e3 100644 --- a/tests/test_pipelock_allowlist.py +++ b/tests/test_pipelock_allowlist.py @@ -18,13 +18,13 @@ from tests.fixtures import fixture_minimal, fixture_with_egress, fixture_with_ss class TestBottleAllowlist(unittest.TestCase): def test_egress_allowlist_present(self): - out = pipelock_bottle_allowlist(fixture_with_egress(), "dev") + out = pipelock_bottle_allowlist(fixture_with_egress().bottles["dev"]) self.assertIn("github.com", out) self.assertIn("gitlab.com", out) self.assertIn("registry.npmjs.org", out) def test_empty_when_no_egress_block(self): - out = pipelock_bottle_allowlist(fixture_minimal(), "dev") + out = pipelock_bottle_allowlist(fixture_minimal().bottles["dev"]) self.assertEqual([], out) def test_rejects_non_string_entry(self): @@ -38,17 +38,17 @@ class TestBottleAllowlist(unittest.TestCase): class TestSSHHostnames(unittest.TestCase): def test_hostnames_include_both(self): - hosts = pipelock_bottle_ssh_hostnames(fixture_with_ssh(), "dev") + hosts = pipelock_bottle_ssh_hostnames(fixture_with_ssh().bottles["dev"]) self.assertIn("100.78.141.42", hosts) self.assertIn("github.com", hosts) def test_ip_cidrs_only_ipv4(self): - cidrs = pipelock_bottle_ssh_ip_cidrs(fixture_with_ssh(), "dev") + cidrs = pipelock_bottle_ssh_ip_cidrs(fixture_with_ssh().bottles["dev"]) self.assertIn("100.78.141.42/32", cidrs) self.assertNotIn("github.com", cidrs) def test_trusted_domains_only_hostnames(self): - trusted = pipelock_bottle_ssh_trusted_domains(fixture_with_ssh(), "dev") + trusted = pipelock_bottle_ssh_trusted_domains(fixture_with_ssh().bottles["dev"]) self.assertIn("github.com", trusted) self.assertNotIn("100.78.141.42", trusted) @@ -69,7 +69,7 @@ class TestEffectiveAllowlist(unittest.TestCase): }, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, }) - eff = pipelock_effective_allowlist(manifest, "dev") + eff = pipelock_effective_allowlist(manifest.bottles["dev"]) self.assertIn("api.anthropic.com", eff) self.assertIn("registry.npmjs.org", eff) self.assertIn("100.78.141.42", eff)