Files
bot-bottle/claude_bottle/backend/docker/backend.py
T
didericis c9fe23a043
test / run tests/run_tests.py (pull_request) Successful in 18s
refactor(docker): absorb claude_bottle/skills.py into DockerBottleBackend
The whole module folds into two methods on the backend:

  validate_skills(skills)  — called from prepare; fails loudly when
                              a named skill is missing on the host so
                              the user doesn't get a y/N for a plan
                              that's already known to break.
  _host_skill_dir(name)    — private helper used by both
                              validate_skills and provision_skills.

skills.py is deleted; the four prior functions (host_skill_dir,
host_skill_exists, require_host_skill, skills_validate_all) collapse
into the two above without losing the pre-y/N validation.
2026-05-11 00:44:34 -04:00

478 lines
18 KiB
Python

"""DockerBottleBackend — the Docker implementation of BottleBackend.
Methods:
.prepare(spec, stage_dir=...) -> DockerBottlePlan
.launch(plan) -> ContextManager[DockerBottle]
.prepare_cleanup() -> DockerBottleCleanupPlan
.cleanup(plan) -> None
.list_active() -> None
"""
from __future__ import annotations
import os
import subprocess
import sys
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator
from ... import pipelock
from ... import ssh as ssh_mod
from ...env_resolve import env_resolve
from ...log import die, info
from .. import BottleBackend, BottleCleanupPlan, BottlePlan, BottleSpec
from . import network as network_mod
from . import util as docker_mod
from .bottle import DockerBottle
from .bottle_cleanup_plan import DockerBottleCleanupPlan
from .bottle_plan import DockerBottlePlan
# Where the repo root lives, for `docker build` context. Computed once,
# at import time. This file sits at
# <repo>/claude_bottle/backend/docker/backend.py, so the root is four
# `.parent` hops above the resolved path of this file.
_REPO_DIR = str(Path(__file__).resolve().parent.parent.parent.parent)
class DockerBottleBackend(BottleBackend):
    """Docker backend implementation. Selected by CLAUDE_BOTTLE_BACKEND
    (default).

    Covers the bottle lifecycle with Docker: `prepare` resolves names
    and writes scratch files, `launch` builds/starts/provisions and
    tears down on exit, and `prepare_cleanup` / `cleanup` /
    `list_active` manage leftover `claude-bottle-` resources."""
    # Backend identifier; presumably the value matched against
    # CLAUDE_BOTTLE_BACKEND — confirm against the backend selector.
    name = "docker"
