Compare commits

..

4 Commits

Author SHA1 Message Date
didericis-claude affd628df6 docs(prd): flip prd-new-strengthen-outbound-exfil-detection Draft → Active
lint / lint (push) Failing after 2m2s
test / unit (pull_request) Successful in 47s
test / integration (pull_request) Successful in 29s
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-25 01:02:55 +00:00
didericis-claude 2610f5a2c9 feat(egress): inject per-session canary token into sidecar and agent environments
EgressPlan gains a `canary: str` field (default "") populated in Egress.prepare()
using secrets.token_urlsafe(32).  Each launched bottle:

  - sidecar receives EGRESS_TOKEN_CANARY=<value> (literal env entry, scanned by
    existing known-secrets detector without any detector code changes)
  - agent receives BOT_BOTTLE_CANARY=<value> (visible fake secret that signals
    exfiltration with zero false positives if it appears in outbound traffic)

Docker compose and macos-container backends updated; smolmachines shares docker
compose and so picks this up automatically.  Unit tests cover canary uniqueness,
detection via scan_known_secrets, and EgressPlan backward-compat default.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-25 01:02:44 +00:00
didericis-claude 001a420957 feat(dlp): fragmentation resistance, entropy detector, broadened known-value scan
- _alnum_projection(): strip non-alphanumeric chars for separator-injection detection
- scan_known_secrets() gains two extra passes per secret after exact-variant matching:
  alnum-projection exact match (catches hyphens/spaces between secret chars) and a
  sliding-window partial-match scan (catches chunked substrings ≥ PARTIAL_MATCH_MIN_LEN)
- scan_known_secrets() accepts sensitive_prefixes param (default ("EGRESS_TOKEN_",))
  so redact_tokens and call-sites can extend the scanned env-var prefix set
- scan_entropy() warn-only detector flagging windows with Shannon entropy ≥ 5.5 bits/char
- "entropy" added to OUTBOUND_DETECTOR_NAMES; scan_outbound opts it in only when
  explicitly listed in dlp.outbound_detectors (never part of the default "all" set)
- scan_outbound reads BOT_BOTTLE_SENSITIVE_PREFIXES from environ to extend
  scan_known_secrets beyond EGRESS_TOKEN_* without schema changes
