feat: forward pipelock config dict instead of parsing individual fields
lint / lint (push) Failing after 1m32s
test / unit (pull_request) Failing after 37s
test / integration (pull_request) Successful in 42s

- Change PipelockRoutePolicy to store raw pipelock config dict instead
  of individual coerced fields (TlsPassthrough, SsrfIpAllowlist)
- Update pipelock.py and egress.py to extract values from Config dict
- Simplifies manifest validation: pipelock handles its own schema
- Enables new pipelock options like skip_scan_for_extensions without
  updating bot-bottle code

This allows bottles to configure pipelock directly, e.g.:

  pipelock:
    skip_scan_for_extensions: [".whl", ".tar.gz"]

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-06-04 13:04:12 -04:00
parent f114c861b4
commit 8601c686f3
3 changed files with 18 additions and 50 deletions
+3 -1
View File
@@ -141,13 +141,15 @@ def egress_manifest_routes(
routes are merged."""
out: list[EgressRoute] = []
for r in bottle.egress.routes:
tls_pt = r.Pipelock.Config.get("tls_passthrough", False)
tls_passthrough = tls_pt if isinstance(tls_pt, bool) else False
out.append(EgressRoute(
host=r.Host,
path_allowlist=r.PathAllowlist,
auth_scheme=r.AuthScheme,
token_ref=r.TokenRef,
roles=r.Role,
tls_passthrough=r.Pipelock.TlsPassthrough,
tls_passthrough=tls_passthrough,
))
return tuple(out)
+10 -47
View File
@@ -2,7 +2,6 @@
from __future__ import annotations
import ipaddress
from dataclasses import dataclass, field
from typing import cast
@@ -43,17 +42,18 @@ def validate_egress_routes(
class PipelockRoutePolicy:
"""Per-route pipelock policy overrides.
`TlsPassthrough` adds the route host to pipelock's
`tls_interception.passthrough_domains`, so pipelock still enforces
the hostname allowlist but does not MITM/decrypt request bodies or
headers for that host.
Stores raw pipelock configuration that's passed through to the
pipelock sidecar. Pipelock validates all config options, so
bot-bottle forwards manifest settings without coercion or strict
validation. Supported options include:
`SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF
allowlist for private/internal destinations behind this route.
- `tls_passthrough`: bool — skip TLS MITM for this host
- `ssrf_ip_allowlist`: list of CIDR/IP — allow private destinations
- `skip_scan_for_extensions`: list of file extensions to skip DLP
scanning for (e.g., [".whl", ".tar.gz"])
"""
TlsPassthrough: bool = False
SsrfIpAllowlist: tuple[str, ...] = ()
Config: dict[str, object] = field(default_factory=dict)
@classmethod
def from_dict(
@@ -61,44 +61,7 @@ class PipelockRoutePolicy:
) -> "PipelockRoutePolicy":
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
d = as_json_object(raw, label)
for k in d:
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
raise ManifestError(
f"{label} has unknown key {k!r}; "
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
f"are accepted"
)
tls_passthrough_raw = d.get("tls_passthrough", False)
if not isinstance(tls_passthrough_raw, bool):
raise ManifestError(
f"{label}.tls_passthrough must be a boolean "
f"(was {type(tls_passthrough_raw).__name__})"
)
ssrf_raw = d.get("ssrf_ip_allowlist", [])
if not isinstance(ssrf_raw, list):
raise ManifestError(
f"{label}.ssrf_ip_allowlist must be an array "
f"(was {type(ssrf_raw).__name__})"
)
ssrf_ip_allowlist: list[str] = []
for j, item in enumerate(ssrf_raw):
if not isinstance(item, str) or not item:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
f"string (was {type(item).__name__})"
)
try:
ipaddress.ip_network(item, strict=False)
except ValueError as e:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
f"or CIDR (was {item!r}): {e}"
) from e
ssrf_ip_allowlist.append(item)
return cls(
TlsPassthrough=tls_passthrough_raw,
SsrfIpAllowlist=tuple(ssrf_ip_allowlist),
)
return cls(Config=d)
@dataclass(frozen=True)
+4 -1
View File
@@ -132,7 +132,10 @@ def pipelock_effective_ssrf_ip_allowlist(
"""
seen: dict[str, None] = {ip: None for ip in extra}
for route in bottle.egress.routes:
for ip in route.Pipelock.SsrfIpAllowlist:
ssrf_raw = route.Pipelock.Config.get("ssrf_ip_allowlist", [])
if isinstance(ssrf_raw, list):
for ip in ssrf_raw:
if isinstance(ip, str):
seen.setdefault(ip, None)
return sorted(seen.keys())