def prepare(self, spec: BottleSpec, *, stage_dir: Path) -> DockerBottlePlan:
    """Turn a BottleSpec into a ready-to-launch DockerBottlePlan.

    Resolves image and container names, validates skills and SSH
    entries, and writes the scratch files (env file, docker args,
    prompt, pipelock config) under `stage_dir`. Nothing is created
    in Docker here; Docker is only queried (availability check and
    container-name lookups).

    Exits via `die` when a pinned container name is already taken,
    when no free auto-generated name is found, or when validation
    fails."""
    docker_mod.require_docker()

    manifest = spec.manifest
    manifest.require_agent(spec.agent_name)
    agent = manifest.agents[spec.agent_name]
    bottle = manifest.bottle_for(spec.agent_name)
    bottle_name = agent.bottle
    slug = docker_mod.slugify(spec.agent_name)

    # --- image selection -------------------------------------------
    # With --cwd the container runs from a derived image layered on
    # top of the base image; otherwise it runs the base image as-is.
    image = os.environ.get("CLAUDE_BOTTLE_IMAGE", "claude-bottle:latest")
    if spec.copy_cwd:
        derived_image = os.environ.get(
            "CLAUDE_BOTTLE_DERIVED_IMAGE", f"claude-bottle:cwd-{slug}"
        )
        runtime_image = derived_image
    else:
        derived_image = ""
        runtime_image = image

    # --- container name --------------------------------------------
    default_container = f"claude-bottle-{slug}"
    pinned_container = os.environ.get("CLAUDE_BOTTLE_CONTAINER", "")
    container_name_pinned = bool(pinned_container)
    if container_name_pinned:
        # A user-pinned name is never rewritten: a clash is fatal.
        container_name = pinned_container
        if docker_mod.container_exists(container_name):
            die(
                f"container '{container_name}' already exists "
                f"(pinned via CLAUDE_BOTTLE_CONTAINER). "
                f"Remove it with 'docker rm -f {container_name}' or unset the override."
            )
    else:
        # Walk <default>, <default>-2, <default>-3, ... until a free
        # name turns up, giving up once -99 has been found taken.
        container_name = default_container
        next_suffix = 2
        while docker_mod.container_exists(container_name):
            container_name = f"{default_container}-{next_suffix}"
            next_suffix += 1
            if next_suffix > 100:
                die(
                    f"could not find a free container name after "
                    f"{default_container}-99; clean up old containers with "
                    f"'docker rm -f <name>'"
                )

    # --- validation (before anything is written) -------------------
    if agent.skills:
        self.validate_skills(list(agent.skills))
    if bottle.ssh:
        ssh_mod.ssh_validate_entries(bottle.ssh)

    # --- scratch files ---------------------------------------------
    env_file = stage_dir / "agent.env"
    args_file = stage_dir / "docker-args"
    prompt_file = stage_dir / "prompt.txt"
    pipelock_yaml_filename = "pipelock.yaml"
    pipelock_yaml = stage_dir / pipelock_yaml_filename

    # Create each file empty first; the env and prompt files are
    # restricted to mode 600 before any content is written to them.
    for scratch, mode in ((env_file, 0o600), (args_file, None), (prompt_file, 0o600)):
        scratch.write_text("")
        if mode is not None:
            scratch.chmod(mode)

    pipelock.pipelock_write_yaml(manifest, bottle_name, pipelock_yaml)
    env_resolve(manifest, spec.agent_name, env_file, args_file)
    prompt_file.write_text(agent.prompt)

    allowlist_summary = pipelock.pipelock_allowlist_summary(manifest, bottle_name)
    use_runsc = docker_mod.runsc_available()

    return DockerBottlePlan(
        spec=spec,
        stage_dir=stage_dir,
        slug=slug,
        container_name=container_name,
        container_name_pinned=container_name_pinned,
        image=image,
        derived_image=derived_image,
        runtime_image=runtime_image,
        env_file=env_file,
        args_file=args_file,
        prompt_file=prompt_file,
        pipelock_yaml_path=pipelock_yaml,
        pipelock_yaml_filename=pipelock_yaml_filename,
        allowlist_summary=allowlist_summary,
        use_runsc=use_runsc,
    )
@contextmanager
def launch(self, plan: BottlePlan) -> Iterator[DockerBottle]:
    """Build, launch, and provision a Docker bottle. Teardown on exit.

    Steps, in order: build the base image (and the derived cwd image
    when the plan names one), create the internal and egress
    networks, start pipelock, start the agent container, provision
    it, then yield a DockerBottle. On exit from the `with` block, or
    on any failure along the way, every resource recorded so far is
    released.

    Teardown is best-effort and never raises. Each release step is
    guarded on its own, so a failure removing the container cannot
    prevent pipelock or the networks from being cleaned up. A step
    that fails keeps its entry in `state`, so a later teardown call
    retries it; a step that succeeds clears its entry, which makes
    teardown safe to call more than once."""
    assert isinstance(plan, DockerBottlePlan), (
        f"DockerBottleBackend.launch expects DockerBottlePlan, "
        f"got {type(plan).__name__}"
    )
    # Names/handles of the resources created so far; "" means "not
    # created (or already released)".
    state: dict[str, str] = {
        "container": "",
        "pipelock": "",
        "internal_network": "",
        "egress_network": "",
    }

    def remove_container(name: str) -> None:
        # The container may already be gone, so only force-remove it
        # when Docker still knows about it.
        if docker_mod.container_exists(name):
            subprocess.run(
                ["docker", "rm", "-f", name],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )

    def stop_pipelock(_handle: str) -> None:
        # pipelock is addressed by the plan's slug, not by the value
        # pipelock_start returned.
        pipelock.pipelock_stop(plan.slug)

    # Release order matters: the container and pipelock go first,
    # then the two networks.
    release_steps = (
        ("container", remove_container),
        ("pipelock", stop_pipelock),
        ("internal_network", network_mod.network_remove),
        ("egress_network", network_mod.network_remove),
    )

    def teardown() -> None:
        for key, release in release_steps:
            resource = state[key]
            if not resource:
                continue
            try:
                release(resource)
            except BaseException:
                # Teardown must not raise; swallow so the caller's
                # __exit__ path can still propagate the original
                # error, and carry on with the remaining steps so
                # one failure does not leak everything after it.
                continue
            state[key] = ""

    try:
        docker_mod.build_image(plan.image, _REPO_DIR)
        if plan.derived_image:
            docker_mod.build_image_with_cwd(
                plan.derived_image, plan.image, plan.spec.user_cwd
            )
        state["internal_network"] = network_mod.network_create_internal(plan.slug)
        state["egress_network"] = network_mod.network_create_egress(plan.slug)
        state["pipelock"] = pipelock.pipelock_start(
            plan.slug,
            state["internal_network"],
            state["egress_network"],
            plan.stage_dir,
            plan.pipelock_yaml_filename,
        )
        container = self._run_agent_container(plan, state["internal_network"])
        state["container"] = container
        # `provision` is not defined in this class; presumably it is
        # inherited from BottleBackend and fans out to the
        # provision_* methods — confirm.
        prompt_path = self.provision(plan, container)
        bottle = DockerBottle(container, teardown, prompt_path)
        yield bottle
    finally:
        teardown()
