test(pipelock): HTTPS integration tests for the bumped path
test / unit (pull_request) Successful in 11s
test / integration (pull_request) Successful in 13s

Fourth and final step of PRD 0006. Two new end-to-end tests pin
the two paths through pipelock's tls_interception layer.

- test_pipelock_blocks_secret_https_post: posts a GitHub-PAT-shaped
  body to api.anthropic.com over HTTPS through the bottle. With
  pipelock now bumping the CONNECT and seeing the decrypted body,
  it returns 403 with the documented `blocked: request body
  contains secret: GitHub Token` body. The probe is a single curl
  invocation — curl natively does CONNECT through HTTPS_PROXY, the
  agent's trust store now contains pipelock's CA, no hand-rolled
  TLS in the test.

- test_pipelock_allows_normal_https: GETs git's README from
  raw.githubusercontent.com (a baked-in allowlist host). 200 +
  non-zero body length proves the full chain works:
  pipelock_tls_init → docker cp of CA into sidecar → bumped CONNECT
  → provision_ca installed CA in agent → curl trusts pipelock's
  bumped leaf → body forwarded back through the tunnel.

- test_pipelock_sidecar_smoke: pre-existing direct-start smoke
  test updated to call pipelock_tls_init and populate the CA
  paths on the plan. (The full launch flow does this in launch.py;
  this test exercises the proxy class in isolation.)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-12 15:01:17 -04:00
