Files
bot-bottle/bot_bottle/contrib/codex/codex_auth.py
T
didericis-claude 3866461bf4 fix: correct broken imports and fileno() guard after rebase
codex_auth.py was moved into contrib/codex/ but still used `.log`/
`.util` relative imports that resolved to the parent bot_bottle
package before the move — update to `...log` / `...util`.

_read_winsize() called sys.stdin.fileno() outside the OSError guard;
pytest's redirected stdin raises UnsupportedOperation (an OSError
subclass) there, breaking test_returns_first_tty_size. Move fileno()
inside the try block so any non-TTY stream is skipped cleanly.
2026-06-06 16:06:09 -04:00

329 lines
11 KiB
Python

"""Host Codex auth helpers.
Reads the host's Codex ChatGPT/device-login auth state and returns only
the short-lived access token needed by egress. This module deliberately
does not expose refresh tokens or raw auth payloads.
"""
from __future__ import annotations
import base64
import json
import os
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import cast
from ...log import die
from ...util import expand_tilde
def codex_auth_path(host_env: dict[str, str] | None = None) -> Path:
env = os.environ if host_env is None else host_env
home = env.get("CODEX_HOME")
if home:
return Path(expand_tilde(home)) / "auth.json"
return Path.home() / ".codex" / "auth.json"
def codex_host_access_token(
host_env: dict[str, str] | None = None,
*,
now: datetime | None = None,
) -> str:
path = codex_auth_path(host_env)
if not path.is_file():
die(
f"codex host credentials: auth file missing at {path}. "
"Run `codex login --device-auth` on the host or disable "
"agent_provider.forward_host_credentials."
)
raw = _read_auth_object(path)
auth_mode = raw.get("auth_mode")
if not isinstance(auth_mode, str) or auth_mode == "api_key":
die(
"codex host credentials: host Codex auth is not user/device "
"auth. Run `codex login --device-auth` on the host."
)
tokens = raw.get("tokens")
if not isinstance(tokens, dict):
die(f"codex host credentials: {path} is missing tokens")
tokens_typed = cast(dict[str, object], tokens)
access = tokens_typed.get("access_token")
if not isinstance(access, str) or not access:
die(
f"codex host credentials: {path} is missing tokens.access_token. "
"Run `codex login --device-auth` on the host."
)
exp = _jwt_exp(access)
if exp is None:
die("codex host credentials: tokens.access_token is not a JWT with exp")
check_now = now or datetime.now(timezone.utc)
if exp <= check_now:
die(
"codex host credentials: host Codex access token is expired. "
"Run `codex login --device-auth` on the host and restart the bottle."
)
return access
def codex_dummy_auth_json(
host_env: dict[str, str] | None = None,
*,
now: datetime | None = None,
) -> str:
"""Return a non-secret `auth.json` that keeps Codex in the host's
auth branch while egress owns the real bearer token.
The dummy access/id tokens carry the *host* token's real `exp` so
Codex's proactive refresh lifecycle (it refreshes when its local
access token is at/past expiry) tracks the real token instead of
firing after an artificial TTL. Codex cannot refresh inside the
bottle — the refresh token is a placeholder and the OpenAI token
endpoint is off-route — so a shorter dummy exp would drop Codex to
the sign-in screen the moment it lapsed, even while egress still
holds a valid bearer."""
path = codex_auth_path(host_env)
access = codex_host_access_token(host_env, now=now)
raw = _read_auth_object(path)
host_exp = _jwt_exp(access)
exp_ts = int(host_exp.timestamp()) if host_exp is not None else None
dummy = _redact_codex_auth(deepcopy(raw), now=now, exp_ts=exp_ts)
return json.dumps(dummy, indent=2, sort_keys=True) + "\n"
def write_codex_dummy_auth_file(
path: Path,
host_env: dict[str, str] | None = None,
*,
now: datetime | None = None,
) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(codex_dummy_auth_json(host_env, now=now))
path.chmod(0o600)
def _read_auth_object(path: Path) -> dict[str, object]:
try:
raw = json.loads(path.read_text())
except (OSError, json.JSONDecodeError) as e:
die(f"codex host credentials: could not read valid JSON at {path}: {e}")
if not isinstance(raw, dict):
die(f"codex host credentials: {path} must contain a JSON object")
return cast(dict[str, object], raw)
def _dummy_exp(now: datetime | None, exp_ts: int | None) -> int:
if exp_ts is not None:
return exp_ts
check_now = now or datetime.now(timezone.utc)
return int(check_now.timestamp()) + 3600
def _dummy_timestamp(now: datetime | None = None) -> str:
check_now = now or datetime.now(timezone.utc)
if check_now.tzinfo is None:
check_now = check_now.replace(tzinfo=timezone.utc)
check_now = check_now.astimezone(timezone.utc)
return check_now.isoformat(timespec="milliseconds").replace("+00:00", "Z")
def _dummy_jwt(now: datetime | None = None, *, exp_ts: int | None = None) -> str:
return _encode_dummy_jwt({
"exp": _dummy_exp(now, exp_ts),
"sub": "bot-bottle-placeholder",
})
def _dummy_jwt_from_host(
value: object, *, now: datetime | None = None, exp_ts: int | None = None,
) -> str:
if not isinstance(value, str):
return _dummy_jwt(now, exp_ts=exp_ts)
parts = value.split(".")
if len(parts) < 2:
return _dummy_jwt(now, exp_ts=exp_ts)
try:
payload = json.loads(_b64url_decode(parts[1]))
except (ValueError, json.JSONDecodeError):
return _dummy_jwt(now, exp_ts=exp_ts)
if not isinstance(payload, dict):
return _dummy_jwt(now, exp_ts=exp_ts)
return _encode_dummy_jwt(_redact_jwt_payload(cast(dict[str, object], payload), now=now, exp_ts=exp_ts))
def _encode_dummy_jwt(payload: dict[str, object]) -> str:
def enc(obj: dict[str, object]) -> str:
raw = json.dumps(obj, separators=(",", ":")).encode()
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
return f"{enc({'alg': 'none', 'typ': 'JWT'})}.{enc(payload)}.placeholder"
def _redact_jwt_payload(
payload: dict[str, object],
*,
now: datetime | None = None,
exp_ts: int | None = None,
) -> dict[str, object]:
out = _redact_claims(payload)
if not isinstance(out, dict):
out = {}
out_typed: dict[str, object] = cast(dict[str, object], out)
out_typed["exp"] = _dummy_exp(now, exp_ts)
out_typed.setdefault("sub", "bot-bottle-placeholder")
return out_typed
def _redact_claims(value: object) -> object:
if isinstance(value, dict):
out: dict[str, object] = {}
for key, inner in cast(dict[str, object], value).items():
lower = key.lower()
if key == "https://api.openai.com/profile":
out[key] = _redact_profile_claim(inner)
elif key == "https://api.openai.com/auth":
out[key] = _redact_auth_claim(inner)
elif lower == "email":
out[key] = "bot-bottle@example.invalid"
elif lower == "email_verified":
out[key] = True
elif lower in {"exp", "iat", "nbf", "auth_time", "pwd_auth_time"}:
out[key] = inner if isinstance(inner, (int, float)) else 0
elif lower in {"aud", "scp", "amr"}:
out[key] = inner if isinstance(inner, list) else []
elif isinstance(inner, bool):
out[key] = inner
elif isinstance(inner, dict):
out[key] = {}
elif isinstance(inner, list):
out[key] = []
else:
out[key] = "bot-bottle-placeholder"
return out
if isinstance(value, list):
return []
return "bot-bottle-placeholder"
def _redact_profile_claim(value: object) -> dict[str, object]:
profile = cast(dict[str, object], value) if isinstance(value, dict) else {}
return {
"email": "bot-bottle@example.invalid",
"email_verified": bool(profile.get("email_verified", True)),
}
def _redact_auth_claim(value: object) -> dict[str, object]:
auth = cast(dict[str, object], value) if isinstance(value, dict) else {}
out: dict[str, object] = {}
for key, inner in auth.items():
lower = key.lower()
if lower == "chatgpt_plan_type" and isinstance(inner, str) and inner:
out[key] = inner
elif lower == "chatgpt_account_id" and isinstance(inner, str) and inner:
# Current Codex uses the selected account id when building
# ChatGPT requests. Keep that non-secret identifier aligned
# with the host while egress owns the real bearer token.
out[key] = inner
elif lower == "localhost" and isinstance(inner, bool):
out[key] = inner
elif isinstance(inner, bool):
out[key] = inner
elif isinstance(inner, list):
out[key] = []
elif isinstance(inner, dict):
out[key] = {}
else:
out[key] = "bot-bottle-placeholder"
out.setdefault("chatgpt_plan_type", "unknown")
out.setdefault("user_id", "bot-bottle-placeholder")
out.setdefault("chatgpt_user_id", "bot-bottle-placeholder")
out.setdefault("chatgpt_account_id", "bot-bottle-placeholder")
return out
def _redact_codex_auth(
value: object, *, now: datetime | None = None, exp_ts: int | None = None,
) -> object:
auth = cast(dict[str, object], value) if isinstance(value, dict) else {}
out: dict[str, object] = {}
for key, inner in auth.items():
lower = key.lower()
if lower == "auth_mode" and isinstance(inner, str) and inner:
out[key] = inner
elif lower == "openai_api_key":
out[key] = None
elif lower == "last_refresh":
# Codex parses this as a timestamp on startup. Keep the
# schema valid without copying host-side session metadata.
out[key] = _dummy_timestamp(now)
elif lower == "tokens":
out[key] = _redact_token_block(inner, now=now, exp_ts=exp_ts)
else:
out[key] = _redact_unknown_auth_value(inner)
return out
def _redact_token_block(
value: object, *, now: datetime | None = None, exp_ts: int | None = None,
) -> dict[str, object]:
tokens = cast(dict[str, object], value) if isinstance(value, dict) else {}
out: dict[str, object] = {}
for key, inner in tokens.items():
lower = key.lower()
if lower in {"access_token", "id_token"}:
out[key] = _dummy_jwt_from_host(inner, now=now, exp_ts=exp_ts)
elif lower == "account_id" and isinstance(inner, str) and inner:
# Current Codex uses this non-secret selected account id
# while egress owns the real bearer token.
out[key] = inner
else:
out[key] = _redact_unknown_auth_value(inner)
return out
def _redact_unknown_auth_value(value: object) -> object:
if isinstance(value, bool):
return value
if isinstance(value, dict):
return {}
if isinstance(value, list):
return []
if value is None:
return None
return "bot-bottle-placeholder"
def _jwt_exp(token: str) -> datetime | None:
parts = token.split(".")
if len(parts) < 2:
return None
try:
payload = json.loads(_b64url_decode(parts[1]))
except (ValueError, json.JSONDecodeError):
return None
if not isinstance(payload, dict):
return None
exp = cast(dict[str, object], payload).get("exp")
if not isinstance(exp, (int, float)):
return None
return datetime.fromtimestamp(exp, timezone.utc)
def _b64url_decode(value: str) -> str:
padded = value + ("=" * (-len(value) % 4))
return base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")
__all__ = [
"codex_auth_path",
"codex_dummy_auth_json",
"codex_host_access_token",
"write_codex_dummy_auth_file",
]