Compare commits

..

16 Commits

Author SHA1 Message Date
didericis-claude affd628df6 docs(prd): flip prd-new-strengthen-outbound-exfil-detection Draft → Active
lint / lint (push) Failing after 2m2s
test / unit (pull_request) Successful in 47s
test / integration (pull_request) Successful in 29s
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-25 01:02:55 +00:00
didericis-claude 2610f5a2c9 feat(egress): inject per-session canary token into sidecar and agent environments
EgressPlan gains a `canary: str` field (default "") populated in Egress.prepare()
using secrets.token_urlsafe(32).  Each launched bottle:

  - sidecar receives EGRESS_TOKEN_CANARY=<value> (literal env entry, scanned by
    existing known-secrets detector without any detector code changes)
  - agent receives BOT_BOTTLE_CANARY=<value> (visible fake secret that signals
    exfiltration with zero false positives if it appears in outbound traffic)

Docker compose and macos-container backends updated; smolmachines shares docker
compose and so picks this up automatically.  Unit tests cover canary uniqueness,
detection via scan_known_secrets, and EgressPlan backward-compat default.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-25 01:02:44 +00:00
didericis-claude 001a420957 feat(dlp): fragmentation resistance, entropy detector, broadened known-value scan
- _alnum_projection(): strip non-alphanumeric chars for separator-injection detection
- scan_known_secrets() gains two extra passes per secret after exact-variant matching:
  alnum-projection exact match (catches hyphens/spaces between secret chars) and a
  sliding-window partial-match scan (catches chunked substrings ≥ PARTIAL_MATCH_MIN_LEN)
- scan_known_secrets() accepts sensitive_prefixes param (default ("EGRESS_TOKEN_",))
  so redact_tokens and call-sites can extend the scanned env-var prefix set
- scan_entropy() warn-only detector flagging windows with Shannon entropy ≥ 5.5 bits/char
- "entropy" added to OUTBOUND_DETECTOR_NAMES; scan_outbound opts it in only when
  explicitly listed in dlp.outbound_detectors (never part of the default "all" set)
- scan_outbound reads BOT_BOTTLE_SENSITIVE_PREFIXES from environ to extend
  scan_known_secrets beyond EGRESS_TOKEN_* without schema changes
