refactor(pipelock): allowlist-resolution helpers take a Bottle, not (manifest, name)
test / run tests/run_tests.py (pull_request) Successful in 17s

Every function in the 'Allowlist resolution' section was doing
`manifest.bottles[bottle_name].X` as its first move. Push the lookup
to the caller and have each helper take a resolved Bottle:

  pipelock_bottle_allowlist
  pipelock_bottle_ssh_hostnames
  pipelock_bottle_ssh_trusted_domains
  pipelock_bottle_ssh_ip_cidrs
  pipelock_effective_allowlist
  pipelock_allowlist_summary

PipelockProxy._build_pipelock_yaml resolves bottle once at the top
and passes it through; DockerBottleBackend.prepare already had the
bottle in scope and now uses it directly. Tests pass the resolved
bottle from each fixture.
This commit is contained in:
2026-05-11 13:44:58 -04:00
parent c62b3204a8
commit 25e67137f2
3 changed files with 31 additions and 26 deletions
+24 -18
View File
@@ -18,7 +18,7 @@ from dataclasses import dataclass
from pathlib import Path
from .log import die, info, warn
from .manifest import Manifest
from .manifest import Bottle, Manifest
from .util import is_ipv4_literal
# Pipelock image, pinned by digest. The digest is the multi-arch image
@@ -58,42 +58,42 @@ def pipelock_proxy_host_port(slug: str) -> str:
# --- Allowlist resolution --------------------------------------------------
def pipelock_bottle_allowlist(manifest: Manifest, bottle_name: str) -> list[str]:
"""Hostnames in bottles[<bottle_name>].egress.allowlist."""
return list(manifest.bottles[bottle_name].egress.allowlist)
def pipelock_bottle_allowlist(bottle: Bottle) -> list[str]:
"""Hostnames in bottle.egress.allowlist."""
return list(bottle.egress.allowlist)
def pipelock_bottle_ssh_hostnames(manifest: Manifest, bottle_name: str) -> list[str]:
return [e.Hostname for e in manifest.bottles[bottle_name].ssh if e.Hostname]
def pipelock_bottle_ssh_hostnames(bottle: Bottle) -> list[str]:
return [e.Hostname for e in bottle.ssh if e.Hostname]
def pipelock_bottle_ssh_trusted_domains(manifest: Manifest, bottle_name: str) -> list[str]:
return [h for h in pipelock_bottle_ssh_hostnames(manifest, bottle_name) if not is_ipv4_literal(h)]
def pipelock_bottle_ssh_trusted_domains(bottle: Bottle) -> list[str]:
return [h for h in pipelock_bottle_ssh_hostnames(bottle) if not is_ipv4_literal(h)]
def pipelock_bottle_ssh_ip_cidrs(manifest: Manifest, bottle_name: str) -> list[str]:
return [f"{h}/32" for h in pipelock_bottle_ssh_hostnames(manifest, bottle_name) if is_ipv4_literal(h)]
def pipelock_bottle_ssh_ip_cidrs(bottle: Bottle) -> list[str]:
return [f"{h}/32" for h in pipelock_bottle_ssh_hostnames(bottle) if is_ipv4_literal(h)]
def pipelock_effective_allowlist(manifest: Manifest, bottle_name: str) -> list[str]:
def pipelock_effective_allowlist(bottle: Bottle) -> list[str]:
"""Deduplicated union of: baked-in defaults, bottle.egress.allowlist,
bottle.ssh[].Hostname. Sorted for stability."""
seen: dict[str, None] = {}
for h in DEFAULT_ALLOWLIST:
seen.setdefault(h, None)
for h in pipelock_bottle_allowlist(manifest, bottle_name):
for h in pipelock_bottle_allowlist(bottle):
if h:
seen.setdefault(h, None)
for h in pipelock_bottle_ssh_hostnames(manifest, bottle_name):
for h in pipelock_bottle_ssh_hostnames(bottle):
if h:
seen.setdefault(h, None)
return sorted(seen.keys())
def pipelock_allowlist_summary(manifest: Manifest, bottle_name: str) -> str:
def pipelock_allowlist_summary(bottle: Bottle) -> str:
"""One-line summary for the y/N preflight display:
"<N> hosts allowed (host1, host2, host3, +M more)"."""
hosts = pipelock_effective_allowlist(manifest, bottle_name)
hosts = pipelock_effective_allowlist(bottle)
count = len(hosts)
if count == 0:
return "0 hosts allowed (none)"
@@ -142,9 +142,15 @@ class PipelockProxy:
port, strict mode + forward_proxy + DLP defaults + scan_env.
Deliberately contains no env values, no secrets, no per-agent
customization beyond the hostname list."""
allowlist = pipelock_effective_allowlist(manifest, bottle_name)
trusted = pipelock_bottle_ssh_trusted_domains(manifest, bottle_name)
ip_cidrs = pipelock_bottle_ssh_ip_cidrs(manifest, bottle_name)
return self._build_pipelock_yaml(manifest, bottle_name, slug, yaml_path)
def _build_pipelock_yaml(
self, manifest: Manifest, bottle_name: str, slug: str, yaml_path: Path
) -> PipelockProxyPlan:
bottle = manifest.bottles[bottle_name]
allowlist = pipelock_effective_allowlist(bottle)
trusted = pipelock_bottle_ssh_trusted_domains(bottle)
ip_cidrs = pipelock_bottle_ssh_ip_cidrs(bottle)
lines: list[str] = []
lines.append("version: 1")