feat(prd-0048): implement SSH deploy-key provisioning with contrib/gitea

- manifest_git.py: add ProvisionedKeyConfig dataclass; extend GitEntry
  with ProvisionedKey field (optional); make IdentityFile default to ""
  so provisioned_key entries can be constructed without a static path;
  add _parse_provisioned_key_config; update from_repos_entry to accept
  provisioned_key as an alternative to identity (mutually exclusive,
  parser rejects both-or-neither)

- deploy_key_provisioner.py (new): DeployKeyProvisioner ABC with create()
  and delete() abstract methods; get_provisioner() factory with lazy
  contrib import for gitea

- contrib/gitea/deploy_key_provisioner.py (new): GiteaDeployKeyProvisioner
  generating ed25519 keypairs via ssh-keygen and managing them through
  the Gitea deploy-key API (POST/DELETE); 404 on delete is success;
  all other errors raise RuntimeError

- git_gate.py: add _provision_dynamic_key() called in GitGate.prepare()
  for entries with ProvisionedKey — generates key, writes private key
  and key ID files to stage_dir, patches GitGateUpstream.identity_file;
  add revoke_git_gate_provisioned_keys() for teardown — raises on failure

- docker/launch.py: call revoke_git_gate_provisioned_keys() in teardown()
  after stack.close() so revocation runs after containers stop and
  failures propagate (not suppressed)

- smolmachines/launch.py: extract _teardown_smolmachines() helper that
  catches stack.close() errors (warn + re-raise) then calls revocation;
  same fatal-on-failure contract as docker backend

- test_manifest_git.py: 9 new cases for provisioned_key parsing
- test_deploy_key_provisioner.py (new): factory smoke tests
- test_contrib_gitea_deploy_key.py (new): create/delete/error/split tests

