feat(egress-proxy-block): single-route input + merge-on-apply
test / unit (pull_request) Successful in 17s
test / integration (pull_request) Successful in 1m14s

Instead of asking the agent to compose and submit a full routes
file, the tool now takes ONE proposed route — host + optional
path_allowlist + optional auth — and the supervisor merges it
into the live routes table at approval time. The agent no longer
needs to fetch / reproduce / extend the existing allowlist; it
just describes the host it wants reachable.

Tool input (new):
  - `host` (required)
  - `path_allowlist` (optional, array of absolute path prefixes)
  - `auth` (optional, {scheme, token_ref})
  - `justification` (required)

Merge semantics (in `egress_proxy_apply._merge_single_route`):
  - Host NOT in current routes → append the proposed route as a
    new entry. If `auth` is set, assign the next EGRESS_PROXY_TOKEN_N
    slot.
  - Host already present → union the proposed `path_allowlist`
    with the existing one (proposed entries appended after
    existing, deduped). Existing `auth_scheme` / `token_env`
    preserved; proposed `auth` ignored (operator-controlled, not
    agent-controlled).
  - Hostname comparison is case-insensitive.

Dashboard wiring: `approve()` on an egress-proxy-block proposal
now calls `add_route(slug, proposed_route_json)` instead of
`apply_routes_change(slug, full_file)`. add_route fetches the
current routes from the running egress-proxy, merges, and calls
apply_routes_change with the merged content — so the
pipelock-mirror + SIGHUP plumbing from chunk 3 still runs
end-to-end. Audit diff still captures the full-file before/after.

Tool description rewritten to make the new shape obvious and to
stop pointing the agent at the routes file. The
`list-egress-proxy-routes` tool stays available for agents that
want to see what's currently allowed.

