Files
bot-bottle/bot_bottle/dlp_detectors.py
T
didericis-claude 6d8c4d62bf
lint / lint (push) Failing after 1m36s
test / unit (pull_request) Successful in 37s
test / integration (pull_request) Successful in 52s
perf(dlp): replace O(n*m) proximity check with O(n log n) sorted scan
Sort all match positions and scan linearly instead of checking every
a-b pair. Early-exits on overlap (gap=0) or as soon as a gap at or below
the threshold is found.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 21:01:56 +00:00

181 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""DLP detectors for the egress proxy (PRD 0053).
Pure Python, no mitmproxy dependency. Each detector is a module-level
function returning `ScanResult | None`.
Ships flat into the sidecar bundle image alongside
`egress_addon_core.py` — both this file and the package source use
the same try/except import shim pattern.
"""
from __future__ import annotations
import base64
import re
import typing
from urllib.parse import quote as url_quote
try:
from egress_addon_core import ScanResult # type: ignore[import-not-found]
except ImportError: # pragma: no cover - host-side path
from .egress_addon_core import ScanResult
# ---------------------------------------------------------------------------
# Token patterns detector (Phase 1a)
# ---------------------------------------------------------------------------
# Table of (label, compiled regex) pairs, checked in order by
# scan_token_patterns. The label is interpolated into the block reason,
# so it should read naturally after "outbound request contains ".
TOKEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = tuple(
    (label, re.compile(source))
    for label, source in (
        ("AWS access key", r"AKIA[0-9A-Z]{16}"),
        ("GitHub token (classic)", r"ghp_[A-Za-z0-9_]{36}"),
        ("GitHub fine-grained token", r"github_pat_[A-Za-z0-9_]{82}"),
        ("Anthropic API key", r"sk-ant-[A-Za-z0-9\-_]{93}"),
        ("OpenAI API key", r"sk-[A-Za-z0-9]{48}"),
        ("Stripe live key", r"sk_live_[A-Za-z0-9]{24}"),
        ("Generic Bearer JWT", r"Bearer\s+[A-Za-z0-9._\-]{50,}"),
    )
)
def scan_token_patterns(text: str) -> ScanResult | None:
    """Check *text* against every entry in ``TOKEN_PATTERNS``.

    Returns a ``ScanResult`` with severity ``"block"`` whose reason names
    the first pattern (in table order) found anywhere in *text*, or
    ``None`` when no pattern matches.
    """
    # Labels in the table are always strings, so None is a safe
    # "nothing matched" sentinel here.
    matched_label = next(
        (label for label, regex in TOKEN_PATTERNS if regex.search(text)),
        None,
    )
    if matched_label is None:
        return None
    return ScanResult(
        severity="block",
        reason=f"outbound request contains {matched_label}",
    )
# ---------------------------------------------------------------------------
# Known secrets detector (Phase 1b)
# ---------------------------------------------------------------------------
def _encoded_variants(secret: str) -> list[str]:
    """List the forms of *secret* that the known-secrets scan looks for.

    The result always starts with *secret* itself, followed by (in this
    order) its standard base64 encoding, its percent-encoding with no
    characters treated as safe, and the lowercase hex of its UTF-8 bytes.
    An encoded form identical to the plain secret is left out.
    """
    raw = secret.encode("utf-8")
    encoded_forms = (
        base64.b64encode(raw).decode("ascii"),
        url_quote(secret, safe=""),
        raw.hex(),
    )
    return [secret, *(form for form in encoded_forms if form != secret)]
def scan_known_secrets(
    text: str,
    *,
    env: typing.Mapping[str, str] | None = None,
) -> ScanResult | None:
    """Look for provisioned secrets from *env* inside *text*.

    Only entries whose key starts with ``EGRESS_TOKEN_`` and whose value
    is non-empty are considered. Each such value is searched for in
    *text* in every form produced by ``_encoded_variants``.

    Returns a ``ScanResult`` with severity ``"block"`` naming the first
    key whose secret is found, or ``None`` when *env* is ``None`` or
    nothing is found.
    """
    if env is None:
        return None
    for var_name, secret in env.items():
        # Key prefix is tested first so unrelated variables are never
        # inspected any further.
        if var_name.startswith("EGRESS_TOKEN_") and secret:
            if any(form in text for form in _encoded_variants(secret)):
                return ScanResult(
                    severity="block",
                    reason=(
                        f"outbound request contains provisioned secret "
                        f"from {var_name}"
                    ),
                )
    return None
# ---------------------------------------------------------------------------
# Naive prompt injection detector (Phase 2)
# ---------------------------------------------------------------------------
# Case-insensitive phrases treated as prompt-disclosure indicators by
# scan_naive_injection.
DISCLOSURE_PHRASES: tuple[re.Pattern[str], ...] = tuple(
    map(
        re.compile,
        (
            r"(?i)system\s+prompt",
            r"(?i)my\s+instructions\s+are",
            r"(?i)original\s+instructions",
            r"(?i)secret\s+instructions",
            r"(?i)hidden\s+rules",
        ),
    )
)

# Case-insensitive phrases treated as jailbreak indicators by
# scan_naive_injection.
JAILBREAK_PHRASES: tuple[re.Pattern[str], ...] = tuple(
    map(
        re.compile,
        (
            r"(?i)ignore\s+previous",
            r"(?i)forget\s+everything",
            r"(?i)disregard\s+(?:all\s+)?(?:previous|prior)",
            r"(?i)pretend\s+you\s+are",
            r"(?i)act\s+as\s+(?:if|though)",
        ),
    )
)

# Largest character gap between a disclosure hit and a jailbreak hit at
# which scan_naive_injection escalates from "warn" to "block".
PROXIMITY_CHARS = 500
def _nearby(
a_matches: list[re.Match[str]],
b_matches: list[re.Match[str]],
threshold: int,
) -> int | None:
"""Return the smallest char gap between any ab pair, or None if
both lists are empty. O(n log n) via sort + linear scan."""
if not a_matches or not b_matches:
return None
events = sorted(
[(m.start(), m.end(), "a") for m in a_matches]
+ [(m.start(), m.end(), "b") for m in b_matches],
)
best: int | None = None
prev_end: int | None = None
prev_tag: str | None = None
for start, end, tag in events:
if prev_tag is not None and prev_tag != tag and prev_end is not None:
gap = max(0, start - prev_end)
if best is None or gap < best:
best = gap
if best == 0:
return 0
if best <= threshold:
return best
prev_end = end if prev_end is None else max(prev_end, end)
prev_tag = tag
return best
def scan_naive_injection(text: str) -> ScanResult | None:
    """Flag *text* that contains disclosure and/or jailbreak phrases.

    Returns:
        * a ``"block"`` result when both kinds of phrase occur and
          ``_nearby`` reports a gap of at most ``PROXIMITY_CHARS``;
        * otherwise a ``"warn"`` result for a disclosure phrase if any
          was found (this takes precedence over jailbreak phrases);
        * otherwise a ``"warn"`` result for a jailbreak phrase if any
          was found;
        * ``None`` when neither kind of phrase appears.
    """
    leaks = [hit for rx in DISCLOSURE_PHRASES for hit in rx.finditer(text)]
    overrides = [hit for rx in JAILBREAK_PHRASES for hit in rx.finditer(text)]

    if not (leaks or overrides):
        return None

    if leaks and overrides:
        gap = _nearby(leaks, overrides, PROXIMITY_CHARS)
        if gap is not None and gap <= PROXIMITY_CHARS:
            return ScanResult(
                severity="block",
                reason=(
                    f"disclosure and jailbreak phrases within "
                    f"{gap} chars in response"
                ),
            )

    # Only a warning from here on; disclosure wins when both are present
    # but too far apart to block.
    if leaks:
        warning = "prompt disclosure phrase detected in response"
    else:
        warning = "jailbreak phrase detected in response"
    return ScanResult(severity="warn", reason=warning)
# Public names of this module: the token pattern table and the three
# detector entry points. Helpers and the phrase tables are not listed.
__all__ = [
    "TOKEN_PATTERNS",
    "scan_known_secrets",
    "scan_naive_injection",
    "scan_token_patterns",
]