diff --git a/bot_bottle/egress.py b/bot_bottle/egress.py index d5f546e..e9160a5 100644 --- a/bot_bottle/egress.py +++ b/bot_bottle/egress.py @@ -141,13 +141,15 @@ def egress_manifest_routes( routes are merged.""" out: list[EgressRoute] = [] for r in bottle.egress.routes: + tls_pt = r.Pipelock.Config.get("tls_passthrough", False) + tls_passthrough = tls_pt if isinstance(tls_pt, bool) else False out.append(EgressRoute( host=r.Host, path_allowlist=r.PathAllowlist, auth_scheme=r.AuthScheme, token_ref=r.TokenRef, roles=r.Role, - tls_passthrough=r.Pipelock.TlsPassthrough, + tls_passthrough=tls_passthrough, )) return tuple(out) diff --git a/bot_bottle/manifest_egress.py b/bot_bottle/manifest_egress.py index 24a6b67..6da4602 100644 --- a/bot_bottle/manifest_egress.py +++ b/bot_bottle/manifest_egress.py @@ -2,7 +2,6 @@ from __future__ import annotations -import ipaddress from dataclasses import dataclass, field from typing import cast @@ -43,17 +42,18 @@ def validate_egress_routes( class PipelockRoutePolicy: """Per-route pipelock policy overrides. - `TlsPassthrough` adds the route host to pipelock's - `tls_interception.passthrough_domains`, so pipelock still enforces - the hostname allowlist but does not MITM/decrypt request bodies or - headers for that host. + Stores raw pipelock configuration that's passed through to the + pipelock sidecar. Pipelock validates all config options, so + bot-bottle forwards manifest settings without coercion or strict + validation. Supported options include: - `SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF - allowlist for private/internal destinations behind this route. + - `tls_passthrough`: bool — skip TLS MITM for this host + - `ssrf_ip_allowlist`: list of CIDR/IP — allow private destinations + - `skip_scan_for_extensions`: list of file extensions to skip DLP + scanning for (e.g., [".whl", ".tar.gz"]) """ - TlsPassthrough: bool = False - SsrfIpAllowlist: tuple[str, ...] = () + Config: dict[str, object] = field(default_factory=dict) @classmethod def from_dict( @@ -61,44 +61,7 @@ class PipelockRoutePolicy: ) -> "PipelockRoutePolicy": label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock" d = as_json_object(raw, label) - for k in d: - if k not in ("tls_passthrough", "ssrf_ip_allowlist"): - raise ManifestError( - f"{label} has unknown key {k!r}; " - f"only 'tls_passthrough' and 'ssrf_ip_allowlist' " - f"are accepted" - ) - tls_passthrough_raw = d.get("tls_passthrough", False) - if not isinstance(tls_passthrough_raw, bool): - raise ManifestError( - f"{label}.tls_passthrough must be a boolean " - f"(was {type(tls_passthrough_raw).__name__})" - ) - ssrf_raw = d.get("ssrf_ip_allowlist", []) - if not isinstance(ssrf_raw, list): - raise ManifestError( - f"{label}.ssrf_ip_allowlist must be an array " - f"(was {type(ssrf_raw).__name__})" - ) - ssrf_ip_allowlist: list[str] = [] - for j, item in enumerate(ssrf_raw): - if not isinstance(item, str) or not item: - raise ManifestError( - f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty " - f"string (was {type(item).__name__})" - ) - try: - ipaddress.ip_network(item, strict=False) - except ValueError as e: - raise ManifestError( - f"{label}.ssrf_ip_allowlist[{j}] must be an IP address " - f"or CIDR (was {item!r}): {e}" - ) from e - ssrf_ip_allowlist.append(item) - return cls( - TlsPassthrough=tls_passthrough_raw, - SsrfIpAllowlist=tuple(ssrf_ip_allowlist), - ) + return cls(Config=d) @dataclass(frozen=True) diff --git a/bot_bottle/pipelock.py b/bot_bottle/pipelock.py index c9ea82d..e8e21ea 100644 --- a/bot_bottle/pipelock.py +++ b/bot_bottle/pipelock.py @@ -132,8 +132,11 @@ def pipelock_effective_ssrf_ip_allowlist( """ seen: dict[str, None] = {ip: None for ip in extra} for route in bottle.egress.routes: - for ip in route.Pipelock.SsrfIpAllowlist: - seen.setdefault(ip, None) + ssrf_raw = route.Pipelock.Config.get("ssrf_ip_allowlist", []) + if isinstance(ssrf_raw, list): + for ip in ssrf_raw: + if isinstance(ip, str): + seen.setdefault(ip, None) return sorted(seen.keys()) @@ -220,6 +223,15 @@ def pipelock_build_config( ) if effective_ssrf_ip_allowlist: cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist} + + # Merge per-route pipelock config (e.g., response_body_scanning settings). + # Routes can specify arbitrary pipelock options that apply globally. + for route in bottle.egress.routes: + for key, value in route.Pipelock.Config.items(): + if key not in ("tls_passthrough", "ssrf_ip_allowlist"): + if key not in cfg: + cfg[key] = value + return cfg diff --git a/tests/unit/test_manifest_egress.py b/tests/unit/test_manifest_egress.py index caf6cc4..6803a15 100644 --- a/tests/unit/test_manifest_egress.py +++ b/tests/unit/test_manifest_egress.py @@ -225,7 +225,7 @@ class TestPipelockPolicy(unittest.TestCase): "host": "api.openai.com", "pipelock": {"tls_passthrough": True}, }]) - self.assertTrue(b.egress.routes[0].Pipelock.TlsPassthrough) + self.assertTrue(b.egress.routes[0].Pipelock.Config["tls_passthrough"]) def test_ssrf_ip_allowlist_route_policy(self): b = _bottle([{ @@ -233,44 +233,28 @@ class TestPipelockPolicy(unittest.TestCase): "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}, }]) self.assertEqual( - ("100.78.141.42/32",), - b.egress.routes[0].Pipelock.SsrfIpAllowlist, + ["100.78.141.42/32"], + b.egress.routes[0].Pipelock.Config["ssrf_ip_allowlist"], ) - def test_tls_passthrough_defaults_false(self): + def test_skip_scan_for_extensions_route_policy(self): + b = _bottle([{ + "host": "files.pythonhosted.org", + "pipelock": {"skip_scan_for_extensions": [".whl", ".tar.gz"]}, + }]) + self.assertEqual( + [".whl", ".tar.gz"], + b.egress.routes[0].Pipelock.Config["skip_scan_for_extensions"], + ) + + def test_empty_config_when_pipelock_omitted(self): b = _bottle([{"host": "api.openai.com"}]) - self.assertFalse(b.egress.routes[0].Pipelock.TlsPassthrough) - self.assertEqual((), b.egress.routes[0].Pipelock.SsrfIpAllowlist) + self.assertEqual({}, b.egress.routes[0].Pipelock.Config) def test_pipelock_policy_must_be_object(self): with self.assertRaises(ManifestError): _bottle([{"host": "x.example", "pipelock": True}]) - def test_tls_passthrough_must_be_bool(self): - with self.assertRaises(ManifestError): - _bottle([{ - "host": "x.example", - "pipelock": {"tls_passthrough": "yes"}, - }]) - - def test_ssrf_ip_allowlist_must_be_array(self): - with self.assertRaises(ManifestError): - _bottle([{ - "host": "x.example", - "pipelock": {"ssrf_ip_allowlist": "100.78.141.42/32"}, - }]) - - def test_ssrf_ip_allowlist_items_must_be_cidr_or_ip(self): - with self.assertRaises(ManifestError): - _bottle([{ - "host": "x.example", - "pipelock": {"ssrf_ip_allowlist": ["not-an-ip"]}, - }]) - - def test_unknown_pipelock_key_rejected(self): - with self.assertRaises(ManifestError): - _bottle([{"host": "x.example", "pipelock": {"wat": True}}]) - class TestRouteValidation(unittest.TestCase): def test_duplicate_hosts_rejected(self):