feat(supervise)!: remove egress-block MCP tool and runtime route-mutation
Drops `egress-block` from the supervise sidecar, removes `_merge_single_route`, `add_route`, and `apply_routes_change` from egress_apply.py, and strips the proposal/approve/reject flow for egress from the supervise CLI. The list-egress-routes and capability-block tools are unaffected. Tests updated throughout. Closes #198
This commit is contained in:
@@ -1,70 +1,20 @@
|
||||
"""Host-side helper to apply a routes.yaml change to a running
|
||||
egress sidecar (PRD 0014 retargeted by PRD 0017 chunk 3, PRD 0053).
|
||||
"""Host-side helper for egress sidecar inspection (issue #198).
|
||||
|
||||
Used by the supervise dashboard when the operator approves an
|
||||
egress-block proposal. Fetches current routes.yaml, validates,
|
||||
writes into the sidecar, then SIGHUPs to reload.
|
||||
`_merge_single_route`, `add_route`, and `apply_routes_change` were
|
||||
removed when the egress-block MCP tool was dropped. The remaining
|
||||
helpers support runtime inspection and validation of the routes file
|
||||
without modifying it at runtime.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
|
||||
from ...egress import EGRESS_ROUTES_IN_CONTAINER
|
||||
from ...egress_addon_core import load_routes
|
||||
from ...yaml_subset import YamlSubsetError, parse_yaml_subset
|
||||
from .bottle_state import egress_state_dir
|
||||
from .sidecar_bundle import sidecar_bundle_container_name
|
||||
|
||||
|
||||
def _render_routes_payload(routes_list: list[dict[str, object]]) -> str:
|
||||
"""Render a list-of-dicts routes payload as YAML matching the
|
||||
shape `egress_render_routes` produces."""
|
||||
if not routes_list:
|
||||
return "routes: []\n"
|
||||
lines: list[str] = ["routes:"]
|
||||
for entry in routes_list:
|
||||
host = str(entry.get("host", ""))
|
||||
lines.append(f' - host: "{host}"')
|
||||
auth_scheme = entry.get("auth_scheme")
|
||||
token_env = entry.get("token_env")
|
||||
if auth_scheme and token_env:
|
||||
lines.append(f' auth_scheme: "{auth_scheme}"')
|
||||
lines.append(f' token_env: "{token_env}"')
|
||||
matches_obj = entry.get("matches")
|
||||
if isinstance(matches_obj, list) and matches_obj:
|
||||
lines.append(" matches:")
|
||||
for match_entry in matches_obj:
|
||||
me = cast(dict[str, object], match_entry)
|
||||
first_key = True
|
||||
if "paths" in me:
|
||||
lines.append(" - paths:")
|
||||
first_key = False
|
||||
for pd in cast(list[dict[str, str]], me["paths"]):
|
||||
if "type" in pd:
|
||||
lines.append(f' - type: "{pd["type"]}"')
|
||||
lines.append(f' value: "{pd["value"]}"')
|
||||
else:
|
||||
lines.append(f' - value: "{pd["value"]}"')
|
||||
if "methods" in me:
|
||||
methods_str = ", ".join(
|
||||
f'"{m}"' for m in cast(list[str], me["methods"])
|
||||
)
|
||||
prefix = " - " if first_key else " "
|
||||
lines.append(f'{prefix}methods: [{methods_str}]')
|
||||
first_key = False
|
||||
if first_key:
|
||||
lines.append(" - {}")
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
def _egress_routes_host_path(slug: str) -> Path:
|
||||
return egress_state_dir(slug) / "egress_routes.yaml"
|
||||
|
||||
|
||||
class EgressApplyError(RuntimeError):
|
||||
pass
|
||||
|
||||
@@ -92,144 +42,8 @@ def validate_routes_content(content: str) -> None:
|
||||
) from e
|
||||
|
||||
|
||||
def apply_routes_change(slug: str, new_content: str) -> tuple[str, str]:
|
||||
container = sidecar_bundle_container_name(slug)
|
||||
before = fetch_current_routes(slug)
|
||||
validate_routes_content(new_content)
|
||||
|
||||
target = _egress_routes_host_path(slug)
|
||||
target.parent.mkdir(parents=True, exist_ok=True)
|
||||
target.write_text(new_content)
|
||||
target.chmod(0o644)
|
||||
sig = subprocess.run(
|
||||
["docker", "kill", "--signal", "HUP", container],
|
||||
capture_output=True, text=True, check=False,
|
||||
)
|
||||
if sig.returncode != 0:
|
||||
raise EgressApplyError(
|
||||
f"failed to SIGHUP {container}: "
|
||||
f"{(sig.stderr or '').strip()}"
|
||||
)
|
||||
|
||||
return before, new_content
|
||||
|
||||
|
||||
def _merge_single_route(
|
||||
current_yaml: str, new_route: dict[str, object],
|
||||
) -> str:
|
||||
"""Merge a single proposed route into the current routes.yaml.
|
||||
|
||||
- Host absent → append the route.
|
||||
- Host present → union the match paths (proposed ∪ existing).
|
||||
Auth is preserved from existing route.
|
||||
"""
|
||||
try:
|
||||
cfg = parse_yaml_subset(current_yaml)
|
||||
except YamlSubsetError as e:
|
||||
raise EgressApplyError(
|
||||
f"current routes.yaml is not valid YAML: {e}"
|
||||
) from e
|
||||
routes = cfg.get("routes")
|
||||
if not isinstance(routes, list):
|
||||
raise EgressApplyError(
|
||||
"current routes.yaml: 'routes' is not a list"
|
||||
)
|
||||
routes_typed = cast(list[object], routes)
|
||||
|
||||
new_host = str(new_route.get("host", "")).lower()
|
||||
if not new_host:
|
||||
raise EgressApplyError(
|
||||
"proposed route is missing 'host'"
|
||||
)
|
||||
|
||||
# Build proposed matches from the input
|
||||
proposed_matches = new_route.get("matches")
|
||||
if proposed_matches is None:
|
||||
# Accept legacy path_allowlist from agent proposals and convert
|
||||
proposed_paths = new_route.get("path_allowlist")
|
||||
if isinstance(proposed_paths, list) and proposed_paths:
|
||||
proposed_matches = [{"paths": [{"value": p} for p in proposed_paths]}]
|
||||
|
||||
for entry in routes_typed:
|
||||
if not isinstance(entry, dict):
|
||||
continue
|
||||
entry_typed = cast(dict[str, object], entry)
|
||||
if str(entry_typed.get("host", "")).lower() == new_host:
|
||||
# Merge matches: union path values from proposed into existing
|
||||
if isinstance(proposed_matches, list) and proposed_matches:
|
||||
existing_matches = entry_typed.get("matches")
|
||||
if not isinstance(existing_matches, list):
|
||||
existing_matches = []
|
||||
# Simple merge: collect all existing path values, add new ones
|
||||
existing_paths: set[str] = set()
|
||||
for me in existing_matches:
|
||||
me_typed = cast(dict[str, object], me) if isinstance(me, dict) else {}
|
||||
paths = me_typed.get("paths")
|
||||
if isinstance(paths, list):
|
||||
for p in paths:
|
||||
p_typed = cast(dict[str, object], p) if isinstance(p, dict) else {}
|
||||
val = p_typed.get("value")
|
||||
if isinstance(val, str):
|
||||
existing_paths.add(val)
|
||||
new_paths: list[str] = []
|
||||
for me in proposed_matches:
|
||||
me_typed = cast(dict[str, object], me) if isinstance(me, dict) else {}
|
||||
paths = me_typed.get("paths")
|
||||
if isinstance(paths, list):
|
||||
for p in paths:
|
||||
p_typed = cast(dict[str, object], p) if isinstance(p, dict) else {}
|
||||
val = p_typed.get("value")
|
||||
if isinstance(val, str) and val not in existing_paths:
|
||||
new_paths.append(val)
|
||||
existing_paths.add(val)
|
||||
if new_paths:
|
||||
existing_matches.append(
|
||||
{"paths": [{"value": p} for p in new_paths]}
|
||||
)
|
||||
entry_typed["matches"] = existing_matches
|
||||
break
|
||||
else:
|
||||
entry_typed: dict[str, object] = {"host": new_route.get("host")} # type: ignore
|
||||
if isinstance(proposed_matches, list) and proposed_matches:
|
||||
entry_typed["matches"] = proposed_matches
|
||||
auth = new_route.get("auth")
|
||||
if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): # type: ignore
|
||||
auth_typed = cast(dict[str, object], auth)
|
||||
existing_slots = sorted({
|
||||
str(r_entry.get("token_env", ""))
|
||||
for r_entry_obj in routes_typed
|
||||
if isinstance(r_entry_obj, dict)
|
||||
for r_entry in [cast(dict[str, object], r_entry_obj)]
|
||||
if r_entry.get("token_env")
|
||||
})
|
||||
next_idx = len(existing_slots)
|
||||
entry_typed["auth_scheme"] = str(cast(object, auth_typed.get("scheme")))
|
||||
entry_typed["token_env"] = f"EGRESS_TOKEN_{next_idx}"
|
||||
routes_typed.append(entry_typed)
|
||||
|
||||
return _render_routes_payload(cast(list[dict[str, object]], routes_typed))
|
||||
|
||||
|
||||
def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]:
|
||||
try:
|
||||
proposed = json.loads(proposed_route_json)
|
||||
except json.JSONDecodeError as e:
|
||||
raise EgressApplyError(
|
||||
f"proposed route is not valid JSON: {e}"
|
||||
) from e
|
||||
if not isinstance(proposed, dict):
|
||||
raise EgressApplyError(
|
||||
"proposed route must be a JSON object"
|
||||
)
|
||||
current = fetch_current_routes(slug)
|
||||
merged = _merge_single_route(current, proposed)
|
||||
return apply_routes_change(slug, merged)
|
||||
|
||||
|
||||
__all__ = [
|
||||
"EgressApplyError",
|
||||
"add_route",
|
||||
"apply_routes_change",
|
||||
"fetch_current_routes",
|
||||
"validate_routes_content",
|
||||
]
|
||||
|
||||
@@ -2,9 +2,8 @@
|
||||
act on them (approve / modify / reject).
|
||||
|
||||
Curses-based TUI; modify-then-approve shells out to $EDITOR. The
|
||||
approval handlers wire to the per-tool remediation engines:
|
||||
PRD 0014 (egress) writes routes.yaml + SIGHUPs egress; PRD 0016
|
||||
(capability) rebuilds the bottle Dockerfile.
|
||||
approval handler wires to PRD 0016 (capability-block), which rebuilds
|
||||
the bottle Dockerfile. The egress-block tool was removed in issue #198.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -26,7 +25,6 @@ from ..backend.docker.capability_apply import (
|
||||
CapabilityApplyError,
|
||||
apply_capability_change,
|
||||
)
|
||||
from ..backend.docker.egress_apply import EgressApplyError, add_route
|
||||
from ..log import Die, error, info
|
||||
from ..supervise import (
|
||||
COMPONENT_FOR_TOOL,
|
||||
@@ -37,7 +35,6 @@ from ..supervise import (
|
||||
STATUS_MODIFIED,
|
||||
STATUS_REJECTED,
|
||||
TOOL_CAPABILITY_BLOCK,
|
||||
TOOL_EGRESS_BLOCK,
|
||||
archive_proposal,
|
||||
list_pending_proposals,
|
||||
render_diff,
|
||||
@@ -61,7 +58,7 @@ class QueuedProposal:
|
||||
# Errors any remediation engine may raise. Caught by the TUI key
|
||||
# handlers and surfaced in the status line so a failed apply keeps
|
||||
# the proposal pending rather than crashing curses.
|
||||
ApplyError = (EgressApplyError, CapabilityApplyError)
|
||||
ApplyError = (CapabilityApplyError,)
|
||||
|
||||
|
||||
def discover_pending() -> list[QueuedProposal]:
|
||||
@@ -82,9 +79,7 @@ def discover_pending() -> list[QueuedProposal]:
|
||||
def _approval_status(qp: QueuedProposal, verb: str) -> str:
|
||||
"""Status-line text after a successful approval."""
|
||||
base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
|
||||
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
|
||||
return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
|
||||
return base
|
||||
return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
|
||||
|
||||
|
||||
def _detail_lines(
|
||||
@@ -132,11 +127,7 @@ def approve(
|
||||
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
|
||||
|
||||
diff_before, diff_after = "", ""
|
||||
if qp.proposal.tool == TOOL_EGRESS_BLOCK:
|
||||
diff_before, diff_after = add_route(
|
||||
qp.proposal.bottle_slug, file_to_apply,
|
||||
)
|
||||
elif qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
|
||||
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
|
||||
_meta = read_metadata(qp.proposal.bottle_slug)
|
||||
if _meta is not None and not _meta.compose_project:
|
||||
raise CapabilityApplyError(
|
||||
|
||||
@@ -48,11 +48,9 @@ from pathlib import Path
|
||||
SUPERVISE_HOSTNAME = "supervise"
|
||||
SUPERVISE_PORT = 9100
|
||||
|
||||
TOOL_EGRESS_BLOCK = "egress-block"
|
||||
TOOL_CAPABILITY_BLOCK = "capability-block"
|
||||
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
|
||||
TOOLS: tuple[str, ...] = (
|
||||
TOOL_EGRESS_BLOCK,
|
||||
TOOL_CAPABILITY_BLOCK,
|
||||
TOOL_LIST_EGRESS_ROUTES,
|
||||
)
|
||||
@@ -70,10 +68,8 @@ EGRESS_INTROSPECT_URL = "http://_egress.local/allowlist"
|
||||
# capability-block has no on-disk config the operator edits in place
|
||||
# (the Dockerfile is rebuilt, not patched), so it has no audit log
|
||||
# here — those changes are captured by git history + the rebuild
|
||||
# record laid down in PRD 0016.
|
||||
COMPONENT_FOR_TOOL: dict[str, str] = {
|
||||
TOOL_EGRESS_BLOCK: "egress",
|
||||
}
|
||||
# record laid down in PRD 0016. egress-block was removed in issue #198.
|
||||
COMPONENT_FOR_TOOL: dict[str, str] = {}
|
||||
|
||||
STATUS_APPROVED = "approved"
|
||||
STATUS_MODIFIED = "modified"
|
||||
@@ -555,7 +551,6 @@ __all__ = [
|
||||
"EGRESS_FORWARD_PROXY",
|
||||
"EGRESS_INTROSPECT_URL",
|
||||
"TOOL_CAPABILITY_BLOCK",
|
||||
"TOOL_EGRESS_BLOCK",
|
||||
"TOOL_LIST_EGRESS_ROUTES",
|
||||
"archive_proposal",
|
||||
"audit_dir",
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
"""Supervise sidecar HTTP server (PRD 0013).
|
||||
|
||||
Per-bottle MCP server exposing two tools — `egress-block`,
|
||||
`capability-block` — that the agent calls to propose config changes
|
||||
when stuck. Each tool call:
|
||||
Per-bottle MCP server exposing tools the agent calls to propose config
|
||||
changes when stuck. The egress-block tool was removed in issue #198;
|
||||
the remaining tools are `capability-block` and `list-egress-routes`.
|
||||
|
||||
Each queued tool call:
|
||||
|
||||
1. Validates the proposed file syntactically.
|
||||
2. Writes a Proposal to /run/supervise/queue/ (bind-mounted from
|
||||
@@ -133,69 +135,6 @@ def jsonrpc_error(request_id: object, code: int, message: str) -> bytes:
|
||||
|
||||
|
||||
TOOL_DEFINITIONS: list[dict[str, object]] = [
|
||||
{
|
||||
"name": _sv.TOOL_EGRESS_BLOCK,
|
||||
"description": (
|
||||
"Call when egress refused your HTTPS request — host "
|
||||
"without a matching route, or a request that did not match "
|
||||
"the route's matches rules (typically a 403 from the "
|
||||
"proxy). Propose a SINGLE route to add: the host you "
|
||||
"need + (optionally) a path_allowlist of path prefixes + "
|
||||
"(optionally) an auth block. The supervisor merges the "
|
||||
"route into the live table at approval time — you do NOT "
|
||||
"need to see or reproduce the existing routes. If the "
|
||||
"host already has a route, the proposed paths are unioned "
|
||||
"with the existing ones (host stays single-route). The "
|
||||
"operator approves or rejects in the supervise TUI. On "
|
||||
"approval the supervisor writes the merged routes.yaml "
|
||||
"and SIGHUPs egress (no dropped connections)."
|
||||
),
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"host": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"The hostname to allow (e.g. 'api.github.com'). "
|
||||
"Case-insensitive on match."
|
||||
),
|
||||
},
|
||||
"path_allowlist": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"},
|
||||
"description": (
|
||||
"Optional URL path prefixes the route permits. "
|
||||
"Each must start with '/'. Omit to allow all "
|
||||
"paths under this host (bare-pass route). "
|
||||
"Internally converted to matches entries."
|
||||
),
|
||||
},
|
||||
"auth": {
|
||||
"type": "object",
|
||||
"description": (
|
||||
"Optional credential injection. {scheme, "
|
||||
"token_ref}: scheme is 'Bearer' or 'token'; "
|
||||
"token_ref names the host env var holding the "
|
||||
"secret value. Omit to add a host without "
|
||||
"credential injection. Ignored if the host "
|
||||
"already has a route (operator decides auth "
|
||||
"changes, not the agent)."
|
||||
),
|
||||
"properties": {
|
||||
"scheme": {"type": "string"},
|
||||
"token_ref": {"type": "string"},
|
||||
},
|
||||
"required": ["scheme", "token_ref"],
|
||||
"additionalProperties": False,
|
||||
},
|
||||
"justification": {
|
||||
"type": "string",
|
||||
"description": "Why this host needs to be allowed.",
|
||||
},
|
||||
},
|
||||
"required": ["host", "justification"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": _sv.TOOL_LIST_EGRESS_ROUTES,
|
||||
"description": (
|
||||
@@ -254,11 +193,6 @@ PROPOSED_FILE_FIELD: dict[str, str] = {
|
||||
# --- Validation ------------------------------------------------------------
|
||||
|
||||
|
||||
# Auth schemes accepted on egress-block proposals — match the
|
||||
# manifest-side EGRESS_AUTH_SCHEMES.
|
||||
_AUTH_SCHEMES = ("Bearer", "token")
|
||||
|
||||
|
||||
def validate_proposed_file(tool: str, content: str) -> None:
|
||||
"""Syntactic validation. The operator is the real gate; this just
|
||||
catches obvious paste-errors / wrong-tool selections before they
|
||||
@@ -273,70 +207,6 @@ def validate_proposed_file(tool: str, content: str) -> None:
|
||||
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
|
||||
|
||||
|
||||
def _validate_and_bundle_egress_route(
|
||||
args: dict[str, object],
|
||||
) -> str:
|
||||
"""Validate egress-block input fields and bundle them into
|
||||
a JSON string that becomes the Proposal.proposed_file. Raises
|
||||
_RpcError on bad input — the agent retries with a fixed shape."""
|
||||
tool = _sv.TOOL_EGRESS_BLOCK
|
||||
host = args.get("host")
|
||||
if not isinstance(host, str) or not host.strip():
|
||||
raise _RpcError(
|
||||
ERR_INVALID_PARAMS,
|
||||
f"{tool}: 'host' is required and must be a non-empty string",
|
||||
)
|
||||
payload: dict[str, object] = {"host": host}
|
||||
|
||||
path_allow_raw = args.get("path_allowlist")
|
||||
if path_allow_raw is not None:
|
||||
if not isinstance(path_allow_raw, list):
|
||||
raise _RpcError(
|
||||
ERR_INVALID_PARAMS,
|
||||
f"{tool}: 'path_allowlist' must be an array of strings",
|
||||
)
|
||||
prefixes: list[str] = []
|
||||
for i, p in enumerate(path_allow_raw):
|
||||
if not isinstance(p, str):
|
||||
raise _RpcError(
|
||||
ERR_INVALID_PARAMS,
|
||||
f"{tool}: path_allowlist[{i}] must be a string",
|
||||
)
|
||||
if not p.startswith("/"):
|
||||
raise _RpcError(
|
||||
ERR_INVALID_PARAMS,
|
||||
f"{tool}: path_allowlist[{i}] {p!r} must start with '/'",
|
||||
)
|
||||
prefixes.append(p)
|
||||
if prefixes:
|
||||
payload["path_allowlist"] = prefixes
|
||||
|
||||
auth_raw = args.get("auth")
|
||||
if auth_raw is not None:
|
||||
if not isinstance(auth_raw, dict):
|
||||
raise _RpcError(
|
||||
ERR_INVALID_PARAMS,
|
||||
f"{tool}: 'auth' must be an object with 'scheme' and 'token_ref'",
|
||||
)
|
||||
scheme = auth_raw.get("scheme")
|
||||
token_ref = auth_raw.get("token_ref")
|
||||
if not isinstance(scheme, str) or scheme not in _AUTH_SCHEMES:
|
||||
raise _RpcError(
|
||||
ERR_INVALID_PARAMS,
|
||||
f"{tool}: auth.scheme must be one of "
|
||||
f"{', '.join(_AUTH_SCHEMES)} (got {scheme!r})",
|
||||
)
|
||||
if not isinstance(token_ref, str) or not token_ref:
|
||||
raise _RpcError(
|
||||
ERR_INVALID_PARAMS,
|
||||
f"{tool}: auth.token_ref must be a non-empty string "
|
||||
f"naming the host env var holding the token",
|
||||
)
|
||||
payload["auth"] = {"scheme": scheme, "token_ref": token_ref}
|
||||
|
||||
return json.dumps(payload, indent=2) + "\n"
|
||||
|
||||
|
||||
# --- MCP handlers ----------------------------------------------------------
|
||||
|
||||
|
||||
@@ -422,13 +292,7 @@ def handle_tools_call(
|
||||
f"{name}: 'justification' is required and must be a non-empty string",
|
||||
)
|
||||
|
||||
if name == _sv.TOOL_EGRESS_BLOCK:
|
||||
# Structured input → JSON bundle on Proposal.proposed_file.
|
||||
# The dashboard's apply step (egress_apply.add_route)
|
||||
# parses this JSON, fetches the current routes, merges in
|
||||
# the new one, and writes the merged file.
|
||||
proposed_file = _validate_and_bundle_egress_route(args_raw)
|
||||
elif name in PROPOSED_FILE_FIELD:
|
||||
if name in PROPOSED_FILE_FIELD:
|
||||
file_field = PROPOSED_FILE_FIELD[name]
|
||||
proposed_file = args_raw.get(file_field)
|
||||
if not isinstance(proposed_file, str):
|
||||
|
||||
@@ -1,27 +1,19 @@
|
||||
"""Unit: validate_routes_content (PRD 0014 retargeted by PRD 0017
|
||||
chunk 3, PRD 0053). docker exec / cp / kill paths are covered by the
|
||||
integration test."""
|
||||
"""Unit: validate_routes_content (issue #198: _merge_single_route and
|
||||
add_route removed; docker exec / cp / kill paths are covered by the
|
||||
integration test)."""
|
||||
|
||||
import unittest
|
||||
|
||||
from bot_bottle.backend.docker.egress_apply import (
|
||||
EgressApplyError,
|
||||
_merge_single_route,
|
||||
validate_routes_content,
|
||||
)
|
||||
from bot_bottle.yaml_subset import parse_yaml_subset
|
||||
|
||||
|
||||
_ROUTES_EMPTY = "routes: []\n"
|
||||
_ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n'
|
||||
|
||||
|
||||
def _routes(parsed: str) -> list[dict]: # type: ignore
|
||||
"""Parse a YAML routes string and pull out the routes list, so
|
||||
tests can assert on shape directly."""
|
||||
return parse_yaml_subset(parsed)["routes"] # type: ignore
|
||||
|
||||
|
||||
class TestValidateRoutesContent(unittest.TestCase):
|
||||
def test_accepts_minimal_route_table(self):
|
||||
validate_routes_content(_ROUTES_EMPTY)
|
||||
@@ -60,130 +52,5 @@ class TestValidateRoutesContent(unittest.TestCase):
|
||||
)
|
||||
|
||||
|
||||
class TestMergeSingleRoute(unittest.TestCase):
|
||||
BASE = _ROUTES_ONE
|
||||
|
||||
def test_appends_route_when_host_absent(self):
|
||||
merged = _merge_single_route(self.BASE, {"host": "github.com"})
|
||||
hosts = [r["host"] for r in _routes(merged)]
|
||||
self.assertEqual(["api.anthropic.com", "github.com"], hosts)
|
||||
|
||||
def test_appends_matches(self):
|
||||
merged = _merge_single_route(
|
||||
self.BASE,
|
||||
{"host": "github.com", "matches": [
|
||||
{"paths": [{"value": "/repos/x/"}]}
|
||||
]},
|
||||
)
|
||||
new_route = _routes(merged)[-1]
|
||||
self.assertIn("matches", new_route)
|
||||
|
||||
def test_appends_legacy_path_allowlist_as_matches(self):
|
||||
merged = _merge_single_route(
|
||||
self.BASE,
|
||||
{"host": "github.com", "path_allowlist": ["/repos/x/"]},
|
||||
)
|
||||
new_route = _routes(merged)[-1]
|
||||
self.assertIn("matches", new_route)
|
||||
|
||||
def test_appends_auth_with_token_env_slot(self):
|
||||
merged = _merge_single_route(
|
||||
self.BASE,
|
||||
{
|
||||
"host": "api.github.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "GH"},
|
||||
},
|
||||
)
|
||||
new_route = _routes(merged)[-1]
|
||||
self.assertEqual("Bearer", new_route["auth_scheme"])
|
||||
self.assertEqual("EGRESS_TOKEN_0", new_route["token_env"])
|
||||
|
||||
def test_auth_slot_increments_past_existing(self):
|
||||
base = (
|
||||
'routes:\n'
|
||||
' - host: "api.anthropic.com"\n'
|
||||
' auth_scheme: "Bearer"\n'
|
||||
' token_env: "EGRESS_TOKEN_0"\n'
|
||||
)
|
||||
merged = _merge_single_route(base, {
|
||||
"host": "api.github.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "GH"},
|
||||
})
|
||||
new_route = _routes(merged)[-1]
|
||||
self.assertEqual("EGRESS_TOKEN_1", new_route["token_env"])
|
||||
|
||||
def test_existing_host_merges_match_paths_as_union(self):
|
||||
base = (
|
||||
'routes:\n'
|
||||
' - host: "github.com"\n'
|
||||
' matches:\n'
|
||||
' - paths:\n'
|
||||
' - value: "/a/"\n'
|
||||
)
|
||||
merged = _merge_single_route(base, {
|
||||
"host": "github.com",
|
||||
"matches": [{"paths": [{"value": "/b/"}]}],
|
||||
})
|
||||
routes = _routes(merged)
|
||||
self.assertEqual(1, len(routes))
|
||||
all_paths: list[str] = []
|
||||
for me in routes[0].get("matches", []):
|
||||
for p in me.get("paths", []):
|
||||
all_paths.append(p["value"])
|
||||
self.assertIn("/a/", all_paths)
|
||||
self.assertIn("/b/", all_paths)
|
||||
|
||||
def test_existing_host_dedup_match_paths(self):
|
||||
base = (
|
||||
'routes:\n'
|
||||
' - host: "github.com"\n'
|
||||
' matches:\n'
|
||||
' - paths:\n'
|
||||
' - value: "/a/"\n'
|
||||
)
|
||||
merged = _merge_single_route(base, {
|
||||
"host": "github.com",
|
||||
"matches": [{"paths": [{"value": "/a/"}, {"value": "/b/"}]}],
|
||||
})
|
||||
all_paths: list[str] = []
|
||||
for me in _routes(merged)[0].get("matches", []):
|
||||
for p in me.get("paths", []):
|
||||
all_paths.append(p["value"])
|
||||
self.assertEqual(1, all_paths.count("/a/"))
|
||||
self.assertIn("/b/", all_paths)
|
||||
|
||||
def test_existing_host_preserves_existing_auth_ignores_proposed(self):
|
||||
base = (
|
||||
'routes:\n'
|
||||
' - host: "api.github.com"\n'
|
||||
' auth_scheme: "Bearer"\n'
|
||||
' token_env: "EGRESS_TOKEN_0"\n'
|
||||
)
|
||||
merged = _merge_single_route(base, {
|
||||
"host": "api.github.com",
|
||||
"auth": {"scheme": "token", "token_ref": "DIFFERENT"},
|
||||
})
|
||||
route = _routes(merged)[0]
|
||||
self.assertEqual("Bearer", route["auth_scheme"])
|
||||
self.assertEqual("EGRESS_TOKEN_0", route["token_env"])
|
||||
|
||||
def test_host_match_is_case_insensitive(self):
|
||||
base = 'routes:\n - host: "GitHub.com"\n'
|
||||
merged = _merge_single_route(base, {
|
||||
"host": "github.com",
|
||||
"matches": [{"paths": [{"value": "/x/"}]}],
|
||||
})
|
||||
routes = _routes(merged)
|
||||
self.assertEqual(1, len(routes))
|
||||
|
||||
def test_missing_host_raises(self):
|
||||
with self.assertRaises(EgressApplyError):
|
||||
_merge_single_route(self.BASE, {})
|
||||
|
||||
def test_invalid_current_yaml_raises(self):
|
||||
with self.assertRaises(EgressApplyError):
|
||||
_merge_single_route("routes:\n\tbad", {"host": "x.example"})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -17,7 +17,6 @@ from bot_bottle.supervise import (
|
||||
STATUS_MODIFIED,
|
||||
STATUS_REJECTED,
|
||||
TOOL_CAPABILITY_BLOCK,
|
||||
TOOL_EGRESS_BLOCK,
|
||||
archive_proposal,
|
||||
audit_log_path,
|
||||
list_pending_proposals,
|
||||
@@ -37,16 +36,16 @@ FIXED_TS = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
|
||||
|
||||
|
||||
def _proposal(
|
||||
tool: str = TOOL_EGRESS_BLOCK,
|
||||
proposed: str = "{}",
|
||||
justification: str = "need a route",
|
||||
tool: str = TOOL_CAPABILITY_BLOCK,
|
||||
proposed: str = "FROM python:3.13\n",
|
||||
justification: str = "need a capability",
|
||||
) -> Proposal:
|
||||
return Proposal.new(
|
||||
bottle_slug="dev",
|
||||
tool=tool,
|
||||
proposed_file=proposed,
|
||||
justification=justification,
|
||||
current_file_hash=sha256_hex("{}"),
|
||||
current_file_hash=sha256_hex(proposed),
|
||||
now=FIXED_TS,
|
||||
)
|
||||
|
||||
@@ -57,7 +56,7 @@ class TestProposalRoundtrip(unittest.TestCase):
|
||||
self.assertTrue(p.id)
|
||||
self.assertEqual("2026-05-25T12:00:00+00:00", p.arrival_timestamp)
|
||||
self.assertEqual("dev", p.bottle_slug)
|
||||
self.assertEqual(TOOL_EGRESS_BLOCK, p.tool)
|
||||
self.assertEqual(TOOL_CAPABILITY_BLOCK, p.tool)
|
||||
|
||||
def test_to_from_dict_roundtrip(self):
|
||||
p = _proposal()
|
||||
@@ -142,14 +141,14 @@ class TestQueueIO(unittest.TestCase):
|
||||
def test_list_pending_sorted_by_arrival(self):
|
||||
# Fabricate two with explicit timestamps.
|
||||
a = Proposal.new(
|
||||
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK,
|
||||
proposed_file="{}", justification="early",
|
||||
bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
|
||||
proposed_file="FROM python:3.13\n", justification="early",
|
||||
current_file_hash="x",
|
||||
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
|
||||
)
|
||||
b = Proposal.new(
|
||||
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK,
|
||||
proposed_file="{}", justification="late",
|
||||
bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
|
||||
proposed_file="FROM python:3.13\n", justification="late",
|
||||
current_file_hash="x",
|
||||
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
|
||||
)
|
||||
@@ -318,16 +317,15 @@ class TestToolConstants(unittest.TestCase):
|
||||
def test_tools_tuple_matches_individual_constants(self):
|
||||
self.assertEqual(
|
||||
(
|
||||
TOOL_EGRESS_BLOCK,
|
||||
TOOL_CAPABILITY_BLOCK,
|
||||
supervise.TOOL_LIST_EGRESS_ROUTES,
|
||||
),
|
||||
supervise.TOOLS,
|
||||
)
|
||||
|
||||
def test_component_map_covers_egress_remediation_only(self):
|
||||
self.assertIn(TOOL_EGRESS_BLOCK, supervise.COMPONENT_FOR_TOOL)
|
||||
self.assertNotIn(TOOL_CAPABILITY_BLOCK, supervise.COMPONENT_FOR_TOOL)
|
||||
def test_component_map_has_no_entries(self):
|
||||
# egress-block removed in issue #198; capability-block never had one.
|
||||
self.assertEqual({}, supervise.COMPONENT_FOR_TOOL)
|
||||
|
||||
|
||||
class _StubSupervise(supervise.Supervise):
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
"""Unit: supervise headless paths (PRD 0013 phase 4, PRD 0014).
|
||||
"""Unit: supervise headless paths (PRD 0013 phase 4, PRD 0016).
|
||||
|
||||
The curses TUI itself isn't exercised here — these tests cover the
|
||||
discovery + approve/reject + audit-write paths that the TUI's key
|
||||
handlers call into.
|
||||
discovery + approve/reject paths that the TUI's key handlers call into.
|
||||
|
||||
add_route is stubbed at the supervise CLI module level so the tests
|
||||
don't need a running egress sidecar; the real docker exec/cp/SIGHUP
|
||||
plumbing is covered by the integration test.
|
||||
egress-block (add_route) was removed in issue #198; the TestEgressApplyWiring
|
||||
class and all stubs for add_route have been dropped accordingly.
|
||||
"""
|
||||
|
||||
import os
|
||||
@@ -17,7 +15,6 @@ from pathlib import Path
|
||||
|
||||
from bot_bottle import supervise
|
||||
from bot_bottle.backend.docker.capability_apply import CapabilityApplyError
|
||||
from bot_bottle.backend.docker.egress_apply import EgressApplyError
|
||||
from bot_bottle.cli import supervise as supervise_cli
|
||||
from bot_bottle.supervise import (
|
||||
Proposal,
|
||||
@@ -25,7 +22,6 @@ from bot_bottle.supervise import (
|
||||
STATUS_MODIFIED,
|
||||
STATUS_REJECTED,
|
||||
TOOL_CAPABILITY_BLOCK,
|
||||
TOOL_EGRESS_BLOCK,
|
||||
read_audit_entries,
|
||||
read_response,
|
||||
sha256_hex,
|
||||
@@ -35,9 +31,8 @@ from bot_bottle.supervise import (
|
||||
FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
|
||||
|
||||
|
||||
def _proposal(slug: str = "dev", tool: str = TOOL_EGRESS_BLOCK) -> Proposal:
|
||||
def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
|
||||
payloads = {
|
||||
TOOL_EGRESS_BLOCK: '{"routes": []}\n',
|
||||
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
|
||||
}
|
||||
payload = payloads.get(tool, "")
|
||||
@@ -88,14 +83,14 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
|
||||
|
||||
def test_sorted_by_arrival_across_bottles(self):
|
||||
early = Proposal.new(
|
||||
bottle_slug="api", tool=TOOL_EGRESS_BLOCK,
|
||||
proposed_file="{}", justification="early",
|
||||
bottle_slug="api", tool=TOOL_CAPABILITY_BLOCK,
|
||||
proposed_file="FROM python:3.13\n", justification="early",
|
||||
current_file_hash="h",
|
||||
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
|
||||
)
|
||||
late = Proposal.new(
|
||||
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK,
|
||||
proposed_file="{}", justification="late",
|
||||
bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
|
||||
proposed_file="FROM python:3.13\n", justification="late",
|
||||
current_file_hash="h",
|
||||
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
|
||||
)
|
||||
@@ -120,48 +115,38 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
|
||||
class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._setup_fake_home()
|
||||
self._original_add_route = supervise_cli.add_route
|
||||
self._original_apply_capability = supervise_cli.apply_capability_change
|
||||
# Default stubs: succeed with deterministic before/after so the
|
||||
# audit log shows a non-empty diff.
|
||||
supervise_cli.add_route = lambda slug, content: ( # type: ignore
|
||||
'{"routes": []}\n', '{"routes": [{"host": "x"}]}\n',
|
||||
)
|
||||
supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore
|
||||
"FROM old\n", content,
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
supervise_cli.add_route = self._original_add_route
|
||||
supervise_cli.apply_capability_change = self._original_apply_capability
|
||||
self._teardown_fake_home()
|
||||
|
||||
def _enqueue(self, tool: str = TOOL_EGRESS_BLOCK):
|
||||
def _enqueue(self, tool: str = TOOL_CAPABILITY_BLOCK):
|
||||
p = _proposal(tool=tool)
|
||||
qdir = supervise.queue_dir_for_slug("dev")
|
||||
qdir.mkdir(parents=True, exist_ok=True)
|
||||
supervise.write_proposal(qdir, p)
|
||||
return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir)
|
||||
|
||||
def test_approve_writes_response_and_audit(self):
|
||||
def test_approve_writes_response(self):
|
||||
qp = self._enqueue()
|
||||
supervise_cli.approve(qp)
|
||||
resp = read_response(qp.queue_dir, qp.proposal.id)
|
||||
# capability-block is archived on approve, so the response file
|
||||
# moves to processed/ before the caller can read it.
|
||||
resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
|
||||
self.assertEqual(STATUS_APPROVED, resp.status)
|
||||
self.assertIsNone(resp.final_file)
|
||||
entries = read_audit_entries("egress", "dev")
|
||||
self.assertEqual(1, len(entries))
|
||||
self.assertEqual("approved", entries[0].operator_action)
|
||||
|
||||
def test_approve_with_final_file_marks_modified(self):
|
||||
qp = self._enqueue()
|
||||
supervise_cli.approve(qp, final_file='{"routes": [{"path": "/x/"}]}\n', notes="tweaked")
|
||||
resp = read_response(qp.queue_dir, qp.proposal.id)
|
||||
supervise_cli.approve(qp, final_file="FROM bookworm\n", notes="tweaked")
|
||||
resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
|
||||
self.assertEqual(STATUS_MODIFIED, resp.status)
|
||||
self.assertEqual('{"routes": [{"path": "/x/"}]}\n', resp.final_file)
|
||||
self.assertEqual("FROM bookworm\n", resp.final_file)
|
||||
self.assertEqual("tweaked", resp.notes)
|
||||
entries = read_audit_entries("egress", "dev")
|
||||
self.assertEqual("modified", entries[0].operator_action)
|
||||
|
||||
def test_reject_writes_rejection(self):
|
||||
qp = self._enqueue()
|
||||
@@ -169,113 +154,13 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
|
||||
resp = read_response(qp.queue_dir, qp.proposal.id)
|
||||
self.assertEqual(STATUS_REJECTED, resp.status)
|
||||
self.assertEqual("nope", resp.notes)
|
||||
entries = read_audit_entries("egress", "dev")
|
||||
self.assertEqual("rejected", entries[0].operator_action)
|
||||
self.assertEqual("nope", entries[0].operator_notes)
|
||||
|
||||
def test_capability_block_skips_audit_log(self):
|
||||
def test_no_audit_log_for_capability_block(self):
|
||||
qp = self._enqueue(tool=TOOL_CAPABILITY_BLOCK)
|
||||
supervise_cli.approve(qp)
|
||||
# No audit log for capability-block (per PRD 0013 / 0016).
|
||||
self.assertEqual([], read_audit_entries("egress", "dev"))
|
||||
|
||||
|
||||
class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
"""PRD 0017 chunk 3: approve() on an egress-block proposal
|
||||
must call add_route (single-route merge) with the right args
|
||||
and surface its failures."""
|
||||
|
||||
def setUp(self):
|
||||
self._setup_fake_home()
|
||||
self._original_add_route = supervise_cli.add_route
|
||||
|
||||
def tearDown(self):
|
||||
supervise_cli.add_route = self._original_add_route
|
||||
self._teardown_fake_home()
|
||||
|
||||
def _enqueue_egress(self, proposed: str = '{"host": "x.example"}\n'):
|
||||
p = Proposal.new(
|
||||
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK,
|
||||
proposed_file=proposed,
|
||||
justification="need a route",
|
||||
current_file_hash=sha256_hex(proposed),
|
||||
now=FIXED,
|
||||
)
|
||||
qdir = supervise.queue_dir_for_slug("dev")
|
||||
qdir.mkdir(parents=True, exist_ok=True)
|
||||
supervise.write_proposal(qdir, p)
|
||||
return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir)
|
||||
|
||||
def test_egress_block_calls_add_route_with_proposed_json(self):
|
||||
calls = []
|
||||
supervise_cli.add_route = lambda slug, content: ( # type: ignore
|
||||
calls.append((slug, content)) or ("before", "after")
|
||||
)
|
||||
qp = self._enqueue_egress(
|
||||
proposed='{"host": "new.example", "path_allowlist": ["/x/"]}\n'
|
||||
)
|
||||
supervise_cli.approve(qp)
|
||||
self.assertEqual(1, len(calls))
|
||||
slug, content = calls[0]
|
||||
self.assertEqual("dev", slug)
|
||||
# The single-route JSON the agent proposed reaches add_route
|
||||
# unchanged — add_route fetches current state + merges.
|
||||
self.assertEqual(
|
||||
'{"host": "new.example", "path_allowlist": ["/x/"]}\n',
|
||||
content,
|
||||
)
|
||||
|
||||
def test_modify_passes_final_file_to_add_route(self):
|
||||
calls = []
|
||||
supervise_cli.add_route = lambda slug, content: ( # type: ignore
|
||||
calls.append(content) or ("before", "after")
|
||||
)
|
||||
qp = self._enqueue_egress()
|
||||
supervise_cli.approve(
|
||||
qp,
|
||||
final_file='{"host": "edited.example"}\n',
|
||||
notes="tweaked",
|
||||
)
|
||||
self.assertEqual(['{"host": "edited.example"}\n'], calls)
|
||||
|
||||
def test_apply_failure_blocks_response_and_audit(self):
|
||||
supervise_cli.add_route = lambda slug, content: (_ for _ in ()).throw( # type: ignore
|
||||
EgressApplyError("docker exec failed")
|
||||
)
|
||||
qp = self._enqueue_egress()
|
||||
with self.assertRaises(EgressApplyError):
|
||||
supervise_cli.approve(qp)
|
||||
# No response file (proposal stays pending).
|
||||
self.assertEqual(
|
||||
[qp.proposal.id],
|
||||
[p.id for p in supervise.list_pending_proposals(qp.queue_dir)],
|
||||
)
|
||||
# No audit entry.
|
||||
self.assertEqual([], read_audit_entries("egress", "dev"))
|
||||
|
||||
def test_real_diff_lands_in_audit(self):
|
||||
supervise_cli.add_route = lambda slug, content: ( # type: ignore
|
||||
'{"routes": []}\n', # before
|
||||
'{"routes": [{"host": "new.example"}]}\n', # after
|
||||
)
|
||||
qp = self._enqueue_egress(proposed='{"host": "new.example"}\n')
|
||||
supervise_cli.approve(qp)
|
||||
entries = read_audit_entries("egress", "dev")
|
||||
self.assertEqual(1, len(entries))
|
||||
self.assertIn('+{"routes": [{"host": "new.example"}]}', entries[0].diff)
|
||||
self.assertIn('-{"routes": []}', entries[0].diff)
|
||||
|
||||
def test_reject_does_not_call_apply(self):
|
||||
qp = self._enqueue_egress()
|
||||
supervise_cli.reject(qp, reason="no thanks")
|
||||
# Reject still writes a response + audit entry with empty diff.
|
||||
resp = read_response(qp.queue_dir, qp.proposal.id)
|
||||
self.assertEqual(STATUS_REJECTED, resp.status)
|
||||
entries = read_audit_entries("egress", "dev")
|
||||
self.assertEqual(1, len(entries))
|
||||
self.assertEqual("", entries[0].diff)
|
||||
|
||||
|
||||
class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
"""PRD 0016 Phase 3: approve() on a capability-block proposal
|
||||
calls apply_capability_change, archives the proposal afterward
|
||||
@@ -328,17 +213,12 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore
|
||||
qp = self._enqueue_capability()
|
||||
supervise_cli.approve(qp)
|
||||
# capability-block has no audit log per PRD 0013 — its record
|
||||
# lives in the per-bottle Dockerfile + transcript state.
|
||||
self.assertEqual([], read_audit_entries("egress", "dev"))
|
||||
|
||||
def test_proposal_archived_after_apply(self):
|
||||
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore
|
||||
qp = self._enqueue_capability()
|
||||
supervise_cli.approve(qp)
|
||||
# Sidecar would normally archive after delivering the response,
|
||||
# but it's gone by then. The supervise TUI archives so
|
||||
# discover_pending stops surfacing the resolved proposal.
|
||||
self.assertEqual([], supervise.list_pending_proposals(qp.queue_dir))
|
||||
processed = list((qp.queue_dir / "processed").glob("*.json"))
|
||||
self.assertEqual(2, len(processed))
|
||||
@@ -346,20 +226,8 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
|
||||
class TestEditInEditor(unittest.TestCase):
|
||||
def test_runs_editor_returns_edited_content(self):
|
||||
# Fake "editor" is /bin/sh -c 'cat <<EOF > $1 ... EOF'
|
||||
original_editor = os.environ.get("EDITOR")
|
||||
try:
|
||||
# Use a fake editor that overwrites the file with a known
|
||||
# marker. EDITOR is split with shlex equivalence by
|
||||
# subprocess.run when invoked as a list — keep it as a
|
||||
# single program path that takes the file as argv[1].
|
||||
os.environ["EDITOR"] = (
|
||||
"/bin/sh -c 'printf %s \"edited\" > \"$0\"'"
|
||||
)
|
||||
# subprocess.run with the str as the first list element
|
||||
# would try to find a binary literally named "/bin/sh -c ..."
|
||||
# — that won't work. Use shell mode trick: wrap in a script.
|
||||
# Easier: build a tiny helper script.
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".sh", delete=False, prefix="fake-editor.",
|
||||
) as script:
|
||||
@@ -381,7 +249,6 @@ class TestEditInEditor(unittest.TestCase):
|
||||
def test_returns_none_when_unchanged(self):
|
||||
original_editor = os.environ.get("EDITOR")
|
||||
try:
|
||||
# No-op editor: touch the file (leaves it unchanged).
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".sh", delete=False, prefix="noop-editor.",
|
||||
) as script:
|
||||
@@ -445,7 +312,6 @@ class TestCapabilityBlockSmolmachinesGuard(_FakeHomeMixin, unittest.TestCase):
|
||||
supervise_cli.approve(qp) # must not raise
|
||||
|
||||
def test_no_metadata_falls_through_to_docker_path(self):
|
||||
# No metadata at all → assume Docker (backward-compatible).
|
||||
qp = self._enqueue_capability("dev")
|
||||
supervise_cli.approve(qp) # must not raise
|
||||
|
||||
|
||||
@@ -141,7 +141,6 @@ class TestHandleToolsList(unittest.TestCase):
|
||||
names = [t["name"] for t in result["tools"]] # type: ignore[index]
|
||||
self.assertEqual(
|
||||
sorted([
|
||||
_sv.TOOL_EGRESS_BLOCK,
|
||||
_sv.TOOL_CAPABILITY_BLOCK,
|
||||
_sv.TOOL_LIST_EGRESS_ROUTES,
|
||||
]),
|
||||
@@ -206,10 +205,10 @@ class TestHandleToolsCall(unittest.TestCase):
|
||||
try:
|
||||
result = handle_tools_call(
|
||||
{
|
||||
"name": _sv.TOOL_EGRESS_BLOCK,
|
||||
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||
"arguments": {
|
||||
"host": "example.com",
|
||||
"justification": "need a route",
|
||||
"dockerfile": "FROM python:3.13\n",
|
||||
"justification": "need git",
|
||||
},
|
||||
},
|
||||
self.config,
|
||||
@@ -250,8 +249,8 @@ class TestHandleToolsCall(unittest.TestCase):
|
||||
with self.assertRaises(_RpcError):
|
||||
handle_tools_call(
|
||||
{
|
||||
"name": _sv.TOOL_EGRESS_BLOCK,
|
||||
"arguments": {"host": "example.com"},
|
||||
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||
"arguments": {"dockerfile": "FROM python:3.13\n"},
|
||||
},
|
||||
self.config,
|
||||
)
|
||||
@@ -261,9 +260,9 @@ class TestHandleToolsCall(unittest.TestCase):
|
||||
try:
|
||||
handle_tools_call(
|
||||
{
|
||||
"name": _sv.TOOL_EGRESS_BLOCK,
|
||||
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||
"arguments": {
|
||||
"host": "example.com",
|
||||
"dockerfile": "FROM python:3.13\n",
|
||||
"justification": "x",
|
||||
},
|
||||
},
|
||||
@@ -285,10 +284,10 @@ class TestHandleToolsCall(unittest.TestCase):
|
||||
)
|
||||
result = handle_tools_call(
|
||||
{
|
||||
"name": _sv.TOOL_EGRESS_BLOCK,
|
||||
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||
"arguments": {
|
||||
"host": "example.com",
|
||||
"justification": "need a route",
|
||||
"dockerfile": "FROM python:3.13\n",
|
||||
"justification": "need a capability",
|
||||
},
|
||||
},
|
||||
config,
|
||||
@@ -412,7 +411,8 @@ class TestHttpEndToEnd(unittest.TestCase):
|
||||
self.assertEqual("2.0", result["jsonrpc"])
|
||||
self.assertEqual(1, result["id"])
|
||||
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
|
||||
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
|
||||
self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
|
||||
self.assertNotIn("egress-block", names)
|
||||
|
||||
def test_unknown_method_returns_jsonrpc_error(self):
|
||||
result = self._post_jsonrpc(
|
||||
|
||||
Reference in New Issue
Block a user