def _run_agent_container(self, plan: DockerBottlePlan, internal_network: str) -> str:
    """Build the `docker run` argv and execute it, handling
    name-conflict races by incrementing the suffix (unless the name
    was user-pinned). Returns the resolved container name.

    The container is started detached with `--rm`, attached only to
    `internal_network`, with HTTP(S)_PROXY pointing at pipelock, and
    runs `sleep infinity`.

    Exits via `die` when `docker run` fails for any reason other
    than a name conflict, when a pinned name conflicts, when no free
    name is found through `<name>-99`, or when OAuth forwarding is
    requested but CLAUDE_BOTTLE_OAUTH_TOKEN is not set."""
    proxy_url = pipelock.pipelock_proxy_url(plan.slug)
    docker_args: list[str] = [
        "--rm", "-d",
        "--name", plan.container_name,
        "--network", internal_network,
        "-e", f"HTTPS_PROXY={proxy_url}",
        "-e", f"HTTP_PROXY={proxy_url}",
        "-e", "NO_PROXY=localhost,127.0.0.1",
    ]
    if plan.use_runsc:
        docker_args.extend(["--runtime", "runsc"])
    if plan.env_file.stat().st_size > 0:
        docker_args.extend(["--env-file", str(plan.env_file)])

    # ARGS_FILE pairs (-e, NAME) line-by-line. Blank lines in the
    # flag position are skipped; a trailing flag with no value line
    # is dropped.
    arg_lines = iter(plan.args_file.read_text().splitlines())
    for flag in arg_lines:
        if not flag:
            continue
        vname = next(arg_lines, None)
        if vname is None:
            break
        docker_args.extend([flag, vname])

    if plan.spec.forward_oauth_token:
        oauth_token = os.environ.get("CLAUDE_BOTTLE_OAUTH_TOKEN")
        if oauth_token is None:
            # Fail with a readable message instead of a raw KeyError
            # traceback.
            die(
                "CLAUDE_BOTTLE_OAUTH_TOKEN is not set; cannot forward "
                "the OAuth token into the container"
            )
        # `-e NAME` with no value makes docker copy NAME from this
        # process's environment, so the token never appears in argv.
        # NOTE(review): this also leaves the token in os.environ for
        # every later child process; kept as-is in case other code
        # relies on it.
        os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = oauth_token
        docker_args.extend(["-e", "CLAUDE_CODE_OAUTH_TOKEN"])

    docker_args.extend([plan.runtime_image, "sleep", "infinity"])
    info(f"starting container {plan.container_name} from {plan.runtime_image}")

    # Slot holding the container name; fixed for the whole retry loop
    # because "--name" is the first such token in the argv.
    name_idx = docker_args.index("--name") + 1
    # Highest suffix tried, matching both the error message below and
    # the limit `prepare` uses.
    last_suffix = 99

    container = plan.container_name
    base_name = plan.container_name
    suffix = 2
    while True:
        run_result = subprocess.run(
            ["docker", "run", *docker_args],
            capture_output=True,
            text=True,
        )
        if run_result.returncode == 0:
            return container
        err_text = run_result.stderr
        # Only an auto-generated name that lost a race is retried;
        # every other failure is surfaced and fatal.
        if plan.container_name_pinned or "is already in use" not in err_text:
            sys.stderr.write(err_text + "\n")
            die(f"docker run failed for container '{container}'")
        if suffix > last_suffix:
            die(
                f"could not find a free container name after "
                f"{base_name}-99 retries; clean up old containers"
            )
        container = f"{base_name}-{suffix}"
        suffix += 1
        docker_args[name_idx] = container
        info(f"name conflict; retrying as {container}")
