Compare commits

...

25 Commits

Author SHA1 Message Date
didericis-claude 88f58bf4c0 merge: update tests/unit/test_supervise_server.py from issue-277-coverage-ci
lint / lint (push) Failing after 1m46s
test / unit (pull_request) Failing after 36s
test / integration (pull_request) Successful in 16s
2026-06-25 14:31:36 -04:00
didericis-claude ca0dc72b89 merge: update bot_bottle/cli/supervise.py from issue-277-coverage-ci
test / unit (pull_request) Successful in 41s
test / integration (pull_request) Successful in 16s
lint / lint (push) Successful in 1m46s
2026-06-25 14:31:32 -04:00
didericis-claude 2fc99ea098 merge: update .gitignore from issue-277-coverage-ci
test / unit (pull_request) Successful in 40s
test / integration (pull_request) Successful in 15s
2026-06-25 14:31:29 -04:00
didericis-claude 9a9235f2af merge: update .coveragerc from issue-277-coverage-ci 2026-06-25 14:31:27 -04:00
didericis 42f79283f0 test: fix integration coverage failures
lint / lint (push) Successful in 1m50s
test / unit (pull_request) Successful in 40s
test / integration (pull_request) Successful in 18s
2026-06-25 04:39:43 -04:00
didericis d6b9d7af3e ci: add coverage.py reporting 2026-06-25 04:08:21 -04:00
didericis 0f72843150 fix(macos-container): anchor relative Dockerfile path to build context
test / unit (pull_request) Successful in 33s
test / integration (pull_request) Successful in 17s
lint / lint (push) Successful in 1m49s
test / unit (push) Successful in 33s
test / integration (push) Successful in 18s
Update Quality Badges / update-badges (push) Successful in 1m19s
`container build` resolves -f relative to the current working directory,
not the build context, so builds failed from any cwd other than the repo
root. Anchor a relative Dockerfile to the context before passing it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 03:27:46 -04:00
didericis fd6b14fb32 fix: route remote control through provider startup args
test / unit (pull_request) Successful in 32s
test / integration (pull_request) Successful in 18s
lint / lint (push) Successful in 1m46s
test / unit (push) Successful in 30s
test / integration (push) Successful in 17s
Update Quality Badges / update-badges (push) Successful in 1m23s
2026-06-25 03:08:47 -04:00
didericis-claude 9f9aa2e762 refactor: remove load_routes, use load_config(...).routes in tests
test / unit (pull_request) Successful in 48s
test / integration (pull_request) Successful in 26s
lint / lint (push) Successful in 1m45s
test / unit (push) Successful in 32s
test / integration (push) Successful in 17s
Update Quality Badges / update-badges (push) Successful in 1m21s
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-25 06:07:47 +00:00
didericis-codex 454baaf3a1 fix(egress): validate proposed full config
lint / lint (push) Successful in 2m23s
test / unit (pull_request) Successful in 47s
test / integration (pull_request) Successful in 28s
2026-06-25 05:25:42 +00:00
github-actions[bot] 8a092504b8 ci(prd): assign sequential numbers to new PRDs 2026-06-25 04:50:36 +00:00
didericis-codex e7dacf7d86 fix: satisfy pyright for log redaction tests
test / integration (pull_request) Successful in 29s
prd-number / assign-numbers (push) Successful in 1m6s
Update Quality Badges / update-badges (push) Successful in 1m40s
test / unit (pull_request) Successful in 52s
lint / lint (push) Successful in 2m20s
test / unit (push) Successful in 50s
test / integration (push) Successful in 27s
2026-06-25 00:32:42 -04:00
didericis-claude 9b929d0684 fix(egress): strip injected Authorization and redact bodies in LOG_FULL path
_log_request and _log_response wrote headers and bodies to stderr verbatim.
_log_request also included the sidecar-injected upstream Authorization value,
exposing live bearer tokens on every allowed request under LOG_FULL.

Apply redact_tokens to all header values and bodies in both log functions;
exclude the authorization header from _log_request entirely since its value
is always a live sidecar-injected credential by the time _log_request runs.

Closes #257
2026-06-25 00:32:42 -04:00
github-actions[bot] ec41f629a4 ci(prd): assign sequential numbers to new PRDs 2026-06-25 04:19:30 +00:00
didericis-codex d9a9eef276 docs: remove prd-new code citations
test / integration (pull_request) Successful in 46s
test / unit (pull_request) Successful in 1m4s
lint / lint (push) Successful in 2m36s
prd-number / assign-numbers (push) Successful in 1m24s
test / integration (push) Successful in 34s
test / unit (push) Successful in 52s
Update Quality Badges / update-badges (push) Successful in 2m11s
2026-06-25 03:57:41 +00:00
didericis-codex 5204b98777 refactor(egress): centralize launch env entries
lint / lint (push) Successful in 2m12s
test / unit (pull_request) Successful in 43s
test / integration (pull_request) Successful in 25s
2026-06-25 03:35:24 +00:00
didericis-codex 14ae89580a fix(egress): wire canary env for smolmachines
lint / lint (push) Successful in 2m16s
test / unit (pull_request) Successful in 42s
test / integration (pull_request) Successful in 23s
2026-06-25 03:31:51 +00:00
didericis-codex 4808ef557a fix(egress): randomize canary secret env name
lint / lint (push) Successful in 2m15s
test / unit (pull_request) Successful in 45s
test / integration (pull_request) Successful in 26s
2026-06-25 03:25:37 +00:00
didericis-codex 0a7e166b35 fix(tests): remove unused dlp entropy import
lint / lint (push) Successful in 2m8s
test / unit (pull_request) Successful in 40s
test / integration (pull_request) Successful in 23s
2026-06-24 23:09:11 -04:00
didericis-claude a920203730 fix(dlp): skip projection passes when exact variant is safe-listed
When a supervisor-approved safe-token exactly matched an env secret
(Pass 1), Passes 2 & 3 (alnum projection) still ran and re-blocked on
the same value.  Track whether any variant was found-and-approved and
skip the projection passes for that secret in that case.
2026-06-24 23:09:11 -04:00
didericis-claude e02fab15d0 docs(prd): flip prd-new-strengthen-outbound-exfil-detection Draft → Active
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-24 23:09:11 -04:00
didericis-claude 11cf12188d feat(egress): inject per-session canary token into sidecar and agent environments
EgressPlan gains a `canary: str` field (default "") populated in Egress.prepare()
using secrets.token_urlsafe(32).  Each launched bottle:

  - sidecar receives EGRESS_TOKEN_CANARY=<value> (literal env entry, scanned by
    existing known-secrets detector without any detector code changes)
  - agent receives BOT_BOTTLE_CANARY=<value> (visible fake secret that signals
    exfiltration with zero false positives if it appears in outbound traffic)

Docker compose and macos-container backends updated; smolmachines shares docker
compose and so picks this up automatically.  Unit tests cover canary uniqueness,
detection via scan_known_secrets, and EgressPlan backward-compat default.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-24 23:09:11 -04:00
didericis-claude 701df6cb2f feat(dlp): fragmentation resistance, entropy detector, broadened known-value scan
- _alnum_projection(): strip non-alphanumeric chars for separator-injection detection
- scan_known_secrets() gains two extra passes per secret after exact-variant matching:
  alnum-projection exact match (catches hyphens/spaces between secret chars) and a
  sliding-window partial-match scan (catches chunked substrings ≥ PARTIAL_MATCH_MIN_LEN)
- scan_known_secrets() accepts sensitive_prefixes param (default ("EGRESS_TOKEN_",))
  so redact_tokens and call-sites can extend the scanned env-var prefix set
- scan_entropy() warn-only detector flagging windows with Shannon entropy ≥ 5.5 bits/char
- "entropy" added to OUTBOUND_DETECTOR_NAMES; scan_outbound opts it in only when
  explicitly listed in dlp.outbound_detectors (never part of the default "all" set)
- scan_outbound reads BOT_BOTTLE_SENSITIVE_PREFIXES from environ to extend
  scan_known_secrets beyond EGRESS_TOKEN_* without schema changes
