From 5c5e9f817ee091452afbe4c3b6f2efcaa5cacd9c Mon Sep 17 00:00:00 2001 From: didericis Date: Tue, 12 May 2026 18:48:14 -0400 Subject: [PATCH] feat(manifest): add bottle.git field for git-gate upstreams MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each entry pairs a Name (local alias the gate exposes) with an ssh:// Upstream URL, an IdentityFile the gate uses to push to that upstream, and an optional KnownHostKey for upstream host-key pinning. The Upstream URL is parsed at construction into UpstreamUser/Host/Port/Path so downstream code doesn't re-parse. Two cross-validation rules: Names must be unique within a bottle (each maps to a distinct bare repo), and no git entry's (host, port) may overlap an ssh entry's (Hostname, Port) — the same upstream reachable two ways would let a misbehaving agent route around the gitleaks-bearing git-gate via the L4 ssh-gate. PRD: docs/prds/0008-git-gate.md --- claude_bottle/backend/__init__.py | 13 +- claude_bottle/manifest.py | 142 +++++++++++++++++++++- tests/fixtures.py | 29 +++++ tests/unit/test_manifest_git.py | 192 ++++++++++++++++++++++++++++++ 4 files changed, 374 insertions(+), 2 deletions(-) create mode 100644 tests/unit/test_manifest_git.py diff --git a/claude_bottle/backend/__init__.py b/claude_bottle/backend/__init__.py index 8e0dc63..1436bf0 100644 --- a/claude_bottle/backend/__init__.py +++ b/claude_bottle/backend/__init__.py @@ -37,7 +37,7 @@ from pathlib import Path from typing import Any, Generic, Sequence, TypeVar from ..log import die -from ..manifest import Manifest, SshEntry +from ..manifest import GitEntry, Manifest, SshEntry from ..util import expand_tilde from .util import host_skill_dir @@ -171,6 +171,7 @@ class BottleBackend(ABC, Generic[PlanT, CleanupT]): bottle = manifest.bottle_for(spec.agent_name) self._validate_skills(agent.skills) self._validate_ssh_entries(bottle.ssh) + self._validate_git_entries(bottle.git) def _validate_skills(self, skills: Sequence[str]) -> None: """Each named skill must be a directory under the host's @@ -193,6 +194,16 @@ class BottleBackend(ABC, Generic[PlanT, CleanupT]): if not os.path.isfile(key): die(f"ssh key file not found for host '{entry.Host}': {key}") + def _validate_git_entries(self, entries: Sequence[GitEntry]) -> None: + """Each entry's IdentityFile must exist on the host (after + expanding leading ~) — the git-gate copies it in at start time + to authenticate the upstream push (PRD 0008). Shape is already + enforced by Manifest validation; this only checks presence.""" + for entry in entries: + key = expand_tilde(entry.IdentityFile) + if not os.path.isfile(key): + die(f"git upstream key file not found for '{entry.Name}': {key}") + @abstractmethod def _resolve_plan(self, spec: BottleSpec, *, stage_dir: Path) -> PlanT: """Backend-specific plan resolution: image/container names, diff --git a/claude_bottle/manifest.py b/claude_bottle/manifest.py index eed15d3..a7910b3 100644 --- a/claude_bottle/manifest.py +++ b/claude_bottle/manifest.py @@ -7,6 +7,7 @@ Schema (see CLAUDE.md "Intended design"): "": { "env": { "": , ... }, "ssh": [ , ... ], + "git": [ , ... ], "egress": { "allowlist": [ "", ... ] } } }, @@ -79,6 +80,65 @@ class SshEntry: ) +@dataclass(frozen=True) +class GitEntry: + """One upstream the per-agent git-gate (PRD 0008) is allowed to + talk to. `Upstream` is the real remote URL the agent would push to + if there were no gate; the gate hosts a bare repo at /git/.git + and `IdentityFile` is the SSH key the gate uses to push that repo + upstream after gitleaks passes. The agent itself never holds the + upstream credential. + + The Upstream URL is parsed once at construction and the pieces are + stashed in the `Upstream*` fields so the git-gate render step + doesn't have to re-parse.""" + + Name: str + Upstream: str + IdentityFile: str + KnownHostKey: str = "" + UpstreamUser: str = "" + UpstreamHost: str = "" + UpstreamPort: str = "" + UpstreamPath: str = "" + + @classmethod + def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "GitEntry": + d = _as_json_object(raw, f"bottle '{bottle_name}' git[{idx}]") + name = d.get("Name") + if not isinstance(name, str) or not name: + die(f"bottle '{bottle_name}' git[{idx}] missing required string field 'Name'") + upstream = d.get("Upstream") + if not isinstance(upstream, str) or not upstream: + die( + f"bottle '{bottle_name}' git '{name}' missing required string field " + f"'Upstream'" + ) + ident = d.get("IdentityFile") + if not isinstance(ident, str) or not ident: + die( + f"bottle '{bottle_name}' git '{name}' missing required string field " + f"'IdentityFile'" + ) + khk = _opt_str( + d.get("KnownHostKey"), + f"bottle '{bottle_name}' git '{name}' KnownHostKey", + ) + user, host, port, path = _parse_git_upstream( + upstream, f"bottle '{bottle_name}' git '{name}' Upstream" + ) + return cls( + Name=name, + Upstream=upstream, + IdentityFile=ident, + KnownHostKey=khk, + UpstreamUser=user, + UpstreamHost=host, + UpstreamPort=port, + UpstreamPath=path, + ) + + DLP_ACTIONS = ("block", "warn") @@ -134,6 +194,7 @@ class BottleEgress: class Bottle: env: Mapping[str, str] = field(default_factory=_empty_str_dict) ssh: tuple[SshEntry, ...] = () + git: tuple[GitEntry, ...] = () egress: BottleEgress = field(default_factory=BottleEgress) @classmethod @@ -171,6 +232,19 @@ class Bottle: for i, entry in enumerate(ssh_list) ) + git: tuple[GitEntry, ...] = () + git_raw = d.get("git") + if git_raw is not None: + if not isinstance(git_raw, list): + die(f"bottle '{name}' git must be an array (was {type(git_raw).__name__})") + git_list = cast(list[object], git_raw) + git = tuple( + GitEntry.from_dict(name, i, entry) + for i, entry in enumerate(git_list) + ) + _validate_unique_git_names(name, git) + _validate_no_shadow_route(name, ssh, git) + egress_raw = d.get("egress") egress = ( BottleEgress.from_dict(name, egress_raw) @@ -178,7 +252,7 @@ class Bottle: else BottleEgress() ) - return cls(env=env, ssh=ssh, egress=egress) + return cls(env=env, ssh=ssh, git=git, egress=egress) @dataclass(frozen=True) @@ -359,3 +433,69 @@ def _opt_port(value: object, label: str) -> str: if isinstance(value, str): return value die(f"{label} must be a string or number (was {type(value).__name__})") + + +def _parse_git_upstream(url: str, label: str) -> tuple[str, str, str, str]: + """Parse `ssh://user@host[:port]/path` into (user, host, port, path). + Dies if `url` doesn't match the ssh:// shape v1 supports. Default + port is 22 (matches OpenSSH).""" + if not url.startswith("ssh://"): + die(f"{label} must be an ssh:// URL (was {url!r})") + rest = url[len("ssh://"):] + if "@" not in rest: + die(f"{label} must include a user (e.g. ssh://git@host/path.git); was {url!r}") + user, _, hostpart = rest.partition("@") + if not user: + die(f"{label} user is empty in {url!r}") + if "/" not in hostpart: + die(f"{label} must include a path (e.g. ssh://git@host/path.git); was {url!r}") + hostport, _, path = hostpart.partition("/") + if not path: + die(f"{label} path is empty in {url!r}") + if ":" in hostport: + host, _, port = hostport.partition(":") + if not port.isdigit(): + die(f"{label} port must be numeric in {url!r}") + else: + host = hostport + port = "22" + if not host: + die(f"{label} host is empty in {url!r}") + return (user, host, port, path) + + +def _validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None: + seen: dict[str, None] = {} + for g in git: + if g.Name in seen: + die( + f"bottle '{bottle_name}' git entries have duplicate Name '{g.Name}'; " + f"each entry maps to a distinct bare repo on the gate." + ) + seen[g.Name] = None + + +def _validate_no_shadow_route( + bottle_name: str, + ssh: tuple[SshEntry, ...], + git: tuple[GitEntry, ...], +) -> None: + """Reject if any git entry's (host, port) matches an ssh entry's + (Hostname, Port). The same upstream reachable two ways — once through + the L4 ssh-gate, once through the gitleaks-bearing git-gate — defeats + the git-gate.""" + ssh_targets: dict[tuple[str, str], str] = {} + for e in ssh: + if not e.Hostname: + continue + port = e.Port or "22" + ssh_targets[(e.Hostname, port)] = e.Host + for g in git: + ssh_host = ssh_targets.get((g.UpstreamHost, g.UpstreamPort)) + if ssh_host is not None: + die( + f"bottle '{bottle_name}' has ssh entry '{ssh_host}' " + f"({g.UpstreamHost}:{g.UpstreamPort}) and git entry '{g.Name}' " + f"pointing at the same upstream. The same remote reachable two " + f"ways defeats the git-gate; remove one." + ) diff --git a/tests/fixtures.py b/tests/fixtures.py index b5fd316..49fc04d 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -65,6 +65,31 @@ def fixture_with_ssh_dict() -> dict[str, Any]: } +def fixture_with_git_dict() -> dict[str, Any]: + """Bottle declares a git-gate upstream. JSON shape.""" + return { + "bottles": { + "dev": { + "git": [ + { + "Name": "claude-bottle", + "Upstream": "ssh://git@gitea.dideric.is:30009/didericis/claude-bottle.git", + "IdentityFile": "/dev/null", + "KnownHostKey": "ssh-ed25519 AAAA...", + }, + { + "Name": "foo", + "Upstream": "ssh://git@github.com/didericis/foo.git", + "IdentityFile": "/dev/null", + "KnownHostKey": "ssh-ed25519 BBBB...", + }, + ] + } + }, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + } + + def fixture_minimal() -> Manifest: return Manifest.from_json_obj(fixture_minimal_dict()) @@ -77,6 +102,10 @@ def fixture_with_ssh() -> Manifest: return Manifest.from_json_obj(fixture_with_ssh_dict()) +def fixture_with_git() -> Manifest: + return Manifest.from_json_obj(fixture_with_git_dict()) + + def write_fixture(fn: Callable[[], dict[str, Any]]) -> Path: """Write fixture JSON to a temp file; return the path. Caller must rm. Accepts a function returning either a dict (JSON shape) or a Manifest; diff --git a/tests/unit/test_manifest_git.py b/tests/unit/test_manifest_git.py new file mode 100644 index 0000000..87cc1bc --- /dev/null +++ b/tests/unit/test_manifest_git.py @@ -0,0 +1,192 @@ +"""Unit: Bottle.git manifest parsing + validation (PRD 0008).""" + +import unittest + +from claude_bottle.log import Die +from claude_bottle.manifest import Manifest + + +def _manifest(git_entries): + return { + "bottles": {"dev": {"git": git_entries}}, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + } + + +class TestGitEntryParsing(unittest.TestCase): + def test_parses_minimal_entry(self): + m = Manifest.from_json_obj(_manifest([{ + "Name": "claude-bottle", + "Upstream": "ssh://git@gitea.dideric.is:30009/didericis/claude-bottle.git", + "IdentityFile": "/dev/null", + }])) + entries = m.bottles["dev"].git + self.assertEqual(1, len(entries)) + e = entries[0] + self.assertEqual("claude-bottle", e.Name) + self.assertEqual("git", e.UpstreamUser) + self.assertEqual("gitea.dideric.is", e.UpstreamHost) + self.assertEqual("30009", e.UpstreamPort) + self.assertEqual("didericis/claude-bottle.git", e.UpstreamPath) + + def test_default_port_is_22(self): + m = Manifest.from_json_obj(_manifest([{ + "Name": "foo", + "Upstream": "ssh://git@github.com/didericis/foo.git", + "IdentityFile": "/dev/null", + }])) + e = m.bottles["dev"].git[0] + self.assertEqual("22", e.UpstreamPort) + self.assertEqual("github.com", e.UpstreamHost) + + def test_known_host_key_optional(self): + m = Manifest.from_json_obj(_manifest([{ + "Name": "foo", + "Upstream": "ssh://git@github.com/foo.git", + "IdentityFile": "/dev/null", + }])) + self.assertEqual("", m.bottles["dev"].git[0].KnownHostKey) + + def test_missing_name_dies(self): + with self.assertRaises(Die): + Manifest.from_json_obj(_manifest([{ + "Upstream": "ssh://git@github.com/foo.git", + "IdentityFile": "/dev/null", + }])) + + def test_missing_upstream_dies(self): + with self.assertRaises(Die): + Manifest.from_json_obj(_manifest([{ + "Name": "foo", + "IdentityFile": "/dev/null", + }])) + + def test_missing_identity_file_dies(self): + with self.assertRaises(Die): + Manifest.from_json_obj(_manifest([{ + "Name": "foo", + "Upstream": "ssh://git@github.com/foo.git", + }])) + + def test_non_ssh_upstream_dies(self): + with self.assertRaises(Die): + Manifest.from_json_obj(_manifest([{ + "Name": "foo", + "Upstream": "https://github.com/didericis/foo.git", + "IdentityFile": "/dev/null", + }])) + + def test_scp_style_upstream_dies(self): + # SCP-style "git@host:path" is intentionally not supported in + # v1 — ssh:// only. + with self.assertRaises(Die): + Manifest.from_json_obj(_manifest([{ + "Name": "foo", + "Upstream": "git@github.com:didericis/foo.git", + "IdentityFile": "/dev/null", + }])) + + def test_upstream_without_user_dies(self): + with self.assertRaises(Die): + Manifest.from_json_obj(_manifest([{ + "Name": "foo", + "Upstream": "ssh://github.com/foo.git", + "IdentityFile": "/dev/null", + }])) + + def test_upstream_without_path_dies(self): + with self.assertRaises(Die): + Manifest.from_json_obj(_manifest([{ + "Name": "foo", + "Upstream": "ssh://git@github.com", + "IdentityFile": "/dev/null", + }])) + + def test_non_numeric_port_dies(self): + with self.assertRaises(Die): + Manifest.from_json_obj(_manifest([{ + "Name": "foo", + "Upstream": "ssh://git@github.com:notaport/foo.git", + "IdentityFile": "/dev/null", + }])) + + +class TestGitEntryCrossValidation(unittest.TestCase): + def test_duplicate_name_dies(self): + with self.assertRaises(Die): + Manifest.from_json_obj(_manifest([ + {"Name": "foo", "Upstream": "ssh://git@a.example/x.git", + "IdentityFile": "/dev/null"}, + {"Name": "foo", "Upstream": "ssh://git@b.example/y.git", + "IdentityFile": "/dev/null"}, + ])) + + def test_shadow_route_with_ssh_entry_dies(self): + # An ssh entry pointing at gitea.dideric.is:30009 AND a git + # entry pointing at ssh://git@gitea.dideric.is:30009/... is a + # bypass: agents could route around the gate by using the + # ssh-gate. Manifest construction must reject. + with self.assertRaises(Die): + Manifest.from_json_obj({ + "bottles": { + "dev": { + "ssh": [{ + "Host": "gitea", + "IdentityFile": "/dev/null", + "Hostname": "gitea.dideric.is", + "User": "git", + "Port": 30009, + }], + "git": [{ + "Name": "claude-bottle", + "Upstream": "ssh://git@gitea.dideric.is:30009/didericis/claude-bottle.git", + "IdentityFile": "/dev/null", + }], + }, + }, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + }) + + def test_independent_ssh_and_git_targets_allowed(self): + # Same hostname but different ports are independent targets. + m = Manifest.from_json_obj({ + "bottles": { + "dev": { + "ssh": [{ + "Host": "gitea-ssh", + "IdentityFile": "/dev/null", + "Hostname": "gitea.dideric.is", + "User": "git", + "Port": 22, + }], + "git": [{ + "Name": "claude-bottle", + "Upstream": "ssh://git@gitea.dideric.is:30009/didericis/claude-bottle.git", + "IdentityFile": "/dev/null", + }], + }, + }, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + }) + self.assertEqual(1, len(m.bottles["dev"].ssh)) + self.assertEqual(1, len(m.bottles["dev"].git)) + + +class TestEmptyGitField(unittest.TestCase): + def test_no_git_field_yields_empty_tuple(self): + m = Manifest.from_json_obj({ + "bottles": {"dev": {}}, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + }) + self.assertEqual((), m.bottles["dev"].git) + + def test_git_array_type_required(self): + with self.assertRaises(Die): + Manifest.from_json_obj({ + "bottles": {"dev": {"git": "not-a-list"}}, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + }) + + +if __name__ == "__main__": + unittest.main()