4ef1cc58df
scripts/demo.sh + scripts/demo_harness.py drive a real bottle through four probes (pipelock allow, host-allowlist block, DLP body-scan block, git-gate gitleaks rejection). docs/demo.tape is the VHS source that renders docs/demo.gif, embedded at the top of the README as a working proof of the security model the prose describes. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
249 lines
8.1 KiB
Python
249 lines
8.1 KiB
Python
"""End-to-end demo: spin up one bottle and show pipelock + git-gate
|
|
in action across four scenarios. Mirrors the integration tests in
|
|
tests/integration/ but with paced output suitable for screen
|
|
recording. Run with `bash scripts/demo.sh` for a banner-wrapped
|
|
session; this file is also runnable directly as `python -u
|
|
scripts/demo_harness.py`.
|
|
|
|
The bottle declares one `git` upstream (unreachable on purpose: the
|
|
gitleaks pre-receive hook runs before the gate would try to forward
|
|
to the real upstream) and one `FAKE_TOKEN` env var whose value has
|
|
the shape of a GitHub PAT so pipelock's DLP layer recognizes it.
|
|
|
|
All four probes run inside the same bottle so the demo proves the
|
|
same agent that can reach `api.anthropic.com` is the one that gets
|
|
blocked from reaching `example.com` and from posting credentials
|
|
anywhere.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from pathlib import Path
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
|
|
from claude_bottle.backend import BottleSpec, get_bottle_backend # noqa: E402
|
|
from claude_bottle.manifest import Manifest # noqa: E402
|
|
|
|
|
|
# Dummy credential shaped like a GitHub personal access token. It is exported
# into the bottle's env and POSTed as the request body in the DLP scenario.
FAKE_TOKEN = "ghp_aB3cD4eF5gH6iJ7kL8mN9oP0qR1sT2uV3wX4yZ"
# Dummy AWS-style key (AKIA… prefix) written to leak.txt and committed in the
# git-push scenario.
FAKE_AWS_KEY = "AKIAQRJHK7N5ZPM2VXTL"

# Scenario 1 fetches this URL through the proxy and expects status=200.
ALLOW_TARGET = "https://raw.githubusercontent.com/git/git/master/README.md"
# Scenario 2 requests this host and expects it not to come back with status=200.
BLOCK_HOST = "example.com"
# Scenario 3 POSTs FAKE_TOKEN to this host and expects status=403.
DLP_TARGET_HOST = "api.anthropic.com"
|
|
|
|
|
|
def banner(n: int, total: int, title: str) -> None:
    """Print a three-line heading announcing scenario *n* of *total*.

    The heading is a 60-character rule, a ``Scenario n/total`` label
    followed by *title* in bold, and a closing rule. A blank line precedes
    it and stdout is flushed afterwards so it shows up immediately.
    """
    cyan, bold, reset = "\033[1;36m", "\033[1m", "\033[0m"
    rule = cyan + "━" * 60 + reset
    label = f"{cyan}Scenario {n}/{total}{reset} {bold}{title}{reset}"
    for line in ("\n" + rule, label, rule):
        print(line)
    sys.stdout.flush()
|
|
|
|
|
|
def cmd(s: str) -> None:
    """Echo *s* as a dimmed, ``$``-prefixed command line and flush stdout."""
    dim, reset = "\033[2m", "\033[0m"
    print(f"{dim}$ {s}{reset}", flush=True)
|
|
|
|
|
|
def verdict(passed: bool, msg: str) -> None:
    """Print *msg* as a bold green ✓ line when *passed*, else a bold red ✗ line.

    Stdout is flushed afterwards.
    """
    color, mark = ("32", "✓") if passed else ("31", "✗")
    print(f"\033[1;{color}m{mark} {msg}\033[0m")
    sys.stdout.flush()
|
|
|
|
|
|
def pause(seconds: float = 1.2) -> None:
    """Sleep for *seconds* (default 1.2) to pace the demo output."""
    time.sleep(seconds)
|
|
|
|
|
|
def scenario_allow(bottle) -> bool:
    """Scenario 1: fetch ``ALLOW_TARGET`` through the proxy and expect success.

    Runs curl inside *bottle* against ``$HTTPS_PROXY``, saving the body to
    ``/tmp/body.txt`` and printing the HTTP status plus the body's byte
    count.

    Returns:
        True when the output contains ``status=200`` and does not contain
        ``len=0``; False otherwise.
    """
    host = ALLOW_TARGET.split("/")[2]
    banner(1, 4, f"Allowlisted HTTPS GET → {host}")
    cmd(f'curl --proxy "$HTTPS_PROXY" {ALLOW_TARGET}')

    fetch = (
        'curl --proxy "$HTTPS_PROXY" -s --max-time 10 '
        "-w 'status=%{http_code} ' -o /tmp/body.txt "
    ) + ALLOW_TARGET
    measure = 'echo "len=$(wc -c < /tmp/body.txt)"'
    script = "\n".join(["set -eu", fetch, measure]) + "\n"

    result = bottle.exec(script)
    stdout = result.stdout
    print(stdout.rstrip())

    got_ok_status = "status=200" in stdout
    ok = got_ok_status and "len=0" not in stdout
    verdict(ok, "forwarded — the proxy isn't just blocking everything")
    return ok
|
|
|
|
|
|
_HTTP_PROBE = r"""
|
|
const http = require('http');
|
|
const proxy = new URL(process.env.HTTPS_PROXY);
|
|
const target = process.env.PROBE_TARGET; // absolute http URL
|
|
const body = process.env.PROBE_BODY || '';
|
|
const method = process.env.PROBE_METHOD || 'GET';
|
|
const url = new URL(target);
|
|
const opts = {
|
|
host: proxy.hostname, port: proxy.port, method,
|
|
path: target,
|
|
headers: { Host: url.host },
|
|
};
|
|
if (body) {
|
|
opts.headers['Content-Type'] = 'application/x-www-form-urlencoded';
|
|
opts.headers['Content-Length'] = Buffer.byteLength(body);
|
|
}
|
|
const req = http.request(opts, (res) => {
|
|
res.resume();
|
|
res.on('end', () => { console.log('status=' + res.statusCode); process.exit(0); });
|
|
});
|
|
req.on('error', (e) => { console.log('error=' + (e.code||'') + ' ' + e.message); process.exit(0); });
|
|
req.setTimeout(5000, () => { console.log('timeout'); req.destroy(); });
|
|
if (body) req.write(body);
|
|
req.end();
|
|
"""
|
|
|
|
|
|
def _node_probe(bottle, *, target: str, method: str = "GET", body: str = ""):
    """Run the Node HTTP probe inside *bottle* and return the exec result.

    The generated shell script writes ``_HTTP_PROBE`` to ``/tmp/probe.js``
    through a quoted heredoc (so the shell does not expand anything inside
    the JavaScript), then runs it under ``node`` with the request described
    by the ``PROBE_TARGET``, ``PROBE_METHOD`` and ``PROBE_BODY`` variables.

    Args:
        bottle: object exposing ``exec(script)``; the script is shell text.
        target: absolute ``http://`` URL the probe should request.
        method: HTTP method for the request.
        body: request body; an empty string sends no body.

    Returns:
        Whatever ``bottle.exec`` returns for the script.
    """
    # Function-scope import keeps this block independent of the file's
    # top-level import list.
    import shlex

    # shlex.quote produces real POSIX shell quoting. Python's repr() is not
    # a shell quoter: a value containing a single quote is rendered in double
    # quotes (allowing $ and backtick expansion), and escapes such as \n are
    # emitted literally.
    env_assignments = " ".join(
        f"{name}={shlex.quote(value)}"
        for name, value in (
            ("PROBE_TARGET", target),
            ("PROBE_METHOD", method),
            ("PROBE_BODY", body),
        )
    )
    script = (
        "set -e\n"
        "cat > /tmp/probe.js <<'PROBE_EOF'\n"
        f"{_HTTP_PROBE}\n"
        "PROBE_EOF\n"
        f"{env_assignments} node /tmp/probe.js\n"
    )
    return bottle.exec(script)
|
|
|
|
|
|
def scenario_block_host(bottle) -> bool:
    """Scenario 2: request a host outside the allowlist and expect a block.

    Sends a plain GET for ``http://BLOCK_HOST/`` through the probe and prints
    the probe's output.

    Returns:
        True when the probe reported an outcome (``status=``, ``error=`` or
        ``timeout``) and that outcome is not ``status=200``; False otherwise.
    """
    banner(2, 4, f"Non-allowlisted host → GET http://{BLOCK_HOST}/")
    cmd(f"node probe.js # GET http://{BLOCK_HOST}/")
    result = _node_probe(bottle, target=f"http://{BLOCK_HOST}/")
    stdout = result.stdout
    print(stdout.rstrip())

    # The probe prints one of these markers whenever it actually runs. If none
    # is present the probe never ran (node missing, HTTPS_PROXY unset, ...),
    # and that must not be reported as a successful block.
    probe_reported = any(
        marker in stdout for marker in ("status=", "error=", "timeout")
    )
    ok = probe_reported and "status=200" not in stdout
    verdict(ok, "blocked at the host allowlist — DLP never had to run")
    return ok
|
|
|
|
|
|
def scenario_block_dlp(bottle) -> bool:
    """Scenario 3: POST a credential-shaped token to an allowlisted host.

    The probe sends ``token=<FAKE_TOKEN>`` to ``/dlp-probe`` on
    ``DLP_TARGET_HOST`` and its output is printed.

    Returns:
        True when the probe output contains ``status=403``; False otherwise.
    """
    url = f"http://{DLP_TARGET_HOST}/dlp-probe"
    banner(3, 4, f"Allowlisted host + secret in body → POST {url}")
    cmd(f"node probe.js # POST token=ghp_… to {url}")

    payload = f"token={FAKE_TOKEN}"
    result = _node_probe(bottle, target=url, method="POST", body=payload)
    print(result.stdout.rstrip())

    ok = "status=403" in result.stdout
    message = (
        "blocked by pipelock DLP — host was allowed, "
        "body matched a credential pattern"
    )
    verdict(ok, message)
    return ok
|
|
|
|
|
|
def scenario_block_git_push(bottle) -> bool:
    """Scenario 4: push a commit containing a fake key and expect rejection.

    Inside *bottle*, creates a throwaway repo under ``/tmp/repo``, commits
    ``leak.txt`` containing ``FAKE_AWS_KEY``, and tries to push it, retrying
    up to 15 times one second apart. The last eight lines of the combined
    stdout/stderr are printed.

    Returns:
        True when the script exited non-zero and the output mentions
        ``gitleaks rejected`` or ``leaks found``; False otherwise.
    """
    banner(4, 4, "git push of a file containing AKIA… → git-gate")
    cmd("echo AKIA… > leak.txt && git commit -m leak && git push")

    # Step 1: build a one-commit repository holding the fake key.
    commit_steps = (
        "set -e\n"
        "cd /tmp\n"
        "rm -rf repo && git init -q -b main repo && cd repo\n"
        "git config user.email demo@example.com\n"
        "git config user.name demo\n"
        f"echo '{FAKE_AWS_KEY}' > leak.txt\n"
        "git add leak.txt\n"
        "git commit -q -m 'oops: hardcoded key'\n"
    )
    # Step 2: push. `~/.gitconfig`'s insteadOf rewrites the URL to
    # git://<gate>/foo.git. Retry briefly because git-daemon takes a moment
    # to bind after the gate container starts.
    push_loop = (
        "for i in $(seq 1 15); do\n"
        " out=$(git push ssh://git@upstream.invalid/path.git main 2>&1) && echo \"$out\" && exit 0\n"
        " case \"$out\" in *'gitleaks'*|*'leaks found'*|*'rejected'*) echo \"$out\"; exit 1;; esac\n"
        " sleep 1\n"
        "done\n"
        "echo TIMEOUT_WAITING_FOR_GATE; exit 2\n"
    )

    result = bottle.exec(commit_steps + push_loop)
    combined = (result.stdout + result.stderr).rstrip()
    # Only show the gitleaks-relevant tail; raw git stderr is noisy.
    print("\n".join(combined.splitlines()[-8:]))

    leak_reported = any(
        marker in combined for marker in ("gitleaks rejected", "leaks found")
    )
    ok = result.returncode != 0 and leak_reported
    verdict(
        ok,
        "rejected by git-gate pre-receive hook — upstream never saw the ref",
    )
    return ok
|
|
|
|
|
|
def main() -> int:
    """Run the four-scenario demo in one bottle and report the outcome.

    A temporary staging directory is created for the run and removed again
    when the run finishes, whether it succeeded or raised.

    Returns:
        0 when every scenario passed, 1 otherwise; intended to be used as
        the process exit status.
    """
    # Function-scope import: only needed for the cleanup below.
    import shutil

    stage_dir = Path(tempfile.mkdtemp(prefix="cb-demo-stage."))
    try:
        return _run_demo(stage_dir)
    finally:
        # mkdtemp never cleans up after itself. ignore_errors keeps the
        # cleanup best-effort, so a file the demo cannot delete does not
        # mask the real result.
        shutil.rmtree(stage_dir, ignore_errors=True)


def _run_demo(stage_dir: Path) -> int:
    """Stage the manifest in *stage_dir*, run all scenarios, return the exit code."""
    fake_key = stage_dir / "fake-key"
    fake_key.write_text("not-a-real-key\n")

    manifest = Manifest.from_json_obj({
        "bottles": {
            "demo": {
                "env": {"FAKE_TOKEN": FAKE_TOKEN},
                "git": [{
                    "Name": "foo",
                    "Upstream": "ssh://git@upstream.invalid/path.git",
                    "IdentityFile": str(fake_key),
                    "KnownHostKey": "ssh-ed25519 AAAAEXAMPLE",
                }],
            },
        },
        "agents": {
            "demo": {"skills": [], "prompt": "", "bottle": "demo"},
        },
    })

    print("\033[1mclaude-bottle demo: pipelock + git-gate, four probes, one bottle\033[0m")
    print(
        "\033[2mbuilding image graph, starting bottle and sidecars…\033[0m"
    )
    sys.stdout.flush()

    backend = get_bottle_backend()
    spec = BottleSpec(
        manifest=manifest,
        agent_name="demo",
        copy_cwd=False,
        user_cwd=str(stage_dir),
        forward_oauth_token=False,
    )
    plan = backend.prepare(spec, stage_dir=stage_dir)

    # All scenarios share one bottle; the pauses only pace the output.
    results: list[bool] = []
    with backend.launch(plan) as bottle:
        pause(0.6)
        results.append(scenario_allow(bottle))
        pause(2.0)
        results.append(scenario_block_host(bottle))
        pause(2.0)
        results.append(scenario_block_dlp(bottle))
        pause(2.0)
        results.append(scenario_block_git_push(bottle))
        pause(1.0)

    bar = "━" * 60
    passed = sum(results)
    all_passed = passed == len(results)
    color = "32" if all_passed else "31"
    print(f"\n\033[1;36m{bar}\033[0m")
    print(f"\033[1;{color}m{passed}/{len(results)} scenarios passed\033[0m")
    print(f"\033[1;36m{bar}\033[0m")
    return 0 if all_passed else 1
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|