Tests: 9 new `_merge_single_route` cases (host absent/present,
path-allowlist union+dedup, auth-slot indexing, case-insensitive
match, existing-auth preservation, missing-host rejection,
malformed-current rejection). 407 unit + integration pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 18:45:17 -04:00
parent 3be70eb07a
commit 1542ee0b93
7 changed files with 419 additions and 96 deletions
@@ -22,6 +22,7 @@ operator can retry.
from __future__ import annotations
import json
import os
import subprocess
import tempfile
@@ -180,8 +181,117 @@ def apply_routes_change(slug: str, new_content: str) -> tuple[str, str]:
return before, new_content
def _merge_single_route(
current_yaml: str, new_route: dict[str, object],
) -> str:
"""Merge a single proposed route into the current routes.yaml
content, returning the merged JSON-as-yaml string.
Behavior:
- If `new_route['host']` is NOT in the current routes →
append the route.
- If the host IS already present → union the path_allowlist
entries (proposed existing). The existing `auth_scheme`
and `token_env` are preserved — agent-proposed auth changes
on an existing host are ignored, matching the tool's
documented semantics.
The supervisor renders the merged routes.yaml with the same
JSON layout the addon expects (host + path_allowlist +
auth_scheme + token_env). Token VALUES never appear here; the
routes file carries only env-var slot NAMES."""
try:
cfg = json.loads(current_yaml)
except json.JSONDecodeError as e:
raise EgressProxyApplyError(
f"current routes.yaml is not valid JSON: {e}"
) from e
routes = cfg.get("routes")
if not isinstance(routes, list):
raise EgressProxyApplyError(
"current routes.yaml: 'routes' is not a list"
)
new_host = str(new_route.get("host", "")).lower()
if not new_host:
raise EgressProxyApplyError(
"proposed route is missing 'host'"
)
proposed_paths = list(new_route.get("path_allowlist") or [])
# Look for an existing entry with the same host (case-insensitive).
for entry in routes:
if not isinstance(entry, dict):
continue
if str(entry.get("host", "")).lower() == new_host:
# Merge path_allowlist: union proposed + existing, ordered
# by first-seen so existing paths stay in original order.
existing_paths: list[str] = list(entry.get("path_allowlist") or [])
seen = {p: None for p in existing_paths}
for p in proposed_paths:
seen.setdefault(p, None)
merged_paths = list(seen.keys())
if merged_paths:
entry["path_allowlist"] = merged_paths
# Preserve existing auth — tool description says agent-
# proposed auth on an existing host is ignored.
break
else:
# Host not present; build a new route entry from the
# proposed fields. Need to assign a token_env slot if
# `auth` was proposed (otherwise the addon's parser rejects
# a half-set auth pair). Slots: count existing slots, pick
# the next free index.
entry = {"host": new_route["host"]}
if proposed_paths:
entry["path_allowlist"] = proposed_paths
auth = new_route.get("auth")
if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"):
existing_slots = sorted({
str(r.get("token_env"))
for r in routes
if isinstance(r, dict) and r.get("token_env")
})
next_idx = len(existing_slots)
entry["auth_scheme"] = str(auth["scheme"])
entry["token_env"] = f"EGRESS_PROXY_TOKEN_{next_idx}"
# NOTE: the addon reads token VALUES from its container's
# environ keyed by token_env. A newly-added auth route at
# runtime points at a slot that has no env value → the
# addon will 403 with "token env unset" until the operator
# arranges for the value to land in the container's env.
# Recording this here so the operator-facing diff carries
# the slot name they'll need to provision.
routes.append(entry)
cfg["routes"] = routes
return json.dumps(cfg, indent=2) + "\n"
def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]:
"""Apply a single-route addition to the egress-proxy. Parses the
agent's proposed route, fetches the current routes file, merges,
and applies via `apply_routes_change`. Returns (before, after)
full-file content for the audit log."""
try:
proposed = json.loads(proposed_route_json)
except json.JSONDecodeError as e:
raise EgressProxyApplyError(
f"proposed route is not valid JSON: {e}"
) from e
if not isinstance(proposed, dict):
raise EgressProxyApplyError(
"proposed route must be a JSON object"
)
current = fetch_current_routes(slug)
merged = _merge_single_route(current, proposed)
return apply_routes_change(slug, merged)
__all__ = [
"EgressProxyApplyError",
"add_route",
"apply_routes_change",
"fetch_current_routes",
"validate_routes_content",
+8 -1
View File
@@ -29,6 +29,7 @@ from ..backend.docker.capability_apply import (
)
from ..backend.docker.egress_proxy_apply import (
EgressProxyApplyError,
add_route,
apply_routes_change,
fetch_current_routes,
)
@@ -166,7 +167,13 @@ def approve(
diff_before, diff_after = "", ""
if qp.proposal.tool == TOOL_EGRESS_PROXY_BLOCK:
diff_before, diff_after = apply_routes_change(
# The proposal is a single-route JSON; add_route fetches the
# current routes from the running egress-proxy, merges the
# new route in, and applies the full merged file. The
# audit log gets the BEFORE/AFTER of the full file so the
# diff renders cleanly even though the agent only proposed
# one entry.
diff_before, diff_after = add_route(
qp.proposal.bottle_slug, file_to_apply,
)
elif qp.proposal.tool == TOOL_PIPELOCK_BLOCK:
+142 -41
View File
@@ -134,31 +134,61 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"description": (
"Call when egress-proxy refused your HTTPS request — host "
"without a matching route, or a path outside the route's "
"path_allowlist (typically a 403 from the proxy). First "
"call `list-egress-proxy-routes` to see the current route "
"table; compose a modified version that adds or relaxes "
"the route you need, and pass the full new file plus a "
"justification. The operator approves or rejects in the "
"supervise TUI. On approval the supervisor writes the "
"new routes.yaml on the host, SIGHUPs egress-proxy (the "
"addon's reload swaps the route table atomically without "
"dropping in-flight connections), and mirrors the route "
"hosts onto pipelock's allowlist so the downstream gate "
"lets them through too."
"path_allowlist (typically a 403 from the proxy). Propose "
"a SINGLE route to add: the host you need + (optionally) "
"a path_allowlist + (optionally) an auth block. The "
"supervisor merges the route into the live table at "
"approval time — you do NOT need to see or reproduce the "
"existing routes, and you do not pass a full routes file. "
"If the host already has a route, the proposed "
"path_allowlist entries are unioned with the existing "
"ones (host stays single-route). The operator approves "
"or rejects in the supervise TUI. On approval the "
"supervisor writes the merged routes.yaml, SIGHUPs "
"egress-proxy (atomic swap, no dropped connections), and "
"mirrors the host onto pipelock's allowlist for the "
"downstream gate."
),
"inputSchema": {
"type": "object",
"properties": {
"routes": {
"host": {
"type": "string",
"description": "Full proposed routes.yaml file content (JSON text — every JSON document is valid YAML).",
"description": "The hostname to allow (e.g. 'api.github.com'). Case-insensitive on match.",
},
"path_allowlist": {
"type": "array",
"items": {"type": "string"},
"description": (
"Optional URL path prefixes the route permits. "
"Each must start with '/'. Omit to allow all "
"paths under this host (bare-pass route)."
),
},
"auth": {
"type": "object",
"description": (
"Optional credential injection. {scheme, "
"token_ref}: scheme is 'Bearer' or 'token'; "
"token_ref names the host env var holding the "
"secret value. Omit to add a host without "
"credential injection. Ignored if the host "
"already has a route (operator decides auth "
"changes, not the agent)."
),
"properties": {
"scheme": {"type": "string"},
"token_ref": {"type": "string"},
},
"required": ["scheme", "token_ref"],
"additionalProperties": False,
},
"justification": {
"type": "string",
"description": "Why this routes change is justified.",
"description": "Why this host needs to be allowed.",
},
},
"required": ["routes", "justification"],
"required": ["host", "justification"],
},
},
{
@@ -252,15 +282,22 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
# tool-specific payload (stored in Proposal.proposed_file as
# free-form text the apply path interprets per tool).
#
# egress-proxy-block: full proposed routes.yaml
# egress-proxy-block: JSON object describing a SINGLE route to
# add — `{host, path_allowlist?, auth?}`. The
# supervisor merges this into the live routes
# file at approval time.
# pipelock-block: the full failed URL (scheme + host + path) —
# supervisor extracts the host, merges into the
# bottle's current allowlist; the path is shown
# to the operator for context (pipelock doesn't
# do path-level matching).
# capability-block: full proposed Dockerfile
#
# Egress-proxy-block doesn't use a single "field name" → the JSON
# payload is constructed from multiple structured input fields in
# `handle_egress_proxy_block`. The mapping stays one-entry-per-tool
# so the generic dispatch keeps working for the other two.
PROPOSED_FILE_FIELD: dict[str, str] = {
_sv.TOOL_EGRESS_PROXY_BLOCK: "routes",
_sv.TOOL_PIPELOCK_BLOCK: "failed_url",
_sv.TOOL_CAPABILITY_BLOCK: "dockerfile",
}
@@ -269,26 +306,18 @@ PROPOSED_FILE_FIELD: dict[str, str] = {
# --- Validation ------------------------------------------------------------
# Auth schemes accepted on egress-proxy-block proposals — match the
# manifest-side EGRESS_PROXY_AUTH_SCHEMES.
_AUTH_SCHEMES = ("Bearer", "token")
def validate_proposed_file(tool: str, content: str) -> None:
"""Syntactic validation. The operator is the real gate; this just
catches obvious paste-errors / wrong-tool selections before they
enter the queue."""
if not content.strip():
raise _RpcError(ERR_INVALID_PARAMS, f"{tool}: proposed file is empty")
if tool == _sv.TOOL_EGRESS_PROXY_BLOCK:
try:
parsed = json.loads(content)
except json.JSONDecodeError as e:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml is not valid JSON: {e}",
) from e
if not isinstance(parsed, dict) or not isinstance(parsed.get("routes"), list):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml must be an object with a 'routes' array",
)
elif tool == _sv.TOOL_PIPELOCK_BLOCK:
if tool == _sv.TOOL_PIPELOCK_BLOCK:
# `content` is the full failed URL. Require scheme + host so
# the supervisor can extract a hostname for the allowlist
# merge; the path is preserved for operator context.
@@ -312,6 +341,70 @@ def validate_proposed_file(tool: str, content: str) -> None:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
def _validate_and_bundle_egress_route(
args: dict[str, object],
) -> str:
"""Validate egress-proxy-block input fields and bundle them into
a JSON string that becomes the Proposal.proposed_file. Raises
_RpcError on bad input — the agent retries with a fixed shape."""
tool = _sv.TOOL_EGRESS_PROXY_BLOCK
host = args.get("host")
if not isinstance(host, str) or not host.strip():
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: 'host' is required and must be a non-empty string",
)
payload: dict[str, object] = {"host": host}
path_allow_raw = args.get("path_allowlist")
if path_allow_raw is not None:
if not isinstance(path_allow_raw, list):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: 'path_allowlist' must be an array of strings",
)
prefixes: list[str] = []
for i, p in enumerate(path_allow_raw):
if not isinstance(p, str):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: path_allowlist[{i}] must be a string",
)
if not p.startswith("/"):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: path_allowlist[{i}] {p!r} must start with '/'",
)
prefixes.append(p)
if prefixes:
payload["path_allowlist"] = prefixes
auth_raw = args.get("auth")
if auth_raw is not None:
if not isinstance(auth_raw, dict):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: 'auth' must be an object with 'scheme' and 'token_ref'",
)
scheme = auth_raw.get("scheme")
token_ref = auth_raw.get("token_ref")
if not isinstance(scheme, str) or scheme not in _AUTH_SCHEMES:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: auth.scheme must be one of "
f"{', '.join(_AUTH_SCHEMES)} (got {scheme!r})",
)
if not isinstance(token_ref, str) or not token_ref:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: auth.token_ref must be a non-empty string "
f"naming the host env var holding the token",
)
payload["auth"] = {"scheme": scheme, "token_ref": token_ref}
return json.dumps(payload, indent=2) + "\n"
# --- MCP handlers ----------------------------------------------------------
@@ -384,27 +477,35 @@ def handle_tools_call(
raise _RpcError(ERR_INVALID_PARAMS, "tools/call missing 'name'")
if name == _sv.TOOL_LIST_EGRESS_PROXY_ROUTES:
return handle_list_egress_proxy_routes(params.get("arguments", {}), config)
if name not in PROPOSED_FILE_FIELD:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {name!r}")
args_raw = params.get("arguments", {})
if not isinstance(args_raw, dict):
raise _RpcError(ERR_INVALID_PARAMS, "tools/call 'arguments' must be an object")
file_field = PROPOSED_FILE_FIELD[name]
proposed_file = args_raw.get(file_field)
justification = args_raw.get("justification")
if not isinstance(proposed_file, str):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{name}: '{file_field}' is required and must be a string",
)
if not isinstance(justification, str) or not justification.strip():
raise _RpcError(
ERR_INVALID_PARAMS,
f"{name}: 'justification' is required and must be a non-empty string",
)
validate_proposed_file(name, proposed_file)
if name == _sv.TOOL_EGRESS_PROXY_BLOCK:
# Structured input → JSON bundle on Proposal.proposed_file.
# The dashboard's apply step (egress_proxy_apply.add_route)
# parses this JSON, fetches the current routes, merges in
# the new one, and writes the merged file.
proposed_file = _validate_and_bundle_egress_route(args_raw)
elif name in PROPOSED_FILE_FIELD:
file_field = PROPOSED_FILE_FIELD[name]
proposed_file = args_raw.get(file_field)
if not isinstance(proposed_file, str):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{name}: '{file_field}' is required and must be a string",
)
validate_proposed_file(name, proposed_file)
else:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {name!r}")
proposal = _sv.Proposal.new(
bottle_slug=config.bottle_slug,
+8 -8
View File
@@ -218,13 +218,13 @@ class TestSuperviseSidecar(unittest.TestCase):
self._bring_up_sidecar()
# Stub the apply step. The dashboard's approve() calls
# apply_routes_change to docker-exec into the egress-proxy
# sidecar; this test isn't exercising the real sidecar, so
# patch it to a no-op that returns plausible before/after
# strings the audit-log writer can render.
# add_route to docker-exec into the egress-proxy sidecar;
# this test isn't exercising the real sidecar, so patch it
# to a no-op that returns plausible before/after strings
# the audit-log writer can render.
from claude_bottle.cli import dashboard as _dash
original_apply = _dash.apply_routes_change
_dash.apply_routes_change = (
original_apply = _dash.add_route
_dash.add_route = (
lambda slug, new: ("(stubbed before)", new)
)
@@ -236,7 +236,7 @@ class TestSuperviseSidecar(unittest.TestCase):
"params": {
"name": _sv.TOOL_EGRESS_PROXY_BLOCK,
"arguments": {
"routes": '{"routes": [{"host": "api.example.com"}]}',
"host": "api.example.com",
"justification": "integration test",
},
},
@@ -270,7 +270,7 @@ class TestSuperviseSidecar(unittest.TestCase):
# file and returns to the curl caller.
dashboard.approve(qp, notes="lgtm from integration test")
finally:
_dash.apply_routes_change = original_apply
_dash.add_route = original_apply
t.join(timeout=20)
response = captured.get("response")
+36 -25
View File
@@ -127,14 +127,14 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
self._original_apply_routes = dashboard.apply_routes_change
self._original_add_route = dashboard.add_route
self._original_apply_allowlist = dashboard.apply_allowlist_change
self._original_fetch_allowlist = dashboard.fetch_current_allowlist
self._original_apply_capability = dashboard.apply_capability_change
# Default stubs: succeed with deterministic before/after so the
# audit log shows a non-empty diff.
dashboard.apply_routes_change = lambda slug, content: (
'{"routes": []}\n', content,
dashboard.add_route = lambda slug, content: (
'{"routes": []}\n', '{"routes": [{"host": "x"}]}\n',
)
dashboard.apply_allowlist_change = lambda slug, content: (
"old.example\n", content,
@@ -145,7 +145,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
)
def tearDown(self):
dashboard.apply_routes_change = self._original_apply_routes
dashboard.add_route = self._original_add_route
dashboard.apply_allowlist_change = self._original_apply_allowlist
dashboard.fetch_current_allowlist = self._original_fetch_allowlist
dashboard.apply_capability_change = self._original_apply_capability
@@ -204,19 +204,19 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
class TestEgressProxyApplyWiring(_FakeHomeMixin, unittest.TestCase):
"""PRD 0014 Phase 3: approve() on a egress-proxy-block proposal
must call apply_routes_change with the right args and surface
its failures."""
"""PRD 0017 chunk 3: approve() on an egress-proxy-block proposal
must call add_route (single-route merge) with the right args
and surface its failures."""
def setUp(self):
self._setup_fake_home()
self._original_apply = dashboard.apply_routes_change
self._original_add_route = dashboard.add_route
def tearDown(self):
dashboard.apply_routes_change = self._original_apply
dashboard.add_route = self._original_add_route
self._teardown_fake_home()
def _enqueue_egress_proxy(self, proposed: str = '{"routes": []}\n'):
def _enqueue_egress_proxy(self, proposed: str = '{"host": "x.example"}\n'):
p = Proposal.new(
bottle_slug="dev", tool=TOOL_EGRESS_PROXY_BLOCK,
proposed_file=proposed,
@@ -229,29 +229,40 @@ class TestEgressProxyApplyWiring(_FakeHomeMixin, unittest.TestCase):
supervise.write_proposal(qdir, p)
return dashboard.QueuedProposal(proposal=p, queue_dir=qdir)
def test_egress_proxy_block_calls_apply_with_proposed_file(self):
def test_egress_proxy_block_calls_add_route_with_proposed_json(self):
calls = []
dashboard.apply_routes_change = lambda slug, content: (
calls.append((slug, content)) or ("before", content)
dashboard.add_route = lambda slug, content: (
calls.append((slug, content)) or ("before", "after")
)
qp = self._enqueue_egress_proxy(
proposed='{"host": "new.example", "path_allowlist": ["/x/"]}\n'
)
qp = self._enqueue_egress_proxy(proposed='{"routes": [{"path": "/new/"}]}\n')
dashboard.approve(qp)
self.assertEqual(1, len(calls))
slug, content = calls[0]
self.assertEqual("dev", slug)
self.assertEqual('{"routes": [{"path": "/new/"}]}\n', content)
# The single-route JSON the agent proposed reaches add_route
# unchanged — add_route fetches current state + merges.
self.assertEqual(
'{"host": "new.example", "path_allowlist": ["/x/"]}\n',
content,
)
def test_modify_passes_final_file_to_apply(self):
def test_modify_passes_final_file_to_add_route(self):
calls = []
dashboard.apply_routes_change = lambda slug, content: (
calls.append(content) or ("before", content)
dashboard.add_route = lambda slug, content: (
calls.append(content) or ("before", "after")
)
qp = self._enqueue_egress_proxy()
dashboard.approve(qp, final_file='{"routes": [{"path": "/edited/"}]}\n', notes="tweaked")
self.assertEqual(['{"routes": [{"path": "/edited/"}]}\n'], calls)
dashboard.approve(
qp,
final_file='{"host": "edited.example"}\n',
notes="tweaked",
)
self.assertEqual(['{"host": "edited.example"}\n'], calls)
def test_apply_failure_blocks_response_and_audit(self):
dashboard.apply_routes_change = lambda slug, content: (_ for _ in ()).throw(
dashboard.add_route = lambda slug, content: (_ for _ in ()).throw(
EgressProxyApplyError("docker exec failed")
)
qp = self._enqueue_egress_proxy()
@@ -266,15 +277,15 @@ class TestEgressProxyApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual([], read_audit_entries("egress-proxy", "dev"))
def test_real_diff_lands_in_audit(self):
dashboard.apply_routes_change = lambda slug, content: (
dashboard.add_route = lambda slug, content: (
'{"routes": []}\n', # before
'{"routes": [{"path": "/new/"}]}\n', # after
'{"routes": [{"host": "new.example"}]}\n', # after
)
qp = self._enqueue_egress_proxy(proposed='{"routes": [{"path": "/new/"}]}\n')
qp = self._enqueue_egress_proxy(proposed='{"host": "new.example"}\n')
dashboard.approve(qp)
entries = read_audit_entries("egress-proxy", "dev")
self.assertEqual(1, len(entries))
self.assertIn('+{"routes": [{"path": "/new/"}]}', entries[0].diff)
self.assertIn('+{"routes": [{"host": "new.example"}]}', entries[0].diff)
self.assertIn('-{"routes": []}', entries[0].diff)
def test_reject_does_not_call_apply(self):
+106
View File
@@ -4,9 +4,12 @@ integration test."""
import unittest
import json
from claude_bottle.backend.docker.egress_proxy_apply import (
EgressProxyApplyError,
_hosts_in_routes,
_merge_single_route,
validate_routes_content,
)
@@ -83,5 +86,108 @@ class TestHostsInRoutes(unittest.TestCase):
_hosts_in_routes('{"routes": [{"path": "/no-host/"}]}')
class TestMergeSingleRoute(unittest.TestCase):
BASE = '{"routes": [{"host": "api.anthropic.com"}]}'
def test_appends_route_when_host_absent(self):
merged = _merge_single_route(self.BASE, {"host": "github.com"})
routes = json.loads(merged)["routes"]
hosts = [r["host"] for r in routes]
self.assertEqual(["api.anthropic.com", "github.com"], hosts)
def test_appends_path_allowlist(self):
merged = _merge_single_route(
self.BASE,
{"host": "github.com", "path_allowlist": ["/repos/x/"]},
)
new_route = json.loads(merged)["routes"][-1]
self.assertEqual(["/repos/x/"], new_route["path_allowlist"])
def test_appends_auth_with_token_env_slot(self):
merged = _merge_single_route(
self.BASE,
{
"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH"},
},
)
new_route = json.loads(merged)["routes"][-1]
self.assertEqual("Bearer", new_route["auth_scheme"])
# First auth slot when no prior auth routes exist.
self.assertEqual("EGRESS_PROXY_TOKEN_0", new_route["token_env"])
def test_auth_slot_increments_past_existing(self):
base = json.dumps({"routes": [
{"host": "api.anthropic.com",
"auth_scheme": "Bearer",
"token_env": "EGRESS_PROXY_TOKEN_0"},
]})
merged = _merge_single_route(base, {
"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH"},
})
new_route = json.loads(merged)["routes"][-1]
self.assertEqual("EGRESS_PROXY_TOKEN_1", new_route["token_env"])
def test_existing_host_merges_path_allowlist_as_union(self):
base = json.dumps({"routes": [
{"host": "github.com", "path_allowlist": ["/a/"]},
]})
merged = _merge_single_route(base, {
"host": "github.com",
"path_allowlist": ["/b/"],
})
routes = json.loads(merged)["routes"]
self.assertEqual(1, len(routes)) # not duplicated
self.assertEqual(["/a/", "/b/"], routes[0]["path_allowlist"])
def test_existing_host_dedup_path_allowlist(self):
base = json.dumps({"routes": [
{"host": "github.com", "path_allowlist": ["/a/"]},
]})
merged = _merge_single_route(base, {
"host": "github.com",
"path_allowlist": ["/a/", "/b/"],
})
self.assertEqual(
["/a/", "/b/"],
json.loads(merged)["routes"][0]["path_allowlist"],
)
def test_existing_host_preserves_existing_auth_ignores_proposed(self):
# Tool docs: auth on an existing host is operator-controlled,
# not agent-controlled. The merge must not overwrite.
base = json.dumps({"routes": [
{"host": "api.github.com",
"auth_scheme": "Bearer",
"token_env": "EGRESS_PROXY_TOKEN_0"},
]})
merged = _merge_single_route(base, {
"host": "api.github.com",
"auth": {"scheme": "token", "token_ref": "DIFFERENT"},
})
route = json.loads(merged)["routes"][0]
self.assertEqual("Bearer", route["auth_scheme"])
self.assertEqual("EGRESS_PROXY_TOKEN_0", route["token_env"])
def test_host_match_is_case_insensitive(self):
base = json.dumps({"routes": [{"host": "GitHub.com"}]})
merged = _merge_single_route(base, {
"host": "github.com",
"path_allowlist": ["/x/"],
})
routes = json.loads(merged)["routes"]
self.assertEqual(1, len(routes))
self.assertEqual(["/x/"], routes[0]["path_allowlist"])
def test_missing_host_raises(self):
with self.assertRaises(EgressProxyApplyError):
_merge_single_route(self.BASE, {})
def test_invalid_current_yaml_raises(self):
with self.assertRaises(EgressProxyApplyError):
_merge_single_route("{not json", {"host": "x.example"})
if __name__ == "__main__":
unittest.main()
+9 -21
View File
@@ -45,22 +45,6 @@ from claude_bottle.supervise_server import (
class TestValidation(unittest.TestCase):
def test_egress_proxy_block_requires_valid_json(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file(_sv.TOOL_EGRESS_PROXY_BLOCK, "{not json")
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("not valid JSON", cm.exception.message)
def test_egress_proxy_block_requires_routes_array(self):
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_EGRESS_PROXY_BLOCK, '{"other": []}')
def test_egress_proxy_block_accepts_valid_routes(self):
validate_proposed_file(
_sv.TOOL_EGRESS_PROXY_BLOCK,
'{"routes": [{"path": "/x/", "upstream": "https://example.com"}]}',
)
def test_pipelock_block_accepts_https_url(self):
validate_proposed_file(
_sv.TOOL_PIPELOCK_BLOCK,
@@ -89,8 +73,12 @@ class TestValidation(unittest.TestCase):
"FROM python:3.13\nRUN apk add git\n",
)
def test_empty_proposed_file_rejected_for_all_tools(self):
for tool in _sv.TOOLS:
def test_empty_proposed_file_rejected_for_tools_with_file_field(self):
# egress-proxy-block has structured input (validated in
# _validate_and_bundle_egress_route, not here) and
# list-egress-proxy-routes takes no input. Only the other
# two go through `validate_proposed_file`.
for tool in (_sv.TOOL_PIPELOCK_BLOCK, _sv.TOOL_CAPABILITY_BLOCK):
with self.subTest(tool=tool):
with self.assertRaises(_RpcError):
validate_proposed_file(tool, " \n\t")
@@ -243,7 +231,7 @@ class TestHandleToolsCall(unittest.TestCase):
{
"name": _sv.TOOL_EGRESS_PROXY_BLOCK,
"arguments": {
"routes": '{"routes": []}',
"host": "example.com",
"justification": "need a route",
},
},
@@ -286,7 +274,7 @@ class TestHandleToolsCall(unittest.TestCase):
handle_tools_call(
{
"name": _sv.TOOL_EGRESS_PROXY_BLOCK,
"arguments": {"routes": '{"routes": []}'},
"arguments": {"host": "example.com"},
},
self.config,
)
@@ -298,7 +286,7 @@ class TestHandleToolsCall(unittest.TestCase):
{
"name": _sv.TOOL_EGRESS_PROXY_BLOCK,
"arguments": {
"routes": '{"routes": []}',
"host": "example.com",
"justification": "x",
},
},