- Binary bodies decoded via latin-1 fallback (bijective byte↔codepoint) instead
  of utf-8 errors=replace, preserving ASCII secret strings in binary payloads

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-25 01:02:34 +00:00
didericis-claude 813cb685bf docs: draft PRD prd-new for strengthen-outbound-exfil-detection
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-25 00:54:28 +00:00
didericis 7cb967770e feat(log): add leveled severity and structured context to log wrappers
test / unit (pull_request) Successful in 31s
test / integration (pull_request) Successful in 16s
lint / lint (push) Successful in 1m39s
test / unit (push) Successful in 31s
test / integration (push) Successful in 16s
Update Quality Badges / update-badges (push) Successful in 1m32s
log.py was bare print-to-stderr wrappers with no levels or attributable
context (issue #252). Add:

- Ordered severities (debug/info/warn/error) gated by
  BOT_BOTTLE_LOG_LEVEL (default info). debug is silent by default;
  error always surfaces (nothing sits above it), so the fatal die path
  stays visible regardless of configured level.
- An optional `context` mapping on every wrapper, rendered as a
  parseable ` [k=v ...]` suffix (keys sorted; whitespace/quoted values
  quoted) so failures can be filtered and correlated.

Default output with no context is byte-identical to the original lines,
so the 100+ existing single-string call sites are unaffected. Wires the
supervise crash path (the example the issue names) to attach error_type
and crash_log context. Adds test_log.py (backward-compat, context
rendering, level gating, die surfacing).

Closes #252.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01YcU7nerbg8cVj9R4EkpfLJ
2026-06-24 15:37:57 -04:00
didericis 80eca740d6 docs(research): replace unsourced "20% malicious skills" with cited empirical figures
The "~20% of ClawHub skills malicious" claim had no traceable source and
is contradicted by the empirical literature. Replace with the Jan 2026
large-scale study (98,380-skill snapshot: 157 confirmed malicious, ~71%
credential harvesters, exfiltration overwhelmingly naive) and add the
arXiv citation. The corrected figures still support the supply-chain
threat point and are defensible under scrutiny.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01YcU7nerbg8cVj9R4EkpfLJ
2026-06-24 09:32:19 -04:00
didericis 369d332204 Default the supervise flag to true
test / unit (pull_request) Successful in 36s
test / integration (pull_request) Successful in 17s
lint / lint (push) Successful in 1m40s
test / unit (push) Successful in 30s
test / integration (push) Successful in 15s
Update Quality Badges / update-badges (push) Successful in 1m44s
Issue #249: bottles should be supervised by default. Rather than
remove the flag (which would make supervision mandatory and is the
wrong plane for cost-control enforcement — see #251), keep the
opt-out and flip the default. Bottles that omit `supervise:` now get
the stuck-recovery sidecar; `supervise: false` still skips it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01YcU7nerbg8cVj9R4EkpfLJ
2026-06-23 20:48:04 -04:00
didericis 31cde11b0d docs: correct stale role field and claude provider auth example
lint / lint (push) Successful in 1m53s
The egress route fields table described `role` as a functional field
that wires built-in auth flows. PRD 0029 removed the
`claude_code_oauth` role; the manifest parser now rejects any `role`
value as reserved-for-future-use. Provider auth routes are injected
from `agent_provider.auth_token`.

- README: fix the `role` row to state it is reserved and any value is
  rejected at load.
- examples/bottles/claude.md: the manual `api.anthropic.com` route used
  the rejected `role` key and, even without it, would be silently
  dropped (provider-injected routes win for a provisioned host) — so its
  auth never took effect and the dlp comments described a route that
  never exists in the plan. Replace it with the canonical
  `agent_provider.auth_token` shape.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01YcU7nerbg8cVj9R4EkpfLJ
2026-06-23 17:53:18 -04:00
didericis-claude c41751f3b9 docs: add role and git.fetch to egress route fields table
Both fields were missing from the reference table added in the preceding
commit — `role` is visible in examples/bottles/claude.md and `git.fetch`
is documented in PRD 0052 but neither appeared in the README table.
2026-06-23 17:48:19 -04:00
didericis e2422c20a0 docs: document egress matches, dlp fields, and detector defaults 2026-06-23 17:48:19 -04:00
github-actions[bot] de71533a17 ci(prd): assign sequential numbers to new PRDs 2026-06-23 21:47:01 +00:00
didericis-claude 88c4f61901 fix: don't archive gitleaks-allow response before gate reads it
test / unit (pull_request) Successful in 41s
test / integration (pull_request) Successful in 18s
lint / lint (push) Successful in 1m52s
prd-number / assign-numbers (push) Successful in 45s
test / unit (push) Successful in 36s
test / integration (push) Successful in 21s
Update Quality Badges / update-badges (push) Successful in 1m19s
The TUI was calling archive_proposal for gitleaks-allow immediately
after write_response, moving the response file to processed/ within
microseconds. The git-gate shell loop polls queue_dir for the response
file every second — it never sees it and hangs until timeout.

capability-block is handled by the MCP sidecar which archives after
reading; gitleaks-allow is handled by the shell gate which archives
after processing. Let the gate own the archive step.
2026-06-23 17:37:01 -04:00
didericis-claude c666eaa63f fix: add TOOL_GITLEAKS_ALLOW to __all__ in supervise.py 2026-06-23 17:36:08 -04:00
didericis-codex 83eb9e4041 docs(prd): add gitleaks allow supervision 2026-06-23 17:36:08 -04:00
didericis-codex 33333ac4d9 Supervise gitleaks inline allow exceptions 2026-06-23 17:36:08 -04:00
github-actions[bot] 4d56f515bc ci(prd): assign sequential numbers to new PRDs 2026-06-23 21:32:54 +00:00
24 changed files with 1371 additions and 50 deletions
+26 -2
View File
@@ -14,7 +14,7 @@
## Features ## Features
- **Per-bottle egress allowlist** — TLS-bumped HTTP/HTTPS chokepoint with a per-manifest host allowlist and request-body DLP scanner; DoH and arbitrary hosts blocked by default. - **Per-bottle egress allowlist** — TLS-bumped HTTP/HTTPS chokepoint with a per-manifest host allowlist; per-route path/method/header `matches` filtering; outbound DLP scanning for known tokens and secrets, inbound DLP scanning for prompt-injection attempts; DoH and arbitrary hosts blocked by default.
- **Tokens the agent never sees** — host secrets live in a sidecar; the agent dials `http://sidecar:9099/<path>` and the proxy strips inbound `Authorization` and injects the real token before forwarding. `printenv` in the agent shows proxy URLs only. - **Tokens the agent never sees** — host secrets live in a sidecar; the agent dials `http://sidecar:9099/<path>` and the proxy strips inbound `Authorization` and injects the real token before forwarding. `printenv` in the agent shows proxy URLs only.
- **Gitleaks-scanned push (git-gate)** — `bottle.git` remotes route through a per-bottle `git daemon` that gitleaks-scans incoming refs pre-receive and forwards clean refs upstream over SSH. The agent never holds the upstream credential. - **Gitleaks-scanned push (git-gate)** — `bottle.git` remotes route through a per-bottle `git daemon` that gitleaks-scans incoming refs pre-receive and forwards clean refs upstream over SSH. The agent never holds the upstream credential.
- **Manifest-scoped skills + secrets** — each bottle declares its skills, env, git identity, remotes, and egress routes; unknown keys die at load. - **Manifest-scoped skills + secrets** — each bottle declares its skills, env, git identity, remotes, and egress routes; unknown keys die at load.
@@ -106,8 +106,15 @@ egress:
routes: routes:
- host: gitea.dideric.is - host: gitea.dideric.is
auth: auth:
scheme: token scheme: token # Bearer | token
token_ref: BOT_BOTTLE_GITEA_TOKEN token_ref: BOT_BOTTLE_GITEA_TOKEN
matches: # optional — restrict to specific paths/methods/headers
- paths:
- {type: prefix, value: /api/v1/}
methods: [GET, POST, PATCH, DELETE]
dlp: # optional — per-route detector overrides (default: all on)
outbound_detectors: [token_patterns, known_secrets]
inbound_detectors: false # disable response scanning for this host
--- ---
The `gitea-dev` bottle. Provider auth via the inherited Claude route; The `gitea-dev` bottle. Provider auth via the inherited Claude route;
@@ -126,6 +133,23 @@ skills:
You help maintain Gitea-hosted projects. You help maintain Gitea-hosted projects.
```` ````
**Egress route fields:**
| Field | Required | Description |
|---|---|---|
| `host` | yes | Hostname to allowlist. One entry per host. |
| `role` | no | Reserved for future use. The key is recognised but any value is currently rejected at load. Provider auth routes (e.g. Claude's `api.anthropic.com`) are injected automatically from `agent_provider.auth_token`, not via `role`. |
| `auth.scheme` | when `auth` present | `Bearer` or `token`. Injected by the proxy; the agent never sees the value. |
| `auth.token_ref` | when `auth` present | Env-var name holding the secret on the host. |
| `matches` | no | Array of `{paths, methods, headers}` filters. A request must match at least one entry (if any are given) to be forwarded. |
| `matches[].paths` | no | Array of `{type, value}`. `type` is `prefix` (default), `exact`, or `regex`. |
| `matches[].methods` | no | Array of HTTP method strings, e.g. `[GET, POST]`. |
| `matches[].headers` | no | Array of `{name, value, type}`. `type` is `exact` (default) or `regex`. |
| `dlp` | no | Per-route DLP overrides. Omit to use defaults (all detectors on). |
| `dlp.outbound_detectors` | no | `false` disables outbound scanning; list restricts to named detectors (`token_patterns`, `known_secrets`). |
| `dlp.inbound_detectors` | no | `false` disables inbound scanning; list restricts to named detectors (`naive_injection_detection`). |
| `git.fetch` | no | `true` permits smart HTTP clone/fetch (`git-upload-pack`) for this host. Push (`git-receive-pack`) remains blocked. |
More examples in `examples/`. Full design lives under `docs/prds/`; the trust-boundary rationale is in `docs/prds/0011-per-file-md-manifest.md`. More examples in `examples/`. Full design lives under `docs/prds/`; the trust-boundary rationale is in `docs/prds/0011-per-file-md-manifest.md`.
## Trademarks ## Trademarks
+8
View File
@@ -137,6 +137,10 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent))) volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
for token_env in sorted(ep.token_env_map.keys()): for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env) env.append(token_env)
if ep.canary:
# Inject canary as a literal NAME=VALUE (not a bare name) — the
# value is a fake secret so it need not be hidden from the compose file.
env.append(f"EGRESS_TOKEN_CANARY={ep.canary}")
# --- git-gate ----------------------------------------------------- # --- git-gate -----------------------------------------------------
gp = plan.git_gate_plan gp = plan.git_gate_plan
@@ -220,6 +224,10 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
# never lands on argv or in the compose file. # never lands on argv or in the compose file.
for name in sorted(plan.forwarded_env.keys()): for name in sorted(plan.forwarded_env.keys()):
env.append(name) env.append(name)
# Canary token: visible to the agent as a fake secret so that any
# outbound appearance of this value is a zero-FP exfil signal.
if plan.egress_plan.canary:
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
service: dict[str, Any] = { service: dict[str, Any] = {
"image": plan.image, "image": plan.image,
@@ -353,6 +353,8 @@ def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
env: list[str] = [] env: list[str] = []
if plan.egress_plan.routes: if plan.egress_plan.routes:
env.extend(sorted(plan.egress_plan.token_env_map.keys())) env.extend(sorted(plan.egress_plan.token_env_map.keys()))
if plan.egress_plan.canary:
env.append(f"EGRESS_TOKEN_CANARY={plan.egress_plan.canary}")
if plan.git_gate_plan.upstreams: if plan.git_gate_plan.upstreams:
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}") env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
if plan.supervise_plan is not None: if plan.supervise_plan is not None:
@@ -420,6 +422,8 @@ def _agent_env_entries(
env.append(f"{name}={value}") env.append(f"{name}={value}")
for name in sorted(plan.forwarded_env.keys()): for name in sorted(plan.forwarded_env.keys()):
env.append(name) env.append(name)
if plan.egress_plan.canary:
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
return tuple(env) return tuple(env)
+39 -7
View File
@@ -53,6 +53,7 @@ from ..supervise import (
TOOL_CAPABILITY_BLOCK, TOOL_CAPABILITY_BLOCK,
TOOL_ALLOW, TOOL_ALLOW,
TOOL_EGRESS_BLOCK, TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
archive_proposal, archive_proposal,
list_pending_proposals, list_pending_proposals,
render_diff, render_diff,
@@ -140,6 +141,8 @@ def _suffix_for_tool(tool: str) -> str:
return ".dockerfile" return ".dockerfile"
if tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK): if tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK):
return ".yaml" return ".yaml"
if tool == TOOL_GITLEAKS_ALLOW:
return ".txt"
return ".txt" return ".txt"
@@ -201,6 +204,23 @@ def reject(qp: QueuedProposal, *, reason: str) -> None:
_write_audit(qp, action=STATUS_REJECTED, notes=reason, diff_before="", diff_after="") _write_audit(qp, action=STATUS_REJECTED, notes=reason, diff_before="", diff_after="")
def _approve_from_tui(
stdscr: "curses._CursesWindow", # type: ignore
qp: QueuedProposal,
*,
final_file: str | None = None,
notes: str = "",
) -> str:
"""Approve from curses, prompting for any tool-specific audit note."""
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW and final_file is None:
notes = _prompt(stdscr, "allow reason (test fixture/false positive): ")
if not notes:
return "approve aborted (empty reason)"
approve(qp, final_file=final_file, notes=notes)
verb = "modified+approved" if final_file is not None else "approved"
return _approval_status(qp, verb)
def _write_audit( def _write_audit(
qp: QueuedProposal, qp: QueuedProposal,
*, *,
@@ -272,7 +292,10 @@ def cmd_supervise(argv: list[str]) -> int:
return e.code if isinstance(e.code, int) else 1 return e.code if isinstance(e.code, int) else 1
except Exception as e: # noqa: W0718 — catch supervise crash for logging except Exception as e: # noqa: W0718 — catch supervise crash for logging
log_path = _write_crash_log(e) log_path = _write_crash_log(e)
error(f"supervise crashed: {type(e).__name__}: {e}") error(
f"supervise crashed: {type(e).__name__}: {e}",
context={"error_type": type(e).__name__, "crash_log": str(log_path)},
)
error(f"full traceback written to {log_path}") error(f"full traceback written to {log_path}")
return 1 return 1
return 0 return 0
@@ -384,18 +407,22 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
_detail_view(stdscr, qp, green_attr=green_attr) _detail_view(stdscr, qp, green_attr=green_attr)
elif key == ord("a"): elif key == ord("a"):
try: try:
approve(qp) status_line = _approve_from_tui(stdscr, qp)
status_line = _approval_status(qp, "approved")
except ApplyError as e: except ApplyError as e:
status_line = f"apply failed: {e}" status_line = f"apply failed: {e}"
elif key == ord("m"): elif key == ord("m"):
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW:
status_line = "modify unavailable for gitleaks-allow"
continue
edited = _modify(stdscr, qp) edited = _modify(stdscr, qp)
if edited is None: if edited is None:
status_line = "modify aborted (no change)" status_line = "modify aborted (no change)"
else: else:
try: try:
approve(qp, final_file=edited, notes="operator modified before approving") status_line = _approve_from_tui(
status_line = _approval_status(qp, "modified+approved") stdscr, qp, final_file=edited,
notes="operator modified before approving",
)
except ApplyError as e: except ApplyError as e:
status_line = f"apply failed: {e}" status_line = f"apply failed: {e}"
elif key == ord("r"): elif key == ord("r"):
@@ -493,15 +520,20 @@ def _detail_view(
offset = max(0, len(lines) - 1) offset = max(0, len(lines) - 1)
elif key == ord("a"): elif key == ord("a"):
try: try:
approve(qp) _approve_from_tui(stdscr, qp)
except ApplyError: except ApplyError:
pass pass
return return
elif key == ord("m"): elif key == ord("m"):
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW:
return
edited = _modify(stdscr, qp) edited = _modify(stdscr, qp)
if edited is not None: if edited is not None:
try: try:
approve(qp, final_file=edited, notes="operator modified before approving") _approve_from_tui(
stdscr, qp, final_file=edited,
notes="operator modified before approving",
)
except ApplyError: except ApplyError:
pass pass
return return
+154 -4
View File
@@ -1,4 +1,4 @@
"""DLP detectors for the egress proxy (PRD 0053). """DLP detectors for the egress proxy (PRD 0053, prd-new).
Pure Python, no mitmproxy dependency. Each detector is a module-level Pure Python, no mitmproxy dependency. Each detector is a module-level
function returning `ScanResult | None`. function returning `ScanResult | None`.
@@ -15,6 +15,8 @@ import gzip
import re import re
import typing import typing
import unicodedata import unicodedata
from math import log2
from collections import Counter
from urllib.parse import quote as url_quote from urllib.parse import quote as url_quote
try: try:
@@ -96,20 +98,21 @@ def redact_tokens(
text: str, text: str,
*, *,
env: typing.Mapping[str, str] | None = None, env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> str: ) -> str:
"""Replace token pattern matches and (if env given) provisioned secrets with REDACT.""" """Replace token pattern matches and (if env given) provisioned secrets with REDACT."""
for _, pattern in TOKEN_PATTERNS: for _, pattern in TOKEN_PATTERNS:
text = pattern.sub(REDACT, text) text = pattern.sub(REDACT, text)
if env is not None: if env is not None:
for key, value in env.items(): for key, value in env.items():
if key.startswith("EGRESS_TOKEN_") and value: if any(key.startswith(p) for p in sensitive_prefixes) and value:
for variant in _encoded_variants(value): for variant in _encoded_variants(value):
text = text.replace(variant, REDACT) text = text.replace(variant, REDACT)
return text return text
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Known secrets detector (Phase 1b) # Known secrets detector (Phase 1b, prd-new)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _encoded_variants(secret: str) -> list[str]: def _encoded_variants(secret: str) -> list[str]:
@@ -150,17 +153,63 @@ def _encoded_variants(secret: str) -> list[str]:
return variants return variants
# ---------------------------------------------------------------------------
# Fragmentation-resistant helpers (prd-new)
# ---------------------------------------------------------------------------
# Minimum length of alnum projection for projection-based checks to run.
# Short secrets produce too many false positives in projection space.
_ALNUM_MIN_LEN = 8
# Minimum window length for the partial-substring sliding scan.
PARTIAL_MATCH_MIN_LEN = 12
def _alnum_projection(text: str) -> str:
"""Return text with every non-alphanumeric character stripped.
Used for fragmentation-resistant matching: separator-injected secrets
(spaces, hyphens, dots inserted between characters) are identical to
their originals in alnum projection space.
"""
return "".join(c for c in text if c.isalnum())
def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None:
"""Return the position in text_alnum where any min_len-char window of
secret_alnum first appears, or None.
Slides a window of width min_len across secret_alnum and searches for
each window in text_alnum. The first hit position is returned.
"""
if len(secret_alnum) < min_len or len(text_alnum) < min_len:
return None
for i in range(len(secret_alnum) - min_len + 1):
window = secret_alnum[i:i + min_len]
pos = text_alnum.find(window)
if pos >= 0:
return pos
return None
def scan_known_secrets( def scan_known_secrets(
text: str, text: str,
*, *,
location: str = "body", location: str = "body",
env: typing.Mapping[str, str] | None = None, env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> ScanResult | None: ) -> ScanResult | None:
if env is None: if env is None:
return None return None
# Pre-compute alnum projection of the scan text once; reused per secret.
text_alnum: str | None = None
for key, value in env.items(): for key, value in env.items():
if not key.startswith("EGRESS_TOKEN_") or not value: if not any(key.startswith(p) for p in sensitive_prefixes) or not value:
continue continue
# Pass 1: exact match across encoded variants (original behaviour).
for variant in _encoded_variants(value): for variant in _encoded_variants(value):
pos = text.find(variant) pos = text.find(variant)
if pos >= 0: if pos >= 0:
@@ -170,6 +219,100 @@ def scan_known_secrets(
location=location, location=location,
context=_snippet(text, pos, pos + len(variant)), context=_snippet(text, pos, pos + len(variant)),
) )
# Pass 2 & 3: fragmentation-resistant projection checks.
secret_alnum = _alnum_projection(value)
if len(secret_alnum) < _ALNUM_MIN_LEN:
continue
if text_alnum is None:
text_alnum = _alnum_projection(text)
# Pass 2: full alnum-projection exact match (catches separator injection).
pos2 = text_alnum.find(secret_alnum)
if pos2 >= 0:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(fragmented match — separator injection)"
),
location=location,
context=_snippet(text_alnum, pos2, pos2 + len(secret_alnum)),
)
# Pass 3: sliding-window partial match (catches chunked-substring leaks).
pos3 = _find_partial_window(secret_alnum, text_alnum, PARTIAL_MATCH_MIN_LEN)
if pos3 is not None:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(partial match — at least {PARTIAL_MATCH_MIN_LEN} consecutive "
f"alphanumeric chars)"
),
location=location,
context=_snippet(text_alnum, pos3, pos3 + PARTIAL_MATCH_MIN_LEN),
)
return None
# ---------------------------------------------------------------------------
# Entropy detector (warn-only, prd-new)
# ---------------------------------------------------------------------------
# Sliding window size and step for the entropy scan.
ENTROPY_WINDOW = 64
ENTROPY_STEP = 32
# Bits-per-character threshold. Random ASCII printable ≈ 6.6 bits; random
# lowercase hex ≈ 4 bits; random base64url ≈ 6 bits. 5.5 sits above
# typical structured data (JSON, URLs) while staying below truly random
# content.
ENTROPY_BLOCK_THRESHOLD = 5.5
def _shannon_entropy(text: str) -> float:
if not text:
return 0.0
counts = Counter(text)
n = len(text)
return -sum((c / n) * log2(c / n) for c in counts.values())
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW,
threshold: float = ENTROPY_BLOCK_THRESHOLD,
) -> ScanResult | None:
"""Warn-only detector: flag windows of `window` chars with Shannon entropy
above `threshold` bits per character.
Never blocks; always returns severity='warn'. Disabled by default
routes must opt in via dlp.outbound_detectors=['entropy'].
"""
if not text:
return None
step = max(1, window // 2)
end = len(text)
# Scan overlapping windows; also check the final tail if shorter than window.
positions = list(range(0, end - window + 1, step))
if end < window:
positions = [0]
elif (end - window) % step != 0:
positions.append(end - window)
for i in positions:
chunk = text[i:i + window]
if _shannon_entropy(chunk) >= threshold:
return ScanResult(
severity="warn",
reason=f"high-entropy content in {location} (possible encrypted exfil)",
location=location,
context=_snippet(text, i, i + len(chunk)),
)
return None return None
@@ -280,11 +423,18 @@ def scan_crlf_injection(text: str) -> ScanResult | None:
__all__ = [ __all__ = [
"ENTROPY_BLOCK_THRESHOLD",
"ENTROPY_WINDOW",
"ENTROPY_STEP",
"PARTIAL_MATCH_MIN_LEN",
"REDACT", "REDACT",
"SNIPPET_CONTEXT", "SNIPPET_CONTEXT",
"TOKEN_PATTERNS", "TOKEN_PATTERNS",
"_alnum_projection",
"_shannon_entropy",
"redact_tokens", "redact_tokens",
"scan_crlf_injection", "scan_crlf_injection",
"scan_entropy",
"scan_known_secrets", "scan_known_secrets",
"scan_naive_injection", "scan_naive_injection",
"scan_token_patterns", "scan_token_patterns",
+7
View File
@@ -10,6 +10,7 @@ specific and lives on concrete subclasses (see
from __future__ import annotations from __future__ import annotations
import dataclasses import dataclasses
import secrets
from abc import ABC from abc import ABC
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@@ -64,6 +65,7 @@ class EgressPlan:
mitmproxy_ca_host_path: Path = Path() mitmproxy_ca_host_path: Path = Path()
mitmproxy_ca_cert_only_host_path: Path = Path() mitmproxy_ca_cert_only_host_path: Path = Path()
log: int = 0 log: int = 0
canary: str = ""
def egress_manifest_routes( def egress_manifest_routes(
@@ -299,12 +301,17 @@ class Egress(ABC):
routes_path = stage_dir / EGRESS_ROUTES_FILENAME routes_path = stage_dir / EGRESS_ROUTES_FILENAME
routes_path.write_text(egress_render_routes(routes, log=log)) routes_path.write_text(egress_render_routes(routes, log=log))
routes_path.chmod(0o600) routes_path.chmod(0o600)
# Generate a per-session canary token. The sidecar receives it as
# EGRESS_TOKEN_CANARY (scanned by the existing known-secrets detector);
# the agent receives it as BOT_BOTTLE_CANARY (a visible fake secret).
canary = secrets.token_urlsafe(32)
return EgressPlan( return EgressPlan(
slug=slug, slug=slug,
routes_path=routes_path, routes_path=routes_path,
routes=routes, routes=routes,
token_env_map=egress_token_env_map(routes), token_env_map=egress_token_env_map(routes),
log=log, log=log,
canary=canary,
) )
__all__ = [ __all__ = [
+33 -3
View File
@@ -34,7 +34,7 @@ VALID_METHODS = frozenset({
"CONNECT", "CONNECT",
}) })
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"}) OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"})
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"}) INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
@@ -696,17 +696,28 @@ def scan_outbound(
try: try:
from dlp_detectors import ( # type: ignore[import-not-found] from dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_token_patterns, scan_token_patterns,
) )
except ImportError: # pragma: no cover - host-side path except ImportError: # pragma: no cover - host-side path
from .dlp_detectors import ( # type: ignore[import-not-found] from .dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_token_patterns, scan_token_patterns,
) )
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace") # Binary bodies: latin-1 is a bijective byte↔codepoint mapping that
# preserves every byte value, so ASCII-range secret strings remain
# findable by str.find / regex. Prefer strict UTF-8 for valid text bodies.
if isinstance(body, bytes):
try:
text = body.decode("utf-8")
except UnicodeDecodeError:
text = body.decode("latin-1")
else:
text = body
# CRLF injection is never legitimate — runs unconditionally, not gated # CRLF injection is never legitimate — runs unconditionally, not gated
# by outbound_detectors config. # by outbound_detectors config.
@@ -720,7 +731,26 @@ def scan_outbound(
return result return result
if _detector_enabled(route.outbound_detectors, "known_secrets"): if _detector_enabled(route.outbound_detectors, "known_secrets"):
result = scan_known_secrets(text, location="body", env=environ) # BOT_BOTTLE_SENSITIVE_PREFIXES lets operators add extra env prefixes
# beyond EGRESS_TOKEN_* without changing the manifest schema.
extra_raw = environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "")
extra = tuple(p for p in extra_raw.split(",") if p)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
result = scan_known_secrets(
text, location="body", env=environ, sensitive_prefixes=sensitive_prefixes,
)
if result is not None:
return result
# Entropy scanning requires explicit opt-in: it is NOT part of the
# default "all detectors" set because it produces false positives on
# legitimate base64 / binary payloads. Routes must list "entropy" in
# dlp.outbound_detectors to enable it.
if (
route.outbound_detectors is not None
and "entropy" in route.outbound_detectors
):
result = scan_entropy(text, location="body")
if result is not None: if result is not None:
return result return result
+161
View File
@@ -247,6 +247,164 @@ cat > "$refs_file"
zero=0000000000000000000000000000000000000000 zero=0000000000000000000000000000000000000000
supervise_gitleaks_allow() {
log_opts=$1
ref=$2
report_file=$(mktemp)
if ! gitleaks git \
--log-opts="$log_opts" \
--no-banner \
--redact \
--ignore-gitleaks-allow \
--report-format=json \
--report-path="$report_file" \
--exit-code 0 \
1>&2; then
rm -f "$report_file"
echo "git-gate: gitleaks inline-suppression scan failed for $ref" >&2
return 1
fi
proposal_id=$(
GITLEAKS_ALLOW_REF="$ref" python3 - "$report_file" <<'PY'
import datetime
import hashlib
import json
import os
import sys
import uuid
from pathlib import Path
report_path = Path(sys.argv[1])
queue_dir = os.environ.get("SUPERVISE_QUEUE_DIR", "")
slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "")
if not queue_dir or not slug:
sys.exit(2)
try:
raw = json.loads(report_path.read_text() or "[]")
except json.JSONDecodeError:
sys.exit(3)
if not isinstance(raw, list):
sys.exit(3)
if not raw:
sys.exit(0)
ref = os.environ.get("GITLEAKS_ALLOW_REF", "")
lines = [
"gitleaks inline suppression requires supervisor approval",
f"ref: {ref}",
"",
]
for i, finding in enumerate(raw, 1):
if not isinstance(finding, dict):
continue
file_path = finding.get("File", "")
line_no = finding.get("StartLine", finding.get("Line", ""))
rule_id = finding.get("RuleID", "")
commit = finding.get("Commit", "")
line = finding.get("Line", "")
lines.extend([
f"finding {i}:",
f" file: {file_path}",
f" line: {line_no}",
f" rule: {rule_id}",
f" commit: {commit}",
f" code: {line}",
"",
])
payload = "\n".join(lines).rstrip() + "\n"
proposal_id = str(uuid.uuid4())
proposal = {
"id": proposal_id,
"bottle_slug": slug,
"tool": "gitleaks-allow",
"proposed_file": payload,
"justification": (
"git-gate found gitleaks findings hidden by # gitleaks:allow; "
"approve only for dummy test fixtures or confirmed false positives"
),
"arrival_timestamp": datetime.datetime.now(
datetime.timezone.utc
).isoformat(),
"current_file_hash": hashlib.sha256(payload.encode("utf-8")).hexdigest(),
}
queue = Path(queue_dir)
queue.mkdir(parents=True, exist_ok=True)
path = queue / f"{proposal_id}.proposal.json"
tmp = path.with_suffix(path.suffix + ".tmp")
with tmp.open("w", encoding="utf-8") as f:
json.dump(proposal, f, indent=2)
f.write("\n")
os.chmod(tmp, 0o600)
os.replace(tmp, path)
print(proposal_id)
PY
)
rc=$?
rm -f "$report_file"
if [ "$rc" -eq 0 ] && [ -z "$proposal_id" ]; then
return 0
fi
if [ "$rc" -ne 0 ]; then
echo "git-gate: cannot route # gitleaks:allow finding to supervisor; refusing push" >&2
return 1
fi
queue_dir=${SUPERVISE_QUEUE_DIR:-}
response_file="$queue_dir/${proposal_id}.response.json"
timeout=${SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS:-300}
case "$timeout" in
''|*[!0-9]*)
echo "git-gate: invalid SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS=$timeout" >&2
return 1
;;
esac
echo "git-gate: queued # gitleaks:allow supervisor approval $proposal_id" >&2
echo "git-gate: approve with './cli.py supervise' to continue this push" >&2
waited=0
while [ "$waited" -lt "$timeout" ]; do
if [ -f "$response_file" ]; then
status=$(python3 - "$response_file" <<'PY'
import json
import sys
try:
with open(sys.argv[1], encoding="utf-8") as f:
raw = json.load(f)
except (OSError, json.JSONDecodeError):
sys.exit(1)
status = raw.get("status")
if not isinstance(status, str):
sys.exit(1)
print(status)
PY
) || status=""
case "$status" in
approved|modified)
mkdir -p "$queue_dir/processed"
mv -f "$queue_dir/${proposal_id}.proposal.json" "$queue_dir/processed/" 2>/dev/null || true
mv -f "$queue_dir/${proposal_id}.response.json" "$queue_dir/processed/" 2>/dev/null || true
echo "git-gate: supervisor approved # gitleaks:allow for $ref" >&2
return 0
;;
rejected)
echo "git-gate: supervisor rejected # gitleaks:allow for $ref" >&2
return 1
;;
*)
echo "git-gate: invalid supervisor response for # gitleaks:allow" >&2
return 1
;;
esac
fi
sleep 1
waited=$((waited + 1))
done
echo "git-gate: supervisor approval timed out for # gitleaks:allow; refusing push" >&2
return 1
}
# Phase 1: gitleaks scan each ref's incoming commits. # Phase 1: gitleaks scan each ref's incoming commits.
while IFS=' ' read -r old new ref; do while IFS=' ' read -r old new ref; do
[ -z "$ref" ] && continue [ -z "$ref" ] && continue
@@ -268,6 +426,9 @@ while IFS=' ' read -r old new ref; do
echo "git-gate: gitleaks rejected push to $ref" >&2 echo "git-gate: gitleaks rejected push to $ref" >&2
exit 1 exit 1
fi fi
if ! supervise_gitleaks_allow "$log_opts" "$ref"; then
exit 1
fi
done < "$refs_file" done < "$refs_file"
# Phase 2: forward each ref to the upstream (`origin`, configured # Phase 2: forward each ref to the upstream (`origin`, configured
+96 -10
View File
@@ -1,21 +1,107 @@
"""Tiny logging wrappers. All output goes to stderr.""" """Tiny logging wrappers. All output goes to stderr.
Two capabilities layer onto the bare wrappers (issue #252):
- **Levels.** `debug` / `info` / `warn` / `error` carry an ordered
severity. Output is gated by `BOT_BOTTLE_LOG_LEVEL` (debug | info |
warn | error; default `info`). A message emits when its severity is
at or above the threshold, so `debug` is silent by default and
`error` always surfaces (nothing sits above it) which keeps the
fatal `die` path visible regardless of the configured level.
- **Context.** Every wrapper takes an optional `context` mapping that
renders as a parseable ` [k=v ...]` suffix (keys sorted; values with
whitespace/quotes are quoted), so failures can be filtered and
correlated instead of being flat strings.
With no `context` and the default level, output is byte-identical to the
original `bot-bottle: <msg>` / `bot-bottle: warning: <msg>` /
`bot-bottle: error: <msg>` lines the 100+ existing call sites are
unaffected.
"""
from __future__ import annotations from __future__ import annotations
import os
import sys import sys
from typing import NoReturn from typing import Mapping, NoReturn
# Ordered severities. Gaps left between values so intermediate levels
# can be added later without renumbering.
DEBUG = 10
INFO = 20
WARN = 30
ERROR = 40
_LEVEL_NAMES: dict[str, int] = {
"debug": DEBUG,
"info": INFO,
"warn": WARN,
"warning": WARN,
"error": ERROR,
}
# Default threshold when BOT_BOTTLE_LOG_LEVEL is unset or unrecognised.
_DEFAULT_THRESHOLD = INFO
_LOG_LEVEL_ENV = "BOT_BOTTLE_LOG_LEVEL"
def info(msg: str) -> None: def _threshold() -> int:
print(f"bot-bottle: {msg}", file=sys.stderr) """Resolve the active level threshold from the environment.
Read per-call (not cached) so the level can be changed at runtime
and so tests can patch `os.environ` without a reload. Unknown values
fall back to the default rather than raising logging must never be
the thing that crashes the process."""
raw = os.environ.get(_LOG_LEVEL_ENV, "")
return _LEVEL_NAMES.get(raw.strip().lower(), _DEFAULT_THRESHOLD)
def warn(msg: str) -> None: def _format_context(context: Mapping[str, object] | None) -> str:
print(f"bot-bottle: warning: {msg}", file=sys.stderr) """Render a context mapping as a ` [k=v k2=v2]` suffix.
Keys are sorted for stable, diffable output. Values that are empty or
contain whitespace or a quote are wrapped in double quotes (with inner
quotes escaped) so each `k=v` pair stays parseable. Empty/None context
renders as the empty string."""
if not context:
return ""
parts: list[str] = []
for key in sorted(context):
value = str(context[key])
if value == "" or any(ch.isspace() for ch in value) or '"' in value:
value = '"' + value.replace('"', '\\"') + '"'
parts.append(f"{key}={value}")
return " [" + " ".join(parts) + "]"
def error(msg: str) -> None: def _emit(
print(f"bot-bottle: error: {msg}", file=sys.stderr) level: int,
label: str,
msg: str,
context: Mapping[str, object] | None,
) -> None:
if level < _threshold():
return
prefix = f"{label}: " if label else ""
sys.stderr.write(f"bot-bottle: {prefix}{msg}{_format_context(context)}\n")
def debug(msg: str, *, context: Mapping[str, object] | None = None) -> None:
_emit(DEBUG, "debug", msg, context)
def info(msg: str, *, context: Mapping[str, object] | None = None) -> None:
_emit(INFO, "", msg, context)
def warn(msg: str, *, context: Mapping[str, object] | None = None) -> None:
_emit(WARN, "warning", msg, context)
def error(msg: str, *, context: Mapping[str, object] | None = None) -> None:
_emit(ERROR, "error", msg, context)
class Die(SystemExit): class Die(SystemExit):
@@ -31,6 +117,6 @@ class Die(SystemExit):
self.message = message self.message = message
def die(msg: str) -> NoReturn: def die(msg: str, *, context: Mapping[str, object] | None = None) -> NoReturn:
error(msg) error(msg, context=context)
raise Die(1, msg) raise Die(1, msg)
+9 -9
View File
@@ -19,7 +19,7 @@ Bottle schema (frontmatter):
repos: { <name>: <git-gate-entry>, ... } # optional repos: { <name>: <git-gate-entry>, ... } # optional
egress: { routes: [ <egress-route>, ... ] } egress: { routes: [ <egress-route>, ... ] }
# route keys: host, matches, auth, role, dlp # route keys: host, matches, auth, role, dlp
supervise: <bool> # optional supervise: <bool> # optional (default true)
Agent schema (frontmatter): Agent schema (frontmatter):
bottle: <bottle-name> # required bottle: <bottle-name> # required
@@ -111,13 +111,13 @@ class ManifestBottle:
# identity without any git-gate.repos upstreams, and vice versa. # identity without any git-gate.repos upstreams, and vice versa.
git_user: ManifestGitUser = field(default_factory=ManifestGitUser) git_user: ManifestGitUser = field(default_factory=ManifestGitUser)
egress: ManifestEgressConfig = field(default_factory=ManifestEgressConfig) egress: ManifestEgressConfig = field(default_factory=ManifestEgressConfig)
# Opt-in per-bottle stuck-recovery sidecar (PRD 0013). When true, # Per-bottle stuck-recovery sidecar (PRD 0013). When true (the
# the launch step brings up a supervise sidecar that exposes MCP # default, issue #249), the launch step brings up a supervise
# tools to the agent (egress-block, capability-block) plus mounts # sidecar that exposes MCP tools to the agent (egress-block,
# the current-config dir read-only into the agent at # capability-block) plus mounts the current-config dir read-only
# /etc/bot-bottle/current-config. False (the default) skips the # into the agent at /etc/bot-bottle/current-config. Set
# sidecar and mount. # `supervise: false` to skip the sidecar and mount.
supervise: bool = False supervise: bool = True
@classmethod @classmethod
def from_dict(cls, name: str, raw: object) -> "ManifestBottle": def from_dict(cls, name: str, raw: object) -> "ManifestBottle":
@@ -190,7 +190,7 @@ class ManifestBottle:
else ManifestEgressConfig() else ManifestEgressConfig()
) )
supervise_raw = d.get("supervise", False) supervise_raw = d.get("supervise", True)
if not isinstance(supervise_raw, bool): if not isinstance(supervise_raw, bool):
raise ManifestError( raise ManifestError(
f"bottle '{name}' supervise must be a boolean " f"bottle '{name}' supervise must be a boolean "
+3
View File
@@ -51,11 +51,13 @@ SUPERVISE_PORT = 9100
TOOL_CAPABILITY_BLOCK = "capability-block" TOOL_CAPABILITY_BLOCK = "capability-block"
TOOL_EGRESS_BLOCK = "egress-block" TOOL_EGRESS_BLOCK = "egress-block"
TOOL_ALLOW = "allow" TOOL_ALLOW = "allow"
TOOL_GITLEAKS_ALLOW = "gitleaks-allow"
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes" TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
TOOLS: tuple[str, ...] = ( TOOLS: tuple[str, ...] = (
TOOL_ALLOW, TOOL_ALLOW,
TOOL_CAPABILITY_BLOCK, TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK, TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
TOOL_LIST_EGRESS_ROUTES, TOOL_LIST_EGRESS_ROUTES,
) )
@@ -553,6 +555,7 @@ __all__ = [
"EGRESS_FORWARD_PROXY", "EGRESS_FORWARD_PROXY",
"EGRESS_INTROSPECT_URL", "EGRESS_INTROSPECT_URL",
"TOOL_CAPABILITY_BLOCK", "TOOL_CAPABILITY_BLOCK",
"TOOL_GITLEAKS_ALLOW",
"TOOL_LIST_EGRESS_ROUTES", "TOOL_LIST_EGRESS_ROUTES",
"archive_proposal", "archive_proposal",
"audit_dir", "audit_dir",
@@ -1,4 +1,4 @@
# PRD prd-new: Commit bottle state to an image # PRD 0060: Commit bottle state to an image
- **Status:** Active - **Status:** Active
- **Author:** Claude - **Author:** Claude
@@ -0,0 +1,101 @@
# PRD 0061: Gitleaks inline suppression supervision
- **Status:** Active
- **Author:** codex
- **Created:** 2026-06-10
- **Issue:** #208
## Summary
Route every `# gitleaks:allow` inline suppression through the existing
supervisor approval queue before git-gate forwards a push upstream. The
annotation remains available for test fixtures and confirmed false positives,
but it no longer lets an agent silently bypass the gate's secret scan.
## Problem
Gitleaks supports inline suppression comments. With the default behaviour,
a pushed line that contains a secret-like value and `# gitleaks:allow` is
ignored by gitleaks and accepted by git-gate. That is convenient for dummy
credentials in tests, but it also gives an agent a one-line bypass for a real
secret. Once the commit lands, the diff alone cannot prove whether the
suppression was legitimate.
## Goals / Success Criteria
1. Git-gate continues to run the normal gitleaks scan for every incoming ref.
2. After the normal scan passes, git-gate runs a second scan with
`--ignore-gitleaks-allow` and a JSON report so suppressed findings become
visible.
3. If that second scan reports no suppressed findings, the push proceeds
unchanged.
4. If it reports suppressed findings, git-gate creates a `gitleaks-allow`
supervisor proposal containing the ref, file path, line number, rule,
commit, and flagged line for each finding.
5. The push proceeds only when the supervisor explicitly approves the
proposal; rejection, malformed responses, missing supervisor configuration,
and timeout all refuse the push.
6. The supervisor TUI requires a reason when approving a `gitleaks-allow`
proposal, so the audit trail records whether the approval was for a test
fixture or a false positive.
## Non-goals
- Replacing gitleaks or changing the main secret-detection rule set.
- Removing support for `# gitleaks:allow`.
- Automatically classifying fixture files or false positives.
- Adding new supervisor transport or authentication mechanisms.
## Design
### Git-gate flow
`git_gate_render_hook()` emits a `supervise_gitleaks_allow` shell helper.
For each incoming ref, git-gate first runs the existing gitleaks command. If
that scan passes, it runs:
```sh
gitleaks git \
--log-opts="$log_opts" \
--no-banner \
--redact \
--ignore-gitleaks-allow \
--report-format=json \
--report-path="$report_file" \
--exit-code 0
```
The second pass keeps the push path non-interactive while producing a report
of findings that would otherwise have been hidden by inline suppression.
### Supervisor proposal
When the JSON report contains findings, an embedded Python helper writes a
proposal into `SUPERVISE_QUEUE_DIR` using the existing proposal schema. The
proposal uses:
- `tool: "gitleaks-allow"`
- a text payload with the ref and each finding's file, line, rule, commit,
and redacted code line
- a justification that tells the operator to approve only dummy test fixtures
or confirmed false positives
Git-gate then waits for `<proposal-id>.response.json` for
`SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS`, defaulting to 300 seconds.
`approved` and `modified` responses allow the push; `rejected`, invalid
responses, invalid timeout configuration, or timeout refuse it.
### Supervisor UI
`TOOL_GITLEAKS_ALLOW` is added to the supervisor tool registry. The curses
supervisor renders the proposal as text and allows approval or rejection.
Modification is unavailable for this proposal type because there is no file
patch to apply. Approval from the TUI prompts for a non-empty reason and
writes that reason to the response/audit path.
### Tests
Unit tests assert that the rendered git-gate hook includes the second gitleaks
pass, supervisor queue fields, and fail-closed messages. Supervisor tests cover
the new tool constant, proposal archiving, and the required TUI approval
reason.
@@ -0,0 +1,186 @@
# PRD prd-new: Strengthen outbound exfiltration detection
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-25
- **Issue:** #259
## Summary
The current DLP system has two detection tiers: weak (vendor token regexes)
and strong (exact matching of provisioned `EGRESS_TOKEN_*` values in multiple
encodings). This PRD adds four complementary hardening layers in ranked order
of impact:
1. **Canary tokens** — inject a per-session fake secret into the agent's
environment; any outbound appearance is a zero-false-positive exfil signal.
2. **Broadened known-value scanning** — extend the strong tier beyond
`EGRESS_TOKEN_*` via a `BOT_BOTTLE_SENSITIVE_PREFIXES` sidecar env var.
3. **Fragmentation-resistant matching** — catch separator injection and
long-substring leaks via alnum projection and sliding-window substring
search.
4. **Entropy scoring** — warn-only flag on high-entropy blobs as a
complementary signal for encrypted exfil.
## Problem
After PRD 0056 the outbound scan covers all HTTP request surfaces (host, path,
query, headers, body). But the content-based strong tier only matches
`EGRESS_TOKEN_*` values and their encodings. Three gaps remain:
| Gap | How an agent exploits it |
|-----|--------------------------|
| No canary | There is no "can't be false-positive" signal |
| Narrow prefix filter | Sensitive values provisioned under other env prefixes (MCP keys, API keys injected via the agent provider) are invisible to `scan_known_secrets` |
| Fragmentation blindness | Inserting `-`, space, or `\n` between secret characters turns an exact-match into a miss |
## Goals / Success Criteria
1. Each launched bottle has a unique canary token in the agent's environment
(`BOT_BOTTLE_CANARY`) and the egress sidecar's environment
(`EGRESS_TOKEN_CANARY`). Any outbound appearance of the canary blocks the
request with reason `"canary token"`.
2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default:
`("EGRESS_TOKEN_",)`). `scan_outbound` reads
`BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes
in, so operators can mark additional env vars as scanned values without
changing the manifest schema.
3. For every secret that passes exact-match, a secondary alnum-projection pass
checks for the secret with all non-alphanumeric characters stripped. This
catches separator-injection evasion (`MY-SECRET` → body contains
`MY SECRET`).
4. A sliding-window partial-match pass checks for long-enough contiguous
substrings of the secret's alnum projection in the text's alnum projection.
Any match ≥ `PARTIAL_MATCH_MIN_LEN` (12 chars) blocks with reason
`"partial match"`.
5. A new `scan_entropy` detector flags outbound text windows with Shannon
entropy ≥ `ENTROPY_BLOCK_THRESHOLD` (5.5 bits/char) at **warn** severity
only. It is registered under the new detector name `"entropy"` in
`OUTBOUND_DETECTOR_NAMES` and disabled by default (routes must opt in).
6. Binary request bodies are decoded via `latin-1` instead of
`utf-8 errors="replace"`, preserving every byte value and allowing
ASCII-range secrets to be found within binary payloads.
7. All new behaviour is unit-tested; existing tests pass unchanged.
## Non-goals
- Rolling per-host buffer for split-across-requests detection (state in the
stateless addon is complex; deferred).
- Additional vendor regexes.
- ML / embedding-based detection.
- Entropy-based hard blocks (warn only per the issue).
## Design
### Canary token flow
```
Egress.prepare()
canary = secrets.token_urlsafe(32)
EgressPlan(canary=canary, ...)
Docker compose render:
sidecar env: EGRESS_TOKEN_CANARY=<canary> ← scanned by existing known-secrets detector
agent env: BOT_BOTTLE_CANARY=<canary> ← visible to agent as a "secret"
macos-container launch: same literals added to sidecar + agent env entries
```
`EGRESS_TOKEN_CANARY` matches the `EGRESS_TOKEN_` prefix already scanned by
`scan_known_secrets`, so no detector code changes are required for canary
detection — only the injection path.
### Broadened known-value scanning
`scan_known_secrets` gains a `sensitive_prefixes` parameter:
```python
def scan_known_secrets(
text: str,
*,
location: str = "body",
env: Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> ScanResult | None:
```
`scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` (comma-separated list
of additional prefixes) from `environ` and appends them:
```python
extra = tuple(
p for p in environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "").split(",") if p
)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
```
`redact_tokens` receives the same treatment for consistent redaction.
### Fragmentation-resistant matching
A new helper `_alnum_projection(text)` strips all non-alphanumeric characters.
`scan_known_secrets` runs two passes per secret:
1. **Exact pass** — existing encoded-variant loop (unchanged).
2. **Alnum-projection pass** — if the secret's alnum projection has ≥ 8 chars,
check if it appears in the text's alnum projection. Match → block with
`"fragmented match (separator injection)"` reason.
3. **Partial-substring pass** — if the secret's alnum projection has ≥
`PARTIAL_MATCH_MIN_LEN` chars (12), slide a window of that length across the
secret's projection and look for each window in the text's alnum projection.
First match → block with `"partial match"` reason.
All three passes run only for the `"known_secrets"` detector; the token-pattern
and entropy detectors are unchanged.
### Entropy scoring
New public function:
```python
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW, # 64
threshold: float = ENTROPY_BLOCK_THRESHOLD, # 5.5
) -> ScanResult | None:
```
Slides a window of `window` characters across `text` in steps of `window // 2`.
If any window's Shannon entropy exceeds `threshold`, returns a **warn**-severity
`ScanResult`. Never blocks.
`OUTBOUND_DETECTOR_NAMES` gains `"entropy"`. Routes opt in via their `dlp`
block; entropy scanning is **off by default** to avoid false-positive noise on
legitimate binary payloads.
### Binary body handling
In `scan_outbound`, the bytes → str decoding changes from:
```python
body.decode("utf-8", errors="replace")
```
to:
```python
body.decode("utf-8") if body is str else body.decode("latin-1")
```
`latin-1` is a bijective byte↔codepoint mapping; every byte value is preserved
as its corresponding Latin-1 code point, so ASCII-range secret strings remain
intact and `str.find` / regex still locate them correctly. The fallback from
strict UTF-8 is tried first so valid UTF-8 bodies are decoded faithfully.
## Implementation
Delivered in three commits on the same branch:
1. **DLP detector changes**`_alnum_projection`, fragmentation passes,
`scan_entropy`, broadened `scan_known_secrets`, updated `scan_outbound` and
`redact_tokens`; all accompanying unit tests.
2. **Canary injection**`EgressPlan.canary`, `Egress.prepare()`,
Docker compose + macos-container backend injection.
3. **PRD flip**`Status: Draft → Active`.
@@ -22,7 +22,7 @@ escapes**, and **whether credentials are short-lived and scoped**.
- Outbound: Docker containers have full internet access by default; no egress monitoring on most home networks - Outbound: Docker containers have full internet access by default; no egress monitoring on most home networks
- Lateral movement: compromised container can reach the LAN — NAS, other machines, internal services - Lateral movement: compromised container can reach the LAN — NAS, other machines, internal services
- Notable: CVE-2025-59536 (CVSS 8.7, Feb 2026) — a poisoned `.claude/settings.json` in a repo gives RCE when Claude Code opens it. `--dangerously-skip-permissions` removes the last gate. - Notable: CVE-2025-59536 (CVSS 8.7, Feb 2026) — a poisoned `.claude/settings.json` in a repo gives RCE when Claude Code opens it. `--dangerously-skip-permissions` removes the last gate.
- Supply chain: MCP servers, skills, and npm packages pulled during agent execution. ~20% of ClawHub skills were found malicious in early 2026. - Supply chain: MCP servers, skills, and npm packages pulled during agent execution. A Jan 2026 large-scale empirical study of a 98,380-skill snapshot confirmed 157 malicious skills, ~71% of them credential harvesters. Exfiltration was overwhelmingly naive — plaintext HTTP to hardcoded endpoints; under 10% used any code obfuscation, and concealment was mostly at the documentation level, not the code level. ([Malicious Agent Skills in the Wild](https://arxiv.org/html/2602.06547v1), arXiv:2602.06547)
**What local topology protects:** **What local topology protects:**
- No inbound attack surface — nothing listening on a public port - No inbound attack surface — nothing listening on a public port
+8 -8
View File
@@ -1,14 +1,14 @@
--- ---
agent_provider: agent_provider:
template: claude template: claude
# auth_token names the host env var holding the Claude OAuth token. The
egress: # provider injects a provider-owned api.anthropic.com egress route that
routes: # re-injects this token as the Bearer header; the agent only ever sees a
- host: api.anthropic.com # placeholder CLAUDE_CODE_OAUTH_TOKEN. DLP defaults (token_patterns,
role: claude_code_oauth # known_secrets outbound; naive_injection_detection inbound) apply to
auth: # that route. To scan additional hosts, declare them under egress.routes
scheme: Bearer # with per-route matches/dlp (see README "Egress route fields").
token_ref: BOT_BOTTLE_CLAUDE_OAUTH_TOKEN auth_token: BOT_BOTTLE_CLAUDE_OAUTH_TOKEN
--- ---
Common Claude provider boundary. Drop this file into Common Claude provider boundary. Drop this file into
+191 -3
View File
@@ -1,18 +1,24 @@
"""Unit: DLP detectors (PRD 0053). """Unit: DLP detectors (PRD 0053, prd-new).
Tests for token pattern scanning, known secret detection, and Tests for token pattern scanning, known secret detection, fragmentation-
naive prompt injection detection.""" resistant matching, entropy scoring, and naive prompt injection detection."""
import base64 import base64
import gzip import gzip
import unittest import unittest
from bot_bottle.dlp_detectors import ( from bot_bottle.dlp_detectors import (
ENTROPY_BLOCK_THRESHOLD,
ENTROPY_WINDOW,
PARTIAL_MATCH_MIN_LEN,
REDACT, REDACT,
_alnum_projection,
_encoded_variants, _encoded_variants,
_normalize_text, _normalize_text,
_shannon_entropy,
redact_tokens, redact_tokens,
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_naive_injection, scan_naive_injection,
scan_token_patterns, scan_token_patterns,
@@ -445,5 +451,187 @@ class TestKnownSecretsNewVariants(unittest.TestCase):
self.assertIsNotNone(result) self.assertIsNotNone(result)
class TestAlnumProjection(unittest.TestCase):
def test_alphanumeric_unchanged(self):
self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ"))
def test_strips_hyphens(self):
self.assertEqual("mysecretvalue", _alnum_projection("my-secret-value"))
def test_strips_spaces(self):
self.assertEqual("mysecretvalue", _alnum_projection("my secret value"))
def test_strips_dots_and_underscores(self):
self.assertEqual("mysecretvalue", _alnum_projection("my.secret_value"))
def test_empty_string(self):
self.assertEqual("", _alnum_projection(""))
def test_all_special_chars(self):
self.assertEqual("", _alnum_projection("!@#$%^&*()"))
class TestFragmentationResistantMatching(unittest.TestCase):
"""scan_known_secrets catches separator-injection and partial-substring evasion."""
# Secrets long enough that their alnum projections are ≥ 8 chars.
SECRET = "supersecrettoken99"
ENV = {"EGRESS_TOKEN_0": SECRET}
def test_exact_match_still_works(self):
result = scan_known_secrets(f"key={self.SECRET}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_separator_injection_blocked(self):
# Hyphens inserted between chars of the secret.
fragmented = "-".join(self.SECRET)
result = scan_known_secrets(f"data={fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("separator injection", result.reason)
def test_space_separator_blocked(self):
fragmented = " ".join(self.SECRET)
result = scan_known_secrets(f"body: {fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("separator injection", result.reason)
def test_partial_substring_blocked(self):
# First PARTIAL_MATCH_MIN_LEN alnum chars of the secret, no separators.
partial = _alnum_projection(self.SECRET)[:PARTIAL_MATCH_MIN_LEN]
result = scan_known_secrets(f"x={partial}&y=other", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("partial match", result.reason)
def test_short_secret_skips_projection(self):
# Secrets shorter than _ALNUM_MIN_LEN in alnum projection are not
# fragmentation-checked (too many false positives).
short_env = {"EGRESS_TOKEN_0": "abc"}
# "a b c" has alnum projection "abc" (3 chars, < 8); should not block.
self.assertIsNone(scan_known_secrets("a b c", env=short_env))
def test_clean_text_not_blocked(self):
self.assertIsNone(scan_known_secrets("nothing to see here", env=self.ENV))
def test_sensitive_prefixes_param_extra_prefix(self):
env = {"MY_CRED_0": self.SECRET, "IGNORED": "other"}
result = scan_known_secrets(
f"key={self.SECRET}",
env=env,
sensitive_prefixes=("MY_CRED_",),
)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("MY_CRED_0", result.reason)
def test_sensitive_prefixes_default_only_egress_token(self):
# A value under a non-EGRESS_TOKEN_ key is ignored with default prefixes.
env = {"MY_CRED_0": self.SECRET}
self.assertIsNone(scan_known_secrets(f"key={self.SECRET}", env=env))
def test_canary_prefix_detected(self):
canary_value = "canary-fake-secret-value-xyz"
env = {"EGRESS_TOKEN_CANARY": canary_value}
result = scan_known_secrets(f"x={canary_value}", env=env)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
class TestRedactTokensBroadenedPrefixes(unittest.TestCase):
SECRET = "my-provisioned-secret"
def test_default_redacts_egress_token(self):
env = {"EGRESS_TOKEN_0": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_extra_prefix_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(
f"val={self.SECRET}",
env=env,
sensitive_prefixes=("MY_SECRET_",),
)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_non_matching_prefix_not_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
# Default prefixes only include EGRESS_TOKEN_ → secret not redacted
self.assertIn(self.SECRET, out)
class TestShannonEntropy(unittest.TestCase):
def test_empty_string_zero(self):
self.assertEqual(0.0, _shannon_entropy(""))
def test_single_char_zero(self):
self.assertEqual(0.0, _shannon_entropy("aaaaaa"))
def test_two_equal_chars_one_bit(self):
self.assertAlmostEqual(1.0, _shannon_entropy("abababab"), places=10)
def test_high_entropy_random_like(self):
# Uniform 64-char string over 64 distinct symbols has entropy 6 bits.
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
text = alphabet # each char appears exactly once
self.assertAlmostEqual(6.0, _shannon_entropy(text), places=10)
class TestScanEntropy(unittest.TestCase):
def test_empty_returns_none(self):
self.assertIsNone(scan_entropy(""))
def test_low_entropy_returns_none(self):
# Highly repetitive text has low entropy.
self.assertIsNone(scan_entropy("a" * 200))
def test_high_entropy_warns(self):
# Build a 64-char string with entropy > ENTROPY_BLOCK_THRESHOLD.
# Use all 64 distinct printable chars to maximise entropy (~6 bits).
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, threshold=ENTROPY_BLOCK_THRESHOLD)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
self.assertIn("high-entropy", result.reason)
def test_never_blocks(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet)
# scan_entropy is warn-only; it must never return severity="block".
if result is not None:
self.assertNotEqual("block", result.severity)
def test_location_in_result(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, location="authorization header")
if result is not None:
self.assertIn("authorization header", result.location)
def test_structured_json_no_warn(self):
# Typical JSON has low entropy and should not be flagged.
json_body = '{"status": "ok", "message": "hello world", "count": 42}'
self.assertIsNone(scan_entropy(json_body))
def test_short_text_below_window(self):
# Text shorter than the window: checked as one chunk.
# Use a uniform string to ensure it won't be flagged.
self.assertIsNone(scan_entropy("abcde", threshold=ENTROPY_BLOCK_THRESHOLD))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+64 -1
View File
@@ -1,10 +1,14 @@
"""Unit: Egress route lift + routes.yaml render + token """Unit: Egress route lift + routes.yaml render + token
resolution (PRD 0017, PRD 0053).""" resolution (PRD 0017, PRD 0053, prd-new)."""
import tempfile
import unittest import unittest
from pathlib import Path
from bot_bottle.egress import ( from bot_bottle.egress import (
CODEX_HOST_CREDENTIAL_TOKEN_REF, CODEX_HOST_CREDENTIAL_TOKEN_REF,
Egress,
EgressPlan,
EgressRoute, EgressRoute,
egress_manifest_routes, egress_manifest_routes,
egress_render_routes, egress_render_routes,
@@ -409,5 +413,64 @@ class TestResolveTokenValues(unittest.TestCase):
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out) self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
class TestCanaryGeneration(unittest.TestCase):
"""Egress.prepare() generates a unique canary token per session (prd-new)."""
def _bottle_obj(self):
return ManifestIndex.from_json_obj({
"bottles": {"dev": {"egress": {"routes": []}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
def _make_plan(self) -> EgressPlan:
# Use a concrete no-op subclass so we can call prepare() without
# a real backend.
class _TestEgress(Egress):
pass
e = _TestEgress()
with tempfile.TemporaryDirectory() as td:
return e.prepare(self._bottle_obj(), "test-slug", Path(td))
def test_canary_is_non_empty(self):
plan = self._make_plan()
self.assertIsInstance(plan.canary, str)
self.assertGreater(len(plan.canary), 0)
def test_canary_is_unique_per_session(self):
with tempfile.TemporaryDirectory() as td:
bottle = self._bottle_obj()
class _TestEgress(Egress):
pass
e = _TestEgress()
plan_a = e.prepare(bottle, "slug-a", Path(td))
plan_b = e.prepare(bottle, "slug-b", Path(td))
self.assertNotEqual(plan_a.canary, plan_b.canary)
def test_canary_detected_by_scan_known_secrets(self):
from bot_bottle.dlp_detectors import scan_known_secrets
plan = self._make_plan()
env = {"EGRESS_TOKEN_CANARY": plan.canary}
result = scan_known_secrets(f"exfil={plan.canary}", env=env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
def test_egress_plan_canary_field_default_empty(self):
# Verify EgressPlan can be constructed with an empty canary (backward compat).
from pathlib import Path
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
)
self.assertEqual("", plan.canary)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+98
View File
@@ -1167,5 +1167,103 @@ class TestScanInbound(unittest.TestCase):
self.assertEqual("block", result.severity) self.assertEqual("block", result.severity)
class TestScanOutboundEnhanced(unittest.TestCase):
"""scan_outbound changes from prd-new: binary decode, entropy detector,
broadened known-value prefixes, fragmentation resistance."""
_ROUTE = Route(host="api.example.com")
_ROUTE_ENTROPY = Route(
host="api.example.com",
outbound_detectors=("entropy",),
)
def test_binary_body_latin1_decode_finds_ascii_secret(self):
# Body contains valid ASCII secret surrounded by non-UTF-8 bytes.
secret = "supersecrettoken99"
env = {"EGRESS_TOKEN_0": secret}
# Wrap the secret in bytes that are invalid UTF-8.
body = b"\x80\x81" + secret.encode("ascii") + b"\xff"
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_binary_body_valid_utf8_decoded_correctly(self):
env = {"EGRESS_TOKEN_0": "mysecret"}
# Valid UTF-8 body — should be decoded as UTF-8, not latin-1.
body = "clean body with mysecret".encode("utf-8")
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
def test_entropy_detector_off_by_default(self):
import string
# High-entropy content should NOT warn if the route has no entropy detector.
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE, alphabet, {})
self.assertIsNone(result)
def test_entropy_detector_warns_when_enabled(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE_ENTROPY, alphabet, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
def test_bot_bottle_sensitive_prefixes_env_var(self):
# When the sidecar env contains BOT_BOTTLE_SENSITIVE_PREFIXES,
# scan_outbound should scan those additional prefixes.
secret = "extra-sensitive-value-abc"
env = {
"MY_CRED_KEY": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "MY_CRED_",
}
result = scan_outbound(self._ROUTE, f"x={secret}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_bot_bottle_sensitive_prefixes_multiple(self):
secret = "my-api-key-value-xyz"
env = {
"ANTHROPIC_API_0": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "ANTHROPIC_API_,OTHER_",
}
result = scan_outbound(self._ROUTE, f"auth={secret}", env)
self.assertIsNotNone(result)
def test_canary_detected_via_egress_token_canary(self):
# The canary (injected as EGRESS_TOKEN_CANARY) is caught by known_secrets.
canary = "canaryvalue12345abcdef"
env = {"EGRESS_TOKEN_CANARY": canary}
result = scan_outbound(self._ROUTE, f"data={canary}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
def test_fragmented_canary_blocked(self):
# Canary with separators injected is still caught.
canary = "supersecretcanary99"
env = {"EGRESS_TOKEN_CANARY": canary}
fragmented = "-".join(canary)
result = scan_outbound(self._ROUTE, f"x={fragmented}", env)
self.assertIsNotNone(result)
class TestOutboundDetectorNames(unittest.TestCase):
def test_entropy_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("entropy", OUTBOUND_DETECTOR_NAMES)
def test_known_secrets_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("known_secrets", OUTBOUND_DETECTOR_NAMES)
def test_token_patterns_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("token_patterns", OUTBOUND_DETECTOR_NAMES)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+24
View File
@@ -199,6 +199,30 @@ class TestHookRender(unittest.TestCase):
self.assertIn('set -- "$@" --push-option="$opt"', hook) self.assertIn('set -- "$@" --push-option="$opt"', hook)
self.assertIn('git push "$@" origin "$refspec"', hook) self.assertIn('git push "$@" origin "$refspec"', hook)
def test_inline_gitleaks_allow_routes_to_supervisor(self):
hook = git_gate_render_hook()
# First gitleaks runs normally; only if that passes does the
# hook ask gitleaks to ignore inline allow comments and report
# the suppressed findings for human approval.
self.assertIn("--ignore-gitleaks-allow", hook)
self.assertIn("--report-format=json", hook)
self.assertIn('"tool": "gitleaks-allow"', hook)
self.assertIn("SUPERVISE_QUEUE_DIR", hook)
self.assertIn("SUPERVISE_BOTTLE_SLUG", hook)
self.assertIn("supervisor approved # gitleaks:allow", hook)
self.assertIn("supervisor rejected # gitleaks:allow", hook)
def test_inline_gitleaks_allow_fails_closed_without_supervisor(self):
hook = git_gate_render_hook()
self.assertIn(
"cannot route # gitleaks:allow finding to supervisor; refusing push",
hook,
)
self.assertIn(
"supervisor approval timed out for # gitleaks:allow; refusing push",
hook,
)
class TestAccessHookRender(unittest.TestCase): class TestAccessHookRender(unittest.TestCase):
def test_access_hook_refreshes_origin_on_upload_pack(self): def test_access_hook_refreshes_origin_on_upload_pack(self):
+127
View File
@@ -0,0 +1,127 @@
"""Unit: leveled + structured logging wrappers (issue #252).
Locks three properties of bot_bottle.log:
- backward compatibility default output is byte-identical to the
original bare wrappers, so the 100+ existing single-string call
sites are unaffected;
- context rendering an optional mapping becomes a parseable
` [k=v ...]` suffix;
- level gating BOT_BOTTLE_LOG_LEVEL filters by severity, debug is
silent by default, and error always surfaces.
"""
from __future__ import annotations
import contextlib
import io
import unittest
from typing import Callable
from unittest import mock
from bot_bottle import log
def _capture(
fn: Callable[..., None],
*args: object,
env: dict[str, str] | None = None,
**kwargs: object,
) -> str:
buf = io.StringIO()
patched = mock.patch.dict("os.environ", env or {}, clear=False)
with patched, contextlib.redirect_stderr(buf):
fn(*args, **kwargs)
return buf.getvalue()
class TestBackwardCompat(unittest.TestCase):
"""No context + default level → exactly the legacy lines."""
def test_info(self):
self.assertEqual("bot-bottle: hello\n", _capture(log.info, "hello"))
def test_warn(self):
self.assertEqual(
"bot-bottle: warning: careful\n", _capture(log.warn, "careful")
)
def test_error(self):
self.assertEqual(
"bot-bottle: error: boom\n", _capture(log.error, "boom")
)
class TestContext(unittest.TestCase):
def test_appends_sorted_parseable_suffix(self):
out = _capture(
log.error, "rpc failed", context={"slug": "abc123", "code": "-32603"}
)
# keys sorted: code before slug
self.assertEqual(
"bot-bottle: error: rpc failed [code=-32603 slug=abc123]\n", out
)
def test_quotes_values_with_whitespace(self):
out = _capture(
log.info, "did thing", context={"path": "/a b/c", "ok": "yes"}
)
self.assertEqual(
'bot-bottle: did thing [ok=yes path="/a b/c"]\n', out
)
def test_empty_context_is_noop_suffix(self):
self.assertEqual(
"bot-bottle: x\n", _capture(log.info, "x", context={})
)
class TestLevels(unittest.TestCase):
def test_debug_silent_by_default(self):
self.assertEqual("", _capture(log.debug, "trace"))
def test_debug_emits_when_level_lowered(self):
out = _capture(log.debug, "trace", env={"BOT_BOTTLE_LOG_LEVEL": "debug"})
self.assertEqual("bot-bottle: debug: trace\n", out)
def test_error_level_suppresses_info_and_warn(self):
env = {"BOT_BOTTLE_LOG_LEVEL": "error"}
self.assertEqual("", _capture(log.info, "i", env=env))
self.assertEqual("", _capture(log.warn, "w", env=env))
# error still surfaces — nothing sits above it
self.assertEqual(
"bot-bottle: error: e\n", _capture(log.error, "e", env=env)
)
def test_unknown_level_falls_back_to_default(self):
# garbage value → default INFO threshold, so info still prints
out = _capture(log.info, "i", env={"BOT_BOTTLE_LOG_LEVEL": "loud"})
self.assertEqual("bot-bottle: i\n", out)
def test_warning_alias_accepted(self):
env = {"BOT_BOTTLE_LOG_LEVEL": "warning"}
self.assertEqual("", _capture(log.info, "i", env=env))
self.assertEqual(
"bot-bottle: warning: w\n", _capture(log.warn, "w", env=env)
)
class TestDie(unittest.TestCase):
def test_die_still_raises_and_prints_error(self):
buf = io.StringIO()
with contextlib.redirect_stderr(buf):
with self.assertRaises(log.Die) as cm:
log.die("fatal thing")
self.assertEqual("fatal thing", cm.exception.message)
self.assertIn("bot-bottle: error: fatal thing", buf.getvalue())
def test_die_surfaces_even_at_error_level(self):
buf = io.StringIO()
with mock.patch.dict("os.environ", {"BOT_BOTTLE_LOG_LEVEL": "error"}):
with contextlib.redirect_stderr(buf):
with self.assertRaises(log.Die):
log.die("still fatal")
self.assertIn("bot-bottle: error: still fatal", buf.getvalue())
if __name__ == "__main__":
unittest.main()
+2 -1
View File
@@ -42,6 +42,7 @@ def _plan(
routes_path=routes_path, routes_path=routes_path,
routes=("route",), routes=("route",),
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"}, token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
canary="",
) )
if git: if git:
key_path = stage_dir / "origin-key" key_path = stage_dir / "origin-key"
@@ -271,7 +272,7 @@ def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan:
manifest=_MANIFEST, manifest=_MANIFEST,
stage_dir=stage_dir, stage_dir=stage_dir,
git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())), git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())),
egress_plan=cast(EgressPlan, SimpleNamespace()), egress_plan=cast(EgressPlan, SimpleNamespace(canary="")),
supervise_plan=None, supervise_plan=None,
agent_provision=AgentProvisionPlan( agent_provision=AgentProvisionPlan(
template="claude", template="claude",
+2
View File
@@ -17,6 +17,7 @@ from bot_bottle.supervise import (
STATUS_MODIFIED, STATUS_MODIFIED,
STATUS_REJECTED, STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK, TOOL_CAPABILITY_BLOCK,
TOOL_GITLEAKS_ALLOW,
archive_proposal, archive_proposal,
audit_log_path, audit_log_path,
list_pending_proposals, list_pending_proposals,
@@ -320,6 +321,7 @@ class TestToolConstants(unittest.TestCase):
supervise.TOOL_ALLOW, supervise.TOOL_ALLOW,
TOOL_CAPABILITY_BLOCK, TOOL_CAPABILITY_BLOCK,
supervise.TOOL_EGRESS_BLOCK, supervise.TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
supervise.TOOL_LIST_EGRESS_ROUTES, supervise.TOOL_LIST_EGRESS_ROUTES,
), ),
supervise.TOOLS, supervise.TOOLS,
+26
View File
@@ -19,6 +19,7 @@ from bot_bottle.supervise import (
STATUS_MODIFIED, STATUS_MODIFIED,
STATUS_REJECTED, STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK, TOOL_CAPABILITY_BLOCK,
TOOL_GITLEAKS_ALLOW,
read_audit_entries, read_audit_entries,
read_response, read_response,
sha256_hex, sha256_hex,
@@ -33,6 +34,7 @@ def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n", TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
supervise.TOOL_ALLOW: "routes:\n - host: example.com\n", supervise.TOOL_ALLOW: "routes:\n - host: example.com\n",
supervise.TOOL_EGRESS_BLOCK: "routes:\n - host: example.com\n", supervise.TOOL_EGRESS_BLOCK: "routes:\n - host: example.com\n",
TOOL_GITLEAKS_ALLOW: "file: tests/test_fixture.py\nline: 3\n",
} }
payload = payloads.get(tool, "") payload = payloads.get(tool, "")
return Proposal.new( return Proposal.new(
@@ -170,6 +172,30 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
self.assertEqual(STATUS_APPROVED, entries[0].operator_action) self.assertEqual(STATUS_APPROVED, entries[0].operator_action)
self.assertEqual("needed for dev", entries[0].justification) self.assertEqual("needed for dev", entries[0].justification)
def test_approve_gitleaks_allow_leaves_response_for_gate(self):
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
supervise_cli.approve(qp, notes="dummy fixture")
# Gate polls the queue dir for the response; TUI must not archive it.
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status)
self.assertEqual("dummy fixture", resp.notes)
self.assertFalse((qp.queue_dir / "processed").exists())
def test_tui_gitleaks_allow_requires_reason(self):
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
with patch.object(supervise_cli, "_prompt", return_value=""):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertEqual("approve aborted (empty reason)", status)
self.assertFalse((qp.queue_dir / "processed").exists())
def test_tui_gitleaks_allow_writes_reason(self):
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
with patch.object(supervise_cli, "_prompt", return_value="test fixture"):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertIn("approved gitleaks-allow", status)
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual("test fixture", resp.notes)
# class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase): # class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
# # DISABLED — capability_apply functionality is currently commented out. # # DISABLED — capability_apply functionality is currently commented out.