fix(supervise): provision MCP via claude mcp add #25

Merged
didericis merged 9 commits from supervise-mcp-add-via-cli into main 2026-05-25 08:31:17 -04:00
12 changed files with 588 additions and 157 deletions
+25 -2
View File
@@ -19,7 +19,7 @@ from typing import Callable, Generator
from ...log import die, info
from ...pipelock import pipelock_build_config, pipelock_render_yaml
from ...supervise import CURRENT_CONFIG_DIR_IN_AGENT
from ...supervise import CURRENT_CONFIG_DIR_IN_AGENT, SUPERVISE_HOSTNAME
from . import network as network_mod
from . import util as docker_mod
from .bottle import DockerBottle
@@ -185,6 +185,29 @@ def launch(
teardown()
def _agent_no_proxy(plan: DockerBottlePlan) -> str:
"""NO_PROXY value for the agent container. Standard loopback +
`supervise` when the supervise sidecar is enabled.
Supervise needs to bypass pipelock because the MCP tool-call
pattern is long-poll: claude-code opens an HTTPS-style request to
http://supervise:9100/, the sidecar holds it open until the
operator approves (potentially minutes), then returns the
response. Pipelock is a forward proxy with idle timeouts;
pipelock cuts the long-polled connection well before the operator
can act, and claude-code reports the tool as ✘ failed even
though /mcp shows ✔ connected.
The supervise sidecar is on the bottle's internal network with
the `supervise` network-alias, so the agent can dial it
directly via docker DNS. Body-scanning the supervise traffic
isn't critical — the operator reviews every proposal in the TUI."""
hosts = ["localhost", "127.0.0.1"]
if plan.supervise_plan is not None:
hosts.append(SUPERVISE_HOSTNAME)
return ",".join(hosts)
def _run_agent_container(plan: DockerBottlePlan, internal_network: str) -> str:
"""Build the `docker run` argv and execute it, handling name-
conflict races by incrementing the suffix (unless the name was
@@ -196,7 +219,7 @@ def _run_agent_container(plan: DockerBottlePlan, internal_network: str) -> str:
"--network", internal_network,
"-e", f"HTTPS_PROXY={proxy_url}",
"-e", f"HTTP_PROXY={proxy_url}",
"-e", "NO_PROXY=localhost,127.0.0.1",
"-e", f"NO_PROXY={_agent_no_proxy(plan)}",
# CA trust trio for the agent process. Docker propagates
# run-time env into `docker exec`, so `claude` sees these
# without per-exec threading. NODE_EXTRA_CA_CERTS points at
@@ -1,10 +1,15 @@
"""Supervise sidecar provisioning inside a running Docker bottle
(PRD 0013).
Writes ~/.claude/settings.json with an `mcpServers.supervise` entry
pointing at the per-bottle supervise sidecar so the in-bottle Claude
Code discovers the three stuck-recovery MCP tools (cred-proxy-block,
pipelock-block, capability-block) at startup.
Registers the per-bottle supervise sidecar as an HTTP MCP server in
the agent's claude-code config so the agent discovers the three
stuck-recovery MCP tools (cred-proxy-block, pipelock-block,
capability-block) at startup.
Uses `claude mcp add` rather than writing JSON directly. claude-code
owns the on-disk config format (`~/.claude.json` `mcpServers` shape,
field names, scope semantics) and changes it between versions; the
official command handles whatever the installed version expects.
No-op when bottle.supervise is False — bottles that haven't opted
into the supervise sidecar shouldn't get an MCP entry pointing at a
@@ -13,63 +18,48 @@ sidecar that isn't running.
from __future__ import annotations
import json
import os
import subprocess
from ....log import info
from ....log import info, warn
from ....supervise import SUPERVISE_HOSTNAME, SUPERVISE_PORT
from .. import util as docker_mod
from ..bottle_plan import DockerBottlePlan
_AGENT_HOME_DEFAULT = "/home/node"
_SETTINGS_REL_PATH = ".claude/settings.json"
_SUPERVISE_MCP_NAME = "supervise"
def render_settings() -> str:
"""The settings.json content the agent reads on startup. Stable
shape — only the URL is parameterized in case CLAUDE_BOTTLE_*
env overrides change the supervise hostname/port someday."""
cfg = {
"mcpServers": {
_SUPERVISE_MCP_NAME: {
"type": "http",
"url": f"http://{SUPERVISE_HOSTNAME}:{SUPERVISE_PORT}/",
},
},
}
return json.dumps(cfg, indent=2) + "\n"
def supervise_mcp_url() -> str:
return f"http://{SUPERVISE_HOSTNAME}:{SUPERVISE_PORT}/"
def provision_supervise(plan: DockerBottlePlan, target: str) -> None:
"""Drop ~/.claude/settings.json into the running agent container
when bottle.supervise is True. No-op otherwise."""
"""Run `claude mcp add` inside the agent container to register
the supervise sidecar in claude-code's user config. No-op when
bottle.supervise is False.
Failure is logged but not fatal: the bottle still works (you
just can't call supervise tools from the agent until the entry
is added manually). The operator sees the warning at launch."""
if plan.supervise_plan is None:
return
container_home = os.environ.get(
"CLAUDE_BOTTLE_CONTAINER_HOME", _AGENT_HOME_DEFAULT,
)
settings_in_container = f"{container_home}/{_SETTINGS_REL_PATH}"
settings_dir_in_container = settings_in_container.rsplit("/", 1)[0]
host_path = plan.stage_dir / "agent_claude_settings.json"
host_path.write_text(render_settings())
host_path.chmod(0o644)
info(f"writing {settings_in_container} (supervise MCP server entry)")
# The Dockerfile creates ~/.claude.json at the top of HOME but
# not the ~/.claude/ subdir, so make sure it exists before cp.
docker_mod.docker_exec_root(target, ["mkdir", "-p", settings_dir_in_container])
subprocess.run(
["docker", "cp", str(host_path), f"{target}:{settings_in_container}"],
stdout=subprocess.DEVNULL,
check=True,
)
docker_mod.docker_exec_root(target, ["chown", "-R", "node:node", settings_dir_in_container])
docker_mod.docker_exec_root(target, ["chmod", "644", settings_in_container])
url = supervise_mcp_url()
argv = [
"docker", "exec", "-u", "node", target,
"claude", "mcp", "add",
"--scope", "user",
"--transport", "http",
_SUPERVISE_MCP_NAME,
url,
]
info(f"registering supervise MCP server in agent claude config → {url}")
r = subprocess.run(argv, capture_output=True, text=True, check=False)
if r.returncode != 0:
warn(
f"`claude mcp add supervise` failed (exit {r.returncode}): "
f"{(r.stderr or r.stdout or '').strip()}. Inside the bottle, "
f"register manually with: "
f"claude mcp add --scope user --transport http supervise {url}"
)
__all__ = ["provision_supervise", "render_settings"]
__all__ = ["provision_supervise", "supervise_mcp_url"]
+174 -22
View File
@@ -17,6 +17,7 @@ import os
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
@@ -35,6 +36,8 @@ from ..backend.docker.pipelock_apply import (
PipelockApplyError,
apply_allowlist_change,
fetch_current_allowlist,
parse_allowlist_content,
render_allowlist_content,
)
from ..log import info
from ..supervise import (
@@ -167,7 +170,7 @@ def approve(
qp.proposal.bottle_slug, file_to_apply,
)
elif qp.proposal.tool == TOOL_PIPELOCK_BLOCK:
diff_before, diff_after = apply_allowlist_change(
diff_before, diff_after = _apply_pipelock_url(
qp.proposal.bottle_slug, file_to_apply,
)
elif qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
@@ -229,6 +232,41 @@ def operator_edit_routes(slug: str, new_content: str) -> tuple[str, str]:
return before, after
def _apply_pipelock_url(slug: str, failed_url: str) -> tuple[str, str]:
"""pipelock-block proposals carry a single failed URL, not a
full allowlist. Extract the host, merge into the running
allowlist, and hand the merged content to apply_allowlist_change.
The full URL (with path) is preserved on the proposal for the
operator's read; only the host ends up in pipelock's allowlist.
FOLLOW-UP — path-aware filtering. Pipelock 2.3.0's api_allowlist
is hostname-only (verified by inspecting the binary's strict
preset; the only "path" fields in pipelock's schema are about
local filesystem paths under sandbox / file_sentry / taint). So
approving pipelock-block opens the entire host, not the URL's
path. If/when per-path enforcement becomes load-bearing, the
follow-up is most likely adding an `auth_scheme: none` mode +
`path_allowlist` field to cred-proxy (which already does
path-prefix routing) and rewiring pipelock-block to propose
cred-proxy routes instead of pipelock hostnames. That's a
multi-touch change deserving its own PRD — out of scope for the
supervise-loop work that introduced this function. See PR
discussion on https://gitea.dideric.is/didericis/claude-bottle/pulls/25
for the design conversation."""
import urllib.parse
parsed = urllib.parse.urlsplit(failed_url.strip())
host = parsed.hostname or ""
if not host:
raise PipelockApplyError(
f"proposed failed_url has no extractable host: {failed_url!r}"
)
current = fetch_current_allowlist(slug)
hosts = parse_allowlist_content(current)
if host not in hosts:
hosts.append(host)
return apply_allowlist_change(slug, render_allowlist_content(hosts))
def operator_edit_allowlist(slug: str, new_content: str) -> tuple[str, str]:
"""Apply an operator-initiated pipelock allowlist change (no
agent proposal). Used by the `pipelock edit <bottle>` TUI verb
@@ -342,9 +380,57 @@ def _list_once() -> int:
return 0
_REFRESH_INTERVAL_MS = 1000
# How long a newly-arrived proposal stays highlighted (green) in the
# list. Long enough for the operator to notice in their peripheral
# vision, short enough to fade before the queue feels permanently
# noisy.
_NEW_PROPOSAL_HIGHLIGHT_SEC = 5.0
def _is_recent(
proposal_id: str,
first_seen: dict[str, float] | None,
now: float | None,
) -> bool:
"""True if `proposal_id` was first seen within the highlight
window. Both `first_seen` and `now` may be None (rendered as
not-recent) so the helper is safe in cold-start paths."""
if first_seen is None or now is None:
return False
started = first_seen.get(proposal_id)
if started is None:
return False
return (now - started) < _NEW_PROPOSAL_HIGHLIGHT_SEC
def _try_init_green() -> int:
"""Initialise a green color pair and return its attr, or 0 if the
terminal doesn't support color. Caller ORs the returned value
into addnstr's attr argument; OR 0 is a no-op."""
try:
curses.start_color()
curses.use_default_colors()
curses.init_pair(1, curses.COLOR_GREEN, -1)
return curses.color_pair(1)
except curses.error:
return 0
def _main_loop(stdscr: "curses._CursesWindow") -> None:
curses.curs_set(0)
stdscr.nodelay(False)
# Auto-refresh: getch() returns -1 after the timeout if no key
# was pressed, so the loop re-renders with any newly-arrived
# proposals every ~1s. Without this the screen only updates
# when the operator hits a key — a tool call landing while the
# operator is just watching wouldn't appear.
stdscr.timeout(_REFRESH_INTERVAL_MS)
green_attr = _try_init_green()
# Per-proposal first-seen timestamps drive the "new" highlight.
# We add entries as proposals show up and prune ones that are
# gone (approved / rejected / archived) so the dict stays small.
first_seen: dict[str, float] = {}
selected = 0
status_line = ""
while True:
@@ -352,14 +438,30 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
if selected >= len(pending):
selected = max(0, len(pending) - 1)
_render(stdscr, pending, selected, status_line)
status_line = ""
now = time.monotonic()
live_ids = {qp.proposal.id for qp in pending}
for proposal_id in live_ids:
first_seen.setdefault(proposal_id, now)
for stale_id in list(first_seen.keys() - live_ids):
del first_seen[stale_id]
_render(stdscr, pending, selected, status_line, first_seen, now, green_attr)
try:
key = stdscr.getch()
except KeyboardInterrupt:
return
if key == -1:
# Timeout fired — re-render with fresh queue. Status_line
# is left intact so messages from a prior keystroke stay
# readable until the operator actually does something else.
continue
# Real keystroke: clear any stale status before dispatching
# so the next render reflects what just happened.
status_line = ""
if key in (ord("q"), 27): # q or ESC
return
if key == ord("e"):
@@ -377,7 +479,7 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
elif key in (curses.KEY_UP, ord("k")):
selected = max(selected - 1, 0)
elif key in (curses.KEY_ENTER, 10, 13, ord("v")):
_detail_view(stdscr, qp)
_detail_view(stdscr, qp, green_attr=green_attr)
elif key == ord("a"):
try:
approve(qp)
@@ -408,6 +510,9 @@ def _render(
pending: list[QueuedProposal],
selected: int,
status_line: str,
first_seen: dict[str, float] | None = None,
now: float | None = None,
green_attr: int = 0,
) -> None:
stdscr.erase()
h, w = stdscr.getmaxyx()
@@ -435,6 +540,8 @@ def _render(
f"{p.justification[:60]}"
)
attr = curses.A_REVERSE if i == selected else curses.A_NORMAL
if _is_recent(p.id, first_seen, now):
attr |= green_attr
stdscr.addnstr(row, 0, line, w - 1, attr)
footer = (
@@ -448,16 +555,21 @@ def _render(
stdscr.refresh()
def _detail_view(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> None:
def _detail_view(
stdscr: "curses._CursesWindow",
qp: QueuedProposal,
*,
green_attr: int = 0,
) -> None:
"""Render the full proposal: header, justification, proposed file
contents. Scrollable. Press q to return."""
lines = _detail_lines(qp)
lines = _detail_lines(qp, green_attr=green_attr)
offset = 0
while True:
stdscr.erase()
h, w = stdscr.getmaxyx()
for i, line in enumerate(lines[offset:offset + h - 1]):
stdscr.addnstr(i, 0, line, w - 1)
for i, (text, attr) in enumerate(lines[offset:offset + h - 1]):
stdscr.addnstr(i, 0, text, w - 1, attr)
stdscr.addnstr(
h - 1, 0,
"[j/k] scroll [g/G] top/bottom [a] approve [m] modify [r] reject [q] back",
@@ -496,26 +608,66 @@ def _detail_view(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> None:
return
def _detail_lines(qp: QueuedProposal) -> list[str]:
def _detail_lines(
qp: QueuedProposal,
*,
green_attr: int = 0,
) -> list[tuple[str, int]]:
"""Return the detail-view body as (text, curses-attr) tuples.
Most lines are plain (attr=0); pipelock-block proposals append
a green "→ would allow host: ..." line so the operator sees at
a glance which hostname will land in pipelock's allowlist if
they hit approve. The URL itself is shown above for context."""
p = qp.proposal
out = [
f"bottle: {p.bottle_slug}",
f"tool: {p.tool}",
f"id: {p.id}",
f"arrived: {p.arrival_timestamp}",
f"queue: {qp.queue_dir}",
"",
"justification:",
out: list[tuple[str, int]] = [
(f"bottle: {p.bottle_slug}", 0),
(f"tool: {p.tool}", 0),
(f"id: {p.id}", 0),
(f"arrived: {p.arrival_timestamp}", 0),
(f"queue: {qp.queue_dir}", 0),
("", 0),
("justification:", 0),
]
out.extend(" " + line for line in p.justification.splitlines() or [""])
out.extend((" " + line, 0) for line in p.justification.splitlines() or [""])
out.extend([
"",
"proposed file:",
("", 0),
(_proposed_payload_label(p.tool) + ":", 0),
])
out.extend(p.proposed_file.splitlines() or [""])
out.extend((line, 0) for line in p.proposed_file.splitlines() or [""])
if p.tool == TOOL_PIPELOCK_BLOCK:
host = _failed_url_host(p.proposed_file)
if host:
# Show the literal line that will be appended to the
# bottle's pipelock allowlist on approve. Green so it
# reads as "what changes"; the URL above carries the
# path context (which pipelock can't enforce — see the
# follow-up note on _apply_pipelock_url).
out.append(("", 0))
out.append((host, green_attr))
return out
def _failed_url_host(url: str) -> str:
"""Best-effort hostname extraction from a pipelock-block proposal's
failed_url payload. Returns empty string on unparseable input —
callers handle empty as "nothing to highlight"."""
import urllib.parse
try:
return urllib.parse.urlsplit(url.strip()).hostname or ""
except ValueError:
return ""
def _proposed_payload_label(tool: str) -> str:
"""The detail-view section heading for the proposal's payload —
`proposed_file` is what the dataclass calls it, but for
pipelock-block the payload is a single URL not a file. Render
the label per tool so the operator's eye matches."""
if tool == TOOL_PIPELOCK_BLOCK:
return "failed URL"
return "proposed file"
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
"""Suspend curses, open $EDITOR on the proposed file, return the
edited content (or None if unchanged)."""
+12 -7
View File
@@ -18,6 +18,7 @@ from pathlib import Path
from typing import cast
from .cred_proxy import CRED_PROXY_HOSTNAME
from .supervise import SUPERVISE_HOSTNAME
from .manifest import Bottle
# Baked-in default allowlist for hosts Claude Code itself needs.
@@ -76,16 +77,18 @@ def pipelock_token_hosts(bottle: Bottle) -> list[str]:
def pipelock_effective_allowlist(bottle: Bottle) -> list[str]:
"""Deduplicated union of: baked-in defaults, bottle.egress.allowlist,
the cred-proxy upstream hosts derived from bottle.cred_proxy.routes,
and the cred-proxy sidecar's own hostname when any cred_proxy route
is declared. Sorted for stability. Git upstreams declared in
the cred-proxy sidecar's own hostname when any cred_proxy route is
declared, and the supervise sidecar's hostname when bottle.supervise
is enabled. Sorted for stability. Git upstreams declared in
`bottle.git` do NOT contribute here — git traffic flows through the
per-agent git-gate sidecar (PRD 0008), not pipelock.
The cred-proxy hostname is auto-added because the agent's
HTTP_PROXY points at pipelock, so a manifest-driven URL like
`http://cred-proxy:9099/anthropic/...` arrives at pipelock as a
request for hostname `cred-proxy`. Without this auto-allow,
pipelock would 403 the request before it reached the sidecar."""
The cred-proxy + supervise hostnames are auto-added because the
agent's HTTP_PROXY points at pipelock, so a manifest-driven URL
like `http://cred-proxy:9099/anthropic/...` or
`http://supervise:9100/` arrives at pipelock as a request for the
sidecar hostname. Without this auto-allow, pipelock would 403 the
request before it reached the sidecar."""
seen: dict[str, None] = {}
for h in DEFAULT_ALLOWLIST:
seen.setdefault(h, None)
@@ -96,6 +99,8 @@ def pipelock_effective_allowlist(bottle: Bottle) -> list[str]:
seen.setdefault(h, None)
if bottle.cred_proxy.routes:
seen.setdefault(CRED_PROXY_HOSTNAME, None)
if bottle.supervise:
seen.setdefault(SUPERVISE_HOSTNAME, None)
return sorted(seen.keys())
+46 -24
View File
@@ -36,6 +36,7 @@ import os
import socketserver
import sys
import typing
import urllib.parse
from dataclasses import dataclass
from pathlib import Path
@@ -160,27 +161,34 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"name": _sv.TOOL_PIPELOCK_BLOCK,
"description": (
"Call when pipelock refused your outbound request — host "
"not in the allowlist, protocol blocked, connection "
"refused at the egress layer. Read the current allowlist "
"from /etc/claude-bottle/current-config/allowlist, compose "
"a modified version, and pass the full new file plus a "
"justification. On approval the supervisor writes the new "
"allowlist and restarts pipelock (wired in PRD 0015; v1 "
"acknowledges only)."
"not in the allowlist, connection refused at the egress "
"layer. Pass the full URL you tried to hit (scheme + "
"host + path) plus a justification. The supervisor "
"extracts the hostname and merges it into the bottle's "
"current pipelock allowlist; the path is captured as "
"context for the operator to review (pipelock's allowlist "
"is hostname-only — it can't enforce path-level rules). "
"On approval the supervisor restarts pipelock with the "
"merged allowlist."
),
"inputSchema": {
"type": "object",
"properties": {
"allowlist": {
"failed_url": {
"type": "string",
"description": "Full proposed pipelock allowlist (one hostname per line).",
"description": (
"The full URL pipelock blocked, e.g. "
"https://api.github.com/repos/foo/bar. Scheme "
"and hostname are required; path is recorded "
"as operator context."
),
},
"justification": {
"type": "string",
"description": "Why the new host(s) should be allowed.",
"description": "Why the new host should be allowed.",
},
},
"required": ["allowlist", "justification"],
"required": ["failed_url", "justification"],
},
},
{
@@ -214,10 +222,20 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
]
# Map each tool to the input field that carries the proposed file.
# Map each tool to the input field that carries the agent's
# tool-specific payload (stored in Proposal.proposed_file as
# free-form text the apply path interprets per tool).
#
# cred-proxy-block: full proposed routes.json
# pipelock-block: the full failed URL (scheme + host + path) —
# supervisor extracts the host, merges into the
# bottle's current allowlist; the path is shown
# to the operator for context (pipelock doesn't
# do path-level matching).
# capability-block: full proposed Dockerfile
PROPOSED_FILE_FIELD: dict[str, str] = {
_sv.TOOL_CRED_PROXY_BLOCK: "routes",
_sv.TOOL_PIPELOCK_BLOCK: "allowlist",
_sv.TOOL_PIPELOCK_BLOCK: "failed_url",
_sv.TOOL_CAPABILITY_BLOCK: "dockerfile",
}
@@ -245,17 +263,21 @@ def validate_proposed_file(tool: str, content: str) -> None:
f"{tool}: proposed routes.json must be an object with a 'routes' array",
)
elif tool == _sv.TOOL_PIPELOCK_BLOCK:
for i, line in enumerate(content.splitlines()):
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
# Hostnames are conservative: letters/digits/dots/dashes only.
for ch in stripped:
if not (ch.isalnum() or ch in ".-_"):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: allowlist line {i + 1} has invalid character {ch!r}",
)
# `content` is the full failed URL. Require scheme + host so
# the supervisor can extract a hostname for the allowlist
# merge; the path is preserved for operator context.
parsed = urllib.parse.urlsplit(content.strip())
if parsed.scheme not in ("http", "https"):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: failed_url must start with http:// or https:// "
f"(got {content!r})",
)
if not parsed.hostname:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: failed_url is missing a hostname (got {content!r})",
)
elif tool == _sv.TOOL_CAPABILITY_BLOCK:
# Dockerfiles are too varied to validate syntactically beyond
# non-empty. The operator reads the diff in the TUI.
+42
View File
@@ -0,0 +1,42 @@
"""Unit: agent NO_PROXY value builder (PR #25 follow-up).
claude-code's HTTP MCP client must bypass pipelock for the supervise
sidecar — long-poll tool calls would hit pipelock's idle timeout
otherwise. This test pins the rule: localhost always; supervise iff
the supervise sidecar is in the plan."""
import unittest
from pathlib import Path
from claude_bottle.backend.docker.launch import _agent_no_proxy
class _FakePlan:
"""Just enough plan shape for the helper — no full DockerBottlePlan
construction needed."""
def __init__(self, supervise_plan):
self.supervise_plan = supervise_plan
class _SentinelSupervisePlan:
"""The helper only checks `supervise_plan is not None`; any object
is fine."""
class TestAgentNoProxy(unittest.TestCase):
def test_loopback_only_when_no_supervise(self):
self.assertEqual(
"localhost,127.0.0.1",
_agent_no_proxy(_FakePlan(supervise_plan=None)),
)
def test_supervise_appended_when_enabled(self):
self.assertEqual(
"localhost,127.0.0.1,supervise",
_agent_no_proxy(_FakePlan(supervise_plan=_SentinelSupervisePlan())),
)
if __name__ == "__main__":
unittest.main()
+64 -25
View File
@@ -38,11 +38,21 @@ FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_CRED_PROXY_BLOCK) -> Proposal:
# Per-tool payload shape: cred-proxy gets routes.json, pipelock
# gets a failed URL (PR #25 follow-up), capability gets a
# Dockerfile-ish blob. Match the production dispatch in
# PROPOSED_FILE_FIELD.
payloads = {
TOOL_CRED_PROXY_BLOCK: '{"routes": []}\n',
TOOL_PIPELOCK_BLOCK: "https://example.com/path",
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
}
payload = payloads.get(tool, "")
return Proposal.new(
bottle_slug=slug, tool=tool,
proposed_file='{"routes": []}\n',
proposed_file=payload,
justification=f"needed for {slug}",
current_file_hash=sha256_hex("{}"),
current_file_hash=sha256_hex(payload),
now=FIXED,
)
@@ -119,6 +129,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
self._setup_fake_home()
self._original_apply_routes = dashboard.apply_routes_change
self._original_apply_allowlist = dashboard.apply_allowlist_change
self._original_fetch_allowlist = dashboard.fetch_current_allowlist
self._original_apply_capability = dashboard.apply_capability_change
# Default stubs: succeed with deterministic before/after so the
# audit log shows a non-empty diff.
@@ -128,6 +139,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
dashboard.apply_allowlist_change = lambda slug, content: (
"old.example\n", content,
)
dashboard.fetch_current_allowlist = lambda slug: "old.example\n"
dashboard.apply_capability_change = lambda slug, content: (
"FROM old\n", content,
)
@@ -135,6 +147,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def tearDown(self):
dashboard.apply_routes_change = self._original_apply_routes
dashboard.apply_allowlist_change = self._original_apply_allowlist
dashboard.fetch_current_allowlist = self._original_fetch_allowlist
dashboard.apply_capability_change = self._original_apply_capability
self._teardown_fake_home()
@@ -281,23 +294,27 @@ class TestCredProxyApplyWiring(_FakeHomeMixin, unittest.TestCase):
class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
"""PRD 0015 Phase 2: approve() on a pipelock-block proposal
must call apply_allowlist_change and surface its failures."""
"""PRD 0015 Phase 2 + PR #25 follow-up: approve() on a
pipelock-block proposal carries the failed URL; the dashboard
extracts the host, merges it into the running allowlist, and
calls apply_allowlist_change with the merged content."""
def setUp(self):
self._setup_fake_home()
self._original = dashboard.apply_allowlist_change
self._original_apply = dashboard.apply_allowlist_change
self._original_fetch = dashboard.fetch_current_allowlist
def tearDown(self):
dashboard.apply_allowlist_change = self._original
dashboard.apply_allowlist_change = self._original_apply
dashboard.fetch_current_allowlist = self._original_fetch
self._teardown_fake_home()
def _enqueue_pipelock(self, proposed: str = "host.example\n"):
def _enqueue_pipelock(self, failed_url: str = "https://api.github.com/repos/foo/bar"):
p = Proposal.new(
bottle_slug="dev", tool=TOOL_PIPELOCK_BLOCK,
proposed_file=proposed,
justification="need new host",
current_file_hash=sha256_hex(proposed),
proposed_file=failed_url,
justification="need to read PR metadata",
current_file_hash=sha256_hex(failed_url),
now=FIXED,
)
qdir = supervise.queue_dir_for_slug("dev")
@@ -305,16 +322,41 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
supervise.write_proposal(qdir, p)
return dashboard.QueuedProposal(proposal=p, queue_dir=qdir)
def test_pipelock_block_calls_apply_with_proposed_file(self):
calls = []
def test_url_host_merged_into_current_allowlist(self):
dashboard.fetch_current_allowlist = lambda slug: "existing.example\n"
applied = []
dashboard.apply_allowlist_change = lambda slug, content: (
calls.append((slug, content)) or ("before", content)
applied.append((slug, content))
or ("existing.example\n", content)
)
qp = self._enqueue_pipelock("new.example\n")
qp = self._enqueue_pipelock("https://api.github.com/repos/foo/bar")
dashboard.approve(qp)
self.assertEqual([("dev", "new.example\n")], calls)
# apply_allowlist_change was called with the merged content:
# existing host + the URL's host (no path, since pipelock is
# hostname-only).
self.assertEqual(1, len(applied))
slug, content = applied[0]
self.assertEqual("dev", slug)
self.assertIn("existing.example", content)
self.assertIn("api.github.com", content)
self.assertNotIn("/repos/foo/bar", content) # path stripped
def test_host_already_in_allowlist_is_idempotent(self):
dashboard.fetch_current_allowlist = lambda slug: "api.github.com\n"
applied = []
dashboard.apply_allowlist_change = lambda slug, content: (
applied.append(content)
or ("api.github.com\n", content)
)
qp = self._enqueue_pipelock("https://api.github.com/some/path")
dashboard.approve(qp)
# Still applied, but the content is unchanged from current —
# before/after diff is empty.
self.assertEqual(1, len(applied))
self.assertEqual("api.github.com\n", applied[0])
def test_apply_failure_blocks_response_and_audit(self):
dashboard.fetch_current_allowlist = lambda slug: "existing.example\n"
dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw(
PipelockApplyError("docker exec failed")
)
@@ -327,16 +369,13 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
)
self.assertEqual([], read_audit_entries("pipelock", "dev"))
def test_real_diff_lands_in_audit(self):
dashboard.apply_allowlist_change = lambda slug, content: (
"old.example\n",
"old.example\nnew.example\n",
)
qp = self._enqueue_pipelock("old.example\nnew.example\n")
dashboard.approve(qp)
entries = read_audit_entries("pipelock", "dev")
self.assertEqual(1, len(entries))
self.assertIn("+new.example", entries[0].diff)
def test_url_without_host_raises(self):
dashboard.fetch_current_allowlist = lambda slug: ""
# supervise_server's validator would catch this; if a broken
# URL ever makes it through, the dashboard surfaces it too.
qp = self._enqueue_pipelock("https:///nohost")
with self.assertRaises(PipelockApplyError):
dashboard.approve(qp)
class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
+100
View File
@@ -0,0 +1,100 @@
"""Unit: dashboard's detail-view line builder.
_detail_lines returns (text, attr) tuples. Most are plain; for
pipelock-block proposals it appends a "→ would allow host: <host>"
line tagged with the green attr so the operator sees at a glance
which hostname will land in pipelock's allowlist on approval."""
import unittest
from claude_bottle import supervise
from claude_bottle.cli import dashboard
from claude_bottle.supervise import (
Proposal,
TOOL_CAPABILITY_BLOCK,
TOOL_CRED_PROXY_BLOCK,
TOOL_PIPELOCK_BLOCK,
sha256_hex,
)
def _qp(tool: str, payload: str) -> dashboard.QueuedProposal:
from datetime import datetime, timezone
from pathlib import Path
p = Proposal.new(
bottle_slug="dev",
tool=tool,
proposed_file=payload,
justification="needs",
current_file_hash=sha256_hex(payload),
now=datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc),
)
return dashboard.QueuedProposal(proposal=p, queue_dir=Path("/tmp/q"))
class TestPipelockHostHighlight(unittest.TestCase):
GREEN = 0xDEADBEEF # arbitrary sentinel; _detail_lines passes through
def test_appends_green_host_line_for_pipelock_block(self):
lines = dashboard._detail_lines(
_qp(TOOL_PIPELOCK_BLOCK, "https://api.github.com/repos/foo/bar"),
green_attr=self.GREEN,
)
# The host appears as its own green-tagged line — literal
# text of what gets appended to pipelock's allowlist on
# approve.
green_lines = [text for text, attr in lines if attr == self.GREEN]
self.assertEqual(["api.github.com"], green_lines)
def test_no_green_lines_for_cred_proxy_block(self):
lines = dashboard._detail_lines(
_qp(TOOL_CRED_PROXY_BLOCK, '{"routes": []}'),
green_attr=self.GREEN,
)
self.assertEqual([], [t for t, a in lines if a == self.GREEN])
def test_no_green_lines_for_capability_block(self):
lines = dashboard._detail_lines(
_qp(TOOL_CAPABILITY_BLOCK, "FROM python:3.13\n"),
green_attr=self.GREEN,
)
self.assertEqual([], [t for t, a in lines if a == self.GREEN])
def test_skips_host_line_when_url_unparseable(self):
# Shouldn't happen in production — supervise_server validates
# the URL before queuing — but if a malformed payload ever
# reaches the dashboard, don't render a misleading host line.
lines = dashboard._detail_lines(
_qp(TOOL_PIPELOCK_BLOCK, "garbage-not-a-url"),
green_attr=self.GREEN,
)
self.assertEqual([], [t for t, a in lines if a == self.GREEN])
def test_no_green_attr_passed_still_renders_host(self):
# Even without color support (green_attr=0), the host line
# is still present — it just won't be coloured.
lines = dashboard._detail_lines(
_qp(TOOL_PIPELOCK_BLOCK, "https://api.github.com/x"),
green_attr=0,
)
# Last non-empty line should be the host.
non_empty = [t for t, _ in lines if t]
self.assertEqual("api.github.com", non_empty[-1])
class TestFailedUrlHost(unittest.TestCase):
def test_extracts_hostname(self):
self.assertEqual(
"api.github.com",
dashboard._failed_url_host("https://api.github.com/repos/foo"),
)
def test_returns_empty_for_unparseable(self):
self.assertEqual("", dashboard._failed_url_host("not a url"))
def test_returns_empty_for_url_without_host(self):
self.assertEqual("", dashboard._failed_url_host("https:///nohost"))
if __name__ == "__main__":
unittest.main()
+39
View File
@@ -0,0 +1,39 @@
"""Unit: dashboard's new-proposal highlight window.
The curses rendering itself is exercised manually; this isolates
the pure decision `is the proposal still in its post-arrival
highlight window?`"""
import unittest
from claude_bottle.cli import dashboard
class TestIsRecent(unittest.TestCase):
def test_just_seen_is_recent(self):
self.assertTrue(dashboard._is_recent("p1", {"p1": 100.0}, now=100.5))
def test_seen_within_window(self):
# Default window is 5s.
self.assertTrue(
dashboard._is_recent("p1", {"p1": 100.0}, now=104.9),
)
def test_seen_past_window_is_not_recent(self):
self.assertFalse(
dashboard._is_recent("p1", {"p1": 100.0}, now=106.0),
)
def test_unknown_proposal_is_not_recent(self):
self.assertFalse(
dashboard._is_recent("p2", {"p1": 100.0}, now=100.5),
)
def test_none_args_safe_default(self):
self.assertFalse(dashboard._is_recent("p1", None, None))
self.assertFalse(dashboard._is_recent("p1", {"p1": 100.0}, None))
self.assertFalse(dashboard._is_recent("p1", None, 100.5))
if __name__ == "__main__":
unittest.main()
+15
View File
@@ -91,6 +91,21 @@ class TestAllowlistWithTokens(unittest.TestCase):
eff = pipelock_effective_allowlist(_bottle({}))
self.assertNotIn("cred-proxy", eff)
def test_supervise_hostname_auto_added_when_supervise_enabled(self):
# Same reasoning as cred-proxy: the agent's HTTP_PROXY points
# at pipelock, so http://supervise:9100/ (the MCP endpoint)
# arrives at pipelock as hostname `supervise`. Without this
# auto-allow, claude-code's MCP client gets a 403 and the
# supervise server shows up as "failed" in /mcp.
eff = pipelock_effective_allowlist(_bottle({"supervise": True}))
self.assertIn("supervise", eff)
def test_supervise_hostname_NOT_added_when_disabled(self):
eff = pipelock_effective_allowlist(_bottle({}))
self.assertNotIn("supervise", eff)
eff_explicit = pipelock_effective_allowlist(_bottle({"supervise": False}))
self.assertNotIn("supervise", eff_explicit)
class TestTlsPassthrough(unittest.TestCase):
def test_default_includes_api_anthropic(self):
+15 -23
View File
@@ -1,37 +1,29 @@
"""Unit: supervise MCP settings renderer (PRD 0013 follow-up).
"""Unit: supervise MCP provisioning (PRD 0013 follow-up).
The docker cp / chown side of provision_supervise is exercised by
the existing supervise integration test once the agent container is
brought up; here we cover the pure render path so a settings.json
shape regression would surface in unit-level CI."""
The real provisioning runs `claude mcp add` inside the agent
container — exercised by the existing supervise integration test
chain once the agent container is brought up. Here we just cover
the URL computation so a regression in SUPERVISE_HOSTNAME / PORT
plumbing surfaces in unit CI."""
import json
import unittest
from claude_bottle.backend.docker.provision.supervise import render_settings
from claude_bottle.backend.docker.provision.supervise import supervise_mcp_url
from claude_bottle.supervise import SUPERVISE_HOSTNAME, SUPERVISE_PORT
class TestRenderSettings(unittest.TestCase):
def test_output_is_valid_json(self):
json.loads(render_settings())
def test_has_mcp_servers_supervise_http_entry(self):
cfg = json.loads(render_settings())
servers = cfg["mcpServers"]
self.assertIn("supervise", servers)
sv = servers["supervise"]
self.assertEqual("http", sv["type"])
class TestSuperviseMcpUrl(unittest.TestCase):
def test_url_matches_sidecar_constants(self):
self.assertEqual(
f"http://{SUPERVISE_HOSTNAME}:{SUPERVISE_PORT}/",
sv["url"],
supervise_mcp_url(),
)
def test_only_supervise_server_is_emitted(self):
cfg = json.loads(render_settings())
# Keep the provisioner narrowly scoped — it owns just the
# supervise entry, no other tools/servers.
self.assertEqual({"supervise"}, set(cfg["mcpServers"].keys()))
def test_url_is_http_not_https(self):
# The agent dials the sidecar on the internal docker network;
# no TLS termination, no CA trust juggling. If this ever
# needs HTTPS, the sidecar's listener side has to change too.
self.assertTrue(supervise_mcp_url().startswith("http://"))
if __name__ == "__main__":
+18 -6
View File
@@ -61,15 +61,27 @@ class TestValidation(unittest.TestCase):
'{"routes": [{"path": "/x/", "upstream": "https://example.com"}]}',
)
def test_pipelock_block_accepts_clean_hostnames(self):
def test_pipelock_block_accepts_https_url(self):
validate_proposed_file(
_sv.TOOL_PIPELOCK_BLOCK,
"api.example.com\n# comment\nfoo.bar.baz\n",
"https://api.github.com/repos/foo/bar",
)
def test_pipelock_block_rejects_invalid_char(self):
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_PIPELOCK_BLOCK, "host with space.com\n")
def test_pipelock_block_accepts_http_url(self):
validate_proposed_file(
_sv.TOOL_PIPELOCK_BLOCK,
"http://internal.example/path/to/thing",
)
def test_pipelock_block_rejects_missing_scheme(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file(_sv.TOOL_PIPELOCK_BLOCK, "api.github.com/foo")
self.assertIn("http://", str(cm.exception.message))
def test_pipelock_block_rejects_missing_host(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file(_sv.TOOL_PIPELOCK_BLOCK, "https:///just-a-path")
self.assertIn("hostname", str(cm.exception.message))
def test_capability_block_accepts_anything_nonempty(self):
validate_proposed_file(
@@ -235,7 +247,7 @@ class TestHandleToolsCall(unittest.TestCase):
{
"name": _sv.TOOL_PIPELOCK_BLOCK,
"arguments": {
"allowlist": "example.com\n",
"failed_url": "https://example.com/path",
"justification": "needed for tests",
},
},