refactor(egress): provisioned-wins merge + _route_to_yaml_fields (PRD 0031)

Replace _merge_provider_route's five-case nested conditional with a flat
provisioned-wins merge: provider routes claim their hosts outright, manifest
routes for unclaimed hosts append unchanged. Token slot assignment moves to a
single _assign_token_slots pass over the merged list.

Add _route_to_yaml_fields as the single authoritative EgressRoute→YAML mapping,
eliminating the risk of EgressRoute and egress_addon_core.Route silently
drifting apart when new fields are added.

egress_manifest_routes is now a pure lifter with no slot assignment.
_merge_provider_route and _find_or_alloc_token_env are removed.

Tests updated: conflict-die case removed, upgrade-bare replaced with
provider-wins semantics, slot-assignment tests moved to TestSlotAssignment.
This commit is contained in:
2026-06-02 05:45:20 +00:00
parent ae33d1abfb
commit 10d0872043
2 changed files with 117 additions and 179 deletions
+68 -133
View File
@@ -24,6 +24,7 @@ flow (PRD 0014) at egress and renames the MCP tool.
from __future__ import annotations
import dataclasses
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
@@ -141,43 +142,20 @@ class EgressPlan:
def egress_manifest_routes(
bottle: Bottle,
) -> tuple[EgressRoute, ...]:
"""Lift each `bottle.egress.routes[]` manifest entry into a
resolved EgressRoute. Order is preserved so route lookup at
the proxy is stable.
Token-env slots are assigned per distinct `token_ref`: the first
authenticated route with `token_ref` "GH_PAT" gets
`EGRESS_TOKEN_0`; a second route with the same `token_ref`
shares slot 0. Unauthenticated routes (`auth` omitted) contribute
no slot.
This is the effective set the addon enforces. Provider runtime
routes are intentionally not injected implicitly; every allowed
host must come from the home-owned bottle manifest."""
"""Lift each `bottle.egress.routes[]` manifest entry into an EgressRoute.
Order is preserved. Token slots are not assigned here — slot assignment
is a final step in `egress_routes_for_bottle` after provider and manifest
routes are merged."""
out: list[EgressRoute] = []
slot_for_token: dict[str, str] = {}
for r in bottle.egress.routes:
if r.AuthScheme and r.TokenRef:
token_env = slot_for_token.get(r.TokenRef)
if token_env is None:
token_env = f"EGRESS_TOKEN_{len(slot_for_token)}"
slot_for_token[r.TokenRef] = token_env
out.append(EgressRoute(
host=r.Host,
path_allowlist=r.PathAllowlist,
auth_scheme=r.AuthScheme,
token_env=token_env,
token_ref=r.TokenRef,
roles=r.Role,
tls_passthrough=r.Pipelock.TlsPassthrough,
))
else:
out.append(EgressRoute(
host=r.Host,
path_allowlist=r.PathAllowlist,
roles=r.Role,
tls_passthrough=r.Pipelock.TlsPassthrough,
))
out.append(EgressRoute(
host=r.Host,
path_allowlist=r.PathAllowlist,
auth_scheme=r.AuthScheme,
token_ref=r.TokenRef,
roles=r.Role,
tls_passthrough=r.Pipelock.TlsPassthrough,
))
return tuple(out)
@@ -185,90 +163,39 @@ def egress_routes_for_bottle(
bottle: Bottle,
provider_routes: tuple[EgressRoute, ...] = (),
) -> tuple[EgressRoute, ...]:
"""Effective egress routes for the agent. This is what gets rendered
into routes.yaml and what the addon enforces.
"""Effective egress routes for the agent.
Merges manifest-declared routes with provider-owned routes. The
manifest is the primary surface; `provider_routes` are synthesised
by `agent_provision_plan` and may add or upgrade manifest entries.
Provider routes that conflict with an existing authenticated manifest
route (different auth scheme or token ref) raise a hard error."""
routes = list(egress_manifest_routes(bottle))
for pr in provider_routes:
routes = _merge_provider_route(routes, pr)
return tuple(routes)
Provider routes own their hosts outright; manifest routes for hosts
not claimed by any provider are appended. Token slots are assigned
in a final pass over the merged list in order, so provisioned routes
get the lower slot numbers."""
manifest = egress_manifest_routes(bottle)
provisioned_hosts = {pr.host.lower() for pr in provider_routes}
merged = list(provider_routes) + [
r for r in manifest if r.host.lower() not in provisioned_hosts
]
return _assign_token_slots(merged)
def _find_or_alloc_token_env(routes: list[EgressRoute], token_ref: str) -> str:
"""Return the existing token_env slot for `token_ref`, or allocate the next one."""
if not token_ref:
return ""
for route in routes:
if route.token_ref == token_ref and route.token_env:
return route.token_env
return f"EGRESS_TOKEN_{len({r.token_env for r in routes if r.token_env})}"
def _assign_token_slots(
routes: list[EgressRoute],
) -> tuple[EgressRoute, ...]:
"""Assign EGRESS_TOKEN_N slots to authenticated routes in order.
def _merge_provider_route(
routes: list[EgressRoute], pr: EgressRoute,
) -> list[EgressRoute]:
"""Merge one provider-declared route into the manifest route list.
Upgrade a bare-pass manifest route to authenticated if the provider
declares auth for that host, or append if the host isn't in the manifest.
Identical auth (same scheme + token_ref) on an existing route is a
no-op, with a tls_passthrough upgrade if the provider route sets it.
Conflicting auth (different scheme or token_ref) dies."""
for idx, route in enumerate(routes):
if route.host.lower() != pr.host.lower():
continue
if route.auth_scheme or route.token_ref:
if route.auth_scheme == pr.auth_scheme and route.token_ref == pr.token_ref:
if pr.tls_passthrough and not route.tls_passthrough:
routes[idx] = EgressRoute(
host=route.host,
path_allowlist=route.path_allowlist,
auth_scheme=route.auth_scheme,
token_env=route.token_env,
token_ref=route.token_ref,
roles=route.roles,
tls_passthrough=True,
)
return routes
die(
f"provider egress route for {pr.host!r} conflicts with an "
f"authenticated manifest route (different auth scheme or token "
f"ref). Remove the manifest route's auth block or disable the "
f"feature that adds this provider route."
)
token_env = (
_find_or_alloc_token_env(routes, pr.token_ref)
if pr.auth_scheme and pr.token_ref
else ""
)
routes[idx] = EgressRoute(
host=route.host,
path_allowlist=route.path_allowlist,
auth_scheme=pr.auth_scheme,
token_env=token_env,
token_ref=pr.token_ref,
roles=route.roles,
tls_passthrough=pr.tls_passthrough,
)
return routes
token_env = (
_find_or_alloc_token_env(routes, pr.token_ref)
if pr.auth_scheme and pr.token_ref
else ""
)
routes.append(EgressRoute(
host=pr.host,
auth_scheme=pr.auth_scheme,
token_env=token_env,
token_ref=pr.token_ref,
tls_passthrough=pr.tls_passthrough,
))
return routes
Routes sharing a token_ref share a slot. Unauthenticated routes
(no auth_scheme / token_ref) keep token_env empty."""
slot_for_ref: dict[str, str] = {}
out: list[EgressRoute] = []
for r in routes:
if r.auth_scheme and r.token_ref:
slot = slot_for_ref.get(r.token_ref)
if slot is None:
slot = f"EGRESS_TOKEN_{len(slot_for_ref)}"
slot_for_ref[r.token_ref] = slot
out.append(dataclasses.replace(r, token_env=slot))
else:
out.append(r)
return tuple(out)
def egress_token_env_map(
@@ -296,35 +223,43 @@ def egress_token_env_map(
return out
def _route_to_yaml_fields(r: EgressRoute) -> dict:
"""Return the addon-visible fields for one route.
Single authoritative mapping between EgressRoute (host-side) and
egress_addon_core.Route (sidecar-side). When a field is added to
the addon's Route that must appear in the YAML, add it here and
in egress_addon_core._parse_one together."""
fields: dict = {"host": r.host}
if r.auth_scheme and r.token_env:
fields["auth_scheme"] = r.auth_scheme
fields["token_env"] = r.token_env
if r.path_allowlist:
fields["path_allowlist"] = list(r.path_allowlist)
return fields
def egress_render_routes(
routes: tuple[EgressRoute, ...],
) -> str:
"""Serialize the route table for the addon to read.
YAML content — no token values, no host env-var names. The only
thing the addon needs at runtime is the host → path_allowlist
+ auth_scheme + in-container env-var mapping. The actual token
values arrive via the container's environ.
Authenticated routes carry `auth_scheme` + `token_env`;
unauthenticated routes omit both keys (the addon's parser
enforces both-or-neither). Hand-rolled YAML in the style of
`pipelock_render_yaml` so the addon's parser
(`yaml_subset.parse_yaml_subset`) round-trips it cleanly."""
YAML content — no token values, no host env-var names. Fields are
determined by `_route_to_yaml_fields`, which is the single point of
truth for the EgressRoute → egress_addon_core.Route mapping."""
lines: list[str] = ["routes:"]
if not routes:
# `routes:` with an empty list on the same line — the parser
# needs SOMETHING here. Empty inline list is the cleanest.
lines[0] = "routes: []"
return "\n".join(lines) + "\n"
for r in routes:
lines.append(f' - host: "{r.host}"')
if r.auth_scheme and r.token_env:
lines.append(f' auth_scheme: "{r.auth_scheme}"')
lines.append(f' token_env: "{r.token_env}"')
if r.path_allowlist:
f = _route_to_yaml_fields(r)
lines.append(f' - host: "{f["host"]}"')
if "auth_scheme" in f:
lines.append(f' auth_scheme: "{f["auth_scheme"]}"')
lines.append(f' token_env: "{f["token_env"]}"')
if "path_allowlist" in f:
lines.append(" path_allowlist:")
for p in r.path_allowlist:
for p in f["path_allowlist"]:
lines.append(f' - "{p}"')
return "\n".join(lines) + "\n"
+49 -46
View File
@@ -33,8 +33,10 @@ def _provider_route(host: str, token_ref: str, *, tls_passthrough: bool = False)
)
class TestRoutesForBottle(unittest.TestCase):
def test_authenticated_route_gets_slot(self):
class TestManifestRouteLift(unittest.TestCase):
"""egress_manifest_routes is a pure lifter — no slot assignment."""
def test_authenticated_route_lifted_without_slot(self):
b = _bottle([{
"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"},
@@ -44,8 +46,8 @@ class TestRoutesForBottle(unittest.TestCase):
r = routes[0]
self.assertEqual("api.github.com", r.host)
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
self.assertEqual("GH_PAT", r.token_ref)
self.assertEqual("", r.token_env) # slot assigned later
self.assertEqual((), r.path_allowlist)
def test_unauthenticated_route_has_empty_auth_fields(self):
@@ -57,6 +59,20 @@ class TestRoutesForBottle(unittest.TestCase):
self.assertEqual("", r.token_ref)
self.assertEqual(("/x/",), r.path_allowlist)
class TestSlotAssignment(unittest.TestCase):
"""Slot assignment happens in egress_routes_for_bottle."""
def test_authenticated_route_gets_slot(self):
b = _bottle([{
"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"},
}])
routes = egress_routes_for_bottle(b)
r = routes[0]
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
self.assertEqual("GH_PAT", r.token_ref)
def test_shared_token_ref_collapses_to_one_slot(self):
b = _bottle([
{"host": "api.github.com",
@@ -64,7 +80,7 @@ class TestRoutesForBottle(unittest.TestCase):
{"host": "github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"}},
])
routes = egress_manifest_routes(b)
routes = egress_routes_for_bottle(b)
slots = {r.token_env for r in routes}
self.assertEqual({"EGRESS_TOKEN_0"}, slots)
@@ -75,7 +91,7 @@ class TestRoutesForBottle(unittest.TestCase):
{"host": "b.example",
"auth": {"scheme": "Bearer", "token_ref": "T2"}},
])
routes = egress_manifest_routes(b)
routes = egress_routes_for_bottle(b)
slots = [r.token_env for r in routes]
self.assertEqual(["EGRESS_TOKEN_0", "EGRESS_TOKEN_1"], slots)
@@ -89,7 +105,7 @@ class TestRoutesForBottle(unittest.TestCase):
{"host": "b.example",
"auth": {"scheme": "Bearer", "token_ref": "T2"}},
])
routes = egress_manifest_routes(b)
routes = egress_routes_for_bottle(b)
authed = [r.token_env for r in routes if r.token_env]
self.assertEqual(["EGRESS_TOKEN_0", "EGRESS_TOKEN_1"], authed)
self.assertEqual("", routes[1].token_env)
@@ -133,7 +149,7 @@ class TestRoutesForBottleManifestOnly(unittest.TestCase):
class TestProviderRouteMerge(unittest.TestCase):
"""Provider routes are merged into manifest routes generically."""
"""Provider routes win on host collision; manifest fills the rest."""
def test_provider_route_appended_when_not_in_manifest(self):
b = _bottle([])
@@ -156,8 +172,9 @@ class TestProviderRouteMerge(unittest.TestCase):
self.assertEqual("", routes[0].token_ref)
self.assertEqual({}, egress_token_env_map(routes))
def test_unauthenticated_provider_route_upgrades_bare_without_token_slot(self):
b = _bottle([{"host": "api.openai.com"}])
def test_provider_route_wins_over_bare_manifest_route(self):
# Provisioned host wins outright; manifest path_allowlist is dropped.
b = _bottle([{"host": "api.openai.com", "path_allowlist": ["/v1/"]}])
pr = EgressRoute(host="api.openai.com", tls_passthrough=True)
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
@@ -165,6 +182,7 @@ class TestProviderRouteMerge(unittest.TestCase):
self.assertEqual("", routes[0].token_env)
self.assertEqual("", routes[0].token_ref)
self.assertTrue(routes[0].tls_passthrough)
self.assertEqual((), routes[0].path_allowlist)
self.assertEqual({}, egress_token_env_map(routes))
def test_two_provider_routes_with_same_token_ref_share_slot(self):
@@ -177,8 +195,11 @@ class TestProviderRouteMerge(unittest.TestCase):
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
self.assertEqual("EGRESS_TOKEN_0", routes[1].token_env)
def test_provider_route_upgrades_bare_manifest_route(self):
b = _bottle([{"host": "chatgpt.com", "path_allowlist": ["/backend-api/"]}])
def test_provider_route_wins_over_authed_manifest_route(self):
# Provider wins even when manifest has its own auth for the host.
b = _bottle([{"host": "chatgpt.com",
"path_allowlist": ["/backend-api/"],
"auth": {"scheme": "Bearer", "token_ref": "OTHER"}}])
pr = _provider_route("chatgpt.com", CODEX_HOST_CREDENTIAL_TOKEN_REF)
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
@@ -186,38 +207,20 @@ class TestProviderRouteMerge(unittest.TestCase):
self.assertEqual("Bearer", routes[0].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, routes[0].token_ref)
self.assertEqual(("/backend-api/",), routes[0].path_allowlist)
self.assertEqual((), routes[0].path_allowlist)
def test_provider_route_noop_when_same_auth_already_in_manifest(self):
b = _bottle([{
"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": CODEX_HOST_CREDENTIAL_TOKEN_REF},
}])
def test_manifest_route_preserved_for_non_provisioned_host(self):
b = _bottle([
{"host": "api.openai.com"},
{"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"}},
])
pr = _provider_route("api.openai.com", CODEX_HOST_CREDENTIAL_TOKEN_REF)
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
def test_provider_route_upgrades_tls_passthrough_on_existing_same_auth(self):
b = _bottle([{
"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": CODEX_HOST_CREDENTIAL_TOKEN_REF},
}])
pr = _provider_route(
"api.openai.com", CODEX_HOST_CREDENTIAL_TOKEN_REF, tls_passthrough=True,
)
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertTrue(routes[0].tls_passthrough)
def test_provider_route_conflicts_with_different_authed_manifest_route(self):
b = _bottle([{
"host": "chatgpt.com",
"auth": {"scheme": "Bearer", "token_ref": "OTHER"},
}])
pr = _provider_route("chatgpt.com", CODEX_HOST_CREDENTIAL_TOKEN_REF)
with self.assertRaises(Die):
egress_routes_for_bottle(b, (pr,))
hosts = [r.host for r in routes]
self.assertEqual(["api.openai.com", "api.github.com"], hosts)
self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, routes[0].token_ref)
self.assertEqual("GH_PAT", routes[1].token_ref)
def test_provider_route_tls_passthrough_set_on_appended_route(self):
b = _bottle([])
@@ -225,7 +228,7 @@ class TestProviderRouteMerge(unittest.TestCase):
routes = egress_routes_for_bottle(b, (pr,))
self.assertTrue(routes[0].tls_passthrough)
def test_provider_route_tls_passthrough_set_on_upgraded_bare_route(self):
def test_provider_route_tls_passthrough_wins_over_bare_manifest_route(self):
b = _bottle([{"host": "api.openai.com"}])
pr = _provider_route("api.openai.com", "TOK", tls_passthrough=True)
routes = egress_routes_for_bottle(b, (pr,))
@@ -239,7 +242,7 @@ class TestTokenEnvMap(unittest.TestCase):
"auth": {"scheme": "Bearer", "token_ref": "T1"}},
{"host": "passthrough.example"},
])
routes = egress_manifest_routes(b)
routes = egress_routes_for_bottle(b)
m = egress_token_env_map(routes)
self.assertEqual({"EGRESS_TOKEN_0": "T1"}, m)
@@ -263,7 +266,7 @@ class TestRenderRoutes(unittest.TestCase):
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"},
"path_allowlist": ["/repos/x/"],
}])
routes = egress_manifest_routes(b)
routes = egress_routes_for_bottle(b)
parsed = self._parsed(routes)
self.assertEqual(
[{
@@ -281,7 +284,7 @@ class TestRenderRoutes(unittest.TestCase):
# enforces both-or-neither, so emitting empty strings would
# round-trip as a partial pair and crash.
b = _bottle([{"host": "github.com", "path_allowlist": ["/x/"]}])
routes = egress_manifest_routes(b)
routes = egress_routes_for_bottle(b)
entry = self._parsed(routes)[0]
self.assertNotIn("auth_scheme", entry)
self.assertNotIn("token_env", entry)
@@ -291,7 +294,7 @@ class TestRenderRoutes(unittest.TestCase):
"host": "api.anthropic.com",
"auth": {"scheme": "Bearer", "token_ref": "CL"},
}])
routes = egress_manifest_routes(b)
routes = egress_routes_for_bottle(b)
self.assertNotIn("path_allowlist", self._parsed(routes)[0])
def test_empty_routes_round_trips(self):
@@ -310,7 +313,7 @@ class TestRenderRoutes(unittest.TestCase):
{"host": "github.com", "path_allowlist": ["/x/"]},
{"host": "api.anthropic.com"},
])
routes = egress_manifest_routes(b)
routes = egress_routes_for_bottle(b)
addon_routes = load_routes(egress_render_routes(routes))
self.assertEqual(3, len(addon_routes))
self.assertEqual("Bearer", addon_routes[0].auth_scheme)