refactor(pipelock): drop bottle.ssh carve-outs

PRD 0007: SSH traffic now flows through the per-agent ssh-gate
sidecar, so pipelock should know nothing about bottle.ssh.

Removed:
- pipelock_bottle_ssh_hostnames, _trusted_domains, _ip_cidrs.
- The trusted_domains / ssrf blocks built from ssh entries.
- pipelock_proxy_host_port — its last caller (the ssh provisioner)
  is gone.
- is_ipv4_literal — only used to classify ssh hostnames into
  trusted_domains vs ssrf.ip_allowlist, both of which are gone.

api_allowlist now derives solely from baked-in defaults +
bottle.egress.allowlist. Tests updated to pin the new shape and
assert ssh hostnames do NOT leak into pipelock's config.
This commit is contained in:
2026-05-12 16:08:26 -04:00
parent ce948db0b7
commit 6130ea385f
6 changed files with 29 additions and 122 deletions
-4
View File
@@ -37,10 +37,6 @@ def pipelock_proxy_url(slug: str) -> str:
return f"http://{pipelock_container_name(slug)}:{PIPELOCK_PORT}"
def pipelock_proxy_host_port(slug: str) -> str:
return f"{pipelock_container_name(slug)}:{PIPELOCK_PORT}"
def pipelock_tls_init(stage_dir: Path) -> tuple[Path, Path]:
"""Generate a fresh per-bottle CA via a one-shot pipelock container.
+4 -36
View File
@@ -18,7 +18,6 @@ from pathlib import Path
from typing import cast
from .manifest import Bottle
from .util import is_ipv4_literal
# Baked-in default allowlist for hosts Claude Code itself needs.
DEFAULT_ALLOWLIST: tuple[str, ...] = (
@@ -40,30 +39,17 @@ def pipelock_bottle_allowlist(bottle: Bottle) -> list[str]:
return list(bottle.egress.allowlist)
def pipelock_bottle_ssh_hostnames(bottle: Bottle) -> list[str]:
return [e.Hostname for e in bottle.ssh if e.Hostname]
def pipelock_bottle_ssh_trusted_domains(bottle: Bottle) -> list[str]:
return [h for h in pipelock_bottle_ssh_hostnames(bottle) if not is_ipv4_literal(h)]
def pipelock_bottle_ssh_ip_cidrs(bottle: Bottle) -> list[str]:
return [f"{h}/32" for h in pipelock_bottle_ssh_hostnames(bottle) if is_ipv4_literal(h)]
def pipelock_effective_allowlist(bottle: Bottle) -> list[str]:
"""Deduplicated union of: baked-in defaults, bottle.egress.allowlist,
bottle.ssh[].Hostname. Sorted for stability."""
"""Deduplicated union of: baked-in defaults, bottle.egress.allowlist.
Sorted for stability. Per PRD 0007, bottle.ssh entries do NOT
contribute here — SSH traffic flows through the per-agent ssh-gate
sidecar, not pipelock."""
seen: dict[str, None] = {}
for h in DEFAULT_ALLOWLIST:
seen.setdefault(h, None)
for h in pipelock_bottle_allowlist(bottle):
if h:
seen.setdefault(h, None)
for h in pipelock_bottle_ssh_hostnames(bottle):
if h:
seen.setdefault(h, None)
return sorted(seen.keys())
@@ -116,12 +102,6 @@ def pipelock_build_config(
"api_allowlist": pipelock_effective_allowlist(bottle),
"forward_proxy": {"enabled": True},
}
trusted = pipelock_bottle_ssh_trusted_domains(bottle)
if trusted:
cfg["trusted_domains"] = trusted
ip_cidrs = pipelock_bottle_ssh_ip_cidrs(bottle)
if ip_cidrs:
cfg["ssrf"] = {"ip_allowlist": ip_cidrs}
cfg["dlp"] = {"include_defaults": True, "scan_env": True}
# Body-scan enforcement is a separate pipelock section (each DLP
# "surface" — body, MCP, response — has its own action). Pipelock's
@@ -163,18 +143,6 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
fp = cast(dict[str, object], cfg["forward_proxy"])
lines.append(f" enabled: {_bool(fp['enabled'])}")
lines.append("")
if "trusted_domains" in cfg:
lines.append("trusted_domains:")
for td in cast(list[str], cfg["trusted_domains"]):
lines.append(f' - "{td}"')
lines.append("")
if "ssrf" in cfg:
lines.append("ssrf:")
ssrf = cast(dict[str, object], cfg["ssrf"])
lines.append(" ip_allowlist:")
for cidr in cast(list[str], ssrf["ip_allowlist"]):
lines.append(f' - "{cidr}"')
lines.append("")
lines.append("dlp:")
dlp = cast(dict[str, object], cfg["dlp"])
lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}")
-13
View File
@@ -6,7 +6,6 @@ level deeper, under their backend package."""
from __future__ import annotations
import os
import re
def expand_tilde(path: str) -> str:
@@ -17,15 +16,3 @@ def expand_tilde(path: str) -> str:
home = os.environ.get("HOME", "")
return home + path[1:]
return path
_IPV4_RE = re.compile(r"^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$")
def is_ipv4_literal(s: str) -> bool:
"""True iff `s` looks like a dotted-quad IPv4 literal. Does not
validate octet ranges; consumers that care about that should run
a stricter check. Empty input returns False."""
if not s:
return False
return bool(_IPV4_RE.match(s))