"""Host Codex auth helpers. Reads the host's Codex ChatGPT/device-login auth state and returns only the short-lived access token needed by egress. This module deliberately does not expose refresh tokens or raw auth payloads. """ from __future__ import annotations import base64 import json import os from copy import deepcopy from datetime import datetime, timezone from pathlib import Path from typing import cast from bot_bottle.log import die from bot_bottle.util import expand_tilde def codex_auth_path(host_env: dict[str, str] | None = None) -> Path: env = os.environ if host_env is None else host_env home = env.get("CODEX_HOME") if home: return Path(expand_tilde(home)) / "auth.json" return Path.home() / ".codex" / "auth.json" def codex_host_access_token( host_env: dict[str, str] | None = None, *, now: datetime | None = None, ) -> str: path = codex_auth_path(host_env) if not path.is_file(): die( f"codex host credentials: auth file missing at {path}. " "Run `codex login --device-auth` on the host or disable " "agent_provider.forward_host_credentials." ) raw = _read_auth_object(path) auth_mode = raw.get("auth_mode") if not isinstance(auth_mode, str) or auth_mode == "api_key": die( "codex host credentials: host Codex auth is not user/device " "auth. Run `codex login --device-auth` on the host." ) tokens = raw.get("tokens") if not isinstance(tokens, dict): die(f"codex host credentials: {path} is missing tokens") tokens_typed = cast(dict[str, object], tokens) access = tokens_typed.get("access_token") if not isinstance(access, str) or not access: die( f"codex host credentials: {path} is missing tokens.access_token. " "Run `codex login --device-auth` on the host." ) exp = _jwt_exp(access) if exp is None: die("codex host credentials: tokens.access_token is not a JWT with exp") check_now = now or datetime.now(timezone.utc) if exp <= check_now: die( "codex host credentials: host Codex access token is expired. " "Run `codex login --device-auth` on the host and restart the bottle." ) return access def codex_dummy_auth_json( host_env: dict[str, str] | None = None, *, now: datetime | None = None, ) -> str: """Return a non-secret `auth.json` that keeps Codex in the host's auth branch while egress owns the real bearer token. The dummy access/id tokens carry the *host* token's real `exp` so Codex's proactive refresh lifecycle (it refreshes when its local access token is at/past expiry) tracks the real token instead of firing after an artificial TTL. Codex cannot refresh inside the bottle — the refresh token is a placeholder and the OpenAI token endpoint is off-route — so a shorter dummy exp would drop Codex to the sign-in screen the moment it lapsed, even while egress still holds a valid bearer.""" path = codex_auth_path(host_env) access = codex_host_access_token(host_env, now=now) raw = _read_auth_object(path) host_exp = _jwt_exp(access) exp_ts = int(host_exp.timestamp()) if host_exp is not None else None dummy = _redact_codex_auth(deepcopy(raw), now=now, exp_ts=exp_ts) return json.dumps(dummy, indent=2, sort_keys=True) + "\n" def write_codex_dummy_auth_file( path: Path, host_env: dict[str, str] | None = None, *, now: datetime | None = None, ) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(codex_dummy_auth_json(host_env, now=now)) path.chmod(0o600) def _read_auth_object(path: Path) -> dict[str, object]: try: raw = json.loads(path.read_text()) except (OSError, json.JSONDecodeError) as e: die(f"codex host credentials: could not read valid JSON at {path}: {e}") if not isinstance(raw, dict): die(f"codex host credentials: {path} must contain a JSON object") return cast(dict[str, object], raw) def _dummy_exp(now: datetime | None, exp_ts: int | None) -> int: if exp_ts is not None: return exp_ts check_now = now or datetime.now(timezone.utc) return int(check_now.timestamp()) + 3600 def _dummy_timestamp(now: datetime | None = None) -> str: check_now = now or datetime.now(timezone.utc) if check_now.tzinfo is None: check_now = check_now.replace(tzinfo=timezone.utc) check_now = check_now.astimezone(timezone.utc) return check_now.isoformat(timespec="milliseconds").replace("+00:00", "Z") def _dummy_jwt(now: datetime | None = None, *, exp_ts: int | None = None) -> str: return _encode_dummy_jwt({ "exp": _dummy_exp(now, exp_ts), "sub": "bot-bottle-placeholder", }) def _dummy_jwt_from_host( value: object, *, now: datetime | None = None, exp_ts: int | None = None, ) -> str: if not isinstance(value, str): return _dummy_jwt(now, exp_ts=exp_ts) parts = value.split(".") if len(parts) < 2: return _dummy_jwt(now, exp_ts=exp_ts) try: payload = json.loads(_b64url_decode(parts[1])) except (ValueError, json.JSONDecodeError): return _dummy_jwt(now, exp_ts=exp_ts) if not isinstance(payload, dict): return _dummy_jwt(now, exp_ts=exp_ts) return _encode_dummy_jwt( _redact_jwt_payload(cast(dict[str, object], payload), now=now, exp_ts=exp_ts) ) def _encode_dummy_jwt(payload: dict[str, object]) -> str: def enc(obj: dict[str, object]) -> str: raw = json.dumps(obj, separators=(",", ":")).encode() return base64.urlsafe_b64encode(raw).decode().rstrip("=") return f"{enc({'alg': 'none', 'typ': 'JWT'})}.{enc(payload)}.placeholder" def _redact_jwt_payload( payload: dict[str, object], *, now: datetime | None = None, exp_ts: int | None = None, ) -> dict[str, object]: out = _redact_claims(payload) if not isinstance(out, dict): out = {} out_typed: dict[str, object] = cast(dict[str, object], out) out_typed["exp"] = _dummy_exp(now, exp_ts) out_typed.setdefault("sub", "bot-bottle-placeholder") return out_typed def _redact_claims(value: object) -> object: if isinstance(value, dict): out: dict[str, object] = {} for key, inner in cast(dict[str, object], value).items(): lower = key.lower() if key == "https://api.openai.com/profile": out[key] = _redact_profile_claim(inner) elif key == "https://api.openai.com/auth": out[key] = _redact_auth_claim(inner) elif lower == "email": out[key] = "bot-bottle@example.invalid" elif lower == "email_verified": out[key] = True elif lower in {"exp", "iat", "nbf", "auth_time", "pwd_auth_time"}: out[key] = inner if isinstance(inner, (int, float)) else 0 elif lower in {"aud", "scp", "amr"}: out[key] = inner if isinstance(inner, list) else [] elif isinstance(inner, bool): out[key] = inner elif isinstance(inner, dict): out[key] = {} elif isinstance(inner, list): out[key] = [] else: out[key] = "bot-bottle-placeholder" return out if isinstance(value, list): return [] return "bot-bottle-placeholder" def _redact_profile_claim(value: object) -> dict[str, object]: profile = cast(dict[str, object], value) if isinstance(value, dict) else {} return { "email": "bot-bottle@example.invalid", "email_verified": bool(profile.get("email_verified", True)), } def _redact_auth_claim(value: object) -> dict[str, object]: auth = cast(dict[str, object], value) if isinstance(value, dict) else {} out: dict[str, object] = {} for key, inner in auth.items(): lower = key.lower() if lower == "chatgpt_plan_type" and isinstance(inner, str) and inner: out[key] = inner elif lower == "chatgpt_account_id" and isinstance(inner, str) and inner: # Current Codex uses the selected account id when building # ChatGPT requests. Keep that non-secret identifier aligned # with the host while egress owns the real bearer token. out[key] = inner elif lower == "localhost" and isinstance(inner, bool): out[key] = inner elif isinstance(inner, bool): out[key] = inner elif isinstance(inner, list): out[key] = [] elif isinstance(inner, dict): out[key] = {} else: out[key] = "bot-bottle-placeholder" out.setdefault("chatgpt_plan_type", "unknown") out.setdefault("user_id", "bot-bottle-placeholder") out.setdefault("chatgpt_user_id", "bot-bottle-placeholder") out.setdefault("chatgpt_account_id", "bot-bottle-placeholder") return out def _redact_codex_auth( value: object, *, now: datetime | None = None, exp_ts: int | None = None, ) -> object: auth = cast(dict[str, object], value) if isinstance(value, dict) else {} out: dict[str, object] = {} for key, inner in auth.items(): lower = key.lower() if lower == "auth_mode" and isinstance(inner, str) and inner: out[key] = inner elif lower == "openai_api_key": out[key] = None elif lower == "last_refresh": # Codex parses this as a timestamp on startup. Keep the # schema valid without copying host-side session metadata. out[key] = _dummy_timestamp(now) elif lower == "tokens": out[key] = _redact_token_block(inner, now=now, exp_ts=exp_ts) else: out[key] = _redact_unknown_auth_value(inner) return out def _redact_token_block( value: object, *, now: datetime | None = None, exp_ts: int | None = None, ) -> dict[str, object]: tokens = cast(dict[str, object], value) if isinstance(value, dict) else {} out: dict[str, object] = {} for key, inner in tokens.items(): lower = key.lower() if lower in {"access_token", "id_token"}: out[key] = _dummy_jwt_from_host(inner, now=now, exp_ts=exp_ts) elif lower == "account_id" and isinstance(inner, str) and inner: # Current Codex uses this non-secret selected account id # while egress owns the real bearer token. out[key] = inner else: out[key] = _redact_unknown_auth_value(inner) return out def _redact_unknown_auth_value(value: object) -> object: if isinstance(value, bool): return value if isinstance(value, dict): return {} if isinstance(value, list): return [] if value is None: return None return "bot-bottle-placeholder" def _jwt_exp(token: str) -> datetime | None: parts = token.split(".") if len(parts) < 2: return None try: payload = json.loads(_b64url_decode(parts[1])) except (ValueError, json.JSONDecodeError): return None if not isinstance(payload, dict): return None exp = cast(dict[str, object], payload).get("exp") if not isinstance(exp, (int, float)): return None return datetime.fromtimestamp(exp, timezone.utc) def _b64url_decode(value: str) -> str: padded = value + ("=" * (-len(value) % 4)) return base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8") __all__ = [ "codex_auth_path", "codex_dummy_auth_json", "codex_host_access_token", "write_codex_dummy_auth_file", ]