From c9825cf701b871ea3af04e1afeb93708f4e039db Mon Sep 17 00:00:00 2001 From: didericis Date: Tue, 26 May 2026 02:17:42 -0400 Subject: [PATCH] refactor(egress): write routes.yaml as actual YAML, not JSON-in-yml MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `egress_render_routes` now emits hand-rolled YAML in the same style as `pipelock_render_yaml`. The egress addon parses it via `yaml_subset.parse_yaml_subset` — the same parser the manifest loader + pipelock_apply use. Why bother: routes.yaml is bind-mounted into the egress sidecar AND surfaced to operators through `routes edit` (PRD 0019). JSON- in-yml renders ugly in $EDITOR and signals "this is data" rather than "this is config you can read at a glance". Real YAML reads cleanly. Mechanics: - `yaml_subset.py` drops its `claude_bottle.log` dependency. Errors now raise `YamlSubsetError` (a `ValueError`); the manifest loader + pipelock_apply catch it at the boundary and forward to `die` / `PipelockApplyError` so callers see the same behavior they did before. - `Dockerfile.egress` adds one COPY line for `yaml_subset.py` so it sits flat in `/app/` next to the addon. The addon uses an absolute-import-with-fallback shim so the same file works inside the container AND from the host's unit tests. - `egress_apply._merge_single_route` round-trips current routes.yaml through `parse_yaml_subset` + a new `_render_routes_payload` helper instead of `json.loads` + `json.dumps`. End-to-end: rebuilt the egress image, ran `./cli.py start` to a full bring-up, confirmed the addon's boot log shows `egress: loaded 9 route(s)` — i.e., the YAML parses inside the container. 453 unit + 3 integration tests pass. --- Dockerfile.egress | 11 +- claude_bottle/backend/docker/egress_apply.py | 44 ++++-- .../backend/docker/pipelock_apply.py | 12 +- claude_bottle/egress.py | 43 +++--- claude_bottle/egress_addon_core.py | 30 ++-- claude_bottle/manifest.py | 6 +- claude_bottle/yaml_subset.py | 15 +- tests/unit/test_egress.py | 26 +++- tests/unit/test_egress_addon_core.py | 35 ++++- tests/unit/test_egress_apply.py | 134 +++++++++++------- tests/unit/test_yaml_subset.py | 22 +-- 11 files changed, 254 insertions(+), 124 deletions(-) diff --git a/Dockerfile.egress b/Dockerfile.egress index 9a51728..9993832 100644 --- a/Dockerfile.egress +++ b/Dockerfile.egress @@ -18,12 +18,15 @@ FROM mitmproxy/mitmproxy:11.1.3 USER root -# The addon ships as two files. `_core.py` is pure-logic, importable -# both inside the container and from the host's tests; `_addon.py` is -# the mitmproxy hook wrapper. Both land flat in /app/ so mitmdump's -# loader finds them as top-level sibling modules. +# The addon ships as three files. `_core.py` is pure-logic, +# importable both inside the container and from the host's tests; +# `_addon.py` is the mitmproxy hook wrapper; `yaml_subset.py` is +# the stdlib-only YAML parser the addon uses to read routes.yaml. +# All three land flat in /app/ so mitmdump's loader resolves them +# as top-level sibling modules (absolute imports). COPY claude_bottle/egress_addon_core.py /app/egress_addon_core.py COPY claude_bottle/egress_addon.py /app/egress_addon.py +COPY claude_bottle/yaml_subset.py /app/yaml_subset.py # Pre-create the runtime directories the backend's start step will # `docker cp` into. docker cp does not create intermediate dirs, so diff --git a/claude_bottle/backend/docker/egress_apply.py b/claude_bottle/backend/docker/egress_apply.py index eb72cd8..3f17e34 100644 --- a/claude_bottle/backend/docker/egress_apply.py +++ b/claude_bottle/backend/docker/egress_apply.py @@ -31,6 +31,7 @@ from pathlib import Path from ...egress import EGRESS_ROUTES_IN_CONTAINER from ...egress_addon_core import load_routes +from ...yaml_subset import YamlSubsetError, parse_yaml_subset from .bottle_state import egress_state_dir from .egress import egress_container_name from .pipelock_apply import ( @@ -42,6 +43,30 @@ from .pipelock_apply import ( ) +def _render_routes_payload(routes_list: list[dict[str, object]]) -> str: + """Render a list-of-dicts routes payload as YAML matching the + shape `egress_render_routes` produces. The apply path + round-trips current routes.yaml through this so the file the + sidecar sees stays in the YAML format the addon expects.""" + if not routes_list: + return "routes: []\n" + lines: list[str] = ["routes:"] + for entry in routes_list: + host = str(entry.get("host", "")) + lines.append(f' - host: "{host}"') + auth_scheme = entry.get("auth_scheme") + token_env = entry.get("token_env") + if auth_scheme and token_env: + lines.append(f' auth_scheme: "{auth_scheme}"') + lines.append(f' token_env: "{token_env}"') + paths = entry.get("path_allowlist") or [] + if paths: + lines.append(" path_allowlist:") + for p in paths: + lines.append(f' - "{p}"') + return "\n".join(lines) + "\n" + + def _egress_routes_host_path(slug: str) -> Path: """The bind-mount source for the egress sidecar's routes.yaml. Must match what egress.prepare wrote at chunk-2 paths.""" @@ -219,7 +244,7 @@ def _merge_single_route( current_yaml: str, new_route: dict[str, object], ) -> str: """Merge a single proposed route into the current routes.yaml - content, returning the merged JSON-as-yaml string. + content, returning the merged YAML string. Behavior: - If `new_route['host']` is NOT in the current routes → @@ -230,15 +255,15 @@ def _merge_single_route( on an existing host are ignored, matching the tool's documented semantics. - The supervisor renders the merged routes.yaml with the same - JSON layout the addon expects (host + path_allowlist + - auth_scheme + token_env). Token VALUES never appear here; the - routes file carries only env-var slot NAMES.""" + Round-trips the file through `yaml_subset` (the same parser + the addon uses), so the merged output is in the YAML format + the sidecar reads. Token VALUES never appear here; the routes + file carries only env-var slot NAMES.""" try: - cfg = json.loads(current_yaml) - except json.JSONDecodeError as e: + cfg = parse_yaml_subset(current_yaml) + except YamlSubsetError as e: raise EgressApplyError( - f"current routes.yaml is not valid JSON: {e}" + f"current routes.yaml is not valid YAML: {e}" ) from e routes = cfg.get("routes") if not isinstance(routes, list): @@ -299,8 +324,7 @@ def _merge_single_route( # the slot name they'll need to provision. routes.append(entry) - cfg["routes"] = routes - return json.dumps(cfg, indent=2) + "\n" + return _render_routes_payload(routes) def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]: diff --git a/claude_bottle/backend/docker/pipelock_apply.py b/claude_bottle/backend/docker/pipelock_apply.py index 6c3ae0a..fe2c4d0 100644 --- a/claude_bottle/backend/docker/pipelock_apply.py +++ b/claude_bottle/backend/docker/pipelock_apply.py @@ -23,7 +23,7 @@ import tempfile from pathlib import Path from ...pipelock import pipelock_render_yaml -from ...yaml_subset import parse_yaml_subset +from ...yaml_subset import YamlSubsetError, parse_yaml_subset from .bottle_state import pipelock_state_dir from .pipelock import pipelock_container_name @@ -110,7 +110,10 @@ def fetch_current_allowlist(slug: str) -> str: line — the operator-facing format for the TUI / agent's current-config mount.""" yaml = fetch_current_yaml(slug) - cfg = parse_yaml_subset(yaml) + try: + cfg = parse_yaml_subset(yaml) + except YamlSubsetError as e: + raise PipelockApplyError(f"running pipelock yaml: {e}") from e hosts = cfg.get("api_allowlist", []) if not isinstance(hosts, list): raise PipelockApplyError( @@ -136,7 +139,10 @@ def apply_allowlist_change( new_hosts = parse_allowlist_content(new_allowlist_content) container = pipelock_container_name(slug) current_yaml = fetch_current_yaml(slug) - cfg = parse_yaml_subset(current_yaml) + try: + cfg = parse_yaml_subset(current_yaml) + except YamlSubsetError as e: + raise PipelockApplyError(f"running pipelock yaml: {e}") from e current_hosts = cfg.get("api_allowlist", []) if not isinstance(current_hosts, list): raise PipelockApplyError( diff --git a/claude_bottle/egress.py b/claude_bottle/egress.py index aa96acc..5823129 100644 --- a/claude_bottle/egress.py +++ b/claude_bottle/egress.py @@ -24,7 +24,6 @@ flow (PRD 0014) at egress and renames the MCP tool. from __future__ import annotations -import json from abc import ABC, abstractmethod from dataclasses import dataclass from pathlib import Path @@ -41,9 +40,10 @@ from .manifest import Bottle EGRESS_HOSTNAME = "egress" # In-container path the addon reads. Pre-created in -# `Dockerfile.egress` so `docker cp` can drop the file directly. -# `.yaml` extension per PRD 0017 — content is JSON (valid YAML) so -# both sides can use stdlib `json`. +# `Dockerfile.egress` so the host bind-mount can drop the file +# directly. Content is YAML (hand-rolled by `egress_render_routes` +# in the style of `pipelock_render_yaml`, parsed by `yaml_subset` +# inside the addon). EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml" @@ -241,25 +241,32 @@ def egress_render_routes( ) -> str: """Serialize the route table for the addon to read. - JSON content (valid YAML), no token values, no host env-var - names — the only thing the addon needs at runtime is the host → - path_allowlist + auth_scheme + in-container env-var mapping. The - actual token values arrive via the container's environ. + YAML content — no token values, no host env-var names. The only + thing the addon needs at runtime is the host → path_allowlist + + auth_scheme + in-container env-var mapping. The actual token + values arrive via the container's environ. Authenticated routes carry `auth_scheme` + `token_env`; unauthenticated routes omit both keys (the addon's parser - enforces both-or-neither).""" - payload_routes: list[dict[str, object]] = [] + enforces both-or-neither). Hand-rolled YAML in the style of + `pipelock_render_yaml` so the addon's parser + (`yaml_subset.parse_yaml_subset`) round-trips it cleanly.""" + lines: list[str] = ["routes:"] + if not routes: + # `routes:` with an empty list on the same line — the parser + # needs SOMETHING here. Empty inline list is the cleanest. + lines[0] = "routes: []" + return "\n".join(lines) + "\n" for r in routes: - entry: dict[str, object] = {"host": r.host} - if r.path_allowlist: - entry["path_allowlist"] = list(r.path_allowlist) + lines.append(f' - host: "{r.host}"') if r.auth_scheme and r.token_env: - entry["auth_scheme"] = r.auth_scheme - entry["token_env"] = r.token_env - payload_routes.append(entry) - payload = {"routes": payload_routes} - return json.dumps(payload, indent=2, sort_keys=False) + "\n" + lines.append(f' auth_scheme: "{r.auth_scheme}"') + lines.append(f' token_env: "{r.token_env}"') + if r.path_allowlist: + lines.append(" path_allowlist:") + for p in r.path_allowlist: + lines.append(f' - "{p}"') + return "\n".join(lines) + "\n" def egress_resolve_token_values( diff --git a/claude_bottle/egress_addon_core.py b/claude_bottle/egress_addon_core.py index 396eeee..835dc5f 100644 --- a/claude_bottle/egress_addon_core.py +++ b/claude_bottle/egress_addon_core.py @@ -6,16 +6,28 @@ exercise the parse + decision functions without depending on the `mitmproxy.http.HTTPFlow` API and is loaded inside the sidecar container. -Stdlib only: this file ships into the egress image, where the -container's Python is whatever mitmproxy itself runs on. +Imports: stdlib + `yaml_subset` (which is itself stdlib-only and +ships flat into the egress image alongside this file — see +`Dockerfile.egress`). """ from __future__ import annotations -import json import typing from dataclasses import dataclass +# Absolute import — `yaml_subset.py` is copied flat into the egress +# image's `/app/` next to this file (via `Dockerfile.egress`). The +# host-side unit tests run with the repo on sys.path, where the +# bare `yaml_subset` module also resolves because +# `claude_bottle/yaml_subset.py` shadows it at import time... actually +# no, on host the module lives under the `claude_bottle` package. +# The try/except shim picks whichever import works. +try: + from yaml_subset import YamlSubsetError, parse_yaml_subset # type: ignore[import-not-found] +except ImportError: # pragma: no cover - host-side path + from .yaml_subset import YamlSubsetError, parse_yaml_subset + @dataclass(frozen=True) class Route: @@ -126,12 +138,14 @@ def _parse_one(idx: int, raw: object) -> Route: def load_routes(text: str) -> tuple[Route, ...]: - """Convenience: parse JSON text → routes. Raises `ValueError` for - both decode and shape errors so callers handle them uniformly.""" + """Parse YAML text → routes. Raises `ValueError` for both + decode and shape errors so callers handle them uniformly. + `YamlSubsetError` from the parser is a `ValueError` subclass so + it already satisfies the same surface; we let it propagate.""" try: - payload = json.loads(text) - except json.JSONDecodeError as e: - raise ValueError(f"routes payload: invalid JSON: {e}") from e + payload = parse_yaml_subset(text) + except YamlSubsetError as e: + raise ValueError(f"routes payload: invalid YAML: {e}") from e return parse_routes(payload) diff --git a/claude_bottle/manifest.py b/claude_bottle/manifest.py index 4ccb9a0..6725fec 100644 --- a/claude_bottle/manifest.py +++ b/claude_bottle/manifest.py @@ -45,7 +45,7 @@ from pathlib import Path from typing import Mapping, cast from .log import die, warn -from .yaml_subset import parse_frontmatter +from .yaml_subset import YamlSubsetError, parse_frontmatter def _empty_str_dict() -> dict[str, str]: @@ -832,6 +832,8 @@ def _load_bottles_from_dir(bottles_dir: Path) -> dict[str, Bottle]: fm, _body = parse_frontmatter(path.read_text()) except OSError as e: die(f"could not read {path}: {e}") + except YamlSubsetError as e: + die(f"{path}: {e}") unknown = set(fm.keys()) - _BOTTLE_KEYS if unknown: allowed = ", ".join(sorted(_BOTTLE_KEYS)) @@ -867,6 +869,8 @@ def _load_agents_from_dir( fm, body = parse_frontmatter(path.read_text()) except OSError as e: die(f"could not read {path}: {e}") + except YamlSubsetError as e: + die(f"{path}: {e}") unknown = set(fm.keys()) - _AGENT_KEYS if unknown: allowed = ", ".join(sorted(_AGENT_KEYS)) diff --git a/claude_bottle/yaml_subset.py b/claude_bottle/yaml_subset.py index 4267dc8..f6cb777 100644 --- a/claude_bottle/yaml_subset.py +++ b/claude_bottle/yaml_subset.py @@ -59,7 +59,20 @@ from __future__ import annotations import re from dataclasses import dataclass -from .log import die + +class YamlSubsetError(ValueError): + """Raised when input violates the YAML subset's rules. Callers + that want fatal-exit semantics (manifest loader, pipelock-apply, + etc.) catch this at their own boundary and forward to `die`; + callers running outside the claude-bottle CLI process (the + egress sidecar's addon) handle it as a normal exception.""" + + +def die(msg: str) -> None: + """Module-local helper so the parser body reads cleanly. Just + raises YamlSubsetError — the `claude-bottle: error: ` prefix + is added by the boundary `die` in `claude_bottle.log`.""" + raise YamlSubsetError(msg) # --- Tokenizer / line preprocessing ---------------------------------------- diff --git a/tests/unit/test_egress.py b/tests/unit/test_egress.py index 6dcd4f3..4587db3 100644 --- a/tests/unit/test_egress.py +++ b/tests/unit/test_egress.py @@ -1,7 +1,6 @@ """Unit: Egress route lift + routes.yaml render + token resolution (PRD 0017).""" -import json import unittest from claude_bottle.egress import ( @@ -14,6 +13,7 @@ from claude_bottle.egress import ( ) from claude_bottle.log import Die from claude_bottle.manifest import Manifest +from claude_bottle.yaml_subset import parse_yaml_subset def _bottle(routes): @@ -134,6 +134,15 @@ class TestTokenEnvMap(unittest.TestCase): class TestRenderRoutes(unittest.TestCase): + """Render is YAML now (PRD 0017 follow-up). Tests parse the + output via `yaml_subset` — the same parser the addon uses — + so the assertions check the actual semantic shape the addon + will see, not the textual layout.""" + + @staticmethod + def _parsed(routes) -> list[dict]: + return parse_yaml_subset(egress_render_routes(routes))["routes"] + def test_authenticated_route_serialised_with_auth_fields(self): b = _bottle([{ "host": "api.github.com", @@ -141,7 +150,7 @@ class TestRenderRoutes(unittest.TestCase): "path_allowlist": ["/repos/x/"], }]) routes = egress_manifest_routes(b) - payload = json.loads(egress_render_routes(routes)) + parsed = self._parsed(routes) self.assertEqual( [{ "host": "api.github.com", @@ -149,7 +158,7 @@ class TestRenderRoutes(unittest.TestCase): "auth_scheme": "Bearer", "token_env": "EGRESS_TOKEN_0", }], - payload["routes"], + parsed, ) def test_unauthenticated_route_omits_auth_fields(self): @@ -159,8 +168,7 @@ class TestRenderRoutes(unittest.TestCase): # round-trip as a partial pair and crash. b = _bottle([{"host": "github.com", "path_allowlist": ["/x/"]}]) routes = egress_manifest_routes(b) - payload = json.loads(egress_render_routes(routes)) - entry = payload["routes"][0] + entry = self._parsed(routes)[0] self.assertNotIn("auth_scheme", entry) self.assertNotIn("token_env", entry) @@ -170,8 +178,12 @@ class TestRenderRoutes(unittest.TestCase): "auth": {"scheme": "Bearer", "token_ref": "CL"}, }]) routes = egress_manifest_routes(b) - payload = json.loads(egress_render_routes(routes)) - self.assertNotIn("path_allowlist", payload["routes"][0]) + self.assertNotIn("path_allowlist", self._parsed(routes)[0]) + + def test_empty_routes_round_trips(self): + rendered = egress_render_routes(()) + # Inline-empty-list form is what the parser accepts. + self.assertEqual([], parse_yaml_subset(rendered)["routes"]) def test_round_trip_through_addon_core(self): # Render here → parse in the addon must succeed for every diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index 4212f16..ab13f8b 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -105,16 +105,39 @@ class TestParseRoutes(unittest.TestCase): class TestLoadRoutes(unittest.TestCase): - def test_json_text_round_trip(self): - routes = load_routes('{"routes":[{"host":"api.example"}]}') + def test_yaml_text_round_trip(self): + routes = load_routes( + 'routes:\n' + ' - host: "api.example"\n' + ) self.assertEqual(1, len(routes)) self.assertEqual("api.example", routes[0].host) - def test_invalid_json_raises_value_error(self): - # Both decode and schema errors land as ValueError so callers - # have a single except clause. + def test_full_route_shape_parses(self): + routes = load_routes( + 'routes:\n' + ' - host: "api.example"\n' + ' auth_scheme: "Bearer"\n' + ' token_env: "EGRESS_TOKEN_0"\n' + ' path_allowlist:\n' + ' - "/v1/"\n' + ' - "/messages"\n' + ) + self.assertEqual(1, len(routes)) + r = routes[0] + self.assertEqual("api.example", r.host) + self.assertEqual("Bearer", r.auth_scheme) + self.assertEqual("EGRESS_TOKEN_0", r.token_env) + self.assertEqual(("/v1/", "/messages"), r.path_allowlist) + + def test_empty_routes_list(self): + routes = load_routes("routes: []\n") + self.assertEqual((), routes) + + def test_invalid_yaml_raises_value_error(self): + # Tab indent is a YamlSubsetError; ValueError is its base. with self.assertRaises(ValueError): - load_routes("not json at all") + load_routes("routes:\n\t- host: x\n") # --- match_route --------------------------------------------------------- diff --git a/tests/unit/test_egress_apply.py b/tests/unit/test_egress_apply.py index ad40941..dbe714c 100644 --- a/tests/unit/test_egress_apply.py +++ b/tests/unit/test_egress_apply.py @@ -4,8 +4,6 @@ integration test.""" import unittest -import json - from claude_bottle.backend.docker.egress_apply import ( EgressApplyError, _hosts_in_routes, @@ -13,56 +11,68 @@ from claude_bottle.backend.docker.egress_apply import ( _pipelock_safe_hosts, validate_routes_content, ) +from claude_bottle.yaml_subset import parse_yaml_subset + + +# YAML fixtures matching the hand-rolled `_render_routes_payload` +# shape. Per-test custom shapes are spelled inline; these are the +# common ones. +_ROUTES_EMPTY = "routes: []\n" +_ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n' + + +def _routes(parsed: str) -> list[dict]: + """Parse a YAML routes string and pull out the routes list, so + tests can assert on shape directly.""" + return parse_yaml_subset(parsed)["routes"] class TestValidateRoutesContent(unittest.TestCase): def test_accepts_minimal_route_table(self): - validate_routes_content('{"routes": []}') - validate_routes_content( - '{"routes": [{"host": "api.github.com"}]}' - ) + validate_routes_content(_ROUTES_EMPTY) + validate_routes_content(_ROUTES_ONE) def test_accepts_full_route(self): validate_routes_content( - '{"routes": [{"host": "api.github.com",' - ' "path_allowlist": ["/repos/x/"],' - ' "auth_scheme": "Bearer",' - ' "token_env": "EGRESS_TOKEN_0"}]}' + 'routes:\n' + ' - host: "api.github.com"\n' + ' auth_scheme: "Bearer"\n' + ' token_env: "EGRESS_TOKEN_0"\n' + ' path_allowlist:\n' + ' - "/repos/x/"\n' ) - def test_rejects_bad_json(self): + def test_rejects_bad_yaml(self): with self.assertRaises(EgressApplyError) as cm: - validate_routes_content("{not json") + validate_routes_content("routes:\n\t- host: x\n") self.assertIn("not valid", str(cm.exception)) - def test_rejects_non_object_top_level(self): - with self.assertRaises(EgressApplyError): - validate_routes_content("[]") - def test_rejects_missing_routes_key(self): with self.assertRaises(EgressApplyError): - validate_routes_content('{"other": []}') + validate_routes_content("other: []\n") def test_rejects_non_list_routes(self): with self.assertRaises(EgressApplyError): - validate_routes_content('{"routes": "not a list"}') + validate_routes_content('routes: "not a list"\n') def test_rejects_partial_auth_pair(self): # The addon-core parser enforces both-or-neither — the apply # path picks this up before SIGHUP'ing the sidecar. with self.assertRaises(EgressApplyError): validate_routes_content( - '{"routes": [{"host": "x.example",' - ' "auth_scheme": "Bearer"}]}' + 'routes:\n' + ' - host: "x.example"\n' + ' auth_scheme: "Bearer"\n' ) class TestHostsInRoutes(unittest.TestCase): def test_extracts_each_unique_host(self): hosts = _hosts_in_routes( - '{"routes": [{"host": "api.github.com"},' - ' {"host": "github.com"},' - ' {"host": "api.anthropic.com"}]}' + 'routes:\n' + ' - host: "api.github.com"\n' + ' - host: "github.com"\n' + ' - host: "api.anthropic.com"\n' ) # Sorted+deduped. self.assertEqual( @@ -72,28 +82,34 @@ class TestHostsInRoutes(unittest.TestCase): def test_dedupes_same_host(self): hosts = _hosts_in_routes( - '{"routes": [{"host": "x.example", "path_allowlist": ["/a/"]},' - ' {"host": "x.example", "path_allowlist": ["/b/"]}]}' + 'routes:\n' + ' - host: "x.example"\n' + ' path_allowlist:\n' + ' - "/a/"\n' + ' - host: "x.example"\n' + ' path_allowlist:\n' + ' - "/b/"\n' ) self.assertEqual(["x.example"], hosts) def test_empty_routes_returns_empty(self): - self.assertEqual([], _hosts_in_routes('{"routes": []}')) + self.assertEqual([], _hosts_in_routes(_ROUTES_EMPTY)) def test_invalid_routes_raises(self): # The mirror helper relies on parsing succeeding; bad input # should error before pipelock is touched. with self.assertRaises(EgressApplyError): - _hosts_in_routes('{"routes": [{"path": "/no-host/"}]}') + _hosts_in_routes( + 'routes:\n - path_allowlist:\n - "/no-host/"\n' + ) class TestMergeSingleRoute(unittest.TestCase): - BASE = '{"routes": [{"host": "api.anthropic.com"}]}' + BASE = _ROUTES_ONE def test_appends_route_when_host_absent(self): merged = _merge_single_route(self.BASE, {"host": "github.com"}) - routes = json.loads(merged)["routes"] - hosts = [r["host"] for r in routes] + hosts = [r["host"] for r in _routes(merged)] self.assertEqual(["api.anthropic.com", "github.com"], hosts) def test_appends_path_allowlist(self): @@ -101,7 +117,7 @@ class TestMergeSingleRoute(unittest.TestCase): self.BASE, {"host": "github.com", "path_allowlist": ["/repos/x/"]}, ) - new_route = json.loads(merged)["routes"][-1] + new_route = _routes(merged)[-1] self.assertEqual(["/repos/x/"], new_route["path_allowlist"]) def test_appends_auth_with_token_env_slot(self): @@ -112,72 +128,80 @@ class TestMergeSingleRoute(unittest.TestCase): "auth": {"scheme": "Bearer", "token_ref": "GH"}, }, ) - new_route = json.loads(merged)["routes"][-1] + new_route = _routes(merged)[-1] self.assertEqual("Bearer", new_route["auth_scheme"]) # First auth slot when no prior auth routes exist. self.assertEqual("EGRESS_TOKEN_0", new_route["token_env"]) def test_auth_slot_increments_past_existing(self): - base = json.dumps({"routes": [ - {"host": "api.anthropic.com", - "auth_scheme": "Bearer", - "token_env": "EGRESS_TOKEN_0"}, - ]}) + base = ( + 'routes:\n' + ' - host: "api.anthropic.com"\n' + ' auth_scheme: "Bearer"\n' + ' token_env: "EGRESS_TOKEN_0"\n' + ) merged = _merge_single_route(base, { "host": "api.github.com", "auth": {"scheme": "Bearer", "token_ref": "GH"}, }) - new_route = json.loads(merged)["routes"][-1] + new_route = _routes(merged)[-1] self.assertEqual("EGRESS_TOKEN_1", new_route["token_env"]) def test_existing_host_merges_path_allowlist_as_union(self): - base = json.dumps({"routes": [ - {"host": "github.com", "path_allowlist": ["/a/"]}, - ]}) + base = ( + 'routes:\n' + ' - host: "github.com"\n' + ' path_allowlist:\n' + ' - "/a/"\n' + ) merged = _merge_single_route(base, { "host": "github.com", "path_allowlist": ["/b/"], }) - routes = json.loads(merged)["routes"] + routes = _routes(merged) self.assertEqual(1, len(routes)) # not duplicated self.assertEqual(["/a/", "/b/"], routes[0]["path_allowlist"]) def test_existing_host_dedup_path_allowlist(self): - base = json.dumps({"routes": [ - {"host": "github.com", "path_allowlist": ["/a/"]}, - ]}) + base = ( + 'routes:\n' + ' - host: "github.com"\n' + ' path_allowlist:\n' + ' - "/a/"\n' + ) merged = _merge_single_route(base, { "host": "github.com", "path_allowlist": ["/a/", "/b/"], }) self.assertEqual( ["/a/", "/b/"], - json.loads(merged)["routes"][0]["path_allowlist"], + _routes(merged)[0]["path_allowlist"], ) def test_existing_host_preserves_existing_auth_ignores_proposed(self): # Tool docs: auth on an existing host is operator-controlled, # not agent-controlled. The merge must not overwrite. - base = json.dumps({"routes": [ - {"host": "api.github.com", - "auth_scheme": "Bearer", - "token_env": "EGRESS_TOKEN_0"}, - ]}) + base = ( + 'routes:\n' + ' - host: "api.github.com"\n' + ' auth_scheme: "Bearer"\n' + ' token_env: "EGRESS_TOKEN_0"\n' + ) merged = _merge_single_route(base, { "host": "api.github.com", "auth": {"scheme": "token", "token_ref": "DIFFERENT"}, }) - route = json.loads(merged)["routes"][0] + route = _routes(merged)[0] self.assertEqual("Bearer", route["auth_scheme"]) self.assertEqual("EGRESS_TOKEN_0", route["token_env"]) def test_host_match_is_case_insensitive(self): - base = json.dumps({"routes": [{"host": "GitHub.com"}]}) + base = 'routes:\n - host: "GitHub.com"\n' merged = _merge_single_route(base, { "host": "github.com", "path_allowlist": ["/x/"], }) - routes = json.loads(merged)["routes"] + routes = _routes(merged) self.assertEqual(1, len(routes)) self.assertEqual(["/x/"], routes[0]["path_allowlist"]) @@ -187,7 +211,7 @@ class TestMergeSingleRoute(unittest.TestCase): def test_invalid_current_yaml_raises(self): with self.assertRaises(EgressApplyError): - _merge_single_route("{not json", {"host": "x.example"}) + _merge_single_route("routes:\n\tbad", {"host": "x.example"}) class TestPipelockSafeHosts(unittest.TestCase): diff --git a/tests/unit/test_yaml_subset.py b/tests/unit/test_yaml_subset.py index 04a515f..cd60736 100644 --- a/tests/unit/test_yaml_subset.py +++ b/tests/unit/test_yaml_subset.py @@ -5,7 +5,7 @@ actually use, and every rejection case the PRD enumerates.""" import textwrap import unittest -from claude_bottle.log import Die +from claude_bottle.yaml_subset import YamlSubsetError from claude_bottle.yaml_subset import parse_frontmatter, parse_yaml_subset @@ -60,7 +60,7 @@ class TestForbiddenBoolLikes(unittest.TestCase): """Ambiguous bool-ish tokens have to be quoted explicitly.""" def _expect_die(self, src: str): - with self.assertRaises(Die): + with self.assertRaises(YamlSubsetError): _y(src) def test_yes_dies(self): @@ -81,7 +81,7 @@ class TestForbiddenBoolLikes(unittest.TestCase): class TestForbiddenScalarShapes(unittest.TestCase): def _expect_die(self, src: str): - with self.assertRaises(Die): + with self.assertRaises(YamlSubsetError): _y(src) def test_bare_date_dies(self): @@ -120,14 +120,14 @@ class TestMapping(unittest.TestCase): self.assertEqual({"outer": {"inner": "hello", "other": 5}}, out) def test_duplicate_key_dies(self): - with self.assertRaises(Die): + with self.assertRaises(YamlSubsetError): _y(""" a: 1 a: 2 """) def test_key_must_be_bare_identifier(self): - with self.assertRaises(Die): + with self.assertRaises(YamlSubsetError): _y('"weird key": 1\n') @@ -202,20 +202,20 @@ class TestInline(unittest.TestCase): ) def test_nested_flow_dies(self): - with self.assertRaises(Die): + with self.assertRaises(YamlSubsetError): _y("l: [[1, 2], [3, 4]]\n") class TestForbiddenConstructs(unittest.TestCase): def test_anchor_dies(self): - with self.assertRaises(Die): + with self.assertRaises(YamlSubsetError): _y(""" a: &anchor 1 b: *anchor """) def test_multiline_block_scalar_dies(self): - with self.assertRaises(Die): + with self.assertRaises(YamlSubsetError): _y(""" k: | line 1 @@ -223,11 +223,11 @@ class TestForbiddenConstructs(unittest.TestCase): """) def test_tag_dies(self): - with self.assertRaises(Die): + with self.assertRaises(YamlSubsetError): _y("k: !!str hello\n") def test_tab_in_indent_dies(self): - with self.assertRaises(Die): + with self.assertRaises(YamlSubsetError): _y("a:\n\tb: 1\n") @@ -306,7 +306,7 @@ class TestFrontmatter(unittest.TestCase): self.assertEqual(text, body) def test_unclosed_frontmatter_dies(self): - with self.assertRaises(Die): + with self.assertRaises(YamlSubsetError): parse_frontmatter("---\nbottle: dev\nno closing") def test_body_preserves_blank_lines(self):