feat(dlp): fragmentation resistance, entropy detector, broadened known-value scan
- _alnum_projection(): strip non-alphanumeric chars for separator-injection detection
- scan_known_secrets() gains two extra passes per secret after exact-variant matching:
alnum-projection exact match (catches hyphens/spaces between secret chars) and a
sliding-window partial-match scan (catches chunked substrings ≥ PARTIAL_MATCH_MIN_LEN)
- scan_known_secrets() accepts sensitive_prefixes param (default ("EGRESS_TOKEN_",))
so redact_tokens and call-sites can extend the scanned env-var prefix set
- scan_entropy() warn-only detector flagging windows with Shannon entropy ≥ 5.5 bits/char
- "entropy" added to OUTBOUND_DETECTOR_NAMES; scan_outbound opts it in only when
explicitly listed in dlp.outbound_detectors (never part of the default "all" set)
- scan_outbound reads BOT_BOTTLE_SENSITIVE_PREFIXES from environ to extend
scan_known_secrets beyond EGRESS_TOKEN_* without schema changes
- Binary bodies decoded via latin-1 fallback (bijective byte↔codepoint) instead
of utf-8 errors=replace, preserving ASCII secret strings in binary payloads
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,18 +1,24 @@
|
||||
"""Unit: DLP detectors (PRD 0053).
|
||||
"""Unit: DLP detectors (PRD 0053, prd-new).
|
||||
|
||||
Tests for token pattern scanning, known secret detection, and
|
||||
naive prompt injection detection."""
|
||||
Tests for token pattern scanning, known secret detection, fragmentation-
|
||||
resistant matching, entropy scoring, and naive prompt injection detection."""
|
||||
|
||||
import base64
|
||||
import gzip
|
||||
import unittest
|
||||
|
||||
from bot_bottle.dlp_detectors import (
|
||||
ENTROPY_BLOCK_THRESHOLD,
|
||||
ENTROPY_WINDOW,
|
||||
PARTIAL_MATCH_MIN_LEN,
|
||||
REDACT,
|
||||
_alnum_projection,
|
||||
_encoded_variants,
|
||||
_normalize_text,
|
||||
_shannon_entropy,
|
||||
redact_tokens,
|
||||
scan_crlf_injection,
|
||||
scan_entropy,
|
||||
scan_known_secrets,
|
||||
scan_naive_injection,
|
||||
scan_token_patterns,
|
||||
@@ -502,6 +508,187 @@ class TestStripCrlf(unittest.TestCase):
|
||||
from bot_bottle.dlp_detectors import strip_crlf
|
||||
self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello"))
|
||||
|
||||
class TestAlnumProjection(unittest.TestCase):
|
||||
def test_alphanumeric_unchanged(self):
|
||||
self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ"))
|
||||
|
||||
def test_strips_hyphens(self):
|
||||
self.assertEqual("mysecretvalue", _alnum_projection("my-secret-value"))
|
||||
|
||||
def test_strips_spaces(self):
|
||||
self.assertEqual("mysecretvalue", _alnum_projection("my secret value"))
|
||||
|
||||
def test_strips_dots_and_underscores(self):
|
||||
self.assertEqual("mysecretvalue", _alnum_projection("my.secret_value"))
|
||||
|
||||
def test_empty_string(self):
|
||||
self.assertEqual("", _alnum_projection(""))
|
||||
|
||||
def test_all_special_chars(self):
|
||||
self.assertEqual("", _alnum_projection("!@#$%^&*()"))
|
||||
|
||||
|
||||
class TestFragmentationResistantMatching(unittest.TestCase):
|
||||
"""scan_known_secrets catches separator-injection and partial-substring evasion."""
|
||||
|
||||
# Secrets long enough that their alnum projections are ≥ 8 chars.
|
||||
SECRET = "supersecrettoken99"
|
||||
ENV = {"EGRESS_TOKEN_0": SECRET}
|
||||
|
||||
def test_exact_match_still_works(self):
|
||||
result = scan_known_secrets(f"key={self.SECRET}", env=self.ENV)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
|
||||
def test_separator_injection_blocked(self):
|
||||
# Hyphens inserted between chars of the secret.
|
||||
fragmented = "-".join(self.SECRET)
|
||||
result = scan_known_secrets(f"data={fragmented}", env=self.ENV)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
self.assertIn("separator injection", result.reason)
|
||||
|
||||
def test_space_separator_blocked(self):
|
||||
fragmented = " ".join(self.SECRET)
|
||||
result = scan_known_secrets(f"body: {fragmented}", env=self.ENV)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertIn("separator injection", result.reason)
|
||||
|
||||
def test_partial_substring_blocked(self):
|
||||
# First PARTIAL_MATCH_MIN_LEN alnum chars of the secret, no separators.
|
||||
partial = _alnum_projection(self.SECRET)[:PARTIAL_MATCH_MIN_LEN]
|
||||
result = scan_known_secrets(f"x={partial}&y=other", env=self.ENV)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
self.assertIn("partial match", result.reason)
|
||||
|
||||
def test_short_secret_skips_projection(self):
|
||||
# Secrets shorter than _ALNUM_MIN_LEN in alnum projection are not
|
||||
# fragmentation-checked (too many false positives).
|
||||
short_env = {"EGRESS_TOKEN_0": "abc"}
|
||||
# "a b c" has alnum projection "abc" (3 chars, < 8); should not block.
|
||||
self.assertIsNone(scan_known_secrets("a b c", env=short_env))
|
||||
|
||||
def test_clean_text_not_blocked(self):
|
||||
self.assertIsNone(scan_known_secrets("nothing to see here", env=self.ENV))
|
||||
|
||||
def test_sensitive_prefixes_param_extra_prefix(self):
|
||||
env = {"MY_CRED_0": self.SECRET, "IGNORED": "other"}
|
||||
result = scan_known_secrets(
|
||||
f"key={self.SECRET}",
|
||||
env=env,
|
||||
sensitive_prefixes=("MY_CRED_",),
|
||||
)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertIn("MY_CRED_0", result.reason)
|
||||
|
||||
def test_sensitive_prefixes_default_only_egress_token(self):
|
||||
# A value under a non-EGRESS_TOKEN_ key is ignored with default prefixes.
|
||||
env = {"MY_CRED_0": self.SECRET}
|
||||
self.assertIsNone(scan_known_secrets(f"key={self.SECRET}", env=env))
|
||||
|
||||
def test_canary_prefix_detected(self):
|
||||
canary_value = "canary-fake-secret-value-xyz"
|
||||
env = {"EGRESS_TOKEN_CANARY": canary_value}
|
||||
result = scan_known_secrets(f"x={canary_value}", env=env)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
|
||||
|
||||
|
||||
class TestRedactTokensBroadenedPrefixes(unittest.TestCase):
|
||||
SECRET = "my-provisioned-secret"
|
||||
|
||||
def test_default_redacts_egress_token(self):
|
||||
env = {"EGRESS_TOKEN_0": self.SECRET}
|
||||
out = redact_tokens(f"val={self.SECRET}", env=env)
|
||||
self.assertNotIn(self.SECRET, out)
|
||||
self.assertIn(REDACT, out)
|
||||
|
||||
def test_extra_prefix_redacted(self):
|
||||
env = {"MY_SECRET_KEY": self.SECRET}
|
||||
out = redact_tokens(
|
||||
f"val={self.SECRET}",
|
||||
env=env,
|
||||
sensitive_prefixes=("MY_SECRET_",),
|
||||
)
|
||||
self.assertNotIn(self.SECRET, out)
|
||||
self.assertIn(REDACT, out)
|
||||
|
||||
def test_non_matching_prefix_not_redacted(self):
|
||||
env = {"MY_SECRET_KEY": self.SECRET}
|
||||
out = redact_tokens(f"val={self.SECRET}", env=env)
|
||||
# Default prefixes only include EGRESS_TOKEN_ → secret not redacted
|
||||
self.assertIn(self.SECRET, out)
|
||||
|
||||
|
||||
class TestShannonEntropy(unittest.TestCase):
|
||||
def test_empty_string_zero(self):
|
||||
self.assertEqual(0.0, _shannon_entropy(""))
|
||||
|
||||
def test_single_char_zero(self):
|
||||
self.assertEqual(0.0, _shannon_entropy("aaaaaa"))
|
||||
|
||||
def test_two_equal_chars_one_bit(self):
|
||||
self.assertAlmostEqual(1.0, _shannon_entropy("abababab"), places=10)
|
||||
|
||||
def test_high_entropy_random_like(self):
|
||||
# Uniform 64-char string over 64 distinct symbols has entropy 6 bits.
|
||||
import string
|
||||
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||
text = alphabet # each char appears exactly once
|
||||
self.assertAlmostEqual(6.0, _shannon_entropy(text), places=10)
|
||||
|
||||
|
||||
class TestScanEntropy(unittest.TestCase):
|
||||
def test_empty_returns_none(self):
|
||||
self.assertIsNone(scan_entropy(""))
|
||||
|
||||
def test_low_entropy_returns_none(self):
|
||||
# Highly repetitive text has low entropy.
|
||||
self.assertIsNone(scan_entropy("a" * 200))
|
||||
|
||||
def test_high_entropy_warns(self):
|
||||
# Build a 64-char string with entropy > ENTROPY_BLOCK_THRESHOLD.
|
||||
# Use all 64 distinct printable chars to maximise entropy (~6 bits).
|
||||
import string
|
||||
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||
result = scan_entropy(alphabet, threshold=ENTROPY_BLOCK_THRESHOLD)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("warn", result.severity)
|
||||
self.assertIn("high-entropy", result.reason)
|
||||
|
||||
def test_never_blocks(self):
|
||||
import string
|
||||
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||
result = scan_entropy(alphabet)
|
||||
# scan_entropy is warn-only; it must never return severity="block".
|
||||
if result is not None:
|
||||
self.assertNotEqual("block", result.severity)
|
||||
|
||||
def test_location_in_result(self):
|
||||
import string
|
||||
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||
result = scan_entropy(alphabet, location="authorization header")
|
||||
if result is not None:
|
||||
self.assertIn("authorization header", result.location)
|
||||
|
||||
def test_structured_json_no_warn(self):
|
||||
# Typical JSON has low entropy and should not be flagged.
|
||||
json_body = '{"status": "ok", "message": "hello world", "count": 42}'
|
||||
self.assertIsNone(scan_entropy(json_body))
|
||||
|
||||
def test_short_text_below_window(self):
|
||||
# Text shorter than the window: checked as one chunk.
|
||||
# Use a uniform string to ensure it won't be flagged.
|
||||
self.assertIsNone(scan_entropy("abcde", threshold=ENTROPY_BLOCK_THRESHOLD))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -1273,6 +1273,102 @@ class TestBuildTokenAllowPayload(unittest.TestCase):
|
||||
result = ScanResult(severity="block", reason="r", matched="x")
|
||||
payload = build_token_allow_payload("h", "GET", "/", result)
|
||||
self.assertNotIn("context:", payload)
|
||||
class TestScanOutboundEnhanced(unittest.TestCase):
|
||||
"""scan_outbound changes from prd-new: binary decode, entropy detector,
|
||||
broadened known-value prefixes, fragmentation resistance."""
|
||||
|
||||
_ROUTE = Route(host="api.example.com")
|
||||
_ROUTE_ENTROPY = Route(
|
||||
host="api.example.com",
|
||||
outbound_detectors=("entropy",),
|
||||
)
|
||||
|
||||
def test_binary_body_latin1_decode_finds_ascii_secret(self):
|
||||
# Body contains valid ASCII secret surrounded by non-UTF-8 bytes.
|
||||
secret = "supersecrettoken99"
|
||||
env = {"EGRESS_TOKEN_0": secret}
|
||||
# Wrap the secret in bytes that are invalid UTF-8.
|
||||
body = b"\x80\x81" + secret.encode("ascii") + b"\xff"
|
||||
result = scan_outbound(self._ROUTE, body, env)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
|
||||
def test_binary_body_valid_utf8_decoded_correctly(self):
|
||||
env = {"EGRESS_TOKEN_0": "mysecret"}
|
||||
# Valid UTF-8 body — should be decoded as UTF-8, not latin-1.
|
||||
body = "clean body with mysecret".encode("utf-8")
|
||||
result = scan_outbound(self._ROUTE, body, env)
|
||||
self.assertIsNotNone(result)
|
||||
|
||||
def test_entropy_detector_off_by_default(self):
|
||||
import string
|
||||
# High-entropy content should NOT warn if the route has no entropy detector.
|
||||
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||
result = scan_outbound(self._ROUTE, alphabet, {})
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_entropy_detector_warns_when_enabled(self):
|
||||
import string
|
||||
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||
result = scan_outbound(self._ROUTE_ENTROPY, alphabet, {})
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("warn", result.severity)
|
||||
|
||||
def test_bot_bottle_sensitive_prefixes_env_var(self):
|
||||
# When the sidecar env contains BOT_BOTTLE_SENSITIVE_PREFIXES,
|
||||
# scan_outbound should scan those additional prefixes.
|
||||
secret = "extra-sensitive-value-abc"
|
||||
env = {
|
||||
"MY_CRED_KEY": secret,
|
||||
"BOT_BOTTLE_SENSITIVE_PREFIXES": "MY_CRED_",
|
||||
}
|
||||
result = scan_outbound(self._ROUTE, f"x={secret}", env)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
|
||||
def test_bot_bottle_sensitive_prefixes_multiple(self):
|
||||
secret = "my-api-key-value-xyz"
|
||||
env = {
|
||||
"ANTHROPIC_API_0": secret,
|
||||
"BOT_BOTTLE_SENSITIVE_PREFIXES": "ANTHROPIC_API_,OTHER_",
|
||||
}
|
||||
result = scan_outbound(self._ROUTE, f"auth={secret}", env)
|
||||
self.assertIsNotNone(result)
|
||||
|
||||
def test_canary_detected_via_egress_token_canary(self):
|
||||
# The canary (injected as EGRESS_TOKEN_CANARY) is caught by known_secrets.
|
||||
canary = "canaryvalue12345abcdef"
|
||||
env = {"EGRESS_TOKEN_CANARY": canary}
|
||||
result = scan_outbound(self._ROUTE, f"data={canary}", env)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
|
||||
|
||||
def test_fragmented_canary_blocked(self):
|
||||
# Canary with separators injected is still caught.
|
||||
canary = "supersecretcanary99"
|
||||
env = {"EGRESS_TOKEN_CANARY": canary}
|
||||
fragmented = "-".join(canary)
|
||||
result = scan_outbound(self._ROUTE, f"x={fragmented}", env)
|
||||
self.assertIsNotNone(result)
|
||||
|
||||
|
||||
class TestOutboundDetectorNames(unittest.TestCase):
|
||||
def test_entropy_in_outbound_detector_names(self):
|
||||
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
|
||||
self.assertIn("entropy", OUTBOUND_DETECTOR_NAMES)
|
||||
|
||||
def test_known_secrets_in_outbound_detector_names(self):
|
||||
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
|
||||
self.assertIn("known_secrets", OUTBOUND_DETECTOR_NAMES)
|
||||
|
||||
def test_token_patterns_in_outbound_detector_names(self):
|
||||
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
|
||||
self.assertIn("token_patterns", OUTBOUND_DETECTOR_NAMES)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -42,6 +42,7 @@ def _plan(
|
||||
routes_path=routes_path,
|
||||
routes=("route",),
|
||||
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
|
||||
canary="",
|
||||
)
|
||||
if git:
|
||||
key_path = stage_dir / "origin-key"
|
||||
@@ -271,7 +272,7 @@ def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan:
|
||||
manifest=_MANIFEST,
|
||||
stage_dir=stage_dir,
|
||||
git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())),
|
||||
egress_plan=cast(EgressPlan, SimpleNamespace()),
|
||||
egress_plan=cast(EgressPlan, SimpleNamespace(canary="")),
|
||||
supervise_plan=None,
|
||||
agent_provision=AgentProvisionPlan(
|
||||
template="claude",
|
||||
|
||||
Reference in New Issue
Block a user