330e836085
- _alnum_projection(): strip non-alphanumeric chars for separator-injection detection
- scan_known_secrets() gains two extra passes per secret after exact-variant matching:
alnum-projection exact match (catches hyphens/spaces between secret chars) and a
sliding-window partial-match scan (catches chunked substrings ≥ PARTIAL_MATCH_MIN_LEN)
- scan_known_secrets() accepts sensitive_prefixes param (default ("EGRESS_TOKEN_",))
so redact_tokens and call-sites can extend the scanned env-var prefix set
- scan_entropy() warn-only detector flagging windows with Shannon entropy ≥ 5.5 bits/char
- "entropy" added to OUTBOUND_DETECTOR_NAMES; scan_outbound opts it in only when
explicitly listed in dlp.outbound_detectors (never part of the default "all" set)
- scan_outbound reads BOT_BOTTLE_SENSITIVE_PREFIXES from environ to extend
scan_known_secrets beyond EGRESS_TOKEN_* without schema changes
- Binary bodies decoded via latin-1 fallback (bijective byte↔codepoint) instead
of utf-8 errors=replace, preserving ASCII secret strings in binary payloads
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
469 lines
17 KiB
Python
469 lines
17 KiB
Python
"""DLP detectors for the egress proxy (PRD 0053, prd-new).
|
|
|
|
Pure Python, no mitmproxy dependency. Each detector is a module-level
|
|
function returning `ScanResult | None`.
|
|
|
|
Ships flat into the sidecar bundle image alongside
|
|
`egress_addon_core.py` — both this file and the package source use
|
|
the same try/except import shim pattern.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import gzip
|
|
import re
|
|
import typing
|
|
import unicodedata
|
|
from math import log2
|
|
from collections import Counter
|
|
from urllib.parse import quote as url_quote
|
|
|
|
try:
|
|
from egress_addon_core import ScanResult # type: ignore[import-not-found]
|
|
except ImportError: # pragma: no cover - host-side path
|
|
from .egress_addon_core import ScanResult
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Snippet helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Width of the context kept on EACH side of a match by _snippet(), so a
# snippet is at most 2 * SNIPPET_CONTEXT + len(REDACT) characters long.
SNIPPET_CONTEXT = 40  # chars of surrounding text to include on each side
# Placeholder substituted for sensitive values. Its width is fixed, so it does
# not reveal the length of whatever it replaces.
REDACT = "********"  # fixed-width replacement for the matched sensitive value
|
|
|
|
|
|
def _snippet(text: str, start: int, end: int) -> str:
    """Build a one-line excerpt around ``text[start:end]`` with the match hidden.

    Up to SNIPPET_CONTEXT characters are kept on each side of the span. Any
    CR or LF inside that context is turned into a space, and the span itself
    is replaced by REDACT so the sensitive value never reaches the excerpt.
    """

    def _flatten(fragment: str) -> str:
        # Keep the excerpt on a single line.
        return fragment.replace("\n", " ").replace("\r", " ")

    lead_from = max(0, start - SNIPPET_CONTEXT)
    lead = _flatten(text[lead_from:start])
    trail = _flatten(text[end:end + SNIPPET_CONTEXT])
    return "".join((lead, REDACT, trail))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Unicode normalization (defeats confusable-char and combining-mark evasion)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _normalize_text(text: str) -> str:
    """Return *text* in a canonical form suitable for pattern matching.

    The text is NFKD-normalized, which splits base characters from their
    combining marks and folds compatibility forms (fullwidth ASCII,
    ligatures, ...) to their plain equivalents. Characters that can be
    inserted inside a token without changing how it renders are then dropped:

    * ``Mn`` / ``Me`` - non-spacing and enclosing combining marks
    * ``Cf`` - format characters such as ZERO WIDTH SPACE (U+200B),
      ZERO WIDTH JOINER (U+200D), SOFT HYPHEN (U+00AD) and the BOM (U+FEFF)
    * ``Cc`` - control characters, except newline, carriage return and tab

    The result is meant for scanning only; it is not a faithful copy of the
    input.
    """
    invisible = ("Mn", "Me", "Cf")
    kept_controls = "\n\r\t"

    def _keep(ch: str) -> bool:
        category = unicodedata.category(ch)
        if category in invisible:
            return False
        # Strip control chars; keep common whitespace (\n \r \t).
        return category != "Cc" or ch in kept_controls

    decomposed = unicodedata.normalize("NFKD", text)
    return "".join(ch for ch in decomposed if _keep(ch))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Token patterns detector
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# (label, compiled regex) pairs for well-known credential formats.
#
# scan_token_patterns() walks this tuple in order and reports the first match
# that is not approved, so the label of an earlier entry wins when several
# entries could match the same text. redact_tokens() applies every pattern.
# The patterns are unanchored: they match anywhere inside a longer string.
TOKEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = (
    ("AWS access key", re.compile(r"AKIA[0-9A-Z]{16}")),
    ("GitHub token (classic)", re.compile(r"ghp_[A-Za-z0-9_]{36}")),
    ("GitHub fine-grained token", re.compile(r"github_pat_[A-Za-z0-9_]{82}")),
    ("Anthropic API key", re.compile(r"sk-ant-[A-Za-z0-9\-_]{93}")),
    ("OpenAI API key", re.compile(r"sk-[A-Za-z0-9]{48}")),
    ("OpenAI project API key", re.compile(r"sk-proj-[A-Za-z0-9_\-]{48,}")),
    ("Stripe live key", re.compile(r"sk_live_[A-Za-z0-9]{24}")),
    # Matches any long Bearer credential, not only three-part JWTs.
    ("Generic Bearer JWT", re.compile(r"Bearer\s+[A-Za-z0-9._\-]{50,}")),
    ("HuggingFace token", re.compile(r"hf_[A-Za-z0-9]{34,}")),
    ("Databricks token", re.compile(r"dapi[A-Za-z0-9]{32}")),
    ("Slack token", re.compile(r"xox[baprs]-[A-Za-z0-9]+-[A-Za-z0-9]+-[A-Za-z0-9]{24,}")),
    ("npm token", re.compile(r"npm_[A-Za-z0-9]{36}")),
    ("SendGrid API key", re.compile(r"SG\.[A-Za-z0-9_\-]{22}\.[A-Za-z0-9_\-]{43}")),
    ("PyPI token", re.compile(r"pypi-[A-Za-z0-9_\-]{80,}")),
    ("HashiCorp Vault token", re.compile(r"hvs\.[A-Za-z0-9_\-]{24,}")),
)
|
|
|
|
|
|
def scan_token_patterns(
    text: str,
    *,
    location: str = "body",
    safe_tokens: typing.AbstractSet[str] | None = None,
) -> ScanResult | None:
    """Look for a well-known credential format in *text*.

    The text is normalized with _normalize_text() and every entry of
    TOKEN_PATTERNS is tried in order. The first match whose exact value is
    not in *safe_tokens* produces a ``block`` result; its ``context`` is a
    redacted snippet of the normalized text and ``matched`` is the value.

    Args:
        text: Content to scan.
        location: Label naming where *text* came from; copied into the result.
        safe_tokens: Exact values the supervisor has approved (PRD 0062).

    Returns:
        A ``block`` ScanResult, or None when nothing un-approved is found.
    """
    haystack = _normalize_text(text)
    approved = frozenset() if safe_tokens is None else safe_tokens

    candidates = (
        (label, hit)
        for label, regex in TOKEN_PATTERNS
        for hit in regex.finditer(haystack)
    )
    for label, hit in candidates:
        token = hit.group(0)
        if token in approved:
            # An approved value is not a block, but keep going: a second,
            # un-approved token in the same request must still be caught.
            continue
        return ScanResult(
            severity="block",
            reason=f"{label} found in {location}",
            location=location,
            context=_snippet(haystack, hit.start(), hit.end()),
            matched=token,
        )
    return None
|
|
|
|
|
|
def redact_tokens(
    text: str,
    *,
    env: typing.Mapping[str, str] | None = None,
    sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> str:
    """Return *text* with credentials replaced by REDACT.

    Every TOKEN_PATTERNS match is replaced first. When *env* is given, each
    non-empty value whose key starts with one of *sensitive_prefixes* is then
    replaced too, in every form produced by _encoded_variants().

    Args:
        text: Content to scrub.
        env: Environment-style mapping holding provisioned secrets.
        sensitive_prefixes: Key prefixes that mark a value as a secret.
    """
    scrubbed = text
    for _label, regex in TOKEN_PATTERNS:
        scrubbed = regex.sub(REDACT, scrubbed)

    if env is None:
        return scrubbed

    for name, secret in env.items():
        is_sensitive = any(name.startswith(prefix) for prefix in sensitive_prefixes)
        if not (is_sensitive and secret):
            continue
        for encoded in _encoded_variants(secret):
            scrubbed = scrubbed.replace(encoded, REDACT)
    return scrubbed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Known secrets detector (Phase 1b, prd-new)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _encoded_variants(secret: str) -> list[str]:
    """Return *secret* plus common encoded forms of it, for exfil detection.

    The list is de-duplicated and ordered. The plain secret is always first,
    followed by:

    * standard and URL-safe Base64, each with and without ``=`` padding
    * URL percent-encoding with no characters treated as safe
    * lowercase and uppercase hex of the UTF-8 bytes
    * Base32 with padding
    * Base64 of the gzip stream (``mtime=0`` so the header carries no
      timestamp)
    * Base32 without padding, and both Base32 forms in lowercase

    The last group is appended after the others so that the positions of the
    earlier variants do not change. It exists because ``=`` is not a valid
    DNS label character and DNS names are case-insensitive, so a Base32 DNS
    channel normally carries the unpadded and/or lowercase form.
    """
    raw = secret.encode("utf-8")

    b64 = base64.b64encode(raw).decode("ascii")
    # URL-safe alphabet (-_), as used by JWT/OAuth.
    b64url = base64.urlsafe_b64encode(raw).decode("ascii")
    hex_lower = raw.hex()
    # Base32 (TOTP seeds, some DNS-exfil channels).
    b32 = base64.b32encode(raw).decode("ascii")
    b32_bare = b32.rstrip("=")
    # gzip + base64; recognisable by its H4sI prefix.
    gz_b64 = base64.b64encode(gzip.compress(raw, mtime=0)).decode("ascii")

    candidates = (
        secret,
        b64,
        b64.rstrip("="),
        b64url,
        b64url.rstrip("="),
        url_quote(secret, safe=""),
        hex_lower,
        hex_lower.upper(),
        b32,
        gz_b64,
        b32_bare,
        b32.lower(),
        b32_bare.lower(),
    )
    # dict.fromkeys drops duplicates while keeping first-seen order.
    return list(dict.fromkeys(candidates))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fragmentation-resistant helpers (prd-new)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Minimum length of alnum projection for projection-based checks to run.
# Short secrets produce too many false positives in projection space.
# scan_known_secrets() skips both projection passes for a secret whose
# alphanumeric projection is shorter than this.
_ALNUM_MIN_LEN = 8

# Minimum window length for the partial-substring sliding scan.
# A secret whose projection is shorter than this can never produce a partial
# match (see _find_partial_window).
PARTIAL_MATCH_MIN_LEN = 12
|
|
|
|
|
|
def _alnum_projection(text: str) -> str:
    """Drop every character of *text* for which ``str.isalnum`` is false.

    This is the basis of fragmentation-resistant matching: a secret with
    separators (spaces, hyphens, dots) inserted between its characters
    projects to the same string as the untouched secret.
    """
    return "".join(filter(str.isalnum, text))
|
|
|
|
|
|
def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None:
|
|
"""Return the position in text_alnum where any min_len-char window of
|
|
secret_alnum first appears, or None.
|
|
|
|
Slides a window of width min_len across secret_alnum and searches for
|
|
each window in text_alnum. The first hit position is returned.
|
|
"""
|
|
if len(secret_alnum) < min_len or len(text_alnum) < min_len:
|
|
return None
|
|
for i in range(len(secret_alnum) - min_len + 1):
|
|
window = secret_alnum[i:i + min_len]
|
|
pos = text_alnum.find(window)
|
|
if pos >= 0:
|
|
return pos
|
|
return None
|
|
|
|
|
|
def _matched_run_bounds(
    secret_alnum: str,
    text_alnum: str,
    pos: int,
    min_len: int,
) -> tuple[int, int]:
    """Grow a partial-match window to the whole run of leaked secret chars.

    ``text_alnum[pos:pos + min_len]`` is a window known to be a substring of
    *secret_alnum*. The window is extended in both directions for as long as
    the text keeps agreeing with the secret, and the resulting
    ``(start, end)`` slice bounds in *text_alnum* are returned.
    """
    start, end = pos, pos + min_len
    offset = secret_alnum.find(text_alnum[start:end])
    if offset < 0:
        # Not a secret substring after all; fall back to the bare window.
        return start, end

    secret_start, secret_end = offset, offset + min_len
    while (
        start > 0
        and secret_start > 0
        and text_alnum[start - 1] == secret_alnum[secret_start - 1]
    ):
        start -= 1
        secret_start -= 1
    while (
        end < len(text_alnum)
        and secret_end < len(secret_alnum)
        and text_alnum[end] == secret_alnum[secret_end]
    ):
        end += 1
        secret_end += 1
    return start, end


def scan_known_secrets(
    text: str,
    *,
    location: str = "body",
    env: typing.Mapping[str, str] | None = None,
    sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
    safe_tokens: typing.AbstractSet[str] | None = None,
) -> ScanResult | None:
    """Look for provisioned secrets from *env* inside *text*.

    Each non-empty *env* value whose key starts with one of
    *sensitive_prefixes* is checked in three passes:

    1. Exact match of the secret or any of its _encoded_variants().
    2. Exact match of the secret's alphanumeric projection in the text's
       alphanumeric projection (catches separator injection).
    3. Any PARTIAL_MATCH_MIN_LEN-character window of the secret's projection
       in the text's projection (catches chunked leaks).

    Passes 2 and 3 are skipped for secrets whose projection is shorter than
    _ALNUM_MIN_LEN.

    Variants listed in *safe_tokens* were approved by the supervisor
    (PRD 0062). An approved variant does not block in pass 1, and its
    occurrences are removed from the text before passes 2 and 3 so that the
    approved copy cannot resurface as a "fragmented" or "partial" match. Any
    other encoding, or a separator-injected copy elsewhere in the text, is
    still a fresh block.

    Args:
        text: Content to scan.
        location: Label naming where *text* came from; copied into the result.
        env: Environment-style mapping holding provisioned secrets. When
            None, nothing is scanned.
        sensitive_prefixes: Key prefixes that mark a value as a secret.
        safe_tokens: Exact variant strings the supervisor has approved.

    Returns:
        A ``block`` ScanResult for the first hit, or None.
    """
    if env is None:
        return None

    approved = safe_tokens if safe_tokens is not None else frozenset()

    # Alnum projection of the unmodified text; computed lazily, reused for
    # every secret that has no approved occurrence in the text.
    shared_projection: str | None = None

    for key, value in env.items():
        if not any(key.startswith(p) for p in sensitive_prefixes) or not value:
            continue

        # Pass 1: exact match across encoded variants.
        approved_present: list[str] = []
        for variant in _encoded_variants(value):
            pos = text.find(variant)
            if pos < 0:
                continue
            if variant in approved:
                # The supervisor approves the exact encoded variant found;
                # a different encoding of the same secret is a fresh block.
                approved_present.append(variant)
                continue
            return ScanResult(
                severity="block",
                reason=f"provisioned secret from {key} found in {location}",
                location=location,
                context=_snippet(text, pos, pos + len(variant)),
                matched=variant,
            )

        # Pass 2 & 3: fragmentation-resistant projection checks.
        secret_alnum = _alnum_projection(value)
        if len(secret_alnum) < _ALNUM_MIN_LEN:
            continue

        if approved_present:
            # Cut the approved occurrences out first (longest variant first,
            # so a padded form goes before its unpadded prefix).
            masked = text
            for variant in sorted(approved_present, key=len, reverse=True):
                masked = masked.replace(variant, " ")
            text_alnum = _alnum_projection(masked)
        else:
            if shared_projection is None:
                shared_projection = _alnum_projection(text)
            text_alnum = shared_projection

        # Pass 2: full alnum-projection exact match (separator injection).
        pos2 = text_alnum.find(secret_alnum)
        if pos2 >= 0:
            return ScanResult(
                severity="block",
                reason=(
                    f"provisioned secret from {key} found in {location} "
                    f"(fragmented match — separator injection)"
                ),
                location=location,
                context=_snippet(text_alnum, pos2, pos2 + len(secret_alnum)),
            )

        # Pass 3: sliding-window partial match (chunked-substring leaks).
        pos3 = _find_partial_window(secret_alnum, text_alnum, PARTIAL_MATCH_MIN_LEN)
        if pos3 is not None:
            # Redact the whole leaked run, not just the first window;
            # otherwise the remaining secret characters would be copied
            # verbatim into the snippet context.
            run_start, run_end = _matched_run_bounds(
                secret_alnum, text_alnum, pos3, PARTIAL_MATCH_MIN_LEN
            )
            return ScanResult(
                severity="block",
                reason=(
                    f"provisioned secret from {key} found in {location} "
                    f"(partial match — at least {PARTIAL_MATCH_MIN_LEN} consecutive "
                    f"alphanumeric chars)"
                ),
                location=location,
                context=_snippet(text_alnum, run_start, run_end),
            )

    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entropy detector (warn-only, prd-new)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Sliding window size and step for the entropy scan.
# NOTE(review): scan_entropy() derives its step as max(1, window // 2) and
# does not read ENTROPY_STEP; the two agree only for the default window.
ENTROPY_WINDOW = 64
ENTROPY_STEP = 32

# Bits-per-character threshold. Random ASCII printable ≈ 6.6 bits; random
# lowercase hex ≈ 4 bits; random base64url ≈ 6 bits. 5.5 sits above
# typical structured data (JSON, URLs) while staying below truly random
# content.
# The figures above are source entropies. The entropy measured on a window
# of W characters cannot exceed log2(W), i.e. 6.0 bits for the default
# 64-character window, and is lower whenever a character repeats.
# NOTE(review): despite the name, scan_entropy() only ever returns
# severity="warn"; confirm the 5.5 default against real traffic.
ENTROPY_BLOCK_THRESHOLD = 5.5
|
|
|
|
|
|
def _shannon_entropy(text: str) -> float:
    """Return the Shannon entropy of *text* in bits per character.

    The entropy is computed from the character frequencies observed in
    *text* itself. An empty string yields 0.0.
    """
    total = len(text)
    if total == 0:
        return 0.0
    probabilities = [count / total for count in Counter(text).values()]
    return -sum(p * log2(p) for p in probabilities)
|
|
|
|
|
|
def scan_entropy(
    text: str,
    *,
    location: str = "body",
    window: int = ENTROPY_WINDOW,
    threshold: float = ENTROPY_BLOCK_THRESHOLD,
) -> ScanResult | None:
    """Warn-only detector for high-entropy stretches of *text*.

    Overlapping windows of *window* characters, advancing by half a window
    (at least one character), are measured with _shannon_entropy(). A final
    window aligned to the end of the text is added when the stride does not
    land on it; text shorter than *window* is measured as a single chunk.
    The first window whose entropy is at or above *threshold* bits per
    character is reported.

    Never blocks; the result always has severity='warn'. Disabled by
    default — routes must opt in via dlp.outbound_detectors=['entropy'].

    Returns:
        A ``warn`` ScanResult whose context redacts the offending window,
        or None when *text* is empty or no window reaches the threshold.
    """
    if not text:
        return None

    length = len(text)
    stride = max(1, window // 2)

    if length < window:
        offsets = [0]
    else:
        final_offset = length - window
        offsets = list(range(0, final_offset + 1, stride))
        if final_offset % stride != 0:
            # The stride skipped the tail; measure it explicitly.
            offsets.append(final_offset)

    for offset in offsets:
        sample = text[offset:offset + window]
        if _shannon_entropy(sample) < threshold:
            continue
        return ScanResult(
            severity="warn",
            reason=f"high-entropy content in {location} (possible encrypted exfil)",
            location=location,
            context=_snippet(text, offset, offset + len(sample)),
        )
    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Naive prompt injection detector (Phase 2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Case-insensitive phrases that refer to a prompt or instructions being
# disclosed. scan_naive_injection() warns on any hit on its own.
DISCLOSURE_PHRASES: tuple[re.Pattern[str], ...] = (
    re.compile(r"(?i)system\s+prompt"),
    re.compile(r"(?i)my\s+instructions\s+are"),
    re.compile(r"(?i)original\s+instructions"),
    re.compile(r"(?i)secret\s+instructions"),
    re.compile(r"(?i)hidden\s+rules"),
)

# Case-insensitive phrases that try to override earlier instructions or
# impose a persona. scan_naive_injection() warns on any hit on its own.
JAILBREAK_PHRASES: tuple[re.Pattern[str], ...] = (
    re.compile(r"(?i)ignore\s+previous"),
    re.compile(r"(?i)forget\s+everything"),
    re.compile(r"(?i)disregard\s+(?:all\s+)?(?:previous|prior)"),
    re.compile(r"(?i)pretend\s+you\s+are"),
    re.compile(r"(?i)act\s+as\s+(?:if|though)"),
)


# Largest gap, in characters, between a disclosure phrase and a jailbreak
# phrase for scan_naive_injection() to escalate from "warn" to "block".
PROXIMITY_CHARS = 500
|
|
|
|
|
|
def _closest_pair(
|
|
a_matches: list[re.Match[str]],
|
|
b_matches: list[re.Match[str]],
|
|
) -> tuple[re.Match[str], re.Match[str]] | None:
|
|
"""Return the pair (a, b) with the smallest character gap, or None."""
|
|
best: tuple[re.Match[str], re.Match[str]] | None = None
|
|
best_gap: int | None = None
|
|
for a in a_matches:
|
|
for b in b_matches:
|
|
gap = max(0, max(a.start(), b.start()) - min(a.end(), b.end()))
|
|
if best_gap is None or gap < best_gap:
|
|
best_gap = gap
|
|
best = (a, b)
|
|
return best
|
|
|
|
|
|
def scan_naive_injection(text: str) -> ScanResult | None:
    """Naive prompt-injection check based on two phrase lists.

    * A DISCLOSURE_PHRASES hit and a JAILBREAK_PHRASES hit whose closest
      pair lies within PROXIMITY_CHARS characters -> ``block``; the context
      snippet redacts whichever of the two phrases comes first.
    * Otherwise any disclosure hit -> ``warn``.
    * Otherwise any jailbreak hit -> ``warn``.
    * Otherwise None.

    The location reported is always "response body".
    """
    location = "response body"
    disclosures = [hit for rx in DISCLOSURE_PHRASES for hit in rx.finditer(text)]
    jailbreaks = [hit for rx in JAILBREAK_PHRASES for hit in rx.finditer(text)]

    nearest = None
    if disclosures and jailbreaks:
        nearest = _closest_pair(disclosures, jailbreaks)

    if nearest is not None:
        left, right = nearest
        later_start = max(left.start(), right.start())
        earlier_end = min(left.end(), right.end())
        dist = max(0, later_start - earlier_end)
        if dist <= PROXIMITY_CHARS:
            first = left if left.start() <= right.start() else right
            return ScanResult(
                severity="block",
                reason=(
                    f"disclosure and jailbreak phrases within "
                    f"{dist} chars in {location}"
                ),
                location=location,
                context=_snippet(text, first.start(), first.end()),
            )

    # No nearby pair: fall back to a warning, disclosure phrases first.
    fallbacks = (
        (disclosures, f"prompt disclosure phrase detected in {location}"),
        (jailbreaks, f"jailbreak phrase detected in {location}"),
    )
    for hits, reason in fallbacks:
        if not hits:
            continue
        lead = hits[0]
        return ScanResult(
            severity="warn",
            reason=reason,
            location=location,
            context=_snippet(text, lead.start(), lead.end()),
        )

    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CRLF injection detector
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# URL-encoded CRLF is never legitimate in a request URL or header value.
_CRLF_ENCODED_RE = re.compile(r"%0[dD]%0[aA]", re.ASCII)
# Literal CRLF followed by a header-name pattern indicates header injection.
_CRLF_HEADER_INJECT_RE = re.compile(r"\r\n[A-Za-z][A-Za-z0-9\-]+\s*:", re.ASCII)


def strip_crlf(text: str) -> str:
    """Remove URL-encoded and literal CRLF injection sequences from a request
    surface (PRD 0062 redact policy). Used to scrub the request line / headers
    so the request can be forwarded instead of hard-blocked.

    Two things are removed: every ``%0d%0a`` (either letter case), and the
    CR LF in front of anything that looks like a header name followed by a
    colon (the header text itself is kept).

    Both removals are repeated until the text stops changing. One pass is
    not enough, because deleting a sequence can splice its neighbours into a
    new one: ``%0d%0%0d%0aa`` collapses to ``%0d%0a``, and ``%0d%0\\r\\nab:``
    collapses to ``%0d%0ab:``. Each pass that changes the text shortens it,
    so the loop always terminates.
    """
    while True:
        cleaned = _CRLF_ENCODED_RE.sub("", text)
        # Drop only the leading CR LF (2 chars) of each header-injection hit.
        cleaned = _CRLF_HEADER_INJECT_RE.sub(lambda m: m.group(0)[2:], cleaned)
        if cleaned == text:
            return cleaned
        text = cleaned
|
|
|
|
|
|
def scan_crlf_injection(text: str) -> ScanResult | None:
    """Detect CRLF injection in an outbound request surface.

    A URL-encoded CR LF pair is checked first, then a literal CR LF followed
    by a header-name-and-colon pattern. Either one yields a ``block`` result
    that carries only a severity and a reason. Returns None when neither
    pattern is present.
    """
    checks = (
        (_CRLF_ENCODED_RE, "URL-encoded CRLF (%0d%0a) in outbound request"),
        (_CRLF_HEADER_INJECT_RE, "CRLF header injection pattern in outbound request"),
    )
    for detector, reason in checks:
        if detector.search(text) is not None:
            return ScanResult(severity="block", reason=reason)
    return None
|
|
|
|
|
|
# Names exported by `from <module> import *`.
# NOTE(review): two underscore-prefixed helpers (_alnum_projection,
# _shannon_entropy) are exported here while the other private helpers are
# not — presumably for tests or the sidecar bundle; confirm.
__all__ = [
    "ENTROPY_BLOCK_THRESHOLD",
    "ENTROPY_WINDOW",
    "ENTROPY_STEP",
    "PARTIAL_MATCH_MIN_LEN",
    "REDACT",
    "SNIPPET_CONTEXT",
    "TOKEN_PATTERNS",
    "_alnum_projection",
    "_shannon_entropy",
    "redact_tokens",
    "scan_crlf_injection",
    "scan_entropy",
    "scan_known_secrets",
    "scan_naive_injection",
    "scan_token_patterns",
    "strip_crlf",
]
|