From fed006441dc6568ddc6ffe2329b343e800c70a0b Mon Sep 17 00:00:00 2001 From: codex Date: Thu, 28 May 2026 19:32:31 -0400 Subject: [PATCH] fix(pipelock): allow route ssrf ip policy --- README.md | 15 +++++++++++ bot_bottle/manifest.py | 38 ++++++++++++++++++++++++--- bot_bottle/pipelock.py | 23 ++++++++++++++-- tests/unit/test_manifest_egress.py | 25 ++++++++++++++++++ tests/unit/test_pipelock_allowlist.py | 25 ++++++++++++++++++ tests/unit/test_pipelock_yaml.py | 14 ++++++++++ 6 files changed, 134 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 07280eb..a5d76b2 100644 --- a/README.md +++ b/README.md @@ -435,6 +435,21 @@ egress: tls_passthrough: true ``` +Routes that resolve to private or Tailscale addresses can opt into +pipelock's SSRF destination allowlist explicitly: + +```yaml +egress: + routes: + - host: gitea.dideric.is + auth: + scheme: token + token_ref: BOT_BOTTLE_GITEA_TOKEN + pipelock: + ssrf_ip_allowlist: + - 100.78.141.42/32 +``` + At launch, `cli.py` reads `BOT_BOTTLE_CLAUDE_OAUTH_TOKEN` from the host env and forwards it into the cred-proxy container's environ — never into the agent's. The agent receives `ANTHROPIC_BASE_URL` pointing at diff --git a/bot_bottle/manifest.py b/bot_bottle/manifest.py index 43a05b3..ed760fe 100644 --- a/bot_bottle/manifest.py +++ b/bot_bottle/manifest.py @@ -19,7 +19,7 @@ Bottle schema (frontmatter): remotes: { : , ... } # optional egress: { routes: [ , ... ] } # route keys: host, path_allowlist, auth, role, pipelock - # pipelock: { tls_passthrough: } + # pipelock: { tls_passthrough: , ssrf_ip_allowlist: [, ...] } supervise: # optional Agent schema (frontmatter): @@ -43,6 +43,7 @@ on-disk files. from __future__ import annotations +import ipaddress import json import os import re @@ -329,9 +330,13 @@ class PipelockRoutePolicy: `tls_interception.passthrough_domains`, so pipelock still enforces the hostname allowlist but does not MITM/decrypt request bodies or headers for that host. + + `SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF + allowlist for private/internal destinations behind this route. """ TlsPassthrough: bool = False + SsrfIpAllowlist: tuple[str, ...] = () @classmethod def from_dict( @@ -340,10 +345,11 @@ class PipelockRoutePolicy: label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock" d = _as_json_object(raw, label) for k in d: - if k not in ("tls_passthrough",): + if k not in ("tls_passthrough", "ssrf_ip_allowlist"): die( f"{label} has unknown key {k!r}; " - f"only 'tls_passthrough' is accepted" + f"only 'tls_passthrough' and 'ssrf_ip_allowlist' " + f"are accepted" ) tls_passthrough_raw = d.get("tls_passthrough", False) if not isinstance(tls_passthrough_raw, bool): @@ -351,7 +357,31 @@ class PipelockRoutePolicy: f"{label}.tls_passthrough must be a boolean " f"(was {type(tls_passthrough_raw).__name__})" ) - return cls(TlsPassthrough=tls_passthrough_raw) + ssrf_raw = d.get("ssrf_ip_allowlist", []) + if not isinstance(ssrf_raw, list): + die( + f"{label}.ssrf_ip_allowlist must be an array " + f"(was {type(ssrf_raw).__name__})" + ) + ssrf_ip_allowlist: list[str] = [] + for j, item in enumerate(ssrf_raw): + if not isinstance(item, str) or not item: + die( + f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty " + f"string (was {type(item).__name__})" + ) + try: + ipaddress.ip_network(item, strict=False) + except ValueError as e: + die( + f"{label}.ssrf_ip_allowlist[{j}] must be an IP address " + f"or CIDR (was {item!r}): {e}" + ) + ssrf_ip_allowlist.append(item) + return cls( + TlsPassthrough=tls_passthrough_raw, + SsrfIpAllowlist=tuple(ssrf_ip_allowlist), + ) @dataclass(frozen=True) diff --git a/bot_bottle/pipelock.py b/bot_bottle/pipelock.py index 6d137fb..793a2aa 100644 --- a/bot_bottle/pipelock.py +++ b/bot_bottle/pipelock.py @@ -114,6 +114,22 @@ def pipelock_effective_tls_passthrough(bottle: Bottle) -> list[str]: return sorted(seen.keys()) +def pipelock_effective_ssrf_ip_allowlist( + bottle: Bottle, + extra: tuple[str, ...] = (), +) -> list[str]: + """IP/CIDR entries that bypass pipelock's SSRF destination guard. + + Launch code can pass backend-owned entries through `extra`, while + route-owned entries come from `pipelock.ssrf_ip_allowlist`. + """ + seen: dict[str, None] = {ip: None for ip in extra} + for route in bottle.egress.routes: + for ip in route.Pipelock.SsrfIpAllowlist: + seen.setdefault(ip, None) + return sorted(seen.keys()) + + @@ -191,8 +207,11 @@ def pipelock_build_config( "ca_key": ca_key_path, "passthrough_domains": pipelock_effective_tls_passthrough(bottle), } - if ssrf_ip_allowlist: - cfg["ssrf"] = {"ip_allowlist": list(ssrf_ip_allowlist)} + effective_ssrf_ip_allowlist = pipelock_effective_ssrf_ip_allowlist( + bottle, ssrf_ip_allowlist, + ) + if effective_ssrf_ip_allowlist: + cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist} return cfg diff --git a/tests/unit/test_manifest_egress.py b/tests/unit/test_manifest_egress.py index 43f89ef..95f91e0 100644 --- a/tests/unit/test_manifest_egress.py +++ b/tests/unit/test_manifest_egress.py @@ -223,9 +223,20 @@ class TestPipelockPolicy(unittest.TestCase): }]) self.assertTrue(b.egress.routes[0].Pipelock.TlsPassthrough) + def test_ssrf_ip_allowlist_route_policy(self): + b = _bottle([{ + "host": "gitea.dideric.is", + "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}, + }]) + self.assertEqual( + ("100.78.141.42/32",), + b.egress.routes[0].Pipelock.SsrfIpAllowlist, + ) + def test_tls_passthrough_defaults_false(self): b = _bottle([{"host": "api.openai.com"}]) self.assertFalse(b.egress.routes[0].Pipelock.TlsPassthrough) + self.assertEqual((), b.egress.routes[0].Pipelock.SsrfIpAllowlist) def test_pipelock_policy_must_be_object(self): with self.assertRaises(Die): @@ -238,6 +249,20 @@ class TestPipelockPolicy(unittest.TestCase): "pipelock": {"tls_passthrough": "yes"}, }]) + def test_ssrf_ip_allowlist_must_be_array(self): + with self.assertRaises(Die): + _bottle([{ + "host": "x.example", + "pipelock": {"ssrf_ip_allowlist": "100.78.141.42/32"}, + }]) + + def test_ssrf_ip_allowlist_items_must_be_cidr_or_ip(self): + with self.assertRaises(Die): + _bottle([{ + "host": "x.example", + "pipelock": {"ssrf_ip_allowlist": ["not-an-ip"]}, + }]) + def test_unknown_pipelock_key_rejected(self): with self.assertRaises(Die): _bottle([{"host": "x.example", "pipelock": {"wat": True}}]) diff --git a/tests/unit/test_pipelock_allowlist.py b/tests/unit/test_pipelock_allowlist.py index 5a85b0c..0443736 100644 --- a/tests/unit/test_pipelock_allowlist.py +++ b/tests/unit/test_pipelock_allowlist.py @@ -8,6 +8,7 @@ import unittest from bot_bottle.manifest import Manifest from bot_bottle.pipelock import ( pipelock_effective_allowlist, + pipelock_effective_ssrf_ip_allowlist, pipelock_effective_tls_passthrough, ) @@ -113,5 +114,29 @@ class TestTlsPassthrough(unittest.TestCase): self.assertEqual(["api.openai.com"], passthrough) +class TestSsrfIpAllowlist(unittest.TestCase): + def test_default_empty(self): + allowlist = pipelock_effective_ssrf_ip_allowlist(_bottle({})) + self.assertEqual([], allowlist) + + def test_route_policy_adds_ssrf_ip_allowlist(self): + allowlist = pipelock_effective_ssrf_ip_allowlist(_bottle(_routes([ + {"host": "gitea.dideric.is", + "auth": {"scheme": "token", "token_ref": "G"}, + "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}}, + ]))) + self.assertEqual(["100.78.141.42/32"], allowlist) + + def test_route_policy_merges_with_extra(self): + allowlist = pipelock_effective_ssrf_ip_allowlist( + _bottle(_routes([ + {"host": "gitea.dideric.is", + "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}}, + ])), + ("172.20.0.0/16",), + ) + self.assertEqual(["100.78.141.42/32", "172.20.0.0/16"], allowlist) + + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_pipelock_yaml.py b/tests/unit/test_pipelock_yaml.py index 913826d..1728242 100644 --- a/tests/unit/test_pipelock_yaml.py +++ b/tests/unit/test_pipelock_yaml.py @@ -111,6 +111,20 @@ class TestBuildConfig(unittest.TestCase): self.assertIn("ssrf", cfg) self.assertEqual({"ip_allowlist": ["172.20.0.0/16"]}, cfg["ssrf"]) + def test_ssrf_block_emitted_from_route_policy(self): + bottle = Manifest.from_json_obj({ + "bottles": {"dev": {"egress": {"routes": [ + {"host": "gitea.dideric.is", + "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}}, + ]}}}, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + }).bottles["dev"] + cfg = pipelock_build_config(bottle) + self.assertEqual( + {"ip_allowlist": ["100.78.141.42/32"]}, + cfg["ssrf"], + ) + def test_seed_phrase_detection_disabled_by_default(self): # Only the broad BIP-39 detector is disabled. The rest of # DLP remains enabled via the `dlp` and request-body sections.