PRD 0029: provision egress routes via AgentProvisionPlan #115

Merged
didericis merged 9 commits from prd-0029-egress-routes-via-agent-provision-plan into codex/prd-codex-host-credentials 2026-06-01 22:04:33 -04:00
15 changed files with 486 additions and 437 deletions
+40 -8
View File
@@ -13,11 +13,17 @@ from pathlib import Path
from typing import Literal from typing import Literal
from .codex_auth import write_codex_dummy_auth_file from .codex_auth import write_codex_dummy_auth_file
from .egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
PROVIDER_CLAUDE = "claude" PROVIDER_CLAUDE = "claude"
PROVIDER_CODEX = "codex" PROVIDER_CODEX = "codex"
PROVIDER_TEMPLATES = frozenset({PROVIDER_CLAUDE, PROVIDER_CODEX}) PROVIDER_TEMPLATES = frozenset({PROVIDER_CLAUDE, PROVIDER_CODEX})
# Hosts that egress injects the host ChatGPT bearer on when Codex
# forward_host_credentials is enabled. Pipelock must pass these through
# (no TLS MITM) or its header DLP blocks the injected JWT.
CODEX_HOST_CREDENTIAL_HOSTS = ("api.openai.com", "chatgpt.com")
PromptMode = Literal["append_file", "read_prompt_file"] PromptMode = Literal["append_file", "read_prompt_file"]
@@ -27,8 +33,6 @@ class AgentProviderRuntime:
command: str command: str
image: str image: str
dockerfile: str dockerfile: str
auth_role: str
placeholder_env: str
prompt_mode: PromptMode prompt_mode: PromptMode
bypass_args: tuple[str, ...] bypass_args: tuple[str, ...]
resume_args: tuple[str, ...] resume_args: tuple[str, ...]
@@ -63,6 +67,16 @@ class AgentProvisionPlan:
Backends interpret this plan with their own copy/exec primitives. Backends interpret this plan with their own copy/exec primitives.
Provider-specific content stays here so future provider plugins can Provider-specific content stays here so future provider plugins can
return the same shape without adding backend-plan fields. return the same shape without adding backend-plan fields.
`egress_routes` are provider-declared EgressRoutes that backends
pass to `Egress.prepare` and `PipelockProxy.prepare`. This keeps
provider logic out of the egress and pipelock modules — they merge
provider routes generically without knowing the provider type.
`hidden_env_names` is the set of env var names the provider injected
as non-secret placeholders. `print_util.visible_agent_env_names` uses
this to suppress them from the preflight summary so operators don't
mistake them for real credentials.
""" """
template: str template: str
@@ -76,6 +90,8 @@ class AgentProvisionPlan:
files: tuple[AgentProvisionFile, ...] = () files: tuple[AgentProvisionFile, ...] = ()
pre_copy: tuple[AgentProvisionCommand, ...] = () pre_copy: tuple[AgentProvisionCommand, ...] = ()
verify: tuple[AgentProvisionCommand, ...] = () verify: tuple[AgentProvisionCommand, ...] = ()
egress_routes: tuple[EgressRoute, ...] = ()
hidden_env_names: frozenset[str] = field(default_factory=frozenset)
_REPO_ROOT = Path(__file__).resolve().parent.parent _REPO_ROOT = Path(__file__).resolve().parent.parent
1
@@ -87,8 +103,6 @@ _RUNTIMES = {
command="claude", command="claude",
image="bot-bottle-claude:latest", image="bot-bottle-claude:latest",
dockerfile=str(_REPO_ROOT / "Dockerfile.claude"), dockerfile=str(_REPO_ROOT / "Dockerfile.claude"),
auth_role="claude_code_oauth",
placeholder_env="CLAUDE_CODE_OAUTH_TOKEN",
prompt_mode="append_file", prompt_mode="append_file",
bypass_args=("--dangerously-skip-permissions",), bypass_args=("--dangerously-skip-permissions",),
resume_args=("--continue",), resume_args=("--continue",),
@@ -99,8 +113,6 @@ _RUNTIMES = {
command="codex", command="codex",
image="bot-bottle-codex:latest", image="bot-bottle-codex:latest",
dockerfile=str(_REPO_ROOT / "Dockerfile.codex"), dockerfile=str(_REPO_ROOT / "Dockerfile.codex"),
auth_role="",
placeholder_env="",
prompt_mode="read_prompt_file", prompt_mode="read_prompt_file",
bypass_args=("--dangerously-bypass-approvals-and-sandbox",), bypass_args=("--dangerously-bypass-approvals-and-sandbox",),
resume_args=("resume", "--last"), resume_args=("resume", "--last"),
1
@@ -120,8 +132,8 @@ def agent_provision_plan(
state_dir: Path, state_dir: Path,
guest_home: str = "/home/node", guest_home: str = "/home/node",
guest_env: dict[str, str] | None = None, guest_env: dict[str, str] | None = None,
auth_token: str = "",
forward_host_credentials: bool = False, forward_host_credentials: bool = False,
has_provider_auth: bool = False,
host_env: dict[str, str] | None = None, host_env: dict[str, str] | None = None,
) -> AgentProvisionPlan: ) -> AgentProvisionPlan:
runtime = runtime_for(template) runtime = runtime_for(template)
@@ -131,6 +143,8 @@ def agent_provision_plan(
files: list[AgentProvisionFile] = [] files: list[AgentProvisionFile] = []
pre_copy: list[AgentProvisionCommand] = [] pre_copy: list[AgentProvisionCommand] = []
verify: list[AgentProvisionCommand] = [] verify: list[AgentProvisionCommand] = []
egress_routes: list[EgressRoute] = []
hidden_env_names: frozenset[str] = frozenset()
if template == PROVIDER_CODEX: if template == PROVIDER_CODEX:
env_vars["CODEX_CA_CERTIFICATE"] = "/etc/ssl/certs/ca-certificates.crt" env_vars["CODEX_CA_CERTIFICATE"] = "/etc/ssl/certs/ca-certificates.crt"
@@ -147,6 +161,13 @@ def agent_provision_plan(
config_file.chmod(0o600) config_file.chmod(0o600)
files.append(AgentProvisionFile(config_file, config_path)) files.append(AgentProvisionFile(config_file, config_path))
for host in CODEX_HOST_CREDENTIAL_HOSTS:
egress_routes.append(EgressRoute(
host=host,
auth_scheme="Bearer" if forward_host_credentials else "",
token_ref=CODEX_HOST_CREDENTIAL_TOKEN_REF if forward_host_credentials else "",
tls_passthrough=True,
))
if forward_host_credentials: if forward_host_credentials:
auth_file = state_dir / "codex-auth.json" auth_file = state_dir / "codex-auth.json"
write_codex_dummy_auth_file(auth_file, host_env or dict(os.environ)) write_codex_dummy_auth_file(auth_file, host_env or dict(os.environ))
@@ -172,9 +193,18 @@ def agent_provision_plan(
"codex host credentials: dummy auth was copied into the " "codex host credentials: dummy auth was copied into the "
"guest, but Codex did not accept it" "guest, but Codex did not accept it"
))) )))
if template == PROVIDER_CLAUDE and has_provider_auth: if template == PROVIDER_CLAUDE:
env_vars["CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC"] = "1" env_vars["CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC"] = "1"
didericis marked this conversation as resolved Outdated
Outdated
Review

Similarly to codex, we should always include these in egress routes (whether or not auth_token is present), but we only have the egress add the auth token when it's present

