PRD: Strengthen outbound exfiltration detection #263

Merged
didericis merged 10 commits from strengthen-outbound-exfil-detection into main 2026-06-25 00:15:33 -04:00
13 changed files with 944 additions and 25 deletions
+4 -2
View File
@@ -28,6 +28,8 @@ from typing import Any
from ...egress import ( from ...egress import (
EGRESS_HOSTNAME, EGRESS_HOSTNAME,
EGRESS_ROUTES_IN_CONTAINER, EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_sidecar_env_entries,
) )
from ...git_gate import GIT_GATE_HOSTNAME from ...git_gate import GIT_GATE_HOSTNAME
from ...log import die, warn from ...log import die, warn
@@ -135,8 +137,7 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER)) volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER))
if ep.routes: if ep.routes:
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent))) volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
for token_env in sorted(ep.token_env_map.keys()): env.extend(egress_sidecar_env_entries(ep))
env.append(token_env)
# --- git-gate ----------------------------------------------------- # --- git-gate -----------------------------------------------------
gp = plan.git_gate_plan gp = plan.git_gate_plan
@@ -220,6 +221,7 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
# never lands on argv or in the compose file. # never lands on argv or in the compose file.
for name in sorted(plan.forwarded_env.keys()): for name in sorted(plan.forwarded_env.keys()):
env.append(name) env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
service: dict[str, Any] = { service: dict[str, Any] = {
"image": plan.image, "image": plan.image,
+8 -4
View File
@@ -22,7 +22,12 @@ from ...bottle_state import (
git_gate_state_dir, git_gate_state_dir,
read_committed_image, read_committed_image,
) )
from ...egress import EGRESS_ROUTES_IN_CONTAINER, egress_resolve_token_values from ...egress import (
EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values,
egress_sidecar_env_entries,
)
from ...git_gate import revoke_git_gate_provisioned_keys from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import die, info, warn from ...log import die, info, warn
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
@@ -350,9 +355,7 @@ def _sidecar_daemons(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
didericis marked this conversation as resolved Outdated
Outdated
Review

The fact that we needed to update all the backends to propagate the env vars here is a red flag, should be backend agnostic. Can we move this sidecar env provisioning to a location that gets shared between backends?

The fact that we needed to update all the backends to propagate the env vars here is a red flag, should be backend agnostic. Can we move this sidecar env provisioning to a location that gets shared between backends?
def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]: def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
env: list[str] = [] env: list[str] = list(egress_sidecar_env_entries(plan.egress_plan))
if plan.egress_plan.routes:
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
if plan.git_gate_plan.upstreams: if plan.git_gate_plan.upstreams:
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}") env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
if plan.supervise_plan is not None: if plan.supervise_plan is not None:
@@ -420,6 +423,7 @@ def _agent_env_entries(
env.append(f"{name}={value}") env.append(f"{name}={value}")
for name in sorted(plan.forwarded_env.keys()): for name in sorted(plan.forwarded_env.keys()):
env.append(name) env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
return tuple(env) return tuple(env)
+6 -5
View File
@@ -23,7 +23,9 @@ from typing import Callable, Generator
from ...egress import ( from ...egress import (
EGRESS_ROUTES_IN_CONTAINER, EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values, egress_resolve_token_values,
egress_sidecar_env_entries,
) )
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
from ...util import expand_tilde from ...util import expand_tilde
@@ -228,6 +230,9 @@ def _discover_urls(
guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}" guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}"
if agent_supervise_url: if agent_supervise_url:
guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url
for entry in egress_agent_env_entries(plan.egress_plan):
name, value = entry.split("=", 1)
guest_env[name] = value
return dataclasses.replace( return dataclasses.replace(
plan, plan,
@@ -316,11 +321,7 @@ def _bundle_launch_spec(
volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True)) volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True))
if ep.routes: if ep.routes:
volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True)) volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True))
# Bare-name entries for upstream-token slots. Their values env.extend(egress_sidecar_env_entries(ep))
# come from the docker-run subprocess env (inherited from
# the operator's shell), never landing on argv.
for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
# --- git-gate --------------------------------------------- # --- git-gate ---------------------------------------------
gp = plan.git_gate_plan gp = plan.git_gate_plan
+159 -3
View File
1
@@ -15,6 +15,8 @@ import gzip
import re import re
import typing import typing
import unicodedata import unicodedata
from math import log2
from collections import Counter
from urllib.parse import quote as url_quote from urllib.parse import quote as url_quote
try: try:
@@ -107,20 +109,21 @@ def redact_tokens(
text: str, text: str,
*, *,
env: typing.Mapping[str, str] | None = None, env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> str: ) -> str:
"""Replace token pattern matches and (if env given) provisioned secrets with REDACT.""" """Replace token pattern matches and (if env given) provisioned secrets with REDACT."""
for _, pattern in TOKEN_PATTERNS: for _, pattern in TOKEN_PATTERNS:
text = pattern.sub(REDACT, text) text = pattern.sub(REDACT, text)
if env is not None: if env is not None:
for key, value in env.items(): for key, value in env.items():
if key.startswith("EGRESS_TOKEN_") and value: if any(key.startswith(p) for p in sensitive_prefixes) and value:
for variant in _encoded_variants(value): for variant in _encoded_variants(value):
text = text.replace(variant, REDACT) text = text.replace(variant, REDACT)
return text return text
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Known secrets detector (Phase 1b) # Known secrets detector
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _encoded_variants(secret: str) -> list[str]: def _encoded_variants(secret: str) -> list[str]:
@@ -161,18 +164,65 @@ def _encoded_variants(secret: str) -> list[str]:
return variants return variants
# ---------------------------------------------------------------------------
# Fragmentation-resistant helpers
# ---------------------------------------------------------------------------
# Minimum length of alnum projection for projection-based checks to run.
# Short secrets produce too many false positives in projection space.
_ALNUM_MIN_LEN = 8
# Minimum window length for the partial-substring sliding scan.
PARTIAL_MATCH_MIN_LEN = 12
def _alnum_projection(text: str) -> str:
"""Return text with every non-alphanumeric character stripped.
Used for fragmentation-resistant matching: separator-injected secrets
(spaces, hyphens, dots inserted between characters) are identical to
their originals in alnum projection space.
"""
return "".join(c for c in text if c.isalnum())
def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None:
"""Return the position in text_alnum where any min_len-char window of
secret_alnum first appears, or None.
Slides a window of width min_len across secret_alnum and searches for
each window in text_alnum. The first hit position is returned.
"""
if len(secret_alnum) < min_len or len(text_alnum) < min_len:
return None
for i in range(len(secret_alnum) - min_len + 1):
window = secret_alnum[i:i + min_len]
pos = text_alnum.find(window)
if pos >= 0:
return pos
return None
def scan_known_secrets( def scan_known_secrets(
text: str, text: str,
*, *,
location: str = "body", location: str = "body",
env: typing.Mapping[str, str] | None = None, env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
safe_tokens: typing.AbstractSet[str] | None = None, safe_tokens: typing.AbstractSet[str] | None = None,
) -> ScanResult | None: ) -> ScanResult | None:
if env is None: if env is None:
return None return None
# Pre-compute alnum projection of the scan text once; reused per secret.
text_alnum: str | None = None
for key, value in env.items(): for key, value in env.items():
if not key.startswith("EGRESS_TOKEN_") or not value: if not any(key.startswith(p) for p in sensitive_prefixes) or not value:
continue continue
# Pass 1: exact match across encoded variants (original behaviour).
approved_exact = False
for variant in _encoded_variants(value): for variant in _encoded_variants(value):
pos = text.find(variant) pos = text.find(variant)
if pos >= 0: if pos >= 0:
@@ -180,6 +230,7 @@ def scan_known_secrets(
# (PRD 0062); a different encoding of the same secret is a # (PRD 0062); a different encoding of the same secret is a
# fresh block. # fresh block.
if safe_tokens is not None and variant in safe_tokens: if safe_tokens is not None and variant in safe_tokens:
approved_exact = True
continue continue
return ScanResult( return ScanResult(
severity="block", severity="block",
@@ -188,6 +239,104 @@ def scan_known_secrets(
context=_snippet(text, pos, pos + len(variant)), context=_snippet(text, pos, pos + len(variant)),
matched=variant, matched=variant,
) )
if approved_exact:
# Exact match was found and approved; projection passes would
# fire on the same value, so skip them for this secret.
continue
# Pass 2 & 3: fragmentation-resistant projection checks.
secret_alnum = _alnum_projection(value)
if len(secret_alnum) < _ALNUM_MIN_LEN:
continue
if text_alnum is None:
text_alnum = _alnum_projection(text)
# Pass 2: full alnum-projection exact match (catches separator injection).
pos2 = text_alnum.find(secret_alnum)
if pos2 >= 0:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(fragmented match — separator injection)"
),
location=location,
context=_snippet(text_alnum, pos2, pos2 + len(secret_alnum)),
)
# Pass 3: sliding-window partial match (catches chunked-substring leaks).
pos3 = _find_partial_window(secret_alnum, text_alnum, PARTIAL_MATCH_MIN_LEN)
if pos3 is not None:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(partial match — at least {PARTIAL_MATCH_MIN_LEN} consecutive "
f"alphanumeric chars)"
),
location=location,
context=_snippet(text_alnum, pos3, pos3 + PARTIAL_MATCH_MIN_LEN),
)
return None
# ---------------------------------------------------------------------------
# Entropy detector (warn-only)
# ---------------------------------------------------------------------------
# Sliding window size and step for the entropy scan.
ENTROPY_WINDOW = 64
ENTROPY_STEP = 32
# Bits-per-character threshold. Random ASCII printable ≈ 6.6 bits; random
# lowercase hex ≈ 4 bits; random base64url ≈ 6 bits. 5.5 sits above
# typical structured data (JSON, URLs) while staying below truly random
# content.
ENTROPY_BLOCK_THRESHOLD = 5.5
def _shannon_entropy(text: str) -> float:
if not text:
return 0.0
counts = Counter(text)
n = len(text)
return -sum((c / n) * log2(c / n) for c in counts.values())
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW,
threshold: float = ENTROPY_BLOCK_THRESHOLD,
) -> ScanResult | None:
"""Warn-only detector: flag windows of `window` chars with Shannon entropy
above `threshold` bits per character.
Never blocks; always returns severity='warn'. Disabled by default —
routes must opt in via dlp.outbound_detectors=['entropy'].
"""
if not text:
return None
step = max(1, window // 2)
end = len(text)
# Scan overlapping windows; also check the final tail if shorter than window.
positions = list(range(0, end - window + 1, step))
if end < window:
positions = [0]
elif (end - window) % step != 0:
positions.append(end - window)
for i in positions:
chunk = text[i:i + window]
if _shannon_entropy(chunk) >= threshold:
return ScanResult(
severity="warn",
reason=f"high-entropy content in {location} (possible encrypted exfil)",
location=location,
context=_snippet(text, i, i + len(chunk)),
)
return None return None
@@ -306,11 +455,18 @@ def scan_crlf_injection(text: str) -> ScanResult | None:
__all__ = [ __all__ = [
"ENTROPY_BLOCK_THRESHOLD",
"ENTROPY_WINDOW",
"ENTROPY_STEP",
"PARTIAL_MATCH_MIN_LEN",
"REDACT", "REDACT",
"SNIPPET_CONTEXT", "SNIPPET_CONTEXT",
"TOKEN_PATTERNS", "TOKEN_PATTERNS",
"_alnum_projection",
"_shannon_entropy",
"redact_tokens", "redact_tokens",
"scan_crlf_injection", "scan_crlf_injection",
"scan_entropy",
"scan_known_secrets", "scan_known_secrets",
"scan_naive_injection", "scan_naive_injection",
"scan_token_patterns", "scan_token_patterns",
+55
View File
@@ -10,6 +10,7 @@ specific and lives on concrete subclasses (see
from __future__ import annotations from __future__ import annotations
import dataclasses import dataclasses
import secrets
from abc import ABC from abc import ABC
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@@ -34,6 +35,50 @@ EGRESS_HOSTNAME = "egress"
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml" EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name
_CANARY_ENV_WORDS = (
"ACCORD",
"ANCHOR",
"ATLAS",
"CANON",
"CIPHER",
"EMBER",
"FALCON",
"HARBOR",
"LANTERN",
"MARBLE",
"NOVA",
"ORBIT",
"PIVOT",
"RADIUS",
"SUMMIT",
"VECTOR",
)
def _random_canary_env() -> str:
first = secrets.choice(_CANARY_ENV_WORDS)
remaining = tuple(word for word in _CANARY_ENV_WORDS if word != first)
second = secrets.choice(remaining)
return f"{first}_{second}_SECRET"
def egress_sidecar_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return sidecar env entries needed by egress across all backends."""
env: list[str] = []
if plan.routes:
env.extend(sorted(plan.token_env_map.keys()))
if plan.canary and plan.canary_env:
env.append(f"{plan.canary_env}={plan.canary}")
env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.canary_env}")
return tuple(env)
def egress_agent_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return agent-visible egress env entries shared by all backends."""
if plan.canary and plan.canary_env:
return (f"{plan.canary_env}={plan.canary}",)
return ()
@dataclass(frozen=True) @dataclass(frozen=True)
class EgressRoute(Route): class EgressRoute(Route):
@@ -65,6 +110,8 @@ class EgressPlan:
mitmproxy_ca_host_path: Path = Path() mitmproxy_ca_host_path: Path = Path()
mitmproxy_ca_cert_only_host_path: Path = Path() mitmproxy_ca_cert_only_host_path: Path = Path()
log: int = 0 log: int = 0
canary: str = ""
canary_env: str = ""
def egress_manifest_routes( def egress_manifest_routes(
@@ -324,12 +371,18 @@ class Egress(ABC):
routes_path = stage_dir / EGRESS_ROUTES_FILENAME routes_path = stage_dir / EGRESS_ROUTES_FILENAME
routes_path.write_text(egress_render_routes(routes, log=log)) routes_path.write_text(egress_render_routes(routes, log=log))
routes_path.chmod(0o600) routes_path.chmod(0o600)
# Generate a per-session fake secret under a plausible random env name.
# The sidecar marks that exact env name as sensitive for known-secret
# scanning; the agent receives the same name/value as exfil bait.
canary = secrets.token_urlsafe(32)
return EgressPlan( return EgressPlan(
slug=slug, slug=slug,
routes_path=routes_path, routes_path=routes_path,
routes=routes, routes=routes,
token_env_map=egress_token_env_map(routes), token_env_map=egress_token_env_map(routes),
log=log, log=log,
canary=canary,
canary_env=_random_canary_env(),
) )
__all__ = [ __all__ = [
@@ -344,5 +397,7 @@ __all__ = [
"egress_render_routes", "egress_render_routes",
"egress_resolve_token_values", "egress_resolve_token_values",
"egress_routes_for_bottle", "egress_routes_for_bottle",
"egress_agent_env_entries",
"egress_sidecar_env_entries",
"egress_token_env_map", "egress_token_env_map",
] ]
+32 -3
View File
@@ -34,7 +34,7 @@ VALID_METHODS = frozenset({
"CONNECT", "CONNECT",
}) })
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"}) OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"})
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"}) INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
# Per-route policy for what the proxy does when an outbound DLP detector # Per-route policy for what the proxy does when an outbound DLP detector
@@ -729,17 +729,28 @@ def scan_outbound(
try: try:
from dlp_detectors import ( # type: ignore[import-not-found] from dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_token_patterns, scan_token_patterns,
) )
except ImportError: # pragma: no cover - host-side path except ImportError: # pragma: no cover - host-side path
from .dlp_detectors import ( # type: ignore[import-not-found] from .dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_token_patterns, scan_token_patterns,
) )
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace") # Binary bodies: latin-1 is a bijective byte↔codepoint mapping that
# preserves every byte value, so ASCII-range secret strings remain
# findable by str.find / regex. Prefer strict UTF-8 for valid text bodies.
if isinstance(body, bytes):
try:
text = body.decode("utf-8")
except UnicodeDecodeError:
text = body.decode("latin-1")
else:
text = body
# CRLF injection is only an attack in the request line + headers, never the # CRLF injection is only an attack in the request line + headers, never the
# body: an HTTP body is delimited by Content-Length, so CRLF bytes there # body: an HTTP body is delimited by Content-Length, so CRLF bytes there
@@ -758,12 +769,30 @@ def scan_outbound(
return result return result
if _detector_enabled(route.outbound_detectors, "known_secrets"): if _detector_enabled(route.outbound_detectors, "known_secrets"):
# BOT_BOTTLE_SENSITIVE_PREFIXES lets operators add extra env prefixes
# beyond EGRESS_TOKEN_* without changing the manifest schema.
extra_raw = environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "")
extra = tuple(p for p in extra_raw.split(",") if p)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
result = scan_known_secrets( result = scan_known_secrets(
text, location="body", env=environ, safe_tokens=safe_tokens, text, location="body", env=environ,
sensitive_prefixes=sensitive_prefixes, safe_tokens=safe_tokens,
) )
if result is not None: if result is not None:
return result return result
# Entropy scanning requires explicit opt-in: it is NOT part of the
# default "all detectors" set because it produces false positives on
# legitimate base64 / binary payloads. Routes must list "entropy" in
# dlp.outbound_detectors to enable it.
if (
route.outbound_detectors is not None
and "entropy" in route.outbound_detectors
):
result = scan_entropy(text, location="body")
if result is not None:
return result
return None return None
@@ -0,0 +1,189 @@
# PRD prd-new: Strengthen outbound exfiltration detection
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-25
- **Issue:** #259
## Summary
The current DLP system has two detection tiers: weak (vendor token regexes)
and strong (exact matching of provisioned `EGRESS_TOKEN_*` values in multiple
encodings). This PRD adds four complementary hardening layers in ranked order
of impact:
1. **Canary tokens** — inject a per-session fake secret into the agent's
environment; any outbound appearance is a zero-false-positive exfil signal.
2. **Broadened known-value scanning** — extend the strong tier beyond
`EGRESS_TOKEN_*` via a `BOT_BOTTLE_SENSITIVE_PREFIXES` sidecar env var.
3. **Fragmentation-resistant matching** — catch separator injection and
long-substring leaks via alnum projection and sliding-window substring
search.
4. **Entropy scoring** — warn-only flag on high-entropy blobs as a
complementary signal for encrypted exfil.
## Problem
After PRD 0056 the outbound scan covers all HTTP request surfaces (host, path,
query, headers, body). But the content-based strong tier only matches
`EGRESS_TOKEN_*` values and their encodings. Three gaps remain:
| Gap | How an agent exploits it |
|-----|--------------------------|
| No canary | There is no "can't be false-positive" signal |
| Narrow prefix filter | Sensitive values provisioned under other env prefixes (MCP keys, API keys injected via the agent provider) are invisible to `scan_known_secrets` |
| Fragmentation blindness | Inserting `-`, space, or `\n` between secret characters turns an exact-match into a miss |
## Goals / Success Criteria
1. Each launched bottle has a unique canary token in the agent's environment
under a randomized `WORD_WORD_SECRET` env var name. The egress sidecar gets
the same env var and registers that exact name through
`BOT_BOTTLE_SENSITIVE_PREFIXES`. Any outbound appearance of the canary
blocks the request as a known-secret match.
2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default:
`("EGRESS_TOKEN_",)`). `scan_outbound` reads
`BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes
in, so operators can mark additional env vars as scanned values without
changing the manifest schema.
3. For every secret that passes exact-match, a secondary alnum-projection pass
checks for the secret with all non-alphanumeric characters stripped. This
catches separator-injection evasion (`MY-SECRET` → body contains
`MY SECRET`).
4. A sliding-window partial-match pass checks for long-enough contiguous
substrings of the secret's alnum projection in the text's alnum projection.
Any match ≥ `PARTIAL_MATCH_MIN_LEN` (12 chars) blocks with reason
`"partial match"`.
5. A new `scan_entropy` detector flags outbound text windows with Shannon
entropy ≥ `ENTROPY_BLOCK_THRESHOLD` (5.5 bits/char) at **warn** severity
only. It is registered under the new detector name `"entropy"` in
`OUTBOUND_DETECTOR_NAMES` and disabled by default (routes must opt in).
6. Binary request bodies are decoded via `latin-1` instead of
`utf-8 errors="replace"`, preserving every byte value and allowing
ASCII-range secrets to be found within binary payloads.
7. All new behaviour is unit-tested; existing tests pass unchanged.
## Non-goals
- Rolling per-host buffer for split-across-requests detection (state in the
stateless addon is complex; deferred).
- Additional vendor regexes.
- ML / embedding-based detection.
- Entropy-based hard blocks (warn only per the issue).
## Design
### Canary token flow
```
Egress.prepare()
canary = secrets.token_urlsafe(32)
canary_env = <random WORD_WORD_SECRET>
EgressPlan(canary=canary, canary_env=canary_env, ...)
Docker compose render:
sidecar env: <canary_env>=<canary>
sidecar env: BOT_BOTTLE_SENSITIVE_PREFIXES=<canary_env>
agent env: <canary_env>=<canary> ← visible to agent as a "secret"
macos-container launch: same literals added to sidecar + agent env entries
```
The sidecar uses `BOT_BOTTLE_SENSITIVE_PREFIXES` to make the random canary env
name part of the existing `scan_known_secrets` detector without adding a
manifest schema field.
### Broadened known-value scanning
`scan_known_secrets` gains a `sensitive_prefixes` parameter:
```python
def scan_known_secrets(
text: str,
*,
location: str = "body",
env: Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> ScanResult | None:
```
`scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` (comma-separated list
of additional prefixes) from `environ` and appends them:
```python
extra = tuple(
p for p in environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "").split(",") if p
)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
```
`redact_tokens` receives the same treatment for consistent redaction.
### Fragmentation-resistant matching
A new helper `_alnum_projection(text)` strips all non-alphanumeric characters.
`scan_known_secrets` runs two passes per secret:
1. **Exact pass** — existing encoded-variant loop (unchanged).
2. **Alnum-projection pass** — if the secret's alnum projection has ≥ 8 chars,
check if it appears in the text's alnum projection. Match → block with
`"fragmented match (separator injection)"` reason.
3. **Partial-substring pass** — if the secret's alnum projection has ≥
`PARTIAL_MATCH_MIN_LEN` chars (12), slide a window of that length across the
secret's projection and look for each window in the text's alnum projection.
First match → block with `"partial match"` reason.
All three passes run only for the `"known_secrets"` detector; the token-pattern
and entropy detectors are unchanged.
### Entropy scoring
New public function:
```python
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW, # 64
threshold: float = ENTROPY_BLOCK_THRESHOLD, # 5.5
) -> ScanResult | None:
```
Slides a window of `window` characters across `text` in steps of `window // 2`.
If any window's Shannon entropy exceeds `threshold`, returns a **warn**-severity
`ScanResult`. Never blocks.
`OUTBOUND_DETECTOR_NAMES` gains `"entropy"`. Routes opt in via their `dlp`
block; entropy scanning is **off by default** to avoid false-positive noise on
legitimate binary payloads.
### Binary body handling
In `scan_outbound`, the bytes → str decoding changes from:
```python
body.decode("utf-8", errors="replace")
```
to:
```python
body.decode("utf-8") if body is str else body.decode("latin-1")
```
`latin-1` is a bijective byte↔codepoint mapping; every byte value is preserved
as its corresponding Latin-1 code point, so ASCII-range secret strings remain
intact and `str.find` / regex still locate them correctly. The fallback from
strict UTF-8 is tried first so valid UTF-8 bodies are decoded faithfully.
## Implementation
Delivered in three commits on the same branch:
1. **DLP detector changes**`_alnum_projection`, fragmentation passes,
`scan_entropy`, broadened `scan_known_secrets`, updated `scan_outbound` and
`redact_tokens`; all accompanying unit tests.
2. **Canary injection**`EgressPlan.canary`, `Egress.prepare()`,
Docker compose + macos-container backend injection.
3. **PRD flip**`Status: Draft → Active`.
+23 -2
View File
@@ -80,7 +80,11 @@ def _git_gate_plan(upstreams: tuple[GitGateUpstream, ...] = ()) -> GitGatePlan:
) )
def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan: def _egress_plan(
routes: tuple[EgressRoute, ...] = (),
*,
canary: bool = False,
) -> EgressPlan:
token_env_map = { token_env_map = {
r.token_env: r.token_ref r.token_env: r.token_ref
for r in routes for r in routes
@@ -95,6 +99,8 @@ def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan:
egress_network=f"bot-bottle-egress-{SLUG}", egress_network=f"bot-bottle-egress-{SLUG}",
mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem", mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem",
mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem", mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem",
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
) )
@@ -112,6 +118,7 @@ def _plan(
with_git: bool = False, with_git: bool = False,
with_egress: bool = False, with_egress: bool = False,
supervise: bool = False, supervise: bool = False,
canary: bool = False,
) -> DockerBottlePlan: ) -> DockerBottlePlan:
"""Build a fully-resolved DockerBottlePlan. Toggles cover the """Build a fully-resolved DockerBottlePlan. Toggles cover the
matrix the renderer's conditional-service logic branches on.""" matrix the renderer's conditional-service logic branches on."""
@@ -150,7 +157,7 @@ def _plan(
slug=SLUG, slug=SLUG,
forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"}, forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"},
git_gate_plan=_git_gate_plan(upstreams), git_gate_plan=_git_gate_plan(upstreams),
egress_plan=_egress_plan(routes), egress_plan=_egress_plan(routes, canary=canary),
supervise_plan=_supervise_plan() if supervise else None, supervise_plan=_supervise_plan() if supervise else None,
use_runsc=False, use_runsc=False,
agent_provision=AgentProvisionPlan( agent_provision=AgentProvisionPlan(
@@ -375,6 +382,20 @@ class TestSidecarBundleShape(unittest.TestCase):
env_strings = sc["environment"] env_strings = sc["environment"]
self.assertNotIn("EGRESS_TOKEN_0", env_strings) self.assertNotIn("EGRESS_TOKEN_0", env_strings)
def test_canary_env_registered_as_sensitive_in_sidecar(self):
sc = self._render(canary=True)["services"]["sidecars"]
env_strings = sc["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
env_strings,
)
def test_canary_env_visible_to_agent(self):
agent = self._render(canary=True)["services"]["agent"]
env_strings = agent["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
def test_supervise_env_present_when_active(self): def test_supervise_env_present_when_active(self):
sc = self._render(supervise=True)["services"]["sidecars"] sc = self._render(supervise=True)["services"]["sidecars"]
env_strings = sc["environment"] env_strings = sc["environment"]
+192 -2
View File
@@ -1,18 +1,23 @@
"""Unit: DLP detectors (PRD 0053). """Unit: DLP detectors (PRD 0053).
Tests for token pattern scanning, known secret detection, and Tests for token pattern scanning, known secret detection, fragmentation-
naive prompt injection detection.""" resistant matching, entropy scoring, and naive prompt injection detection."""
import base64 import base64
import gzip import gzip
import unittest import unittest
from bot_bottle.dlp_detectors import ( from bot_bottle.dlp_detectors import (
ENTROPY_BLOCK_THRESHOLD,
PARTIAL_MATCH_MIN_LEN,
REDACT, REDACT,
_alnum_projection,
_encoded_variants, _encoded_variants,
_normalize_text, _normalize_text,
_shannon_entropy,
redact_tokens, redact_tokens,
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_naive_injection, scan_naive_injection,
scan_token_patterns, scan_token_patterns,
@@ -502,6 +507,191 @@ class TestStripCrlf(unittest.TestCase):
from bot_bottle.dlp_detectors import strip_crlf from bot_bottle.dlp_detectors import strip_crlf
self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello")) self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello"))
class TestAlnumProjection(unittest.TestCase):
def test_alphanumeric_unchanged(self):
self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ"))
def test_strips_hyphens(self):
self.assertEqual("mysecretvalue", _alnum_projection("my-secret-value"))
def test_strips_spaces(self):
self.assertEqual("mysecretvalue", _alnum_projection("my secret value"))
def test_strips_dots_and_underscores(self):
self.assertEqual("mysecretvalue", _alnum_projection("my.secret_value"))
def test_empty_string(self):
self.assertEqual("", _alnum_projection(""))
def test_all_special_chars(self):
self.assertEqual("", _alnum_projection("!@#$%^&*()"))
class TestFragmentationResistantMatching(unittest.TestCase):
"""scan_known_secrets catches separator-injection and partial-substring evasion."""
# Secrets long enough that their alnum projections are ≥ 8 chars.
SECRET = "supersecrettoken99"
ENV = {"EGRESS_TOKEN_0": SECRET}
def test_exact_match_still_works(self):
result = scan_known_secrets(f"key={self.SECRET}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_separator_injection_blocked(self):
# Hyphens inserted between chars of the secret.
fragmented = "-".join(self.SECRET)
result = scan_known_secrets(f"data={fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("separator injection", result.reason)
def test_space_separator_blocked(self):
fragmented = " ".join(self.SECRET)
result = scan_known_secrets(f"body: {fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("separator injection", result.reason)
def test_partial_substring_blocked(self):
# First PARTIAL_MATCH_MIN_LEN alnum chars of the secret, no separators.
partial = _alnum_projection(self.SECRET)[:PARTIAL_MATCH_MIN_LEN]
result = scan_known_secrets(f"x={partial}&y=other", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("partial match", result.reason)
def test_short_secret_skips_projection(self):
# Secrets shorter than _ALNUM_MIN_LEN in alnum projection are not
# fragmentation-checked (too many false positives).
short_env = {"EGRESS_TOKEN_0": "abc"}
# "a b c" has alnum projection "abc" (3 chars, < 8); should not block.
self.assertIsNone(scan_known_secrets("a b c", env=short_env))
def test_clean_text_not_blocked(self):
self.assertIsNone(scan_known_secrets("nothing to see here", env=self.ENV))
def test_sensitive_prefixes_param_extra_prefix(self):
env = {"MY_CRED_0": self.SECRET, "IGNORED": "other"}
result = scan_known_secrets(
f"key={self.SECRET}",
env=env,
sensitive_prefixes=("MY_CRED_",),
)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("MY_CRED_0", result.reason)
def test_sensitive_prefixes_default_only_egress_token(self):
# A value under a non-EGRESS_TOKEN_ key is ignored with default prefixes.
env = {"MY_CRED_0": self.SECRET}
self.assertIsNone(scan_known_secrets(f"key={self.SECRET}", env=env))
def test_canary_prefix_detected(self):
canary_value = "canary-fake-secret-value-xyz"
env = {"CANON_ALPHA_SECRET": canary_value}
result = scan_known_secrets(
f"x={canary_value}",
env=env,
sensitive_prefixes=("CANON_ALPHA_SECRET",),
)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("CANON_ALPHA_SECRET", result.reason)
class TestRedactTokensBroadenedPrefixes(unittest.TestCase):
SECRET = "my-provisioned-secret"
def test_default_redacts_egress_token(self):
env = {"EGRESS_TOKEN_0": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_extra_prefix_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(
f"val={self.SECRET}",
env=env,
sensitive_prefixes=("MY_SECRET_",),
)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_non_matching_prefix_not_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
# Default prefixes only include EGRESS_TOKEN_ → secret not redacted
self.assertIn(self.SECRET, out)
class TestShannonEntropy(unittest.TestCase):
def test_empty_string_zero(self):
self.assertEqual(0.0, _shannon_entropy(""))
def test_single_char_zero(self):
self.assertEqual(0.0, _shannon_entropy("aaaaaa"))
def test_two_equal_chars_one_bit(self):
self.assertAlmostEqual(1.0, _shannon_entropy("abababab"), places=10)
def test_high_entropy_random_like(self):
# Uniform 64-char string over 64 distinct symbols has entropy 6 bits.
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
text = alphabet # each char appears exactly once
self.assertAlmostEqual(6.0, _shannon_entropy(text), places=10)
class TestScanEntropy(unittest.TestCase):
def test_empty_returns_none(self):
self.assertIsNone(scan_entropy(""))
def test_low_entropy_returns_none(self):
# Highly repetitive text has low entropy.
self.assertIsNone(scan_entropy("a" * 200))
def test_high_entropy_warns(self):
# Build a 64-char string with entropy > ENTROPY_BLOCK_THRESHOLD.
# Use all 64 distinct printable chars to maximise entropy (~6 bits).
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, threshold=ENTROPY_BLOCK_THRESHOLD)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
self.assertIn("high-entropy", result.reason)
def test_never_blocks(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet)
# scan_entropy is warn-only; it must never return severity="block".
if result is not None:
self.assertNotEqual("block", result.severity)
def test_location_in_result(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, location="authorization header")
if result is not None:
self.assertIn("authorization header", result.location)
def test_structured_json_no_warn(self):
# Typical JSON has low entropy and should not be flagged.
json_body = '{"status": "ok", "message": "hello world", "count": 42}'
self.assertIsNone(scan_entropy(json_body))
def test_short_text_below_window(self):
# Text shorter than the window: checked as one chunk.
# Use a uniform string to ensure it won't be flagged.
self.assertIsNone(scan_entropy("abcde", threshold=ENTROPY_BLOCK_THRESHOLD))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+120
View File
@@ -1,15 +1,21 @@
"""Unit: Egress route lift + routes.yaml render + token """Unit: Egress route lift + routes.yaml render + token
resolution (PRD 0017, PRD 0053).""" resolution (PRD 0017, PRD 0053)."""
import tempfile
import unittest import unittest
from pathlib import Path
from bot_bottle.egress import ( from bot_bottle.egress import (
CODEX_HOST_CREDENTIAL_TOKEN_REF, CODEX_HOST_CREDENTIAL_TOKEN_REF,
Egress,
EgressPlan,
EgressRoute, EgressRoute,
egress_agent_env_entries,
egress_manifest_routes, egress_manifest_routes,
egress_render_routes, egress_render_routes,
egress_resolve_token_values, egress_resolve_token_values,
egress_routes_for_bottle, egress_routes_for_bottle,
egress_sidecar_env_entries,
egress_token_env_map, egress_token_env_map,
) )
from bot_bottle.log import Die from bot_bottle.log import Die
@@ -443,5 +449,119 @@ class TestResolveTokenValues(unittest.TestCase):
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out) self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
class TestCanaryGeneration(unittest.TestCase):
"""Egress.prepare() generates a unique canary token per session."""
def _bottle_obj(self):
return ManifestIndex.from_json_obj({
"bottles": {"dev": {"egress": {"routes": []}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
def _make_plan(self) -> EgressPlan:
# Use a concrete no-op subclass so we can call prepare() without
# a real backend.
class _TestEgress(Egress):
pass
e = _TestEgress()
with tempfile.TemporaryDirectory() as td:
return e.prepare(self._bottle_obj(), "test-slug", Path(td))
def test_canary_is_non_empty(self):
plan = self._make_plan()
self.assertIsInstance(plan.canary, str)
self.assertGreater(len(plan.canary), 0)
self.assertRegex(plan.canary_env, r"^[A-Z]+_[A-Z]+_SECRET$")
def test_canary_is_unique_per_session(self):
with tempfile.TemporaryDirectory() as td:
bottle = self._bottle_obj()
class _TestEgress(Egress):
pass
e = _TestEgress()
plan_a = e.prepare(bottle, "slug-a", Path(td))
plan_b = e.prepare(bottle, "slug-b", Path(td))
self.assertNotEqual(plan_a.canary, plan_b.canary)
def test_canary_detected_by_scan_known_secrets(self):
from bot_bottle.dlp_detectors import scan_known_secrets
plan = self._make_plan()
env = {plan.canary_env: plan.canary}
result = scan_known_secrets(
f"exfil={plan.canary}",
env=env,
sensitive_prefixes=(plan.canary_env,),
)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn(plan.canary_env, result.reason)
def test_egress_plan_canary_field_default_empty(self):
# Verify EgressPlan can be constructed with an empty canary (backward compat).
from pathlib import Path
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
)
self.assertEqual("", plan.canary)
self.assertEqual("", plan.canary_env)
class TestEgressEnvEntries(unittest.TestCase):
def test_sidecar_entries_include_route_tokens_and_canary_scan_prefix(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(EgressRoute(host="api.example"),),
token_env_map={"EGRESS_TOKEN_1": "T1", "EGRESS_TOKEN_0": "T0"},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
(
"EGRESS_TOKEN_0",
"EGRESS_TOKEN_1",
"CANON_ALPHA_SECRET=fake-canary-value",
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
),
egress_sidecar_env_entries(plan),
)
def test_agent_entries_include_only_canary_bait(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
("CANON_ALPHA_SECRET=fake-canary-value",),
egress_agent_env_entries(plan),
)
def test_canary_entries_omitted_when_name_missing(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
)
self.assertEqual((), egress_sidecar_env_entries(plan))
self.assertEqual((), egress_agent_env_entries(plan))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+103
View File
@@ -1273,6 +1273,109 @@ class TestBuildTokenAllowPayload(unittest.TestCase):
result = ScanResult(severity="block", reason="r", matched="x") result = ScanResult(severity="block", reason="r", matched="x")
payload = build_token_allow_payload("h", "GET", "/", result) payload = build_token_allow_payload("h", "GET", "/", result)
self.assertNotIn("context:", payload) self.assertNotIn("context:", payload)
class TestScanOutboundEnhanced(unittest.TestCase):
"""scan_outbound changes: binary decode, entropy detector,
broadened known-value prefixes, fragmentation resistance."""
_ROUTE = Route(host="api.example.com")
_ROUTE_ENTROPY = Route(
host="api.example.com",
outbound_detectors=("entropy",),
)
def test_binary_body_latin1_decode_finds_ascii_secret(self):
# Body contains valid ASCII secret surrounded by non-UTF-8 bytes.
secret = "supersecrettoken99"
env = {"EGRESS_TOKEN_0": secret}
# Wrap the secret in bytes that are invalid UTF-8.
body = b"\x80\x81" + secret.encode("ascii") + b"\xff"
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_binary_body_valid_utf8_decoded_correctly(self):
env = {"EGRESS_TOKEN_0": "mysecret"}
# Valid UTF-8 body — should be decoded as UTF-8, not latin-1.
body = "clean body with mysecret".encode("utf-8")
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
def test_entropy_detector_off_by_default(self):
import string
# High-entropy content should NOT warn if the route has no entropy detector.
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE, alphabet, {})
self.assertIsNone(result)
def test_entropy_detector_warns_when_enabled(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE_ENTROPY, alphabet, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
def test_bot_bottle_sensitive_prefixes_env_var(self):
# When the sidecar env contains BOT_BOTTLE_SENSITIVE_PREFIXES,
# scan_outbound should scan those additional prefixes.
secret = "extra-sensitive-value-abc"
env = {
"MY_CRED_KEY": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "MY_CRED_",
}
result = scan_outbound(self._ROUTE, f"x={secret}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_bot_bottle_sensitive_prefixes_multiple(self):
secret = "my-api-key-value-xyz"
env = {
"ANTHROPIC_API_0": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "ANTHROPIC_API_,OTHER_",
}
result = scan_outbound(self._ROUTE, f"auth={secret}", env)
self.assertIsNotNone(result)
def test_canary_detected_via_random_secret_env_name(self):
# The fake secret uses a randomized env name that the sidecar marks
# as sensitive through BOT_BOTTLE_SENSITIVE_PREFIXES.
canary = "canaryvalue12345abcdef"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
result = scan_outbound(self._ROUTE, f"data={canary}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("CANON_ALPHA_SECRET", result.reason)
def test_fragmented_canary_blocked(self):
# Canary with separators injected is still caught.
canary = "supersecretcanary99"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
fragmented = "-".join(canary)
result = scan_outbound(self._ROUTE, f"x={fragmented}", env)
self.assertIsNotNone(result)
class TestOutboundDetectorNames(unittest.TestCase):
def test_entropy_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("entropy", OUTBOUND_DETECTOR_NAMES)
def test_known_secrets_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("known_secrets", OUTBOUND_DETECTOR_NAMES)
def test_token_patterns_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("token_patterns", OUTBOUND_DETECTOR_NAMES)
if __name__ == "__main__": if __name__ == "__main__":
+24 -1
View File
@@ -30,6 +30,7 @@ def _plan(
supervise: bool = False, supervise: bool = False,
agent_git_gate_url: str = "", agent_git_gate_url: str = "",
agent_supervise_url: str = "", agent_supervise_url: str = "",
canary: bool = False,
) -> MacosContainerBottlePlan: ) -> MacosContainerBottlePlan:
routes_path = stage_dir / "routes.yaml" routes_path = stage_dir / "routes.yaml"
routes_path.write_text("routes: []\n", encoding="utf-8") routes_path.write_text("routes: []\n", encoding="utf-8")
@@ -42,6 +43,8 @@ def _plan(
routes_path=routes_path, routes_path=routes_path,
routes=("route",), routes=("route",),
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"}, token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
) )
if git: if git:
key_path = stage_dir / "origin-key" key_path = stage_dir / "origin-key"
@@ -138,6 +141,26 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
argv, argv,
) )
def test_sidecar_argv_registers_canary_env_as_sensitive(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._sidecar_run_argv(
plan,
"bot-bottle-sidecars-dev-abc",
"bot-bottle-net-dev-abc",
"bot-bottle-egress-dev-abc",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
self.assertIn("BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", argv)
def test_agent_argv_receives_canary_env(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._agent_run_argv(
plan,
"bot-bottle-net-dev-abc",
"192.0.2.10",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
def test_agent_env_points_proxy_at_sidecar_ip(self): def test_agent_env_points_proxy_at_sidecar_ip(self):
plan = _plan( plan = _plan(
stage_dir=self.stage_dir, stage_dir=self.stage_dir,
@@ -271,7 +294,7 @@ def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan:
manifest=_MANIFEST, manifest=_MANIFEST,
stage_dir=stage_dir, stage_dir=stage_dir,
git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())), git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())),
egress_plan=cast(EgressPlan, SimpleNamespace()), egress_plan=cast(EgressPlan, SimpleNamespace(canary="")),
supervise_plan=None, supervise_plan=None,
agent_provision=AgentProvisionPlan( agent_provision=AgentProvisionPlan(
template="claude", template="claude",
+29 -3
View File
@@ -26,9 +26,7 @@ from bot_bottle.backend.smolmachines.bottle import SmolmachinesBottle
from bot_bottle.backend.smolmachines.bottle_plan import ( from bot_bottle.backend.smolmachines.bottle_plan import (
SmolmachinesBottlePlan, SmolmachinesBottlePlan,
) )
# from bot_bottle.backend.smolmachines.provision import ( from bot_bottle.backend.smolmachines import launch as _launch
# workspace as _workspace,
# )
from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec
from bot_bottle.backend.util import AGENT_CA_PATH from bot_bottle.backend.util import AGENT_CA_PATH
from bot_bottle.egress import EgressPlan, EgressRoute from bot_bottle.egress import EgressPlan, EgressRoute
@@ -86,6 +84,7 @@ def _plan(
stage_dir: Path | None = None, stage_dir: Path | None = None,
egress_routes: tuple[EgressRoute, ...] = (), egress_routes: tuple[EgressRoute, ...] = (),
egress_ca_path: Path = Path(), egress_ca_path: Path = Path(),
canary: bool = False,
supervise: bool = False, supervise: bool = False,
bundle_ip: str = "192.168.50.2", bundle_ip: str = "192.168.50.2",
agent_git_gate_host: str = "127.0.0.1:55555", agent_git_gate_host: str = "127.0.0.1:55555",
@@ -156,6 +155,8 @@ def _plan(
routes=egress_routes, routes=egress_routes,
token_env_map={}, token_env_map={},
mitmproxy_ca_cert_only_host_path=egress_ca_path, mitmproxy_ca_cert_only_host_path=egress_ca_path,
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
), ),
supervise_plan=supervise_plan, supervise_plan=supervise_plan,
agent_git_gate_host=agent_git_gate_host, agent_git_gate_host=agent_git_gate_host,
@@ -411,6 +412,31 @@ class TestBundleLaunchSpec(unittest.TestCase):
self.assertIn(9420, spec.ports_to_publish) self.assertIn(9420, spec.ports_to_publish)
self.assertNotIn(9418, spec.ports_to_publish) self.assertNotIn(9418, spec.ports_to_publish)
def test_canary_env_registered_as_sensitive_in_bundle(self):
plan = _plan(canary=True)
spec = _bundle_launch_spec(plan, "net", "127.0.0.16")
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", spec.environment)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
spec.environment,
)
def test_canary_env_visible_to_smolvm_guest(self):
plan = _plan(canary=True)
with patch.object(
_launch._bundle,
"bundle_host_port",
return_value="65000",
):
stamped = _launch._discover_urls(plan, "127.0.0.16")
self.assertEqual(
"fake-canary-value",
stamped.guest_env["CANON_ALPHA_SECRET"],
)
class TestProvisionGitUser(unittest.TestCase): class TestProvisionGitUser(unittest.TestCase):
"""`provision_git` runs `git config --global` inside the """`provision_git` runs `git config --global` inside the