fix(pipelock): allow route ssrf ip policy
test / unit (pull_request) Successful in 28s
test / integration (pull_request) Successful in 44s

This commit is contained in:
2026-05-28 19:32:31 -04:00
parent bcadc07d09
commit fed006441d
6 changed files with 134 additions and 6 deletions
+34 -4
View File
@@ -19,7 +19,7 @@ Bottle schema (frontmatter):
remotes: { <host>: <git-entry>, ... } # optional
egress: { routes: [ <egress-route>, ... ] }
# route keys: host, path_allowlist, auth, role, pipelock
# pipelock: { tls_passthrough: <bool> }
# pipelock: { tls_passthrough: <bool>, ssrf_ip_allowlist: [<cidr>, ...] }
supervise: <bool> # optional
Agent schema (frontmatter):
@@ -43,6 +43,7 @@ on-disk files.
from __future__ import annotations
import ipaddress
import json
import os
import re
@@ -329,9 +330,13 @@ class PipelockRoutePolicy:
`tls_interception.passthrough_domains`, so pipelock still enforces
the hostname allowlist but does not MITM/decrypt request bodies or
headers for that host.
`SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF
allowlist for private/internal destinations behind this route.
"""
TlsPassthrough: bool = False
SsrfIpAllowlist: tuple[str, ...] = ()
@classmethod
def from_dict(
@@ -340,10 +345,11 @@ class PipelockRoutePolicy:
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
d = _as_json_object(raw, label)
for k in d:
if k not in ("tls_passthrough",):
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
die(
f"{label} has unknown key {k!r}; "
f"only 'tls_passthrough' is accepted"
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
f"are accepted"
)
tls_passthrough_raw = d.get("tls_passthrough", False)
if not isinstance(tls_passthrough_raw, bool):
@@ -351,7 +357,31 @@ class PipelockRoutePolicy:
f"{label}.tls_passthrough must be a boolean "
f"(was {type(tls_passthrough_raw).__name__})"
)
return cls(TlsPassthrough=tls_passthrough_raw)
ssrf_raw = d.get("ssrf_ip_allowlist", [])
if not isinstance(ssrf_raw, list):
die(
f"{label}.ssrf_ip_allowlist must be an array "
f"(was {type(ssrf_raw).__name__})"
)
ssrf_ip_allowlist: list[str] = []
for j, item in enumerate(ssrf_raw):
if not isinstance(item, str) or not item:
die(
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
f"string (was {type(item).__name__})"
)
try:
ipaddress.ip_network(item, strict=False)
except ValueError as e:
die(
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
f"or CIDR (was {item!r}): {e}"
)
ssrf_ip_allowlist.append(item)
return cls(
TlsPassthrough=tls_passthrough_raw,
SsrfIpAllowlist=tuple(ssrf_ip_allowlist),
)
@dataclass(frozen=True)