From f0ca4e352770ea8902d3d452a67a1f38ba45c2f8 Mon Sep 17 00:00:00 2001 From: claude Date: Wed, 3 Jun 2026 15:52:27 +0000 Subject: [PATCH] refactor: extract dashboard state/model layer into dashboard_model.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Splits the 2103-line dashboard.py into two modules. Pure data structures (QueuedProposal), discovery helpers (discover_pending, discover_active_agents), derived-value helpers (_is_recent, _approval_status, _format_agent_row, _detail_lines, etc.), and argv-builder helpers (_build_split_pane_argv, _build_respawn_pane_argv, _build_resume_argv_with_fallback, _agent_runtime_args) all move to dashboard_model.py. The curses TUI, $EDITOR integration, tmux subprocess flows, and action handlers (approve, reject, operator_edit_routes, operator_edit_allowlist) remain in dashboard.py, which re-imports everything from dashboard_model so existing callers and tests are unaffected. Adds tests/unit/test_dashboard_model.py covering _approval_status, _proposed_payload_label, and _suffix_for_tool — three helpers that had no prior coverage. All 894 unit tests pass. Closes #158 --- bot_bottle/cli/dashboard.py | 418 ++-------------------------- bot_bottle/cli/dashboard_model.py | 421 +++++++++++++++++++++++++++++ tests/unit/test_dashboard_model.py | 94 +++++++ 3 files changed, 543 insertions(+), 390 deletions(-) create mode 100644 bot_bottle/cli/dashboard_model.py create mode 100644 tests/unit/test_dashboard_model.py diff --git a/bot_bottle/cli/dashboard.py b/bot_bottle/cli/dashboard.py index 9e225ee..edcad09 100644 --- a/bot_bottle/cli/dashboard.py +++ b/bot_bottle/cli/dashboard.py @@ -15,31 +15,27 @@ import argparse import contextlib import curses import os -import shlex import shutil import subprocess import sys import tempfile import time import traceback -from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from .. import supervise as _supervise -from ..agent_provider import runtime_for from ..backend import ( ActiveAgent, BottleSpec, - enumerate_active_agents, get_bottle_backend, known_backend_names, ) +from ..backend.docker.bottle_state import bottle_state_dir, read_metadata from ..backend.docker.capability_apply import ( CapabilityApplyError, apply_capability_change, ) -from ..backend.docker.bottle_state import bottle_state_dir, read_metadata from ..backend.docker.egress_apply import ( EgressApplyError, add_route, @@ -68,12 +64,38 @@ from ..supervise import ( TOOL_EGRESS_BLOCK, TOOL_PIPELOCK_BLOCK, archive_proposal, - list_pending_proposals, render_diff, write_audit_entry, write_response, ) from ._common import PROG, USER_CWD +from .dashboard_model import ( + PANE_AGENTS, + PANE_PROPOSALS, + QueuedProposal, + _NEW_PROPOSAL_HIGHLIGHT_SEC, + _REFRESH_INTERVAL_MS, + _agent_runtime_args, + _approval_status, + _bottle_for_slug, + _build_respawn_pane_argv, + _build_resume_argv_with_fallback, + _build_split_pane_argv, + _detail_lines, + _failed_url_host, + _filter_agents, + _format_agent_row, + _in_tmux, + _is_recent, + _pick_next_after_stop, + _proposed_payload_label, + _running_counts, + _selected_agent, + _selection_status, + _suffix_for_tool, + discover_active_agents, + discover_pending, +) from .start import ( attach_agent, capture_claude_session_state, @@ -88,55 +110,6 @@ from .start import ( ApplyError = (EgressApplyError, PipelockApplyError, CapabilityApplyError) -# --- Discovery ------------------------------------------------------------- - - -@dataclass(frozen=True) -class QueuedProposal: - """A pending proposal plus the queue dir it was found in.""" - - proposal: Proposal - queue_dir: Path - - -def discover_active_agents() -> list[ActiveAgent]: - """All currently-running agents across every backend with - their metadata + service set. Returns [] when neither - backend is reachable. Backed by the shared - `enumerate_active_agents` helper so the CLI's - `./cli.py list active` and this dashboard show the same data.""" - return enumerate_active_agents() - - - - -def _approval_status(qp: QueuedProposal, verb: str) -> str: - """Status-line text after a successful approval. For capability- - block, append the `resume ` hint so the operator can - bring the rebuilt bottle back up with one copy-paste.""" - base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]" - if qp.proposal.tool == TOOL_CAPABILITY_BLOCK: - return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}" - return base - - -def discover_pending() -> list[QueuedProposal]: - """Walk ~/.bot-bottle/queue/* and collect pending proposals - from every bottle's queue. Sorted by arrival time across the - union — the operator works the global FIFO.""" - queue_root = _supervise.bot_bottle_root() / "queue" - if not queue_root.is_dir(): - return [] - out: list[QueuedProposal] = [] - for slug_dir in sorted(queue_root.iterdir()): - if not slug_dir.is_dir(): - continue - for proposal in list_pending_proposals(slug_dir): - out.append(QueuedProposal(proposal=proposal, queue_dir=slug_dir)) - out.sort(key=lambda q: q.proposal.arrival_timestamp) - return out - - # --- Operator actions ------------------------------------------------------ @@ -359,15 +332,6 @@ def edit_in_editor(content: str, *, suffix: str = ".tmp") -> str | None: # loop's `bottles` dict; chunks 3/4 wire Enter / `x` to act on it. -def _filter_agents(query: str, names: list[str]) -> list[str]: - """Case-insensitive substring filter for the picker. Pure - function — no curses, easy to unit-test.""" - if not query: - return list(names) - q = query.lower() - return [n for n in names if q in n.lower()] - - def _picker_modal( stdscr: "curses._CursesWindow", names: list[str], @@ -627,63 +591,6 @@ def _capture_preflight_text(plan) -> str: return buf.getvalue().strip("\n") -def _running_counts( - bottles: dict, agents_now: list[ActiveAgent], -) -> dict[str, int]: - """Per-agent running count: dashboard-owned + externally- - discovered, summed by agent_name. The picker shows this so the - operator knows whether picking an agent starts a fresh bottle - or a Nth one.""" - counts: dict[str, int] = {} - for a in agents_now: - counts[a.agent_name] = counts.get(a.agent_name, 0) + 1 - return counts - - -def _bottle_for_slug( - slug: str, - bottles: dict, - manifest: Manifest | None, -) -> tuple["object", str]: - """Return `(bottle_handle, prompt_path_hint)` for a re-attach. - If the slug is in `bottles` (dashboard-owned), return the stored - handle directly. Otherwise synthesize a bottle from the persisted - metadata. The backend field in metadata (PRD 0040) selects Docker - or smolmachines; unknown or missing metadata defaults to Docker. - - Returns the empty string for prompt_path_hint when we omit the - flag — the caller passes None to DockerBottle in that case.""" - from ..backend.docker.bottle import DockerBottle - from ..backend.docker.bottle_state import read_metadata - from ..backend.smolmachines.bottle import SmolmachinesBottle - if slug in bottles: - _cm, bottle, _identity = bottles[slug] - return bottle, "" - instance_name = f"bot-bottle-{slug}" - prompt_path: str | None = None - metadata = read_metadata(slug) - if metadata is not None and manifest is not None: - agent = manifest.agents.get(metadata.agent_name) - if agent is not None and agent.prompt: - container_home = os.environ.get( - "BOT_BOTTLE_CONTAINER_HOME", "/home/node", - ) - prompt_path = f"{container_home}/.bot-bottle-prompt.txt" - backend = metadata.backend if metadata is not None else "" - if backend == "smolmachines": - synth: object = SmolmachinesBottle( - instance_name, - prompt_path=prompt_path, - ) - else: - synth = DockerBottle( - container=instance_name, - teardown=lambda: None, - prompt_path_in_container=prompt_path, - ) - return synth, (prompt_path or "") - - def _stop_bottle_flow( stdscr: "curses._CursesWindow", bottles: dict, @@ -770,100 +677,6 @@ def _stop_bottle_flow( # reused across attaches. -def _in_tmux() -> bool: - """True when the dashboard is running inside a tmux session. - Tmux sets `$TMUX` to the path of its server socket.""" - return bool(os.environ.get("TMUX")) - - -def _agent_runtime_args( - *, resume: bool, remote_control: bool = False, agent_provider_template: str = "claude", -) -> list[str]: - """The argv the dashboard hands to `bottle.agent_argv` - on every attach — matches what `attach_agent` builds for the - foreground handoff so both surfaces produce the same claude - invocation.""" - runtime = runtime_for(agent_provider_template) - args = list(runtime.bypass_args) - if remote_control: - args.extend(runtime.remote_control_args) - if resume: - args.extend(runtime.resume_args) - return args - - -def _build_resume_argv_with_fallback( - bottle, *, remote_control: bool = False, agent_provider_template: str = "claude", -) -> list[str]: - """Build a backend-exec argv that runs `claude --continue` and - falls back to plain `claude` if no prior session exists. - - `--continue` exits non-zero when an agent has been spun up - but never typed at — there's no transcript to resume. The - shell-level `||` wrapper makes that case start a fresh - session instead of crashing the pane. The trade-off: we - invoke `sh -c` inside the bottle, so the command is two - `claude` invocations behind a tiny shell rather than one - direct exec. Acceptable; the shell adds microseconds and - the fallback only kicks in when --continue would have - failed anyway. - - Works across backends because `bottle.agent_argv` always - surfaces the `claude` token preceded by the backend's exec - framing (docker: `docker exec -it `; smolmachines: - `smolvm machine exec --name -- runuser -u node --`). - Splitting at `claude` keeps the framing as the prefix and - wraps just the agent tail in `sh -c`.""" - if agent_provider_template != "claude": - return bottle.agent_argv( - _agent_runtime_args( - resume=True, - remote_control=remote_control, - agent_provider_template=agent_provider_template, - ) - ) - base_args = _agent_runtime_args( - resume=False, - remote_control=remote_control, - agent_provider_template=agent_provider_template, - ) - base_exec = bottle.agent_argv(base_args) - # Split exec-framing prefix from the agent-and-args tail so - # we can compose ` --continue || ` inside - # `sh -c`. The provider command token is the marker. - command = getattr(bottle, "agent_command", runtime_for(agent_provider_template).command) - agent_idx = base_exec.index(command) - prefix = base_exec[:agent_idx] - agent_cmd = " ".join(shlex.quote(a) for a in base_exec[agent_idx:]) - resume_args = " ".join( - shlex.quote(a) for a in runtime_for(agent_provider_template).resume_args - ) - return [ - *prefix, - "sh", "-c", - f"{agent_cmd} {resume_args} || {agent_cmd}", - ] - - -def _build_split_pane_argv(agent_argv: list[str]) -> list[str]: - """Pure helper: wrap a backend-exec argv with `tmux split-window - -h -P -F '#{pane_id}'`. The `-P -F` combo tells tmux to print - the new pane's id on stdout so we can track it for later - `respawn-pane` calls.""" - return [ - "tmux", "split-window", "-h", - "-P", "-F", "#{pane_id}", - *agent_argv, - ] - - -def _build_respawn_pane_argv(pane_id: str, agent_argv: list[str]) -> list[str]: - """Pure helper: wrap a backend-exec argv with `tmux respawn-pane - -k -t `. `-k` kills the existing process in the pane - before respawning.""" - return ["tmux", "respawn-pane", "-k", "-t", pane_id, *agent_argv] - - @contextlib.contextmanager def _redirect_stderr_to_file(path): """Redirect file descriptor 2 (stderr) to `path` for the @@ -980,24 +793,6 @@ def _tmux_close_right_pane(tmux_state: dict) -> None: tmux_state["slug"] = None -def _pick_next_after_stop( - agents_before: list[ActiveAgent], - selected_index: int, - stopped_slug: str, -) -> tuple[int, ActiveAgent] | None: - """After stopping `stopped_slug` from the agents list, choose - the agent that should take focus next. The agent below the - stopped row (which slides up to fill its index) is the - natural pick; if the stopped agent was last, the row above - instead. Returns (new_index, agent) or None if no agents - remain. Pure — easy to unit-test.""" - new_agents = [a for a in agents_before if a.slug != stopped_slug] - if not new_agents: - return None - new_index = min(max(selected_index, 0), len(new_agents) - 1) - return new_index, new_agents[new_index] - - def _ensure_right_pane(tmux_state: dict, argv: list[str]) -> str | None: """Run `argv` in the dashboard's right pane — respawn an existing tracked pane if one is alive, split-window to @@ -1355,31 +1150,6 @@ def _list_once() -> int: return 0 -_REFRESH_INTERVAL_MS = 1000 - -# How long a newly-arrived proposal stays highlighted (green) in the -# list. Long enough for the operator to notice in their peripheral -# vision, short enough to fade before the queue feels permanently -# noisy. -_NEW_PROPOSAL_HIGHLIGHT_SEC = 5.0 - - -def _is_recent( - proposal_id: str, - first_seen: dict[str, float] | None, - now: float | None, -) -> bool: - """True if `proposal_id` was first seen within the highlight - window. Both `first_seen` and `now` may be None (rendered as - not-recent) so the helper is safe in cold-start paths.""" - if first_seen is None or now is None: - return False - started = first_seen.get(proposal_id) - if started is None: - return False - return (now - started) < _NEW_PROPOSAL_HIGHLIGHT_SEC - - def _try_init_green() -> int: """Initialise a green color pair and return its attr, or 0 if the terminal doesn't support color. Caller ORs the returned value @@ -1417,14 +1187,6 @@ def _quit_without_teardown(bottles: dict) -> None: os._exit(0) -# PRD 0019 chunk 3: which pane the j/k/arrow keys move through. -# Tab toggles. The proposals pane is the default focus — proposal -# action keys (a/m/r/Enter) require it; agent-scoped keys (e/p, -# chunk 4) require the agents pane. -PANE_PROPOSALS = "proposals" -PANE_AGENTS = "agents" - - def _main_loop(stdscr: "curses._CursesWindow") -> None: curses.curs_set(0) # Auto-refresh: getch() returns -1 after the timeout if no key @@ -1811,63 +1573,6 @@ def _render( stdscr.refresh() -def _selection_status( - focus: str, agents: list[ActiveAgent], selected_agent: int, -) -> str: - """Status-line text for the idle state. Surfaces the agents- - pane selection so the operator can tell what an agent-scoped - edit verb would target.""" - if focus != PANE_AGENTS: - return "" - if not agents: - return "[no active agents]" - if 0 <= selected_agent < len(agents): - return f"[selected: {agents[selected_agent].slug}]" - return "[no agent selected]" - - -def _selected_agent( - focus: str, agents: list[ActiveAgent], selected_agent: int, -) -> ActiveAgent | None: - """The selected agent to scope `e` / `p` to, or None if no - selection is valid (proposals pane focused, no active agents, - or selection out of bounds).""" - if focus != PANE_AGENTS: - return None - if not agents: - return None - if 0 <= selected_agent < len(agents): - return agents[selected_agent] - return None - - -def _format_agent_row(a: ActiveAgent, maxw: int) -> str: - """One-line agent row: ` [] started - []`. The `agent` service is filtered out of - the displayed list — it's always present for an active bottle, - so listing it carries no information; the sidecars are the - differentiator. - - The `[docker]` / `[smolmachines]` prefix lets the operator tell - which backend a bottle came from (issue #77). Truncated to - `maxw` because the renderer's addnstr only enforces width if - we hand it a properly-sized string.""" - started = ( - a.started_at.split("T", 1)[1][:8] - if "T" in a.started_at else (a.started_at or "?") - ) - sidecars = tuple(s for s in a.services if s != "agent") - services = ",".join(sidecars) if sidecars else "(starting)" - backend_tag = f"[{a.backend_name}]" if a.backend_name else "" - line = ( - f" {backend_tag} {a.slug} {a.agent_name} " - f"started {started} [{services}]" - ) - if len(line) > maxw: - return line[: max(0, maxw - 1)] + "…" - return line - - def _detail_view( stdscr: "curses._CursesWindow", qp: QueuedProposal, @@ -1921,66 +1626,6 @@ def _detail_view( return -def _detail_lines( - qp: QueuedProposal, - *, - green_attr: int = 0, -) -> list[tuple[str, int]]: - """Return the detail-view body as (text, curses-attr) tuples. - Most lines are plain (attr=0); pipelock-block proposals append - a green "→ would allow host: ..." line so the operator sees at - a glance which hostname will land in pipelock's allowlist if - they hit approve. The URL itself is shown above for context.""" - p = qp.proposal - out: list[tuple[str, int]] = [ - (f"bottle: {p.bottle_slug}", 0), - (f"tool: {p.tool}", 0), - (f"id: {p.id}", 0), - (f"arrived: {p.arrival_timestamp}", 0), - (f"queue: {qp.queue_dir}", 0), - ("", 0), - ("justification:", 0), - ] - out.extend((" " + line, 0) for line in p.justification.splitlines() or [""]) - out.extend([ - ("", 0), - (_proposed_payload_label(p.tool) + ":", 0), - ]) - out.extend((line, 0) for line in p.proposed_file.splitlines() or [""]) - if p.tool == TOOL_PIPELOCK_BLOCK: - host = _failed_url_host(p.proposed_file) - if host: - # Show the literal line that will be appended to the - # bottle's pipelock allowlist on approve. Green so it - # reads as "what changes"; the URL above carries the - # path context (which pipelock can't enforce — see the - # follow-up note on _apply_pipelock_url). - out.append(("", 0)) - out.append((host, green_attr)) - return out - - -def _failed_url_host(url: str) -> str: - """Best-effort hostname extraction from a pipelock-block proposal's - failed_url payload. Returns empty string on unparseable input — - callers handle empty as "nothing to highlight".""" - import urllib.parse - try: - return urllib.parse.urlsplit(url.strip()).hostname or "" - except ValueError: - return "" - - -def _proposed_payload_label(tool: str) -> str: - """The detail-view section heading for the proposal's payload — - `proposed_file` is what the dataclass calls it, but for - pipelock-block the payload is a single URL not a file. Render - the label per tool so the operator's eye matches.""" - if tool == TOOL_PIPELOCK_BLOCK: - return "failed URL" - return "proposed file" - - def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: """Suspend curses, open $EDITOR on the proposed file, return the edited content (or None if unchanged).""" @@ -1993,13 +1638,6 @@ def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: return edited -def _suffix_for_tool(tool: str) -> str: - if tool == TOOL_CAPABILITY_BLOCK: - return ".dockerfile" - # egress-block / pipelock-block: JSON-ish + plain. - return ".txt" - - def _operator_edit_routes_flow( stdscr: "curses._CursesWindow", agent: ActiveAgent, ) -> str: diff --git a/bot_bottle/cli/dashboard_model.py b/bot_bottle/cli/dashboard_model.py new file mode 100644 index 0000000..0a7b252 --- /dev/null +++ b/bot_bottle/cli/dashboard_model.py @@ -0,0 +1,421 @@ +"""dashboard_model: state/model layer for the dashboard TUI. + +Data structures, discovery queries, pure state helpers, and derived +values extracted from dashboard.py so they can be tested in isolation +and navigated without wading through curses rendering code. +""" + +from __future__ import annotations + +import os +import shlex +from dataclasses import dataclass +from pathlib import Path + +from .. import supervise as _supervise +from ..agent_provider import runtime_for +from ..backend import ActiveAgent, enumerate_active_agents +from ..backend.docker.capability_apply import CapabilityApplyError +from ..backend.docker.egress_apply import EgressApplyError +from ..backend.docker.pipelock_apply import PipelockApplyError +from ..manifest import Manifest +from ..supervise import ( + TOOL_CAPABILITY_BLOCK, + TOOL_PIPELOCK_BLOCK, + Proposal, + list_pending_proposals, +) + + +# --- Constants --------------------------------------------------------------- + + +_REFRESH_INTERVAL_MS = 1000 + +_NEW_PROPOSAL_HIGHLIGHT_SEC = 5.0 + +PANE_PROPOSALS = "proposals" +PANE_AGENTS = "agents" + + +# --- Data structures --------------------------------------------------------- + + +@dataclass(frozen=True) +class QueuedProposal: + """A pending proposal plus the queue dir it was found in.""" + + proposal: Proposal + queue_dir: Path + + +ApplyError = (EgressApplyError, PipelockApplyError, CapabilityApplyError) + + +# --- Discovery --------------------------------------------------------------- + + +def discover_active_agents() -> list[ActiveAgent]: + """All currently-running agents across every backend with + their metadata + service set. Returns [] when neither + backend is reachable. Backed by the shared + `enumerate_active_agents` helper so the CLI's + `./cli.py list active` and this dashboard show the same data.""" + return enumerate_active_agents() + + +def discover_pending() -> list[QueuedProposal]: + """Walk ~/.bot-bottle/queue/* and collect pending proposals + from every bottle's queue. Sorted by arrival time across the + union — the operator works the global FIFO.""" + queue_root = _supervise.bot_bottle_root() / "queue" + if not queue_root.is_dir(): + return [] + out: list[QueuedProposal] = [] + for slug_dir in sorted(queue_root.iterdir()): + if not slug_dir.is_dir(): + continue + for proposal in list_pending_proposals(slug_dir): + out.append(QueuedProposal(proposal=proposal, queue_dir=slug_dir)) + out.sort(key=lambda q: q.proposal.arrival_timestamp) + return out + + +# --- Derived values ---------------------------------------------------------- + + +def _approval_status(qp: QueuedProposal, verb: str) -> str: + """Status-line text after a successful approval. For capability- + block, append the `resume ` hint so the operator can + bring the rebuilt bottle back up with one copy-paste.""" + base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]" + if qp.proposal.tool == TOOL_CAPABILITY_BLOCK: + return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}" + return base + + +def _is_recent( + proposal_id: str, + first_seen: dict[str, float] | None, + now: float | None, +) -> bool: + """True if `proposal_id` was first seen within the highlight + window. Both `first_seen` and `now` may be None (rendered as + not-recent) so the helper is safe in cold-start paths.""" + if first_seen is None or now is None: + return False + started = first_seen.get(proposal_id) + if started is None: + return False + return (now - started) < _NEW_PROPOSAL_HIGHLIGHT_SEC + + +def _selection_status( + focus: str, agents: list[ActiveAgent], selected_agent: int, +) -> str: + """Status-line text for the idle state. Surfaces the agents- + pane selection so the operator can tell what an agent-scoped + edit verb would target.""" + if focus != PANE_AGENTS: + return "" + if not agents: + return "[no active agents]" + if 0 <= selected_agent < len(agents): + return f"[selected: {agents[selected_agent].slug}]" + return "[no agent selected]" + + +def _selected_agent( + focus: str, agents: list[ActiveAgent], selected_agent: int, +) -> ActiveAgent | None: + """The selected agent to scope `e` / `p` to, or None if no + selection is valid (proposals pane focused, no active agents, + or selection out of bounds).""" + if focus != PANE_AGENTS: + return None + if not agents: + return None + if 0 <= selected_agent < len(agents): + return agents[selected_agent] + return None + + +# --- Picker helpers ---------------------------------------------------------- + + +def _filter_agents(query: str, names: list[str]) -> list[str]: + """Case-insensitive substring filter for the picker. Pure + function — no curses, easy to unit-test.""" + if not query: + return list(names) + q = query.lower() + return [n for n in names if q in n.lower()] + + +def _running_counts( + bottles: dict, agents_now: list[ActiveAgent], +) -> dict[str, int]: + """Per-agent running count: dashboard-owned + externally- + discovered, summed by agent_name. The picker shows this so the + operator knows whether picking an agent starts a fresh bottle + or a Nth one.""" + counts: dict[str, int] = {} + for a in agents_now: + counts[a.agent_name] = counts.get(a.agent_name, 0) + 1 + return counts + + +# --- Agent-row rendering helpers --------------------------------------------- + + +def _format_agent_row(a: ActiveAgent, maxw: int) -> str: + """One-line agent row: ` [] started + []`. The `agent` service is filtered out of + the displayed list — it's always present for an active bottle, + so listing it carries no information; the sidecars are the + differentiator. + + The `[docker]` / `[smolmachines]` prefix lets the operator tell + which backend a bottle came from (issue #77). Truncated to + `maxw` because the renderer's addnstr only enforces width if + we hand it a properly-sized string.""" + started = ( + a.started_at.split("T", 1)[1][:8] + if "T" in a.started_at else (a.started_at or "?") + ) + sidecars = tuple(s for s in a.services if s != "agent") + services = ",".join(sidecars) if sidecars else "(starting)" + backend_tag = f"[{a.backend_name}]" if a.backend_name else "" + line = ( + f" {backend_tag} {a.slug} {a.agent_name} " + f"started {started} [{services}]" + ) + if len(line) > maxw: + return line[: max(0, maxw - 1)] + "…" + return line + + +# --- Detail-view helpers ----------------------------------------------------- + + +def _detail_lines( + qp: QueuedProposal, + *, + green_attr: int = 0, +) -> list[tuple[str, int]]: + """Return the detail-view body as (text, curses-attr) tuples. + Most lines are plain (attr=0); pipelock-block proposals append + a green "→ would allow host: ..." line so the operator sees at + a glance which hostname will land in pipelock's allowlist if + they hit approve. The URL itself is shown above for context.""" + p = qp.proposal + out: list[tuple[str, int]] = [ + (f"bottle: {p.bottle_slug}", 0), + (f"tool: {p.tool}", 0), + (f"id: {p.id}", 0), + (f"arrived: {p.arrival_timestamp}", 0), + (f"queue: {qp.queue_dir}", 0), + ("", 0), + ("justification:", 0), + ] + out.extend((" " + line, 0) for line in p.justification.splitlines() or [""]) + out.extend([ + ("", 0), + (_proposed_payload_label(p.tool) + ":", 0), + ]) + out.extend((line, 0) for line in p.proposed_file.splitlines() or [""]) + if p.tool == TOOL_PIPELOCK_BLOCK: + host = _failed_url_host(p.proposed_file) + if host: + out.append(("", 0)) + out.append((host, green_attr)) + return out + + +def _failed_url_host(url: str) -> str: + """Best-effort hostname extraction from a pipelock-block proposal's + failed_url payload. Returns empty string on unparseable input — + callers handle empty as "nothing to highlight".""" + import urllib.parse + try: + return urllib.parse.urlsplit(url.strip()).hostname or "" + except ValueError: + return "" + + +def _proposed_payload_label(tool: str) -> str: + """The detail-view section heading for the proposal's payload — + `proposed_file` is what the dataclass calls it, but for + pipelock-block the payload is a single URL not a file. Render + the label per tool so the operator's eye matches.""" + if tool == TOOL_PIPELOCK_BLOCK: + return "failed URL" + return "proposed file" + + +def _suffix_for_tool(tool: str) -> str: + if tool == TOOL_CAPABILITY_BLOCK: + return ".dockerfile" + return ".txt" + + +# --- Bottle/agent resolution ------------------------------------------------- + + +def _bottle_for_slug( + slug: str, + bottles: dict, + manifest: Manifest | None, +) -> tuple["object", str]: + """Return `(bottle_handle, prompt_path_hint)` for a re-attach. + If the slug is in `bottles` (dashboard-owned), return the stored + handle directly. Otherwise synthesize a bottle from the persisted + metadata. The backend field in metadata (PRD 0040) selects Docker + or smolmachines; unknown or missing metadata defaults to Docker. + + Returns the empty string for prompt_path_hint when we omit the + flag — the caller passes None to DockerBottle in that case.""" + from ..backend.docker.bottle import DockerBottle + from ..backend.docker.bottle_state import read_metadata + from ..backend.smolmachines.bottle import SmolmachinesBottle + if slug in bottles: + _cm, bottle, _identity = bottles[slug] + return bottle, "" + instance_name = f"bot-bottle-{slug}" + prompt_path: str | None = None + metadata = read_metadata(slug) + if metadata is not None and manifest is not None: + agent = manifest.agents.get(metadata.agent_name) + if agent is not None and agent.prompt: + container_home = os.environ.get( + "BOT_BOTTLE_CONTAINER_HOME", "/home/node", + ) + prompt_path = f"{container_home}/.bot-bottle-prompt.txt" + backend = metadata.backend if metadata is not None else "" + if backend == "smolmachines": + synth: object = SmolmachinesBottle( + instance_name, + prompt_path=prompt_path, + ) + else: + synth = DockerBottle( + container=instance_name, + teardown=lambda: None, + prompt_path_in_container=prompt_path, + ) + return synth, (prompt_path or "") + + +def _pick_next_after_stop( + agents_before: list[ActiveAgent], + selected_index: int, + stopped_slug: str, +) -> tuple[int, ActiveAgent] | None: + """After stopping `stopped_slug` from the agents list, choose + the agent that should take focus next. The agent below the + stopped row (which slides up to fill its index) is the + natural pick; if the stopped agent was last, the row above + instead. Returns (new_index, agent) or None if no agents + remain. Pure — easy to unit-test.""" + new_agents = [a for a in agents_before if a.slug != stopped_slug] + if not new_agents: + return None + new_index = min(max(selected_index, 0), len(new_agents) - 1) + return new_index, new_agents[new_index] + + +# --- tmux argv builders ------------------------------------------------------ + + +def _in_tmux() -> bool: + """True when the dashboard is running inside a tmux session. + Tmux sets `$TMUX` to the path of its server socket.""" + return bool(os.environ.get("TMUX")) + + +def _agent_runtime_args( + *, resume: bool, remote_control: bool = False, agent_provider_template: str = "claude", +) -> list[str]: + """The argv the dashboard hands to `bottle.agent_argv` + on every attach — matches what `attach_agent` builds for the + foreground handoff so both surfaces produce the same claude + invocation.""" + runtime = runtime_for(agent_provider_template) + args = list(runtime.bypass_args) + if remote_control: + args.extend(runtime.remote_control_args) + if resume: + args.extend(runtime.resume_args) + return args + + +def _build_resume_argv_with_fallback( + bottle, *, remote_control: bool = False, agent_provider_template: str = "claude", +) -> list[str]: + """Build a backend-exec argv that runs `claude --continue` and + falls back to plain `claude` if no prior session exists. + + `--continue` exits non-zero when an agent has been spun up + but never typed at — there's no transcript to resume. The + shell-level `||` wrapper makes that case start a fresh + session instead of crashing the pane. The trade-off: we + invoke `sh -c` inside the bottle, so the command is two + `claude` invocations behind a tiny shell rather than one + direct exec. Acceptable; the shell adds microseconds and + the fallback only kicks in when --continue would have + failed anyway. + + Works across backends because `bottle.agent_argv` always + surfaces the `claude` token preceded by the backend's exec + framing (docker: `docker exec -it `; smolmachines: + `smolvm machine exec --name -- runuser -u node --`). + Splitting at `claude` keeps the framing as the prefix and + wraps just the agent tail in `sh -c`.""" + if agent_provider_template != "claude": + return bottle.agent_argv( + _agent_runtime_args( + resume=True, + remote_control=remote_control, + agent_provider_template=agent_provider_template, + ) + ) + base_args = _agent_runtime_args( + resume=False, + remote_control=remote_control, + agent_provider_template=agent_provider_template, + ) + base_exec = bottle.agent_argv(base_args) + # Split exec-framing prefix from the agent-and-args tail so + # we can compose ` --continue || ` inside + # `sh -c`. The provider command token is the marker. + command = getattr(bottle, "agent_command", runtime_for(agent_provider_template).command) + agent_idx = base_exec.index(command) + prefix = base_exec[:agent_idx] + agent_cmd = " ".join(shlex.quote(a) for a in base_exec[agent_idx:]) + resume_args = " ".join( + shlex.quote(a) for a in runtime_for(agent_provider_template).resume_args + ) + return [ + *prefix, + "sh", "-c", + f"{agent_cmd} {resume_args} || {agent_cmd}", + ] + + +def _build_split_pane_argv(agent_argv: list[str]) -> list[str]: + """Pure helper: wrap a backend-exec argv with `tmux split-window + -h -P -F '#{pane_id}'`. The `-P -F` combo tells tmux to print + the new pane's id on stdout so we can track it for later + `respawn-pane` calls.""" + return [ + "tmux", "split-window", "-h", + "-P", "-F", "#{pane_id}", + *agent_argv, + ] + + +def _build_respawn_pane_argv(pane_id: str, agent_argv: list[str]) -> list[str]: + """Pure helper: wrap a backend-exec argv with `tmux respawn-pane + -k -t `. `-k` kills the existing process in the pane + before respawning.""" + return ["tmux", "respawn-pane", "-k", "-t", pane_id, *agent_argv] diff --git a/tests/unit/test_dashboard_model.py b/tests/unit/test_dashboard_model.py new file mode 100644 index 0000000..3f523bd --- /dev/null +++ b/tests/unit/test_dashboard_model.py @@ -0,0 +1,94 @@ +"""Unit: dashboard_model — state/model layer extracted from dashboard.py. + +Tests for functions that were previously buried in the 2103-line +dashboard.py and had no coverage: _approval_status, +_proposed_payload_label, and _suffix_for_tool.""" + +import unittest +from pathlib import Path + +from bot_bottle.cli.dashboard_model import ( + QueuedProposal, + _approval_status, + _proposed_payload_label, + _suffix_for_tool, +) +from bot_bottle.supervise import ( + Proposal, + TOOL_CAPABILITY_BLOCK, + TOOL_EGRESS_BLOCK, + TOOL_PIPELOCK_BLOCK, + sha256_hex, +) +from datetime import datetime, timezone + + +def _qp(tool: str, slug: str = "dev") -> QueuedProposal: + payload = "x" + p = Proposal.new( + bottle_slug=slug, + tool=tool, + proposed_file=payload, + justification="test", + current_file_hash=sha256_hex(payload), + now=datetime(2026, 6, 1, 0, 0, 0, tzinfo=timezone.utc), + ) + return QueuedProposal(proposal=p, queue_dir=Path("/tmp/q")) + + +class TestApprovalStatus(unittest.TestCase): + def test_egress_block_base_message(self): + qp = _qp(TOOL_EGRESS_BLOCK, slug="my-bot") + msg = _approval_status(qp, "approved") + self.assertEqual("approved egress-block for [my-bot]", msg) + + def test_modified_verb(self): + qp = _qp(TOOL_PIPELOCK_BLOCK, slug="dev") + msg = _approval_status(qp, "modified+approved") + self.assertEqual("modified+approved pipelock-block for [dev]", msg) + + def test_capability_block_appends_resume_hint(self): + qp = _qp(TOOL_CAPABILITY_BLOCK, slug="alpha") + msg = _approval_status(qp, "approved") + self.assertIn("resume: ./cli.py resume alpha", msg) + self.assertIn("approved capability-block for [alpha]", msg) + + def test_egress_block_has_no_resume_hint(self): + qp = _qp(TOOL_EGRESS_BLOCK) + self.assertNotIn("resume", _approval_status(qp, "approved")) + + def test_pipelock_block_has_no_resume_hint(self): + qp = _qp(TOOL_PIPELOCK_BLOCK) + self.assertNotIn("resume", _approval_status(qp, "approved")) + + +class TestProposedPayloadLabel(unittest.TestCase): + def test_pipelock_returns_failed_url(self): + self.assertEqual("failed URL", _proposed_payload_label(TOOL_PIPELOCK_BLOCK)) + + def test_egress_returns_proposed_file(self): + self.assertEqual("proposed file", _proposed_payload_label(TOOL_EGRESS_BLOCK)) + + def test_capability_returns_proposed_file(self): + self.assertEqual("proposed file", _proposed_payload_label(TOOL_CAPABILITY_BLOCK)) + + def test_unknown_tool_returns_proposed_file(self): + self.assertEqual("proposed file", _proposed_payload_label("unknown-tool")) + + +class TestSuffixForTool(unittest.TestCase): + def test_capability_block_returns_dockerfile_suffix(self): + self.assertEqual(".dockerfile", _suffix_for_tool(TOOL_CAPABILITY_BLOCK)) + + def test_egress_block_returns_txt(self): + self.assertEqual(".txt", _suffix_for_tool(TOOL_EGRESS_BLOCK)) + + def test_pipelock_block_returns_txt(self): + self.assertEqual(".txt", _suffix_for_tool(TOOL_PIPELOCK_BLOCK)) + + def test_unknown_tool_returns_txt(self): + self.assertEqual(".txt", _suffix_for_tool("whatever")) + + +if __name__ == "__main__": + unittest.main() -- 2.52.0