diff --git a/bot_bottle/backend/docker/compose.py b/bot_bottle/backend/docker/compose.py index 9ad0011..1bbe708 100644 --- a/bot_bottle/backend/docker/compose.py +++ b/bot_bottle/backend/docker/compose.py @@ -28,6 +28,8 @@ from typing import Any from ...egress import ( EGRESS_HOSTNAME, EGRESS_ROUTES_IN_CONTAINER, + egress_agent_env_entries, + egress_sidecar_env_entries, ) from ...git_gate import GIT_GATE_HOSTNAME from ...log import die, warn @@ -135,8 +137,7 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]: volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER)) if ep.routes: volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent))) - for token_env in sorted(ep.token_env_map.keys()): - env.append(token_env) + env.extend(egress_sidecar_env_entries(ep)) # --- git-gate ----------------------------------------------------- gp = plan.git_gate_plan @@ -220,6 +221,7 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]: # never lands on argv or in the compose file. 
for name in sorted(plan.forwarded_env.keys()): env.append(name) + env.extend(egress_agent_env_entries(plan.egress_plan)) service: dict[str, Any] = { "image": plan.image, diff --git a/bot_bottle/backend/macos_container/launch.py b/bot_bottle/backend/macos_container/launch.py index 8de4385..d4e75fb 100644 --- a/bot_bottle/backend/macos_container/launch.py +++ b/bot_bottle/backend/macos_container/launch.py @@ -22,7 +22,12 @@ from ...bottle_state import ( git_gate_state_dir, read_committed_image, ) -from ...egress import EGRESS_ROUTES_IN_CONTAINER, egress_resolve_token_values +from ...egress import ( + EGRESS_ROUTES_IN_CONTAINER, + egress_agent_env_entries, + egress_resolve_token_values, + egress_sidecar_env_entries, +) from ...git_gate import revoke_git_gate_provisioned_keys from ...log import die, info, warn from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT @@ -350,9 +355,7 @@ def _sidecar_daemons(plan: MacosContainerBottlePlan) -> tuple[str, ...]: def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]: - env: list[str] = [] - if plan.egress_plan.routes: - env.extend(sorted(plan.egress_plan.token_env_map.keys())) + env: list[str] = list(egress_sidecar_env_entries(plan.egress_plan)) if plan.git_gate_plan.upstreams: env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}") if plan.supervise_plan is not None: @@ -420,6 +423,7 @@ def _agent_env_entries( env.append(f"{name}={value}") for name in sorted(plan.forwarded_env.keys()): env.append(name) + env.extend(egress_agent_env_entries(plan.egress_plan)) return tuple(env) diff --git a/bot_bottle/backend/smolmachines/launch.py b/bot_bottle/backend/smolmachines/launch.py index e45fbbb..8b67e25 100644 --- a/bot_bottle/backend/smolmachines/launch.py +++ b/bot_bottle/backend/smolmachines/launch.py @@ -23,7 +23,9 @@ from typing import Callable, Generator from ...egress import ( EGRESS_ROUTES_IN_CONTAINER, + egress_agent_env_entries, egress_resolve_token_values, + 
egress_sidecar_env_entries, ) from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT from ...util import expand_tilde @@ -228,6 +230,9 @@ def _discover_urls( guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}" if agent_supervise_url: guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url + for entry in egress_agent_env_entries(plan.egress_plan): + name, value = entry.split("=", 1) + guest_env[name] = value return dataclasses.replace( plan, @@ -316,11 +321,7 @@ def _bundle_launch_spec( volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True)) if ep.routes: volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True)) - # Bare-name entries for upstream-token slots. Their values - # come from the docker-run subprocess env (inherited from - # the operator's shell), never landing on argv. - for token_env in sorted(ep.token_env_map.keys()): - env.append(token_env) + env.extend(egress_sidecar_env_entries(ep)) # --- git-gate --------------------------------------------- gp = plan.git_gate_plan diff --git a/bot_bottle/dlp_detectors.py b/bot_bottle/dlp_detectors.py index ef36e7f..ac0e29e 100644 --- a/bot_bottle/dlp_detectors.py +++ b/bot_bottle/dlp_detectors.py @@ -15,6 +15,8 @@ import gzip import re import typing import unicodedata +from math import log2 +from collections import Counter from urllib.parse import quote as url_quote try: @@ -107,20 +109,21 @@ def redact_tokens( text: str, *, env: typing.Mapping[str, str] | None = None, + sensitive_prefixes: tuple[str, ...] 
= ("EGRESS_TOKEN_",), ) -> str: """Replace token pattern matches and (if env given) provisioned secrets with REDACT.""" for _, pattern in TOKEN_PATTERNS: text = pattern.sub(REDACT, text) if env is not None: for key, value in env.items(): - if key.startswith("EGRESS_TOKEN_") and value: + if any(key.startswith(p) for p in sensitive_prefixes) and value: for variant in _encoded_variants(value): text = text.replace(variant, REDACT) return text # --------------------------------------------------------------------------- -# Known secrets detector (Phase 1b) +# Known secrets detector # --------------------------------------------------------------------------- def _encoded_variants(secret: str) -> list[str]: @@ -161,18 +164,65 @@ def _encoded_variants(secret: str) -> list[str]: return variants +# --------------------------------------------------------------------------- +# Fragmentation-resistant helpers +# --------------------------------------------------------------------------- + +# Minimum length of alnum projection for projection-based checks to run. +# Short secrets produce too many false positives in projection space. +_ALNUM_MIN_LEN = 8 + +# Minimum window length for the partial-substring sliding scan. +PARTIAL_MATCH_MIN_LEN = 12 + + +def _alnum_projection(text: str) -> str: + """Return text with every non-alphanumeric character stripped. + + Used for fragmentation-resistant matching: separator-injected secrets + (spaces, hyphens, dots inserted between characters) are identical to + their originals in alnum projection space. + """ + return "".join(c for c in text if c.isalnum()) + + +def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None: + """Return the position in text_alnum where any min_len-char window of + secret_alnum first appears, or None. + + Slides a window of width min_len across secret_alnum and searches for + each window in text_alnum. The first hit position is returned. 
+ """ + if len(secret_alnum) < min_len or len(text_alnum) < min_len: + return None + for i in range(len(secret_alnum) - min_len + 1): + window = secret_alnum[i:i + min_len] + pos = text_alnum.find(window) + if pos >= 0: + return pos + return None + + def scan_known_secrets( text: str, *, location: str = "body", env: typing.Mapping[str, str] | None = None, + sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",), safe_tokens: typing.AbstractSet[str] | None = None, ) -> ScanResult | None: if env is None: return None + + # Pre-compute alnum projection of the scan text once; reused per secret. + text_alnum: str | None = None + for key, value in env.items(): - if not key.startswith("EGRESS_TOKEN_") or not value: + if not any(key.startswith(p) for p in sensitive_prefixes) or not value: continue + + # Pass 1: exact match across encoded variants (original behaviour). + approved_exact = False for variant in _encoded_variants(value): pos = text.find(variant) if pos >= 0: @@ -180,6 +230,7 @@ def scan_known_secrets( # (PRD 0062); a different encoding of the same secret is a # fresh block. if safe_tokens is not None and variant in safe_tokens: + approved_exact = True continue return ScanResult( severity="block", @@ -188,6 +239,104 @@ def scan_known_secrets( context=_snippet(text, pos, pos + len(variant)), matched=variant, ) + if approved_exact: + # Exact match was found and approved; projection passes would + # fire on the same value, so skip them for this secret. + continue + + # Pass 2 & 3: fragmentation-resistant projection checks. + secret_alnum = _alnum_projection(value) + if len(secret_alnum) < _ALNUM_MIN_LEN: + continue + + if text_alnum is None: + text_alnum = _alnum_projection(text) + + # Pass 2: full alnum-projection exact match (catches separator injection). 
+ pos2 = text_alnum.find(secret_alnum) + if pos2 >= 0: + return ScanResult( + severity="block", + reason=( + f"provisioned secret from {key} found in {location} " + f"(fragmented match — separator injection)" + ), + location=location, + context=_snippet(text_alnum, pos2, pos2 + len(secret_alnum)), + ) + + # Pass 3: sliding-window partial match (catches chunked-substring leaks). + pos3 = _find_partial_window(secret_alnum, text_alnum, PARTIAL_MATCH_MIN_LEN) + if pos3 is not None: + return ScanResult( + severity="block", + reason=( + f"provisioned secret from {key} found in {location} " + f"(partial match — at least {PARTIAL_MATCH_MIN_LEN} consecutive " + f"alphanumeric chars)" + ), + location=location, + context=_snippet(text_alnum, pos3, pos3 + PARTIAL_MATCH_MIN_LEN), + ) + + return None + + +# --------------------------------------------------------------------------- +# Entropy detector (warn-only) +# --------------------------------------------------------------------------- + +# Sliding window size and step for the entropy scan. +ENTROPY_WINDOW = 64 +ENTROPY_STEP = 32 + +# Bits-per-character threshold. Random ASCII printable ≈ 6.6 bits; random +# lowercase hex ≈ 4 bits; random base64url ≈ 6 bits. 5.5 sits above +# typical structured data (JSON, URLs) while staying below truly random +# content. +ENTROPY_BLOCK_THRESHOLD = 5.5 + + +def _shannon_entropy(text: str) -> float: + if not text: + return 0.0 + counts = Counter(text) + n = len(text) + return -sum((c / n) * log2(c / n) for c in counts.values()) + + +def scan_entropy( + text: str, + *, + location: str = "body", + window: int = ENTROPY_WINDOW, + threshold: float = ENTROPY_BLOCK_THRESHOLD, +) -> ScanResult | None: + """Warn-only detector: flag windows of `window` chars with Shannon entropy + above `threshold` bits per character. + + Never blocks; always returns severity='warn'. Disabled by default — + routes must opt in via dlp.outbound_detectors=['entropy']. 
+ """ + if not text: + return None + step = max(1, window // 2) + end = len(text) + # Scan overlapping windows; also check the final tail if shorter than window. + positions = list(range(0, end - window + 1, step)) + if end < window: + positions = [0] + elif (end - window) % step != 0: + positions.append(end - window) + for i in positions: + chunk = text[i:i + window] + if _shannon_entropy(chunk) >= threshold: + return ScanResult( + severity="warn", + reason=f"high-entropy content in {location} (possible encrypted exfil)", + location=location, + context=_snippet(text, i, i + len(chunk)), + ) return None @@ -306,11 +455,18 @@ def scan_crlf_injection(text: str) -> ScanResult | None: __all__ = [ + "ENTROPY_BLOCK_THRESHOLD", + "ENTROPY_WINDOW", + "ENTROPY_STEP", + "PARTIAL_MATCH_MIN_LEN", "REDACT", "SNIPPET_CONTEXT", "TOKEN_PATTERNS", + "_alnum_projection", + "_shannon_entropy", "redact_tokens", "scan_crlf_injection", + "scan_entropy", "scan_known_secrets", "scan_naive_injection", "scan_token_patterns", diff --git a/bot_bottle/egress.py b/bot_bottle/egress.py index 8943049..13718d6 100644 --- a/bot_bottle/egress.py +++ b/bot_bottle/egress.py @@ -10,6 +10,7 @@ specific and lives on concrete subclasses (see from __future__ import annotations import dataclasses +import secrets from abc import ABC from dataclasses import dataclass from pathlib import Path @@ -34,6 +35,50 @@ EGRESS_HOSTNAME = "egress" EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml" EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name +_CANARY_ENV_WORDS = ( + "ACCORD", + "ANCHOR", + "ATLAS", + "CANON", + "CIPHER", + "EMBER", + "FALCON", + "HARBOR", + "LANTERN", + "MARBLE", + "NOVA", + "ORBIT", + "PIVOT", + "RADIUS", + "SUMMIT", + "VECTOR", +) + + +def _random_canary_env() -> str: + first = secrets.choice(_CANARY_ENV_WORDS) + remaining = tuple(word for word in _CANARY_ENV_WORDS if word != first) + second = secrets.choice(remaining) + return f"{first}_{second}_SECRET" + + +def 
egress_sidecar_env_entries(plan: "EgressPlan") -> tuple[str, ...]: + """Return sidecar env entries needed by egress across all backends.""" + env: list[str] = [] + if plan.routes: + env.extend(sorted(plan.token_env_map.keys())) + if plan.canary and plan.canary_env: + env.append(f"{plan.canary_env}={plan.canary}") + env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.canary_env}") + return tuple(env) + + +def egress_agent_env_entries(plan: "EgressPlan") -> tuple[str, ...]: + """Return agent-visible egress env entries shared by all backends.""" + if plan.canary and plan.canary_env: + return (f"{plan.canary_env}={plan.canary}",) + return () + @dataclass(frozen=True) class EgressRoute(Route): @@ -65,6 +110,8 @@ class EgressPlan: mitmproxy_ca_host_path: Path = Path() mitmproxy_ca_cert_only_host_path: Path = Path() log: int = 0 + canary: str = "" + canary_env: str = "" def egress_manifest_routes( @@ -324,12 +371,18 @@ class Egress(ABC): routes_path = stage_dir / EGRESS_ROUTES_FILENAME routes_path.write_text(egress_render_routes(routes, log=log)) routes_path.chmod(0o600) + # Generate a per-session fake secret under a plausible random env name. + # The sidecar marks that exact env name as sensitive for known-secret + # scanning; the agent receives the same name/value as exfil bait. 
+ canary = secrets.token_urlsafe(32) return EgressPlan( slug=slug, routes_path=routes_path, routes=routes, token_env_map=egress_token_env_map(routes), log=log, + canary=canary, + canary_env=_random_canary_env(), ) __all__ = [ @@ -344,5 +397,7 @@ __all__ = [ "egress_render_routes", "egress_resolve_token_values", "egress_routes_for_bottle", + "egress_agent_env_entries", + "egress_sidecar_env_entries", "egress_token_env_map", ] diff --git a/bot_bottle/egress_addon_core.py b/bot_bottle/egress_addon_core.py index 324f7c7..16556a5 100644 --- a/bot_bottle/egress_addon_core.py +++ b/bot_bottle/egress_addon_core.py @@ -34,7 +34,7 @@ VALID_METHODS = frozenset({ "CONNECT", }) -OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"}) +OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"}) INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"}) # Per-route policy for what the proxy does when an outbound DLP detector @@ -729,17 +729,28 @@ def scan_outbound( try: from dlp_detectors import ( # type: ignore[import-not-found] scan_crlf_injection, + scan_entropy, scan_known_secrets, scan_token_patterns, ) except ImportError: # pragma: no cover - host-side path from .dlp_detectors import ( # type: ignore[import-not-found] scan_crlf_injection, + scan_entropy, scan_known_secrets, scan_token_patterns, ) - text = body if isinstance(body, str) else body.decode("utf-8", errors="replace") + # Binary bodies: latin-1 is a bijective byte↔codepoint mapping that + # preserves every byte value, so ASCII-range secret strings remain + # findable by str.find / regex. Prefer strict UTF-8 for valid text bodies. 
+ if isinstance(body, bytes): + try: + text = body.decode("utf-8") + except UnicodeDecodeError: + text = body.decode("latin-1") + else: + text = body # CRLF injection is only an attack in the request line + headers, never the # body: an HTTP body is delimited by Content-Length, so CRLF bytes there @@ -758,12 +769,30 @@ def scan_outbound( return result if _detector_enabled(route.outbound_detectors, "known_secrets"): + # BOT_BOTTLE_SENSITIVE_PREFIXES lets operators add extra env prefixes + # beyond EGRESS_TOKEN_* without changing the manifest schema. + extra_raw = environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "") + extra = tuple(p for p in extra_raw.split(",") if p) + sensitive_prefixes = ("EGRESS_TOKEN_",) + extra result = scan_known_secrets( - text, location="body", env=environ, safe_tokens=safe_tokens, + text, location="body", env=environ, + sensitive_prefixes=sensitive_prefixes, safe_tokens=safe_tokens, ) if result is not None: return result + # Entropy scanning requires explicit opt-in: it is NOT part of the + # default "all detectors" set because it produces false positives on + # legitimate base64 / binary payloads. Routes must list "entropy" in + # dlp.outbound_detectors to enable it. 
+ if ( + route.outbound_detectors is not None + and "entropy" in route.outbound_detectors + ): + result = scan_entropy(text, location="body") + if result is not None: + return result + return None diff --git a/docs/prds/prd-new-strengthen-outbound-exfil-detection.md b/docs/prds/prd-new-strengthen-outbound-exfil-detection.md new file mode 100644 index 0000000..490e519 --- /dev/null +++ b/docs/prds/prd-new-strengthen-outbound-exfil-detection.md @@ -0,0 +1,189 @@ +# PRD prd-new: Strengthen outbound exfiltration detection + +- **Status:** Active +- **Author:** claude +- **Created:** 2026-06-25 +- **Issue:** #259 + +## Summary + +The current DLP system has two detection tiers: weak (vendor token regexes) +and strong (exact matching of provisioned `EGRESS_TOKEN_*` values in multiple +encodings). This PRD adds four complementary hardening layers in ranked order +of impact: + +1. **Canary tokens** — inject a per-session fake secret into the agent's + environment; any outbound appearance is a zero-false-positive exfil signal. +2. **Broadened known-value scanning** — extend the strong tier beyond + `EGRESS_TOKEN_*` via a `BOT_BOTTLE_SENSITIVE_PREFIXES` sidecar env var. +3. **Fragmentation-resistant matching** — catch separator injection and + long-substring leaks via alnum projection and sliding-window substring + search. +4. **Entropy scoring** — warn-only flag on high-entropy blobs as a + complementary signal for encrypted exfil. + +## Problem + +After PRD 0056 the outbound scan covers all HTTP request surfaces (host, path, +query, headers, body). But the content-based strong tier only matches +`EGRESS_TOKEN_*` values and their encodings. 
Three gaps remain: + +| Gap | How an agent exploits it | +|-----|--------------------------| +| No canary | There is no "can't be false-positive" signal | +| Narrow prefix filter | Sensitive values provisioned under other env prefixes (MCP keys, API keys injected via the agent provider) are invisible to `scan_known_secrets` | +| Fragmentation blindness | Inserting `-`, space, or `\n` between secret characters turns an exact-match into a miss | + +## Goals / Success Criteria + +1. Each launched bottle has a unique canary token in the agent's environment + under a randomized `WORD_WORD_SECRET` env var name. The egress sidecar gets + the same env var and registers that exact name through + `BOT_BOTTLE_SENSITIVE_PREFIXES`. Any outbound appearance of the canary + blocks the request as a known-secret match. +2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default: + `("EGRESS_TOKEN_",)`). `scan_outbound` reads + `BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes + in, so operators can mark additional env vars as scanned values without + changing the manifest schema. +3. For every secret that the exact-match pass does not find, a secondary + alnum-projection pass strips all non-alphanumeric characters from both the + secret and the scanned text and checks whether the stripped secret appears + in the stripped text. This catches separator-injection evasion (secret + `MY-SECRET` → body contains `MY SECRET`). +4. A sliding-window partial-match pass checks for long-enough contiguous + substrings of the secret's alnum projection in the text's alnum projection. + Any match ≥ `PARTIAL_MATCH_MIN_LEN` (12 chars) blocks with reason + `"partial match"`. +5. A new `scan_entropy` detector flags outbound text windows with Shannon + entropy ≥ `ENTROPY_BLOCK_THRESHOLD` (5.5 bits/char) at **warn** severity + only. It is registered under the new detector name `"entropy"` in + `OUTBOUND_DETECTOR_NAMES` and disabled by default (routes must opt in). +6. 
+ Binary request bodies that are not valid UTF-8 are decoded via `latin-1` instead of + `utf-8 errors="replace"`, preserving every byte value and allowing + ASCII-range secrets to be found within binary payloads. +7. All new behaviour is unit-tested; existing tests pass unchanged. + +## Non-goals + +- Rolling per-host buffer for split-across-requests detection (state in the + stateless addon is complex; deferred). +- Additional vendor regexes. +- ML / embedding-based detection. +- Entropy-based hard blocks (warn only per the issue). + +## Design + +### Canary token flow + +``` +Egress.prepare() + canary = secrets.token_urlsafe(32) + canary_env = _random_canary_env() # randomized WORD_WORD_SECRET name + EgressPlan(canary=canary, canary_env=canary_env, ...) + +Docker compose render: + sidecar env: {canary_env}={canary} + sidecar env: BOT_BOTTLE_SENSITIVE_PREFIXES={canary_env} + agent env: {canary_env}={canary} ← visible to agent as a "secret" + +macos-container launch: same literals added to sidecar + agent env entries +``` + +The sidecar uses `BOT_BOTTLE_SENSITIVE_PREFIXES` to make the random canary env +name part of the existing `scan_known_secrets` detector without adding a +manifest schema field. + +### Broadened known-value scanning + +`scan_known_secrets` gains a `sensitive_prefixes` parameter: + +```python +def scan_known_secrets( + text: str, + *, + location: str = "body", + env: Mapping[str, str] | None = None, + sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",), +) -> ScanResult | None: +``` + +`scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` (comma-separated list +of additional prefixes) from `environ` and appends them: + +```python +extra = tuple( + p for p in environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "").split(",") if p +) +sensitive_prefixes = ("EGRESS_TOKEN_",) + extra +``` + +`redact_tokens` receives the same treatment for consistent redaction. + +### Fragmentation-resistant matching + +A new helper `_alnum_projection(text)` strips all non-alphanumeric characters. +`scan_known_secrets` runs three passes per secret: + +1. 
**Exact pass** — existing encoded-variant loop (unchanged). +2. **Alnum-projection pass** — if the secret's alnum projection has ≥ 8 chars, + check if it appears in the text's alnum projection. Match → block with + `"fragmented match (separator injection)"` reason. +3. **Partial-substring pass** — if the secret's alnum projection has ≥ + `PARTIAL_MATCH_MIN_LEN` chars (12), slide a window of that length across the + secret's projection and look for each window in the text's alnum projection. + First match → block with `"partial match"` reason. + +All three passes run only for the `"known_secrets"` detector; the token-pattern +and entropy detectors are unchanged. + +### Entropy scoring + +New public function: + +```python +def scan_entropy( + text: str, + *, + location: str = "body", + window: int = ENTROPY_WINDOW, # 64 + threshold: float = ENTROPY_BLOCK_THRESHOLD, # 5.5 +) -> ScanResult | None: +``` + +Slides a window of `window` characters across `text` in steps of `window // 2`. +If any window's Shannon entropy meets or exceeds `threshold`, returns a **warn**-severity +`ScanResult`. Never blocks. + +`OUTBOUND_DETECTOR_NAMES` gains `"entropy"`. Routes opt in via their `dlp` +block; entropy scanning is **off by default** to avoid false-positive noise on +legitimate binary payloads. + +### Binary body handling + +In `scan_outbound`, the bytes → str decoding changes from: + +```python +body.decode("utf-8", errors="replace") +``` + +to: + +```python +try: + text = body.decode("utf-8") +except UnicodeDecodeError: + text = body.decode("latin-1") +``` + +`latin-1` is a bijective byte↔codepoint mapping; every byte value is preserved +as its corresponding Latin-1 code point, so ASCII-range secret strings remain +intact and `str.find` / regex still locate them correctly. Strict UTF-8 is +tried first so valid UTF-8 bodies are decoded faithfully; `latin-1` is the +fallback for bodies that are not valid UTF-8. + +## Implementation + +Delivered in three commits on the same branch: + +1. 
**DLP detector changes** — `_alnum_projection`, fragmentation passes, + `scan_entropy`, broadened `scan_known_secrets`, updated `scan_outbound` and + `redact_tokens`; all accompanying unit tests. +2. **Canary injection** — `EgressPlan.canary`, `Egress.prepare()`, + Docker compose + macos-container backend injection. +3. **PRD flip** — `Status: Draft → Active`. diff --git a/tests/unit/test_compose.py b/tests/unit/test_compose.py index 8b10eec..0e5d743 100644 --- a/tests/unit/test_compose.py +++ b/tests/unit/test_compose.py @@ -80,7 +80,11 @@ def _git_gate_plan(upstreams: tuple[GitGateUpstream, ...] = ()) -> GitGatePlan: ) -def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan: +def _egress_plan( + routes: tuple[EgressRoute, ...] = (), + *, + canary: bool = False, +) -> EgressPlan: token_env_map = { r.token_env: r.token_ref for r in routes @@ -95,6 +99,8 @@ def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan: egress_network=f"bot-bottle-egress-{SLUG}", mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem", mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem", + canary="fake-canary-value" if canary else "", + canary_env="CANON_ALPHA_SECRET" if canary else "", ) @@ -112,6 +118,7 @@ def _plan( with_git: bool = False, with_egress: bool = False, supervise: bool = False, + canary: bool = False, ) -> DockerBottlePlan: """Build a fully-resolved DockerBottlePlan. 
Toggles cover the matrix the renderer's conditional-service logic branches on.""" @@ -150,7 +157,7 @@ def _plan( slug=SLUG, forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"}, git_gate_plan=_git_gate_plan(upstreams), - egress_plan=_egress_plan(routes), + egress_plan=_egress_plan(routes, canary=canary), supervise_plan=_supervise_plan() if supervise else None, use_runsc=False, agent_provision=AgentProvisionPlan( @@ -375,6 +382,20 @@ class TestSidecarBundleShape(unittest.TestCase): env_strings = sc["environment"] self.assertNotIn("EGRESS_TOKEN_0", env_strings) + def test_canary_env_registered_as_sensitive_in_sidecar(self): + sc = self._render(canary=True)["services"]["sidecars"] + env_strings = sc["environment"] + self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings) + self.assertIn( + "BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", + env_strings, + ) + + def test_canary_env_visible_to_agent(self): + agent = self._render(canary=True)["services"]["agent"] + env_strings = agent["environment"] + self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings) + def test_supervise_env_present_when_active(self): sc = self._render(supervise=True)["services"]["sidecars"] env_strings = sc["environment"] diff --git a/tests/unit/test_dlp_detectors.py b/tests/unit/test_dlp_detectors.py index 3028729..dea4afc 100644 --- a/tests/unit/test_dlp_detectors.py +++ b/tests/unit/test_dlp_detectors.py @@ -1,18 +1,23 @@ """Unit: DLP detectors (PRD 0053). 
-Tests for token pattern scanning, known secret detection, and -naive prompt injection detection.""" +Tests for token pattern scanning, known secret detection, fragmentation- +resistant matching, entropy scoring, and naive prompt injection detection.""" import base64 import gzip import unittest from bot_bottle.dlp_detectors import ( + ENTROPY_BLOCK_THRESHOLD, + PARTIAL_MATCH_MIN_LEN, REDACT, + _alnum_projection, _encoded_variants, _normalize_text, + _shannon_entropy, redact_tokens, scan_crlf_injection, + scan_entropy, scan_known_secrets, scan_naive_injection, scan_token_patterns, @@ -502,6 +507,191 @@ class TestStripCrlf(unittest.TestCase): from bot_bottle.dlp_detectors import strip_crlf self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello")) +class TestAlnumProjection(unittest.TestCase): + def test_alphanumeric_unchanged(self): + self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ")) + + def test_strips_hyphens(self): + self.assertEqual("mysecretvalue", _alnum_projection("my-secret-value")) + + def test_strips_spaces(self): + self.assertEqual("mysecretvalue", _alnum_projection("my secret value")) + + def test_strips_dots_and_underscores(self): + self.assertEqual("mysecretvalue", _alnum_projection("my.secret_value")) + + def test_empty_string(self): + self.assertEqual("", _alnum_projection("")) + + def test_all_special_chars(self): + self.assertEqual("", _alnum_projection("!@#$%^&*()")) + + +class TestFragmentationResistantMatching(unittest.TestCase): + """scan_known_secrets catches separator-injection and partial-substring evasion.""" + + # Secrets long enough that their alnum projections are ≥ 8 chars. 
+ SECRET = "supersecrettoken99" + ENV = {"EGRESS_TOKEN_0": SECRET} + + def test_exact_match_still_works(self): + result = scan_known_secrets(f"key={self.SECRET}", env=self.ENV) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_separator_injection_blocked(self): + # Hyphens inserted between chars of the secret. + fragmented = "-".join(self.SECRET) + result = scan_known_secrets(f"data={fragmented}", env=self.ENV) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn("separator injection", result.reason) + + def test_space_separator_blocked(self): + fragmented = " ".join(self.SECRET) + result = scan_known_secrets(f"body: {fragmented}", env=self.ENV) + self.assertIsNotNone(result) + assert result is not None + self.assertIn("separator injection", result.reason) + + def test_partial_substring_blocked(self): + # First PARTIAL_MATCH_MIN_LEN alnum chars of the secret, no separators. + partial = _alnum_projection(self.SECRET)[:PARTIAL_MATCH_MIN_LEN] + result = scan_known_secrets(f"x={partial}&y=other", env=self.ENV) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn("partial match", result.reason) + + def test_short_secret_skips_projection(self): + # Secrets shorter than _ALNUM_MIN_LEN in alnum projection are not + # fragmentation-checked (too many false positives). + short_env = {"EGRESS_TOKEN_0": "abc"} + # "a b c" has alnum projection "abc" (3 chars, < 8); should not block. 
+ self.assertIsNone(scan_known_secrets("a b c", env=short_env)) + + def test_clean_text_not_blocked(self): + self.assertIsNone(scan_known_secrets("nothing to see here", env=self.ENV)) + + def test_sensitive_prefixes_param_extra_prefix(self): + env = {"MY_CRED_0": self.SECRET, "IGNORED": "other"} + result = scan_known_secrets( + f"key={self.SECRET}", + env=env, + sensitive_prefixes=("MY_CRED_",), + ) + self.assertIsNotNone(result) + assert result is not None + self.assertIn("MY_CRED_0", result.reason) + + def test_sensitive_prefixes_default_only_egress_token(self): + # A value under a non-EGRESS_TOKEN_ key is ignored with default prefixes. + env = {"MY_CRED_0": self.SECRET} + self.assertIsNone(scan_known_secrets(f"key={self.SECRET}", env=env)) + + def test_canary_prefix_detected(self): + canary_value = "canary-fake-secret-value-xyz" + env = {"CANON_ALPHA_SECRET": canary_value} + result = scan_known_secrets( + f"x={canary_value}", + env=env, + sensitive_prefixes=("CANON_ALPHA_SECRET",), + ) + self.assertIsNotNone(result) + assert result is not None + self.assertIn("CANON_ALPHA_SECRET", result.reason) + + +class TestRedactTokensBroadenedPrefixes(unittest.TestCase): + SECRET = "my-provisioned-secret" + + def test_default_redacts_egress_token(self): + env = {"EGRESS_TOKEN_0": self.SECRET} + out = redact_tokens(f"val={self.SECRET}", env=env) + self.assertNotIn(self.SECRET, out) + self.assertIn(REDACT, out) + + def test_extra_prefix_redacted(self): + env = {"MY_SECRET_KEY": self.SECRET} + out = redact_tokens( + f"val={self.SECRET}", + env=env, + sensitive_prefixes=("MY_SECRET_",), + ) + self.assertNotIn(self.SECRET, out) + self.assertIn(REDACT, out) + + def test_non_matching_prefix_not_redacted(self): + env = {"MY_SECRET_KEY": self.SECRET} + out = redact_tokens(f"val={self.SECRET}", env=env) + # Default prefixes only include EGRESS_TOKEN_ → secret not redacted + self.assertIn(self.SECRET, out) + + +class TestShannonEntropy(unittest.TestCase): + def 
test_empty_string_zero(self): + self.assertEqual(0.0, _shannon_entropy("")) + + def test_single_char_zero(self): + self.assertEqual(0.0, _shannon_entropy("aaaaaa")) + + def test_two_equal_chars_one_bit(self): + self.assertAlmostEqual(1.0, _shannon_entropy("abababab"), places=10) + + def test_high_entropy_random_like(self): + # Uniform 64-char string over 64 distinct symbols has entropy 6 bits. + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + text = alphabet # each char appears exactly once + self.assertAlmostEqual(6.0, _shannon_entropy(text), places=10) + + +class TestScanEntropy(unittest.TestCase): + def test_empty_returns_none(self): + self.assertIsNone(scan_entropy("")) + + def test_low_entropy_returns_none(self): + # Highly repetitive text has low entropy. + self.assertIsNone(scan_entropy("a" * 200)) + + def test_high_entropy_warns(self): + # Build a 64-char string with entropy > ENTROPY_BLOCK_THRESHOLD. + # Use all 64 distinct printable chars to maximise entropy (~6 bits). + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_entropy(alphabet, threshold=ENTROPY_BLOCK_THRESHOLD) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("warn", result.severity) + self.assertIn("high-entropy", result.reason) + + def test_never_blocks(self): + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_entropy(alphabet) + # scan_entropy is warn-only; it must never return severity="block". + if result is not None: + self.assertNotEqual("block", result.severity) + + def test_location_in_result(self): + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_entropy(alphabet, location="authorization header") + if result is not None: + self.assertIn("authorization header", result.location) + + def test_structured_json_no_warn(self): + # Typical JSON has low entropy and should not be flagged. 
+ json_body = '{"status": "ok", "message": "hello world", "count": 42}' + self.assertIsNone(scan_entropy(json_body)) + + def test_short_text_below_window(self): + # Text shorter than the window: checked as one chunk. + # Use a uniform string to ensure it won't be flagged. + self.assertIsNone(scan_entropy("abcde", threshold=ENTROPY_BLOCK_THRESHOLD)) + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_egress.py b/tests/unit/test_egress.py index 4fdae02..a25a2a0 100644 --- a/tests/unit/test_egress.py +++ b/tests/unit/test_egress.py @@ -1,15 +1,21 @@ """Unit: Egress route lift + routes.yaml render + token resolution (PRD 0017, PRD 0053).""" +import tempfile import unittest +from pathlib import Path from bot_bottle.egress import ( CODEX_HOST_CREDENTIAL_TOKEN_REF, + Egress, + EgressPlan, EgressRoute, + egress_agent_env_entries, egress_manifest_routes, egress_render_routes, egress_resolve_token_values, egress_routes_for_bottle, + egress_sidecar_env_entries, egress_token_env_map, ) from bot_bottle.log import Die @@ -443,5 +449,119 @@ class TestResolveTokenValues(unittest.TestCase): self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out) +class TestCanaryGeneration(unittest.TestCase): + """Egress.prepare() generates a unique canary token per session.""" + + def _bottle_obj(self): + return ManifestIndex.from_json_obj({ + "bottles": {"dev": {"egress": {"routes": []}}}, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + }).bottles["dev"] + + def _make_plan(self) -> EgressPlan: + # Use a concrete no-op subclass so we can call prepare() without + # a real backend. 
+ class _TestEgress(Egress): + pass + + e = _TestEgress() + with tempfile.TemporaryDirectory() as td: + return e.prepare(self._bottle_obj(), "test-slug", Path(td)) + + def test_canary_is_non_empty(self): + plan = self._make_plan() + self.assertIsInstance(plan.canary, str) + self.assertGreater(len(plan.canary), 0) + self.assertRegex(plan.canary_env, r"^[A-Z]+_[A-Z]+_SECRET$") + + def test_canary_is_unique_per_session(self): + with tempfile.TemporaryDirectory() as td: + bottle = self._bottle_obj() + + class _TestEgress(Egress): + pass + + e = _TestEgress() + plan_a = e.prepare(bottle, "slug-a", Path(td)) + plan_b = e.prepare(bottle, "slug-b", Path(td)) + self.assertNotEqual(plan_a.canary, plan_b.canary) + + def test_canary_detected_by_scan_known_secrets(self): + from bot_bottle.dlp_detectors import scan_known_secrets + + plan = self._make_plan() + env = {plan.canary_env: plan.canary} + result = scan_known_secrets( + f"exfil={plan.canary}", + env=env, + sensitive_prefixes=(plan.canary_env,), + ) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn(plan.canary_env, result.reason) + + def test_egress_plan_canary_field_default_empty(self): + # Verify EgressPlan can be constructed with an empty canary (backward compat). 
+ from pathlib import Path + plan = EgressPlan( + slug="s", + routes_path=Path("/tmp/r.yaml"), + routes=(), + token_env_map={}, + ) + self.assertEqual("", plan.canary) + self.assertEqual("", plan.canary_env) + + +class TestEgressEnvEntries(unittest.TestCase): + def test_sidecar_entries_include_route_tokens_and_canary_scan_prefix(self): + plan = EgressPlan( + slug="s", + routes_path=Path("/tmp/r.yaml"), + routes=(EgressRoute(host="api.example"),), + token_env_map={"EGRESS_TOKEN_1": "T1", "EGRESS_TOKEN_0": "T0"}, + canary="fake-canary-value", + canary_env="CANON_ALPHA_SECRET", + ) + + self.assertEqual( + ( + "EGRESS_TOKEN_0", + "EGRESS_TOKEN_1", + "CANON_ALPHA_SECRET=fake-canary-value", + "BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", + ), + egress_sidecar_env_entries(plan), + ) + + def test_agent_entries_include_only_canary_bait(self): + plan = EgressPlan( + slug="s", + routes_path=Path("/tmp/r.yaml"), + routes=(), + token_env_map={}, + canary="fake-canary-value", + canary_env="CANON_ALPHA_SECRET", + ) + + self.assertEqual( + ("CANON_ALPHA_SECRET=fake-canary-value",), + egress_agent_env_entries(plan), + ) + + def test_canary_entries_omitted_when_name_missing(self): + plan = EgressPlan( + slug="s", + routes_path=Path("/tmp/r.yaml"), + routes=(), + token_env_map={}, + canary="fake-canary-value", + ) + + self.assertEqual((), egress_sidecar_env_entries(plan)) + self.assertEqual((), egress_agent_env_entries(plan)) + + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index 2ea6abd..758b85a 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -1273,6 +1273,109 @@ class TestBuildTokenAllowPayload(unittest.TestCase): result = ScanResult(severity="block", reason="r", matched="x") payload = build_token_allow_payload("h", "GET", "/", result) self.assertNotIn("context:", payload) +class TestScanOutboundEnhanced(unittest.TestCase): + """scan_outbound 
changes: binary decode, entropy detector, + broadened known-value prefixes, fragmentation resistance.""" + + _ROUTE = Route(host="api.example.com") + _ROUTE_ENTROPY = Route( + host="api.example.com", + outbound_detectors=("entropy",), + ) + + def test_binary_body_latin1_decode_finds_ascii_secret(self): + # Body contains valid ASCII secret surrounded by non-UTF-8 bytes. + secret = "supersecrettoken99" + env = {"EGRESS_TOKEN_0": secret} + # Wrap the secret in bytes that are invalid UTF-8. + body = b"\x80\x81" + secret.encode("ascii") + b"\xff" + result = scan_outbound(self._ROUTE, body, env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_binary_body_valid_utf8_decoded_correctly(self): + env = {"EGRESS_TOKEN_0": "mysecret"} + # Valid UTF-8 body — should be decoded as UTF-8, not latin-1. + body = "clean body with mysecret".encode("utf-8") + result = scan_outbound(self._ROUTE, body, env) + self.assertIsNotNone(result) + + def test_entropy_detector_off_by_default(self): + import string + # High-entropy content should NOT warn if the route has no entropy detector. + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_outbound(self._ROUTE, alphabet, {}) + self.assertIsNone(result) + + def test_entropy_detector_warns_when_enabled(self): + import string + alphabet = (string.ascii_letters + string.digits + "+/")[:64] + result = scan_outbound(self._ROUTE_ENTROPY, alphabet, {}) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("warn", result.severity) + + def test_bot_bottle_sensitive_prefixes_env_var(self): + # When the sidecar env contains BOT_BOTTLE_SENSITIVE_PREFIXES, + # scan_outbound should scan those additional prefixes. 
+ secret = "extra-sensitive-value-abc" + env = { + "MY_CRED_KEY": secret, + "BOT_BOTTLE_SENSITIVE_PREFIXES": "MY_CRED_", + } + result = scan_outbound(self._ROUTE, f"x={secret}", env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_bot_bottle_sensitive_prefixes_multiple(self): + secret = "my-api-key-value-xyz" + env = { + "ANTHROPIC_API_0": secret, + "BOT_BOTTLE_SENSITIVE_PREFIXES": "ANTHROPIC_API_,OTHER_", + } + result = scan_outbound(self._ROUTE, f"auth={secret}", env) + self.assertIsNotNone(result) + + def test_canary_detected_via_random_secret_env_name(self): + # The fake secret uses a randomized env name that the sidecar marks + # as sensitive through BOT_BOTTLE_SENSITIVE_PREFIXES. + canary = "canaryvalue12345abcdef" + env = { + "CANON_ALPHA_SECRET": canary, + "BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET", + } + result = scan_outbound(self._ROUTE, f"data={canary}", env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn("CANON_ALPHA_SECRET", result.reason) + + def test_fragmented_canary_blocked(self): + # Canary with separators injected is still caught. 
+ canary = "supersecretcanary99" + env = { + "CANON_ALPHA_SECRET": canary, + "BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET", + } + fragmented = "-".join(canary) + result = scan_outbound(self._ROUTE, f"x={fragmented}", env) + self.assertIsNotNone(result) + + +class TestOutboundDetectorNames(unittest.TestCase): + def test_entropy_in_outbound_detector_names(self): + from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES + self.assertIn("entropy", OUTBOUND_DETECTOR_NAMES) + + def test_known_secrets_in_outbound_detector_names(self): + from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES + self.assertIn("known_secrets", OUTBOUND_DETECTOR_NAMES) + + def test_token_patterns_in_outbound_detector_names(self): + from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES + self.assertIn("token_patterns", OUTBOUND_DETECTOR_NAMES) if __name__ == "__main__": diff --git a/tests/unit/test_macos_container_launch.py b/tests/unit/test_macos_container_launch.py index d9ae81c..3e1038e 100644 --- a/tests/unit/test_macos_container_launch.py +++ b/tests/unit/test_macos_container_launch.py @@ -30,6 +30,7 @@ def _plan( supervise: bool = False, agent_git_gate_url: str = "", agent_supervise_url: str = "", + canary: bool = False, ) -> MacosContainerBottlePlan: routes_path = stage_dir / "routes.yaml" routes_path.write_text("routes: []\n", encoding="utf-8") @@ -42,6 +43,8 @@ def _plan( routes_path=routes_path, routes=("route",), token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"}, + canary="fake-canary-value" if canary else "", + canary_env="CANON_ALPHA_SECRET" if canary else "", ) if git: key_path = stage_dir / "origin-key" @@ -138,6 +141,26 @@ class TestMacosContainerLaunchArgv(unittest.TestCase): argv, ) + def test_sidecar_argv_registers_canary_env_as_sensitive(self): + plan = _plan(stage_dir=self.stage_dir, canary=True) + argv = launch._sidecar_run_argv( + plan, + "bot-bottle-sidecars-dev-abc", + "bot-bottle-net-dev-abc", + "bot-bottle-egress-dev-abc", + ) + 
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv) + self.assertIn("BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", argv) + + def test_agent_argv_receives_canary_env(self): + plan = _plan(stage_dir=self.stage_dir, canary=True) + argv = launch._agent_run_argv( + plan, + "bot-bottle-net-dev-abc", + "192.0.2.10", + ) + self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv) + def test_agent_env_points_proxy_at_sidecar_ip(self): plan = _plan( stage_dir=self.stage_dir, @@ -271,7 +294,7 @@ def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan: manifest=_MANIFEST, stage_dir=stage_dir, git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())), - egress_plan=cast(EgressPlan, SimpleNamespace()), + egress_plan=cast(EgressPlan, SimpleNamespace(canary="")), supervise_plan=None, agent_provision=AgentProvisionPlan( template="claude", diff --git a/tests/unit/test_smolmachines_provision.py b/tests/unit/test_smolmachines_provision.py index e3fbbfc..b5744c3 100644 --- a/tests/unit/test_smolmachines_provision.py +++ b/tests/unit/test_smolmachines_provision.py @@ -26,9 +26,7 @@ from bot_bottle.backend.smolmachines.bottle import SmolmachinesBottle from bot_bottle.backend.smolmachines.bottle_plan import ( SmolmachinesBottlePlan, ) -# from bot_bottle.backend.smolmachines.provision import ( -# workspace as _workspace, -# ) +from bot_bottle.backend.smolmachines import launch as _launch from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec from bot_bottle.backend.util import AGENT_CA_PATH from bot_bottle.egress import EgressPlan, EgressRoute @@ -86,6 +84,7 @@ def _plan( stage_dir: Path | None = None, egress_routes: tuple[EgressRoute, ...] 
= (), egress_ca_path: Path = Path(), + canary: bool = False, supervise: bool = False, bundle_ip: str = "192.168.50.2", agent_git_gate_host: str = "127.0.0.1:55555", @@ -156,6 +155,8 @@ def _plan( routes=egress_routes, token_env_map={}, mitmproxy_ca_cert_only_host_path=egress_ca_path, + canary="fake-canary-value" if canary else "", + canary_env="CANON_ALPHA_SECRET" if canary else "", ), supervise_plan=supervise_plan, agent_git_gate_host=agent_git_gate_host, @@ -411,6 +412,31 @@ class TestBundleLaunchSpec(unittest.TestCase): self.assertIn(9420, spec.ports_to_publish) self.assertNotIn(9418, spec.ports_to_publish) + def test_canary_env_registered_as_sensitive_in_bundle(self): + plan = _plan(canary=True) + + spec = _bundle_launch_spec(plan, "net", "127.0.0.16") + + self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", spec.environment) + self.assertIn( + "BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", + spec.environment, + ) + + def test_canary_env_visible_to_smolvm_guest(self): + plan = _plan(canary=True) + with patch.object( + _launch._bundle, + "bundle_host_port", + return_value="65000", + ): + stamped = _launch._discover_urls(plan, "127.0.0.16") + + self.assertEqual( + "fake-canary-value", + stamped.guest_env["CANON_ALPHA_SECRET"], + ) + class TestProvisionGitUser(unittest.TestCase): """`provision_git` runs `git config --global` inside the