PRD 0013: supervise plane foundation #19

Merged
didericis merged 7 commits from prd-0013-supervise-foundation into main 2026-05-25 04:35:57 -04:00
16 changed files with 3103 additions and 3 deletions
+32
View File
@@ -0,0 +1,32 @@
# Per-bottle supervise sidecar image (PRD 0013).
#
# Exposes three MCP tools (cred-proxy-block, pipelock-block,
# capability-block) the agent calls to propose config changes when
# stuck. Each tool call writes a Proposal to a host-mounted queue
# dir and blocks waiting for the operator's Response.
#
# Stdlib-only Python. The bottle slug arrives via
# SUPERVISE_BOTTLE_SLUG; the host's ~/.claude-bottle/queue/<slug>/
# is bind-mounted at /run/supervise/queue.
# python:3.13-alpine, pinned by digest (same image cred-proxy uses,
# so docker pulls / caches once for both sidecars).
FROM python@sha256:420cd0bf0f3998275875e02ecd5808168cf0843cbb4d3c536432f729247b2acc
# Both files ship as single files into /app; supervise_server.py
# imports supervise via same-directory resolution.
COPY claude_bottle/supervise.py /app/supervise.py
COPY claude_bottle/supervise_server.py /app/supervise_server.py
# Pre-create the queue mount point so docker's bind-mount has a
# parent dir. Matches Dockerfile.cred-proxy's pattern.
RUN mkdir -p /run/supervise/queue
EXPOSE 9100
# WORKDIR makes the in-app same-dir import deterministic regardless
# of how the container is launched.
WORKDIR /app
# PID 1 is python for clean signal handling and exit codes.
ENTRYPOINT ["python3", "/app/supervise_server.py"]
+4
View File
@@ -31,6 +31,7 @@ from .provision import cred_proxy as _cred_proxy
from .provision import git as _git
from .provision import prompt as _prompt
from .provision import skills as _skills
from .supervise import DockerSupervise
class DockerBottleBackend(BottleBackend["DockerBottlePlan", "DockerBottleCleanupPlan"]):
@@ -43,6 +44,7 @@ class DockerBottleBackend(BottleBackend["DockerBottlePlan", "DockerBottleCleanup
self._proxy = DockerPipelockProxy()
self._git_gate = DockerGitGate()
self._cred_proxy = DockerCredProxy()
self._supervise = DockerSupervise()
def _resolve_plan(self, spec: BottleSpec, *, stage_dir: Path) -> DockerBottlePlan:
return _prepare.resolve_plan(
@@ -51,6 +53,7 @@ class DockerBottleBackend(BottleBackend["DockerBottlePlan", "DockerBottleCleanup
proxy=self._proxy,
git_gate=self._git_gate,
cred_proxy=self._cred_proxy,
supervise=self._supervise,
)
@contextmanager
@@ -60,6 +63,7 @@ class DockerBottleBackend(BottleBackend["DockerBottlePlan", "DockerBottleCleanup
proxy=self._proxy,
git_gate=self._git_gate,
cred_proxy=self._cred_proxy,
supervise=self._supervise,
provision=self.provision,
) as bottle:
yield bottle
@@ -16,6 +16,7 @@ from ...git_gate import GitGatePlan
from ...log import info
from ...manifest import Agent, Bottle
from ...pipelock import PipelockProxyPlan, pipelock_effective_allowlist
from ...supervise import SupervisePlan
from .. import BottlePlan
@@ -53,6 +54,9 @@ class DockerBottlePlan(BottlePlan):
proxy_plan: PipelockProxyPlan
git_gate_plan: GitGatePlan
cred_proxy_plan: CredProxyPlan
# None when bottle.supervise is False. PRD 0013 supervise sidecar
# is opt-in via the manifest's bottle.supervise field.
supervise_plan: SupervisePlan | None
allowlist_summary: str
use_runsc: bool
@@ -116,6 +120,12 @@ class DockerBottlePlan(BottlePlan):
info(" cred-proxy : (none)")
info(f" egress : {self.allowlist_summary}")
info(" tls intercept : pipelock (per-bottle ephemeral CA, generated at launch)")
if self.supervise_plan is not None:
info(
f" supervise : enabled; queue at {self.supervise_plan.queue_dir}"
)
else:
info(" supervise : disabled (set bottle.supervise=true to enable)")
info(
f"prompt : {len(v.agent.prompt)} chars; "
f"first line: {v.prompt_first_line or '(empty)'}"
@@ -169,6 +179,14 @@ class DockerBottlePlan(BottlePlan):
"ca_fingerprint": None,
},
},
"supervise": {
"enabled": self.supervise_plan is not None,
"queue_dir": (
str(self.supervise_plan.queue_dir)
if self.supervise_plan is not None
else None
),
},
"prompt": {
"length": len(v.agent.prompt),
"first_line": v.prompt_first_line,
+26
View File
@@ -19,6 +19,7 @@ from typing import Callable, Generator
from ...log import die, info
from ...pipelock import pipelock_build_config, pipelock_render_yaml
from ...supervise import CURRENT_CONFIG_DIR_IN_AGENT
from . import network as network_mod
from . import util as docker_mod
from .bottle import DockerBottle
@@ -33,6 +34,7 @@ from .pipelock import (
pipelock_tls_init,
)
from .provision.ca import AGENT_CA_BUNDLE, AGENT_CA_PATH
from .supervise import DockerSupervise
# Where the repo root lives, for `docker build` context. Computed once.
@@ -46,6 +48,7 @@ def launch(
proxy: DockerPipelockProxy,
git_gate: DockerGitGate,
cred_proxy: DockerCredProxy,
supervise: DockerSupervise,
provision: Callable[[DockerBottlePlan, str], str | None],
) -> Generator[DockerBottle, None, None]:
"""Build, launch, and provision a Docker bottle. Teardown on exit.
@@ -156,6 +159,19 @@ def launch(
cred_proxy_name = cred_proxy.start(plan.cred_proxy_plan)
stack.callback(cred_proxy.stop, cred_proxy_name)
# Supervise sidecar (PRD 0013). Opt-in via bottle.supervise.
# Internal-network only — the sidecar makes no outbound calls.
# Must come up BEFORE the agent so DNS resolution for
# `supervise` succeeds on the agent's first tool call.
if plan.supervise_plan is not None:
supervise_plan = dataclasses.replace(
plan.supervise_plan,
internal_network=internal_network,
)
plan = dataclasses.replace(plan, supervise_plan=supervise_plan)
supervise_name = supervise.start(plan.supervise_plan)
stack.callback(supervise.stop, supervise_name)
container = _run_agent_container(plan, internal_network)
stack.callback(docker_mod.force_remove_container, container)
@@ -196,6 +212,16 @@ def _run_agent_container(plan: DockerBottlePlan, internal_network: str) -> str:
for name in plan.forwarded_env:
docker_args.extend(["-e", name])
# PRD 0013: read-only current-config mount so the agent can read
# routes.json / allowlist / Dockerfile before composing a
# supervise tool-call proposal. Mounted from the per-bottle
# stage_dir/current-config/ populated at prepare time.
if plan.supervise_plan is not None:
docker_args.extend([
"-v",
f"{plan.supervise_plan.current_config_dir}:{CURRENT_CONFIG_DIR_IN_AGENT}:ro",
])
docker_args.extend([plan.runtime_image, "sleep", "infinity"])
info(f"starting container {plan.container_name} from {plan.runtime_image}")
+22
View File
@@ -14,6 +14,7 @@ import os
from pathlib import Path
from ... import pipelock
from ...cred_proxy import cred_proxy_render_routes
from ...env import ResolvedEnv, resolve_env
from ...log import die
from .. import BottleSpec
@@ -26,6 +27,7 @@ from .cred_proxy import (
)
from .git_gate import DockerGitGate, git_gate_container_name
from .pipelock import DockerPipelockProxy, pipelock_container_name
from .supervise import DockerSupervise, supervise_container_name
def resolve_plan(
@@ -35,6 +37,7 @@ def resolve_plan(
proxy: DockerPipelockProxy,
git_gate: DockerGitGate,
cred_proxy: DockerCredProxy,
supervise: DockerSupervise,
) -> DockerBottlePlan:
"""Resolve Docker-specific names and write scratch files. Trusts
that the agent and its skills/git-gate keys are present —
@@ -94,6 +97,8 @@ def resolve_plan(
sidecar_probes.append(("git-gate", git_gate_container_name(slug)))
if bottle.cred_proxy.routes:
sidecar_probes.append(("cred-proxy", cred_proxy_container_name(slug)))
if bottle.supervise:
sidecar_probes.append(("supervise", supervise_container_name(slug)))
for label, sidecar_name in sidecar_probes:
if docker_mod.container_exists(sidecar_name):
die(
@@ -111,6 +116,22 @@ def resolve_plan(
proxy_plan = proxy.prepare(bottle, slug, stage_dir)
git_gate_plan = git_gate.prepare(bottle, slug, stage_dir)
cred_proxy_plan = cred_proxy.prepare(bottle, slug, stage_dir)
supervise_plan = None
if bottle.supervise:
routes_content = cred_proxy_render_routes(cred_proxy_plan.routes) if cred_proxy_plan.routes else ""
allowlist_content = "\n".join(pipelock.pipelock_effective_allowlist(bottle)) + "\n"
# Current Dockerfile for the agent image. Read from the repo
# root; for `--cwd` derived images the base Dockerfile is what
# the agent should propose changes against (the derived layer
# is just a workspace copy).
dockerfile_path = Path(__file__).resolve().parent.parent.parent.parent / "Dockerfile"
dockerfile_content = dockerfile_path.read_text() if dockerfile_path.is_file() else ""
supervise_plan = supervise.prepare(
slug, stage_dir,
routes_content=routes_content,
allowlist_content=allowlist_content,
dockerfile_content=dockerfile_content,
)
resolved = resolve_env(manifest, spec.agent_name)
# Everything that should reach the bottle by-name (so its value
# never lands on argv or in env_file) goes into one dict. Nothing
@@ -169,6 +190,7 @@ def resolve_plan(
proxy_plan=proxy_plan,
git_gate_plan=git_gate_plan,
cred_proxy_plan=cred_proxy_plan,
supervise_plan=supervise_plan,
allowlist_summary=allowlist_summary,
use_runsc=use_runsc,
)
+131
View File
@@ -0,0 +1,131 @@
"""DockerSupervise — the Docker-specific lifecycle for the per-bottle
supervise sidecar (PRD 0013). Inherits the platform-agnostic prepare
step (queue dir + current-config staging) from `Supervise`."""
from __future__ import annotations
import os
import subprocess
from pathlib import Path
from ...log import die, info, warn
from ...supervise import (
QUEUE_DIR_IN_CONTAINER,
SUPERVISE_HOSTNAME,
SUPERVISE_PORT,
Supervise,
SupervisePlan,
)
from . import util as docker_mod
SUPERVISE_IMAGE = os.environ.get(
"CLAUDE_BOTTLE_SUPERVISE_IMAGE",
"claude-bottle-supervise:latest",
)
SUPERVISE_DOCKERFILE = "Dockerfile.supervise"
_REPO_DIR = str(Path(__file__).resolve().parent.parent.parent.parent)
def supervise_container_name(slug: str) -> str:
return f"claude-bottle-supervise-{slug}"
def supervise_url() -> str:
"""Base URL the agent's MCP client dials. Stable across bottles
because the sidecar attaches `--network-alias supervise` on the
internal network."""
return f"http://{SUPERVISE_HOSTNAME}:{SUPERVISE_PORT}"
def build_supervise_image() -> None:
"""Build the supervise image from `Dockerfile.supervise`. Called
by `DockerSupervise.start`; exposed at module level so tests can
build it without running the full launch pipeline."""
docker_mod.build_image(SUPERVISE_IMAGE, _REPO_DIR, dockerfile=SUPERVISE_DOCKERFILE)
class DockerSupervise(Supervise):
"""Brings the supervise sidecar up and down via Docker."""
def start(self, plan: SupervisePlan) -> str:
"""Boot the supervise sidecar:
1. Build the supervise image (no-op when cache is hot).
2. `docker create` on the internal network with
`--network-alias supervise` and SUPERVISE_BOTTLE_SLUG in
the environ.
3. Bind-mount the host queue dir at /run/supervise/queue.
4. `docker start`.
No egress network — the supervise sidecar does not make
outbound calls. Returns the container name."""
if not plan.internal_network:
die("DockerSupervise.start: plan.internal_network must be set before start")
if not plan.queue_dir.is_dir():
die(
f"DockerSupervise.start: queue dir missing at {plan.queue_dir}; "
f"Supervise.prepare must run first"
)
build_supervise_image()
name = supervise_container_name(plan.slug)
info(f"starting supervise sidecar {name} on network {plan.internal_network}")
create_args = [
"docker", "create",
"--name", name,
"--network", plan.internal_network,
"--network-alias", SUPERVISE_HOSTNAME,
"-e", f"SUPERVISE_BOTTLE_SLUG={plan.slug}",
"-e", f"SUPERVISE_QUEUE_DIR={QUEUE_DIR_IN_CONTAINER}",
"-e", f"SUPERVISE_PORT={SUPERVISE_PORT}",
"-v", f"{plan.queue_dir}:{QUEUE_DIR_IN_CONTAINER}",
SUPERVISE_IMAGE,
]
create_result = subprocess.run(
create_args, capture_output=True, text=True, check=False,
)
if create_result.returncode != 0:
die(
f"failed to create supervise sidecar {name}: "
f"{create_result.stderr.strip()}"
)
start_result = subprocess.run(
["docker", "start", name], capture_output=True, text=True, check=False,
)
if start_result.returncode != 0:
subprocess.run(
["docker", "rm", "-f", name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
die(
f"failed to start supervise sidecar {name}: "
f"{start_result.stderr.strip()}"
)
return name
def stop(self, target: str) -> None:
"""Idempotent: missing container is success."""
if subprocess.run(
["docker", "inspect", target],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
).returncode == 0:
if subprocess.run(
["docker", "rm", "-f", target],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
).returncode != 0:
warn(
f"failed to remove supervise sidecar {target}; "
f"clean up with 'docker rm -f {target}'"
)
+4 -1
View File
@@ -1,6 +1,6 @@
"""Main CLI dispatcher.
Commands: cleanup, edit, info, init, list, start
Commands: cleanup, dashboard, edit, info, init, list, start
"""
from __future__ import annotations
@@ -11,6 +11,7 @@ from ..log import Die, die
from ._common import PROG
from . import list as _list_mod
from .cleanup import cmd_cleanup
from .dashboard import cmd_dashboard
from .edit import cmd_edit
from .info import cmd_info
from .init import cmd_init
@@ -20,6 +21,7 @@ cmd_list = _list_mod.cmd_list
COMMANDS = {
"cleanup": cmd_cleanup,
"dashboard": cmd_dashboard,
"edit": cmd_edit,
"info": cmd_info,
"init": cmd_init,
@@ -32,6 +34,7 @@ def usage() -> None:
sys.stderr.write(f"usage: {PROG} <command> [args...]\n\n")
sys.stderr.write("Commands:\n")
sys.stderr.write(" cleanup stop and remove all active claude-bottle containers\n")
sys.stderr.write(" dashboard view + approve/modify/reject pending supervise proposals (PRD 0013)\n")
sys.stderr.write(" edit open an agent in vim for editing\n")
sys.stderr.write(" info print env, skills, and prompt details for a named agent\n")
sys.stderr.write(" init interactively create a new agent and add it to claude-bottle.json\n")
+397
View File
@@ -0,0 +1,397 @@
"""dashboard: list pending supervise proposals across all bottles and
act on them (approve / modify / reject). PRD 0013 v1.
Curses-based TUI; modify-then-approve shells out to $EDITOR. For
0013 the approval handlers are no-ops on the supervisor side: the
response file is written (and the sidecar returns it to the agent),
and an audit entry is appended, but no host-side config change runs.
PRDs 0014 (cred-proxy) and 0015 (pipelock) wire in the actual
writes.
"""
from __future__ import annotations
import argparse
import curses
import os
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from .. import supervise as _supervise
from ..log import info
from ..supervise import (
ACTION_OPERATOR_EDIT,
COMPONENT_FOR_TOOL,
AuditEntry,
Proposal,
Response,
STATUS_APPROVED,
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
list_pending_proposals,
render_diff,
write_audit_entry,
write_response,
)
from ._common import PROG
# --- Discovery -------------------------------------------------------------
@dataclass(frozen=True)
class QueuedProposal:
"""A pending proposal plus the queue dir it was found in."""
proposal: Proposal
queue_dir: Path
def discover_pending() -> list[QueuedProposal]:
"""Walk ~/.claude-bottle/queue/* and collect pending proposals
from every bottle's queue. Sorted by arrival time across the
union — the operator works the global FIFO."""
queue_root = _supervise.claude_bottle_root() / "queue"
if not queue_root.is_dir():
return []
out: list[QueuedProposal] = []
for slug_dir in sorted(queue_root.iterdir()):
if not slug_dir.is_dir():
continue
for proposal in list_pending_proposals(slug_dir):
out.append(QueuedProposal(proposal=proposal, queue_dir=slug_dir))
out.sort(key=lambda q: q.proposal.arrival_timestamp)
return out
# --- Operator actions ------------------------------------------------------
def approve(
qp: QueuedProposal,
*,
notes: str = "",
final_file: str | None = None,
) -> None:
"""Write an approval response and an audit entry. If `final_file`
is provided the status is `modified`; otherwise `approved`."""
status = STATUS_MODIFIED if final_file is not None else STATUS_APPROVED
response = Response(
proposal_id=qp.proposal.id,
status=status,
notes=notes,
final_file=final_file,
)
write_response(qp.queue_dir, response)
_write_audit(qp, action=status, notes=notes, final_file=final_file)
def reject(qp: QueuedProposal, *, reason: str) -> None:
"""Write a rejection response and an audit entry."""
response = Response(
proposal_id=qp.proposal.id,
status=STATUS_REJECTED,
notes=reason,
final_file=None,
)
write_response(qp.queue_dir, response)
_write_audit(qp, action=STATUS_REJECTED, notes=reason, final_file=None)
def _write_audit(
qp: QueuedProposal,
*,
action: str,
notes: str,
final_file: str | None,
) -> None:
"""Audit log for cred-proxy / pipelock tools. capability-block has
no audit log (its changes are captured by the bottle's rebuild
record + git history per PRD 0016)."""
component = COMPONENT_FOR_TOOL.get(qp.proposal.tool)
if component is None:
# capability-block: skip audit log; 0016 records via rebuild.
return
# v1 audit diff is empty: 0013's no-op handler doesn't have the
# actual current-on-disk file to diff against, only the agent's
# proposed file. 0014 / 0015 fill in the real diff against the
# live routes.json / allowlist after writing the change.
write_audit_entry(AuditEntry(
timestamp=datetime.now(timezone.utc).isoformat(),
bottle_slug=qp.proposal.bottle_slug,
component=component,
operator_action=action,
operator_notes=notes,
justification=qp.proposal.justification,
diff=render_diff(
"",
final_file if final_file is not None else qp.proposal.proposed_file,
label=component,
),
))
# --- $EDITOR integration --------------------------------------------------
def edit_in_editor(content: str, *, suffix: str = ".tmp") -> str | None:
"""Suspend curses (caller is responsible for that), drop `content`
to a temp file, exec $EDITOR on it, return the edited content.
Returns None if the edit was a no-op."""
editor = os.environ.get("EDITOR", "vim")
with tempfile.NamedTemporaryFile(
mode="w", suffix=suffix, delete=False, prefix="supervise-modify.",
) as f:
f.write(content)
path = f.name
try:
subprocess.run([editor, path], check=False)
with open(path) as f:
edited = f.read()
return edited if edited != content else None
finally:
try:
os.unlink(path)
except OSError:
pass
# --- TUI -------------------------------------------------------------------
def cmd_dashboard(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} dashboard", add_help=True)
parser.add_argument(
"--once", action="store_true",
help="list pending proposals once and exit (no TUI)",
)
args = parser.parse_args(argv)
if args.once:
return _list_once()
try:
curses.wrapper(_main_loop)
except KeyboardInterrupt:
return 130
return 0
def _list_once() -> int:
pending = discover_pending()
if not pending:
info("no pending proposals")
return 0
for qp in pending:
sys.stdout.write(
f"{qp.proposal.arrival_timestamp} "
f"[{qp.proposal.bottle_slug}] "
f"{qp.proposal.tool} "
f"{qp.proposal.id}\n"
)
sys.stdout.write(f" {qp.proposal.justification}\n")
return 0
def _main_loop(stdscr: "curses._CursesWindow") -> None:
curses.curs_set(0)
stdscr.nodelay(False)
selected = 0
status_line = ""
while True:
pending = discover_pending()
if selected >= len(pending):
selected = max(0, len(pending) - 1)
_render(stdscr, pending, selected, status_line)
status_line = ""
try:
key = stdscr.getch()
except KeyboardInterrupt:
return
if key in (ord("q"), 27): # q or ESC
return
if not pending:
continue
qp = pending[selected]
if key in (curses.KEY_DOWN, ord("j")):
selected = min(selected + 1, len(pending) - 1)
elif key in (curses.KEY_UP, ord("k")):
selected = max(selected - 1, 0)
elif key in (curses.KEY_ENTER, 10, 13, ord("v")):
_detail_view(stdscr, qp)
elif key == ord("a"):
approve(qp)
status_line = f"approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
elif key == ord("m"):
edited = _modify(stdscr, qp)
if edited is None:
status_line = "modify aborted (no change)"
else:
approve(qp, final_file=edited, notes="operator modified before approving")
status_line = f"modified+approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
elif key == ord("r"):
reason = _prompt(stdscr, "reject reason: ")
if reason:
reject(qp, reason=reason)
status_line = f"rejected {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
else:
status_line = "reject aborted (empty reason)"
def _render(
stdscr: "curses._CursesWindow",
pending: list[QueuedProposal],
selected: int,
status_line: str,
) -> None:
stdscr.erase()
h, w = stdscr.getmaxyx()
header = f"claude-bottle dashboard ({len(pending)} pending)"
stdscr.addnstr(0, 0, header, w - 1, curses.A_BOLD)
stdscr.hline(1, 0, curses.ACS_HLINE, w)
if not pending:
stdscr.addnstr(
3, 2,
"no pending proposals; agents will queue here when they call a "
"supervise tool",
w - 4,
)
else:
for i, qp in enumerate(pending):
row = 2 + i
if row >= h - 2:
break
p = qp.proposal
ts_short = p.arrival_timestamp.split("T", 1)[1][:8] if "T" in p.arrival_timestamp else p.arrival_timestamp
line = (
f"{'> ' if i == selected else ' '}"
f"[{p.bottle_slug}] {p.tool:<20} {ts_short} "
f"{p.justification[:60]}"
)
attr = curses.A_REVERSE if i == selected else curses.A_NORMAL
stdscr.addnstr(row, 0, line, w - 1, attr)
footer = "[Enter] view [a] approve [m] modify [r] reject [j/k] move [q] quit"
stdscr.hline(h - 2, 0, curses.ACS_HLINE, w)
stdscr.addnstr(h - 1, 0, footer, w - 1, curses.A_DIM)
if status_line:
stdscr.addnstr(h - 3, 0, status_line, w - 1, curses.A_BOLD)
stdscr.refresh()
def _detail_view(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> None:
"""Render the full proposal: header, justification, proposed file
contents. Scrollable. Press q to return."""
lines = _detail_lines(qp)
offset = 0
while True:
stdscr.erase()
h, w = stdscr.getmaxyx()
for i, line in enumerate(lines[offset:offset + h - 1]):
stdscr.addnstr(i, 0, line, w - 1)
stdscr.addnstr(
h - 1, 0,
"[j/k] scroll [g/G] top/bottom [a] approve [m] modify [r] reject [q] back",
w - 1, curses.A_DIM,
)
stdscr.refresh()
key = stdscr.getch()
if key in (ord("q"), 27):
return
if key in (curses.KEY_DOWN, ord("j")):
offset = min(offset + 1, max(0, len(lines) - 1))
elif key in (curses.KEY_UP, ord("k")):
offset = max(offset - 1, 0)
elif key == ord("g"):
offset = 0
elif key == ord("G"):
offset = max(0, len(lines) - 1)
elif key == ord("a"):
approve(qp)
return
elif key == ord("m"):
edited = _modify(stdscr, qp)
if edited is not None:
approve(qp, final_file=edited, notes="operator modified before approving")
return
elif key == ord("r"):
reason = _prompt(stdscr, "reject reason: ")
if reason:
reject(qp, reason=reason)
return
def _detail_lines(qp: QueuedProposal) -> list[str]:
p = qp.proposal
out = [
f"bottle: {p.bottle_slug}",
f"tool: {p.tool}",
f"id: {p.id}",
f"arrived: {p.arrival_timestamp}",
f"queue: {qp.queue_dir}",
"",
"justification:",
]
out.extend(" " + line for line in p.justification.splitlines() or [""])
out.extend([
"",
"proposed file:",
])
out.extend(p.proposed_file.splitlines() or [""])
return out
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
"""Suspend curses, open $EDITOR on the proposed file, return the
edited content (or None if unchanged)."""
suffix = _suffix_for_tool(qp.proposal.tool)
curses.endwin()
try:
edited = edit_in_editor(qp.proposal.proposed_file, suffix=suffix)
finally:
stdscr.refresh()
return edited
def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
# cred-proxy-block / pipelock-block: JSON-ish + plain.
return ".txt"
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:
"""One-line input at the bottom of the screen."""
curses.curs_set(1)
h, _ = stdscr.getmaxyx()
stdscr.move(h - 2, 0)
stdscr.clrtoeol()
stdscr.addstr(h - 2, 0, label)
stdscr.refresh()
curses.echo()
try:
raw = stdscr.getstr(h - 2, len(label), 200)
finally:
curses.noecho()
curses.curs_set(0)
return raw.decode("utf-8", errors="replace").strip()
__all__ = [
"ACTION_OPERATOR_EDIT", # re-exported for 0014/0015 to write operator-initiated audit entries
"QueuedProposal",
"approve",
"cmd_dashboard",
"discover_pending",
"edit_in_editor",
"reject",
]
+19 -2
View File
@@ -329,6 +329,13 @@ class Bottle:
git: tuple[GitEntry, ...] = ()
cred_proxy: CredProxyConfig = field(default_factory=CredProxyConfig)
egress: BottleEgress = field(default_factory=BottleEgress)
# Opt-in per-bottle stuck-recovery sidecar (PRD 0013). When true,
# the launch step brings up a supervise sidecar that exposes three
# MCP tools to the agent (cred-proxy-block, pipelock-block,
# capability-block) plus mounts the current-config dir read-only
# into the agent at /etc/claude-bottle/current-config. False (the
# default) skips the sidecar and the mount.
supervise: bool = False
@classmethod
def from_dict(cls, name: str, raw: object) -> "Bottle":
@@ -396,7 +403,17 @@ class Bottle:
else BottleEgress()
)
return cls(env=env, git=git, cred_proxy=cred_proxy, egress=egress)
supervise_raw = d.get("supervise", False)
if not isinstance(supervise_raw, bool):
die(
f"bottle '{name}' supervise must be a boolean "
f"(was {type(supervise_raw).__name__})"
)
return cls(
env=env, git=git, cred_proxy=cred_proxy, egress=egress,
supervise=supervise_raw,
)
@dataclass(frozen=True)
@@ -747,7 +764,7 @@ _FILENAME_RX = re.compile(r"^[a-z][a-z0-9-]*$")
# Frontmatter keys we accept on each entity. Anything not in these
# sets dies with a "did you mean" pointer — typos shouldn't silently
# ghost into an empty config.
_BOTTLE_KEYS = frozenset({"env", "git", "cred_proxy", "egress"})
_BOTTLE_KEYS = frozenset({"env", "git", "cred_proxy", "egress", "supervise"})
_AGENT_KEYS_REQUIRED = frozenset({"bottle"})
_AGENT_KEYS_OPTIONAL = frozenset({"skills"})
# Claude Code subagent fields claude-bottle ignores at launch but
+586
View File
@@ -0,0 +1,586 @@
"""Per-bottle supervise plane (PRD 0013).
The supervise plane is the per-bottle MCP sidecar plus its host-side
queue/audit support. The sidecar (claude_bottle.supervise_server)
sits on the bottle's internal network and exposes three MCP tools the
agent calls when it hits a stuck-recovery category:
* cred-proxy-block agent proposes a new routes.json
* pipelock-block agent proposes a new pipelock allowlist
* capability-block agent proposes a new agent Dockerfile
Each tool call: the agent passes the full proposed file plus a
justification text. The sidecar validates the proposal syntactically,
writes it to the host's per-bottle queue dir, and holds the tool-call
connection open. The operator's TUI dashboard
(claude_bottle.cli.dashboard) sees the proposal, accepts
approve / modify / reject, and writes a response file alongside the
proposal. The sidecar sees the response and returns `{status, notes}`
to the agent.
This module defines the host-side library: dataclasses for the queue
file shapes, queue read/write helpers, the audit log writer, and the
diff renderer. The in-container sidecar lives in
claude_bottle/supervise_server.py; the Docker lifecycle in
claude_bottle/backend/docker/supervise.py.
For 0013 the supervisor's approval handlers are deliberately no-ops:
on approval the audit log is written and the response file is
delivered to the agent, but no host-side config change happens. The
remediation engines that wire real config changes land in PRDs 0014,
0015, and 0016.
"""
from __future__ import annotations
import dataclasses
import difflib
import hashlib
import json
import os
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
SUPERVISE_HOSTNAME = "supervise"
SUPERVISE_PORT = 9100
TOOL_CRED_PROXY_BLOCK = "cred-proxy-block"
TOOL_PIPELOCK_BLOCK = "pipelock-block"
TOOL_CAPABILITY_BLOCK = "capability-block"
TOOLS: tuple[str, ...] = (
TOOL_CRED_PROXY_BLOCK,
TOOL_PIPELOCK_BLOCK,
TOOL_CAPABILITY_BLOCK,
)
# capability-block has no on-disk config the operator edits in place
# (the Dockerfile is rebuilt, not patched), so it has no audit log
# here — those changes are captured by git history + the rebuild
# record laid down in PRD 0016.
COMPONENT_FOR_TOOL: dict[str, str] = {
TOOL_CRED_PROXY_BLOCK: "cred-proxy",
TOOL_PIPELOCK_BLOCK: "pipelock",
}
STATUS_APPROVED = "approved"
STATUS_MODIFIED = "modified"
STATUS_REJECTED = "rejected"
STATUSES: tuple[str, ...] = (STATUS_APPROVED, STATUS_MODIFIED, STATUS_REJECTED)
# Operator-initiated audit entries (no tool call). PRD 0014's
# `routes edit <bottle>` and PRD 0015's `pipelock edit <bottle>`
# verbs write entries with this action.
ACTION_OPERATOR_EDIT = "operator-edit"
QUEUE_DIR_IN_CONTAINER = "/run/supervise/queue"
CURRENT_CONFIG_DIR_IN_AGENT = "/etc/claude-bottle/current-config"
DEFAULT_POLL_INTERVAL_SEC = 0.5
# --- Paths -----------------------------------------------------------------
def claude_bottle_root() -> Path:
return Path.home() / ".claude-bottle"
def queue_dir_for_slug(slug: str) -> Path:
return claude_bottle_root() / "queue" / slug
def audit_dir() -> Path:
return claude_bottle_root() / "audit"
def audit_log_path(component: str, slug: str) -> Path:
return audit_dir() / f"{component}-{slug}.log"
# --- Dataclasses -----------------------------------------------------------
@dataclass(frozen=True)
class Proposal:
"""One pending tool-call from the agent. The sidecar writes one
of these to the queue dir on a tool call; the operator's TUI
reads them; the sidecar polls for a matching Response."""
id: str
bottle_slug: str
tool: str
proposed_file: str
justification: str
arrival_timestamp: str
current_file_hash: str
@classmethod
def new(
cls,
*,
bottle_slug: str,
tool: str,
proposed_file: str,
justification: str,
current_file_hash: str,
now: datetime | None = None,
) -> "Proposal":
ts = (now or datetime.now(timezone.utc)).isoformat()
return cls(
id=str(uuid.uuid4()),
bottle_slug=bottle_slug,
tool=tool,
proposed_file=proposed_file,
justification=justification,
arrival_timestamp=ts,
current_file_hash=current_file_hash,
)
def to_dict(self) -> dict[str, object]:
return dataclasses.asdict(self)
@classmethod
def from_dict(cls, raw: dict[str, object]) -> "Proposal":
tool = _require_str(raw, "tool")
if tool not in TOOLS:
raise ValueError(f"tool must be one of {TOOLS}; got {tool!r}")
return cls(
id=_require_str(raw, "id"),
bottle_slug=_require_str(raw, "bottle_slug"),
tool=tool,
proposed_file=_require_str(raw, "proposed_file"),
justification=_require_str(raw, "justification"),
arrival_timestamp=_require_str(raw, "arrival_timestamp"),
current_file_hash=_require_str(raw, "current_file_hash"),
)
@dataclass(frozen=True)
class Response:
"""The operator's decision on a proposal. The TUI writes one of
these to the queue dir; the sidecar reads it and returns the
`{status, notes}` pair to the agent's tool call.
`final_file` carries the file content the supervisor will
actually apply: for `approved`, equal to the proposal's
`proposed_file`; for `modified`, the operator's edited version
(the audit diff is current final_file, not current
proposed_file); for `rejected`, None."""
proposal_id: str
status: str
notes: str
final_file: str | None = None
def to_dict(self) -> dict[str, object]:
return dataclasses.asdict(self)
@classmethod
def from_dict(cls, raw: dict[str, object]) -> "Response":
status = _require_str(raw, "status")
if status not in STATUSES:
raise ValueError(
f"response status must be one of {STATUSES}; got {status!r}"
)
final = raw.get("final_file")
if final is not None and not isinstance(final, str):
raise ValueError(
f"final_file must be a string or null; got {type(final).__name__}"
)
return cls(
proposal_id=_require_str(raw, "proposal_id"),
status=status,
notes=_require_str(raw, "notes"),
final_file=final,
)
@dataclass(frozen=True)
class AuditEntry:
"""One row of the per-bottle audit log. JSON-Lines, append-only."""
timestamp: str
bottle_slug: str
component: str
operator_action: str
operator_notes: str
justification: str
diff: str
def to_dict(self) -> dict[str, object]:
return dataclasses.asdict(self)
# --- Queue I/O -------------------------------------------------------------
def _proposal_filename(proposal_id: str) -> str:
return f"{proposal_id}.proposal.json"
def _response_filename(proposal_id: str) -> str:
return f"{proposal_id}.response.json"
def _id_from_proposal_filename(path: Path) -> str | None:
name = path.name
if not name.endswith(".proposal.json"):
return None
return name[: -len(".proposal.json")]
def write_proposal(queue_dir: Path, proposal: Proposal) -> Path:
"""Persist `proposal` as JSON in the queue dir, mode 0o600.
Directory is created if missing."""
queue_dir.mkdir(parents=True, exist_ok=True)
path = queue_dir / _proposal_filename(proposal.id)
payload = json.dumps(proposal.to_dict(), indent=2) + "\n"
_atomic_write(path, payload, mode=0o600)
return path
def read_proposal(queue_dir: Path, proposal_id: str) -> Proposal:
path = queue_dir / _proposal_filename(proposal_id)
with path.open() as f:
raw = json.load(f)
if not isinstance(raw, dict):
raise ValueError(f"{path}: top-level must be an object")
return Proposal.from_dict(raw)
def list_pending_proposals(queue_dir: Path) -> list[Proposal]:
"""All proposals in `queue_dir` that do not yet have a matching
response file. Sorted by `arrival_timestamp` so the operator
sees the queue FIFO."""
if not queue_dir.is_dir():
return []
out: list[Proposal] = []
for path in sorted(queue_dir.glob("*.proposal.json")):
proposal_id = _id_from_proposal_filename(path)
if proposal_id is None:
continue
if (queue_dir / _response_filename(proposal_id)).exists():
continue
try:
with path.open() as f:
raw = json.load(f)
except (OSError, json.JSONDecodeError):
continue
if not isinstance(raw, dict):
continue
try:
out.append(Proposal.from_dict(raw))
except (KeyError, ValueError):
continue
out.sort(key=lambda p: p.arrival_timestamp)
return out
def write_response(queue_dir: Path, response: Response) -> Path:
queue_dir.mkdir(parents=True, exist_ok=True)
path = queue_dir / _response_filename(response.proposal_id)
payload = json.dumps(response.to_dict(), indent=2) + "\n"
_atomic_write(path, payload, mode=0o600)
return path
def read_response(queue_dir: Path, proposal_id: str) -> Response:
path = queue_dir / _response_filename(proposal_id)
with path.open() as f:
raw = json.load(f)
if not isinstance(raw, dict):
raise ValueError(f"{path}: top-level must be an object")
return Response.from_dict(raw)
def wait_for_response(
queue_dir: Path,
proposal_id: str,
*,
poll_interval: float = DEFAULT_POLL_INTERVAL_SEC,
deadline: float | None = None,
) -> Response:
"""Block until a response file appears for `proposal_id`, then
return it. `deadline` is an absolute time.monotonic() value after
which the wait raises TimeoutError. None waits forever the
natural shape, since the operator's response time is unbounded.
Polls the filesystem so the implementation stays portable and
stdlib-only."""
path = queue_dir / _response_filename(proposal_id)
while True:
if path.exists():
try:
with path.open() as f:
raw = json.load(f)
except (OSError, json.JSONDecodeError):
raw = None
if isinstance(raw, dict):
try:
return Response.from_dict(raw)
except (KeyError, ValueError):
pass
if deadline is not None and time.monotonic() >= deadline:
raise TimeoutError(f"no response for proposal {proposal_id!r}")
time.sleep(poll_interval)
def archive_proposal(queue_dir: Path, proposal_id: str) -> None:
"""Move both proposal and response files to `<queue_dir>/processed/`.
Idempotent missing files are silently skipped."""
processed = queue_dir / "processed"
processed.mkdir(parents=True, exist_ok=True)
for name in (_proposal_filename(proposal_id), _response_filename(proposal_id)):
src = queue_dir / name
if src.exists():
src.rename(processed / name)
# --- Audit log -------------------------------------------------------------
def write_audit_entry(entry: AuditEntry) -> Path:
"""Append `entry` as one JSON-Lines record to the per-bottle
audit log. Acquires an advisory exclusive lock so concurrent
writers don't interleave bytes."""
path = audit_log_path(entry.component, entry.bottle_slug)
path.parent.mkdir(parents=True, exist_ok=True)
line = json.dumps(entry.to_dict(), sort_keys=False) + "\n"
fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o600)
try:
_try_flock(fd)
try:
os.write(fd, line.encode("utf-8"))
finally:
_try_funlock(fd)
finally:
os.close(fd)
return path
def read_audit_entries(component: str, slug: str) -> list[AuditEntry]:
"""Load all audit entries for the given component+slug. Empty
list if the log doesn't exist."""
path = audit_log_path(component, slug)
if not path.is_file():
return []
out: list[AuditEntry] = []
with path.open() as f:
for raw_line in f:
raw_line = raw_line.strip()
if not raw_line:
continue
try:
raw = json.loads(raw_line)
except json.JSONDecodeError:
continue
if not isinstance(raw, dict):
continue
try:
out.append(AuditEntry(
timestamp=_require_str(raw, "timestamp"),
bottle_slug=_require_str(raw, "bottle_slug"),
component=_require_str(raw, "component"),
operator_action=_require_str(raw, "operator_action"),
operator_notes=_require_str(raw, "operator_notes"),
justification=_require_str(raw, "justification"),
diff=_require_str(raw, "diff"),
))
except ValueError:
continue
return out
# --- Diff rendering --------------------------------------------------------
def render_diff(before: str, after: str, *, label: str = "config") -> str:
"""Unified diff suitable for the audit log + TUI. Empty diff (no
changes) renders as the empty string."""
diff = difflib.unified_diff(
before.splitlines(keepends=True),
after.splitlines(keepends=True),
fromfile=f"{label} (current)",
tofile=f"{label} (proposed)",
lineterm="",
)
parts = list(diff)
if not parts:
return ""
return "".join(p if p.endswith("\n") else p + "\n" for p in parts).rstrip("\n")
def sha256_hex(content: str) -> str:
return hashlib.sha256(content.encode("utf-8")).hexdigest()
# --- Sidecar plan + abstract lifecycle -------------------------------------
# Filenames inside the per-bottle current-config dir. The agent reads
# these (read-only) from CURRENT_CONFIG_DIR_IN_AGENT and proposes
# modified versions back via the three MCP tools.
CURRENT_CONFIG_ROUTES = "routes.json"
CURRENT_CONFIG_ALLOWLIST = "allowlist"
CURRENT_CONFIG_DOCKERFILE = "Dockerfile"
@dataclass(frozen=True)
class SupervisePlan:
"""Output of Supervise.prepare; consumed by .start.
`queue_dir` is the host directory bind-mounted into the sidecar
at /run/supervise/queue. `current_config_dir` is the host
directory bind-mounted (read-only) into the *agent* container at
/etc/claude-bottle/current-config, holding routes.json + allowlist
+ Dockerfile so the agent can read them before composing a
proposal. `internal_network` is empty at prepare time; the
backend's launch step fills it via dataclasses.replace before
calling .start."""
slug: str
queue_dir: Path
current_config_dir: Path
internal_network: str = ""
class Supervise(ABC):
"""Per-bottle supervise sidecar. Encapsulates the host-side
prepare (queue dir + current-config staging); the sidecar's
start/stop lifecycle is backend-specific."""
def prepare(
self,
slug: str,
stage_dir: Path,
*,
routes_content: str = "",
allowlist_content: str = "",
dockerfile_content: str = "",
) -> SupervisePlan:
"""Stage the per-bottle queue dir on the host and the
current-config dir under `stage_dir`. Returns the plan;
`internal_network` must be set by the launch step before
.start runs."""
queue_dir = queue_dir_for_slug(slug)
queue_dir.mkdir(parents=True, exist_ok=True)
current_config_dir = stage_dir / "current-config"
current_config_dir.mkdir(parents=True, exist_ok=True)
(current_config_dir / CURRENT_CONFIG_ROUTES).write_text(
routes_content or '{"routes": []}\n'
)
(current_config_dir / CURRENT_CONFIG_ALLOWLIST).write_text(allowlist_content)
(current_config_dir / CURRENT_CONFIG_DOCKERFILE).write_text(dockerfile_content)
for name in (
CURRENT_CONFIG_ROUTES,
CURRENT_CONFIG_ALLOWLIST,
CURRENT_CONFIG_DOCKERFILE,
):
(current_config_dir / name).chmod(0o644)
return SupervisePlan(
slug=slug,
queue_dir=queue_dir,
current_config_dir=current_config_dir,
)
@abstractmethod
def start(self, plan: SupervisePlan) -> str:
"""Bring up the supervise sidecar according to `plan`. Returns
the target string identifying the running instance the same
value to pass to `.stop`. Backend-specific."""
@abstractmethod
def stop(self, target: str) -> None:
"""Tear down the supervise sidecar identified by `target`.
Idempotent: a missing target is success."""
# --- Helpers ---------------------------------------------------------------
def _require_str(raw: dict[str, object], key: str) -> str:
value = raw.get(key)
if not isinstance(value, str):
raise ValueError(f"missing or non-string field {key!r}")
return value
def _atomic_write(path: Path, content: str, *, mode: int) -> None:
"""Atomic: write to a sibling tmp file, fsync, rename."""
tmp = path.with_suffix(path.suffix + ".tmp")
fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
try:
os.write(fd, content.encode("utf-8"))
os.fsync(fd)
finally:
os.close(fd)
os.replace(tmp, path)
try:
import fcntl as _fcntl
def _try_flock(fd: int) -> None:
try:
_fcntl.flock(fd, _fcntl.LOCK_EX)
except OSError:
pass
def _try_funlock(fd: int) -> None:
try:
_fcntl.flock(fd, _fcntl.LOCK_UN)
except OSError:
pass
except ImportError: # pragma: no cover — Windows path
def _try_flock(fd: int) -> None:
return None
def _try_funlock(fd: int) -> None:
return None
__all__ = [
"ACTION_OPERATOR_EDIT",
"AuditEntry",
"COMPONENT_FOR_TOOL",
"CURRENT_CONFIG_ALLOWLIST",
"CURRENT_CONFIG_DIR_IN_AGENT",
"CURRENT_CONFIG_DOCKERFILE",
"CURRENT_CONFIG_ROUTES",
"DEFAULT_POLL_INTERVAL_SEC",
"Proposal",
"QUEUE_DIR_IN_CONTAINER",
"Response",
"STATUSES",
"STATUS_APPROVED",
"STATUS_MODIFIED",
"STATUS_REJECTED",
"SUPERVISE_HOSTNAME",
"SUPERVISE_PORT",
"Supervise",
"SupervisePlan",
"TOOLS",
"TOOL_CAPABILITY_BLOCK",
"TOOL_CRED_PROXY_BLOCK",
"TOOL_PIPELOCK_BLOCK",
"archive_proposal",
"audit_dir",
"audit_log_path",
"claude_bottle_root",
"list_pending_proposals",
"queue_dir_for_slug",
"read_audit_entries",
"read_proposal",
"read_response",
"render_diff",
"sha256_hex",
"wait_for_response",
"write_audit_entry",
"write_proposal",
"write_response",
]
+499
View File
@@ -0,0 +1,499 @@
"""Supervise sidecar HTTP server (PRD 0013).
Per-bottle MCP server exposing three tools `cred-proxy-block`,
`pipelock-block`, `capability-block` that the agent calls to
propose config changes when stuck. Each tool call:
1. Validates the proposed file syntactically.
2. Writes a Proposal to /run/supervise/queue/ (bind-mounted from
the host's ~/.claude-bottle/queue/<slug>/).
3. Blocks polling for a matching Response file.
4. Returns the operator's `{status, notes}` to the agent.
The bottle slug arrives via SUPERVISE_BOTTLE_SLUG env (stamped at
container creation by the backend's start step). The queue dir comes
from SUPERVISE_QUEUE_DIR (default `/run/supervise/queue`).
Speaks MCP over HTTP+JSON-RPC. Methods handled:
* `initialize` handshake; returns server info + caps.
* `notifications/initialized` ack-only.
* `tools/list` returns the three tool definitions.
* `tools/call` validates, queues, blocks, returns.
Everything else returns JSON-RPC error -32601 (method not found).
Stdlib-only. The Dockerfile copies this file + claude_bottle/supervise.py
into the image; the server imports `supervise` for the queue / Proposal
plumbing.
"""
from __future__ import annotations
import http.server
import json
import os
import socketserver
import sys
import typing
from dataclasses import dataclass
from pathlib import Path
# Same-directory import inside the container; `supervise.py` is COPYed
# alongside this file by Dockerfile.supervise.
import supervise as _sv
# --- JSON-RPC / MCP plumbing ----------------------------------------------
MCP_PROTOCOL_VERSION = "2024-11-05"
SERVER_NAME = "claude-bottle-supervise"
SERVER_VERSION = "0.1.0"
JSONRPC_VERSION = "2.0"
# JSON-RPC 2.0 standard error codes.
ERR_PARSE = -32700
ERR_INVALID_REQUEST = -32600
ERR_METHOD_NOT_FOUND = -32601
ERR_INVALID_PARAMS = -32602
ERR_INTERNAL = -32603
@dataclass(frozen=True)
class JsonRpcRequest:
method: str
params: dict[str, object]
id: object # None for notifications; int/str/null for requests
is_notification: bool
def parse_jsonrpc(body: bytes) -> JsonRpcRequest:
"""Parse a single JSON-RPC 2.0 request body. Raises ValueError
with a JSON-RPC error code attached if the shape is wrong."""
try:
raw = json.loads(body)
except json.JSONDecodeError as e:
raise _RpcError(ERR_PARSE, f"parse error: {e}") from e
if not isinstance(raw, dict):
raise _RpcError(ERR_INVALID_REQUEST, "request must be a JSON object")
if raw.get("jsonrpc") != JSONRPC_VERSION:
raise _RpcError(ERR_INVALID_REQUEST, "jsonrpc field must be '2.0'")
method = raw.get("method")
if not isinstance(method, str):
raise _RpcError(ERR_INVALID_REQUEST, "method must be a string")
params = raw.get("params", {})
if params is None:
params = {}
if not isinstance(params, dict):
raise _RpcError(ERR_INVALID_PARAMS, "params must be an object")
rpc_id = raw.get("id", _NO_ID)
is_notification = rpc_id is _NO_ID
return JsonRpcRequest(
method=method,
params=params,
id=None if is_notification else rpc_id,
is_notification=is_notification,
)
_NO_ID = object()
class _RpcError(Exception):
def __init__(self, code: int, message: str):
super().__init__(message)
self.code = code
self.message = message
def jsonrpc_result(request_id: object, result: object) -> bytes:
payload = {"jsonrpc": JSONRPC_VERSION, "id": request_id, "result": result}
return (json.dumps(payload) + "\n").encode("utf-8")
def jsonrpc_error(request_id: object, code: int, message: str) -> bytes:
payload = {
"jsonrpc": JSONRPC_VERSION,
"id": request_id,
"error": {"code": code, "message": message},
}
return (json.dumps(payload) + "\n").encode("utf-8")
# --- Tool definitions ------------------------------------------------------
TOOL_DEFINITIONS: list[dict[str, object]] = [
{
"name": _sv.TOOL_CRED_PROXY_BLOCK,
"description": (
"Call when cred-proxy refused your HTTPS request — missing "
"route, expired token, wrong scope (typically a 403 or a "
"404 from `http://cred-proxy:<port>/<path>/`). Read the "
"current routes.json from "
"/etc/claude-bottle/current-config/routes.json, compose a "
"modified version with the route you need, and pass the "
"full new file plus a justification. The operator approves "
"or rejects in the supervise TUI. On approval the supervisor "
"writes the new routes.json on the host and SIGHUPs cred-proxy "
"(wired in PRD 0014; in the v1 supervise foundation the "
"approval is acknowledged but no config change runs)."
),
"inputSchema": {
"type": "object",
"properties": {
"routes": {
"type": "string",
"description": "Full proposed routes.json file content (JSON text).",
},
"justification": {
"type": "string",
"description": "Why this routes change is justified.",
},
},
"required": ["routes", "justification"],
},
},
{
"name": _sv.TOOL_PIPELOCK_BLOCK,
"description": (
"Call when pipelock refused your outbound request — host "
"not in the allowlist, protocol blocked, connection "
"refused at the egress layer. Read the current allowlist "
"from /etc/claude-bottle/current-config/allowlist, compose "
"a modified version, and pass the full new file plus a "
"justification. On approval the supervisor writes the new "
"allowlist and restarts pipelock (wired in PRD 0015; v1 "
"acknowledges only)."
),
"inputSchema": {
"type": "object",
"properties": {
"allowlist": {
"type": "string",
"description": "Full proposed pipelock allowlist (one hostname per line).",
},
"justification": {
"type": "string",
"description": "Why the new host(s) should be allowed.",
},
},
"required": ["allowlist", "justification"],
},
},
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"description": (
"Call when the bottle is missing a tool, skill, permission, "
"or env var you need — something that lives in the agent "
"Dockerfile rather than in routes or the pipelock allowlist. "
"Read the current Dockerfile from "
"/etc/claude-bottle/current-config/Dockerfile, compose a "
"modified version, and pass the full new file plus a "
"justification. On approval the supervisor rebuilds the "
"bottle from the new Dockerfile and starts a replacement on "
"the same branch (wired in PRD 0016; v1 acknowledges only)."
),
"inputSchema": {
"type": "object",
"properties": {
"dockerfile": {
"type": "string",
"description": "Full proposed Dockerfile content.",
},
"justification": {
"type": "string",
"description": "Why this capability is needed.",
},
},
"required": ["dockerfile", "justification"],
},
},
]
# Map each tool to the input field that carries the proposed file.
PROPOSED_FILE_FIELD: dict[str, str] = {
_sv.TOOL_CRED_PROXY_BLOCK: "routes",
_sv.TOOL_PIPELOCK_BLOCK: "allowlist",
_sv.TOOL_CAPABILITY_BLOCK: "dockerfile",
}
# --- Validation ------------------------------------------------------------
def validate_proposed_file(tool: str, content: str) -> None:
"""Syntactic validation. The operator is the real gate; this just
catches obvious paste-errors / wrong-tool selections before they
enter the queue."""
if not content.strip():
raise _RpcError(ERR_INVALID_PARAMS, f"{tool}: proposed file is empty")
if tool == _sv.TOOL_CRED_PROXY_BLOCK:
try:
parsed = json.loads(content)
except json.JSONDecodeError as e:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.json is not valid JSON: {e}",
) from e
if not isinstance(parsed, dict) or not isinstance(parsed.get("routes"), list):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.json must be an object with a 'routes' array",
)
elif tool == _sv.TOOL_PIPELOCK_BLOCK:
for i, line in enumerate(content.splitlines()):
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
# Hostnames are conservative: letters/digits/dots/dashes only.
for ch in stripped:
if not (ch.isalnum() or ch in ".-_"):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: allowlist line {i + 1} has invalid character {ch!r}",
)
elif tool == _sv.TOOL_CAPABILITY_BLOCK:
# Dockerfiles are too varied to validate syntactically beyond
# non-empty. The operator reads the diff in the TUI.
pass
else:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
# --- MCP handlers ----------------------------------------------------------
@dataclass(frozen=True)
class ServerConfig:
bottle_slug: str
queue_dir: Path
def handle_initialize(_params: dict[str, object]) -> dict[str, object]:
return {
"protocolVersion": MCP_PROTOCOL_VERSION,
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},
}
def handle_tools_list(_params: dict[str, object]) -> dict[str, object]:
return {"tools": TOOL_DEFINITIONS}
def handle_tools_call(
params: dict[str, object],
config: ServerConfig,
) -> dict[str, object]:
"""Validates the proposal, writes it to the queue, blocks waiting
for a Response, returns the result wrapped in MCP `content`."""
name = params.get("name")
if not isinstance(name, str):
raise _RpcError(ERR_INVALID_PARAMS, "tools/call missing 'name'")
if name not in PROPOSED_FILE_FIELD:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {name!r}")
args_raw = params.get("arguments", {})
if not isinstance(args_raw, dict):
raise _RpcError(ERR_INVALID_PARAMS, "tools/call 'arguments' must be an object")
file_field = PROPOSED_FILE_FIELD[name]
proposed_file = args_raw.get(file_field)
justification = args_raw.get("justification")
if not isinstance(proposed_file, str):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{name}: '{file_field}' is required and must be a string",
)
if not isinstance(justification, str) or not justification.strip():
raise _RpcError(
ERR_INVALID_PARAMS,
f"{name}: 'justification' is required and must be a non-empty string",
)
validate_proposed_file(name, proposed_file)
proposal = _sv.Proposal.new(
bottle_slug=config.bottle_slug,
tool=name,
proposed_file=proposed_file,
justification=justification,
current_file_hash=_sv.sha256_hex(proposed_file),
)
_sv.write_proposal(config.queue_dir, proposal)
sys.stderr.write(
f"supervise: queued proposal {proposal.id} ({name}) "
f"for bottle {config.bottle_slug}; waiting for operator...\n"
)
sys.stderr.flush()
response = _sv.wait_for_response(config.queue_dir, proposal.id)
_sv.archive_proposal(config.queue_dir, proposal.id)
text = format_response_text(response)
return {
"content": [{"type": "text", "text": text}],
"isError": response.status == _sv.STATUS_REJECTED,
}
def format_response_text(response: "_sv.Response") -> str:
"""Pretty-print a Response for the tool's text content. The agent
reads the text and decides whether to retry / give up / surface."""
lines = [f"status: {response.status}"]
if response.notes:
lines.append(f"notes: {response.notes}")
if response.status == _sv.STATUS_MODIFIED and response.final_file is not None:
lines.append("the operator modified your proposal before approving; "
"the final config is now what's been applied")
return "\n".join(lines)
# --- HTTP transport --------------------------------------------------------
# Max request body the server accepts. Generous because Dockerfile
# proposals can be a few KB; routes.json is small. 1 MB is well above
# any realistic config file.
MAX_BODY_BYTES = 1 * 1024 * 1024
class MCPHandler(http.server.BaseHTTPRequestHandler):
"""Per-request JSON-RPC handler. Each tools/call may block for
a long time; the ThreadingMixIn on the server class ensures
other requests can be processed concurrently."""
server_version = f"{SERVER_NAME}/{SERVER_VERSION}"
def log_message(self, format: str, *args: typing.Any) -> None:
if os.environ.get("SUPERVISE_DEBUG"):
super().log_message(format, *args)
def do_GET(self) -> None:
# /health for liveness; everything else 405. POST is the only
# method MCP needs.
if self.path == "/health":
self._write_text(200, "ok\n")
return
self._write_text(405, "use POST for MCP requests\n")
def do_POST(self) -> None:
length_header = self.headers.get("Content-Length")
if length_header is None:
self._write_text(411, "Content-Length required\n")
return
try:
length = int(length_header)
except ValueError:
self._write_text(400, "invalid Content-Length\n")
return
if length < 0 or length > MAX_BODY_BYTES:
self._write_text(413, "request body too large\n")
return
body = self.rfile.read(length)
try:
req = parse_jsonrpc(body)
except _RpcError as e:
self._write_jsonrpc(jsonrpc_error(None, e.code, e.message))
return
config = typing.cast("MCPServer", self.server).config
try:
result = self._dispatch(req, config)
except _RpcError as e:
self._write_jsonrpc(jsonrpc_error(req.id, e.code, e.message))
return
except Exception as e: # pragma: no cover — defensive
sys.stderr.write(f"supervise: internal error: {e}\n")
self._write_jsonrpc(jsonrpc_error(req.id, ERR_INTERNAL, "internal error"))
return
if req.is_notification:
self._write_text(202, "")
return
self._write_jsonrpc(jsonrpc_result(req.id, result))
def _dispatch(self, req: JsonRpcRequest, config: ServerConfig) -> object:
method = req.method
if method == "initialize":
return handle_initialize(req.params)
if method == "notifications/initialized":
return None # ack-only
if method == "tools/list":
return handle_tools_list(req.params)
if method == "tools/call":
return handle_tools_call(req.params, config)
raise _RpcError(ERR_METHOD_NOT_FOUND, f"method not found: {method}")
def _write_jsonrpc(self, body: bytes) -> None:
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.send_header("Connection", "close")
self.end_headers()
self.wfile.write(body)
def _write_text(self, status: int, body: str) -> None:
encoded = body.encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Content-Length", str(len(encoded)))
self.send_header("Connection", "close")
self.end_headers()
if encoded:
self.wfile.write(encoded)
class MCPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
allow_reuse_address = True
daemon_threads = True
config: ServerConfig = ServerConfig(bottle_slug="", queue_dir=Path())
# --- Entry point -----------------------------------------------------------
def serve(
*,
bottle_slug: str,
queue_dir: Path,
port: int = _sv.SUPERVISE_PORT,
bind: str = "0.0.0.0",
) -> typing.NoReturn:
queue_dir.mkdir(parents=True, exist_ok=True)
server = MCPServer((bind, port), MCPHandler)
server.config = ServerConfig(bottle_slug=bottle_slug, queue_dir=queue_dir)
sys.stderr.write(
f"supervise listening on {bind}:{port}; "
f"slug={bottle_slug!r}; queue={queue_dir}; "
f"tools: {', '.join(t['name'] for t in TOOL_DEFINITIONS)}\n" # type: ignore[arg-type]
)
sys.stderr.flush()
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
server.server_close()
sys.exit(0)
def main(argv: list[str]) -> int:
del argv # config is env-only, matches cred_proxy_server pattern
bottle_slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "")
if not bottle_slug:
sys.stderr.write("supervise: SUPERVISE_BOTTLE_SLUG env is unset\n")
return 2
queue_dir = Path(os.environ.get("SUPERVISE_QUEUE_DIR", _sv.QUEUE_DIR_IN_CONTAINER))
port = int(os.environ.get("SUPERVISE_PORT", str(_sv.SUPERVISE_PORT)))
bind = os.environ.get("SUPERVISE_BIND", "0.0.0.0")
serve(bottle_slug=bottle_slug, queue_dir=queue_dir, port=port, bind=bind)
return 0 # serve() does not return
if __name__ == "__main__":
raise SystemExit(main(sys.argv))
@@ -0,0 +1,83 @@
# PRD 0013: Supervise plane foundation
- **Status:** Draft
- **Author:** didericis
- **Created:** 2026-05-25
- **Parent:** PRD 0012
## Summary
The shared infrastructure that PRDs 00140016 build on. Adds a per-bottle MCP sidecar that exposes three tools (`cred-proxy-block`, `pipelock-block`, `capability-block`) to the agent; a read-only `/etc/claude-bottle/current-config/` mount in the agent container that exposes the current `routes.json`, pipelock allowlist, and Dockerfile; a host-mounted proposal queue; a minimal TUI dashboard that lists pending proposals and supports approve / modify / reject; and the audit log format. After this PRD, an operator can see proposals and approve/reject them — but the approval handlers are no-ops. The remediation engines that actually act on approvals land in 0014, 0015, and 0016.
## Problem
See PRD 0012 for the broader stuck-agent problem. This PRD specifically addresses: there is no protocol for the agent to ask the operator for help, no place for the operator to see what the agent is asking, and no audit trail tying agent asks to operator decisions.
## Goals / Success Criteria
- The agent in a bottle can call any of the three MCP tools and receive a structured response from a real operator action.
- The operator can list pending proposals across all running bottles in a TUI and approve / modify / reject each one with a single command.
- Each approve / modify / reject decision writes an entry to the bottle's audit log, capturing the agent's justification and the operator's action.
- The approval handlers in 0013 are deliberately no-ops: an "approved" response is delivered to the tool, but no host-side config change happens. 00140016 wire in the actual remediations.
## Non-goals
- Any actual remediation: SIGHUP reload, pipelock restart, bottle rebuild are all out of scope for 0013 (covered by 0014, 0015, 0016 respectively).
- TUI polish beyond minimum viable. v1 list + approve/reject is enough.
- Proactive operator-initiated `routes edit <bottle>` / `pipelock edit <bottle>` verbs — they live with the remediation PRDs that own those components.
## Scope
### In scope
- A per-bottle MCP sidecar container on the bottle's internal network.
- MCP tool definitions for `cred-proxy-block`, `pipelock-block`, `capability-block` (input schemas as defined in PRD 0012 *Stuck categories*).
- Tool output: `{status: "approved"|"modified"|"rejected", notes: "..."}`.
- A read-only mount at `/etc/claude-bottle/current-config/` in the agent container exposing the current `routes.json`, pipelock allowlist, and Dockerfile.
- A host-mounted per-bottle proposal queue at `~/.claude-bottle/queue/<slug>/` (file-per-proposal, with metadata and proposed file content).
- A `claude-bottle dashboard` (or similarly named) TUI that lists running bottles and pending proposals across all of them; supports approve, modify-then-approve, and reject-with-reason for each pending proposal.
- Audit log files at `~/.claude-bottle/audit/cred-proxy-<slug>.log` and `~/.claude-bottle/audit/pipelock-<slug>.log` with the agreed-upon format (timestamp, diff before/after, justification text, operator action with notes). Entries are written by the supervisor on each approve/modify/reject decision. (capability-block has no separate audit log — capability changes are captured by the bottle's rebuild record / git history.)
- Bottle lifecycle script changes to launch the MCP sidecar alongside the other sidecars and mount the read-only current-config directory.
### Out of scope
- The remediation engines themselves (0014, 0015, 0016).
- Proactive operator-initiated `routes edit <bottle>` / `pipelock edit <bottle>` verbs.
## Proposed Design
### New services / components
- **MCP sidecar.** New per-bottle container on the bottle's internal network. Exposes the three tools to the agent. On a tool call: validates the proposed file syntactically (valid JSON for `routes.json`, parseable Dockerfile, etc.), writes the proposal to the queue, and holds the tool-call connection open until the supervisor responds. Returns `{status, notes}` to the agent on response.
- **Read-only current-config mount.** `/etc/claude-bottle/current-config/` in the agent container exposes `routes.json`, the pipelock allowlist, and the agent Dockerfile from the host. Read-only — the agent proposes changes via the tool call, never by writing the file directly.
- **Proposal queue.** Per-bottle directory under `~/.claude-bottle/queue/<slug>/` on the host. One file per pending proposal with `{id, tool, proposed_file, justification, arrival_timestamp, current_file_hash, bottle_slug}`.
- **Minimal TUI dashboard.** Lists running bottles and pending proposals. For each proposal: shows current vs. proposed diff and justification. Operator actions: approve / modify-then-approve / reject-with-reason. Stdlib only (curses) unless that proves painful.
- **Audit log format.** Append-only files at `~/.claude-bottle/audit/<component>-<slug>.log`. Each entry: timestamp, diff before/after, agent justification (if from a tool call), operator action + notes. Defines the format; the per-component PRDs (0014, 0015) fill in real entries.
- **No-op approval handlers.** Each tool's approve path in 0013 writes an audit entry and returns `{status: "approved"}` to the agent but doesn't actually change any config. 0014 / 0015 / 0016 replace these with real handlers.
### Existing code touched
- **Bottle lifecycle scripts** — launch the MCP sidecar alongside other sidecars; mount `/etc/claude-bottle/current-config/` read-only into the agent container.
- **`cli.py`** — adds the dashboard subcommand.
### Data model changes
- A per-bottle pending-proposal queue (see above).
- Per-bottle audit log files (see above).
### External dependencies
- An MCP server library / framework. Pick the lightest option that lets the sidecar advertise three tools with structured input/output schemas; do not adopt a heavier MCP framework than the three tools justify.
- A TUI library is a *maybe* — only if stdlib can't carry the dashboard experience. Default to no new dependency.
## Open questions
- **MCP sidecar placement: own container vs. fold into cred-proxy.** v1 plan is its own container. Folding saves one sidecar per bottle but mixes the credential plane and the supervise plane. Worth deciding once the sidecar's actual line count is known.
- **Multiple pending proposals from the same bottle.** If the agent calls a second tool before the first is answered: replace, append, or refuse? Append feels safest; replace is wrong (loses context); refuse forces the agent to handle a new error mode. Also: can different tools from the same bottle be pending simultaneously?
- **Proposal validation strictness.** The sidecar validates syntactically. Should it also do a deeper check — e.g. does the proposed `routes.json` introduce a route the operator already rejected this session? Probably no for v1; the operator is the gate.
## References
- PRD 0010 — cred-proxy.
- PRD 0012 — stuck-agent recovery flow overview.
- PRD 0014 / 0015 / 0016 — remediation engines that plug into the foundation laid here.
+287
View File
@@ -0,0 +1,287 @@
"""Integration: drive `DockerSupervise.start` against the supervise
sidecar and round-trip an MCP tool call through the queue (PRD 0013).
Topology mirrors production minimally: a per-bottle internal docker
network for the agent supervise leg, no egress network (supervise
doesn't make outbound calls). The "agent" is a curl container on the
internal net; the supervisor lives on the host (this test process)
and uses claude_bottle.cli.dashboard helpers to write Response files.
Verifies:
1. `tools/list` returns the three PRD 0013 tool names over real MCP
wire format.
2. A `tools/call` from the in-container agent blocks until the host
writes a Response to the queue; once written, the agent receives
the approval payload.
"""
from __future__ import annotations
import json
import os
import shutil
import subprocess
import tempfile
import threading
import time
import unittest
from pathlib import Path
from claude_bottle import supervise as _sv
from claude_bottle.backend.docker.network import (
network_create_internal,
network_remove,
)
from claude_bottle.backend.docker.supervise import (
DockerSupervise,
build_supervise_image,
supervise_container_name,
)
from claude_bottle.cli import dashboard
from claude_bottle.supervise import SupervisePlan, list_pending_proposals
from tests._docker import skip_unless_docker
CURL_IMAGE = "curlimages/curl:latest"
@skip_unless_docker()
class TestSuperviseSidecar(unittest.TestCase):
@classmethod
def setUpClass(cls):
r = subprocess.run(
["docker", "pull", CURL_IMAGE],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
if r.returncode != 0:
raise unittest.SkipTest(f"could not pull {CURL_IMAGE}")
build_supervise_image()
def setUp(self):
self.slug = f"cb-test-sv-{os.getpid()}-{int(time.time())}"
self.sidecar_name = ""
self.internal_net = ""
self.work_dir = Path(tempfile.mkdtemp(prefix="supervise-int."))
self.queue_dir = self.work_dir / "queue"
self.queue_dir.mkdir()
def tearDown(self):
if self.sidecar_name:
subprocess.run(
["docker", "rm", "-f", self.sidecar_name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
if self.internal_net:
network_remove(self.internal_net)
shutil.rmtree(self.work_dir, ignore_errors=True)
def _require_bind_mount_sharing(self) -> None:
"""Skip if `docker run -v <host-path>:<container-path>` doesn't
share the filesystem between the test process and the spawned
container. In docker-in-docker CI (Gitea Actions runner with
host socket forwarded), bind-mount paths are resolved against
the outer host's fs, not the runner container's so the
sidecar writes proposals to a dir the test process can't see.
Cached on the class so the probe runs once per test session."""
cached = getattr(type(self), "_bind_mount_ok", None)
if cached is True:
return
if cached is False:
self.skipTest(
"docker bind mounts don't share fs with this test process "
"(likely docker-in-docker); the supervise queue round-trip "
"requires real host fs sharing"
)
probe_dir = Path(tempfile.mkdtemp(prefix="supervise-bind-probe."))
try:
(probe_dir / "from-host").write_text("x")
r = subprocess.run(
[
"docker", "run", "--rm",
"-v", f"{probe_dir}:/probe",
"--entrypoint", "sh",
CURL_IMAGE,
"-c", "test -f /probe/from-host && touch /probe/from-container",
],
capture_output=True,
check=False,
)
ok = (
r.returncode == 0
and (probe_dir / "from-container").exists()
)
finally:
shutil.rmtree(probe_dir, ignore_errors=True)
type(self)._bind_mount_ok = ok
if not ok:
self.skipTest(
"docker bind mounts don't share fs with this test process "
"(likely docker-in-docker); the supervise queue round-trip "
"requires real host fs sharing"
)
def _bring_up_sidecar(self) -> None:
self.internal_net = network_create_internal(self.slug)
plan = SupervisePlan(
slug=self.slug,
queue_dir=self.queue_dir,
current_config_dir=self.work_dir / "current-config",
internal_network=self.internal_net,
)
# current_config_dir isn't bind-mounted into the sidecar, only
# the queue dir is. Create it for symmetry with production.
plan.current_config_dir.mkdir()
self.sidecar_name = DockerSupervise().start(plan)
# Block until the server is ready to answer (the container
# `docker start` returns immediately; python is still
# binding to the port).
deadline = time.monotonic() + 10.0
while time.monotonic() < deadline:
rc = subprocess.run(
[
"docker", "run", "--rm",
"--network", self.internal_net,
CURL_IMAGE,
"-fsS", "-o", "/dev/null",
"--max-time", "2",
f"http://{_sv.SUPERVISE_HOSTNAME}:{_sv.SUPERVISE_PORT}/health",
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
).returncode
if rc == 0:
return
time.sleep(0.25)
raise AssertionError("supervise sidecar /health never came up")
def _curl_jsonrpc(self, body: dict[str, object]) -> dict[str, object]:
"""Invoke curl on the internal network to POST a JSON-RPC
request to the supervise sidecar and parse the response."""
payload = json.dumps(body)
result = subprocess.run(
[
"docker", "run", "--rm",
"--network", self.internal_net,
CURL_IMAGE,
"-sS", "--max-time", "30",
"-H", "Content-Type: application/json",
"-X", "POST",
"--data", payload,
f"http://{_sv.SUPERVISE_HOSTNAME}:{_sv.SUPERVISE_PORT}/",
],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
raise AssertionError(
f"curl to supervise failed: {result.stderr}\n"
f"stdout: {result.stdout}"
)
return json.loads(result.stdout)
def test_tools_list_over_mcp(self):
self._bring_up_sidecar()
result = self._curl_jsonrpc(
{"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
)
self.assertEqual(1, result["id"])
names = {t["name"] for t in result["result"]["tools"]}
self.assertEqual(
{
_sv.TOOL_CRED_PROXY_BLOCK,
_sv.TOOL_PIPELOCK_BLOCK,
_sv.TOOL_CAPABILITY_BLOCK,
},
names,
)
def test_tools_call_round_trips_through_queue(self):
"""End-to-end: agent in the bottle calls cred-proxy-block;
the call blocks on the queue; the host approves via the
dashboard helpers; the agent receives the approval."""
self._require_bind_mount_sharing()
self._bring_up_sidecar()
captured: dict[str, object] = {}
def caller() -> None:
captured["response"] = self._curl_jsonrpc({
"jsonrpc": "2.0", "id": 7, "method": "tools/call",
"params": {
"name": _sv.TOOL_CRED_PROXY_BLOCK,
"arguments": {
"routes": '{"routes": [{"path": "/x/"}]}',
"justification": "integration test",
},
},
})
t = threading.Thread(target=caller)
t.start()
try:
# Wait for the proposal to appear in the queue (the
# sidecar writes it before blocking on wait_for_response).
deadline = time.monotonic() + 10.0
qp = None
while time.monotonic() < deadline:
pending = list_pending_proposals(self.queue_dir)
if pending:
qp = dashboard.QueuedProposal(
proposal=pending[0], queue_dir=self.queue_dir,
)
break
time.sleep(0.1)
self.assertIsNotNone(qp, "proposal never appeared in queue")
assert qp is not None # type-narrowing
self.assertEqual(
_sv.TOOL_CRED_PROXY_BLOCK, qp.proposal.tool,
)
self.assertEqual("integration test", qp.proposal.justification)
# Approve via the dashboard helper (same path the TUI
# uses). For 0013 this writes a Response file + a no-op
# audit entry (no real config change).
dashboard.approve(qp, notes="lgtm from integration test")
finally:
t.join(timeout=20)
response = captured.get("response")
self.assertIsNotNone(response, "curl thread never produced a response")
assert isinstance(response, dict) # type-narrowing
self.assertEqual(7, response["id"])
result = response["result"]
assert isinstance(result, dict)
self.assertFalse(result.get("isError"))
text = result["content"][0]["text"]
self.assertIn("status: approved", text)
self.assertIn("notes: lgtm from integration test", text)
def test_orphan_sidecar_name_collision_recovered(self):
"""An orphan supervise sidecar from a previous run blocks
the next .start with a duplicate-name error. Documents the
observed behavior so a future change that adds auto-cleanup
can flip the assertion."""
self._bring_up_sidecar()
self.assertEqual(supervise_container_name(self.slug), self.sidecar_name)
# Second .start should fail because the container name is
# taken. cleanup is handled by the orphan probe in prepare.py
# (tested separately in test_orphan_cleanup).
with self.assertRaises(SystemExit):
DockerSupervise().start(SupervisePlan(
slug=self.slug,
queue_dir=self.queue_dir,
current_config_dir=self.work_dir / "current-config",
internal_network=self.internal_net,
))
if __name__ == "__main__":
unittest.main()
+227
View File
@@ -0,0 +1,227 @@
"""Unit: dashboard headless paths (PRD 0013 phase 4).
The curses TUI itself isn't exercised here — these tests cover the
discovery + approve/reject + audit-write paths that the TUI's key
handlers call into.
"""
import os
import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path
from claude_bottle import supervise
from claude_bottle.cli import dashboard
from claude_bottle.supervise import (
Proposal,
STATUS_APPROVED,
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_CRED_PROXY_BLOCK,
TOOL_PIPELOCK_BLOCK,
read_audit_entries,
read_response,
sha256_hex,
)
FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_CRED_PROXY_BLOCK) -> Proposal:
return Proposal.new(
bottle_slug=slug, tool=tool,
proposed_file='{"routes": []}\n',
justification=f"needed for {slug}",
current_file_hash=sha256_hex("{}"),
now=FIXED,
)
class _FakeHomeMixin:
"""Patch supervise.claude_bottle_root to a temp dir for the test."""
def _setup_fake_home(self):
self._tmp = tempfile.TemporaryDirectory(prefix="dashboard-test.")
original = supervise.claude_bottle_root
def fake_root() -> Path:
return Path(self._tmp.name) / ".claude-bottle"
supervise.claude_bottle_root = fake_root # type: ignore[assignment]
self._restore_home = lambda: setattr(supervise, "claude_bottle_root", original)
def _teardown_fake_home(self):
self._restore_home()
self._tmp.cleanup()
class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def test_empty_when_no_queues(self):
self.assertEqual([], dashboard.discover_pending())
def test_walks_all_slug_subdirs(self):
for slug in ("dev", "api"):
qdir = supervise.queue_dir_for_slug(slug)
qdir.mkdir(parents=True)
supervise.write_proposal(qdir, _proposal(slug=slug))
pending = dashboard.discover_pending()
self.assertEqual({"dev", "api"}, {qp.proposal.bottle_slug for qp in pending})
def test_sorted_by_arrival_across_bottles(self):
early = Proposal.new(
bottle_slug="api", tool=TOOL_CRED_PROXY_BLOCK,
proposed_file="{}", justification="early",
current_file_hash="h",
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
)
late = Proposal.new(
bottle_slug="dev", tool=TOOL_CRED_PROXY_BLOCK,
proposed_file="{}", justification="late",
current_file_hash="h",
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
)
for p in (late, early):
qdir = supervise.queue_dir_for_slug(p.bottle_slug)
qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p)
pending = dashboard.discover_pending()
self.assertEqual([early.id, late.id], [qp.proposal.id for qp in pending])
def test_excludes_already_responded(self):
p = _proposal()
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True)
supervise.write_proposal(qdir, p)
supervise.write_response(qdir, supervise.Response(
proposal_id=p.id, status=STATUS_APPROVED, notes="",
))
self.assertEqual([], dashboard.discover_pending())
class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def _enqueue(self, tool: str = TOOL_CRED_PROXY_BLOCK):
p = _proposal(tool=tool)
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p)
return dashboard.QueuedProposal(proposal=p, queue_dir=qdir)
def test_approve_writes_response_and_audit(self):
qp = self._enqueue()
dashboard.approve(qp)
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status)
self.assertIsNone(resp.final_file)
entries = read_audit_entries("cred-proxy", "dev")
self.assertEqual(1, len(entries))
self.assertEqual("approved", entries[0].operator_action)
def test_approve_with_final_file_marks_modified(self):
qp = self._enqueue()
dashboard.approve(qp, final_file='{"routes": [{"path": "/x/"}]}\n', notes="tweaked")
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_MODIFIED, resp.status)
self.assertEqual('{"routes": [{"path": "/x/"}]}\n', resp.final_file)
self.assertEqual("tweaked", resp.notes)
entries = read_audit_entries("cred-proxy", "dev")
self.assertEqual("modified", entries[0].operator_action)
def test_reject_writes_rejection(self):
qp = self._enqueue()
dashboard.reject(qp, reason="nope")
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_REJECTED, resp.status)
self.assertEqual("nope", resp.notes)
entries = read_audit_entries("cred-proxy", "dev")
self.assertEqual("rejected", entries[0].operator_action)
self.assertEqual("nope", entries[0].operator_notes)
def test_capability_block_skips_audit_log(self):
qp = self._enqueue(tool=TOOL_CAPABILITY_BLOCK)
dashboard.approve(qp)
# No audit log for capability-block (per PRD 0013 / 0016).
# cred-proxy and pipelock logs both empty.
self.assertEqual([], read_audit_entries("cred-proxy", "dev"))
self.assertEqual([], read_audit_entries("pipelock", "dev"))
def test_pipelock_audit_distinct_from_cred_proxy(self):
qp = self._enqueue(tool=TOOL_PIPELOCK_BLOCK)
dashboard.approve(qp)
self.assertEqual(1, len(read_audit_entries("pipelock", "dev")))
self.assertEqual(0, len(read_audit_entries("cred-proxy", "dev")))
class TestEditInEditor(unittest.TestCase):
def test_runs_editor_returns_edited_content(self):
# Fake "editor" is /bin/sh -c 'cat <<EOF > $1 ... EOF'
original_editor = os.environ.get("EDITOR")
try:
# Use a fake editor that overwrites the file with a known
# marker. EDITOR is split with shlex equivalence by
# subprocess.run when invoked as a list — keep it as a
# single program path that takes the file as argv[1].
os.environ["EDITOR"] = (
"/bin/sh -c 'printf %s \"edited\" > \"$0\"'"
)
# subprocess.run with the str as the first list element
# would try to find a binary literally named "/bin/sh -c ..."
# — that won't work. Use shell mode trick: wrap in a script.
# Easier: build a tiny helper script.
with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False, prefix="fake-editor.",
) as script:
script.write('#!/bin/sh\nprintf "%s" "edited" > "$1"\n')
editor_script = script.name
os.chmod(editor_script, 0o755)
os.environ["EDITOR"] = editor_script
try:
result = dashboard.edit_in_editor("original")
self.assertEqual("edited", result)
finally:
os.unlink(editor_script)
finally:
if original_editor is None:
os.environ.pop("EDITOR", None)
else:
os.environ["EDITOR"] = original_editor
def test_returns_none_when_unchanged(self):
original_editor = os.environ.get("EDITOR")
try:
# No-op editor: touch the file (leaves it unchanged).
with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False, prefix="noop-editor.",
) as script:
script.write('#!/bin/sh\n: $1\n')
editor_script = script.name
os.chmod(editor_script, 0o755)
os.environ["EDITOR"] = editor_script
try:
result = dashboard.edit_in_editor("original")
self.assertIsNone(result)
finally:
os.unlink(editor_script)
finally:
if original_editor is None:
os.environ.pop("EDITOR", None)
else:
os.environ["EDITOR"] = original_editor
if __name__ == "__main__":
unittest.main()
+390
View File
@@ -0,0 +1,390 @@
"""Unit: supervise queue + audit log + diff helpers (PRD 0013)."""
import json
import tempfile
import threading
import time
import unittest
from datetime import datetime, timezone
from pathlib import Path
from claude_bottle import supervise
from claude_bottle.supervise import (
AuditEntry,
Proposal,
Response,
STATUS_APPROVED,
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_CRED_PROXY_BLOCK,
TOOL_PIPELOCK_BLOCK,
archive_proposal,
audit_log_path,
list_pending_proposals,
read_audit_entries,
read_proposal,
read_response,
render_diff,
sha256_hex,
wait_for_response,
write_audit_entry,
write_proposal,
write_response,
)
FIXED_TS = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(tool: str = TOOL_CRED_PROXY_BLOCK, proposed: str = "{}", justification: str = "need a route") -> Proposal:
return Proposal.new(
bottle_slug="dev",
tool=tool,
proposed_file=proposed,
justification=justification,
current_file_hash=sha256_hex("{}"),
now=FIXED_TS,
)
class TestProposalRoundtrip(unittest.TestCase):
def test_new_stamps_uuid_and_iso_timestamp(self):
p = _proposal()
self.assertTrue(p.id)
self.assertEqual("2026-05-25T12:00:00+00:00", p.arrival_timestamp)
self.assertEqual("dev", p.bottle_slug)
self.assertEqual(TOOL_CRED_PROXY_BLOCK, p.tool)
def test_to_from_dict_roundtrip(self):
p = _proposal()
self.assertEqual(p, Proposal.from_dict(p.to_dict()))
def test_from_dict_rejects_unknown_tool(self):
raw = _proposal().to_dict()
raw["tool"] = "not-a-real-tool"
with self.assertRaises(ValueError):
Proposal.from_dict(raw)
def test_from_dict_rejects_missing_field(self):
raw = _proposal().to_dict()
del raw["justification"]
with self.assertRaises(ValueError):
Proposal.from_dict(raw)
class TestResponseRoundtrip(unittest.TestCase):
def test_to_from_dict_approved(self):
r = Response(proposal_id="abc", status=STATUS_APPROVED, notes="lgtm")
self.assertEqual(r, Response.from_dict(r.to_dict()))
def test_to_from_dict_modified_with_final_file(self):
r = Response(
proposal_id="abc",
status=STATUS_MODIFIED,
notes="tweaked the upstream",
final_file='{"routes": []}\n',
)
self.assertEqual(r, Response.from_dict(r.to_dict()))
def test_rejects_unknown_status(self):
with self.assertRaises(ValueError):
Response.from_dict({
"proposal_id": "abc",
"status": "maybe",
"notes": "",
"final_file": None,
})
def test_rejects_non_string_final_file(self):
with self.assertRaises(ValueError):
Response.from_dict({
"proposal_id": "abc",
"status": STATUS_APPROVED,
"notes": "",
"final_file": 123,
})
class TestQueueIO(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="claude-bottle-supervise-test.")
self.queue_dir = Path(self._tmp.name)
def tearDown(self):
self._tmp.cleanup()
def test_write_and_read_proposal(self):
p = _proposal()
path = write_proposal(self.queue_dir, p)
self.assertTrue(path.exists())
self.assertEqual(0o600, path.stat().st_mode & 0o777)
loaded = read_proposal(self.queue_dir, p.id)
self.assertEqual(p, loaded)
def test_list_pending_excludes_responded(self):
a = _proposal(justification="first")
b = _proposal(justification="second")
write_proposal(self.queue_dir, a)
write_proposal(self.queue_dir, b)
write_response(self.queue_dir, Response(
proposal_id=a.id, status=STATUS_APPROVED, notes="",
))
pending = list_pending_proposals(self.queue_dir)
self.assertEqual([b.id], [p.id for p in pending])
def test_list_pending_returns_empty_for_missing_dir(self):
self.assertEqual([], list_pending_proposals(self.queue_dir / "nope"))
def test_list_pending_sorted_by_arrival(self):
# Fabricate two with explicit timestamps.
a = Proposal.new(
bottle_slug="dev", tool=TOOL_CRED_PROXY_BLOCK,
proposed_file="{}", justification="early",
current_file_hash="x",
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
)
b = Proposal.new(
bottle_slug="dev", tool=TOOL_CRED_PROXY_BLOCK,
proposed_file="{}", justification="late",
current_file_hash="x",
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
)
# Write in reverse order.
write_proposal(self.queue_dir, b)
write_proposal(self.queue_dir, a)
ordered = list_pending_proposals(self.queue_dir)
self.assertEqual([a.id, b.id], [p.id for p in ordered])
def test_write_and_read_response(self):
r = Response(proposal_id="xyz", status=STATUS_REJECTED, notes="no")
write_response(self.queue_dir, r)
self.assertEqual(r, read_response(self.queue_dir, "xyz"))
def test_wait_for_response_returns_when_file_appears(self):
p = _proposal()
write_proposal(self.queue_dir, p)
def write_after_delay():
time.sleep(0.05)
write_response(self.queue_dir, Response(
proposal_id=p.id, status=STATUS_APPROVED, notes="ok",
))
t = threading.Thread(target=write_after_delay)
t.start()
try:
r = wait_for_response(self.queue_dir, p.id, poll_interval=0.01)
finally:
t.join()
self.assertEqual(STATUS_APPROVED, r.status)
self.assertEqual("ok", r.notes)
def test_wait_for_response_times_out(self):
deadline = time.monotonic() + 0.05
with self.assertRaises(TimeoutError):
wait_for_response(
self.queue_dir, "never",
poll_interval=0.01, deadline=deadline,
)
def test_archive_proposal_moves_both_files(self):
p = _proposal()
write_proposal(self.queue_dir, p)
write_response(self.queue_dir, Response(
proposal_id=p.id, status=STATUS_APPROVED, notes="",
))
archive_proposal(self.queue_dir, p.id)
self.assertFalse((self.queue_dir / f"{p.id}.proposal.json").exists())
self.assertFalse((self.queue_dir / f"{p.id}.response.json").exists())
self.assertTrue((self.queue_dir / "processed" / f"{p.id}.proposal.json").exists())
self.assertTrue((self.queue_dir / "processed" / f"{p.id}.response.json").exists())
def test_archive_is_idempotent_on_missing_files(self):
# Should not raise.
archive_proposal(self.queue_dir, "nope")
class TestAuditLog(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="claude-bottle-supervise-audit.")
self._home_patch = self._patch_home(Path(self._tmp.name))
def tearDown(self):
self._home_patch()
self._tmp.cleanup()
def _patch_home(self, fake_home: Path):
original = supervise.claude_bottle_root
def fake_root() -> Path:
return fake_home / ".claude-bottle"
supervise.claude_bottle_root = fake_root # type: ignore[assignment]
return lambda: setattr(supervise, "claude_bottle_root", original)
def test_write_then_read_single_entry(self):
e = AuditEntry(
timestamp="2026-05-25T12:00:00+00:00",
bottle_slug="dev",
component="cred-proxy",
operator_action=STATUS_APPROVED,
operator_notes="lgtm",
justification="agent needed gh-api token",
diff="--- before\n+++ after\n",
)
path = write_audit_entry(e)
self.assertEqual(0o600, path.stat().st_mode & 0o777)
loaded = read_audit_entries("cred-proxy", "dev")
self.assertEqual([e], loaded)
def test_appends_one_line_per_entry(self):
for i in range(3):
write_audit_entry(AuditEntry(
timestamp=f"2026-05-25T12:00:0{i}+00:00",
bottle_slug="dev",
component="pipelock",
operator_action=STATUS_APPROVED,
operator_notes=f"n{i}",
justification="",
diff="",
))
path = audit_log_path("pipelock", "dev")
with path.open() as f:
lines = [line for line in f if line.strip()]
self.assertEqual(3, len(lines))
for line in lines:
self.assertTrue(json.loads(line)) # each line is valid JSON
def test_separate_logs_per_component_slug(self):
write_audit_entry(AuditEntry(
timestamp="t",
bottle_slug="dev",
component="cred-proxy",
operator_action=STATUS_APPROVED,
operator_notes="",
justification="",
diff="",
))
write_audit_entry(AuditEntry(
timestamp="t",
bottle_slug="dev",
component="pipelock",
operator_action=STATUS_APPROVED,
operator_notes="",
justification="",
diff="",
))
write_audit_entry(AuditEntry(
timestamp="t",
bottle_slug="other",
component="cred-proxy",
operator_action=STATUS_REJECTED,
operator_notes="",
justification="",
diff="",
))
self.assertEqual(1, len(read_audit_entries("cred-proxy", "dev")))
self.assertEqual(1, len(read_audit_entries("pipelock", "dev")))
self.assertEqual(1, len(read_audit_entries("cred-proxy", "other")))
def test_read_audit_entries_missing_log_returns_empty(self):
self.assertEqual([], read_audit_entries("cred-proxy", "no-such-bottle"))
class TestDiffAndHash(unittest.TestCase):
def test_render_diff_returns_empty_when_unchanged(self):
self.assertEqual("", render_diff("a\nb\n", "a\nb\n"))
def test_render_diff_shows_changes(self):
diff = render_diff("a\nb\nc\n", "a\nB\nc\n", label="routes.json")
self.assertIn("routes.json (current)", diff)
self.assertIn("routes.json (proposed)", diff)
self.assertIn("-b", diff)
self.assertIn("+B", diff)
def test_sha256_hex_is_deterministic_and_hex(self):
h1 = sha256_hex("hello")
h2 = sha256_hex("hello")
self.assertEqual(h1, h2)
self.assertEqual(64, len(h1))
int(h1, 16) # parses as hex
class TestToolConstants(unittest.TestCase):
def test_tools_tuple_matches_individual_constants(self):
self.assertEqual(
(TOOL_CRED_PROXY_BLOCK, TOOL_PIPELOCK_BLOCK, TOOL_CAPABILITY_BLOCK),
supervise.TOOLS,
)
def test_component_map_covers_two_remediation_tools_only(self):
self.assertIn(TOOL_CRED_PROXY_BLOCK, supervise.COMPONENT_FOR_TOOL)
self.assertIn(TOOL_PIPELOCK_BLOCK, supervise.COMPONENT_FOR_TOOL)
self.assertNotIn(TOOL_CAPABILITY_BLOCK, supervise.COMPONENT_FOR_TOOL)
class _StubSupervise(supervise.Supervise):
"""Concrete Supervise subclass for testing the prepare template."""
def start(self, plan):
return f"stub-{plan.slug}"
def stop(self, target):
return None
class TestSupervisePrepare(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="supervise-prepare-test.")
self._home_patch = self._patch_home(Path(self._tmp.name))
self.stage_dir = Path(self._tmp.name) / "stage"
self.stage_dir.mkdir()
def tearDown(self):
self._home_patch()
self._tmp.cleanup()
def _patch_home(self, fake_home: Path):
original = supervise.claude_bottle_root
def fake_root() -> Path:
return fake_home / ".claude-bottle"
supervise.claude_bottle_root = fake_root # type: ignore[assignment]
return lambda: setattr(supervise, "claude_bottle_root", original)
def test_prepare_creates_queue_and_current_config(self):
plan = _StubSupervise().prepare(
"dev", self.stage_dir,
routes_content='{"routes": [{"path": "/x/"}]}\n',
allowlist_content="example.com\n",
dockerfile_content="FROM python:3.13\n",
)
self.assertTrue(plan.queue_dir.is_dir())
self.assertTrue(plan.current_config_dir.is_dir())
self.assertEqual(
'{"routes": [{"path": "/x/"}]}\n',
(plan.current_config_dir / "routes.json").read_text(),
)
self.assertEqual(
"example.com\n",
(plan.current_config_dir / "allowlist").read_text(),
)
self.assertEqual(
"FROM python:3.13\n",
(plan.current_config_dir / "Dockerfile").read_text(),
)
self.assertEqual("dev", plan.slug)
self.assertEqual("", plan.internal_network)
def test_prepare_defaults_routes_to_empty_when_absent(self):
plan = _StubSupervise().prepare("dev", self.stage_dir)
self.assertEqual(
'{"routes": []}\n',
(plan.current_config_dir / "routes.json").read_text(),
)
if __name__ == "__main__":
unittest.main()
+378
View File
@@ -0,0 +1,378 @@
"""Unit: supervise sidecar MCP server (PRD 0013)."""
import http.client
import json
import sys
import tempfile
import threading
import time
import unittest
from pathlib import Path
# The server module loads `supervise` via same-directory import inside
# the container (Dockerfile.supervise WORKDIRs into /app). For tests
# we mirror that by injecting claude_bottle/ onto sys.path under the
# bare name `supervise`.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "claude_bottle"))
import supervise as _sv # noqa: E402
from claude_bottle import supervise_server # noqa: E402
from claude_bottle.supervise_server import (
ERR_INVALID_PARAMS,
ERR_INVALID_REQUEST,
ERR_METHOD_NOT_FOUND,
ERR_PARSE,
MCPHandler,
MCPServer,
PROPOSED_FILE_FIELD,
ServerConfig,
TOOL_DEFINITIONS,
_RpcError,
format_response_text,
handle_initialize,
handle_tools_call,
handle_tools_list,
jsonrpc_error,
jsonrpc_result,
parse_jsonrpc,
serve,
validate_proposed_file,
)
# --- Validation ------------------------------------------------------------
class TestValidation(unittest.TestCase):
def test_cred_proxy_block_requires_valid_json(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file(_sv.TOOL_CRED_PROXY_BLOCK, "{not json")
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("not valid JSON", cm.exception.message)
def test_cred_proxy_block_requires_routes_array(self):
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_CRED_PROXY_BLOCK, '{"other": []}')
def test_cred_proxy_block_accepts_valid_routes(self):
validate_proposed_file(
_sv.TOOL_CRED_PROXY_BLOCK,
'{"routes": [{"path": "/x/", "upstream": "https://example.com"}]}',
)
def test_pipelock_block_accepts_clean_hostnames(self):
validate_proposed_file(
_sv.TOOL_PIPELOCK_BLOCK,
"api.example.com\n# comment\nfoo.bar.baz\n",
)
def test_pipelock_block_rejects_invalid_char(self):
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_PIPELOCK_BLOCK, "host with space.com\n")
def test_capability_block_accepts_anything_nonempty(self):
validate_proposed_file(
_sv.TOOL_CAPABILITY_BLOCK,
"FROM python:3.13\nRUN apk add git\n",
)
def test_empty_proposed_file_rejected_for_all_tools(self):
for tool in _sv.TOOLS:
with self.subTest(tool=tool):
with self.assertRaises(_RpcError):
validate_proposed_file(tool, " \n\t")
# --- JSON-RPC parsing ------------------------------------------------------
class TestParseJsonRpc(unittest.TestCase):
def test_parses_request_with_id(self):
req = parse_jsonrpc(
b'{"jsonrpc": "2.0", "id": 7, "method": "tools/list", "params": {}}'
)
self.assertEqual("tools/list", req.method)
self.assertEqual(7, req.id)
self.assertFalse(req.is_notification)
def test_parses_notification_no_id(self):
req = parse_jsonrpc(
b'{"jsonrpc": "2.0", "method": "notifications/initialized"}'
)
self.assertTrue(req.is_notification)
self.assertIsNone(req.id)
def test_rejects_bad_json(self):
with self.assertRaises(_RpcError) as cm:
parse_jsonrpc(b"{not json")
self.assertEqual(ERR_PARSE, cm.exception.code)
def test_rejects_wrong_jsonrpc_version(self):
with self.assertRaises(_RpcError) as cm:
parse_jsonrpc(b'{"jsonrpc": "1.0", "method": "x"}')
self.assertEqual(ERR_INVALID_REQUEST, cm.exception.code)
def test_rejects_missing_method(self):
with self.assertRaises(_RpcError):
parse_jsonrpc(b'{"jsonrpc": "2.0"}')
def test_treats_null_id_as_request(self):
# JSON-RPC spec: id can be null for a request (just discouraged).
req = parse_jsonrpc(b'{"jsonrpc": "2.0", "id": null, "method": "x"}')
self.assertFalse(req.is_notification)
self.assertIsNone(req.id)
# --- JSON-RPC response framing --------------------------------------------
class TestJsonRpcFraming(unittest.TestCase):
def test_result_envelope(self):
body = jsonrpc_result(1, {"ok": True})
decoded = json.loads(body)
self.assertEqual({"jsonrpc": "2.0", "id": 1, "result": {"ok": True}}, decoded)
def test_error_envelope(self):
body = jsonrpc_error(2, -32601, "method not found: foo")
decoded = json.loads(body)
self.assertEqual(
{"jsonrpc": "2.0", "id": 2,
"error": {"code": -32601, "message": "method not found: foo"}},
decoded,
)
# --- MCP handlers ----------------------------------------------------------
class TestHandleInitialize(unittest.TestCase):
def test_returns_protocol_version_and_caps(self):
result = handle_initialize({})
self.assertEqual("2024-11-05", result["protocolVersion"])
self.assertIn("tools", result["capabilities"]) # type: ignore[index]
self.assertEqual(
"claude-bottle-supervise",
result["serverInfo"]["name"], # type: ignore[index]
)
class TestHandleToolsList(unittest.TestCase):
def test_returns_three_tools(self):
result = handle_tools_list({})
names = [t["name"] for t in result["tools"]] # type: ignore[index]
self.assertEqual(
sorted([
_sv.TOOL_CRED_PROXY_BLOCK,
_sv.TOOL_PIPELOCK_BLOCK,
_sv.TOOL_CAPABILITY_BLOCK,
]),
sorted(names),
)
def test_each_tool_has_inputSchema_with_two_required_fields(self):
for tool in TOOL_DEFINITIONS:
with self.subTest(name=tool["name"]):
schema = tool["inputSchema"]
self.assertEqual("object", schema["type"]) # type: ignore[index]
required = schema["required"] # type: ignore[index]
self.assertEqual(2, len(required))
self.assertIn("justification", required)
self.assertIn(PROPOSED_FILE_FIELD[tool["name"]], required) # type: ignore[index]
class TestHandleToolsCall(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="supervise-server-test.")
self.queue_dir = Path(self._tmp.name)
self.config = ServerConfig(bottle_slug="dev", queue_dir=self.queue_dir)
def tearDown(self):
self._tmp.cleanup()
def _respond_when_proposal_appears(self, status: str, notes: str = "") -> threading.Thread:
"""Background thread: poll the queue for a fresh proposal, write a
matching response. Returns the thread so the test can join it."""
def runner():
for _ in range(200):
pending = _sv.list_pending_proposals(self.queue_dir)
if pending:
p = pending[0]
_sv.write_response(self.queue_dir, _sv.Response(
proposal_id=p.id, status=status, notes=notes,
))
return
time.sleep(0.01)
t = threading.Thread(target=runner)
t.start()
return t
def test_call_round_trips_through_queue(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_APPROVED, notes="lgtm")
try:
result = handle_tools_call(
{
"name": _sv.TOOL_CRED_PROXY_BLOCK,
"arguments": {
"routes": '{"routes": []}',
"justification": "need a route",
},
},
self.config,
)
finally:
responder.join()
self.assertFalse(result["isError"]) # type: ignore[index]
text = result["content"][0]["text"] # type: ignore[index]
self.assertIn("status: approved", text)
self.assertIn("notes: lgtm", text)
def test_rejected_response_sets_isError(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_REJECTED, notes="nope")
try:
result = handle_tools_call(
{
"name": _sv.TOOL_PIPELOCK_BLOCK,
"arguments": {
"allowlist": "example.com\n",
"justification": "needed for tests",
},
},
self.config,
)
finally:
responder.join()
self.assertTrue(result["isError"]) # type: ignore[index]
def test_invalid_tool_name_raises(self):
with self.assertRaises(_RpcError) as cm:
handle_tools_call(
{"name": "not-a-tool", "arguments": {}},
self.config,
)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
def test_missing_justification_raises(self):
with self.assertRaises(_RpcError):
handle_tools_call(
{
"name": _sv.TOOL_CRED_PROXY_BLOCK,
"arguments": {"routes": '{"routes": []}'},
},
self.config,
)
def test_archives_proposal_after_response(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_APPROVED)
try:
handle_tools_call(
{
"name": _sv.TOOL_CRED_PROXY_BLOCK,
"arguments": {
"routes": '{"routes": []}',
"justification": "x",
},
},
self.config,
)
finally:
responder.join()
# No pending proposals left after archive.
self.assertEqual([], _sv.list_pending_proposals(self.queue_dir))
# Both files moved to processed/.
processed = list((self.queue_dir / "processed").glob("*.json"))
self.assertEqual(2, len(processed))
# --- Response text formatting ---------------------------------------------
class TestFormatResponseText(unittest.TestCase):
def test_approved_with_notes(self):
text = format_response_text(_sv.Response(
proposal_id="x", status=_sv.STATUS_APPROVED, notes="retry now",
))
self.assertIn("status: approved", text)
self.assertIn("notes: retry now", text)
def test_modified_includes_modified_hint(self):
text = format_response_text(_sv.Response(
proposal_id="x", status=_sv.STATUS_MODIFIED, notes="",
final_file="modified content",
))
self.assertIn("status: modified", text)
self.assertIn("the operator modified", text.lower())
# --- End-to-end HTTP sanity ------------------------------------------------
class TestHttpEndToEnd(unittest.TestCase):
"""Spin up the server on a random port and round-trip a tools/list
over real HTTP. Catches the JSON-RPC plumbing if it ever drifts
from the unit-level handlers."""
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="supervise-http-test.")
self.queue_dir = Path(self._tmp.name)
# Pick a random port by binding to :0 first.
import socket
s = socket.socket()
s.bind(("127.0.0.1", 0))
self.port = s.getsockname()[1]
s.close()
self.server = MCPServer(("127.0.0.1", self.port), MCPHandler)
self.server.config = ServerConfig(bottle_slug="dev", queue_dir=self.queue_dir)
self.thread = threading.Thread(
target=self.server.serve_forever, daemon=True,
)
self.thread.start()
def tearDown(self):
self.server.shutdown()
self.server.server_close()
self.thread.join(timeout=2)
self._tmp.cleanup()
def _post_jsonrpc(self, body: dict[str, object]) -> dict[str, object]:
conn = http.client.HTTPConnection("127.0.0.1", self.port, timeout=5)
try:
payload = json.dumps(body).encode("utf-8")
conn.request("POST", "/", body=payload,
headers={"Content-Type": "application/json",
"Content-Length": str(len(payload))})
resp = conn.getresponse()
data = resp.read()
return json.loads(data)
finally:
conn.close()
def test_tools_list_over_http(self):
result = self._post_jsonrpc(
{"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
)
self.assertEqual("2.0", result["jsonrpc"])
self.assertEqual(1, result["id"])
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
self.assertIn(_sv.TOOL_CRED_PROXY_BLOCK, names)
def test_unknown_method_returns_jsonrpc_error(self):
result = self._post_jsonrpc(
{"jsonrpc": "2.0", "id": 2, "method": "does/not/exist"},
)
self.assertEqual(ERR_METHOD_NOT_FOUND, result["error"]["code"]) # type: ignore[index]
def test_health_endpoint(self):
conn = http.client.HTTPConnection("127.0.0.1", self.port, timeout=5)
try:
conn.request("GET", "/health")
resp = conn.getresponse()
self.assertEqual(200, resp.status)
self.assertEqual(b"ok\n", resp.read())
finally:
conn.close()
if __name__ == "__main__":
unittest.main()