Compare commits

..

6 Commits

Author SHA1 Message Date
didericis-claude d0d8a68da8 docs(prd): add PRD 0049 — named/labelled agents
test / unit (pull_request) Successful in 33s
test / integration (pull_request) Successful in 44s
Draft PRD for prompting operators for a custom label and optional
ANSI color at agent launch time, storing both in metadata.json, and
surfacing the label (in color) in the dashboard's active-agents pane.

Closes #171
2026-06-03 12:02:06 -04:00
didericis-claude 83463f1cc8 docs(prd): activate PRD 0048 — SSH deploy-key provisioning
test / unit (push) Successful in 40s
test / integration (push) Successful in 41s
2026-06-03 11:58:36 -04:00
didericis-claude 0b5d59cf9e feat(prd-0048): implement SSH deploy-key provisioning with contrib/gitea
- manifest_git.py: add ProvisionedKeyConfig dataclass; extend GitEntry
  with ProvisionedKey field (optional); make IdentityFile default to ""
  so provisioned_key entries can be constructed without a static path;
  add _parse_provisioned_key_config; update from_repos_entry to accept
  provisioned_key as an alternative to identity (mutually exclusive,
  parser rejects both-or-neither)

- deploy_key_provisioner.py (new): DeployKeyProvisioner ABC with create()
  and delete() abstract methods; get_provisioner() factory with lazy
  contrib import for gitea

- contrib/gitea/deploy_key_provisioner.py (new): GiteaDeployKeyProvisioner
  generating ed25519 keypairs via ssh-keygen and managing them through
  the Gitea deploy-key API (POST/DELETE); 404 on delete is success;
  all other errors raise RuntimeError

- git_gate.py: add _provision_dynamic_key() called in GitGate.prepare()
  for entries with ProvisionedKey — generates key, writes private key
  and key ID files to stage_dir, patches GitGateUpstream.identity_file;
  add revoke_git_gate_provisioned_keys() for teardown — raises on failure

- docker/launch.py: call revoke_git_gate_provisioned_keys() in teardown()
  after stack.close() so revocation runs after containers stop and
  failures propagate (not suppressed)

- smolmachines/launch.py: extract _teardown_smolmachines() helper that
  catches stack.close() errors (warn + re-raise) then calls revocation;
  same fatal-on-failure contract as docker backend

- test_manifest_git.py: 9 new cases for provisioned_key parsing
- test_deploy_key_provisioner.py (new): factory smoke tests
- test_contrib_gitea_deploy_key.py (new): create/delete/error/split tests

Closes #169
2026-06-03 11:58:36 -04:00
didericis-claude 464012d97c docs(prd): address review feedback on PRD 0048
- Rename deploy_key → provisioned_key throughout (manifest key,
  dataclass names, internal field names, test descriptions)
- Revocation failure at teardown now halts cleanup and propagates
  loudly; a stranded key is a security concern that must surface
2026-06-03 11:58:36 -04:00
didericis-claude b5f8a27c47 docs(prd): add SSH deploy-key provisioning plan (PRD 0048)
Introduces the design for short-lived deploy keys provisioned at spin-up
and revoked at teardown, plus the contrib package structure for
platform-specific provisioner implementations. First contrib provider
targets the Gitea deploy-key API.

Closes #169
2026-06-03 11:58:36 -04:00
didericis-claude f0ca4e3527 refactor: extract dashboard state/model layer into dashboard_model.py
test / unit (pull_request) Successful in 41s
test / integration (pull_request) Successful in 47s
test / unit (push) Successful in 35s
test / integration (push) Successful in 44s
Splits the 2103-line dashboard.py into two modules. Pure data
structures (QueuedProposal), discovery helpers (discover_pending,
discover_active_agents), derived-value helpers (_is_recent,
_approval_status, _format_agent_row, _detail_lines, etc.), and
argv-builder helpers (_build_split_pane_argv, _build_respawn_pane_argv,
_build_resume_argv_with_fallback, _agent_runtime_args) all move to
dashboard_model.py. The curses TUI, $EDITOR integration, tmux
subprocess flows, and action handlers (approve, reject,
operator_edit_routes, operator_edit_allowlist) remain in dashboard.py,
which re-imports everything from dashboard_model so existing callers and
tests are unaffected.

Adds tests/unit/test_dashboard_model.py covering _approval_status,
_proposed_payload_label, and _suffix_for_tool — three helpers that had
no prior coverage. All 894 unit tests pass.

