feat(manifest): add bottle.git field for git-gate upstreams
test / unit (pull_request) Successful in 12s
test / integration (pull_request) Successful in 15s

Each entry pairs a Name (local alias the gate exposes) with an
ssh:// Upstream URL, an IdentityFile the gate uses to push to
that upstream, and an optional KnownHostKey for upstream
host-key pinning. The Upstream URL is parsed at construction
into UpstreamUser/Host/Port/Path so downstream code doesn't
re-parse.

Two cross-validation rules: Names must be unique within a
bottle (each maps to a distinct bare repo), and no git entry's
(host, port) may overlap an ssh entry's (Hostname, Port) — the
same upstream reachable two ways would let a misbehaving agent
route around the gitleaks-bearing git-gate via the L4 ssh-gate.

PRD: docs/prds/0008-git-gate.md
This commit is contained in:
2026-05-12 18:48:14 -04:00
parent c91395425c
commit 5c5e9f817e
4 changed files with 374 additions and 2 deletions
+12 -1
View File
@@ -37,7 +37,7 @@ from pathlib import Path
from typing import Any, Generic, Sequence, TypeVar
from ..log import die
from ..manifest import Manifest, SshEntry
from ..manifest import GitEntry, Manifest, SshEntry
from ..util import expand_tilde
from .util import host_skill_dir
@@ -171,6 +171,7 @@ class BottleBackend(ABC, Generic[PlanT, CleanupT]):
bottle = manifest.bottle_for(spec.agent_name)
self._validate_skills(agent.skills)
self._validate_ssh_entries(bottle.ssh)
self._validate_git_entries(bottle.git)
def _validate_skills(self, skills: Sequence[str]) -> None:
"""Each named skill must be a directory under the host's
@@ -193,6 +194,16 @@ class BottleBackend(ABC, Generic[PlanT, CleanupT]):
if not os.path.isfile(key):
die(f"ssh key file not found for host '{entry.Host}': {key}")
def _validate_git_entries(self, entries: Sequence[GitEntry]) -> None:
"""Each entry's IdentityFile must exist on the host (after
expanding leading ~) — the git-gate copies it in at start time
to authenticate the upstream push (PRD 0008). Shape is already
enforced by Manifest validation; this only checks presence."""
for entry in entries:
key = expand_tilde(entry.IdentityFile)
if not os.path.isfile(key):
die(f"git upstream key file not found for '{entry.Name}': {key}")
@abstractmethod
def _resolve_plan(self, spec: BottleSpec, *, stage_dir: Path) -> PlanT:
"""Backend-specific plan resolution: image/container names,
+141 -1
View File
@@ -7,6 +7,7 @@ Schema (see CLAUDE.md "Intended design"):
"<bottle-name>": {
"env": { "<NAME>": <env-entry>, ... },
"ssh": [ <ssh-entry>, ... ],
"git": [ <git-entry>, ... ],
"egress": { "allowlist": [ "<hostname>", ... ] }
}
},
@@ -79,6 +80,65 @@ class SshEntry:
)
@dataclass(frozen=True)
class GitEntry:
"""One upstream the per-agent git-gate (PRD 0008) is allowed to
talk to. `Upstream` is the real remote URL the agent would push to
if there were no gate; the gate hosts a bare repo at /git/<Name>.git
and `IdentityFile` is the SSH key the gate uses to push that repo
upstream after gitleaks passes. The agent itself never holds the
upstream credential.
The Upstream URL is parsed once at construction and the pieces are
stashed in the `Upstream*` fields so the git-gate render step
doesn't have to re-parse."""
Name: str
Upstream: str
IdentityFile: str
KnownHostKey: str = ""
UpstreamUser: str = ""
UpstreamHost: str = ""
UpstreamPort: str = ""
UpstreamPath: str = ""
@classmethod
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "GitEntry":
d = _as_json_object(raw, f"bottle '{bottle_name}' git[{idx}]")
name = d.get("Name")
if not isinstance(name, str) or not name:
die(f"bottle '{bottle_name}' git[{idx}] missing required string field 'Name'")
upstream = d.get("Upstream")
if not isinstance(upstream, str) or not upstream:
die(
f"bottle '{bottle_name}' git '{name}' missing required string field "
f"'Upstream'"
)
ident = d.get("IdentityFile")
if not isinstance(ident, str) or not ident:
die(
f"bottle '{bottle_name}' git '{name}' missing required string field "
f"'IdentityFile'"
)
khk = _opt_str(
d.get("KnownHostKey"),
f"bottle '{bottle_name}' git '{name}' KnownHostKey",
)
user, host, port, path = _parse_git_upstream(
upstream, f"bottle '{bottle_name}' git '{name}' Upstream"
)
return cls(
Name=name,
Upstream=upstream,
IdentityFile=ident,
KnownHostKey=khk,
UpstreamUser=user,
UpstreamHost=host,
UpstreamPort=port,
UpstreamPath=path,
)
DLP_ACTIONS = ("block", "warn")
@@ -134,6 +194,7 @@ class BottleEgress:
class Bottle:
env: Mapping[str, str] = field(default_factory=_empty_str_dict)
ssh: tuple[SshEntry, ...] = ()
git: tuple[GitEntry, ...] = ()
egress: BottleEgress = field(default_factory=BottleEgress)
@classmethod
@@ -171,6 +232,19 @@ class Bottle:
for i, entry in enumerate(ssh_list)
)
git: tuple[GitEntry, ...] = ()
git_raw = d.get("git")
if git_raw is not None:
if not isinstance(git_raw, list):
die(f"bottle '{name}' git must be an array (was {type(git_raw).__name__})")
git_list = cast(list[object], git_raw)
git = tuple(
GitEntry.from_dict(name, i, entry)
for i, entry in enumerate(git_list)
)
_validate_unique_git_names(name, git)
_validate_no_shadow_route(name, ssh, git)
egress_raw = d.get("egress")
egress = (
BottleEgress.from_dict(name, egress_raw)
@@ -178,7 +252,7 @@ class Bottle:
else BottleEgress()
)
return cls(env=env, ssh=ssh, egress=egress)
return cls(env=env, ssh=ssh, git=git, egress=egress)
@dataclass(frozen=True)
@@ -359,3 +433,69 @@ def _opt_port(value: object, label: str) -> str:
if isinstance(value, str):
return value
die(f"{label} must be a string or number (was {type(value).__name__})")
def _parse_git_upstream(url: str, label: str) -> tuple[str, str, str, str]:
"""Parse `ssh://user@host[:port]/path` into (user, host, port, path).
Dies if `url` doesn't match the ssh:// shape v1 supports. Default
port is 22 (matches OpenSSH)."""
if not url.startswith("ssh://"):
die(f"{label} must be an ssh:// URL (was {url!r})")
rest = url[len("ssh://"):]
if "@" not in rest:
die(f"{label} must include a user (e.g. ssh://git@host/path.git); was {url!r}")
user, _, hostpart = rest.partition("@")
if not user:
die(f"{label} user is empty in {url!r}")
if "/" not in hostpart:
die(f"{label} must include a path (e.g. ssh://git@host/path.git); was {url!r}")
hostport, _, path = hostpart.partition("/")
if not path:
die(f"{label} path is empty in {url!r}")
if ":" in hostport:
host, _, port = hostport.partition(":")
if not port.isdigit():
die(f"{label} port must be numeric in {url!r}")
else:
host = hostport
port = "22"
if not host:
die(f"{label} host is empty in {url!r}")
return (user, host, port, path)
def _validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None:
seen: dict[str, None] = {}
for g in git:
if g.Name in seen:
die(
f"bottle '{bottle_name}' git entries have duplicate Name '{g.Name}'; "
f"each entry maps to a distinct bare repo on the gate."
)
seen[g.Name] = None
def _validate_no_shadow_route(
bottle_name: str,
ssh: tuple[SshEntry, ...],
git: tuple[GitEntry, ...],
) -> None:
"""Reject if any git entry's (host, port) matches an ssh entry's
(Hostname, Port). The same upstream reachable two ways — once through
the L4 ssh-gate, once through the gitleaks-bearing git-gate — defeats
the git-gate."""
ssh_targets: dict[tuple[str, str], str] = {}
for e in ssh:
if not e.Hostname:
continue
port = e.Port or "22"
ssh_targets[(e.Hostname, port)] = e.Host
for g in git:
ssh_host = ssh_targets.get((g.UpstreamHost, g.UpstreamPort))
if ssh_host is not None:
die(
f"bottle '{bottle_name}' has ssh entry '{ssh_host}' "
f"({g.UpstreamHost}:{g.UpstreamPort}) and git entry '{g.Name}' "
f"pointing at the same upstream. The same remote reachable two "
f"ways defeats the git-gate; remove one."
)
+29
View File
@@ -65,6 +65,31 @@ def fixture_with_ssh_dict() -> dict[str, Any]:
}
def fixture_with_git_dict() -> dict[str, Any]:
"""Bottle declares a git-gate upstream. JSON shape."""
return {
"bottles": {
"dev": {
"git": [
{
"Name": "claude-bottle",
"Upstream": "ssh://git@gitea.dideric.is:30009/didericis/claude-bottle.git",
"IdentityFile": "/dev/null",
"KnownHostKey": "ssh-ed25519 AAAA...",
},
{
"Name": "foo",
"Upstream": "ssh://git@github.com/didericis/foo.git",
"IdentityFile": "/dev/null",
"KnownHostKey": "ssh-ed25519 BBBB...",
},
]
}
},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}
def fixture_minimal() -> Manifest:
return Manifest.from_json_obj(fixture_minimal_dict())
@@ -77,6 +102,10 @@ def fixture_with_ssh() -> Manifest:
return Manifest.from_json_obj(fixture_with_ssh_dict())
def fixture_with_git() -> Manifest:
return Manifest.from_json_obj(fixture_with_git_dict())
def write_fixture(fn: Callable[[], dict[str, Any]]) -> Path:
"""Write fixture JSON to a temp file; return the path. Caller must rm.
Accepts a function returning either a dict (JSON shape) or a Manifest;
+192
View File
@@ -0,0 +1,192 @@
"""Unit: Bottle.git manifest parsing + validation (PRD 0008)."""
import unittest
from claude_bottle.log import Die
from claude_bottle.manifest import Manifest
def _manifest(git_entries):
return {
"bottles": {"dev": {"git": git_entries}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}
class TestGitEntryParsing(unittest.TestCase):
def test_parses_minimal_entry(self):
m = Manifest.from_json_obj(_manifest([{
"Name": "claude-bottle",
"Upstream": "ssh://git@gitea.dideric.is:30009/didericis/claude-bottle.git",
"IdentityFile": "/dev/null",
}]))
entries = m.bottles["dev"].git
self.assertEqual(1, len(entries))
e = entries[0]
self.assertEqual("claude-bottle", e.Name)
self.assertEqual("git", e.UpstreamUser)
self.assertEqual("gitea.dideric.is", e.UpstreamHost)
self.assertEqual("30009", e.UpstreamPort)
self.assertEqual("didericis/claude-bottle.git", e.UpstreamPath)
def test_default_port_is_22(self):
m = Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com/didericis/foo.git",
"IdentityFile": "/dev/null",
}]))
e = m.bottles["dev"].git[0]
self.assertEqual("22", e.UpstreamPort)
self.assertEqual("github.com", e.UpstreamHost)
def test_known_host_key_optional(self):
m = Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com/foo.git",
"IdentityFile": "/dev/null",
}]))
self.assertEqual("", m.bottles["dev"].git[0].KnownHostKey)
def test_missing_name_dies(self):
with self.assertRaises(Die):
Manifest.from_json_obj(_manifest([{
"Upstream": "ssh://git@github.com/foo.git",
"IdentityFile": "/dev/null",
}]))
def test_missing_upstream_dies(self):
with self.assertRaises(Die):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"IdentityFile": "/dev/null",
}]))
def test_missing_identity_file_dies(self):
with self.assertRaises(Die):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com/foo.git",
}]))
def test_non_ssh_upstream_dies(self):
with self.assertRaises(Die):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "https://github.com/didericis/foo.git",
"IdentityFile": "/dev/null",
}]))
def test_scp_style_upstream_dies(self):
# SCP-style "git@host:path" is intentionally not supported in
# v1 — ssh:// only.
with self.assertRaises(Die):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "git@github.com:didericis/foo.git",
"IdentityFile": "/dev/null",
}]))
def test_upstream_without_user_dies(self):
with self.assertRaises(Die):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://github.com/foo.git",
"IdentityFile": "/dev/null",
}]))
def test_upstream_without_path_dies(self):
with self.assertRaises(Die):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com",
"IdentityFile": "/dev/null",
}]))
def test_non_numeric_port_dies(self):
with self.assertRaises(Die):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com:notaport/foo.git",
"IdentityFile": "/dev/null",
}]))
class TestGitEntryCrossValidation(unittest.TestCase):
def test_duplicate_name_dies(self):
with self.assertRaises(Die):
Manifest.from_json_obj(_manifest([
{"Name": "foo", "Upstream": "ssh://git@a.example/x.git",
"IdentityFile": "/dev/null"},
{"Name": "foo", "Upstream": "ssh://git@b.example/y.git",
"IdentityFile": "/dev/null"},
]))
def test_shadow_route_with_ssh_entry_dies(self):
# An ssh entry pointing at gitea.dideric.is:30009 AND a git
# entry pointing at ssh://git@gitea.dideric.is:30009/... is a
# bypass: agents could route around the gate by using the
# ssh-gate. Manifest construction must reject.
with self.assertRaises(Die):
Manifest.from_json_obj({
"bottles": {
"dev": {
"ssh": [{
"Host": "gitea",
"IdentityFile": "/dev/null",
"Hostname": "gitea.dideric.is",
"User": "git",
"Port": 30009,
}],
"git": [{
"Name": "claude-bottle",
"Upstream": "ssh://git@gitea.dideric.is:30009/didericis/claude-bottle.git",
"IdentityFile": "/dev/null",
}],
},
},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
def test_independent_ssh_and_git_targets_allowed(self):
# Same hostname but different ports are independent targets.
m = Manifest.from_json_obj({
"bottles": {
"dev": {
"ssh": [{
"Host": "gitea-ssh",
"IdentityFile": "/dev/null",
"Hostname": "gitea.dideric.is",
"User": "git",
"Port": 22,
}],
"git": [{
"Name": "claude-bottle",
"Upstream": "ssh://git@gitea.dideric.is:30009/didericis/claude-bottle.git",
"IdentityFile": "/dev/null",
}],
},
},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
self.assertEqual(1, len(m.bottles["dev"].ssh))
self.assertEqual(1, len(m.bottles["dev"].git))
class TestEmptyGitField(unittest.TestCase):
def test_no_git_field_yields_empty_tuple(self):
m = Manifest.from_json_obj({
"bottles": {"dev": {}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
self.assertEqual((), m.bottles["dev"].git)
def test_git_array_type_required(self):
with self.assertRaises(Die):
Manifest.from_json_obj({
"bottles": {"dev": {"git": "not-a-list"}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
if __name__ == "__main__":
unittest.main()