Compare commits

..

1 Commits

Author SHA1 Message Date
didericis-claude 0661464a58 fix(egress): strip injected Authorization and redact bodies in LOG_FULL path
lint / lint (push) Failing after 2m11s
test / unit (pull_request) Successful in 41s
test / integration (pull_request) Successful in 25s
_log_request and _log_response wrote headers and bodies to stderr verbatim.
_log_request also included the sidecar-injected upstream Authorization value,
exposing live bearer tokens on every allowed request under LOG_FULL.

Apply redact_tokens to all header values and bodies in both log functions;
exclude the authorization header from _log_request entirely since its value
is always a live sidecar-injected credential by the time _log_request runs.

Closes #257
2026-06-25 02:39:54 +00:00
44 changed files with 194 additions and 1428 deletions
-9
View File
@@ -1,9 +0,0 @@
[run]
branch = True
source = .
[report]
omit =
bot_bottle/egress_addon.py
bot_bottle/cli/tui.py
tests/*
+1 -7
View File
@@ -39,14 +39,8 @@ jobs:
with: with:
python-version: "3.12" python-version: "3.12"
- name: Install dev requirements
run: python3 -m pip install -r requirements-dev.txt
- name: Run unit tests - name: Run unit tests
run: python3 -m coverage run -m unittest discover -t . -s tests/unit -v run: python3 -m unittest discover -t . -s tests/unit -v
- name: Report unit coverage
run: python3 -m coverage report -m
integration: integration:
runs-on: ubuntu-latest runs-on: ubuntu-latest
-1
View File
@@ -22,4 +22,3 @@ venv/
.pytest_cache/ .pytest_cache/
.mypy_cache/ .mypy_cache/
.ruff_cache/ .ruff_cache/
.coverage
+2 -10
View File
@@ -61,6 +61,7 @@ class AgentProviderRuntime:
prompt_mode: PromptMode prompt_mode: PromptMode
bypass_args: tuple[str, ...] bypass_args: tuple[str, ...]
resume_args: tuple[str, ...] resume_args: tuple[str, ...]
remote_control_args: tuple[str, ...]
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -370,15 +371,6 @@ def build_agent_provision_plan(
) )
def provider_startup_args(
provider_settings: dict[str, object] | None,
) -> tuple[str, ...]:
raw = (provider_settings or {}).get("startup_args", ())
if not isinstance(raw, (list, tuple)):
return ()
return tuple(arg for arg in raw if isinstance(arg, str))
def prompt_args( def prompt_args(
prompt_mode: PromptMode, prompt_mode: PromptMode,
prompt_path: str | None, prompt_path: str | None,
@@ -390,7 +382,7 @@ def prompt_args(
if prompt_mode == "append_file": if prompt_mode == "append_file":
return ["--append-system-prompt-file", prompt_path] return ["--append-system-prompt-file", prompt_path]
if prompt_mode == "read_prompt_file": if prompt_mode == "read_prompt_file":
if argv and ("resume" in argv or "remote-control" in argv): if argv and "resume" in argv:
return [] return []
return [f"Read and follow the instructions in {prompt_path}."] return [f"Read and follow the instructions in {prompt_path}."]
if prompt_mode == "print_read_prompt_file": if prompt_mode == "print_read_prompt_file":
+2 -1
View File
@@ -109,8 +109,9 @@ class BottlePlan(ABC):
def workspace_plan(self) -> WorkspacePlan: def workspace_plan(self) -> WorkspacePlan:
return workspace_plan(self.spec, guest_home=self.guest_home) return workspace_plan(self.spec, guest_home=self.guest_home)
def print(self) -> None: def print(self, *, remote_control: bool) -> None:
"""Render the y/N preflight summary to stderr.""" """Render the y/N preflight summary to stderr."""
del remote_control
spec = self.spec spec = self.spec
manifest = self.manifest manifest = self.manifest
agent = manifest.agent agent = manifest.agent
+2 -4
View File
@@ -28,8 +28,6 @@ from typing import Any
from ...egress import ( from ...egress import (
EGRESS_HOSTNAME, EGRESS_HOSTNAME,
EGRESS_ROUTES_IN_CONTAINER, EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_sidecar_env_entries,
) )
from ...git_gate import GIT_GATE_HOSTNAME from ...git_gate import GIT_GATE_HOSTNAME
from ...log import die, warn from ...log import die, warn
@@ -137,7 +135,8 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER)) volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER))
if ep.routes: if ep.routes:
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent))) volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
env.extend(egress_sidecar_env_entries(ep)) for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
# --- git-gate ----------------------------------------------------- # --- git-gate -----------------------------------------------------
gp = plan.git_gate_plan gp = plan.git_gate_plan
@@ -221,7 +220,6 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
# never lands on argv or in the compose file. # never lands on argv or in the compose file.
for name in sorted(plan.forwarded_env.keys()): for name in sorted(plan.forwarded_env.keys()):
env.append(name) env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
service: dict[str, Any] = { service: dict[str, Any] = {
"image": plan.image, "image": plan.image,
+2 -6
View File
@@ -11,7 +11,7 @@ from pathlib import Path
from ..bottle_state import egress_state_dir from ..bottle_state import egress_state_dir
from ..egress import EGRESS_ROUTES_FILENAME from ..egress import EGRESS_ROUTES_FILENAME
from ..egress_addon_core import LOG_OFF, load_config from ..egress_addon_core import load_routes
class EgressApplyError(RuntimeError): class EgressApplyError(RuntimeError):
@@ -33,15 +33,11 @@ class EgressApplicator(ABC):
@staticmethod @staticmethod
def validate_routes_content(content: str) -> None: def validate_routes_content(content: str) -> None:
try: try:
config = load_config(content) load_routes(content)
except ValueError as e: except ValueError as e:
raise EgressApplyError( raise EgressApplyError(
f"proposed routes.yaml is not valid: {e}" f"proposed routes.yaml is not valid: {e}"
) from e ) from e
if config.log != LOG_OFF:
raise EgressApplyError(
"proposed routes.yaml must not change egress logging"
)
@staticmethod @staticmethod
def _routes_path(slug: str) -> Path: def _routes_path(slug: str) -> Path:
+4 -8
View File
@@ -22,12 +22,7 @@ from ...bottle_state import (
git_gate_state_dir, git_gate_state_dir,
read_committed_image, read_committed_image,
) )
from ...egress import ( from ...egress import EGRESS_ROUTES_IN_CONTAINER, egress_resolve_token_values
EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values,
egress_sidecar_env_entries,
)
from ...git_gate import revoke_git_gate_provisioned_keys from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import die, info, warn from ...log import die, info, warn
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
@@ -355,7 +350,9 @@ def _sidecar_daemons(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]: def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
env: list[str] = list(egress_sidecar_env_entries(plan.egress_plan)) env: list[str] = []
if plan.egress_plan.routes:
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
if plan.git_gate_plan.upstreams: if plan.git_gate_plan.upstreams:
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}") env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
if plan.supervise_plan is not None: if plan.supervise_plan is not None:
@@ -423,7 +420,6 @@ def _agent_env_entries(
env.append(f"{name}={value}") env.append(f"{name}={value}")
for name in sorted(plan.forwarded_env.keys()): for name in sorted(plan.forwarded_env.keys()):
env.append(name) env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
return tuple(env) return tuple(env)
@@ -68,11 +68,6 @@ def build_image(ref: str, context: str, *, dockerfile: str = "") -> None:
_ensure_builder_dns() _ensure_builder_dns()
args = [_CONTAINER, "build", "-t", ref, "--dns", dns_server()] args = [_CONTAINER, "build", "-t", ref, "--dns", dns_server()]
if dockerfile: if dockerfile:
# `container build` resolves -f relative to the current working
# directory, not the build context. Anchor a relative Dockerfile to
# the context so builds work from any cwd.
if not os.path.isabs(dockerfile):
dockerfile = os.path.join(context, dockerfile)
args.extend(["-f", dockerfile]) args.extend(["-f", dockerfile])
args.append(context) args.append(context)
subprocess.run(args, check=True) subprocess.run(args, check=True)
+5 -6
View File
@@ -23,9 +23,7 @@ from typing import Callable, Generator
from ...egress import ( from ...egress import (
EGRESS_ROUTES_IN_CONTAINER, EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values, egress_resolve_token_values,
egress_sidecar_env_entries,
) )
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
from ...util import expand_tilde from ...util import expand_tilde
@@ -230,9 +228,6 @@ def _discover_urls(
guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}" guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}"
if agent_supervise_url: if agent_supervise_url:
guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url
for entry in egress_agent_env_entries(plan.egress_plan):
name, value = entry.split("=", 1)
guest_env[name] = value
return dataclasses.replace( return dataclasses.replace(
plan, plan,
@@ -321,7 +316,11 @@ def _bundle_launch_spec(
volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True)) volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True))
if ep.routes: if ep.routes:
volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True)) volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True))
env.extend(egress_sidecar_env_entries(ep)) # Bare-name entries for upstream-token slots. Their values
# come from the docker-run subprocess env (inherited from
# the operator's shell), never landing on argv.
for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
# --- git-gate --------------------------------------------- # --- git-gate ---------------------------------------------
gp = plan.git_gate_plan gp = plan.git_gate_plan
+2
View File
@@ -28,6 +28,7 @@ from .start import _launch_bottle
def cmd_resume(argv: list[str]) -> int: def cmd_resume(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True) parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True)
parser.add_argument("--dry-run", action="store_true") parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--remote-control", action="store_true")
parser.add_argument( parser.add_argument(
"identity", "identity",
help="bottle identity from a prior `start` (see its session-end output)", help="bottle identity from a prior `start` (see its session-end output)",
@@ -55,5 +56,6 @@ def cmd_resume(argv: list[str]) -> int:
return _launch_bottle( return _launch_bottle(
spec, spec,
dry_run=args.dry_run, dry_run=args.dry_run,
remote_control=args.remote_control,
backend_name=backend_name, backend_name=backend_name,
) )
+10 -4
View File
@@ -42,6 +42,7 @@ def cmd_start(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} start", add_help=True) parser = argparse.ArgumentParser(prog=f"{PROG} start", add_help=True)
parser.add_argument("--dry-run", action="store_true") parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--cwd", action="store_true", help="copy host cwd into the running bottle") parser.add_argument("--cwd", action="store_true", help="copy host cwd into the running bottle")
parser.add_argument("--remote-control", action="store_true")
parser.add_argument( parser.add_argument(
"--backend", "--backend",
choices=known_backend_names(), choices=known_backend_names(),
@@ -88,6 +89,7 @@ def cmd_start(argv: list[str]) -> int:
return _launch_bottle( return _launch_bottle(
spec, spec,
dry_run=dry_run, dry_run=dry_run,
remote_control=args.remote_control,
backend_name=backend_name, backend_name=backend_name,
) )
@@ -132,7 +134,7 @@ def prepare_with_preflight(
def attach_agent( def attach_agent(
bottle: Bottle, *, resume: bool = False, bottle: Bottle, *, remote_control: bool = False, resume: bool = False,
agent_provider_template: str = "claude", agent_provider_template: str = "claude",
startup_args: tuple[str, ...] = (), startup_args: tuple[str, ...] = (),
) -> int: ) -> int:
@@ -151,6 +153,8 @@ def attach_agent(
"(Ctrl-D or 'exit' to leave; container will be removed)" "(Ctrl-D or 'exit' to leave; container will be removed)"
) )
agent_args = list(runtime.bypass_args) agent_args = list(runtime.bypass_args)
if remote_control:
agent_args.extend(runtime.remote_control_args)
agent_args.extend(startup_args) agent_args.extend(startup_args)
if resume: if resume:
agent_args.extend(runtime.resume_args) agent_args.extend(runtime.resume_args)
@@ -214,9 +218,9 @@ def _text_prompt_yes() -> bool:
return reply in ("y", "Y", "yes", "YES") return reply in ("y", "Y", "yes", "YES")
def _text_render_preflight(): def _text_render_preflight(*, remote_control: bool):
def _render(plan: DockerBottlePlan) -> None: def _render(plan: DockerBottlePlan) -> None:
plan.print() plan.print(remote_control=remote_control)
return _render return _render
@@ -224,6 +228,7 @@ def _launch_bottle(
spec: BottleSpec, spec: BottleSpec,
*, *,
dry_run: bool, dry_run: bool,
remote_control: bool,
backend_name: str | None = None, backend_name: str | None = None,
) -> int: ) -> int:
"""Shared launch core for `start` and `resume`. Builds the plan, """Shared launch core for `start` and `resume`. Builds the plan,
@@ -235,7 +240,7 @@ def _launch_bottle(
plan, identity = prepare_with_preflight( plan, identity = prepare_with_preflight(
spec, spec,
stage_dir=stage_dir, stage_dir=stage_dir,
render_preflight=_text_render_preflight(), render_preflight=_text_render_preflight(remote_control=remote_control),
prompt_yes=_text_prompt_yes, prompt_yes=_text_prompt_yes,
dry_run=dry_run, dry_run=dry_run,
backend_name=backend_name, backend_name=backend_name,
@@ -248,6 +253,7 @@ def _launch_bottle(
agent_provider_template = getattr(plan, "agent_provider_template", "claude") agent_provider_template = getattr(plan, "agent_provider_template", "claude")
exit_code = attach_agent( exit_code = attach_agent(
bottle, bottle,
remote_control=remote_control,
agent_provider_template=agent_provider_template, agent_provider_template=agent_provider_template,
startup_args=plan.agent_provision.startup_args, startup_args=plan.agent_provision.startup_args,
) )
+36 -9
View File
@@ -2,8 +2,9 @@
act on them (approve / modify / reject). act on them (approve / modify / reject).
Curses-based TUI; modify-then-approve shells out to $EDITOR. The Curses-based TUI; modify-then-approve shells out to $EDITOR. The
Egress proposals are queued for operator review as full routes.yaml approval handler wires to PRD 0016 (capability-block), which rebuilds
updates. the bottle Dockerfile. Egress proposals are queued for operator review
as full routes.yaml updates.
""" """
from __future__ import annotations from __future__ import annotations
@@ -21,6 +22,10 @@ from pathlib import Path
from .. import supervise as _supervise from .. import supervise as _supervise
from ..bottle_state import read_metadata from ..bottle_state import read_metadata
# from ..backend.docker.capability_apply import (
# CapabilityApplyError,
# apply_capability_change,
# )
from ..backend.docker.egress_apply import ( from ..backend.docker.egress_apply import (
EgressApplyError, EgressApplyError,
applicator as _docker_applicator, applicator as _docker_applicator,
@@ -33,6 +38,10 @@ from ..backend.smolmachines.egress_apply import (
) )
from ..log import Die, error, info from ..log import Die, error, info
class CapabilityApplyError(RuntimeError):
"""Placeholder while capability_apply is disabled."""
from ..supervise import ( from ..supervise import (
COMPONENT_FOR_TOOL, COMPONENT_FOR_TOOL,
AuditEntry, AuditEntry,
@@ -41,10 +50,12 @@ from ..supervise import (
STATUS_APPROVED, STATUS_APPROVED,
STATUS_MODIFIED, STATUS_MODIFIED,
STATUS_REJECTED, STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_ALLOW, TOOL_EGRESS_ALLOW,
TOOL_EGRESS_BLOCK, TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW, TOOL_GITLEAKS_ALLOW,
TOOL_EGRESS_TOKEN_ALLOW, TOOL_EGRESS_TOKEN_ALLOW,
archive_proposal,
list_pending_proposals, list_pending_proposals,
render_diff, render_diff,
write_audit_entry, write_audit_entry,
@@ -72,7 +83,7 @@ class QueuedProposal:
# Errors any remediation engine may raise. Caught by the TUI key # Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps # handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses. # the proposal pending rather than crashing curses.
ApplyError = (EgressApplyError,) ApplyError = (CapabilityApplyError, EgressApplyError)
def apply_routes_change(slug: str, content: str) -> tuple[str, str]: def apply_routes_change(slug: str, content: str) -> tuple[str, str]:
@@ -132,6 +143,8 @@ def _detail_lines(
def _suffix_for_tool(tool: str) -> str: def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
if tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK): if tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
return ".yaml" return ".yaml"
if tool in (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW): if tool in (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW):
@@ -153,6 +166,17 @@ def approve(
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
diff_before, diff_after = "", "" diff_before, diff_after = "", ""
# if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
# _meta = read_metadata(qp.proposal.bottle_slug)
# if _meta is not None and not _meta.compose_project:
# raise CapabilityApplyError(
# "capability-block remediation is not supported for smolmachines "
# "bottles. Reject this proposal or handle the capability change "
# "manually, then restart the bottle."
# )
# diff_before, diff_after = apply_capability_change(
# qp.proposal.bottle_slug, file_to_apply,
# )
if qp.proposal.tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK): if qp.proposal.tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
diff_before, diff_after = apply_routes_change( diff_before, diff_after = apply_routes_change(
qp.proposal.bottle_slug, qp.proposal.bottle_slug,
@@ -170,6 +194,9 @@ def approve(
qp, action=status, notes=notes, qp, action=status, notes=notes,
diff_before=diff_before, diff_after=diff_after, diff_before=diff_before, diff_after=diff_after,
) )
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
archive_proposal(qp.queue_dir, qp.proposal.id)
def reject(qp: QueuedProposal, *, reason: str) -> None: def reject(qp: QueuedProposal, *, reason: str) -> None:
"""Write a rejection response and an audit entry.""" """Write a rejection response and an audit entry."""
@@ -319,7 +346,7 @@ def _list_once() -> int:
return 0 return 0
def _try_init_green() -> int: # pragma: no cover def _try_init_green() -> int:
"""Initialise a green color pair and return its attr, or 0.""" """Initialise a green color pair and return its attr, or 0."""
try: try:
curses.start_color() curses.start_color()
@@ -330,7 +357,7 @@ def _try_init_green() -> int: # pragma: no cover
return 0 return 0
def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore # pragma: no cover def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
curses.curs_set(0) curses.curs_set(0)
stdscr.timeout(_REFRESH_INTERVAL_MS) stdscr.timeout(_REFRESH_INTERVAL_MS)
green_attr = _try_init_green() green_attr = _try_init_green()
@@ -420,7 +447,7 @@ def _render(
status_line: str, status_line: str,
*, *,
green_attr: int = 0, # noqa: F841 — unused, but required by interface green_attr: int = 0, # noqa: F841 — unused, but required by interface
) -> None: # pragma: no cover ) -> None:
stdscr.erase() stdscr.erase()
h, w = stdscr.getmaxyx() h, w = stdscr.getmaxyx()
header = f"bot-bottle supervise ({len(pending)} pending)" header = f"bot-bottle supervise ({len(pending)} pending)"
@@ -471,7 +498,7 @@ def _detail_view(
qp: QueuedProposal, qp: QueuedProposal,
*, *,
green_attr: int = 0, green_attr: int = 0,
) -> None: # pragma: no cover ) -> None:
"""Render the full proposal. Scrollable. Press q to return.""" """Render the full proposal. Scrollable. Press q to return."""
lines = _detail_lines(qp, green_attr=green_attr) lines = _detail_lines(qp, green_attr=green_attr)
offset = 0 offset = 0
@@ -523,7 +550,7 @@ def _detail_view(
return return
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore # pragma: no cover def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore
"""Suspend curses, open $EDITOR on the proposed file, return edited content.""" """Suspend curses, open $EDITOR on the proposed file, return edited content."""
suffix = _suffix_for_tool(qp.proposal.tool) suffix = _suffix_for_tool(qp.proposal.tool)
curses.endwin() curses.endwin()
@@ -534,7 +561,7 @@ def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
return edited return edited
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore # pragma: no cover def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore
"""One-line input at the bottom of the screen.""" """One-line input at the bottom of the screen."""
curses.curs_set(1) curses.curs_set(1)
h, _ = stdscr.getmaxyx() h, _ = stdscr.getmaxyx()
+2 -4
View File
@@ -20,7 +20,6 @@ from ...agent_provider import (
AgentProvisionDir, AgentProvisionDir,
AgentProvisionFile, AgentProvisionFile,
AgentProvisionPlan, AgentProvisionPlan,
provider_startup_args,
) )
from ...backend.docker import util as docker_mod from ...backend.docker import util as docker_mod
from ...egress import EgressRoute from ...egress import EgressRoute
@@ -91,6 +90,7 @@ _RUNTIME = AgentProviderRuntime(
prompt_mode="append_file", prompt_mode="append_file",
bypass_args=("--dangerously-skip-permissions",), bypass_args=("--dangerously-skip-permissions",),
resume_args=("--continue",), resume_args=("--continue",),
remote_control_args=("--remote-control",),
) )
@@ -115,9 +115,8 @@ class ClaudeAgentProvider(AgentProvider):
color: str = "", color: str = "",
provider_settings: dict[str, object] | None = None, provider_settings: dict[str, object] | None = None,
) -> AgentProvisionPlan: ) -> AgentProvisionPlan:
del forward_host_credentials, host_env del forward_host_credentials, host_env, provider_settings
resolved_guest_env = dict(guest_env or {}) resolved_guest_env = dict(guest_env or {})
startup_args = provider_startup_args(provider_settings)
guest_home = self.guest_home guest_home = self.guest_home
trusted_path = trusted_project_path or guest_home trusted_path = trusted_project_path or guest_home
@@ -200,7 +199,6 @@ class ClaudeAgentProvider(AgentProvider):
env_vars=env_vars, env_vars=env_vars,
guest_env=resolved_guest_env, guest_env=resolved_guest_env,
has_prompt=has_prompt, has_prompt=has_prompt,
startup_args=startup_args,
dirs=dirs, dirs=dirs,
files=tuple(files), files=tuple(files),
egress_routes=egress_routes, egress_routes=egress_routes,
+6 -9
View File
@@ -1,12 +1,12 @@
# bot-bottle Codex provider image. # bot-bottle Codex provider image.
# #
# Mirrors the default Claude image shape: Node LTS, git/network tooling, # Mirrors the default Claude image shape: Node LTS, git/network tooling,
# non-root node user, and the provider CLI installed for that user. # non-root node user, and the provider CLI installed globally.
FROM node:22-slim FROM node:22-slim
RUN apt-get update \ RUN apt-get update \
&& apt-get install -y --no-install-recommends git ca-certificates curl procps \ && apt-get install -y --no-install-recommends git ca-certificates curl \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# App-specific deps. Python isn't required by codex itself # App-specific deps. Python isn't required by codex itself
@@ -17,15 +17,12 @@ RUN apt-get update \
&& apt-get install -y --no-install-recommends python3 python3-pip python3-venv \ && apt-get install -y --no-install-recommends python3 python3-pip python3-venv \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
RUN npm install -g --no-fund --no-audit @openai/codex@0.136.0 \
&& npm cache clean --force
USER node USER node
WORKDIR /home/node WORKDIR /home/node
ENV PATH="/home/node/.local/bin:${PATH}" RUN mkdir -p /home/node/.codex
# Remote-control support requires the standalone Codex install layout
# under ~/.codex/packages/standalone/current. The npm package can run
# the TUI, but remote-control commands expect this installer-owned path.
RUN mkdir -p /home/node/.codex \
&& curl -fsSL https://chatgpt.com/codex/install.sh | sh
CMD ["codex"] CMD ["codex"]
+2 -4
View File
@@ -22,7 +22,6 @@ from ...agent_provider import (
AgentProvisionCommand, AgentProvisionCommand,
AgentProvisionFile, AgentProvisionFile,
AgentProvisionPlan, AgentProvisionPlan,
provider_startup_args,
) )
from .codex_auth import codex_host_access_token, write_codex_dummy_auth_file from .codex_auth import codex_host_access_token, write_codex_dummy_auth_file
from ...egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute from ...egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
@@ -55,6 +54,7 @@ _RUNTIME = AgentProviderRuntime(
prompt_mode="read_prompt_file", prompt_mode="read_prompt_file",
bypass_args=("--dangerously-bypass-approvals-and-sandbox",), bypass_args=("--dangerously-bypass-approvals-and-sandbox",),
resume_args=("resume", "--last"), resume_args=("resume", "--last"),
remote_control_args=(),
) )
@@ -79,9 +79,8 @@ class CodexAgentProvider(AgentProvider):
color: str = "", color: str = "",
provider_settings: dict[str, object] | None = None, provider_settings: dict[str, object] | None = None,
) -> AgentProvisionPlan: ) -> AgentProvisionPlan:
del auth_token, label, color del auth_token, label, color, provider_settings
resolved_guest_env = dict(guest_env or {}) resolved_guest_env = dict(guest_env or {})
startup_args = provider_startup_args(provider_settings)
guest_home = self.guest_home guest_home = self.guest_home
trusted_path = trusted_project_path or guest_home trusted_path = trusted_project_path or guest_home
@@ -164,7 +163,6 @@ class CodexAgentProvider(AgentProvider):
env_vars=env_vars, env_vars=env_vars,
guest_env=resolved_guest_env, guest_env=resolved_guest_env,
has_prompt=has_prompt, has_prompt=has_prompt,
startup_args=startup_args,
dirs=tuple(dirs), dirs=tuple(dirs),
files=tuple(files), files=tuple(files),
pre_copy=tuple(pre_copy), pre_copy=tuple(pre_copy),
+1 -3
View File
@@ -21,7 +21,6 @@ from ...agent_provider import (
AgentProvisionDir, AgentProvisionDir,
AgentProvisionFile, AgentProvisionFile,
AgentProvisionPlan, AgentProvisionPlan,
provider_startup_args,
) )
from ...egress import EgressRoute from ...egress import EgressRoute
from ...log import die, info from ...log import die, info
@@ -166,6 +165,7 @@ _RUNTIME = AgentProviderRuntime(
prompt_mode="append_system_prompt", prompt_mode="append_system_prompt",
bypass_args=(), bypass_args=(),
resume_args=(), resume_args=(),
remote_control_args=(),
) )
@@ -199,7 +199,6 @@ class PiAgentProvider(AgentProvider):
models_payload, base_url, api_key_env, models, provider_name = ( models_payload, base_url, api_key_env, models, provider_name = (
_pi_models_json(settings) _pi_models_json(settings)
) )
extra_startup_args = provider_startup_args(provider_settings)
models_file = state_dir / "pi-models.json" models_file = state_dir / "pi-models.json"
models_file.write_text(json.dumps(models_payload, indent=2) + "\n") models_file.write_text(json.dumps(models_payload, indent=2) + "\n")
models_file.chmod(0o600) models_file.chmod(0o600)
@@ -220,7 +219,6 @@ class PiAgentProvider(AgentProvider):
startup_args=( startup_args=(
"--models", "--models",
",".join(f"{provider_name}/{model}" for model in models), ",".join(f"{provider_name}/{model}" for model in models),
*extra_startup_args,
), ),
dirs=(AgentProvisionDir(f"{guest_home}/.pi/agent"),), dirs=(AgentProvisionDir(f"{guest_home}/.pi/agent"),),
files=(AgentProvisionFile(models_file, _models_path(guest_home)),), files=(AgentProvisionFile(models_file, _models_path(guest_home)),),
+3 -159
View File
@@ -15,8 +15,6 @@ import gzip
import re import re
import typing import typing
import unicodedata import unicodedata
from math import log2
from collections import Counter
from urllib.parse import quote as url_quote from urllib.parse import quote as url_quote
try: try:
@@ -109,21 +107,20 @@ def redact_tokens(
text: str, text: str,
*, *,
env: typing.Mapping[str, str] | None = None, env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> str: ) -> str:
"""Replace token pattern matches and (if env given) provisioned secrets with REDACT.""" """Replace token pattern matches and (if env given) provisioned secrets with REDACT."""
for _, pattern in TOKEN_PATTERNS: for _, pattern in TOKEN_PATTERNS:
text = pattern.sub(REDACT, text) text = pattern.sub(REDACT, text)
if env is not None: if env is not None:
for key, value in env.items(): for key, value in env.items():
if any(key.startswith(p) for p in sensitive_prefixes) and value: if key.startswith("EGRESS_TOKEN_") and value:
for variant in _encoded_variants(value): for variant in _encoded_variants(value):
text = text.replace(variant, REDACT) text = text.replace(variant, REDACT)
return text return text
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Known secrets detector # Known secrets detector (Phase 1b)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _encoded_variants(secret: str) -> list[str]: def _encoded_variants(secret: str) -> list[str]:
@@ -164,65 +161,18 @@ def _encoded_variants(secret: str) -> list[str]:
return variants return variants
# ---------------------------------------------------------------------------
# Fragmentation-resistant helpers
# ---------------------------------------------------------------------------
# Minimum length of alnum projection for projection-based checks to run.
# Short secrets produce too many false positives in projection space.
_ALNUM_MIN_LEN = 8
# Minimum window length for the partial-substring sliding scan.
PARTIAL_MATCH_MIN_LEN = 12
def _alnum_projection(text: str) -> str:
"""Return text with every non-alphanumeric character stripped.
Used for fragmentation-resistant matching: separator-injected secrets
(spaces, hyphens, dots inserted between characters) are identical to
their originals in alnum projection space.
"""
return "".join(c for c in text if c.isalnum())
def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None:
"""Return the position in text_alnum where any min_len-char window of
secret_alnum first appears, or None.
Slides a window of width min_len across secret_alnum and searches for
each window in text_alnum. The first hit position is returned.
"""
if len(secret_alnum) < min_len or len(text_alnum) < min_len:
return None
for i in range(len(secret_alnum) - min_len + 1):
window = secret_alnum[i:i + min_len]
pos = text_alnum.find(window)
if pos >= 0:
return pos
return None
def scan_known_secrets( def scan_known_secrets(
text: str, text: str,
*, *,
location: str = "body", location: str = "body",
env: typing.Mapping[str, str] | None = None, env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
safe_tokens: typing.AbstractSet[str] | None = None, safe_tokens: typing.AbstractSet[str] | None = None,
) -> ScanResult | None: ) -> ScanResult | None:
if env is None: if env is None:
return None return None
# Pre-compute alnum projection of the scan text once; reused per secret.
text_alnum: str | None = None
for key, value in env.items(): for key, value in env.items():
if not any(key.startswith(p) for p in sensitive_prefixes) or not value: if not key.startswith("EGRESS_TOKEN_") or not value:
continue continue
# Pass 1: exact match across encoded variants (original behaviour).
approved_exact = False
for variant in _encoded_variants(value): for variant in _encoded_variants(value):
pos = text.find(variant) pos = text.find(variant)
if pos >= 0: if pos >= 0:
@@ -230,7 +180,6 @@ def scan_known_secrets(
# (PRD 0062); a different encoding of the same secret is a # (PRD 0062); a different encoding of the same secret is a
# fresh block. # fresh block.
if safe_tokens is not None and variant in safe_tokens: if safe_tokens is not None and variant in safe_tokens:
approved_exact = True
continue continue
return ScanResult( return ScanResult(
severity="block", severity="block",
@@ -239,104 +188,6 @@ def scan_known_secrets(
context=_snippet(text, pos, pos + len(variant)), context=_snippet(text, pos, pos + len(variant)),
matched=variant, matched=variant,
) )
if approved_exact:
# Exact match was found and approved; projection passes would
# fire on the same value, so skip them for this secret.
continue
# Pass 2 & 3: fragmentation-resistant projection checks.
secret_alnum = _alnum_projection(value)
if len(secret_alnum) < _ALNUM_MIN_LEN:
continue
if text_alnum is None:
text_alnum = _alnum_projection(text)
# Pass 2: full alnum-projection exact match (catches separator injection).
pos2 = text_alnum.find(secret_alnum)
if pos2 >= 0:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(fragmented match — separator injection)"
),
location=location,
context=_snippet(text_alnum, pos2, pos2 + len(secret_alnum)),
)
# Pass 3: sliding-window partial match (catches chunked-substring leaks).
pos3 = _find_partial_window(secret_alnum, text_alnum, PARTIAL_MATCH_MIN_LEN)
if pos3 is not None:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(partial match — at least {PARTIAL_MATCH_MIN_LEN} consecutive "
f"alphanumeric chars)"
),
location=location,
context=_snippet(text_alnum, pos3, pos3 + PARTIAL_MATCH_MIN_LEN),
)
return None
# ---------------------------------------------------------------------------
# Entropy detector (warn-only)
# ---------------------------------------------------------------------------
# Sliding window size and step for the entropy scan.
ENTROPY_WINDOW = 64
ENTROPY_STEP = 32
# Bits-per-character threshold. Random ASCII printable ≈ 6.6 bits; random
# lowercase hex ≈ 4 bits; random base64url ≈ 6 bits. 5.5 sits above
# typical structured data (JSON, URLs) while staying below truly random
# content.
ENTROPY_BLOCK_THRESHOLD = 5.5
def _shannon_entropy(text: str) -> float:
if not text:
return 0.0
counts = Counter(text)
n = len(text)
return -sum((c / n) * log2(c / n) for c in counts.values())
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW,
threshold: float = ENTROPY_BLOCK_THRESHOLD,
) -> ScanResult | None:
"""Warn-only detector: flag windows of `window` chars with Shannon entropy
above `threshold` bits per character.
Never blocks; always returns severity='warn'. Disabled by default
routes must opt in via dlp.outbound_detectors=['entropy'].
"""
if not text:
return None
step = max(1, window // 2)
end = len(text)
# Scan overlapping windows; also check the final tail if shorter than window.
positions = list(range(0, end - window + 1, step))
if end < window:
positions = [0]
elif (end - window) % step != 0:
positions.append(end - window)
for i in positions:
chunk = text[i:i + window]
if _shannon_entropy(chunk) >= threshold:
return ScanResult(
severity="warn",
reason=f"high-entropy content in {location} (possible encrypted exfil)",
location=location,
context=_snippet(text, i, i + len(chunk)),
)
return None return None
@@ -455,18 +306,11 @@ def scan_crlf_injection(text: str) -> ScanResult | None:
__all__ = [ __all__ = [
"ENTROPY_BLOCK_THRESHOLD",
"ENTROPY_WINDOW",
"ENTROPY_STEP",
"PARTIAL_MATCH_MIN_LEN",
"REDACT", "REDACT",
"SNIPPET_CONTEXT", "SNIPPET_CONTEXT",
"TOKEN_PATTERNS", "TOKEN_PATTERNS",
"_alnum_projection",
"_shannon_entropy",
"redact_tokens", "redact_tokens",
"scan_crlf_injection", "scan_crlf_injection",
"scan_entropy",
"scan_known_secrets", "scan_known_secrets",
"scan_naive_injection", "scan_naive_injection",
"scan_token_patterns", "scan_token_patterns",
-55
View File
@@ -10,7 +10,6 @@ specific and lives on concrete subclasses (see
from __future__ import annotations from __future__ import annotations
import dataclasses import dataclasses
import secrets
from abc import ABC from abc import ABC
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@@ -35,50 +34,6 @@ EGRESS_HOSTNAME = "egress"
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml" EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name
_CANARY_ENV_WORDS = (
"ACCORD",
"ANCHOR",
"ATLAS",
"CANON",
"CIPHER",
"EMBER",
"FALCON",
"HARBOR",
"LANTERN",
"MARBLE",
"NOVA",
"ORBIT",
"PIVOT",
"RADIUS",
"SUMMIT",
"VECTOR",
)
def _random_canary_env() -> str:
first = secrets.choice(_CANARY_ENV_WORDS)
remaining = tuple(word for word in _CANARY_ENV_WORDS if word != first)
second = secrets.choice(remaining)
return f"{first}_{second}_SECRET"
def egress_sidecar_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return sidecar env entries needed by egress across all backends."""
env: list[str] = []
if plan.routes:
env.extend(sorted(plan.token_env_map.keys()))
if plan.canary and plan.canary_env:
env.append(f"{plan.canary_env}={plan.canary}")
env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.canary_env}")
return tuple(env)
def egress_agent_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return agent-visible egress env entries shared by all backends."""
if plan.canary and plan.canary_env:
return (f"{plan.canary_env}={plan.canary}",)
return ()
@dataclass(frozen=True) @dataclass(frozen=True)
class EgressRoute(Route): class EgressRoute(Route):
@@ -110,8 +65,6 @@ class EgressPlan:
mitmproxy_ca_host_path: Path = Path() mitmproxy_ca_host_path: Path = Path()
mitmproxy_ca_cert_only_host_path: Path = Path() mitmproxy_ca_cert_only_host_path: Path = Path()
log: int = 0 log: int = 0
canary: str = ""
canary_env: str = ""
def egress_manifest_routes( def egress_manifest_routes(
@@ -371,18 +324,12 @@ class Egress(ABC):
routes_path = stage_dir / EGRESS_ROUTES_FILENAME routes_path = stage_dir / EGRESS_ROUTES_FILENAME
routes_path.write_text(egress_render_routes(routes, log=log)) routes_path.write_text(egress_render_routes(routes, log=log))
routes_path.chmod(0o600) routes_path.chmod(0o600)
# Generate a per-session fake secret under a plausible random env name.
# The sidecar marks that exact env name as sensitive for known-secret
# scanning; the agent receives the same name/value as exfil bait.
canary = secrets.token_urlsafe(32)
return EgressPlan( return EgressPlan(
slug=slug, slug=slug,
routes_path=routes_path, routes_path=routes_path,
routes=routes, routes=routes,
token_env_map=egress_token_env_map(routes), token_env_map=egress_token_env_map(routes),
log=log, log=log,
canary=canary,
canary_env=_random_canary_env(),
) )
__all__ = [ __all__ = [
@@ -397,7 +344,5 @@ __all__ = [
"egress_render_routes", "egress_render_routes",
"egress_resolve_token_values", "egress_resolve_token_values",
"egress_routes_for_bottle", "egress_routes_for_bottle",
"egress_agent_env_entries",
"egress_sidecar_env_entries",
"egress_token_env_map", "egress_token_env_map",
] ]
+13 -32
View File
@@ -34,7 +34,7 @@ VALID_METHODS = frozenset({
"CONNECT", "CONNECT",
}) })
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"}) OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"})
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"}) INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
# Per-route policy for what the proxy does when an outbound DLP detector # Per-route policy for what the proxy does when an outbound DLP detector
@@ -439,6 +439,15 @@ def route_to_yaml_dict(r: Route) -> dict[str, object]:
return d return d
def load_routes(text: str) -> tuple[Route, ...]:
"""Parse YAML text → routes."""
try:
payload = parse_yaml_subset(text)
except YamlSubsetError as e:
raise ValueError(f"routes payload: invalid YAML: {e}") from e
return parse_routes(payload)
def parse_config(payload: object) -> "Config": def parse_config(payload: object) -> "Config":
"""Parse a full egress config payload (top-level log level + routes).""" """Parse a full egress config payload (top-level log level + routes)."""
if not isinstance(payload, dict): if not isinstance(payload, dict):
@@ -720,28 +729,17 @@ def scan_outbound(
try: try:
from dlp_detectors import ( # type: ignore[import-not-found] from dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_token_patterns, scan_token_patterns,
) )
except ImportError: # pragma: no cover - host-side path except ImportError: # pragma: no cover - host-side path
from .dlp_detectors import ( # type: ignore[import-not-found] from .dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_token_patterns, scan_token_patterns,
) )
# Binary bodies: latin-1 is a bijective byte↔codepoint mapping that text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
# preserves every byte value, so ASCII-range secret strings remain
# findable by str.find / regex. Prefer strict UTF-8 for valid text bodies.
if isinstance(body, bytes):
try:
text = body.decode("utf-8")
except UnicodeDecodeError:
text = body.decode("latin-1")
else:
text = body
# CRLF injection is only an attack in the request line + headers, never the # CRLF injection is only an attack in the request line + headers, never the
# body: an HTTP body is delimited by Content-Length, so CRLF bytes there # body: an HTTP body is delimited by Content-Length, so CRLF bytes there
@@ -760,30 +758,12 @@ def scan_outbound(
return result return result
if _detector_enabled(route.outbound_detectors, "known_secrets"): if _detector_enabled(route.outbound_detectors, "known_secrets"):
# BOT_BOTTLE_SENSITIVE_PREFIXES lets operators add extra env prefixes
# beyond EGRESS_TOKEN_* without changing the manifest schema.
extra_raw = environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "")
extra = tuple(p for p in extra_raw.split(",") if p)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
result = scan_known_secrets( result = scan_known_secrets(
text, location="body", env=environ, text, location="body", env=environ, safe_tokens=safe_tokens,
sensitive_prefixes=sensitive_prefixes, safe_tokens=safe_tokens,
) )
if result is not None: if result is not None:
return result return result
# Entropy scanning requires explicit opt-in: it is NOT part of the
# default "all detectors" set because it produces false positives on
# legitimate base64 / binary payloads. Routes must list "entropy" in
# dlp.outbound_detectors to enable it.
if (
route.outbound_detectors is not None
and "entropy" in route.outbound_detectors
):
result = scan_entropy(text, location="body")
if result is not None:
return result
return None return None
@@ -853,6 +833,7 @@ __all__ = [
"is_git_push_request", "is_git_push_request",
"is_git_fetch_request", "is_git_fetch_request",
"load_config", "load_config",
"load_routes",
"match_route", "match_route",
"outbound_scan_headers", "outbound_scan_headers",
"parse_config", "parse_config",
+6 -28
View File
@@ -199,10 +199,13 @@ def _parse_provider_settings(
) -> dict[str, object]: ) -> dict[str, object]:
if raw is None: if raw is None:
return {} return {}
if template != "pi":
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings is only "
"supported for template 'pi'"
)
settings = as_json_object(raw, f"bottle '{bottle_name}' agent_provider.settings") settings = as_json_object(raw, f"bottle '{bottle_name}' agent_provider.settings")
allowed = {
common_allowed = {"startup_args"}
pi_allowed = {
"provider", "provider",
"base_url", "base_url",
"api", "api",
@@ -215,37 +218,12 @@ def _parse_provider_settings(
"supports_developer_role", "supports_developer_role",
"supports_reasoning_effort", "supports_reasoning_effort",
} }
if template == "pi":
allowed = common_allowed | pi_allowed
elif template in ("claude", "codex"):
allowed = common_allowed
elif template not in PROVIDER_TEMPLATES:
return dict(settings)
else:
allowed = common_allowed
for key in settings: for key in settings:
if key not in allowed: if key not in allowed:
raise ManifestError( raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings has unknown " f"bottle '{bottle_name}' agent_provider.settings has unknown "
f"key {key!r}; allowed: {', '.join(sorted(allowed))}" f"key {key!r}; allowed: {', '.join(sorted(allowed))}"
) )
startup_args = settings.get("startup_args")
if startup_args is not None:
if not isinstance(startup_args, list):
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings.startup_args "
f"must be an array of strings"
)
for i, arg in enumerate(startup_args):
if not isinstance(arg, str) or not arg:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings."
f"startup_args[{i}] must be a non-empty string"
)
if template != "pi":
return dict(settings)
for key in ("provider", "base_url", "api", "api_key", "api_key_env"): for key in ("provider", "base_url", "api", "api_key", "api_key_env"):
value = settings.get(key) value = settings.get(key)
if value is not None and (not isinstance(value, str) or not value): if value is not None and (not isinstance(value, str) or not value):
+3 -8
View File
@@ -47,11 +47,11 @@ from pathlib import Path
try: try:
# Same-directory imports inside the bundle container; these files are # Same-directory imports inside the bundle container; these files are
# COPYed flat under /app by Dockerfile.sidecars. # COPYed flat under /app by Dockerfile.sidecars.
from egress_addon_core import LOG_OFF, load_config from egress_addon_core import load_routes
import supervise as _sv import supervise as _sv
except ModuleNotFoundError: except ModuleNotFoundError:
# Package imports for host-side tests and tooling. # Package imports for host-side tests and tooling.
from .egress_addon_core import LOG_OFF, load_config from .egress_addon_core import load_routes
from . import supervise as _sv from . import supervise as _sv
@@ -297,17 +297,12 @@ def validate_proposed_file(tool: str, content: str) -> None:
pass pass
elif tool in (_sv.TOOL_EGRESS_ALLOW, _sv.TOOL_EGRESS_BLOCK): elif tool in (_sv.TOOL_EGRESS_ALLOW, _sv.TOOL_EGRESS_BLOCK):
try: try:
config = load_config(content) load_routes(content)
except ValueError as e: except ValueError as e:
raise _RpcError( raise _RpcError(
ERR_INVALID_PARAMS, ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml is not valid: {e}", f"{tool}: proposed routes.yaml is not valid: {e}",
) from e ) from e
if config.log != LOG_OFF:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml must not change egress logging",
)
else: else:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}") raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
@@ -1,189 +0,0 @@
# PRD 0063: Strengthen outbound exfiltration detection
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-25
- **Issue:** #259
## Summary
The current DLP system has two detection tiers: weak (vendor token regexes)
and strong (exact matching of provisioned `EGRESS_TOKEN_*` values in multiple
encodings). This PRD adds four complementary hardening layers in ranked order
of impact:
1. **Canary tokens** — inject a per-session fake secret into the agent's
environment; any outbound appearance is a zero-false-positive exfil signal.
2. **Broadened known-value scanning** — extend the strong tier beyond
`EGRESS_TOKEN_*` via a `BOT_BOTTLE_SENSITIVE_PREFIXES` sidecar env var.
3. **Fragmentation-resistant matching** — catch separator injection and
long-substring leaks via alnum projection and sliding-window substring
search.
4. **Entropy scoring** — warn-only flag on high-entropy blobs as a
complementary signal for encrypted exfil.
## Problem
After PRD 0056 the outbound scan covers all HTTP request surfaces (host, path,
query, headers, body). But the content-based strong tier only matches
`EGRESS_TOKEN_*` values and their encodings. Three gaps remain:
| Gap | How an agent exploits it |
|-----|--------------------------|
| No canary | There is no "can't be false-positive" signal |
| Narrow prefix filter | Sensitive values provisioned under other env prefixes (MCP keys, API keys injected via the agent provider) are invisible to `scan_known_secrets` |
| Fragmentation blindness | Inserting `-`, space, or `\n` between secret characters turns an exact-match into a miss |
## Goals / Success Criteria
1. Each launched bottle has a unique canary token in the agent's environment
under a randomized `WORD_WORD_SECRET` env var name. The egress sidecar gets
the same env var and registers that exact name through
`BOT_BOTTLE_SENSITIVE_PREFIXES`. Any outbound appearance of the canary
blocks the request as a known-secret match.
2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default:
`("EGRESS_TOKEN_",)`). `scan_outbound` reads
`BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes
in, so operators can mark additional env vars as scanned values without
changing the manifest schema.
3. For every secret that passes exact-match, a secondary alnum-projection pass
checks for the secret with all non-alphanumeric characters stripped. This
catches separator-injection evasion (`MY-SECRET` → body contains
`MY SECRET`).
4. A sliding-window partial-match pass checks for long-enough contiguous
substrings of the secret's alnum projection in the text's alnum projection.
Any match ≥ `PARTIAL_MATCH_MIN_LEN` (12 chars) blocks with reason
`"partial match"`.
5. A new `scan_entropy` detector flags outbound text windows with Shannon
entropy ≥ `ENTROPY_BLOCK_THRESHOLD` (5.5 bits/char) at **warn** severity
only. It is registered under the new detector name `"entropy"` in
`OUTBOUND_DETECTOR_NAMES` and disabled by default (routes must opt in).
6. Binary request bodies are decoded via `latin-1` instead of
`utf-8 errors="replace"`, preserving every byte value and allowing
ASCII-range secrets to be found within binary payloads.
7. All new behaviour is unit-tested; existing tests pass unchanged.
## Non-goals
- Rolling per-host buffer for split-across-requests detection (state in the
stateless addon is complex; deferred).
- Additional vendor regexes.
- ML / embedding-based detection.
- Entropy-based hard blocks (warn only per the issue).
## Design
### Canary token flow
```
Egress.prepare()
canary = secrets.token_urlsafe(32)
canary_env = <random WORD_WORD_SECRET>
EgressPlan(canary=canary, canary_env=canary_env, ...)
Docker compose render:
sidecar env: <canary_env>=<canary>
sidecar env: BOT_BOTTLE_SENSITIVE_PREFIXES=<canary_env>
agent env: <canary_env>=<canary> ← visible to agent as a "secret"
macos-container launch: same literals added to sidecar + agent env entries
```
The sidecar uses `BOT_BOTTLE_SENSITIVE_PREFIXES` to make the random canary env
name part of the existing `scan_known_secrets` detector without adding a
manifest schema field.
### Broadened known-value scanning
`scan_known_secrets` gains a `sensitive_prefixes` parameter:
```python
def scan_known_secrets(
text: str,
*,
location: str = "body",
env: Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> ScanResult | None:
```
`scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` (comma-separated list
of additional prefixes) from `environ` and appends them:
```python
extra = tuple(
p for p in environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "").split(",") if p
)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
```
`redact_tokens` receives the same treatment for consistent redaction.
### Fragmentation-resistant matching
A new helper `_alnum_projection(text)` strips all non-alphanumeric characters.
`scan_known_secrets` runs two passes per secret:
1. **Exact pass** — existing encoded-variant loop (unchanged).
2. **Alnum-projection pass** — if the secret's alnum projection has ≥ 8 chars,
check if it appears in the text's alnum projection. Match → block with
`"fragmented match (separator injection)"` reason.
3. **Partial-substring pass** — if the secret's alnum projection has ≥
`PARTIAL_MATCH_MIN_LEN` chars (12), slide a window of that length across the
secret's projection and look for each window in the text's alnum projection.
First match → block with `"partial match"` reason.
All three passes run only for the `"known_secrets"` detector; the token-pattern
and entropy detectors are unchanged.
### Entropy scoring
New public function:
```python
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW, # 64
threshold: float = ENTROPY_BLOCK_THRESHOLD, # 5.5
) -> ScanResult | None:
```
Slides a window of `window` characters across `text` in steps of `window // 2`.
If any window's Shannon entropy exceeds `threshold`, returns a **warn**-severity
`ScanResult`. Never blocks.
`OUTBOUND_DETECTOR_NAMES` gains `"entropy"`. Routes opt in via their `dlp`
block; entropy scanning is **off by default** to avoid false-positive noise on
legitimate binary payloads.
### Binary body handling
In `scan_outbound`, the bytes → str decoding changes from:
```python
body.decode("utf-8", errors="replace")
```
to:
```python
body.decode("utf-8") if body is str else body.decode("latin-1")
```
`latin-1` is a bijective byte↔codepoint mapping; every byte value is preserved
as its corresponding Latin-1 code point, so ASCII-range secret strings remain
intact and `str.find` / regex still locate them correctly. The fallback from
strict UTF-8 is tried first so valid UTF-8 bodies are decoded faithfully.
## Implementation
Delivered in three commits on the same branch:
1. **DLP detector changes**`_alnum_projection`, fragmentation passes,
`scan_entropy`, broadened `scan_known_secrets`, updated `scan_outbound` and
`redact_tokens`; all accompanying unit tests.
2. **Canary injection**`EgressPlan.canary`, `Egress.prepare()`,
Docker compose + macos-container backend injection.
3. **PRD flip**`Status: Draft → Active`.
@@ -1,6 +1,6 @@
# PRD 0064: LOG_FULL egress logging credential redaction # PRD prd-new: LOG_FULL egress logging credential redaction
- **Status:** Active - **Status:** Draft
- **Author:** claude - **Author:** claude
- **Created:** 2026-06-25 - **Created:** 2026-06-25
- **Issue:** #257 - **Issue:** #257
-1
View File
@@ -4,4 +4,3 @@
pylint>=3.0.0 pylint>=3.0.0
pyright>=1.1.300 pyright>=1.1.300
coverage>=7.0.0
+4 -7
View File
@@ -92,9 +92,9 @@ class TestSandboxEscape(unittest.TestCase):
"on PATH: curl -sSL https://smolmachines.com/install.sh | sh" "on PATH: curl -sSL https://smolmachines.com/install.sh | sh"
) )
# Throwaway static key for the git-gate fixture. It need not # Throwaway "identity file" for the git-gate's `identity` field.
# be a real SSH key: test 5 reaches gitleaks before any SSH # It need not be a real SSH key: test 5 reaches gitleaks before
# attempt anyway. # any SSH attempt anyway.
fd, kp = tempfile.mkstemp(prefix="sandbox-test-key.") fd, kp = tempfile.mkstemp(prefix="sandbox-test-key.")
os.close(fd) os.close(fd)
cls._key_path = Path(kp) cls._key_path = Path(kp)
@@ -123,10 +123,7 @@ class TestSandboxEscape(unittest.TestCase):
"git-gate": {"repos": { "git-gate": {"repos": {
"throwaway": { "throwaway": {
"url": "ssh://git@unreachable.invalid:22/throwaway.git", "url": "ssh://git@unreachable.invalid:22/throwaway.git",
"key": { "identity": str(cls._key_path),
"provider": "static",
"path": str(cls._key_path),
},
}, },
}}, }},
}, },
@@ -198,7 +198,6 @@ class TestSmolmachinesLaunch(unittest.TestCase):
# connect fails, which is the property chunk 3 will # connect fails, which is the property chunk 3 will
# preserve once egress is actually running. # preserve once egress is actually running.
r = self.bottle.exec( r = self.bottle.exec(
"env -u HTTPS_PROXY -u HTTP_PROXY -u https_proxy -u http_proxy "
f"curl -s --show-error --max-time 3 http://{self.plan.bundle_ip}:9099 " f"curl -s --show-error --max-time 3 http://{self.plan.bundle_ip}:9099 "
"2>&1 || true" "2>&1 || true"
) )
-46
View File
@@ -168,34 +168,6 @@ class TestAgentProviderRuntime(unittest.TestCase):
self.assertEqual("~/.claude/statusline.sh", settings["statusLine"]["command"]) self.assertEqual("~/.claude/statusline.sh", settings["statusLine"]["command"])
self.assertEqual("custom:bot-bottle-research-ui", settings["theme"]) self.assertEqual("custom:bot-bottle-research-ui", settings["theme"])
def test_claude_plan_uses_startup_args_from_provider_settings(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="claude",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
provider_settings={
"startup_args": ["--model", "opus"],
},
)
self.assertEqual(("--model", "opus"), plan.startup_args)
def test_codex_plan_uses_startup_args_from_provider_settings(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="codex",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
provider_settings={
"startup_args": ["--model", "gpt-5-codex"],
},
)
self.assertEqual(("--model", "gpt-5-codex"), plan.startup_args)
def test_codex_forward_host_credentials_populates_egress_routes(self): def test_codex_forward_host_credentials_populates_egress_routes(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp: with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
home = Path(tmp) / "host-codex" home = Path(tmp) / "host-codex"
@@ -422,24 +394,6 @@ class TestAgentProviderRuntime(unittest.TestCase):
self.assertNotIn("OPENROUTER_API_KEY", plan.guest_env) self.assertNotIn("OPENROUTER_API_KEY", plan.guest_env)
self.assertTrue(provider["compat"]["supportsReasoningEffort"]) self.assertTrue(provider["compat"]["supportsReasoningEffort"])
def test_pi_plan_appends_startup_args_from_provider_settings(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="pi",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
provider_settings={
"models": ["qwen3:14b"],
"startup_args": ["--no-stream"],
},
)
self.assertEqual(
("--models", "ollama/qwen3:14b", "--no-stream"),
plan.startup_args,
)
def test_pi_prompt_mode_appends_system_prompt_interactively(self): def test_pi_prompt_mode_appends_system_prompt_interactively(self):
self.assertEqual( self.assertEqual(
["--append-system-prompt", "/home/node/.bot-bottle-prompt.txt"], ["--append-system-prompt", "/home/node/.bot-bottle-prompt.txt"],
-21
View File
@@ -102,27 +102,6 @@ class TestAttachAgent(unittest.TestCase):
bottle.argv, bottle.argv,
) )
def test_remote_control_is_provider_startup_arg(self):
class Bottle:
argv: list[str] = []
def exec_agent(self, argv: list[str], *, tty: bool = True) -> int:
self.argv = list(argv)
return 0
bottle = Bottle()
exit_code = start_mod.attach_agent(
bottle, # type: ignore[arg-type]
agent_provider_template="codex",
startup_args=("remote-control",),
)
self.assertEqual(0, exit_code)
self.assertEqual(
["--dangerously-bypass-approvals-and-sandbox", "remote-control"],
bottle.argv,
)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+2 -23
View File
@@ -80,11 +80,7 @@ def _git_gate_plan(upstreams: tuple[GitGateUpstream, ...] = ()) -> GitGatePlan:
) )
def _egress_plan( def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan:
routes: tuple[EgressRoute, ...] = (),
*,
canary: bool = False,
) -> EgressPlan:
token_env_map = { token_env_map = {
r.token_env: r.token_ref r.token_env: r.token_ref
for r in routes for r in routes
@@ -99,8 +95,6 @@ def _egress_plan(
egress_network=f"bot-bottle-egress-{SLUG}", egress_network=f"bot-bottle-egress-{SLUG}",
mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem", mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem",
mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem", mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem",
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
) )
@@ -118,7 +112,6 @@ def _plan(
with_git: bool = False, with_git: bool = False,
with_egress: bool = False, with_egress: bool = False,
supervise: bool = False, supervise: bool = False,
canary: bool = False,
) -> DockerBottlePlan: ) -> DockerBottlePlan:
"""Build a fully-resolved DockerBottlePlan. Toggles cover the """Build a fully-resolved DockerBottlePlan. Toggles cover the
matrix the renderer's conditional-service logic branches on.""" matrix the renderer's conditional-service logic branches on."""
@@ -157,7 +150,7 @@ def _plan(
slug=SLUG, slug=SLUG,
forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"}, forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"},
git_gate_plan=_git_gate_plan(upstreams), git_gate_plan=_git_gate_plan(upstreams),
egress_plan=_egress_plan(routes, canary=canary), egress_plan=_egress_plan(routes),
supervise_plan=_supervise_plan() if supervise else None, supervise_plan=_supervise_plan() if supervise else None,
use_runsc=False, use_runsc=False,
agent_provision=AgentProvisionPlan( agent_provision=AgentProvisionPlan(
@@ -382,20 +375,6 @@ class TestSidecarBundleShape(unittest.TestCase):
env_strings = sc["environment"] env_strings = sc["environment"]
self.assertNotIn("EGRESS_TOKEN_0", env_strings) self.assertNotIn("EGRESS_TOKEN_0", env_strings)
def test_canary_env_registered_as_sensitive_in_sidecar(self):
sc = self._render(canary=True)["services"]["sidecars"]
env_strings = sc["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
env_strings,
)
def test_canary_env_visible_to_agent(self):
agent = self._render(canary=True)["services"]["agent"]
env_strings = agent["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
def test_supervise_env_present_when_active(self): def test_supervise_env_present_when_active(self):
sc = self._render(supervise=True)["services"]["sidecars"] sc = self._render(supervise=True)["services"]["sidecars"]
env_strings = sc["environment"] env_strings = sc["environment"]
@@ -29,9 +29,6 @@ from bot_bottle.supervise import SupervisePlan
_URL = "http://supervise:9100/" _URL = "http://supervise:9100/"
_CODEX_DOCKERFILE = (
Path(__file__).resolve().parents[2] / "bot_bottle/contrib/codex/Dockerfile"
)
def _make_bottle(exec_result: ExecResult | None = None) -> MagicMock: def _make_bottle(exec_result: ExecResult | None = None) -> MagicMock:
@@ -279,12 +276,6 @@ class TestCodexProvision(unittest.TestCase):
) )
class TestCodexDockerfile(unittest.TestCase):
def test_installs_procps_for_remote_control_pid_management(self):
dockerfile = _CODEX_DOCKERFILE.read_text()
self.assertIn("procps", dockerfile)
class TestCodexSuperviseMcp(unittest.TestCase): class TestCodexSuperviseMcp(unittest.TestCase):
def test_noop_when_supervise_disabled(self): def test_noop_when_supervise_disabled(self):
bottle = _make_bottle() bottle = _make_bottle()
+2 -192
View File
@@ -1,23 +1,18 @@
"""Unit: DLP detectors (PRD 0053). """Unit: DLP detectors (PRD 0053).
Tests for token pattern scanning, known secret detection, fragmentation- Tests for token pattern scanning, known secret detection, and
resistant matching, entropy scoring, and naive prompt injection detection.""" naive prompt injection detection."""
import base64 import base64
import gzip import gzip
import unittest import unittest
from bot_bottle.dlp_detectors import ( from bot_bottle.dlp_detectors import (
ENTROPY_BLOCK_THRESHOLD,
PARTIAL_MATCH_MIN_LEN,
REDACT, REDACT,
_alnum_projection,
_encoded_variants, _encoded_variants,
_normalize_text, _normalize_text,
_shannon_entropy,
redact_tokens, redact_tokens,
scan_crlf_injection, scan_crlf_injection,
scan_entropy,
scan_known_secrets, scan_known_secrets,
scan_naive_injection, scan_naive_injection,
scan_token_patterns, scan_token_patterns,
@@ -507,191 +502,6 @@ class TestStripCrlf(unittest.TestCase):
from bot_bottle.dlp_detectors import strip_crlf from bot_bottle.dlp_detectors import strip_crlf
self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello")) self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello"))
class TestAlnumProjection(unittest.TestCase):
def test_alphanumeric_unchanged(self):
self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ"))
def test_strips_hyphens(self):
self.assertEqual("mysecretvalue", _alnum_projection("my-secret-value"))
def test_strips_spaces(self):
self.assertEqual("mysecretvalue", _alnum_projection("my secret value"))
def test_strips_dots_and_underscores(self):
self.assertEqual("mysecretvalue", _alnum_projection("my.secret_value"))
def test_empty_string(self):
self.assertEqual("", _alnum_projection(""))
def test_all_special_chars(self):
self.assertEqual("", _alnum_projection("!@#$%^&*()"))
class TestFragmentationResistantMatching(unittest.TestCase):
"""scan_known_secrets catches separator-injection and partial-substring evasion."""
# Secrets long enough that their alnum projections are ≥ 8 chars.
SECRET = "supersecrettoken99"
ENV = {"EGRESS_TOKEN_0": SECRET}
def test_exact_match_still_works(self):
result = scan_known_secrets(f"key={self.SECRET}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_separator_injection_blocked(self):
# Hyphens inserted between chars of the secret.
fragmented = "-".join(self.SECRET)
result = scan_known_secrets(f"data={fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("separator injection", result.reason)
def test_space_separator_blocked(self):
fragmented = " ".join(self.SECRET)
result = scan_known_secrets(f"body: {fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("separator injection", result.reason)
def test_partial_substring_blocked(self):
# First PARTIAL_MATCH_MIN_LEN alnum chars of the secret, no separators.
partial = _alnum_projection(self.SECRET)[:PARTIAL_MATCH_MIN_LEN]
result = scan_known_secrets(f"x={partial}&y=other", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("partial match", result.reason)
def test_short_secret_skips_projection(self):
# Secrets shorter than _ALNUM_MIN_LEN in alnum projection are not
# fragmentation-checked (too many false positives).
short_env = {"EGRESS_TOKEN_0": "abc"}
# "a b c" has alnum projection "abc" (3 chars, < 8); should not block.
self.assertIsNone(scan_known_secrets("a b c", env=short_env))
def test_clean_text_not_blocked(self):
self.assertIsNone(scan_known_secrets("nothing to see here", env=self.ENV))
def test_sensitive_prefixes_param_extra_prefix(self):
env = {"MY_CRED_0": self.SECRET, "IGNORED": "other"}
result = scan_known_secrets(
f"key={self.SECRET}",
env=env,
sensitive_prefixes=("MY_CRED_",),
)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("MY_CRED_0", result.reason)
def test_sensitive_prefixes_default_only_egress_token(self):
# A value under a non-EGRESS_TOKEN_ key is ignored with default prefixes.
env = {"MY_CRED_0": self.SECRET}
self.assertIsNone(scan_known_secrets(f"key={self.SECRET}", env=env))
def test_canary_prefix_detected(self):
canary_value = "canary-fake-secret-value-xyz"
env = {"CANON_ALPHA_SECRET": canary_value}
result = scan_known_secrets(
f"x={canary_value}",
env=env,
sensitive_prefixes=("CANON_ALPHA_SECRET",),
)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("CANON_ALPHA_SECRET", result.reason)
class TestRedactTokensBroadenedPrefixes(unittest.TestCase):
SECRET = "my-provisioned-secret"
def test_default_redacts_egress_token(self):
env = {"EGRESS_TOKEN_0": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_extra_prefix_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(
f"val={self.SECRET}",
env=env,
sensitive_prefixes=("MY_SECRET_",),
)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_non_matching_prefix_not_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
# Default prefixes only include EGRESS_TOKEN_ → secret not redacted
self.assertIn(self.SECRET, out)
class TestShannonEntropy(unittest.TestCase):
def test_empty_string_zero(self):
self.assertEqual(0.0, _shannon_entropy(""))
def test_single_char_zero(self):
self.assertEqual(0.0, _shannon_entropy("aaaaaa"))
def test_two_equal_chars_one_bit(self):
self.assertAlmostEqual(1.0, _shannon_entropy("abababab"), places=10)
def test_high_entropy_random_like(self):
# Uniform 64-char string over 64 distinct symbols has entropy 6 bits.
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
text = alphabet # each char appears exactly once
self.assertAlmostEqual(6.0, _shannon_entropy(text), places=10)
class TestScanEntropy(unittest.TestCase):
def test_empty_returns_none(self):
self.assertIsNone(scan_entropy(""))
def test_low_entropy_returns_none(self):
# Highly repetitive text has low entropy.
self.assertIsNone(scan_entropy("a" * 200))
def test_high_entropy_warns(self):
# Build a 64-char string with entropy > ENTROPY_BLOCK_THRESHOLD.
# Use all 64 distinct printable chars to maximise entropy (~6 bits).
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, threshold=ENTROPY_BLOCK_THRESHOLD)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
self.assertIn("high-entropy", result.reason)
def test_never_blocks(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet)
# scan_entropy is warn-only; it must never return severity="block".
if result is not None:
self.assertNotEqual("block", result.severity)
def test_location_in_result(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, location="authorization header")
if result is not None:
self.assertIn("authorization header", result.location)
def test_structured_json_no_warn(self):
# Typical JSON has low entropy and should not be flagged.
json_body = '{"status": "ok", "message": "hello world", "count": 42}'
self.assertIsNone(scan_entropy(json_body))
def test_short_text_below_window(self):
# Text shorter than the window: checked as one chunk.
# Use a uniform string to ensure it won't be flagged.
self.assertIsNone(scan_entropy("abcde", threshold=ENTROPY_BLOCK_THRESHOLD))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
-10
View File
@@ -136,16 +136,6 @@ class TestClaudeArgv(unittest.TestCase):
argv, argv,
) )
def test_codex_remote_control_startup_arg_does_not_receive_initial_prompt(self):
argv = _codex_bottle("/home/node/.bot-bottle-prompt.txt").agent_argv(
["--dangerously-bypass-approvals-and-sandbox", "remote-control"],
)
self.assertEqual(
["docker", "exec", "-it", "bot-bottle-dev-abc", "codex",
"--dangerously-bypass-approvals-and-sandbox", "remote-control"],
argv,
)
def test_codex_resume_does_not_append_initial_prompt(self): def test_codex_resume_does_not_append_initial_prompt(self):
argv = _codex_bottle("/home/node/.bot-bottle-prompt.txt").agent_argv( argv = _codex_bottle("/home/node/.bot-bottle-prompt.txt").agent_argv(
["--dangerously-bypass-approvals-and-sandbox", "resume", "--last"], ["--dangerously-bypass-approvals-and-sandbox", "resume", "--last"],
@@ -31,6 +31,7 @@ class _Provider(AgentProvider):
return AgentProviderRuntime( return AgentProviderRuntime(
template="test", command="test", image="", template="test", command="test", image="",
prompt_mode="append_file", bypass_args=(), resume_args=(), prompt_mode="append_file", bypass_args=(), resume_args=(),
remote_control_args=(),
) )
def provision_plan(self, **kwargs): # type: ignore[override] def provision_plan(self, **kwargs): # type: ignore[override]
raise NotImplementedError raise NotImplementedError
+8 -128
View File
@@ -1,21 +1,15 @@
"""Unit: Egress route lift + routes.yaml render + token """Unit: Egress route lift + routes.yaml render + token
resolution (PRD 0017, PRD 0053).""" resolution (PRD 0017, PRD 0053)."""
import tempfile
import unittest import unittest
from pathlib import Path
from bot_bottle.egress import ( from bot_bottle.egress import (
CODEX_HOST_CREDENTIAL_TOKEN_REF, CODEX_HOST_CREDENTIAL_TOKEN_REF,
Egress,
EgressPlan,
EgressRoute, EgressRoute,
egress_agent_env_entries,
egress_manifest_routes, egress_manifest_routes,
egress_render_routes, egress_render_routes,
egress_resolve_token_values, egress_resolve_token_values,
egress_routes_for_bottle, egress_routes_for_bottle,
egress_sidecar_env_entries,
egress_token_env_map, egress_token_env_map,
) )
from bot_bottle.log import Die from bot_bottle.log import Die
@@ -322,7 +316,7 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual([], parse_yaml_subset(rendered)["routes"]) self.assertEqual([], parse_yaml_subset(rendered)["routes"])
def test_round_trip_through_addon_core(self): def test_round_trip_through_addon_core(self):
from bot_bottle.egress_addon_core import load_config from bot_bottle.egress_addon_core import load_routes
b = _bottle([ b = _bottle([
{"host": "api.github.com", {"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"}, "auth": {"scheme": "Bearer", "token_ref": "GH_PAT"},
@@ -333,7 +327,7 @@ class TestRenderRoutes(unittest.TestCase):
{"host": "api.anthropic.com"}, {"host": "api.anthropic.com"},
]) ])
routes = egress_routes_for_bottle(b) routes = egress_routes_for_bottle(b)
addon_routes = load_config(egress_render_routes(routes)).routes addon_routes = load_routes(egress_render_routes(routes))
self.assertEqual(3, len(addon_routes)) self.assertEqual(3, len(addon_routes))
self.assertEqual("Bearer", addon_routes[0].auth_scheme) self.assertEqual("Bearer", addon_routes[0].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", addon_routes[0].token_env) self.assertEqual("EGRESS_TOKEN_0", addon_routes[0].token_env)
@@ -341,26 +335,26 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual("", addon_routes[2].auth_scheme) self.assertEqual("", addon_routes[2].auth_scheme)
def test_dlp_round_trips(self): def test_dlp_round_trips(self):
from bot_bottle.egress_addon_core import load_config from bot_bottle.egress_addon_core import load_routes
b = _bottle([{"host": "x.example", "dlp": { b = _bottle([{"host": "x.example", "dlp": {
"outbound_detectors": ["token_patterns"], "outbound_detectors": ["token_patterns"],
"inbound_detectors": False, "inbound_detectors": False,
}}]) }}])
routes = egress_routes_for_bottle(b) routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes) rendered = egress_render_routes(routes)
addon_routes = load_config(rendered).routes addon_routes = load_routes(rendered)
self.assertEqual(("token_patterns",), addon_routes[0].outbound_detectors) self.assertEqual(("token_patterns",), addon_routes[0].outbound_detectors)
self.assertEqual((), addon_routes[0].inbound_detectors) self.assertEqual((), addon_routes[0].inbound_detectors)
def test_outbound_on_match_round_trips(self): def test_outbound_on_match_round_trips(self):
from bot_bottle.egress_addon_core import load_config from bot_bottle.egress_addon_core import load_routes
b = _bottle([{"host": "logs.example", "dlp": { b = _bottle([{"host": "logs.example", "dlp": {
"outbound_on_match": "redact", "outbound_on_match": "redact",
}}]) }}])
routes = egress_routes_for_bottle(b) routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes) rendered = egress_render_routes(routes)
self.assertIn('outbound_on_match: "redact"', rendered) self.assertIn('outbound_on_match: "redact"', rendered)
addon_routes = load_config(rendered).routes addon_routes = load_routes(rendered)
self.assertEqual("redact", addon_routes[0].outbound_on_match) self.assertEqual("redact", addon_routes[0].outbound_on_match)
def test_outbound_on_match_default_omitted_from_render(self): def test_outbound_on_match_default_omitted_from_render(self):
@@ -370,12 +364,12 @@ class TestRenderRoutes(unittest.TestCase):
self.assertNotIn("outbound_on_match", rendered) self.assertNotIn("outbound_on_match", rendered)
def test_git_fetch_policy_round_trips(self): def test_git_fetch_policy_round_trips(self):
from bot_bottle.egress_addon_core import load_config from bot_bottle.egress_addon_core import load_routes
b = _bottle([{"host": "github.com", "git": {"fetch": True}}]) b = _bottle([{"host": "github.com", "git": {"fetch": True}}])
routes = egress_routes_for_bottle(b) routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes) rendered = egress_render_routes(routes)
self.assertEqual({"fetch": True}, self._parsed(routes)[0]["git"]) self.assertEqual({"fetch": True}, self._parsed(routes)[0]["git"])
addon_routes = load_config(rendered).routes addon_routes = load_routes(rendered)
self.assertTrue(addon_routes[0].git_fetch) self.assertTrue(addon_routes[0].git_fetch)
def test_log_zero_omitted_from_render(self): def test_log_zero_omitted_from_render(self):
@@ -449,119 +443,5 @@ class TestResolveTokenValues(unittest.TestCase):
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out) self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
class TestCanaryGeneration(unittest.TestCase):
"""Egress.prepare() generates a unique canary token per session."""
def _bottle_obj(self):
return ManifestIndex.from_json_obj({
"bottles": {"dev": {"egress": {"routes": []}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
def _make_plan(self) -> EgressPlan:
# Use a concrete no-op subclass so we can call prepare() without
# a real backend.
class _TestEgress(Egress):
pass
e = _TestEgress()
with tempfile.TemporaryDirectory() as td:
return e.prepare(self._bottle_obj(), "test-slug", Path(td))
def test_canary_is_non_empty(self):
plan = self._make_plan()
self.assertIsInstance(plan.canary, str)
self.assertGreater(len(plan.canary), 0)
self.assertRegex(plan.canary_env, r"^[A-Z]+_[A-Z]+_SECRET$")
def test_canary_is_unique_per_session(self):
with tempfile.TemporaryDirectory() as td:
bottle = self._bottle_obj()
class _TestEgress(Egress):
pass
e = _TestEgress()
plan_a = e.prepare(bottle, "slug-a", Path(td))
plan_b = e.prepare(bottle, "slug-b", Path(td))
self.assertNotEqual(plan_a.canary, plan_b.canary)
def test_canary_detected_by_scan_known_secrets(self):
from bot_bottle.dlp_detectors import scan_known_secrets
plan = self._make_plan()
env = {plan.canary_env: plan.canary}
result = scan_known_secrets(
f"exfil={plan.canary}",
env=env,
sensitive_prefixes=(plan.canary_env,),
)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn(plan.canary_env, result.reason)
def test_egress_plan_canary_field_default_empty(self):
# Verify EgressPlan can be constructed with an empty canary (backward compat).
from pathlib import Path
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
)
self.assertEqual("", plan.canary)
self.assertEqual("", plan.canary_env)
class TestEgressEnvEntries(unittest.TestCase):
def test_sidecar_entries_include_route_tokens_and_canary_scan_prefix(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(EgressRoute(host="api.example"),),
token_env_map={"EGRESS_TOKEN_1": "T1", "EGRESS_TOKEN_0": "T0"},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
(
"EGRESS_TOKEN_0",
"EGRESS_TOKEN_1",
"CANON_ALPHA_SECRET=fake-canary-value",
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
),
egress_sidecar_env_entries(plan),
)
def test_agent_entries_include_only_canary_bait(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
("CANON_ALPHA_SECRET=fake-canary-value",),
egress_agent_env_entries(plan),
)
def test_canary_entries_omitted_when_name_missing(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
)
self.assertEqual((), egress_sidecar_env_entries(plan))
self.assertEqual((), egress_agent_env_entries(plan))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+42 -130
View File
@@ -32,6 +32,7 @@ from bot_bottle.egress_addon_core import (
is_git_fetch_request, is_git_fetch_request,
is_git_push_request, is_git_push_request,
load_config, load_config,
load_routes,
match_route, match_route,
outbound_scan_headers, outbound_scan_headers,
parse_config, parse_config,
@@ -288,6 +289,47 @@ class TestParseDlp(unittest.TestCase):
}]}) }]})
# --- load_routes ---------------------------------------------------------
class TestLoadRoutes(unittest.TestCase):
def test_yaml_text_round_trip(self):
routes = load_routes(
'routes:\n'
' - host: "api.example"\n'
)
self.assertEqual(1, len(routes))
self.assertEqual("api.example", routes[0].host)
def test_full_route_shape_parses(self):
routes = load_routes(
'routes:\n'
' - host: "api.example"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
' matches:\n'
' - paths:\n'
' - value: "/v1/"\n'
' - type: "exact"\n'
' value: "/messages"\n'
)
self.assertEqual(1, len(routes))
r = routes[0]
self.assertEqual("api.example", r.host)
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
self.assertEqual(1, len(r.matches))
self.assertEqual(2, len(r.matches[0].paths))
def test_empty_routes_list(self):
routes = load_routes("routes: []\n")
self.assertEqual((), routes)
def test_invalid_yaml_raises_value_error(self):
with self.assertRaises(ValueError):
load_routes("routes:\n\t- host: x\n")
# --- load_config / parse_config ------------------------------------------ # --- load_config / parse_config ------------------------------------------
@@ -336,33 +378,6 @@ class TestLoadConfig(unittest.TestCase):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
parse_config("not a dict") parse_config("not a dict")
def test_empty_routes_list(self):
cfg = load_config("routes: []\n")
self.assertEqual((), cfg.routes)
def test_full_route_shape_parses(self):
cfg = load_config(
'routes:\n'
' - host: "api.example"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
' matches:\n'
' - paths:\n'
' - value: "/v1/"\n'
' - type: "exact"\n'
' value: "/messages"\n'
)
r = cfg.routes[0]
self.assertEqual("api.example", r.host)
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
self.assertEqual(1, len(r.matches))
self.assertEqual(2, len(r.matches[0].paths))
def test_invalid_yaml_raises_value_error(self):
with self.assertRaises(ValueError):
load_config("routes:\n\t- host: x\n")
# --- evaluate_matches --------------------------------------------------- # --- evaluate_matches ---------------------------------------------------
@@ -1258,109 +1273,6 @@ class TestBuildTokenAllowPayload(unittest.TestCase):
result = ScanResult(severity="block", reason="r", matched="x") result = ScanResult(severity="block", reason="r", matched="x")
payload = build_token_allow_payload("h", "GET", "/", result) payload = build_token_allow_payload("h", "GET", "/", result)
self.assertNotIn("context:", payload) self.assertNotIn("context:", payload)
class TestScanOutboundEnhanced(unittest.TestCase):
"""scan_outbound changes: binary decode, entropy detector,
broadened known-value prefixes, fragmentation resistance."""
_ROUTE = Route(host="api.example.com")
_ROUTE_ENTROPY = Route(
host="api.example.com",
outbound_detectors=("entropy",),
)
def test_binary_body_latin1_decode_finds_ascii_secret(self):
# Body contains valid ASCII secret surrounded by non-UTF-8 bytes.
secret = "supersecrettoken99"
env = {"EGRESS_TOKEN_0": secret}
# Wrap the secret in bytes that are invalid UTF-8.
body = b"\x80\x81" + secret.encode("ascii") + b"\xff"
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_binary_body_valid_utf8_decoded_correctly(self):
env = {"EGRESS_TOKEN_0": "mysecret"}
# Valid UTF-8 body — should be decoded as UTF-8, not latin-1.
body = "clean body with mysecret".encode("utf-8")
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
def test_entropy_detector_off_by_default(self):
import string
# High-entropy content should NOT warn if the route has no entropy detector.
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE, alphabet, {})
self.assertIsNone(result)
def test_entropy_detector_warns_when_enabled(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE_ENTROPY, alphabet, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
def test_bot_bottle_sensitive_prefixes_env_var(self):
# When the sidecar env contains BOT_BOTTLE_SENSITIVE_PREFIXES,
# scan_outbound should scan those additional prefixes.
secret = "extra-sensitive-value-abc"
env = {
"MY_CRED_KEY": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "MY_CRED_",
}
result = scan_outbound(self._ROUTE, f"x={secret}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_bot_bottle_sensitive_prefixes_multiple(self):
secret = "my-api-key-value-xyz"
env = {
"ANTHROPIC_API_0": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "ANTHROPIC_API_,OTHER_",
}
result = scan_outbound(self._ROUTE, f"auth={secret}", env)
self.assertIsNotNone(result)
def test_canary_detected_via_random_secret_env_name(self):
# The fake secret uses a randomized env name that the sidecar marks
# as sensitive through BOT_BOTTLE_SENSITIVE_PREFIXES.
canary = "canaryvalue12345abcdef"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
result = scan_outbound(self._ROUTE, f"data={canary}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("CANON_ALPHA_SECRET", result.reason)
def test_fragmented_canary_blocked(self):
# Canary with separators injected is still caught.
canary = "supersecretcanary99"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
fragmented = "-".join(canary)
result = scan_outbound(self._ROUTE, f"x={fragmented}", env)
self.assertIsNotNone(result)
class TestOutboundDetectorNames(unittest.TestCase):
def test_entropy_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("entropy", OUTBOUND_DETECTOR_NAMES)
def test_known_secrets_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("known_secrets", OUTBOUND_DETECTOR_NAMES)
def test_token_patterns_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("token_patterns", OUTBOUND_DETECTOR_NAMES)
if __name__ == "__main__": if __name__ == "__main__":
@@ -12,7 +12,6 @@ import sys
import types import types
import unittest import unittest
from io import StringIO from io import StringIO
from typing import Any
from unittest.mock import patch from unittest.mock import patch
@@ -24,7 +23,7 @@ def _ensure_shims() -> None:
if "mitmproxy" not in sys.modules: if "mitmproxy" not in sys.modules:
_mm = types.ModuleType("mitmproxy") _mm = types.ModuleType("mitmproxy")
_mh = types.ModuleType("mitmproxy.http") _mh = types.ModuleType("mitmproxy.http")
setattr(_mm, "http", _mh) _mm.http = _mh
sys.modules["mitmproxy"] = _mm sys.modules["mitmproxy"] = _mm
sys.modules["mitmproxy.http"] = _mh sys.modules["mitmproxy.http"] = _mh
if "egress_addon_core" not in sys.modules: if "egress_addon_core" not in sys.modules:
@@ -105,14 +104,14 @@ class _Flow:
self.response = response or _Response() self.response = response or _Response()
def _log_request(addon: EgressAddon, flow: _Flow) -> dict[str, Any]: def _log_request(addon: EgressAddon, flow: _Flow) -> dict:
buf = StringIO() buf = StringIO()
with patch("sys.stderr", buf): with patch("sys.stderr", buf):
addon._log_request(flow) # type: ignore[arg-type] addon._log_request(flow) # type: ignore[arg-type]
return json.loads(buf.getvalue()) return json.loads(buf.getvalue())
def _log_response(addon: EgressAddon, flow: _Flow) -> dict[str, Any]: def _log_response(addon: EgressAddon, flow: _Flow) -> dict:
buf = StringIO() buf = StringIO()
with patch("sys.stderr", buf): with patch("sys.stderr", buf):
addon._log_response(flow) # type: ignore[arg-type] addon._log_response(flow) # type: ignore[arg-type]
-9
View File
@@ -54,15 +54,6 @@ class TestValidateRoutesContent(unittest.TestCase):
' auth_scheme: "Bearer"\n' ' auth_scheme: "Bearer"\n'
) )
def test_rejects_log_full(self):
with self.assertRaises(EgressApplyError) as cm:
applicator.validate_routes_content(
'log: 2\n'
'routes:\n'
' - host: "x.example"\n'
)
self.assertIn("must not change egress logging", str(cm.exception))
class TestApplyRoutesChange(unittest.TestCase): class TestApplyRoutesChange(unittest.TestCase):
def setUp(self): def setUp(self):
+1 -24
View File
@@ -30,7 +30,6 @@ def _plan(
supervise: bool = False, supervise: bool = False,
agent_git_gate_url: str = "", agent_git_gate_url: str = "",
agent_supervise_url: str = "", agent_supervise_url: str = "",
canary: bool = False,
) -> MacosContainerBottlePlan: ) -> MacosContainerBottlePlan:
routes_path = stage_dir / "routes.yaml" routes_path = stage_dir / "routes.yaml"
routes_path.write_text("routes: []\n", encoding="utf-8") routes_path.write_text("routes: []\n", encoding="utf-8")
@@ -43,8 +42,6 @@ def _plan(
routes_path=routes_path, routes_path=routes_path,
routes=("route",), routes=("route",),
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"}, token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
) )
if git: if git:
key_path = stage_dir / "origin-key" key_path = stage_dir / "origin-key"
@@ -141,26 +138,6 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
argv, argv,
) )
def test_sidecar_argv_registers_canary_env_as_sensitive(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._sidecar_run_argv(
plan,
"bot-bottle-sidecars-dev-abc",
"bot-bottle-net-dev-abc",
"bot-bottle-egress-dev-abc",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
self.assertIn("BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", argv)
def test_agent_argv_receives_canary_env(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._agent_run_argv(
plan,
"bot-bottle-net-dev-abc",
"192.0.2.10",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
def test_agent_env_points_proxy_at_sidecar_ip(self): def test_agent_env_points_proxy_at_sidecar_ip(self):
plan = _plan( plan = _plan(
stage_dir=self.stage_dir, stage_dir=self.stage_dir,
@@ -294,7 +271,7 @@ def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan:
manifest=_MANIFEST, manifest=_MANIFEST,
stage_dir=stage_dir, stage_dir=stage_dir,
git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())), git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())),
egress_plan=cast(EgressPlan, SimpleNamespace(canary="")), egress_plan=cast(EgressPlan, SimpleNamespace()),
supervise_plan=None, supervise_plan=None,
agent_provision=AgentProvisionPlan( agent_provision=AgentProvisionPlan(
template="claude", template="claude",
-27
View File
@@ -73,33 +73,6 @@ resolver #2
) )
self.assertTrue(run.call_args_list[-1].kwargs["check"]) self.assertTrue(run.call_args_list[-1].kwargs["check"])
def test_build_image_anchors_relative_dockerfile_to_context(self):
status = util.subprocess.CompletedProcess(
args=[],
returncode=0,
stdout=(
'[{"status":{"state":"running"},'
'"configuration":{"dns":{"nameservers":["9.9.9.9"]}}}]'
),
stderr="",
)
with patch.object(util.subprocess, "run", return_value=status) as run, \
patch.object(util.os, "environ", {
"BOT_BOTTLE_MACOS_CONTAINER_DNS": "9.9.9.9",
}):
util.build_image(
"bot-bottle-sidecars:latest",
"/repo",
dockerfile="Dockerfile.sidecars",
)
self.assertEqual(
[
"container", "build", "-t", "bot-bottle-sidecars:latest",
"--dns", "9.9.9.9", "-f", "/repo/Dockerfile.sidecars", "/repo",
],
run.call_args_list[-1].args[0],
)
def test_commit_container_execs_tar_and_builds_image(self): def test_commit_container_execs_tar_and_builds_image(self):
# stderr is bytes because subprocess.run uses stderr=PIPE without text=True # stderr is bytes because subprocess.run uses stderr=PIPE without text=True
completed = util.subprocess.CompletedProcess( completed = util.subprocess.CompletedProcess(
+1 -28
View File
@@ -167,40 +167,13 @@ class TestAgentProviderHostCredentials(unittest.TestCase):
}, },
}) })
def test_startup_args_allowed_for_claude(self): def test_settings_rejected_for_claude(self):
b = _provider_config_bottle({
"template": "claude",
"settings": {"startup_args": ["--model", "opus"]},
})
self.assertEqual(
{"startup_args": ["--model", "opus"]},
b.agent_provider.settings,
)
def test_startup_args_allowed_for_codex(self):
b = _provider_config_bottle({
"template": "codex",
"settings": {"startup_args": ["--model", "gpt-5-codex"]},
})
self.assertEqual(
{"startup_args": ["--model", "gpt-5-codex"]},
b.agent_provider.settings,
)
def test_provider_specific_settings_still_rejected_for_claude(self):
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
_provider_config_bottle({ _provider_config_bottle({
"template": "claude", "template": "claude",
"settings": {"models": ["qwen2.5-coder:7b"]}, "settings": {"models": ["qwen2.5-coder:7b"]},
}) })
def test_startup_args_must_be_string_array(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "codex",
"settings": {"startup_args": ["--model", 42]},
})
def test_settings_models_must_be_non_empty_string_array(self): def test_settings_models_must_be_non_empty_string_array(self):
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
_provider_config_bottle({ _provider_config_bottle({
+1 -1
View File
@@ -130,7 +130,7 @@ def _capture_print(plan: DockerBottlePlan | SmolmachinesBottlePlan) -> list[str]
orig = sys.stderr orig = sys.stderr
sys.stderr = buf sys.stderr = buf
try: try:
plan.print() plan.print(remote_control=False)
finally: finally:
sys.stderr = orig sys.stderr = orig
return buf.getvalue().splitlines() return buf.getvalue().splitlines()
+4 -29
View File
@@ -26,7 +26,9 @@ from bot_bottle.backend.smolmachines.bottle import SmolmachinesBottle
from bot_bottle.backend.smolmachines.bottle_plan import ( from bot_bottle.backend.smolmachines.bottle_plan import (
SmolmachinesBottlePlan, SmolmachinesBottlePlan,
) )
from bot_bottle.backend.smolmachines import launch as _launch # from bot_bottle.backend.smolmachines.provision import (
# workspace as _workspace,
# )
from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec
from bot_bottle.backend.util import AGENT_CA_PATH from bot_bottle.backend.util import AGENT_CA_PATH
from bot_bottle.egress import EgressPlan, EgressRoute from bot_bottle.egress import EgressPlan, EgressRoute
@@ -42,6 +44,7 @@ class _Provider(AgentProvider):
return AgentProviderRuntime( return AgentProviderRuntime(
template="test", command="test", image="", template="test", command="test", image="",
prompt_mode="append_file", bypass_args=(), resume_args=(), prompt_mode="append_file", bypass_args=(), resume_args=(),
remote_control_args=(),
) )
def provision_plan(self, **kwargs): # type: ignore[override] def provision_plan(self, **kwargs): # type: ignore[override]
raise NotImplementedError raise NotImplementedError
@@ -83,7 +86,6 @@ def _plan(
stage_dir: Path | None = None, stage_dir: Path | None = None,
egress_routes: tuple[EgressRoute, ...] = (), egress_routes: tuple[EgressRoute, ...] = (),
egress_ca_path: Path = Path(), egress_ca_path: Path = Path(),
canary: bool = False,
supervise: bool = False, supervise: bool = False,
bundle_ip: str = "192.168.50.2", bundle_ip: str = "192.168.50.2",
agent_git_gate_host: str = "127.0.0.1:55555", agent_git_gate_host: str = "127.0.0.1:55555",
@@ -154,8 +156,6 @@ def _plan(
routes=egress_routes, routes=egress_routes,
token_env_map={}, token_env_map={},
mitmproxy_ca_cert_only_host_path=egress_ca_path, mitmproxy_ca_cert_only_host_path=egress_ca_path,
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
), ),
supervise_plan=supervise_plan, supervise_plan=supervise_plan,
agent_git_gate_host=agent_git_gate_host, agent_git_gate_host=agent_git_gate_host,
@@ -411,31 +411,6 @@ class TestBundleLaunchSpec(unittest.TestCase):
self.assertIn(9420, spec.ports_to_publish) self.assertIn(9420, spec.ports_to_publish)
self.assertNotIn(9418, spec.ports_to_publish) self.assertNotIn(9418, spec.ports_to_publish)
def test_canary_env_registered_as_sensitive_in_bundle(self):
plan = _plan(canary=True)
spec = _bundle_launch_spec(plan, "net", "127.0.0.16")
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", spec.environment)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
spec.environment,
)
def test_canary_env_visible_to_smolvm_guest(self):
plan = _plan(canary=True)
with patch.object(
_launch._bundle,
"bundle_host_port",
return_value="65000",
):
stamped = _launch._discover_urls(plan, "127.0.0.16")
self.assertEqual(
"fake-canary-value",
stamped.guest_env["CANON_ALPHA_SECRET"],
)
class TestProvisionGitUser(unittest.TestCase): class TestProvisionGitUser(unittest.TestCase):
"""`provision_git` runs `git config --global` inside the """`provision_git` runs `git config --global` inside the
+21 -175
View File
@@ -20,7 +20,6 @@ import supervise as _sv # noqa: E402 # type: ignore
from bot_bottle import supervise_server # noqa: E402 from bot_bottle import supervise_server # noqa: E402
from bot_bottle.supervise_server import ( from bot_bottle.supervise_server import (
ERR_INTERNAL,
ERR_INVALID_PARAMS, ERR_INVALID_PARAMS,
ERR_INVALID_REQUEST, ERR_INVALID_REQUEST,
ERR_METHOD_NOT_FOUND, ERR_METHOD_NOT_FOUND,
@@ -30,9 +29,7 @@ from bot_bottle.supervise_server import (
PROPOSED_FILE_FIELD, PROPOSED_FILE_FIELD,
ServerConfig, ServerConfig,
TOOL_DEFINITIONS, TOOL_DEFINITIONS,
_RpcClientError,
_RpcError, _RpcError,
_RpcInternalError,
_response_timeout_from_env, _response_timeout_from_env,
format_response_text, format_response_text,
handle_initialize, handle_initialize,
@@ -50,15 +47,15 @@ from bot_bottle.supervise_server import (
class TestValidation(unittest.TestCase): class TestValidation(unittest.TestCase):
def test_capability_block_accepts_anything_nonempty(self):
validate_proposed_file(
_sv.TOOL_CAPABILITY_BLOCK,
"FROM python:3.13\nRUN apk add git\n",
)
def test_empty_proposed_file_rejected_for_tools_with_file_field(self): def test_empty_proposed_file_rejected_for_tools_with_file_field(self):
with self.assertRaises(_RpcError): with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_EGRESS_ALLOW, " \n\t") validate_proposed_file(_sv.TOOL_CAPABILITY_BLOCK, " \n\t")
def test_capability_block_rejected_as_unknown_tool(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file("capability-block", "FROM python:3.13\n")
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("unknown tool", cm.exception.message)
def test_egress_routes_yaml_is_validated(self): def test_egress_routes_yaml_is_validated(self):
validate_proposed_file( validate_proposed_file(
@@ -70,74 +67,6 @@ class TestValidation(unittest.TestCase):
with self.assertRaises(_RpcError): with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_EGRESS_BLOCK, "routes: nope\n") validate_proposed_file(_sv.TOOL_EGRESS_BLOCK, "routes: nope\n")
def test_egress_routes_yaml_rejects_log_full(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file(
_sv.TOOL_EGRESS_ALLOW,
"log: 2\nroutes:\n - host: example.com\n",
)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("must not change egress logging", cm.exception.message)
# --- Error taxonomy --------------------------------------------------------
class TestRpcErrorTaxonomy(unittest.TestCase):
def test_rpc_client_error_is_rpc_error(self):
e = _RpcClientError(ERR_INVALID_PARAMS, "bad param")
self.assertIsInstance(e, _RpcError)
self.assertEqual(ERR_INVALID_PARAMS, e.code)
self.assertEqual("bad param", e.message)
def test_rpc_internal_error_is_rpc_error(self):
e = _RpcInternalError("disk full")
self.assertIsInstance(e, _RpcError)
self.assertEqual(ERR_INTERNAL, e.code)
self.assertEqual("disk full", e.message)
def test_rpc_internal_error_preserves_cause(self):
cause = OSError("no space left on device")
try:
raise _RpcInternalError("failed to write") from cause
except _RpcInternalError as e:
self.assertIs(cause, e.__cause__)
def test_parse_error_is_client_error(self):
with self.assertRaises(_RpcClientError):
parse_jsonrpc(b"{bad json")
def test_validation_error_is_client_error(self):
with self.assertRaises(_RpcClientError):
validate_proposed_file(_sv.TOOL_EGRESS_ALLOW, "routes: nope\n")
def test_unknown_tool_in_tools_call_is_client_error(self):
config = ServerConfig(bottle_slug="dev", queue_dir=Path("/unused"))
with self.assertRaises(_RpcClientError) as cm:
handle_tools_call({"name": "no-such-tool", "arguments": {}}, config)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
class TestRpcInternalErrorOnIoFailure(unittest.TestCase):
def test_write_proposal_os_error_raises_internal(self):
config = ServerConfig(
bottle_slug="dev",
queue_dir=Path("/dev/null/cannot-exist"),
)
with self.assertRaises(_RpcInternalError) as cm:
handle_tools_call(
{
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "x",
},
},
config,
)
self.assertEqual(ERR_INTERNAL, cm.exception.code)
self.assertIsNotNone(cm.exception.__cause__)
# --- JSON-RPC parsing ------------------------------------------------------ # --- JSON-RPC parsing ------------------------------------------------------
@@ -219,6 +148,7 @@ class TestHandleToolsList(unittest.TestCase):
self.assertEqual( self.assertEqual(
sorted([ sorted([
_sv.TOOL_EGRESS_ALLOW, _sv.TOOL_EGRESS_ALLOW,
_sv.TOOL_CAPABILITY_BLOCK,
_sv.TOOL_EGRESS_BLOCK, _sv.TOOL_EGRESS_BLOCK,
_sv.TOOL_LIST_EGRESS_ROUTES, _sv.TOOL_LIST_EGRESS_ROUTES,
]), ]),
@@ -294,10 +224,10 @@ class TestHandleToolsCall(unittest.TestCase):
try: try:
result = handle_tools_call( result = handle_tools_call(
{ {
"name": _sv.TOOL_EGRESS_BLOCK, "name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": { "arguments": {
"routes_yaml": "routes:\n - host: example.com\n", "dockerfile": "FROM python:3.13\n",
"justification": "need example.com", "justification": "need git",
}, },
}, },
self.config, self.config,
@@ -334,9 +264,9 @@ class TestHandleToolsCall(unittest.TestCase):
try: try:
result = handle_tools_call( result = handle_tools_call(
{ {
"name": _sv.TOOL_EGRESS_ALLOW, "name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": { "arguments": {
"routes_yaml": "routes:\n - host: example.com\n", "dockerfile": "FROM python:3.13\n",
"justification": "needed for tests", "justification": "needed for tests",
}, },
}, },
@@ -358,52 +288,20 @@ class TestHandleToolsCall(unittest.TestCase):
with self.assertRaises(_RpcError): with self.assertRaises(_RpcError):
handle_tools_call( handle_tools_call(
{ {
"name": _sv.TOOL_EGRESS_ALLOW, "name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": {"routes_yaml": "routes:\n - host: example.com\n"}, "arguments": {"dockerfile": "FROM python:3.13\n"},
}, },
self.config, self.config,
) )
def test_missing_name_raises(self):
with self.assertRaises(_RpcError) as cm:
handle_tools_call({"arguments": {}}, self.config)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
def test_arguments_must_be_object(self):
with self.assertRaises(_RpcError) as cm:
handle_tools_call(
{
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": [],
},
self.config,
)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("must be an object", cm.exception.message)
def test_capability_block_call_raises_unknown_tool(self):
with self.assertRaises(_RpcError) as cm:
handle_tools_call(
{
"name": "capability-block",
"arguments": {
"dockerfile": "FROM python:3.13\n",
"justification": "need git",
},
},
self.config,
)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("unknown tool", cm.exception.message)
def test_archives_proposal_after_response(self): def test_archives_proposal_after_response(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_APPROVED) responder = self._respond_when_proposal_appears(_sv.STATUS_APPROVED)
try: try:
handle_tools_call( handle_tools_call(
{ {
"name": _sv.TOOL_EGRESS_ALLOW, "name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": { "arguments": {
"routes_yaml": "routes:\n - host: example.com\n", "dockerfile": "FROM python:3.13\n",
"justification": "x", "justification": "x",
}, },
}, },
@@ -425,10 +323,10 @@ class TestHandleToolsCall(unittest.TestCase):
) )
result = handle_tools_call( result = handle_tools_call(
{ {
"name": _sv.TOOL_EGRESS_ALLOW, "name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": { "arguments": {
"routes_yaml": "routes:\n - host: example.com\n", "dockerfile": "FROM python:3.13\n",
"justification": "need egress", "justification": "need a capability",
}, },
}, },
config, config,
@@ -443,31 +341,6 @@ class TestHandleToolsCall(unittest.TestCase):
class TestHandleListEgressRoutes(unittest.TestCase): class TestHandleListEgressRoutes(unittest.TestCase):
def test_success_returns_body_text(self):
class _Resp:
def __enter__(self):
return self
def __exit__(self, exc_type: type[BaseException] | None, exc: BaseException | None, tb: object) -> bool:
return False
def read(self):
return b"[{\"host\": \"example.com\"}]"
class _Opener:
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore
return _Resp()
with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()):
result = handle_list_egress_routes(
{},
ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")),
)
self.assertFalse(result["isError"]) # type: ignore[index]
text = result["content"][0]["text"] # type: ignore[index]
self.assertIn("example.com", text)
def test_url_error_returns_tool_error(self): def test_url_error_returns_tool_error(self):
class _Opener: class _Opener:
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore
@@ -527,13 +400,6 @@ class TestFormatResponseText(unittest.TestCase):
self.assertIn("the operator modified", text.lower()) self.assertIn("the operator modified", text.lower())
class TestFormatPendingResponseText(unittest.TestCase):
def test_formats_timeout_message(self):
text = supervise_server.format_pending_response_text(12.5)
self.assertIn("status: pending", text)
self.assertIn("12.5s", text)
# --- End-to-end HTTP sanity ------------------------------------------------ # --- End-to-end HTTP sanity ------------------------------------------------
@@ -584,7 +450,7 @@ class TestHttpEndToEnd(unittest.TestCase):
self.assertEqual("2.0", result["jsonrpc"]) self.assertEqual("2.0", result["jsonrpc"])
self.assertEqual(1, result["id"]) self.assertEqual(1, result["id"])
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index] names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
self.assertNotIn("capability-block", names) self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
self.assertIn(_sv.TOOL_EGRESS_ALLOW, names) self.assertIn(_sv.TOOL_EGRESS_ALLOW, names)
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names) self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
@@ -594,26 +460,6 @@ class TestHttpEndToEnd(unittest.TestCase):
) )
self.assertEqual(ERR_METHOD_NOT_FOUND, result["error"]["code"]) # type: ignore[index] self.assertEqual(ERR_METHOD_NOT_FOUND, result["error"]["code"]) # type: ignore[index]
def test_internal_error_returns_err_internal_over_http(self):
with patch.object(
supervise_server._sv, "write_proposal",
side_effect=OSError("disk full"),
):
result = self._post_jsonrpc({
"jsonrpc": "2.0",
"id": 99,
"method": "tools/call",
"params": {
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "x",
},
},
})
self.assertIn("error", result)
self.assertEqual(ERR_INTERNAL, result["error"]["code"]) # type: ignore[index]
def test_health_endpoint(self): def test_health_endpoint(self):
conn = http.client.HTTPConnection("127.0.0.1", self.port, timeout=5) conn = http.client.HTTPConnection("127.0.0.1", self.port, timeout=5)
try: try: