Fix Codex supervise MCP registration #248

Merged
didericis merged 12 commits from fix-codex-mcp-supervise-transport into main 2026-06-23 16:42:20 -04:00
21 changed files with 502 additions and 99 deletions
+1 -1
View File
@@ -134,7 +134,7 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
ep = plan.egress_plan
volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER))
if ep.routes:
volumes.append(_bind(ep.routes_path, EGRESS_ROUTES_IN_CONTAINER))
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
+28 -17
View File
@@ -1,24 +1,21 @@
"""Host-side helper for egress sidecar inspection (issue #198).
"""Host-side helper for egress sidecar inspection and live updates.
`_merge_single_route`, `add_route`, and `apply_routes_change` were
removed when the egress-block MCP tool was dropped. The remaining
helpers support runtime inspection and validation of the routes file
without modifying it at runtime.
The approve path uses this module to validate a proposed routes file,
write it to the bottle's live egress state dir, and signal the sidecar
bundle so the mitmproxy addon reloads it.
"""
from __future__ import annotations
import os
import subprocess
from ...egress import EGRESS_ROUTES_IN_CONTAINER
from ...egress_addon_core import load_routes
from ...log import warn
from ..egress_apply import EgressApplicator, EgressApplyError
from .sidecar_bundle import sidecar_bundle_container_name
class EgressApplyError(RuntimeError):
pass
def fetch_current_routes(slug: str) -> str:
container = sidecar_bundle_container_name(slug)
r = subprocess.run(
@@ -33,17 +30,31 @@ def fetch_current_routes(slug: str) -> str:
return r.stdout
def validate_routes_content(content: str) -> None:
try:
load_routes(content)
except ValueError as e:
class DockerEgressApplicator(EgressApplicator):
def _signal_bundle_reload(self, slug: str) -> None:
container = sidecar_bundle_container_name(slug)
result = subprocess.run(
["docker", "kill", "--signal", "HUP", container],
capture_output=True, text=True, check=False, env=os.environ,
)
if result.returncode != 0:
last_error = (result.stderr or "").strip() or (result.stdout or "").strip()
warn(
f"egress: routes updated on disk for {slug}, but bundle reload failed: "
f"{last_error or 'docker kill failed'}"
)
raise EgressApplyError(
f"proposed routes.yaml is not valid: {e}"
) from e
f"could not reload egress bundle {container}: "
f"{last_error or 'docker kill failed'}"
)
applicator = DockerEgressApplicator()
__all__ = [
"DockerEgressApplicator",
"EgressApplyError",
"applicator",
"fetch_current_routes",
"validate_routes_content",
]
+50
View File
@@ -0,0 +1,50 @@
"""Shared base class for host-side egress apply across backends.
Each backend subclasses EgressApplicator and overrides _signal_bundle_reload
with the backend-specific kill command.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
from ..bottle_state import egress_state_dir
from ..egress import EGRESS_ROUTES_FILENAME
from ..egress_addon_core import load_routes
class EgressApplyError(RuntimeError):
pass
class EgressApplicator(ABC):
def apply_routes_change(self, slug: str, content: str) -> tuple[str, str]:
"""Persist `content` to the live routes file and reload egress."""
self.validate_routes_content(content)
routes_path = self._routes_path(slug)
routes_path.parent.mkdir(parents=True, exist_ok=True)
before = routes_path.read_text(encoding="utf-8") if routes_path.exists() else ""
routes_path.write_text(content, encoding="utf-8")
routes_path.chmod(0o600)
self._signal_bundle_reload(slug)
return before, content
@staticmethod
def validate_routes_content(content: str) -> None:
try:
load_routes(content)
except ValueError as e:
raise EgressApplyError(
f"proposed routes.yaml is not valid: {e}"
) from e
@staticmethod
def _routes_path(slug: str) -> Path:
return egress_state_dir(slug) / EGRESS_ROUTES_FILENAME
@abstractmethod
def _signal_bundle_reload(self, slug: str) -> None: ...
__all__ = ["EgressApplicator", "EgressApplyError"]
@@ -0,0 +1,39 @@
"""Host-side egress apply for the macos-container backend.
Uses `container kill --signal HUP` (Apple Container framework) instead
of `docker kill` to signal the sidecar bundle.
"""
from __future__ import annotations
import os
import subprocess
from ...log import warn
from ..egress_apply import EgressApplicator, EgressApplyError
from .launch import sidecar_container_name
class MacOSContainerEgressApplicator(EgressApplicator):
def _signal_bundle_reload(self, slug: str) -> None:
container = sidecar_container_name(slug)
result = subprocess.run(
didericis marked this conversation as resolved Outdated
Outdated
Review

This belongs in a base class shared between backends, named something like "EgressApplicator", and calls an internal/overridden _signal_bundle_reload function that differs by backend (seems to be the only difference between backends)

This belongs in a base class shared between backends, named something like "EgressApplicator", and calls an internal/overridden `_signal_bundle_reload` function that differs by backend (seems to be the only difference between backends)
["container", "kill", "--signal", "HUP", container],
capture_output=True, text=True, check=False, env=os.environ,
)
if result.returncode != 0:
last_error = (result.stderr or "").strip() or (result.stdout or "").strip()
warn(
f"egress: routes updated on disk for {slug}, but bundle reload failed: "
f"{last_error or 'container kill failed'}"
)
raise EgressApplyError(
f"could not reload egress bundle {container}: "
f"{last_error or 'container kill failed'}"
)
applicator = MacOSContainerEgressApplicator()
didericis marked this conversation as resolved Outdated
Outdated
Review

