feat(pipelock): enable tls_interception with per-bottle ephemeral CA

First step of PRD 0006. Pipelock now does the CONNECT bumping that
PR #8's mitmproxy chain was supposed to provide — natively, in the
same single sidecar PRD 0001 wired up.

- claude_bottle/pipelock.py: pipelock_build_config grows optional
  ca_cert_path / ca_key_path kwargs. When both are passed the
  rendered YAML carries a `tls_interception: { enabled: true,
  ca_cert, ca_key }` block. PipelockProxy gains class-level
  CA_CERT_IN_CONTAINER / CA_KEY_IN_CONTAINER constants that
  subclasses set to wherever they place the CA inside the
  sidecar. PipelockProxyPlan gains ca_cert_host_path /
  ca_key_host_path fields (default empty Path() — sentinel for
  "not yet populated", filled by launch via dataclasses.replace).

- claude_bottle/backend/docker/pipelock.py: new
  pipelock_tls_init(stage_dir) helper runs `pipelock tls init`
  in a one-shot container against a host-mounted scratch dir.
  DockerPipelockProxy sets its class constants to
  /etc/pipelock-ca.pem and /etc/pipelock-ca-key.pem; .start
  docker-cp's the cert + key into those paths between
  `docker create` and `docker start`. Pipelock runs as root in
  its distroless image, so no chown is needed (verified).

- claude_bottle/backend/docker/launch.py: calls pipelock_tls_init
  between network creation and proxy.start. Prepare stays
  side-effect-free on docker; the one-shot ca-init container
  only runs on a real launch, not on `start --dry-run`.

- tests/unit/test_pipelock_yaml.py: new assertions that
  pipelock_build_config emits the tls_interception block only
  when both paths are supplied (and rejects a half-set pair),
  plus a test that the docker proxy's prepare plumbs the
  in-container paths through to the rendered YAML.

