From ea6bc5a17082a9a27230b22cf45c525b7da51010 Mon Sep 17 00:00:00 2001 From: claude Date: Thu, 25 Jun 2026 00:54:28 +0000 Subject: [PATCH 01/10] docs: draft PRD prd-new for strengthen-outbound-exfil-detection Co-Authored-By: Claude Sonnet 4.6 --- ...new-strengthen-outbound-exfil-detection.md | 186 ++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 docs/prds/prd-new-strengthen-outbound-exfil-detection.md diff --git a/docs/prds/prd-new-strengthen-outbound-exfil-detection.md b/docs/prds/prd-new-strengthen-outbound-exfil-detection.md new file mode 100644 index 0000000..1f55627 --- /dev/null +++ b/docs/prds/prd-new-strengthen-outbound-exfil-detection.md @@ -0,0 +1,186 @@ +# PRD prd-new: Strengthen outbound exfiltration detection + +- **Status:** Draft +- **Author:** claude +- **Created:** 2026-06-25 +- **Issue:** #259 + +## Summary + +The current DLP system has two detection tiers: weak (vendor token regexes) +and strong (exact matching of provisioned `EGRESS_TOKEN_*` values in multiple +encodings). This PRD adds four complementary hardening layers in ranked order +of impact: + +1. **Canary tokens** — inject a per-session fake secret into the agent's + environment; any outbound appearance is a zero-false-positive exfil signal. +2. **Broadened known-value scanning** — extend the strong tier beyond + `EGRESS_TOKEN_*` via a `BOT_BOTTLE_SENSITIVE_PREFIXES` sidecar env var. +3. **Fragmentation-resistant matching** — catch separator injection and + long-substring leaks via alnum projection and sliding-window substring + search. +4. **Entropy scoring** — warn-only flag on high-entropy blobs as a + complementary signal for encrypted exfil. + +## Problem + +After PRD 0056 the outbound scan covers all HTTP request surfaces (host, path, +query, headers, body). But the content-based strong tier only matches +`EGRESS_TOKEN_*` values and their encodings. Three gaps remain: + +| Gap | How an agent exploits it | +|-----|--------------------------| +| No canary | There is no "can't be false-positive" signal | +| Narrow prefix filter | Sensitive values provisioned under other env prefixes (MCP keys, API keys injected via the agent provider) are invisible to `scan_known_secrets` | +| Fragmentation blindness | Inserting `-`, space, or `\n` between secret characters turns an exact-match into a miss | + +## Goals / Success Criteria + +1. Each launched bottle has a unique canary token in the agent's environment + (`BOT_BOTTLE_CANARY`) and the egress sidecar's environment + (`EGRESS_TOKEN_CANARY`). Any outbound appearance of the canary blocks the + request with reason `"canary token"`. +2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default: + `("EGRESS_TOKEN_",)`). `scan_outbound` reads + `BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes + in, so operators can mark additional env vars as scanned values without + changing the manifest schema. +3. For every secret that passes exact-match, a secondary alnum-projection pass + checks for the secret with all non-alphanumeric characters stripped. This + catches separator-injection evasion (`MY-SECRET` → body contains + `MY SECRET`). +4. A sliding-window partial-match pass checks for long-enough contiguous + substrings of the secret's alnum projection in the text's alnum projection. + Any match ≥ `PARTIAL_MATCH_MIN_LEN` (12 chars) blocks with reason + `"partial match"`. +5. A new `scan_entropy` detector flags outbound text windows with Shannon + entropy ≥ `ENTROPY_BLOCK_THRESHOLD` (5.5 bits/char) at **warn** severity + only. It is registered under the new detector name `"entropy"` in + `OUTBOUND_DETECTOR_NAMES` and disabled by default (routes must opt in). +6. Binary request bodies are decoded via `latin-1` instead of + `utf-8 errors="replace"`, preserving every byte value and allowing + ASCII-range secrets to be found within binary payloads. +7. All new behaviour is unit-tested; existing tests pass unchanged. + +## Non-goals + +- Rolling per-host buffer for split-across-requests detection (state in the + stateless addon is complex; deferred). +- Additional vendor regexes. +- ML / embedding-based detection. +- Entropy-based hard blocks (warn only per the issue). + +## Design + +### Canary token flow + +``` +Egress.prepare() + canary = secrets.token_urlsafe(32) + EgressPlan(canary=canary, ...) + +Docker compose render: + sidecar env: EGRESS_TOKEN_CANARY= ← scanned by existing known-secrets detector + agent env: BOT_BOTTLE_CANARY= ← visible to agent as a "secret" + +macos-container launch: same literals added to sidecar + agent env entries +``` + +`EGRESS_TOKEN_CANARY` matches the `EGRESS_TOKEN_` prefix already scanned by +`scan_known_secrets`, so no detector code changes are required for canary +detection — only the injection path. + +### Broadened known-value scanning + +`scan_known_secrets` gains a `sensitive_prefixes` parameter: + +```python +def scan_known_secrets( + text: str, + *, + location: str = "body", + env: Mapping[str, str] | None = None, + sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",), +) -> ScanResult | None: +``` + +`scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` (comma-separated list +of additional prefixes) from `environ` and appends them: + +```python +extra = tuple( + p for p in environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "").split(",") if p +) +sensitive_prefixes = ("EGRESS_TOKEN_",) + extra +``` + +`redact_tokens` receives the same treatment for consistent redaction. + +### Fragmentation-resistant matching + +A new helper `_alnum_projection(text)` strips all non-alphanumeric characters. +`scan_known_secrets` runs two passes per secret: + +1. **Exact pass** — existing encoded-variant loop (unchanged). +2. **Alnum-projection pass** — if the secret's alnum projection has ≥ 8 chars, + check if it appears in the text's alnum projection. Match → block with + `"fragmented match (separator injection)"` reason. +3. **Partial-substring pass** — if the secret's alnum projection has ≥ + `PARTIAL_MATCH_MIN_LEN` chars (12), slide a window of that length across the + secret's projection and look for each window in the text's alnum projection. + First match → block with `"partial match"` reason. + +All three passes run only for the `"known_secrets"` detector; the token-pattern +and entropy detectors are unchanged. + +### Entropy scoring + +New public function: + +```python +def scan_entropy( + text: str, + *, + location: str = "body", + window: int = ENTROPY_WINDOW, # 64 + threshold: float = ENTROPY_BLOCK_THRESHOLD, # 5.5 +) -> ScanResult | None: +``` + +Slides a window of `window` characters across `text` in steps of `window // 2`. +If any window's Shannon entropy exceeds `threshold`, returns a **warn**-severity +`ScanResult`. Never blocks. + +`OUTBOUND_DETECTOR_NAMES` gains `"entropy"`. Routes opt in via their `dlp` +block; entropy scanning is **off by default** to avoid false-positive noise on +legitimate binary payloads. + +### Binary body handling + +In `scan_outbound`, the bytes → str decoding changes from: + +```python +body.decode("utf-8", errors="replace") +``` + +to: + +```python +body.decode("utf-8") if body is str else body.decode("latin-1") +``` + +`latin-1` is a bijective byte↔codepoint mapping; every byte value is preserved +as its corresponding Latin-1 code point, so ASCII-range secret strings remain +intact and `str.find` / regex still locate them correctly. The fallback from +strict UTF-8 is tried first so valid UTF-8 bodies are decoded faithfully. + +## Implementation + +Delivered in three commits on the same branch: + +1. **DLP detector changes** — `_alnum_projection`, fragmentation passes, + `scan_entropy`, broadened `scan_known_secrets`, updated `scan_outbound` and + `redact_tokens`; all accompanying unit tests. +2. **Canary injection** — `EgressPlan.canary`, `Egress.prepare()`, + Docker compose + macos-container backend injection. +3. **PRD flip** — `Status: Draft → Active`. -- 2.52.0 From 701df6cb2f6ffd28c87abc3f2cd9606eeb55c89b Mon Sep 17 00:00:00 2001 From: claude Date: Thu, 25 Jun 2026 01:02:34 +0000 Subject: [PATCH 02/10] feat(dlp): fragmentation resistance, entropy detector, broadened known-value scan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - _alnum_projection(): strip non-alphanumeric chars for separator-injection detection - scan_known_secrets() gains two extra passes per secret after exact-variant matching: alnum-projection exact match (catches hyphens/spaces between secret chars) and a sliding-window partial-match scan (catches chunked substrings ≥ PARTIAL_MATCH_MIN_LEN) - scan_known_secrets() accepts sensitive_prefixes param (default ("EGRESS_TOKEN_",)) so redact_tokens and call-sites can extend the scanned env-var prefix set - scan_entropy() warn-only detector flagging windows with Shannon entropy ≥ 5.5 bits/char - "entropy" added to OUTBOUND_DETECTOR_NAMES; scan_outbound opts it in only when explicitly listed in dlp.outbound_detectors (never part of the default "all" set) - scan_outbound reads BOT_BOTTLE_SENSITIVE_PREFIXES from environ to extend scan_known_secrets beyond EGRESS_TOKEN_* without schema changes - Binary bodies decoded via latin-1 fallback (bijective byte↔codepoint) instead of utf-8 errors=replace, preserving ASCII secret strings in binary payloads Co-Authored-By: Claude Sonnet 4.6 --- bot_bottle/dlp_detectors.py | 158 +++++++++++++++++- bot_bottle/egress_addon_core.py | 35 +++- tests/unit/test_dlp_detectors.py | 193 +++++++++++++++++++++- tests/unit/test_egress_addon_core.py | 96 +++++++++++ tests/unit/test_macos_container_launch.py | 3 +- 5 files changed, 474 insertions(+), 11 deletions(-) diff --git a/bot_bottle/dlp_detectors.py b/bot_bottle/dlp_detectors.py index ef36e7f..e46960d 100644 --- a/bot_bottle/dlp_detectors.py +++ b/bot_bottle/dlp_detectors.py @@ -1,4 +1,4 @@ -"""DLP detectors for the egress proxy (PRD 0053). +"""DLP detectors for the egress proxy (PRD 0053, prd-new). Pure Python, no mitmproxy dependency. Each detector is a module-level function returning `ScanResult | None`. @@ -15,6 +15,8 @@ import gzip import re import typing import unicodedata +from math import log2 +from collections import Counter from urllib.parse import quote as url_quote try: @@ -107,20 +109,21 @@ def redact_tokens( text: str, *, env: typing.Mapping[str, str] | None = None, + sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",), ) -> str: """Replace token pattern matches and (if env given) provisioned secrets with REDACT.""" for _, pattern in TOKEN_PATTERNS: text = pattern.sub(REDACT, text) if env is not None: for key, value in env.items(): - if key.startswith("EGRESS_TOKEN_") and value: + if any(key.startswith(p) for p in sensitive_prefixes) and value: for variant in _encoded_variants(value): text = text.replace(variant, REDACT) return text # --------------------------------------------------------------------------- -# Known secrets detector (Phase 1b) +# Known secrets detector (Phase 1b, prd-new) # --------------------------------------------------------------------------- def _encoded_variants(secret: str) -> list[str]: @@ -161,18 +164,64 @@ def _encoded_variants(secret: str) -> list[str]: return variants +# --------------------------------------------------------------------------- +# Fragmentation-resistant helpers (prd-new) +# --------------------------------------------------------------------------- + +# Minimum length of alnum projection for projection-based checks to run. +# Short secrets produce too many false positives in projection space. +_ALNUM_MIN_LEN = 8 + +# Minimum window length for the partial-substring sliding scan. +PARTIAL_MATCH_MIN_LEN = 12 + + +def _alnum_projection(text: str) -> str: + """Return text with every non-alphanumeric character stripped. + + Used for fragmentation-resistant matching: separator-injected secrets + (spaces, hyphens, dots inserted between characters) are identical to + their originals in alnum projection space. + """ + return "".join(c for c in text if c.isalnum()) + + +def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None: + """Return the position in text_alnum where any min_len-char window of + secret_alnum first appears, or None. + + Slides a window of width min_len across secret_alnum and searches for + each window in text_alnum. The first hit position is returned. + """ + if len(secret_alnum) < min_len or len(text_alnum) < min_len: + return None + for i in range(len(secret_alnum) - min_len + 1): + window = secret_alnum[i:i + min_len] + pos = text_alnum.find(window) + if pos >= 0: + return pos + return None + + def scan_known_secrets( text: str, *, location: str = "body", env: typing.Mapping[str, str] | None = None, + sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",), safe_tokens: typing.AbstractSet[str] | None = None, ) -> ScanResult | None: if env is None: return None + + # Pre-compute alnum projection of the scan text once; reused per secret. + text_alnum: str | None = None + for key, value in env.items(): - if not key.startswith("EGRESS_TOKEN_") or not value: + if not any(key.startswith(p) for p in sensitive_prefixes) or not value: continue + + # Pass 1: exact match across encoded variants (original behaviour). for variant in _encoded_variants(value): pos = text.find(variant) if pos >= 0: @@ -188,6 +237,100 @@ def scan_known_secrets( context=_snippet(text, pos, pos + len(variant)), matched=variant, ) + + # Pass 2 & 3: fragmentation-resistant projection checks. + secret_alnum = _alnum_projection(value) + if len(secret_alnum) < _ALNUM_MIN_LEN: + continue + + if text_alnum is None: + text_alnum = _alnum_projection(text) + + # Pass 2: full alnum-projection exact match (catches separator injection). + pos2 = text_alnum.find(secret_alnum) + if pos2 >= 0: + return ScanResult( + severity="block", + reason=( + f"provisioned secret from {key} found in {location} " + f"(fragmented match — separator injection)" + ), + location=location, + context=_snippet(text_alnum, pos2, pos2 + len(secret_alnum)), + ) + + # Pass 3: sliding-window partial match (catches chunked-substring leaks). + pos3 = _find_partial_window(secret_alnum, text_alnum, PARTIAL_MATCH_MIN_LEN) + if pos3 is not None: + return ScanResult( + severity="block", + reason=( + f"provisioned secret from {key} found in {location} " + f"(partial match — at least {PARTIAL_MATCH_MIN_LEN} consecutive " + f"alphanumeric chars)" + ), + location=location, + context=_snippet(text_alnum, pos3, pos3 + PARTIAL_MATCH_MIN_LEN), + ) + + return None + + +# --------------------------------------------------------------------------- +# Entropy detector (warn-only, prd-new) +# --------------------------------------------------------------------------- + +# Sliding window size and step for the entropy scan. +ENTROPY_WINDOW = 64 +ENTROPY_STEP = 32 + +# Bits-per-character threshold. Random ASCII printable ≈ 6.6 bits; random +# lowercase hex ≈ 4 bits; random base64url ≈ 6 bits. 5.5 sits above +# typical structured data (JSON, URLs) while staying below truly random +# content. +ENTROPY_BLOCK_THRESHOLD = 5.5 + + +def _shannon_entropy(text: str) -> float: + if not text: + return 0.0 + counts = Counter(text) + n = len(text) + return -sum((c / n) * log2(c / n) for c in counts.values()) + + +def scan_entropy( + text: str, + *, + location: str = "body", + window: int = ENTROPY_WINDOW, + threshold: float = ENTROPY_BLOCK_THRESHOLD, +) -> ScanResult | None: + """Warn-only detector: flag windows of `window` chars with Shannon entropy + above `threshold` bits per character. + + Never blocks; always returns severity='warn'. Disabled by default — + routes must opt in via dlp.outbound_detectors=['entropy']. + """ + if not text: + return None + step = max(1, window // 2) + end = len(text) + # Scan overlapping windows; also check the final tail if shorter than window. + positions = list(range(0, end - window + 1, step)) + if end < window: + positions = [0] + elif (end - window) % step != 0: + positions.append(end - window) + for i in positions: + chunk = text[i:i + window] + if _shannon_entropy(chunk) >= threshold: + return ScanResult( + severity="warn", + reason=f"high-entropy content in {location} (possible encrypted exfil)", + location=location, + context=_snippet(text, i, i + len(chunk)), + ) return None @@ -306,11 +449,18 @@ def scan_crlf_injection(text: str) -> ScanResult | None: __all__ = [ + "ENTROPY_BLOCK_THRESHOLD", + "ENTROPY_WINDOW", + "ENTROPY_STEP", + "PARTIAL_MATCH_MIN_LEN", "REDACT", "SNIPPET_CONTEXT", "TOKEN_PATTERNS", + "_alnum_projection", + "_shannon_entropy", "redact_tokens", "scan_crlf_injection", + "scan_entropy", "scan_known_secrets", "scan_naive_injection", "scan_token_patterns", diff --git a/bot_bottle/egress_addon_core.py b/bot_bottle/egress_addon_core.py index 324f7c7..16556a5 100644 --- a/bot_bottle/egress_addon_core.py +++ b/bot_bottle/egress_addon_core.py @@ -34,7 +34,7 @@ VALID_METHODS = frozenset({ "CONNECT", }) -OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"}) +OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"}) INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"}) # Per-route policy for what the proxy does when an outbound DLP detector @@ -729,17 +729,28 @@ def scan_outbound( try: from dlp_detectors import ( # type: ignore[import-not-found] scan_crlf_injection, + scan_entropy, scan_known_secrets, scan_token_patterns, ) except ImportError: # pragma: no cover - host-side path from .dlp_detectors import ( # type: ignore[import-not-found] scan_crlf_injection, + scan_entropy, scan_known_secrets, scan_token_patterns, ) - text = body if isinstance(body, str) else body.decode("utf-8", errors="replace") + # Binary bodies: latin-1 is a bijective byte↔codepoint mapping that + # preserves every byte value, so ASCII-range secret strings remain + # findable by str.find / regex. Prefer strict UTF-8 for valid text bodies. + if isinstance(body, bytes): + try: + text = body.decode("utf-8") + except UnicodeDecodeError: + text = body.decode("latin-1") + else: + text = body # CRLF injection is only an attack in the request line + headers, never the # body: an HTTP body is delimited by Content-Length, so CRLF bytes there @@ -758,12 +769,30 @@ def scan_outbound( return result if _detector_enabled(route.outbound_detectors, "known_secrets"): + # BOT_BOTTLE_SENSITIVE_PREFIXES lets operators add extra env prefixes + # beyond EGRESS_TOKEN_* without changing the manifest schema. + extra_raw = environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "") + extra = tuple(p for p in extra_raw.split(",") if p) + sensitive_prefixes = ("EGRESS_TOKEN_",) + extra result = scan_known_secrets( - text, location="body", env=environ, safe_tokens=safe_tokens, + text, location="body", env=environ, + sensitive_prefixes=sensitive_prefixes, safe_tokens=safe_tokens, ) if result is not None: return result + # Entropy scanning requires explicit opt-in: it is NOT part of the + # default "all detectors" set because it produces false positives on + # legitimate base64 / binary payloads. Routes must list "entropy" in + # dlp.outbound_detectors to enable it. + if ( + route.outbound_detectors is not None + and "entropy" in route.outbound_detectors + ): + result = scan_entropy(text, location="body") + if result is not None: + return result + return None diff --git a/tests/unit/test_dlp_detectors.py b/tests/unit/test_dlp_detectors.py index 3028729..4be520f 100644 --- a/tests/unit/test_dlp_detectors.py +++ b/tests/unit/test_dlp_detectors.py @@ -1,18 +1,24 @@ -"""Unit: DLP detectors (PRD 0053). +"""Unit: DLP detectors (PRD 0053, prd-new). -Tests for token pattern scanning, known secret detection, and -naive prompt injection detection.""" +Tests for token pattern scanning, known secret detection, fragmentation- +resistant matching, entropy scoring, and naive prompt injection detection.""" import base64 import gzip import unittest from bot_bottle.dlp_detectors import ( + ENTROPY_BLOCK_THRESHOLD, + ENTROPY_WINDOW, + PARTIAL_MATCH_MIN_LEN, REDACT, + _alnum_projection, _encoded_variants, _normalize_text, + _shannon_entropy, redact_tokens, scan_crlf_injection, + scan_entropy, scan_known_secrets, scan_naive_injection, scan_token_patterns, @@ -502,6 +508,187 @@ class TestStripCrlf(unittest.TestCase): from bot_bottle.dlp_detectors import strip_crlf self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello")) +class TestAlnumProjection(unittest.TestCase): + def test_alphanumeric_unchanged(self): + self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ")) + + def test_strips_hyphens(self): + self.assertEqual("mysecretvalue", _alnum_projection("my-secret-value")) + + def test_strips_spaces(self): + self.assertEqual("mysecretvalue", _alnum_projection("my secret value")) + + def test_strips_dots_and_underscores(self): + self.assertEqual("mysecretvalue", _alnum_projection("my.secret_value")) + + def test_empty_string(self): + self.assertEqual("", _alnum_projection("")) + + def test_all_special_chars(self): + self.assertEqual("", _alnum_projection("!@#$%^&*()")) + + +class TestFragmentationResistantMatching(unittest.TestCase): + """scan_known_secrets catches separator-injection and partial-substring evasion.""" + + # Secrets long enough that their alnum projections are ≥ 8 chars. + SECRET = "supersecrettoken99" + ENV = {"EGRESS_TOKEN_0": SECRET} + + def test_exact_match_still_works(self): + result = scan_known_secrets(f"key={self.SECRET}", env=self.ENV) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_separator_injection_blocked(self): + # Hyphens inserted between chars of the secret. + fragmented = "-".join(self.SECRET) + result = scan_known_secrets(f"data={fragmented}", env=self.ENV) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn("separator injection", result.reason) + + def test_space_separator_blocked(self): + fragmented = " ".join(self.SECRET) + result = scan_known_secrets(f"body: {fragmented}", env=self.ENV) + self.assertIsNotNone(result) + assert result is not None + self.assertIn("separator injection", result.reason) + + def test_partial_substring_blocked(self): + # First PARTIAL_MATCH_MIN_LEN alnum chars of the secret, no separators. + partial = _alnum_projection(self.SECRET)[:PARTIAL_MATCH_MIN_LEN] + result = scan_known_secrets(f"x={partial}&y=other", env=self.ENV) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn("partial match", result.reason) + + def test_short_secret_skips_projection(self): + # Secrets shorter than _ALNUM_MIN_LEN in alnum projection are not + # fragmentation-checked (too many false positives). + short_env = {"EGRESS_TOKEN_0": "abc"} + # "a b c" has alnum projection "abc" (3 chars, < 8); should not block. + self.assertIsNone(scan_known_secrets("a b c", env=short_env)) + + def test_clean_text_not_blocked(self): + self.assertIsNone(scan_known_secrets("nothing to see here", env=self.ENV)) + + def test_sensitive_prefixes_param_extra_prefix(self): + env = {"MY_CRED_0": self.SECRET, "IGNORED": "other"} + result = scan_known_secrets( + f"key={self.SECRET}", + env=env, + sensitive_prefixes=("MY_CRED_",), + ) + self.assertIsNotNone(result) + assert result is not None + self.assertIn("MY_CRED_0", result.reason) + + def test_sensitive_prefixes_default_only_egress_token(self): + # A value under a non-EGRESS_TOKEN_ key is ignored with default prefixes. + env = {"MY_CRED_0": self.SECRET} + self.assertIsNone(scan_known_secrets(f"key={self.SECRET}", env=env)) + + def test_canary_prefix_detected(self): + canary_value = "canary-fake-secret-value-xyz" + env = {"EGRESS_TOKEN_CANARY": canary_value} + result = scan_known_secrets(f"x={canary_value}", env=env) + self.assertIsNotNone(result) + assert result is not None + self.assertIn("EGRESS_TOKEN_CANARY", result.reason) + + +class TestRedactTokensBroadenedPrefixes(unittest.TestCase): + SECRET = "my-provisioned-secret" + + def test_default_redacts_egress_token(self): + env = {"EGRESS_TOKEN_0": self.SECRET} + out = redact_tokens(f"val={self.SECRET}", env=env) + self.assertNotIn(self.SECRET, out) + self.assertIn(REDACT, out) + + def test_extra_prefix_redacted(self): + env = {"MY_SECRET_KEY": self.SECRET} + out = redact_tokens( + f"val={self.SECRET}", + env=env, + sensitive_prefixes=("MY_SECRET_",), + ) + self.assertNotIn(self.SECRET, out) + self.assertIn(REDACT, out) + + def test_non_matching_prefix_not_redacted(self): + env = {"MY_SECRET_KEY": self.SECRET} + out = redact_tokens(f"val={self.SECRET}", env=env) + # Default prefixes only include EGRESS_TOKEN_ → secret not redacted + self.assertIn(self.SECRET, out) + + +class TestShannonEntropy(unittest.TestCase): + def test_empty_string_zero(self): + self.assertEqual(0.0, _shannon_entropy("")) + + def test_single_char_zero(self): + self.assertEqual(0.0, _shannon_entropy("aaaaaa")) + + def test_two_equal_chars_one_bit(self): + self.assertAlmostEqual(1.0, _shannon_entropy("abababab"), places=10) + + def test_high_entropy_random_like(self): + # Uniform 64-char string over 64 distinct symbols has entropy 6 bits. + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + text = alphabet # each char appears exactly once + self.assertAlmostEqual(6.0, _shannon_entropy(text), places=10) + + +class TestScanEntropy(unittest.TestCase): + def test_empty_returns_none(self): + self.assertIsNone(scan_entropy("")) + + def test_low_entropy_returns_none(self): + # Highly repetitive text has low entropy. + self.assertIsNone(scan_entropy("a" * 200)) + + def test_high_entropy_warns(self): + # Build a 64-char string with entropy > ENTROPY_BLOCK_THRESHOLD. + # Use all 64 distinct printable chars to maximise entropy (~6 bits). + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_entropy(alphabet, threshold=ENTROPY_BLOCK_THRESHOLD) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("warn", result.severity) + self.assertIn("high-entropy", result.reason) + + def test_never_blocks(self): + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_entropy(alphabet) + # scan_entropy is warn-only; it must never return severity="block". + if result is not None: + self.assertNotEqual("block", result.severity) + + def test_location_in_result(self): + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_entropy(alphabet, location="authorization header") + if result is not None: + self.assertIn("authorization header", result.location) + + def test_structured_json_no_warn(self): + # Typical JSON has low entropy and should not be flagged. + json_body = '{"status": "ok", "message": "hello world", "count": 42}' + self.assertIsNone(scan_entropy(json_body)) + + def test_short_text_below_window(self): + # Text shorter than the window: checked as one chunk. + # Use a uniform string to ensure it won't be flagged. + self.assertIsNone(scan_entropy("abcde", threshold=ENTROPY_BLOCK_THRESHOLD)) + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index 2ea6abd..023f0ca 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -1273,6 +1273,102 @@ class TestBuildTokenAllowPayload(unittest.TestCase): result = ScanResult(severity="block", reason="r", matched="x") payload = build_token_allow_payload("h", "GET", "/", result) self.assertNotIn("context:", payload) +class TestScanOutboundEnhanced(unittest.TestCase): + """scan_outbound changes from prd-new: binary decode, entropy detector, + broadened known-value prefixes, fragmentation resistance.""" + + _ROUTE = Route(host="api.example.com") + _ROUTE_ENTROPY = Route( + host="api.example.com", + outbound_detectors=("entropy",), + ) + + def test_binary_body_latin1_decode_finds_ascii_secret(self): + # Body contains valid ASCII secret surrounded by non-UTF-8 bytes. + secret = "supersecrettoken99" + env = {"EGRESS_TOKEN_0": secret} + # Wrap the secret in bytes that are invalid UTF-8. + body = b"\x80\x81" + secret.encode("ascii") + b"\xff" + result = scan_outbound(self._ROUTE, body, env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_binary_body_valid_utf8_decoded_correctly(self): + env = {"EGRESS_TOKEN_0": "mysecret"} + # Valid UTF-8 body — should be decoded as UTF-8, not latin-1. + body = "clean body with mysecret".encode("utf-8") + result = scan_outbound(self._ROUTE, body, env) + self.assertIsNotNone(result) + + def test_entropy_detector_off_by_default(self): + import string + # High-entropy content should NOT warn if the route has no entropy detector. + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_outbound(self._ROUTE, alphabet, {}) + self.assertIsNone(result) + + def test_entropy_detector_warns_when_enabled(self): + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_outbound(self._ROUTE_ENTROPY, alphabet, {}) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("warn", result.severity) + + def test_bot_bottle_sensitive_prefixes_env_var(self): + # When the sidecar env contains BOT_BOTTLE_SENSITIVE_PREFIXES, + # scan_outbound should scan those additional prefixes. + secret = "extra-sensitive-value-abc" + env = { + "MY_CRED_KEY": secret, + "BOT_BOTTLE_SENSITIVE_PREFIXES": "MY_CRED_", + } + result = scan_outbound(self._ROUTE, f"x={secret}", env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_bot_bottle_sensitive_prefixes_multiple(self): + secret = "my-api-key-value-xyz" + env = { + "ANTHROPIC_API_0": secret, + "BOT_BOTTLE_SENSITIVE_PREFIXES": "ANTHROPIC_API_,OTHER_", + } + result = scan_outbound(self._ROUTE, f"auth={secret}", env) + self.assertIsNotNone(result) + + def test_canary_detected_via_egress_token_canary(self): + # The canary (injected as EGRESS_TOKEN_CANARY) is caught by known_secrets. + canary = "canaryvalue12345abcdef" + env = {"EGRESS_TOKEN_CANARY": canary} + result = scan_outbound(self._ROUTE, f"data={canary}", env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn("EGRESS_TOKEN_CANARY", result.reason) + + def test_fragmented_canary_blocked(self): + # Canary with separators injected is still caught. + canary = "supersecretcanary99" + env = {"EGRESS_TOKEN_CANARY": canary} + fragmented = "-".join(canary) + result = scan_outbound(self._ROUTE, f"x={fragmented}", env) + self.assertIsNotNone(result) + + +class TestOutboundDetectorNames(unittest.TestCase): + def test_entropy_in_outbound_detector_names(self): + from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES + self.assertIn("entropy", OUTBOUND_DETECTOR_NAMES) + + def test_known_secrets_in_outbound_detector_names(self): + from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES + self.assertIn("known_secrets", OUTBOUND_DETECTOR_NAMES) + + def test_token_patterns_in_outbound_detector_names(self): + from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES + self.assertIn("token_patterns", OUTBOUND_DETECTOR_NAMES) if __name__ == "__main__": diff --git a/tests/unit/test_macos_container_launch.py b/tests/unit/test_macos_container_launch.py index d9ae81c..4f66005 100644 --- a/tests/unit/test_macos_container_launch.py +++ b/tests/unit/test_macos_container_launch.py @@ -42,6 +42,7 @@ def _plan( routes_path=routes_path, routes=("route",), token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"}, + canary="", ) if git: key_path = stage_dir / "origin-key" @@ -271,7 +272,7 @@ def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan: manifest=_MANIFEST, stage_dir=stage_dir, git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())), - egress_plan=cast(EgressPlan, SimpleNamespace()), + egress_plan=cast(EgressPlan, SimpleNamespace(canary="")), supervise_plan=None, agent_provision=AgentProvisionPlan( template="claude", -- 2.52.0 From 11cf12188dd8f66dc304c75eecab868e1e4b12e2 Mon Sep 17 00:00:00 2001 From: claude Date: Thu, 25 Jun 2026 01:02:44 +0000 Subject: [PATCH 03/10] feat(egress): inject per-session canary token into sidecar and agent environments EgressPlan gains a `canary: str` field (default "") populated in Egress.prepare() using secrets.token_urlsafe(32). Each launched bottle: - sidecar receives EGRESS_TOKEN_CANARY= (literal env entry, scanned by existing known-secrets detector without any detector code changes) - agent receives BOT_BOTTLE_CANARY= (visible fake secret that signals exfiltration with zero false positives if it appears in outbound traffic) Docker compose and macos-container backends updated; smolmachines shares docker compose and so picks this up automatically. Unit tests cover canary uniqueness, detection via scan_known_secrets, and EgressPlan backward-compat default. Co-Authored-By: Claude Sonnet 4.6 --- bot_bottle/backend/docker/compose.py | 8 +++ bot_bottle/backend/macos_container/launch.py | 4 ++ bot_bottle/egress.py | 7 +++ tests/unit/test_egress.py | 65 +++++++++++++++++++- 4 files changed, 83 insertions(+), 1 deletion(-) diff --git a/bot_bottle/backend/docker/compose.py b/bot_bottle/backend/docker/compose.py index 9ad0011..121fc02 100644 --- a/bot_bottle/backend/docker/compose.py +++ b/bot_bottle/backend/docker/compose.py @@ -137,6 +137,10 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]: volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent))) for token_env in sorted(ep.token_env_map.keys()): env.append(token_env) + if ep.canary: + # Inject canary as a literal NAME=VALUE (not a bare name) — the + # value is a fake secret so it need not be hidden from the compose file. + env.append(f"EGRESS_TOKEN_CANARY={ep.canary}") # --- git-gate ----------------------------------------------------- gp = plan.git_gate_plan @@ -220,6 +224,10 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]: # never lands on argv or in the compose file. for name in sorted(plan.forwarded_env.keys()): env.append(name) + # Canary token: visible to the agent as a fake secret so that any + # outbound appearance of this value is a zero-FP exfil signal. + if plan.egress_plan.canary: + env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}") service: dict[str, Any] = { "image": plan.image, diff --git a/bot_bottle/backend/macos_container/launch.py b/bot_bottle/backend/macos_container/launch.py index 8de4385..9c0f145 100644 --- a/bot_bottle/backend/macos_container/launch.py +++ b/bot_bottle/backend/macos_container/launch.py @@ -353,6 +353,8 @@ def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]: env: list[str] = [] if plan.egress_plan.routes: env.extend(sorted(plan.egress_plan.token_env_map.keys())) + if plan.egress_plan.canary: + env.append(f"EGRESS_TOKEN_CANARY={plan.egress_plan.canary}") if plan.git_gate_plan.upstreams: env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}") if plan.supervise_plan is not None: @@ -420,6 +422,8 @@ def _agent_env_entries( env.append(f"{name}={value}") for name in sorted(plan.forwarded_env.keys()): env.append(name) + if plan.egress_plan.canary: + env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}") return tuple(env) diff --git a/bot_bottle/egress.py b/bot_bottle/egress.py index 8943049..ea10183 100644 --- a/bot_bottle/egress.py +++ b/bot_bottle/egress.py @@ -10,6 +10,7 @@ specific and lives on concrete subclasses (see from __future__ import annotations import dataclasses +import secrets from abc import ABC from dataclasses import dataclass from pathlib import Path @@ -65,6 +66,7 @@ class EgressPlan: mitmproxy_ca_host_path: Path = Path() mitmproxy_ca_cert_only_host_path: Path = Path() log: int = 0 + canary: str = "" def egress_manifest_routes( @@ -324,12 +326,17 @@ class Egress(ABC): routes_path = stage_dir / EGRESS_ROUTES_FILENAME routes_path.write_text(egress_render_routes(routes, log=log)) routes_path.chmod(0o600) + # Generate a per-session canary token. The sidecar receives it as + # EGRESS_TOKEN_CANARY (scanned by the existing known-secrets detector); + # the agent receives it as BOT_BOTTLE_CANARY (a visible fake secret). + canary = secrets.token_urlsafe(32) return EgressPlan( slug=slug, routes_path=routes_path, routes=routes, token_env_map=egress_token_env_map(routes), log=log, + canary=canary, ) __all__ = [ diff --git a/tests/unit/test_egress.py b/tests/unit/test_egress.py index 4fdae02..73d4c9a 100644 --- a/tests/unit/test_egress.py +++ b/tests/unit/test_egress.py @@ -1,10 +1,14 @@ """Unit: Egress route lift + routes.yaml render + token -resolution (PRD 0017, PRD 0053).""" +resolution (PRD 0017, PRD 0053, prd-new).""" +import tempfile import unittest +from pathlib import Path from bot_bottle.egress import ( CODEX_HOST_CREDENTIAL_TOKEN_REF, + Egress, + EgressPlan, EgressRoute, egress_manifest_routes, egress_render_routes, @@ -443,5 +447,64 @@ class TestResolveTokenValues(unittest.TestCase): self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out) +class TestCanaryGeneration(unittest.TestCase): + """Egress.prepare() generates a unique canary token per session (prd-new).""" + + def _bottle_obj(self): + return ManifestIndex.from_json_obj({ + "bottles": {"dev": {"egress": {"routes": []}}}, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + }).bottles["dev"] + + def _make_plan(self) -> EgressPlan: + # Use a concrete no-op subclass so we can call prepare() without + # a real backend. + class _TestEgress(Egress): + pass + + e = _TestEgress() + with tempfile.TemporaryDirectory() as td: + return e.prepare(self._bottle_obj(), "test-slug", Path(td)) + + def test_canary_is_non_empty(self): + plan = self._make_plan() + self.assertIsInstance(plan.canary, str) + self.assertGreater(len(plan.canary), 0) + + def test_canary_is_unique_per_session(self): + with tempfile.TemporaryDirectory() as td: + bottle = self._bottle_obj() + + class _TestEgress(Egress): + pass + + e = _TestEgress() + plan_a = e.prepare(bottle, "slug-a", Path(td)) + plan_b = e.prepare(bottle, "slug-b", Path(td)) + self.assertNotEqual(plan_a.canary, plan_b.canary) + + def test_canary_detected_by_scan_known_secrets(self): + from bot_bottle.dlp_detectors import scan_known_secrets + + plan = self._make_plan() + env = {"EGRESS_TOKEN_CANARY": plan.canary} + result = scan_known_secrets(f"exfil={plan.canary}", env=env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn("EGRESS_TOKEN_CANARY", result.reason) + + def test_egress_plan_canary_field_default_empty(self): + # Verify EgressPlan can be constructed with an empty canary (backward compat). + from pathlib import Path + plan = EgressPlan( + slug="s", + routes_path=Path("/tmp/r.yaml"), + routes=(), + token_env_map={}, + ) + self.assertEqual("", plan.canary) + + if __name__ == "__main__": unittest.main() -- 2.52.0 From e02fab15d071d341b95e11499f32a5d2e7b11650 Mon Sep 17 00:00:00 2001 From: claude Date: Thu, 25 Jun 2026 01:02:55 +0000 Subject: [PATCH 04/10] =?UTF-8?q?docs(prd):=20flip=20prd-new-strengthen-ou?= =?UTF-8?q?tbound-exfil-detection=20Draft=20=E2=86=92=20Active?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- docs/prds/prd-new-strengthen-outbound-exfil-detection.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/prds/prd-new-strengthen-outbound-exfil-detection.md b/docs/prds/prd-new-strengthen-outbound-exfil-detection.md index 1f55627..7cf49d5 100644 --- a/docs/prds/prd-new-strengthen-outbound-exfil-detection.md +++ b/docs/prds/prd-new-strengthen-outbound-exfil-detection.md @@ -1,6 +1,6 @@ # PRD prd-new: Strengthen outbound exfiltration detection -- **Status:** Draft +- **Status:** Active - **Author:** claude - **Created:** 2026-06-25 - **Issue:** #259 -- 2.52.0 From a9202037302371c5c3bf420403e07d3e1e006c59 Mon Sep 17 00:00:00 2001 From: claude Date: Thu, 25 Jun 2026 02:33:34 +0000 Subject: [PATCH 05/10] fix(dlp): skip projection passes when exact variant is safe-listed When a supervisor-approved safe-token exactly matched an env secret (Pass 1), Passes 2 & 3 (alnum projection) still ran and re-blocked on the same value. Track whether any variant was found-and-approved and skip the projection passes for that secret in that case. --- bot_bottle/dlp_detectors.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/bot_bottle/dlp_detectors.py b/bot_bottle/dlp_detectors.py index e46960d..d4a4cb1 100644 --- a/bot_bottle/dlp_detectors.py +++ b/bot_bottle/dlp_detectors.py @@ -222,6 +222,7 @@ def scan_known_secrets( continue # Pass 1: exact match across encoded variants (original behaviour). + approved_exact = False for variant in _encoded_variants(value): pos = text.find(variant) if pos >= 0: @@ -229,6 +230,7 @@ def scan_known_secrets( # (PRD 0062); a different encoding of the same secret is a # fresh block. if safe_tokens is not None and variant in safe_tokens: + approved_exact = True continue return ScanResult( severity="block", @@ -237,6 +239,10 @@ def scan_known_secrets( context=_snippet(text, pos, pos + len(variant)), matched=variant, ) + if approved_exact: + # Exact match was found and approved; projection passes would + # fire on the same value, so skip them for this secret. + continue # Pass 2 & 3: fragmentation-resistant projection checks. secret_alnum = _alnum_projection(value) -- 2.52.0 From 0a7e166b35d6890ebcf05ce90316f7b5bccfc03a Mon Sep 17 00:00:00 2001 From: codex Date: Thu, 25 Jun 2026 02:48:49 +0000 Subject: [PATCH 06/10] fix(tests): remove unused dlp entropy import --- tests/unit/test_dlp_detectors.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/test_dlp_detectors.py b/tests/unit/test_dlp_detectors.py index 4be520f..f7f593c 100644 --- a/tests/unit/test_dlp_detectors.py +++ b/tests/unit/test_dlp_detectors.py @@ -9,7 +9,6 @@ import unittest from bot_bottle.dlp_detectors import ( ENTROPY_BLOCK_THRESHOLD, - ENTROPY_WINDOW, PARTIAL_MATCH_MIN_LEN, REDACT, _alnum_projection, -- 2.52.0 From 4808ef557a3dd3b4d4a821b914864be50582d4bb Mon Sep 17 00:00:00 2001 From: codex Date: Thu, 25 Jun 2026 03:25:23 +0000 Subject: [PATCH 07/10] fix(egress): randomize canary secret env name --- bot_bottle/backend/docker/compose.py | 13 +++---- bot_bottle/backend/macos_container/launch.py | 9 ++--- bot_bottle/egress.py | 34 +++++++++++++++++-- ...new-strengthen-outbound-exfil-detection.md | 21 +++++++----- tests/unit/test_compose.py | 25 ++++++++++++-- tests/unit/test_dlp_detectors.py | 10 ++++-- tests/unit/test_egress.py | 12 +++++-- tests/unit/test_egress_addon_core.py | 17 +++++++--- tests/unit/test_macos_container_launch.py | 24 ++++++++++++- 9 files changed, 129 insertions(+), 36 deletions(-) diff --git a/bot_bottle/backend/docker/compose.py b/bot_bottle/backend/docker/compose.py index 121fc02..2806509 100644 --- a/bot_bottle/backend/docker/compose.py +++ b/bot_bottle/backend/docker/compose.py @@ -137,10 +137,11 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]: volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent))) for token_env in sorted(ep.token_env_map.keys()): env.append(token_env) - if ep.canary: - # Inject canary as a literal NAME=VALUE (not a bare name) — the - # value is a fake secret so it need not be hidden from the compose file. - env.append(f"EGRESS_TOKEN_CANARY={ep.canary}") + if ep.canary and ep.canary_env: + # Inject the fake secret as a literal NAME=VALUE (not a bare name) and + # tell the sidecar to scan that exact env var as sensitive. + env.append(f"{ep.canary_env}={ep.canary}") + env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={ep.canary_env}") # --- git-gate ----------------------------------------------------- gp = plan.git_gate_plan @@ -226,8 +227,8 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]: env.append(name) # Canary token: visible to the agent as a fake secret so that any # outbound appearance of this value is a zero-FP exfil signal. - if plan.egress_plan.canary: - env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}") + if plan.egress_plan.canary and plan.egress_plan.canary_env: + env.append(f"{plan.egress_plan.canary_env}={plan.egress_plan.canary}") service: dict[str, Any] = { "image": plan.image, diff --git a/bot_bottle/backend/macos_container/launch.py b/bot_bottle/backend/macos_container/launch.py index 9c0f145..26c8d17 100644 --- a/bot_bottle/backend/macos_container/launch.py +++ b/bot_bottle/backend/macos_container/launch.py @@ -353,8 +353,9 @@ def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]: env: list[str] = [] if plan.egress_plan.routes: env.extend(sorted(plan.egress_plan.token_env_map.keys())) - if plan.egress_plan.canary: - env.append(f"EGRESS_TOKEN_CANARY={plan.egress_plan.canary}") + if plan.egress_plan.canary and plan.egress_plan.canary_env: + env.append(f"{plan.egress_plan.canary_env}={plan.egress_plan.canary}") + env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.egress_plan.canary_env}") if plan.git_gate_plan.upstreams: env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}") if plan.supervise_plan is not None: @@ -422,8 +423,8 @@ def _agent_env_entries( env.append(f"{name}={value}") for name in sorted(plan.forwarded_env.keys()): env.append(name) - if plan.egress_plan.canary: - env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}") + if plan.egress_plan.canary and plan.egress_plan.canary_env: + env.append(f"{plan.egress_plan.canary_env}={plan.egress_plan.canary}") return tuple(env) diff --git a/bot_bottle/egress.py b/bot_bottle/egress.py index ea10183..cd78790 100644 --- a/bot_bottle/egress.py +++ b/bot_bottle/egress.py @@ -35,6 +35,32 @@ EGRESS_HOSTNAME = "egress" EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml" EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name +_CANARY_ENV_WORDS = ( + "ACCORD", + "ANCHOR", + "ATLAS", + "CANON", + "CIPHER", + "EMBER", + "FALCON", + "HARBOR", + "LANTERN", + "MARBLE", + "NOVA", + "ORBIT", + "PIVOT", + "RADIUS", + "SUMMIT", + "VECTOR", +) + + +def _random_canary_env() -> str: + first = secrets.choice(_CANARY_ENV_WORDS) + remaining = tuple(word for word in _CANARY_ENV_WORDS if word != first) + second = secrets.choice(remaining) + return f"{first}_{second}_SECRET" + @dataclass(frozen=True) class EgressRoute(Route): @@ -67,6 +93,7 @@ class EgressPlan: mitmproxy_ca_cert_only_host_path: Path = Path() log: int = 0 canary: str = "" + canary_env: str = "" def egress_manifest_routes( @@ -326,9 +353,9 @@ class Egress(ABC): routes_path = stage_dir / EGRESS_ROUTES_FILENAME routes_path.write_text(egress_render_routes(routes, log=log)) routes_path.chmod(0o600) - # Generate a per-session canary token. The sidecar receives it as - # EGRESS_TOKEN_CANARY (scanned by the existing known-secrets detector); - # the agent receives it as BOT_BOTTLE_CANARY (a visible fake secret). + # Generate a per-session fake secret under a plausible random env name. + # The sidecar marks that exact env name as sensitive for known-secret + # scanning; the agent receives the same name/value as exfil bait. canary = secrets.token_urlsafe(32) return EgressPlan( slug=slug, @@ -337,6 +364,7 @@ class Egress(ABC): token_env_map=egress_token_env_map(routes), log=log, canary=canary, + canary_env=_random_canary_env(), ) __all__ = [ diff --git a/docs/prds/prd-new-strengthen-outbound-exfil-detection.md b/docs/prds/prd-new-strengthen-outbound-exfil-detection.md index 7cf49d5..490e519 100644 --- a/docs/prds/prd-new-strengthen-outbound-exfil-detection.md +++ b/docs/prds/prd-new-strengthen-outbound-exfil-detection.md @@ -37,9 +37,10 @@ query, headers, body). But the content-based strong tier only matches ## Goals / Success Criteria 1. Each launched bottle has a unique canary token in the agent's environment - (`BOT_BOTTLE_CANARY`) and the egress sidecar's environment - (`EGRESS_TOKEN_CANARY`). Any outbound appearance of the canary blocks the - request with reason `"canary token"`. + under a randomized `WORD_WORD_SECRET` env var name. The egress sidecar gets + the same env var and registers that exact name through + `BOT_BOTTLE_SENSITIVE_PREFIXES`. Any outbound appearance of the canary + blocks the request as a known-secret match. 2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default: `("EGRESS_TOKEN_",)`). `scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes @@ -77,18 +78,20 @@ query, headers, body). But the content-based strong tier only matches ``` Egress.prepare() canary = secrets.token_urlsafe(32) - EgressPlan(canary=canary, ...) + canary_env = + EgressPlan(canary=canary, canary_env=canary_env, ...) Docker compose render: - sidecar env: EGRESS_TOKEN_CANARY= ← scanned by existing known-secrets detector - agent env: BOT_BOTTLE_CANARY= ← visible to agent as a "secret" + sidecar env: = + sidecar env: BOT_BOTTLE_SENSITIVE_PREFIXES= + agent env: = ← visible to agent as a "secret" macos-container launch: same literals added to sidecar + agent env entries ``` -`EGRESS_TOKEN_CANARY` matches the `EGRESS_TOKEN_` prefix already scanned by -`scan_known_secrets`, so no detector code changes are required for canary -detection — only the injection path. +The sidecar uses `BOT_BOTTLE_SENSITIVE_PREFIXES` to make the random canary env +name part of the existing `scan_known_secrets` detector without adding a +manifest schema field. ### Broadened known-value scanning diff --git a/tests/unit/test_compose.py b/tests/unit/test_compose.py index 8b10eec..0e5d743 100644 --- a/tests/unit/test_compose.py +++ b/tests/unit/test_compose.py @@ -80,7 +80,11 @@ def _git_gate_plan(upstreams: tuple[GitGateUpstream, ...] = ()) -> GitGatePlan: ) -def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan: +def _egress_plan( + routes: tuple[EgressRoute, ...] = (), + *, + canary: bool = False, +) -> EgressPlan: token_env_map = { r.token_env: r.token_ref for r in routes @@ -95,6 +99,8 @@ def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan: egress_network=f"bot-bottle-egress-{SLUG}", mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem", mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem", + canary="fake-canary-value" if canary else "", + canary_env="CANON_ALPHA_SECRET" if canary else "", ) @@ -112,6 +118,7 @@ def _plan( with_git: bool = False, with_egress: bool = False, supervise: bool = False, + canary: bool = False, ) -> DockerBottlePlan: """Build a fully-resolved DockerBottlePlan. Toggles cover the matrix the renderer's conditional-service logic branches on.""" @@ -150,7 +157,7 @@ def _plan( slug=SLUG, forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"}, git_gate_plan=_git_gate_plan(upstreams), - egress_plan=_egress_plan(routes), + egress_plan=_egress_plan(routes, canary=canary), supervise_plan=_supervise_plan() if supervise else None, use_runsc=False, agent_provision=AgentProvisionPlan( @@ -375,6 +382,20 @@ class TestSidecarBundleShape(unittest.TestCase): env_strings = sc["environment"] self.assertNotIn("EGRESS_TOKEN_0", env_strings) + def test_canary_env_registered_as_sensitive_in_sidecar(self): + sc = self._render(canary=True)["services"]["sidecars"] + env_strings = sc["environment"] + self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings) + self.assertIn( + "BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", + env_strings, + ) + + def test_canary_env_visible_to_agent(self): + agent = self._render(canary=True)["services"]["agent"] + env_strings = agent["environment"] + self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings) + def test_supervise_env_present_when_active(self): sc = self._render(supervise=True)["services"]["sidecars"] env_strings = sc["environment"] diff --git a/tests/unit/test_dlp_detectors.py b/tests/unit/test_dlp_detectors.py index f7f593c..bf6ebd6 100644 --- a/tests/unit/test_dlp_detectors.py +++ b/tests/unit/test_dlp_detectors.py @@ -593,11 +593,15 @@ class TestFragmentationResistantMatching(unittest.TestCase): def test_canary_prefix_detected(self): canary_value = "canary-fake-secret-value-xyz" - env = {"EGRESS_TOKEN_CANARY": canary_value} - result = scan_known_secrets(f"x={canary_value}", env=env) + env = {"CANON_ALPHA_SECRET": canary_value} + result = scan_known_secrets( + f"x={canary_value}", + env=env, + sensitive_prefixes=("CANON_ALPHA_SECRET",), + ) self.assertIsNotNone(result) assert result is not None - self.assertIn("EGRESS_TOKEN_CANARY", result.reason) + self.assertIn("CANON_ALPHA_SECRET", result.reason) class TestRedactTokensBroadenedPrefixes(unittest.TestCase): diff --git a/tests/unit/test_egress.py b/tests/unit/test_egress.py index 73d4c9a..ffcfe38 100644 --- a/tests/unit/test_egress.py +++ b/tests/unit/test_egress.py @@ -470,6 +470,7 @@ class TestCanaryGeneration(unittest.TestCase): plan = self._make_plan() self.assertIsInstance(plan.canary, str) self.assertGreater(len(plan.canary), 0) + self.assertRegex(plan.canary_env, r"^[A-Z]+_[A-Z]+_SECRET$") def test_canary_is_unique_per_session(self): with tempfile.TemporaryDirectory() as td: @@ -487,12 +488,16 @@ class TestCanaryGeneration(unittest.TestCase): from bot_bottle.dlp_detectors import scan_known_secrets plan = self._make_plan() - env = {"EGRESS_TOKEN_CANARY": plan.canary} - result = scan_known_secrets(f"exfil={plan.canary}", env=env) + env = {plan.canary_env: plan.canary} + result = scan_known_secrets( + f"exfil={plan.canary}", + env=env, + sensitive_prefixes=(plan.canary_env,), + ) self.assertIsNotNone(result) assert result is not None self.assertEqual("block", result.severity) - self.assertIn("EGRESS_TOKEN_CANARY", result.reason) + self.assertIn(plan.canary_env, result.reason) def test_egress_plan_canary_field_default_empty(self): # Verify EgressPlan can be constructed with an empty canary (backward compat). @@ -504,6 +509,7 @@ class TestCanaryGeneration(unittest.TestCase): token_env_map={}, ) self.assertEqual("", plan.canary) + self.assertEqual("", plan.canary_env) if __name__ == "__main__": diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index 023f0ca..460d018 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -1338,20 +1338,27 @@ class TestScanOutboundEnhanced(unittest.TestCase): result = scan_outbound(self._ROUTE, f"auth={secret}", env) self.assertIsNotNone(result) - def test_canary_detected_via_egress_token_canary(self): - # The canary (injected as EGRESS_TOKEN_CANARY) is caught by known_secrets. + def test_canary_detected_via_random_secret_env_name(self): + # The fake secret uses a randomized env name that the sidecar marks + # as sensitive through BOT_BOTTLE_SENSITIVE_PREFIXES. canary = "canaryvalue12345abcdef" - env = {"EGRESS_TOKEN_CANARY": canary} + env = { + "CANON_ALPHA_SECRET": canary, + "BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET", + } result = scan_outbound(self._ROUTE, f"data={canary}", env) self.assertIsNotNone(result) assert result is not None self.assertEqual("block", result.severity) - self.assertIn("EGRESS_TOKEN_CANARY", result.reason) + self.assertIn("CANON_ALPHA_SECRET", result.reason) def test_fragmented_canary_blocked(self): # Canary with separators injected is still caught. canary = "supersecretcanary99" - env = {"EGRESS_TOKEN_CANARY": canary} + env = { + "CANON_ALPHA_SECRET": canary, + "BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET", + } fragmented = "-".join(canary) result = scan_outbound(self._ROUTE, f"x={fragmented}", env) self.assertIsNotNone(result) diff --git a/tests/unit/test_macos_container_launch.py b/tests/unit/test_macos_container_launch.py index 4f66005..3e1038e 100644 --- a/tests/unit/test_macos_container_launch.py +++ b/tests/unit/test_macos_container_launch.py @@ -30,6 +30,7 @@ def _plan( supervise: bool = False, agent_git_gate_url: str = "", agent_supervise_url: str = "", + canary: bool = False, ) -> MacosContainerBottlePlan: routes_path = stage_dir / "routes.yaml" routes_path.write_text("routes: []\n", encoding="utf-8") @@ -42,7 +43,8 @@ def _plan( routes_path=routes_path, routes=("route",), token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"}, - canary="", + canary="fake-canary-value" if canary else "", + canary_env="CANON_ALPHA_SECRET" if canary else "", ) if git: key_path = stage_dir / "origin-key" @@ -139,6 +141,26 @@ class TestMacosContainerLaunchArgv(unittest.TestCase): argv, ) + def test_sidecar_argv_registers_canary_env_as_sensitive(self): + plan = _plan(stage_dir=self.stage_dir, canary=True) + argv = launch._sidecar_run_argv( + plan, + "bot-bottle-sidecars-dev-abc", + "bot-bottle-net-dev-abc", + "bot-bottle-egress-dev-abc", + ) + self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv) + self.assertIn("BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", argv) + + def test_agent_argv_receives_canary_env(self): + plan = _plan(stage_dir=self.stage_dir, canary=True) + argv = launch._agent_run_argv( + plan, + "bot-bottle-net-dev-abc", + "192.0.2.10", + ) + self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv) + def test_agent_env_points_proxy_at_sidecar_ip(self): plan = _plan( stage_dir=self.stage_dir, -- 2.52.0 From 14ae89580ad291933c7bacdb076b73b133706d8c Mon Sep 17 00:00:00 2001 From: codex Date: Thu, 25 Jun 2026 03:31:51 +0000 Subject: [PATCH 08/10] fix(egress): wire canary env for smolmachines --- bot_bottle/backend/smolmachines/launch.py | 5 ++++ tests/unit/test_smolmachines_provision.py | 32 ++++++++++++++++++++--- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/bot_bottle/backend/smolmachines/launch.py b/bot_bottle/backend/smolmachines/launch.py index e45fbbb..d14a6c2 100644 --- a/bot_bottle/backend/smolmachines/launch.py +++ b/bot_bottle/backend/smolmachines/launch.py @@ -228,6 +228,8 @@ def _discover_urls( guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}" if agent_supervise_url: guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url + if plan.egress_plan.canary and plan.egress_plan.canary_env: + guest_env[plan.egress_plan.canary_env] = plan.egress_plan.canary return dataclasses.replace( plan, @@ -321,6 +323,9 @@ def _bundle_launch_spec( # the operator's shell), never landing on argv. for token_env in sorted(ep.token_env_map.keys()): env.append(token_env) + if ep.canary and ep.canary_env: + env.append(f"{ep.canary_env}={ep.canary}") + env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={ep.canary_env}") # --- git-gate --------------------------------------------- gp = plan.git_gate_plan diff --git a/tests/unit/test_smolmachines_provision.py b/tests/unit/test_smolmachines_provision.py index e3fbbfc..b5744c3 100644 --- a/tests/unit/test_smolmachines_provision.py +++ b/tests/unit/test_smolmachines_provision.py @@ -26,9 +26,7 @@ from bot_bottle.backend.smolmachines.bottle import SmolmachinesBottle from bot_bottle.backend.smolmachines.bottle_plan import ( SmolmachinesBottlePlan, ) -# from bot_bottle.backend.smolmachines.provision import ( -# workspace as _workspace, -# ) +from bot_bottle.backend.smolmachines import launch as _launch from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec from bot_bottle.backend.util import AGENT_CA_PATH from bot_bottle.egress import EgressPlan, EgressRoute @@ -86,6 +84,7 @@ def _plan( stage_dir: Path | None = None, egress_routes: tuple[EgressRoute, ...] = (), egress_ca_path: Path = Path(), + canary: bool = False, supervise: bool = False, bundle_ip: str = "192.168.50.2", agent_git_gate_host: str = "127.0.0.1:55555", @@ -156,6 +155,8 @@ def _plan( routes=egress_routes, token_env_map={}, mitmproxy_ca_cert_only_host_path=egress_ca_path, + canary="fake-canary-value" if canary else "", + canary_env="CANON_ALPHA_SECRET" if canary else "", ), supervise_plan=supervise_plan, agent_git_gate_host=agent_git_gate_host, @@ -411,6 +412,31 @@ class TestBundleLaunchSpec(unittest.TestCase): self.assertIn(9420, spec.ports_to_publish) self.assertNotIn(9418, spec.ports_to_publish) + def test_canary_env_registered_as_sensitive_in_bundle(self): + plan = _plan(canary=True) + + spec = _bundle_launch_spec(plan, "net", "127.0.0.16") + + self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", spec.environment) + self.assertIn( + "BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", + spec.environment, + ) + + def test_canary_env_visible_to_smolvm_guest(self): + plan = _plan(canary=True) + with patch.object( + _launch._bundle, + "bundle_host_port", + return_value="65000", + ): + stamped = _launch._discover_urls(plan, "127.0.0.16") + + self.assertEqual( + "fake-canary-value", + stamped.guest_env["CANON_ALPHA_SECRET"], + ) + class TestProvisionGitUser(unittest.TestCase): """`provision_git` runs `git config --global` inside the -- 2.52.0 From 5204b987773542f5be9a51583bf01ffeef057209 Mon Sep 17 00:00:00 2001 From: codex Date: Thu, 25 Jun 2026 03:35:24 +0000 Subject: [PATCH 09/10] refactor(egress): centralize launch env entries --- bot_bottle/backend/docker/compose.py | 15 ++---- bot_bottle/backend/macos_container/launch.py | 17 +++---- bot_bottle/backend/smolmachines/launch.py | 16 +++--- bot_bottle/egress.py | 20 ++++++++ tests/unit/test_egress.py | 51 ++++++++++++++++++++ 5 files changed, 89 insertions(+), 30 deletions(-) diff --git a/bot_bottle/backend/docker/compose.py b/bot_bottle/backend/docker/compose.py index 2806509..1bbe708 100644 --- a/bot_bottle/backend/docker/compose.py +++ b/bot_bottle/backend/docker/compose.py @@ -28,6 +28,8 @@ from typing import Any from ...egress import ( EGRESS_HOSTNAME, EGRESS_ROUTES_IN_CONTAINER, + egress_agent_env_entries, + egress_sidecar_env_entries, ) from ...git_gate import GIT_GATE_HOSTNAME from ...log import die, warn @@ -135,13 +137,7 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]: volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER)) if ep.routes: volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent))) - for token_env in sorted(ep.token_env_map.keys()): - env.append(token_env) - if ep.canary and ep.canary_env: - # Inject the fake secret as a literal NAME=VALUE (not a bare name) and - # tell the sidecar to scan that exact env var as sensitive. - env.append(f"{ep.canary_env}={ep.canary}") - env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={ep.canary_env}") + env.extend(egress_sidecar_env_entries(ep)) # --- git-gate ----------------------------------------------------- gp = plan.git_gate_plan @@ -225,10 +221,7 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]: # never lands on argv or in the compose file. for name in sorted(plan.forwarded_env.keys()): env.append(name) - # Canary token: visible to the agent as a fake secret so that any - # outbound appearance of this value is a zero-FP exfil signal. - if plan.egress_plan.canary and plan.egress_plan.canary_env: - env.append(f"{plan.egress_plan.canary_env}={plan.egress_plan.canary}") + env.extend(egress_agent_env_entries(plan.egress_plan)) service: dict[str, Any] = { "image": plan.image, diff --git a/bot_bottle/backend/macos_container/launch.py b/bot_bottle/backend/macos_container/launch.py index 26c8d17..d4e75fb 100644 --- a/bot_bottle/backend/macos_container/launch.py +++ b/bot_bottle/backend/macos_container/launch.py @@ -22,7 +22,12 @@ from ...bottle_state import ( git_gate_state_dir, read_committed_image, ) -from ...egress import EGRESS_ROUTES_IN_CONTAINER, egress_resolve_token_values +from ...egress import ( + EGRESS_ROUTES_IN_CONTAINER, + egress_agent_env_entries, + egress_resolve_token_values, + egress_sidecar_env_entries, +) from ...git_gate import revoke_git_gate_provisioned_keys from ...log import die, info, warn from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT @@ -350,12 +355,7 @@ def _sidecar_daemons(plan: MacosContainerBottlePlan) -> tuple[str, ...]: def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]: - env: list[str] = [] - if plan.egress_plan.routes: - env.extend(sorted(plan.egress_plan.token_env_map.keys())) - if plan.egress_plan.canary and plan.egress_plan.canary_env: - env.append(f"{plan.egress_plan.canary_env}={plan.egress_plan.canary}") - env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.egress_plan.canary_env}") + env: list[str] = list(egress_sidecar_env_entries(plan.egress_plan)) if plan.git_gate_plan.upstreams: env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}") if plan.supervise_plan is not None: @@ -423,8 +423,7 @@ def _agent_env_entries( env.append(f"{name}={value}") for name in sorted(plan.forwarded_env.keys()): env.append(name) - if plan.egress_plan.canary and plan.egress_plan.canary_env: - env.append(f"{plan.egress_plan.canary_env}={plan.egress_plan.canary}") + env.extend(egress_agent_env_entries(plan.egress_plan)) return tuple(env) diff --git a/bot_bottle/backend/smolmachines/launch.py b/bot_bottle/backend/smolmachines/launch.py index d14a6c2..8b67e25 100644 --- a/bot_bottle/backend/smolmachines/launch.py +++ b/bot_bottle/backend/smolmachines/launch.py @@ -23,7 +23,9 @@ from typing import Callable, Generator from ...egress import ( EGRESS_ROUTES_IN_CONTAINER, + egress_agent_env_entries, egress_resolve_token_values, + egress_sidecar_env_entries, ) from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT from ...util import expand_tilde @@ -228,8 +230,9 @@ def _discover_urls( guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}" if agent_supervise_url: guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url - if plan.egress_plan.canary and plan.egress_plan.canary_env: - guest_env[plan.egress_plan.canary_env] = plan.egress_plan.canary + for entry in egress_agent_env_entries(plan.egress_plan): + name, value = entry.split("=", 1) + guest_env[name] = value return dataclasses.replace( plan, @@ -318,14 +321,7 @@ def _bundle_launch_spec( volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True)) if ep.routes: volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True)) - # Bare-name entries for upstream-token slots. Their values - # come from the docker-run subprocess env (inherited from - # the operator's shell), never landing on argv. - for token_env in sorted(ep.token_env_map.keys()): - env.append(token_env) - if ep.canary and ep.canary_env: - env.append(f"{ep.canary_env}={ep.canary}") - env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={ep.canary_env}") + env.extend(egress_sidecar_env_entries(ep)) # --- git-gate --------------------------------------------- gp = plan.git_gate_plan diff --git a/bot_bottle/egress.py b/bot_bottle/egress.py index cd78790..13718d6 100644 --- a/bot_bottle/egress.py +++ b/bot_bottle/egress.py @@ -62,6 +62,24 @@ def _random_canary_env() -> str: return f"{first}_{second}_SECRET" +def egress_sidecar_env_entries(plan: "EgressPlan") -> tuple[str, ...]: + """Return sidecar env entries needed by egress across all backends.""" + env: list[str] = [] + if plan.routes: + env.extend(sorted(plan.token_env_map.keys())) + if plan.canary and plan.canary_env: + env.append(f"{plan.canary_env}={plan.canary}") + env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.canary_env}") + return tuple(env) + + +def egress_agent_env_entries(plan: "EgressPlan") -> tuple[str, ...]: + """Return agent-visible egress env entries shared by all backends.""" + if plan.canary and plan.canary_env: + return (f"{plan.canary_env}={plan.canary}",) + return () + + @dataclass(frozen=True) class EgressRoute(Route): """Host-side extension of the addon's `Route`. @@ -379,5 +397,7 @@ __all__ = [ "egress_render_routes", "egress_resolve_token_values", "egress_routes_for_bottle", + "egress_agent_env_entries", + "egress_sidecar_env_entries", "egress_token_env_map", ] diff --git a/tests/unit/test_egress.py b/tests/unit/test_egress.py index ffcfe38..c2b6899 100644 --- a/tests/unit/test_egress.py +++ b/tests/unit/test_egress.py @@ -10,10 +10,12 @@ from bot_bottle.egress import ( Egress, EgressPlan, EgressRoute, + egress_agent_env_entries, egress_manifest_routes, egress_render_routes, egress_resolve_token_values, egress_routes_for_bottle, + egress_sidecar_env_entries, egress_token_env_map, ) from bot_bottle.log import Die @@ -512,5 +514,54 @@ class TestCanaryGeneration(unittest.TestCase): self.assertEqual("", plan.canary_env) +class TestEgressEnvEntries(unittest.TestCase): + def test_sidecar_entries_include_route_tokens_and_canary_scan_prefix(self): + plan = EgressPlan( + slug="s", + routes_path=Path("/tmp/r.yaml"), + routes=(EgressRoute(host="api.example"),), + token_env_map={"EGRESS_TOKEN_1": "T1", "EGRESS_TOKEN_0": "T0"}, + canary="fake-canary-value", + canary_env="CANON_ALPHA_SECRET", + ) + + self.assertEqual( + ( + "EGRESS_TOKEN_0", + "EGRESS_TOKEN_1", + "CANON_ALPHA_SECRET=fake-canary-value", + "BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", + ), + egress_sidecar_env_entries(plan), + ) + + def test_agent_entries_include_only_canary_bait(self): + plan = EgressPlan( + slug="s", + routes_path=Path("/tmp/r.yaml"), + routes=(), + token_env_map={}, + canary="fake-canary-value", + canary_env="CANON_ALPHA_SECRET", + ) + + self.assertEqual( + ("CANON_ALPHA_SECRET=fake-canary-value",), + egress_agent_env_entries(plan), + ) + + def test_canary_entries_omitted_when_name_missing(self): + plan = EgressPlan( + slug="s", + routes_path=Path("/tmp/r.yaml"), + routes=(), + token_env_map={}, + canary="fake-canary-value", + ) + + self.assertEqual((), egress_sidecar_env_entries(plan)) + self.assertEqual((), egress_agent_env_entries(plan)) + + if __name__ == "__main__": unittest.main() -- 2.52.0 From d9a9eef27608d5523572c706e7eb863ae0ab7bfe Mon Sep 17 00:00:00 2001 From: codex Date: Thu, 25 Jun 2026 03:57:41 +0000 Subject: [PATCH 10/10] docs: remove prd-new code citations --- bot_bottle/dlp_detectors.py | 8 ++++---- tests/unit/test_dlp_detectors.py | 2 +- tests/unit/test_egress.py | 4 ++-- tests/unit/test_egress_addon_core.py | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/bot_bottle/dlp_detectors.py b/bot_bottle/dlp_detectors.py index d4a4cb1..ac0e29e 100644 --- a/bot_bottle/dlp_detectors.py +++ b/bot_bottle/dlp_detectors.py @@ -1,4 +1,4 @@ -"""DLP detectors for the egress proxy (PRD 0053, prd-new). +"""DLP detectors for the egress proxy (PRD 0053). Pure Python, no mitmproxy dependency. Each detector is a module-level function returning `ScanResult | None`. @@ -123,7 +123,7 @@ def redact_tokens( # --------------------------------------------------------------------------- -# Known secrets detector (Phase 1b, prd-new) +# Known secrets detector # --------------------------------------------------------------------------- def _encoded_variants(secret: str) -> list[str]: @@ -165,7 +165,7 @@ def _encoded_variants(secret: str) -> list[str]: # --------------------------------------------------------------------------- -# Fragmentation-resistant helpers (prd-new) +# Fragmentation-resistant helpers # --------------------------------------------------------------------------- # Minimum length of alnum projection for projection-based checks to run. @@ -283,7 +283,7 @@ def scan_known_secrets( # --------------------------------------------------------------------------- -# Entropy detector (warn-only, prd-new) +# Entropy detector (warn-only) # --------------------------------------------------------------------------- # Sliding window size and step for the entropy scan. diff --git a/tests/unit/test_dlp_detectors.py b/tests/unit/test_dlp_detectors.py index bf6ebd6..dea4afc 100644 --- a/tests/unit/test_dlp_detectors.py +++ b/tests/unit/test_dlp_detectors.py @@ -1,4 +1,4 @@ -"""Unit: DLP detectors (PRD 0053, prd-new). +"""Unit: DLP detectors (PRD 0053). Tests for token pattern scanning, known secret detection, fragmentation- resistant matching, entropy scoring, and naive prompt injection detection.""" diff --git a/tests/unit/test_egress.py b/tests/unit/test_egress.py index c2b6899..a25a2a0 100644 --- a/tests/unit/test_egress.py +++ b/tests/unit/test_egress.py @@ -1,5 +1,5 @@ """Unit: Egress route lift + routes.yaml render + token -resolution (PRD 0017, PRD 0053, prd-new).""" +resolution (PRD 0017, PRD 0053).""" import tempfile import unittest @@ -450,7 +450,7 @@ class TestResolveTokenValues(unittest.TestCase): class TestCanaryGeneration(unittest.TestCase): - """Egress.prepare() generates a unique canary token per session (prd-new).""" + """Egress.prepare() generates a unique canary token per session.""" def _bottle_obj(self): return ManifestIndex.from_json_obj({ diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index 460d018..758b85a 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -1274,7 +1274,7 @@ class TestBuildTokenAllowPayload(unittest.TestCase): payload = build_token_allow_payload("h", "GET", "/", result) self.assertNotIn("context:", payload) class TestScanOutboundEnhanced(unittest.TestCase): - """scan_outbound changes from prd-new: binary decode, entropy detector, + """scan_outbound changes: binary decode, entropy detector, broadened known-value prefixes, fragmentation resistance.""" _ROUTE = Route(host="api.example.com") -- 2.52.0