export the applicator from each of the backends as a singleton instead of exporting a apply_routes_change function wrapper

export the applicator from each of the backends as a singleton instead of exporting a `apply_routes_change` function wrapper
__all__ = ["MacOSContainerEgressApplicator", "EgressApplyError", "applicator"]
+1 -13
View File
@@ -12,7 +12,6 @@ from __future__ import annotations
import dataclasses
import os
import shutil
import subprocess
from contextlib import ExitStack, contextmanager
from pathlib import Path
@@ -364,7 +363,7 @@ def _sidecar_mounts(
))
if ep.routes:
mounts.append((
str(_stage_routes_dir(plan)),
str(ep.routes_path.parent),
str(Path(EGRESS_ROUTES_IN_CONTAINER).parent),
True,
))
@@ -375,17 +374,6 @@ def _sidecar_mounts(
return tuple(mounts)
def _stage_routes_dir(plan: MacosContainerBottlePlan) -> Path:
routes_dir = plan.stage_dir / "macos-container-egress"
routes_dir.mkdir(parents=True, exist_ok=True)
shutil.copyfile(
plan.egress_plan.routes_path,
routes_dir / Path(EGRESS_ROUTES_IN_CONTAINER).name,
)
return routes_dir
def _mount_spec(host_path: str, container_path: str, read_only: bool) -> str:
spec = f"type=bind,source={host_path},target={container_path}"
if read_only:
@@ -0,0 +1,21 @@
"""Egress apply for the smolmachines backend.
The smolmachines sidecar bundle runs as a host-side Docker container,
so egress signalling is identical to the docker backend.
"""
from __future__ import annotations
from ..docker.egress_apply import ( # noqa: F401
DockerEgressApplicator,
EgressApplyError,
applicator,
fetch_current_routes,
)
__all__ = [
"DockerEgressApplicator",
"EgressApplyError",
"applicator",
"fetch_current_routes",
]
+6 -2
View File
@@ -217,11 +217,15 @@ def _discover_urls(
agent_supervise_url = f"http://{loopback_ip}:{supervise_host_port}/"
existing_no_proxy = plan.guest_env.get("NO_PROXY", "localhost,127.0.0.1")
no_proxy = f"{existing_no_proxy},{loopback_ip}"
guest_env = {
**plan.guest_env,
"HTTPS_PROXY": agent_proxy_url,
"HTTP_PROXY": agent_proxy_url,
"NO_PROXY": f"{existing_no_proxy},{loopback_ip}",
"https_proxy": agent_proxy_url,
"http_proxy": agent_proxy_url,
"NO_PROXY": no_proxy,
"no_proxy": no_proxy,
}
if agent_git_gate_host:
guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}"
@@ -308,7 +312,7 @@ def _bundle_launch_spec(
ep = plan.egress_plan
volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True))
if ep.routes:
volumes.append((str(ep.routes_path), EGRESS_ROUTES_IN_CONTAINER, True))
volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True))
# Bare-name entries for upstream-token slots. Their values
# come from the docker-run subprocess env (inherited from
# the operator's shell), never landing on argv.
+34 -3
View File
@@ -3,7 +3,8 @@ act on them (approve / modify / reject).
Curses-based TUI; modify-then-approve shells out to $EDITOR. The
approval handler wires to PRD 0016 (capability-block), which rebuilds
the bottle Dockerfile. The egress-block tool was removed in issue #198.
the bottle Dockerfile. Egress proposals are queued for operator review
as full routes.yaml updates.
"""
from __future__ import annotations
@@ -20,11 +21,21 @@ from datetime import datetime, timezone
from pathlib import Path
from .. import supervise as _supervise
# from ..bottle_state import read_metadata
from ..bottle_state import read_metadata
# from ..backend.docker.capability_apply import (
# CapabilityApplyError,
# apply_capability_change,
# )
from ..backend.docker.egress_apply import (
EgressApplyError,
applicator as _docker_applicator,
)
from ..backend.macos_container.egress_apply import (
applicator as _macos_applicator,
)
from ..backend.smolmachines.egress_apply import (
applicator as _smolmachines_applicator,
)
from ..log import Die, error, info
@@ -40,6 +51,8 @@ from ..supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_ALLOW,
TOOL_EGRESS_BLOCK,
archive_proposal,
list_pending_proposals,
render_diff,
@@ -63,7 +76,17 @@ class QueuedProposal:
# Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses.
ApplyError = (CapabilityApplyError,)
ApplyError = (CapabilityApplyError, EgressApplyError)
def apply_routes_change(slug: str, content: str) -> tuple[str, str]:
meta = read_metadata(slug)
backend = meta.backend if meta is not None else ""
if backend == "macos-container":
return _macos_applicator.apply_routes_change(slug, content)
if backend == "smolmachines":
return _smolmachines_applicator.apply_routes_change(slug, content)
return _docker_applicator.apply_routes_change(slug, content)
def discover_pending() -> list[QueuedProposal]:
@@ -115,6 +138,8 @@ def _detail_lines(
def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
if tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK):
return ".yaml"
return ".txt"
@@ -129,6 +154,7 @@ def approve(
) -> None:
"""Apply the proposal, write the waiting response, and audit it."""
status = STATUS_MODIFIED if final_file is not None else STATUS_APPROVED
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
diff_before, diff_after = "", ""
# if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
@@ -142,6 +168,11 @@ def approve(
# diff_before, diff_after = apply_capability_change(
# qp.proposal.bottle_slug, file_to_apply,
# )
if qp.proposal.tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK):
diff_before, diff_after = apply_routes_change(
qp.proposal.bottle_slug,
file_to_apply,
)
response = Response(
proposal_id=qp.proposal.id,
+3 -3
View File
@@ -261,8 +261,8 @@ class CodexAgentProvider(AgentProvider):
return
info(f"registering supervise MCP server in agent codex config → {supervise_url}")
r = bottle.exec(
f"codex mcp add --transport http "
f"{_SUPERVISE_MCP_NAME} {supervise_url}",
f"codex mcp add {_SUPERVISE_MCP_NAME} --url "
f"{shlex.quote(supervise_url)}",
user="node",
)
if r.returncode != 0:
@@ -270,7 +270,7 @@ class CodexAgentProvider(AgentProvider):
f"`codex mcp add supervise` failed (exit {r.returncode}): "
f"{(r.stderr or r.stdout or '').strip()}. Inside the bottle, "
f"register manually with: "
f"codex mcp add --transport http supervise {supervise_url}"
f"codex mcp add supervise --url {shlex.quote(supervise_url)}"
)
+3 -1
View File
@@ -31,6 +31,7 @@ CODEX_HOST_CREDENTIAL_TOKEN_REF = "BOT_BOTTLE_CODEX_HOST_ACCESS_TOKEN"
EGRESS_HOSTNAME = "egress"
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name
@dataclass(frozen=True)
@@ -295,7 +296,7 @@ class Egress(ABC):
) -> EgressPlan:
routes = egress_routes_for_bottle(bottle, provider_routes)
log = bottle.egress.Log
routes_path = stage_dir / "egress_routes.yaml"
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
routes_path.write_text(egress_render_routes(routes, log=log))
routes_path.chmod(0o600)
return EgressPlan(
@@ -309,6 +310,7 @@ class Egress(ABC):
__all__ = [
"CODEX_HOST_CREDENTIAL_TOKEN_REF",
"EGRESS_HOSTNAME",
"EGRESS_ROUTES_FILENAME",
"EGRESS_ROUTES_IN_CONTAINER",
"Egress",
"EgressPlan",
+2 -2
View File
@@ -5,7 +5,6 @@ egress container."""
from __future__ import annotations
import dataclasses
import json
import os
import signal
@@ -27,6 +26,7 @@ from egress_addon_core import ( # type: ignore[import-not-found] # pylint: dis
load_config,
match_route,
outbound_scan_headers,
route_to_yaml_dict,
scan_inbound,
scan_outbound,
)
@@ -82,7 +82,7 @@ class EgressAddon:
def _serve_introspection(self, flow: http.HTTPFlow, path: str) -> None:
if path == "/allowlist":
payload = json.dumps(
{"routes": [dataclasses.asdict(r) for r in self.config.routes]},
{"routes": [route_to_yaml_dict(r) for r in self.config.routes]},
indent=2,
).encode("utf-8")
flow.response = http.Response.make(
+51
View File
@@ -359,6 +359,56 @@ def _parse_one(idx: int, raw: object) -> Route:
)
def _path_match_to_dict(pm: PathMatch) -> dict[str, object]:
d: dict[str, object] = {"value": pm.value}
if pm.type != "prefix":
d["type"] = pm.type
return d
def _header_match_to_dict(hm: HeaderMatch) -> dict[str, object]:
d: dict[str, object] = {"name": hm.name, "value": hm.value}
if hm.type != "exact":
d["type"] = hm.type
return d
def _match_entry_to_dict(me: MatchEntry) -> dict[str, object]:
d: dict[str, object] = {}
if me.paths:
d["paths"] = [_path_match_to_dict(p) for p in me.paths]
if me.methods:
d["methods"] = list(me.methods)
if me.headers:
d["headers"] = [_header_match_to_dict(h) for h in me.headers]
return d
def route_to_yaml_dict(r: Route) -> dict[str, object]:
"""Serialize a Route to YAML-schema-compatible dict.
Uses the same field names the YAML parser accepts, so the output
can be round-tripped directly into an `allow` or `egress-block`
proposal without translation. Fields that are empty/default are
omitted so the agent doesn't copy irrelevant keys."""
d: dict[str, object] = {"host": r.host}
if r.auth_scheme:
d["auth_scheme"] = r.auth_scheme
d["token_env"] = r.token_env
if r.matches:
d["matches"] = [_match_entry_to_dict(m) for m in r.matches]
if r.git_fetch:
d["git"] = {"fetch": True}
dlp: dict[str, object] = {}
if r.outbound_detectors is not None:
dlp["outbound_detectors"] = list(r.outbound_detectors)
if r.inbound_detectors is not None:
dlp["inbound_detectors"] = list(r.inbound_detectors)
if dlp:
d["dlp"] = dlp
return d
def load_routes(text: str) -> tuple[Route, ...]:
"""Parse YAML text → routes."""
try:
@@ -698,6 +748,7 @@ def scan_inbound(
__all__ = [
"LOG_BLOCKS",
"route_to_yaml_dict",
"LOG_FULL",
"LOG_OFF",
"Config",
+19 -12
View File
@@ -5,7 +5,7 @@ queue/audit support. The sidecar (bot_bottle.supervise_server)
sits on the bottle's internal network and exposes three MCP tools the
agent calls when it hits a stuck-recovery category:
* egress-block agent proposes a new routes.yaml
* egress-block / allow agent proposes a new routes.yaml
* capability-block agent proposes a new agent Dockerfile
Each tool call: the agent passes the full proposed file plus a
@@ -49,27 +49,34 @@ SUPERVISE_HOSTNAME = "supervise"
SUPERVISE_PORT = 9100
TOOL_CAPABILITY_BLOCK = "capability-block"
TOOL_EGRESS_BLOCK = "egress-block"
TOOL_ALLOW = "allow"
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
TOOLS: tuple[str, ...] = (
TOOL_ALLOW,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
TOOL_LIST_EGRESS_ROUTES,
)
# The supervise sidecar uses these to query egress's
# introspection endpoint for the `list-egress-routes` MCP
# tool. The hostname + port match egress's docker network
# alias + listen port (see bot_bottle.egress.EGRESS_HOSTNAME
# and backend.docker.egress.EGRESS_PORT — the values
# are inlined here so the in-container supervise_server doesn't
# need to import the egress package).
EGRESS_FORWARD_PROXY = "http://egress:9099"
# listen port (see backend.docker.egress.EGRESS_PORT). The supervise
# daemon runs inside the sidecar bundle alongside egress, so loopback
# is the stable address across docker, smolmachines, and Apple
# Container backends.
EGRESS_FORWARD_PROXY = "http://127.0.0.1:9099"
EGRESS_INTROSPECT_URL = "http://_egress.local/allowlist"
# capability-block has no on-disk config the operator edits in place
# (the Dockerfile is rebuilt, not patched), so it has no audit log
# here — those changes are captured by git history + the rebuild
# record laid down in PRD 0016. egress-block was removed in issue #198.
COMPONENT_FOR_TOOL: dict[str, str] = {}
# here — those changes are captured by git history + the rebuild record
# laid down in PRD 0016.
COMPONENT_FOR_TOOL: dict[str, str] = {
TOOL_ALLOW: "egress",
TOOL_EGRESS_BLOCK: "egress",
}
STATUS_APPROVED = "approved"
STATUS_MODIFIED = "modified"
@@ -431,9 +438,9 @@ def sha256_hex(content: str) -> str:
# Dockerfile and propose modifications.
#
# routes.yaml + allowlist used to live here too; PRD 0017 chunk 3
# moved them behind the `list-egress-routes` MCP tool (live
# state from egress's introspection endpoint) so the agent
# always sees current data rather than a launch-time snapshot.
# moved them behind the `list-egress-routes` MCP tool (live state
# from egress's introspection endpoint) so the agent always sees
# current data rather than a launch-time snapshot.
CURRENT_CONFIG_DOCKERFILE = "Dockerfile"
+107 -9
View File
@@ -1,8 +1,8 @@
"""Supervise sidecar HTTP server (PRD 0013).
Per-bottle MCP server exposing tools the agent calls to propose config
changes when stuck. The egress-block tool was removed in issue #198;
the remaining tools are `capability-block` and `list-egress-routes`.
changes when stuck. The tools are `allow`, `egress-block`,
`capability-block`, and `list-egress-routes`.
Each queued tool call:
@@ -44,9 +44,15 @@ import urllib.request
from dataclasses import dataclass
from pathlib import Path
# Same-directory import inside the bundle container; `supervise.py`
# is COPYed alongside this file by Dockerfile.sidecars.
try:
# Same-directory imports inside the bundle container; these files are
# COPYed flat under /app by Dockerfile.sidecars.
from egress_addon_core import load_routes
import supervise as _sv
except ModuleNotFoundError:
# Package imports for host-side tests and tooling.
from .egress_addon_core import load_routes
from . import supervise as _sv
# --- JSON-RPC / MCP plumbing ----------------------------------------------
@@ -142,8 +148,9 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"allowlist. Returns JSON with one entry per allowed host, "
"each carrying its matches rules (if any) and whether "
"the proxy injects Authorization for the route. Use this "
"before composing an `egress-block` proposal so the new "
"routes file extends the live one rather than replacing it."
"before composing an `allow` or `egress-block` proposal so "
"the new routes file extends the live one rather than "
"replacing it."
),
"inputSchema": {
"type": "object",
@@ -151,6 +158,88 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"additionalProperties": False,
},
},
{
"name": _sv.TOOL_ALLOW,
"description": (
"Request operator approval to change the bottle's egress "
"allowlist. Pass the full proposed routes.yaml content, not "
"just the new host, plus a justification. Use "
"`list-egress-routes` first so the proposal preserves existing "
"routes."
),
"inputSchema": {
"type": "object",
"properties": {
"routes_yaml": {
"type": "string",
"description": (
"Full proposed /etc/egress/routes.yaml content. "
"Each route entry accepts these keys:\n"
" host: <hostname> (required)\n"
" auth_scheme: Bearer|token (must pair with token_env)\n"
" token_env: <ENV_VAR_NAME> (must pair with auth_scheme)\n"
" matches: (optional list of match entries)\n"
" - paths: [{type: prefix|exact|regex, value: /...}]\n"
" methods: [GET, POST, ...]\n"
" headers: [{name: X-Hdr, value: val, type: exact|regex}]\n"
" git: (optional; omit to block git clone/fetch)\n"
" fetch: true\n"
" dlp: (optional DLP scanner overrides)\n"
" outbound_detectors: [token_patterns, known_secrets]\n"
" inbound_detectors: [naive_injection_detection]\n"
"Omit any key that should use its default. "
"`list-egress-routes` returns routes in this same format."
),
},
"justification": {
"type": "string",
"description": "Why this egress route is needed.",
},
},
"required": ["routes_yaml", "justification"],
},
},
{
"name": _sv.TOOL_EGRESS_BLOCK,
"description": (
"Request operator approval to change the bottle's egress "
"allowlist after a blocked outbound request. Pass the full "
"proposed routes.yaml content plus a justification. Use "
"`list-egress-routes` first so the proposal preserves existing "
"routes."
),
"inputSchema": {
"type": "object",
"properties": {
"routes_yaml": {
"type": "string",
"description": (
"Full proposed /etc/egress/routes.yaml content. "
"Each route entry accepts these keys:\n"
" host: <hostname> (required)\n"
" auth_scheme: Bearer|token (must pair with token_env)\n"
" token_env: <ENV_VAR_NAME> (must pair with auth_scheme)\n"
" matches: (optional list of match entries)\n"
" - paths: [{type: prefix|exact|regex, value: /...}]\n"
" methods: [GET, POST, ...]\n"
" headers: [{name: X-Hdr, value: val, type: exact|regex}]\n"
" git: (optional; omit to block git clone/fetch)\n"
" fetch: true\n"
" dlp: (optional DLP scanner overrides)\n"
" outbound_detectors: [token_patterns, known_secrets]\n"
" inbound_detectors: [naive_injection_detection]\n"
"Omit any key that should use its default. "
"`list-egress-routes` returns routes in this same format."
),
},
"justification": {
"type": "string",
"description": "Why this egress route is needed.",
},
},
"required": ["routes_yaml", "justification"],
},
},
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"description": (
@@ -182,11 +271,12 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
]
# Map each non-egress tool to the input field that carries the agent's
# payload (stored in Proposal.proposed_file). egress-block builds its
# payload from structured input fields in `handle_egress_block`.
# Map each proposal tool to the input field that carries the agent's
# payload (stored in Proposal.proposed_file).
PROPOSED_FILE_FIELD: dict[str, str] = {
_sv.TOOL_ALLOW: "routes_yaml",
_sv.TOOL_CAPABILITY_BLOCK: "dockerfile",
_sv.TOOL_EGRESS_BLOCK: "routes_yaml",
}
@@ -203,6 +293,14 @@ def validate_proposed_file(tool: str, content: str) -> None:
# Dockerfiles are too varied to validate syntactically beyond
# non-empty. The operator reads the diff in the TUI.
pass
elif tool in (_sv.TOOL_ALLOW, _sv.TOOL_EGRESS_BLOCK):
try:
load_routes(content)
except ValueError as e:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml is not valid: {e}",
) from e
else:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
+1 -1
View File
@@ -392,7 +392,7 @@ class TestSidecarBundleShape(unittest.TestCase):
"services"]["sidecars"]
targets = {v["target"] for v in sc["volumes"]}
self.assertIn("/home/mitmproxy/.mitmproxy/mitmproxy-ca.pem", targets)
self.assertIn("/etc/egress/routes.yaml", targets)
self.assertIn("/etc/egress", targets)
self.assertIn("/git-gate-entrypoint.sh", targets)
self.assertIn("/git-gate/creds/upstream-known_hosts", targets)
self.assertTrue(any("supervise/queue" in t or t.startswith("/run/supervise")
+4 -4
View File
@@ -292,10 +292,10 @@ class TestCodexSuperviseMcp(unittest.TestCase):
bottle.exec.assert_called_once()
script = bottle.exec.call_args.args[0]
self.assertEqual("node", bottle.exec.call_args.kwargs.get("user"))
self.assertIn("codex mcp add", script)
self.assertIn("--transport http", script)
self.assertIn("supervise", script)
self.assertIn(_URL, script)
self.assertEqual(
f"codex mcp add supervise --url {_URL}",
script,
)
def test_logs_warning_on_failure_but_does_not_raise(self):
bottle = _make_bottle(
+54 -11
View File
@@ -2,12 +2,15 @@
add_route removed; docker exec / cp / kill paths are covered by the
integration test)."""
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
from bot_bottle.backend.docker.egress_apply import (
EgressApplyError,
validate_routes_content,
)
from bot_bottle import supervise
from bot_bottle.backend.egress_apply import EgressApplyError
from bot_bottle.backend.docker.egress_apply import applicator
_ROUTES_EMPTY = "routes: []\n"
@@ -16,11 +19,11 @@ _ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n'
class TestValidateRoutesContent(unittest.TestCase):
def test_accepts_minimal_route_table(self):
validate_routes_content(_ROUTES_EMPTY)
validate_routes_content(_ROUTES_ONE)
applicator.validate_routes_content(_ROUTES_EMPTY)
applicator.validate_routes_content(_ROUTES_ONE)
def test_accepts_full_route_with_matches(self):
validate_routes_content(
applicator.validate_routes_content(
'routes:\n'
' - host: "api.github.com"\n'
' auth_scheme: "Bearer"\n'
@@ -32,25 +35,65 @@ class TestValidateRoutesContent(unittest.TestCase):
def test_rejects_bad_yaml(self):
with self.assertRaises(EgressApplyError) as cm:
validate_routes_content("routes:\n\t- host: x\n")
applicator.validate_routes_content("routes:\n\t- host: x\n")
self.assertIn("not valid", str(cm.exception))
def test_rejects_missing_routes_key(self):
with self.assertRaises(EgressApplyError):
validate_routes_content("other: []\n")
applicator.validate_routes_content("other: []\n")
def test_rejects_non_list_routes(self):
with self.assertRaises(EgressApplyError):
validate_routes_content('routes: "not a list"\n')
applicator.validate_routes_content('routes: "not a list"\n')
def test_rejects_partial_auth_pair(self):
with self.assertRaises(EgressApplyError):
validate_routes_content(
applicator.validate_routes_content(
'routes:\n'
' - host: "x.example"\n'
' auth_scheme: "Bearer"\n'
)
class TestApplyRoutesChange(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="egress-apply-test.")
original = supervise.bot_bottle_root
def fake_root() -> Path:
return Path(self._tmp.name) / ".bot-bottle"
supervise.bot_bottle_root = fake_root # type: ignore[assignment]
self.addCleanup(lambda: setattr(supervise, "bot_bottle_root", original))
self.addCleanup(self._tmp.cleanup)
def test_writes_live_routes_and_signals_reload(self):
calls: list[list[str]] = []
def fake_run(argv: list[str], **kwargs: object) -> SimpleNamespace:
calls.append(list(argv))
return SimpleNamespace(returncode=0, stdout="", stderr="")
with patch(
"bot_bottle.backend.docker.egress_apply.subprocess.run",
side_effect=fake_run,
):
before, after = applicator.apply_routes_change(
"dev",
"routes:\n - host: google.com\n",
)
self.assertEqual("", before)
self.assertEqual("routes:\n - host: google.com\n", after)
self.assertEqual(
"routes:\n - host: google.com\n",
(Path(self._tmp.name) / ".bot-bottle/state/dev/egress/routes.yaml").read_text(encoding="utf-8"),
)
self.assertEqual(
["docker", "kill", "--signal", "HUP", "bot-bottle-sidecars-dev"],
calls[0],
)
if __name__ == "__main__":
unittest.main()
+2 -7
View File
@@ -27,7 +27,7 @@ def _plan(
agent_git_gate_url: str = "",
agent_supervise_url: str = "",
) -> MacosContainerBottlePlan:
routes_path = stage_dir / "source-routes.yaml"
routes_path = stage_dir / "routes.yaml"
routes_path.write_text("routes: []\n", encoding="utf-8")
ca_dir = stage_dir / "egress-ca"
ca_dir.mkdir(exist_ok=True)
@@ -125,15 +125,10 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
f"type=bind,source={self.stage_dir / 'egress-ca'},target=/home/mitmproxy/.mitmproxy",
argv,
)
routes_dir = self.stage_dir / "macos-container-egress"
self.assertIn(
f"type=bind,source={routes_dir},target=/etc/egress,readonly",
f"type=bind,source={self.stage_dir},target=/etc/egress,readonly",
argv,
)
self.assertEqual(
"routes: []\n",
(routes_dir / "routes.yaml").read_text(encoding="utf-8"),
)
self.assertIn(
"type=bind,source=/state/supervise/queue,target=/run/supervise/queue",
argv,
+10 -3
View File
@@ -317,15 +317,22 @@ class TestToolConstants(unittest.TestCase):
def test_tools_tuple_matches_individual_constants(self):
self.assertEqual(
(
supervise.TOOL_ALLOW,
TOOL_CAPABILITY_BLOCK,
supervise.TOOL_EGRESS_BLOCK,
supervise.TOOL_LIST_EGRESS_ROUTES,
),
supervise.TOOLS,
)
def test_component_map_has_no_entries(self):
# egress-block removed in issue #198; capability-block never had one.
self.assertEqual({}, supervise.COMPONENT_FOR_TOOL)
def test_component_map_has_egress_entries(self):
self.assertEqual(
{
supervise.TOOL_ALLOW: "egress",
supervise.TOOL_EGRESS_BLOCK: "egress",
},
supervise.COMPONENT_FOR_TOOL,
)
class _StubSupervise(supervise.Supervise):
+19 -3
View File
@@ -2,9 +2,6 @@
The curses TUI itself isn't exercised here — these tests cover the
discovery + approve/reject paths that the TUI's key handlers call into.
egress-block (add_route) was removed in issue #198; the TestEgressApplyWiring
class and all stubs for add_route have been dropped accordingly.
"""
import os
@@ -12,6 +9,7 @@ import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import patch
from bot_bottle import supervise
from bot_bottle.cli import supervise as supervise_cli
@@ -33,6 +31,8 @@ FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
payloads = {
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
supervise.TOOL_ALLOW: "routes:\n - host: example.com\n",
supervise.TOOL_EGRESS_BLOCK: "routes:\n - host: example.com\n",
}
payload = payloads.get(tool, "")
return Proposal.new(
@@ -154,6 +154,22 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
supervise_cli.approve(qp)
self.assertEqual([], read_audit_entries("egress", "dev"))
def test_approve_egress_block_writes_audit_log(self):
qp = self._enqueue(tool=supervise.TOOL_EGRESS_BLOCK)
with patch(
"bot_bottle.cli.supervise.apply_routes_change",
return_value=("routes: []\n", "routes:\n - host: example.com\n"),
) as apply_routes_change:
supervise_cli.approve(qp)
apply_routes_change.assert_called_once_with(
"dev",
"routes:\n - host: example.com\n",
)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertEqual(STATUS_APPROVED, entries[0].operator_action)
self.assertEqual("needed for dev", entries[0].justification)
# class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
# # DISABLED — capability_apply functionality is currently commented out.
+45 -5
View File
@@ -54,13 +54,19 @@ class TestValidation(unittest.TestCase):
)
def test_empty_proposed_file_rejected_for_tools_with_file_field(self):
# egress-block has structured input (validated in
# _validate_and_bundle_egress_route, not here) and
# list-egress-routes takes no input. Only capability-block
# goes through `validate_proposed_file`.
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_CAPABILITY_BLOCK, " \n\t")
def test_egress_routes_yaml_is_validated(self):
validate_proposed_file(
_sv.TOOL_ALLOW,
"routes:\n - host: example.com\n",
)
def test_invalid_egress_routes_yaml_rejected(self):
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_EGRESS_BLOCK, "routes: nope\n")
# --- JSON-RPC parsing ------------------------------------------------------
@@ -141,7 +147,9 @@ class TestHandleToolsList(unittest.TestCase):
names = [t["name"] for t in result["tools"]] # type: ignore[index]
self.assertEqual(
sorted([
_sv.TOOL_ALLOW,
_sv.TOOL_CAPABILITY_BLOCK,
_sv.TOOL_EGRESS_BLOCK,
_sv.TOOL_LIST_EGRESS_ROUTES,
]),
sorted(names),
@@ -172,6 +180,17 @@ class TestHandleToolsList(unittest.TestCase):
# No `required` array because no inputs are required.
self.assertNotIn("required", schema) # type: ignore[operator]
def test_egress_tools_take_routes_yaml_and_justification(self):
for tool_name in (_sv.TOOL_ALLOW, _sv.TOOL_EGRESS_BLOCK):
with self.subTest(tool_name=tool_name):
tool = next(t for t in TOOL_DEFINITIONS if t["name"] == tool_name)
schema = tool["inputSchema"]
self.assertEqual("object", schema["type"]) # type: ignore[index]
self.assertEqual(
["routes_yaml", "justification"],
schema["required"], # type: ignore[index]
)
class TestHandleToolsCall(unittest.TestCase):
def setUp(self):
@@ -220,6 +239,26 @@ class TestHandleToolsCall(unittest.TestCase):
self.assertIn("status: approved", text)
self.assertIn("notes: lgtm", text)
def test_allow_round_trips_through_queue(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_APPROVED, notes="ok")
try:
result = handle_tools_call(
{
"name": _sv.TOOL_ALLOW,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "need example.com",
},
},
self.config,
)
finally:
responder.join()
self.assertFalse(result["isError"]) # type: ignore[index]
text = result["content"][0]["text"] # type: ignore[index]
self.assertIn("status: approved", text)
self.assertIn("notes: ok", text)
def test_rejected_response_sets_isError(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_REJECTED, notes="nope")
try:
@@ -412,7 +451,8 @@ class TestHttpEndToEnd(unittest.TestCase):
self.assertEqual(1, result["id"])
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
self.assertNotIn("egress-block", names)
self.assertIn(_sv.TOOL_ALLOW, names)
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
def test_unknown_method_returns_jsonrpc_error(self):
result = self._post_jsonrpc(