- Binary bodies decoded via latin-1 fallback (bijective byte↔codepoint) instead
  of utf-8 errors=replace, preserving ASCII secret strings in binary payloads

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-24 23:09:11 -04:00
didericis-claude ea6bc5a170 docs: draft PRD prd-new for strengthen-outbound-exfil-detection
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-24 23:09:11 -04:00
didericis ecaae708f7 feat(provider): support startup args settings
test / unit (pull_request) Successful in 41s
test / integration (pull_request) Successful in 26s
lint / lint (push) Successful in 2m12s
test / unit (push) Successful in 41s
test / integration (push) Successful in 26s
Update Quality Badges / update-badges (push) Successful in 2m9s
2026-06-24 22:51:27 -04:00
45 changed files with 1796 additions and 193 deletions
+9
View File
@@ -0,0 +1,9 @@
[run]
branch = True
source = .
[report]
omit =
bot_bottle/egress_addon.py
bot_bottle/cli/tui.py
tests/*
+7 -1
View File
@@ -39,8 +39,14 @@ jobs:
with:
python-version: "3.12"
- name: Install dev requirements
run: python3 -m pip install -r requirements-dev.txt
- name: Run unit tests
run: python3 -m unittest discover -t . -s tests/unit -v
run: python3 -m coverage run -m unittest discover -t . -s tests/unit -v
- name: Report unit coverage
run: python3 -m coverage report -m
integration:
runs-on: ubuntu-latest
+1
View File
@@ -22,3 +22,4 @@ venv/
.pytest_cache/
.mypy_cache/
.ruff_cache/
.coverage
+10 -2
View File
@@ -61,7 +61,6 @@ class AgentProviderRuntime:
prompt_mode: PromptMode
bypass_args: tuple[str, ...]
resume_args: tuple[str, ...]
remote_control_args: tuple[str, ...]
@dataclass(frozen=True)
@@ -371,6 +370,15 @@ def build_agent_provision_plan(
)
def provider_startup_args(
provider_settings: dict[str, object] | None,
) -> tuple[str, ...]:
raw = (provider_settings or {}).get("startup_args", ())
if not isinstance(raw, (list, tuple)):
return ()
return tuple(arg for arg in raw if isinstance(arg, str))
def prompt_args(
prompt_mode: PromptMode,
prompt_path: str | None,
@@ -382,7 +390,7 @@ def prompt_args(
if prompt_mode == "append_file":
return ["--append-system-prompt-file", prompt_path]
if prompt_mode == "read_prompt_file":
if argv and "resume" in argv:
if argv and ("resume" in argv or "remote-control" in argv):
return []
return [f"Read and follow the instructions in {prompt_path}."]
if prompt_mode == "print_read_prompt_file":
+1 -2
View File
@@ -109,9 +109,8 @@ class BottlePlan(ABC):
def workspace_plan(self) -> WorkspacePlan:
return workspace_plan(self.spec, guest_home=self.guest_home)
def print(self, *, remote_control: bool) -> None:
def print(self) -> None:
"""Render the y/N preflight summary to stderr."""
del remote_control
spec = self.spec
manifest = self.manifest
agent = manifest.agent
+4 -2
View File
@@ -28,6 +28,8 @@ from typing import Any
from ...egress import (
EGRESS_HOSTNAME,
EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_sidecar_env_entries,
)
from ...git_gate import GIT_GATE_HOSTNAME
from ...log import die, warn
@@ -135,8 +137,7 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER))
if ep.routes:
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
env.extend(egress_sidecar_env_entries(ep))
# --- git-gate -----------------------------------------------------
gp = plan.git_gate_plan
@@ -220,6 +221,7 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
# never lands on argv or in the compose file.
for name in sorted(plan.forwarded_env.keys()):
env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
service: dict[str, Any] = {
"image": plan.image,
+6 -2
View File
@@ -11,7 +11,7 @@ from pathlib import Path
from ..bottle_state import egress_state_dir
from ..egress import EGRESS_ROUTES_FILENAME
from ..egress_addon_core import load_routes
from ..egress_addon_core import LOG_OFF, load_config
class EgressApplyError(RuntimeError):
@@ -33,11 +33,15 @@ class EgressApplicator(ABC):
@staticmethod
def validate_routes_content(content: str) -> None:
try:
load_routes(content)
config = load_config(content)
except ValueError as e:
raise EgressApplyError(
f"proposed routes.yaml is not valid: {e}"
) from e
if config.log != LOG_OFF:
raise EgressApplyError(
"proposed routes.yaml must not change egress logging"
)
@staticmethod
def _routes_path(slug: str) -> Path:
+8 -4
View File
@@ -22,7 +22,12 @@ from ...bottle_state import (
git_gate_state_dir,
read_committed_image,
)
from ...egress import EGRESS_ROUTES_IN_CONTAINER, egress_resolve_token_values
from ...egress import (
EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values,
egress_sidecar_env_entries,
)
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import die, info, warn
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
@@ -350,9 +355,7 @@ def _sidecar_daemons(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
env: list[str] = []
if plan.egress_plan.routes:
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
env: list[str] = list(egress_sidecar_env_entries(plan.egress_plan))
if plan.git_gate_plan.upstreams:
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
if plan.supervise_plan is not None:
@@ -420,6 +423,7 @@ def _agent_env_entries(
env.append(f"{name}={value}")
for name in sorted(plan.forwarded_env.keys()):
env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
return tuple(env)
@@ -68,6 +68,11 @@ def build_image(ref: str, context: str, *, dockerfile: str = "") -> None:
_ensure_builder_dns()
args = [_CONTAINER, "build", "-t", ref, "--dns", dns_server()]
if dockerfile:
# `container build` resolves -f relative to the current working
# directory, not the build context. Anchor a relative Dockerfile to
# the context so builds work from any cwd.
if not os.path.isabs(dockerfile):
dockerfile = os.path.join(context, dockerfile)
args.extend(["-f", dockerfile])
args.append(context)
subprocess.run(args, check=True)
+6 -5
View File
@@ -23,7 +23,9 @@ from typing import Callable, Generator
from ...egress import (
EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values,
egress_sidecar_env_entries,
)
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
from ...util import expand_tilde
@@ -228,6 +230,9 @@ def _discover_urls(
guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}"
if agent_supervise_url:
guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url
for entry in egress_agent_env_entries(plan.egress_plan):
name, value = entry.split("=", 1)
guest_env[name] = value
return dataclasses.replace(
plan,
@@ -316,11 +321,7 @@ def _bundle_launch_spec(
volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True))
if ep.routes:
volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True))
# Bare-name entries for upstream-token slots. Their values
# come from the docker-run subprocess env (inherited from
# the operator's shell), never landing on argv.
for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
env.extend(egress_sidecar_env_entries(ep))
# --- git-gate ---------------------------------------------
gp = plan.git_gate_plan
-2
View File
@@ -28,7 +28,6 @@ from .start import _launch_bottle
def cmd_resume(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--remote-control", action="store_true")
parser.add_argument(
"identity",
help="bottle identity from a prior `start` (see its session-end output)",
@@ -56,6 +55,5 @@ def cmd_resume(argv: list[str]) -> int:
return _launch_bottle(
spec,
dry_run=args.dry_run,
remote_control=args.remote_control,
backend_name=backend_name,
)
+4 -10
View File
@@ -42,7 +42,6 @@ def cmd_start(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} start", add_help=True)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--cwd", action="store_true", help="copy host cwd into the running bottle")
parser.add_argument("--remote-control", action="store_true")
parser.add_argument(
"--backend",
choices=known_backend_names(),
@@ -89,7 +88,6 @@ def cmd_start(argv: list[str]) -> int:
return _launch_bottle(
spec,
dry_run=dry_run,
remote_control=args.remote_control,
backend_name=backend_name,
)
@@ -134,7 +132,7 @@ def prepare_with_preflight(
def attach_agent(
bottle: Bottle, *, remote_control: bool = False, resume: bool = False,
bottle: Bottle, *, resume: bool = False,
agent_provider_template: str = "claude",
startup_args: tuple[str, ...] = (),
) -> int:
@@ -153,8 +151,6 @@ def attach_agent(
"(Ctrl-D or 'exit' to leave; container will be removed)"
)
agent_args = list(runtime.bypass_args)
if remote_control:
agent_args.extend(runtime.remote_control_args)
agent_args.extend(startup_args)
if resume:
agent_args.extend(runtime.resume_args)
@@ -218,9 +214,9 @@ def _text_prompt_yes() -> bool:
return reply in ("y", "Y", "yes", "YES")
def _text_render_preflight(*, remote_control: bool):
def _text_render_preflight():
def _render(plan: DockerBottlePlan) -> None:
plan.print(remote_control=remote_control)
plan.print()
return _render
@@ -228,7 +224,6 @@ def _launch_bottle(
spec: BottleSpec,
*,
dry_run: bool,
remote_control: bool,
backend_name: str | None = None,
) -> int:
"""Shared launch core for `start` and `resume`. Builds the plan,
@@ -240,7 +235,7 @@ def _launch_bottle(
plan, identity = prepare_with_preflight(
spec,
stage_dir=stage_dir,
render_preflight=_text_render_preflight(remote_control=remote_control),
render_preflight=_text_render_preflight(),
prompt_yes=_text_prompt_yes,
dry_run=dry_run,
backend_name=backend_name,
@@ -253,7 +248,6 @@ def _launch_bottle(
agent_provider_template = getattr(plan, "agent_provider_template", "claude")
exit_code = attach_agent(
bottle,
remote_control=remote_control,
agent_provider_template=agent_provider_template,
startup_args=plan.agent_provision.startup_args,
)
+9 -36
View File
@@ -2,9 +2,8 @@
act on them (approve / modify / reject).
Curses-based TUI; modify-then-approve shells out to $EDITOR. The
approval handler wires to PRD 0016 (capability-block), which rebuilds
the bottle Dockerfile. Egress proposals are queued for operator review
as full routes.yaml updates.
Egress proposals are queued for operator review as full routes.yaml
updates.
"""
from __future__ import annotations
@@ -22,10 +21,6 @@ from pathlib import Path
from .. import supervise as _supervise
from ..bottle_state import read_metadata
# from ..backend.docker.capability_apply import (
# CapabilityApplyError,
# apply_capability_change,
# )
from ..backend.docker.egress_apply import (
EgressApplyError,
applicator as _docker_applicator,
@@ -38,10 +33,6 @@ from ..backend.smolmachines.egress_apply import (
)
from ..log import Die, error, info
class CapabilityApplyError(RuntimeError):
"""Placeholder while capability_apply is disabled."""
from ..supervise import (
COMPONENT_FOR_TOOL,
AuditEntry,
@@ -50,12 +41,10 @@ from ..supervise import (
STATUS_APPROVED,
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_ALLOW,
TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
TOOL_EGRESS_TOKEN_ALLOW,
archive_proposal,
list_pending_proposals,
render_diff,
write_audit_entry,
@@ -83,7 +72,7 @@ class QueuedProposal:
# Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses.
ApplyError = (CapabilityApplyError, EgressApplyError)
ApplyError = (EgressApplyError,)
def apply_routes_change(slug: str, content: str) -> tuple[str, str]:
@@ -143,8 +132,6 @@ def _detail_lines(
def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
if tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
return ".yaml"
if tool in (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW):
@@ -166,17 +153,6 @@ def approve(
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
diff_before, diff_after = "", ""
# if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
# _meta = read_metadata(qp.proposal.bottle_slug)
# if _meta is not None and not _meta.compose_project:
# raise CapabilityApplyError(
# "capability-block remediation is not supported for smolmachines "
# "bottles. Reject this proposal or handle the capability change "
# "manually, then restart the bottle."
# )
# diff_before, diff_after = apply_capability_change(
# qp.proposal.bottle_slug, file_to_apply,
# )
if qp.proposal.tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
diff_before, diff_after = apply_routes_change(
qp.proposal.bottle_slug,
@@ -194,9 +170,6 @@ def approve(
qp, action=status, notes=notes,
diff_before=diff_before, diff_after=diff_after,
)
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
archive_proposal(qp.queue_dir, qp.proposal.id)
def reject(qp: QueuedProposal, *, reason: str) -> None:
"""Write a rejection response and an audit entry."""
@@ -346,7 +319,7 @@ def _list_once() -> int:
return 0
def _try_init_green() -> int:
def _try_init_green() -> int: # pragma: no cover
"""Initialise a green color pair and return its attr, or 0."""
try:
curses.start_color()
@@ -357,7 +330,7 @@ def _try_init_green() -> int:
return 0
def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore # pragma: no cover
curses.curs_set(0)
stdscr.timeout(_REFRESH_INTERVAL_MS)
green_attr = _try_init_green()
@@ -447,7 +420,7 @@ def _render(
status_line: str,
*,
green_attr: int = 0, # noqa: F841 — unused, but required by interface
) -> None:
) -> None: # pragma: no cover
stdscr.erase()
h, w = stdscr.getmaxyx()
header = f"bot-bottle supervise ({len(pending)} pending)"
@@ -498,7 +471,7 @@ def _detail_view(
qp: QueuedProposal,
*,
green_attr: int = 0,
) -> None:
) -> None: # pragma: no cover
"""Render the full proposal. Scrollable. Press q to return."""
lines = _detail_lines(qp, green_attr=green_attr)
offset = 0
@@ -550,7 +523,7 @@ def _detail_view(
return
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore # pragma: no cover
"""Suspend curses, open $EDITOR on the proposed file, return edited content."""
suffix = _suffix_for_tool(qp.proposal.tool)
curses.endwin()
@@ -561,7 +534,7 @@ def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
return edited
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore # pragma: no cover
"""One-line input at the bottom of the screen."""
curses.curs_set(1)
h, _ = stdscr.getmaxyx()
+4 -2
View File
@@ -20,6 +20,7 @@ from ...agent_provider import (
AgentProvisionDir,
AgentProvisionFile,
AgentProvisionPlan,
provider_startup_args,
)
from ...backend.docker import util as docker_mod
from ...egress import EgressRoute
@@ -90,7 +91,6 @@ _RUNTIME = AgentProviderRuntime(
prompt_mode="append_file",
bypass_args=("--dangerously-skip-permissions",),
resume_args=("--continue",),
remote_control_args=("--remote-control",),
)
@@ -115,8 +115,9 @@ class ClaudeAgentProvider(AgentProvider):
color: str = "",
provider_settings: dict[str, object] | None = None,
) -> AgentProvisionPlan:
del forward_host_credentials, host_env, provider_settings
del forward_host_credentials, host_env
resolved_guest_env = dict(guest_env or {})
startup_args = provider_startup_args(provider_settings)
guest_home = self.guest_home
trusted_path = trusted_project_path or guest_home
@@ -199,6 +200,7 @@ class ClaudeAgentProvider(AgentProvider):
env_vars=env_vars,
guest_env=resolved_guest_env,
has_prompt=has_prompt,
startup_args=startup_args,
dirs=dirs,
files=tuple(files),
egress_routes=egress_routes,
+9 -6
View File
@@ -1,12 +1,12 @@
# bot-bottle Codex provider image.
#
# Mirrors the default Claude image shape: Node LTS, git/network tooling,
# non-root node user, and the provider CLI installed globally.
# non-root node user, and the provider CLI installed for that user.
FROM node:22-slim
RUN apt-get update \
&& apt-get install -y --no-install-recommends git ca-certificates curl \
&& apt-get install -y --no-install-recommends git ca-certificates curl procps \
&& rm -rf /var/lib/apt/lists/*
# App-specific deps. Python isn't required by codex itself
@@ -17,12 +17,15 @@ RUN apt-get update \
&& apt-get install -y --no-install-recommends python3 python3-pip python3-venv \
&& rm -rf /var/lib/apt/lists/*
RUN npm install -g --no-fund --no-audit @openai/codex@0.136.0 \
&& npm cache clean --force
USER node
WORKDIR /home/node
RUN mkdir -p /home/node/.codex
ENV PATH="/home/node/.local/bin:${PATH}"
# Remote-control support requires the standalone Codex install layout
# under ~/.codex/packages/standalone/current. The npm package can run
# the TUI, but remote-control commands expect this installer-owned path.
RUN mkdir -p /home/node/.codex \
&& curl -fsSL https://chatgpt.com/codex/install.sh | sh
CMD ["codex"]
+4 -2
View File
@@ -22,6 +22,7 @@ from ...agent_provider import (
AgentProvisionCommand,
AgentProvisionFile,
AgentProvisionPlan,
provider_startup_args,
)
from .codex_auth import codex_host_access_token, write_codex_dummy_auth_file
from ...egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
@@ -54,7 +55,6 @@ _RUNTIME = AgentProviderRuntime(
prompt_mode="read_prompt_file",
bypass_args=("--dangerously-bypass-approvals-and-sandbox",),
resume_args=("resume", "--last"),
remote_control_args=(),
)
@@ -79,8 +79,9 @@ class CodexAgentProvider(AgentProvider):
color: str = "",
provider_settings: dict[str, object] | None = None,
) -> AgentProvisionPlan:
del auth_token, label, color, provider_settings
del auth_token, label, color
resolved_guest_env = dict(guest_env or {})
startup_args = provider_startup_args(provider_settings)
guest_home = self.guest_home
trusted_path = trusted_project_path or guest_home
@@ -163,6 +164,7 @@ class CodexAgentProvider(AgentProvider):
env_vars=env_vars,
guest_env=resolved_guest_env,
has_prompt=has_prompt,
startup_args=startup_args,
dirs=tuple(dirs),
files=tuple(files),
pre_copy=tuple(pre_copy),
+3 -1
View File
@@ -21,6 +21,7 @@ from ...agent_provider import (
AgentProvisionDir,
AgentProvisionFile,
AgentProvisionPlan,
provider_startup_args,
)
from ...egress import EgressRoute
from ...log import die, info
@@ -165,7 +166,6 @@ _RUNTIME = AgentProviderRuntime(
prompt_mode="append_system_prompt",
bypass_args=(),
resume_args=(),
remote_control_args=(),
)
@@ -199,6 +199,7 @@ class PiAgentProvider(AgentProvider):
models_payload, base_url, api_key_env, models, provider_name = (
_pi_models_json(settings)
)
extra_startup_args = provider_startup_args(provider_settings)
models_file = state_dir / "pi-models.json"
models_file.write_text(json.dumps(models_payload, indent=2) + "\n")
models_file.chmod(0o600)
@@ -219,6 +220,7 @@ class PiAgentProvider(AgentProvider):
startup_args=(
"--models",
",".join(f"{provider_name}/{model}" for model in models),
*extra_startup_args,
),
dirs=(AgentProvisionDir(f"{guest_home}/.pi/agent"),),
files=(AgentProvisionFile(models_file, _models_path(guest_home)),),
+159 -3
View File
@@ -15,6 +15,8 @@ import gzip
import re
import typing
import unicodedata
from math import log2
from collections import Counter
from urllib.parse import quote as url_quote
try:
@@ -107,20 +109,21 @@ def redact_tokens(
text: str,
*,
env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> str:
"""Replace token pattern matches and (if env given) provisioned secrets with REDACT."""
for _, pattern in TOKEN_PATTERNS:
text = pattern.sub(REDACT, text)
if env is not None:
for key, value in env.items():
if key.startswith("EGRESS_TOKEN_") and value:
if any(key.startswith(p) for p in sensitive_prefixes) and value:
for variant in _encoded_variants(value):
text = text.replace(variant, REDACT)
return text
# ---------------------------------------------------------------------------
# Known secrets detector (Phase 1b)
# Known secrets detector
# ---------------------------------------------------------------------------
def _encoded_variants(secret: str) -> list[str]:
@@ -161,18 +164,65 @@ def _encoded_variants(secret: str) -> list[str]:
return variants
# ---------------------------------------------------------------------------
# Fragmentation-resistant helpers
# ---------------------------------------------------------------------------
# Minimum length of alnum projection for projection-based checks to run.
# Short secrets produce too many false positives in projection space.
_ALNUM_MIN_LEN = 8
# Minimum window length for the partial-substring sliding scan.
PARTIAL_MATCH_MIN_LEN = 12
def _alnum_projection(text: str) -> str:
"""Return text with every non-alphanumeric character stripped.
Used for fragmentation-resistant matching: separator-injected secrets
(spaces, hyphens, dots inserted between characters) are identical to
their originals in alnum projection space.
"""
return "".join(c for c in text if c.isalnum())
def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None:
"""Return the position in text_alnum where any min_len-char window of
secret_alnum first appears, or None.
Slides a window of width min_len across secret_alnum and searches for
each window in text_alnum. The first hit position is returned.
"""
if len(secret_alnum) < min_len or len(text_alnum) < min_len:
return None
for i in range(len(secret_alnum) - min_len + 1):
window = secret_alnum[i:i + min_len]
pos = text_alnum.find(window)
if pos >= 0:
return pos
return None
def scan_known_secrets(
text: str,
*,
location: str = "body",
env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
safe_tokens: typing.AbstractSet[str] | None = None,
) -> ScanResult | None:
if env is None:
return None
# Pre-compute alnum projection of the scan text once; reused per secret.
text_alnum: str | None = None
for key, value in env.items():
if not key.startswith("EGRESS_TOKEN_") or not value:
if not any(key.startswith(p) for p in sensitive_prefixes) or not value:
continue
# Pass 1: exact match across encoded variants (original behaviour).
approved_exact = False
for variant in _encoded_variants(value):
pos = text.find(variant)
if pos >= 0:
@@ -180,6 +230,7 @@ def scan_known_secrets(
# (PRD 0062); a different encoding of the same secret is a
# fresh block.
if safe_tokens is not None and variant in safe_tokens:
approved_exact = True
continue
return ScanResult(
severity="block",
@@ -188,6 +239,104 @@ def scan_known_secrets(
context=_snippet(text, pos, pos + len(variant)),
matched=variant,
)
if approved_exact:
# Exact match was found and approved; projection passes would
# fire on the same value, so skip them for this secret.
continue
# Pass 2 & 3: fragmentation-resistant projection checks.
secret_alnum = _alnum_projection(value)
if len(secret_alnum) < _ALNUM_MIN_LEN:
continue
if text_alnum is None:
text_alnum = _alnum_projection(text)
# Pass 2: full alnum-projection exact match (catches separator injection).
pos2 = text_alnum.find(secret_alnum)
if pos2 >= 0:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(fragmented match — separator injection)"
),
location=location,
context=_snippet(text_alnum, pos2, pos2 + len(secret_alnum)),
)
# Pass 3: sliding-window partial match (catches chunked-substring leaks).
pos3 = _find_partial_window(secret_alnum, text_alnum, PARTIAL_MATCH_MIN_LEN)
if pos3 is not None:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(partial match — at least {PARTIAL_MATCH_MIN_LEN} consecutive "
f"alphanumeric chars)"
),
location=location,
context=_snippet(text_alnum, pos3, pos3 + PARTIAL_MATCH_MIN_LEN),
)
return None
# ---------------------------------------------------------------------------
# Entropy detector (warn-only)
# ---------------------------------------------------------------------------
# Sliding window size and step for the entropy scan.
ENTROPY_WINDOW = 64
ENTROPY_STEP = 32
# Bits-per-character threshold. Random ASCII printable ≈ 6.6 bits; random
# lowercase hex ≈ 4 bits; random base64url ≈ 6 bits. 5.5 sits above
# typical structured data (JSON, URLs) while staying below truly random
# content.
ENTROPY_BLOCK_THRESHOLD = 5.5
def _shannon_entropy(text: str) -> float:
if not text:
return 0.0
counts = Counter(text)
n = len(text)
return -sum((c / n) * log2(c / n) for c in counts.values())
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW,
threshold: float = ENTROPY_BLOCK_THRESHOLD,
) -> ScanResult | None:
"""Warn-only detector: flag windows of `window` chars with Shannon entropy
above `threshold` bits per character.
Never blocks; always returns severity='warn'. Disabled by default
routes must opt in via dlp.outbound_detectors=['entropy'].
"""
if not text:
return None
step = max(1, window // 2)
end = len(text)
# Scan overlapping windows; also check the final tail if shorter than window.
positions = list(range(0, end - window + 1, step))
if end < window:
positions = [0]
elif (end - window) % step != 0:
positions.append(end - window)
for i in positions:
chunk = text[i:i + window]
if _shannon_entropy(chunk) >= threshold:
return ScanResult(
severity="warn",
reason=f"high-entropy content in {location} (possible encrypted exfil)",
location=location,
context=_snippet(text, i, i + len(chunk)),
)
return None
@@ -306,11 +455,18 @@ def scan_crlf_injection(text: str) -> ScanResult | None:
__all__ = [
"ENTROPY_BLOCK_THRESHOLD",
"ENTROPY_WINDOW",
"ENTROPY_STEP",
"PARTIAL_MATCH_MIN_LEN",
"REDACT",
"SNIPPET_CONTEXT",
"TOKEN_PATTERNS",
"_alnum_projection",
"_shannon_entropy",
"redact_tokens",
"scan_crlf_injection",
"scan_entropy",
"scan_known_secrets",
"scan_naive_injection",
"scan_token_patterns",
+55
View File
@@ -10,6 +10,7 @@ specific and lives on concrete subclasses (see
from __future__ import annotations
import dataclasses
import secrets
from abc import ABC
from dataclasses import dataclass
from pathlib import Path
@@ -34,6 +35,50 @@ EGRESS_HOSTNAME = "egress"
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name
_CANARY_ENV_WORDS = (
"ACCORD",
"ANCHOR",
"ATLAS",
"CANON",
"CIPHER",
"EMBER",
"FALCON",
"HARBOR",
"LANTERN",
"MARBLE",
"NOVA",
"ORBIT",
"PIVOT",
"RADIUS",
"SUMMIT",
"VECTOR",
)
def _random_canary_env() -> str:
first = secrets.choice(_CANARY_ENV_WORDS)
remaining = tuple(word for word in _CANARY_ENV_WORDS if word != first)
second = secrets.choice(remaining)
return f"{first}_{second}_SECRET"
def egress_sidecar_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return sidecar env entries needed by egress across all backends."""
env: list[str] = []
if plan.routes:
env.extend(sorted(plan.token_env_map.keys()))
if plan.canary and plan.canary_env:
env.append(f"{plan.canary_env}={plan.canary}")
env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.canary_env}")
return tuple(env)
def egress_agent_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return agent-visible egress env entries shared by all backends."""
if plan.canary and plan.canary_env:
return (f"{plan.canary_env}={plan.canary}",)
return ()
@dataclass(frozen=True)
class EgressRoute(Route):
@@ -65,6 +110,8 @@ class EgressPlan:
mitmproxy_ca_host_path: Path = Path()
mitmproxy_ca_cert_only_host_path: Path = Path()
log: int = 0
canary: str = ""
canary_env: str = ""
def egress_manifest_routes(
@@ -324,12 +371,18 @@ class Egress(ABC):
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
routes_path.write_text(egress_render_routes(routes, log=log))
routes_path.chmod(0o600)
# Generate a per-session fake secret under a plausible random env name.
# The sidecar marks that exact env name as sensitive for known-secret
# scanning; the agent receives the same name/value as exfil bait.
canary = secrets.token_urlsafe(32)
return EgressPlan(
slug=slug,
routes_path=routes_path,
routes=routes,
token_env_map=egress_token_env_map(routes),
log=log,
canary=canary,
canary_env=_random_canary_env(),
)
__all__ = [
@@ -344,5 +397,7 @@ __all__ = [
"egress_render_routes",
"egress_resolve_token_values",
"egress_routes_for_bottle",
"egress_agent_env_entries",
"egress_sidecar_env_entries",
"egress_token_env_map",
]
+15 -4
View File
@@ -160,26 +160,37 @@ class EgressAddon:
)
def _log_request(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.request.headers.items()
if k.lower() != "authorization"
}
body = redact_tokens(flow.request.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_request",
"host": redact_tokens(flow.request.pretty_host, env=os.environ),
"method": flow.request.method,
"path": redact_tokens(flow.request.path, env=os.environ),
"headers": dict(flow.request.headers),
"body": flow.request.get_text(strict=False) or "",
"headers": headers,
"body": body,
})
+ "\n"
)
def _log_response(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.response.headers.items()
}
body = redact_tokens(flow.response.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_response",
"host": flow.request.pretty_host,
"status": flow.response.status_code,
"headers": dict(flow.response.headers),
"body": flow.response.get_text(strict=False) or "",
"headers": headers,
"body": body,
})
+ "\n"
)
+32 -13
View File
@@ -34,7 +34,7 @@ VALID_METHODS = frozenset({
"CONNECT",
})
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"})
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"})
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
# Per-route policy for what the proxy does when an outbound DLP detector
@@ -439,15 +439,6 @@ def route_to_yaml_dict(r: Route) -> dict[str, object]:
return d
def load_routes(text: str) -> tuple[Route, ...]:
"""Parse YAML text → routes."""
try:
payload = parse_yaml_subset(text)
except YamlSubsetError as e:
raise ValueError(f"routes payload: invalid YAML: {e}") from e
return parse_routes(payload)
def parse_config(payload: object) -> "Config":
"""Parse a full egress config payload (top-level log level + routes)."""
if not isinstance(payload, dict):
@@ -729,17 +720,28 @@ def scan_outbound(
try:
from dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection,
scan_entropy,
scan_known_secrets,
scan_token_patterns,
)
except ImportError: # pragma: no cover - host-side path
from .dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection,
scan_entropy,
scan_known_secrets,
scan_token_patterns,
)
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
# Binary bodies: latin-1 is a bijective byte↔codepoint mapping that
# preserves every byte value, so ASCII-range secret strings remain
# findable by str.find / regex. Prefer strict UTF-8 for valid text bodies.
if isinstance(body, bytes):
try:
text = body.decode("utf-8")
except UnicodeDecodeError:
text = body.decode("latin-1")
else:
text = body
# CRLF injection is only an attack in the request line + headers, never the
# body: an HTTP body is delimited by Content-Length, so CRLF bytes there
@@ -758,12 +760,30 @@ def scan_outbound(
return result
if _detector_enabled(route.outbound_detectors, "known_secrets"):
# BOT_BOTTLE_SENSITIVE_PREFIXES lets operators add extra env prefixes
# beyond EGRESS_TOKEN_* without changing the manifest schema.
extra_raw = environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "")
extra = tuple(p for p in extra_raw.split(",") if p)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
result = scan_known_secrets(
text, location="body", env=environ, safe_tokens=safe_tokens,
text, location="body", env=environ,
sensitive_prefixes=sensitive_prefixes, safe_tokens=safe_tokens,
)
if result is not None:
return result
# Entropy scanning requires explicit opt-in: it is NOT part of the
# default "all detectors" set because it produces false positives on
# legitimate base64 / binary payloads. Routes must list "entropy" in
# dlp.outbound_detectors to enable it.
if (
route.outbound_detectors is not None
and "entropy" in route.outbound_detectors
):
result = scan_entropy(text, location="body")
if result is not None:
return result
return None
@@ -833,7 +853,6 @@ __all__ = [
"is_git_push_request",
"is_git_fetch_request",
"load_config",
"load_routes",
"match_route",
"outbound_scan_headers",
"parse_config",
+28 -6
View File
@@ -199,13 +199,10 @@ def _parse_provider_settings(
) -> dict[str, object]:
if raw is None:
return {}
if template != "pi":
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings is only "
"supported for template 'pi'"
)
settings = as_json_object(raw, f"bottle '{bottle_name}' agent_provider.settings")
allowed = {
common_allowed = {"startup_args"}
pi_allowed = {
"provider",
"base_url",
"api",
@@ -218,12 +215,37 @@ def _parse_provider_settings(
"supports_developer_role",
"supports_reasoning_effort",
}
if template == "pi":
allowed = common_allowed | pi_allowed
elif template in ("claude", "codex"):
allowed = common_allowed
elif template not in PROVIDER_TEMPLATES:
return dict(settings)
else:
allowed = common_allowed
for key in settings:
if key not in allowed:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings has unknown "
f"key {key!r}; allowed: {', '.join(sorted(allowed))}"
)
startup_args = settings.get("startup_args")
if startup_args is not None:
if not isinstance(startup_args, list):
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings.startup_args "
f"must be an array of strings"
)
for i, arg in enumerate(startup_args):
if not isinstance(arg, str) or not arg:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings."
f"startup_args[{i}] must be a non-empty string"
)
if template != "pi":
return dict(settings)
for key in ("provider", "base_url", "api", "api_key", "api_key_env"):
value = settings.get(key)
if value is not None and (not isinstance(value, str) or not value):
+8 -3
View File
@@ -47,11 +47,11 @@ from pathlib import Path
try:
# Same-directory imports inside the bundle container; these files are
# COPYed flat under /app by Dockerfile.sidecars.
from egress_addon_core import load_routes
from egress_addon_core import LOG_OFF, load_config
import supervise as _sv
except ModuleNotFoundError:
# Package imports for host-side tests and tooling.
from .egress_addon_core import load_routes
from .egress_addon_core import LOG_OFF, load_config
from . import supervise as _sv
@@ -297,12 +297,17 @@ def validate_proposed_file(tool: str, content: str) -> None:
pass
elif tool in (_sv.TOOL_EGRESS_ALLOW, _sv.TOOL_EGRESS_BLOCK):
try:
load_routes(content)
config = load_config(content)
except ValueError as e:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml is not valid: {e}",
) from e
if config.log != LOG_OFF:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml must not change egress logging",
)
else:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
@@ -0,0 +1,189 @@
# PRD 0063: Strengthen outbound exfiltration detection
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-25
- **Issue:** #259
## Summary
The current DLP system has two detection tiers: weak (vendor token regexes)
and strong (exact matching of provisioned `EGRESS_TOKEN_*` values in multiple
encodings). This PRD adds four complementary hardening layers in ranked order
of impact:
1. **Canary tokens** — inject a per-session fake secret into the agent's
environment; any outbound appearance is a zero-false-positive exfil signal.
2. **Broadened known-value scanning** — extend the strong tier beyond
`EGRESS_TOKEN_*` via a `BOT_BOTTLE_SENSITIVE_PREFIXES` sidecar env var.
3. **Fragmentation-resistant matching** — catch separator injection and
long-substring leaks via alnum projection and sliding-window substring
search.
4. **Entropy scoring** — warn-only flag on high-entropy blobs as a
complementary signal for encrypted exfil.
## Problem
After PRD 0056 the outbound scan covers all HTTP request surfaces (host, path,
query, headers, body). But the content-based strong tier only matches
`EGRESS_TOKEN_*` values and their encodings. Three gaps remain:
| Gap | How an agent exploits it |
|-----|--------------------------|
| No canary | There is no "can't be false-positive" signal |
| Narrow prefix filter | Sensitive values provisioned under other env prefixes (MCP keys, API keys injected via the agent provider) are invisible to `scan_known_secrets` |
| Fragmentation blindness | Inserting `-`, space, or `\n` between secret characters turns an exact-match into a miss |
## Goals / Success Criteria
1. Each launched bottle has a unique canary token in the agent's environment
under a randomized `WORD_WORD_SECRET` env var name. The egress sidecar gets
the same env var and registers that exact name through
`BOT_BOTTLE_SENSITIVE_PREFIXES`. Any outbound appearance of the canary
blocks the request as a known-secret match.
2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default:
`("EGRESS_TOKEN_",)`). `scan_outbound` reads
`BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes
in, so operators can mark additional env vars as scanned values without
changing the manifest schema.
3. For every secret that passes exact-match, a secondary alnum-projection pass
checks for the secret with all non-alphanumeric characters stripped. This
catches separator-injection evasion (`MY-SECRET` → body contains
`MY SECRET`).
4. A sliding-window partial-match pass checks for long-enough contiguous
substrings of the secret's alnum projection in the text's alnum projection.
Any match ≥ `PARTIAL_MATCH_MIN_LEN` (12 chars) blocks with reason
`"partial match"`.
5. A new `scan_entropy` detector flags outbound text windows with Shannon
entropy ≥ `ENTROPY_BLOCK_THRESHOLD` (5.5 bits/char) at **warn** severity
only. It is registered under the new detector name `"entropy"` in
`OUTBOUND_DETECTOR_NAMES` and disabled by default (routes must opt in).
6. Binary request bodies are decoded via `latin-1` instead of
`utf-8 errors="replace"`, preserving every byte value and allowing
ASCII-range secrets to be found within binary payloads.
7. All new behaviour is unit-tested; existing tests pass unchanged.
## Non-goals
- Rolling per-host buffer for split-across-requests detection (state in the
stateless addon is complex; deferred).
- Additional vendor regexes.
- ML / embedding-based detection.
- Entropy-based hard blocks (warn only per the issue).
## Design
### Canary token flow
```
Egress.prepare()
canary = secrets.token_urlsafe(32)
canary_env = <random WORD_WORD_SECRET>
EgressPlan(canary=canary, canary_env=canary_env, ...)
Docker compose render:
sidecar env: <canary_env>=<canary>
sidecar env: BOT_BOTTLE_SENSITIVE_PREFIXES=<canary_env>
agent env: <canary_env>=<canary> ← visible to agent as a "secret"
macos-container launch: same literals added to sidecar + agent env entries
```
The sidecar uses `BOT_BOTTLE_SENSITIVE_PREFIXES` to make the random canary env
name part of the existing `scan_known_secrets` detector without adding a
manifest schema field.
### Broadened known-value scanning
`scan_known_secrets` gains a `sensitive_prefixes` parameter:
```python
def scan_known_secrets(
text: str,
*,
location: str = "body",
env: Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> ScanResult | None:
```
`scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` (comma-separated list
of additional prefixes) from `environ` and appends them:
```python
extra = tuple(
p for p in environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "").split(",") if p
)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
```
`redact_tokens` receives the same treatment for consistent redaction.
### Fragmentation-resistant matching
A new helper `_alnum_projection(text)` strips all non-alphanumeric characters.
`scan_known_secrets` runs two passes per secret:
1. **Exact pass** — existing encoded-variant loop (unchanged).
2. **Alnum-projection pass** — if the secret's alnum projection has ≥ 8 chars,
check if it appears in the text's alnum projection. Match → block with
`"fragmented match (separator injection)"` reason.
3. **Partial-substring pass** — if the secret's alnum projection has ≥
`PARTIAL_MATCH_MIN_LEN` chars (12), slide a window of that length across the
secret's projection and look for each window in the text's alnum projection.
First match → block with `"partial match"` reason.
All three passes run only for the `"known_secrets"` detector; the token-pattern
and entropy detectors are unchanged.
### Entropy scoring
New public function:
```python
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW, # 64
threshold: float = ENTROPY_BLOCK_THRESHOLD, # 5.5
) -> ScanResult | None:
```
Slides a window of `window` characters across `text` in steps of `window // 2`.
If any window's Shannon entropy exceeds `threshold`, returns a **warn**-severity
`ScanResult`. Never blocks.
`OUTBOUND_DETECTOR_NAMES` gains `"entropy"`. Routes opt in via their `dlp`
block; entropy scanning is **off by default** to avoid false-positive noise on
legitimate binary payloads.
### Binary body handling
In `scan_outbound`, the bytes → str decoding changes from:
```python
body.decode("utf-8", errors="replace")
```
to:
```python
body.decode("utf-8") if body is str else body.decode("latin-1")
```
`latin-1` is a bijective byte↔codepoint mapping; every byte value is preserved
as its corresponding Latin-1 code point, so ASCII-range secret strings remain
intact and `str.find` / regex still locate them correctly. The fallback from
strict UTF-8 is tried first so valid UTF-8 bodies are decoded faithfully.
## Implementation
Delivered in three commits on the same branch:
1. **DLP detector changes**`_alnum_projection`, fragmentation passes,
`scan_entropy`, broadened `scan_known_secrets`, updated `scan_outbound` and
`redact_tokens`; all accompanying unit tests.
2. **Canary injection**`EgressPlan.canary`, `Egress.prepare()`,
Docker compose + macos-container backend injection.
3. **PRD flip**`Status: Draft → Active`.
@@ -0,0 +1,85 @@
# PRD 0064: LOG_FULL egress logging credential redaction
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-25
- **Issue:** #257
## Summary
The `LOG_FULL` egress logging path (`_log_request` and `_log_response` in `egress_addon.py`) writes request/response headers and bodies to stderr without redaction and includes the sidecar-injected upstream `Authorization` header verbatim. This PR applies `redact_tokens` to header values and bodies in both log functions and strips the injected `Authorization` header from request logs entirely.
## Problem
`LOG_FULL` (log level 2) is intended for debugging egress traffic. When active it calls `_log_request` and `_log_response`. Both functions have two related bugs:
1. **Injected `Authorization` header exposure.** `_log_request` is called *after* the sidecar injects upstream credentials (`flow.request.headers["authorization"] = decision.inject_authorization`). The full header dict — including the live credential — is serialized to stderr. Any log collector that ingests the egress container's stderr will receive the upstream bearer token in plaintext.
2. **Unredacted bodies and header values.** Neither `_log_request` nor `_log_response` passes body or header values through `redact_tokens`. By contrast, `_req_ctx` (used for block/warn events) already calls `redact_tokens` on path and host. Any provisioned secret or recognized token pattern that appears in a request body, response body, or non-Authorization header value will be logged verbatim under `LOG_FULL`.
These two bugs compose: an agent that enables `LOG_FULL` and simultaneously triggers a request that carries a known token gains a write path from credentials → egress logs.
## Goals / Success Criteria
- `_log_request` never logs the `authorization` header in any form.
- `_log_request` applies `redact_tokens(value, env=os.environ)` to every other header value before serializing.
- `_log_request` applies `redact_tokens(body, env=os.environ)` to the request body before logging.
- `_log_response` applies `redact_tokens(value, env=os.environ)` to every response header value before logging.
- `_log_response` applies `redact_tokens(body, env=os.environ)` to the response body before logging.
- Unit tests cover each of the five cases above.
## Non-goals
- Redacting host or path in the full-log path (already covered by `_req_ctx` for block/warn events; `_log_request` already calls `redact_tokens` on host and path).
- Suppressing `LOG_FULL` or adding a new log level.
- Changing the outbound DLP scan logic.
## Design
### `_log_request`
```python
def _log_request(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.request.headers.items()
if k.lower() != "authorization"
}
body = redact_tokens(flow.request.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_request",
"host": redact_tokens(flow.request.pretty_host, env=os.environ),
"method": flow.request.method,
"path": redact_tokens(flow.request.path, env=os.environ),
"headers": headers,
"body": body,
})
+ "\n"
)
```
The `authorization` key is excluded because by the time `_log_request` is called the sidecar has already injected the upstream credential (`decision.inject_authorization`). Logging it would write a live bearer token to stderr on every allowed request. There is no safe subset to log — the value is always a live credential or empty.
### `_log_response`
```python
def _log_response(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.response.headers.items()
}
body = redact_tokens(flow.response.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_response",
"host": flow.request.pretty_host,
"status": flow.response.status_code,
"headers": headers,
"body": body,
})
+ "\n"
)
```
Response headers don't carry injected credentials, so no header name is suppressed — only the values are scrubbed by `redact_tokens`.
+1
View File
@@ -4,3 +4,4 @@
pylint>=3.0.0
pyright>=1.1.300
coverage>=7.0.0
+7 -4
View File
@@ -92,9 +92,9 @@ class TestSandboxEscape(unittest.TestCase):
"on PATH: curl -sSL https://smolmachines.com/install.sh | sh"
)
# Throwaway "identity file" for the git-gate's `identity` field.
# It need not be a real SSH key: test 5 reaches gitleaks before
# any SSH attempt anyway.
# Throwaway static key for the git-gate fixture. It need not
# be a real SSH key: test 5 reaches gitleaks before any SSH
# attempt anyway.
fd, kp = tempfile.mkstemp(prefix="sandbox-test-key.")
os.close(fd)
cls._key_path = Path(kp)
@@ -123,7 +123,10 @@ class TestSandboxEscape(unittest.TestCase):
"git-gate": {"repos": {
"throwaway": {
"url": "ssh://git@unreachable.invalid:22/throwaway.git",
"identity": str(cls._key_path),
"key": {
"provider": "static",
"path": str(cls._key_path),
},
},
}},
},
@@ -198,6 +198,7 @@ class TestSmolmachinesLaunch(unittest.TestCase):
# connect fails, which is the property chunk 3 will
# preserve once egress is actually running.
r = self.bottle.exec(
"env -u HTTPS_PROXY -u HTTP_PROXY -u https_proxy -u http_proxy "
f"curl -s --show-error --max-time 3 http://{self.plan.bundle_ip}:9099 "
"2>&1 || true"
)
+46
View File
@@ -168,6 +168,34 @@ class TestAgentProviderRuntime(unittest.TestCase):
self.assertEqual("~/.claude/statusline.sh", settings["statusLine"]["command"])
self.assertEqual("custom:bot-bottle-research-ui", settings["theme"])
def test_claude_plan_uses_startup_args_from_provider_settings(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="claude",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
provider_settings={
"startup_args": ["--model", "opus"],
},
)
self.assertEqual(("--model", "opus"), plan.startup_args)
def test_codex_plan_uses_startup_args_from_provider_settings(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="codex",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
provider_settings={
"startup_args": ["--model", "gpt-5-codex"],
},
)
self.assertEqual(("--model", "gpt-5-codex"), plan.startup_args)
def test_codex_forward_host_credentials_populates_egress_routes(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
home = Path(tmp) / "host-codex"
@@ -394,6 +422,24 @@ class TestAgentProviderRuntime(unittest.TestCase):
self.assertNotIn("OPENROUTER_API_KEY", plan.guest_env)
self.assertTrue(provider["compat"]["supportsReasoningEffort"])
def test_pi_plan_appends_startup_args_from_provider_settings(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="pi",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
provider_settings={
"models": ["qwen3:14b"],
"startup_args": ["--no-stream"],
},
)
self.assertEqual(
("--models", "ollama/qwen3:14b", "--no-stream"),
plan.startup_args,
)
def test_pi_prompt_mode_appends_system_prompt_interactively(self):
self.assertEqual(
["--append-system-prompt", "/home/node/.bot-bottle-prompt.txt"],
+21
View File
@@ -102,6 +102,27 @@ class TestAttachAgent(unittest.TestCase):
bottle.argv,
)
def test_remote_control_is_provider_startup_arg(self):
class Bottle:
argv: list[str] = []
def exec_agent(self, argv: list[str], *, tty: bool = True) -> int:
self.argv = list(argv)
return 0
bottle = Bottle()
exit_code = start_mod.attach_agent(
bottle, # type: ignore[arg-type]
agent_provider_template="codex",
startup_args=("remote-control",),
)
self.assertEqual(0, exit_code)
self.assertEqual(
["--dangerously-bypass-approvals-and-sandbox", "remote-control"],
bottle.argv,
)
if __name__ == "__main__":
unittest.main()
+23 -2
View File
@@ -80,7 +80,11 @@ def _git_gate_plan(upstreams: tuple[GitGateUpstream, ...] = ()) -> GitGatePlan:
)
def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan:
def _egress_plan(
routes: tuple[EgressRoute, ...] = (),
*,
canary: bool = False,
) -> EgressPlan:
token_env_map = {
r.token_env: r.token_ref
for r in routes
@@ -95,6 +99,8 @@ def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan:
egress_network=f"bot-bottle-egress-{SLUG}",
mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem",
mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem",
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
)
@@ -112,6 +118,7 @@ def _plan(
with_git: bool = False,
with_egress: bool = False,
supervise: bool = False,
canary: bool = False,
) -> DockerBottlePlan:
"""Build a fully-resolved DockerBottlePlan. Toggles cover the
matrix the renderer's conditional-service logic branches on."""
@@ -150,7 +157,7 @@ def _plan(
slug=SLUG,
forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"},
git_gate_plan=_git_gate_plan(upstreams),
egress_plan=_egress_plan(routes),
egress_plan=_egress_plan(routes, canary=canary),
supervise_plan=_supervise_plan() if supervise else None,
use_runsc=False,
agent_provision=AgentProvisionPlan(
@@ -375,6 +382,20 @@ class TestSidecarBundleShape(unittest.TestCase):
env_strings = sc["environment"]
self.assertNotIn("EGRESS_TOKEN_0", env_strings)
def test_canary_env_registered_as_sensitive_in_sidecar(self):
sc = self._render(canary=True)["services"]["sidecars"]
env_strings = sc["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
env_strings,
)
def test_canary_env_visible_to_agent(self):
agent = self._render(canary=True)["services"]["agent"]
env_strings = agent["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
def test_supervise_env_present_when_active(self):
sc = self._render(supervise=True)["services"]["sidecars"]
env_strings = sc["environment"]
@@ -29,6 +29,9 @@ from bot_bottle.supervise import SupervisePlan
_URL = "http://supervise:9100/"
_CODEX_DOCKERFILE = (
Path(__file__).resolve().parents[2] / "bot_bottle/contrib/codex/Dockerfile"
)
def _make_bottle(exec_result: ExecResult | None = None) -> MagicMock:
@@ -276,6 +279,12 @@ class TestCodexProvision(unittest.TestCase):
)
class TestCodexDockerfile(unittest.TestCase):
def test_installs_procps_for_remote_control_pid_management(self):
dockerfile = _CODEX_DOCKERFILE.read_text()
self.assertIn("procps", dockerfile)
class TestCodexSuperviseMcp(unittest.TestCase):
def test_noop_when_supervise_disabled(self):
bottle = _make_bottle()
+192 -2
View File
@@ -1,18 +1,23 @@
"""Unit: DLP detectors (PRD 0053).
Tests for token pattern scanning, known secret detection, and
naive prompt injection detection."""
Tests for token pattern scanning, known secret detection, fragmentation-
resistant matching, entropy scoring, and naive prompt injection detection."""
import base64
import gzip
import unittest
from bot_bottle.dlp_detectors import (
ENTROPY_BLOCK_THRESHOLD,
PARTIAL_MATCH_MIN_LEN,
REDACT,
_alnum_projection,
_encoded_variants,
_normalize_text,
_shannon_entropy,
redact_tokens,
scan_crlf_injection,
scan_entropy,
scan_known_secrets,
scan_naive_injection,
scan_token_patterns,
@@ -502,6 +507,191 @@ class TestStripCrlf(unittest.TestCase):
from bot_bottle.dlp_detectors import strip_crlf
self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello"))
class TestAlnumProjection(unittest.TestCase):
def test_alphanumeric_unchanged(self):
self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ"))
def test_strips_hyphens(self):
self.assertEqual("mysecretvalue", _alnum_projection("my-secret-value"))
def test_strips_spaces(self):
self.assertEqual("mysecretvalue", _alnum_projection("my secret value"))
def test_strips_dots_and_underscores(self):
self.assertEqual("mysecretvalue", _alnum_projection("my.secret_value"))
def test_empty_string(self):
self.assertEqual("", _alnum_projection(""))
def test_all_special_chars(self):
self.assertEqual("", _alnum_projection("!@#$%^&*()"))
class TestFragmentationResistantMatching(unittest.TestCase):
"""scan_known_secrets catches separator-injection and partial-substring evasion."""
# Secrets long enough that their alnum projections are ≥ 8 chars.
SECRET = "supersecrettoken99"
ENV = {"EGRESS_TOKEN_0": SECRET}
def test_exact_match_still_works(self):
result = scan_known_secrets(f"key={self.SECRET}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_separator_injection_blocked(self):
# Hyphens inserted between chars of the secret.
fragmented = "-".join(self.SECRET)
result = scan_known_secrets(f"data={fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("separator injection", result.reason)
def test_space_separator_blocked(self):
fragmented = " ".join(self.SECRET)
result = scan_known_secrets(f"body: {fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("separator injection", result.reason)
def test_partial_substring_blocked(self):
# First PARTIAL_MATCH_MIN_LEN alnum chars of the secret, no separators.
partial = _alnum_projection(self.SECRET)[:PARTIAL_MATCH_MIN_LEN]
result = scan_known_secrets(f"x={partial}&y=other", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("partial match", result.reason)
def test_short_secret_skips_projection(self):
# Secrets shorter than _ALNUM_MIN_LEN in alnum projection are not
# fragmentation-checked (too many false positives).
short_env = {"EGRESS_TOKEN_0": "abc"}
# "a b c" has alnum projection "abc" (3 chars, < 8); should not block.
self.assertIsNone(scan_known_secrets("a b c", env=short_env))
def test_clean_text_not_blocked(self):
self.assertIsNone(scan_known_secrets("nothing to see here", env=self.ENV))
def test_sensitive_prefixes_param_extra_prefix(self):
env = {"MY_CRED_0": self.SECRET, "IGNORED": "other"}
result = scan_known_secrets(
f"key={self.SECRET}",
env=env,
sensitive_prefixes=("MY_CRED_",),
)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("MY_CRED_0", result.reason)
def test_sensitive_prefixes_default_only_egress_token(self):
# A value under a non-EGRESS_TOKEN_ key is ignored with default prefixes.
env = {"MY_CRED_0": self.SECRET}
self.assertIsNone(scan_known_secrets(f"key={self.SECRET}", env=env))
def test_canary_prefix_detected(self):
canary_value = "canary-fake-secret-value-xyz"
env = {"CANON_ALPHA_SECRET": canary_value}
result = scan_known_secrets(
f"x={canary_value}",
env=env,
sensitive_prefixes=("CANON_ALPHA_SECRET",),
)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("CANON_ALPHA_SECRET", result.reason)
class TestRedactTokensBroadenedPrefixes(unittest.TestCase):
SECRET = "my-provisioned-secret"
def test_default_redacts_egress_token(self):
env = {"EGRESS_TOKEN_0": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_extra_prefix_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(
f"val={self.SECRET}",
env=env,
sensitive_prefixes=("MY_SECRET_",),
)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_non_matching_prefix_not_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
# Default prefixes only include EGRESS_TOKEN_ → secret not redacted
self.assertIn(self.SECRET, out)
class TestShannonEntropy(unittest.TestCase):
def test_empty_string_zero(self):
self.assertEqual(0.0, _shannon_entropy(""))
def test_single_char_zero(self):
self.assertEqual(0.0, _shannon_entropy("aaaaaa"))
def test_two_equal_chars_one_bit(self):
self.assertAlmostEqual(1.0, _shannon_entropy("abababab"), places=10)
def test_high_entropy_random_like(self):
# Uniform 64-char string over 64 distinct symbols has entropy 6 bits.
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
text = alphabet # each char appears exactly once
self.assertAlmostEqual(6.0, _shannon_entropy(text), places=10)
class TestScanEntropy(unittest.TestCase):
def test_empty_returns_none(self):
self.assertIsNone(scan_entropy(""))
def test_low_entropy_returns_none(self):
# Highly repetitive text has low entropy.
self.assertIsNone(scan_entropy("a" * 200))
def test_high_entropy_warns(self):
# Build a 64-char string with entropy > ENTROPY_BLOCK_THRESHOLD.
# Use all 64 distinct printable chars to maximise entropy (~6 bits).
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, threshold=ENTROPY_BLOCK_THRESHOLD)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
self.assertIn("high-entropy", result.reason)
def test_never_blocks(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet)
# scan_entropy is warn-only; it must never return severity="block".
if result is not None:
self.assertNotEqual("block", result.severity)
def test_location_in_result(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, location="authorization header")
if result is not None:
self.assertIn("authorization header", result.location)
def test_structured_json_no_warn(self):
# Typical JSON has low entropy and should not be flagged.
json_body = '{"status": "ok", "message": "hello world", "count": 42}'
self.assertIsNone(scan_entropy(json_body))
def test_short_text_below_window(self):
# Text shorter than the window: checked as one chunk.
# Use a uniform string to ensure it won't be flagged.
self.assertIsNone(scan_entropy("abcde", threshold=ENTROPY_BLOCK_THRESHOLD))
if __name__ == "__main__":
unittest.main()
+10
View File
@@ -136,6 +136,16 @@ class TestClaudeArgv(unittest.TestCase):
argv,
)
def test_codex_remote_control_startup_arg_does_not_receive_initial_prompt(self):
argv = _codex_bottle("/home/node/.bot-bottle-prompt.txt").agent_argv(
["--dangerously-bypass-approvals-and-sandbox", "remote-control"],
)
self.assertEqual(
["docker", "exec", "-it", "bot-bottle-dev-abc", "codex",
"--dangerously-bypass-approvals-and-sandbox", "remote-control"],
argv,
)
def test_codex_resume_does_not_append_initial_prompt(self):
argv = _codex_bottle("/home/node/.bot-bottle-prompt.txt").agent_argv(
["--dangerously-bypass-approvals-and-sandbox", "resume", "--last"],
@@ -31,7 +31,6 @@ class _Provider(AgentProvider):
return AgentProviderRuntime(
template="test", command="test", image="",
prompt_mode="append_file", bypass_args=(), resume_args=(),
remote_control_args=(),
)
def provision_plan(self, **kwargs): # type: ignore[override]
raise NotImplementedError
+128 -8
View File
@@ -1,15 +1,21 @@
"""Unit: Egress route lift + routes.yaml render + token
resolution (PRD 0017, PRD 0053)."""
import tempfile
import unittest
from pathlib import Path
from bot_bottle.egress import (
CODEX_HOST_CREDENTIAL_TOKEN_REF,
Egress,
EgressPlan,
EgressRoute,
egress_agent_env_entries,
egress_manifest_routes,
egress_render_routes,
egress_resolve_token_values,
egress_routes_for_bottle,
egress_sidecar_env_entries,
egress_token_env_map,
)
from bot_bottle.log import Die
@@ -316,7 +322,7 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual([], parse_yaml_subset(rendered)["routes"])
def test_round_trip_through_addon_core(self):
from bot_bottle.egress_addon_core import load_routes
from bot_bottle.egress_addon_core import load_config
b = _bottle([
{"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"},
@@ -327,7 +333,7 @@ class TestRenderRoutes(unittest.TestCase):
{"host": "api.anthropic.com"},
])
routes = egress_routes_for_bottle(b)
addon_routes = load_routes(egress_render_routes(routes))
addon_routes = load_config(egress_render_routes(routes)).routes
self.assertEqual(3, len(addon_routes))
self.assertEqual("Bearer", addon_routes[0].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", addon_routes[0].token_env)
@@ -335,26 +341,26 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual("", addon_routes[2].auth_scheme)
def test_dlp_round_trips(self):
from bot_bottle.egress_addon_core import load_routes
from bot_bottle.egress_addon_core import load_config
b = _bottle([{"host": "x.example", "dlp": {
"outbound_detectors": ["token_patterns"],
"inbound_detectors": False,
}}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes)
addon_routes = load_routes(rendered)
addon_routes = load_config(rendered).routes
self.assertEqual(("token_patterns",), addon_routes[0].outbound_detectors)
self.assertEqual((), addon_routes[0].inbound_detectors)
def test_outbound_on_match_round_trips(self):
from bot_bottle.egress_addon_core import load_routes
from bot_bottle.egress_addon_core import load_config
b = _bottle([{"host": "logs.example", "dlp": {
"outbound_on_match": "redact",
}}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes)
self.assertIn('outbound_on_match: "redact"', rendered)
addon_routes = load_routes(rendered)
addon_routes = load_config(rendered).routes
self.assertEqual("redact", addon_routes[0].outbound_on_match)
def test_outbound_on_match_default_omitted_from_render(self):
@@ -364,12 +370,12 @@ class TestRenderRoutes(unittest.TestCase):
self.assertNotIn("outbound_on_match", rendered)
def test_git_fetch_policy_round_trips(self):
from bot_bottle.egress_addon_core import load_routes
from bot_bottle.egress_addon_core import load_config
b = _bottle([{"host": "github.com", "git": {"fetch": True}}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes)
self.assertEqual({"fetch": True}, self._parsed(routes)[0]["git"])
addon_routes = load_routes(rendered)
addon_routes = load_config(rendered).routes
self.assertTrue(addon_routes[0].git_fetch)
def test_log_zero_omitted_from_render(self):
@@ -443,5 +449,119 @@ class TestResolveTokenValues(unittest.TestCase):
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
class TestCanaryGeneration(unittest.TestCase):
"""Egress.prepare() generates a unique canary token per session."""
def _bottle_obj(self):
return ManifestIndex.from_json_obj({
"bottles": {"dev": {"egress": {"routes": []}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
def _make_plan(self) -> EgressPlan:
# Use a concrete no-op subclass so we can call prepare() without
# a real backend.
class _TestEgress(Egress):
pass
e = _TestEgress()
with tempfile.TemporaryDirectory() as td:
return e.prepare(self._bottle_obj(), "test-slug", Path(td))
def test_canary_is_non_empty(self):
plan = self._make_plan()
self.assertIsInstance(plan.canary, str)
self.assertGreater(len(plan.canary), 0)
self.assertRegex(plan.canary_env, r"^[A-Z]+_[A-Z]+_SECRET$")
def test_canary_is_unique_per_session(self):
with tempfile.TemporaryDirectory() as td:
bottle = self._bottle_obj()
class _TestEgress(Egress):
pass
e = _TestEgress()
plan_a = e.prepare(bottle, "slug-a", Path(td))
plan_b = e.prepare(bottle, "slug-b", Path(td))
self.assertNotEqual(plan_a.canary, plan_b.canary)
def test_canary_detected_by_scan_known_secrets(self):
from bot_bottle.dlp_detectors import scan_known_secrets
plan = self._make_plan()
env = {plan.canary_env: plan.canary}
result = scan_known_secrets(
f"exfil={plan.canary}",
env=env,
sensitive_prefixes=(plan.canary_env,),
)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn(plan.canary_env, result.reason)
def test_egress_plan_canary_field_default_empty(self):
# Verify EgressPlan can be constructed with an empty canary (backward compat).
from pathlib import Path
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
)
self.assertEqual("", plan.canary)
self.assertEqual("", plan.canary_env)
class TestEgressEnvEntries(unittest.TestCase):
def test_sidecar_entries_include_route_tokens_and_canary_scan_prefix(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(EgressRoute(host="api.example"),),
token_env_map={"EGRESS_TOKEN_1": "T1", "EGRESS_TOKEN_0": "T0"},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
(
"EGRESS_TOKEN_0",
"EGRESS_TOKEN_1",
"CANON_ALPHA_SECRET=fake-canary-value",
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
),
egress_sidecar_env_entries(plan),
)
def test_agent_entries_include_only_canary_bait(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
("CANON_ALPHA_SECRET=fake-canary-value",),
egress_agent_env_entries(plan),
)
def test_canary_entries_omitted_when_name_missing(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
)
self.assertEqual((), egress_sidecar_env_entries(plan))
self.assertEqual((), egress_agent_env_entries(plan))
if __name__ == "__main__":
unittest.main()
+130 -42
View File
@@ -32,7 +32,6 @@ from bot_bottle.egress_addon_core import (
is_git_fetch_request,
is_git_push_request,
load_config,
load_routes,
match_route,
outbound_scan_headers,
parse_config,
@@ -289,47 +288,6 @@ class TestParseDlp(unittest.TestCase):
}]})
# --- load_routes ---------------------------------------------------------
class TestLoadRoutes(unittest.TestCase):
def test_yaml_text_round_trip(self):
routes = load_routes(
'routes:\n'
' - host: "api.example"\n'
)
self.assertEqual(1, len(routes))
self.assertEqual("api.example", routes[0].host)
def test_full_route_shape_parses(self):
routes = load_routes(
'routes:\n'
' - host: "api.example"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
' matches:\n'
' - paths:\n'
' - value: "/v1/"\n'
' - type: "exact"\n'
' value: "/messages"\n'
)
self.assertEqual(1, len(routes))
r = routes[0]
self.assertEqual("api.example", r.host)
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
self.assertEqual(1, len(r.matches))
self.assertEqual(2, len(r.matches[0].paths))
def test_empty_routes_list(self):
routes = load_routes("routes: []\n")
self.assertEqual((), routes)
def test_invalid_yaml_raises_value_error(self):
with self.assertRaises(ValueError):
load_routes("routes:\n\t- host: x\n")
# --- load_config / parse_config ------------------------------------------
@@ -378,6 +336,33 @@ class TestLoadConfig(unittest.TestCase):
with self.assertRaises(ValueError):
parse_config("not a dict")
def test_empty_routes_list(self):
cfg = load_config("routes: []\n")
self.assertEqual((), cfg.routes)
def test_full_route_shape_parses(self):
cfg = load_config(
'routes:\n'
' - host: "api.example"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
' matches:\n'
' - paths:\n'
' - value: "/v1/"\n'
' - type: "exact"\n'
' value: "/messages"\n'
)
r = cfg.routes[0]
self.assertEqual("api.example", r.host)
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
self.assertEqual(1, len(r.matches))
self.assertEqual(2, len(r.matches[0].paths))
def test_invalid_yaml_raises_value_error(self):
with self.assertRaises(ValueError):
load_config("routes:\n\t- host: x\n")
# --- evaluate_matches ---------------------------------------------------
@@ -1273,6 +1258,109 @@ class TestBuildTokenAllowPayload(unittest.TestCase):
result = ScanResult(severity="block", reason="r", matched="x")
payload = build_token_allow_payload("h", "GET", "/", result)
self.assertNotIn("context:", payload)
class TestScanOutboundEnhanced(unittest.TestCase):
"""scan_outbound changes: binary decode, entropy detector,
broadened known-value prefixes, fragmentation resistance."""
_ROUTE = Route(host="api.example.com")
_ROUTE_ENTROPY = Route(
host="api.example.com",
outbound_detectors=("entropy",),
)
def test_binary_body_latin1_decode_finds_ascii_secret(self):
# Body contains valid ASCII secret surrounded by non-UTF-8 bytes.
secret = "supersecrettoken99"
env = {"EGRESS_TOKEN_0": secret}
# Wrap the secret in bytes that are invalid UTF-8.
body = b"\x80\x81" + secret.encode("ascii") + b"\xff"
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_binary_body_valid_utf8_decoded_correctly(self):
env = {"EGRESS_TOKEN_0": "mysecret"}
# Valid UTF-8 body — should be decoded as UTF-8, not latin-1.
body = "clean body with mysecret".encode("utf-8")
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
def test_entropy_detector_off_by_default(self):
import string
# High-entropy content should NOT warn if the route has no entropy detector.
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE, alphabet, {})
self.assertIsNone(result)
def test_entropy_detector_warns_when_enabled(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE_ENTROPY, alphabet, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
def test_bot_bottle_sensitive_prefixes_env_var(self):
# When the sidecar env contains BOT_BOTTLE_SENSITIVE_PREFIXES,
# scan_outbound should scan those additional prefixes.
secret = "extra-sensitive-value-abc"
env = {
"MY_CRED_KEY": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "MY_CRED_",
}
result = scan_outbound(self._ROUTE, f"x={secret}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_bot_bottle_sensitive_prefixes_multiple(self):
secret = "my-api-key-value-xyz"
env = {
"ANTHROPIC_API_0": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "ANTHROPIC_API_,OTHER_",
}
result = scan_outbound(self._ROUTE, f"auth={secret}", env)
self.assertIsNotNone(result)
def test_canary_detected_via_random_secret_env_name(self):
# The fake secret uses a randomized env name that the sidecar marks
# as sensitive through BOT_BOTTLE_SENSITIVE_PREFIXES.
canary = "canaryvalue12345abcdef"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
result = scan_outbound(self._ROUTE, f"data={canary}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("CANON_ALPHA_SECRET", result.reason)
def test_fragmented_canary_blocked(self):
# Canary with separators injected is still caught.
canary = "supersecretcanary99"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
fragmented = "-".join(canary)
result = scan_outbound(self._ROUTE, f"x={fragmented}", env)
self.assertIsNotNone(result)
class TestOutboundDetectorNames(unittest.TestCase):
def test_entropy_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("entropy", OUTBOUND_DETECTOR_NAMES)
def test_known_secrets_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("known_secrets", OUTBOUND_DETECTOR_NAMES)
def test_token_patterns_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("token_patterns", OUTBOUND_DETECTOR_NAMES)
if __name__ == "__main__":
@@ -0,0 +1,274 @@
"""Unit: LOG_FULL credential redaction in _log_request / _log_response (issue #257).
egress_addon.py is sidecar-only code that depends on mitmproxy, which is
not installed on the host. This file pre-populates sys.modules with the
minimum mocks needed so EgressAddon can be imported and tested without the
real mitmproxy package."""
from __future__ import annotations
import json
import sys
import types
import unittest
from io import StringIO
from typing import Any
from unittest.mock import patch
# ---------------------------------------------------------------------------
# Sidecar-import shims — must run before importing egress_addon
# ---------------------------------------------------------------------------
def _ensure_shims() -> None:
if "mitmproxy" not in sys.modules:
_mm = types.ModuleType("mitmproxy")
_mh = types.ModuleType("mitmproxy.http")
setattr(_mm, "http", _mh)
sys.modules["mitmproxy"] = _mm
sys.modules["mitmproxy.http"] = _mh
if "egress_addon_core" not in sys.modules:
import bot_bottle.egress_addon_core as _core
sys.modules["egress_addon_core"] = _core
_ensure_shims()
from bot_bottle.egress_addon import EgressAddon # noqa: E402 (import after shims)
from bot_bottle.egress_addon_core import Config, LOG_FULL # noqa: E402
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _addon() -> EgressAddon:
"""Return a bare EgressAddon with LOG_FULL config and no routes file."""
a: EgressAddon = EgressAddon.__new__(EgressAddon)
a.config = Config(routes=(), log=LOG_FULL)
a.safe_tokens = set()
a._supervise_queue_dir = ""
a._supervise_slug = ""
a._token_allow_timeout = 300.0
return a
class _Headers:
def __init__(self, d: dict[str, str]) -> None:
self._d = d
def items(self) -> list[tuple[str, str]]:
return list(self._d.items())
class _Request:
def __init__(
self,
host: str = "api.example.com",
method: str = "POST",
path: str = "/v1/messages",
headers: dict[str, str] | None = None,
body: str = "",
) -> None:
self.pretty_host = host
self.method = method
self.path = path
self.headers = _Headers(headers or {})
self._body = body
def get_text(self, *, strict: bool = True) -> str:
return self._body
class _Response:
def __init__(
self,
status_code: int = 200,
headers: dict[str, str] | None = None,
body: str = "",
) -> None:
self.status_code = status_code
self.headers = _Headers(headers or {})
self._body = body
def get_text(self, *, strict: bool = True) -> str:
return self._body
class _Flow:
def __init__(
self,
request: _Request | None = None,
response: _Response | None = None,
) -> None:
self.request = request or _Request()
self.response = response or _Response()
def _log_request(addon: EgressAddon, flow: _Flow) -> dict[str, Any]:
buf = StringIO()
with patch("sys.stderr", buf):
addon._log_request(flow) # type: ignore[arg-type]
return json.loads(buf.getvalue())
def _log_response(addon: EgressAddon, flow: _Flow) -> dict[str, Any]:
buf = StringIO()
with patch("sys.stderr", buf):
addon._log_response(flow) # type: ignore[arg-type]
return json.loads(buf.getvalue())
# ---------------------------------------------------------------------------
# _log_request — authorization header stripped
# ---------------------------------------------------------------------------
class TestLogRequestAuthorizationStripped(unittest.TestCase):
def test_lowercase_authorization_excluded(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"authorization": "Bearer sk-real-secret"}))
entry = _log_request(addon, flow)
self.assertNotIn("authorization", entry["headers"])
def test_titlecase_authorization_excluded(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"Authorization": "Bearer sk-real-secret"}))
entry = _log_request(addon, flow)
self.assertNotIn("Authorization", entry["headers"])
self.assertNotIn("authorization", entry["headers"])
def test_non_auth_headers_retained(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={
"authorization": "Bearer sk-real-secret",
"content-type": "application/json",
}))
entry = _log_request(addon, flow)
self.assertIn("content-type", entry["headers"])
self.assertEqual("application/json", entry["headers"]["content-type"])
def test_no_authorization_header_logs_all_others(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"x-request-id": "abc"}))
entry = _log_request(addon, flow)
self.assertEqual({"x-request-id": "abc"}, entry["headers"])
# ---------------------------------------------------------------------------
# _log_request — body redaction
# ---------------------------------------------------------------------------
_OPENAI_KEY = "sk-" + "A" * 48
class TestLogRequestBodyRedacted(unittest.TestCase):
def test_token_pattern_in_body_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(body=f"key={_OPENAI_KEY}"))
entry = _log_request(addon, flow)
self.assertNotIn(_OPENAI_KEY, entry["body"])
self.assertIn("********", entry["body"])
def test_provisioned_secret_in_body_scrubbed(self) -> None:
addon = _addon()
secret = "provisioned-egress-secret-xyz"
flow = _Flow(request=_Request(body=f"token={secret}"))
with patch.dict("os.environ", {"EGRESS_TOKEN_0": secret}):
entry = _log_request(addon, flow)
self.assertNotIn(secret, entry["body"])
self.assertIn("********", entry["body"])
def test_clean_body_preserved(self) -> None:
addon = _addon()
payload = '{"model": "claude-3", "max_tokens": 1024}'
flow = _Flow(request=_Request(body=payload))
entry = _log_request(addon, flow)
self.assertEqual(payload, entry["body"])
# ---------------------------------------------------------------------------
# _log_request — non-authorization header value redaction
# ---------------------------------------------------------------------------
class TestLogRequestHeaderValuesRedacted(unittest.TestCase):
def test_token_in_custom_header_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"x-api-key": _OPENAI_KEY}))
entry = _log_request(addon, flow)
self.assertNotIn(_OPENAI_KEY, entry["headers"].get("x-api-key", ""))
self.assertIn("********", entry["headers"].get("x-api-key", ""))
def test_clean_header_value_preserved(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"accept": "application/json"}))
entry = _log_request(addon, flow)
self.assertEqual("application/json", entry["headers"]["accept"])
# ---------------------------------------------------------------------------
# _log_response — body redaction
# ---------------------------------------------------------------------------
class TestLogResponseBodyRedacted(unittest.TestCase):
def test_token_pattern_in_response_body_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(
request=_Request(),
response=_Response(body=f'{{"key": "{_OPENAI_KEY}"}}'),
)
entry = _log_response(addon, flow)
self.assertNotIn(_OPENAI_KEY, entry["body"])
self.assertIn("********", entry["body"])
def test_provisioned_secret_in_response_body_scrubbed(self) -> None:
addon = _addon()
secret = "provisioned-egress-secret-xyz"
flow = _Flow(
request=_Request(),
response=_Response(body=f'{{"token": "{secret}"}}'),
)
with patch.dict("os.environ", {"EGRESS_TOKEN_0": secret}):
entry = _log_response(addon, flow)
self.assertNotIn(secret, entry["body"])
self.assertIn("********", entry["body"])
def test_clean_response_body_preserved(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(), response=_Response(body='{"result": "ok"}'))
entry = _log_response(addon, flow)
self.assertEqual('{"result": "ok"}', entry["body"])
# ---------------------------------------------------------------------------
# _log_response — response header value redaction
# ---------------------------------------------------------------------------
class TestLogResponseHeaderValuesRedacted(unittest.TestCase):
def test_token_in_response_header_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(
request=_Request(),
response=_Response(headers={"set-cookie": f"token={_OPENAI_KEY}"}),
)
entry = _log_response(addon, flow)
cookie_val = entry["headers"].get("set-cookie", "")
self.assertNotIn(_OPENAI_KEY, cookie_val)
self.assertIn("********", cookie_val)
def test_clean_response_header_preserved(self) -> None:
addon = _addon()
flow = _Flow(
request=_Request(),
response=_Response(headers={"content-type": "application/json"}),
)
entry = _log_response(addon, flow)
self.assertEqual("application/json", entry["headers"]["content-type"])
if __name__ == "__main__":
unittest.main()
+9
View File
@@ -54,6 +54,15 @@ class TestValidateRoutesContent(unittest.TestCase):
' auth_scheme: "Bearer"\n'
)
def test_rejects_log_full(self):
with self.assertRaises(EgressApplyError) as cm:
applicator.validate_routes_content(
'log: 2\n'
'routes:\n'
' - host: "x.example"\n'
)
self.assertIn("must not change egress logging", str(cm.exception))
class TestApplyRoutesChange(unittest.TestCase):
def setUp(self):
+24 -1
View File
@@ -30,6 +30,7 @@ def _plan(
supervise: bool = False,
agent_git_gate_url: str = "",
agent_supervise_url: str = "",
canary: bool = False,
) -> MacosContainerBottlePlan:
routes_path = stage_dir / "routes.yaml"
routes_path.write_text("routes: []\n", encoding="utf-8")
@@ -42,6 +43,8 @@ def _plan(
routes_path=routes_path,
routes=("route",),
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
)
if git:
key_path = stage_dir / "origin-key"
@@ -138,6 +141,26 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
argv,
)
def test_sidecar_argv_registers_canary_env_as_sensitive(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._sidecar_run_argv(
plan,
"bot-bottle-sidecars-dev-abc",
"bot-bottle-net-dev-abc",
"bot-bottle-egress-dev-abc",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
self.assertIn("BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", argv)
def test_agent_argv_receives_canary_env(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._agent_run_argv(
plan,
"bot-bottle-net-dev-abc",
"192.0.2.10",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
def test_agent_env_points_proxy_at_sidecar_ip(self):
plan = _plan(
stage_dir=self.stage_dir,
@@ -271,7 +294,7 @@ def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan:
manifest=_MANIFEST,
stage_dir=stage_dir,
git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())),
egress_plan=cast(EgressPlan, SimpleNamespace()),
egress_plan=cast(EgressPlan, SimpleNamespace(canary="")),
supervise_plan=None,
agent_provision=AgentProvisionPlan(
template="claude",
+27
View File
@@ -73,6 +73,33 @@ resolver #2
)
self.assertTrue(run.call_args_list[-1].kwargs["check"])
def test_build_image_anchors_relative_dockerfile_to_context(self):
status = util.subprocess.CompletedProcess(
args=[],
returncode=0,
stdout=(
'[{"status":{"state":"running"},'
'"configuration":{"dns":{"nameservers":["9.9.9.9"]}}}]'
),
stderr="",
)
with patch.object(util.subprocess, "run", return_value=status) as run, \
patch.object(util.os, "environ", {
"BOT_BOTTLE_MACOS_CONTAINER_DNS": "9.9.9.9",
}):
util.build_image(
"bot-bottle-sidecars:latest",
"/repo",
dockerfile="Dockerfile.sidecars",
)
self.assertEqual(
[
"container", "build", "-t", "bot-bottle-sidecars:latest",
"--dns", "9.9.9.9", "-f", "/repo/Dockerfile.sidecars", "/repo",
],
run.call_args_list[-1].args[0],
)
def test_commit_container_execs_tar_and_builds_image(self):
# stderr is bytes because subprocess.run uses stderr=PIPE without text=True
completed = util.subprocess.CompletedProcess(
+28 -1
View File
@@ -167,13 +167,40 @@ class TestAgentProviderHostCredentials(unittest.TestCase):
},
})
def test_settings_rejected_for_claude(self):
def test_startup_args_allowed_for_claude(self):
b = _provider_config_bottle({
"template": "claude",
"settings": {"startup_args": ["--model", "opus"]},
})
self.assertEqual(
{"startup_args": ["--model", "opus"]},
b.agent_provider.settings,
)
def test_startup_args_allowed_for_codex(self):
b = _provider_config_bottle({
"template": "codex",
"settings": {"startup_args": ["--model", "gpt-5-codex"]},
})
self.assertEqual(
{"startup_args": ["--model", "gpt-5-codex"]},
b.agent_provider.settings,
)
def test_provider_specific_settings_still_rejected_for_claude(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "claude",
"settings": {"models": ["qwen2.5-coder:7b"]},
})
def test_startup_args_must_be_string_array(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "codex",
"settings": {"startup_args": ["--model", 42]},
})
def test_settings_models_must_be_non_empty_string_array(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
+1 -1
View File
@@ -130,7 +130,7 @@ def _capture_print(plan: DockerBottlePlan | SmolmachinesBottlePlan) -> list[str]
orig = sys.stderr
sys.stderr = buf
try:
plan.print(remote_control=False)
plan.print()
finally:
sys.stderr = orig
return buf.getvalue().splitlines()
+29 -4
View File
@@ -26,9 +26,7 @@ from bot_bottle.backend.smolmachines.bottle import SmolmachinesBottle
from bot_bottle.backend.smolmachines.bottle_plan import (
SmolmachinesBottlePlan,
)
# from bot_bottle.backend.smolmachines.provision import (
# workspace as _workspace,
# )
from bot_bottle.backend.smolmachines import launch as _launch
from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec
from bot_bottle.backend.util import AGENT_CA_PATH
from bot_bottle.egress import EgressPlan, EgressRoute
@@ -44,7 +42,6 @@ class _Provider(AgentProvider):
return AgentProviderRuntime(
template="test", command="test", image="",
prompt_mode="append_file", bypass_args=(), resume_args=(),
remote_control_args=(),
)
def provision_plan(self, **kwargs): # type: ignore[override]
raise NotImplementedError
@@ -86,6 +83,7 @@ def _plan(
stage_dir: Path | None = None,
egress_routes: tuple[EgressRoute, ...] = (),
egress_ca_path: Path = Path(),
canary: bool = False,
supervise: bool = False,
bundle_ip: str = "192.168.50.2",
agent_git_gate_host: str = "127.0.0.1:55555",
@@ -156,6 +154,8 @@ def _plan(
routes=egress_routes,
token_env_map={},
mitmproxy_ca_cert_only_host_path=egress_ca_path,
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
),
supervise_plan=supervise_plan,
agent_git_gate_host=agent_git_gate_host,
@@ -411,6 +411,31 @@ class TestBundleLaunchSpec(unittest.TestCase):
self.assertIn(9420, spec.ports_to_publish)
self.assertNotIn(9418, spec.ports_to_publish)
def test_canary_env_registered_as_sensitive_in_bundle(self):
plan = _plan(canary=True)
spec = _bundle_launch_spec(plan, "net", "127.0.0.16")
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", spec.environment)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
spec.environment,
)
def test_canary_env_visible_to_smolvm_guest(self):
plan = _plan(canary=True)
with patch.object(
_launch._bundle,
"bundle_host_port",
return_value="65000",
):
stamped = _launch._discover_urls(plan, "127.0.0.16")
self.assertEqual(
"fake-canary-value",
stamped.guest_env["CANON_ALPHA_SECRET"],
)
class TestProvisionGitUser(unittest.TestCase):
"""`provision_git` runs `git config --global` inside the
+175 -21
View File
@@ -20,6 +20,7 @@ import supervise as _sv # noqa: E402 # type: ignore
from bot_bottle import supervise_server # noqa: E402
from bot_bottle.supervise_server import (
ERR_INTERNAL,
ERR_INVALID_PARAMS,
ERR_INVALID_REQUEST,
ERR_METHOD_NOT_FOUND,
@@ -29,7 +30,9 @@ from bot_bottle.supervise_server import (
PROPOSED_FILE_FIELD,
ServerConfig,
TOOL_DEFINITIONS,
_RpcClientError,
_RpcError,
_RpcInternalError,
_response_timeout_from_env,
format_response_text,
handle_initialize,
@@ -47,15 +50,15 @@ from bot_bottle.supervise_server import (
class TestValidation(unittest.TestCase):
def test_capability_block_accepts_anything_nonempty(self):
validate_proposed_file(
_sv.TOOL_CAPABILITY_BLOCK,
"FROM python:3.13\nRUN apk add git\n",
)
def test_empty_proposed_file_rejected_for_tools_with_file_field(self):
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_CAPABILITY_BLOCK, " \n\t")
validate_proposed_file(_sv.TOOL_EGRESS_ALLOW, " \n\t")
def test_capability_block_rejected_as_unknown_tool(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file("capability-block", "FROM python:3.13\n")
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("unknown tool", cm.exception.message)
def test_egress_routes_yaml_is_validated(self):
validate_proposed_file(
@@ -67,6 +70,74 @@ class TestValidation(unittest.TestCase):
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_EGRESS_BLOCK, "routes: nope\n")
def test_egress_routes_yaml_rejects_log_full(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file(
_sv.TOOL_EGRESS_ALLOW,
"log: 2\nroutes:\n - host: example.com\n",
)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("must not change egress logging", cm.exception.message)
# --- Error taxonomy --------------------------------------------------------
class TestRpcErrorTaxonomy(unittest.TestCase):
def test_rpc_client_error_is_rpc_error(self):
e = _RpcClientError(ERR_INVALID_PARAMS, "bad param")
self.assertIsInstance(e, _RpcError)
self.assertEqual(ERR_INVALID_PARAMS, e.code)
self.assertEqual("bad param", e.message)
def test_rpc_internal_error_is_rpc_error(self):
e = _RpcInternalError("disk full")
self.assertIsInstance(e, _RpcError)
self.assertEqual(ERR_INTERNAL, e.code)
self.assertEqual("disk full", e.message)
def test_rpc_internal_error_preserves_cause(self):
cause = OSError("no space left on device")
try:
raise _RpcInternalError("failed to write") from cause
except _RpcInternalError as e:
self.assertIs(cause, e.__cause__)
def test_parse_error_is_client_error(self):
with self.assertRaises(_RpcClientError):
parse_jsonrpc(b"{bad json")
def test_validation_error_is_client_error(self):
with self.assertRaises(_RpcClientError):
validate_proposed_file(_sv.TOOL_EGRESS_ALLOW, "routes: nope\n")
def test_unknown_tool_in_tools_call_is_client_error(self):
config = ServerConfig(bottle_slug="dev", queue_dir=Path("/unused"))
with self.assertRaises(_RpcClientError) as cm:
handle_tools_call({"name": "no-such-tool", "arguments": {}}, config)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
class TestRpcInternalErrorOnIoFailure(unittest.TestCase):
def test_write_proposal_os_error_raises_internal(self):
config = ServerConfig(
bottle_slug="dev",
queue_dir=Path("/dev/null/cannot-exist"),
)
with self.assertRaises(_RpcInternalError) as cm:
handle_tools_call(
{
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "x",
},
},
config,
)
self.assertEqual(ERR_INTERNAL, cm.exception.code)
self.assertIsNotNone(cm.exception.__cause__)
# --- JSON-RPC parsing ------------------------------------------------------
@@ -148,7 +219,6 @@ class TestHandleToolsList(unittest.TestCase):
self.assertEqual(
sorted([
_sv.TOOL_EGRESS_ALLOW,
_sv.TOOL_CAPABILITY_BLOCK,
_sv.TOOL_EGRESS_BLOCK,
_sv.TOOL_LIST_EGRESS_ROUTES,
]),
@@ -224,10 +294,10 @@ class TestHandleToolsCall(unittest.TestCase):
try:
result = handle_tools_call(
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"name": _sv.TOOL_EGRESS_BLOCK,
"arguments": {
"dockerfile": "FROM python:3.13\n",
"justification": "need git",
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "need example.com",
},
},
self.config,
@@ -264,9 +334,9 @@ class TestHandleToolsCall(unittest.TestCase):
try:
result = handle_tools_call(
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": {
"dockerfile": "FROM python:3.13\n",
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "needed for tests",
},
},
@@ -288,20 +358,52 @@ class TestHandleToolsCall(unittest.TestCase):
with self.assertRaises(_RpcError):
handle_tools_call(
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": {"dockerfile": "FROM python:3.13\n"},
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": {"routes_yaml": "routes:\n - host: example.com\n"},
},
self.config,
)
def test_missing_name_raises(self):
with self.assertRaises(_RpcError) as cm:
handle_tools_call({"arguments": {}}, self.config)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
def test_arguments_must_be_object(self):
with self.assertRaises(_RpcError) as cm:
handle_tools_call(
{
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": [],
},
self.config,
)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("must be an object", cm.exception.message)
def test_capability_block_call_raises_unknown_tool(self):
with self.assertRaises(_RpcError) as cm:
handle_tools_call(
{
"name": "capability-block",
"arguments": {
"dockerfile": "FROM python:3.13\n",
"justification": "need git",
},
},
self.config,
)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("unknown tool", cm.exception.message)
def test_archives_proposal_after_response(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_APPROVED)
try:
handle_tools_call(
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": {
"dockerfile": "FROM python:3.13\n",
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "x",
},
},
@@ -323,10 +425,10 @@ class TestHandleToolsCall(unittest.TestCase):
)
result = handle_tools_call(
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": {
"dockerfile": "FROM python:3.13\n",
"justification": "need a capability",
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "need egress",
},
},
config,
@@ -341,6 +443,31 @@ class TestHandleToolsCall(unittest.TestCase):
class TestHandleListEgressRoutes(unittest.TestCase):
def test_success_returns_body_text(self):
class _Resp:
def __enter__(self):
return self
def __exit__(self, exc_type: type[BaseException] | None, exc: BaseException | None, tb: object) -> bool:
return False
def read(self):
return b"[{\"host\": \"example.com\"}]"
class _Opener:
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore
return _Resp()
with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()):
result = handle_list_egress_routes(
{},
ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")),
)
self.assertFalse(result["isError"]) # type: ignore[index]
text = result["content"][0]["text"] # type: ignore[index]
self.assertIn("example.com", text)
def test_url_error_returns_tool_error(self):
class _Opener:
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore
@@ -400,6 +527,13 @@ class TestFormatResponseText(unittest.TestCase):
self.assertIn("the operator modified", text.lower())
class TestFormatPendingResponseText(unittest.TestCase):
def test_formats_timeout_message(self):
text = supervise_server.format_pending_response_text(12.5)
self.assertIn("status: pending", text)
self.assertIn("12.5s", text)
# --- End-to-end HTTP sanity ------------------------------------------------
@@ -450,7 +584,7 @@ class TestHttpEndToEnd(unittest.TestCase):
self.assertEqual("2.0", result["jsonrpc"])
self.assertEqual(1, result["id"])
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
self.assertNotIn("capability-block", names)
self.assertIn(_sv.TOOL_EGRESS_ALLOW, names)
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
@@ -460,6 +594,26 @@ class TestHttpEndToEnd(unittest.TestCase):
)
self.assertEqual(ERR_METHOD_NOT_FOUND, result["error"]["code"]) # type: ignore[index]
def test_internal_error_returns_err_internal_over_http(self):
with patch.object(
supervise_server._sv, "write_proposal",
side_effect=OSError("disk full"),
):
result = self._post_jsonrpc({
"jsonrpc": "2.0",
"id": 99,
"method": "tools/call",
"params": {
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "x",
},
},
})
self.assertIn("error", result)
self.assertEqual(ERR_INTERNAL, result["error"]["code"]) # type: ignore[index]
def test_health_endpoint(self):
conn = http.client.HTTPConnection("127.0.0.1", self.port, timeout=5)
try: