diff --git a/bot_bottle/dlp_detectors.py b/bot_bottle/dlp_detectors.py index 1263161..c2c038b 100644 --- a/bot_bottle/dlp_detectors.py +++ b/bot_bottle/dlp_detectors.py @@ -14,6 +14,7 @@ import base64 import gzip import re import typing +import unicodedata from urllib.parse import quote as url_quote try: @@ -38,7 +39,24 @@ def _snippet(text: str, start: int, end: int) -> str: # --------------------------------------------------------------------------- -# Token patterns detector (Phase 1a) +# Unicode normalization (defeats confusable-char, combining-mark and zero-width evasion) +# --------------------------------------------------------------------------- + +def _normalize_text(text: str) -> str: + # NFKD separates base characters from combining marks and resolves + # compatibility equivalents (fullwidth ASCII, ligatures, etc.) + decomposed = unicodedata.normalize("NFKD", text) + return "".join( + ch for ch in decomposed + # Strip combining/enclosing marks and zero-width format chars (Cf, e.g. U+200B) inserted between chars to break patterns + if unicodedata.category(ch) not in ("Mn", "Me", "Cf") + # Strip control chars; keep common whitespace (\n \r \t) + and (unicodedata.category(ch) != "Cc" or ch in "\n\r\t") + ) + + +# --------------------------------------------------------------------------- +# Token patterns detector # --------------------------------------------------------------------------- TOKEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = ( @@ -50,12 +68,20 @@ TOKEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] 
= ( ("OpenAI project API key", re.compile(r"sk-proj-[A-Za-z0-9_\-]{48,}")), ("Stripe live key", re.compile(r"sk_live_[A-Za-z0-9]{24}")), ("Generic Bearer JWT", re.compile(r"Bearer\s+[A-Za-z0-9._\-]{50,}")), + ("HuggingFace token", re.compile(r"hf_[A-Za-z0-9]{34,}")), + ("Databricks token", re.compile(r"dapi[A-Za-z0-9]{32}")), + ("Slack token", re.compile(r"xox[baprs]-[A-Za-z0-9]+-[A-Za-z0-9]+-[A-Za-z0-9]{24,}")), + ("npm token", re.compile(r"npm_[A-Za-z0-9]{36}")), + ("SendGrid API key", re.compile(r"SG\.[A-Za-z0-9_\-]{22}\.[A-Za-z0-9_\-]{43}")), + ("PyPI token", re.compile(r"pypi-[A-Za-z0-9_\-]{80,}")), + ("HashiCorp Vault token", re.compile(r"hvs\.[A-Za-z0-9_\-]{24,}")), ) def scan_token_patterns(text: str, *, location: str = "body") -> ScanResult | None: + normalized = _normalize_text(text) for name, pattern in TOKEN_PATTERNS: - m = pattern.search(text) + m = pattern.search(normalized) if m is not None: return ScanResult( severity="block", @@ -229,11 +255,36 @@ def scan_naive_injection(text: str) -> ScanResult | None: return None +# --------------------------------------------------------------------------- +# CRLF injection detector +# --------------------------------------------------------------------------- + +# URL-encoded CRLF is never legitimate in a request URL or header value. +_CRLF_ENCODED_RE = re.compile(r"%0[dD]%0[aA]", re.ASCII) +# Literal CRLF followed by a header-name pattern indicates header injection. 
+_CRLF_HEADER_INJECT_RE = re.compile(r"\r\n[A-Za-z][A-Za-z0-9\-]+\s*:", re.ASCII) + + +def scan_crlf_injection(text: str) -> ScanResult | None: + if _CRLF_ENCODED_RE.search(text): + return ScanResult( + severity="block", + reason="URL-encoded CRLF (%0d%0a) in outbound request", + ) + if _CRLF_HEADER_INJECT_RE.search(text): + return ScanResult( + severity="block", + reason="CRLF header injection pattern in outbound request", + ) + return None + + __all__ = [ "REDACT", "SNIPPET_CONTEXT", "TOKEN_PATTERNS", "redact_tokens", + "scan_crlf_injection", "scan_known_secrets", "scan_naive_injection", "scan_token_patterns", diff --git a/bot_bottle/egress_addon_core.py b/bot_bottle/egress_addon_core.py index c409746..6112814 100644 --- a/bot_bottle/egress_addon_core.py +++ b/bot_bottle/egress_addon_core.py @@ -574,15 +574,25 @@ def scan_outbound( # at import time (the sidecar copies it flat alongside this file). try: from dlp_detectors import ( # type: ignore[import-not-found] - scan_token_patterns, scan_known_secrets, + scan_crlf_injection, + scan_known_secrets, + scan_token_patterns, ) except ImportError: # pragma: no cover - host-side path from .dlp_detectors import ( # type: ignore[import-not-found] - scan_token_patterns, scan_known_secrets, + scan_crlf_injection, + scan_known_secrets, + scan_token_patterns, ) text = body if isinstance(body, str) else body.decode("utf-8", errors="replace") + # CRLF injection is never legitimate — runs unconditionally, not gated + # by outbound_detectors config. 
+ result = scan_crlf_injection(text) + if result is not None: + return result + if _detector_enabled(route.outbound_detectors, "token_patterns"): result = scan_token_patterns(text, location="body") if result is not None: diff --git a/tests/unit/test_dlp_detectors.py b/tests/unit/test_dlp_detectors.py index 3024bcc..03ddae6 100644 --- a/tests/unit/test_dlp_detectors.py +++ b/tests/unit/test_dlp_detectors.py @@ -10,7 +10,9 @@ import unittest from bot_bottle.dlp_detectors import ( REDACT, _encoded_variants, + _normalize_text, redact_tokens, + scan_crlf_injection, scan_known_secrets, scan_naive_injection, scan_token_patterns, @@ -300,6 +302,115 @@ class TestEncodedVariants(unittest.TestCase): self.assertEqual(len(v), len(set(v))) +class TestScanTokenPatternsExtended(unittest.TestCase): + def test_huggingface_token(self): + result = scan_token_patterns("token=hf_" + "A" * 34) # gitleaks:allow + assert result is not None + self.assertIn("HuggingFace", result.reason) + + def test_databricks_token(self): + result = scan_token_patterns("dapi" + "a" * 32) # gitleaks:allow + assert result is not None + self.assertIn("Databricks", result.reason) + + def test_slack_bot_token(self): + # Use all-zero numeric segments to keep entropy low + result = scan_token_patterns("xoxb-00000000000-00000000000-" + "A" * 24) # gitleaks:allow + assert result is not None + self.assertIn("Slack", result.reason) + + def test_npm_token(self): + result = scan_token_patterns("npm_" + "A" * 36) # gitleaks:allow + assert result is not None + self.assertIn("npm", result.reason) + + def test_sendgrid_key(self): + result = scan_token_patterns("SG." + "A" * 22 + "." + "B" * 43) # gitleaks:allow + assert result is not None + self.assertIn("SendGrid", result.reason) + + def test_pypi_token(self): + result = scan_token_patterns("pypi-" + "A" * 80) # gitleaks:allow + assert result is not None + self.assertIn("PyPI", result.reason) + + def test_vault_token(self): + result = scan_token_patterns("hvs." 
+ "A" * 24) # gitleaks:allow + assert result is not None + self.assertIn("Vault", result.reason) + + +class TestUnicodeNormalization(unittest.TestCase): + def test_fullwidth_chars_normalized(self): + # Fullwidth ASCII chars (U+FF21..U+FF3A) should map to ASCII + fullwidth_A = "\uFF21" # FULLWIDTH LATIN CAPITAL LETTER A (escaped so it cannot be silently folded to ASCII) + # NFKD maps fullwidth A (U+FF21) → ASCII A, so AKIA pattern becomes detectable + result = scan_token_patterns(fullwidth_A + "KIA" + "0" * 16) + assert result is not None + self.assertIn("AWS", result.reason) + + def test_combining_marks_stripped(self): + # Combining mark inserted between chars (e.g. A + combining grave) + secret = "AKIA" + "\u0300" + "0" * 16 # AKIA with COMBINING GRAVE ACCENT (U+0300) after the final A + normalized = _normalize_text(secret) + # Combining mark is stripped → AKIA0...0 is visible to regex + self.assertNotIn("\u0300", normalized) + result = scan_token_patterns(secret) + assert result is not None + self.assertIn("AWS", result.reason) + + def test_control_chars_stripped(self): + # Null byte inserted to split a token + secret = "AK\x00IA" + "0" * 16 + normalized = _normalize_text(secret) + self.assertNotIn("\x00", normalized) + + def test_common_whitespace_preserved(self): + normalized = _normalize_text("line1\nline2\r\nline3\t end") + self.assertIn("\n", normalized) + self.assertIn("\r\n", normalized) + self.assertIn("\t", normalized) + + def test_clean_text_unchanged(self): + text = "hello world 123" + self.assertEqual(text, _normalize_text(text)) + + +class TestScanCrlfInjection(unittest.TestCase): + def test_url_encoded_crlf_lowercase(self): + result = scan_crlf_injection("/path?next=%0d%0aX-Injected: evil") + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn("%0d%0a", result.reason) + + def test_url_encoded_crlf_uppercase(self): + result = scan_crlf_injection("/path?next=%0D%0AX-Injected: evil") + assert result is not None + self.assertEqual("block", result.severity) + + def test_url_encoded_crlf_mixed_case(self): + result = 
scan_crlf_injection("redirect=%0d%0ASet-Cookie: session=x") + assert result is not None + self.assertEqual("block", result.severity) + + def test_literal_crlf_header_injection(self): + result = scan_crlf_injection("value\r\nX-Injected: evil") + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn("header injection", result.reason) + + def test_literal_crlf_in_body_not_flagged(self): + # Plain CRLF without a following header-like pattern is not flagged + # (legitimate in Windows text or multipart bodies) + self.assertIsNone(scan_crlf_injection("line1\r\nline2\r\nline3")) + + def test_clean_url_returns_none(self): + self.assertIsNone(scan_crlf_injection("/api/v1/data?q=hello+world")) + + def test_clean_body_returns_none(self): + self.assertIsNone(scan_crlf_injection('{"key": "value", "other": "data"}')) + + class TestKnownSecretsNewVariants(unittest.TestCase): SECRET = "super-secret-token" ENV = {"EGRESS_TOKEN_0": SECRET} diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index acb8ec1..fa4f7f4 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -926,6 +926,35 @@ class TestScanOutbound(unittest.TestCase): assert result is not None self.assertEqual("block", result.severity) + def test_crlf_in_query_blocked(self): + # CRLF injection attempt via URL-encoded %0d%0a in a query param + text = build_outbound_scan_text( + host="api.example.com", + path="/search", + query="next=%0d%0aX-Injected%3A+evil", + headers={}, + body="", + ) + result = scan_outbound(_ROUTE, text, {}) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_crlf_blocked_even_when_detectors_disabled(self): + # CRLF scan runs unconditionally; outbound_detectors: false doesn't skip it + route = Route(host="api.example.com", outbound_detectors=()) + text = build_outbound_scan_text( + host="api.example.com", + path="/data", + 
query="", + headers={"x-redirect": "value\r\nX-Injected: evil"}, + body="", + ) + result = scan_outbound(route, text, {}) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + # --- build_inbound_scan_text --------------------------------------------