refactor(egress): write routes.yaml as actual YAML, not JSON-in-yml
test / unit (pull_request) Successful in 18s
test / integration (pull_request) Successful in 1m7s

`egress_render_routes` now emits hand-rolled YAML in the same style
as `pipelock_render_yaml`. The egress addon parses it via
`yaml_subset.parse_yaml_subset` — the same parser the manifest
loader + pipelock_apply use.

Why bother: routes.yaml is bind-mounted into the egress sidecar
AND surfaced to operators through `routes edit` (PRD 0019). JSON-
in-yml renders ugly in $EDITOR and signals "this is data" rather
than "this is config you can read at a glance". Real YAML reads
cleanly.

Mechanics:

  - `yaml_subset.py` drops its `claude_bottle.log` dependency.
    Errors now raise `YamlSubsetError` (a `ValueError`); the
    manifest loader + pipelock_apply catch it at the boundary
    and forward to `die` / `PipelockApplyError` so callers see
    the same behavior they did before.
  - `Dockerfile.egress` adds one COPY line for `yaml_subset.py`
    so it sits flat in `/app/` next to the addon. The addon
    uses an absolute-import-with-fallback shim so the same file
    works inside the container AND from the host's unit tests.
  - `egress_apply._merge_single_route` round-trips current
    routes.yaml through `parse_yaml_subset` + a new
    `_render_routes_payload` helper instead of `json.loads` +
    `json.dumps`.

