refactor(backend): lift shared CA helpers #92

Merged
didericis merged 2 commits from refactor-lift-ca-helpers into main 2026-05-28 20:36:30 -04:00
4 changed files with 75 additions and 88 deletions
+1 -1
View File
@@ -59,6 +59,7 @@ from ...supervise import (
SUPERVISE_PORT,
)
from ...util import expand_tilde
from ..util import AGENT_CA_BUNDLE, AGENT_CA_PATH
from .bottle_plan import DockerBottlePlan
from .egress import (
EGRESS_CA_IN_CONTAINER,
@@ -75,7 +76,6 @@ from .pipelock import (
PIPELOCK_CA_KEY_IN_CONTAINER,
PIPELOCK_PORT,
)
from .provision.ca import AGENT_CA_BUNDLE, AGENT_CA_PATH
from .sidecar_bundle import (
SIDECAR_BUNDLE_DOCKERFILE,
SIDECAR_BUNDLE_IMAGE,
+3 -43
View File
@@ -31,54 +31,18 @@ stage dir; nothing in the agent ever sees it."""
from __future__ import annotations
import hashlib
import ssl
import subprocess
from pathlib import Path
from ....log import info
from ...util import AGENT_CA_PATH, log_ca_fingerprint, select_ca_cert
from ..bottle_plan import DockerBottlePlan
# Debian-family path for sources that `update-ca-certificates` reads.
# Bundle path is what the command rebuilds and what every standard
# TLS consumer in the image reads.
AGENT_CA_PATH = "/usr/local/share/ca-certificates/bot-bottle-mitm-ca.crt"
AGENT_CA_BUNDLE = "/etc/ssl/certs/ca-certificates.crt"
def _select_ca_cert(plan: DockerBottlePlan) -> tuple[Path, str]:
"""Pick the CA cert (and a short label for the log line) that
matches the proxy the agent's HTTP_PROXY points at. Egress-proxy
wins when the bottle declares any routes (it sits in front of
pipelock); else pipelock."""
if plan.egress_plan.routes:
cert = plan.egress_plan.mitmproxy_ca_cert_only_host_path
if cert == Path() or not cert.is_file():
from ....log import die
die(
f"egress CA cert missing at {cert or '(empty)'}; "
f"launch must have called egress_tls_init and "
f"re-bound the plan before provision"
)
return cert, "egress"
cert = plan.proxy_plan.ca_cert_host_path
if not cert or not cert.is_file():
from ....log import die
die(
f"pipelock CA cert missing at {cert or '(empty)'}; "
f"launch must have called pipelock_tls_init and re-bound "
f"the plan before provision"
)
return cert, "pipelock"
def provision_ca(plan: DockerBottlePlan, target: str) -> None:
"""Copy the agent-facing CA cert into the agent, rebuild the
trust bundle, emit a one-line fingerprint log. Called from
`BottleBackend.provision` after the agent container is up."""
container = target
cert_host_path, label = _select_ca_cert(plan)
cert_host_path, label = select_ca_cert(plan.egress_plan, plan.proxy_plan)
subprocess.run(
["docker", "cp", str(cert_host_path), f"{container}:{AGENT_CA_PATH}"],
@@ -96,8 +60,4 @@ def provision_ca(plan: DockerBottlePlan, target: str) -> None:
check=True,
)
# Stdlib SHA-256 of the cert's DER bytes — the standard
# fingerprint form. Never the private key.
der = ssl.PEM_cert_to_DER_cert(cert_host_path.read_text())
fingerprint = hashlib.sha256(der).hexdigest()
info(f"{label} ca fingerprint: sha256:{fingerprint[:32]}...")
log_ca_fingerprint(cert_host_path, label)
+11 -43
View File
@@ -15,49 +15,22 @@ flag exists; the VM init is root), so we don't need the explicit
from __future__ import annotations
import hashlib
import ssl
from pathlib import Path
from ....log import die, info
from ...docker.provision.ca import AGENT_CA_BUNDLE, AGENT_CA_PATH
from ....log import die
from ...util import (
AGENT_CA_BUNDLE,
AGENT_CA_PATH,
log_ca_fingerprint,
select_ca_cert,
)
from .. import smolvm as _smolvm
from ..bottle_plan import SmolmachinesBottlePlan
def _select_ca_cert(plan: SmolmachinesBottlePlan) -> tuple[Path, str]:
"""Pick the CA cert (and a short label for the log line) that
matches the proxy the agent's HTTP_PROXY points at. Egress-proxy
wins when the bottle declares any routes; else pipelock.
The launch step minted both CAs (pipelock always; egress when
routes are declared) and stored their host paths back into the
inner Plans via `dataclasses.replace`. If those paths are empty
here something has gone wrong in launch's bringup."""
if plan.egress_plan.routes:
cert = plan.egress_plan.mitmproxy_ca_cert_only_host_path
if cert == Path() or not cert.is_file():
die(
f"egress CA cert missing at {cert or '(empty)'}; "
f"launch must have called egress_tls_init and "
f"re-bound the plan before provision"
)
return cert, "egress"
cert = plan.proxy_plan.ca_cert_host_path
if not cert or not cert.is_file():
die(
f"pipelock CA cert missing at {cert or '(empty)'}; "
f"launch must have called pipelock_tls_init and re-bound "
f"the plan before provision"
)
return cert, "pipelock"
def provision_ca(plan: SmolmachinesBottlePlan, target: str) -> None:
"""Copy the agent-facing CA cert into the guest, rebuild the
trust bundle, emit a one-line fingerprint log. Called from
`BottleBackend.provision` after the smolvm guest is up."""
cert_host_path, label = _select_ca_cert(plan)
cert_host_path, label = select_ca_cert(plan.egress_plan, plan.proxy_plan)
_smolvm.machine_cp(str(cert_host_path), f"{target}:{AGENT_CA_PATH}")
# Mode 0644 — readable to non-root tools in the guest.
@@ -90,15 +63,10 @@ def provision_ca(plan: SmolmachinesBottlePlan, target: str) -> None:
f"stderr={(r.stderr or '').strip()!r}"
)
# Stdlib SHA-256 of the cert's DER bytes — the standard
# fingerprint form. Never the private key.
der = ssl.PEM_cert_to_DER_cert(cert_host_path.read_text())
fingerprint = hashlib.sha256(der).hexdigest()
info(f"{label} ca fingerprint: sha256:{fingerprint[:32]}...")
log_ca_fingerprint(cert_host_path, label)
# Re-exported for the launch/provision_ca caller + tests. The path
# constants come from the docker module because they're tied to
# Debian's `update-ca-certificates` layout same in both backends
# since both guest images are Debian-family.
# constants live in the shared `backend.util` (Debian's
# `update-ca-certificates` layout is the same in both backends).
__all__ = ["AGENT_CA_BUNDLE", "AGENT_CA_PATH", "provision_ca"]
+60 -1
View File
@@ -4,9 +4,26 @@ deeper (e.g. bot_bottle/backend/docker/util.py)."""
from __future__ import annotations
import hashlib
import os
import ssl
from pathlib import Path
from typing import TYPE_CHECKING
from ..log import die
from ..log import die, info
if TYPE_CHECKING:
from ..egress import EgressPlan
from ..pipelock import PipelockProxyPlan
# Debian-family CA layout, shared by every backend (all guest images
# are Debian-family). AGENT_CA_PATH is the source path that
# `update-ca-certificates` reads; AGENT_CA_BUNDLE is the bundle it
# rebuilds, which curl, Python `ssl`, and OpenSSL-based tools all read
# by default.
AGENT_CA_PATH = "/usr/local/share/ca-certificates/bot-bottle-mitm-ca.crt"
AGENT_CA_BUNDLE = "/etc/ssl/certs/ca-certificates.crt"
def host_skill_dir(name: str) -> str:
@@ -16,3 +33,45 @@ def host_skill_dir(name: str) -> str:
if not home:
die("HOME not set")
return f"{home}/.claude/skills/{name}"
def select_ca_cert(
egress_plan: EgressPlan, proxy_plan: PipelockProxyPlan
) -> tuple[Path, str]:
"""Pick the agent-facing CA cert (and a short label for the log
line) that matches the proxy the agent's HTTP_PROXY points at.
Egress wins when the bottle declares any routes (it sits in front
of pipelock); else pipelock.
Shared by every backend's `provision_ca`: launch mints the chosen
CA(s) and re-binds their host paths into these inner plans before
provision runs, so an empty/missing path here means launch's
bringup is broken — fatal."""
if egress_plan.routes:
cert = egress_plan.mitmproxy_ca_cert_only_host_path
if cert == Path() or not cert.is_file():
die(
f"egress CA cert missing at {cert or '(empty)'}; "
f"launch must have called egress_tls_init and "
f"re-bound the plan before provision"
)
return cert, "egress"
cert = proxy_plan.ca_cert_host_path
if not cert or not cert.is_file():
die(
f"pipelock CA cert missing at {cert or '(empty)'}; "
f"launch must have called pipelock_tls_init and re-bound "
f"the plan before provision"
)
return cert, "pipelock"
def log_ca_fingerprint(cert_host_path: Path, label: str) -> None:
"""Compute the cert's SHA-256 fingerprint over its DER bytes
(stdlib `ssl` + `hashlib`) and log it once to stderr — the
standard fingerprint form. Only ever touches the public cert;
the private key stays on the host under the stage dir until
teardown."""
der = ssl.PEM_cert_to_DER_cert(cert_host_path.read_text())
fingerprint = hashlib.sha256(der).hexdigest()
info(f"{label} ca fingerprint: sha256:{fingerprint[:32]}...")