refactor(backend): lift shared CA cert select + fingerprint helpers

Both backends' provision_ca duplicated _select_ca_cert and the
SHA-256 fingerprint computation verbatim. Lift them into the shared
backend/util.py as select_ca_cert + log_ca_fingerprint; docker and
smolmachines now call the shared helpers. No behavior change.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-05-28 18:09:40 -04:00
parent 9183c64225
commit 78d03a8269
3 changed files with 58 additions and 76 deletions
+3 -36
View File
@@ -31,12 +31,9 @@ stage dir; nothing in the agent ever sees it."""
from __future__ import annotations
import hashlib
import ssl
import subprocess
from pathlib import Path
from ....log import info
from ...util import log_ca_fingerprint, select_ca_cert
from ..bottle_plan import DockerBottlePlan
@@ -47,38 +44,12 @@ AGENT_CA_PATH = "/usr/local/share/ca-certificates/claude-bottle-mitm-ca.crt"
AGENT_CA_BUNDLE = "/etc/ssl/certs/ca-certificates.crt"
def _select_ca_cert(plan: DockerBottlePlan) -> tuple[Path, str]:
"""Pick the CA cert (and a short label for the log line) that
matches the proxy the agent's HTTP_PROXY points at. Egress-proxy
wins when the bottle declares any routes (it sits in front of
pipelock); else pipelock."""
if plan.egress_plan.routes:
cert = plan.egress_plan.mitmproxy_ca_cert_only_host_path
if cert == Path() or not cert.is_file():
from ....log import die
die(
f"egress CA cert missing at {cert or '(empty)'}; "
f"launch must have called egress_tls_init and "
f"re-bound the plan before provision"
)
return cert, "egress"
cert = plan.proxy_plan.ca_cert_host_path
if not cert or not cert.is_file():
from ....log import die
die(
f"pipelock CA cert missing at {cert or '(empty)'}; "
f"launch must have called pipelock_tls_init and re-bound "
f"the plan before provision"
)
return cert, "pipelock"
def provision_ca(plan: DockerBottlePlan, target: str) -> None:
"""Copy the agent-facing CA cert into the agent, rebuild the
trust bundle, emit a one-line fingerprint log. Called from
`BottleBackend.provision` after the agent container is up."""
container = target
cert_host_path, label = _select_ca_cert(plan)
cert_host_path, label = select_ca_cert(plan.egress_plan, plan.proxy_plan)
subprocess.run(
["docker", "cp", str(cert_host_path), f"{container}:{AGENT_CA_PATH}"],
@@ -96,8 +67,4 @@ def provision_ca(plan: DockerBottlePlan, target: str) -> None:
check=True,
)
# Stdlib SHA-256 of the cert's DER bytes — the standard
# fingerprint form. Never the private key.
der = ssl.PEM_cert_to_DER_cert(cert_host_path.read_text())
fingerprint = hashlib.sha256(der).hexdigest()
info(f"{label} ca fingerprint: sha256:{fingerprint[:32]}...")
log_ca_fingerprint(cert_host_path, label)
@@ -15,49 +15,18 @@ flag exists; the VM init is root), so we don't need the explicit
from __future__ import annotations
import hashlib
import ssl
from pathlib import Path
from ....log import die, info
from ....log import die
from ...docker.provision.ca import AGENT_CA_BUNDLE, AGENT_CA_PATH
from ...util import log_ca_fingerprint, select_ca_cert
from .. import smolvm as _smolvm
from ..bottle_plan import SmolmachinesBottlePlan
def _select_ca_cert(plan: SmolmachinesBottlePlan) -> tuple[Path, str]:
"""Pick the CA cert (and a short label for the log line) that
matches the proxy the agent's HTTP_PROXY points at. Egress-proxy
wins when the bottle declares any routes; else pipelock.
The launch step minted both CAs (pipelock always; egress when
routes are declared) and stored their host paths back into the
inner Plans via `dataclasses.replace`. If those paths are empty
here something has gone wrong in launch's bringup."""
if plan.egress_plan.routes:
cert = plan.egress_plan.mitmproxy_ca_cert_only_host_path
if cert == Path() or not cert.is_file():
die(
f"egress CA cert missing at {cert or '(empty)'}; "
f"launch must have called egress_tls_init and "
f"re-bound the plan before provision"
)
return cert, "egress"
cert = plan.proxy_plan.ca_cert_host_path
if not cert or not cert.is_file():
die(
f"pipelock CA cert missing at {cert or '(empty)'}; "
f"launch must have called pipelock_tls_init and re-bound "
f"the plan before provision"
)
return cert, "pipelock"
def provision_ca(plan: SmolmachinesBottlePlan, target: str) -> None:
"""Copy the agent-facing CA cert into the guest, rebuild the
trust bundle, emit a one-line fingerprint log. Called from
`BottleBackend.provision` after the smolvm guest is up."""
cert_host_path, label = _select_ca_cert(plan)
cert_host_path, label = select_ca_cert(plan.egress_plan, plan.proxy_plan)
_smolvm.machine_cp(str(cert_host_path), f"{target}:{AGENT_CA_PATH}")
# Mode 0644 — readable to non-root tools in the guest.
@@ -90,11 +59,7 @@ def provision_ca(plan: SmolmachinesBottlePlan, target: str) -> None:
f"stderr={(r.stderr or '').strip()!r}"
)
# Stdlib SHA-256 of the cert's DER bytes — the standard
# fingerprint form. Never the private key.
der = ssl.PEM_cert_to_DER_cert(cert_host_path.read_text())
fingerprint = hashlib.sha256(der).hexdigest()
info(f"{label} ca fingerprint: sha256:{fingerprint[:32]}...")
log_ca_fingerprint(cert_host_path, label)
# Re-exported for the launch/provision_ca caller + tests. The path
+51 -1
View File
@@ -4,9 +4,17 @@ deeper (e.g. claude_bottle/backend/docker/util.py)."""
from __future__ import annotations
import hashlib
import os
import ssl
from pathlib import Path
from typing import TYPE_CHECKING
from ..log import die
from ..log import die, info
if TYPE_CHECKING:
from ..egress import EgressPlan
from ..pipelock import PipelockProxyPlan
def host_skill_dir(name: str) -> str:
@@ -16,3 +24,45 @@ def host_skill_dir(name: str) -> str:
if not home:
die("HOME not set")
return f"{home}/.claude/skills/{name}"
def select_ca_cert(
egress_plan: EgressPlan, proxy_plan: PipelockProxyPlan
) -> tuple[Path, str]:
"""Pick the agent-facing CA cert (and a short label for the log
line) that matches the proxy the agent's HTTP_PROXY points at.
Egress wins when the bottle declares any routes (it sits in front
of pipelock); else pipelock.
Shared by every backend's `provision_ca`: launch mints the chosen
CA(s) and re-binds their host paths into these inner plans before
provision runs, so an empty/missing path here means launch's
bringup is broken — fatal."""
if egress_plan.routes:
cert = egress_plan.mitmproxy_ca_cert_only_host_path
if cert == Path() or not cert.is_file():
die(
f"egress CA cert missing at {cert or '(empty)'}; "
f"launch must have called egress_tls_init and "
f"re-bound the plan before provision"
)
return cert, "egress"
cert = proxy_plan.ca_cert_host_path
if not cert or not cert.is_file():
die(
f"pipelock CA cert missing at {cert or '(empty)'}; "
f"launch must have called pipelock_tls_init and re-bound "
f"the plan before provision"
)
return cert, "pipelock"
def log_ca_fingerprint(cert_host_path: Path, label: str) -> None:
"""Compute the cert's SHA-256 fingerprint over its DER bytes
(stdlib `ssl` + `hashlib`) and log it once to stderr — the
standard fingerprint form. Only ever touches the public cert;
the private key stays on the host under the stage dir until
teardown."""
der = ssl.PEM_cert_to_DER_cert(cert_host_path.read_text())
fingerprint = hashlib.sha256(der).hexdigest()
info(f"{label} ca fingerprint: sha256:{fingerprint[:32]}...")