From 0b5d59cf9e67cff638d3020cd71df9bc044b63b6 Mon Sep 17 00:00:00 2001 From: claude Date: Wed, 3 Jun 2026 15:56:22 +0000 Subject: [PATCH] feat(prd-0048): implement SSH deploy-key provisioning with contrib/gitea MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - manifest_git.py: add ProvisionedKeyConfig dataclass; extend GitEntry with ProvisionedKey field (optional); make IdentityFile default to "" so provisioned_key entries can be constructed without a static path; add _parse_provisioned_key_config; update from_repos_entry to accept provisioned_key as an alternative to identity (mutually exclusive, parser rejects both-or-neither) - deploy_key_provisioner.py (new): DeployKeyProvisioner ABC with create() and delete() abstract methods; get_provisioner() factory with lazy contrib import for gitea - contrib/gitea/deploy_key_provisioner.py (new): GiteaDeployKeyProvisioner generating ed25519 keypairs via ssh-keygen and managing them through the Gitea deploy-key API (POST/DELETE); 404 on delete is success; all other errors raise RuntimeError - git_gate.py: add _provision_dynamic_key() called in GitGate.prepare() for entries with ProvisionedKey — generates key, writes private key and key ID files to stage_dir, patches GitGateUpstream.identity_file; add revoke_git_gate_provisioned_keys() for teardown — raises on failure - docker/launch.py: call revoke_git_gate_provisioned_keys() in teardown() after stack.close() so revocation runs after containers stop and failures propagate (not suppressed) - smolmachines/launch.py: extract _teardown_smolmachines() helper that catches stack.close() errors (warn + re-raise) then calls revocation; same fatal-on-failure contract as docker backend - test_manifest_git.py: 9 new cases for provisioned_key parsing - test_deploy_key_provisioner.py (new): factory smoke tests - test_contrib_gitea_deploy_key.py (new): create/delete/error/split tests Closes #169 --- bot_bottle/backend/docker/launch.py | 8 + bot_bottle/backend/smolmachines/launch.py | 24 +++ bot_bottle/contrib/__init__.py | 0 bot_bottle/contrib/gitea/__init__.py | 0 .../contrib/gitea/deploy_key_provisioner.py | 121 +++++++++++++ bot_bottle/deploy_key_provisioner.py | 52 ++++++ bot_bottle/git_gate.py | 90 +++++++++- bot_bottle/manifest_git.py | 101 +++++++++-- tests/unit/test_contrib_gitea_deploy_key.py | 166 ++++++++++++++++++ tests/unit/test_deploy_key_provisioner.py | 29 +++ tests/unit/test_manifest_git.py | 107 +++++++++++ 11 files changed, 686 insertions(+), 12 deletions(-) create mode 100644 bot_bottle/contrib/__init__.py create mode 100644 bot_bottle/contrib/gitea/__init__.py create mode 100644 bot_bottle/contrib/gitea/deploy_key_provisioner.py create mode 100644 bot_bottle/deploy_key_provisioner.py create mode 100644 tests/unit/test_contrib_gitea_deploy_key.py create mode 100644 tests/unit/test_deploy_key_provisioner.py diff --git a/bot_bottle/backend/docker/launch.py b/bot_bottle/backend/docker/launch.py index c6676e1..09e72a0 100644 --- a/bot_bottle/backend/docker/launch.py +++ b/bot_bottle/backend/docker/launch.py @@ -43,6 +43,7 @@ from pathlib import Path from typing import Callable, Generator from ...egress import egress_resolve_token_values +from ...git_gate import revoke_git_gate_provisioned_keys from ...log import info, warn from . import network as network_mod from . import util as docker_mod @@ -51,6 +52,7 @@ from .bottle_plan import DockerBottlePlan from .bottle_state import ( bottle_state_dir, egress_state_dir, + git_gate_state_dir, pipelock_state_dir, ) from .compose import ( @@ -84,6 +86,9 @@ def launch( Teardown on exit.""" stack = ExitStack() + _bottle_for_revoke = plan.spec.manifest.bottle_for(plan.spec.agent_name) + _git_gate_dir_for_revoke = git_gate_state_dir(plan.slug) + def teardown() -> None: try: stack.close() @@ -92,6 +97,9 @@ def launch( f"teardown failed for container {plan.container_name}" f" (compose-down): {exc!r}" ) + revoke_git_gate_provisioned_keys( + _bottle_for_revoke, _git_gate_dir_for_revoke + ) try: # Step 1: agent image build. Sidecar images get built lazily by diff --git a/bot_bottle/backend/smolmachines/launch.py b/bot_bottle/backend/smolmachines/launch.py index 052682a..c4826db 100644 --- a/bot_bottle/backend/smolmachines/launch.py +++ b/bot_bottle/backend/smolmachines/launch.py @@ -53,6 +53,9 @@ from ..docker.pipelock import ( PIPELOCK_PORT as _PIPELOCK_PORT_STR, pipelock_tls_init, ) +from ...git_gate import revoke_git_gate_provisioned_keys +from ...log import warn +from ..docker.bottle_state import git_gate_state_dir from . import loopback_alias as _loopback from . import sidecar_bundle as _bundle from . import smolvm as _smolvm @@ -120,7 +123,28 @@ def launch( agent_prompt_mode=plan.agent_prompt_mode, ) finally: + _teardown_smolmachines(stack, plan) + + +def _teardown_smolmachines( + stack: ExitStack, + plan: SmolmachinesBottlePlan, +) -> None: + """Unwind the ExitStack, then revoke any provisioned deploy keys. + + ExitStack errors are caught and logged (non-fatal) so that key + revocation always runs. Revocation errors propagate — a stranded + deploy key is a security concern the operator must address.""" + teardown_exc: BaseException | None = None + try: stack.close() + except BaseException as exc: + teardown_exc = exc + warn(f"smolmachines teardown failed: {exc!r}") + bottle = plan.spec.manifest.bottle_for(plan.spec.agent_name) + revoke_git_gate_provisioned_keys(bottle, git_gate_state_dir(plan.slug)) + if teardown_exc is not None: + raise teardown_exc def _allocate_resources( diff --git a/bot_bottle/contrib/__init__.py b/bot_bottle/contrib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bot_bottle/contrib/gitea/__init__.py b/bot_bottle/contrib/gitea/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bot_bottle/contrib/gitea/deploy_key_provisioner.py b/bot_bottle/contrib/gitea/deploy_key_provisioner.py new file mode 100644 index 0000000..7006856 --- /dev/null +++ b/bot_bottle/contrib/gitea/deploy_key_provisioner.py @@ -0,0 +1,121 @@ +"""Gitea deploy-key provisioner (PRD 0048, contrib). + +Generates ed25519 keypairs via `ssh-keygen` and registers / deletes +them using the Gitea deploy-key HTTP API. No new Python dependencies — +only stdlib `urllib.request` and `subprocess`.""" + +from __future__ import annotations + +import json +import subprocess +import tempfile +import urllib.error +import urllib.request +from pathlib import Path + +from ...deploy_key_provisioner import DeployKeyProvisioner + + +class GiteaDeployKeyProvisioner(DeployKeyProvisioner): + """Manages deploy keys on a Gitea instance.""" + + def __init__(self, *, token: str, api_url: str) -> None: + self._token = token + self._api_url = api_url.rstrip("/") + + def create(self, owner_repo: str, title: str) -> tuple[str, bytes]: + """Generate an ed25519 keypair, register the public half as a + repo deploy key, and return `(key_id, private_key_bytes)`. + + The key is registered with `read_only=False` because git-gate + needs push access to forward gitleaks-scanned refs upstream.""" + with tempfile.TemporaryDirectory() as tmpdir: + key_path = Path(tmpdir) / "key" + subprocess.run( + [ + "ssh-keygen", "-t", "ed25519", + "-f", str(key_path), + "-N", "", + ], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + private_key = key_path.read_bytes() + public_key = key_path.with_suffix(".pub").read_text().strip() + + owner, repo = _split_owner_repo(owner_repo) + url = f"{self._api_url}/api/v1/repos/{owner}/{repo}/keys" + payload = json.dumps({ + "key": public_key, + "read_only": False, + "title": title, + }).encode() + req = urllib.request.Request( + url, + data=payload, + headers={ + "Authorization": f"token {self._token}", + "Content-Type": "application/json", + }, + method="POST", + ) + try: + with urllib.request.urlopen(req) as resp: + body = json.loads(resp.read()) + except urllib.error.HTTPError as exc: + _body = _read_error_body(exc) + raise RuntimeError( + f"failed to create deploy key for {owner_repo}: " + f"HTTP {exc.code} — {_body}" + ) from exc + except urllib.error.URLError as exc: + raise RuntimeError( + f"failed to create deploy key for {owner_repo}: {exc.reason}" + ) from exc + + return str(body["id"]), private_key + + def delete(self, owner_repo: str, key_id: str) -> None: + """Delete the deploy key. HTTP 404 (already gone) is success. + All other errors raise RuntimeError so teardown halts.""" + owner, repo = _split_owner_repo(owner_repo) + url = f"{self._api_url}/api/v1/repos/{owner}/{repo}/keys/{key_id}" + req = urllib.request.Request( + url, + headers={"Authorization": f"token {self._token}"}, + method="DELETE", + ) + try: + with urllib.request.urlopen(req): + pass + except urllib.error.HTTPError as exc: + if exc.code == 404: + return + _body = _read_error_body(exc) + raise RuntimeError( + f"failed to delete deploy key {key_id} for {owner_repo}: " + f"HTTP {exc.code} — {_body}" + ) from exc + except urllib.error.URLError as exc: + raise RuntimeError( + f"failed to delete deploy key {key_id} for {owner_repo}: " + f"{exc.reason}" + ) from exc + + +def _split_owner_repo(owner_repo: str) -> tuple[str, str]: + """Split `'owner/repo'` into `('owner', 'repo')`.""" + parts = owner_repo.split("/", 1) + if len(parts) != 2 or not all(parts): + raise ValueError( + f"expected 'owner/repo' format, got {owner_repo!r}" + ) + return parts[0], parts[1] + + +def _read_error_body(exc: urllib.error.HTTPError) -> str: + try: + return exc.read().decode("utf-8", errors="replace") + except Exception: + return "" diff --git a/bot_bottle/deploy_key_provisioner.py b/bot_bottle/deploy_key_provisioner.py new file mode 100644 index 0000000..b7cac3c --- /dev/null +++ b/bot_bottle/deploy_key_provisioner.py @@ -0,0 +1,52 @@ +"""Deploy-key provisioner interface and factory (PRD 0048). + +The core defines the abstract contract; concrete implementations live +in `bot_bottle/contrib//deploy_key_provisioner.py`. The +factory `get_provisioner` imports contrib modules lazily so that a +missing optional dependency in one provider doesn't break unrelated +features.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod + + +class DeployKeyProvisioner(ABC): + """Manages a single deploy-key lifecycle on a remote forge.""" + + @abstractmethod + def create(self, owner_repo: str, title: str) -> tuple[str, bytes]: + """Generate a keypair and register the public half as a + deploy key on the forge. + + `owner_repo` is the `/` path (no `.git` suffix). + `title` is the human-readable label shown in the forge UI. + + Returns `(key_id, private_key_bytes)` where `key_id` is opaque + to the caller and is only ever passed back to `delete`.""" + + @abstractmethod + def delete(self, owner_repo: str, key_id: str) -> None: + """Delete the registered deploy key. + + Must not raise if the key is already absent (HTTP 404 is + success). Must raise for all other failures so teardown halts.""" + + +def get_provisioner( + provider: str, token: str, api_url: str +) -> DeployKeyProvisioner: + """Instantiate the contrib provisioner for `provider`. + + Raises `ManifestError` for unknown providers so the error surfaces + at parse time rather than at runtime.""" + if provider == "gitea": + from bot_bottle.contrib.gitea.deploy_key_provisioner import ( + GiteaDeployKeyProvisioner, + ) + return GiteaDeployKeyProvisioner(token=token, api_url=api_url) + from .manifest_util import ManifestError + raise ManifestError( + f"unknown provisioned_key provider: {provider!r}; " + f"available: gitea" + ) diff --git a/bot_bottle/git_gate.py b/bot_bottle/git_gate.py index 274d121..49ad750 100644 --- a/bot_bottle/git_gate.py +++ b/bot_bottle/git_gate.py @@ -29,11 +29,14 @@ backend-specific and lives on concrete subclasses (see from __future__ import annotations +import dataclasses +import os import shlex from abc import ABC, abstractmethod from dataclasses import dataclass from pathlib import Path +from .log import info from .manifest import Bottle, GitEntry @@ -357,6 +360,80 @@ exit 0 """ +def _provision_dynamic_key( + entry: GitEntry, + slug: str, + stage_dir: Path, +) -> str: + """Generate a fresh ed25519 keypair, register the public half with + the forge, and persist the private key + key ID under `stage_dir`. + + Returns the host-side path to the private key file so the caller + can inject it into the GitGateUpstream as `identity_file`.""" + from .deploy_key_provisioner import get_provisioner + pk = entry.ProvisionedKey + assert pk is not None + token = os.environ.get(pk.token_env) + if token is None: + raise RuntimeError( + f"git-gate.repos[{entry.Name!r}] provisioned_key.token_env" + f" = {pk.token_env!r}: env var is not set" + ) + api_url = pk.api_url or f"https://{entry.UpstreamHost}" + provisioner = get_provisioner(pk.provider, token, api_url) + + owner_repo = entry.UpstreamPath + if owner_repo.endswith(".git"): + owner_repo = owner_repo[:-4] + title = f"bot-bottle:{slug}:{entry.Name}" + + info(f"provisioning deploy key for git-gate.repos[{entry.Name!r}]") + key_id, private_key_bytes = provisioner.create(owner_repo, title) + + key_file = stage_dir / f"{entry.Name}-key" + key_file.write_bytes(private_key_bytes) + key_file.chmod(0o600) + + id_file = stage_dir / f"{entry.Name}-deploy-key-id" + id_file.write_text(key_id) + id_file.chmod(0o600) + + info(f"provisioned deploy key {key_id} for git-gate.repos[{entry.Name!r}]") + return str(key_file) + + +def revoke_git_gate_provisioned_keys(bottle: Bottle, stage_dir: Path) -> None: + """Revoke all deploy keys provisioned for `bottle` during prepare. + + Called at teardown after containers stop. Raises if any revocation + fails — a stranded key is a security concern that the operator must + address manually.""" + from .deploy_key_provisioner import get_provisioner + for entry in bottle.git: + if entry.ProvisionedKey is None: + continue + pk = entry.ProvisionedKey + id_file = stage_dir / f"{entry.Name}-deploy-key-id" + if not id_file.exists(): + continue + key_id = id_file.read_text().strip() + token = os.environ.get(pk.token_env) + if token is None: + raise RuntimeError( + f"git-gate.repos[{entry.Name!r}] provisioned_key.token_env" + f" = {pk.token_env!r}: env var is not set;" + f" cannot revoke deploy key {key_id}" + ) + api_url = pk.api_url or f"https://{entry.UpstreamHost}" + provisioner = get_provisioner(pk.provider, token, api_url) + owner_repo = entry.UpstreamPath + if owner_repo.endswith(".git"): + owner_repo = owner_repo[:-4] + info(f"revoking deploy key {key_id} for git-gate.repos[{entry.Name!r}]") + provisioner.delete(owner_repo, key_id) + info(f"revoked deploy key {key_id} for git-gate.repos[{entry.Name!r}]") + + class GitGate(ABC): """The per-agent git-gate. Encapsulates the host-side prepare (upstream lift + entrypoint/hook render); the sidecar's @@ -368,10 +445,21 @@ class GitGate(ABC): entrypoint, pre-receive hook, and access-hook scripts (mode 600) under `stage_dir`. Pure host-side, no docker subprocess. + For `provisioned_key` entries, also generates and registers + a fresh deploy key via the forge API and writes the private key + + key ID to `stage_dir`. + Returned plan is incomplete: the launch step must fill `internal_network` / `egress_network` via `dataclasses.replace` before passing the plan to `.start`.""" - upstreams = git_gate_upstreams_for_bottle(bottle) + upstreams_list = list(git_gate_upstreams_for_bottle(bottle)) + for i, entry in enumerate(bottle.git): + if entry.ProvisionedKey is not None: + key_file = _provision_dynamic_key(entry, slug, stage_dir) + upstreams_list[i] = dataclasses.replace( + upstreams_list[i], identity_file=key_file + ) + upstreams = tuple(upstreams_list) entrypoint = stage_dir / "git_gate_entrypoint.sh" entrypoint.write_text(git_gate_render_entrypoint(upstreams)) entrypoint.chmod(0o600) diff --git a/bot_bottle/manifest_git.py b/bot_bottle/manifest_git.py index cf24a83..1ba6019 100644 --- a/bot_bottle/manifest_git.py +++ b/bot_bottle/manifest_git.py @@ -4,6 +4,7 @@ from __future__ import annotations import re from dataclasses import dataclass +from typing import Optional from .manifest_util import ManifestError, as_json_object @@ -61,6 +62,24 @@ def validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> No seen[g.Name] = None +@dataclass(frozen=True) +class ProvisionedKeyConfig: + """Configuration for automatic deploy-key lifecycle management + (PRD 0048). Used when a git-gate.repos entry opts out of a + static identity file and instead wants a fresh SSH keypair + generated at spin-up and revoked at teardown. + + `provider` names the contrib sub-package to load (e.g. `gitea`). + `token_env` is the name of a host-side env var carrying the API + token; the value is read at provision time, never stored on the + plan. `api_url` is the forge's HTTP API root; if empty, it is + derived from the upstream URL's host at provision time.""" + + provider: str + token_env: str + api_url: str = "" + + @dataclass(frozen=True) class GitEntry: """One upstream the per-agent git-gate (PRD 0008) is allowed to @@ -74,14 +93,15 @@ class GitEntry: stashed in the `Upstream*` fields so the git-gate render step doesn't have to re-parse. - Manifest source: `git-gate.repos.` (PRD 0047). The YAML keys - are `url`, `identity`, and `host_key`; the internal field names are - stable across that rename.""" + Manifest source: `git-gate.repos.` (PRD 0047/0048). Exactly + one of `identity` (static key path) or `provisioned_key` (automatic + lifecycle) must be present. The internal field names are stable.""" Name: str Upstream: str - IdentityFile: str + IdentityFile: str = "" KnownHostKey: str = "" + ProvisionedKey: Optional[ProvisionedKeyConfig] = None RemoteKey: str = "" UpstreamUser: str = "" UpstreamHost: str = "" @@ -94,8 +114,9 @@ class GitEntry: ) -> "GitEntry": """Parse one entry from `git-gate.repos.`. - YAML keys: `url` (required), `identity` (required), - `host_key` (optional). The repo_name becomes `Name`.""" + YAML keys: `url` (required), exactly one of `identity` or + `provisioned_key` (required), `host_key` (optional). + The repo_name becomes `Name`.""" if not repo_name: raise ManifestError( f"bottle '{bottle_name}' git-gate.repos has an empty key" @@ -108,21 +129,44 @@ class GitEntry: label = f"git-gate.repos[{repo_name!r}]" d = as_json_object(raw, f"bottle '{bottle_name}' {label}") for k in d: - if k not in {"url", "identity", "host_key"}: + if k not in {"url", "identity", "provisioned_key", "host_key"}: raise ManifestError( f"bottle '{bottle_name}' {label} has unknown key {k!r}; " - f"allowed: url, identity, host_key" + f"allowed: url, identity, provisioned_key, host_key" ) upstream = d.get("url") if not isinstance(upstream, str) or not upstream: raise ManifestError( f"bottle '{bottle_name}' {label} missing required string field 'url'" ) - ident = d.get("identity") - if not isinstance(ident, str) or not ident: + + has_identity = "identity" in d + has_provisioned = "provisioned_key" in d + if has_identity and has_provisioned: raise ManifestError( - f"bottle '{bottle_name}' {label} missing required string field 'identity'" + f"bottle '{bottle_name}' {label} must set exactly one of " + f"'identity' or 'provisioned_key'; got both." ) + if not has_identity and not has_provisioned: + raise ManifestError( + f"bottle '{bottle_name}' {label} must set exactly one of " + f"'identity' or 'provisioned_key'; got neither." + ) + + ident = "" + provisioned_key: Optional[ProvisionedKeyConfig] = None + if has_identity: + raw_ident = d.get("identity") + if not isinstance(raw_ident, str) or not raw_ident: + raise ManifestError( + f"bottle '{bottle_name}' {label} 'identity' must be a non-empty string" + ) + ident = raw_ident + else: + provisioned_key = _parse_provisioned_key_config( + bottle_name, label, d["provisioned_key"] + ) + khk = _opt_str( d.get("host_key"), f"bottle '{bottle_name}' {label} host_key", @@ -135,6 +179,7 @@ class GitEntry: Upstream=upstream, IdentityFile=ident, KnownHostKey=khk, + ProvisionedKey=provisioned_key, RemoteKey=host, UpstreamUser=user, UpstreamHost=host, @@ -143,6 +188,40 @@ class GitEntry: ) +def _parse_provisioned_key_config( + bottle_name: str, label: str, raw: object +) -> ProvisionedKeyConfig: + d = as_json_object(raw, f"bottle '{bottle_name}' {label}.provisioned_key") + for k in d: + if k not in {"provider", "token_env", "api_url"}: + raise ManifestError( + f"bottle '{bottle_name}' {label}.provisioned_key has unknown key {k!r}; " + f"allowed: provider, token_env, api_url" + ) + provider = d.get("provider") + if not isinstance(provider, str) or not provider: + raise ManifestError( + f"bottle '{bottle_name}' {label}.provisioned_key missing required " + f"string field 'provider'" + ) + token_env = d.get("token_env") + if not isinstance(token_env, str) or not token_env: + raise ManifestError( + f"bottle '{bottle_name}' {label}.provisioned_key missing required " + f"string field 'token_env'" + ) + api_url_raw = d.get("api_url", "") + if not isinstance(api_url_raw, str): + raise ManifestError( + f"bottle '{bottle_name}' {label}.provisioned_key 'api_url' must be a string" + ) + return ProvisionedKeyConfig( + provider=provider, + token_env=token_env, + api_url=api_url_raw, + ) + + @dataclass(frozen=True) class GitUser: """Per-bottle `git config --global user.name` / `user.email` diff --git a/tests/unit/test_contrib_gitea_deploy_key.py b/tests/unit/test_contrib_gitea_deploy_key.py new file mode 100644 index 0000000..095a0f8 --- /dev/null +++ b/tests/unit/test_contrib_gitea_deploy_key.py @@ -0,0 +1,166 @@ +"""Unit: GiteaDeployKeyProvisioner (PRD 0048, contrib/gitea).""" + +from __future__ import annotations + +import json +import unittest +import urllib.error +from io import BytesIO +from pathlib import Path +from tempfile import mkdtemp +from unittest.mock import MagicMock, call, patch + +from bot_bottle.contrib.gitea.deploy_key_provisioner import ( + GiteaDeployKeyProvisioner, + _split_owner_repo, +) + + +def _provisioner() -> GiteaDeployKeyProvisioner: + return GiteaDeployKeyProvisioner( + token="test-token", api_url="https://gitea.example.com" + ) + + +def _urlopen_response(body: dict, status: int = 200) -> MagicMock: + resp = MagicMock() + resp.read.return_value = json.dumps(body).encode() + resp.status = status + resp.__enter__ = lambda s: s + resp.__exit__ = MagicMock(return_value=False) + return resp + + +def _http_error(code: int, body: str = "") -> urllib.error.HTTPError: + return urllib.error.HTTPError( + url="http://x", + code=code, + msg="err", + hdrs=None, # type: ignore[arg-type] + fp=BytesIO(body.encode()), + ) + + +class TestCreate(unittest.TestCase): + def test_create_calls_ssh_keygen_and_posts_to_api(self): + provisioner = _provisioner() + fake_key_id = 42 + fake_private = b"PRIVATE_KEY" + fake_public = "ssh-ed25519 AAAA fake" + + with patch( + "bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run" + ) as mock_run, patch( + "bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen" + ) as mock_urlopen, patch( + "bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes", + return_value=fake_private, + ), patch( + "bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text", + return_value=fake_public + "\n", + ): + mock_urlopen.return_value = _urlopen_response({"id": fake_key_id}) + key_id, private_bytes = provisioner.create( + "didericis/bot-bottle", "bot-bottle:slug:repo" + ) + + # ssh-keygen called with ed25519 + mock_run.assert_called_once() + run_args = mock_run.call_args.args[0] + self.assertIn("ssh-keygen", run_args) + self.assertIn("-t", run_args) + self.assertIn("ed25519", run_args) + + # POST body contains public key + post_call = mock_urlopen.call_args.args[0] + payload = json.loads(post_call.data) + self.assertEqual(fake_public, payload["key"]) + self.assertFalse(payload["read_only"]) + + # Correct URL + self.assertIn( + "/api/v1/repos/didericis/bot-bottle/keys", post_call.full_url + ) + self.assertEqual(str(fake_key_id), key_id) + self.assertEqual(fake_private, private_bytes) + + def test_create_raises_on_http_error(self): + provisioner = _provisioner() + with patch( + "bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run" + ), patch( + "bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen", + side_effect=_http_error(403, "forbidden"), + ), patch( + "bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes", + return_value=b"pk", + ), patch( + "bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text", + return_value="ssh-ed25519 AAAA\n", + ): + with self.assertRaises(RuntimeError) as ctx: + provisioner.create("owner/repo", "title") + self.assertIn("403", str(ctx.exception)) + + +class TestDelete(unittest.TestCase): + def test_delete_calls_correct_endpoint(self): + provisioner = _provisioner() + with patch( + "bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen" + ) as mock_urlopen: + mock_urlopen.return_value = _urlopen_response({}) + provisioner.delete("didericis/bot-bottle", "99") + + req = mock_urlopen.call_args.args[0] + self.assertIn("/api/v1/repos/didericis/bot-bottle/keys/99", req.full_url) + self.assertEqual("DELETE", req.get_method()) + + def test_delete_tolerates_404(self): + provisioner = _provisioner() + with patch( + "bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen", + side_effect=_http_error(404), + ): + provisioner.delete("owner/repo", "123") # must not raise + + def test_delete_raises_on_non_404_http_error(self): + provisioner = _provisioner() + with patch( + "bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen", + side_effect=_http_error(500, "internal server error"), + ): + with self.assertRaises(RuntimeError) as ctx: + provisioner.delete("owner/repo", "7") + self.assertIn("500", str(ctx.exception)) + + def test_delete_raises_on_url_error(self): + provisioner = _provisioner() + with patch( + "bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen", + side_effect=urllib.error.URLError("connection refused"), + ): + with self.assertRaises(RuntimeError) as ctx: + provisioner.delete("owner/repo", "7") + self.assertIn("connection refused", str(ctx.exception)) + + +class TestSplitOwnerRepo(unittest.TestCase): + def test_simple(self): + self.assertEqual(("owner", "repo"), _split_owner_repo("owner/repo")) + + def test_raises_on_missing_slash(self): + with self.assertRaises(ValueError): + _split_owner_repo("noslash") + + def test_raises_on_empty_owner(self): + with self.assertRaises(ValueError): + _split_owner_repo("/repo") + + def test_raises_on_empty_repo(self): + with self.assertRaises(ValueError): + _split_owner_repo("owner/") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_deploy_key_provisioner.py b/tests/unit/test_deploy_key_provisioner.py new file mode 100644 index 0000000..8a3a81f --- /dev/null +++ b/tests/unit/test_deploy_key_provisioner.py @@ -0,0 +1,29 @@ +"""Unit: deploy_key_provisioner factory (PRD 0048).""" + +from __future__ import annotations + +import unittest +from unittest.mock import patch + +from bot_bottle.deploy_key_provisioner import DeployKeyProvisioner, get_provisioner +from bot_bottle.manifest import ManifestError + + +class TestGetProvisioner(unittest.TestCase): + def test_gitea_returns_gitea_provisioner(self): + from bot_bottle.contrib.gitea.deploy_key_provisioner import ( + GiteaDeployKeyProvisioner, + ) + p = get_provisioner("gitea", token="tok", api_url="https://gitea.example.com") + self.assertIsInstance(p, GiteaDeployKeyProvisioner) + self.assertIsInstance(p, DeployKeyProvisioner) + + def test_unknown_provider_raises_manifest_error(self): + with self.assertRaises(ManifestError) as ctx: + get_provisioner("github", token="tok", api_url="https://github.com") + self.assertIn("github", str(ctx.exception)) + self.assertIn("provisioned_key provider", str(ctx.exception)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_manifest_git.py b/tests/unit/test_manifest_git.py index 8fcc22e..16a7d8d 100644 --- a/tests/unit/test_manifest_git.py +++ b/tests/unit/test_manifest_git.py @@ -243,6 +243,113 @@ class TestGitEntryCrossValidation(unittest.TestCase): self.assertIn("PRD 0047", msg) +class TestProvisionedKey(unittest.TestCase): + """git-gate.repos entries that use provisioned_key (PRD 0048).""" + + def test_provisioned_key_minimal(self): + m = Manifest.from_json_obj(_manifest({ + "bot-bottle": { + "url": "ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git", + "provisioned_key": { + "provider": "gitea", + "token_env": "GITEA_TOKEN", + }, + }, + })) + e = m.bottles["dev"].git[0] + self.assertEqual("bot-bottle", e.Name) + self.assertIsNotNone(e.ProvisionedKey) + assert e.ProvisionedKey is not None + self.assertEqual("gitea", e.ProvisionedKey.provider) + self.assertEqual("GITEA_TOKEN", e.ProvisionedKey.token_env) + self.assertEqual("", e.ProvisionedKey.api_url) + self.assertEqual("", e.IdentityFile) + + def test_provisioned_key_with_api_url(self): + m = Manifest.from_json_obj(_manifest({ + "repo": { + "url": "ssh://git@gitea.example.com/org/repo.git", + "provisioned_key": { + "provider": "gitea", + "token_env": "MY_TOKEN", + "api_url": "https://gitea.example.com", + }, + }, + })) + pk = m.bottles["dev"].git[0].ProvisionedKey + assert pk is not None + self.assertEqual("https://gitea.example.com", pk.api_url) + + def test_both_identity_and_provisioned_key_dies(self): + with self.assertRaises(ManifestError) as ctx: + Manifest.from_json_obj(_manifest({ + "foo": { + "url": "ssh://git@github.com/foo.git", + "identity": "/dev/null", + "provisioned_key": {"provider": "gitea", "token_env": "T"}, + }, + })) + self.assertIn("exactly one of", str(ctx.exception)) + self.assertIn("got both", str(ctx.exception)) + + def test_neither_identity_nor_provisioned_key_dies(self): + with self.assertRaises(ManifestError) as ctx: + Manifest.from_json_obj(_manifest({ + "foo": {"url": "ssh://git@github.com/foo.git"}, + })) + self.assertIn("exactly one of", str(ctx.exception)) + self.assertIn("got neither", str(ctx.exception)) + + def test_unknown_key_in_provisioned_key_block_dies(self): + with self.assertRaises(ManifestError): + Manifest.from_json_obj(_manifest({ + "foo": { + "url": "ssh://git@github.com/foo.git", + "provisioned_key": { + "provider": "gitea", + "token_env": "T", + "key_type": "rsa", # not allowed + }, + }, + })) + + def test_missing_provider_dies(self): + with self.assertRaises(ManifestError): + Manifest.from_json_obj(_manifest({ + "foo": { + "url": "ssh://git@github.com/foo.git", + "provisioned_key": {"token_env": "T"}, + }, + })) + + def test_missing_token_env_dies(self): + with self.assertRaises(ManifestError): + Manifest.from_json_obj(_manifest({ + "foo": { + "url": "ssh://git@github.com/foo.git", + "provisioned_key": {"provider": "gitea"}, + }, + })) + + def test_provisioned_key_entry_has_no_identity_file(self): + m = Manifest.from_json_obj(_manifest({ + "foo": { + "url": "ssh://git@github.com/didericis/foo.git", + "provisioned_key": {"provider": "gitea", "token_env": "T"}, + }, + })) + self.assertEqual("", m.bottles["dev"].git[0].IdentityFile) + + def test_identity_entry_has_no_provisioned_key(self): + m = Manifest.from_json_obj(_manifest({ + "foo": { + "url": "ssh://git@github.com/foo.git", + "identity": "/dev/null", + }, + })) + self.assertIsNone(m.bottles["dev"].git[0].ProvisionedKey) + + class TestEmptyGitGateField(unittest.TestCase): def test_no_git_gate_field_yields_empty_tuple(self): m = Manifest.from_json_obj({