End-to-end: rebuilt the egress image, ran `./cli.py start` to a
full bring-up, confirmed the addon's boot log shows `egress:
loaded 9 route(s)` — i.e., the YAML parses inside the container.
453 unit + 3 integration tests pass.
This commit is contained in:
2026-05-26 02:17:42 -04:00
parent 11d5bf1489
commit c9825cf701
11 changed files with 254 additions and 124 deletions
+7 -4
View File
@@ -18,12 +18,15 @@ FROM mitmproxy/mitmproxy:11.1.3
USER root USER root
# The addon ships as two files. `_core.py` is pure-logic, importable # The addon ships as three files. `_core.py` is pure-logic,
# both inside the container and from the host's tests; `_addon.py` is # importable both inside the container and from the host's tests;
# the mitmproxy hook wrapper. Both land flat in /app/ so mitmdump's # `_addon.py` is the mitmproxy hook wrapper; `yaml_subset.py` is
# loader finds them as top-level sibling modules. # the stdlib-only YAML parser the addon uses to read routes.yaml.
# All three land flat in /app/ so mitmdump's loader resolves them
# as top-level sibling modules (absolute imports).
COPY claude_bottle/egress_addon_core.py /app/egress_addon_core.py COPY claude_bottle/egress_addon_core.py /app/egress_addon_core.py
COPY claude_bottle/egress_addon.py /app/egress_addon.py COPY claude_bottle/egress_addon.py /app/egress_addon.py
COPY claude_bottle/yaml_subset.py /app/yaml_subset.py
# Pre-create the runtime directories the backend's start step will # Pre-create the runtime directories the backend's start step will
# `docker cp` into. docker cp does not create intermediate dirs, so # `docker cp` into. docker cp does not create intermediate dirs, so
+34 -10
View File
@@ -31,6 +31,7 @@ from pathlib import Path
from ...egress import EGRESS_ROUTES_IN_CONTAINER from ...egress import EGRESS_ROUTES_IN_CONTAINER
from ...egress_addon_core import load_routes from ...egress_addon_core import load_routes
from ...yaml_subset import YamlSubsetError, parse_yaml_subset
from .bottle_state import egress_state_dir from .bottle_state import egress_state_dir
from .egress import egress_container_name from .egress import egress_container_name
from .pipelock_apply import ( from .pipelock_apply import (
@@ -42,6 +43,30 @@ from .pipelock_apply import (
) )
def _render_routes_payload(routes_list: list[dict[str, object]]) -> str:
"""Render a list-of-dicts routes payload as YAML matching the
shape `egress_render_routes` produces. The apply path
round-trips current routes.yaml through this so the file the
sidecar sees stays in the YAML format the addon expects."""
if not routes_list:
return "routes: []\n"
lines: list[str] = ["routes:"]
for entry in routes_list:
host = str(entry.get("host", ""))
lines.append(f' - host: "{host}"')
auth_scheme = entry.get("auth_scheme")
token_env = entry.get("token_env")
if auth_scheme and token_env:
lines.append(f' auth_scheme: "{auth_scheme}"')
lines.append(f' token_env: "{token_env}"')
paths = entry.get("path_allowlist") or []
if paths:
lines.append(" path_allowlist:")
for p in paths:
lines.append(f' - "{p}"')
return "\n".join(lines) + "\n"
def _egress_routes_host_path(slug: str) -> Path: def _egress_routes_host_path(slug: str) -> Path:
"""The bind-mount source for the egress sidecar's routes.yaml. """The bind-mount source for the egress sidecar's routes.yaml.
Must match what egress.prepare wrote at chunk-2 paths.""" Must match what egress.prepare wrote at chunk-2 paths."""
@@ -219,7 +244,7 @@ def _merge_single_route(
current_yaml: str, new_route: dict[str, object], current_yaml: str, new_route: dict[str, object],
) -> str: ) -> str:
"""Merge a single proposed route into the current routes.yaml """Merge a single proposed route into the current routes.yaml
content, returning the merged JSON-as-yaml string. content, returning the merged YAML string.
Behavior: Behavior:
- If `new_route['host']` is NOT in the current routes → - If `new_route['host']` is NOT in the current routes →
@@ -230,15 +255,15 @@ def _merge_single_route(
on an existing host are ignored, matching the tool's on an existing host are ignored, matching the tool's
documented semantics. documented semantics.
The supervisor renders the merged routes.yaml with the same Round-trips the file through `yaml_subset` (the same parser
JSON layout the addon expects (host + path_allowlist + the addon uses), so the merged output is in the YAML format
auth_scheme + token_env). Token VALUES never appear here; the the sidecar reads. Token VALUES never appear here; the routes
routes file carries only env-var slot NAMES.""" file carries only env-var slot NAMES."""
try: try:
cfg = json.loads(current_yaml) cfg = parse_yaml_subset(current_yaml)
except json.JSONDecodeError as e: except YamlSubsetError as e:
raise EgressApplyError( raise EgressApplyError(
f"current routes.yaml is not valid JSON: {e}" f"current routes.yaml is not valid YAML: {e}"
) from e ) from e
routes = cfg.get("routes") routes = cfg.get("routes")
if not isinstance(routes, list): if not isinstance(routes, list):
@@ -299,8 +324,7 @@ def _merge_single_route(
# the slot name they'll need to provision. # the slot name they'll need to provision.
routes.append(entry) routes.append(entry)
cfg["routes"] = routes return _render_routes_payload(routes)
return json.dumps(cfg, indent=2) + "\n"
def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]: def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]:
@@ -23,7 +23,7 @@ import tempfile
from pathlib import Path from pathlib import Path
from ...pipelock import pipelock_render_yaml from ...pipelock import pipelock_render_yaml
from ...yaml_subset import parse_yaml_subset from ...yaml_subset import YamlSubsetError, parse_yaml_subset
from .bottle_state import pipelock_state_dir from .bottle_state import pipelock_state_dir
from .pipelock import pipelock_container_name from .pipelock import pipelock_container_name
@@ -110,7 +110,10 @@ def fetch_current_allowlist(slug: str) -> str:
line — the operator-facing format for the TUI / agent's line — the operator-facing format for the TUI / agent's
current-config mount.""" current-config mount."""
yaml = fetch_current_yaml(slug) yaml = fetch_current_yaml(slug)
cfg = parse_yaml_subset(yaml) try:
cfg = parse_yaml_subset(yaml)
except YamlSubsetError as e:
raise PipelockApplyError(f"running pipelock yaml: {e}") from e
hosts = cfg.get("api_allowlist", []) hosts = cfg.get("api_allowlist", [])
if not isinstance(hosts, list): if not isinstance(hosts, list):
raise PipelockApplyError( raise PipelockApplyError(
@@ -136,7 +139,10 @@ def apply_allowlist_change(
new_hosts = parse_allowlist_content(new_allowlist_content) new_hosts = parse_allowlist_content(new_allowlist_content)
container = pipelock_container_name(slug) container = pipelock_container_name(slug)
current_yaml = fetch_current_yaml(slug) current_yaml = fetch_current_yaml(slug)
cfg = parse_yaml_subset(current_yaml) try:
cfg = parse_yaml_subset(current_yaml)
except YamlSubsetError as e:
raise PipelockApplyError(f"running pipelock yaml: {e}") from e
current_hosts = cfg.get("api_allowlist", []) current_hosts = cfg.get("api_allowlist", [])
if not isinstance(current_hosts, list): if not isinstance(current_hosts, list):
raise PipelockApplyError( raise PipelockApplyError(
+25 -18
View File
@@ -24,7 +24,6 @@ flow (PRD 0014) at egress and renames the MCP tool.
from __future__ import annotations from __future__ import annotations
import json
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@@ -41,9 +40,10 @@ from .manifest import Bottle
EGRESS_HOSTNAME = "egress" EGRESS_HOSTNAME = "egress"
# In-container path the addon reads. Pre-created in # In-container path the addon reads. Pre-created in
# `Dockerfile.egress` so `docker cp` can drop the file directly. # `Dockerfile.egress` so the host bind-mount can drop the file
# `.yaml` extension per PRD 0017 — content is JSON (valid YAML) so # directly. Content is YAML (hand-rolled by `egress_render_routes`
# both sides can use stdlib `json`. # in the style of `pipelock_render_yaml`, parsed by `yaml_subset`
# inside the addon).
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml" EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
@@ -241,25 +241,32 @@ def egress_render_routes(
) -> str: ) -> str:
"""Serialize the route table for the addon to read. """Serialize the route table for the addon to read.
JSON content (valid YAML), no token values, no host env-var YAML content no token values, no host env-var names. The only
names — the only thing the addon needs at runtime is the host → thing the addon needs at runtime is the host → path_allowlist
path_allowlist + auth_scheme + in-container env-var mapping. The + auth_scheme + in-container env-var mapping. The actual token
actual token values arrive via the container's environ. values arrive via the container's environ.
Authenticated routes carry `auth_scheme` + `token_env`; Authenticated routes carry `auth_scheme` + `token_env`;
unauthenticated routes omit both keys (the addon's parser unauthenticated routes omit both keys (the addon's parser
enforces both-or-neither).""" enforces both-or-neither). Hand-rolled YAML in the style of
payload_routes: list[dict[str, object]] = [] `pipelock_render_yaml` so the addon's parser
(`yaml_subset.parse_yaml_subset`) round-trips it cleanly."""
lines: list[str] = ["routes:"]
if not routes:
# `routes:` with an empty list on the same line — the parser
# needs SOMETHING here. Empty inline list is the cleanest.
lines[0] = "routes: []"
return "\n".join(lines) + "\n"
for r in routes: for r in routes:
entry: dict[str, object] = {"host": r.host} lines.append(f' - host: "{r.host}"')
if r.path_allowlist:
entry["path_allowlist"] = list(r.path_allowlist)
if r.auth_scheme and r.token_env: if r.auth_scheme and r.token_env:
entry["auth_scheme"] = r.auth_scheme lines.append(f' auth_scheme: "{r.auth_scheme}"')
entry["token_env"] = r.token_env lines.append(f' token_env: "{r.token_env}"')
payload_routes.append(entry) if r.path_allowlist:
payload = {"routes": payload_routes} lines.append(" path_allowlist:")
return json.dumps(payload, indent=2, sort_keys=False) + "\n" for p in r.path_allowlist:
lines.append(f' - "{p}"')
return "\n".join(lines) + "\n"
def egress_resolve_token_values( def egress_resolve_token_values(
+22 -8
View File
@@ -6,16 +6,28 @@ exercise the parse + decision functions without depending on the
`mitmproxy.http.HTTPFlow` API and is loaded inside the sidecar `mitmproxy.http.HTTPFlow` API and is loaded inside the sidecar
container. container.
Stdlib only: this file ships into the egress image, where the Imports: stdlib + `yaml_subset` (which is itself stdlib-only and
container's Python is whatever mitmproxy itself runs on. ships flat into the egress image alongside this file — see
`Dockerfile.egress`).
""" """
from __future__ import annotations from __future__ import annotations
import json
import typing import typing
from dataclasses import dataclass from dataclasses import dataclass
# Absolute import — `yaml_subset.py` is copied flat into the egress
# image's `/app/` next to this file (via `Dockerfile.egress`). The
# host-side unit tests run with the repo on sys.path, where the
# bare `yaml_subset` module also resolves because
# `claude_bottle/yaml_subset.py` shadows it at import time... actually
# no, on host the module lives under the `claude_bottle` package.
# The try/except shim picks whichever import works.
try:
from yaml_subset import YamlSubsetError, parse_yaml_subset # type: ignore[import-not-found]
except ImportError: # pragma: no cover - host-side path
from .yaml_subset import YamlSubsetError, parse_yaml_subset
@dataclass(frozen=True) @dataclass(frozen=True)
class Route: class Route:
@@ -126,12 +138,14 @@ def _parse_one(idx: int, raw: object) -> Route:
def load_routes(text: str) -> tuple[Route, ...]: def load_routes(text: str) -> tuple[Route, ...]:
"""Convenience: parse JSON text → routes. Raises `ValueError` for """Parse YAML text → routes. Raises `ValueError` for both
both decode and shape errors so callers handle them uniformly.""" decode and shape errors so callers handle them uniformly.
`YamlSubsetError` from the parser is a `ValueError` subclass so
it already satisfies the same surface; we let it propagate."""
try: try:
payload = json.loads(text) payload = parse_yaml_subset(text)
except json.JSONDecodeError as e: except YamlSubsetError as e:
raise ValueError(f"routes payload: invalid JSON: {e}") from e raise ValueError(f"routes payload: invalid YAML: {e}") from e
return parse_routes(payload) return parse_routes(payload)
+5 -1
View File
@@ -45,7 +45,7 @@ from pathlib import Path
from typing import Mapping, cast from typing import Mapping, cast
from .log import die, warn from .log import die, warn
from .yaml_subset import parse_frontmatter from .yaml_subset import YamlSubsetError, parse_frontmatter
def _empty_str_dict() -> dict[str, str]: def _empty_str_dict() -> dict[str, str]:
@@ -832,6 +832,8 @@ def _load_bottles_from_dir(bottles_dir: Path) -> dict[str, Bottle]:
fm, _body = parse_frontmatter(path.read_text()) fm, _body = parse_frontmatter(path.read_text())
except OSError as e: except OSError as e:
die(f"could not read {path}: {e}") die(f"could not read {path}: {e}")
except YamlSubsetError as e:
die(f"{path}: {e}")
unknown = set(fm.keys()) - _BOTTLE_KEYS unknown = set(fm.keys()) - _BOTTLE_KEYS
if unknown: if unknown:
allowed = ", ".join(sorted(_BOTTLE_KEYS)) allowed = ", ".join(sorted(_BOTTLE_KEYS))
@@ -867,6 +869,8 @@ def _load_agents_from_dir(
fm, body = parse_frontmatter(path.read_text()) fm, body = parse_frontmatter(path.read_text())
except OSError as e: except OSError as e:
die(f"could not read {path}: {e}") die(f"could not read {path}: {e}")
except YamlSubsetError as e:
die(f"{path}: {e}")
unknown = set(fm.keys()) - _AGENT_KEYS unknown = set(fm.keys()) - _AGENT_KEYS
if unknown: if unknown:
allowed = ", ".join(sorted(_AGENT_KEYS)) allowed = ", ".join(sorted(_AGENT_KEYS))
+14 -1
View File
@@ -59,7 +59,20 @@ from __future__ import annotations
import re import re
from dataclasses import dataclass from dataclasses import dataclass
from .log import die
class YamlSubsetError(ValueError):
"""Raised when input violates the YAML subset's rules. Callers
that want fatal-exit semantics (manifest loader, pipelock-apply,
etc.) catch this at their own boundary and forward to `die`;
callers running outside the claude-bottle CLI process (the
egress sidecar's addon) handle it as a normal exception."""
def die(msg: str) -> None:
"""Module-local helper so the parser body reads cleanly. Just
raises YamlSubsetError — the `claude-bottle: error: ` prefix
is added by the boundary `die` in `claude_bottle.log`."""
raise YamlSubsetError(msg)
# --- Tokenizer / line preprocessing ---------------------------------------- # --- Tokenizer / line preprocessing ----------------------------------------
+19 -7
View File
@@ -1,7 +1,6 @@
"""Unit: Egress route lift + routes.yaml render + token """Unit: Egress route lift + routes.yaml render + token
resolution (PRD 0017).""" resolution (PRD 0017)."""
import json
import unittest import unittest
from claude_bottle.egress import ( from claude_bottle.egress import (
@@ -14,6 +13,7 @@ from claude_bottle.egress import (
) )
from claude_bottle.log import Die from claude_bottle.log import Die
from claude_bottle.manifest import Manifest from claude_bottle.manifest import Manifest
from claude_bottle.yaml_subset import parse_yaml_subset
def _bottle(routes): def _bottle(routes):
@@ -134,6 +134,15 @@ class TestTokenEnvMap(unittest.TestCase):
class TestRenderRoutes(unittest.TestCase): class TestRenderRoutes(unittest.TestCase):
"""Render is YAML now (PRD 0017 follow-up). Tests parse the
output via `yaml_subset` the same parser the addon uses
so the assertions check the actual semantic shape the addon
will see, not the textual layout."""
@staticmethod
def _parsed(routes) -> list[dict]:
return parse_yaml_subset(egress_render_routes(routes))["routes"]
def test_authenticated_route_serialised_with_auth_fields(self): def test_authenticated_route_serialised_with_auth_fields(self):
b = _bottle([{ b = _bottle([{
"host": "api.github.com", "host": "api.github.com",
@@ -141,7 +150,7 @@ class TestRenderRoutes(unittest.TestCase):
"path_allowlist": ["/repos/x/"], "path_allowlist": ["/repos/x/"],
}]) }])
routes = egress_manifest_routes(b) routes = egress_manifest_routes(b)
payload = json.loads(egress_render_routes(routes)) parsed = self._parsed(routes)
self.assertEqual( self.assertEqual(
[{ [{
"host": "api.github.com", "host": "api.github.com",
@@ -149,7 +158,7 @@ class TestRenderRoutes(unittest.TestCase):
"auth_scheme": "Bearer", "auth_scheme": "Bearer",
"token_env": "EGRESS_TOKEN_0", "token_env": "EGRESS_TOKEN_0",
}], }],
payload["routes"], parsed,
) )
def test_unauthenticated_route_omits_auth_fields(self): def test_unauthenticated_route_omits_auth_fields(self):
@@ -159,8 +168,7 @@ class TestRenderRoutes(unittest.TestCase):
# round-trip as a partial pair and crash. # round-trip as a partial pair and crash.
b = _bottle([{"host": "github.com", "path_allowlist": ["/x/"]}]) b = _bottle([{"host": "github.com", "path_allowlist": ["/x/"]}])
routes = egress_manifest_routes(b) routes = egress_manifest_routes(b)
payload = json.loads(egress_render_routes(routes)) entry = self._parsed(routes)[0]
entry = payload["routes"][0]
self.assertNotIn("auth_scheme", entry) self.assertNotIn("auth_scheme", entry)
self.assertNotIn("token_env", entry) self.assertNotIn("token_env", entry)
@@ -170,8 +178,12 @@ class TestRenderRoutes(unittest.TestCase):
"auth": {"scheme": "Bearer", "token_ref": "CL"}, "auth": {"scheme": "Bearer", "token_ref": "CL"},
}]) }])
routes = egress_manifest_routes(b) routes = egress_manifest_routes(b)
payload = json.loads(egress_render_routes(routes)) self.assertNotIn("path_allowlist", self._parsed(routes)[0])
self.assertNotIn("path_allowlist", payload["routes"][0])
def test_empty_routes_round_trips(self):
rendered = egress_render_routes(())
# Inline-empty-list form is what the parser accepts.
self.assertEqual([], parse_yaml_subset(rendered)["routes"])
def test_round_trip_through_addon_core(self): def test_round_trip_through_addon_core(self):
# Render here → parse in the addon must succeed for every # Render here → parse in the addon must succeed for every
+29 -6
View File
@@ -105,16 +105,39 @@ class TestParseRoutes(unittest.TestCase):
class TestLoadRoutes(unittest.TestCase): class TestLoadRoutes(unittest.TestCase):
def test_json_text_round_trip(self): def test_yaml_text_round_trip(self):
routes = load_routes('{"routes":[{"host":"api.example"}]}') routes = load_routes(
'routes:\n'
' - host: "api.example"\n'
)
self.assertEqual(1, len(routes)) self.assertEqual(1, len(routes))
self.assertEqual("api.example", routes[0].host) self.assertEqual("api.example", routes[0].host)
def test_invalid_json_raises_value_error(self): def test_full_route_shape_parses(self):
# Both decode and schema errors land as ValueError so callers routes = load_routes(
# have a single except clause. 'routes:\n'
' - host: "api.example"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
' path_allowlist:\n'
' - "/v1/"\n'
' - "/messages"\n'
)
self.assertEqual(1, len(routes))
r = routes[0]
self.assertEqual("api.example", r.host)
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
self.assertEqual(("/v1/", "/messages"), r.path_allowlist)
def test_empty_routes_list(self):
routes = load_routes("routes: []\n")
self.assertEqual((), routes)
def test_invalid_yaml_raises_value_error(self):
# Tab indent is a YamlSubsetError; ValueError is its base.
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
load_routes("not json at all") load_routes("routes:\n\t- host: x\n")
# --- match_route --------------------------------------------------------- # --- match_route ---------------------------------------------------------
+79 -55
View File
@@ -4,8 +4,6 @@ integration test."""
import unittest import unittest
import json
from claude_bottle.backend.docker.egress_apply import ( from claude_bottle.backend.docker.egress_apply import (
EgressApplyError, EgressApplyError,
_hosts_in_routes, _hosts_in_routes,
@@ -13,56 +11,68 @@ from claude_bottle.backend.docker.egress_apply import (
_pipelock_safe_hosts, _pipelock_safe_hosts,
validate_routes_content, validate_routes_content,
) )
from claude_bottle.yaml_subset import parse_yaml_subset
# YAML fixtures matching the hand-rolled `_render_routes_payload`
# shape. Per-test custom shapes are spelled inline; these are the
# common ones.
_ROUTES_EMPTY = "routes: []\n"
_ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n'
def _routes(parsed: str) -> list[dict]:
"""Parse a YAML routes string and pull out the routes list, so
tests can assert on shape directly."""
return parse_yaml_subset(parsed)["routes"]
class TestValidateRoutesContent(unittest.TestCase): class TestValidateRoutesContent(unittest.TestCase):
def test_accepts_minimal_route_table(self): def test_accepts_minimal_route_table(self):
validate_routes_content('{"routes": []}') validate_routes_content(_ROUTES_EMPTY)
validate_routes_content( validate_routes_content(_ROUTES_ONE)
'{"routes": [{"host": "api.github.com"}]}'
)
def test_accepts_full_route(self): def test_accepts_full_route(self):
validate_routes_content( validate_routes_content(
'{"routes": [{"host": "api.github.com",' 'routes:\n'
' "path_allowlist": ["/repos/x/"],' ' - host: "api.github.com"\n'
' "auth_scheme": "Bearer",' ' auth_scheme: "Bearer"\n'
' "token_env": "EGRESS_TOKEN_0"}]}' ' token_env: "EGRESS_TOKEN_0"\n'
' path_allowlist:\n'
' - "/repos/x/"\n'
) )
def test_rejects_bad_json(self): def test_rejects_bad_yaml(self):
with self.assertRaises(EgressApplyError) as cm: with self.assertRaises(EgressApplyError) as cm:
validate_routes_content("{not json") validate_routes_content("routes:\n\t- host: x\n")
self.assertIn("not valid", str(cm.exception)) self.assertIn("not valid", str(cm.exception))
def test_rejects_non_object_top_level(self):
with self.assertRaises(EgressApplyError):
validate_routes_content("[]")
def test_rejects_missing_routes_key(self): def test_rejects_missing_routes_key(self):
with self.assertRaises(EgressApplyError): with self.assertRaises(EgressApplyError):
validate_routes_content('{"other": []}') validate_routes_content("other: []\n")
def test_rejects_non_list_routes(self): def test_rejects_non_list_routes(self):
with self.assertRaises(EgressApplyError): with self.assertRaises(EgressApplyError):
validate_routes_content('{"routes": "not a list"}') validate_routes_content('routes: "not a list"\n')
def test_rejects_partial_auth_pair(self): def test_rejects_partial_auth_pair(self):
# The addon-core parser enforces both-or-neither — the apply # The addon-core parser enforces both-or-neither — the apply
# path picks this up before SIGHUP'ing the sidecar. # path picks this up before SIGHUP'ing the sidecar.
with self.assertRaises(EgressApplyError): with self.assertRaises(EgressApplyError):
validate_routes_content( validate_routes_content(
'{"routes": [{"host": "x.example",' 'routes:\n'
' "auth_scheme": "Bearer"}]}' ' - host: "x.example"\n'
' auth_scheme: "Bearer"\n'
) )
class TestHostsInRoutes(unittest.TestCase): class TestHostsInRoutes(unittest.TestCase):
def test_extracts_each_unique_host(self): def test_extracts_each_unique_host(self):
hosts = _hosts_in_routes( hosts = _hosts_in_routes(
'{"routes": [{"host": "api.github.com"},' 'routes:\n'
' {"host": "github.com"},' ' - host: "api.github.com"\n'
' {"host": "api.anthropic.com"}]}' ' - host: "github.com"\n'
' - host: "api.anthropic.com"\n'
) )
# Sorted+deduped. # Sorted+deduped.
self.assertEqual( self.assertEqual(
@@ -72,28 +82,34 @@ class TestHostsInRoutes(unittest.TestCase):
def test_dedupes_same_host(self): def test_dedupes_same_host(self):
hosts = _hosts_in_routes( hosts = _hosts_in_routes(
'{"routes": [{"host": "x.example", "path_allowlist": ["/a/"]},' 'routes:\n'
' {"host": "x.example", "path_allowlist": ["/b/"]}]}' ' - host: "x.example"\n'
' path_allowlist:\n'
' - "/a/"\n'
' - host: "x.example"\n'
' path_allowlist:\n'
' - "/b/"\n'
) )
self.assertEqual(["x.example"], hosts) self.assertEqual(["x.example"], hosts)
def test_empty_routes_returns_empty(self): def test_empty_routes_returns_empty(self):
self.assertEqual([], _hosts_in_routes('{"routes": []}')) self.assertEqual([], _hosts_in_routes(_ROUTES_EMPTY))
def test_invalid_routes_raises(self): def test_invalid_routes_raises(self):
# The mirror helper relies on parsing succeeding; bad input # The mirror helper relies on parsing succeeding; bad input
# should error before pipelock is touched. # should error before pipelock is touched.
with self.assertRaises(EgressApplyError): with self.assertRaises(EgressApplyError):
_hosts_in_routes('{"routes": [{"path": "/no-host/"}]}') _hosts_in_routes(
'routes:\n - path_allowlist:\n - "/no-host/"\n'
)
class TestMergeSingleRoute(unittest.TestCase): class TestMergeSingleRoute(unittest.TestCase):
BASE = '{"routes": [{"host": "api.anthropic.com"}]}' BASE = _ROUTES_ONE
def test_appends_route_when_host_absent(self): def test_appends_route_when_host_absent(self):
merged = _merge_single_route(self.BASE, {"host": "github.com"}) merged = _merge_single_route(self.BASE, {"host": "github.com"})
routes = json.loads(merged)["routes"] hosts = [r["host"] for r in _routes(merged)]
hosts = [r["host"] for r in routes]
self.assertEqual(["api.anthropic.com", "github.com"], hosts) self.assertEqual(["api.anthropic.com", "github.com"], hosts)
def test_appends_path_allowlist(self): def test_appends_path_allowlist(self):
@@ -101,7 +117,7 @@ class TestMergeSingleRoute(unittest.TestCase):
self.BASE, self.BASE,
{"host": "github.com", "path_allowlist": ["/repos/x/"]}, {"host": "github.com", "path_allowlist": ["/repos/x/"]},
) )
new_route = json.loads(merged)["routes"][-1] new_route = _routes(merged)[-1]
self.assertEqual(["/repos/x/"], new_route["path_allowlist"]) self.assertEqual(["/repos/x/"], new_route["path_allowlist"])
def test_appends_auth_with_token_env_slot(self): def test_appends_auth_with_token_env_slot(self):
@@ -112,72 +128,80 @@ class TestMergeSingleRoute(unittest.TestCase):
"auth": {"scheme": "Bearer", "token_ref": "GH"}, "auth": {"scheme": "Bearer", "token_ref": "GH"},
}, },
) )
new_route = json.loads(merged)["routes"][-1] new_route = _routes(merged)[-1]
self.assertEqual("Bearer", new_route["auth_scheme"]) self.assertEqual("Bearer", new_route["auth_scheme"])
# First auth slot when no prior auth routes exist. # First auth slot when no prior auth routes exist.
self.assertEqual("EGRESS_TOKEN_0", new_route["token_env"]) self.assertEqual("EGRESS_TOKEN_0", new_route["token_env"])
def test_auth_slot_increments_past_existing(self): def test_auth_slot_increments_past_existing(self):
base = json.dumps({"routes": [ base = (
{"host": "api.anthropic.com", 'routes:\n'
"auth_scheme": "Bearer", ' - host: "api.anthropic.com"\n'
"token_env": "EGRESS_TOKEN_0"}, ' auth_scheme: "Bearer"\n'
]}) ' token_env: "EGRESS_TOKEN_0"\n'
)
merged = _merge_single_route(base, { merged = _merge_single_route(base, {
"host": "api.github.com", "host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH"}, "auth": {"scheme": "Bearer", "token_ref": "GH"},
}) })
new_route = json.loads(merged)["routes"][-1] new_route = _routes(merged)[-1]
self.assertEqual("EGRESS_TOKEN_1", new_route["token_env"]) self.assertEqual("EGRESS_TOKEN_1", new_route["token_env"])
def test_existing_host_merges_path_allowlist_as_union(self): def test_existing_host_merges_path_allowlist_as_union(self):
base = json.dumps({"routes": [ base = (
{"host": "github.com", "path_allowlist": ["/a/"]}, 'routes:\n'
]}) ' - host: "github.com"\n'
' path_allowlist:\n'
' - "/a/"\n'
)
merged = _merge_single_route(base, { merged = _merge_single_route(base, {
"host": "github.com", "host": "github.com",
"path_allowlist": ["/b/"], "path_allowlist": ["/b/"],
}) })
routes = json.loads(merged)["routes"] routes = _routes(merged)
self.assertEqual(1, len(routes)) # not duplicated self.assertEqual(1, len(routes)) # not duplicated
self.assertEqual(["/a/", "/b/"], routes[0]["path_allowlist"]) self.assertEqual(["/a/", "/b/"], routes[0]["path_allowlist"])
def test_existing_host_dedup_path_allowlist(self): def test_existing_host_dedup_path_allowlist(self):
base = json.dumps({"routes": [ base = (
{"host": "github.com", "path_allowlist": ["/a/"]}, 'routes:\n'
]}) ' - host: "github.com"\n'
' path_allowlist:\n'
' - "/a/"\n'
)
merged = _merge_single_route(base, { merged = _merge_single_route(base, {
"host": "github.com", "host": "github.com",
"path_allowlist": ["/a/", "/b/"], "path_allowlist": ["/a/", "/b/"],
}) })
self.assertEqual( self.assertEqual(
["/a/", "/b/"], ["/a/", "/b/"],
json.loads(merged)["routes"][0]["path_allowlist"], _routes(merged)[0]["path_allowlist"],
) )
def test_existing_host_preserves_existing_auth_ignores_proposed(self): def test_existing_host_preserves_existing_auth_ignores_proposed(self):
# Tool docs: auth on an existing host is operator-controlled, # Tool docs: auth on an existing host is operator-controlled,
# not agent-controlled. The merge must not overwrite. # not agent-controlled. The merge must not overwrite.
base = json.dumps({"routes": [ base = (
{"host": "api.github.com", 'routes:\n'
"auth_scheme": "Bearer", ' - host: "api.github.com"\n'
"token_env": "EGRESS_TOKEN_0"}, ' auth_scheme: "Bearer"\n'
]}) ' token_env: "EGRESS_TOKEN_0"\n'
)
merged = _merge_single_route(base, { merged = _merge_single_route(base, {
"host": "api.github.com", "host": "api.github.com",
"auth": {"scheme": "token", "token_ref": "DIFFERENT"}, "auth": {"scheme": "token", "token_ref": "DIFFERENT"},
}) })
route = json.loads(merged)["routes"][0] route = _routes(merged)[0]
self.assertEqual("Bearer", route["auth_scheme"]) self.assertEqual("Bearer", route["auth_scheme"])
self.assertEqual("EGRESS_TOKEN_0", route["token_env"]) self.assertEqual("EGRESS_TOKEN_0", route["token_env"])
def test_host_match_is_case_insensitive(self): def test_host_match_is_case_insensitive(self):
base = json.dumps({"routes": [{"host": "GitHub.com"}]}) base = 'routes:\n - host: "GitHub.com"\n'
merged = _merge_single_route(base, { merged = _merge_single_route(base, {
"host": "github.com", "host": "github.com",
"path_allowlist": ["/x/"], "path_allowlist": ["/x/"],
}) })
routes = json.loads(merged)["routes"] routes = _routes(merged)
self.assertEqual(1, len(routes)) self.assertEqual(1, len(routes))
self.assertEqual(["/x/"], routes[0]["path_allowlist"]) self.assertEqual(["/x/"], routes[0]["path_allowlist"])
@@ -187,7 +211,7 @@ class TestMergeSingleRoute(unittest.TestCase):
def test_invalid_current_yaml_raises(self): def test_invalid_current_yaml_raises(self):
with self.assertRaises(EgressApplyError): with self.assertRaises(EgressApplyError):
_merge_single_route("{not json", {"host": "x.example"}) _merge_single_route("routes:\n\tbad", {"host": "x.example"})
class TestPipelockSafeHosts(unittest.TestCase): class TestPipelockSafeHosts(unittest.TestCase):
+11 -11
View File
@@ -5,7 +5,7 @@ actually use, and every rejection case the PRD enumerates."""
import textwrap import textwrap
import unittest import unittest
from claude_bottle.log import Die from claude_bottle.yaml_subset import YamlSubsetError
from claude_bottle.yaml_subset import parse_frontmatter, parse_yaml_subset from claude_bottle.yaml_subset import parse_frontmatter, parse_yaml_subset
@@ -60,7 +60,7 @@ class TestForbiddenBoolLikes(unittest.TestCase):
"""Ambiguous bool-ish tokens have to be quoted explicitly.""" """Ambiguous bool-ish tokens have to be quoted explicitly."""
def _expect_die(self, src: str): def _expect_die(self, src: str):
with self.assertRaises(Die): with self.assertRaises(YamlSubsetError):
_y(src) _y(src)
def test_yes_dies(self): def test_yes_dies(self):
@@ -81,7 +81,7 @@ class TestForbiddenBoolLikes(unittest.TestCase):
class TestForbiddenScalarShapes(unittest.TestCase): class TestForbiddenScalarShapes(unittest.TestCase):
def _expect_die(self, src: str): def _expect_die(self, src: str):
with self.assertRaises(Die): with self.assertRaises(YamlSubsetError):
_y(src) _y(src)
def test_bare_date_dies(self): def test_bare_date_dies(self):
@@ -120,14 +120,14 @@ class TestMapping(unittest.TestCase):
self.assertEqual({"outer": {"inner": "hello", "other": 5}}, out) self.assertEqual({"outer": {"inner": "hello", "other": 5}}, out)
def test_duplicate_key_dies(self): def test_duplicate_key_dies(self):
with self.assertRaises(Die): with self.assertRaises(YamlSubsetError):
_y(""" _y("""
a: 1 a: 1
a: 2 a: 2
""") """)
def test_key_must_be_bare_identifier(self): def test_key_must_be_bare_identifier(self):
with self.assertRaises(Die): with self.assertRaises(YamlSubsetError):
_y('"weird key": 1\n') _y('"weird key": 1\n')
@@ -202,20 +202,20 @@ class TestInline(unittest.TestCase):
) )
def test_nested_flow_dies(self): def test_nested_flow_dies(self):
with self.assertRaises(Die): with self.assertRaises(YamlSubsetError):
_y("l: [[1, 2], [3, 4]]\n") _y("l: [[1, 2], [3, 4]]\n")
class TestForbiddenConstructs(unittest.TestCase): class TestForbiddenConstructs(unittest.TestCase):
def test_anchor_dies(self): def test_anchor_dies(self):
with self.assertRaises(Die): with self.assertRaises(YamlSubsetError):
_y(""" _y("""
a: &anchor 1 a: &anchor 1
b: *anchor b: *anchor
""") """)
def test_multiline_block_scalar_dies(self): def test_multiline_block_scalar_dies(self):
with self.assertRaises(Die): with self.assertRaises(YamlSubsetError):
_y(""" _y("""
k: | k: |
line 1 line 1
@@ -223,11 +223,11 @@ class TestForbiddenConstructs(unittest.TestCase):
""") """)
def test_tag_dies(self): def test_tag_dies(self):
with self.assertRaises(Die): with self.assertRaises(YamlSubsetError):
_y("k: !!str hello\n") _y("k: !!str hello\n")
def test_tab_in_indent_dies(self): def test_tab_in_indent_dies(self):
with self.assertRaises(Die): with self.assertRaises(YamlSubsetError):
_y("a:\n\tb: 1\n") _y("a:\n\tb: 1\n")
@@ -306,7 +306,7 @@ class TestFrontmatter(unittest.TestCase):
self.assertEqual(text, body) self.assertEqual(text, body)
def test_unclosed_frontmatter_dies(self): def test_unclosed_frontmatter_dies(self):
with self.assertRaises(Die): with self.assertRaises(YamlSubsetError):
parse_frontmatter("---\nbottle: dev\nno closing") parse_frontmatter("---\nbottle: dev\nno closing")
def test_body_preserves_blank_lines(self): def test_body_preserves_blank_lines(self):