Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8e076e2c14 |
@@ -43,7 +43,6 @@ from pathlib import Path
|
||||
from typing import Callable, Generator
|
||||
|
||||
from ...egress import egress_resolve_token_values
|
||||
from ...git_gate import revoke_git_gate_provisioned_keys
|
||||
from ...log import info, warn
|
||||
from . import network as network_mod
|
||||
from . import util as docker_mod
|
||||
@@ -52,7 +51,6 @@ from .bottle_plan import DockerBottlePlan
|
||||
from .bottle_state import (
|
||||
bottle_state_dir,
|
||||
egress_state_dir,
|
||||
git_gate_state_dir,
|
||||
pipelock_state_dir,
|
||||
)
|
||||
from .compose import (
|
||||
@@ -86,9 +84,6 @@ def launch(
|
||||
Teardown on exit."""
|
||||
stack = ExitStack()
|
||||
|
||||
_bottle_for_revoke = plan.spec.manifest.bottle_for(plan.spec.agent_name)
|
||||
_git_gate_dir_for_revoke = git_gate_state_dir(plan.slug)
|
||||
|
||||
def teardown() -> None:
|
||||
try:
|
||||
stack.close()
|
||||
@@ -97,9 +92,6 @@ def launch(
|
||||
f"teardown failed for container {plan.container_name}"
|
||||
f" (compose-down): {exc!r}"
|
||||
)
|
||||
revoke_git_gate_provisioned_keys(
|
||||
_bottle_for_revoke, _git_gate_dir_for_revoke
|
||||
)
|
||||
|
||||
try:
|
||||
# Step 1: agent image build. Sidecar images get built lazily by
|
||||
|
||||
@@ -53,9 +53,6 @@ from ..docker.pipelock import (
|
||||
PIPELOCK_PORT as _PIPELOCK_PORT_STR,
|
||||
pipelock_tls_init,
|
||||
)
|
||||
from ...git_gate import revoke_git_gate_provisioned_keys
|
||||
from ...log import warn
|
||||
from ..docker.bottle_state import git_gate_state_dir
|
||||
from . import loopback_alias as _loopback
|
||||
from . import sidecar_bundle as _bundle
|
||||
from . import smolvm as _smolvm
|
||||
@@ -123,28 +120,7 @@ def launch(
|
||||
agent_prompt_mode=plan.agent_prompt_mode,
|
||||
)
|
||||
finally:
|
||||
_teardown_smolmachines(stack, plan)
|
||||
|
||||
|
||||
def _teardown_smolmachines(
|
||||
stack: ExitStack,
|
||||
plan: SmolmachinesBottlePlan,
|
||||
) -> None:
|
||||
"""Unwind the ExitStack, then revoke any provisioned deploy keys.
|
||||
|
||||
ExitStack errors are caught and logged (non-fatal) so that key
|
||||
revocation always runs. Revocation errors propagate — a stranded
|
||||
deploy key is a security concern the operator must address."""
|
||||
teardown_exc: BaseException | None = None
|
||||
try:
|
||||
stack.close()
|
||||
except BaseException as exc:
|
||||
teardown_exc = exc
|
||||
warn(f"smolmachines teardown failed: {exc!r}")
|
||||
bottle = plan.spec.manifest.bottle_for(plan.spec.agent_name)
|
||||
revoke_git_gate_provisioned_keys(bottle, git_gate_state_dir(plan.slug))
|
||||
if teardown_exc is not None:
|
||||
raise teardown_exc
|
||||
|
||||
|
||||
def _allocate_resources(
|
||||
|
||||
+390
-28
@@ -15,27 +15,31 @@ import argparse
|
||||
import contextlib
|
||||
import curses
|
||||
import os
|
||||
import shlex
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import traceback
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from .. import supervise as _supervise
|
||||
from ..agent_provider import runtime_for
|
||||
from ..backend import (
|
||||
ActiveAgent,
|
||||
BottleSpec,
|
||||
enumerate_active_agents,
|
||||
get_bottle_backend,
|
||||
known_backend_names,
|
||||
)
|
||||
from ..backend.docker.bottle_state import bottle_state_dir, read_metadata
|
||||
from ..backend.docker.capability_apply import (
|
||||
CapabilityApplyError,
|
||||
apply_capability_change,
|
||||
)
|
||||
from ..backend.docker.bottle_state import bottle_state_dir, read_metadata
|
||||
from ..backend.docker.egress_apply import (
|
||||
EgressApplyError,
|
||||
add_route,
|
||||
@@ -64,38 +68,12 @@ from ..supervise import (
|
||||
TOOL_EGRESS_BLOCK,
|
||||
TOOL_PIPELOCK_BLOCK,
|
||||
archive_proposal,
|
||||
list_pending_proposals,
|
||||
render_diff,
|
||||
write_audit_entry,
|
||||
write_response,
|
||||
)
|
||||
from ._common import PROG, USER_CWD
|
||||
from .dashboard_model import (
|
||||
PANE_AGENTS,
|
||||
PANE_PROPOSALS,
|
||||
QueuedProposal,
|
||||
_NEW_PROPOSAL_HIGHLIGHT_SEC,
|
||||
_REFRESH_INTERVAL_MS,
|
||||
_agent_runtime_args,
|
||||
_approval_status,
|
||||
_bottle_for_slug,
|
||||
_build_respawn_pane_argv,
|
||||
_build_resume_argv_with_fallback,
|
||||
_build_split_pane_argv,
|
||||
_detail_lines,
|
||||
_failed_url_host,
|
||||
_filter_agents,
|
||||
_format_agent_row,
|
||||
_in_tmux,
|
||||
_is_recent,
|
||||
_pick_next_after_stop,
|
||||
_proposed_payload_label,
|
||||
_running_counts,
|
||||
_selected_agent,
|
||||
_selection_status,
|
||||
_suffix_for_tool,
|
||||
discover_active_agents,
|
||||
discover_pending,
|
||||
)
|
||||
from .start import (
|
||||
attach_agent,
|
||||
capture_claude_session_state,
|
||||
@@ -110,6 +88,55 @@ from .start import (
|
||||
ApplyError = (EgressApplyError, PipelockApplyError, CapabilityApplyError)
|
||||
|
||||
|
||||
# --- Discovery -------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class QueuedProposal:
|
||||
"""A pending proposal plus the queue dir it was found in."""
|
||||
|
||||
proposal: Proposal
|
||||
queue_dir: Path
|
||||
|
||||
|
||||
def discover_active_agents() -> list[ActiveAgent]:
|
||||
"""All currently-running agents across every backend with
|
||||
their metadata + service set. Returns [] when neither
|
||||
backend is reachable. Backed by the shared
|
||||
`enumerate_active_agents` helper so the CLI's
|
||||
`./cli.py list active` and this dashboard show the same data."""
|
||||
return enumerate_active_agents()
|
||||
|
||||
|
||||
|
||||
|
||||
def _approval_status(qp: QueuedProposal, verb: str) -> str:
|
||||
"""Status-line text after a successful approval. For capability-
|
||||
block, append the `resume <identity>` hint so the operator can
|
||||
bring the rebuilt bottle back up with one copy-paste."""
|
||||
base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
|
||||
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
|
||||
return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
|
||||
return base
|
||||
|
||||
|
||||
def discover_pending() -> list[QueuedProposal]:
|
||||
"""Walk ~/.bot-bottle/queue/* and collect pending proposals
|
||||
from every bottle's queue. Sorted by arrival time across the
|
||||
union — the operator works the global FIFO."""
|
||||
queue_root = _supervise.bot_bottle_root() / "queue"
|
||||
if not queue_root.is_dir():
|
||||
return []
|
||||
out: list[QueuedProposal] = []
|
||||
for slug_dir in sorted(queue_root.iterdir()):
|
||||
if not slug_dir.is_dir():
|
||||
continue
|
||||
for proposal in list_pending_proposals(slug_dir):
|
||||
out.append(QueuedProposal(proposal=proposal, queue_dir=slug_dir))
|
||||
out.sort(key=lambda q: q.proposal.arrival_timestamp)
|
||||
return out
|
||||
|
||||
|
||||
# --- Operator actions ------------------------------------------------------
|
||||
|
||||
|
||||
@@ -332,6 +359,15 @@ def edit_in_editor(content: str, *, suffix: str = ".tmp") -> str | None:
|
||||
# loop's `bottles` dict; chunks 3/4 wire Enter / `x` to act on it.
|
||||
|
||||
|
||||
def _filter_agents(query: str, names: list[str]) -> list[str]:
|
||||
"""Case-insensitive substring filter for the picker. Pure
|
||||
function — no curses, easy to unit-test."""
|
||||
if not query:
|
||||
return list(names)
|
||||
q = query.lower()
|
||||
return [n for n in names if q in n.lower()]
|
||||
|
||||
|
||||
def _picker_modal(
|
||||
stdscr: "curses._CursesWindow",
|
||||
names: list[str],
|
||||
@@ -591,6 +627,63 @@ def _capture_preflight_text(plan) -> str:
|
||||
return buf.getvalue().strip("\n")
|
||||
|
||||
|
||||
def _running_counts(
|
||||
bottles: dict, agents_now: list[ActiveAgent],
|
||||
) -> dict[str, int]:
|
||||
"""Per-agent running count: dashboard-owned + externally-
|
||||
discovered, summed by agent_name. The picker shows this so the
|
||||
operator knows whether picking an agent starts a fresh bottle
|
||||
or a Nth one."""
|
||||
counts: dict[str, int] = {}
|
||||
for a in agents_now:
|
||||
counts[a.agent_name] = counts.get(a.agent_name, 0) + 1
|
||||
return counts
|
||||
|
||||
|
||||
def _bottle_for_slug(
|
||||
slug: str,
|
||||
bottles: dict,
|
||||
manifest: Manifest | None,
|
||||
) -> tuple["object", str]:
|
||||
"""Return `(bottle_handle, prompt_path_hint)` for a re-attach.
|
||||
If the slug is in `bottles` (dashboard-owned), return the stored
|
||||
handle directly. Otherwise synthesize a bottle from the persisted
|
||||
metadata. The backend field in metadata (PRD 0040) selects Docker
|
||||
or smolmachines; unknown or missing metadata defaults to Docker.
|
||||
|
||||
Returns the empty string for prompt_path_hint when we omit the
|
||||
flag — the caller passes None to DockerBottle in that case."""
|
||||
from ..backend.docker.bottle import DockerBottle
|
||||
from ..backend.docker.bottle_state import read_metadata
|
||||
from ..backend.smolmachines.bottle import SmolmachinesBottle
|
||||
if slug in bottles:
|
||||
_cm, bottle, _identity = bottles[slug]
|
||||
return bottle, ""
|
||||
instance_name = f"bot-bottle-{slug}"
|
||||
prompt_path: str | None = None
|
||||
metadata = read_metadata(slug)
|
||||
if metadata is not None and manifest is not None:
|
||||
agent = manifest.agents.get(metadata.agent_name)
|
||||
if agent is not None and agent.prompt:
|
||||
container_home = os.environ.get(
|
||||
"BOT_BOTTLE_CONTAINER_HOME", "/home/node",
|
||||
)
|
||||
prompt_path = f"{container_home}/.bot-bottle-prompt.txt"
|
||||
backend = metadata.backend if metadata is not None else ""
|
||||
if backend == "smolmachines":
|
||||
synth: object = SmolmachinesBottle(
|
||||
instance_name,
|
||||
prompt_path=prompt_path,
|
||||
)
|
||||
else:
|
||||
synth = DockerBottle(
|
||||
container=instance_name,
|
||||
teardown=lambda: None,
|
||||
prompt_path_in_container=prompt_path,
|
||||
)
|
||||
return synth, (prompt_path or "")
|
||||
|
||||
|
||||
def _stop_bottle_flow(
|
||||
stdscr: "curses._CursesWindow",
|
||||
bottles: dict,
|
||||
@@ -677,6 +770,100 @@ def _stop_bottle_flow(
|
||||
# reused across attaches.
|
||||
|
||||
|
||||
def _in_tmux() -> bool:
|
||||
"""True when the dashboard is running inside a tmux session.
|
||||
Tmux sets `$TMUX` to the path of its server socket."""
|
||||
return bool(os.environ.get("TMUX"))
|
||||
|
||||
|
||||
def _agent_runtime_args(
|
||||
*, resume: bool, remote_control: bool = False, agent_provider_template: str = "claude",
|
||||
) -> list[str]:
|
||||
"""The argv the dashboard hands to `bottle.agent_argv`
|
||||
on every attach — matches what `attach_agent` builds for the
|
||||
foreground handoff so both surfaces produce the same claude
|
||||
invocation."""
|
||||
runtime = runtime_for(agent_provider_template)
|
||||
args = list(runtime.bypass_args)
|
||||
if remote_control:
|
||||
args.extend(runtime.remote_control_args)
|
||||
if resume:
|
||||
args.extend(runtime.resume_args)
|
||||
return args
|
||||
|
||||
|
||||
def _build_resume_argv_with_fallback(
|
||||
bottle, *, remote_control: bool = False, agent_provider_template: str = "claude",
|
||||
) -> list[str]:
|
||||
"""Build a backend-exec argv that runs `claude --continue` and
|
||||
falls back to plain `claude` if no prior session exists.
|
||||
|
||||
`--continue` exits non-zero when an agent has been spun up
|
||||
but never typed at — there's no transcript to resume. The
|
||||
shell-level `||` wrapper makes that case start a fresh
|
||||
session instead of crashing the pane. The trade-off: we
|
||||
invoke `sh -c` inside the bottle, so the command is two
|
||||
`claude` invocations behind a tiny shell rather than one
|
||||
direct exec. Acceptable; the shell adds microseconds and
|
||||
the fallback only kicks in when --continue would have
|
||||
failed anyway.
|
||||
|
||||
Works across backends because `bottle.agent_argv` always
|
||||
surfaces the `claude` token preceded by the backend's exec
|
||||
framing (docker: `docker exec -it <c>`; smolmachines:
|
||||
`smolvm machine exec --name <m> -- runuser -u node --`).
|
||||
Splitting at `claude` keeps the framing as the prefix and
|
||||
wraps just the agent tail in `sh -c`."""
|
||||
if agent_provider_template != "claude":
|
||||
return bottle.agent_argv(
|
||||
_agent_runtime_args(
|
||||
resume=True,
|
||||
remote_control=remote_control,
|
||||
agent_provider_template=agent_provider_template,
|
||||
)
|
||||
)
|
||||
base_args = _agent_runtime_args(
|
||||
resume=False,
|
||||
remote_control=remote_control,
|
||||
agent_provider_template=agent_provider_template,
|
||||
)
|
||||
base_exec = bottle.agent_argv(base_args)
|
||||
# Split exec-framing prefix from the agent-and-args tail so
|
||||
# we can compose `<claude…> --continue || <claude…>` inside
|
||||
# `sh -c`. The provider command token is the marker.
|
||||
command = getattr(bottle, "agent_command", runtime_for(agent_provider_template).command)
|
||||
agent_idx = base_exec.index(command)
|
||||
prefix = base_exec[:agent_idx]
|
||||
agent_cmd = " ".join(shlex.quote(a) for a in base_exec[agent_idx:])
|
||||
resume_args = " ".join(
|
||||
shlex.quote(a) for a in runtime_for(agent_provider_template).resume_args
|
||||
)
|
||||
return [
|
||||
*prefix,
|
||||
"sh", "-c",
|
||||
f"{agent_cmd} {resume_args} || {agent_cmd}",
|
||||
]
|
||||
|
||||
|
||||
def _build_split_pane_argv(agent_argv: list[str]) -> list[str]:
|
||||
"""Pure helper: wrap a backend-exec argv with `tmux split-window
|
||||
-h -P -F '#{pane_id}'`. The `-P -F` combo tells tmux to print
|
||||
the new pane's id on stdout so we can track it for later
|
||||
`respawn-pane` calls."""
|
||||
return [
|
||||
"tmux", "split-window", "-h",
|
||||
"-P", "-F", "#{pane_id}",
|
||||
*agent_argv,
|
||||
]
|
||||
|
||||
|
||||
def _build_respawn_pane_argv(pane_id: str, agent_argv: list[str]) -> list[str]:
|
||||
"""Pure helper: wrap a backend-exec argv with `tmux respawn-pane
|
||||
-k -t <pane_id>`. `-k` kills the existing process in the pane
|
||||
before respawning."""
|
||||
return ["tmux", "respawn-pane", "-k", "-t", pane_id, *agent_argv]
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _redirect_stderr_to_file(path):
|
||||
"""Redirect file descriptor 2 (stderr) to `path` for the
|
||||
@@ -793,6 +980,24 @@ def _tmux_close_right_pane(tmux_state: dict) -> None:
|
||||
tmux_state["slug"] = None
|
||||
|
||||
|
||||
def _pick_next_after_stop(
|
||||
agents_before: list[ActiveAgent],
|
||||
selected_index: int,
|
||||
stopped_slug: str,
|
||||
) -> tuple[int, ActiveAgent] | None:
|
||||
"""After stopping `stopped_slug` from the agents list, choose
|
||||
the agent that should take focus next. The agent below the
|
||||
stopped row (which slides up to fill its index) is the
|
||||
natural pick; if the stopped agent was last, the row above
|
||||
instead. Returns (new_index, agent) or None if no agents
|
||||
remain. Pure — easy to unit-test."""
|
||||
new_agents = [a for a in agents_before if a.slug != stopped_slug]
|
||||
if not new_agents:
|
||||
return None
|
||||
new_index = min(max(selected_index, 0), len(new_agents) - 1)
|
||||
return new_index, new_agents[new_index]
|
||||
|
||||
|
||||
def _ensure_right_pane(tmux_state: dict, argv: list[str]) -> str | None:
|
||||
"""Run `argv` in the dashboard's right pane — respawn an
|
||||
existing tracked pane if one is alive, split-window to
|
||||
@@ -1150,6 +1355,31 @@ def _list_once() -> int:
|
||||
return 0
|
||||
|
||||
|
||||
_REFRESH_INTERVAL_MS = 1000
|
||||
|
||||
# How long a newly-arrived proposal stays highlighted (green) in the
|
||||
# list. Long enough for the operator to notice in their peripheral
|
||||
# vision, short enough to fade before the queue feels permanently
|
||||
# noisy.
|
||||
_NEW_PROPOSAL_HIGHLIGHT_SEC = 5.0
|
||||
|
||||
|
||||
def _is_recent(
|
||||
proposal_id: str,
|
||||
first_seen: dict[str, float] | None,
|
||||
now: float | None,
|
||||
) -> bool:
|
||||
"""True if `proposal_id` was first seen within the highlight
|
||||
window. Both `first_seen` and `now` may be None (rendered as
|
||||
not-recent) so the helper is safe in cold-start paths."""
|
||||
if first_seen is None or now is None:
|
||||
return False
|
||||
started = first_seen.get(proposal_id)
|
||||
if started is None:
|
||||
return False
|
||||
return (now - started) < _NEW_PROPOSAL_HIGHLIGHT_SEC
|
||||
|
||||
|
||||
def _try_init_green() -> int:
|
||||
"""Initialise a green color pair and return its attr, or 0 if the
|
||||
terminal doesn't support color. Caller ORs the returned value
|
||||
@@ -1187,6 +1417,14 @@ def _quit_without_teardown(bottles: dict) -> None:
|
||||
os._exit(0)
|
||||
|
||||
|
||||
# PRD 0019 chunk 3: which pane the j/k/arrow keys move through.
|
||||
# Tab toggles. The proposals pane is the default focus — proposal
|
||||
# action keys (a/m/r/Enter) require it; agent-scoped keys (e/p,
|
||||
# chunk 4) require the agents pane.
|
||||
PANE_PROPOSALS = "proposals"
|
||||
PANE_AGENTS = "agents"
|
||||
|
||||
|
||||
def _main_loop(stdscr: "curses._CursesWindow") -> None:
|
||||
curses.curs_set(0)
|
||||
# Auto-refresh: getch() returns -1 after the timeout if no key
|
||||
@@ -1573,6 +1811,63 @@ def _render(
|
||||
stdscr.refresh()
|
||||
|
||||
|
||||
def _selection_status(
|
||||
focus: str, agents: list[ActiveAgent], selected_agent: int,
|
||||
) -> str:
|
||||
"""Status-line text for the idle state. Surfaces the agents-
|
||||
pane selection so the operator can tell what an agent-scoped
|
||||
edit verb would target."""
|
||||
if focus != PANE_AGENTS:
|
||||
return ""
|
||||
if not agents:
|
||||
return "[no active agents]"
|
||||
if 0 <= selected_agent < len(agents):
|
||||
return f"[selected: {agents[selected_agent].slug}]"
|
||||
return "[no agent selected]"
|
||||
|
||||
|
||||
def _selected_agent(
|
||||
focus: str, agents: list[ActiveAgent], selected_agent: int,
|
||||
) -> ActiveAgent | None:
|
||||
"""The selected agent to scope `e` / `p` to, or None if no
|
||||
selection is valid (proposals pane focused, no active agents,
|
||||
or selection out of bounds)."""
|
||||
if focus != PANE_AGENTS:
|
||||
return None
|
||||
if not agents:
|
||||
return None
|
||||
if 0 <= selected_agent < len(agents):
|
||||
return agents[selected_agent]
|
||||
return None
|
||||
|
||||
|
||||
def _format_agent_row(a: ActiveAgent, maxw: int) -> str:
|
||||
"""One-line agent row: ` [<backend>] <slug> <agent_name> started
|
||||
<HH:MM:SS> [<sidecars>]`. The `agent` service is filtered out of
|
||||
the displayed list — it's always present for an active bottle,
|
||||
so listing it carries no information; the sidecars are the
|
||||
differentiator.
|
||||
|
||||
The `[docker]` / `[smolmachines]` prefix lets the operator tell
|
||||
which backend a bottle came from (issue #77). Truncated to
|
||||
`maxw` because the renderer's addnstr only enforces width if
|
||||
we hand it a properly-sized string."""
|
||||
started = (
|
||||
a.started_at.split("T", 1)[1][:8]
|
||||
if "T" in a.started_at else (a.started_at or "?")
|
||||
)
|
||||
sidecars = tuple(s for s in a.services if s != "agent")
|
||||
services = ",".join(sidecars) if sidecars else "(starting)"
|
||||
backend_tag = f"[{a.backend_name}]" if a.backend_name else ""
|
||||
line = (
|
||||
f" {backend_tag} {a.slug} {a.agent_name} "
|
||||
f"started {started} [{services}]"
|
||||
)
|
||||
if len(line) > maxw:
|
||||
return line[: max(0, maxw - 1)] + "…"
|
||||
return line
|
||||
|
||||
|
||||
def _detail_view(
|
||||
stdscr: "curses._CursesWindow",
|
||||
qp: QueuedProposal,
|
||||
@@ -1626,6 +1921,66 @@ def _detail_view(
|
||||
return
|
||||
|
||||
|
||||
def _detail_lines(
|
||||
qp: QueuedProposal,
|
||||
*,
|
||||
green_attr: int = 0,
|
||||
) -> list[tuple[str, int]]:
|
||||
"""Return the detail-view body as (text, curses-attr) tuples.
|
||||
Most lines are plain (attr=0); pipelock-block proposals append
|
||||
a green "→ would allow host: ..." line so the operator sees at
|
||||
a glance which hostname will land in pipelock's allowlist if
|
||||
they hit approve. The URL itself is shown above for context."""
|
||||
p = qp.proposal
|
||||
out: list[tuple[str, int]] = [
|
||||
(f"bottle: {p.bottle_slug}", 0),
|
||||
(f"tool: {p.tool}", 0),
|
||||
(f"id: {p.id}", 0),
|
||||
(f"arrived: {p.arrival_timestamp}", 0),
|
||||
(f"queue: {qp.queue_dir}", 0),
|
||||
("", 0),
|
||||
("justification:", 0),
|
||||
]
|
||||
out.extend((" " + line, 0) for line in p.justification.splitlines() or [""])
|
||||
out.extend([
|
||||
("", 0),
|
||||
(_proposed_payload_label(p.tool) + ":", 0),
|
||||
])
|
||||
out.extend((line, 0) for line in p.proposed_file.splitlines() or [""])
|
||||
if p.tool == TOOL_PIPELOCK_BLOCK:
|
||||
host = _failed_url_host(p.proposed_file)
|
||||
if host:
|
||||
# Show the literal line that will be appended to the
|
||||
# bottle's pipelock allowlist on approve. Green so it
|
||||
# reads as "what changes"; the URL above carries the
|
||||
# path context (which pipelock can't enforce — see the
|
||||
# follow-up note on _apply_pipelock_url).
|
||||
out.append(("", 0))
|
||||
out.append((host, green_attr))
|
||||
return out
|
||||
|
||||
|
||||
def _failed_url_host(url: str) -> str:
|
||||
"""Best-effort hostname extraction from a pipelock-block proposal's
|
||||
failed_url payload. Returns empty string on unparseable input —
|
||||
callers handle empty as "nothing to highlight"."""
|
||||
import urllib.parse
|
||||
try:
|
||||
return urllib.parse.urlsplit(url.strip()).hostname or ""
|
||||
except ValueError:
|
||||
return ""
|
||||
|
||||
|
||||
def _proposed_payload_label(tool: str) -> str:
|
||||
"""The detail-view section heading for the proposal's payload —
|
||||
`proposed_file` is what the dataclass calls it, but for
|
||||
pipelock-block the payload is a single URL not a file. Render
|
||||
the label per tool so the operator's eye matches."""
|
||||
if tool == TOOL_PIPELOCK_BLOCK:
|
||||
return "failed URL"
|
||||
return "proposed file"
|
||||
|
||||
|
||||
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
|
||||
"""Suspend curses, open $EDITOR on the proposed file, return the
|
||||
edited content (or None if unchanged)."""
|
||||
@@ -1638,6 +1993,13 @@ def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
|
||||
return edited
|
||||
|
||||
|
||||
def _suffix_for_tool(tool: str) -> str:
|
||||
if tool == TOOL_CAPABILITY_BLOCK:
|
||||
return ".dockerfile"
|
||||
# egress-block / pipelock-block: JSON-ish + plain.
|
||||
return ".txt"
|
||||
|
||||
|
||||
def _operator_edit_routes_flow(
|
||||
stdscr: "curses._CursesWindow", agent: ActiveAgent,
|
||||
) -> str:
|
||||
|
||||
@@ -1,421 +0,0 @@
|
||||
"""dashboard_model: state/model layer for the dashboard TUI.
|
||||
|
||||
Data structures, discovery queries, pure state helpers, and derived
|
||||
values extracted from dashboard.py so they can be tested in isolation
|
||||
and navigated without wading through curses rendering code.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import shlex
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from .. import supervise as _supervise
|
||||
from ..agent_provider import runtime_for
|
||||
from ..backend import ActiveAgent, enumerate_active_agents
|
||||
from ..backend.docker.capability_apply import CapabilityApplyError
|
||||
from ..backend.docker.egress_apply import EgressApplyError
|
||||
from ..backend.docker.pipelock_apply import PipelockApplyError
|
||||
from ..manifest import Manifest
|
||||
from ..supervise import (
|
||||
TOOL_CAPABILITY_BLOCK,
|
||||
TOOL_PIPELOCK_BLOCK,
|
||||
Proposal,
|
||||
list_pending_proposals,
|
||||
)
|
||||
|
||||
|
||||
# --- Constants ---------------------------------------------------------------
|
||||
|
||||
|
||||
_REFRESH_INTERVAL_MS = 1000
|
||||
|
||||
_NEW_PROPOSAL_HIGHLIGHT_SEC = 5.0
|
||||
|
||||
PANE_PROPOSALS = "proposals"
|
||||
PANE_AGENTS = "agents"
|
||||
|
||||
|
||||
# --- Data structures ---------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class QueuedProposal:
|
||||
"""A pending proposal plus the queue dir it was found in."""
|
||||
|
||||
proposal: Proposal
|
||||
queue_dir: Path
|
||||
|
||||
|
||||
ApplyError = (EgressApplyError, PipelockApplyError, CapabilityApplyError)
|
||||
|
||||
|
||||
# --- Discovery ---------------------------------------------------------------
|
||||
|
||||
|
||||
def discover_active_agents() -> list[ActiveAgent]:
|
||||
"""All currently-running agents across every backend with
|
||||
their metadata + service set. Returns [] when neither
|
||||
backend is reachable. Backed by the shared
|
||||
`enumerate_active_agents` helper so the CLI's
|
||||
`./cli.py list active` and this dashboard show the same data."""
|
||||
return enumerate_active_agents()
|
||||
|
||||
|
||||
def discover_pending() -> list[QueuedProposal]:
|
||||
"""Walk ~/.bot-bottle/queue/* and collect pending proposals
|
||||
from every bottle's queue. Sorted by arrival time across the
|
||||
union — the operator works the global FIFO."""
|
||||
queue_root = _supervise.bot_bottle_root() / "queue"
|
||||
if not queue_root.is_dir():
|
||||
return []
|
||||
out: list[QueuedProposal] = []
|
||||
for slug_dir in sorted(queue_root.iterdir()):
|
||||
if not slug_dir.is_dir():
|
||||
continue
|
||||
for proposal in list_pending_proposals(slug_dir):
|
||||
out.append(QueuedProposal(proposal=proposal, queue_dir=slug_dir))
|
||||
out.sort(key=lambda q: q.proposal.arrival_timestamp)
|
||||
return out
|
||||
|
||||
|
||||
# --- Derived values ----------------------------------------------------------
|
||||
|
||||
|
||||
def _approval_status(qp: QueuedProposal, verb: str) -> str:
|
||||
"""Status-line text after a successful approval. For capability-
|
||||
block, append the `resume <identity>` hint so the operator can
|
||||
bring the rebuilt bottle back up with one copy-paste."""
|
||||
base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
|
||||
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
|
||||
return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
|
||||
return base
|
||||
|
||||
|
||||
def _is_recent(
|
||||
proposal_id: str,
|
||||
first_seen: dict[str, float] | None,
|
||||
now: float | None,
|
||||
) -> bool:
|
||||
"""True if `proposal_id` was first seen within the highlight
|
||||
window. Both `first_seen` and `now` may be None (rendered as
|
||||
not-recent) so the helper is safe in cold-start paths."""
|
||||
if first_seen is None or now is None:
|
||||
return False
|
||||
started = first_seen.get(proposal_id)
|
||||
if started is None:
|
||||
return False
|
||||
return (now - started) < _NEW_PROPOSAL_HIGHLIGHT_SEC
|
||||
|
||||
|
||||
def _selection_status(
|
||||
focus: str, agents: list[ActiveAgent], selected_agent: int,
|
||||
) -> str:
|
||||
"""Status-line text for the idle state. Surfaces the agents-
|
||||
pane selection so the operator can tell what an agent-scoped
|
||||
edit verb would target."""
|
||||
if focus != PANE_AGENTS:
|
||||
return ""
|
||||
if not agents:
|
||||
return "[no active agents]"
|
||||
if 0 <= selected_agent < len(agents):
|
||||
return f"[selected: {agents[selected_agent].slug}]"
|
||||
return "[no agent selected]"
|
||||
|
||||
|
||||
def _selected_agent(
|
||||
focus: str, agents: list[ActiveAgent], selected_agent: int,
|
||||
) -> ActiveAgent | None:
|
||||
"""The selected agent to scope `e` / `p` to, or None if no
|
||||
selection is valid (proposals pane focused, no active agents,
|
||||
or selection out of bounds)."""
|
||||
if focus != PANE_AGENTS:
|
||||
return None
|
||||
if not agents:
|
||||
return None
|
||||
if 0 <= selected_agent < len(agents):
|
||||
return agents[selected_agent]
|
||||
return None
|
||||
|
||||
|
||||
# --- Picker helpers ----------------------------------------------------------
|
||||
|
||||
|
||||
def _filter_agents(query: str, names: list[str]) -> list[str]:
|
||||
"""Case-insensitive substring filter for the picker. Pure
|
||||
function — no curses, easy to unit-test."""
|
||||
if not query:
|
||||
return list(names)
|
||||
q = query.lower()
|
||||
return [n for n in names if q in n.lower()]
|
||||
|
||||
|
||||
def _running_counts(
|
||||
bottles: dict, agents_now: list[ActiveAgent],
|
||||
) -> dict[str, int]:
|
||||
"""Per-agent running count: dashboard-owned + externally-
|
||||
discovered, summed by agent_name. The picker shows this so the
|
||||
operator knows whether picking an agent starts a fresh bottle
|
||||
or a Nth one."""
|
||||
counts: dict[str, int] = {}
|
||||
for a in agents_now:
|
||||
counts[a.agent_name] = counts.get(a.agent_name, 0) + 1
|
||||
return counts
|
||||
|
||||
|
||||
# --- Agent-row rendering helpers ---------------------------------------------
|
||||
|
||||
|
||||
def _format_agent_row(a: ActiveAgent, maxw: int) -> str:
|
||||
"""One-line agent row: ` [<backend>] <slug> <agent_name> started
|
||||
<HH:MM:SS> [<sidecars>]`. The `agent` service is filtered out of
|
||||
the displayed list — it's always present for an active bottle,
|
||||
so listing it carries no information; the sidecars are the
|
||||
differentiator.
|
||||
|
||||
The `[docker]` / `[smolmachines]` prefix lets the operator tell
|
||||
which backend a bottle came from (issue #77). Truncated to
|
||||
`maxw` because the renderer's addnstr only enforces width if
|
||||
we hand it a properly-sized string."""
|
||||
started = (
|
||||
a.started_at.split("T", 1)[1][:8]
|
||||
if "T" in a.started_at else (a.started_at or "?")
|
||||
)
|
||||
sidecars = tuple(s for s in a.services if s != "agent")
|
||||
services = ",".join(sidecars) if sidecars else "(starting)"
|
||||
backend_tag = f"[{a.backend_name}]" if a.backend_name else ""
|
||||
line = (
|
||||
f" {backend_tag} {a.slug} {a.agent_name} "
|
||||
f"started {started} [{services}]"
|
||||
)
|
||||
if len(line) > maxw:
|
||||
return line[: max(0, maxw - 1)] + "…"
|
||||
return line
|
||||
|
||||
|
||||
# --- Detail-view helpers -----------------------------------------------------
|
||||
|
||||
|
||||
def _detail_lines(
|
||||
qp: QueuedProposal,
|
||||
*,
|
||||
green_attr: int = 0,
|
||||
) -> list[tuple[str, int]]:
|
||||
"""Return the detail-view body as (text, curses-attr) tuples.
|
||||
Most lines are plain (attr=0); pipelock-block proposals append
|
||||
a green "→ would allow host: ..." line so the operator sees at
|
||||
a glance which hostname will land in pipelock's allowlist if
|
||||
they hit approve. The URL itself is shown above for context."""
|
||||
p = qp.proposal
|
||||
out: list[tuple[str, int]] = [
|
||||
(f"bottle: {p.bottle_slug}", 0),
|
||||
(f"tool: {p.tool}", 0),
|
||||
(f"id: {p.id}", 0),
|
||||
(f"arrived: {p.arrival_timestamp}", 0),
|
||||
(f"queue: {qp.queue_dir}", 0),
|
||||
("", 0),
|
||||
("justification:", 0),
|
||||
]
|
||||
out.extend((" " + line, 0) for line in p.justification.splitlines() or [""])
|
||||
out.extend([
|
||||
("", 0),
|
||||
(_proposed_payload_label(p.tool) + ":", 0),
|
||||
])
|
||||
out.extend((line, 0) for line in p.proposed_file.splitlines() or [""])
|
||||
if p.tool == TOOL_PIPELOCK_BLOCK:
|
||||
host = _failed_url_host(p.proposed_file)
|
||||
if host:
|
||||
out.append(("", 0))
|
||||
out.append((host, green_attr))
|
||||
return out
|
||||
|
||||
|
||||
def _failed_url_host(url: str) -> str:
|
||||
"""Best-effort hostname extraction from a pipelock-block proposal's
|
||||
failed_url payload. Returns empty string on unparseable input —
|
||||
callers handle empty as "nothing to highlight"."""
|
||||
import urllib.parse
|
||||
try:
|
||||
return urllib.parse.urlsplit(url.strip()).hostname or ""
|
||||
except ValueError:
|
||||
return ""
|
||||
|
||||
|
||||
def _proposed_payload_label(tool: str) -> str:
|
||||
"""The detail-view section heading for the proposal's payload —
|
||||
`proposed_file` is what the dataclass calls it, but for
|
||||
pipelock-block the payload is a single URL not a file. Render
|
||||
the label per tool so the operator's eye matches."""
|
||||
if tool == TOOL_PIPELOCK_BLOCK:
|
||||
return "failed URL"
|
||||
return "proposed file"
|
||||
|
||||
|
||||
def _suffix_for_tool(tool: str) -> str:
|
||||
if tool == TOOL_CAPABILITY_BLOCK:
|
||||
return ".dockerfile"
|
||||
return ".txt"
|
||||
|
||||
|
||||
# --- Bottle/agent resolution -------------------------------------------------
|
||||
|
||||
|
||||
def _bottle_for_slug(
|
||||
slug: str,
|
||||
bottles: dict,
|
||||
manifest: Manifest | None,
|
||||
) -> tuple["object", str]:
|
||||
"""Return `(bottle_handle, prompt_path_hint)` for a re-attach.
|
||||
If the slug is in `bottles` (dashboard-owned), return the stored
|
||||
handle directly. Otherwise synthesize a bottle from the persisted
|
||||
metadata. The backend field in metadata (PRD 0040) selects Docker
|
||||
or smolmachines; unknown or missing metadata defaults to Docker.
|
||||
|
||||
Returns the empty string for prompt_path_hint when we omit the
|
||||
flag — the caller passes None to DockerBottle in that case."""
|
||||
from ..backend.docker.bottle import DockerBottle
|
||||
from ..backend.docker.bottle_state import read_metadata
|
||||
from ..backend.smolmachines.bottle import SmolmachinesBottle
|
||||
if slug in bottles:
|
||||
_cm, bottle, _identity = bottles[slug]
|
||||
return bottle, ""
|
||||
instance_name = f"bot-bottle-{slug}"
|
||||
prompt_path: str | None = None
|
||||
metadata = read_metadata(slug)
|
||||
if metadata is not None and manifest is not None:
|
||||
agent = manifest.agents.get(metadata.agent_name)
|
||||
if agent is not None and agent.prompt:
|
||||
container_home = os.environ.get(
|
||||
"BOT_BOTTLE_CONTAINER_HOME", "/home/node",
|
||||
)
|
||||
prompt_path = f"{container_home}/.bot-bottle-prompt.txt"
|
||||
backend = metadata.backend if metadata is not None else ""
|
||||
if backend == "smolmachines":
|
||||
synth: object = SmolmachinesBottle(
|
||||
instance_name,
|
||||
prompt_path=prompt_path,
|
||||
)
|
||||
else:
|
||||
synth = DockerBottle(
|
||||
container=instance_name,
|
||||
teardown=lambda: None,
|
||||
prompt_path_in_container=prompt_path,
|
||||
)
|
||||
return synth, (prompt_path or "")
|
||||
|
||||
|
||||
def _pick_next_after_stop(
|
||||
agents_before: list[ActiveAgent],
|
||||
selected_index: int,
|
||||
stopped_slug: str,
|
||||
) -> tuple[int, ActiveAgent] | None:
|
||||
"""After stopping `stopped_slug` from the agents list, choose
|
||||
the agent that should take focus next. The agent below the
|
||||
stopped row (which slides up to fill its index) is the
|
||||
natural pick; if the stopped agent was last, the row above
|
||||
instead. Returns (new_index, agent) or None if no agents
|
||||
remain. Pure — easy to unit-test."""
|
||||
new_agents = [a for a in agents_before if a.slug != stopped_slug]
|
||||
if not new_agents:
|
||||
return None
|
||||
new_index = min(max(selected_index, 0), len(new_agents) - 1)
|
||||
return new_index, new_agents[new_index]
|
||||
|
||||
|
||||
# --- tmux argv builders ------------------------------------------------------
|
||||
|
||||
|
||||
def _in_tmux() -> bool:
|
||||
"""True when the dashboard is running inside a tmux session.
|
||||
Tmux sets `$TMUX` to the path of its server socket."""
|
||||
return bool(os.environ.get("TMUX"))
|
||||
|
||||
|
||||
def _agent_runtime_args(
|
||||
*, resume: bool, remote_control: bool = False, agent_provider_template: str = "claude",
|
||||
) -> list[str]:
|
||||
"""The argv the dashboard hands to `bottle.agent_argv`
|
||||
on every attach — matches what `attach_agent` builds for the
|
||||
foreground handoff so both surfaces produce the same claude
|
||||
invocation."""
|
||||
runtime = runtime_for(agent_provider_template)
|
||||
args = list(runtime.bypass_args)
|
||||
if remote_control:
|
||||
args.extend(runtime.remote_control_args)
|
||||
if resume:
|
||||
args.extend(runtime.resume_args)
|
||||
return args
|
||||
|
||||
|
||||
def _build_resume_argv_with_fallback(
|
||||
bottle, *, remote_control: bool = False, agent_provider_template: str = "claude",
|
||||
) -> list[str]:
|
||||
"""Build a backend-exec argv that runs `claude --continue` and
|
||||
falls back to plain `claude` if no prior session exists.
|
||||
|
||||
`--continue` exits non-zero when an agent has been spun up
|
||||
but never typed at — there's no transcript to resume. The
|
||||
shell-level `||` wrapper makes that case start a fresh
|
||||
session instead of crashing the pane. The trade-off: we
|
||||
invoke `sh -c` inside the bottle, so the command is two
|
||||
`claude` invocations behind a tiny shell rather than one
|
||||
direct exec. Acceptable; the shell adds microseconds and
|
||||
the fallback only kicks in when --continue would have
|
||||
failed anyway.
|
||||
|
||||
Works across backends because `bottle.agent_argv` always
|
||||
surfaces the `claude` token preceded by the backend's exec
|
||||
framing (docker: `docker exec -it <c>`; smolmachines:
|
||||
`smolvm machine exec --name <m> -- runuser -u node --`).
|
||||
Splitting at `claude` keeps the framing as the prefix and
|
||||
wraps just the agent tail in `sh -c`."""
|
||||
if agent_provider_template != "claude":
|
||||
return bottle.agent_argv(
|
||||
_agent_runtime_args(
|
||||
resume=True,
|
||||
remote_control=remote_control,
|
||||
agent_provider_template=agent_provider_template,
|
||||
)
|
||||
)
|
||||
base_args = _agent_runtime_args(
|
||||
resume=False,
|
||||
remote_control=remote_control,
|
||||
agent_provider_template=agent_provider_template,
|
||||
)
|
||||
base_exec = bottle.agent_argv(base_args)
|
||||
# Split exec-framing prefix from the agent-and-args tail so
|
||||
# we can compose `<claude…> --continue || <claude…>` inside
|
||||
# `sh -c`. The provider command token is the marker.
|
||||
command = getattr(bottle, "agent_command", runtime_for(agent_provider_template).command)
|
||||
agent_idx = base_exec.index(command)
|
||||
prefix = base_exec[:agent_idx]
|
||||
agent_cmd = " ".join(shlex.quote(a) for a in base_exec[agent_idx:])
|
||||
resume_args = " ".join(
|
||||
shlex.quote(a) for a in runtime_for(agent_provider_template).resume_args
|
||||
)
|
||||
return [
|
||||
*prefix,
|
||||
"sh", "-c",
|
||||
f"{agent_cmd} {resume_args} || {agent_cmd}",
|
||||
]
|
||||
|
||||
|
||||
def _build_split_pane_argv(agent_argv: list[str]) -> list[str]:
|
||||
"""Pure helper: wrap a backend-exec argv with `tmux split-window
|
||||
-h -P -F '#{pane_id}'`. The `-P -F` combo tells tmux to print
|
||||
the new pane's id on stdout so we can track it for later
|
||||
`respawn-pane` calls."""
|
||||
return [
|
||||
"tmux", "split-window", "-h",
|
||||
"-P", "-F", "#{pane_id}",
|
||||
*agent_argv,
|
||||
]
|
||||
|
||||
|
||||
def _build_respawn_pane_argv(pane_id: str, agent_argv: list[str]) -> list[str]:
|
||||
"""Pure helper: wrap a backend-exec argv with `tmux respawn-pane
|
||||
-k -t <pane_id>`. `-k` kills the existing process in the pane
|
||||
before respawning."""
|
||||
return ["tmux", "respawn-pane", "-k", "-t", pane_id, *agent_argv]
|
||||
@@ -1,121 +0,0 @@
|
||||
"""Gitea deploy-key provisioner (PRD 0048, contrib).
|
||||
|
||||
Generates ed25519 keypairs via `ssh-keygen` and registers / deletes
|
||||
them using the Gitea deploy-key HTTP API. No new Python dependencies —
|
||||
only stdlib `urllib.request` and `subprocess`."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
import tempfile
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
from ...deploy_key_provisioner import DeployKeyProvisioner
|
||||
|
||||
|
||||
class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
|
||||
"""Manages deploy keys on a Gitea instance."""
|
||||
|
||||
def __init__(self, *, token: str, api_url: str) -> None:
|
||||
self._token = token
|
||||
self._api_url = api_url.rstrip("/")
|
||||
|
||||
def create(self, owner_repo: str, title: str) -> tuple[str, bytes]:
|
||||
"""Generate an ed25519 keypair, register the public half as a
|
||||
repo deploy key, and return `(key_id, private_key_bytes)`.
|
||||
|
||||
The key is registered with `read_only=False` because git-gate
|
||||
needs push access to forward gitleaks-scanned refs upstream."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
key_path = Path(tmpdir) / "key"
|
||||
subprocess.run(
|
||||
[
|
||||
"ssh-keygen", "-t", "ed25519",
|
||||
"-f", str(key_path),
|
||||
"-N", "",
|
||||
],
|
||||
check=True,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
)
|
||||
private_key = key_path.read_bytes()
|
||||
public_key = key_path.with_suffix(".pub").read_text().strip()
|
||||
|
||||
owner, repo = _split_owner_repo(owner_repo)
|
||||
url = f"{self._api_url}/api/v1/repos/{owner}/{repo}/keys"
|
||||
payload = json.dumps({
|
||||
"key": public_key,
|
||||
"read_only": False,
|
||||
"title": title,
|
||||
}).encode()
|
||||
req = urllib.request.Request(
|
||||
url,
|
||||
data=payload,
|
||||
headers={
|
||||
"Authorization": f"token {self._token}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
method="POST",
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req) as resp:
|
||||
body = json.loads(resp.read())
|
||||
except urllib.error.HTTPError as exc:
|
||||
_body = _read_error_body(exc)
|
||||
raise RuntimeError(
|
||||
f"failed to create deploy key for {owner_repo}: "
|
||||
f"HTTP {exc.code} — {_body}"
|
||||
) from exc
|
||||
except urllib.error.URLError as exc:
|
||||
raise RuntimeError(
|
||||
f"failed to create deploy key for {owner_repo}: {exc.reason}"
|
||||
) from exc
|
||||
|
||||
return str(body["id"]), private_key
|
||||
|
||||
def delete(self, owner_repo: str, key_id: str) -> None:
|
||||
"""Delete the deploy key. HTTP 404 (already gone) is success.
|
||||
All other errors raise RuntimeError so teardown halts."""
|
||||
owner, repo = _split_owner_repo(owner_repo)
|
||||
url = f"{self._api_url}/api/v1/repos/{owner}/{repo}/keys/{key_id}"
|
||||
req = urllib.request.Request(
|
||||
url,
|
||||
headers={"Authorization": f"token {self._token}"},
|
||||
method="DELETE",
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req):
|
||||
pass
|
||||
except urllib.error.HTTPError as exc:
|
||||
if exc.code == 404:
|
||||
return
|
||||
_body = _read_error_body(exc)
|
||||
raise RuntimeError(
|
||||
f"failed to delete deploy key {key_id} for {owner_repo}: "
|
||||
f"HTTP {exc.code} — {_body}"
|
||||
) from exc
|
||||
except urllib.error.URLError as exc:
|
||||
raise RuntimeError(
|
||||
f"failed to delete deploy key {key_id} for {owner_repo}: "
|
||||
f"{exc.reason}"
|
||||
) from exc
|
||||
|
||||
|
||||
def _split_owner_repo(owner_repo: str) -> tuple[str, str]:
|
||||
"""Split `'owner/repo'` into `('owner', 'repo')`."""
|
||||
parts = owner_repo.split("/", 1)
|
||||
if len(parts) != 2 or not all(parts):
|
||||
raise ValueError(
|
||||
f"expected 'owner/repo' format, got {owner_repo!r}"
|
||||
)
|
||||
return parts[0], parts[1]
|
||||
|
||||
|
||||
def _read_error_body(exc: urllib.error.HTTPError) -> str:
|
||||
try:
|
||||
return exc.read().decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
return ""
|
||||
@@ -1,52 +0,0 @@
|
||||
"""Deploy-key provisioner interface and factory (PRD 0048).
|
||||
|
||||
The core defines the abstract contract; concrete implementations live
|
||||
in `bot_bottle/contrib/<provider>/deploy_key_provisioner.py`. The
|
||||
factory `get_provisioner` imports contrib modules lazily so that a
|
||||
missing optional dependency in one provider doesn't break unrelated
|
||||
features."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class DeployKeyProvisioner(ABC):
|
||||
"""Manages a single deploy-key lifecycle on a remote forge."""
|
||||
|
||||
@abstractmethod
|
||||
def create(self, owner_repo: str, title: str) -> tuple[str, bytes]:
|
||||
"""Generate a keypair and register the public half as a
|
||||
deploy key on the forge.
|
||||
|
||||
`owner_repo` is the `<owner>/<repo>` path (no `.git` suffix).
|
||||
`title` is the human-readable label shown in the forge UI.
|
||||
|
||||
Returns `(key_id, private_key_bytes)` where `key_id` is opaque
|
||||
to the caller and is only ever passed back to `delete`."""
|
||||
|
||||
@abstractmethod
|
||||
def delete(self, owner_repo: str, key_id: str) -> None:
|
||||
"""Delete the registered deploy key.
|
||||
|
||||
Must not raise if the key is already absent (HTTP 404 is
|
||||
success). Must raise for all other failures so teardown halts."""
|
||||
|
||||
|
||||
def get_provisioner(
|
||||
provider: str, token: str, api_url: str
|
||||
) -> DeployKeyProvisioner:
|
||||
"""Instantiate the contrib provisioner for `provider`.
|
||||
|
||||
Raises `ManifestError` for unknown providers so the error surfaces
|
||||
at parse time rather than at runtime."""
|
||||
if provider == "gitea":
|
||||
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
|
||||
GiteaDeployKeyProvisioner,
|
||||
)
|
||||
return GiteaDeployKeyProvisioner(token=token, api_url=api_url)
|
||||
from .manifest_util import ManifestError
|
||||
raise ManifestError(
|
||||
f"unknown provisioned_key provider: {provider!r}; "
|
||||
f"available: gitea"
|
||||
)
|
||||
+1
-89
@@ -29,14 +29,11 @@ backend-specific and lives on concrete subclasses (see
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import dataclasses
|
||||
import os
|
||||
import shlex
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from .log import info
|
||||
from .manifest import Bottle, GitEntry
|
||||
|
||||
|
||||
@@ -360,80 +357,6 @@ exit 0
|
||||
"""
|
||||
|
||||
|
||||
def _provision_dynamic_key(
|
||||
entry: GitEntry,
|
||||
slug: str,
|
||||
stage_dir: Path,
|
||||
) -> str:
|
||||
"""Generate a fresh ed25519 keypair, register the public half with
|
||||
the forge, and persist the private key + key ID under `stage_dir`.
|
||||
|
||||
Returns the host-side path to the private key file so the caller
|
||||
can inject it into the GitGateUpstream as `identity_file`."""
|
||||
from .deploy_key_provisioner import get_provisioner
|
||||
pk = entry.ProvisionedKey
|
||||
assert pk is not None
|
||||
token = os.environ.get(pk.token_env)
|
||||
if token is None:
|
||||
raise RuntimeError(
|
||||
f"git-gate.repos[{entry.Name!r}] provisioned_key.token_env"
|
||||
f" = {pk.token_env!r}: env var is not set"
|
||||
)
|
||||
api_url = pk.api_url or f"https://{entry.UpstreamHost}"
|
||||
provisioner = get_provisioner(pk.provider, token, api_url)
|
||||
|
||||
owner_repo = entry.UpstreamPath
|
||||
if owner_repo.endswith(".git"):
|
||||
owner_repo = owner_repo[:-4]
|
||||
title = f"bot-bottle:{slug}:{entry.Name}"
|
||||
|
||||
info(f"provisioning deploy key for git-gate.repos[{entry.Name!r}]")
|
||||
key_id, private_key_bytes = provisioner.create(owner_repo, title)
|
||||
|
||||
key_file = stage_dir / f"{entry.Name}-key"
|
||||
key_file.write_bytes(private_key_bytes)
|
||||
key_file.chmod(0o600)
|
||||
|
||||
id_file = stage_dir / f"{entry.Name}-deploy-key-id"
|
||||
id_file.write_text(key_id)
|
||||
id_file.chmod(0o600)
|
||||
|
||||
info(f"provisioned deploy key {key_id} for git-gate.repos[{entry.Name!r}]")
|
||||
return str(key_file)
|
||||
|
||||
|
||||
def revoke_git_gate_provisioned_keys(bottle: Bottle, stage_dir: Path) -> None:
|
||||
"""Revoke all deploy keys provisioned for `bottle` during prepare.
|
||||
|
||||
Called at teardown after containers stop. Raises if any revocation
|
||||
fails — a stranded key is a security concern that the operator must
|
||||
address manually."""
|
||||
from .deploy_key_provisioner import get_provisioner
|
||||
for entry in bottle.git:
|
||||
if entry.ProvisionedKey is None:
|
||||
continue
|
||||
pk = entry.ProvisionedKey
|
||||
id_file = stage_dir / f"{entry.Name}-deploy-key-id"
|
||||
if not id_file.exists():
|
||||
continue
|
||||
key_id = id_file.read_text().strip()
|
||||
token = os.environ.get(pk.token_env)
|
||||
if token is None:
|
||||
raise RuntimeError(
|
||||
f"git-gate.repos[{entry.Name!r}] provisioned_key.token_env"
|
||||
f" = {pk.token_env!r}: env var is not set;"
|
||||
f" cannot revoke deploy key {key_id}"
|
||||
)
|
||||
api_url = pk.api_url or f"https://{entry.UpstreamHost}"
|
||||
provisioner = get_provisioner(pk.provider, token, api_url)
|
||||
owner_repo = entry.UpstreamPath
|
||||
if owner_repo.endswith(".git"):
|
||||
owner_repo = owner_repo[:-4]
|
||||
info(f"revoking deploy key {key_id} for git-gate.repos[{entry.Name!r}]")
|
||||
provisioner.delete(owner_repo, key_id)
|
||||
info(f"revoked deploy key {key_id} for git-gate.repos[{entry.Name!r}]")
|
||||
|
||||
|
||||
class GitGate(ABC):
|
||||
"""The per-agent git-gate. Encapsulates the host-side prepare
|
||||
(upstream lift + entrypoint/hook render); the sidecar's
|
||||
@@ -445,21 +368,10 @@ class GitGate(ABC):
|
||||
entrypoint, pre-receive hook, and access-hook scripts (mode
|
||||
600) under `stage_dir`. Pure host-side, no docker subprocess.
|
||||
|
||||
For `provisioned_key` entries, also generates and registers
|
||||
a fresh deploy key via the forge API and writes the private key
|
||||
+ key ID to `stage_dir`.
|
||||
|
||||
Returned plan is incomplete: the launch step must fill
|
||||
`internal_network` / `egress_network` via `dataclasses.replace`
|
||||
before passing the plan to `.start`."""
|
||||
upstreams_list = list(git_gate_upstreams_for_bottle(bottle))
|
||||
for i, entry in enumerate(bottle.git):
|
||||
if entry.ProvisionedKey is not None:
|
||||
key_file = _provision_dynamic_key(entry, slug, stage_dir)
|
||||
upstreams_list[i] = dataclasses.replace(
|
||||
upstreams_list[i], identity_file=key_file
|
||||
)
|
||||
upstreams = tuple(upstreams_list)
|
||||
upstreams = git_gate_upstreams_for_bottle(bottle)
|
||||
entrypoint = stage_dir / "git_gate_entrypoint.sh"
|
||||
entrypoint.write_text(git_gate_render_entrypoint(upstreams))
|
||||
entrypoint.chmod(0o600)
|
||||
|
||||
+11
-90
@@ -4,7 +4,6 @@ from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from .manifest_util import ManifestError, as_json_object
|
||||
|
||||
@@ -62,24 +61,6 @@ def validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> No
|
||||
seen[g.Name] = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ProvisionedKeyConfig:
|
||||
"""Configuration for automatic deploy-key lifecycle management
|
||||
(PRD 0048). Used when a git-gate.repos entry opts out of a
|
||||
static identity file and instead wants a fresh SSH keypair
|
||||
generated at spin-up and revoked at teardown.
|
||||
|
||||
`provider` names the contrib sub-package to load (e.g. `gitea`).
|
||||
`token_env` is the name of a host-side env var carrying the API
|
||||
token; the value is read at provision time, never stored on the
|
||||
plan. `api_url` is the forge's HTTP API root; if empty, it is
|
||||
derived from the upstream URL's host at provision time."""
|
||||
|
||||
provider: str
|
||||
token_env: str
|
||||
api_url: str = ""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GitEntry:
|
||||
"""One upstream the per-agent git-gate (PRD 0008) is allowed to
|
||||
@@ -93,15 +74,14 @@ class GitEntry:
|
||||
stashed in the `Upstream*` fields so the git-gate render step
|
||||
doesn't have to re-parse.
|
||||
|
||||
Manifest source: `git-gate.repos.<Name>` (PRD 0047/0048). Exactly
|
||||
one of `identity` (static key path) or `provisioned_key` (automatic
|
||||
lifecycle) must be present. The internal field names are stable."""
|
||||
Manifest source: `git-gate.repos.<Name>` (PRD 0047). The YAML keys
|
||||
are `url`, `identity`, and `host_key`; the internal field names are
|
||||
stable across that rename."""
|
||||
|
||||
Name: str
|
||||
Upstream: str
|
||||
IdentityFile: str = ""
|
||||
IdentityFile: str
|
||||
KnownHostKey: str = ""
|
||||
ProvisionedKey: Optional[ProvisionedKeyConfig] = None
|
||||
RemoteKey: str = ""
|
||||
UpstreamUser: str = ""
|
||||
UpstreamHost: str = ""
|
||||
@@ -114,9 +94,8 @@ class GitEntry:
|
||||
) -> "GitEntry":
|
||||
"""Parse one entry from `git-gate.repos.<repo_name>`.
|
||||
|
||||
YAML keys: `url` (required), exactly one of `identity` or
|
||||
`provisioned_key` (required), `host_key` (optional).
|
||||
The repo_name becomes `Name`."""
|
||||
YAML keys: `url` (required), `identity` (required),
|
||||
`host_key` (optional). The repo_name becomes `Name`."""
|
||||
if not repo_name:
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' git-gate.repos has an empty key"
|
||||
@@ -129,44 +108,21 @@ class GitEntry:
|
||||
label = f"git-gate.repos[{repo_name!r}]"
|
||||
d = as_json_object(raw, f"bottle '{bottle_name}' {label}")
|
||||
for k in d:
|
||||
if k not in {"url", "identity", "provisioned_key", "host_key"}:
|
||||
if k not in {"url", "identity", "host_key"}:
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' {label} has unknown key {k!r}; "
|
||||
f"allowed: url, identity, provisioned_key, host_key"
|
||||
f"allowed: url, identity, host_key"
|
||||
)
|
||||
upstream = d.get("url")
|
||||
if not isinstance(upstream, str) or not upstream:
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' {label} missing required string field 'url'"
|
||||
)
|
||||
|
||||
has_identity = "identity" in d
|
||||
has_provisioned = "provisioned_key" in d
|
||||
if has_identity and has_provisioned:
|
||||
ident = d.get("identity")
|
||||
if not isinstance(ident, str) or not ident:
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' {label} must set exactly one of "
|
||||
f"'identity' or 'provisioned_key'; got both."
|
||||
f"bottle '{bottle_name}' {label} missing required string field 'identity'"
|
||||
)
|
||||
if not has_identity and not has_provisioned:
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' {label} must set exactly one of "
|
||||
f"'identity' or 'provisioned_key'; got neither."
|
||||
)
|
||||
|
||||
ident = ""
|
||||
provisioned_key: Optional[ProvisionedKeyConfig] = None
|
||||
if has_identity:
|
||||
raw_ident = d.get("identity")
|
||||
if not isinstance(raw_ident, str) or not raw_ident:
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' {label} 'identity' must be a non-empty string"
|
||||
)
|
||||
ident = raw_ident
|
||||
else:
|
||||
provisioned_key = _parse_provisioned_key_config(
|
||||
bottle_name, label, d["provisioned_key"]
|
||||
)
|
||||
|
||||
khk = _opt_str(
|
||||
d.get("host_key"),
|
||||
f"bottle '{bottle_name}' {label} host_key",
|
||||
@@ -179,7 +135,6 @@ class GitEntry:
|
||||
Upstream=upstream,
|
||||
IdentityFile=ident,
|
||||
KnownHostKey=khk,
|
||||
ProvisionedKey=provisioned_key,
|
||||
RemoteKey=host,
|
||||
UpstreamUser=user,
|
||||
UpstreamHost=host,
|
||||
@@ -188,40 +143,6 @@ class GitEntry:
|
||||
)
|
||||
|
||||
|
||||
def _parse_provisioned_key_config(
|
||||
bottle_name: str, label: str, raw: object
|
||||
) -> ProvisionedKeyConfig:
|
||||
d = as_json_object(raw, f"bottle '{bottle_name}' {label}.provisioned_key")
|
||||
for k in d:
|
||||
if k not in {"provider", "token_env", "api_url"}:
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' {label}.provisioned_key has unknown key {k!r}; "
|
||||
f"allowed: provider, token_env, api_url"
|
||||
)
|
||||
provider = d.get("provider")
|
||||
if not isinstance(provider, str) or not provider:
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' {label}.provisioned_key missing required "
|
||||
f"string field 'provider'"
|
||||
)
|
||||
token_env = d.get("token_env")
|
||||
if not isinstance(token_env, str) or not token_env:
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' {label}.provisioned_key missing required "
|
||||
f"string field 'token_env'"
|
||||
)
|
||||
api_url_raw = d.get("api_url", "")
|
||||
if not isinstance(api_url_raw, str):
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' {label}.provisioned_key 'api_url' must be a string"
|
||||
)
|
||||
return ProvisionedKeyConfig(
|
||||
provider=provider,
|
||||
token_env=token_env,
|
||||
api_url=api_url_raw,
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GitUser:
|
||||
"""Per-bottle `git config --global user.name` / `user.email`
|
||||
|
||||
@@ -1,296 +0,0 @@
|
||||
# PRD 0048: SSH Deploy-Key Provisioning
|
||||
|
||||
- **Status:** Active
|
||||
- **Author:** didericis-claude
|
||||
- **Created:** 2026-06-03
|
||||
- **Issue:** #169
|
||||
|
||||
## Summary
|
||||
|
||||
Replace per-repo static SSH identity files with short-lived ed25519 deploy
|
||||
keys that are generated at spin-up and revoked at teardown. Introduce
|
||||
`bot_bottle/contrib/` as the package for platform-specific provisioners and
|
||||
ship the first contrib sub-package: `bot_bottle/contrib/gitea/` with
|
||||
`GiteaDeployKeyProvisioner`. A new `provisioned_key:` block in `git-gate.repos`
|
||||
entries opts a repo into automatic key lifecycle management; `identity:` stays
|
||||
valid for operators who supply their own key material.
|
||||
|
||||
## Problem
|
||||
|
||||
The current `git-gate.repos` entries require an `identity:` field pointing to
|
||||
a host-side SSH private key (PRD 0047). Keys are static: the operator generates
|
||||
them once, registers them with the upstream forge, and the same key is reused
|
||||
across every bottle spin-up. This has several consequences:
|
||||
|
||||
- **No automatic revocation.** If a bottle misbehaves or a key leaks, the
|
||||
operator must notice and manually delete the key from the forge. There is no
|
||||
teardown hook that does it.
|
||||
- **Broad blast radius.** A forge deploy key typically grants write access for
|
||||
the lifetime of the key. A static key that survives bottle teardown continues
|
||||
to grant that access.
|
||||
- **Manual rotation burden.** Operators must manage key files on disk, keeping
|
||||
them secure, rotating them on a schedule, and distributing them across hosts
|
||||
that run `./cli.py start`.
|
||||
|
||||
## Goals / Success Criteria
|
||||
|
||||
- `git-gate.repos` entries accept `provisioned_key:` as an alternative to
|
||||
`identity:`. The parser rejects entries that have both, or neither.
|
||||
- `provisioned_key.provider: gitea` provisions and revokes deploy keys via the
|
||||
Gitea HTTP API.
|
||||
- At prepare time the provisioner generates a fresh ed25519 keypair, registers
|
||||
the public half as a repo-scoped deploy key, and makes the private key
|
||||
available to git-gate at the path it expects — the rest of the pipeline is
|
||||
unchanged.
|
||||
- At teardown the provisioner deletes the registered deploy key. Failure to
|
||||
delete halts teardown and propagates the error loudly.
|
||||
- `bot_bottle/contrib/` is introduced as the package for platform-specific
|
||||
implementations; the core defines the abstract interface; contrib sub-packages
|
||||
provide concrete implementations.
|
||||
- Existing `identity:`-based repos continue to work without change.
|
||||
- The unit test suite passes unchanged for `identity:` paths; new tests cover
|
||||
`provisioned_key:` parse, validation, and provisioner dispatch.
|
||||
|
||||
## Non-goals
|
||||
|
||||
- GitHub, GitLab, or other forge providers (a future contrib sub-package each).
|
||||
- Dashboard UI for listing or revoking orphaned deploy keys.
|
||||
- SSH CA certificate approach (rejected in the issue thread in favour of
|
||||
per-repo deploy keys for simpler revocation, smaller blast radius, and forge
|
||||
compatibility).
|
||||
- Key rotation mid-session (keys live for exactly one spin-up / teardown cycle).
|
||||
- Any change to how `identity:` repos are provisioned.
|
||||
|
||||
## Design
|
||||
|
||||
### Manifest changes (builds on PRD 0047)
|
||||
|
||||
`git-gate.repos.<name>` currently accepts exactly:
|
||||
|
||||
```
|
||||
url (required string)
|
||||
identity (required string)
|
||||
host_key (optional string)
|
||||
```
|
||||
|
||||
After this PRD:
|
||||
|
||||
```
|
||||
url (required string)
|
||||
identity (optional string — mutually exclusive with provisioned_key)
|
||||
provisioned_key (optional object — mutually exclusive with identity)
|
||||
host_key (optional string)
|
||||
```
|
||||
|
||||
Exactly one of `identity` or `provisioned_key` must be present. The parser
|
||||
emits a targeted error for each violation:
|
||||
|
||||
```
|
||||
bottle 'dev' git-gate.repos['bot-bottle'] must set exactly one of
|
||||
'identity' or 'provisioned_key'; got neither.
|
||||
|
||||
bottle 'dev' git-gate.repos['bot-bottle'] must set exactly one of
|
||||
'identity' or 'provisioned_key'; got both.
|
||||
```
|
||||
|
||||
`provisioned_key` object schema:
|
||||
|
||||
```yaml
|
||||
provisioned_key:
|
||||
provider: gitea # required; names the contrib module to load
|
||||
token_env: GITEA_TOKEN # required; name of a host env var holding the API token
|
||||
api_url: https://... # optional; defaults to https://<host from url>
|
||||
```
|
||||
|
||||
| Field | Type | Notes |
|
||||
|-------|------|-------|
|
||||
| `provider` | required string | Must match a sub-package under `bot_bottle/contrib/` |
|
||||
| `token_env` | required string | Resolved at provision time via `os.environ`; never stored in plan |
|
||||
| `api_url` | optional string | Override when the API endpoint differs from the git host |
|
||||
|
||||
**Example bottle manifest:**
|
||||
|
||||
```yaml
|
||||
git-gate:
|
||||
user:
|
||||
name: implementer-bot
|
||||
email: eric+implementer@dideric.is
|
||||
repos:
|
||||
bot-bottle:
|
||||
url: ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git
|
||||
provisioned_key:
|
||||
provider: gitea
|
||||
token_env: GITEA_DEPLOY_TOKEN
|
||||
host_key: "ssh-rsa AAAA..."
|
||||
```
|
||||
|
||||
### `contrib` package structure
|
||||
|
||||
```
|
||||
bot_bottle/
|
||||
contrib/
|
||||
__init__.py # empty; no core symbols
|
||||
gitea/
|
||||
__init__.py # empty
|
||||
deploy_key_provisioner.py
|
||||
```
|
||||
|
||||
`contrib` is a flat namespace of forge/platform sub-packages. Each sub-package
|
||||
is self-contained; the core imports from contrib lazily (inside factory
|
||||
functions) so that missing optional dependencies in a contrib sub-package don't
|
||||
break unrelated features.
|
||||
|
||||
### Core interface
|
||||
|
||||
New file: `bot_bottle/deploy_key_provisioner.py`
|
||||
|
||||
```python
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
class DeployKeyProvisioner(ABC):
|
||||
@abstractmethod
|
||||
def create(self, owner_repo: str, title: str) -> tuple[str, bytes]:
|
||||
"""Generate a keypair and register the public half.
|
||||
|
||||
owner_repo: '<owner>/<repo>' portion of the git upstream URL.
|
||||
title: human-readable label shown in the forge key list.
|
||||
|
||||
Returns (key_id, private_key_pem) where key_id is opaque to
|
||||
the caller and is only passed back to delete()."""
|
||||
|
||||
@abstractmethod
|
||||
def delete(self, owner_repo: str, key_id: str) -> None:
|
||||
"""Delete the registered deploy key.
|
||||
|
||||
Must not raise if the key is already absent (HTTP 404 is success).
|
||||
Must raise for all other failures so that teardown halts."""
|
||||
|
||||
|
||||
def get_provisioner(provider: str, token: str, api_url: str) -> DeployKeyProvisioner:
|
||||
"""Instantiate the named contrib provisioner.
|
||||
|
||||
Raises ManifestError for unknown providers so the error is caught
|
||||
at parse time rather than at runtime."""
|
||||
if provider == "gitea":
|
||||
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
|
||||
GiteaDeployKeyProvisioner,
|
||||
)
|
||||
return GiteaDeployKeyProvisioner(token=token, api_url=api_url)
|
||||
from .manifest_util import ManifestError
|
||||
raise ManifestError(f"unknown provisioned_key provider: {provider!r}")
|
||||
```
|
||||
|
||||
### Gitea contrib implementation
|
||||
|
||||
`bot_bottle/contrib/gitea/deploy_key_provisioner.py`:
|
||||
|
||||
`create(owner_repo, title)`:
|
||||
1. Generate an ed25519 keypair via `ssh-keygen -t ed25519 -f <tmpfile> -N ''`
|
||||
(uses the SSH tooling already required by git-gate; no new Python dependency).
|
||||
2. Read the private key bytes and the `.pub` file.
|
||||
3. `POST /api/v1/repos/{owner}/{repo}/keys` with the public key, `title`, and
|
||||
`read_only: false` (deploy keys always need push access for git-gate).
|
||||
4. Return `(str(response["id"]), private_key_bytes)`.
|
||||
|
||||
`delete(owner_repo, key_id)`:
|
||||
1. `DELETE /api/v1/repos/{owner}/{repo}/keys/{id}`.
|
||||
2. Treat HTTP 404 as success (key already gone).
|
||||
3. Raise `RuntimeError` for any other non-2xx response or network error,
|
||||
including the status code and response body in the message.
|
||||
|
||||
HTTP calls use `urllib.request` from the stdlib; no new runtime dependency.
|
||||
|
||||
### `GitEntry` dataclass changes
|
||||
|
||||
`bot_bottle/manifest_git.py`:
|
||||
|
||||
- Add `ProvisionedKeyConfig` dataclass:
|
||||
|
||||
```python
|
||||
@dataclass(frozen=True)
|
||||
class ProvisionedKeyConfig:
|
||||
provider: str
|
||||
token_env: str
|
||||
api_url: str # empty string means "derive from UpstreamHost"
|
||||
```
|
||||
|
||||
- `GitEntry`:
|
||||
- `IdentityFile: str` unchanged internally; empty string when
|
||||
`provisioned_key` is used; set at provision time, not parse time.
|
||||
- New field: `ProvisionedKey: ProvisionedKeyConfig | None = None`
|
||||
- `from_repos_entry` validates the mutually-exclusive constraint and parses
|
||||
the `provisioned_key` block when present.
|
||||
|
||||
### `GitGateUpstream` / prepare-time changes
|
||||
|
||||
`bot_bottle/git_gate.py` and `bot_bottle/backend/docker/provision/git.py`:
|
||||
|
||||
The existing path writes the identity file path into `GitGateUpstream.IdentityFile`
|
||||
and docker-cp's it into `/git-gate/creds/<name>-key`. That path stays unchanged
|
||||
for `identity:` repos.
|
||||
|
||||
For `provisioned_key:` repos, a new helper `provision_deploy_key(entry,
|
||||
stage_dir, bottle_name)` runs before the git-gate sidecar starts:
|
||||
|
||||
1. Resolve `token = os.environ[entry.ProvisionedKey.token_env]`. Missing key
|
||||
raises `RuntimeError` with a clear message naming the env var.
|
||||
2. Resolve `api_url = entry.ProvisionedKey.api_url or f"https://{entry.UpstreamHost}"`.
|
||||
3. Instantiate `get_provisioner(entry.ProvisionedKey.provider, token, api_url)`.
|
||||
4. Call `provisioner.create(entry.UpstreamPath.lstrip("/"), title)` where
|
||||
`title = f"bot-bottle:{bottle_name}:{entry.Name}"`.
|
||||
5. Write private key to `stage_dir / f"{entry.Name}-key"` (mode 0o600).
|
||||
6. Write key ID to `stage_dir / f"{entry.Name}-deploy-key-id"` (plain text).
|
||||
7. Return the key file path; caller sets `GitGateUpstream.IdentityFile` to it.
|
||||
|
||||
`owner_repo` is extracted from `entry.UpstreamPath` (the path component of the
|
||||
`ssh://` URL, e.g. `/didericis/bot-bottle.git` → `didericis/bot-bottle`).
|
||||
|
||||
### Teardown changes
|
||||
|
||||
`bot_bottle/backend/docker/cleanup.py` (or the equivalent teardown path):
|
||||
|
||||
After the git-gate sidecar stops, for each `GitEntry` with `ProvisionedKey`
|
||||
set:
|
||||
|
||||
1. Check that `stage_dir / f"{entry.Name}-deploy-key-id"` exists; skip if
|
||||
absent (provision never ran or already cleaned up).
|
||||
2. Resolve token and API URL as above.
|
||||
3. Instantiate provisioner and call `provisioner.delete(owner_repo, key_id)`.
|
||||
4. On success, log at INFO. On failure, allow the exception to propagate —
|
||||
teardown halts and the error surfaces to the operator.
|
||||
|
||||
A stranded deploy key is a security concern: the operator must know about it
|
||||
and address it manually. Silent continuation is not acceptable.
|
||||
|
||||
The private key file in `stage_dir` is cleaned up as part of normal stage-dir
|
||||
teardown (no extra step needed).
|
||||
|
||||
## Testing strategy
|
||||
|
||||
```
|
||||
python3 -m unittest discover -s tests/unit
|
||||
```
|
||||
|
||||
New / modified test files:
|
||||
|
||||
- `tests/unit/test_manifest_git.py` — add cases for:
|
||||
- `provisioned_key:` accepted with valid `provider`, `token_env`, optional `api_url`
|
||||
- Both `identity` and `provisioned_key` present → `ManifestError`
|
||||
- Neither `identity` nor `provisioned_key` present → `ManifestError`
|
||||
- Unknown key inside `provisioned_key` block → `ManifestError`
|
||||
- Missing `provider` or `token_env` inside `provisioned_key` → `ManifestError`
|
||||
|
||||
- `tests/unit/test_deploy_key_provisioner.py` — new:
|
||||
- `get_provisioner("gitea", ...)` returns `GiteaDeployKeyProvisioner`
|
||||
- `get_provisioner("unknown", ...)` raises `ManifestError`
|
||||
|
||||
- `tests/unit/test_contrib_gitea_deploy_key.py` — new (using `unittest.mock`
|
||||
to stub `urllib.request.urlopen` and `subprocess.run`):
|
||||
- `create()` calls `ssh-keygen`, POSTs to correct endpoint, returns key ID
|
||||
- `delete()` DELETEs to correct endpoint
|
||||
- `delete()` tolerates HTTP 404 (already-deleted key)
|
||||
- `delete()` raises `RuntimeError` on non-404 HTTP error
|
||||
|
||||
## Open questions
|
||||
|
||||
None.
|
||||
@@ -1,166 +0,0 @@
|
||||
"""Unit: GiteaDeployKeyProvisioner (PRD 0048, contrib/gitea)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import unittest
|
||||
import urllib.error
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from tempfile import mkdtemp
|
||||
from unittest.mock import MagicMock, call, patch
|
||||
|
||||
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
|
||||
GiteaDeployKeyProvisioner,
|
||||
_split_owner_repo,
|
||||
)
|
||||
|
||||
|
||||
def _provisioner() -> GiteaDeployKeyProvisioner:
|
||||
return GiteaDeployKeyProvisioner(
|
||||
token="test-token", api_url="https://gitea.example.com"
|
||||
)
|
||||
|
||||
|
||||
def _urlopen_response(body: dict, status: int = 200) -> MagicMock:
|
||||
resp = MagicMock()
|
||||
resp.read.return_value = json.dumps(body).encode()
|
||||
resp.status = status
|
||||
resp.__enter__ = lambda s: s
|
||||
resp.__exit__ = MagicMock(return_value=False)
|
||||
return resp
|
||||
|
||||
|
||||
def _http_error(code: int, body: str = "") -> urllib.error.HTTPError:
|
||||
return urllib.error.HTTPError(
|
||||
url="http://x",
|
||||
code=code,
|
||||
msg="err",
|
||||
hdrs=None, # type: ignore[arg-type]
|
||||
fp=BytesIO(body.encode()),
|
||||
)
|
||||
|
||||
|
||||
class TestCreate(unittest.TestCase):
|
||||
def test_create_calls_ssh_keygen_and_posts_to_api(self):
|
||||
provisioner = _provisioner()
|
||||
fake_key_id = 42
|
||||
fake_private = b"PRIVATE_KEY"
|
||||
fake_public = "ssh-ed25519 AAAA fake"
|
||||
|
||||
with patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run"
|
||||
) as mock_run, patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen"
|
||||
) as mock_urlopen, patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes",
|
||||
return_value=fake_private,
|
||||
), patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text",
|
||||
return_value=fake_public + "\n",
|
||||
):
|
||||
mock_urlopen.return_value = _urlopen_response({"id": fake_key_id})
|
||||
key_id, private_bytes = provisioner.create(
|
||||
"didericis/bot-bottle", "bot-bottle:slug:repo"
|
||||
)
|
||||
|
||||
# ssh-keygen called with ed25519
|
||||
mock_run.assert_called_once()
|
||||
run_args = mock_run.call_args.args[0]
|
||||
self.assertIn("ssh-keygen", run_args)
|
||||
self.assertIn("-t", run_args)
|
||||
self.assertIn("ed25519", run_args)
|
||||
|
||||
# POST body contains public key
|
||||
post_call = mock_urlopen.call_args.args[0]
|
||||
payload = json.loads(post_call.data)
|
||||
self.assertEqual(fake_public, payload["key"])
|
||||
self.assertFalse(payload["read_only"])
|
||||
|
||||
# Correct URL
|
||||
self.assertIn(
|
||||
"/api/v1/repos/didericis/bot-bottle/keys", post_call.full_url
|
||||
)
|
||||
self.assertEqual(str(fake_key_id), key_id)
|
||||
self.assertEqual(fake_private, private_bytes)
|
||||
|
||||
def test_create_raises_on_http_error(self):
|
||||
provisioner = _provisioner()
|
||||
with patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run"
|
||||
), patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
|
||||
side_effect=_http_error(403, "forbidden"),
|
||||
), patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes",
|
||||
return_value=b"pk",
|
||||
), patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text",
|
||||
return_value="ssh-ed25519 AAAA\n",
|
||||
):
|
||||
with self.assertRaises(RuntimeError) as ctx:
|
||||
provisioner.create("owner/repo", "title")
|
||||
self.assertIn("403", str(ctx.exception))
|
||||
|
||||
|
||||
class TestDelete(unittest.TestCase):
|
||||
def test_delete_calls_correct_endpoint(self):
|
||||
provisioner = _provisioner()
|
||||
with patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen"
|
||||
) as mock_urlopen:
|
||||
mock_urlopen.return_value = _urlopen_response({})
|
||||
provisioner.delete("didericis/bot-bottle", "99")
|
||||
|
||||
req = mock_urlopen.call_args.args[0]
|
||||
self.assertIn("/api/v1/repos/didericis/bot-bottle/keys/99", req.full_url)
|
||||
self.assertEqual("DELETE", req.get_method())
|
||||
|
||||
def test_delete_tolerates_404(self):
|
||||
provisioner = _provisioner()
|
||||
with patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
|
||||
side_effect=_http_error(404),
|
||||
):
|
||||
provisioner.delete("owner/repo", "123") # must not raise
|
||||
|
||||
def test_delete_raises_on_non_404_http_error(self):
|
||||
provisioner = _provisioner()
|
||||
with patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
|
||||
side_effect=_http_error(500, "internal server error"),
|
||||
):
|
||||
with self.assertRaises(RuntimeError) as ctx:
|
||||
provisioner.delete("owner/repo", "7")
|
||||
self.assertIn("500", str(ctx.exception))
|
||||
|
||||
def test_delete_raises_on_url_error(self):
|
||||
provisioner = _provisioner()
|
||||
with patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
|
||||
side_effect=urllib.error.URLError("connection refused"),
|
||||
):
|
||||
with self.assertRaises(RuntimeError) as ctx:
|
||||
provisioner.delete("owner/repo", "7")
|
||||
self.assertIn("connection refused", str(ctx.exception))
|
||||
|
||||
|
||||
class TestSplitOwnerRepo(unittest.TestCase):
|
||||
def test_simple(self):
|
||||
self.assertEqual(("owner", "repo"), _split_owner_repo("owner/repo"))
|
||||
|
||||
def test_raises_on_missing_slash(self):
|
||||
with self.assertRaises(ValueError):
|
||||
_split_owner_repo("noslash")
|
||||
|
||||
def test_raises_on_empty_owner(self):
|
||||
with self.assertRaises(ValueError):
|
||||
_split_owner_repo("/repo")
|
||||
|
||||
def test_raises_on_empty_repo(self):
|
||||
with self.assertRaises(ValueError):
|
||||
_split_owner_repo("owner/")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,94 +0,0 @@
|
||||
"""Unit: dashboard_model — state/model layer extracted from dashboard.py.
|
||||
|
||||
Tests for functions that were previously buried in the 2103-line
|
||||
dashboard.py and had no coverage: _approval_status,
|
||||
_proposed_payload_label, and _suffix_for_tool."""
|
||||
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
from bot_bottle.cli.dashboard_model import (
|
||||
QueuedProposal,
|
||||
_approval_status,
|
||||
_proposed_payload_label,
|
||||
_suffix_for_tool,
|
||||
)
|
||||
from bot_bottle.supervise import (
|
||||
Proposal,
|
||||
TOOL_CAPABILITY_BLOCK,
|
||||
TOOL_EGRESS_BLOCK,
|
||||
TOOL_PIPELOCK_BLOCK,
|
||||
sha256_hex,
|
||||
)
|
||||
from datetime import datetime, timezone
|
||||
|
||||
|
||||
def _qp(tool: str, slug: str = "dev") -> QueuedProposal:
|
||||
payload = "x"
|
||||
p = Proposal.new(
|
||||
bottle_slug=slug,
|
||||
tool=tool,
|
||||
proposed_file=payload,
|
||||
justification="test",
|
||||
current_file_hash=sha256_hex(payload),
|
||||
now=datetime(2026, 6, 1, 0, 0, 0, tzinfo=timezone.utc),
|
||||
)
|
||||
return QueuedProposal(proposal=p, queue_dir=Path("/tmp/q"))
|
||||
|
||||
|
||||
class TestApprovalStatus(unittest.TestCase):
|
||||
def test_egress_block_base_message(self):
|
||||
qp = _qp(TOOL_EGRESS_BLOCK, slug="my-bot")
|
||||
msg = _approval_status(qp, "approved")
|
||||
self.assertEqual("approved egress-block for [my-bot]", msg)
|
||||
|
||||
def test_modified_verb(self):
|
||||
qp = _qp(TOOL_PIPELOCK_BLOCK, slug="dev")
|
||||
msg = _approval_status(qp, "modified+approved")
|
||||
self.assertEqual("modified+approved pipelock-block for [dev]", msg)
|
||||
|
||||
def test_capability_block_appends_resume_hint(self):
|
||||
qp = _qp(TOOL_CAPABILITY_BLOCK, slug="alpha")
|
||||
msg = _approval_status(qp, "approved")
|
||||
self.assertIn("resume: ./cli.py resume alpha", msg)
|
||||
self.assertIn("approved capability-block for [alpha]", msg)
|
||||
|
||||
def test_egress_block_has_no_resume_hint(self):
|
||||
qp = _qp(TOOL_EGRESS_BLOCK)
|
||||
self.assertNotIn("resume", _approval_status(qp, "approved"))
|
||||
|
||||
def test_pipelock_block_has_no_resume_hint(self):
|
||||
qp = _qp(TOOL_PIPELOCK_BLOCK)
|
||||
self.assertNotIn("resume", _approval_status(qp, "approved"))
|
||||
|
||||
|
||||
class TestProposedPayloadLabel(unittest.TestCase):
|
||||
def test_pipelock_returns_failed_url(self):
|
||||
self.assertEqual("failed URL", _proposed_payload_label(TOOL_PIPELOCK_BLOCK))
|
||||
|
||||
def test_egress_returns_proposed_file(self):
|
||||
self.assertEqual("proposed file", _proposed_payload_label(TOOL_EGRESS_BLOCK))
|
||||
|
||||
def test_capability_returns_proposed_file(self):
|
||||
self.assertEqual("proposed file", _proposed_payload_label(TOOL_CAPABILITY_BLOCK))
|
||||
|
||||
def test_unknown_tool_returns_proposed_file(self):
|
||||
self.assertEqual("proposed file", _proposed_payload_label("unknown-tool"))
|
||||
|
||||
|
||||
class TestSuffixForTool(unittest.TestCase):
|
||||
def test_capability_block_returns_dockerfile_suffix(self):
|
||||
self.assertEqual(".dockerfile", _suffix_for_tool(TOOL_CAPABILITY_BLOCK))
|
||||
|
||||
def test_egress_block_returns_txt(self):
|
||||
self.assertEqual(".txt", _suffix_for_tool(TOOL_EGRESS_BLOCK))
|
||||
|
||||
def test_pipelock_block_returns_txt(self):
|
||||
self.assertEqual(".txt", _suffix_for_tool(TOOL_PIPELOCK_BLOCK))
|
||||
|
||||
def test_unknown_tool_returns_txt(self):
|
||||
self.assertEqual(".txt", _suffix_for_tool("whatever"))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,29 +0,0 @@
|
||||
"""Unit: deploy_key_provisioner factory (PRD 0048)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
from bot_bottle.deploy_key_provisioner import DeployKeyProvisioner, get_provisioner
|
||||
from bot_bottle.manifest import ManifestError
|
||||
|
||||
|
||||
class TestGetProvisioner(unittest.TestCase):
|
||||
def test_gitea_returns_gitea_provisioner(self):
|
||||
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
|
||||
GiteaDeployKeyProvisioner,
|
||||
)
|
||||
p = get_provisioner("gitea", token="tok", api_url="https://gitea.example.com")
|
||||
self.assertIsInstance(p, GiteaDeployKeyProvisioner)
|
||||
self.assertIsInstance(p, DeployKeyProvisioner)
|
||||
|
||||
def test_unknown_provider_raises_manifest_error(self):
|
||||
with self.assertRaises(ManifestError) as ctx:
|
||||
get_provisioner("github", token="tok", api_url="https://github.com")
|
||||
self.assertIn("github", str(ctx.exception))
|
||||
self.assertIn("provisioned_key provider", str(ctx.exception))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -243,113 +243,6 @@ class TestGitEntryCrossValidation(unittest.TestCase):
|
||||
self.assertIn("PRD 0047", msg)
|
||||
|
||||
|
||||
class TestProvisionedKey(unittest.TestCase):
|
||||
"""git-gate.repos entries that use provisioned_key (PRD 0048)."""
|
||||
|
||||
def test_provisioned_key_minimal(self):
|
||||
m = Manifest.from_json_obj(_manifest({
|
||||
"bot-bottle": {
|
||||
"url": "ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git",
|
||||
"provisioned_key": {
|
||||
"provider": "gitea",
|
||||
"token_env": "GITEA_TOKEN",
|
||||
},
|
||||
},
|
||||
}))
|
||||
e = m.bottles["dev"].git[0]
|
||||
self.assertEqual("bot-bottle", e.Name)
|
||||
self.assertIsNotNone(e.ProvisionedKey)
|
||||
assert e.ProvisionedKey is not None
|
||||
self.assertEqual("gitea", e.ProvisionedKey.provider)
|
||||
self.assertEqual("GITEA_TOKEN", e.ProvisionedKey.token_env)
|
||||
self.assertEqual("", e.ProvisionedKey.api_url)
|
||||
self.assertEqual("", e.IdentityFile)
|
||||
|
||||
def test_provisioned_key_with_api_url(self):
|
||||
m = Manifest.from_json_obj(_manifest({
|
||||
"repo": {
|
||||
"url": "ssh://git@gitea.example.com/org/repo.git",
|
||||
"provisioned_key": {
|
||||
"provider": "gitea",
|
||||
"token_env": "MY_TOKEN",
|
||||
"api_url": "https://gitea.example.com",
|
||||
},
|
||||
},
|
||||
}))
|
||||
pk = m.bottles["dev"].git[0].ProvisionedKey
|
||||
assert pk is not None
|
||||
self.assertEqual("https://gitea.example.com", pk.api_url)
|
||||
|
||||
def test_both_identity_and_provisioned_key_dies(self):
|
||||
with self.assertRaises(ManifestError) as ctx:
|
||||
Manifest.from_json_obj(_manifest({
|
||||
"foo": {
|
||||
"url": "ssh://git@github.com/foo.git",
|
||||
"identity": "/dev/null",
|
||||
"provisioned_key": {"provider": "gitea", "token_env": "T"},
|
||||
},
|
||||
}))
|
||||
self.assertIn("exactly one of", str(ctx.exception))
|
||||
self.assertIn("got both", str(ctx.exception))
|
||||
|
||||
def test_neither_identity_nor_provisioned_key_dies(self):
|
||||
with self.assertRaises(ManifestError) as ctx:
|
||||
Manifest.from_json_obj(_manifest({
|
||||
"foo": {"url": "ssh://git@github.com/foo.git"},
|
||||
}))
|
||||
self.assertIn("exactly one of", str(ctx.exception))
|
||||
self.assertIn("got neither", str(ctx.exception))
|
||||
|
||||
def test_unknown_key_in_provisioned_key_block_dies(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
Manifest.from_json_obj(_manifest({
|
||||
"foo": {
|
||||
"url": "ssh://git@github.com/foo.git",
|
||||
"provisioned_key": {
|
||||
"provider": "gitea",
|
||||
"token_env": "T",
|
||||
"key_type": "rsa", # not allowed
|
||||
},
|
||||
},
|
||||
}))
|
||||
|
||||
def test_missing_provider_dies(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
Manifest.from_json_obj(_manifest({
|
||||
"foo": {
|
||||
"url": "ssh://git@github.com/foo.git",
|
||||
"provisioned_key": {"token_env": "T"},
|
||||
},
|
||||
}))
|
||||
|
||||
def test_missing_token_env_dies(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
Manifest.from_json_obj(_manifest({
|
||||
"foo": {
|
||||
"url": "ssh://git@github.com/foo.git",
|
||||
"provisioned_key": {"provider": "gitea"},
|
||||
},
|
||||
}))
|
||||
|
||||
def test_provisioned_key_entry_has_no_identity_file(self):
|
||||
m = Manifest.from_json_obj(_manifest({
|
||||
"foo": {
|
||||
"url": "ssh://git@github.com/didericis/foo.git",
|
||||
"provisioned_key": {"provider": "gitea", "token_env": "T"},
|
||||
},
|
||||
}))
|
||||
self.assertEqual("", m.bottles["dev"].git[0].IdentityFile)
|
||||
|
||||
def test_identity_entry_has_no_provisioned_key(self):
|
||||
m = Manifest.from_json_obj(_manifest({
|
||||
"foo": {
|
||||
"url": "ssh://git@github.com/foo.git",
|
||||
"identity": "/dev/null",
|
||||
},
|
||||
}))
|
||||
self.assertIsNone(m.bottles["dev"].git[0].ProvisionedKey)
|
||||
|
||||
|
||||
class TestEmptyGitGateField(unittest.TestCase):
|
||||
def test_no_git_gate_field_yields_empty_tuple(self):
|
||||
m = Manifest.from_json_obj({
|
||||
|
||||
Reference in New Issue
Block a user