refactor: rename platform abstraction to backend
test / run tests/run_tests.py (pull_request) Successful in 21s

Across the package:
  - claude_bottle/platform/         -> claude_bottle/backend/
  - platform/docker/platform.py     -> backend/docker/backend.py
  - class BottlePlatform            -> BottleBackend
  - class DockerBottlePlatform      -> DockerBottleBackend
  - get_bottle_platform()           -> get_bottle_backend()
  - env var CLAUDE_BOTTLE_PLATFORM  -> CLAUDE_BOTTLE_BACKEND
  - dict _PLATFORMS                 -> _BACKENDS

"Backend" is shorter and more established as the term for a
pluggable strategy-pattern implementation. "Platform" was vague
(could mean OS, hardware, cloud) and mildly redundant — Docker is
itself a platform.

The previous PRD section claiming "the Backend protocol was
rejected" referred to a low-level run/exec/cp/network_connect
protocol; the name was never the reason. The PRD is updated to
describe that rejected design by shape rather than by name.

The bottle/agent concepts and the manifest schema are unchanged.
This commit is contained in:
2026-05-10 23:59:38 -04:00
parent c79966731c
commit 70a22fa210
13 changed files with 91 additions and 87 deletions
+29
View File
@@ -0,0 +1,29 @@
"""Docker bottle backend.
The bulk of the implementation lives in sibling modules:
- util: thin Docker subprocess wrappers
- network: Docker network plumbing
- bottle_plan: DockerBottlePlan
- bottle_cleanup_plan: DockerBottleCleanupPlan
- bottle: DockerBottle handle
- backend: DockerBottleBackend
This file only re-exports the public names so
`from claude_bottle.backend.docker import DockerBottleBackend` keeps
working.
"""
from __future__ import annotations
from .backend import DockerBottleBackend
from .bottle import DockerBottle
from .bottle_cleanup_plan import DockerBottleCleanupPlan
from .bottle_plan import DockerBottlePlan
__all__ = [
"DockerBottle",
"DockerBottleBackend",
"DockerBottleCleanupPlan",
"DockerBottlePlan",
]
+402
View File
@@ -0,0 +1,402 @@
"""DockerBottleBackend — the Docker implementation of BottleBackend.
Methods:
.prepare(spec, stage_dir=...) -> DockerBottlePlan
.launch(plan) -> ContextManager[DockerBottle]
.prepare_cleanup() -> DockerBottleCleanupPlan
.cleanup(plan) -> None
.list_active() -> None
"""
from __future__ import annotations
import os
import subprocess
import sys
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator
from ... import pipelock
from ... import skills as skills_mod
from ... import ssh as ssh_mod
from ...env_resolve import env_resolve
from ...log import die, info
from .. import BottleBackend, BottleCleanupPlan, BottlePlan, BottleSpec
from . import network as network_mod
from . import util as docker_mod
from .bottle import DockerBottle
from .bottle_cleanup_plan import DockerBottleCleanupPlan
from .bottle_plan import DockerBottlePlan
# Where the repo root lives, for `docker build` context. Computed once.
_REPO_DIR = str(Path(__file__).resolve().parent.parent.parent.parent)
class DockerBottleBackend(BottleBackend):
"""Docker backend implementation. Selected by CLAUDE_BOTTLE_BACKEND
(default)."""
name = "docker"
def prepare(self, spec: BottleSpec, *, stage_dir: Path) -> DockerBottlePlan:
"""Resolve names, validate, write scratch files. No Docker
resources are created; the only side effects are host-side
files under stage_dir and a probe of `docker info`."""
docker_mod.require_docker()
manifest = spec.manifest
manifest.require_agent(spec.agent_name)
agent = manifest.agents[spec.agent_name]
bottle = manifest.bottle_for(spec.agent_name)
bottle_name = agent.bottle
slug = docker_mod.slugify(spec.agent_name)
image = os.environ.get("CLAUDE_BOTTLE_IMAGE", "claude-bottle:latest")
derived_image = ""
runtime_image = image
if spec.copy_cwd:
derived_image = os.environ.get(
"CLAUDE_BOTTLE_DERIVED_IMAGE", f"claude-bottle:cwd-{slug}"
)
runtime_image = derived_image
default_container = f"claude-bottle-{slug}"
pinned_container = os.environ.get("CLAUDE_BOTTLE_CONTAINER", "")
container_name = pinned_container or default_container
container_name_pinned = bool(pinned_container)
suffix = 2
if container_name_pinned:
if docker_mod.container_exists(container_name):
die(
f"container '{container_name}' already exists "
f"(pinned via CLAUDE_BOTTLE_CONTAINER). "
f"Remove it with 'docker rm -f {container_name}' or unset the override."
)
else:
while docker_mod.container_exists(container_name):
container_name = f"{default_container}-{suffix}"
suffix += 1
if suffix > 100:
die(
f"could not find a free container name after "
f"{default_container}-99; clean up old containers with "
f"'docker rm -f <name>'"
)
if agent.skills:
skills_mod.skills_validate_all(list(agent.skills))
if bottle.ssh:
ssh_mod.ssh_validate_entries(bottle.ssh)
env_file = stage_dir / "agent.env"
args_file = stage_dir / "docker-args"
prompt_file = stage_dir / "prompt.txt"
pipelock_yaml_filename = "pipelock.yaml"
pipelock_yaml = stage_dir / pipelock_yaml_filename
env_file.write_text("")
env_file.chmod(0o600)
args_file.write_text("")
prompt_file.write_text("")
prompt_file.chmod(0o600)
pipelock.pipelock_write_yaml(manifest, bottle_name, pipelock_yaml)
env_resolve(manifest, spec.agent_name, env_file, args_file)
prompt_file.write_text(agent.prompt)
allowlist_summary = pipelock.pipelock_allowlist_summary(manifest, bottle_name)
use_runsc = docker_mod.runsc_available()
return DockerBottlePlan(
spec=spec,
stage_dir=stage_dir,
slug=slug,
container_name=container_name,
container_name_pinned=container_name_pinned,
image=image,
derived_image=derived_image,
runtime_image=runtime_image,
env_file=env_file,
args_file=args_file,
prompt_file=prompt_file,
pipelock_yaml_path=pipelock_yaml,
pipelock_yaml_filename=pipelock_yaml_filename,
allowlist_summary=allowlist_summary,
use_runsc=use_runsc,
)
@contextmanager
def launch(self, plan: BottlePlan) -> Iterator[DockerBottle]:
"""Build, launch, and provision a Docker bottle. Teardown on exit."""
assert isinstance(plan, DockerBottlePlan), (
f"DockerBottleBackend.launch expects DockerBottlePlan, "
f"got {type(plan).__name__}"
)
state: dict[str, str] = {
"container": "",
"pipelock": "",
"internal_network": "",
"egress_network": "",
}
def teardown() -> None:
try:
if state["container"] and docker_mod.container_exists(state["container"]):
subprocess.run(
["docker", "rm", "-f", state["container"]],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
state["container"] = ""
if state["pipelock"]:
pipelock.pipelock_stop(plan.slug)
state["pipelock"] = ""
if state["internal_network"]:
network_mod.network_remove(state["internal_network"])
state["internal_network"] = ""
if state["egress_network"]:
network_mod.network_remove(state["egress_network"])
state["egress_network"] = ""
except BaseException:
# Teardown must not raise; swallow so the caller's
# __exit__ path can still propagate the original error.
pass
try:
docker_mod.build_image(plan.image, _REPO_DIR)
if plan.derived_image:
docker_mod.build_image_with_cwd(
plan.derived_image, plan.image, plan.spec.user_cwd
)
state["internal_network"] = network_mod.network_create_internal(plan.slug)
state["egress_network"] = network_mod.network_create_egress(plan.slug)
state["pipelock"] = pipelock.pipelock_start(
plan.slug,
state["internal_network"],
state["egress_network"],
plan.stage_dir,
plan.pipelock_yaml_filename,
)
container = self._run_agent_container(plan, state["internal_network"])
state["container"] = container
prompt_path = self._provision_container(plan, container)
bottle = DockerBottle(container, teardown, prompt_path)
yield bottle
finally:
teardown()
def _run_agent_container(self, plan: DockerBottlePlan, internal_network: str) -> str:
"""Build the `docker run` argv and execute it, handling
name-conflict races by incrementing the suffix (unless the name
was user-pinned). Returns the resolved container name."""
proxy_url = pipelock.pipelock_proxy_url(plan.slug)
docker_args: list[str] = [
"--rm", "-d",
"--name", plan.container_name,
"--network", internal_network,
"-e", f"HTTPS_PROXY={proxy_url}",
"-e", f"HTTP_PROXY={proxy_url}",
"-e", "NO_PROXY=localhost,127.0.0.1",
]
if plan.use_runsc:
docker_args.extend(["--runtime", "runsc"])
if plan.env_file.stat().st_size > 0:
docker_args.extend(["--env-file", str(plan.env_file)])
# ARGS_FILE pairs (-e, NAME) line-by-line.
args_lines = plan.args_file.read_text().splitlines()
i = 0
while i < len(args_lines):
flag = args_lines[i]
i += 1
if not flag:
continue
if i >= len(args_lines):
break
vname = args_lines[i]
i += 1
docker_args.extend([flag, vname])
if plan.spec.forward_oauth_token:
os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = os.environ["CLAUDE_BOTTLE_OAUTH_TOKEN"]
docker_args.extend(["-e", "CLAUDE_CODE_OAUTH_TOKEN"])
docker_args.extend([plan.runtime_image, "sleep", "infinity"])
info(f"starting container {plan.container_name} from {plan.runtime_image}")
container = plan.container_name
base_name = plan.container_name
suffix = 2
while True:
run_result = subprocess.run(
["docker", "run", *docker_args],
capture_output=True,
text=True,
)
if run_result.returncode == 0:
return container
err_text = run_result.stderr
if plan.container_name_pinned or "is already in use" not in err_text:
sys.stderr.write(err_text + "\n")
die(f"docker run failed for container '{container}'")
if suffix > 100:
die(
f"could not find a free container name after "
f"{base_name}-99 retries; clean up old containers"
)
container = f"{base_name}-{suffix}"
suffix += 1
name_idx = docker_args.index("--name") + 1
docker_args[name_idx] = container
info(f"name conflict; retrying as {container}")
def _provision_container(self, plan: DockerBottlePlan, container: str) -> str | None:
"""Copy prompt, skills, ssh keys, and (optionally) .git into the
running container. Returns the in-container prompt path if a
prompt was provisioned, else None — the Bottle handle uses it
to decide whether to add --append-system-prompt-file to
claude's argv."""
container_home = os.environ.get("CLAUDE_BOTTLE_CONTAINER_HOME", "/home/node")
in_container_prompt_path = f"{container_home}/.claude-bottle-prompt.txt"
subprocess.run(
["docker", "cp", str(plan.prompt_file), f"{container}:{in_container_prompt_path}"],
stdout=subprocess.DEVNULL,
check=True,
)
# `docker cp` preserves host UID; re-own/mode as root so node
# can read its own mode-600 prompt regardless of host UID.
subprocess.run(
["docker", "exec", "-u", "0", container, "chown", "node:node", in_container_prompt_path],
stdout=subprocess.DEVNULL,
check=True,
)
subprocess.run(
["docker", "exec", "-u", "0", container, "chmod", "600", in_container_prompt_path],
stdout=subprocess.DEVNULL,
check=True,
)
agent = plan.spec.manifest.agents[plan.spec.agent_name]
if agent.skills:
skills_mod.skills_copy_into(container, list(agent.skills))
bottle = plan.spec.manifest.bottle_for(plan.spec.agent_name)
if bottle.ssh:
proxy_host_port = pipelock.pipelock_proxy_host_port(plan.slug)
ssh_mod.ssh_setup(container, plan.stage_dir, proxy_host_port, bottle.ssh)
if plan.spec.copy_cwd and Path(plan.spec.user_cwd, ".git").is_dir():
info(f"copying {plan.spec.user_cwd}/.git -> {container}:/home/node/workspace/.git")
subprocess.run(
["docker", "cp", f"{plan.spec.user_cwd}/.git", f"{container}:/home/node/workspace/.git"],
stdout=subprocess.DEVNULL,
check=True,
)
subprocess.run(
[
"docker", "exec", "-u", "0", container,
"chown", "-R", "node:node", "/home/node/workspace/.git",
],
stdout=subprocess.DEVNULL,
check=True,
)
return in_container_prompt_path if agent.prompt else None
# --- Cleanup ---
def prepare_cleanup(self) -> DockerBottleCleanupPlan:
"""Enumerate all claude-bottle-prefixed containers (running or
stopped) and networks. No removals — caller confirms first."""
docker_mod.require_docker()
# `docker ps -a --filter name=...` uses regex matching; anchor at
# the start so we don't pick up containers that merely contain
# "claude-bottle-" mid-name.
cr = subprocess.run(
[
"docker", "ps", "-a",
"--filter", "name=^claude-bottle-",
"--format", "{{.Names}}",
],
capture_output=True,
text=True,
)
containers = tuple(sorted(
line for line in (cr.stdout or "").splitlines() if line
))
# `docker network ls --filter name=...` uses substring matching.
# "claude-bottle-" is specific enough that false positives are
# not a concern.
nr = subprocess.run(
[
"docker", "network", "ls",
"--filter", "name=claude-bottle-",
"--format", "{{.Name}}",
],
capture_output=True,
text=True,
)
networks = tuple(sorted(
line for line in (nr.stdout or "").splitlines() if line
))
return DockerBottleCleanupPlan(containers=containers, networks=networks)
def cleanup(self, plan: BottleCleanupPlan) -> None:
"""Remove the containers and networks listed in the plan.
Containers first; networks would refuse to delete while
containers are still attached."""
assert isinstance(plan, DockerBottleCleanupPlan), (
f"DockerBottleBackend.cleanup expects DockerBottleCleanupPlan, "
f"got {type(plan).__name__}"
)
for name in plan.containers:
info(f"removing container {name}")
subprocess.run(
["docker", "rm", "-f", name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
for name in plan.networks:
info(f"removing network {name}")
subprocess.run(
["docker", "network", "rm", name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# --- List ---
def list_active(self) -> None:
"""Print all running claude-bottle containers (name + status).
Prints a single-line banner if there are none."""
docker_mod.require_docker()
result = subprocess.run(
[
"docker", "ps",
"--filter", "name=^claude-bottle-",
"--format", "{{.Names}}\t{{.Status}}",
],
capture_output=True,
text=True,
)
containers = (result.stdout or "").strip()
if not containers:
info("no active claude-bottle containers")
return
print()
for line in containers.splitlines():
name, _, status = line.partition("\t")
info(f"container: {name} status: {status}")
print()
+46
View File
@@ -0,0 +1,46 @@
"""DockerBottle — concrete Bottle handle yielded by
DockerBottleBackend.launch.
Holds the container name plus the in-container prompt path so
exec_claude can transparently add --append-system-prompt-file when a
prompt was provisioned.
"""
from __future__ import annotations
import subprocess
from .. import Bottle
class DockerBottle(Bottle):
"""Concrete Bottle for Docker."""
def __init__(self, container: str, teardown, prompt_path_in_container: str | None):
self.name = container
self._teardown = teardown
self._prompt_path = prompt_path_in_container
self._closed = False
def exec_claude(self, argv: list[str], *, tty: bool = True) -> int:
full_argv = list(argv)
if self._prompt_path:
full_argv.extend(["--append-system-prompt-file", self._prompt_path])
cmd = ["docker", "exec"]
if tty:
cmd.append("-it")
cmd.extend([self.name, "claude", *full_argv])
return subprocess.run(cmd).returncode
def cp_in(self, host_path: str, container_path: str) -> None:
subprocess.run(
["docker", "cp", host_path, f"{self.name}:{container_path}"],
stdout=subprocess.DEVNULL,
check=True,
)
def close(self) -> None:
if self._closed:
return
self._closed = True
self._teardown()
@@ -0,0 +1,36 @@
"""DockerBottleCleanupPlan — concrete subclass of BottleCleanupPlan.
Holds the tuples of container and network names that
DockerBottleBackend.cleanup will remove. The y/N preflight reads
these via `print`; the CLI short-circuits via `empty`.
"""
from __future__ import annotations
import sys
from dataclasses import dataclass
from ...log import info
from .. import BottleCleanupPlan
@dataclass(frozen=True)
class DockerBottleCleanupPlan(BottleCleanupPlan):
"""Resources DockerBottleBackend.cleanup will remove. Produced by
`prepare_cleanup` from a snapshot of `docker ps -a` + `docker
network ls`; sorted so the y/N output is stable."""
containers: tuple[str, ...]
networks: tuple[str, ...]
@property
def empty(self) -> bool:
return not self.containers and not self.networks
def print(self) -> None:
print(file=sys.stderr)
for name in self.containers:
info(f"container: {name}")
for name in self.networks:
info(f"network: {name}")
print(file=sys.stderr)
@@ -0,0 +1,77 @@
"""DockerBottlePlan — concrete subclass of BottlePlan.
Carries the Docker-specific resolved fields produced by
DockerBottleBackend.prepare. The launch step consumes it without
further resolution; show_plan-style rendering is the `print` method.
"""
from __future__ import annotations
import sys
from dataclasses import dataclass
from pathlib import Path
from ...log import info
from .. import BottlePlan
@dataclass(frozen=True)
class DockerBottlePlan(BottlePlan):
"""Docker-specific resolved fields produced by
DockerBottleBackend.prepare. Inherits `spec` and `stage_dir` from
BottlePlan."""
slug: str
container_name: str
container_name_pinned: bool
image: str
derived_image: str # "" -> no derived image
runtime_image: str # image actually launched (derived or base)
env_file: Path
args_file: Path
prompt_file: Path
pipelock_yaml_path: Path
pipelock_yaml_filename: str
allowlist_summary: str
use_runsc: bool
def print(self, *, remote_control: bool) -> None:
"""Render the y/N preflight summary to stderr. Pure presentation."""
spec = self.spec
manifest = spec.manifest
agent = manifest.agents[spec.agent_name]
bottle = manifest.bottle_for(spec.agent_name)
env_names = list(bottle.env.keys())
if spec.forward_oauth_token:
env_names.append("CLAUDE_CODE_OAUTH_TOKEN")
ssh_hosts = [e.Host for e in bottle.ssh]
prompt_first_line = agent.prompt.splitlines()[0] if agent.prompt else ""
runtime_label = "runsc (gVisor)" if self.use_runsc else "runc (default)"
print(file=sys.stderr)
info(f"agent : {spec.agent_name}")
info(f"image : {self.image}")
if self.derived_image:
info(
f"cwd : {spec.user_cwd} -> /home/node/workspace "
f"(derived: {self.derived_image})"
)
info(f"container : {self.container_name}")
info(f"stage dir : {self.stage_dir}")
info("env (names only): " + (", ".join(env_names) if env_names else "(none)"))
info("skills : " + (" ".join(agent.skills) if agent.skills else "(none)"))
info(f"docker runtime : {runtime_label}")
info(f"bottle : {agent.bottle}")
if ssh_hosts:
info(f" ssh hosts : {', '.join(ssh_hosts)}")
else:
info(" ssh hosts : (none)")
info(f" egress : {self.allowlist_summary}")
info(
f"prompt : {len(agent.prompt)} chars; "
f"first line: {prompt_first_line or '(empty)'}"
)
info("remote-control : " + ("enabled" if remote_control else "disabled"))
print(file=sys.stderr)
+107
View File
@@ -0,0 +1,107 @@
"""Docker network plumbing for the per-agent egress-proxy topology.
The agent container sits on a Docker `--internal` network (no default
gateway). Pipelock straddles that network and a per-agent user-defined
bridge for upstream egress. We deliberately do NOT use Docker's legacy
`bridge` network because only user-defined bridges run Docker's
embedded DNS resolver, which pipelock needs to resolve api.anthropic.com
and similar upstream hostnames.
Naming: claude-bottle-net-<slug> (internal),
claude-bottle-egress-<slug> (egress). Numeric suffix on conflict
(-2, -3, ..., capped at 100).
"""
from __future__ import annotations
import subprocess
from ...log import die, info, warn
def network_name_for_slug(slug: str) -> str:
return f"claude-bottle-net-{slug}"
def network_egress_name_for_slug(slug: str) -> str:
return f"claude-bottle-egress-{slug}"
def network_exists(name: str) -> bool:
"""Uses `docker network inspect`, not `docker network ls -f name=...`,
because the latter does substring matching."""
return (
subprocess.run(
["docker", "network", "inspect", name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
).returncode
== 0
)
def _network_create_with_prefix(base: str, internal: bool) -> str:
"""Create a per-agent Docker network whose name is <base> (with
-2, -3, ... appended on conflict, capped at 100). Returns the
resolved name."""
name = base
suffix = 2
while network_exists(name):
name = f"{base}-{suffix}"
suffix += 1
if suffix > 100:
die(
f"could not find a free network name after {base}-99; "
f"clean up old networks with 'docker network rm <name>'"
)
kind = "internal" if internal else "bridge (egress)"
args = ["docker", "network", "create"]
if internal:
args.append("--internal")
args.append(name)
info(f"creating {kind} network {name}")
result = subprocess.run(args, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
if result.returncode != 0:
flag = " --internal" if internal else ""
die(f"docker network create{flag} {name} failed")
return name
def network_create_internal(slug: str) -> str:
"""Create a Docker `--internal` network for the agent. Returns the
resolved name."""
return _network_create_with_prefix(network_name_for_slug(slug), internal=True)
def network_create_egress(slug: str) -> str:
"""Create a per-agent user-defined bridge (NOT the legacy `bridge`)
so the pipelock sidecar has working DNS for upstream hostnames."""
return _network_create_with_prefix(network_egress_name_for_slug(slug), internal=False)
def network_attach(network: str, container: str) -> None:
result = subprocess.run(
["docker", "network", "connect", network, container],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if result.returncode != 0:
die(f"docker network connect {network} {container} failed")
def network_remove(name: str) -> bool:
"""Idempotent: a missing network is treated as success so this can
be called from a teardown trap. Returns True if removal succeeded
(or the network was already gone)."""
if not network_exists(name):
return True
result = subprocess.run(
["docker", "network", "rm", name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if result.returncode != 0:
warn(f"failed to remove network {name}; clean up with 'docker network rm {name}'")
return False
return True
+108
View File
@@ -0,0 +1,108 @@
"""Docker host-side primitives used by DockerBottleBackend: probing
for docker on PATH, slugifying agent names, checking image/container
existence, and building images."""
from __future__ import annotations
import re
import shutil
import subprocess
from typing import Iterable
from ...log import die, info
def runsc_available() -> bool:
"""Return True if the Docker daemon has the gVisor (`runsc`) runtime
registered. Called once per prepare; the result lives on the plan."""
r = subprocess.run(
["docker", "info", "--format", "{{json .Runtimes}}"],
capture_output=True,
text=True,
)
return r.returncode == 0 and "runsc" in r.stdout
def require_docker() -> None:
"""Fail with an install pointer if `docker` is not on PATH."""
if shutil.which("docker") is None:
info("Docker is required but was not found on PATH.")
info("macOS: install Docker Desktop https://docs.docker.com/desktop/install/mac-install/")
info("Linux: install Docker Engine https://docs.docker.com/engine/install/")
die("docker not found")
def image_exists(ref: str) -> bool:
return _silent_run(["docker", "image", "inspect", ref]) == 0
def container_exists(name: str) -> bool:
"""Returns True if a container (running or stopped) with the given
name exists. Uses `docker ps -a -q -f name=^<name>$` so substring
matches don't false-positive."""
result = subprocess.run(
["docker", "ps", "-a", "-q", "-f", f"name=^{name}$"],
capture_output=True,
text=True,
)
return bool(result.stdout.strip())
_SLUG_RE = re.compile(r"[^a-z0-9]+")
def slugify(name: str) -> str:
"""Lowercase, non-alnum runs → '-', trimmed. Dies on empty result."""
if not name:
die("slugify: missing name")
slug = _SLUG_RE.sub("-", name.lower()).strip("-")
if not slug:
die(f"name '{name}' produced an empty slug; use alphanumeric characters")
return slug
def build_image(ref: str, context: str) -> None:
"""Invokes `docker build` every call. Layer cache makes no-change
rebuilds cheap; running every time means Dockerfile edits land
without manual `docker rmi`."""
info(f"building image {ref} from {context} (layer cache keeps repeat builds fast)")
subprocess.run(["docker", "build", "-t", ref, context], check=True)
_TRUST_DIALOG_NODE_SCRIPT = (
'const fs=require("fs"),p=process.env.HOME+"/.claude.json",'
'c=JSON.parse(fs.readFileSync(p,"utf8"));'
'c.projects=c.projects||{};'
'c.projects[process.env.HOME+"/workspace"]={hasTrustDialogAccepted:true};'
'fs.writeFileSync(p,JSON.stringify(c,null,2));'
)
def build_image_with_cwd(derived: str, base: str, cwd: str) -> None:
"""Build a thin derived image that copies <cwd> into
/home/node/workspace and adds a trust-dialog entry for it."""
import os
if not os.path.isdir(cwd):
die(f"cwd not found at {cwd}")
info(f"building image {derived} from {base} with {cwd} -> /home/node/workspace")
dockerfile = (
f"FROM {base}\n"
f"COPY --chown=node:node . /home/node/workspace\n"
f"RUN node -e '{_TRUST_DIALOG_NODE_SCRIPT}'\n"
f"WORKDIR /home/node/workspace\n"
)
subprocess.run(
["docker", "build", "-t", derived, "-f", "-", cwd],
input=dockerfile,
text=True,
check=True,
)
def _silent_run(cmd: Iterable[str]) -> int:
return subprocess.run(
list(cmd),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
).returncode