From 9185c145a1e4aa0db07910800c1559a680244147 Mon Sep 17 00:00:00 2001 From: codex Date: Tue, 2 Jun 2026 08:00:44 +0000 Subject: [PATCH 1/3] docs(prd): add pipelock yaml contract --- .../0037-pipelock-yaml-render-contract.md | 106 ++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 docs/prds/0037-pipelock-yaml-render-contract.md diff --git a/docs/prds/0037-pipelock-yaml-render-contract.md b/docs/prds/0037-pipelock-yaml-render-contract.md new file mode 100644 index 0000000..3406f72 --- /dev/null +++ b/docs/prds/0037-pipelock-yaml-render-contract.md @@ -0,0 +1,106 @@ +# PRD 0037: Pipelock YAML Render Contract + +- **Status:** Draft +- **Author:** didericis-codex +- **Created:** 2026-06-02 +- **Issue:** #130 + +## Summary + +Lock down the contract between `pipelock_build_config` and +`pipelock_render_yaml` so hand-rendered pipelock YAML stays aligned with the +structured config bot-bottle builds. Keep the stdlib-only renderer, but add +shape validation and semantic tests for every supported section. + +## Problem + +`bot_bottle/pipelock.py` builds a structured dict and then renders a fixed YAML +shape by hand. This avoids a runtime YAML dependency, but it also means the +renderer directly indexes expected keys. If `pipelock_build_config` adds, +renames, or conditionalizes a section, rendering can fail at runtime or emit +YAML that no longer matches the config semantics. + +Existing tests assert important rendered fragments, but they do not fully lock +the build/render contract or optional-section combinations. A mismatch here can +weaken DLP enforcement or break bottle launch after a future pipelock policy +change. + +## Goals / Success Criteria + +- Keep the renderer stdlib-only. +- Define the supported pipelock config shape in one place. +- Fail clearly when `pipelock_render_yaml` receives an unsupported or malformed + config shape. +- Add tests covering all supported sections: + - base allowlist and forward proxy. + - seed phrase detection toggle. + - DLP and request-body/header scanning. + - TLS interception and passthrough domains. + - SSRF IP allowlist. +- Add semantic tests that compare structured config values to rendered YAML + output without relying only on brittle substring assertions. +- Preserve current rendered YAML for existing configs unless a clearer failure + path requires an error message change. + +## Non-goals + +- No PyYAML or other runtime dependency. +- No change to pipelock policy defaults. +- No change to egress-to-pipelock topology. +- No change to pipelock image version or config schema beyond validation of the + shape bot-bottle already emits. + +## Scope + +In scope: + +- `bot_bottle/pipelock.py` render helpers and validation. +- Unit tests in `tests/unit/test_pipelock_yaml.py` and related focused + pipelock tests. +- Small helper functions for typed access to config sections, if useful. + +Out of scope: + +- Launch/backend changes. +- Integration tests that start a real pipelock container. +- Changing the manifest schema for route-level pipelock policy. + +## Design + +Treat `pipelock_render_yaml` as a serializer for the narrow config shape +produced by `pipelock_build_config`, not as a generic YAML renderer. Before +rendering a section, validate that required keys exist with the expected +primitive/list/dict types. Missing or unsupported shapes should raise a clear +`ValueError` naming the section and key. + +Tests should cover both normal output and failure cases. Because the project is +stdlib-only, semantic tests can use a small purpose-built parser for the exact +rendered shape or compare rendered lines to values from the structured config +through helper assertions. The goal is to detect drift between config dict and +YAML without adding a general YAML dependency. + +Optional sections should be exercised in combinations: + +- no TLS and no SSRF. +- TLS enabled with empty and non-empty passthrough domains. +- SSRF enabled with one or more IP/CIDR entries. +- all optional sections enabled together. + +## Testing Strategy + +- Extend `tests/unit/test_pipelock_yaml.py` with semantic assertions tying each + rendered section back to the config dict. +- Add malformed-config tests for missing required keys and wrong section types. +- Keep existing render fragment tests where they protect exact pipelock syntax. + +Run: + +- `python3 -m unittest tests.unit.test_pipelock_yaml` +- `python3 -m unittest tests.unit.test_pipelock_allowlist` +- `python3 -m unittest discover -s tests/unit` + +## Open Questions + +- Should malformed config errors be `ValueError`, matching current + `pipelock_build_config` validation, or a new internal exception type? Prefer + `ValueError` unless a caller needs to distinguish serializer errors. -- 2.52.0 From 6e954da9b78bbe49edec344bbee13700b8ccb7d6 Mon Sep 17 00:00:00 2001 From: codex Date: Tue, 2 Jun 2026 08:14:48 +0000 Subject: [PATCH 2/3] fix(pipelock): validate yaml render config --- bot_bottle/pipelock.py | 205 +++++++++++++++++- .../0037-pipelock-yaml-render-contract.md | 13 +- tests/unit/test_pipelock_yaml.py | 107 +++++++++ 3 files changed, 312 insertions(+), 13 deletions(-) diff --git a/bot_bottle/pipelock.py b/bot_bottle/pipelock.py index fcab300..0443d31 100644 --- a/bot_bottle/pipelock.py +++ b/bot_bottle/pipelock.py @@ -19,7 +19,6 @@ from __future__ import annotations from dataclasses import dataclass from pathlib import Path -from typing import cast from .egress import EGRESS_HOSTNAME, EgressRoute, egress_routes_for_bottle from .supervise import SUPERVISE_HOSTNAME @@ -223,6 +222,180 @@ def pipelock_build_config( return cfg +_PIPELOCK_TOP_LEVEL_KEYS = { + "version", + "mode", + "enforce", + "api_allowlist", + "seed_phrase_detection", + "forward_proxy", + "dlp", + "request_body_scanning", + "tls_interception", + "ssrf", +} + + +def _pipelock_render_error(section: str, key: str, expected: str) -> ValueError: + return ValueError( + f"pipelock_render_yaml: {section}.{key} must be {expected}" + ) + + +def _reject_unknown_keys( + section: str, + obj: dict[str, object], + allowed: set[str], +) -> None: + for key in sorted(set(obj) - allowed): + raise ValueError(f"pipelock_render_yaml: {section}.{key} is unsupported") + + +def _required_dict( + obj: dict[str, object], + section: str, + key: str, +) -> dict[str, object]: + value = obj.get(key) + if not isinstance(value, dict): + raise _pipelock_render_error(section, key, "a mapping") + return value + + +def _required_bool(obj: dict[str, object], section: str, key: str) -> bool: + value = obj.get(key) + if not isinstance(value, bool): + raise _pipelock_render_error(section, key, "a boolean") + return value + + +def _required_int(obj: dict[str, object], section: str, key: str) -> int: + value = obj.get(key) + if isinstance(value, bool) or not isinstance(value, int): + raise _pipelock_render_error(section, key, "an integer") + return value + + +def _required_str(obj: dict[str, object], section: str, key: str) -> str: + value = obj.get(key) + if not isinstance(value, str): + raise _pipelock_render_error(section, key, "a string") + return value + + +def _required_str_list( + obj: dict[str, object], + section: str, + key: str, +) -> list[str]: + value = obj.get(key) + if not isinstance(value, list) or not all(isinstance(v, str) for v in value): + raise _pipelock_render_error(section, key, "a list of strings") + return value + + +def _optional_str_list( + obj: dict[str, object], + section: str, + key: str, +) -> list[str]: + if key not in obj: + return [] + return _required_str_list(obj, section, key) + + +def _optional_bool( + obj: dict[str, object], + section: str, + key: str, +) -> bool | None: + if key not in obj: + return None + return _required_bool(obj, section, key) + + +def _optional_str( + obj: dict[str, object], + section: str, + key: str, +) -> str | None: + if key not in obj: + return None + return _required_str(obj, section, key) + + +def _validate_pipelock_render_config(cfg: dict[str, object]) -> dict[str, object]: + _reject_unknown_keys("config", cfg, _PIPELOCK_TOP_LEVEL_KEYS) + normalized: dict[str, object] = { + "version": _required_int(cfg, "config", "version"), + "mode": _required_str(cfg, "config", "mode"), + "enforce": _required_bool(cfg, "config", "enforce"), + "api_allowlist": _required_str_list(cfg, "config", "api_allowlist"), + } + + if "seed_phrase_detection" in cfg: + spd = _required_dict(cfg, "config", "seed_phrase_detection") + _reject_unknown_keys("seed_phrase_detection", spd, {"enabled"}) + normalized["seed_phrase_detection"] = { + "enabled": _required_bool(spd, "seed_phrase_detection", "enabled"), + } + + fp = _required_dict(cfg, "config", "forward_proxy") + _reject_unknown_keys("forward_proxy", fp, {"enabled"}) + normalized["forward_proxy"] = { + "enabled": _required_bool(fp, "forward_proxy", "enabled"), + } + + dlp = _required_dict(cfg, "config", "dlp") + _reject_unknown_keys("dlp", dlp, {"include_defaults", "scan_env"}) + normalized["dlp"] = { + "include_defaults": _required_bool(dlp, "dlp", "include_defaults"), + "scan_env": _required_bool(dlp, "dlp", "scan_env"), + } + + rbs = _required_dict(cfg, "config", "request_body_scanning") + _reject_unknown_keys( + "request_body_scanning", + rbs, + {"action", "scan_headers", "header_mode"}, + ) + normalized_rbs: dict[str, object] = { + "action": _required_str(rbs, "request_body_scanning", "action"), + } + scan_headers = _optional_bool(rbs, "request_body_scanning", "scan_headers") + if scan_headers is not None: + normalized_rbs["scan_headers"] = scan_headers + header_mode = _optional_str(rbs, "request_body_scanning", "header_mode") + if header_mode is not None: + normalized_rbs["header_mode"] = header_mode + normalized["request_body_scanning"] = normalized_rbs + + if "tls_interception" in cfg: + tls = _required_dict(cfg, "config", "tls_interception") + _reject_unknown_keys( + "tls_interception", + tls, + {"enabled", "ca_cert", "ca_key", "passthrough_domains"}, + ) + normalized["tls_interception"] = { + "enabled": _required_bool(tls, "tls_interception", "enabled"), + "ca_cert": _required_str(tls, "tls_interception", "ca_cert"), + "ca_key": _required_str(tls, "tls_interception", "ca_key"), + "passthrough_domains": _optional_str_list( + tls, "tls_interception", "passthrough_domains", + ), + } + + if "ssrf" in cfg: + ssrf = _required_dict(cfg, "config", "ssrf") + _reject_unknown_keys("ssrf", ssrf, {"ip_allowlist"}) + normalized["ssrf"] = { + "ip_allowlist": _required_str_list(ssrf, "ssrf", "ip_allowlist"), + } + + return normalized + + def pipelock_render_yaml(cfg: dict[str, object]) -> str: """Render a pipelock config dict (as produced by `pipelock_build_config`) as YAML. Hand-rolled so we don't take a @@ -230,31 +403,38 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str: def _bool(b: object) -> str: return "true" if b else "false" + cfg = _validate_pipelock_render_config(cfg) lines: list[str] = [] lines.append(f"version: {cfg['version']}") lines.append(f"mode: {cfg['mode']}") lines.append(f"enforce: {_bool(cfg['enforce'])}") lines.append("") lines.append("api_allowlist:") - for h in cast(list[str], cfg["api_allowlist"]): + api_allowlist = cfg["api_allowlist"] + assert isinstance(api_allowlist, list) + for h in api_allowlist: lines.append(f' - "{h}"') lines.append("") if "seed_phrase_detection" in cfg: lines.append("seed_phrase_detection:") - spd = cast(dict[str, object], cfg["seed_phrase_detection"]) + spd = cfg["seed_phrase_detection"] + assert isinstance(spd, dict) lines.append(f" enabled: {_bool(spd['enabled'])}") lines.append("") lines.append("forward_proxy:") - fp = cast(dict[str, object], cfg["forward_proxy"]) + fp = cfg["forward_proxy"] + assert isinstance(fp, dict) lines.append(f" enabled: {_bool(fp['enabled'])}") lines.append("") lines.append("dlp:") - dlp = cast(dict[str, object], cfg["dlp"]) + dlp = cfg["dlp"] + assert isinstance(dlp, dict) lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}") lines.append(f" scan_env: {_bool(dlp['scan_env'])}") lines.append("") lines.append("request_body_scanning:") - rbs = cast(dict[str, object], cfg["request_body_scanning"]) + rbs = cfg["request_body_scanning"] + assert isinstance(rbs, dict) lines.append(f' action: "{rbs["action"]}"') if "scan_headers" in rbs: lines.append(f" scan_headers: {_bool(rbs['scan_headers'])}") @@ -263,11 +443,13 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str: if "tls_interception" in cfg: lines.append("") lines.append("tls_interception:") - tls = cast(dict[str, object], cfg["tls_interception"]) + tls = cfg["tls_interception"] + assert isinstance(tls, dict) lines.append(f" enabled: {_bool(tls['enabled'])}") lines.append(f' ca_cert: "{tls["ca_cert"]}"') lines.append(f' ca_key: "{tls["ca_key"]}"') - passthrough = cast(list[str], tls.get("passthrough_domains", [])) + passthrough = tls["passthrough_domains"] + assert isinstance(passthrough, list) if passthrough: lines.append(" passthrough_domains:") for d in passthrough: @@ -275,9 +457,12 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str: if "ssrf" in cfg: lines.append("") lines.append("ssrf:") - ssrf = cast(dict[str, object], cfg["ssrf"]) + ssrf = cfg["ssrf"] + assert isinstance(ssrf, dict) lines.append(" ip_allowlist:") - for ip in cast(list[str], ssrf["ip_allowlist"]): + ip_allowlist = ssrf["ip_allowlist"] + assert isinstance(ip_allowlist, list) + for ip in ip_allowlist: lines.append(f' - "{ip}"') return "\n".join(lines) + "\n" diff --git a/docs/prds/0037-pipelock-yaml-render-contract.md b/docs/prds/0037-pipelock-yaml-render-contract.md index 3406f72..352d7d7 100644 --- a/docs/prds/0037-pipelock-yaml-render-contract.md +++ b/docs/prds/0037-pipelock-yaml-render-contract.md @@ -73,6 +73,15 @@ rendering a section, validate that required keys exist with the expected primitive/list/dict types. Missing or unsupported shapes should raise a clear `ValueError` naming the section and key. +The supported top-level shape is `version`, `mode`, `enforce`, +`api_allowlist`, `seed_phrase_detection`, `forward_proxy`, `dlp`, +`request_body_scanning`, `tls_interception`, and `ssrf`. Required sections are +validated before rendering; optional sections keep the current omission +behavior. `request_body_scanning.scan_headers`, +`request_body_scanning.header_mode`, and +`tls_interception.passthrough_domains` remain optional for compatibility with +parsed running configs that only contain the older rendered subset. + Tests should cover both normal output and failure cases. Because the project is stdlib-only, semantic tests can use a small purpose-built parser for the exact rendered shape or compare rendered lines to values from the structured config @@ -101,6 +110,4 @@ Run: ## Open Questions -- Should malformed config errors be `ValueError`, matching current - `pipelock_build_config` validation, or a new internal exception type? Prefer - `ValueError` unless a caller needs to distinguish serializer errors. +None. diff --git a/tests/unit/test_pipelock_yaml.py b/tests/unit/test_pipelock_yaml.py index 1728242..e565694 100644 --- a/tests/unit/test_pipelock_yaml.py +++ b/tests/unit/test_pipelock_yaml.py @@ -19,6 +19,7 @@ from bot_bottle.pipelock import ( pipelock_build_config, pipelock_render_yaml, ) +from bot_bottle.yaml_subset import parse_yaml_subset from tests.fixtures import fixture_minimal @@ -158,6 +159,51 @@ class TestRenderAndWrite(unittest.TestCase): import shutil shutil.rmtree(self.out_dir, ignore_errors=True) + def assert_render_semantics_match(self, cfg: dict[str, object]) -> None: + parsed = parse_yaml_subset(pipelock_render_yaml(cfg)) + self.assertEqual(cfg["version"], parsed["version"]) + self.assertEqual(cfg["mode"], parsed["mode"]) + self.assertEqual(cfg["enforce"], parsed["enforce"]) + parsed_allowlist = parsed["api_allowlist"] + if cfg["api_allowlist"] == [] and parsed_allowlist is None: + parsed_allowlist = [] + self.assertEqual(cfg["api_allowlist"], parsed_allowlist) + self.assertEqual(cfg["forward_proxy"], parsed["forward_proxy"]) + self.assertEqual(cfg["dlp"], parsed["dlp"]) + self.assertEqual( + cfg["request_body_scanning"], + parsed["request_body_scanning"], + ) + if "seed_phrase_detection" in cfg: + self.assertEqual( + cfg["seed_phrase_detection"], + parsed["seed_phrase_detection"], + ) + else: + self.assertNotIn("seed_phrase_detection", parsed) + + if "tls_interception" in cfg: + expected_tls = cast(dict[str, object], cfg["tls_interception"]) + actual_tls = cast(dict[str, object], parsed["tls_interception"]) + self.assertEqual(expected_tls["enabled"], actual_tls["enabled"]) + self.assertEqual(expected_tls["ca_cert"], actual_tls["ca_cert"]) + self.assertEqual(expected_tls["ca_key"], actual_tls["ca_key"]) + expected_passthrough = expected_tls["passthrough_domains"] + if expected_passthrough: + self.assertEqual( + expected_passthrough, + actual_tls["passthrough_domains"], + ) + else: + self.assertNotIn("passthrough_domains", actual_tls) + else: + self.assertNotIn("tls_interception", parsed) + + if "ssrf" in cfg: + self.assertEqual(cfg["ssrf"], parsed["ssrf"]) + else: + self.assertNotIn("ssrf", parsed) + def test_render_emits_required_top_level_keys(self): """One render-level smoke check: the serialized YAML is plausibly the shape pipelock expects. We don't grep every key here — that's @@ -175,6 +221,67 @@ class TestRenderAndWrite(unittest.TestCase): self.assertNotIn("trusted_domains:", text) self.assertNotIn("ssrf:", text) + def test_render_semantics_match_minimal_config(self): + cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) + self.assert_render_semantics_match(cfg) + + def test_render_semantics_match_tls_with_empty_passthrough(self): + cfg = pipelock_build_config( + fixture_minimal().bottles["dev"], + ca_cert_path="/etc/pipelock-ca.pem", + ca_key_path="/etc/pipelock-ca-key.pem", + ) + self.assert_render_semantics_match(cfg) + + def test_render_semantics_match_all_optional_sections(self): + bottle = Manifest.from_json_obj({ + "bottles": {"dev": {"egress": {"routes": [ + {"host": "api.openai.com", + "pipelock": {"tls_passthrough": True}}, + {"host": "gitea.dideric.is", + "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}}, + ]}}}, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + }).bottles["dev"] + cfg = pipelock_build_config( + bottle, + ca_cert_path="/etc/pipelock-ca.pem", + ca_key_path="/etc/pipelock-ca-key.pem", + ssrf_ip_allowlist=("172.20.0.0/16",), + ) + self.assert_render_semantics_match(cfg) + + def test_render_rejects_missing_required_key(self): + cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) + del cfg["mode"] + with self.assertRaisesRegex(ValueError, r"config\.mode"): + pipelock_render_yaml(cfg) + + def test_render_rejects_wrong_section_type(self): + cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) + cfg["dlp"] = [] + with self.assertRaisesRegex(ValueError, r"config\.dlp.*mapping"): + pipelock_render_yaml(cfg) + + def test_render_rejects_wrong_list_item_type(self): + cfg = pipelock_build_config( + fixture_minimal().bottles["dev"], + ca_cert_path="/etc/pipelock-ca.pem", + ca_key_path="/etc/pipelock-ca-key.pem", + ) + tls = cast(dict[str, object], cfg["tls_interception"]) + tls["passthrough_domains"] = ["api.openai.com", 3] + with self.assertRaisesRegex( + ValueError, r"tls_interception\.passthrough_domains", + ): + pipelock_render_yaml(cfg) + + def test_render_rejects_unsupported_top_level_section(self): + cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) + cfg["trusted_domains"] = [] + with self.assertRaisesRegex(ValueError, r"config\.trusted_domains"): + pipelock_render_yaml(cfg) + def test_prepare_writes_file_at_mode_600(self): plan = PipelockProxy().prepare( fixture_minimal().bottles["dev"], "demo", self.out_dir -- 2.52.0 From f95ef0c4468827d37d0e0170a41d2096c6cb4a3b Mon Sep 17 00:00:00 2001 From: codex Date: Tue, 2 Jun 2026 08:14:57 +0000 Subject: [PATCH 3/3] complete(prd): mark PRD 0037 active --- docs/prds/0037-pipelock-yaml-render-contract.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/prds/0037-pipelock-yaml-render-contract.md b/docs/prds/0037-pipelock-yaml-render-contract.md index 352d7d7..a321c95 100644 --- a/docs/prds/0037-pipelock-yaml-render-contract.md +++ b/docs/prds/0037-pipelock-yaml-render-contract.md @@ -1,6 +1,6 @@ # PRD 0037: Pipelock YAML Render Contract -- **Status:** Draft +- **Status:** Active - **Author:** didericis-codex - **Created:** 2026-06-02 - **Issue:** #130 -- 2.52.0