Compare commits

...

3 Commits

Author SHA1 Message Date
didericis-claude dee3600400 test: update PipelockRoutePolicy tests for Config dict design
lint / lint (push) Successful in 1m29s
test / unit (pull_request) Successful in 37s
test / integration (pull_request) Successful in 49s
Replace typed-attribute assertions (TlsPassthrough, SsrfIpAllowlist)
with Config dict lookups, drop the four strict-validation tests that
were intentionally removed in the refactor, and add a
skip_scan_for_extensions test to cover the PR's stated new feature.
2026-06-04 17:22:44 +00:00
didericis d90b04d343 feat: add generic pipelock config merging for future extensibility
lint / lint (push) Failing after 1m26s
test / unit (pull_request) Failing after 36s
test / integration (pull_request) Successful in 44s
- Merge arbitrary pipelock settings from routes into global config
- Allows routes to configure new pipelock options without code changes
- Special-case tls_passthrough and ssrf_ip_allowlist (already aggregated)

Note: Pipelock doesn't currently support per-path/per-host response
scanning rules or response size limits, so response_body_scanning config
is not yet usable. For now, use tls_passthrough for binary download hosts.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-04 13:14:59 -04:00
didericis 8601c686f3 feat: forward pipelock config dict instead of parsing individual fields
lint / lint (push) Failing after 1m32s
test / unit (pull_request) Failing after 37s
test / integration (pull_request) Successful in 42s
- Change PipelockRoutePolicy to store raw pipelock config dict instead
  of individual coerced fields (TlsPassthrough, SsrfIpAllowlist)
- Update pipelock.py and egress.py to extract values from Config dict
- Simplifies manifest validation: pipelock handles its own schema
- Enables new pipelock options like skip_scan_for_extensions without
  updating bot-bottle code

