fix(supervise): apply egress approvals
lint / lint (push) Failing after 1m34s
test / unit (pull_request) Successful in 33s
test / integration (pull_request) Successful in 15s

This commit is contained in:
2026-06-23 01:33:35 -04:00
parent 4c39b45e34
commit 0ec1085238
6 changed files with 140 additions and 28 deletions
+70 -6
View File
@@ -1,17 +1,20 @@
"""Host-side helper for egress sidecar inspection (issue #198).
"""Host-side helper for egress sidecar inspection and live updates.
`_merge_single_route`, `add_route`, and `apply_routes_change` were
removed when the egress-block MCP tool was dropped. The remaining
helpers support runtime inspection and validation of the routes file
without modifying it at runtime.
The approve path uses this module to validate a proposed routes file,
write it to the bottle's live egress state dir, and signal the sidecar
bundle so the mitmproxy addon reloads it.
"""
from __future__ import annotations
import os
import subprocess
from pathlib import Path
from ...bottle_state import egress_state_dir, read_metadata
from ...egress import EGRESS_ROUTES_IN_CONTAINER
from ...egress_addon_core import load_routes
from ...log import warn
from .sidecar_bundle import sidecar_bundle_container_name
@@ -29,10 +32,22 @@ def fetch_current_routes(slug: str) -> str:
raise EgressApplyError(
f"could not read routes.yaml from {container}: "
f"{(r.stderr or '').strip() or 'container not running?'}"
)
)
return r.stdout
def apply_routes_change(slug: str, content: str) -> tuple[str, str]:
"""Persist `content` to the live routes file and reload egress."""
validate_routes_content(content)
routes_path = _routes_path(slug)
routes_path.parent.mkdir(parents=True, exist_ok=True)
before = routes_path.read_text(encoding="utf-8") if routes_path.exists() else ""
routes_path.write_text(content, encoding="utf-8")
routes_path.chmod(0o600)
_signal_bundle_reload(slug)
return before, content
def validate_routes_content(content: str) -> None:
try:
load_routes(content)
@@ -42,8 +57,57 @@ def validate_routes_content(content: str) -> None:
) from e
def _routes_path(slug: str) -> Path:
return egress_state_dir(slug) / "egress_routes.yaml"
def _signal_bundle_reload(slug: str) -> None:
container = sidecar_bundle_container_name(slug)
backend = ""
metadata = read_metadata(slug)
if metadata is not None:
backend = metadata.backend
candidates: list[list[str]]
if backend == "macos-container":
candidates = [["container", "kill", "--signal", "HUP", container]]
elif backend:
candidates = [["docker", "kill", "--signal", "HUP", container]]
else:
candidates = [
["docker", "kill", "--signal", "HUP", container],
["container", "kill", "--signal", "HUP", container],
]
last_error = ""
for argv in candidates:
try:
result = subprocess.run(
argv,
capture_output=True,
text=True,
check=False,
env=os.environ,
)
except FileNotFoundError as e:
last_error = str(e)
continue
if result.returncode == 0:
return
last_error = (result.stderr or "").strip() or (result.stdout or "").strip()
warn(
f"egress: routes updated on disk for {slug}, but bundle reload failed: "
f"{last_error or 'no reload command succeeded'}"
)
raise EgressApplyError(
f"could not reload egress bundle {container}: "
f"{last_error or 'no reload command succeeded'}"
)
__all__ = [
"EgressApplyError",
"apply_routes_change",
"fetch_current_routes",
"validate_routes_content",
]
+2 -14
View File
@@ -12,7 +12,6 @@ from __future__ import annotations
import dataclasses
import os
import shutil
import subprocess
from contextlib import ExitStack, contextmanager
from pathlib import Path
@@ -364,8 +363,8 @@ def _sidecar_mounts(
))
if ep.routes:
mounts.append((
str(_stage_routes_dir(plan)),
str(Path(EGRESS_ROUTES_IN_CONTAINER).parent),
str(ep.routes_path),
EGRESS_ROUTES_IN_CONTAINER,
True,
))
@@ -375,17 +374,6 @@ def _sidecar_mounts(
return tuple(mounts)
def _stage_routes_dir(plan: MacosContainerBottlePlan) -> Path:
routes_dir = plan.stage_dir / "macos-container-egress"
routes_dir.mkdir(parents=True, exist_ok=True)
shutil.copyfile(
plan.egress_plan.routes_path,
routes_dir / Path(EGRESS_ROUTES_IN_CONTAINER).name,
)
return routes_dir
def _mount_spec(host_path: str, container_path: str, read_only: bool) -> str:
spec = f"type=bind,source={host_path},target={container_path}"
if read_only:
+11 -1
View File
@@ -26,6 +26,10 @@ from .. import supervise as _supervise
# CapabilityApplyError,
# apply_capability_change,
# )
from ..backend.docker.egress_apply import (
EgressApplyError,
apply_routes_change,
)
from ..log import Die, error, info
@@ -66,7 +70,7 @@ class QueuedProposal:
# Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses.
ApplyError = (CapabilityApplyError,)
ApplyError = (CapabilityApplyError, EgressApplyError)
def discover_pending() -> list[QueuedProposal]:
@@ -134,6 +138,7 @@ def approve(
) -> None:
"""Apply the proposal, write the waiting response, and audit it."""
status = STATUS_MODIFIED if final_file is not None else STATUS_APPROVED
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
diff_before, diff_after = "", ""
# if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
@@ -147,6 +152,11 @@ def approve(
# diff_before, diff_after = apply_capability_change(
# qp.proposal.bottle_slug, file_to_apply,
# )
if qp.proposal.tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK):
diff_before, diff_after = apply_routes_change(
qp.proposal.bottle_slug,
file_to_apply,
)
response = Response(
proposal_id=qp.proposal.id,
+46
View File
@@ -2,10 +2,16 @@
add_route removed; docker exec / cp / kill paths are covered by the
integration test)."""
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
from bot_bottle import supervise
from bot_bottle.backend.docker.egress_apply import (
EgressApplyError,
apply_routes_change,
validate_routes_content,
)
@@ -52,5 +58,45 @@ class TestValidateRoutesContent(unittest.TestCase):
)
class TestApplyRoutesChange(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="egress-apply-test.")
original = supervise.bot_bottle_root
def fake_root() -> Path:
return Path(self._tmp.name) / ".bot-bottle"
supervise.bot_bottle_root = fake_root # type: ignore[assignment]
self.addCleanup(lambda: setattr(supervise, "bot_bottle_root", original))
self.addCleanup(self._tmp.cleanup)
def test_writes_live_routes_and_signals_reload(self):
calls: list[list[str]] = []
def fake_run(argv, **kwargs):
calls.append(list(argv))
return SimpleNamespace(returncode=0, stdout="", stderr="")
with patch(
"bot_bottle.backend.docker.egress_apply.subprocess.run",
side_effect=fake_run,
):
before, after = apply_routes_change(
"dev",
"routes:\n - host: google.com\n",
)
self.assertEqual("", before)
self.assertEqual("routes:\n - host: google.com\n", after)
self.assertEqual(
"routes:\n - host: google.com\n",
(Path(self._tmp.name) / ".bot-bottle/state/dev/egress/egress_routes.yaml").read_text(encoding="utf-8"),
)
self.assertEqual(
["docker", "kill", "--signal", "HUP", "bot-bottle-sidecars-dev"],
calls[0],
)
if __name__ == "__main__":
unittest.main()
+1 -6
View File
@@ -125,15 +125,10 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
f"type=bind,source={self.stage_dir / 'egress-ca'},target=/home/mitmproxy/.mitmproxy",
argv,
)
routes_dir = self.stage_dir / "macos-container-egress"
self.assertIn(
f"type=bind,source={routes_dir},target=/etc/egress,readonly",
f"type=bind,source={self.stage_dir / 'source-routes.yaml'},target=/etc/egress/routes.yaml,readonly",
argv,
)
self.assertEqual(
"routes: []\n",
(routes_dir / "routes.yaml").read_text(encoding="utf-8"),
)
self.assertIn(
"type=bind,source=/state/supervise/queue,target=/run/supervise/queue",
argv,
+10 -1
View File
@@ -9,6 +9,7 @@ import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import patch
from bot_bottle import supervise
from bot_bottle.cli import supervise as supervise_cli
@@ -155,7 +156,15 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def test_approve_egress_block_writes_audit_log(self):
qp = self._enqueue(tool=supervise.TOOL_EGRESS_BLOCK)
supervise_cli.approve(qp)
with patch(
"bot_bottle.cli.supervise.apply_routes_change",
return_value=("routes: []\n", "routes:\n - host: example.com\n"),
) as apply_routes_change:
supervise_cli.approve(qp)
apply_routes_change.assert_called_once_with(
"dev",
"routes:\n - host: example.com\n",
)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertEqual(STATUS_APPROVED, entries[0].operator_action)