feat(pipelock): allow route tls passthrough policy
test / unit (pull_request) Successful in 37s
test / integration (pull_request) Successful in 58s

This commit is contained in:
2026-05-28 19:19:40 -04:00
parent 3299674c30
commit bcadc07d09
11 changed files with 164 additions and 78 deletions
@@ -12,12 +12,11 @@ pipelock's per-bottle CA so curl trusts pipelock's bumped leaf, and
pipelock sees the decrypted body and returns its known
`blocked: request body contains secret: <pattern>` 403.
The host has to be allowlisted (so the CONNECT is accepted) but NOT
in `tls_interception.passthrough_domains` (so the body actually gets
scanned). `api.anthropic.com` is passthrough'd to skip MITM on the
LLM endpoint, so this probe targets `raw.githubusercontent.com` —
also on the baked allowlist (Claude Code fetches release assets from
it) and intercepted+scanned like any non-passthrough host."""
The host has to be allowlisted (so the CONNECT is accepted) but must
not opt into `pipelock.tls_passthrough` (so the body actually gets
scanned). This probe targets `raw.githubusercontent.com`, which is on
the baked allowlist and intercepted+scanned like any non-passthrough
host."""
from __future__ import annotations
@@ -1,17 +1,14 @@
"""Integration: pipelock's `tls_interception.passthrough_domains`
exempts api.anthropic.com from MITM, so request bodies that would
otherwise trip the body-scan layer (notably the BIP-39 seed-phrase
detector firing on user-authored Claude conversation text) are not
inspected and the request reaches Anthropic's TLS endpoint.
"""Integration: route-owned `pipelock.tls_passthrough` renders into
pipelock's `tls_interception.passthrough_domains`, so request bodies
that would otherwise trip the body-scan layer are not inspected and the
request reaches the provider TLS endpoint.
Probe: POST the canonical zero-entropy 12-word BIP-39 mnemonic
(`abandon` × 11 + `about`) — checksum-valid by construction — to
`https://api.anthropic.com/v1/messages`. Without the passthrough,
pipelock returns a 403 `blocked: request body contains secret:
BIP-39 Seed Phrase`. With it, pipelock relays the CONNECT opaquely
and the upstream replies with whatever it likes (401/4xx from
Anthropic for an unauthenticated junk POST). We assert that the
verdict is NOT pipelock's block.
`https://api.anthropic.com/v1/messages`. With the route policy,
pipelock relays the CONNECT opaquely and the upstream replies with
whatever it likes (401/4xx from Anthropic for an unauthenticated junk
POST). We assert that the verdict is NOT pipelock's block.
"""
from __future__ import annotations
@@ -46,7 +43,13 @@ class TestPipelockLlmPassthrough(unittest.TestCase):
def test_bip39_body_to_anthropic_is_not_blocked(self):
manifest = Manifest.from_json_obj({
"bottles": {
"dev": {"env": {"SEED": _BIP39_PHRASE}},
"dev": {
"env": {"SEED": _BIP39_PHRASE},
"egress": {"routes": [{
"host": "api.anthropic.com",
"pipelock": {"tls_passthrough": True},
}]},
},
},
"agents": {
"demo": {"skills": [], "prompt": "", "bottle": "dev"},
+4 -9
View File
@@ -310,15 +310,10 @@ class TestSandboxEscape(unittest.TestCase):
remediation lands as its own PRD before this test merges.
DON'T mark expectedFailure to silence it.
Destination note: we use `raw.githubusercontent.com` (one
of the DEFAULT_ALLOWLIST hosts) rather than
api.anthropic.com because pipelock passthrough's the
Anthropic API endpoint specifically — its DLP scanners
false-positive on real LLM conversation bodies (BIP-39
seed phrases, etc.). That trade-off is documented in
`pipelock.DEFAULT_TLS_PASSTHROUGH`. For non-passthrough
hosts pipelock MITMs and the DLP scan applies, which is
what this attack exercises."""
Destination note: we use `raw.githubusercontent.com`, one
of the DEFAULT_ALLOWLIST hosts. It is not route-configured
for pipelock TLS passthrough, so pipelock MITMs it and the
DLP scan applies, which is what this attack exercises."""
# Capture HTTP code via curl's -w; don't use --fail so
# we get the response body even on 4xx.
url_base = "https://raw.githubusercontent.com"
+28
View File
@@ -215,6 +215,34 @@ class TestRole(unittest.TestCase):
}])
class TestPipelockPolicy(unittest.TestCase):
def test_tls_passthrough_route_policy(self):
b = _bottle([{
"host": "api.openai.com",
"pipelock": {"tls_passthrough": True},
}])
self.assertTrue(b.egress.routes[0].Pipelock.TlsPassthrough)
def test_tls_passthrough_defaults_false(self):
b = _bottle([{"host": "api.openai.com"}])
self.assertFalse(b.egress.routes[0].Pipelock.TlsPassthrough)
def test_pipelock_policy_must_be_object(self):
with self.assertRaises(Die):
_bottle([{"host": "x.example", "pipelock": True}])
def test_tls_passthrough_must_be_bool(self):
with self.assertRaises(Die):
_bottle([{
"host": "x.example",
"pipelock": {"tls_passthrough": "yes"},
}])
def test_unknown_pipelock_key_rejected(self):
with self.assertRaises(Die):
_bottle([{"host": "x.example", "pipelock": {"wat": True}}])
class TestRouteValidation(unittest.TestCase):
def test_duplicate_hosts_rejected(self):
# Routes match by exact host; duplicates leave the choice
+14 -4
View File
@@ -89,18 +89,28 @@ class TestAllowlistWithRoutes(unittest.TestCase):
class TestTlsPassthrough(unittest.TestCase):
def test_default_includes_api_anthropic(self):
def test_default_empty(self):
passthrough = pipelock_effective_tls_passthrough(_bottle({}))
self.assertEqual(["api.anthropic.com"], passthrough)
self.assertEqual([], passthrough)
def test_route_hosts_NOT_added_to_passthrough(self):
def test_route_hosts_not_added_to_passthrough_by_default(self):
passthrough = pipelock_effective_tls_passthrough(_bottle(_routes([
{"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "G"}},
{"host": "registry.npmjs.org",
"auth": {"scheme": "Bearer", "token_ref": "N"}},
])))
self.assertEqual(["api.anthropic.com"], passthrough)
self.assertEqual([], passthrough)
def test_route_policy_adds_tls_passthrough(self):
passthrough = pipelock_effective_tls_passthrough(_bottle(_routes([
{"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": "O"},
"pipelock": {"tls_passthrough": True}},
{"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "G"}},
])))
self.assertEqual(["api.openai.com"], passthrough)
if __name__ == "__main__":
+30 -12
View File
@@ -54,11 +54,7 @@ class TestBuildConfig(unittest.TestCase):
def test_tls_interception_block_emitted_when_paths_supplied(self):
# PRD 0006: paths flow in via the platform-neutral in-container
# constants; this directly pins the dict shape. passthrough_domains
# is baked in so LLM provider endpoints (api.anthropic.com) skip
# MITM — pipelock's docs explicitly recommend this for LLM hosts,
# and without it the BIP-39 body scanner false-positives on
# Claude conversation traffic.
# constants; this directly pins the dict shape.
cfg = pipelock_build_config(
fixture_minimal().bottles["dev"],
ca_cert_path="/etc/pipelock-ca.pem",
@@ -69,11 +65,28 @@ class TestBuildConfig(unittest.TestCase):
"enabled": True,
"ca_cert": "/etc/pipelock-ca.pem",
"ca_key": "/etc/pipelock-ca-key.pem",
"passthrough_domains": list(DEFAULT_TLS_PASSTHROUGH),
"passthrough_domains": [],
},
cfg["tls_interception"],
)
self.assertIn("api.anthropic.com", DEFAULT_TLS_PASSTHROUGH)
self.assertEqual((), DEFAULT_TLS_PASSTHROUGH)
def test_tls_passthrough_route_policy_emits_domain(self):
bottle = Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"routes": [
{"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": "T"},
"pipelock": {"tls_passthrough": True}},
]}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
cfg = pipelock_build_config(
bottle,
ca_cert_path="/etc/pipelock-ca.pem",
ca_key_path="/etc/pipelock-ca-key.pem",
)
tls = cast(dict[str, object], cfg["tls_interception"])
self.assertEqual(["api.openai.com"], tls["passthrough_domains"])
def test_tls_interception_requires_both_paths(self):
# Half-set is a programmer error, not a silent omission.
@@ -179,19 +192,24 @@ class TestRenderAndWrite(unittest.TestCase):
"""`PipelockProxy.prepare` plumbs the module-level in-container
CA constants through to the YAML. The block should land in the
rendered output with `enabled: true`, the configured paths,
and the baked LLM-provider passthrough list. The actual
and any route-owned passthrough domains. The actual
host-side CA generation happens in launch (not prepare), so
this test exercises only the YAML rendering."""
plan = PipelockProxy().prepare(
fixture_minimal().bottles["dev"], "demo", self.out_dir
)
bottle = Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"routes": [
{"host": "api.openai.com",
"pipelock": {"tls_passthrough": True}},
]}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
plan = PipelockProxy().prepare(bottle, "demo", self.out_dir)
content = plan.yaml_path.read_text()
self.assertIn("tls_interception:", content)
self.assertIn("enabled: true", content)
self.assertIn('ca_cert: "/etc/pipelock-ca.pem"', content)
self.assertIn('ca_key: "/etc/pipelock-ca-key.pem"', content)
self.assertIn("passthrough_domains:", content)
self.assertIn('- "api.anthropic.com"', content)
self.assertIn('- "api.openai.com"', content)
def test_render_emits_ssrf_block_when_allowlist_given(self):
cfg = pipelock_build_config(