Closes #158
2026-06-03 15:52:27 +00:00
15 changed files with 1525 additions and 402 deletions
+8
View File
@@ -43,6 +43,7 @@ from pathlib import Path
from typing import Callable, Generator from typing import Callable, Generator
from ...egress import egress_resolve_token_values from ...egress import egress_resolve_token_values
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import info, warn from ...log import info, warn
from . import network as network_mod from . import network as network_mod
from . import util as docker_mod from . import util as docker_mod
@@ -51,6 +52,7 @@ from .bottle_plan import DockerBottlePlan
from .bottle_state import ( from .bottle_state import (
bottle_state_dir, bottle_state_dir,
egress_state_dir, egress_state_dir,
git_gate_state_dir,
pipelock_state_dir, pipelock_state_dir,
) )
from .compose import ( from .compose import (
@@ -84,6 +86,9 @@ def launch(
Teardown on exit.""" Teardown on exit."""
stack = ExitStack() stack = ExitStack()
_bottle_for_revoke = plan.spec.manifest.bottle_for(plan.spec.agent_name)
_git_gate_dir_for_revoke = git_gate_state_dir(plan.slug)
def teardown() -> None: def teardown() -> None:
try: try:
stack.close() stack.close()
@@ -92,6 +97,9 @@ def launch(
f"teardown failed for container {plan.container_name}" f"teardown failed for container {plan.container_name}"
f" (compose-down): {exc!r}" f" (compose-down): {exc!r}"
) )
revoke_git_gate_provisioned_keys(
_bottle_for_revoke, _git_gate_dir_for_revoke
)
try: try:
# Step 1: agent image build. Sidecar images get built lazily by # Step 1: agent image build. Sidecar images get built lazily by
+24
View File
@@ -53,6 +53,9 @@ from ..docker.pipelock import (
PIPELOCK_PORT as _PIPELOCK_PORT_STR, PIPELOCK_PORT as _PIPELOCK_PORT_STR,
pipelock_tls_init, pipelock_tls_init,
) )
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import warn
from ..docker.bottle_state import git_gate_state_dir
from . import loopback_alias as _loopback from . import loopback_alias as _loopback
from . import sidecar_bundle as _bundle from . import sidecar_bundle as _bundle
from . import smolvm as _smolvm from . import smolvm as _smolvm
@@ -120,7 +123,28 @@ def launch(
agent_prompt_mode=plan.agent_prompt_mode, agent_prompt_mode=plan.agent_prompt_mode,
) )
finally: finally:
_teardown_smolmachines(stack, plan)
def _teardown_smolmachines(
stack: ExitStack,
plan: SmolmachinesBottlePlan,
) -> None:
"""Unwind the ExitStack, then revoke any provisioned deploy keys.
ExitStack errors are caught and logged (non-fatal) so that key
revocation always runs. Revocation errors propagate — a stranded
deploy key is a security concern the operator must address."""
teardown_exc: BaseException | None = None
try:
stack.close() stack.close()
except BaseException as exc:
teardown_exc = exc
warn(f"smolmachines teardown failed: {exc!r}")
bottle = plan.spec.manifest.bottle_for(plan.spec.agent_name)
revoke_git_gate_provisioned_keys(bottle, git_gate_state_dir(plan.slug))
if teardown_exc is not None:
raise teardown_exc
def _allocate_resources( def _allocate_resources(
+28 -390
View File
@@ -15,31 +15,27 @@ import argparse
import contextlib import contextlib
import curses import curses
import os import os
import shlex
import shutil import shutil
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
import time import time
import traceback import traceback
from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from .. import supervise as _supervise from .. import supervise as _supervise
from ..agent_provider import runtime_for
from ..backend import ( from ..backend import (
ActiveAgent, ActiveAgent,
BottleSpec, BottleSpec,
enumerate_active_agents,
get_bottle_backend, get_bottle_backend,
known_backend_names, known_backend_names,
) )
from ..backend.docker.bottle_state import bottle_state_dir, read_metadata
from ..backend.docker.capability_apply import ( from ..backend.docker.capability_apply import (
CapabilityApplyError, CapabilityApplyError,
apply_capability_change, apply_capability_change,
) )
from ..backend.docker.bottle_state import bottle_state_dir, read_metadata
from ..backend.docker.egress_apply import ( from ..backend.docker.egress_apply import (
EgressApplyError, EgressApplyError,
add_route, add_route,
@@ -68,12 +64,38 @@ from ..supervise import (
TOOL_EGRESS_BLOCK, TOOL_EGRESS_BLOCK,
TOOL_PIPELOCK_BLOCK, TOOL_PIPELOCK_BLOCK,
archive_proposal, archive_proposal,
list_pending_proposals,
render_diff, render_diff,
write_audit_entry, write_audit_entry,
write_response, write_response,
) )
from ._common import PROG, USER_CWD from ._common import PROG, USER_CWD
from .dashboard_model import (
PANE_AGENTS,
PANE_PROPOSALS,
QueuedProposal,
_NEW_PROPOSAL_HIGHLIGHT_SEC,
_REFRESH_INTERVAL_MS,
_agent_runtime_args,
_approval_status,
_bottle_for_slug,
_build_respawn_pane_argv,
_build_resume_argv_with_fallback,
_build_split_pane_argv,
_detail_lines,
_failed_url_host,
_filter_agents,
_format_agent_row,
_in_tmux,
_is_recent,
_pick_next_after_stop,
_proposed_payload_label,
_running_counts,
_selected_agent,
_selection_status,
_suffix_for_tool,
discover_active_agents,
discover_pending,
)
from .start import ( from .start import (
attach_agent, attach_agent,
capture_claude_session_state, capture_claude_session_state,
@@ -88,55 +110,6 @@ from .start import (
ApplyError = (EgressApplyError, PipelockApplyError, CapabilityApplyError) ApplyError = (EgressApplyError, PipelockApplyError, CapabilityApplyError)
# --- Discovery -------------------------------------------------------------
@dataclass(frozen=True)
class QueuedProposal:
"""A pending proposal plus the queue dir it was found in."""
proposal: Proposal
queue_dir: Path
def discover_active_agents() -> list[ActiveAgent]:
"""All currently-running agents across every backend with
their metadata + service set. Returns [] when neither
backend is reachable. Backed by the shared
`enumerate_active_agents` helper so the CLI's
`./cli.py list active` and this dashboard show the same data."""
return enumerate_active_agents()
def _approval_status(qp: QueuedProposal, verb: str) -> str:
"""Status-line text after a successful approval. For capability-
block, append the `resume <identity>` hint so the operator can
bring the rebuilt bottle back up with one copy-paste."""
base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
return base
def discover_pending() -> list[QueuedProposal]:
"""Walk ~/.bot-bottle/queue/* and collect pending proposals
from every bottle's queue. Sorted by arrival time across the
union — the operator works the global FIFO."""
queue_root = _supervise.bot_bottle_root() / "queue"
if not queue_root.is_dir():
return []
out: list[QueuedProposal] = []
for slug_dir in sorted(queue_root.iterdir()):
if not slug_dir.is_dir():
continue
for proposal in list_pending_proposals(slug_dir):
out.append(QueuedProposal(proposal=proposal, queue_dir=slug_dir))
out.sort(key=lambda q: q.proposal.arrival_timestamp)
return out
# --- Operator actions ------------------------------------------------------ # --- Operator actions ------------------------------------------------------
@@ -359,15 +332,6 @@ def edit_in_editor(content: str, *, suffix: str = ".tmp") -> str | None:
# loop's `bottles` dict; chunks 3/4 wire Enter / `x` to act on it. # loop's `bottles` dict; chunks 3/4 wire Enter / `x` to act on it.
def _filter_agents(query: str, names: list[str]) -> list[str]:
"""Case-insensitive substring filter for the picker. Pure
function — no curses, easy to unit-test."""
if not query:
return list(names)
q = query.lower()
return [n for n in names if q in n.lower()]
def _picker_modal( def _picker_modal(
stdscr: "curses._CursesWindow", stdscr: "curses._CursesWindow",
names: list[str], names: list[str],
@@ -627,63 +591,6 @@ def _capture_preflight_text(plan) -> str:
return buf.getvalue().strip("\n") return buf.getvalue().strip("\n")
def _running_counts(
bottles: dict, agents_now: list[ActiveAgent],
) -> dict[str, int]:
"""Per-agent running count: dashboard-owned + externally-
discovered, summed by agent_name. The picker shows this so the
operator knows whether picking an agent starts a fresh bottle
or a Nth one."""
counts: dict[str, int] = {}
for a in agents_now:
counts[a.agent_name] = counts.get(a.agent_name, 0) + 1
return counts
def _bottle_for_slug(
slug: str,
bottles: dict,
manifest: Manifest | None,
) -> tuple["object", str]:
"""Return `(bottle_handle, prompt_path_hint)` for a re-attach.
If the slug is in `bottles` (dashboard-owned), return the stored
handle directly. Otherwise synthesize a bottle from the persisted
metadata. The backend field in metadata (PRD 0040) selects Docker
or smolmachines; unknown or missing metadata defaults to Docker.
Returns the empty string for prompt_path_hint when we omit the
flag — the caller passes None to DockerBottle in that case."""
from ..backend.docker.bottle import DockerBottle
from ..backend.docker.bottle_state import read_metadata
from ..backend.smolmachines.bottle import SmolmachinesBottle
if slug in bottles:
_cm, bottle, _identity = bottles[slug]
return bottle, ""
instance_name = f"bot-bottle-{slug}"
prompt_path: str | None = None
metadata = read_metadata(slug)
if metadata is not None and manifest is not None:
agent = manifest.agents.get(metadata.agent_name)
if agent is not None and agent.prompt:
container_home = os.environ.get(
"BOT_BOTTLE_CONTAINER_HOME", "/home/node",
)
prompt_path = f"{container_home}/.bot-bottle-prompt.txt"
backend = metadata.backend if metadata is not None else ""
if backend == "smolmachines":
synth: object = SmolmachinesBottle(
instance_name,
prompt_path=prompt_path,
)
else:
synth = DockerBottle(
container=instance_name,
teardown=lambda: None,
prompt_path_in_container=prompt_path,
)
return synth, (prompt_path or "")
def _stop_bottle_flow( def _stop_bottle_flow(
stdscr: "curses._CursesWindow", stdscr: "curses._CursesWindow",
bottles: dict, bottles: dict,
@@ -770,100 +677,6 @@ def _stop_bottle_flow(
# reused across attaches. # reused across attaches.
def _in_tmux() -> bool:
"""True when the dashboard is running inside a tmux session.
Tmux sets `$TMUX` to the path of its server socket."""
return bool(os.environ.get("TMUX"))
def _agent_runtime_args(
*, resume: bool, remote_control: bool = False, agent_provider_template: str = "claude",
) -> list[str]:
"""The argv the dashboard hands to `bottle.agent_argv`
on every attach — matches what `attach_agent` builds for the
foreground handoff so both surfaces produce the same claude
invocation."""
runtime = runtime_for(agent_provider_template)
args = list(runtime.bypass_args)
if remote_control:
args.extend(runtime.remote_control_args)
if resume:
args.extend(runtime.resume_args)
return args
def _build_resume_argv_with_fallback(
bottle, *, remote_control: bool = False, agent_provider_template: str = "claude",
) -> list[str]:
"""Build a backend-exec argv that runs `claude --continue` and
falls back to plain `claude` if no prior session exists.
`--continue` exits non-zero when an agent has been spun up
but never typed at — there's no transcript to resume. The
shell-level `||` wrapper makes that case start a fresh
session instead of crashing the pane. The trade-off: we
invoke `sh -c` inside the bottle, so the command is two
`claude` invocations behind a tiny shell rather than one
direct exec. Acceptable; the shell adds microseconds and
the fallback only kicks in when --continue would have
failed anyway.
Works across backends because `bottle.agent_argv` always
surfaces the `claude` token preceded by the backend's exec
framing (docker: `docker exec -it <c>`; smolmachines:
`smolvm machine exec --name <m> -- runuser -u node --`).
Splitting at `claude` keeps the framing as the prefix and
wraps just the agent tail in `sh -c`."""
if agent_provider_template != "claude":
return bottle.agent_argv(
_agent_runtime_args(
resume=True,
remote_control=remote_control,
agent_provider_template=agent_provider_template,
)
)
base_args = _agent_runtime_args(
resume=False,
remote_control=remote_control,
agent_provider_template=agent_provider_template,
)
base_exec = bottle.agent_argv(base_args)
# Split exec-framing prefix from the agent-and-args tail so
# we can compose `<claude…> --continue || <claude…>` inside
# `sh -c`. The provider command token is the marker.
command = getattr(bottle, "agent_command", runtime_for(agent_provider_template).command)
agent_idx = base_exec.index(command)
prefix = base_exec[:agent_idx]
agent_cmd = " ".join(shlex.quote(a) for a in base_exec[agent_idx:])
resume_args = " ".join(
shlex.quote(a) for a in runtime_for(agent_provider_template).resume_args
)
return [
*prefix,
"sh", "-c",
f"{agent_cmd} {resume_args} || {agent_cmd}",
]
def _build_split_pane_argv(agent_argv: list[str]) -> list[str]:
"""Pure helper: wrap a backend-exec argv with `tmux split-window
-h -P -F '#{pane_id}'`. The `-P -F` combo tells tmux to print
the new pane's id on stdout so we can track it for later
`respawn-pane` calls."""
return [
"tmux", "split-window", "-h",
"-P", "-F", "#{pane_id}",
*agent_argv,
]
def _build_respawn_pane_argv(pane_id: str, agent_argv: list[str]) -> list[str]:
"""Pure helper: wrap a backend-exec argv with `tmux respawn-pane
-k -t <pane_id>`. `-k` kills the existing process in the pane
before respawning."""
return ["tmux", "respawn-pane", "-k", "-t", pane_id, *agent_argv]
@contextlib.contextmanager @contextlib.contextmanager
def _redirect_stderr_to_file(path): def _redirect_stderr_to_file(path):
"""Redirect file descriptor 2 (stderr) to `path` for the """Redirect file descriptor 2 (stderr) to `path` for the
@@ -980,24 +793,6 @@ def _tmux_close_right_pane(tmux_state: dict) -> None:
tmux_state["slug"] = None tmux_state["slug"] = None
def _pick_next_after_stop(
agents_before: list[ActiveAgent],
selected_index: int,
stopped_slug: str,
) -> tuple[int, ActiveAgent] | None:
"""After stopping `stopped_slug` from the agents list, choose
the agent that should take focus next. The agent below the
stopped row (which slides up to fill its index) is the
natural pick; if the stopped agent was last, the row above
instead. Returns (new_index, agent) or None if no agents
remain. Pure — easy to unit-test."""
new_agents = [a for a in agents_before if a.slug != stopped_slug]
if not new_agents:
return None
new_index = min(max(selected_index, 0), len(new_agents) - 1)
return new_index, new_agents[new_index]
def _ensure_right_pane(tmux_state: dict, argv: list[str]) -> str | None: def _ensure_right_pane(tmux_state: dict, argv: list[str]) -> str | None:
"""Run `argv` in the dashboard's right pane — respawn an """Run `argv` in the dashboard's right pane — respawn an
existing tracked pane if one is alive, split-window to existing tracked pane if one is alive, split-window to
@@ -1355,31 +1150,6 @@ def _list_once() -> int:
return 0 return 0
_REFRESH_INTERVAL_MS = 1000
# How long a newly-arrived proposal stays highlighted (green) in the
# list. Long enough for the operator to notice in their peripheral
# vision, short enough to fade before the queue feels permanently
# noisy.
_NEW_PROPOSAL_HIGHLIGHT_SEC = 5.0
def _is_recent(
proposal_id: str,
first_seen: dict[str, float] | None,
now: float | None,
) -> bool:
"""True if `proposal_id` was first seen within the highlight
window. Both `first_seen` and `now` may be None (rendered as
not-recent) so the helper is safe in cold-start paths."""
if first_seen is None or now is None:
return False
started = first_seen.get(proposal_id)
if started is None:
return False
return (now - started) < _NEW_PROPOSAL_HIGHLIGHT_SEC
def _try_init_green() -> int: def _try_init_green() -> int:
"""Initialise a green color pair and return its attr, or 0 if the """Initialise a green color pair and return its attr, or 0 if the
terminal doesn't support color. Caller ORs the returned value terminal doesn't support color. Caller ORs the returned value
@@ -1417,14 +1187,6 @@ def _quit_without_teardown(bottles: dict) -> None:
os._exit(0) os._exit(0)
# PRD 0019 chunk 3: which pane the j/k/arrow keys move through.
# Tab toggles. The proposals pane is the default focus — proposal
# action keys (a/m/r/Enter) require it; agent-scoped keys (e/p,
# chunk 4) require the agents pane.
PANE_PROPOSALS = "proposals"
PANE_AGENTS = "agents"
def _main_loop(stdscr: "curses._CursesWindow") -> None: def _main_loop(stdscr: "curses._CursesWindow") -> None:
curses.curs_set(0) curses.curs_set(0)
# Auto-refresh: getch() returns -1 after the timeout if no key # Auto-refresh: getch() returns -1 after the timeout if no key
@@ -1811,63 +1573,6 @@ def _render(
stdscr.refresh() stdscr.refresh()
def _selection_status(
focus: str, agents: list[ActiveAgent], selected_agent: int,
) -> str:
"""Status-line text for the idle state. Surfaces the agents-
pane selection so the operator can tell what an agent-scoped
edit verb would target."""
if focus != PANE_AGENTS:
return ""
if not agents:
return "[no active agents]"
if 0 <= selected_agent < len(agents):
return f"[selected: {agents[selected_agent].slug}]"
return "[no agent selected]"
def _selected_agent(
focus: str, agents: list[ActiveAgent], selected_agent: int,
) -> ActiveAgent | None:
"""The selected agent to scope `e` / `p` to, or None if no
selection is valid (proposals pane focused, no active agents,
or selection out of bounds)."""
if focus != PANE_AGENTS:
return None
if not agents:
return None
if 0 <= selected_agent < len(agents):
return agents[selected_agent]
return None
def _format_agent_row(a: ActiveAgent, maxw: int) -> str:
"""One-line agent row: ` [<backend>] <slug> <agent_name> started
<HH:MM:SS> [<sidecars>]`. The `agent` service is filtered out of
the displayed list — it's always present for an active bottle,
so listing it carries no information; the sidecars are the
differentiator.
The `[docker]` / `[smolmachines]` prefix lets the operator tell
which backend a bottle came from (issue #77). Truncated to
`maxw` because the renderer's addnstr only enforces width if
we hand it a properly-sized string."""
started = (
a.started_at.split("T", 1)[1][:8]
if "T" in a.started_at else (a.started_at or "?")
)
sidecars = tuple(s for s in a.services if s != "agent")
services = ",".join(sidecars) if sidecars else "(starting)"
backend_tag = f"[{a.backend_name}]" if a.backend_name else ""
line = (
f" {backend_tag} {a.slug} {a.agent_name} "
f"started {started} [{services}]"
)
if len(line) > maxw:
return line[: max(0, maxw - 1)] + ""
return line
def _detail_view( def _detail_view(
stdscr: "curses._CursesWindow", stdscr: "curses._CursesWindow",
qp: QueuedProposal, qp: QueuedProposal,
@@ -1921,66 +1626,6 @@ def _detail_view(
return return
def _detail_lines(
qp: QueuedProposal,
*,
green_attr: int = 0,
) -> list[tuple[str, int]]:
"""Return the detail-view body as (text, curses-attr) tuples.
Most lines are plain (attr=0); pipelock-block proposals append
a green "→ would allow host: ..." line so the operator sees at
a glance which hostname will land in pipelock's allowlist if
they hit approve. The URL itself is shown above for context."""
p = qp.proposal
out: list[tuple[str, int]] = [
(f"bottle: {p.bottle_slug}", 0),
(f"tool: {p.tool}", 0),
(f"id: {p.id}", 0),
(f"arrived: {p.arrival_timestamp}", 0),
(f"queue: {qp.queue_dir}", 0),
("", 0),
("justification:", 0),
]
out.extend((" " + line, 0) for line in p.justification.splitlines() or [""])
out.extend([
("", 0),
(_proposed_payload_label(p.tool) + ":", 0),
])
out.extend((line, 0) for line in p.proposed_file.splitlines() or [""])
if p.tool == TOOL_PIPELOCK_BLOCK:
host = _failed_url_host(p.proposed_file)
if host:
# Show the literal line that will be appended to the
# bottle's pipelock allowlist on approve. Green so it
# reads as "what changes"; the URL above carries the
# path context (which pipelock can't enforce — see the
# follow-up note on _apply_pipelock_url).
out.append(("", 0))
out.append((host, green_attr))
return out
def _failed_url_host(url: str) -> str:
"""Best-effort hostname extraction from a pipelock-block proposal's
failed_url payload. Returns empty string on unparseable input —
callers handle empty as "nothing to highlight"."""
import urllib.parse
try:
return urllib.parse.urlsplit(url.strip()).hostname or ""
except ValueError:
return ""
def _proposed_payload_label(tool: str) -> str:
"""The detail-view section heading for the proposal's payload —
`proposed_file` is what the dataclass calls it, but for
pipelock-block the payload is a single URL not a file. Render
the label per tool so the operator's eye matches."""
if tool == TOOL_PIPELOCK_BLOCK:
return "failed URL"
return "proposed file"
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
"""Suspend curses, open $EDITOR on the proposed file, return the """Suspend curses, open $EDITOR on the proposed file, return the
edited content (or None if unchanged).""" edited content (or None if unchanged)."""
@@ -1993,13 +1638,6 @@ def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
return edited return edited
def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
# egress-block / pipelock-block: JSON-ish + plain.
return ".txt"
def _operator_edit_routes_flow( def _operator_edit_routes_flow(
stdscr: "curses._CursesWindow", agent: ActiveAgent, stdscr: "curses._CursesWindow", agent: ActiveAgent,
) -> str: ) -> str:
+421
View File
@@ -0,0 +1,421 @@
"""dashboard_model: state/model layer for the dashboard TUI.
Data structures, discovery queries, pure state helpers, and derived
values extracted from dashboard.py so they can be tested in isolation
and navigated without wading through curses rendering code.
"""
from __future__ import annotations
import os
import shlex
from dataclasses import dataclass
from pathlib import Path
from .. import supervise as _supervise
from ..agent_provider import runtime_for
from ..backend import ActiveAgent, enumerate_active_agents
from ..backend.docker.capability_apply import CapabilityApplyError
from ..backend.docker.egress_apply import EgressApplyError
from ..backend.docker.pipelock_apply import PipelockApplyError
from ..manifest import Manifest
from ..supervise import (
TOOL_CAPABILITY_BLOCK,
TOOL_PIPELOCK_BLOCK,
Proposal,
list_pending_proposals,
)
# --- Constants ---------------------------------------------------------------
_REFRESH_INTERVAL_MS = 1000
_NEW_PROPOSAL_HIGHLIGHT_SEC = 5.0
PANE_PROPOSALS = "proposals"
PANE_AGENTS = "agents"
# --- Data structures ---------------------------------------------------------
@dataclass(frozen=True)
class QueuedProposal:
"""A pending proposal plus the queue dir it was found in."""
proposal: Proposal
queue_dir: Path
ApplyError = (EgressApplyError, PipelockApplyError, CapabilityApplyError)
# --- Discovery ---------------------------------------------------------------
def discover_active_agents() -> list[ActiveAgent]:
"""All currently-running agents across every backend with
their metadata + service set. Returns [] when neither
backend is reachable. Backed by the shared
`enumerate_active_agents` helper so the CLI's
`./cli.py list active` and this dashboard show the same data."""
return enumerate_active_agents()
def discover_pending() -> list[QueuedProposal]:
"""Walk ~/.bot-bottle/queue/* and collect pending proposals
from every bottle's queue. Sorted by arrival time across the
union — the operator works the global FIFO."""
queue_root = _supervise.bot_bottle_root() / "queue"
if not queue_root.is_dir():
return []
out: list[QueuedProposal] = []
for slug_dir in sorted(queue_root.iterdir()):
if not slug_dir.is_dir():
continue
for proposal in list_pending_proposals(slug_dir):
out.append(QueuedProposal(proposal=proposal, queue_dir=slug_dir))
out.sort(key=lambda q: q.proposal.arrival_timestamp)
return out
# --- Derived values ----------------------------------------------------------
def _approval_status(qp: QueuedProposal, verb: str) -> str:
"""Status-line text after a successful approval. For capability-
block, append the `resume <identity>` hint so the operator can
bring the rebuilt bottle back up with one copy-paste."""
base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
return base
def _is_recent(
proposal_id: str,
first_seen: dict[str, float] | None,
now: float | None,
) -> bool:
"""True if `proposal_id` was first seen within the highlight
window. Both `first_seen` and `now` may be None (rendered as
not-recent) so the helper is safe in cold-start paths."""
if first_seen is None or now is None:
return False
started = first_seen.get(proposal_id)
if started is None:
return False
return (now - started) < _NEW_PROPOSAL_HIGHLIGHT_SEC
def _selection_status(
focus: str, agents: list[ActiveAgent], selected_agent: int,
) -> str:
"""Status-line text for the idle state. Surfaces the agents-
pane selection so the operator can tell what an agent-scoped
edit verb would target."""
if focus != PANE_AGENTS:
return ""
if not agents:
return "[no active agents]"
if 0 <= selected_agent < len(agents):
return f"[selected: {agents[selected_agent].slug}]"
return "[no agent selected]"
def _selected_agent(
focus: str, agents: list[ActiveAgent], selected_agent: int,
) -> ActiveAgent | None:
"""The selected agent to scope `e` / `p` to, or None if no
selection is valid (proposals pane focused, no active agents,
or selection out of bounds)."""
if focus != PANE_AGENTS:
return None
if not agents:
return None
if 0 <= selected_agent < len(agents):
return agents[selected_agent]
return None
# --- Picker helpers ----------------------------------------------------------
def _filter_agents(query: str, names: list[str]) -> list[str]:
"""Case-insensitive substring filter for the picker. Pure
function — no curses, easy to unit-test."""
if not query:
return list(names)
q = query.lower()
return [n for n in names if q in n.lower()]
def _running_counts(
bottles: dict, agents_now: list[ActiveAgent],
) -> dict[str, int]:
"""Per-agent running count: dashboard-owned + externally-
discovered, summed by agent_name. The picker shows this so the
operator knows whether picking an agent starts a fresh bottle
or a Nth one."""
counts: dict[str, int] = {}
for a in agents_now:
counts[a.agent_name] = counts.get(a.agent_name, 0) + 1
return counts
# --- Agent-row rendering helpers ---------------------------------------------
def _format_agent_row(a: ActiveAgent, maxw: int) -> str:
"""One-line agent row: ` [<backend>] <slug> <agent_name> started
<HH:MM:SS> [<sidecars>]`. The `agent` service is filtered out of
the displayed list — it's always present for an active bottle,
so listing it carries no information; the sidecars are the
differentiator.
The `[docker]` / `[smolmachines]` prefix lets the operator tell
which backend a bottle came from (issue #77). Truncated to
`maxw` because the renderer's addnstr only enforces width if
we hand it a properly-sized string."""
started = (
a.started_at.split("T", 1)[1][:8]
if "T" in a.started_at else (a.started_at or "?")
)
sidecars = tuple(s for s in a.services if s != "agent")
services = ",".join(sidecars) if sidecars else "(starting)"
backend_tag = f"[{a.backend_name}]" if a.backend_name else ""
line = (
f" {backend_tag} {a.slug} {a.agent_name} "
f"started {started} [{services}]"
)
if len(line) > maxw:
return line[: max(0, maxw - 1)] + ""
return line
# --- Detail-view helpers -----------------------------------------------------
def _detail_lines(
qp: QueuedProposal,
*,
green_attr: int = 0,
) -> list[tuple[str, int]]:
"""Return the detail-view body as (text, curses-attr) tuples.
Most lines are plain (attr=0); pipelock-block proposals append
a green "→ would allow host: ..." line so the operator sees at
a glance which hostname will land in pipelock's allowlist if
they hit approve. The URL itself is shown above for context."""
p = qp.proposal
out: list[tuple[str, int]] = [
(f"bottle: {p.bottle_slug}", 0),
(f"tool: {p.tool}", 0),
(f"id: {p.id}", 0),
(f"arrived: {p.arrival_timestamp}", 0),
(f"queue: {qp.queue_dir}", 0),
("", 0),
("justification:", 0),
]
out.extend((" " + line, 0) for line in p.justification.splitlines() or [""])
out.extend([
("", 0),
(_proposed_payload_label(p.tool) + ":", 0),
])
out.extend((line, 0) for line in p.proposed_file.splitlines() or [""])
if p.tool == TOOL_PIPELOCK_BLOCK:
host = _failed_url_host(p.proposed_file)
if host:
out.append(("", 0))
out.append((host, green_attr))
return out
def _failed_url_host(url: str) -> str:
"""Best-effort hostname extraction from a pipelock-block proposal's
failed_url payload. Returns empty string on unparseable input —
callers handle empty as "nothing to highlight"."""
import urllib.parse
try:
return urllib.parse.urlsplit(url.strip()).hostname or ""
except ValueError:
return ""
def _proposed_payload_label(tool: str) -> str:
"""The detail-view section heading for the proposal's payload —
`proposed_file` is what the dataclass calls it, but for
pipelock-block the payload is a single URL not a file. Render
the label per tool so the operator's eye matches."""
if tool == TOOL_PIPELOCK_BLOCK:
return "failed URL"
return "proposed file"
def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
return ".txt"
# --- Bottle/agent resolution -------------------------------------------------
def _bottle_for_slug(
slug: str,
bottles: dict,
manifest: Manifest | None,
) -> tuple["object", str]:
"""Return `(bottle_handle, prompt_path_hint)` for a re-attach.
If the slug is in `bottles` (dashboard-owned), return the stored
handle directly. Otherwise synthesize a bottle from the persisted
metadata. The backend field in metadata (PRD 0040) selects Docker
or smolmachines; unknown or missing metadata defaults to Docker.
Returns the empty string for prompt_path_hint when we omit the
flag — the caller passes None to DockerBottle in that case."""
from ..backend.docker.bottle import DockerBottle
from ..backend.docker.bottle_state import read_metadata
from ..backend.smolmachines.bottle import SmolmachinesBottle
if slug in bottles:
_cm, bottle, _identity = bottles[slug]
return bottle, ""
instance_name = f"bot-bottle-{slug}"
prompt_path: str | None = None
metadata = read_metadata(slug)
if metadata is not None and manifest is not None:
agent = manifest.agents.get(metadata.agent_name)
if agent is not None and agent.prompt:
container_home = os.environ.get(
"BOT_BOTTLE_CONTAINER_HOME", "/home/node",
)
prompt_path = f"{container_home}/.bot-bottle-prompt.txt"
backend = metadata.backend if metadata is not None else ""
if backend == "smolmachines":
synth: object = SmolmachinesBottle(
instance_name,
prompt_path=prompt_path,
)
else:
synth = DockerBottle(
container=instance_name,
teardown=lambda: None,
prompt_path_in_container=prompt_path,
)
return synth, (prompt_path or "")
def _pick_next_after_stop(
agents_before: list[ActiveAgent],
selected_index: int,
stopped_slug: str,
) -> tuple[int, ActiveAgent] | None:
"""After stopping `stopped_slug` from the agents list, choose
the agent that should take focus next. The agent below the
stopped row (which slides up to fill its index) is the
natural pick; if the stopped agent was last, the row above
instead. Returns (new_index, agent) or None if no agents
remain. Pure — easy to unit-test."""
new_agents = [a for a in agents_before if a.slug != stopped_slug]
if not new_agents:
return None
new_index = min(max(selected_index, 0), len(new_agents) - 1)
return new_index, new_agents[new_index]
# --- tmux argv builders ------------------------------------------------------
def _in_tmux() -> bool:
"""True when the dashboard is running inside a tmux session.
Tmux sets `$TMUX` to the path of its server socket."""
return bool(os.environ.get("TMUX"))
def _agent_runtime_args(
*, resume: bool, remote_control: bool = False, agent_provider_template: str = "claude",
) -> list[str]:
"""The argv the dashboard hands to `bottle.agent_argv`
on every attach — matches what `attach_agent` builds for the
foreground handoff so both surfaces produce the same claude
invocation."""
runtime = runtime_for(agent_provider_template)
args = list(runtime.bypass_args)
if remote_control:
args.extend(runtime.remote_control_args)
if resume:
args.extend(runtime.resume_args)
return args
def _build_resume_argv_with_fallback(
bottle, *, remote_control: bool = False, agent_provider_template: str = "claude",
) -> list[str]:
"""Build a backend-exec argv that runs `claude --continue` and
falls back to plain `claude` if no prior session exists.
`--continue` exits non-zero when an agent has been spun up
but never typed at — there's no transcript to resume. The
shell-level `||` wrapper makes that case start a fresh
session instead of crashing the pane. The trade-off: we
invoke `sh -c` inside the bottle, so the command is two
`claude` invocations behind a tiny shell rather than one
direct exec. Acceptable; the shell adds microseconds and
the fallback only kicks in when --continue would have
failed anyway.
Works across backends because `bottle.agent_argv` always
surfaces the `claude` token preceded by the backend's exec
framing (docker: `docker exec -it <c>`; smolmachines:
`smolvm machine exec --name <m> -- runuser -u node --`).
Splitting at `claude` keeps the framing as the prefix and
wraps just the agent tail in `sh -c`."""
if agent_provider_template != "claude":
return bottle.agent_argv(
_agent_runtime_args(
resume=True,
remote_control=remote_control,
agent_provider_template=agent_provider_template,
)
)
base_args = _agent_runtime_args(
resume=False,
remote_control=remote_control,
agent_provider_template=agent_provider_template,
)
base_exec = bottle.agent_argv(base_args)
# Split exec-framing prefix from the agent-and-args tail so
# we can compose `<claude…> --continue || <claude…>` inside
# `sh -c`. The provider command token is the marker.
command = getattr(bottle, "agent_command", runtime_for(agent_provider_template).command)
agent_idx = base_exec.index(command)
prefix = base_exec[:agent_idx]
agent_cmd = " ".join(shlex.quote(a) for a in base_exec[agent_idx:])
resume_args = " ".join(
shlex.quote(a) for a in runtime_for(agent_provider_template).resume_args
)
return [
*prefix,
"sh", "-c",
f"{agent_cmd} {resume_args} || {agent_cmd}",
]
def _build_split_pane_argv(agent_argv: list[str]) -> list[str]:
"""Pure helper: wrap a backend-exec argv with `tmux split-window
-h -P -F '#{pane_id}'`. The `-P -F` combo tells tmux to print
the new pane's id on stdout so we can track it for later
`respawn-pane` calls."""
return [
"tmux", "split-window", "-h",
"-P", "-F", "#{pane_id}",
*agent_argv,
]
def _build_respawn_pane_argv(pane_id: str, agent_argv: list[str]) -> list[str]:
"""Pure helper: wrap a backend-exec argv with `tmux respawn-pane
-k -t <pane_id>`. `-k` kills the existing process in the pane
before respawning."""
return ["tmux", "respawn-pane", "-k", "-t", pane_id, *agent_argv]
View File
@@ -0,0 +1,121 @@
"""Gitea deploy-key provisioner (PRD 0048, contrib).
Generates ed25519 keypairs via `ssh-keygen` and registers / deletes
them using the Gitea deploy-key HTTP API. No new Python dependencies —
only stdlib `urllib.request` and `subprocess`."""
from __future__ import annotations
import json
import subprocess
import tempfile
import urllib.error
import urllib.request
from pathlib import Path
from ...deploy_key_provisioner import DeployKeyProvisioner
class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
"""Manages deploy keys on a Gitea instance."""
def __init__(self, *, token: str, api_url: str) -> None:
self._token = token
self._api_url = api_url.rstrip("/")
def create(self, owner_repo: str, title: str) -> tuple[str, bytes]:
"""Generate an ed25519 keypair, register the public half as a
repo deploy key, and return `(key_id, private_key_bytes)`.
The key is registered with `read_only=False` because git-gate
needs push access to forward gitleaks-scanned refs upstream."""
with tempfile.TemporaryDirectory() as tmpdir:
key_path = Path(tmpdir) / "key"
subprocess.run(
[
"ssh-keygen", "-t", "ed25519",
"-f", str(key_path),
"-N", "",
],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
private_key = key_path.read_bytes()
public_key = key_path.with_suffix(".pub").read_text().strip()
owner, repo = _split_owner_repo(owner_repo)
url = f"{self._api_url}/api/v1/repos/{owner}/{repo}/keys"
payload = json.dumps({
"key": public_key,
"read_only": False,
"title": title,
}).encode()
req = urllib.request.Request(
url,
data=payload,
headers={
"Authorization": f"token {self._token}",
"Content-Type": "application/json",
},
method="POST",
)
try:
with urllib.request.urlopen(req) as resp:
body = json.loads(resp.read())
except urllib.error.HTTPError as exc:
_body = _read_error_body(exc)
raise RuntimeError(
f"failed to create deploy key for {owner_repo}: "
f"HTTP {exc.code}{_body}"
) from exc
except urllib.error.URLError as exc:
raise RuntimeError(
f"failed to create deploy key for {owner_repo}: {exc.reason}"
) from exc
return str(body["id"]), private_key
def delete(self, owner_repo: str, key_id: str) -> None:
"""Delete the deploy key. HTTP 404 (already gone) is success.
All other errors raise RuntimeError so teardown halts."""
owner, repo = _split_owner_repo(owner_repo)
url = f"{self._api_url}/api/v1/repos/{owner}/{repo}/keys/{key_id}"
req = urllib.request.Request(
url,
headers={"Authorization": f"token {self._token}"},
method="DELETE",
)
try:
with urllib.request.urlopen(req):
pass
except urllib.error.HTTPError as exc:
if exc.code == 404:
return
_body = _read_error_body(exc)
raise RuntimeError(
f"failed to delete deploy key {key_id} for {owner_repo}: "
f"HTTP {exc.code}{_body}"
) from exc
except urllib.error.URLError as exc:
raise RuntimeError(
f"failed to delete deploy key {key_id} for {owner_repo}: "
f"{exc.reason}"
) from exc
def _split_owner_repo(owner_repo: str) -> tuple[str, str]:
"""Split `'owner/repo'` into `('owner', 'repo')`."""
parts = owner_repo.split("/", 1)
if len(parts) != 2 or not all(parts):
raise ValueError(
f"expected 'owner/repo' format, got {owner_repo!r}"
)
return parts[0], parts[1]
def _read_error_body(exc: urllib.error.HTTPError) -> str:
try:
return exc.read().decode("utf-8", errors="replace")
except Exception:
return ""
+52
View File
@@ -0,0 +1,52 @@
"""Deploy-key provisioner interface and factory (PRD 0048).
The core defines the abstract contract; concrete implementations live
in `bot_bottle/contrib/<provider>/deploy_key_provisioner.py`. The
factory `get_provisioner` imports contrib modules lazily so that a
missing optional dependency in one provider doesn't break unrelated
features."""
from __future__ import annotations
from abc import ABC, abstractmethod
class DeployKeyProvisioner(ABC):
"""Manages a single deploy-key lifecycle on a remote forge."""
@abstractmethod
def create(self, owner_repo: str, title: str) -> tuple[str, bytes]:
"""Generate a keypair and register the public half as a
deploy key on the forge.
`owner_repo` is the `<owner>/<repo>` path (no `.git` suffix).
`title` is the human-readable label shown in the forge UI.
Returns `(key_id, private_key_bytes)` where `key_id` is opaque
to the caller and is only ever passed back to `delete`."""
@abstractmethod
def delete(self, owner_repo: str, key_id: str) -> None:
"""Delete the registered deploy key.
Must not raise if the key is already absent (HTTP 404 is
success). Must raise for all other failures so teardown halts."""
def get_provisioner(
provider: str, token: str, api_url: str
) -> DeployKeyProvisioner:
"""Instantiate the contrib provisioner for `provider`.
Raises `ManifestError` for unknown providers so the error surfaces
at parse time rather than at runtime."""
if provider == "gitea":
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
)
return GiteaDeployKeyProvisioner(token=token, api_url=api_url)
from .manifest_util import ManifestError
raise ManifestError(
f"unknown provisioned_key provider: {provider!r}; "
f"available: gitea"
)
+89 -1
View File
@@ -29,11 +29,14 @@ backend-specific and lives on concrete subclasses (see
from __future__ import annotations from __future__ import annotations
import dataclasses
import os
import shlex import shlex
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from .log import info
from .manifest import Bottle, GitEntry from .manifest import Bottle, GitEntry
@@ -357,6 +360,80 @@ exit 0
""" """
def _provision_dynamic_key(
entry: GitEntry,
slug: str,
stage_dir: Path,
) -> str:
"""Generate a fresh ed25519 keypair, register the public half with
the forge, and persist the private key + key ID under `stage_dir`.
Returns the host-side path to the private key file so the caller
can inject it into the GitGateUpstream as `identity_file`."""
from .deploy_key_provisioner import get_provisioner
pk = entry.ProvisionedKey
assert pk is not None
token = os.environ.get(pk.token_env)
if token is None:
raise RuntimeError(
f"git-gate.repos[{entry.Name!r}] provisioned_key.token_env"
f" = {pk.token_env!r}: env var is not set"
)
api_url = pk.api_url or f"https://{entry.UpstreamHost}"
provisioner = get_provisioner(pk.provider, token, api_url)
owner_repo = entry.UpstreamPath
if owner_repo.endswith(".git"):
owner_repo = owner_repo[:-4]
title = f"bot-bottle:{slug}:{entry.Name}"
info(f"provisioning deploy key for git-gate.repos[{entry.Name!r}]")
key_id, private_key_bytes = provisioner.create(owner_repo, title)
key_file = stage_dir / f"{entry.Name}-key"
key_file.write_bytes(private_key_bytes)
key_file.chmod(0o600)
id_file = stage_dir / f"{entry.Name}-deploy-key-id"
id_file.write_text(key_id)
id_file.chmod(0o600)
info(f"provisioned deploy key {key_id} for git-gate.repos[{entry.Name!r}]")
return str(key_file)
def revoke_git_gate_provisioned_keys(bottle: Bottle, stage_dir: Path) -> None:
"""Revoke all deploy keys provisioned for `bottle` during prepare.
Called at teardown after containers stop. Raises if any revocation
fails — a stranded key is a security concern that the operator must
address manually."""
from .deploy_key_provisioner import get_provisioner
for entry in bottle.git:
if entry.ProvisionedKey is None:
continue
pk = entry.ProvisionedKey
id_file = stage_dir / f"{entry.Name}-deploy-key-id"
if not id_file.exists():
continue
key_id = id_file.read_text().strip()
token = os.environ.get(pk.token_env)
if token is None:
raise RuntimeError(
f"git-gate.repos[{entry.Name!r}] provisioned_key.token_env"
f" = {pk.token_env!r}: env var is not set;"
f" cannot revoke deploy key {key_id}"
)
api_url = pk.api_url or f"https://{entry.UpstreamHost}"
provisioner = get_provisioner(pk.provider, token, api_url)
owner_repo = entry.UpstreamPath
if owner_repo.endswith(".git"):
owner_repo = owner_repo[:-4]
info(f"revoking deploy key {key_id} for git-gate.repos[{entry.Name!r}]")
provisioner.delete(owner_repo, key_id)
info(f"revoked deploy key {key_id} for git-gate.repos[{entry.Name!r}]")
class GitGate(ABC): class GitGate(ABC):
"""The per-agent git-gate. Encapsulates the host-side prepare """The per-agent git-gate. Encapsulates the host-side prepare
(upstream lift + entrypoint/hook render); the sidecar's (upstream lift + entrypoint/hook render); the sidecar's
@@ -368,10 +445,21 @@ class GitGate(ABC):
entrypoint, pre-receive hook, and access-hook scripts (mode entrypoint, pre-receive hook, and access-hook scripts (mode
600) under `stage_dir`. Pure host-side, no docker subprocess. 600) under `stage_dir`. Pure host-side, no docker subprocess.
For `provisioned_key` entries, also generates and registers
a fresh deploy key via the forge API and writes the private key
+ key ID to `stage_dir`.
Returned plan is incomplete: the launch step must fill Returned plan is incomplete: the launch step must fill
`internal_network` / `egress_network` via `dataclasses.replace` `internal_network` / `egress_network` via `dataclasses.replace`
before passing the plan to `.start`.""" before passing the plan to `.start`."""
upstreams = git_gate_upstreams_for_bottle(bottle) upstreams_list = list(git_gate_upstreams_for_bottle(bottle))
for i, entry in enumerate(bottle.git):
if entry.ProvisionedKey is not None:
key_file = _provision_dynamic_key(entry, slug, stage_dir)
upstreams_list[i] = dataclasses.replace(
upstreams_list[i], identity_file=key_file
)
upstreams = tuple(upstreams_list)
entrypoint = stage_dir / "git_gate_entrypoint.sh" entrypoint = stage_dir / "git_gate_entrypoint.sh"
entrypoint.write_text(git_gate_render_entrypoint(upstreams)) entrypoint.write_text(git_gate_render_entrypoint(upstreams))
entrypoint.chmod(0o600) entrypoint.chmod(0o600)
+90 -11
View File
@@ -4,6 +4,7 @@ from __future__ import annotations
import re import re
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional
from .manifest_util import ManifestError, as_json_object from .manifest_util import ManifestError, as_json_object
@@ -61,6 +62,24 @@ def validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> No
seen[g.Name] = None seen[g.Name] = None
@dataclass(frozen=True)
class ProvisionedKeyConfig:
"""Configuration for automatic deploy-key lifecycle management
(PRD 0048). Used when a git-gate.repos entry opts out of a
static identity file and instead wants a fresh SSH keypair
generated at spin-up and revoked at teardown.
`provider` names the contrib sub-package to load (e.g. `gitea`).
`token_env` is the name of a host-side env var carrying the API
token; the value is read at provision time, never stored on the
plan. `api_url` is the forge's HTTP API root; if empty, it is
derived from the upstream URL's host at provision time."""
provider: str
token_env: str
api_url: str = ""
@dataclass(frozen=True) @dataclass(frozen=True)
class GitEntry: class GitEntry:
"""One upstream the per-agent git-gate (PRD 0008) is allowed to """One upstream the per-agent git-gate (PRD 0008) is allowed to
@@ -74,14 +93,15 @@ class GitEntry:
stashed in the `Upstream*` fields so the git-gate render step stashed in the `Upstream*` fields so the git-gate render step
doesn't have to re-parse. doesn't have to re-parse.
Manifest source: `git-gate.repos.<Name>` (PRD 0047). The YAML keys Manifest source: `git-gate.repos.<Name>` (PRD 0047/0048). Exactly
are `url`, `identity`, and `host_key`; the internal field names are one of `identity` (static key path) or `provisioned_key` (automatic
stable across that rename.""" lifecycle) must be present. The internal field names are stable."""
Name: str Name: str
Upstream: str Upstream: str
IdentityFile: str IdentityFile: str = ""
KnownHostKey: str = "" KnownHostKey: str = ""
ProvisionedKey: Optional[ProvisionedKeyConfig] = None
RemoteKey: str = "" RemoteKey: str = ""
UpstreamUser: str = "" UpstreamUser: str = ""
UpstreamHost: str = "" UpstreamHost: str = ""
@@ -94,8 +114,9 @@ class GitEntry:
) -> "GitEntry": ) -> "GitEntry":
"""Parse one entry from `git-gate.repos.<repo_name>`. """Parse one entry from `git-gate.repos.<repo_name>`.
YAML keys: `url` (required), `identity` (required), YAML keys: `url` (required), exactly one of `identity` or
`host_key` (optional). The repo_name becomes `Name`.""" `provisioned_key` (required), `host_key` (optional).
The repo_name becomes `Name`."""
if not repo_name: if not repo_name:
raise ManifestError( raise ManifestError(
f"bottle '{bottle_name}' git-gate.repos has an empty key" f"bottle '{bottle_name}' git-gate.repos has an empty key"
@@ -108,21 +129,44 @@ class GitEntry:
label = f"git-gate.repos[{repo_name!r}]" label = f"git-gate.repos[{repo_name!r}]"
d = as_json_object(raw, f"bottle '{bottle_name}' {label}") d = as_json_object(raw, f"bottle '{bottle_name}' {label}")
for k in d: for k in d:
if k not in {"url", "identity", "host_key"}: if k not in {"url", "identity", "provisioned_key", "host_key"}:
raise ManifestError( raise ManifestError(
f"bottle '{bottle_name}' {label} has unknown key {k!r}; " f"bottle '{bottle_name}' {label} has unknown key {k!r}; "
f"allowed: url, identity, host_key" f"allowed: url, identity, provisioned_key, host_key"
) )
upstream = d.get("url") upstream = d.get("url")
if not isinstance(upstream, str) or not upstream: if not isinstance(upstream, str) or not upstream:
raise ManifestError( raise ManifestError(
f"bottle '{bottle_name}' {label} missing required string field 'url'" f"bottle '{bottle_name}' {label} missing required string field 'url'"
) )
ident = d.get("identity")
if not isinstance(ident, str) or not ident: has_identity = "identity" in d
has_provisioned = "provisioned_key" in d
if has_identity and has_provisioned:
raise ManifestError( raise ManifestError(
f"bottle '{bottle_name}' {label} missing required string field 'identity'" f"bottle '{bottle_name}' {label} must set exactly one of "
f"'identity' or 'provisioned_key'; got both."
) )
if not has_identity and not has_provisioned:
raise ManifestError(
f"bottle '{bottle_name}' {label} must set exactly one of "
f"'identity' or 'provisioned_key'; got neither."
)
ident = ""
provisioned_key: Optional[ProvisionedKeyConfig] = None
if has_identity:
raw_ident = d.get("identity")
if not isinstance(raw_ident, str) or not raw_ident:
raise ManifestError(
f"bottle '{bottle_name}' {label} 'identity' must be a non-empty string"
)
ident = raw_ident
else:
provisioned_key = _parse_provisioned_key_config(
bottle_name, label, d["provisioned_key"]
)
khk = _opt_str( khk = _opt_str(
d.get("host_key"), d.get("host_key"),
f"bottle '{bottle_name}' {label} host_key", f"bottle '{bottle_name}' {label} host_key",
@@ -135,6 +179,7 @@ class GitEntry:
Upstream=upstream, Upstream=upstream,
IdentityFile=ident, IdentityFile=ident,
KnownHostKey=khk, KnownHostKey=khk,
ProvisionedKey=provisioned_key,
RemoteKey=host, RemoteKey=host,
UpstreamUser=user, UpstreamUser=user,
UpstreamHost=host, UpstreamHost=host,
@@ -143,6 +188,40 @@ class GitEntry:
) )
def _parse_provisioned_key_config(
bottle_name: str, label: str, raw: object
) -> ProvisionedKeyConfig:
d = as_json_object(raw, f"bottle '{bottle_name}' {label}.provisioned_key")
for k in d:
if k not in {"provider", "token_env", "api_url"}:
raise ManifestError(
f"bottle '{bottle_name}' {label}.provisioned_key has unknown key {k!r}; "
f"allowed: provider, token_env, api_url"
)
provider = d.get("provider")
if not isinstance(provider, str) or not provider:
raise ManifestError(
f"bottle '{bottle_name}' {label}.provisioned_key missing required "
f"string field 'provider'"
)
token_env = d.get("token_env")
if not isinstance(token_env, str) or not token_env:
raise ManifestError(
f"bottle '{bottle_name}' {label}.provisioned_key missing required "
f"string field 'token_env'"
)
api_url_raw = d.get("api_url", "")
if not isinstance(api_url_raw, str):
raise ManifestError(
f"bottle '{bottle_name}' {label}.provisioned_key 'api_url' must be a string"
)
return ProvisionedKeyConfig(
provider=provider,
token_env=token_env,
api_url=api_url_raw,
)
@dataclass(frozen=True) @dataclass(frozen=True)
class GitUser: class GitUser:
"""Per-bottle `git config --global user.name` / `user.email` """Per-bottle `git config --global user.name` / `user.email`
@@ -0,0 +1,296 @@
# PRD 0048: SSH Deploy-Key Provisioning
- **Status:** Active
- **Author:** didericis-claude
- **Created:** 2026-06-03
- **Issue:** #169
## Summary
Replace per-repo static SSH identity files with short-lived ed25519 deploy
keys that are generated at spin-up and revoked at teardown. Introduce
`bot_bottle/contrib/` as the package for platform-specific provisioners and
ship the first contrib sub-package: `bot_bottle/contrib/gitea/` with
`GiteaDeployKeyProvisioner`. A new `provisioned_key:` block in `git-gate.repos`
entries opts a repo into automatic key lifecycle management; `identity:` stays
valid for operators who supply their own key material.
## Problem
The current `git-gate.repos` entries require an `identity:` field pointing to
a host-side SSH private key (PRD 0047). Keys are static: the operator generates
them once, registers them with the upstream forge, and the same key is reused
across every bottle spin-up. This has several consequences:
- **No automatic revocation.** If a bottle misbehaves or a key leaks, the
operator must notice and manually delete the key from the forge. There is no
teardown hook that does it.
- **Broad blast radius.** A forge deploy key typically grants write access for
the lifetime of the key. A static key that survives bottle teardown continues
to grant that access.
- **Manual rotation burden.** Operators must manage key files on disk, keeping
them secure, rotating them on a schedule, and distributing them across hosts
that run `./cli.py start`.
## Goals / Success Criteria
- `git-gate.repos` entries accept `provisioned_key:` as an alternative to
`identity:`. The parser rejects entries that have both, or neither.
- `provisioned_key.provider: gitea` provisions and revokes deploy keys via the
Gitea HTTP API.
- At prepare time the provisioner generates a fresh ed25519 keypair, registers
the public half as a repo-scoped deploy key, and makes the private key
available to git-gate at the path it expects — the rest of the pipeline is
unchanged.
- At teardown the provisioner deletes the registered deploy key. Failure to
delete halts teardown and propagates the error loudly.
- `bot_bottle/contrib/` is introduced as the package for platform-specific
implementations; the core defines the abstract interface; contrib sub-packages
provide concrete implementations.
- Existing `identity:`-based repos continue to work without change.
- The unit test suite passes unchanged for `identity:` paths; new tests cover
`provisioned_key:` parse, validation, and provisioner dispatch.
## Non-goals
- GitHub, GitLab, or other forge providers (a future contrib sub-package each).
- Dashboard UI for listing or revoking orphaned deploy keys.
- SSH CA certificate approach (rejected in the issue thread in favour of
per-repo deploy keys for simpler revocation, smaller blast radius, and forge
compatibility).
- Key rotation mid-session (keys live for exactly one spin-up / teardown cycle).
- Any change to how `identity:` repos are provisioned.
## Design
### Manifest changes (builds on PRD 0047)
`git-gate.repos.<name>` currently accepts exactly:
```
url (required string)
identity (required string)
host_key (optional string)
```
After this PRD:
```
url (required string)
identity (optional string — mutually exclusive with provisioned_key)
provisioned_key (optional object — mutually exclusive with identity)
host_key (optional string)
```
Exactly one of `identity` or `provisioned_key` must be present. The parser
emits a targeted error for each violation:
```
bottle 'dev' git-gate.repos['bot-bottle'] must set exactly one of
'identity' or 'provisioned_key'; got neither.
bottle 'dev' git-gate.repos['bot-bottle'] must set exactly one of
'identity' or 'provisioned_key'; got both.
```
`provisioned_key` object schema:
```yaml
provisioned_key:
provider: gitea # required; names the contrib module to load
token_env: GITEA_TOKEN # required; name of a host env var holding the API token
api_url: https://... # optional; defaults to https://<host from url>
```
| Field | Type | Notes |
|-------|------|-------|
| `provider` | required string | Must match a sub-package under `bot_bottle/contrib/` |
| `token_env` | required string | Resolved at provision time via `os.environ`; never stored in plan |
| `api_url` | optional string | Override when the API endpoint differs from the git host |
**Example bottle manifest:**
```yaml
git-gate:
user:
name: implementer-bot
email: eric+implementer@dideric.is
repos:
bot-bottle:
url: ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git
provisioned_key:
provider: gitea
token_env: GITEA_DEPLOY_TOKEN
host_key: "ssh-rsa AAAA..."
```
### `contrib` package structure
```
bot_bottle/
contrib/
__init__.py # empty; no core symbols
gitea/
__init__.py # empty
deploy_key_provisioner.py
```
`contrib` is a flat namespace of forge/platform sub-packages. Each sub-package
is self-contained; the core imports from contrib lazily (inside factory
functions) so that missing optional dependencies in a contrib sub-package don't
break unrelated features.
### Core interface
New file: `bot_bottle/deploy_key_provisioner.py`
```python
from abc import ABC, abstractmethod
class DeployKeyProvisioner(ABC):
@abstractmethod
def create(self, owner_repo: str, title: str) -> tuple[str, bytes]:
"""Generate a keypair and register the public half.
owner_repo: '<owner>/<repo>' portion of the git upstream URL.
title: human-readable label shown in the forge key list.
Returns (key_id, private_key_pem) where key_id is opaque to
the caller and is only passed back to delete()."""
@abstractmethod
def delete(self, owner_repo: str, key_id: str) -> None:
"""Delete the registered deploy key.
Must not raise if the key is already absent (HTTP 404 is success).
Must raise for all other failures so that teardown halts."""
def get_provisioner(provider: str, token: str, api_url: str) -> DeployKeyProvisioner:
"""Instantiate the named contrib provisioner.
Raises ManifestError for unknown providers so the error is caught
at parse time rather than at runtime."""
if provider == "gitea":
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
)
return GiteaDeployKeyProvisioner(token=token, api_url=api_url)
from .manifest_util import ManifestError
raise ManifestError(f"unknown provisioned_key provider: {provider!r}")
```
### Gitea contrib implementation
`bot_bottle/contrib/gitea/deploy_key_provisioner.py`:
`create(owner_repo, title)`:
1. Generate an ed25519 keypair via `ssh-keygen -t ed25519 -f <tmpfile> -N ''`
(uses the SSH tooling already required by git-gate; no new Python dependency).
2. Read the private key bytes and the `.pub` file.
3. `POST /api/v1/repos/{owner}/{repo}/keys` with the public key, `title`, and
`read_only: false` (deploy keys always need push access for git-gate).
4. Return `(str(response["id"]), private_key_bytes)`.
`delete(owner_repo, key_id)`:
1. `DELETE /api/v1/repos/{owner}/{repo}/keys/{id}`.
2. Treat HTTP 404 as success (key already gone).
3. Raise `RuntimeError` for any other non-2xx response or network error,
including the status code and response body in the message.
HTTP calls use `urllib.request` from the stdlib; no new runtime dependency.
### `GitEntry` dataclass changes
`bot_bottle/manifest_git.py`:
- Add `ProvisionedKeyConfig` dataclass:
```python
@dataclass(frozen=True)
class ProvisionedKeyConfig:
provider: str
token_env: str
api_url: str # empty string means "derive from UpstreamHost"
```
- `GitEntry`:
- `IdentityFile: str` unchanged internally; empty string when
`provisioned_key` is used; set at provision time, not parse time.
- New field: `ProvisionedKey: ProvisionedKeyConfig | None = None`
- `from_repos_entry` validates the mutually-exclusive constraint and parses
the `provisioned_key` block when present.
### `GitGateUpstream` / prepare-time changes
`bot_bottle/git_gate.py` and `bot_bottle/backend/docker/provision/git.py`:
The existing path writes the identity file path into `GitGateUpstream.IdentityFile`
and docker-cp's it into `/git-gate/creds/<name>-key`. That path stays unchanged
for `identity:` repos.
For `provisioned_key:` repos, a new helper `provision_deploy_key(entry,
stage_dir, bottle_name)` runs before the git-gate sidecar starts:
1. Resolve `token = os.environ[entry.ProvisionedKey.token_env]`. Missing key
raises `RuntimeError` with a clear message naming the env var.
2. Resolve `api_url = entry.ProvisionedKey.api_url or f"https://{entry.UpstreamHost}"`.
3. Instantiate `get_provisioner(entry.ProvisionedKey.provider, token, api_url)`.
4. Call `provisioner.create(entry.UpstreamPath.lstrip("/"), title)` where
`title = f"bot-bottle:{bottle_name}:{entry.Name}"`.
5. Write private key to `stage_dir / f"{entry.Name}-key"` (mode 0o600).
6. Write key ID to `stage_dir / f"{entry.Name}-deploy-key-id"` (plain text).
7. Return the key file path; caller sets `GitGateUpstream.IdentityFile` to it.
`owner_repo` is extracted from `entry.UpstreamPath` (the path component of the
`ssh://` URL, e.g. `/didericis/bot-bottle.git` → `didericis/bot-bottle`).
### Teardown changes
`bot_bottle/backend/docker/cleanup.py` (or the equivalent teardown path):
After the git-gate sidecar stops, for each `GitEntry` with `ProvisionedKey`
set:
1. Check that `stage_dir / f"{entry.Name}-deploy-key-id"` exists; skip if
absent (provision never ran or already cleaned up).
2. Resolve token and API URL as above.
3. Instantiate provisioner and call `provisioner.delete(owner_repo, key_id)`.
4. On success, log at INFO. On failure, allow the exception to propagate —
teardown halts and the error surfaces to the operator.
A stranded deploy key is a security concern: the operator must know about it
and address it manually. Silent continuation is not acceptable.
The private key file in `stage_dir` is cleaned up as part of normal stage-dir
teardown (no extra step needed).
## Testing strategy
```
python3 -m unittest discover -s tests/unit
```
New / modified test files:
- `tests/unit/test_manifest_git.py` — add cases for:
- `provisioned_key:` accepted with valid `provider`, `token_env`, optional `api_url`
- Both `identity` and `provisioned_key` present → `ManifestError`
- Neither `identity` nor `provisioned_key` present → `ManifestError`
- Unknown key inside `provisioned_key` block → `ManifestError`
- Missing `provider` or `token_env` inside `provisioned_key` → `ManifestError`
- `tests/unit/test_deploy_key_provisioner.py` — new:
- `get_provisioner("gitea", ...)` returns `GiteaDeployKeyProvisioner`
- `get_provisioner("unknown", ...)` raises `ManifestError`
- `tests/unit/test_contrib_gitea_deploy_key.py` — new (using `unittest.mock`
to stub `urllib.request.urlopen` and `subprocess.run`):
- `create()` calls `ssh-keygen`, POSTs to correct endpoint, returns key ID
- `delete()` DELETEs to correct endpoint
- `delete()` tolerates HTTP 404 (already-deleted key)
- `delete()` raises `RuntimeError` on non-404 HTTP error
## Open questions
None.
+166
View File
@@ -0,0 +1,166 @@
"""Unit: GiteaDeployKeyProvisioner (PRD 0048, contrib/gitea)."""
from __future__ import annotations
import json
import unittest
import urllib.error
from io import BytesIO
from pathlib import Path
from tempfile import mkdtemp
from unittest.mock import MagicMock, call, patch
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
_split_owner_repo,
)
def _provisioner() -> GiteaDeployKeyProvisioner:
return GiteaDeployKeyProvisioner(
token="test-token", api_url="https://gitea.example.com"
)
def _urlopen_response(body: dict, status: int = 200) -> MagicMock:
resp = MagicMock()
resp.read.return_value = json.dumps(body).encode()
resp.status = status
resp.__enter__ = lambda s: s
resp.__exit__ = MagicMock(return_value=False)
return resp
def _http_error(code: int, body: str = "") -> urllib.error.HTTPError:
return urllib.error.HTTPError(
url="http://x",
code=code,
msg="err",
hdrs=None, # type: ignore[arg-type]
fp=BytesIO(body.encode()),
)
class TestCreate(unittest.TestCase):
def test_create_calls_ssh_keygen_and_posts_to_api(self):
provisioner = _provisioner()
fake_key_id = 42
fake_private = b"PRIVATE_KEY"
fake_public = "ssh-ed25519 AAAA fake"
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run"
) as mock_run, patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen"
) as mock_urlopen, patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes",
return_value=fake_private,
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text",
return_value=fake_public + "\n",
):
mock_urlopen.return_value = _urlopen_response({"id": fake_key_id})
key_id, private_bytes = provisioner.create(
"didericis/bot-bottle", "bot-bottle:slug:repo"
)
# ssh-keygen called with ed25519
mock_run.assert_called_once()
run_args = mock_run.call_args.args[0]
self.assertIn("ssh-keygen", run_args)
self.assertIn("-t", run_args)
self.assertIn("ed25519", run_args)
# POST body contains public key
post_call = mock_urlopen.call_args.args[0]
payload = json.loads(post_call.data)
self.assertEqual(fake_public, payload["key"])
self.assertFalse(payload["read_only"])
# Correct URL
self.assertIn(
"/api/v1/repos/didericis/bot-bottle/keys", post_call.full_url
)
self.assertEqual(str(fake_key_id), key_id)
self.assertEqual(fake_private, private_bytes)
def test_create_raises_on_http_error(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run"
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=_http_error(403, "forbidden"),
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes",
return_value=b"pk",
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text",
return_value="ssh-ed25519 AAAA\n",
):
with self.assertRaises(RuntimeError) as ctx:
provisioner.create("owner/repo", "title")
self.assertIn("403", str(ctx.exception))
class TestDelete(unittest.TestCase):
def test_delete_calls_correct_endpoint(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen"
) as mock_urlopen:
mock_urlopen.return_value = _urlopen_response({})
provisioner.delete("didericis/bot-bottle", "99")
req = mock_urlopen.call_args.args[0]
self.assertIn("/api/v1/repos/didericis/bot-bottle/keys/99", req.full_url)
self.assertEqual("DELETE", req.get_method())
def test_delete_tolerates_404(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=_http_error(404),
):
provisioner.delete("owner/repo", "123") # must not raise
def test_delete_raises_on_non_404_http_error(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=_http_error(500, "internal server error"),
):
with self.assertRaises(RuntimeError) as ctx:
provisioner.delete("owner/repo", "7")
self.assertIn("500", str(ctx.exception))
def test_delete_raises_on_url_error(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=urllib.error.URLError("connection refused"),
):
with self.assertRaises(RuntimeError) as ctx:
provisioner.delete("owner/repo", "7")
self.assertIn("connection refused", str(ctx.exception))
class TestSplitOwnerRepo(unittest.TestCase):
def test_simple(self):
self.assertEqual(("owner", "repo"), _split_owner_repo("owner/repo"))
def test_raises_on_missing_slash(self):
with self.assertRaises(ValueError):
_split_owner_repo("noslash")
def test_raises_on_empty_owner(self):
with self.assertRaises(ValueError):
_split_owner_repo("/repo")
def test_raises_on_empty_repo(self):
with self.assertRaises(ValueError):
_split_owner_repo("owner/")
if __name__ == "__main__":
unittest.main()
+94
View File
@@ -0,0 +1,94 @@
"""Unit: dashboard_model — state/model layer extracted from dashboard.py.
Tests for functions that were previously buried in the 2103-line
dashboard.py and had no coverage: _approval_status,
_proposed_payload_label, and _suffix_for_tool."""
import unittest
from pathlib import Path
from bot_bottle.cli.dashboard_model import (
QueuedProposal,
_approval_status,
_proposed_payload_label,
_suffix_for_tool,
)
from bot_bottle.supervise import (
Proposal,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
TOOL_PIPELOCK_BLOCK,
sha256_hex,
)
from datetime import datetime, timezone
def _qp(tool: str, slug: str = "dev") -> QueuedProposal:
payload = "x"
p = Proposal.new(
bottle_slug=slug,
tool=tool,
proposed_file=payload,
justification="test",
current_file_hash=sha256_hex(payload),
now=datetime(2026, 6, 1, 0, 0, 0, tzinfo=timezone.utc),
)
return QueuedProposal(proposal=p, queue_dir=Path("/tmp/q"))
class TestApprovalStatus(unittest.TestCase):
def test_egress_block_base_message(self):
qp = _qp(TOOL_EGRESS_BLOCK, slug="my-bot")
msg = _approval_status(qp, "approved")
self.assertEqual("approved egress-block for [my-bot]", msg)
def test_modified_verb(self):
qp = _qp(TOOL_PIPELOCK_BLOCK, slug="dev")
msg = _approval_status(qp, "modified+approved")
self.assertEqual("modified+approved pipelock-block for [dev]", msg)
def test_capability_block_appends_resume_hint(self):
qp = _qp(TOOL_CAPABILITY_BLOCK, slug="alpha")
msg = _approval_status(qp, "approved")
self.assertIn("resume: ./cli.py resume alpha", msg)
self.assertIn("approved capability-block for [alpha]", msg)
def test_egress_block_has_no_resume_hint(self):
qp = _qp(TOOL_EGRESS_BLOCK)
self.assertNotIn("resume", _approval_status(qp, "approved"))
def test_pipelock_block_has_no_resume_hint(self):
qp = _qp(TOOL_PIPELOCK_BLOCK)
self.assertNotIn("resume", _approval_status(qp, "approved"))
class TestProposedPayloadLabel(unittest.TestCase):
def test_pipelock_returns_failed_url(self):
self.assertEqual("failed URL", _proposed_payload_label(TOOL_PIPELOCK_BLOCK))
def test_egress_returns_proposed_file(self):
self.assertEqual("proposed file", _proposed_payload_label(TOOL_EGRESS_BLOCK))
def test_capability_returns_proposed_file(self):
self.assertEqual("proposed file", _proposed_payload_label(TOOL_CAPABILITY_BLOCK))
def test_unknown_tool_returns_proposed_file(self):
self.assertEqual("proposed file", _proposed_payload_label("unknown-tool"))
class TestSuffixForTool(unittest.TestCase):
def test_capability_block_returns_dockerfile_suffix(self):
self.assertEqual(".dockerfile", _suffix_for_tool(TOOL_CAPABILITY_BLOCK))
def test_egress_block_returns_txt(self):
self.assertEqual(".txt", _suffix_for_tool(TOOL_EGRESS_BLOCK))
def test_pipelock_block_returns_txt(self):
self.assertEqual(".txt", _suffix_for_tool(TOOL_PIPELOCK_BLOCK))
def test_unknown_tool_returns_txt(self):
self.assertEqual(".txt", _suffix_for_tool("whatever"))
if __name__ == "__main__":
unittest.main()
+29
View File
@@ -0,0 +1,29 @@
"""Unit: deploy_key_provisioner factory (PRD 0048)."""
from __future__ import annotations
import unittest
from unittest.mock import patch
from bot_bottle.deploy_key_provisioner import DeployKeyProvisioner, get_provisioner
from bot_bottle.manifest import ManifestError
class TestGetProvisioner(unittest.TestCase):
def test_gitea_returns_gitea_provisioner(self):
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
)
p = get_provisioner("gitea", token="tok", api_url="https://gitea.example.com")
self.assertIsInstance(p, GiteaDeployKeyProvisioner)
self.assertIsInstance(p, DeployKeyProvisioner)
def test_unknown_provider_raises_manifest_error(self):
with self.assertRaises(ManifestError) as ctx:
get_provisioner("github", token="tok", api_url="https://github.com")
self.assertIn("github", str(ctx.exception))
self.assertIn("provisioned_key provider", str(ctx.exception))
if __name__ == "__main__":
unittest.main()
+107
View File
@@ -243,6 +243,113 @@ class TestGitEntryCrossValidation(unittest.TestCase):
self.assertIn("PRD 0047", msg) self.assertIn("PRD 0047", msg)
class TestProvisionedKey(unittest.TestCase):
"""git-gate.repos entries that use provisioned_key (PRD 0048)."""
def test_provisioned_key_minimal(self):
m = Manifest.from_json_obj(_manifest({
"bot-bottle": {
"url": "ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git",
"provisioned_key": {
"provider": "gitea",
"token_env": "GITEA_TOKEN",
},
},
}))
e = m.bottles["dev"].git[0]
self.assertEqual("bot-bottle", e.Name)
self.assertIsNotNone(e.ProvisionedKey)
assert e.ProvisionedKey is not None
self.assertEqual("gitea", e.ProvisionedKey.provider)
self.assertEqual("GITEA_TOKEN", e.ProvisionedKey.token_env)
self.assertEqual("", e.ProvisionedKey.api_url)
self.assertEqual("", e.IdentityFile)
def test_provisioned_key_with_api_url(self):
m = Manifest.from_json_obj(_manifest({
"repo": {
"url": "ssh://git@gitea.example.com/org/repo.git",
"provisioned_key": {
"provider": "gitea",
"token_env": "MY_TOKEN",
"api_url": "https://gitea.example.com",
},
},
}))
pk = m.bottles["dev"].git[0].ProvisionedKey
assert pk is not None
self.assertEqual("https://gitea.example.com", pk.api_url)
def test_both_identity_and_provisioned_key_dies(self):
with self.assertRaises(ManifestError) as ctx:
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
"provisioned_key": {"provider": "gitea", "token_env": "T"},
},
}))
self.assertIn("exactly one of", str(ctx.exception))
self.assertIn("got both", str(ctx.exception))
def test_neither_identity_nor_provisioned_key_dies(self):
with self.assertRaises(ManifestError) as ctx:
Manifest.from_json_obj(_manifest({
"foo": {"url": "ssh://git@github.com/foo.git"},
}))
self.assertIn("exactly one of", str(ctx.exception))
self.assertIn("got neither", str(ctx.exception))
def test_unknown_key_in_provisioned_key_block_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"provisioned_key": {
"provider": "gitea",
"token_env": "T",
"key_type": "rsa", # not allowed
},
},
}))
def test_missing_provider_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"provisioned_key": {"token_env": "T"},
},
}))
def test_missing_token_env_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"provisioned_key": {"provider": "gitea"},
},
}))
def test_provisioned_key_entry_has_no_identity_file(self):
m = Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/didericis/foo.git",
"provisioned_key": {"provider": "gitea", "token_env": "T"},
},
}))
self.assertEqual("", m.bottles["dev"].git[0].IdentityFile)
def test_identity_entry_has_no_provisioned_key(self):
m = Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
},
}))
self.assertIsNone(m.bottles["dev"].git[0].ProvisionedKey)
class TestEmptyGitGateField(unittest.TestCase): class TestEmptyGitGateField(unittest.TestCase):
def test_no_git_gate_field_yields_empty_tuple(self): def test_no_git_gate_field_yields_empty_tuple(self):
m = Manifest.from_json_obj({ m = Manifest.from_json_obj({