Compare commits

..

12 Commits

Author SHA1 Message Date
didericis-codex f5fdc0ea72 fix: satisfy pyright for log redaction tests
lint / lint (push) Successful in 2m42s
test / unit (pull_request) Successful in 54s
test / integration (pull_request) Successful in 26s
2026-06-25 00:15:45 -04:00
didericis-claude ca1f14b855 fix(egress): strip injected Authorization and redact bodies in LOG_FULL path
_log_request and _log_response wrote headers and bodies to stderr verbatim.
_log_request also included the sidecar-injected upstream Authorization value,
exposing live bearer tokens on every allowed request under LOG_FULL.

Apply redact_tokens to all header values and bodies in both log functions;
exclude the authorization header from _log_request entirely since its value
is always a live sidecar-injected credential by the time _log_request runs.

Closes #257
2026-06-25 00:15:45 -04:00
didericis-codex d9a9eef276 docs: remove prd-new code citations
test / integration (pull_request) Successful in 46s
test / unit (pull_request) Successful in 1m4s
lint / lint (push) Successful in 2m36s
prd-number / assign-numbers (push) Successful in 1m24s
test / integration (push) Successful in 34s
test / unit (push) Successful in 52s
Update Quality Badges / update-badges (push) Successful in 2m11s
2026-06-25 03:57:41 +00:00
didericis-codex 5204b98777 refactor(egress): centralize launch env entries
lint / lint (push) Successful in 2m12s
test / unit (pull_request) Successful in 43s
test / integration (pull_request) Successful in 25s
2026-06-25 03:35:24 +00:00
didericis-codex 14ae89580a fix(egress): wire canary env for smolmachines
lint / lint (push) Successful in 2m16s
test / unit (pull_request) Successful in 42s
test / integration (pull_request) Successful in 23s
2026-06-25 03:31:51 +00:00
didericis-codex 4808ef557a fix(egress): randomize canary secret env name
lint / lint (push) Successful in 2m15s
test / unit (pull_request) Successful in 45s
test / integration (pull_request) Successful in 26s
2026-06-25 03:25:37 +00:00
didericis-codex 0a7e166b35 fix(tests): remove unused dlp entropy import
lint / lint (push) Successful in 2m8s
test / unit (pull_request) Successful in 40s
test / integration (pull_request) Successful in 23s
2026-06-24 23:09:11 -04:00
didericis-claude a920203730 fix(dlp): skip projection passes when exact variant is safe-listed
When a supervisor-approved safe-token exactly matched an env secret
(Pass 1), Passes 2 & 3 (alnum projection) still ran and re-blocked on
the same value.  Track whether any variant was found-and-approved and
skip the projection passes for that secret in that case.
2026-06-24 23:09:11 -04:00
didericis-claude e02fab15d0 docs(prd): flip prd-new-strengthen-outbound-exfil-detection Draft → Active
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-24 23:09:11 -04:00
didericis-claude 11cf12188d feat(egress): inject per-session canary token into sidecar and agent environments
EgressPlan gains a `canary: str` field (default "") populated in Egress.prepare()
using secrets.token_urlsafe(32).  Each launched bottle:

  - sidecar receives EGRESS_TOKEN_CANARY=<value> (literal env entry, scanned by
    existing known-secrets detector without any detector code changes)
  - agent receives BOT_BOTTLE_CANARY=<value> (visible fake secret that signals
    exfiltration with zero false positives if it appears in outbound traffic)

Docker compose and macos-container backends updated; smolmachines shares docker
compose and so picks this up automatically.  Unit tests cover canary uniqueness,
detection via scan_known_secrets, and EgressPlan backward-compat default.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-24 23:09:11 -04:00
didericis-claude 701df6cb2f feat(dlp): fragmentation resistance, entropy detector, broadened known-value scan
- _alnum_projection(): strip non-alphanumeric chars for separator-injection detection
- scan_known_secrets() gains two extra passes per secret after exact-variant matching:
  alnum-projection exact match (catches hyphens/spaces between secret chars) and a
  sliding-window partial-match scan (catches chunked substrings ≥ PARTIAL_MATCH_MIN_LEN)
- scan_known_secrets() accepts sensitive_prefixes param (default ("EGRESS_TOKEN_",))
  so redact_tokens and call-sites can extend the scanned env-var prefix set
- scan_entropy() warn-only detector flagging windows with Shannon entropy ≥ 5.5 bits/char
- "entropy" added to OUTBOUND_DETECTOR_NAMES; scan_outbound opts it in only when
  explicitly listed in dlp.outbound_detectors (never part of the default "all" set)
- scan_outbound reads BOT_BOTTLE_SENSITIVE_PREFIXES from environ to extend
  scan_known_secrets beyond EGRESS_TOKEN_* without schema changes
