feat(supervise)!: remove egress-block MCP tool and runtime route-mutation
lint / lint (push) Successful in 1m23s
test / unit (pull_request) Successful in 31s
test / integration (pull_request) Successful in 44s

Drops `egress-block` from the supervise sidecar, removes
`_merge_single_route`, `add_route`, and `apply_routes_change` from
egress_apply.py, and strips the proposal/approve/reject flow for egress
from the supervise CLI. The list-egress-routes and capability-block tools
are unaffected. Tests updated throughout.

Closes #198
This commit is contained in:
2026-06-06 16:41:57 +00:00
parent 8c0a9c5bc6
commit c6080be4f4
8 changed files with 63 additions and 668 deletions
+5 -191
View File
@@ -1,70 +1,20 @@
"""Host-side helper to apply a routes.yaml change to a running
egress sidecar (PRD 0014 retargeted by PRD 0017 chunk 3, PRD 0053).
"""Host-side helper for egress sidecar inspection (issue #198).
Used by the supervise dashboard when the operator approves an
egress-block proposal. Fetches current routes.yaml, validates,
writes into the sidecar, then SIGHUPs to reload.
`_merge_single_route`, `add_route`, and `apply_routes_change` were
removed when the egress-block MCP tool was dropped. The remaining
helpers support runtime inspection and validation of the routes file
without modifying it at runtime.
"""
from __future__ import annotations
import json
import subprocess
from pathlib import Path
from typing import cast
from ...egress import EGRESS_ROUTES_IN_CONTAINER
from ...egress_addon_core import load_routes
from ...yaml_subset import YamlSubsetError, parse_yaml_subset
from .bottle_state import egress_state_dir
from .sidecar_bundle import sidecar_bundle_container_name
def _render_routes_payload(routes_list: list[dict[str, object]]) -> str:
"""Render a list-of-dicts routes payload as YAML matching the
shape `egress_render_routes` produces."""
if not routes_list:
return "routes: []\n"
lines: list[str] = ["routes:"]
for entry in routes_list:
host = str(entry.get("host", ""))
lines.append(f' - host: "{host}"')
auth_scheme = entry.get("auth_scheme")
token_env = entry.get("token_env")
if auth_scheme and token_env:
lines.append(f' auth_scheme: "{auth_scheme}"')
lines.append(f' token_env: "{token_env}"')
matches_obj = entry.get("matches")
if isinstance(matches_obj, list) and matches_obj:
lines.append(" matches:")
for match_entry in matches_obj:
me = cast(dict[str, object], match_entry)
first_key = True
if "paths" in me:
lines.append(" - paths:")
first_key = False
for pd in cast(list[dict[str, str]], me["paths"]):
if "type" in pd:
lines.append(f' - type: "{pd["type"]}"')
lines.append(f' value: "{pd["value"]}"')
else:
lines.append(f' - value: "{pd["value"]}"')
if "methods" in me:
methods_str = ", ".join(
f'"{m}"' for m in cast(list[str], me["methods"])
)
prefix = " - " if first_key else " "
lines.append(f'{prefix}methods: [{methods_str}]')
first_key = False
if first_key:
lines.append(" - {}")
return "\n".join(lines) + "\n"
def _egress_routes_host_path(slug: str) -> Path:
return egress_state_dir(slug) / "egress_routes.yaml"
class EgressApplyError(RuntimeError):
pass
@@ -92,144 +42,8 @@ def validate_routes_content(content: str) -> None:
) from e
def apply_routes_change(slug: str, new_content: str) -> tuple[str, str]:
container = sidecar_bundle_container_name(slug)
before = fetch_current_routes(slug)
validate_routes_content(new_content)
target = _egress_routes_host_path(slug)
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(new_content)
target.chmod(0o644)
sig = subprocess.run(
["docker", "kill", "--signal", "HUP", container],
capture_output=True, text=True, check=False,
)
if sig.returncode != 0:
raise EgressApplyError(
f"failed to SIGHUP {container}: "
f"{(sig.stderr or '').strip()}"
)
return before, new_content
def _merge_single_route(
current_yaml: str, new_route: dict[str, object],
) -> str:
"""Merge a single proposed route into the current routes.yaml.
- Host absent → append the route.
- Host present → union the match paths (proposed existing).
Auth is preserved from existing route.
"""
try:
cfg = parse_yaml_subset(current_yaml)
except YamlSubsetError as e:
raise EgressApplyError(
f"current routes.yaml is not valid YAML: {e}"
) from e
routes = cfg.get("routes")
if not isinstance(routes, list):
raise EgressApplyError(
"current routes.yaml: 'routes' is not a list"
)
routes_typed = cast(list[object], routes)
new_host = str(new_route.get("host", "")).lower()
if not new_host:
raise EgressApplyError(
"proposed route is missing 'host'"
)
# Build proposed matches from the input
proposed_matches = new_route.get("matches")
if proposed_matches is None:
# Accept legacy path_allowlist from agent proposals and convert
proposed_paths = new_route.get("path_allowlist")
if isinstance(proposed_paths, list) and proposed_paths:
proposed_matches = [{"paths": [{"value": p} for p in proposed_paths]}]
for entry in routes_typed:
if not isinstance(entry, dict):
continue
entry_typed = cast(dict[str, object], entry)
if str(entry_typed.get("host", "")).lower() == new_host:
# Merge matches: union path values from proposed into existing
if isinstance(proposed_matches, list) and proposed_matches:
existing_matches = entry_typed.get("matches")
if not isinstance(existing_matches, list):
existing_matches = []
# Simple merge: collect all existing path values, add new ones
existing_paths: set[str] = set()
for me in existing_matches:
me_typed = cast(dict[str, object], me) if isinstance(me, dict) else {}
paths = me_typed.get("paths")
if isinstance(paths, list):
for p in paths:
p_typed = cast(dict[str, object], p) if isinstance(p, dict) else {}
val = p_typed.get("value")
if isinstance(val, str):
existing_paths.add(val)
new_paths: list[str] = []
for me in proposed_matches:
me_typed = cast(dict[str, object], me) if isinstance(me, dict) else {}
paths = me_typed.get("paths")
if isinstance(paths, list):
for p in paths:
p_typed = cast(dict[str, object], p) if isinstance(p, dict) else {}
val = p_typed.get("value")
if isinstance(val, str) and val not in existing_paths:
new_paths.append(val)
existing_paths.add(val)
if new_paths:
existing_matches.append(
{"paths": [{"value": p} for p in new_paths]}
)
entry_typed["matches"] = existing_matches
break
else:
entry_typed: dict[str, object] = {"host": new_route.get("host")} # type: ignore
if isinstance(proposed_matches, list) and proposed_matches:
entry_typed["matches"] = proposed_matches
auth = new_route.get("auth")
if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): # type: ignore
auth_typed = cast(dict[str, object], auth)
existing_slots = sorted({
str(r_entry.get("token_env", ""))
for r_entry_obj in routes_typed
if isinstance(r_entry_obj, dict)
for r_entry in [cast(dict[str, object], r_entry_obj)]
if r_entry.get("token_env")
})
next_idx = len(existing_slots)
entry_typed["auth_scheme"] = str(cast(object, auth_typed.get("scheme")))
entry_typed["token_env"] = f"EGRESS_TOKEN_{next_idx}"
routes_typed.append(entry_typed)
return _render_routes_payload(cast(list[dict[str, object]], routes_typed))
def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]:
try:
proposed = json.loads(proposed_route_json)
except json.JSONDecodeError as e:
raise EgressApplyError(
f"proposed route is not valid JSON: {e}"
) from e
if not isinstance(proposed, dict):
raise EgressApplyError(
"proposed route must be a JSON object"
)
current = fetch_current_routes(slug)
merged = _merge_single_route(current, proposed)
return apply_routes_change(slug, merged)
__all__ = [
"EgressApplyError",
"add_route",
"apply_routes_change",
"fetch_current_routes",
"validate_routes_content",
]
+5 -14
View File
@@ -2,9 +2,8 @@
act on them (approve / modify / reject).
Curses-based TUI; modify-then-approve shells out to $EDITOR. The
approval handlers wire to the per-tool remediation engines:
PRD 0014 (egress) writes routes.yaml + SIGHUPs egress; PRD 0016
(capability) rebuilds the bottle Dockerfile.
approval handler wires to PRD 0016 (capability-block), which rebuilds
the bottle Dockerfile. The egress-block tool was removed in issue #198.
"""
from __future__ import annotations
@@ -26,7 +25,6 @@ from ..backend.docker.capability_apply import (
CapabilityApplyError,
apply_capability_change,
)
from ..backend.docker.egress_apply import EgressApplyError, add_route
from ..log import Die, error, info
from ..supervise import (
COMPONENT_FOR_TOOL,
@@ -37,7 +35,6 @@ from ..supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
archive_proposal,
list_pending_proposals,
render_diff,
@@ -61,7 +58,7 @@ class QueuedProposal:
# Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses.
ApplyError = (EgressApplyError, CapabilityApplyError)
ApplyError = (CapabilityApplyError,)
def discover_pending() -> list[QueuedProposal]:
@@ -82,9 +79,7 @@ def discover_pending() -> list[QueuedProposal]:
def _approval_status(qp: QueuedProposal, verb: str) -> str:
"""Status-line text after a successful approval."""
base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
return base
return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
def _detail_lines(
@@ -132,11 +127,7 @@ def approve(
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
diff_before, diff_after = "", ""
if qp.proposal.tool == TOOL_EGRESS_BLOCK:
diff_before, diff_after = add_route(
qp.proposal.bottle_slug, file_to_apply,
)
elif qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
_meta = read_metadata(qp.proposal.bottle_slug)
if _meta is not None and not _meta.compose_project:
raise CapabilityApplyError(
+2 -7
View File
@@ -48,11 +48,9 @@ from pathlib import Path
SUPERVISE_HOSTNAME = "supervise"
SUPERVISE_PORT = 9100
TOOL_EGRESS_BLOCK = "egress-block"
TOOL_CAPABILITY_BLOCK = "capability-block"
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
TOOLS: tuple[str, ...] = (
TOOL_EGRESS_BLOCK,
TOOL_CAPABILITY_BLOCK,
TOOL_LIST_EGRESS_ROUTES,
)
@@ -70,10 +68,8 @@ EGRESS_INTROSPECT_URL = "http://_egress.local/allowlist"
# capability-block has no on-disk config the operator edits in place
# (the Dockerfile is rebuilt, not patched), so it has no audit log
# here — those changes are captured by git history + the rebuild
# record laid down in PRD 0016.
COMPONENT_FOR_TOOL: dict[str, str] = {
TOOL_EGRESS_BLOCK: "egress",
}
# record laid down in PRD 0016. egress-block was removed in issue #198.
COMPONENT_FOR_TOOL: dict[str, str] = {}
STATUS_APPROVED = "approved"
STATUS_MODIFIED = "modified"
@@ -555,7 +551,6 @@ __all__ = [
"EGRESS_FORWARD_PROXY",
"EGRESS_INTROSPECT_URL",
"TOOL_CAPABILITY_BLOCK",
"TOOL_EGRESS_BLOCK",
"TOOL_LIST_EGRESS_ROUTES",
"archive_proposal",
"audit_dir",
+6 -142
View File
@@ -1,8 +1,10 @@
"""Supervise sidecar HTTP server (PRD 0013).
Per-bottle MCP server exposing two tools — `egress-block`,
`capability-block` — that the agent calls to propose config changes
when stuck. Each tool call:
Per-bottle MCP server exposing tools the agent calls to propose config
changes when stuck. The egress-block tool was removed in issue #198;
the remaining tools are `capability-block` and `list-egress-routes`.
Each queued tool call:
1. Validates the proposed file syntactically.
2. Writes a Proposal to /run/supervise/queue/ (bind-mounted from
@@ -133,69 +135,6 @@ def jsonrpc_error(request_id: object, code: int, message: str) -> bytes:
TOOL_DEFINITIONS: list[dict[str, object]] = [
{
"name": _sv.TOOL_EGRESS_BLOCK,
"description": (
"Call when egress refused your HTTPS request — host "
"without a matching route, or a request that did not match "
"the route's matches rules (typically a 403 from the "
"proxy). Propose a SINGLE route to add: the host you "
"need + (optionally) a path_allowlist of path prefixes + "
"(optionally) an auth block. The supervisor merges the "
"route into the live table at approval time — you do NOT "
"need to see or reproduce the existing routes. If the "
"host already has a route, the proposed paths are unioned "
"with the existing ones (host stays single-route). The "
"operator approves or rejects in the supervise TUI. On "
"approval the supervisor writes the merged routes.yaml "
"and SIGHUPs egress (no dropped connections)."
),
"inputSchema": {
"type": "object",
"properties": {
"host": {
"type": "string",
"description": (
"The hostname to allow (e.g. 'api.github.com'). "
"Case-insensitive on match."
),
},
"path_allowlist": {
"type": "array",
"items": {"type": "string"},
"description": (
"Optional URL path prefixes the route permits. "
"Each must start with '/'. Omit to allow all "
"paths under this host (bare-pass route). "
"Internally converted to matches entries."
),
},
"auth": {
"type": "object",
"description": (
"Optional credential injection. {scheme, "
"token_ref}: scheme is 'Bearer' or 'token'; "
"token_ref names the host env var holding the "
"secret value. Omit to add a host without "
"credential injection. Ignored if the host "
"already has a route (operator decides auth "
"changes, not the agent)."
),
"properties": {
"scheme": {"type": "string"},
"token_ref": {"type": "string"},
},
"required": ["scheme", "token_ref"],
"additionalProperties": False,
},
"justification": {
"type": "string",
"description": "Why this host needs to be allowed.",
},
},
"required": ["host", "justification"],
},
},
{
"name": _sv.TOOL_LIST_EGRESS_ROUTES,
"description": (
@@ -254,11 +193,6 @@ PROPOSED_FILE_FIELD: dict[str, str] = {
# --- Validation ------------------------------------------------------------
# Auth schemes accepted on egress-block proposals — match the
# manifest-side EGRESS_AUTH_SCHEMES.
_AUTH_SCHEMES = ("Bearer", "token")
def validate_proposed_file(tool: str, content: str) -> None:
"""Syntactic validation. The operator is the real gate; this just
catches obvious paste-errors / wrong-tool selections before they
@@ -273,70 +207,6 @@ def validate_proposed_file(tool: str, content: str) -> None:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
def _validate_and_bundle_egress_route(
args: dict[str, object],
) -> str:
"""Validate egress-block input fields and bundle them into
a JSON string that becomes the Proposal.proposed_file. Raises
_RpcError on bad input — the agent retries with a fixed shape."""
tool = _sv.TOOL_EGRESS_BLOCK
host = args.get("host")
if not isinstance(host, str) or not host.strip():
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: 'host' is required and must be a non-empty string",
)
payload: dict[str, object] = {"host": host}
path_allow_raw = args.get("path_allowlist")
if path_allow_raw is not None:
if not isinstance(path_allow_raw, list):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: 'path_allowlist' must be an array of strings",
)
prefixes: list[str] = []
for i, p in enumerate(path_allow_raw):
if not isinstance(p, str):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: path_allowlist[{i}] must be a string",
)
if not p.startswith("/"):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: path_allowlist[{i}] {p!r} must start with '/'",
)
prefixes.append(p)
if prefixes:
payload["path_allowlist"] = prefixes
auth_raw = args.get("auth")
if auth_raw is not None:
if not isinstance(auth_raw, dict):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: 'auth' must be an object with 'scheme' and 'token_ref'",
)
scheme = auth_raw.get("scheme")
token_ref = auth_raw.get("token_ref")
if not isinstance(scheme, str) or scheme not in _AUTH_SCHEMES:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: auth.scheme must be one of "
f"{', '.join(_AUTH_SCHEMES)} (got {scheme!r})",
)
if not isinstance(token_ref, str) or not token_ref:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: auth.token_ref must be a non-empty string "
f"naming the host env var holding the token",
)
payload["auth"] = {"scheme": scheme, "token_ref": token_ref}
return json.dumps(payload, indent=2) + "\n"
# --- MCP handlers ----------------------------------------------------------
@@ -422,13 +292,7 @@ def handle_tools_call(
f"{name}: 'justification' is required and must be a non-empty string",
)
if name == _sv.TOOL_EGRESS_BLOCK:
# Structured input → JSON bundle on Proposal.proposed_file.
# The dashboard's apply step (egress_apply.add_route)
# parses this JSON, fetches the current routes, merges in
# the new one, and writes the merged file.
proposed_file = _validate_and_bundle_egress_route(args_raw)
elif name in PROPOSED_FILE_FIELD:
if name in PROPOSED_FILE_FIELD:
file_field = PROPOSED_FILE_FIELD[name]
proposed_file = args_raw.get(file_field)
if not isinstance(proposed_file, str):
+3 -136
View File
@@ -1,27 +1,19 @@
"""Unit: validate_routes_content (PRD 0014 retargeted by PRD 0017
chunk 3, PRD 0053). docker exec / cp / kill paths are covered by the
integration test."""
"""Unit: validate_routes_content (issue #198: _merge_single_route and
add_route removed; docker exec / cp / kill paths are covered by the
integration test)."""
import unittest
from bot_bottle.backend.docker.egress_apply import (
EgressApplyError,
_merge_single_route,
validate_routes_content,
)
from bot_bottle.yaml_subset import parse_yaml_subset
_ROUTES_EMPTY = "routes: []\n"
_ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n'
def _routes(parsed: str) -> list[dict]: # type: ignore
"""Parse a YAML routes string and pull out the routes list, so
tests can assert on shape directly."""
return parse_yaml_subset(parsed)["routes"] # type: ignore
class TestValidateRoutesContent(unittest.TestCase):
def test_accepts_minimal_route_table(self):
validate_routes_content(_ROUTES_EMPTY)
@@ -60,130 +52,5 @@ class TestValidateRoutesContent(unittest.TestCase):
)
class TestMergeSingleRoute(unittest.TestCase):
BASE = _ROUTES_ONE
def test_appends_route_when_host_absent(self):
merged = _merge_single_route(self.BASE, {"host": "github.com"})
hosts = [r["host"] for r in _routes(merged)]
self.assertEqual(["api.anthropic.com", "github.com"], hosts)
def test_appends_matches(self):
merged = _merge_single_route(
self.BASE,
{"host": "github.com", "matches": [
{"paths": [{"value": "/repos/x/"}]}
]},
)
new_route = _routes(merged)[-1]
self.assertIn("matches", new_route)
def test_appends_legacy_path_allowlist_as_matches(self):
merged = _merge_single_route(
self.BASE,
{"host": "github.com", "path_allowlist": ["/repos/x/"]},
)
new_route = _routes(merged)[-1]
self.assertIn("matches", new_route)
def test_appends_auth_with_token_env_slot(self):
merged = _merge_single_route(
self.BASE,
{
"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH"},
},
)
new_route = _routes(merged)[-1]
self.assertEqual("Bearer", new_route["auth_scheme"])
self.assertEqual("EGRESS_TOKEN_0", new_route["token_env"])
def test_auth_slot_increments_past_existing(self):
base = (
'routes:\n'
' - host: "api.anthropic.com"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
)
merged = _merge_single_route(base, {
"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH"},
})
new_route = _routes(merged)[-1]
self.assertEqual("EGRESS_TOKEN_1", new_route["token_env"])
def test_existing_host_merges_match_paths_as_union(self):
base = (
'routes:\n'
' - host: "github.com"\n'
' matches:\n'
' - paths:\n'
' - value: "/a/"\n'
)
merged = _merge_single_route(base, {
"host": "github.com",
"matches": [{"paths": [{"value": "/b/"}]}],
})
routes = _routes(merged)
self.assertEqual(1, len(routes))
all_paths: list[str] = []
for me in routes[0].get("matches", []):
for p in me.get("paths", []):
all_paths.append(p["value"])
self.assertIn("/a/", all_paths)
self.assertIn("/b/", all_paths)
def test_existing_host_dedup_match_paths(self):
base = (
'routes:\n'
' - host: "github.com"\n'
' matches:\n'
' - paths:\n'
' - value: "/a/"\n'
)
merged = _merge_single_route(base, {
"host": "github.com",
"matches": [{"paths": [{"value": "/a/"}, {"value": "/b/"}]}],
})
all_paths: list[str] = []
for me in _routes(merged)[0].get("matches", []):
for p in me.get("paths", []):
all_paths.append(p["value"])
self.assertEqual(1, all_paths.count("/a/"))
self.assertIn("/b/", all_paths)
def test_existing_host_preserves_existing_auth_ignores_proposed(self):
base = (
'routes:\n'
' - host: "api.github.com"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
)
merged = _merge_single_route(base, {
"host": "api.github.com",
"auth": {"scheme": "token", "token_ref": "DIFFERENT"},
})
route = _routes(merged)[0]
self.assertEqual("Bearer", route["auth_scheme"])
self.assertEqual("EGRESS_TOKEN_0", route["token_env"])
def test_host_match_is_case_insensitive(self):
base = 'routes:\n - host: "GitHub.com"\n'
merged = _merge_single_route(base, {
"host": "github.com",
"matches": [{"paths": [{"value": "/x/"}]}],
})
routes = _routes(merged)
self.assertEqual(1, len(routes))
def test_missing_host_raises(self):
with self.assertRaises(EgressApplyError):
_merge_single_route(self.BASE, {})
def test_invalid_current_yaml_raises(self):
with self.assertRaises(EgressApplyError):
_merge_single_route("routes:\n\tbad", {"host": "x.example"})
if __name__ == "__main__":
unittest.main()
+12 -14
View File
@@ -17,7 +17,6 @@ from bot_bottle.supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
archive_proposal,
audit_log_path,
list_pending_proposals,
@@ -37,16 +36,16 @@ FIXED_TS = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(
tool: str = TOOL_EGRESS_BLOCK,
proposed: str = "{}",
justification: str = "need a route",
tool: str = TOOL_CAPABILITY_BLOCK,
proposed: str = "FROM python:3.13\n",
justification: str = "need a capability",
) -> Proposal:
return Proposal.new(
bottle_slug="dev",
tool=tool,
proposed_file=proposed,
justification=justification,
current_file_hash=sha256_hex("{}"),
current_file_hash=sha256_hex(proposed),
now=FIXED_TS,
)
@@ -57,7 +56,7 @@ class TestProposalRoundtrip(unittest.TestCase):
self.assertTrue(p.id)
self.assertEqual("2026-05-25T12:00:00+00:00", p.arrival_timestamp)
self.assertEqual("dev", p.bottle_slug)
self.assertEqual(TOOL_EGRESS_BLOCK, p.tool)
self.assertEqual(TOOL_CAPABILITY_BLOCK, p.tool)
def test_to_from_dict_roundtrip(self):
p = _proposal()
@@ -142,14 +141,14 @@ class TestQueueIO(unittest.TestCase):
def test_list_pending_sorted_by_arrival(self):
# Fabricate two with explicit timestamps.
a = Proposal.new(
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK,
proposed_file="{}", justification="early",
bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="FROM python:3.13\n", justification="early",
current_file_hash="x",
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
)
b = Proposal.new(
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK,
proposed_file="{}", justification="late",
bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="FROM python:3.13\n", justification="late",
current_file_hash="x",
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
)
@@ -318,16 +317,15 @@ class TestToolConstants(unittest.TestCase):
def test_tools_tuple_matches_individual_constants(self):
self.assertEqual(
(
TOOL_EGRESS_BLOCK,
TOOL_CAPABILITY_BLOCK,
supervise.TOOL_LIST_EGRESS_ROUTES,
),
supervise.TOOLS,
)
def test_component_map_covers_egress_remediation_only(self):
self.assertIn(TOOL_EGRESS_BLOCK, supervise.COMPONENT_FOR_TOOL)
self.assertNotIn(TOOL_CAPABILITY_BLOCK, supervise.COMPONENT_FOR_TOOL)
def test_component_map_has_no_entries(self):
# egress-block removed in issue #198; capability-block never had one.
self.assertEqual({}, supervise.COMPONENT_FOR_TOOL)
class _StubSupervise(supervise.Supervise):
+18 -152
View File
@@ -1,12 +1,10 @@
"""Unit: supervise headless paths (PRD 0013 phase 4, PRD 0014).
"""Unit: supervise headless paths (PRD 0013 phase 4, PRD 0016).
The curses TUI itself isn't exercised here — these tests cover the
discovery + approve/reject + audit-write paths that the TUI's key
handlers call into.
discovery + approve/reject paths that the TUI's key handlers call into.
add_route is stubbed at the supervise CLI module level so the tests
don't need a running egress sidecar; the real docker exec/cp/SIGHUP
plumbing is covered by the integration test.
egress-block (add_route) was removed in issue #198; the TestEgressApplyWiring
class and all stubs for add_route have been dropped accordingly.
"""
import os
@@ -17,7 +15,6 @@ from pathlib import Path
from bot_bottle import supervise
from bot_bottle.backend.docker.capability_apply import CapabilityApplyError
from bot_bottle.backend.docker.egress_apply import EgressApplyError
from bot_bottle.cli import supervise as supervise_cli
from bot_bottle.supervise import (
Proposal,
@@ -25,7 +22,6 @@ from bot_bottle.supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
read_audit_entries,
read_response,
sha256_hex,
@@ -35,9 +31,8 @@ from bot_bottle.supervise import (
FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_EGRESS_BLOCK) -> Proposal:
def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
payloads = {
TOOL_EGRESS_BLOCK: '{"routes": []}\n',
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
}
payload = payloads.get(tool, "")
@@ -88,14 +83,14 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
def test_sorted_by_arrival_across_bottles(self):
early = Proposal.new(
bottle_slug="api", tool=TOOL_EGRESS_BLOCK,
proposed_file="{}", justification="early",
bottle_slug="api", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="FROM python:3.13\n", justification="early",
current_file_hash="h",
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
)
late = Proposal.new(
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK,
proposed_file="{}", justification="late",
bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="FROM python:3.13\n", justification="late",
current_file_hash="h",
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
)
@@ -120,48 +115,38 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
self._original_add_route = supervise_cli.add_route
self._original_apply_capability = supervise_cli.apply_capability_change
# Default stubs: succeed with deterministic before/after so the
# audit log shows a non-empty diff.
supervise_cli.add_route = lambda slug, content: ( # type: ignore
'{"routes": []}\n', '{"routes": [{"host": "x"}]}\n',
)
supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore
"FROM old\n", content,
)
def tearDown(self):
supervise_cli.add_route = self._original_add_route
supervise_cli.apply_capability_change = self._original_apply_capability
self._teardown_fake_home()
def _enqueue(self, tool: str = TOOL_EGRESS_BLOCK):
def _enqueue(self, tool: str = TOOL_CAPABILITY_BLOCK):
p = _proposal(tool=tool)
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p)
return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir)
def test_approve_writes_response_and_audit(self):
def test_approve_writes_response(self):
qp = self._enqueue()
supervise_cli.approve(qp)
resp = read_response(qp.queue_dir, qp.proposal.id)
# capability-block is archived on approve, so the response file
# moves to processed/ before the caller can read it.
resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status)
self.assertIsNone(resp.final_file)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertEqual("approved", entries[0].operator_action)
def test_approve_with_final_file_marks_modified(self):
qp = self._enqueue()
supervise_cli.approve(qp, final_file='{"routes": [{"path": "/x/"}]}\n', notes="tweaked")
resp = read_response(qp.queue_dir, qp.proposal.id)
supervise_cli.approve(qp, final_file="FROM bookworm\n", notes="tweaked")
resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
self.assertEqual(STATUS_MODIFIED, resp.status)
self.assertEqual('{"routes": [{"path": "/x/"}]}\n', resp.final_file)
self.assertEqual("FROM bookworm\n", resp.final_file)
self.assertEqual("tweaked", resp.notes)
entries = read_audit_entries("egress", "dev")
self.assertEqual("modified", entries[0].operator_action)
def test_reject_writes_rejection(self):
qp = self._enqueue()
@@ -169,113 +154,13 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_REJECTED, resp.status)
self.assertEqual("nope", resp.notes)
entries = read_audit_entries("egress", "dev")
self.assertEqual("rejected", entries[0].operator_action)
self.assertEqual("nope", entries[0].operator_notes)
def test_capability_block_skips_audit_log(self):
def test_no_audit_log_for_capability_block(self):
qp = self._enqueue(tool=TOOL_CAPABILITY_BLOCK)
supervise_cli.approve(qp)
# No audit log for capability-block (per PRD 0013 / 0016).
self.assertEqual([], read_audit_entries("egress", "dev"))
class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
"""PRD 0017 chunk 3: approve() on an egress-block proposal
must call add_route (single-route merge) with the right args
and surface its failures."""
def setUp(self):
self._setup_fake_home()
self._original_add_route = supervise_cli.add_route
def tearDown(self):
supervise_cli.add_route = self._original_add_route
self._teardown_fake_home()
def _enqueue_egress(self, proposed: str = '{"host": "x.example"}\n'):
p = Proposal.new(
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK,
proposed_file=proposed,
justification="need a route",
current_file_hash=sha256_hex(proposed),
now=FIXED,
)
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p)
return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir)
def test_egress_block_calls_add_route_with_proposed_json(self):
calls = []
supervise_cli.add_route = lambda slug, content: ( # type: ignore
calls.append((slug, content)) or ("before", "after")
)
qp = self._enqueue_egress(
proposed='{"host": "new.example", "path_allowlist": ["/x/"]}\n'
)
supervise_cli.approve(qp)
self.assertEqual(1, len(calls))
slug, content = calls[0]
self.assertEqual("dev", slug)
# The single-route JSON the agent proposed reaches add_route
# unchanged — add_route fetches current state + merges.
self.assertEqual(
'{"host": "new.example", "path_allowlist": ["/x/"]}\n',
content,
)
def test_modify_passes_final_file_to_add_route(self):
calls = []
supervise_cli.add_route = lambda slug, content: ( # type: ignore
calls.append(content) or ("before", "after")
)
qp = self._enqueue_egress()
supervise_cli.approve(
qp,
final_file='{"host": "edited.example"}\n',
notes="tweaked",
)
self.assertEqual(['{"host": "edited.example"}\n'], calls)
def test_apply_failure_blocks_response_and_audit(self):
supervise_cli.add_route = lambda slug, content: (_ for _ in ()).throw( # type: ignore
EgressApplyError("docker exec failed")
)
qp = self._enqueue_egress()
with self.assertRaises(EgressApplyError):
supervise_cli.approve(qp)
# No response file (proposal stays pending).
self.assertEqual(
[qp.proposal.id],
[p.id for p in supervise.list_pending_proposals(qp.queue_dir)],
)
# No audit entry.
self.assertEqual([], read_audit_entries("egress", "dev"))
def test_real_diff_lands_in_audit(self):
supervise_cli.add_route = lambda slug, content: ( # type: ignore
'{"routes": []}\n', # before
'{"routes": [{"host": "new.example"}]}\n', # after
)
qp = self._enqueue_egress(proposed='{"host": "new.example"}\n')
supervise_cli.approve(qp)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertIn('+{"routes": [{"host": "new.example"}]}', entries[0].diff)
self.assertIn('-{"routes": []}', entries[0].diff)
def test_reject_does_not_call_apply(self):
qp = self._enqueue_egress()
supervise_cli.reject(qp, reason="no thanks")
# Reject still writes a response + audit entry with empty diff.
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_REJECTED, resp.status)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertEqual("", entries[0].diff)
class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
"""PRD 0016 Phase 3: approve() on a capability-block proposal
calls apply_capability_change, archives the proposal afterward
@@ -328,17 +213,12 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore
qp = self._enqueue_capability()
supervise_cli.approve(qp)
# capability-block has no audit log per PRD 0013 — its record
# lives in the per-bottle Dockerfile + transcript state.
self.assertEqual([], read_audit_entries("egress", "dev"))
def test_proposal_archived_after_apply(self):
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore
qp = self._enqueue_capability()
supervise_cli.approve(qp)
# Sidecar would normally archive after delivering the response,
# but it's gone by then. The supervise TUI archives so
# discover_pending stops surfacing the resolved proposal.
self.assertEqual([], supervise.list_pending_proposals(qp.queue_dir))
processed = list((qp.queue_dir / "processed").glob("*.json"))
self.assertEqual(2, len(processed))
@@ -346,20 +226,8 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
class TestEditInEditor(unittest.TestCase):
def test_runs_editor_returns_edited_content(self):
# Fake "editor" is /bin/sh -c 'cat <<EOF > $1 ... EOF'
original_editor = os.environ.get("EDITOR")
try:
# Use a fake editor that overwrites the file with a known
# marker. EDITOR is split with shlex equivalence by
# subprocess.run when invoked as a list — keep it as a
# single program path that takes the file as argv[1].
os.environ["EDITOR"] = (
"/bin/sh -c 'printf %s \"edited\" > \"$0\"'"
)
# subprocess.run with the str as the first list element
# would try to find a binary literally named "/bin/sh -c ..."
# — that won't work. Use shell mode trick: wrap in a script.
# Easier: build a tiny helper script.
with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False, prefix="fake-editor.",
) as script:
@@ -381,7 +249,6 @@ class TestEditInEditor(unittest.TestCase):
def test_returns_none_when_unchanged(self):
original_editor = os.environ.get("EDITOR")
try:
# No-op editor: touch the file (leaves it unchanged).
with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False, prefix="noop-editor.",
) as script:
@@ -445,7 +312,6 @@ class TestCapabilityBlockSmolmachinesGuard(_FakeHomeMixin, unittest.TestCase):
supervise_cli.approve(qp) # must not raise
def test_no_metadata_falls_through_to_docker_path(self):
# No metadata at all → assume Docker (backward-compatible).
qp = self._enqueue_capability("dev")
supervise_cli.approve(qp) # must not raise
+12 -12
View File
@@ -141,7 +141,6 @@ class TestHandleToolsList(unittest.TestCase):
names = [t["name"] for t in result["tools"]] # type: ignore[index]
self.assertEqual(
sorted([
_sv.TOOL_EGRESS_BLOCK,
_sv.TOOL_CAPABILITY_BLOCK,
_sv.TOOL_LIST_EGRESS_ROUTES,
]),
@@ -206,10 +205,10 @@ class TestHandleToolsCall(unittest.TestCase):
try:
result = handle_tools_call(
{
"name": _sv.TOOL_EGRESS_BLOCK,
"name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": {
"host": "example.com",
"justification": "need a route",
"dockerfile": "FROM python:3.13\n",
"justification": "need git",
},
},
self.config,
@@ -250,8 +249,8 @@ class TestHandleToolsCall(unittest.TestCase):
with self.assertRaises(_RpcError):
handle_tools_call(
{
"name": _sv.TOOL_EGRESS_BLOCK,
"arguments": {"host": "example.com"},
"name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": {"dockerfile": "FROM python:3.13\n"},
},
self.config,
)
@@ -261,9 +260,9 @@ class TestHandleToolsCall(unittest.TestCase):
try:
handle_tools_call(
{
"name": _sv.TOOL_EGRESS_BLOCK,
"name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": {
"host": "example.com",
"dockerfile": "FROM python:3.13\n",
"justification": "x",
},
},
@@ -285,10 +284,10 @@ class TestHandleToolsCall(unittest.TestCase):
)
result = handle_tools_call(
{
"name": _sv.TOOL_EGRESS_BLOCK,
"name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": {
"host": "example.com",
"justification": "need a route",
"dockerfile": "FROM python:3.13\n",
"justification": "need a capability",
},
},
config,
@@ -412,7 +411,8 @@ class TestHttpEndToEnd(unittest.TestCase):
self.assertEqual("2.0", result["jsonrpc"])
self.assertEqual(1, result["id"])
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
self.assertNotIn("egress-block", names)
def test_unknown_method_returns_jsonrpc_error(self):
result = self._post_jsonrpc(