fix(codex): include account claims in dummy auth
This commit is contained in:
@@ -108,15 +108,108 @@ def _dummy_jwt(now: datetime | None = None) -> str:
|
|||||||
check_now = now or datetime.now(timezone.utc)
|
check_now = now or datetime.now(timezone.utc)
|
||||||
exp = int(check_now.timestamp()) + 3600
|
exp = int(check_now.timestamp()) + 3600
|
||||||
|
|
||||||
|
return _encode_dummy_jwt({
|
||||||
|
"exp": exp,
|
||||||
|
"sub": "bot-bottle-placeholder",
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
def _dummy_jwt_from_host(value: object, *, now: datetime | None = None) -> str:
|
||||||
|
if not isinstance(value, str):
|
||||||
|
return _dummy_jwt(now)
|
||||||
|
parts = value.split(".")
|
||||||
|
if len(parts) < 2:
|
||||||
|
return _dummy_jwt(now)
|
||||||
|
try:
|
||||||
|
payload = json.loads(_b64url_decode(parts[1]))
|
||||||
|
except (ValueError, json.JSONDecodeError):
|
||||||
|
return _dummy_jwt(now)
|
||||||
|
if not isinstance(payload, dict):
|
||||||
|
return _dummy_jwt(now)
|
||||||
|
return _encode_dummy_jwt(_redact_jwt_payload(payload, now=now))
|
||||||
|
|
||||||
|
|
||||||
|
def _encode_dummy_jwt(payload: dict) -> str:
|
||||||
def enc(obj: dict) -> str:
|
def enc(obj: dict) -> str:
|
||||||
raw = json.dumps(obj, separators=(",", ":")).encode()
|
raw = json.dumps(obj, separators=(",", ":")).encode()
|
||||||
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
|
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
|
||||||
|
|
||||||
return (
|
return f"{enc({'alg': 'none', 'typ': 'JWT'})}.{enc(payload)}.placeholder"
|
||||||
f"{enc({'alg': 'none', 'typ': 'JWT'})}."
|
|
||||||
f"{enc({'exp': exp, 'sub': 'bot-bottle-placeholder'})}."
|
|
||||||
"placeholder"
|
def _redact_jwt_payload(
|
||||||
)
|
payload: dict,
|
||||||
|
*,
|
||||||
|
now: datetime | None = None,
|
||||||
|
) -> dict:
|
||||||
|
check_now = now or datetime.now(timezone.utc)
|
||||||
|
out = _redact_claims(payload)
|
||||||
|
if not isinstance(out, dict):
|
||||||
|
out = {}
|
||||||
|
out["exp"] = int(check_now.timestamp()) + 3600
|
||||||
|
out.setdefault("sub", "bot-bottle-placeholder")
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _redact_claims(value: object) -> object:
|
||||||
|
if isinstance(value, dict):
|
||||||
|
out: dict[str, object] = {}
|
||||||
|
for key, inner in value.items():
|
||||||
|
lower = key.lower()
|
||||||
|
if key == "https://api.openai.com/profile":
|
||||||
|
out[key] = _redact_profile_claim(inner)
|
||||||
|
elif key == "https://api.openai.com/auth":
|
||||||
|
out[key] = _redact_auth_claim(inner)
|
||||||
|
elif lower == "email":
|
||||||
|
out[key] = "bot-bottle@example.invalid"
|
||||||
|
elif lower == "email_verified":
|
||||||
|
out[key] = True
|
||||||
|
elif lower in {"exp", "iat", "nbf", "auth_time", "pwd_auth_time"}:
|
||||||
|
out[key] = inner if isinstance(inner, (int, float)) else 0
|
||||||
|
elif lower in {"aud", "scp", "amr"}:
|
||||||
|
out[key] = inner if isinstance(inner, list) else []
|
||||||
|
elif isinstance(inner, bool):
|
||||||
|
out[key] = inner
|
||||||
|
elif isinstance(inner, (dict, list)):
|
||||||
|
out[key] = _redact_claims(inner)
|
||||||
|
else:
|
||||||
|
out[key] = "bot-bottle-placeholder"
|
||||||
|
return out
|
||||||
|
if isinstance(value, list):
|
||||||
|
return []
|
||||||
|
return "bot-bottle-placeholder"
|
||||||
|
|
||||||
|
|
||||||
|
def _redact_profile_claim(value: object) -> dict:
|
||||||
|
profile = value if isinstance(value, dict) else {}
|
||||||
|
return {
|
||||||
|
"email": "bot-bottle@example.invalid",
|
||||||
|
"email_verified": bool(profile.get("email_verified", True)),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _redact_auth_claim(value: object) -> dict:
|
||||||
|
auth = value if isinstance(value, dict) else {}
|
||||||
|
out: dict[str, object] = {}
|
||||||
|
for key, inner in auth.items():
|
||||||
|
lower = key.lower()
|
||||||
|
if lower == "chatgpt_plan_type" and isinstance(inner, str) and inner:
|
||||||
|
out[key] = inner
|
||||||
|
elif lower == "localhost" and isinstance(inner, bool):
|
||||||
|
out[key] = inner
|
||||||
|
elif isinstance(inner, bool):
|
||||||
|
out[key] = inner
|
||||||
|
elif isinstance(inner, list):
|
||||||
|
out[key] = []
|
||||||
|
elif isinstance(inner, dict):
|
||||||
|
out[key] = {}
|
||||||
|
else:
|
||||||
|
out[key] = "bot-bottle-placeholder"
|
||||||
|
out.setdefault("chatgpt_plan_type", "unknown")
|
||||||
|
out.setdefault("user_id", "bot-bottle-placeholder")
|
||||||
|
out.setdefault("chatgpt_user_id", "bot-bottle-placeholder")
|
||||||
|
out.setdefault("chatgpt_account_id", "bot-bottle-placeholder")
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
def _redact_codex_auth(value: object, *, now: datetime | None = None) -> object:
|
def _redact_codex_auth(value: object, *, now: datetime | None = None) -> object:
|
||||||
@@ -129,7 +222,7 @@ def _redact_codex_auth(value: object, *, now: datetime | None = None) -> object:
|
|||||||
elif lower == "tokens":
|
elif lower == "tokens":
|
||||||
out[key] = _redact_codex_auth(inner, now=now)
|
out[key] = _redact_codex_auth(inner, now=now)
|
||||||
elif lower in {"access_token", "id_token"}:
|
elif lower in {"access_token", "id_token"}:
|
||||||
out[key] = _dummy_jwt(now)
|
out[key] = _dummy_jwt_from_host(inner, now=now)
|
||||||
elif "token" in lower or "secret" in lower or lower.endswith("_key"):
|
elif "token" in lower or "secret" in lower or lower.endswith("_key"):
|
||||||
out[key] = "bot-bottle-placeholder"
|
out[key] = "bot-bottle-placeholder"
|
||||||
elif lower in {"account_id", "user_id", "email"}:
|
elif lower in {"account_id", "user_id", "email"}:
|
||||||
|
|||||||
@@ -24,6 +24,12 @@ def _jwt(exp: int) -> str:
|
|||||||
return f"{enc({'alg': 'none'})}.{enc({'exp': exp})}.sig"
|
return f"{enc({'alg': 'none'})}.{enc({'exp': exp})}.sig"
|
||||||
|
|
||||||
|
|
||||||
|
def _jwt_payload(token: str) -> dict:
|
||||||
|
payload = token.split(".")[1]
|
||||||
|
payload += "=" * (-len(payload) % 4)
|
||||||
|
return json.loads(base64.urlsafe_b64decode(payload.encode()).decode())
|
||||||
|
|
||||||
|
|
||||||
class TestCodexHostAccessToken(unittest.TestCase):
|
class TestCodexHostAccessToken(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.tmp = tempfile.TemporaryDirectory(prefix="cb-codex-auth.")
|
self.tmp = tempfile.TemporaryDirectory(prefix="cb-codex-auth.")
|
||||||
@@ -122,6 +128,55 @@ class TestCodexHostAccessToken(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_dummy_auth_keeps_required_account_claim_shape(self):
|
||||||
|
def jwt(payload: dict) -> str:
|
||||||
|
def enc(obj: dict) -> str:
|
||||||
|
raw = json.dumps(obj, separators=(",", ":")).encode()
|
||||||
|
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
|
||||||
|
return f"{enc({'alg': 'none'})}.{enc(payload)}.sig"
|
||||||
|
|
||||||
|
self._write({
|
||||||
|
"auth_mode": "chatgpt",
|
||||||
|
"tokens": {
|
||||||
|
"access_token": jwt({
|
||||||
|
"exp": 2000000000,
|
||||||
|
"https://api.openai.com/auth": {
|
||||||
|
"chatgpt_plan_type": "plus",
|
||||||
|
"chatgpt_account_id": "acct-real",
|
||||||
|
"chatgpt_user_id": "user-real",
|
||||||
|
"user_id": "auth-user-real",
|
||||||
|
"localhost": True,
|
||||||
|
},
|
||||||
|
"https://api.openai.com/profile": {
|
||||||
|
"email": "real@example.invalid",
|
||||||
|
"email_verified": True,
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
"id_token": jwt({
|
||||||
|
"exp": 2000000000,
|
||||||
|
"email": "real@example.invalid",
|
||||||
|
"email_verified": True,
|
||||||
|
"https://api.openai.com/auth": {
|
||||||
|
"chatgpt_plan_type": "plus",
|
||||||
|
"chatgpt_account_id": "acct-real",
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
"refresh_token": "hidden",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
dummy = json.loads(codex_dummy_auth_json(
|
||||||
|
{"CODEX_HOME": str(self.home)},
|
||||||
|
now=datetime(2026, 1, 1, tzinfo=timezone.utc),
|
||||||
|
))
|
||||||
|
access_payload = _jwt_payload(dummy["tokens"]["access_token"])
|
||||||
|
auth = access_payload["https://api.openai.com/auth"]
|
||||||
|
profile = access_payload["https://api.openai.com/profile"]
|
||||||
|
self.assertEqual("plus", auth["chatgpt_plan_type"])
|
||||||
|
self.assertEqual("bot-bottle-placeholder", auth["chatgpt_account_id"])
|
||||||
|
self.assertEqual("bot-bottle-placeholder", auth["chatgpt_user_id"])
|
||||||
|
self.assertEqual("bot-bottle@example.invalid", profile["email"])
|
||||||
|
self.assertTrue(profile["email_verified"])
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
Reference in New Issue
Block a user