d3115ae5fd
Fourth and final step of PRD 0006. Two new end-to-end tests pin the two paths through pipelock's tls_interception layer.

- test_pipelock_blocks_secret_https_post: posts a GitHub-PAT-shaped body to api.anthropic.com over HTTPS through the bottle. With pipelock now bumping the CONNECT and seeing the decrypted body, it returns 403 with the documented `blocked: request body contains secret: GitHub Token` body. The probe is a single curl invocation — curl natively does CONNECT through HTTPS_PROXY, the agent's trust store now contains pipelock's CA, no hand-rolled TLS in the test.
- test_pipelock_allows_normal_https: GETs git's README from raw.githubusercontent.com (a baked-in allowlist host). 200 + non-zero body length proves the full chain works: pipelock_tls_init → docker cp of CA into sidecar → bumped CONNECT → provision_ca installed CA in agent → curl trusts pipelock's bumped leaf → body forwarded back through the tunnel.
- test_pipelock_sidecar_smoke: pre-existing direct-start smoke test updated to call pipelock_tls_init and populate the CA paths on the plan. (The full launch flow does this in launch.py; this test exercises the proxy class in isolation.)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
97 lines
3.7 KiB
Python
97 lines
3.7 KiB
Python
"""Integration: with pipelock's tls_interception enabled (PRD 0006),
a credential POST sent over HTTPS is blocked by pipelock's body-scan
layer — closing the gap that motivated this PRD.

End-to-end: drives `BottleBackend.prepare → launch` so the real
image build, network plumbing, pipelock_tls_init, sidecar bring-up,
and provision_ca (CA install in the agent's trust store) are all in
the loop. The probe is a single `curl --proxy "$HTTPS_PROXY" -X POST
... https://api.anthropic.com/...` — curl natively does CONNECT
through the proxy, the agent's trust store now contains pipelock's
per-bottle CA so curl trusts pipelock's bumped leaf, and pipelock
sees the decrypted body and returns its known
`blocked: request body contains secret: <pattern>` 403."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
from claude_bottle.backend import BottleSpec, get_bottle_backend
|
|
from claude_bottle.manifest import Manifest
|
|
from tests._docker import skip_unless_docker
|
|
|
|
|
|
# Synthetic value shaped like a GitHub Personal Access Token; not a
|
|
# real credential. Carried into the bottle as an env var so the
|
|
# probe shell can read it via $FAKE_TOKEN without ever interpolating
|
|
# the value on the bash `bottle.exec` argv.
|
|
_FAKE_TOKEN = "ghp_aB3cD4eF5gH6iJ7kL8mN9oP0qR1sT2uV3wX4yZ"
|
|
|
|
|
|
@skip_unless_docker()
class TestPipelockBlocksSecretHttpsPost(unittest.TestCase):
    """End-to-end check that a secret-shaped HTTPS POST body is blocked.

    Prepares and launches a bottle through the configured backend, runs a
    curl probe inside it, and inspects the probe's stdout for the 403
    status line and the ``blocked: `` body prefix.
    """

    @unittest.skipIf(
        os.environ.get("GITEA_ACTIONS") == "true",
        "skipped under act_runner: docker socket mount topology breaks "
        "in-process visibility of networks created on the host daemon",
    )
    def test_https_post_with_credential_body_is_blocked(self):
        """POST ``token=$FAKE_TOKEN`` over HTTPS and expect a 403 block.

        The token reaches the bottle through the manifest's ``env`` map,
        so the probe script only ever references ``$FAKE_TOKEN``.
        """
        manifest = Manifest.from_json_obj({
            "bottles": {
                "dev": {"env": {"FAKE_TOKEN": _FAKE_TOKEN}},
            },
            "agents": {
                "demo": {"skills": [], "prompt": "", "bottle": "dev"},
            },
        })

        # Probe script run inside the bottle.
        #  * `set -eu` aborts on a curl failure or an unset variable.
        #  * `-sS`: silent, but still print curl's own error message to
        #    stderr if the transfer fails, so the returncode assertion
        #    below can report *why* the probe died. On success `-S`
        #    prints nothing, so stdout is unchanged.
        #  * `--fail` is deliberately absent: an HTTP 403 leaves curl's
        #    exit status at 0, and the status/body are checked via stdout.
        script = (
            "set -eu\n"
            'curl --proxy "$HTTPS_PROXY" -sS --max-time 8 \\\n'
            " -w 'status=%{http_code}\\n' \\\n"
            " -o /tmp/probe-body.txt \\\n"
            ' -X POST -d "token=$FAKE_TOKEN" \\\n'
            " https://api.anthropic.com/dlp-probe\n"
            'echo "body=$(head -c 200 /tmp/probe-body.txt)"\n'
        )

        backend = get_bottle_backend()
        stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage."))
        try:
            spec = BottleSpec(
                manifest=manifest,
                agent_name="demo",
                copy_cwd=False,
                user_cwd=str(stage_dir),
                forward_oauth_token=False,
            )
            plan = backend.prepare(spec, stage_dir=stage_dir)
            with backend.launch(plan) as bottle:
                result = bottle.exec(script)
        finally:
            # Best-effort removal of the staging directory, even when
            # prepare/launch/exec raised.
            shutil.rmtree(stage_dir, ignore_errors=True)

        self.assertEqual(
            0, result.returncode,
            f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}",
        )
        # Pipelock's body-scan block returns 403 with a plain-text
        # body starting `blocked: ` (pinned empirically; see
        # tests/unit/test_mitmproxy_verdict.py for the
        # corresponding-fingerprint test, retained from PR #8 as
        # general pipelock-block-shape coverage).
        self.assertIn(
            "status=403", result.stdout,
            f"expected 403 from pipelock; got: {result.stdout!r}",
        )
        self.assertIn(
            "body=blocked: ", result.stdout,
            f"expected pipelock block body; got: {result.stdout!r}",
        )
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|