refactor: provision egress routes via AgentProvisionPlan
test / unit (pull_request) Failing after 31s
test / integration (pull_request) Failing after 18s

Remove provider-specific branching from egress.py and pipelock.py.
Previously, `egress_routes_for_bottle` and `pipelock_effective_tls_passthrough`
both contained `template == "codex"` checks — the same pattern the rest
of the PR moved out of the backends.

Root cause: `EgressRoute` had no `tls_passthrough` field, so pipelock
couldn't learn from the synthesised Codex routes that they needed
passthrough. Fix:

- Add `EgressRoute.tls_passthrough: bool`. `egress_manifest_routes` lifts
  the existing `pipelock.tls_passthrough` manifest flag here; provider
  routes set it directly.
- Add `AgentProvisionPlan.egress_routes`. `agent_provision_plan` populates
  it for Codex + `forward_host_credentials`, including `tls_passthrough=True`.
- Replace Codex-specific `egress_routes_for_bottle` logic with a generic
  `_merge_provider_route` helper. Backends call `egress_routes_for_bottle(bottle,
  plan.egress_routes)`; no provider type checks inside egress or pipelock.
- Rewrite `pipelock_effective_tls_passthrough` to read `route.tls_passthrough`
  from the merged route set instead of re-implementing the provider check.
- Both backends now call `agent_provision_plan` before `Egress.prepare` and
  `PipelockProxy.prepare`, threading `plan.egress_routes` to both. `has_provider_auth`
  is derived from `egress_manifest_routes` (manifest routes only — provider
  routes carry no auth roles, so the result is identical).

