"""Host Codex auth helpers. Reads the host's Codex ChatGPT/device-login auth state and returns only the short-lived access token needed by egress. This module deliberately does not expose refresh tokens or raw auth payloads. """ from __future__ import annotations import base64 import json import os from copy import deepcopy from datetime import datetime, timezone from pathlib import Path from .log import die from .util import expand_tilde def codex_auth_path(host_env: dict[str, str] | None = None) -> Path: env = os.environ if host_env is None else host_env home = env.get("CODEX_HOME") if home: return Path(expand_tilde(home)) / "auth.json" return Path.home() / ".codex" / "auth.json" def codex_host_access_token( host_env: dict[str, str] | None = None, *, now: datetime | None = None, ) -> str: path = codex_auth_path(host_env) if not path.is_file(): die( f"codex host credentials: auth file missing at {path}. " "Run `codex login --device-auth` on the host or disable " "agent_provider.forward_host_credentials." ) raw = _read_auth_object(path) auth_mode = raw.get("auth_mode") if not isinstance(auth_mode, str) or auth_mode == "api_key": die( "codex host credentials: host Codex auth is not user/device " "auth. Run `codex login --device-auth` on the host." ) tokens = raw.get("tokens") if not isinstance(tokens, dict): die(f"codex host credentials: {path} is missing tokens") access = tokens.get("access_token") if not isinstance(access, str) or not access: die( f"codex host credentials: {path} is missing tokens.access_token. " "Run `codex login --device-auth` on the host." ) exp = _jwt_exp(access) if exp is None: die("codex host credentials: tokens.access_token is not a JWT with exp") check_now = now or datetime.now(timezone.utc) if exp <= check_now: die( "codex host credentials: host Codex access token is expired. " "Run `codex login --device-auth` on the host and restart the bottle." ) return access def codex_dummy_auth_json( host_env: dict[str, str] | None = None, *, now: datetime | None = None, ) -> str: """Return a non-secret `auth.json` that keeps Codex in the host's auth branch while egress owns the real bearer token.""" path = codex_auth_path(host_env) codex_host_access_token(host_env, now=now) raw = _read_auth_object(path) dummy = _redact_codex_auth(deepcopy(raw), now=now) return json.dumps(dummy, indent=2, sort_keys=True) + "\n" def write_codex_dummy_auth_file( path: Path, host_env: dict[str, str] | None = None, *, now: datetime | None = None, ) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(codex_dummy_auth_json(host_env, now=now)) path.chmod(0o600) def _read_auth_object(path: Path) -> dict: try: raw = json.loads(path.read_text()) except (OSError, json.JSONDecodeError) as e: die(f"codex host credentials: could not read valid JSON at {path}: {e}") if not isinstance(raw, dict): die(f"codex host credentials: {path} must contain a JSON object") return raw def _dummy_jwt(now: datetime | None = None) -> str: check_now = now or datetime.now(timezone.utc) exp = int(check_now.timestamp()) + 3600 return _encode_dummy_jwt({ "exp": exp, "sub": "bot-bottle-placeholder", }) def _dummy_jwt_from_host(value: object, *, now: datetime | None = None) -> str: if not isinstance(value, str): return _dummy_jwt(now) parts = value.split(".") if len(parts) < 2: return _dummy_jwt(now) try: payload = json.loads(_b64url_decode(parts[1])) except (ValueError, json.JSONDecodeError): return _dummy_jwt(now) if not isinstance(payload, dict): return _dummy_jwt(now) return _encode_dummy_jwt(_redact_jwt_payload(payload, now=now)) def _encode_dummy_jwt(payload: dict) -> str: def enc(obj: dict) -> str: raw = json.dumps(obj, separators=(",", ":")).encode() return base64.urlsafe_b64encode(raw).decode().rstrip("=") return f"{enc({'alg': 'none', 'typ': 'JWT'})}.{enc(payload)}.placeholder" def _redact_jwt_payload( payload: dict, *, now: datetime | None = None, ) -> dict: check_now = now or datetime.now(timezone.utc) out = _redact_claims(payload) if not isinstance(out, dict): out = {} out["exp"] = int(check_now.timestamp()) + 3600 out.setdefault("sub", "bot-bottle-placeholder") return out def _redact_claims(value: object) -> object: if isinstance(value, dict): out: dict[str, object] = {} for key, inner in value.items(): lower = key.lower() if key == "https://api.openai.com/profile": out[key] = _redact_profile_claim(inner) elif key == "https://api.openai.com/auth": out[key] = _redact_auth_claim(inner) elif lower == "email": out[key] = "bot-bottle@example.invalid" elif lower == "email_verified": out[key] = True elif lower in {"exp", "iat", "nbf", "auth_time", "pwd_auth_time"}: out[key] = inner if isinstance(inner, (int, float)) else 0 elif lower in {"aud", "scp", "amr"}: out[key] = inner if isinstance(inner, list) else [] elif isinstance(inner, bool): out[key] = inner elif isinstance(inner, (dict, list)): out[key] = _redact_claims(inner) else: out[key] = "bot-bottle-placeholder" return out if isinstance(value, list): return [] return "bot-bottle-placeholder" def _redact_profile_claim(value: object) -> dict: profile = value if isinstance(value, dict) else {} return { "email": "bot-bottle@example.invalid", "email_verified": bool(profile.get("email_verified", True)), } def _redact_auth_claim(value: object) -> dict: auth = value if isinstance(value, dict) else {} out: dict[str, object] = {} for key, inner in auth.items(): lower = key.lower() if lower == "chatgpt_plan_type" and isinstance(inner, str) and inner: out[key] = inner elif lower == "localhost" and isinstance(inner, bool): out[key] = inner elif isinstance(inner, bool): out[key] = inner elif isinstance(inner, list): out[key] = [] elif isinstance(inner, dict): out[key] = {} else: out[key] = "bot-bottle-placeholder" out.setdefault("chatgpt_plan_type", "unknown") out.setdefault("user_id", "bot-bottle-placeholder") out.setdefault("chatgpt_user_id", "bot-bottle-placeholder") out.setdefault("chatgpt_account_id", "bot-bottle-placeholder") return out def _redact_codex_auth(value: object, *, now: datetime | None = None) -> object: if isinstance(value, dict): out: dict[str, object] = {} for key, inner in value.items(): lower = key.lower() if lower == "openai_api_key": out[key] = None elif lower == "tokens": out[key] = _redact_codex_auth(inner, now=now) elif lower in {"access_token", "id_token"}: out[key] = _dummy_jwt_from_host(inner, now=now) elif "token" in lower or "secret" in lower or lower.endswith("_key"): out[key] = "bot-bottle-placeholder" elif lower in {"account_id", "user_id", "email"}: out[key] = "bot-bottle-placeholder" else: out[key] = _redact_codex_auth(inner, now=now) return out if isinstance(value, list): return [_redact_codex_auth(v, now=now) for v in value] return value def _jwt_exp(token: str) -> datetime | None: parts = token.split(".") if len(parts) < 2: return None try: payload = json.loads(_b64url_decode(parts[1])) except (ValueError, json.JSONDecodeError): return None if not isinstance(payload, dict): return None exp = payload.get("exp") if not isinstance(exp, (int, float)): return None return datetime.fromtimestamp(exp, timezone.utc) def _b64url_decode(value: str) -> str: padded = value + ("=" * (-len(value) % 4)) return base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8") __all__ = [ "codex_auth_path", "codex_dummy_auth_json", "codex_host_access_token", "write_codex_dummy_auth_file", ]