From 427ef96e3f33860e1ad0cb34028901553426534b Mon Sep 17 00:00:00 2001 From: didericis Date: Tue, 12 May 2026 11:39:25 -0400 Subject: [PATCH] feat(pipelock): enforce DLP body-scan hits by default Adds bottle.egress.dlp_action ("block" | "warn", default block) and wires it into pipelock as request_body_scanning.action. Pipelock's own default is "warn", which previously meant claude-bottle detected credential patterns in outbound bodies but forwarded the request anyway. The matching integration test posts a manifest env var shaped like a GitHub PAT to api.anthropic.com via plain HTTP forward proxy so pipelock can see the body. Pipelock answers 403 from its body-scan layer instead of forwarding to the upstream. Behavior change: bottles without an explicit egress.dlp_action now block on body-scan hits. Set egress.dlp_action: "warn" to restore the prior detect-only behavior. Co-Authored-By: Claude Opus 4.7 --- claude_bottle/manifest.py | 52 +++++-- claude_bottle/pipelock.py | 10 ++ .../test_pipelock_blocks_secret_post.py | 133 ++++++++++++++++++ tests/unit/test_pipelock_yaml.py | 5 + 4 files changed, 186 insertions(+), 14 deletions(-) create mode 100644 tests/integration/test_pipelock_blocks_secret_post.py diff --git a/claude_bottle/manifest.py b/claude_bottle/manifest.py index d86bd4f..eed15d3 100644 --- a/claude_bottle/manifest.py +++ b/claude_bottle/manifest.py @@ -79,31 +79,55 @@ class SshEntry: ) +DLP_ACTIONS = ("block", "warn") + + @dataclass(frozen=True) class BottleEgress: allowlist: tuple[str, ...] = () + # Action pipelock takes when its DLP layer matches a credential + # pattern in a request body. "block" → 403 from the proxy, the + # request never leaves the egress network. "warn" → forward the + # request and emit a log line. Default is "block": detect-only + # would let real secrets escape under the agent's compromised + # tooling, which is the threat model claude-bottle was built for. 
+ dlp_action: str = "block" @classmethod def from_dict(cls, bottle_name: str, raw: object) -> "BottleEgress": d = _as_json_object(raw, f"bottle '{bottle_name}' egress") allow = d.get("allowlist") - if allow is None: - return cls() - if not isinstance(allow, list): - die( - f"bottle '{bottle_name}' egress.allowlist must be an array " - f"(was {type(allow).__name__})" - ) items: list[str] = [] - allow_list = cast(list[object], allow) - for i, host in enumerate(allow_list): - if not isinstance(host, str): + if allow is not None: + if not isinstance(allow, list): die( - f"bottle '{bottle_name}' egress.allowlist[{i}] must be a string " - f"(was {type(host).__name__})" + f"bottle '{bottle_name}' egress.allowlist must be an array " + f"(was {type(allow).__name__})" ) - items.append(host) - return cls(allowlist=tuple(items)) + allow_list = cast(list[object], allow) + for i, host in enumerate(allow_list): + if not isinstance(host, str): + die( + f"bottle '{bottle_name}' egress.allowlist[{i}] must be a string " + f"(was {type(host).__name__})" + ) + items.append(host) + dlp_action_raw = d.get("dlp_action") + if dlp_action_raw is None: + dlp_action = "block" + elif isinstance(dlp_action_raw, str): + if dlp_action_raw not in DLP_ACTIONS: + die( + f"bottle '{bottle_name}' egress.dlp_action must be one of " + f"{', '.join(DLP_ACTIONS)} (was {dlp_action_raw!r})" + ) + dlp_action = dlp_action_raw + else: + die( + f"bottle '{bottle_name}' egress.dlp_action must be a string " + f"(was {type(dlp_action_raw).__name__})" + ) + return cls(allowlist=tuple(items), dlp_action=dlp_action) @dataclass(frozen=True) diff --git a/claude_bottle/pipelock.py b/claude_bottle/pipelock.py index 2a28041..2c0aa7f 100644 --- a/claude_bottle/pipelock.py +++ b/claude_bottle/pipelock.py @@ -110,6 +110,12 @@ def pipelock_build_config(bottle: Bottle) -> dict[str, object]: if ip_cidrs: cfg["ssrf"] = {"ip_allowlist": ip_cidrs} cfg["dlp"] = {"include_defaults": True, "scan_env": True} + # Body-scan enforcement 
is a separate pipelock section (each DLP + # "surface" — body, MCP, response — has its own action). Pipelock's + # built-in default for request_body_scanning is "warn" (forward + # with a log line); claude-bottle's default is "block" so a hit + # actually stops the request from leaving the egress network. + cfg["request_body_scanning"] = {"action": bottle.egress.dlp_action} return cfg @@ -149,6 +155,10 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str: dlp = cast(dict[str, object], cfg["dlp"]) lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}") lines.append(f" scan_env: {_bool(dlp['scan_env'])}") + lines.append("") + lines.append("request_body_scanning:") + rbs = cast(dict[str, object], cfg["request_body_scanning"]) + lines.append(f' action: "{rbs["action"]}"') return "\n".join(lines) + "\n" diff --git a/tests/integration/test_pipelock_blocks_secret_post.py b/tests/integration/test_pipelock_blocks_secret_post.py new file mode 100644 index 0000000..6d6fb72 --- /dev/null +++ b/tests/integration/test_pipelock_blocks_secret_post.py @@ -0,0 +1,133 @@ +"""Integration: pipelock blocks a POST whose body carries a +recognized credential pattern, even when the host is on the +allowlist. + +End-to-end companion to the block / allow node tests. The manifest +carries a literal env var whose value matches pipelock's DLP rules. +A Node script POSTs that value to an allowlisted host via plain +HTTP forward proxy (absolute-URI form) so pipelock can scan the +body — routing the same request over CONNECT would tunnel TLS +opaquely and the DLP layer would have nothing to see. The 403 +return from pipelock isolates the body-scan layer as the active +control, distinct from the host-allowlist decision the other two +tests pin down. 
+""" + +from __future__ import annotations + +import os +import shutil +import tempfile +import unittest +from pathlib import Path + +from claude_bottle.backend import BottleSpec, get_bottle_backend +from claude_bottle.manifest import Manifest +from tests._docker import skip_unless_docker + + +# Synthetic value shaped like a GitHub Personal Access Token +# (`ghp_` + 36 alnum chars; the literal below carries 38 after the prefix). Not a real token; the only relevant +# property is that pipelock's default DLP rules recognize the +# shape. Kept obviously dummy so a stray grep can't mistake it +# for a real credential. +_FAKE_TOKEN = "ghp_aB3cD4eF5gH6iJ7kL8mN9oP0qR1sT2uV3wX4yZ" + + +# Output contract (parsed by the test): +# - "status=<code>" proxy answered with an HTTP response +# - "error=<code> <message>" transport-level failure +# - "timeout" request hung +_PROBE_JS = r""" +const http = require('http'); +const proxy = new URL(process.env.HTTPS_PROXY); +const body = 'token=' + process.env.FAKE_TOKEN; +const req = http.request({ + host: proxy.hostname, + port: proxy.port, + method: 'POST', + // Absolute-URI form: pipelock acts as a plain HTTP forward proxy + // and the body is visible to its DLP scanner. CONNECT would + // tunnel TLS bytes that pipelock can't see into. 
+ path: 'http://api.anthropic.com/dlp-probe', + headers: { + Host: 'api.anthropic.com', + 'Content-Type': 'application/x-www-form-urlencoded', + 'Content-Length': Buffer.byteLength(body), + }, +}, (res) => { + res.resume(); + res.on('end', () => { + console.log('status=' + res.statusCode); + process.exit(0); + }); +}); +req.on('error', (e) => { + console.log('error=' + (e.code || '') + ' ' + e.message); + process.exit(0); +}); +req.setTimeout(5000, () => { + console.log('timeout'); + req.destroy(); +}); +req.write(body); +req.end(); +""" + + +@skip_unless_docker() +class TestPipelockBlocksSecretPost(unittest.TestCase): + @unittest.skipIf( + os.environ.get("GITEA_ACTIONS") == "true", + "skipped under act_runner: docker socket mount topology breaks " + "in-process visibility of networks created on the host daemon", + ) + def test_post_with_credential_body_is_blocked(self): + manifest = Manifest.from_json_obj({ + "bottles": { + "dev": {"env": {"FAKE_TOKEN": _FAKE_TOKEN}}, + }, + "agents": { + "demo": {"skills": [], "prompt": "", "bottle": "dev"}, + }, + }) + backend = get_bottle_backend() + stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage.")) + try: + spec = BottleSpec( + manifest=manifest, + agent_name="demo", + copy_cwd=False, + user_cwd=str(stage_dir), + forward_oauth_token=False, + ) + plan = backend.prepare(spec, stage_dir=stage_dir) + with backend.launch(plan) as bottle: + script = ( + "set -e\n" + "cat > /tmp/probe.js <<'PROBE_EOF'\n" + f"{_PROBE_JS}\n" + "PROBE_EOF\n" + "node /tmp/probe.js\n" + ) + result = bottle.exec(script) + finally: + shutil.rmtree(stage_dir, ignore_errors=True) + + self.assertEqual( + 0, result.returncode, + f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}", + ) + # api.anthropic.com is on the baked-in allowlist, so the + # host-allowlist layer would have let this through. 
Pipelock's + # DLP body-scan layer must catch the credential pattern and + # answer 403; any other outcome (another status, error=, timeout) means the + # body-scan layer did not block the request. + self.assertIn( + "status=403", result.stdout, + f"pipelock DLP should have blocked the credential POST; got: {result.stdout!r}", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_pipelock_yaml.py b/tests/unit/test_pipelock_yaml.py index 1818e05..53d3ff7 100644 --- a/tests/unit/test_pipelock_yaml.py +++ b/tests/unit/test_pipelock_yaml.py @@ -27,6 +27,10 @@ class TestBuildConfig(unittest.TestCase): self.assertEqual( {"include_defaults": True, "scan_env": True}, cfg["dlp"] ) + # Default body-scan action is "block" — see BottleEgress.dlp_action. + self.assertEqual( + {"action": "block"}, cfg["request_body_scanning"] + ) # Baked defaults always present. self.assertIn("api.anthropic.com", cast(list[str], cfg["api_allowlist"])) self.assertIn("raw.githubusercontent.com", cast(list[str], cfg["api_allowlist"])) @@ -66,6 +70,7 @@ class TestRenderAndWrite(unittest.TestCase): "trusted_domains:", "ssrf:", "dlp:", + "request_body_scanning:", ): self.assertIn(required, text)