- Binary bodies decoded via latin-1 fallback (bijective byte↔codepoint) instead
  of utf-8 errors=replace, preserving ASCII secret strings in binary payloads

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-24 23:09:11 -04:00
didericis-claude ea6bc5a170 docs: draft PRD prd-new for strengthen-outbound-exfil-detection
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-24 23:09:11 -04:00
13 changed files with 944 additions and 25 deletions
+4 -2
View File
@@ -28,6 +28,8 @@ from typing import Any
from ...egress import ( from ...egress import (
EGRESS_HOSTNAME, EGRESS_HOSTNAME,
EGRESS_ROUTES_IN_CONTAINER, EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_sidecar_env_entries,
) )
from ...git_gate import GIT_GATE_HOSTNAME from ...git_gate import GIT_GATE_HOSTNAME
from ...log import die, warn from ...log import die, warn
@@ -135,8 +137,7 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER)) volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER))
if ep.routes: if ep.routes:
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent))) volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
for token_env in sorted(ep.token_env_map.keys()): env.extend(egress_sidecar_env_entries(ep))
env.append(token_env)
# --- git-gate ----------------------------------------------------- # --- git-gate -----------------------------------------------------
gp = plan.git_gate_plan gp = plan.git_gate_plan
@@ -220,6 +221,7 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
# never lands on argv or in the compose file. # never lands on argv or in the compose file.
for name in sorted(plan.forwarded_env.keys()): for name in sorted(plan.forwarded_env.keys()):
env.append(name) env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
service: dict[str, Any] = { service: dict[str, Any] = {
"image": plan.image, "image": plan.image,
+8 -4
View File
@@ -22,7 +22,12 @@ from ...bottle_state import (
git_gate_state_dir, git_gate_state_dir,
read_committed_image, read_committed_image,
) )
from ...egress import EGRESS_ROUTES_IN_CONTAINER, egress_resolve_token_values from ...egress import (
EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values,
egress_sidecar_env_entries,
)
from ...git_gate import revoke_git_gate_provisioned_keys from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import die, info, warn from ...log import die, info, warn
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
@@ -350,9 +355,7 @@ def _sidecar_daemons(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]: def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
env: list[str] = [] env: list[str] = list(egress_sidecar_env_entries(plan.egress_plan))
if plan.egress_plan.routes:
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
if plan.git_gate_plan.upstreams: if plan.git_gate_plan.upstreams:
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}") env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
if plan.supervise_plan is not None: if plan.supervise_plan is not None:
@@ -420,6 +423,7 @@ def _agent_env_entries(
env.append(f"{name}={value}") env.append(f"{name}={value}")
for name in sorted(plan.forwarded_env.keys()): for name in sorted(plan.forwarded_env.keys()):
env.append(name) env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
return tuple(env) return tuple(env)
+6 -5
View File
@@ -23,7 +23,9 @@ from typing import Callable, Generator
from ...egress import ( from ...egress import (
EGRESS_ROUTES_IN_CONTAINER, EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values, egress_resolve_token_values,
egress_sidecar_env_entries,
) )
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
from ...util import expand_tilde from ...util import expand_tilde
@@ -228,6 +230,9 @@ def _discover_urls(
guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}" guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}"
if agent_supervise_url: if agent_supervise_url:
guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url
for entry in egress_agent_env_entries(plan.egress_plan):
name, value = entry.split("=", 1)
guest_env[name] = value
return dataclasses.replace( return dataclasses.replace(
plan, plan,
@@ -316,11 +321,7 @@ def _bundle_launch_spec(
volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True)) volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True))
if ep.routes: if ep.routes:
volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True)) volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True))
# Bare-name entries for upstream-token slots. Their values env.extend(egress_sidecar_env_entries(ep))
# come from the docker-run subprocess env (inherited from
# the operator's shell), never landing on argv.
for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
# --- git-gate --------------------------------------------- # --- git-gate ---------------------------------------------
gp = plan.git_gate_plan gp = plan.git_gate_plan
+159 -3
View File
@@ -15,6 +15,8 @@ import gzip
import re import re
import typing import typing
import unicodedata import unicodedata
from math import log2
from collections import Counter
from urllib.parse import quote as url_quote from urllib.parse import quote as url_quote
try: try:
@@ -107,20 +109,21 @@ def redact_tokens(
text: str, text: str,
*, *,
env: typing.Mapping[str, str] | None = None, env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> str: ) -> str:
"""Replace token pattern matches and (if env given) provisioned secrets with REDACT.""" """Replace token pattern matches and (if env given) provisioned secrets with REDACT."""
for _, pattern in TOKEN_PATTERNS: for _, pattern in TOKEN_PATTERNS:
text = pattern.sub(REDACT, text) text = pattern.sub(REDACT, text)
if env is not None: if env is not None:
for key, value in env.items(): for key, value in env.items():
if key.startswith("EGRESS_TOKEN_") and value: if any(key.startswith(p) for p in sensitive_prefixes) and value:
for variant in _encoded_variants(value): for variant in _encoded_variants(value):
text = text.replace(variant, REDACT) text = text.replace(variant, REDACT)
return text return text
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Known secrets detector (Phase 1b) # Known secrets detector
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _encoded_variants(secret: str) -> list[str]: def _encoded_variants(secret: str) -> list[str]:
@@ -161,18 +164,65 @@ def _encoded_variants(secret: str) -> list[str]:
return variants return variants
# ---------------------------------------------------------------------------
# Fragmentation-resistant helpers
# ---------------------------------------------------------------------------
# Minimum length of alnum projection for projection-based checks to run.
# Short secrets produce too many false positives in projection space.
_ALNUM_MIN_LEN = 8
# Minimum window length for the partial-substring sliding scan.
PARTIAL_MATCH_MIN_LEN = 12
def _alnum_projection(text: str) -> str:
"""Return text with every non-alphanumeric character stripped.
Used for fragmentation-resistant matching: separator-injected secrets
(spaces, hyphens, dots inserted between characters) are identical to
their originals in alnum projection space.
"""
return "".join(c for c in text if c.isalnum())
def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None:
"""Return the position in text_alnum where any min_len-char window of
secret_alnum first appears, or None.
Slides a window of width min_len across secret_alnum and searches for
each window in text_alnum. The first hit position is returned.
"""
if len(secret_alnum) < min_len or len(text_alnum) < min_len:
return None
for i in range(len(secret_alnum) - min_len + 1):
window = secret_alnum[i:i + min_len]
pos = text_alnum.find(window)
if pos >= 0:
return pos
return None
def scan_known_secrets( def scan_known_secrets(
text: str, text: str,
*, *,
location: str = "body", location: str = "body",
env: typing.Mapping[str, str] | None = None, env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
safe_tokens: typing.AbstractSet[str] | None = None, safe_tokens: typing.AbstractSet[str] | None = None,
) -> ScanResult | None: ) -> ScanResult | None:
if env is None: if env is None:
return None return None
# Pre-compute alnum projection of the scan text once; reused per secret.
text_alnum: str | None = None
for key, value in env.items(): for key, value in env.items():
if not key.startswith("EGRESS_TOKEN_") or not value: if not any(key.startswith(p) for p in sensitive_prefixes) or not value:
continue continue
# Pass 1: exact match across encoded variants (original behaviour).
approved_exact = False
for variant in _encoded_variants(value): for variant in _encoded_variants(value):
pos = text.find(variant) pos = text.find(variant)
if pos >= 0: if pos >= 0:
@@ -180,6 +230,7 @@ def scan_known_secrets(
# (PRD 0062); a different encoding of the same secret is a # (PRD 0062); a different encoding of the same secret is a
# fresh block. # fresh block.
if safe_tokens is not None and variant in safe_tokens: if safe_tokens is not None and variant in safe_tokens:
approved_exact = True
continue continue
return ScanResult( return ScanResult(
severity="block", severity="block",
@@ -188,6 +239,104 @@ def scan_known_secrets(
context=_snippet(text, pos, pos + len(variant)), context=_snippet(text, pos, pos + len(variant)),
matched=variant, matched=variant,
) )
if approved_exact:
# Exact match was found and approved; projection passes would
# fire on the same value, so skip them for this secret.
continue
# Pass 2 & 3: fragmentation-resistant projection checks.
secret_alnum = _alnum_projection(value)
if len(secret_alnum) < _ALNUM_MIN_LEN:
continue
if text_alnum is None:
text_alnum = _alnum_projection(text)
# Pass 2: full alnum-projection exact match (catches separator injection).
pos2 = text_alnum.find(secret_alnum)
if pos2 >= 0:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(fragmented match — separator injection)"
),
location=location,
context=_snippet(text_alnum, pos2, pos2 + len(secret_alnum)),
)
# Pass 3: sliding-window partial match (catches chunked-substring leaks).
pos3 = _find_partial_window(secret_alnum, text_alnum, PARTIAL_MATCH_MIN_LEN)
if pos3 is not None:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(partial match — at least {PARTIAL_MATCH_MIN_LEN} consecutive "
f"alphanumeric chars)"
),
location=location,
context=_snippet(text_alnum, pos3, pos3 + PARTIAL_MATCH_MIN_LEN),
)
return None
# ---------------------------------------------------------------------------
# Entropy detector (warn-only)
# ---------------------------------------------------------------------------
# Sliding window size and step for the entropy scan.
ENTROPY_WINDOW = 64
ENTROPY_STEP = 32
# Bits-per-character threshold. Random ASCII printable ≈ 6.6 bits; random
# lowercase hex ≈ 4 bits; random base64url ≈ 6 bits. 5.5 sits above
# typical structured data (JSON, URLs) while staying below truly random
# content.
ENTROPY_BLOCK_THRESHOLD = 5.5
def _shannon_entropy(text: str) -> float:
if not text:
return 0.0
counts = Counter(text)
n = len(text)
return -sum((c / n) * log2(c / n) for c in counts.values())
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW,
threshold: float = ENTROPY_BLOCK_THRESHOLD,
) -> ScanResult | None:
"""Warn-only detector: flag windows of `window` chars with Shannon entropy
above `threshold` bits per character.
Never blocks; always returns severity='warn'. Disabled by default —
routes must opt in via dlp.outbound_detectors=['entropy'].
"""
if not text:
return None
step = max(1, window // 2)
end = len(text)
# Scan overlapping windows; also check the final tail if shorter than window.
positions = list(range(0, end - window + 1, step))
if end < window:
positions = [0]
elif (end - window) % step != 0:
positions.append(end - window)
for i in positions:
chunk = text[i:i + window]
if _shannon_entropy(chunk) >= threshold:
return ScanResult(
severity="warn",
reason=f"high-entropy content in {location} (possible encrypted exfil)",
location=location,
context=_snippet(text, i, i + len(chunk)),
)
return None return None
@@ -306,11 +455,18 @@ def scan_crlf_injection(text: str) -> ScanResult | None:
__all__ = [ __all__ = [
"ENTROPY_BLOCK_THRESHOLD",
"ENTROPY_WINDOW",
"ENTROPY_STEP",
"PARTIAL_MATCH_MIN_LEN",
"REDACT", "REDACT",
"SNIPPET_CONTEXT", "SNIPPET_CONTEXT",
"TOKEN_PATTERNS", "TOKEN_PATTERNS",
"_alnum_projection",
"_shannon_entropy",
"redact_tokens", "redact_tokens",
"scan_crlf_injection", "scan_crlf_injection",
"scan_entropy",
"scan_known_secrets", "scan_known_secrets",
"scan_naive_injection", "scan_naive_injection",
"scan_token_patterns", "scan_token_patterns",
+55
View File
@@ -10,6 +10,7 @@ specific and lives on concrete subclasses (see
from __future__ import annotations from __future__ import annotations
import dataclasses import dataclasses
import secrets
from abc import ABC from abc import ABC
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@@ -34,6 +35,50 @@ EGRESS_HOSTNAME = "egress"
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml" EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name
_CANARY_ENV_WORDS = (
"ACCORD",
"ANCHOR",
"ATLAS",
"CANON",
"CIPHER",
"EMBER",
"FALCON",
"HARBOR",
"LANTERN",
"MARBLE",
"NOVA",
"ORBIT",
"PIVOT",
"RADIUS",
"SUMMIT",
"VECTOR",
)
def _random_canary_env() -> str:
first = secrets.choice(_CANARY_ENV_WORDS)
remaining = tuple(word for word in _CANARY_ENV_WORDS if word != first)
second = secrets.choice(remaining)
return f"{first}_{second}_SECRET"
def egress_sidecar_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return sidecar env entries needed by egress across all backends."""
env: list[str] = []
if plan.routes:
env.extend(sorted(plan.token_env_map.keys()))
if plan.canary and plan.canary_env:
env.append(f"{plan.canary_env}={plan.canary}")
env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.canary_env}")
return tuple(env)
def egress_agent_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return agent-visible egress env entries shared by all backends."""
if plan.canary and plan.canary_env:
return (f"{plan.canary_env}={plan.canary}",)
return ()
@dataclass(frozen=True) @dataclass(frozen=True)
class EgressRoute(Route): class EgressRoute(Route):
@@ -65,6 +110,8 @@ class EgressPlan:
mitmproxy_ca_host_path: Path = Path() mitmproxy_ca_host_path: Path = Path()
mitmproxy_ca_cert_only_host_path: Path = Path() mitmproxy_ca_cert_only_host_path: Path = Path()
log: int = 0 log: int = 0
canary: str = ""
canary_env: str = ""
def egress_manifest_routes( def egress_manifest_routes(
@@ -324,12 +371,18 @@ class Egress(ABC):
routes_path = stage_dir / EGRESS_ROUTES_FILENAME routes_path = stage_dir / EGRESS_ROUTES_FILENAME
routes_path.write_text(egress_render_routes(routes, log=log)) routes_path.write_text(egress_render_routes(routes, log=log))
routes_path.chmod(0o600) routes_path.chmod(0o600)
# Generate a per-session fake secret under a plausible random env name.
# The sidecar marks that exact env name as sensitive for known-secret
# scanning; the agent receives the same name/value as exfil bait.
canary = secrets.token_urlsafe(32)
return EgressPlan( return EgressPlan(
slug=slug, slug=slug,
routes_path=routes_path, routes_path=routes_path,
routes=routes, routes=routes,
token_env_map=egress_token_env_map(routes), token_env_map=egress_token_env_map(routes),
log=log, log=log,
canary=canary,
canary_env=_random_canary_env(),
) )
__all__ = [ __all__ = [
@@ -344,5 +397,7 @@ __all__ = [
"egress_render_routes", "egress_render_routes",
"egress_resolve_token_values", "egress_resolve_token_values",
"egress_routes_for_bottle", "egress_routes_for_bottle",
"egress_agent_env_entries",
"egress_sidecar_env_entries",
"egress_token_env_map", "egress_token_env_map",
] ]
+32 -3
View File
@@ -34,7 +34,7 @@ VALID_METHODS = frozenset({
"CONNECT", "CONNECT",
}) })
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"}) OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"})
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"}) INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
# Per-route policy for what the proxy does when an outbound DLP detector # Per-route policy for what the proxy does when an outbound DLP detector
@@ -729,17 +729,28 @@ def scan_outbound(
try: try:
from dlp_detectors import ( # type: ignore[import-not-found] from dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_token_patterns, scan_token_patterns,
) )
except ImportError: # pragma: no cover - host-side path except ImportError: # pragma: no cover - host-side path
from .dlp_detectors import ( # type: ignore[import-not-found] from .dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_token_patterns, scan_token_patterns,
) )
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace") # Binary bodies: latin-1 is a bijective byte↔codepoint mapping that
# preserves every byte value, so ASCII-range secret strings remain
# findable by str.find / regex. Prefer strict UTF-8 for valid text bodies.
if isinstance(body, bytes):
try:
text = body.decode("utf-8")
except UnicodeDecodeError:
text = body.decode("latin-1")
else:
text = body
# CRLF injection is only an attack in the request line + headers, never the # CRLF injection is only an attack in the request line + headers, never the
# body: an HTTP body is delimited by Content-Length, so CRLF bytes there # body: an HTTP body is delimited by Content-Length, so CRLF bytes there
@@ -758,12 +769,30 @@ def scan_outbound(
return result return result
if _detector_enabled(route.outbound_detectors, "known_secrets"): if _detector_enabled(route.outbound_detectors, "known_secrets"):
# BOT_BOTTLE_SENSITIVE_PREFIXES lets operators add extra env prefixes
# beyond EGRESS_TOKEN_* without changing the manifest schema.
extra_raw = environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "")
extra = tuple(p for p in extra_raw.split(",") if p)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
result = scan_known_secrets( result = scan_known_secrets(
text, location="body", env=environ, safe_tokens=safe_tokens, text, location="body", env=environ,
sensitive_prefixes=sensitive_prefixes, safe_tokens=safe_tokens,
) )
if result is not None: if result is not None:
return result return result
# Entropy scanning requires explicit opt-in: it is NOT part of the
# default "all detectors" set because it produces false positives on
# legitimate base64 / binary payloads. Routes must list "entropy" in
# dlp.outbound_detectors to enable it.
if (
route.outbound_detectors is not None
and "entropy" in route.outbound_detectors
):
result = scan_entropy(text, location="body")
if result is not None:
return result
return None return None
@@ -0,0 +1,189 @@
# PRD prd-new: Strengthen outbound exfiltration detection
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-25
- **Issue:** #259
## Summary
The current DLP system has two detection tiers: weak (vendor token regexes)
and strong (exact matching of provisioned `EGRESS_TOKEN_*` values in multiple
encodings). This PRD adds four complementary hardening layers in ranked order
of impact:
1. **Canary tokens** — inject a per-session fake secret into the agent's
environment; any outbound appearance is a zero-false-positive exfil signal.
2. **Broadened known-value scanning** — extend the strong tier beyond
`EGRESS_TOKEN_*` via a `BOT_BOTTLE_SENSITIVE_PREFIXES` sidecar env var.
3. **Fragmentation-resistant matching** — catch separator injection and
long-substring leaks via alnum projection and sliding-window substring
search.
4. **Entropy scoring** — warn-only flag on high-entropy blobs as a
complementary signal for encrypted exfil.
## Problem
After PRD 0056 the outbound scan covers all HTTP request surfaces (host, path,
query, headers, body). But the content-based strong tier only matches
`EGRESS_TOKEN_*` values and their encodings. Three gaps remain:
| Gap | How an agent exploits it |
|-----|--------------------------|
| No canary | There is no "can't be false-positive" signal |
| Narrow prefix filter | Sensitive values provisioned under other env prefixes (MCP keys, API keys injected via the agent provider) are invisible to `scan_known_secrets` |
| Fragmentation blindness | Inserting `-`, space, or `\n` between secret characters turns an exact-match into a miss |
## Goals / Success Criteria
1. Each launched bottle has a unique canary token in the agent's environment
under a randomized `WORD_WORD_SECRET` env var name. The egress sidecar gets
the same env var and registers that exact name through
`BOT_BOTTLE_SENSITIVE_PREFIXES`. Any outbound appearance of the canary
blocks the request as a known-secret match.
2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default:
`("EGRESS_TOKEN_",)`). `scan_outbound` reads
`BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes
in, so operators can mark additional env vars as scanned values without
changing the manifest schema.
3. For every secret that passes exact-match, a secondary alnum-projection pass
checks for the secret with all non-alphanumeric characters stripped. This
catches separator-injection evasion (`MY-SECRET` → body contains
`MY SECRET`).
4. A sliding-window partial-match pass checks for long-enough contiguous
substrings of the secret's alnum projection in the text's alnum projection.
Any match ≥ `PARTIAL_MATCH_MIN_LEN` (12 chars) blocks with reason
`"partial match"`.
5. A new `scan_entropy` detector flags outbound text windows with Shannon
entropy ≥ `ENTROPY_BLOCK_THRESHOLD` (5.5 bits/char) at **warn** severity
only. It is registered under the new detector name `"entropy"` in
`OUTBOUND_DETECTOR_NAMES` and disabled by default (routes must opt in).
6. Binary request bodies are decoded via `latin-1` instead of
`utf-8 errors="replace"`, preserving every byte value and allowing
ASCII-range secrets to be found within binary payloads.
7. All new behaviour is unit-tested; existing tests pass unchanged.
## Non-goals
- Rolling per-host buffer for split-across-requests detection (state in the
stateless addon is complex; deferred).
- Additional vendor regexes.
- ML / embedding-based detection.
- Entropy-based hard blocks (warn only per the issue).
## Design
### Canary token flow
```
Egress.prepare()
canary = secrets.token_urlsafe(32)
canary_env = <random WORD_WORD_SECRET>
EgressPlan(canary=canary, canary_env=canary_env, ...)
Docker compose render:
sidecar env: <canary_env>=<canary>
sidecar env: BOT_BOTTLE_SENSITIVE_PREFIXES=<canary_env>
agent env: <canary_env>=<canary> ← visible to agent as a "secret"
macos-container launch: same literals added to sidecar + agent env entries
```
The sidecar uses `BOT_BOTTLE_SENSITIVE_PREFIXES` to make the random canary env
name part of the existing `scan_known_secrets` detector without adding a
manifest schema field.
### Broadened known-value scanning
`scan_known_secrets` gains a `sensitive_prefixes` parameter:
```python
def scan_known_secrets(
text: str,
*,
location: str = "body",
env: Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> ScanResult | None:
```
`scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` (comma-separated list
of additional prefixes) from `environ` and appends them:
```python
extra = tuple(
p for p in environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "").split(",") if p
)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
```
`redact_tokens` receives the same treatment for consistent redaction.
### Fragmentation-resistant matching
A new helper `_alnum_projection(text)` strips all non-alphanumeric characters.
`scan_known_secrets` runs two passes per secret:
1. **Exact pass** — existing encoded-variant loop (unchanged).
2. **Alnum-projection pass** — if the secret's alnum projection has ≥ 8 chars,
check if it appears in the text's alnum projection. Match → block with
`"fragmented match (separator injection)"` reason.
3. **Partial-substring pass** — if the secret's alnum projection has ≥
`PARTIAL_MATCH_MIN_LEN` chars (12), slide a window of that length across the
secret's projection and look for each window in the text's alnum projection.
First match → block with `"partial match"` reason.
All three passes run only for the `"known_secrets"` detector; the token-pattern
and entropy detectors are unchanged.
### Entropy scoring
New public function:
```python
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW, # 64
threshold: float = ENTROPY_BLOCK_THRESHOLD, # 5.5
) -> ScanResult | None:
```
Slides a window of `window` characters across `text` in steps of `window // 2`.
If any window's Shannon entropy exceeds `threshold`, returns a **warn**-severity
`ScanResult`. Never blocks.
`OUTBOUND_DETECTOR_NAMES` gains `"entropy"`. Routes opt in via their `dlp`
block; entropy scanning is **off by default** to avoid false-positive noise on
legitimate binary payloads.
### Binary body handling
In `scan_outbound`, the bytes → str decoding changes from:
```python
body.decode("utf-8", errors="replace")
```
to:
```python
body.decode("utf-8") if body is str else body.decode("latin-1")
```
`latin-1` is a bijective byte↔codepoint mapping; every byte value is preserved
as its corresponding Latin-1 code point, so ASCII-range secret strings remain
intact and `str.find` / regex still locate them correctly. The fallback from
strict UTF-8 is tried first so valid UTF-8 bodies are decoded faithfully.
## Implementation
Delivered in three commits on the same branch:
1. **DLP detector changes**`_alnum_projection`, fragmentation passes,
`scan_entropy`, broadened `scan_known_secrets`, updated `scan_outbound` and
`redact_tokens`; all accompanying unit tests.
2. **Canary injection**`EgressPlan.canary`, `Egress.prepare()`,
Docker compose + macos-container backend injection.
3. **PRD flip**`Status: Draft → Active`.
+23 -2
View File
@@ -80,7 +80,11 @@ def _git_gate_plan(upstreams: tuple[GitGateUpstream, ...] = ()) -> GitGatePlan:
) )
def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan: def _egress_plan(
routes: tuple[EgressRoute, ...] = (),
*,
canary: bool = False,
) -> EgressPlan:
token_env_map = { token_env_map = {
r.token_env: r.token_ref r.token_env: r.token_ref
for r in routes for r in routes
@@ -95,6 +99,8 @@ def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan:
egress_network=f"bot-bottle-egress-{SLUG}", egress_network=f"bot-bottle-egress-{SLUG}",
mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem", mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem",
mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem", mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem",
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
) )
@@ -112,6 +118,7 @@ def _plan(
with_git: bool = False, with_git: bool = False,
with_egress: bool = False, with_egress: bool = False,
supervise: bool = False, supervise: bool = False,
canary: bool = False,
) -> DockerBottlePlan: ) -> DockerBottlePlan:
"""Build a fully-resolved DockerBottlePlan. Toggles cover the """Build a fully-resolved DockerBottlePlan. Toggles cover the
matrix the renderer's conditional-service logic branches on.""" matrix the renderer's conditional-service logic branches on."""
@@ -150,7 +157,7 @@ def _plan(
slug=SLUG, slug=SLUG,
forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"}, forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"},
git_gate_plan=_git_gate_plan(upstreams), git_gate_plan=_git_gate_plan(upstreams),
egress_plan=_egress_plan(routes), egress_plan=_egress_plan(routes, canary=canary),
supervise_plan=_supervise_plan() if supervise else None, supervise_plan=_supervise_plan() if supervise else None,
use_runsc=False, use_runsc=False,
agent_provision=AgentProvisionPlan( agent_provision=AgentProvisionPlan(
@@ -375,6 +382,20 @@ class TestSidecarBundleShape(unittest.TestCase):
env_strings = sc["environment"] env_strings = sc["environment"]
self.assertNotIn("EGRESS_TOKEN_0", env_strings) self.assertNotIn("EGRESS_TOKEN_0", env_strings)
def test_canary_env_registered_as_sensitive_in_sidecar(self):
sc = self._render(canary=True)["services"]["sidecars"]
env_strings = sc["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
env_strings,
)
def test_canary_env_visible_to_agent(self):
agent = self._render(canary=True)["services"]["agent"]
env_strings = agent["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
def test_supervise_env_present_when_active(self): def test_supervise_env_present_when_active(self):
sc = self._render(supervise=True)["services"]["sidecars"] sc = self._render(supervise=True)["services"]["sidecars"]
env_strings = sc["environment"] env_strings = sc["environment"]
+192 -2
View File
@@ -1,18 +1,23 @@
"""Unit: DLP detectors (PRD 0053). """Unit: DLP detectors (PRD 0053).
Tests for token pattern scanning, known secret detection, and Tests for token pattern scanning, known secret detection, fragmentation-
naive prompt injection detection.""" resistant matching, entropy scoring, and naive prompt injection detection."""
import base64 import base64
import gzip import gzip
import unittest import unittest
from bot_bottle.dlp_detectors import ( from bot_bottle.dlp_detectors import (
ENTROPY_BLOCK_THRESHOLD,
PARTIAL_MATCH_MIN_LEN,
REDACT, REDACT,
_alnum_projection,
_encoded_variants, _encoded_variants,
_normalize_text, _normalize_text,
_shannon_entropy,
redact_tokens, redact_tokens,
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_naive_injection, scan_naive_injection,
scan_token_patterns, scan_token_patterns,
@@ -502,6 +507,191 @@ class TestStripCrlf(unittest.TestCase):
from bot_bottle.dlp_detectors import strip_crlf from bot_bottle.dlp_detectors import strip_crlf
self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello")) self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello"))
class TestAlnumProjection(unittest.TestCase):
def test_alphanumeric_unchanged(self):
self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ"))
def test_strips_hyphens(self):
self.assertEqual("mysecretvalue", _alnum_projection("my-secret-value"))
def test_strips_spaces(self):
self.assertEqual("mysecretvalue", _alnum_projection("my secret value"))
def test_strips_dots_and_underscores(self):
self.assertEqual("mysecretvalue", _alnum_projection("my.secret_value"))
def test_empty_string(self):
self.assertEqual("", _alnum_projection(""))
def test_all_special_chars(self):
self.assertEqual("", _alnum_projection("!@#$%^&*()"))
class TestFragmentationResistantMatching(unittest.TestCase):
"""scan_known_secrets catches separator-injection and partial-substring evasion."""
# Secrets long enough that their alnum projections are ≥ 8 chars.
SECRET = "supersecrettoken99"
ENV = {"EGRESS_TOKEN_0": SECRET}
def test_exact_match_still_works(self):
result = scan_known_secrets(f"key={self.SECRET}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_separator_injection_blocked(self):
# Hyphens inserted between chars of the secret.
fragmented = "-".join(self.SECRET)
result = scan_known_secrets(f"data={fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("separator injection", result.reason)
def test_space_separator_blocked(self):
fragmented = " ".join(self.SECRET)
result = scan_known_secrets(f"body: {fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("separator injection", result.reason)
def test_partial_substring_blocked(self):
# First PARTIAL_MATCH_MIN_LEN alnum chars of the secret, no separators.
partial = _alnum_projection(self.SECRET)[:PARTIAL_MATCH_MIN_LEN]
result = scan_known_secrets(f"x={partial}&y=other", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("partial match", result.reason)
def test_short_secret_skips_projection(self):
# Secrets shorter than _ALNUM_MIN_LEN in alnum projection are not
# fragmentation-checked (too many false positives).
short_env = {"EGRESS_TOKEN_0": "abc"}
# "a b c" has alnum projection "abc" (3 chars, < 8); should not block.
self.assertIsNone(scan_known_secrets("a b c", env=short_env))
def test_clean_text_not_blocked(self):
self.assertIsNone(scan_known_secrets("nothing to see here", env=self.ENV))
def test_sensitive_prefixes_param_extra_prefix(self):
env = {"MY_CRED_0": self.SECRET, "IGNORED": "other"}
result = scan_known_secrets(
f"key={self.SECRET}",
env=env,
sensitive_prefixes=("MY_CRED_",),
)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("MY_CRED_0", result.reason)
def test_sensitive_prefixes_default_only_egress_token(self):
# A value under a non-EGRESS_TOKEN_ key is ignored with default prefixes.
env = {"MY_CRED_0": self.SECRET}
self.assertIsNone(scan_known_secrets(f"key={self.SECRET}", env=env))
def test_canary_prefix_detected(self):
canary_value = "canary-fake-secret-value-xyz"
env = {"CANON_ALPHA_SECRET": canary_value}
result = scan_known_secrets(
f"x={canary_value}",
env=env,
sensitive_prefixes=("CANON_ALPHA_SECRET",),
)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("CANON_ALPHA_SECRET", result.reason)
class TestRedactTokensBroadenedPrefixes(unittest.TestCase):
SECRET = "my-provisioned-secret"
def test_default_redacts_egress_token(self):
env = {"EGRESS_TOKEN_0": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_extra_prefix_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(
f"val={self.SECRET}",
env=env,
sensitive_prefixes=("MY_SECRET_",),
)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_non_matching_prefix_not_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
# Default prefixes only include EGRESS_TOKEN_ → secret not redacted
self.assertIn(self.SECRET, out)
class TestShannonEntropy(unittest.TestCase):
def test_empty_string_zero(self):
self.assertEqual(0.0, _shannon_entropy(""))
def test_single_char_zero(self):
self.assertEqual(0.0, _shannon_entropy("aaaaaa"))
def test_two_equal_chars_one_bit(self):
self.assertAlmostEqual(1.0, _shannon_entropy("abababab"), places=10)
def test_high_entropy_random_like(self):
# Uniform 64-char string over 64 distinct symbols has entropy 6 bits.
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
text = alphabet # each char appears exactly once
self.assertAlmostEqual(6.0, _shannon_entropy(text), places=10)
class TestScanEntropy(unittest.TestCase):
def test_empty_returns_none(self):
self.assertIsNone(scan_entropy(""))
def test_low_entropy_returns_none(self):
# Highly repetitive text has low entropy.
self.assertIsNone(scan_entropy("a" * 200))
def test_high_entropy_warns(self):
# Build a 64-char string with entropy > ENTROPY_BLOCK_THRESHOLD.
# Use all 64 distinct printable chars to maximise entropy (~6 bits).
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, threshold=ENTROPY_BLOCK_THRESHOLD)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
self.assertIn("high-entropy", result.reason)
def test_never_blocks(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet)
# scan_entropy is warn-only; it must never return severity="block".
if result is not None:
self.assertNotEqual("block", result.severity)
def test_location_in_result(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, location="authorization header")
if result is not None:
self.assertIn("authorization header", result.location)
def test_structured_json_no_warn(self):
# Typical JSON has low entropy and should not be flagged.
json_body = '{"status": "ok", "message": "hello world", "count": 42}'
self.assertIsNone(scan_entropy(json_body))
def test_short_text_below_window(self):
# Text shorter than the window: checked as one chunk.
# Use a uniform string to ensure it won't be flagged.
self.assertIsNone(scan_entropy("abcde", threshold=ENTROPY_BLOCK_THRESHOLD))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+120
View File
@@ -1,15 +1,21 @@
"""Unit: Egress route lift + routes.yaml render + token """Unit: Egress route lift + routes.yaml render + token
resolution (PRD 0017, PRD 0053).""" resolution (PRD 0017, PRD 0053)."""
import tempfile
import unittest import unittest
from pathlib import Path
from bot_bottle.egress import ( from bot_bottle.egress import (
CODEX_HOST_CREDENTIAL_TOKEN_REF, CODEX_HOST_CREDENTIAL_TOKEN_REF,
Egress,
EgressPlan,
EgressRoute, EgressRoute,
egress_agent_env_entries,
egress_manifest_routes, egress_manifest_routes,
egress_render_routes, egress_render_routes,
egress_resolve_token_values, egress_resolve_token_values,
egress_routes_for_bottle, egress_routes_for_bottle,
egress_sidecar_env_entries,
egress_token_env_map, egress_token_env_map,
) )
from bot_bottle.log import Die from bot_bottle.log import Die
@@ -443,5 +449,119 @@ class TestResolveTokenValues(unittest.TestCase):
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out) self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
class TestCanaryGeneration(unittest.TestCase):
"""Egress.prepare() generates a unique canary token per session."""
def _bottle_obj(self):
return ManifestIndex.from_json_obj({
"bottles": {"dev": {"egress": {"routes": []}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
def _make_plan(self) -> EgressPlan:
# Use a concrete no-op subclass so we can call prepare() without
# a real backend.
class _TestEgress(Egress):
pass
e = _TestEgress()
with tempfile.TemporaryDirectory() as td:
return e.prepare(self._bottle_obj(), "test-slug", Path(td))
def test_canary_is_non_empty(self):
plan = self._make_plan()
self.assertIsInstance(plan.canary, str)
self.assertGreater(len(plan.canary), 0)
self.assertRegex(plan.canary_env, r"^[A-Z]+_[A-Z]+_SECRET$")
def test_canary_is_unique_per_session(self):
with tempfile.TemporaryDirectory() as td:
bottle = self._bottle_obj()
class _TestEgress(Egress):
pass
e = _TestEgress()
plan_a = e.prepare(bottle, "slug-a", Path(td))
plan_b = e.prepare(bottle, "slug-b", Path(td))
self.assertNotEqual(plan_a.canary, plan_b.canary)
def test_canary_detected_by_scan_known_secrets(self):
from bot_bottle.dlp_detectors import scan_known_secrets
plan = self._make_plan()
env = {plan.canary_env: plan.canary}
result = scan_known_secrets(
f"exfil={plan.canary}",
env=env,
sensitive_prefixes=(plan.canary_env,),
)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn(plan.canary_env, result.reason)
def test_egress_plan_canary_field_default_empty(self):
# Verify EgressPlan can be constructed with an empty canary (backward compat).
from pathlib import Path
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
)
self.assertEqual("", plan.canary)
self.assertEqual("", plan.canary_env)
class TestEgressEnvEntries(unittest.TestCase):
def test_sidecar_entries_include_route_tokens_and_canary_scan_prefix(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(EgressRoute(host="api.example"),),
token_env_map={"EGRESS_TOKEN_1": "T1", "EGRESS_TOKEN_0": "T0"},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
(
"EGRESS_TOKEN_0",
"EGRESS_TOKEN_1",
"CANON_ALPHA_SECRET=fake-canary-value",
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
),
egress_sidecar_env_entries(plan),
)
def test_agent_entries_include_only_canary_bait(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
("CANON_ALPHA_SECRET=fake-canary-value",),
egress_agent_env_entries(plan),
)
def test_canary_entries_omitted_when_name_missing(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
)
self.assertEqual((), egress_sidecar_env_entries(plan))
self.assertEqual((), egress_agent_env_entries(plan))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+103
View File
@@ -1273,6 +1273,109 @@ class TestBuildTokenAllowPayload(unittest.TestCase):
result = ScanResult(severity="block", reason="r", matched="x") result = ScanResult(severity="block", reason="r", matched="x")
payload = build_token_allow_payload("h", "GET", "/", result) payload = build_token_allow_payload("h", "GET", "/", result)
self.assertNotIn("context:", payload) self.assertNotIn("context:", payload)
class TestScanOutboundEnhanced(unittest.TestCase):
"""scan_outbound changes: binary decode, entropy detector,
broadened known-value prefixes, fragmentation resistance."""
_ROUTE = Route(host="api.example.com")
_ROUTE_ENTROPY = Route(
host="api.example.com",
outbound_detectors=("entropy",),
)
def test_binary_body_latin1_decode_finds_ascii_secret(self):
# Body contains valid ASCII secret surrounded by non-UTF-8 bytes.
secret = "supersecrettoken99"
env = {"EGRESS_TOKEN_0": secret}
# Wrap the secret in bytes that are invalid UTF-8.
body = b"\x80\x81" + secret.encode("ascii") + b"\xff"
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_binary_body_valid_utf8_decoded_correctly(self):
env = {"EGRESS_TOKEN_0": "mysecret"}
# Valid UTF-8 body — should be decoded as UTF-8, not latin-1.
body = "clean body with mysecret".encode("utf-8")
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
def test_entropy_detector_off_by_default(self):
import string
# High-entropy content should NOT warn if the route has no entropy detector.
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE, alphabet, {})
self.assertIsNone(result)
def test_entropy_detector_warns_when_enabled(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE_ENTROPY, alphabet, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
def test_bot_bottle_sensitive_prefixes_env_var(self):
# When the sidecar env contains BOT_BOTTLE_SENSITIVE_PREFIXES,
# scan_outbound should scan those additional prefixes.
secret = "extra-sensitive-value-abc"
env = {
"MY_CRED_KEY": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "MY_CRED_",
}
result = scan_outbound(self._ROUTE, f"x={secret}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_bot_bottle_sensitive_prefixes_multiple(self):
secret = "my-api-key-value-xyz"
env = {
"ANTHROPIC_API_0": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "ANTHROPIC_API_,OTHER_",
}
result = scan_outbound(self._ROUTE, f"auth={secret}", env)
self.assertIsNotNone(result)
def test_canary_detected_via_random_secret_env_name(self):
# The fake secret uses a randomized env name that the sidecar marks
# as sensitive through BOT_BOTTLE_SENSITIVE_PREFIXES.
canary = "canaryvalue12345abcdef"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
result = scan_outbound(self._ROUTE, f"data={canary}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("CANON_ALPHA_SECRET", result.reason)
def test_fragmented_canary_blocked(self):
# Canary with separators injected is still caught.
canary = "supersecretcanary99"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
fragmented = "-".join(canary)
result = scan_outbound(self._ROUTE, f"x={fragmented}", env)
self.assertIsNotNone(result)
class TestOutboundDetectorNames(unittest.TestCase):
def test_entropy_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("entropy", OUTBOUND_DETECTOR_NAMES)
def test_known_secrets_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("known_secrets", OUTBOUND_DETECTOR_NAMES)
def test_token_patterns_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("token_patterns", OUTBOUND_DETECTOR_NAMES)
if __name__ == "__main__": if __name__ == "__main__":
+24 -1
View File
@@ -30,6 +30,7 @@ def _plan(
supervise: bool = False, supervise: bool = False,
agent_git_gate_url: str = "", agent_git_gate_url: str = "",
agent_supervise_url: str = "", agent_supervise_url: str = "",
canary: bool = False,
) -> MacosContainerBottlePlan: ) -> MacosContainerBottlePlan:
routes_path = stage_dir / "routes.yaml" routes_path = stage_dir / "routes.yaml"
routes_path.write_text("routes: []\n", encoding="utf-8") routes_path.write_text("routes: []\n", encoding="utf-8")
@@ -42,6 +43,8 @@ def _plan(
routes_path=routes_path, routes_path=routes_path,
routes=("route",), routes=("route",),
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"}, token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
) )
if git: if git:
key_path = stage_dir / "origin-key" key_path = stage_dir / "origin-key"
@@ -138,6 +141,26 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
argv, argv,
) )
def test_sidecar_argv_registers_canary_env_as_sensitive(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._sidecar_run_argv(
plan,
"bot-bottle-sidecars-dev-abc",
"bot-bottle-net-dev-abc",
"bot-bottle-egress-dev-abc",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
self.assertIn("BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", argv)
def test_agent_argv_receives_canary_env(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._agent_run_argv(
plan,
"bot-bottle-net-dev-abc",
"192.0.2.10",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
def test_agent_env_points_proxy_at_sidecar_ip(self): def test_agent_env_points_proxy_at_sidecar_ip(self):
plan = _plan( plan = _plan(
stage_dir=self.stage_dir, stage_dir=self.stage_dir,
@@ -271,7 +294,7 @@ def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan:
manifest=_MANIFEST, manifest=_MANIFEST,
stage_dir=stage_dir, stage_dir=stage_dir,
git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())), git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())),
egress_plan=cast(EgressPlan, SimpleNamespace()), egress_plan=cast(EgressPlan, SimpleNamespace(canary="")),
supervise_plan=None, supervise_plan=None,
agent_provision=AgentProvisionPlan( agent_provision=AgentProvisionPlan(
template="claude", template="claude",
+29 -3
View File
@@ -26,9 +26,7 @@ from bot_bottle.backend.smolmachines.bottle import SmolmachinesBottle
from bot_bottle.backend.smolmachines.bottle_plan import ( from bot_bottle.backend.smolmachines.bottle_plan import (
SmolmachinesBottlePlan, SmolmachinesBottlePlan,
) )
# from bot_bottle.backend.smolmachines.provision import ( from bot_bottle.backend.smolmachines import launch as _launch
# workspace as _workspace,
# )
from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec
from bot_bottle.backend.util import AGENT_CA_PATH from bot_bottle.backend.util import AGENT_CA_PATH
from bot_bottle.egress import EgressPlan, EgressRoute from bot_bottle.egress import EgressPlan, EgressRoute
@@ -86,6 +84,7 @@ def _plan(
stage_dir: Path | None = None, stage_dir: Path | None = None,
egress_routes: tuple[EgressRoute, ...] = (), egress_routes: tuple[EgressRoute, ...] = (),
egress_ca_path: Path = Path(), egress_ca_path: Path = Path(),
canary: bool = False,
supervise: bool = False, supervise: bool = False,
bundle_ip: str = "192.168.50.2", bundle_ip: str = "192.168.50.2",
agent_git_gate_host: str = "127.0.0.1:55555", agent_git_gate_host: str = "127.0.0.1:55555",
@@ -156,6 +155,8 @@ def _plan(
routes=egress_routes, routes=egress_routes,
token_env_map={}, token_env_map={},
mitmproxy_ca_cert_only_host_path=egress_ca_path, mitmproxy_ca_cert_only_host_path=egress_ca_path,
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
), ),
supervise_plan=supervise_plan, supervise_plan=supervise_plan,
agent_git_gate_host=agent_git_gate_host, agent_git_gate_host=agent_git_gate_host,
@@ -411,6 +412,31 @@ class TestBundleLaunchSpec(unittest.TestCase):
self.assertIn(9420, spec.ports_to_publish) self.assertIn(9420, spec.ports_to_publish)
self.assertNotIn(9418, spec.ports_to_publish) self.assertNotIn(9418, spec.ports_to_publish)
def test_canary_env_registered_as_sensitive_in_bundle(self):
plan = _plan(canary=True)
spec = _bundle_launch_spec(plan, "net", "127.0.0.16")
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", spec.environment)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
spec.environment,
)
def test_canary_env_visible_to_smolvm_guest(self):
plan = _plan(canary=True)
with patch.object(
_launch._bundle,
"bundle_host_port",
return_value="65000",
):
stamped = _launch._discover_urls(plan, "127.0.0.16")
self.assertEqual(
"fake-canary-value",
stamped.guest_env["CANON_ALPHA_SECRET"],
)
class TestProvisionGitUser(unittest.TestCase): class TestProvisionGitUser(unittest.TestCase):
"""`provision_git` runs `git config --global` inside the """`provision_git` runs `git config --global` inside the