Files
bot-bottle/bot_bottle/codex_auth.py
T

314 lines
10 KiB
Python

"""Host Codex auth helpers.
Reads the host's Codex ChatGPT/device-login auth state and returns only
the short-lived access token needed by egress. This module deliberately
does not expose refresh tokens or raw auth payloads.
"""
from __future__ import annotations
import base64
import json
import os
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from .log import die
from .util import expand_tilde
def codex_auth_path(host_env: dict[str, str] | None = None) -> Path:
    """Locate the host's Codex ``auth.json`` file.

    ``host_env`` replaces the process environment when supplied;
    otherwise ``os.environ`` is consulted. A non-empty ``CODEX_HOME``
    value is passed through ``expand_tilde`` and used as the directory
    holding ``auth.json``; without it the file is looked up under
    ``~/.codex``.
    """
    environment = host_env if host_env is not None else os.environ
    configured_home = environment.get("CODEX_HOME")
    if configured_home:
        codex_dir = Path(expand_tilde(configured_home))
    else:
        codex_dir = Path.home() / ".codex"
    return codex_dir / "auth.json"
def codex_host_access_token(
    host_env: dict[str, str] | None = None,
    *,
    now: datetime | None = None,
) -> str:
    """Return the host's Codex access token after validating it.

    The auth file located by ``codex_auth_path(host_env)`` must exist,
    declare a string ``auth_mode`` other than ``"api_key"``, and carry a
    non-empty ``tokens.access_token`` whose JWT ``exp`` lies strictly
    after ``now`` (the current UTC time when ``now`` is omitted). Every
    failed check is reported through ``die``.
    """
    path = codex_auth_path(host_env)
    if not path.is_file():
        die(
            f"codex host credentials: auth file missing at {path}. "
            "Run `codex login --device-auth` on the host or disable "
            "agent_provider.forward_host_credentials."
        )

    auth_state = _read_auth_object(path)

    # Only user/device logins are forwarded; API-key auth is rejected.
    mode = auth_state.get("auth_mode")
    if mode == "api_key" or not isinstance(mode, str):
        die(
            "codex host credentials: host Codex auth is not user/device "
            "auth. Run `codex login --device-auth` on the host."
        )

    token_block = auth_state.get("tokens")
    if not isinstance(token_block, dict):
        die(f"codex host credentials: {path} is missing tokens")

    access_token = token_block.get("access_token")
    if not (isinstance(access_token, str) and access_token):
        die(
            f"codex host credentials: {path} is missing tokens.access_token. "
            "Run `codex login --device-auth` on the host."
        )

    expires_at = _jwt_exp(access_token)
    if expires_at is None:
        die("codex host credentials: tokens.access_token is not a JWT with exp")

    reference_time = now or datetime.now(timezone.utc)
    # A token expiring exactly at the reference instant counts as expired.
    if reference_time >= expires_at:
        die(
            "codex host credentials: host Codex access token is expired. "
            "Run `codex login --device-auth` on the host and restart the bottle."
        )
    return access_token
def codex_dummy_auth_json(
    host_env: dict[str, str] | None = None,
    *,
    now: datetime | None = None,
) -> str:
    """Render a secret-free ``auth.json`` mirroring the host's auth mode.

    The result keeps Codex on the same auth branch as the host while
    egress owns the real bearer token. The placeholder access/id tokens
    reuse the *host* token's real ``exp``: Codex proactively refreshes
    once its local access token is at or past expiry, but it cannot
    refresh inside the bottle (the refresh token is a placeholder and
    the OpenAI token endpoint is off-route). A shorter artificial expiry
    would therefore drop Codex to the sign-in screen as soon as it
    lapsed, even though egress still holds a valid bearer.

    Returns pretty-printed JSON with sorted keys and a trailing newline.
    """
    path = codex_auth_path(host_env)
    # Validates the host credentials (reporting failures through `die`)
    # before anything is rendered.
    access = codex_host_access_token(host_env, now=now)
    host_auth = _read_auth_object(path)

    expiry = _jwt_exp(access)
    if expiry is None:
        exp_ts = None
    else:
        exp_ts = int(expiry.timestamp())

    redacted = _redact_codex_auth(deepcopy(host_auth), now=now, exp_ts=exp_ts)
    rendered = json.dumps(redacted, indent=2, sort_keys=True)
    return f"{rendered}\n"
