refactor(sidecars): drop vestigial start/stop methods (PRD 0024 chunk 3)
test / unit (pull_request) Successful in 21s
test / integration (pull_request) Successful in 41s

Compose-up has owned per-container lifecycle since PRD 0018 ch3;
the .start() / .stop() methods on DockerPipelockProxy /
DockerEgress / DockerGitGate / DockerSupervise (and their
abstractmethod declarations in the four base ABCs) were already
documented as vestigial. With the bundle path in flight
(PRD 0024 ch2), they are truly dead — collapse to nothing.

Changes:
- Removed start/stop methods from the four DockerSidecar
  classes. Plan dataclasses, image/path constants,
  container-name helpers, and the .prepare() methods all stay
  (the renderer + apply path still need them).
- Removed the matching @abstractmethod declarations in the
  base ABCs so concrete subclasses don't have to stub them.
- launch.launch() and prepare.resolve_plan() no longer take
  proxy/git_gate/egress/supervise instance parameters. backend.py
  loses the four instance attributes it threaded through.
  prepare.resolve_plan() instantiates the four classes itself
  to call their .prepare() methods.
- Deleted four integration tests that only exercised the
  removed lifecycle: test_pipelock_sidecar_smoke,
  test_supervise_sidecar, test_git_gate_sidecar,
  test_git_gate_mirror.
- Dropped the .stop-idempotency case in test_orphan_cleanup;
  the network-cleanup cases stay (those test real production
  code).
- Marked test_pipelock_apply @skip pending chunk 4 — its
  bringup helper used .start; chunk 4 rewrites it with direct
  `docker run`.

Dockerfile deletion deferred to chunk 5 (when the bundle flag
default flips) — the legacy compose path still needs
Dockerfile.{egress,git-gate,supervise} until then.

Net: 708 lines removed, 80 added.

