Compare commits

..

1 Commits

Author SHA1 Message Date
didericis a6ae6841bb fix: route remote control through provider startup args
lint / lint (push) Successful in 2m20s
test / unit (pull_request) Successful in 47s
test / integration (pull_request) Successful in 28s
2026-06-25 00:30:50 -04:00
22 changed files with 94 additions and 1417 deletions
+2 -4
View File
@@ -28,8 +28,6 @@ from typing import Any
from ...egress import ( from ...egress import (
EGRESS_HOSTNAME, EGRESS_HOSTNAME,
EGRESS_ROUTES_IN_CONTAINER, EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_sidecar_env_entries,
) )
from ...git_gate import GIT_GATE_HOSTNAME from ...git_gate import GIT_GATE_HOSTNAME
from ...log import die, warn from ...log import die, warn
@@ -137,7 +135,8 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER)) volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER))
if ep.routes: if ep.routes:
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent))) volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
env.extend(egress_sidecar_env_entries(ep)) for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
# --- git-gate ----------------------------------------------------- # --- git-gate -----------------------------------------------------
gp = plan.git_gate_plan gp = plan.git_gate_plan
@@ -221,7 +220,6 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
# never lands on argv or in the compose file. # never lands on argv or in the compose file.
for name in sorted(plan.forwarded_env.keys()): for name in sorted(plan.forwarded_env.keys()):
env.append(name) env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
service: dict[str, Any] = { service: dict[str, Any] = {
"image": plan.image, "image": plan.image,
+2 -6
View File
@@ -11,7 +11,7 @@ from pathlib import Path
from ..bottle_state import egress_state_dir from ..bottle_state import egress_state_dir
from ..egress import EGRESS_ROUTES_FILENAME from ..egress import EGRESS_ROUTES_FILENAME
from ..egress_addon_core import LOG_OFF, load_config from ..egress_addon_core import load_routes
class EgressApplyError(RuntimeError): class EgressApplyError(RuntimeError):
@@ -33,15 +33,11 @@ class EgressApplicator(ABC):
@staticmethod @staticmethod
def validate_routes_content(content: str) -> None: def validate_routes_content(content: str) -> None:
try: try:
config = load_config(content) load_routes(content)
except ValueError as e: except ValueError as e:
raise EgressApplyError( raise EgressApplyError(
f"proposed routes.yaml is not valid: {e}" f"proposed routes.yaml is not valid: {e}"
) from e ) from e
if config.log != LOG_OFF:
raise EgressApplyError(
"proposed routes.yaml must not change egress logging"
)
@staticmethod @staticmethod
def _routes_path(slug: str) -> Path: def _routes_path(slug: str) -> Path:
+4 -8
View File
@@ -22,12 +22,7 @@ from ...bottle_state import (
git_gate_state_dir, git_gate_state_dir,
read_committed_image, read_committed_image,
) )
from ...egress import ( from ...egress import EGRESS_ROUTES_IN_CONTAINER, egress_resolve_token_values
EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values,
egress_sidecar_env_entries,
)
from ...git_gate import revoke_git_gate_provisioned_keys from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import die, info, warn from ...log import die, info, warn
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
@@ -355,7 +350,9 @@ def _sidecar_daemons(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]: def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
env: list[str] = list(egress_sidecar_env_entries(plan.egress_plan)) env: list[str] = []
if plan.egress_plan.routes:
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
if plan.git_gate_plan.upstreams: if plan.git_gate_plan.upstreams:
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}") env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
if plan.supervise_plan is not None: if plan.supervise_plan is not None:
@@ -423,7 +420,6 @@ def _agent_env_entries(
env.append(f"{name}={value}") env.append(f"{name}={value}")
for name in sorted(plan.forwarded_env.keys()): for name in sorted(plan.forwarded_env.keys()):
env.append(name) env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
return tuple(env) return tuple(env)
@@ -68,11 +68,6 @@ def build_image(ref: str, context: str, *, dockerfile: str = "") -> None:
_ensure_builder_dns() _ensure_builder_dns()
args = [_CONTAINER, "build", "-t", ref, "--dns", dns_server()] args = [_CONTAINER, "build", "-t", ref, "--dns", dns_server()]
if dockerfile: if dockerfile:
# `container build` resolves -f relative to the current working
# directory, not the build context. Anchor a relative Dockerfile to
# the context so builds work from any cwd.
if not os.path.isabs(dockerfile):
dockerfile = os.path.join(context, dockerfile)
args.extend(["-f", dockerfile]) args.extend(["-f", dockerfile])
args.append(context) args.append(context)
subprocess.run(args, check=True) subprocess.run(args, check=True)
+5 -6
View File
@@ -23,9 +23,7 @@ from typing import Callable, Generator
from ...egress import ( from ...egress import (
EGRESS_ROUTES_IN_CONTAINER, EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values, egress_resolve_token_values,
egress_sidecar_env_entries,
) )
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
from ...util import expand_tilde from ...util import expand_tilde
@@ -230,9 +228,6 @@ def _discover_urls(
guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}" guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}"
if agent_supervise_url: if agent_supervise_url:
guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url
for entry in egress_agent_env_entries(plan.egress_plan):
name, value = entry.split("=", 1)
guest_env[name] = value
return dataclasses.replace( return dataclasses.replace(
plan, plan,
@@ -321,7 +316,11 @@ def _bundle_launch_spec(
volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True)) volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True))
if ep.routes: if ep.routes:
volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True)) volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True))
env.extend(egress_sidecar_env_entries(ep)) # Bare-name entries for upstream-token slots. Their values
# come from the docker-run subprocess env (inherited from
# the operator's shell), never landing on argv.
for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
# --- git-gate --------------------------------------------- # --- git-gate ---------------------------------------------
gp = plan.git_gate_plan gp = plan.git_gate_plan
+3 -159
View File
@@ -15,8 +15,6 @@ import gzip
import re import re
import typing import typing
import unicodedata import unicodedata
from math import log2
from collections import Counter
from urllib.parse import quote as url_quote from urllib.parse import quote as url_quote
try: try:
@@ -109,21 +107,20 @@ def redact_tokens(
text: str, text: str,
*, *,
env: typing.Mapping[str, str] | None = None, env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> str: ) -> str:
"""Replace token pattern matches and (if env given) provisioned secrets with REDACT.""" """Replace token pattern matches and (if env given) provisioned secrets with REDACT."""
for _, pattern in TOKEN_PATTERNS: for _, pattern in TOKEN_PATTERNS:
text = pattern.sub(REDACT, text) text = pattern.sub(REDACT, text)
if env is not None: if env is not None:
for key, value in env.items(): for key, value in env.items():
if any(key.startswith(p) for p in sensitive_prefixes) and value: if key.startswith("EGRESS_TOKEN_") and value:
for variant in _encoded_variants(value): for variant in _encoded_variants(value):
text = text.replace(variant, REDACT) text = text.replace(variant, REDACT)
return text return text
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Known secrets detector # Known secrets detector (Phase 1b)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _encoded_variants(secret: str) -> list[str]: def _encoded_variants(secret: str) -> list[str]:
@@ -164,65 +161,18 @@ def _encoded_variants(secret: str) -> list[str]:
return variants return variants
# ---------------------------------------------------------------------------
# Fragmentation-resistant helpers
# ---------------------------------------------------------------------------
# Minimum length of alnum projection for projection-based checks to run.
# Short secrets produce too many false positives in projection space.
_ALNUM_MIN_LEN = 8
# Minimum window length for the partial-substring sliding scan.
PARTIAL_MATCH_MIN_LEN = 12
def _alnum_projection(text: str) -> str:
"""Return text with every non-alphanumeric character stripped.
Used for fragmentation-resistant matching: separator-injected secrets
(spaces, hyphens, dots inserted between characters) are identical to
their originals in alnum projection space.
"""
return "".join(c for c in text if c.isalnum())
def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None:
"""Return the position in text_alnum where any min_len-char window of
secret_alnum first appears, or None.
Slides a window of width min_len across secret_alnum and searches for
each window in text_alnum. The first hit position is returned.
"""
if len(secret_alnum) < min_len or len(text_alnum) < min_len:
return None
for i in range(len(secret_alnum) - min_len + 1):
window = secret_alnum[i:i + min_len]
pos = text_alnum.find(window)
if pos >= 0:
return pos
return None
def scan_known_secrets( def scan_known_secrets(
text: str, text: str,
*, *,
location: str = "body", location: str = "body",
env: typing.Mapping[str, str] | None = None, env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
safe_tokens: typing.AbstractSet[str] | None = None, safe_tokens: typing.AbstractSet[str] | None = None,
) -> ScanResult | None: ) -> ScanResult | None:
if env is None: if env is None:
return None return None
# Pre-compute alnum projection of the scan text once; reused per secret.
text_alnum: str | None = None
for key, value in env.items(): for key, value in env.items():
if not any(key.startswith(p) for p in sensitive_prefixes) or not value: if not key.startswith("EGRESS_TOKEN_") or not value:
continue continue
# Pass 1: exact match across encoded variants (original behaviour).
approved_exact = False
for variant in _encoded_variants(value): for variant in _encoded_variants(value):
pos = text.find(variant) pos = text.find(variant)
if pos >= 0: if pos >= 0:
@@ -230,7 +180,6 @@ def scan_known_secrets(
# (PRD 0062); a different encoding of the same secret is a # (PRD 0062); a different encoding of the same secret is a
# fresh block. # fresh block.
if safe_tokens is not None and variant in safe_tokens: if safe_tokens is not None and variant in safe_tokens:
approved_exact = True
continue continue
return ScanResult( return ScanResult(
severity="block", severity="block",
@@ -239,104 +188,6 @@ def scan_known_secrets(
context=_snippet(text, pos, pos + len(variant)), context=_snippet(text, pos, pos + len(variant)),
matched=variant, matched=variant,
) )
if approved_exact:
# Exact match was found and approved; projection passes would
# fire on the same value, so skip them for this secret.
continue
# Pass 2 & 3: fragmentation-resistant projection checks.
secret_alnum = _alnum_projection(value)
if len(secret_alnum) < _ALNUM_MIN_LEN:
continue
if text_alnum is None:
text_alnum = _alnum_projection(text)
# Pass 2: full alnum-projection exact match (catches separator injection).
pos2 = text_alnum.find(secret_alnum)
if pos2 >= 0:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(fragmented match — separator injection)"
),
location=location,
context=_snippet(text_alnum, pos2, pos2 + len(secret_alnum)),
)
# Pass 3: sliding-window partial match (catches chunked-substring leaks).
pos3 = _find_partial_window(secret_alnum, text_alnum, PARTIAL_MATCH_MIN_LEN)
if pos3 is not None:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(partial match — at least {PARTIAL_MATCH_MIN_LEN} consecutive "
f"alphanumeric chars)"
),
location=location,
context=_snippet(text_alnum, pos3, pos3 + PARTIAL_MATCH_MIN_LEN),
)
return None
# ---------------------------------------------------------------------------
# Entropy detector (warn-only)
# ---------------------------------------------------------------------------
# Sliding window size and step for the entropy scan.
ENTROPY_WINDOW = 64
ENTROPY_STEP = 32
# Bits-per-character threshold. Random ASCII printable ≈ 6.6 bits; random
# lowercase hex ≈ 4 bits; random base64url ≈ 6 bits. 5.5 sits above
# typical structured data (JSON, URLs) while staying below truly random
# content.
ENTROPY_BLOCK_THRESHOLD = 5.5
def _shannon_entropy(text: str) -> float:
if not text:
return 0.0
counts = Counter(text)
n = len(text)
return -sum((c / n) * log2(c / n) for c in counts.values())
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW,
threshold: float = ENTROPY_BLOCK_THRESHOLD,
) -> ScanResult | None:
"""Warn-only detector: flag windows of `window` chars with Shannon entropy
above `threshold` bits per character.
Never blocks; always returns severity='warn'. Disabled by default
routes must opt in via dlp.outbound_detectors=['entropy'].
"""
if not text:
return None
step = max(1, window // 2)
end = len(text)
# Scan overlapping windows; also check the final tail if shorter than window.
positions = list(range(0, end - window + 1, step))
if end < window:
positions = [0]
elif (end - window) % step != 0:
positions.append(end - window)
for i in positions:
chunk = text[i:i + window]
if _shannon_entropy(chunk) >= threshold:
return ScanResult(
severity="warn",
reason=f"high-entropy content in {location} (possible encrypted exfil)",
location=location,
context=_snippet(text, i, i + len(chunk)),
)
return None return None
@@ -455,18 +306,11 @@ def scan_crlf_injection(text: str) -> ScanResult | None:
__all__ = [ __all__ = [
"ENTROPY_BLOCK_THRESHOLD",
"ENTROPY_WINDOW",
"ENTROPY_STEP",
"PARTIAL_MATCH_MIN_LEN",
"REDACT", "REDACT",
"SNIPPET_CONTEXT", "SNIPPET_CONTEXT",
"TOKEN_PATTERNS", "TOKEN_PATTERNS",
"_alnum_projection",
"_shannon_entropy",
"redact_tokens", "redact_tokens",
"scan_crlf_injection", "scan_crlf_injection",
"scan_entropy",
"scan_known_secrets", "scan_known_secrets",
"scan_naive_injection", "scan_naive_injection",
"scan_token_patterns", "scan_token_patterns",
-55
View File
@@ -10,7 +10,6 @@ specific and lives on concrete subclasses (see
from __future__ import annotations from __future__ import annotations
import dataclasses import dataclasses
import secrets
from abc import ABC from abc import ABC
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@@ -35,50 +34,6 @@ EGRESS_HOSTNAME = "egress"
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml" EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name
_CANARY_ENV_WORDS = (
"ACCORD",
"ANCHOR",
"ATLAS",
"CANON",
"CIPHER",
"EMBER",
"FALCON",
"HARBOR",
"LANTERN",
"MARBLE",
"NOVA",
"ORBIT",
"PIVOT",
"RADIUS",
"SUMMIT",
"VECTOR",
)
def _random_canary_env() -> str:
first = secrets.choice(_CANARY_ENV_WORDS)
remaining = tuple(word for word in _CANARY_ENV_WORDS if word != first)
second = secrets.choice(remaining)
return f"{first}_{second}_SECRET"
def egress_sidecar_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return sidecar env entries needed by egress across all backends."""
env: list[str] = []
if plan.routes:
env.extend(sorted(plan.token_env_map.keys()))
if plan.canary and plan.canary_env:
env.append(f"{plan.canary_env}={plan.canary}")
env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.canary_env}")
return tuple(env)
def egress_agent_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return agent-visible egress env entries shared by all backends."""
if plan.canary and plan.canary_env:
return (f"{plan.canary_env}={plan.canary}",)
return ()
@dataclass(frozen=True) @dataclass(frozen=True)
class EgressRoute(Route): class EgressRoute(Route):
@@ -110,8 +65,6 @@ class EgressPlan:
mitmproxy_ca_host_path: Path = Path() mitmproxy_ca_host_path: Path = Path()
mitmproxy_ca_cert_only_host_path: Path = Path() mitmproxy_ca_cert_only_host_path: Path = Path()
log: int = 0 log: int = 0
canary: str = ""
canary_env: str = ""
def egress_manifest_routes( def egress_manifest_routes(
@@ -371,18 +324,12 @@ class Egress(ABC):
routes_path = stage_dir / EGRESS_ROUTES_FILENAME routes_path = stage_dir / EGRESS_ROUTES_FILENAME
routes_path.write_text(egress_render_routes(routes, log=log)) routes_path.write_text(egress_render_routes(routes, log=log))
routes_path.chmod(0o600) routes_path.chmod(0o600)
# Generate a per-session fake secret under a plausible random env name.
# The sidecar marks that exact env name as sensitive for known-secret
# scanning; the agent receives the same name/value as exfil bait.
canary = secrets.token_urlsafe(32)
return EgressPlan( return EgressPlan(
slug=slug, slug=slug,
routes_path=routes_path, routes_path=routes_path,
routes=routes, routes=routes,
token_env_map=egress_token_env_map(routes), token_env_map=egress_token_env_map(routes),
log=log, log=log,
canary=canary,
canary_env=_random_canary_env(),
) )
__all__ = [ __all__ = [
@@ -397,7 +344,5 @@ __all__ = [
"egress_render_routes", "egress_render_routes",
"egress_resolve_token_values", "egress_resolve_token_values",
"egress_routes_for_bottle", "egress_routes_for_bottle",
"egress_agent_env_entries",
"egress_sidecar_env_entries",
"egress_token_env_map", "egress_token_env_map",
] ]
+4 -15
View File
@@ -160,37 +160,26 @@ class EgressAddon:
) )
def _log_request(self, flow: http.HTTPFlow) -> None: def _log_request(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.request.headers.items()
if k.lower() != "authorization"
}
body = redact_tokens(flow.request.get_text(strict=False) or "", env=os.environ)
sys.stderr.write( sys.stderr.write(
json.dumps({ json.dumps({
"event": "egress_request", "event": "egress_request",
"host": redact_tokens(flow.request.pretty_host, env=os.environ), "host": redact_tokens(flow.request.pretty_host, env=os.environ),
"method": flow.request.method, "method": flow.request.method,
"path": redact_tokens(flow.request.path, env=os.environ), "path": redact_tokens(flow.request.path, env=os.environ),
"headers": headers, "headers": dict(flow.request.headers),
"body": body, "body": flow.request.get_text(strict=False) or "",
}) })
+ "\n" + "\n"
) )
def _log_response(self, flow: http.HTTPFlow) -> None: def _log_response(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.response.headers.items()
}
body = redact_tokens(flow.response.get_text(strict=False) or "", env=os.environ)
sys.stderr.write( sys.stderr.write(
json.dumps({ json.dumps({
"event": "egress_response", "event": "egress_response",
"host": flow.request.pretty_host, "host": flow.request.pretty_host,
"status": flow.response.status_code, "status": flow.response.status_code,
"headers": headers, "headers": dict(flow.response.headers),
"body": body, "body": flow.response.get_text(strict=False) or "",
}) })
+ "\n" + "\n"
) )
+13 -32
View File
@@ -34,7 +34,7 @@ VALID_METHODS = frozenset({
"CONNECT", "CONNECT",
}) })
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"}) OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"})
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"}) INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
# Per-route policy for what the proxy does when an outbound DLP detector # Per-route policy for what the proxy does when an outbound DLP detector
@@ -439,6 +439,15 @@ def route_to_yaml_dict(r: Route) -> dict[str, object]:
return d return d
def load_routes(text: str) -> tuple[Route, ...]:
"""Parse YAML text → routes."""
try:
payload = parse_yaml_subset(text)
except YamlSubsetError as e:
raise ValueError(f"routes payload: invalid YAML: {e}") from e
return parse_routes(payload)
def parse_config(payload: object) -> "Config": def parse_config(payload: object) -> "Config":
"""Parse a full egress config payload (top-level log level + routes).""" """Parse a full egress config payload (top-level log level + routes)."""
if not isinstance(payload, dict): if not isinstance(payload, dict):
@@ -720,28 +729,17 @@ def scan_outbound(
try: try:
from dlp_detectors import ( # type: ignore[import-not-found] from dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_token_patterns, scan_token_patterns,
) )
except ImportError: # pragma: no cover - host-side path except ImportError: # pragma: no cover - host-side path
from .dlp_detectors import ( # type: ignore[import-not-found] from .dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_token_patterns, scan_token_patterns,
) )
# Binary bodies: latin-1 is a bijective byte↔codepoint mapping that text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
# preserves every byte value, so ASCII-range secret strings remain
# findable by str.find / regex. Prefer strict UTF-8 for valid text bodies.
if isinstance(body, bytes):
try:
text = body.decode("utf-8")
except UnicodeDecodeError:
text = body.decode("latin-1")
else:
text = body
# CRLF injection is only an attack in the request line + headers, never the # CRLF injection is only an attack in the request line + headers, never the
# body: an HTTP body is delimited by Content-Length, so CRLF bytes there # body: an HTTP body is delimited by Content-Length, so CRLF bytes there
@@ -760,30 +758,12 @@ def scan_outbound(
return result return result
if _detector_enabled(route.outbound_detectors, "known_secrets"): if _detector_enabled(route.outbound_detectors, "known_secrets"):
# BOT_BOTTLE_SENSITIVE_PREFIXES lets operators add extra env prefixes
# beyond EGRESS_TOKEN_* without changing the manifest schema.
extra_raw = environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "")
extra = tuple(p for p in extra_raw.split(",") if p)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
result = scan_known_secrets( result = scan_known_secrets(
text, location="body", env=environ, text, location="body", env=environ, safe_tokens=safe_tokens,
sensitive_prefixes=sensitive_prefixes, safe_tokens=safe_tokens,
) )
if result is not None: if result is not None:
return result return result
# Entropy scanning requires explicit opt-in: it is NOT part of the
# default "all detectors" set because it produces false positives on
# legitimate base64 / binary payloads. Routes must list "entropy" in
# dlp.outbound_detectors to enable it.
if (
route.outbound_detectors is not None
and "entropy" in route.outbound_detectors
):
result = scan_entropy(text, location="body")
if result is not None:
return result
return None return None
@@ -853,6 +833,7 @@ __all__ = [
"is_git_push_request", "is_git_push_request",
"is_git_fetch_request", "is_git_fetch_request",
"load_config", "load_config",
"load_routes",
"match_route", "match_route",
"outbound_scan_headers", "outbound_scan_headers",
"parse_config", "parse_config",
+3 -8
View File
@@ -47,11 +47,11 @@ from pathlib import Path
try: try:
# Same-directory imports inside the bundle container; these files are # Same-directory imports inside the bundle container; these files are
# COPYed flat under /app by Dockerfile.sidecars. # COPYed flat under /app by Dockerfile.sidecars.
from egress_addon_core import LOG_OFF, load_config from egress_addon_core import load_routes
import supervise as _sv import supervise as _sv
except ModuleNotFoundError: except ModuleNotFoundError:
# Package imports for host-side tests and tooling. # Package imports for host-side tests and tooling.
from .egress_addon_core import LOG_OFF, load_config from .egress_addon_core import load_routes
from . import supervise as _sv from . import supervise as _sv
@@ -297,17 +297,12 @@ def validate_proposed_file(tool: str, content: str) -> None:
pass pass
elif tool in (_sv.TOOL_EGRESS_ALLOW, _sv.TOOL_EGRESS_BLOCK): elif tool in (_sv.TOOL_EGRESS_ALLOW, _sv.TOOL_EGRESS_BLOCK):
try: try:
config = load_config(content) load_routes(content)
except ValueError as e: except ValueError as e:
raise _RpcError( raise _RpcError(
ERR_INVALID_PARAMS, ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml is not valid: {e}", f"{tool}: proposed routes.yaml is not valid: {e}",
) from e ) from e
if config.log != LOG_OFF:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml must not change egress logging",
)
else: else:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}") raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
@@ -1,189 +0,0 @@
# PRD 0063: Strengthen outbound exfiltration detection
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-25
- **Issue:** #259
## Summary
The current DLP system has two detection tiers: weak (vendor token regexes)
and strong (exact matching of provisioned `EGRESS_TOKEN_*` values in multiple
encodings). This PRD adds four complementary hardening layers in ranked order
of impact:
1. **Canary tokens** — inject a per-session fake secret into the agent's
environment; any outbound appearance is a zero-false-positive exfil signal.
2. **Broadened known-value scanning** — extend the strong tier beyond
`EGRESS_TOKEN_*` via a `BOT_BOTTLE_SENSITIVE_PREFIXES` sidecar env var.
3. **Fragmentation-resistant matching** — catch separator injection and
long-substring leaks via alnum projection and sliding-window substring
search.
4. **Entropy scoring** — warn-only flag on high-entropy blobs as a
complementary signal for encrypted exfil.
## Problem
After PRD 0056 the outbound scan covers all HTTP request surfaces (host, path,
query, headers, body). But the content-based strong tier only matches
`EGRESS_TOKEN_*` values and their encodings. Three gaps remain:
| Gap | How an agent exploits it |
|-----|--------------------------|
| No canary | There is no "can't be false-positive" signal |
| Narrow prefix filter | Sensitive values provisioned under other env prefixes (MCP keys, API keys injected via the agent provider) are invisible to `scan_known_secrets` |
| Fragmentation blindness | Inserting `-`, space, or `\n` between secret characters turns an exact-match into a miss |
## Goals / Success Criteria
1. Each launched bottle has a unique canary token in the agent's environment
under a randomized `WORD_WORD_SECRET` env var name. The egress sidecar gets
the same env var and registers that exact name through
`BOT_BOTTLE_SENSITIVE_PREFIXES`. Any outbound appearance of the canary
blocks the request as a known-secret match.
2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default:
`("EGRESS_TOKEN_",)`). `scan_outbound` reads
`BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes
in, so operators can mark additional env vars as scanned values without
changing the manifest schema.
3. For every secret that passes exact-match, a secondary alnum-projection pass
checks for the secret with all non-alphanumeric characters stripped. This
catches separator-injection evasion (`MY-SECRET` → body contains
`MY SECRET`).
4. A sliding-window partial-match pass checks for long-enough contiguous
substrings of the secret's alnum projection in the text's alnum projection.
Any match ≥ `PARTIAL_MATCH_MIN_LEN` (12 chars) blocks with reason
`"partial match"`.
5. A new `scan_entropy` detector flags outbound text windows with Shannon
entropy ≥ `ENTROPY_BLOCK_THRESHOLD` (5.5 bits/char) at **warn** severity
only. It is registered under the new detector name `"entropy"` in
`OUTBOUND_DETECTOR_NAMES` and disabled by default (routes must opt in).
6. Binary request bodies are decoded via `latin-1` instead of
`utf-8 errors="replace"`, preserving every byte value and allowing
ASCII-range secrets to be found within binary payloads.
7. All new behaviour is unit-tested; existing tests pass unchanged.
## Non-goals
- Rolling per-host buffer for split-across-requests detection (state in the
stateless addon is complex; deferred).
- Additional vendor regexes.
- ML / embedding-based detection.
- Entropy-based hard blocks (warn only per the issue).
## Design
### Canary token flow
```
Egress.prepare()
canary = secrets.token_urlsafe(32)
canary_env = <random WORD_WORD_SECRET>
EgressPlan(canary=canary, canary_env=canary_env, ...)
Docker compose render:
sidecar env: <canary_env>=<canary>
sidecar env: BOT_BOTTLE_SENSITIVE_PREFIXES=<canary_env>
agent env: <canary_env>=<canary> ← visible to agent as a "secret"
macos-container launch: same literals added to sidecar + agent env entries
```
The sidecar uses `BOT_BOTTLE_SENSITIVE_PREFIXES` to make the random canary env
name part of the existing `scan_known_secrets` detector without adding a
manifest schema field.
### Broadened known-value scanning
`scan_known_secrets` gains a `sensitive_prefixes` parameter:
```python
def scan_known_secrets(
text: str,
*,
location: str = "body",
env: Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> ScanResult | None:
```
`scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` (comma-separated list
of additional prefixes) from `environ` and appends them:
```python
extra = tuple(
p for p in environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "").split(",") if p
)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
```
`redact_tokens` receives the same treatment for consistent redaction.
### Fragmentation-resistant matching
A new helper `_alnum_projection(text)` strips all non-alphanumeric characters.
`scan_known_secrets` runs two passes per secret:
1. **Exact pass** — existing encoded-variant loop (unchanged).
2. **Alnum-projection pass** — if the secret's alnum projection has ≥ 8 chars,
check if it appears in the text's alnum projection. Match → block with
`"fragmented match (separator injection)"` reason.
3. **Partial-substring pass** — if the secret's alnum projection has ≥
`PARTIAL_MATCH_MIN_LEN` chars (12), slide a window of that length across the
secret's projection and look for each window in the text's alnum projection.
First match → block with `"partial match"` reason.
All three passes run only for the `"known_secrets"` detector; the token-pattern
and entropy detectors are unchanged.
### Entropy scoring
New public function:
```python
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW, # 64
threshold: float = ENTROPY_BLOCK_THRESHOLD, # 5.5
) -> ScanResult | None:
```
Slides a window of `window` characters across `text` in steps of `window // 2`.
If any window's Shannon entropy exceeds `threshold`, returns a **warn**-severity
`ScanResult`. Never blocks.
`OUTBOUND_DETECTOR_NAMES` gains `"entropy"`. Routes opt in via their `dlp`
block; entropy scanning is **off by default** to avoid false-positive noise on
legitimate binary payloads.
### Binary body handling
In `scan_outbound`, the bytes → str decoding changes from:
```python
body.decode("utf-8", errors="replace")
```
to:
```python
body.decode("utf-8") if body is str else body.decode("latin-1")
```
`latin-1` is a bijective byte↔codepoint mapping; every byte value is preserved
as its corresponding Latin-1 code point, so ASCII-range secret strings remain
intact and `str.find` / regex still locate them correctly. The fallback from
strict UTF-8 is tried first so valid UTF-8 bodies are decoded faithfully.
## Implementation
Delivered in three commits on the same branch:
1. **DLP detector changes**`_alnum_projection`, fragmentation passes,
`scan_entropy`, broadened `scan_known_secrets`, updated `scan_outbound` and
`redact_tokens`; all accompanying unit tests.
2. **Canary injection**`EgressPlan.canary`, `Egress.prepare()`,
Docker compose + macos-container backend injection.
3. **PRD flip**`Status: Draft → Active`.
@@ -1,85 +0,0 @@
# PRD 0064: LOG_FULL egress logging credential redaction
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-25
- **Issue:** #257
## Summary
The `LOG_FULL` egress logging path (`_log_request` and `_log_response` in `egress_addon.py`) writes request/response headers and bodies to stderr without redaction and includes the sidecar-injected upstream `Authorization` header verbatim. This PR applies `redact_tokens` to header values and bodies in both log functions and strips the injected `Authorization` header from request logs entirely.
## Problem
`LOG_FULL` (log level 2) is intended for debugging egress traffic. When active it calls `_log_request` and `_log_response`. Both functions have two related bugs:
1. **Injected `Authorization` header exposure.** `_log_request` is called *after* the sidecar injects upstream credentials (`flow.request.headers["authorization"] = decision.inject_authorization`). The full header dict — including the live credential — is serialized to stderr. Any log collector that ingests the egress container's stderr will receive the upstream bearer token in plaintext.
2. **Unredacted bodies and header values.** Neither `_log_request` nor `_log_response` passes body or header values through `redact_tokens`. By contrast, `_req_ctx` (used for block/warn events) already calls `redact_tokens` on path and host. Any provisioned secret or recognized token pattern that appears in a request body, response body, or non-Authorization header value will be logged verbatim under `LOG_FULL`.
These two bugs compose: an agent that enables `LOG_FULL` and simultaneously triggers a request that carries a known token gains a write path from credentials → egress logs.
## Goals / Success Criteria
- `_log_request` never logs the `authorization` header in any form.
- `_log_request` applies `redact_tokens(value, env=os.environ)` to every other header value before serializing.
- `_log_request` applies `redact_tokens(body, env=os.environ)` to the request body before logging.
- `_log_response` applies `redact_tokens(value, env=os.environ)` to every response header value before logging.
- `_log_response` applies `redact_tokens(body, env=os.environ)` to the response body before logging.
- Unit tests cover each of the five cases above.
## Non-goals
- Redacting host or path in the full-log path (already covered by `_req_ctx` for block/warn events; `_log_request` already calls `redact_tokens` on host and path).
- Suppressing `LOG_FULL` or adding a new log level.
- Changing the outbound DLP scan logic.
## Design
### `_log_request`
```python
def _log_request(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.request.headers.items()
if k.lower() != "authorization"
}
body = redact_tokens(flow.request.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_request",
"host": redact_tokens(flow.request.pretty_host, env=os.environ),
"method": flow.request.method,
"path": redact_tokens(flow.request.path, env=os.environ),
"headers": headers,
"body": body,
})
+ "\n"
)
```
The `authorization` key is excluded because by the time `_log_request` is called the sidecar has already injected the upstream credential (`decision.inject_authorization`). Logging it would write a live bearer token to stderr on every allowed request. There is no safe subset to log — the value is always a live credential or empty.
### `_log_response`
```python
def _log_response(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.response.headers.items()
}
body = redact_tokens(flow.response.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_response",
"host": flow.request.pretty_host,
"status": flow.response.status_code,
"headers": headers,
"body": body,
})
+ "\n"
)
```
Response headers don't carry injected credentials, so no header name is suppressed — only the values are scrubbed by `redact_tokens`.
+2 -23
View File
@@ -80,11 +80,7 @@ def _git_gate_plan(upstreams: tuple[GitGateUpstream, ...] = ()) -> GitGatePlan:
) )
def _egress_plan( def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan:
routes: tuple[EgressRoute, ...] = (),
*,
canary: bool = False,
) -> EgressPlan:
token_env_map = { token_env_map = {
r.token_env: r.token_ref r.token_env: r.token_ref
for r in routes for r in routes
@@ -99,8 +95,6 @@ def _egress_plan(
egress_network=f"bot-bottle-egress-{SLUG}", egress_network=f"bot-bottle-egress-{SLUG}",
mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem", mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem",
mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem", mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem",
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
) )
@@ -118,7 +112,6 @@ def _plan(
with_git: bool = False, with_git: bool = False,
with_egress: bool = False, with_egress: bool = False,
supervise: bool = False, supervise: bool = False,
canary: bool = False,
) -> DockerBottlePlan: ) -> DockerBottlePlan:
"""Build a fully-resolved DockerBottlePlan. Toggles cover the """Build a fully-resolved DockerBottlePlan. Toggles cover the
matrix the renderer's conditional-service logic branches on.""" matrix the renderer's conditional-service logic branches on."""
@@ -157,7 +150,7 @@ def _plan(
slug=SLUG, slug=SLUG,
forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"}, forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"},
git_gate_plan=_git_gate_plan(upstreams), git_gate_plan=_git_gate_plan(upstreams),
egress_plan=_egress_plan(routes, canary=canary), egress_plan=_egress_plan(routes),
supervise_plan=_supervise_plan() if supervise else None, supervise_plan=_supervise_plan() if supervise else None,
use_runsc=False, use_runsc=False,
agent_provision=AgentProvisionPlan( agent_provision=AgentProvisionPlan(
@@ -382,20 +375,6 @@ class TestSidecarBundleShape(unittest.TestCase):
env_strings = sc["environment"] env_strings = sc["environment"]
self.assertNotIn("EGRESS_TOKEN_0", env_strings) self.assertNotIn("EGRESS_TOKEN_0", env_strings)
def test_canary_env_registered_as_sensitive_in_sidecar(self):
sc = self._render(canary=True)["services"]["sidecars"]
env_strings = sc["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
env_strings,
)
def test_canary_env_visible_to_agent(self):
agent = self._render(canary=True)["services"]["agent"]
env_strings = agent["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
def test_supervise_env_present_when_active(self): def test_supervise_env_present_when_active(self):
sc = self._render(supervise=True)["services"]["sidecars"] sc = self._render(supervise=True)["services"]["sidecars"]
env_strings = sc["environment"] env_strings = sc["environment"]
+2 -192
View File
@@ -1,23 +1,18 @@
"""Unit: DLP detectors (PRD 0053). """Unit: DLP detectors (PRD 0053).
Tests for token pattern scanning, known secret detection, fragmentation- Tests for token pattern scanning, known secret detection, and
resistant matching, entropy scoring, and naive prompt injection detection.""" naive prompt injection detection."""
import base64 import base64
import gzip import gzip
import unittest import unittest
from bot_bottle.dlp_detectors import ( from bot_bottle.dlp_detectors import (
ENTROPY_BLOCK_THRESHOLD,
PARTIAL_MATCH_MIN_LEN,
REDACT, REDACT,
_alnum_projection,
_encoded_variants, _encoded_variants,
_normalize_text, _normalize_text,
_shannon_entropy,
redact_tokens, redact_tokens,
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_naive_injection, scan_naive_injection,
scan_token_patterns, scan_token_patterns,
@@ -507,191 +502,6 @@ class TestStripCrlf(unittest.TestCase):
from bot_bottle.dlp_detectors import strip_crlf from bot_bottle.dlp_detectors import strip_crlf
self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello")) self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello"))
class TestAlnumProjection(unittest.TestCase):
def test_alphanumeric_unchanged(self):
self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ"))
def test_strips_hyphens(self):
self.assertEqual("mysecretvalue", _alnum_projection("my-secret-value"))
def test_strips_spaces(self):
self.assertEqual("mysecretvalue", _alnum_projection("my secret value"))
def test_strips_dots_and_underscores(self):
self.assertEqual("mysecretvalue", _alnum_projection("my.secret_value"))
def test_empty_string(self):
self.assertEqual("", _alnum_projection(""))
def test_all_special_chars(self):
self.assertEqual("", _alnum_projection("!@#$%^&*()"))
class TestFragmentationResistantMatching(unittest.TestCase):
"""scan_known_secrets catches separator-injection and partial-substring evasion."""
# Secrets long enough that their alnum projections are ≥ 8 chars.
SECRET = "supersecrettoken99"
ENV = {"EGRESS_TOKEN_0": SECRET}
def test_exact_match_still_works(self):
result = scan_known_secrets(f"key={self.SECRET}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_separator_injection_blocked(self):
# Hyphens inserted between chars of the secret.
fragmented = "-".join(self.SECRET)
result = scan_known_secrets(f"data={fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("separator injection", result.reason)
def test_space_separator_blocked(self):
fragmented = " ".join(self.SECRET)
result = scan_known_secrets(f"body: {fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("separator injection", result.reason)
def test_partial_substring_blocked(self):
# First PARTIAL_MATCH_MIN_LEN alnum chars of the secret, no separators.
partial = _alnum_projection(self.SECRET)[:PARTIAL_MATCH_MIN_LEN]
result = scan_known_secrets(f"x={partial}&y=other", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("partial match", result.reason)
def test_short_secret_skips_projection(self):
# Secrets shorter than _ALNUM_MIN_LEN in alnum projection are not
# fragmentation-checked (too many false positives).
short_env = {"EGRESS_TOKEN_0": "abc"}
# "a b c" has alnum projection "abc" (3 chars, < 8); should not block.
self.assertIsNone(scan_known_secrets("a b c", env=short_env))
def test_clean_text_not_blocked(self):
self.assertIsNone(scan_known_secrets("nothing to see here", env=self.ENV))
def test_sensitive_prefixes_param_extra_prefix(self):
env = {"MY_CRED_0": self.SECRET, "IGNORED": "other"}
result = scan_known_secrets(
f"key={self.SECRET}",
env=env,
sensitive_prefixes=("MY_CRED_",),
)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("MY_CRED_0", result.reason)
def test_sensitive_prefixes_default_only_egress_token(self):
# A value under a non-EGRESS_TOKEN_ key is ignored with default prefixes.
env = {"MY_CRED_0": self.SECRET}
self.assertIsNone(scan_known_secrets(f"key={self.SECRET}", env=env))
def test_canary_prefix_detected(self):
canary_value = "canary-fake-secret-value-xyz"
env = {"CANON_ALPHA_SECRET": canary_value}
result = scan_known_secrets(
f"x={canary_value}",
env=env,
sensitive_prefixes=("CANON_ALPHA_SECRET",),
)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("CANON_ALPHA_SECRET", result.reason)
class TestRedactTokensBroadenedPrefixes(unittest.TestCase):
SECRET = "my-provisioned-secret"
def test_default_redacts_egress_token(self):
env = {"EGRESS_TOKEN_0": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_extra_prefix_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(
f"val={self.SECRET}",
env=env,
sensitive_prefixes=("MY_SECRET_",),
)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_non_matching_prefix_not_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
# Default prefixes only include EGRESS_TOKEN_ → secret not redacted
self.assertIn(self.SECRET, out)
class TestShannonEntropy(unittest.TestCase):
def test_empty_string_zero(self):
self.assertEqual(0.0, _shannon_entropy(""))
def test_single_char_zero(self):
self.assertEqual(0.0, _shannon_entropy("aaaaaa"))
def test_two_equal_chars_one_bit(self):
self.assertAlmostEqual(1.0, _shannon_entropy("abababab"), places=10)
def test_high_entropy_random_like(self):
# Uniform 64-char string over 64 distinct symbols has entropy 6 bits.
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
text = alphabet # each char appears exactly once
self.assertAlmostEqual(6.0, _shannon_entropy(text), places=10)
class TestScanEntropy(unittest.TestCase):
def test_empty_returns_none(self):
self.assertIsNone(scan_entropy(""))
def test_low_entropy_returns_none(self):
# Highly repetitive text has low entropy.
self.assertIsNone(scan_entropy("a" * 200))
def test_high_entropy_warns(self):
# Build a 64-char string with entropy > ENTROPY_BLOCK_THRESHOLD.
# Use all 64 distinct printable chars to maximise entropy (~6 bits).
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, threshold=ENTROPY_BLOCK_THRESHOLD)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
self.assertIn("high-entropy", result.reason)
def test_never_blocks(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet)
# scan_entropy is warn-only; it must never return severity="block".
if result is not None:
self.assertNotEqual("block", result.severity)
def test_location_in_result(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, location="authorization header")
if result is not None:
self.assertIn("authorization header", result.location)
def test_structured_json_no_warn(self):
# Typical JSON has low entropy and should not be flagged.
json_body = '{"status": "ok", "message": "hello world", "count": 42}'
self.assertIsNone(scan_entropy(json_body))
def test_short_text_below_window(self):
# Text shorter than the window: checked as one chunk.
# Use a uniform string to ensure it won't be flagged.
self.assertIsNone(scan_entropy("abcde", threshold=ENTROPY_BLOCK_THRESHOLD))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+8 -128
View File
@@ -1,21 +1,15 @@
"""Unit: Egress route lift + routes.yaml render + token """Unit: Egress route lift + routes.yaml render + token
resolution (PRD 0017, PRD 0053).""" resolution (PRD 0017, PRD 0053)."""
import tempfile
import unittest import unittest
from pathlib import Path
from bot_bottle.egress import ( from bot_bottle.egress import (
CODEX_HOST_CREDENTIAL_TOKEN_REF, CODEX_HOST_CREDENTIAL_TOKEN_REF,
Egress,
EgressPlan,
EgressRoute, EgressRoute,
egress_agent_env_entries,
egress_manifest_routes, egress_manifest_routes,
egress_render_routes, egress_render_routes,
egress_resolve_token_values, egress_resolve_token_values,
egress_routes_for_bottle, egress_routes_for_bottle,
egress_sidecar_env_entries,
egress_token_env_map, egress_token_env_map,
) )
from bot_bottle.log import Die from bot_bottle.log import Die
@@ -322,7 +316,7 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual([], parse_yaml_subset(rendered)["routes"]) self.assertEqual([], parse_yaml_subset(rendered)["routes"])
def test_round_trip_through_addon_core(self): def test_round_trip_through_addon_core(self):
from bot_bottle.egress_addon_core import load_config from bot_bottle.egress_addon_core import load_routes
b = _bottle([ b = _bottle([
{"host": "api.github.com", {"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"}, "auth": {"scheme": "Bearer", "token_ref": "GH_PAT"},
@@ -333,7 +327,7 @@ class TestRenderRoutes(unittest.TestCase):
{"host": "api.anthropic.com"}, {"host": "api.anthropic.com"},
]) ])
routes = egress_routes_for_bottle(b) routes = egress_routes_for_bottle(b)
addon_routes = load_config(egress_render_routes(routes)).routes addon_routes = load_routes(egress_render_routes(routes))
self.assertEqual(3, len(addon_routes)) self.assertEqual(3, len(addon_routes))
self.assertEqual("Bearer", addon_routes[0].auth_scheme) self.assertEqual("Bearer", addon_routes[0].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", addon_routes[0].token_env) self.assertEqual("EGRESS_TOKEN_0", addon_routes[0].token_env)
@@ -341,26 +335,26 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual("", addon_routes[2].auth_scheme) self.assertEqual("", addon_routes[2].auth_scheme)
def test_dlp_round_trips(self): def test_dlp_round_trips(self):
from bot_bottle.egress_addon_core import load_config from bot_bottle.egress_addon_core import load_routes
b = _bottle([{"host": "x.example", "dlp": { b = _bottle([{"host": "x.example", "dlp": {
"outbound_detectors": ["token_patterns"], "outbound_detectors": ["token_patterns"],
"inbound_detectors": False, "inbound_detectors": False,
}}]) }}])
routes = egress_routes_for_bottle(b) routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes) rendered = egress_render_routes(routes)
addon_routes = load_config(rendered).routes addon_routes = load_routes(rendered)
self.assertEqual(("token_patterns",), addon_routes[0].outbound_detectors) self.assertEqual(("token_patterns",), addon_routes[0].outbound_detectors)
self.assertEqual((), addon_routes[0].inbound_detectors) self.assertEqual((), addon_routes[0].inbound_detectors)
def test_outbound_on_match_round_trips(self): def test_outbound_on_match_round_trips(self):
from bot_bottle.egress_addon_core import load_config from bot_bottle.egress_addon_core import load_routes
b = _bottle([{"host": "logs.example", "dlp": { b = _bottle([{"host": "logs.example", "dlp": {
"outbound_on_match": "redact", "outbound_on_match": "redact",
}}]) }}])
routes = egress_routes_for_bottle(b) routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes) rendered = egress_render_routes(routes)
self.assertIn('outbound_on_match: "redact"', rendered) self.assertIn('outbound_on_match: "redact"', rendered)
addon_routes = load_config(rendered).routes addon_routes = load_routes(rendered)
self.assertEqual("redact", addon_routes[0].outbound_on_match) self.assertEqual("redact", addon_routes[0].outbound_on_match)
def test_outbound_on_match_default_omitted_from_render(self): def test_outbound_on_match_default_omitted_from_render(self):
@@ -370,12 +364,12 @@ class TestRenderRoutes(unittest.TestCase):
self.assertNotIn("outbound_on_match", rendered) self.assertNotIn("outbound_on_match", rendered)
def test_git_fetch_policy_round_trips(self): def test_git_fetch_policy_round_trips(self):
from bot_bottle.egress_addon_core import load_config from bot_bottle.egress_addon_core import load_routes
b = _bottle([{"host": "github.com", "git": {"fetch": True}}]) b = _bottle([{"host": "github.com", "git": {"fetch": True}}])
routes = egress_routes_for_bottle(b) routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes) rendered = egress_render_routes(routes)
self.assertEqual({"fetch": True}, self._parsed(routes)[0]["git"]) self.assertEqual({"fetch": True}, self._parsed(routes)[0]["git"])
addon_routes = load_config(rendered).routes addon_routes = load_routes(rendered)
self.assertTrue(addon_routes[0].git_fetch) self.assertTrue(addon_routes[0].git_fetch)
def test_log_zero_omitted_from_render(self): def test_log_zero_omitted_from_render(self):
@@ -449,119 +443,5 @@ class TestResolveTokenValues(unittest.TestCase):
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out) self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
class TestCanaryGeneration(unittest.TestCase):
"""Egress.prepare() generates a unique canary token per session."""
def _bottle_obj(self):
return ManifestIndex.from_json_obj({
"bottles": {"dev": {"egress": {"routes": []}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
def _make_plan(self) -> EgressPlan:
# Use a concrete no-op subclass so we can call prepare() without
# a real backend.
class _TestEgress(Egress):
pass
e = _TestEgress()
with tempfile.TemporaryDirectory() as td:
return e.prepare(self._bottle_obj(), "test-slug", Path(td))
def test_canary_is_non_empty(self):
plan = self._make_plan()
self.assertIsInstance(plan.canary, str)
self.assertGreater(len(plan.canary), 0)
self.assertRegex(plan.canary_env, r"^[A-Z]+_[A-Z]+_SECRET$")
def test_canary_is_unique_per_session(self):
with tempfile.TemporaryDirectory() as td:
bottle = self._bottle_obj()
class _TestEgress(Egress):
pass
e = _TestEgress()
plan_a = e.prepare(bottle, "slug-a", Path(td))
plan_b = e.prepare(bottle, "slug-b", Path(td))
self.assertNotEqual(plan_a.canary, plan_b.canary)
def test_canary_detected_by_scan_known_secrets(self):
from bot_bottle.dlp_detectors import scan_known_secrets
plan = self._make_plan()
env = {plan.canary_env: plan.canary}
result = scan_known_secrets(
f"exfil={plan.canary}",
env=env,
sensitive_prefixes=(plan.canary_env,),
)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn(plan.canary_env, result.reason)
def test_egress_plan_canary_field_default_empty(self):
# Verify EgressPlan can be constructed with an empty canary (backward compat).
from pathlib import Path
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
)
self.assertEqual("", plan.canary)
self.assertEqual("", plan.canary_env)
class TestEgressEnvEntries(unittest.TestCase):
def test_sidecar_entries_include_route_tokens_and_canary_scan_prefix(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(EgressRoute(host="api.example"),),
token_env_map={"EGRESS_TOKEN_1": "T1", "EGRESS_TOKEN_0": "T0"},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
(
"EGRESS_TOKEN_0",
"EGRESS_TOKEN_1",
"CANON_ALPHA_SECRET=fake-canary-value",
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
),
egress_sidecar_env_entries(plan),
)
def test_agent_entries_include_only_canary_bait(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
("CANON_ALPHA_SECRET=fake-canary-value",),
egress_agent_env_entries(plan),
)
def test_canary_entries_omitted_when_name_missing(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
)
self.assertEqual((), egress_sidecar_env_entries(plan))
self.assertEqual((), egress_agent_env_entries(plan))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+42 -130
View File
@@ -32,6 +32,7 @@ from bot_bottle.egress_addon_core import (
is_git_fetch_request, is_git_fetch_request,
is_git_push_request, is_git_push_request,
load_config, load_config,
load_routes,
match_route, match_route,
outbound_scan_headers, outbound_scan_headers,
parse_config, parse_config,
@@ -288,6 +289,47 @@ class TestParseDlp(unittest.TestCase):
}]}) }]})
# --- load_routes ---------------------------------------------------------
class TestLoadRoutes(unittest.TestCase):
def test_yaml_text_round_trip(self):
routes = load_routes(
'routes:\n'
' - host: "api.example"\n'
)
self.assertEqual(1, len(routes))
self.assertEqual("api.example", routes[0].host)
def test_full_route_shape_parses(self):
routes = load_routes(
'routes:\n'
' - host: "api.example"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
' matches:\n'
' - paths:\n'
' - value: "/v1/"\n'
' - type: "exact"\n'
' value: "/messages"\n'
)
self.assertEqual(1, len(routes))
r = routes[0]
self.assertEqual("api.example", r.host)
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
self.assertEqual(1, len(r.matches))
self.assertEqual(2, len(r.matches[0].paths))
def test_empty_routes_list(self):
routes = load_routes("routes: []\n")
self.assertEqual((), routes)
def test_invalid_yaml_raises_value_error(self):
with self.assertRaises(ValueError):
load_routes("routes:\n\t- host: x\n")
# --- load_config / parse_config ------------------------------------------ # --- load_config / parse_config ------------------------------------------
@@ -336,33 +378,6 @@ class TestLoadConfig(unittest.TestCase):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
parse_config("not a dict") parse_config("not a dict")
def test_empty_routes_list(self):
cfg = load_config("routes: []\n")
self.assertEqual((), cfg.routes)
def test_full_route_shape_parses(self):
cfg = load_config(
'routes:\n'
' - host: "api.example"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
' matches:\n'
' - paths:\n'
' - value: "/v1/"\n'
' - type: "exact"\n'
' value: "/messages"\n'
)
r = cfg.routes[0]
self.assertEqual("api.example", r.host)
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
self.assertEqual(1, len(r.matches))
self.assertEqual(2, len(r.matches[0].paths))
def test_invalid_yaml_raises_value_error(self):
with self.assertRaises(ValueError):
load_config("routes:\n\t- host: x\n")
# --- evaluate_matches --------------------------------------------------- # --- evaluate_matches ---------------------------------------------------
@@ -1258,109 +1273,6 @@ class TestBuildTokenAllowPayload(unittest.TestCase):
result = ScanResult(severity="block", reason="r", matched="x") result = ScanResult(severity="block", reason="r", matched="x")
payload = build_token_allow_payload("h", "GET", "/", result) payload = build_token_allow_payload("h", "GET", "/", result)
self.assertNotIn("context:", payload) self.assertNotIn("context:", payload)
class TestScanOutboundEnhanced(unittest.TestCase):
"""scan_outbound changes: binary decode, entropy detector,
broadened known-value prefixes, fragmentation resistance."""
_ROUTE = Route(host="api.example.com")
_ROUTE_ENTROPY = Route(
host="api.example.com",
outbound_detectors=("entropy",),
)
def test_binary_body_latin1_decode_finds_ascii_secret(self):
# Body contains valid ASCII secret surrounded by non-UTF-8 bytes.
secret = "supersecrettoken99"
env = {"EGRESS_TOKEN_0": secret}
# Wrap the secret in bytes that are invalid UTF-8.
body = b"\x80\x81" + secret.encode("ascii") + b"\xff"
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_binary_body_valid_utf8_decoded_correctly(self):
env = {"EGRESS_TOKEN_0": "mysecret"}
# Valid UTF-8 body — should be decoded as UTF-8, not latin-1.
body = "clean body with mysecret".encode("utf-8")
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
def test_entropy_detector_off_by_default(self):
import string
# High-entropy content should NOT warn if the route has no entropy detector.
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE, alphabet, {})
self.assertIsNone(result)
def test_entropy_detector_warns_when_enabled(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE_ENTROPY, alphabet, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
def test_bot_bottle_sensitive_prefixes_env_var(self):
# When the sidecar env contains BOT_BOTTLE_SENSITIVE_PREFIXES,
# scan_outbound should scan those additional prefixes.
secret = "extra-sensitive-value-abc"
env = {
"MY_CRED_KEY": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "MY_CRED_",
}
result = scan_outbound(self._ROUTE, f"x={secret}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_bot_bottle_sensitive_prefixes_multiple(self):
secret = "my-api-key-value-xyz"
env = {
"ANTHROPIC_API_0": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "ANTHROPIC_API_,OTHER_",
}
result = scan_outbound(self._ROUTE, f"auth={secret}", env)
self.assertIsNotNone(result)
def test_canary_detected_via_random_secret_env_name(self):
# The fake secret uses a randomized env name that the sidecar marks
# as sensitive through BOT_BOTTLE_SENSITIVE_PREFIXES.
canary = "canaryvalue12345abcdef"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
result = scan_outbound(self._ROUTE, f"data={canary}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("CANON_ALPHA_SECRET", result.reason)
def test_fragmented_canary_blocked(self):
# Canary with separators injected is still caught.
canary = "supersecretcanary99"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
fragmented = "-".join(canary)
result = scan_outbound(self._ROUTE, f"x={fragmented}", env)
self.assertIsNotNone(result)
class TestOutboundDetectorNames(unittest.TestCase):
def test_entropy_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("entropy", OUTBOUND_DETECTOR_NAMES)
def test_known_secrets_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("known_secrets", OUTBOUND_DETECTOR_NAMES)
def test_token_patterns_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("token_patterns", OUTBOUND_DETECTOR_NAMES)
if __name__ == "__main__": if __name__ == "__main__":
@@ -1,274 +0,0 @@
"""Unit: LOG_FULL credential redaction in _log_request / _log_response (issue #257).
egress_addon.py is sidecar-only code that depends on mitmproxy, which is
not installed on the host. This file pre-populates sys.modules with the
minimum mocks needed so EgressAddon can be imported and tested without the
real mitmproxy package."""
from __future__ import annotations
import json
import sys
import types
import unittest
from io import StringIO
from typing import Any
from unittest.mock import patch
# ---------------------------------------------------------------------------
# Sidecar-import shims — must run before importing egress_addon
# ---------------------------------------------------------------------------
def _ensure_shims() -> None:
if "mitmproxy" not in sys.modules:
_mm = types.ModuleType("mitmproxy")
_mh = types.ModuleType("mitmproxy.http")
setattr(_mm, "http", _mh)
sys.modules["mitmproxy"] = _mm
sys.modules["mitmproxy.http"] = _mh
if "egress_addon_core" not in sys.modules:
import bot_bottle.egress_addon_core as _core
sys.modules["egress_addon_core"] = _core
_ensure_shims()
from bot_bottle.egress_addon import EgressAddon # noqa: E402 (import after shims)
from bot_bottle.egress_addon_core import Config, LOG_FULL # noqa: E402
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _addon() -> EgressAddon:
"""Return a bare EgressAddon with LOG_FULL config and no routes file."""
a: EgressAddon = EgressAddon.__new__(EgressAddon)
a.config = Config(routes=(), log=LOG_FULL)
a.safe_tokens = set()
a._supervise_queue_dir = ""
a._supervise_slug = ""
a._token_allow_timeout = 300.0
return a
class _Headers:
def __init__(self, d: dict[str, str]) -> None:
self._d = d
def items(self) -> list[tuple[str, str]]:
return list(self._d.items())
class _Request:
def __init__(
self,
host: str = "api.example.com",
method: str = "POST",
path: str = "/v1/messages",
headers: dict[str, str] | None = None,
body: str = "",
) -> None:
self.pretty_host = host
self.method = method
self.path = path
self.headers = _Headers(headers or {})
self._body = body
def get_text(self, *, strict: bool = True) -> str:
return self._body
class _Response:
def __init__(
self,
status_code: int = 200,
headers: dict[str, str] | None = None,
body: str = "",
) -> None:
self.status_code = status_code
self.headers = _Headers(headers or {})
self._body = body
def get_text(self, *, strict: bool = True) -> str:
return self._body
class _Flow:
def __init__(
self,
request: _Request | None = None,
response: _Response | None = None,
) -> None:
self.request = request or _Request()
self.response = response or _Response()
def _log_request(addon: EgressAddon, flow: _Flow) -> dict[str, Any]:
buf = StringIO()
with patch("sys.stderr", buf):
addon._log_request(flow) # type: ignore[arg-type]
return json.loads(buf.getvalue())
def _log_response(addon: EgressAddon, flow: _Flow) -> dict[str, Any]:
buf = StringIO()
with patch("sys.stderr", buf):
addon._log_response(flow) # type: ignore[arg-type]
return json.loads(buf.getvalue())
# ---------------------------------------------------------------------------
# _log_request — authorization header stripped
# ---------------------------------------------------------------------------
class TestLogRequestAuthorizationStripped(unittest.TestCase):
def test_lowercase_authorization_excluded(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"authorization": "Bearer sk-real-secret"}))
entry = _log_request(addon, flow)
self.assertNotIn("authorization", entry["headers"])
def test_titlecase_authorization_excluded(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"Authorization": "Bearer sk-real-secret"}))
entry = _log_request(addon, flow)
self.assertNotIn("Authorization", entry["headers"])
self.assertNotIn("authorization", entry["headers"])
def test_non_auth_headers_retained(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={
"authorization": "Bearer sk-real-secret",
"content-type": "application/json",
}))
entry = _log_request(addon, flow)
self.assertIn("content-type", entry["headers"])
self.assertEqual("application/json", entry["headers"]["content-type"])
def test_no_authorization_header_logs_all_others(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"x-request-id": "abc"}))
entry = _log_request(addon, flow)
self.assertEqual({"x-request-id": "abc"}, entry["headers"])
# ---------------------------------------------------------------------------
# _log_request — body redaction
# ---------------------------------------------------------------------------
_OPENAI_KEY = "sk-" + "A" * 48
class TestLogRequestBodyRedacted(unittest.TestCase):
def test_token_pattern_in_body_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(body=f"key={_OPENAI_KEY}"))
entry = _log_request(addon, flow)
self.assertNotIn(_OPENAI_KEY, entry["body"])
self.assertIn("********", entry["body"])
def test_provisioned_secret_in_body_scrubbed(self) -> None:
addon = _addon()
secret = "provisioned-egress-secret-xyz"
flow = _Flow(request=_Request(body=f"token={secret}"))
with patch.dict("os.environ", {"EGRESS_TOKEN_0": secret}):
entry = _log_request(addon, flow)
self.assertNotIn(secret, entry["body"])
self.assertIn("********", entry["body"])
def test_clean_body_preserved(self) -> None:
addon = _addon()
payload = '{"model": "claude-3", "max_tokens": 1024}'
flow = _Flow(request=_Request(body=payload))
entry = _log_request(addon, flow)
self.assertEqual(payload, entry["body"])
# ---------------------------------------------------------------------------
# _log_request — non-authorization header value redaction
# ---------------------------------------------------------------------------
class TestLogRequestHeaderValuesRedacted(unittest.TestCase):
def test_token_in_custom_header_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"x-api-key": _OPENAI_KEY}))
entry = _log_request(addon, flow)
self.assertNotIn(_OPENAI_KEY, entry["headers"].get("x-api-key", ""))
self.assertIn("********", entry["headers"].get("x-api-key", ""))
def test_clean_header_value_preserved(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"accept": "application/json"}))
entry = _log_request(addon, flow)
self.assertEqual("application/json", entry["headers"]["accept"])
# ---------------------------------------------------------------------------
# _log_response — body redaction
# ---------------------------------------------------------------------------
class TestLogResponseBodyRedacted(unittest.TestCase):
def test_token_pattern_in_response_body_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(
request=_Request(),
response=_Response(body=f'{{"key": "{_OPENAI_KEY}"}}'),
)
entry = _log_response(addon, flow)
self.assertNotIn(_OPENAI_KEY, entry["body"])
self.assertIn("********", entry["body"])
def test_provisioned_secret_in_response_body_scrubbed(self) -> None:
addon = _addon()
secret = "provisioned-egress-secret-xyz"
flow = _Flow(
request=_Request(),
response=_Response(body=f'{{"token": "{secret}"}}'),
)
with patch.dict("os.environ", {"EGRESS_TOKEN_0": secret}):
entry = _log_response(addon, flow)
self.assertNotIn(secret, entry["body"])
self.assertIn("********", entry["body"])
def test_clean_response_body_preserved(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(), response=_Response(body='{"result": "ok"}'))
entry = _log_response(addon, flow)
self.assertEqual('{"result": "ok"}', entry["body"])
# ---------------------------------------------------------------------------
# _log_response — response header value redaction
# ---------------------------------------------------------------------------
class TestLogResponseHeaderValuesRedacted(unittest.TestCase):
def test_token_in_response_header_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(
request=_Request(),
response=_Response(headers={"set-cookie": f"token={_OPENAI_KEY}"}),
)
entry = _log_response(addon, flow)
cookie_val = entry["headers"].get("set-cookie", "")
self.assertNotIn(_OPENAI_KEY, cookie_val)
self.assertIn("********", cookie_val)
def test_clean_response_header_preserved(self) -> None:
addon = _addon()
flow = _Flow(
request=_Request(),
response=_Response(headers={"content-type": "application/json"}),
)
entry = _log_response(addon, flow)
self.assertEqual("application/json", entry["headers"]["content-type"])
if __name__ == "__main__":
unittest.main()
-9
View File
@@ -54,15 +54,6 @@ class TestValidateRoutesContent(unittest.TestCase):
' auth_scheme: "Bearer"\n' ' auth_scheme: "Bearer"\n'
) )
def test_rejects_log_full(self):
with self.assertRaises(EgressApplyError) as cm:
applicator.validate_routes_content(
'log: 2\n'
'routes:\n'
' - host: "x.example"\n'
)
self.assertIn("must not change egress logging", str(cm.exception))
class TestApplyRoutesChange(unittest.TestCase): class TestApplyRoutesChange(unittest.TestCase):
def setUp(self): def setUp(self):
+1 -24
View File
@@ -30,7 +30,6 @@ def _plan(
supervise: bool = False, supervise: bool = False,
agent_git_gate_url: str = "", agent_git_gate_url: str = "",
agent_supervise_url: str = "", agent_supervise_url: str = "",
canary: bool = False,
) -> MacosContainerBottlePlan: ) -> MacosContainerBottlePlan:
routes_path = stage_dir / "routes.yaml" routes_path = stage_dir / "routes.yaml"
routes_path.write_text("routes: []\n", encoding="utf-8") routes_path.write_text("routes: []\n", encoding="utf-8")
@@ -43,8 +42,6 @@ def _plan(
routes_path=routes_path, routes_path=routes_path,
routes=("route",), routes=("route",),
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"}, token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
) )
if git: if git:
key_path = stage_dir / "origin-key" key_path = stage_dir / "origin-key"
@@ -141,26 +138,6 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
argv, argv,
) )
def test_sidecar_argv_registers_canary_env_as_sensitive(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._sidecar_run_argv(
plan,
"bot-bottle-sidecars-dev-abc",
"bot-bottle-net-dev-abc",
"bot-bottle-egress-dev-abc",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
self.assertIn("BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", argv)
def test_agent_argv_receives_canary_env(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._agent_run_argv(
plan,
"bot-bottle-net-dev-abc",
"192.0.2.10",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
def test_agent_env_points_proxy_at_sidecar_ip(self): def test_agent_env_points_proxy_at_sidecar_ip(self):
plan = _plan( plan = _plan(
stage_dir=self.stage_dir, stage_dir=self.stage_dir,
@@ -294,7 +271,7 @@ def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan:
manifest=_MANIFEST, manifest=_MANIFEST,
stage_dir=stage_dir, stage_dir=stage_dir,
git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())), git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())),
egress_plan=cast(EgressPlan, SimpleNamespace(canary="")), egress_plan=cast(EgressPlan, SimpleNamespace()),
supervise_plan=None, supervise_plan=None,
agent_provision=AgentProvisionPlan( agent_provision=AgentProvisionPlan(
template="claude", template="claude",
-27
View File
@@ -73,33 +73,6 @@ resolver #2
) )
self.assertTrue(run.call_args_list[-1].kwargs["check"]) self.assertTrue(run.call_args_list[-1].kwargs["check"])
def test_build_image_anchors_relative_dockerfile_to_context(self):
status = util.subprocess.CompletedProcess(
args=[],
returncode=0,
stdout=(
'[{"status":{"state":"running"},'
'"configuration":{"dns":{"nameservers":["9.9.9.9"]}}}]'
),
stderr="",
)
with patch.object(util.subprocess, "run", return_value=status) as run, \
patch.object(util.os, "environ", {
"BOT_BOTTLE_MACOS_CONTAINER_DNS": "9.9.9.9",
}):
util.build_image(
"bot-bottle-sidecars:latest",
"/repo",
dockerfile="Dockerfile.sidecars",
)
self.assertEqual(
[
"container", "build", "-t", "bot-bottle-sidecars:latest",
"--dns", "9.9.9.9", "-f", "/repo/Dockerfile.sidecars", "/repo",
],
run.call_args_list[-1].args[0],
)
def test_commit_container_execs_tar_and_builds_image(self): def test_commit_container_execs_tar_and_builds_image(self):
# stderr is bytes because subprocess.run uses stderr=PIPE without text=True # stderr is bytes because subprocess.run uses stderr=PIPE without text=True
completed = util.subprocess.CompletedProcess( completed = util.subprocess.CompletedProcess(
+3 -29
View File
@@ -26,7 +26,9 @@ from bot_bottle.backend.smolmachines.bottle import SmolmachinesBottle
from bot_bottle.backend.smolmachines.bottle_plan import ( from bot_bottle.backend.smolmachines.bottle_plan import (
SmolmachinesBottlePlan, SmolmachinesBottlePlan,
) )
from bot_bottle.backend.smolmachines import launch as _launch # from bot_bottle.backend.smolmachines.provision import (
# workspace as _workspace,
# )
from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec
from bot_bottle.backend.util import AGENT_CA_PATH from bot_bottle.backend.util import AGENT_CA_PATH
from bot_bottle.egress import EgressPlan, EgressRoute from bot_bottle.egress import EgressPlan, EgressRoute
@@ -83,7 +85,6 @@ def _plan(
stage_dir: Path | None = None, stage_dir: Path | None = None,
egress_routes: tuple[EgressRoute, ...] = (), egress_routes: tuple[EgressRoute, ...] = (),
egress_ca_path: Path = Path(), egress_ca_path: Path = Path(),
canary: bool = False,
supervise: bool = False, supervise: bool = False,
bundle_ip: str = "192.168.50.2", bundle_ip: str = "192.168.50.2",
agent_git_gate_host: str = "127.0.0.1:55555", agent_git_gate_host: str = "127.0.0.1:55555",
@@ -154,8 +155,6 @@ def _plan(
routes=egress_routes, routes=egress_routes,
token_env_map={}, token_env_map={},
mitmproxy_ca_cert_only_host_path=egress_ca_path, mitmproxy_ca_cert_only_host_path=egress_ca_path,
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
), ),
supervise_plan=supervise_plan, supervise_plan=supervise_plan,
agent_git_gate_host=agent_git_gate_host, agent_git_gate_host=agent_git_gate_host,
@@ -411,31 +410,6 @@ class TestBundleLaunchSpec(unittest.TestCase):
self.assertIn(9420, spec.ports_to_publish) self.assertIn(9420, spec.ports_to_publish)
self.assertNotIn(9418, spec.ports_to_publish) self.assertNotIn(9418, spec.ports_to_publish)
def test_canary_env_registered_as_sensitive_in_bundle(self):
plan = _plan(canary=True)
spec = _bundle_launch_spec(plan, "net", "127.0.0.16")
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", spec.environment)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
spec.environment,
)
def test_canary_env_visible_to_smolvm_guest(self):
plan = _plan(canary=True)
with patch.object(
_launch._bundle,
"bundle_host_port",
return_value="65000",
):
stamped = _launch._discover_urls(plan, "127.0.0.16")
self.assertEqual(
"fake-canary-value",
stamped.guest_env["CANON_ALPHA_SECRET"],
)
class TestProvisionGitUser(unittest.TestCase): class TestProvisionGitUser(unittest.TestCase):
"""`provision_git` runs `git config --global` inside the """`provision_git` runs `git config --global` inside the
-9
View File
@@ -67,15 +67,6 @@ class TestValidation(unittest.TestCase):
with self.assertRaises(_RpcError): with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_EGRESS_BLOCK, "routes: nope\n") validate_proposed_file(_sv.TOOL_EGRESS_BLOCK, "routes: nope\n")
def test_egress_routes_yaml_rejects_log_full(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file(
_sv.TOOL_EGRESS_ALLOW,
"log: 2\nroutes:\n - host: example.com\n",
)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("must not change egress logging", cm.exception.message)
# --- JSON-RPC parsing ------------------------------------------------------ # --- JSON-RPC parsing ------------------------------------------------------