Compare commits

...

3 Commits

Author SHA1 Message Date
didericis-claude 813cb685bf docs: draft PRD prd-new for strengthen-outbound-exfil-detection
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-25 00:54:28 +00:00
didericis 7cb967770e feat(log): add leveled severity and structured context to log wrappers
test / unit (pull_request) Successful in 31s
test / integration (pull_request) Successful in 16s
lint / lint (push) Successful in 1m39s
test / unit (push) Successful in 31s
test / integration (push) Successful in 16s
Update Quality Badges / update-badges (push) Successful in 1m32s
log.py was bare print-to-stderr wrappers with no levels or attributable
context (issue #252). Add:

- Ordered severities (debug/info/warn/error) gated by
  BOT_BOTTLE_LOG_LEVEL (default info). debug is silent by default;
  error always surfaces (nothing sits above it), so the fatal die path
  stays visible regardless of configured level.
- An optional `context` mapping on every wrapper, rendered as a
  parseable ` [k=v ...]` suffix (keys sorted; whitespace/quoted values
  quoted) so failures can be filtered and correlated.

Default output with no context is byte-identical to the original lines,
so the 100+ existing single-string call sites are unaffected. Wires the
supervise crash path (the example the issue names) to attach error_type
and crash_log context. Adds test_log.py (backward-compat, context
rendering, level gating, die surfacing).

Closes #252.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01YcU7nerbg8cVj9R4EkpfLJ
2026-06-24 15:37:57 -04:00
didericis 80eca740d6 docs(research): replace unsourced "20% malicious skills" with cited empirical figures
The "~20% of ClawHub skills malicious" claim had no traceable source and
is contradicted by the empirical literature. Replace with the Jan 2026
large-scale study (98,380-skill snapshot: 157 confirmed malicious, ~71%
credential harvesters, exfiltration overwhelmingly naive) and add the
arXiv citation. The corrected figures still support the supply-chain
threat point and are defensible under scrutiny.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01YcU7nerbg8cVj9R4EkpfLJ
2026-06-24 09:32:19 -04:00
5 changed files with 414 additions and 12 deletions
+4 -1
View File
@@ -292,7 +292,10 @@ def cmd_supervise(argv: list[str]) -> int:
return e.code if isinstance(e.code, int) else 1 return e.code if isinstance(e.code, int) else 1
except Exception as e: # noqa: W0718 — catch supervise crash for logging except Exception as e: # noqa: W0718 — catch supervise crash for logging
log_path = _write_crash_log(e) log_path = _write_crash_log(e)
error(f"supervise crashed: {type(e).__name__}: {e}") error(
f"supervise crashed: {type(e).__name__}: {e}",
context={"error_type": type(e).__name__, "crash_log": str(log_path)},
)
error(f"full traceback written to {log_path}") error(f"full traceback written to {log_path}")
return 1 return 1
return 0 return 0
+96 -10
View File
@@ -1,21 +1,107 @@
"""Tiny logging wrappers. All output goes to stderr.""" """Tiny logging wrappers. All output goes to stderr.
Two capabilities layer onto the bare wrappers (issue #252):
- **Levels.** `debug` / `info` / `warn` / `error` carry an ordered
severity. Output is gated by `BOT_BOTTLE_LOG_LEVEL` (debug | info |
warn | error; default `info`). A message emits when its severity is
at or above the threshold, so `debug` is silent by default and
`error` always surfaces (nothing sits above it) — which keeps the
fatal `die` path visible regardless of the configured level.
- **Context.** Every wrapper takes an optional `context` mapping that
renders as a parseable ` [k=v ...]` suffix (keys sorted; values with
whitespace/quotes are quoted), so failures can be filtered and
correlated instead of being flat strings.
With no `context` and the default level, output is byte-identical to the
original `bot-bottle: <msg>` / `bot-bottle: warning: <msg>` /
`bot-bottle: error: <msg>` lines — the 100+ existing call sites are
unaffected.
"""
from __future__ import annotations from __future__ import annotations
import os
import sys import sys
from typing import NoReturn from typing import Mapping, NoReturn
# Ordered severities. Gaps left between values so intermediate levels
# can be added later without renumbering.
DEBUG = 10
INFO = 20
WARN = 30
ERROR = 40
_LEVEL_NAMES: dict[str, int] = {
"debug": DEBUG,
"info": INFO,
"warn": WARN,
"warning": WARN,
"error": ERROR,
}
# Default threshold when BOT_BOTTLE_LOG_LEVEL is unset or unrecognised.
_DEFAULT_THRESHOLD = INFO
_LOG_LEVEL_ENV = "BOT_BOTTLE_LOG_LEVEL"
def info(msg: str) -> None: def _threshold() -> int:
print(f"bot-bottle: {msg}", file=sys.stderr) """Resolve the active level threshold from the environment.
Read per-call (not cached) so the level can be changed at runtime
and so tests can patch `os.environ` without a reload. Unknown values
fall back to the default rather than raising — logging must never be
the thing that crashes the process."""
raw = os.environ.get(_LOG_LEVEL_ENV, "")
return _LEVEL_NAMES.get(raw.strip().lower(), _DEFAULT_THRESHOLD)
def warn(msg: str) -> None: def _format_context(context: Mapping[str, object] | None) -> str:
print(f"bot-bottle: warning: {msg}", file=sys.stderr) """Render a context mapping as a ` [k=v k2=v2]` suffix.
Keys are sorted for stable, diffable output. Values that are empty or
contain whitespace or a quote are wrapped in double quotes (with inner
quotes escaped) so each `k=v` pair stays parseable. Empty/None context
renders as the empty string."""
if not context:
return ""
parts: list[str] = []
for key in sorted(context):
value = str(context[key])
if value == "" or any(ch.isspace() for ch in value) or '"' in value:
value = '"' + value.replace('"', '\\"') + '"'
parts.append(f"{key}={value}")
return " [" + " ".join(parts) + "]"
def error(msg: str) -> None: def _emit(
print(f"bot-bottle: error: {msg}", file=sys.stderr) level: int,
label: str,
msg: str,
context: Mapping[str, object] | None,
) -> None:
if level < _threshold():
return
prefix = f"{label}: " if label else ""
sys.stderr.write(f"bot-bottle: {prefix}{msg}{_format_context(context)}\n")
def debug(msg: str, *, context: Mapping[str, object] | None = None) -> None:
_emit(DEBUG, "debug", msg, context)
def info(msg: str, *, context: Mapping[str, object] | None = None) -> None:
_emit(INFO, "", msg, context)
def warn(msg: str, *, context: Mapping[str, object] | None = None) -> None:
_emit(WARN, "warning", msg, context)
def error(msg: str, *, context: Mapping[str, object] | None = None) -> None:
_emit(ERROR, "error", msg, context)
class Die(SystemExit): class Die(SystemExit):
@@ -31,6 +117,6 @@ class Die(SystemExit):
self.message = message self.message = message
def die(msg: str) -> NoReturn: def die(msg: str, *, context: Mapping[str, object] | None = None) -> NoReturn:
error(msg) error(msg, context=context)
raise Die(1, msg) raise Die(1, msg)
@@ -0,0 +1,186 @@
# PRD prd-new: Strengthen outbound exfiltration detection
- **Status:** Draft
- **Author:** claude
- **Created:** 2026-06-25
- **Issue:** #259
## Summary
The current DLP system has two detection tiers: weak (vendor token regexes)
and strong (exact matching of provisioned `EGRESS_TOKEN_*` values in multiple
encodings). This PRD adds four complementary hardening layers in ranked order
of impact:
1. **Canary tokens** — inject a per-session fake secret into the agent's
environment; any outbound appearance is a zero-false-positive exfil signal.
2. **Broadened known-value scanning** — extend the strong tier beyond
`EGRESS_TOKEN_*` via a `BOT_BOTTLE_SENSITIVE_PREFIXES` sidecar env var.
3. **Fragmentation-resistant matching** — catch separator injection and
long-substring leaks via alnum projection and sliding-window substring
search.
4. **Entropy scoring** — warn-only flag on high-entropy blobs as a
complementary signal for encrypted exfil.
## Problem
After PRD 0056 the outbound scan covers all HTTP request surfaces (host, path,
query, headers, body). But the content-based strong tier only matches
`EGRESS_TOKEN_*` values and their encodings. Three gaps remain:
| Gap | How an agent exploits it |
|-----|--------------------------|
| No canary | There is no "can't be false-positive" signal |
| Narrow prefix filter | Sensitive values provisioned under other env prefixes (MCP keys, API keys injected via the agent provider) are invisible to `scan_known_secrets` |
| Fragmentation blindness | Inserting `-`, space, or `\n` between secret characters turns an exact-match into a miss |
## Goals / Success Criteria
1. Each launched bottle has a unique canary token in the agent's environment
(`BOT_BOTTLE_CANARY`) and the egress sidecar's environment
(`EGRESS_TOKEN_CANARY`). Any outbound appearance of the canary blocks the
request with reason `"canary token"`.
2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default:
`("EGRESS_TOKEN_",)`). `scan_outbound` reads
`BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes
in, so operators can mark additional env vars as scanned values without
changing the manifest schema.
3. For every secret that passes exact-match, a secondary alnum-projection pass
checks for the secret with all non-alphanumeric characters stripped. This
catches separator-injection evasion (`MY-SECRET` → body contains
`MY SECRET`).
4. A sliding-window partial-match pass checks for long-enough contiguous
substrings of the secret's alnum projection in the text's alnum projection.
Any match ≥ `PARTIAL_MATCH_MIN_LEN` (12 chars) blocks with reason
`"partial match"`.
5. A new `scan_entropy` detector flags outbound text windows with Shannon
entropy ≥ `ENTROPY_BLOCK_THRESHOLD` (5.5 bits/char) at **warn** severity
only. It is registered under the new detector name `"entropy"` in
`OUTBOUND_DETECTOR_NAMES` and disabled by default (routes must opt in).
6. Binary request bodies are decoded via `latin-1` instead of
`utf-8 errors="replace"`, preserving every byte value and allowing
ASCII-range secrets to be found within binary payloads.
7. All new behaviour is unit-tested; existing tests pass unchanged.
## Non-goals
- Rolling per-host buffer for split-across-requests detection (state in the
stateless addon is complex; deferred).
- Additional vendor regexes.
- ML / embedding-based detection.
- Entropy-based hard blocks (warn only per the issue).
## Design
### Canary token flow
```
Egress.prepare()
canary = secrets.token_urlsafe(32)
EgressPlan(canary=canary, ...)
Docker compose render:
sidecar env: EGRESS_TOKEN_CANARY=<canary> ← scanned by existing known-secrets detector
agent env: BOT_BOTTLE_CANARY=<canary> ← visible to agent as a "secret"
macos-container launch: same literals added to sidecar + agent env entries
```
`EGRESS_TOKEN_CANARY` matches the `EGRESS_TOKEN_` prefix already scanned by
`scan_known_secrets`, so no detector code changes are required for canary
detection — only the injection path.
### Broadened known-value scanning
`scan_known_secrets` gains a `sensitive_prefixes` parameter:
```python
def scan_known_secrets(
text: str,
*,
location: str = "body",
env: Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> ScanResult | None:
```
`scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` (comma-separated list
of additional prefixes) from `environ` and appends them:
```python
extra = tuple(
p for p in environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "").split(",") if p
)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
```
`redact_tokens` receives the same treatment for consistent redaction.
### Fragmentation-resistant matching
A new helper `_alnum_projection(text)` strips all non-alphanumeric characters.
`scan_known_secrets` runs two passes per secret:
1. **Exact pass** — existing encoded-variant loop (unchanged).
2. **Alnum-projection pass** — if the secret's alnum projection has ≥ 8 chars,
check if it appears in the text's alnum projection. Match → block with
`"fragmented match (separator injection)"` reason.
3. **Partial-substring pass** — if the secret's alnum projection has ≥
`PARTIAL_MATCH_MIN_LEN` chars (12), slide a window of that length across the
secret's projection and look for each window in the text's alnum projection.
First match → block with `"partial match"` reason.
All three passes run only for the `"known_secrets"` detector; the token-pattern
and entropy detectors are unchanged.
### Entropy scoring
New public function:
```python
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW, # 64
threshold: float = ENTROPY_BLOCK_THRESHOLD, # 5.5
) -> ScanResult | None:
```
Slides a window of `window` characters across `text` in steps of `window // 2`.
If any window's Shannon entropy exceeds `threshold`, returns a **warn**-severity
`ScanResult`. Never blocks.
`OUTBOUND_DETECTOR_NAMES` gains `"entropy"`. Routes opt in via their `dlp`
block; entropy scanning is **off by default** to avoid false-positive noise on
legitimate binary payloads.
### Binary body handling
In `scan_outbound`, the bytes → str decoding changes from:
```python
body.decode("utf-8", errors="replace")
```
to:
```python
body.decode("utf-8") if body is str else body.decode("latin-1")
```
`latin-1` is a bijective byte↔codepoint mapping; every byte value is preserved
as its corresponding Latin-1 code point, so ASCII-range secret strings remain
intact and `str.find` / regex still locate them correctly. The fallback from
strict UTF-8 is tried first so valid UTF-8 bodies are decoded faithfully.
## Implementation
Delivered in three commits on the same branch:
1. **DLP detector changes**`_alnum_projection`, fragmentation passes,
`scan_entropy`, broadened `scan_known_secrets`, updated `scan_outbound` and
`redact_tokens`; all accompanying unit tests.
2. **Canary injection**`EgressPlan.canary`, `Egress.prepare()`,
Docker compose + macos-container backend injection.
3. **PRD flip**`Status: Draft → Active`.
@@ -22,7 +22,7 @@ escapes**, and **whether credentials are short-lived and scoped**.
- Outbound: Docker containers have full internet access by default; no egress monitoring on most home networks - Outbound: Docker containers have full internet access by default; no egress monitoring on most home networks
- Lateral movement: compromised container can reach the LAN — NAS, other machines, internal services - Lateral movement: compromised container can reach the LAN — NAS, other machines, internal services
- Notable: CVE-2025-59536 (CVSS 8.7, Feb 2026) — a poisoned `.claude/settings.json` in a repo gives RCE when Claude Code opens it. `--dangerously-skip-permissions` removes the last gate. - Notable: CVE-2025-59536 (CVSS 8.7, Feb 2026) — a poisoned `.claude/settings.json` in a repo gives RCE when Claude Code opens it. `--dangerously-skip-permissions` removes the last gate.
- Supply chain: MCP servers, skills, and npm packages pulled during agent execution. ~20% of ClawHub skills were found malicious in early 2026. - Supply chain: MCP servers, skills, and npm packages pulled during agent execution. A Jan 2026 large-scale empirical study of a 98,380-skill snapshot confirmed 157 malicious skills, ~71% of them credential harvesters. Exfiltration was overwhelmingly naive — plaintext HTTP to hardcoded endpoints; under 10% used any code obfuscation, and concealment was mostly at the documentation level, not the code level. ([Malicious Agent Skills in the Wild](https://arxiv.org/html/2602.06547v1), arXiv:2602.06547)
**What local topology protects:** **What local topology protects:**
- No inbound attack surface — nothing listening on a public port - No inbound attack surface — nothing listening on a public port
+127
View File
@@ -0,0 +1,127 @@
"""Unit: leveled + structured logging wrappers (issue #252).
Locks three properties of bot_bottle.log:
- backward compatibility — default output is byte-identical to the
original bare wrappers, so the 100+ existing single-string call
sites are unaffected;
- context rendering — an optional mapping becomes a parseable
` [k=v ...]` suffix;
- level gating — BOT_BOTTLE_LOG_LEVEL filters by severity, debug is
silent by default, and error always surfaces.
"""
from __future__ import annotations
import contextlib
import io
import unittest
from typing import Callable
from unittest import mock
from bot_bottle import log
def _capture(
fn: Callable[..., None],
*args: object,
env: dict[str, str] | None = None,
**kwargs: object,
) -> str:
buf = io.StringIO()
patched = mock.patch.dict("os.environ", env or {}, clear=False)
with patched, contextlib.redirect_stderr(buf):
fn(*args, **kwargs)
return buf.getvalue()
class TestBackwardCompat(unittest.TestCase):
"""No context + default level → exactly the legacy lines."""
def test_info(self):
self.assertEqual("bot-bottle: hello\n", _capture(log.info, "hello"))
def test_warn(self):
self.assertEqual(
"bot-bottle: warning: careful\n", _capture(log.warn, "careful")
)
def test_error(self):
self.assertEqual(
"bot-bottle: error: boom\n", _capture(log.error, "boom")
)
class TestContext(unittest.TestCase):
def test_appends_sorted_parseable_suffix(self):
out = _capture(
log.error, "rpc failed", context={"slug": "abc123", "code": "-32603"}
)
# keys sorted: code before slug
self.assertEqual(
"bot-bottle: error: rpc failed [code=-32603 slug=abc123]\n", out
)
def test_quotes_values_with_whitespace(self):
out = _capture(
log.info, "did thing", context={"path": "/a b/c", "ok": "yes"}
)
self.assertEqual(
'bot-bottle: did thing [ok=yes path="/a b/c"]\n', out
)
def test_empty_context_is_noop_suffix(self):
self.assertEqual(
"bot-bottle: x\n", _capture(log.info, "x", context={})
)
class TestLevels(unittest.TestCase):
def test_debug_silent_by_default(self):
self.assertEqual("", _capture(log.debug, "trace"))
def test_debug_emits_when_level_lowered(self):
out = _capture(log.debug, "trace", env={"BOT_BOTTLE_LOG_LEVEL": "debug"})
self.assertEqual("bot-bottle: debug: trace\n", out)
def test_error_level_suppresses_info_and_warn(self):
env = {"BOT_BOTTLE_LOG_LEVEL": "error"}
self.assertEqual("", _capture(log.info, "i", env=env))
self.assertEqual("", _capture(log.warn, "w", env=env))
# error still surfaces — nothing sits above it
self.assertEqual(
"bot-bottle: error: e\n", _capture(log.error, "e", env=env)
)
def test_unknown_level_falls_back_to_default(self):
# garbage value → default INFO threshold, so info still prints
out = _capture(log.info, "i", env={"BOT_BOTTLE_LOG_LEVEL": "loud"})
self.assertEqual("bot-bottle: i\n", out)
def test_warning_alias_accepted(self):
env = {"BOT_BOTTLE_LOG_LEVEL": "warning"}
self.assertEqual("", _capture(log.info, "i", env=env))
self.assertEqual(
"bot-bottle: warning: w\n", _capture(log.warn, "w", env=env)
)
class TestDie(unittest.TestCase):
def test_die_still_raises_and_prints_error(self):
buf = io.StringIO()
with contextlib.redirect_stderr(buf):
with self.assertRaises(log.Die) as cm:
log.die("fatal thing")
self.assertEqual("fatal thing", cm.exception.message)
self.assertIn("bot-bottle: error: fatal thing", buf.getvalue())
def test_die_surfaces_even_at_error_level(self):
buf = io.StringIO()
with mock.patch.dict("os.environ", {"BOT_BOTTLE_LOG_LEVEL": "error"}):
with contextlib.redirect_stderr(buf):
with self.assertRaises(log.Die):
log.die("still fatal")
self.assertIn("bot-bottle: error: still fatal", buf.getvalue())
if __name__ == "__main__":
unittest.main()