Files
bot-bottle/tests/unit/test_codex_auth.py
T
didericis-codex 8e5262b539
test / unit (pull_request) Successful in 37s
test / integration (pull_request) Successful in 45s
fix(codex): make host-credential bottles actually authenticate
Debugging a live codex smolmachines bottle surfaced three independent
failures past the sign-in screen; fix each so forward_host_credentials
works end to end:

- codex_auth: dummy access/id tokens now inherit the *real* host token's
  exp instead of now+1h. Codex (0.135) refreshes when its local token's
  JWT exp lapses; with a placeholder refresh_token that refresh fails and
  drops to the sign-in screen. Aligning exp tracks the real token's life.

- prepare: set CODEX_CA_CERTIFICATE to the agent CA bundle for codex
  bottles. Codex uses rustls and ignores both the system store and
  NODE_EXTRA_CA_CERTS; it reads CODEX_CA_CERTIFICATE (fallback
  SSL_CERT_FILE) for custom roots across HTTPS + wss, so it must be
  pointed at the egress MITM CA or injection can't work without
  tls_passthrough.

- pipelock: auto tls_passthrough the Codex API hosts when
  forward_host_credentials is on. Egress injects the bearer before
  pipelock, whose header DLP then flags the JWT ("request header contains
  secret") and the retry storm trips its 429. passthrough host-gates the
  CONNECT but skips decrypt+rescan of egress-owned auth. The auto-added
  routes aren't in bottle.egress.routes, so the hosts are added explicitly.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-01 16:38:34 -04:00

208 lines
7.4 KiB
Python

"""Unit: host Codex auth extraction."""
from __future__ import annotations
import base64
import json
import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path
from bot_bottle.codex_auth import (
codex_auth_path,
codex_dummy_auth_json,
codex_host_access_token,
)
from bot_bottle.log import Die
def _jwt(exp: int) -> str:
def enc(obj: dict) -> str:
raw = json.dumps(obj, separators=(",", ":")).encode()
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
return f"{enc({'alg': 'none'})}.{enc({'exp': exp})}.sig"
def _jwt_payload(token: str) -> dict:
payload = token.split(".")[1]
payload += "=" * (-len(payload) % 4)
return json.loads(base64.urlsafe_b64decode(payload.encode()).decode())
class TestCodexHostAccessToken(unittest.TestCase):
    """Unit tests for host Codex auth lookup and dummy-auth generation.

    Every test passes an explicit environment mapping whose ``CODEX_HOME``
    points at a per-test temporary directory, and writes the ``auth.json``
    fixture it needs into that directory.
    """

    # Fixed clock handed to the code under test so expiry checks do not
    # depend on the wall clock.
    _NOW = datetime(2026, 1, 1, tzinfo=timezone.utc)
    # ``exp`` claim (2033-05-18 UTC) that is still in the future at ``_NOW``.
    _FRESH_EXP = 2000000000

    def setUp(self):
        """Create the temporary ``CODEX_HOME`` and register its cleanup."""
        self.tmp = tempfile.TemporaryDirectory(prefix="cb-codex-auth.")
        # addCleanup runs even if a later setUp statement raises, which
        # tearDown would not.
        self.addCleanup(self.tmp.cleanup)
        self.home = Path(self.tmp.name)
        self.auth_path = self.home / "auth.json"

    def _env(self) -> dict:
        """Return the environment mapping pointing at the temp ``CODEX_HOME``."""
        return {"CODEX_HOME": str(self.home)}

    def _write(self, payload: dict) -> None:
        """Serialize ``payload`` as JSON into the fixture ``auth.json``."""
        self.auth_path.write_text(json.dumps(payload), encoding="utf-8")

    def _host_token(self):
        """Call ``codex_host_access_token`` with the temp env and fixed clock."""
        return codex_host_access_token(self._env(), now=self._NOW)

    def _dummy_auth(self) -> dict:
        """Call ``codex_dummy_auth_json`` and return its parsed JSON."""
        return json.loads(codex_dummy_auth_json(self._env(), now=self._NOW))

    @staticmethod
    def _jwt_with_claims(payload: dict) -> str:
        """Build an unsigned JWT-shaped token carrying arbitrary claims.

        Same ``{"alg": "none"}`` header and literal ``sig`` trailer as the
        module-level ``_jwt`` helper, but with a caller-supplied payload.
        """

        def enc(obj: dict) -> str:
            raw = json.dumps(obj, separators=(",", ":")).encode()
            return base64.urlsafe_b64encode(raw).decode().rstrip("=")

        return f"{enc({'alg': 'none'})}.{enc(payload)}.sig"

    def test_auth_path_uses_codex_home(self):
        self.assertEqual(self.auth_path, codex_auth_path(self._env()))

    def test_returns_fresh_chatgpt_access_token(self):
        token = _jwt(self._FRESH_EXP)
        self._write({
            "auth_mode": "chatgpt",
            "tokens": {"access_token": token, "refresh_token": "hidden"},
        })
        self.assertEqual(token, self._host_token())

    def test_missing_auth_file_dies(self):
        with self.assertRaises(Die):
            self._host_token()

    def test_non_chatgpt_auth_dies(self):
        # Use an unexpired token and the fixed clock so the only possible
        # reason for Die is the auth mode, not token expiry.
        self._write({
            "auth_mode": "api_key",
            "tokens": {"access_token": _jwt(self._FRESH_EXP)},
        })
        with self.assertRaises(Die):
            self._host_token()

    def test_user_auth_mode_is_allowed(self):
        token = _jwt(self._FRESH_EXP)
        self._write({"auth_mode": "user", "tokens": {"access_token": token}})
        self.assertEqual(token, self._host_token())

    def test_expired_token_dies(self):
        self._write({
            "auth_mode": "chatgpt",
            "tokens": {"access_token": _jwt(1000)},
        })
        with self.assertRaises(Die):
            self._host_token()

    def test_non_jwt_token_dies(self):
        self._write({
            "auth_mode": "chatgpt",
            "tokens": {"access_token": "not-a-jwt"},
        })
        with self.assertRaises(Die):
            self._host_token()

    def test_dummy_auth_preserves_mode_and_redacts_tokens(self):
        access = _jwt(self._FRESH_EXP)
        refresh = "host-refresh-token"
        self._write({
            "auth_mode": "chatgpt",
            "OPENAI_API_KEY": None,
            "tokens": {
                "access_token": access,
                "id_token": _jwt(self._FRESH_EXP),
                "refresh_token": refresh,
                "account_id": "acct-host",
            },
            "last_refresh": "2026-05-29T00:00:00.000Z",
        })

        dummy = self._dummy_auth()
        tokens = dummy["tokens"]

        self.assertEqual("chatgpt", dummy["auth_mode"])
        self.assertIsNone(dummy["OPENAI_API_KEY"])
        self.assertNotEqual(access, tokens["access_token"])
        self.assertNotEqual(refresh, tokens["refresh_token"])
        self.assertEqual("bot-bottle-placeholder", tokens["refresh_token"])
        self.assertEqual("acct-host", tokens["account_id"])
        # The host auth file must still yield a token after the dummy
        # has been generated from it.
        self.assertIsNotNone(self._host_token())

    def test_dummy_auth_tokens_inherit_host_token_exp(self):
        # Codex refreshes when its local access token is at/past exp;
        # the dummy must carry the host token's real exp so Codex does
        # not drop to the sign-in screen after an artificial TTL while
        # egress still holds a valid bearer.
        host_exp = self._FRESH_EXP
        self._write({
            "auth_mode": "chatgpt",
            "tokens": {
                "access_token": _jwt(host_exp),
                "id_token": _jwt(host_exp),
                "refresh_token": "hidden",
            },
        })

        tokens = self._dummy_auth()["tokens"]

        self.assertEqual(host_exp, _jwt_payload(tokens["access_token"])["exp"])
        self.assertEqual(host_exp, _jwt_payload(tokens["id_token"])["exp"])

    def test_dummy_auth_keeps_required_account_claim_shape(self):
        self._write({
            "auth_mode": "chatgpt",
            "tokens": {
                "access_token": self._jwt_with_claims({
                    "exp": self._FRESH_EXP,
                    "https://api.openai.com/auth": {
                        "chatgpt_plan_type": "plus",
                        "chatgpt_account_id": "acct-real",
                        "chatgpt_user_id": "user-real",
                        "user_id": "auth-user-real",
                        "localhost": True,
                    },
                    "https://api.openai.com/profile": {
                        "email": "real@example.invalid",
                        "email_verified": True,
                    },
                }),
                "id_token": self._jwt_with_claims({
                    "exp": self._FRESH_EXP,
                    "email": "real@example.invalid",
                    "email_verified": True,
                    "https://api.openai.com/auth": {
                        "chatgpt_plan_type": "plus",
                        "chatgpt_account_id": "acct-real",
                    },
                }),
                "refresh_token": "hidden",
            },
        })

        dummy = self._dummy_auth()

        access_payload = _jwt_payload(dummy["tokens"]["access_token"])
        auth = access_payload["https://api.openai.com/auth"]
        profile = access_payload["https://api.openai.com/profile"]
        # Plan and account id are carried over; user-identifying claims
        # are replaced with placeholders.
        self.assertEqual("plus", auth["chatgpt_plan_type"])
        self.assertEqual("acct-real", auth["chatgpt_account_id"])
        self.assertEqual("bot-bottle-placeholder", auth["chatgpt_user_id"])
        self.assertEqual("bot-bottle@example.invalid", profile["email"])
        self.assertTrue(profile["email_verified"])
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()