diff --git a/tests/integration/test_supervise_sidecar.py b/tests/integration/test_supervise_sidecar.py new file mode 100644 index 0000000..6e65f05 --- /dev/null +++ b/tests/integration/test_supervise_sidecar.py @@ -0,0 +1,240 @@ +"""Integration: drive `DockerSupervise.start` against the supervise +sidecar and round-trip an MCP tool call through the queue (PRD 0013). + +Topology mirrors production minimally: a per-bottle internal docker +network for the agent ↔ supervise leg, no egress network (supervise +doesn't make outbound calls). The "agent" is a curl container on the +internal net; the supervisor lives on the host (this test process) +and uses claude_bottle.cli.dashboard helpers to write Response files. + +Verifies: + 1. `tools/list` returns the three PRD 0013 tool names over real MCP + wire format. + 2. A `tools/call` from the in-container agent blocks until the host + writes a Response to the queue; once written, the agent receives + the approval payload. +""" + +from __future__ import annotations + +import json +import os +import shutil +import subprocess +import tempfile +import threading +import time +import unittest +from pathlib import Path + +from claude_bottle import supervise as _sv +from claude_bottle.backend.docker.network import ( + network_create_internal, + network_remove, +) +from claude_bottle.backend.docker.supervise import ( + DockerSupervise, + build_supervise_image, + supervise_container_name, +) +from claude_bottle.cli import dashboard +from claude_bottle.supervise import SupervisePlan, list_pending_proposals +from tests._docker import skip_unless_docker + + +CURL_IMAGE = "curlimages/curl:latest" + + +@skip_unless_docker() +class TestSuperviseSidecar(unittest.TestCase): + @classmethod + def setUpClass(cls): + r = subprocess.run( + ["docker", "pull", CURL_IMAGE], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + ) + if r.returncode != 0: + raise unittest.SkipTest(f"could not pull {CURL_IMAGE}") + build_supervise_image() + + def setUp(self): + self.slug = f"cb-test-sv-{os.getpid()}-{int(time.time())}" + self.sidecar_name = "" + self.internal_net = "" + self.work_dir = Path(tempfile.mkdtemp(prefix="supervise-int.")) + self.queue_dir = self.work_dir / "queue" + self.queue_dir.mkdir() + + def tearDown(self): + if self.sidecar_name: + subprocess.run( + ["docker", "rm", "-f", self.sidecar_name], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + ) + if self.internal_net: + network_remove(self.internal_net) + shutil.rmtree(self.work_dir, ignore_errors=True) + + def _bring_up_sidecar(self) -> None: + self.internal_net = network_create_internal(self.slug) + plan = SupervisePlan( + slug=self.slug, + queue_dir=self.queue_dir, + current_config_dir=self.work_dir / "current-config", + internal_network=self.internal_net, + ) + # current_config_dir isn't bind-mounted into the sidecar, only + # the queue dir is. Create it for symmetry with production. + plan.current_config_dir.mkdir() + self.sidecar_name = DockerSupervise().start(plan) + + # Block until the server is ready to answer (the container + # `docker start` returns immediately; python is still + # binding to the port). + deadline = time.monotonic() + 10.0 + while time.monotonic() < deadline: + rc = subprocess.run( + [ + "docker", "run", "--rm", + "--network", self.internal_net, + CURL_IMAGE, + "-fsS", "-o", "/dev/null", + "--max-time", "2", + f"http://{_sv.SUPERVISE_HOSTNAME}:{_sv.SUPERVISE_PORT}/health", + ], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + ).returncode + if rc == 0: + return + time.sleep(0.25) + raise AssertionError("supervise sidecar /health never came up") + + def _curl_jsonrpc(self, body: dict[str, object]) -> dict[str, object]: + """Invoke curl on the internal network to POST a JSON-RPC + request to the supervise sidecar and parse the response.""" + payload = json.dumps(body) + result = subprocess.run( + [ + "docker", "run", "--rm", + "--network", self.internal_net, + CURL_IMAGE, + "-sS", "--max-time", "30", + "-H", "Content-Type: application/json", + "-X", "POST", + "--data", payload, + f"http://{_sv.SUPERVISE_HOSTNAME}:{_sv.SUPERVISE_PORT}/", + ], + capture_output=True, + text=True, + check=False, + ) + if result.returncode != 0: + raise AssertionError( + f"curl to supervise failed: {result.stderr}\n" + f"stdout: {result.stdout}" + ) + return json.loads(result.stdout) + + def test_tools_list_over_mcp(self): + self._bring_up_sidecar() + result = self._curl_jsonrpc( + {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}, + ) + self.assertEqual(1, result["id"]) + names = {t["name"] for t in result["result"]["tools"]} + self.assertEqual( + { + _sv.TOOL_CRED_PROXY_BLOCK, + _sv.TOOL_PIPELOCK_BLOCK, + _sv.TOOL_CAPABILITY_BLOCK, + }, + names, + ) + + def test_tools_call_round_trips_through_queue(self): + """End-to-end: agent in the bottle calls cred-proxy-block; + the call blocks on the queue; the host approves via the + dashboard helpers; the agent receives the approval.""" + self._bring_up_sidecar() + + captured: dict[str, object] = {} + + def caller() -> None: + captured["response"] = self._curl_jsonrpc({ + "jsonrpc": "2.0", "id": 7, "method": "tools/call", + "params": { + "name": _sv.TOOL_CRED_PROXY_BLOCK, + "arguments": { + "routes": '{"routes": [{"path": "/x/"}]}', + "justification": "integration test", + }, + }, + }) + + t = threading.Thread(target=caller) + t.start() + try: + # Wait for the proposal to appear in the queue (the + # sidecar writes it before blocking on wait_for_response). + deadline = time.monotonic() + 10.0 + qp = None + while time.monotonic() < deadline: + pending = list_pending_proposals(self.queue_dir) + if pending: + qp = dashboard.QueuedProposal( + proposal=pending[0], queue_dir=self.queue_dir, + ) + break + time.sleep(0.1) + self.assertIsNotNone(qp, "proposal never appeared in queue") + assert qp is not None # type-narrowing + self.assertEqual( + _sv.TOOL_CRED_PROXY_BLOCK, qp.proposal.tool, + ) + self.assertEqual("integration test", qp.proposal.justification) + + # Approve via the dashboard helper (same path the TUI + # uses). For 0013 this writes a Response file + a no-op + # audit entry (no real config change). + dashboard.approve(qp, notes="lgtm from integration test") + finally: + t.join(timeout=20) + + response = captured.get("response") + self.assertIsNotNone(response, "curl thread never produced a response") + assert isinstance(response, dict) # type-narrowing + self.assertEqual(7, response["id"]) + result = response["result"] + assert isinstance(result, dict) + self.assertFalse(result.get("isError")) + text = result["content"][0]["text"] + self.assertIn("status: approved", text) + self.assertIn("notes: lgtm from integration test", text) + + def test_orphan_sidecar_name_collision_recovered(self): + """An orphan supervise sidecar from a previous run blocks + the next .start with a duplicate-name error. Documents the + observed behavior so a future change that adds auto-cleanup + can flip the assertion.""" + self._bring_up_sidecar() + self.assertEqual(supervise_container_name(self.slug), self.sidecar_name) + # Second .start should fail because the container name is + # taken. cleanup is handled by the orphan probe in prepare.py + # (tested separately in test_orphan_cleanup). + with self.assertRaises(SystemExit): + DockerSupervise().start(SupervisePlan( + slug=self.slug, + queue_dir=self.queue_dir, + current_config_dir=self.work_dir / "current-config", + internal_network=self.internal_net, + )) + + +if __name__ == "__main__": + unittest.main()