feat(dlp): add 7 token patterns, Unicode normalization, CRLF injection detection (PRD 0053)
lint / lint (push) Successful in 1m43s
test / unit (pull_request) Successful in 40s
test / integration (pull_request) Successful in 53s

Token patterns: HuggingFace (hf_), Databricks (dapi), Slack (xox[baprs]-),
npm (npm_), SendGrid (SG.x.y), PyPI (pypi-), HashiCorp Vault (hvs.).

Unicode normalization (_normalize_text) applies NFKD + strips combining
marks and control chars before pattern matching, defeating fullwidth-char
and combining-mark evasion.

CRLF injection (scan_crlf_injection) detects %0d%0a in URLs and literal
\r\n header-injection patterns; runs unconditionally in scan_outbound
regardless of outbound_detectors config.
This commit is contained in:
2026-06-06 18:37:37 +00:00
parent baf1908f76
commit 50dd1a5ef7
4 changed files with 209 additions and 4 deletions
+111
View File
@@ -9,6 +9,8 @@ import unittest
from bot_bottle.dlp_detectors import (
_encoded_variants,
_normalize_text,
scan_crlf_injection,
scan_known_secrets,
scan_naive_injection,
scan_token_patterns,
@@ -209,6 +211,115 @@ class TestEncodedVariants(unittest.TestCase):
self.assertEqual(len(v), len(set(v)))
class TestScanTokenPatternsExtended(unittest.TestCase):
def test_huggingface_token(self):
result = scan_token_patterns("token=hf_" + "A" * 34) # gitleaks:allow
assert result is not None
self.assertIn("HuggingFace", result.reason)
def test_databricks_token(self):
result = scan_token_patterns("dapi" + "a" * 32) # gitleaks:allow
assert result is not None
self.assertIn("Databricks", result.reason)
def test_slack_bot_token(self):
# Use all-zero numeric segments to keep entropy low
result = scan_token_patterns("xoxb-00000000000-00000000000-" + "A" * 24) # gitleaks:allow
assert result is not None
self.assertIn("Slack", result.reason)
def test_npm_token(self):
result = scan_token_patterns("npm_" + "A" * 36) # gitleaks:allow
assert result is not None
self.assertIn("npm", result.reason)
def test_sendgrid_key(self):
result = scan_token_patterns("SG." + "A" * 22 + "." + "B" * 43) # gitleaks:allow
assert result is not None
self.assertIn("SendGrid", result.reason)
def test_pypi_token(self):
result = scan_token_patterns("pypi-" + "A" * 80) # gitleaks:allow
assert result is not None
self.assertIn("PyPI", result.reason)
def test_vault_token(self):
result = scan_token_patterns("hvs." + "A" * 24) # gitleaks:allow
assert result is not None
self.assertIn("Vault", result.reason)
class TestUnicodeNormalization(unittest.TestCase):
def test_fullwidth_chars_normalized(self):
# Fullwidth ASCII chars (U+FF21..U+FF3A) should map to ASCII
fullwidth_A = "" # FULLWIDTH LATIN CAPITAL LETTER A
# NFKD maps fullwidth A → A, so AKIA pattern becomes detectable
result = scan_token_patterns(fullwidth_A + "KIA" + "0" * 16)
assert result is not None
self.assertIn("AWS", result.reason)
def test_combining_marks_stripped(self):
# Combining mark inserted between chars (e.g. A + combining grave)
secret = "AKIA" + "̀" + "0" * 16 # AKIA with combining grave after A
normalized = _normalize_text(secret)
# Combining mark is stripped → AKIA0...0 is visible to regex
self.assertNotIn("̀", normalized)
result = scan_token_patterns(secret)
assert result is not None
self.assertIn("AWS", result.reason)
def test_control_chars_stripped(self):
# Null byte inserted to split a token
secret = "AK\x00IA" + "0" * 16
normalized = _normalize_text(secret)
self.assertNotIn("\x00", normalized)
def test_common_whitespace_preserved(self):
normalized = _normalize_text("line1\nline2\r\nline3\t end")
self.assertIn("\n", normalized)
self.assertIn("\r\n", normalized)
self.assertIn("\t", normalized)
def test_clean_text_unchanged(self):
text = "hello world 123"
self.assertEqual(text, _normalize_text(text))
class TestScanCrlfInjection(unittest.TestCase):
def test_url_encoded_crlf_lowercase(self):
result = scan_crlf_injection("/path?next=%0d%0aX-Injected: evil")
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("%0d%0a", result.reason)
def test_url_encoded_crlf_uppercase(self):
result = scan_crlf_injection("/path?next=%0D%0AX-Injected: evil")
assert result is not None
self.assertEqual("block", result.severity)
def test_url_encoded_crlf_mixed_case(self):
result = scan_crlf_injection("redirect=%0d%0ASet-Cookie: session=x")
assert result is not None
self.assertEqual("block", result.severity)
def test_literal_crlf_header_injection(self):
result = scan_crlf_injection("value\r\nX-Injected: evil")
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("header injection", result.reason)
def test_literal_crlf_in_body_not_flagged(self):
# Plain CRLF without a following header-like pattern is not flagged
# (legitimate in Windows text or multipart bodies)
self.assertIsNone(scan_crlf_injection("line1\r\nline2\r\nline3"))
def test_clean_url_returns_none(self):
self.assertIsNone(scan_crlf_injection("/api/v1/data?q=hello+world"))
def test_clean_body_returns_none(self):
self.assertIsNone(scan_crlf_injection('{"key": "value", "other": "data"}'))
class TestKnownSecretsNewVariants(unittest.TestCase):
SECRET = "super-secret-token"
ENV = {"EGRESS_TOKEN_0": SECRET}
+29
View File
@@ -855,6 +855,35 @@ class TestScanOutbound(unittest.TestCase):
assert result is not None
self.assertEqual("block", result.severity)
def test_crlf_in_query_blocked(self):
# CRLF injection attempt via URL-encoded %0d%0a in a query param
text = build_outbound_scan_text(
host="api.example.com",
path="/search",
query="next=%0d%0aX-Injected%3A+evil",
headers={},
body="",
)
result = scan_outbound(_ROUTE, text, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_crlf_blocked_even_when_detectors_disabled(self):
# CRLF scan runs unconditionally; outbound_detectors: false doesn't skip it
route = Route(host="api.example.com", outbound_detectors=())
text = build_outbound_scan_text(
host="api.example.com",
path="/data",
query="",
headers={"x-redirect": "value\r\nX-Injected: evil"},
body="",
)
result = scan_outbound(route, text, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
# --- build_inbound_scan_text --------------------------------------------