diff --git a/bot_bottle/egress.py b/bot_bottle/egress.py index 0e1bc0d..607feaf 100644 --- a/bot_bottle/egress.py +++ b/bot_bottle/egress.py @@ -24,6 +24,7 @@ flow (PRD 0014) at egress and renames the MCP tool. from __future__ import annotations +import dataclasses from abc import ABC, abstractmethod from dataclasses import dataclass from pathlib import Path @@ -141,43 +142,20 @@ class EgressPlan: def egress_manifest_routes( bottle: Bottle, ) -> tuple[EgressRoute, ...]: - """Lift each `bottle.egress.routes[]` manifest entry into a - resolved EgressRoute. Order is preserved so route lookup at - the proxy is stable. - - Token-env slots are assigned per distinct `token_ref`: the first - authenticated route with `token_ref` "GH_PAT" gets - `EGRESS_TOKEN_0`; a second route with the same `token_ref` - shares slot 0. Unauthenticated routes (`auth` omitted) contribute - no slot. - - This is the effective set the addon enforces. Provider runtime - routes are intentionally not injected implicitly; every allowed - host must come from the home-owned bottle manifest.""" + """Lift each `bottle.egress.routes[]` manifest entry into an EgressRoute. + Order is preserved. Token slots are not assigned here — slot assignment + is a final step in `egress_routes_for_bottle` after provider and manifest + routes are merged.""" out: list[EgressRoute] = [] - slot_for_token: dict[str, str] = {} for r in bottle.egress.routes: - if r.AuthScheme and r.TokenRef: - token_env = slot_for_token.get(r.TokenRef) - if token_env is None: - token_env = f"EGRESS_TOKEN_{len(slot_for_token)}" - slot_for_token[r.TokenRef] = token_env - out.append(EgressRoute( - host=r.Host, - path_allowlist=r.PathAllowlist, - auth_scheme=r.AuthScheme, - token_env=token_env, - token_ref=r.TokenRef, - roles=r.Role, - tls_passthrough=r.Pipelock.TlsPassthrough, - )) - else: - out.append(EgressRoute( - host=r.Host, - path_allowlist=r.PathAllowlist, - roles=r.Role, - tls_passthrough=r.Pipelock.TlsPassthrough, - )) + out.append(EgressRoute( + host=r.Host, + path_allowlist=r.PathAllowlist, + auth_scheme=r.AuthScheme, + token_ref=r.TokenRef, + roles=r.Role, + tls_passthrough=r.Pipelock.TlsPassthrough, + )) return tuple(out) @@ -185,90 +163,39 @@ def egress_routes_for_bottle( bottle: Bottle, provider_routes: tuple[EgressRoute, ...] = (), ) -> tuple[EgressRoute, ...]: - """Effective egress routes for the agent. This is what gets rendered - into routes.yaml and what the addon enforces. + """Effective egress routes for the agent. - Merges manifest-declared routes with provider-owned routes. The - manifest is the primary surface; `provider_routes` are synthesised - by `agent_provision_plan` and may add or upgrade manifest entries. - Provider routes that conflict with an existing authenticated manifest - route (different auth scheme or token ref) raise a hard error.""" - routes = list(egress_manifest_routes(bottle)) - for pr in provider_routes: - routes = _merge_provider_route(routes, pr) - return tuple(routes) + Provider routes own their hosts outright; manifest routes for hosts + not claimed by any provider are appended. Token slots are assigned + in a final pass over the merged list in order, so provisioned routes + get the lower slot numbers.""" + manifest = egress_manifest_routes(bottle) + provisioned_hosts = {pr.host.lower() for pr in provider_routes} + merged = list(provider_routes) + [ + r for r in manifest if r.host.lower() not in provisioned_hosts + ] + return _assign_token_slots(merged) -def _find_or_alloc_token_env(routes: list[EgressRoute], token_ref: str) -> str: - """Return the existing token_env slot for `token_ref`, or allocate the next one.""" - if not token_ref: - return "" - for route in routes: - if route.token_ref == token_ref and route.token_env: - return route.token_env - return f"EGRESS_TOKEN_{len({r.token_env for r in routes if r.token_env})}" +def _assign_token_slots( + routes: list[EgressRoute], +) -> tuple[EgressRoute, ...]: + """Assign EGRESS_TOKEN_N slots to authenticated routes in order. - -def _merge_provider_route( - routes: list[EgressRoute], pr: EgressRoute, -) -> list[EgressRoute]: - """Merge one provider-declared route into the manifest route list. - - Upgrade a bare-pass manifest route to authenticated if the provider - declares auth for that host, or append if the host isn't in the manifest. - Identical auth (same scheme + token_ref) on an existing route is a - no-op, with a tls_passthrough upgrade if the provider route sets it. - Conflicting auth (different scheme or token_ref) dies.""" - for idx, route in enumerate(routes): - if route.host.lower() != pr.host.lower(): - continue - if route.auth_scheme or route.token_ref: - if route.auth_scheme == pr.auth_scheme and route.token_ref == pr.token_ref: - if pr.tls_passthrough and not route.tls_passthrough: - routes[idx] = EgressRoute( - host=route.host, - path_allowlist=route.path_allowlist, - auth_scheme=route.auth_scheme, - token_env=route.token_env, - token_ref=route.token_ref, - roles=route.roles, - tls_passthrough=True, - ) - return routes - die( - f"provider egress route for {pr.host!r} conflicts with an " - f"authenticated manifest route (different auth scheme or token " - f"ref). Remove the manifest route's auth block or disable the " - f"feature that adds this provider route." - ) - token_env = ( - _find_or_alloc_token_env(routes, pr.token_ref) - if pr.auth_scheme and pr.token_ref - else "" - ) - routes[idx] = EgressRoute( - host=route.host, - path_allowlist=route.path_allowlist, - auth_scheme=pr.auth_scheme, - token_env=token_env, - token_ref=pr.token_ref, - roles=route.roles, - tls_passthrough=pr.tls_passthrough, - ) - return routes - token_env = ( - _find_or_alloc_token_env(routes, pr.token_ref) - if pr.auth_scheme and pr.token_ref - else "" - ) - routes.append(EgressRoute( - host=pr.host, - auth_scheme=pr.auth_scheme, - token_env=token_env, - token_ref=pr.token_ref, - tls_passthrough=pr.tls_passthrough, - )) - return routes + Routes sharing a token_ref share a slot. Unauthenticated routes + (no auth_scheme / token_ref) keep token_env empty.""" + slot_for_ref: dict[str, str] = {} + out: list[EgressRoute] = [] + for r in routes: + if r.auth_scheme and r.token_ref: + slot = slot_for_ref.get(r.token_ref) + if slot is None: + slot = f"EGRESS_TOKEN_{len(slot_for_ref)}" + slot_for_ref[r.token_ref] = slot + out.append(dataclasses.replace(r, token_env=slot)) + else: + out.append(r) + return tuple(out) def egress_token_env_map( @@ -296,35 +223,43 @@ def egress_token_env_map( return out +def _route_to_yaml_fields(r: EgressRoute) -> dict: + """Return the addon-visible fields for one route. + + Single authoritative mapping between EgressRoute (host-side) and + egress_addon_core.Route (sidecar-side). When a field is added to + the addon's Route that must appear in the YAML, add it here and + in egress_addon_core._parse_one together.""" + fields: dict = {"host": r.host} + if r.auth_scheme and r.token_env: + fields["auth_scheme"] = r.auth_scheme + fields["token_env"] = r.token_env + if r.path_allowlist: + fields["path_allowlist"] = list(r.path_allowlist) + return fields + + def egress_render_routes( routes: tuple[EgressRoute, ...], ) -> str: """Serialize the route table for the addon to read. - YAML content — no token values, no host env-var names. The only - thing the addon needs at runtime is the host → path_allowlist - + auth_scheme + in-container env-var mapping. The actual token - values arrive via the container's environ. - - Authenticated routes carry `auth_scheme` + `token_env`; - unauthenticated routes omit both keys (the addon's parser - enforces both-or-neither). Hand-rolled YAML in the style of - `pipelock_render_yaml` so the addon's parser - (`yaml_subset.parse_yaml_subset`) round-trips it cleanly.""" + YAML content — no token values, no host env-var names. Fields are + determined by `_route_to_yaml_fields`, which is the single point of + truth for the EgressRoute → egress_addon_core.Route mapping.""" lines: list[str] = ["routes:"] if not routes: - # `routes:` with an empty list on the same line — the parser - # needs SOMETHING here. Empty inline list is the cleanest. lines[0] = "routes: []" return "\n".join(lines) + "\n" for r in routes: - lines.append(f' - host: "{r.host}"') - if r.auth_scheme and r.token_env: - lines.append(f' auth_scheme: "{r.auth_scheme}"') - lines.append(f' token_env: "{r.token_env}"') - if r.path_allowlist: + f = _route_to_yaml_fields(r) + lines.append(f' - host: "{f["host"]}"') + if "auth_scheme" in f: + lines.append(f' auth_scheme: "{f["auth_scheme"]}"') + lines.append(f' token_env: "{f["token_env"]}"') + if "path_allowlist" in f: lines.append(" path_allowlist:") - for p in r.path_allowlist: + for p in f["path_allowlist"]: lines.append(f' - "{p}"') return "\n".join(lines) + "\n" diff --git a/tests/unit/test_egress.py b/tests/unit/test_egress.py index f1129cc..ade8cff 100644 --- a/tests/unit/test_egress.py +++ b/tests/unit/test_egress.py @@ -33,8 +33,10 @@ def _provider_route(host: str, token_ref: str, *, tls_passthrough: bool = False) ) -class TestRoutesForBottle(unittest.TestCase): - def test_authenticated_route_gets_slot(self): +class TestManifestRouteLift(unittest.TestCase): + """egress_manifest_routes is a pure lifter — no slot assignment.""" + + def test_authenticated_route_lifted_without_slot(self): b = _bottle([{ "host": "api.github.com", "auth": {"scheme": "Bearer", "token_ref": "GH_PAT"}, @@ -44,8 +46,8 @@ class TestRoutesForBottle(unittest.TestCase): r = routes[0] self.assertEqual("api.github.com", r.host) self.assertEqual("Bearer", r.auth_scheme) - self.assertEqual("EGRESS_TOKEN_0", r.token_env) self.assertEqual("GH_PAT", r.token_ref) + self.assertEqual("", r.token_env) # slot assigned later self.assertEqual((), r.path_allowlist) def test_unauthenticated_route_has_empty_auth_fields(self): @@ -57,6 +59,20 @@ class TestRoutesForBottle(unittest.TestCase): self.assertEqual("", r.token_ref) self.assertEqual(("/x/",), r.path_allowlist) + +class TestSlotAssignment(unittest.TestCase): + """Slot assignment happens in egress_routes_for_bottle.""" + + def test_authenticated_route_gets_slot(self): + b = _bottle([{ + "host": "api.github.com", + "auth": {"scheme": "Bearer", "token_ref": "GH_PAT"}, + }]) + routes = egress_routes_for_bottle(b) + r = routes[0] + self.assertEqual("EGRESS_TOKEN_0", r.token_env) + self.assertEqual("GH_PAT", r.token_ref) + def test_shared_token_ref_collapses_to_one_slot(self): b = _bottle([ {"host": "api.github.com", @@ -64,7 +80,7 @@ class TestRoutesForBottle(unittest.TestCase): {"host": "github.com", "auth": {"scheme": "Bearer", "token_ref": "GH_PAT"}}, ]) - routes = egress_manifest_routes(b) + routes = egress_routes_for_bottle(b) slots = {r.token_env for r in routes} self.assertEqual({"EGRESS_TOKEN_0"}, slots) @@ -75,7 +91,7 @@ class TestRoutesForBottle(unittest.TestCase): {"host": "b.example", "auth": {"scheme": "Bearer", "token_ref": "T2"}}, ]) - routes = egress_manifest_routes(b) + routes = egress_routes_for_bottle(b) slots = [r.token_env for r in routes] self.assertEqual(["EGRESS_TOKEN_0", "EGRESS_TOKEN_1"], slots) @@ -89,7 +105,7 @@ class TestRoutesForBottle(unittest.TestCase): {"host": "b.example", "auth": {"scheme": "Bearer", "token_ref": "T2"}}, ]) - routes = egress_manifest_routes(b) + routes = egress_routes_for_bottle(b) authed = [r.token_env for r in routes if r.token_env] self.assertEqual(["EGRESS_TOKEN_0", "EGRESS_TOKEN_1"], authed) self.assertEqual("", routes[1].token_env) @@ -133,7 +149,7 @@ class TestRoutesForBottleManifestOnly(unittest.TestCase): class TestProviderRouteMerge(unittest.TestCase): - """Provider routes are merged into manifest routes generically.""" + """Provider routes win on host collision; manifest fills the rest.""" def test_provider_route_appended_when_not_in_manifest(self): b = _bottle([]) @@ -156,8 +172,9 @@ class TestProviderRouteMerge(unittest.TestCase): self.assertEqual("", routes[0].token_ref) self.assertEqual({}, egress_token_env_map(routes)) - def test_unauthenticated_provider_route_upgrades_bare_without_token_slot(self): - b = _bottle([{"host": "api.openai.com"}]) + def test_provider_route_wins_over_bare_manifest_route(self): + # Provisioned host wins outright; manifest path_allowlist is dropped. + b = _bottle([{"host": "api.openai.com", "path_allowlist": ["/v1/"]}]) pr = EgressRoute(host="api.openai.com", tls_passthrough=True) routes = egress_routes_for_bottle(b, (pr,)) self.assertEqual(1, len(routes)) @@ -165,6 +182,7 @@ class TestProviderRouteMerge(unittest.TestCase): self.assertEqual("", routes[0].token_env) self.assertEqual("", routes[0].token_ref) self.assertTrue(routes[0].tls_passthrough) + self.assertEqual((), routes[0].path_allowlist) self.assertEqual({}, egress_token_env_map(routes)) def test_two_provider_routes_with_same_token_ref_share_slot(self): @@ -177,8 +195,11 @@ class TestProviderRouteMerge(unittest.TestCase): self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env) self.assertEqual("EGRESS_TOKEN_0", routes[1].token_env) - def test_provider_route_upgrades_bare_manifest_route(self): - b = _bottle([{"host": "chatgpt.com", "path_allowlist": ["/backend-api/"]}]) + def test_provider_route_wins_over_authed_manifest_route(self): + # Provider wins even when manifest has its own auth for the host. + b = _bottle([{"host": "chatgpt.com", + "path_allowlist": ["/backend-api/"], + "auth": {"scheme": "Bearer", "token_ref": "OTHER"}}]) pr = _provider_route("chatgpt.com", CODEX_HOST_CREDENTIAL_TOKEN_REF) routes = egress_routes_for_bottle(b, (pr,)) self.assertEqual(1, len(routes)) @@ -186,38 +207,20 @@ class TestProviderRouteMerge(unittest.TestCase): self.assertEqual("Bearer", routes[0].auth_scheme) self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env) self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, routes[0].token_ref) - self.assertEqual(("/backend-api/",), routes[0].path_allowlist) + self.assertEqual((), routes[0].path_allowlist) - def test_provider_route_noop_when_same_auth_already_in_manifest(self): - b = _bottle([{ - "host": "api.openai.com", - "auth": {"scheme": "Bearer", "token_ref": CODEX_HOST_CREDENTIAL_TOKEN_REF}, - }]) + def test_manifest_route_preserved_for_non_provisioned_host(self): + b = _bottle([ + {"host": "api.openai.com"}, + {"host": "api.github.com", + "auth": {"scheme": "Bearer", "token_ref": "GH_PAT"}}, + ]) pr = _provider_route("api.openai.com", CODEX_HOST_CREDENTIAL_TOKEN_REF) routes = egress_routes_for_bottle(b, (pr,)) - self.assertEqual(1, len(routes)) - self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env) - - def test_provider_route_upgrades_tls_passthrough_on_existing_same_auth(self): - b = _bottle([{ - "host": "api.openai.com", - "auth": {"scheme": "Bearer", "token_ref": CODEX_HOST_CREDENTIAL_TOKEN_REF}, - }]) - pr = _provider_route( - "api.openai.com", CODEX_HOST_CREDENTIAL_TOKEN_REF, tls_passthrough=True, - ) - routes = egress_routes_for_bottle(b, (pr,)) - self.assertEqual(1, len(routes)) - self.assertTrue(routes[0].tls_passthrough) - - def test_provider_route_conflicts_with_different_authed_manifest_route(self): - b = _bottle([{ - "host": "chatgpt.com", - "auth": {"scheme": "Bearer", "token_ref": "OTHER"}, - }]) - pr = _provider_route("chatgpt.com", CODEX_HOST_CREDENTIAL_TOKEN_REF) - with self.assertRaises(Die): - egress_routes_for_bottle(b, (pr,)) + hosts = [r.host for r in routes] + self.assertEqual(["api.openai.com", "api.github.com"], hosts) + self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, routes[0].token_ref) + self.assertEqual("GH_PAT", routes[1].token_ref) def test_provider_route_tls_passthrough_set_on_appended_route(self): b = _bottle([]) @@ -225,7 +228,7 @@ class TestProviderRouteMerge(unittest.TestCase): routes = egress_routes_for_bottle(b, (pr,)) self.assertTrue(routes[0].tls_passthrough) - def test_provider_route_tls_passthrough_set_on_upgraded_bare_route(self): + def test_provider_route_tls_passthrough_wins_over_bare_manifest_route(self): b = _bottle([{"host": "api.openai.com"}]) pr = _provider_route("api.openai.com", "TOK", tls_passthrough=True) routes = egress_routes_for_bottle(b, (pr,)) @@ -239,7 +242,7 @@ class TestTokenEnvMap(unittest.TestCase): "auth": {"scheme": "Bearer", "token_ref": "T1"}}, {"host": "passthrough.example"}, ]) - routes = egress_manifest_routes(b) + routes = egress_routes_for_bottle(b) m = egress_token_env_map(routes) self.assertEqual({"EGRESS_TOKEN_0": "T1"}, m) @@ -263,7 +266,7 @@ class TestRenderRoutes(unittest.TestCase): "auth": {"scheme": "Bearer", "token_ref": "GH_PAT"}, "path_allowlist": ["/repos/x/"], }]) - routes = egress_manifest_routes(b) + routes = egress_routes_for_bottle(b) parsed = self._parsed(routes) self.assertEqual( [{ @@ -281,7 +284,7 @@ class TestRenderRoutes(unittest.TestCase): # enforces both-or-neither, so emitting empty strings would # round-trip as a partial pair and crash. b = _bottle([{"host": "github.com", "path_allowlist": ["/x/"]}]) - routes = egress_manifest_routes(b) + routes = egress_routes_for_bottle(b) entry = self._parsed(routes)[0] self.assertNotIn("auth_scheme", entry) self.assertNotIn("token_env", entry) @@ -291,7 +294,7 @@ class TestRenderRoutes(unittest.TestCase): "host": "api.anthropic.com", "auth": {"scheme": "Bearer", "token_ref": "CL"}, }]) - routes = egress_manifest_routes(b) + routes = egress_routes_for_bottle(b) self.assertNotIn("path_allowlist", self._parsed(routes)[0]) def test_empty_routes_round_trips(self): @@ -310,7 +313,7 @@ class TestRenderRoutes(unittest.TestCase): {"host": "github.com", "path_allowlist": ["/x/"]}, {"host": "api.anthropic.com"}, ]) - routes = egress_manifest_routes(b) + routes = egress_routes_for_bottle(b) addon_routes = load_routes(egress_render_routes(routes)) self.assertEqual(3, len(addon_routes)) self.assertEqual("Bearer", addon_routes[0].auth_scheme)