533 unit tests + 27 integration tests passing (5 skipped: the
chunk-4-pending case + existing GITEA_ACTIONS guards).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-27 01:01:10 -04:00
parent c37344608b
commit 539234f29e
18 changed files with 80 additions and 1758 deletions
-391
View File
@@ -1,391 +0,0 @@
"""Integration: the git-gate is a bidirectional mirror of its
upstream (PRD 0008 v1.1).
Three round-trip assertions against a real Docker daemon plus a
sibling sshd container playing the role of "real upstream":
1. clone-through-gate returns whatever the upstream has at the
moment of clone (refs + content).
2. After a second commit lands on the upstream out-of-band, a
fetch through the gate picks it up — the access-hook is
refreshing before each upload-pack.
3. A push through the gate (clean commit) lands on the upstream's
bare repo — the pre-receive hook's forward phase works.
These are the user-facing semantics: every operation against the
gate is observably equivalent to the same operation against the
real upstream.
"""
import dataclasses
import os
import shutil
import subprocess
import tempfile
import textwrap
import unittest
from pathlib import Path
from claude_bottle.backend.docker.git_gate import (
DockerGitGate,
build_git_gate_image,
)
from claude_bottle.backend.docker.network import (
network_create_egress,
network_create_internal,
network_remove,
)
from claude_bottle.manifest import Manifest
from tests._docker import skip_unless_docker
# Same image used by test_git_gate_sidecar — alpine + git + gitleaks.
CLIENT_IMAGE = "zricethezav/gitleaks@sha256:c00b6bd0aeb3071cbcb79009cb16a60dd9e0a7c60e2be9ab65d25e6bc8abbb7f"
# Built once in setUpClass via `docker build -` from the inline
# Dockerfile below. Carries openssh-server, a `git` user, baked-in
# host keys, and a bare repo at /git/foo.git seeded with one commit.
UPSTREAM_IMAGE = "claude-bottle-test-upstream:latest"
UPSTREAM_DOCKERFILE = textwrap.dedent("""
FROM alpine:3.20
RUN apk add --no-cache openssh-server git
RUN adduser -D -s /usr/bin/git-shell git && \\
passwd -u git && \\
mkdir -p /home/git/.ssh && \\
chown git:git /home/git/.ssh && \\
chmod 700 /home/git/.ssh && \\
mkdir -p /git && \\
chown git:git /git
# Bake host keys into the image so the test can pin the
# KnownHostKey value before the container starts. Re-running
# ssh-keygen -A at boot would invalidate that pinning.
RUN ssh-keygen -A
USER git
RUN git config --global init.defaultBranch main && \\
git config --global user.email upstream@example && \\
git config --global user.name upstream && \\
git init --bare /git/foo.git && \\
git clone /git/foo.git /tmp/w && \\
cd /tmp/w && \\
echo "initial upstream content" > README.md && \\
git add README.md && \\
git commit -q -m "initial commit" && \\
git push -q origin main && \\
rm -rf /tmp/w
USER root
RUN echo "PermitRootLogin no" >> /etc/ssh/sshd_config && \\
echo "PasswordAuthentication no" >> /etc/ssh/sshd_config && \\
echo "AuthorizedKeysFile /home/git/.ssh/authorized_keys" >> /etc/ssh/sshd_config
CMD ["/usr/sbin/sshd", "-D", "-e"]
""").strip()
@skip_unless_docker()
class TestGitGateBidirectionalMirror(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Pull the client image first (other suites do the same — keeps
# registry races contained to setUpClass).
if subprocess.run(
["docker", "pull", CLIENT_IMAGE],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False,
).returncode != 0:
raise unittest.SkipTest(f"could not pull {CLIENT_IMAGE}")
# Build the upstream sshd image from stdin (no build context
# needed — Dockerfile has no COPY/ADD).
build_result = subprocess.run(
["docker", "build", "-t", UPSTREAM_IMAGE, "-"],
input=UPSTREAM_DOCKERFILE,
text=True,
capture_output=True,
check=False,
)
if build_result.returncode != 0:
raise unittest.SkipTest(
f"could not build upstream image: {build_result.stderr}"
)
# Pull the upstream's baked-in ed25519 host pubkey out of the
# image so we can pin it as KnownHostKey on the gate's manifest
# entry. Reading from a transient container ensures we get the
# same key the running sshd will present.
pub_result = subprocess.run(
["docker", "run", "--rm", "--entrypoint", "cat",
UPSTREAM_IMAGE, "/etc/ssh/ssh_host_ed25519_key.pub"],
capture_output=True, text=True, check=True,
)
parts = pub_result.stdout.strip().split()
# Format: "ssh-ed25519 <base64-pubkey> <comment>" — drop comment.
cls.upstream_host_key = f"{parts[0]} {parts[1]}"
# Build the gate image (uses build cache after the first run).
build_git_gate_image()
def setUp(self):
suffix = self.id().rsplit('.', 1)[-1].replace('_', '-')[-12:]
self.slug = f"t{os.getpid()}-{suffix}"
self.gate_name = ""
self.upstream_name = f"claude-bottle-test-upstream-{self.slug}"
self.internal_net = ""
self.egress_net = ""
self.work_dir = Path(tempfile.mkdtemp())
# Per-test SSH auth keypair. The host gets the private key
# path on disk (manifest IdentityFile); the upstream's
# authorized_keys gets the public key, docker-cp'd in just
# before sshd starts.
self.auth_key = self.work_dir / "auth_key"
subprocess.run(
["ssh-keygen", "-t", "ed25519", "-N", "", "-f", str(self.auth_key),
"-C", "git-gate-test"],
check=True, stdout=subprocess.DEVNULL,
)
self.auth_pub = self.work_dir / "auth_key.pub"
# Networks first so the upstream can attach to the egress
# network at create time.
self.internal_net = network_create_internal(self.slug)
self.egress_net = network_create_egress(self.slug)
# Start the upstream sshd container, attached to the egress
# network (which the gate also lives on). Container name doubles
# as its DNS-resolvable hostname.
subprocess.run(
["docker", "create",
"--name", self.upstream_name,
"--network", self.egress_net,
UPSTREAM_IMAGE],
check=True, stdout=subprocess.DEVNULL,
)
# docker cp the per-test pubkey into the upstream as
# /home/git/.ssh/authorized_keys (right user, right path).
subprocess.run(
["docker", "cp", str(self.auth_pub),
f"{self.upstream_name}:/home/git/.ssh/authorized_keys"],
check=True, stdout=subprocess.DEVNULL,
)
# chown / chmod the authorized_keys before sshd refuses to
# use it.
for argv in (
["chown", "git:git", "/home/git/.ssh/authorized_keys"],
["chmod", "600", "/home/git/.ssh/authorized_keys"],
):
subprocess.run(
["docker", "exec", "-u", "0", self.upstream_name, *argv],
check=False, stdout=subprocess.DEVNULL,
)
# The exec-then-start ordering is unusual — exec on a stopped
# container is OK on modern docker but if it errors we just
# do the chown after start instead. Retry post-start to be
# safe.
subprocess.run(
["docker", "start", self.upstream_name],
check=True, stdout=subprocess.DEVNULL,
)
for argv in (
["chown", "git:git", "/home/git/.ssh/authorized_keys"],
["chmod", "600", "/home/git/.ssh/authorized_keys"],
):
subprocess.run(
["docker", "exec", "-u", "0", self.upstream_name, *argv],
check=False, stdout=subprocess.DEVNULL,
)
# Wait for sshd to bind; a short retry against TCP 22 is enough.
ready = False
for _ in range(30):
probe = subprocess.run(
["docker", "exec", self.upstream_name,
"sh", "-c", "nc -z 127.0.0.1 22"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False,
)
if probe.returncode == 0:
ready = True
break
subprocess.run(["sleep", "0.2"], check=False)
if not ready:
self.fail("upstream sshd never bound port 22")
# Build the gate plan + start it. Upstream URL points at the
# upstream container's hostname (Docker DNS resolves it on the
# egress network) on port 22, user `git`.
manifest = Manifest.from_json_obj({
"bottles": {
"dev": {
"git": [{
"Name": "foo",
"Upstream": f"ssh://git@{self.upstream_name}/git/foo.git",
"IdentityFile": str(self.auth_key),
"KnownHostKey": self.upstream_host_key,
}],
},
},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
bottle = manifest.bottles["dev"]
gate = DockerGitGate()
prep = gate.prepare(bottle, self.slug, self.work_dir)
plan = dataclasses.replace(
prep,
internal_network=self.internal_net,
egress_network=self.egress_net,
)
self.gate_name = gate.start(plan)
def tearDown(self):
if self.gate_name:
DockerGitGate().stop(self.gate_name)
if self.upstream_name:
subprocess.run(
["docker", "rm", "-f", self.upstream_name],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False,
)
for n in (self.internal_net, self.egress_net):
if n:
network_remove(n)
shutil.rmtree(self.work_dir, ignore_errors=True)
def _upstream_main_sha(self) -> str:
"""Read upstream's current refs/heads/main sha by exec'ing
directly into the upstream container's bare repo."""
out = subprocess.run(
["docker", "exec", "-u", "git", self.upstream_name,
"git", "-C", "/git/foo.git", "rev-parse", "refs/heads/main"],
capture_output=True, text=True, check=True,
)
return out.stdout.strip()
def _push_to_upstream_oob(self, message: str) -> str:
"""Make a new commit directly on the upstream's bare repo
(out-of-band, not through the gate). Returns the new sha."""
script = textwrap.dedent(f"""
set -e
cd /tmp
rm -rf w
git clone /git/foo.git w
cd w
git config user.email upstream@example
git config user.name upstream
echo "$RANDOM-$$" >> README.md
git add README.md
git commit -q -m "{message}"
git push -q origin main
git rev-parse HEAD
""").strip()
out = subprocess.run(
["docker", "exec", "-u", "git", self.upstream_name,
"sh", "-c", script],
capture_output=True, text=True, check=True,
)
return out.stdout.strip().splitlines()[-1]
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: docker socket mount topology breaks "
"in-process visibility of networks created on the host daemon",
)
def test_clone_and_refetch_reflect_upstream(self):
"""Clone via gate returns upstream's commit. After a second
commit lands on the upstream out-of-band, a re-fetch through
the gate picks it up — the access-hook is refreshing before
each upload-pack."""
initial_sha = self._upstream_main_sha()
# Clone via gate.
clone_script = (
f"set -e\n"
f"cd /tmp && git clone -q git://{self.gate_name}/foo.git r\n"
f"git -C r rev-parse refs/remotes/origin/main\n"
f"cat r/README.md\n"
)
clone = subprocess.run(
["docker", "run", "--rm",
"--network", self.internal_net,
"--entrypoint", "sh",
CLIENT_IMAGE,
"-c", clone_script],
capture_output=True, text=True, timeout=60, check=False,
)
self.assertEqual(
0, clone.returncode,
f"clone via gate failed: stdout={clone.stdout!r} "
f"stderr={clone.stderr!r}",
)
cloned_sha = clone.stdout.strip().splitlines()[0]
self.assertEqual(
initial_sha, cloned_sha,
"clone via gate must return the upstream's current sha",
)
self.assertIn("initial upstream content", clone.stdout)
# Out-of-band commit on the upstream.
new_sha = self._push_to_upstream_oob("second commit")
self.assertNotEqual(initial_sha, new_sha)
# ls-remote via gate (re-fetch should pick up the new sha).
ls = subprocess.run(
["docker", "run", "--rm",
"--network", self.internal_net,
"--entrypoint", "sh",
CLIENT_IMAGE,
"-c", f"git ls-remote git://{self.gate_name}/foo.git refs/heads/main"],
capture_output=True, text=True, timeout=60, check=False,
)
self.assertEqual(0, ls.returncode, f"ls-remote failed: {ls.stderr!r}")
gate_sha = ls.stdout.split()[0]
self.assertEqual(
new_sha, gate_sha,
"ls-remote via gate must reflect the upstream's out-of-band update; "
"if this assertion fails, the access-hook is not refreshing on every "
"upload-pack and the gate is serving stale data",
)
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: docker socket mount topology breaks "
"in-process visibility of networks created on the host daemon",
)
def test_push_through_gate_lands_on_upstream(self):
"""A clean (no-gitleaks-hit) push through the gate lands on
the upstream's bare repo — pre-receive phase 2 forwards
the accepted refs."""
# Make a commit through the gate. The script clones via gate
# (so the commit will be a child of upstream's current main).
push_script = textwrap.dedent(f"""
set -e
cd /tmp
git clone -q git://{self.gate_name}/foo.git r
cd r
git config user.email client@example
git config user.name client
echo "client-side commit" > NEW.md
git add NEW.md
git commit -q -m "client commit"
git rev-parse HEAD
git push origin main 2>&1
""").strip()
push = subprocess.run(
["docker", "run", "--rm",
"--network", self.internal_net,
"--entrypoint", "sh",
CLIENT_IMAGE,
"-c", push_script],
capture_output=True, text=True, timeout=120, check=False,
)
self.assertEqual(
0, push.returncode,
f"push via gate failed: stdout={push.stdout!r} "
f"stderr={push.stderr!r}",
)
client_sha = push.stdout.splitlines()[0].strip()
self.assertEqual(
client_sha, self._upstream_main_sha(),
"push via gate must land on upstream's bare repo; "
"if this fails the pre-receive forward phase is broken or the "
"upstream credential is misconfigured",
)
if __name__ == "__main__":
unittest.main()
-224
View File
@@ -1,224 +0,0 @@
"""Integration: per-agent git-gate sidecar (PRD 0008).
Two tests against a real Docker daemon:
1. ls-remote against a gate whose upstream is unreachable fails
with the access-hook's fail-closed rejection. Proves the
daemon is bound to its port AND the access-hook is wired:
a working ls-remote against the gate is necessarily a working
ls-remote against the upstream (PRD 0008's transparent-mirror
contract).
2. A push containing a gitleaks-detectable secret is rejected
by the pre-receive hook with a non-zero exit on the agent
side and a gitleaks-rejection line in the response. The PRD's
primary success criterion.
A successful round-trip (clone through gate reflects upstream)
needs a reachable upstream SSH host; deferred to a follow-up.
"""
import dataclasses
import os
import shutil
import subprocess
import tempfile
import unittest
from pathlib import Path
from claude_bottle.backend.docker.git_gate import (
DockerGitGate,
build_git_gate_image,
)
from claude_bottle.backend.docker.network import (
network_create_egress,
network_create_internal,
network_remove,
)
from claude_bottle.manifest import Manifest
from tests._docker import skip_unless_docker
# The official gitleaks image already has git + alpine; reusing it
# for the client side too saves a separate image pull.
CLIENT_IMAGE = "zricethezav/gitleaks@sha256:c00b6bd0aeb3071cbcb79009cb16a60dd9e0a7c60e2be9ab65d25e6bc8abbb7f"
# Synthetic high-entropy AKIA-shaped string; gitleaks's aws-access-token
# rule fires on this with the default config. AWS's own example
# ("AKIAIOSFODNN7EXAMPLE") is NOT flagged by gitleaks v8.x — entropy
# filter rejects it — so we use a distinct random-looking value.
FAKE_AWS_KEY = "AKIAQRJHK7N5ZPM2VXTL"
@skip_unless_docker()
class TestGitGateSidecar(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Pre-pull the client/gitleaks base so per-test runs aren't
# racing the registry. Skip cleanly on pull failure (a real
# outage is out of scope here).
result = subprocess.run(
["docker", "pull", CLIENT_IMAGE],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False,
)
if result.returncode != 0:
raise unittest.SkipTest(f"could not pull {CLIENT_IMAGE}")
# Build the gate image once for the class. Layer cache makes
# repeated runs cheap.
build_git_gate_image()
def setUp(self):
# DNS hostnames on user-defined Docker networks max out at 63
# chars per label (RFC 1035). The full container name is
# `claude-bottle-git-gate-<slug>` = 23 + len(slug), so the slug
# has to stay under ~40 to be resolvable. Keep it short.
suffix = self.id().rsplit('.', 1)[-1].replace('_', '-')[-12:]
self.slug = f"t{os.getpid()}-{suffix}"
self.gate_name = ""
self.internal_net = ""
self.egress_net = ""
self.work_dir = Path(tempfile.mkdtemp())
def tearDown(self):
if self.gate_name:
DockerGitGate().stop(self.gate_name)
for n in (self.internal_net, self.egress_net):
if n:
network_remove(n)
shutil.rmtree(self.work_dir, ignore_errors=True)
def _start_gate(self, name: str = "foo") -> str:
"""Build a one-upstream gate and bring it up. Returns the
container name (== git-gate hostname on the internal net)."""
# Contents of the fake key don't matter for these tests — the
# rejection-path hook never reaches phase 2 where it would be
# used, and ls-remote doesn't push.
fake_key = self.work_dir / "fake-key"
fake_key.write_text("not-a-real-key\n")
manifest = Manifest.from_json_obj({
"bottles": {
"dev": {
"git": [{
"Name": name,
"Upstream": "ssh://git@upstream.invalid/path.git",
"IdentityFile": str(fake_key),
"KnownHostKey": "ssh-ed25519 AAAAEXAMPLE",
}],
},
},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
bottle = manifest.bottles["dev"]
gate = DockerGitGate()
prep = gate.prepare(bottle, self.slug, self.work_dir)
self.internal_net = network_create_internal(self.slug)
self.egress_net = network_create_egress(self.slug)
plan = dataclasses.replace(
prep,
internal_network=self.internal_net,
egress_network=self.egress_net,
)
self.gate_name = gate.start(plan)
return self.gate_name
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: docker socket mount topology breaks "
"in-process visibility of networks created on the host daemon",
)
def test_ls_remote_fails_closed_when_upstream_unreachable(self):
"""The gate's access-hook runs `git fetch origin --prune` before
every upload-pack. With the fixture's deliberately unreachable
`ssh://git@upstream.invalid/...`, that fetch fails and the
hook exits 1; the daemon reports access-denied. Asserting
non-zero here is what proves the access-hook is wired: under
the v1 (push-only) design ls-remote against a fresh gate
returned exit 0 with no refs."""
gate = self._start_gate("foo")
# Daemon still has to bind first; retry the TCP connect a few
# times. The expected end state is a non-zero exit from the
# daemon's access-denied response — not a connection refused.
probe = subprocess.run(
["docker", "run", "--rm",
"--network", self.internal_net,
"--entrypoint", "sh",
CLIENT_IMAGE,
"-c",
f"for i in $(seq 1 15); do "
f" out=$(git ls-remote git://{gate}/foo.git 2>&1) && exit 99;"
f" case \"$out\" in *'access denied'*|*'not exported'*) "
f" echo \"$out\"; exit 1;; esac;"
f" sleep 1;"
f"done;"
f"echo TIMEOUT; exit 2"],
capture_output=True, text=True, timeout=60, check=False,
)
# exit 1: daemon access-denied as expected. exit 99 would mean
# ls-remote actually succeeded against the unreachable upstream
# (impossible — would indicate stale-data serving, the very
# thing the access-hook is meant to prevent).
self.assertEqual(
1, probe.returncode,
f"expected fail-closed access-denied; got "
f"exit={probe.returncode} stdout={probe.stdout!r} "
f"stderr={probe.stderr!r}",
)
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: docker socket mount topology breaks "
"in-process visibility of networks created on the host daemon",
)
def test_push_with_secret_is_rejected(self):
"""The PRD 0008 success criterion: a push containing a
gitleaks-detectable secret is rejected; the hook's "gitleaks
rejected" line appears in the response, and git push exits
non-zero on the client side."""
gate = self._start_gate("foo")
push_script = (
"set -e\n"
"cd /tmp\n"
# Wait for git daemon to bind. Under the v1.1 design,
# ls-remote never returns 0 against an unreachable
# upstream (access-hook fail-closed), so we wait for *any*
# response (the daemon's access-denied line) as the
# readiness signal.
f"for i in $(seq 1 15); do "
f" out=$(git ls-remote git://{gate}/foo.git 2>&1) || true;"
f" case \"$out\" in *'remote error'*|*'access denied'*) break;; esac;"
f" sleep 1;"
f"done\n"
"git init -q -b main repo\n"
"cd repo\n"
"git config user.email test@example.com\n"
"git config user.name test\n"
f"echo '{FAKE_AWS_KEY}' > leak.txt\n"
"git add leak.txt\n"
"git commit -q -m leak\n"
f"git push git://{gate}/foo.git main 2>&1\n"
)
result = subprocess.run(
["docker", "run", "--rm",
"--network", self.internal_net,
"--entrypoint", "sh",
CLIENT_IMAGE,
"-c", push_script],
capture_output=True, text=True, timeout=120, check=False,
)
combined = result.stdout + result.stderr
self.assertNotEqual(
0, result.returncode,
f"expected push to fail; output={combined!r}",
)
# Hook's stderr is delivered to the client via the `remote:`
# prefix during a git push. Either token is enough to prove
# the pre-receive hook ran and rejected the push.
self.assertTrue(
"gitleaks rejected" in combined or "leaks found" in combined,
f"expected a gitleaks rejection in the response; got: {combined!r}",
)
if __name__ == "__main__":
unittest.main()
+10 -13
View File
@@ -1,8 +1,13 @@
"""Integration: the cleanup primitives the start-flow trap depends on
are idempotent. The original orphan-network bug was a trap-ordering
issue; the fix moved the install earlier. The trap is only safe if
network_remove and PipelockProxy.stop are no-ops against missing
resources."""
"""Integration: the network-cleanup primitives the start-flow trap
depends on are idempotent. The original orphan-network bug was a
trap-ordering issue; the fix moved the install earlier. The trap
is only safe if network_remove is a no-op against missing
resources.
The PipelockProxy.stop idempotency case that used to live here was
removed in PRD 0024 chunk 3 when the per-container .stop method
went away — sidecar teardown is now compose's responsibility, and
`compose down` already no-ops on missing containers."""
import os
import subprocess
@@ -13,10 +18,6 @@ from claude_bottle.backend.docker.network import (
network_create_internal,
network_remove,
)
from claude_bottle.backend.docker.pipelock import (
DockerPipelockProxy,
pipelock_container_name,
)
from tests._docker import skip_unless_docker
@@ -71,10 +72,6 @@ class TestOrphanCleanup(unittest.TestCase):
self.assertTrue(network_remove(self.internal_name))
self.assertTrue(network_remove(self.egress_name))
def test_pipelock_stop_missing_sidecar(self):
# Should not raise.
DockerPipelockProxy().stop(pipelock_container_name(f"missing-{self.slug}"))
if __name__ == "__main__":
unittest.main()
+5
View File
@@ -50,6 +50,11 @@ from tests.fixtures import fixture_minimal
"skipped under act_runner: pipelock_tls_init uses a host bind mount "
"that doesn't share fs with the runner container",
)
@unittest.skip(
"PRD 0024 chunk 3: the .start-based bringup helper this test used was "
"deleted. Chunk 4 rewrites the bringup with a direct `docker run` so "
"the apply_allowlist_change hot-reload retains integration coverage."
)
class TestPipelockApply(unittest.TestCase):
def setUp(self):
self.slug = f"cb-test-pla-{os.getpid()}-{int(time.time())}"
@@ -1,128 +0,0 @@
"""Integration: drive the production pipelock-sidecar bring-up
(`DockerPipelockProxy.prepare` → `.start`) and probe /health from a
sibling container on the same internal network. The point is that the
test exercises the production code path — if the docker create/cp/start
sequence in DockerPipelockProxy.start changes shape, this test should
notice.
We don't probe /health from the host because the sidecar is created
attached to an `--internal` network with no published port (that's
the production topology). An in-network curl container reaches it the
same way the agent container would in production.
"""
import dataclasses
import os
import shutil
import subprocess
import tempfile
import unittest
from pathlib import Path
from claude_bottle.backend.docker.network import (
network_create_egress,
network_create_internal,
network_remove,
)
from claude_bottle.backend.docker.pipelock import (
PIPELOCK_PORT,
DockerPipelockProxy,
pipelock_container_name,
pipelock_tls_init,
)
from tests._docker import skip_unless_docker
from tests.fixtures import fixture_minimal
CURL_IMAGE = "curlimages/curl:latest"
@skip_unless_docker()
class TestPipelockSidecarSmoke(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Pre-pull curlimages/curl so the per-test retry loop isn't
# racing the registry. Skip cleanly if the pull fails (the
# canary suite will surface a real registry outage separately).
result = subprocess.run(
["docker", "pull", CURL_IMAGE],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
if result.returncode != 0:
raise unittest.SkipTest(f"could not pull {CURL_IMAGE}")
def setUp(self):
self.slug = f"cb-test-smoke-{os.getpid()}"
self.sidecar_name = ""
self.internal_net = ""
self.egress_net = ""
self.work_dir = Path(tempfile.mkdtemp())
def tearDown(self):
if self.sidecar_name:
DockerPipelockProxy().stop(self.sidecar_name)
for n in (self.internal_net, self.egress_net):
if n:
network_remove(n)
shutil.rmtree(self.work_dir, ignore_errors=True)
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: docker socket mount topology breaks "
"in-process visibility of networks created on the host daemon",
)
def test_prepare_and_start_yield_healthy_sidecar(self):
proxy = DockerPipelockProxy()
prep = proxy.prepare(fixture_minimal().bottles["dev"], self.slug, self.work_dir)
self.internal_net = network_create_internal(self.slug)
self.egress_net = network_create_egress(self.slug)
# PRD 0006: pipelock's tls_interception block in the rendered
# YAML references in-container CA paths; .start docker-cp's
# those files in. The full launch flow generates the CA via
# `pipelock_tls_init`; this smoke test calls it directly.
ca_cert_host, ca_key_host = pipelock_tls_init(self.work_dir)
plan = dataclasses.replace(
prep,
internal_network=self.internal_net,
egress_network=self.egress_net,
ca_cert_host_path=ca_cert_host,
ca_key_host_path=ca_key_host,
)
self.sidecar_name = proxy.start(plan)
self.assertEqual(pipelock_container_name(self.slug), self.sidecar_name)
# Probe /health from a sibling container on the internal network —
# same access topology the agent container uses in production.
# curl retries on connection refused while pipelock is booting.
probe = subprocess.run(
[
"docker", "run", "--rm",
"--network", self.internal_net,
CURL_IMAGE,
"-sf", "--max-time", "2",
"--retry", "15",
"--retry-delay", "1",
"--retry-connrefused",
f"http://{self.sidecar_name}:{PIPELOCK_PORT}/health",
],
capture_output=True,
text=True,
timeout=60,
check=False,
)
self.assertEqual(
0, probe.returncode,
f"health probe failed: stdout={probe.stdout!r} stderr={probe.stderr!r}",
)
body = probe.stdout
self.assertIn('"status":"healthy"', body)
self.assertRegex(body, r'"version":"[0-9]+\.[0-9]+\.[0-9]+"')
if __name__ == "__main__":
unittest.main()
-307
View File
@@ -1,307 +0,0 @@
"""Integration: drive `DockerSupervise.start` against the supervise
sidecar and round-trip an MCP tool call through the queue (PRD 0013).
Topology mirrors production minimally: a per-bottle internal docker
network for the agent ↔ supervise leg, no egress network (supervise
doesn't make outbound calls). The "agent" is a curl container on the
internal net; the supervisor lives on the host (this test process)
and uses claude_bottle.cli.dashboard helpers to write Response files.
Verifies:
1. `tools/list` returns the three PRD 0013 tool names over real MCP
wire format.
2. A `tools/call` from the in-container agent blocks until the host
writes a Response to the queue; once written, the agent receives
the approval payload.
"""
from __future__ import annotations
import json
import os
import shutil
import subprocess
import tempfile
import threading
import time
import unittest
from pathlib import Path
from claude_bottle import supervise as _sv
from claude_bottle.backend.docker.network import (
network_create_internal,
network_remove,
)
from claude_bottle.backend.docker.supervise import (
DockerSupervise,
build_supervise_image,
supervise_container_name,
)
from claude_bottle.cli import dashboard
from claude_bottle.supervise import SupervisePlan, list_pending_proposals
from tests._docker import skip_unless_docker
CURL_IMAGE = "curlimages/curl:latest"
@skip_unless_docker()
class TestSuperviseSidecar(unittest.TestCase):
@classmethod
def setUpClass(cls):
r = subprocess.run(
["docker", "pull", CURL_IMAGE],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
if r.returncode != 0:
raise unittest.SkipTest(f"could not pull {CURL_IMAGE}")
build_supervise_image()
def setUp(self):
self.slug = f"cb-test-sv-{os.getpid()}-{int(time.time())}"
self.sidecar_name = ""
self.internal_net = ""
self.work_dir = Path(tempfile.mkdtemp(prefix="supervise-int."))
self.queue_dir = self.work_dir / "queue"
self.queue_dir.mkdir()
def tearDown(self):
if self.sidecar_name:
subprocess.run(
["docker", "rm", "-f", self.sidecar_name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
if self.internal_net:
network_remove(self.internal_net)
shutil.rmtree(self.work_dir, ignore_errors=True)
def _require_bind_mount_sharing(self) -> None:
"""Skip if `docker run -v <host-path>:<container-path>` doesn't
share the filesystem between the test process and the spawned
container. In docker-in-docker CI (Gitea Actions runner with
host socket forwarded), bind-mount paths are resolved against
the outer host's fs, not the runner container's — so the
sidecar writes proposals to a dir the test process can't see.
Cached on the class so the probe runs once per test session."""
cached = getattr(type(self), "_bind_mount_ok", None)
if cached is True:
return
if cached is False:
self.skipTest(
"docker bind mounts don't share fs with this test process "
"(likely docker-in-docker); the supervise queue round-trip "
"requires real host fs sharing"
)
probe_dir = Path(tempfile.mkdtemp(prefix="supervise-bind-probe."))
try:
(probe_dir / "from-host").write_text("x")
r = subprocess.run(
[
"docker", "run", "--rm",
"-v", f"{probe_dir}:/probe",
"--entrypoint", "sh",
CURL_IMAGE,
"-c", "test -f /probe/from-host && touch /probe/from-container",
],
capture_output=True,
check=False,
)
ok = (
r.returncode == 0
and (probe_dir / "from-container").exists()
)
finally:
shutil.rmtree(probe_dir, ignore_errors=True)
type(self)._bind_mount_ok = ok
if not ok:
self.skipTest(
"docker bind mounts don't share fs with this test process "
"(likely docker-in-docker); the supervise queue round-trip "
"requires real host fs sharing"
)
def _bring_up_sidecar(self) -> None:
self.internal_net = network_create_internal(self.slug)
plan = SupervisePlan(
slug=self.slug,
queue_dir=self.queue_dir,
current_config_dir=self.work_dir / "current-config",
internal_network=self.internal_net,
)
# current_config_dir isn't bind-mounted into the sidecar, only
# the queue dir is. Create it for symmetry with production.
plan.current_config_dir.mkdir()
self.sidecar_name = DockerSupervise().start(plan)
# Block until the server is ready to answer (the container
# `docker start` returns immediately; python is still
# binding to the port).
deadline = time.monotonic() + 10.0
while time.monotonic() < deadline:
rc = subprocess.run(
[
"docker", "run", "--rm",
"--network", self.internal_net,
CURL_IMAGE,
"-fsS", "-o", "/dev/null",
"--max-time", "2",
f"http://{_sv.SUPERVISE_HOSTNAME}:{_sv.SUPERVISE_PORT}/health",
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
).returncode
if rc == 0:
return
time.sleep(0.25)
raise AssertionError("supervise sidecar /health never came up")
def _curl_jsonrpc(self, body: dict[str, object]) -> dict[str, object]:
"""Invoke curl on the internal network to POST a JSON-RPC
request to the supervise sidecar and parse the response."""
payload = json.dumps(body)
result = subprocess.run(
[
"docker", "run", "--rm",
"--network", self.internal_net,
CURL_IMAGE,
"-sS", "--max-time", "30",
"-H", "Content-Type: application/json",
"-X", "POST",
"--data", payload,
f"http://{_sv.SUPERVISE_HOSTNAME}:{_sv.SUPERVISE_PORT}/",
],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
raise AssertionError(
f"curl to supervise failed: {result.stderr}\n"
f"stdout: {result.stdout}"
)
return json.loads(result.stdout)
def test_tools_list_over_mcp(self):
self._bring_up_sidecar()
result = self._curl_jsonrpc(
{"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
)
self.assertEqual(1, result["id"])
names = {t["name"] for t in result["result"]["tools"]}
self.assertEqual(
{
_sv.TOOL_EGRESS_BLOCK,
_sv.TOOL_PIPELOCK_BLOCK,
_sv.TOOL_CAPABILITY_BLOCK,
_sv.TOOL_LIST_EGRESS_ROUTES,
},
names,
)
def test_tools_call_round_trips_through_queue(self):
"""End-to-end: agent in the bottle calls egress-block;
the call blocks on the queue; the host approves via the
dashboard helpers; the agent receives the approval.
This test focuses on the supervise sidecar's queue + response
plumbing, not the egress apply path itself. The apply
function is stubbed so we don't need to bring up a real
egress sidecar (its docker lifecycle has its own
integration coverage)."""
self._require_bind_mount_sharing()
self._bring_up_sidecar()
# Stub the apply step. The dashboard's approve() calls
# add_route to docker-exec into the egress sidecar;
# this test isn't exercising the real sidecar, so patch it
# to a no-op that returns plausible before/after strings
# the audit-log writer can render.
from claude_bottle.cli import dashboard as _dash
original_apply = _dash.add_route
_dash.add_route = (
lambda slug, new: ("(stubbed before)", new)
)
captured: dict[str, object] = {}
def caller() -> None:
captured["response"] = self._curl_jsonrpc({
"jsonrpc": "2.0", "id": 7, "method": "tools/call",
"params": {
"name": _sv.TOOL_EGRESS_BLOCK,
"arguments": {
"host": "api.example.com",
"justification": "integration test",
},
},
})
t = threading.Thread(target=caller)
t.start()
try:
# Wait for the proposal to appear in the queue (the
# sidecar writes it before blocking on wait_for_response).
deadline = time.monotonic() + 10.0
qp = None
while time.monotonic() < deadline:
pending = list_pending_proposals(self.queue_dir)
if pending:
qp = dashboard.QueuedProposal(
proposal=pending[0], queue_dir=self.queue_dir,
)
break
time.sleep(0.1)
self.assertIsNotNone(qp, "proposal never appeared in queue")
assert qp is not None # type-narrowing
self.assertEqual(
_sv.TOOL_EGRESS_BLOCK, qp.proposal.tool,
)
self.assertEqual("integration test", qp.proposal.justification)
# Approve via the dashboard helper. The apply step (now
# stubbed) would docker-exec into the egress sidecar
# and SIGHUP it. The supervise sidecar sees the response
# file and returns to the curl caller.
dashboard.approve(qp, notes="lgtm from integration test")
finally:
_dash.add_route = original_apply
t.join(timeout=20)
response = captured.get("response")
self.assertIsNotNone(response, "curl thread never produced a response")
assert isinstance(response, dict) # type-narrowing
self.assertEqual(7, response["id"])
result = response["result"]
assert isinstance(result, dict)
self.assertFalse(result.get("isError"))
text = result["content"][0]["text"]
self.assertIn("status: approved", text)
self.assertIn("notes: lgtm from integration test", text)
def test_orphan_sidecar_name_collision_recovered(self):
"""An orphan supervise sidecar from a previous run blocks
the next .start with a duplicate-name error. Documents the
observed behavior so a future change that adds auto-cleanup
can flip the assertion."""
self._bring_up_sidecar()
self.assertEqual(supervise_container_name(self.slug), self.sidecar_name)
# Second .start should fail because the container name is
# taken. cleanup is handled by the orphan probe in prepare.py
# (tested separately in test_orphan_cleanup).
with self.assertRaises(SystemExit):
DockerSupervise().start(SupervisePlan(
slug=self.slug,
queue_dir=self.queue_dir,
current_config_dir=self.work_dir / "current-config",
internal_network=self.internal_net,
))
if __name__ == "__main__":
unittest.main()