feat: support pipelock skip_scan_for_extensions config #191

Closed
didericis-claude wants to merge 3 commits from feat/pipelock-skip-scan-extensions into main
4 changed files with 42 additions and 81 deletions
+3 -1
View File
@@ -141,13 +141,15 @@ def egress_manifest_routes(
routes are merged.""" routes are merged."""
out: list[EgressRoute] = [] out: list[EgressRoute] = []
for r in bottle.egress.routes: for r in bottle.egress.routes:
tls_pt = r.Pipelock.Config.get("tls_passthrough", False)
tls_passthrough = tls_pt if isinstance(tls_pt, bool) else False
out.append(EgressRoute( out.append(EgressRoute(
host=r.Host, host=r.Host,
path_allowlist=r.PathAllowlist, path_allowlist=r.PathAllowlist,
auth_scheme=r.AuthScheme, auth_scheme=r.AuthScheme,
token_ref=r.TokenRef, token_ref=r.TokenRef,
roles=r.Role, roles=r.Role,
tls_passthrough=r.Pipelock.TlsPassthrough, tls_passthrough=tls_passthrough,
)) ))
return tuple(out) return tuple(out)
+10 -47
View File
@@ -2,7 +2,6 @@
from __future__ import annotations from __future__ import annotations
import ipaddress
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import cast from typing import cast
@@ -43,17 +42,18 @@ def validate_egress_routes(
class PipelockRoutePolicy: class PipelockRoutePolicy:
"""Per-route pipelock policy overrides. """Per-route pipelock policy overrides.
`TlsPassthrough` adds the route host to pipelock's Stores raw pipelock configuration that's passed through to the
`tls_interception.passthrough_domains`, so pipelock still enforces pipelock sidecar. Pipelock validates all config options, so
the hostname allowlist but does not MITM/decrypt request bodies or bot-bottle forwards manifest settings without coercion or strict
headers for that host. validation. Supported options include:
`SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF - `tls_passthrough`: bool — skip TLS MITM for this host
allowlist for private/internal destinations behind this route. - `ssrf_ip_allowlist`: list of CIDR/IP — allow private destinations
- `skip_scan_for_extensions`: list of file extensions to skip DLP
scanning for (e.g., [".whl", ".tar.gz"])
""" """
TlsPassthrough: bool = False Config: dict[str, object] = field(default_factory=dict)
SsrfIpAllowlist: tuple[str, ...] = ()
@classmethod @classmethod
def from_dict( def from_dict(
@@ -61,44 +61,7 @@ class PipelockRoutePolicy:
) -> "PipelockRoutePolicy": ) -> "PipelockRoutePolicy":
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock" label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
d = as_json_object(raw, label) d = as_json_object(raw, label)
for k in d: return cls(Config=d)
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
raise ManifestError(
f"{label} has unknown key {k!r}; "
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
f"are accepted"
)
tls_passthrough_raw = d.get("tls_passthrough", False)
if not isinstance(tls_passthrough_raw, bool):
raise ManifestError(
f"{label}.tls_passthrough must be a boolean "
f"(was {type(tls_passthrough_raw).__name__})"
)
ssrf_raw = d.get("ssrf_ip_allowlist", [])
if not isinstance(ssrf_raw, list):
raise ManifestError(
f"{label}.ssrf_ip_allowlist must be an array "
f"(was {type(ssrf_raw).__name__})"
)
ssrf_ip_allowlist: list[str] = []
for j, item in enumerate(ssrf_raw):
if not isinstance(item, str) or not item:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
f"string (was {type(item).__name__})"
)
try:
ipaddress.ip_network(item, strict=False)
except ValueError as e:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
f"or CIDR (was {item!r}): {e}"
) from e
ssrf_ip_allowlist.append(item)
return cls(
TlsPassthrough=tls_passthrough_raw,
SsrfIpAllowlist=tuple(ssrf_ip_allowlist),
)
@dataclass(frozen=True) @dataclass(frozen=True)
+14 -2
View File
@@ -132,8 +132,11 @@ def pipelock_effective_ssrf_ip_allowlist(
""" """
seen: dict[str, None] = {ip: None for ip in extra} seen: dict[str, None] = {ip: None for ip in extra}
for route in bottle.egress.routes: for route in bottle.egress.routes:
for ip in route.Pipelock.SsrfIpAllowlist: ssrf_raw = route.Pipelock.Config.get("ssrf_ip_allowlist", [])
seen.setdefault(ip, None) if isinstance(ssrf_raw, list):
for ip in ssrf_raw:
if isinstance(ip, str):
seen.setdefault(ip, None)
return sorted(seen.keys()) return sorted(seen.keys())
@@ -220,6 +223,15 @@ def pipelock_build_config(
) )
if effective_ssrf_ip_allowlist: if effective_ssrf_ip_allowlist:
cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist} cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist}
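# Merge the remaining per-route pipelock options (e.g., response_body_scanning
# settings) into the top-level config. tls_passthrough and ssrf_ip_allowlist
# are skipped here. Merged options apply globally, not just to the route's
# host, and a key already present in cfg is never overwritten, so the first
# route to set a given key wins.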
for route in bottle.egress.routes:
for key, value in route.Pipelock.Config.items():
if key not in ("tls_passthrough", "ssrf_ip_allowlist"):
if key not in cfg:
cfg[key] = value
return cfg return cfg
+15 -31
View File
@@ -225,7 +225,7 @@ class TestPipelockPolicy(unittest.TestCase):
"host": "api.openai.com", "host": "api.openai.com",
"pipelock": {"tls_passthrough": True}, "pipelock": {"tls_passthrough": True},
}]) }])
self.assertTrue(b.egress.routes[0].Pipelock.TlsPassthrough) self.assertTrue(b.egress.routes[0].Pipelock.Config["tls_passthrough"])
def test_ssrf_ip_allowlist_route_policy(self): def test_ssrf_ip_allowlist_route_policy(self):
b = _bottle([{ b = _bottle([{
@@ -233,44 +233,28 @@ class TestPipelockPolicy(unittest.TestCase):
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}, "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]},
}]) }])
self.assertEqual( self.assertEqual(
("100.78.141.42/32",), ["100.78.141.42/32"],
b.egress.routes[0].Pipelock.SsrfIpAllowlist, b.egress.routes[0].Pipelock.Config["ssrf_ip_allowlist"],
) )
def test_tls_passthrough_defaults_false(self): def test_skip_scan_for_extensions_route_policy(self):
b = _bottle([{
"host": "files.pythonhosted.org",
"pipelock": {"skip_scan_for_extensions": [".whl", ".tar.gz"]},
}])
self.assertEqual(
[".whl", ".tar.gz"],
b.egress.routes[0].Pipelock.Config["skip_scan_for_extensions"],
)
def test_empty_config_when_pipelock_omitted(self):
b = _bottle([{"host": "api.openai.com"}]) b = _bottle([{"host": "api.openai.com"}])
self.assertFalse(b.egress.routes[0].Pipelock.TlsPassthrough) self.assertEqual({}, b.egress.routes[0].Pipelock.Config)
self.assertEqual((), b.egress.routes[0].Pipelock.SsrfIpAllowlist)
def test_pipelock_policy_must_be_object(self): def test_pipelock_policy_must_be_object(self):
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": True}]) _bottle([{"host": "x.example", "pipelock": True}])
def test_tls_passthrough_must_be_bool(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"tls_passthrough": "yes"},
}])
def test_ssrf_ip_allowlist_must_be_array(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": "100.78.141.42/32"},
}])
def test_ssrf_ip_allowlist_items_must_be_cidr_or_ip(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": ["not-an-ip"]},
}])
def test_unknown_pipelock_key_rejected(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": {"wat": True}}])
class TestRouteValidation(unittest.TestCase): class TestRouteValidation(unittest.TestCase):
def test_duplicate_hosts_rejected(self): def test_duplicate_hosts_rejected(self):