From 6130ea385fa31293890fbc9400b8a3f43c7d8f38 Mon Sep 17 00:00:00 2001 From: didericis Date: Tue, 12 May 2026 16:08:26 -0400 Subject: [PATCH] refactor(pipelock): drop bottle.ssh carve-outs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PRD 0007: SSH traffic now flows through the per-agent ssh-gate sidecar, so pipelock should know nothing about bottle.ssh. Removed: - pipelock_bottle_ssh_hostnames, _trusted_domains, _ip_cidrs. - The trusted_domains / ssrf blocks built from ssh entries. - pipelock_proxy_host_port — its last caller (the ssh provisioner) is gone. - is_ipv4_literal — only used to classify ssh hostnames into trusted_domains vs ssrf.ip_allowlist, both of which are gone. api_allowlist now derives solely from baked-in defaults + bottle.egress.allowlist. Tests updated to pin the new shape and assert ssh hostnames do NOT leak into pipelock's config. --- claude_bottle/backend/docker/pipelock.py | 4 --- claude_bottle/pipelock.py | 40 +++--------------------- claude_bottle/util.py | 13 -------- tests/unit/test_pipelock_allowlist.py | 34 ++++++-------------- tests/unit/test_pipelock_classify.py | 33 ------------------- tests/unit/test_pipelock_yaml.py | 27 +++++++++------- 6 files changed, 29 insertions(+), 122 deletions(-) delete mode 100644 tests/unit/test_pipelock_classify.py diff --git a/claude_bottle/backend/docker/pipelock.py b/claude_bottle/backend/docker/pipelock.py index 73c431d..7359da4 100644 --- a/claude_bottle/backend/docker/pipelock.py +++ b/claude_bottle/backend/docker/pipelock.py @@ -37,10 +37,6 @@ def pipelock_proxy_url(slug: str) -> str: return f"http://{pipelock_container_name(slug)}:{PIPELOCK_PORT}" -def pipelock_proxy_host_port(slug: str) -> str: - return f"{pipelock_container_name(slug)}:{PIPELOCK_PORT}" - - def pipelock_tls_init(stage_dir: Path) -> tuple[Path, Path]: """Generate a fresh per-bottle CA via a one-shot pipelock container. diff --git a/claude_bottle/pipelock.py b/claude_bottle/pipelock.py index e9238c7..0a4edfc 100644 --- a/claude_bottle/pipelock.py +++ b/claude_bottle/pipelock.py @@ -18,7 +18,6 @@ from pathlib import Path from typing import cast from .manifest import Bottle -from .util import is_ipv4_literal # Baked-in default allowlist for hosts Claude Code itself needs. DEFAULT_ALLOWLIST: tuple[str, ...] = ( @@ -40,30 +39,17 @@ def pipelock_bottle_allowlist(bottle: Bottle) -> list[str]: return list(bottle.egress.allowlist) -def pipelock_bottle_ssh_hostnames(bottle: Bottle) -> list[str]: - return [e.Hostname for e in bottle.ssh if e.Hostname] - - -def pipelock_bottle_ssh_trusted_domains(bottle: Bottle) -> list[str]: - return [h for h in pipelock_bottle_ssh_hostnames(bottle) if not is_ipv4_literal(h)] - - -def pipelock_bottle_ssh_ip_cidrs(bottle: Bottle) -> list[str]: - return [f"{h}/32" for h in pipelock_bottle_ssh_hostnames(bottle) if is_ipv4_literal(h)] - - def pipelock_effective_allowlist(bottle: Bottle) -> list[str]: - """Deduplicated union of: baked-in defaults, bottle.egress.allowlist, - bottle.ssh[].Hostname. Sorted for stability.""" + """Deduplicated union of: baked-in defaults, bottle.egress.allowlist. + Sorted for stability. Per PRD 0007, bottle.ssh entries do NOT + contribute here — SSH traffic flows through the per-agent ssh-gate + sidecar, not pipelock.""" seen: dict[str, None] = {} for h in DEFAULT_ALLOWLIST: seen.setdefault(h, None) for h in pipelock_bottle_allowlist(bottle): if h: seen.setdefault(h, None) - for h in pipelock_bottle_ssh_hostnames(bottle): - if h: - seen.setdefault(h, None) return sorted(seen.keys()) @@ -116,12 +102,6 @@ def pipelock_build_config( "api_allowlist": pipelock_effective_allowlist(bottle), "forward_proxy": {"enabled": True}, } - trusted = pipelock_bottle_ssh_trusted_domains(bottle) - if trusted: - cfg["trusted_domains"] = trusted - ip_cidrs = pipelock_bottle_ssh_ip_cidrs(bottle) - if ip_cidrs: - cfg["ssrf"] = {"ip_allowlist": ip_cidrs} cfg["dlp"] = {"include_defaults": True, "scan_env": True} # Body-scan enforcement is a separate pipelock section (each DLP # "surface" — body, MCP, response — has its own action). Pipelock's @@ -163,18 +143,6 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str: fp = cast(dict[str, object], cfg["forward_proxy"]) lines.append(f" enabled: {_bool(fp['enabled'])}") lines.append("") - if "trusted_domains" in cfg: - lines.append("trusted_domains:") - for td in cast(list[str], cfg["trusted_domains"]): - lines.append(f' - "{td}"') - lines.append("") - if "ssrf" in cfg: - lines.append("ssrf:") - ssrf = cast(dict[str, object], cfg["ssrf"]) - lines.append(" ip_allowlist:") - for cidr in cast(list[str], ssrf["ip_allowlist"]): - lines.append(f' - "{cidr}"') - lines.append("") lines.append("dlp:") dlp = cast(dict[str, object], cfg["dlp"]) lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}") diff --git a/claude_bottle/util.py b/claude_bottle/util.py index c8108f3..adbd2a2 100644 --- a/claude_bottle/util.py +++ b/claude_bottle/util.py @@ -6,7 +6,6 @@ level deeper, under their backend package.""" from __future__ import annotations import os -import re def expand_tilde(path: str) -> str: @@ -17,15 +16,3 @@ def expand_tilde(path: str) -> str: home = os.environ.get("HOME", "") return home + path[1:] return path - - -_IPV4_RE = re.compile(r"^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$") - - -def is_ipv4_literal(s: str) -> bool: - """True iff `s` looks like a dotted-quad IPv4 literal. Does not - validate octet ranges; consumers that care about that should run - a stricter check. Empty input returns False.""" - if not s: - return False - return bool(_IPV4_RE.match(s)) diff --git a/tests/unit/test_pipelock_allowlist.py b/tests/unit/test_pipelock_allowlist.py index 501f364..3cd6626 100644 --- a/tests/unit/test_pipelock_allowlist.py +++ b/tests/unit/test_pipelock_allowlist.py @@ -1,20 +1,12 @@ -"""Unit: pipelock_effective_allowlist — the union of baked-in defaults, -bottle.egress.allowlist, and bottle.ssh[].Hostname. Plus a small check -that IPv4 hostnames pick up the /32 suffix when classified as CIDRs. - -The lower-level one-line helpers (pipelock_bottle_allowlist, -pipelock_bottle_ssh_hostnames, pipelock_bottle_ssh_trusted_domains) -are exercised end-to-end by test_union_and_dedup, so they don't get -their own tests.""" +"""Unit: pipelock_effective_allowlist — the union of baked-in defaults +and bottle.egress.allowlist. Per PRD 0007, bottle.ssh entries do NOT +contribute (SSH traffic goes through the per-agent ssh-gate, not +pipelock).""" import unittest from claude_bottle.manifest import Manifest -from claude_bottle.pipelock import ( - pipelock_bottle_ssh_ip_cidrs, - pipelock_effective_allowlist, -) -from tests.fixtures import fixture_with_ssh +from claude_bottle.pipelock import pipelock_effective_allowlist class TestEffectiveAllowlist(unittest.TestCase): @@ -36,20 +28,14 @@ class TestEffectiveAllowlist(unittest.TestCase): eff = pipelock_effective_allowlist(manifest.bottles["dev"]) self.assertIn("api.anthropic.com", eff, "baked default present") self.assertIn("registry.npmjs.org", eff, "egress.allowlist present") - self.assertIn("100.78.141.42", eff, "ssh ipv4 hostname present") - self.assertIn("github.com", eff, "ssh hostname present") + # PRD 0007: ssh hostnames must not contribute to pipelock's + # allowlist anymore — they're routed through the ssh-gate + # sidecar, which is on its own egress path. + self.assertNotIn("100.78.141.42", eff) + self.assertNotIn("github.com", eff) self.assertEqual(len(eff), len(set(eff)), "deduplicated") self.assertEqual(eff, sorted(eff), "sorted") -class TestSSHIPCidrs(unittest.TestCase): - def test_ipv4_hostname_gets_32_suffix(self): - cidrs = pipelock_bottle_ssh_ip_cidrs(fixture_with_ssh().bottles["dev"]) - self.assertIn("100.78.141.42/32", cidrs) - # Hostname-typed entries don't end up here. - self.assertNotIn("github.com", cidrs) - self.assertNotIn("github.com/32", cidrs) - - if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_pipelock_classify.py b/tests/unit/test_pipelock_classify.py deleted file mode 100644 index 3c08ddf..0000000 --- a/tests/unit/test_pipelock_classify.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Unit: is_ipv4_literal — the classifier that decides whether -bottle.ssh[].Hostname goes into pipelock's ssrf.ip_allowlist (IPv4 -literal) or trusted_domains (hostname).""" - -import unittest - -from claude_bottle.util import is_ipv4_literal - - -class TestIPv4Classify(unittest.TestCase): - def test_positive(self): - for ip in ("127.0.0.1", "10.0.0.5", "100.78.141.42", "0.0.0.0", "255.255.255.255"): - with self.subTest(ip=ip): - self.assertTrue(is_ipv4_literal(ip), ip) - - def test_negative(self): - for hn in ( - "github.com", - "gitea.dideric.is", - "100.78.141", - "100.78.141.42.5", - "::1", - "fe80::1", - "localhost", - "", - "1.2.3.4.example.com", - ): - with self.subTest(hn=hn): - self.assertFalse(is_ipv4_literal(hn), hn) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/test_pipelock_yaml.py b/tests/unit/test_pipelock_yaml.py index f039752..b4fd9ea 100644 --- a/tests/unit/test_pipelock_yaml.py +++ b/tests/unit/test_pipelock_yaml.py @@ -34,23 +34,25 @@ class TestBuildConfig(unittest.TestCase): # Baked defaults always present. self.assertIn("api.anthropic.com", cast(list[str], cfg["api_allowlist"])) self.assertIn("raw.githubusercontent.com", cast(list[str], cfg["api_allowlist"])) - # No SSH entries → no trusted_domains, no ssrf. + # PRD 0007: pipelock has no SSH carve-outs at all — neither + # trusted_domains nor ssrf are ever emitted from bottle data + # in v1. self.assertNotIn("trusted_domains", cfg) self.assertNotIn("ssrf", cfg) # Without CA paths, the tls_interception block is omitted — # pipelock falls back to its built-in default of `enabled: false`. self.assertNotIn("tls_interception", cfg) - def test_ssh_shape(self): + def test_ssh_entries_do_not_leak_into_pipelock(self): + # PRD 0007: bottle.ssh routes through the ssh-gate sidecar, + # so pipelock's config must not reflect those hostnames or + # IPs in any of its blocks. cfg = pipelock_build_config(fixture_with_ssh().bottles["dev"]) - self.assertIn("github.com", cast(list[str], cfg["trusted_domains"])) - self.assertNotIn("100.78.141.42", cast(list[str], cfg["trusted_domains"])) - self.assertIn( - "100.78.141.42/32", - cast(dict[str, Any], cfg["ssrf"])["ip_allowlist"], - ) - # Strict mode: IPv4 host is also in the api_allowlist union. - self.assertIn("100.78.141.42", cast(list[str], cfg["api_allowlist"])) + allow = cast(list[str], cfg["api_allowlist"]) + self.assertNotIn("github.com", allow) + self.assertNotIn("100.78.141.42", allow) + self.assertNotIn("trusted_domains", cfg) + self.assertNotIn("ssrf", cfg) def test_tls_interception_block_emitted_when_paths_supplied(self): # PRD 0006: paths flow in via DockerPipelockProxy's in-container @@ -95,12 +97,13 @@ class TestRenderAndWrite(unittest.TestCase): for required in ( "api_allowlist:", "forward_proxy:", - "trusted_domains:", - "ssrf:", "dlp:", "request_body_scanning:", ): self.assertIn(required, text) + # PRD 0007: no ssh carve-outs in the rendered yaml. + self.assertNotIn("trusted_domains:", text) + self.assertNotIn("ssrf:", text) def test_prepare_writes_file_at_mode_600(self): plan = DockerPipelockProxy().prepare(