diff --git a/bot_bottle/egress.py b/bot_bottle/egress.py index d5f546e..e9160a5 100644 --- a/bot_bottle/egress.py +++ b/bot_bottle/egress.py @@ -141,13 +141,15 @@ def egress_manifest_routes( routes are merged.""" out: list[EgressRoute] = [] for r in bottle.egress.routes: + tls_pt = r.Pipelock.Config.get("tls_passthrough", False) + tls_passthrough = tls_pt if isinstance(tls_pt, bool) else False out.append(EgressRoute( host=r.Host, path_allowlist=r.PathAllowlist, auth_scheme=r.AuthScheme, token_ref=r.TokenRef, roles=r.Role, - tls_passthrough=r.Pipelock.TlsPassthrough, + tls_passthrough=tls_passthrough, )) return tuple(out) diff --git a/bot_bottle/manifest_egress.py b/bot_bottle/manifest_egress.py index 24a6b67..6da4602 100644 --- a/bot_bottle/manifest_egress.py +++ b/bot_bottle/manifest_egress.py @@ -2,7 +2,6 @@ from __future__ import annotations -import ipaddress from dataclasses import dataclass, field from typing import cast @@ -43,17 +42,18 @@ def validate_egress_routes( class PipelockRoutePolicy: """Per-route pipelock policy overrides. - `TlsPassthrough` adds the route host to pipelock's - `tls_interception.passthrough_domains`, so pipelock still enforces - the hostname allowlist but does not MITM/decrypt request bodies or - headers for that host. + Stores raw pipelock configuration that's passed through to the + pipelock sidecar. Pipelock validates all config options, so + bot-bottle forwards manifest settings without coercion or strict + validation. Supported options include: - `SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF - allowlist for private/internal destinations behind this route. + - `tls_passthrough`: bool — skip TLS MITM for this host + - `ssrf_ip_allowlist`: list of CIDR/IP — allow private destinations + - `skip_scan_for_extensions`: list of file extensions to skip DLP + scanning for (e.g., [".whl", ".tar.gz"]) """ - TlsPassthrough: bool = False - SsrfIpAllowlist: tuple[str, ...] = () + Config: dict[str, object] = field(default_factory=dict) @classmethod def from_dict( @@ -61,44 +61,7 @@ class PipelockRoutePolicy: ) -> "PipelockRoutePolicy": label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock" d = as_json_object(raw, label) - for k in d: - if k not in ("tls_passthrough", "ssrf_ip_allowlist"): - raise ManifestError( - f"{label} has unknown key {k!r}; " - f"only 'tls_passthrough' and 'ssrf_ip_allowlist' " - f"are accepted" - ) - tls_passthrough_raw = d.get("tls_passthrough", False) - if not isinstance(tls_passthrough_raw, bool): - raise ManifestError( - f"{label}.tls_passthrough must be a boolean " - f"(was {type(tls_passthrough_raw).__name__})" - ) - ssrf_raw = d.get("ssrf_ip_allowlist", []) - if not isinstance(ssrf_raw, list): - raise ManifestError( - f"{label}.ssrf_ip_allowlist must be an array " - f"(was {type(ssrf_raw).__name__})" - ) - ssrf_ip_allowlist: list[str] = [] - for j, item in enumerate(ssrf_raw): - if not isinstance(item, str) or not item: - raise ManifestError( - f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty " - f"string (was {type(item).__name__})" - ) - try: - ipaddress.ip_network(item, strict=False) - except ValueError as e: - raise ManifestError( - f"{label}.ssrf_ip_allowlist[{j}] must be an IP address " - f"or CIDR (was {item!r}): {e}" - ) from e - ssrf_ip_allowlist.append(item) - return cls( - TlsPassthrough=tls_passthrough_raw, - SsrfIpAllowlist=tuple(ssrf_ip_allowlist), - ) + return cls(Config=d) @dataclass(frozen=True) diff --git a/bot_bottle/pipelock.py b/bot_bottle/pipelock.py index c9ea82d..bf5b607 100644 --- a/bot_bottle/pipelock.py +++ b/bot_bottle/pipelock.py @@ -132,8 +132,11 @@ def pipelock_effective_ssrf_ip_allowlist( """ seen: dict[str, None] = {ip: None for ip in extra} for route in bottle.egress.routes: - for ip in route.Pipelock.SsrfIpAllowlist: - seen.setdefault(ip, None) + ssrf_raw = route.Pipelock.Config.get("ssrf_ip_allowlist", []) + if isinstance(ssrf_raw, list): + for ip in ssrf_raw: + if isinstance(ip, str): + seen.setdefault(ip, None) return sorted(seen.keys())