PRD 0010: Credential proxy for agent-bound API tokens #14
+165
-1
@@ -7,6 +7,7 @@ Schema (see CLAUDE.md "Intended design"):
|
||||
"<bottle-name>": {
|
||||
"env": { "<NAME>": <env-entry>, ... },
|
||||
"git": [ <git-entry>, ... ],
|
||||
"tokens": [ <token-entry>, ... ],
|
||||
"egress": { "allowlist": [ "<hostname>", ... ] }
|
||||
}
|
||||
},
|
||||
@@ -113,6 +114,94 @@ class GitEntry:
|
||||
)
|
||||
|
||||
|
||||
TOKEN_KINDS = ("anthropic", "github", "gitea", "npm")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TokenEntry:
|
||||
"""One credential the per-bottle cred-proxy sidecar (PRD 0010)
|
||||
holds and injects on the agent's behalf.
|
||||
|
||||
`Kind` selects the route handler: `anthropic` / `github` / `npm`
|
||||
have fixed upstream URLs; `gitea` requires an explicit `Url`
|
||||
because the upstream is per-instance.
|
||||
|
||||
`TokenRef` is the name of the host env var the CLI resolves at
|
||||
launch time. The value is forwarded into the cred-proxy
|
||||
container's environ via `docker run -e NAME` — never onto argv,
|
||||
never into a file. The value does NOT land in the agent's
|
||||
environ.
|
||||
|
||||
`UpstreamHost` is parsed from `Url` for `gitea` entries (or the
|
||||
documented default for the other kinds). It exists so the
|
||||
cross-validator can spot collisions with `bottle.git` upstreams
|
||||
without re-parsing URLs at every call site."""
|
||||
|
||||
Kind: str
|
||||
TokenRef: str
|
||||
Url: str = ""
|
||||
UpstreamHost: str = ""
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "TokenEntry":
|
||||
d = _as_json_object(raw, f"bottle '{bottle_name}' tokens[{idx}]")
|
||||
kind = d.get("Kind")
|
||||
if not isinstance(kind, str) or not kind:
|
||||
die(
|
||||
f"bottle '{bottle_name}' tokens[{idx}] missing required string field "
|
||||
f"'Kind'"
|
||||
)
|
||||
if kind not in TOKEN_KINDS:
|
||||
die(
|
||||
f"bottle '{bottle_name}' tokens[{idx}] Kind {kind!r} is not one of "
|
||||
f"{', '.join(TOKEN_KINDS)}"
|
||||
)
|
||||
token_ref = d.get("TokenRef")
|
||||
if not isinstance(token_ref, str) or not token_ref:
|
||||
die(
|
||||
f"bottle '{bottle_name}' tokens[{idx}] ({kind}) missing required "
|
||||
f"string field 'TokenRef' (name of the host env var to forward)"
|
||||
)
|
||||
url_raw = d.get("Url")
|
||||
if url_raw is None:
|
||||
url = ""
|
||||
elif isinstance(url_raw, str):
|
||||
url = url_raw
|
||||
else:
|
||||
die(
|
||||
f"bottle '{bottle_name}' tokens[{idx}] ({kind}) Url must be a string "
|
||||
f"(was {type(url_raw).__name__})"
|
||||
)
|
||||
if kind == "gitea":
|
||||
if not url:
|
||||
die(
|
||||
f"bottle '{bottle_name}' tokens[{idx}] (gitea) requires a Url "
|
||||
f"(the Gitea instance, e.g. https://gitea.dideric.is)"
|
||||
)
|
||||
host = _parse_https_host(
|
||||
url, f"bottle '{bottle_name}' tokens[{idx}] (gitea) Url"
|
||||
)
|
||||
else:
|
||||
if url:
|
||||
die(
|
||||
f"bottle '{bottle_name}' tokens[{idx}] ({kind}) cannot set Url; "
|
||||
f"the upstream for this Kind is fixed by cred-proxy. Drop the "
|
||||
f"'Url' field."
|
||||
)
|
||||
host = _TOKEN_DEFAULT_HOST[kind]
|
||||
return cls(Kind=kind, TokenRef=token_ref, Url=url, UpstreamHost=host)
|
||||
|
||||
|
||||
# Hostnames the cred-proxy talks to upstream for the non-gitea kinds.
|
||||
# Used both for the proxy's route table and for the manifest cross-
|
||||
# validator that rejects overlap with `bottle.git`.
|
||||
_TOKEN_DEFAULT_HOST: dict[str, str] = {
|
||||
"anthropic": "api.anthropic.com",
|
||||
"github": "github.com",
|
||||
"npm": "registry.npmjs.org",
|
||||
}
|
||||
|
||||
|
||||
DLP_ACTIONS = ("block", "warn")
|
||||
|
||||
|
||||
@@ -168,6 +257,7 @@ class BottleEgress:
|
||||
class Bottle:
|
||||
env: Mapping[str, str] = field(default_factory=_empty_str_dict)
|
||||
git: tuple[GitEntry, ...] = ()
|
||||
tokens: tuple[TokenEntry, ...] = ()
|
||||
egress: BottleEgress = field(default_factory=BottleEgress)
|
||||
|
||||
@classmethod
|
||||
@@ -215,6 +305,21 @@ class Bottle:
|
||||
)
|
||||
_validate_unique_git_names(name, git)
|
||||
|
||||
tokens: tuple[TokenEntry, ...] = ()
|
||||
tokens_raw = d.get("tokens")
|
||||
if tokens_raw is not None:
|
||||
if not isinstance(tokens_raw, list):
|
||||
die(
|
||||
f"bottle '{name}' tokens must be an array "
|
||||
f"(was {type(tokens_raw).__name__})"
|
||||
)
|
||||
tokens_list = cast(list[object], tokens_raw)
|
||||
tokens = tuple(
|
||||
TokenEntry.from_dict(name, i, entry)
|
||||
for i, entry in enumerate(tokens_list)
|
||||
)
|
||||
_validate_tokens(name, tokens, git)
|
||||
|
||||
egress_raw = d.get("egress")
|
||||
egress = (
|
||||
BottleEgress.from_dict(name, egress_raw)
|
||||
@@ -222,7 +327,7 @@ class Bottle:
|
||||
else BottleEgress()
|
||||
)
|
||||
|
||||
return cls(env=env, git=git, egress=egress)
|
||||
return cls(env=env, git=git, tokens=tokens, egress=egress)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@@ -441,6 +546,65 @@ def _parse_git_upstream(url: str, label: str) -> tuple[str, str, str, str]:
|
||||
return (user, host, port, path)
|
||||
|
||||
|
||||
def _parse_https_host(url: str, label: str) -> str:
|
||||
"""Extract the host from an `https://host[:port][/path]` URL.
|
||||
Dies if `url` is not an https:// URL or the host segment is empty.
|
||||
Used to derive `TokenEntry.UpstreamHost` from a gitea Url so the
|
||||
cross-validator can spot collisions with `bottle.git` hosts."""
|
||||
if not url.startswith("https://"):
|
||||
die(f"{label} must be an https:// URL (was {url!r})")
|
||||
rest = url[len("https://"):]
|
||||
hostport, _, _ = rest.partition("/")
|
||||
host, _, _port = hostport.partition(":")
|
||||
if not host:
|
||||
die(f"{label} host is empty in {url!r}")
|
||||
return host
|
||||
|
||||
|
||||
def _validate_tokens(
|
||||
bottle_name: str,
|
||||
tokens: tuple[TokenEntry, ...],
|
||||
git: tuple[GitEntry, ...],
|
||||
) -> None:
|
||||
"""Cross-validation for `bottle.tokens`:
|
||||
|
||||
- At most one entry per Kind, except `gitea` which may have
|
||||
multiple entries (one per Gitea instance) with distinct Urls.
|
||||
- No overlap with `bottle.git` hosts: a `github` or `gitea` token
|
||||
whose host matches a `bottle.git` upstream host would put two
|
||||
credential brokers on the same remote (git-gate's gitleaks-
|
||||
scanning gate AND cred-proxy's bearer injection). Pick one.
|
||||
"""
|
||||
by_kind: dict[str, list[TokenEntry]] = {}
|
||||
for t in tokens:
|
||||
by_kind.setdefault(t.Kind, []).append(t)
|
||||
for kind, entries in by_kind.items():
|
||||
if kind == "gitea":
|
||||
seen: dict[str, None] = {}
|
||||
for e in entries:
|
||||
if e.Url in seen:
|
||||
die(
|
||||
f"bottle '{bottle_name}' tokens has duplicate gitea Url "
|
||||
f"{e.Url!r}; one entry per Gitea instance."
|
||||
)
|
||||
seen[e.Url] = None
|
||||
elif len(entries) > 1:
|
||||
die(
|
||||
f"bottle '{bottle_name}' tokens has {len(entries)} entries with "
|
||||
f"Kind {kind!r}; at most one is allowed (gitea is the only Kind "
|
||||
f"that may have multiple entries)."
|
||||
)
|
||||
|
||||
git_hosts = {g.UpstreamHost for g in git}
|
||||
for t in tokens:
|
||||
if t.Kind in ("github", "gitea") and t.UpstreamHost in git_hosts:
|
||||
die(
|
||||
f"bottle '{bottle_name}' token ({t.Kind}, host {t.UpstreamHost!r}) "
|
||||
f"overlaps a bottle.git upstream on the same host. git-gate already "
|
||||
f"brokers this remote; drop the token entry or remove the git entry."
|
||||
)
|
||||
|
||||
|
||||
def _validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None:
|
||||
seen: dict[str, None] = {}
|
||||
for g in git:
|
||||
|
||||
@@ -0,0 +1,191 @@
|
||||
"""Unit: Bottle.tokens manifest parsing + validation (PRD 0010)."""
|
||||
|
||||
import unittest
|
||||
|
||||
from claude_bottle.log import Die
|
||||
from claude_bottle.manifest import Manifest
|
||||
|
||||
|
||||
def _manifest(tokens, git=None):
|
||||
bottle: dict[str, object] = {"tokens": tokens}
|
||||
if git is not None:
|
||||
bottle["git"] = git
|
||||
return {
|
||||
"bottles": {"dev": bottle},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
}
|
||||
|
||||
|
||||
class TestTokenEntryParsing(unittest.TestCase):
|
||||
def test_parses_anthropic_entry(self):
|
||||
m = Manifest.from_json_obj(_manifest([
|
||||
{"Kind": "anthropic", "TokenRef": "CLAUDE_BOTTLE_OAUTH_TOKEN"},
|
||||
]))
|
||||
entries = m.bottles["dev"].tokens
|
||||
self.assertEqual(1, len(entries))
|
||||
e = entries[0]
|
||||
self.assertEqual("anthropic", e.Kind)
|
||||
self.assertEqual("CLAUDE_BOTTLE_OAUTH_TOKEN", e.TokenRef)
|
||||
self.assertEqual("", e.Url)
|
||||
self.assertEqual("api.anthropic.com", e.UpstreamHost)
|
||||
|
||||
def test_parses_github_entry(self):
|
||||
m = Manifest.from_json_obj(_manifest([
|
||||
{"Kind": "github", "TokenRef": "GITHUB_TOKEN"},
|
||||
]))
|
||||
e = m.bottles["dev"].tokens[0]
|
||||
self.assertEqual("github", e.Kind)
|
||||
self.assertEqual("github.com", e.UpstreamHost)
|
||||
|
||||
def test_parses_npm_entry(self):
|
||||
m = Manifest.from_json_obj(_manifest([
|
||||
{"Kind": "npm", "TokenRef": "NPM_TOKEN"},
|
||||
]))
|
||||
e = m.bottles["dev"].tokens[0]
|
||||
self.assertEqual("registry.npmjs.org", e.UpstreamHost)
|
||||
|
||||
def test_parses_gitea_entry_with_url(self):
|
||||
m = Manifest.from_json_obj(_manifest([
|
||||
{"Kind": "gitea", "TokenRef": "GITEA_TOKEN",
|
||||
"Url": "https://gitea.dideric.is"},
|
||||
]))
|
||||
e = m.bottles["dev"].tokens[0]
|
||||
self.assertEqual("gitea", e.Kind)
|
||||
self.assertEqual("https://gitea.dideric.is", e.Url)
|
||||
self.assertEqual("gitea.dideric.is", e.UpstreamHost)
|
||||
|
||||
def test_gitea_url_with_port_strips_port_from_host(self):
|
||||
m = Manifest.from_json_obj(_manifest([
|
||||
{"Kind": "gitea", "TokenRef": "GITEA_TOKEN",
|
||||
"Url": "https://gitea.dideric.is:30009"},
|
||||
]))
|
||||
self.assertEqual("gitea.dideric.is", m.bottles["dev"].tokens[0].UpstreamHost)
|
||||
|
||||
|
||||
class TestTokenEntryValidation(unittest.TestCase):
|
||||
def test_unknown_kind_dies(self):
|
||||
with self.assertRaises(Die):
|
||||
Manifest.from_json_obj(_manifest([
|
||||
{"Kind": "aws", "TokenRef": "AWS_TOKEN"},
|
||||
]))
|
||||
|
||||
def test_missing_kind_dies(self):
|
||||
with self.assertRaises(Die):
|
||||
Manifest.from_json_obj(_manifest([
|
||||
{"TokenRef": "GITHUB_TOKEN"},
|
||||
]))
|
||||
|
||||
def test_missing_token_ref_dies(self):
|
||||
with self.assertRaises(Die):
|
||||
Manifest.from_json_obj(_manifest([
|
||||
{"Kind": "github"},
|
||||
]))
|
||||
|
||||
def test_gitea_without_url_dies(self):
|
||||
with self.assertRaises(Die):
|
||||
Manifest.from_json_obj(_manifest([
|
||||
{"Kind": "gitea", "TokenRef": "GITEA_TOKEN"},
|
||||
]))
|
||||
|
||||
def test_gitea_with_non_https_url_dies(self):
|
||||
with self.assertRaises(Die):
|
||||
Manifest.from_json_obj(_manifest([
|
||||
{"Kind": "gitea", "TokenRef": "GITEA_TOKEN",
|
||||
"Url": "http://gitea.dideric.is"},
|
||||
]))
|
||||
|
||||
def test_non_gitea_kind_with_url_dies(self):
|
||||
# Url is fixed for anthropic / github / npm — passing one is a
|
||||
# configuration smell, not an override knob.
|
||||
with self.assertRaises(Die):
|
||||
Manifest.from_json_obj(_manifest([
|
||||
{"Kind": "github", "TokenRef": "GITHUB_TOKEN",
|
||||
"Url": "https://api.example.com"},
|
||||
]))
|
||||
|
||||
def test_duplicate_non_gitea_kind_dies(self):
|
||||
with self.assertRaises(Die):
|
||||
Manifest.from_json_obj(_manifest([
|
||||
{"Kind": "github", "TokenRef": "A"},
|
||||
{"Kind": "github", "TokenRef": "B"},
|
||||
]))
|
||||
|
||||
def test_two_gitea_with_distinct_urls_ok(self):
|
||||
m = Manifest.from_json_obj(_manifest([
|
||||
{"Kind": "gitea", "TokenRef": "T1",
|
||||
"Url": "https://gitea.dideric.is"},
|
||||
{"Kind": "gitea", "TokenRef": "T2",
|
||||
"Url": "https://gitea.example.com"},
|
||||
]))
|
||||
self.assertEqual(2, len(m.bottles["dev"].tokens))
|
||||
|
||||
def test_two_gitea_with_same_url_dies(self):
|
||||
with self.assertRaises(Die):
|
||||
Manifest.from_json_obj(_manifest([
|
||||
{"Kind": "gitea", "TokenRef": "T1",
|
||||
"Url": "https://gitea.dideric.is"},
|
||||
{"Kind": "gitea", "TokenRef": "T2",
|
||||
"Url": "https://gitea.dideric.is"},
|
||||
]))
|
||||
|
||||
|
||||
class TestTokenGitOverlap(unittest.TestCase):
|
||||
def test_github_token_collides_with_github_git_entry(self):
|
||||
# bottle.git already brokers github.com via the gate; declaring
|
||||
# a github token on top would put two credential brokers on
|
||||
# the same remote.
|
||||
with self.assertRaises(Die):
|
||||
Manifest.from_json_obj(_manifest(
|
||||
tokens=[{"Kind": "github", "TokenRef": "GITHUB_TOKEN"}],
|
||||
git=[{
|
||||
"Name": "myrepo",
|
||||
"Upstream": "ssh://git@github.com/me/myrepo.git",
|
||||
"IdentityFile": "/dev/null",
|
||||
}],
|
||||
))
|
||||
|
||||
def test_gitea_token_collides_with_same_host_git_entry(self):
|
||||
with self.assertRaises(Die):
|
||||
Manifest.from_json_obj(_manifest(
|
||||
tokens=[{
|
||||
"Kind": "gitea", "TokenRef": "GITEA_TOKEN",
|
||||
"Url": "https://gitea.dideric.is",
|
||||
}],
|
||||
git=[{
|
||||
"Name": "myrepo",
|
||||
"Upstream": "ssh://git@gitea.dideric.is:30009/me/myrepo.git",
|
||||
"IdentityFile": "/dev/null",
|
||||
}],
|
||||
))
|
||||
|
||||
def test_anthropic_token_does_not_collide_with_git(self):
|
||||
# api.anthropic.com isn't a git host; no overlap possible.
|
||||
m = Manifest.from_json_obj(_manifest(
|
||||
tokens=[{"Kind": "anthropic", "TokenRef": "CLAUDE_BOTTLE_OAUTH_TOKEN"}],
|
||||
git=[{
|
||||
"Name": "myrepo",
|
||||
"Upstream": "ssh://git@gitea.dideric.is:30009/me/myrepo.git",
|
||||
"IdentityFile": "/dev/null",
|
||||
}],
|
||||
))
|
||||
self.assertEqual(1, len(m.bottles["dev"].tokens))
|
||||
|
||||
|
||||
class TestEmptyTokensField(unittest.TestCase):
|
||||
def test_no_tokens_field_yields_empty_tuple(self):
|
||||
m = Manifest.from_json_obj({
|
||||
"bottles": {"dev": {}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
})
|
||||
self.assertEqual((), m.bottles["dev"].tokens)
|
||||
|
||||
def test_tokens_array_type_required(self):
|
||||
with self.assertRaises(Die):
|
||||
Manifest.from_json_obj({
|
||||
"bottles": {"dev": {"tokens": "not-a-list"}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user