def provision_prompt(self, plan: BottlePlan, target: str) -> str | None:
    """Install the staged prompt file inside the container.

    The file is always copied to
    `<container home>/.claude-bottle-prompt.txt`, then re-owned to
    node:node and set to mode 600. Returns that in-container path
    when the agent's prompt is non-empty (drives
    --append-system-prompt-file), otherwise None."""
    assert isinstance(plan, DockerBottlePlan)
    home = os.environ.get("CLAUDE_BOTTLE_CONTAINER_HOME", "/home/node")
    dest = f"{home}/.claude-bottle-prompt.txt"

    subprocess.run(
        ["docker", "cp", str(plan.prompt_file), f"{target}:{dest}"],
        stdout=subprocess.DEVNULL,
        check=True,
    )

    # `docker cp` preserves host UID; re-own/mode as root so node
    # can read its own mode-600 prompt regardless of host UID.
    for fixup in (("chown", "node:node"), ("chmod", "600")):
        subprocess.run(
            ["docker", "exec", "-u", "0", target, *fixup, dest],
            stdout=subprocess.DEVNULL,
            check=True,
        )

    agent = plan.spec.manifest.agents[plan.spec.agent_name]
    if agent.prompt:
        return dest
    return None
def validate_skills(self, skills: list[str]) -> None:
    """Check that every named skill has a directory on the host.

    Skills are looked up via `_host_skill_dir`; the first one whose
    directory is missing ends the run through `die`. Meant to run
    from `prepare`, before the y/N prompt, so the user is never
    asked to confirm a plan that is already known to break."""
    for skill in skills:
        skill_dir = self._host_skill_dir(skill)
        if os.path.isdir(skill_dir):
            continue
        die(
            f"skill '{skill}' not found on host at {skill_dir}. "
            f"Create it under ~/.claude/skills/, then re-run."
        )
def _host_skill_dir(self, name: str) -> str:
    """Return the host path `$HOME/.claude/skills/<name>`.

    Only builds the string; it does not check that the directory
    exists. Ends the run through `die` when HOME is unset or
    empty."""
    home = os.environ.get("HOME")
    if home is None or home == "":
        die("HOME not set")
    return "{}/.claude/skills/{}".format(home, name)
def provision_skills(self, plan: BottlePlan, target: str) -> None:
    """Copy the agent's skills from the host into the container.

    Each skill is read from the host's ~/.claude/skills/<name>/ and
    lands in the container skills directory
    (CLAUDE_BOTTLE_CONTAINER_SKILLS_DIR, defaulting to
    `<container home>/.claude/skills`). Per skill, any previous copy
    is removed, the destination is recreated empty, and
    `docker cp <host>/. <container>:<dst>/` copies the contents in.
    Does nothing when the agent lists no skills."""
    assert isinstance(plan, DockerBottlePlan)
    agent = plan.spec.manifest.agents[plan.spec.agent_name]
    if not agent.skills:
        return

    home = os.environ.get("CLAUDE_BOTTLE_CONTAINER_HOME", "/home/node")
    skills_root = os.environ.get(
        "CLAUDE_BOTTLE_CONTAINER_SKILLS_DIR", f"{home}/.claude/skills"
    )

    def in_container(*argv: str) -> None:
        # Run one command in the target container; a non-zero exit
        # raises CalledProcessError.
        subprocess.run(
            ["docker", "exec", target, *argv],
            stdout=subprocess.DEVNULL,
            check=True,
        )

    in_container("mkdir", "-p", skills_root)

    for skill in agent.skills:
        host_dir = self._host_skill_dir(skill)
        # Validation ran earlier in `prepare`; re-check in case the
        # directory went away in the meantime.
        if not os.path.isdir(host_dir):
            die(f"skill '{skill}' disappeared from host between validation and copy at {host_dir}.")
        dest = f"{skills_root}/{skill}"
        info(f"copying skill {skill} into {target}:{dest}")
        in_container("rm", "-rf", dest)
        in_container("mkdir", "-p", dest)
        subprocess.run(
            ["docker", "cp", f"{host_dir}/.", f"{target}:{dest}/"],
            stdout=subprocess.DEVNULL,
            check=True,
        )
