feat(egress-proxy): retarget remediation flow (PRD 0017 chunk 3) #30

Merged
didericis merged 18 commits from egress-proxy-block-remediation into main 2026-05-25 20:34:24 -04:00
2 changed files with 47 additions and 18 deletions
Showing only changes of commit e26fe874e4 - Show all commits
@@ -91,21 +91,33 @@ def _hosts_in_routes(content: str) -> list[str]:
# Pipelock's allowlist parser accepts only literal hostnames: # Pipelock's allowlist parser accepts only literal hostnames:
# `[A-Za-z0-9_.-]+`. Wildcard hosts (e.g. `*.example.com`) that # `[A-Za-z0-9_.-]+`. Wildcard hosts (e.g. `*.example.com`) that
# egress-proxy's route table accepts MUST be stripped here or the # egress-proxy's route table accepts get normalised here by
# whole pipelock apply fails parse before the new allowlist is # stripping the leading `*.` (so `*.example.com` → `example.com`)
# even written. Egress-proxy still has the wildcard route on its # — egress-proxy retains the wildcard for its own host matching,
# side; pipelock's allowlist just won't pin a hostname for the # and pipelock's allowlist gets the suffix, which still permits
# wildcard-matched traffic (the user accepts that pipelock-side # the wildcard-matched upstream connections without expanding to
# enforcement is hostname-only for those routes). # arbitrary subdomains. Hosts that still don't fit the pipelock
# charset after normalisation (bare `*`, IPv6 literals, weird
# chars) are silently skipped.
_PIPELOCK_HOST_RE = re.compile(r"^[A-Za-z0-9_.-]+$") _PIPELOCK_HOST_RE = re.compile(r"^[A-Za-z0-9_.-]+$")
def _pipelock_safe_hosts(hosts: list[str]) -> list[str]: def _pipelock_safe_hosts(hosts: list[str]) -> list[str]:
"""Drop any host pipelock's allowlist parser would reject — """Normalise hosts for pipelock's allowlist: strip leading
today that means anything with characters outside `*.` from wildcards, drop anything that still doesn't match
`[A-Za-z0-9_.-]` (wildcards, IPv6 literals, etc.). Order pipelock's allowed charset. Order preserved; duplicates that
preserved.""" arise from normalisation are de-duped (first-seen wins)."""
return [h for h in hosts if _PIPELOCK_HOST_RE.match(h)] out: list[str] = []
seen: set[str] = set()
for h in hosts:
candidate = h[2:] if h.startswith("*.") else h
if not candidate or not _PIPELOCK_HOST_RE.match(candidate):
continue
if candidate in seen:
continue
seen.add(candidate)
out.append(candidate)
return out
def _mirror_hosts_to_pipelock(slug: str, hosts: list[str]) -> None: def _mirror_hosts_to_pipelock(slug: str, hosts: list[str]) -> None:
+24 -7
View File
@@ -197,16 +197,26 @@ class TestPipelockSafeHosts(unittest.TestCase):
_pipelock_safe_hosts(["api.github.com", "registry.npmjs.org"]), _pipelock_safe_hosts(["api.github.com", "registry.npmjs.org"]),
) )
def test_strips_wildcards(self): def test_strips_wildcard_prefix(self):
# Pipelock's allowlist parser rejects `*` — egress-proxy can # `*.example.com` becomes `example.com` — pipelock pins the
# accept wildcard routes on its side, but the pipelock mirror # suffix, egress-proxy keeps the wildcard on its side.
# has to skip them or apply fails before the new yaml is even
# written.
self.assertEqual( self.assertEqual(
["api.github.com"], ["example.com", "api.github.com"],
_pipelock_safe_hosts(["*.example.com", "api.github.com"]), _pipelock_safe_hosts(["*.example.com", "api.github.com"]),
) )
def test_wildcard_strips_one_label_not_recursive(self):
# `*.foo.bar.com` → `foo.bar.com` (one strip of `*.`).
self.assertEqual(
["foo.bar.com"],
_pipelock_safe_hosts(["*.foo.bar.com"]),
)
def test_drops_bare_wildcard(self):
# `*` alone would normalise to empty; nothing useful to send
# to pipelock.
self.assertEqual([], _pipelock_safe_hosts(["*"]))
def test_strips_ipv6_literals(self): def test_strips_ipv6_literals(self):
# Brackets aren't in pipelock's allowed charset either. # Brackets aren't in pipelock's allowed charset either.
self.assertEqual( self.assertEqual(
@@ -214,11 +224,18 @@ class TestPipelockSafeHosts(unittest.TestCase):
_pipelock_safe_hosts(["[::1]", "api.example.com"]), _pipelock_safe_hosts(["[::1]", "api.example.com"]),
) )
def test_dedupes_after_normalisation(self):
# `*.example.com` + `example.com` both yield `example.com`.
self.assertEqual(
["example.com"],
_pipelock_safe_hosts(["*.example.com", "example.com"]),
)
def test_preserves_order(self): def test_preserves_order(self):
self.assertEqual( self.assertEqual(
["a.example", "b.example", "c.example"], ["a.example", "b.example", "c.example"],
_pipelock_safe_hosts([ _pipelock_safe_hosts([
"a.example", "*.junk", "b.example", "weird host", "c.example", "a.example", "weird host", "b.example", "*", "c.example",
]), ]),
) )