feat(claude): add forward_host_credentials support
lint / lint (push) Successful in 2m19s
test / unit (pull_request) Successful in 1m2s
test / integration (pull_request) Successful in 22s
test / coverage (pull_request) Successful in 1m14s

Reads the host's Claude OAuth session key from ~/.claude.json at launch
and forwards it only to the egress sidecar (never to the agent), placing
a placeholder CLAUDE_CODE_OAUTH_TOKEN in the agent env so Claude Code
starts without seeing the real credential.

Mirrors the existing Codex forward_host_credentials flow (PRD 0029).
Adds claude_auth.py to extract and validate the sessionKey, a
CLAUDE_HOST_CREDENTIAL_TOKEN_REF constant in egress.py, and updates
manifest_agent.py to allow the flag for both 'codex' and 'claude'
templates. Also adds a mutual-exclusion check that rejects setting
both auth_token and forward_host_credentials together.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-07-01 21:14:37 +00:00
parent 71699b3ecd
commit f0d27863c2
10 changed files with 423 additions and 13 deletions
+64 -1
View File
@@ -9,11 +9,15 @@ import unittest
from pathlib import Path
from bot_bottle.agent_provider import (
CLAUDE_HOST_CREDENTIAL_HOSTS,
CODEX_HOST_CREDENTIAL_HOSTS,
build_agent_provision_plan,
prompt_args,
)
from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF
from bot_bottle.egress import (
CLAUDE_HOST_CREDENTIAL_TOKEN_REF,
CODEX_HOST_CREDENTIAL_TOKEN_REF,
)
def _jwt(exp: int) -> str:
@@ -289,6 +293,65 @@ class TestAgentProviderRuntime(unittest.TestCase):
)
self.assertEqual({}, plan.provisioned_env)
def test_claude_forward_host_credentials_populates_egress_route(self):
session_key = "sk-ant-oat01-test-key"
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
home = Path(tmp) / "host-claude"
home.mkdir()
(home / ".claude.json").write_text(json.dumps({
"oauthAccount": {"sessionKey": session_key},
}))
plan = build_agent_provision_plan(
template="claude",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
forward_host_credentials=True,
host_env={"HOME": str(home)},
)
self.assertEqual(1, len(plan.egress_routes))
route = plan.egress_routes[0]
self.assertIn(route.host, CLAUDE_HOST_CREDENTIAL_HOSTS)
self.assertEqual("Bearer", route.auth_scheme)
self.assertEqual(CLAUDE_HOST_CREDENTIAL_TOKEN_REF, route.token_ref)
self.assertEqual("egress-placeholder", plan.env_vars["CLAUDE_CODE_OAUTH_TOKEN"])
self.assertEqual(frozenset({"CLAUDE_CODE_OAUTH_TOKEN"}), plan.hidden_env_names)
def test_claude_forward_host_credentials_populates_provisioned_env(self):
session_key = "sk-ant-oat01-test-key"
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
home = Path(tmp) / "host-claude"
home.mkdir()
(home / ".claude.json").write_text(json.dumps({
"oauthAccount": {"sessionKey": session_key},
}))
plan = build_agent_provision_plan(
template="claude",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
forward_host_credentials=True,
host_env={"HOME": str(home)},
)
self.assertEqual(
{CLAUDE_HOST_CREDENTIAL_TOKEN_REF: session_key},
plan.provisioned_env,
)
def test_claude_without_forward_host_credentials_has_empty_provisioned_env(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="claude",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
forward_host_credentials=False,
)
self.assertEqual({}, plan.provisioned_env)
def test_pi_plan_writes_default_ollama_models(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
+106
View File
@@ -0,0 +1,106 @@
"""Unit: host Claude auth extraction."""
from __future__ import annotations
import json
import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path
from bot_bottle.contrib.claude.claude_auth import (
claude_auth_path,
claude_host_access_token,
)
from bot_bottle.log import Die
class TestClaudeHostAccessToken(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.TemporaryDirectory(prefix="bb-claude-auth.")
self.home = Path(self.tmp.name)
self.auth_path = self.home / ".claude.json"
def tearDown(self):
self.tmp.cleanup()
def _write(self, payload: dict) -> None: # type: ignore
self.auth_path.write_text(json.dumps(payload))
def test_auth_path_uses_home_env(self):
self.assertEqual(
self.auth_path,
claude_auth_path({"HOME": str(self.home)}),
)
def test_returns_session_key(self):
key = "sk-ant-oat01-real-key"
self._write({"oauthAccount": {"sessionKey": key}})
out = claude_host_access_token({"HOME": str(self.home)})
self.assertEqual(key, out)
def test_missing_auth_file_dies(self):
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(self.home)})
def test_missing_oauth_account_dies(self):
self._write({"hasCompletedOnboarding": True})
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(self.home)})
def test_missing_session_key_dies(self):
self._write({"oauthAccount": {"expiresAt": 2000000000}})
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(self.home)})
def test_empty_session_key_dies(self):
self._write({"oauthAccount": {"sessionKey": ""}})
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(self.home)})
def test_expired_token_dies(self):
self._write({
"oauthAccount": {
"sessionKey": "sk-ant-oat01-x",
"expiresAt": 1000,
},
})
with self.assertRaises(Die):
claude_host_access_token(
{"HOME": str(self.home)},
now=datetime(2026, 1, 1, tzinfo=timezone.utc),
)
def test_future_expiry_is_accepted(self):
key = "sk-ant-oat01-y"
self._write({
"oauthAccount": {
"sessionKey": key,
"expiresAt": 2000000000,
},
})
out = claude_host_access_token(
{"HOME": str(self.home)},
now=datetime(2026, 1, 1, tzinfo=timezone.utc),
)
self.assertEqual(key, out)
def test_absent_expiry_field_is_accepted(self):
key = "sk-ant-oat01-z"
self._write({"oauthAccount": {"sessionKey": key}})
out = claude_host_access_token({"HOME": str(self.home)})
self.assertEqual(key, out)
def test_non_json_file_dies(self):
self.auth_path.write_text("not json {{{")
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(self.home)})
def test_json_array_root_dies(self):
self.auth_path.write_text("[]")
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(self.home)})
if __name__ == "__main__":
unittest.main()
+9 -1
View File
@@ -80,11 +80,19 @@ class TestAgentProviderHostCredentials(unittest.TestCase):
"forward_host_credentials": "yes",
})
def test_forward_host_credentials_rejected_for_claude(self):
def test_forward_host_credentials_allowed_for_claude(self):
b = _provider_config_bottle({
"template": "claude",
"forward_host_credentials": True,
})
self.assertTrue(b.agent_provider.forward_host_credentials)
def test_forward_host_credentials_and_auth_token_rejected_together(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "claude",
"forward_host_credentials": True,
"auth_token": "SOME_TOKEN",
})
def test_auth_token_defaults_empty(self):
+14 -2
View File
@@ -82,10 +82,22 @@ class TestAgentProviderValidation(unittest.TestCase):
"b", {"forward_host_credentials": True, "template": "weird"}
)
def test_forward_creds_non_codex_template(self) -> None:
def test_forward_creds_pi_template_rejected(self) -> None:
with self.assertRaises(ManifestError):
ManifestAgentProvider.from_dict(
"b", {"forward_host_credentials": True, "template": "claude"}
"b", {"forward_host_credentials": True, "template": "pi"}
)
def test_forward_creds_claude_allowed(self) -> None:
p = ManifestAgentProvider.from_dict(
"b", {"forward_host_credentials": True, "template": "claude"}
)
self.assertTrue(p.forward_host_credentials)
def test_forward_creds_and_auth_token_rejected(self) -> None:
with self.assertRaises(ManifestError):
ManifestAgentProvider.from_dict(
"b", {"forward_host_credentials": True, "auth_token": "T", "template": "claude"}
)
def test_valid_claude_auth_token(self) -> None: