Stop scanning the request body for CRLF injection
lint / lint (push) Successful in 1m41s
test / unit (pull_request) Successful in 31s
test / integration (pull_request) Successful in 18s

A 403 "egress DLP: URL-encoded CRLF (%0d%0a)" was firing on legitimate
requests (e.g. the Claude Code login flow) and bypassing the on-match
policy entirely, because CRLF blocks carry no matched value and were
routed straight to a hard 403.

Root cause: CRLF injection is only an attack in the request line and
headers. An HTTP body is delimited by Content-Length, so CRLF bytes in
the body cannot split the request — but the scan flattened the body into
the same blob it checked, so form-encoded / multi-line body content
(which legitimately contains %0d%0a) tripped it.

Fix:
- scan_outbound takes a crlf_text param; the addon scans CRLF only over
  the body-excluded request line + headers. crlf_text=None keeps the
  old full-blob behavior for host-side callers/tests; the websocket path
  passes "" since a data frame is not a request line.
- The redact policy now also scrubs CRLF (new strip_crlf helper) from the
  path and headers, so redact is a complete escape hatch and structural
  CRLF in the URL/headers can be forwarded when a route opts into it.

Tests: strip_crlf unit tests; scan_outbound crlf_text body-exclusion and
backward-compat tests.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01HnvBjPZC5V7qeQpFbQdDmS
This commit is contained in:
2026-06-24 20:37:26 -04:00
parent cdfaaa3de8
commit b411577e76
5 changed files with 108 additions and 31 deletions
+9
View File
@@ -283,6 +283,14 @@ _CRLF_ENCODED_RE = re.compile(r"%0[dD]%0[aA]", re.ASCII)
_CRLF_HEADER_INJECT_RE = re.compile(r"\r\n[A-Za-z][A-Za-z0-9\-]+\s*:", re.ASCII)
def strip_crlf(text: str) -> str:
"""Remove URL-encoded and literal CRLF injection sequences from a request
surface (PRD 0062 redact policy). Used to scrub the request line / headers
so the request can be forwarded instead of hard-blocked."""
text = _CRLF_ENCODED_RE.sub("", text)
return _CRLF_HEADER_INJECT_RE.sub(lambda m: m.group(0)[2:], text)
def scan_crlf_injection(text: str) -> ScanResult | None:
if _CRLF_ENCODED_RE.search(text):
return ScanResult(
@@ -306,4 +314,5 @@ __all__ = [
"scan_known_secrets",
"scan_naive_injection",
"scan_token_patterns",
"strip_crlf",
]
+37 -28
View File
@@ -39,9 +39,12 @@ from egress_addon_core import ( # type: ignore[import-not-found] # pylint: dis
)
try:
from dlp_detectors import redact_tokens # type: ignore[import-not-found]
from dlp_detectors import redact_tokens, strip_crlf # type: ignore[import-not-found]
except ImportError: # pragma: no cover - host-side path
from bot_bottle.dlp_detectors import redact_tokens # type: ignore[import-not-found]
from bot_bottle.dlp_detectors import ( # type: ignore[import-not-found]
redact_tokens,
strip_crlf,
)
try:
import supervise as _sv # type: ignore[import-not-found]
@@ -267,27 +270,26 @@ class EgressAddon:
while True:
request_path, _, query = flow.request.path.partition("?")
body = flow.request.get_text(strict=False) or ""
headers = outbound_scan_headers(route, dict(flow.request.headers))
scan_text = build_outbound_scan_text(
flow.request.pretty_host,
request_path,
query,
outbound_scan_headers(route, dict(flow.request.headers)),
body,
flow.request.pretty_host, request_path, query, headers, body,
)
# CRLF is scanned only over the request line + headers, never the
# body (see scan_outbound) — a body is not an injection vector.
crlf_text = build_outbound_scan_text(
flow.request.pretty_host, request_path, query, headers, "",
)
result = scan_outbound(
route, scan_text, os.environ, safe_tokens=self.safe_tokens,
route, scan_text, os.environ,
safe_tokens=self.safe_tokens, crlf_text=crlf_text,
)
if result is None or result.severity != "block":
return True
# Structural blocks (CRLF, no safelist-able value) are always a
# hard 403, regardless of the route's on-match policy.
if not result.matched:
self._block_dlp(flow, result)
return False
policy = route.outbound_on_match or DEFAULT_OUTBOUND_ON_MATCH
# redact scrubs every detection (tokens and structural CRLF) and
# forwards; it fails closed only if a match survives the scrub.
if policy == ON_MATCH_REDACT:
if self._redact_outbound(flow, route):
if self.config.log >= LOG_BLOCKS:
@@ -305,7 +307,10 @@ class EgressAddon:
)
return False
if policy == ON_MATCH_BLOCK:
# Structural blocks (CRLF, no safelist-able value) cannot be
# supervised — there is nothing to approve and remember — so under
# block/supervise they are a hard 403.
if policy == ON_MATCH_BLOCK or not result.matched:
self._block_dlp(flow, result)
return False
@@ -320,10 +325,11 @@ class EgressAddon:
# loop: the approved value is now in safe_tokens; re-scan.
def _redact_outbound(self, flow: http.HTTPFlow, route: Route) -> bool:
"""Scrub detected tokens from the mutable request surfaces (body,
headers, path/query) and re-scan. Returns True if the request is now
clean; False if a block-severity match remains on a surface redaction
cannot rewrite (the hostname) so the caller fails closed."""
"""Scrub detected tokens (and CRLF injection sequences) from the mutable
request surfaces (body, headers, path/query) and re-scan. Returns True
if the request is now clean; False if a block-severity match remains on
a surface redaction cannot rewrite (the hostname) so the caller fails
closed."""
body = flow.request.get_text(strict=False)
if body:
redacted_body = redact_tokens(body, env=os.environ)
@@ -332,23 +338,23 @@ class EgressAddon:
for name, value in list(flow.request.headers.items()):
if name.lower() == "host":
continue # routing-critical; never a legitimate token
redacted = redact_tokens(value, env=os.environ)
redacted = strip_crlf(redact_tokens(value, env=os.environ))
if redacted != value:
flow.request.headers[name] = redacted
redacted_path = redact_tokens(flow.request.path, env=os.environ)
redacted_path = strip_crlf(redact_tokens(flow.request.path, env=os.environ))
if redacted_path != flow.request.path:
flow.request.path = redacted_path
request_path, _, query = flow.request.path.partition("?")
new_body = flow.request.get_text(strict=False) or ""
headers = outbound_scan_headers(route, dict(flow.request.headers))
scan_text = build_outbound_scan_text(
flow.request.pretty_host,
request_path,
query,
outbound_scan_headers(route, dict(flow.request.headers)),
new_body,
flow.request.pretty_host, request_path, query, headers, new_body,
)
result = scan_outbound(route, scan_text, os.environ)
crlf_text = build_outbound_scan_text(
flow.request.pretty_host, request_path, query, headers, "",
)
result = scan_outbound(route, scan_text, os.environ, crlf_text=crlf_text)
return result is None or result.severity != "block"
async def _supervise_token_block(
@@ -491,8 +497,11 @@ class EgressAddon:
message = flow.websocket.messages[-1] # type: ignore[union-attr]
content = message.content.decode("utf-8", errors="replace")
if message.from_client:
# A WebSocket data frame is not an HTTP request line, so CRLF is
# not an injection vector here — scan only for credential leakage.
result = scan_outbound(
route, content, os.environ, safe_tokens=self.safe_tokens,
route, content, os.environ,
safe_tokens=self.safe_tokens, crlf_text="",
)
if result is not None and result.severity == "block":
sys.stderr.write(f"egress DLP: {result.reason}\n")
+9 -3
View File
@@ -722,6 +722,7 @@ def scan_outbound(
environ: typing.Mapping[str, str],
*,
safe_tokens: typing.AbstractSet[str] | None = None,
crlf_text: str | None = None,
) -> ScanResult | None:
# Lazy import to avoid circular deps and keep dlp_detectors optional
# at import time (the sidecar copies it flat alongside this file).
@@ -740,9 +741,14 @@ def scan_outbound(
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
# CRLF injection is never legitimate — runs unconditionally, not gated
# by outbound_detectors config, and never override-able by safe_tokens.
result = scan_crlf_injection(text)
# CRLF injection is only an attack in the request line + headers, never the
# body: an HTTP body is delimited by Content-Length, so CRLF bytes there
# cannot split the request. Scanning the body produces false positives on
# legitimate form-encoded / multi-line content. Callers pass the
# body-excluded surfaces as `crlf_text`; `None` falls back to the full text
# for backward-compatible callers (host-side tests, websocket frames).
crlf_target = text if crlf_text is None else crlf_text
result = scan_crlf_injection(crlf_target)
if result is not None:
return result
+16
View File
@@ -487,5 +487,21 @@ class TestMatchedAndSafeTokens(unittest.TestCase):
self.assertEqual("", result.matched)
class TestStripCrlf(unittest.TestCase):
def test_removes_url_encoded_crlf(self):
from bot_bottle.dlp_detectors import strip_crlf
out = strip_crlf("next=%0d%0aX-Injected: evil")
self.assertNotRegex(out, r"%0[dD]%0[aA]")
def test_removes_literal_header_injection(self):
from bot_bottle.dlp_detectors import strip_crlf
out = strip_crlf("value\r\nX-Injected: evil")
self.assertIsNone(scan_crlf_injection(out))
def test_leaves_clean_text_unchanged(self):
from bot_bottle.dlp_detectors import strip_crlf
self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello"))
if __name__ == "__main__":
unittest.main()
+37
View File
@@ -1212,6 +1212,43 @@ class TestScanOutboundSafeTokens(unittest.TestCase):
self.assertEqual(_AWS_KEY, result.matched)
class TestScanOutboundCrlfText(unittest.TestCase):
"""PRD 0062: CRLF is scanned only over the request line + headers
(crlf_text), never the body a body is not an injection vector."""
def test_body_crlf_not_flagged_when_crlf_text_excludes_body(self):
# A form-encoded multi-line body legitimately contains %0d%0a.
body = "comment=line1%0d%0aline2"
full = build_outbound_scan_text(
host="api.example.com", path="/submit", query="",
headers={}, body=body,
)
crlf_text = build_outbound_scan_text(
host="api.example.com", path="/submit", query="",
headers={}, body="",
)
self.assertIsNone(scan_outbound(_ROUTE, full, {}, crlf_text=crlf_text))
def test_request_line_crlf_still_flagged(self):
full = build_outbound_scan_text(
host="api.example.com", path="/p", query="next=%0d%0aX:evil",
headers={}, body="",
)
crlf_text = full
result = scan_outbound(_ROUTE, full, {}, crlf_text=crlf_text)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_default_crlf_text_scans_full_blob(self):
# Backward compatibility: crlf_text=None scans everything (body too).
full = build_outbound_scan_text(
host="api.example.com", path="/submit", query="",
headers={}, body="x=%0d%0aX:evil",
)
self.assertIsNotNone(scan_outbound(_ROUTE, full, {}))
class TestBuildTokenAllowPayload(unittest.TestCase):
def test_payload_includes_context_and_no_raw_token(self):
result = ScanResult(