test: update PipelockRoutePolicy tests for Config dict design
lint / lint (push) Successful in 1m29s
test / unit (pull_request) Successful in 37s
test / integration (pull_request) Successful in 49s

Replace typed-attribute assertions (TlsPassthrough, SsrfIpAllowlist)
with Config dict lookups, drop the four strict-validation tests that
were intentionally removed in the refactor, and add a
skip_scan_for_extensions test to cover the PR's stated new feature.
This commit is contained in:
2026-06-04 17:22:44 +00:00
parent d90b04d343
commit dee3600400
+15 -31
View File
@@ -225,7 +225,7 @@ class TestPipelockPolicy(unittest.TestCase):
"host": "api.openai.com", "host": "api.openai.com",
"pipelock": {"tls_passthrough": True}, "pipelock": {"tls_passthrough": True},
}]) }])
self.assertTrue(b.egress.routes[0].Pipelock.TlsPassthrough) self.assertTrue(b.egress.routes[0].Pipelock.Config["tls_passthrough"])
def test_ssrf_ip_allowlist_route_policy(self): def test_ssrf_ip_allowlist_route_policy(self):
b = _bottle([{ b = _bottle([{
@@ -233,44 +233,28 @@ class TestPipelockPolicy(unittest.TestCase):
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}, "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]},
}]) }])
self.assertEqual( self.assertEqual(
("100.78.141.42/32",), ["100.78.141.42/32"],
b.egress.routes[0].Pipelock.SsrfIpAllowlist, b.egress.routes[0].Pipelock.Config["ssrf_ip_allowlist"],
) )
def test_tls_passthrough_defaults_false(self): def test_skip_scan_for_extensions_route_policy(self):
b = _bottle([{
"host": "files.pythonhosted.org",
"pipelock": {"skip_scan_for_extensions": [".whl", ".tar.gz"]},
}])
self.assertEqual(
[".whl", ".tar.gz"],
b.egress.routes[0].Pipelock.Config["skip_scan_for_extensions"],
)
def test_empty_config_when_pipelock_omitted(self):
b = _bottle([{"host": "api.openai.com"}]) b = _bottle([{"host": "api.openai.com"}])
self.assertFalse(b.egress.routes[0].Pipelock.TlsPassthrough) self.assertEqual({}, b.egress.routes[0].Pipelock.Config)
self.assertEqual((), b.egress.routes[0].Pipelock.SsrfIpAllowlist)
def test_pipelock_policy_must_be_object(self): def test_pipelock_policy_must_be_object(self):
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": True}]) _bottle([{"host": "x.example", "pipelock": True}])
def test_tls_passthrough_must_be_bool(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"tls_passthrough": "yes"},
}])
def test_ssrf_ip_allowlist_must_be_array(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": "100.78.141.42/32"},
}])
def test_ssrf_ip_allowlist_items_must_be_cidr_or_ip(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": ["not-an-ip"]},
}])
def test_unknown_pipelock_key_rejected(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": {"wat": True}}])
class TestRouteValidation(unittest.TestCase): class TestRouteValidation(unittest.TestCase):
def test_duplicate_hosts_rejected(self): def test_duplicate_hosts_rejected(self):