Compare commits

...

12 Commits

Author SHA1 Message Date
didericis-claude 200306f1cf refactor: export applicator singletons from egress_apply backends
test / unit (pull_request) Successful in 35s
test / integration (pull_request) Successful in 21s
lint / lint (push) Successful in 1m44s
test / unit (push) Successful in 32s
test / integration (push) Successful in 19s
Update Quality Badges / update-badges (push) Successful in 1m17s
Replace module-level apply_routes_change wrappers with a public
applicator singleton in each backend. Callers now work with the
EgressApplicator instance directly (applicator.apply_routes_change)
rather than through a function shim.
2026-06-23 20:39:05 +00:00
didericis-claude 77bdaf0a96 refactor: extract EgressApplicator base class shared between backends
lint / lint (push) Successful in 1m56s
test / unit (pull_request) Successful in 42s
test / integration (pull_request) Successful in 20s
Pulls the duplicated apply_routes_change / validate_routes_content /
_routes_path logic into EgressApplicator (ABC) in backend/egress_apply.py.
DockerEgressApplicator and MacOSContainerEgressApplicator override the
single abstract _signal_bundle_reload method with their respective kill
commands. Module-level shims preserve the existing public API.
2026-06-23 20:33:43 +00:00
didericis 7e344bbb53 fix: add lowercase proxy env vars, route_to_yaml_dict, and richer tool descriptions
lint / lint (push) Successful in 1m51s
test / unit (pull_request) Successful in 41s
test / integration (pull_request) Successful in 18s
- Set http_proxy/https_proxy (lowercase) alongside uppercase variants in smolmachines guest env for tools that only check lowercase
- Replace dataclasses.asdict with route_to_yaml_dict in /allowlist introspection so returned routes use YAML-schema-compatible keys
- Expand routes_yaml tool description in supervise_server to document all accepted route keys, making the round-trip from list-egress-routes to propose/apply explicit

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-23 16:13:07 -04:00
didericis-claude 5eb27cd9a8 fix: mount egress dir (not file) for docker and smolmachines backends
lint / lint (push) Successful in 1m37s
test / unit (pull_request) Successful in 31s
test / integration (pull_request) Successful in 16s
Mirrors the fix already applied to the macos-container backend in
eb3e64e: bind-mount the parent egress directory instead of the
routes file itself, so the live routes update is visible inside the
running sidecar bundle when the host overwrites the file.
2026-06-23 09:05:44 +00:00
didericis-claude 5808d0b828 feat: add smolmachines/egress_apply proxying docker backend
lint / lint (push) Successful in 1m40s
test / unit (pull_request) Successful in 33s
test / integration (pull_request) Successful in 16s
2026-06-23 06:53:56 +00:00
didericis-claude 7a991e1f5e refactor: split _signal_bundle_reload per backend, move macos egress to macos_container
lint / lint (push) Successful in 1m47s
test / unit (pull_request) Successful in 38s
test / integration (pull_request) Successful in 19s
2026-06-23 05:57:07 +00:00
didericis-claude 5606797ac2 refactor: drop legacy routes path fallback from _routes_path
lint / lint (push) Successful in 1m37s
test / unit (pull_request) Failing after 29s
test / integration (pull_request) Successful in 18s
2026-06-23 05:48:50 +00:00
didericis-claude ebbb4053cf fix: add type annotations to fake_run in test_egress_apply
lint / lint (push) Successful in 1m40s
test / unit (pull_request) Successful in 36s
test / integration (pull_request) Successful in 18s
2026-06-23 05:47:11 +00:00
didericis eb3e64ea8f fix(macos-container): mount live egress routes dir
lint / lint (push) Failing after 1m35s
test / unit (pull_request) Successful in 33s
test / integration (pull_request) Successful in 16s
2026-06-23 01:39:29 -04:00
didericis 0ec1085238 fix(supervise): apply egress approvals
lint / lint (push) Failing after 1m34s
test / unit (pull_request) Successful in 33s
test / integration (pull_request) Successful in 15s
2026-06-23 01:33:35 -04:00
didericis 4c39b45e34 fix(supervise): restore egress proposal tools
lint / lint (push) Successful in 1m35s
test / unit (pull_request) Successful in 30s
test / integration (pull_request) Successful in 16s
2026-06-23 01:24:28 -04:00
didericis-codex 3ea35ba5d2 fix: update codex supervise mcp registration
lint / lint (push) Successful in 1m54s
test / unit (pull_request) Successful in 38s
test / integration (pull_request) Successful in 22s
2026-06-23 04:06:21 +00:00
21 changed files with 502 additions and 99 deletions
+1 -1
View File
@@ -134,7 +134,7 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
ep = plan.egress_plan ep = plan.egress_plan
volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER)) volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER))
if ep.routes: if ep.routes:
volumes.append(_bind(ep.routes_path, EGRESS_ROUTES_IN_CONTAINER)) volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
for token_env in sorted(ep.token_env_map.keys()): for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env) env.append(token_env)
+28 -17
View File
@@ -1,24 +1,21 @@
"""Host-side helper for egress sidecar inspection (issue #198). """Host-side helper for egress sidecar inspection and live updates.
`_merge_single_route`, `add_route`, and `apply_routes_change` were The approve path uses this module to validate a proposed routes file,
removed when the egress-block MCP tool was dropped. The remaining write it to the bottle's live egress state dir, and signal the sidecar
helpers support runtime inspection and validation of the routes file bundle so the mitmproxy addon reloads it.
without modifying it at runtime.
""" """
from __future__ import annotations from __future__ import annotations
import os
import subprocess import subprocess
from ...egress import EGRESS_ROUTES_IN_CONTAINER from ...egress import EGRESS_ROUTES_IN_CONTAINER
from ...egress_addon_core import load_routes from ...log import warn
from ..egress_apply import EgressApplicator, EgressApplyError
from .sidecar_bundle import sidecar_bundle_container_name from .sidecar_bundle import sidecar_bundle_container_name
class EgressApplyError(RuntimeError):
pass
def fetch_current_routes(slug: str) -> str: def fetch_current_routes(slug: str) -> str:
container = sidecar_bundle_container_name(slug) container = sidecar_bundle_container_name(slug)
r = subprocess.run( r = subprocess.run(
@@ -33,17 +30,31 @@ def fetch_current_routes(slug: str) -> str:
return r.stdout return r.stdout
def validate_routes_content(content: str) -> None: class DockerEgressApplicator(EgressApplicator):
try: def _signal_bundle_reload(self, slug: str) -> None:
load_routes(content) container = sidecar_bundle_container_name(slug)
except ValueError as e: result = subprocess.run(
["docker", "kill", "--signal", "HUP", container],
capture_output=True, text=True, check=False, env=os.environ,
)
if result.returncode != 0:
last_error = (result.stderr or "").strip() or (result.stdout or "").strip()
warn(
f"egress: routes updated on disk for {slug}, but bundle reload failed: "
f"{last_error or 'docker kill failed'}"
)
raise EgressApplyError( raise EgressApplyError(
f"proposed routes.yaml is not valid: {e}" f"could not reload egress bundle {container}: "
) from e f"{last_error or 'docker kill failed'}"
)
applicator = DockerEgressApplicator()
__all__ = [ __all__ = [
"DockerEgressApplicator",
"EgressApplyError", "EgressApplyError",
"applicator",
"fetch_current_routes", "fetch_current_routes",
"validate_routes_content",
] ]
+50
View File
@@ -0,0 +1,50 @@
"""Shared base class for host-side egress apply across backends.
Each backend subclasses EgressApplicator and overrides _signal_bundle_reload
with the backend-specific kill command.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
from ..bottle_state import egress_state_dir
from ..egress import EGRESS_ROUTES_FILENAME
from ..egress_addon_core import load_routes
class EgressApplyError(RuntimeError):
pass
class EgressApplicator(ABC):
def apply_routes_change(self, slug: str, content: str) -> tuple[str, str]:
"""Persist `content` to the live routes file and reload egress."""
self.validate_routes_content(content)
routes_path = self._routes_path(slug)
routes_path.parent.mkdir(parents=True, exist_ok=True)
before = routes_path.read_text(encoding="utf-8") if routes_path.exists() else ""
routes_path.write_text(content, encoding="utf-8")
routes_path.chmod(0o600)
self._signal_bundle_reload(slug)
return before, content
@staticmethod
def validate_routes_content(content: str) -> None:
try:
load_routes(content)
except ValueError as e:
raise EgressApplyError(
f"proposed routes.yaml is not valid: {e}"
) from e
@staticmethod
def _routes_path(slug: str) -> Path:
return egress_state_dir(slug) / EGRESS_ROUTES_FILENAME
@abstractmethod
def _signal_bundle_reload(self, slug: str) -> None: ...
__all__ = ["EgressApplicator", "EgressApplyError"]
@@ -0,0 +1,39 @@
"""Host-side egress apply for the macos-container backend.
Uses `container kill --signal HUP` (Apple Container framework) instead
of `docker kill` to signal the sidecar bundle.
"""
from __future__ import annotations
import os
import subprocess
from ...log import warn
from ..egress_apply import EgressApplicator, EgressApplyError
from .launch import sidecar_container_name
class MacOSContainerEgressApplicator(EgressApplicator):
def _signal_bundle_reload(self, slug: str) -> None:
container = sidecar_container_name(slug)
result = subprocess.run(
["container", "kill", "--signal", "HUP", container],
capture_output=True, text=True, check=False, env=os.environ,
)
if result.returncode != 0:
last_error = (result.stderr or "").strip() or (result.stdout or "").strip()
warn(
f"egress: routes updated on disk for {slug}, but bundle reload failed: "
f"{last_error or 'container kill failed'}"
)
raise EgressApplyError(
f"could not reload egress bundle {container}: "
f"{last_error or 'container kill failed'}"
)
applicator = MacOSContainerEgressApplicator()
__all__ = ["MacOSContainerEgressApplicator", "EgressApplyError", "applicator"]
+1 -13
View File
@@ -12,7 +12,6 @@ from __future__ import annotations
import dataclasses import dataclasses
import os import os
import shutil
import subprocess import subprocess
from contextlib import ExitStack, contextmanager from contextlib import ExitStack, contextmanager
from pathlib import Path from pathlib import Path
@@ -364,7 +363,7 @@ def _sidecar_mounts(
)) ))
if ep.routes: if ep.routes:
mounts.append(( mounts.append((
str(_stage_routes_dir(plan)), str(ep.routes_path.parent),
str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent),
True, True,
)) ))
@@ -375,17 +374,6 @@ def _sidecar_mounts(
return tuple(mounts) return tuple(mounts)
def _stage_routes_dir(plan: MacosContainerBottlePlan) -> Path:
routes_dir = plan.stage_dir / "macos-container-egress"
routes_dir.mkdir(parents=True, exist_ok=True)
shutil.copyfile(
plan.egress_plan.routes_path,
routes_dir / Path(EGRESS_ROUTES_IN_CONTAINER).name,
)
return routes_dir
def _mount_spec(host_path: str, container_path: str, read_only: bool) -> str: def _mount_spec(host_path: str, container_path: str, read_only: bool) -> str:
spec = f"type=bind,source={host_path},target={container_path}" spec = f"type=bind,source={host_path},target={container_path}"
if read_only: if read_only:
@@ -0,0 +1,21 @@
"""Egress apply for the smolmachines backend.
The smolmachines sidecar bundle runs as a host-side Docker container,
so egress signalling is identical to the docker backend.
"""
from __future__ import annotations
from ..docker.egress_apply import ( # noqa: F401
DockerEgressApplicator,
EgressApplyError,
applicator,
fetch_current_routes,
)
__all__ = [
"DockerEgressApplicator",
"EgressApplyError",
"applicator",
"fetch_current_routes",
]
+6 -2
View File
@@ -217,11 +217,15 @@ def _discover_urls(
agent_supervise_url = f"http://{loopback_ip}:{supervise_host_port}/" agent_supervise_url = f"http://{loopback_ip}:{supervise_host_port}/"
existing_no_proxy = plan.guest_env.get("NO_PROXY", "localhost,127.0.0.1") existing_no_proxy = plan.guest_env.get("NO_PROXY", "localhost,127.0.0.1")
no_proxy = f"{existing_no_proxy},{loopback_ip}"
guest_env = { guest_env = {
**plan.guest_env, **plan.guest_env,
"HTTPS_PROXY": agent_proxy_url, "HTTPS_PROXY": agent_proxy_url,
"HTTP_PROXY": agent_proxy_url, "HTTP_PROXY": agent_proxy_url,
"NO_PROXY": f"{existing_no_proxy},{loopback_ip}", "https_proxy": agent_proxy_url,
"http_proxy": agent_proxy_url,
"NO_PROXY": no_proxy,
"no_proxy": no_proxy,
} }
if agent_git_gate_host: if agent_git_gate_host:
guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}" guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}"
@@ -308,7 +312,7 @@ def _bundle_launch_spec(
ep = plan.egress_plan ep = plan.egress_plan
volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True)) volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True))
if ep.routes: if ep.routes:
volumes.append((str(ep.routes_path), EGRESS_ROUTES_IN_CONTAINER, True)) volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True))
# Bare-name entries for upstream-token slots. Their values # Bare-name entries for upstream-token slots. Their values
# come from the docker-run subprocess env (inherited from # come from the docker-run subprocess env (inherited from
# the operator's shell), never landing on argv. # the operator's shell), never landing on argv.
+34 -3
View File
@@ -3,7 +3,8 @@ act on them (approve / modify / reject).
Curses-based TUI; modify-then-approve shells out to $EDITOR. The Curses-based TUI; modify-then-approve shells out to $EDITOR. The
approval handler wires to PRD 0016 (capability-block), which rebuilds approval handler wires to PRD 0016 (capability-block), which rebuilds
the bottle Dockerfile. The egress-block tool was removed in issue #198. the bottle Dockerfile. Egress proposals are queued for operator review
as full routes.yaml updates.
""" """
from __future__ import annotations from __future__ import annotations
@@ -20,11 +21,21 @@ from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from .. import supervise as _supervise from .. import supervise as _supervise
# from ..bottle_state import read_metadata from ..bottle_state import read_metadata
# from ..backend.docker.capability_apply import ( # from ..backend.docker.capability_apply import (
# CapabilityApplyError, # CapabilityApplyError,
# apply_capability_change, # apply_capability_change,
# ) # )
from ..backend.docker.egress_apply import (
EgressApplyError,
applicator as _docker_applicator,
)
from ..backend.macos_container.egress_apply import (
applicator as _macos_applicator,
)
from ..backend.smolmachines.egress_apply import (
applicator as _smolmachines_applicator,
)
from ..log import Die, error, info from ..log import Die, error, info
@@ -40,6 +51,8 @@ from ..supervise import (
STATUS_MODIFIED, STATUS_MODIFIED,
STATUS_REJECTED, STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK, TOOL_CAPABILITY_BLOCK,
TOOL_ALLOW,
TOOL_EGRESS_BLOCK,
archive_proposal, archive_proposal,
list_pending_proposals, list_pending_proposals,
render_diff, render_diff,
@@ -63,7 +76,17 @@ class QueuedProposal:
# Errors any remediation engine may raise. Caught by the TUI key # Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps # handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses. # the proposal pending rather than crashing curses.
ApplyError = (CapabilityApplyError,) ApplyError = (CapabilityApplyError, EgressApplyError)
def apply_routes_change(slug: str, content: str) -> tuple[str, str]:
meta = read_metadata(slug)
backend = meta.backend if meta is not None else ""
if backend == "macos-container":
return _macos_applicator.apply_routes_change(slug, content)
if backend == "smolmachines":
return _smolmachines_applicator.apply_routes_change(slug, content)
return _docker_applicator.apply_routes_change(slug, content)
def discover_pending() -> list[QueuedProposal]: def discover_pending() -> list[QueuedProposal]:
@@ -115,6 +138,8 @@ def _detail_lines(
def _suffix_for_tool(tool: str) -> str: def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK: if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile" return ".dockerfile"
if tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK):
return ".yaml"
return ".txt" return ".txt"
@@ -129,6 +154,7 @@ def approve(
) -> None: ) -> None:
"""Apply the proposal, write the waiting response, and audit it.""" """Apply the proposal, write the waiting response, and audit it."""
status = STATUS_MODIFIED if final_file is not None else STATUS_APPROVED status = STATUS_MODIFIED if final_file is not None else STATUS_APPROVED
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
diff_before, diff_after = "", "" diff_before, diff_after = "", ""
# if qp.proposal.tool == TOOL_CAPABILITY_BLOCK: # if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
@@ -142,6 +168,11 @@ def approve(
# diff_before, diff_after = apply_capability_change( # diff_before, diff_after = apply_capability_change(
# qp.proposal.bottle_slug, file_to_apply, # qp.proposal.bottle_slug, file_to_apply,
# ) # )
if qp.proposal.tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK):
diff_before, diff_after = apply_routes_change(
qp.proposal.bottle_slug,
file_to_apply,
)
response = Response( response = Response(
proposal_id=qp.proposal.id, proposal_id=qp.proposal.id,
+3 -3
View File
@@ -261,8 +261,8 @@ class CodexAgentProvider(AgentProvider):
return return
info(f"registering supervise MCP server in agent codex config → {supervise_url}") info(f"registering supervise MCP server in agent codex config → {supervise_url}")
r = bottle.exec( r = bottle.exec(
f"codex mcp add --transport http " f"codex mcp add {_SUPERVISE_MCP_NAME} --url "
f"{_SUPERVISE_MCP_NAME} {supervise_url}", f"{shlex.quote(supervise_url)}",
user="node", user="node",
) )
if r.returncode != 0: if r.returncode != 0:
@@ -270,7 +270,7 @@ class CodexAgentProvider(AgentProvider):
f"`codex mcp add supervise` failed (exit {r.returncode}): " f"`codex mcp add supervise` failed (exit {r.returncode}): "
f"{(r.stderr or r.stdout or '').strip()}. Inside the bottle, " f"{(r.stderr or r.stdout or '').strip()}. Inside the bottle, "
f"register manually with: " f"register manually with: "
f"codex mcp add --transport http supervise {supervise_url}" f"codex mcp add supervise --url {shlex.quote(supervise_url)}"
) )
+3 -1
View File
@@ -31,6 +31,7 @@ CODEX_HOST_CREDENTIAL_TOKEN_REF = "BOT_BOTTLE_CODEX_HOST_ACCESS_TOKEN"
EGRESS_HOSTNAME = "egress" EGRESS_HOSTNAME = "egress"
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml" EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -295,7 +296,7 @@ class Egress(ABC):
) -> EgressPlan: ) -> EgressPlan:
routes = egress_routes_for_bottle(bottle, provider_routes) routes = egress_routes_for_bottle(bottle, provider_routes)
log = bottle.egress.Log log = bottle.egress.Log
routes_path = stage_dir / "egress_routes.yaml" routes_path = stage_dir / EGRESS_ROUTES_FILENAME
routes_path.write_text(egress_render_routes(routes, log=log)) routes_path.write_text(egress_render_routes(routes, log=log))
routes_path.chmod(0o600) routes_path.chmod(0o600)
return EgressPlan( return EgressPlan(
@@ -309,6 +310,7 @@ class Egress(ABC):
__all__ = [ __all__ = [
"CODEX_HOST_CREDENTIAL_TOKEN_REF", "CODEX_HOST_CREDENTIAL_TOKEN_REF",
"EGRESS_HOSTNAME", "EGRESS_HOSTNAME",
"EGRESS_ROUTES_FILENAME",
"EGRESS_ROUTES_IN_CONTAINER", "EGRESS_ROUTES_IN_CONTAINER",
"Egress", "Egress",
"EgressPlan", "EgressPlan",
+2 -2
View File
@@ -5,7 +5,6 @@ egress container."""
from __future__ import annotations from __future__ import annotations
import dataclasses
import json import json
import os import os
import signal import signal
@@ -27,6 +26,7 @@ from egress_addon_core import ( # type: ignore[import-not-found] # pylint: dis
load_config, load_config,
match_route, match_route,
outbound_scan_headers, outbound_scan_headers,
route_to_yaml_dict,
scan_inbound, scan_inbound,
scan_outbound, scan_outbound,
) )
@@ -82,7 +82,7 @@ class EgressAddon:
def _serve_introspection(self, flow: http.HTTPFlow, path: str) -> None: def _serve_introspection(self, flow: http.HTTPFlow, path: str) -> None:
if path == "/allowlist": if path == "/allowlist":
payload = json.dumps( payload = json.dumps(
{"routes": [dataclasses.asdict(r) for r in self.config.routes]}, {"routes": [route_to_yaml_dict(r) for r in self.config.routes]},
indent=2, indent=2,
).encode("utf-8") ).encode("utf-8")
flow.response = http.Response.make( flow.response = http.Response.make(
+51
View File
@@ -359,6 +359,56 @@ def _parse_one(idx: int, raw: object) -> Route:
) )
def _path_match_to_dict(pm: PathMatch) -> dict[str, object]:
d: dict[str, object] = {"value": pm.value}
if pm.type != "prefix":
d["type"] = pm.type
return d
def _header_match_to_dict(hm: HeaderMatch) -> dict[str, object]:
d: dict[str, object] = {"name": hm.name, "value": hm.value}
if hm.type != "exact":
d["type"] = hm.type
return d
def _match_entry_to_dict(me: MatchEntry) -> dict[str, object]:
d: dict[str, object] = {}
if me.paths:
d["paths"] = [_path_match_to_dict(p) for p in me.paths]
if me.methods:
d["methods"] = list(me.methods)
if me.headers:
d["headers"] = [_header_match_to_dict(h) for h in me.headers]
return d
def route_to_yaml_dict(r: Route) -> dict[str, object]:
"""Serialize a Route to YAML-schema-compatible dict.
Uses the same field names the YAML parser accepts, so the output
can be round-tripped directly into an `allow` or `egress-block`
proposal without translation. Fields that are empty/default are
omitted so the agent doesn't copy irrelevant keys."""
d: dict[str, object] = {"host": r.host}
if r.auth_scheme:
d["auth_scheme"] = r.auth_scheme
d["token_env"] = r.token_env
if r.matches:
d["matches"] = [_match_entry_to_dict(m) for m in r.matches]
if r.git_fetch:
d["git"] = {"fetch": True}
dlp: dict[str, object] = {}
if r.outbound_detectors is not None:
dlp["outbound_detectors"] = list(r.outbound_detectors)
if r.inbound_detectors is not None:
dlp["inbound_detectors"] = list(r.inbound_detectors)
if dlp:
d["dlp"] = dlp
return d
def load_routes(text: str) -> tuple[Route, ...]: def load_routes(text: str) -> tuple[Route, ...]:
"""Parse YAML text → routes.""" """Parse YAML text → routes."""
try: try:
@@ -698,6 +748,7 @@ def scan_inbound(
__all__ = [ __all__ = [
"LOG_BLOCKS", "LOG_BLOCKS",
"route_to_yaml_dict",
"LOG_FULL", "LOG_FULL",
"LOG_OFF", "LOG_OFF",
"Config", "Config",
+19 -12
View File
@@ -5,7 +5,7 @@ queue/audit support. The sidecar (bot_bottle.supervise_server)
sits on the bottle's internal network and exposes three MCP tools the sits on the bottle's internal network and exposes three MCP tools the
agent calls when it hits a stuck-recovery category: agent calls when it hits a stuck-recovery category:
* egress-block agent proposes a new routes.yaml * egress-block / allow agent proposes a new routes.yaml
* capability-block agent proposes a new agent Dockerfile * capability-block agent proposes a new agent Dockerfile
Each tool call: the agent passes the full proposed file plus a Each tool call: the agent passes the full proposed file plus a
@@ -49,27 +49,34 @@ SUPERVISE_HOSTNAME = "supervise"
SUPERVISE_PORT = 9100 SUPERVISE_PORT = 9100
TOOL_CAPABILITY_BLOCK = "capability-block" TOOL_CAPABILITY_BLOCK = "capability-block"
TOOL_EGRESS_BLOCK = "egress-block"
TOOL_ALLOW = "allow"
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes" TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
TOOLS: tuple[str, ...] = ( TOOLS: tuple[str, ...] = (
TOOL_ALLOW,
TOOL_CAPABILITY_BLOCK, TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
TOOL_LIST_EGRESS_ROUTES, TOOL_LIST_EGRESS_ROUTES,
) )
# The supervise sidecar uses these to query egress's # The supervise sidecar uses these to query egress's
# introspection endpoint for the `list-egress-routes` MCP # introspection endpoint for the `list-egress-routes` MCP
# tool. The hostname + port match egress's docker network # tool. The hostname + port match egress's docker network
# alias + listen port (see bot_bottle.egress.EGRESS_HOSTNAME # listen port (see backend.docker.egress.EGRESS_PORT). The supervise
# and backend.docker.egress.EGRESS_PORT — the values # daemon runs inside the sidecar bundle alongside egress, so loopback
# are inlined here so the in-container supervise_server doesn't # is the stable address across docker, smolmachines, and Apple
# need to import the egress package). # Container backends.
EGRESS_FORWARD_PROXY = "http://egress:9099" EGRESS_FORWARD_PROXY = "http://127.0.0.1:9099"
EGRESS_INTROSPECT_URL = "http://_egress.local/allowlist" EGRESS_INTROSPECT_URL = "http://_egress.local/allowlist"
# capability-block has no on-disk config the operator edits in place # capability-block has no on-disk config the operator edits in place
# (the Dockerfile is rebuilt, not patched), so it has no audit log # (the Dockerfile is rebuilt, not patched), so it has no audit log
# here — those changes are captured by git history + the rebuild # here — those changes are captured by git history + the rebuild record
# record laid down in PRD 0016. egress-block was removed in issue #198. # laid down in PRD 0016.
COMPONENT_FOR_TOOL: dict[str, str] = {} COMPONENT_FOR_TOOL: dict[str, str] = {
TOOL_ALLOW: "egress",
TOOL_EGRESS_BLOCK: "egress",
}
STATUS_APPROVED = "approved" STATUS_APPROVED = "approved"
STATUS_MODIFIED = "modified" STATUS_MODIFIED = "modified"
@@ -431,9 +438,9 @@ def sha256_hex(content: str) -> str:
# Dockerfile and propose modifications. # Dockerfile and propose modifications.
# #
# routes.yaml + allowlist used to live here too; PRD 0017 chunk 3 # routes.yaml + allowlist used to live here too; PRD 0017 chunk 3
# moved them behind the `list-egress-routes` MCP tool (live # moved them behind the `list-egress-routes` MCP tool (live state
# state from egress's introspection endpoint) so the agent # from egress's introspection endpoint) so the agent always sees
# always sees current data rather than a launch-time snapshot. # current data rather than a launch-time snapshot.
CURRENT_CONFIG_DOCKERFILE = "Dockerfile" CURRENT_CONFIG_DOCKERFILE = "Dockerfile"
+107 -9
View File
@@ -1,8 +1,8 @@
"""Supervise sidecar HTTP server (PRD 0013). """Supervise sidecar HTTP server (PRD 0013).
Per-bottle MCP server exposing tools the agent calls to propose config Per-bottle MCP server exposing tools the agent calls to propose config
changes when stuck. The egress-block tool was removed in issue #198; changes when stuck. The tools are `allow`, `egress-block`,
the remaining tools are `capability-block` and `list-egress-routes`. `capability-block`, and `list-egress-routes`.
Each queued tool call: Each queued tool call:
@@ -44,9 +44,15 @@ import urllib.request
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
# Same-directory import inside the bundle container; `supervise.py` try:
# is COPYed alongside this file by Dockerfile.sidecars. # Same-directory imports inside the bundle container; these files are
# COPYed flat under /app by Dockerfile.sidecars.
from egress_addon_core import load_routes
import supervise as _sv import supervise as _sv
except ModuleNotFoundError:
# Package imports for host-side tests and tooling.
from .egress_addon_core import load_routes
from . import supervise as _sv
# --- JSON-RPC / MCP plumbing ---------------------------------------------- # --- JSON-RPC / MCP plumbing ----------------------------------------------
@@ -142,8 +148,9 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"allowlist. Returns JSON with one entry per allowed host, " "allowlist. Returns JSON with one entry per allowed host, "
"each carrying its matches rules (if any) and whether " "each carrying its matches rules (if any) and whether "
"the proxy injects Authorization for the route. Use this " "the proxy injects Authorization for the route. Use this "
"before composing an `egress-block` proposal so the new " "before composing an `allow` or `egress-block` proposal so "
"routes file extends the live one rather than replacing it." "the new routes file extends the live one rather than "
"replacing it."
), ),
"inputSchema": { "inputSchema": {
"type": "object", "type": "object",
@@ -151,6 +158,88 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"additionalProperties": False, "additionalProperties": False,
}, },
}, },
{
"name": _sv.TOOL_ALLOW,
"description": (
"Request operator approval to change the bottle's egress "
"allowlist. Pass the full proposed routes.yaml content, not "
"just the new host, plus a justification. Use "
"`list-egress-routes` first so the proposal preserves existing "
"routes."
),
"inputSchema": {
"type": "object",
"properties": {
"routes_yaml": {
"type": "string",
"description": (
"Full proposed /etc/egress/routes.yaml content. "
"Each route entry accepts these keys:\n"
" host: <hostname> (required)\n"
" auth_scheme: Bearer|token (must pair with token_env)\n"
" token_env: <ENV_VAR_NAME> (must pair with auth_scheme)\n"
" matches: (optional list of match entries)\n"
" - paths: [{type: prefix|exact|regex, value: /...}]\n"
" methods: [GET, POST, ...]\n"
" headers: [{name: X-Hdr, value: val, type: exact|regex}]\n"
" git: (optional; omit to block git clone/fetch)\n"
" fetch: true\n"
" dlp: (optional DLP scanner overrides)\n"
" outbound_detectors: [token_patterns, known_secrets]\n"
" inbound_detectors: [naive_injection_detection]\n"
"Omit any key that should use its default. "
"`list-egress-routes` returns routes in this same format."
),
},
"justification": {
"type": "string",
"description": "Why this egress route is needed.",
},
},
"required": ["routes_yaml", "justification"],
},
},
{
"name": _sv.TOOL_EGRESS_BLOCK,
"description": (
"Request operator approval to change the bottle's egress "
"allowlist after a blocked outbound request. Pass the full "
"proposed routes.yaml content plus a justification. Use "
"`list-egress-routes` first so the proposal preserves existing "
"routes."
),
"inputSchema": {
"type": "object",
"properties": {
"routes_yaml": {
"type": "string",
"description": (
"Full proposed /etc/egress/routes.yaml content. "
"Each route entry accepts these keys:\n"
" host: <hostname> (required)\n"
" auth_scheme: Bearer|token (must pair with token_env)\n"
" token_env: <ENV_VAR_NAME> (must pair with auth_scheme)\n"
" matches: (optional list of match entries)\n"
" - paths: [{type: prefix|exact|regex, value: /...}]\n"
" methods: [GET, POST, ...]\n"
" headers: [{name: X-Hdr, value: val, type: exact|regex}]\n"
" git: (optional; omit to block git clone/fetch)\n"
" fetch: true\n"
" dlp: (optional DLP scanner overrides)\n"
" outbound_detectors: [token_patterns, known_secrets]\n"
" inbound_detectors: [naive_injection_detection]\n"
"Omit any key that should use its default. "
"`list-egress-routes` returns routes in this same format."
),
},
"justification": {
"type": "string",
"description": "Why this egress route is needed.",
},
},
"required": ["routes_yaml", "justification"],
},
},
{ {
"name": _sv.TOOL_CAPABILITY_BLOCK, "name": _sv.TOOL_CAPABILITY_BLOCK,
"description": ( "description": (
@@ -182,11 +271,12 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
] ]
# Map each non-egress tool to the input field that carries the agent's # Map each proposal tool to the input field that carries the agent's
# payload (stored in Proposal.proposed_file). egress-block builds its # payload (stored in Proposal.proposed_file).
# payload from structured input fields in `handle_egress_block`.
PROPOSED_FILE_FIELD: dict[str, str] = { PROPOSED_FILE_FIELD: dict[str, str] = {
_sv.TOOL_ALLOW: "routes_yaml",
_sv.TOOL_CAPABILITY_BLOCK: "dockerfile", _sv.TOOL_CAPABILITY_BLOCK: "dockerfile",
_sv.TOOL_EGRESS_BLOCK: "routes_yaml",
} }
@@ -203,6 +293,14 @@ def validate_proposed_file(tool: str, content: str) -> None:
# Dockerfiles are too varied to validate syntactically beyond # Dockerfiles are too varied to validate syntactically beyond
# non-empty. The operator reads the diff in the TUI. # non-empty. The operator reads the diff in the TUI.
pass pass
elif tool in (_sv.TOOL_ALLOW, _sv.TOOL_EGRESS_BLOCK):
try:
load_routes(content)
except ValueError as e:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml is not valid: {e}",
) from e
else: else:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}") raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
+1 -1
View File
@@ -392,7 +392,7 @@ class TestSidecarBundleShape(unittest.TestCase):
"services"]["sidecars"] "services"]["sidecars"]
targets = {v["target"] for v in sc["volumes"]} targets = {v["target"] for v in sc["volumes"]}
self.assertIn("/home/mitmproxy/.mitmproxy/mitmproxy-ca.pem", targets) self.assertIn("/home/mitmproxy/.mitmproxy/mitmproxy-ca.pem", targets)
self.assertIn("/etc/egress/routes.yaml", targets) self.assertIn("/etc/egress", targets)
self.assertIn("/git-gate-entrypoint.sh", targets) self.assertIn("/git-gate-entrypoint.sh", targets)
self.assertIn("/git-gate/creds/upstream-known_hosts", targets) self.assertIn("/git-gate/creds/upstream-known_hosts", targets)
self.assertTrue(any("supervise/queue" in t or t.startswith("/run/supervise") self.assertTrue(any("supervise/queue" in t or t.startswith("/run/supervise")
+4 -4
View File
@@ -292,10 +292,10 @@ class TestCodexSuperviseMcp(unittest.TestCase):
bottle.exec.assert_called_once() bottle.exec.assert_called_once()
script = bottle.exec.call_args.args[0] script = bottle.exec.call_args.args[0]
self.assertEqual("node", bottle.exec.call_args.kwargs.get("user")) self.assertEqual("node", bottle.exec.call_args.kwargs.get("user"))
self.assertIn("codex mcp add", script) self.assertEqual(
self.assertIn("--transport http", script) f"codex mcp add supervise --url {_URL}",
self.assertIn("supervise", script) script,
self.assertIn(_URL, script) )
def test_logs_warning_on_failure_but_does_not_raise(self): def test_logs_warning_on_failure_but_does_not_raise(self):
bottle = _make_bottle( bottle = _make_bottle(
+54 -11
View File
@@ -2,12 +2,15 @@
add_route removed; docker exec / cp / kill paths are covered by the add_route removed; docker exec / cp / kill paths are covered by the
integration test).""" integration test)."""
import tempfile
import unittest import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
from bot_bottle.backend.docker.egress_apply import ( from bot_bottle import supervise
EgressApplyError, from bot_bottle.backend.egress_apply import EgressApplyError
validate_routes_content, from bot_bottle.backend.docker.egress_apply import applicator
)
_ROUTES_EMPTY = "routes: []\n" _ROUTES_EMPTY = "routes: []\n"
@@ -16,11 +19,11 @@ _ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n'
class TestValidateRoutesContent(unittest.TestCase): class TestValidateRoutesContent(unittest.TestCase):
def test_accepts_minimal_route_table(self): def test_accepts_minimal_route_table(self):
validate_routes_content(_ROUTES_EMPTY) applicator.validate_routes_content(_ROUTES_EMPTY)
validate_routes_content(_ROUTES_ONE) applicator.validate_routes_content(_ROUTES_ONE)
def test_accepts_full_route_with_matches(self): def test_accepts_full_route_with_matches(self):
validate_routes_content( applicator.validate_routes_content(
'routes:\n' 'routes:\n'
' - host: "api.github.com"\n' ' - host: "api.github.com"\n'
' auth_scheme: "Bearer"\n' ' auth_scheme: "Bearer"\n'
@@ -32,25 +35,65 @@ class TestValidateRoutesContent(unittest.TestCase):
def test_rejects_bad_yaml(self): def test_rejects_bad_yaml(self):
with self.assertRaises(EgressApplyError) as cm: with self.assertRaises(EgressApplyError) as cm:
validate_routes_content("routes:\n\t- host: x\n") applicator.validate_routes_content("routes:\n\t- host: x\n")
self.assertIn("not valid", str(cm.exception)) self.assertIn("not valid", str(cm.exception))
def test_rejects_missing_routes_key(self): def test_rejects_missing_routes_key(self):
with self.assertRaises(EgressApplyError): with self.assertRaises(EgressApplyError):
validate_routes_content("other: []\n") applicator.validate_routes_content("other: []\n")
def test_rejects_non_list_routes(self): def test_rejects_non_list_routes(self):
with self.assertRaises(EgressApplyError): with self.assertRaises(EgressApplyError):
validate_routes_content('routes: "not a list"\n') applicator.validate_routes_content('routes: "not a list"\n')
def test_rejects_partial_auth_pair(self): def test_rejects_partial_auth_pair(self):
with self.assertRaises(EgressApplyError): with self.assertRaises(EgressApplyError):
validate_routes_content( applicator.validate_routes_content(
'routes:\n' 'routes:\n'
' - host: "x.example"\n' ' - host: "x.example"\n'
' auth_scheme: "Bearer"\n' ' auth_scheme: "Bearer"\n'
) )
class TestApplyRoutesChange(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="egress-apply-test.")
original = supervise.bot_bottle_root
def fake_root() -> Path:
return Path(self._tmp.name) / ".bot-bottle"
supervise.bot_bottle_root = fake_root # type: ignore[assignment]
self.addCleanup(lambda: setattr(supervise, "bot_bottle_root", original))
self.addCleanup(self._tmp.cleanup)
def test_writes_live_routes_and_signals_reload(self):
calls: list[list[str]] = []
def fake_run(argv: list[str], **kwargs: object) -> SimpleNamespace:
calls.append(list(argv))
return SimpleNamespace(returncode=0, stdout="", stderr="")
with patch(
"bot_bottle.backend.docker.egress_apply.subprocess.run",
side_effect=fake_run,
):
before, after = applicator.apply_routes_change(
"dev",
"routes:\n - host: google.com\n",
)
self.assertEqual("", before)
self.assertEqual("routes:\n - host: google.com\n", after)
self.assertEqual(
"routes:\n - host: google.com\n",
(Path(self._tmp.name) / ".bot-bottle/state/dev/egress/routes.yaml").read_text(encoding="utf-8"),
)
self.assertEqual(
["docker", "kill", "--signal", "HUP", "bot-bottle-sidecars-dev"],
calls[0],
)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+2 -7
View File
@@ -27,7 +27,7 @@ def _plan(
agent_git_gate_url: str = "", agent_git_gate_url: str = "",
agent_supervise_url: str = "", agent_supervise_url: str = "",
) -> MacosContainerBottlePlan: ) -> MacosContainerBottlePlan:
routes_path = stage_dir / "source-routes.yaml" routes_path = stage_dir / "routes.yaml"
routes_path.write_text("routes: []\n", encoding="utf-8") routes_path.write_text("routes: []\n", encoding="utf-8")
ca_dir = stage_dir / "egress-ca" ca_dir = stage_dir / "egress-ca"
ca_dir.mkdir(exist_ok=True) ca_dir.mkdir(exist_ok=True)
@@ -125,15 +125,10 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
f"type=bind,source={self.stage_dir / 'egress-ca'},target=/home/mitmproxy/.mitmproxy", f"type=bind,source={self.stage_dir / 'egress-ca'},target=/home/mitmproxy/.mitmproxy",
argv, argv,
) )
routes_dir = self.stage_dir / "macos-container-egress"
self.assertIn( self.assertIn(
f"type=bind,source={routes_dir},target=/etc/egress,readonly", f"type=bind,source={self.stage_dir},target=/etc/egress,readonly",
argv, argv,
) )
self.assertEqual(
"routes: []\n",
(routes_dir / "routes.yaml").read_text(encoding="utf-8"),
)
self.assertIn( self.assertIn(
"type=bind,source=/state/supervise/queue,target=/run/supervise/queue", "type=bind,source=/state/supervise/queue,target=/run/supervise/queue",
argv, argv,
+10 -3
View File
@@ -317,15 +317,22 @@ class TestToolConstants(unittest.TestCase):
def test_tools_tuple_matches_individual_constants(self): def test_tools_tuple_matches_individual_constants(self):
self.assertEqual( self.assertEqual(
( (
supervise.TOOL_ALLOW,
TOOL_CAPABILITY_BLOCK, TOOL_CAPABILITY_BLOCK,
supervise.TOOL_EGRESS_BLOCK,
supervise.TOOL_LIST_EGRESS_ROUTES, supervise.TOOL_LIST_EGRESS_ROUTES,
), ),
supervise.TOOLS, supervise.TOOLS,
) )
def test_component_map_has_no_entries(self): def test_component_map_has_egress_entries(self):
# egress-block removed in issue #198; capability-block never had one. self.assertEqual(
self.assertEqual({}, supervise.COMPONENT_FOR_TOOL) {
supervise.TOOL_ALLOW: "egress",
supervise.TOOL_EGRESS_BLOCK: "egress",
},
supervise.COMPONENT_FOR_TOOL,
)
class _StubSupervise(supervise.Supervise): class _StubSupervise(supervise.Supervise):
+19 -3
View File
@@ -2,9 +2,6 @@
The curses TUI itself isn't exercised here — these tests cover the The curses TUI itself isn't exercised here — these tests cover the
discovery + approve/reject paths that the TUI's key handlers call into. discovery + approve/reject paths that the TUI's key handlers call into.
egress-block (add_route) was removed in issue #198; the TestEgressApplyWiring
class and all stubs for add_route have been dropped accordingly.
""" """
import os import os
@@ -12,6 +9,7 @@ import tempfile
import unittest import unittest
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from unittest.mock import patch
from bot_bottle import supervise from bot_bottle import supervise
from bot_bottle.cli import supervise as supervise_cli from bot_bottle.cli import supervise as supervise_cli
@@ -33,6 +31,8 @@ FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal: def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
payloads = { payloads = {
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n", TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
supervise.TOOL_ALLOW: "routes:\n - host: example.com\n",
supervise.TOOL_EGRESS_BLOCK: "routes:\n - host: example.com\n",
} }
payload = payloads.get(tool, "") payload = payloads.get(tool, "")
return Proposal.new( return Proposal.new(
@@ -154,6 +154,22 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
supervise_cli.approve(qp) supervise_cli.approve(qp)
self.assertEqual([], read_audit_entries("egress", "dev")) self.assertEqual([], read_audit_entries("egress", "dev"))
def test_approve_egress_block_writes_audit_log(self):
qp = self._enqueue(tool=supervise.TOOL_EGRESS_BLOCK)
with patch(
"bot_bottle.cli.supervise.apply_routes_change",
return_value=("routes: []\n", "routes:\n - host: example.com\n"),
) as apply_routes_change:
supervise_cli.approve(qp)
apply_routes_change.assert_called_once_with(
"dev",
"routes:\n - host: example.com\n",
)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertEqual(STATUS_APPROVED, entries[0].operator_action)
self.assertEqual("needed for dev", entries[0].justification)
# class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase): # class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
# # DISABLED — capability_apply functionality is currently commented out. # # DISABLED — capability_apply functionality is currently commented out.
+45 -5
View File
@@ -54,13 +54,19 @@ class TestValidation(unittest.TestCase):
) )
def test_empty_proposed_file_rejected_for_tools_with_file_field(self): def test_empty_proposed_file_rejected_for_tools_with_file_field(self):
# egress-block has structured input (validated in
# _validate_and_bundle_egress_route, not here) and
# list-egress-routes takes no input. Only capability-block
# goes through `validate_proposed_file`.
with self.assertRaises(_RpcError): with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_CAPABILITY_BLOCK, " \n\t") validate_proposed_file(_sv.TOOL_CAPABILITY_BLOCK, " \n\t")
def test_egress_routes_yaml_is_validated(self):
validate_proposed_file(
_sv.TOOL_ALLOW,
"routes:\n - host: example.com\n",
)
def test_invalid_egress_routes_yaml_rejected(self):
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_EGRESS_BLOCK, "routes: nope\n")
# --- JSON-RPC parsing ------------------------------------------------------ # --- JSON-RPC parsing ------------------------------------------------------
@@ -141,7 +147,9 @@ class TestHandleToolsList(unittest.TestCase):
names = [t["name"] for t in result["tools"]] # type: ignore[index] names = [t["name"] for t in result["tools"]] # type: ignore[index]
self.assertEqual( self.assertEqual(
sorted([ sorted([
_sv.TOOL_ALLOW,
_sv.TOOL_CAPABILITY_BLOCK, _sv.TOOL_CAPABILITY_BLOCK,
_sv.TOOL_EGRESS_BLOCK,
_sv.TOOL_LIST_EGRESS_ROUTES, _sv.TOOL_LIST_EGRESS_ROUTES,
]), ]),
sorted(names), sorted(names),
@@ -172,6 +180,17 @@ class TestHandleToolsList(unittest.TestCase):
# No `required` array because no inputs are required. # No `required` array because no inputs are required.
self.assertNotIn("required", schema) # type: ignore[operator] self.assertNotIn("required", schema) # type: ignore[operator]
def test_egress_tools_take_routes_yaml_and_justification(self):
for tool_name in (_sv.TOOL_ALLOW, _sv.TOOL_EGRESS_BLOCK):
with self.subTest(tool_name=tool_name):
tool = next(t for t in TOOL_DEFINITIONS if t["name"] == tool_name)
schema = tool["inputSchema"]
self.assertEqual("object", schema["type"]) # type: ignore[index]
self.assertEqual(
["routes_yaml", "justification"],
schema["required"], # type: ignore[index]
)
class TestHandleToolsCall(unittest.TestCase): class TestHandleToolsCall(unittest.TestCase):
def setUp(self): def setUp(self):
@@ -220,6 +239,26 @@ class TestHandleToolsCall(unittest.TestCase):
self.assertIn("status: approved", text) self.assertIn("status: approved", text)
self.assertIn("notes: lgtm", text) self.assertIn("notes: lgtm", text)
def test_allow_round_trips_through_queue(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_APPROVED, notes="ok")
try:
result = handle_tools_call(
{
"name": _sv.TOOL_ALLOW,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "need example.com",
},
},
self.config,
)
finally:
responder.join()
self.assertFalse(result["isError"]) # type: ignore[index]
text = result["content"][0]["text"] # type: ignore[index]
self.assertIn("status: approved", text)
self.assertIn("notes: ok", text)
def test_rejected_response_sets_isError(self): def test_rejected_response_sets_isError(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_REJECTED, notes="nope") responder = self._respond_when_proposal_appears(_sv.STATUS_REJECTED, notes="nope")
try: try:
@@ -412,7 +451,8 @@ class TestHttpEndToEnd(unittest.TestCase):
self.assertEqual(1, result["id"]) self.assertEqual(1, result["id"])
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index] names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names) self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
self.assertNotIn("egress-block", names) self.assertIn(_sv.TOOL_ALLOW, names)
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
def test_unknown_method_returns_jsonrpc_error(self): def test_unknown_method_returns_jsonrpc_error(self):
result = self._post_jsonrpc( result = self._post_jsonrpc(