From 1542ee0b93907743a45eafd71fbb12e7fb5f845f Mon Sep 17 00:00:00 2001 From: didericis Date: Mon, 25 May 2026 18:45:17 -0400 Subject: [PATCH] feat(egress-proxy-block): single-route input + merge-on-apply MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of asking the agent to compose and submit a full routes file, the tool now takes ONE proposed route — host + optional path_allowlist + optional auth — and the supervisor merges it into the live routes table at approval time. The agent no longer needs to fetch / reproduce / extend the existing allowlist; it just describes the host it wants reachable. Tool input (new): - `host` (required) - `path_allowlist` (optional, array of absolute path prefixes) - `auth` (optional, {scheme, token_ref}) - `justification` (required) Merge semantics (in `egress_proxy_apply._merge_single_route`): - Host NOT in current routes → append the proposed route as a new entry. If `auth` is set, assign the next EGRESS_PROXY_TOKEN_N slot. - Host already present → union the proposed `path_allowlist` with the existing one (proposed entries appended after existing, deduped). Existing `auth_scheme` / `token_env` preserved; proposed `auth` ignored (operator-controlled, not agent-controlled). - Hostname comparison is case-insensitive. Dashboard wiring: `approve()` on an egress-proxy-block proposal now calls `add_route(slug, proposed_route_json)` instead of `apply_routes_change(slug, full_file)`. add_route fetches the current routes from the running egress-proxy, merges, and calls apply_routes_change with the merged content — so the pipelock-mirror + SIGHUP plumbing from chunk 3 still runs end-to-end. Audit diff still captures the full-file before/after. Tool description rewritten to make the new shape obvious and to stop pointing the agent at the routes file. The `list-egress-proxy-routes` tool stays available for agents that want to see what's currently allowed. Tests: 9 new `_merge_single_route` cases (host absent/present, path-allowlist union+dedup, auth-slot indexing, case-insensitive match, existing-auth preservation, missing-host rejection, malformed-current rejection). 407 unit + integration pass. Co-Authored-By: Claude Opus 4.7 --- .../backend/docker/egress_proxy_apply.py | 110 +++++++++++ claude_bottle/cli/dashboard.py | 9 +- claude_bottle/supervise_server.py | 183 ++++++++++++++---- tests/integration/test_supervise_sidecar.py | 16 +- tests/unit/test_dashboard.py | 61 +++--- tests/unit/test_egress_proxy_apply.py | 106 ++++++++++ tests/unit/test_supervise_server.py | 30 +-- 7 files changed, 419 insertions(+), 96 deletions(-) diff --git a/claude_bottle/backend/docker/egress_proxy_apply.py b/claude_bottle/backend/docker/egress_proxy_apply.py index d828f68..535552b 100644 --- a/claude_bottle/backend/docker/egress_proxy_apply.py +++ b/claude_bottle/backend/docker/egress_proxy_apply.py @@ -22,6 +22,7 @@ operator can retry. from __future__ import annotations +import json import os import subprocess import tempfile @@ -180,8 +181,117 @@ def apply_routes_change(slug: str, new_content: str) -> tuple[str, str]: return before, new_content +def _merge_single_route( + current_yaml: str, new_route: dict[str, object], +) -> str: + """Merge a single proposed route into the current routes.yaml + content, returning the merged JSON-as-yaml string. + + Behavior: + - If `new_route['host']` is NOT in the current routes → + append the route. + - If the host IS already present → union the path_allowlist + entries (proposed ∪ existing). The existing `auth_scheme` + and `token_env` are preserved — agent-proposed auth changes + on an existing host are ignored, matching the tool's + documented semantics. + + The supervisor renders the merged routes.yaml with the same + JSON layout the addon expects (host + path_allowlist + + auth_scheme + token_env). Token VALUES never appear here; the + routes file carries only env-var slot NAMES.""" + try: + cfg = json.loads(current_yaml) + except json.JSONDecodeError as e: + raise EgressProxyApplyError( + f"current routes.yaml is not valid JSON: {e}" + ) from e + routes = cfg.get("routes") + if not isinstance(routes, list): + raise EgressProxyApplyError( + "current routes.yaml: 'routes' is not a list" + ) + + new_host = str(new_route.get("host", "")).lower() + if not new_host: + raise EgressProxyApplyError( + "proposed route is missing 'host'" + ) + + proposed_paths = list(new_route.get("path_allowlist") or []) + + # Look for an existing entry with the same host (case-insensitive). + for entry in routes: + if not isinstance(entry, dict): + continue + if str(entry.get("host", "")).lower() == new_host: + # Merge path_allowlist: union proposed + existing, ordered + # by first-seen so existing paths stay in original order. + existing_paths: list[str] = list(entry.get("path_allowlist") or []) + seen = {p: None for p in existing_paths} + for p in proposed_paths: + seen.setdefault(p, None) + merged_paths = list(seen.keys()) + if merged_paths: + entry["path_allowlist"] = merged_paths + # Preserve existing auth — tool description says agent- + # proposed auth on an existing host is ignored. + break + else: + # Host not present; build a new route entry from the + # proposed fields. Need to assign a token_env slot if + # `auth` was proposed (otherwise the addon's parser rejects + # a half-set auth pair). Slots: count existing slots, pick + # the next free index. + entry = {"host": new_route["host"]} + if proposed_paths: + entry["path_allowlist"] = proposed_paths + auth = new_route.get("auth") + if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): + existing_slots = sorted({ + str(r.get("token_env")) + for r in routes + if isinstance(r, dict) and r.get("token_env") + }) + next_idx = len(existing_slots) + entry["auth_scheme"] = str(auth["scheme"]) + entry["token_env"] = f"EGRESS_PROXY_TOKEN_{next_idx}" + # NOTE: the addon reads token VALUES from its container's + # environ keyed by token_env. A newly-added auth route at + # runtime points at a slot that has no env value → the + # addon will 403 with "token env unset" until the operator + # arranges for the value to land in the container's env. + # Recording this here so the operator-facing diff carries + # the slot name they'll need to provision. + routes.append(entry) + + cfg["routes"] = routes + return json.dumps(cfg, indent=2) + "\n" + + +def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]: + """Apply a single-route addition to the egress-proxy. Parses the + agent's proposed route, fetches the current routes file, merges, + and applies via `apply_routes_change`. Returns (before, after) + full-file content for the audit log.""" + try: + proposed = json.loads(proposed_route_json) + except json.JSONDecodeError as e: + raise EgressProxyApplyError( + f"proposed route is not valid JSON: {e}" + ) from e + if not isinstance(proposed, dict): + raise EgressProxyApplyError( + "proposed route must be a JSON object" + ) + current = fetch_current_routes(slug) + merged = _merge_single_route(current, proposed) + return apply_routes_change(slug, merged) + + __all__ = [ "EgressProxyApplyError", + "add_route", "apply_routes_change", "fetch_current_routes", "validate_routes_content", diff --git a/claude_bottle/cli/dashboard.py b/claude_bottle/cli/dashboard.py index f41db21..f94b0b7 100644 --- a/claude_bottle/cli/dashboard.py +++ b/claude_bottle/cli/dashboard.py @@ -29,6 +29,7 @@ from ..backend.docker.capability_apply import ( ) from ..backend.docker.egress_proxy_apply import ( EgressProxyApplyError, + add_route, apply_routes_change, fetch_current_routes, ) @@ -166,7 +167,13 @@ def approve( diff_before, diff_after = "", "" if qp.proposal.tool == TOOL_EGRESS_PROXY_BLOCK: - diff_before, diff_after = apply_routes_change( + # The proposal is a single-route JSON; add_route fetches the + # current routes from the running egress-proxy, merges the + # new route in, and applies the full merged file. The + # audit log gets the BEFORE/AFTER of the full file so the + # diff renders cleanly even though the agent only proposed + # one entry. + diff_before, diff_after = add_route( qp.proposal.bottle_slug, file_to_apply, ) elif qp.proposal.tool == TOOL_PIPELOCK_BLOCK: diff --git a/claude_bottle/supervise_server.py b/claude_bottle/supervise_server.py index a8fd5b0..dd455e9 100644 --- a/claude_bottle/supervise_server.py +++ b/claude_bottle/supervise_server.py @@ -134,31 +134,61 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [ "description": ( "Call when egress-proxy refused your HTTPS request — host " "without a matching route, or a path outside the route's " - "path_allowlist (typically a 403 from the proxy). First " - "call `list-egress-proxy-routes` to see the current route " - "table; compose a modified version that adds or relaxes " - "the route you need, and pass the full new file plus a " - "justification. The operator approves or rejects in the " - "supervise TUI. On approval the supervisor writes the " - "new routes.yaml on the host, SIGHUPs egress-proxy (the " - "addon's reload swaps the route table atomically without " - "dropping in-flight connections), and mirrors the route " - "hosts onto pipelock's allowlist so the downstream gate " - "lets them through too." + "path_allowlist (typically a 403 from the proxy). Propose " + "a SINGLE route to add: the host you need + (optionally) " + "a path_allowlist + (optionally) an auth block. The " + "supervisor merges the route into the live table at " + "approval time — you do NOT need to see or reproduce the " + "existing routes, and you do not pass a full routes file. " + "If the host already has a route, the proposed " + "path_allowlist entries are unioned with the existing " + "ones (host stays single-route). The operator approves " + "or rejects in the supervise TUI. On approval the " + "supervisor writes the merged routes.yaml, SIGHUPs " + "egress-proxy (atomic swap, no dropped connections), and " + "mirrors the host onto pipelock's allowlist for the " + "downstream gate." ), "inputSchema": { "type": "object", "properties": { - "routes": { + "host": { "type": "string", - "description": "Full proposed routes.yaml file content (JSON text — every JSON document is valid YAML).", + "description": "The hostname to allow (e.g. 'api.github.com'). Case-insensitive on match.", + }, + "path_allowlist": { + "type": "array", + "items": {"type": "string"}, + "description": ( + "Optional URL path prefixes the route permits. " + "Each must start with '/'. Omit to allow all " + "paths under this host (bare-pass route)." + ), + }, + "auth": { + "type": "object", + "description": ( + "Optional credential injection. {scheme, " + "token_ref}: scheme is 'Bearer' or 'token'; " + "token_ref names the host env var holding the " + "secret value. Omit to add a host without " + "credential injection. Ignored if the host " + "already has a route (operator decides auth " + "changes, not the agent)." + ), + "properties": { + "scheme": {"type": "string"}, + "token_ref": {"type": "string"}, + }, + "required": ["scheme", "token_ref"], + "additionalProperties": False, }, "justification": { "type": "string", - "description": "Why this routes change is justified.", + "description": "Why this host needs to be allowed.", }, }, - "required": ["routes", "justification"], + "required": ["host", "justification"], }, }, { @@ -252,15 +282,22 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [ # tool-specific payload (stored in Proposal.proposed_file as # free-form text the apply path interprets per tool). # -# egress-proxy-block: full proposed routes.yaml +# egress-proxy-block: JSON object describing a SINGLE route to +# add — `{host, path_allowlist?, auth?}`. The +# supervisor merges this into the live routes +# file at approval time. # pipelock-block: the full failed URL (scheme + host + path) — # supervisor extracts the host, merges into the # bottle's current allowlist; the path is shown # to the operator for context (pipelock doesn't # do path-level matching). # capability-block: full proposed Dockerfile +# +# Egress-proxy-block doesn't use a single "field name" → the JSON +# payload is constructed from multiple structured input fields in +# `handle_egress_proxy_block`. The mapping stays one-entry-per-tool +# so the generic dispatch keeps working for the other two. PROPOSED_FILE_FIELD: dict[str, str] = { - _sv.TOOL_EGRESS_PROXY_BLOCK: "routes", _sv.TOOL_PIPELOCK_BLOCK: "failed_url", _sv.TOOL_CAPABILITY_BLOCK: "dockerfile", } @@ -269,26 +306,18 @@ PROPOSED_FILE_FIELD: dict[str, str] = { # --- Validation ------------------------------------------------------------ +# Auth schemes accepted on egress-proxy-block proposals — match the +# manifest-side EGRESS_PROXY_AUTH_SCHEMES. +_AUTH_SCHEMES = ("Bearer", "token") + + def validate_proposed_file(tool: str, content: str) -> None: """Syntactic validation. The operator is the real gate; this just catches obvious paste-errors / wrong-tool selections before they enter the queue.""" if not content.strip(): raise _RpcError(ERR_INVALID_PARAMS, f"{tool}: proposed file is empty") - if tool == _sv.TOOL_EGRESS_PROXY_BLOCK: - try: - parsed = json.loads(content) - except json.JSONDecodeError as e: - raise _RpcError( - ERR_INVALID_PARAMS, - f"{tool}: proposed routes.yaml is not valid JSON: {e}", - ) from e - if not isinstance(parsed, dict) or not isinstance(parsed.get("routes"), list): - raise _RpcError( - ERR_INVALID_PARAMS, - f"{tool}: proposed routes.yaml must be an object with a 'routes' array", - ) - elif tool == _sv.TOOL_PIPELOCK_BLOCK: + if tool == _sv.TOOL_PIPELOCK_BLOCK: # `content` is the full failed URL. Require scheme + host so # the supervisor can extract a hostname for the allowlist # merge; the path is preserved for operator context. @@ -312,6 +341,70 @@ def validate_proposed_file(tool: str, content: str) -> None: raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}") +def _validate_and_bundle_egress_route( + args: dict[str, object], +) -> str: + """Validate egress-proxy-block input fields and bundle them into + a JSON string that becomes the Proposal.proposed_file. Raises + _RpcError on bad input — the agent retries with a fixed shape.""" + tool = _sv.TOOL_EGRESS_PROXY_BLOCK + host = args.get("host") + if not isinstance(host, str) or not host.strip(): + raise _RpcError( + ERR_INVALID_PARAMS, + f"{tool}: 'host' is required and must be a non-empty string", + ) + payload: dict[str, object] = {"host": host} + + path_allow_raw = args.get("path_allowlist") + if path_allow_raw is not None: + if not isinstance(path_allow_raw, list): + raise _RpcError( + ERR_INVALID_PARAMS, + f"{tool}: 'path_allowlist' must be an array of strings", + ) + prefixes: list[str] = [] + for i, p in enumerate(path_allow_raw): + if not isinstance(p, str): + raise _RpcError( + ERR_INVALID_PARAMS, + f"{tool}: path_allowlist[{i}] must be a string", + ) + if not p.startswith("/"): + raise _RpcError( + ERR_INVALID_PARAMS, + f"{tool}: path_allowlist[{i}] {p!r} must start with '/'", + ) + prefixes.append(p) + if prefixes: + payload["path_allowlist"] = prefixes + + auth_raw = args.get("auth") + if auth_raw is not None: + if not isinstance(auth_raw, dict): + raise _RpcError( + ERR_INVALID_PARAMS, + f"{tool}: 'auth' must be an object with 'scheme' and 'token_ref'", + ) + scheme = auth_raw.get("scheme") + token_ref = auth_raw.get("token_ref") + if not isinstance(scheme, str) or scheme not in _AUTH_SCHEMES: + raise _RpcError( + ERR_INVALID_PARAMS, + f"{tool}: auth.scheme must be one of " + f"{', '.join(_AUTH_SCHEMES)} (got {scheme!r})", + ) + if not isinstance(token_ref, str) or not token_ref: + raise _RpcError( + ERR_INVALID_PARAMS, + f"{tool}: auth.token_ref must be a non-empty string " + f"naming the host env var holding the token", + ) + payload["auth"] = {"scheme": scheme, "token_ref": token_ref} + + return json.dumps(payload, indent=2) + "\n" + + # --- MCP handlers ---------------------------------------------------------- @@ -384,27 +477,35 @@ def handle_tools_call( raise _RpcError(ERR_INVALID_PARAMS, "tools/call missing 'name'") if name == _sv.TOOL_LIST_EGRESS_PROXY_ROUTES: return handle_list_egress_proxy_routes(params.get("arguments", {}), config) - if name not in PROPOSED_FILE_FIELD: - raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {name!r}") + args_raw = params.get("arguments", {}) if not isinstance(args_raw, dict): raise _RpcError(ERR_INVALID_PARAMS, "tools/call 'arguments' must be an object") - file_field = PROPOSED_FILE_FIELD[name] - proposed_file = args_raw.get(file_field) justification = args_raw.get("justification") - if not isinstance(proposed_file, str): - raise _RpcError( - ERR_INVALID_PARAMS, - f"{name}: '{file_field}' is required and must be a string", - ) if not isinstance(justification, str) or not justification.strip(): raise _RpcError( ERR_INVALID_PARAMS, f"{name}: 'justification' is required and must be a non-empty string", ) - validate_proposed_file(name, proposed_file) + if name == _sv.TOOL_EGRESS_PROXY_BLOCK: + # Structured input → JSON bundle on Proposal.proposed_file. + # The dashboard's apply step (egress_proxy_apply.add_route) + # parses this JSON, fetches the current routes, merges in + # the new one, and writes the merged file. + proposed_file = _validate_and_bundle_egress_route(args_raw) + elif name in PROPOSED_FILE_FIELD: + file_field = PROPOSED_FILE_FIELD[name] + proposed_file = args_raw.get(file_field) + if not isinstance(proposed_file, str): + raise _RpcError( + ERR_INVALID_PARAMS, + f"{name}: '{file_field}' is required and must be a string", + ) + validate_proposed_file(name, proposed_file) + else: + raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {name!r}") proposal = _sv.Proposal.new( bottle_slug=config.bottle_slug, diff --git a/tests/integration/test_supervise_sidecar.py b/tests/integration/test_supervise_sidecar.py index fc19f23..eb33e5e 100644 --- a/tests/integration/test_supervise_sidecar.py +++ b/tests/integration/test_supervise_sidecar.py @@ -218,13 +218,13 @@ class TestSuperviseSidecar(unittest.TestCase): self._bring_up_sidecar() # Stub the apply step. The dashboard's approve() calls - # apply_routes_change to docker-exec into the egress-proxy - # sidecar; this test isn't exercising the real sidecar, so - # patch it to a no-op that returns plausible before/after - # strings the audit-log writer can render. + # add_route to docker-exec into the egress-proxy sidecar; + # this test isn't exercising the real sidecar, so patch it + # to a no-op that returns plausible before/after strings + # the audit-log writer can render. from claude_bottle.cli import dashboard as _dash - original_apply = _dash.apply_routes_change - _dash.apply_routes_change = ( + original_apply = _dash.add_route + _dash.add_route = ( lambda slug, new: ("(stubbed before)", new) ) @@ -236,7 +236,7 @@ class TestSuperviseSidecar(unittest.TestCase): "params": { "name": _sv.TOOL_EGRESS_PROXY_BLOCK, "arguments": { - "routes": '{"routes": [{"host": "api.example.com"}]}', + "host": "api.example.com", "justification": "integration test", }, }, @@ -270,7 +270,7 @@ class TestSuperviseSidecar(unittest.TestCase): # file and returns to the curl caller. dashboard.approve(qp, notes="lgtm from integration test") finally: - _dash.apply_routes_change = original_apply + _dash.add_route = original_apply t.join(timeout=20) response = captured.get("response") diff --git a/tests/unit/test_dashboard.py b/tests/unit/test_dashboard.py index 39ce94a..834d9ce 100644 --- a/tests/unit/test_dashboard.py +++ b/tests/unit/test_dashboard.py @@ -127,14 +127,14 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase): class TestApproveReject(_FakeHomeMixin, unittest.TestCase): def setUp(self): self._setup_fake_home() - self._original_apply_routes = dashboard.apply_routes_change + self._original_add_route = dashboard.add_route self._original_apply_allowlist = dashboard.apply_allowlist_change self._original_fetch_allowlist = dashboard.fetch_current_allowlist self._original_apply_capability = dashboard.apply_capability_change # Default stubs: succeed with deterministic before/after so the # audit log shows a non-empty diff. - dashboard.apply_routes_change = lambda slug, content: ( - '{"routes": []}\n', content, + dashboard.add_route = lambda slug, content: ( + '{"routes": []}\n', '{"routes": [{"host": "x"}]}\n', ) dashboard.apply_allowlist_change = lambda slug, content: ( "old.example\n", content, @@ -145,7 +145,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): ) def tearDown(self): - dashboard.apply_routes_change = self._original_apply_routes + dashboard.add_route = self._original_add_route dashboard.apply_allowlist_change = self._original_apply_allowlist dashboard.fetch_current_allowlist = self._original_fetch_allowlist dashboard.apply_capability_change = self._original_apply_capability @@ -204,19 +204,19 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): class TestEgressProxyApplyWiring(_FakeHomeMixin, unittest.TestCase): - """PRD 0014 Phase 3: approve() on a egress-proxy-block proposal - must call apply_routes_change with the right args and surface - its failures.""" + """PRD 0017 chunk 3: approve() on an egress-proxy-block proposal + must call add_route (single-route merge) with the right args + and surface its failures.""" def setUp(self): self._setup_fake_home() - self._original_apply = dashboard.apply_routes_change + self._original_add_route = dashboard.add_route def tearDown(self): - dashboard.apply_routes_change = self._original_apply + dashboard.add_route = self._original_add_route self._teardown_fake_home() - def _enqueue_egress_proxy(self, proposed: str = '{"routes": []}\n'): + def _enqueue_egress_proxy(self, proposed: str = '{"host": "x.example"}\n'): p = Proposal.new( bottle_slug="dev", tool=TOOL_EGRESS_PROXY_BLOCK, proposed_file=proposed, @@ -229,29 +229,40 @@ class TestEgressProxyApplyWiring(_FakeHomeMixin, unittest.TestCase): supervise.write_proposal(qdir, p) return dashboard.QueuedProposal(proposal=p, queue_dir=qdir) - def test_egress_proxy_block_calls_apply_with_proposed_file(self): + def test_egress_proxy_block_calls_add_route_with_proposed_json(self): calls = [] - dashboard.apply_routes_change = lambda slug, content: ( - calls.append((slug, content)) or ("before", content) + dashboard.add_route = lambda slug, content: ( + calls.append((slug, content)) or ("before", "after") + ) + qp = self._enqueue_egress_proxy( + proposed='{"host": "new.example", "path_allowlist": ["/x/"]}\n' ) - qp = self._enqueue_egress_proxy(proposed='{"routes": [{"path": "/new/"}]}\n') dashboard.approve(qp) self.assertEqual(1, len(calls)) slug, content = calls[0] self.assertEqual("dev", slug) - self.assertEqual('{"routes": [{"path": "/new/"}]}\n', content) + # The single-route JSON the agent proposed reaches add_route + # unchanged — add_route fetches current state + merges. + self.assertEqual( + '{"host": "new.example", "path_allowlist": ["/x/"]}\n', + content, + ) - def test_modify_passes_final_file_to_apply(self): + def test_modify_passes_final_file_to_add_route(self): calls = [] - dashboard.apply_routes_change = lambda slug, content: ( - calls.append(content) or ("before", content) + dashboard.add_route = lambda slug, content: ( + calls.append(content) or ("before", "after") ) qp = self._enqueue_egress_proxy() - dashboard.approve(qp, final_file='{"routes": [{"path": "/edited/"}]}\n', notes="tweaked") - self.assertEqual(['{"routes": [{"path": "/edited/"}]}\n'], calls) + dashboard.approve( + qp, + final_file='{"host": "edited.example"}\n', + notes="tweaked", + ) + self.assertEqual(['{"host": "edited.example"}\n'], calls) def test_apply_failure_blocks_response_and_audit(self): - dashboard.apply_routes_change = lambda slug, content: (_ for _ in ()).throw( + dashboard.add_route = lambda slug, content: (_ for _ in ()).throw( EgressProxyApplyError("docker exec failed") ) qp = self._enqueue_egress_proxy() @@ -266,15 +277,15 @@ class TestEgressProxyApplyWiring(_FakeHomeMixin, unittest.TestCase): self.assertEqual([], read_audit_entries("egress-proxy", "dev")) def test_real_diff_lands_in_audit(self): - dashboard.apply_routes_change = lambda slug, content: ( + dashboard.add_route = lambda slug, content: ( '{"routes": []}\n', # before - '{"routes": [{"path": "/new/"}]}\n', # after + '{"routes": [{"host": "new.example"}]}\n', # after ) - qp = self._enqueue_egress_proxy(proposed='{"routes": [{"path": "/new/"}]}\n') + qp = self._enqueue_egress_proxy(proposed='{"host": "new.example"}\n') dashboard.approve(qp) entries = read_audit_entries("egress-proxy", "dev") self.assertEqual(1, len(entries)) - self.assertIn('+{"routes": [{"path": "/new/"}]}', entries[0].diff) + self.assertIn('+{"routes": [{"host": "new.example"}]}', entries[0].diff) self.assertIn('-{"routes": []}', entries[0].diff) def test_reject_does_not_call_apply(self): diff --git a/tests/unit/test_egress_proxy_apply.py b/tests/unit/test_egress_proxy_apply.py index 2cd7428..f300db1 100644 --- a/tests/unit/test_egress_proxy_apply.py +++ b/tests/unit/test_egress_proxy_apply.py @@ -4,9 +4,12 @@ integration test.""" import unittest +import json + from claude_bottle.backend.docker.egress_proxy_apply import ( EgressProxyApplyError, _hosts_in_routes, + _merge_single_route, validate_routes_content, ) @@ -83,5 +86,108 @@ class TestHostsInRoutes(unittest.TestCase): _hosts_in_routes('{"routes": [{"path": "/no-host/"}]}') +class TestMergeSingleRoute(unittest.TestCase): + BASE = '{"routes": [{"host": "api.anthropic.com"}]}' + + def test_appends_route_when_host_absent(self): + merged = _merge_single_route(self.BASE, {"host": "github.com"}) + routes = json.loads(merged)["routes"] + hosts = [r["host"] for r in routes] + self.assertEqual(["api.anthropic.com", "github.com"], hosts) + + def test_appends_path_allowlist(self): + merged = _merge_single_route( + self.BASE, + {"host": "github.com", "path_allowlist": ["/repos/x/"]}, + ) + new_route = json.loads(merged)["routes"][-1] + self.assertEqual(["/repos/x/"], new_route["path_allowlist"]) + + def test_appends_auth_with_token_env_slot(self): + merged = _merge_single_route( + self.BASE, + { + "host": "api.github.com", + "auth": {"scheme": "Bearer", "token_ref": "GH"}, + }, + ) + new_route = json.loads(merged)["routes"][-1] + self.assertEqual("Bearer", new_route["auth_scheme"]) + # First auth slot when no prior auth routes exist. + self.assertEqual("EGRESS_PROXY_TOKEN_0", new_route["token_env"]) + + def test_auth_slot_increments_past_existing(self): + base = json.dumps({"routes": [ + {"host": "api.anthropic.com", + "auth_scheme": "Bearer", + "token_env": "EGRESS_PROXY_TOKEN_0"}, + ]}) + merged = _merge_single_route(base, { + "host": "api.github.com", + "auth": {"scheme": "Bearer", "token_ref": "GH"}, + }) + new_route = json.loads(merged)["routes"][-1] + self.assertEqual("EGRESS_PROXY_TOKEN_1", new_route["token_env"]) + + def test_existing_host_merges_path_allowlist_as_union(self): + base = json.dumps({"routes": [ + {"host": "github.com", "path_allowlist": ["/a/"]}, + ]}) + merged = _merge_single_route(base, { + "host": "github.com", + "path_allowlist": ["/b/"], + }) + routes = json.loads(merged)["routes"] + self.assertEqual(1, len(routes)) # not duplicated + self.assertEqual(["/a/", "/b/"], routes[0]["path_allowlist"]) + + def test_existing_host_dedup_path_allowlist(self): + base = json.dumps({"routes": [ + {"host": "github.com", "path_allowlist": ["/a/"]}, + ]}) + merged = _merge_single_route(base, { + "host": "github.com", + "path_allowlist": ["/a/", "/b/"], + }) + self.assertEqual( + ["/a/", "/b/"], + json.loads(merged)["routes"][0]["path_allowlist"], + ) + + def test_existing_host_preserves_existing_auth_ignores_proposed(self): + # Tool docs: auth on an existing host is operator-controlled, + # not agent-controlled. The merge must not overwrite. + base = json.dumps({"routes": [ + {"host": "api.github.com", + "auth_scheme": "Bearer", + "token_env": "EGRESS_PROXY_TOKEN_0"}, + ]}) + merged = _merge_single_route(base, { + "host": "api.github.com", + "auth": {"scheme": "token", "token_ref": "DIFFERENT"}, + }) + route = json.loads(merged)["routes"][0] + self.assertEqual("Bearer", route["auth_scheme"]) + self.assertEqual("EGRESS_PROXY_TOKEN_0", route["token_env"]) + + def test_host_match_is_case_insensitive(self): + base = json.dumps({"routes": [{"host": "GitHub.com"}]}) + merged = _merge_single_route(base, { + "host": "github.com", + "path_allowlist": ["/x/"], + }) + routes = json.loads(merged)["routes"] + self.assertEqual(1, len(routes)) + self.assertEqual(["/x/"], routes[0]["path_allowlist"]) + + def test_missing_host_raises(self): + with self.assertRaises(EgressProxyApplyError): + _merge_single_route(self.BASE, {}) + + def test_invalid_current_yaml_raises(self): + with self.assertRaises(EgressProxyApplyError): + _merge_single_route("{not json", {"host": "x.example"}) + + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_supervise_server.py b/tests/unit/test_supervise_server.py index d4616f6..eb7c296 100644 --- a/tests/unit/test_supervise_server.py +++ b/tests/unit/test_supervise_server.py @@ -45,22 +45,6 @@ from claude_bottle.supervise_server import ( class TestValidation(unittest.TestCase): - def test_egress_proxy_block_requires_valid_json(self): - with self.assertRaises(_RpcError) as cm: - validate_proposed_file(_sv.TOOL_EGRESS_PROXY_BLOCK, "{not json") - self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code) - self.assertIn("not valid JSON", cm.exception.message) - - def test_egress_proxy_block_requires_routes_array(self): - with self.assertRaises(_RpcError): - validate_proposed_file(_sv.TOOL_EGRESS_PROXY_BLOCK, '{"other": []}') - - def test_egress_proxy_block_accepts_valid_routes(self): - validate_proposed_file( - _sv.TOOL_EGRESS_PROXY_BLOCK, - '{"routes": [{"path": "/x/", "upstream": "https://example.com"}]}', - ) - def test_pipelock_block_accepts_https_url(self): validate_proposed_file( _sv.TOOL_PIPELOCK_BLOCK, @@ -89,8 +73,12 @@ class TestValidation(unittest.TestCase): "FROM python:3.13\nRUN apk add git\n", ) - def test_empty_proposed_file_rejected_for_all_tools(self): - for tool in _sv.TOOLS: + def test_empty_proposed_file_rejected_for_tools_with_file_field(self): + # egress-proxy-block has structured input (validated in + # _validate_and_bundle_egress_route, not here) and + # list-egress-proxy-routes takes no input. Only the other + # two go through `validate_proposed_file`. + for tool in (_sv.TOOL_PIPELOCK_BLOCK, _sv.TOOL_CAPABILITY_BLOCK): with self.subTest(tool=tool): with self.assertRaises(_RpcError): validate_proposed_file(tool, " \n\t") @@ -243,7 +231,7 @@ class TestHandleToolsCall(unittest.TestCase): { "name": _sv.TOOL_EGRESS_PROXY_BLOCK, "arguments": { - "routes": '{"routes": []}', + "host": "example.com", "justification": "need a route", }, }, @@ -286,7 +274,7 @@ class TestHandleToolsCall(unittest.TestCase): handle_tools_call( { "name": _sv.TOOL_EGRESS_PROXY_BLOCK, - "arguments": {"routes": '{"routes": []}'}, + "arguments": {"host": "example.com"}, }, self.config, ) @@ -298,7 +286,7 @@ class TestHandleToolsCall(unittest.TestCase): { "name": _sv.TOOL_EGRESS_PROXY_BLOCK, "arguments": { - "routes": '{"routes": []}', + "host": "example.com", "justification": "x", }, },