PRD 0037: Pipelock YAML Render Contract #133
+195
-10
@@ -19,7 +19,6 @@ from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
|
||||
from .egress import EGRESS_HOSTNAME, EgressRoute, egress_routes_for_bottle
|
||||
from .supervise import SUPERVISE_HOSTNAME
|
||||
@@ -223,6 +222,180 @@ def pipelock_build_config(
|
||||
return cfg
|
||||
|
||||
|
||||
_PIPELOCK_TOP_LEVEL_KEYS = {
|
||||
"version",
|
||||
"mode",
|
||||
"enforce",
|
||||
"api_allowlist",
|
||||
"seed_phrase_detection",
|
||||
"forward_proxy",
|
||||
"dlp",
|
||||
"request_body_scanning",
|
||||
"tls_interception",
|
||||
"ssrf",
|
||||
}
|
||||
|
||||
|
||||
def _pipelock_render_error(section: str, key: str, expected: str) -> ValueError:
|
||||
return ValueError(
|
||||
f"pipelock_render_yaml: {section}.{key} must be {expected}"
|
||||
)
|
||||
|
||||
|
||||
def _reject_unknown_keys(
|
||||
section: str,
|
||||
obj: dict[str, object],
|
||||
allowed: set[str],
|
||||
) -> None:
|
||||
for key in sorted(set(obj) - allowed):
|
||||
raise ValueError(f"pipelock_render_yaml: {section}.{key} is unsupported")
|
||||
|
||||
|
||||
def _required_dict(
|
||||
obj: dict[str, object],
|
||||
section: str,
|
||||
key: str,
|
||||
) -> dict[str, object]:
|
||||
value = obj.get(key)
|
||||
if not isinstance(value, dict):
|
||||
raise _pipelock_render_error(section, key, "a mapping")
|
||||
return value
|
||||
|
||||
|
||||
def _required_bool(obj: dict[str, object], section: str, key: str) -> bool:
|
||||
value = obj.get(key)
|
||||
if not isinstance(value, bool):
|
||||
raise _pipelock_render_error(section, key, "a boolean")
|
||||
return value
|
||||
|
||||
|
||||
def _required_int(obj: dict[str, object], section: str, key: str) -> int:
|
||||
value = obj.get(key)
|
||||
if isinstance(value, bool) or not isinstance(value, int):
|
||||
raise _pipelock_render_error(section, key, "an integer")
|
||||
return value
|
||||
|
||||
|
||||
def _required_str(obj: dict[str, object], section: str, key: str) -> str:
|
||||
value = obj.get(key)
|
||||
if not isinstance(value, str):
|
||||
raise _pipelock_render_error(section, key, "a string")
|
||||
return value
|
||||
|
||||
|
||||
def _required_str_list(
|
||||
obj: dict[str, object],
|
||||
section: str,
|
||||
key: str,
|
||||
) -> list[str]:
|
||||
value = obj.get(key)
|
||||
if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
|
||||
raise _pipelock_render_error(section, key, "a list of strings")
|
||||
return value
|
||||
|
||||
|
||||
def _optional_str_list(
|
||||
obj: dict[str, object],
|
||||
section: str,
|
||||
key: str,
|
||||
) -> list[str]:
|
||||
if key not in obj:
|
||||
return []
|
||||
return _required_str_list(obj, section, key)
|
||||
|
||||
|
||||
def _optional_bool(
|
||||
obj: dict[str, object],
|
||||
section: str,
|
||||
key: str,
|
||||
) -> bool | None:
|
||||
if key not in obj:
|
||||
return None
|
||||
return _required_bool(obj, section, key)
|
||||
|
||||
|
||||
def _optional_str(
|
||||
obj: dict[str, object],
|
||||
section: str,
|
||||
key: str,
|
||||
) -> str | None:
|
||||
if key not in obj:
|
||||
return None
|
||||
return _required_str(obj, section, key)
|
||||
|
||||
|
||||
def _validate_pipelock_render_config(cfg: dict[str, object]) -> dict[str, object]:
    """Check ``cfg`` against the supported render shape and rebuild it.

    Required entries: ``version``, ``mode``, ``enforce``, ``api_allowlist``,
    ``forward_proxy``, ``dlp`` and ``request_body_scanning``. The sections
    ``seed_phrase_detection``, ``tls_interception`` and ``ssrf`` are
    validated and carried over only when present in ``cfg``.

    Within ``request_body_scanning`` the keys ``scan_headers`` and
    ``header_mode`` are optional and omitted from the result when absent;
    ``tls_interception.passthrough_domains`` is optional and becomes ``[]``
    when absent.

    Returns:
        A new mapping holding only the validated entries.

    Raises:
        ValueError: on an unsupported key, a missing required key, or a
            value of the wrong type.
    """
    top = "config"
    _reject_unknown_keys(top, cfg, _PIPELOCK_TOP_LEVEL_KEYS)

    def enabled_only(name: str) -> dict[str, object]:
        # Sections whose single supported key is the boolean `enabled`.
        raw = _required_dict(cfg, top, name)
        _reject_unknown_keys(name, raw, {"enabled"})
        return {"enabled": _required_bool(raw, name, "enabled")}

    result: dict[str, object] = {}
    result["version"] = _required_int(cfg, top, "version")
    result["mode"] = _required_str(cfg, top, "mode")
    result["enforce"] = _required_bool(cfg, top, "enforce")
    result["api_allowlist"] = _required_str_list(cfg, top, "api_allowlist")

    if "seed_phrase_detection" in cfg:
        result["seed_phrase_detection"] = enabled_only("seed_phrase_detection")

    result["forward_proxy"] = enabled_only("forward_proxy")

    raw_dlp = _required_dict(cfg, top, "dlp")
    _reject_unknown_keys("dlp", raw_dlp, {"include_defaults", "scan_env"})
    result["dlp"] = {
        "include_defaults": _required_bool(raw_dlp, "dlp", "include_defaults"),
        "scan_env": _required_bool(raw_dlp, "dlp", "scan_env"),
    }

    scan_section = "request_body_scanning"
    raw_scan = _required_dict(cfg, top, scan_section)
    _reject_unknown_keys(
        scan_section, raw_scan, {"action", "scan_headers", "header_mode"},
    )
    body_scan: dict[str, object] = {
        "action": _required_str(raw_scan, scan_section, "action"),
    }
    # Optional keys are copied only when the input carries them.
    scan_headers = _optional_bool(raw_scan, scan_section, "scan_headers")
    if scan_headers is not None:
        body_scan["scan_headers"] = scan_headers
    header_mode = _optional_str(raw_scan, scan_section, "header_mode")
    if header_mode is not None:
        body_scan["header_mode"] = header_mode
    result[scan_section] = body_scan

    if "tls_interception" in cfg:
        tls_section = "tls_interception"
        raw_tls = _required_dict(cfg, top, tls_section)
        _reject_unknown_keys(
            tls_section,
            raw_tls,
            {"enabled", "ca_cert", "ca_key", "passthrough_domains"},
        )
        result[tls_section] = {
            "enabled": _required_bool(raw_tls, tls_section, "enabled"),
            "ca_cert": _required_str(raw_tls, tls_section, "ca_cert"),
            "ca_key": _required_str(raw_tls, tls_section, "ca_key"),
            "passthrough_domains": _optional_str_list(
                raw_tls, tls_section, "passthrough_domains",
            ),
        }

    if "ssrf" in cfg:
        raw_ssrf = _required_dict(cfg, top, "ssrf")
        _reject_unknown_keys("ssrf", raw_ssrf, {"ip_allowlist"})
        result["ssrf"] = {
            "ip_allowlist": _required_str_list(raw_ssrf, "ssrf", "ip_allowlist"),
        }

    return result
|
||||
|
||||
|
||||
def pipelock_render_yaml(cfg: dict[str, object]) -> str:
|
||||
"""Render a pipelock config dict (as produced by
|
||||
`pipelock_build_config`) as YAML. Hand-rolled so we don't take a
|
||||
@@ -230,31 +403,38 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
|
||||
def _bool(b: object) -> str:
|
||||
return "true" if b else "false"
|
||||
|
||||
cfg = _validate_pipelock_render_config(cfg)
|
||||
lines: list[str] = []
|
||||
lines.append(f"version: {cfg['version']}")
|
||||
lines.append(f"mode: {cfg['mode']}")
|
||||
lines.append(f"enforce: {_bool(cfg['enforce'])}")
|
||||
lines.append("")
|
||||
lines.append("api_allowlist:")
|
||||
for h in cast(list[str], cfg["api_allowlist"]):
|
||||
api_allowlist = cfg["api_allowlist"]
|
||||
assert isinstance(api_allowlist, list)
|
||||
for h in api_allowlist:
|
||||
lines.append(f' - "{h}"')
|
||||
lines.append("")
|
||||
if "seed_phrase_detection" in cfg:
|
||||
lines.append("seed_phrase_detection:")
|
||||
spd = cast(dict[str, object], cfg["seed_phrase_detection"])
|
||||
spd = cfg["seed_phrase_detection"]
|
||||
assert isinstance(spd, dict)
|
||||
lines.append(f" enabled: {_bool(spd['enabled'])}")
|
||||
lines.append("")
|
||||
lines.append("forward_proxy:")
|
||||
fp = cast(dict[str, object], cfg["forward_proxy"])
|
||||
fp = cfg["forward_proxy"]
|
||||
assert isinstance(fp, dict)
|
||||
lines.append(f" enabled: {_bool(fp['enabled'])}")
|
||||
lines.append("")
|
||||
lines.append("dlp:")
|
||||
dlp = cast(dict[str, object], cfg["dlp"])
|
||||
dlp = cfg["dlp"]
|
||||
assert isinstance(dlp, dict)
|
||||
lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}")
|
||||
lines.append(f" scan_env: {_bool(dlp['scan_env'])}")
|
||||
lines.append("")
|
||||
lines.append("request_body_scanning:")
|
||||
rbs = cast(dict[str, object], cfg["request_body_scanning"])
|
||||
rbs = cfg["request_body_scanning"]
|
||||
assert isinstance(rbs, dict)
|
||||
lines.append(f' action: "{rbs["action"]}"')
|
||||
if "scan_headers" in rbs:
|
||||
lines.append(f" scan_headers: {_bool(rbs['scan_headers'])}")
|
||||
@@ -263,11 +443,13 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
|
||||
if "tls_interception" in cfg:
|
||||
lines.append("")
|
||||
lines.append("tls_interception:")
|
||||
tls = cast(dict[str, object], cfg["tls_interception"])
|
||||
tls = cfg["tls_interception"]
|
||||
assert isinstance(tls, dict)
|
||||
lines.append(f" enabled: {_bool(tls['enabled'])}")
|
||||
lines.append(f' ca_cert: "{tls["ca_cert"]}"')
|
||||
lines.append(f' ca_key: "{tls["ca_key"]}"')
|
||||
passthrough = cast(list[str], tls.get("passthrough_domains", []))
|
||||
passthrough = tls["passthrough_domains"]
|
||||
assert isinstance(passthrough, list)
|
||||
if passthrough:
|
||||
lines.append(" passthrough_domains:")
|
||||
for d in passthrough:
|
||||
@@ -275,9 +457,12 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
|
||||
if "ssrf" in cfg:
|
||||
lines.append("")
|
||||
lines.append("ssrf:")
|
||||
ssrf = cast(dict[str, object], cfg["ssrf"])
|
||||
ssrf = cfg["ssrf"]
|
||||
assert isinstance(ssrf, dict)
|
||||
lines.append(" ip_allowlist:")
|
||||
for ip in cast(list[str], ssrf["ip_allowlist"]):
|
||||
ip_allowlist = ssrf["ip_allowlist"]
|
||||
assert isinstance(ip_allowlist, list)
|
||||
for ip in ip_allowlist:
|
||||
lines.append(f' - "{ip}"')
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
@@ -0,0 +1,113 @@
|
||||
# PRD 0037: Pipelock YAML Render Contract
|
||||
|
||||
- **Status:** Active
|
||||
- **Author:** didericis-codex
|
||||
- **Created:** 2026-06-02
|
||||
- **Issue:** #130
|
||||
|
||||
## Summary
|
||||
|
||||
Lock down the contract between `pipelock_build_config` and
|
||||
`pipelock_render_yaml` so hand-rendered pipelock YAML stays aligned with the
|
||||
structured config that bot-bottle builds. Keep the stdlib-only renderer, but add
|
||||
shape validation and semantic tests for every supported section.
|
||||
|
||||
## Problem
|
||||
|
||||
`bot_bottle/pipelock.py` builds a structured dict and then renders a fixed YAML
|
||||
shape by hand. This avoids a runtime YAML dependency, but it also means the
|
||||
renderer directly indexes expected keys. If `pipelock_build_config` adds,
|
||||
renames, or conditionalizes a section, rendering can fail at runtime or emit
|
||||
YAML that no longer matches the config semantics.
|
||||
|
||||
Existing tests assert important rendered fragments, but they do not fully lock
|
||||
the build/render contract or optional-section combinations. A mismatch here can
|
||||
weaken DLP enforcement or break bottle launch after a future pipelock policy
|
||||
change.
|
||||
|
||||
## Goals / Success Criteria
|
||||
|
||||
- Keep the renderer stdlib-only.
|
||||
- Define the supported pipelock config shape in one place.
|
||||
- Fail clearly when `pipelock_render_yaml` receives an unsupported or malformed
|
||||
config shape.
|
||||
- Add tests covering all supported sections:
|
||||
- base allowlist and forward proxy.
|
||||
- seed phrase detection toggle.
|
||||
- DLP and request-body/header scanning.
|
||||
- TLS interception and passthrough domains.
|
||||
- SSRF IP allowlist.
|
||||
- Add semantic tests that compare structured config values to rendered YAML
|
||||
output without relying only on brittle substring assertions.
|
||||
- Preserve current rendered YAML for existing configs unless a clearer failure
|
||||
path requires an error message change.
|
||||
|
||||
## Non-goals
|
||||
|
||||
- No PyYAML or other runtime dependency.
|
||||
- No change to pipelock policy defaults.
|
||||
- No change to egress-to-pipelock topology.
|
||||
- No change to pipelock image version or config schema beyond validation of the
|
||||
shape bot-bottle already emits.
|
||||
|
||||
## Scope
|
||||
|
||||
In scope:
|
||||
|
||||
- `bot_bottle/pipelock.py` render helpers and validation.
|
||||
- Unit tests in `tests/unit/test_pipelock_yaml.py` and related focused
|
||||
pipelock tests.
|
||||
- Small helper functions for typed access to config sections, if useful.
|
||||
|
||||
Out of scope:
|
||||
|
||||
- Launch/backend changes.
|
||||
- Integration tests that start a real pipelock container.
|
||||
- Changing the manifest schema for route-level pipelock policy.
|
||||
|
||||
## Design
|
||||
|
||||
Treat `pipelock_render_yaml` as a serializer for the narrow config shape
|
||||
produced by `pipelock_build_config`, not as a generic YAML renderer. Before
|
||||
rendering a section, validate that required keys exist with the expected
|
||||
primitive/list/dict types. Missing or unsupported shapes should raise a clear
|
||||
`ValueError` naming the section and key.
|
||||
|
||||
The supported top-level shape is `version`, `mode`, `enforce`,
|
||||
`api_allowlist`, `seed_phrase_detection`, `forward_proxy`, `dlp`,
|
||||
`request_body_scanning`, `tls_interception`, and `ssrf`. Required sections are
|
||||
validated before rendering; optional sections keep the current omission
|
||||
behavior. `request_body_scanning.scan_headers`,
|
||||
`request_body_scanning.header_mode`, and
|
||||
`tls_interception.passthrough_domains` remain optional for compatibility with
|
||||
parsed running configs that only contain the older rendered subset.
|
||||
|
||||
Tests should cover both normal output and failure cases. Because the project is
|
||||
stdlib-only, semantic tests can use a small purpose-built parser for the exact
|
||||
rendered shape or compare rendered lines to values from the structured config
|
||||
through helper assertions. The goal is to detect drift between config dict and
|
||||
YAML without adding a general YAML dependency.
|
||||
|
||||
Optional sections should be exercised in combinations:
|
||||
|
||||
- no TLS and no SSRF.
|
||||
- TLS enabled with empty and non-empty passthrough domains.
|
||||
- SSRF enabled with one or more IP/CIDR entries.
|
||||
- all optional sections enabled together.
|
||||
|
||||
## Testing Strategy
|
||||
|
||||
- Extend `tests/unit/test_pipelock_yaml.py` with semantic assertions tying each
|
||||
rendered section back to the config dict.
|
||||
- Add malformed-config tests for missing required keys and wrong section types.
|
||||
- Keep existing render fragment tests where they protect exact pipelock syntax.
|
||||
|
||||
Run:
|
||||
|
||||
- `python3 -m unittest tests.unit.test_pipelock_yaml`
|
||||
- `python3 -m unittest tests.unit.test_pipelock_allowlist`
|
||||
- `python3 -m unittest discover -s tests/unit`
|
||||
|
||||
## Open Questions
|
||||
|
||||
None.
|
||||
@@ -19,6 +19,7 @@ from bot_bottle.pipelock import (
|
||||
pipelock_build_config,
|
||||
pipelock_render_yaml,
|
||||
)
|
||||
from bot_bottle.yaml_subset import parse_yaml_subset
|
||||
from tests.fixtures import fixture_minimal
|
||||
|
||||
|
||||
@@ -158,6 +159,51 @@ class TestRenderAndWrite(unittest.TestCase):
|
||||
import shutil
|
||||
shutil.rmtree(self.out_dir, ignore_errors=True)
|
||||
|
||||
def assert_render_semantics_match(self, cfg: dict[str, object]) -> None:
    """Render ``cfg``, parse the YAML back, and compare it with ``cfg``.

    Required entries must match exactly. The optional sections
    (``seed_phrase_detection``, ``tls_interception``, ``ssrf``) must match
    when ``cfg`` has them and must be absent from the parsed output when
    it does not.
    """
    parsed = parse_yaml_subset(pipelock_render_yaml(cfg))

    for scalar_key in ("version", "mode", "enforce"):
        self.assertEqual(cfg[scalar_key], parsed[scalar_key])

    # An empty allowlist renders as a bare `api_allowlist:` key; tolerate
    # the parser reporting that as None rather than [].
    parsed_allowlist = parsed["api_allowlist"]
    if cfg["api_allowlist"] == [] and parsed_allowlist is None:
        parsed_allowlist = []
    self.assertEqual(cfg["api_allowlist"], parsed_allowlist)

    for section in ("forward_proxy", "dlp", "request_body_scanning"):
        self.assertEqual(cfg[section], parsed[section])

    if "seed_phrase_detection" in cfg:
        self.assertEqual(
            cfg["seed_phrase_detection"],
            parsed["seed_phrase_detection"],
        )
    else:
        self.assertNotIn("seed_phrase_detection", parsed)

    if "tls_interception" in cfg:
        expected_tls = cast(dict[str, object], cfg["tls_interception"])
        actual_tls = cast(dict[str, object], parsed["tls_interception"])
        for tls_key in ("enabled", "ca_cert", "ca_key"):
            self.assertEqual(expected_tls[tls_key], actual_tls[tls_key])
        # `passthrough_domains` is optional in the render contract, so a
        # config without it is treated like one with an empty list instead
        # of failing here with KeyError.
        expected_passthrough = expected_tls.get("passthrough_domains")
        if expected_passthrough:
            self.assertEqual(
                expected_passthrough,
                actual_tls["passthrough_domains"],
            )
        else:
            self.assertNotIn("passthrough_domains", actual_tls)
    else:
        self.assertNotIn("tls_interception", parsed)

    if "ssrf" in cfg:
        self.assertEqual(cfg["ssrf"], parsed["ssrf"])
    else:
        self.assertNotIn("ssrf", parsed)
|
||||
|
||||
def test_render_emits_required_top_level_keys(self):
|
||||
"""One render-level smoke check: the serialized YAML is plausibly
|
||||
the shape pipelock expects. We don't grep every key here — that's
|
||||
@@ -175,6 +221,67 @@ class TestRenderAndWrite(unittest.TestCase):
|
||||
self.assertNotIn("trusted_domains:", text)
|
||||
self.assertNotIn("ssrf:", text)
|
||||
|
||||
def test_render_semantics_match_minimal_config(self):
    """The config built from the minimal fixture survives a render round-trip."""
    bottle = fixture_minimal().bottles["dev"]
    self.assert_render_semantics_match(pipelock_build_config(bottle))
|
||||
|
||||
def test_render_semantics_match_tls_with_empty_passthrough(self):
    """Round-trip the minimal bottle's config with CA cert/key paths supplied."""
    bottle = fixture_minimal().bottles["dev"]
    ca_paths = {
        "ca_cert_path": "/etc/pipelock-ca.pem",
        "ca_key_path": "/etc/pipelock-ca-key.pem",
    }
    self.assert_render_semantics_match(pipelock_build_config(bottle, **ca_paths))
|
||||
|
||||
def test_render_semantics_match_all_optional_sections(self):
    """Round-trip a config built with TLS passthrough, CA paths and SSRF CIDRs."""
    routes = [
        {
            "host": "api.openai.com",
            "pipelock": {"tls_passthrough": True},
        },
        {
            "host": "gitea.dideric.is",
            "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]},
        },
    ]
    manifest = Manifest.from_json_obj({
        "bottles": {"dev": {"egress": {"routes": routes}}},
        "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
    })
    cfg = pipelock_build_config(
        manifest.bottles["dev"],
        ca_cert_path="/etc/pipelock-ca.pem",
        ca_key_path="/etc/pipelock-ca-key.pem",
        ssrf_ip_allowlist=("172.20.0.0/16",),
    )
    self.assert_render_semantics_match(cfg)
|
||||
|
||||
def test_render_rejects_missing_required_key(self):
    """Dropping the required ``mode`` key yields a ValueError naming it."""
    cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
    cfg.pop("mode")
    self.assertRaisesRegex(
        ValueError, r"config\.mode", pipelock_render_yaml, cfg,
    )
|
||||
|
||||
def test_render_rejects_wrong_section_type(self):
    """A ``dlp`` section that is a list, not a mapping, is rejected."""
    cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
    cfg["dlp"] = []
    self.assertRaisesRegex(
        ValueError, r"config\.dlp.*mapping", pipelock_render_yaml, cfg,
    )
|
||||
|
||||
def test_render_rejects_wrong_list_item_type(self):
    """A non-string entry in ``passthrough_domains`` is rejected."""
    bottle = fixture_minimal().bottles["dev"]
    cfg = pipelock_build_config(
        bottle,
        ca_cert_path="/etc/pipelock-ca.pem",
        ca_key_path="/etc/pipelock-ca-key.pem",
    )
    tls_section = cast(dict[str, object], cfg["tls_interception"])
    tls_section["passthrough_domains"] = ["api.openai.com", 3]
    self.assertRaisesRegex(
        ValueError,
        r"tls_interception\.passthrough_domains",
        pipelock_render_yaml,
        cfg,
    )
|
||||
|
||||
def test_render_rejects_unsupported_top_level_section(self):
    """An unknown top-level key (``trusted_domains``) is rejected by name."""
    cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
    cfg["trusted_domains"] = []
    self.assertRaisesRegex(
        ValueError, r"config\.trusted_domains", pipelock_render_yaml, cfg,
    )
|
||||
|
||||
def test_prepare_writes_file_at_mode_600(self):
|
||||
plan = PipelockProxy().prepare(
|
||||
fixture_minimal().bottles["dev"], "demo", self.out_dir
|
||||
|
||||
Reference in New Issue
Block a user