Similarly to codex, we should always include these in egress routes (whether or not auth_token is present), but we only have the egress add the auth token when it's present
env_vars["DISABLE_ERROR_REPORTING"] = "1" env_vars["DISABLE_ERROR_REPORTING"] = "1"
egress_routes.append(EgressRoute(
host="api.anthropic.com",
auth_scheme="Bearer" if auth_token else "",
token_ref=auth_token,
tls_passthrough=True,
))
if auth_token:
env_vars["CLAUDE_CODE_OAUTH_TOKEN"] = "egress-placeholder"
hidden_env_names = frozenset({"CLAUDE_CODE_OAUTH_TOKEN"})
return AgentProvisionPlan( return AgentProvisionPlan(
template=template, template=template,
@@ -188,6 +218,8 @@ def agent_provision_plan(
files=tuple(files), files=tuple(files),
pre_copy=tuple(pre_copy), pre_copy=tuple(pre_copy),
verify=tuple(verify), verify=tuple(verify),
egress_routes=tuple(egress_routes),
hidden_env_names=hidden_env_names,
) )
+1 -1
View File
@@ -89,7 +89,7 @@ class DockerBottlePlan(BottlePlan):
| set(self.forwarded_env.keys()) | set(self.forwarded_env.keys())
| set(self.agent_provision.guest_env.keys()) | set(self.agent_provision.guest_env.keys())
), ),
agent_provider_template=self.agent_provider_template, hidden_env_names=self.agent_provision.hidden_env_names,
) )
print(file=sys.stderr) print(file=sys.stderr)
+32 -40
View File
@@ -159,17 +159,44 @@ def resolve_plan(
prompt_file.write_text("") prompt_file.write_text("")
prompt_file.chmod(0o600) prompt_file.chmod(0o600)
pipelock_dir = pipelock_state_dir(slug)
pipelock_dir.mkdir(parents=True, exist_ok=True)
proxy_plan = proxy.prepare(bottle, slug, pipelock_dir)
git_gate_dir = git_gate_state_dir(slug) git_gate_dir = git_gate_state_dir(slug)
git_gate_dir.mkdir(parents=True, exist_ok=True) git_gate_dir.mkdir(parents=True, exist_ok=True)
git_gate_plan = git_gate.prepare(bottle, slug, git_gate_dir) git_gate_plan = git_gate.prepare(bottle, slug, git_gate_dir)
resolved = resolve_env(manifest, spec.agent_name)
# Everything that should reach the bottle by-name (so its value
# never lands on argv or in env_file) goes into one dict. Nothing
# mutates the host os.environ.
forwarded_env: dict[str, str] = dict(resolved.forwarded)
_write_env_file(resolved, env_file)
prompt_file.write_text(agent.prompt)
use_runsc = docker_mod.runsc_available()
agent_provision = agent_provision_plan(
template=provider.template,
dockerfile=dockerfile_path,
didericis marked this conversation as resolved Outdated
Outdated
Review

lines 177-183 should also be moved into the agent provision step: it's the responsibility of the agent provisioner to determine whether or not there should be something like "egress-placeholder" in the env and which env var it should go into. This should also remove the need for placeholder_env in the provider runtime.

lines 177-183 should also be moved into the agent provision step: it's the responsibility of the agent provisioner to determine whether or not there should be something like "egress-placeholder" in the env and which env var it should go into. This should also remove the need for `placeholder_env` in the provider runtime.
state_dir=agent_dir,
guest_home=os.environ.get("BOT_BOTTLE_CONTAINER_HOME", "/home/node"),
forward_host_credentials=provider.forward_host_credentials,
auth_token=provider.auth_token,
host_env=dict(os.environ),
)
guest_env = dict(agent_provision.guest_env)
for key, val in agent_provision.env_vars.items():
guest_env.setdefault(key, val)
agent_provision = replace(agent_provision, guest_env=guest_env)
pipelock_dir = pipelock_state_dir(slug)
pipelock_dir.mkdir(parents=True, exist_ok=True)
proxy_plan = proxy.prepare(
bottle, slug, pipelock_dir, agent_provision.egress_routes,
)
egress_dir = egress_state_dir(slug) egress_dir = egress_state_dir(slug)
egress_dir.mkdir(parents=True, exist_ok=True) egress_dir.mkdir(parents=True, exist_ok=True)
egress_plan = egress.prepare(bottle, slug, egress_dir) egress_plan = egress.prepare(
bottle, slug, egress_dir, agent_provision.egress_routes,
)
supervise_plan = None supervise_plan = None
if bottle.supervise: if bottle.supervise:
@@ -197,41 +224,6 @@ def resolve_plan(
slug, supervise_dir, slug, supervise_dir,
dockerfile_content=dockerfile_content, dockerfile_content=dockerfile_content,
) )
resolved = resolve_env(manifest, spec.agent_name)
# Everything that should reach the bottle by-name (so its value
# never lands on argv or in env_file) goes into one dict. Nothing
# mutates the host os.environ.
forwarded_env: dict[str, str] = dict(resolved.forwarded)
# Some provider CLIs refuse to start without *some* credential
# env var even when egress will strip + re-inject the real
# Authorization header. For those providers, auth_role names the
# route marker that enables a non-secret placeholder env. Codex is
# intentionally absent here: it should use its device/ChatGPT login
# state, and an OPENAI_API_KEY placeholder would force API-key auth.
has_provider_auth = any(
provider_runtime.auth_role
and provider_runtime.auth_role in r.roles
for r in egress_plan.routes
)
if has_provider_auth and provider_runtime.placeholder_env:
forwarded_env[provider_runtime.placeholder_env] = "egress-placeholder"
_write_env_file(resolved, env_file)
prompt_file.write_text(agent.prompt)
use_runsc = docker_mod.runsc_available()
agent_provision = agent_provision_plan(
template=provider.template,
dockerfile=dockerfile_path,
state_dir=agent_dir,
guest_home=os.environ.get("BOT_BOTTLE_CONTAINER_HOME", "/home/node"),
forward_host_credentials=provider.forward_host_credentials,
has_provider_auth=has_provider_auth,
host_env=dict(os.environ),
)
guest_env = dict(agent_provision.guest_env)
for key, val in agent_provision.env_vars.items():
guest_env.setdefault(key, val)
agent_provision = replace(agent_provision, guest_env=guest_env)
return DockerBottlePlan( return DockerBottlePlan(
spec=spec, spec=spec,
+6 -10
View File
@@ -9,7 +9,6 @@ from __future__ import annotations
from typing import Sequence from typing import Sequence
from ..agent_provider import runtime_for
from ..log import info from ..log import info
@@ -30,16 +29,13 @@ def print_multi(label: str, values: Sequence[str]) -> None:
def visible_agent_env_names( def visible_agent_env_names(
env_names: Sequence[str], *, agent_provider_template: str, env_names: Sequence[str], *, hidden_env_names: frozenset[str],
) -> list[str]: ) -> list[str]:
"""Env names worth showing in launch summaries. """Env names worth showing in launch summaries.
Provider auth placeholders (currently `CLAUDE_CODE_OAUTH_TOKEN`) Provider-injected placeholder env vars are implementation details:
are implementation details: they are non-secret dummy values that they are non-secret dummy values that satisfy provider CLIs while
satisfy the provider CLI while egress injects the real upstream egress injects the real Authorization header. The plan's
Authorization header. Showing them in preflight makes the operator `hidden_env_names` carries exactly which names to suppress.
think a real key is entering the agent, so hide only the active
provider-owned placeholder.
""" """
hidden = {runtime_for(agent_provider_template).placeholder_env} return sorted({name for name in env_names if name and name not in hidden_env_names})
return sorted({name for name in env_names if name and name not in hidden})
@@ -125,7 +125,7 @@ class SmolmachinesBottlePlan(BottlePlan):
set(bottle.env.keys()) set(bottle.env.keys())
| set(self.agent_provision.guest_env.keys()) | set(self.agent_provision.guest_env.keys())
), ),
agent_provider_template=self.agent_provider_template, hidden_env_names=self.agent_provision.hidden_env_names,
) )
upstreams = [ upstreams = [
f"{g.Name}{g.Upstream}" for g in bottle.git f"{g.Name}{g.Upstream}" for g in bottle.git
+25 -35
View File
@@ -95,44 +95,10 @@ def resolve_plan(
"REQUESTS_CA_BUNDLE": "/etc/ssl/certs/ca-certificates.crt", "REQUESTS_CA_BUNDLE": "/etc/ssl/certs/ca-certificates.crt",
} }
# Inner Plans for the four bundle daemons. The ABCs are
# platform-neutral — `.prepare()` writes config files + returns
# a Plan dataclass with no backend-specific assumptions. State
# dirs are still keyed by slug under the docker backend's
# bottle_state layout (shared on-host convention; not a docker
# dependency).
pipelock_dir = pipelock_state_dir(slug)
pipelock_dir.mkdir(parents=True, exist_ok=True)
proxy_plan = PipelockProxy().prepare(bottle, slug, pipelock_dir)
git_gate_dir = git_gate_state_dir(slug) git_gate_dir = git_gate_state_dir(slug)
git_gate_dir.mkdir(parents=True, exist_ok=True) git_gate_dir.mkdir(parents=True, exist_ok=True)
git_gate_plan = GitGate().prepare(bottle, slug, git_gate_dir) git_gate_plan = GitGate().prepare(bottle, slug, git_gate_dir)
egress_dir = egress_state_dir(slug)
egress_dir.mkdir(parents=True, exist_ok=True)
egress_plan = Egress().prepare(bottle, slug, egress_dir)
# Some provider CLIs refuse to start without *some* credential
# env var even when egress will strip + re-inject the real
# Authorization header. For those providers, auth_role names the
# route marker that enables a non-secret placeholder env. Codex is
# intentionally absent here: it should use its device/ChatGPT login
# state, and an OPENAI_API_KEY placeholder would force API-key auth.
has_provider_auth = any(
provider_runtime.auth_role
and provider_runtime.auth_role in r.roles
for r in egress_plan.routes
)
if has_provider_auth and provider_runtime.placeholder_env:
guest_env[provider_runtime.placeholder_env] = "egress-placeholder"
supervise_plan = None
if bottle.supervise:
supervise_dir = supervise_state_dir(slug)
supervise_dir.mkdir(parents=True, exist_ok=True)
supervise_plan = Supervise().prepare(slug, supervise_dir)
# Prompt file is always written (mode 0o600) so the in-VM # Prompt file is always written (mode 0o600) so the in-VM
# path always exists. Content is the agent's `prompt` # path always exists. Content is the agent's `prompt`
# field (markdown body) — empty for agents with no prompt. # field (markdown body) — empty for agents with no prompt.
@@ -167,7 +133,7 @@ def resolve_plan(
guest_home=os.environ.get("BOT_BOTTLE_GUEST_HOME", "/home/node"), guest_home=os.environ.get("BOT_BOTTLE_GUEST_HOME", "/home/node"),
guest_env=guest_env, guest_env=guest_env,
forward_host_credentials=provider.forward_host_credentials, forward_host_credentials=provider.forward_host_credentials,
has_provider_auth=has_provider_auth, auth_token=provider.auth_token,
host_env=dict(os.environ), host_env=dict(os.environ),
) )
merged_guest_env = dict(agent_provision.guest_env) merged_guest_env = dict(agent_provision.guest_env)
@@ -175,6 +141,30 @@ def resolve_plan(
merged_guest_env.setdefault(key, val) merged_guest_env.setdefault(key, val)
agent_provision = replace(agent_provision, guest_env=merged_guest_env) agent_provision = replace(agent_provision, guest_env=merged_guest_env)
# Inner Plans for the four bundle daemons. The ABCs are
# platform-neutral — `.prepare()` writes config files + returns
# a Plan dataclass with no backend-specific assumptions. State
# dirs are still keyed by slug under the docker backend's
# bottle_state layout (shared on-host convention; not a docker
# dependency).
pipelock_dir = pipelock_state_dir(slug)
pipelock_dir.mkdir(parents=True, exist_ok=True)
proxy_plan = PipelockProxy().prepare(
bottle, slug, pipelock_dir, agent_provision.egress_routes,
)
egress_dir = egress_state_dir(slug)
egress_dir.mkdir(parents=True, exist_ok=True)
egress_plan = Egress().prepare(
bottle, slug, egress_dir, agent_provision.egress_routes,
)
supervise_plan = None
if bottle.supervise:
supervise_dir = supervise_state_dir(slug)
supervise_dir.mkdir(parents=True, exist_ok=True)
supervise_plan = Supervise().prepare(slug, supervise_dir)
return SmolmachinesBottlePlan( return SmolmachinesBottlePlan(
spec=spec, spec=spec,
stage_dir=stage_dir, stage_dir=stage_dir,
+78 -54
View File
@@ -27,11 +27,13 @@ from __future__ import annotations
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING
from .log import die from .log import die
from .manifest import Bottle
CODEX_HOST_CREDENTIAL_HOSTS = ("api.openai.com", "chatgpt.com") if TYPE_CHECKING:
from .manifest import Bottle
CODEX_HOST_CREDENTIAL_TOKEN_REF = "BOT_BOTTLE_CODEX_HOST_ACCESS_TOKEN" CODEX_HOST_CREDENTIAL_TOKEN_REF = "BOT_BOTTLE_CODEX_HOST_ACCESS_TOKEN"
@@ -67,9 +69,14 @@ class EgressRoute:
under `token_env`. Routes that share a `token_ref` coalesce to under `token_env`. Routes that share a `token_ref` coalesce to
one `token_env` slot. one `token_env` slot.
`roles` carries the manifest route's optional role markers (see `roles` carries the manifest route's role tuple (reserved for
`manifest.EGRESS_ROLES`). The launch step reads these for future use; always empty today).
side effects like the claude-code OAuth placeholder env."""
`tls_passthrough` signals that pipelock must not TLS-MITM this
host — either because the manifest declared `pipelock.tls_passthrough:
true` (lifted in `egress_manifest_routes`) or because a provider
route set it (e.g. egress injects its own Bearer on that host
after the agent boundary and pipelock's header DLP would block it)."""
host: str host: str
path_allowlist: tuple[str, ...] = () path_allowlist: tuple[str, ...] = ()
@@ -77,6 +84,7 @@ class EgressRoute:
token_env: str = "" token_env: str = ""
token_ref: str = "" token_ref: str = ""
roles: tuple[str, ...] = () roles: tuple[str, ...] = ()
tls_passthrough: bool = False
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -161,84 +169,94 @@ def egress_manifest_routes(
token_env=token_env, token_env=token_env,
token_ref=r.TokenRef, token_ref=r.TokenRef,
roles=r.Role, roles=r.Role,
tls_passthrough=r.Pipelock.TlsPassthrough,
)) ))
else: else:
out.append(EgressRoute( out.append(EgressRoute(
host=r.Host, host=r.Host,
path_allowlist=r.PathAllowlist, path_allowlist=r.PathAllowlist,
roles=r.Role, roles=r.Role,
tls_passthrough=r.Pipelock.TlsPassthrough,
)) ))
return tuple(out) return tuple(out)
def egress_routes_for_bottle( def egress_routes_for_bottle(
bottle: Bottle, bottle: Bottle,
provider_routes: tuple[EgressRoute, ...] = (),
) -> tuple[EgressRoute, ...]: ) -> tuple[EgressRoute, ...]:
"""Effective egress routes. This is what gets rendered into """Effective egress routes for the agent. This is what gets rendered
routes.yaml + what the addon enforces. into routes.yaml and what the addon enforces.
Operators that want to allow a host usually declare it directly in Merges manifest-declared routes with provider-owned routes. The
`bottle.egress.routes` as an authenticated route or bare-pass entry manifest is the primary surface; `provider_routes` are synthesised
(`- host: <name>`). Codex host-credential forwarding is the by `agent_provision_plan` and may add or upgrade manifest entries.
provider-owned exception: when explicitly enabled, it adds or Provider routes that conflict with an existing authenticated manifest
upgrades the Codex API hosts to egress-owned authenticated routes. The route (different auth scheme or token ref) raise a hard error."""
legacy `bottle.egress.allowlist` folding is gone — egress is the
single allowlist surface."""
routes = list(egress_manifest_routes(bottle)) routes = list(egress_manifest_routes(bottle))
if not bottle.agent_provider.forward_host_credentials: for pr in provider_routes:
return tuple(routes) routes = _merge_provider_route(routes, pr)
if bottle.agent_provider.template != "codex":
return tuple(routes)
for host in CODEX_HOST_CREDENTIAL_HOSTS:
routes = _ensure_codex_host_credential_route(routes, host)
return tuple(routes) return tuple(routes)
def _next_token_env(routes: list[EgressRoute]) -> str: def _find_or_alloc_token_env(routes: list[EgressRoute], token_ref: str) -> str:
"""Return the existing token_env slot for `token_ref`, or allocate the next one."""
for route in routes:
if route.token_ref == token_ref and route.token_env:
return route.token_env
return f"EGRESS_TOKEN_{len({r.token_env for r in routes if r.token_env})}" return f"EGRESS_TOKEN_{len({r.token_env for r in routes if r.token_env})}"
def _codex_host_credential_token_env(routes: list[EgressRoute]) -> str: def _merge_provider_route(
for route in routes: routes: list[EgressRoute], pr: EgressRoute,
if route.token_ref == CODEX_HOST_CREDENTIAL_TOKEN_REF:
return route.token_env
return _next_token_env(routes)
def _ensure_codex_host_credential_route(
routes: list[EgressRoute], host: str,
) -> list[EgressRoute]: ) -> list[EgressRoute]:
"""Merge one provider-declared route into the manifest route list.
Upgrade a bare-pass manifest route to authenticated if the provider
declares auth for that host, or append if the host isn't in the manifest.
Identical auth (same scheme + token_ref) on an existing route is a
no-op, with a tls_passthrough upgrade if the provider route sets it.
Conflicting auth (different scheme or token_ref) dies."""
for idx, route in enumerate(routes): for idx, route in enumerate(routes):
if route.host.lower() != host: if route.host.lower() != pr.host.lower():
continue continue
if route.auth_scheme or route.token_ref: if route.auth_scheme or route.token_ref:
if ( if route.auth_scheme == pr.auth_scheme and route.token_ref == pr.token_ref:
route.auth_scheme == "Bearer" if pr.tls_passthrough and not route.tls_passthrough:
and route.token_ref == CODEX_HOST_CREDENTIAL_TOKEN_REF routes[idx] = EgressRoute(
): host=route.host,
path_allowlist=route.path_allowlist,
auth_scheme=route.auth_scheme,
token_env=route.token_env,
token_ref=route.token_ref,
roles=route.roles,
tls_passthrough=True,
)
return routes return routes
die( die(
"codex host credential forwarding conflicts with an " f"provider egress route for {pr.host!r} conflicts with an "
f"authenticated egress route for {host}. Remove that " f"authenticated manifest route (different auth scheme or token "
"route auth block or disable agent_provider.forward_host_credentials." f"ref). Remove the manifest route's auth block or disable the "
f"feature that adds this provider route."
) )
token_env = _find_or_alloc_token_env(routes, pr.token_ref)
routes[idx] = EgressRoute( routes[idx] = EgressRoute(
host=route.host, host=route.host,
path_allowlist=route.path_allowlist, path_allowlist=route.path_allowlist,
auth_scheme="Bearer", auth_scheme=pr.auth_scheme,
token_env=_codex_host_credential_token_env(routes), token_env=token_env,
token_ref=CODEX_HOST_CREDENTIAL_TOKEN_REF, token_ref=pr.token_ref,
roles=route.roles, roles=route.roles,
tls_passthrough=pr.tls_passthrough,
) )
return routes return routes
token_env = _find_or_alloc_token_env(routes, pr.token_ref)
routes.append(EgressRoute( routes.append(EgressRoute(
host=host, host=pr.host,
auth_scheme="Bearer", auth_scheme=pr.auth_scheme,
token_env=_codex_host_credential_token_env(routes), token_env=token_env,
token_ref=CODEX_HOST_CREDENTIAL_TOKEN_REF, token_ref=pr.token_ref,
tls_passthrough=pr.tls_passthrough,
)) ))
return routes return routes
@@ -338,18 +356,23 @@ class Egress(ABC):
sidecar's start/stop lifecycle is backend-specific and lives on sidecar's start/stop lifecycle is backend-specific and lives on
concrete subclasses.""" concrete subclasses."""
def prepare(self, bottle: Bottle, slug: str, stage_dir: Path) -> EgressPlan: def prepare(
"""Lift `bottle.egress.routes` into resolved routes, self,
render the routes file (mode 600) under `stage_dir`, and bottle: Bottle,
slug: str,
stage_dir: Path,
provider_routes: tuple[EgressRoute, ...] = (),
) -> EgressPlan:
"""Lift `bottle.egress.routes` + `provider_routes` into resolved
routes, render the routes file (mode 600) under `stage_dir`, and
return the plan. Pure host-side, no docker subprocess. The return the plan. Pure host-side, no docker subprocess. The
token-env map records the mapping the launch step uses to token-env map records the mapping the launch step uses to
forward values from the host's environ into the sidecar's forward values from the host's environ into the sidecar's environ.
environ.
Returned plan is incomplete: the launch step must fill Returned plan is incomplete: the launch step must fill
`internal_network` / `egress_network` / `pipelock_proxy_url` `internal_network` / `egress_network` / `pipelock_proxy_url`
via `dataclasses.replace` before passing it to `.start`.""" via `dataclasses.replace` before passing it to `.start`."""
routes = egress_routes_for_bottle(bottle) routes = egress_routes_for_bottle(bottle, provider_routes)
routes_path = stage_dir / "egress_routes.yaml" routes_path = stage_dir / "egress_routes.yaml"
routes_path.write_text(egress_render_routes(routes)) routes_path.write_text(egress_render_routes(routes))
routes_path.chmod(0o600) routes_path.chmod(0o600)
@@ -361,6 +384,7 @@ class Egress(ABC):
) )
__all__ = [ __all__ = [
"CODEX_HOST_CREDENTIAL_TOKEN_REF",
"EGRESS_HOSTNAME", "EGRESS_HOSTNAME",
"EGRESS_ROUTES_IN_CONTAINER", "EGRESS_ROUTES_IN_CONTAINER",
"Egress", "Egress",
+41 -101
View File
@@ -175,47 +175,6 @@ class GitEntry:
# token-not-Bearer quirk (go-gitea/gitea#16734). # token-not-Bearer quirk (go-gitea/gitea#16734).
EGRESS_AUTH_SCHEMES = ("Bearer", "token") EGRESS_AUTH_SCHEMES = ("Bearer", "token")
# Optional per-route role markers. A role signals "this route plays
# a specific named part in the bottle's auth flow"; the launch step
# acts on the marker.
#
# claude_code_oauth: this route auth-injects on the agent's
# claude-code OAuth flow. Triggers prepare.py
# to ship a placeholder CLAUDE_CODE_OAUTH_TOKEN
# to the agent (so claude-code starts) and
# disable nonessential-traffic / error-reporting
# env vars. Host doesn't matter to the placeholder
# logic — declare the role on whichever route
# injects the OAuth header.
#
# codex_auth: placeholder marker reserved for follow-up Codex
# credential-injection work. It is still accepted so
# existing manifests and future egress-held auth flows
# have a stable role name, but it no longer triggers an
# OPENAI_API_KEY placeholder. Codex bottles should prefer
# device/ChatGPT login state today.
#
# Routes without a `role` are pure proxy entries: egress
# enforces path_allowlist + injects auth on its own, but nothing
# special happens on the agent side.
EGRESS_ROLES = frozenset({
"claude_code_oauth",
"codex_auth",
})
# Singleton roles may appear on at most one route per bottle. Some
# roles drive a single provider auth path; two routes claiming one
# marker would leave "which one is canonical?" ambiguous.
EGRESS_SINGLETON_ROLES = frozenset({
"claude_code_oauth",
"codex_auth",
})
PROVIDER_EGRESS_ROLES = {
"claude": frozenset({"claude_code_oauth"}),
"codex": frozenset({"codex_auth"}),
}
@dataclass(frozen=True) @dataclass(frozen=True)
class AgentProvider: class AgentProvider:
1
@@ -224,20 +183,30 @@ class AgentProvider:
`template` selects a built-in launch/runtime contract. `dockerfile` `template` selects a built-in launch/runtime contract. `dockerfile`
optionally points at a custom agent-image Dockerfile while leaving optionally points at a custom agent-image Dockerfile while leaving
bot-bottle's sidecar infrastructure intact. bot-bottle's sidecar infrastructure intact.
`auth_token` names the host env var that holds the provider's OAuth
token (Claude only). The provisioner injects a provider-owned egress
route for api.anthropic.com that re-injects this token as the Bearer
header, and sets a placeholder CLAUDE_CODE_OAUTH_TOKEN in the agent
so the Claude Code CLI starts.
`forward_host_credentials` forwards the host Codex auth token into
the egress sidecar (Codex only).
""" """
template: str = "claude" template: str = "claude"
dockerfile: str = "" dockerfile: str = ""
auth_token: str = ""
forward_host_credentials: bool = False forward_host_credentials: bool = False
@classmethod @classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "AgentProvider": def from_dict(cls, bottle_name: str, raw: object) -> "AgentProvider":
d = _as_json_object(raw, f"bottle '{bottle_name}' agent_provider") d = _as_json_object(raw, f"bottle '{bottle_name}' agent_provider")
for k in d: for k in d:
if k not in {"template", "dockerfile", "forward_host_credentials"}: if k not in {"template", "dockerfile", "auth_token", "forward_host_credentials"}:
raise ManifestError( raise ManifestError(
f"bottle '{bottle_name}' agent_provider has unknown key {k!r}; " f"bottle '{bottle_name}' agent_provider has unknown key {k!r}; "
f"allowed: template, dockerfile, forward_host_credentials" f"allowed: template, dockerfile, auth_token, forward_host_credentials"
) )
template = d.get("template", "claude") template = d.get("template", "claude")
if not isinstance(template, str) or not template: if not isinstance(template, str) or not template:
@@ -256,6 +225,17 @@ class AgentProvider:
f"bottle '{bottle_name}' agent_provider.dockerfile must be a " f"bottle '{bottle_name}' agent_provider.dockerfile must be a "
f"string (was {type(dockerfile).__name__})" f"string (was {type(dockerfile).__name__})"
) )
auth_token = d.get("auth_token", "")
if not isinstance(auth_token, str):
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.auth_token must be a "
f"string (was {type(auth_token).__name__})"
)
if auth_token and template != "claude":
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.auth_token is only "
f"supported for template 'claude'"
)
forward_host_credentials = d.get("forward_host_credentials", False) forward_host_credentials = d.get("forward_host_credentials", False)
if not isinstance(forward_host_credentials, bool): if not isinstance(forward_host_credentials, bool):
raise ManifestError( raise ManifestError(
@@ -270,6 +250,7 @@ class AgentProvider:
return cls( return cls(
template=template, template=template,
dockerfile=dockerfile, dockerfile=dockerfile,
auth_token=auth_token,
forward_host_credentials=forward_host_credentials, forward_host_credentials=forward_host_credentials,
) )
@@ -428,10 +409,8 @@ class EgressRoute:
manifest's `auth` block is omitted both fields are empty strings — manifest's `auth` block is omitted both fields are empty strings —
no Authorization is written, no token forwarded. no Authorization is written, no token forwarded.
`Role` is an optional tuple of named markers (see `Role` is reserved for future use; all role strings are currently
EGRESS_ROLES). The launch step reads these and triggers rejected by the validator.
associated side effects (e.g. the `claude_code_oauth` marker
causes prepare.py to set a placeholder OAuth env on the agent).
Validation rules (enforced in `from_dict`): Validation rules (enforced in `from_dict`):
- `host` required, non-empty. - `host` required, non-empty.
@@ -440,10 +419,7 @@ class EgressRoute:
`token_ref` as non-empty strings; an empty `auth: {}` is an `token_ref` as non-empty strings; an empty `auth: {}` is an
error rather than a synonym for "no auth" (omit `auth` for error rather than a synonym for "no auth" (omit `auth` for
that case). that case).
- `role` optional. String or list of strings drawn from - `role` optional, reserved any non-empty value is rejected.
EGRESS_ROLES. Singleton roles (see
EGRESS_SINGLETON_ROLES) may appear on at most one
route per bottle.
""" """
Host: str Host: str
@@ -541,12 +517,11 @@ class EgressRoute:
f"{label} role must be a string or a list of strings " f"{label} role must be a string or a list of strings "
f"(was {type(role_raw).__name__})" f"(was {type(role_raw).__name__})"
) )
for r in roles: if roles:
if r not in EGRESS_ROLES: raise ManifestError(
raise ManifestError( f"{label} role {roles[0]!r} is not accepted; "
f"{label} role {r!r} is not one of " f"the 'role' field is reserved for future use"
f"{', '.join(sorted(EGRESS_ROLES))}" )
)
pipelock = ( pipelock = (
PipelockRoutePolicy.from_dict(bottle_name, idx, d["pipelock"]) PipelockRoutePolicy.from_dict(bottle_name, idx, d["pipelock"])
@@ -581,9 +556,7 @@ class EgressConfig:
routes: tuple[EgressRoute, ...] = () routes: tuple[EgressRoute, ...] = ()
@classmethod @classmethod
def from_dict( def from_dict(cls, bottle_name: str, raw: object) -> "EgressConfig":
cls, bottle_name: str, raw: object, *, agent_provider_template: str = "claude",
) -> "EgressConfig":
d = _as_json_object(raw, f"bottle '{bottle_name}' egress") d = _as_json_object(raw, f"bottle '{bottle_name}' egress")
routes_raw = d.get("routes") routes_raw = d.get("routes")
routes: tuple[EgressRoute, ...] = () routes: tuple[EgressRoute, ...] = ()
@@ -598,9 +571,7 @@ class EgressConfig:
EgressRoute.from_dict(bottle_name, i, entry) EgressRoute.from_dict(bottle_name, i, entry)
for i, entry in enumerate(routes_list) for i, entry in enumerate(routes_list)
) )
_validate_egress_routes( _validate_egress_routes(bottle_name, routes)
bottle_name, routes, agent_provider_template=agent_provider_template,
)
for k in d: for k in d:
if k != "routes": if k != "routes":
raise ManifestError( raise ManifestError(
@@ -691,10 +662,7 @@ class Bottle:
) )
egress = ( egress = (
EgressConfig.from_dict( EgressConfig.from_dict(name, d["egress"])
name, d["egress"],
agent_provider_template=agent_provider.template,
)
if "egress" in d if "egress" in d
else EgressConfig() else EgressConfig()
) )
@@ -1058,21 +1026,15 @@ def _is_ip_literal(value: str) -> bool:
def _validate_egress_routes( def _validate_egress_routes(
bottle_name: str, bottle_name: str,
routes: tuple[EgressRoute, ...], routes: tuple[EgressRoute, ...],
*,
agent_provider_template: str = "claude",
) -> None: ) -> None:
"""Cross-validation for `bottle.egress.routes`: """Cross-validation for `bottle.egress.routes`: hosts must be unique.
- Hosts must be unique within the bottle. The proxy matches by The proxy matches by exact-host (v1); duplicate hosts leave the
exact-host (v1, prefix matching is on path_allowlist only); route choice ambiguous so we reject them up front.
duplicate hosts leave the route choice ambiguous.
- Singleton roles (see EGRESS_SINGLETON_ROLES) may appear
on at most one route per bottle.
No cross-validation against `bottle.git` is performed. git-gate No cross-validation against `bottle.git` is performed. git-gate
(SSH push/fetch) and egress (HTTPS) broker different (SSH push/fetch) and egress (HTTPS) broker different protocols;
protocols; declaring both for the same host is a legitimate declaring both for the same host is a legitimate dev setup."""
dev setup."""
seen_hosts: dict[str, None] = {} seen_hosts: dict[str, None] = {}
for r in routes: for r in routes:
key = r.Host.lower() key = r.Host.lower()
@@ -1082,25 +1044,6 @@ def _validate_egress_routes(
f"{r.Host!r}; each host must be unique on the proxy." f"{r.Host!r}; each host must be unique on the proxy."
) )
seen_hosts[key] = None seen_hosts[key] = None
for role in EGRESS_SINGLETON_ROLES:
with_role = [r for r in routes if role in r.Role]
if len(with_role) > 1:
hosts = ", ".join(r.Host for r in with_role)
raise ManifestError(
f"bottle '{bottle_name}' egress.routes has {len(with_role)} "
f"routes with role {role!r} (hosts: {hosts}); this role drives a "
f"single launch-step side effect — pick one."
)
allowed_roles = PROVIDER_EGRESS_ROLES[agent_provider_template]
for route in routes:
for role in route.Role:
if role not in allowed_roles:
raise ManifestError(
f"bottle '{bottle_name}' egress route for host "
f"{route.Host!r} has role {role!r}, but provider "
f"{agent_provider_template!r} only accepts roles "
f"{', '.join(sorted(allowed_roles)) or '(none)'}"
)
def _validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None: def _validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None:
@@ -1311,10 +1254,7 @@ def _merge_bottles(
merged_supervise = ( merged_supervise = (
child.supervise if "supervise" in child_raw else parent.supervise child.supervise if "supervise" in child_raw else parent.supervise
) )
_validate_egress_routes( _validate_egress_routes(name, merged_egress.routes)
name, merged_egress.routes,
agent_provider_template=merged_agent_provider.template,
)
return Bottle( return Bottle(
env=merged_env, env=merged_env,
+33 -37
View File
@@ -21,11 +21,7 @@ from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import cast from typing import cast
from .egress import ( from .egress import EGRESS_HOSTNAME, EgressRoute, egress_routes_for_bottle
CODEX_HOST_CREDENTIAL_HOSTS,
EGRESS_HOSTNAME,
egress_routes_for_bottle,
)
from .supervise import SUPERVISE_HOSTNAME from .supervise import SUPERVISE_HOSTNAME
from .manifest import Bottle from .manifest import Bottle
@@ -54,14 +50,17 @@ PIPELOCK_HOSTNAME = "pipelock"
# --- Allowlist resolution -------------------------------------------------- # --- Allowlist resolution --------------------------------------------------
def pipelock_effective_allowlist(bottle: Bottle) -> list[str]: def pipelock_effective_allowlist(
bottle: Bottle,
provider_routes: tuple[EgressRoute, ...] = (),
) -> list[str]:
"""Hostnames pipelock allows. Sorted for stability. """Hostnames pipelock allows. Sorted for stability.
Always mirrors `egress_routes_for_bottle(bottle)` egress is the Always mirrors `egress_routes_for_bottle(bottle, provider_routes)`
single allowlist surface, and pipelock's allowlist is the downstream egress is the single allowlist surface, and pipelock's allowlist is
copy for defense-in-depth + DLP body scanning. For bottles without the downstream copy for defense-in-depth + DLP body scanning. For
any `egress.routes[]` declared, this is empty except for supervise bottles without any `egress.routes[]` declared, this is empty except
sidecar traffic when `supervise: true`. for supervise sidecar traffic when `supervise: true`.
The supervise sidecar's hostname is auto-added when supervise The supervise sidecar's hostname is auto-added when supervise
is enabled (sibling-sidecar traffic that flows through pipelock is enabled (sibling-sidecar traffic that flows through pipelock
@@ -69,7 +68,7 @@ def pipelock_effective_allowlist(bottle: Bottle) -> list[str]:
`bottle.git` do NOT contribute here git traffic flows `bottle.git` do NOT contribute here git traffic flows
through git-gate (PRD 0008), not pipelock.""" through git-gate (PRD 0008), not pipelock."""
seen: dict[str, None] = {} seen: dict[str, None] = {}
for r in egress_routes_for_bottle(bottle): for r in egress_routes_for_bottle(bottle, provider_routes):
if r.host: if r.host:
seen.setdefault(r.host, None) seen.setdefault(r.host, None)
if bottle.supervise: if bottle.supervise:
@@ -102,32 +101,23 @@ def pipelock_seed_phrase_detection_enabled(bottle: Bottle) -> bool:
return False return False
def pipelock_effective_tls_passthrough(bottle: Bottle) -> list[str]: def pipelock_effective_tls_passthrough(
bottle: Bottle,
provider_routes: tuple[EgressRoute, ...] = (),
) -> list[str]:
"""Hostnames pipelock should pass through (no TLS MITM). """Hostnames pipelock should pass through (no TLS MITM).
A route opts in with `pipelock.tls_passthrough: true`. This is A manifest route opts in with `pipelock.tls_passthrough: true`
useful for provider API routes where egress injects the (lifted into `EgressRoute.tls_passthrough` in `egress_manifest_routes`).
Authorization header after the agent boundary; pipelock still Provider routes that set `tls_passthrough=True` (e.g. Codex credential
enforces the host allowlist but does not decrypt and scan that routes where egress injects the host bearer after the agent boundary)
provider request. are also included. Both arrive via `egress_routes_for_bottle` no
provider-specific branching needed here.
""" """
seen: dict[str, None] = {host: None for host in DEFAULT_TLS_PASSTHROUGH} seen: dict[str, None] = {host: None for host in DEFAULT_TLS_PASSTHROUGH}
for route in bottle.egress.routes: for route in egress_routes_for_bottle(bottle, provider_routes):
if route.Pipelock.TlsPassthrough: if route.tls_passthrough:
seen.setdefault(route.Host, None) seen.setdefault(route.host, None)
# forward_host_credentials makes egress inject the host ChatGPT bearer
# on the Codex API hosts AFTER the agent boundary. Pipelock sits
# downstream of egress and DLP-scans request headers; left to MITM
# these routes it flags the injected JWT as a leaked secret
# ("request header contains secret") and blocks. Pass them through so
# pipelock still enforces the host allowlist on CONNECT but does not
# decrypt + rescan egress-owned auth. The auto-added routes live in
# egress_routes_for_bottle, not bottle.egress.routes, so add the
# hosts explicitly here.
provider = bottle.agent_provider
if provider.forward_host_credentials and provider.template == "codex":
for host in CODEX_HOST_CREDENTIAL_HOSTS:
seen.setdefault(host, None)
return sorted(seen.keys()) return sorted(seen.keys())
@@ -159,6 +149,7 @@ def pipelock_build_config(
ca_cert_path: str = "", ca_cert_path: str = "",
ca_key_path: str = "", ca_key_path: str = "",
ssrf_ip_allowlist: tuple[str, ...] = (), ssrf_ip_allowlist: tuple[str, ...] = (),
provider_routes: tuple[EgressRoute, ...] = (),
) -> dict[str, object]: ) -> dict[str, object]:
"""Build the structured pipelock config dict the sidecar will load. """Build the structured pipelock config dict the sidecar will load.
@@ -188,7 +179,7 @@ def pipelock_build_config(
"version": 1, "version": 1,
"mode": "strict", "mode": "strict",
"enforce": True, "enforce": True,
"api_allowlist": pipelock_effective_allowlist(bottle), "api_allowlist": pipelock_effective_allowlist(bottle, provider_routes),
"forward_proxy": {"enabled": True}, "forward_proxy": {"enabled": True},
} }
if not pipelock_seed_phrase_detection_enabled(bottle): if not pipelock_seed_phrase_detection_enabled(bottle):
@@ -222,7 +213,7 @@ def pipelock_build_config(
"enabled": True, "enabled": True,
"ca_cert": ca_cert_path, "ca_cert": ca_cert_path,
"ca_key": ca_key_path, "ca_key": ca_key_path,
"passthrough_domains": pipelock_effective_tls_passthrough(bottle), "passthrough_domains": pipelock_effective_tls_passthrough(bottle, provider_routes),
} }
effective_ssrf_ip_allowlist = pipelock_effective_ssrf_ip_allowlist( effective_ssrf_ip_allowlist = pipelock_effective_ssrf_ip_allowlist(
bottle, ssrf_ip_allowlist, bottle, ssrf_ip_allowlist,
@@ -336,7 +327,11 @@ class PipelockProxy:
(`PIPELOCK_CA_CERT_IN_CONTAINER` / `PIPELOCK_CA_KEY_IN_CONTAINER`).""" (`PIPELOCK_CA_CERT_IN_CONTAINER` / `PIPELOCK_CA_KEY_IN_CONTAINER`)."""
def prepare( def prepare(
self, bottle: Bottle, slug: str, stage_dir: Path self,
bottle: Bottle,
slug: str,
stage_dir: Path,
provider_routes: tuple[EgressRoute, ...] = (),
) -> PipelockProxyPlan: ) -> PipelockProxyPlan:
"""Write the pipelock yaml config (mode 600) under `stage_dir` """Write the pipelock yaml config (mode 600) under `stage_dir`
and return the plan for launch. Pure host-side, no docker and return the plan for launch. Pure host-side, no docker
@@ -359,6 +354,7 @@ class PipelockProxy:
bottle, bottle,
ca_cert_path=PIPELOCK_CA_CERT_IN_CONTAINER, ca_cert_path=PIPELOCK_CA_CERT_IN_CONTAINER,
ca_key_path=PIPELOCK_CA_KEY_IN_CONTAINER, ca_key_path=PIPELOCK_CA_KEY_IN_CONTAINER,
provider_routes=provider_routes,
) )
yaml_path.write_text(pipelock_render_yaml(cfg)) yaml_path.write_text(pipelock_render_yaml(cfg))
yaml_path.chmod(0o600) yaml_path.chmod(0o600)
@@ -1,4 +1,4 @@
# PRD 0029: Codex host credentials through egress # PRD 0029: Provider auth credentials through egress
- **Status:** Draft - **Status:** Draft
- **Author:** didericis-codex - **Author:** didericis-codex
@@ -7,9 +7,12 @@
## Summary ## Summary
Allow Codex bottles to use a host-authorized ChatGPT/device-login Allow provider bottles to inject host credentials into the egress
access token by forwarding it only into the egress sidecar, gated by an sidecar without exposing them to the agent. Codex uses
explicit `agent_provider.forward_host_credentials` manifest flag. `agent_provider.forward_host_credentials` for ChatGPT/device-login
access tokens. Claude uses `agent_provider.auth_token` to name the host
env var holding its OAuth token, which egress injects on
`api.anthropic.com` requests.
## Problem ## Problem
@@ -51,8 +54,8 @@ possible, not in the agent.
current access token at launch; operators can restart after host Codex current access token at launch; operators can restart after host Codex
refreshes auth. refreshes auth.
- Copying host `~/.codex/auth.json` credentials into the agent. - Copying host `~/.codex/auth.json` credentials into the agent.
- Allowing arbitrary host credential forwarding. This PRD covers Codex - Allowing arbitrary host credential forwarding beyond the two providers
ChatGPT/device-login credentials only. covered here (Codex ChatGPT/device-login and Claude OAuth).
- Hot-applying new authenticated Codex routes to an existing running - Hot-applying new authenticated Codex routes to an existing running
sidecar. The current hot-apply path cannot safely populate new token sidecar. The current hot-apply path cannot safely populate new token
env slots in an already-running container. env slots in an already-running container.
@@ -64,6 +67,15 @@ possible, not in the agent.
- Add `agent_provider.forward_host_credentials` to the bottle manifest - Add `agent_provider.forward_host_credentials` to the bottle manifest
schema, defaulting to `false`. schema, defaulting to `false`.
- Support the flag for `agent_provider.template: codex`. - Support the flag for `agent_provider.template: codex`.
- Add `agent_provider.auth_token` to the bottle manifest schema.
- Support the field for `agent_provider.template: claude`: the named
host env var is forwarded only into the egress sidecar as the Bearer
token for `api.anthropic.com`, and a placeholder
`CLAUDE_CODE_OAUTH_TOKEN` is set in the agent so the Claude Code CLI
starts without a real credential.
- Remove the `claude_code_oauth` egress route role, which previously
required operators to declare the OAuth route manually. The provisioner
now injects it from `auth_token`.
- Read host Codex auth from `$CODEX_HOME/auth.json` when `CODEX_HOME` is - Read host Codex auth from `$CODEX_HOME/auth.json` when `CODEX_HOME` is
set, otherwise from `~/.codex/auth.json`. set, otherwise from `~/.codex/auth.json`.
- Extract only `tokens.access_token` for egress injection. - Extract only `tokens.access_token` for egress injection.
+71 -16
View File
@@ -8,7 +8,12 @@ import tempfile
import unittest import unittest
from pathlib import Path from pathlib import Path
from bot_bottle.agent_provider import agent_provision_plan, runtime_for from bot_bottle.agent_provider import (
CODEX_HOST_CREDENTIAL_HOSTS,
agent_provision_plan,
runtime_for,
)
from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF
def _jwt(exp: int) -> str: def _jwt(exp: int) -> str:
@@ -19,16 +24,6 @@ def _jwt(exp: int) -> str:
class TestAgentProviderRuntime(unittest.TestCase): class TestAgentProviderRuntime(unittest.TestCase):
def test_claude_keeps_oauth_placeholder(self):
runtime = runtime_for("claude")
self.assertEqual("claude_code_oauth", runtime.auth_role)
self.assertEqual("CLAUDE_CODE_OAUTH_TOKEN", runtime.placeholder_env)
def test_codex_does_not_inject_openai_api_key_placeholder(self):
runtime = runtime_for("codex")
self.assertEqual("", runtime.auth_role)
self.assertEqual("", runtime.placeholder_env)
def test_codex_plan_declares_home_state(self): def test_codex_plan_declares_home_state(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp: with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = agent_provision_plan( plan = agent_provision_plan(
@@ -76,19 +71,79 @@ class TestAgentProviderRuntime(unittest.TestCase):
self.assertEqual(1, len(plan.verify)) self.assertEqual(1, len(plan.verify))
self.assertIn("CODEX_HOME=/run/codex-home", plan.verify[0].argv) self.assertIn("CODEX_HOME=/run/codex-home", plan.verify[0].argv)
def test_claude_with_provider_auth_disables_nonessential_traffic(self): def test_claude_with_auth_token_injects_provider_route_and_placeholder(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp: with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = agent_provision_plan( plan = agent_provision_plan(
template="claude", template="claude",
dockerfile="/tmp/Dockerfile.claude", dockerfile="/tmp/Dockerfile.claude",
state_dir=Path(tmp), state_dir=Path(tmp),
has_provider_auth=True, auth_token="BOT_BOTTLE_CLAUDE_OAUTH_TOKEN",
)
self.assertEqual(1, len(plan.egress_routes))
route = plan.egress_routes[0]
self.assertEqual("api.anthropic.com", route.host)
self.assertEqual("Bearer", route.auth_scheme)
self.assertEqual("BOT_BOTTLE_CLAUDE_OAUTH_TOKEN", route.token_ref)
self.assertTrue(route.tls_passthrough)
self.assertEqual("egress-placeholder", plan.env_vars["CLAUDE_CODE_OAUTH_TOKEN"])
self.assertEqual("1", plan.env_vars["CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC"])
self.assertEqual("1", plan.env_vars["DISABLE_ERROR_REPORTING"])
self.assertEqual(frozenset({"CLAUDE_CODE_OAUTH_TOKEN"}), plan.hidden_env_names)
def test_codex_forward_host_credentials_populates_egress_routes(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
home = Path(tmp) / "host-codex"
home.mkdir()
(home / "auth.json").write_text(json.dumps({
"auth_mode": "chatgpt",
"tokens": {"access_token": _jwt(2000000000)},
}))
plan = agent_provision_plan(
template="codex",
dockerfile="",
state_dir=Path(tmp),
forward_host_credentials=True,
host_env={"CODEX_HOME": str(home)},
)
hosts = [r.host for r in plan.egress_routes]
self.assertEqual(sorted(CODEX_HOST_CREDENTIAL_HOSTS), sorted(hosts))
for r in plan.egress_routes:
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, r.token_ref)
self.assertTrue(r.tls_passthrough)
def test_codex_without_forward_host_credentials_has_passthrough_egress_routes(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = agent_provision_plan(
template="codex",
dockerfile="",
state_dir=Path(tmp),
didericis marked this conversation as resolved Outdated
Outdated
Review

when we don't forward host credentials there should still be egress routes, just not egress routes with an auto-injected token (and we should have passthrough set to true so the tokens the user would set after logging in don't get stripped out)

when we don't forward host credentials there should still be egress routes, just not egress routes with an auto-injected token (and we should have passthrough set to true so the tokens the user would set after logging in don't get stripped out)
forward_host_credentials=False,
) )
self.assertEqual( self.assertEqual(
"1", {r.host for r in plan.egress_routes},
plan.env_vars["CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC"], set(CODEX_HOST_CREDENTIAL_HOSTS),
) )
self.assertEqual("1", plan.env_vars["DISABLE_ERROR_REPORTING"]) for r in plan.egress_routes:
self.assertEqual("", r.auth_scheme)
self.assertEqual("", r.token_ref)
self.assertTrue(r.tls_passthrough)
def test_claude_without_auth_token_has_passthrough_egress_route(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = agent_provision_plan(
template="claude",
dockerfile="",
state_dir=Path(tmp),
)
self.assertEqual(1, len(plan.egress_routes))
route = plan.egress_routes[0]
self.assertEqual("api.anthropic.com", route.host)
self.assertEqual("", route.auth_scheme)
self.assertEqual("", route.token_ref)
self.assertTrue(route.tls_passthrough)
self.assertNotIn("CLAUDE_CODE_OAUTH_TOKEN", plan.env_vars)
self.assertEqual(frozenset(), plan.hidden_env_names)
if __name__ == "__main__": if __name__ == "__main__":
+87 -54
View File
@@ -5,6 +5,7 @@ import unittest
from bot_bottle.egress import ( from bot_bottle.egress import (
CODEX_HOST_CREDENTIAL_TOKEN_REF, CODEX_HOST_CREDENTIAL_TOKEN_REF,
EgressRoute,
egress_manifest_routes, egress_manifest_routes,
egress_render_routes, egress_render_routes,
egress_resolve_token_values, egress_resolve_token_values,
@@ -23,19 +24,13 @@ def _bottle(routes):
}).bottles["dev"] }).bottles["dev"]
def _codex_bottle(*, forward_host_credentials: bool, routes): def _provider_route(host: str, token_ref: str, *, tls_passthrough: bool = False) -> EgressRoute:
return Manifest.from_json_obj({ return EgressRoute(
"bottles": { host=host,
"dev": { auth_scheme="Bearer",
"agent_provider": { token_ref=token_ref,
"template": "codex", tls_passthrough=tls_passthrough,
"forward_host_credentials": forward_host_credentials, )
},
"egress": {"routes": routes},
}
},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
class TestRoutesForBottle(unittest.TestCase): class TestRoutesForBottle(unittest.TestCase):
@@ -100,9 +95,8 @@ class TestRoutesForBottle(unittest.TestCase):
self.assertEqual("", routes[1].token_env) self.assertEqual("", routes[1].token_env)
class TestRoutesForBottleUsesManifestOnly(unittest.TestCase): class TestRoutesForBottleManifestOnly(unittest.TestCase):
"""The effective route table is exactly the manifest-declared """Without provider routes the effective table is exactly the manifest."""
routes. Provider defaults are not injected implicitly."""
def test_no_manifest_routes_means_no_effective_routes(self): def test_no_manifest_routes_means_no_effective_routes(self):
b = _bottle([]) b = _bottle([])
@@ -123,58 +117,97 @@ class TestRoutesForBottleUsesManifestOnly(unittest.TestCase):
effective = [r.host for r in egress_routes_for_bottle(b)] effective = [r.host for r in egress_routes_for_bottle(b)]
self.assertEqual(["x.example"], effective) self.assertEqual(["x.example"], effective)
def test_codex_forward_host_credentials_adds_codex_routes(self): def test_tls_passthrough_lifted_from_manifest(self):
b = _codex_bottle(forward_host_credentials=True, routes=[]) b = _bottle([{
"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": "T"},
"pipelock": {"tls_passthrough": True},
}])
routes = egress_routes_for_bottle(b) routes = egress_routes_for_bottle(b)
self.assertEqual(["api.openai.com", "chatgpt.com"], [r.host for r in routes]) self.assertTrue(routes[0].tls_passthrough)
def test_tls_passthrough_false_by_default(self):
b = _bottle([{"host": "api.github.com"}])
routes = egress_routes_for_bottle(b)
self.assertFalse(routes[0].tls_passthrough)
class TestProviderRouteMerge(unittest.TestCase):
"""Provider routes are merged into manifest routes generically."""
def test_provider_route_appended_when_not_in_manifest(self):
b = _bottle([])
pr = _provider_route("api.openai.com", "TOK")
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertEqual("api.openai.com", routes[0].host)
self.assertEqual("Bearer", routes[0].auth_scheme) self.assertEqual("Bearer", routes[0].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env) self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, routes[0].token_ref) self.assertEqual("TOK", routes[0].token_ref)
self.assertEqual("Bearer", routes[1].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", routes[1].token_env)
self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, routes[1].token_ref)
def test_codex_forward_host_credentials_upgrades_bare_chatgpt_route(self): def test_two_provider_routes_with_same_token_ref_share_slot(self):
b = _codex_bottle( b = _bottle([])
forward_host_credentials=True, routes = egress_routes_for_bottle(b, (
routes=[{"host": "chatgpt.com", "path_allowlist": ["/backend-api/"]}], _provider_route("api.openai.com", CODEX_HOST_CREDENTIAL_TOKEN_REF),
) _provider_route("chatgpt.com", CODEX_HOST_CREDENTIAL_TOKEN_REF),
routes = egress_routes_for_bottle(b) ))
self.assertEqual(2, len(routes)) self.assertEqual(["api.openai.com", "chatgpt.com"], [r.host for r in routes])
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
self.assertEqual("EGRESS_TOKEN_0", routes[1].token_env)
def test_provider_route_upgrades_bare_manifest_route(self):
b = _bottle([{"host": "chatgpt.com", "path_allowlist": ["/backend-api/"]}])
pr = _provider_route("chatgpt.com", CODEX_HOST_CREDENTIAL_TOKEN_REF)
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertEqual("chatgpt.com", routes[0].host) self.assertEqual("chatgpt.com", routes[0].host)
self.assertEqual("Bearer", routes[0].auth_scheme) self.assertEqual("Bearer", routes[0].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env) self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, routes[0].token_ref) self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, routes[0].token_ref)
self.assertEqual(("/backend-api/",), routes[0].path_allowlist) self.assertEqual(("/backend-api/",), routes[0].path_allowlist)
self.assertEqual("api.openai.com", routes[1].host)
self.assertEqual("EGRESS_TOKEN_0", routes[1].token_env)
def test_codex_forward_host_credentials_accepts_explicit_synthetic_route(self): def test_provider_route_noop_when_same_auth_already_in_manifest(self):
b = _codex_bottle( b = _bottle([{
forward_host_credentials=True, "host": "api.openai.com",
routes=[{ "auth": {"scheme": "Bearer", "token_ref": CODEX_HOST_CREDENTIAL_TOKEN_REF},
"host": "api.openai.com", }])
"auth": { pr = _provider_route("api.openai.com", CODEX_HOST_CREDENTIAL_TOKEN_REF)
"scheme": "Bearer", routes = egress_routes_for_bottle(b, (pr,))
"token_ref": CODEX_HOST_CREDENTIAL_TOKEN_REF, self.assertEqual(1, len(routes))
},
}],
)
routes = egress_routes_for_bottle(b)
self.assertEqual(["api.openai.com", "chatgpt.com"], [r.host for r in routes])
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env) self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
self.assertEqual("EGRESS_TOKEN_0", routes[1].token_env)
def test_codex_forward_host_credentials_conflicts_with_authed_route(self): def test_provider_route_upgrades_tls_passthrough_on_existing_same_auth(self):
b = _codex_bottle( b = _bottle([{
forward_host_credentials=True, "host": "api.openai.com",
routes=[{ "auth": {"scheme": "Bearer", "token_ref": CODEX_HOST_CREDENTIAL_TOKEN_REF},
"host": "chatgpt.com", }])
"auth": {"scheme": "Bearer", "token_ref": "OTHER"}, pr = _provider_route(
}], "api.openai.com", CODEX_HOST_CREDENTIAL_TOKEN_REF, tls_passthrough=True,
) )
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertTrue(routes[0].tls_passthrough)
def test_provider_route_conflicts_with_different_authed_manifest_route(self):
b = _bottle([{
"host": "chatgpt.com",
"auth": {"scheme": "Bearer", "token_ref": "OTHER"},
}])
pr = _provider_route("chatgpt.com", CODEX_HOST_CREDENTIAL_TOKEN_REF)
with self.assertRaises(Die): with self.assertRaises(Die):
egress_routes_for_bottle(b) egress_routes_for_bottle(b, (pr,))
def test_provider_route_tls_passthrough_set_on_appended_route(self):
b = _bottle([])
pr = _provider_route("api.openai.com", "TOK", tls_passthrough=True)
routes = egress_routes_for_bottle(b, (pr,))
self.assertTrue(routes[0].tls_passthrough)
def test_provider_route_tls_passthrough_set_on_upgraded_bare_route(self):
b = _bottle([{"host": "api.openai.com"}])
pr = _provider_route("api.openai.com", "TOK", tls_passthrough=True)
routes = egress_routes_for_bottle(b, (pr,))
self.assertTrue(routes[0].tls_passthrough)
class TestTokenEnvMap(unittest.TestCase): class TestTokenEnvMap(unittest.TestCase):
+32 -61
View File
@@ -85,6 +85,31 @@ class TestAgentProviderHostCredentials(unittest.TestCase):
"forward_host_credentials": True, "forward_host_credentials": True,
}) })
def test_auth_token_defaults_empty(self):
b = _provider_config_bottle({"template": "claude"})
self.assertEqual("", b.agent_provider.auth_token)
def test_auth_token_allowed_for_claude(self):
b = _provider_config_bottle({
"template": "claude",
"auth_token": "BOT_BOTTLE_CLAUDE_OAUTH_TOKEN",
})
self.assertEqual("BOT_BOTTLE_CLAUDE_OAUTH_TOKEN", b.agent_provider.auth_token)
def test_auth_token_must_be_string(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "claude",
"auth_token": 42,
})
def test_auth_token_rejected_for_codex(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "codex",
"auth_token": "SOME_TOKEN",
})
class TestPathAllowlist(unittest.TestCase): class TestPathAllowlist(unittest.TestCase):
def test_optional(self): def test_optional(self):
@@ -178,29 +203,12 @@ class TestRole(unittest.TestCase):
b = _bottle([{"host": "x.example"}]) b = _bottle([{"host": "x.example"}])
self.assertEqual((), b.egress.routes[0].Role) self.assertEqual((), b.egress.routes[0].Role)
def test_string_normalizes_to_tuple(self): def test_any_role_rejected(self):
b = _bottle([{ # All former roles removed; the field is reserved for future use.
"host": "api.anthropic.com", for role in ("claude_code_oauth", "codex_auth", "totally-made-up"):
"role": "claude_code_oauth", with self.subTest(role=role):
"auth": {"scheme": "Bearer", "token_ref": "T"}, with self.assertRaises(ManifestError):
}]) _bottle([{"host": "x.example", "role": role}])
self.assertEqual(("claude_code_oauth",),
b.egress.routes[0].Role)
def test_list_supported(self):
b = _bottle([{
"host": "api.anthropic.com",
"role": ["claude_code_oauth"],
"auth": {"scheme": "Bearer", "token_ref": "T"},
}])
self.assertEqual(("claude_code_oauth",),
b.egress.routes[0].Role)
def test_unknown_role_rejected(self):
# The role enum is locked down — typos shouldn't silently
# become no-op markers.
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "role": "totally-made-up"}])
def test_non_string_role_rejected(self): def test_non_string_role_rejected(self):
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
@@ -208,44 +216,7 @@ class TestRole(unittest.TestCase):
def test_list_with_non_string_item_rejected(self): def test_list_with_non_string_item_rejected(self):
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", _bottle([{"host": "x.example", "role": ["x", 42]}])
"role": ["claude_code_oauth", 42]}])
def test_singleton_claude_code_oauth_enforced(self):
# Two routes both claiming the role would make "which one
# drives the placeholder env?" ambiguous.
with self.assertRaises(ManifestError):
_bottle([
{"host": "api.anthropic.com", "role": "claude_code_oauth",
"auth": {"scheme": "Bearer", "token_ref": "T1"}},
{"host": "api2.anthropic.example",
"role": "claude_code_oauth",
"auth": {"scheme": "Bearer", "token_ref": "T2"}},
])
def test_codex_auth_role_allowed_for_codex_provider(self):
b = _provider_bottle("codex", [{
"host": "api.openai.com",
"role": "codex_auth",
"auth": {"scheme": "Bearer", "token_ref": "OPENAI_TOKEN"},
}])
self.assertEqual(("codex_auth",), b.egress.routes[0].Role)
def test_claude_role_rejected_for_codex_provider(self):
with self.assertRaises(ManifestError):
_provider_bottle("codex", [{
"host": "api.anthropic.com",
"role": "claude_code_oauth",
"auth": {"scheme": "Bearer", "token_ref": "T"},
}])
def test_codex_role_rejected_for_default_claude_provider(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "api.openai.com",
"role": "codex_auth",
"auth": {"scheme": "Bearer", "token_ref": "T"},
}])
class TestPipelockPolicy(unittest.TestCase): class TestPipelockPolicy(unittest.TestCase):
+17 -9
View File
@@ -5,6 +5,8 @@ git-gate (PRD 0008)."""
import unittest import unittest
from bot_bottle.agent_provider import CODEX_HOST_CREDENTIAL_HOSTS
from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
from bot_bottle.manifest import Manifest from bot_bottle.manifest import Manifest
from bot_bottle.pipelock import ( from bot_bottle.pipelock import (
pipelock_effective_allowlist, pipelock_effective_allowlist,
@@ -116,17 +118,23 @@ class TestTlsPassthrough(unittest.TestCase):
def test_forward_host_credentials_passes_through_codex_hosts(self): def test_forward_host_credentials_passes_through_codex_hosts(self):
# Egress injects the host bearer on the Codex API hosts; pipelock # Egress injects the host bearer on the Codex API hosts; pipelock
# must pass them through or its header DLP blocks the injected JWT # must pass them through or its header DLP blocks the injected JWT
# ("request header contains secret"). These routes are auto-added # ("request header contains secret"). Provider routes carry
# (not in bottle.egress.routes), so passthrough is host-derived. # tls_passthrough=True; pipelock reads this via egress_routes_for_bottle.
passthrough = pipelock_effective_tls_passthrough(_bottle({ provider_routes = tuple(
"agent_provider": { EgressRoute(
"template": "codex", host=host,
"forward_host_credentials": True, auth_scheme="Bearer",
}, token_ref=CODEX_HOST_CREDENTIAL_TOKEN_REF,
})) tls_passthrough=True,
)
for host in CODEX_HOST_CREDENTIAL_HOSTS
)
passthrough = pipelock_effective_tls_passthrough(
_bottle({}), provider_routes,
)
self.assertEqual(["api.openai.com", "chatgpt.com"], passthrough) self.assertEqual(["api.openai.com", "chatgpt.com"], passthrough)
def test_no_codex_passthrough_without_forward_host_credentials(self): def test_no_codex_passthrough_without_provider_routes(self):
passthrough = pipelock_effective_tls_passthrough(_bottle({ passthrough = pipelock_effective_tls_passthrough(_bottle({
"agent_provider": {"template": "codex"}, "agent_provider": {"template": "codex"},
})) }))
+4 -4
View File
@@ -8,21 +8,21 @@ from bot_bottle.backend.print_util import visible_agent_env_names
class TestVisibleAgentEnvNames(unittest.TestCase): class TestVisibleAgentEnvNames(unittest.TestCase):
def test_codex_shows_openai_api_key_if_user_declares_it(self): def test_shows_all_when_no_hidden_names(self):
self.assertEqual( self.assertEqual(
["CUSTOM", "OPENAI_API_KEY"], ["CUSTOM", "OPENAI_API_KEY"],
visible_agent_env_names( visible_agent_env_names(
["OPENAI_API_KEY", "CUSTOM"], ["OPENAI_API_KEY", "CUSTOM"],
agent_provider_template="codex", hidden_env_names=frozenset(),
), ),
) )
def test_hides_only_active_provider_placeholder(self): def test_hides_provider_placeholder(self):
self.assertEqual( self.assertEqual(
["CUSTOM", "OPENAI_API_KEY"], ["CUSTOM", "OPENAI_API_KEY"],
visible_agent_env_names( visible_agent_env_names(
["CLAUDE_CODE_OAUTH_TOKEN", "OPENAI_API_KEY", "CUSTOM"], ["CLAUDE_CODE_OAUTH_TOKEN", "OPENAI_API_KEY", "CUSTOM"],
agent_provider_template="claude", hidden_env_names=frozenset({"CLAUDE_CODE_OAUTH_TOKEN"}),
), ),
) )