refactor: rename egress-proxy → egress everywhere
The manifest key is `egress:` now; finish the rename so the rest of the codebase matches. Files (Dockerfile.egress, claude_bottle/egress.py etc.), classes (Egress, EgressConfig, EgressRoute, EgressPlan, DockerEgress), constants (EGRESS_HOSTNAME, EGRESS_ROUTES, ...), container name prefix (claude-bottle-egress-*), docker network alias (egress), the introspection host (_egress.local), the MCP tool IDs (egress-block, list-egress-routes), and the preflight label all drop the `-proxy` suffix.
This commit is contained in:
@@ -0,0 +1,229 @@
|
||||
"""Unit: validate_routes_content (PRD 0014 retargeted by PRD 0017
|
||||
chunk 3). docker exec / cp / kill paths are covered by the
|
||||
integration test."""
|
||||
|
||||
import unittest
|
||||
|
||||
import json
|
||||
|
||||
from claude_bottle.backend.docker.egress_apply import (
|
||||
EgressApplyError,
|
||||
_hosts_in_routes,
|
||||
_merge_single_route,
|
||||
_pipelock_safe_hosts,
|
||||
validate_routes_content,
|
||||
)
|
||||
|
||||
|
||||
class TestValidateRoutesContent(unittest.TestCase):
|
||||
def test_accepts_minimal_route_table(self):
|
||||
validate_routes_content('{"routes": []}')
|
||||
validate_routes_content(
|
||||
'{"routes": [{"host": "api.github.com"}]}'
|
||||
)
|
||||
|
||||
def test_accepts_full_route(self):
|
||||
validate_routes_content(
|
||||
'{"routes": [{"host": "api.github.com",'
|
||||
' "path_allowlist": ["/repos/x/"],'
|
||||
' "auth_scheme": "Bearer",'
|
||||
' "token_env": "EGRESS_TOKEN_0"}]}'
|
||||
)
|
||||
|
||||
def test_rejects_bad_json(self):
|
||||
with self.assertRaises(EgressApplyError) as cm:
|
||||
validate_routes_content("{not json")
|
||||
self.assertIn("not valid", str(cm.exception))
|
||||
|
||||
def test_rejects_non_object_top_level(self):
|
||||
with self.assertRaises(EgressApplyError):
|
||||
validate_routes_content("[]")
|
||||
|
||||
def test_rejects_missing_routes_key(self):
|
||||
with self.assertRaises(EgressApplyError):
|
||||
validate_routes_content('{"other": []}')
|
||||
|
||||
def test_rejects_non_list_routes(self):
|
||||
with self.assertRaises(EgressApplyError):
|
||||
validate_routes_content('{"routes": "not a list"}')
|
||||
|
||||
def test_rejects_partial_auth_pair(self):
|
||||
# The addon-core parser enforces both-or-neither — the apply
|
||||
# path picks this up before SIGHUP'ing the sidecar.
|
||||
with self.assertRaises(EgressApplyError):
|
||||
validate_routes_content(
|
||||
'{"routes": [{"host": "x.example",'
|
||||
' "auth_scheme": "Bearer"}]}'
|
||||
)
|
||||
|
||||
|
||||
class TestHostsInRoutes(unittest.TestCase):
|
||||
def test_extracts_each_unique_host(self):
|
||||
hosts = _hosts_in_routes(
|
||||
'{"routes": [{"host": "api.github.com"},'
|
||||
' {"host": "github.com"},'
|
||||
' {"host": "api.anthropic.com"}]}'
|
||||
)
|
||||
# Sorted+deduped.
|
||||
self.assertEqual(
|
||||
["api.anthropic.com", "api.github.com", "github.com"],
|
||||
hosts,
|
||||
)
|
||||
|
||||
def test_dedupes_same_host(self):
|
||||
hosts = _hosts_in_routes(
|
||||
'{"routes": [{"host": "x.example", "path_allowlist": ["/a/"]},'
|
||||
' {"host": "x.example", "path_allowlist": ["/b/"]}]}'
|
||||
)
|
||||
self.assertEqual(["x.example"], hosts)
|
||||
|
||||
def test_empty_routes_returns_empty(self):
|
||||
self.assertEqual([], _hosts_in_routes('{"routes": []}'))
|
||||
|
||||
def test_invalid_routes_raises(self):
|
||||
# The mirror helper relies on parsing succeeding; bad input
|
||||
# should error before pipelock is touched.
|
||||
with self.assertRaises(EgressApplyError):
|
||||
_hosts_in_routes('{"routes": [{"path": "/no-host/"}]}')
|
||||
|
||||
|
||||
class TestMergeSingleRoute(unittest.TestCase):
|
||||
BASE = '{"routes": [{"host": "api.anthropic.com"}]}'
|
||||
|
||||
def test_appends_route_when_host_absent(self):
|
||||
merged = _merge_single_route(self.BASE, {"host": "github.com"})
|
||||
routes = json.loads(merged)["routes"]
|
||||
hosts = [r["host"] for r in routes]
|
||||
self.assertEqual(["api.anthropic.com", "github.com"], hosts)
|
||||
|
||||
def test_appends_path_allowlist(self):
|
||||
merged = _merge_single_route(
|
||||
self.BASE,
|
||||
{"host": "github.com", "path_allowlist": ["/repos/x/"]},
|
||||
)
|
||||
new_route = json.loads(merged)["routes"][-1]
|
||||
self.assertEqual(["/repos/x/"], new_route["path_allowlist"])
|
||||
|
||||
def test_appends_auth_with_token_env_slot(self):
|
||||
merged = _merge_single_route(
|
||||
self.BASE,
|
||||
{
|
||||
"host": "api.github.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "GH"},
|
||||
},
|
||||
)
|
||||
new_route = json.loads(merged)["routes"][-1]
|
||||
self.assertEqual("Bearer", new_route["auth_scheme"])
|
||||
# First auth slot when no prior auth routes exist.
|
||||
self.assertEqual("EGRESS_TOKEN_0", new_route["token_env"])
|
||||
|
||||
def test_auth_slot_increments_past_existing(self):
|
||||
base = json.dumps({"routes": [
|
||||
{"host": "api.anthropic.com",
|
||||
"auth_scheme": "Bearer",
|
||||
"token_env": "EGRESS_TOKEN_0"},
|
||||
]})
|
||||
merged = _merge_single_route(base, {
|
||||
"host": "api.github.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "GH"},
|
||||
})
|
||||
new_route = json.loads(merged)["routes"][-1]
|
||||
self.assertEqual("EGRESS_TOKEN_1", new_route["token_env"])
|
||||
|
||||
def test_existing_host_merges_path_allowlist_as_union(self):
|
||||
base = json.dumps({"routes": [
|
||||
{"host": "github.com", "path_allowlist": ["/a/"]},
|
||||
]})
|
||||
merged = _merge_single_route(base, {
|
||||
"host": "github.com",
|
||||
"path_allowlist": ["/b/"],
|
||||
})
|
||||
routes = json.loads(merged)["routes"]
|
||||
self.assertEqual(1, len(routes)) # not duplicated
|
||||
self.assertEqual(["/a/", "/b/"], routes[0]["path_allowlist"])
|
||||
|
||||
def test_existing_host_dedup_path_allowlist(self):
|
||||
base = json.dumps({"routes": [
|
||||
{"host": "github.com", "path_allowlist": ["/a/"]},
|
||||
]})
|
||||
merged = _merge_single_route(base, {
|
||||
"host": "github.com",
|
||||
"path_allowlist": ["/a/", "/b/"],
|
||||
})
|
||||
self.assertEqual(
|
||||
["/a/", "/b/"],
|
||||
json.loads(merged)["routes"][0]["path_allowlist"],
|
||||
)
|
||||
|
||||
def test_existing_host_preserves_existing_auth_ignores_proposed(self):
|
||||
# Tool docs: auth on an existing host is operator-controlled,
|
||||
# not agent-controlled. The merge must not overwrite.
|
||||
base = json.dumps({"routes": [
|
||||
{"host": "api.github.com",
|
||||
"auth_scheme": "Bearer",
|
||||
"token_env": "EGRESS_TOKEN_0"},
|
||||
]})
|
||||
merged = _merge_single_route(base, {
|
||||
"host": "api.github.com",
|
||||
"auth": {"scheme": "token", "token_ref": "DIFFERENT"},
|
||||
})
|
||||
route = json.loads(merged)["routes"][0]
|
||||
self.assertEqual("Bearer", route["auth_scheme"])
|
||||
self.assertEqual("EGRESS_TOKEN_0", route["token_env"])
|
||||
|
||||
def test_host_match_is_case_insensitive(self):
|
||||
base = json.dumps({"routes": [{"host": "GitHub.com"}]})
|
||||
merged = _merge_single_route(base, {
|
||||
"host": "github.com",
|
||||
"path_allowlist": ["/x/"],
|
||||
})
|
||||
routes = json.loads(merged)["routes"]
|
||||
self.assertEqual(1, len(routes))
|
||||
self.assertEqual(["/x/"], routes[0]["path_allowlist"])
|
||||
|
||||
def test_missing_host_raises(self):
|
||||
with self.assertRaises(EgressApplyError):
|
||||
_merge_single_route(self.BASE, {})
|
||||
|
||||
def test_invalid_current_yaml_raises(self):
|
||||
with self.assertRaises(EgressApplyError):
|
||||
_merge_single_route("{not json", {"host": "x.example"})
|
||||
|
||||
|
||||
class TestPipelockSafeHosts(unittest.TestCase):
|
||||
def test_passes_normal_hostnames_through(self):
|
||||
self.assertEqual(
|
||||
["api.github.com", "registry.npmjs.org"],
|
||||
_pipelock_safe_hosts(["api.github.com", "registry.npmjs.org"]),
|
||||
)
|
||||
|
||||
def test_drops_wildcards(self):
|
||||
# Wildcard host matching was removed from egress too,
|
||||
# so a `*.foo.com` route is dead weight anyway; we drop it
|
||||
# entirely from the pipelock mirror so the apply doesn't
|
||||
# fail parse.
|
||||
self.assertEqual(
|
||||
["api.github.com"],
|
||||
_pipelock_safe_hosts(["*.example.com", "api.github.com"]),
|
||||
)
|
||||
|
||||
def test_drops_bare_wildcard(self):
|
||||
self.assertEqual([], _pipelock_safe_hosts(["*"]))
|
||||
|
||||
def test_drops_ipv6_literals(self):
|
||||
self.assertEqual(
|
||||
["api.example.com"],
|
||||
_pipelock_safe_hosts(["[::1]", "api.example.com"]),
|
||||
)
|
||||
|
||||
def test_preserves_order(self):
|
||||
self.assertEqual(
|
||||
["a.example", "b.example", "c.example"],
|
||||
_pipelock_safe_hosts([
|
||||
"a.example", "*.junk", "b.example", "weird host", "c.example",
|
||||
]),
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user