edd8b444a6
test / run tests/run_tests.py (pull_request) Successful in 18s
PipelockProxy becomes an ABC with the platform-agnostic
prepare/_build_pipelock_yaml as concrete methods and start/stop as
abstract. Docker-specific sidecar lifecycle moves to a new sibling
file:
claude_bottle/backend/docker/pipelock.py
DockerPipelockProxy(PipelockProxy) — implements start (docker
create/cp/network connect/start) and stop (docker inspect/rm -f).
DockerBottleBackend._proxy is now a DockerPipelockProxy instance.
Tests that previously instantiated PipelockProxy() directly switch to
DockerPipelockProxy() (the base is no longer constructable).
200 lines
6.9 KiB
Python
200 lines
6.9 KiB
Python
"""Pipelock sidecar lifecycle for the per-agent egress topology.
|
|
|
|
Pipelock (https://github.com/luckyPipewrench/pipelock) is an HTTP
|
|
forward proxy with hostname allowlisting + DLP scanning + URL-entropy
|
|
checks. One sidecar per agent, attached to the agent's --internal
|
|
network and a per-agent user-defined egress bridge. Combined with
|
|
HTTPS_PROXY/HTTP_PROXY pointing at the sidecar's service name, pipelock
|
|
is the only egress route the agent has.
|
|
|
|
Image pin: ghcr.io/luckypipewrench/pipelock@sha256:<digest> for tag 2.3.0.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from .manifest import Bottle, Manifest
|
|
from .util import is_ipv4_literal
|
|
|
|
# Container image for the pipelock sidecar. The built-in default is
# pinned by digest (the multi-arch image index for
# ghcr.io/luckypipewrench/pipelock:2.3.0); the
# CLAUDE_BOTTLE_PIPELOCK_IMAGE environment variable overrides it.
PIPELOCK_IMAGE = os.getenv(
    "CLAUDE_BOTTLE_PIPELOCK_IMAGE",
    "ghcr.io/luckypipewrench/pipelock@sha256:3b1a39417b98406ddc5dc2d8fcb42865ddc0c68a43d355db55f0f8cb06bc6de9",
)

# Port pipelock's forward proxy listens on. Kept as a string because it
# is only ever interpolated into URLs / host:port pairs; overridable via
# CLAUDE_BOTTLE_PIPELOCK_PORT.
PIPELOCK_PORT = os.getenv("CLAUDE_BOTTLE_PIPELOCK_PORT", "8888")

# Hosts Claude Code itself needs; always part of the effective
# allowlist regardless of what a bottle declares.
DEFAULT_ALLOWLIST: tuple[str, ...] = (
    "api.anthropic.com",
    "statsig.anthropic.com",
    "sentry.io",
    "claude.ai",
    "platform.claude.com",
    "downloads.claude.ai",
    "raw.githubusercontent.com",
)
|
|
|
|
|
|
def pipelock_container_name(slug: str) -> str:
    """Return the pipelock sidecar name for the agent identified by `slug`."""
    return "claude-bottle-pipelock-{}".format(slug)
|
|
|
|
|
|
def pipelock_proxy_url(slug: str) -> str:
    """Return the http:// URL of the pipelock proxy for `slug`, built from
    the sidecar name and PIPELOCK_PORT."""
    sidecar = pipelock_container_name(slug)
    return "http://{}:{}".format(sidecar, PIPELOCK_PORT)
|
|
|
|
|
|
def pipelock_proxy_host_port(slug: str) -> str:
    """Return "<sidecar name>:<port>" for the pipelock proxy of `slug`
    (the proxy address without a URL scheme)."""
    sidecar = pipelock_container_name(slug)
    return "{}:{}".format(sidecar, PIPELOCK_PORT)
|
|
|
|
|
|
# --- Allowlist resolution --------------------------------------------------
|
|
|
|
|
|
def pipelock_bottle_allowlist(bottle: Bottle) -> list[str]:
    """Return a fresh list holding the entries of bottle.egress.allowlist,
    in their declared order."""
    return [*bottle.egress.allowlist]
|
|
|
|
|
|
def pipelock_bottle_ssh_hostnames(bottle: Bottle) -> list[str]:
    """Return the Hostname of every entry in bottle.ssh, in order,
    skipping entries whose Hostname is empty/falsy."""
    hostnames: list[str] = []
    for entry in bottle.ssh:
        if entry.Hostname:
            hostnames.append(entry.Hostname)
    return hostnames
|
|
|
|
|
|
def pipelock_bottle_ssh_trusted_domains(bottle: Bottle) -> list[str]:
    """Return the bottle's ssh hostnames that is_ipv4_literal does NOT
    classify as IPv4 literals (i.e. the name-based ones), in order."""
    domains: list[str] = []
    for host in pipelock_bottle_ssh_hostnames(bottle):
        if is_ipv4_literal(host):
            continue
        domains.append(host)
    return domains
|
|
|
|
|
|
def pipelock_bottle_ssh_ip_cidrs(bottle: Bottle) -> list[str]:
    """Return a single-host "/32" CIDR for each of the bottle's ssh
    hostnames that is_ipv4_literal classifies as an IPv4 literal."""
    cidrs: list[str] = []
    for host in pipelock_bottle_ssh_hostnames(bottle):
        if is_ipv4_literal(host):
            cidrs.append("{}/32".format(host))
    return cidrs
|
|
|
|
|
|
def pipelock_effective_allowlist(bottle: Bottle) -> list[str]:
    """Return the sorted, duplicate-free union of three host sources:
    DEFAULT_ALLOWLIST, bottle.egress.allowlist and the bottle's ssh
    hostnames. Empty entries from the two bottle sources are dropped."""
    hosts: set[str] = set(DEFAULT_ALLOWLIST)
    hosts.update(h for h in pipelock_bottle_allowlist(bottle) if h)
    hosts.update(h for h in pipelock_bottle_ssh_hostnames(bottle) if h)
    # Sorting keeps the result stable from run to run.
    return sorted(hosts)
|
|
|
|
|
|
def pipelock_allowlist_summary(bottle: Bottle) -> str:
    """Render the effective allowlist as one line for the y/N preflight
    display.

    Up to five hosts are listed in full; with more than five only the
    first three are shown, followed by a "+M more" remainder count.
    """
    hosts = pipelock_effective_allowlist(bottle)
    total = len(hosts)
    if total == 0:
        return "0 hosts allowed (none)"
    if total <= 5:
        everything = ", ".join(hosts)
        return f"{total} hosts allowed ({everything})"
    shown = 3
    preview = ", ".join(hosts[:shown])
    return f"{total} hosts allowed ({preview}, +{total - shown} more)"
|
|
|
|
|
|
|
|
# --- Proxy class -----------------------------------------------------------
|
|
|
|
|
|
@dataclass(frozen=True)
class PipelockProxyPlan:
    """Immutable hand-off from PipelockProxy.prepare to .start.

    prepare fills in yaml_path and slug. The two network names start
    out empty; the backend's launch step supplies them (through
    dataclasses.replace, since instances are frozen) once the networks
    actually exist.
    """

    # Location of the generated pipelock config file.
    yaml_path: Path
    # Agent slug the sidecar belongs to.
    slug: str
    # Name of the agent's internal network; "" until the backend sets it.
    internal_network: str = ""
    # Name of the per-agent egress network; "" until the backend sets it.
    egress_network: str = ""
|
|
|
|
|
|
class PipelockProxy(ABC):
    """Abstract pipelock egress proxy.

    The base class owns the platform-agnostic part: rendering the
    pipelock YAML config. Bringing the sidecar up and tearing it down
    is backend-specific and is supplied by concrete subclasses (e.g.
    DockerPipelockProxy) through `start` and `stop`.
    """

    def prepare(
        self, manifest: Manifest, bottle_name: str, slug: str, yaml_path: Path
    ) -> PipelockProxyPlan:
        """Write the pipelock yaml config (mode 600) to `yaml_path` for
        the sidecar to consume when it boots.

        The config carries the effective allowlist
        (bottle.egress.allowlist UNION claude-bottle defaults UNION ssh
        hostnames), strict mode, forward_proxy, DLP defaults and
        scan_env. It deliberately contains no env values, no secrets
        and no per-agent customization beyond the host lists.

        Returns a PipelockProxyPlan with yaml_path and slug filled in.
        """
        return self._build_pipelock_yaml(manifest, bottle_name, slug, yaml_path)

    @staticmethod
    def _yaml_quote(value: str) -> str:
        """Return `value` as a double-quoted YAML scalar.

        A JSON string literal is also a valid YAML double-quoted
        scalar, so json.dumps supplies the escaping of quotes,
        backslashes and control characters. Ordinary hostnames come out
        unchanged apart from the surrounding quotes.
        """
        import json  # local import: the module does not import json at top level

        return json.dumps(str(value))

    def _build_pipelock_yaml(
        self, manifest: Manifest, bottle_name: str, slug: str, yaml_path: Path
    ) -> PipelockProxyPlan:
        """Render the pipelock config for `bottle_name` and write it to
        `yaml_path` with owner-only permissions.

        Raises whatever `manifest.bottles[bottle_name]` raises for an
        unknown bottle, and OSError if the file cannot be written.
        """
        bottle = manifest.bottles[bottle_name]
        allowlist = pipelock_effective_allowlist(bottle)
        trusted = pipelock_bottle_ssh_trusted_domains(bottle)
        ip_cidrs = pipelock_bottle_ssh_ip_cidrs(bottle)
        quote = self._yaml_quote

        lines: list[str] = [
            "version: 1",
            "mode: strict",
            "enforce: true",
            "",
            "# Hostnames the agent is allowed to reach. Effective list is",
            "# claude-bottle defaults UNION bottle.egress.allowlist (sorted, deduped).",
            "api_allowlist:",
        ]
        lines.extend(f" - {quote(h)}" for h in allowlist)
        lines.append("")
        lines.append("forward_proxy:")
        lines.append(" enabled: true")
        lines.append("")
        # The ssh-derived sections are emitted only when they have entries.
        if trusted:
            lines.append("trusted_domains:")
            lines.extend(f" - {quote(td)}" for td in trusted)
            lines.append("")
        if ip_cidrs:
            lines.append("ssrf:")
            lines.append(" ip_allowlist:")
            lines.extend(f" - {quote(cidr)}" for cidr in ip_cidrs)
            lines.append("")
        lines.append("dlp:")
        lines.append(" include_defaults: true")
        lines.append(" scan_env: true")

        content = "\n".join(lines) + "\n"

        # Create the file with mode 0600 from the start so it is never
        # visible with umask-default permissions, not even briefly.
        fd = os.open(yaml_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(content)
        # os.open's mode applies only when the file is created; the chmod
        # tightens a pre-existing file that had looser permissions.
        yaml_path.chmod(0o600)

        return PipelockProxyPlan(yaml_path=yaml_path, slug=slug)

    @abstractmethod
    def start(self, plan: PipelockProxyPlan) -> str:
        """Bring up the pipelock sidecar according to `plan`. Returns
        the proxy_target string identifying the running instance — the
        same value to pass to `.stop`. Backend-specific."""

    @abstractmethod
    def stop(self, proxy_target: str) -> None:
        """Tear down the pipelock sidecar identified by `proxy_target`
        (the value `.start` returned). Idempotent: a missing target is
        success. Backend-specific."""
|