feat(pipelock): auto-allowlist cred-proxy upstream hosts (PRD 0010)

bottle.tokens declarations contribute their upstream hosts to both
pipelock's allowlist (so cred-proxy can reach them) and
passthrough_domains (so pipelock doesn't MITM the connection —
cred-proxy validates real upstream certs with the system CA bundle).

Mapping: anthropic -> api.anthropic.com (already on defaults);
github -> api.github.com + github.com; gitea -> the entry's host;
npm -> registry.npmjs.org.
This commit is contained in:
2026-05-13 16:22:44 -04:00
parent 8334f51268
commit 051896ba4c
2 changed files with 140 additions and 20 deletions
+45 -2
View File
@@ -55,8 +55,35 @@ def pipelock_bottle_allowlist(bottle: Bottle) -> list[str]:
return list(bottle.egress.allowlist)
def pipelock_token_hosts(bottle: Bottle) -> list[str]:
"""Hostnames the cred-proxy sidecar (PRD 0010) talks to upstream
on the agent's behalf. Derived from `bottle.tokens[]`. Returned
sorted+deduped.
These hosts must be on pipelock's allowlist so cred-proxy's
outbound HTTPS traffic can leave the egress network, and on
pipelock's TLS-passthrough list so pipelock does not MITM them —
cred-proxy validates real upstream certs with the system CA store,
so a pipelock-bumped cert would fail trust."""
hosts: set[str] = set()
for t in bottle.tokens:
if t.Kind == "github":
hosts.add("api.github.com")
hosts.add("github.com")
elif t.Kind == "gitea":
if t.UpstreamHost:
hosts.add(t.UpstreamHost)
elif t.Kind == "npm":
hosts.add("registry.npmjs.org")
elif t.Kind == "anthropic":
# Already on DEFAULT_ALLOWLIST + DEFAULT_TLS_PASSTHROUGH.
hosts.add("api.anthropic.com")
return sorted(hosts)
def pipelock_effective_allowlist(bottle: Bottle) -> list[str]:
"""Deduplicated union of: baked-in defaults, bottle.egress.allowlist.
"""Deduplicated union of: baked-in defaults, bottle.egress.allowlist,
and the cred-proxy upstream hosts derived from bottle.tokens.
Sorted for stability. Git upstreams declared in `bottle.git` do NOT
contribute here — git traffic flows through the per-agent git-gate
sidecar (PRD 0008), not pipelock."""
@@ -66,6 +93,22 @@ def pipelock_effective_allowlist(bottle: Bottle) -> list[str]:
for h in pipelock_bottle_allowlist(bottle):
if h:
seen.setdefault(h, None)
for h in pipelock_token_hosts(bottle):
seen.setdefault(h, None)
return sorted(seen.keys())
def pipelock_effective_tls_passthrough(bottle: Bottle) -> list[str]:
"""Hostnames pipelock should pass through (no TLS MITM, no body
scan). Default carries the LLM API endpoint (its request bodies
legitimately trip DLP); cred-proxy upstream hosts are added so
cred-proxy's HTTPS client (which trusts only the real CA bundle)
can complete the upstream handshake."""
seen: dict[str, None] = {}
for h in DEFAULT_TLS_PASSTHROUGH:
seen.setdefault(h, None)
for h in pipelock_token_hosts(bottle):
seen.setdefault(h, None)
return sorted(seen.keys())
@@ -135,7 +178,7 @@ def pipelock_build_config(
"enabled": True,
"ca_cert": ca_cert_path,
"ca_key": ca_key_path,
"passthrough_domains": list(DEFAULT_TLS_PASSTHROUGH),
"passthrough_domains": pipelock_effective_tls_passthrough(bottle),
}
return cfg
+95 -18
View File
@@ -1,36 +1,113 @@
"""Unit: pipelock_effective_allowlist — the union of baked-in defaults
and bottle.egress.allowlist. Git upstreams declared in bottle.git do not
"""Unit: pipelock_effective_allowlist — the union of baked-in defaults,
bottle.egress.allowlist, and cred-proxy upstream hosts derived from
bottle.tokens (PRD 0010). Git upstreams declared in bottle.git do not
contribute here; they flow through the per-agent git-gate (PRD 0008)."""
import unittest
from claude_bottle.manifest import Manifest
from claude_bottle.pipelock import pipelock_effective_allowlist
from claude_bottle.pipelock import (
pipelock_effective_allowlist,
pipelock_effective_tls_passthrough,
pipelock_token_hosts,
)
def _bottle(spec):
return Manifest.from_json_obj({
"bottles": {"dev": spec},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
class TestEffectiveAllowlist(unittest.TestCase):
def test_union_and_dedup(self):
manifest = Manifest.from_json_obj({
"bottles": {
"dev": {
"egress": {
"allowlist": [
"registry.npmjs.org",
# Duplicate of a baked default; the union
# must dedupe.
"api.anthropic.com",
],
},
},
eff = pipelock_effective_allowlist(_bottle({
"egress": {
"allowlist": [
"registry.npmjs.org",
# Duplicate of a baked default; the union must dedupe.
"api.anthropic.com",
],
},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
eff = pipelock_effective_allowlist(manifest.bottles["dev"])
}))
self.assertIn("api.anthropic.com", eff, "baked default present")
self.assertIn("registry.npmjs.org", eff, "egress.allowlist present")
self.assertEqual(len(eff), len(set(eff)), "deduplicated")
self.assertEqual(eff, sorted(eff), "sorted")
class TestTokenHosts(unittest.TestCase):
def test_github_yields_both_hosts(self):
hosts = pipelock_token_hosts(_bottle({
"tokens": [{"Kind": "github", "TokenRef": "GH"}],
}))
self.assertEqual(["api.github.com", "github.com"], hosts)
def test_gitea_yields_configured_host(self):
hosts = pipelock_token_hosts(_bottle({
"tokens": [{"Kind": "gitea", "TokenRef": "T",
"Url": "https://gitea.dideric.is"}],
}))
self.assertEqual(["gitea.dideric.is"], hosts)
def test_npm_yields_registry(self):
hosts = pipelock_token_hosts(_bottle({
"tokens": [{"Kind": "npm", "TokenRef": "N"}],
}))
self.assertEqual(["registry.npmjs.org"], hosts)
def test_anthropic_yields_api_host(self):
hosts = pipelock_token_hosts(_bottle({
"tokens": [{"Kind": "anthropic", "TokenRef": "A"}],
}))
self.assertEqual(["api.anthropic.com"], hosts)
def test_no_tokens_empty(self):
self.assertEqual([], pipelock_token_hosts(_bottle({})))
class TestAllowlistWithTokens(unittest.TestCase):
def test_token_hosts_added_to_allowlist(self):
eff = pipelock_effective_allowlist(_bottle({
"tokens": [
{"Kind": "npm", "TokenRef": "N"},
{"Kind": "github", "TokenRef": "G"},
],
}))
self.assertIn("registry.npmjs.org", eff)
self.assertIn("api.github.com", eff)
self.assertIn("github.com", eff)
def test_gitea_host_added(self):
eff = pipelock_effective_allowlist(_bottle({
"tokens": [{"Kind": "gitea", "TokenRef": "T",
"Url": "https://gitea.dideric.is"}],
}))
self.assertIn("gitea.dideric.is", eff)
class TestTlsPassthrough(unittest.TestCase):
def test_default_includes_api_anthropic(self):
passthrough = pipelock_effective_tls_passthrough(_bottle({}))
self.assertEqual(["api.anthropic.com"], passthrough)
def test_token_hosts_added_to_passthrough(self):
# cred-proxy validates upstream certs with the real CA bundle;
# pipelock must not MITM these or the handshake fails.
passthrough = pipelock_effective_tls_passthrough(_bottle({
"tokens": [
{"Kind": "github", "TokenRef": "G"},
{"Kind": "npm", "TokenRef": "N"},
{"Kind": "gitea", "TokenRef": "T",
"Url": "https://gitea.dideric.is"},
],
}))
for host in ("api.anthropic.com", "api.github.com", "github.com",
"registry.npmjs.org", "gitea.dideric.is"):
self.assertIn(host, passthrough)
self.assertEqual(passthrough, sorted(passthrough), "sorted")
if __name__ == "__main__":
unittest.main()