Closes #169
This commit is contained in:
2026-06-03 15:56:22 +00:00
committed by didericis
parent 464012d97c
commit 0b5d59cf9e
11 changed files with 686 additions and 12 deletions
+8
View File
@@ -43,6 +43,7 @@ from pathlib import Path
from typing import Callable, Generator
from ...egress import egress_resolve_token_values
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import info, warn
from . import network as network_mod
from . import util as docker_mod
@@ -51,6 +52,7 @@ from .bottle_plan import DockerBottlePlan
from .bottle_state import (
bottle_state_dir,
egress_state_dir,
git_gate_state_dir,
pipelock_state_dir,
)
from .compose import (
@@ -84,6 +86,9 @@ def launch(
Teardown on exit."""
stack = ExitStack()
_bottle_for_revoke = plan.spec.manifest.bottle_for(plan.spec.agent_name)
_git_gate_dir_for_revoke = git_gate_state_dir(plan.slug)
def teardown() -> None:
try:
stack.close()
@@ -92,6 +97,9 @@ def launch(
f"teardown failed for container {plan.container_name}"
f" (compose-down): {exc!r}"
)
revoke_git_gate_provisioned_keys(
_bottle_for_revoke, _git_gate_dir_for_revoke
)
try:
# Step 1: agent image build. Sidecar images get built lazily by
+24
View File
@@ -53,6 +53,9 @@ from ..docker.pipelock import (
PIPELOCK_PORT as _PIPELOCK_PORT_STR,
pipelock_tls_init,
)
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import warn
from ..docker.bottle_state import git_gate_state_dir
from . import loopback_alias as _loopback
from . import sidecar_bundle as _bundle
from . import smolvm as _smolvm
@@ -120,7 +123,28 @@ def launch(
agent_prompt_mode=plan.agent_prompt_mode,
)
finally:
_teardown_smolmachines(stack, plan)
def _teardown_smolmachines(
stack: ExitStack,
plan: SmolmachinesBottlePlan,
) -> None:
"""Unwind the ExitStack, then revoke any provisioned deploy keys.
ExitStack errors are caught and logged (non-fatal) so that key
revocation always runs. Revocation errors propagate — a stranded
deploy key is a security concern the operator must address."""
teardown_exc: BaseException | None = None
try:
stack.close()
except BaseException as exc:
teardown_exc = exc
warn(f"smolmachines teardown failed: {exc!r}")
bottle = plan.spec.manifest.bottle_for(plan.spec.agent_name)
revoke_git_gate_provisioned_keys(bottle, git_gate_state_dir(plan.slug))
if teardown_exc is not None:
raise teardown_exc
def _allocate_resources(
View File
@@ -0,0 +1,121 @@
"""Gitea deploy-key provisioner (PRD 0048, contrib).
Generates ed25519 keypairs via `ssh-keygen` and registers / deletes
them using the Gitea deploy-key HTTP API. No new Python dependencies —
only stdlib `urllib.request` and `subprocess`."""
from __future__ import annotations
import json
import subprocess
import tempfile
import urllib.error
import urllib.request
from pathlib import Path
from ...deploy_key_provisioner import DeployKeyProvisioner
class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
"""Manages deploy keys on a Gitea instance."""
def __init__(self, *, token: str, api_url: str) -> None:
self._token = token
self._api_url = api_url.rstrip("/")
def create(self, owner_repo: str, title: str) -> tuple[str, bytes]:
"""Generate an ed25519 keypair, register the public half as a
repo deploy key, and return `(key_id, private_key_bytes)`.
The key is registered with `read_only=False` because git-gate
needs push access to forward gitleaks-scanned refs upstream."""
with tempfile.TemporaryDirectory() as tmpdir:
key_path = Path(tmpdir) / "key"
subprocess.run(
[
"ssh-keygen", "-t", "ed25519",
"-f", str(key_path),
"-N", "",
],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
private_key = key_path.read_bytes()
public_key = key_path.with_suffix(".pub").read_text().strip()
owner, repo = _split_owner_repo(owner_repo)
url = f"{self._api_url}/api/v1/repos/{owner}/{repo}/keys"
payload = json.dumps({
"key": public_key,
"read_only": False,
"title": title,
}).encode()
req = urllib.request.Request(
url,
data=payload,
headers={
"Authorization": f"token {self._token}",
"Content-Type": "application/json",
},
method="POST",
)
try:
with urllib.request.urlopen(req) as resp:
body = json.loads(resp.read())
except urllib.error.HTTPError as exc:
_body = _read_error_body(exc)
raise RuntimeError(
f"failed to create deploy key for {owner_repo}: "
f"HTTP {exc.code}{_body}"
) from exc
except urllib.error.URLError as exc:
raise RuntimeError(
f"failed to create deploy key for {owner_repo}: {exc.reason}"
) from exc
return str(body["id"]), private_key
def delete(self, owner_repo: str, key_id: str) -> None:
"""Delete the deploy key. HTTP 404 (already gone) is success.
All other errors raise RuntimeError so teardown halts."""
owner, repo = _split_owner_repo(owner_repo)
url = f"{self._api_url}/api/v1/repos/{owner}/{repo}/keys/{key_id}"
req = urllib.request.Request(
url,
headers={"Authorization": f"token {self._token}"},
method="DELETE",
)
try:
with urllib.request.urlopen(req):
pass
except urllib.error.HTTPError as exc:
if exc.code == 404:
return
_body = _read_error_body(exc)
raise RuntimeError(
f"failed to delete deploy key {key_id} for {owner_repo}: "
f"HTTP {exc.code}{_body}"
) from exc
except urllib.error.URLError as exc:
raise RuntimeError(
f"failed to delete deploy key {key_id} for {owner_repo}: "
f"{exc.reason}"
) from exc
def _split_owner_repo(owner_repo: str) -> tuple[str, str]:
"""Split `'owner/repo'` into `('owner', 'repo')`."""
parts = owner_repo.split("/", 1)
if len(parts) != 2 or not all(parts):
raise ValueError(
f"expected 'owner/repo' format, got {owner_repo!r}"
)
return parts[0], parts[1]
def _read_error_body(exc: urllib.error.HTTPError) -> str:
try:
return exc.read().decode("utf-8", errors="replace")
except Exception:
return ""
+52
View File
@@ -0,0 +1,52 @@
"""Deploy-key provisioner interface and factory (PRD 0048).
The core defines the abstract contract; concrete implementations live
in `bot_bottle/contrib/<provider>/deploy_key_provisioner.py`. The
factory `get_provisioner` imports contrib modules lazily so that a
missing optional dependency in one provider doesn't break unrelated
features."""
from __future__ import annotations
from abc import ABC, abstractmethod
class DeployKeyProvisioner(ABC):
"""Manages a single deploy-key lifecycle on a remote forge."""
@abstractmethod
def create(self, owner_repo: str, title: str) -> tuple[str, bytes]:
"""Generate a keypair and register the public half as a
deploy key on the forge.
`owner_repo` is the `<owner>/<repo>` path (no `.git` suffix).
`title` is the human-readable label shown in the forge UI.
Returns `(key_id, private_key_bytes)` where `key_id` is opaque
to the caller and is only ever passed back to `delete`."""
@abstractmethod
def delete(self, owner_repo: str, key_id: str) -> None:
"""Delete the registered deploy key.
Must not raise if the key is already absent (HTTP 404 is
success). Must raise for all other failures so teardown halts."""
def get_provisioner(
provider: str, token: str, api_url: str
) -> DeployKeyProvisioner:
"""Instantiate the contrib provisioner for `provider`.
Raises `ManifestError` for unknown providers so the error surfaces
at parse time rather than at runtime."""
if provider == "gitea":
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
)
return GiteaDeployKeyProvisioner(token=token, api_url=api_url)
from .manifest_util import ManifestError
raise ManifestError(
f"unknown provisioned_key provider: {provider!r}; "
f"available: gitea"
)
+89 -1
View File
@@ -29,11 +29,14 @@ backend-specific and lives on concrete subclasses (see
from __future__ import annotations
import dataclasses
import os
import shlex
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from .log import info
from .manifest import Bottle, GitEntry
@@ -357,6 +360,80 @@ exit 0
"""
def _provision_dynamic_key(
entry: GitEntry,
slug: str,
stage_dir: Path,
) -> str:
"""Generate a fresh ed25519 keypair, register the public half with
the forge, and persist the private key + key ID under `stage_dir`.
Returns the host-side path to the private key file so the caller
can inject it into the GitGateUpstream as `identity_file`."""
from .deploy_key_provisioner import get_provisioner
pk = entry.ProvisionedKey
assert pk is not None
token = os.environ.get(pk.token_env)
if token is None:
raise RuntimeError(
f"git-gate.repos[{entry.Name!r}] provisioned_key.token_env"
f" = {pk.token_env!r}: env var is not set"
)
api_url = pk.api_url or f"https://{entry.UpstreamHost}"
provisioner = get_provisioner(pk.provider, token, api_url)
owner_repo = entry.UpstreamPath
if owner_repo.endswith(".git"):
owner_repo = owner_repo[:-4]
title = f"bot-bottle:{slug}:{entry.Name}"
info(f"provisioning deploy key for git-gate.repos[{entry.Name!r}]")
key_id, private_key_bytes = provisioner.create(owner_repo, title)
key_file = stage_dir / f"{entry.Name}-key"
key_file.write_bytes(private_key_bytes)
key_file.chmod(0o600)
id_file = stage_dir / f"{entry.Name}-deploy-key-id"
id_file.write_text(key_id)
id_file.chmod(0o600)
info(f"provisioned deploy key {key_id} for git-gate.repos[{entry.Name!r}]")
return str(key_file)
def revoke_git_gate_provisioned_keys(bottle: Bottle, stage_dir: Path) -> None:
"""Revoke all deploy keys provisioned for `bottle` during prepare.
Called at teardown after containers stop. Raises if any revocation
fails — a stranded key is a security concern that the operator must
address manually."""
from .deploy_key_provisioner import get_provisioner
for entry in bottle.git:
if entry.ProvisionedKey is None:
continue
pk = entry.ProvisionedKey
id_file = stage_dir / f"{entry.Name}-deploy-key-id"
if not id_file.exists():
continue
key_id = id_file.read_text().strip()
token = os.environ.get(pk.token_env)
if token is None:
raise RuntimeError(
f"git-gate.repos[{entry.Name!r}] provisioned_key.token_env"
f" = {pk.token_env!r}: env var is not set;"
f" cannot revoke deploy key {key_id}"
)
api_url = pk.api_url or f"https://{entry.UpstreamHost}"
provisioner = get_provisioner(pk.provider, token, api_url)
owner_repo = entry.UpstreamPath
if owner_repo.endswith(".git"):
owner_repo = owner_repo[:-4]
info(f"revoking deploy key {key_id} for git-gate.repos[{entry.Name!r}]")
provisioner.delete(owner_repo, key_id)
info(f"revoked deploy key {key_id} for git-gate.repos[{entry.Name!r}]")
class GitGate(ABC):
"""The per-agent git-gate. Encapsulates the host-side prepare
(upstream lift + entrypoint/hook render); the sidecar's
@@ -368,10 +445,21 @@ class GitGate(ABC):
entrypoint, pre-receive hook, and access-hook scripts (mode
600) under `stage_dir`. Pure host-side, no docker subprocess.
For `provisioned_key` entries, also generates and registers
a fresh deploy key via the forge API and writes the private key
+ key ID to `stage_dir`.
Returned plan is incomplete: the launch step must fill
`internal_network` / `egress_network` via `dataclasses.replace`
before passing the plan to `.start`."""
upstreams = git_gate_upstreams_for_bottle(bottle)
upstreams_list = list(git_gate_upstreams_for_bottle(bottle))
for i, entry in enumerate(bottle.git):
if entry.ProvisionedKey is not None:
key_file = _provision_dynamic_key(entry, slug, stage_dir)
upstreams_list[i] = dataclasses.replace(
upstreams_list[i], identity_file=key_file
)
upstreams = tuple(upstreams_list)
entrypoint = stage_dir / "git_gate_entrypoint.sh"
entrypoint.write_text(git_gate_render_entrypoint(upstreams))
entrypoint.chmod(0o600)
+90 -11
View File
@@ -4,6 +4,7 @@ from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Optional
from .manifest_util import ManifestError, as_json_object
@@ -61,6 +62,24 @@ def validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> No
seen[g.Name] = None
@dataclass(frozen=True)
class ProvisionedKeyConfig:
"""Configuration for automatic deploy-key lifecycle management
(PRD 0048). Used when a git-gate.repos entry opts out of a
static identity file and instead wants a fresh SSH keypair
generated at spin-up and revoked at teardown.
`provider` names the contrib sub-package to load (e.g. `gitea`).
`token_env` is the name of a host-side env var carrying the API
token; the value is read at provision time, never stored on the
plan. `api_url` is the forge's HTTP API root; if empty, it is
derived from the upstream URL's host at provision time."""
provider: str
token_env: str
api_url: str = ""
@dataclass(frozen=True)
class GitEntry:
"""One upstream the per-agent git-gate (PRD 0008) is allowed to
@@ -74,14 +93,15 @@ class GitEntry:
stashed in the `Upstream*` fields so the git-gate render step
doesn't have to re-parse.
Manifest source: `git-gate.repos.<Name>` (PRD 0047). The YAML keys
are `url`, `identity`, and `host_key`; the internal field names are
stable across that rename."""
Manifest source: `git-gate.repos.<Name>` (PRD 0047/0048). Exactly
one of `identity` (static key path) or `provisioned_key` (automatic
lifecycle) must be present. The internal field names are stable."""
Name: str
Upstream: str
IdentityFile: str
IdentityFile: str = ""
KnownHostKey: str = ""
ProvisionedKey: Optional[ProvisionedKeyConfig] = None
RemoteKey: str = ""
UpstreamUser: str = ""
UpstreamHost: str = ""
@@ -94,8 +114,9 @@ class GitEntry:
) -> "GitEntry":
"""Parse one entry from `git-gate.repos.<repo_name>`.
YAML keys: `url` (required), `identity` (required),
`host_key` (optional). The repo_name becomes `Name`."""
YAML keys: `url` (required), exactly one of `identity` or
`provisioned_key` (required), `host_key` (optional).
The repo_name becomes `Name`."""
if not repo_name:
raise ManifestError(
f"bottle '{bottle_name}' git-gate.repos has an empty key"
@@ -108,21 +129,44 @@ class GitEntry:
label = f"git-gate.repos[{repo_name!r}]"
d = as_json_object(raw, f"bottle '{bottle_name}' {label}")
for k in d:
if k not in {"url", "identity", "host_key"}:
if k not in {"url", "identity", "provisioned_key", "host_key"}:
raise ManifestError(
f"bottle '{bottle_name}' {label} has unknown key {k!r}; "
f"allowed: url, identity, host_key"
f"allowed: url, identity, provisioned_key, host_key"
)
upstream = d.get("url")
if not isinstance(upstream, str) or not upstream:
raise ManifestError(
f"bottle '{bottle_name}' {label} missing required string field 'url'"
)
ident = d.get("identity")
if not isinstance(ident, str) or not ident:
has_identity = "identity" in d
has_provisioned = "provisioned_key" in d
if has_identity and has_provisioned:
raise ManifestError(
f"bottle '{bottle_name}' {label} missing required string field 'identity'"
f"bottle '{bottle_name}' {label} must set exactly one of "
f"'identity' or 'provisioned_key'; got both."
)
if not has_identity and not has_provisioned:
raise ManifestError(
f"bottle '{bottle_name}' {label} must set exactly one of "
f"'identity' or 'provisioned_key'; got neither."
)
ident = ""
provisioned_key: Optional[ProvisionedKeyConfig] = None
if has_identity:
raw_ident = d.get("identity")
if not isinstance(raw_ident, str) or not raw_ident:
raise ManifestError(
f"bottle '{bottle_name}' {label} 'identity' must be a non-empty string"
)
ident = raw_ident
else:
provisioned_key = _parse_provisioned_key_config(
bottle_name, label, d["provisioned_key"]
)
khk = _opt_str(
d.get("host_key"),
f"bottle '{bottle_name}' {label} host_key",
@@ -135,6 +179,7 @@ class GitEntry:
Upstream=upstream,
IdentityFile=ident,
KnownHostKey=khk,
ProvisionedKey=provisioned_key,
RemoteKey=host,
UpstreamUser=user,
UpstreamHost=host,
@@ -143,6 +188,40 @@ class GitEntry:
)
def _parse_provisioned_key_config(
bottle_name: str, label: str, raw: object
) -> ProvisionedKeyConfig:
d = as_json_object(raw, f"bottle '{bottle_name}' {label}.provisioned_key")
for k in d:
if k not in {"provider", "token_env", "api_url"}:
raise ManifestError(
f"bottle '{bottle_name}' {label}.provisioned_key has unknown key {k!r}; "
f"allowed: provider, token_env, api_url"
)
provider = d.get("provider")
if not isinstance(provider, str) or not provider:
raise ManifestError(
f"bottle '{bottle_name}' {label}.provisioned_key missing required "
f"string field 'provider'"
)
token_env = d.get("token_env")
if not isinstance(token_env, str) or not token_env:
raise ManifestError(
f"bottle '{bottle_name}' {label}.provisioned_key missing required "
f"string field 'token_env'"
)
api_url_raw = d.get("api_url", "")
if not isinstance(api_url_raw, str):
raise ManifestError(
f"bottle '{bottle_name}' {label}.provisioned_key 'api_url' must be a string"
)
return ProvisionedKeyConfig(
provider=provider,
token_env=token_env,
api_url=api_url_raw,
)
@dataclass(frozen=True)
class GitUser:
"""Per-bottle `git config --global user.name` / `user.email`
+166
View File
@@ -0,0 +1,166 @@
"""Unit: GiteaDeployKeyProvisioner (PRD 0048, contrib/gitea)."""
from __future__ import annotations
import json
import unittest
import urllib.error
from io import BytesIO
from pathlib import Path
from tempfile import mkdtemp
from unittest.mock import MagicMock, call, patch
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
_split_owner_repo,
)
def _provisioner() -> GiteaDeployKeyProvisioner:
return GiteaDeployKeyProvisioner(
token="test-token", api_url="https://gitea.example.com"
)
def _urlopen_response(body: dict, status: int = 200) -> MagicMock:
resp = MagicMock()
resp.read.return_value = json.dumps(body).encode()
resp.status = status
resp.__enter__ = lambda s: s
resp.__exit__ = MagicMock(return_value=False)
return resp
def _http_error(code: int, body: str = "") -> urllib.error.HTTPError:
return urllib.error.HTTPError(
url="http://x",
code=code,
msg="err",
hdrs=None, # type: ignore[arg-type]
fp=BytesIO(body.encode()),
)
class TestCreate(unittest.TestCase):
def test_create_calls_ssh_keygen_and_posts_to_api(self):
provisioner = _provisioner()
fake_key_id = 42
fake_private = b"PRIVATE_KEY"
fake_public = "ssh-ed25519 AAAA fake"
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run"
) as mock_run, patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen"
) as mock_urlopen, patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes",
return_value=fake_private,
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text",
return_value=fake_public + "\n",
):
mock_urlopen.return_value = _urlopen_response({"id": fake_key_id})
key_id, private_bytes = provisioner.create(
"didericis/bot-bottle", "bot-bottle:slug:repo"
)
# ssh-keygen called with ed25519
mock_run.assert_called_once()
run_args = mock_run.call_args.args[0]
self.assertIn("ssh-keygen", run_args)
self.assertIn("-t", run_args)
self.assertIn("ed25519", run_args)
# POST body contains public key
post_call = mock_urlopen.call_args.args[0]
payload = json.loads(post_call.data)
self.assertEqual(fake_public, payload["key"])
self.assertFalse(payload["read_only"])
# Correct URL
self.assertIn(
"/api/v1/repos/didericis/bot-bottle/keys", post_call.full_url
)
self.assertEqual(str(fake_key_id), key_id)
self.assertEqual(fake_private, private_bytes)
def test_create_raises_on_http_error(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run"
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=_http_error(403, "forbidden"),
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes",
return_value=b"pk",
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text",
return_value="ssh-ed25519 AAAA\n",
):
with self.assertRaises(RuntimeError) as ctx:
provisioner.create("owner/repo", "title")
self.assertIn("403", str(ctx.exception))
class TestDelete(unittest.TestCase):
def test_delete_calls_correct_endpoint(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen"
) as mock_urlopen:
mock_urlopen.return_value = _urlopen_response({})
provisioner.delete("didericis/bot-bottle", "99")
req = mock_urlopen.call_args.args[0]
self.assertIn("/api/v1/repos/didericis/bot-bottle/keys/99", req.full_url)
self.assertEqual("DELETE", req.get_method())
def test_delete_tolerates_404(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=_http_error(404),
):
provisioner.delete("owner/repo", "123") # must not raise
def test_delete_raises_on_non_404_http_error(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=_http_error(500, "internal server error"),
):
with self.assertRaises(RuntimeError) as ctx:
provisioner.delete("owner/repo", "7")
self.assertIn("500", str(ctx.exception))
def test_delete_raises_on_url_error(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=urllib.error.URLError("connection refused"),
):
with self.assertRaises(RuntimeError) as ctx:
provisioner.delete("owner/repo", "7")
self.assertIn("connection refused", str(ctx.exception))
class TestSplitOwnerRepo(unittest.TestCase):
def test_simple(self):
self.assertEqual(("owner", "repo"), _split_owner_repo("owner/repo"))
def test_raises_on_missing_slash(self):
with self.assertRaises(ValueError):
_split_owner_repo("noslash")
def test_raises_on_empty_owner(self):
with self.assertRaises(ValueError):
_split_owner_repo("/repo")
def test_raises_on_empty_repo(self):
with self.assertRaises(ValueError):
_split_owner_repo("owner/")
if __name__ == "__main__":
unittest.main()
+29
View File
@@ -0,0 +1,29 @@
"""Unit: deploy_key_provisioner factory (PRD 0048)."""
from __future__ import annotations
import unittest
from unittest.mock import patch
from bot_bottle.deploy_key_provisioner import DeployKeyProvisioner, get_provisioner
from bot_bottle.manifest import ManifestError
class TestGetProvisioner(unittest.TestCase):
def test_gitea_returns_gitea_provisioner(self):
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
)
p = get_provisioner("gitea", token="tok", api_url="https://gitea.example.com")
self.assertIsInstance(p, GiteaDeployKeyProvisioner)
self.assertIsInstance(p, DeployKeyProvisioner)
def test_unknown_provider_raises_manifest_error(self):
with self.assertRaises(ManifestError) as ctx:
get_provisioner("github", token="tok", api_url="https://github.com")
self.assertIn("github", str(ctx.exception))
self.assertIn("provisioned_key provider", str(ctx.exception))
if __name__ == "__main__":
unittest.main()
+107
View File
@@ -243,6 +243,113 @@ class TestGitEntryCrossValidation(unittest.TestCase):
self.assertIn("PRD 0047", msg)
class TestProvisionedKey(unittest.TestCase):
"""git-gate.repos entries that use provisioned_key (PRD 0048)."""
def test_provisioned_key_minimal(self):
m = Manifest.from_json_obj(_manifest({
"bot-bottle": {
"url": "ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git",
"provisioned_key": {
"provider": "gitea",
"token_env": "GITEA_TOKEN",
},
},
}))
e = m.bottles["dev"].git[0]
self.assertEqual("bot-bottle", e.Name)
self.assertIsNotNone(e.ProvisionedKey)
assert e.ProvisionedKey is not None
self.assertEqual("gitea", e.ProvisionedKey.provider)
self.assertEqual("GITEA_TOKEN", e.ProvisionedKey.token_env)
self.assertEqual("", e.ProvisionedKey.api_url)
self.assertEqual("", e.IdentityFile)
def test_provisioned_key_with_api_url(self):
m = Manifest.from_json_obj(_manifest({
"repo": {
"url": "ssh://git@gitea.example.com/org/repo.git",
"provisioned_key": {
"provider": "gitea",
"token_env": "MY_TOKEN",
"api_url": "https://gitea.example.com",
},
},
}))
pk = m.bottles["dev"].git[0].ProvisionedKey
assert pk is not None
self.assertEqual("https://gitea.example.com", pk.api_url)
def test_both_identity_and_provisioned_key_dies(self):
with self.assertRaises(ManifestError) as ctx:
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
"provisioned_key": {"provider": "gitea", "token_env": "T"},
},
}))
self.assertIn("exactly one of", str(ctx.exception))
self.assertIn("got both", str(ctx.exception))
def test_neither_identity_nor_provisioned_key_dies(self):
with self.assertRaises(ManifestError) as ctx:
Manifest.from_json_obj(_manifest({
"foo": {"url": "ssh://git@github.com/foo.git"},
}))
self.assertIn("exactly one of", str(ctx.exception))
self.assertIn("got neither", str(ctx.exception))
def test_unknown_key_in_provisioned_key_block_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"provisioned_key": {
"provider": "gitea",
"token_env": "T",
"key_type": "rsa", # not allowed
},
},
}))
def test_missing_provider_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"provisioned_key": {"token_env": "T"},
},
}))
def test_missing_token_env_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"provisioned_key": {"provider": "gitea"},
},
}))
def test_provisioned_key_entry_has_no_identity_file(self):
m = Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/didericis/foo.git",
"provisioned_key": {"provider": "gitea", "token_env": "T"},
},
}))
self.assertEqual("", m.bottles["dev"].git[0].IdentityFile)
def test_identity_entry_has_no_provisioned_key(self):
m = Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
},
}))
self.assertIsNone(m.bottles["dev"].git[0].ProvisionedKey)
class TestEmptyGitGateField(unittest.TestCase):
def test_no_git_gate_field_yields_empty_tuple(self):
m = Manifest.from_json_obj({