68e5097534
Debugging a live codex smolmachines bottle surfaced three independent
failures past the sign-in screen; fix each so forward_host_credentials
works end to end:
- codex_auth: dummy access/id tokens now inherit the *real* host token's
exp instead of now+1h. Codex (0.135) refreshes when its local token's
JWT exp lapses; with a placeholder refresh_token that refresh fails and
drops to the sign-in screen. Aligning exp tracks the real token's life.
- prepare: set CODEX_CA_CERTIFICATE to the agent CA bundle for codex
bottles. Codex uses rustls and ignores the system store /
NODE_EXTRA_CA_CERTS; it reads CODEX_CA_CERTIFICATE (fallback
SSL_CERT_FILE) for custom roots across HTTPS + wss, so it must be
pointed at the egress MITM CA or injection can't work without
tls_passthrough.
- pipelock: auto tls_passthrough the Codex API hosts when
forward_host_credentials is on. Egress injects the bearer before
pipelock, whose header DLP then flags the JWT ("request header contains
secret") and the retry storm trips its 429. passthrough host-gates the
CONNECT but skips decrypt+rescan of egress-owned auth. The auto-added
routes aren't in bottle.egress.routes, so the hosts are added explicitly.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
208 lines
7.4 KiB
Python
208 lines
7.4 KiB
Python
"""Unit: host Codex auth extraction."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import tempfile
|
|
import unittest
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from bot_bottle.codex_auth import (
|
|
codex_auth_path,
|
|
codex_dummy_auth_json,
|
|
codex_host_access_token,
|
|
)
|
|
from bot_bottle.log import Die
|
|
|
|
|
|
def _jwt(exp: int) -> str:
    """Build an unsigned, JWT-shaped token whose only claim is ``exp``.

    The result has the form ``<header>.<payload>.sig``. The header is
    ``{"alg": "none"}`` and the payload is ``{"exp": exp}``; each is
    serialized as compact JSON and base64url-encoded with the trailing
    ``=`` padding stripped. The third segment is the literal text ``sig``,
    not a real signature.
    """

    def enc(obj: dict) -> str:
        # Compact separators keep the encoded segment free of whitespace.
        raw = json.dumps(obj, separators=(",", ":")).encode()
        return base64.urlsafe_b64encode(raw).decode().rstrip("=")

    return f"{enc({'alg': 'none'})}.{enc({'exp': exp})}.sig"
|
|
|
|
|
|
def _jwt_payload(token: str) -> dict:
    """Decode the claims of a JWT-shaped token without checking its signature.

    The second dot-separated segment of ``token`` is base64url-decoded and
    parsed as JSON; the parsed object is returned.
    """
    segment = token.split(".")[1]
    # base64 needs a length that is a multiple of 4, so restore any "="
    # padding that was stripped when the token was built.
    segment += "=" * (-len(segment) % 4)
    return json.loads(base64.urlsafe_b64decode(segment.encode()).decode())
|
|
|
|
|
|
class TestCodexHostAccessToken(unittest.TestCase):
    """Tests for reading the host Codex ``auth.json`` and building its dummy copy.

    Every test points ``CODEX_HOME`` at a fresh temporary directory and
    writes its own ``auth.json`` there, so no real host file is involved.
    """

    # Fixed "current time" handed to the code under test so that expiry
    # checks do not depend on the wall clock.
    _NOW = datetime(2026, 1, 1, tzinfo=timezone.utc)
    # ``exp`` claim (epoch seconds) that lies after ``_NOW``.
    _FRESH_EXP = 2000000000

    def setUp(self):
        self.tmp = tempfile.TemporaryDirectory(prefix="cb-codex-auth.")
        # Registered right away so the directory is removed even when a
        # later setUp step fails (tearDown is skipped in that case).
        self.addCleanup(self.tmp.cleanup)
        self.home = Path(self.tmp.name)
        self.auth_path = self.home / "auth.json"

    def _env(self) -> dict:
        """Return a fresh environment mapping with ``CODEX_HOME`` set to the temp dir."""
        return {"CODEX_HOME": str(self.home)}

    def _write(self, payload: dict) -> None:
        """Serialize ``payload`` as JSON into the temporary ``auth.json``."""
        self.auth_path.write_text(json.dumps(payload), encoding="utf-8")

    @staticmethod
    def _jwt_with_claims(claims: dict) -> str:
        """Build an unsigned JWT-shaped token carrying arbitrary ``claims``."""

        def enc(obj: dict) -> str:
            raw = json.dumps(obj, separators=(",", ":")).encode()
            return base64.urlsafe_b64encode(raw).decode().rstrip("=")

        return f"{enc({'alg': 'none'})}.{enc(claims)}.sig"

    def test_auth_path_uses_codex_home(self):
        self.assertEqual(self.auth_path, codex_auth_path(self._env()))

    def test_returns_fresh_chatgpt_access_token(self):
        token = _jwt(self._FRESH_EXP)
        self._write({
            "auth_mode": "chatgpt",
            "tokens": {"access_token": token, "refresh_token": "hidden"},
        })
        out = codex_host_access_token(self._env(), now=self._NOW)
        self.assertEqual(token, out)

    def test_missing_auth_file_dies(self):
        # Nothing has been written to the temp CODEX_HOME at this point.
        with self.assertRaises(Die):
            codex_host_access_token(self._env())

    def test_non_chatgpt_auth_dies(self):
        self._write({"auth_mode": "api_key", "tokens": {"access_token": _jwt(2)}})
        with self.assertRaises(Die):
            codex_host_access_token(self._env())

    def test_user_auth_mode_is_allowed(self):
        token = _jwt(self._FRESH_EXP)
        self._write({"auth_mode": "user", "tokens": {"access_token": token}})
        out = codex_host_access_token(self._env(), now=self._NOW)
        self.assertEqual(token, out)

    def test_expired_token_dies(self):
        # exp=1000 is far earlier than the fixed clock passed as ``now``.
        self._write({
            "auth_mode": "chatgpt",
            "tokens": {"access_token": _jwt(1000)},
        })
        with self.assertRaises(Die):
            codex_host_access_token(self._env(), now=self._NOW)

    def test_non_jwt_token_dies(self):
        self._write({
            "auth_mode": "chatgpt",
            "tokens": {"access_token": "not-a-jwt"},
        })
        with self.assertRaises(Die):
            codex_host_access_token(self._env())

    def test_dummy_auth_preserves_mode_and_redacts_tokens(self):
        access = _jwt(self._FRESH_EXP)
        refresh = "host-refresh-token"
        self._write({
            "auth_mode": "chatgpt",
            "OPENAI_API_KEY": None,
            "tokens": {
                "access_token": access,
                "id_token": _jwt(self._FRESH_EXP),
                "refresh_token": refresh,
                "account_id": "acct-host",
            },
            "last_refresh": "2026-05-29T00:00:00.000Z",
        })
        dummy = json.loads(codex_dummy_auth_json(self._env(), now=self._NOW))
        tokens = dummy["tokens"]
        self.assertEqual("chatgpt", dummy["auth_mode"])
        self.assertIsNone(dummy["OPENAI_API_KEY"])
        self.assertNotEqual(access, tokens["access_token"])
        self.assertNotEqual(refresh, tokens["refresh_token"])
        self.assertEqual("bot-bottle-placeholder", tokens["refresh_token"])
        self.assertEqual("acct-host", tokens["account_id"])
        # The host file must still yield a token after the dummy was built.
        self.assertIsNotNone(
            codex_host_access_token(self._env(), now=self._NOW)
        )

    def test_dummy_auth_tokens_inherit_host_token_exp(self):
        # Codex refreshes when its local access token is at/past exp;
        # the dummy must carry the host token's real exp so Codex does
        # not drop to the sign-in screen after an artificial TTL while
        # egress still holds a valid bearer.
        host_exp = 2000000000
        self._write({
            "auth_mode": "chatgpt",
            "tokens": {
                "access_token": _jwt(host_exp),
                "id_token": _jwt(host_exp),
                "refresh_token": "hidden",
            },
        })
        dummy = json.loads(codex_dummy_auth_json(self._env(), now=self._NOW))
        self.assertEqual(
            host_exp, _jwt_payload(dummy["tokens"]["access_token"])["exp"],
        )
        self.assertEqual(
            host_exp, _jwt_payload(dummy["tokens"]["id_token"])["exp"],
        )

    def test_dummy_auth_keeps_required_account_claim_shape(self):
        jwt = self._jwt_with_claims
        self._write({
            "auth_mode": "chatgpt",
            "tokens": {
                "access_token": jwt({
                    "exp": 2000000000,
                    "https://api.openai.com/auth": {
                        "chatgpt_plan_type": "plus",
                        "chatgpt_account_id": "acct-real",
                        "chatgpt_user_id": "user-real",
                        "user_id": "auth-user-real",
                        "localhost": True,
                    },
                    "https://api.openai.com/profile": {
                        "email": "real@example.invalid",
                        "email_verified": True,
                    },
                }),
                "id_token": jwt({
                    "exp": 2000000000,
                    "email": "real@example.invalid",
                    "email_verified": True,
                    "https://api.openai.com/auth": {
                        "chatgpt_plan_type": "plus",
                        "chatgpt_account_id": "acct-real",
                    },
                }),
                "refresh_token": "hidden",
            },
        })
        dummy = json.loads(codex_dummy_auth_json(self._env(), now=self._NOW))
        access_payload = _jwt_payload(dummy["tokens"]["access_token"])
        auth = access_payload["https://api.openai.com/auth"]
        profile = access_payload["https://api.openai.com/profile"]
        # Plan type and account id are expected unchanged; user id and
        # email are expected to be placeholder values.
        self.assertEqual("plus", auth["chatgpt_plan_type"])
        self.assertEqual("acct-real", auth["chatgpt_account_id"])
        self.assertEqual("bot-bottle-placeholder", auth["chatgpt_user_id"])
        self.assertEqual("bot-bottle@example.invalid", profile["email"])
        self.assertTrue(profile["email_verified"])
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|