From c6080be4f48b7cd6da8d2c56a260db04df214124 Mon Sep 17 00:00:00 2001 From: claude Date: Sat, 6 Jun 2026 16:41:57 +0000 Subject: [PATCH] feat(supervise)!: remove egress-block MCP tool and runtime route-mutation Drops `egress-block` from the supervise sidecar, removes `_merge_single_route`, `add_route`, and `apply_routes_change` from egress_apply.py, and strips the proposal/approve/reject flow for egress from the supervise CLI. The list-egress-routes and capability-block tools are unaffected. Tests updated throughout. Closes #198 --- bot_bottle/backend/docker/egress_apply.py | 196 +--------------------- bot_bottle/cli/supervise.py | 19 +-- bot_bottle/supervise.py | 9 +- bot_bottle/supervise_server.py | 148 +--------------- tests/unit/test_egress_apply.py | 139 +-------------- tests/unit/test_supervise.py | 26 ++- tests/unit/test_supervise_cli.py | 170 ++----------------- tests/unit/test_supervise_server.py | 24 +-- 8 files changed, 63 insertions(+), 668 deletions(-) diff --git a/bot_bottle/backend/docker/egress_apply.py b/bot_bottle/backend/docker/egress_apply.py index 87b6d42..3e03de2 100644 --- a/bot_bottle/backend/docker/egress_apply.py +++ b/bot_bottle/backend/docker/egress_apply.py @@ -1,70 +1,20 @@ -"""Host-side helper to apply a routes.yaml change to a running -egress sidecar (PRD 0014 retargeted by PRD 0017 chunk 3, PRD 0053). +"""Host-side helper for egress sidecar inspection (issue #198). -Used by the supervise dashboard when the operator approves an -egress-block proposal. Fetches current routes.yaml, validates, -writes into the sidecar, then SIGHUPs to reload. +`_merge_single_route`, `add_route`, and `apply_routes_change` were +removed when the egress-block MCP tool was dropped. The remaining +helpers support runtime inspection and validation of the routes file +without modifying it at runtime. """ from __future__ import annotations -import json import subprocess -from pathlib import Path -from typing import cast from ...egress import EGRESS_ROUTES_IN_CONTAINER from ...egress_addon_core import load_routes -from ...yaml_subset import YamlSubsetError, parse_yaml_subset -from .bottle_state import egress_state_dir from .sidecar_bundle import sidecar_bundle_container_name -def _render_routes_payload(routes_list: list[dict[str, object]]) -> str: - """Render a list-of-dicts routes payload as YAML matching the - shape `egress_render_routes` produces.""" - if not routes_list: - return "routes: []\n" - lines: list[str] = ["routes:"] - for entry in routes_list: - host = str(entry.get("host", "")) - lines.append(f' - host: "{host}"') - auth_scheme = entry.get("auth_scheme") - token_env = entry.get("token_env") - if auth_scheme and token_env: - lines.append(f' auth_scheme: "{auth_scheme}"') - lines.append(f' token_env: "{token_env}"') - matches_obj = entry.get("matches") - if isinstance(matches_obj, list) and matches_obj: - lines.append(" matches:") - for match_entry in matches_obj: - me = cast(dict[str, object], match_entry) - first_key = True - if "paths" in me: - lines.append(" - paths:") - first_key = False - for pd in cast(list[dict[str, str]], me["paths"]): - if "type" in pd: - lines.append(f' - type: "{pd["type"]}"') - lines.append(f' value: "{pd["value"]}"') - else: - lines.append(f' - value: "{pd["value"]}"') - if "methods" in me: - methods_str = ", ".join( - f'"{m}"' for m in cast(list[str], me["methods"]) - ) - prefix = " - " if first_key else " " - lines.append(f'{prefix}methods: [{methods_str}]') - first_key = False - if first_key: - lines.append(" - {}") - return "\n".join(lines) + "\n" - - -def _egress_routes_host_path(slug: str) -> Path: - return egress_state_dir(slug) / "egress_routes.yaml" - - class EgressApplyError(RuntimeError): pass @@ -92,144 +42,8 @@ def validate_routes_content(content: str) -> None: ) from e -def apply_routes_change(slug: str, new_content: str) -> tuple[str, str]: - container = sidecar_bundle_container_name(slug) - before = fetch_current_routes(slug) - validate_routes_content(new_content) - - target = _egress_routes_host_path(slug) - target.parent.mkdir(parents=True, exist_ok=True) - target.write_text(new_content) - target.chmod(0o644) - sig = subprocess.run( - ["docker", "kill", "--signal", "HUP", container], - capture_output=True, text=True, check=False, - ) - if sig.returncode != 0: - raise EgressApplyError( - f"failed to SIGHUP {container}: " - f"{(sig.stderr or '').strip()}" - ) - - return before, new_content - - -def _merge_single_route( - current_yaml: str, new_route: dict[str, object], -) -> str: - """Merge a single proposed route into the current routes.yaml. - - - Host absent → append the route. - - Host present → union the match paths (proposed ∪ existing). - Auth is preserved from existing route. - """ - try: - cfg = parse_yaml_subset(current_yaml) - except YamlSubsetError as e: - raise EgressApplyError( - f"current routes.yaml is not valid YAML: {e}" - ) from e - routes = cfg.get("routes") - if not isinstance(routes, list): - raise EgressApplyError( - "current routes.yaml: 'routes' is not a list" - ) - routes_typed = cast(list[object], routes) - - new_host = str(new_route.get("host", "")).lower() - if not new_host: - raise EgressApplyError( - "proposed route is missing 'host'" - ) - - # Build proposed matches from the input - proposed_matches = new_route.get("matches") - if proposed_matches is None: - # Accept legacy path_allowlist from agent proposals and convert - proposed_paths = new_route.get("path_allowlist") - if isinstance(proposed_paths, list) and proposed_paths: - proposed_matches = [{"paths": [{"value": p} for p in proposed_paths]}] - - for entry in routes_typed: - if not isinstance(entry, dict): - continue - entry_typed = cast(dict[str, object], entry) - if str(entry_typed.get("host", "")).lower() == new_host: - # Merge matches: union path values from proposed into existing - if isinstance(proposed_matches, list) and proposed_matches: - existing_matches = entry_typed.get("matches") - if not isinstance(existing_matches, list): - existing_matches = [] - # Simple merge: collect all existing path values, add new ones - existing_paths: set[str] = set() - for me in existing_matches: - me_typed = cast(dict[str, object], me) if isinstance(me, dict) else {} - paths = me_typed.get("paths") - if isinstance(paths, list): - for p in paths: - p_typed = cast(dict[str, object], p) if isinstance(p, dict) else {} - val = p_typed.get("value") - if isinstance(val, str): - existing_paths.add(val) - new_paths: list[str] = [] - for me in proposed_matches: - me_typed = cast(dict[str, object], me) if isinstance(me, dict) else {} - paths = me_typed.get("paths") - if isinstance(paths, list): - for p in paths: - p_typed = cast(dict[str, object], p) if isinstance(p, dict) else {} - val = p_typed.get("value") - if isinstance(val, str) and val not in existing_paths: - new_paths.append(val) - existing_paths.add(val) - if new_paths: - existing_matches.append( - {"paths": [{"value": p} for p in new_paths]} - ) - entry_typed["matches"] = existing_matches - break - else: - entry_typed: dict[str, object] = {"host": new_route.get("host")} # type: ignore - if isinstance(proposed_matches, list) and proposed_matches: - entry_typed["matches"] = proposed_matches - auth = new_route.get("auth") - if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): # type: ignore - auth_typed = cast(dict[str, object], auth) - existing_slots = sorted({ - str(r_entry.get("token_env", "")) - for r_entry_obj in routes_typed - if isinstance(r_entry_obj, dict) - for r_entry in [cast(dict[str, object], r_entry_obj)] - if r_entry.get("token_env") - }) - next_idx = len(existing_slots) - entry_typed["auth_scheme"] = str(cast(object, auth_typed.get("scheme"))) - entry_typed["token_env"] = f"EGRESS_TOKEN_{next_idx}" - routes_typed.append(entry_typed) - - return _render_routes_payload(cast(list[dict[str, object]], routes_typed)) - - -def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]: - try: - proposed = json.loads(proposed_route_json) - except json.JSONDecodeError as e: - raise EgressApplyError( - f"proposed route is not valid JSON: {e}" - ) from e - if not isinstance(proposed, dict): - raise EgressApplyError( - "proposed route must be a JSON object" - ) - current = fetch_current_routes(slug) - merged = _merge_single_route(current, proposed) - return apply_routes_change(slug, merged) - - __all__ = [ "EgressApplyError", - "add_route", - "apply_routes_change", "fetch_current_routes", "validate_routes_content", ] diff --git a/bot_bottle/cli/supervise.py b/bot_bottle/cli/supervise.py index 3bd5975..20849c9 100644 --- a/bot_bottle/cli/supervise.py +++ b/bot_bottle/cli/supervise.py @@ -2,9 +2,8 @@ act on them (approve / modify / reject). Curses-based TUI; modify-then-approve shells out to $EDITOR. The -approval handlers wire to the per-tool remediation engines: -PRD 0014 (egress) writes routes.yaml + SIGHUPs egress; PRD 0016 -(capability) rebuilds the bottle Dockerfile. +approval handler wires to PRD 0016 (capability-block), which rebuilds +the bottle Dockerfile. The egress-block tool was removed in issue #198. """ from __future__ import annotations @@ -26,7 +25,6 @@ from ..backend.docker.capability_apply import ( CapabilityApplyError, apply_capability_change, ) -from ..backend.docker.egress_apply import EgressApplyError, add_route from ..log import Die, error, info from ..supervise import ( COMPONENT_FOR_TOOL, @@ -37,7 +35,6 @@ from ..supervise import ( STATUS_MODIFIED, STATUS_REJECTED, TOOL_CAPABILITY_BLOCK, - TOOL_EGRESS_BLOCK, archive_proposal, list_pending_proposals, render_diff, @@ -61,7 +58,7 @@ class QueuedProposal: # Errors any remediation engine may raise. Caught by the TUI key # handlers and surfaced in the status line so a failed apply keeps # the proposal pending rather than crashing curses. -ApplyError = (EgressApplyError, CapabilityApplyError) +ApplyError = (CapabilityApplyError,) def discover_pending() -> list[QueuedProposal]: @@ -82,9 +79,7 @@ def discover_pending() -> list[QueuedProposal]: def _approval_status(qp: QueuedProposal, verb: str) -> str: """Status-line text after a successful approval.""" base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]" - if qp.proposal.tool == TOOL_CAPABILITY_BLOCK: - return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}" - return base + return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}" def _detail_lines( @@ -132,11 +127,7 @@ def approve( file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file diff_before, diff_after = "", "" - if qp.proposal.tool == TOOL_EGRESS_BLOCK: - diff_before, diff_after = add_route( - qp.proposal.bottle_slug, file_to_apply, - ) - elif qp.proposal.tool == TOOL_CAPABILITY_BLOCK: + if qp.proposal.tool == TOOL_CAPABILITY_BLOCK: _meta = read_metadata(qp.proposal.bottle_slug) if _meta is not None and not _meta.compose_project: raise CapabilityApplyError( diff --git a/bot_bottle/supervise.py b/bot_bottle/supervise.py index b6a2806..68c1bf1 100644 --- a/bot_bottle/supervise.py +++ b/bot_bottle/supervise.py @@ -48,11 +48,9 @@ from pathlib import Path SUPERVISE_HOSTNAME = "supervise" SUPERVISE_PORT = 9100 -TOOL_EGRESS_BLOCK = "egress-block" TOOL_CAPABILITY_BLOCK = "capability-block" TOOL_LIST_EGRESS_ROUTES = "list-egress-routes" TOOLS: tuple[str, ...] = ( - TOOL_EGRESS_BLOCK, TOOL_CAPABILITY_BLOCK, TOOL_LIST_EGRESS_ROUTES, ) @@ -70,10 +68,8 @@ EGRESS_INTROSPECT_URL = "http://_egress.local/allowlist" # capability-block has no on-disk config the operator edits in place # (the Dockerfile is rebuilt, not patched), so it has no audit log # here — those changes are captured by git history + the rebuild -# record laid down in PRD 0016. -COMPONENT_FOR_TOOL: dict[str, str] = { - TOOL_EGRESS_BLOCK: "egress", -} +# record laid down in PRD 0016. egress-block was removed in issue #198. +COMPONENT_FOR_TOOL: dict[str, str] = {} STATUS_APPROVED = "approved" STATUS_MODIFIED = "modified" @@ -555,7 +551,6 @@ __all__ = [ "EGRESS_FORWARD_PROXY", "EGRESS_INTROSPECT_URL", "TOOL_CAPABILITY_BLOCK", - "TOOL_EGRESS_BLOCK", "TOOL_LIST_EGRESS_ROUTES", "archive_proposal", "audit_dir", diff --git a/bot_bottle/supervise_server.py b/bot_bottle/supervise_server.py index cf2fd10..f5fe67f 100644 --- a/bot_bottle/supervise_server.py +++ b/bot_bottle/supervise_server.py @@ -1,8 +1,10 @@ """Supervise sidecar HTTP server (PRD 0013). -Per-bottle MCP server exposing two tools — `egress-block`, -`capability-block` — that the agent calls to propose config changes -when stuck. Each tool call: +Per-bottle MCP server exposing tools the agent calls to propose config +changes when stuck. The egress-block tool was removed in issue #198; +the remaining tools are `capability-block` and `list-egress-routes`. + +Each queued tool call: 1. Validates the proposed file syntactically. 2. Writes a Proposal to /run/supervise/queue/ (bind-mounted from @@ -133,69 +135,6 @@ def jsonrpc_error(request_id: object, code: int, message: str) -> bytes: TOOL_DEFINITIONS: list[dict[str, object]] = [ - { - "name": _sv.TOOL_EGRESS_BLOCK, - "description": ( - "Call when egress refused your HTTPS request — host " - "without a matching route, or a request that did not match " - "the route's matches rules (typically a 403 from the " - "proxy). Propose a SINGLE route to add: the host you " - "need + (optionally) a path_allowlist of path prefixes + " - "(optionally) an auth block. The supervisor merges the " - "route into the live table at approval time — you do NOT " - "need to see or reproduce the existing routes. If the " - "host already has a route, the proposed paths are unioned " - "with the existing ones (host stays single-route). The " - "operator approves or rejects in the supervise TUI. On " - "approval the supervisor writes the merged routes.yaml " - "and SIGHUPs egress (no dropped connections)." - ), - "inputSchema": { - "type": "object", - "properties": { - "host": { - "type": "string", - "description": ( - "The hostname to allow (e.g. 'api.github.com'). " - "Case-insensitive on match." - ), - }, - "path_allowlist": { - "type": "array", - "items": {"type": "string"}, - "description": ( - "Optional URL path prefixes the route permits. " - "Each must start with '/'. Omit to allow all " - "paths under this host (bare-pass route). " - "Internally converted to matches entries." - ), - }, - "auth": { - "type": "object", - "description": ( - "Optional credential injection. {scheme, " - "token_ref}: scheme is 'Bearer' or 'token'; " - "token_ref names the host env var holding the " - "secret value. Omit to add a host without " - "credential injection. Ignored if the host " - "already has a route (operator decides auth " - "changes, not the agent)." - ), - "properties": { - "scheme": {"type": "string"}, - "token_ref": {"type": "string"}, - }, - "required": ["scheme", "token_ref"], - "additionalProperties": False, - }, - "justification": { - "type": "string", - "description": "Why this host needs to be allowed.", - }, - }, - "required": ["host", "justification"], - }, - }, { "name": _sv.TOOL_LIST_EGRESS_ROUTES, "description": ( @@ -254,11 +193,6 @@ PROPOSED_FILE_FIELD: dict[str, str] = { # --- Validation ------------------------------------------------------------ -# Auth schemes accepted on egress-block proposals — match the -# manifest-side EGRESS_AUTH_SCHEMES. -_AUTH_SCHEMES = ("Bearer", "token") - - def validate_proposed_file(tool: str, content: str) -> None: """Syntactic validation. The operator is the real gate; this just catches obvious paste-errors / wrong-tool selections before they @@ -273,70 +207,6 @@ def validate_proposed_file(tool: str, content: str) -> None: raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}") -def _validate_and_bundle_egress_route( - args: dict[str, object], -) -> str: - """Validate egress-block input fields and bundle them into - a JSON string that becomes the Proposal.proposed_file. Raises - _RpcError on bad input — the agent retries with a fixed shape.""" - tool = _sv.TOOL_EGRESS_BLOCK - host = args.get("host") - if not isinstance(host, str) or not host.strip(): - raise _RpcError( - ERR_INVALID_PARAMS, - f"{tool}: 'host' is required and must be a non-empty string", - ) - payload: dict[str, object] = {"host": host} - - path_allow_raw = args.get("path_allowlist") - if path_allow_raw is not None: - if not isinstance(path_allow_raw, list): - raise _RpcError( - ERR_INVALID_PARAMS, - f"{tool}: 'path_allowlist' must be an array of strings", - ) - prefixes: list[str] = [] - for i, p in enumerate(path_allow_raw): - if not isinstance(p, str): - raise _RpcError( - ERR_INVALID_PARAMS, - f"{tool}: path_allowlist[{i}] must be a string", - ) - if not p.startswith("/"): - raise _RpcError( - ERR_INVALID_PARAMS, - f"{tool}: path_allowlist[{i}] {p!r} must start with '/'", - ) - prefixes.append(p) - if prefixes: - payload["path_allowlist"] = prefixes - - auth_raw = args.get("auth") - if auth_raw is not None: - if not isinstance(auth_raw, dict): - raise _RpcError( - ERR_INVALID_PARAMS, - f"{tool}: 'auth' must be an object with 'scheme' and 'token_ref'", - ) - scheme = auth_raw.get("scheme") - token_ref = auth_raw.get("token_ref") - if not isinstance(scheme, str) or scheme not in _AUTH_SCHEMES: - raise _RpcError( - ERR_INVALID_PARAMS, - f"{tool}: auth.scheme must be one of " - f"{', '.join(_AUTH_SCHEMES)} (got {scheme!r})", - ) - if not isinstance(token_ref, str) or not token_ref: - raise _RpcError( - ERR_INVALID_PARAMS, - f"{tool}: auth.token_ref must be a non-empty string " - f"naming the host env var holding the token", - ) - payload["auth"] = {"scheme": scheme, "token_ref": token_ref} - - return json.dumps(payload, indent=2) + "\n" - - # --- MCP handlers ---------------------------------------------------------- @@ -422,13 +292,7 @@ def handle_tools_call( f"{name}: 'justification' is required and must be a non-empty string", ) - if name == _sv.TOOL_EGRESS_BLOCK: - # Structured input → JSON bundle on Proposal.proposed_file. - # The dashboard's apply step (egress_apply.add_route) - # parses this JSON, fetches the current routes, merges in - # the new one, and writes the merged file. - proposed_file = _validate_and_bundle_egress_route(args_raw) - elif name in PROPOSED_FILE_FIELD: + if name in PROPOSED_FILE_FIELD: file_field = PROPOSED_FILE_FIELD[name] proposed_file = args_raw.get(file_field) if not isinstance(proposed_file, str): diff --git a/tests/unit/test_egress_apply.py b/tests/unit/test_egress_apply.py index 71a77d0..36e515d 100644 --- a/tests/unit/test_egress_apply.py +++ b/tests/unit/test_egress_apply.py @@ -1,27 +1,19 @@ -"""Unit: validate_routes_content (PRD 0014 retargeted by PRD 0017 -chunk 3, PRD 0053). docker exec / cp / kill paths are covered by the -integration test.""" +"""Unit: validate_routes_content (issue #198: _merge_single_route and +add_route removed; docker exec / cp / kill paths are covered by the +integration test).""" import unittest from bot_bottle.backend.docker.egress_apply import ( EgressApplyError, - _merge_single_route, validate_routes_content, ) -from bot_bottle.yaml_subset import parse_yaml_subset _ROUTES_EMPTY = "routes: []\n" _ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n' -def _routes(parsed: str) -> list[dict]: # type: ignore - """Parse a YAML routes string and pull out the routes list, so - tests can assert on shape directly.""" - return parse_yaml_subset(parsed)["routes"] # type: ignore - - class TestValidateRoutesContent(unittest.TestCase): def test_accepts_minimal_route_table(self): validate_routes_content(_ROUTES_EMPTY) @@ -60,130 +52,5 @@ class TestValidateRoutesContent(unittest.TestCase): ) -class TestMergeSingleRoute(unittest.TestCase): - BASE = _ROUTES_ONE - - def test_appends_route_when_host_absent(self): - merged = _merge_single_route(self.BASE, {"host": "github.com"}) - hosts = [r["host"] for r in _routes(merged)] - self.assertEqual(["api.anthropic.com", "github.com"], hosts) - - def test_appends_matches(self): - merged = _merge_single_route( - self.BASE, - {"host": "github.com", "matches": [ - {"paths": [{"value": "/repos/x/"}]} - ]}, - ) - new_route = _routes(merged)[-1] - self.assertIn("matches", new_route) - - def test_appends_legacy_path_allowlist_as_matches(self): - merged = _merge_single_route( - self.BASE, - {"host": "github.com", "path_allowlist": ["/repos/x/"]}, - ) - new_route = _routes(merged)[-1] - self.assertIn("matches", new_route) - - def test_appends_auth_with_token_env_slot(self): - merged = _merge_single_route( - self.BASE, - { - "host": "api.github.com", - "auth": {"scheme": "Bearer", "token_ref": "GH"}, - }, - ) - new_route = _routes(merged)[-1] - self.assertEqual("Bearer", new_route["auth_scheme"]) - self.assertEqual("EGRESS_TOKEN_0", new_route["token_env"]) - - def test_auth_slot_increments_past_existing(self): - base = ( - 'routes:\n' - ' - host: "api.anthropic.com"\n' - ' auth_scheme: "Bearer"\n' - ' token_env: "EGRESS_TOKEN_0"\n' - ) - merged = _merge_single_route(base, { - "host": "api.github.com", - "auth": {"scheme": "Bearer", "token_ref": "GH"}, - }) - new_route = _routes(merged)[-1] - self.assertEqual("EGRESS_TOKEN_1", new_route["token_env"]) - - def test_existing_host_merges_match_paths_as_union(self): - base = ( - 'routes:\n' - ' - host: "github.com"\n' - ' matches:\n' - ' - paths:\n' - ' - value: "/a/"\n' - ) - merged = _merge_single_route(base, { - "host": "github.com", - "matches": [{"paths": [{"value": "/b/"}]}], - }) - routes = _routes(merged) - self.assertEqual(1, len(routes)) - all_paths: list[str] = [] - for me in routes[0].get("matches", []): - for p in me.get("paths", []): - all_paths.append(p["value"]) - self.assertIn("/a/", all_paths) - self.assertIn("/b/", all_paths) - - def test_existing_host_dedup_match_paths(self): - base = ( - 'routes:\n' - ' - host: "github.com"\n' - ' matches:\n' - ' - paths:\n' - ' - value: "/a/"\n' - ) - merged = _merge_single_route(base, { - "host": "github.com", - "matches": [{"paths": [{"value": "/a/"}, {"value": "/b/"}]}], - }) - all_paths: list[str] = [] - for me in _routes(merged)[0].get("matches", []): - for p in me.get("paths", []): - all_paths.append(p["value"]) - self.assertEqual(1, all_paths.count("/a/")) - self.assertIn("/b/", all_paths) - - def test_existing_host_preserves_existing_auth_ignores_proposed(self): - base = ( - 'routes:\n' - ' - host: "api.github.com"\n' - ' auth_scheme: "Bearer"\n' - ' token_env: "EGRESS_TOKEN_0"\n' - ) - merged = _merge_single_route(base, { - "host": "api.github.com", - "auth": {"scheme": "token", "token_ref": "DIFFERENT"}, - }) - route = _routes(merged)[0] - self.assertEqual("Bearer", route["auth_scheme"]) - self.assertEqual("EGRESS_TOKEN_0", route["token_env"]) - - def test_host_match_is_case_insensitive(self): - base = 'routes:\n - host: "GitHub.com"\n' - merged = _merge_single_route(base, { - "host": "github.com", - "matches": [{"paths": [{"value": "/x/"}]}], - }) - routes = _routes(merged) - self.assertEqual(1, len(routes)) - - def test_missing_host_raises(self): - with self.assertRaises(EgressApplyError): - _merge_single_route(self.BASE, {}) - - def test_invalid_current_yaml_raises(self): - with self.assertRaises(EgressApplyError): - _merge_single_route("routes:\n\tbad", {"host": "x.example"}) - - if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_supervise.py b/tests/unit/test_supervise.py index de92d73..c811d34 100644 --- a/tests/unit/test_supervise.py +++ b/tests/unit/test_supervise.py @@ -17,7 +17,6 @@ from bot_bottle.supervise import ( STATUS_MODIFIED, STATUS_REJECTED, TOOL_CAPABILITY_BLOCK, - TOOL_EGRESS_BLOCK, archive_proposal, audit_log_path, list_pending_proposals, @@ -37,16 +36,16 @@ FIXED_TS = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc) def _proposal( - tool: str = TOOL_EGRESS_BLOCK, - proposed: str = "{}", - justification: str = "need a route", + tool: str = TOOL_CAPABILITY_BLOCK, + proposed: str = "FROM python:3.13\n", + justification: str = "need a capability", ) -> Proposal: return Proposal.new( bottle_slug="dev", tool=tool, proposed_file=proposed, justification=justification, - current_file_hash=sha256_hex("{}"), + current_file_hash=sha256_hex(proposed), now=FIXED_TS, ) @@ -57,7 +56,7 @@ class TestProposalRoundtrip(unittest.TestCase): self.assertTrue(p.id) self.assertEqual("2026-05-25T12:00:00+00:00", p.arrival_timestamp) self.assertEqual("dev", p.bottle_slug) - self.assertEqual(TOOL_EGRESS_BLOCK, p.tool) + self.assertEqual(TOOL_CAPABILITY_BLOCK, p.tool) def test_to_from_dict_roundtrip(self): p = _proposal() @@ -142,14 +141,14 @@ class TestQueueIO(unittest.TestCase): def test_list_pending_sorted_by_arrival(self): # Fabricate two with explicit timestamps. a = Proposal.new( - bottle_slug="dev", tool=TOOL_EGRESS_BLOCK, - proposed_file="{}", justification="early", + bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK, + proposed_file="FROM python:3.13\n", justification="early", current_file_hash="x", now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc), ) b = Proposal.new( - bottle_slug="dev", tool=TOOL_EGRESS_BLOCK, - proposed_file="{}", justification="late", + bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK, + proposed_file="FROM python:3.13\n", justification="late", current_file_hash="x", now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc), ) @@ -318,16 +317,15 @@ class TestToolConstants(unittest.TestCase): def test_tools_tuple_matches_individual_constants(self): self.assertEqual( ( - TOOL_EGRESS_BLOCK, TOOL_CAPABILITY_BLOCK, supervise.TOOL_LIST_EGRESS_ROUTES, ), supervise.TOOLS, ) - def test_component_map_covers_egress_remediation_only(self): - self.assertIn(TOOL_EGRESS_BLOCK, supervise.COMPONENT_FOR_TOOL) - self.assertNotIn(TOOL_CAPABILITY_BLOCK, supervise.COMPONENT_FOR_TOOL) + def test_component_map_has_no_entries(self): + # egress-block removed in issue #198; capability-block never had one. + self.assertEqual({}, supervise.COMPONENT_FOR_TOOL) class _StubSupervise(supervise.Supervise): diff --git a/tests/unit/test_supervise_cli.py b/tests/unit/test_supervise_cli.py index 3fd8a09..d672a7f 100644 --- a/tests/unit/test_supervise_cli.py +++ b/tests/unit/test_supervise_cli.py @@ -1,12 +1,10 @@ -"""Unit: supervise headless paths (PRD 0013 phase 4, PRD 0014). +"""Unit: supervise headless paths (PRD 0013 phase 4, PRD 0016). The curses TUI itself isn't exercised here — these tests cover the -discovery + approve/reject + audit-write paths that the TUI's key -handlers call into. +discovery + approve/reject paths that the TUI's key handlers call into. -add_route is stubbed at the supervise CLI module level so the tests -don't need a running egress sidecar; the real docker exec/cp/SIGHUP -plumbing is covered by the integration test. +egress-block (add_route) was removed in issue #198; the TestEgressApplyWiring +class and all stubs for add_route have been dropped accordingly. """ import os @@ -17,7 +15,6 @@ from pathlib import Path from bot_bottle import supervise from bot_bottle.backend.docker.capability_apply import CapabilityApplyError -from bot_bottle.backend.docker.egress_apply import EgressApplyError from bot_bottle.cli import supervise as supervise_cli from bot_bottle.supervise import ( Proposal, @@ -25,7 +22,6 @@ from bot_bottle.supervise import ( STATUS_MODIFIED, STATUS_REJECTED, TOOL_CAPABILITY_BLOCK, - TOOL_EGRESS_BLOCK, read_audit_entries, read_response, sha256_hex, @@ -35,9 +31,8 @@ from bot_bottle.supervise import ( FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc) -def _proposal(slug: str = "dev", tool: str = TOOL_EGRESS_BLOCK) -> Proposal: +def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal: payloads = { - TOOL_EGRESS_BLOCK: '{"routes": []}\n', TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n", } payload = payloads.get(tool, "") @@ -88,14 +83,14 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase): def test_sorted_by_arrival_across_bottles(self): early = Proposal.new( - bottle_slug="api", tool=TOOL_EGRESS_BLOCK, - proposed_file="{}", justification="early", + bottle_slug="api", tool=TOOL_CAPABILITY_BLOCK, + proposed_file="FROM python:3.13\n", justification="early", current_file_hash="h", now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc), ) late = Proposal.new( - bottle_slug="dev", tool=TOOL_EGRESS_BLOCK, - proposed_file="{}", justification="late", + bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK, + proposed_file="FROM python:3.13\n", justification="late", current_file_hash="h", now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc), ) @@ -120,48 +115,38 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase): class TestApproveReject(_FakeHomeMixin, unittest.TestCase): def setUp(self): self._setup_fake_home() - self._original_add_route = supervise_cli.add_route self._original_apply_capability = supervise_cli.apply_capability_change - # Default stubs: succeed with deterministic before/after so the - # audit log shows a non-empty diff. - supervise_cli.add_route = lambda slug, content: ( # type: ignore - '{"routes": []}\n', '{"routes": [{"host": "x"}]}\n', - ) supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore "FROM old\n", content, ) def tearDown(self): - supervise_cli.add_route = self._original_add_route supervise_cli.apply_capability_change = self._original_apply_capability self._teardown_fake_home() - def _enqueue(self, tool: str = TOOL_EGRESS_BLOCK): + def _enqueue(self, tool: str = TOOL_CAPABILITY_BLOCK): p = _proposal(tool=tool) qdir = supervise.queue_dir_for_slug("dev") qdir.mkdir(parents=True, exist_ok=True) supervise.write_proposal(qdir, p) return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir) - def test_approve_writes_response_and_audit(self): + def test_approve_writes_response(self): qp = self._enqueue() supervise_cli.approve(qp) - resp = read_response(qp.queue_dir, qp.proposal.id) + # capability-block is archived on approve, so the response file + # moves to processed/ before the caller can read it. + resp = read_response(qp.queue_dir / "processed", qp.proposal.id) self.assertEqual(STATUS_APPROVED, resp.status) self.assertIsNone(resp.final_file) - entries = read_audit_entries("egress", "dev") - self.assertEqual(1, len(entries)) - self.assertEqual("approved", entries[0].operator_action) def test_approve_with_final_file_marks_modified(self): qp = self._enqueue() - supervise_cli.approve(qp, final_file='{"routes": [{"path": "/x/"}]}\n', notes="tweaked") - resp = read_response(qp.queue_dir, qp.proposal.id) + supervise_cli.approve(qp, final_file="FROM bookworm\n", notes="tweaked") + resp = read_response(qp.queue_dir / "processed", qp.proposal.id) self.assertEqual(STATUS_MODIFIED, resp.status) - self.assertEqual('{"routes": [{"path": "/x/"}]}\n', resp.final_file) + self.assertEqual("FROM bookworm\n", resp.final_file) self.assertEqual("tweaked", resp.notes) - entries = read_audit_entries("egress", "dev") - self.assertEqual("modified", entries[0].operator_action) def test_reject_writes_rejection(self): qp = self._enqueue() @@ -169,113 +154,13 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): resp = read_response(qp.queue_dir, qp.proposal.id) self.assertEqual(STATUS_REJECTED, resp.status) self.assertEqual("nope", resp.notes) - entries = read_audit_entries("egress", "dev") - self.assertEqual("rejected", entries[0].operator_action) - self.assertEqual("nope", entries[0].operator_notes) - def test_capability_block_skips_audit_log(self): + def test_no_audit_log_for_capability_block(self): qp = self._enqueue(tool=TOOL_CAPABILITY_BLOCK) supervise_cli.approve(qp) - # No audit log for capability-block (per PRD 0013 / 0016). self.assertEqual([], read_audit_entries("egress", "dev")) -class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase): - """PRD 0017 chunk 3: approve() on an egress-block proposal - must call add_route (single-route merge) with the right args - and surface its failures.""" - - def setUp(self): - self._setup_fake_home() - self._original_add_route = supervise_cli.add_route - - def tearDown(self): - supervise_cli.add_route = self._original_add_route - self._teardown_fake_home() - - def _enqueue_egress(self, proposed: str = '{"host": "x.example"}\n'): - p = Proposal.new( - bottle_slug="dev", tool=TOOL_EGRESS_BLOCK, - proposed_file=proposed, - justification="need a route", - current_file_hash=sha256_hex(proposed), - now=FIXED, - ) - qdir = supervise.queue_dir_for_slug("dev") - qdir.mkdir(parents=True, exist_ok=True) - supervise.write_proposal(qdir, p) - return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir) - - def test_egress_block_calls_add_route_with_proposed_json(self): - calls = [] - supervise_cli.add_route = lambda slug, content: ( # type: ignore - calls.append((slug, content)) or ("before", "after") - ) - qp = self._enqueue_egress( - proposed='{"host": "new.example", "path_allowlist": ["/x/"]}\n' - ) - supervise_cli.approve(qp) - self.assertEqual(1, len(calls)) - slug, content = calls[0] - self.assertEqual("dev", slug) - # The single-route JSON the agent proposed reaches add_route - # unchanged — add_route fetches current state + merges. - self.assertEqual( - '{"host": "new.example", "path_allowlist": ["/x/"]}\n', - content, - ) - - def test_modify_passes_final_file_to_add_route(self): - calls = [] - supervise_cli.add_route = lambda slug, content: ( # type: ignore - calls.append(content) or ("before", "after") - ) - qp = self._enqueue_egress() - supervise_cli.approve( - qp, - final_file='{"host": "edited.example"}\n', - notes="tweaked", - ) - self.assertEqual(['{"host": "edited.example"}\n'], calls) - - def test_apply_failure_blocks_response_and_audit(self): - supervise_cli.add_route = lambda slug, content: (_ for _ in ()).throw( # type: ignore - EgressApplyError("docker exec failed") - ) - qp = self._enqueue_egress() - with self.assertRaises(EgressApplyError): - supervise_cli.approve(qp) - # No response file (proposal stays pending). - self.assertEqual( - [qp.proposal.id], - [p.id for p in supervise.list_pending_proposals(qp.queue_dir)], - ) - # No audit entry. - self.assertEqual([], read_audit_entries("egress", "dev")) - - def test_real_diff_lands_in_audit(self): - supervise_cli.add_route = lambda slug, content: ( # type: ignore - '{"routes": []}\n', # before - '{"routes": [{"host": "new.example"}]}\n', # after - ) - qp = self._enqueue_egress(proposed='{"host": "new.example"}\n') - supervise_cli.approve(qp) - entries = read_audit_entries("egress", "dev") - self.assertEqual(1, len(entries)) - self.assertIn('+{"routes": [{"host": "new.example"}]}', entries[0].diff) - self.assertIn('-{"routes": []}', entries[0].diff) - - def test_reject_does_not_call_apply(self): - qp = self._enqueue_egress() - supervise_cli.reject(qp, reason="no thanks") - # Reject still writes a response + audit entry with empty diff. - resp = read_response(qp.queue_dir, qp.proposal.id) - self.assertEqual(STATUS_REJECTED, resp.status) - entries = read_audit_entries("egress", "dev") - self.assertEqual(1, len(entries)) - self.assertEqual("", entries[0].diff) - - class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase): """PRD 0016 Phase 3: approve() on a capability-block proposal calls apply_capability_change, archives the proposal afterward @@ -328,17 +213,12 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase): supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore qp = self._enqueue_capability() supervise_cli.approve(qp) - # capability-block has no audit log per PRD 0013 — its record - # lives in the per-bottle Dockerfile + transcript state. self.assertEqual([], read_audit_entries("egress", "dev")) def test_proposal_archived_after_apply(self): supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore qp = self._enqueue_capability() supervise_cli.approve(qp) - # Sidecar would normally archive after delivering the response, - # but it's gone by then. The supervise TUI archives so - # discover_pending stops surfacing the resolved proposal. self.assertEqual([], supervise.list_pending_proposals(qp.queue_dir)) processed = list((qp.queue_dir / "processed").glob("*.json")) self.assertEqual(2, len(processed)) @@ -346,20 +226,8 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase): class TestEditInEditor(unittest.TestCase): def test_runs_editor_returns_edited_content(self): - # Fake "editor" is /bin/sh -c 'cat < $1 ... EOF' original_editor = os.environ.get("EDITOR") try: - # Use a fake editor that overwrites the file with a known - # marker. EDITOR is split with shlex equivalence by - # subprocess.run when invoked as a list — keep it as a - # single program path that takes the file as argv[1]. - os.environ["EDITOR"] = ( - "/bin/sh -c 'printf %s \"edited\" > \"$0\"'" - ) - # subprocess.run with the str as the first list element - # would try to find a binary literally named "/bin/sh -c ..." - # — that won't work. Use shell mode trick: wrap in a script. - # Easier: build a tiny helper script. with tempfile.NamedTemporaryFile( mode="w", suffix=".sh", delete=False, prefix="fake-editor.", ) as script: @@ -381,7 +249,6 @@ class TestEditInEditor(unittest.TestCase): def test_returns_none_when_unchanged(self): original_editor = os.environ.get("EDITOR") try: - # No-op editor: touch the file (leaves it unchanged). with tempfile.NamedTemporaryFile( mode="w", suffix=".sh", delete=False, prefix="noop-editor.", ) as script: @@ -445,7 +312,6 @@ class TestCapabilityBlockSmolmachinesGuard(_FakeHomeMixin, unittest.TestCase): supervise_cli.approve(qp) # must not raise def test_no_metadata_falls_through_to_docker_path(self): - # No metadata at all → assume Docker (backward-compatible). qp = self._enqueue_capability("dev") supervise_cli.approve(qp) # must not raise diff --git a/tests/unit/test_supervise_server.py b/tests/unit/test_supervise_server.py index 8f63eb6..aed68d4 100644 --- a/tests/unit/test_supervise_server.py +++ b/tests/unit/test_supervise_server.py @@ -141,7 +141,6 @@ class TestHandleToolsList(unittest.TestCase): names = [t["name"] for t in result["tools"]] # type: ignore[index] self.assertEqual( sorted([ - _sv.TOOL_EGRESS_BLOCK, _sv.TOOL_CAPABILITY_BLOCK, _sv.TOOL_LIST_EGRESS_ROUTES, ]), @@ -206,10 +205,10 @@ class TestHandleToolsCall(unittest.TestCase): try: result = handle_tools_call( { - "name": _sv.TOOL_EGRESS_BLOCK, + "name": _sv.TOOL_CAPABILITY_BLOCK, "arguments": { - "host": "example.com", - "justification": "need a route", + "dockerfile": "FROM python:3.13\n", + "justification": "need git", }, }, self.config, @@ -250,8 +249,8 @@ class TestHandleToolsCall(unittest.TestCase): with self.assertRaises(_RpcError): handle_tools_call( { - "name": _sv.TOOL_EGRESS_BLOCK, - "arguments": {"host": "example.com"}, + "name": _sv.TOOL_CAPABILITY_BLOCK, + "arguments": {"dockerfile": "FROM python:3.13\n"}, }, self.config, ) @@ -261,9 +260,9 @@ class TestHandleToolsCall(unittest.TestCase): try: handle_tools_call( { - "name": _sv.TOOL_EGRESS_BLOCK, + "name": _sv.TOOL_CAPABILITY_BLOCK, "arguments": { - "host": "example.com", + "dockerfile": "FROM python:3.13\n", "justification": "x", }, }, @@ -285,10 +284,10 @@ class TestHandleToolsCall(unittest.TestCase): ) result = handle_tools_call( { - "name": _sv.TOOL_EGRESS_BLOCK, + "name": _sv.TOOL_CAPABILITY_BLOCK, "arguments": { - "host": "example.com", - "justification": "need a route", + "dockerfile": "FROM python:3.13\n", + "justification": "need a capability", }, }, config, @@ -412,7 +411,8 @@ class TestHttpEndToEnd(unittest.TestCase): self.assertEqual("2.0", result["jsonrpc"]) self.assertEqual(1, result["id"]) names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index] - self.assertIn(_sv.TOOL_EGRESS_BLOCK, names) + self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names) + self.assertNotIn("egress-block", names) def test_unknown_method_returns_jsonrpc_error(self): result = self._post_jsonrpc(