diff --git a/README.md b/README.md index f2ad087..63dcafe 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ ## Features - **Per-bottle egress allowlist** — TLS-bumped HTTP/HTTPS chokepoint with a per-manifest host allowlist; per-route path/method/header `matches` filtering; outbound DLP scanning for known tokens and secrets, inbound DLP scanning for prompt-injection attempts; DoH and arbitrary hosts blocked by default. +- **Per-route token-match policy** — each egress route picks what happens when the outbound DLP catches a token via `dlp.outbound_on_match`: `supervise` (default) holds the request and surfaces it in `./cli.py supervise` for approval (an approved value is remembered for the life of the proxy); `redact` scrubs the value and forwards; `block` is a hard `403`. Cuts false-positive friction without weakening default-deny. - **Tokens the agent never sees** — host secrets live in a sidecar; the agent dials `http://sidecar:9099/` and the proxy strips inbound `Authorization` and injects the real token before forwarding. `printenv` in the agent shows proxy URLs only. - **Gitleaks-scanned push (git-gate)** — `bottle.git` remotes route through a per-bottle `git daemon` that gitleaks-scans incoming refs pre-receive and forwards clean refs upstream over SSH. The agent never holds the upstream credential. - **Manifest-scoped skills + secrets** — each bottle declares its skills, env, git identity, remotes, and egress routes; unknown keys die at load. @@ -148,8 +149,11 @@ You help maintain Gitea-hosted projects. | `dlp` | no | Per-route DLP overrides. Omit to use defaults (all detectors on). | | `dlp.outbound_detectors` | no | `false` disables outbound scanning; list restricts to named detectors (`token_patterns`, `known_secrets`). | | `dlp.inbound_detectors` | no | `false` disables inbound scanning; list restricts to named detectors (`naive_injection_detection`). | +| `dlp.outbound_on_match` | no | What to do when an outbound token is detected: `supervise` (default for manifest routes — hold for operator approval), `redact` (scrub the value and forward), or `block` (hard 403). Agent-provider routes (e.g. `api.anthropic.com`) default to `redact`. | | `git.fetch` | no | `true` permits smart HTTP clone/fetch (`git-upload-pack`) for this host. Push (`git-receive-pack`) remains blocked. | +When an outbound DLP detector matches a token, the route's `dlp.outbound_on_match` policy decides what happens. Under the default `supervise`, the proxy queues an `egress-token-allow` proposal for the operator's `./cli.py supervise` TUI and holds the request open until it is answered (or `EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS`, default 300s, elapses — after which it fails closed). The operator never sees the raw token, only the host, method, path, and a redacted snippet; approving adds the value to an in-memory safelist for the life of the egress proxy. Under `redact`, the matched value is scrubbed from the body, headers, and path and the request is forwarded (failing closed if a match lands somewhere unredactable, like the hostname). Under `block` it stays a hard `403`. Structural blocks (CRLF injection) and not-in-allowlist host blocks are always hard `403`s regardless of policy. + More examples in `examples/`. Full design lives under `docs/prds/`; the trust-boundary rationale is in `docs/prds/0011-per-file-md-manifest.md`. ## Trademarks diff --git a/bot_bottle/cli/supervise.py b/bot_bottle/cli/supervise.py index fb76958..9db0b93 100644 --- a/bot_bottle/cli/supervise.py +++ b/bot_bottle/cli/supervise.py @@ -54,6 +54,7 @@ from ..supervise import ( TOOL_ALLOW, TOOL_EGRESS_BLOCK, TOOL_GITLEAKS_ALLOW, + TOOL_EGRESS_TOKEN_ALLOW, archive_proposal, list_pending_proposals, render_diff, @@ -65,6 +66,11 @@ from ._common import PROG _REFRESH_INTERVAL_MS = 1000 +# Proposal tools whose payload is a read-only report, not a file the operator +# edits: modify is unavailable and approval requires a recorded reason for the +# audit trail. +_REPORT_ONLY_TOOLS: tuple[str, ...] = (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW) + @dataclass(frozen=True) class QueuedProposal: @@ -141,7 +147,7 @@ def _suffix_for_tool(tool: str) -> str: return ".dockerfile" if tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK): return ".yaml" - if tool == TOOL_GITLEAKS_ALLOW: + if tool in (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW): return ".txt" return ".txt" @@ -212,8 +218,8 @@ def _approve_from_tui( notes: str = "", ) -> str: """Approve from curses, prompting for any tool-specific audit note.""" - if qp.proposal.tool == TOOL_GITLEAKS_ALLOW and final_file is None: - notes = _prompt(stdscr, "allow reason (test fixture/false positive): ") + if qp.proposal.tool in _REPORT_ONLY_TOOLS and final_file is None: + notes = _prompt(stdscr, "allow reason (false positive / legitimately needed): ") if not notes: return "approve aborted (empty reason)" approve(qp, final_file=final_file, notes=notes) @@ -411,8 +417,8 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore except ApplyError as e: status_line = f"apply failed: {e}" elif key == ord("m"): - if qp.proposal.tool == TOOL_GITLEAKS_ALLOW: - status_line = "modify unavailable for gitleaks-allow" + if qp.proposal.tool in _REPORT_ONLY_TOOLS: + status_line = f"modify unavailable for {qp.proposal.tool}" continue edited = _modify(stdscr, qp) if edited is None: @@ -525,7 +531,7 @@ def _detail_view( pass return elif key == ord("m"): - if qp.proposal.tool == TOOL_GITLEAKS_ALLOW: + if qp.proposal.tool in _REPORT_ONLY_TOOLS: return edited = _modify(stdscr, qp) if edited is not None: diff --git a/bot_bottle/dlp_detectors.py b/bot_bottle/dlp_detectors.py index c2c038b..ef36e7f 100644 --- a/bot_bottle/dlp_detectors.py +++ b/bot_bottle/dlp_detectors.py @@ -78,16 +78,27 @@ TOKEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = ( ) -def scan_token_patterns(text: str, *, location: str = "body") -> ScanResult | None: +def scan_token_patterns( + text: str, + *, + location: str = "body", + safe_tokens: typing.AbstractSet[str] | None = None, +) -> ScanResult | None: normalized = _normalize_text(text) for name, pattern in TOKEN_PATTERNS: - m = pattern.search(normalized) - if m is not None: + for m in pattern.finditer(normalized): + value = m.group(0) + # A value the supervisor has approved (PRD 0062) is no longer a + # block — keep scanning so a second, un-approved token in the + # same request is still caught. + if safe_tokens is not None and value in safe_tokens: + continue return ScanResult( severity="block", reason=f"{name} found in {location}", location=location, - context=_snippet(text, m.start(), m.end()), + context=_snippet(normalized, m.start(), m.end()), + matched=value, ) return None @@ -155,6 +166,7 @@ def scan_known_secrets( *, location: str = "body", env: typing.Mapping[str, str] | None = None, + safe_tokens: typing.AbstractSet[str] | None = None, ) -> ScanResult | None: if env is None: return None @@ -164,11 +176,17 @@ def scan_known_secrets( for variant in _encoded_variants(value): pos = text.find(variant) if pos >= 0: + # The supervisor approves the exact encoded variant found + # (PRD 0062); a different encoding of the same secret is a + # fresh block. + if safe_tokens is not None and variant in safe_tokens: + continue return ScanResult( severity="block", reason=f"provisioned secret from {key} found in {location}", location=location, context=_snippet(text, pos, pos + len(variant)), + matched=variant, ) return None @@ -265,6 +283,14 @@ _CRLF_ENCODED_RE = re.compile(r"%0[dD]%0[aA]", re.ASCII) _CRLF_HEADER_INJECT_RE = re.compile(r"\r\n[A-Za-z][A-Za-z0-9\-]+\s*:", re.ASCII) +def strip_crlf(text: str) -> str: + """Remove URL-encoded and literal CRLF injection sequences from a request + surface (PRD 0062 redact policy). Used to scrub the request line / headers + so the request can be forwarded instead of hard-blocked.""" + text = _CRLF_ENCODED_RE.sub("", text) + return _CRLF_HEADER_INJECT_RE.sub(lambda m: m.group(0)[2:], text) + + def scan_crlf_injection(text: str) -> ScanResult | None: if _CRLF_ENCODED_RE.search(text): return ScanResult( @@ -288,4 +314,5 @@ __all__ = [ "scan_known_secrets", "scan_naive_injection", "scan_token_patterns", + "strip_crlf", ] diff --git a/bot_bottle/egress.py b/bot_bottle/egress.py index f9e2ee8..8943049 100644 --- a/bot_bottle/egress.py +++ b/bot_bottle/egress.py @@ -16,6 +16,7 @@ from pathlib import Path from typing import TYPE_CHECKING from .egress_addon_core import ( + ON_MATCH_REDACT, HeaderMatch as CoreHeaderMatch, MatchEntry as CoreMatchEntry, PathMatch as CorePathMatch, @@ -95,6 +96,7 @@ def egress_manifest_routes( git_fetch=r.GitFetch, outbound_detectors=r.OutboundDetectors, inbound_detectors=r.InboundDetectors, + outbound_on_match=r.OutboundOnMatch, )) return tuple(out) @@ -105,12 +107,27 @@ def egress_routes_for_bottle( ) -> tuple[EgressRoute, ...]: manifest = egress_manifest_routes(bottle) provisioned_hosts = {pr.host.lower() for pr in provider_routes} - merged = list(provider_routes) + [ + merged = list(_default_provider_on_match(provider_routes)) + [ r for r in manifest if r.host.lower() not in provisioned_hosts ] return _assign_token_slots(merged) +def _default_provider_on_match( + provider_routes: tuple[EgressRoute, ...], +) -> tuple[EgressRoute, ...]: + """Provider routes (the agent talking to its own LLM API) default to the + `redact` on-match policy (PRD 0062): high-volume conversation payloads are + the worst source of token-shaped false positives, so a match is scrubbed + and forwarded rather than hard-blocked or queued for the operator. A + provider that sets `outbound_on_match` explicitly keeps its choice.""" + return tuple( + r if r.outbound_on_match + else dataclasses.replace(r, outbound_on_match=ON_MATCH_REDACT) + for r in provider_routes + ) + + def _assign_token_slots( routes: list[EgressRoute], ) -> tuple[EgressRoute, ...]: @@ -177,7 +194,11 @@ def _route_to_yaml_fields(r: Route) -> dict[str, object]: fields["matches"] = matches_data if r.git_fetch: fields["git"] = {"fetch": True} - if r.outbound_detectors is not None or r.inbound_detectors is not None: + if ( + r.outbound_detectors is not None + or r.inbound_detectors is not None + or r.outbound_on_match + ): dlp: dict[str, object] = {} if r.outbound_detectors is not None: dlp["outbound_detectors"] = ( @@ -189,6 +210,8 @@ def _route_to_yaml_fields(r: Route) -> dict[str, object]: False if not r.inbound_detectors else list(r.inbound_detectors) ) + if r.outbound_on_match: + dlp["outbound_on_match"] = r.outbound_on_match fields["dlp"] = dlp return fields @@ -260,6 +283,8 @@ def egress_render_routes( elif isinstance(dv, list): items_str = ", ".join(f'"{x}"' for x in dv) lines.append(f" {dk}: [{items_str}]") + elif isinstance(dv, str): + lines.append(f' {dk}: "{dv}"') return "\n".join(lines) + "\n" diff --git a/bot_bottle/egress_addon.py b/bot_bottle/egress_addon.py index c0df317..41f4ca5 100644 --- a/bot_bottle/egress_addon.py +++ b/bot_bottle/egress_addon.py @@ -5,6 +5,7 @@ egress container.""" from __future__ import annotations +import asyncio import json import os import signal @@ -16,9 +17,15 @@ from mitmproxy import http # type: ignore[import-not-found] # pylint: disable= from egress_addon_core import ( # type: ignore[import-not-found] # pylint: disable=import-error LOG_BLOCKS, LOG_FULL, + DEFAULT_OUTBOUND_ON_MATCH, + ON_MATCH_BLOCK, + ON_MATCH_REDACT, Config, + Route, + ScanResult, build_inbound_scan_text, build_outbound_scan_text, + build_token_allow_payload, decide, decide_git_fetch, is_git_fetch_request, @@ -32,23 +39,55 @@ from egress_addon_core import ( # type: ignore[import-not-found] # pylint: dis ) try: - from dlp_detectors import redact_tokens # type: ignore[import-not-found] + from dlp_detectors import redact_tokens, strip_crlf # type: ignore[import-not-found] except ImportError: # pragma: no cover - host-side path - from bot_bottle.dlp_detectors import redact_tokens # type: ignore[import-not-found] + from bot_bottle.dlp_detectors import ( # type: ignore[import-not-found] + redact_tokens, + strip_crlf, + ) + +try: + import supervise as _sv # type: ignore[import-not-found] +except ImportError: # pragma: no cover - host-side path + from bot_bottle import supervise as _sv # type: ignore[import-not-found] DEFAULT_ROUTES_PATH = "/etc/egress/routes.yaml" INTROSPECT_HOST = "_egress.local" +# Seconds the egress proxy holds a token-blocked request open waiting for the +# operator's supervisor decision (PRD 0062), overridable via env. +DEFAULT_TOKEN_ALLOW_TIMEOUT_SECONDS = 300.0 +# Filesystem poll cadence while awaiting the operator's response. +TOKEN_ALLOW_POLL_INTERVAL_SECONDS = 0.5 + +# Fixed operator guidance attached to every token-allow proposal. +_TOKEN_ALLOW_JUSTIFICATION = ( + "egress DLP blocked an outbound request carrying a detected token. " + "Approve only if this value is a false positive or a credential this " + "request legitimately needs; the value is then allowed for the life of " + "this bottle's egress proxy." +) + class EgressAddon: def __init__(self) -> None: self.routes_path = os.environ.get("EGRESS_ROUTES", DEFAULT_ROUTES_PATH) self.config: Config = Config(routes=()) + # Tokens the operator has approved this session (PRD 0062). In-memory + # only — a restart re-prompts. Mutated only from the asyncio loop that + # runs the addon hooks, so no lock is needed. + self.safe_tokens: set[str] = set() + self._supervise_queue_dir = os.environ.get("SUPERVISE_QUEUE_DIR", "").strip() + self._supervise_slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "").strip() + self._token_allow_timeout = _token_allow_timeout_from_env(os.environ) self._reload(initial=True) self._install_sighup() + def _supervise_available(self) -> bool: + return bool(self._supervise_queue_dir and self._supervise_slug) + def _reload(self, *, initial: bool = False) -> None: try: text = Path(self.routes_path).read_text(encoding="utf-8") @@ -145,7 +184,7 @@ class EgressAddon: + "\n" ) - def request(self, flow: http.HTTPFlow) -> None: + async def request(self, flow: http.HTTPFlow) -> None: request_path, _, query = flow.request.path.partition("?") if flow.request.pretty_host == INTROSPECT_HOST: @@ -157,21 +196,11 @@ class EgressAddon: # Hostname is included to catch DNS-tunnelling exfiltration attempts. route = match_route(self.config.routes, flow.request.pretty_host) if route is not None: - body = flow.request.get_text(strict=False) or "" - scan_text = build_outbound_scan_text( - flow.request.pretty_host, - request_path, - query, - outbound_scan_headers(route, dict(flow.request.headers)), - body, - ) - dlp_result = scan_outbound(route, scan_text, os.environ) - if dlp_result is not None and dlp_result.severity == "block": - ctx = self._req_ctx(flow) - if dlp_result.context: - ctx = {**ctx, "context": dlp_result.context} - self._block(flow, f"egress DLP: {dlp_result.reason}", ctx=ctx) + if not await self._handle_outbound_dlp(flow, route): return + # The redact policy may have rewritten the request line; recompute + # the path/query the git checks below rely on. + request_path, _, query = flow.request.path.partition("?") if is_git_push_request(request_path, query): self._block( @@ -221,6 +250,202 @@ class EgressAddon: if self.config.log >= LOG_FULL: self._log_request(flow) + def _block_dlp(self, flow: http.HTTPFlow, result: ScanResult) -> None: + ctx = self._req_ctx(flow) + if result.context: + ctx = {**ctx, "context": result.context} + self._block(flow, f"egress DLP: {result.reason}", ctx=ctx) + + async def _handle_outbound_dlp( + self, + flow: http.HTTPFlow, + route: Route, + ) -> bool: + """Scan the outbound request and apply the route's on-match policy + (PRD 0062). Returns True if the request may be forwarded, False if a + 403 response has been written to `flow`. + + Loops so the supervise policy can re-scan after each approval — a + second, un-approved token in the same request is still caught.""" + while True: + request_path, _, query = flow.request.path.partition("?") + body = flow.request.get_text(strict=False) or "" + headers = outbound_scan_headers(route, dict(flow.request.headers)) + scan_text = build_outbound_scan_text( + flow.request.pretty_host, request_path, query, headers, body, + ) + # CRLF is scanned only over the request line + headers, never the + # body (see scan_outbound) — a body is not an injection vector. + crlf_text = build_outbound_scan_text( + flow.request.pretty_host, request_path, query, headers, "", + ) + result = scan_outbound( + route, scan_text, os.environ, + safe_tokens=self.safe_tokens, crlf_text=crlf_text, + ) + if result is None or result.severity != "block": + return True + + policy = route.outbound_on_match or DEFAULT_OUTBOUND_ON_MATCH + + # redact scrubs every detection (tokens and structural CRLF) and + # forwards; it fails closed only if a match survives the scrub. + if policy == ON_MATCH_REDACT: + if self._redact_outbound(flow, route): + if self.config.log >= LOG_BLOCKS: + sys.stderr.write(json.dumps({ + "event": "egress_redacted", + "reason": f"egress DLP: {result.reason}", + **self._req_ctx(flow), + }) + "\n") + return True + self._block( + flow, + f"egress DLP: {result.reason}; redaction could not remove " + "all matches (e.g. a match in the hostname)", + ctx=self._req_ctx(flow), + ) + return False + + # Structural blocks (CRLF, no safelist-able value) cannot be + # supervised — there is nothing to approve and remember — so under + # block/supervise they are a hard 403. + if policy == ON_MATCH_BLOCK or not result.matched: + self._block_dlp(flow, result) + return False + + # supervise (default): hold the request for operator approval. + # Fall back to a hard 403 when supervise isn't wired for the bottle. + if not self._supervise_available(): + self._block_dlp(flow, result) + return False + approved = await self._supervise_token_block(flow, request_path, result) + if not approved: + return False # _supervise_token_block wrote the 403 response + # loop: the approved value is now in safe_tokens; re-scan. + + def _redact_outbound(self, flow: http.HTTPFlow, route: Route) -> bool: + """Scrub detected tokens (and CRLF injection sequences) from the mutable + request surfaces (body, headers, path/query) and re-scan. Returns True + if the request is now clean; False if a block-severity match remains on + a surface redaction cannot rewrite (the hostname) so the caller fails + closed.""" + body = flow.request.get_text(strict=False) + if body: + redacted_body = redact_tokens(body, env=os.environ) + if redacted_body != body: + flow.request.text = redacted_body + for name, value in list(flow.request.headers.items()): + if name.lower() == "host": + continue # routing-critical; never a legitimate token + redacted = strip_crlf(redact_tokens(value, env=os.environ)) + if redacted != value: + flow.request.headers[name] = redacted + redacted_path = strip_crlf(redact_tokens(flow.request.path, env=os.environ)) + if redacted_path != flow.request.path: + flow.request.path = redacted_path + + request_path, _, query = flow.request.path.partition("?") + new_body = flow.request.get_text(strict=False) or "" + headers = outbound_scan_headers(route, dict(flow.request.headers)) + scan_text = build_outbound_scan_text( + flow.request.pretty_host, request_path, query, headers, new_body, + ) + crlf_text = build_outbound_scan_text( + flow.request.pretty_host, request_path, query, headers, "", + ) + result = scan_outbound(route, scan_text, os.environ, crlf_text=crlf_text) + return result is None or result.severity != "block" + + async def _supervise_token_block( + self, + flow: http.HTTPFlow, + request_path: str, + result: ScanResult, + ) -> bool: + """Route a token DLP block to the operator's supervisor queue and wait. + + Returns True if the operator approved (the matched value is added to + `self.safe_tokens` and the caller re-scans); False if the request must + be blocked (a 403 response has been written to `flow`).""" + host = flow.request.pretty_host + payload = build_token_allow_payload( + redact_tokens(host, env=os.environ), + flow.request.method, + redact_tokens(request_path, env=os.environ), + result, + ) + proposal = _sv.Proposal.new( + bottle_slug=self._supervise_slug, + tool=_sv.TOOL_EGRESS_TOKEN_ALLOW, + proposed_file=payload, + justification=_TOKEN_ALLOW_JUSTIFICATION, + current_file_hash=_sv.sha256_hex(payload), + ) + queue_dir = Path(self._supervise_queue_dir) + try: + _sv.write_proposal(queue_dir, proposal) + except OSError as e: + sys.stderr.write( + f"egress: could not queue token-allow proposal: {e}; " + "blocking request\n" + ) + self._block(flow, f"egress DLP: {result.reason}", ctx=self._req_ctx(flow)) + return False + + sys.stderr.write(json.dumps({ + "event": "egress_token_supervise", + "reason": f"egress DLP: {result.reason}", + "proposal": proposal.id, + **self._req_ctx(flow), + }) + "\n") + + response = await self._await_token_response(queue_dir, proposal.id) + _sv.archive_proposal(queue_dir, proposal.id) + + if response is not None and response.status in ( + _sv.STATUS_APPROVED, _sv.STATUS_MODIFIED, + ): + self.safe_tokens.add(result.matched) + if self.config.log >= LOG_BLOCKS: + sys.stderr.write(json.dumps({ + "event": "egress_token_allowed", + "reason": f"egress DLP: {result.reason}", + "proposal": proposal.id, + **self._req_ctx(flow), + }) + "\n") + return True + + if response is None: + reason = ( + f"egress DLP: {result.reason}; supervisor approval timed out " + f"after {self._token_allow_timeout:g}s" + ) + else: + reason = f"egress DLP: {result.reason}; supervisor rejected the request" + self._block(flow, reason, ctx=self._req_ctx(flow)) + return False + + async def _await_token_response( + self, + queue_dir: Path, + proposal_id: str, + ) -> "_sv.Response | None": + """Poll the queue dir for the operator's response without blocking the + proxy event loop. Returns the Response, or None on timeout.""" + loop = asyncio.get_running_loop() + deadline = loop.time() + self._token_allow_timeout + while True: + try: + return _sv.read_response(queue_dir, proposal_id) + except (OSError, ValueError, KeyError): + # Not written yet, or a partial/malformed write — retry until + # the deadline, then fail closed. + pass + if loop.time() >= deadline: + return None + await asyncio.sleep(TOKEN_ALLOW_POLL_INTERVAL_SECONDS) + def response(self, flow: http.HTTPFlow) -> None: """DLP inbound scan on response headers and body.""" route = match_route(self.config.routes, flow.request.pretty_host) @@ -272,7 +497,12 @@ class EgressAddon: message = flow.websocket.messages[-1] # type: ignore[union-attr] content = message.content.decode("utf-8", errors="replace") if message.from_client: - result = scan_outbound(route, content, os.environ) + # A WebSocket data frame is not an HTTP request line, so CRLF is + # not an injection vector here — scan only for credential leakage. + result = scan_outbound( + route, content, os.environ, + safe_tokens=self.safe_tokens, crlf_text="", + ) if result is not None and result.severity == "block": sys.stderr.write(f"egress DLP: {result.reason}\n") flow.kill() # type: ignore[union-attr] @@ -286,4 +516,23 @@ class EgressAddon: sys.stderr.write(f"egress DLP warn: {result.reason}\n") +def _token_allow_timeout_from_env(env: "os._Environ[str]") -> float: + """Read EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS; fall back to the default on an + unset or invalid value (a bad value should not wedge egress at boot).""" + raw = env.get("EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS", "").strip() + if not raw: + return DEFAULT_TOKEN_ALLOW_TIMEOUT_SECONDS + try: + value = float(raw) + except ValueError: + value = 0.0 + if value <= 0: + sys.stderr.write( + "egress: invalid EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS=" + f"{raw!r}; using default {DEFAULT_TOKEN_ALLOW_TIMEOUT_SECONDS:g}s\n" + ) + return DEFAULT_TOKEN_ALLOW_TIMEOUT_SECONDS + return value + + addons = [EgressAddon()] diff --git a/bot_bottle/egress_addon_core.py b/bot_bottle/egress_addon_core.py index 595baeb..324f7c7 100644 --- a/bot_bottle/egress_addon_core.py +++ b/bot_bottle/egress_addon_core.py @@ -37,6 +37,15 @@ VALID_METHODS = frozenset({ OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"}) INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"}) +# Per-route policy for what the proxy does when an outbound DLP detector +# matches a token (PRD 0062). +ON_MATCH_BLOCK = "block" # hard 403, never overridable +ON_MATCH_REDACT = "redact" # scrub the matched value, forward the request +ON_MATCH_SUPERVISE = "supervise" # queue for operator approval, hold the request +OUTBOUND_ON_MATCH_VALUES = (ON_MATCH_BLOCK, ON_MATCH_REDACT, ON_MATCH_SUPERVISE) +# Unset resolves to supervise (fall back to block when supervise is not wired). +DEFAULT_OUTBOUND_ON_MATCH = ON_MATCH_SUPERVISE + @dataclass(frozen=True) class PathMatch: @@ -69,6 +78,8 @@ class Route: git_fetch: bool = False outbound_detectors: tuple[str, ...] | None = None inbound_detectors: tuple[str, ...] | None = None + # "" means unset → DEFAULT_OUTBOUND_ON_MATCH. See OUTBOUND_ON_MATCH_VALUES. + outbound_on_match: str = "" LOG_OFF = 0 # no logging @@ -95,6 +106,11 @@ class ScanResult: reason: str location: str = "" # where the match was found, e.g. "body", "authorization header" context: str = "" # surrounding text with the match replaced by REDACT + # Raw substring the detector matched. Used inside the sidecar to key the + # supervisor-approved "safe tokens" set (PRD 0062); never logged or written + # to a proposal file. Empty for structural detectors (CRLF) that carry no + # safelist-able value. + matched: str = "" # --------------------------------------------------------------------------- @@ -218,12 +234,12 @@ def _parse_detectors( idx: int, host: str, raw_dict: dict[str, object], -) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None]: +) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None, str]: """Parse the optional `dlp` block on a route, returning - (outbound_detectors, inbound_detectors).""" + (outbound_detectors, inbound_detectors, outbound_on_match).""" dlp_raw = raw_dict.get("dlp") if dlp_raw is None: - return None, None + return None, None, "" label = f"route[{idx}] ({host})" if not isinstance(dlp_raw, dict): raise ValueError(f"{label}: 'dlp' must be an object") @@ -260,13 +276,24 @@ def _parse_detectors( outbound = _parse_detector_field("outbound_detectors", OUTBOUND_DETECTOR_NAMES) inbound = _parse_detector_field("inbound_detectors", INBOUND_DETECTOR_NAMES) + on_match = "" + on_match_raw = dlp.get("outbound_on_match") + if on_match_raw is not None: + if not isinstance(on_match_raw, str) or on_match_raw not in OUTBOUND_ON_MATCH_VALUES: + raise ValueError( + f"{label}: dlp.outbound_on_match must be one of " + f"{', '.join(OUTBOUND_ON_MATCH_VALUES)} (got {on_match_raw!r})" + ) + on_match = on_match_raw + for k in dlp: - if k not in ("outbound_detectors", "inbound_detectors"): + if k not in ("outbound_detectors", "inbound_detectors", "outbound_on_match"): raise ValueError( f"{label}: dlp has unknown key {k!r}; accepted keys " - f"are 'outbound_detectors', 'inbound_detectors'" + f"are 'outbound_detectors', 'inbound_detectors', " + f"'outbound_on_match'" ) - return outbound, inbound + return outbound, inbound, on_match def parse_routes(payload: object) -> tuple[Route, ...]: @@ -337,7 +364,7 @@ def _parse_one(idx: int, raw: object) -> Route: ) # dlp detectors - outbound_detectors, inbound_detectors = _parse_detectors( + outbound_detectors, inbound_detectors, outbound_on_match = _parse_detectors( idx, host, raw_dict, ) @@ -356,6 +383,7 @@ def _parse_one(idx: int, raw: object) -> Route: git_fetch=git_fetch, outbound_detectors=outbound_detectors, inbound_detectors=inbound_detectors, + outbound_on_match=outbound_on_match, ) @@ -404,6 +432,8 @@ def route_to_yaml_dict(r: Route) -> dict[str, object]: dlp["outbound_detectors"] = list(r.outbound_detectors) if r.inbound_detectors is not None: dlp["inbound_detectors"] = list(r.inbound_detectors) + if r.outbound_on_match: + dlp["outbound_on_match"] = r.outbound_on_match if dlp: d["dlp"] = dlp return d @@ -690,6 +720,9 @@ def scan_outbound( route: Route, body: str | bytes, environ: typing.Mapping[str, str], + *, + safe_tokens: typing.AbstractSet[str] | None = None, + crlf_text: str | None = None, ) -> ScanResult | None: # Lazy import to avoid circular deps and keep dlp_detectors optional # at import time (the sidecar copies it flat alongside this file). @@ -708,25 +741,53 @@ def scan_outbound( text = body if isinstance(body, str) else body.decode("utf-8", errors="replace") - # CRLF injection is never legitimate — runs unconditionally, not gated - # by outbound_detectors config. - result = scan_crlf_injection(text) + # CRLF injection is only an attack in the request line + headers, never the + # body: an HTTP body is delimited by Content-Length, so CRLF bytes there + # cannot split the request. Scanning the body produces false positives on + # legitimate form-encoded / multi-line content. Callers pass the + # body-excluded surfaces as `crlf_text`; `None` falls back to the full text + # for backward-compatible callers (host-side tests, websocket frames). + crlf_target = text if crlf_text is None else crlf_text + result = scan_crlf_injection(crlf_target) if result is not None: return result if _detector_enabled(route.outbound_detectors, "token_patterns"): - result = scan_token_patterns(text, location="body") + result = scan_token_patterns(text, location="body", safe_tokens=safe_tokens) if result is not None: return result if _detector_enabled(route.outbound_detectors, "known_secrets"): - result = scan_known_secrets(text, location="body", env=environ) + result = scan_known_secrets( + text, location="body", env=environ, safe_tokens=safe_tokens, + ) if result is not None: return result return None +def build_token_allow_payload( + host: str, + method: str, + path: str, + result: ScanResult, +) -> str: + """Render the human-readable supervisor proposal body for an outbound + token block (PRD 0062). Carries the host/method/path, the detector + reason, and the redacted context snippet — never the raw token value.""" + lines = [ + "egress blocked an outbound request carrying a detected token", + f"host: {host}", + f"method: {method}", + f"path: {path}", + f"detector: {result.reason}", + ] + if result.context: + lines.append(f"context: {result.context}") + return "\n".join(lines) + "\n" + + def scan_inbound( route: Route, body: str | bytes, @@ -751,6 +812,11 @@ __all__ = [ "route_to_yaml_dict", "LOG_FULL", "LOG_OFF", + "ON_MATCH_BLOCK", + "ON_MATCH_REDACT", + "ON_MATCH_SUPERVISE", + "OUTBOUND_ON_MATCH_VALUES", + "DEFAULT_OUTBOUND_ON_MATCH", "Config", "Decision", "HeaderMatch", @@ -760,6 +826,7 @@ __all__ = [ "ScanResult", "build_inbound_scan_text", "build_outbound_scan_text", + "build_token_allow_payload", "decide", "decide_git_fetch", "evaluate_matches", diff --git a/bot_bottle/manifest_egress.py b/bot_bottle/manifest_egress.py index 0f22209..9b46f2e 100644 --- a/bot_bottle/manifest_egress.py +++ b/bot_bottle/manifest_egress.py @@ -21,6 +21,9 @@ VALID_METHODS = frozenset({ OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"}) INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"}) +# What the proxy does on an outbound token match (PRD 0062). +OUTBOUND_ON_MATCH_VALUES = ("block", "redact", "supervise") + def validate_egress_routes( bottle_name: str, @@ -67,6 +70,7 @@ class ManifestEgressRoute: GitFetch: bool = False OutboundDetectors: tuple[str, ...] | None = None InboundDetectors: tuple[str, ...] | None = None + OutboundOnMatch: str = "" @classmethod def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "ManifestEgressRoute": @@ -161,8 +165,9 @@ class ManifestEgressRoute: # --- dlp --- outbound_detectors: tuple[str, ...] | None = None inbound_detectors: tuple[str, ...] | None = None + outbound_on_match = "" if "dlp" in d: - outbound_detectors, inbound_detectors = _parse_dlp_block( + outbound_detectors, inbound_detectors, outbound_on_match = _parse_dlp_block( label, d.get("dlp"), ) @@ -201,6 +206,7 @@ class ManifestEgressRoute: GitFetch=git_fetch, OutboundDetectors=outbound_detectors, InboundDetectors=inbound_detectors, + OutboundOnMatch=outbound_on_match, ) @@ -323,7 +329,7 @@ def _parse_header_match( def _parse_dlp_block( route_label: str, raw: object, -) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None]: +) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None, str]: label = f"{route_label} dlp" d = as_json_object(raw, label) @@ -358,13 +364,24 @@ def _parse_dlp_block( outbound = _parse_field("outbound_detectors", OUTBOUND_DETECTOR_NAMES) inbound = _parse_field("inbound_detectors", INBOUND_DETECTOR_NAMES) + on_match = "" + on_match_raw = d.get("outbound_on_match") + if on_match_raw is not None: + if not isinstance(on_match_raw, str) or on_match_raw not in OUTBOUND_ON_MATCH_VALUES: + raise ManifestError( + f"{label} outbound_on_match must be one of " + f"{', '.join(OUTBOUND_ON_MATCH_VALUES)} (got {on_match_raw!r})" + ) + on_match = on_match_raw + for k in d: - if k not in ("outbound_detectors", "inbound_detectors"): + if k not in ("outbound_detectors", "inbound_detectors", "outbound_on_match"): raise ManifestError( f"{label} has unknown key {k!r}; accepted keys are " - f"'outbound_detectors', 'inbound_detectors'" + f"'outbound_detectors', 'inbound_detectors', " + f"'outbound_on_match'" ) - return outbound, inbound + return outbound, inbound, on_match LOG_LEVELS = frozenset({0, 1, 2}) diff --git a/bot_bottle/supervise.py b/bot_bottle/supervise.py index b27ab5e..7b1d56d 100644 --- a/bot_bottle/supervise.py +++ b/bot_bottle/supervise.py @@ -52,12 +52,16 @@ TOOL_CAPABILITY_BLOCK = "capability-block" TOOL_EGRESS_BLOCK = "egress-block" TOOL_ALLOW = "allow" TOOL_GITLEAKS_ALLOW = "gitleaks-allow" +# Written directly by the egress addon (not an agent-facing MCP tool) when an +# outbound DLP token block is routed to the operator for override (PRD 0062). +TOOL_EGRESS_TOKEN_ALLOW = "egress-token-allow" TOOL_LIST_EGRESS_ROUTES = "list-egress-routes" TOOLS: tuple[str, ...] = ( TOOL_ALLOW, TOOL_CAPABILITY_BLOCK, TOOL_EGRESS_BLOCK, TOOL_GITLEAKS_ALLOW, + TOOL_EGRESS_TOKEN_ALLOW, TOOL_LIST_EGRESS_ROUTES, ) @@ -556,6 +560,7 @@ __all__ = [ "EGRESS_INTROSPECT_URL", "TOOL_CAPABILITY_BLOCK", "TOOL_GITLEAKS_ALLOW", + "TOOL_EGRESS_TOKEN_ALLOW", "TOOL_LIST_EGRESS_ROUTES", "archive_proposal", "audit_dir", diff --git a/bot_bottle/supervise_server.py b/bot_bottle/supervise_server.py index 6530561..e529787 100644 --- a/bot_bottle/supervise_server.py +++ b/bot_bottle/supervise_server.py @@ -187,6 +187,7 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [ " dlp: (optional DLP scanner overrides)\n" " outbound_detectors: [token_patterns, known_secrets]\n" " inbound_detectors: [naive_injection_detection]\n" + " outbound_on_match: block|redact|supervise (default supervise)\n" "Omit any key that should use its default. " "`list-egress-routes` returns routes in this same format." ), @@ -228,6 +229,7 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [ " dlp: (optional DLP scanner overrides)\n" " outbound_detectors: [token_patterns, known_secrets]\n" " inbound_detectors: [naive_injection_detection]\n" + " outbound_on_match: block|redact|supervise (default supervise)\n" "Omit any key that should use its default. " "`list-egress-routes` returns routes in this same format." ), diff --git a/docs/prds/0062-egress-supervisor-token-override.md b/docs/prds/0062-egress-supervisor-token-override.md new file mode 100644 index 0000000..741b127 --- /dev/null +++ b/docs/prds/0062-egress-supervisor-token-override.md @@ -0,0 +1,210 @@ +# PRD 0062: Supervisor override for egress token blocks + +- **Status:** Active +- **Author:** claude +- **Created:** 2026-06-24 +- **Issue:** #261 + +## Summary + +Give each egress route a policy for what happens when an outbound DLP detector +matches a token, via `dlp.outbound_on_match: block | redact | supervise` +(default `supervise`): + +- **`supervise`** (default) — route the block through the existing supervisor + approval queue instead of returning `403` immediately. The proxy holds the + request open until the operator approves or rejects it. On approval the + matched token is added to an in-memory "safe tokens" set so the request — and + any later request carrying the same token — flows through without + re-prompting. +- **`redact`** — scrub the matched value(s) from the request and forward it, + no operator in the loop. For routes where a token-shaped value is noise the + upstream doesn't need (telemetry/log sinks). Fails closed if a match lands on + a surface redaction can't rewrite (the hostname). +- **`block`** — the original hard `403`; never overridable. For routes where a + detected token must always stop. + +The motivating goal is reducing friction from false positives without weakening +the default-deny posture: supervise keeps a human in the loop, redact is an +explicit per-route opt-in, and block stays available for sensitive routes. + +## Problem + +The outbound DLP detectors (`token_patterns`, `known_secrets`) are +deliberately aggressive: any string that looks like a credential is blocked +before it leaves the bottle. That is the right default, but it produces false +positives — a token-shaped value that is not actually a secret, or a credential +the agent legitimately needs to send to a declared host. Today the only +recovery is for the operator to notice the `egress DLP` 403 in the logs and +hand-edit the route's `dlp.outbound_detectors`, which disables the detector for +the whole route rather than allowing the one value. + +The operator has no in-the-loop signal that a token block happened and no +fine-grained way to say "this specific value is fine." + +## Goals / Success Criteria + +1. An outbound DLP **token** block (a `ScanResult` carrying a matched secret + value) creates a supervisor proposal instead of an immediate `403`. +2. The egress proxy holds the blocked request open, polling for the operator's + response up to a bounded timeout. +3. The proposal shows the operator the host, method, path, the detector reason, + and a **redacted** context snippet — never the raw token value. +4. On `approved`/`modified`, the matched token value is added to an in-memory + safe-tokens set and the request proceeds normally; later requests carrying + the same value skip the block. +5. On `rejected`, timeout, malformed response, or missing supervisor wiring, + the request fails closed with the same `403` as today. +6. Structural blocks that carry no token value (CRLF injection) and the + route-not-allowlisted / git blocks are unchanged — they stay hard `403`s and + keep their existing agent-driven `allow` / `egress-block` MCP path. +7. The proxy event loop is not stalled while waiting: the wait is asynchronous, + so other flows keep being served. + +## Non-goals + +- Persisting the safe-tokens set across egress restarts. It lives in process + memory only; a restart re-prompts. (The issue explicitly defers persistence.) +- Supervising inbound (prompt-injection) blocks or WebSocket frame blocks. + WebSocket frames still honour the safe-tokens set for already-approved values + but cannot wait for approval (there is no response surface after upgrade). +- Generalising an approved secret across encodings. The safe-tokens set matches + the exact value the detector found. +- Replacing the per-route `dlp.outbound_detectors` override. That remains the + way to turn a detector off wholesale. +- Making `redact` the default. Silent redaction of a true false positive + corrupts legitimate data, so it is opt-in per route; `supervise` (human in + the loop) stays the default. + +## Scope + +### In scope + +The minimum cut that ships, in build order: + +1. **Core** — `ScanResult.matched`; thread `safe_tokens` through + `scan_outbound` / the token detectors; `build_token_allow_payload`. +2. **Supervise + TUI** — `TOOL_EGRESS_TOKEN_ALLOW`; TUI suffix, modify guard, + required approval reason. +3. **Addon glue** — async `request`, safe-tokens set, proposal write + async + poll, allow/block decision; pass `safe_tokens` into the WebSocket path. +4. **On-match policy** — `dlp.outbound_on_match` through manifest → render → + addon; `redact` surface scrub with fail-closed re-scan; policy dispatch in + the addon's outbound handler. +5. **Tests + docs** — core/supervise/TUI/manifest/render unit tests; README + egress + supervisor notes. + +### Out of scope + +The deferrals enumerated under **Non-goals** — restart persistence, inbound / +WebSocket-frame supervision, cross-encoding generalisation, replacing +`dlp.outbound_detectors`, and making `redact` the default. + +## Proposed Design + +### New services / components + +A new proposal tool constant `egress-token-allow` (`TOOL_EGRESS_TOKEN_ALLOW`) +is added to `supervise.TOOLS`, and the egress addon gains an in-memory +safe-tokens set plus the policy-dispatch path that drives it. + +On an outbound block the addon dispatches on the resolved policy: + +- **Structural blocks always 403.** A `ScanResult` with no `matched` value + (CRLF injection) is a hard `403` regardless of policy — there is nothing to + redact or safelist. +- **`redact`** runs `redact_tokens` over the body, non-`host` header values, + and path/query, then re-scans. If the re-scan is clean the (rewritten) + request is forwarded; if a block-severity match remains (e.g. in the + hostname, or a unicode-evasion token redaction can't reach) it fails closed + with a `403`. +- **`block`** writes the `403` immediately. +- **`supervise`** runs the queue-and-wait loop, falling back to `block` when + supervise isn't wired for the bottle. + +For `supervise`, the addon writes the proposal directly to +`SUPERVISE_QUEUE_DIR` (the queue is bind-mounted into the sidecar bundle and +shared by every daemon, exactly as git-gate's `gitleaks-allow` proposal in PRD +0061 does). The proposal's `proposed_file` is a human-readable text payload +built by `build_token_allow_payload`: + +``` +egress blocked an outbound request carrying a detected token +host: api.example.com +method: POST +path: /v1/ingest +detector: OpenAI API key found in body +context: ...before ******** after... +``` + +The justification tells the operator to approve only if the value is a false +positive or a credential the request legitimately needs. The addon then polls +`.response.json` for `EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS` (default +300). `approved`/`modified` allow the request and add the value to the +safe-tokens set; `rejected`, malformed responses, and timeout fail the request +closed. The proposal + response are archived to `processed/` after a decision. +Because the wait happens inside mitmproxy's asyncio loop, the addon's `request` +hook is async and polls with `asyncio.sleep`, so concurrent flows are +unaffected. + +### Existing code touched + +- **Policy threading.** `dlp.outbound_on_match` is a per-route enum threaded + from the bottle manifest (`manifest_egress`) through the resolved route + (`egress.EgressRoute`), the rendered `routes.yaml` (`egress_render_routes`), + and the addon's `Route` (`egress_addon_core`). Unset renders nothing and + resolves to `supervise` at request time. The `list-egress-routes` + introspection endpoint round-trips it so the agent's proposals preserve it. +- **Provider-route default.** Agent-provider routes (the agent talking to its + own LLM API — `api.anthropic.com`, the Codex backend, etc.) are the worst + source of token-shaped false positives because the whole conversation payload + flows through them. `egress_routes_for_bottle` fills `outbound_on_match=redact` + on any provider route that doesn't set it explicitly; a provider that sets the + policy keeps its choice, and manifest routes are unaffected (they default to + `supervise`). +- **Scanners.** `scan_outbound` (and the token detectors `scan_token_patterns` + / `scan_known_secrets` it calls) accept a `safe_tokens` set. A match whose + value is in `safe_tokens` is skipped, so an approved token no longer blocks; + the scanners keep searching past a safelisted match so a second, un-approved + secret in the same request is still caught. The WebSocket path is passed the + same `safe_tokens` set. +- **Supervisor UI.** `cli/supervise.py` renders `egress-token-allow` like + `gitleaks-allow`: the text payload is shown, modify is unavailable (there is + no file patch to edit), and approval prompts for a non-empty reason recorded + in the response notes. There is no on-disk config diff, so — like + `gitleaks-allow` and `capability-block` — it writes no egress audit-log entry. +- **Failure handling.** If `SUPERVISE_QUEUE_DIR` / `SUPERVISE_BOTTLE_SLUG` are + unset (supervise disabled for the bottle), the addon skips the queue and + returns the existing `403`. Any error writing the proposal or reading the + response also fails closed. + +### Data model changes + +- New per-route manifest field `dlp.outbound_on_match: block | redact | + supervise`, rendered into `routes.yaml` (omitted when unset). +- `ScanResult` gains a `matched: str = ""` field carrying the raw substring the + detector matched. The token detectors populate it; the structural CRLF + detector leaves it empty. The value stays inside the egress sidecar process — + never written to a log line (logs use the redacted `context`) nor to the + proposal file. +- Proposal text payload (above) plus `.response.json` in + `SUPERVISE_QUEUE_DIR`, archived to `processed/` after a decision. +- New env var `EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS` (default 300). + +### External dependencies + +None. Reuses the existing supervisor queue (`SUPERVISE_QUEUE_DIR`) and the +mitmproxy addon framework already in the egress sidecar. + +## Open questions + +- Should `known_secrets` (provisioned `EGRESS_TOKEN_*` exfiltration) be + override-able at all, or only `token_patterns`? This PRD allows both — + approval is an explicit operator decision and the safe-tokens set matches the + exact found value — but a future revision could restrict `known_secrets` to + reject-only. + +## References + +- Issue #261 +- PRD 0061 — `gitleaks-allow` supervisor proposal pattern this reuses. diff --git a/tests/unit/test_dlp_detectors.py b/tests/unit/test_dlp_detectors.py index 03ddae6..3028729 100644 --- a/tests/unit/test_dlp_detectors.py +++ b/tests/unit/test_dlp_detectors.py @@ -445,5 +445,63 @@ class TestKnownSecretsNewVariants(unittest.TestCase): self.assertIsNotNone(result) +class TestMatchedAndSafeTokens(unittest.TestCase): + """PRD 0062: detectors carry the raw matched value, and a safelisted + value is skipped so the supervisor can approve a specific token.""" + + def test_token_pattern_sets_matched(self): + token = "ghp_" + "A" * 36 + result = scan_token_patterns(f"token: {token}") + assert result is not None + self.assertEqual(token, result.matched) + + def test_safe_token_is_skipped(self): + token = "ghp_" + "A" * 36 + self.assertIsNone( + scan_token_patterns(f"token: {token}", safe_tokens={token}) + ) + + def test_safe_token_does_not_mask_other_token(self): + safe = "ghp_" + "A" * 36 + other = "AKIAIOSFODNN7EXAMPLE" + result = scan_token_patterns( + f"a={safe} b={other}", safe_tokens={safe}, + ) + assert result is not None + self.assertEqual(other, result.matched) + self.assertIn("AWS", result.reason) + + def test_known_secret_sets_matched_and_safelist_skips(self): + secret = "supersecretvalue123" + env = {"EGRESS_TOKEN_FOO": secret} + result = scan_known_secrets(f"x={secret}", env=env) + assert result is not None + self.assertEqual(secret, result.matched) + self.assertIsNone( + scan_known_secrets(f"x={secret}", env=env, safe_tokens={secret}) + ) + + def test_crlf_block_has_no_matched_value(self): + result = scan_crlf_injection("path%0d%0aHost: evil") + assert result is not None + self.assertEqual("", result.matched) + + +class TestStripCrlf(unittest.TestCase): + def test_removes_url_encoded_crlf(self): + from bot_bottle.dlp_detectors import strip_crlf + out = strip_crlf("next=%0d%0aX-Injected: evil") + self.assertNotRegex(out, r"%0[dD]%0[aA]") + + def test_removes_literal_header_injection(self): + from bot_bottle.dlp_detectors import strip_crlf + out = strip_crlf("value\r\nX-Injected: evil") + self.assertIsNone(scan_crlf_injection(out)) + + def test_leaves_clean_text_unchanged(self): + from bot_bottle.dlp_detectors import strip_crlf + self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello")) + + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_egress.py b/tests/unit/test_egress.py index 05ea426..4fdae02 100644 --- a/tests/unit/test_egress.py +++ b/tests/unit/test_egress.py @@ -202,6 +202,23 @@ class TestProviderRouteMerge(unittest.TestCase): self.assertEqual((), routes[0].matches) self.assertEqual({}, egress_token_env_map(routes)) + def test_provider_route_defaults_to_redact_on_match(self): + b = _bottle([]) + pr = EgressRoute(host="api.anthropic.com") + routes = egress_routes_for_bottle(b, (pr,)) + self.assertEqual("redact", routes[0].outbound_on_match) + + def test_provider_route_explicit_on_match_preserved(self): + b = _bottle([]) + pr = EgressRoute(host="api.anthropic.com", outbound_on_match="supervise") + routes = egress_routes_for_bottle(b, (pr,)) + self.assertEqual("supervise", routes[0].outbound_on_match) + + def test_manifest_route_does_not_get_redact_default(self): + b = _bottle([{"host": "api.example.com"}]) + routes = egress_routes_for_bottle(b) + self.assertEqual("", routes[0].outbound_on_match) + def test_two_provider_routes_with_same_token_ref_share_slot(self): b = _bottle([]) routes = egress_routes_for_bottle(b, ( @@ -329,6 +346,23 @@ class TestRenderRoutes(unittest.TestCase): self.assertEqual(("token_patterns",), addon_routes[0].outbound_detectors) self.assertEqual((), addon_routes[0].inbound_detectors) + def test_outbound_on_match_round_trips(self): + from bot_bottle.egress_addon_core import load_routes + b = _bottle([{"host": "logs.example", "dlp": { + "outbound_on_match": "redact", + }}]) + routes = egress_routes_for_bottle(b) + rendered = egress_render_routes(routes) + self.assertIn('outbound_on_match: "redact"', rendered) + addon_routes = load_routes(rendered) + self.assertEqual("redact", addon_routes[0].outbound_on_match) + + def test_outbound_on_match_default_omitted_from_render(self): + b = _bottle([{"host": "x.example"}]) + routes = egress_routes_for_bottle(b) + rendered = egress_render_routes(routes) + self.assertNotIn("outbound_on_match", rendered) + def test_git_fetch_policy_round_trips(self): from bot_bottle.egress_addon_core import load_routes b = _bottle([{"host": "github.com", "git": {"fetch": True}}]) diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index 09d79e3..2ea6abd 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -22,8 +22,10 @@ from bot_bottle.egress_addon_core import ( MatchEntry, PathMatch, Route, + ScanResult, build_inbound_scan_text, build_outbound_scan_text, + build_token_allow_payload, decide, decide_git_fetch, evaluate_matches, @@ -267,6 +269,25 @@ class TestParseDlp(unittest.TestCase): "dlp": {"wat": True}, }]}) + def test_outbound_on_match_default_empty(self): + routes = parse_routes({"routes": [{"host": "x.example"}]}) + self.assertEqual("", routes[0].outbound_on_match) + + def test_outbound_on_match_parsed(self): + for policy in ("block", "redact", "supervise"): + routes = parse_routes({"routes": [{ + "host": "x.example", + "dlp": {"outbound_on_match": policy}, + }]}) + self.assertEqual(policy, routes[0].outbound_on_match) + + def test_outbound_on_match_invalid_rejected(self): + with self.assertRaises(ValueError): + parse_routes({"routes": [{ + "host": "x.example", + "dlp": {"outbound_on_match": "nope"}, + }]}) + # --- load_routes --------------------------------------------------------- @@ -1167,5 +1188,92 @@ class TestScanInbound(unittest.TestCase): self.assertEqual("block", result.severity) +class TestScanOutboundSafeTokens(unittest.TestCase): + """PRD 0062: scan_outbound threads the supervisor-approved safe-tokens + set into the token detectors.""" + + def test_safe_token_allows_request(self): + text = build_outbound_scan_text( + host="api.example.com", path="/v1/data", query="", + headers={}, body=f"key={_AWS_KEY}", + ) + self.assertIsNone( + scan_outbound(_ROUTE, text, {}, safe_tokens={_AWS_KEY}) + ) + + def test_unrelated_safe_token_still_blocks(self): + text = build_outbound_scan_text( + host="api.example.com", path="/v1/data", query="", + headers={}, body=f"key={_AWS_KEY}", + ) + result = scan_outbound(_ROUTE, text, {}, safe_tokens={"ghp_" + "A" * 36}) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual(_AWS_KEY, result.matched) + + +class TestScanOutboundCrlfText(unittest.TestCase): + """PRD 0062: CRLF is scanned only over the request line + headers + (crlf_text), never the body — a body is not an injection vector.""" + + def test_body_crlf_not_flagged_when_crlf_text_excludes_body(self): + # A form-encoded multi-line body legitimately contains %0d%0a. + body = "comment=line1%0d%0aline2" + full = build_outbound_scan_text( + host="api.example.com", path="/submit", query="", + headers={}, body=body, + ) + crlf_text = build_outbound_scan_text( + host="api.example.com", path="/submit", query="", + headers={}, body="", + ) + self.assertIsNone(scan_outbound(_ROUTE, full, {}, crlf_text=crlf_text)) + + def test_request_line_crlf_still_flagged(self): + full = build_outbound_scan_text( + host="api.example.com", path="/p", query="next=%0d%0aX:evil", + headers={}, body="", + ) + crlf_text = full + result = scan_outbound(_ROUTE, full, {}, crlf_text=crlf_text) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_default_crlf_text_scans_full_blob(self): + # Backward compatibility: crlf_text=None scans everything (body too). + full = build_outbound_scan_text( + host="api.example.com", path="/submit", query="", + headers={}, body="x=%0d%0aX:evil", + ) + self.assertIsNotNone(scan_outbound(_ROUTE, full, {})) + + +class TestBuildTokenAllowPayload(unittest.TestCase): + def test_payload_includes_context_and_no_raw_token(self): + result = ScanResult( + severity="block", + reason="AWS access key found in body", + location="body", + context="key=******** tail", + matched=_AWS_KEY, + ) + payload = build_token_allow_payload( + "api.example.com", "POST", "/v1/ingest", result, + ) + self.assertIn("host: api.example.com", payload) + self.assertIn("method: POST", payload) + self.assertIn("path: /v1/ingest", payload) + self.assertIn("AWS access key found in body", payload) + self.assertIn("key=******** tail", payload) + # The raw matched value must never appear in the proposal file. + self.assertNotIn(_AWS_KEY, payload) + + def test_payload_omits_context_line_when_empty(self): + result = ScanResult(severity="block", reason="r", matched="x") + payload = build_token_allow_payload("h", "GET", "/", result) + self.assertNotIn("context:", payload) + + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_manifest_egress.py b/tests/unit/test_manifest_egress.py index 7daa67a..cf70825 100644 --- a/tests/unit/test_manifest_egress.py +++ b/tests/unit/test_manifest_egress.py @@ -302,6 +302,24 @@ class TestDlp(unittest.TestCase): "bogus": True, }}]) + def test_outbound_on_match_omitted_is_empty(self): + b = _bottle([{"host": "x.example"}]) + self.assertEqual("", b.egress.routes[0].OutboundOnMatch) + + def test_outbound_on_match_accepts_policies(self): + for policy in ("block", "redact", "supervise"): + with self.subTest(policy=policy): + b = _bottle([{"host": "x.example", "dlp": { + "outbound_on_match": policy, + }}]) + self.assertEqual(policy, b.egress.routes[0].OutboundOnMatch) + + def test_outbound_on_match_rejects_unknown_value(self): + with self.assertRaises(ManifestError): + _bottle([{"host": "x.example", "dlp": { + "outbound_on_match": "allow", + }}]) + class TestGitPolicy(unittest.TestCase): def test_omitted_means_https_git_fetch_disabled(self): diff --git a/tests/unit/test_supervise.py b/tests/unit/test_supervise.py index 87ded1b..ed30ef2 100644 --- a/tests/unit/test_supervise.py +++ b/tests/unit/test_supervise.py @@ -322,11 +322,22 @@ class TestToolConstants(unittest.TestCase): TOOL_CAPABILITY_BLOCK, supervise.TOOL_EGRESS_BLOCK, TOOL_GITLEAKS_ALLOW, + supervise.TOOL_EGRESS_TOKEN_ALLOW, supervise.TOOL_LIST_EGRESS_ROUTES, ), supervise.TOOLS, ) + def test_token_allow_proposal_roundtrips(self): + p = Proposal.new( + bottle_slug="dev", + tool=supervise.TOOL_EGRESS_TOKEN_ALLOW, + proposed_file="host: api.example.com\n", + justification="false positive", + current_file_hash="h", + ) + self.assertEqual(p, Proposal.from_dict(p.to_dict())) + def test_component_map_has_egress_entries(self): self.assertEqual( { diff --git a/tests/unit/test_supervise_cli.py b/tests/unit/test_supervise_cli.py index 8b9f354..63a7b47 100644 --- a/tests/unit/test_supervise_cli.py +++ b/tests/unit/test_supervise_cli.py @@ -20,6 +20,7 @@ from bot_bottle.supervise import ( STATUS_REJECTED, TOOL_CAPABILITY_BLOCK, TOOL_GITLEAKS_ALLOW, + TOOL_EGRESS_TOKEN_ALLOW, read_audit_entries, read_response, sha256_hex, @@ -35,6 +36,7 @@ def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal: supervise.TOOL_ALLOW: "routes:\n - host: example.com\n", supervise.TOOL_EGRESS_BLOCK: "routes:\n - host: example.com\n", TOOL_GITLEAKS_ALLOW: "file: tests/test_fixture.py\nline: 3\n", + TOOL_EGRESS_TOKEN_ALLOW: "host: api.example.com\ndetector: token\n", } payload = payloads.get(tool, "") return Proposal.new( @@ -196,6 +198,39 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): resp = read_response(qp.queue_dir, qp.proposal.id) self.assertEqual("test fixture", resp.notes) + def test_approve_token_allow_leaves_response_for_egress(self): + qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW) + supervise_cli.approve(qp, notes="false positive") + # The egress addon polls the queue dir for the response; the TUI must + # not archive it (the addon archives after reading). + resp = read_response(qp.queue_dir, qp.proposal.id) + self.assertEqual(STATUS_APPROVED, resp.status) + self.assertEqual("false positive", resp.notes) + self.assertFalse((qp.queue_dir / "processed").exists()) + + def test_token_allow_writes_no_audit_log(self): + qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW) + supervise_cli.approve(qp, notes="false positive") + self.assertEqual([], read_audit_entries("egress", "dev")) + + def test_tui_token_allow_requires_reason(self): + qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW) + with patch.object(supervise_cli, "_prompt", return_value=""): + status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type] + self.assertEqual("approve aborted (empty reason)", status) + self.assertFalse((qp.queue_dir / "processed").exists()) + + def test_tui_token_allow_writes_reason(self): + qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW) + with patch.object(supervise_cli, "_prompt", return_value="legit"): + status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type] + self.assertIn("approved egress-token-allow", status) + resp = read_response(qp.queue_dir, qp.proposal.id) + self.assertEqual("legit", resp.notes) + + def test_suffix_for_token_allow_is_txt(self): + self.assertEqual(".txt", supervise_cli._suffix_for_tool(TOOL_EGRESS_TOKEN_ALLOW)) + # class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase): # # DISABLED — capability_apply functionality is currently commented out.