refactor(cli): rename dashboard command to supervise
test / unit (pull_request) Failing after 38s
test / integration (pull_request) Successful in 54s

This commit is contained in:
2026-06-03 17:23:40 +00:00
parent 5c17f0de95
commit 6f0a42159f
4 changed files with 625 additions and 2167 deletions
+5 -5
View File
@@ -1,6 +1,6 @@
"""Main CLI dispatcher.
Commands: cleanup, dashboard, edit, info, init, list, resume, start
Commands: cleanup, edit, info, init, list, resume, start, supervise
"""
from __future__ import annotations
@@ -12,24 +12,24 @@ from ..manifest import ManifestError
from ._common import PROG
from . import list as _list_mod
from .cleanup import cmd_cleanup
from .dashboard import cmd_dashboard
from .edit import cmd_edit
from .info import cmd_info
from .init import cmd_init
from .resume import cmd_resume
from .start import cmd_start
from .supervise import cmd_supervise
cmd_list = _list_mod.cmd_list
COMMANDS = {
"cleanup": cmd_cleanup,
"dashboard": cmd_dashboard,
"edit": cmd_edit,
"info": cmd_info,
"init": cmd_init,
"list": cmd_list,
"resume": cmd_resume,
"start": cmd_start,
"supervise": cmd_supervise,
}
@@ -37,13 +37,13 @@ def usage() -> None:
sys.stderr.write(f"usage: {PROG} <command> [args...]\n\n")
sys.stderr.write("Commands:\n")
sys.stderr.write(" cleanup stop and remove all active bot-bottle containers\n")
sys.stderr.write(" dashboard view + approve/modify/reject pending supervise proposals (PRD 0013)\n")
sys.stderr.write(" edit open an agent in vim for editing\n")
sys.stderr.write(" info print env, skills, and prompt details for a named agent\n")
sys.stderr.write(" init interactively create a new agent and add it to bot-bottle.json\n")
sys.stderr.write(" list list available agents or active containers\n")
sys.stderr.write(" resume re-launch a bottle by its identity (continues state from PRD 0016)\n")
sys.stderr.write(" start boot a container for a named agent and attach an interactive session\n\n")
sys.stderr.write(" start boot a container for a named agent and attach an interactive session\n")
sys.stderr.write(" supervise view + approve/modify/reject pending supervise proposals (PRD 0013)\n\n")
sys.stderr.write(f"Run '{PROG} <command> --help' for command-specific usage.\n")
File diff suppressed because it is too large Load Diff
-421
View File
@@ -1,421 +0,0 @@
"""dashboard_model: state/model layer for the dashboard TUI.
Data structures, discovery queries, pure state helpers, and derived
values extracted from dashboard.py so they can be tested in isolation
and navigated without wading through curses rendering code.
"""
from __future__ import annotations
import os
import shlex
from dataclasses import dataclass
from pathlib import Path
from .. import supervise as _supervise
from ..agent_provider import runtime_for
from ..backend import ActiveAgent, enumerate_active_agents
from ..backend.docker.capability_apply import CapabilityApplyError
from ..backend.docker.egress_apply import EgressApplyError
from ..backend.docker.pipelock_apply import PipelockApplyError
from ..manifest import Manifest
from ..supervise import (
TOOL_CAPABILITY_BLOCK,
TOOL_PIPELOCK_BLOCK,
Proposal,
list_pending_proposals,
)
# --- Constants ---------------------------------------------------------------
_REFRESH_INTERVAL_MS = 1000
_NEW_PROPOSAL_HIGHLIGHT_SEC = 5.0
PANE_PROPOSALS = "proposals"
PANE_AGENTS = "agents"
# --- Data structures ---------------------------------------------------------
@dataclass(frozen=True)
class QueuedProposal:
"""A pending proposal plus the queue dir it was found in."""
proposal: Proposal
queue_dir: Path
ApplyError = (EgressApplyError, PipelockApplyError, CapabilityApplyError)
# --- Discovery ---------------------------------------------------------------
def discover_active_agents() -> list[ActiveAgent]:
"""All currently-running agents across every backend with
their metadata + service set. Returns [] when neither
backend is reachable. Backed by the shared
`enumerate_active_agents` helper so the CLI's
`./cli.py list active` and this dashboard show the same data."""
return enumerate_active_agents()
def discover_pending() -> list[QueuedProposal]:
"""Walk ~/.bot-bottle/queue/* and collect pending proposals
from every bottle's queue. Sorted by arrival time across the
union — the operator works the global FIFO."""
queue_root = _supervise.bot_bottle_root() / "queue"
if not queue_root.is_dir():
return []
out: list[QueuedProposal] = []
for slug_dir in sorted(queue_root.iterdir()):
if not slug_dir.is_dir():
continue
for proposal in list_pending_proposals(slug_dir):
out.append(QueuedProposal(proposal=proposal, queue_dir=slug_dir))
out.sort(key=lambda q: q.proposal.arrival_timestamp)
return out
# --- Derived values ----------------------------------------------------------
def _approval_status(qp: QueuedProposal, verb: str) -> str:
"""Status-line text after a successful approval. For capability-
block, append the `resume <identity>` hint so the operator can
bring the rebuilt bottle back up with one copy-paste."""
base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
return base
def _is_recent(
proposal_id: str,
first_seen: dict[str, float] | None,
now: float | None,
) -> bool:
"""True if `proposal_id` was first seen within the highlight
window. Both `first_seen` and `now` may be None (rendered as
not-recent) so the helper is safe in cold-start paths."""
if first_seen is None or now is None:
return False
started = first_seen.get(proposal_id)
if started is None:
return False
return (now - started) < _NEW_PROPOSAL_HIGHLIGHT_SEC
def _selection_status(
focus: str, agents: list[ActiveAgent], selected_agent: int,
) -> str:
"""Status-line text for the idle state. Surfaces the agents-
pane selection so the operator can tell what an agent-scoped
edit verb would target."""
if focus != PANE_AGENTS:
return ""
if not agents:
return "[no active agents]"
if 0 <= selected_agent < len(agents):
return f"[selected: {agents[selected_agent].slug}]"
return "[no agent selected]"
def _selected_agent(
focus: str, agents: list[ActiveAgent], selected_agent: int,
) -> ActiveAgent | None:
"""The selected agent to scope `e` / `p` to, or None if no
selection is valid (proposals pane focused, no active agents,
or selection out of bounds)."""
if focus != PANE_AGENTS:
return None
if not agents:
return None
if 0 <= selected_agent < len(agents):
return agents[selected_agent]
return None
# --- Picker helpers ----------------------------------------------------------
def _filter_agents(query: str, names: list[str]) -> list[str]:
"""Case-insensitive substring filter for the picker. Pure
function — no curses, easy to unit-test."""
if not query:
return list(names)
q = query.lower()
return [n for n in names if q in n.lower()]
def _running_counts(
bottles: dict, agents_now: list[ActiveAgent],
) -> dict[str, int]:
"""Per-agent running count: dashboard-owned + externally-
discovered, summed by agent_name. The picker shows this so the
operator knows whether picking an agent starts a fresh bottle
or a Nth one."""
counts: dict[str, int] = {}
for a in agents_now:
counts[a.agent_name] = counts.get(a.agent_name, 0) + 1
return counts
# --- Agent-row rendering helpers ---------------------------------------------
def _format_agent_row(a: ActiveAgent, maxw: int) -> str:
"""One-line agent row: ` [<backend>] <slug> <agent_name> started
<HH:MM:SS> [<sidecars>]`. The `agent` service is filtered out of
the displayed list — it's always present for an active bottle,
so listing it carries no information; the sidecars are the
differentiator.
The `[docker]` / `[smolmachines]` prefix lets the operator tell
which backend a bottle came from (issue #77). Truncated to
`maxw` because the renderer's addnstr only enforces width if
we hand it a properly-sized string."""
started = (
a.started_at.split("T", 1)[1][:8]
if "T" in a.started_at else (a.started_at or "?")
)
sidecars = tuple(s for s in a.services if s != "agent")
services = ",".join(sidecars) if sidecars else "(starting)"
backend_tag = f"[{a.backend_name}]" if a.backend_name else ""
line = (
f" {backend_tag} {a.slug} {a.agent_name} "
f"started {started} [{services}]"
)
if len(line) > maxw:
return line[: max(0, maxw - 1)] + ""
return line
# --- Detail-view helpers -----------------------------------------------------
def _detail_lines(
qp: QueuedProposal,
*,
green_attr: int = 0,
) -> list[tuple[str, int]]:
"""Return the detail-view body as (text, curses-attr) tuples.
Most lines are plain (attr=0); pipelock-block proposals append
a green "→ would allow host: ..." line so the operator sees at
a glance which hostname will land in pipelock's allowlist if
they hit approve. The URL itself is shown above for context."""
p = qp.proposal
out: list[tuple[str, int]] = [
(f"bottle: {p.bottle_slug}", 0),
(f"tool: {p.tool}", 0),
(f"id: {p.id}", 0),
(f"arrived: {p.arrival_timestamp}", 0),
(f"queue: {qp.queue_dir}", 0),
("", 0),
("justification:", 0),
]
out.extend((" " + line, 0) for line in p.justification.splitlines() or [""])
out.extend([
("", 0),
(_proposed_payload_label(p.tool) + ":", 0),
])
out.extend((line, 0) for line in p.proposed_file.splitlines() or [""])
if p.tool == TOOL_PIPELOCK_BLOCK:
host = _failed_url_host(p.proposed_file)
if host:
out.append(("", 0))
out.append((host, green_attr))
return out
def _failed_url_host(url: str) -> str:
"""Best-effort hostname extraction from a pipelock-block proposal's
failed_url payload. Returns empty string on unparseable input —
callers handle empty as "nothing to highlight"."""
import urllib.parse
try:
return urllib.parse.urlsplit(url.strip()).hostname or ""
except ValueError:
return ""
def _proposed_payload_label(tool: str) -> str:
"""The detail-view section heading for the proposal's payload —
`proposed_file` is what the dataclass calls it, but for
pipelock-block the payload is a single URL not a file. Render
the label per tool so the operator's eye matches."""
if tool == TOOL_PIPELOCK_BLOCK:
return "failed URL"
return "proposed file"
def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
return ".txt"
# --- Bottle/agent resolution -------------------------------------------------
def _bottle_for_slug(
slug: str,
bottles: dict,
manifest: Manifest | None,
) -> tuple["object", str]:
"""Return `(bottle_handle, prompt_path_hint)` for a re-attach.
If the slug is in `bottles` (dashboard-owned), return the stored
handle directly. Otherwise synthesize a bottle from the persisted
metadata. The backend field in metadata (PRD 0040) selects Docker
or smolmachines; unknown or missing metadata defaults to Docker.
Returns the empty string for prompt_path_hint when we omit the
flag — the caller passes None to DockerBottle in that case."""
from ..backend.docker.bottle import DockerBottle
from ..backend.docker.bottle_state import read_metadata
from ..backend.smolmachines.bottle import SmolmachinesBottle
if slug in bottles:
_cm, bottle, _identity = bottles[slug]
return bottle, ""
instance_name = f"bot-bottle-{slug}"
prompt_path: str | None = None
metadata = read_metadata(slug)
if metadata is not None and manifest is not None:
agent = manifest.agents.get(metadata.agent_name)
if agent is not None and agent.prompt:
container_home = os.environ.get(
"BOT_BOTTLE_CONTAINER_HOME", "/home/node",
)
prompt_path = f"{container_home}/.bot-bottle-prompt.txt"
backend = metadata.backend if metadata is not None else ""
if backend == "smolmachines":
synth: object = SmolmachinesBottle(
instance_name,
prompt_path=prompt_path,
)
else:
synth = DockerBottle(
container=instance_name,
teardown=lambda: None,
prompt_path_in_container=prompt_path,
)
return synth, (prompt_path or "")
def _pick_next_after_stop(
agents_before: list[ActiveAgent],
selected_index: int,
stopped_slug: str,
) -> tuple[int, ActiveAgent] | None:
"""After stopping `stopped_slug` from the agents list, choose
the agent that should take focus next. The agent below the
stopped row (which slides up to fill its index) is the
natural pick; if the stopped agent was last, the row above
instead. Returns (new_index, agent) or None if no agents
remain. Pure — easy to unit-test."""
new_agents = [a for a in agents_before if a.slug != stopped_slug]
if not new_agents:
return None
new_index = min(max(selected_index, 0), len(new_agents) - 1)
return new_index, new_agents[new_index]
# --- tmux argv builders ------------------------------------------------------
def _in_tmux() -> bool:
"""True when the dashboard is running inside a tmux session.
Tmux sets `$TMUX` to the path of its server socket."""
return bool(os.environ.get("TMUX"))
def _agent_runtime_args(
*, resume: bool, remote_control: bool = False, agent_provider_template: str = "claude",
) -> list[str]:
"""The argv the dashboard hands to `bottle.agent_argv`
on every attach — matches what `attach_agent` builds for the
foreground handoff so both surfaces produce the same claude
invocation."""
runtime = runtime_for(agent_provider_template)
args = list(runtime.bypass_args)
if remote_control:
args.extend(runtime.remote_control_args)
if resume:
args.extend(runtime.resume_args)
return args
def _build_resume_argv_with_fallback(
bottle, *, remote_control: bool = False, agent_provider_template: str = "claude",
) -> list[str]:
"""Build a backend-exec argv that runs `claude --continue` and
falls back to plain `claude` if no prior session exists.
`--continue` exits non-zero when an agent has been spun up
but never typed at — there's no transcript to resume. The
shell-level `||` wrapper makes that case start a fresh
session instead of crashing the pane. The trade-off: we
invoke `sh -c` inside the bottle, so the command is two
`claude` invocations behind a tiny shell rather than one
direct exec. Acceptable; the shell adds microseconds and
the fallback only kicks in when --continue would have
failed anyway.
Works across backends because `bottle.agent_argv` always
surfaces the `claude` token preceded by the backend's exec
framing (docker: `docker exec -it <c>`; smolmachines:
`smolvm machine exec --name <m> -- runuser -u node --`).
Splitting at `claude` keeps the framing as the prefix and
wraps just the agent tail in `sh -c`."""
if agent_provider_template != "claude":
return bottle.agent_argv(
_agent_runtime_args(
resume=True,
remote_control=remote_control,
agent_provider_template=agent_provider_template,
)
)
base_args = _agent_runtime_args(
resume=False,
remote_control=remote_control,
agent_provider_template=agent_provider_template,
)
base_exec = bottle.agent_argv(base_args)
# Split exec-framing prefix from the agent-and-args tail so
# we can compose `<claude…> --continue || <claude…>` inside
# `sh -c`. The provider command token is the marker.
command = getattr(bottle, "agent_command", runtime_for(agent_provider_template).command)
agent_idx = base_exec.index(command)
prefix = base_exec[:agent_idx]
agent_cmd = " ".join(shlex.quote(a) for a in base_exec[agent_idx:])
resume_args = " ".join(
shlex.quote(a) for a in runtime_for(agent_provider_template).resume_args
)
return [
*prefix,
"sh", "-c",
f"{agent_cmd} {resume_args} || {agent_cmd}",
]
def _build_split_pane_argv(agent_argv: list[str]) -> list[str]:
"""Pure helper: wrap a backend-exec argv with `tmux split-window
-h -P -F '#{pane_id}'`. The `-P -F` combo tells tmux to print
the new pane's id on stdout so we can track it for later
`respawn-pane` calls."""
return [
"tmux", "split-window", "-h",
"-P", "-F", "#{pane_id}",
*agent_argv,
]
def _build_respawn_pane_argv(pane_id: str, agent_argv: list[str]) -> list[str]:
"""Pure helper: wrap a backend-exec argv with `tmux respawn-pane
-k -t <pane_id>`. `-k` kills the existing process in the pane
before respawning."""
return ["tmux", "respawn-pane", "-k", "-t", pane_id, *agent_argv]
+620
View File
@@ -0,0 +1,620 @@
"""supervise: list pending supervise proposals across all bottles and
act on them (approve / modify / reject).
Curses-based TUI; modify-then-approve shells out to $EDITOR. The
approval handlers wire to the per-tool remediation engines:
PRD 0014 (egress, retargeted from cred-proxy in PRD 0017
chunk 3) writes routes.yaml + SIGHUPs egress; PRD 0015
(pipelock) writes the allowlist + restarts pipelock; PRD 0016
(capability) rebuilds the bottle Dockerfile.
"""
from __future__ import annotations
import argparse
import curses
import os
import subprocess
import sys
import tempfile
import time
import traceback
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from .. import supervise as _supervise
from ..backend.docker.bottle_state import read_metadata
from ..backend.docker.capability_apply import (
CapabilityApplyError,
apply_capability_change,
)
from ..backend.docker.egress_apply import EgressApplyError, add_route
from ..backend.docker.pipelock_apply import (
PipelockApplyError,
apply_allowlist_change,
fetch_current_allowlist,
parse_allowlist_content,
render_allowlist_content,
)
from ..log import Die, error, info
from ..supervise import (
COMPONENT_FOR_TOOL,
AuditEntry,
Proposal,
Response,
STATUS_APPROVED,
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
TOOL_PIPELOCK_BLOCK,
archive_proposal,
list_pending_proposals,
render_diff,
write_audit_entry,
write_response,
)
from ._common import PROG
_REFRESH_INTERVAL_MS = 1000
_NEW_PROPOSAL_HIGHLIGHT_SEC = 5.0
@dataclass(frozen=True)
class QueuedProposal:
"""A pending proposal plus the queue dir it was found in."""
proposal: Proposal
queue_dir: Path
# Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses.
ApplyError = (EgressApplyError, PipelockApplyError, CapabilityApplyError)
def discover_pending() -> list[QueuedProposal]:
"""Walk ~/.bot-bottle/queue/* and collect pending proposals."""
queue_root = _supervise.bot_bottle_root() / "queue"
if not queue_root.is_dir():
return []
out: list[QueuedProposal] = []
for slug_dir in sorted(queue_root.iterdir()):
if not slug_dir.is_dir():
continue
for proposal in list_pending_proposals(slug_dir):
out.append(QueuedProposal(proposal=proposal, queue_dir=slug_dir))
out.sort(key=lambda q: q.proposal.arrival_timestamp)
return out
def _approval_status(qp: QueuedProposal, verb: str) -> str:
"""Status-line text after a successful approval."""
base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
return base
def _is_recent(
proposal_id: str,
first_seen: dict[str, float] | None,
now: float | None,
) -> bool:
"""True if `proposal_id` was first seen within the highlight window."""
if first_seen is None or now is None:
return False
started = first_seen.get(proposal_id)
if started is None:
return False
return (now - started) < _NEW_PROPOSAL_HIGHLIGHT_SEC
def _detail_lines(
qp: QueuedProposal,
*,
green_attr: int = 0,
) -> list[tuple[str, int]]:
"""Return the detail-view body as (text, curses-attr) tuples."""
p = qp.proposal
out: list[tuple[str, int]] = [
(f"bottle: {p.bottle_slug}", 0),
(f"tool: {p.tool}", 0),
(f"id: {p.id}", 0),
(f"arrived: {p.arrival_timestamp}", 0),
(f"queue: {qp.queue_dir}", 0),
("", 0),
("justification:", 0),
]
out.extend((" " + line, 0) for line in p.justification.splitlines() or [""])
out.extend([
("", 0),
(_proposed_payload_label(p.tool) + ":", 0),
])
out.extend((line, 0) for line in p.proposed_file.splitlines() or [""])
if p.tool == TOOL_PIPELOCK_BLOCK:
host = _failed_url_host(p.proposed_file)
if host:
out.append(("", 0))
out.append((host, green_attr))
return out
def _failed_url_host(url: str) -> str:
"""Best-effort hostname extraction from a pipelock-block proposal."""
import urllib.parse
try:
return urllib.parse.urlsplit(url.strip()).hostname or ""
except ValueError:
return ""
def _proposed_payload_label(tool: str) -> str:
if tool == TOOL_PIPELOCK_BLOCK:
return "failed URL"
return "proposed file"
def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
return ".txt"
# --- Operator actions ------------------------------------------------------
def approve(
qp: QueuedProposal,
*,
notes: str = "",
final_file: str | None = None,
) -> None:
"""Apply the proposal, write the waiting response, and audit it."""
status = STATUS_MODIFIED if final_file is not None else STATUS_APPROVED
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
diff_before, diff_after = "", ""
if qp.proposal.tool == TOOL_EGRESS_BLOCK:
diff_before, diff_after = add_route(
qp.proposal.bottle_slug, file_to_apply,
)
elif qp.proposal.tool == TOOL_PIPELOCK_BLOCK:
diff_before, diff_after = _apply_pipelock_url(
qp.proposal.bottle_slug, file_to_apply,
)
elif qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
_meta = read_metadata(qp.proposal.bottle_slug)
if _meta is not None and not _meta.compose_project:
raise CapabilityApplyError(
"capability-block remediation is not supported for smolmachines "
"bottles. Reject this proposal or handle the capability change "
"manually, then restart the bottle."
)
diff_before, diff_after = apply_capability_change(
qp.proposal.bottle_slug, file_to_apply,
)
response = Response(
proposal_id=qp.proposal.id,
status=status,
notes=notes,
final_file=final_file,
)
write_response(qp.queue_dir, response)
_write_audit(
qp, action=status, notes=notes,
diff_before=diff_before, diff_after=diff_after,
)
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
archive_proposal(qp.queue_dir, qp.proposal.id)
def reject(qp: QueuedProposal, *, reason: str) -> None:
"""Write a rejection response and an audit entry."""
response = Response(
proposal_id=qp.proposal.id,
status=STATUS_REJECTED,
notes=reason,
final_file=None,
)
write_response(qp.queue_dir, response)
_write_audit(qp, action=STATUS_REJECTED, notes=reason, diff_before="", diff_after="")
def _apply_pipelock_url(slug: str, failed_url: str) -> tuple[str, str]:
"""Merge a pipelock-block failed URL's host into the allowlist."""
import urllib.parse
parsed = urllib.parse.urlsplit(failed_url.strip())
host = parsed.hostname or ""
if not host:
raise PipelockApplyError(
f"proposed failed_url has no extractable host: {failed_url!r}"
)
current = fetch_current_allowlist(slug)
hosts = parse_allowlist_content(current)
if host not in hosts:
hosts.append(host)
return apply_allowlist_change(slug, render_allowlist_content(hosts))
def _write_audit(
qp: QueuedProposal,
*,
action: str,
notes: str,
diff_before: str,
diff_after: str,
) -> None:
"""Audit log for egress / pipelock tools."""
component = COMPONENT_FOR_TOOL.get(qp.proposal.tool)
if component is None:
return
write_audit_entry(AuditEntry(
timestamp=datetime.now(timezone.utc).isoformat(),
bottle_slug=qp.proposal.bottle_slug,
component=component,
operator_action=action,
operator_notes=notes,
justification=qp.proposal.justification,
diff=render_diff(diff_before, diff_after, label=component),
))
# --- $EDITOR integration --------------------------------------------------
def edit_in_editor(content: str, *, suffix: str = ".tmp") -> str | None:
"""Open `content` in $EDITOR and return edited content, if changed."""
editor = os.environ.get("EDITOR", "vim")
with tempfile.NamedTemporaryFile(
mode="w", suffix=suffix, delete=False, prefix="supervise-modify.",
) as f:
f.write(content)
path = f.name
try:
subprocess.run([editor, path], check=False)
with open(path) as f:
edited = f.read()
return edited if edited != content else None
finally:
try:
os.unlink(path)
except OSError:
pass
# --- TUI -------------------------------------------------------------------
def cmd_supervise(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} supervise", add_help=True)
parser.add_argument(
"--once", action="store_true",
help="list pending proposals once and exit (no TUI)",
)
args = parser.parse_args(argv)
if args.once:
return _list_once()
try:
curses.wrapper(_main_loop)
except KeyboardInterrupt:
return 130
except Die as e:
if e.message:
error(e.message)
else:
error("supervise exited on a fatal error (no detail captured).")
return e.code if isinstance(e.code, int) else 1
except Exception as e:
log_path = _write_crash_log(e)
error(f"supervise crashed: {type(e).__name__}: {e}")
error(f"full traceback written to {log_path}")
return 1
return 0
def _write_crash_log(exc: BaseException) -> Path:
"""Persist `exc`'s traceback to a stable file under ~/.bot-bottle/."""
stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
body = "".join(
traceback.format_exception(type(exc), exc, exc.__traceback__)
)
entry = f"=== supervise crash {stamp} ===\n{body}\n"
try:
log_dir = _supervise.bot_bottle_root() / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
path = log_dir / "supervise-crash.log"
with path.open("a", encoding="utf-8") as fh:
fh.write(entry)
return path
except OSError:
fd, tmp = tempfile.mkstemp(
prefix="bot-bottle-supervise-crash-", suffix=".log",
)
with os.fdopen(fd, "w", encoding="utf-8") as fh:
fh.write(entry)
return Path(tmp)
def _list_once() -> int:
pending = discover_pending()
if not pending:
info("no pending proposals")
return 0
for qp in pending:
sys.stdout.write(
f"{qp.proposal.arrival_timestamp} "
f"[{qp.proposal.bottle_slug}] "
f"{qp.proposal.tool} "
f"{qp.proposal.id}\n"
)
sys.stdout.write(f" {qp.proposal.justification}\n")
return 0
def _try_init_green() -> int:
"""Initialise a green color pair and return its attr, or 0."""
try:
curses.start_color()
curses.use_default_colors()
curses.init_pair(1, curses.COLOR_GREEN, -1)
return curses.color_pair(1)
except curses.error:
return 0
def _in_tmux() -> bool:
return bool(os.environ.get("TMUX"))
def _select_tmux_pane(pane_id: str) -> None:
try:
subprocess.run(
["tmux", "select-pane", "-t", pane_id],
capture_output=True, text=True, check=False,
)
except FileNotFoundError:
pass
def _main_loop(stdscr: "curses._CursesWindow") -> None:
curses.curs_set(0)
stdscr.timeout(_REFRESH_INTERVAL_MS)
green_attr = _try_init_green()
first_seen: dict[str, float] = {}
selected = 0
status_line = ""
saw_first_tick = False
supervise_pane_id = os.environ.get("TMUX_PANE", "")
while True:
pending = discover_pending()
if selected >= len(pending):
selected = max(0, len(pending) - 1)
now = time.monotonic()
live_ids = {qp.proposal.id for qp in pending}
newly_arrived = live_ids - first_seen.keys()
if saw_first_tick and newly_arrived:
try:
curses.beep()
except curses.error:
pass
if supervise_pane_id and _in_tmux():
_select_tmux_pane(supervise_pane_id)
for i, qp in enumerate(pending):
if qp.proposal.id in newly_arrived:
selected = i
break
for proposal_id in live_ids:
first_seen.setdefault(proposal_id, now)
for stale_id in list(first_seen.keys() - live_ids):
del first_seen[stale_id]
saw_first_tick = True
_render(
stdscr, pending, selected, status_line,
first_seen=first_seen, now=now, green_attr=green_attr,
)
try:
key = stdscr.getch()
except KeyboardInterrupt:
return
if key == -1:
continue
status_line = ""
if key in (ord("q"), 27):
return
if not pending:
continue
qp = pending[selected]
if key in (curses.KEY_DOWN, ord("j")):
selected = min(selected + 1, len(pending) - 1)
elif key in (curses.KEY_UP, ord("k")):
selected = max(selected - 1, 0)
elif key in (curses.KEY_ENTER, 10, 13):
_detail_view(stdscr, qp, green_attr=green_attr)
elif key == ord("a"):
try:
approve(qp)
status_line = _approval_status(qp, "approved")
except ApplyError as e:
status_line = f"apply failed: {e}"
elif key == ord("m"):
edited = _modify(stdscr, qp)
if edited is None:
status_line = "modify aborted (no change)"
else:
try:
approve(qp, final_file=edited, notes="operator modified before approving")
status_line = _approval_status(qp, "modified+approved")
except ApplyError as e:
status_line = f"apply failed: {e}"
elif key == ord("r"):
reason = _prompt(stdscr, "reject reason: ")
if reason:
reject(qp, reason=reason)
status_line = f"rejected {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
else:
status_line = "reject aborted (empty reason)"
def _render(
stdscr: "curses._CursesWindow",
pending: list[QueuedProposal],
selected: int,
status_line: str,
*,
first_seen: dict[str, float] | None = None,
now: float | None = None,
green_attr: int = 0,
) -> None:
stdscr.erase()
h, w = stdscr.getmaxyx()
header = f"bot-bottle supervise ({len(pending)} pending)"
stdscr.addnstr(0, 0, header, w - 1, curses.A_BOLD)
stdscr.hline(1, 0, curses.ACS_HLINE, w)
row = 2
if not pending:
stdscr.addnstr(
row, 2,
"no pending proposals; agents will queue here when they call a "
"supervise tool",
w - 4,
)
else:
for i, qp in enumerate(pending):
if row >= h - 3:
break
p = qp.proposal
ts_short = (
p.arrival_timestamp.split("T", 1)[1][:8]
if "T" in p.arrival_timestamp else p.arrival_timestamp
)
cursor = "> " if i == selected else " "
line = (
f"{cursor}{ts_short} "
f"[{p.bottle_slug}] {p.tool:<18} {p.id[:8]} "
f"{_proposed_payload_label(p.tool)}"
)
attr = curses.A_REVERSE if i == selected else curses.A_NORMAL
if _is_recent(p.id, first_seen, now):
attr |= green_attr
stdscr.addnstr(row, 0, line, w - 1, attr)
row += 1
if row >= h - 3:
break
if p.justification:
stdscr.addnstr(row, 4, p.justification[: max(0, w - 5)], w - 5)
row += 1
footer = "[j/k] move [Enter] view [a] approve [m] modify [r] reject [q] quit"
stdscr.hline(h - 2, 0, curses.ACS_HLINE, w)
stdscr.addnstr(h - 1, 0, footer, w - 1, curses.A_DIM)
if status_line:
stdscr.addnstr(h - 3, 0, status_line, w - 1, curses.A_BOLD)
stdscr.refresh()
def _detail_view(
stdscr: "curses._CursesWindow",
qp: QueuedProposal,
*,
green_attr: int = 0,
) -> None:
"""Render the full proposal. Scrollable. Press q to return."""
lines = _detail_lines(qp, green_attr=green_attr)
offset = 0
while True:
stdscr.erase()
h, w = stdscr.getmaxyx()
for i, (text, attr) in enumerate(lines[offset:offset + h - 1]):
stdscr.addnstr(i, 0, text, w - 1, attr)
stdscr.addnstr(
h - 1, 0,
"[j/k] scroll [g/G] top/bottom [a] approve [m] modify [r] reject [q] back",
w - 1, curses.A_DIM,
)
stdscr.refresh()
key = stdscr.getch()
if key in (ord("q"), 27):
return
if key in (curses.KEY_DOWN, ord("j")):
offset = min(offset + 1, max(0, len(lines) - 1))
elif key in (curses.KEY_UP, ord("k")):
offset = max(offset - 1, 0)
elif key == ord("g"):
offset = 0
elif key == ord("G"):
offset = max(0, len(lines) - 1)
elif key == ord("a"):
try:
approve(qp)
except ApplyError:
pass
return
elif key == ord("m"):
edited = _modify(stdscr, qp)
if edited is not None:
try:
approve(qp, final_file=edited, notes="operator modified before approving")
except ApplyError:
pass
return
elif key == ord("r"):
reason = _prompt(stdscr, "reject reason: ")
if reason:
reject(qp, reason=reason)
return
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
"""Suspend curses, open $EDITOR on the proposed file, return edited content."""
suffix = _suffix_for_tool(qp.proposal.tool)
curses.endwin()
try:
edited = edit_in_editor(qp.proposal.proposed_file, suffix=suffix)
finally:
stdscr.refresh()
return edited
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:
"""One-line input at the bottom of the screen."""
curses.curs_set(1)
h, _ = stdscr.getmaxyx()
stdscr.move(h - 2, 0)
stdscr.clrtoeol()
stdscr.addstr(h - 2, 0, label)
stdscr.refresh()
curses.echo()
try:
raw = stdscr.getstr(h - 2, len(label), 200)
finally:
curses.noecho()
curses.curs_set(0)
return raw.decode("utf-8", errors="replace").strip()
__all__ = [
"QueuedProposal",
"approve",
"cmd_supervise",
"discover_pending",
"edit_in_editor",
"reject",
]