diff --git a/bot_bottle/pipelock.py b/bot_bottle/pipelock.py index fcab300..0443d31 100644 --- a/bot_bottle/pipelock.py +++ b/bot_bottle/pipelock.py @@ -19,7 +19,6 @@ from __future__ import annotations from dataclasses import dataclass from pathlib import Path -from typing import cast from .egress import EGRESS_HOSTNAME, EgressRoute, egress_routes_for_bottle from .supervise import SUPERVISE_HOSTNAME @@ -223,6 +222,180 @@ def pipelock_build_config( return cfg +_PIPELOCK_TOP_LEVEL_KEYS = { + "version", + "mode", + "enforce", + "api_allowlist", + "seed_phrase_detection", + "forward_proxy", + "dlp", + "request_body_scanning", + "tls_interception", + "ssrf", +} + + +def _pipelock_render_error(section: str, key: str, expected: str) -> ValueError: + return ValueError( + f"pipelock_render_yaml: {section}.{key} must be {expected}" + ) + + +def _reject_unknown_keys( + section: str, + obj: dict[str, object], + allowed: set[str], +) -> None: + for key in sorted(set(obj) - allowed): + raise ValueError(f"pipelock_render_yaml: {section}.{key} is unsupported") + + +def _required_dict( + obj: dict[str, object], + section: str, + key: str, +) -> dict[str, object]: + value = obj.get(key) + if not isinstance(value, dict): + raise _pipelock_render_error(section, key, "a mapping") + return value + + +def _required_bool(obj: dict[str, object], section: str, key: str) -> bool: + value = obj.get(key) + if not isinstance(value, bool): + raise _pipelock_render_error(section, key, "a boolean") + return value + + +def _required_int(obj: dict[str, object], section: str, key: str) -> int: + value = obj.get(key) + if isinstance(value, bool) or not isinstance(value, int): + raise _pipelock_render_error(section, key, "an integer") + return value + + +def _required_str(obj: dict[str, object], section: str, key: str) -> str: + value = obj.get(key) + if not isinstance(value, str): + raise _pipelock_render_error(section, key, "a string") + return value + + +def _required_str_list( + obj: dict[str, object], + section: str, + key: str, +) -> list[str]: + value = obj.get(key) + if not isinstance(value, list) or not all(isinstance(v, str) for v in value): + raise _pipelock_render_error(section, key, "a list of strings") + return value + + +def _optional_str_list( + obj: dict[str, object], + section: str, + key: str, +) -> list[str]: + if key not in obj: + return [] + return _required_str_list(obj, section, key) + + +def _optional_bool( + obj: dict[str, object], + section: str, + key: str, +) -> bool | None: + if key not in obj: + return None + return _required_bool(obj, section, key) + + +def _optional_str( + obj: dict[str, object], + section: str, + key: str, +) -> str | None: + if key not in obj: + return None + return _required_str(obj, section, key) + + +def _validate_pipelock_render_config(cfg: dict[str, object]) -> dict[str, object]: + _reject_unknown_keys("config", cfg, _PIPELOCK_TOP_LEVEL_KEYS) + normalized: dict[str, object] = { + "version": _required_int(cfg, "config", "version"), + "mode": _required_str(cfg, "config", "mode"), + "enforce": _required_bool(cfg, "config", "enforce"), + "api_allowlist": _required_str_list(cfg, "config", "api_allowlist"), + } + + if "seed_phrase_detection" in cfg: + spd = _required_dict(cfg, "config", "seed_phrase_detection") + _reject_unknown_keys("seed_phrase_detection", spd, {"enabled"}) + normalized["seed_phrase_detection"] = { + "enabled": _required_bool(spd, "seed_phrase_detection", "enabled"), + } + + fp = _required_dict(cfg, "config", "forward_proxy") + _reject_unknown_keys("forward_proxy", fp, {"enabled"}) + normalized["forward_proxy"] = { + "enabled": _required_bool(fp, "forward_proxy", "enabled"), + } + + dlp = _required_dict(cfg, "config", "dlp") + _reject_unknown_keys("dlp", dlp, {"include_defaults", "scan_env"}) + normalized["dlp"] = { + "include_defaults": _required_bool(dlp, "dlp", "include_defaults"), + "scan_env": _required_bool(dlp, "dlp", "scan_env"), + } + + rbs = _required_dict(cfg, "config", "request_body_scanning") + _reject_unknown_keys( + "request_body_scanning", + rbs, + {"action", "scan_headers", "header_mode"}, + ) + normalized_rbs: dict[str, object] = { + "action": _required_str(rbs, "request_body_scanning", "action"), + } + scan_headers = _optional_bool(rbs, "request_body_scanning", "scan_headers") + if scan_headers is not None: + normalized_rbs["scan_headers"] = scan_headers + header_mode = _optional_str(rbs, "request_body_scanning", "header_mode") + if header_mode is not None: + normalized_rbs["header_mode"] = header_mode + normalized["request_body_scanning"] = normalized_rbs + + if "tls_interception" in cfg: + tls = _required_dict(cfg, "config", "tls_interception") + _reject_unknown_keys( + "tls_interception", + tls, + {"enabled", "ca_cert", "ca_key", "passthrough_domains"}, + ) + normalized["tls_interception"] = { + "enabled": _required_bool(tls, "tls_interception", "enabled"), + "ca_cert": _required_str(tls, "tls_interception", "ca_cert"), + "ca_key": _required_str(tls, "tls_interception", "ca_key"), + "passthrough_domains": _optional_str_list( + tls, "tls_interception", "passthrough_domains", + ), + } + + if "ssrf" in cfg: + ssrf = _required_dict(cfg, "config", "ssrf") + _reject_unknown_keys("ssrf", ssrf, {"ip_allowlist"}) + normalized["ssrf"] = { + "ip_allowlist": _required_str_list(ssrf, "ssrf", "ip_allowlist"), + } + + return normalized + + def pipelock_render_yaml(cfg: dict[str, object]) -> str: """Render a pipelock config dict (as produced by `pipelock_build_config`) as YAML. Hand-rolled so we don't take a @@ -230,31 +403,38 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str: def _bool(b: object) -> str: return "true" if b else "false" + cfg = _validate_pipelock_render_config(cfg) lines: list[str] = [] lines.append(f"version: {cfg['version']}") lines.append(f"mode: {cfg['mode']}") lines.append(f"enforce: {_bool(cfg['enforce'])}") lines.append("") lines.append("api_allowlist:") - for h in cast(list[str], cfg["api_allowlist"]): + api_allowlist = cfg["api_allowlist"] + assert isinstance(api_allowlist, list) + for h in api_allowlist: lines.append(f' - "{h}"') lines.append("") if "seed_phrase_detection" in cfg: lines.append("seed_phrase_detection:") - spd = cast(dict[str, object], cfg["seed_phrase_detection"]) + spd = cfg["seed_phrase_detection"] + assert isinstance(spd, dict) lines.append(f" enabled: {_bool(spd['enabled'])}") lines.append("") lines.append("forward_proxy:") - fp = cast(dict[str, object], cfg["forward_proxy"]) + fp = cfg["forward_proxy"] + assert isinstance(fp, dict) lines.append(f" enabled: {_bool(fp['enabled'])}") lines.append("") lines.append("dlp:") - dlp = cast(dict[str, object], cfg["dlp"]) + dlp = cfg["dlp"] + assert isinstance(dlp, dict) lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}") lines.append(f" scan_env: {_bool(dlp['scan_env'])}") lines.append("") lines.append("request_body_scanning:") - rbs = cast(dict[str, object], cfg["request_body_scanning"]) + rbs = cfg["request_body_scanning"] + assert isinstance(rbs, dict) lines.append(f' action: "{rbs["action"]}"') if "scan_headers" in rbs: lines.append(f" scan_headers: {_bool(rbs['scan_headers'])}") @@ -263,11 +443,13 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str: if "tls_interception" in cfg: lines.append("") lines.append("tls_interception:") - tls = cast(dict[str, object], cfg["tls_interception"]) + tls = cfg["tls_interception"] + assert isinstance(tls, dict) lines.append(f" enabled: {_bool(tls['enabled'])}") lines.append(f' ca_cert: "{tls["ca_cert"]}"') lines.append(f' ca_key: "{tls["ca_key"]}"') - passthrough = cast(list[str], tls.get("passthrough_domains", [])) + passthrough = tls["passthrough_domains"] + assert isinstance(passthrough, list) if passthrough: lines.append(" passthrough_domains:") for d in passthrough: @@ -275,9 +457,12 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str: if "ssrf" in cfg: lines.append("") lines.append("ssrf:") - ssrf = cast(dict[str, object], cfg["ssrf"]) + ssrf = cfg["ssrf"] + assert isinstance(ssrf, dict) lines.append(" ip_allowlist:") - for ip in cast(list[str], ssrf["ip_allowlist"]): + ip_allowlist = ssrf["ip_allowlist"] + assert isinstance(ip_allowlist, list) + for ip in ip_allowlist: lines.append(f' - "{ip}"') return "\n".join(lines) + "\n" diff --git a/docs/prds/0037-pipelock-yaml-render-contract.md b/docs/prds/0037-pipelock-yaml-render-contract.md index 3406f72..352d7d7 100644 --- a/docs/prds/0037-pipelock-yaml-render-contract.md +++ b/docs/prds/0037-pipelock-yaml-render-contract.md @@ -73,6 +73,15 @@ rendering a section, validate that required keys exist with the expected primitive/list/dict types. Missing or unsupported shapes should raise a clear `ValueError` naming the section and key. +The supported top-level shape is `version`, `mode`, `enforce`, +`api_allowlist`, `seed_phrase_detection`, `forward_proxy`, `dlp`, +`request_body_scanning`, `tls_interception`, and `ssrf`. Required sections are +validated before rendering; optional sections keep the current omission +behavior. `request_body_scanning.scan_headers`, +`request_body_scanning.header_mode`, and +`tls_interception.passthrough_domains` remain optional for compatibility with +parsed running configs that only contain the older rendered subset. + Tests should cover both normal output and failure cases. Because the project is stdlib-only, semantic tests can use a small purpose-built parser for the exact rendered shape or compare rendered lines to values from the structured config @@ -101,6 +110,4 @@ Run: ## Open Questions -- Should malformed config errors be `ValueError`, matching current - `pipelock_build_config` validation, or a new internal exception type? Prefer - `ValueError` unless a caller needs to distinguish serializer errors. +None. diff --git a/tests/unit/test_pipelock_yaml.py b/tests/unit/test_pipelock_yaml.py index 1728242..e565694 100644 --- a/tests/unit/test_pipelock_yaml.py +++ b/tests/unit/test_pipelock_yaml.py @@ -19,6 +19,7 @@ from bot_bottle.pipelock import ( pipelock_build_config, pipelock_render_yaml, ) +from bot_bottle.yaml_subset import parse_yaml_subset from tests.fixtures import fixture_minimal @@ -158,6 +159,51 @@ class TestRenderAndWrite(unittest.TestCase): import shutil shutil.rmtree(self.out_dir, ignore_errors=True) + def assert_render_semantics_match(self, cfg: dict[str, object]) -> None: + parsed = parse_yaml_subset(pipelock_render_yaml(cfg)) + self.assertEqual(cfg["version"], parsed["version"]) + self.assertEqual(cfg["mode"], parsed["mode"]) + self.assertEqual(cfg["enforce"], parsed["enforce"]) + parsed_allowlist = parsed["api_allowlist"] + if cfg["api_allowlist"] == [] and parsed_allowlist is None: + parsed_allowlist = [] + self.assertEqual(cfg["api_allowlist"], parsed_allowlist) + self.assertEqual(cfg["forward_proxy"], parsed["forward_proxy"]) + self.assertEqual(cfg["dlp"], parsed["dlp"]) + self.assertEqual( + cfg["request_body_scanning"], + parsed["request_body_scanning"], + ) + if "seed_phrase_detection" in cfg: + self.assertEqual( + cfg["seed_phrase_detection"], + parsed["seed_phrase_detection"], + ) + else: + self.assertNotIn("seed_phrase_detection", parsed) + + if "tls_interception" in cfg: + expected_tls = cast(dict[str, object], cfg["tls_interception"]) + actual_tls = cast(dict[str, object], parsed["tls_interception"]) + self.assertEqual(expected_tls["enabled"], actual_tls["enabled"]) + self.assertEqual(expected_tls["ca_cert"], actual_tls["ca_cert"]) + self.assertEqual(expected_tls["ca_key"], actual_tls["ca_key"]) + expected_passthrough = expected_tls["passthrough_domains"] + if expected_passthrough: + self.assertEqual( + expected_passthrough, + actual_tls["passthrough_domains"], + ) + else: + self.assertNotIn("passthrough_domains", actual_tls) + else: + self.assertNotIn("tls_interception", parsed) + + if "ssrf" in cfg: + self.assertEqual(cfg["ssrf"], parsed["ssrf"]) + else: + self.assertNotIn("ssrf", parsed) + def test_render_emits_required_top_level_keys(self): """One render-level smoke check: the serialized YAML is plausibly the shape pipelock expects. We don't grep every key here — that's @@ -175,6 +221,67 @@ class TestRenderAndWrite(unittest.TestCase): self.assertNotIn("trusted_domains:", text) self.assertNotIn("ssrf:", text) + def test_render_semantics_match_minimal_config(self): + cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) + self.assert_render_semantics_match(cfg) + + def test_render_semantics_match_tls_with_empty_passthrough(self): + cfg = pipelock_build_config( + fixture_minimal().bottles["dev"], + ca_cert_path="/etc/pipelock-ca.pem", + ca_key_path="/etc/pipelock-ca-key.pem", + ) + self.assert_render_semantics_match(cfg) + + def test_render_semantics_match_all_optional_sections(self): + bottle = Manifest.from_json_obj({ + "bottles": {"dev": {"egress": {"routes": [ + {"host": "api.openai.com", + "pipelock": {"tls_passthrough": True}}, + {"host": "gitea.dideric.is", + "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}}, + ]}}}, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + }).bottles["dev"] + cfg = pipelock_build_config( + bottle, + ca_cert_path="/etc/pipelock-ca.pem", + ca_key_path="/etc/pipelock-ca-key.pem", + ssrf_ip_allowlist=("172.20.0.0/16",), + ) + self.assert_render_semantics_match(cfg) + + def test_render_rejects_missing_required_key(self): + cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) + del cfg["mode"] + with self.assertRaisesRegex(ValueError, r"config\.mode"): + pipelock_render_yaml(cfg) + + def test_render_rejects_wrong_section_type(self): + cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) + cfg["dlp"] = [] + with self.assertRaisesRegex(ValueError, r"config\.dlp.*mapping"): + pipelock_render_yaml(cfg) + + def test_render_rejects_wrong_list_item_type(self): + cfg = pipelock_build_config( + fixture_minimal().bottles["dev"], + ca_cert_path="/etc/pipelock-ca.pem", + ca_key_path="/etc/pipelock-ca-key.pem", + ) + tls = cast(dict[str, object], cfg["tls_interception"]) + tls["passthrough_domains"] = ["api.openai.com", 3] + with self.assertRaisesRegex( + ValueError, r"tls_interception\.passthrough_domains", + ): + pipelock_render_yaml(cfg) + + def test_render_rejects_unsupported_top_level_section(self): + cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) + cfg["trusted_domains"] = [] + with self.assertRaisesRegex(ValueError, r"config\.trusted_domains"): + pipelock_render_yaml(cfg) + def test_prepare_writes_file_at_mode_600(self): plan = PipelockProxy().prepare( fixture_minimal().bottles["dev"], "demo", self.out_dir