PRD 0048: SSH deploy-key provisioning #170

Merged
didericis merged 4 commits from prd/0048-ssh-deploy-key-provisioning into main 2026-06-03 11:58:39 -04:00
12 changed files with 982 additions and 12 deletions
+8
View File
@@ -43,6 +43,7 @@ from pathlib import Path
from typing import Callable, Generator
from ...egress import egress_resolve_token_values
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import info, warn
from . import network as network_mod
from . import util as docker_mod
@@ -51,6 +52,7 @@ from .bottle_plan import DockerBottlePlan
from .bottle_state import (
bottle_state_dir,
egress_state_dir,
git_gate_state_dir,
pipelock_state_dir,
)
from .compose import (
@@ -84,6 +86,9 @@ def launch(
Teardown on exit."""
stack = ExitStack()
_bottle_for_revoke = plan.spec.manifest.bottle_for(plan.spec.agent_name)
_git_gate_dir_for_revoke = git_gate_state_dir(plan.slug)
def teardown() -> None:
try:
stack.close()
@@ -92,6 +97,9 @@ def launch(
f"teardown failed for container {plan.container_name}"
f" (compose-down): {exc!r}"
)
revoke_git_gate_provisioned_keys(
_bottle_for_revoke, _git_gate_dir_for_revoke
)
try:
# Step 1: agent image build. Sidecar images get built lazily by
+24
View File
@@ -53,6 +53,9 @@ from ..docker.pipelock import (
PIPELOCK_PORT as _PIPELOCK_PORT_STR,
pipelock_tls_init,
)
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import warn
from ..docker.bottle_state import git_gate_state_dir
from . import loopback_alias as _loopback
from . import sidecar_bundle as _bundle
from . import smolvm as _smolvm
@@ -120,7 +123,28 @@ def launch(
agent_prompt_mode=plan.agent_prompt_mode,
)
finally:
_teardown_smolmachines(stack, plan)
def _teardown_smolmachines(
stack: ExitStack,
plan: SmolmachinesBottlePlan,
) -> None:
"""Unwind the ExitStack, then revoke any provisioned deploy keys.
ExitStack errors are caught and logged (non-fatal) so that key
revocation always runs. Revocation errors propagate — a stranded
deploy key is a security concern the operator must address."""
teardown_exc: BaseException | None = None
try:
stack.close()
except BaseException as exc:
teardown_exc = exc
warn(f"smolmachines teardown failed: {exc!r}")
bottle = plan.spec.manifest.bottle_for(plan.spec.agent_name)
revoke_git_gate_provisioned_keys(bottle, git_gate_state_dir(plan.slug))
if teardown_exc is not None:
raise teardown_exc
def _allocate_resources(
View File
@@ -0,0 +1,121 @@
"""Gitea deploy-key provisioner (PRD 0048, contrib).
Generates ed25519 keypairs via `ssh-keygen` and registers / deletes
them using the Gitea deploy-key HTTP API. No new Python dependencies —
only stdlib `urllib.request` and `subprocess`."""
from __future__ import annotations
import json
import subprocess
import tempfile
import urllib.error
import urllib.request
from pathlib import Path
from ...deploy_key_provisioner import DeployKeyProvisioner
class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
"""Manages deploy keys on a Gitea instance."""
def __init__(self, *, token: str, api_url: str) -> None:
self._token = token
self._api_url = api_url.rstrip("/")
def create(self, owner_repo: str, title: str) -> tuple[str, bytes]:
"""Generate an ed25519 keypair, register the public half as a
repo deploy key, and return `(key_id, private_key_bytes)`.
The key is registered with `read_only=False` because git-gate
needs push access to forward gitleaks-scanned refs upstream."""
with tempfile.TemporaryDirectory() as tmpdir:
key_path = Path(tmpdir) / "key"
subprocess.run(
[
"ssh-keygen", "-t", "ed25519",
"-f", str(key_path),
"-N", "",
],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
private_key = key_path.read_bytes()
public_key = key_path.with_suffix(".pub").read_text().strip()
owner, repo = _split_owner_repo(owner_repo)
url = f"{self._api_url}/api/v1/repos/{owner}/{repo}/keys"
payload = json.dumps({
"key": public_key,
"read_only": False,
"title": title,
}).encode()
req = urllib.request.Request(
url,
data=payload,
headers={
"Authorization": f"token {self._token}",
"Content-Type": "application/json",
},
method="POST",
)
try:
with urllib.request.urlopen(req) as resp:
body = json.loads(resp.read())
except urllib.error.HTTPError as exc:
_body = _read_error_body(exc)
raise RuntimeError(
f"failed to create deploy key for {owner_repo}: "
f"HTTP {exc.code}{_body}"
) from exc
except urllib.error.URLError as exc:
raise RuntimeError(
f"failed to create deploy key for {owner_repo}: {exc.reason}"
) from exc
return str(body["id"]), private_key
def delete(self, owner_repo: str, key_id: str) -> None:
"""Delete the deploy key. HTTP 404 (already gone) is success.
All other errors raise RuntimeError so teardown halts."""
owner, repo = _split_owner_repo(owner_repo)
url = f"{self._api_url}/api/v1/repos/{owner}/{repo}/keys/{key_id}"
req = urllib.request.Request(
url,
headers={"Authorization": f"token {self._token}"},
method="DELETE",
)
try:
with urllib.request.urlopen(req):
pass
except urllib.error.HTTPError as exc:
if exc.code == 404:
return
_body = _read_error_body(exc)
raise RuntimeError(
f"failed to delete deploy key {key_id} for {owner_repo}: "
f"HTTP {exc.code}{_body}"
) from exc
except urllib.error.URLError as exc:
raise RuntimeError(
f"failed to delete deploy key {key_id} for {owner_repo}: "
f"{exc.reason}"
) from exc
def _split_owner_repo(owner_repo: str) -> tuple[str, str]:
"""Split `'owner/repo'` into `('owner', 'repo')`."""
parts = owner_repo.split("/", 1)
if len(parts) != 2 or not all(parts):
raise ValueError(
f"expected 'owner/repo' format, got {owner_repo!r}"
)
return parts[0], parts[1]
def _read_error_body(exc: urllib.error.HTTPError) -> str:
try:
return exc.read().decode("utf-8", errors="replace")
except Exception:
return ""
+52
View File
@@ -0,0 +1,52 @@
"""Deploy-key provisioner interface and factory (PRD 0048).
The core defines the abstract contract; concrete implementations live
in `bot_bottle/contrib/<provider>/deploy_key_provisioner.py`. The
factory `get_provisioner` imports contrib modules lazily so that a
missing optional dependency in one provider doesn't break unrelated
features."""
from __future__ import annotations
from abc import ABC, abstractmethod
class DeployKeyProvisioner(ABC):
"""Manages a single deploy-key lifecycle on a remote forge."""
@abstractmethod
def create(self, owner_repo: str, title: str) -> tuple[str, bytes]:
"""Generate a keypair and register the public half as a
deploy key on the forge.
`owner_repo` is the `<owner>/<repo>` path (no `.git` suffix).
`title` is the human-readable label shown in the forge UI.
Returns `(key_id, private_key_bytes)` where `key_id` is opaque
to the caller and is only ever passed back to `delete`."""
@abstractmethod
def delete(self, owner_repo: str, key_id: str) -> None:
"""Delete the registered deploy key.
Must not raise if the key is already absent (HTTP 404 is
success). Must raise for all other failures so teardown halts."""
def get_provisioner(
provider: str, token: str, api_url: str
) -> DeployKeyProvisioner:
"""Instantiate the contrib provisioner for `provider`.
Raises `ManifestError` for unknown providers so the error surfaces
at parse time rather than at runtime."""
if provider == "gitea":
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
)
return GiteaDeployKeyProvisioner(token=token, api_url=api_url)
from .manifest_util import ManifestError
raise ManifestError(
f"unknown provisioned_key provider: {provider!r}; "
f"available: gitea"
)
+89 -1
View File
@@ -29,11 +29,14 @@ backend-specific and lives on concrete subclasses (see
from __future__ import annotations
import dataclasses
import os
import shlex
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from .log import info
from .manifest import Bottle, GitEntry
@@ -357,6 +360,80 @@ exit 0
"""
def _provision_dynamic_key(
entry: GitEntry,
slug: str,
stage_dir: Path,
) -> str:
"""Generate a fresh ed25519 keypair, register the public half with
the forge, and persist the private key + key ID under `stage_dir`.
Returns the host-side path to the private key file so the caller
can inject it into the GitGateUpstream as `identity_file`."""
from .deploy_key_provisioner import get_provisioner
pk = entry.ProvisionedKey
assert pk is not None
token = os.environ.get(pk.token_env)
if token is None:
raise RuntimeError(
f"git-gate.repos[{entry.Name!r}] provisioned_key.token_env"
f" = {pk.token_env!r}: env var is not set"
)
api_url = pk.api_url or f"https://{entry.UpstreamHost}"
provisioner = get_provisioner(pk.provider, token, api_url)
owner_repo = entry.UpstreamPath
if owner_repo.endswith(".git"):
owner_repo = owner_repo[:-4]
title = f"bot-bottle:{slug}:{entry.Name}"
info(f"provisioning deploy key for git-gate.repos[{entry.Name!r}]")
key_id, private_key_bytes = provisioner.create(owner_repo, title)
key_file = stage_dir / f"{entry.Name}-key"
key_file.write_bytes(private_key_bytes)
key_file.chmod(0o600)
id_file = stage_dir / f"{entry.Name}-deploy-key-id"
id_file.write_text(key_id)
id_file.chmod(0o600)
info(f"provisioned deploy key {key_id} for git-gate.repos[{entry.Name!r}]")
return str(key_file)
def revoke_git_gate_provisioned_keys(bottle: Bottle, stage_dir: Path) -> None:
"""Revoke all deploy keys provisioned for `bottle` during prepare.
Called at teardown after containers stop. Raises if any revocation
fails — a stranded key is a security concern that the operator must
address manually."""
from .deploy_key_provisioner import get_provisioner
for entry in bottle.git:
if entry.ProvisionedKey is None:
continue
pk = entry.ProvisionedKey
id_file = stage_dir / f"{entry.Name}-deploy-key-id"
if not id_file.exists():
continue
key_id = id_file.read_text().strip()
token = os.environ.get(pk.token_env)
if token is None:
raise RuntimeError(
f"git-gate.repos[{entry.Name!r}] provisioned_key.token_env"
f" = {pk.token_env!r}: env var is not set;"
f" cannot revoke deploy key {key_id}"
)
api_url = pk.api_url or f"https://{entry.UpstreamHost}"
provisioner = get_provisioner(pk.provider, token, api_url)
owner_repo = entry.UpstreamPath
if owner_repo.endswith(".git"):
owner_repo = owner_repo[:-4]
info(f"revoking deploy key {key_id} for git-gate.repos[{entry.Name!r}]")
provisioner.delete(owner_repo, key_id)
info(f"revoked deploy key {key_id} for git-gate.repos[{entry.Name!r}]")
class GitGate(ABC):
"""The per-agent git-gate. Encapsulates the host-side prepare
(upstream lift + entrypoint/hook render); the sidecar's
@@ -368,10 +445,21 @@ class GitGate(ABC):
entrypoint, pre-receive hook, and access-hook scripts (mode
600) under `stage_dir`. Pure host-side, no docker subprocess.
For `provisioned_key` entries, also generates and registers
a fresh deploy key via the forge API and writes the private key
+ key ID to `stage_dir`.
Returned plan is incomplete: the launch step must fill
`internal_network` / `egress_network` via `dataclasses.replace`
before passing the plan to `.start`."""
upstreams = git_gate_upstreams_for_bottle(bottle)
upstreams_list = list(git_gate_upstreams_for_bottle(bottle))
for i, entry in enumerate(bottle.git):
if entry.ProvisionedKey is not None:
key_file = _provision_dynamic_key(entry, slug, stage_dir)
upstreams_list[i] = dataclasses.replace(
upstreams_list[i], identity_file=key_file
)
upstreams = tuple(upstreams_list)
entrypoint = stage_dir / "git_gate_entrypoint.sh"
entrypoint.write_text(git_gate_render_entrypoint(upstreams))
entrypoint.chmod(0o600)
+90 -11
View File
@@ -4,6 +4,7 @@ from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Optional
from .manifest_util import ManifestError, as_json_object
@@ -61,6 +62,24 @@ def validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> No
seen[g.Name] = None
@dataclass(frozen=True)
class ProvisionedKeyConfig:
"""Configuration for automatic deploy-key lifecycle management
(PRD 0048). Used when a git-gate.repos entry opts out of a
static identity file and instead wants a fresh SSH keypair
generated at spin-up and revoked at teardown.
`provider` names the contrib sub-package to load (e.g. `gitea`).
`token_env` is the name of a host-side env var carrying the API
token; the value is read at provision time, never stored on the
plan. `api_url` is the forge's HTTP API root; if empty, it is
derived from the upstream URL's host at provision time."""
provider: str
token_env: str
api_url: str = ""
@dataclass(frozen=True)
class GitEntry:
"""One upstream the per-agent git-gate (PRD 0008) is allowed to
@@ -74,14 +93,15 @@ class GitEntry:
stashed in the `Upstream*` fields so the git-gate render step
doesn't have to re-parse.
Manifest source: `git-gate.repos.<Name>` (PRD 0047). The YAML keys
are `url`, `identity`, and `host_key`; the internal field names are
stable across that rename."""
Manifest source: `git-gate.repos.<Name>` (PRD 0047/0048). Exactly
one of `identity` (static key path) or `provisioned_key` (automatic
lifecycle) must be present. The internal field names are stable."""
Name: str
Upstream: str
IdentityFile: str
IdentityFile: str = ""
KnownHostKey: str = ""
ProvisionedKey: Optional[ProvisionedKeyConfig] = None
RemoteKey: str = ""
UpstreamUser: str = ""
UpstreamHost: str = ""
@@ -94,8 +114,9 @@ class GitEntry:
) -> "GitEntry":
"""Parse one entry from `git-gate.repos.<repo_name>`.
YAML keys: `url` (required), `identity` (required),
`host_key` (optional). The repo_name becomes `Name`."""
YAML keys: `url` (required), exactly one of `identity` or
`provisioned_key` (required), `host_key` (optional).
The repo_name becomes `Name`."""
if not repo_name:
raise ManifestError(
f"bottle '{bottle_name}' git-gate.repos has an empty key"
@@ -108,21 +129,44 @@ class GitEntry:
label = f"git-gate.repos[{repo_name!r}]"
d = as_json_object(raw, f"bottle '{bottle_name}' {label}")
for k in d:
if k not in {"url", "identity", "host_key"}:
if k not in {"url", "identity", "provisioned_key", "host_key"}:
raise ManifestError(
f"bottle '{bottle_name}' {label} has unknown key {k!r}; "
f"allowed: url, identity, host_key"
f"allowed: url, identity, provisioned_key, host_key"
)
upstream = d.get("url")
if not isinstance(upstream, str) or not upstream:
raise ManifestError(
f"bottle '{bottle_name}' {label} missing required string field 'url'"
)
ident = d.get("identity")
if not isinstance(ident, str) or not ident:
has_identity = "identity" in d
has_provisioned = "provisioned_key" in d
if has_identity and has_provisioned:
raise ManifestError(
f"bottle '{bottle_name}' {label} missing required string field 'identity'"
f"bottle '{bottle_name}' {label} must set exactly one of "
f"'identity' or 'provisioned_key'; got both."
)
if not has_identity and not has_provisioned:
raise ManifestError(
f"bottle '{bottle_name}' {label} must set exactly one of "
f"'identity' or 'provisioned_key'; got neither."
)
ident = ""
provisioned_key: Optional[ProvisionedKeyConfig] = None
if has_identity:
raw_ident = d.get("identity")
if not isinstance(raw_ident, str) or not raw_ident:
raise ManifestError(
f"bottle '{bottle_name}' {label} 'identity' must be a non-empty string"
)
ident = raw_ident
else:
provisioned_key = _parse_provisioned_key_config(
bottle_name, label, d["provisioned_key"]
)
khk = _opt_str(
d.get("host_key"),
f"bottle '{bottle_name}' {label} host_key",
@@ -135,6 +179,7 @@ class GitEntry:
Upstream=upstream,
IdentityFile=ident,
KnownHostKey=khk,
ProvisionedKey=provisioned_key,
RemoteKey=host,
UpstreamUser=user,
UpstreamHost=host,
@@ -143,6 +188,40 @@ class GitEntry:
)
def _parse_provisioned_key_config(
bottle_name: str, label: str, raw: object
) -> ProvisionedKeyConfig:
d = as_json_object(raw, f"bottle '{bottle_name}' {label}.provisioned_key")
for k in d:
if k not in {"provider", "token_env", "api_url"}:
raise ManifestError(
f"bottle '{bottle_name}' {label}.provisioned_key has unknown key {k!r}; "
f"allowed: provider, token_env, api_url"
)
provider = d.get("provider")
if not isinstance(provider, str) or not provider:
raise ManifestError(
f"bottle '{bottle_name}' {label}.provisioned_key missing required "
f"string field 'provider'"
)
token_env = d.get("token_env")
if not isinstance(token_env, str) or not token_env:
raise ManifestError(
f"bottle '{bottle_name}' {label}.provisioned_key missing required "
f"string field 'token_env'"
)
api_url_raw = d.get("api_url", "")
if not isinstance(api_url_raw, str):
raise ManifestError(
f"bottle '{bottle_name}' {label}.provisioned_key 'api_url' must be a string"
)
return ProvisionedKeyConfig(
provider=provider,
token_env=token_env,
api_url=api_url_raw,
)
@dataclass(frozen=True)
class GitUser:
"""Per-bottle `git config --global user.name` / `user.email`
@@ -0,0 +1,296 @@
# PRD 0048: SSH Deploy-Key Provisioning
- **Status:** Active
- **Author:** didericis-claude
- **Created:** 2026-06-03
- **Issue:** #169
## Summary
Replace per-repo static SSH identity files with short-lived ed25519 deploy
keys that are generated at spin-up and revoked at teardown. Introduce
`bot_bottle/contrib/` as the package for platform-specific provisioners and
ship the first contrib sub-package: `bot_bottle/contrib/gitea/` with
`GiteaDeployKeyProvisioner`. A new `provisioned_key:` block in `git-gate.repos`
entries opts a repo into automatic key lifecycle management; `identity:` stays
valid for operators who supply their own key material.
## Problem
The current `git-gate.repos` entries require an `identity:` field pointing to
a host-side SSH private key (PRD 0047). Keys are static: the operator generates
them once, registers them with the upstream forge, and the same key is reused
across every bottle spin-up. This has several consequences:
- **No automatic revocation.** If a bottle misbehaves or a key leaks, the
operator must notice and manually delete the key from the forge. There is no
teardown hook that does it.
- **Broad blast radius.** A forge deploy key typically grants write access for
the lifetime of the key. A static key that survives bottle teardown continues
to grant that access.
- **Manual rotation burden.** Operators must manage key files on disk, keeping
them secure, rotating them on a schedule, and distributing them across hosts
that run `./cli.py start`.
## Goals / Success Criteria
- `git-gate.repos` entries accept `provisioned_key:` as an alternative to
`identity:`. The parser rejects entries that have both, or neither.
- `provisioned_key.provider: gitea` provisions and revokes deploy keys via the
Gitea HTTP API.
- At prepare time the provisioner generates a fresh ed25519 keypair, registers
the public half as a repo-scoped deploy key, and makes the private key
available to git-gate at the path it expects — the rest of the pipeline is
unchanged.
- At teardown the provisioner deletes the registered deploy key. Failure to
delete halts teardown and propagates the error loudly.
- `bot_bottle/contrib/` is introduced as the package for platform-specific
implementations; the core defines the abstract interface; contrib sub-packages
provide concrete implementations.
- Existing `identity:`-based repos continue to work without change.
- The unit test suite passes unchanged for `identity:` paths; new tests cover
`provisioned_key:` parse, validation, and provisioner dispatch.
## Non-goals
- GitHub, GitLab, or other forge providers (a future contrib sub-package each).
- Dashboard UI for listing or revoking orphaned deploy keys.
- SSH CA certificate approach (rejected in the issue thread in favour of
per-repo deploy keys for simpler revocation, smaller blast radius, and forge
compatibility).
- Key rotation mid-session (keys live for exactly one spin-up / teardown cycle).
- Any change to how `identity:` repos are provisioned.
## Design
### Manifest changes (builds on PRD 0047)
`git-gate.repos.<name>` currently accepts exactly:
```
url (required string)
identity (required string)
host_key (optional string)
```
After this PRD:
```
url (required string)
identity (optional string — mutually exclusive with provisioned_key)
provisioned_key (optional object — mutually exclusive with identity)
host_key (optional string)
```
Exactly one of `identity` or `provisioned_key` must be present. The parser
emits a targeted error for each violation:
```
bottle 'dev' git-gate.repos['bot-bottle'] must set exactly one of
'identity' or 'provisioned_key'; got neither.
bottle 'dev' git-gate.repos['bot-bottle'] must set exactly one of
'identity' or 'provisioned_key'; got both.
```
`provisioned_key` object schema:
```yaml
provisioned_key:
provider: gitea # required; names the contrib module to load
token_env: GITEA_TOKEN # required; name of a host env var holding the API token
api_url: https://... # optional; defaults to https://<host from url>
```
| Field | Type | Notes |
|-------|------|-------|
| `provider` | required string | Must match a sub-package under `bot_bottle/contrib/` |
| `token_env` | required string | Resolved at provision time via `os.environ`; never stored in plan |
| `api_url` | optional string | Override when the API endpoint differs from the git host |
**Example bottle manifest:**
```yaml
git-gate:
user:
name: implementer-bot
email: eric+implementer@dideric.is
repos:
bot-bottle:
url: ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git
provisioned_key:
provider: gitea
token_env: GITEA_DEPLOY_TOKEN
host_key: "ssh-rsa AAAA..."
```
### `contrib` package structure
```
bot_bottle/
contrib/
__init__.py # empty; no core symbols
gitea/
__init__.py # empty
deploy_key_provisioner.py
```
`contrib` is a flat namespace of forge/platform sub-packages. Each sub-package
is self-contained; the core imports from contrib lazily (inside factory
functions) so that missing optional dependencies in a contrib sub-package don't
break unrelated features.
### Core interface
New file: `bot_bottle/deploy_key_provisioner.py`
```python
from abc import ABC, abstractmethod
class DeployKeyProvisioner(ABC):
@abstractmethod
def create(self, owner_repo: str, title: str) -> tuple[str, bytes]:
"""Generate a keypair and register the public half.
owner_repo: '<owner>/<repo>' portion of the git upstream URL.
title: human-readable label shown in the forge key list.
Returns (key_id, private_key_pem) where key_id is opaque to
the caller and is only passed back to delete()."""
@abstractmethod
def delete(self, owner_repo: str, key_id: str) -> None:
"""Delete the registered deploy key.
Must not raise if the key is already absent (HTTP 404 is success).
Must raise for all other failures so that teardown halts."""
def get_provisioner(provider: str, token: str, api_url: str) -> DeployKeyProvisioner:
"""Instantiate the named contrib provisioner.
Raises ManifestError for unknown providers so the error is caught
at parse time rather than at runtime."""
if provider == "gitea":
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
)
return GiteaDeployKeyProvisioner(token=token, api_url=api_url)
from .manifest_util import ManifestError
raise ManifestError(f"unknown provisioned_key provider: {provider!r}")
```
### Gitea contrib implementation
`bot_bottle/contrib/gitea/deploy_key_provisioner.py`:
`create(owner_repo, title)`:
1. Generate an ed25519 keypair via `ssh-keygen -t ed25519 -f <tmpfile> -N ''`
(uses the SSH tooling already required by git-gate; no new Python dependency).
2. Read the private key bytes and the `.pub` file.
3. `POST /api/v1/repos/{owner}/{repo}/keys` with the public key, `title`, and
`read_only: false` (deploy keys always need push access for git-gate).
4. Return `(str(response["id"]), private_key_bytes)`.
`delete(owner_repo, key_id)`:
1. `DELETE /api/v1/repos/{owner}/{repo}/keys/{id}`.
2. Treat HTTP 404 as success (key already gone).
3. Raise `RuntimeError` for any other non-2xx response or network error,
including the status code and response body in the message.
HTTP calls use `urllib.request` from the stdlib; no new runtime dependency.
### `GitEntry` dataclass changes
`bot_bottle/manifest_git.py`:
- Add `ProvisionedKeyConfig` dataclass:
```python
@dataclass(frozen=True)
class ProvisionedKeyConfig:
provider: str
token_env: str
api_url: str # empty string means "derive from UpstreamHost"
```
- `GitEntry`:
- `IdentityFile: str` unchanged internally; empty string when
`provisioned_key` is used; set at provision time, not parse time.
- New field: `ProvisionedKey: ProvisionedKeyConfig | None = None`
- `from_repos_entry` validates the mutually-exclusive constraint and parses
the `provisioned_key` block when present.
### `GitGateUpstream` / prepare-time changes
`bot_bottle/git_gate.py` and `bot_bottle/backend/docker/provision/git.py`:
The existing path writes the identity file path into `GitGateUpstream.IdentityFile`
and docker-cp's it into `/git-gate/creds/<name>-key`. That path stays unchanged
for `identity:` repos.
For `provisioned_key:` repos, a new helper `provision_deploy_key(entry,
stage_dir, bottle_name)` runs before the git-gate sidecar starts:
1. Resolve `token = os.environ[entry.ProvisionedKey.token_env]`. Missing key
raises `RuntimeError` with a clear message naming the env var.
2. Resolve `api_url = entry.ProvisionedKey.api_url or f"https://{entry.UpstreamHost}"`.
3. Instantiate `get_provisioner(entry.ProvisionedKey.provider, token, api_url)`.
4. Call `provisioner.create(entry.UpstreamPath.lstrip("/"), title)` where
`title = f"bot-bottle:{bottle_name}:{entry.Name}"`.
5. Write private key to `stage_dir / f"{entry.Name}-key"` (mode 0o600).
6. Write key ID to `stage_dir / f"{entry.Name}-deploy-key-id"` (plain text).
7. Return the key file path; caller sets `GitGateUpstream.IdentityFile` to it.
`owner_repo` is extracted from `entry.UpstreamPath` (the path component of the
`ssh://` URL, e.g. `/didericis/bot-bottle.git` → `didericis/bot-bottle`).
### Teardown changes
`bot_bottle/backend/docker/cleanup.py` (or the equivalent teardown path):
After the git-gate sidecar stops, for each `GitEntry` with `ProvisionedKey`
set:
1. Check that `stage_dir / f"{entry.Name}-deploy-key-id"` exists; skip if
absent (provision never ran or already cleaned up).
2. Resolve token and API URL as above.
3. Instantiate provisioner and call `provisioner.delete(owner_repo, key_id)`.
4. On success, log at INFO. On failure, allow the exception to propagate —
teardown halts and the error surfaces to the operator.
A stranded deploy key is a security concern: the operator must know about it
and address it manually. Silent continuation is not acceptable.
The private key file in `stage_dir` is cleaned up as part of normal stage-dir
teardown (no extra step needed).
## Testing strategy
```
python3 -m unittest discover -s tests/unit
```
New / modified test files:
- `tests/unit/test_manifest_git.py` — add cases for:
- `provisioned_key:` accepted with valid `provider`, `token_env`, optional `api_url`
- Both `identity` and `provisioned_key` present → `ManifestError`
- Neither `identity` nor `provisioned_key` present → `ManifestError`
- Unknown key inside `provisioned_key` block → `ManifestError`
- Missing `provider` or `token_env` inside `provisioned_key` → `ManifestError`
- `tests/unit/test_deploy_key_provisioner.py` — new:
- `get_provisioner("gitea", ...)` returns `GiteaDeployKeyProvisioner`
- `get_provisioner("unknown", ...)` raises `ManifestError`
- `tests/unit/test_contrib_gitea_deploy_key.py` — new (using `unittest.mock`
to stub `urllib.request.urlopen` and `subprocess.run`):
- `create()` calls `ssh-keygen`, POSTs to correct endpoint, returns key ID
- `delete()` DELETEs to correct endpoint
- `delete()` tolerates HTTP 404 (already-deleted key)
- `delete()` raises `RuntimeError` on non-404 HTTP error
## Open questions
None.
+166
View File
@@ -0,0 +1,166 @@
"""Unit: GiteaDeployKeyProvisioner (PRD 0048, contrib/gitea)."""
from __future__ import annotations
import json
import unittest
import urllib.error
from io import BytesIO
from pathlib import Path
from tempfile import mkdtemp
from unittest.mock import MagicMock, call, patch
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
_split_owner_repo,
)
def _provisioner() -> GiteaDeployKeyProvisioner:
return GiteaDeployKeyProvisioner(
token="test-token", api_url="https://gitea.example.com"
)
def _urlopen_response(body: dict, status: int = 200) -> MagicMock:
resp = MagicMock()
resp.read.return_value = json.dumps(body).encode()
resp.status = status
resp.__enter__ = lambda s: s
resp.__exit__ = MagicMock(return_value=False)
return resp
def _http_error(code: int, body: str = "") -> urllib.error.HTTPError:
return urllib.error.HTTPError(
url="http://x",
code=code,
msg="err",
hdrs=None, # type: ignore[arg-type]
fp=BytesIO(body.encode()),
)
class TestCreate(unittest.TestCase):
def test_create_calls_ssh_keygen_and_posts_to_api(self):
provisioner = _provisioner()
fake_key_id = 42
fake_private = b"PRIVATE_KEY"
fake_public = "ssh-ed25519 AAAA fake"
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run"
) as mock_run, patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen"
) as mock_urlopen, patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes",
return_value=fake_private,
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text",
return_value=fake_public + "\n",
):
mock_urlopen.return_value = _urlopen_response({"id": fake_key_id})
key_id, private_bytes = provisioner.create(
"didericis/bot-bottle", "bot-bottle:slug:repo"
)
# ssh-keygen called with ed25519
mock_run.assert_called_once()
run_args = mock_run.call_args.args[0]
self.assertIn("ssh-keygen", run_args)
self.assertIn("-t", run_args)
self.assertIn("ed25519", run_args)
# POST body contains public key
post_call = mock_urlopen.call_args.args[0]
payload = json.loads(post_call.data)
self.assertEqual(fake_public, payload["key"])
self.assertFalse(payload["read_only"])
# Correct URL
self.assertIn(
"/api/v1/repos/didericis/bot-bottle/keys", post_call.full_url
)
self.assertEqual(str(fake_key_id), key_id)
self.assertEqual(fake_private, private_bytes)
def test_create_raises_on_http_error(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run"
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=_http_error(403, "forbidden"),
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes",
return_value=b"pk",
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text",
return_value="ssh-ed25519 AAAA\n",
):
with self.assertRaises(RuntimeError) as ctx:
provisioner.create("owner/repo", "title")
self.assertIn("403", str(ctx.exception))
class TestDelete(unittest.TestCase):
def test_delete_calls_correct_endpoint(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen"
) as mock_urlopen:
mock_urlopen.return_value = _urlopen_response({})
provisioner.delete("didericis/bot-bottle", "99")
req = mock_urlopen.call_args.args[0]
self.assertIn("/api/v1/repos/didericis/bot-bottle/keys/99", req.full_url)
self.assertEqual("DELETE", req.get_method())
def test_delete_tolerates_404(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=_http_error(404),
):
provisioner.delete("owner/repo", "123") # must not raise
def test_delete_raises_on_non_404_http_error(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=_http_error(500, "internal server error"),
):
with self.assertRaises(RuntimeError) as ctx:
provisioner.delete("owner/repo", "7")
self.assertIn("500", str(ctx.exception))
def test_delete_raises_on_url_error(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=urllib.error.URLError("connection refused"),
):
with self.assertRaises(RuntimeError) as ctx:
provisioner.delete("owner/repo", "7")
self.assertIn("connection refused", str(ctx.exception))
class TestSplitOwnerRepo(unittest.TestCase):
def test_simple(self):
self.assertEqual(("owner", "repo"), _split_owner_repo("owner/repo"))
def test_raises_on_missing_slash(self):
with self.assertRaises(ValueError):
_split_owner_repo("noslash")
def test_raises_on_empty_owner(self):
with self.assertRaises(ValueError):
_split_owner_repo("/repo")
def test_raises_on_empty_repo(self):
with self.assertRaises(ValueError):
_split_owner_repo("owner/")
if __name__ == "__main__":
unittest.main()
+29
View File
@@ -0,0 +1,29 @@
"""Unit: deploy_key_provisioner factory (PRD 0048)."""
from __future__ import annotations
import unittest
from unittest.mock import patch
from bot_bottle.deploy_key_provisioner import DeployKeyProvisioner, get_provisioner
from bot_bottle.manifest import ManifestError
class TestGetProvisioner(unittest.TestCase):
def test_gitea_returns_gitea_provisioner(self):
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
)
p = get_provisioner("gitea", token="tok", api_url="https://gitea.example.com")
self.assertIsInstance(p, GiteaDeployKeyProvisioner)
self.assertIsInstance(p, DeployKeyProvisioner)
def test_unknown_provider_raises_manifest_error(self):
with self.assertRaises(ManifestError) as ctx:
get_provisioner("github", token="tok", api_url="https://github.com")
self.assertIn("github", str(ctx.exception))
self.assertIn("provisioned_key provider", str(ctx.exception))
if __name__ == "__main__":
unittest.main()
+107
View File
@@ -243,6 +243,113 @@ class TestGitEntryCrossValidation(unittest.TestCase):
self.assertIn("PRD 0047", msg)
class TestProvisionedKey(unittest.TestCase):
"""git-gate.repos entries that use provisioned_key (PRD 0048)."""
def test_provisioned_key_minimal(self):
m = Manifest.from_json_obj(_manifest({
"bot-bottle": {
"url": "ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git",
"provisioned_key": {
"provider": "gitea",
"token_env": "GITEA_TOKEN",
},
},
}))
e = m.bottles["dev"].git[0]
self.assertEqual("bot-bottle", e.Name)
self.assertIsNotNone(e.ProvisionedKey)
assert e.ProvisionedKey is not None
self.assertEqual("gitea", e.ProvisionedKey.provider)
self.assertEqual("GITEA_TOKEN", e.ProvisionedKey.token_env)
self.assertEqual("", e.ProvisionedKey.api_url)
self.assertEqual("", e.IdentityFile)
def test_provisioned_key_with_api_url(self):
m = Manifest.from_json_obj(_manifest({
"repo": {
"url": "ssh://git@gitea.example.com/org/repo.git",
"provisioned_key": {
"provider": "gitea",
"token_env": "MY_TOKEN",
"api_url": "https://gitea.example.com",
},
},
}))
pk = m.bottles["dev"].git[0].ProvisionedKey
assert pk is not None
self.assertEqual("https://gitea.example.com", pk.api_url)
def test_both_identity_and_provisioned_key_dies(self):
with self.assertRaises(ManifestError) as ctx:
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
"provisioned_key": {"provider": "gitea", "token_env": "T"},
},
}))
self.assertIn("exactly one of", str(ctx.exception))
self.assertIn("got both", str(ctx.exception))
def test_neither_identity_nor_provisioned_key_dies(self):
with self.assertRaises(ManifestError) as ctx:
Manifest.from_json_obj(_manifest({
"foo": {"url": "ssh://git@github.com/foo.git"},
}))
self.assertIn("exactly one of", str(ctx.exception))
self.assertIn("got neither", str(ctx.exception))
def test_unknown_key_in_provisioned_key_block_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"provisioned_key": {
"provider": "gitea",
"token_env": "T",
"key_type": "rsa", # not allowed
},
},
}))
def test_missing_provider_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"provisioned_key": {"token_env": "T"},
},
}))
def test_missing_token_env_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"provisioned_key": {"provider": "gitea"},
},
}))
def test_provisioned_key_entry_has_no_identity_file(self):
m = Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/didericis/foo.git",
"provisioned_key": {"provider": "gitea", "token_env": "T"},
},
}))
self.assertEqual("", m.bottles["dev"].git[0].IdentityFile)
def test_identity_entry_has_no_provisioned_key(self):
m = Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
},
}))
self.assertIsNone(m.bottles["dev"].git[0].ProvisionedKey)
class TestEmptyGitGateField(unittest.TestCase):
def test_no_git_gate_field_yields_empty_tuple(self):
m = Manifest.from_json_obj({