From aab450f2f7b2de7049db2106acbf49f3df85fb33 Mon Sep 17 00:00:00 2001 From: claude Date: Thu, 25 Jun 2026 01:02:34 +0000 Subject: [PATCH] feat(dlp): fragmentation resistance, entropy detector, broadened known-value scan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - _alnum_projection(): strip non-alphanumeric chars for separator-injection detection - scan_known_secrets() gains two extra passes per secret after exact-variant matching: alnum-projection exact match (catches hyphens/spaces between secret chars) and a sliding-window partial-match scan (catches chunked substrings ≥ PARTIAL_MATCH_MIN_LEN) - scan_known_secrets() accepts sensitive_prefixes param (default ("EGRESS_TOKEN_",)) so redact_tokens and call-sites can extend the scanned env-var prefix set - scan_entropy() warn-only detector flagging windows with Shannon entropy ≥ 5.5 bits/char - "entropy" added to OUTBOUND_DETECTOR_NAMES; scan_outbound opts it in only when explicitly listed in dlp.outbound_detectors (never part of the default "all" set) - scan_outbound reads BOT_BOTTLE_SENSITIVE_PREFIXES from environ to extend scan_known_secrets beyond EGRESS_TOKEN_* without schema changes - Binary bodies decoded via latin-1 fallback (bijective byte↔codepoint) instead of utf-8 errors=replace, preserving ASCII secret strings in binary payloads Co-Authored-By: Claude Sonnet 4.6 --- bot_bottle/dlp_detectors.py | 158 +++++++++++++++++- bot_bottle/egress_addon_core.py | 35 +++- tests/unit/test_dlp_detectors.py | 193 +++++++++++++++++++++- tests/unit/test_egress_addon_core.py | 96 +++++++++++ tests/unit/test_macos_container_launch.py | 3 +- 5 files changed, 474 insertions(+), 11 deletions(-) diff --git a/bot_bottle/dlp_detectors.py b/bot_bottle/dlp_detectors.py index ef36e7f..e46960d 100644 --- a/bot_bottle/dlp_detectors.py +++ b/bot_bottle/dlp_detectors.py @@ -1,4 +1,4 @@ -"""DLP detectors for the egress proxy (PRD 0053). +"""DLP detectors for the egress proxy (PRD 0053, prd-new). Pure Python, no mitmproxy dependency. Each detector is a module-level function returning `ScanResult | None`. @@ -15,6 +15,8 @@ import gzip import re import typing import unicodedata +from math import log2 +from collections import Counter from urllib.parse import quote as url_quote try: @@ -107,20 +109,21 @@ def redact_tokens( text: str, *, env: typing.Mapping[str, str] | None = None, + sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",), ) -> str: """Replace token pattern matches and (if env given) provisioned secrets with REDACT.""" for _, pattern in TOKEN_PATTERNS: text = pattern.sub(REDACT, text) if env is not None: for key, value in env.items(): - if key.startswith("EGRESS_TOKEN_") and value: + if any(key.startswith(p) for p in sensitive_prefixes) and value: for variant in _encoded_variants(value): text = text.replace(variant, REDACT) return text # --------------------------------------------------------------------------- -# Known secrets detector (Phase 1b) +# Known secrets detector (Phase 1b, prd-new) # --------------------------------------------------------------------------- def _encoded_variants(secret: str) -> list[str]: @@ -161,18 +164,64 @@ def _encoded_variants(secret: str) -> list[str]: return variants +# --------------------------------------------------------------------------- +# Fragmentation-resistant helpers (prd-new) +# --------------------------------------------------------------------------- + +# Minimum length of alnum projection for projection-based checks to run. +# Short secrets produce too many false positives in projection space. +_ALNUM_MIN_LEN = 8 + +# Minimum window length for the partial-substring sliding scan. +PARTIAL_MATCH_MIN_LEN = 12 + + +def _alnum_projection(text: str) -> str: + """Return text with every non-alphanumeric character stripped. + + Used for fragmentation-resistant matching: separator-injected secrets + (spaces, hyphens, dots inserted between characters) are identical to + their originals in alnum projection space. + """ + return "".join(c for c in text if c.isalnum()) + + +def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None: + """Return the position in text_alnum where any min_len-char window of + secret_alnum first appears, or None. + + Slides a window of width min_len across secret_alnum and searches for + each window in text_alnum. The first hit position is returned. + """ + if len(secret_alnum) < min_len or len(text_alnum) < min_len: + return None + for i in range(len(secret_alnum) - min_len + 1): + window = secret_alnum[i:i + min_len] + pos = text_alnum.find(window) + if pos >= 0: + return pos + return None + + def scan_known_secrets( text: str, *, location: str = "body", env: typing.Mapping[str, str] | None = None, + sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",), safe_tokens: typing.AbstractSet[str] | None = None, ) -> ScanResult | None: if env is None: return None + + # Pre-compute alnum projection of the scan text once; reused per secret. + text_alnum: str | None = None + for key, value in env.items(): - if not key.startswith("EGRESS_TOKEN_") or not value: + if not any(key.startswith(p) for p in sensitive_prefixes) or not value: continue + + # Pass 1: exact match across encoded variants (original behaviour). for variant in _encoded_variants(value): pos = text.find(variant) if pos >= 0: @@ -188,6 +237,100 @@ def scan_known_secrets( context=_snippet(text, pos, pos + len(variant)), matched=variant, ) + + # Pass 2 & 3: fragmentation-resistant projection checks. + secret_alnum = _alnum_projection(value) + if len(secret_alnum) < _ALNUM_MIN_LEN: + continue + + if text_alnum is None: + text_alnum = _alnum_projection(text) + + # Pass 2: full alnum-projection exact match (catches separator injection). + pos2 = text_alnum.find(secret_alnum) + if pos2 >= 0: + return ScanResult( + severity="block", + reason=( + f"provisioned secret from {key} found in {location} " + f"(fragmented match — separator injection)" + ), + location=location, + context=_snippet(text_alnum, pos2, pos2 + len(secret_alnum)), + ) + + # Pass 3: sliding-window partial match (catches chunked-substring leaks). + pos3 = _find_partial_window(secret_alnum, text_alnum, PARTIAL_MATCH_MIN_LEN) + if pos3 is not None: + return ScanResult( + severity="block", + reason=( + f"provisioned secret from {key} found in {location} " + f"(partial match — at least {PARTIAL_MATCH_MIN_LEN} consecutive " + f"alphanumeric chars)" + ), + location=location, + context=_snippet(text_alnum, pos3, pos3 + PARTIAL_MATCH_MIN_LEN), + ) + + return None + + +# --------------------------------------------------------------------------- +# Entropy detector (warn-only, prd-new) +# --------------------------------------------------------------------------- + +# Sliding window size and step for the entropy scan. +ENTROPY_WINDOW = 64 +ENTROPY_STEP = 32 + +# Bits-per-character threshold. Random ASCII printable ≈ 6.6 bits; random +# lowercase hex ≈ 4 bits; random base64url ≈ 6 bits. 5.5 sits above +# typical structured data (JSON, URLs) while staying below truly random +# content. +ENTROPY_BLOCK_THRESHOLD = 5.5 + + +def _shannon_entropy(text: str) -> float: + if not text: + return 0.0 + counts = Counter(text) + n = len(text) + return -sum((c / n) * log2(c / n) for c in counts.values()) + + +def scan_entropy( + text: str, + *, + location: str = "body", + window: int = ENTROPY_WINDOW, + threshold: float = ENTROPY_BLOCK_THRESHOLD, +) -> ScanResult | None: + """Warn-only detector: flag windows of `window` chars with Shannon entropy + above `threshold` bits per character. + + Never blocks; always returns severity='warn'. Disabled by default — + routes must opt in via dlp.outbound_detectors=['entropy']. + """ + if not text: + return None + step = max(1, window // 2) + end = len(text) + # Scan overlapping windows; also check the final tail if shorter than window. + positions = list(range(0, end - window + 1, step)) + if end < window: + positions = [0] + elif (end - window) % step != 0: + positions.append(end - window) + for i in positions: + chunk = text[i:i + window] + if _shannon_entropy(chunk) >= threshold: + return ScanResult( + severity="warn", + reason=f"high-entropy content in {location} (possible encrypted exfil)", + location=location, + context=_snippet(text, i, i + len(chunk)), + ) return None @@ -306,11 +449,18 @@ def scan_crlf_injection(text: str) -> ScanResult | None: __all__ = [ + "ENTROPY_BLOCK_THRESHOLD", + "ENTROPY_WINDOW", + "ENTROPY_STEP", + "PARTIAL_MATCH_MIN_LEN", "REDACT", "SNIPPET_CONTEXT", "TOKEN_PATTERNS", + "_alnum_projection", + "_shannon_entropy", "redact_tokens", "scan_crlf_injection", + "scan_entropy", "scan_known_secrets", "scan_naive_injection", "scan_token_patterns", diff --git a/bot_bottle/egress_addon_core.py b/bot_bottle/egress_addon_core.py index 324f7c7..16556a5 100644 --- a/bot_bottle/egress_addon_core.py +++ b/bot_bottle/egress_addon_core.py @@ -34,7 +34,7 @@ VALID_METHODS = frozenset({ "CONNECT", }) -OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"}) +OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"}) INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"}) # Per-route policy for what the proxy does when an outbound DLP detector @@ -729,17 +729,28 @@ def scan_outbound( try: from dlp_detectors import ( # type: ignore[import-not-found] scan_crlf_injection, + scan_entropy, scan_known_secrets, scan_token_patterns, ) except ImportError: # pragma: no cover - host-side path from .dlp_detectors import ( # type: ignore[import-not-found] scan_crlf_injection, + scan_entropy, scan_known_secrets, scan_token_patterns, ) - text = body if isinstance(body, str) else body.decode("utf-8", errors="replace") + # Binary bodies: latin-1 is a bijective byte↔codepoint mapping that + # preserves every byte value, so ASCII-range secret strings remain + # findable by str.find / regex. Prefer strict UTF-8 for valid text bodies. + if isinstance(body, bytes): + try: + text = body.decode("utf-8") + except UnicodeDecodeError: + text = body.decode("latin-1") + else: + text = body # CRLF injection is only an attack in the request line + headers, never the # body: an HTTP body is delimited by Content-Length, so CRLF bytes there @@ -758,12 +769,30 @@ def scan_outbound( return result if _detector_enabled(route.outbound_detectors, "known_secrets"): + # BOT_BOTTLE_SENSITIVE_PREFIXES lets operators add extra env prefixes + # beyond EGRESS_TOKEN_* without changing the manifest schema. + extra_raw = environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "") + extra = tuple(p for p in extra_raw.split(",") if p) + sensitive_prefixes = ("EGRESS_TOKEN_",) + extra result = scan_known_secrets( - text, location="body", env=environ, safe_tokens=safe_tokens, + text, location="body", env=environ, + sensitive_prefixes=sensitive_prefixes, safe_tokens=safe_tokens, ) if result is not None: return result + # Entropy scanning requires explicit opt-in: it is NOT part of the + # default "all detectors" set because it produces false positives on + # legitimate base64 / binary payloads. Routes must list "entropy" in + # dlp.outbound_detectors to enable it. + if ( + route.outbound_detectors is not None + and "entropy" in route.outbound_detectors + ): + result = scan_entropy(text, location="body") + if result is not None: + return result + return None diff --git a/tests/unit/test_dlp_detectors.py b/tests/unit/test_dlp_detectors.py index 3028729..4be520f 100644 --- a/tests/unit/test_dlp_detectors.py +++ b/tests/unit/test_dlp_detectors.py @@ -1,18 +1,24 @@ -"""Unit: DLP detectors (PRD 0053). +"""Unit: DLP detectors (PRD 0053, prd-new). -Tests for token pattern scanning, known secret detection, and -naive prompt injection detection.""" +Tests for token pattern scanning, known secret detection, fragmentation- +resistant matching, entropy scoring, and naive prompt injection detection.""" import base64 import gzip import unittest from bot_bottle.dlp_detectors import ( + ENTROPY_BLOCK_THRESHOLD, + ENTROPY_WINDOW, + PARTIAL_MATCH_MIN_LEN, REDACT, + _alnum_projection, _encoded_variants, _normalize_text, + _shannon_entropy, redact_tokens, scan_crlf_injection, + scan_entropy, scan_known_secrets, scan_naive_injection, scan_token_patterns, @@ -502,6 +508,187 @@ class TestStripCrlf(unittest.TestCase): from bot_bottle.dlp_detectors import strip_crlf self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello")) +class TestAlnumProjection(unittest.TestCase): + def test_alphanumeric_unchanged(self): + self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ")) + + def test_strips_hyphens(self): + self.assertEqual("mysecretvalue", _alnum_projection("my-secret-value")) + + def test_strips_spaces(self): + self.assertEqual("mysecretvalue", _alnum_projection("my secret value")) + + def test_strips_dots_and_underscores(self): + self.assertEqual("mysecretvalue", _alnum_projection("my.secret_value")) + + def test_empty_string(self): + self.assertEqual("", _alnum_projection("")) + + def test_all_special_chars(self): + self.assertEqual("", _alnum_projection("!@#$%^&*()")) + + +class TestFragmentationResistantMatching(unittest.TestCase): + """scan_known_secrets catches separator-injection and partial-substring evasion.""" + + # Secrets long enough that their alnum projections are ≥ 8 chars. + SECRET = "supersecrettoken99" + ENV = {"EGRESS_TOKEN_0": SECRET} + + def test_exact_match_still_works(self): + result = scan_known_secrets(f"key={self.SECRET}", env=self.ENV) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_separator_injection_blocked(self): + # Hyphens inserted between chars of the secret. + fragmented = "-".join(self.SECRET) + result = scan_known_secrets(f"data={fragmented}", env=self.ENV) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn("separator injection", result.reason) + + def test_space_separator_blocked(self): + fragmented = " ".join(self.SECRET) + result = scan_known_secrets(f"body: {fragmented}", env=self.ENV) + self.assertIsNotNone(result) + assert result is not None + self.assertIn("separator injection", result.reason) + + def test_partial_substring_blocked(self): + # First PARTIAL_MATCH_MIN_LEN alnum chars of the secret, no separators. + partial = _alnum_projection(self.SECRET)[:PARTIAL_MATCH_MIN_LEN] + result = scan_known_secrets(f"x={partial}&y=other", env=self.ENV) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn("partial match", result.reason) + + def test_short_secret_skips_projection(self): + # Secrets shorter than _ALNUM_MIN_LEN in alnum projection are not + # fragmentation-checked (too many false positives). + short_env = {"EGRESS_TOKEN_0": "abc"} + # "a b c" has alnum projection "abc" (3 chars, < 8); should not block. + self.assertIsNone(scan_known_secrets("a b c", env=short_env)) + + def test_clean_text_not_blocked(self): + self.assertIsNone(scan_known_secrets("nothing to see here", env=self.ENV)) + + def test_sensitive_prefixes_param_extra_prefix(self): + env = {"MY_CRED_0": self.SECRET, "IGNORED": "other"} + result = scan_known_secrets( + f"key={self.SECRET}", + env=env, + sensitive_prefixes=("MY_CRED_",), + ) + self.assertIsNotNone(result) + assert result is not None + self.assertIn("MY_CRED_0", result.reason) + + def test_sensitive_prefixes_default_only_egress_token(self): + # A value under a non-EGRESS_TOKEN_ key is ignored with default prefixes. + env = {"MY_CRED_0": self.SECRET} + self.assertIsNone(scan_known_secrets(f"key={self.SECRET}", env=env)) + + def test_canary_prefix_detected(self): + canary_value = "canary-fake-secret-value-xyz" + env = {"EGRESS_TOKEN_CANARY": canary_value} + result = scan_known_secrets(f"x={canary_value}", env=env) + self.assertIsNotNone(result) + assert result is not None + self.assertIn("EGRESS_TOKEN_CANARY", result.reason) + + +class TestRedactTokensBroadenedPrefixes(unittest.TestCase): + SECRET = "my-provisioned-secret" + + def test_default_redacts_egress_token(self): + env = {"EGRESS_TOKEN_0": self.SECRET} + out = redact_tokens(f"val={self.SECRET}", env=env) + self.assertNotIn(self.SECRET, out) + self.assertIn(REDACT, out) + + def test_extra_prefix_redacted(self): + env = {"MY_SECRET_KEY": self.SECRET} + out = redact_tokens( + f"val={self.SECRET}", + env=env, + sensitive_prefixes=("MY_SECRET_",), + ) + self.assertNotIn(self.SECRET, out) + self.assertIn(REDACT, out) + + def test_non_matching_prefix_not_redacted(self): + env = {"MY_SECRET_KEY": self.SECRET} + out = redact_tokens(f"val={self.SECRET}", env=env) + # Default prefixes only include EGRESS_TOKEN_ → secret not redacted + self.assertIn(self.SECRET, out) + + +class TestShannonEntropy(unittest.TestCase): + def test_empty_string_zero(self): + self.assertEqual(0.0, _shannon_entropy("")) + + def test_single_char_zero(self): + self.assertEqual(0.0, _shannon_entropy("aaaaaa")) + + def test_two_equal_chars_one_bit(self): + self.assertAlmostEqual(1.0, _shannon_entropy("abababab"), places=10) + + def test_high_entropy_random_like(self): + # Uniform 64-char string over 64 distinct symbols has entropy 6 bits. + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + text = alphabet # each char appears exactly once + self.assertAlmostEqual(6.0, _shannon_entropy(text), places=10) + + +class TestScanEntropy(unittest.TestCase): + def test_empty_returns_none(self): + self.assertIsNone(scan_entropy("")) + + def test_low_entropy_returns_none(self): + # Highly repetitive text has low entropy. + self.assertIsNone(scan_entropy("a" * 200)) + + def test_high_entropy_warns(self): + # Build a 64-char string with entropy > ENTROPY_BLOCK_THRESHOLD. + # Use all 64 distinct printable chars to maximise entropy (~6 bits). + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_entropy(alphabet, threshold=ENTROPY_BLOCK_THRESHOLD) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("warn", result.severity) + self.assertIn("high-entropy", result.reason) + + def test_never_blocks(self): + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_entropy(alphabet) + # scan_entropy is warn-only; it must never return severity="block". + if result is not None: + self.assertNotEqual("block", result.severity) + + def test_location_in_result(self): + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_entropy(alphabet, location="authorization header") + if result is not None: + self.assertIn("authorization header", result.location) + + def test_structured_json_no_warn(self): + # Typical JSON has low entropy and should not be flagged. + json_body = '{"status": "ok", "message": "hello world", "count": 42}' + self.assertIsNone(scan_entropy(json_body)) + + def test_short_text_below_window(self): + # Text shorter than the window: checked as one chunk. + # Use a uniform string to ensure it won't be flagged. + self.assertIsNone(scan_entropy("abcde", threshold=ENTROPY_BLOCK_THRESHOLD)) + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index 2ea6abd..023f0ca 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -1273,6 +1273,102 @@ class TestBuildTokenAllowPayload(unittest.TestCase): result = ScanResult(severity="block", reason="r", matched="x") payload = build_token_allow_payload("h", "GET", "/", result) self.assertNotIn("context:", payload) +class TestScanOutboundEnhanced(unittest.TestCase): + """scan_outbound changes from prd-new: binary decode, entropy detector, + broadened known-value prefixes, fragmentation resistance.""" + + _ROUTE = Route(host="api.example.com") + _ROUTE_ENTROPY = Route( + host="api.example.com", + outbound_detectors=("entropy",), + ) + + def test_binary_body_latin1_decode_finds_ascii_secret(self): + # Body contains valid ASCII secret surrounded by non-UTF-8 bytes. + secret = "supersecrettoken99" + env = {"EGRESS_TOKEN_0": secret} + # Wrap the secret in bytes that are invalid UTF-8. + body = b"\x80\x81" + secret.encode("ascii") + b"\xff" + result = scan_outbound(self._ROUTE, body, env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_binary_body_valid_utf8_decoded_correctly(self): + env = {"EGRESS_TOKEN_0": "mysecret"} + # Valid UTF-8 body — should be decoded as UTF-8, not latin-1. + body = "clean body with mysecret".encode("utf-8") + result = scan_outbound(self._ROUTE, body, env) + self.assertIsNotNone(result) + + def test_entropy_detector_off_by_default(self): + import string + # High-entropy content should NOT warn if the route has no entropy detector. + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_outbound(self._ROUTE, alphabet, {}) + self.assertIsNone(result) + + def test_entropy_detector_warns_when_enabled(self): + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_outbound(self._ROUTE_ENTROPY, alphabet, {}) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("warn", result.severity) + + def test_bot_bottle_sensitive_prefixes_env_var(self): + # When the sidecar env contains BOT_BOTTLE_SENSITIVE_PREFIXES, + # scan_outbound should scan those additional prefixes. + secret = "extra-sensitive-value-abc" + env = { + "MY_CRED_KEY": secret, + "BOT_BOTTLE_SENSITIVE_PREFIXES": "MY_CRED_", + } + result = scan_outbound(self._ROUTE, f"x={secret}", env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_bot_bottle_sensitive_prefixes_multiple(self): + secret = "my-api-key-value-xyz" + env = { + "ANTHROPIC_API_0": secret, + "BOT_BOTTLE_SENSITIVE_PREFIXES": "ANTHROPIC_API_,OTHER_", + } + result = scan_outbound(self._ROUTE, f"auth={secret}", env) + self.assertIsNotNone(result) + + def test_canary_detected_via_egress_token_canary(self): + # The canary (injected as EGRESS_TOKEN_CANARY) is caught by known_secrets. + canary = "canaryvalue12345abcdef" + env = {"EGRESS_TOKEN_CANARY": canary} + result = scan_outbound(self._ROUTE, f"data={canary}", env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn("EGRESS_TOKEN_CANARY", result.reason) + + def test_fragmented_canary_blocked(self): + # Canary with separators injected is still caught. + canary = "supersecretcanary99" + env = {"EGRESS_TOKEN_CANARY": canary} + fragmented = "-".join(canary) + result = scan_outbound(self._ROUTE, f"x={fragmented}", env) + self.assertIsNotNone(result) + + +class TestOutboundDetectorNames(unittest.TestCase): + def test_entropy_in_outbound_detector_names(self): + from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES + self.assertIn("entropy", OUTBOUND_DETECTOR_NAMES) + + def test_known_secrets_in_outbound_detector_names(self): + from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES + self.assertIn("known_secrets", OUTBOUND_DETECTOR_NAMES) + + def test_token_patterns_in_outbound_detector_names(self): + from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES + self.assertIn("token_patterns", OUTBOUND_DETECTOR_NAMES) if __name__ == "__main__": diff --git a/tests/unit/test_macos_container_launch.py b/tests/unit/test_macos_container_launch.py index d9ae81c..4f66005 100644 --- a/tests/unit/test_macos_container_launch.py +++ b/tests/unit/test_macos_container_launch.py @@ -42,6 +42,7 @@ def _plan( routes_path=routes_path, routes=("route",), token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"}, + canary="", ) if git: key_path = stage_dir / "origin-key" @@ -271,7 +272,7 @@ def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan: manifest=_MANIFEST, stage_dir=stage_dir, git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())), - egress_plan=cast(EgressPlan, SimpleNamespace()), + egress_plan=cast(EgressPlan, SimpleNamespace(canary="")), supervise_plan=None, agent_provision=AgentProvisionPlan( template="claude",