feat(dlp): add 7 token patterns, Unicode normalization, CRLF injection detection (PRD 0053)

Token patterns: HuggingFace (hf_), Databricks (dapi), Slack (xox[baprs]-),
npm (npm_), SendGrid (SG.x.y), PyPI (pypi-), HashiCorp Vault (hvs.).

Unicode normalization (_normalize_text) applies NFKD + strips combining
marks and control chars before pattern matching, defeating fullwidth-char
and combining-mark evasion.

CRLF injection (scan_crlf_injection) detects %0d%0a in URLs and literal
\r\n header-injection patterns; runs unconditionally in scan_outbound
regardless of outbound_detectors config.
This commit is contained in:
2026-06-06 18:37:37 +00:00
committed by didericis
parent 1ecef55fea
commit 451e6fc2fc
4 changed files with 205 additions and 4 deletions
+53 -2
View File
@@ -14,6 +14,7 @@ import base64
import gzip
import re
import typing
import unicodedata
from urllib.parse import quote as url_quote
try:
@@ -38,7 +39,24 @@ def _snippet(text: str, start: int, end: int) -> str:
# ---------------------------------------------------------------------------
# Token patterns detector (Phase 1a)
# Unicode normalization (defeats confusable-char and combining-mark evasion)
# ---------------------------------------------------------------------------
def _normalize_text(text: str) -> str:
# NFKD separates base characters from combining marks and resolves
# compatibility equivalents (fullwidth ASCII, ligatures, etc.)
decomposed = unicodedata.normalize("NFKD", text)
return "".join(
ch for ch in decomposed
# Strip combining marks inserted between chars to break patterns
if unicodedata.category(ch) != "Mn"
# Strip control chars; keep common whitespace (\n \r \t)
and (unicodedata.category(ch) != "Cc" or ch in "\n\r\t")
)
# ---------------------------------------------------------------------------
# Token patterns detector
# ---------------------------------------------------------------------------
TOKEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = (
@@ -50,12 +68,20 @@ TOKEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = (
("OpenAI project API key", re.compile(r"sk-proj-[A-Za-z0-9_\-]{48,}")),
("Stripe live key", re.compile(r"sk_live_[A-Za-z0-9]{24}")),
("Generic Bearer JWT", re.compile(r"Bearer\s+[A-Za-z0-9._\-]{50,}")),
("HuggingFace token", re.compile(r"hf_[A-Za-z0-9]{34,}")),
("Databricks token", re.compile(r"dapi[A-Za-z0-9]{32}")),
("Slack token", re.compile(r"xox[baprs]-[A-Za-z0-9]+-[A-Za-z0-9]+-[A-Za-z0-9]{24,}")),
("npm token", re.compile(r"npm_[A-Za-z0-9]{36}")),
("SendGrid API key", re.compile(r"SG\.[A-Za-z0-9_\-]{22}\.[A-Za-z0-9_\-]{43}")),
("PyPI token", re.compile(r"pypi-[A-Za-z0-9_\-]{80,}")),
("HashiCorp Vault token", re.compile(r"hvs\.[A-Za-z0-9_\-]{24,}")),
)
def scan_token_patterns(text: str, *, location: str = "body") -> ScanResult | None:
normalized = _normalize_text(text)
for name, pattern in TOKEN_PATTERNS:
m = pattern.search(text)
m = pattern.search(normalized)
if m is not None:
return ScanResult(
severity="block",
@@ -229,11 +255,36 @@ def scan_naive_injection(text: str) -> ScanResult | None:
return None
# ---------------------------------------------------------------------------
# CRLF injection detector
# ---------------------------------------------------------------------------
# URL-encoded CRLF is never legitimate in a request URL or header value.
_CRLF_ENCODED_RE = re.compile(r"%0[dD]%0[aA]", re.ASCII)
# Literal CRLF followed by a header-name pattern indicates header injection.
_CRLF_HEADER_INJECT_RE = re.compile(r"\r\n[A-Za-z][A-Za-z0-9\-]+\s*:", re.ASCII)
def scan_crlf_injection(text: str) -> ScanResult | None:
if _CRLF_ENCODED_RE.search(text):
return ScanResult(
severity="block",
reason="URL-encoded CRLF (%0d%0a) in outbound request",
)
if _CRLF_HEADER_INJECT_RE.search(text):
return ScanResult(
severity="block",
reason="CRLF header injection pattern in outbound request",
)
return None
__all__ = [
"REDACT",
"SNIPPET_CONTEXT",
"TOKEN_PATTERNS",
"redact_tokens",
"scan_crlf_injection",
"scan_known_secrets",
"scan_naive_injection",
"scan_token_patterns",
+12 -2
View File
@@ -574,15 +574,25 @@ def scan_outbound(
# at import time (the sidecar copies it flat alongside this file).
try:
from dlp_detectors import ( # type: ignore[import-not-found]
scan_token_patterns, scan_known_secrets,
scan_crlf_injection,
scan_known_secrets,
scan_token_patterns,
)
except ImportError: # pragma: no cover - host-side path
from .dlp_detectors import ( # type: ignore[import-not-found]
scan_token_patterns, scan_known_secrets,
scan_crlf_injection,
scan_known_secrets,
scan_token_patterns,
)
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
# CRLF injection is never legitimate — runs unconditionally, not gated
# by outbound_detectors config.
result = scan_crlf_injection(text)
if result is not None:
return result
if _detector_enabled(route.outbound_detectors, "token_patterns"):
result = scan_token_patterns(text, location="body")
if result is not None: