Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f5fdc0ea72 | |||
| ca1f14b855 | |||
| d9a9eef276 | |||
| 5204b98777 | |||
| 14ae89580a | |||
| 4808ef557a | |||
| 0a7e166b35 | |||
| a920203730 | |||
| e02fab15d0 | |||
| 11cf12188d | |||
| 701df6cb2f | |||
| ea6bc5a170 |
@@ -28,6 +28,8 @@ from typing import Any
|
|||||||
from ...egress import (
|
from ...egress import (
|
||||||
EGRESS_HOSTNAME,
|
EGRESS_HOSTNAME,
|
||||||
EGRESS_ROUTES_IN_CONTAINER,
|
EGRESS_ROUTES_IN_CONTAINER,
|
||||||
|
egress_agent_env_entries,
|
||||||
|
egress_sidecar_env_entries,
|
||||||
)
|
)
|
||||||
from ...git_gate import GIT_GATE_HOSTNAME
|
from ...git_gate import GIT_GATE_HOSTNAME
|
||||||
from ...log import die, warn
|
from ...log import die, warn
|
||||||
@@ -135,8 +137,7 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
|
|||||||
volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER))
|
volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER))
|
||||||
if ep.routes:
|
if ep.routes:
|
||||||
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
|
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
|
||||||
for token_env in sorted(ep.token_env_map.keys()):
|
env.extend(egress_sidecar_env_entries(ep))
|
||||||
env.append(token_env)
|
|
||||||
|
|
||||||
# --- git-gate -----------------------------------------------------
|
# --- git-gate -----------------------------------------------------
|
||||||
gp = plan.git_gate_plan
|
gp = plan.git_gate_plan
|
||||||
@@ -220,6 +221,7 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
|
|||||||
# never lands on argv or in the compose file.
|
# never lands on argv or in the compose file.
|
||||||
for name in sorted(plan.forwarded_env.keys()):
|
for name in sorted(plan.forwarded_env.keys()):
|
||||||
env.append(name)
|
env.append(name)
|
||||||
|
env.extend(egress_agent_env_entries(plan.egress_plan))
|
||||||
|
|
||||||
service: dict[str, Any] = {
|
service: dict[str, Any] = {
|
||||||
"image": plan.image,
|
"image": plan.image,
|
||||||
|
|||||||
@@ -22,7 +22,12 @@ from ...bottle_state import (
|
|||||||
git_gate_state_dir,
|
git_gate_state_dir,
|
||||||
read_committed_image,
|
read_committed_image,
|
||||||
)
|
)
|
||||||
from ...egress import EGRESS_ROUTES_IN_CONTAINER, egress_resolve_token_values
|
from ...egress import (
|
||||||
|
EGRESS_ROUTES_IN_CONTAINER,
|
||||||
|
egress_agent_env_entries,
|
||||||
|
egress_resolve_token_values,
|
||||||
|
egress_sidecar_env_entries,
|
||||||
|
)
|
||||||
from ...git_gate import revoke_git_gate_provisioned_keys
|
from ...git_gate import revoke_git_gate_provisioned_keys
|
||||||
from ...log import die, info, warn
|
from ...log import die, info, warn
|
||||||
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
|
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
|
||||||
@@ -350,9 +355,7 @@ def _sidecar_daemons(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
|
|||||||
|
|
||||||
|
|
||||||
def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
|
def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
|
||||||
env: list[str] = []
|
env: list[str] = list(egress_sidecar_env_entries(plan.egress_plan))
|
||||||
if plan.egress_plan.routes:
|
|
||||||
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
|
|
||||||
if plan.git_gate_plan.upstreams:
|
if plan.git_gate_plan.upstreams:
|
||||||
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
|
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
|
||||||
if plan.supervise_plan is not None:
|
if plan.supervise_plan is not None:
|
||||||
@@ -420,6 +423,7 @@ def _agent_env_entries(
|
|||||||
env.append(f"{name}={value}")
|
env.append(f"{name}={value}")
|
||||||
for name in sorted(plan.forwarded_env.keys()):
|
for name in sorted(plan.forwarded_env.keys()):
|
||||||
env.append(name)
|
env.append(name)
|
||||||
|
env.extend(egress_agent_env_entries(plan.egress_plan))
|
||||||
return tuple(env)
|
return tuple(env)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -23,7 +23,9 @@ from typing import Callable, Generator
|
|||||||
|
|
||||||
from ...egress import (
|
from ...egress import (
|
||||||
EGRESS_ROUTES_IN_CONTAINER,
|
EGRESS_ROUTES_IN_CONTAINER,
|
||||||
|
egress_agent_env_entries,
|
||||||
egress_resolve_token_values,
|
egress_resolve_token_values,
|
||||||
|
egress_sidecar_env_entries,
|
||||||
)
|
)
|
||||||
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
|
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
|
||||||
from ...util import expand_tilde
|
from ...util import expand_tilde
|
||||||
@@ -228,6 +230,9 @@ def _discover_urls(
|
|||||||
guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}"
|
guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}"
|
||||||
if agent_supervise_url:
|
if agent_supervise_url:
|
||||||
guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url
|
guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url
|
||||||
|
for entry in egress_agent_env_entries(plan.egress_plan):
|
||||||
|
name, value = entry.split("=", 1)
|
||||||
|
guest_env[name] = value
|
||||||
|
|
||||||
return dataclasses.replace(
|
return dataclasses.replace(
|
||||||
plan,
|
plan,
|
||||||
@@ -316,11 +321,7 @@ def _bundle_launch_spec(
|
|||||||
volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True))
|
volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True))
|
||||||
if ep.routes:
|
if ep.routes:
|
||||||
volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True))
|
volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True))
|
||||||
# Bare-name entries for upstream-token slots. Their values
|
env.extend(egress_sidecar_env_entries(ep))
|
||||||
# come from the docker-run subprocess env (inherited from
|
|
||||||
# the operator's shell), never landing on argv.
|
|
||||||
for token_env in sorted(ep.token_env_map.keys()):
|
|
||||||
env.append(token_env)
|
|
||||||
|
|
||||||
# --- git-gate ---------------------------------------------
|
# --- git-gate ---------------------------------------------
|
||||||
gp = plan.git_gate_plan
|
gp = plan.git_gate_plan
|
||||||
|
|||||||
+159
-3
@@ -15,6 +15,8 @@ import gzip
|
|||||||
import re
|
import re
|
||||||
import typing
|
import typing
|
||||||
import unicodedata
|
import unicodedata
|
||||||
|
from math import log2
|
||||||
|
from collections import Counter
|
||||||
from urllib.parse import quote as url_quote
|
from urllib.parse import quote as url_quote
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -107,20 +109,21 @@ def redact_tokens(
|
|||||||
text: str,
|
text: str,
|
||||||
*,
|
*,
|
||||||
env: typing.Mapping[str, str] | None = None,
|
env: typing.Mapping[str, str] | None = None,
|
||||||
|
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Replace token pattern matches and (if env given) provisioned secrets with REDACT."""
|
"""Replace token pattern matches and (if env given) provisioned secrets with REDACT."""
|
||||||
for _, pattern in TOKEN_PATTERNS:
|
for _, pattern in TOKEN_PATTERNS:
|
||||||
text = pattern.sub(REDACT, text)
|
text = pattern.sub(REDACT, text)
|
||||||
if env is not None:
|
if env is not None:
|
||||||
for key, value in env.items():
|
for key, value in env.items():
|
||||||
if key.startswith("EGRESS_TOKEN_") and value:
|
if any(key.startswith(p) for p in sensitive_prefixes) and value:
|
||||||
for variant in _encoded_variants(value):
|
for variant in _encoded_variants(value):
|
||||||
text = text.replace(variant, REDACT)
|
text = text.replace(variant, REDACT)
|
||||||
return text
|
return text
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Known secrets detector (Phase 1b)
|
# Known secrets detector
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def _encoded_variants(secret: str) -> list[str]:
|
def _encoded_variants(secret: str) -> list[str]:
|
||||||
@@ -161,18 +164,65 @@ def _encoded_variants(secret: str) -> list[str]:
|
|||||||
return variants
|
return variants
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Fragmentation-resistant helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Minimum length of alnum projection for projection-based checks to run.
|
||||||
|
# Short secrets produce too many false positives in projection space.
|
||||||
|
_ALNUM_MIN_LEN = 8
|
||||||
|
|
||||||
|
# Minimum window length for the partial-substring sliding scan.
|
||||||
|
PARTIAL_MATCH_MIN_LEN = 12
|
||||||
|
|
||||||
|
|
||||||
|
def _alnum_projection(text: str) -> str:
|
||||||
|
"""Return text with every non-alphanumeric character stripped.
|
||||||
|
|
||||||
|
Used for fragmentation-resistant matching: separator-injected secrets
|
||||||
|
(spaces, hyphens, dots inserted between characters) are identical to
|
||||||
|
their originals in alnum projection space.
|
||||||
|
"""
|
||||||
|
return "".join(c for c in text if c.isalnum())
|
||||||
|
|
||||||
|
|
||||||
|
def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None:
|
||||||
|
"""Return the position in text_alnum where any min_len-char window of
|
||||||
|
secret_alnum first appears, or None.
|
||||||
|
|
||||||
|
Slides a window of width min_len across secret_alnum and searches for
|
||||||
|
each window in text_alnum. The first hit position is returned.
|
||||||
|
"""
|
||||||
|
if len(secret_alnum) < min_len or len(text_alnum) < min_len:
|
||||||
|
return None
|
||||||
|
for i in range(len(secret_alnum) - min_len + 1):
|
||||||
|
window = secret_alnum[i:i + min_len]
|
||||||
|
pos = text_alnum.find(window)
|
||||||
|
if pos >= 0:
|
||||||
|
return pos
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def scan_known_secrets(
|
def scan_known_secrets(
|
||||||
text: str,
|
text: str,
|
||||||
*,
|
*,
|
||||||
location: str = "body",
|
location: str = "body",
|
||||||
env: typing.Mapping[str, str] | None = None,
|
env: typing.Mapping[str, str] | None = None,
|
||||||
|
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
|
||||||
safe_tokens: typing.AbstractSet[str] | None = None,
|
safe_tokens: typing.AbstractSet[str] | None = None,
|
||||||
) -> ScanResult | None:
|
) -> ScanResult | None:
|
||||||
if env is None:
|
if env is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# Pre-compute alnum projection of the scan text once; reused per secret.
|
||||||
|
text_alnum: str | None = None
|
||||||
|
|
||||||
for key, value in env.items():
|
for key, value in env.items():
|
||||||
if not key.startswith("EGRESS_TOKEN_") or not value:
|
if not any(key.startswith(p) for p in sensitive_prefixes) or not value:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Pass 1: exact match across encoded variants (original behaviour).
|
||||||
|
approved_exact = False
|
||||||
for variant in _encoded_variants(value):
|
for variant in _encoded_variants(value):
|
||||||
pos = text.find(variant)
|
pos = text.find(variant)
|
||||||
if pos >= 0:
|
if pos >= 0:
|
||||||
@@ -180,6 +230,7 @@ def scan_known_secrets(
|
|||||||
# (PRD 0062); a different encoding of the same secret is a
|
# (PRD 0062); a different encoding of the same secret is a
|
||||||
# fresh block.
|
# fresh block.
|
||||||
if safe_tokens is not None and variant in safe_tokens:
|
if safe_tokens is not None and variant in safe_tokens:
|
||||||
|
approved_exact = True
|
||||||
continue
|
continue
|
||||||
return ScanResult(
|
return ScanResult(
|
||||||
severity="block",
|
severity="block",
|
||||||
@@ -188,6 +239,104 @@ def scan_known_secrets(
|
|||||||
context=_snippet(text, pos, pos + len(variant)),
|
context=_snippet(text, pos, pos + len(variant)),
|
||||||
matched=variant,
|
matched=variant,
|
||||||
)
|
)
|
||||||
|
if approved_exact:
|
||||||
|
# Exact match was found and approved; projection passes would
|
||||||
|
# fire on the same value, so skip them for this secret.
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Pass 2 & 3: fragmentation-resistant projection checks.
|
||||||
|
secret_alnum = _alnum_projection(value)
|
||||||
|
if len(secret_alnum) < _ALNUM_MIN_LEN:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if text_alnum is None:
|
||||||
|
text_alnum = _alnum_projection(text)
|
||||||
|
|
||||||
|
# Pass 2: full alnum-projection exact match (catches separator injection).
|
||||||
|
pos2 = text_alnum.find(secret_alnum)
|
||||||
|
if pos2 >= 0:
|
||||||
|
return ScanResult(
|
||||||
|
severity="block",
|
||||||
|
reason=(
|
||||||
|
f"provisioned secret from {key} found in {location} "
|
||||||
|
f"(fragmented match — separator injection)"
|
||||||
|
),
|
||||||
|
location=location,
|
||||||
|
context=_snippet(text_alnum, pos2, pos2 + len(secret_alnum)),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Pass 3: sliding-window partial match (catches chunked-substring leaks).
|
||||||
|
pos3 = _find_partial_window(secret_alnum, text_alnum, PARTIAL_MATCH_MIN_LEN)
|
||||||
|
if pos3 is not None:
|
||||||
|
return ScanResult(
|
||||||
|
severity="block",
|
||||||
|
reason=(
|
||||||
|
f"provisioned secret from {key} found in {location} "
|
||||||
|
f"(partial match — at least {PARTIAL_MATCH_MIN_LEN} consecutive "
|
||||||
|
f"alphanumeric chars)"
|
||||||
|
),
|
||||||
|
location=location,
|
||||||
|
context=_snippet(text_alnum, pos3, pos3 + PARTIAL_MATCH_MIN_LEN),
|
||||||
|
)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Entropy detector (warn-only)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Sliding window size and step for the entropy scan.
|
||||||
|
ENTROPY_WINDOW = 64
|
||||||
|
ENTROPY_STEP = 32
|
||||||
|
|
||||||
|
# Bits-per-character threshold. Random ASCII printable ≈ 6.6 bits; random
|
||||||
|
# lowercase hex ≈ 4 bits; random base64url ≈ 6 bits. 5.5 sits above
|
||||||
|
# typical structured data (JSON, URLs) while staying below truly random
|
||||||
|
# content.
|
||||||
|
ENTROPY_BLOCK_THRESHOLD = 5.5
|
||||||
|
|
||||||
|
|
||||||
|
def _shannon_entropy(text: str) -> float:
|
||||||
|
if not text:
|
||||||
|
return 0.0
|
||||||
|
counts = Counter(text)
|
||||||
|
n = len(text)
|
||||||
|
return -sum((c / n) * log2(c / n) for c in counts.values())
|
||||||
|
|
||||||
|
|
||||||
|
def scan_entropy(
|
||||||
|
text: str,
|
||||||
|
*,
|
||||||
|
location: str = "body",
|
||||||
|
window: int = ENTROPY_WINDOW,
|
||||||
|
threshold: float = ENTROPY_BLOCK_THRESHOLD,
|
||||||
|
) -> ScanResult | None:
|
||||||
|
"""Warn-only detector: flag windows of `window` chars with Shannon entropy
|
||||||
|
above `threshold` bits per character.
|
||||||
|
|
||||||
|
Never blocks; always returns severity='warn'. Disabled by default —
|
||||||
|
routes must opt in via dlp.outbound_detectors=['entropy'].
|
||||||
|
"""
|
||||||
|
if not text:
|
||||||
|
return None
|
||||||
|
step = max(1, window // 2)
|
||||||
|
end = len(text)
|
||||||
|
# Scan overlapping windows; also check the final tail if shorter than window.
|
||||||
|
positions = list(range(0, end - window + 1, step))
|
||||||
|
if end < window:
|
||||||
|
positions = [0]
|
||||||
|
elif (end - window) % step != 0:
|
||||||
|
positions.append(end - window)
|
||||||
|
for i in positions:
|
||||||
|
chunk = text[i:i + window]
|
||||||
|
if _shannon_entropy(chunk) >= threshold:
|
||||||
|
return ScanResult(
|
||||||
|
severity="warn",
|
||||||
|
reason=f"high-entropy content in {location} (possible encrypted exfil)",
|
||||||
|
location=location,
|
||||||
|
context=_snippet(text, i, i + len(chunk)),
|
||||||
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
@@ -306,11 +455,18 @@ def scan_crlf_injection(text: str) -> ScanResult | None:
|
|||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
"ENTROPY_BLOCK_THRESHOLD",
|
||||||
|
"ENTROPY_WINDOW",
|
||||||
|
"ENTROPY_STEP",
|
||||||
|
"PARTIAL_MATCH_MIN_LEN",
|
||||||
"REDACT",
|
"REDACT",
|
||||||
"SNIPPET_CONTEXT",
|
"SNIPPET_CONTEXT",
|
||||||
"TOKEN_PATTERNS",
|
"TOKEN_PATTERNS",
|
||||||
|
"_alnum_projection",
|
||||||
|
"_shannon_entropy",
|
||||||
"redact_tokens",
|
"redact_tokens",
|
||||||
"scan_crlf_injection",
|
"scan_crlf_injection",
|
||||||
|
"scan_entropy",
|
||||||
"scan_known_secrets",
|
"scan_known_secrets",
|
||||||
"scan_naive_injection",
|
"scan_naive_injection",
|
||||||
"scan_token_patterns",
|
"scan_token_patterns",
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ specific and lives on concrete subclasses (see
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import dataclasses
|
import dataclasses
|
||||||
|
import secrets
|
||||||
from abc import ABC
|
from abc import ABC
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -34,6 +35,50 @@ EGRESS_HOSTNAME = "egress"
|
|||||||
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
|
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
|
||||||
EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name
|
EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name
|
||||||
|
|
||||||
|
_CANARY_ENV_WORDS = (
|
||||||
|
"ACCORD",
|
||||||
|
"ANCHOR",
|
||||||
|
"ATLAS",
|
||||||
|
"CANON",
|
||||||
|
"CIPHER",
|
||||||
|
"EMBER",
|
||||||
|
"FALCON",
|
||||||
|
"HARBOR",
|
||||||
|
"LANTERN",
|
||||||
|
"MARBLE",
|
||||||
|
"NOVA",
|
||||||
|
"ORBIT",
|
||||||
|
"PIVOT",
|
||||||
|
"RADIUS",
|
||||||
|
"SUMMIT",
|
||||||
|
"VECTOR",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _random_canary_env() -> str:
|
||||||
|
first = secrets.choice(_CANARY_ENV_WORDS)
|
||||||
|
remaining = tuple(word for word in _CANARY_ENV_WORDS if word != first)
|
||||||
|
second = secrets.choice(remaining)
|
||||||
|
return f"{first}_{second}_SECRET"
|
||||||
|
|
||||||
|
|
||||||
|
def egress_sidecar_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
|
||||||
|
"""Return sidecar env entries needed by egress across all backends."""
|
||||||
|
env: list[str] = []
|
||||||
|
if plan.routes:
|
||||||
|
env.extend(sorted(plan.token_env_map.keys()))
|
||||||
|
if plan.canary and plan.canary_env:
|
||||||
|
env.append(f"{plan.canary_env}={plan.canary}")
|
||||||
|
env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.canary_env}")
|
||||||
|
return tuple(env)
|
||||||
|
|
||||||
|
|
||||||
|
def egress_agent_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
|
||||||
|
"""Return agent-visible egress env entries shared by all backends."""
|
||||||
|
if plan.canary and plan.canary_env:
|
||||||
|
return (f"{plan.canary_env}={plan.canary}",)
|
||||||
|
return ()
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class EgressRoute(Route):
|
class EgressRoute(Route):
|
||||||
@@ -65,6 +110,8 @@ class EgressPlan:
|
|||||||
mitmproxy_ca_host_path: Path = Path()
|
mitmproxy_ca_host_path: Path = Path()
|
||||||
mitmproxy_ca_cert_only_host_path: Path = Path()
|
mitmproxy_ca_cert_only_host_path: Path = Path()
|
||||||
log: int = 0
|
log: int = 0
|
||||||
|
canary: str = ""
|
||||||
|
canary_env: str = ""
|
||||||
|
|
||||||
|
|
||||||
def egress_manifest_routes(
|
def egress_manifest_routes(
|
||||||
@@ -324,12 +371,18 @@ class Egress(ABC):
|
|||||||
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
|
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
|
||||||
routes_path.write_text(egress_render_routes(routes, log=log))
|
routes_path.write_text(egress_render_routes(routes, log=log))
|
||||||
routes_path.chmod(0o600)
|
routes_path.chmod(0o600)
|
||||||
|
# Generate a per-session fake secret under a plausible random env name.
|
||||||
|
# The sidecar marks that exact env name as sensitive for known-secret
|
||||||
|
# scanning; the agent receives the same name/value as exfil bait.
|
||||||
|
canary = secrets.token_urlsafe(32)
|
||||||
return EgressPlan(
|
return EgressPlan(
|
||||||
slug=slug,
|
slug=slug,
|
||||||
routes_path=routes_path,
|
routes_path=routes_path,
|
||||||
routes=routes,
|
routes=routes,
|
||||||
token_env_map=egress_token_env_map(routes),
|
token_env_map=egress_token_env_map(routes),
|
||||||
log=log,
|
log=log,
|
||||||
|
canary=canary,
|
||||||
|
canary_env=_random_canary_env(),
|
||||||
)
|
)
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
@@ -344,5 +397,7 @@ __all__ = [
|
|||||||
"egress_render_routes",
|
"egress_render_routes",
|
||||||
"egress_resolve_token_values",
|
"egress_resolve_token_values",
|
||||||
"egress_routes_for_bottle",
|
"egress_routes_for_bottle",
|
||||||
|
"egress_agent_env_entries",
|
||||||
|
"egress_sidecar_env_entries",
|
||||||
"egress_token_env_map",
|
"egress_token_env_map",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ VALID_METHODS = frozenset({
|
|||||||
"CONNECT",
|
"CONNECT",
|
||||||
})
|
})
|
||||||
|
|
||||||
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"})
|
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"})
|
||||||
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
|
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
|
||||||
|
|
||||||
# Per-route policy for what the proxy does when an outbound DLP detector
|
# Per-route policy for what the proxy does when an outbound DLP detector
|
||||||
@@ -729,17 +729,28 @@ def scan_outbound(
|
|||||||
try:
|
try:
|
||||||
from dlp_detectors import ( # type: ignore[import-not-found]
|
from dlp_detectors import ( # type: ignore[import-not-found]
|
||||||
scan_crlf_injection,
|
scan_crlf_injection,
|
||||||
|
scan_entropy,
|
||||||
scan_known_secrets,
|
scan_known_secrets,
|
||||||
scan_token_patterns,
|
scan_token_patterns,
|
||||||
)
|
)
|
||||||
except ImportError: # pragma: no cover - host-side path
|
except ImportError: # pragma: no cover - host-side path
|
||||||
from .dlp_detectors import ( # type: ignore[import-not-found]
|
from .dlp_detectors import ( # type: ignore[import-not-found]
|
||||||
scan_crlf_injection,
|
scan_crlf_injection,
|
||||||
|
scan_entropy,
|
||||||
scan_known_secrets,
|
scan_known_secrets,
|
||||||
scan_token_patterns,
|
scan_token_patterns,
|
||||||
)
|
)
|
||||||
|
|
||||||
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
|
# Binary bodies: latin-1 is a bijective byte↔codepoint mapping that
|
||||||
|
# preserves every byte value, so ASCII-range secret strings remain
|
||||||
|
# findable by str.find / regex. Prefer strict UTF-8 for valid text bodies.
|
||||||
|
if isinstance(body, bytes):
|
||||||
|
try:
|
||||||
|
text = body.decode("utf-8")
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
text = body.decode("latin-1")
|
||||||
|
else:
|
||||||
|
text = body
|
||||||
|
|
||||||
# CRLF injection is only an attack in the request line + headers, never the
|
# CRLF injection is only an attack in the request line + headers, never the
|
||||||
# body: an HTTP body is delimited by Content-Length, so CRLF bytes there
|
# body: an HTTP body is delimited by Content-Length, so CRLF bytes there
|
||||||
@@ -758,12 +769,30 @@ def scan_outbound(
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
if _detector_enabled(route.outbound_detectors, "known_secrets"):
|
if _detector_enabled(route.outbound_detectors, "known_secrets"):
|
||||||
|
# BOT_BOTTLE_SENSITIVE_PREFIXES lets operators add extra env prefixes
|
||||||
|
# beyond EGRESS_TOKEN_* without changing the manifest schema.
|
||||||
|
extra_raw = environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "")
|
||||||
|
extra = tuple(p for p in extra_raw.split(",") if p)
|
||||||
|
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
|
||||||
result = scan_known_secrets(
|
result = scan_known_secrets(
|
||||||
text, location="body", env=environ, safe_tokens=safe_tokens,
|
text, location="body", env=environ,
|
||||||
|
sensitive_prefixes=sensitive_prefixes, safe_tokens=safe_tokens,
|
||||||
)
|
)
|
||||||
if result is not None:
|
if result is not None:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
# Entropy scanning requires explicit opt-in: it is NOT part of the
|
||||||
|
# default "all detectors" set because it produces false positives on
|
||||||
|
# legitimate base64 / binary payloads. Routes must list "entropy" in
|
||||||
|
# dlp.outbound_detectors to enable it.
|
||||||
|
if (
|
||||||
|
route.outbound_detectors is not None
|
||||||
|
and "entropy" in route.outbound_detectors
|
||||||
|
):
|
||||||
|
result = scan_entropy(text, location="body")
|
||||||
|
if result is not None:
|
||||||
|
return result
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,189 @@
|
|||||||
|
# PRD prd-new: Strengthen outbound exfiltration detection
|
||||||
|
|
||||||
|
- **Status:** Active
|
||||||
|
- **Author:** claude
|
||||||
|
- **Created:** 2026-06-25
|
||||||
|
- **Issue:** #259
|
||||||
|
|
||||||
|
## Summary
|
||||||
|
|
||||||
|
The current DLP system has two detection tiers: weak (vendor token regexes)
|
||||||
|
and strong (exact matching of provisioned `EGRESS_TOKEN_*` values in multiple
|
||||||
|
encodings). This PRD adds four complementary hardening layers in ranked order
|
||||||
|
of impact:
|
||||||
|
|
||||||
|
1. **Canary tokens** — inject a per-session fake secret into the agent's
|
||||||
|
environment; any outbound appearance is a zero-false-positive exfil signal.
|
||||||
|
2. **Broadened known-value scanning** — extend the strong tier beyond
|
||||||
|
`EGRESS_TOKEN_*` via a `BOT_BOTTLE_SENSITIVE_PREFIXES` sidecar env var.
|
||||||
|
3. **Fragmentation-resistant matching** — catch separator injection and
|
||||||
|
long-substring leaks via alnum projection and sliding-window substring
|
||||||
|
search.
|
||||||
|
4. **Entropy scoring** — warn-only flag on high-entropy blobs as a
|
||||||
|
complementary signal for encrypted exfil.
|
||||||
|
|
||||||
|
## Problem
|
||||||
|
|
||||||
|
After PRD 0056 the outbound scan covers all HTTP request surfaces (host, path,
|
||||||
|
query, headers, body). But the content-based strong tier only matches
|
||||||
|
`EGRESS_TOKEN_*` values and their encodings. Three gaps remain:
|
||||||
|
|
||||||
|
| Gap | How an agent exploits it |
|
||||||
|
|-----|--------------------------|
|
||||||
|
| No canary | There is no "can't be false-positive" signal |
|
||||||
|
| Narrow prefix filter | Sensitive values provisioned under other env prefixes (MCP keys, API keys injected via the agent provider) are invisible to `scan_known_secrets` |
|
||||||
|
| Fragmentation blindness | Inserting `-`, space, or `\n` between secret characters turns an exact-match into a miss |
|
||||||
|
|
||||||
|
## Goals / Success Criteria
|
||||||
|
|
||||||
|
1. Each launched bottle has a unique canary token in the agent's environment
|
||||||
|
under a randomized `WORD_WORD_SECRET` env var name. The egress sidecar gets
|
||||||
|
the same env var and registers that exact name through
|
||||||
|
`BOT_BOTTLE_SENSITIVE_PREFIXES`. Any outbound appearance of the canary
|
||||||
|
blocks the request as a known-secret match.
|
||||||
|
2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default:
|
||||||
|
`("EGRESS_TOKEN_",)`). `scan_outbound` reads
|
||||||
|
`BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes
|
||||||
|
in, so operators can mark additional env vars as scanned values without
|
||||||
|
changing the manifest schema.
|
||||||
|
3. For every secret that passes exact-match, a secondary alnum-projection pass
|
||||||
|
checks for the secret with all non-alphanumeric characters stripped. This
|
||||||
|
catches separator-injection evasion (`MY-SECRET` → body contains
|
||||||
|
`MY SECRET`).
|
||||||
|
4. A sliding-window partial-match pass checks for long-enough contiguous
|
||||||
|
substrings of the secret's alnum projection in the text's alnum projection.
|
||||||
|
Any match ≥ `PARTIAL_MATCH_MIN_LEN` (12 chars) blocks with reason
|
||||||
|
`"partial match"`.
|
||||||
|
5. A new `scan_entropy` detector flags outbound text windows with Shannon
|
||||||
|
entropy ≥ `ENTROPY_BLOCK_THRESHOLD` (5.5 bits/char) at **warn** severity
|
||||||
|
only. It is registered under the new detector name `"entropy"` in
|
||||||
|
`OUTBOUND_DETECTOR_NAMES` and disabled by default (routes must opt in).
|
||||||
|
6. Binary request bodies are decoded via `latin-1` instead of
|
||||||
|
`utf-8 errors="replace"`, preserving every byte value and allowing
|
||||||
|
ASCII-range secrets to be found within binary payloads.
|
||||||
|
7. All new behaviour is unit-tested; existing tests pass unchanged.
|
||||||
|
|
||||||
|
## Non-goals
|
||||||
|
|
||||||
|
- Rolling per-host buffer for split-across-requests detection (state in the
|
||||||
|
stateless addon is complex; deferred).
|
||||||
|
- Additional vendor regexes.
|
||||||
|
- ML / embedding-based detection.
|
||||||
|
- Entropy-based hard blocks (warn only per the issue).
|
||||||
|
|
||||||
|
## Design
|
||||||
|
|
||||||
|
### Canary token flow
|
||||||
|
|
||||||
|
```
|
||||||
|
Egress.prepare()
|
||||||
|
canary = secrets.token_urlsafe(32)
|
||||||
|
canary_env = <random WORD_WORD_SECRET>
|
||||||
|
EgressPlan(canary=canary, canary_env=canary_env, ...)
|
||||||
|
|
||||||
|
Docker compose render:
|
||||||
|
sidecar env: <canary_env>=<canary>
|
||||||
|
sidecar env: BOT_BOTTLE_SENSITIVE_PREFIXES=<canary_env>
|
||||||
|
agent env: <canary_env>=<canary> ← visible to agent as a "secret"
|
||||||
|
|
||||||
|
macos-container launch: same literals added to sidecar + agent env entries
|
||||||
|
```
|
||||||
|
|
||||||
|
The sidecar uses `BOT_BOTTLE_SENSITIVE_PREFIXES` to make the random canary env
|
||||||
|
name part of the existing `scan_known_secrets` detector without adding a
|
||||||
|
manifest schema field.
|
||||||
|
|
||||||
|
### Broadened known-value scanning
|
||||||
|
|
||||||
|
`scan_known_secrets` gains a `sensitive_prefixes` parameter:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def scan_known_secrets(
|
||||||
|
text: str,
|
||||||
|
*,
|
||||||
|
location: str = "body",
|
||||||
|
env: Mapping[str, str] | None = None,
|
||||||
|
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
|
||||||
|
) -> ScanResult | None:
|
||||||
|
```
|
||||||
|
|
||||||
|
`scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` (comma-separated list
|
||||||
|
of additional prefixes) from `environ` and appends them:
|
||||||
|
|
||||||
|
```python
|
||||||
|
extra = tuple(
|
||||||
|
p for p in environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "").split(",") if p
|
||||||
|
)
|
||||||
|
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
|
||||||
|
```
|
||||||
|
|
||||||
|
`redact_tokens` receives the same treatment for consistent redaction.
|
||||||
|
|
||||||
|
### Fragmentation-resistant matching
|
||||||
|
|
||||||
|
A new helper `_alnum_projection(text)` strips all non-alphanumeric characters.
|
||||||
|
`scan_known_secrets` runs two passes per secret:
|
||||||
|
|
||||||
|
1. **Exact pass** — existing encoded-variant loop (unchanged).
|
||||||
|
2. **Alnum-projection pass** — if the secret's alnum projection has ≥ 8 chars,
|
||||||
|
check if it appears in the text's alnum projection. Match → block with
|
||||||
|
`"fragmented match (separator injection)"` reason.
|
||||||
|
3. **Partial-substring pass** — if the secret's alnum projection has ≥
|
||||||
|
`PARTIAL_MATCH_MIN_LEN` chars (12), slide a window of that length across the
|
||||||
|
secret's projection and look for each window in the text's alnum projection.
|
||||||
|
First match → block with `"partial match"` reason.
|
||||||
|
|
||||||
|
All three passes run only for the `"known_secrets"` detector; the token-pattern
|
||||||
|
and entropy detectors are unchanged.
|
||||||
|
|
||||||
|
### Entropy scoring
|
||||||
|
|
||||||
|
New public function:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def scan_entropy(
|
||||||
|
text: str,
|
||||||
|
*,
|
||||||
|
location: str = "body",
|
||||||
|
window: int = ENTROPY_WINDOW, # 64
|
||||||
|
threshold: float = ENTROPY_BLOCK_THRESHOLD, # 5.5
|
||||||
|
) -> ScanResult | None:
|
||||||
|
```
|
||||||
|
|
||||||
|
Slides a window of `window` characters across `text` in steps of `window // 2`.
|
||||||
|
If any window's Shannon entropy exceeds `threshold`, returns a **warn**-severity
|
||||||
|
`ScanResult`. Never blocks.
|
||||||
|
|
||||||
|
`OUTBOUND_DETECTOR_NAMES` gains `"entropy"`. Routes opt in via their `dlp`
|
||||||
|
block; entropy scanning is **off by default** to avoid false-positive noise on
|
||||||
|
legitimate binary payloads.
|
||||||
|
|
||||||
|
### Binary body handling
|
||||||
|
|
||||||
|
In `scan_outbound`, the bytes → str decoding changes from:
|
||||||
|
|
||||||
|
```python
|
||||||
|
body.decode("utf-8", errors="replace")
|
||||||
|
```
|
||||||
|
|
||||||
|
to:
|
||||||
|
|
||||||
|
```python
|
||||||
|
body.decode("utf-8") if body is str else body.decode("latin-1")
|
||||||
|
```
|
||||||
|
|
||||||
|
`latin-1` is a bijective byte↔codepoint mapping; every byte value is preserved
|
||||||
|
as its corresponding Latin-1 code point, so ASCII-range secret strings remain
|
||||||
|
intact and `str.find` / regex still locate them correctly. The fallback from
|
||||||
|
strict UTF-8 is tried first so valid UTF-8 bodies are decoded faithfully.
|
||||||
|
|
||||||
|
## Implementation
|
||||||
|
|
||||||
|
Delivered in three commits on the same branch:
|
||||||
|
|
||||||
|
1. **DLP detector changes** — `_alnum_projection`, fragmentation passes,
|
||||||
|
`scan_entropy`, broadened `scan_known_secrets`, updated `scan_outbound` and
|
||||||
|
`redact_tokens`; all accompanying unit tests.
|
||||||
|
2. **Canary injection** — `EgressPlan.canary`, `Egress.prepare()`,
|
||||||
|
Docker compose + macos-container backend injection.
|
||||||
|
3. **PRD flip** — `Status: Draft → Active`.
|
||||||
@@ -80,7 +80,11 @@ def _git_gate_plan(upstreams: tuple[GitGateUpstream, ...] = ()) -> GitGatePlan:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan:
|
def _egress_plan(
|
||||||
|
routes: tuple[EgressRoute, ...] = (),
|
||||||
|
*,
|
||||||
|
canary: bool = False,
|
||||||
|
) -> EgressPlan:
|
||||||
token_env_map = {
|
token_env_map = {
|
||||||
r.token_env: r.token_ref
|
r.token_env: r.token_ref
|
||||||
for r in routes
|
for r in routes
|
||||||
@@ -95,6 +99,8 @@ def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan:
|
|||||||
egress_network=f"bot-bottle-egress-{SLUG}",
|
egress_network=f"bot-bottle-egress-{SLUG}",
|
||||||
mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem",
|
mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem",
|
||||||
mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem",
|
mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem",
|
||||||
|
canary="fake-canary-value" if canary else "",
|
||||||
|
canary_env="CANON_ALPHA_SECRET" if canary else "",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -112,6 +118,7 @@ def _plan(
|
|||||||
with_git: bool = False,
|
with_git: bool = False,
|
||||||
with_egress: bool = False,
|
with_egress: bool = False,
|
||||||
supervise: bool = False,
|
supervise: bool = False,
|
||||||
|
canary: bool = False,
|
||||||
) -> DockerBottlePlan:
|
) -> DockerBottlePlan:
|
||||||
"""Build a fully-resolved DockerBottlePlan. Toggles cover the
|
"""Build a fully-resolved DockerBottlePlan. Toggles cover the
|
||||||
matrix the renderer's conditional-service logic branches on."""
|
matrix the renderer's conditional-service logic branches on."""
|
||||||
@@ -150,7 +157,7 @@ def _plan(
|
|||||||
slug=SLUG,
|
slug=SLUG,
|
||||||
forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"},
|
forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"},
|
||||||
git_gate_plan=_git_gate_plan(upstreams),
|
git_gate_plan=_git_gate_plan(upstreams),
|
||||||
egress_plan=_egress_plan(routes),
|
egress_plan=_egress_plan(routes, canary=canary),
|
||||||
supervise_plan=_supervise_plan() if supervise else None,
|
supervise_plan=_supervise_plan() if supervise else None,
|
||||||
use_runsc=False,
|
use_runsc=False,
|
||||||
agent_provision=AgentProvisionPlan(
|
agent_provision=AgentProvisionPlan(
|
||||||
@@ -375,6 +382,20 @@ class TestSidecarBundleShape(unittest.TestCase):
|
|||||||
env_strings = sc["environment"]
|
env_strings = sc["environment"]
|
||||||
self.assertNotIn("EGRESS_TOKEN_0", env_strings)
|
self.assertNotIn("EGRESS_TOKEN_0", env_strings)
|
||||||
|
|
||||||
|
def test_canary_env_registered_as_sensitive_in_sidecar(self):
|
||||||
|
sc = self._render(canary=True)["services"]["sidecars"]
|
||||||
|
env_strings = sc["environment"]
|
||||||
|
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
|
||||||
|
self.assertIn(
|
||||||
|
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
|
||||||
|
env_strings,
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_canary_env_visible_to_agent(self):
|
||||||
|
agent = self._render(canary=True)["services"]["agent"]
|
||||||
|
env_strings = agent["environment"]
|
||||||
|
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
|
||||||
|
|
||||||
def test_supervise_env_present_when_active(self):
|
def test_supervise_env_present_when_active(self):
|
||||||
sc = self._render(supervise=True)["services"]["sidecars"]
|
sc = self._render(supervise=True)["services"]["sidecars"]
|
||||||
env_strings = sc["environment"]
|
env_strings = sc["environment"]
|
||||||
|
|||||||
@@ -1,18 +1,23 @@
|
|||||||
"""Unit: DLP detectors (PRD 0053).
|
"""Unit: DLP detectors (PRD 0053).
|
||||||
|
|
||||||
Tests for token pattern scanning, known secret detection, and
|
Tests for token pattern scanning, known secret detection, fragmentation-
|
||||||
naive prompt injection detection."""
|
resistant matching, entropy scoring, and naive prompt injection detection."""
|
||||||
|
|
||||||
import base64
|
import base64
|
||||||
import gzip
|
import gzip
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from bot_bottle.dlp_detectors import (
|
from bot_bottle.dlp_detectors import (
|
||||||
|
ENTROPY_BLOCK_THRESHOLD,
|
||||||
|
PARTIAL_MATCH_MIN_LEN,
|
||||||
REDACT,
|
REDACT,
|
||||||
|
_alnum_projection,
|
||||||
_encoded_variants,
|
_encoded_variants,
|
||||||
_normalize_text,
|
_normalize_text,
|
||||||
|
_shannon_entropy,
|
||||||
redact_tokens,
|
redact_tokens,
|
||||||
scan_crlf_injection,
|
scan_crlf_injection,
|
||||||
|
scan_entropy,
|
||||||
scan_known_secrets,
|
scan_known_secrets,
|
||||||
scan_naive_injection,
|
scan_naive_injection,
|
||||||
scan_token_patterns,
|
scan_token_patterns,
|
||||||
@@ -502,6 +507,191 @@ class TestStripCrlf(unittest.TestCase):
|
|||||||
from bot_bottle.dlp_detectors import strip_crlf
|
from bot_bottle.dlp_detectors import strip_crlf
|
||||||
self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello"))
|
self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello"))
|
||||||
|
|
||||||
|
class TestAlnumProjection(unittest.TestCase):
|
||||||
|
def test_alphanumeric_unchanged(self):
|
||||||
|
self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ"))
|
||||||
|
|
||||||
|
def test_strips_hyphens(self):
|
||||||
|
self.assertEqual("mysecretvalue", _alnum_projection("my-secret-value"))
|
||||||
|
|
||||||
|
def test_strips_spaces(self):
|
||||||
|
self.assertEqual("mysecretvalue", _alnum_projection("my secret value"))
|
||||||
|
|
||||||
|
def test_strips_dots_and_underscores(self):
|
||||||
|
self.assertEqual("mysecretvalue", _alnum_projection("my.secret_value"))
|
||||||
|
|
||||||
|
def test_empty_string(self):
|
||||||
|
self.assertEqual("", _alnum_projection(""))
|
||||||
|
|
||||||
|
def test_all_special_chars(self):
|
||||||
|
self.assertEqual("", _alnum_projection("!@#$%^&*()"))
|
||||||
|
|
||||||
|
|
||||||
|
class TestFragmentationResistantMatching(unittest.TestCase):
|
||||||
|
"""scan_known_secrets catches separator-injection and partial-substring evasion."""
|
||||||
|
|
||||||
|
# Secrets long enough that their alnum projections are ≥ 8 chars.
|
||||||
|
SECRET = "supersecrettoken99"
|
||||||
|
ENV = {"EGRESS_TOKEN_0": SECRET}
|
||||||
|
|
||||||
|
def test_exact_match_still_works(self):
|
||||||
|
result = scan_known_secrets(f"key={self.SECRET}", env=self.ENV)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
|
||||||
|
def test_separator_injection_blocked(self):
|
||||||
|
# Hyphens inserted between chars of the secret.
|
||||||
|
fragmented = "-".join(self.SECRET)
|
||||||
|
result = scan_known_secrets(f"data={fragmented}", env=self.ENV)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
self.assertIn("separator injection", result.reason)
|
||||||
|
|
||||||
|
def test_space_separator_blocked(self):
|
||||||
|
fragmented = " ".join(self.SECRET)
|
||||||
|
result = scan_known_secrets(f"body: {fragmented}", env=self.ENV)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertIn("separator injection", result.reason)
|
||||||
|
|
||||||
|
def test_partial_substring_blocked(self):
|
||||||
|
# First PARTIAL_MATCH_MIN_LEN alnum chars of the secret, no separators.
|
||||||
|
partial = _alnum_projection(self.SECRET)[:PARTIAL_MATCH_MIN_LEN]
|
||||||
|
result = scan_known_secrets(f"x={partial}&y=other", env=self.ENV)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
self.assertIn("partial match", result.reason)
|
||||||
|
|
||||||
|
def test_short_secret_skips_projection(self):
|
||||||
|
# Secrets shorter than _ALNUM_MIN_LEN in alnum projection are not
|
||||||
|
# fragmentation-checked (too many false positives).
|
||||||
|
short_env = {"EGRESS_TOKEN_0": "abc"}
|
||||||
|
# "a b c" has alnum projection "abc" (3 chars, < 8); should not block.
|
||||||
|
self.assertIsNone(scan_known_secrets("a b c", env=short_env))
|
||||||
|
|
||||||
|
def test_clean_text_not_blocked(self):
|
||||||
|
self.assertIsNone(scan_known_secrets("nothing to see here", env=self.ENV))
|
||||||
|
|
||||||
|
def test_sensitive_prefixes_param_extra_prefix(self):
|
||||||
|
env = {"MY_CRED_0": self.SECRET, "IGNORED": "other"}
|
||||||
|
result = scan_known_secrets(
|
||||||
|
f"key={self.SECRET}",
|
||||||
|
env=env,
|
||||||
|
sensitive_prefixes=("MY_CRED_",),
|
||||||
|
)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertIn("MY_CRED_0", result.reason)
|
||||||
|
|
||||||
|
def test_sensitive_prefixes_default_only_egress_token(self):
|
||||||
|
# A value under a non-EGRESS_TOKEN_ key is ignored with default prefixes.
|
||||||
|
env = {"MY_CRED_0": self.SECRET}
|
||||||
|
self.assertIsNone(scan_known_secrets(f"key={self.SECRET}", env=env))
|
||||||
|
|
||||||
|
def test_canary_prefix_detected(self):
|
||||||
|
canary_value = "canary-fake-secret-value-xyz"
|
||||||
|
env = {"CANON_ALPHA_SECRET": canary_value}
|
||||||
|
result = scan_known_secrets(
|
||||||
|
f"x={canary_value}",
|
||||||
|
env=env,
|
||||||
|
sensitive_prefixes=("CANON_ALPHA_SECRET",),
|
||||||
|
)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertIn("CANON_ALPHA_SECRET", result.reason)
|
||||||
|
|
||||||
|
|
||||||
|
class TestRedactTokensBroadenedPrefixes(unittest.TestCase):
|
||||||
|
SECRET = "my-provisioned-secret"
|
||||||
|
|
||||||
|
def test_default_redacts_egress_token(self):
|
||||||
|
env = {"EGRESS_TOKEN_0": self.SECRET}
|
||||||
|
out = redact_tokens(f"val={self.SECRET}", env=env)
|
||||||
|
self.assertNotIn(self.SECRET, out)
|
||||||
|
self.assertIn(REDACT, out)
|
||||||
|
|
||||||
|
def test_extra_prefix_redacted(self):
|
||||||
|
env = {"MY_SECRET_KEY": self.SECRET}
|
||||||
|
out = redact_tokens(
|
||||||
|
f"val={self.SECRET}",
|
||||||
|
env=env,
|
||||||
|
sensitive_prefixes=("MY_SECRET_",),
|
||||||
|
)
|
||||||
|
self.assertNotIn(self.SECRET, out)
|
||||||
|
self.assertIn(REDACT, out)
|
||||||
|
|
||||||
|
def test_non_matching_prefix_not_redacted(self):
|
||||||
|
env = {"MY_SECRET_KEY": self.SECRET}
|
||||||
|
out = redact_tokens(f"val={self.SECRET}", env=env)
|
||||||
|
# Default prefixes only include EGRESS_TOKEN_ → secret not redacted
|
||||||
|
self.assertIn(self.SECRET, out)
|
||||||
|
|
||||||
|
|
||||||
|
class TestShannonEntropy(unittest.TestCase):
|
||||||
|
def test_empty_string_zero(self):
|
||||||
|
self.assertEqual(0.0, _shannon_entropy(""))
|
||||||
|
|
||||||
|
def test_single_char_zero(self):
|
||||||
|
self.assertEqual(0.0, _shannon_entropy("aaaaaa"))
|
||||||
|
|
||||||
|
def test_two_equal_chars_one_bit(self):
|
||||||
|
self.assertAlmostEqual(1.0, _shannon_entropy("abababab"), places=10)
|
||||||
|
|
||||||
|
def test_high_entropy_random_like(self):
|
||||||
|
# Uniform 64-char string over 64 distinct symbols has entropy 6 bits.
|
||||||
|
import string
|
||||||
|
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||||
|
text = alphabet # each char appears exactly once
|
||||||
|
self.assertAlmostEqual(6.0, _shannon_entropy(text), places=10)
|
||||||
|
|
||||||
|
|
||||||
|
class TestScanEntropy(unittest.TestCase):
|
||||||
|
def test_empty_returns_none(self):
|
||||||
|
self.assertIsNone(scan_entropy(""))
|
||||||
|
|
||||||
|
def test_low_entropy_returns_none(self):
|
||||||
|
# Highly repetitive text has low entropy.
|
||||||
|
self.assertIsNone(scan_entropy("a" * 200))
|
||||||
|
|
||||||
|
def test_high_entropy_warns(self):
|
||||||
|
# Build a 64-char string with entropy > ENTROPY_BLOCK_THRESHOLD.
|
||||||
|
# Use all 64 distinct printable chars to maximise entropy (~6 bits).
|
||||||
|
import string
|
||||||
|
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||||
|
result = scan_entropy(alphabet, threshold=ENTROPY_BLOCK_THRESHOLD)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("warn", result.severity)
|
||||||
|
self.assertIn("high-entropy", result.reason)
|
||||||
|
|
||||||
|
def test_never_blocks(self):
|
||||||
|
import string
|
||||||
|
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||||
|
result = scan_entropy(alphabet)
|
||||||
|
# scan_entropy is warn-only; it must never return severity="block".
|
||||||
|
if result is not None:
|
||||||
|
self.assertNotEqual("block", result.severity)
|
||||||
|
|
||||||
|
def test_location_in_result(self):
|
||||||
|
import string
|
||||||
|
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||||
|
result = scan_entropy(alphabet, location="authorization header")
|
||||||
|
if result is not None:
|
||||||
|
self.assertIn("authorization header", result.location)
|
||||||
|
|
||||||
|
def test_structured_json_no_warn(self):
|
||||||
|
# Typical JSON has low entropy and should not be flagged.
|
||||||
|
json_body = '{"status": "ok", "message": "hello world", "count": 42}'
|
||||||
|
self.assertIsNone(scan_entropy(json_body))
|
||||||
|
|
||||||
|
def test_short_text_below_window(self):
|
||||||
|
# Text shorter than the window: checked as one chunk.
|
||||||
|
# Use a uniform string to ensure it won't be flagged.
|
||||||
|
self.assertIsNone(scan_entropy("abcde", threshold=ENTROPY_BLOCK_THRESHOLD))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -1,15 +1,21 @@
|
|||||||
"""Unit: Egress route lift + routes.yaml render + token
|
"""Unit: Egress route lift + routes.yaml render + token
|
||||||
resolution (PRD 0017, PRD 0053)."""
|
resolution (PRD 0017, PRD 0053)."""
|
||||||
|
|
||||||
|
import tempfile
|
||||||
import unittest
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from bot_bottle.egress import (
|
from bot_bottle.egress import (
|
||||||
CODEX_HOST_CREDENTIAL_TOKEN_REF,
|
CODEX_HOST_CREDENTIAL_TOKEN_REF,
|
||||||
|
Egress,
|
||||||
|
EgressPlan,
|
||||||
EgressRoute,
|
EgressRoute,
|
||||||
|
egress_agent_env_entries,
|
||||||
egress_manifest_routes,
|
egress_manifest_routes,
|
||||||
egress_render_routes,
|
egress_render_routes,
|
||||||
egress_resolve_token_values,
|
egress_resolve_token_values,
|
||||||
egress_routes_for_bottle,
|
egress_routes_for_bottle,
|
||||||
|
egress_sidecar_env_entries,
|
||||||
egress_token_env_map,
|
egress_token_env_map,
|
||||||
)
|
)
|
||||||
from bot_bottle.log import Die
|
from bot_bottle.log import Die
|
||||||
@@ -443,5 +449,119 @@ class TestResolveTokenValues(unittest.TestCase):
|
|||||||
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
|
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCanaryGeneration(unittest.TestCase):
|
||||||
|
"""Egress.prepare() generates a unique canary token per session."""
|
||||||
|
|
||||||
|
def _bottle_obj(self):
|
||||||
|
return ManifestIndex.from_json_obj({
|
||||||
|
"bottles": {"dev": {"egress": {"routes": []}}},
|
||||||
|
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||||
|
}).bottles["dev"]
|
||||||
|
|
||||||
|
def _make_plan(self) -> EgressPlan:
|
||||||
|
# Use a concrete no-op subclass so we can call prepare() without
|
||||||
|
# a real backend.
|
||||||
|
class _TestEgress(Egress):
|
||||||
|
pass
|
||||||
|
|
||||||
|
e = _TestEgress()
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
return e.prepare(self._bottle_obj(), "test-slug", Path(td))
|
||||||
|
|
||||||
|
def test_canary_is_non_empty(self):
|
||||||
|
plan = self._make_plan()
|
||||||
|
self.assertIsInstance(plan.canary, str)
|
||||||
|
self.assertGreater(len(plan.canary), 0)
|
||||||
|
self.assertRegex(plan.canary_env, r"^[A-Z]+_[A-Z]+_SECRET$")
|
||||||
|
|
||||||
|
def test_canary_is_unique_per_session(self):
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
bottle = self._bottle_obj()
|
||||||
|
|
||||||
|
class _TestEgress(Egress):
|
||||||
|
pass
|
||||||
|
|
||||||
|
e = _TestEgress()
|
||||||
|
plan_a = e.prepare(bottle, "slug-a", Path(td))
|
||||||
|
plan_b = e.prepare(bottle, "slug-b", Path(td))
|
||||||
|
self.assertNotEqual(plan_a.canary, plan_b.canary)
|
||||||
|
|
||||||
|
def test_canary_detected_by_scan_known_secrets(self):
|
||||||
|
from bot_bottle.dlp_detectors import scan_known_secrets
|
||||||
|
|
||||||
|
plan = self._make_plan()
|
||||||
|
env = {plan.canary_env: plan.canary}
|
||||||
|
result = scan_known_secrets(
|
||||||
|
f"exfil={plan.canary}",
|
||||||
|
env=env,
|
||||||
|
sensitive_prefixes=(plan.canary_env,),
|
||||||
|
)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
self.assertIn(plan.canary_env, result.reason)
|
||||||
|
|
||||||
|
def test_egress_plan_canary_field_default_empty(self):
|
||||||
|
# Verify EgressPlan can be constructed with an empty canary (backward compat).
|
||||||
|
from pathlib import Path
|
||||||
|
plan = EgressPlan(
|
||||||
|
slug="s",
|
||||||
|
routes_path=Path("/tmp/r.yaml"),
|
||||||
|
routes=(),
|
||||||
|
token_env_map={},
|
||||||
|
)
|
||||||
|
self.assertEqual("", plan.canary)
|
||||||
|
self.assertEqual("", plan.canary_env)
|
||||||
|
|
||||||
|
|
||||||
|
class TestEgressEnvEntries(unittest.TestCase):
|
||||||
|
def test_sidecar_entries_include_route_tokens_and_canary_scan_prefix(self):
|
||||||
|
plan = EgressPlan(
|
||||||
|
slug="s",
|
||||||
|
routes_path=Path("/tmp/r.yaml"),
|
||||||
|
routes=(EgressRoute(host="api.example"),),
|
||||||
|
token_env_map={"EGRESS_TOKEN_1": "T1", "EGRESS_TOKEN_0": "T0"},
|
||||||
|
canary="fake-canary-value",
|
||||||
|
canary_env="CANON_ALPHA_SECRET",
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
(
|
||||||
|
"EGRESS_TOKEN_0",
|
||||||
|
"EGRESS_TOKEN_1",
|
||||||
|
"CANON_ALPHA_SECRET=fake-canary-value",
|
||||||
|
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
|
||||||
|
),
|
||||||
|
egress_sidecar_env_entries(plan),
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_agent_entries_include_only_canary_bait(self):
|
||||||
|
plan = EgressPlan(
|
||||||
|
slug="s",
|
||||||
|
routes_path=Path("/tmp/r.yaml"),
|
||||||
|
routes=(),
|
||||||
|
token_env_map={},
|
||||||
|
canary="fake-canary-value",
|
||||||
|
canary_env="CANON_ALPHA_SECRET",
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
("CANON_ALPHA_SECRET=fake-canary-value",),
|
||||||
|
egress_agent_env_entries(plan),
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_canary_entries_omitted_when_name_missing(self):
|
||||||
|
plan = EgressPlan(
|
||||||
|
slug="s",
|
||||||
|
routes_path=Path("/tmp/r.yaml"),
|
||||||
|
routes=(),
|
||||||
|
token_env_map={},
|
||||||
|
canary="fake-canary-value",
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual((), egress_sidecar_env_entries(plan))
|
||||||
|
self.assertEqual((), egress_agent_env_entries(plan))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -1273,6 +1273,109 @@ class TestBuildTokenAllowPayload(unittest.TestCase):
|
|||||||
result = ScanResult(severity="block", reason="r", matched="x")
|
result = ScanResult(severity="block", reason="r", matched="x")
|
||||||
payload = build_token_allow_payload("h", "GET", "/", result)
|
payload = build_token_allow_payload("h", "GET", "/", result)
|
||||||
self.assertNotIn("context:", payload)
|
self.assertNotIn("context:", payload)
|
||||||
|
class TestScanOutboundEnhanced(unittest.TestCase):
|
||||||
|
"""scan_outbound changes: binary decode, entropy detector,
|
||||||
|
broadened known-value prefixes, fragmentation resistance."""
|
||||||
|
|
||||||
|
_ROUTE = Route(host="api.example.com")
|
||||||
|
_ROUTE_ENTROPY = Route(
|
||||||
|
host="api.example.com",
|
||||||
|
outbound_detectors=("entropy",),
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_binary_body_latin1_decode_finds_ascii_secret(self):
|
||||||
|
# Body contains valid ASCII secret surrounded by non-UTF-8 bytes.
|
||||||
|
secret = "supersecrettoken99"
|
||||||
|
env = {"EGRESS_TOKEN_0": secret}
|
||||||
|
# Wrap the secret in bytes that are invalid UTF-8.
|
||||||
|
body = b"\x80\x81" + secret.encode("ascii") + b"\xff"
|
||||||
|
result = scan_outbound(self._ROUTE, body, env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
|
||||||
|
def test_binary_body_valid_utf8_decoded_correctly(self):
|
||||||
|
env = {"EGRESS_TOKEN_0": "mysecret"}
|
||||||
|
# Valid UTF-8 body — should be decoded as UTF-8, not latin-1.
|
||||||
|
body = "clean body with mysecret".encode("utf-8")
|
||||||
|
result = scan_outbound(self._ROUTE, body, env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
|
||||||
|
def test_entropy_detector_off_by_default(self):
|
||||||
|
import string
|
||||||
|
# High-entropy content should NOT warn if the route has no entropy detector.
|
||||||
|
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||||
|
result = scan_outbound(self._ROUTE, alphabet, {})
|
||||||
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
def test_entropy_detector_warns_when_enabled(self):
|
||||||
|
import string
|
||||||
|
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||||
|
result = scan_outbound(self._ROUTE_ENTROPY, alphabet, {})
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("warn", result.severity)
|
||||||
|
|
||||||
|
def test_bot_bottle_sensitive_prefixes_env_var(self):
|
||||||
|
# When the sidecar env contains BOT_BOTTLE_SENSITIVE_PREFIXES,
|
||||||
|
# scan_outbound should scan those additional prefixes.
|
||||||
|
secret = "extra-sensitive-value-abc"
|
||||||
|
env = {
|
||||||
|
"MY_CRED_KEY": secret,
|
||||||
|
"BOT_BOTTLE_SENSITIVE_PREFIXES": "MY_CRED_",
|
||||||
|
}
|
||||||
|
result = scan_outbound(self._ROUTE, f"x={secret}", env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
|
||||||
|
def test_bot_bottle_sensitive_prefixes_multiple(self):
|
||||||
|
secret = "my-api-key-value-xyz"
|
||||||
|
env = {
|
||||||
|
"ANTHROPIC_API_0": secret,
|
||||||
|
"BOT_BOTTLE_SENSITIVE_PREFIXES": "ANTHROPIC_API_,OTHER_",
|
||||||
|
}
|
||||||
|
result = scan_outbound(self._ROUTE, f"auth={secret}", env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
|
||||||
|
def test_canary_detected_via_random_secret_env_name(self):
|
||||||
|
# The fake secret uses a randomized env name that the sidecar marks
|
||||||
|
# as sensitive through BOT_BOTTLE_SENSITIVE_PREFIXES.
|
||||||
|
canary = "canaryvalue12345abcdef"
|
||||||
|
env = {
|
||||||
|
"CANON_ALPHA_SECRET": canary,
|
||||||
|
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
|
||||||
|
}
|
||||||
|
result = scan_outbound(self._ROUTE, f"data={canary}", env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
self.assertIn("CANON_ALPHA_SECRET", result.reason)
|
||||||
|
|
||||||
|
def test_fragmented_canary_blocked(self):
|
||||||
|
# Canary with separators injected is still caught.
|
||||||
|
canary = "supersecretcanary99"
|
||||||
|
env = {
|
||||||
|
"CANON_ALPHA_SECRET": canary,
|
||||||
|
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
|
||||||
|
}
|
||||||
|
fragmented = "-".join(canary)
|
||||||
|
result = scan_outbound(self._ROUTE, f"x={fragmented}", env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
|
||||||
|
|
||||||
|
class TestOutboundDetectorNames(unittest.TestCase):
|
||||||
|
def test_entropy_in_outbound_detector_names(self):
|
||||||
|
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
|
||||||
|
self.assertIn("entropy", OUTBOUND_DETECTOR_NAMES)
|
||||||
|
|
||||||
|
def test_known_secrets_in_outbound_detector_names(self):
|
||||||
|
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
|
||||||
|
self.assertIn("known_secrets", OUTBOUND_DETECTOR_NAMES)
|
||||||
|
|
||||||
|
def test_token_patterns_in_outbound_detector_names(self):
|
||||||
|
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
|
||||||
|
self.assertIn("token_patterns", OUTBOUND_DETECTOR_NAMES)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ def _plan(
|
|||||||
supervise: bool = False,
|
supervise: bool = False,
|
||||||
agent_git_gate_url: str = "",
|
agent_git_gate_url: str = "",
|
||||||
agent_supervise_url: str = "",
|
agent_supervise_url: str = "",
|
||||||
|
canary: bool = False,
|
||||||
) -> MacosContainerBottlePlan:
|
) -> MacosContainerBottlePlan:
|
||||||
routes_path = stage_dir / "routes.yaml"
|
routes_path = stage_dir / "routes.yaml"
|
||||||
routes_path.write_text("routes: []\n", encoding="utf-8")
|
routes_path.write_text("routes: []\n", encoding="utf-8")
|
||||||
@@ -42,6 +43,8 @@ def _plan(
|
|||||||
routes_path=routes_path,
|
routes_path=routes_path,
|
||||||
routes=("route",),
|
routes=("route",),
|
||||||
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
|
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
|
||||||
|
canary="fake-canary-value" if canary else "",
|
||||||
|
canary_env="CANON_ALPHA_SECRET" if canary else "",
|
||||||
)
|
)
|
||||||
if git:
|
if git:
|
||||||
key_path = stage_dir / "origin-key"
|
key_path = stage_dir / "origin-key"
|
||||||
@@ -138,6 +141,26 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
|
|||||||
argv,
|
argv,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_sidecar_argv_registers_canary_env_as_sensitive(self):
|
||||||
|
plan = _plan(stage_dir=self.stage_dir, canary=True)
|
||||||
|
argv = launch._sidecar_run_argv(
|
||||||
|
plan,
|
||||||
|
"bot-bottle-sidecars-dev-abc",
|
||||||
|
"bot-bottle-net-dev-abc",
|
||||||
|
"bot-bottle-egress-dev-abc",
|
||||||
|
)
|
||||||
|
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
|
||||||
|
self.assertIn("BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", argv)
|
||||||
|
|
||||||
|
def test_agent_argv_receives_canary_env(self):
|
||||||
|
plan = _plan(stage_dir=self.stage_dir, canary=True)
|
||||||
|
argv = launch._agent_run_argv(
|
||||||
|
plan,
|
||||||
|
"bot-bottle-net-dev-abc",
|
||||||
|
"192.0.2.10",
|
||||||
|
)
|
||||||
|
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
|
||||||
|
|
||||||
def test_agent_env_points_proxy_at_sidecar_ip(self):
|
def test_agent_env_points_proxy_at_sidecar_ip(self):
|
||||||
plan = _plan(
|
plan = _plan(
|
||||||
stage_dir=self.stage_dir,
|
stage_dir=self.stage_dir,
|
||||||
@@ -271,7 +294,7 @@ def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan:
|
|||||||
manifest=_MANIFEST,
|
manifest=_MANIFEST,
|
||||||
stage_dir=stage_dir,
|
stage_dir=stage_dir,
|
||||||
git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())),
|
git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())),
|
||||||
egress_plan=cast(EgressPlan, SimpleNamespace()),
|
egress_plan=cast(EgressPlan, SimpleNamespace(canary="")),
|
||||||
supervise_plan=None,
|
supervise_plan=None,
|
||||||
agent_provision=AgentProvisionPlan(
|
agent_provision=AgentProvisionPlan(
|
||||||
template="claude",
|
template="claude",
|
||||||
|
|||||||
@@ -26,9 +26,7 @@ from bot_bottle.backend.smolmachines.bottle import SmolmachinesBottle
|
|||||||
from bot_bottle.backend.smolmachines.bottle_plan import (
|
from bot_bottle.backend.smolmachines.bottle_plan import (
|
||||||
SmolmachinesBottlePlan,
|
SmolmachinesBottlePlan,
|
||||||
)
|
)
|
||||||
# from bot_bottle.backend.smolmachines.provision import (
|
from bot_bottle.backend.smolmachines import launch as _launch
|
||||||
# workspace as _workspace,
|
|
||||||
# )
|
|
||||||
from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec
|
from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec
|
||||||
from bot_bottle.backend.util import AGENT_CA_PATH
|
from bot_bottle.backend.util import AGENT_CA_PATH
|
||||||
from bot_bottle.egress import EgressPlan, EgressRoute
|
from bot_bottle.egress import EgressPlan, EgressRoute
|
||||||
@@ -86,6 +84,7 @@ def _plan(
|
|||||||
stage_dir: Path | None = None,
|
stage_dir: Path | None = None,
|
||||||
egress_routes: tuple[EgressRoute, ...] = (),
|
egress_routes: tuple[EgressRoute, ...] = (),
|
||||||
egress_ca_path: Path = Path(),
|
egress_ca_path: Path = Path(),
|
||||||
|
canary: bool = False,
|
||||||
supervise: bool = False,
|
supervise: bool = False,
|
||||||
bundle_ip: str = "192.168.50.2",
|
bundle_ip: str = "192.168.50.2",
|
||||||
agent_git_gate_host: str = "127.0.0.1:55555",
|
agent_git_gate_host: str = "127.0.0.1:55555",
|
||||||
@@ -156,6 +155,8 @@ def _plan(
|
|||||||
routes=egress_routes,
|
routes=egress_routes,
|
||||||
token_env_map={},
|
token_env_map={},
|
||||||
mitmproxy_ca_cert_only_host_path=egress_ca_path,
|
mitmproxy_ca_cert_only_host_path=egress_ca_path,
|
||||||
|
canary="fake-canary-value" if canary else "",
|
||||||
|
canary_env="CANON_ALPHA_SECRET" if canary else "",
|
||||||
),
|
),
|
||||||
supervise_plan=supervise_plan,
|
supervise_plan=supervise_plan,
|
||||||
agent_git_gate_host=agent_git_gate_host,
|
agent_git_gate_host=agent_git_gate_host,
|
||||||
@@ -411,6 +412,31 @@ class TestBundleLaunchSpec(unittest.TestCase):
|
|||||||
self.assertIn(9420, spec.ports_to_publish)
|
self.assertIn(9420, spec.ports_to_publish)
|
||||||
self.assertNotIn(9418, spec.ports_to_publish)
|
self.assertNotIn(9418, spec.ports_to_publish)
|
||||||
|
|
||||||
|
def test_canary_env_registered_as_sensitive_in_bundle(self):
|
||||||
|
plan = _plan(canary=True)
|
||||||
|
|
||||||
|
spec = _bundle_launch_spec(plan, "net", "127.0.0.16")
|
||||||
|
|
||||||
|
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", spec.environment)
|
||||||
|
self.assertIn(
|
||||||
|
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
|
||||||
|
spec.environment,
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_canary_env_visible_to_smolvm_guest(self):
|
||||||
|
plan = _plan(canary=True)
|
||||||
|
with patch.object(
|
||||||
|
_launch._bundle,
|
||||||
|
"bundle_host_port",
|
||||||
|
return_value="65000",
|
||||||
|
):
|
||||||
|
stamped = _launch._discover_urls(plan, "127.0.0.16")
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
"fake-canary-value",
|
||||||
|
stamped.guest_env["CANON_ALPHA_SECRET"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestProvisionGitUser(unittest.TestCase):
|
class TestProvisionGitUser(unittest.TestCase):
|
||||||
"""`provision_git` runs `git config --global` inside the
|
"""`provision_git` runs `git config --global` inside the
|
||||||
|
|||||||
Reference in New Issue
Block a user