diff --git a/claude_bottle/backend/docker/egress_proxy_apply.py b/claude_bottle/backend/docker/egress_proxy_apply.py index 87af4cf..03ee702 100644 --- a/claude_bottle/backend/docker/egress_proxy_apply.py +++ b/claude_bottle/backend/docker/egress_proxy_apply.py @@ -24,6 +24,7 @@ from __future__ import annotations import json import os +import re import subprocess import tempfile from pathlib import Path @@ -88,16 +89,39 @@ def _hosts_in_routes(content: str) -> list[str]: return sorted({r.host for r in routes if r.host}) +# Pipelock's allowlist parser accepts only literal hostnames: +# `[A-Za-z0-9_.-]+`. Wildcard hosts (e.g. `*.example.com`) that +# egress-proxy's route table accepts MUST be stripped here or the +# whole pipelock apply fails parse before the new allowlist is +# even written. Egress-proxy still has the wildcard route on its +# side; pipelock's allowlist just won't pin a hostname for the +# wildcard-matched traffic (the user accepts that pipelock-side +# enforcement is hostname-only for those routes). +_PIPELOCK_HOST_RE = re.compile(r"^[A-Za-z0-9_.-]+$") + + +def _pipelock_safe_hosts(hosts: list[str]) -> list[str]: + """Drop any host pipelock's allowlist parser would reject — + today that means anything with characters outside + `[A-Za-z0-9_.-]` (wildcards, IPv6 literals, etc.). Order + preserved.""" + return [h for h in hosts if _PIPELOCK_HOST_RE.match(h)] + + def _mirror_hosts_to_pipelock(slug: str, hosts: list[str]) -> None: - """Ensure every `hosts` entry is on pipelock's allowlist. Fetches - pipelock's current allowlist, merges, re-applies. No-op if every - host is already present (apply still restarts pipelock if any - host is new). Raises EgressProxyApplyError on pipelock failures - so the caller's diff/audit reflects the half-state.""" + """Ensure every pipelock-compatible `hosts` entry is on + pipelock's allowlist. Fetches pipelock's current allowlist, + merges, re-applies. Hosts pipelock can't represent (wildcards, + etc.) are silently skipped — they stay live on egress-proxy + but aren't enforced at pipelock. No-op if every host is already + present (apply still restarts pipelock if any host is new). + Raises EgressProxyApplyError on pipelock failures so the + caller's diff/audit reflects the half-state.""" + safe_hosts = _pipelock_safe_hosts(hosts) try: current = fetch_current_allowlist(slug) existing = parse_allowlist_content(current) - merged = sorted(set(existing) | set(hosts)) + merged = sorted(set(existing) | set(safe_hosts)) if merged == sorted(existing): return # nothing to add apply_allowlist_change(slug, render_allowlist_content(merged)) diff --git a/tests/unit/test_egress_proxy_apply.py b/tests/unit/test_egress_proxy_apply.py index f300db1..244bdd4 100644 --- a/tests/unit/test_egress_proxy_apply.py +++ b/tests/unit/test_egress_proxy_apply.py @@ -10,6 +10,7 @@ from claude_bottle.backend.docker.egress_proxy_apply import ( EgressProxyApplyError, _hosts_in_routes, _merge_single_route, + _pipelock_safe_hosts, validate_routes_content, ) @@ -189,5 +190,38 @@ class TestMergeSingleRoute(unittest.TestCase): _merge_single_route("{not json", {"host": "x.example"}) +class TestPipelockSafeHosts(unittest.TestCase): + def test_passes_normal_hostnames_through(self): + self.assertEqual( + ["api.github.com", "registry.npmjs.org"], + _pipelock_safe_hosts(["api.github.com", "registry.npmjs.org"]), + ) + + def test_strips_wildcards(self): + # Pipelock's allowlist parser rejects `*` — egress-proxy can + # accept wildcard routes on its side, but the pipelock mirror + # has to skip them or apply fails before the new yaml is even + # written. + self.assertEqual( + ["api.github.com"], + _pipelock_safe_hosts(["*.example.com", "api.github.com"]), + ) + + def test_strips_ipv6_literals(self): + # Brackets aren't in pipelock's allowed charset either. + self.assertEqual( + ["api.example.com"], + _pipelock_safe_hosts(["[::1]", "api.example.com"]), + ) + + def test_preserves_order(self): + self.assertEqual( + ["a.example", "b.example", "c.example"], + _pipelock_safe_hosts([ + "a.example", "*.junk", "b.example", "weird host", "c.example", + ]), + ) + + if __name__ == "__main__": unittest.main()