def write_codex_dummy_auth_file(
    path: Path,
    host_env: dict[str, str] | None = None,
    *,
    now: datetime | None = None,
) -> None:
    """Write the redacted dummy ``auth.json`` to *path* with mode 0o600.

    Parent directories are created as needed. The file is created
    owner-only from the start rather than being written with the
    umask-default mode and tightened afterwards, so there is no window
    in which other users can read it.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Render before opening so a failure while reading the host
    # credentials never truncates an existing file.
    payload = codex_dummy_auth_json(host_env, now=now)
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(payload)
    # The mode passed to os.open only applies when the file is created;
    # tighten a pre-existing file that had broader permissions.
    path.chmod(0o600)
def _read_auth_object(path: Path) -> dict:
    """Load *path* as UTF-8 JSON and return its top-level object.

    Unreadable files, undecodable bytes, malformed JSON and non-object
    top-level values are all reported through ``die``.
    """
    try:
        raw = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and also the
        # UnicodeDecodeError raised for non-UTF-8/binary content, which
        # would otherwise escape as a traceback.
        die(f"codex host credentials: could not read valid JSON at {path}: {e}")
    if not isinstance(raw, dict):
        die(f"codex host credentials: {path} must contain a JSON object")
    return raw
def _dummy_exp(now: datetime | None, exp_ts: int | None) -> int:
    """Pick the ``exp`` value (epoch seconds) for a placeholder token.

    An explicit ``exp_ts`` always wins, including ``0``. Without one the
    expiry is one hour after ``now``, or after the current UTC time
    when ``now`` is not given.
    """
    if exp_ts is None:
        reference = now or datetime.now(timezone.utc)
        return int(reference.timestamp()) + 3600
    return exp_ts
def _dummy_jwt(now: datetime | None = None, *, exp_ts: int | None = None) -> str:
    """Build a minimal placeholder JWT carrying only ``exp`` and ``sub``."""
    claims = {
        "exp": _dummy_exp(now, exp_ts),
        "sub": "bot-bottle-placeholder",
    }
    return _encode_dummy_jwt(claims)
def _dummy_jwt_from_host(
    value: object, *, now: datetime | None = None, exp_ts: int | None = None,
) -> str:
    """Turn a host JWT into a placeholder JWT with redacted claims.

    When ``value`` is a string whose second dot-separated segment
    decodes to a JSON object, that object's claims are redacted and
    re-encoded. In every other case a minimal placeholder token is
    returned instead.
    """
    claims: object = None
    if isinstance(value, str):
        segments = value.split(".")
        if len(segments) >= 2:
            try:
                claims = json.loads(_b64url_decode(segments[1]))
            except ValueError:
                # json.JSONDecodeError is a ValueError subclass, so this
                # covers both bad base64/text and bad JSON.
                claims = None
    if not isinstance(claims, dict):
        return _dummy_jwt(now, exp_ts=exp_ts)
    redacted = _redact_jwt_payload(claims, now=now, exp_ts=exp_ts)
    return _encode_dummy_jwt(redacted)
def _encode_dummy_jwt(payload: dict) -> str:
    """Serialise *payload* as an unsigned placeholder JWT.

    The header is fixed to ``{"alg": "none", "typ": "JWT"}``. Header and
    payload are compact JSON, base64url-encoded without padding, and the
    signature segment is the literal text ``placeholder``.
    """
    segments = []
    for part in ({"alg": "none", "typ": "JWT"}, payload):
        compact = json.dumps(part, separators=(",", ":")).encode()
        segments.append(base64.urlsafe_b64encode(compact).decode().rstrip("="))
    segments.append("placeholder")
    return ".".join(segments)
def _redact_jwt_payload(
    payload: dict,
    *,
    now: datetime | None = None,
    exp_ts: int | None = None,
) -> dict:
    """Redact a JWT payload and stamp it with the placeholder expiry.

    The claims go through ``_redact_claims``; ``exp`` is then always
    overwritten with the value chosen by ``_dummy_exp``, and ``sub`` is
    filled with a placeholder only if the payload had none.
    """
    scrubbed = _redact_claims(payload)
    claims = scrubbed if isinstance(scrubbed, dict) else {}
    claims["exp"] = _dummy_exp(now, exp_ts)
    if "sub" not in claims:
        claims["sub"] = "bot-bottle-placeholder"
    return claims
def _redact_claims(value: object) -> object:
    """Return a redacted copy of a JWT claims value.

    A list becomes ``[]`` and any other non-dict value becomes the
    placeholder string. For a dict, each claim is replaced as follows
    (names compared case-insensitively, except the two OpenAI URL
    claims, which must match exactly):

    * the OpenAI profile/auth claims go to their dedicated redactors;
    * ``email`` becomes a fixed invalid address, ``email_verified`` True;
    * timestamp claims keep numeric values, otherwise ``0``;
    * ``aud``/``scp``/``amr`` keep list values, otherwise ``[]``;
    * booleans pass through, nested dicts/lists are emptied, and
      everything else becomes the placeholder string.
    """
    if isinstance(value, list):
        return []
    if not isinstance(value, dict):
        return "bot-bottle-placeholder"

    def scrub(name: str, claim: object) -> object:
        folded = name.lower()
        if name == "https://api.openai.com/profile":
            return _redact_profile_claim(claim)
        if name == "https://api.openai.com/auth":
            return _redact_auth_claim(claim)
        if folded == "email":
            return "bot-bottle@example.invalid"
        if folded == "email_verified":
            return True
        if folded in {"exp", "iat", "nbf", "auth_time", "pwd_auth_time"}:
            return claim if isinstance(claim, (int, float)) else 0
        if folded in {"aud", "scp", "amr"}:
            return claim if isinstance(claim, list) else []
        if isinstance(claim, bool):
            return claim
        if isinstance(claim, dict):
            return {}
        if isinstance(claim, list):
            return []
        return "bot-bottle-placeholder"

    # The comprehension keeps the original claim order.
    return {name: scrub(name, claim) for name, claim in value.items()}
def _redact_profile_claim(value: object) -> dict:
    """Redact the OpenAI profile claim down to two fields.

    The email is always replaced with a fixed invalid address.
    ``email_verified`` is the truthiness of the host's value, and True
    when the claim is not a dict or lacks the field.
    """
    verified = True
    if isinstance(value, dict):
        verified = bool(value.get("email_verified", True))
    return {
        "email": "bot-bottle@example.invalid",
        "email_verified": verified,
    }
def _redact_auth_claim(value: object) -> dict:
    """Redact the OpenAI auth claim, keeping only non-secret fields.

    Booleans pass through unchanged. Non-empty string values of
    ``chatgpt_plan_type`` and ``chatgpt_account_id`` (names compared
    case-insensitively) are kept: current Codex uses the selected
    account id when building ChatGPT requests, so that non-secret
    identifier stays aligned with the host while egress owns the real
    bearer token. Lists and dicts are emptied and every other value
    becomes the placeholder string. Missing plan/user/account fields
    are then filled with defaults.
    """
    kept_strings = ("chatgpt_plan_type", "chatgpt_account_id")
    source = value if isinstance(value, dict) else {}
    redacted: dict[str, object] = {}
    for name, claim in source.items():
        folded = name.lower()
        if isinstance(claim, bool):
            redacted[name] = claim
        elif folded in kept_strings and isinstance(claim, str) and claim:
            redacted[name] = claim
        elif isinstance(claim, list):
            redacted[name] = []
        elif isinstance(claim, dict):
            redacted[name] = {}
        else:
            redacted[name] = "bot-bottle-placeholder"

    defaults = (
        ("chatgpt_plan_type", "unknown"),
        ("user_id", "bot-bottle-placeholder"),
        ("chatgpt_user_id", "bot-bottle-placeholder"),
        ("chatgpt_account_id", "bot-bottle-placeholder"),
    )
    for name, fallback in defaults:
        redacted.setdefault(name, fallback)
    return redacted
def _redact_codex_auth(
    value: object, *, now: datetime | None = None, exp_ts: int | None = None,
) -> object:
    """Build the redacted top-level ``auth.json`` object.

    Field names are compared case-insensitively. ``tokens`` goes to
    ``_redact_token_block``, ``openai_api_key`` is nulled, and a
    non-empty string ``auth_mode`` is kept. Every other field,
    including an ``auth_mode`` that is not a non-empty string, is
    handled by ``_redact_unknown_auth_value``. A non-dict input yields
    an empty dict.
    """
    if not isinstance(value, dict):
        return {}
    redacted: dict[str, object] = {}
    for field, original in value.items():
        name = field.lower()
        if name == "tokens":
            redacted[field] = _redact_token_block(original, now=now, exp_ts=exp_ts)
        elif name == "openai_api_key":
            redacted[field] = None
        elif name == "auth_mode" and isinstance(original, str) and original:
            redacted[field] = original
        else:
            redacted[field] = _redact_unknown_auth_value(original)
    return redacted
def _redact_token_block(
    value: object, *, now: datetime | None = None, exp_ts: int | None = None,
) -> dict[str, object]:
    """Redact the ``tokens`` section of the auth file.

    ``access_token`` and ``id_token`` are replaced with placeholder JWTs
    derived from the host values. A non-empty string ``account_id`` is
    kept, because current Codex uses this non-secret selected account id
    while egress owns the real bearer token. Every other entry goes
    through ``_redact_unknown_auth_value``. Names are compared
    case-insensitively; a non-dict input yields an empty dict.
    """
    if not isinstance(value, dict):
        return {}
    jwt_fields = {"access_token", "id_token"}
    redacted: dict[str, object] = {}
    for field, original in value.items():
        name = field.lower()
        if name in jwt_fields:
            replacement = _dummy_jwt_from_host(original, now=now, exp_ts=exp_ts)
        elif name == "account_id" and isinstance(original, str) and original:
            replacement = original
        else:
            replacement = _redact_unknown_auth_value(original)
        redacted[field] = replacement
    return redacted
def _redact_unknown_auth_value(value: object) -> object:
    """Redact an auth value whose meaning is not specifically known.

    ``None`` and booleans pass through, dicts and lists are replaced by
    empty containers of the same kind, and any other value becomes the
    placeholder string.
    """
    if value is None or isinstance(value, bool):
        return value
    for container in (dict, list):
        if isinstance(value, container):
            return container()
    return "bot-bottle-placeholder"
def _jwt_exp(token: str) -> datetime | None:
    """Return the ``exp`` claim of *token* as a UTC-aware datetime.

    Only the payload segment is decoded; the signature is not checked.
    Returns ``None`` when the token has fewer than two dot-separated
    segments, the payload is not a base64url-encoded JSON object, or
    ``exp`` is missing, non-numeric, a boolean, or a number that cannot
    be represented as a timestamp (NaN, Infinity, out of range).
    """
    parts = token.split(".")
    if len(parts) < 2:
        return None
    try:
        payload = json.loads(_b64url_decode(parts[1]))
    except (ValueError, json.JSONDecodeError):
        return None
    if not isinstance(payload, dict):
        return None
    exp = payload.get("exp")
    # bool is an int subclass, but `"exp": true` is not a timestamp.
    if isinstance(exp, bool) or not isinstance(exp, (int, float)):
        return None
    try:
        return datetime.fromtimestamp(exp, timezone.utc)
    except (OverflowError, OSError, ValueError):
        # json.loads accepts NaN/Infinity and arbitrarily large numbers;
        # treat an unrepresentable expiry like a missing one instead of
        # crashing.
        return None


def _b64url_decode(value: str) -> str:
    """Decode unpadded base64url text into a UTF-8 string.

    Padding stripped by JWT encoding is restored before decoding.
    Raises ``ValueError`` (or a subclass) for non-ASCII input, invalid
    base64, or bytes that are not valid UTF-8.
    """
    padded = value + ("=" * (-len(value) % 4))
    return base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")
# Public API of this module. Every other function defined here is
# underscore-prefixed and intended for internal use only.
__all__ = [
"codex_auth_path",
"codex_dummy_auth_json",
"codex_host_access_token",
"write_codex_dummy_auth_file",
]