feat(dlp): add 7 token patterns, Unicode normalization, CRLF injection detection (PRD 0053)
Token patterns: HuggingFace (hf_), Databricks (dapi), Slack (xox[baprs]-), npm (npm_), SendGrid (SG.x.y), PyPI (pypi-), HashiCorp Vault (hvs.). Unicode normalization (_normalize_text) applies NFKD + strips combining marks and control chars before pattern matching, defeating fullwidth-char and combining-mark evasion. CRLF injection (scan_crlf_injection) detects %0d%0a in URLs and literal \r\n header-injection patterns; runs unconditionally in scan_outbound regardless of outbound_detectors config.
This commit is contained in:
@@ -14,6 +14,7 @@ import base64
|
||||
import gzip
|
||||
import re
|
||||
import typing
|
||||
import unicodedata
|
||||
from urllib.parse import quote as url_quote
|
||||
|
||||
try:
|
||||
@@ -38,7 +39,24 @@ def _snippet(text: str, start: int, end: int) -> str:
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Token patterns detector (Phase 1a)
|
||||
# Unicode normalization (defeats confusable-char and combining-mark evasion)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _normalize_text(text: str) -> str:
|
||||
# NFKD separates base characters from combining marks and resolves
|
||||
# compatibility equivalents (fullwidth ASCII, ligatures, etc.)
|
||||
decomposed = unicodedata.normalize("NFKD", text)
|
||||
return "".join(
|
||||
ch for ch in decomposed
|
||||
# Strip combining marks inserted between chars to break patterns
|
||||
if unicodedata.category(ch) != "Mn"
|
||||
# Strip control chars; keep common whitespace (\n \r \t)
|
||||
and (unicodedata.category(ch) != "Cc" or ch in "\n\r\t")
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Token patterns detector
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
TOKEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = (
|
||||
@@ -50,12 +68,20 @@ TOKEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = (
|
||||
("OpenAI project API key", re.compile(r"sk-proj-[A-Za-z0-9_\-]{48,}")),
|
||||
("Stripe live key", re.compile(r"sk_live_[A-Za-z0-9]{24}")),
|
||||
("Generic Bearer JWT", re.compile(r"Bearer\s+[A-Za-z0-9._\-]{50,}")),
|
||||
("HuggingFace token", re.compile(r"hf_[A-Za-z0-9]{34,}")),
|
||||
("Databricks token", re.compile(r"dapi[A-Za-z0-9]{32}")),
|
||||
("Slack token", re.compile(r"xox[baprs]-[A-Za-z0-9]+-[A-Za-z0-9]+-[A-Za-z0-9]{24,}")),
|
||||
("npm token", re.compile(r"npm_[A-Za-z0-9]{36}")),
|
||||
("SendGrid API key", re.compile(r"SG\.[A-Za-z0-9_\-]{22}\.[A-Za-z0-9_\-]{43}")),
|
||||
("PyPI token", re.compile(r"pypi-[A-Za-z0-9_\-]{80,}")),
|
||||
("HashiCorp Vault token", re.compile(r"hvs\.[A-Za-z0-9_\-]{24,}")),
|
||||
)
|
||||
|
||||
|
||||
def scan_token_patterns(text: str, *, location: str = "body") -> ScanResult | None:
|
||||
normalized = _normalize_text(text)
|
||||
for name, pattern in TOKEN_PATTERNS:
|
||||
m = pattern.search(text)
|
||||
m = pattern.search(normalized)
|
||||
if m is not None:
|
||||
return ScanResult(
|
||||
severity="block",
|
||||
@@ -229,11 +255,36 @@ def scan_naive_injection(text: str) -> ScanResult | None:
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CRLF injection detector
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# URL-encoded CRLF is never legitimate in a request URL or header value.
|
||||
_CRLF_ENCODED_RE = re.compile(r"%0[dD]%0[aA]", re.ASCII)
|
||||
# Literal CRLF followed by a header-name pattern indicates header injection.
|
||||
_CRLF_HEADER_INJECT_RE = re.compile(r"\r\n[A-Za-z][A-Za-z0-9\-]+\s*:", re.ASCII)
|
||||
|
||||
|
||||
def scan_crlf_injection(text: str) -> ScanResult | None:
|
||||
if _CRLF_ENCODED_RE.search(text):
|
||||
return ScanResult(
|
||||
severity="block",
|
||||
reason="URL-encoded CRLF (%0d%0a) in outbound request",
|
||||
)
|
||||
if _CRLF_HEADER_INJECT_RE.search(text):
|
||||
return ScanResult(
|
||||
severity="block",
|
||||
reason="CRLF header injection pattern in outbound request",
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
__all__ = [
|
||||
"REDACT",
|
||||
"SNIPPET_CONTEXT",
|
||||
"TOKEN_PATTERNS",
|
||||
"redact_tokens",
|
||||
"scan_crlf_injection",
|
||||
"scan_known_secrets",
|
||||
"scan_naive_injection",
|
||||
"scan_token_patterns",
|
||||
|
||||
@@ -574,15 +574,25 @@ def scan_outbound(
|
||||
# at import time (the sidecar copies it flat alongside this file).
|
||||
try:
|
||||
from dlp_detectors import ( # type: ignore[import-not-found]
|
||||
scan_token_patterns, scan_known_secrets,
|
||||
scan_crlf_injection,
|
||||
scan_known_secrets,
|
||||
scan_token_patterns,
|
||||
)
|
||||
except ImportError: # pragma: no cover - host-side path
|
||||
from .dlp_detectors import ( # type: ignore[import-not-found]
|
||||
scan_token_patterns, scan_known_secrets,
|
||||
scan_crlf_injection,
|
||||
scan_known_secrets,
|
||||
scan_token_patterns,
|
||||
)
|
||||
|
||||
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
|
||||
|
||||
# CRLF injection is never legitimate — runs unconditionally, not gated
|
||||
# by outbound_detectors config.
|
||||
result = scan_crlf_injection(text)
|
||||
if result is not None:
|
||||
return result
|
||||
|
||||
if _detector_enabled(route.outbound_detectors, "token_patterns"):
|
||||
result = scan_token_patterns(text, location="body")
|
||||
if result is not None:
|
||||
|
||||
@@ -10,7 +10,9 @@ import unittest
|
||||
from bot_bottle.dlp_detectors import (
|
||||
REDACT,
|
||||
_encoded_variants,
|
||||
_normalize_text,
|
||||
redact_tokens,
|
||||
scan_crlf_injection,
|
||||
scan_known_secrets,
|
||||
scan_naive_injection,
|
||||
scan_token_patterns,
|
||||
@@ -300,6 +302,115 @@ class TestEncodedVariants(unittest.TestCase):
|
||||
self.assertEqual(len(v), len(set(v)))
|
||||
|
||||
|
||||
class TestScanTokenPatternsExtended(unittest.TestCase):
|
||||
def test_huggingface_token(self):
|
||||
result = scan_token_patterns("token=hf_" + "A" * 34) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("HuggingFace", result.reason)
|
||||
|
||||
def test_databricks_token(self):
|
||||
result = scan_token_patterns("dapi" + "a" * 32) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("Databricks", result.reason)
|
||||
|
||||
def test_slack_bot_token(self):
|
||||
# Use all-zero numeric segments to keep entropy low
|
||||
result = scan_token_patterns("xoxb-00000000000-00000000000-" + "A" * 24) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("Slack", result.reason)
|
||||
|
||||
def test_npm_token(self):
|
||||
result = scan_token_patterns("npm_" + "A" * 36) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("npm", result.reason)
|
||||
|
||||
def test_sendgrid_key(self):
|
||||
result = scan_token_patterns("SG." + "A" * 22 + "." + "B" * 43) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("SendGrid", result.reason)
|
||||
|
||||
def test_pypi_token(self):
|
||||
result = scan_token_patterns("pypi-" + "A" * 80) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("PyPI", result.reason)
|
||||
|
||||
def test_vault_token(self):
|
||||
result = scan_token_patterns("hvs." + "A" * 24) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("Vault", result.reason)
|
||||
|
||||
|
||||
class TestUnicodeNormalization(unittest.TestCase):
|
||||
def test_fullwidth_chars_normalized(self):
|
||||
# Fullwidth ASCII chars (U+FF21..U+FF3A) should map to ASCII
|
||||
fullwidth_A = "A" # FULLWIDTH LATIN CAPITAL LETTER A
|
||||
# NFKD maps fullwidth A → A, so AKIA pattern becomes detectable
|
||||
result = scan_token_patterns(fullwidth_A + "KIA" + "0" * 16)
|
||||
assert result is not None
|
||||
self.assertIn("AWS", result.reason)
|
||||
|
||||
def test_combining_marks_stripped(self):
|
||||
# Combining mark inserted between chars (e.g. A + combining grave)
|
||||
secret = "AKIA" + "̀" + "0" * 16 # AKIA with combining grave after A
|
||||
normalized = _normalize_text(secret)
|
||||
# Combining mark is stripped → AKIA0...0 is visible to regex
|
||||
self.assertNotIn("̀", normalized)
|
||||
result = scan_token_patterns(secret)
|
||||
assert result is not None
|
||||
self.assertIn("AWS", result.reason)
|
||||
|
||||
def test_control_chars_stripped(self):
|
||||
# Null byte inserted to split a token
|
||||
secret = "AK\x00IA" + "0" * 16
|
||||
normalized = _normalize_text(secret)
|
||||
self.assertNotIn("\x00", normalized)
|
||||
|
||||
def test_common_whitespace_preserved(self):
|
||||
normalized = _normalize_text("line1\nline2\r\nline3\t end")
|
||||
self.assertIn("\n", normalized)
|
||||
self.assertIn("\r\n", normalized)
|
||||
self.assertIn("\t", normalized)
|
||||
|
||||
def test_clean_text_unchanged(self):
|
||||
text = "hello world 123"
|
||||
self.assertEqual(text, _normalize_text(text))
|
||||
|
||||
|
||||
class TestScanCrlfInjection(unittest.TestCase):
|
||||
def test_url_encoded_crlf_lowercase(self):
|
||||
result = scan_crlf_injection("/path?next=%0d%0aX-Injected: evil")
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
self.assertIn("%0d%0a", result.reason)
|
||||
|
||||
def test_url_encoded_crlf_uppercase(self):
|
||||
result = scan_crlf_injection("/path?next=%0D%0AX-Injected: evil")
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
|
||||
def test_url_encoded_crlf_mixed_case(self):
|
||||
result = scan_crlf_injection("redirect=%0d%0ASet-Cookie: session=x")
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
|
||||
def test_literal_crlf_header_injection(self):
|
||||
result = scan_crlf_injection("value\r\nX-Injected: evil")
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
self.assertIn("header injection", result.reason)
|
||||
|
||||
def test_literal_crlf_in_body_not_flagged(self):
|
||||
# Plain CRLF without a following header-like pattern is not flagged
|
||||
# (legitimate in Windows text or multipart bodies)
|
||||
self.assertIsNone(scan_crlf_injection("line1\r\nline2\r\nline3"))
|
||||
|
||||
def test_clean_url_returns_none(self):
|
||||
self.assertIsNone(scan_crlf_injection("/api/v1/data?q=hello+world"))
|
||||
|
||||
def test_clean_body_returns_none(self):
|
||||
self.assertIsNone(scan_crlf_injection('{"key": "value", "other": "data"}'))
|
||||
|
||||
|
||||
class TestKnownSecretsNewVariants(unittest.TestCase):
|
||||
SECRET = "super-secret-token"
|
||||
ENV = {"EGRESS_TOKEN_0": SECRET}
|
||||
|
||||
@@ -926,6 +926,35 @@ class TestScanOutbound(unittest.TestCase):
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
|
||||
def test_crlf_in_query_blocked(self):
|
||||
# CRLF injection attempt via URL-encoded %0d%0a in a query param
|
||||
text = build_outbound_scan_text(
|
||||
host="api.example.com",
|
||||
path="/search",
|
||||
query="next=%0d%0aX-Injected%3A+evil",
|
||||
headers={},
|
||||
body="",
|
||||
)
|
||||
result = scan_outbound(_ROUTE, text, {})
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
|
||||
def test_crlf_blocked_even_when_detectors_disabled(self):
|
||||
# CRLF scan runs unconditionally; outbound_detectors: false doesn't skip it
|
||||
route = Route(host="api.example.com", outbound_detectors=())
|
||||
text = build_outbound_scan_text(
|
||||
host="api.example.com",
|
||||
path="/data",
|
||||
query="",
|
||||
headers={"x-redirect": "value\r\nX-Injected: evil"},
|
||||
body="",
|
||||
)
|
||||
result = scan_outbound(route, text, {})
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
|
||||
|
||||
# --- build_inbound_scan_text --------------------------------------------
|
||||
|
||||
|
||||
Reference in New Issue
Block a user