fix(pipelock): validate yaml render config

This commit is contained in:
2026-06-02 08:14:48 +00:00
parent 9185c145a1
commit 6e954da9b7
3 changed files with 312 additions and 13 deletions
+107
View File
@@ -19,6 +19,7 @@ from bot_bottle.pipelock import (
pipelock_build_config,
pipelock_render_yaml,
)
from bot_bottle.yaml_subset import parse_yaml_subset
from tests.fixtures import fixture_minimal
@@ -158,6 +159,51 @@ class TestRenderAndWrite(unittest.TestCase):
import shutil
shutil.rmtree(self.out_dir, ignore_errors=True)
def assert_render_semantics_match(self, cfg: dict[str, object]) -> None:
parsed = parse_yaml_subset(pipelock_render_yaml(cfg))
self.assertEqual(cfg["version"], parsed["version"])
self.assertEqual(cfg["mode"], parsed["mode"])
self.assertEqual(cfg["enforce"], parsed["enforce"])
parsed_allowlist = parsed["api_allowlist"]
if cfg["api_allowlist"] == [] and parsed_allowlist is None:
parsed_allowlist = []
self.assertEqual(cfg["api_allowlist"], parsed_allowlist)
self.assertEqual(cfg["forward_proxy"], parsed["forward_proxy"])
self.assertEqual(cfg["dlp"], parsed["dlp"])
self.assertEqual(
cfg["request_body_scanning"],
parsed["request_body_scanning"],
)
if "seed_phrase_detection" in cfg:
self.assertEqual(
cfg["seed_phrase_detection"],
parsed["seed_phrase_detection"],
)
else:
self.assertNotIn("seed_phrase_detection", parsed)
if "tls_interception" in cfg:
expected_tls = cast(dict[str, object], cfg["tls_interception"])
actual_tls = cast(dict[str, object], parsed["tls_interception"])
self.assertEqual(expected_tls["enabled"], actual_tls["enabled"])
self.assertEqual(expected_tls["ca_cert"], actual_tls["ca_cert"])
self.assertEqual(expected_tls["ca_key"], actual_tls["ca_key"])
expected_passthrough = expected_tls["passthrough_domains"]
if expected_passthrough:
self.assertEqual(
expected_passthrough,
actual_tls["passthrough_domains"],
)
else:
self.assertNotIn("passthrough_domains", actual_tls)
else:
self.assertNotIn("tls_interception", parsed)
if "ssrf" in cfg:
self.assertEqual(cfg["ssrf"], parsed["ssrf"])
else:
self.assertNotIn("ssrf", parsed)
def test_render_emits_required_top_level_keys(self):
"""One render-level smoke check: the serialized YAML is plausibly
the shape pipelock expects. We don't grep every key here — that's
@@ -175,6 +221,67 @@ class TestRenderAndWrite(unittest.TestCase):
self.assertNotIn("trusted_domains:", text)
self.assertNotIn("ssrf:", text)
def test_render_semantics_match_minimal_config(self):
cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
self.assert_render_semantics_match(cfg)
def test_render_semantics_match_tls_with_empty_passthrough(self):
cfg = pipelock_build_config(
fixture_minimal().bottles["dev"],
ca_cert_path="/etc/pipelock-ca.pem",
ca_key_path="/etc/pipelock-ca-key.pem",
)
self.assert_render_semantics_match(cfg)
def test_render_semantics_match_all_optional_sections(self):
bottle = Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"routes": [
{"host": "api.openai.com",
"pipelock": {"tls_passthrough": True}},
{"host": "gitea.dideric.is",
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}},
]}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
cfg = pipelock_build_config(
bottle,
ca_cert_path="/etc/pipelock-ca.pem",
ca_key_path="/etc/pipelock-ca-key.pem",
ssrf_ip_allowlist=("172.20.0.0/16",),
)
self.assert_render_semantics_match(cfg)
def test_render_rejects_missing_required_key(self):
cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
del cfg["mode"]
with self.assertRaisesRegex(ValueError, r"config\.mode"):
pipelock_render_yaml(cfg)
def test_render_rejects_wrong_section_type(self):
cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
cfg["dlp"] = []
with self.assertRaisesRegex(ValueError, r"config\.dlp.*mapping"):
pipelock_render_yaml(cfg)
def test_render_rejects_wrong_list_item_type(self):
cfg = pipelock_build_config(
fixture_minimal().bottles["dev"],
ca_cert_path="/etc/pipelock-ca.pem",
ca_key_path="/etc/pipelock-ca-key.pem",
)
tls = cast(dict[str, object], cfg["tls_interception"])
tls["passthrough_domains"] = ["api.openai.com", 3]
with self.assertRaisesRegex(
ValueError, r"tls_interception\.passthrough_domains",
):
pipelock_render_yaml(cfg)
def test_render_rejects_unsupported_top_level_section(self):
cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
cfg["trusted_domains"] = []
with self.assertRaisesRegex(ValueError, r"config\.trusted_domains"):
pipelock_render_yaml(cfg)
def test_prepare_writes_file_at_mode_600(self):
plan = PipelockProxy().prepare(
fixture_minimal().bottles["dev"], "demo", self.out_dir