This allows bottles to configure pipelock directly, e.g.:

  pipelock:
    skip_scan_for_extensions: [".whl", ".tar.gz"]

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-04 13:04:12 -04:00
4 changed files with 42 additions and 81 deletions
+3 -1
View File
@@ -141,13 +141,15 @@ def egress_manifest_routes(
routes are merged.""" routes are merged."""
out: list[EgressRoute] = [] out: list[EgressRoute] = []
for r in bottle.egress.routes: for r in bottle.egress.routes:
tls_pt = r.Pipelock.Config.get("tls_passthrough", False)
tls_passthrough = tls_pt if isinstance(tls_pt, bool) else False
out.append(EgressRoute( out.append(EgressRoute(
host=r.Host, host=r.Host,
path_allowlist=r.PathAllowlist, path_allowlist=r.PathAllowlist,
auth_scheme=r.AuthScheme, auth_scheme=r.AuthScheme,
token_ref=r.TokenRef, token_ref=r.TokenRef,
roles=r.Role, roles=r.Role,
tls_passthrough=r.Pipelock.TlsPassthrough, tls_passthrough=tls_passthrough,
)) ))
return tuple(out) return tuple(out)
+10 -47
View File
@@ -2,7 +2,6 @@
from __future__ import annotations from __future__ import annotations
import ipaddress
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import cast from typing import cast
@@ -43,17 +42,18 @@ def validate_egress_routes(
class PipelockRoutePolicy: class PipelockRoutePolicy:
"""Per-route pipelock policy overrides. """Per-route pipelock policy overrides.
`TlsPassthrough` adds the route host to pipelock's Stores raw pipelock configuration that's passed through to the
`tls_interception.passthrough_domains`, so pipelock still enforces pipelock sidecar. Pipelock validates all config options, so
the hostname allowlist but does not MITM/decrypt request bodies or bot-bottle forwards manifest settings without coercion or strict
headers for that host. validation. Supported options include:
`SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF - `tls_passthrough`: bool — skip TLS MITM for this host
allowlist for private/internal destinations behind this route. - `ssrf_ip_allowlist`: list of CIDR/IP — allow private destinations
- `skip_scan_for_extensions`: list of file extensions to skip DLP
scanning for (e.g., [".whl", ".tar.gz"])
""" """
TlsPassthrough: bool = False Config: dict[str, object] = field(default_factory=dict)
SsrfIpAllowlist: tuple[str, ...] = ()
@classmethod @classmethod
def from_dict( def from_dict(
@@ -61,44 +61,7 @@ class PipelockRoutePolicy:
) -> "PipelockRoutePolicy": ) -> "PipelockRoutePolicy":
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock" label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
d = as_json_object(raw, label) d = as_json_object(raw, label)
for k in d: return cls(Config=d)
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
raise ManifestError(
f"{label} has unknown key {k!r}; "
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
f"are accepted"
)
tls_passthrough_raw = d.get("tls_passthrough", False)
if not isinstance(tls_passthrough_raw, bool):
raise ManifestError(
f"{label}.tls_passthrough must be a boolean "
f"(was {type(tls_passthrough_raw).__name__})"
)
ssrf_raw = d.get("ssrf_ip_allowlist", [])
if not isinstance(ssrf_raw, list):
raise ManifestError(
f"{label}.ssrf_ip_allowlist must be an array "
f"(was {type(ssrf_raw).__name__})"
)
ssrf_ip_allowlist: list[str] = []
for j, item in enumerate(ssrf_raw):
if not isinstance(item, str) or not item:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
f"string (was {type(item).__name__})"
)
try:
ipaddress.ip_network(item, strict=False)
except ValueError as e:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
f"or CIDR (was {item!r}): {e}"
) from e
ssrf_ip_allowlist.append(item)
return cls(
TlsPassthrough=tls_passthrough_raw,
SsrfIpAllowlist=tuple(ssrf_ip_allowlist),
)
@dataclass(frozen=True) @dataclass(frozen=True)
+14 -2
View File
@@ -132,8 +132,11 @@ def pipelock_effective_ssrf_ip_allowlist(
""" """
seen: dict[str, None] = {ip: None for ip in extra} seen: dict[str, None] = {ip: None for ip in extra}
for route in bottle.egress.routes: for route in bottle.egress.routes:
for ip in route.Pipelock.SsrfIpAllowlist: ssrf_raw = route.Pipelock.Config.get("ssrf_ip_allowlist", [])
seen.setdefault(ip, None) if isinstance(ssrf_raw, list):
for ip in ssrf_raw:
if isinstance(ip, str):
seen.setdefault(ip, None)
return sorted(seen.keys()) return sorted(seen.keys())
@@ -220,6 +223,15 @@ def pipelock_build_config(
) )
if effective_ssrf_ip_allowlist: if effective_ssrf_ip_allowlist:
cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist} cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist}
# Merge per-route pipelock config (e.g., response_body_scanning settings).
# Routes can specify arbitrary pipelock options that apply globally.
for route in bottle.egress.routes:
for key, value in route.Pipelock.Config.items():
if key not in ("tls_passthrough", "ssrf_ip_allowlist"):
if key not in cfg:
cfg[key] = value
return cfg return cfg
+15 -31
View File
@@ -225,7 +225,7 @@ class TestPipelockPolicy(unittest.TestCase):
"host": "api.openai.com", "host": "api.openai.com",
"pipelock": {"tls_passthrough": True}, "pipelock": {"tls_passthrough": True},
}]) }])
self.assertTrue(b.egress.routes[0].Pipelock.TlsPassthrough) self.assertTrue(b.egress.routes[0].Pipelock.Config["tls_passthrough"])
def test_ssrf_ip_allowlist_route_policy(self): def test_ssrf_ip_allowlist_route_policy(self):
b = _bottle([{ b = _bottle([{
@@ -233,44 +233,28 @@ class TestPipelockPolicy(unittest.TestCase):
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}, "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]},
}]) }])
self.assertEqual( self.assertEqual(
("100.78.141.42/32",), ["100.78.141.42/32"],
b.egress.routes[0].Pipelock.SsrfIpAllowlist, b.egress.routes[0].Pipelock.Config["ssrf_ip_allowlist"],
) )
def test_tls_passthrough_defaults_false(self): def test_skip_scan_for_extensions_route_policy(self):
b = _bottle([{
"host": "files.pythonhosted.org",
"pipelock": {"skip_scan_for_extensions": [".whl", ".tar.gz"]},
}])
self.assertEqual(
[".whl", ".tar.gz"],
b.egress.routes[0].Pipelock.Config["skip_scan_for_extensions"],
)
def test_empty_config_when_pipelock_omitted(self):
b = _bottle([{"host": "api.openai.com"}]) b = _bottle([{"host": "api.openai.com"}])
self.assertFalse(b.egress.routes[0].Pipelock.TlsPassthrough) self.assertEqual({}, b.egress.routes[0].Pipelock.Config)
self.assertEqual((), b.egress.routes[0].Pipelock.SsrfIpAllowlist)
def test_pipelock_policy_must_be_object(self): def test_pipelock_policy_must_be_object(self):
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": True}]) _bottle([{"host": "x.example", "pipelock": True}])
def test_tls_passthrough_must_be_bool(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"tls_passthrough": "yes"},
}])
def test_ssrf_ip_allowlist_must_be_array(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": "100.78.141.42/32"},
}])
def test_ssrf_ip_allowlist_items_must_be_cidr_or_ip(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": ["not-an-ip"]},
}])
def test_unknown_pipelock_key_rejected(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": {"wat": True}}])
class TestRouteValidation(unittest.TestCase): class TestRouteValidation(unittest.TestCase):
def test_duplicate_hosts_rejected(self): def test_duplicate_hosts_rejected(self):