PRD 0029: Codex host credentials through egress #110

Merged
didericis merged 25 commits from codex/prd-codex-host-credentials into main 2026-06-01 23:16:25 -04:00
32 changed files with 2180 additions and 361 deletions
+19 -2
View File
@@ -352,10 +352,15 @@ auth through egress and gitea.dideric.is over SSH.
For a Codex-backed base bottle, set `agent_provider.template: codex`.
The Codex template expects ChatGPT/device login state instead of an
`OPENAI_API_KEY` env var; no API-key placeholder is forwarded into the
agent. To let headless device-code login request a user code, add an
unauthenticated egress route for the device-auth endpoint:
agent. To let bot-bottle read the host's current Codex ChatGPT access
token and inject it from egress only for Codex's API calls, opt in
explicitly:
```yaml
agent_provider:
template: codex
forward_host_credentials: true
egress:
routes:
- host: auth.openai.com
@@ -363,6 +368,18 @@ egress:
- /api/accounts/deviceauth/
```
Run `codex login --device-auth` on the host before launch. The
launcher reads `tokens.access_token` from the host's
`~/.codex/auth.json`, verifies it is fresh user/device auth, and passes
it to the sidecar's `EGRESS_TOKEN_N` env slot. The agent container gets
a dummy `~/.codex/auth.json` that preserves the host auth-mode shape
but replaces credential values with placeholders. It keeps the selected
ChatGPT account id so Codex sends requests for the same account while
egress owns the real bearer token. The agent never receives real access
tokens, refresh tokens, or `OPENAI_API_KEY`. The effective egress table
automatically adds or upgrades `api.openai.com` and `chatgpt.com` to
authenticated routes when `forward_host_credentials` is true.
The built-in Codex template uses `Dockerfile.codex`; set
`agent_provider.dockerfile` to build the agent from a custom Dockerfile
while keeping the bot-bottle sidecars in place.
+163 -7
View File
@@ -7,14 +7,23 @@ command, default image, and prompt/auth behavior.
from __future__ import annotations
from dataclasses import dataclass
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal
from .codex_auth import write_codex_dummy_auth_file
from .egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
PROVIDER_CLAUDE = "claude"
PROVIDER_CODEX = "codex"
PROVIDER_TEMPLATES = frozenset({PROVIDER_CLAUDE, PROVIDER_CODEX})
# Hosts that egress injects the host ChatGPT bearer on when Codex
# forward_host_credentials is enabled. Pipelock must pass these through
# (no TLS MITM) or its header DLP blocks the injected JWT.
CODEX_HOST_CREDENTIAL_HOSTS = ("api.openai.com", "chatgpt.com")
PromptMode = Literal["append_file", "read_prompt_file"]
@@ -24,14 +33,67 @@ class AgentProviderRuntime:
command: str
image: str
dockerfile: str
auth_role: str
placeholder_env: str
prompt_mode: PromptMode
bypass_args: tuple[str, ...]
resume_args: tuple[str, ...]
remote_control_args: tuple[str, ...]
@dataclass(frozen=True)
class AgentProvisionDir:
guest_path: str
mode: str = "700"
owner: str = "node:node"
@dataclass(frozen=True)
class AgentProvisionFile:
host_path: Path
guest_path: str
mode: str = "600"
owner: str = "node:node"
@dataclass(frozen=True)
class AgentProvisionCommand:
argv: tuple[str, ...]
error: str = ""
@dataclass(frozen=True)
class AgentProvisionPlan:
"""Provider-owned guest setup.
Backends interpret this plan with their own copy/exec primitives.
Provider-specific content stays here so future provider plugins can
return the same shape without adding backend-plan fields.
`egress_routes` are provider-declared EgressRoutes that backends
pass to `Egress.prepare` and `PipelockProxy.prepare`. This keeps
provider logic out of the egress and pipelock modules — they merge
provider routes generically without knowing the provider type.
`hidden_env_names` is the set of env var names the provider injected
as non-secret placeholders. `print_util.visible_agent_env_names` uses
this to suppress them from the preflight summary so operators don't
mistake them for real credentials.
"""
template: str
command: str
prompt_mode: PromptMode
image: str
dockerfile: str
guest_env: dict[str, str]
env_vars: dict[str, str] = field(default_factory=dict)
dirs: tuple[AgentProvisionDir, ...] = ()
files: tuple[AgentProvisionFile, ...] = ()
pre_copy: tuple[AgentProvisionCommand, ...] = ()
verify: tuple[AgentProvisionCommand, ...] = ()
egress_routes: tuple[EgressRoute, ...] = ()
hidden_env_names: frozenset[str] = field(default_factory=frozenset)
_REPO_ROOT = Path(__file__).resolve().parent.parent
@@ -41,8 +103,6 @@ _RUNTIMES = {
command="claude",
image="bot-bottle-claude:latest",
dockerfile=str(_REPO_ROOT / "Dockerfile.claude"),
auth_role="claude_code_oauth",
placeholder_env="CLAUDE_CODE_OAUTH_TOKEN",
prompt_mode="append_file",
bypass_args=("--dangerously-skip-permissions",),
resume_args=("--continue",),
@@ -53,8 +113,6 @@ _RUNTIMES = {
command="codex",
image="bot-bottle-codex:latest",
dockerfile=str(_REPO_ROOT / "Dockerfile.codex"),
auth_role="",
placeholder_env="",
prompt_mode="read_prompt_file",
bypass_args=("--dangerously-bypass-approvals-and-sandbox",),
resume_args=("resume", "--last"),
@@ -67,6 +125,104 @@ def runtime_for(template: str) -> AgentProviderRuntime:
return _RUNTIMES[template]
def agent_provision_plan(
*,
template: str,
dockerfile: str,
state_dir: Path,
guest_home: str = "/home/node",
guest_env: dict[str, str] | None = None,
auth_token: str = "",
forward_host_credentials: bool = False,
host_env: dict[str, str] | None = None,
) -> AgentProvisionPlan:
runtime = runtime_for(template)
resolved_guest_env = dict(guest_env or {})
env_vars: dict[str, str] = {}
dirs: list[AgentProvisionDir] = []
files: list[AgentProvisionFile] = []
pre_copy: list[AgentProvisionCommand] = []
verify: list[AgentProvisionCommand] = []
egress_routes: list[EgressRoute] = []
hidden_env_names: frozenset[str] = frozenset()
if template == PROVIDER_CODEX:
env_vars["CODEX_CA_CERTIFICATE"] = "/etc/ssl/certs/ca-certificates.crt"
auth_dir = resolved_guest_env.get("CODEX_HOME", f"{guest_home}/.codex")
if forward_host_credentials:
env_vars["CODEX_HOME"] = auth_dir
dirs.append(AgentProvisionDir(auth_dir))
config_path = f"{auth_dir}/config.toml"
config_file = state_dir / "codex-config.toml"
config_file.write_text(
f'[projects."{guest_home}"]\n'
'trust_level = "trusted"\n'
)
config_file.chmod(0o600)
files.append(AgentProvisionFile(config_file, config_path))
for host in CODEX_HOST_CREDENTIAL_HOSTS:
egress_routes.append(EgressRoute(
host=host,
auth_scheme="Bearer" if forward_host_credentials else "",
token_ref=CODEX_HOST_CREDENTIAL_TOKEN_REF if forward_host_credentials else "",
tls_passthrough=True,
))
if forward_host_credentials:
auth_file = state_dir / "codex-auth.json"
write_codex_dummy_auth_file(auth_file, host_env or dict(os.environ))
files.append(AgentProvisionFile(auth_file, f"{auth_dir}/auth.json"))
pre_copy.append(AgentProvisionCommand((
"find", auth_dir,
"-maxdepth", "1",
"-type", "f",
"(",
"-name", "*.sqlite",
"-o", "-name", "*.sqlite-*",
"-o", "-name", "*.codex-repair-*.bak",
")",
"-delete",
), "codex host credentials: could not reset runtime db files"))
verify.append(AgentProvisionCommand((
"runuser", "-u", "node", "--",
"env",
f"HOME={guest_home}",
f"CODEX_HOME={auth_dir}",
"codex", "login", "status",
), (
"codex host credentials: dummy auth was copied into the "
"guest, but Codex did not accept it"
)))
if template == PROVIDER_CLAUDE:
env_vars["CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC"] = "1"
env_vars["DISABLE_ERROR_REPORTING"] = "1"
egress_routes.append(EgressRoute(
host="api.anthropic.com",
auth_scheme="Bearer" if auth_token else "",
token_ref=auth_token,
tls_passthrough=True,
))
if auth_token:
env_vars["CLAUDE_CODE_OAUTH_TOKEN"] = "egress-placeholder"
hidden_env_names = frozenset({"CLAUDE_CODE_OAUTH_TOKEN"})
return AgentProvisionPlan(
template=template,
command=runtime.command,
prompt_mode=runtime.prompt_mode,
image=runtime.image,
dockerfile=dockerfile,
env_vars=env_vars,
guest_env=resolved_guest_env,
dirs=tuple(dirs),
files=tuple(files),
pre_copy=tuple(pre_copy),
verify=tuple(verify),
egress_routes=tuple(egress_routes),
hidden_env_names=hidden_env_names,
)
def prompt_args(
prompt_mode: PromptMode,
prompt_path: str | None,
+6
View File
@@ -286,6 +286,7 @@ class BottleBackend(ABC, Generic[PlanT, CleanupT]):
intercepted without per-tool reconfiguration."""
self.provision_ca(plan, target)
prompt_path = self.provision_prompt(plan, target)
self.provision_provider_auth(plan, target)
self.provision_skills(plan, target)
self.provision_git(plan, target)
self.provision_supervise(plan, target)
@@ -300,6 +301,11 @@ class BottleBackend(ABC, Generic[PlanT, CleanupT]):
backend overrides to docker-cp the cert in and run
`update-ca-certificates`."""
def provision_provider_auth(self, plan: PlanT, target: str) -> None:
"""Install non-secret provider auth marker files into the agent
home when a provider needs them to select the right auth mode.
The default is no-op."""
@abstractmethod
def provision_prompt(self, plan: PlanT, target: str) -> str | None:
"""Copy the prompt file into the running bottle. Returns the
+4
View File
@@ -29,6 +29,7 @@ from .bottle_plan import DockerBottlePlan
from .provision import ca as _ca
from .provision import git as _git
from .provision import prompt as _prompt
from .provision import provider_auth as _provider_auth
from .provision import skills as _skills
from .provision import supervise as _supervise_prov
@@ -62,6 +63,9 @@ class DockerBottleBackend(BottleBackend["DockerBottlePlan", "DockerBottleCleanup
def provision_prompt(self, plan: DockerBottlePlan, target: str) -> str | None:
return _prompt.provision_prompt(plan, target)
def provision_provider_auth(self, plan: DockerBottlePlan, target: str) -> None:
_provider_auth.provision_provider_auth(plan, target)
def provision_skills(self, plan: DockerBottlePlan, target: str) -> None:
_skills.provision_skills(plan, target)
+20 -6
View File
@@ -11,7 +11,7 @@ import sys
from dataclasses import dataclass, field
from pathlib import Path
from ...agent_provider import PromptMode
from ...agent_provider import AgentProvisionPlan, PromptMode
from ...egress import EgressPlan
from ...git_gate import GitGatePlan
from ...log import info
@@ -52,9 +52,19 @@ class DockerBottlePlan(BottlePlan):
# is opt-in via the manifest's bottle.supervise field.
supervise_plan: SupervisePlan | None
use_runsc: bool
agent_command: str = "claude"
agent_prompt_mode: PromptMode = "append_file"
agent_provider_template: str = "claude"
agent_provision: AgentProvisionPlan
@property
def agent_command(self) -> str:
return self.agent_provision.command
@property
def agent_prompt_mode(self) -> PromptMode:
return self.agent_provision.prompt_mode
@property
def agent_provider_template(self) -> str:
return self.agent_provision.template
def print(self, *, remote_control: bool) -> None:
"""Render the y/N preflight summary to stderr — compact form
@@ -74,8 +84,12 @@ class DockerBottlePlan(BottlePlan):
# upstream tokens in its own environ, so no token forwarding
# from the agent to the proxy is needed.
env_names = visible_agent_env_names(
sorted(set(bottle.env.keys()) | set(self.forwarded_env.keys())),
agent_provider_template=self.agent_provider_template,
sorted(
set(bottle.env.keys())
| set(self.forwarded_env.keys())
| set(self.agent_provision.guest_env.keys())
),
hidden_env_names=self.agent_provision.hidden_env_names,
)
print(file=sys.stderr)
+2
View File
@@ -286,6 +286,8 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
f"SSL_CERT_FILE={AGENT_CA_BUNDLE}",
f"REQUESTS_CA_BUNDLE={AGENT_CA_BUNDLE}",
]
for name, value in sorted(plan.agent_provision.guest_env.items()):
env.append(f"{name}={value}")
# Forwarded vars (OAuth token, manifest host-interpolations):
# bare name → inherits from compose-up process env, value
# never lands on argv or in the compose file.
+12 -1
View File
@@ -42,7 +42,11 @@ from contextlib import ExitStack, contextmanager
from pathlib import Path
from typing import Callable, Generator
from ...egress import egress_resolve_token_values
from ...codex_auth import codex_host_access_token
from ...egress import (
CODEX_HOST_CREDENTIAL_TOKEN_REF,
egress_resolve_token_values,
)
from ...log import info
from . import network as network_mod
from . import util as docker_mod
@@ -181,6 +185,13 @@ def launch(
token_values = egress_resolve_token_values(
plan.egress_plan.token_env_map, dict(os.environ),
)
if plan.spec.manifest.bottle_for(
plan.spec.agent_name,
).agent_provider.forward_host_credentials:
access_token = codex_host_access_token(dict(os.environ))
for token_env, token_ref in plan.egress_plan.token_env_map.items():
if token_ref == CODEX_HOST_CREDENTIAL_TOKEN_REF:
token_values[token_env] = access_token
compose_env: dict[str, str] = {
**os.environ,
**plan.forwarded_env,
+35 -36
View File
@@ -12,9 +12,10 @@ from __future__ import annotations
import os
from datetime import datetime, timezone
from dataclasses import replace
from pathlib import Path
from ...agent_provider import runtime_for
from ...agent_provider import agent_provision_plan, runtime_for
from ...egress import Egress
from ...env import ResolvedEnv, resolve_env
from ...git_gate import GitGate
@@ -158,17 +159,44 @@ def resolve_plan(
prompt_file.write_text("")
prompt_file.chmod(0o600)
pipelock_dir = pipelock_state_dir(slug)
pipelock_dir.mkdir(parents=True, exist_ok=True)
proxy_plan = proxy.prepare(bottle, slug, pipelock_dir)
git_gate_dir = git_gate_state_dir(slug)
git_gate_dir.mkdir(parents=True, exist_ok=True)
git_gate_plan = git_gate.prepare(bottle, slug, git_gate_dir)
resolved = resolve_env(manifest, spec.agent_name)
# Everything that should reach the bottle by-name (so its value
# never lands on argv or in env_file) goes into one dict. Nothing
# mutates the host os.environ.
forwarded_env: dict[str, str] = dict(resolved.forwarded)
_write_env_file(resolved, env_file)
prompt_file.write_text(agent.prompt)
use_runsc = docker_mod.runsc_available()
agent_provision = agent_provision_plan(
template=provider.template,
dockerfile=dockerfile_path,
state_dir=agent_dir,
guest_home=os.environ.get("BOT_BOTTLE_CONTAINER_HOME", "/home/node"),
forward_host_credentials=provider.forward_host_credentials,
auth_token=provider.auth_token,
host_env=dict(os.environ),
)
guest_env = dict(agent_provision.guest_env)
for key, val in agent_provision.env_vars.items():
guest_env.setdefault(key, val)
agent_provision = replace(agent_provision, guest_env=guest_env)
pipelock_dir = pipelock_state_dir(slug)
pipelock_dir.mkdir(parents=True, exist_ok=True)
proxy_plan = proxy.prepare(
bottle, slug, pipelock_dir, agent_provision.egress_routes,
)
egress_dir = egress_state_dir(slug)
egress_dir.mkdir(parents=True, exist_ok=True)
egress_plan = egress.prepare(bottle, slug, egress_dir)
egress_plan = egress.prepare(
bottle, slug, egress_dir, agent_provision.egress_routes,
)
supervise_plan = None
if bottle.supervise:
@@ -196,33 +224,6 @@ def resolve_plan(
slug, supervise_dir,
dockerfile_content=dockerfile_content,
)
resolved = resolve_env(manifest, spec.agent_name)
# Everything that should reach the bottle by-name (so its value
# never lands on argv or in env_file) goes into one dict. Nothing
# mutates the host os.environ.
forwarded_env: dict[str, str] = dict(resolved.forwarded)
# Some provider CLIs refuse to start without *some* credential
# env var even when egress will strip + re-inject the real
# Authorization header. For those providers, auth_role names the
# route marker that enables a non-secret placeholder env. Codex is
# intentionally absent here: it should use its device/ChatGPT login
# state, and an OPENAI_API_KEY placeholder would force API-key auth.
has_provider_auth = any(
provider_runtime.auth_role
and provider_runtime.auth_role in r.roles
for r in egress_plan.routes
)
if has_provider_auth and provider_runtime.placeholder_env:
forwarded_env[provider_runtime.placeholder_env] = "egress-placeholder"
if provider.template == "claude" and has_provider_auth:
# Belt-and-braces: turn off telemetry endpoints (statsig,
# error reporting) that egress can't gate by auth.
forwarded_env.setdefault("CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC", "1")
forwarded_env.setdefault("DISABLE_ERROR_REPORTING", "1")
_write_env_file(resolved, env_file)
prompt_file.write_text(agent.prompt)
use_runsc = docker_mod.runsc_available()
return DockerBottlePlan(
spec=spec,
@@ -242,9 +243,7 @@ def resolve_plan(
egress_plan=egress_plan,
supervise_plan=supervise_plan,
use_runsc=use_runsc,
agent_command=provider_runtime.command,
agent_prompt_mode=provider_runtime.prompt_mode,
agent_provider_template=provider.template,
agent_provision=agent_provision,
)
@@ -0,0 +1,36 @@
"""Provision non-secret provider auth markers into a Docker bottle."""
from __future__ import annotations
import subprocess
from ..bottle_plan import DockerBottlePlan
def provision_provider_auth(plan: DockerBottlePlan, target: str) -> None:
"""Apply provider-owned guest setup through Docker primitives."""
provision = plan.agent_provision
for d in provision.dirs:
_exec(target, ["mkdir", "-p", d.guest_path])
_exec(target, ["chown", d.owner, d.guest_path])
_exec(target, ["chmod", d.mode, d.guest_path])
for command in provision.pre_copy:
_exec(target, list(command.argv))
for f in provision.files:
subprocess.run(
["docker", "cp", str(f.host_path), f"{target}:{f.guest_path}"],
stdout=subprocess.DEVNULL,
check=True,
)
_exec(target, ["chown", f.owner, f.guest_path])
_exec(target, ["chmod", f.mode, f.guest_path])
for command in provision.verify:
_exec(target, list(command.argv))
def _exec(target: str, argv: list[str]) -> None:
subprocess.run(
["docker", "exec", "-u", "0", target, *argv],
stdout=subprocess.DEVNULL,
check=True,
)
+6 -10
View File
@@ -9,7 +9,6 @@ from __future__ import annotations
from typing import Sequence
from ..agent_provider import runtime_for
from ..log import info
@@ -30,16 +29,13 @@ def print_multi(label: str, values: Sequence[str]) -> None:
def visible_agent_env_names(
env_names: Sequence[str], *, agent_provider_template: str,
env_names: Sequence[str], *, hidden_env_names: frozenset[str],
) -> list[str]:
"""Env names worth showing in launch summaries.
Provider auth placeholders (currently `CLAUDE_CODE_OAUTH_TOKEN`)
are implementation details: they are non-secret dummy values that
satisfy the provider CLI while egress injects the real upstream
Authorization header. Showing them in preflight makes the operator
think a real key is entering the agent, so hide only the active
provider-owned placeholder.
Provider-injected placeholder env vars are implementation details:
they are non-secret dummy values that satisfy provider CLIs while
egress injects the real Authorization header. The plan's
`hidden_env_names` carries exactly which names to suppress.
"""
hidden = {runtime_for(agent_provider_template).placeholder_env}
return sorted({name for name in env_names if name and name not in hidden})
return sorted({name for name in env_names if name and name not in hidden_env_names})
@@ -19,6 +19,7 @@ from .bottle_plan import SmolmachinesBottlePlan
from .provision import ca as _ca
from .provision import git as _git
from .provision import prompt as _prompt
from .provision import provider_auth as _provider_auth
from .provision import skills as _skills
from .provision import supervise as _supervise
@@ -61,6 +62,11 @@ class SmolmachinesBottleBackend(
) -> str | None:
return _prompt.provision_prompt(plan, target)
def provision_provider_auth(
self, plan: SmolmachinesBottlePlan, target: str
) -> None:
_provider_auth.provision_provider_auth(plan, target)
def provision_skills(
self, plan: SmolmachinesBottlePlan, target: str
) -> None:
+15 -24
View File
@@ -45,19 +45,11 @@ _HOME_FOR = {
}
def _env_flags_for(user: str) -> list[str]:
def _env_assignments_for(user: str, env: Mapping[str, str]) -> list[str]:
home = _HOME_FOR.get(user, f"/home/{user}")
return ["-e", f"HOME={home}", "-e", f"USER={user}"]
def _guest_env_flags(env: Mapping[str, str]) -> list[str]:
"""Render `{K: V}` into a flat `-e K=V` argv slice for
`smolvm machine exec`. `smolvm machine create -e` set env
on PID 1 but it doesn't propagate to fresh exec process
trees, so we have to re-pass them every call."""
out: list[str] = []
out = [f"HOME={home}", f"USER={user}"]
for k, v in env.items():
out += ["-e", f"{k}={v}"]
out.append(f"{k}={v}")
return out
@@ -98,9 +90,8 @@ class SmolmachinesBottle(Bottle):
flags = ["smolvm", "machine", "exec", "--name", self.name]
if tty:
flags += ["-i", "-t"]
flags += _env_flags_for("node")
flags += _guest_env_flags(self._guest_env)
agent_tail = [self.agent_command]
agent_tail = ["env", *_env_assignments_for("node", self._guest_env),
self.agent_command]
provider_prompt_args = prompt_args(
self._agent_prompt_mode, self._prompt_path, argv=argv,
)
@@ -148,16 +139,16 @@ class SmolmachinesBottle(Bottle):
on both backends. Pass `user="root"` for tests that need
root.
`runuser -u <user> -- /bin/sh -c <script>` switches UID
without invoking a login shell; HOME / USER are set via
`smolvm -e` (see `_env_flags_for`)."""
argv = (
_env_flags_for(user)
+ _guest_env_flags(self._guest_env)
+ ["--", "runuser", "-u", user, "--", "/bin/sh", "-c", script]
)
# _smolvm.machine_exec expects argv (the bit after `--`);
# the -e flags go before, so call smolvm directly.
`runuser -u <user> -- env ... /bin/sh -c <script>` switches UID
without invoking a login shell, then sets HOME / USER and the
bottle env in the child process."""
argv = [
"--", "runuser", "-u", user, "--",
"env", *_env_assignments_for(user, self._guest_env),
"/bin/sh", "-c", script,
]
# Call smolvm directly because this path needs the host-side
# subprocess capture shape used by the Docker backend.
r = subprocess.run(
["smolvm", "machine", "exec", "--name", self.name] + argv,
capture_output=True, text=True, check=False,
+23 -7
View File
@@ -12,7 +12,7 @@ import sys
from dataclasses import dataclass
from pathlib import Path
from ...agent_provider import PromptMode
from ...agent_provider import AgentProvisionPlan, PromptMode
from ...egress import EgressPlan
from ...git_gate import GitGatePlan
from ...log import info
@@ -82,6 +82,7 @@ class SmolmachinesBottlePlan(BottlePlan):
# None when bottle.supervise is False, matching the docker
# backend's convention.
supervise_plan: SupervisePlan | None
agent_provision: AgentProvisionPlan
# Agent-side endpoints. On Docker Desktop the docker bridge
# IPs aren't reachable from the smolvm guest (TSI uses macOS
# networking; docker container IPs live in the daemon's VM),
@@ -93,10 +94,22 @@ class SmolmachinesBottlePlan(BottlePlan):
agent_proxy_url: str = ""
agent_git_gate_host: str = ""
agent_supervise_url: str = ""
agent_command: str = "claude"
agent_prompt_mode: PromptMode = "append_file"
agent_provider_template: str = "claude"
agent_dockerfile_path: str = ""
@property
def agent_command(self) -> str:
return self.agent_provision.command
didericis marked this conversation as resolved Outdated
Outdated
Review

This should be grouped with the agent_provider template in some way/probably scoped under the agent provider, and only respected when the template is set to codex

This should be grouped with the agent_provider template in some way/probably scoped under the agent provider, and only respected when the template is set to `codex`
@property
def agent_prompt_mode(self) -> PromptMode:
return self.agent_provision.prompt_mode
@property
def agent_provider_template(self) -> str:
return self.agent_provision.template
@property
def agent_dockerfile_path(self) -> str:
return self.agent_provision.dockerfile
def print(self, *, remote_control: bool) -> None:
"""Compact y/N preflight. Same shape as the Docker
@@ -108,8 +121,11 @@ class SmolmachinesBottlePlan(BottlePlan):
bottle = manifest.bottle_for(spec.agent_name)
env_names = visible_agent_env_names(
sorted(bottle.env.keys()),
agent_provider_template=self.agent_provider_template,
sorted(
set(bottle.env.keys())
| set(self.agent_provision.guest_env.keys())
),
hidden_env_names=self.agent_provision.hidden_env_names,
)
upstreams = [
f"{g.Name}{g.Upstream}" for g in bottle.git
+16 -2
View File
@@ -26,7 +26,12 @@ from contextlib import ExitStack, contextmanager
from pathlib import Path
from typing import Callable, Generator
from ...egress import EGRESS_ROUTES_IN_CONTAINER, egress_resolve_token_values
from ...codex_auth import codex_host_access_token
from ...egress import (
CODEX_HOST_CREDENTIAL_TOKEN_REF,
EGRESS_ROUTES_IN_CONTAINER,
egress_resolve_token_values,
)
from ...pipelock import (
PIPELOCK_CA_CERT_IN_CONTAINER,
PIPELOCK_CA_KEY_IN_CONTAINER,
@@ -423,7 +428,16 @@ def _resolve_token_env(
ep = plan.egress_plan
if not ep.routes:
return {}
return egress_resolve_token_values(ep.token_env_map, dict(host_env))
env = dict(host_env)
token_values = egress_resolve_token_values(ep.token_env_map, env)
if plan.spec.manifest.bottle_for(
plan.spec.agent_name,
).agent_provider.forward_host_credentials:
access_token = codex_host_access_token(env)
for token_env, token_ref in ep.token_env_map.items():
if token_ref == CODEX_HOST_CREDENTIAL_TOKEN_REF:
token_values[token_env] = access_token
return token_values
def _ensure_smolmachine(image_ref: str, *, dockerfile: str = "") -> Path:
+42 -43
View File
@@ -12,9 +12,10 @@ from __future__ import annotations
import os
from datetime import datetime, timezone
from dataclasses import replace
from pathlib import Path
from ...agent_provider import runtime_for
from ...agent_provider import agent_provision_plan, runtime_for
from ...backend import BottleSpec
from ...backend.docker.bottle_state import (
BottleMetadata,
@@ -94,47 +95,10 @@ def resolve_plan(
"REQUESTS_CA_BUNDLE": "/etc/ssl/certs/ca-certificates.crt",
}
# Inner Plans for the four bundle daemons. The ABCs are
# platform-neutral — `.prepare()` writes config files + returns
# a Plan dataclass with no backend-specific assumptions. State
# dirs are still keyed by slug under the docker backend's
# bottle_state layout (shared on-host convention; not a docker
# dependency).
pipelock_dir = pipelock_state_dir(slug)
pipelock_dir.mkdir(parents=True, exist_ok=True)
proxy_plan = PipelockProxy().prepare(bottle, slug, pipelock_dir)
git_gate_dir = git_gate_state_dir(slug)
git_gate_dir.mkdir(parents=True, exist_ok=True)
git_gate_plan = GitGate().prepare(bottle, slug, git_gate_dir)
egress_dir = egress_state_dir(slug)
egress_dir.mkdir(parents=True, exist_ok=True)
egress_plan = Egress().prepare(bottle, slug, egress_dir)
# Some provider CLIs refuse to start without *some* credential
# env var even when egress will strip + re-inject the real
# Authorization header. For those providers, auth_role names the
# route marker that enables a non-secret placeholder env. Codex is
# intentionally absent here: it should use its device/ChatGPT login
# state, and an OPENAI_API_KEY placeholder would force API-key auth.
has_provider_auth = any(
provider_runtime.auth_role
and provider_runtime.auth_role in r.roles
for r in egress_plan.routes
)
if has_provider_auth and provider_runtime.placeholder_env:
guest_env[provider_runtime.placeholder_env] = "egress-placeholder"
if provider.template == "claude" and has_provider_auth:
guest_env.setdefault("CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC", "1")
guest_env.setdefault("DISABLE_ERROR_REPORTING", "1")
supervise_plan = None
if bottle.supervise:
supervise_dir = supervise_state_dir(slug)
supervise_dir.mkdir(parents=True, exist_ok=True)
supervise_plan = Supervise().prepare(slug, supervise_dir)
# Prompt file is always written (mode 0o600) so the in-VM
# path always exists. Content is the agent's `prompt`
# field (markdown body) — empty for agents with no prompt.
@@ -162,6 +126,44 @@ def resolve_plan(
else:
image_default = provider_runtime.image
agent_image_ref = os.environ.get("BOT_BOTTLE_IMAGE", image_default)
agent_provision = agent_provision_plan(
didericis marked this conversation as resolved Outdated
Outdated
Review

this should also be in the agent provisioner now, assuming we can evaluate has_provider_auth at that stage. If not we'll need a more generic hook to call into here/there should not be any logic specific to an specific type of agent in here anymore.

this should also be in the agent provisioner now, assuming we can evaluate `has_provider_auth` at that stage. If not we'll need a more generic hook to call into here/there should not be any logic specific to an specific type of agent in here anymore.
template=provider.template,
dockerfile=agent_dockerfile_path,
state_dir=agent_dir,
guest_home=os.environ.get("BOT_BOTTLE_GUEST_HOME", "/home/node"),
guest_env=guest_env,
forward_host_credentials=provider.forward_host_credentials,
auth_token=provider.auth_token,
host_env=dict(os.environ),
)
merged_guest_env = dict(agent_provision.guest_env)
for key, val in agent_provision.env_vars.items():
Outdated
Review

env provisioning should be a part of the agent provider plan/we shouldn't need to know anything about codex here.

env provisioning should be a part of the agent provider plan/we shouldn't need to know anything about codex here.
merged_guest_env.setdefault(key, val)
agent_provision = replace(agent_provision, guest_env=merged_guest_env)
# Inner Plans for the four bundle daemons. The ABCs are
# platform-neutral — `.prepare()` writes config files + returns
# a Plan dataclass with no backend-specific assumptions. State
# dirs are still keyed by slug under the docker backend's
# bottle_state layout (shared on-host convention; not a docker
# dependency).
pipelock_dir = pipelock_state_dir(slug)
pipelock_dir.mkdir(parents=True, exist_ok=True)
proxy_plan = PipelockProxy().prepare(
bottle, slug, pipelock_dir, agent_provision.egress_routes,
)
egress_dir = egress_state_dir(slug)
egress_dir.mkdir(parents=True, exist_ok=True)
egress_plan = Egress().prepare(
bottle, slug, egress_dir, agent_provision.egress_routes,
)
supervise_plan = None
if bottle.supervise:
supervise_dir = supervise_state_dir(slug)
supervise_dir.mkdir(parents=True, exist_ok=True)
supervise_plan = Supervise().prepare(slug, supervise_dir)
return SmolmachinesBottlePlan(
spec=spec,
@@ -172,16 +174,13 @@ def resolve_plan(
bundle_ip=bundle_ip,
machine_name=machine_name,
agent_image_ref=agent_image_ref,
guest_env=guest_env,
guest_env=agent_provision.guest_env,
prompt_file=prompt_file,
proxy_plan=proxy_plan,
git_gate_plan=git_gate_plan,
egress_plan=egress_plan,
supervise_plan=supervise_plan,
agent_command=provider_runtime.command,
agent_prompt_mode=provider_runtime.prompt_mode,
agent_provider_template=provider.template,
agent_dockerfile_path=agent_dockerfile_path,
agent_provision=agent_provision,
)
@@ -0,0 +1,33 @@
"""Provision non-secret provider auth markers into a smolmachines bottle."""
from __future__ import annotations
from ....log import die
from .. import smolvm as _smolvm
from ..bottle_plan import SmolmachinesBottlePlan
def provision_provider_auth(plan: SmolmachinesBottlePlan, target: str) -> None:
"""Apply provider-owned guest setup through smolvm primitives."""
provision = plan.agent_provision
for d in provision.dirs:
_exec(target, ["mkdir", "-p", d.guest_path], f"could not create {d.guest_path}")
_exec(target, ["chown", d.owner, d.guest_path], f"could not chown {d.guest_path}")
_exec(target, ["chmod", d.mode, d.guest_path], f"could not chmod {d.guest_path}")
for command in provision.pre_copy:
_exec(target, list(command.argv), command.error)
for f in provision.files:
_smolvm.machine_cp(str(f.host_path), f"{target}:{f.guest_path}")
_exec(target, ["chown", f.owner, f.guest_path], f"could not chown {f.guest_path}")
_exec(target, ["chmod", f.mode, f.guest_path], f"could not chmod {f.guest_path}")
for command in provision.verify:
_exec(target, list(command.argv), command.error)
def _exec(target: str, argv: list[str], error: str) -> None:
result = _smolvm.machine_exec(target, argv)
if result.returncode != 0:
detail = (result.stderr or result.stdout).strip()
if detail:
detail = f": {detail}"
die(f"agent provider provisioning: {error}{detail}")
+290
View File
@@ -0,0 +1,290 @@
"""Host Codex auth helpers.
Reads the host's Codex ChatGPT/device-login auth state and returns only
the short-lived access token needed by egress. This module deliberately
does not expose refresh tokens or raw auth payloads.
"""
from __future__ import annotations
import base64
import json
import os
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from .log import die
from .util import expand_tilde
def codex_auth_path(host_env: dict[str, str] | None = None) -> Path:
env = os.environ if host_env is None else host_env
home = env.get("CODEX_HOME")
if home:
return Path(expand_tilde(home)) / "auth.json"
return Path.home() / ".codex" / "auth.json"
def codex_host_access_token(
host_env: dict[str, str] | None = None,
*,
now: datetime | None = None,
) -> str:
path = codex_auth_path(host_env)
if not path.is_file():
die(
f"codex host credentials: auth file missing at {path}. "
"Run `codex login --device-auth` on the host or disable "
"agent_provider.forward_host_credentials."
)
raw = _read_auth_object(path)
auth_mode = raw.get("auth_mode")
if not isinstance(auth_mode, str) or auth_mode == "api_key":
die(
"codex host credentials: host Codex auth is not user/device "
"auth. Run `codex login --device-auth` on the host."
)
tokens = raw.get("tokens")
if not isinstance(tokens, dict):
die(f"codex host credentials: {path} is missing tokens")
access = tokens.get("access_token")
if not isinstance(access, str) or not access:
die(
f"codex host credentials: {path} is missing tokens.access_token. "
"Run `codex login --device-auth` on the host."
)
exp = _jwt_exp(access)
if exp is None:
die("codex host credentials: tokens.access_token is not a JWT with exp")
check_now = now or datetime.now(timezone.utc)
if exp <= check_now:
die(
"codex host credentials: host Codex access token is expired. "
"Run `codex login --device-auth` on the host and restart the bottle."
)
return access
def codex_dummy_auth_json(
host_env: dict[str, str] | None = None,
*,
now: datetime | None = None,
) -> str:
"""Return a non-secret `auth.json` that keeps Codex in the host's
auth branch while egress owns the real bearer token.
The dummy access/id tokens carry the *host* token's real `exp` so
Codex's proactive refresh lifecycle (it refreshes when its local
access token is at/past expiry) tracks the real token instead of
firing after an artificial TTL. Codex cannot refresh inside the
bottle the refresh token is a placeholder and the OpenAI token
endpoint is off-route so a shorter dummy exp would drop Codex to
the sign-in screen the moment it lapsed, even while egress still
holds a valid bearer."""
path = codex_auth_path(host_env)
access = codex_host_access_token(host_env, now=now)
raw = _read_auth_object(path)
host_exp = _jwt_exp(access)
exp_ts = int(host_exp.timestamp()) if host_exp is not None else None
dummy = _redact_codex_auth(deepcopy(raw), now=now, exp_ts=exp_ts)
return json.dumps(dummy, indent=2, sort_keys=True) + "\n"
def write_codex_dummy_auth_file(
path: Path,
host_env: dict[str, str] | None = None,
*,
now: datetime | None = None,
) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(codex_dummy_auth_json(host_env, now=now))
path.chmod(0o600)
def _read_auth_object(path: Path) -> dict:
try:
raw = json.loads(path.read_text())
except (OSError, json.JSONDecodeError) as e:
die(f"codex host credentials: could not read valid JSON at {path}: {e}")
if not isinstance(raw, dict):
die(f"codex host credentials: {path} must contain a JSON object")
return raw
def _dummy_exp(now: datetime | None, exp_ts: int | None) -> int:
if exp_ts is not None:
return exp_ts
check_now = now or datetime.now(timezone.utc)
return int(check_now.timestamp()) + 3600
def _dummy_jwt(now: datetime | None = None, *, exp_ts: int | None = None) -> str:
return _encode_dummy_jwt({
"exp": _dummy_exp(now, exp_ts),
"sub": "bot-bottle-placeholder",
})
def _dummy_jwt_from_host(
value: object, *, now: datetime | None = None, exp_ts: int | None = None,
) -> str:
if not isinstance(value, str):
return _dummy_jwt(now, exp_ts=exp_ts)
parts = value.split(".")
if len(parts) < 2:
return _dummy_jwt(now, exp_ts=exp_ts)
try:
payload = json.loads(_b64url_decode(parts[1]))
except (ValueError, json.JSONDecodeError):
return _dummy_jwt(now, exp_ts=exp_ts)
if not isinstance(payload, dict):
return _dummy_jwt(now, exp_ts=exp_ts)
return _encode_dummy_jwt(_redact_jwt_payload(payload, now=now, exp_ts=exp_ts))
def _encode_dummy_jwt(payload: dict) -> str:
def enc(obj: dict) -> str:
raw = json.dumps(obj, separators=(",", ":")).encode()
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
return f"{enc({'alg': 'none', 'typ': 'JWT'})}.{enc(payload)}.placeholder"
def _redact_jwt_payload(
payload: dict,
*,
now: datetime | None = None,
exp_ts: int | None = None,
) -> dict:
out = _redact_claims(payload)
if not isinstance(out, dict):
out = {}
out["exp"] = _dummy_exp(now, exp_ts)
out.setdefault("sub", "bot-bottle-placeholder")
return out
def _redact_claims(value: object) -> object:
if isinstance(value, dict):
out: dict[str, object] = {}
for key, inner in value.items():
lower = key.lower()
if key == "https://api.openai.com/profile":
out[key] = _redact_profile_claim(inner)
elif key == "https://api.openai.com/auth":
out[key] = _redact_auth_claim(inner)
elif lower == "email":
out[key] = "bot-bottle@example.invalid"
elif lower == "email_verified":
out[key] = True
elif lower in {"exp", "iat", "nbf", "auth_time", "pwd_auth_time"}:
out[key] = inner if isinstance(inner, (int, float)) else 0
elif lower in {"aud", "scp", "amr"}:
out[key] = inner if isinstance(inner, list) else []
elif isinstance(inner, bool):
out[key] = inner
elif isinstance(inner, (dict, list)):
out[key] = _redact_claims(inner)
else:
out[key] = "bot-bottle-placeholder"
return out
if isinstance(value, list):
return []
return "bot-bottle-placeholder"
def _redact_profile_claim(value: object) -> dict:
profile = value if isinstance(value, dict) else {}
return {
"email": "bot-bottle@example.invalid",
"email_verified": bool(profile.get("email_verified", True)),
}
def _redact_auth_claim(value: object) -> dict:
auth = value if isinstance(value, dict) else {}
out: dict[str, object] = {}
for key, inner in auth.items():
lower = key.lower()
if lower == "chatgpt_plan_type" and isinstance(inner, str) and inner:
out[key] = inner
elif lower == "chatgpt_account_id" and isinstance(inner, str) and inner:
# Current Codex uses the selected account id when building
# ChatGPT requests. Keep that non-secret identifier aligned
# with the host while egress owns the real bearer token.
out[key] = inner
elif lower == "localhost" and isinstance(inner, bool):
out[key] = inner
elif isinstance(inner, bool):
out[key] = inner
elif isinstance(inner, list):
out[key] = []
elif isinstance(inner, dict):
out[key] = {}
else:
out[key] = "bot-bottle-placeholder"
out.setdefault("chatgpt_plan_type", "unknown")
out.setdefault("user_id", "bot-bottle-placeholder")
out.setdefault("chatgpt_user_id", "bot-bottle-placeholder")
out.setdefault("chatgpt_account_id", "bot-bottle-placeholder")
return out
def _redact_codex_auth(
value: object, *, now: datetime | None = None, exp_ts: int | None = None,
) -> object:
if isinstance(value, dict):
out: dict[str, object] = {}
for key, inner in value.items():
lower = key.lower()
if lower == "openai_api_key":
out[key] = None
elif lower == "tokens":
out[key] = _redact_codex_auth(inner, now=now, exp_ts=exp_ts)
elif lower in {"access_token", "id_token"}:
out[key] = _dummy_jwt_from_host(inner, now=now, exp_ts=exp_ts)
elif "token" in lower or "secret" in lower or lower.endswith("_key"):
out[key] = "bot-bottle-placeholder"
elif lower == "account_id" and isinstance(inner, str) and inner:
out[key] = inner
elif lower in {"account_id", "user_id", "email"}:
out[key] = "bot-bottle-placeholder"
else:
out[key] = _redact_codex_auth(inner, now=now, exp_ts=exp_ts)
return out
if isinstance(value, list):
return [_redact_codex_auth(v, now=now, exp_ts=exp_ts) for v in value]
return value
def _jwt_exp(token: str) -> datetime | None:
parts = token.split(".")
if len(parts) < 2:
return None
try:
payload = json.loads(_b64url_decode(parts[1]))
except (ValueError, json.JSONDecodeError):
return None
if not isinstance(payload, dict):
return None
exp = payload.get("exp")
if not isinstance(exp, (int, float)):
return None
return datetime.fromtimestamp(exp, timezone.utc)
def _b64url_decode(value: str) -> str:
padded = value + ("=" * (-len(value) % 4))
return base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")
__all__ = [
"codex_auth_path",
"codex_dummy_auth_json",
"codex_host_access_token",
"write_codex_dummy_auth_file",
]
+116 -18
View File
@@ -27,9 +27,14 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
from .log import die
from .manifest import Bottle
if TYPE_CHECKING:
from .manifest import Bottle
CODEX_HOST_CREDENTIAL_TOKEN_REF = "BOT_BOTTLE_CODEX_HOST_ACCESS_TOKEN"
# DNS name agents will dial for the per-bottle egress sidecar.
@@ -64,9 +69,14 @@ class EgressRoute:
under `token_env`. Routes that share a `token_ref` coalesce to
one `token_env` slot.
`roles` carries the manifest route's optional role markers (see
`manifest.EGRESS_ROLES`). The launch step reads these for
side effects like the claude-code OAuth placeholder env."""
`roles` carries the manifest route's role tuple (reserved for
future use; always empty today).
`tls_passthrough` signals that pipelock must not TLS-MITM this
host either because the manifest declared `pipelock.tls_passthrough:
true` (lifted in `egress_manifest_routes`) or because a provider
route set it (e.g. egress injects its own Bearer on that host
after the agent boundary and pipelock's header DLP would block it)."""
host: str
path_allowlist: tuple[str, ...] = ()
@@ -74,6 +84,7 @@ class EgressRoute:
token_env: str = ""
token_ref: str = ""
roles: tuple[str, ...] = ()
tls_passthrough: bool = False
@dataclass(frozen=True)
@@ -158,27 +169,106 @@ def egress_manifest_routes(
token_env=token_env,
token_ref=r.TokenRef,
roles=r.Role,
tls_passthrough=r.Pipelock.TlsPassthrough,
))
else:
out.append(EgressRoute(
host=r.Host,
path_allowlist=r.PathAllowlist,
roles=r.Role,
tls_passthrough=r.Pipelock.TlsPassthrough,
))
return tuple(out)
def egress_routes_for_bottle(
bottle: Bottle,
provider_routes: tuple[EgressRoute, ...] = (),
) -> tuple[EgressRoute, ...]:
"""Effective egress routes. This is what gets rendered into
routes.yaml + what the addon enforces.
"""Effective egress routes for the agent. This is what gets rendered
into routes.yaml and what the addon enforces.
Operators that want to allow a host declare it directly in
`bottle.egress.routes` as an authenticated route or bare-pass entry
(`- host: <name>`). The legacy `bottle.egress.allowlist`
folding is gone egress is the single allowlist surface."""
return egress_manifest_routes(bottle)
Merges manifest-declared routes with provider-owned routes. The
manifest is the primary surface; `provider_routes` are synthesised
by `agent_provision_plan` and may add or upgrade manifest entries.
Provider routes that conflict with an existing authenticated manifest
route (different auth scheme or token ref) raise a hard error."""
routes = list(egress_manifest_routes(bottle))
for pr in provider_routes:
routes = _merge_provider_route(routes, pr)
return tuple(routes)
def _find_or_alloc_token_env(routes: list[EgressRoute], token_ref: str) -> str:
"""Return the existing token_env slot for `token_ref`, or allocate the next one."""
if not token_ref:
return ""
for route in routes:
if route.token_ref == token_ref and route.token_env:
return route.token_env
return f"EGRESS_TOKEN_{len({r.token_env for r in routes if r.token_env})}"
def _merge_provider_route(
routes: list[EgressRoute], pr: EgressRoute,
) -> list[EgressRoute]:
"""Merge one provider-declared route into the manifest route list.
Upgrade a bare-pass manifest route to authenticated if the provider
declares auth for that host, or append if the host isn't in the manifest.
Identical auth (same scheme + token_ref) on an existing route is a
no-op, with a tls_passthrough upgrade if the provider route sets it.
Conflicting auth (different scheme or token_ref) dies."""
for idx, route in enumerate(routes):
if route.host.lower() != pr.host.lower():
continue
if route.auth_scheme or route.token_ref:
if route.auth_scheme == pr.auth_scheme and route.token_ref == pr.token_ref:
if pr.tls_passthrough and not route.tls_passthrough:
routes[idx] = EgressRoute(
host=route.host,
path_allowlist=route.path_allowlist,
auth_scheme=route.auth_scheme,
token_env=route.token_env,
token_ref=route.token_ref,
roles=route.roles,
tls_passthrough=True,
)
return routes
die(
f"provider egress route for {pr.host!r} conflicts with an "
f"authenticated manifest route (different auth scheme or token "
f"ref). Remove the manifest route's auth block or disable the "
f"feature that adds this provider route."
)
token_env = (
_find_or_alloc_token_env(routes, pr.token_ref)
if pr.auth_scheme and pr.token_ref
else ""
)
routes[idx] = EgressRoute(
host=route.host,
path_allowlist=route.path_allowlist,
auth_scheme=pr.auth_scheme,
token_env=token_env,
token_ref=pr.token_ref,
roles=route.roles,
tls_passthrough=pr.tls_passthrough,
)
return routes
token_env = (
_find_or_alloc_token_env(routes, pr.token_ref)
if pr.auth_scheme and pr.token_ref
else ""
)
routes.append(EgressRoute(
host=pr.host,
auth_scheme=pr.auth_scheme,
token_env=token_env,
token_ref=pr.token_ref,
tls_passthrough=pr.tls_passthrough,
))
return routes
def egress_token_env_map(
@@ -193,7 +283,7 @@ def egress_token_env_map(
silently picking one."""
out: dict[str, str] = {}
for r in routes:
if not r.token_env:
if not (r.auth_scheme and r.token_ref and r.token_env):
continue
existing = out.get(r.token_env)
if existing is not None and existing != r.token_ref:
@@ -251,6 +341,8 @@ def egress_resolve_token_values(
a sealed mapping without touching `os.environ`."""
out: dict[str, str] = {}
for token_env, token_ref in token_env_map.items():
if token_ref == CODEX_HOST_CREDENTIAL_TOKEN_REF:
continue
value = host_env.get(token_ref)
if value is None:
die(
@@ -274,18 +366,23 @@ class Egress(ABC):
sidecar's start/stop lifecycle is backend-specific and lives on
concrete subclasses."""
def prepare(self, bottle: Bottle, slug: str, stage_dir: Path) -> EgressPlan:
"""Lift `bottle.egress.routes` into resolved routes,
render the routes file (mode 600) under `stage_dir`, and
def prepare(
self,
bottle: Bottle,
slug: str,
stage_dir: Path,
provider_routes: tuple[EgressRoute, ...] = (),
) -> EgressPlan:
"""Lift `bottle.egress.routes` + `provider_routes` into resolved
routes, render the routes file (mode 600) under `stage_dir`, and
return the plan. Pure host-side, no docker subprocess. The
token-env map records the mapping the launch step uses to
forward values from the host's environ into the sidecar's
environ.
forward values from the host's environ into the sidecar's environ.
Returned plan is incomplete: the launch step must fill
`internal_network` / `egress_network` / `pipelock_proxy_url`
via `dataclasses.replace` before passing it to `.start`."""
routes = egress_routes_for_bottle(bottle)
routes = egress_routes_for_bottle(bottle, provider_routes)
routes_path = stage_dir / "egress_routes.yaml"
routes_path.write_text(egress_render_routes(routes))
routes_path.chmod(0o600)
@@ -297,6 +394,7 @@ class Egress(ABC):
)
__all__ = [
"CODEX_HOST_CREDENTIAL_TOKEN_REF",
"EGRESS_HOSTNAME",
"EGRESS_ROUTES_IN_CONTAINER",
"Egress",
+58 -102
View File
@@ -175,47 +175,6 @@ class GitEntry:
# token-not-Bearer quirk (go-gitea/gitea#16734).
EGRESS_AUTH_SCHEMES = ("Bearer", "token")
# Optional per-route role markers. A role signals "this route plays
# a specific named part in the bottle's auth flow"; the launch step
# acts on the marker.
#
# claude_code_oauth: this route auth-injects on the agent's
# claude-code OAuth flow. Triggers prepare.py
# to ship a placeholder CLAUDE_CODE_OAUTH_TOKEN
# to the agent (so claude-code starts) and
# disable nonessential-traffic / error-reporting
# env vars. Host doesn't matter to the placeholder
# logic — declare the role on whichever route
# injects the OAuth header.
#
# codex_auth: placeholder marker reserved for follow-up Codex
# credential-injection work. It is still accepted so
# existing manifests and future egress-held auth flows
# have a stable role name, but it no longer triggers an
# OPENAI_API_KEY placeholder. Codex bottles should prefer
# device/ChatGPT login state today.
#
# Routes without a `role` are pure proxy entries: egress
# enforces path_allowlist + injects auth on its own, but nothing
# special happens on the agent side.
EGRESS_ROLES = frozenset({
"claude_code_oauth",
"codex_auth",
})
# Singleton roles may appear on at most one route per bottle. Some
# roles drive a single provider auth path; two routes claiming one
# marker would leave "which one is canonical?" ambiguous.
EGRESS_SINGLETON_ROLES = frozenset({
"claude_code_oauth",
"codex_auth",
})
PROVIDER_EGRESS_ROLES = {
"claude": frozenset({"claude_code_oauth"}),
"codex": frozenset({"codex_auth"}),
}
@dataclass(frozen=True)
class AgentProvider:
@@ -224,19 +183,30 @@ class AgentProvider:
`template` selects a built-in launch/runtime contract. `dockerfile`
optionally points at a custom agent-image Dockerfile while leaving
bot-bottle's sidecar infrastructure intact.
`auth_token` names the host env var that holds the provider's OAuth
token (Claude only). The provisioner injects a provider-owned egress
route for api.anthropic.com that re-injects this token as the Bearer
header, and sets a placeholder CLAUDE_CODE_OAUTH_TOKEN in the agent
so the Claude Code CLI starts.
`forward_host_credentials` forwards the host Codex auth token into
the egress sidecar (Codex only).
"""
template: str = "claude"
dockerfile: str = ""
auth_token: str = ""
forward_host_credentials: bool = False
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "AgentProvider":
d = _as_json_object(raw, f"bottle '{bottle_name}' agent_provider")
for k in d:
if k not in {"template", "dockerfile"}:
if k not in {"template", "dockerfile", "auth_token", "forward_host_credentials"}:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider has unknown key {k!r}; "
f"allowed: template, dockerfile"
f"allowed: template, dockerfile, auth_token, forward_host_credentials"
)
template = d.get("template", "claude")
if not isinstance(template, str) or not template:
@@ -255,7 +225,34 @@ class AgentProvider:
f"bottle '{bottle_name}' agent_provider.dockerfile must be a "
f"string (was {type(dockerfile).__name__})"
)
return cls(template=template, dockerfile=dockerfile)
auth_token = d.get("auth_token", "")
if not isinstance(auth_token, str):
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.auth_token must be a "
f"string (was {type(auth_token).__name__})"
)
if auth_token and template != "claude":
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.auth_token is only "
f"supported for template 'claude'"
)
forward_host_credentials = d.get("forward_host_credentials", False)
if not isinstance(forward_host_credentials, bool):
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.forward_host_credentials "
f"must be a boolean (was {type(forward_host_credentials).__name__})"
)
if forward_host_credentials and template != "codex":
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.forward_host_credentials "
"is currently only supported for template 'codex'"
)
return cls(
template=template,
dockerfile=dockerfile,
auth_token=auth_token,
forward_host_credentials=forward_host_credentials,
)
@dataclass(frozen=True)
@@ -412,10 +409,8 @@ class EgressRoute:
manifest's `auth` block is omitted both fields are empty strings —
no Authorization is written, no token forwarded.
`Role` is an optional tuple of named markers (see
EGRESS_ROLES). The launch step reads these and triggers
associated side effects (e.g. the `claude_code_oauth` marker
causes prepare.py to set a placeholder OAuth env on the agent).
`Role` is reserved for future use; all role strings are currently
rejected by the validator.
Validation rules (enforced in `from_dict`):
- `host` required, non-empty.
@@ -424,10 +419,7 @@ class EgressRoute:
`token_ref` as non-empty strings; an empty `auth: {}` is an
error rather than a synonym for "no auth" (omit `auth` for
that case).
- `role` optional. String or list of strings drawn from
EGRESS_ROLES. Singleton roles (see
EGRESS_SINGLETON_ROLES) may appear on at most one
route per bottle.
- `role` optional, reserved any non-empty value is rejected.
"""
Host: str
@@ -525,12 +517,11 @@ class EgressRoute:
f"{label} role must be a string or a list of strings "
f"(was {type(role_raw).__name__})"
)
for r in roles:
if r not in EGRESS_ROLES:
raise ManifestError(
f"{label} role {r!r} is not one of "
f"{', '.join(sorted(EGRESS_ROLES))}"
)
if roles:
raise ManifestError(
f"{label} role {roles[0]!r} is not accepted; "
f"the 'role' field is reserved for future use"
)
pipelock = (
PipelockRoutePolicy.from_dict(bottle_name, idx, d["pipelock"])
@@ -565,9 +556,7 @@ class EgressConfig:
routes: tuple[EgressRoute, ...] = ()
@classmethod
def from_dict(
cls, bottle_name: str, raw: object, *, agent_provider_template: str = "claude",
) -> "EgressConfig":
def from_dict(cls, bottle_name: str, raw: object) -> "EgressConfig":
d = _as_json_object(raw, f"bottle '{bottle_name}' egress")
routes_raw = d.get("routes")
routes: tuple[EgressRoute, ...] = ()
@@ -582,9 +571,7 @@ class EgressConfig:
EgressRoute.from_dict(bottle_name, i, entry)
for i, entry in enumerate(routes_list)
)
_validate_egress_routes(
bottle_name, routes, agent_provider_template=agent_provider_template,
)
_validate_egress_routes(bottle_name, routes)
for k in d:
if k != "routes":
raise ManifestError(
@@ -675,10 +662,7 @@ class Bottle:
)
egress = (
EgressConfig.from_dict(
name, d["egress"],
agent_provider_template=agent_provider.template,
)
EgressConfig.from_dict(name, d["egress"])
if "egress" in d
else EgressConfig()
)
@@ -1042,21 +1026,15 @@ def _is_ip_literal(value: str) -> bool:
def _validate_egress_routes(
bottle_name: str,
routes: tuple[EgressRoute, ...],
*,
agent_provider_template: str = "claude",
) -> None:
"""Cross-validation for `bottle.egress.routes`:
"""Cross-validation for `bottle.egress.routes`: hosts must be unique.
- Hosts must be unique within the bottle. The proxy matches by
exact-host (v1, prefix matching is on path_allowlist only);
duplicate hosts leave the route choice ambiguous.
- Singleton roles (see EGRESS_SINGLETON_ROLES) may appear
on at most one route per bottle.
The proxy matches by exact-host (v1); duplicate hosts leave the
route choice ambiguous so we reject them up front.
No cross-validation against `bottle.git` is performed. git-gate
(SSH push/fetch) and egress (HTTPS) broker different
protocols; declaring both for the same host is a legitimate
dev setup."""
(SSH push/fetch) and egress (HTTPS) broker different protocols;
declaring both for the same host is a legitimate dev setup."""
seen_hosts: dict[str, None] = {}
for r in routes:
key = r.Host.lower()
@@ -1066,25 +1044,6 @@ def _validate_egress_routes(
f"{r.Host!r}; each host must be unique on the proxy."
)
seen_hosts[key] = None
for role in EGRESS_SINGLETON_ROLES:
with_role = [r for r in routes if role in r.Role]
if len(with_role) > 1:
hosts = ", ".join(r.Host for r in with_role)
raise ManifestError(
f"bottle '{bottle_name}' egress.routes has {len(with_role)} "
f"routes with role {role!r} (hosts: {hosts}); this role drives a "
f"single launch-step side effect — pick one."
)
allowed_roles = PROVIDER_EGRESS_ROLES[agent_provider_template]
for route in routes:
for role in route.Role:
if role not in allowed_roles:
raise ManifestError(
f"bottle '{bottle_name}' egress route for host "
f"{route.Host!r} has role {role!r}, but provider "
f"{agent_provider_template!r} only accepts roles "
f"{', '.join(sorted(allowed_roles)) or '(none)'}"
)
def _validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None:
@@ -1295,10 +1254,7 @@ def _merge_bottles(
merged_supervise = (
child.supervise if "supervise" in child_raw else parent.supervise
)
_validate_egress_routes(
name, merged_egress.routes,
agent_provider_template=merged_agent_provider.template,
)
_validate_egress_routes(name, merged_egress.routes)
return Bottle(
env=merged_env,
+33 -20
View File
@@ -21,7 +21,7 @@ from dataclasses import dataclass
from pathlib import Path
from typing import cast
from .egress import EGRESS_HOSTNAME, egress_routes_for_bottle
from .egress import EGRESS_HOSTNAME, EgressRoute, egress_routes_for_bottle
from .supervise import SUPERVISE_HOSTNAME
from .manifest import Bottle
@@ -50,14 +50,17 @@ PIPELOCK_HOSTNAME = "pipelock"
# --- Allowlist resolution --------------------------------------------------
def pipelock_effective_allowlist(bottle: Bottle) -> list[str]:
def pipelock_effective_allowlist(
bottle: Bottle,
provider_routes: tuple[EgressRoute, ...] = (),
) -> list[str]:
"""Hostnames pipelock allows. Sorted for stability.
Always mirrors `egress_routes_for_bottle(bottle)` egress is the
single allowlist surface, and pipelock's allowlist is the downstream
copy for defense-in-depth + DLP body scanning. For bottles without
any `egress.routes[]` declared, this is empty except for supervise
sidecar traffic when `supervise: true`.
Always mirrors `egress_routes_for_bottle(bottle, provider_routes)`
egress is the single allowlist surface, and pipelock's allowlist is
the downstream copy for defense-in-depth + DLP body scanning. For
bottles without any `egress.routes[]` declared, this is empty except
for supervise sidecar traffic when `supervise: true`.
The supervise sidecar's hostname is auto-added when supervise
is enabled (sibling-sidecar traffic that flows through pipelock
@@ -65,7 +68,7 @@ def pipelock_effective_allowlist(bottle: Bottle) -> list[str]:
`bottle.git` do NOT contribute here git traffic flows
through git-gate (PRD 0008), not pipelock."""
seen: dict[str, None] = {}
for r in egress_routes_for_bottle(bottle):
for r in egress_routes_for_bottle(bottle, provider_routes):
if r.host:
seen.setdefault(r.host, None)
if bottle.supervise:
@@ -98,19 +101,23 @@ def pipelock_seed_phrase_detection_enabled(bottle: Bottle) -> bool:
return False
def pipelock_effective_tls_passthrough(bottle: Bottle) -> list[str]:
def pipelock_effective_tls_passthrough(
bottle: Bottle,
provider_routes: tuple[EgressRoute, ...] = (),
) -> list[str]:
"""Hostnames pipelock should pass through (no TLS MITM).
A route opts in with `pipelock.tls_passthrough: true`. This is
useful for provider API routes where egress injects the
Authorization header after the agent boundary; pipelock still
enforces the host allowlist but does not decrypt and scan that
provider request.
A manifest route opts in with `pipelock.tls_passthrough: true`
(lifted into `EgressRoute.tls_passthrough` in `egress_manifest_routes`).
Provider routes that set `tls_passthrough=True` (e.g. Codex credential
routes where egress injects the host bearer after the agent boundary)
are also included. Both arrive via `egress_routes_for_bottle` no
provider-specific branching needed here.
"""
seen: dict[str, None] = {host: None for host in DEFAULT_TLS_PASSTHROUGH}
for route in bottle.egress.routes:
if route.Pipelock.TlsPassthrough:
seen.setdefault(route.Host, None)
for route in egress_routes_for_bottle(bottle, provider_routes):
if route.tls_passthrough:
seen.setdefault(route.host, None)
return sorted(seen.keys())
1
@@ -142,6 +149,7 @@ def pipelock_build_config(
ca_cert_path: str = "",
ca_key_path: str = "",
ssrf_ip_allowlist: tuple[str, ...] = (),
provider_routes: tuple[EgressRoute, ...] = (),
) -> dict[str, object]:
"""Build the structured pipelock config dict the sidecar will load.
@@ -171,7 +179,7 @@ def pipelock_build_config(
"version": 1,
"mode": "strict",
"enforce": True,
"api_allowlist": pipelock_effective_allowlist(bottle),
"api_allowlist": pipelock_effective_allowlist(bottle, provider_routes),
"forward_proxy": {"enabled": True},
}
if not pipelock_seed_phrase_detection_enabled(bottle):
@@ -205,7 +213,7 @@ def pipelock_build_config(
"enabled": True,
"ca_cert": ca_cert_path,
"ca_key": ca_key_path,
"passthrough_domains": pipelock_effective_tls_passthrough(bottle),
"passthrough_domains": pipelock_effective_tls_passthrough(bottle, provider_routes),
}
effective_ssrf_ip_allowlist = pipelock_effective_ssrf_ip_allowlist(
bottle, ssrf_ip_allowlist,
@@ -319,7 +327,11 @@ class PipelockProxy:
(`PIPELOCK_CA_CERT_IN_CONTAINER` / `PIPELOCK_CA_KEY_IN_CONTAINER`)."""
def prepare(
self, bottle: Bottle, slug: str, stage_dir: Path
self,
bottle: Bottle,
slug: str,
stage_dir: Path,
provider_routes: tuple[EgressRoute, ...] = (),
) -> PipelockProxyPlan:
"""Write the pipelock yaml config (mode 600) under `stage_dir`
and return the plan for launch. Pure host-side, no docker
@@ -342,6 +354,7 @@ class PipelockProxy:
bottle,
ca_cert_path=PIPELOCK_CA_CERT_IN_CONTAINER,
ca_key_path=PIPELOCK_CA_KEY_IN_CONTAINER,
provider_routes=provider_routes,
)
yaml_path.write_text(pipelock_render_yaml(cfg))
yaml_path.chmod(0o600)
@@ -0,0 +1,198 @@
# PRD 0029: Provider auth credentials through egress
- **Status:** Active
- **Author:** didericis-codex
- **Created:** 2026-05-29
- **Issue:** #109
## Summary
Allow provider bottles to inject host credentials into the egress
sidecar without exposing them to the agent. Codex uses
`agent_provider.forward_host_credentials` for ChatGPT/device-login
access tokens. Claude uses `agent_provider.auth_token` to name the host
env var holding its OAuth token, which egress injects on
`api.anthropic.com` requests.
## Problem
Codex bottles can reach OpenAI hosts after they are added to egress, but
requests to Codex's ChatGPT-backed API endpoints still fail with HTTP
403 when the egress route is unauthenticated. The egress proxy strips
agent-originated `Authorization` headers and only re-injects auth for
routes that declare an egress-owned token. Bare `api.openai.com` or
`chatgpt.com` routes therefore forward Codex requests without the
ChatGPT bearer token.
Copying the host `~/.codex/auth.json` into the agent would solve auth
mode detection but would also put access and refresh material inside the
agent sandbox. That cuts against bot-bottle's credential minimization
model: provider credentials should live in the sidecar boundary when
possible, not in the agent.
## Goals / Success Criteria
- A Codex bottle with host ChatGPT auth can call Codex's
`api.openai.com` and `chatgpt.com` endpoints through egress.
- Host credential forwarding happens only when the bottle declares
`agent_provider.forward_host_credentials: true`.
- The agent container does not receive `OPENAI_API_KEY`,
`CODEX_ACCESS_TOKEN`, or real `tokens.access_token` /
`tokens.refresh_token` values.
- The agent container receives only a dummy Codex `auth.json` that
preserves the host auth-mode shape, keeps the selected ChatGPT
account id, and replaces credential values with placeholders.
- Egress route files remain non-secret: they contain only host/path/auth
slot metadata, never token values.
- Missing, API-key, malformed, or expired host Codex auth fails
launch with a clear operator-facing message.
- Existing Claude OAuth placeholder behavior remains unchanged.
## Non-goals
- Refreshing Codex tokens in the sidecar. The first cut reads the host's
current access token at launch; operators can restart after host Codex
refreshes auth.
- Copying host `~/.codex/auth.json` credentials into the agent.
- Allowing arbitrary host credential forwarding beyond the two providers
covered here (Codex ChatGPT/device-login and Claude OAuth).
- Hot-applying new authenticated Codex routes to an existing running
sidecar. The current hot-apply path cannot safely populate new token
env slots in an already-running container.
## Scope
### In scope
- Add `agent_provider.forward_host_credentials` to the bottle manifest
schema, defaulting to `false`.
- Support the flag for `agent_provider.template: codex`.
- Add `agent_provider.auth_token` to the bottle manifest schema.
- Support the field for `agent_provider.template: claude`: the named
host env var is forwarded only into the egress sidecar as the Bearer
token for `api.anthropic.com`, and a placeholder
`CLAUDE_CODE_OAUTH_TOKEN` is set in the agent so the Claude Code CLI
starts without a real credential.
- Remove the `claude_code_oauth` egress route role, which previously
required operators to declare the OAuth route manually. The provisioner
now injects it from `auth_token`.
- Read host Codex auth from `$CODEX_HOME/auth.json` when `CODEX_HOME` is
set, otherwise from `~/.codex/auth.json`.
- Extract only `tokens.access_token` for egress injection.
- Generate a dummy agent-side `auth.json` from the host auth file's
mode and key shape, without copying real token values.
- Validate that host auth is not API-key mode and the access token is
present, JWT-shaped, and not expired.
- Add or upgrade `api.openai.com` and `chatgpt.com` egress routes to
inject that access token via a shared `EGRESS_TOKEN_N` sidecar env
slot.
- Pass the extracted token only into the sidecar compose/run
environment, alongside other egress token values.
### Out of scope
- Sidecar-owned refresh using `tokens.refresh_token`.
- Sharing full Codex auth state with the agent.
- Supporting host credential forwarding for non-Codex providers.
## Design
### Manifest
Extend `agent_provider`:
```yaml
agent_provider:
template: codex
forward_host_credentials: true
```
The field defaults to `false`. If set on a non-Codex provider, manifest
validation should reject it until that provider has a concrete,
credential-minimizing implementation.
### Host auth extraction
At prepare/launch time, when the flag is enabled for Codex:
1. Resolve the host Codex home directory from `$CODEX_HOME`, falling
back to `~/.codex`.
2. Parse `auth.json`.
3. Require user/device auth mode rather than API-key auth.
4. Require a non-empty `tokens.access_token`.
5. Parse the JWT payload enough to require an `exp` claim in the future.
6. Return only the access token value to the launch path.
Errors should name the missing or invalid condition and point the
operator at `codex login --device-auth`, without printing token values.
### Egress route
When forwarding host Codex credentials, the effective egress route table
should contain authenticated `api.openai.com` and `chatgpt.com` routes.
If the bottle already declares either host as a bare-pass route, upgrade
it in the effective route table rather than requiring a duplicate
manifest entry. If the bottle already declares an authenticated route for
either host, fail rather than guessing whether to override
operator-provided auth, unless that route already uses the synthetic
Codex host credential token reference.
The rendered route should look like any other egress-owned auth route:
```yaml
routes:
- host: "api.openai.com"
auth_scheme: "Bearer"
token_env: "EGRESS_TOKEN_N"
- host: "chatgpt.com"
auth_scheme: "Bearer"
token_env: "EGRESS_TOKEN_N"
```
The access token value is supplied through the sidecar process
environment for that `EGRESS_TOKEN_N` slot. It must not be written to
`routes.yaml`, compose files, env files, logs, or user-facing output.
### Data flow
```mermaid
flowchart LR
H["Host ~/.codex/auth.json"] --> L["bot-bottle launch"]
L -->|access token only| S["egress sidecar env"]
L -->|dummy auth.json only| A
A["Codex agent"] -->|HTTPS via proxy, auth stripped| E["egress"]
E -->|Bearer injected from env| C["api.openai.com / chatgpt.com"]
```
## Implementation chunks
1. **PRD first.** Land this document as the first commit on the feature
branch.
2. **Manifest schema.** Add `forward_host_credentials`, validation, and
unit tests.
3. **Host Codex auth reader.** Add a small stdlib-only helper for
parsing and validating host Codex auth without printing values.
4. **Effective egress route.** Add/upgrade the Codex API routes when the
flag is enabled, and add tests for bare route upgrade,
missing-route insertion, and authenticated-route conflict.
5. **Agent auth marker.** Provision a dummy Codex `auth.json` into the
agent home so Codex selects the host's user/device auth branch while
real credentials stay in egress.
6. **Launch wiring.** Pass the host access token into the egress sidecar
env for Docker and smolmachines without exposing it to the agent.
7. **Docs and tests.** Update README examples and run the unit suite.
## Open questions
- Should a later version support sidecar refresh using the host refresh
token, or should restart-on-expiry remain the policy?
- Should telemetry hosts such as `ab.chatgpt.com` stay blocked by
default even when Codex ChatGPT auth is enabled?
## References
- Gitea issue #109: Codex ChatGPT auth should inject host access token
via egress.
- PRD 0017: Egress-proxy — universal MITM with path filtering + auth
injection.
- PRD 0026: Agent provider templates.
+136 -9
View File
@@ -2,21 +2,148 @@
from __future__ import annotations
import base64
import json
import tempfile
import unittest
from pathlib import Path
from bot_bottle.agent_provider import runtime_for
from bot_bottle.agent_provider import (
CODEX_HOST_CREDENTIAL_HOSTS,
agent_provision_plan,
runtime_for,
)
from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF
def _jwt(exp: int) -> str:
def enc(obj: dict) -> str:
raw = json.dumps(obj, separators=(",", ":")).encode()
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
return f"{enc({'alg': 'none'})}.{enc({'exp': exp})}.sig"
class TestAgentProviderRuntime(unittest.TestCase):
def test_claude_keeps_oauth_placeholder(self):
runtime = runtime_for("claude")
self.assertEqual("claude_code_oauth", runtime.auth_role)
self.assertEqual("CLAUDE_CODE_OAUTH_TOKEN", runtime.placeholder_env)
def test_codex_plan_declares_home_state(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = agent_provision_plan(
template="codex",
dockerfile="/tmp/Dockerfile.codex",
state_dir=Path(tmp),
)
self.assertEqual("codex", plan.template)
self.assertEqual("codex", plan.command)
self.assertEqual("read_prompt_file", plan.prompt_mode)
self.assertEqual("/tmp/Dockerfile.codex", plan.dockerfile)
self.assertEqual(
"/etc/ssl/certs/ca-certificates.crt",
plan.env_vars["CODEX_CA_CERTIFICATE"],
)
self.assertEqual({}, plan.guest_env)
self.assertEqual(("/home/node/.codex",), tuple(d.guest_path for d in plan.dirs))
self.assertEqual(
("/home/node/.codex/config.toml",),
tuple(f.guest_path for f in plan.files),
)
def test_codex_does_not_inject_openai_api_key_placeholder(self):
runtime = runtime_for("codex")
self.assertEqual("", runtime.auth_role)
self.assertEqual("", runtime.placeholder_env)
def test_codex_forward_host_credentials_adds_auth_and_verify(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
home = Path(tmp) / "host-codex"
home.mkdir()
(home / "auth.json").write_text(json.dumps({
"auth_mode": "chatgpt",
"tokens": {"access_token": _jwt(2000000000)},
}))
plan = agent_provision_plan(
template="codex",
dockerfile="",
state_dir=Path(tmp),
guest_env={"CODEX_HOME": "/run/codex-home"},
forward_host_credentials=True,
host_env={"CODEX_HOME": str(home)},
)
self.assertIn(
"/run/codex-home/auth.json",
{f.guest_path for f in plan.files},
)
self.assertEqual("/run/codex-home", plan.env_vars["CODEX_HOME"])
self.assertEqual(1, len(plan.pre_copy))
self.assertEqual(1, len(plan.verify))
self.assertIn("CODEX_HOME=/run/codex-home", plan.verify[0].argv)
def test_claude_with_auth_token_injects_provider_route_and_placeholder(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = agent_provision_plan(
template="claude",
dockerfile="/tmp/Dockerfile.claude",
state_dir=Path(tmp),
auth_token="BOT_BOTTLE_CLAUDE_OAUTH_TOKEN",
)
self.assertEqual(1, len(plan.egress_routes))
route = plan.egress_routes[0]
self.assertEqual("api.anthropic.com", route.host)
self.assertEqual("Bearer", route.auth_scheme)
self.assertEqual("BOT_BOTTLE_CLAUDE_OAUTH_TOKEN", route.token_ref)
self.assertTrue(route.tls_passthrough)
self.assertEqual("egress-placeholder", plan.env_vars["CLAUDE_CODE_OAUTH_TOKEN"])
self.assertEqual("1", plan.env_vars["CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC"])
self.assertEqual("1", plan.env_vars["DISABLE_ERROR_REPORTING"])
self.assertEqual(frozenset({"CLAUDE_CODE_OAUTH_TOKEN"}), plan.hidden_env_names)
def test_codex_forward_host_credentials_populates_egress_routes(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
home = Path(tmp) / "host-codex"
home.mkdir()
(home / "auth.json").write_text(json.dumps({
"auth_mode": "chatgpt",
"tokens": {"access_token": _jwt(2000000000)},
}))
plan = agent_provision_plan(
template="codex",
dockerfile="",
state_dir=Path(tmp),
forward_host_credentials=True,
host_env={"CODEX_HOME": str(home)},
)
hosts = [r.host for r in plan.egress_routes]
self.assertEqual(sorted(CODEX_HOST_CREDENTIAL_HOSTS), sorted(hosts))
for r in plan.egress_routes:
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, r.token_ref)
self.assertTrue(r.tls_passthrough)
def test_codex_without_forward_host_credentials_has_passthrough_egress_routes(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = agent_provision_plan(
template="codex",
dockerfile="",
state_dir=Path(tmp),
forward_host_credentials=False,
)
self.assertEqual(
{r.host for r in plan.egress_routes},
set(CODEX_HOST_CREDENTIAL_HOSTS),
)
for r in plan.egress_routes:
self.assertEqual("", r.auth_scheme)
self.assertEqual("", r.token_ref)
self.assertTrue(r.tls_passthrough)
def test_claude_without_auth_token_has_passthrough_egress_route(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = agent_provision_plan(
template="claude",
dockerfile="",
state_dir=Path(tmp),
)
self.assertEqual(1, len(plan.egress_routes))
route = plan.egress_routes[0]
self.assertEqual("api.anthropic.com", route.host)
self.assertEqual("", route.auth_scheme)
self.assertEqual("", route.token_ref)
self.assertTrue(route.tls_passthrough)
self.assertNotIn("CLAUDE_CODE_OAUTH_TOKEN", plan.env_vars)
self.assertEqual(frozenset(), plan.hidden_env_names)
if __name__ == "__main__":
+207
View File
@@ -0,0 +1,207 @@
"""Unit: host Codex auth extraction."""
from __future__ import annotations
import base64
import json
import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path
from bot_bottle.codex_auth import (
codex_auth_path,
codex_dummy_auth_json,
codex_host_access_token,
)
from bot_bottle.log import Die
def _jwt(exp: int) -> str:
def enc(obj: dict) -> str:
raw = json.dumps(obj, separators=(",", ":")).encode()
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
return f"{enc({'alg': 'none'})}.{enc({'exp': exp})}.sig"
def _jwt_payload(token: str) -> dict:
payload = token.split(".")[1]
payload += "=" * (-len(payload) % 4)
return json.loads(base64.urlsafe_b64decode(payload.encode()).decode())
class TestCodexHostAccessToken(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.TemporaryDirectory(prefix="cb-codex-auth.")
self.home = Path(self.tmp.name)
self.auth_path = self.home / "auth.json"
def tearDown(self):
self.tmp.cleanup()
def _write(self, payload: dict) -> None:
self.auth_path.write_text(json.dumps(payload))
def test_auth_path_uses_codex_home(self):
self.assertEqual(
self.auth_path,
codex_auth_path({"CODEX_HOME": str(self.home)}),
)
def test_returns_fresh_chatgpt_access_token(self):
token = _jwt(2000000000)
self._write({
"auth_mode": "chatgpt",
"tokens": {"access_token": token, "refresh_token": "hidden"},
})
out = codex_host_access_token(
{"CODEX_HOME": str(self.home)},
now=datetime(2026, 1, 1, tzinfo=timezone.utc),
)
self.assertEqual(token, out)
def test_missing_auth_file_dies(self):
with self.assertRaises(Die):
codex_host_access_token({"CODEX_HOME": str(self.home)})
def test_non_chatgpt_auth_dies(self):
self._write({"auth_mode": "api_key", "tokens": {"access_token": _jwt(2)}})
with self.assertRaises(Die):
codex_host_access_token({"CODEX_HOME": str(self.home)})
def test_user_auth_mode_is_allowed(self):
token = _jwt(2000000000)
self._write({"auth_mode": "user", "tokens": {"access_token": token}})
out = codex_host_access_token(
{"CODEX_HOME": str(self.home)},
now=datetime(2026, 1, 1, tzinfo=timezone.utc),
)
self.assertEqual(token, out)
def test_expired_token_dies(self):
self._write({
"auth_mode": "chatgpt",
"tokens": {"access_token": _jwt(1000)},
})
with self.assertRaises(Die):
codex_host_access_token(
{"CODEX_HOME": str(self.home)},
now=datetime(2026, 1, 1, tzinfo=timezone.utc),
)
def test_non_jwt_token_dies(self):
self._write({
"auth_mode": "chatgpt",
"tokens": {"access_token": "not-a-jwt"},
})
with self.assertRaises(Die):
codex_host_access_token({"CODEX_HOME": str(self.home)})
def test_dummy_auth_preserves_mode_and_redacts_tokens(self):
access = _jwt(2000000000)
refresh = "host-refresh-token"
self._write({
"auth_mode": "chatgpt",
"OPENAI_API_KEY": None,
"tokens": {
"access_token": access,
"id_token": _jwt(2000000000),
"refresh_token": refresh,
"account_id": "acct-host",
},
"last_refresh": "2026-05-29T00:00:00.000Z",
})
dummy = json.loads(codex_dummy_auth_json(
{"CODEX_HOME": str(self.home)},
now=datetime(2026, 1, 1, tzinfo=timezone.utc),
))
self.assertEqual("chatgpt", dummy["auth_mode"])
self.assertIsNone(dummy["OPENAI_API_KEY"])
self.assertNotEqual(access, dummy["tokens"]["access_token"])
self.assertNotEqual(refresh, dummy["tokens"]["refresh_token"])
self.assertEqual("bot-bottle-placeholder", dummy["tokens"]["refresh_token"])
self.assertEqual("acct-host", dummy["tokens"]["account_id"])
self.assertIsNotNone(
codex_host_access_token(
{"CODEX_HOME": str(self.home)},
now=datetime(2026, 1, 1, tzinfo=timezone.utc),
)
)
def test_dummy_auth_tokens_inherit_host_token_exp(self):
# Codex refreshes when its local access token is at/past exp;
# the dummy must carry the host token's real exp so Codex does
# not drop to the sign-in screen after an artificial TTL while
# egress still holds a valid bearer.
host_exp = 2000000000
self._write({
"auth_mode": "chatgpt",
"tokens": {
"access_token": _jwt(host_exp),
"id_token": _jwt(host_exp),
"refresh_token": "hidden",
},
})
dummy = json.loads(codex_dummy_auth_json(
{"CODEX_HOME": str(self.home)},
now=datetime(2026, 1, 1, tzinfo=timezone.utc),
))
self.assertEqual(
host_exp, _jwt_payload(dummy["tokens"]["access_token"])["exp"],
)
self.assertEqual(
host_exp, _jwt_payload(dummy["tokens"]["id_token"])["exp"],
)
def test_dummy_auth_keeps_required_account_claim_shape(self):
def jwt(payload: dict) -> str:
def enc(obj: dict) -> str:
raw = json.dumps(obj, separators=(",", ":")).encode()
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
return f"{enc({'alg': 'none'})}.{enc(payload)}.sig"
self._write({
"auth_mode": "chatgpt",
"tokens": {
"access_token": jwt({
"exp": 2000000000,
"https://api.openai.com/auth": {
"chatgpt_plan_type": "plus",
"chatgpt_account_id": "acct-real",
"chatgpt_user_id": "user-real",
"user_id": "auth-user-real",
"localhost": True,
},
"https://api.openai.com/profile": {
"email": "real@example.invalid",
"email_verified": True,
},
}),
"id_token": jwt({
"exp": 2000000000,
"email": "real@example.invalid",
"email_verified": True,
"https://api.openai.com/auth": {
"chatgpt_plan_type": "plus",
"chatgpt_account_id": "acct-real",
},
}),
"refresh_token": "hidden",
},
})
dummy = json.loads(codex_dummy_auth_json(
{"CODEX_HOME": str(self.home)},
now=datetime(2026, 1, 1, tzinfo=timezone.utc),
))
access_payload = _jwt_payload(dummy["tokens"]["access_token"])
auth = access_payload["https://api.openai.com/auth"]
profile = access_payload["https://api.openai.com/profile"]
self.assertEqual("plus", auth["chatgpt_plan_type"])
self.assertEqual("acct-real", auth["chatgpt_account_id"])
self.assertEqual("bot-bottle-placeholder", auth["chatgpt_user_id"])
self.assertEqual("bot-bottle@example.invalid", profile["email"])
self.assertTrue(profile["email_verified"])
if __name__ == "__main__":
unittest.main()
+23
View File
@@ -14,6 +14,7 @@ import unittest
from pathlib import Path
from unittest import mock
from bot_bottle.agent_provider import AgentProvisionPlan
from bot_bottle.backend import BottleSpec
from bot_bottle.backend.docker.bottle_plan import DockerBottlePlan
from bot_bottle.backend.docker.compose import (
@@ -180,6 +181,14 @@ def _plan(
egress_plan=_egress_plan(routes),
supervise_plan=_supervise_plan() if supervise else None,
use_runsc=False,
agent_provision=AgentProvisionPlan(
template="claude",
command="claude",
prompt_mode="append_file",
image="bot-bottle-claude:latest",
dockerfile="",
guest_env={},
),
)
@@ -250,6 +259,20 @@ class TestAgentAlwaysPresent(unittest.TestCase):
s = bottle_plan_to_compose(_plan())["services"]["agent"]
self.assertIn("CLAUDE_CODE_OAUTH_TOKEN", s["environment"])
def test_agent_provider_env_uses_literal_values(self):
plan = _plan()
provision = AgentProvisionPlan(
template="codex",
command="codex",
prompt_mode="read_prompt_file",
image="bot-bottle-codex:latest",
dockerfile="",
guest_env={"CODEX_HOME": "/home/node/.codex"},
)
plan = type(plan)(**{**vars(plan), "agent_provision": provision})
s = bottle_plan_to_compose(plan)["services"]["agent"]
self.assertIn("CODEX_HOME=/home/node/.codex", s["environment"])
def test_agent_runsc_runtime(self):
plan = _plan()
plan = type(plan)(**{**vars(plan), "use_runsc": True})
@@ -13,6 +13,7 @@ import unittest
from pathlib import Path
from unittest.mock import patch
from bot_bottle.agent_provider import AgentProvisionPlan
from bot_bottle.backend import BottleSpec
from bot_bottle.backend.docker.bottle_plan import DockerBottlePlan
from bot_bottle.backend.docker.provision import git as _git
@@ -66,6 +67,14 @@ def _plan(*, git_user: dict | None = None,
),
supervise_plan=None,
use_runsc=False,
agent_provision=AgentProvisionPlan(
template="claude",
command="claude",
prompt_mode="append_file",
image="bot-bottle-claude:latest",
dockerfile="",
guest_env={},
),
)
@@ -0,0 +1,188 @@
"""Unit: docker provider auth marker provisioning."""
from __future__ import annotations
import unittest
from pathlib import Path
from unittest.mock import patch
from bot_bottle.agent_provider import (
AgentProvisionDir,
AgentProvisionFile,
AgentProvisionPlan,
)
from bot_bottle.backend import BottleSpec
from bot_bottle.backend.docker.bottle_plan import DockerBottlePlan
from bot_bottle.backend.docker.provision import provider_auth as _provider_auth
from bot_bottle.egress import EgressPlan
from bot_bottle.git_gate import GitGatePlan
from bot_bottle.manifest import Manifest
from bot_bottle.pipelock import PipelockProxyPlan
def _plan(
*,
codex_auth_file: Path | None = None,
agent_provider_template: str = "codex",
) -> DockerBottlePlan:
manifest = Manifest.from_json_obj({
"bottles": {"dev": {"agent_provider": {"template": "codex"}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
return DockerBottlePlan(
spec=BottleSpec(
manifest=manifest,
agent_name="demo",
copy_cwd=False,
user_cwd="/tmp/x",
),
stage_dir=Path("/tmp/stage"),
slug="demo-abc12",
container_name="bot-bottle-demo-abc12",
container_name_pinned=False,
image="bot-bottle-codex:latest",
derived_image="",
runtime_image="bot-bottle-codex:latest",
dockerfile_path="",
env_file=Path("/tmp/agent.env"),
forwarded_env={},
prompt_file=Path("/tmp/prompt.txt"),
proxy_plan=PipelockProxyPlan(
yaml_path=Path("/tmp/pipelock.yaml"),
slug="demo-abc12",
),
git_gate_plan=GitGatePlan(
slug="demo-abc12",
entrypoint_script=Path("/tmp/git-gate-entrypoint.sh"),
hook_script=Path("/tmp/git-gate-hook"),
access_hook_script=Path("/tmp/git-gate-access-hook"),
upstreams=(),
),
egress_plan=EgressPlan(
slug="demo-abc12",
routes_path=Path("/tmp/routes.yaml"),
routes=(),
token_env_map={},
),
supervise_plan=None,
use_runsc=False,
agent_provision=_agent_provision(
agent_provider_template, codex_auth_file=codex_auth_file,
),
)
def _agent_provision(
template: str, *, codex_auth_file: Path | None = None,
) -> AgentProvisionPlan:
if template != "codex":
return AgentProvisionPlan(
template=template,
command=template,
prompt_mode="append_file",
image="",
dockerfile="",
guest_env={},
)
files = [
AgentProvisionFile(
Path("/tmp/codex-config.toml"),
"/home/node/.codex/config.toml",
),
]
if codex_auth_file is not None:
files.append(AgentProvisionFile(
codex_auth_file,
"/home/node/.codex/auth.json",
))
return AgentProvisionPlan(
template="codex",
command="codex",
prompt_mode="read_prompt_file",
image="bot-bottle-codex:latest",
dockerfile="",
guest_env={},
dirs=(AgentProvisionDir("/home/node/.codex"),),
files=tuple(files),
)
class TestProvisionProviderAuth(unittest.TestCase):
def test_noop_for_non_codex_provider(self):
with patch.object(_provider_auth.subprocess, "run") as run:
_provider_auth.provision_provider_auth(
_plan(agent_provider_template="claude"), "bot-bottle-demo-abc12",
)
self.assertEqual(0, run.call_count)
def test_codex_provider_trusts_launch_dir_without_auth_file(self):
with patch.object(_provider_auth.subprocess, "run") as run:
_provider_auth.provision_provider_auth(
_plan(), "bot-bottle-demo-abc12",
)
argvs = [call.args[0] for call in run.call_args_list]
self.assertIn(
["docker", "exec", "-u", "0", "bot-bottle-demo-abc12",
"mkdir", "-p", "/home/node/.codex"],
argvs,
)
trust_config = next(
a for a in argvs
if a[:2] == ["docker", "cp"] and a[2] == "/tmp/codex-config.toml"
)
self.assertEqual(
"bot-bottle-demo-abc12:/home/node/.codex/config.toml",
trust_config[3],
)
self.assertIn(
["docker", "exec", "-u", "0", "bot-bottle-demo-abc12",
"chown", "node:node", "/home/node/.codex/config.toml"],
argvs,
)
self.assertIn(
["docker", "exec", "-u", "0", "bot-bottle-demo-abc12",
"chmod", "600", "/home/node/.codex/config.toml"],
argvs,
)
def test_copies_dummy_auth_json_to_codex_home(self):
with patch.object(_provider_auth.subprocess, "run") as run:
_provider_auth.provision_provider_auth(
_plan(codex_auth_file=Path("/tmp/codex-auth.json")),
"bot-bottle-demo-abc12",
)
argvs = [call.args[0] for call in run.call_args_list]
self.assertIn(
["docker", "exec", "-u", "0", "bot-bottle-demo-abc12",
"mkdir", "-p", "/home/node/.codex"],
argvs,
)
self.assertIn(
["docker", "exec", "-u", "0", "bot-bottle-demo-abc12",
"chown", "node:node", "/home/node/.codex"],
argvs,
)
self.assertIn(
["docker", "exec", "-u", "0", "bot-bottle-demo-abc12",
"chmod", "700", "/home/node/.codex"],
argvs,
)
self.assertIn(
["docker", "cp", "/tmp/codex-auth.json",
"bot-bottle-demo-abc12:/home/node/.codex/auth.json"],
argvs,
)
self.assertIn(
["docker", "exec", "-u", "0", "bot-bottle-demo-abc12",
"chown", "node:node", "/home/node/.codex/auth.json"],
argvs,
)
self.assertIn(
["docker", "exec", "-u", "0", "bot-bottle-demo-abc12",
"chmod", "600", "/home/node/.codex/auth.json"],
argvs,
)
if __name__ == "__main__":
unittest.main()
+134 -3
View File
@@ -4,6 +4,8 @@ resolution (PRD 0017)."""
import unittest
from bot_bottle.egress import (
CODEX_HOST_CREDENTIAL_TOKEN_REF,
EgressRoute,
egress_manifest_routes,
egress_render_routes,
egress_resolve_token_values,
@@ -22,6 +24,15 @@ def _bottle(routes):
}).bottles["dev"]
def _provider_route(host: str, token_ref: str, *, tls_passthrough: bool = False) -> EgressRoute:
return EgressRoute(
host=host,
auth_scheme="Bearer",
token_ref=token_ref,
tls_passthrough=tls_passthrough,
)
class TestRoutesForBottle(unittest.TestCase):
def test_authenticated_route_gets_slot(self):
b = _bottle([{
@@ -84,9 +95,8 @@ class TestRoutesForBottle(unittest.TestCase):
self.assertEqual("", routes[1].token_env)
class TestRoutesForBottleUsesManifestOnly(unittest.TestCase):
"""The effective route table is exactly the manifest-declared
routes. Provider defaults are not injected implicitly."""
class TestRoutesForBottleManifestOnly(unittest.TestCase):
"""Without provider routes the effective table is exactly the manifest."""
def test_no_manifest_routes_means_no_effective_routes(self):
b = _bottle([])
@@ -107,6 +117,120 @@ class TestRoutesForBottleUsesManifestOnly(unittest.TestCase):
effective = [r.host for r in egress_routes_for_bottle(b)]
self.assertEqual(["x.example"], effective)
def test_tls_passthrough_lifted_from_manifest(self):
b = _bottle([{
"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": "T"},
"pipelock": {"tls_passthrough": True},
}])
routes = egress_routes_for_bottle(b)
self.assertTrue(routes[0].tls_passthrough)
def test_tls_passthrough_false_by_default(self):
b = _bottle([{"host": "api.github.com"}])
routes = egress_routes_for_bottle(b)
self.assertFalse(routes[0].tls_passthrough)
class TestProviderRouteMerge(unittest.TestCase):
"""Provider routes are merged into manifest routes generically."""
def test_provider_route_appended_when_not_in_manifest(self):
b = _bottle([])
pr = _provider_route("api.openai.com", "TOK")
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertEqual("api.openai.com", routes[0].host)
self.assertEqual("Bearer", routes[0].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
self.assertEqual("TOK", routes[0].token_ref)
def test_unauthenticated_provider_route_appends_without_token_slot(self):
b = _bottle([])
pr = EgressRoute(host="api.openai.com", tls_passthrough=True)
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertEqual("api.openai.com", routes[0].host)
self.assertEqual("", routes[0].auth_scheme)
self.assertEqual("", routes[0].token_env)
self.assertEqual("", routes[0].token_ref)
self.assertEqual({}, egress_token_env_map(routes))
def test_unauthenticated_provider_route_upgrades_bare_without_token_slot(self):
b = _bottle([{"host": "api.openai.com"}])
pr = EgressRoute(host="api.openai.com", tls_passthrough=True)
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertEqual("", routes[0].auth_scheme)
self.assertEqual("", routes[0].token_env)
self.assertEqual("", routes[0].token_ref)
self.assertTrue(routes[0].tls_passthrough)
self.assertEqual({}, egress_token_env_map(routes))
def test_two_provider_routes_with_same_token_ref_share_slot(self):
b = _bottle([])
routes = egress_routes_for_bottle(b, (
_provider_route("api.openai.com", CODEX_HOST_CREDENTIAL_TOKEN_REF),
_provider_route("chatgpt.com", CODEX_HOST_CREDENTIAL_TOKEN_REF),
))
self.assertEqual(["api.openai.com", "chatgpt.com"], [r.host for r in routes])
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
self.assertEqual("EGRESS_TOKEN_0", routes[1].token_env)
def test_provider_route_upgrades_bare_manifest_route(self):
b = _bottle([{"host": "chatgpt.com", "path_allowlist": ["/backend-api/"]}])
pr = _provider_route("chatgpt.com", CODEX_HOST_CREDENTIAL_TOKEN_REF)
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertEqual("chatgpt.com", routes[0].host)
self.assertEqual("Bearer", routes[0].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
self.assertEqual(CODEX_HOST_CREDENTIAL_TOKEN_REF, routes[0].token_ref)
self.assertEqual(("/backend-api/",), routes[0].path_allowlist)
def test_provider_route_noop_when_same_auth_already_in_manifest(self):
b = _bottle([{
"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": CODEX_HOST_CREDENTIAL_TOKEN_REF},
}])
pr = _provider_route("api.openai.com", CODEX_HOST_CREDENTIAL_TOKEN_REF)
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertEqual("EGRESS_TOKEN_0", routes[0].token_env)
def test_provider_route_upgrades_tls_passthrough_on_existing_same_auth(self):
b = _bottle([{
"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": CODEX_HOST_CREDENTIAL_TOKEN_REF},
}])
pr = _provider_route(
"api.openai.com", CODEX_HOST_CREDENTIAL_TOKEN_REF, tls_passthrough=True,
)
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual(1, len(routes))
self.assertTrue(routes[0].tls_passthrough)
def test_provider_route_conflicts_with_different_authed_manifest_route(self):
b = _bottle([{
"host": "chatgpt.com",
"auth": {"scheme": "Bearer", "token_ref": "OTHER"},
}])
pr = _provider_route("chatgpt.com", CODEX_HOST_CREDENTIAL_TOKEN_REF)
with self.assertRaises(Die):
egress_routes_for_bottle(b, (pr,))
def test_provider_route_tls_passthrough_set_on_appended_route(self):
b = _bottle([])
pr = _provider_route("api.openai.com", "TOK", tls_passthrough=True)
routes = egress_routes_for_bottle(b, (pr,))
self.assertTrue(routes[0].tls_passthrough)
def test_provider_route_tls_passthrough_set_on_upgraded_bare_route(self):
b = _bottle([{"host": "api.openai.com"}])
pr = _provider_route("api.openai.com", "TOK", tls_passthrough=True)
routes = egress_routes_for_bottle(b, (pr,))
self.assertTrue(routes[0].tls_passthrough)
class TestTokenEnvMap(unittest.TestCase):
def test_only_authenticated_routes_contribute(self):
@@ -217,6 +341,13 @@ class TestResolveTokenValues(unittest.TestCase):
{"GH_PAT": ""},
)
def test_codex_host_credential_ref_is_resolved_by_launch(self):
out = egress_resolve_token_values(
{"EGRESS_TOKEN_0": CODEX_HOST_CREDENTIAL_TOKEN_REF},
{},
)
self.assertEqual({}, out)
if __name__ == "__main__":
unittest.main()
+66 -61
View File
@@ -29,6 +29,13 @@ def _provider_bottle(provider, routes):
}).bottles["dev"]
def _provider_config_bottle(agent_provider):
return Manifest.from_json_obj({
"bottles": {"dev": {"agent_provider": agent_provider}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
class TestMinimalRoute(unittest.TestCase):
def test_host_only(self):
b = _bottle([{"host": "api.example.com"}])
@@ -52,6 +59,58 @@ class TestMinimalRoute(unittest.TestCase):
_bottle([{"host": "x.example", "wat": "yes"}])
class TestAgentProviderHostCredentials(unittest.TestCase):
def test_forward_host_credentials_defaults_false(self):
b = _provider_config_bottle({"template": "codex"})
self.assertFalse(b.agent_provider.forward_host_credentials)
def test_forward_host_credentials_allowed_for_codex(self):
b = _provider_config_bottle({
"template": "codex",
"forward_host_credentials": True,
})
self.assertTrue(b.agent_provider.forward_host_credentials)
def test_forward_host_credentials_must_be_boolean(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "codex",
"forward_host_credentials": "yes",
})
def test_forward_host_credentials_rejected_for_claude(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "claude",
"forward_host_credentials": True,
})
def test_auth_token_defaults_empty(self):
b = _provider_config_bottle({"template": "claude"})
self.assertEqual("", b.agent_provider.auth_token)
def test_auth_token_allowed_for_claude(self):
b = _provider_config_bottle({
"template": "claude",
"auth_token": "BOT_BOTTLE_CLAUDE_OAUTH_TOKEN",
})
self.assertEqual("BOT_BOTTLE_CLAUDE_OAUTH_TOKEN", b.agent_provider.auth_token)
def test_auth_token_must_be_string(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "claude",
"auth_token": 42,
})
def test_auth_token_rejected_for_codex(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "codex",
"auth_token": "SOME_TOKEN",
})
class TestPathAllowlist(unittest.TestCase):
def test_optional(self):
b = _bottle([{"host": "x.example"}])
@@ -144,29 +203,12 @@ class TestRole(unittest.TestCase):
b = _bottle([{"host": "x.example"}])
self.assertEqual((), b.egress.routes[0].Role)
def test_string_normalizes_to_tuple(self):
b = _bottle([{
"host": "api.anthropic.com",
"role": "claude_code_oauth",
"auth": {"scheme": "Bearer", "token_ref": "T"},
}])
self.assertEqual(("claude_code_oauth",),
b.egress.routes[0].Role)
def test_list_supported(self):
b = _bottle([{
"host": "api.anthropic.com",
"role": ["claude_code_oauth"],
"auth": {"scheme": "Bearer", "token_ref": "T"},
}])
self.assertEqual(("claude_code_oauth",),
b.egress.routes[0].Role)
def test_unknown_role_rejected(self):
# The role enum is locked down — typos shouldn't silently
# become no-op markers.
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "role": "totally-made-up"}])
def test_any_role_rejected(self):
# All former roles removed; the field is reserved for future use.
for role in ("claude_code_oauth", "codex_auth", "totally-made-up"):
with self.subTest(role=role):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "role": role}])
def test_non_string_role_rejected(self):
with self.assertRaises(ManifestError):
@@ -174,44 +216,7 @@ class TestRole(unittest.TestCase):
def test_list_with_non_string_item_rejected(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example",
"role": ["claude_code_oauth", 42]}])
def test_singleton_claude_code_oauth_enforced(self):
# Two routes both claiming the role would make "which one
# drives the placeholder env?" ambiguous.
with self.assertRaises(ManifestError):
_bottle([
{"host": "api.anthropic.com", "role": "claude_code_oauth",
"auth": {"scheme": "Bearer", "token_ref": "T1"}},
{"host": "api2.anthropic.example",
"role": "claude_code_oauth",
"auth": {"scheme": "Bearer", "token_ref": "T2"}},
])
def test_codex_auth_role_allowed_for_codex_provider(self):
b = _provider_bottle("codex", [{
"host": "api.openai.com",
"role": "codex_auth",
"auth": {"scheme": "Bearer", "token_ref": "OPENAI_TOKEN"},
}])
self.assertEqual(("codex_auth",), b.egress.routes[0].Role)
def test_claude_role_rejected_for_codex_provider(self):
with self.assertRaises(ManifestError):
_provider_bottle("codex", [{
"host": "api.anthropic.com",
"role": "claude_code_oauth",
"auth": {"scheme": "Bearer", "token_ref": "T"},
}])
def test_codex_role_rejected_for_default_claude_provider(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "api.openai.com",
"role": "codex_auth",
"auth": {"scheme": "Bearer", "token_ref": "T"},
}])
_bottle([{"host": "x.example", "role": ["x", 42]}])
class TestPipelockPolicy(unittest.TestCase):
+27
View File
@@ -5,6 +5,8 @@ git-gate (PRD 0008)."""
import unittest
from bot_bottle.agent_provider import CODEX_HOST_CREDENTIAL_HOSTS
from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
from bot_bottle.manifest import Manifest
from bot_bottle.pipelock import (
pipelock_effective_allowlist,
@@ -113,6 +115,31 @@ class TestTlsPassthrough(unittest.TestCase):
])))
self.assertEqual(["api.openai.com"], passthrough)
def test_forward_host_credentials_passes_through_codex_hosts(self):
# Egress injects the host bearer on the Codex API hosts; pipelock
# must pass them through or its header DLP blocks the injected JWT
# ("request header contains secret"). Provider routes carry
# tls_passthrough=True; pipelock reads this via egress_routes_for_bottle.
provider_routes = tuple(
EgressRoute(
host=host,
auth_scheme="Bearer",
token_ref=CODEX_HOST_CREDENTIAL_TOKEN_REF,
tls_passthrough=True,
)
for host in CODEX_HOST_CREDENTIAL_HOSTS
)
passthrough = pipelock_effective_tls_passthrough(
_bottle({}), provider_routes,
)
self.assertEqual(["api.openai.com", "chatgpt.com"], passthrough)
def test_no_codex_passthrough_without_provider_routes(self):
passthrough = pipelock_effective_tls_passthrough(_bottle({
"agent_provider": {"template": "codex"},
}))
self.assertEqual([], passthrough)
class TestSsrfIpAllowlist(unittest.TestCase):
def test_default_empty(self):
+4 -4
View File
@@ -8,21 +8,21 @@ from bot_bottle.backend.print_util import visible_agent_env_names
class TestVisibleAgentEnvNames(unittest.TestCase):
def test_codex_shows_openai_api_key_if_user_declares_it(self):
def test_shows_all_when_no_hidden_names(self):
self.assertEqual(
["CUSTOM", "OPENAI_API_KEY"],
visible_agent_env_names(
["OPENAI_API_KEY", "CUSTOM"],
agent_provider_template="codex",
hidden_env_names=frozenset(),
),
)
def test_hides_only_active_provider_placeholder(self):
def test_hides_provider_placeholder(self):
self.assertEqual(
["CUSTOM", "OPENAI_API_KEY"],
visible_agent_env_names(
["CLAUDE_CODE_OAUTH_TOKEN", "OPENAI_API_KEY", "CUSTOM"],
agent_provider_template="claude",
hidden_env_names=frozenset({"CLAUDE_CODE_OAUTH_TOKEN"}),
),
)
+4 -5
View File
@@ -59,10 +59,9 @@ class TestClaudeArgvWrapped(unittest.TestCase):
"smolvm", "machine", "exec", "--name",
"bot-bottle-dev-abc",
"-i", "-t",
"-e", "HOME=/home/node",
"-e", "USER=node",
"--",
"runuser", "-u", "node", "--",
"env", "HOME=/home/node", "USER=node",
"claude",
],
argv,
@@ -107,7 +106,7 @@ class TestClaudeArgvWrapped(unittest.TestCase):
HTTPS_PROXY="http://127.0.0.1:1234",
NO_PROXY="localhost",
).agent_argv([]))
self.assertIn("-e", argv)
self.assertIn("env", argv)
self.assertIn("HTTPS_PROXY=http://127.0.0.1:1234", argv)
self.assertIn("NO_PROXY=localhost", argv)
@@ -119,8 +118,8 @@ class TestClaudeArgvWrapped(unittest.TestCase):
argv = _bottle().agent_argv([])
agent_idx = argv.index("claude")
self.assertEqual(
["runuser", "-u", "node", "--"],
argv[agent_idx - 4:agent_idx],
["runuser", "-u", "node", "--", "env"],
argv[agent_idx - 7:agent_idx - 2],
)
+249 -1
View File
@@ -13,6 +13,12 @@ from dataclasses import replace
from pathlib import Path
from unittest.mock import patch
from bot_bottle.agent_provider import (
AgentProvisionCommand,
AgentProvisionDir,
AgentProvisionFile,
AgentProvisionPlan,
)
from bot_bottle.backend import BottleSpec
from bot_bottle.backend.smolmachines.bottle_plan import (
SmolmachinesBottlePlan,
@@ -21,6 +27,7 @@ from bot_bottle.backend.smolmachines.provision import (
ca as _ca,
git as _git,
prompt as _prompt,
provider_auth as _provider_auth,
skills as _skills,
supervise as _supervise,
)
@@ -55,6 +62,9 @@ def _plan(
bundle_ip: str = "192.168.50.2",
agent_git_gate_host: str = "127.0.0.1:55555",
agent_supervise_url: str = "http://127.0.0.1:55556/",
codex_auth_file: Path | None = None,
agent_provider_template: str = "claude",
guest_env: dict[str, str] | None = None,
) -> SmolmachinesBottlePlan:
bottle_json: dict = {}
git_json: dict = {}
@@ -105,7 +115,7 @@ def _plan(
bundle_ip=bundle_ip,
machine_name="bot-bottle-demo-abc12",
agent_image_ref="bot-bottle-claude:latest",
guest_env={},
guest_env=dict(guest_env or {}),
prompt_file=Path("/tmp/state/demo-abc12/agent/prompt.txt"),
proxy_plan=PipelockProxyPlan(
yaml_path=Path("/tmp/pipelock.yaml"),
@@ -129,6 +139,69 @@ def _plan(
supervise_plan=supervise_plan,
agent_git_gate_host=agent_git_gate_host,
agent_supervise_url=agent_supervise_url,
agent_provision=_agent_provision(
agent_provider_template,
codex_auth_file=codex_auth_file,
guest_env=dict(guest_env or {}),
),
)
def _agent_provision(
template: str,
*,
codex_auth_file: Path | None = None,
guest_env: dict[str, str] | None = None,
) -> AgentProvisionPlan:
if template != "codex":
return AgentProvisionPlan(
template=template,
command=template,
prompt_mode="append_file",
image="",
dockerfile="",
guest_env=dict(guest_env or {}),
)
auth_dir = (guest_env or {}).get("CODEX_HOME", "/home/node/.codex")
files = [
AgentProvisionFile(
Path("/tmp/codex-config.toml"),
f"{auth_dir}/config.toml",
),
]
pre_copy: tuple[AgentProvisionCommand, ...] = ()
verify: tuple[AgentProvisionCommand, ...] = ()
if codex_auth_file is not None:
files.append(AgentProvisionFile(codex_auth_file, f"{auth_dir}/auth.json"))
pre_copy = (AgentProvisionCommand((
"find", auth_dir,
"-maxdepth", "1",
"-type", "f",
"(",
"-name", "*.sqlite",
"-o", "-name", "*.sqlite-*",
"-o", "-name", "*.codex-repair-*.bak",
")",
"-delete",
), "codex host credentials: could not reset runtime db files"),)
verify = (AgentProvisionCommand((
"runuser", "-u", "node", "--",
"env",
"HOME=/home/node",
f"CODEX_HOME={auth_dir}",
"codex", "login", "status",
), "codex host credentials: dummy auth was copied into the guest"),)
return AgentProvisionPlan(
template="codex",
command="codex",
prompt_mode="read_prompt_file",
image="bot-bottle-codex:latest",
dockerfile="",
guest_env=dict(guest_env or {}),
dirs=(AgentProvisionDir(auth_dir),),
files=tuple(files),
pre_copy=pre_copy,
verify=verify,
)
@@ -189,6 +262,181 @@ class TestProvisionPrompt(unittest.TestCase):
)
class TestProvisionProviderAuth(unittest.TestCase):
def _patch(self):
return (
patch(
"bot_bottle.backend.smolmachines.provision.provider_auth._smolvm.machine_cp"
),
patch(
"bot_bottle.backend.smolmachines.provision.provider_auth._smolvm.machine_exec"
),
)
def test_noop_for_non_codex_provider(self):
cp_p, ex_p = self._patch()
with cp_p as cp, ex_p as ex:
_provider_auth.provision_provider_auth(_plan(), "bot-bottle-demo-abc12")
self.assertEqual(0, cp.call_count)
self.assertEqual(0, ex.call_count)
def test_codex_provider_trusts_launch_dir_without_auth_file(self):
cp_p, ex_p = self._patch()
with cp_p as cp, ex_p as ex:
ex.return_value = SmolvmRunResult(0, "", "")
_provider_auth.provision_provider_auth(
_plan(agent_provider_template="codex"),
"bot-bottle-demo-abc12",
)
cp.assert_called_once_with(
"/tmp/codex-config.toml",
"bot-bottle-demo-abc12:/home/node/.codex/config.toml",
)
argv_seen = [call.args[1] for call in ex.call_args_list]
self.assertIn(["mkdir", "-p", "/home/node/.codex"], argv_seen)
self.assertIn(
["chown", "node:node", "/home/node/.codex/config.toml"],
argv_seen,
)
self.assertIn(["chmod", "600", "/home/node/.codex/config.toml"], argv_seen)
def test_copies_dummy_auth_json_to_codex_home(self):
cp_p, ex_p = self._patch()
with cp_p as cp, ex_p as ex:
ex.return_value = SmolvmRunResult(0, "Logged in using ChatGPT\n", "")
_provider_auth.provision_provider_auth(
_plan(
agent_provider_template="codex",
codex_auth_file=Path("/tmp/codex-auth.json"),
),
"bot-bottle-demo-abc12",
)
cp_calls = [call.args for call in cp.call_args_list]
self.assertIn(
("/tmp/codex-config.toml",
"bot-bottle-demo-abc12:/home/node/.codex/config.toml"),
cp_calls,
)
self.assertIn(
("/tmp/codex-auth.json",
"bot-bottle-demo-abc12:/home/node/.codex/auth.json"),
cp_calls,
)
argv_seen = [call.args[1] for call in ex.call_args_list]
self.assertIn(["mkdir", "-p", "/home/node/.codex"], argv_seen)
self.assertIn(
["chown", "node:node", "/home/node/.codex"],
argv_seen,
)
self.assertIn(
["chmod", "700", "/home/node/.codex"],
argv_seen,
)
self.assertIn(
[
"find", "/home/node/.codex",
"-maxdepth", "1",
"-type", "f",
"(",
"-name", "*.sqlite",
"-o", "-name", "*.sqlite-*",
"-o", "-name", "*.codex-repair-*.bak",
")",
"-delete",
],
argv_seen,
)
self.assertIn(
["chown", "node:node", "/home/node/.codex/auth.json"],
argv_seen,
)
self.assertIn(["chmod", "600", "/home/node/.codex/auth.json"], argv_seen)
self.assertIn(
[
"runuser", "-u", "node", "--",
"env",
"HOME=/home/node",
"CODEX_HOME=/home/node/.codex",
"codex", "login", "status",
],
argv_seen,
)
def test_honors_codex_home_from_guest_env(self):
cp_p, ex_p = self._patch()
with cp_p as cp, ex_p as ex:
ex.return_value = SmolvmRunResult(0, "Logged in using ChatGPT\n", "")
_provider_auth.provision_provider_auth(
_plan(
agent_provider_template="codex",
codex_auth_file=Path("/tmp/codex-auth.json"),
guest_env={"CODEX_HOME": "/run/codex-home"},
),
"bot-bottle-demo-abc12",
)
cp_calls = [call.args for call in cp.call_args_list]
self.assertIn(
("/tmp/codex-config.toml",
"bot-bottle-demo-abc12:/run/codex-home/config.toml"),
cp_calls,
)
self.assertIn(
("/tmp/codex-auth.json",
"bot-bottle-demo-abc12:/run/codex-home/auth.json"),
cp_calls,
)
argv_seen = [call.args[1] for call in ex.call_args_list]
self.assertIn(
[
"runuser", "-u", "node", "--",
"env",
"HOME=/home/node",
"CODEX_HOME=/run/codex-home",
"codex", "login", "status",
],
argv_seen,
)
def test_dies_when_codex_home_cannot_be_created(self):
cp_p, ex_p = self._patch()
with cp_p as cp, ex_p as ex:
ex.return_value = SmolvmRunResult(1, "", "mkdir: nope\n")
with self.assertRaises(SystemExit):
_provider_auth.provision_provider_auth(
_plan(
agent_provider_template="codex",
codex_auth_file=Path("/tmp/codex-auth.json"),
),
"bot-bottle-demo-abc12",
)
self.assertEqual(0, cp.call_count)
self.assertEqual(1, ex.call_count)
def test_dies_when_codex_rejects_dummy_auth(self):
cp_p, ex_p = self._patch()
with cp_p, ex_p as ex:
# CODEX_HOME setup ok (0), but codex login status fails (1).
ex.side_effect = [
SmolvmRunResult(0, "", ""), # mkdir CODEX_HOME
SmolvmRunResult(0, "", ""), # chown CODEX_HOME
SmolvmRunResult(0, "", ""), # chmod CODEX_HOME
SmolvmRunResult(0, "", ""), # reset runtime db files
SmolvmRunResult(0, "", ""), # chown config.toml
SmolvmRunResult(0, "", ""), # chmod config.toml
SmolvmRunResult(0, "", ""), # chown auth.json
SmolvmRunResult(0, "", ""), # chmod auth.json
SmolvmRunResult(1, "Not logged in\n", ""), # login status
]
with self.assertRaises(SystemExit):
_provider_auth.provision_provider_auth(
_plan(
agent_provider_template="codex",
codex_auth_file=Path("/tmp/codex-auth.json"),
),
"bot-bottle-demo-abc12",
)
class TestProvisionSkills(unittest.TestCase):
def _patch_host_skill_dir(self, returns: dict[str, str]):
return patch(