Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| dee3600400 | |||
| d90b04d343 | |||
| 8601c686f3 |
@@ -141,13 +141,15 @@ def egress_manifest_routes(
|
||||
routes are merged."""
|
||||
out: list[EgressRoute] = []
|
||||
for r in bottle.egress.routes:
|
||||
tls_pt = r.Pipelock.Config.get("tls_passthrough", False)
|
||||
tls_passthrough = tls_pt if isinstance(tls_pt, bool) else False
|
||||
out.append(EgressRoute(
|
||||
host=r.Host,
|
||||
path_allowlist=r.PathAllowlist,
|
||||
auth_scheme=r.AuthScheme,
|
||||
token_ref=r.TokenRef,
|
||||
roles=r.Role,
|
||||
tls_passthrough=r.Pipelock.TlsPassthrough,
|
||||
tls_passthrough=tls_passthrough,
|
||||
))
|
||||
return tuple(out)
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ipaddress
|
||||
from dataclasses import dataclass, field
|
||||
from typing import cast
|
||||
|
||||
@@ -43,17 +42,18 @@ def validate_egress_routes(
|
||||
class PipelockRoutePolicy:
|
||||
"""Per-route pipelock policy overrides.
|
||||
|
||||
`TlsPassthrough` adds the route host to pipelock's
|
||||
`tls_interception.passthrough_domains`, so pipelock still enforces
|
||||
the hostname allowlist but does not MITM/decrypt request bodies or
|
||||
headers for that host.
|
||||
Stores raw pipelock configuration that's passed through to the
|
||||
pipelock sidecar. Pipelock validates all config options, so
|
||||
bot-bottle forwards manifest settings without coercion or strict
|
||||
validation. Supported options include:
|
||||
|
||||
`SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF
|
||||
allowlist for private/internal destinations behind this route.
|
||||
- `tls_passthrough`: bool — skip TLS MITM for this host
|
||||
- `ssrf_ip_allowlist`: list of CIDR/IP — allow private destinations
|
||||
- `skip_scan_for_extensions`: list of file extensions to skip DLP
|
||||
scanning for (e.g., [".whl", ".tar.gz"])
|
||||
"""
|
||||
|
||||
TlsPassthrough: bool = False
|
||||
SsrfIpAllowlist: tuple[str, ...] = ()
|
||||
Config: dict[str, object] = field(default_factory=dict)
|
||||
|
||||
@classmethod
|
||||
def from_dict(
|
||||
@@ -61,44 +61,7 @@ class PipelockRoutePolicy:
|
||||
) -> "PipelockRoutePolicy":
|
||||
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
|
||||
d = as_json_object(raw, label)
|
||||
for k in d:
|
||||
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
|
||||
raise ManifestError(
|
||||
f"{label} has unknown key {k!r}; "
|
||||
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
|
||||
f"are accepted"
|
||||
)
|
||||
tls_passthrough_raw = d.get("tls_passthrough", False)
|
||||
if not isinstance(tls_passthrough_raw, bool):
|
||||
raise ManifestError(
|
||||
f"{label}.tls_passthrough must be a boolean "
|
||||
f"(was {type(tls_passthrough_raw).__name__})"
|
||||
)
|
||||
ssrf_raw = d.get("ssrf_ip_allowlist", [])
|
||||
if not isinstance(ssrf_raw, list):
|
||||
raise ManifestError(
|
||||
f"{label}.ssrf_ip_allowlist must be an array "
|
||||
f"(was {type(ssrf_raw).__name__})"
|
||||
)
|
||||
ssrf_ip_allowlist: list[str] = []
|
||||
for j, item in enumerate(ssrf_raw):
|
||||
if not isinstance(item, str) or not item:
|
||||
raise ManifestError(
|
||||
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
|
||||
f"string (was {type(item).__name__})"
|
||||
)
|
||||
try:
|
||||
ipaddress.ip_network(item, strict=False)
|
||||
except ValueError as e:
|
||||
raise ManifestError(
|
||||
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
|
||||
f"or CIDR (was {item!r}): {e}"
|
||||
) from e
|
||||
ssrf_ip_allowlist.append(item)
|
||||
return cls(
|
||||
TlsPassthrough=tls_passthrough_raw,
|
||||
SsrfIpAllowlist=tuple(ssrf_ip_allowlist),
|
||||
)
|
||||
return cls(Config=d)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
||||
+13
-1
@@ -132,7 +132,10 @@ def pipelock_effective_ssrf_ip_allowlist(
|
||||
"""
|
||||
seen: dict[str, None] = {ip: None for ip in extra}
|
||||
for route in bottle.egress.routes:
|
||||
for ip in route.Pipelock.SsrfIpAllowlist:
|
||||
ssrf_raw = route.Pipelock.Config.get("ssrf_ip_allowlist", [])
|
||||
if isinstance(ssrf_raw, list):
|
||||
for ip in ssrf_raw:
|
||||
if isinstance(ip, str):
|
||||
seen.setdefault(ip, None)
|
||||
return sorted(seen.keys())
|
||||
|
||||
@@ -220,6 +223,15 @@ def pipelock_build_config(
|
||||
)
|
||||
if effective_ssrf_ip_allowlist:
|
||||
cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist}
|
||||
|
||||
# Merge per-route pipelock config (e.g., response_body_scanning settings).
|
||||
# Routes can specify arbitrary pipelock options that apply globally.
|
||||
for route in bottle.egress.routes:
|
||||
for key, value in route.Pipelock.Config.items():
|
||||
if key not in ("tls_passthrough", "ssrf_ip_allowlist"):
|
||||
if key not in cfg:
|
||||
cfg[key] = value
|
||||
|
||||
return cfg
|
||||
|
||||
|
||||
|
||||
@@ -225,7 +225,7 @@ class TestPipelockPolicy(unittest.TestCase):
|
||||
"host": "api.openai.com",
|
||||
"pipelock": {"tls_passthrough": True},
|
||||
}])
|
||||
self.assertTrue(b.egress.routes[0].Pipelock.TlsPassthrough)
|
||||
self.assertTrue(b.egress.routes[0].Pipelock.Config["tls_passthrough"])
|
||||
|
||||
def test_ssrf_ip_allowlist_route_policy(self):
|
||||
b = _bottle([{
|
||||
@@ -233,44 +233,28 @@ class TestPipelockPolicy(unittest.TestCase):
|
||||
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]},
|
||||
}])
|
||||
self.assertEqual(
|
||||
("100.78.141.42/32",),
|
||||
b.egress.routes[0].Pipelock.SsrfIpAllowlist,
|
||||
["100.78.141.42/32"],
|
||||
b.egress.routes[0].Pipelock.Config["ssrf_ip_allowlist"],
|
||||
)
|
||||
|
||||
def test_tls_passthrough_defaults_false(self):
|
||||
def test_skip_scan_for_extensions_route_policy(self):
|
||||
b = _bottle([{
|
||||
"host": "files.pythonhosted.org",
|
||||
"pipelock": {"skip_scan_for_extensions": [".whl", ".tar.gz"]},
|
||||
}])
|
||||
self.assertEqual(
|
||||
[".whl", ".tar.gz"],
|
||||
b.egress.routes[0].Pipelock.Config["skip_scan_for_extensions"],
|
||||
)
|
||||
|
||||
def test_empty_config_when_pipelock_omitted(self):
|
||||
b = _bottle([{"host": "api.openai.com"}])
|
||||
self.assertFalse(b.egress.routes[0].Pipelock.TlsPassthrough)
|
||||
self.assertEqual((), b.egress.routes[0].Pipelock.SsrfIpAllowlist)
|
||||
self.assertEqual({}, b.egress.routes[0].Pipelock.Config)
|
||||
|
||||
def test_pipelock_policy_must_be_object(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
_bottle([{"host": "x.example", "pipelock": True}])
|
||||
|
||||
def test_tls_passthrough_must_be_bool(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
_bottle([{
|
||||
"host": "x.example",
|
||||
"pipelock": {"tls_passthrough": "yes"},
|
||||
}])
|
||||
|
||||
def test_ssrf_ip_allowlist_must_be_array(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
_bottle([{
|
||||
"host": "x.example",
|
||||
"pipelock": {"ssrf_ip_allowlist": "100.78.141.42/32"},
|
||||
}])
|
||||
|
||||
def test_ssrf_ip_allowlist_items_must_be_cidr_or_ip(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
_bottle([{
|
||||
"host": "x.example",
|
||||
"pipelock": {"ssrf_ip_allowlist": ["not-an-ip"]},
|
||||
}])
|
||||
|
||||
def test_unknown_pipelock_key_rejected(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
_bottle([{"host": "x.example", "pipelock": {"wat": True}}])
|
||||
|
||||
|
||||
class TestRouteValidation(unittest.TestCase):
|
||||
def test_duplicate_hosts_rejected(self):
|
||||
|
||||
Reference in New Issue
Block a user