Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 217eadf9a1 | |||
| 3fe3829c8d | |||
| 51751c8d28 | |||
| 330e836085 | |||
| fa38012621 |
@@ -1,9 +0,0 @@
|
||||
[run]
|
||||
branch = True
|
||||
source = .
|
||||
|
||||
[report]
|
||||
omit =
|
||||
bot_bottle/egress_addon.py
|
||||
bot_bottle/cli/tui.py
|
||||
tests/*
|
||||
@@ -39,14 +39,8 @@ jobs:
|
||||
with:
|
||||
python-version: "3.12"
|
||||
|
||||
- name: Install dev requirements
|
||||
run: python3 -m pip install -r requirements-dev.txt
|
||||
|
||||
- name: Run unit tests
|
||||
run: python3 -m coverage run -m unittest discover -t . -s tests/unit -v
|
||||
|
||||
- name: Report unit coverage
|
||||
run: python3 -m coverage report -m
|
||||
run: python3 -m unittest discover -t . -s tests/unit -v
|
||||
|
||||
integration:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
@@ -22,4 +22,3 @@ venv/
|
||||
.pytest_cache/
|
||||
.mypy_cache/
|
||||
.ruff_cache/
|
||||
.coverage
|
||||
|
||||
@@ -61,6 +61,7 @@ class AgentProviderRuntime:
|
||||
prompt_mode: PromptMode
|
||||
bypass_args: tuple[str, ...]
|
||||
resume_args: tuple[str, ...]
|
||||
remote_control_args: tuple[str, ...]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@@ -370,15 +371,6 @@ def build_agent_provision_plan(
|
||||
)
|
||||
|
||||
|
||||
def provider_startup_args(
|
||||
provider_settings: dict[str, object] | None,
|
||||
) -> tuple[str, ...]:
|
||||
raw = (provider_settings or {}).get("startup_args", ())
|
||||
if not isinstance(raw, (list, tuple)):
|
||||
return ()
|
||||
return tuple(arg for arg in raw if isinstance(arg, str))
|
||||
|
||||
|
||||
def prompt_args(
|
||||
prompt_mode: PromptMode,
|
||||
prompt_path: str | None,
|
||||
@@ -390,7 +382,7 @@ def prompt_args(
|
||||
if prompt_mode == "append_file":
|
||||
return ["--append-system-prompt-file", prompt_path]
|
||||
if prompt_mode == "read_prompt_file":
|
||||
if argv and ("resume" in argv or "remote-control" in argv):
|
||||
if argv and "resume" in argv:
|
||||
return []
|
||||
return [f"Read and follow the instructions in {prompt_path}."]
|
||||
if prompt_mode == "print_read_prompt_file":
|
||||
|
||||
@@ -109,8 +109,9 @@ class BottlePlan(ABC):
|
||||
def workspace_plan(self) -> WorkspacePlan:
|
||||
return workspace_plan(self.spec, guest_home=self.guest_home)
|
||||
|
||||
def print(self) -> None:
|
||||
def print(self, *, remote_control: bool) -> None:
|
||||
"""Render the y/N preflight summary to stderr."""
|
||||
del remote_control
|
||||
spec = self.spec
|
||||
manifest = self.manifest
|
||||
agent = manifest.agent
|
||||
|
||||
@@ -28,8 +28,6 @@ from typing import Any
|
||||
from ...egress import (
|
||||
EGRESS_HOSTNAME,
|
||||
EGRESS_ROUTES_IN_CONTAINER,
|
||||
egress_agent_env_entries,
|
||||
egress_sidecar_env_entries,
|
||||
)
|
||||
from ...git_gate import GIT_GATE_HOSTNAME
|
||||
from ...log import die, warn
|
||||
@@ -137,7 +135,12 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
|
||||
volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER))
|
||||
if ep.routes:
|
||||
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
|
||||
env.extend(egress_sidecar_env_entries(ep))
|
||||
for token_env in sorted(ep.token_env_map.keys()):
|
||||
env.append(token_env)
|
||||
if ep.canary:
|
||||
# Inject canary as a literal NAME=VALUE (not a bare name) — the
|
||||
# value is a fake secret so it need not be hidden from the compose file.
|
||||
env.append(f"EGRESS_TOKEN_CANARY={ep.canary}")
|
||||
|
||||
# --- git-gate -----------------------------------------------------
|
||||
gp = plan.git_gate_plan
|
||||
@@ -221,7 +224,10 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
|
||||
# never lands on argv or in the compose file.
|
||||
for name in sorted(plan.forwarded_env.keys()):
|
||||
env.append(name)
|
||||
env.extend(egress_agent_env_entries(plan.egress_plan))
|
||||
# Canary token: visible to the agent as a fake secret so that any
|
||||
# outbound appearance of this value is a zero-FP exfil signal.
|
||||
if plan.egress_plan.canary:
|
||||
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
|
||||
|
||||
service: dict[str, Any] = {
|
||||
"image": plan.image,
|
||||
|
||||
@@ -11,7 +11,7 @@ from pathlib import Path
|
||||
|
||||
from ..bottle_state import egress_state_dir
|
||||
from ..egress import EGRESS_ROUTES_FILENAME
|
||||
from ..egress_addon_core import LOG_OFF, load_config
|
||||
from ..egress_addon_core import load_routes
|
||||
|
||||
|
||||
class EgressApplyError(RuntimeError):
|
||||
@@ -33,15 +33,11 @@ class EgressApplicator(ABC):
|
||||
@staticmethod
|
||||
def validate_routes_content(content: str) -> None:
|
||||
try:
|
||||
config = load_config(content)
|
||||
load_routes(content)
|
||||
except ValueError as e:
|
||||
raise EgressApplyError(
|
||||
f"proposed routes.yaml is not valid: {e}"
|
||||
) from e
|
||||
if config.log != LOG_OFF:
|
||||
raise EgressApplyError(
|
||||
"proposed routes.yaml must not change egress logging"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _routes_path(slug: str) -> Path:
|
||||
|
||||
@@ -22,12 +22,7 @@ from ...bottle_state import (
|
||||
git_gate_state_dir,
|
||||
read_committed_image,
|
||||
)
|
||||
from ...egress import (
|
||||
EGRESS_ROUTES_IN_CONTAINER,
|
||||
egress_agent_env_entries,
|
||||
egress_resolve_token_values,
|
||||
egress_sidecar_env_entries,
|
||||
)
|
||||
from ...egress import EGRESS_ROUTES_IN_CONTAINER, egress_resolve_token_values
|
||||
from ...git_gate import revoke_git_gate_provisioned_keys
|
||||
from ...log import die, info, warn
|
||||
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
|
||||
@@ -355,7 +350,11 @@ def _sidecar_daemons(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
|
||||
|
||||
|
||||
def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
|
||||
env: list[str] = list(egress_sidecar_env_entries(plan.egress_plan))
|
||||
env: list[str] = []
|
||||
if plan.egress_plan.routes:
|
||||
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
|
||||
if plan.egress_plan.canary:
|
||||
env.append(f"EGRESS_TOKEN_CANARY={plan.egress_plan.canary}")
|
||||
if plan.git_gate_plan.upstreams:
|
||||
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
|
||||
if plan.supervise_plan is not None:
|
||||
@@ -423,7 +422,8 @@ def _agent_env_entries(
|
||||
env.append(f"{name}={value}")
|
||||
for name in sorted(plan.forwarded_env.keys()):
|
||||
env.append(name)
|
||||
env.extend(egress_agent_env_entries(plan.egress_plan))
|
||||
if plan.egress_plan.canary:
|
||||
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
|
||||
return tuple(env)
|
||||
|
||||
|
||||
|
||||
@@ -68,11 +68,6 @@ def build_image(ref: str, context: str, *, dockerfile: str = "") -> None:
|
||||
_ensure_builder_dns()
|
||||
args = [_CONTAINER, "build", "-t", ref, "--dns", dns_server()]
|
||||
if dockerfile:
|
||||
# `container build` resolves -f relative to the current working
|
||||
# directory, not the build context. Anchor a relative Dockerfile to
|
||||
# the context so builds work from any cwd.
|
||||
if not os.path.isabs(dockerfile):
|
||||
dockerfile = os.path.join(context, dockerfile)
|
||||
args.extend(["-f", dockerfile])
|
||||
args.append(context)
|
||||
subprocess.run(args, check=True)
|
||||
|
||||
@@ -23,9 +23,7 @@ from typing import Callable, Generator
|
||||
|
||||
from ...egress import (
|
||||
EGRESS_ROUTES_IN_CONTAINER,
|
||||
egress_agent_env_entries,
|
||||
egress_resolve_token_values,
|
||||
egress_sidecar_env_entries,
|
||||
)
|
||||
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
|
||||
from ...util import expand_tilde
|
||||
@@ -230,9 +228,6 @@ def _discover_urls(
|
||||
guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}"
|
||||
if agent_supervise_url:
|
||||
guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url
|
||||
for entry in egress_agent_env_entries(plan.egress_plan):
|
||||
name, value = entry.split("=", 1)
|
||||
guest_env[name] = value
|
||||
|
||||
return dataclasses.replace(
|
||||
plan,
|
||||
@@ -321,7 +316,11 @@ def _bundle_launch_spec(
|
||||
volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True))
|
||||
if ep.routes:
|
||||
volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True))
|
||||
env.extend(egress_sidecar_env_entries(ep))
|
||||
# Bare-name entries for upstream-token slots. Their values
|
||||
# come from the docker-run subprocess env (inherited from
|
||||
# the operator's shell), never landing on argv.
|
||||
for token_env in sorted(ep.token_env_map.keys()):
|
||||
env.append(token_env)
|
||||
|
||||
# --- git-gate ---------------------------------------------
|
||||
gp = plan.git_gate_plan
|
||||
|
||||
@@ -28,6 +28,7 @@ from .start import _launch_bottle
|
||||
def cmd_resume(argv: list[str]) -> int:
|
||||
parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True)
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
parser.add_argument("--remote-control", action="store_true")
|
||||
parser.add_argument(
|
||||
"identity",
|
||||
help="bottle identity from a prior `start` (see its session-end output)",
|
||||
@@ -55,5 +56,6 @@ def cmd_resume(argv: list[str]) -> int:
|
||||
return _launch_bottle(
|
||||
spec,
|
||||
dry_run=args.dry_run,
|
||||
remote_control=args.remote_control,
|
||||
backend_name=backend_name,
|
||||
)
|
||||
|
||||
+10
-4
@@ -42,6 +42,7 @@ def cmd_start(argv: list[str]) -> int:
|
||||
parser = argparse.ArgumentParser(prog=f"{PROG} start", add_help=True)
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
parser.add_argument("--cwd", action="store_true", help="copy host cwd into the running bottle")
|
||||
parser.add_argument("--remote-control", action="store_true")
|
||||
parser.add_argument(
|
||||
"--backend",
|
||||
choices=known_backend_names(),
|
||||
@@ -88,6 +89,7 @@ def cmd_start(argv: list[str]) -> int:
|
||||
return _launch_bottle(
|
||||
spec,
|
||||
dry_run=dry_run,
|
||||
remote_control=args.remote_control,
|
||||
backend_name=backend_name,
|
||||
)
|
||||
|
||||
@@ -132,7 +134,7 @@ def prepare_with_preflight(
|
||||
|
||||
|
||||
def attach_agent(
|
||||
bottle: Bottle, *, resume: bool = False,
|
||||
bottle: Bottle, *, remote_control: bool = False, resume: bool = False,
|
||||
agent_provider_template: str = "claude",
|
||||
startup_args: tuple[str, ...] = (),
|
||||
) -> int:
|
||||
@@ -151,6 +153,8 @@ def attach_agent(
|
||||
"(Ctrl-D or 'exit' to leave; container will be removed)"
|
||||
)
|
||||
agent_args = list(runtime.bypass_args)
|
||||
if remote_control:
|
||||
agent_args.extend(runtime.remote_control_args)
|
||||
agent_args.extend(startup_args)
|
||||
if resume:
|
||||
agent_args.extend(runtime.resume_args)
|
||||
@@ -214,9 +218,9 @@ def _text_prompt_yes() -> bool:
|
||||
return reply in ("y", "Y", "yes", "YES")
|
||||
|
||||
|
||||
def _text_render_preflight():
|
||||
def _text_render_preflight(*, remote_control: bool):
|
||||
def _render(plan: DockerBottlePlan) -> None:
|
||||
plan.print()
|
||||
plan.print(remote_control=remote_control)
|
||||
return _render
|
||||
|
||||
|
||||
@@ -224,6 +228,7 @@ def _launch_bottle(
|
||||
spec: BottleSpec,
|
||||
*,
|
||||
dry_run: bool,
|
||||
remote_control: bool,
|
||||
backend_name: str | None = None,
|
||||
) -> int:
|
||||
"""Shared launch core for `start` and `resume`. Builds the plan,
|
||||
@@ -235,7 +240,7 @@ def _launch_bottle(
|
||||
plan, identity = prepare_with_preflight(
|
||||
spec,
|
||||
stage_dir=stage_dir,
|
||||
render_preflight=_text_render_preflight(),
|
||||
render_preflight=_text_render_preflight(remote_control=remote_control),
|
||||
prompt_yes=_text_prompt_yes,
|
||||
dry_run=dry_run,
|
||||
backend_name=backend_name,
|
||||
@@ -248,6 +253,7 @@ def _launch_bottle(
|
||||
agent_provider_template = getattr(plan, "agent_provider_template", "claude")
|
||||
exit_code = attach_agent(
|
||||
bottle,
|
||||
remote_control=remote_control,
|
||||
agent_provider_template=agent_provider_template,
|
||||
startup_args=plan.agent_provision.startup_args,
|
||||
)
|
||||
|
||||
@@ -2,8 +2,9 @@
|
||||
act on them (approve / modify / reject).
|
||||
|
||||
Curses-based TUI; modify-then-approve shells out to $EDITOR. The
|
||||
Egress proposals are queued for operator review as full routes.yaml
|
||||
updates.
|
||||
approval handler wires to PRD 0016 (capability-block), which rebuilds
|
||||
the bottle Dockerfile. Egress proposals are queued for operator review
|
||||
as full routes.yaml updates.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -21,6 +22,10 @@ from pathlib import Path
|
||||
|
||||
from .. import supervise as _supervise
|
||||
from ..bottle_state import read_metadata
|
||||
# from ..backend.docker.capability_apply import (
|
||||
# CapabilityApplyError,
|
||||
# apply_capability_change,
|
||||
# )
|
||||
from ..backend.docker.egress_apply import (
|
||||
EgressApplyError,
|
||||
applicator as _docker_applicator,
|
||||
@@ -33,6 +38,10 @@ from ..backend.smolmachines.egress_apply import (
|
||||
)
|
||||
from ..log import Die, error, info
|
||||
|
||||
|
||||
class CapabilityApplyError(RuntimeError):
|
||||
"""Placeholder while capability_apply is disabled."""
|
||||
|
||||
from ..supervise import (
|
||||
COMPONENT_FOR_TOOL,
|
||||
AuditEntry,
|
||||
@@ -41,10 +50,12 @@ from ..supervise import (
|
||||
STATUS_APPROVED,
|
||||
STATUS_MODIFIED,
|
||||
STATUS_REJECTED,
|
||||
TOOL_CAPABILITY_BLOCK,
|
||||
TOOL_EGRESS_ALLOW,
|
||||
TOOL_EGRESS_BLOCK,
|
||||
TOOL_GITLEAKS_ALLOW,
|
||||
TOOL_EGRESS_TOKEN_ALLOW,
|
||||
archive_proposal,
|
||||
list_pending_proposals,
|
||||
render_diff,
|
||||
write_audit_entry,
|
||||
@@ -72,7 +83,7 @@ class QueuedProposal:
|
||||
# Errors any remediation engine may raise. Caught by the TUI key
|
||||
# handlers and surfaced in the status line so a failed apply keeps
|
||||
# the proposal pending rather than crashing curses.
|
||||
ApplyError = (EgressApplyError,)
|
||||
ApplyError = (CapabilityApplyError, EgressApplyError)
|
||||
|
||||
|
||||
def apply_routes_change(slug: str, content: str) -> tuple[str, str]:
|
||||
@@ -132,6 +143,8 @@ def _detail_lines(
|
||||
|
||||
|
||||
def _suffix_for_tool(tool: str) -> str:
|
||||
if tool == TOOL_CAPABILITY_BLOCK:
|
||||
return ".dockerfile"
|
||||
if tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
|
||||
return ".yaml"
|
||||
if tool in (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW):
|
||||
@@ -153,6 +166,17 @@ def approve(
|
||||
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
|
||||
|
||||
diff_before, diff_after = "", ""
|
||||
# if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
|
||||
# _meta = read_metadata(qp.proposal.bottle_slug)
|
||||
# if _meta is not None and not _meta.compose_project:
|
||||
# raise CapabilityApplyError(
|
||||
# "capability-block remediation is not supported for smolmachines "
|
||||
# "bottles. Reject this proposal or handle the capability change "
|
||||
# "manually, then restart the bottle."
|
||||
# )
|
||||
# diff_before, diff_after = apply_capability_change(
|
||||
# qp.proposal.bottle_slug, file_to_apply,
|
||||
# )
|
||||
if qp.proposal.tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
|
||||
diff_before, diff_after = apply_routes_change(
|
||||
qp.proposal.bottle_slug,
|
||||
@@ -170,6 +194,9 @@ def approve(
|
||||
qp, action=status, notes=notes,
|
||||
diff_before=diff_before, diff_after=diff_after,
|
||||
)
|
||||
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
|
||||
archive_proposal(qp.queue_dir, qp.proposal.id)
|
||||
|
||||
|
||||
def reject(qp: QueuedProposal, *, reason: str) -> None:
|
||||
"""Write a rejection response and an audit entry."""
|
||||
@@ -319,7 +346,7 @@ def _list_once() -> int:
|
||||
return 0
|
||||
|
||||
|
||||
def _try_init_green() -> int: # pragma: no cover
|
||||
def _try_init_green() -> int:
|
||||
"""Initialise a green color pair and return its attr, or 0."""
|
||||
try:
|
||||
curses.start_color()
|
||||
@@ -330,7 +357,7 @@ def _try_init_green() -> int: # pragma: no cover
|
||||
return 0
|
||||
|
||||
|
||||
def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore # pragma: no cover
|
||||
def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
|
||||
curses.curs_set(0)
|
||||
stdscr.timeout(_REFRESH_INTERVAL_MS)
|
||||
green_attr = _try_init_green()
|
||||
@@ -420,7 +447,7 @@ def _render(
|
||||
status_line: str,
|
||||
*,
|
||||
green_attr: int = 0, # noqa: F841 — unused, but required by interface
|
||||
) -> None: # pragma: no cover
|
||||
) -> None:
|
||||
stdscr.erase()
|
||||
h, w = stdscr.getmaxyx()
|
||||
header = f"bot-bottle supervise ({len(pending)} pending)"
|
||||
@@ -471,7 +498,7 @@ def _detail_view(
|
||||
qp: QueuedProposal,
|
||||
*,
|
||||
green_attr: int = 0,
|
||||
) -> None: # pragma: no cover
|
||||
) -> None:
|
||||
"""Render the full proposal. Scrollable. Press q to return."""
|
||||
lines = _detail_lines(qp, green_attr=green_attr)
|
||||
offset = 0
|
||||
@@ -523,7 +550,7 @@ def _detail_view(
|
||||
return
|
||||
|
||||
|
||||
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore # pragma: no cover
|
||||
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore
|
||||
"""Suspend curses, open $EDITOR on the proposed file, return edited content."""
|
||||
suffix = _suffix_for_tool(qp.proposal.tool)
|
||||
curses.endwin()
|
||||
@@ -534,7 +561,7 @@ def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
|
||||
return edited
|
||||
|
||||
|
||||
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore # pragma: no cover
|
||||
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore
|
||||
"""One-line input at the bottom of the screen."""
|
||||
curses.curs_set(1)
|
||||
h, _ = stdscr.getmaxyx()
|
||||
|
||||
@@ -20,7 +20,6 @@ from ...agent_provider import (
|
||||
AgentProvisionDir,
|
||||
AgentProvisionFile,
|
||||
AgentProvisionPlan,
|
||||
provider_startup_args,
|
||||
)
|
||||
from ...backend.docker import util as docker_mod
|
||||
from ...egress import EgressRoute
|
||||
@@ -91,6 +90,7 @@ _RUNTIME = AgentProviderRuntime(
|
||||
prompt_mode="append_file",
|
||||
bypass_args=("--dangerously-skip-permissions",),
|
||||
resume_args=("--continue",),
|
||||
remote_control_args=("--remote-control",),
|
||||
)
|
||||
|
||||
|
||||
@@ -115,9 +115,8 @@ class ClaudeAgentProvider(AgentProvider):
|
||||
color: str = "",
|
||||
provider_settings: dict[str, object] | None = None,
|
||||
) -> AgentProvisionPlan:
|
||||
del forward_host_credentials, host_env
|
||||
del forward_host_credentials, host_env, provider_settings
|
||||
resolved_guest_env = dict(guest_env or {})
|
||||
startup_args = provider_startup_args(provider_settings)
|
||||
guest_home = self.guest_home
|
||||
trusted_path = trusted_project_path or guest_home
|
||||
|
||||
@@ -200,7 +199,6 @@ class ClaudeAgentProvider(AgentProvider):
|
||||
env_vars=env_vars,
|
||||
guest_env=resolved_guest_env,
|
||||
has_prompt=has_prompt,
|
||||
startup_args=startup_args,
|
||||
dirs=dirs,
|
||||
files=tuple(files),
|
||||
egress_routes=egress_routes,
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
# bot-bottle Codex provider image.
|
||||
#
|
||||
# Mirrors the default Claude image shape: Node LTS, git/network tooling,
|
||||
# non-root node user, and the provider CLI installed for that user.
|
||||
# non-root node user, and the provider CLI installed globally.
|
||||
|
||||
FROM node:22-slim
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y --no-install-recommends git ca-certificates curl procps \
|
||||
&& apt-get install -y --no-install-recommends git ca-certificates curl \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# App-specific deps. Python isn't required by codex itself
|
||||
@@ -17,15 +17,12 @@ RUN apt-get update \
|
||||
&& apt-get install -y --no-install-recommends python3 python3-pip python3-venv \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
RUN npm install -g --no-fund --no-audit @openai/codex@0.136.0 \
|
||||
&& npm cache clean --force
|
||||
|
||||
USER node
|
||||
WORKDIR /home/node
|
||||
|
||||
ENV PATH="/home/node/.local/bin:${PATH}"
|
||||
|
||||
# Remote-control support requires the standalone Codex install layout
|
||||
# under ~/.codex/packages/standalone/current. The npm package can run
|
||||
# the TUI, but remote-control commands expect this installer-owned path.
|
||||
RUN mkdir -p /home/node/.codex \
|
||||
&& curl -fsSL https://chatgpt.com/codex/install.sh | sh
|
||||
RUN mkdir -p /home/node/.codex
|
||||
|
||||
CMD ["codex"]
|
||||
|
||||
@@ -22,7 +22,6 @@ from ...agent_provider import (
|
||||
AgentProvisionCommand,
|
||||
AgentProvisionFile,
|
||||
AgentProvisionPlan,
|
||||
provider_startup_args,
|
||||
)
|
||||
from .codex_auth import codex_host_access_token, write_codex_dummy_auth_file
|
||||
from ...egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
|
||||
@@ -55,6 +54,7 @@ _RUNTIME = AgentProviderRuntime(
|
||||
prompt_mode="read_prompt_file",
|
||||
bypass_args=("--dangerously-bypass-approvals-and-sandbox",),
|
||||
resume_args=("resume", "--last"),
|
||||
remote_control_args=(),
|
||||
)
|
||||
|
||||
|
||||
@@ -79,9 +79,8 @@ class CodexAgentProvider(AgentProvider):
|
||||
color: str = "",
|
||||
provider_settings: dict[str, object] | None = None,
|
||||
) -> AgentProvisionPlan:
|
||||
del auth_token, label, color
|
||||
del auth_token, label, color, provider_settings
|
||||
resolved_guest_env = dict(guest_env or {})
|
||||
startup_args = provider_startup_args(provider_settings)
|
||||
guest_home = self.guest_home
|
||||
trusted_path = trusted_project_path or guest_home
|
||||
|
||||
@@ -164,7 +163,6 @@ class CodexAgentProvider(AgentProvider):
|
||||
env_vars=env_vars,
|
||||
guest_env=resolved_guest_env,
|
||||
has_prompt=has_prompt,
|
||||
startup_args=startup_args,
|
||||
dirs=tuple(dirs),
|
||||
files=tuple(files),
|
||||
pre_copy=tuple(pre_copy),
|
||||
|
||||
@@ -21,7 +21,6 @@ from ...agent_provider import (
|
||||
AgentProvisionDir,
|
||||
AgentProvisionFile,
|
||||
AgentProvisionPlan,
|
||||
provider_startup_args,
|
||||
)
|
||||
from ...egress import EgressRoute
|
||||
from ...log import die, info
|
||||
@@ -166,6 +165,7 @@ _RUNTIME = AgentProviderRuntime(
|
||||
prompt_mode="append_system_prompt",
|
||||
bypass_args=(),
|
||||
resume_args=(),
|
||||
remote_control_args=(),
|
||||
)
|
||||
|
||||
|
||||
@@ -199,7 +199,6 @@ class PiAgentProvider(AgentProvider):
|
||||
models_payload, base_url, api_key_env, models, provider_name = (
|
||||
_pi_models_json(settings)
|
||||
)
|
||||
extra_startup_args = provider_startup_args(provider_settings)
|
||||
models_file = state_dir / "pi-models.json"
|
||||
models_file.write_text(json.dumps(models_payload, indent=2) + "\n")
|
||||
models_file.chmod(0o600)
|
||||
@@ -220,7 +219,6 @@ class PiAgentProvider(AgentProvider):
|
||||
startup_args=(
|
||||
"--models",
|
||||
",".join(f"{provider_name}/{model}" for model in models),
|
||||
*extra_startup_args,
|
||||
),
|
||||
dirs=(AgentProvisionDir(f"{guest_home}/.pi/agent"),),
|
||||
files=(AgentProvisionFile(models_file, _models_path(guest_home)),),
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""DLP detectors for the egress proxy (PRD 0053).
|
||||
"""DLP detectors for the egress proxy (PRD 0053, prd-new).
|
||||
|
||||
Pure Python, no mitmproxy dependency. Each detector is a module-level
|
||||
function returning `ScanResult | None`.
|
||||
@@ -123,7 +123,7 @@ def redact_tokens(
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Known secrets detector
|
||||
# Known secrets detector (Phase 1b, prd-new)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _encoded_variants(secret: str) -> list[str]:
|
||||
@@ -165,7 +165,7 @@ def _encoded_variants(secret: str) -> list[str]:
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fragmentation-resistant helpers
|
||||
# Fragmentation-resistant helpers (prd-new)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Minimum length of alnum projection for projection-based checks to run.
|
||||
@@ -283,7 +283,7 @@ def scan_known_secrets(
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entropy detector (warn-only)
|
||||
# Entropy detector (warn-only, prd-new)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Sliding window size and step for the entropy scan.
|
||||
|
||||
+3
-51
@@ -35,50 +35,6 @@ EGRESS_HOSTNAME = "egress"
|
||||
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
|
||||
EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name
|
||||
|
||||
_CANARY_ENV_WORDS = (
|
||||
"ACCORD",
|
||||
"ANCHOR",
|
||||
"ATLAS",
|
||||
"CANON",
|
||||
"CIPHER",
|
||||
"EMBER",
|
||||
"FALCON",
|
||||
"HARBOR",
|
||||
"LANTERN",
|
||||
"MARBLE",
|
||||
"NOVA",
|
||||
"ORBIT",
|
||||
"PIVOT",
|
||||
"RADIUS",
|
||||
"SUMMIT",
|
||||
"VECTOR",
|
||||
)
|
||||
|
||||
|
||||
def _random_canary_env() -> str:
|
||||
first = secrets.choice(_CANARY_ENV_WORDS)
|
||||
remaining = tuple(word for word in _CANARY_ENV_WORDS if word != first)
|
||||
second = secrets.choice(remaining)
|
||||
return f"{first}_{second}_SECRET"
|
||||
|
||||
|
||||
def egress_sidecar_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
|
||||
"""Return sidecar env entries needed by egress across all backends."""
|
||||
env: list[str] = []
|
||||
if plan.routes:
|
||||
env.extend(sorted(plan.token_env_map.keys()))
|
||||
if plan.canary and plan.canary_env:
|
||||
env.append(f"{plan.canary_env}={plan.canary}")
|
||||
env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.canary_env}")
|
||||
return tuple(env)
|
||||
|
||||
|
||||
def egress_agent_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
|
||||
"""Return agent-visible egress env entries shared by all backends."""
|
||||
if plan.canary and plan.canary_env:
|
||||
return (f"{plan.canary_env}={plan.canary}",)
|
||||
return ()
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class EgressRoute(Route):
|
||||
@@ -111,7 +67,6 @@ class EgressPlan:
|
||||
mitmproxy_ca_cert_only_host_path: Path = Path()
|
||||
log: int = 0
|
||||
canary: str = ""
|
||||
canary_env: str = ""
|
||||
|
||||
|
||||
def egress_manifest_routes(
|
||||
@@ -371,9 +326,9 @@ class Egress(ABC):
|
||||
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
|
||||
routes_path.write_text(egress_render_routes(routes, log=log))
|
||||
routes_path.chmod(0o600)
|
||||
# Generate a per-session fake secret under a plausible random env name.
|
||||
# The sidecar marks that exact env name as sensitive for known-secret
|
||||
# scanning; the agent receives the same name/value as exfil bait.
|
||||
# Generate a per-session canary token. The sidecar receives it as
|
||||
# EGRESS_TOKEN_CANARY (scanned by the existing known-secrets detector);
|
||||
# the agent receives it as BOT_BOTTLE_CANARY (a visible fake secret).
|
||||
canary = secrets.token_urlsafe(32)
|
||||
return EgressPlan(
|
||||
slug=slug,
|
||||
@@ -382,7 +337,6 @@ class Egress(ABC):
|
||||
token_env_map=egress_token_env_map(routes),
|
||||
log=log,
|
||||
canary=canary,
|
||||
canary_env=_random_canary_env(),
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
@@ -397,7 +351,5 @@ __all__ = [
|
||||
"egress_render_routes",
|
||||
"egress_resolve_token_values",
|
||||
"egress_routes_for_bottle",
|
||||
"egress_agent_env_entries",
|
||||
"egress_sidecar_env_entries",
|
||||
"egress_token_env_map",
|
||||
]
|
||||
|
||||
@@ -160,37 +160,26 @@ class EgressAddon:
|
||||
)
|
||||
|
||||
def _log_request(self, flow: http.HTTPFlow) -> None:
|
||||
headers = {
|
||||
k: redact_tokens(v, env=os.environ)
|
||||
for k, v in flow.request.headers.items()
|
||||
if k.lower() != "authorization"
|
||||
}
|
||||
body = redact_tokens(flow.request.get_text(strict=False) or "", env=os.environ)
|
||||
sys.stderr.write(
|
||||
json.dumps({
|
||||
"event": "egress_request",
|
||||
"host": redact_tokens(flow.request.pretty_host, env=os.environ),
|
||||
"method": flow.request.method,
|
||||
"path": redact_tokens(flow.request.path, env=os.environ),
|
||||
"headers": headers,
|
||||
"body": body,
|
||||
"headers": dict(flow.request.headers),
|
||||
"body": flow.request.get_text(strict=False) or "",
|
||||
})
|
||||
+ "\n"
|
||||
)
|
||||
|
||||
def _log_response(self, flow: http.HTTPFlow) -> None:
|
||||
headers = {
|
||||
k: redact_tokens(v, env=os.environ)
|
||||
for k, v in flow.response.headers.items()
|
||||
}
|
||||
body = redact_tokens(flow.response.get_text(strict=False) or "", env=os.environ)
|
||||
sys.stderr.write(
|
||||
json.dumps({
|
||||
"event": "egress_response",
|
||||
"host": flow.request.pretty_host,
|
||||
"status": flow.response.status_code,
|
||||
"headers": headers,
|
||||
"body": body,
|
||||
"headers": dict(flow.response.headers),
|
||||
"body": flow.response.get_text(strict=False) or "",
|
||||
})
|
||||
+ "\n"
|
||||
)
|
||||
|
||||
@@ -439,6 +439,15 @@ def route_to_yaml_dict(r: Route) -> dict[str, object]:
|
||||
return d
|
||||
|
||||
|
||||
def load_routes(text: str) -> tuple[Route, ...]:
|
||||
"""Parse YAML text → routes."""
|
||||
try:
|
||||
payload = parse_yaml_subset(text)
|
||||
except YamlSubsetError as e:
|
||||
raise ValueError(f"routes payload: invalid YAML: {e}") from e
|
||||
return parse_routes(payload)
|
||||
|
||||
|
||||
def parse_config(payload: object) -> "Config":
|
||||
"""Parse a full egress config payload (top-level log level + routes)."""
|
||||
if not isinstance(payload, dict):
|
||||
@@ -853,6 +862,7 @@ __all__ = [
|
||||
"is_git_push_request",
|
||||
"is_git_fetch_request",
|
||||
"load_config",
|
||||
"load_routes",
|
||||
"match_route",
|
||||
"outbound_scan_headers",
|
||||
"parse_config",
|
||||
|
||||
@@ -199,10 +199,13 @@ def _parse_provider_settings(
|
||||
) -> dict[str, object]:
|
||||
if raw is None:
|
||||
return {}
|
||||
if template != "pi":
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' agent_provider.settings is only "
|
||||
"supported for template 'pi'"
|
||||
)
|
||||
settings = as_json_object(raw, f"bottle '{bottle_name}' agent_provider.settings")
|
||||
|
||||
common_allowed = {"startup_args"}
|
||||
pi_allowed = {
|
||||
allowed = {
|
||||
"provider",
|
||||
"base_url",
|
||||
"api",
|
||||
@@ -215,37 +218,12 @@ def _parse_provider_settings(
|
||||
"supports_developer_role",
|
||||
"supports_reasoning_effort",
|
||||
}
|
||||
if template == "pi":
|
||||
allowed = common_allowed | pi_allowed
|
||||
elif template in ("claude", "codex"):
|
||||
allowed = common_allowed
|
||||
elif template not in PROVIDER_TEMPLATES:
|
||||
return dict(settings)
|
||||
else:
|
||||
allowed = common_allowed
|
||||
|
||||
for key in settings:
|
||||
if key not in allowed:
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' agent_provider.settings has unknown "
|
||||
f"key {key!r}; allowed: {', '.join(sorted(allowed))}"
|
||||
)
|
||||
startup_args = settings.get("startup_args")
|
||||
if startup_args is not None:
|
||||
if not isinstance(startup_args, list):
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' agent_provider.settings.startup_args "
|
||||
f"must be an array of strings"
|
||||
)
|
||||
for i, arg in enumerate(startup_args):
|
||||
if not isinstance(arg, str) or not arg:
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' agent_provider.settings."
|
||||
f"startup_args[{i}] must be a non-empty string"
|
||||
)
|
||||
if template != "pi":
|
||||
return dict(settings)
|
||||
|
||||
for key in ("provider", "base_url", "api", "api_key", "api_key_env"):
|
||||
value = settings.get(key)
|
||||
if value is not None and (not isinstance(value, str) or not value):
|
||||
|
||||
@@ -47,11 +47,11 @@ from pathlib import Path
|
||||
try:
|
||||
# Same-directory imports inside the bundle container; these files are
|
||||
# COPYed flat under /app by Dockerfile.sidecars.
|
||||
from egress_addon_core import LOG_OFF, load_config
|
||||
from egress_addon_core import load_routes
|
||||
import supervise as _sv
|
||||
except ModuleNotFoundError:
|
||||
# Package imports for host-side tests and tooling.
|
||||
from .egress_addon_core import LOG_OFF, load_config
|
||||
from .egress_addon_core import load_routes
|
||||
from . import supervise as _sv
|
||||
|
||||
|
||||
@@ -297,17 +297,12 @@ def validate_proposed_file(tool: str, content: str) -> None:
|
||||
pass
|
||||
elif tool in (_sv.TOOL_EGRESS_ALLOW, _sv.TOOL_EGRESS_BLOCK):
|
||||
try:
|
||||
config = load_config(content)
|
||||
load_routes(content)
|
||||
except ValueError as e:
|
||||
raise _RpcError(
|
||||
ERR_INVALID_PARAMS,
|
||||
f"{tool}: proposed routes.yaml is not valid: {e}",
|
||||
) from e
|
||||
if config.log != LOG_OFF:
|
||||
raise _RpcError(
|
||||
ERR_INVALID_PARAMS,
|
||||
f"{tool}: proposed routes.yaml must not change egress logging",
|
||||
)
|
||||
else:
|
||||
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
|
||||
|
||||
|
||||
@@ -1,85 +0,0 @@
|
||||
# PRD 0064: LOG_FULL egress logging credential redaction
|
||||
|
||||
- **Status:** Active
|
||||
- **Author:** claude
|
||||
- **Created:** 2026-06-25
|
||||
- **Issue:** #257
|
||||
|
||||
## Summary
|
||||
|
||||
The `LOG_FULL` egress logging path (`_log_request` and `_log_response` in `egress_addon.py`) writes request/response headers and bodies to stderr without redaction and includes the sidecar-injected upstream `Authorization` header verbatim. This PR applies `redact_tokens` to header values and bodies in both log functions and strips the injected `Authorization` header from request logs entirely.
|
||||
|
||||
## Problem
|
||||
|
||||
`LOG_FULL` (log level 2) is intended for debugging egress traffic. When active it calls `_log_request` and `_log_response`. Both functions have two related bugs:
|
||||
|
||||
1. **Injected `Authorization` header exposure.** `_log_request` is called *after* the sidecar injects upstream credentials (`flow.request.headers["authorization"] = decision.inject_authorization`). The full header dict — including the live credential — is serialized to stderr. Any log collector that ingests the egress container's stderr will receive the upstream bearer token in plaintext.
|
||||
|
||||
2. **Unredacted bodies and header values.** Neither `_log_request` nor `_log_response` passes body or header values through `redact_tokens`. By contrast, `_req_ctx` (used for block/warn events) already calls `redact_tokens` on path and host. Any provisioned secret or recognized token pattern that appears in a request body, response body, or non-Authorization header value will be logged verbatim under `LOG_FULL`.
|
||||
|
||||
These two bugs compose: an agent that enables `LOG_FULL` and simultaneously triggers a request that carries a known token gains a write path from credentials → egress logs.
|
||||
|
||||
## Goals / Success Criteria
|
||||
|
||||
- `_log_request` never logs the `authorization` header in any form.
|
||||
- `_log_request` applies `redact_tokens(value, env=os.environ)` to every other header value before serializing.
|
||||
- `_log_request` applies `redact_tokens(body, env=os.environ)` to the request body before logging.
|
||||
- `_log_response` applies `redact_tokens(value, env=os.environ)` to every response header value before logging.
|
||||
- `_log_response` applies `redact_tokens(body, env=os.environ)` to the response body before logging.
|
||||
- Unit tests cover each of the five cases above.
|
||||
|
||||
## Non-goals
|
||||
|
||||
- Redacting host or path in the full-log path (already covered by `_req_ctx` for block/warn events; `_log_request` already calls `redact_tokens` on host and path).
|
||||
- Suppressing `LOG_FULL` or adding a new log level.
|
||||
- Changing the outbound DLP scan logic.
|
||||
|
||||
## Design
|
||||
|
||||
### `_log_request`
|
||||
|
||||
```python
|
||||
def _log_request(self, flow: http.HTTPFlow) -> None:
|
||||
headers = {
|
||||
k: redact_tokens(v, env=os.environ)
|
||||
for k, v in flow.request.headers.items()
|
||||
if k.lower() != "authorization"
|
||||
}
|
||||
body = redact_tokens(flow.request.get_text(strict=False) or "", env=os.environ)
|
||||
sys.stderr.write(
|
||||
json.dumps({
|
||||
"event": "egress_request",
|
||||
"host": redact_tokens(flow.request.pretty_host, env=os.environ),
|
||||
"method": flow.request.method,
|
||||
"path": redact_tokens(flow.request.path, env=os.environ),
|
||||
"headers": headers,
|
||||
"body": body,
|
||||
})
|
||||
+ "\n"
|
||||
)
|
||||
```
|
||||
|
||||
The `authorization` key is excluded because by the time `_log_request` is called the sidecar has already injected the upstream credential (`decision.inject_authorization`). Logging it would write a live bearer token to stderr on every allowed request. There is no safe subset to log — the value is always a live credential or empty.
|
||||
|
||||
### `_log_response`
|
||||
|
||||
```python
|
||||
def _log_response(self, flow: http.HTTPFlow) -> None:
|
||||
headers = {
|
||||
k: redact_tokens(v, env=os.environ)
|
||||
for k, v in flow.response.headers.items()
|
||||
}
|
||||
body = redact_tokens(flow.response.get_text(strict=False) or "", env=os.environ)
|
||||
sys.stderr.write(
|
||||
json.dumps({
|
||||
"event": "egress_response",
|
||||
"host": flow.request.pretty_host,
|
||||
"status": flow.response.status_code,
|
||||
"headers": headers,
|
||||
"body": body,
|
||||
})
|
||||
+ "\n"
|
||||
)
|
||||
```
|
||||
|
||||
Response headers don't carry injected credentials, so no header name is suppressed — only the values are scrubbed by `redact_tokens`.
|
||||
+10
-13
@@ -1,4 +1,4 @@
|
||||
# PRD 0063: Strengthen outbound exfiltration detection
|
||||
# PRD prd-new: Strengthen outbound exfiltration detection
|
||||
|
||||
- **Status:** Active
|
||||
- **Author:** claude
|
||||
@@ -37,10 +37,9 @@ query, headers, body). But the content-based strong tier only matches
|
||||
## Goals / Success Criteria
|
||||
|
||||
1. Each launched bottle has a unique canary token in the agent's environment
|
||||
under a randomized `WORD_WORD_SECRET` env var name. The egress sidecar gets
|
||||
the same env var and registers that exact name through
|
||||
`BOT_BOTTLE_SENSITIVE_PREFIXES`. Any outbound appearance of the canary
|
||||
blocks the request as a known-secret match.
|
||||
(`BOT_BOTTLE_CANARY`) and the egress sidecar's environment
|
||||
(`EGRESS_TOKEN_CANARY`). Any outbound appearance of the canary blocks the
|
||||
request with reason `"canary token"`.
|
||||
2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default:
|
||||
`("EGRESS_TOKEN_",)`). `scan_outbound` reads
|
||||
`BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes
|
||||
@@ -78,20 +77,18 @@ query, headers, body). But the content-based strong tier only matches
|
||||
```
|
||||
Egress.prepare()
|
||||
canary = secrets.token_urlsafe(32)
|
||||
canary_env = <random WORD_WORD_SECRET>
|
||||
EgressPlan(canary=canary, canary_env=canary_env, ...)
|
||||
EgressPlan(canary=canary, ...)
|
||||
|
||||
Docker compose render:
|
||||
sidecar env: <canary_env>=<canary>
|
||||
sidecar env: BOT_BOTTLE_SENSITIVE_PREFIXES=<canary_env>
|
||||
agent env: <canary_env>=<canary> ← visible to agent as a "secret"
|
||||
sidecar env: EGRESS_TOKEN_CANARY=<canary> ← scanned by existing known-secrets detector
|
||||
agent env: BOT_BOTTLE_CANARY=<canary> ← visible to agent as a "secret"
|
||||
|
||||
macos-container launch: same literals added to sidecar + agent env entries
|
||||
```
|
||||
|
||||
The sidecar uses `BOT_BOTTLE_SENSITIVE_PREFIXES` to make the random canary env
|
||||
name part of the existing `scan_known_secrets` detector without adding a
|
||||
manifest schema field.
|
||||
`EGRESS_TOKEN_CANARY` matches the `EGRESS_TOKEN_` prefix already scanned by
|
||||
`scan_known_secrets`, so no detector code changes are required for canary
|
||||
detection — only the injection path.
|
||||
|
||||
### Broadened known-value scanning
|
||||
|
||||
@@ -4,4 +4,3 @@
|
||||
|
||||
pylint>=3.0.0
|
||||
pyright>=1.1.300
|
||||
coverage>=7.0.0
|
||||
|
||||
@@ -92,9 +92,9 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
"on PATH: curl -sSL https://smolmachines.com/install.sh | sh"
|
||||
)
|
||||
|
||||
# Throwaway static key for the git-gate fixture. It need not
|
||||
# be a real SSH key: test 5 reaches gitleaks before any SSH
|
||||
# attempt anyway.
|
||||
# Throwaway "identity file" for the git-gate's `identity` field.
|
||||
# It need not be a real SSH key: test 5 reaches gitleaks before
|
||||
# any SSH attempt anyway.
|
||||
fd, kp = tempfile.mkstemp(prefix="sandbox-test-key.")
|
||||
os.close(fd)
|
||||
cls._key_path = Path(kp)
|
||||
@@ -123,10 +123,7 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
"git-gate": {"repos": {
|
||||
"throwaway": {
|
||||
"url": "ssh://git@unreachable.invalid:22/throwaway.git",
|
||||
"key": {
|
||||
"provider": "static",
|
||||
"path": str(cls._key_path),
|
||||
},
|
||||
"identity": str(cls._key_path),
|
||||
},
|
||||
}},
|
||||
},
|
||||
|
||||
@@ -198,7 +198,6 @@ class TestSmolmachinesLaunch(unittest.TestCase):
|
||||
# connect fails, which is the property chunk 3 will
|
||||
# preserve once egress is actually running.
|
||||
r = self.bottle.exec(
|
||||
"env -u HTTPS_PROXY -u HTTP_PROXY -u https_proxy -u http_proxy "
|
||||
f"curl -s --show-error --max-time 3 http://{self.plan.bundle_ip}:9099 "
|
||||
"2>&1 || true"
|
||||
)
|
||||
|
||||
@@ -168,34 +168,6 @@ class TestAgentProviderRuntime(unittest.TestCase):
|
||||
self.assertEqual("~/.claude/statusline.sh", settings["statusLine"]["command"])
|
||||
self.assertEqual("custom:bot-bottle-research-ui", settings["theme"])
|
||||
|
||||
def test_claude_plan_uses_startup_args_from_provider_settings(self):
|
||||
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
|
||||
plan = build_agent_provision_plan(
|
||||
template="claude",
|
||||
dockerfile="",
|
||||
state_dir=Path(tmp),
|
||||
instance_name="bot-bottle-test",
|
||||
prompt_file=Path(tmp) / "prompt.txt",
|
||||
provider_settings={
|
||||
"startup_args": ["--model", "opus"],
|
||||
},
|
||||
)
|
||||
self.assertEqual(("--model", "opus"), plan.startup_args)
|
||||
|
||||
def test_codex_plan_uses_startup_args_from_provider_settings(self):
|
||||
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
|
||||
plan = build_agent_provision_plan(
|
||||
template="codex",
|
||||
dockerfile="",
|
||||
state_dir=Path(tmp),
|
||||
instance_name="bot-bottle-test",
|
||||
prompt_file=Path(tmp) / "prompt.txt",
|
||||
provider_settings={
|
||||
"startup_args": ["--model", "gpt-5-codex"],
|
||||
},
|
||||
)
|
||||
self.assertEqual(("--model", "gpt-5-codex"), plan.startup_args)
|
||||
|
||||
def test_codex_forward_host_credentials_populates_egress_routes(self):
|
||||
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
|
||||
home = Path(tmp) / "host-codex"
|
||||
@@ -422,24 +394,6 @@ class TestAgentProviderRuntime(unittest.TestCase):
|
||||
self.assertNotIn("OPENROUTER_API_KEY", plan.guest_env)
|
||||
self.assertTrue(provider["compat"]["supportsReasoningEffort"])
|
||||
|
||||
def test_pi_plan_appends_startup_args_from_provider_settings(self):
|
||||
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
|
||||
plan = build_agent_provision_plan(
|
||||
template="pi",
|
||||
dockerfile="",
|
||||
state_dir=Path(tmp),
|
||||
instance_name="bot-bottle-test",
|
||||
prompt_file=Path(tmp) / "prompt.txt",
|
||||
provider_settings={
|
||||
"models": ["qwen3:14b"],
|
||||
"startup_args": ["--no-stream"],
|
||||
},
|
||||
)
|
||||
self.assertEqual(
|
||||
("--models", "ollama/qwen3:14b", "--no-stream"),
|
||||
plan.startup_args,
|
||||
)
|
||||
|
||||
def test_pi_prompt_mode_appends_system_prompt_interactively(self):
|
||||
self.assertEqual(
|
||||
["--append-system-prompt", "/home/node/.bot-bottle-prompt.txt"],
|
||||
|
||||
@@ -102,27 +102,6 @@ class TestAttachAgent(unittest.TestCase):
|
||||
bottle.argv,
|
||||
)
|
||||
|
||||
def test_remote_control_is_provider_startup_arg(self):
|
||||
class Bottle:
|
||||
argv: list[str] = []
|
||||
|
||||
def exec_agent(self, argv: list[str], *, tty: bool = True) -> int:
|
||||
self.argv = list(argv)
|
||||
return 0
|
||||
|
||||
bottle = Bottle()
|
||||
exit_code = start_mod.attach_agent(
|
||||
bottle, # type: ignore[arg-type]
|
||||
agent_provider_template="codex",
|
||||
startup_args=("remote-control",),
|
||||
)
|
||||
|
||||
self.assertEqual(0, exit_code)
|
||||
self.assertEqual(
|
||||
["--dangerously-bypass-approvals-and-sandbox", "remote-control"],
|
||||
bottle.argv,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -80,11 +80,7 @@ def _git_gate_plan(upstreams: tuple[GitGateUpstream, ...] = ()) -> GitGatePlan:
|
||||
)
|
||||
|
||||
|
||||
def _egress_plan(
|
||||
routes: tuple[EgressRoute, ...] = (),
|
||||
*,
|
||||
canary: bool = False,
|
||||
) -> EgressPlan:
|
||||
def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan:
|
||||
token_env_map = {
|
||||
r.token_env: r.token_ref
|
||||
for r in routes
|
||||
@@ -99,8 +95,6 @@ def _egress_plan(
|
||||
egress_network=f"bot-bottle-egress-{SLUG}",
|
||||
mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem",
|
||||
mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem",
|
||||
canary="fake-canary-value" if canary else "",
|
||||
canary_env="CANON_ALPHA_SECRET" if canary else "",
|
||||
)
|
||||
|
||||
|
||||
@@ -118,7 +112,6 @@ def _plan(
|
||||
with_git: bool = False,
|
||||
with_egress: bool = False,
|
||||
supervise: bool = False,
|
||||
canary: bool = False,
|
||||
) -> DockerBottlePlan:
|
||||
"""Build a fully-resolved DockerBottlePlan. Toggles cover the
|
||||
matrix the renderer's conditional-service logic branches on."""
|
||||
@@ -157,7 +150,7 @@ def _plan(
|
||||
slug=SLUG,
|
||||
forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"},
|
||||
git_gate_plan=_git_gate_plan(upstreams),
|
||||
egress_plan=_egress_plan(routes, canary=canary),
|
||||
egress_plan=_egress_plan(routes),
|
||||
supervise_plan=_supervise_plan() if supervise else None,
|
||||
use_runsc=False,
|
||||
agent_provision=AgentProvisionPlan(
|
||||
@@ -382,20 +375,6 @@ class TestSidecarBundleShape(unittest.TestCase):
|
||||
env_strings = sc["environment"]
|
||||
self.assertNotIn("EGRESS_TOKEN_0", env_strings)
|
||||
|
||||
def test_canary_env_registered_as_sensitive_in_sidecar(self):
|
||||
sc = self._render(canary=True)["services"]["sidecars"]
|
||||
env_strings = sc["environment"]
|
||||
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
|
||||
self.assertIn(
|
||||
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
|
||||
env_strings,
|
||||
)
|
||||
|
||||
def test_canary_env_visible_to_agent(self):
|
||||
agent = self._render(canary=True)["services"]["agent"]
|
||||
env_strings = agent["environment"]
|
||||
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
|
||||
|
||||
def test_supervise_env_present_when_active(self):
|
||||
sc = self._render(supervise=True)["services"]["sidecars"]
|
||||
env_strings = sc["environment"]
|
||||
|
||||
@@ -29,9 +29,6 @@ from bot_bottle.supervise import SupervisePlan
|
||||
|
||||
|
||||
_URL = "http://supervise:9100/"
|
||||
_CODEX_DOCKERFILE = (
|
||||
Path(__file__).resolve().parents[2] / "bot_bottle/contrib/codex/Dockerfile"
|
||||
)
|
||||
|
||||
|
||||
def _make_bottle(exec_result: ExecResult | None = None) -> MagicMock:
|
||||
@@ -279,12 +276,6 @@ class TestCodexProvision(unittest.TestCase):
|
||||
)
|
||||
|
||||
|
||||
class TestCodexDockerfile(unittest.TestCase):
|
||||
def test_installs_procps_for_remote_control_pid_management(self):
|
||||
dockerfile = _CODEX_DOCKERFILE.read_text()
|
||||
self.assertIn("procps", dockerfile)
|
||||
|
||||
|
||||
class TestCodexSuperviseMcp(unittest.TestCase):
|
||||
def test_noop_when_supervise_disabled(self):
|
||||
bottle = _make_bottle()
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Unit: DLP detectors (PRD 0053).
|
||||
"""Unit: DLP detectors (PRD 0053, prd-new).
|
||||
|
||||
Tests for token pattern scanning, known secret detection, fragmentation-
|
||||
resistant matching, entropy scoring, and naive prompt injection detection."""
|
||||
@@ -9,6 +9,7 @@ import unittest
|
||||
|
||||
from bot_bottle.dlp_detectors import (
|
||||
ENTROPY_BLOCK_THRESHOLD,
|
||||
ENTROPY_WINDOW,
|
||||
PARTIAL_MATCH_MIN_LEN,
|
||||
REDACT,
|
||||
_alnum_projection,
|
||||
@@ -593,15 +594,11 @@ class TestFragmentationResistantMatching(unittest.TestCase):
|
||||
|
||||
def test_canary_prefix_detected(self):
|
||||
canary_value = "canary-fake-secret-value-xyz"
|
||||
env = {"CANON_ALPHA_SECRET": canary_value}
|
||||
result = scan_known_secrets(
|
||||
f"x={canary_value}",
|
||||
env=env,
|
||||
sensitive_prefixes=("CANON_ALPHA_SECRET",),
|
||||
)
|
||||
env = {"EGRESS_TOKEN_CANARY": canary_value}
|
||||
result = scan_known_secrets(f"x={canary_value}", env=env)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertIn("CANON_ALPHA_SECRET", result.reason)
|
||||
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
|
||||
|
||||
|
||||
class TestRedactTokensBroadenedPrefixes(unittest.TestCase):
|
||||
|
||||
@@ -136,16 +136,6 @@ class TestClaudeArgv(unittest.TestCase):
|
||||
argv,
|
||||
)
|
||||
|
||||
def test_codex_remote_control_startup_arg_does_not_receive_initial_prompt(self):
|
||||
argv = _codex_bottle("/home/node/.bot-bottle-prompt.txt").agent_argv(
|
||||
["--dangerously-bypass-approvals-and-sandbox", "remote-control"],
|
||||
)
|
||||
self.assertEqual(
|
||||
["docker", "exec", "-it", "bot-bottle-dev-abc", "codex",
|
||||
"--dangerously-bypass-approvals-and-sandbox", "remote-control"],
|
||||
argv,
|
||||
)
|
||||
|
||||
def test_codex_resume_does_not_append_initial_prompt(self):
|
||||
argv = _codex_bottle("/home/node/.bot-bottle-prompt.txt").agent_argv(
|
||||
["--dangerously-bypass-approvals-and-sandbox", "resume", "--last"],
|
||||
|
||||
@@ -31,6 +31,7 @@ class _Provider(AgentProvider):
|
||||
return AgentProviderRuntime(
|
||||
template="test", command="test", image="",
|
||||
prompt_mode="append_file", bypass_args=(), resume_args=(),
|
||||
remote_control_args=(),
|
||||
)
|
||||
def provision_plan(self, **kwargs): # type: ignore[override]
|
||||
raise NotImplementedError
|
||||
|
||||
+13
-70
@@ -1,5 +1,5 @@
|
||||
"""Unit: Egress route lift + routes.yaml render + token
|
||||
resolution (PRD 0017, PRD 0053)."""
|
||||
resolution (PRD 0017, PRD 0053, prd-new)."""
|
||||
|
||||
import tempfile
|
||||
import unittest
|
||||
@@ -10,12 +10,10 @@ from bot_bottle.egress import (
|
||||
Egress,
|
||||
EgressPlan,
|
||||
EgressRoute,
|
||||
egress_agent_env_entries,
|
||||
egress_manifest_routes,
|
||||
egress_render_routes,
|
||||
egress_resolve_token_values,
|
||||
egress_routes_for_bottle,
|
||||
egress_sidecar_env_entries,
|
||||
egress_token_env_map,
|
||||
)
|
||||
from bot_bottle.log import Die
|
||||
@@ -322,7 +320,7 @@ class TestRenderRoutes(unittest.TestCase):
|
||||
self.assertEqual([], parse_yaml_subset(rendered)["routes"])
|
||||
|
||||
def test_round_trip_through_addon_core(self):
|
||||
from bot_bottle.egress_addon_core import load_config
|
||||
from bot_bottle.egress_addon_core import load_routes
|
||||
b = _bottle([
|
||||
{"host": "api.github.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"},
|
||||
@@ -333,7 +331,7 @@ class TestRenderRoutes(unittest.TestCase):
|
||||
{"host": "api.anthropic.com"},
|
||||
])
|
||||
routes = egress_routes_for_bottle(b)
|
||||
addon_routes = load_config(egress_render_routes(routes)).routes
|
||||
addon_routes = load_routes(egress_render_routes(routes))
|
||||
self.assertEqual(3, len(addon_routes))
|
||||
self.assertEqual("Bearer", addon_routes[0].auth_scheme)
|
||||
self.assertEqual("EGRESS_TOKEN_0", addon_routes[0].token_env)
|
||||
@@ -341,26 +339,26 @@ class TestRenderRoutes(unittest.TestCase):
|
||||
self.assertEqual("", addon_routes[2].auth_scheme)
|
||||
|
||||
def test_dlp_round_trips(self):
|
||||
from bot_bottle.egress_addon_core import load_config
|
||||
from bot_bottle.egress_addon_core import load_routes
|
||||
b = _bottle([{"host": "x.example", "dlp": {
|
||||
"outbound_detectors": ["token_patterns"],
|
||||
"inbound_detectors": False,
|
||||
}}])
|
||||
routes = egress_routes_for_bottle(b)
|
||||
rendered = egress_render_routes(routes)
|
||||
addon_routes = load_config(rendered).routes
|
||||
addon_routes = load_routes(rendered)
|
||||
self.assertEqual(("token_patterns",), addon_routes[0].outbound_detectors)
|
||||
self.assertEqual((), addon_routes[0].inbound_detectors)
|
||||
|
||||
def test_outbound_on_match_round_trips(self):
|
||||
from bot_bottle.egress_addon_core import load_config
|
||||
from bot_bottle.egress_addon_core import load_routes
|
||||
b = _bottle([{"host": "logs.example", "dlp": {
|
||||
"outbound_on_match": "redact",
|
||||
}}])
|
||||
routes = egress_routes_for_bottle(b)
|
||||
rendered = egress_render_routes(routes)
|
||||
self.assertIn('outbound_on_match: "redact"', rendered)
|
||||
addon_routes = load_config(rendered).routes
|
||||
addon_routes = load_routes(rendered)
|
||||
self.assertEqual("redact", addon_routes[0].outbound_on_match)
|
||||
|
||||
def test_outbound_on_match_default_omitted_from_render(self):
|
||||
@@ -370,12 +368,12 @@ class TestRenderRoutes(unittest.TestCase):
|
||||
self.assertNotIn("outbound_on_match", rendered)
|
||||
|
||||
def test_git_fetch_policy_round_trips(self):
|
||||
from bot_bottle.egress_addon_core import load_config
|
||||
from bot_bottle.egress_addon_core import load_routes
|
||||
b = _bottle([{"host": "github.com", "git": {"fetch": True}}])
|
||||
routes = egress_routes_for_bottle(b)
|
||||
rendered = egress_render_routes(routes)
|
||||
self.assertEqual({"fetch": True}, self._parsed(routes)[0]["git"])
|
||||
addon_routes = load_config(rendered).routes
|
||||
addon_routes = load_routes(rendered)
|
||||
self.assertTrue(addon_routes[0].git_fetch)
|
||||
|
||||
def test_log_zero_omitted_from_render(self):
|
||||
@@ -450,7 +448,7 @@ class TestResolveTokenValues(unittest.TestCase):
|
||||
|
||||
|
||||
class TestCanaryGeneration(unittest.TestCase):
|
||||
"""Egress.prepare() generates a unique canary token per session."""
|
||||
"""Egress.prepare() generates a unique canary token per session (prd-new)."""
|
||||
|
||||
def _bottle_obj(self):
|
||||
return ManifestIndex.from_json_obj({
|
||||
@@ -472,7 +470,6 @@ class TestCanaryGeneration(unittest.TestCase):
|
||||
plan = self._make_plan()
|
||||
self.assertIsInstance(plan.canary, str)
|
||||
self.assertGreater(len(plan.canary), 0)
|
||||
self.assertRegex(plan.canary_env, r"^[A-Z]+_[A-Z]+_SECRET$")
|
||||
|
||||
def test_canary_is_unique_per_session(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
@@ -490,16 +487,12 @@ class TestCanaryGeneration(unittest.TestCase):
|
||||
from bot_bottle.dlp_detectors import scan_known_secrets
|
||||
|
||||
plan = self._make_plan()
|
||||
env = {plan.canary_env: plan.canary}
|
||||
result = scan_known_secrets(
|
||||
f"exfil={plan.canary}",
|
||||
env=env,
|
||||
sensitive_prefixes=(plan.canary_env,),
|
||||
)
|
||||
env = {"EGRESS_TOKEN_CANARY": plan.canary}
|
||||
result = scan_known_secrets(f"exfil={plan.canary}", env=env)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
self.assertIn(plan.canary_env, result.reason)
|
||||
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
|
||||
|
||||
def test_egress_plan_canary_field_default_empty(self):
|
||||
# Verify EgressPlan can be constructed with an empty canary (backward compat).
|
||||
@@ -511,56 +504,6 @@ class TestCanaryGeneration(unittest.TestCase):
|
||||
token_env_map={},
|
||||
)
|
||||
self.assertEqual("", plan.canary)
|
||||
self.assertEqual("", plan.canary_env)
|
||||
|
||||
|
||||
class TestEgressEnvEntries(unittest.TestCase):
|
||||
def test_sidecar_entries_include_route_tokens_and_canary_scan_prefix(self):
|
||||
plan = EgressPlan(
|
||||
slug="s",
|
||||
routes_path=Path("/tmp/r.yaml"),
|
||||
routes=(EgressRoute(host="api.example"),),
|
||||
token_env_map={"EGRESS_TOKEN_1": "T1", "EGRESS_TOKEN_0": "T0"},
|
||||
canary="fake-canary-value",
|
||||
canary_env="CANON_ALPHA_SECRET",
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
(
|
||||
"EGRESS_TOKEN_0",
|
||||
"EGRESS_TOKEN_1",
|
||||
"CANON_ALPHA_SECRET=fake-canary-value",
|
||||
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
|
||||
),
|
||||
egress_sidecar_env_entries(plan),
|
||||
)
|
||||
|
||||
def test_agent_entries_include_only_canary_bait(self):
|
||||
plan = EgressPlan(
|
||||
slug="s",
|
||||
routes_path=Path("/tmp/r.yaml"),
|
||||
routes=(),
|
||||
token_env_map={},
|
||||
canary="fake-canary-value",
|
||||
canary_env="CANON_ALPHA_SECRET",
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
("CANON_ALPHA_SECRET=fake-canary-value",),
|
||||
egress_agent_env_entries(plan),
|
||||
)
|
||||
|
||||
def test_canary_entries_omitted_when_name_missing(self):
|
||||
plan = EgressPlan(
|
||||
slug="s",
|
||||
routes_path=Path("/tmp/r.yaml"),
|
||||
routes=(),
|
||||
token_env_map={},
|
||||
canary="fake-canary-value",
|
||||
)
|
||||
|
||||
self.assertEqual((), egress_sidecar_env_entries(plan))
|
||||
self.assertEqual((), egress_agent_env_entries(plan))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -32,6 +32,7 @@ from bot_bottle.egress_addon_core import (
|
||||
is_git_fetch_request,
|
||||
is_git_push_request,
|
||||
load_config,
|
||||
load_routes,
|
||||
match_route,
|
||||
outbound_scan_headers,
|
||||
parse_config,
|
||||
@@ -288,6 +289,47 @@ class TestParseDlp(unittest.TestCase):
|
||||
}]})
|
||||
|
||||
|
||||
# --- load_routes ---------------------------------------------------------
|
||||
|
||||
|
||||
class TestLoadRoutes(unittest.TestCase):
|
||||
def test_yaml_text_round_trip(self):
|
||||
routes = load_routes(
|
||||
'routes:\n'
|
||||
' - host: "api.example"\n'
|
||||
)
|
||||
self.assertEqual(1, len(routes))
|
||||
self.assertEqual("api.example", routes[0].host)
|
||||
|
||||
def test_full_route_shape_parses(self):
|
||||
routes = load_routes(
|
||||
'routes:\n'
|
||||
' - host: "api.example"\n'
|
||||
' auth_scheme: "Bearer"\n'
|
||||
' token_env: "EGRESS_TOKEN_0"\n'
|
||||
' matches:\n'
|
||||
' - paths:\n'
|
||||
' - value: "/v1/"\n'
|
||||
' - type: "exact"\n'
|
||||
' value: "/messages"\n'
|
||||
)
|
||||
self.assertEqual(1, len(routes))
|
||||
r = routes[0]
|
||||
self.assertEqual("api.example", r.host)
|
||||
self.assertEqual("Bearer", r.auth_scheme)
|
||||
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
|
||||
self.assertEqual(1, len(r.matches))
|
||||
self.assertEqual(2, len(r.matches[0].paths))
|
||||
|
||||
def test_empty_routes_list(self):
|
||||
routes = load_routes("routes: []\n")
|
||||
self.assertEqual((), routes)
|
||||
|
||||
def test_invalid_yaml_raises_value_error(self):
|
||||
with self.assertRaises(ValueError):
|
||||
load_routes("routes:\n\t- host: x\n")
|
||||
|
||||
|
||||
# --- load_config / parse_config ------------------------------------------
|
||||
|
||||
|
||||
@@ -336,33 +378,6 @@ class TestLoadConfig(unittest.TestCase):
|
||||
with self.assertRaises(ValueError):
|
||||
parse_config("not a dict")
|
||||
|
||||
def test_empty_routes_list(self):
|
||||
cfg = load_config("routes: []\n")
|
||||
self.assertEqual((), cfg.routes)
|
||||
|
||||
def test_full_route_shape_parses(self):
|
||||
cfg = load_config(
|
||||
'routes:\n'
|
||||
' - host: "api.example"\n'
|
||||
' auth_scheme: "Bearer"\n'
|
||||
' token_env: "EGRESS_TOKEN_0"\n'
|
||||
' matches:\n'
|
||||
' - paths:\n'
|
||||
' - value: "/v1/"\n'
|
||||
' - type: "exact"\n'
|
||||
' value: "/messages"\n'
|
||||
)
|
||||
r = cfg.routes[0]
|
||||
self.assertEqual("api.example", r.host)
|
||||
self.assertEqual("Bearer", r.auth_scheme)
|
||||
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
|
||||
self.assertEqual(1, len(r.matches))
|
||||
self.assertEqual(2, len(r.matches[0].paths))
|
||||
|
||||
def test_invalid_yaml_raises_value_error(self):
|
||||
with self.assertRaises(ValueError):
|
||||
load_config("routes:\n\t- host: x\n")
|
||||
|
||||
|
||||
# --- evaluate_matches ---------------------------------------------------
|
||||
|
||||
@@ -1259,7 +1274,7 @@ class TestBuildTokenAllowPayload(unittest.TestCase):
|
||||
payload = build_token_allow_payload("h", "GET", "/", result)
|
||||
self.assertNotIn("context:", payload)
|
||||
class TestScanOutboundEnhanced(unittest.TestCase):
|
||||
"""scan_outbound changes: binary decode, entropy detector,
|
||||
"""scan_outbound changes from prd-new: binary decode, entropy detector,
|
||||
broadened known-value prefixes, fragmentation resistance."""
|
||||
|
||||
_ROUTE = Route(host="api.example.com")
|
||||
@@ -1323,27 +1338,20 @@ class TestScanOutboundEnhanced(unittest.TestCase):
|
||||
result = scan_outbound(self._ROUTE, f"auth={secret}", env)
|
||||
self.assertIsNotNone(result)
|
||||
|
||||
def test_canary_detected_via_random_secret_env_name(self):
|
||||
# The fake secret uses a randomized env name that the sidecar marks
|
||||
# as sensitive through BOT_BOTTLE_SENSITIVE_PREFIXES.
|
||||
def test_canary_detected_via_egress_token_canary(self):
|
||||
# The canary (injected as EGRESS_TOKEN_CANARY) is caught by known_secrets.
|
||||
canary = "canaryvalue12345abcdef"
|
||||
env = {
|
||||
"CANON_ALPHA_SECRET": canary,
|
||||
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
|
||||
}
|
||||
env = {"EGRESS_TOKEN_CANARY": canary}
|
||||
result = scan_outbound(self._ROUTE, f"data={canary}", env)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
self.assertIn("CANON_ALPHA_SECRET", result.reason)
|
||||
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
|
||||
|
||||
def test_fragmented_canary_blocked(self):
|
||||
# Canary with separators injected is still caught.
|
||||
canary = "supersecretcanary99"
|
||||
env = {
|
||||
"CANON_ALPHA_SECRET": canary,
|
||||
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
|
||||
}
|
||||
env = {"EGRESS_TOKEN_CANARY": canary}
|
||||
fragmented = "-".join(canary)
|
||||
result = scan_outbound(self._ROUTE, f"x={fragmented}", env)
|
||||
self.assertIsNotNone(result)
|
||||
|
||||
@@ -1,274 +0,0 @@
|
||||
"""Unit: LOG_FULL credential redaction in _log_request / _log_response (issue #257).
|
||||
|
||||
egress_addon.py is sidecar-only code that depends on mitmproxy, which is
|
||||
not installed on the host. This file pre-populates sys.modules with the
|
||||
minimum mocks needed so EgressAddon can be imported and tested without the
|
||||
real mitmproxy package."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
import types
|
||||
import unittest
|
||||
from io import StringIO
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sidecar-import shims — must run before importing egress_addon
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _ensure_shims() -> None:
|
||||
if "mitmproxy" not in sys.modules:
|
||||
_mm = types.ModuleType("mitmproxy")
|
||||
_mh = types.ModuleType("mitmproxy.http")
|
||||
setattr(_mm, "http", _mh)
|
||||
sys.modules["mitmproxy"] = _mm
|
||||
sys.modules["mitmproxy.http"] = _mh
|
||||
if "egress_addon_core" not in sys.modules:
|
||||
import bot_bottle.egress_addon_core as _core
|
||||
sys.modules["egress_addon_core"] = _core
|
||||
|
||||
|
||||
_ensure_shims()
|
||||
|
||||
from bot_bottle.egress_addon import EgressAddon # noqa: E402 (import after shims)
|
||||
from bot_bottle.egress_addon_core import Config, LOG_FULL # noqa: E402
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _addon() -> EgressAddon:
|
||||
"""Return a bare EgressAddon with LOG_FULL config and no routes file."""
|
||||
a: EgressAddon = EgressAddon.__new__(EgressAddon)
|
||||
a.config = Config(routes=(), log=LOG_FULL)
|
||||
a.safe_tokens = set()
|
||||
a._supervise_queue_dir = ""
|
||||
a._supervise_slug = ""
|
||||
a._token_allow_timeout = 300.0
|
||||
return a
|
||||
|
||||
|
||||
class _Headers:
|
||||
def __init__(self, d: dict[str, str]) -> None:
|
||||
self._d = d
|
||||
|
||||
def items(self) -> list[tuple[str, str]]:
|
||||
return list(self._d.items())
|
||||
|
||||
|
||||
class _Request:
|
||||
def __init__(
|
||||
self,
|
||||
host: str = "api.example.com",
|
||||
method: str = "POST",
|
||||
path: str = "/v1/messages",
|
||||
headers: dict[str, str] | None = None,
|
||||
body: str = "",
|
||||
) -> None:
|
||||
self.pretty_host = host
|
||||
self.method = method
|
||||
self.path = path
|
||||
self.headers = _Headers(headers or {})
|
||||
self._body = body
|
||||
|
||||
def get_text(self, *, strict: bool = True) -> str:
|
||||
return self._body
|
||||
|
||||
|
||||
class _Response:
|
||||
def __init__(
|
||||
self,
|
||||
status_code: int = 200,
|
||||
headers: dict[str, str] | None = None,
|
||||
body: str = "",
|
||||
) -> None:
|
||||
self.status_code = status_code
|
||||
self.headers = _Headers(headers or {})
|
||||
self._body = body
|
||||
|
||||
def get_text(self, *, strict: bool = True) -> str:
|
||||
return self._body
|
||||
|
||||
|
||||
class _Flow:
|
||||
def __init__(
|
||||
self,
|
||||
request: _Request | None = None,
|
||||
response: _Response | None = None,
|
||||
) -> None:
|
||||
self.request = request or _Request()
|
||||
self.response = response or _Response()
|
||||
|
||||
|
||||
def _log_request(addon: EgressAddon, flow: _Flow) -> dict[str, Any]:
|
||||
buf = StringIO()
|
||||
with patch("sys.stderr", buf):
|
||||
addon._log_request(flow) # type: ignore[arg-type]
|
||||
return json.loads(buf.getvalue())
|
||||
|
||||
|
||||
def _log_response(addon: EgressAddon, flow: _Flow) -> dict[str, Any]:
|
||||
buf = StringIO()
|
||||
with patch("sys.stderr", buf):
|
||||
addon._log_response(flow) # type: ignore[arg-type]
|
||||
return json.loads(buf.getvalue())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _log_request — authorization header stripped
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestLogRequestAuthorizationStripped(unittest.TestCase):
|
||||
def test_lowercase_authorization_excluded(self) -> None:
|
||||
addon = _addon()
|
||||
flow = _Flow(request=_Request(headers={"authorization": "Bearer sk-real-secret"}))
|
||||
entry = _log_request(addon, flow)
|
||||
self.assertNotIn("authorization", entry["headers"])
|
||||
|
||||
def test_titlecase_authorization_excluded(self) -> None:
|
||||
addon = _addon()
|
||||
flow = _Flow(request=_Request(headers={"Authorization": "Bearer sk-real-secret"}))
|
||||
entry = _log_request(addon, flow)
|
||||
self.assertNotIn("Authorization", entry["headers"])
|
||||
self.assertNotIn("authorization", entry["headers"])
|
||||
|
||||
def test_non_auth_headers_retained(self) -> None:
|
||||
addon = _addon()
|
||||
flow = _Flow(request=_Request(headers={
|
||||
"authorization": "Bearer sk-real-secret",
|
||||
"content-type": "application/json",
|
||||
}))
|
||||
entry = _log_request(addon, flow)
|
||||
self.assertIn("content-type", entry["headers"])
|
||||
self.assertEqual("application/json", entry["headers"]["content-type"])
|
||||
|
||||
def test_no_authorization_header_logs_all_others(self) -> None:
|
||||
addon = _addon()
|
||||
flow = _Flow(request=_Request(headers={"x-request-id": "abc"}))
|
||||
entry = _log_request(addon, flow)
|
||||
self.assertEqual({"x-request-id": "abc"}, entry["headers"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _log_request — body redaction
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
_OPENAI_KEY = "sk-" + "A" * 48
|
||||
|
||||
|
||||
class TestLogRequestBodyRedacted(unittest.TestCase):
|
||||
def test_token_pattern_in_body_scrubbed(self) -> None:
|
||||
addon = _addon()
|
||||
flow = _Flow(request=_Request(body=f"key={_OPENAI_KEY}"))
|
||||
entry = _log_request(addon, flow)
|
||||
self.assertNotIn(_OPENAI_KEY, entry["body"])
|
||||
self.assertIn("********", entry["body"])
|
||||
|
||||
def test_provisioned_secret_in_body_scrubbed(self) -> None:
|
||||
addon = _addon()
|
||||
secret = "provisioned-egress-secret-xyz"
|
||||
flow = _Flow(request=_Request(body=f"token={secret}"))
|
||||
with patch.dict("os.environ", {"EGRESS_TOKEN_0": secret}):
|
||||
entry = _log_request(addon, flow)
|
||||
self.assertNotIn(secret, entry["body"])
|
||||
self.assertIn("********", entry["body"])
|
||||
|
||||
def test_clean_body_preserved(self) -> None:
|
||||
addon = _addon()
|
||||
payload = '{"model": "claude-3", "max_tokens": 1024}'
|
||||
flow = _Flow(request=_Request(body=payload))
|
||||
entry = _log_request(addon, flow)
|
||||
self.assertEqual(payload, entry["body"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _log_request — non-authorization header value redaction
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestLogRequestHeaderValuesRedacted(unittest.TestCase):
|
||||
def test_token_in_custom_header_scrubbed(self) -> None:
|
||||
addon = _addon()
|
||||
flow = _Flow(request=_Request(headers={"x-api-key": _OPENAI_KEY}))
|
||||
entry = _log_request(addon, flow)
|
||||
self.assertNotIn(_OPENAI_KEY, entry["headers"].get("x-api-key", ""))
|
||||
self.assertIn("********", entry["headers"].get("x-api-key", ""))
|
||||
|
||||
def test_clean_header_value_preserved(self) -> None:
|
||||
addon = _addon()
|
||||
flow = _Flow(request=_Request(headers={"accept": "application/json"}))
|
||||
entry = _log_request(addon, flow)
|
||||
self.assertEqual("application/json", entry["headers"]["accept"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _log_response — body redaction
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestLogResponseBodyRedacted(unittest.TestCase):
|
||||
def test_token_pattern_in_response_body_scrubbed(self) -> None:
|
||||
addon = _addon()
|
||||
flow = _Flow(
|
||||
request=_Request(),
|
||||
response=_Response(body=f'{{"key": "{_OPENAI_KEY}"}}'),
|
||||
)
|
||||
entry = _log_response(addon, flow)
|
||||
self.assertNotIn(_OPENAI_KEY, entry["body"])
|
||||
self.assertIn("********", entry["body"])
|
||||
|
||||
def test_provisioned_secret_in_response_body_scrubbed(self) -> None:
|
||||
addon = _addon()
|
||||
secret = "provisioned-egress-secret-xyz"
|
||||
flow = _Flow(
|
||||
request=_Request(),
|
||||
response=_Response(body=f'{{"token": "{secret}"}}'),
|
||||
)
|
||||
with patch.dict("os.environ", {"EGRESS_TOKEN_0": secret}):
|
||||
entry = _log_response(addon, flow)
|
||||
self.assertNotIn(secret, entry["body"])
|
||||
self.assertIn("********", entry["body"])
|
||||
|
||||
def test_clean_response_body_preserved(self) -> None:
|
||||
addon = _addon()
|
||||
flow = _Flow(request=_Request(), response=_Response(body='{"result": "ok"}'))
|
||||
entry = _log_response(addon, flow)
|
||||
self.assertEqual('{"result": "ok"}', entry["body"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _log_response — response header value redaction
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestLogResponseHeaderValuesRedacted(unittest.TestCase):
|
||||
def test_token_in_response_header_scrubbed(self) -> None:
|
||||
addon = _addon()
|
||||
flow = _Flow(
|
||||
request=_Request(),
|
||||
response=_Response(headers={"set-cookie": f"token={_OPENAI_KEY}"}),
|
||||
)
|
||||
entry = _log_response(addon, flow)
|
||||
cookie_val = entry["headers"].get("set-cookie", "")
|
||||
self.assertNotIn(_OPENAI_KEY, cookie_val)
|
||||
self.assertIn("********", cookie_val)
|
||||
|
||||
def test_clean_response_header_preserved(self) -> None:
|
||||
addon = _addon()
|
||||
flow = _Flow(
|
||||
request=_Request(),
|
||||
response=_Response(headers={"content-type": "application/json"}),
|
||||
)
|
||||
entry = _log_response(addon, flow)
|
||||
self.assertEqual("application/json", entry["headers"]["content-type"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -54,15 +54,6 @@ class TestValidateRoutesContent(unittest.TestCase):
|
||||
' auth_scheme: "Bearer"\n'
|
||||
)
|
||||
|
||||
def test_rejects_log_full(self):
|
||||
with self.assertRaises(EgressApplyError) as cm:
|
||||
applicator.validate_routes_content(
|
||||
'log: 2\n'
|
||||
'routes:\n'
|
||||
' - host: "x.example"\n'
|
||||
)
|
||||
self.assertIn("must not change egress logging", str(cm.exception))
|
||||
|
||||
|
||||
class TestApplyRoutesChange(unittest.TestCase):
|
||||
def setUp(self):
|
||||
|
||||
@@ -30,7 +30,6 @@ def _plan(
|
||||
supervise: bool = False,
|
||||
agent_git_gate_url: str = "",
|
||||
agent_supervise_url: str = "",
|
||||
canary: bool = False,
|
||||
) -> MacosContainerBottlePlan:
|
||||
routes_path = stage_dir / "routes.yaml"
|
||||
routes_path.write_text("routes: []\n", encoding="utf-8")
|
||||
@@ -43,8 +42,7 @@ def _plan(
|
||||
routes_path=routes_path,
|
||||
routes=("route",),
|
||||
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
|
||||
canary="fake-canary-value" if canary else "",
|
||||
canary_env="CANON_ALPHA_SECRET" if canary else "",
|
||||
canary="",
|
||||
)
|
||||
if git:
|
||||
key_path = stage_dir / "origin-key"
|
||||
@@ -141,26 +139,6 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
|
||||
argv,
|
||||
)
|
||||
|
||||
def test_sidecar_argv_registers_canary_env_as_sensitive(self):
|
||||
plan = _plan(stage_dir=self.stage_dir, canary=True)
|
||||
argv = launch._sidecar_run_argv(
|
||||
plan,
|
||||
"bot-bottle-sidecars-dev-abc",
|
||||
"bot-bottle-net-dev-abc",
|
||||
"bot-bottle-egress-dev-abc",
|
||||
)
|
||||
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
|
||||
self.assertIn("BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", argv)
|
||||
|
||||
def test_agent_argv_receives_canary_env(self):
|
||||
plan = _plan(stage_dir=self.stage_dir, canary=True)
|
||||
argv = launch._agent_run_argv(
|
||||
plan,
|
||||
"bot-bottle-net-dev-abc",
|
||||
"192.0.2.10",
|
||||
)
|
||||
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
|
||||
|
||||
def test_agent_env_points_proxy_at_sidecar_ip(self):
|
||||
plan = _plan(
|
||||
stage_dir=self.stage_dir,
|
||||
|
||||
@@ -73,33 +73,6 @@ resolver #2
|
||||
)
|
||||
self.assertTrue(run.call_args_list[-1].kwargs["check"])
|
||||
|
||||
def test_build_image_anchors_relative_dockerfile_to_context(self):
|
||||
status = util.subprocess.CompletedProcess(
|
||||
args=[],
|
||||
returncode=0,
|
||||
stdout=(
|
||||
'[{"status":{"state":"running"},'
|
||||
'"configuration":{"dns":{"nameservers":["9.9.9.9"]}}}]'
|
||||
),
|
||||
stderr="",
|
||||
)
|
||||
with patch.object(util.subprocess, "run", return_value=status) as run, \
|
||||
patch.object(util.os, "environ", {
|
||||
"BOT_BOTTLE_MACOS_CONTAINER_DNS": "9.9.9.9",
|
||||
}):
|
||||
util.build_image(
|
||||
"bot-bottle-sidecars:latest",
|
||||
"/repo",
|
||||
dockerfile="Dockerfile.sidecars",
|
||||
)
|
||||
self.assertEqual(
|
||||
[
|
||||
"container", "build", "-t", "bot-bottle-sidecars:latest",
|
||||
"--dns", "9.9.9.9", "-f", "/repo/Dockerfile.sidecars", "/repo",
|
||||
],
|
||||
run.call_args_list[-1].args[0],
|
||||
)
|
||||
|
||||
def test_commit_container_execs_tar_and_builds_image(self):
|
||||
# stderr is bytes because subprocess.run uses stderr=PIPE without text=True
|
||||
completed = util.subprocess.CompletedProcess(
|
||||
|
||||
@@ -167,40 +167,13 @@ class TestAgentProviderHostCredentials(unittest.TestCase):
|
||||
},
|
||||
})
|
||||
|
||||
def test_startup_args_allowed_for_claude(self):
|
||||
b = _provider_config_bottle({
|
||||
"template": "claude",
|
||||
"settings": {"startup_args": ["--model", "opus"]},
|
||||
})
|
||||
self.assertEqual(
|
||||
{"startup_args": ["--model", "opus"]},
|
||||
b.agent_provider.settings,
|
||||
)
|
||||
|
||||
def test_startup_args_allowed_for_codex(self):
|
||||
b = _provider_config_bottle({
|
||||
"template": "codex",
|
||||
"settings": {"startup_args": ["--model", "gpt-5-codex"]},
|
||||
})
|
||||
self.assertEqual(
|
||||
{"startup_args": ["--model", "gpt-5-codex"]},
|
||||
b.agent_provider.settings,
|
||||
)
|
||||
|
||||
def test_provider_specific_settings_still_rejected_for_claude(self):
|
||||
def test_settings_rejected_for_claude(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
_provider_config_bottle({
|
||||
"template": "claude",
|
||||
"settings": {"models": ["qwen2.5-coder:7b"]},
|
||||
})
|
||||
|
||||
def test_startup_args_must_be_string_array(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
_provider_config_bottle({
|
||||
"template": "codex",
|
||||
"settings": {"startup_args": ["--model", 42]},
|
||||
})
|
||||
|
||||
def test_settings_models_must_be_non_empty_string_array(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
_provider_config_bottle({
|
||||
|
||||
@@ -130,7 +130,7 @@ def _capture_print(plan: DockerBottlePlan | SmolmachinesBottlePlan) -> list[str]
|
||||
orig = sys.stderr
|
||||
sys.stderr = buf
|
||||
try:
|
||||
plan.print()
|
||||
plan.print(remote_control=False)
|
||||
finally:
|
||||
sys.stderr = orig
|
||||
return buf.getvalue().splitlines()
|
||||
|
||||
@@ -26,7 +26,9 @@ from bot_bottle.backend.smolmachines.bottle import SmolmachinesBottle
|
||||
from bot_bottle.backend.smolmachines.bottle_plan import (
|
||||
SmolmachinesBottlePlan,
|
||||
)
|
||||
from bot_bottle.backend.smolmachines import launch as _launch
|
||||
# from bot_bottle.backend.smolmachines.provision import (
|
||||
# workspace as _workspace,
|
||||
# )
|
||||
from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec
|
||||
from bot_bottle.backend.util import AGENT_CA_PATH
|
||||
from bot_bottle.egress import EgressPlan, EgressRoute
|
||||
@@ -42,6 +44,7 @@ class _Provider(AgentProvider):
|
||||
return AgentProviderRuntime(
|
||||
template="test", command="test", image="",
|
||||
prompt_mode="append_file", bypass_args=(), resume_args=(),
|
||||
remote_control_args=(),
|
||||
)
|
||||
def provision_plan(self, **kwargs): # type: ignore[override]
|
||||
raise NotImplementedError
|
||||
@@ -83,7 +86,6 @@ def _plan(
|
||||
stage_dir: Path | None = None,
|
||||
egress_routes: tuple[EgressRoute, ...] = (),
|
||||
egress_ca_path: Path = Path(),
|
||||
canary: bool = False,
|
||||
supervise: bool = False,
|
||||
bundle_ip: str = "192.168.50.2",
|
||||
agent_git_gate_host: str = "127.0.0.1:55555",
|
||||
@@ -154,8 +156,6 @@ def _plan(
|
||||
routes=egress_routes,
|
||||
token_env_map={},
|
||||
mitmproxy_ca_cert_only_host_path=egress_ca_path,
|
||||
canary="fake-canary-value" if canary else "",
|
||||
canary_env="CANON_ALPHA_SECRET" if canary else "",
|
||||
),
|
||||
supervise_plan=supervise_plan,
|
||||
agent_git_gate_host=agent_git_gate_host,
|
||||
@@ -411,31 +411,6 @@ class TestBundleLaunchSpec(unittest.TestCase):
|
||||
self.assertIn(9420, spec.ports_to_publish)
|
||||
self.assertNotIn(9418, spec.ports_to_publish)
|
||||
|
||||
def test_canary_env_registered_as_sensitive_in_bundle(self):
|
||||
plan = _plan(canary=True)
|
||||
|
||||
spec = _bundle_launch_spec(plan, "net", "127.0.0.16")
|
||||
|
||||
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", spec.environment)
|
||||
self.assertIn(
|
||||
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
|
||||
spec.environment,
|
||||
)
|
||||
|
||||
def test_canary_env_visible_to_smolvm_guest(self):
|
||||
plan = _plan(canary=True)
|
||||
with patch.object(
|
||||
_launch._bundle,
|
||||
"bundle_host_port",
|
||||
return_value="65000",
|
||||
):
|
||||
stamped = _launch._discover_urls(plan, "127.0.0.16")
|
||||
|
||||
self.assertEqual(
|
||||
"fake-canary-value",
|
||||
stamped.guest_env["CANON_ALPHA_SECRET"],
|
||||
)
|
||||
|
||||
|
||||
class TestProvisionGitUser(unittest.TestCase):
|
||||
"""`provision_git` runs `git config --global` inside the
|
||||
|
||||
@@ -20,7 +20,6 @@ import supervise as _sv # noqa: E402 # type: ignore
|
||||
|
||||
from bot_bottle import supervise_server # noqa: E402
|
||||
from bot_bottle.supervise_server import (
|
||||
ERR_INTERNAL,
|
||||
ERR_INVALID_PARAMS,
|
||||
ERR_INVALID_REQUEST,
|
||||
ERR_METHOD_NOT_FOUND,
|
||||
@@ -30,9 +29,7 @@ from bot_bottle.supervise_server import (
|
||||
PROPOSED_FILE_FIELD,
|
||||
ServerConfig,
|
||||
TOOL_DEFINITIONS,
|
||||
_RpcClientError,
|
||||
_RpcError,
|
||||
_RpcInternalError,
|
||||
_response_timeout_from_env,
|
||||
format_response_text,
|
||||
handle_initialize,
|
||||
@@ -50,15 +47,15 @@ from bot_bottle.supervise_server import (
|
||||
|
||||
|
||||
class TestValidation(unittest.TestCase):
|
||||
def test_capability_block_accepts_anything_nonempty(self):
|
||||
validate_proposed_file(
|
||||
_sv.TOOL_CAPABILITY_BLOCK,
|
||||
"FROM python:3.13\nRUN apk add git\n",
|
||||
)
|
||||
|
||||
def test_empty_proposed_file_rejected_for_tools_with_file_field(self):
|
||||
with self.assertRaises(_RpcError):
|
||||
validate_proposed_file(_sv.TOOL_EGRESS_ALLOW, " \n\t")
|
||||
|
||||
def test_capability_block_rejected_as_unknown_tool(self):
|
||||
with self.assertRaises(_RpcError) as cm:
|
||||
validate_proposed_file("capability-block", "FROM python:3.13\n")
|
||||
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
|
||||
self.assertIn("unknown tool", cm.exception.message)
|
||||
validate_proposed_file(_sv.TOOL_CAPABILITY_BLOCK, " \n\t")
|
||||
|
||||
def test_egress_routes_yaml_is_validated(self):
|
||||
validate_proposed_file(
|
||||
@@ -70,74 +67,6 @@ class TestValidation(unittest.TestCase):
|
||||
with self.assertRaises(_RpcError):
|
||||
validate_proposed_file(_sv.TOOL_EGRESS_BLOCK, "routes: nope\n")
|
||||
|
||||
def test_egress_routes_yaml_rejects_log_full(self):
|
||||
with self.assertRaises(_RpcError) as cm:
|
||||
validate_proposed_file(
|
||||
_sv.TOOL_EGRESS_ALLOW,
|
||||
"log: 2\nroutes:\n - host: example.com\n",
|
||||
)
|
||||
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
|
||||
self.assertIn("must not change egress logging", cm.exception.message)
|
||||
|
||||
|
||||
# --- Error taxonomy --------------------------------------------------------
|
||||
|
||||
|
||||
class TestRpcErrorTaxonomy(unittest.TestCase):
|
||||
def test_rpc_client_error_is_rpc_error(self):
|
||||
e = _RpcClientError(ERR_INVALID_PARAMS, "bad param")
|
||||
self.assertIsInstance(e, _RpcError)
|
||||
self.assertEqual(ERR_INVALID_PARAMS, e.code)
|
||||
self.assertEqual("bad param", e.message)
|
||||
|
||||
def test_rpc_internal_error_is_rpc_error(self):
|
||||
e = _RpcInternalError("disk full")
|
||||
self.assertIsInstance(e, _RpcError)
|
||||
self.assertEqual(ERR_INTERNAL, e.code)
|
||||
self.assertEqual("disk full", e.message)
|
||||
|
||||
def test_rpc_internal_error_preserves_cause(self):
|
||||
cause = OSError("no space left on device")
|
||||
try:
|
||||
raise _RpcInternalError("failed to write") from cause
|
||||
except _RpcInternalError as e:
|
||||
self.assertIs(cause, e.__cause__)
|
||||
|
||||
def test_parse_error_is_client_error(self):
|
||||
with self.assertRaises(_RpcClientError):
|
||||
parse_jsonrpc(b"{bad json")
|
||||
|
||||
def test_validation_error_is_client_error(self):
|
||||
with self.assertRaises(_RpcClientError):
|
||||
validate_proposed_file(_sv.TOOL_EGRESS_ALLOW, "routes: nope\n")
|
||||
|
||||
def test_unknown_tool_in_tools_call_is_client_error(self):
|
||||
config = ServerConfig(bottle_slug="dev", queue_dir=Path("/unused"))
|
||||
with self.assertRaises(_RpcClientError) as cm:
|
||||
handle_tools_call({"name": "no-such-tool", "arguments": {}}, config)
|
||||
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
|
||||
|
||||
|
||||
class TestRpcInternalErrorOnIoFailure(unittest.TestCase):
|
||||
def test_write_proposal_os_error_raises_internal(self):
|
||||
config = ServerConfig(
|
||||
bottle_slug="dev",
|
||||
queue_dir=Path("/dev/null/cannot-exist"),
|
||||
)
|
||||
with self.assertRaises(_RpcInternalError) as cm:
|
||||
handle_tools_call(
|
||||
{
|
||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
||||
"arguments": {
|
||||
"routes_yaml": "routes:\n - host: example.com\n",
|
||||
"justification": "x",
|
||||
},
|
||||
},
|
||||
config,
|
||||
)
|
||||
self.assertEqual(ERR_INTERNAL, cm.exception.code)
|
||||
self.assertIsNotNone(cm.exception.__cause__)
|
||||
|
||||
|
||||
# --- JSON-RPC parsing ------------------------------------------------------
|
||||
|
||||
@@ -219,6 +148,7 @@ class TestHandleToolsList(unittest.TestCase):
|
||||
self.assertEqual(
|
||||
sorted([
|
||||
_sv.TOOL_EGRESS_ALLOW,
|
||||
_sv.TOOL_CAPABILITY_BLOCK,
|
||||
_sv.TOOL_EGRESS_BLOCK,
|
||||
_sv.TOOL_LIST_EGRESS_ROUTES,
|
||||
]),
|
||||
@@ -294,10 +224,10 @@ class TestHandleToolsCall(unittest.TestCase):
|
||||
try:
|
||||
result = handle_tools_call(
|
||||
{
|
||||
"name": _sv.TOOL_EGRESS_BLOCK,
|
||||
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||
"arguments": {
|
||||
"routes_yaml": "routes:\n - host: example.com\n",
|
||||
"justification": "need example.com",
|
||||
"dockerfile": "FROM python:3.13\n",
|
||||
"justification": "need git",
|
||||
},
|
||||
},
|
||||
self.config,
|
||||
@@ -334,9 +264,9 @@ class TestHandleToolsCall(unittest.TestCase):
|
||||
try:
|
||||
result = handle_tools_call(
|
||||
{
|
||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
||||
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||
"arguments": {
|
||||
"routes_yaml": "routes:\n - host: example.com\n",
|
||||
"dockerfile": "FROM python:3.13\n",
|
||||
"justification": "needed for tests",
|
||||
},
|
||||
},
|
||||
@@ -358,52 +288,20 @@ class TestHandleToolsCall(unittest.TestCase):
|
||||
with self.assertRaises(_RpcError):
|
||||
handle_tools_call(
|
||||
{
|
||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
||||
"arguments": {"routes_yaml": "routes:\n - host: example.com\n"},
|
||||
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||
"arguments": {"dockerfile": "FROM python:3.13\n"},
|
||||
},
|
||||
self.config,
|
||||
)
|
||||
|
||||
def test_missing_name_raises(self):
|
||||
with self.assertRaises(_RpcError) as cm:
|
||||
handle_tools_call({"arguments": {}}, self.config)
|
||||
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
|
||||
|
||||
def test_arguments_must_be_object(self):
|
||||
with self.assertRaises(_RpcError) as cm:
|
||||
handle_tools_call(
|
||||
{
|
||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
||||
"arguments": [],
|
||||
},
|
||||
self.config,
|
||||
)
|
||||
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
|
||||
self.assertIn("must be an object", cm.exception.message)
|
||||
|
||||
def test_capability_block_call_raises_unknown_tool(self):
|
||||
with self.assertRaises(_RpcError) as cm:
|
||||
handle_tools_call(
|
||||
{
|
||||
"name": "capability-block",
|
||||
"arguments": {
|
||||
"dockerfile": "FROM python:3.13\n",
|
||||
"justification": "need git",
|
||||
},
|
||||
},
|
||||
self.config,
|
||||
)
|
||||
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
|
||||
self.assertIn("unknown tool", cm.exception.message)
|
||||
|
||||
def test_archives_proposal_after_response(self):
|
||||
responder = self._respond_when_proposal_appears(_sv.STATUS_APPROVED)
|
||||
try:
|
||||
handle_tools_call(
|
||||
{
|
||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
||||
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||
"arguments": {
|
||||
"routes_yaml": "routes:\n - host: example.com\n",
|
||||
"dockerfile": "FROM python:3.13\n",
|
||||
"justification": "x",
|
||||
},
|
||||
},
|
||||
@@ -425,10 +323,10 @@ class TestHandleToolsCall(unittest.TestCase):
|
||||
)
|
||||
result = handle_tools_call(
|
||||
{
|
||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
||||
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||
"arguments": {
|
||||
"routes_yaml": "routes:\n - host: example.com\n",
|
||||
"justification": "need egress",
|
||||
"dockerfile": "FROM python:3.13\n",
|
||||
"justification": "need a capability",
|
||||
},
|
||||
},
|
||||
config,
|
||||
@@ -443,31 +341,6 @@ class TestHandleToolsCall(unittest.TestCase):
|
||||
|
||||
|
||||
class TestHandleListEgressRoutes(unittest.TestCase):
|
||||
def test_success_returns_body_text(self):
|
||||
class _Resp:
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type: type[BaseException] | None, exc: BaseException | None, tb: object) -> bool:
|
||||
return False
|
||||
|
||||
def read(self):
|
||||
return b"[{\"host\": \"example.com\"}]"
|
||||
|
||||
class _Opener:
|
||||
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore
|
||||
return _Resp()
|
||||
|
||||
with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()):
|
||||
result = handle_list_egress_routes(
|
||||
{},
|
||||
ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")),
|
||||
)
|
||||
|
||||
self.assertFalse(result["isError"]) # type: ignore[index]
|
||||
text = result["content"][0]["text"] # type: ignore[index]
|
||||
self.assertIn("example.com", text)
|
||||
|
||||
def test_url_error_returns_tool_error(self):
|
||||
class _Opener:
|
||||
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore
|
||||
@@ -527,13 +400,6 @@ class TestFormatResponseText(unittest.TestCase):
|
||||
self.assertIn("the operator modified", text.lower())
|
||||
|
||||
|
||||
class TestFormatPendingResponseText(unittest.TestCase):
|
||||
def test_formats_timeout_message(self):
|
||||
text = supervise_server.format_pending_response_text(12.5)
|
||||
self.assertIn("status: pending", text)
|
||||
self.assertIn("12.5s", text)
|
||||
|
||||
|
||||
# --- End-to-end HTTP sanity ------------------------------------------------
|
||||
|
||||
|
||||
@@ -584,7 +450,7 @@ class TestHttpEndToEnd(unittest.TestCase):
|
||||
self.assertEqual("2.0", result["jsonrpc"])
|
||||
self.assertEqual(1, result["id"])
|
||||
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
|
||||
self.assertNotIn("capability-block", names)
|
||||
self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
|
||||
self.assertIn(_sv.TOOL_EGRESS_ALLOW, names)
|
||||
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
|
||||
|
||||
@@ -594,26 +460,6 @@ class TestHttpEndToEnd(unittest.TestCase):
|
||||
)
|
||||
self.assertEqual(ERR_METHOD_NOT_FOUND, result["error"]["code"]) # type: ignore[index]
|
||||
|
||||
def test_internal_error_returns_err_internal_over_http(self):
|
||||
with patch.object(
|
||||
supervise_server._sv, "write_proposal",
|
||||
side_effect=OSError("disk full"),
|
||||
):
|
||||
result = self._post_jsonrpc({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 99,
|
||||
"method": "tools/call",
|
||||
"params": {
|
||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
||||
"arguments": {
|
||||
"routes_yaml": "routes:\n - host: example.com\n",
|
||||
"justification": "x",
|
||||
},
|
||||
},
|
||||
})
|
||||
self.assertIn("error", result)
|
||||
self.assertEqual(ERR_INTERNAL, result["error"]["code"]) # type: ignore[index]
|
||||
|
||||
def test_health_endpoint(self):
|
||||
conn = http.client.HTTPConnection("127.0.0.1", self.port, timeout=5)
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user