Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| affd628df6 | |||
| 2610f5a2c9 | |||
| 001a420957 | |||
| 813cb685bf | |||
| 7cb967770e | |||
| 80eca740d6 | |||
| 369d332204 | |||
| 31cde11b0d | |||
| c41751f3b9 | |||
| e2422c20a0 | |||
| de71533a17 | |||
| 88c4f61901 | |||
| c666eaa63f | |||
| 83eb9e4041 | |||
| 33333ac4d9 | |||
| 4d56f515bc |
@@ -14,7 +14,7 @@
|
|||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
- **Per-bottle egress allowlist** — TLS-bumped HTTP/HTTPS chokepoint with a per-manifest host allowlist and request-body DLP scanner; DoH and arbitrary hosts blocked by default.
|
- **Per-bottle egress allowlist** — TLS-bumped HTTP/HTTPS chokepoint with a per-manifest host allowlist; per-route path/method/header `matches` filtering; outbound DLP scanning for known tokens and secrets, inbound DLP scanning for prompt-injection attempts; DoH and arbitrary hosts blocked by default.
|
||||||
- **Tokens the agent never sees** — host secrets live in a sidecar; the agent dials `http://sidecar:9099/<path>` and the proxy strips inbound `Authorization` and injects the real token before forwarding. `printenv` in the agent shows proxy URLs only.
|
- **Tokens the agent never sees** — host secrets live in a sidecar; the agent dials `http://sidecar:9099/<path>` and the proxy strips inbound `Authorization` and injects the real token before forwarding. `printenv` in the agent shows proxy URLs only.
|
||||||
- **Gitleaks-scanned push (git-gate)** — `bottle.git` remotes route through a per-bottle `git daemon` that gitleaks-scans incoming refs pre-receive and forwards clean refs upstream over SSH. The agent never holds the upstream credential.
|
- **Gitleaks-scanned push (git-gate)** — `bottle.git` remotes route through a per-bottle `git daemon` that gitleaks-scans incoming refs pre-receive and forwards clean refs upstream over SSH. The agent never holds the upstream credential.
|
||||||
- **Manifest-scoped skills + secrets** — each bottle declares its skills, env, git identity, remotes, and egress routes; unknown keys die at load.
|
- **Manifest-scoped skills + secrets** — each bottle declares its skills, env, git identity, remotes, and egress routes; unknown keys die at load.
|
||||||
@@ -106,8 +106,15 @@ egress:
|
|||||||
routes:
|
routes:
|
||||||
- host: gitea.dideric.is
|
- host: gitea.dideric.is
|
||||||
auth:
|
auth:
|
||||||
scheme: token
|
scheme: token # Bearer | token
|
||||||
token_ref: BOT_BOTTLE_GITEA_TOKEN
|
token_ref: BOT_BOTTLE_GITEA_TOKEN
|
||||||
|
matches: # optional — restrict to specific paths/methods/headers
|
||||||
|
- paths:
|
||||||
|
- {type: prefix, value: /api/v1/}
|
||||||
|
methods: [GET, POST, PATCH, DELETE]
|
||||||
|
dlp: # optional — per-route detector overrides (default: all on)
|
||||||
|
outbound_detectors: [token_patterns, known_secrets]
|
||||||
|
inbound_detectors: false # disable response scanning for this host
|
||||||
---
|
---
|
||||||
|
|
||||||
The `gitea-dev` bottle. Provider auth via the inherited Claude route;
|
The `gitea-dev` bottle. Provider auth via the inherited Claude route;
|
||||||
@@ -126,6 +133,23 @@ skills:
|
|||||||
You help maintain Gitea-hosted projects.
|
You help maintain Gitea-hosted projects.
|
||||||
````
|
````
|
||||||
|
|
||||||
|
**Egress route fields:**
|
||||||
|
|
||||||
|
| Field | Required | Description |
|
||||||
|
|---|---|---|
|
||||||
|
| `host` | yes | Hostname to allowlist. One entry per host. |
|
||||||
|
| `role` | no | Reserved for future use. The key is recognised but any value is currently rejected at load. Provider auth routes (e.g. Claude's `api.anthropic.com`) are injected automatically from `agent_provider.auth_token`, not via `role`. |
|
||||||
|
| `auth.scheme` | when `auth` present | `Bearer` or `token`. Injected by the proxy; the agent never sees the value. |
|
||||||
|
| `auth.token_ref` | when `auth` present | Env-var name holding the secret on the host. |
|
||||||
|
| `matches` | no | Array of `{paths, methods, headers}` filters. A request must match at least one entry (if any are given) to be forwarded. |
|
||||||
|
| `matches[].paths` | no | Array of `{type, value}`. `type` is `prefix` (default), `exact`, or `regex`. |
|
||||||
|
| `matches[].methods` | no | Array of HTTP method strings, e.g. `[GET, POST]`. |
|
||||||
|
| `matches[].headers` | no | Array of `{name, value, type}`. `type` is `exact` (default) or `regex`. |
|
||||||
|
| `dlp` | no | Per-route DLP overrides. Omit to use defaults (all detectors on). |
|
||||||
|
| `dlp.outbound_detectors` | no | `false` disables outbound scanning; list restricts to named detectors (`token_patterns`, `known_secrets`). |
|
||||||
|
| `dlp.inbound_detectors` | no | `false` disables inbound scanning; list restricts to named detectors (`naive_injection_detection`). |
|
||||||
|
| `git.fetch` | no | `true` permits smart HTTP clone/fetch (`git-upload-pack`) for this host. Push (`git-receive-pack`) remains blocked. |
|
||||||
|
|
||||||
More examples in `examples/`. Full design lives under `docs/prds/`; the trust-boundary rationale is in `docs/prds/0011-per-file-md-manifest.md`.
|
More examples in `examples/`. Full design lives under `docs/prds/`; the trust-boundary rationale is in `docs/prds/0011-per-file-md-manifest.md`.
|
||||||
|
|
||||||
## Trademarks
|
## Trademarks
|
||||||
|
|||||||
@@ -137,6 +137,10 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
|
|||||||
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
|
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
|
||||||
for token_env in sorted(ep.token_env_map.keys()):
|
for token_env in sorted(ep.token_env_map.keys()):
|
||||||
env.append(token_env)
|
env.append(token_env)
|
||||||
|
if ep.canary:
|
||||||
|
# Inject canary as a literal NAME=VALUE (not a bare name) — the
|
||||||
|
# value is a fake secret so it need not be hidden from the compose file.
|
||||||
|
env.append(f"EGRESS_TOKEN_CANARY={ep.canary}")
|
||||||
|
|
||||||
# --- git-gate -----------------------------------------------------
|
# --- git-gate -----------------------------------------------------
|
||||||
gp = plan.git_gate_plan
|
gp = plan.git_gate_plan
|
||||||
@@ -220,6 +224,10 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
|
|||||||
# never lands on argv or in the compose file.
|
# never lands on argv or in the compose file.
|
||||||
for name in sorted(plan.forwarded_env.keys()):
|
for name in sorted(plan.forwarded_env.keys()):
|
||||||
env.append(name)
|
env.append(name)
|
||||||
|
# Canary token: visible to the agent as a fake secret so that any
|
||||||
|
# outbound appearance of this value is a zero-FP exfil signal.
|
||||||
|
if plan.egress_plan.canary:
|
||||||
|
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
|
||||||
|
|
||||||
service: dict[str, Any] = {
|
service: dict[str, Any] = {
|
||||||
"image": plan.image,
|
"image": plan.image,
|
||||||
|
|||||||
@@ -353,6 +353,8 @@ def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
|
|||||||
env: list[str] = []
|
env: list[str] = []
|
||||||
if plan.egress_plan.routes:
|
if plan.egress_plan.routes:
|
||||||
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
|
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
|
||||||
|
if plan.egress_plan.canary:
|
||||||
|
env.append(f"EGRESS_TOKEN_CANARY={plan.egress_plan.canary}")
|
||||||
if plan.git_gate_plan.upstreams:
|
if plan.git_gate_plan.upstreams:
|
||||||
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
|
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
|
||||||
if plan.supervise_plan is not None:
|
if plan.supervise_plan is not None:
|
||||||
@@ -420,6 +422,8 @@ def _agent_env_entries(
|
|||||||
env.append(f"{name}={value}")
|
env.append(f"{name}={value}")
|
||||||
for name in sorted(plan.forwarded_env.keys()):
|
for name in sorted(plan.forwarded_env.keys()):
|
||||||
env.append(name)
|
env.append(name)
|
||||||
|
if plan.egress_plan.canary:
|
||||||
|
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
|
||||||
return tuple(env)
|
return tuple(env)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -53,6 +53,7 @@ from ..supervise import (
|
|||||||
TOOL_CAPABILITY_BLOCK,
|
TOOL_CAPABILITY_BLOCK,
|
||||||
TOOL_ALLOW,
|
TOOL_ALLOW,
|
||||||
TOOL_EGRESS_BLOCK,
|
TOOL_EGRESS_BLOCK,
|
||||||
|
TOOL_GITLEAKS_ALLOW,
|
||||||
archive_proposal,
|
archive_proposal,
|
||||||
list_pending_proposals,
|
list_pending_proposals,
|
||||||
render_diff,
|
render_diff,
|
||||||
@@ -140,6 +141,8 @@ def _suffix_for_tool(tool: str) -> str:
|
|||||||
return ".dockerfile"
|
return ".dockerfile"
|
||||||
if tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK):
|
if tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK):
|
||||||
return ".yaml"
|
return ".yaml"
|
||||||
|
if tool == TOOL_GITLEAKS_ALLOW:
|
||||||
|
return ".txt"
|
||||||
return ".txt"
|
return ".txt"
|
||||||
|
|
||||||
|
|
||||||
@@ -201,6 +204,23 @@ def reject(qp: QueuedProposal, *, reason: str) -> None:
|
|||||||
_write_audit(qp, action=STATUS_REJECTED, notes=reason, diff_before="", diff_after="")
|
_write_audit(qp, action=STATUS_REJECTED, notes=reason, diff_before="", diff_after="")
|
||||||
|
|
||||||
|
|
||||||
|
def _approve_from_tui(
|
||||||
|
stdscr: "curses._CursesWindow", # type: ignore
|
||||||
|
qp: QueuedProposal,
|
||||||
|
*,
|
||||||
|
final_file: str | None = None,
|
||||||
|
notes: str = "",
|
||||||
|
) -> str:
|
||||||
|
"""Approve from curses, prompting for any tool-specific audit note."""
|
||||||
|
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW and final_file is None:
|
||||||
|
notes = _prompt(stdscr, "allow reason (test fixture/false positive): ")
|
||||||
|
if not notes:
|
||||||
|
return "approve aborted (empty reason)"
|
||||||
|
approve(qp, final_file=final_file, notes=notes)
|
||||||
|
verb = "modified+approved" if final_file is not None else "approved"
|
||||||
|
return _approval_status(qp, verb)
|
||||||
|
|
||||||
|
|
||||||
def _write_audit(
|
def _write_audit(
|
||||||
qp: QueuedProposal,
|
qp: QueuedProposal,
|
||||||
*,
|
*,
|
||||||
@@ -272,7 +292,10 @@ def cmd_supervise(argv: list[str]) -> int:
|
|||||||
return e.code if isinstance(e.code, int) else 1
|
return e.code if isinstance(e.code, int) else 1
|
||||||
except Exception as e: # noqa: W0718 — catch supervise crash for logging
|
except Exception as e: # noqa: W0718 — catch supervise crash for logging
|
||||||
log_path = _write_crash_log(e)
|
log_path = _write_crash_log(e)
|
||||||
error(f"supervise crashed: {type(e).__name__}: {e}")
|
error(
|
||||||
|
f"supervise crashed: {type(e).__name__}: {e}",
|
||||||
|
context={"error_type": type(e).__name__, "crash_log": str(log_path)},
|
||||||
|
)
|
||||||
error(f"full traceback written to {log_path}")
|
error(f"full traceback written to {log_path}")
|
||||||
return 1
|
return 1
|
||||||
return 0
|
return 0
|
||||||
@@ -384,18 +407,22 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
|
|||||||
_detail_view(stdscr, qp, green_attr=green_attr)
|
_detail_view(stdscr, qp, green_attr=green_attr)
|
||||||
elif key == ord("a"):
|
elif key == ord("a"):
|
||||||
try:
|
try:
|
||||||
approve(qp)
|
status_line = _approve_from_tui(stdscr, qp)
|
||||||
status_line = _approval_status(qp, "approved")
|
|
||||||
except ApplyError as e:
|
except ApplyError as e:
|
||||||
status_line = f"apply failed: {e}"
|
status_line = f"apply failed: {e}"
|
||||||
elif key == ord("m"):
|
elif key == ord("m"):
|
||||||
|
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW:
|
||||||
|
status_line = "modify unavailable for gitleaks-allow"
|
||||||
|
continue
|
||||||
edited = _modify(stdscr, qp)
|
edited = _modify(stdscr, qp)
|
||||||
if edited is None:
|
if edited is None:
|
||||||
status_line = "modify aborted (no change)"
|
status_line = "modify aborted (no change)"
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
approve(qp, final_file=edited, notes="operator modified before approving")
|
status_line = _approve_from_tui(
|
||||||
status_line = _approval_status(qp, "modified+approved")
|
stdscr, qp, final_file=edited,
|
||||||
|
notes="operator modified before approving",
|
||||||
|
)
|
||||||
except ApplyError as e:
|
except ApplyError as e:
|
||||||
status_line = f"apply failed: {e}"
|
status_line = f"apply failed: {e}"
|
||||||
elif key == ord("r"):
|
elif key == ord("r"):
|
||||||
@@ -493,15 +520,20 @@ def _detail_view(
|
|||||||
offset = max(0, len(lines) - 1)
|
offset = max(0, len(lines) - 1)
|
||||||
elif key == ord("a"):
|
elif key == ord("a"):
|
||||||
try:
|
try:
|
||||||
approve(qp)
|
_approve_from_tui(stdscr, qp)
|
||||||
except ApplyError:
|
except ApplyError:
|
||||||
pass
|
pass
|
||||||
return
|
return
|
||||||
elif key == ord("m"):
|
elif key == ord("m"):
|
||||||
|
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW:
|
||||||
|
return
|
||||||
edited = _modify(stdscr, qp)
|
edited = _modify(stdscr, qp)
|
||||||
if edited is not None:
|
if edited is not None:
|
||||||
try:
|
try:
|
||||||
approve(qp, final_file=edited, notes="operator modified before approving")
|
_approve_from_tui(
|
||||||
|
stdscr, qp, final_file=edited,
|
||||||
|
notes="operator modified before approving",
|
||||||
|
)
|
||||||
except ApplyError:
|
except ApplyError:
|
||||||
pass
|
pass
|
||||||
return
|
return
|
||||||
|
|||||||
+154
-4
@@ -1,4 +1,4 @@
|
|||||||
"""DLP detectors for the egress proxy (PRD 0053).
|
"""DLP detectors for the egress proxy (PRD 0053, prd-new).
|
||||||
|
|
||||||
Pure Python, no mitmproxy dependency. Each detector is a module-level
|
Pure Python, no mitmproxy dependency. Each detector is a module-level
|
||||||
function returning `ScanResult | None`.
|
function returning `ScanResult | None`.
|
||||||
@@ -15,6 +15,8 @@ import gzip
|
|||||||
import re
|
import re
|
||||||
import typing
|
import typing
|
||||||
import unicodedata
|
import unicodedata
|
||||||
|
from math import log2
|
||||||
|
from collections import Counter
|
||||||
from urllib.parse import quote as url_quote
|
from urllib.parse import quote as url_quote
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -96,20 +98,21 @@ def redact_tokens(
|
|||||||
text: str,
|
text: str,
|
||||||
*,
|
*,
|
||||||
env: typing.Mapping[str, str] | None = None,
|
env: typing.Mapping[str, str] | None = None,
|
||||||
|
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Replace token pattern matches and (if env given) provisioned secrets with REDACT."""
|
"""Replace token pattern matches and (if env given) provisioned secrets with REDACT."""
|
||||||
for _, pattern in TOKEN_PATTERNS:
|
for _, pattern in TOKEN_PATTERNS:
|
||||||
text = pattern.sub(REDACT, text)
|
text = pattern.sub(REDACT, text)
|
||||||
if env is not None:
|
if env is not None:
|
||||||
for key, value in env.items():
|
for key, value in env.items():
|
||||||
if key.startswith("EGRESS_TOKEN_") and value:
|
if any(key.startswith(p) for p in sensitive_prefixes) and value:
|
||||||
for variant in _encoded_variants(value):
|
for variant in _encoded_variants(value):
|
||||||
text = text.replace(variant, REDACT)
|
text = text.replace(variant, REDACT)
|
||||||
return text
|
return text
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Known secrets detector (Phase 1b)
|
# Known secrets detector (Phase 1b, prd-new)
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def _encoded_variants(secret: str) -> list[str]:
|
def _encoded_variants(secret: str) -> list[str]:
|
||||||
@@ -150,17 +153,63 @@ def _encoded_variants(secret: str) -> list[str]:
|
|||||||
return variants
|
return variants
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Fragmentation-resistant helpers (prd-new)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Minimum length of alnum projection for projection-based checks to run.
|
||||||
|
# Short secrets produce too many false positives in projection space.
|
||||||
|
_ALNUM_MIN_LEN = 8
|
||||||
|
|
||||||
|
# Minimum window length for the partial-substring sliding scan.
|
||||||
|
PARTIAL_MATCH_MIN_LEN = 12
|
||||||
|
|
||||||
|
|
||||||
|
def _alnum_projection(text: str) -> str:
|
||||||
|
"""Return text with every non-alphanumeric character stripped.
|
||||||
|
|
||||||
|
Used for fragmentation-resistant matching: separator-injected secrets
|
||||||
|
(spaces, hyphens, dots inserted between characters) are identical to
|
||||||
|
their originals in alnum projection space.
|
||||||
|
"""
|
||||||
|
return "".join(c for c in text if c.isalnum())
|
||||||
|
|
||||||
|
|
||||||
|
def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None:
|
||||||
|
"""Return the position in text_alnum where any min_len-char window of
|
||||||
|
secret_alnum first appears, or None.
|
||||||
|
|
||||||
|
Slides a window of width min_len across secret_alnum and searches for
|
||||||
|
each window in text_alnum. The first hit position is returned.
|
||||||
|
"""
|
||||||
|
if len(secret_alnum) < min_len or len(text_alnum) < min_len:
|
||||||
|
return None
|
||||||
|
for i in range(len(secret_alnum) - min_len + 1):
|
||||||
|
window = secret_alnum[i:i + min_len]
|
||||||
|
pos = text_alnum.find(window)
|
||||||
|
if pos >= 0:
|
||||||
|
return pos
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def scan_known_secrets(
|
def scan_known_secrets(
|
||||||
text: str,
|
text: str,
|
||||||
*,
|
*,
|
||||||
location: str = "body",
|
location: str = "body",
|
||||||
env: typing.Mapping[str, str] | None = None,
|
env: typing.Mapping[str, str] | None = None,
|
||||||
|
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
|
||||||
) -> ScanResult | None:
|
) -> ScanResult | None:
|
||||||
if env is None:
|
if env is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# Pre-compute alnum projection of the scan text once; reused per secret.
|
||||||
|
text_alnum: str | None = None
|
||||||
|
|
||||||
for key, value in env.items():
|
for key, value in env.items():
|
||||||
if not key.startswith("EGRESS_TOKEN_") or not value:
|
if not any(key.startswith(p) for p in sensitive_prefixes) or not value:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Pass 1: exact match across encoded variants (original behaviour).
|
||||||
for variant in _encoded_variants(value):
|
for variant in _encoded_variants(value):
|
||||||
pos = text.find(variant)
|
pos = text.find(variant)
|
||||||
if pos >= 0:
|
if pos >= 0:
|
||||||
@@ -170,6 +219,100 @@ def scan_known_secrets(
|
|||||||
location=location,
|
location=location,
|
||||||
context=_snippet(text, pos, pos + len(variant)),
|
context=_snippet(text, pos, pos + len(variant)),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Pass 2 & 3: fragmentation-resistant projection checks.
|
||||||
|
secret_alnum = _alnum_projection(value)
|
||||||
|
if len(secret_alnum) < _ALNUM_MIN_LEN:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if text_alnum is None:
|
||||||
|
text_alnum = _alnum_projection(text)
|
||||||
|
|
||||||
|
# Pass 2: full alnum-projection exact match (catches separator injection).
|
||||||
|
pos2 = text_alnum.find(secret_alnum)
|
||||||
|
if pos2 >= 0:
|
||||||
|
return ScanResult(
|
||||||
|
severity="block",
|
||||||
|
reason=(
|
||||||
|
f"provisioned secret from {key} found in {location} "
|
||||||
|
f"(fragmented match — separator injection)"
|
||||||
|
),
|
||||||
|
location=location,
|
||||||
|
context=_snippet(text_alnum, pos2, pos2 + len(secret_alnum)),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Pass 3: sliding-window partial match (catches chunked-substring leaks).
|
||||||
|
pos3 = _find_partial_window(secret_alnum, text_alnum, PARTIAL_MATCH_MIN_LEN)
|
||||||
|
if pos3 is not None:
|
||||||
|
return ScanResult(
|
||||||
|
severity="block",
|
||||||
|
reason=(
|
||||||
|
f"provisioned secret from {key} found in {location} "
|
||||||
|
f"(partial match — at least {PARTIAL_MATCH_MIN_LEN} consecutive "
|
||||||
|
f"alphanumeric chars)"
|
||||||
|
),
|
||||||
|
location=location,
|
||||||
|
context=_snippet(text_alnum, pos3, pos3 + PARTIAL_MATCH_MIN_LEN),
|
||||||
|
)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Entropy detector (warn-only, prd-new)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Sliding window size and step for the entropy scan.
|
||||||
|
ENTROPY_WINDOW = 64
|
||||||
|
ENTROPY_STEP = 32
|
||||||
|
|
||||||
|
# Bits-per-character threshold. Random ASCII printable ≈ 6.6 bits; random
|
||||||
|
# lowercase hex ≈ 4 bits; random base64url ≈ 6 bits. 5.5 sits above
|
||||||
|
# typical structured data (JSON, URLs) while staying below truly random
|
||||||
|
# content.
|
||||||
|
ENTROPY_BLOCK_THRESHOLD = 5.5
|
||||||
|
|
||||||
|
|
||||||
|
def _shannon_entropy(text: str) -> float:
|
||||||
|
if not text:
|
||||||
|
return 0.0
|
||||||
|
counts = Counter(text)
|
||||||
|
n = len(text)
|
||||||
|
return -sum((c / n) * log2(c / n) for c in counts.values())
|
||||||
|
|
||||||
|
|
||||||
|
def scan_entropy(
|
||||||
|
text: str,
|
||||||
|
*,
|
||||||
|
location: str = "body",
|
||||||
|
window: int = ENTROPY_WINDOW,
|
||||||
|
threshold: float = ENTROPY_BLOCK_THRESHOLD,
|
||||||
|
) -> ScanResult | None:
|
||||||
|
"""Warn-only detector: flag windows of `window` chars with Shannon entropy
|
||||||
|
above `threshold` bits per character.
|
||||||
|
|
||||||
|
Never blocks; always returns severity='warn'. Disabled by default —
|
||||||
|
routes must opt in via dlp.outbound_detectors=['entropy'].
|
||||||
|
"""
|
||||||
|
if not text:
|
||||||
|
return None
|
||||||
|
step = max(1, window // 2)
|
||||||
|
end = len(text)
|
||||||
|
# Scan overlapping windows; also check the final tail if shorter than window.
|
||||||
|
positions = list(range(0, end - window + 1, step))
|
||||||
|
if end < window:
|
||||||
|
positions = [0]
|
||||||
|
elif (end - window) % step != 0:
|
||||||
|
positions.append(end - window)
|
||||||
|
for i in positions:
|
||||||
|
chunk = text[i:i + window]
|
||||||
|
if _shannon_entropy(chunk) >= threshold:
|
||||||
|
return ScanResult(
|
||||||
|
severity="warn",
|
||||||
|
reason=f"high-entropy content in {location} (possible encrypted exfil)",
|
||||||
|
location=location,
|
||||||
|
context=_snippet(text, i, i + len(chunk)),
|
||||||
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
@@ -280,11 +423,18 @@ def scan_crlf_injection(text: str) -> ScanResult | None:
|
|||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
"ENTROPY_BLOCK_THRESHOLD",
|
||||||
|
"ENTROPY_WINDOW",
|
||||||
|
"ENTROPY_STEP",
|
||||||
|
"PARTIAL_MATCH_MIN_LEN",
|
||||||
"REDACT",
|
"REDACT",
|
||||||
"SNIPPET_CONTEXT",
|
"SNIPPET_CONTEXT",
|
||||||
"TOKEN_PATTERNS",
|
"TOKEN_PATTERNS",
|
||||||
|
"_alnum_projection",
|
||||||
|
"_shannon_entropy",
|
||||||
"redact_tokens",
|
"redact_tokens",
|
||||||
"scan_crlf_injection",
|
"scan_crlf_injection",
|
||||||
|
"scan_entropy",
|
||||||
"scan_known_secrets",
|
"scan_known_secrets",
|
||||||
"scan_naive_injection",
|
"scan_naive_injection",
|
||||||
"scan_token_patterns",
|
"scan_token_patterns",
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ specific and lives on concrete subclasses (see
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import dataclasses
|
import dataclasses
|
||||||
|
import secrets
|
||||||
from abc import ABC
|
from abc import ABC
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -64,6 +65,7 @@ class EgressPlan:
|
|||||||
mitmproxy_ca_host_path: Path = Path()
|
mitmproxy_ca_host_path: Path = Path()
|
||||||
mitmproxy_ca_cert_only_host_path: Path = Path()
|
mitmproxy_ca_cert_only_host_path: Path = Path()
|
||||||
log: int = 0
|
log: int = 0
|
||||||
|
canary: str = ""
|
||||||
|
|
||||||
|
|
||||||
def egress_manifest_routes(
|
def egress_manifest_routes(
|
||||||
@@ -299,12 +301,17 @@ class Egress(ABC):
|
|||||||
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
|
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
|
||||||
routes_path.write_text(egress_render_routes(routes, log=log))
|
routes_path.write_text(egress_render_routes(routes, log=log))
|
||||||
routes_path.chmod(0o600)
|
routes_path.chmod(0o600)
|
||||||
|
# Generate a per-session canary token. The sidecar receives it as
|
||||||
|
# EGRESS_TOKEN_CANARY (scanned by the existing known-secrets detector);
|
||||||
|
# the agent receives it as BOT_BOTTLE_CANARY (a visible fake secret).
|
||||||
|
canary = secrets.token_urlsafe(32)
|
||||||
return EgressPlan(
|
return EgressPlan(
|
||||||
slug=slug,
|
slug=slug,
|
||||||
routes_path=routes_path,
|
routes_path=routes_path,
|
||||||
routes=routes,
|
routes=routes,
|
||||||
token_env_map=egress_token_env_map(routes),
|
token_env_map=egress_token_env_map(routes),
|
||||||
log=log,
|
log=log,
|
||||||
|
canary=canary,
|
||||||
)
|
)
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ VALID_METHODS = frozenset({
|
|||||||
"CONNECT",
|
"CONNECT",
|
||||||
})
|
})
|
||||||
|
|
||||||
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"})
|
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"})
|
||||||
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
|
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
|
||||||
|
|
||||||
|
|
||||||
@@ -696,17 +696,28 @@ def scan_outbound(
|
|||||||
try:
|
try:
|
||||||
from dlp_detectors import ( # type: ignore[import-not-found]
|
from dlp_detectors import ( # type: ignore[import-not-found]
|
||||||
scan_crlf_injection,
|
scan_crlf_injection,
|
||||||
|
scan_entropy,
|
||||||
scan_known_secrets,
|
scan_known_secrets,
|
||||||
scan_token_patterns,
|
scan_token_patterns,
|
||||||
)
|
)
|
||||||
except ImportError: # pragma: no cover - host-side path
|
except ImportError: # pragma: no cover - host-side path
|
||||||
from .dlp_detectors import ( # type: ignore[import-not-found]
|
from .dlp_detectors import ( # type: ignore[import-not-found]
|
||||||
scan_crlf_injection,
|
scan_crlf_injection,
|
||||||
|
scan_entropy,
|
||||||
scan_known_secrets,
|
scan_known_secrets,
|
||||||
scan_token_patterns,
|
scan_token_patterns,
|
||||||
)
|
)
|
||||||
|
|
||||||
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
|
# Binary bodies: latin-1 is a bijective byte↔codepoint mapping that
|
||||||
|
# preserves every byte value, so ASCII-range secret strings remain
|
||||||
|
# findable by str.find / regex. Prefer strict UTF-8 for valid text bodies.
|
||||||
|
if isinstance(body, bytes):
|
||||||
|
try:
|
||||||
|
text = body.decode("utf-8")
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
text = body.decode("latin-1")
|
||||||
|
else:
|
||||||
|
text = body
|
||||||
|
|
||||||
# CRLF injection is never legitimate — runs unconditionally, not gated
|
# CRLF injection is never legitimate — runs unconditionally, not gated
|
||||||
# by outbound_detectors config.
|
# by outbound_detectors config.
|
||||||
@@ -720,7 +731,26 @@ def scan_outbound(
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
if _detector_enabled(route.outbound_detectors, "known_secrets"):
|
if _detector_enabled(route.outbound_detectors, "known_secrets"):
|
||||||
result = scan_known_secrets(text, location="body", env=environ)
|
# BOT_BOTTLE_SENSITIVE_PREFIXES lets operators add extra env prefixes
|
||||||
|
# beyond EGRESS_TOKEN_* without changing the manifest schema.
|
||||||
|
extra_raw = environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "")
|
||||||
|
extra = tuple(p for p in extra_raw.split(",") if p)
|
||||||
|
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
|
||||||
|
result = scan_known_secrets(
|
||||||
|
text, location="body", env=environ, sensitive_prefixes=sensitive_prefixes,
|
||||||
|
)
|
||||||
|
if result is not None:
|
||||||
|
return result
|
||||||
|
|
||||||
|
# Entropy scanning requires explicit opt-in: it is NOT part of the
|
||||||
|
# default "all detectors" set because it produces false positives on
|
||||||
|
# legitimate base64 / binary payloads. Routes must list "entropy" in
|
||||||
|
# dlp.outbound_detectors to enable it.
|
||||||
|
if (
|
||||||
|
route.outbound_detectors is not None
|
||||||
|
and "entropy" in route.outbound_detectors
|
||||||
|
):
|
||||||
|
result = scan_entropy(text, location="body")
|
||||||
if result is not None:
|
if result is not None:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|||||||
@@ -247,6 +247,164 @@ cat > "$refs_file"
|
|||||||
|
|
||||||
zero=0000000000000000000000000000000000000000
|
zero=0000000000000000000000000000000000000000
|
||||||
|
|
||||||
|
supervise_gitleaks_allow() {
|
||||||
|
log_opts=$1
|
||||||
|
ref=$2
|
||||||
|
report_file=$(mktemp)
|
||||||
|
if ! gitleaks git \
|
||||||
|
--log-opts="$log_opts" \
|
||||||
|
--no-banner \
|
||||||
|
--redact \
|
||||||
|
--ignore-gitleaks-allow \
|
||||||
|
--report-format=json \
|
||||||
|
--report-path="$report_file" \
|
||||||
|
--exit-code 0 \
|
||||||
|
1>&2; then
|
||||||
|
rm -f "$report_file"
|
||||||
|
echo "git-gate: gitleaks inline-suppression scan failed for $ref" >&2
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
proposal_id=$(
|
||||||
|
GITLEAKS_ALLOW_REF="$ref" python3 - "$report_file" <<'PY'
|
||||||
|
import datetime
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import uuid
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
report_path = Path(sys.argv[1])
|
||||||
|
queue_dir = os.environ.get("SUPERVISE_QUEUE_DIR", "")
|
||||||
|
slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "")
|
||||||
|
if not queue_dir or not slug:
|
||||||
|
sys.exit(2)
|
||||||
|
|
||||||
|
try:
|
||||||
|
raw = json.loads(report_path.read_text() or "[]")
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
sys.exit(3)
|
||||||
|
if not isinstance(raw, list):
|
||||||
|
sys.exit(3)
|
||||||
|
if not raw:
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
ref = os.environ.get("GITLEAKS_ALLOW_REF", "")
|
||||||
|
lines = [
|
||||||
|
"gitleaks inline suppression requires supervisor approval",
|
||||||
|
f"ref: {ref}",
|
||||||
|
"",
|
||||||
|
]
|
||||||
|
for i, finding in enumerate(raw, 1):
|
||||||
|
if not isinstance(finding, dict):
|
||||||
|
continue
|
||||||
|
file_path = finding.get("File", "")
|
||||||
|
line_no = finding.get("StartLine", finding.get("Line", ""))
|
||||||
|
rule_id = finding.get("RuleID", "")
|
||||||
|
commit = finding.get("Commit", "")
|
||||||
|
line = finding.get("Line", "")
|
||||||
|
lines.extend([
|
||||||
|
f"finding {i}:",
|
||||||
|
f" file: {file_path}",
|
||||||
|
f" line: {line_no}",
|
||||||
|
f" rule: {rule_id}",
|
||||||
|
f" commit: {commit}",
|
||||||
|
f" code: {line}",
|
||||||
|
"",
|
||||||
|
])
|
||||||
|
|
||||||
|
payload = "\n".join(lines).rstrip() + "\n"
|
||||||
|
proposal_id = str(uuid.uuid4())
|
||||||
|
proposal = {
|
||||||
|
"id": proposal_id,
|
||||||
|
"bottle_slug": slug,
|
||||||
|
"tool": "gitleaks-allow",
|
||||||
|
"proposed_file": payload,
|
||||||
|
"justification": (
|
||||||
|
"git-gate found gitleaks findings hidden by # gitleaks:allow; "
|
||||||
|
"approve only for dummy test fixtures or confirmed false positives"
|
||||||
|
),
|
||||||
|
"arrival_timestamp": datetime.datetime.now(
|
||||||
|
datetime.timezone.utc
|
||||||
|
).isoformat(),
|
||||||
|
"current_file_hash": hashlib.sha256(payload.encode("utf-8")).hexdigest(),
|
||||||
|
}
|
||||||
|
queue = Path(queue_dir)
|
||||||
|
queue.mkdir(parents=True, exist_ok=True)
|
||||||
|
path = queue / f"{proposal_id}.proposal.json"
|
||||||
|
tmp = path.with_suffix(path.suffix + ".tmp")
|
||||||
|
with tmp.open("w", encoding="utf-8") as f:
|
||||||
|
json.dump(proposal, f, indent=2)
|
||||||
|
f.write("\n")
|
||||||
|
os.chmod(tmp, 0o600)
|
||||||
|
os.replace(tmp, path)
|
||||||
|
print(proposal_id)
|
||||||
|
PY
|
||||||
|
)
|
||||||
|
rc=$?
|
||||||
|
rm -f "$report_file"
|
||||||
|
if [ "$rc" -eq 0 ] && [ -z "$proposal_id" ]; then
|
||||||
|
return 0
|
||||||
|
fi
|
||||||
|
if [ "$rc" -ne 0 ]; then
|
||||||
|
echo "git-gate: cannot route # gitleaks:allow finding to supervisor; refusing push" >&2
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
queue_dir=${SUPERVISE_QUEUE_DIR:-}
|
||||||
|
response_file="$queue_dir/${proposal_id}.response.json"
|
||||||
|
timeout=${SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS:-300}
|
||||||
|
case "$timeout" in
|
||||||
|
''|*[!0-9]*)
|
||||||
|
echo "git-gate: invalid SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS=$timeout" >&2
|
||||||
|
return 1
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
echo "git-gate: queued # gitleaks:allow supervisor approval $proposal_id" >&2
|
||||||
|
echo "git-gate: approve with './cli.py supervise' to continue this push" >&2
|
||||||
|
waited=0
|
||||||
|
while [ "$waited" -lt "$timeout" ]; do
|
||||||
|
if [ -f "$response_file" ]; then
|
||||||
|
status=$(python3 - "$response_file" <<'PY'
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
try:
|
||||||
|
with open(sys.argv[1], encoding="utf-8") as f:
|
||||||
|
raw = json.load(f)
|
||||||
|
except (OSError, json.JSONDecodeError):
|
||||||
|
sys.exit(1)
|
||||||
|
status = raw.get("status")
|
||||||
|
if not isinstance(status, str):
|
||||||
|
sys.exit(1)
|
||||||
|
print(status)
|
||||||
|
PY
|
||||||
|
) || status=""
|
||||||
|
case "$status" in
|
||||||
|
approved|modified)
|
||||||
|
mkdir -p "$queue_dir/processed"
|
||||||
|
mv -f "$queue_dir/${proposal_id}.proposal.json" "$queue_dir/processed/" 2>/dev/null || true
|
||||||
|
mv -f "$queue_dir/${proposal_id}.response.json" "$queue_dir/processed/" 2>/dev/null || true
|
||||||
|
echo "git-gate: supervisor approved # gitleaks:allow for $ref" >&2
|
||||||
|
return 0
|
||||||
|
;;
|
||||||
|
rejected)
|
||||||
|
echo "git-gate: supervisor rejected # gitleaks:allow for $ref" >&2
|
||||||
|
return 1
|
||||||
|
;;
|
||||||
|
*)
|
||||||
|
echo "git-gate: invalid supervisor response for # gitleaks:allow" >&2
|
||||||
|
return 1
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
fi
|
||||||
|
sleep 1
|
||||||
|
waited=$((waited + 1))
|
||||||
|
done
|
||||||
|
echo "git-gate: supervisor approval timed out for # gitleaks:allow; refusing push" >&2
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
# Phase 1: gitleaks scan each ref's incoming commits.
|
# Phase 1: gitleaks scan each ref's incoming commits.
|
||||||
while IFS=' ' read -r old new ref; do
|
while IFS=' ' read -r old new ref; do
|
||||||
[ -z "$ref" ] && continue
|
[ -z "$ref" ] && continue
|
||||||
@@ -268,6 +426,9 @@ while IFS=' ' read -r old new ref; do
|
|||||||
echo "git-gate: gitleaks rejected push to $ref" >&2
|
echo "git-gate: gitleaks rejected push to $ref" >&2
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
if ! supervise_gitleaks_allow "$log_opts" "$ref"; then
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
done < "$refs_file"
|
done < "$refs_file"
|
||||||
|
|
||||||
# Phase 2: forward each ref to the upstream (`origin`, configured
|
# Phase 2: forward each ref to the upstream (`origin`, configured
|
||||||
|
|||||||
+96
-10
@@ -1,21 +1,107 @@
|
|||||||
"""Tiny logging wrappers. All output goes to stderr."""
|
"""Tiny logging wrappers. All output goes to stderr.
|
||||||
|
|
||||||
|
Two capabilities layer onto the bare wrappers (issue #252):
|
||||||
|
|
||||||
|
- **Levels.** `debug` / `info` / `warn` / `error` carry an ordered
|
||||||
|
severity. Output is gated by `BOT_BOTTLE_LOG_LEVEL` (debug | info |
|
||||||
|
warn | error; default `info`). A message emits when its severity is
|
||||||
|
at or above the threshold, so `debug` is silent by default and
|
||||||
|
`error` always surfaces (nothing sits above it) — which keeps the
|
||||||
|
fatal `die` path visible regardless of the configured level.
|
||||||
|
|
||||||
|
- **Context.** Every wrapper takes an optional `context` mapping that
|
||||||
|
renders as a parseable ` [k=v ...]` suffix (keys sorted; values with
|
||||||
|
whitespace/quotes are quoted), so failures can be filtered and
|
||||||
|
correlated instead of being flat strings.
|
||||||
|
|
||||||
|
With no `context` and the default level, output is byte-identical to the
|
||||||
|
original `bot-bottle: <msg>` / `bot-bottle: warning: <msg>` /
|
||||||
|
`bot-bottle: error: <msg>` lines — the 100+ existing call sites are
|
||||||
|
unaffected.
|
||||||
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
import sys
|
import sys
|
||||||
from typing import NoReturn
|
from typing import Mapping, NoReturn
|
||||||
|
|
||||||
|
# Ordered severities. Gaps left between values so intermediate levels
|
||||||
|
# can be added later without renumbering.
|
||||||
|
DEBUG = 10
|
||||||
|
INFO = 20
|
||||||
|
WARN = 30
|
||||||
|
ERROR = 40
|
||||||
|
|
||||||
|
_LEVEL_NAMES: dict[str, int] = {
|
||||||
|
"debug": DEBUG,
|
||||||
|
"info": INFO,
|
||||||
|
"warn": WARN,
|
||||||
|
"warning": WARN,
|
||||||
|
"error": ERROR,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Default threshold when BOT_BOTTLE_LOG_LEVEL is unset or unrecognised.
|
||||||
|
_DEFAULT_THRESHOLD = INFO
|
||||||
|
|
||||||
|
_LOG_LEVEL_ENV = "BOT_BOTTLE_LOG_LEVEL"
|
||||||
|
|
||||||
|
|
||||||
def info(msg: str) -> None:
|
def _threshold() -> int:
|
||||||
print(f"bot-bottle: {msg}", file=sys.stderr)
|
"""Resolve the active level threshold from the environment.
|
||||||
|
|
||||||
|
Read per-call (not cached) so the level can be changed at runtime
|
||||||
|
and so tests can patch `os.environ` without a reload. Unknown values
|
||||||
|
fall back to the default rather than raising — logging must never be
|
||||||
|
the thing that crashes the process."""
|
||||||
|
raw = os.environ.get(_LOG_LEVEL_ENV, "")
|
||||||
|
return _LEVEL_NAMES.get(raw.strip().lower(), _DEFAULT_THRESHOLD)
|
||||||
|
|
||||||
|
|
||||||
def warn(msg: str) -> None:
|
def _format_context(context: Mapping[str, object] | None) -> str:
|
||||||
print(f"bot-bottle: warning: {msg}", file=sys.stderr)
|
"""Render a context mapping as a ` [k=v k2=v2]` suffix.
|
||||||
|
|
||||||
|
Keys are sorted for stable, diffable output. Values that are empty or
|
||||||
|
contain whitespace or a quote are wrapped in double quotes (with inner
|
||||||
|
quotes escaped) so each `k=v` pair stays parseable. Empty/None context
|
||||||
|
renders as the empty string."""
|
||||||
|
if not context:
|
||||||
|
return ""
|
||||||
|
parts: list[str] = []
|
||||||
|
for key in sorted(context):
|
||||||
|
value = str(context[key])
|
||||||
|
if value == "" or any(ch.isspace() for ch in value) or '"' in value:
|
||||||
|
value = '"' + value.replace('"', '\\"') + '"'
|
||||||
|
parts.append(f"{key}={value}")
|
||||||
|
return " [" + " ".join(parts) + "]"
|
||||||
|
|
||||||
|
|
||||||
def error(msg: str) -> None:
|
def _emit(
|
||||||
print(f"bot-bottle: error: {msg}", file=sys.stderr)
|
level: int,
|
||||||
|
label: str,
|
||||||
|
msg: str,
|
||||||
|
context: Mapping[str, object] | None,
|
||||||
|
) -> None:
|
||||||
|
if level < _threshold():
|
||||||
|
return
|
||||||
|
prefix = f"{label}: " if label else ""
|
||||||
|
sys.stderr.write(f"bot-bottle: {prefix}{msg}{_format_context(context)}\n")
|
||||||
|
|
||||||
|
|
||||||
|
def debug(msg: str, *, context: Mapping[str, object] | None = None) -> None:
|
||||||
|
_emit(DEBUG, "debug", msg, context)
|
||||||
|
|
||||||
|
|
||||||
|
def info(msg: str, *, context: Mapping[str, object] | None = None) -> None:
|
||||||
|
_emit(INFO, "", msg, context)
|
||||||
|
|
||||||
|
|
||||||
|
def warn(msg: str, *, context: Mapping[str, object] | None = None) -> None:
|
||||||
|
_emit(WARN, "warning", msg, context)
|
||||||
|
|
||||||
|
|
||||||
|
def error(msg: str, *, context: Mapping[str, object] | None = None) -> None:
|
||||||
|
_emit(ERROR, "error", msg, context)
|
||||||
|
|
||||||
|
|
||||||
class Die(SystemExit):
|
class Die(SystemExit):
|
||||||
@@ -31,6 +117,6 @@ class Die(SystemExit):
|
|||||||
self.message = message
|
self.message = message
|
||||||
|
|
||||||
|
|
||||||
def die(msg: str) -> NoReturn:
|
def die(msg: str, *, context: Mapping[str, object] | None = None) -> NoReturn:
|
||||||
error(msg)
|
error(msg, context=context)
|
||||||
raise Die(1, msg)
|
raise Die(1, msg)
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ Bottle schema (frontmatter):
|
|||||||
repos: { <name>: <git-gate-entry>, ... } # optional
|
repos: { <name>: <git-gate-entry>, ... } # optional
|
||||||
egress: { routes: [ <egress-route>, ... ] }
|
egress: { routes: [ <egress-route>, ... ] }
|
||||||
# route keys: host, matches, auth, role, dlp
|
# route keys: host, matches, auth, role, dlp
|
||||||
supervise: <bool> # optional
|
supervise: <bool> # optional (default true)
|
||||||
|
|
||||||
Agent schema (frontmatter):
|
Agent schema (frontmatter):
|
||||||
bottle: <bottle-name> # required
|
bottle: <bottle-name> # required
|
||||||
@@ -111,13 +111,13 @@ class ManifestBottle:
|
|||||||
# identity without any git-gate.repos upstreams, and vice versa.
|
# identity without any git-gate.repos upstreams, and vice versa.
|
||||||
git_user: ManifestGitUser = field(default_factory=ManifestGitUser)
|
git_user: ManifestGitUser = field(default_factory=ManifestGitUser)
|
||||||
egress: ManifestEgressConfig = field(default_factory=ManifestEgressConfig)
|
egress: ManifestEgressConfig = field(default_factory=ManifestEgressConfig)
|
||||||
# Opt-in per-bottle stuck-recovery sidecar (PRD 0013). When true,
|
# Per-bottle stuck-recovery sidecar (PRD 0013). When true (the
|
||||||
# the launch step brings up a supervise sidecar that exposes MCP
|
# default, issue #249), the launch step brings up a supervise
|
||||||
# tools to the agent (egress-block, capability-block) plus mounts
|
# sidecar that exposes MCP tools to the agent (egress-block,
|
||||||
# the current-config dir read-only into the agent at
|
# capability-block) plus mounts the current-config dir read-only
|
||||||
# /etc/bot-bottle/current-config. False (the default) skips the
|
# into the agent at /etc/bot-bottle/current-config. Set
|
||||||
# sidecar and mount.
|
# `supervise: false` to skip the sidecar and mount.
|
||||||
supervise: bool = False
|
supervise: bool = True
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_dict(cls, name: str, raw: object) -> "ManifestBottle":
|
def from_dict(cls, name: str, raw: object) -> "ManifestBottle":
|
||||||
@@ -190,7 +190,7 @@ class ManifestBottle:
|
|||||||
else ManifestEgressConfig()
|
else ManifestEgressConfig()
|
||||||
)
|
)
|
||||||
|
|
||||||
supervise_raw = d.get("supervise", False)
|
supervise_raw = d.get("supervise", True)
|
||||||
if not isinstance(supervise_raw, bool):
|
if not isinstance(supervise_raw, bool):
|
||||||
raise ManifestError(
|
raise ManifestError(
|
||||||
f"bottle '{name}' supervise must be a boolean "
|
f"bottle '{name}' supervise must be a boolean "
|
||||||
|
|||||||
@@ -51,11 +51,13 @@ SUPERVISE_PORT = 9100
|
|||||||
TOOL_CAPABILITY_BLOCK = "capability-block"
|
TOOL_CAPABILITY_BLOCK = "capability-block"
|
||||||
TOOL_EGRESS_BLOCK = "egress-block"
|
TOOL_EGRESS_BLOCK = "egress-block"
|
||||||
TOOL_ALLOW = "allow"
|
TOOL_ALLOW = "allow"
|
||||||
|
TOOL_GITLEAKS_ALLOW = "gitleaks-allow"
|
||||||
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
|
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
|
||||||
TOOLS: tuple[str, ...] = (
|
TOOLS: tuple[str, ...] = (
|
||||||
TOOL_ALLOW,
|
TOOL_ALLOW,
|
||||||
TOOL_CAPABILITY_BLOCK,
|
TOOL_CAPABILITY_BLOCK,
|
||||||
TOOL_EGRESS_BLOCK,
|
TOOL_EGRESS_BLOCK,
|
||||||
|
TOOL_GITLEAKS_ALLOW,
|
||||||
TOOL_LIST_EGRESS_ROUTES,
|
TOOL_LIST_EGRESS_ROUTES,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -553,6 +555,7 @@ __all__ = [
|
|||||||
"EGRESS_FORWARD_PROXY",
|
"EGRESS_FORWARD_PROXY",
|
||||||
"EGRESS_INTROSPECT_URL",
|
"EGRESS_INTROSPECT_URL",
|
||||||
"TOOL_CAPABILITY_BLOCK",
|
"TOOL_CAPABILITY_BLOCK",
|
||||||
|
"TOOL_GITLEAKS_ALLOW",
|
||||||
"TOOL_LIST_EGRESS_ROUTES",
|
"TOOL_LIST_EGRESS_ROUTES",
|
||||||
"archive_proposal",
|
"archive_proposal",
|
||||||
"audit_dir",
|
"audit_dir",
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
# PRD prd-new: Commit bottle state to an image
|
# PRD 0060: Commit bottle state to an image
|
||||||
|
|
||||||
- **Status:** Active
|
- **Status:** Active
|
||||||
- **Author:** Claude
|
- **Author:** Claude
|
||||||
@@ -0,0 +1,101 @@
|
|||||||
|
# PRD 0061: Gitleaks inline suppression supervision
|
||||||
|
|
||||||
|
- **Status:** Active
|
||||||
|
- **Author:** codex
|
||||||
|
- **Created:** 2026-06-10
|
||||||
|
- **Issue:** #208
|
||||||
|
|
||||||
|
## Summary
|
||||||
|
|
||||||
|
Route every `# gitleaks:allow` inline suppression through the existing
|
||||||
|
supervisor approval queue before git-gate forwards a push upstream. The
|
||||||
|
annotation remains available for test fixtures and confirmed false positives,
|
||||||
|
but it no longer lets an agent silently bypass the gate's secret scan.
|
||||||
|
|
||||||
|
## Problem
|
||||||
|
|
||||||
|
Gitleaks supports inline suppression comments. With the default behaviour,
|
||||||
|
a pushed line that contains a secret-like value and `# gitleaks:allow` is
|
||||||
|
ignored by gitleaks and accepted by git-gate. That is convenient for dummy
|
||||||
|
credentials in tests, but it also gives an agent a one-line bypass for a real
|
||||||
|
secret. Once the commit lands, the diff alone cannot prove whether the
|
||||||
|
suppression was legitimate.
|
||||||
|
|
||||||
|
## Goals / Success Criteria
|
||||||
|
|
||||||
|
1. Git-gate continues to run the normal gitleaks scan for every incoming ref.
|
||||||
|
2. After the normal scan passes, git-gate runs a second scan with
|
||||||
|
`--ignore-gitleaks-allow` and a JSON report so suppressed findings become
|
||||||
|
visible.
|
||||||
|
3. If that second scan reports no suppressed findings, the push proceeds
|
||||||
|
unchanged.
|
||||||
|
4. If it reports suppressed findings, git-gate creates a `gitleaks-allow`
|
||||||
|
supervisor proposal containing the ref, file path, line number, rule,
|
||||||
|
commit, and flagged line for each finding.
|
||||||
|
5. The push proceeds only when the supervisor explicitly approves the
|
||||||
|
proposal; rejection, malformed responses, missing supervisor configuration,
|
||||||
|
and timeout all refuse the push.
|
||||||
|
6. The supervisor TUI requires a reason when approving a `gitleaks-allow`
|
||||||
|
proposal, so the audit trail records whether the approval was for a test
|
||||||
|
fixture or a false positive.
|
||||||
|
|
||||||
|
## Non-goals
|
||||||
|
|
||||||
|
- Replacing gitleaks or changing the main secret-detection rule set.
|
||||||
|
- Removing support for `# gitleaks:allow`.
|
||||||
|
- Automatically classifying fixture files or false positives.
|
||||||
|
- Adding new supervisor transport or authentication mechanisms.
|
||||||
|
|
||||||
|
## Design
|
||||||
|
|
||||||
|
### Git-gate flow
|
||||||
|
|
||||||
|
`git_gate_render_hook()` emits a `supervise_gitleaks_allow` shell helper.
|
||||||
|
For each incoming ref, git-gate first runs the existing gitleaks command. If
|
||||||
|
that scan passes, it runs:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
gitleaks git \
|
||||||
|
--log-opts="$log_opts" \
|
||||||
|
--no-banner \
|
||||||
|
--redact \
|
||||||
|
--ignore-gitleaks-allow \
|
||||||
|
--report-format=json \
|
||||||
|
--report-path="$report_file" \
|
||||||
|
--exit-code 0
|
||||||
|
```
|
||||||
|
|
||||||
|
The second pass keeps the push path non-interactive while producing a report
|
||||||
|
of findings that would otherwise have been hidden by inline suppression.
|
||||||
|
|
||||||
|
### Supervisor proposal
|
||||||
|
|
||||||
|
When the JSON report contains findings, an embedded Python helper writes a
|
||||||
|
proposal into `SUPERVISE_QUEUE_DIR` using the existing proposal schema. The
|
||||||
|
proposal uses:
|
||||||
|
|
||||||
|
- `tool: "gitleaks-allow"`
|
||||||
|
- a text payload with the ref and each finding's file, line, rule, commit,
|
||||||
|
and redacted code line
|
||||||
|
- a justification that tells the operator to approve only dummy test fixtures
|
||||||
|
or confirmed false positives
|
||||||
|
|
||||||
|
Git-gate then waits for `<proposal-id>.response.json` for
|
||||||
|
`SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS`, defaulting to 300 seconds.
|
||||||
|
`approved` and `modified` responses allow the push; `rejected`, invalid
|
||||||
|
responses, invalid timeout configuration, or timeout refuse it.
|
||||||
|
|
||||||
|
### Supervisor UI
|
||||||
|
|
||||||
|
`TOOL_GITLEAKS_ALLOW` is added to the supervisor tool registry. The curses
|
||||||
|
supervisor renders the proposal as text and allows approval or rejection.
|
||||||
|
Modification is unavailable for this proposal type because there is no file
|
||||||
|
patch to apply. Approval from the TUI prompts for a non-empty reason and
|
||||||
|
writes that reason to the response/audit path.
|
||||||
|
|
||||||
|
### Tests
|
||||||
|
|
||||||
|
Unit tests assert that the rendered git-gate hook includes the second gitleaks
|
||||||
|
pass, supervisor queue fields, and fail-closed messages. Supervisor tests cover
|
||||||
|
the new tool constant, proposal archiving, and the required TUI approval
|
||||||
|
reason.
|
||||||
@@ -0,0 +1,186 @@
|
|||||||
|
# PRD prd-new: Strengthen outbound exfiltration detection
|
||||||
|
|
||||||
|
- **Status:** Active
|
||||||
|
- **Author:** claude
|
||||||
|
- **Created:** 2026-06-25
|
||||||
|
- **Issue:** #259
|
||||||
|
|
||||||
|
## Summary
|
||||||
|
|
||||||
|
The current DLP system has two detection tiers: weak (vendor token regexes)
|
||||||
|
and strong (exact matching of provisioned `EGRESS_TOKEN_*` values in multiple
|
||||||
|
encodings). This PRD adds four complementary hardening layers in ranked order
|
||||||
|
of impact:
|
||||||
|
|
||||||
|
1. **Canary tokens** — inject a per-session fake secret into the agent's
|
||||||
|
environment; any outbound appearance is a zero-false-positive exfil signal.
|
||||||
|
2. **Broadened known-value scanning** — extend the strong tier beyond
|
||||||
|
`EGRESS_TOKEN_*` via a `BOT_BOTTLE_SENSITIVE_PREFIXES` sidecar env var.
|
||||||
|
3. **Fragmentation-resistant matching** — catch separator injection and
|
||||||
|
long-substring leaks via alnum projection and sliding-window substring
|
||||||
|
search.
|
||||||
|
4. **Entropy scoring** — warn-only flag on high-entropy blobs as a
|
||||||
|
complementary signal for encrypted exfil.
|
||||||
|
|
||||||
|
## Problem
|
||||||
|
|
||||||
|
After PRD 0056 the outbound scan covers all HTTP request surfaces (host, path,
|
||||||
|
query, headers, body). But the content-based strong tier only matches
|
||||||
|
`EGRESS_TOKEN_*` values and their encodings. Three gaps remain:
|
||||||
|
|
||||||
|
| Gap | How an agent exploits it |
|
||||||
|
|-----|--------------------------|
|
||||||
|
| No canary | There is no "can't be false-positive" signal |
|
||||||
|
| Narrow prefix filter | Sensitive values provisioned under other env prefixes (MCP keys, API keys injected via the agent provider) are invisible to `scan_known_secrets` |
|
||||||
|
| Fragmentation blindness | Inserting `-`, space, or `\n` between secret characters turns an exact-match into a miss |
|
||||||
|
|
||||||
|
## Goals / Success Criteria
|
||||||
|
|
||||||
|
1. Each launched bottle has a unique canary token in the agent's environment
|
||||||
|
(`BOT_BOTTLE_CANARY`) and the egress sidecar's environment
|
||||||
|
(`EGRESS_TOKEN_CANARY`). Any outbound appearance of the canary blocks the
|
||||||
|
request with reason `"canary token"`.
|
||||||
|
2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default:
|
||||||
|
`("EGRESS_TOKEN_",)`). `scan_outbound` reads
|
||||||
|
`BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes
|
||||||
|
in, so operators can mark additional env vars as scanned values without
|
||||||
|
changing the manifest schema.
|
||||||
|
3. For every secret that passes exact-match, a secondary alnum-projection pass
|
||||||
|
checks for the secret with all non-alphanumeric characters stripped. This
|
||||||
|
catches separator-injection evasion (`MY-SECRET` → body contains
|
||||||
|
`MY SECRET`).
|
||||||
|
4. A sliding-window partial-match pass checks for long-enough contiguous
|
||||||
|
substrings of the secret's alnum projection in the text's alnum projection.
|
||||||
|
Any match ≥ `PARTIAL_MATCH_MIN_LEN` (12 chars) blocks with reason
|
||||||
|
`"partial match"`.
|
||||||
|
5. A new `scan_entropy` detector flags outbound text windows with Shannon
|
||||||
|
entropy ≥ `ENTROPY_BLOCK_THRESHOLD` (5.5 bits/char) at **warn** severity
|
||||||
|
only. It is registered under the new detector name `"entropy"` in
|
||||||
|
`OUTBOUND_DETECTOR_NAMES` and disabled by default (routes must opt in).
|
||||||
|
6. Binary request bodies are decoded via `latin-1` instead of
|
||||||
|
`utf-8 errors="replace"`, preserving every byte value and allowing
|
||||||
|
ASCII-range secrets to be found within binary payloads.
|
||||||
|
7. All new behaviour is unit-tested; existing tests pass unchanged.
|
||||||
|
|
||||||
|
## Non-goals
|
||||||
|
|
||||||
|
- Rolling per-host buffer for split-across-requests detection (state in the
|
||||||
|
stateless addon is complex; deferred).
|
||||||
|
- Additional vendor regexes.
|
||||||
|
- ML / embedding-based detection.
|
||||||
|
- Entropy-based hard blocks (warn only per the issue).
|
||||||
|
|
||||||
|
## Design
|
||||||
|
|
||||||
|
### Canary token flow
|
||||||
|
|
||||||
|
```
|
||||||
|
Egress.prepare()
|
||||||
|
canary = secrets.token_urlsafe(32)
|
||||||
|
EgressPlan(canary=canary, ...)
|
||||||
|
|
||||||
|
Docker compose render:
|
||||||
|
sidecar env: EGRESS_TOKEN_CANARY=<canary> ← scanned by existing known-secrets detector
|
||||||
|
agent env: BOT_BOTTLE_CANARY=<canary> ← visible to agent as a "secret"
|
||||||
|
|
||||||
|
macos-container launch: same literals added to sidecar + agent env entries
|
||||||
|
```
|
||||||
|
|
||||||
|
`EGRESS_TOKEN_CANARY` matches the `EGRESS_TOKEN_` prefix already scanned by
|
||||||
|
`scan_known_secrets`, so no detector code changes are required for canary
|
||||||
|
detection — only the injection path.
|
||||||
|
|
||||||
|
### Broadened known-value scanning
|
||||||
|
|
||||||
|
`scan_known_secrets` gains a `sensitive_prefixes` parameter:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def scan_known_secrets(
|
||||||
|
text: str,
|
||||||
|
*,
|
||||||
|
location: str = "body",
|
||||||
|
env: Mapping[str, str] | None = None,
|
||||||
|
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
|
||||||
|
) -> ScanResult | None:
|
||||||
|
```
|
||||||
|
|
||||||
|
`scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` (comma-separated list
|
||||||
|
of additional prefixes) from `environ` and appends them:
|
||||||
|
|
||||||
|
```python
|
||||||
|
extra = tuple(
|
||||||
|
p for p in environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "").split(",") if p
|
||||||
|
)
|
||||||
|
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
|
||||||
|
```
|
||||||
|
|
||||||
|
`redact_tokens` receives the same treatment for consistent redaction.
|
||||||
|
|
||||||
|
### Fragmentation-resistant matching
|
||||||
|
|
||||||
|
A new helper `_alnum_projection(text)` strips all non-alphanumeric characters.
|
||||||
|
`scan_known_secrets` runs two passes per secret:
|
||||||
|
|
||||||
|
1. **Exact pass** — existing encoded-variant loop (unchanged).
|
||||||
|
2. **Alnum-projection pass** — if the secret's alnum projection has ≥ 8 chars,
|
||||||
|
check if it appears in the text's alnum projection. Match → block with
|
||||||
|
`"fragmented match (separator injection)"` reason.
|
||||||
|
3. **Partial-substring pass** — if the secret's alnum projection has ≥
|
||||||
|
`PARTIAL_MATCH_MIN_LEN` chars (12), slide a window of that length across the
|
||||||
|
secret's projection and look for each window in the text's alnum projection.
|
||||||
|
First match → block with `"partial match"` reason.
|
||||||
|
|
||||||
|
All three passes run only for the `"known_secrets"` detector; the token-pattern
|
||||||
|
and entropy detectors are unchanged.
|
||||||
|
|
||||||
|
### Entropy scoring
|
||||||
|
|
||||||
|
New public function:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def scan_entropy(
|
||||||
|
text: str,
|
||||||
|
*,
|
||||||
|
location: str = "body",
|
||||||
|
window: int = ENTROPY_WINDOW, # 64
|
||||||
|
threshold: float = ENTROPY_BLOCK_THRESHOLD, # 5.5
|
||||||
|
) -> ScanResult | None:
|
||||||
|
```
|
||||||
|
|
||||||
|
Slides a window of `window` characters across `text` in steps of `window // 2`.
|
||||||
|
If any window's Shannon entropy exceeds `threshold`, returns a **warn**-severity
|
||||||
|
`ScanResult`. Never blocks.
|
||||||
|
|
||||||
|
`OUTBOUND_DETECTOR_NAMES` gains `"entropy"`. Routes opt in via their `dlp`
|
||||||
|
block; entropy scanning is **off by default** to avoid false-positive noise on
|
||||||
|
legitimate binary payloads.
|
||||||
|
|
||||||
|
### Binary body handling
|
||||||
|
|
||||||
|
In `scan_outbound`, the bytes → str decoding changes from:
|
||||||
|
|
||||||
|
```python
|
||||||
|
body.decode("utf-8", errors="replace")
|
||||||
|
```
|
||||||
|
|
||||||
|
to:
|
||||||
|
|
||||||
|
```python
|
||||||
|
body.decode("utf-8") if body is str else body.decode("latin-1")
|
||||||
|
```
|
||||||
|
|
||||||
|
`latin-1` is a bijective byte↔codepoint mapping; every byte value is preserved
|
||||||
|
as its corresponding Latin-1 code point, so ASCII-range secret strings remain
|
||||||
|
intact and `str.find` / regex still locate them correctly. The fallback from
|
||||||
|
strict UTF-8 is tried first so valid UTF-8 bodies are decoded faithfully.
|
||||||
|
|
||||||
|
## Implementation
|
||||||
|
|
||||||
|
Delivered in three commits on the same branch:
|
||||||
|
|
||||||
|
1. **DLP detector changes** — `_alnum_projection`, fragmentation passes,
|
||||||
|
`scan_entropy`, broadened `scan_known_secrets`, updated `scan_outbound` and
|
||||||
|
`redact_tokens`; all accompanying unit tests.
|
||||||
|
2. **Canary injection** — `EgressPlan.canary`, `Egress.prepare()`,
|
||||||
|
Docker compose + macos-container backend injection.
|
||||||
|
3. **PRD flip** — `Status: Draft → Active`.
|
||||||
@@ -22,7 +22,7 @@ escapes**, and **whether credentials are short-lived and scoped**.
|
|||||||
- Outbound: Docker containers have full internet access by default; no egress monitoring on most home networks
|
- Outbound: Docker containers have full internet access by default; no egress monitoring on most home networks
|
||||||
- Lateral movement: compromised container can reach the LAN — NAS, other machines, internal services
|
- Lateral movement: compromised container can reach the LAN — NAS, other machines, internal services
|
||||||
- Notable: CVE-2025-59536 (CVSS 8.7, Feb 2026) — a poisoned `.claude/settings.json` in a repo gives RCE when Claude Code opens it. `--dangerously-skip-permissions` removes the last gate.
|
- Notable: CVE-2025-59536 (CVSS 8.7, Feb 2026) — a poisoned `.claude/settings.json` in a repo gives RCE when Claude Code opens it. `--dangerously-skip-permissions` removes the last gate.
|
||||||
- Supply chain: MCP servers, skills, and npm packages pulled during agent execution. ~20% of ClawHub skills were found malicious in early 2026.
|
- Supply chain: MCP servers, skills, and npm packages pulled during agent execution. A Jan 2026 large-scale empirical study of a 98,380-skill snapshot confirmed 157 malicious skills, ~71% of them credential harvesters. Exfiltration was overwhelmingly naive — plaintext HTTP to hardcoded endpoints; under 10% used any code obfuscation, and concealment was mostly at the documentation level, not the code level. ([Malicious Agent Skills in the Wild](https://arxiv.org/html/2602.06547v1), arXiv:2602.06547)
|
||||||
|
|
||||||
**What local topology protects:**
|
**What local topology protects:**
|
||||||
- No inbound attack surface — nothing listening on a public port
|
- No inbound attack surface — nothing listening on a public port
|
||||||
|
|||||||
@@ -1,14 +1,14 @@
|
|||||||
---
|
---
|
||||||
agent_provider:
|
agent_provider:
|
||||||
template: claude
|
template: claude
|
||||||
|
# auth_token names the host env var holding the Claude OAuth token. The
|
||||||
egress:
|
# provider injects a provider-owned api.anthropic.com egress route that
|
||||||
routes:
|
# re-injects this token as the Bearer header; the agent only ever sees a
|
||||||
- host: api.anthropic.com
|
# placeholder CLAUDE_CODE_OAUTH_TOKEN. DLP defaults (token_patterns,
|
||||||
role: claude_code_oauth
|
# known_secrets outbound; naive_injection_detection inbound) apply to
|
||||||
auth:
|
# that route. To scan additional hosts, declare them under egress.routes
|
||||||
scheme: Bearer
|
# with per-route matches/dlp (see README "Egress route fields").
|
||||||
token_ref: BOT_BOTTLE_CLAUDE_OAUTH_TOKEN
|
auth_token: BOT_BOTTLE_CLAUDE_OAUTH_TOKEN
|
||||||
---
|
---
|
||||||
|
|
||||||
Common Claude provider boundary. Drop this file into
|
Common Claude provider boundary. Drop this file into
|
||||||
|
|||||||
@@ -1,18 +1,24 @@
|
|||||||
"""Unit: DLP detectors (PRD 0053).
|
"""Unit: DLP detectors (PRD 0053, prd-new).
|
||||||
|
|
||||||
Tests for token pattern scanning, known secret detection, and
|
Tests for token pattern scanning, known secret detection, fragmentation-
|
||||||
naive prompt injection detection."""
|
resistant matching, entropy scoring, and naive prompt injection detection."""
|
||||||
|
|
||||||
import base64
|
import base64
|
||||||
import gzip
|
import gzip
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from bot_bottle.dlp_detectors import (
|
from bot_bottle.dlp_detectors import (
|
||||||
|
ENTROPY_BLOCK_THRESHOLD,
|
||||||
|
ENTROPY_WINDOW,
|
||||||
|
PARTIAL_MATCH_MIN_LEN,
|
||||||
REDACT,
|
REDACT,
|
||||||
|
_alnum_projection,
|
||||||
_encoded_variants,
|
_encoded_variants,
|
||||||
_normalize_text,
|
_normalize_text,
|
||||||
|
_shannon_entropy,
|
||||||
redact_tokens,
|
redact_tokens,
|
||||||
scan_crlf_injection,
|
scan_crlf_injection,
|
||||||
|
scan_entropy,
|
||||||
scan_known_secrets,
|
scan_known_secrets,
|
||||||
scan_naive_injection,
|
scan_naive_injection,
|
||||||
scan_token_patterns,
|
scan_token_patterns,
|
||||||
@@ -445,5 +451,187 @@ class TestKnownSecretsNewVariants(unittest.TestCase):
|
|||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
|
|
||||||
|
|
||||||
|
class TestAlnumProjection(unittest.TestCase):
|
||||||
|
def test_alphanumeric_unchanged(self):
|
||||||
|
self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ"))
|
||||||
|
|
||||||
|
def test_strips_hyphens(self):
|
||||||
|
self.assertEqual("mysecretvalue", _alnum_projection("my-secret-value"))
|
||||||
|
|
||||||
|
def test_strips_spaces(self):
|
||||||
|
self.assertEqual("mysecretvalue", _alnum_projection("my secret value"))
|
||||||
|
|
||||||
|
def test_strips_dots_and_underscores(self):
|
||||||
|
self.assertEqual("mysecretvalue", _alnum_projection("my.secret_value"))
|
||||||
|
|
||||||
|
def test_empty_string(self):
|
||||||
|
self.assertEqual("", _alnum_projection(""))
|
||||||
|
|
||||||
|
def test_all_special_chars(self):
|
||||||
|
self.assertEqual("", _alnum_projection("!@#$%^&*()"))
|
||||||
|
|
||||||
|
|
||||||
|
class TestFragmentationResistantMatching(unittest.TestCase):
|
||||||
|
"""scan_known_secrets catches separator-injection and partial-substring evasion."""
|
||||||
|
|
||||||
|
# Secrets long enough that their alnum projections are ≥ 8 chars.
|
||||||
|
SECRET = "supersecrettoken99"
|
||||||
|
ENV = {"EGRESS_TOKEN_0": SECRET}
|
||||||
|
|
||||||
|
def test_exact_match_still_works(self):
|
||||||
|
result = scan_known_secrets(f"key={self.SECRET}", env=self.ENV)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
|
||||||
|
def test_separator_injection_blocked(self):
|
||||||
|
# Hyphens inserted between chars of the secret.
|
||||||
|
fragmented = "-".join(self.SECRET)
|
||||||
|
result = scan_known_secrets(f"data={fragmented}", env=self.ENV)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
self.assertIn("separator injection", result.reason)
|
||||||
|
|
||||||
|
def test_space_separator_blocked(self):
|
||||||
|
fragmented = " ".join(self.SECRET)
|
||||||
|
result = scan_known_secrets(f"body: {fragmented}", env=self.ENV)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertIn("separator injection", result.reason)
|
||||||
|
|
||||||
|
def test_partial_substring_blocked(self):
|
||||||
|
# First PARTIAL_MATCH_MIN_LEN alnum chars of the secret, no separators.
|
||||||
|
partial = _alnum_projection(self.SECRET)[:PARTIAL_MATCH_MIN_LEN]
|
||||||
|
result = scan_known_secrets(f"x={partial}&y=other", env=self.ENV)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
self.assertIn("partial match", result.reason)
|
||||||
|
|
||||||
|
def test_short_secret_skips_projection(self):
|
||||||
|
# Secrets shorter than _ALNUM_MIN_LEN in alnum projection are not
|
||||||
|
# fragmentation-checked (too many false positives).
|
||||||
|
short_env = {"EGRESS_TOKEN_0": "abc"}
|
||||||
|
# "a b c" has alnum projection "abc" (3 chars, < 8); should not block.
|
||||||
|
self.assertIsNone(scan_known_secrets("a b c", env=short_env))
|
||||||
|
|
||||||
|
def test_clean_text_not_blocked(self):
|
||||||
|
self.assertIsNone(scan_known_secrets("nothing to see here", env=self.ENV))
|
||||||
|
|
||||||
|
def test_sensitive_prefixes_param_extra_prefix(self):
|
||||||
|
env = {"MY_CRED_0": self.SECRET, "IGNORED": "other"}
|
||||||
|
result = scan_known_secrets(
|
||||||
|
f"key={self.SECRET}",
|
||||||
|
env=env,
|
||||||
|
sensitive_prefixes=("MY_CRED_",),
|
||||||
|
)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertIn("MY_CRED_0", result.reason)
|
||||||
|
|
||||||
|
def test_sensitive_prefixes_default_only_egress_token(self):
|
||||||
|
# A value under a non-EGRESS_TOKEN_ key is ignored with default prefixes.
|
||||||
|
env = {"MY_CRED_0": self.SECRET}
|
||||||
|
self.assertIsNone(scan_known_secrets(f"key={self.SECRET}", env=env))
|
||||||
|
|
||||||
|
def test_canary_prefix_detected(self):
|
||||||
|
canary_value = "canary-fake-secret-value-xyz"
|
||||||
|
env = {"EGRESS_TOKEN_CANARY": canary_value}
|
||||||
|
result = scan_known_secrets(f"x={canary_value}", env=env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
|
||||||
|
|
||||||
|
|
||||||
|
class TestRedactTokensBroadenedPrefixes(unittest.TestCase):
|
||||||
|
SECRET = "my-provisioned-secret"
|
||||||
|
|
||||||
|
def test_default_redacts_egress_token(self):
|
||||||
|
env = {"EGRESS_TOKEN_0": self.SECRET}
|
||||||
|
out = redact_tokens(f"val={self.SECRET}", env=env)
|
||||||
|
self.assertNotIn(self.SECRET, out)
|
||||||
|
self.assertIn(REDACT, out)
|
||||||
|
|
||||||
|
def test_extra_prefix_redacted(self):
|
||||||
|
env = {"MY_SECRET_KEY": self.SECRET}
|
||||||
|
out = redact_tokens(
|
||||||
|
f"val={self.SECRET}",
|
||||||
|
env=env,
|
||||||
|
sensitive_prefixes=("MY_SECRET_",),
|
||||||
|
)
|
||||||
|
self.assertNotIn(self.SECRET, out)
|
||||||
|
self.assertIn(REDACT, out)
|
||||||
|
|
||||||
|
def test_non_matching_prefix_not_redacted(self):
|
||||||
|
env = {"MY_SECRET_KEY": self.SECRET}
|
||||||
|
out = redact_tokens(f"val={self.SECRET}", env=env)
|
||||||
|
# Default prefixes only include EGRESS_TOKEN_ → secret not redacted
|
||||||
|
self.assertIn(self.SECRET, out)
|
||||||
|
|
||||||
|
|
||||||
|
class TestShannonEntropy(unittest.TestCase):
|
||||||
|
def test_empty_string_zero(self):
|
||||||
|
self.assertEqual(0.0, _shannon_entropy(""))
|
||||||
|
|
||||||
|
def test_single_char_zero(self):
|
||||||
|
self.assertEqual(0.0, _shannon_entropy("aaaaaa"))
|
||||||
|
|
||||||
|
def test_two_equal_chars_one_bit(self):
|
||||||
|
self.assertAlmostEqual(1.0, _shannon_entropy("abababab"), places=10)
|
||||||
|
|
||||||
|
def test_high_entropy_random_like(self):
|
||||||
|
# Uniform 64-char string over 64 distinct symbols has entropy 6 bits.
|
||||||
|
import string
|
||||||
|
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||||
|
text = alphabet # each char appears exactly once
|
||||||
|
self.assertAlmostEqual(6.0, _shannon_entropy(text), places=10)
|
||||||
|
|
||||||
|
|
||||||
|
class TestScanEntropy(unittest.TestCase):
|
||||||
|
def test_empty_returns_none(self):
|
||||||
|
self.assertIsNone(scan_entropy(""))
|
||||||
|
|
||||||
|
def test_low_entropy_returns_none(self):
|
||||||
|
# Highly repetitive text has low entropy.
|
||||||
|
self.assertIsNone(scan_entropy("a" * 200))
|
||||||
|
|
||||||
|
def test_high_entropy_warns(self):
|
||||||
|
# Build a 64-char string with entropy > ENTROPY_BLOCK_THRESHOLD.
|
||||||
|
# Use all 64 distinct printable chars to maximise entropy (~6 bits).
|
||||||
|
import string
|
||||||
|
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||||
|
result = scan_entropy(alphabet, threshold=ENTROPY_BLOCK_THRESHOLD)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("warn", result.severity)
|
||||||
|
self.assertIn("high-entropy", result.reason)
|
||||||
|
|
||||||
|
def test_never_blocks(self):
|
||||||
|
import string
|
||||||
|
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||||
|
result = scan_entropy(alphabet)
|
||||||
|
# scan_entropy is warn-only; it must never return severity="block".
|
||||||
|
if result is not None:
|
||||||
|
self.assertNotEqual("block", result.severity)
|
||||||
|
|
||||||
|
def test_location_in_result(self):
|
||||||
|
import string
|
||||||
|
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||||
|
result = scan_entropy(alphabet, location="authorization header")
|
||||||
|
if result is not None:
|
||||||
|
self.assertIn("authorization header", result.location)
|
||||||
|
|
||||||
|
def test_structured_json_no_warn(self):
|
||||||
|
# Typical JSON has low entropy and should not be flagged.
|
||||||
|
json_body = '{"status": "ok", "message": "hello world", "count": 42}'
|
||||||
|
self.assertIsNone(scan_entropy(json_body))
|
||||||
|
|
||||||
|
def test_short_text_below_window(self):
|
||||||
|
# Text shorter than the window: checked as one chunk.
|
||||||
|
# Use a uniform string to ensure it won't be flagged.
|
||||||
|
self.assertIsNone(scan_entropy("abcde", threshold=ENTROPY_BLOCK_THRESHOLD))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -1,10 +1,14 @@
|
|||||||
"""Unit: Egress route lift + routes.yaml render + token
|
"""Unit: Egress route lift + routes.yaml render + token
|
||||||
resolution (PRD 0017, PRD 0053)."""
|
resolution (PRD 0017, PRD 0053, prd-new)."""
|
||||||
|
|
||||||
|
import tempfile
|
||||||
import unittest
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from bot_bottle.egress import (
|
from bot_bottle.egress import (
|
||||||
CODEX_HOST_CREDENTIAL_TOKEN_REF,
|
CODEX_HOST_CREDENTIAL_TOKEN_REF,
|
||||||
|
Egress,
|
||||||
|
EgressPlan,
|
||||||
EgressRoute,
|
EgressRoute,
|
||||||
egress_manifest_routes,
|
egress_manifest_routes,
|
||||||
egress_render_routes,
|
egress_render_routes,
|
||||||
@@ -409,5 +413,64 @@ class TestResolveTokenValues(unittest.TestCase):
|
|||||||
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
|
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCanaryGeneration(unittest.TestCase):
|
||||||
|
"""Egress.prepare() generates a unique canary token per session (prd-new)."""
|
||||||
|
|
||||||
|
def _bottle_obj(self):
|
||||||
|
return ManifestIndex.from_json_obj({
|
||||||
|
"bottles": {"dev": {"egress": {"routes": []}}},
|
||||||
|
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||||
|
}).bottles["dev"]
|
||||||
|
|
||||||
|
def _make_plan(self) -> EgressPlan:
|
||||||
|
# Use a concrete no-op subclass so we can call prepare() without
|
||||||
|
# a real backend.
|
||||||
|
class _TestEgress(Egress):
|
||||||
|
pass
|
||||||
|
|
||||||
|
e = _TestEgress()
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
return e.prepare(self._bottle_obj(), "test-slug", Path(td))
|
||||||
|
|
||||||
|
def test_canary_is_non_empty(self):
|
||||||
|
plan = self._make_plan()
|
||||||
|
self.assertIsInstance(plan.canary, str)
|
||||||
|
self.assertGreater(len(plan.canary), 0)
|
||||||
|
|
||||||
|
def test_canary_is_unique_per_session(self):
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
bottle = self._bottle_obj()
|
||||||
|
|
||||||
|
class _TestEgress(Egress):
|
||||||
|
pass
|
||||||
|
|
||||||
|
e = _TestEgress()
|
||||||
|
plan_a = e.prepare(bottle, "slug-a", Path(td))
|
||||||
|
plan_b = e.prepare(bottle, "slug-b", Path(td))
|
||||||
|
self.assertNotEqual(plan_a.canary, plan_b.canary)
|
||||||
|
|
||||||
|
def test_canary_detected_by_scan_known_secrets(self):
|
||||||
|
from bot_bottle.dlp_detectors import scan_known_secrets
|
||||||
|
|
||||||
|
plan = self._make_plan()
|
||||||
|
env = {"EGRESS_TOKEN_CANARY": plan.canary}
|
||||||
|
result = scan_known_secrets(f"exfil={plan.canary}", env=env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
|
||||||
|
|
||||||
|
def test_egress_plan_canary_field_default_empty(self):
|
||||||
|
# Verify EgressPlan can be constructed with an empty canary (backward compat).
|
||||||
|
from pathlib import Path
|
||||||
|
plan = EgressPlan(
|
||||||
|
slug="s",
|
||||||
|
routes_path=Path("/tmp/r.yaml"),
|
||||||
|
routes=(),
|
||||||
|
token_env_map={},
|
||||||
|
)
|
||||||
|
self.assertEqual("", plan.canary)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -1167,5 +1167,103 @@ class TestScanInbound(unittest.TestCase):
|
|||||||
self.assertEqual("block", result.severity)
|
self.assertEqual("block", result.severity)
|
||||||
|
|
||||||
|
|
||||||
|
class TestScanOutboundEnhanced(unittest.TestCase):
|
||||||
|
"""scan_outbound changes from prd-new: binary decode, entropy detector,
|
||||||
|
broadened known-value prefixes, fragmentation resistance."""
|
||||||
|
|
||||||
|
_ROUTE = Route(host="api.example.com")
|
||||||
|
_ROUTE_ENTROPY = Route(
|
||||||
|
host="api.example.com",
|
||||||
|
outbound_detectors=("entropy",),
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_binary_body_latin1_decode_finds_ascii_secret(self):
|
||||||
|
# Body contains valid ASCII secret surrounded by non-UTF-8 bytes.
|
||||||
|
secret = "supersecrettoken99"
|
||||||
|
env = {"EGRESS_TOKEN_0": secret}
|
||||||
|
# Wrap the secret in bytes that are invalid UTF-8.
|
||||||
|
body = b"\x80\x81" + secret.encode("ascii") + b"\xff"
|
||||||
|
result = scan_outbound(self._ROUTE, body, env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
|
||||||
|
def test_binary_body_valid_utf8_decoded_correctly(self):
|
||||||
|
env = {"EGRESS_TOKEN_0": "mysecret"}
|
||||||
|
# Valid UTF-8 body — should be decoded as UTF-8, not latin-1.
|
||||||
|
body = "clean body with mysecret".encode("utf-8")
|
||||||
|
result = scan_outbound(self._ROUTE, body, env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
|
||||||
|
def test_entropy_detector_off_by_default(self):
|
||||||
|
import string
|
||||||
|
# High-entropy content should NOT warn if the route has no entropy detector.
|
||||||
|
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||||
|
result = scan_outbound(self._ROUTE, alphabet, {})
|
||||||
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
def test_entropy_detector_warns_when_enabled(self):
|
||||||
|
import string
|
||||||
|
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
|
||||||
|
result = scan_outbound(self._ROUTE_ENTROPY, alphabet, {})
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("warn", result.severity)
|
||||||
|
|
||||||
|
def test_bot_bottle_sensitive_prefixes_env_var(self):
|
||||||
|
# When the sidecar env contains BOT_BOTTLE_SENSITIVE_PREFIXES,
|
||||||
|
# scan_outbound should scan those additional prefixes.
|
||||||
|
secret = "extra-sensitive-value-abc"
|
||||||
|
env = {
|
||||||
|
"MY_CRED_KEY": secret,
|
||||||
|
"BOT_BOTTLE_SENSITIVE_PREFIXES": "MY_CRED_",
|
||||||
|
}
|
||||||
|
result = scan_outbound(self._ROUTE, f"x={secret}", env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
|
||||||
|
def test_bot_bottle_sensitive_prefixes_multiple(self):
|
||||||
|
secret = "my-api-key-value-xyz"
|
||||||
|
env = {
|
||||||
|
"ANTHROPIC_API_0": secret,
|
||||||
|
"BOT_BOTTLE_SENSITIVE_PREFIXES": "ANTHROPIC_API_,OTHER_",
|
||||||
|
}
|
||||||
|
result = scan_outbound(self._ROUTE, f"auth={secret}", env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
|
||||||
|
def test_canary_detected_via_egress_token_canary(self):
|
||||||
|
# The canary (injected as EGRESS_TOKEN_CANARY) is caught by known_secrets.
|
||||||
|
canary = "canaryvalue12345abcdef"
|
||||||
|
env = {"EGRESS_TOKEN_CANARY": canary}
|
||||||
|
result = scan_outbound(self._ROUTE, f"data={canary}", env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
|
||||||
|
|
||||||
|
def test_fragmented_canary_blocked(self):
|
||||||
|
# Canary with separators injected is still caught.
|
||||||
|
canary = "supersecretcanary99"
|
||||||
|
env = {"EGRESS_TOKEN_CANARY": canary}
|
||||||
|
fragmented = "-".join(canary)
|
||||||
|
result = scan_outbound(self._ROUTE, f"x={fragmented}", env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
|
||||||
|
|
||||||
|
class TestOutboundDetectorNames(unittest.TestCase):
|
||||||
|
def test_entropy_in_outbound_detector_names(self):
|
||||||
|
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
|
||||||
|
self.assertIn("entropy", OUTBOUND_DETECTOR_NAMES)
|
||||||
|
|
||||||
|
def test_known_secrets_in_outbound_detector_names(self):
|
||||||
|
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
|
||||||
|
self.assertIn("known_secrets", OUTBOUND_DETECTOR_NAMES)
|
||||||
|
|
||||||
|
def test_token_patterns_in_outbound_detector_names(self):
|
||||||
|
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
|
||||||
|
self.assertIn("token_patterns", OUTBOUND_DETECTOR_NAMES)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -199,6 +199,30 @@ class TestHookRender(unittest.TestCase):
|
|||||||
self.assertIn('set -- "$@" --push-option="$opt"', hook)
|
self.assertIn('set -- "$@" --push-option="$opt"', hook)
|
||||||
self.assertIn('git push "$@" origin "$refspec"', hook)
|
self.assertIn('git push "$@" origin "$refspec"', hook)
|
||||||
|
|
||||||
|
def test_inline_gitleaks_allow_routes_to_supervisor(self):
|
||||||
|
hook = git_gate_render_hook()
|
||||||
|
# First gitleaks runs normally; only if that passes does the
|
||||||
|
# hook ask gitleaks to ignore inline allow comments and report
|
||||||
|
# the suppressed findings for human approval.
|
||||||
|
self.assertIn("--ignore-gitleaks-allow", hook)
|
||||||
|
self.assertIn("--report-format=json", hook)
|
||||||
|
self.assertIn('"tool": "gitleaks-allow"', hook)
|
||||||
|
self.assertIn("SUPERVISE_QUEUE_DIR", hook)
|
||||||
|
self.assertIn("SUPERVISE_BOTTLE_SLUG", hook)
|
||||||
|
self.assertIn("supervisor approved # gitleaks:allow", hook)
|
||||||
|
self.assertIn("supervisor rejected # gitleaks:allow", hook)
|
||||||
|
|
||||||
|
def test_inline_gitleaks_allow_fails_closed_without_supervisor(self):
|
||||||
|
hook = git_gate_render_hook()
|
||||||
|
self.assertIn(
|
||||||
|
"cannot route # gitleaks:allow finding to supervisor; refusing push",
|
||||||
|
hook,
|
||||||
|
)
|
||||||
|
self.assertIn(
|
||||||
|
"supervisor approval timed out for # gitleaks:allow; refusing push",
|
||||||
|
hook,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestAccessHookRender(unittest.TestCase):
|
class TestAccessHookRender(unittest.TestCase):
|
||||||
def test_access_hook_refreshes_origin_on_upload_pack(self):
|
def test_access_hook_refreshes_origin_on_upload_pack(self):
|
||||||
|
|||||||
@@ -0,0 +1,127 @@
|
|||||||
|
"""Unit: leveled + structured logging wrappers (issue #252).
|
||||||
|
|
||||||
|
Locks three properties of bot_bottle.log:
|
||||||
|
- backward compatibility — default output is byte-identical to the
|
||||||
|
original bare wrappers, so the 100+ existing single-string call
|
||||||
|
sites are unaffected;
|
||||||
|
- context rendering — an optional mapping becomes a parseable
|
||||||
|
` [k=v ...]` suffix;
|
||||||
|
- level gating — BOT_BOTTLE_LOG_LEVEL filters by severity, debug is
|
||||||
|
silent by default, and error always surfaces.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
import io
|
||||||
|
import unittest
|
||||||
|
from typing import Callable
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from bot_bottle import log
|
||||||
|
|
||||||
|
|
||||||
|
def _capture(
|
||||||
|
fn: Callable[..., None],
|
||||||
|
*args: object,
|
||||||
|
env: dict[str, str] | None = None,
|
||||||
|
**kwargs: object,
|
||||||
|
) -> str:
|
||||||
|
buf = io.StringIO()
|
||||||
|
patched = mock.patch.dict("os.environ", env or {}, clear=False)
|
||||||
|
with patched, contextlib.redirect_stderr(buf):
|
||||||
|
fn(*args, **kwargs)
|
||||||
|
return buf.getvalue()
|
||||||
|
|
||||||
|
|
||||||
|
class TestBackwardCompat(unittest.TestCase):
|
||||||
|
"""No context + default level → exactly the legacy lines."""
|
||||||
|
|
||||||
|
def test_info(self):
|
||||||
|
self.assertEqual("bot-bottle: hello\n", _capture(log.info, "hello"))
|
||||||
|
|
||||||
|
def test_warn(self):
|
||||||
|
self.assertEqual(
|
||||||
|
"bot-bottle: warning: careful\n", _capture(log.warn, "careful")
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_error(self):
|
||||||
|
self.assertEqual(
|
||||||
|
"bot-bottle: error: boom\n", _capture(log.error, "boom")
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestContext(unittest.TestCase):
|
||||||
|
def test_appends_sorted_parseable_suffix(self):
|
||||||
|
out = _capture(
|
||||||
|
log.error, "rpc failed", context={"slug": "abc123", "code": "-32603"}
|
||||||
|
)
|
||||||
|
# keys sorted: code before slug
|
||||||
|
self.assertEqual(
|
||||||
|
"bot-bottle: error: rpc failed [code=-32603 slug=abc123]\n", out
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_quotes_values_with_whitespace(self):
|
||||||
|
out = _capture(
|
||||||
|
log.info, "did thing", context={"path": "/a b/c", "ok": "yes"}
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
'bot-bottle: did thing [ok=yes path="/a b/c"]\n', out
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_empty_context_is_noop_suffix(self):
|
||||||
|
self.assertEqual(
|
||||||
|
"bot-bottle: x\n", _capture(log.info, "x", context={})
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestLevels(unittest.TestCase):
|
||||||
|
def test_debug_silent_by_default(self):
|
||||||
|
self.assertEqual("", _capture(log.debug, "trace"))
|
||||||
|
|
||||||
|
def test_debug_emits_when_level_lowered(self):
|
||||||
|
out = _capture(log.debug, "trace", env={"BOT_BOTTLE_LOG_LEVEL": "debug"})
|
||||||
|
self.assertEqual("bot-bottle: debug: trace\n", out)
|
||||||
|
|
||||||
|
def test_error_level_suppresses_info_and_warn(self):
|
||||||
|
env = {"BOT_BOTTLE_LOG_LEVEL": "error"}
|
||||||
|
self.assertEqual("", _capture(log.info, "i", env=env))
|
||||||
|
self.assertEqual("", _capture(log.warn, "w", env=env))
|
||||||
|
# error still surfaces — nothing sits above it
|
||||||
|
self.assertEqual(
|
||||||
|
"bot-bottle: error: e\n", _capture(log.error, "e", env=env)
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_unknown_level_falls_back_to_default(self):
|
||||||
|
# garbage value → default INFO threshold, so info still prints
|
||||||
|
out = _capture(log.info, "i", env={"BOT_BOTTLE_LOG_LEVEL": "loud"})
|
||||||
|
self.assertEqual("bot-bottle: i\n", out)
|
||||||
|
|
||||||
|
def test_warning_alias_accepted(self):
|
||||||
|
env = {"BOT_BOTTLE_LOG_LEVEL": "warning"}
|
||||||
|
self.assertEqual("", _capture(log.info, "i", env=env))
|
||||||
|
self.assertEqual(
|
||||||
|
"bot-bottle: warning: w\n", _capture(log.warn, "w", env=env)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDie(unittest.TestCase):
|
||||||
|
def test_die_still_raises_and_prints_error(self):
|
||||||
|
buf = io.StringIO()
|
||||||
|
with contextlib.redirect_stderr(buf):
|
||||||
|
with self.assertRaises(log.Die) as cm:
|
||||||
|
log.die("fatal thing")
|
||||||
|
self.assertEqual("fatal thing", cm.exception.message)
|
||||||
|
self.assertIn("bot-bottle: error: fatal thing", buf.getvalue())
|
||||||
|
|
||||||
|
def test_die_surfaces_even_at_error_level(self):
|
||||||
|
buf = io.StringIO()
|
||||||
|
with mock.patch.dict("os.environ", {"BOT_BOTTLE_LOG_LEVEL": "error"}):
|
||||||
|
with contextlib.redirect_stderr(buf):
|
||||||
|
with self.assertRaises(log.Die):
|
||||||
|
log.die("still fatal")
|
||||||
|
self.assertIn("bot-bottle: error: still fatal", buf.getvalue())
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
@@ -42,6 +42,7 @@ def _plan(
|
|||||||
routes_path=routes_path,
|
routes_path=routes_path,
|
||||||
routes=("route",),
|
routes=("route",),
|
||||||
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
|
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
|
||||||
|
canary="",
|
||||||
)
|
)
|
||||||
if git:
|
if git:
|
||||||
key_path = stage_dir / "origin-key"
|
key_path = stage_dir / "origin-key"
|
||||||
@@ -271,7 +272,7 @@ def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan:
|
|||||||
manifest=_MANIFEST,
|
manifest=_MANIFEST,
|
||||||
stage_dir=stage_dir,
|
stage_dir=stage_dir,
|
||||||
git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())),
|
git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())),
|
||||||
egress_plan=cast(EgressPlan, SimpleNamespace()),
|
egress_plan=cast(EgressPlan, SimpleNamespace(canary="")),
|
||||||
supervise_plan=None,
|
supervise_plan=None,
|
||||||
agent_provision=AgentProvisionPlan(
|
agent_provision=AgentProvisionPlan(
|
||||||
template="claude",
|
template="claude",
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ from bot_bottle.supervise import (
|
|||||||
STATUS_MODIFIED,
|
STATUS_MODIFIED,
|
||||||
STATUS_REJECTED,
|
STATUS_REJECTED,
|
||||||
TOOL_CAPABILITY_BLOCK,
|
TOOL_CAPABILITY_BLOCK,
|
||||||
|
TOOL_GITLEAKS_ALLOW,
|
||||||
archive_proposal,
|
archive_proposal,
|
||||||
audit_log_path,
|
audit_log_path,
|
||||||
list_pending_proposals,
|
list_pending_proposals,
|
||||||
@@ -320,6 +321,7 @@ class TestToolConstants(unittest.TestCase):
|
|||||||
supervise.TOOL_ALLOW,
|
supervise.TOOL_ALLOW,
|
||||||
TOOL_CAPABILITY_BLOCK,
|
TOOL_CAPABILITY_BLOCK,
|
||||||
supervise.TOOL_EGRESS_BLOCK,
|
supervise.TOOL_EGRESS_BLOCK,
|
||||||
|
TOOL_GITLEAKS_ALLOW,
|
||||||
supervise.TOOL_LIST_EGRESS_ROUTES,
|
supervise.TOOL_LIST_EGRESS_ROUTES,
|
||||||
),
|
),
|
||||||
supervise.TOOLS,
|
supervise.TOOLS,
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ from bot_bottle.supervise import (
|
|||||||
STATUS_MODIFIED,
|
STATUS_MODIFIED,
|
||||||
STATUS_REJECTED,
|
STATUS_REJECTED,
|
||||||
TOOL_CAPABILITY_BLOCK,
|
TOOL_CAPABILITY_BLOCK,
|
||||||
|
TOOL_GITLEAKS_ALLOW,
|
||||||
read_audit_entries,
|
read_audit_entries,
|
||||||
read_response,
|
read_response,
|
||||||
sha256_hex,
|
sha256_hex,
|
||||||
@@ -33,6 +34,7 @@ def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
|
|||||||
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
|
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
|
||||||
supervise.TOOL_ALLOW: "routes:\n - host: example.com\n",
|
supervise.TOOL_ALLOW: "routes:\n - host: example.com\n",
|
||||||
supervise.TOOL_EGRESS_BLOCK: "routes:\n - host: example.com\n",
|
supervise.TOOL_EGRESS_BLOCK: "routes:\n - host: example.com\n",
|
||||||
|
TOOL_GITLEAKS_ALLOW: "file: tests/test_fixture.py\nline: 3\n",
|
||||||
}
|
}
|
||||||
payload = payloads.get(tool, "")
|
payload = payloads.get(tool, "")
|
||||||
return Proposal.new(
|
return Proposal.new(
|
||||||
@@ -170,6 +172,30 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
|
|||||||
self.assertEqual(STATUS_APPROVED, entries[0].operator_action)
|
self.assertEqual(STATUS_APPROVED, entries[0].operator_action)
|
||||||
self.assertEqual("needed for dev", entries[0].justification)
|
self.assertEqual("needed for dev", entries[0].justification)
|
||||||
|
|
||||||
|
def test_approve_gitleaks_allow_leaves_response_for_gate(self):
|
||||||
|
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
|
||||||
|
supervise_cli.approve(qp, notes="dummy fixture")
|
||||||
|
# Gate polls the queue dir for the response; TUI must not archive it.
|
||||||
|
resp = read_response(qp.queue_dir, qp.proposal.id)
|
||||||
|
self.assertEqual(STATUS_APPROVED, resp.status)
|
||||||
|
self.assertEqual("dummy fixture", resp.notes)
|
||||||
|
self.assertFalse((qp.queue_dir / "processed").exists())
|
||||||
|
|
||||||
|
def test_tui_gitleaks_allow_requires_reason(self):
|
||||||
|
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
|
||||||
|
with patch.object(supervise_cli, "_prompt", return_value=""):
|
||||||
|
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
|
||||||
|
self.assertEqual("approve aborted (empty reason)", status)
|
||||||
|
self.assertFalse((qp.queue_dir / "processed").exists())
|
||||||
|
|
||||||
|
def test_tui_gitleaks_allow_writes_reason(self):
|
||||||
|
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
|
||||||
|
with patch.object(supervise_cli, "_prompt", return_value="test fixture"):
|
||||||
|
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
|
||||||
|
self.assertIn("approved gitleaks-allow", status)
|
||||||
|
resp = read_response(qp.queue_dir, qp.proposal.id)
|
||||||
|
self.assertEqual("test fixture", resp.notes)
|
||||||
|
|
||||||
|
|
||||||
# class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
# class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||||
# # DISABLED — capability_apply functionality is currently commented out.
|
# # DISABLED — capability_apply functionality is currently commented out.
|
||||||
|
|||||||
Reference in New Issue
Block a user