- Binary bodies decoded via latin-1 fallback (bijective byte↔codepoint) instead
  of utf-8 errors=replace, preserving ASCII secret strings in binary payloads

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-25 01:02:34 +00:00
didericis-claude 813cb685bf docs: draft PRD prd-new for strengthen-outbound-exfil-detection
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-25 00:54:28 +00:00
48 changed files with 228 additions and 1869 deletions
-4
View File
@@ -15,7 +15,6 @@
## Features
- **Per-bottle egress allowlist** — TLS-bumped HTTP/HTTPS chokepoint with a per-manifest host allowlist; per-route path/method/header `matches` filtering; outbound DLP scanning for known tokens and secrets, inbound DLP scanning for prompt-injection attempts; DoH and arbitrary hosts blocked by default.
- **Per-route token-match policy** — each egress route picks what happens when the outbound DLP catches a token via `dlp.outbound_on_match`: `supervise` (default) holds the request and surfaces it in `./cli.py supervise` for approval (an approved value is remembered for the life of the proxy); `redact` scrubs the value and forwards; `block` is a hard `403`. Cuts false-positive friction without weakening default-deny.
- **Tokens the agent never sees** — host secrets live in a sidecar; the agent dials `http://sidecar:9099/<path>` and the proxy strips inbound `Authorization` and injects the real token before forwarding. `printenv` in the agent shows proxy URLs only.
- **Gitleaks-scanned push (git-gate)** — `bottle.git` remotes route through a per-bottle `git daemon` that gitleaks-scans incoming refs pre-receive and forwards clean refs upstream over SSH. The agent never holds the upstream credential.
- **Manifest-scoped skills + secrets** — each bottle declares its skills, env, git identity, remotes, and egress routes; unknown keys die at load.
@@ -149,11 +148,8 @@ You help maintain Gitea-hosted projects.
| `dlp` | no | Per-route DLP overrides. Omit to use defaults (all detectors on). |
| `dlp.outbound_detectors` | no | `false` disables outbound scanning; list restricts to named detectors (`token_patterns`, `known_secrets`). |
| `dlp.inbound_detectors` | no | `false` disables inbound scanning; list restricts to named detectors (`naive_injection_detection`). |
| `dlp.outbound_on_match` | no | What to do when an outbound token is detected: `supervise` (default for manifest routes — hold for operator approval), `redact` (scrub the value and forward), or `block` (hard 403). Agent-provider routes (e.g. `api.anthropic.com`) default to `redact`. |
| `git.fetch` | no | `true` permits smart HTTP clone/fetch (`git-upload-pack`) for this host. Push (`git-receive-pack`) remains blocked. |
When an outbound DLP detector matches a token, the route's `dlp.outbound_on_match` policy decides what happens. Under the default `supervise`, the proxy queues an `egress-token-allow` proposal for the operator's `./cli.py supervise` TUI and holds the request open until it is answered (or `EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS`, default 300s, elapses — after which it fails closed). The operator never sees the raw token, only the host, method, path, and a redacted snippet; approving adds the value to an in-memory safelist for the life of the egress proxy. Under `redact`, the matched value is scrubbed from the body, headers, and path and the request is forwarded (failing closed if a match lands somewhere unredactable, like the hostname). Under `block` it stays a hard `403`. Structural blocks (CRLF injection) and not-in-allowlist host blocks are always hard `403`s regardless of policy.
More examples in `examples/`. Full design lives under `docs/prds/`; the trust-boundary rationale is in `docs/prds/0011-per-file-md-manifest.md`.
## Trademarks
+2 -10
View File
@@ -61,6 +61,7 @@ class AgentProviderRuntime:
prompt_mode: PromptMode
bypass_args: tuple[str, ...]
resume_args: tuple[str, ...]
remote_control_args: tuple[str, ...]
@dataclass(frozen=True)
@@ -370,15 +371,6 @@ def build_agent_provision_plan(
)
def provider_startup_args(
provider_settings: dict[str, object] | None,
) -> tuple[str, ...]:
raw = (provider_settings or {}).get("startup_args", ())
if not isinstance(raw, (list, tuple)):
return ()
return tuple(arg for arg in raw if isinstance(arg, str))
def prompt_args(
prompt_mode: PromptMode,
prompt_path: str | None,
@@ -390,7 +382,7 @@ def prompt_args(
if prompt_mode == "append_file":
return ["--append-system-prompt-file", prompt_path]
if prompt_mode == "read_prompt_file":
if argv and ("resume" in argv or "remote-control" in argv):
if argv and "resume" in argv:
return []
return [f"Read and follow the instructions in {prompt_path}."]
if prompt_mode == "print_read_prompt_file":
+2 -1
View File
@@ -109,8 +109,9 @@ class BottlePlan(ABC):
def workspace_plan(self) -> WorkspacePlan:
return workspace_plan(self.spec, guest_home=self.guest_home)
def print(self) -> None:
def print(self, *, remote_control: bool) -> None:
"""Render the y/N preflight summary to stderr."""
del remote_control
spec = self.spec
manifest = self.manifest
agent = manifest.agent
+10 -4
View File
@@ -28,8 +28,6 @@ from typing import Any
from ...egress import (
EGRESS_HOSTNAME,
EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_sidecar_env_entries,
)
from ...git_gate import GIT_GATE_HOSTNAME
from ...log import die, warn
@@ -137,7 +135,12 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER))
if ep.routes:
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
env.extend(egress_sidecar_env_entries(ep))
for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
if ep.canary:
# Inject canary as a literal NAME=VALUE (not a bare name) — the
# value is a fake secret so it need not be hidden from the compose file.
env.append(f"EGRESS_TOKEN_CANARY={ep.canary}")
# --- git-gate -----------------------------------------------------
gp = plan.git_gate_plan
@@ -221,7 +224,10 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
# never lands on argv or in the compose file.
for name in sorted(plan.forwarded_env.keys()):
env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
# Canary token: visible to the agent as a fake secret so that any
# outbound appearance of this value is a zero-FP exfil signal.
if plan.egress_plan.canary:
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
service: dict[str, Any] = {
"image": plan.image,
+2 -6
View File
@@ -11,7 +11,7 @@ from pathlib import Path
from ..bottle_state import egress_state_dir
from ..egress import EGRESS_ROUTES_FILENAME
from ..egress_addon_core import LOG_OFF, load_config
from ..egress_addon_core import load_routes
class EgressApplyError(RuntimeError):
@@ -33,15 +33,11 @@ class EgressApplicator(ABC):
@staticmethod
def validate_routes_content(content: str) -> None:
try:
config = load_config(content)
load_routes(content)
except ValueError as e:
raise EgressApplyError(
f"proposed routes.yaml is not valid: {e}"
) from e
if config.log != LOG_OFF:
raise EgressApplyError(
"proposed routes.yaml must not change egress logging"
)
@staticmethod
def _routes_path(slug: str) -> Path:
+8 -8
View File
@@ -22,12 +22,7 @@ from ...bottle_state import (
git_gate_state_dir,
read_committed_image,
)
from ...egress import (
EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values,
egress_sidecar_env_entries,
)
from ...egress import EGRESS_ROUTES_IN_CONTAINER, egress_resolve_token_values
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import die, info, warn
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
@@ -355,7 +350,11 @@ def _sidecar_daemons(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
env: list[str] = list(egress_sidecar_env_entries(plan.egress_plan))
env: list[str] = []
if plan.egress_plan.routes:
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
if plan.egress_plan.canary:
env.append(f"EGRESS_TOKEN_CANARY={plan.egress_plan.canary}")
if plan.git_gate_plan.upstreams:
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
if plan.supervise_plan is not None:
@@ -423,7 +422,8 @@ def _agent_env_entries(
env.append(f"{name}={value}")
for name in sorted(plan.forwarded_env.keys()):
env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
if plan.egress_plan.canary:
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
return tuple(env)
@@ -68,11 +68,6 @@ def build_image(ref: str, context: str, *, dockerfile: str = "") -> None:
_ensure_builder_dns()
args = [_CONTAINER, "build", "-t", ref, "--dns", dns_server()]
if dockerfile:
# `container build` resolves -f relative to the current working
# directory, not the build context. Anchor a relative Dockerfile to
# the context so builds work from any cwd.
if not os.path.isabs(dockerfile):
dockerfile = os.path.join(context, dockerfile)
args.extend(["-f", dockerfile])
args.append(context)
subprocess.run(args, check=True)
+5 -6
View File
@@ -23,9 +23,7 @@ from typing import Callable, Generator
from ...egress import (
EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values,
egress_sidecar_env_entries,
)
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
from ...util import expand_tilde
@@ -230,9 +228,6 @@ def _discover_urls(
guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}"
if agent_supervise_url:
guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url
for entry in egress_agent_env_entries(plan.egress_plan):
name, value = entry.split("=", 1)
guest_env[name] = value
return dataclasses.replace(
plan,
@@ -321,7 +316,11 @@ def _bundle_launch_spec(
volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True))
if ep.routes:
volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True))
env.extend(egress_sidecar_env_entries(ep))
# Bare-name entries for upstream-token slots. Their values
# come from the docker-run subprocess env (inherited from
# the operator's shell), never landing on argv.
for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
# --- git-gate ---------------------------------------------
gp = plan.git_gate_plan
+2
View File
@@ -28,6 +28,7 @@ from .start import _launch_bottle
def cmd_resume(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--remote-control", action="store_true")
parser.add_argument(
"identity",
help="bottle identity from a prior `start` (see its session-end output)",
@@ -55,5 +56,6 @@ def cmd_resume(argv: list[str]) -> int:
return _launch_bottle(
spec,
dry_run=args.dry_run,
remote_control=args.remote_control,
backend_name=backend_name,
)
+10 -4
View File
@@ -42,6 +42,7 @@ def cmd_start(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} start", add_help=True)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--cwd", action="store_true", help="copy host cwd into the running bottle")
parser.add_argument("--remote-control", action="store_true")
parser.add_argument(
"--backend",
choices=known_backend_names(),
@@ -88,6 +89,7 @@ def cmd_start(argv: list[str]) -> int:
return _launch_bottle(
spec,
dry_run=dry_run,
remote_control=args.remote_control,
backend_name=backend_name,
)
@@ -132,7 +134,7 @@ def prepare_with_preflight(
def attach_agent(
bottle: Bottle, *, resume: bool = False,
bottle: Bottle, *, remote_control: bool = False, resume: bool = False,
agent_provider_template: str = "claude",
startup_args: tuple[str, ...] = (),
) -> int:
@@ -151,6 +153,8 @@ def attach_agent(
"(Ctrl-D or 'exit' to leave; container will be removed)"
)
agent_args = list(runtime.bypass_args)
if remote_control:
agent_args.extend(runtime.remote_control_args)
agent_args.extend(startup_args)
if resume:
agent_args.extend(runtime.resume_args)
@@ -214,9 +218,9 @@ def _text_prompt_yes() -> bool:
return reply in ("y", "Y", "yes", "YES")
def _text_render_preflight():
def _text_render_preflight(*, remote_control: bool):
def _render(plan: DockerBottlePlan) -> None:
plan.print()
plan.print(remote_control=remote_control)
return _render
@@ -224,6 +228,7 @@ def _launch_bottle(
spec: BottleSpec,
*,
dry_run: bool,
remote_control: bool,
backend_name: str | None = None,
) -> int:
"""Shared launch core for `start` and `resume`. Builds the plan,
@@ -235,7 +240,7 @@ def _launch_bottle(
plan, identity = prepare_with_preflight(
spec,
stage_dir=stage_dir,
render_preflight=_text_render_preflight(),
render_preflight=_text_render_preflight(remote_control=remote_control),
prompt_yes=_text_prompt_yes,
dry_run=dry_run,
backend_name=backend_name,
@@ -248,6 +253,7 @@ def _launch_bottle(
agent_provider_template = getattr(plan, "agent_provider_template", "claude")
exit_code = attach_agent(
bottle,
remote_control=remote_control,
agent_provider_template=agent_provider_template,
startup_args=plan.agent_provision.startup_args,
)
+9 -15
View File
@@ -51,10 +51,9 @@ from ..supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_ALLOW,
TOOL_ALLOW,
TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
TOOL_EGRESS_TOKEN_ALLOW,
archive_proposal,
list_pending_proposals,
render_diff,
@@ -66,11 +65,6 @@ from ._common import PROG
_REFRESH_INTERVAL_MS = 1000
# Proposal tools whose payload is a read-only report, not a file the operator
# edits: modify is unavailable and approval requires a recorded reason for the
# audit trail.
_REPORT_ONLY_TOOLS: tuple[str, ...] = (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW)
@dataclass(frozen=True)
class QueuedProposal:
@@ -145,9 +139,9 @@ def _detail_lines(
def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
if tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
if tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK):
return ".yaml"
if tool in (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW):
if tool == TOOL_GITLEAKS_ALLOW:
return ".txt"
return ".txt"
@@ -177,7 +171,7 @@ def approve(
# diff_before, diff_after = apply_capability_change(
# qp.proposal.bottle_slug, file_to_apply,
# )
if qp.proposal.tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
if qp.proposal.tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK):
diff_before, diff_after = apply_routes_change(
qp.proposal.bottle_slug,
file_to_apply,
@@ -218,8 +212,8 @@ def _approve_from_tui(
notes: str = "",
) -> str:
"""Approve from curses, prompting for any tool-specific audit note."""
if qp.proposal.tool in _REPORT_ONLY_TOOLS and final_file is None:
notes = _prompt(stdscr, "allow reason (false positive / legitimately needed): ")
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW and final_file is None:
notes = _prompt(stdscr, "allow reason (test fixture/false positive): ")
if not notes:
return "approve aborted (empty reason)"
approve(qp, final_file=final_file, notes=notes)
@@ -417,8 +411,8 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
except ApplyError as e:
status_line = f"apply failed: {e}"
elif key == ord("m"):
if qp.proposal.tool in _REPORT_ONLY_TOOLS:
status_line = f"modify unavailable for {qp.proposal.tool}"
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW:
status_line = "modify unavailable for gitleaks-allow"
continue
edited = _modify(stdscr, qp)
if edited is None:
@@ -531,7 +525,7 @@ def _detail_view(
pass
return
elif key == ord("m"):
if qp.proposal.tool in _REPORT_ONLY_TOOLS:
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW:
return
edited = _modify(stdscr, qp)
if edited is not None:
+2 -4
View File
@@ -20,7 +20,6 @@ from ...agent_provider import (
AgentProvisionDir,
AgentProvisionFile,
AgentProvisionPlan,
provider_startup_args,
)
from ...backend.docker import util as docker_mod
from ...egress import EgressRoute
@@ -91,6 +90,7 @@ _RUNTIME = AgentProviderRuntime(
prompt_mode="append_file",
bypass_args=("--dangerously-skip-permissions",),
resume_args=("--continue",),
remote_control_args=("--remote-control",),
)
@@ -115,9 +115,8 @@ class ClaudeAgentProvider(AgentProvider):
color: str = "",
provider_settings: dict[str, object] | None = None,
) -> AgentProvisionPlan:
del forward_host_credentials, host_env
del forward_host_credentials, host_env, provider_settings
resolved_guest_env = dict(guest_env or {})
startup_args = provider_startup_args(provider_settings)
guest_home = self.guest_home
trusted_path = trusted_project_path or guest_home
@@ -200,7 +199,6 @@ class ClaudeAgentProvider(AgentProvider):
env_vars=env_vars,
guest_env=resolved_guest_env,
has_prompt=has_prompt,
startup_args=startup_args,
dirs=dirs,
files=tuple(files),
egress_routes=egress_routes,
+6 -9
View File
@@ -1,12 +1,12 @@
# bot-bottle Codex provider image.
#
# Mirrors the default Claude image shape: Node LTS, git/network tooling,
# non-root node user, and the provider CLI installed for that user.
# non-root node user, and the provider CLI installed globally.
FROM node:22-slim
RUN apt-get update \
&& apt-get install -y --no-install-recommends git ca-certificates curl procps \
&& apt-get install -y --no-install-recommends git ca-certificates curl \
&& rm -rf /var/lib/apt/lists/*
# App-specific deps. Python isn't required by codex itself
@@ -17,15 +17,12 @@ RUN apt-get update \
&& apt-get install -y --no-install-recommends python3 python3-pip python3-venv \
&& rm -rf /var/lib/apt/lists/*
RUN npm install -g --no-fund --no-audit @openai/codex@0.136.0 \
&& npm cache clean --force
USER node
WORKDIR /home/node
ENV PATH="/home/node/.local/bin:${PATH}"
# Remote-control support requires the standalone Codex install layout
# under ~/.codex/packages/standalone/current. The npm package can run
# the TUI, but remote-control commands expect this installer-owned path.
RUN mkdir -p /home/node/.codex \
&& curl -fsSL https://chatgpt.com/codex/install.sh | sh
RUN mkdir -p /home/node/.codex
CMD ["codex"]
+2 -4
View File
@@ -22,7 +22,6 @@ from ...agent_provider import (
AgentProvisionCommand,
AgentProvisionFile,
AgentProvisionPlan,
provider_startup_args,
)
from .codex_auth import codex_host_access_token, write_codex_dummy_auth_file
from ...egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
@@ -55,6 +54,7 @@ _RUNTIME = AgentProviderRuntime(
prompt_mode="read_prompt_file",
bypass_args=("--dangerously-bypass-approvals-and-sandbox",),
resume_args=("resume", "--last"),
remote_control_args=(),
)
@@ -79,9 +79,8 @@ class CodexAgentProvider(AgentProvider):
color: str = "",
provider_settings: dict[str, object] | None = None,
) -> AgentProvisionPlan:
del auth_token, label, color
del auth_token, label, color, provider_settings
resolved_guest_env = dict(guest_env or {})
startup_args = provider_startup_args(provider_settings)
guest_home = self.guest_home
trusted_path = trusted_project_path or guest_home
@@ -164,7 +163,6 @@ class CodexAgentProvider(AgentProvider):
env_vars=env_vars,
guest_env=resolved_guest_env,
has_prompt=has_prompt,
startup_args=startup_args,
dirs=tuple(dirs),
files=tuple(files),
pre_copy=tuple(pre_copy),
@@ -19,7 +19,7 @@ import urllib.error
import urllib.request
from pathlib import Path
from ...deploy_key_provisioner import DeployKeyCollisionError, DeployKeyProvisioner
from ...deploy_key_provisioner import DeployKeyProvisioner
class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
@@ -71,11 +71,6 @@ class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
body = json.loads(resp.read())
except urllib.error.HTTPError as exc:
_body = _read_error_body(exc)
if exc.code == 422:
raise DeployKeyCollisionError(
f"deploy key collision for {owner_repo!r} "
f"(title={title!r}): key title or content already registered — {_body}"
) from exc
raise RuntimeError(
f"failed to create deploy key for {owner_repo}: "
f"HTTP {exc.code}{_body}"
+1 -3
View File
@@ -21,7 +21,6 @@ from ...agent_provider import (
AgentProvisionDir,
AgentProvisionFile,
AgentProvisionPlan,
provider_startup_args,
)
from ...egress import EgressRoute
from ...log import die, info
@@ -166,6 +165,7 @@ _RUNTIME = AgentProviderRuntime(
prompt_mode="append_system_prompt",
bypass_args=(),
resume_args=(),
remote_control_args=(),
)
@@ -199,7 +199,6 @@ class PiAgentProvider(AgentProvider):
models_payload, base_url, api_key_env, models, provider_name = (
_pi_models_json(settings)
)
extra_startup_args = provider_startup_args(provider_settings)
models_file = state_dir / "pi-models.json"
models_file.write_text(json.dumps(models_payload, indent=2) + "\n")
models_file.chmod(0o600)
@@ -220,7 +219,6 @@ class PiAgentProvider(AgentProvider):
startup_args=(
"--models",
",".join(f"{provider_name}/{model}" for model in models),
*extra_startup_args,
),
dirs=(AgentProvisionDir(f"{guest_home}/.pi/agent"),),
files=(AgentProvisionFile(models_file, _models_path(guest_home)),),
-4
View File
@@ -11,10 +11,6 @@ from __future__ import annotations
from abc import ABC, abstractmethod
class DeployKeyCollisionError(RuntimeError):
"""Raised when a deploy key title or public key already exists on the repo."""
class DeployKeyProvisioner(ABC):
"""Manages a single deploy-key lifecycle on a remote forge."""
+8 -41
View File
@@ -1,4 +1,4 @@
"""DLP detectors for the egress proxy (PRD 0053).
"""DLP detectors for the egress proxy (PRD 0053, prd-new).
Pure Python, no mitmproxy dependency. Each detector is a module-level
function returning `ScanResult | None`.
@@ -80,27 +80,16 @@ TOKEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = (
)
def scan_token_patterns(
text: str,
*,
location: str = "body",
safe_tokens: typing.AbstractSet[str] | None = None,
) -> ScanResult | None:
def scan_token_patterns(text: str, *, location: str = "body") -> ScanResult | None:
normalized = _normalize_text(text)
for name, pattern in TOKEN_PATTERNS:
for m in pattern.finditer(normalized):
value = m.group(0)
# A value the supervisor has approved (PRD 0062) is no longer a
# block — keep scanning so a second, un-approved token in the
# same request is still caught.
if safe_tokens is not None and value in safe_tokens:
continue
m = pattern.search(normalized)
if m is not None:
return ScanResult(
severity="block",
reason=f"{name} found in {location}",
location=location,
context=_snippet(normalized, m.start(), m.end()),
matched=value,
context=_snippet(text, m.start(), m.end()),
)
return None
@@ -123,7 +112,7 @@ def redact_tokens(
# ---------------------------------------------------------------------------
# Known secrets detector
# Known secrets detector (Phase 1b, prd-new)
# ---------------------------------------------------------------------------
def _encoded_variants(secret: str) -> list[str]:
@@ -165,7 +154,7 @@ def _encoded_variants(secret: str) -> list[str]:
# ---------------------------------------------------------------------------
# Fragmentation-resistant helpers
# Fragmentation-resistant helpers (prd-new)
# ---------------------------------------------------------------------------
# Minimum length of alnum projection for projection-based checks to run.
@@ -209,7 +198,6 @@ def scan_known_secrets(
location: str = "body",
env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
safe_tokens: typing.AbstractSet[str] | None = None,
) -> ScanResult | None:
if env is None:
return None
@@ -222,27 +210,15 @@ def scan_known_secrets(
continue
# Pass 1: exact match across encoded variants (original behaviour).
approved_exact = False
for variant in _encoded_variants(value):
pos = text.find(variant)
if pos >= 0:
# The supervisor approves the exact encoded variant found
# (PRD 0062); a different encoding of the same secret is a
# fresh block.
if safe_tokens is not None and variant in safe_tokens:
approved_exact = True
continue
return ScanResult(
severity="block",
reason=f"provisioned secret from {key} found in {location}",
location=location,
context=_snippet(text, pos, pos + len(variant)),
matched=variant,
)
if approved_exact:
# Exact match was found and approved; projection passes would
# fire on the same value, so skip them for this secret.
continue
# Pass 2 & 3: fragmentation-resistant projection checks.
secret_alnum = _alnum_projection(value)
@@ -283,7 +259,7 @@ def scan_known_secrets(
# ---------------------------------------------------------------------------
# Entropy detector (warn-only)
# Entropy detector (warn-only, prd-new)
# ---------------------------------------------------------------------------
# Sliding window size and step for the entropy scan.
@@ -432,14 +408,6 @@ _CRLF_ENCODED_RE = re.compile(r"%0[dD]%0[aA]", re.ASCII)
_CRLF_HEADER_INJECT_RE = re.compile(r"\r\n[A-Za-z][A-Za-z0-9\-]+\s*:", re.ASCII)
def strip_crlf(text: str) -> str:
"""Remove URL-encoded and literal CRLF injection sequences from a request
surface (PRD 0062 redact policy). Used to scrub the request line / headers
so the request can be forwarded instead of hard-blocked."""
text = _CRLF_ENCODED_RE.sub("", text)
return _CRLF_HEADER_INJECT_RE.sub(lambda m: m.group(0)[2:], text)
def scan_crlf_injection(text: str) -> ScanResult | None:
if _CRLF_ENCODED_RE.search(text):
return ScanResult(
@@ -470,5 +438,4 @@ __all__ = [
"scan_known_secrets",
"scan_naive_injection",
"scan_token_patterns",
"strip_crlf",
]
+5 -78
View File
@@ -17,7 +17,6 @@ from pathlib import Path
from typing import TYPE_CHECKING
from .egress_addon_core import (
ON_MATCH_REDACT,
HeaderMatch as CoreHeaderMatch,
MatchEntry as CoreMatchEntry,
PathMatch as CorePathMatch,
@@ -35,50 +34,6 @@ EGRESS_HOSTNAME = "egress"
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name
_CANARY_ENV_WORDS = (
"ACCORD",
"ANCHOR",
"ATLAS",
"CANON",
"CIPHER",
"EMBER",
"FALCON",
"HARBOR",
"LANTERN",
"MARBLE",
"NOVA",
"ORBIT",
"PIVOT",
"RADIUS",
"SUMMIT",
"VECTOR",
)
def _random_canary_env() -> str:
first = secrets.choice(_CANARY_ENV_WORDS)
remaining = tuple(word for word in _CANARY_ENV_WORDS if word != first)
second = secrets.choice(remaining)
return f"{first}_{second}_SECRET"
def egress_sidecar_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return sidecar env entries needed by egress across all backends."""
env: list[str] = []
if plan.routes:
env.extend(sorted(plan.token_env_map.keys()))
if plan.canary and plan.canary_env:
env.append(f"{plan.canary_env}={plan.canary}")
env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.canary_env}")
return tuple(env)
def egress_agent_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return agent-visible egress env entries shared by all backends."""
if plan.canary and plan.canary_env:
return (f"{plan.canary_env}={plan.canary}",)
return ()
@dataclass(frozen=True)
class EgressRoute(Route):
@@ -111,7 +66,6 @@ class EgressPlan:
mitmproxy_ca_cert_only_host_path: Path = Path()
log: int = 0
canary: str = ""
canary_env: str = ""
def egress_manifest_routes(
@@ -143,7 +97,6 @@ def egress_manifest_routes(
git_fetch=r.GitFetch,
outbound_detectors=r.OutboundDetectors,
inbound_detectors=r.InboundDetectors,
outbound_on_match=r.OutboundOnMatch,
))
return tuple(out)
@@ -154,27 +107,12 @@ def egress_routes_for_bottle(
) -> tuple[EgressRoute, ...]:
manifest = egress_manifest_routes(bottle)
provisioned_hosts = {pr.host.lower() for pr in provider_routes}
merged = list(_default_provider_on_match(provider_routes)) + [
merged = list(provider_routes) + [
r for r in manifest if r.host.lower() not in provisioned_hosts
]
return _assign_token_slots(merged)
def _default_provider_on_match(
provider_routes: tuple[EgressRoute, ...],
) -> tuple[EgressRoute, ...]:
"""Provider routes (the agent talking to its own LLM API) default to the
`redact` on-match policy (PRD 0062): high-volume conversation payloads are
the worst source of token-shaped false positives, so a match is scrubbed
and forwarded rather than hard-blocked or queued for the operator. A
provider that sets `outbound_on_match` explicitly keeps its choice."""
return tuple(
r if r.outbound_on_match
else dataclasses.replace(r, outbound_on_match=ON_MATCH_REDACT)
for r in provider_routes
)
def _assign_token_slots(
routes: list[EgressRoute],
) -> tuple[EgressRoute, ...]:
@@ -241,11 +179,7 @@ def _route_to_yaml_fields(r: Route) -> dict[str, object]:
fields["matches"] = matches_data
if r.git_fetch:
fields["git"] = {"fetch": True}
if (
r.outbound_detectors is not None
or r.inbound_detectors is not None
or r.outbound_on_match
):
if r.outbound_detectors is not None or r.inbound_detectors is not None:
dlp: dict[str, object] = {}
if r.outbound_detectors is not None:
dlp["outbound_detectors"] = (
@@ -257,8 +191,6 @@ def _route_to_yaml_fields(r: Route) -> dict[str, object]:
False if not r.inbound_detectors
else list(r.inbound_detectors)
)
if r.outbound_on_match:
dlp["outbound_on_match"] = r.outbound_on_match
fields["dlp"] = dlp
return fields
@@ -330,8 +262,6 @@ def egress_render_routes(
elif isinstance(dv, list):
items_str = ", ".join(f'"{x}"' for x in dv)
lines.append(f" {dk}: [{items_str}]")
elif isinstance(dv, str):
lines.append(f' {dk}: "{dv}"')
return "\n".join(lines) + "\n"
@@ -371,9 +301,9 @@ class Egress(ABC):
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
routes_path.write_text(egress_render_routes(routes, log=log))
routes_path.chmod(0o600)
# Generate a per-session fake secret under a plausible random env name.
# The sidecar marks that exact env name as sensitive for known-secret
# scanning; the agent receives the same name/value as exfil bait.
# Generate a per-session canary token. The sidecar receives it as
# EGRESS_TOKEN_CANARY (scanned by the existing known-secrets detector);
# the agent receives it as BOT_BOTTLE_CANARY (a visible fake secret).
canary = secrets.token_urlsafe(32)
return EgressPlan(
slug=slug,
@@ -382,7 +312,6 @@ class Egress(ABC):
token_env_map=egress_token_env_map(routes),
log=log,
canary=canary,
canary_env=_random_canary_env(),
)
__all__ = [
@@ -397,7 +326,5 @@ __all__ = [
"egress_render_routes",
"egress_resolve_token_values",
"egress_routes_for_bottle",
"egress_agent_env_entries",
"egress_sidecar_env_entries",
"egress_token_env_map",
]
+22 -282
View File
@@ -5,7 +5,6 @@ egress container."""
from __future__ import annotations
import asyncio
import json
import os
import signal
@@ -17,15 +16,9 @@ from mitmproxy import http # type: ignore[import-not-found] # pylint: disable=
from egress_addon_core import ( # type: ignore[import-not-found] # pylint: disable=import-error
LOG_BLOCKS,
LOG_FULL,
DEFAULT_OUTBOUND_ON_MATCH,
ON_MATCH_BLOCK,
ON_MATCH_REDACT,
Config,
Route,
ScanResult,
build_inbound_scan_text,
build_outbound_scan_text,
build_token_allow_payload,
decide,
decide_git_fetch,
is_git_fetch_request,
@@ -39,55 +32,23 @@ from egress_addon_core import ( # type: ignore[import-not-found] # pylint: dis
)
try:
from dlp_detectors import redact_tokens, strip_crlf # type: ignore[import-not-found]
from dlp_detectors import redact_tokens # type: ignore[import-not-found]
except ImportError: # pragma: no cover - host-side path
from bot_bottle.dlp_detectors import ( # type: ignore[import-not-found]
redact_tokens,
strip_crlf,
)
try:
import supervise as _sv # type: ignore[import-not-found]
except ImportError: # pragma: no cover - host-side path
from bot_bottle import supervise as _sv # type: ignore[import-not-found]
from bot_bottle.dlp_detectors import redact_tokens # type: ignore[import-not-found]
DEFAULT_ROUTES_PATH = "/etc/egress/routes.yaml"
INTROSPECT_HOST = "_egress.local"
# Seconds the egress proxy holds a token-blocked request open waiting for the
# operator's supervisor decision (PRD 0062), overridable via env.
DEFAULT_TOKEN_ALLOW_TIMEOUT_SECONDS = 300.0
# Filesystem poll cadence while awaiting the operator's response.
TOKEN_ALLOW_POLL_INTERVAL_SECONDS = 0.5
# Fixed operator guidance attached to every token-allow proposal.
_TOKEN_ALLOW_JUSTIFICATION = (
"egress DLP blocked an outbound request carrying a detected token. "
"Approve only if this value is a false positive or a credential this "
"request legitimately needs; the value is then allowed for the life of "
"this bottle's egress proxy."
)
class EgressAddon:
def __init__(self) -> None:
self.routes_path = os.environ.get("EGRESS_ROUTES", DEFAULT_ROUTES_PATH)
self.config: Config = Config(routes=())
# Tokens the operator has approved this session (PRD 0062). In-memory
# only — a restart re-prompts. Mutated only from the asyncio loop that
# runs the addon hooks, so no lock is needed.
self.safe_tokens: set[str] = set()
self._supervise_queue_dir = os.environ.get("SUPERVISE_QUEUE_DIR", "").strip()
self._supervise_slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "").strip()
self._token_allow_timeout = _token_allow_timeout_from_env(os.environ)
self._reload(initial=True)
self._install_sighup()
def _supervise_available(self) -> bool:
return bool(self._supervise_queue_dir and self._supervise_slug)
def _reload(self, *, initial: bool = False) -> None:
try:
text = Path(self.routes_path).read_text(encoding="utf-8")
@@ -160,42 +121,31 @@ class EgressAddon:
)
def _log_request(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.request.headers.items()
if k.lower() != "authorization"
}
body = redact_tokens(flow.request.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_request",
"host": redact_tokens(flow.request.pretty_host, env=os.environ),
"method": flow.request.method,
"path": redact_tokens(flow.request.path, env=os.environ),
"headers": headers,
"body": body,
"headers": dict(flow.request.headers),
"body": flow.request.get_text(strict=False) or "",
})
+ "\n"
)
def _log_response(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.response.headers.items()
}
body = redact_tokens(flow.response.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_response",
"host": flow.request.pretty_host,
"status": flow.response.status_code,
"headers": headers,
"body": body,
"headers": dict(flow.response.headers),
"body": flow.response.get_text(strict=False) or "",
})
+ "\n"
)
async def request(self, flow: http.HTTPFlow) -> None:
def request(self, flow: http.HTTPFlow) -> None:
request_path, _, query = flow.request.path.partition("?")
if flow.request.pretty_host == INTROSPECT_HOST:
@@ -207,11 +157,21 @@ class EgressAddon:
# Hostname is included to catch DNS-tunnelling exfiltration attempts.
route = match_route(self.config.routes, flow.request.pretty_host)
if route is not None:
if not await self._handle_outbound_dlp(flow, route):
body = flow.request.get_text(strict=False) or ""
scan_text = build_outbound_scan_text(
flow.request.pretty_host,
request_path,
query,
outbound_scan_headers(route, dict(flow.request.headers)),
body,
)
dlp_result = scan_outbound(route, scan_text, os.environ)
if dlp_result is not None and dlp_result.severity == "block":
ctx = self._req_ctx(flow)
if dlp_result.context:
ctx = {**ctx, "context": dlp_result.context}
self._block(flow, f"egress DLP: {dlp_result.reason}", ctx=ctx)
return
# The redact policy may have rewritten the request line; recompute
# the path/query the git checks below rely on.
request_path, _, query = flow.request.path.partition("?")
if is_git_push_request(request_path, query):
self._block(
@@ -261,202 +221,6 @@ class EgressAddon:
if self.config.log >= LOG_FULL:
self._log_request(flow)
def _block_dlp(self, flow: http.HTTPFlow, result: ScanResult) -> None:
ctx = self._req_ctx(flow)
if result.context:
ctx = {**ctx, "context": result.context}
self._block(flow, f"egress DLP: {result.reason}", ctx=ctx)
async def _handle_outbound_dlp(
self,
flow: http.HTTPFlow,
route: Route,
) -> bool:
"""Scan the outbound request and apply the route's on-match policy
(PRD 0062). Returns True if the request may be forwarded, False if a
403 response has been written to `flow`.
Loops so the supervise policy can re-scan after each approval a
second, un-approved token in the same request is still caught."""
while True:
request_path, _, query = flow.request.path.partition("?")
body = flow.request.get_text(strict=False) or ""
headers = outbound_scan_headers(route, dict(flow.request.headers))
scan_text = build_outbound_scan_text(
flow.request.pretty_host, request_path, query, headers, body,
)
# CRLF is scanned only over the request line + headers, never the
# body (see scan_outbound) — a body is not an injection vector.
crlf_text = build_outbound_scan_text(
flow.request.pretty_host, request_path, query, headers, "",
)
result = scan_outbound(
route, scan_text, os.environ,
safe_tokens=self.safe_tokens, crlf_text=crlf_text,
)
if result is None or result.severity != "block":
return True
policy = route.outbound_on_match or DEFAULT_OUTBOUND_ON_MATCH
# redact scrubs every detection (tokens and structural CRLF) and
# forwards; it fails closed only if a match survives the scrub.
if policy == ON_MATCH_REDACT:
if self._redact_outbound(flow, route):
if self.config.log >= LOG_BLOCKS:
sys.stderr.write(json.dumps({
"event": "egress_redacted",
"reason": f"egress DLP: {result.reason}",
**self._req_ctx(flow),
}) + "\n")
return True
self._block(
flow,
f"egress DLP: {result.reason}; redaction could not remove "
"all matches (e.g. a match in the hostname)",
ctx=self._req_ctx(flow),
)
return False
# Structural blocks (CRLF, no safelist-able value) cannot be
# supervised — there is nothing to approve and remember — so under
# block/supervise they are a hard 403.
if policy == ON_MATCH_BLOCK or not result.matched:
self._block_dlp(flow, result)
return False
# supervise (default): hold the request for operator approval.
# Fall back to a hard 403 when supervise isn't wired for the bottle.
if not self._supervise_available():
self._block_dlp(flow, result)
return False
approved = await self._supervise_token_block(flow, request_path, result)
if not approved:
return False # _supervise_token_block wrote the 403 response
# loop: the approved value is now in safe_tokens; re-scan.
def _redact_outbound(self, flow: http.HTTPFlow, route: Route) -> bool:
"""Scrub detected tokens (and CRLF injection sequences) from the mutable
request surfaces (body, headers, path/query) and re-scan. Returns True
if the request is now clean; False if a block-severity match remains on
a surface redaction cannot rewrite (the hostname) so the caller fails
closed."""
body = flow.request.get_text(strict=False)
if body:
redacted_body = redact_tokens(body, env=os.environ)
if redacted_body != body:
flow.request.text = redacted_body
for name, value in list(flow.request.headers.items()):
if name.lower() == "host":
continue # routing-critical; never a legitimate token
redacted = strip_crlf(redact_tokens(value, env=os.environ))
if redacted != value:
flow.request.headers[name] = redacted
redacted_path = strip_crlf(redact_tokens(flow.request.path, env=os.environ))
if redacted_path != flow.request.path:
flow.request.path = redacted_path
request_path, _, query = flow.request.path.partition("?")
new_body = flow.request.get_text(strict=False) or ""
headers = outbound_scan_headers(route, dict(flow.request.headers))
scan_text = build_outbound_scan_text(
flow.request.pretty_host, request_path, query, headers, new_body,
)
crlf_text = build_outbound_scan_text(
flow.request.pretty_host, request_path, query, headers, "",
)
result = scan_outbound(route, scan_text, os.environ, crlf_text=crlf_text)
return result is None or result.severity != "block"
async def _supervise_token_block(
self,
flow: http.HTTPFlow,
request_path: str,
result: ScanResult,
) -> bool:
"""Route a token DLP block to the operator's supervisor queue and wait.
Returns True if the operator approved (the matched value is added to
`self.safe_tokens` and the caller re-scans); False if the request must
be blocked (a 403 response has been written to `flow`)."""
host = flow.request.pretty_host
payload = build_token_allow_payload(
redact_tokens(host, env=os.environ),
flow.request.method,
redact_tokens(request_path, env=os.environ),
result,
)
proposal = _sv.Proposal.new(
bottle_slug=self._supervise_slug,
tool=_sv.TOOL_EGRESS_TOKEN_ALLOW,
proposed_file=payload,
justification=_TOKEN_ALLOW_JUSTIFICATION,
current_file_hash=_sv.sha256_hex(payload),
)
queue_dir = Path(self._supervise_queue_dir)
try:
_sv.write_proposal(queue_dir, proposal)
except OSError as e:
sys.stderr.write(
f"egress: could not queue token-allow proposal: {e}; "
"blocking request\n"
)
self._block(flow, f"egress DLP: {result.reason}", ctx=self._req_ctx(flow))
return False
sys.stderr.write(json.dumps({
"event": "egress_token_supervise",
"reason": f"egress DLP: {result.reason}",
"proposal": proposal.id,
**self._req_ctx(flow),
}) + "\n")
response = await self._await_token_response(queue_dir, proposal.id)
_sv.archive_proposal(queue_dir, proposal.id)
if response is not None and response.status in (
_sv.STATUS_APPROVED, _sv.STATUS_MODIFIED,
):
self.safe_tokens.add(result.matched)
if self.config.log >= LOG_BLOCKS:
sys.stderr.write(json.dumps({
"event": "egress_token_allowed",
"reason": f"egress DLP: {result.reason}",
"proposal": proposal.id,
**self._req_ctx(flow),
}) + "\n")
return True
if response is None:
reason = (
f"egress DLP: {result.reason}; supervisor approval timed out "
f"after {self._token_allow_timeout:g}s"
)
else:
reason = f"egress DLP: {result.reason}; supervisor rejected the request"
self._block(flow, reason, ctx=self._req_ctx(flow))
return False
async def _await_token_response(
self,
queue_dir: Path,
proposal_id: str,
) -> "_sv.Response | None":
"""Poll the queue dir for the operator's response without blocking the
proxy event loop. Returns the Response, or None on timeout."""
loop = asyncio.get_running_loop()
deadline = loop.time() + self._token_allow_timeout
while True:
try:
return _sv.read_response(queue_dir, proposal_id)
except (OSError, ValueError, KeyError):
# Not written yet, or a partial/malformed write — retry until
# the deadline, then fail closed.
pass
if loop.time() >= deadline:
return None
await asyncio.sleep(TOKEN_ALLOW_POLL_INTERVAL_SECONDS)
def response(self, flow: http.HTTPFlow) -> None:
"""DLP inbound scan on response headers and body."""
route = match_route(self.config.routes, flow.request.pretty_host)
@@ -508,12 +272,7 @@ class EgressAddon:
message = flow.websocket.messages[-1] # type: ignore[union-attr]
content = message.content.decode("utf-8", errors="replace")
if message.from_client:
# A WebSocket data frame is not an HTTP request line, so CRLF is
# not an injection vector here — scan only for credential leakage.
result = scan_outbound(
route, content, os.environ,
safe_tokens=self.safe_tokens, crlf_text="",
)
result = scan_outbound(route, content, os.environ)
if result is not None and result.severity == "block":
sys.stderr.write(f"egress DLP: {result.reason}\n")
flow.kill() # type: ignore[union-attr]
@@ -527,23 +286,4 @@ class EgressAddon:
sys.stderr.write(f"egress DLP warn: {result.reason}\n")
def _token_allow_timeout_from_env(env: "os._Environ[str]") -> float:
"""Read EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS; fall back to the default on an
unset or invalid value (a bad value should not wedge egress at boot)."""
raw = env.get("EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS", "").strip()
if not raw:
return DEFAULT_TOKEN_ALLOW_TIMEOUT_SECONDS
try:
value = float(raw)
except ValueError:
value = 0.0
if value <= 0:
sys.stderr.write(
"egress: invalid EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS="
f"{raw!r}; using default {DEFAULT_TOKEN_ALLOW_TIMEOUT_SECONDS:g}s\n"
)
return DEFAULT_TOKEN_ALLOW_TIMEOUT_SECONDS
return value
addons = [EgressAddon()]
+22 -78
View File
@@ -37,15 +37,6 @@ VALID_METHODS = frozenset({
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"})
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
# Per-route policy for what the proxy does when an outbound DLP detector
# matches a token (PRD 0062).
ON_MATCH_BLOCK = "block" # hard 403, never overridable
ON_MATCH_REDACT = "redact" # scrub the matched value, forward the request
ON_MATCH_SUPERVISE = "supervise" # queue for operator approval, hold the request
OUTBOUND_ON_MATCH_VALUES = (ON_MATCH_BLOCK, ON_MATCH_REDACT, ON_MATCH_SUPERVISE)
# Unset resolves to supervise (fall back to block when supervise is not wired).
DEFAULT_OUTBOUND_ON_MATCH = ON_MATCH_SUPERVISE
@dataclass(frozen=True)
class PathMatch:
@@ -78,8 +69,6 @@ class Route:
git_fetch: bool = False
outbound_detectors: tuple[str, ...] | None = None
inbound_detectors: tuple[str, ...] | None = None
# "" means unset → DEFAULT_OUTBOUND_ON_MATCH. See OUTBOUND_ON_MATCH_VALUES.
outbound_on_match: str = ""
LOG_OFF = 0 # no logging
@@ -106,11 +95,6 @@ class ScanResult:
reason: str
location: str = "" # where the match was found, e.g. "body", "authorization header"
context: str = "" # surrounding text with the match replaced by REDACT
# Raw substring the detector matched. Used inside the sidecar to key the
# supervisor-approved "safe tokens" set (PRD 0062); never logged or written
# to a proposal file. Empty for structural detectors (CRLF) that carry no
# safelist-able value.
matched: str = ""
# ---------------------------------------------------------------------------
@@ -234,12 +218,12 @@ def _parse_detectors(
idx: int,
host: str,
raw_dict: dict[str, object],
) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None, str]:
) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None]:
"""Parse the optional `dlp` block on a route, returning
(outbound_detectors, inbound_detectors, outbound_on_match)."""
(outbound_detectors, inbound_detectors)."""
dlp_raw = raw_dict.get("dlp")
if dlp_raw is None:
return None, None, ""
return None, None
label = f"route[{idx}] ({host})"
if not isinstance(dlp_raw, dict):
raise ValueError(f"{label}: 'dlp' must be an object")
@@ -276,24 +260,13 @@ def _parse_detectors(
outbound = _parse_detector_field("outbound_detectors", OUTBOUND_DETECTOR_NAMES)
inbound = _parse_detector_field("inbound_detectors", INBOUND_DETECTOR_NAMES)
on_match = ""
on_match_raw = dlp.get("outbound_on_match")
if on_match_raw is not None:
if not isinstance(on_match_raw, str) or on_match_raw not in OUTBOUND_ON_MATCH_VALUES:
raise ValueError(
f"{label}: dlp.outbound_on_match must be one of "
f"{', '.join(OUTBOUND_ON_MATCH_VALUES)} (got {on_match_raw!r})"
)
on_match = on_match_raw
for k in dlp:
if k not in ("outbound_detectors", "inbound_detectors", "outbound_on_match"):
if k not in ("outbound_detectors", "inbound_detectors"):
raise ValueError(
f"{label}: dlp has unknown key {k!r}; accepted keys "
f"are 'outbound_detectors', 'inbound_detectors', "
f"'outbound_on_match'"
f"are 'outbound_detectors', 'inbound_detectors'"
)
return outbound, inbound, on_match
return outbound, inbound
def parse_routes(payload: object) -> tuple[Route, ...]:
@@ -364,7 +337,7 @@ def _parse_one(idx: int, raw: object) -> Route:
)
# dlp detectors
outbound_detectors, inbound_detectors, outbound_on_match = _parse_detectors(
outbound_detectors, inbound_detectors = _parse_detectors(
idx, host, raw_dict,
)
@@ -383,7 +356,6 @@ def _parse_one(idx: int, raw: object) -> Route:
git_fetch=git_fetch,
outbound_detectors=outbound_detectors,
inbound_detectors=inbound_detectors,
outbound_on_match=outbound_on_match,
)
@@ -432,13 +404,20 @@ def route_to_yaml_dict(r: Route) -> dict[str, object]:
dlp["outbound_detectors"] = list(r.outbound_detectors)
if r.inbound_detectors is not None:
dlp["inbound_detectors"] = list(r.inbound_detectors)
if r.outbound_on_match:
dlp["outbound_on_match"] = r.outbound_on_match
if dlp:
d["dlp"] = dlp
return d
def load_routes(text: str) -> tuple[Route, ...]:
"""Parse YAML text → routes."""
try:
payload = parse_yaml_subset(text)
except YamlSubsetError as e:
raise ValueError(f"routes payload: invalid YAML: {e}") from e
return parse_routes(payload)
def parse_config(payload: object) -> "Config":
"""Parse a full egress config payload (top-level log level + routes)."""
if not isinstance(payload, dict):
@@ -711,9 +690,6 @@ def scan_outbound(
route: Route,
body: str | bytes,
environ: typing.Mapping[str, str],
*,
safe_tokens: typing.AbstractSet[str] | None = None,
crlf_text: str | None = None,
) -> ScanResult | None:
# Lazy import to avoid circular deps and keep dlp_detectors optional
# at import time (the sidecar copies it flat alongside this file).
@@ -743,19 +719,14 @@ def scan_outbound(
else:
text = body
# CRLF injection is only an attack in the request line + headers, never the
# body: an HTTP body is delimited by Content-Length, so CRLF bytes there
# cannot split the request. Scanning the body produces false positives on
# legitimate form-encoded / multi-line content. Callers pass the
# body-excluded surfaces as `crlf_text`; `None` falls back to the full text
# for backward-compatible callers (host-side tests, websocket frames).
crlf_target = text if crlf_text is None else crlf_text
result = scan_crlf_injection(crlf_target)
# CRLF injection is never legitimate — runs unconditionally, not gated
# by outbound_detectors config.
result = scan_crlf_injection(text)
if result is not None:
return result
if _detector_enabled(route.outbound_detectors, "token_patterns"):
result = scan_token_patterns(text, location="body", safe_tokens=safe_tokens)
result = scan_token_patterns(text, location="body")
if result is not None:
return result
@@ -766,8 +737,7 @@ def scan_outbound(
extra = tuple(p for p in extra_raw.split(",") if p)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
result = scan_known_secrets(
text, location="body", env=environ,
sensitive_prefixes=sensitive_prefixes, safe_tokens=safe_tokens,
text, location="body", env=environ, sensitive_prefixes=sensitive_prefixes,
)
if result is not None:
return result
@@ -787,27 +757,6 @@ def scan_outbound(
return None
def build_token_allow_payload(
host: str,
method: str,
path: str,
result: ScanResult,
) -> str:
"""Render the human-readable supervisor proposal body for an outbound
token block (PRD 0062). Carries the host/method/path, the detector
reason, and the redacted context snippet never the raw token value."""
lines = [
"egress blocked an outbound request carrying a detected token",
f"host: {host}",
f"method: {method}",
f"path: {path}",
f"detector: {result.reason}",
]
if result.context:
lines.append(f"context: {result.context}")
return "\n".join(lines) + "\n"
def scan_inbound(
route: Route,
body: str | bytes,
@@ -832,11 +781,6 @@ __all__ = [
"route_to_yaml_dict",
"LOG_FULL",
"LOG_OFF",
"ON_MATCH_BLOCK",
"ON_MATCH_REDACT",
"ON_MATCH_SUPERVISE",
"OUTBOUND_ON_MATCH_VALUES",
"DEFAULT_OUTBOUND_ON_MATCH",
"Config",
"Decision",
"HeaderMatch",
@@ -846,13 +790,13 @@ __all__ = [
"ScanResult",
"build_inbound_scan_text",
"build_outbound_scan_text",
"build_token_allow_payload",
"decide",
"decide_git_fetch",
"evaluate_matches",
"is_git_push_request",
"is_git_fetch_request",
"load_config",
"load_routes",
"match_route",
"outbound_scan_headers",
"parse_config",
+6 -28
View File
@@ -199,10 +199,13 @@ def _parse_provider_settings(
) -> dict[str, object]:
if raw is None:
return {}
if template != "pi":
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings is only "
"supported for template 'pi'"
)
settings = as_json_object(raw, f"bottle '{bottle_name}' agent_provider.settings")
common_allowed = {"startup_args"}
pi_allowed = {
allowed = {
"provider",
"base_url",
"api",
@@ -215,37 +218,12 @@ def _parse_provider_settings(
"supports_developer_role",
"supports_reasoning_effort",
}
if template == "pi":
allowed = common_allowed | pi_allowed
elif template in ("claude", "codex"):
allowed = common_allowed
elif template not in PROVIDER_TEMPLATES:
return dict(settings)
else:
allowed = common_allowed
for key in settings:
if key not in allowed:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings has unknown "
f"key {key!r}; allowed: {', '.join(sorted(allowed))}"
)
startup_args = settings.get("startup_args")
if startup_args is not None:
if not isinstance(startup_args, list):
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings.startup_args "
f"must be an array of strings"
)
for i, arg in enumerate(startup_args):
if not isinstance(arg, str) or not arg:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings."
f"startup_args[{i}] must be a non-empty string"
)
if template != "pi":
return dict(settings)
for key in ("provider", "base_url", "api", "api_key", "api_key_env"):
value = settings.get(key)
if value is not None and (not isinstance(value, str) or not value):
+5 -22
View File
@@ -21,9 +21,6 @@ VALID_METHODS = frozenset({
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"})
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
# What the proxy does on an outbound token match (PRD 0062).
OUTBOUND_ON_MATCH_VALUES = ("block", "redact", "supervise")
def validate_egress_routes(
bottle_name: str,
@@ -70,7 +67,6 @@ class ManifestEgressRoute:
GitFetch: bool = False
OutboundDetectors: tuple[str, ...] | None = None
InboundDetectors: tuple[str, ...] | None = None
OutboundOnMatch: str = ""
@classmethod
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "ManifestEgressRoute":
@@ -165,9 +161,8 @@ class ManifestEgressRoute:
# --- dlp ---
outbound_detectors: tuple[str, ...] | None = None
inbound_detectors: tuple[str, ...] | None = None
outbound_on_match = ""
if "dlp" in d:
outbound_detectors, inbound_detectors, outbound_on_match = _parse_dlp_block(
outbound_detectors, inbound_detectors = _parse_dlp_block(
label, d.get("dlp"),
)
@@ -206,7 +201,6 @@ class ManifestEgressRoute:
GitFetch=git_fetch,
OutboundDetectors=outbound_detectors,
InboundDetectors=inbound_detectors,
OutboundOnMatch=outbound_on_match,
)
@@ -329,7 +323,7 @@ def _parse_header_match(
def _parse_dlp_block(
route_label: str,
raw: object,
) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None, str]:
) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None]:
label = f"{route_label} dlp"
d = as_json_object(raw, label)
@@ -364,24 +358,13 @@ def _parse_dlp_block(
outbound = _parse_field("outbound_detectors", OUTBOUND_DETECTOR_NAMES)
inbound = _parse_field("inbound_detectors", INBOUND_DETECTOR_NAMES)
on_match = ""
on_match_raw = d.get("outbound_on_match")
if on_match_raw is not None:
if not isinstance(on_match_raw, str) or on_match_raw not in OUTBOUND_ON_MATCH_VALUES:
raise ManifestError(
f"{label} outbound_on_match must be one of "
f"{', '.join(OUTBOUND_ON_MATCH_VALUES)} (got {on_match_raw!r})"
)
on_match = on_match_raw
for k in d:
if k not in ("outbound_detectors", "inbound_detectors", "outbound_on_match"):
if k not in ("outbound_detectors", "inbound_detectors"):
raise ManifestError(
f"{label} has unknown key {k!r}; accepted keys are "
f"'outbound_detectors', 'inbound_detectors', "
f"'outbound_on_match'"
f"'outbound_detectors', 'inbound_detectors'"
)
return outbound, inbound, on_match
return outbound, inbound
LOG_LEVELS = frozenset({0, 1, 2})
+3 -10
View File
@@ -50,18 +50,14 @@ SUPERVISE_PORT = 9100
TOOL_CAPABILITY_BLOCK = "capability-block"
TOOL_EGRESS_BLOCK = "egress-block"
TOOL_EGRESS_ALLOW = "egress-allow"
TOOL_ALLOW = "allow"
TOOL_GITLEAKS_ALLOW = "gitleaks-allow"
# Written directly by the egress addon (not an agent-facing MCP tool) when an
# outbound DLP token block is routed to the operator for override (PRD 0062).
TOOL_EGRESS_TOKEN_ALLOW = "egress-token-allow"
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
TOOLS: tuple[str, ...] = (
TOOL_EGRESS_ALLOW,
TOOL_ALLOW,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
TOOL_EGRESS_TOKEN_ALLOW,
TOOL_LIST_EGRESS_ROUTES,
)
@@ -80,7 +76,7 @@ EGRESS_INTROSPECT_URL = "http://_egress.local/allowlist"
# here — those changes are captured by git history + the rebuild record
# laid down in PRD 0016.
COMPONENT_FOR_TOOL: dict[str, str] = {
TOOL_EGRESS_ALLOW: "egress",
TOOL_ALLOW: "egress",
TOOL_EGRESS_BLOCK: "egress",
}
@@ -559,10 +555,7 @@ __all__ = [
"EGRESS_FORWARD_PROXY",
"EGRESS_INTROSPECT_URL",
"TOOL_CAPABILITY_BLOCK",
"TOOL_EGRESS_ALLOW",
"TOOL_EGRESS_BLOCK",
"TOOL_GITLEAKS_ALLOW",
"TOOL_EGRESS_TOKEN_ALLOW",
"TOOL_LIST_EGRESS_ROUTES",
"archive_proposal",
"audit_dir",
+7 -14
View File
@@ -47,11 +47,11 @@ from pathlib import Path
try:
# Same-directory imports inside the bundle container; these files are
# COPYed flat under /app by Dockerfile.sidecars.
from egress_addon_core import LOG_OFF, load_config
from egress_addon_core import load_routes
import supervise as _sv
except ModuleNotFoundError:
# Package imports for host-side tests and tooling.
from .egress_addon_core import LOG_OFF, load_config
from .egress_addon_core import load_routes
from . import supervise as _sv
@@ -148,7 +148,7 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"allowlist. Returns JSON with one entry per allowed host, "
"each carrying its matches rules (if any) and whether "
"the proxy injects Authorization for the route. Use this "
"before composing an `egress-allow` or `egress-block` proposal so "
"before composing an `allow` or `egress-block` proposal so "
"the new routes file extends the live one rather than "
"replacing it."
),
@@ -159,7 +159,7 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
},
},
{
"name": _sv.TOOL_EGRESS_ALLOW,
"name": _sv.TOOL_ALLOW,
"description": (
"Request operator approval to change the bottle's egress "
"allowlist. Pass the full proposed routes.yaml content, not "
@@ -187,7 +187,6 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
" dlp: (optional DLP scanner overrides)\n"
" outbound_detectors: [token_patterns, known_secrets]\n"
" inbound_detectors: [naive_injection_detection]\n"
" outbound_on_match: block|redact|supervise (default supervise)\n"
"Omit any key that should use its default. "
"`list-egress-routes` returns routes in this same format."
),
@@ -229,7 +228,6 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
" dlp: (optional DLP scanner overrides)\n"
" outbound_detectors: [token_patterns, known_secrets]\n"
" inbound_detectors: [naive_injection_detection]\n"
" outbound_on_match: block|redact|supervise (default supervise)\n"
"Omit any key that should use its default. "
"`list-egress-routes` returns routes in this same format."
),
@@ -276,7 +274,7 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
# Map each proposal tool to the input field that carries the agent's
# payload (stored in Proposal.proposed_file).
PROPOSED_FILE_FIELD: dict[str, str] = {
_sv.TOOL_EGRESS_ALLOW: "routes_yaml",
_sv.TOOL_ALLOW: "routes_yaml",
_sv.TOOL_CAPABILITY_BLOCK: "dockerfile",
_sv.TOOL_EGRESS_BLOCK: "routes_yaml",
}
@@ -295,19 +293,14 @@ def validate_proposed_file(tool: str, content: str) -> None:
# Dockerfiles are too varied to validate syntactically beyond
# non-empty. The operator reads the diff in the TUI.
pass
elif tool in (_sv.TOOL_EGRESS_ALLOW, _sv.TOOL_EGRESS_BLOCK):
elif tool in (_sv.TOOL_ALLOW, _sv.TOOL_EGRESS_BLOCK):
try:
config = load_config(content)
load_routes(content)
except ValueError as e:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml is not valid: {e}",
) from e
if config.log != LOG_OFF:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml must not change egress logging",
)
else:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
@@ -1,210 +0,0 @@
# PRD 0062: Supervisor override for egress token blocks
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-24
- **Issue:** #261
## Summary
Give each egress route a policy for what happens when an outbound DLP detector
matches a token, via `dlp.outbound_on_match: block | redact | supervise`
(default `supervise`):
- **`supervise`** (default) — route the block through the existing supervisor
approval queue instead of returning `403` immediately. The proxy holds the
request open until the operator approves or rejects it. On approval the
matched token is added to an in-memory "safe tokens" set so the request — and
any later request carrying the same token — flows through without
re-prompting.
- **`redact`** — scrub the matched value(s) from the request and forward it,
no operator in the loop. For routes where a token-shaped value is noise the
upstream doesn't need (telemetry/log sinks). Fails closed if a match lands on
a surface redaction can't rewrite (the hostname).
- **`block`** — the original hard `403`; never overridable. For routes where a
detected token must always stop.
The motivating goal is reducing friction from false positives without weakening
the default-deny posture: supervise keeps a human in the loop, redact is an
explicit per-route opt-in, and block stays available for sensitive routes.
## Problem
The outbound DLP detectors (`token_patterns`, `known_secrets`) are
deliberately aggressive: any string that looks like a credential is blocked
before it leaves the bottle. That is the right default, but it produces false
positives — a token-shaped value that is not actually a secret, or a credential
the agent legitimately needs to send to a declared host. Today the only
recovery is for the operator to notice the `egress DLP` 403 in the logs and
hand-edit the route's `dlp.outbound_detectors`, which disables the detector for
the whole route rather than allowing the one value.
The operator has no in-the-loop signal that a token block happened and no
fine-grained way to say "this specific value is fine."
## Goals / Success Criteria
1. An outbound DLP **token** block (a `ScanResult` carrying a matched secret
value) creates a supervisor proposal instead of an immediate `403`.
2. The egress proxy holds the blocked request open, polling for the operator's
response up to a bounded timeout.
3. The proposal shows the operator the host, method, path, the detector reason,
and a **redacted** context snippet — never the raw token value.
4. On `approved`/`modified`, the matched token value is added to an in-memory
safe-tokens set and the request proceeds normally; later requests carrying
the same value skip the block.
5. On `rejected`, timeout, malformed response, or missing supervisor wiring,
the request fails closed with the same `403` as today.
6. Structural blocks that carry no token value (CRLF injection) and the
route-not-allowlisted / git blocks are unchanged — they stay hard `403`s and
keep their existing agent-driven `allow` / `egress-block` MCP path.
7. The proxy event loop is not stalled while waiting: the wait is asynchronous,
so other flows keep being served.
## Non-goals
- Persisting the safe-tokens set across egress restarts. It lives in process
memory only; a restart re-prompts. (The issue explicitly defers persistence.)
- Supervising inbound (prompt-injection) blocks or WebSocket frame blocks.
WebSocket frames still honour the safe-tokens set for already-approved values
but cannot wait for approval (there is no response surface after upgrade).
- Generalising an approved secret across encodings. The safe-tokens set matches
the exact value the detector found.
- Replacing the per-route `dlp.outbound_detectors` override. That remains the
way to turn a detector off wholesale.
- Making `redact` the default. Silent redaction of a true false positive
corrupts legitimate data, so it is opt-in per route; `supervise` (human in
the loop) stays the default.
## Scope
### In scope
The minimum cut that ships, in build order:
1. **Core**`ScanResult.matched`; thread `safe_tokens` through
`scan_outbound` / the token detectors; `build_token_allow_payload`.
2. **Supervise + TUI**`TOOL_EGRESS_TOKEN_ALLOW`; TUI suffix, modify guard,
required approval reason.
3. **Addon glue** — async `request`, safe-tokens set, proposal write + async
poll, allow/block decision; pass `safe_tokens` into the WebSocket path.
4. **On-match policy**`dlp.outbound_on_match` through manifest → render →
addon; `redact` surface scrub with fail-closed re-scan; policy dispatch in
the addon's outbound handler.
5. **Tests + docs** — core/supervise/TUI/manifest/render unit tests; README
egress + supervisor notes.
### Out of scope
The deferrals enumerated under **Non-goals** — restart persistence, inbound /
WebSocket-frame supervision, cross-encoding generalisation, replacing
`dlp.outbound_detectors`, and making `redact` the default.
## Proposed Design
### New services / components
A new proposal tool constant `egress-token-allow` (`TOOL_EGRESS_TOKEN_ALLOW`)
is added to `supervise.TOOLS`, and the egress addon gains an in-memory
safe-tokens set plus the policy-dispatch path that drives it.
On an outbound block the addon dispatches on the resolved policy:
- **Structural blocks always 403.** A `ScanResult` with no `matched` value
(CRLF injection) is a hard `403` regardless of policy — there is nothing to
redact or safelist.
- **`redact`** runs `redact_tokens` over the body, non-`host` header values,
and path/query, then re-scans. If the re-scan is clean the (rewritten)
request is forwarded; if a block-severity match remains (e.g. in the
hostname, or a unicode-evasion token redaction can't reach) it fails closed
with a `403`.
- **`block`** writes the `403` immediately.
- **`supervise`** runs the queue-and-wait loop, falling back to `block` when
supervise isn't wired for the bottle.
For `supervise`, the addon writes the proposal directly to
`SUPERVISE_QUEUE_DIR` (the queue is bind-mounted into the sidecar bundle and
shared by every daemon, exactly as git-gate's `gitleaks-allow` proposal in PRD
0061 does). The proposal's `proposed_file` is a human-readable text payload
built by `build_token_allow_payload`:
```
egress blocked an outbound request carrying a detected token
host: api.example.com
method: POST
path: /v1/ingest
detector: OpenAI API key found in body
context: ...before ******** after...
```
The justification tells the operator to approve only if the value is a false
positive or a credential the request legitimately needs. The addon then polls
`<proposal-id>.response.json` for `EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS` (default
300). `approved`/`modified` allow the request and add the value to the
safe-tokens set; `rejected`, malformed responses, and timeout fail the request
closed. The proposal + response are archived to `processed/` after a decision.
Because the wait happens inside mitmproxy's asyncio loop, the addon's `request`
hook is async and polls with `asyncio.sleep`, so concurrent flows are
unaffected.
### Existing code touched
- **Policy threading.** `dlp.outbound_on_match` is a per-route enum threaded
from the bottle manifest (`manifest_egress`) through the resolved route
(`egress.EgressRoute`), the rendered `routes.yaml` (`egress_render_routes`),
and the addon's `Route` (`egress_addon_core`). Unset renders nothing and
resolves to `supervise` at request time. The `list-egress-routes`
introspection endpoint round-trips it so the agent's proposals preserve it.
- **Provider-route default.** Agent-provider routes (the agent talking to its
own LLM API — `api.anthropic.com`, the Codex backend, etc.) are the worst
source of token-shaped false positives because the whole conversation payload
flows through them. `egress_routes_for_bottle` fills `outbound_on_match=redact`
on any provider route that doesn't set it explicitly; a provider that sets the
policy keeps its choice, and manifest routes are unaffected (they default to
`supervise`).
- **Scanners.** `scan_outbound` (and the token detectors `scan_token_patterns`
/ `scan_known_secrets` it calls) accept a `safe_tokens` set. A match whose
value is in `safe_tokens` is skipped, so an approved token no longer blocks;
the scanners keep searching past a safelisted match so a second, un-approved
secret in the same request is still caught. The WebSocket path is passed the
same `safe_tokens` set.
- **Supervisor UI.** `cli/supervise.py` renders `egress-token-allow` like
`gitleaks-allow`: the text payload is shown, modify is unavailable (there is
no file patch to edit), and approval prompts for a non-empty reason recorded
in the response notes. There is no on-disk config diff, so — like
`gitleaks-allow` and `capability-block` — it writes no egress audit-log entry.
- **Failure handling.** If `SUPERVISE_QUEUE_DIR` / `SUPERVISE_BOTTLE_SLUG` are
unset (supervise disabled for the bottle), the addon skips the queue and
returns the existing `403`. Any error writing the proposal or reading the
response also fails closed.
### Data model changes
- New per-route manifest field `dlp.outbound_on_match: block | redact |
supervise`, rendered into `routes.yaml` (omitted when unset).
- `ScanResult` gains a `matched: str = ""` field carrying the raw substring the
detector matched. The token detectors populate it; the structural CRLF
detector leaves it empty. The value stays inside the egress sidecar process —
never written to a log line (logs use the redacted `context`) nor to the
proposal file.
- Proposal text payload (above) plus `<proposal-id>.response.json` in
`SUPERVISE_QUEUE_DIR`, archived to `processed/` after a decision.
- New env var `EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS` (default 300).
### External dependencies
None. Reuses the existing supervisor queue (`SUPERVISE_QUEUE_DIR`) and the
mitmproxy addon framework already in the egress sidecar.
## Open questions
- Should `known_secrets` (provisioned `EGRESS_TOKEN_*` exfiltration) be
override-able at all, or only `token_patterns`? This PRD allows both —
approval is an explicit operator decision and the safe-tokens set matches the
exact found value — but a future revision could restrict `known_secrets` to
reject-only.
## References
- Issue #261
- PRD 0061 — `gitleaks-allow` supervisor proposal pattern this reuses.
@@ -1,85 +0,0 @@
# PRD 0064: LOG_FULL egress logging credential redaction
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-25
- **Issue:** #257
## Summary
The `LOG_FULL` egress logging path (`_log_request` and `_log_response` in `egress_addon.py`) writes request/response headers and bodies to stderr without redaction and includes the sidecar-injected upstream `Authorization` header verbatim. This PR applies `redact_tokens` to header values and bodies in both log functions and strips the injected `Authorization` header from request logs entirely.
## Problem
`LOG_FULL` (log level 2) is intended for debugging egress traffic. When active it calls `_log_request` and `_log_response`. Both functions have two related bugs:
1. **Injected `Authorization` header exposure.** `_log_request` is called *after* the sidecar injects upstream credentials (`flow.request.headers["authorization"] = decision.inject_authorization`). The full header dict — including the live credential — is serialized to stderr. Any log collector that ingests the egress container's stderr will receive the upstream bearer token in plaintext.
2. **Unredacted bodies and header values.** Neither `_log_request` nor `_log_response` passes body or header values through `redact_tokens`. By contrast, `_req_ctx` (used for block/warn events) already calls `redact_tokens` on path and host. Any provisioned secret or recognized token pattern that appears in a request body, response body, or non-Authorization header value will be logged verbatim under `LOG_FULL`.
These two bugs compose: an agent that enables `LOG_FULL` and simultaneously triggers a request that carries a known token gains a write path from credentials → egress logs.
## Goals / Success Criteria
- `_log_request` never logs the `authorization` header in any form.
- `_log_request` applies `redact_tokens(value, env=os.environ)` to every other header value before serializing.
- `_log_request` applies `redact_tokens(body, env=os.environ)` to the request body before logging.
- `_log_response` applies `redact_tokens(value, env=os.environ)` to every response header value before logging.
- `_log_response` applies `redact_tokens(body, env=os.environ)` to the response body before logging.
- Unit tests cover each of the five cases above.
## Non-goals
- Redacting host or path in the full-log path (already covered by `_req_ctx` for block/warn events; `_log_request` already calls `redact_tokens` on host and path).
- Suppressing `LOG_FULL` or adding a new log level.
- Changing the outbound DLP scan logic.
## Design
### `_log_request`
```python
def _log_request(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.request.headers.items()
if k.lower() != "authorization"
}
body = redact_tokens(flow.request.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_request",
"host": redact_tokens(flow.request.pretty_host, env=os.environ),
"method": flow.request.method,
"path": redact_tokens(flow.request.path, env=os.environ),
"headers": headers,
"body": body,
})
+ "\n"
)
```
The `authorization` key is excluded because by the time `_log_request` is called the sidecar has already injected the upstream credential (`decision.inject_authorization`). Logging it would write a live bearer token to stderr on every allowed request. There is no safe subset to log — the value is always a live credential or empty.
### `_log_response`
```python
def _log_response(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.response.headers.items()
}
body = redact_tokens(flow.response.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_response",
"host": flow.request.pretty_host,
"status": flow.response.status_code,
"headers": headers,
"body": body,
})
+ "\n"
)
```
Response headers don't carry injected credentials, so no header name is suppressed — only the values are scrubbed by `redact_tokens`.
@@ -1,4 +1,4 @@
# PRD 0063: Strengthen outbound exfiltration detection
# PRD prd-new: Strengthen outbound exfiltration detection
- **Status:** Active
- **Author:** claude
@@ -37,10 +37,9 @@ query, headers, body). But the content-based strong tier only matches
## Goals / Success Criteria
1. Each launched bottle has a unique canary token in the agent's environment
under a randomized `WORD_WORD_SECRET` env var name. The egress sidecar gets
the same env var and registers that exact name through
`BOT_BOTTLE_SENSITIVE_PREFIXES`. Any outbound appearance of the canary
blocks the request as a known-secret match.
(`BOT_BOTTLE_CANARY`) and the egress sidecar's environment
(`EGRESS_TOKEN_CANARY`). Any outbound appearance of the canary blocks the
request with reason `"canary token"`.
2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default:
`("EGRESS_TOKEN_",)`). `scan_outbound` reads
`BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes
@@ -78,20 +77,18 @@ query, headers, body). But the content-based strong tier only matches
```
Egress.prepare()
canary = secrets.token_urlsafe(32)
canary_env = <random WORD_WORD_SECRET>
EgressPlan(canary=canary, canary_env=canary_env, ...)
EgressPlan(canary=canary, ...)
Docker compose render:
sidecar env: <canary_env>=<canary>
sidecar env: BOT_BOTTLE_SENSITIVE_PREFIXES=<canary_env>
agent env: <canary_env>=<canary> ← visible to agent as a "secret"
sidecar env: EGRESS_TOKEN_CANARY=<canary> ← scanned by existing known-secrets detector
agent env: BOT_BOTTLE_CANARY=<canary> ← visible to agent as a "secret"
macos-container launch: same literals added to sidecar + agent env entries
```
The sidecar uses `BOT_BOTTLE_SENSITIVE_PREFIXES` to make the random canary env
name part of the existing `scan_known_secrets` detector without adding a
manifest schema field.
`EGRESS_TOKEN_CANARY` matches the `EGRESS_TOKEN_` prefix already scanned by
`scan_known_secrets`, so no detector code changes are required for canary
detection — only the injection path.
### Broadened known-value scanning
-46
View File
@@ -168,34 +168,6 @@ class TestAgentProviderRuntime(unittest.TestCase):
self.assertEqual("~/.claude/statusline.sh", settings["statusLine"]["command"])
self.assertEqual("custom:bot-bottle-research-ui", settings["theme"])
def test_claude_plan_uses_startup_args_from_provider_settings(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="claude",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
provider_settings={
"startup_args": ["--model", "opus"],
},
)
self.assertEqual(("--model", "opus"), plan.startup_args)
def test_codex_plan_uses_startup_args_from_provider_settings(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="codex",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
provider_settings={
"startup_args": ["--model", "gpt-5-codex"],
},
)
self.assertEqual(("--model", "gpt-5-codex"), plan.startup_args)
def test_codex_forward_host_credentials_populates_egress_routes(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
home = Path(tmp) / "host-codex"
@@ -422,24 +394,6 @@ class TestAgentProviderRuntime(unittest.TestCase):
self.assertNotIn("OPENROUTER_API_KEY", plan.guest_env)
self.assertTrue(provider["compat"]["supportsReasoningEffort"])
def test_pi_plan_appends_startup_args_from_provider_settings(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="pi",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
provider_settings={
"models": ["qwen3:14b"],
"startup_args": ["--no-stream"],
},
)
self.assertEqual(
("--models", "ollama/qwen3:14b", "--no-stream"),
plan.startup_args,
)
def test_pi_prompt_mode_appends_system_prompt_interactively(self):
self.assertEqual(
["--append-system-prompt", "/home/node/.bot-bottle-prompt.txt"],
-21
View File
@@ -102,27 +102,6 @@ class TestAttachAgent(unittest.TestCase):
bottle.argv,
)
def test_remote_control_is_provider_startup_arg(self):
class Bottle:
argv: list[str] = []
def exec_agent(self, argv: list[str], *, tty: bool = True) -> int:
self.argv = list(argv)
return 0
bottle = Bottle()
exit_code = start_mod.attach_agent(
bottle, # type: ignore[arg-type]
agent_provider_template="codex",
startup_args=("remote-control",),
)
self.assertEqual(0, exit_code)
self.assertEqual(
["--dangerously-bypass-approvals-and-sandbox", "remote-control"],
bottle.argv,
)
if __name__ == "__main__":
unittest.main()
+2 -23
View File
@@ -80,11 +80,7 @@ def _git_gate_plan(upstreams: tuple[GitGateUpstream, ...] = ()) -> GitGatePlan:
)
def _egress_plan(
routes: tuple[EgressRoute, ...] = (),
*,
canary: bool = False,
) -> EgressPlan:
def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan:
token_env_map = {
r.token_env: r.token_ref
for r in routes
@@ -99,8 +95,6 @@ def _egress_plan(
egress_network=f"bot-bottle-egress-{SLUG}",
mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem",
mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem",
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
)
@@ -118,7 +112,6 @@ def _plan(
with_git: bool = False,
with_egress: bool = False,
supervise: bool = False,
canary: bool = False,
) -> DockerBottlePlan:
"""Build a fully-resolved DockerBottlePlan. Toggles cover the
matrix the renderer's conditional-service logic branches on."""
@@ -157,7 +150,7 @@ def _plan(
slug=SLUG,
forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"},
git_gate_plan=_git_gate_plan(upstreams),
egress_plan=_egress_plan(routes, canary=canary),
egress_plan=_egress_plan(routes),
supervise_plan=_supervise_plan() if supervise else None,
use_runsc=False,
agent_provision=AgentProvisionPlan(
@@ -382,20 +375,6 @@ class TestSidecarBundleShape(unittest.TestCase):
env_strings = sc["environment"]
self.assertNotIn("EGRESS_TOKEN_0", env_strings)
def test_canary_env_registered_as_sensitive_in_sidecar(self):
sc = self._render(canary=True)["services"]["sidecars"]
env_strings = sc["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
env_strings,
)
def test_canary_env_visible_to_agent(self):
agent = self._render(canary=True)["services"]["agent"]
env_strings = agent["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
def test_supervise_env_present_when_active(self):
sc = self._render(supervise=True)["services"]["sidecars"]
env_strings = sc["environment"]
@@ -29,9 +29,6 @@ from bot_bottle.supervise import SupervisePlan
_URL = "http://supervise:9100/"
_CODEX_DOCKERFILE = (
Path(__file__).resolve().parents[2] / "bot_bottle/contrib/codex/Dockerfile"
)
def _make_bottle(exec_result: ExecResult | None = None) -> MagicMock:
@@ -279,12 +276,6 @@ class TestCodexProvision(unittest.TestCase):
)
class TestCodexDockerfile(unittest.TestCase):
def test_installs_procps_for_remote_control_pid_management(self):
dockerfile = _CODEX_DOCKERFILE.read_text()
self.assertIn("procps", dockerfile)
class TestCodexSuperviseMcp(unittest.TestCase):
def test_noop_when_supervise_disabled(self):
bottle = _make_bottle()
@@ -12,7 +12,6 @@ from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
_split_owner_repo,
)
from bot_bottle.deploy_key_provisioner import DeployKeyCollisionError
def _provisioner() -> GiteaDeployKeyProvisioner:
@@ -101,30 +100,6 @@ class TestCreate(unittest.TestCase):
provisioner.create("owner/repo", "title")
self.assertIn("403", str(ctx.exception))
def test_create_raises_collision_error_on_422(self):
provisioner = _provisioner()
collision_body = json.dumps({
"errors": ["Key content already exists on this repository"],
"message": "422 Unprocessable Entity",
})
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run"
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=_http_error(422, collision_body),
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes",
return_value=b"pk",
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text",
return_value="ssh-ed25519 AAAA\n",
):
with self.assertRaises(DeployKeyCollisionError) as ctx:
provisioner.create("owner/repo", "my-title")
msg = str(ctx.exception)
self.assertIn("owner/repo", msg)
self.assertIn("my-title", msg)
class TestDelete(unittest.TestCase):
def test_delete_calls_correct_endpoint(self):
+5 -65
View File
@@ -1,4 +1,4 @@
"""Unit: DLP detectors (PRD 0053).
"""Unit: DLP detectors (PRD 0053, prd-new).
Tests for token pattern scanning, known secret detection, fragmentation-
resistant matching, entropy scoring, and naive prompt injection detection."""
@@ -9,6 +9,7 @@ import unittest
from bot_bottle.dlp_detectors import (
ENTROPY_BLOCK_THRESHOLD,
ENTROPY_WINDOW,
PARTIAL_MATCH_MIN_LEN,
REDACT,
_alnum_projection,
@@ -450,63 +451,6 @@ class TestKnownSecretsNewVariants(unittest.TestCase):
self.assertIsNotNone(result)
class TestMatchedAndSafeTokens(unittest.TestCase):
"""PRD 0062: detectors carry the raw matched value, and a safelisted
value is skipped so the supervisor can approve a specific token."""
def test_token_pattern_sets_matched(self):
token = "ghp_" + "A" * 36
result = scan_token_patterns(f"token: {token}")
assert result is not None
self.assertEqual(token, result.matched)
def test_safe_token_is_skipped(self):
token = "ghp_" + "A" * 36
self.assertIsNone(
scan_token_patterns(f"token: {token}", safe_tokens={token})
)
def test_safe_token_does_not_mask_other_token(self):
safe = "ghp_" + "A" * 36
other = "AKIAIOSFODNN7EXAMPLE"
result = scan_token_patterns(
f"a={safe} b={other}", safe_tokens={safe},
)
assert result is not None
self.assertEqual(other, result.matched)
self.assertIn("AWS", result.reason)
def test_known_secret_sets_matched_and_safelist_skips(self):
secret = "supersecretvalue123"
env = {"EGRESS_TOKEN_FOO": secret}
result = scan_known_secrets(f"x={secret}", env=env)
assert result is not None
self.assertEqual(secret, result.matched)
self.assertIsNone(
scan_known_secrets(f"x={secret}", env=env, safe_tokens={secret})
)
def test_crlf_block_has_no_matched_value(self):
result = scan_crlf_injection("path%0d%0aHost: evil")
assert result is not None
self.assertEqual("", result.matched)
class TestStripCrlf(unittest.TestCase):
def test_removes_url_encoded_crlf(self):
from bot_bottle.dlp_detectors import strip_crlf
out = strip_crlf("next=%0d%0aX-Injected: evil")
self.assertNotRegex(out, r"%0[dD]%0[aA]")
def test_removes_literal_header_injection(self):
from bot_bottle.dlp_detectors import strip_crlf
out = strip_crlf("value\r\nX-Injected: evil")
self.assertIsNone(scan_crlf_injection(out))
def test_leaves_clean_text_unchanged(self):
from bot_bottle.dlp_detectors import strip_crlf
self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello"))
class TestAlnumProjection(unittest.TestCase):
def test_alphanumeric_unchanged(self):
self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ"))
@@ -593,15 +537,11 @@ class TestFragmentationResistantMatching(unittest.TestCase):
def test_canary_prefix_detected(self):
canary_value = "canary-fake-secret-value-xyz"
env = {"CANON_ALPHA_SECRET": canary_value}
result = scan_known_secrets(
f"x={canary_value}",
env=env,
sensitive_prefixes=("CANON_ALPHA_SECRET",),
)
env = {"EGRESS_TOKEN_CANARY": canary_value}
result = scan_known_secrets(f"x={canary_value}", env=env)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("CANON_ALPHA_SECRET", result.reason)
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
class TestRedactTokensBroadenedPrefixes(unittest.TestCase):
-10
View File
@@ -136,16 +136,6 @@ class TestClaudeArgv(unittest.TestCase):
argv,
)
def test_codex_remote_control_startup_arg_does_not_receive_initial_prompt(self):
argv = _codex_bottle("/home/node/.bot-bottle-prompt.txt").agent_argv(
["--dangerously-bypass-approvals-and-sandbox", "remote-control"],
)
self.assertEqual(
["docker", "exec", "-it", "bot-bottle-dev-abc", "codex",
"--dangerously-bypass-approvals-and-sandbox", "remote-control"],
argv,
)
def test_codex_resume_does_not_append_initial_prompt(self):
argv = _codex_bottle("/home/node/.bot-bottle-prompt.txt").agent_argv(
["--dangerously-bypass-approvals-and-sandbox", "resume", "--last"],
@@ -31,6 +31,7 @@ class _Provider(AgentProvider):
return AgentProviderRuntime(
template="test", command="test", image="",
prompt_mode="append_file", bypass_args=(), resume_args=(),
remote_control_args=(),
)
def provision_plan(self, **kwargs): # type: ignore[override]
raise NotImplementedError
+11 -102
View File
@@ -1,5 +1,5 @@
"""Unit: Egress route lift + routes.yaml render + token
resolution (PRD 0017, PRD 0053)."""
resolution (PRD 0017, PRD 0053, prd-new)."""
import tempfile
import unittest
@@ -10,12 +10,10 @@ from bot_bottle.egress import (
Egress,
EgressPlan,
EgressRoute,
egress_agent_env_entries,
egress_manifest_routes,
egress_render_routes,
egress_resolve_token_values,
egress_routes_for_bottle,
egress_sidecar_env_entries,
egress_token_env_map,
)
from bot_bottle.log import Die
@@ -208,23 +206,6 @@ class TestProviderRouteMerge(unittest.TestCase):
self.assertEqual((), routes[0].matches)
self.assertEqual({}, egress_token_env_map(routes))
def test_provider_route_defaults_to_redact_on_match(self):
b = _bottle([])
pr = EgressRoute(host="api.anthropic.com")
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual("redact", routes[0].outbound_on_match)
def test_provider_route_explicit_on_match_preserved(self):
b = _bottle([])
pr = EgressRoute(host="api.anthropic.com", outbound_on_match="supervise")
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual("supervise", routes[0].outbound_on_match)
def test_manifest_route_does_not_get_redact_default(self):
b = _bottle([{"host": "api.example.com"}])
routes = egress_routes_for_bottle(b)
self.assertEqual("", routes[0].outbound_on_match)
def test_two_provider_routes_with_same_token_ref_share_slot(self):
b = _bottle([])
routes = egress_routes_for_bottle(b, (
@@ -322,7 +303,7 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual([], parse_yaml_subset(rendered)["routes"])
def test_round_trip_through_addon_core(self):
from bot_bottle.egress_addon_core import load_config
from bot_bottle.egress_addon_core import load_routes
b = _bottle([
{"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"},
@@ -333,7 +314,7 @@ class TestRenderRoutes(unittest.TestCase):
{"host": "api.anthropic.com"},
])
routes = egress_routes_for_bottle(b)
addon_routes = load_config(egress_render_routes(routes)).routes
addon_routes = load_routes(egress_render_routes(routes))
self.assertEqual(3, len(addon_routes))
self.assertEqual("Bearer", addon_routes[0].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", addon_routes[0].token_env)
@@ -341,41 +322,24 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual("", addon_routes[2].auth_scheme)
def test_dlp_round_trips(self):
from bot_bottle.egress_addon_core import load_config
from bot_bottle.egress_addon_core import load_routes
b = _bottle([{"host": "x.example", "dlp": {
"outbound_detectors": ["token_patterns"],
"inbound_detectors": False,
}}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes)
addon_routes = load_config(rendered).routes
addon_routes = load_routes(rendered)
self.assertEqual(("token_patterns",), addon_routes[0].outbound_detectors)
self.assertEqual((), addon_routes[0].inbound_detectors)
def test_outbound_on_match_round_trips(self):
from bot_bottle.egress_addon_core import load_config
b = _bottle([{"host": "logs.example", "dlp": {
"outbound_on_match": "redact",
}}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes)
self.assertIn('outbound_on_match: "redact"', rendered)
addon_routes = load_config(rendered).routes
self.assertEqual("redact", addon_routes[0].outbound_on_match)
def test_outbound_on_match_default_omitted_from_render(self):
b = _bottle([{"host": "x.example"}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes)
self.assertNotIn("outbound_on_match", rendered)
def test_git_fetch_policy_round_trips(self):
from bot_bottle.egress_addon_core import load_config
from bot_bottle.egress_addon_core import load_routes
b = _bottle([{"host": "github.com", "git": {"fetch": True}}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes)
self.assertEqual({"fetch": True}, self._parsed(routes)[0]["git"])
addon_routes = load_config(rendered).routes
addon_routes = load_routes(rendered)
self.assertTrue(addon_routes[0].git_fetch)
def test_log_zero_omitted_from_render(self):
@@ -450,7 +414,7 @@ class TestResolveTokenValues(unittest.TestCase):
class TestCanaryGeneration(unittest.TestCase):
"""Egress.prepare() generates a unique canary token per session."""
"""Egress.prepare() generates a unique canary token per session (prd-new)."""
def _bottle_obj(self):
return ManifestIndex.from_json_obj({
@@ -472,7 +436,6 @@ class TestCanaryGeneration(unittest.TestCase):
plan = self._make_plan()
self.assertIsInstance(plan.canary, str)
self.assertGreater(len(plan.canary), 0)
self.assertRegex(plan.canary_env, r"^[A-Z]+_[A-Z]+_SECRET$")
def test_canary_is_unique_per_session(self):
with tempfile.TemporaryDirectory() as td:
@@ -490,16 +453,12 @@ class TestCanaryGeneration(unittest.TestCase):
from bot_bottle.dlp_detectors import scan_known_secrets
plan = self._make_plan()
env = {plan.canary_env: plan.canary}
result = scan_known_secrets(
f"exfil={plan.canary}",
env=env,
sensitive_prefixes=(plan.canary_env,),
)
env = {"EGRESS_TOKEN_CANARY": plan.canary}
result = scan_known_secrets(f"exfil={plan.canary}", env=env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn(plan.canary_env, result.reason)
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
def test_egress_plan_canary_field_default_empty(self):
# Verify EgressPlan can be constructed with an empty canary (backward compat).
@@ -511,56 +470,6 @@ class TestCanaryGeneration(unittest.TestCase):
token_env_map={},
)
self.assertEqual("", plan.canary)
self.assertEqual("", plan.canary_env)
class TestEgressEnvEntries(unittest.TestCase):
def test_sidecar_entries_include_route_tokens_and_canary_scan_prefix(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(EgressRoute(host="api.example"),),
token_env_map={"EGRESS_TOKEN_1": "T1", "EGRESS_TOKEN_0": "T0"},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
(
"EGRESS_TOKEN_0",
"EGRESS_TOKEN_1",
"CANON_ALPHA_SECRET=fake-canary-value",
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
),
egress_sidecar_env_entries(plan),
)
def test_agent_entries_include_only_canary_bait(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
("CANON_ALPHA_SECRET=fake-canary-value",),
egress_agent_env_entries(plan),
)
def test_canary_entries_omitted_when_name_missing(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
)
self.assertEqual((), egress_sidecar_env_entries(plan))
self.assertEqual((), egress_agent_env_entries(plan))
if __name__ == "__main__":
+44 -142
View File
@@ -22,16 +22,15 @@ from bot_bottle.egress_addon_core import (
MatchEntry,
PathMatch,
Route,
ScanResult,
build_inbound_scan_text,
build_outbound_scan_text,
build_token_allow_payload,
decide,
decide_git_fetch,
evaluate_matches,
is_git_fetch_request,
is_git_push_request,
load_config,
load_routes,
match_route,
outbound_scan_headers,
parse_config,
@@ -268,24 +267,46 @@ class TestParseDlp(unittest.TestCase):
"dlp": {"wat": True},
}]})
def test_outbound_on_match_default_empty(self):
routes = parse_routes({"routes": [{"host": "x.example"}]})
self.assertEqual("", routes[0].outbound_on_match)
def test_outbound_on_match_parsed(self):
for policy in ("block", "redact", "supervise"):
routes = parse_routes({"routes": [{
"host": "x.example",
"dlp": {"outbound_on_match": policy},
}]})
self.assertEqual(policy, routes[0].outbound_on_match)
# --- load_routes ---------------------------------------------------------
def test_outbound_on_match_invalid_rejected(self):
class TestLoadRoutes(unittest.TestCase):
def test_yaml_text_round_trip(self):
routes = load_routes(
'routes:\n'
' - host: "api.example"\n'
)
self.assertEqual(1, len(routes))
self.assertEqual("api.example", routes[0].host)
def test_full_route_shape_parses(self):
routes = load_routes(
'routes:\n'
' - host: "api.example"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
' matches:\n'
' - paths:\n'
' - value: "/v1/"\n'
' - type: "exact"\n'
' value: "/messages"\n'
)
self.assertEqual(1, len(routes))
r = routes[0]
self.assertEqual("api.example", r.host)
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
self.assertEqual(1, len(r.matches))
self.assertEqual(2, len(r.matches[0].paths))
def test_empty_routes_list(self):
routes = load_routes("routes: []\n")
self.assertEqual((), routes)
def test_invalid_yaml_raises_value_error(self):
with self.assertRaises(ValueError):
parse_routes({"routes": [{
"host": "x.example",
"dlp": {"outbound_on_match": "nope"},
}]})
load_routes("routes:\n\t- host: x\n")
# --- load_config / parse_config ------------------------------------------
@@ -336,33 +357,6 @@ class TestLoadConfig(unittest.TestCase):
with self.assertRaises(ValueError):
parse_config("not a dict")
def test_empty_routes_list(self):
cfg = load_config("routes: []\n")
self.assertEqual((), cfg.routes)
def test_full_route_shape_parses(self):
cfg = load_config(
'routes:\n'
' - host: "api.example"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
' matches:\n'
' - paths:\n'
' - value: "/v1/"\n'
' - type: "exact"\n'
' value: "/messages"\n'
)
r = cfg.routes[0]
self.assertEqual("api.example", r.host)
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
self.assertEqual(1, len(r.matches))
self.assertEqual(2, len(r.matches[0].paths))
def test_invalid_yaml_raises_value_error(self):
with self.assertRaises(ValueError):
load_config("routes:\n\t- host: x\n")
# --- evaluate_matches ---------------------------------------------------
@@ -1173,93 +1167,8 @@ class TestScanInbound(unittest.TestCase):
self.assertEqual("block", result.severity)
class TestScanOutboundSafeTokens(unittest.TestCase):
"""PRD 0062: scan_outbound threads the supervisor-approved safe-tokens
set into the token detectors."""
def test_safe_token_allows_request(self):
text = build_outbound_scan_text(
host="api.example.com", path="/v1/data", query="",
headers={}, body=f"key={_AWS_KEY}",
)
self.assertIsNone(
scan_outbound(_ROUTE, text, {}, safe_tokens={_AWS_KEY})
)
def test_unrelated_safe_token_still_blocks(self):
text = build_outbound_scan_text(
host="api.example.com", path="/v1/data", query="",
headers={}, body=f"key={_AWS_KEY}",
)
result = scan_outbound(_ROUTE, text, {}, safe_tokens={"ghp_" + "A" * 36})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual(_AWS_KEY, result.matched)
class TestScanOutboundCrlfText(unittest.TestCase):
"""PRD 0062: CRLF is scanned only over the request line + headers
(crlf_text), never the body a body is not an injection vector."""
def test_body_crlf_not_flagged_when_crlf_text_excludes_body(self):
# A form-encoded multi-line body legitimately contains %0d%0a.
body = "comment=line1%0d%0aline2"
full = build_outbound_scan_text(
host="api.example.com", path="/submit", query="",
headers={}, body=body,
)
crlf_text = build_outbound_scan_text(
host="api.example.com", path="/submit", query="",
headers={}, body="",
)
self.assertIsNone(scan_outbound(_ROUTE, full, {}, crlf_text=crlf_text))
def test_request_line_crlf_still_flagged(self):
full = build_outbound_scan_text(
host="api.example.com", path="/p", query="next=%0d%0aX:evil",
headers={}, body="",
)
crlf_text = full
result = scan_outbound(_ROUTE, full, {}, crlf_text=crlf_text)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_default_crlf_text_scans_full_blob(self):
# Backward compatibility: crlf_text=None scans everything (body too).
full = build_outbound_scan_text(
host="api.example.com", path="/submit", query="",
headers={}, body="x=%0d%0aX:evil",
)
self.assertIsNotNone(scan_outbound(_ROUTE, full, {}))
class TestBuildTokenAllowPayload(unittest.TestCase):
def test_payload_includes_context_and_no_raw_token(self):
result = ScanResult(
severity="block",
reason="AWS access key found in body",
location="body",
context="key=******** tail",
matched=_AWS_KEY,
)
payload = build_token_allow_payload(
"api.example.com", "POST", "/v1/ingest", result,
)
self.assertIn("host: api.example.com", payload)
self.assertIn("method: POST", payload)
self.assertIn("path: /v1/ingest", payload)
self.assertIn("AWS access key found in body", payload)
self.assertIn("key=******** tail", payload)
# The raw matched value must never appear in the proposal file.
self.assertNotIn(_AWS_KEY, payload)
def test_payload_omits_context_line_when_empty(self):
result = ScanResult(severity="block", reason="r", matched="x")
payload = build_token_allow_payload("h", "GET", "/", result)
self.assertNotIn("context:", payload)
class TestScanOutboundEnhanced(unittest.TestCase):
"""scan_outbound changes: binary decode, entropy detector,
"""scan_outbound changes from prd-new: binary decode, entropy detector,
broadened known-value prefixes, fragmentation resistance."""
_ROUTE = Route(host="api.example.com")
@@ -1323,27 +1232,20 @@ class TestScanOutboundEnhanced(unittest.TestCase):
result = scan_outbound(self._ROUTE, f"auth={secret}", env)
self.assertIsNotNone(result)
def test_canary_detected_via_random_secret_env_name(self):
# The fake secret uses a randomized env name that the sidecar marks
# as sensitive through BOT_BOTTLE_SENSITIVE_PREFIXES.
def test_canary_detected_via_egress_token_canary(self):
# The canary (injected as EGRESS_TOKEN_CANARY) is caught by known_secrets.
canary = "canaryvalue12345abcdef"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
env = {"EGRESS_TOKEN_CANARY": canary}
result = scan_outbound(self._ROUTE, f"data={canary}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("CANON_ALPHA_SECRET", result.reason)
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
def test_fragmented_canary_blocked(self):
# Canary with separators injected is still caught.
canary = "supersecretcanary99"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
env = {"EGRESS_TOKEN_CANARY": canary}
fragmented = "-".join(canary)
result = scan_outbound(self._ROUTE, f"x={fragmented}", env)
self.assertIsNotNone(result)
@@ -1,274 +0,0 @@
"""Unit: LOG_FULL credential redaction in _log_request / _log_response (issue #257).
egress_addon.py is sidecar-only code that depends on mitmproxy, which is
not installed on the host. This file pre-populates sys.modules with the
minimum mocks needed so EgressAddon can be imported and tested without the
real mitmproxy package."""
from __future__ import annotations
import json
import sys
import types
import unittest
from io import StringIO
from typing import Any
from unittest.mock import patch
# ---------------------------------------------------------------------------
# Sidecar-import shims — must run before importing egress_addon
# ---------------------------------------------------------------------------
def _ensure_shims() -> None:
if "mitmproxy" not in sys.modules:
_mm = types.ModuleType("mitmproxy")
_mh = types.ModuleType("mitmproxy.http")
setattr(_mm, "http", _mh)
sys.modules["mitmproxy"] = _mm
sys.modules["mitmproxy.http"] = _mh
if "egress_addon_core" not in sys.modules:
import bot_bottle.egress_addon_core as _core
sys.modules["egress_addon_core"] = _core
_ensure_shims()
from bot_bottle.egress_addon import EgressAddon # noqa: E402 (import after shims)
from bot_bottle.egress_addon_core import Config, LOG_FULL # noqa: E402
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _addon() -> EgressAddon:
"""Return a bare EgressAddon with LOG_FULL config and no routes file."""
a: EgressAddon = EgressAddon.__new__(EgressAddon)
a.config = Config(routes=(), log=LOG_FULL)
a.safe_tokens = set()
a._supervise_queue_dir = ""
a._supervise_slug = ""
a._token_allow_timeout = 300.0
return a
class _Headers:
def __init__(self, d: dict[str, str]) -> None:
self._d = d
def items(self) -> list[tuple[str, str]]:
return list(self._d.items())
class _Request:
def __init__(
self,
host: str = "api.example.com",
method: str = "POST",
path: str = "/v1/messages",
headers: dict[str, str] | None = None,
body: str = "",
) -> None:
self.pretty_host = host
self.method = method
self.path = path
self.headers = _Headers(headers or {})
self._body = body
def get_text(self, *, strict: bool = True) -> str:
return self._body
class _Response:
def __init__(
self,
status_code: int = 200,
headers: dict[str, str] | None = None,
body: str = "",
) -> None:
self.status_code = status_code
self.headers = _Headers(headers or {})
self._body = body
def get_text(self, *, strict: bool = True) -> str:
return self._body
class _Flow:
def __init__(
self,
request: _Request | None = None,
response: _Response | None = None,
) -> None:
self.request = request or _Request()
self.response = response or _Response()
def _log_request(addon: EgressAddon, flow: _Flow) -> dict[str, Any]:
buf = StringIO()
with patch("sys.stderr", buf):
addon._log_request(flow) # type: ignore[arg-type]
return json.loads(buf.getvalue())
def _log_response(addon: EgressAddon, flow: _Flow) -> dict[str, Any]:
buf = StringIO()
with patch("sys.stderr", buf):
addon._log_response(flow) # type: ignore[arg-type]
return json.loads(buf.getvalue())
# ---------------------------------------------------------------------------
# _log_request — authorization header stripped
# ---------------------------------------------------------------------------
class TestLogRequestAuthorizationStripped(unittest.TestCase):
def test_lowercase_authorization_excluded(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"authorization": "Bearer sk-real-secret"}))
entry = _log_request(addon, flow)
self.assertNotIn("authorization", entry["headers"])
def test_titlecase_authorization_excluded(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"Authorization": "Bearer sk-real-secret"}))
entry = _log_request(addon, flow)
self.assertNotIn("Authorization", entry["headers"])
self.assertNotIn("authorization", entry["headers"])
def test_non_auth_headers_retained(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={
"authorization": "Bearer sk-real-secret",
"content-type": "application/json",
}))
entry = _log_request(addon, flow)
self.assertIn("content-type", entry["headers"])
self.assertEqual("application/json", entry["headers"]["content-type"])
def test_no_authorization_header_logs_all_others(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"x-request-id": "abc"}))
entry = _log_request(addon, flow)
self.assertEqual({"x-request-id": "abc"}, entry["headers"])
# ---------------------------------------------------------------------------
# _log_request — body redaction
# ---------------------------------------------------------------------------
_OPENAI_KEY = "sk-" + "A" * 48
class TestLogRequestBodyRedacted(unittest.TestCase):
def test_token_pattern_in_body_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(body=f"key={_OPENAI_KEY}"))
entry = _log_request(addon, flow)
self.assertNotIn(_OPENAI_KEY, entry["body"])
self.assertIn("********", entry["body"])
def test_provisioned_secret_in_body_scrubbed(self) -> None:
addon = _addon()
secret = "provisioned-egress-secret-xyz"
flow = _Flow(request=_Request(body=f"token={secret}"))
with patch.dict("os.environ", {"EGRESS_TOKEN_0": secret}):
entry = _log_request(addon, flow)
self.assertNotIn(secret, entry["body"])
self.assertIn("********", entry["body"])
def test_clean_body_preserved(self) -> None:
addon = _addon()
payload = '{"model": "claude-3", "max_tokens": 1024}'
flow = _Flow(request=_Request(body=payload))
entry = _log_request(addon, flow)
self.assertEqual(payload, entry["body"])
# ---------------------------------------------------------------------------
# _log_request — non-authorization header value redaction
# ---------------------------------------------------------------------------
class TestLogRequestHeaderValuesRedacted(unittest.TestCase):
def test_token_in_custom_header_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"x-api-key": _OPENAI_KEY}))
entry = _log_request(addon, flow)
self.assertNotIn(_OPENAI_KEY, entry["headers"].get("x-api-key", ""))
self.assertIn("********", entry["headers"].get("x-api-key", ""))
def test_clean_header_value_preserved(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"accept": "application/json"}))
entry = _log_request(addon, flow)
self.assertEqual("application/json", entry["headers"]["accept"])
# ---------------------------------------------------------------------------
# _log_response — body redaction
# ---------------------------------------------------------------------------
class TestLogResponseBodyRedacted(unittest.TestCase):
def test_token_pattern_in_response_body_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(
request=_Request(),
response=_Response(body=f'{{"key": "{_OPENAI_KEY}"}}'),
)
entry = _log_response(addon, flow)
self.assertNotIn(_OPENAI_KEY, entry["body"])
self.assertIn("********", entry["body"])
def test_provisioned_secret_in_response_body_scrubbed(self) -> None:
addon = _addon()
secret = "provisioned-egress-secret-xyz"
flow = _Flow(
request=_Request(),
response=_Response(body=f'{{"token": "{secret}"}}'),
)
with patch.dict("os.environ", {"EGRESS_TOKEN_0": secret}):
entry = _log_response(addon, flow)
self.assertNotIn(secret, entry["body"])
self.assertIn("********", entry["body"])
def test_clean_response_body_preserved(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(), response=_Response(body='{"result": "ok"}'))
entry = _log_response(addon, flow)
self.assertEqual('{"result": "ok"}', entry["body"])
# ---------------------------------------------------------------------------
# _log_response — response header value redaction
# ---------------------------------------------------------------------------
class TestLogResponseHeaderValuesRedacted(unittest.TestCase):
def test_token_in_response_header_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(
request=_Request(),
response=_Response(headers={"set-cookie": f"token={_OPENAI_KEY}"}),
)
entry = _log_response(addon, flow)
cookie_val = entry["headers"].get("set-cookie", "")
self.assertNotIn(_OPENAI_KEY, cookie_val)
self.assertIn("********", cookie_val)
def test_clean_response_header_preserved(self) -> None:
addon = _addon()
flow = _Flow(
request=_Request(),
response=_Response(headers={"content-type": "application/json"}),
)
entry = _log_response(addon, flow)
self.assertEqual("application/json", entry["headers"]["content-type"])
if __name__ == "__main__":
unittest.main()
-9
View File
@@ -54,15 +54,6 @@ class TestValidateRoutesContent(unittest.TestCase):
' auth_scheme: "Bearer"\n'
)
def test_rejects_log_full(self):
with self.assertRaises(EgressApplyError) as cm:
applicator.validate_routes_content(
'log: 2\n'
'routes:\n'
' - host: "x.example"\n'
)
self.assertIn("must not change egress logging", str(cm.exception))
class TestApplyRoutesChange(unittest.TestCase):
def setUp(self):
+1 -23
View File
@@ -30,7 +30,6 @@ def _plan(
supervise: bool = False,
agent_git_gate_url: str = "",
agent_supervise_url: str = "",
canary: bool = False,
) -> MacosContainerBottlePlan:
routes_path = stage_dir / "routes.yaml"
routes_path.write_text("routes: []\n", encoding="utf-8")
@@ -43,8 +42,7 @@ def _plan(
routes_path=routes_path,
routes=("route",),
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
canary="",
)
if git:
key_path = stage_dir / "origin-key"
@@ -141,26 +139,6 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
argv,
)
def test_sidecar_argv_registers_canary_env_as_sensitive(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._sidecar_run_argv(
plan,
"bot-bottle-sidecars-dev-abc",
"bot-bottle-net-dev-abc",
"bot-bottle-egress-dev-abc",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
self.assertIn("BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", argv)
def test_agent_argv_receives_canary_env(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._agent_run_argv(
plan,
"bot-bottle-net-dev-abc",
"192.0.2.10",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
def test_agent_env_points_proxy_at_sidecar_ip(self):
plan = _plan(
stage_dir=self.stage_dir,
-27
View File
@@ -73,33 +73,6 @@ resolver #2
)
self.assertTrue(run.call_args_list[-1].kwargs["check"])
def test_build_image_anchors_relative_dockerfile_to_context(self):
status = util.subprocess.CompletedProcess(
args=[],
returncode=0,
stdout=(
'[{"status":{"state":"running"},'
'"configuration":{"dns":{"nameservers":["9.9.9.9"]}}}]'
),
stderr="",
)
with patch.object(util.subprocess, "run", return_value=status) as run, \
patch.object(util.os, "environ", {
"BOT_BOTTLE_MACOS_CONTAINER_DNS": "9.9.9.9",
}):
util.build_image(
"bot-bottle-sidecars:latest",
"/repo",
dockerfile="Dockerfile.sidecars",
)
self.assertEqual(
[
"container", "build", "-t", "bot-bottle-sidecars:latest",
"--dns", "9.9.9.9", "-f", "/repo/Dockerfile.sidecars", "/repo",
],
run.call_args_list[-1].args[0],
)
def test_commit_container_execs_tar_and_builds_image(self):
# stderr is bytes because subprocess.run uses stderr=PIPE without text=True
completed = util.subprocess.CompletedProcess(
+1 -46
View File
@@ -167,40 +167,13 @@ class TestAgentProviderHostCredentials(unittest.TestCase):
},
})
def test_startup_args_allowed_for_claude(self):
b = _provider_config_bottle({
"template": "claude",
"settings": {"startup_args": ["--model", "opus"]},
})
self.assertEqual(
{"startup_args": ["--model", "opus"]},
b.agent_provider.settings,
)
def test_startup_args_allowed_for_codex(self):
b = _provider_config_bottle({
"template": "codex",
"settings": {"startup_args": ["--model", "gpt-5-codex"]},
})
self.assertEqual(
{"startup_args": ["--model", "gpt-5-codex"]},
b.agent_provider.settings,
)
def test_provider_specific_settings_still_rejected_for_claude(self):
def test_settings_rejected_for_claude(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "claude",
"settings": {"models": ["qwen2.5-coder:7b"]},
})
def test_startup_args_must_be_string_array(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "codex",
"settings": {"startup_args": ["--model", 42]},
})
def test_settings_models_must_be_non_empty_string_array(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
@@ -329,24 +302,6 @@ class TestDlp(unittest.TestCase):
"bogus": True,
}}])
def test_outbound_on_match_omitted_is_empty(self):
b = _bottle([{"host": "x.example"}])
self.assertEqual("", b.egress.routes[0].OutboundOnMatch)
def test_outbound_on_match_accepts_policies(self):
for policy in ("block", "redact", "supervise"):
with self.subTest(policy=policy):
b = _bottle([{"host": "x.example", "dlp": {
"outbound_on_match": policy,
}}])
self.assertEqual(policy, b.egress.routes[0].OutboundOnMatch)
def test_outbound_on_match_rejects_unknown_value(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "dlp": {
"outbound_on_match": "allow",
}}])
class TestGitPolicy(unittest.TestCase):
def test_omitted_means_https_git_fetch_disabled(self):
+1 -1
View File
@@ -130,7 +130,7 @@ def _capture_print(plan: DockerBottlePlan | SmolmachinesBottlePlan) -> list[str]
orig = sys.stderr
sys.stderr = buf
try:
plan.print()
plan.print(remote_control=False)
finally:
sys.stderr = orig
return buf.getvalue().splitlines()
+4 -29
View File
@@ -26,7 +26,9 @@ from bot_bottle.backend.smolmachines.bottle import SmolmachinesBottle
from bot_bottle.backend.smolmachines.bottle_plan import (
SmolmachinesBottlePlan,
)
from bot_bottle.backend.smolmachines import launch as _launch
# from bot_bottle.backend.smolmachines.provision import (
# workspace as _workspace,
# )
from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec
from bot_bottle.backend.util import AGENT_CA_PATH
from bot_bottle.egress import EgressPlan, EgressRoute
@@ -42,6 +44,7 @@ class _Provider(AgentProvider):
return AgentProviderRuntime(
template="test", command="test", image="",
prompt_mode="append_file", bypass_args=(), resume_args=(),
remote_control_args=(),
)
def provision_plan(self, **kwargs): # type: ignore[override]
raise NotImplementedError
@@ -83,7 +86,6 @@ def _plan(
stage_dir: Path | None = None,
egress_routes: tuple[EgressRoute, ...] = (),
egress_ca_path: Path = Path(),
canary: bool = False,
supervise: bool = False,
bundle_ip: str = "192.168.50.2",
agent_git_gate_host: str = "127.0.0.1:55555",
@@ -154,8 +156,6 @@ def _plan(
routes=egress_routes,
token_env_map={},
mitmproxy_ca_cert_only_host_path=egress_ca_path,
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
),
supervise_plan=supervise_plan,
agent_git_gate_host=agent_git_gate_host,
@@ -411,31 +411,6 @@ class TestBundleLaunchSpec(unittest.TestCase):
self.assertIn(9420, spec.ports_to_publish)
self.assertNotIn(9418, spec.ports_to_publish)
def test_canary_env_registered_as_sensitive_in_bundle(self):
plan = _plan(canary=True)
spec = _bundle_launch_spec(plan, "net", "127.0.0.16")
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", spec.environment)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
spec.environment,
)
def test_canary_env_visible_to_smolvm_guest(self):
plan = _plan(canary=True)
with patch.object(
_launch._bundle,
"bundle_host_port",
return_value="65000",
):
stamped = _launch._discover_urls(plan, "127.0.0.16")
self.assertEqual(
"fake-canary-value",
stamped.guest_env["CANON_ALPHA_SECRET"],
)
class TestProvisionGitUser(unittest.TestCase):
"""`provision_git` runs `git config --global` inside the
+2 -13
View File
@@ -318,30 +318,19 @@ class TestToolConstants(unittest.TestCase):
def test_tools_tuple_matches_individual_constants(self):
self.assertEqual(
(
supervise.TOOL_EGRESS_ALLOW,
supervise.TOOL_ALLOW,
TOOL_CAPABILITY_BLOCK,
supervise.TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
supervise.TOOL_EGRESS_TOKEN_ALLOW,
supervise.TOOL_LIST_EGRESS_ROUTES,
),
supervise.TOOLS,
)
def test_token_allow_proposal_roundtrips(self):
p = Proposal.new(
bottle_slug="dev",
tool=supervise.TOOL_EGRESS_TOKEN_ALLOW,
proposed_file="host: api.example.com\n",
justification="false positive",
current_file_hash="h",
)
self.assertEqual(p, Proposal.from_dict(p.to_dict()))
def test_component_map_has_egress_entries(self):
self.assertEqual(
{
supervise.TOOL_EGRESS_ALLOW: "egress",
supervise.TOOL_ALLOW: "egress",
supervise.TOOL_EGRESS_BLOCK: "egress",
},
supervise.COMPONENT_FOR_TOOL,
+1 -36
View File
@@ -20,7 +20,6 @@ from bot_bottle.supervise import (
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_GITLEAKS_ALLOW,
TOOL_EGRESS_TOKEN_ALLOW,
read_audit_entries,
read_response,
sha256_hex,
@@ -33,10 +32,9 @@ FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
payloads = {
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
supervise.TOOL_EGRESS_ALLOW: "routes:\n - host: example.com\n",
supervise.TOOL_ALLOW: "routes:\n - host: example.com\n",
supervise.TOOL_EGRESS_BLOCK: "routes:\n - host: example.com\n",
TOOL_GITLEAKS_ALLOW: "file: tests/test_fixture.py\nline: 3\n",
TOOL_EGRESS_TOKEN_ALLOW: "host: api.example.com\ndetector: token\n",
}
payload = payloads.get(tool, "")
return Proposal.new(
@@ -198,39 +196,6 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual("test fixture", resp.notes)
def test_approve_token_allow_leaves_response_for_egress(self):
qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW)
supervise_cli.approve(qp, notes="false positive")
# The egress addon polls the queue dir for the response; the TUI must
# not archive it (the addon archives after reading).
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status)
self.assertEqual("false positive", resp.notes)
self.assertFalse((qp.queue_dir / "processed").exists())
def test_token_allow_writes_no_audit_log(self):
qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW)
supervise_cli.approve(qp, notes="false positive")
self.assertEqual([], read_audit_entries("egress", "dev"))
def test_tui_token_allow_requires_reason(self):
qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW)
with patch.object(supervise_cli, "_prompt", return_value=""):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertEqual("approve aborted (empty reason)", status)
self.assertFalse((qp.queue_dir / "processed").exists())
def test_tui_token_allow_writes_reason(self):
qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW)
with patch.object(supervise_cli, "_prompt", return_value="legit"):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertIn("approved egress-token-allow", status)
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual("legit", resp.notes)
def test_suffix_for_token_allow_is_txt(self):
self.assertEqual(".txt", supervise_cli._suffix_for_tool(TOOL_EGRESS_TOKEN_ALLOW))
# class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
# # DISABLED — capability_apply functionality is currently commented out.
+5 -14
View File
@@ -59,7 +59,7 @@ class TestValidation(unittest.TestCase):
def test_egress_routes_yaml_is_validated(self):
validate_proposed_file(
_sv.TOOL_EGRESS_ALLOW,
_sv.TOOL_ALLOW,
"routes:\n - host: example.com\n",
)
@@ -67,15 +67,6 @@ class TestValidation(unittest.TestCase):
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_EGRESS_BLOCK, "routes: nope\n")
def test_egress_routes_yaml_rejects_log_full(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file(
_sv.TOOL_EGRESS_ALLOW,
"log: 2\nroutes:\n - host: example.com\n",
)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("must not change egress logging", cm.exception.message)
# --- JSON-RPC parsing ------------------------------------------------------
@@ -156,7 +147,7 @@ class TestHandleToolsList(unittest.TestCase):
names = [t["name"] for t in result["tools"]] # type: ignore[index]
self.assertEqual(
sorted([
_sv.TOOL_EGRESS_ALLOW,
_sv.TOOL_ALLOW,
_sv.TOOL_CAPABILITY_BLOCK,
_sv.TOOL_EGRESS_BLOCK,
_sv.TOOL_LIST_EGRESS_ROUTES,
@@ -190,7 +181,7 @@ class TestHandleToolsList(unittest.TestCase):
self.assertNotIn("required", schema) # type: ignore[operator]
def test_egress_tools_take_routes_yaml_and_justification(self):
for tool_name in (_sv.TOOL_EGRESS_ALLOW, _sv.TOOL_EGRESS_BLOCK):
for tool_name in (_sv.TOOL_ALLOW, _sv.TOOL_EGRESS_BLOCK):
with self.subTest(tool_name=tool_name):
tool = next(t for t in TOOL_DEFINITIONS if t["name"] == tool_name)
schema = tool["inputSchema"]
@@ -253,7 +244,7 @@ class TestHandleToolsCall(unittest.TestCase):
try:
result = handle_tools_call(
{
"name": _sv.TOOL_EGRESS_ALLOW,
"name": _sv.TOOL_ALLOW,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "need example.com",
@@ -460,7 +451,7 @@ class TestHttpEndToEnd(unittest.TestCase):
self.assertEqual(1, result["id"])
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
self.assertIn(_sv.TOOL_EGRESS_ALLOW, names)
self.assertIn(_sv.TOOL_ALLOW, names)
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
def test_unknown_method_returns_jsonrpc_error(self):