def provision_ssh(self, plan: BottlePlan, target: str) -> None:
    """Configure SSH inside the container when the bottle asks for it.

    Delegates to `ssh_mod.ssh_setup`, passing the pipelock proxy
    host:port and the bottle's SSH entries; per the original design
    this sets up an in-container ssh-agent and config so node can
    authenticate without seeing the key bytes. Does nothing when the
    bottle has no SSH entries."""
    assert isinstance(plan, DockerBottlePlan)
    bottle = plan.spec.manifest.bottle_for(plan.spec.agent_name)
    if bottle.ssh:
        proxy_host_port = pipelock.pipelock_proxy_host_port(plan.slug)
        ssh_mod.ssh_setup(target, plan.stage_dir, proxy_host_port, bottle.ssh)
def provision_git(self, plan: BottlePlan, target: str) -> None:
    """Copy the host working directory's .git into the container.

    Runs only when --cwd was requested and the host cwd contains a
    `.git` directory. The directory is copied to
    /home/node/workspace/.git and then recursively re-owned to
    node:node. Does nothing otherwise."""
    assert isinstance(plan, DockerBottlePlan)
    if not plan.spec.copy_cwd:
        return
    if not Path(plan.spec.user_cwd, ".git").is_dir():
        return

    host_git = f"{plan.spec.user_cwd}/.git"
    container_git = "/home/node/workspace/.git"
    info(f"copying {host_git} -> {target}:{container_git}")

    subprocess.run(
        ["docker", "cp", host_git, f"{target}:{container_git}"],
        stdout=subprocess.DEVNULL,
        check=True,
    )
    # Hand the copied tree to node; done as root (-u 0).
    subprocess.run(
        ["docker", "exec", "-u", "0", target, "chown", "-R", "node:node", container_git],
        stdout=subprocess.DEVNULL,
        check=True,
    )
# --- Cleanup ---
def prepare_cleanup(self) -> DockerBottleCleanupPlan:
    """List every claude-bottle container and network for removal.

    Containers are listed whether running or stopped. Nothing is
    removed here; the returned plan carries two sorted tuples of
    names so the caller can confirm before calling `cleanup`."""
    docker_mod.require_docker()

    def listed_names(argv: list[str]) -> tuple[str, ...]:
        # One name per output line; blank lines are dropped. The exit
        # status is not checked, so a failed listing yields ().
        proc = subprocess.run(argv, capture_output=True, text=True)
        return tuple(sorted(filter(None, (proc.stdout or "").splitlines())))

    # `docker ps -a --filter name=...` uses regex matching; anchor at
    # the start so we don't pick up containers that merely contain
    # "claude-bottle-" mid-name.
    containers = listed_names([
        "docker", "ps", "-a",
        "--filter", "name=^claude-bottle-",
        "--format", "{{.Names}}",
    ])

    # `docker network ls --filter name=...` uses substring matching.
    # "claude-bottle-" is specific enough that false positives are
    # not a concern.
    networks = listed_names([
        "docker", "network", "ls",
        "--filter", "name=claude-bottle-",
        "--format", "{{.Name}}",
    ])

    return DockerBottleCleanupPlan(containers=containers, networks=networks)
def cleanup(self, plan: BottleCleanupPlan) -> None:
    """Delete everything named in a DockerBottleCleanupPlan.

    Containers are force-removed first, then networks, since a
    network refuses to delete while containers are still attached.
    Each removal is logged; docker's output and exit status are
    ignored."""
    assert isinstance(plan, DockerBottleCleanupPlan), (
        f"DockerBottleBackend.cleanup expects DockerBottleCleanupPlan, "
        f"got {type(plan).__name__}"
    )
    # (label used in the log line, docker argv prefix, names to remove)
    removal_passes = (
        ("container", ("docker", "rm", "-f"), plan.containers),
        ("network", ("docker", "network", "rm"), plan.networks),
    )
    for kind, rm_argv, names in removal_passes:
        for name in names:
            info(f"removing {kind} {name}")
            subprocess.run(
                [*rm_argv, name],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
# --- List ---
def list_active(self) -> None:
    """Report the running claude-bottle containers.

    Logs one `container: <name> status: <status>` line per running
    container, framed by blank lines, or a single banner line when
    none are running."""
    docker_mod.require_docker()
    ps = subprocess.run(
        [
            "docker", "ps",
            "--filter", "name=^claude-bottle-",
            "--format", "{{.Names}}\t{{.Status}}",
        ],
        capture_output=True,
        text=True,
    )
    listing = (ps.stdout or "").strip()
    if listing == "":
        info("no active claude-bottle containers")
        return

    print()
    for row in listing.splitlines():
        # Rows are "<name>\t<status>", as set by --format above.
        name, _tab, status = row.partition("\t")
        info(f"container: {name} status: {status}")
    print()