parent fb10c8dd8a
commit d3115ae5fd
3 changed files with 188 additions and 0 deletions
@@ -0,0 +1,84 @@
"""Integration: with pipelock's tls_interception enabled (PRD 0006),
a clean HTTPS GET to an allowlisted host succeeds end-to-end through
the bumped tunnel.
Complement to test_pipelock_blocks_secret_https_post — together they
pin pipelock's two paths (block on body match, allow on clean
traffic). This test is also the implicit TLS-trust check: if
provision_ca had failed to install pipelock's CA into the agent's
trust store, curl would have rejected the bumped leaf cert and the
fetch would have failed before any HTTP response could come back."""
from __future__ import annotations
import os
import shutil
import tempfile
import unittest
from pathlib import Path
from claude_bottle.backend import BottleSpec, get_bottle_backend
from tests._docker import skip_unless_docker
from tests.fixtures import fixture_minimal
# raw.githubusercontent.com is in the baked-in DEFAULT_ALLOWLIST.
# `git`'s own README on the master branch is a long-lived raw file
# (~3 KB) that any CI runner with internet can fetch.
_TARGET_URL = "https://raw.githubusercontent.com/git/git/master/README.md"
@skip_unless_docker()
class TestPipelockAllowsNormalHttps(unittest.TestCase):
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: docker socket mount topology breaks "
"in-process visibility of networks created on the host daemon",
)
def test_https_get_to_allowed_host_succeeds(self):
backend = get_bottle_backend()
stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage."))
try:
spec = BottleSpec(
manifest=fixture_minimal(),
agent_name="demo",
copy_cwd=False,
user_cwd=str(stage_dir),
forward_oauth_token=False,
)
plan = backend.prepare(spec, stage_dir=stage_dir)
with backend.launch(plan) as bottle:
script = (
"set -eu\n"
'curl --proxy "$HTTPS_PROXY" -s --max-time 10 \\\n'
" -w 'status=%{http_code}\\n' \\\n"
" -o /tmp/probe-body.txt \\\n"
f" {_TARGET_URL}\n"
'echo "len=$(wc -c < /tmp/probe-body.txt)"\n'
)
result = bottle.exec(script)
finally:
shutil.rmtree(stage_dir, ignore_errors=True)
self.assertEqual(
0, result.returncode,
f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}",
)
# 200 from the upstream (pipelock forwarded after the body
# scan passed). If curl had failed the bumped-cert trust
# check, the exit code or status would be non-200 here.
self.assertIn(
"status=200", result.stdout,
f"expected 200 from raw.githubusercontent.com; got: {result.stdout!r}",
)
# The git README is ~3 KB. Anything substantially non-zero
# proves the response body actually transferred — i.e. the
# CONNECT tunnel + bumped TLS + body forwarding all worked.
self.assertNotIn(
"len=0\n", result.stdout,
f"response body was empty: {result.stdout!r}",
)
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,96 @@
"""Integration: with pipelock's tls_interception enabled (PRD 0006),
a credential POST sent over HTTPS is blocked by pipelock's body-scan
layer — closing the gap that motivated this PRD.
End-to-end: drives `BottleBackend.prepare → launch` so the real
image build, network plumbing, pipelock_tls_init, sidecar bring-up,
and provision_ca (CA install in the agent's trust store) are all in
the loop. The probe is a single `curl --proxy "$HTTPS_PROXY" -X POST
... https://api.anthropic.com/...` — curl natively does CONNECT
through the proxy, the agent's trust store now contains pipelock's
per-bottle CA so curl trusts pipelock's bumped leaf, and pipelock
sees the decrypted body and returns its known
`blocked: request body contains secret: <pattern>` 403."""
from __future__ import annotations
import os
import shutil
import tempfile
import unittest
from pathlib import Path
from claude_bottle.backend import BottleSpec, get_bottle_backend
from claude_bottle.manifest import Manifest
from tests._docker import skip_unless_docker
# Synthetic value shaped like a GitHub Personal Access Token; not a
# real credential. Carried into the bottle as an env var so the
# probe shell can read it via $FAKE_TOKEN without ever interpolating
# the value on the bash `bottle.exec` argv.
_FAKE_TOKEN = "ghp_aB3cD4eF5gH6iJ7kL8mN9oP0qR1sT2uV3wX4yZ"
@skip_unless_docker()
class TestPipelockBlocksSecretHttpsPost(unittest.TestCase):
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: docker socket mount topology breaks "
"in-process visibility of networks created on the host daemon",
)
def test_https_post_with_credential_body_is_blocked(self):
manifest = Manifest.from_json_obj({
"bottles": {
"dev": {"env": {"FAKE_TOKEN": _FAKE_TOKEN}},
},
"agents": {
"demo": {"skills": [], "prompt": "", "bottle": "dev"},
},
})
backend = get_bottle_backend()
stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage."))
try:
spec = BottleSpec(
manifest=manifest,
agent_name="demo",
copy_cwd=False,
user_cwd=str(stage_dir),
forward_oauth_token=False,
)
plan = backend.prepare(spec, stage_dir=stage_dir)
with backend.launch(plan) as bottle:
script = (
"set -eu\n"
'curl --proxy "$HTTPS_PROXY" -s --max-time 8 \\\n'
" -w 'status=%{http_code}\\n' \\\n"
" -o /tmp/probe-body.txt \\\n"
' -X POST -d "token=$FAKE_TOKEN" \\\n'
" https://api.anthropic.com/dlp-probe\n"
'echo "body=$(head -c 200 /tmp/probe-body.txt)"\n'
)
result = bottle.exec(script)
finally:
shutil.rmtree(stage_dir, ignore_errors=True)
self.assertEqual(
0, result.returncode,
f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}",
)
# Pipelock's body-scan block returns 403 with a plain-text
# body starting `blocked: ` (pinned empirically; see
# tests/unit/test_mitmproxy_verdict.py for the
# corresponding-fingerprint test, retained from PR #8 as
# general pipelock-block-shape coverage).
self.assertIn(
"status=403", result.stdout,
f"expected 403 from pipelock; got: {result.stdout!r}",
)
self.assertIn(
"body=blocked: ", result.stdout,
f"expected pipelock block body; got: {result.stdout!r}",
)
if __name__ == "__main__":
unittest.main()
@@ -28,6 +28,7 @@ from claude_bottle.backend.docker.pipelock import (
PIPELOCK_PORT,
DockerPipelockProxy,
pipelock_container_name,
pipelock_tls_init,
)
from tests._docker import skip_unless_docker
from tests.fixtures import fixture_minimal
@@ -79,10 +80,17 @@ class TestPipelockSidecarSmoke(unittest.TestCase):
self.internal_net = network_create_internal(self.slug)
self.egress_net = network_create_egress(self.slug)
# PRD 0006: pipelock's tls_interception block in the rendered
# YAML references in-container CA paths; .start docker-cp's
# those files in. The full launch flow generates the CA via
# `pipelock_tls_init`; this smoke test calls it directly.
ca_cert_host, ca_key_host = pipelock_tls_init(self.work_dir)
plan = dataclasses.replace(
prep,
internal_network=self.internal_net,
egress_network=self.egress_net,
ca_cert_host_path=ca_cert_host,
ca_key_host_path=ca_key_host,
)
self.sidecar_name = proxy.start(plan)