diff --git a/bot_bottle/dlp_detectors.py b/bot_bottle/dlp_detectors.py index fec3df4..ef36e7f 100644 --- a/bot_bottle/dlp_detectors.py +++ b/bot_bottle/dlp_detectors.py @@ -283,6 +283,14 @@ _CRLF_ENCODED_RE = re.compile(r"%0[dD]%0[aA]", re.ASCII) _CRLF_HEADER_INJECT_RE = re.compile(r"\r\n[A-Za-z][A-Za-z0-9\-]+\s*:", re.ASCII) +def strip_crlf(text: str) -> str: + """Remove URL-encoded and literal CRLF injection sequences from a request + surface (PRD 0062 redact policy). Used to scrub the request line / headers + so the request can be forwarded instead of hard-blocked.""" + text = _CRLF_ENCODED_RE.sub("", text) + return _CRLF_HEADER_INJECT_RE.sub(lambda m: m.group(0)[2:], text) + + def scan_crlf_injection(text: str) -> ScanResult | None: if _CRLF_ENCODED_RE.search(text): return ScanResult( @@ -306,4 +314,5 @@ __all__ = [ "scan_known_secrets", "scan_naive_injection", "scan_token_patterns", + "strip_crlf", ] diff --git a/bot_bottle/egress_addon.py b/bot_bottle/egress_addon.py index 6251b03..41f4ca5 100644 --- a/bot_bottle/egress_addon.py +++ b/bot_bottle/egress_addon.py @@ -39,9 +39,12 @@ from egress_addon_core import ( # type: ignore[import-not-found] # pylint: dis ) try: - from dlp_detectors import redact_tokens # type: ignore[import-not-found] + from dlp_detectors import redact_tokens, strip_crlf # type: ignore[import-not-found] except ImportError: # pragma: no cover - host-side path - from bot_bottle.dlp_detectors import redact_tokens # type: ignore[import-not-found] + from bot_bottle.dlp_detectors import ( # type: ignore[import-not-found] + redact_tokens, + strip_crlf, + ) try: import supervise as _sv # type: ignore[import-not-found] @@ -267,27 +270,26 @@ class EgressAddon: while True: request_path, _, query = flow.request.path.partition("?") body = flow.request.get_text(strict=False) or "" + headers = outbound_scan_headers(route, dict(flow.request.headers)) scan_text = build_outbound_scan_text( - flow.request.pretty_host, - request_path, - query, - outbound_scan_headers(route, dict(flow.request.headers)), - body, + flow.request.pretty_host, request_path, query, headers, body, + ) + # CRLF is scanned only over the request line + headers, never the + # body (see scan_outbound) — a body is not an injection vector. + crlf_text = build_outbound_scan_text( + flow.request.pretty_host, request_path, query, headers, "", ) result = scan_outbound( - route, scan_text, os.environ, safe_tokens=self.safe_tokens, + route, scan_text, os.environ, + safe_tokens=self.safe_tokens, crlf_text=crlf_text, ) if result is None or result.severity != "block": return True - # Structural blocks (CRLF, no safelist-able value) are always a - # hard 403, regardless of the route's on-match policy. - if not result.matched: - self._block_dlp(flow, result) - return False - policy = route.outbound_on_match or DEFAULT_OUTBOUND_ON_MATCH + # redact scrubs every detection (tokens and structural CRLF) and + # forwards; it fails closed only if a match survives the scrub. if policy == ON_MATCH_REDACT: if self._redact_outbound(flow, route): if self.config.log >= LOG_BLOCKS: @@ -305,7 +307,10 @@ class EgressAddon: ) return False - if policy == ON_MATCH_BLOCK: + # Structural blocks (CRLF, no safelist-able value) cannot be + # supervised — there is nothing to approve and remember — so under + # block/supervise they are a hard 403. + if policy == ON_MATCH_BLOCK or not result.matched: self._block_dlp(flow, result) return False @@ -320,10 +325,11 @@ class EgressAddon: # loop: the approved value is now in safe_tokens; re-scan. def _redact_outbound(self, flow: http.HTTPFlow, route: Route) -> bool: - """Scrub detected tokens from the mutable request surfaces (body, - headers, path/query) and re-scan. Returns True if the request is now - clean; False if a block-severity match remains on a surface redaction - cannot rewrite (the hostname) so the caller fails closed.""" + """Scrub detected tokens (and CRLF injection sequences) from the mutable + request surfaces (body, headers, path/query) and re-scan. Returns True + if the request is now clean; False if a block-severity match remains on + a surface redaction cannot rewrite (the hostname) so the caller fails + closed.""" body = flow.request.get_text(strict=False) if body: redacted_body = redact_tokens(body, env=os.environ) @@ -332,23 +338,23 @@ class EgressAddon: for name, value in list(flow.request.headers.items()): if name.lower() == "host": continue # routing-critical; never a legitimate token - redacted = redact_tokens(value, env=os.environ) + redacted = strip_crlf(redact_tokens(value, env=os.environ)) if redacted != value: flow.request.headers[name] = redacted - redacted_path = redact_tokens(flow.request.path, env=os.environ) + redacted_path = strip_crlf(redact_tokens(flow.request.path, env=os.environ)) if redacted_path != flow.request.path: flow.request.path = redacted_path request_path, _, query = flow.request.path.partition("?") new_body = flow.request.get_text(strict=False) or "" + headers = outbound_scan_headers(route, dict(flow.request.headers)) scan_text = build_outbound_scan_text( - flow.request.pretty_host, - request_path, - query, - outbound_scan_headers(route, dict(flow.request.headers)), - new_body, + flow.request.pretty_host, request_path, query, headers, new_body, ) - result = scan_outbound(route, scan_text, os.environ) + crlf_text = build_outbound_scan_text( + flow.request.pretty_host, request_path, query, headers, "", + ) + result = scan_outbound(route, scan_text, os.environ, crlf_text=crlf_text) return result is None or result.severity != "block" async def _supervise_token_block( @@ -491,8 +497,11 @@ class EgressAddon: message = flow.websocket.messages[-1] # type: ignore[union-attr] content = message.content.decode("utf-8", errors="replace") if message.from_client: + # A WebSocket data frame is not an HTTP request line, so CRLF is + # not an injection vector here — scan only for credential leakage. result = scan_outbound( - route, content, os.environ, safe_tokens=self.safe_tokens, + route, content, os.environ, + safe_tokens=self.safe_tokens, crlf_text="", ) if result is not None and result.severity == "block": sys.stderr.write(f"egress DLP: {result.reason}\n") diff --git a/bot_bottle/egress_addon_core.py b/bot_bottle/egress_addon_core.py index 6eff59d..324f7c7 100644 --- a/bot_bottle/egress_addon_core.py +++ b/bot_bottle/egress_addon_core.py @@ -722,6 +722,7 @@ def scan_outbound( environ: typing.Mapping[str, str], *, safe_tokens: typing.AbstractSet[str] | None = None, + crlf_text: str | None = None, ) -> ScanResult | None: # Lazy import to avoid circular deps and keep dlp_detectors optional # at import time (the sidecar copies it flat alongside this file). @@ -740,9 +741,14 @@ def scan_outbound( text = body if isinstance(body, str) else body.decode("utf-8", errors="replace") - # CRLF injection is never legitimate — runs unconditionally, not gated - # by outbound_detectors config, and never override-able by safe_tokens. - result = scan_crlf_injection(text) + # CRLF injection is only an attack in the request line + headers, never the + # body: an HTTP body is delimited by Content-Length, so CRLF bytes there + # cannot split the request. Scanning the body produces false positives on + # legitimate form-encoded / multi-line content. Callers pass the + # body-excluded surfaces as `crlf_text`; `None` falls back to the full text + # for backward-compatible callers (host-side tests, websocket frames). + crlf_target = text if crlf_text is None else crlf_text + result = scan_crlf_injection(crlf_target) if result is not None: return result diff --git a/tests/unit/test_dlp_detectors.py b/tests/unit/test_dlp_detectors.py index d723fb1..3028729 100644 --- a/tests/unit/test_dlp_detectors.py +++ b/tests/unit/test_dlp_detectors.py @@ -487,5 +487,21 @@ class TestMatchedAndSafeTokens(unittest.TestCase): self.assertEqual("", result.matched) +class TestStripCrlf(unittest.TestCase): + def test_removes_url_encoded_crlf(self): + from bot_bottle.dlp_detectors import strip_crlf + out = strip_crlf("next=%0d%0aX-Injected: evil") + self.assertNotRegex(out, r"%0[dD]%0[aA]") + + def test_removes_literal_header_injection(self): + from bot_bottle.dlp_detectors import strip_crlf + out = strip_crlf("value\r\nX-Injected: evil") + self.assertIsNone(scan_crlf_injection(out)) + + def test_leaves_clean_text_unchanged(self): + from bot_bottle.dlp_detectors import strip_crlf + self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello")) + + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index 2a6c564..2ea6abd 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -1212,6 +1212,43 @@ class TestScanOutboundSafeTokens(unittest.TestCase): self.assertEqual(_AWS_KEY, result.matched) +class TestScanOutboundCrlfText(unittest.TestCase): + """PRD 0062: CRLF is scanned only over the request line + headers + (crlf_text), never the body — a body is not an injection vector.""" + + def test_body_crlf_not_flagged_when_crlf_text_excludes_body(self): + # A form-encoded multi-line body legitimately contains %0d%0a. + body = "comment=line1%0d%0aline2" + full = build_outbound_scan_text( + host="api.example.com", path="/submit", query="", + headers={}, body=body, + ) + crlf_text = build_outbound_scan_text( + host="api.example.com", path="/submit", query="", + headers={}, body="", + ) + self.assertIsNone(scan_outbound(_ROUTE, full, {}, crlf_text=crlf_text)) + + def test_request_line_crlf_still_flagged(self): + full = build_outbound_scan_text( + host="api.example.com", path="/p", query="next=%0d%0aX:evil", + headers={}, body="", + ) + crlf_text = full + result = scan_outbound(_ROUTE, full, {}, crlf_text=crlf_text) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_default_crlf_text_scans_full_blob(self): + # Backward compatibility: crlf_text=None scans everything (body too). + full = build_outbound_scan_text( + host="api.example.com", path="/submit", query="", + headers={}, body="x=%0d%0aX:evil", + ) + self.assertIsNotNone(scan_outbound(_ROUTE, full, {})) + + class TestBuildTokenAllowPayload(unittest.TestCase): def test_payload_includes_context_and_no_raw_token(self): result = ScanResult(