The end-to-end "bumping actually fires" assertion lands in
chunk 4 (HTTPS integration tests).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-12 14:45:36 -04:00
parent f44e884d8a
commit 3755e66abe
4 changed files with 194 additions and 30 deletions
+10 -1
View File
@@ -22,7 +22,7 @@ from . import network as network_mod
from . import util as docker_mod
from .bottle import DockerBottle
from .bottle_plan import DockerBottlePlan
from .pipelock import DockerPipelockProxy, pipelock_proxy_url
from .pipelock import DockerPipelockProxy, pipelock_proxy_url, pipelock_tls_init
# Where the repo root lives, for `docker build` context. Computed once.
@@ -63,10 +63,19 @@ def launch(
egress_network = network_mod.network_create_egress(plan.slug)
stack.callback(network_mod.network_remove, egress_network)
# Per-bottle ephemeral CA for pipelock's TLS interception
# (PRD 0006). One-shot pipelock container writes ca.pem +
# ca-key.pem under plan.stage_dir; .start docker-cp's them
# into the sidecar. The private key never leaves the host
# stage dir, which start.py's outer finally `shutil.rmtree`s
# after the sidecar is torn down.
ca_cert_host, ca_key_host = pipelock_tls_init(plan.stage_dir)
proxy_plan = dataclasses.replace(
plan.proxy_plan,
internal_network=internal_network,
egress_network=egress_network,
ca_cert_host_path=ca_cert_host,
ca_key_host_path=ca_key_host,
)
pipelock_name = proxy.start(proxy_plan)
stack.callback(proxy.stop, pipelock_name)
+71 -14
View File
@@ -6,6 +6,7 @@ from __future__ import annotations
import os
import subprocess
from pathlib import Path
from ...log import die, info, warn
from ...pipelock import PipelockProxy, PipelockProxyPlan
@@ -21,6 +22,12 @@ PIPELOCK_IMAGE = os.environ.get(
# Listening port for pipelock's forward proxy.
PIPELOCK_PORT = os.environ.get("CLAUDE_BOTTLE_PIPELOCK_PORT", "8888")
# In-container paths where the per-bottle CA cert + key land after
# `docker cp` in `DockerPipelockProxy.start`. Pipelock's rendered
# YAML references these paths under `tls_interception`.
PIPELOCK_CA_CERT_IN_CONTAINER = "/etc/pipelock-ca.pem"
PIPELOCK_CA_KEY_IN_CONTAINER = "/etc/pipelock-ca-key.pem"
def pipelock_container_name(slug: str) -> str:
return f"claude-bottle-pipelock-{slug}"
@@ -34,19 +41,56 @@ def pipelock_proxy_host_port(slug: str) -> str:
return f"{pipelock_container_name(slug)}:{PIPELOCK_PORT}"
def pipelock_tls_init(stage_dir: Path) -> tuple[Path, Path]:
"""Generate a fresh per-bottle CA via a one-shot pipelock container.
Runs `pipelock tls init` against a host-mounted scratch dir, leaving
`ca.pem` (public cert, mode 600) and `ca-key.pem` (private key, mode
600) under `<stage_dir>/pipelock-ca/`. Returns the two host paths.
The image is pinned (same digest the running sidecar uses) so the
generated CA matches what the sidecar expects. Output is owned by
whatever UID the one-shot ran as; `DockerPipelockProxy.start`
`docker cp`s the files into the sidecar's filesystem layer, so
runtime ownership inside the sidecar (root in pipelock's
distroless image) is independent."""
work = stage_dir / "pipelock-ca"
work.mkdir(exist_ok=True)
result = subprocess.run(
["docker", "run", "--rm",
"-v", f"{work}:/h",
"-e", "PIPELOCK_HOME=/h",
PIPELOCK_IMAGE, "tls", "init"],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
die(f"pipelock tls init failed: {result.stderr.strip()}")
cert = work / "ca.pem"
key = work / "ca-key.pem"
if not cert.is_file() or not key.is_file():
die(f"pipelock tls init did not produce ca files in {work}")
return (cert, key)
class DockerPipelockProxy(PipelockProxy):
"""Brings the pipelock sidecar up and down via Docker."""
CA_CERT_IN_CONTAINER = PIPELOCK_CA_CERT_IN_CONTAINER
CA_KEY_IN_CONTAINER = PIPELOCK_CA_KEY_IN_CONTAINER
def start(self, plan: PipelockProxyPlan) -> str:
"""Boot the pipelock sidecar:
1. `docker create` on the internal network with the canonical
name and argv `run --config /etc/pipelock.yaml --listen
0.0.0.0:<port>`.
2. `docker cp` the YAML config to /etc/pipelock.yaml in the
writable layer (parent dir must already exist; image is
distroless).
3. Attach to the per-agent egress network.
4. `docker start`.
2. `docker cp` the YAML config to /etc/pipelock.yaml.
3. `docker cp` the CA cert + key to /etc/pipelock-ca.pem
and /etc/pipelock-ca-key.pem (pipelock runs as root in
its distroless image, so no chown is needed).
4. Attach to the per-agent egress network.
5. `docker start`.
Returns the container name (the proxy_target passed to .stop)."""
name = pipelock_container_name(plan.slug)
if not plan.yaml_path.is_file():
@@ -54,6 +98,11 @@ class DockerPipelockProxy(PipelockProxy):
f"pipelock yaml not found at {plan.yaml_path}; "
f"PipelockProxy.prepare must run first"
)
if not plan.ca_cert_host_path.is_file() or not plan.ca_key_host_path.is_file():
die(
f"pipelock CA missing at {plan.ca_cert_host_path} / "
f"{plan.ca_key_host_path}; pipelock_tls_init must run first"
)
info(f"starting pipelock sidecar {name} on network {plan.internal_network}")
@@ -68,15 +117,23 @@ class DockerPipelockProxy(PipelockProxy):
if subprocess.run(create_args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False).returncode != 0:
die(f"failed to create pipelock sidecar {name}")
cp_result = subprocess.run(
["docker", "cp", str(plan.yaml_path), f"{name}:/etc/pipelock.yaml"],
capture_output=True,
text=True,
check=False,
)
if cp_result.returncode != 0:
subprocess.run(["docker", "rm", "-f", name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
die(f"failed to copy pipelock yaml into {name}: {cp_result.stderr.strip()}")
for src, dst, label in (
(plan.yaml_path, "/etc/pipelock.yaml", "yaml"),
(plan.ca_cert_host_path, PIPELOCK_CA_CERT_IN_CONTAINER, "ca cert"),
(plan.ca_key_host_path, PIPELOCK_CA_KEY_IN_CONTAINER, "ca key"),
):
cp_result = subprocess.run(
["docker", "cp", str(src), f"{name}:{dst}"],
capture_output=True,
text=True,
check=False,
)
if cp_result.returncode != 0:
subprocess.run(
["docker", "rm", "-f", name],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False,
)
die(f"failed to copy pipelock {label} into {name}: {cp_result.stderr.strip()}")
if subprocess.run(
["docker", "network", "connect", plan.egress_network, name],
+70 -15
View File
@@ -89,13 +89,26 @@ def pipelock_allowlist_summary(bottle: Bottle) -> str:
# --- Config build + YAML render --------------------------------------------
def pipelock_build_config(bottle: Bottle) -> dict[str, object]:
def pipelock_build_config(
bottle: Bottle,
*,
ca_cert_path: str = "",
ca_key_path: str = "",
) -> dict[str, object]:
"""Build the structured pipelock config dict the sidecar will load.
Deliberately carries no env values, no secrets, no per-agent
customization beyond the resolved hostname list. The shape mirrors
the YAML pipelock expects on disk; `pipelock_render_yaml` serializes
it. Tests assert on this dict; production code renders it."""
it. Tests assert on this dict; production code renders it.
`ca_cert_path` / `ca_key_path` are the **in-container** paths the
pipelock sidecar will read its CA from at runtime (they're
populated into the container at start time via `docker cp`).
Pass both or neither: both → emit `tls_interception` block with
`enabled: true`; neither → omit the block entirely (pipelock
falls back to its built-in default of `enabled: false`). Used
by PRD 0006 to turn on pipelock's native TLS interception."""
cfg: dict[str, object] = {
"version": 1,
"mode": "strict",
@@ -116,6 +129,17 @@ def pipelock_build_config(bottle: Bottle) -> dict[str, object]:
# with a log line); claude-bottle's default is "block" so a hit
# actually stops the request from leaving the egress network.
cfg["request_body_scanning"] = {"action": bottle.egress.dlp_action}
if ca_cert_path or ca_key_path:
if not (ca_cert_path and ca_key_path):
raise ValueError(
"pipelock_build_config: pass both ca_cert_path and ca_key_path "
"to enable tls_interception, or neither to leave it off"
)
cfg["tls_interception"] = {
"enabled": True,
"ca_cert": ca_cert_path,
"ca_key": ca_key_path,
}
return cfg
@@ -159,6 +183,13 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
lines.append("request_body_scanning:")
rbs = cast(dict[str, object], cfg["request_body_scanning"])
lines.append(f' action: "{rbs["action"]}"')
if "tls_interception" in cfg:
lines.append("")
lines.append("tls_interception:")
tls = cast(dict[str, object], cfg["tls_interception"])
lines.append(f" enabled: {_bool(tls['enabled'])}")
lines.append(f' ca_cert: "{tls["ca_cert"]}"')
lines.append(f' ca_key: "{tls["ca_key"]}"')
return "\n".join(lines) + "\n"
@@ -170,42 +201,66 @@ class PipelockProxyPlan:
"""Output of PipelockProxy.prepare; consumed by .start when the
sidecar needs to be brought up.
yaml_path + slug are filled in at prepare time. internal_network
and egress_network default to empty and are populated by the
backend's launch step (via dataclasses.replace) once those networks
have actually been created."""
yaml_path + slug are filled in at prepare time (host-side, side-
effect-free; the YAML references the in-container CA paths
already so it doesn't need the host paths to be valid). The
remaining fields are populated by the backend's launch step
via `dataclasses.replace`: internal/egress networks once
those networks exist, and the CA host paths once the
one-shot `pipelock tls init` has run. Empty defaults are
sentinels meaning "not yet set"; `.start` validates that
they are populated."""
yaml_path: Path
slug: str
internal_network: str = ""
egress_network: str = ""
ca_cert_host_path: Path = Path()
ca_key_host_path: Path = Path()
class PipelockProxy(ABC):
"""The pipelock egress proxy. Encapsulates the YAML-config
generation; the sidecar's start/stop lifecycle is backend-specific
and lives on concrete subclasses."""
and lives on concrete subclasses.
The class-level constants `CA_CERT_IN_CONTAINER` /
`CA_KEY_IN_CONTAINER` are the in-container paths the YAML config
references — they correspond to wherever the backend's `.start`
places the CA cert and key inside the sidecar. Subclasses
override the constants."""
CA_CERT_IN_CONTAINER: str = ""
CA_KEY_IN_CONTAINER: str = ""
def prepare(
self, bottle: Bottle, slug: str, stage_dir: Path
) -> PipelockProxyPlan:
"""Write the pipelock yaml config (mode 600) under `stage_dir`
and return the plan for `.start`.
and return the plan for `.start`. Pure host-side, no docker
subprocess.
`slug` is the agent-derived identifier (lowercased,
hyphen-normalized) used as the suffix in every per-agent
resource name — the agent container, the pipelock container
(`claude-bottle-pipelock-<slug>`), the internal/egress
networks. It's stored on the returned plan so the backend's
start step can derive the sidecar's container name."""
yaml_path = stage_dir / "pipelock.yaml"
self._build_pipelock_yaml(bottle, yaml_path)
return PipelockProxyPlan(yaml_path=yaml_path, slug=slug)
start step can derive the sidecar's container name.
def _build_pipelock_yaml(self, bottle: Bottle, yaml_path: Path):
"""Write the pipelock yaml config (mode 600) to `yaml_path`."""
yaml_path.write_text(pipelock_render_yaml(pipelock_build_config(bottle)))
The CA paths the YAML references are the in-container paths
from the concrete subclass's class-level constants. The
host-side counterparts are generated by the launch step
(not here, so prepare stays side-effect-free on docker) and
added to the plan via `dataclasses.replace` before `.start`."""
yaml_path = stage_dir / "pipelock.yaml"
cfg = pipelock_build_config(
bottle,
ca_cert_path=self.CA_CERT_IN_CONTAINER,
ca_key_path=self.CA_KEY_IN_CONTAINER,
)
yaml_path.write_text(pipelock_render_yaml(cfg))
yaml_path.chmod(0o600)
return PipelockProxyPlan(yaml_path=yaml_path, slug=slug)
@abstractmethod
def start(self, plan: PipelockProxyPlan) -> str:
+43
View File
@@ -37,6 +37,9 @@ class TestBuildConfig(unittest.TestCase):
# No SSH entries → no trusted_domains, no ssrf.
self.assertNotIn("trusted_domains", cfg)
self.assertNotIn("ssrf", cfg)
# Without CA paths, the tls_interception block is omitted —
# pipelock falls back to its built-in default of `enabled: false`.
self.assertNotIn("tls_interception", cfg)
def test_ssh_shape(self):
cfg = pipelock_build_config(fixture_with_ssh().bottles["dev"])
@@ -49,6 +52,31 @@ class TestBuildConfig(unittest.TestCase):
# Strict mode: IPv4 host is also in the api_allowlist union.
self.assertIn("100.78.141.42", cast(list[str], cfg["api_allowlist"]))
def test_tls_interception_block_emitted_when_paths_supplied(self):
# PRD 0006: paths flow in via DockerPipelockProxy's in-container
# constants; this directly pins the dict shape.
cfg = pipelock_build_config(
fixture_minimal().bottles["dev"],
ca_cert_path="/etc/pipelock-ca.pem",
ca_key_path="/etc/pipelock-ca-key.pem",
)
self.assertEqual(
{
"enabled": True,
"ca_cert": "/etc/pipelock-ca.pem",
"ca_key": "/etc/pipelock-ca-key.pem",
},
cfg["tls_interception"],
)
def test_tls_interception_requires_both_paths(self):
# Half-set is a programmer error, not a silent omission.
with self.assertRaises(ValueError):
pipelock_build_config(
fixture_minimal().bottles["dev"],
ca_cert_path="/etc/pipelock-ca.pem",
)
class TestRenderAndWrite(unittest.TestCase):
def setUp(self):
@@ -101,6 +129,21 @@ class TestRenderAndWrite(unittest.TestCase):
self.assertNotIn("MY_SECRET", content)
self.assertNotIn("prompt-message", content)
def test_render_emits_tls_interception_via_prepare(self):
"""`DockerPipelockProxy.prepare` plumbs its in-container CA
constants through to the YAML. The block should land in the
rendered output with `enabled: true` and the configured paths.
The actual host-side CA generation happens in launch (not
prepare), so this test exercises only the YAML rendering."""
plan = DockerPipelockProxy().prepare(
fixture_minimal().bottles["dev"], "demo", self.out_dir
)
content = plan.yaml_path.read_text()
self.assertIn("tls_interception:", content)
self.assertIn("enabled: true", content)
self.assertIn('ca_cert: "/etc/pipelock-ca.pem"', content)
self.assertIn('ca_key: "/etc/pipelock-ca-key.pem"', content)
if __name__ == "__main__":
unittest.main()