feat(mitmproxy): vendor the addon and Docker sidecar lifecycle
First step of PRD 0005. Three new files for the mitmproxy-in-front-of-pipelock topology — wiring into the bottle launch comes in the next commit. - claude_bottle/mitmproxy/__init__.py: abstract MitmproxyProxy base + MitmproxyProxyPlan. Mirrors the PipelockProxy shape (prepare / start / stop) and adds extract_ca_cert for the CA cert hand-off into the agent. - claude_bottle/mitmproxy/addon.py: the vendored Python addon mitmproxy loads inside the sidecar. Forwards each decrypted request to pipelock as a plain HTTP forward-proxy call, inspects the response, and short-circuits the flow with 403 on a pipelock block (status=403 + body starts with `blocked: `, pinned empirically against pipelock 2.3.0 in the impl spike). Self-contained — no claude_bottle imports — so it loads in a sidecar that doesn't have claude_bottle on its path. - claude_bottle/backend/docker/mitmproxy.py: DockerMitmproxyProxy with create / cp / network connect / start lifecycle. Pinned to mitmproxy/mitmproxy@sha256:00b77b5d… (multi-arch manifest for v12.2.3). - tests/unit/test_mitmproxy_verdict.py: pins the verdict fingerprint so a pipelock-side body shape change breaks loudly. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,178 @@
|
|||||||
|
"""DockerMitmproxyProxy — the Docker-specific lifecycle for the
|
||||||
|
mitmproxy sidecar. Inherits the addon-bundling from MitmproxyProxy.
|
||||||
|
|
||||||
|
The sidecar runs `mitmdump -s /addon/addon.py`, listens on
|
||||||
|
MITMPROXY_PORT inside the per-bottle internal network, and generates
|
||||||
|
its own ephemeral CA on first launch (extracted by provision_ca,
|
||||||
|
installed into the agent's trust store)."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from ...log import die, info, warn
|
||||||
|
from ...mitmproxy import MitmproxyProxy, MitmproxyProxyPlan
|
||||||
|
|
||||||
|
|
||||||
|
# mitmproxy/mitmproxy:12.2.3 (mitmproxy v12 release line). The digest
|
||||||
|
# is the multi-arch image index — pulls resolve to the right per-arch
|
||||||
|
# child digest. Bumped deliberately; see PRD 0005.
|
||||||
|
MITMPROXY_IMAGE = os.environ.get(
|
||||||
|
"CLAUDE_BOTTLE_MITMPROXY_IMAGE",
|
||||||
|
"mitmproxy/mitmproxy@sha256:00b77b5d8804c8ad18cb6caefbf9d5849e895e8986c5ce011f4ae30f4385962f",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Listening port for mitmproxy's forward proxy (agent-facing).
|
||||||
|
MITMPROXY_PORT = os.environ.get("CLAUDE_BOTTLE_MITMPROXY_PORT", "8080")
|
||||||
|
|
||||||
|
# Path inside the sidecar where the addon is dropped by docker cp.
|
||||||
|
MITMPROXY_ADDON_PATH = "/addon/addon.py"
|
||||||
|
|
||||||
|
# Path inside the sidecar where mitmproxy generates its CA.
|
||||||
|
_CA_PATH_IN_SIDECAR = "/home/mitmproxy/.mitmproxy/mitmproxy-ca-cert.pem"
|
||||||
|
|
||||||
|
|
||||||
|
def mitmproxy_container_name(slug: str) -> str:
|
||||||
|
return f"claude-bottle-mitm-{slug}"
|
||||||
|
|
||||||
|
|
||||||
|
def mitmproxy_proxy_url(slug: str) -> str:
|
||||||
|
return f"http://{mitmproxy_container_name(slug)}:{MITMPROXY_PORT}"
|
||||||
|
|
||||||
|
|
||||||
|
class DockerMitmproxyProxy(MitmproxyProxy):
|
||||||
|
"""Brings the mitmproxy sidecar up and down via Docker."""
|
||||||
|
|
||||||
|
def start(self, plan: MitmproxyProxyPlan, *, pipelock_url: str) -> str:
|
||||||
|
"""Boot the mitmproxy sidecar:
|
||||||
|
1. `docker create` on the internal network with mitmdump
|
||||||
|
argv: `--listen-port <port> -s <addon path>` plus the
|
||||||
|
pipelock URL injected as an env var.
|
||||||
|
2. `docker cp` the vendored addon to the sidecar.
|
||||||
|
3. Attach to the per-agent egress network so mitmproxy
|
||||||
|
can reach real upstreams.
|
||||||
|
4. `docker start`.
|
||||||
|
Returns the container name (the proxy_target passed to .stop
|
||||||
|
and .extract_ca_cert)."""
|
||||||
|
name = mitmproxy_container_name(plan.slug)
|
||||||
|
if not plan.addon_src.is_file():
|
||||||
|
die(f"mitmproxy addon not found at {plan.addon_src}")
|
||||||
|
|
||||||
|
info(f"starting mitmproxy sidecar {name} on network {plan.internal_network}")
|
||||||
|
|
||||||
|
create_args = [
|
||||||
|
"docker", "create",
|
||||||
|
"--name", name,
|
||||||
|
"--network", plan.internal_network,
|
||||||
|
"-e", f"CLAUDE_BOTTLE_PIPELOCK_URL={pipelock_url}",
|
||||||
|
MITMPROXY_IMAGE,
|
||||||
|
"mitmdump",
|
||||||
|
"--listen-port", MITMPROXY_PORT,
|
||||||
|
"-s", MITMPROXY_ADDON_PATH,
|
||||||
|
]
|
||||||
|
if subprocess.run(
|
||||||
|
create_args,
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
check=False,
|
||||||
|
).returncode != 0:
|
||||||
|
die(f"failed to create mitmproxy sidecar {name}")
|
||||||
|
|
||||||
|
cp_result = subprocess.run(
|
||||||
|
["docker", "cp", str(plan.addon_src), f"{name}:{MITMPROXY_ADDON_PATH}"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
if cp_result.returncode != 0:
|
||||||
|
subprocess.run(
|
||||||
|
["docker", "rm", "-f", name],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
die(f"failed to copy mitmproxy addon into {name}: {cp_result.stderr.strip()}")
|
||||||
|
|
||||||
|
if subprocess.run(
|
||||||
|
["docker", "network", "connect", plan.egress_network, name],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
check=False,
|
||||||
|
).returncode != 0:
|
||||||
|
subprocess.run(
|
||||||
|
["docker", "rm", "-f", name],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
die(f"failed to attach mitmproxy sidecar {name} to egress "
|
||||||
|
f"network {plan.egress_network}")
|
||||||
|
|
||||||
|
if subprocess.run(
|
||||||
|
["docker", "start", name],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
check=False,
|
||||||
|
).returncode != 0:
|
||||||
|
subprocess.run(
|
||||||
|
["docker", "rm", "-f", name],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
die(f"failed to start mitmproxy sidecar {name}")
|
||||||
|
|
||||||
|
return name
|
||||||
|
|
||||||
|
def stop(self, proxy_target: str) -> None:
|
||||||
|
"""Idempotent: missing container is success. Mirrors
|
||||||
|
DockerPipelockProxy.stop."""
|
||||||
|
if subprocess.run(
|
||||||
|
["docker", "inspect", proxy_target],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
check=False,
|
||||||
|
).returncode == 0:
|
||||||
|
if subprocess.run(
|
||||||
|
["docker", "rm", "-f", proxy_target],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
check=False,
|
||||||
|
).returncode != 0:
|
||||||
|
warn(
|
||||||
|
f"failed to remove mitmproxy sidecar {proxy_target}; "
|
||||||
|
f"clean up with 'docker rm -f {proxy_target}'"
|
||||||
|
)
|
||||||
|
|
||||||
|
def extract_ca_cert(self, proxy_target: str, dest_path: Path) -> None:
|
||||||
|
"""Poll the running sidecar for the CA cert (mitmproxy
|
||||||
|
generates it on first launch, typically <1s after start),
|
||||||
|
then `docker cp` the public half to `dest_path`. The private
|
||||||
|
key never leaves the container."""
|
||||||
|
deadline = time.monotonic() + 15
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
check = subprocess.run(
|
||||||
|
["docker", "exec", proxy_target, "test", "-f", _CA_PATH_IN_SIDECAR],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
if check.returncode == 0:
|
||||||
|
break
|
||||||
|
time.sleep(0.5)
|
||||||
|
else:
|
||||||
|
die(f"mitmproxy CA cert did not appear at {_CA_PATH_IN_SIDECAR} "
|
||||||
|
f"after 15s — sidecar {proxy_target} may have failed to start")
|
||||||
|
|
||||||
|
cp_result = subprocess.run(
|
||||||
|
["docker", "cp", f"{proxy_target}:{_CA_PATH_IN_SIDECAR}", str(dest_path)],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
if cp_result.returncode != 0:
|
||||||
|
die(f"failed to extract mitmproxy CA cert from {proxy_target}: "
|
||||||
|
f"{cp_result.stderr.strip()}")
|
||||||
@@ -0,0 +1,81 @@
|
|||||||
|
"""mitmproxy TLS-interception sidecar for the per-bottle egress
|
||||||
|
topology (PRD 0005).
|
||||||
|
|
||||||
|
Sits in front of pipelock on the bottle's egress path so pipelock's
|
||||||
|
body / header / URL DLP scanners see plaintext for HTTPS targets.
|
||||||
|
The sidecar runs in mitmproxy's `regular` mode and loads the
|
||||||
|
vendored addon at `addon.py`; the addon forwards each decrypted
|
||||||
|
request to pipelock as a plain HTTP forward-proxy call and gates
|
||||||
|
the mitmproxy flow on pipelock's verdict.
|
||||||
|
|
||||||
|
This module is platform-agnostic: it owns the abstract proxy
|
||||||
|
lifecycle (prepare / start / stop / extract_ca_cert). The
|
||||||
|
Docker-specific lifecycle lives in
|
||||||
|
`claude_bottle/backend/docker/mitmproxy.py`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class MitmproxyProxyPlan:
|
||||||
|
"""Output of MitmproxyProxy.prepare; consumed by .start when the
|
||||||
|
sidecar needs to be brought up.
|
||||||
|
|
||||||
|
`addon_src` is the host-side path to the vendored addon.py,
|
||||||
|
resolved at prepare time. `slug` is the per-agent identifier
|
||||||
|
used as the suffix in every per-bottle resource name. The
|
||||||
|
network fields default to empty and are populated by the
|
||||||
|
backend's launch step (via dataclasses.replace) once those
|
||||||
|
networks have actually been created — same pattern as
|
||||||
|
PipelockProxyPlan."""
|
||||||
|
|
||||||
|
addon_src: Path
|
||||||
|
slug: str
|
||||||
|
internal_network: str = ""
|
||||||
|
egress_network: str = ""
|
||||||
|
|
||||||
|
|
||||||
|
class MitmproxyProxy(ABC):
|
||||||
|
"""The mitmproxy TLS-interception sidecar. The proxy-config + addon
|
||||||
|
bundling are platform-agnostic; the sidecar's start/stop lifecycle
|
||||||
|
and the CA extraction step are backend-specific and live on
|
||||||
|
concrete subclasses."""
|
||||||
|
|
||||||
|
def prepare(self, slug: str) -> MitmproxyProxyPlan:
|
||||||
|
"""Locate the vendored addon source and return the start
|
||||||
|
plan. The addon is checked into the project and identical
|
||||||
|
across bottles; per-bottle wiring (pipelock URL) is injected
|
||||||
|
via env vars at start time, not via a generated config."""
|
||||||
|
addon_src = Path(__file__).resolve().parent / "addon.py"
|
||||||
|
if not addon_src.is_file():
|
||||||
|
raise FileNotFoundError(
|
||||||
|
f"mitmproxy addon not found at {addon_src}; the "
|
||||||
|
f"package was installed incompletely"
|
||||||
|
)
|
||||||
|
return MitmproxyProxyPlan(addon_src=addon_src, slug=slug)
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def start(self, plan: MitmproxyProxyPlan, *, pipelock_url: str) -> str:
|
||||||
|
"""Bring up the mitmproxy sidecar according to `plan`.
|
||||||
|
`pipelock_url` is injected into the sidecar's env (as
|
||||||
|
CLAUDE_BOTTLE_PIPELOCK_URL) so the addon knows where to
|
||||||
|
scan. Returns the proxy_target string identifying the
|
||||||
|
running sidecar — the same value to pass to `.stop` and
|
||||||
|
`.extract_ca_cert`."""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def stop(self, proxy_target: str) -> None:
|
||||||
|
"""Tear down the sidecar identified by `proxy_target`.
|
||||||
|
Idempotent: a missing target is success."""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def extract_ca_cert(self, proxy_target: str, dest_path: Path) -> None:
|
||||||
|
"""Copy the public CA cert from the running sidecar to
|
||||||
|
`dest_path` on the host. Polls the sidecar for the cert
|
||||||
|
file to appear (mitmproxy generates the CA on first launch).
|
||||||
|
The private key never leaves the sidecar."""
|
||||||
@@ -0,0 +1,169 @@
|
|||||||
|
"""mitmproxy addon: forward each decrypted request to pipelock for
|
||||||
|
scanning, then either short-circuit with pipelock's 403 (block) or
|
||||||
|
let mitmproxy proceed to the real upstream (allow).
|
||||||
|
|
||||||
|
Loaded inside the mitmproxy sidecar container via `mitmdump -s ...`.
|
||||||
|
Must be self-contained — the sidecar image doesn't have claude_bottle
|
||||||
|
on its import path. Imports are limited to the Python stdlib plus
|
||||||
|
mitmproxy itself (which is the host).
|
||||||
|
|
||||||
|
Pipelock's URL is read from CLAUDE_BOTTLE_PIPELOCK_URL at startup
|
||||||
|
(injected by DockerMitmproxyProxy.start).
|
||||||
|
|
||||||
|
The verdict function `is_pipelock_block` is exported as a pure
|
||||||
|
function so unit tests can exercise it without importing mitmproxy.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import urllib.error
|
||||||
|
import urllib.request
|
||||||
|
|
||||||
|
|
||||||
|
PIPELOCK_URL_ENV = "CLAUDE_BOTTLE_PIPELOCK_URL"
|
||||||
|
PIPELOCK_TIMEOUT_SEC = 5
|
||||||
|
|
||||||
|
# Hop-by-hop headers per RFC 7230 §6.1; should not be forwarded
|
||||||
|
# across a proxy. Lower-cased for case-insensitive comparison.
|
||||||
|
_HOP_BY_HOP = frozenset({
|
||||||
|
"connection",
|
||||||
|
"keep-alive",
|
||||||
|
"proxy-authenticate",
|
||||||
|
"proxy-authorization",
|
||||||
|
"te",
|
||||||
|
"trailers",
|
||||||
|
"transfer-encoding",
|
||||||
|
"upgrade",
|
||||||
|
})
|
||||||
|
|
||||||
|
log = logging.getLogger("pipelock-bridge")
|
||||||
|
|
||||||
|
|
||||||
|
def is_pipelock_block(status: int, body_bytes: bytes) -> bool:
|
||||||
|
"""Return True iff pipelock's response indicates the proxy itself
|
||||||
|
blocked (DLP / allowlist), distinguishing from a relayed upstream
|
||||||
|
4xx that pipelock happened to forward back.
|
||||||
|
|
||||||
|
Pipelock's block bodies are plain text starting with
|
||||||
|
`blocked: <reason>` and the status is always 403. A relayed
|
||||||
|
upstream response has whatever body the upstream sent —
|
||||||
|
extremely unlikely to begin with `blocked: `. Pinned empirically
|
||||||
|
against pipelock v2.3.0 in the impl spike (DLP block:
|
||||||
|
"blocked: request body contains secret: GitHub Token";
|
||||||
|
allowlist block: "blocked: domain not in allowlist: example.com").
|
||||||
|
|
||||||
|
Long-term cleanup: file an upstream feature request for an
|
||||||
|
`X-Pipelock-Verdict: block` response header so we can match on a
|
||||||
|
structured signal instead of pattern-matching the body."""
|
||||||
|
return status == 403 and body_bytes.startswith(b"blocked: ")
|
||||||
|
|
||||||
|
|
||||||
|
def _scan_via_pipelock(
|
||||||
|
pipelock_url: str,
|
||||||
|
method: str,
|
||||||
|
target_url: str,
|
||||||
|
headers: dict[str, str],
|
||||||
|
body: bytes,
|
||||||
|
) -> tuple[int, bytes]:
|
||||||
|
"""Forward the decrypted request to pipelock as a plain HTTP
|
||||||
|
forward-proxy call. Returns (status, body_bytes). Raises on
|
||||||
|
transport-level errors so the caller can fail closed.
|
||||||
|
|
||||||
|
The target URL is rewritten to http:// so pipelock receives an
|
||||||
|
absolute-URI forward-proxy request shape. Pipelock will scan,
|
||||||
|
then may attempt an upstream forward over plain HTTP — that
|
||||||
|
response is read back too, but the addon discards it on allow
|
||||||
|
(mitmproxy makes the real HTTPS request itself)."""
|
||||||
|
rewritten_url = target_url
|
||||||
|
if rewritten_url.startswith("https://"):
|
||||||
|
rewritten_url = "http://" + rewritten_url[len("https://"):]
|
||||||
|
|
||||||
|
forwarded_headers = {
|
||||||
|
k: v for k, v in headers.items()
|
||||||
|
if k.lower() not in _HOP_BY_HOP
|
||||||
|
}
|
||||||
|
|
||||||
|
proxy_handler = urllib.request.ProxyHandler({"http": pipelock_url})
|
||||||
|
opener = urllib.request.build_opener(proxy_handler)
|
||||||
|
req = urllib.request.Request(
|
||||||
|
url=rewritten_url,
|
||||||
|
data=body if body else None,
|
||||||
|
headers=forwarded_headers,
|
||||||
|
method=method,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
resp = opener.open(req, timeout=PIPELOCK_TIMEOUT_SEC)
|
||||||
|
return resp.status, resp.read()
|
||||||
|
except urllib.error.HTTPError as e:
|
||||||
|
return e.code, e.read()
|
||||||
|
|
||||||
|
|
||||||
|
class PipelockBridge:
|
||||||
|
"""mitmproxy addon class. mitmproxy instantiates one of these via
|
||||||
|
the `addons = [...]` module attribute at the bottom of this file."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
# Read once per sidecar lifetime. Empty string is allowed at
|
||||||
|
# construction (so the module can be imported in test
|
||||||
|
# environments) but the request handler fails closed if it's
|
||||||
|
# missing at request time.
|
||||||
|
self._pipelock_url = os.environ.get(PIPELOCK_URL_ENV, "")
|
||||||
|
|
||||||
|
def request(self, flow) -> None:
|
||||||
|
"""mitmproxy callback. Called for each decrypted client
|
||||||
|
request before mitmproxy forwards to the real upstream.
|
||||||
|
Setting flow.response short-circuits the flow with that
|
||||||
|
response; leaving it None lets mitmproxy proceed."""
|
||||||
|
# Late import so this module can be loaded in test
|
||||||
|
# environments without mitmproxy installed (the verdict
|
||||||
|
# function is unit-testable in isolation).
|
||||||
|
from mitmproxy import http
|
||||||
|
|
||||||
|
if not self._pipelock_url:
|
||||||
|
log.error("%s is unset; failing closed", PIPELOCK_URL_ENV)
|
||||||
|
flow.response = http.Response.make(
|
||||||
|
503,
|
||||||
|
b"egress scanner not configured",
|
||||||
|
{"Content-Type": "text/plain",
|
||||||
|
"X-Pipelock-Bridge": "misconfigured"},
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
target_url = flow.request.pretty_url
|
||||||
|
method = flow.request.method
|
||||||
|
headers = {k: v for k, v in flow.request.headers.items()}
|
||||||
|
body = bytes(flow.request.content or b"")
|
||||||
|
|
||||||
|
try:
|
||||||
|
status, response_body = _scan_via_pipelock(
|
||||||
|
self._pipelock_url, method, target_url, headers, body,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
# Fail closed: scanner unreachable means no verdict, so
|
||||||
|
# refuse rather than risk leaking.
|
||||||
|
log.warning("pipelock unreachable; failing closed: %s", e)
|
||||||
|
flow.response = http.Response.make(
|
||||||
|
503,
|
||||||
|
b"egress scanner unreachable",
|
||||||
|
{"Content-Type": "text/plain",
|
||||||
|
"X-Pipelock-Bridge": "error"},
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
if is_pipelock_block(status, response_body):
|
||||||
|
flow.response = http.Response.make(
|
||||||
|
status,
|
||||||
|
response_body,
|
||||||
|
{"Content-Type": "text/plain",
|
||||||
|
"X-Pipelock-Bridge": "block"},
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Allow path: discard pipelock's response (it's the wasted
|
||||||
|
# upstream-forward attempt). Leave flow.response as None;
|
||||||
|
# mitmproxy proceeds to the real upstream on its own.
|
||||||
|
|
||||||
|
|
||||||
|
addons = [PipelockBridge()]
|
||||||
@@ -0,0 +1,62 @@
|
|||||||
|
"""Unit: the addon's verdict function pinning pipelock-block vs.
|
||||||
|
relayed-upstream 4xx.
|
||||||
|
|
||||||
|
The fingerprint shape is the contract the addon depends on; this
|
||||||
|
test should break loudly if pipelock changes its 403-body prefix
|
||||||
|
under a version bump."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from claude_bottle.mitmproxy.addon import is_pipelock_block
|
||||||
|
|
||||||
|
|
||||||
|
class TestIsPipelockBlock(unittest.TestCase):
|
||||||
|
def test_block_dlp_body(self):
|
||||||
|
# Pipelock v2.3.0 DLP block, captured in the impl spike.
|
||||||
|
self.assertTrue(is_pipelock_block(
|
||||||
|
403,
|
||||||
|
b"blocked: request body contains secret: GitHub Token",
|
||||||
|
))
|
||||||
|
|
||||||
|
def test_block_allowlist_body(self):
|
||||||
|
# Pipelock v2.3.0 allowlist block, captured in the impl spike.
|
||||||
|
self.assertTrue(is_pipelock_block(
|
||||||
|
403,
|
||||||
|
b"blocked: domain not in allowlist: example.com",
|
||||||
|
))
|
||||||
|
|
||||||
|
def test_block_header_dlp_body(self):
|
||||||
|
# Header DLP path; same body prefix per the spike.
|
||||||
|
self.assertTrue(is_pipelock_block(
|
||||||
|
403,
|
||||||
|
b"blocked: request header Authorization contains secret",
|
||||||
|
))
|
||||||
|
|
||||||
|
def test_403_without_blocked_prefix_is_not_a_block(self):
|
||||||
|
# A real-upstream 403 relayed by pipelock — body is whatever
|
||||||
|
# the upstream sent, almost certainly not starting with
|
||||||
|
# `blocked: `. Must be treated as allow so the addon hands
|
||||||
|
# the flow back to mitmproxy.
|
||||||
|
self.assertFalse(is_pipelock_block(
|
||||||
|
403,
|
||||||
|
b'{"error":"forbidden","detail":"insufficient permissions"}',
|
||||||
|
))
|
||||||
|
|
||||||
|
def test_non_403_with_blocked_prefix_is_not_a_block(self):
|
||||||
|
# Defensive: if some intermediate ever returns 502/504 with
|
||||||
|
# a body that happens to begin `blocked: `, we should still
|
||||||
|
# not short-circuit. Block status is always 403 by contract.
|
||||||
|
self.assertFalse(is_pipelock_block(502, b"blocked: ..."))
|
||||||
|
|
||||||
|
def test_200_is_not_a_block(self):
|
||||||
|
# Allow path, normal forwarded response.
|
||||||
|
self.assertFalse(is_pipelock_block(200, b'{"ok":true}'))
|
||||||
|
|
||||||
|
def test_empty_body_is_not_a_block(self):
|
||||||
|
self.assertFalse(is_pipelock_block(403, b""))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user