Files
bot-bottle/bot_bottle/dlp_detectors.py
T
didericis-claude abcb336e7c
lint / lint (push) Failing after 1m24s
test / unit (pull_request) Successful in 30s
test / integration (pull_request) Successful in 44s
fix(dlp): rework naive injection to proximity-based disclosure+jailbreak
Token detection is already handled by the token_patterns detector
running separately — calling it again from scan_naive_injection was
redundant. New logic:

- Warn on any disclosure phrase
- Warn on any jailbreak phrase
- Block when both appear within 500 chars of each other

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 20:34:21 +00:00

167 lines
5.2 KiB
Python

"""DLP detectors for the egress proxy (PRD 0053).
Pure Python, no mitmproxy dependency. Each detector is a module-level
function returning `ScanResult | None`.
Ships flat into the sidecar bundle image alongside
`egress_addon_core.py` — both this file and the package source use
the same try/except import shim pattern.
"""
from __future__ import annotations
import base64
import re
import typing
from urllib.parse import quote as url_quote
try:
from egress_addon_core import ScanResult # type: ignore[import-not-found]
except ImportError: # pragma: no cover - host-side path
from .egress_addon_core import ScanResult
# ---------------------------------------------------------------------------
# Token patterns detector (Phase 1a)
# ---------------------------------------------------------------------------
TOKEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = (
("AWS access key", re.compile(r"AKIA[0-9A-Z]{16}")),
("GitHub token (classic)", re.compile(r"ghp_[A-Za-z0-9_]{36}")),
("GitHub fine-grained token", re.compile(r"github_pat_[A-Za-z0-9_]{82}")),
("Anthropic API key", re.compile(r"sk-ant-[A-Za-z0-9\-_]{93}")),
("OpenAI API key", re.compile(r"sk-[A-Za-z0-9]{48}")),
("Stripe live key", re.compile(r"sk_live_[A-Za-z0-9]{24}")),
("Generic Bearer JWT", re.compile(r"Bearer\s+[A-Za-z0-9._\-]{50,}")),
)
def scan_token_patterns(text: str) -> ScanResult | None:
for name, pattern in TOKEN_PATTERNS:
if pattern.search(text):
return ScanResult(
severity="block",
reason=f"outbound request contains {name}",
)
return None
# ---------------------------------------------------------------------------
# Known secrets detector (Phase 1b)
# ---------------------------------------------------------------------------
def _encoded_variants(secret: str) -> list[str]:
"""Return the secret plus base64, URL-encoded, and hex variants."""
variants = [secret]
secret_bytes = secret.encode("utf-8")
b64 = base64.b64encode(secret_bytes).decode("ascii")
if b64 != secret:
variants.append(b64)
url_enc = url_quote(secret, safe="")
if url_enc != secret:
variants.append(url_enc)
hex_enc = secret_bytes.hex()
if hex_enc != secret:
variants.append(hex_enc)
return variants
def scan_known_secrets(
text: str,
*,
env: typing.Mapping[str, str] | None = None,
) -> ScanResult | None:
if env is None:
return None
for key, value in env.items():
if not key.startswith("EGRESS_TOKEN_") or not value:
continue
for variant in _encoded_variants(value):
if variant in text:
return ScanResult(
severity="block",
reason=(
f"outbound request contains provisioned secret "
f"from {key}"
),
)
return None
# ---------------------------------------------------------------------------
# Naive prompt injection detector (Phase 2)
# ---------------------------------------------------------------------------
DISCLOSURE_PHRASES: tuple[re.Pattern[str], ...] = (
re.compile(r"(?i)system\s+prompt"),
re.compile(r"(?i)my\s+instructions\s+are"),
re.compile(r"(?i)original\s+instructions"),
re.compile(r"(?i)secret\s+instructions"),
re.compile(r"(?i)hidden\s+rules"),
)
JAILBREAK_PHRASES: tuple[re.Pattern[str], ...] = (
re.compile(r"(?i)ignore\s+previous"),
re.compile(r"(?i)forget\s+everything"),
re.compile(r"(?i)disregard\s+(?:all\s+)?(?:previous|prior)"),
re.compile(r"(?i)pretend\s+you\s+are"),
re.compile(r"(?i)act\s+as\s+(?:if|though)"),
)
PROXIMITY_CHARS = 500
def _min_distance(
a_matches: list[re.Match[str]],
b_matches: list[re.Match[str]],
) -> int | None:
"""Smallest char distance between any pair of matches."""
if not a_matches or not b_matches:
return None
best = None
for a in a_matches:
for b in b_matches:
gap = max(0, max(a.start(), b.start()) - min(a.end(), b.end()))
if best is None or gap < best:
best = gap
return best
def scan_naive_injection(text: str) -> ScanResult | None:
disclosure_hits = [m for p in DISCLOSURE_PHRASES for m in p.finditer(text)]
jailbreak_hits = [m for p in JAILBREAK_PHRASES for m in p.finditer(text)]
if disclosure_hits and jailbreak_hits:
dist = _min_distance(disclosure_hits, jailbreak_hits)
if dist is not None and dist <= PROXIMITY_CHARS:
return ScanResult(
severity="block",
reason=(
f"disclosure and jailbreak phrases within "
f"{dist} chars in response"
),
)
if disclosure_hits:
return ScanResult(
severity="warn",
reason="prompt disclosure phrase detected in response",
)
if jailbreak_hits:
return ScanResult(
severity="warn",
reason="jailbreak phrase detected in response",
)
return None
__all__ = [
"TOKEN_PATTERNS",
"scan_known_secrets",
"scan_naive_injection",
"scan_token_patterns",
]