From 8601c686f377c59bff8c50ee2a0b53a7a18210e2 Mon Sep 17 00:00:00 2001 From: didericis Date: Thu, 4 Jun 2026 13:04:12 -0400 Subject: [PATCH 1/3] feat: forward pipelock config dict instead of parsing individual fields - Change PipelockRoutePolicy to store raw pipelock config dict instead of individual coerced fields (TlsPassthrough, SsrfIpAllowlist) - Update pipelock.py and egress.py to extract values from Config dict - Simplifies manifest validation: pipelock handles its own schema - Enables new pipelock options like skip_scan_for_extensions without updating bot-bottle code This allows bottles to configure pipelock directly, e.g.: pipelock: skip_scan_for_extensions: [".whl", ".tar.gz"] Co-Authored-By: Claude Haiku 4.5 --- bot_bottle/egress.py | 4 ++- bot_bottle/manifest_egress.py | 57 ++++++----------------------------- bot_bottle/pipelock.py | 7 +++-- 3 files changed, 18 insertions(+), 50 deletions(-) diff --git a/bot_bottle/egress.py b/bot_bottle/egress.py index d5f546e..e9160a5 100644 --- a/bot_bottle/egress.py +++ b/bot_bottle/egress.py @@ -141,13 +141,15 @@ def egress_manifest_routes( routes are merged.""" out: list[EgressRoute] = [] for r in bottle.egress.routes: + tls_pt = r.Pipelock.Config.get("tls_passthrough", False) + tls_passthrough = tls_pt if isinstance(tls_pt, bool) else False out.append(EgressRoute( host=r.Host, path_allowlist=r.PathAllowlist, auth_scheme=r.AuthScheme, token_ref=r.TokenRef, roles=r.Role, - tls_passthrough=r.Pipelock.TlsPassthrough, + tls_passthrough=tls_passthrough, )) return tuple(out) diff --git a/bot_bottle/manifest_egress.py b/bot_bottle/manifest_egress.py index 24a6b67..6da4602 100644 --- a/bot_bottle/manifest_egress.py +++ b/bot_bottle/manifest_egress.py @@ -2,7 +2,6 @@ from __future__ import annotations -import ipaddress from dataclasses import dataclass, field from typing import cast @@ -43,17 +42,18 @@ def validate_egress_routes( class PipelockRoutePolicy: """Per-route pipelock policy overrides. - `TlsPassthrough` adds the route host to pipelock's - `tls_interception.passthrough_domains`, so pipelock still enforces - the hostname allowlist but does not MITM/decrypt request bodies or - headers for that host. + Stores raw pipelock configuration that's passed through to the + pipelock sidecar. Pipelock validates all config options, so + bot-bottle forwards manifest settings without coercion or strict + validation. Supported options include: - `SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF - allowlist for private/internal destinations behind this route. + - `tls_passthrough`: bool — skip TLS MITM for this host + - `ssrf_ip_allowlist`: list of CIDR/IP — allow private destinations + - `skip_scan_for_extensions`: list of file extensions to skip DLP + scanning for (e.g., [".whl", ".tar.gz"]) """ - TlsPassthrough: bool = False - SsrfIpAllowlist: tuple[str, ...] = () + Config: dict[str, object] = field(default_factory=dict) @classmethod def from_dict( @@ -61,44 +61,7 @@ class PipelockRoutePolicy: ) -> "PipelockRoutePolicy": label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock" d = as_json_object(raw, label) - for k in d: - if k not in ("tls_passthrough", "ssrf_ip_allowlist"): - raise ManifestError( - f"{label} has unknown key {k!r}; " - f"only 'tls_passthrough' and 'ssrf_ip_allowlist' " - f"are accepted" - ) - tls_passthrough_raw = d.get("tls_passthrough", False) - if not isinstance(tls_passthrough_raw, bool): - raise ManifestError( - f"{label}.tls_passthrough must be a boolean " - f"(was {type(tls_passthrough_raw).__name__})" - ) - ssrf_raw = d.get("ssrf_ip_allowlist", []) - if not isinstance(ssrf_raw, list): - raise ManifestError( - f"{label}.ssrf_ip_allowlist must be an array " - f"(was {type(ssrf_raw).__name__})" - ) - ssrf_ip_allowlist: list[str] = [] - for j, item in enumerate(ssrf_raw): - if not isinstance(item, str) or not item: - raise ManifestError( - f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty " - f"string (was {type(item).__name__})" - ) - try: - ipaddress.ip_network(item, strict=False) - except ValueError as e: - raise ManifestError( - f"{label}.ssrf_ip_allowlist[{j}] must be an IP address " - f"or CIDR (was {item!r}): {e}" - ) from e - ssrf_ip_allowlist.append(item) - return cls( - TlsPassthrough=tls_passthrough_raw, - SsrfIpAllowlist=tuple(ssrf_ip_allowlist), - ) + return cls(Config=d) @dataclass(frozen=True) diff --git a/bot_bottle/pipelock.py b/bot_bottle/pipelock.py index c9ea82d..bf5b607 100644 --- a/bot_bottle/pipelock.py +++ b/bot_bottle/pipelock.py @@ -132,8 +132,11 @@ def pipelock_effective_ssrf_ip_allowlist( """ seen: dict[str, None] = {ip: None for ip in extra} for route in bottle.egress.routes: - for ip in route.Pipelock.SsrfIpAllowlist: - seen.setdefault(ip, None) + ssrf_raw = route.Pipelock.Config.get("ssrf_ip_allowlist", []) + if isinstance(ssrf_raw, list): + for ip in ssrf_raw: + if isinstance(ip, str): + seen.setdefault(ip, None) return sorted(seen.keys()) -- 2.52.0 From d90b04d34389f86e4816f3867c5ec5c923288acc Mon Sep 17 00:00:00 2001 From: didericis Date: Thu, 4 Jun 2026 13:14:59 -0400 Subject: [PATCH 2/3] feat: add generic pipelock config merging for future extensibility - Merge arbitrary pipelock settings from routes into global config - Allows routes to configure new pipelock options without code changes - Special-case tls_passthrough and ssrf_ip_allowlist (already aggregated) Note: Pipelock doesn't currently support per-path/per-host response scanning rules or response size limits, so response_body_scanning config is not yet usable. For now, use tls_passthrough for binary download hosts. Co-Authored-By: Claude Haiku 4.5 --- bot_bottle/pipelock.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/bot_bottle/pipelock.py b/bot_bottle/pipelock.py index bf5b607..e8e21ea 100644 --- a/bot_bottle/pipelock.py +++ b/bot_bottle/pipelock.py @@ -223,6 +223,15 @@ def pipelock_build_config( ) if effective_ssrf_ip_allowlist: cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist} + + # Merge per-route pipelock config (e.g., response_body_scanning settings). + # Routes can specify arbitrary pipelock options that apply globally. + for route in bottle.egress.routes: + for key, value in route.Pipelock.Config.items(): + if key not in ("tls_passthrough", "ssrf_ip_allowlist"): + if key not in cfg: + cfg[key] = value + return cfg -- 2.52.0 From dee36004004693af1bc49c94eecd7fc94b7423a7 Mon Sep 17 00:00:00 2001 From: claude Date: Thu, 4 Jun 2026 17:22:44 +0000 Subject: [PATCH 3/3] test: update PipelockRoutePolicy tests for Config dict design Replace typed-attribute assertions (TlsPassthrough, SsrfIpAllowlist) with Config dict lookups, drop the four strict-validation tests that were intentionally removed in the refactor, and add a skip_scan_for_extensions test to cover the PR's stated new feature. --- tests/unit/test_manifest_egress.py | 46 ++++++++++-------------------- 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/tests/unit/test_manifest_egress.py b/tests/unit/test_manifest_egress.py index caf6cc4..6803a15 100644 --- a/tests/unit/test_manifest_egress.py +++ b/tests/unit/test_manifest_egress.py @@ -225,7 +225,7 @@ class TestPipelockPolicy(unittest.TestCase): "host": "api.openai.com", "pipelock": {"tls_passthrough": True}, }]) - self.assertTrue(b.egress.routes[0].Pipelock.TlsPassthrough) + self.assertTrue(b.egress.routes[0].Pipelock.Config["tls_passthrough"]) def test_ssrf_ip_allowlist_route_policy(self): b = _bottle([{ @@ -233,44 +233,28 @@ class TestPipelockPolicy(unittest.TestCase): "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}, }]) self.assertEqual( - ("100.78.141.42/32",), - b.egress.routes[0].Pipelock.SsrfIpAllowlist, + ["100.78.141.42/32"], + b.egress.routes[0].Pipelock.Config["ssrf_ip_allowlist"], ) - def test_tls_passthrough_defaults_false(self): + def test_skip_scan_for_extensions_route_policy(self): + b = _bottle([{ + "host": "files.pythonhosted.org", + "pipelock": {"skip_scan_for_extensions": [".whl", ".tar.gz"]}, + }]) + self.assertEqual( + [".whl", ".tar.gz"], + b.egress.routes[0].Pipelock.Config["skip_scan_for_extensions"], + ) + + def test_empty_config_when_pipelock_omitted(self): b = _bottle([{"host": "api.openai.com"}]) - self.assertFalse(b.egress.routes[0].Pipelock.TlsPassthrough) - self.assertEqual((), b.egress.routes[0].Pipelock.SsrfIpAllowlist) + self.assertEqual({}, b.egress.routes[0].Pipelock.Config) def test_pipelock_policy_must_be_object(self): with self.assertRaises(ManifestError): _bottle([{"host": "x.example", "pipelock": True}]) - def test_tls_passthrough_must_be_bool(self): - with self.assertRaises(ManifestError): - _bottle([{ - "host": "x.example", - "pipelock": {"tls_passthrough": "yes"}, - }]) - - def test_ssrf_ip_allowlist_must_be_array(self): - with self.assertRaises(ManifestError): - _bottle([{ - "host": "x.example", - "pipelock": {"ssrf_ip_allowlist": "100.78.141.42/32"}, - }]) - - def test_ssrf_ip_allowlist_items_must_be_cidr_or_ip(self): - with self.assertRaises(ManifestError): - _bottle([{ - "host": "x.example", - "pipelock": {"ssrf_ip_allowlist": ["not-an-ip"]}, - }]) - - def test_unknown_pipelock_key_rejected(self): - with self.assertRaises(ManifestError): - _bottle([{"host": "x.example", "pipelock": {"wat": True}}]) - class TestRouteValidation(unittest.TestCase): def test_duplicate_hosts_rejected(self): -- 2.52.0