Assisted-by: Claude Code
This commit is contained in:
2026-06-01 23:27:27 +00:00
parent 1fceaae8e6
commit b79b49090f
8 changed files with 350 additions and 214 deletions
+21
View File
@@ -13,11 +13,17 @@ from pathlib import Path
from typing import Literal
from .codex_auth import write_codex_dummy_auth_file
from .egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
PROVIDER_CLAUDE = "claude"
PROVIDER_CODEX = "codex"
PROVIDER_TEMPLATES = frozenset({PROVIDER_CLAUDE, PROVIDER_CODEX})
# Hosts that egress injects the host ChatGPT bearer on when Codex
# forward_host_credentials is enabled. Pipelock must pass these through
# (no TLS MITM) or its header DLP blocks the injected JWT.
CODEX_HOST_CREDENTIAL_HOSTS = ("api.openai.com", "chatgpt.com")
PromptMode = Literal["append_file", "read_prompt_file"]
@@ -63,6 +69,11 @@ class AgentProvisionPlan:
Backends interpret this plan with their own copy/exec primitives.
Provider-specific content stays here so future provider plugins can
return the same shape without adding backend-plan fields.
`egress_routes` are provider-declared EgressRoutes that backends
pass to `Egress.prepare` and `PipelockProxy.prepare`. This keeps
provider logic out of the egress and pipelock modules — they merge
provider routes generically without knowing the provider type.
"""
template: str
@@ -76,6 +87,7 @@ class AgentProvisionPlan:
files: tuple[AgentProvisionFile, ...] = ()
pre_copy: tuple[AgentProvisionCommand, ...] = ()
verify: tuple[AgentProvisionCommand, ...] = ()
egress_routes: tuple[EgressRoute, ...] = ()
_REPO_ROOT = Path(__file__).resolve().parent.parent
@@ -131,6 +143,7 @@ def agent_provision_plan(
files: list[AgentProvisionFile] = []
pre_copy: list[AgentProvisionCommand] = []
verify: list[AgentProvisionCommand] = []
egress_routes: list[EgressRoute] = []
if template == PROVIDER_CODEX:
env_vars["CODEX_CA_CERTIFICATE"] = "/etc/ssl/certs/ca-certificates.crt"
@@ -148,6 +161,13 @@ def agent_provision_plan(
files.append(AgentProvisionFile(config_file, config_path))
if forward_host_credentials:
for host in CODEX_HOST_CREDENTIAL_HOSTS:
egress_routes.append(EgressRoute(
host=host,
auth_scheme="Bearer",
token_ref=CODEX_HOST_CREDENTIAL_TOKEN_REF,
tls_passthrough=True,
))
auth_file = state_dir / "codex-auth.json"
write_codex_dummy_auth_file(auth_file, host_env or dict(os.environ))
files.append(AgentProvisionFile(auth_file, f"{auth_dir}/auth.json"))
@@ -188,6 +208,7 @@ def agent_provision_plan(
files=tuple(files),
pre_copy=tuple(pre_copy),
verify=tuple(verify),
egress_routes=tuple(egress_routes),
)
+46 -41
View File
@@ -16,7 +16,7 @@ from dataclasses import replace
from pathlib import Path
from ...agent_provider import agent_provision_plan, runtime_for
from ...egress import Egress
from ...egress import Egress, egress_manifest_routes
from ...env import ResolvedEnv, resolve_env
from ...git_gate import GitGate
from ...log import die
@@ -159,17 +159,57 @@ def resolve_plan(
prompt_file.write_text("")
prompt_file.chmod(0o600)
pipelock_dir = pipelock_state_dir(slug)
pipelock_dir.mkdir(parents=True, exist_ok=True)
proxy_plan = proxy.prepare(bottle, slug, pipelock_dir)
git_gate_dir = git_gate_state_dir(slug)
git_gate_dir.mkdir(parents=True, exist_ok=True)
git_gate_plan = git_gate.prepare(bottle, slug, git_gate_dir)
resolved = resolve_env(manifest, spec.agent_name)
# Everything that should reach the bottle by-name (so its value
# never lands on argv or in env_file) goes into one dict. Nothing
# mutates the host os.environ.
forwarded_env: dict[str, str] = dict(resolved.forwarded)
# Some provider CLIs refuse to start without *some* credential
# env var even when egress will strip + re-inject the real
# Authorization header. For those providers, auth_role names the
# route marker that enables a non-secret placeholder env. Codex is
# intentionally absent here: it should use its device/ChatGPT login
# state, and an OPENAI_API_KEY placeholder would force API-key auth.
has_provider_auth = any(
provider_runtime.auth_role
and provider_runtime.auth_role in r.roles
for r in egress_manifest_routes(bottle)
)
if has_provider_auth and provider_runtime.placeholder_env:
forwarded_env[provider_runtime.placeholder_env] = "egress-placeholder"
_write_env_file(resolved, env_file)
prompt_file.write_text(agent.prompt)
use_runsc = docker_mod.runsc_available()
agent_provision = agent_provision_plan(
template=provider.template,
dockerfile=dockerfile_path,
state_dir=agent_dir,
guest_home=os.environ.get("BOT_BOTTLE_CONTAINER_HOME", "/home/node"),
forward_host_credentials=provider.forward_host_credentials,
has_provider_auth=has_provider_auth,
host_env=dict(os.environ),
)
guest_env = dict(agent_provision.guest_env)
for key, val in agent_provision.env_vars.items():
guest_env.setdefault(key, val)
agent_provision = replace(agent_provision, guest_env=guest_env)
pipelock_dir = pipelock_state_dir(slug)
pipelock_dir.mkdir(parents=True, exist_ok=True)
proxy_plan = proxy.prepare(
bottle, slug, pipelock_dir, agent_provision.egress_routes,
)
egress_dir = egress_state_dir(slug)
egress_dir.mkdir(parents=True, exist_ok=True)
egress_plan = egress.prepare(bottle, slug, egress_dir)
egress_plan = egress.prepare(
bottle, slug, egress_dir, agent_provision.egress_routes,
)
supervise_plan = None
if bottle.supervise:
@@ -197,41 +237,6 @@ def resolve_plan(
slug, supervise_dir,
dockerfile_content=dockerfile_content,
)
resolved = resolve_env(manifest, spec.agent_name)
# Everything that should reach the bottle by-name (so its value
# never lands on argv or in env_file) goes into one dict. Nothing
# mutates the host os.environ.
forwarded_env: dict[str, str] = dict(resolved.forwarded)
# Some provider CLIs refuse to start without *some* credential
# env var even when egress will strip + re-inject the real
# Authorization header. For those providers, auth_role names the
# route marker that enables a non-secret placeholder env. Codex is
# intentionally absent here: it should use its device/ChatGPT login
# state, and an OPENAI_API_KEY placeholder would force API-key auth.
has_provider_auth = any(
provider_runtime.auth_role
and provider_runtime.auth_role in r.roles
for r in egress_plan.routes
)
if has_provider_auth and provider_runtime.placeholder_env:
forwarded_env[provider_runtime.placeholder_env] = "egress-placeholder"
_write_env_file(resolved, env_file)
prompt_file.write_text(agent.prompt)
use_runsc = docker_mod.runsc_available()
agent_provision = agent_provision_plan(
template=provider.template,
dockerfile=dockerfile_path,
state_dir=agent_dir,
guest_home=os.environ.get("BOT_BOTTLE_CONTAINER_HOME", "/home/node"),
forward_host_credentials=provider.forward_host_credentials,
has_provider_auth=has_provider_auth,
host_env=dict(os.environ),
)
guest_env = dict(agent_provision.guest_env)
for key, val in agent_provision.env_vars.items():
guest_env.setdefault(key, val)
agent_provision = replace(agent_provision, guest_env=guest_env)
return DockerBottlePlan(
spec=spec,
+26 -21
View File
@@ -16,6 +16,7 @@ from dataclasses import replace
from pathlib import Path
from ...agent_provider import agent_provision_plan, runtime_for
from ...egress import egress_manifest_routes
from ...backend import BottleSpec
from ...backend.docker.bottle_state import (
BottleMetadata,
@@ -95,24 +96,10 @@ def resolve_plan(
"REQUESTS_CA_BUNDLE": "/etc/ssl/certs/ca-certificates.crt",
}
# Inner Plans for the four bundle daemons. The ABCs are
# platform-neutral — `.prepare()` writes config files + returns
# a Plan dataclass with no backend-specific assumptions. State
# dirs are still keyed by slug under the docker backend's
# bottle_state layout (shared on-host convention; not a docker
# dependency).
pipelock_dir = pipelock_state_dir(slug)
pipelock_dir.mkdir(parents=True, exist_ok=True)
proxy_plan = PipelockProxy().prepare(bottle, slug, pipelock_dir)
git_gate_dir = git_gate_state_dir(slug)
git_gate_dir.mkdir(parents=True, exist_ok=True)
git_gate_plan = GitGate().prepare(bottle, slug, git_gate_dir)
egress_dir = egress_state_dir(slug)
egress_dir.mkdir(parents=True, exist_ok=True)
egress_plan = Egress().prepare(bottle, slug, egress_dir)
# Some provider CLIs refuse to start without *some* credential
# env var even when egress will strip + re-inject the real
# Authorization header. For those providers, auth_role names the
@@ -122,17 +109,11 @@ def resolve_plan(
has_provider_auth = any(
provider_runtime.auth_role
and provider_runtime.auth_role in r.roles
for r in egress_plan.routes
for r in egress_manifest_routes(bottle)
)
if has_provider_auth and provider_runtime.placeholder_env:
guest_env[provider_runtime.placeholder_env] = "egress-placeholder"
supervise_plan = None
if bottle.supervise:
supervise_dir = supervise_state_dir(slug)
supervise_dir.mkdir(parents=True, exist_ok=True)
supervise_plan = Supervise().prepare(slug, supervise_dir)
# Prompt file is always written (mode 0o600) so the in-VM
# path always exists. Content is the agent's `prompt`
# field (markdown body) — empty for agents with no prompt.
@@ -175,6 +156,30 @@ def resolve_plan(
merged_guest_env.setdefault(key, val)
agent_provision = replace(agent_provision, guest_env=merged_guest_env)
# Inner Plans for the four bundle daemons. The ABCs are
# platform-neutral — `.prepare()` writes config files + returns
# a Plan dataclass with no backend-specific assumptions. State
# dirs are still keyed by slug under the docker backend's
# bottle_state layout (shared on-host convention; not a docker
# dependency).
pipelock_dir = pipelock_state_dir(slug)
pipelock_dir.mkdir(parents=True, exist_ok=True)
proxy_plan = PipelockProxy().prepare(
bottle, slug, pipelock_dir, agent_provision.egress_routes,
)
egress_dir = egress_state_dir(slug)
egress_dir.mkdir(parents=True, exist_ok=True)
egress_plan = Egress().prepare(
bottle, slug, egress_dir, agent_provision.egress_routes,
)
supervise_plan = None
if bottle.supervise:
supervise_dir = supervise_state_dir(slug)
supervise_dir.mkdir(parents=True, exist_ok=True)
supervise_plan = Supervise().prepare(slug, supervise_dir)
return SmolmachinesBottlePlan(
spec=spec,
stage_dir=stage_dir,
+73 -51
View File
@@ -31,7 +31,6 @@ from pathlib import Path
from .log import die
from .manifest import Bottle
CODEX_HOST_CREDENTIAL_HOSTS = ("api.openai.com", "chatgpt.com")
CODEX_HOST_CREDENTIAL_TOKEN_REF = "BOT_BOTTLE_CODEX_HOST_ACCESS_TOKEN"
@@ -69,7 +68,13 @@ class EgressRoute:
`roles` carries the manifest route's optional role markers (see
`manifest.EGRESS_ROLES`). The launch step reads these for
side effects like the claude-code OAuth placeholder env."""
side effects like the claude-code OAuth placeholder env.
`tls_passthrough` signals that pipelock must not TLS-MITM this
host — either because the manifest declared `pipelock.tls_passthrough:
true` (lifted in `egress_manifest_routes`) or because a provider
route set it (e.g. egress injects its own Bearer on that host
after the agent boundary and pipelock's header DLP would block it)."""
host: str
path_allowlist: tuple[str, ...] = ()
@@ -77,6 +82,7 @@ class EgressRoute:
token_env: str = ""
token_ref: str = ""
roles: tuple[str, ...] = ()
tls_passthrough: bool = False
@dataclass(frozen=True)
@@ -161,84 +167,94 @@ def egress_manifest_routes(
token_env=token_env,
token_ref=r.TokenRef,
roles=r.Role,
tls_passthrough=r.Pipelock.TlsPassthrough,
))
else:
out.append(EgressRoute(
host=r.Host,
path_allowlist=r.PathAllowlist,
roles=r.Role,
tls_passthrough=r.Pipelock.TlsPassthrough,
))
return tuple(out)
def egress_routes_for_bottle(
bottle: Bottle,
provider_routes: tuple[EgressRoute, ...] = (),
) -> tuple[EgressRoute, ...]:
"""Effective egress routes. This is what gets rendered into
routes.yaml + what the addon enforces.
"""Effective egress routes for the agent. This is what gets rendered
into routes.yaml and what the addon enforces.
Operators that want to allow a host usually declare it directly in
`bottle.egress.routes` as an authenticated route or bare-pass entry
(`- host: <name>`). Codex host-credential forwarding is the
provider-owned exception: when explicitly enabled, it adds or
upgrades the Codex API hosts to egress-owned authenticated routes. The
legacy `bottle.egress.allowlist` folding is gone — egress is the
single allowlist surface."""
Merges manifest-declared routes with provider-owned routes. The
manifest is the primary surface; `provider_routes` are synthesised
by `agent_provision_plan` and may add or upgrade manifest entries.
Provider routes that conflict with an existing authenticated manifest
route (different auth scheme or token ref) raise a hard error."""
routes = list(egress_manifest_routes(bottle))
if not bottle.agent_provider.forward_host_credentials:
return tuple(routes)
if bottle.agent_provider.template != "codex":
return tuple(routes)
for host in CODEX_HOST_CREDENTIAL_HOSTS:
routes = _ensure_codex_host_credential_route(routes, host)
for pr in provider_routes:
routes = _merge_provider_route(routes, pr)
return tuple(routes)
def _next_token_env(routes: list[EgressRoute]) -> str:
def _find_or_alloc_token_env(routes: list[EgressRoute], token_ref: str) -> str:
"""Return the existing token_env slot for `token_ref`, or allocate the next one."""
for route in routes:
if route.token_ref == token_ref and route.token_env:
return route.token_env
return f"EGRESS_TOKEN_{len({r.token_env for r in routes if r.token_env})}"
def _codex_host_credential_token_env(routes: list[EgressRoute]) -> str:
for route in routes:
if route.token_ref == CODEX_HOST_CREDENTIAL_TOKEN_REF:
return route.token_env
return _next_token_env(routes)
def _ensure_codex_host_credential_route(
routes: list[EgressRoute], host: str,
def _merge_provider_route(
routes: list[EgressRoute], pr: EgressRoute,
) -> list[EgressRoute]:
"""Merge one provider-declared route into the manifest route list.
Upgrade a bare-pass manifest route to authenticated if the provider
declares auth for that host, or append if the host isn't in the manifest.
Identical auth (same scheme + token_ref) on an existing route is a
no-op, with a tls_passthrough upgrade if the provider route sets it.
Conflicting auth (different scheme or token_ref) dies."""
for idx, route in enumerate(routes):
if route.host.lower() != host:
if route.host.lower() != pr.host.lower():
continue
if route.auth_scheme or route.token_ref:
if (
route.auth_scheme == "Bearer"
and route.token_ref == CODEX_HOST_CREDENTIAL_TOKEN_REF
):
if route.auth_scheme == pr.auth_scheme and route.token_ref == pr.token_ref:
if pr.tls_passthrough and not route.tls_passthrough:
routes[idx] = EgressRoute(
host=route.host,
path_allowlist=route.path_allowlist,
auth_scheme=route.auth_scheme,
token_env=route.token_env,
token_ref=route.token_ref,
roles=route.roles,
tls_passthrough=True,
)
return routes
die(
"codex host credential forwarding conflicts with an "
f"authenticated egress route for {host}. Remove that "
"route auth block or disable agent_provider.forward_host_credentials."
f"provider egress route for {pr.host!r} conflicts with an "
f"authenticated manifest route (different auth scheme or token "
f"ref). Remove the manifest route's auth block or disable the "
f"feature that adds this provider route."
)
token_env = _find_or_alloc_token_env(routes, pr.token_ref)
routes[idx] = EgressRoute(
host=route.host,
path_allowlist=route.path_allowlist,
auth_scheme="Bearer",
token_env=_codex_host_credential_token_env(routes),
token_ref=CODEX_HOST_CREDENTIAL_TOKEN_REF,
auth_scheme=pr.auth_scheme,
token_env=token_env,
token_ref=pr.token_ref,
roles=route.roles,
tls_passthrough=pr.tls_passthrough,
)
return routes
token_env = _find_or_alloc_token_env(routes, pr.token_ref)
routes.append(EgressRoute(
host=host,
auth_scheme="Bearer",
token_env=_codex_host_credential_token_env(routes),
token_ref=CODEX_HOST_CREDENTIAL_TOKEN_REF,
host=pr.host,
auth_scheme=pr.auth_scheme,
token_env=token_env,
token_ref=pr.token_ref,
tls_passthrough=pr.tls_passthrough,
))
return routes
@@ -338,18 +354,23 @@ class Egress(ABC):
sidecar's start/stop lifecycle is backend-specific and lives on
concrete subclasses."""
def prepare(self, bottle: Bottle, slug: str, stage_dir: Path) -> EgressPlan:
"""Lift `bottle.egress.routes` into resolved routes,
render the routes file (mode 600) under `stage_dir`, and
def prepare(
self,
bottle: Bottle,
slug: str,
stage_dir: Path,
provider_routes: tuple[EgressRoute, ...] = (),
) -> EgressPlan:
"""Lift `bottle.egress.routes` + `provider_routes` into resolved
routes, render the routes file (mode 600) under `stage_dir`, and
return the plan. Pure host-side, no docker subprocess. The
token-env map records the mapping the launch step uses to
forward values from the host's environ into the sidecar's
environ.
forward values from the host's environ into the sidecar's environ.
Returned plan is incomplete: the launch step must fill
`internal_network` / `egress_network` / `pipelock_proxy_url`
via `dataclasses.replace` before passing it to `.start`."""
routes = egress_routes_for_bottle(bottle)
routes = egress_routes_for_bottle(bottle, provider_routes)
routes_path = stage_dir / "egress_routes.yaml"
routes_path.write_text(egress_render_routes(routes))
routes_path.chmod(0o600)
@@ -361,6 +382,7 @@ class Egress(ABC):
)
__all__ = [
"CODEX_HOST_CREDENTIAL_TOKEN_REF",
"EGRESS_HOSTNAME",
"EGRESS_ROUTES_IN_CONTAINER",
"Egress",
+33 -37
View File
@@ -21,11 +21,7 @@ from dataclasses import dataclass
from pathlib import Path
from typing import cast
from .egress import (
CODEX_HOST_CREDENTIAL_HOSTS,
EGRESS_HOSTNAME,
egress_routes_for_bottle,
)
from .egress import EGRESS_HOSTNAME, EgressRoute, egress_routes_for_bottle
from .supervise import SUPERVISE_HOSTNAME
from .manifest import Bottle
@@ -54,14 +50,17 @@ PIPELOCK_HOSTNAME = "pipelock"
# --- Allowlist resolution --------------------------------------------------
def pipelock_effective_allowlist(bottle: Bottle) -> list[str]:
def pipelock_effective_allowlist(
bottle: Bottle,
provider_routes: tuple[EgressRoute, ...] = (),
) -> list[str]:
"""Hostnames pipelock allows. Sorted for stability.
Always mirrors `egress_routes_for_bottle(bottle)` — egress is the
single allowlist surface, and pipelock's allowlist is the downstream
copy for defense-in-depth + DLP body scanning. For bottles without
any `egress.routes[]` declared, this is empty except for supervise
sidecar traffic when `supervise: true`.
Always mirrors `egress_routes_for_bottle(bottle, provider_routes)` —
egress is the single allowlist surface, and pipelock's allowlist is
the downstream copy for defense-in-depth + DLP body scanning. For
bottles without any `egress.routes[]` declared, this is empty except
for supervise sidecar traffic when `supervise: true`.
The supervise sidecar's hostname is auto-added when supervise
is enabled (sibling-sidecar traffic that flows through pipelock
@@ -69,7 +68,7 @@ def pipelock_effective_allowlist(bottle: Bottle) -> list[str]:
`bottle.git` do NOT contribute here — git traffic flows
through git-gate (PRD 0008), not pipelock."""
seen: dict[str, None] = {}
for r in egress_routes_for_bottle(bottle):
for r in egress_routes_for_bottle(bottle, provider_routes):
if r.host:
seen.setdefault(r.host, None)
if bottle.supervise:
@@ -102,32 +101,23 @@ def pipelock_seed_phrase_detection_enabled(bottle: Bottle) -> bool:
return False
def pipelock_effective_tls_passthrough(bottle: Bottle) -> list[str]:
def pipelock_effective_tls_passthrough(
bottle: Bottle,
provider_routes: tuple[EgressRoute, ...] = (),
) -> list[str]:
"""Hostnames pipelock should pass through (no TLS MITM).
A route opts in with `pipelock.tls_passthrough: true`. This is
useful for provider API routes where egress injects the
Authorization header after the agent boundary; pipelock still
enforces the host allowlist but does not decrypt and scan that
provider request.
A manifest route opts in with `pipelock.tls_passthrough: true`
(lifted into `EgressRoute.tls_passthrough` in `egress_manifest_routes`).
Provider routes that set `tls_passthrough=True` (e.g. Codex credential
routes where egress injects the host bearer after the agent boundary)
are also included. Both arrive via `egress_routes_for_bottle` — no
provider-specific branching needed here.
"""
seen: dict[str, None] = {host: None for host in DEFAULT_TLS_PASSTHROUGH}
for route in bottle.egress.routes:
if route.Pipelock.TlsPassthrough:
seen.setdefault(route.Host, None)
# forward_host_credentials makes egress inject the host ChatGPT bearer
# on the Codex API hosts AFTER the agent boundary. Pipelock sits
# downstream of egress and DLP-scans request headers; left to MITM
# these routes it flags the injected JWT as a leaked secret
# ("request header contains secret") and blocks. Pass them through so
# pipelock still enforces the host allowlist on CONNECT but does not
# decrypt + rescan egress-owned auth. The auto-added routes live in
# egress_routes_for_bottle, not bottle.egress.routes, so add the
# hosts explicitly here.
provider = bottle.agent_provider
if provider.forward_host_credentials and provider.template == "codex":
for host in CODEX_HOST_CREDENTIAL_HOSTS:
seen.setdefault(host, None)
for route in egress_routes_for_bottle(bottle, provider_routes):
if route.tls_passthrough:
seen.setdefault(route.host, None)
return sorted(seen.keys())
@@ -159,6 +149,7 @@ def pipelock_build_config(
ca_cert_path: str = "",
ca_key_path: str = "",
ssrf_ip_allowlist: tuple[str, ...] = (),
provider_routes: tuple[EgressRoute, ...] = (),
) -> dict[str, object]:
"""Build the structured pipelock config dict the sidecar will load.
@@ -188,7 +179,7 @@ def pipelock_build_config(
"version": 1,
"mode": "strict",
"enforce": True,
"api_allowlist": pipelock_effective_allowlist(bottle),
"api_allowlist": pipelock_effective_allowlist(bottle, provider_routes),
"forward_proxy": {"enabled": True},
}
if not pipelock_seed_phrase_detection_enabled(bottle):
@@ -222,7 +213,7 @@ def pipelock_build_config(
"enabled": True,
"ca_cert": ca_cert_path,
"ca_key": ca_key_path,
"passthrough_domains": pipelock_effective_tls_passthrough(bottle),
"passthrough_domains": pipelock_effective_tls_passthrough(bottle, provider_routes),
}
effective_ssrf_ip_allowlist = pipelock_effective_ssrf_ip_allowlist(
bottle, ssrf_ip_allowlist,
@@ -336,7 +327,11 @@ class PipelockProxy:
(`PIPELOCK_CA_CERT_IN_CONTAINER` / `PIPELOCK_CA_KEY_IN_CONTAINER`)."""
def prepare(
self, bottle: Bottle, slug: str, stage_dir: Path
self,
bottle: Bottle,
slug: str,
stage_dir: Path,
provider_routes: tuple[EgressRoute, ...] = (),
) -> PipelockProxyPlan:
"""Write the pipelock yaml config (mode 600) under `stage_dir`
and return the plan for launch. Pure host-side, no docker
@@ -359,6 +354,7 @@ class PipelockProxy:
bottle,
ca_cert_path=PIPELOCK_CA_CERT_IN_CONTAINER,
ca_key_path=PIPELOCK_CA_KEY_IN_CONTAINER,
provider_routes=provider_routes,
)
yaml_path.write_text(pipelock_render_yaml(cfg))
yaml_path.chmod(0o600)
+47 -1
View File
@@ -8,7 +8,12 @@ import tempfile
import unittest
from pathlib import Path
from bot_bottle.agent_provider import agent_provision_plan, runtime_for
from bot_bottle.agent_provider import (
CODEX_HOST_CREDENTIAL_HOSTS,
agent_provision_plan,
runtime_for,
)
from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF
def _jwt(exp: int) -> str:
@@ -90,6 +95,47 @@ class TestAgentProviderRuntime(unittest.TestCase):
)
self.assertEqual("1", plan.env_vars["DISABLE_ERROR_REPORTING"])
def test_codex_forward_host_credentials_populates_egress_routes(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
home = Path(tmp) / "host-codex"
home.mkdir()
(home / "auth.json").write_text(json.dumps({
"auth_mode": "chatgpt",
"tokens": {"access_token": _jwt(2000000000)},
}))
plan = agent_provision_plan(
template="codex",
dockerfile="",
state_dir=Path(tmp),
forward_host_credentials=True,
host_env={"CODEX_HOME": str(home)},
)
hosts = [r.host for r in plan.egress_routes]
self.assertEqual(sorted(CODEX_HOST_CREDENTIAL_HOSTS), sorted(hosts))
for r in plan.egress_routes:
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, r.token_ref)
self.assertTrue(r.tls_passthrough)
def test_codex_without_forward_host_credentials_has_no_egress_routes(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = agent_provision_plan(
template="codex",
dockerfile="",
state_dir=Path(tmp),
forward_host_credentials=False,
)
self.assertEqual((), plan.egress_routes)
def test_claude_plan_has_no_egress_routes(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = agent_provision_plan(
template="claude",
dockerfile="",
state_dir=Path(tmp),
)
self.assertEqual((), plan.egress_routes)
if __name__ == "__main__":
unittest.main()
+87 -54
View File
@@ -5,6 +5,7 @@ import unittest
from bot_bottle.egress import (
CODEX_HOST_CREDENTIAL_TOKEN_REF,
EgressRoute,
egress_manifest_routes,
egress_render_routes,
egress_resolve_token_values,
@@ -23,19 +24,13 @@ def _bottle(routes):
}).bottles["dev"]
def _codex_bottle(*, forward_host_credentials: bool, routes):
return Manifest.from_json_obj({
"bottles": {
"dev": {
"agent_provider": {
"template": "codex",
"forward_host_credentials": forward_host_credentials,
},
"egress": {"routes": routes},
}
},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
def _provider_route(host: str, token_ref: str, *, tls_passthrough: bool = False) -> EgressRoute:
return EgressRoute(
host=host,
auth_scheme="Bearer",
token_ref=token_ref,
tls_passthrough=tls_passthrough,
)
class TestRoutesForBottle(unittest.TestCase):
@@ -100,9 +95,8 @@ class TestRoutesForBottle(unittest.TestCase):
self.assertEqual("", routes[1].token_env)
class TestRoutesForBottleUsesManifestOnly(unittest.TestCase):
"""The effective route table is exactly the manifest-declared
routes. Provider defaults are not injected implicitly."""
class TestRoutesForBottleManifestOnly(unittest.TestCase):
"""Without provider routes the effective table is exactly the manifest."""
def test_no_manifest_routes_means_no_effective_routes(self):
b = _bottle([])
@@ -123,58 +117,97 @@ class TestRoutesForBottleUsesManifestOnly(unittest.TestCase):
effective = [r.host for r in egress_routes_for_bottle(b)]
self.assertEqual(["x.example"], effective)
def test_codex_forward_host_credentials_adds_codex_routes(self):
b = _codex_bottle(forward_host_credentials=True, routes=[])
def test_tls_passthrough_lifted_from_manifest(self):
b = _bottle([{
"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": "T"},
"pipelock": {"tls_passthrough": True},
}])
routes = egress_routes_for_bottle(b)
self.assertEqual(["api.openai.com", "chatgpt.com"], [r.host for r in routes])
self.assertTrue(routes[0].tls_passthrough)
def test_tls_passthrough_false_by_default(self):
b = _bottle([{"host": "api.github.com"}])
routes = egress_routes_for_bottle(b)
self.assertFalse(routes[0].tls_passthrough)
class TestProviderRouteMerge(unittest.TestCase):
"""Provider routes are merged into manifest routes generically."""
def test_provider_route_appended_when_not_in_manifest(self):
b = _bottle([])
pr = _provider_route("api.openai.com", "TOK")
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertEqual("api.openai.com", routes[0].host)
self.assertEqual("Bearer", routes[0].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, routes[0].token_ref)
self.assertEqual("Bearer", routes[1].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", routes[1].token_env)
self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, routes[1].token_ref)
self.assertEqual("TOK", routes[0].token_ref)
def test_codex_forward_host_credentials_upgrades_bare_chatgpt_route(self):
b = _codex_bottle(
forward_host_credentials=True,
routes=[{"host": "chatgpt.com", "path_allowlist": ["/backend-api/"]}],
)
routes = egress_routes_for_bottle(b)
self.assertEqual(2, len(routes))
def test_two_provider_routes_with_same_token_ref_share_slot(self):
b = _bottle([])
routes = egress_routes_for_bottle(b, (
_provider_route("api.openai.com", CODEX_HOST_CREDENTIAL_TOKEN_REF),
_provider_route("chatgpt.com", CODEX_HOST_CREDENTIAL_TOKEN_REF),
))
self.assertEqual(["api.openai.com", "chatgpt.com"], [r.host for r in routes])
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
self.assertEqual("EGRESS_TOKEN_0", routes[1].token_env)
def test_provider_route_upgrades_bare_manifest_route(self):
b = _bottle([{"host": "chatgpt.com", "path_allowlist": ["/backend-api/"]}])
pr = _provider_route("chatgpt.com", CODEX_HOST_CREDENTIAL_TOKEN_REF)
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertEqual("chatgpt.com", routes[0].host)
self.assertEqual("Bearer", routes[0].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, routes[0].token_ref)
self.assertEqual(("/backend-api/",), routes[0].path_allowlist)
self.assertEqual("api.openai.com", routes[1].host)
self.assertEqual("EGRESS_TOKEN_0", routes[1].token_env)
def test_codex_forward_host_credentials_accepts_explicit_synthetic_route(self):
b = _codex_bottle(
forward_host_credentials=True,
routes=[{
"host": "api.openai.com",
"auth": {
"scheme": "Bearer",
"token_ref": CODEX_HOST_CREDENTIAL_TOKEN_REF,
},
}],
)
routes = egress_routes_for_bottle(b)
self.assertEqual(["api.openai.com", "chatgpt.com"], [r.host for r in routes])
def test_provider_route_noop_when_same_auth_already_in_manifest(self):
b = _bottle([{
"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": CODEX_HOST_CREDENTIAL_TOKEN_REF},
}])
pr = _provider_route("api.openai.com", CODEX_HOST_CREDENTIAL_TOKEN_REF)
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
self.assertEqual("EGRESS_TOKEN_0", routes[1].token_env)
def test_codex_forward_host_credentials_conflicts_with_authed_route(self):
b = _codex_bottle(
forward_host_credentials=True,
routes=[{
"host": "chatgpt.com",
"auth": {"scheme": "Bearer", "token_ref": "OTHER"},
}],
def test_provider_route_upgrades_tls_passthrough_on_existing_same_auth(self):
b = _bottle([{
"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": CODEX_HOST_CREDENTIAL_TOKEN_REF},
}])
pr = _provider_route(
"api.openai.com", CODEX_HOST_CREDENTIAL_TOKEN_REF, tls_passthrough=True,
)
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertTrue(routes[0].tls_passthrough)
def test_provider_route_conflicts_with_different_authed_manifest_route(self):
b = _bottle([{
"host": "chatgpt.com",
"auth": {"scheme": "Bearer", "token_ref": "OTHER"},
}])
pr = _provider_route("chatgpt.com", CODEX_HOST_CREDENTIAL_TOKEN_REF)
with self.assertRaises(Die):
egress_routes_for_bottle(b)
egress_routes_for_bottle(b, (pr,))
def test_provider_route_tls_passthrough_set_on_appended_route(self):
b = _bottle([])
pr = _provider_route("api.openai.com", "TOK", tls_passthrough=True)
routes = egress_routes_for_bottle(b, (pr,))
self.assertTrue(routes[0].tls_passthrough)
def test_provider_route_tls_passthrough_set_on_upgraded_bare_route(self):
b = _bottle([{"host": "api.openai.com"}])
pr = _provider_route("api.openai.com", "TOK", tls_passthrough=True)
routes = egress_routes_for_bottle(b, (pr,))
self.assertTrue(routes[0].tls_passthrough)
class TestTokenEnvMap(unittest.TestCase):
+17 -9
View File
@@ -5,6 +5,8 @@ git-gate (PRD 0008)."""
import unittest
from bot_bottle.agent_provider import CODEX_HOST_CREDENTIAL_HOSTS
from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
from bot_bottle.manifest import Manifest
from bot_bottle.pipelock import (
pipelock_effective_allowlist,
@@ -116,17 +118,23 @@ class TestTlsPassthrough(unittest.TestCase):
def test_forward_host_credentials_passes_through_codex_hosts(self):
# Egress injects the host bearer on the Codex API hosts; pipelock
# must pass them through or its header DLP blocks the injected JWT
# ("request header contains secret"). These routes are auto-added
# (not in bottle.egress.routes), so passthrough is host-derived.
passthrough = pipelock_effective_tls_passthrough(_bottle({
"agent_provider": {
"template": "codex",
"forward_host_credentials": True,
},
}))
# ("request header contains secret"). Provider routes carry
# tls_passthrough=True; pipelock reads this via egress_routes_for_bottle.
provider_routes = tuple(
EgressRoute(
host=host,
auth_scheme="Bearer",
token_ref=CODEX_HOST_CREDENTIAL_TOKEN_REF,
tls_passthrough=True,
)
for host in CODEX_HOST_CREDENTIAL_HOSTS
)
passthrough = pipelock_effective_tls_passthrough(
_bottle({}), provider_routes,
)
self.assertEqual(["api.openai.com", "chatgpt.com"], passthrough)
def test_no_codex_passthrough_without_forward_host_credentials(self):
def test_no_codex_passthrough_without_provider_routes(self):
passthrough = pipelock_effective_tls_passthrough(_bottle({
"agent_provider": {"template": "codex"},
}))