feat(pipelock): enforce DLP body-scan hits by default
Adds bottle.egress.dlp_action ("block" | "warn", default block) and
wires it into pipelock as request_body_scanning.action. Pipelock's
own default is "warn", which previously meant claude-bottle detected
credential patterns in outbound bodies but forwarded the request
anyway.
The matching integration test posts a manifest env var shaped like
a GitHub PAT to api.anthropic.com via plain HTTP forward proxy so
pipelock can see the body. Pipelock answers 403 from its body-scan
layer instead of forwarding to the upstream.
Behavior change: bottles without an explicit egress.dlp_action now
block on body-scan hits. Set egress.dlp_action: "warn" to restore
the prior detect-only behavior.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
+38
-14
@@ -79,31 +79,55 @@ class SshEntry:
|
||||
)
|
||||
|
||||
|
||||
DLP_ACTIONS = ("block", "warn")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class BottleEgress:
|
||||
allowlist: tuple[str, ...] = ()
|
||||
# Action pipelock takes when its DLP layer matches a credential
|
||||
# pattern in a request body. "block" → 403 from the proxy, the
|
||||
# request never leaves the egress network. "warn" → forward the
|
||||
# request and emit a log line. Default is "block": detect-only
|
||||
# would let real secrets escape under the agent's compromised
|
||||
# tooling, which is the threat model claude-bottle was built for.
|
||||
dlp_action: str = "block"
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, bottle_name: str, raw: object) -> "BottleEgress":
|
||||
d = _as_json_object(raw, f"bottle '{bottle_name}' egress")
|
||||
allow = d.get("allowlist")
|
||||
if allow is None:
|
||||
return cls()
|
||||
if not isinstance(allow, list):
|
||||
die(
|
||||
f"bottle '{bottle_name}' egress.allowlist must be an array "
|
||||
f"(was {type(allow).__name__})"
|
||||
)
|
||||
items: list[str] = []
|
||||
allow_list = cast(list[object], allow)
|
||||
for i, host in enumerate(allow_list):
|
||||
if not isinstance(host, str):
|
||||
if allow is not None:
|
||||
if not isinstance(allow, list):
|
||||
die(
|
||||
f"bottle '{bottle_name}' egress.allowlist[{i}] must be a string "
|
||||
f"(was {type(host).__name__})"
|
||||
f"bottle '{bottle_name}' egress.allowlist must be an array "
|
||||
f"(was {type(allow).__name__})"
|
||||
)
|
||||
items.append(host)
|
||||
return cls(allowlist=tuple(items))
|
||||
allow_list = cast(list[object], allow)
|
||||
for i, host in enumerate(allow_list):
|
||||
if not isinstance(host, str):
|
||||
die(
|
||||
f"bottle '{bottle_name}' egress.allowlist[{i}] must be a string "
|
||||
f"(was {type(host).__name__})"
|
||||
)
|
||||
items.append(host)
|
||||
dlp_action_raw = d.get("dlp_action")
|
||||
if dlp_action_raw is None:
|
||||
dlp_action = "block"
|
||||
elif isinstance(dlp_action_raw, str):
|
||||
if dlp_action_raw not in DLP_ACTIONS:
|
||||
die(
|
||||
f"bottle '{bottle_name}' egress.dlp_action must be one of "
|
||||
f"{', '.join(DLP_ACTIONS)} (was {dlp_action_raw!r})"
|
||||
)
|
||||
dlp_action = dlp_action_raw
|
||||
else:
|
||||
die(
|
||||
f"bottle '{bottle_name}' egress.dlp_action must be a string "
|
||||
f"(was {type(dlp_action_raw).__name__})"
|
||||
)
|
||||
return cls(allowlist=tuple(items), dlp_action=dlp_action)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
||||
@@ -110,6 +110,12 @@ def pipelock_build_config(bottle: Bottle) -> dict[str, object]:
|
||||
if ip_cidrs:
|
||||
cfg["ssrf"] = {"ip_allowlist": ip_cidrs}
|
||||
cfg["dlp"] = {"include_defaults": True, "scan_env": True}
|
||||
# Body-scan enforcement is a separate pipelock section (each DLP
|
||||
# "surface" — body, MCP, response — has its own action). Pipelock's
|
||||
# built-in default for request_body_scanning is "warn" (forward
|
||||
# with a log line); claude-bottle's default is "block" so a hit
|
||||
# actually stops the request from leaving the egress network.
|
||||
cfg["request_body_scanning"] = {"action": bottle.egress.dlp_action}
|
||||
return cfg
|
||||
|
||||
|
||||
@@ -149,6 +155,10 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
|
||||
dlp = cast(dict[str, object], cfg["dlp"])
|
||||
lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}")
|
||||
lines.append(f" scan_env: {_bool(dlp['scan_env'])}")
|
||||
lines.append("")
|
||||
lines.append("request_body_scanning:")
|
||||
rbs = cast(dict[str, object], cfg["request_body_scanning"])
|
||||
lines.append(f' action: "{rbs["action"]}"')
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user