Compare commits

..

8 Commits

Author SHA1 Message Date
didericis-claude cadaa5dc25 refactor(types): move loaded manifest from BottleSpec to BottlePlan
lint / lint (push) Successful in 1m44s
test / unit (pull_request) Successful in 34s
test / integration (pull_request) Successful in 19s
BottleSpec.manifest was ManifestIndex | Manifest — a union encoding
two lifecycle stages in one field. The union was unjustifiable:
it forced a type-narrowing workaround (loaded_manifest property)
on every consumer.

Clean split:
- BottleSpec.manifest: ManifestIndex (always; CLI-supplied intent)
- BottlePlan.manifest: Manifest (always; loaded by _validate())

_validate() returns the loaded Manifest directly. prepare() passes
it to _resolve_plan(), which stores it on the plan. All provisioner
code now reads plan.manifest.agent / plan.manifest.bottle — no
union, no asserts, no type: ignore.
2026-06-23 02:22:10 +00:00
didericis-claude 910b601a5e fix(types): add BottleSpec.loaded_manifest to satisfy pyright on union type
lint / lint (push) Successful in 1m43s
test / unit (pull_request) Successful in 36s
test / integration (pull_request) Successful in 18s
BottleSpec.manifest is ManifestIndex | Manifest (pre/post _validate()).
Downstream code always runs post-validate so it needs Manifest, but
pyright flagged every .agent/.bottle access. The new loaded_manifest
property asserts isinstance and returns Manifest, giving pyright a
narrowed type without scattering type: ignore everywhere.

Also remove unused Manifest imports from test files and annotate the
_index() helper in test_manifest_agent_git_user.
2026-06-23 02:08:27 +00:00
didericis-claude fa0a5fbb9c refactor(manifest): split Manifest into ManifestIndex + Manifest single-value type
lint / lint (push) Failing after 1m42s
test / unit (pull_request) Successful in 35s
test / integration (pull_request) Successful in 18s
Manifest now holds exactly one agent and one effective bottle (with
git_user overlay already applied). The old multi-agent/bottle
collection is renamed ManifestIndex. BottleSpec.manifest starts as
ManifestIndex from the CLI and becomes Manifest after _validate()
calls load_for_agent(); all provisioning code downstream reads
spec.manifest.agent / spec.manifest.bottle instead of indexing by name.
2026-06-23 00:56:30 +00:00
didericis-claude cd93fc71f7 docs: clarify load_for_agent invariant in docstring
lint / lint (push) Successful in 1m47s
test / unit (pull_request) Successful in 31s
test / integration (pull_request) Successful in 17s
2026-06-22 19:54:41 +00:00
didericis-claude 3e94472e78 fix: load_for_agent always returns single-agent manifest
lint / lint (push) Successful in 1m42s
test / unit (pull_request) Successful in 32s
test / integration (pull_request) Successful in 19s
Filter to exactly one agent and one bottle in both the lazy (md-dirs)
and eager (from_json_obj) paths so the returned manifest invariant
holds regardless of how the manifest was constructed.
2026-06-22 19:54:21 +00:00
didericis-claude 0609813ba0 refactor: scan filenames at resolve, parse only selected agent at preflight
lint / lint (push) Successful in 1m37s
test / unit (pull_request) Successful in 32s
test / integration (pull_request) Successful in 17s
Manifest.resolve() now returns an empty-dict manifest with only directory
paths recorded (home_md, cwd_md). No content is read from any .md file
until load_for_agent() is called for a specific agent at preflight.

- Manifest.from_md_dirs: scan-only, no frontmatter parsing
- Manifest.load_for_agent: parses the selected agent file and its bottle
  chain; works on eager (from_json_obj) manifests too by returning self
- Manifest.all_agent_names: scans filenames in lazy mode
- backend._validate: calls load_for_agent and propagates upgraded spec
- cli/info.py, cli/list.py, cli/start.py: use load_for_agent / all_agent_names
- manifest_extends.py: reverted to original (no partial-resolve helpers)
- manifest_loader.py: only scan_agent_names + load_bottle_chain_from_dir
- Tests updated to call load_for_agent before accessing agents/bottles;
  test_md_agent_repos_deferred renamed to test_md_agent_repos_fails_at_preflight
2026-06-22 19:42:49 +00:00
didericis-claude 09e04359e3 fix: resolve pyright reportUnusedImport in manifest_extends
Import ManifestError at module level from manifest_util (no circular
dep) and remove the redundant local imports from function bodies that
were shadowing it. ManifestBottle retains its local import pattern to
avoid the circular manifest ↔ manifest_extends dependency.
2026-06-22 19:42:49 +00:00
didericis-claude fe32f65fa4 feat: defer broken manifest parse errors to preflight
Broken bottle/agent files no longer block the agent selector or prevent
unrelated agents from loading. Per-file parse errors are collected in
`Manifest.broken_agents`; the CLI selector includes them via
`all_agent_names`, and the error surfaces only when the specific agent
is selected and launch is attempted (in `require_agent`/`bottle_for`).

Closes #236
2026-06-22 19:42:49 +00:00
83 changed files with 329 additions and 5568 deletions
+2 -30
View File
@@ -14,8 +14,7 @@
## Features
- **Per-bottle egress allowlist** — TLS-bumped HTTP/HTTPS chokepoint with a per-manifest host allowlist; per-route path/method/header `matches` filtering; outbound DLP scanning for known tokens and secrets, inbound DLP scanning for prompt-injection attempts; DoH and arbitrary hosts blocked by default.
- **Per-route token-match policy** — each egress route picks what happens when the outbound DLP catches a token via `dlp.outbound_on_match`: `supervise` (default) holds the request and surfaces it in `./cli.py supervise` for approval (an approved value is remembered for the life of the proxy); `redact` scrubs the value and forwards; `block` is a hard `403`. Cuts false-positive friction without weakening default-deny.
- **Per-bottle egress allowlist** — TLS-bumped HTTP/HTTPS chokepoint with a per-manifest host allowlist and request-body DLP scanner; DoH and arbitrary hosts blocked by default.
- **Tokens the agent never sees** — host secrets live in a sidecar; the agent dials `http://sidecar:9099/<path>` and the proxy strips inbound `Authorization` and injects the real token before forwarding. `printenv` in the agent shows proxy URLs only.
- **Gitleaks-scanned push (git-gate)** — `bottle.git` remotes route through a per-bottle `git daemon` that gitleaks-scans incoming refs pre-receive and forwards clean refs upstream over SSH. The agent never holds the upstream credential.
- **Manifest-scoped skills + secrets** — each bottle declares its skills, env, git identity, remotes, and egress routes; unknown keys die at load.
@@ -107,15 +106,8 @@ egress:
routes:
- host: gitea.dideric.is
auth:
scheme: token # Bearer | token
scheme: token
token_ref: BOT_BOTTLE_GITEA_TOKEN
matches: # optional — restrict to specific paths/methods/headers
- paths:
- {type: prefix, value: /api/v1/}
methods: [GET, POST, PATCH, DELETE]
dlp: # optional — per-route detector overrides (default: all on)
outbound_detectors: [token_patterns, known_secrets]
inbound_detectors: false # disable response scanning for this host
---
The `gitea-dev` bottle. Provider auth via the inherited Claude route;
@@ -134,26 +126,6 @@ skills:
You help maintain Gitea-hosted projects.
````
**Egress route fields:**
| Field | Required | Description |
|---|---|---|
| `host` | yes | Hostname to allowlist. One entry per host. |
| `role` | no | Reserved for future use. The key is recognised but any value is currently rejected at load. Provider auth routes (e.g. Claude's `api.anthropic.com`) are injected automatically from `agent_provider.auth_token`, not via `role`. |
| `auth.scheme` | when `auth` present | `Bearer` or `token`. Injected by the proxy; the agent never sees the value. |
| `auth.token_ref` | when `auth` present | Env-var name holding the secret on the host. |
| `matches` | no | Array of `{paths, methods, headers}` filters. A request must match at least one entry (if any are given) to be forwarded. |
| `matches[].paths` | no | Array of `{type, value}`. `type` is `prefix` (default), `exact`, or `regex`. |
| `matches[].methods` | no | Array of HTTP method strings, e.g. `[GET, POST]`. |
| `matches[].headers` | no | Array of `{name, value, type}`. `type` is `exact` (default) or `regex`. |
| `dlp` | no | Per-route DLP overrides. Omit to use defaults (all detectors on). |
| `dlp.outbound_detectors` | no | `false` disables outbound scanning; list restricts to named detectors (`token_patterns`, `known_secrets`). |
| `dlp.inbound_detectors` | no | `false` disables inbound scanning; list restricts to named detectors (`naive_injection_detection`). |
| `dlp.outbound_on_match` | no | What to do when an outbound token is detected: `supervise` (default for manifest routes — hold for operator approval), `redact` (scrub the value and forward), or `block` (hard 403). Agent-provider routes (e.g. `api.anthropic.com`) default to `redact`. |
| `git.fetch` | no | `true` permits smart HTTP clone/fetch (`git-upload-pack`) for this host. Push (`git-receive-pack`) remains blocked. |
When an outbound DLP detector matches a token, the route's `dlp.outbound_on_match` policy decides what happens. Under the default `supervise`, the proxy queues an `egress-token-allow` proposal for the operator's `./cli.py supervise` TUI and holds the request open until it is answered (or `EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS`, default 300s, elapses — after which it fails closed). The operator never sees the raw token, only the host, method, path, and a redacted snippet; approving adds the value to an in-memory safelist for the life of the egress proxy. Under `redact`, the matched value is scrubbed from the body, headers, and path and the request is forwarded (failing closed if a match lands somewhere unredactable, like the hostname). Under `block` it stays a hard `403`. Structural blocks (CRLF injection) and not-in-allowlist host blocks are always hard `403`s regardless of policy.
More examples in `examples/`. Full design lives under `docs/prds/`; the trust-boundary rationale is in `docs/prds/0011-per-file-md-manifest.md`.
## Trademarks
+2 -10
View File
@@ -61,6 +61,7 @@ class AgentProviderRuntime:
prompt_mode: PromptMode
bypass_args: tuple[str, ...]
resume_args: tuple[str, ...]
remote_control_args: tuple[str, ...]
@dataclass(frozen=True)
@@ -370,15 +371,6 @@ def build_agent_provision_plan(
)
def provider_startup_args(
provider_settings: dict[str, object] | None,
) -> tuple[str, ...]:
raw = (provider_settings or {}).get("startup_args", ())
if not isinstance(raw, (list, tuple)):
return ()
return tuple(arg for arg in raw if isinstance(arg, str))
def prompt_args(
prompt_mode: PromptMode,
prompt_path: str | None,
@@ -390,7 +382,7 @@ def prompt_args(
if prompt_mode == "append_file":
return ["--append-system-prompt-file", prompt_path]
if prompt_mode == "read_prompt_file":
if argv and ("resume" in argv or "remote-control" in argv):
if argv and "resume" in argv:
return []
return [f"Read and follow the instructions in {prompt_path}."]
if prompt_mode == "print_read_prompt_file":
+2 -9
View File
@@ -109,8 +109,9 @@ class BottlePlan(ABC):
def workspace_plan(self) -> WorkspacePlan:
return workspace_plan(self.spec, guest_home=self.guest_home)
def print(self) -> None:
def print(self, *, remote_control: bool) -> None:
"""Render the y/N preflight summary to stderr."""
del remote_control
spec = self.spec
manifest = self.manifest
agent = manifest.agent
@@ -525,11 +526,6 @@ from .docker import DockerBottleBackend # noqa: E402 # pylint: disable=wrong-i
from .macos_container import MacosContainerBottleBackend # noqa: E402 # pylint: disable=wrong-import-position
from .smolmachines import SmolmachinesBottleBackend # noqa: E402 # pylint: disable=wrong-import-position
# Freezer is imported after the backend classes for the same reason:
# Freezer.commit_slug constructs ActiveAgent, which must be fully
# defined first.
from .freeze import CommitCancelled, Freezer, get_freezer # noqa: E402 # pylint: disable=wrong-import-position
# The dict is heterogeneous: each value is a BottleBackend specialized
# over its own plan type. Concrete plan types are erased here because
@@ -617,12 +613,9 @@ __all__ = [
"BottleCleanupPlan",
"BottlePlan",
"BottleSpec",
"CommitCancelled",
"ExecResult",
"Freezer",
"enumerate_active_agents",
"get_bottle_backend",
"get_freezer",
"has_backend",
"known_backend_names",
]
+3 -5
View File
@@ -28,8 +28,6 @@ from typing import Any
from ...egress import (
EGRESS_HOSTNAME,
EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_sidecar_env_entries,
)
from ...git_gate import GIT_GATE_HOSTNAME
from ...log import die, warn
@@ -136,8 +134,9 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
ep = plan.egress_plan
volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER))
if ep.routes:
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
env.extend(egress_sidecar_env_entries(ep))
volumes.append(_bind(ep.routes_path, EGRESS_ROUTES_IN_CONTAINER))
for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
# --- git-gate -----------------------------------------------------
gp = plan.git_gate_plan
@@ -221,7 +220,6 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
# never lands on argv or in the compose file.
for name in sorted(plan.forwarded_env.keys()):
env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
service: dict[str, Any] = {
"image": plan.image,
+18 -29
View File
@@ -1,21 +1,24 @@
"""Host-side helper for egress sidecar inspection and live updates.
"""Host-side helper for egress sidecar inspection (issue #198).
The approve path uses this module to validate a proposed routes file,
write it to the bottle's live egress state dir, and signal the sidecar
bundle so the mitmproxy addon reloads it.
`_merge_single_route`, `add_route`, and `apply_routes_change` were
removed when the egress-block MCP tool was dropped. The remaining
helpers support runtime inspection and validation of the routes file
without modifying it at runtime.
"""
from __future__ import annotations
import os
import subprocess
from ...egress import EGRESS_ROUTES_IN_CONTAINER
from ...log import warn
from ..egress_apply import EgressApplicator, EgressApplyError
from ...egress_addon_core import load_routes
from .sidecar_bundle import sidecar_bundle_container_name
class EgressApplyError(RuntimeError):
pass
def fetch_current_routes(slug: str) -> str:
container = sidecar_bundle_container_name(slug)
r = subprocess.run(
@@ -30,31 +33,17 @@ def fetch_current_routes(slug: str) -> str:
return r.stdout
class DockerEgressApplicator(EgressApplicator):
def _signal_bundle_reload(self, slug: str) -> None:
container = sidecar_bundle_container_name(slug)
result = subprocess.run(
["docker", "kill", "--signal", "HUP", container],
capture_output=True, text=True, check=False, env=os.environ,
)
if result.returncode != 0:
last_error = (result.stderr or "").strip() or (result.stdout or "").strip()
warn(
f"egress: routes updated on disk for {slug}, but bundle reload failed: "
f"{last_error or 'docker kill failed'}"
)
raise EgressApplyError(
f"could not reload egress bundle {container}: "
f"{last_error or 'docker kill failed'}"
)
applicator = DockerEgressApplicator()
def validate_routes_content(content: str) -> None:
try:
load_routes(content)
except ValueError as e:
raise EgressApplyError(
f"proposed routes.yaml is not valid: {e}"
) from e
__all__ = [
"DockerEgressApplicator",
"EgressApplyError",
"applicator",
"fetch_current_routes",
"validate_routes_content",
]
-23
View File
@@ -1,23 +0,0 @@
"""DockerFreezer — snapshot a Docker bottle via `docker commit`."""
from __future__ import annotations
from .. import ActiveAgent
from ..freeze import Freezer
from .util import commit_container
from ...log import info
class DockerFreezer(Freezer):
"""Freezes a Docker bottle by running `docker commit`."""
backend_name = "docker"
def _freeze(self, agent: ActiveAgent) -> str:
container = f"bot-bottle-{agent.slug}"
image_tag = f"bot-bottle-committed-{agent.slug}:latest"
commit_container(container, image_tag)
return image_tag
def _export_hint(self, slug: str, image_ref: str) -> None:
info(f"to export for migration: docker save {image_ref} -o {slug}.tar")
+7 -18
View File
@@ -47,7 +47,6 @@ from ...bottle_state import (
bottle_state_dir,
egress_state_dir,
git_gate_state_dir,
read_committed_image,
)
from .compose import (
bottle_plan_to_compose,
@@ -92,22 +91,12 @@ def launch(
)
try:
# Step 1: agent image. Use a committed snapshot when one exists
# and is present in the local daemon; otherwise build from the
# Dockerfile. Sidecar images get built lazily by `docker compose
# up` via the renderer's `build:` directives.
committed = read_committed_image(plan.slug)
if committed and docker_mod.image_exists(committed):
info(f"using committed image {committed!r}")
plan = dataclasses.replace(
plan,
agent_provision=dataclasses.replace(plan.agent_provision, image=committed),
)
else:
docker_mod.build_image(
plan.image, _REPO_DIR,
dockerfile=plan.dockerfile_path,
)
# Step 1: agent image build. Sidecar images get built lazily by
# `docker compose up` via the renderer's `build:` directives.
docker_mod.build_image(
plan.image, _REPO_DIR,
dockerfile=plan.dockerfile_path,
)
internal_network = network_mod.network_name_for_slug(plan.slug)
egress_network = network_mod.network_egress_name_for_slug(plan.slug)
@@ -187,7 +176,7 @@ def launch(
agent_command=plan.agent_command,
agent_prompt_mode=plan.agent_prompt_mode,
agent_provider_template=plan.agent_provider_template,
terminal_title=f"{plan.spec.label} ({plan.spec.agent_name})" if plan.spec.label else plan.spec.agent_name,
terminal_title=plan.spec.label or plan.spec.agent_name,
terminal_color=plan.spec.color,
agent_workdir=plan.workspace_plan.workdir,
)
-15
View File
@@ -152,21 +152,6 @@ def build_image(ref: str, context: str, *, dockerfile: str = "") -> None:
# )
def commit_container(container_name: str, image_tag: str) -> None:
"""Run `docker commit <container_name> <image_tag>` to snapshot the
running container's filesystem state as a local Docker image."""
result = subprocess.run(
["docker", "commit", container_name, image_tag],
capture_output=True, text=True, check=False,
)
if result.returncode != 0:
die(
f"docker commit {container_name!r}{image_tag!r} failed: "
f"{(result.stderr or '').strip() or '<no stderr>'}"
)
info(f"committed {container_name!r}{image_tag!r}")
def image_id(ref: str) -> str:
"""Return the content-addressed image ID (e.g.
`sha256:abcd...`) for `ref`. The smolmachines backend keys its
-54
View File
@@ -1,54 +0,0 @@
"""Shared base class for host-side egress apply across backends.
Each backend subclasses EgressApplicator and overrides _signal_bundle_reload
with the backend-specific kill command.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
from ..bottle_state import egress_state_dir
from ..egress import EGRESS_ROUTES_FILENAME
from ..egress_addon_core import LOG_OFF, load_config
class EgressApplyError(RuntimeError):
pass
class EgressApplicator(ABC):
def apply_routes_change(self, slug: str, content: str) -> tuple[str, str]:
"""Persist `content` to the live routes file and reload egress."""
self.validate_routes_content(content)
routes_path = self._routes_path(slug)
routes_path.parent.mkdir(parents=True, exist_ok=True)
before = routes_path.read_text(encoding="utf-8") if routes_path.exists() else ""
routes_path.write_text(content, encoding="utf-8")
routes_path.chmod(0o600)
self._signal_bundle_reload(slug)
return before, content
@staticmethod
def validate_routes_content(content: str) -> None:
try:
config = load_config(content)
except ValueError as e:
raise EgressApplyError(
f"proposed routes.yaml is not valid: {e}"
) from e
if config.log != LOG_OFF:
raise EgressApplyError(
"proposed routes.yaml must not change egress logging"
)
@staticmethod
def _routes_path(slug: str) -> Path:
return egress_state_dir(slug) / EGRESS_ROUTES_FILENAME
@abstractmethod
def _signal_bundle_reload(self, slug: str) -> None: ...
__all__ = ["EgressApplicator", "EgressApplyError"]
-100
View File
@@ -1,100 +0,0 @@
"""Freezer — snapshot a running bottle to a resumable artifact.
Follows the same pattern as BottleBackend: a shared base class with
common post-freeze steps (write committed-image path, mark preserved,
print resume hint) and backend-specific subclasses in their respective
backend directories.
Entry points:
Freezer.commit(agent) freeze by ActiveAgent
Freezer.commit_slug(slug) convenience wrapper for cmd_commit
get_freezer(backend_name) factory
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from . import ActiveAgent
from ..bottle_state import mark_preserved, write_committed_image
from ..log import die, info
class CommitCancelled(Exception):
"""Raised by Freezer._freeze when the user declines a confirmation prompt."""
class Freezer(ABC):
"""Freezes a running bottle to a resumable artifact.
The base class owns the shared post-commit steps:
- write_committed_image records the artifact path in per-bottle state
- mark_preserved prevents teardown from removing the state dir
- resume hint printed to stderr after the snapshot
Subclasses implement _freeze with the backend-specific snapshot
operation and optionally override _export_hint for migration hints.
"""
backend_name: str
def commit(self, agent: ActiveAgent) -> None:
"""Freeze the bottle for `agent` to a resumable artifact.
Calls _freeze for the backend-specific snapshot, then writes the
committed image reference to per-bottle state and marks the bottle
preserved so the next `./cli.py resume` boots from the snapshot.
Raises CommitCancelled if the user declines an interactive
confirmation prompt (e.g. the macos-container stop prompt).
"""
image_ref = self._freeze(agent)
write_committed_image(agent.slug, image_ref)
mark_preserved(agent.slug)
info(f"to resume from this snapshot: ./cli.py resume {agent.slug}")
self._export_hint(agent.slug, image_ref)
@abstractmethod
def _freeze(self, agent: ActiveAgent) -> str:
"""Backend-specific snapshot. Returns the image tag or artifact path
stored by write_committed_image. Raises CommitCancelled if the user
declines a stop-confirmation prompt."""
def _export_hint(self, slug: str, image_ref: str) -> None:
"""Optionally print an export-for-migration hint after committing.
Overridden by backends that provide a meaningful export command."""
def commit_slug(self, slug: str) -> None:
"""Convenience entry for cmd_commit when only a slug is available."""
from ..bottle_state import read_metadata
metadata = read_metadata(slug)
agent = ActiveAgent(
backend_name=self.backend_name,
slug=slug,
agent_name=metadata.agent_name if metadata else "",
started_at=metadata.started_at if metadata else "",
services=(),
)
self.commit(agent)
def get_freezer(backend_name: str) -> Freezer:
"""Return the Freezer for the named backend.
backend_name "" is treated as "docker" for backward compatibility
with state dirs written before the backend field was added."""
resolved = backend_name or "docker"
if resolved == "docker":
from .docker.freezer import DockerFreezer
return DockerFreezer()
if resolved == "macos-container":
from .macos_container.freezer import MacosContainerFreezer
return MacosContainerFreezer()
if resolved == "smolmachines":
from .smolmachines.freezer import SmolmachinesFreezer
return SmolmachinesFreezer()
die(
f"commit is only supported for docker, macos-container, and "
f"smolmachines; backend {backend_name!r} has no freezer"
)
raise AssertionError("unreachable")
+5 -45
View File
@@ -2,41 +2,12 @@
from __future__ import annotations
import os
import subprocess
import sys
from typing import Callable, cast
from ...agent_provider import PromptMode, prompt_args
from .. import Bottle, ExecResult
from ..terminal import exec_shell_script
from . import pty_forward as _pty_forward
_PTY_FORWARD_SCRIPT = _pty_forward.__file__
_TERMINAL_ENV_NAMES = (
"TERM",
"COLORTERM",
"TERM_PROGRAM",
"TERM_PROGRAM_VERSION",
"KITTY_WINDOW_ID",
"KITTY_PID",
"WEZTERM_PANE",
"WEZTERM_UNIX_SOCKET",
"GHOSTTY_BIN_DIR",
"GHOSTTY_RESOURCES_DIR",
"ITERM_SESSION_ID",
"VTE_VERSION",
"KONSOLE_VERSION",
"ALACRITTY_WINDOW_ID",
)
def _terminal_env_names() -> tuple[str, ...]:
return tuple(
name for name in _TERMINAL_ENV_NAMES
if name == "TERM" or os.environ.get(name)
)
class MacosContainerBottle(Bottle):
@@ -73,24 +44,13 @@ class MacosContainerBottle(Bottle):
argv=full_argv,
)
)
container_exec = ["container", "exec"]
cmd = ["container", "exec"]
if tty:
container_exec.extend(["--interactive", "--tty"])
# Forward terminal capability hints so TUIs can enable modified-key
# protocols. Use bare env names: values stay in the child env, not
# on argv, and pty_forward supplies a TERM fallback when needed.
for name in _terminal_env_names():
container_exec.extend(["--env", name])
cmd.extend(["--interactive", "--tty"])
if self.agent_workdir and self.agent_workdir != "/home/node":
container_exec.extend(["--workdir", self.agent_workdir])
container_exec.extend([self.name, self.agent_command, *full_argv])
if tty:
# Wrap with the raw-mode forwarder: container exec does not put
# the host terminal into raw mode itself, so the line discipline
# buffers modifier-key sequences until CR. The wrapper sets raw
# mode before exec and restores it on exit.
return [sys.executable, _PTY_FORWARD_SCRIPT, "--", *container_exec]
return container_exec
cmd.extend(["--workdir", self.agent_workdir])
cmd.extend([self.name, self.agent_command, *full_argv])
return cmd
def exec_agent(self, argv: list[str], *, tty: bool = True) -> int:
agent_argv = self.agent_argv(argv, tty=tty)
@@ -1,39 +0,0 @@
"""Host-side egress apply for the macos-container backend.
Uses `container kill --signal HUP` (Apple Container framework) instead
of `docker kill` to signal the sidecar bundle.
"""
from __future__ import annotations
import os
import subprocess
from ...log import warn
from ..egress_apply import EgressApplicator, EgressApplyError
from .launch import sidecar_container_name
class MacOSContainerEgressApplicator(EgressApplicator):
def _signal_bundle_reload(self, slug: str) -> None:
container = sidecar_container_name(slug)
result = subprocess.run(
["container", "kill", "--signal", "HUP", container],
capture_output=True, text=True, check=False, env=os.environ,
)
if result.returncode != 0:
last_error = (result.stderr or "").strip() or (result.stdout or "").strip()
warn(
f"egress: routes updated on disk for {slug}, but bundle reload failed: "
f"{last_error or 'container kill failed'}"
)
raise EgressApplyError(
f"could not reload egress bundle {container}: "
f"{last_error or 'container kill failed'}"
)
applicator = MacOSContainerEgressApplicator()
__all__ = ["MacOSContainerEgressApplicator", "EgressApplyError", "applicator"]
@@ -1,31 +0,0 @@
"""MacosContainerFreezer — snapshot a macOS container bottle.
Apple Container removes containers when they stop, making stop-then-export
impossible. Instead, commit_container execs into the running container and
streams the root filesystem via tar. The bottle continues running after commit.
"""
from __future__ import annotations
from .. import ActiveAgent
from ..freeze import Freezer
from .util import commit_container
from ...log import info
class MacosContainerFreezer(Freezer):
"""Freezes a macOS-container bottle via exec-tar + image rebuild."""
backend_name = "macos-container"
def _freeze(self, agent: ActiveAgent) -> str:
container = f"bot-bottle-{agent.slug}"
image_tag = f"bot-bottle-committed-{agent.slug}:latest"
commit_container(container, image_tag)
return image_tag
def _export_hint(self, slug: str, image_ref: str) -> None:
info(
f"to export for migration: "
f"container image save {image_ref} -o {slug}.tar"
)
+22 -28
View File
@@ -12,22 +12,14 @@ from __future__ import annotations
import dataclasses
import os
import shutil
import subprocess
from contextlib import ExitStack, contextmanager
from pathlib import Path
from typing import Callable, Generator
from ...bottle_state import (
egress_state_dir,
git_gate_state_dir,
read_committed_image,
)
from ...egress import (
EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values,
egress_sidecar_env_entries,
)
from ...bottle_state import egress_state_dir, git_gate_state_dir
from ...egress import EGRESS_ROUTES_IN_CONTAINER, egress_resolve_token_values
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import die, info, warn
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
@@ -92,7 +84,7 @@ def launch(
try:
plan = _mint_certs(plan)
plan = _build_images(plan)
_build_images(plan)
internal_network = internal_network_name(plan.slug)
egress_network = egress_network_name(plan.slug)
@@ -120,7 +112,7 @@ def launch(
agent_command=plan.agent_command,
agent_prompt_mode=plan.agent_prompt_mode,
agent_provider_template=plan.agent_provider_template,
terminal_title=f"{plan.spec.label} ({plan.spec.agent_name})" if plan.spec.label else plan.spec.agent_name,
terminal_title=plan.spec.label or plan.spec.agent_name,
terminal_color=plan.spec.color,
agent_workdir=plan.workspace_plan.workdir,
)
@@ -143,28 +135,17 @@ def _mint_certs(plan: MacosContainerBottlePlan) -> MacosContainerBottlePlan:
return dataclasses.replace(plan, egress_plan=egress_plan)
def _build_images(plan: MacosContainerBottlePlan) -> MacosContainerBottlePlan:
def _build_images(plan: MacosContainerBottlePlan) -> None:
container_mod.build_image(
SIDECAR_BUNDLE_IMAGE,
_REPO_DIR,
dockerfile=SIDECAR_BUNDLE_DOCKERFILE,
)
committed = read_committed_image(plan.slug)
if committed and container_mod.image_exists(committed):
info(f"using committed image {committed!r}")
return dataclasses.replace(
plan,
agent_provision=dataclasses.replace(
plan.agent_provision,
image=committed,
),
)
container_mod.build_image(
plan.image,
_REPO_DIR,
dockerfile=plan.dockerfile_path,
)
return plan
def _create_networks(
@@ -333,6 +314,7 @@ def _agent_run_argv(
"container", "run",
"--name", plan.container_name,
"--detach",
"--rm",
"--network", internal_network,
]
for entry in _agent_env_entries(plan, sidecar_ip):
@@ -355,7 +337,9 @@ def _sidecar_daemons(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
env: list[str] = list(egress_sidecar_env_entries(plan.egress_plan))
env: list[str] = []
if plan.egress_plan.routes:
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
if plan.git_gate_plan.upstreams:
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
if plan.supervise_plan is not None:
@@ -380,7 +364,7 @@ def _sidecar_mounts(
))
if ep.routes:
mounts.append((
str(ep.routes_path.parent),
str(_stage_routes_dir(plan)),
str(Path(EGRESS_ROUTES_IN_CONTAINER).parent),
True,
))
@@ -391,6 +375,17 @@ def _sidecar_mounts(
return tuple(mounts)
def _stage_routes_dir(plan: MacosContainerBottlePlan) -> Path:
routes_dir = plan.stage_dir / "macos-container-egress"
routes_dir.mkdir(parents=True, exist_ok=True)
shutil.copyfile(
plan.egress_plan.routes_path,
routes_dir / Path(EGRESS_ROUTES_IN_CONTAINER).name,
)
return routes_dir
def _mount_spec(host_path: str, container_path: str, read_only: bool) -> str:
spec = f"type=bind,source={host_path},target={container_path}"
if read_only:
@@ -423,7 +418,6 @@ def _agent_env_entries(
env.append(f"{name}={value}")
for name in sorted(plan.forwarded_env.keys()):
env.append(name)
env.extend(egress_agent_env_entries(plan.egress_plan))
return tuple(env)
@@ -1,70 +0,0 @@
"""Host-side raw-mode wrapper for `container exec --interactive --tty`.
Apple's `container exec --interactive --tty` does not set the host terminal to
raw mode before starting its I/O relay. Without raw mode the kernel line
discipline buffers modifier-key escape sequences (e.g. Shift+Enter in
modifyOtherKeys mode produces \\x1b[13;2~) until a carriage-return arrives, so
they never reach Claude Code inside the container.
This module sets the host terminal to raw mode, spawns the inner argv (the
container exec command), and restores the original terminal attributes on
exit. When stdin is not a TTY (piped invocations, CI) it falls through to a
bare subprocess.run so callers do not need to special-case non-interactive
contexts.
Usage (the `--` separator is the API contract everything after it is the
inner command):
python pty_forward.py -- container exec --interactive --tty <name> <cmd>
"""
from __future__ import annotations
import os
import subprocess
import sys
import termios
import tty
def _inner_env() -> dict[str, str]:
env = dict(os.environ)
env.setdefault("TERM", "xterm-256color")
return env
def _run_inner(inner: list[str]) -> int:
return subprocess.run(inner, check=False, env=_inner_env()).returncode
def main(argv: list[str]) -> int:
"""Entry point. ``argv`` shape: ``-- <inner-argv...>``."""
if len(argv) < 2 or argv[0] != "--":
sys.stderr.write(
"usage: python pty_forward.py -- <container-exec-argv...>\n"
)
return 2
inner = argv[1:]
try:
fd = sys.stdin.fileno()
except OSError:
return _run_inner(inner)
if not os.isatty(fd):
return _run_inner(inner)
try:
old = termios.tcgetattr(fd)
except termios.error:
return _run_inner(inner)
try:
tty.setraw(fd)
return _run_inner(inner)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old)
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
@@ -8,7 +8,6 @@ import ipaddress
import platform
import shutil
import subprocess
import tempfile
import time
from typing import Iterable
@@ -68,63 +67,11 @@ def build_image(ref: str, context: str, *, dockerfile: str = "") -> None:
_ensure_builder_dns()
args = [_CONTAINER, "build", "-t", ref, "--dns", dns_server()]
if dockerfile:
# `container build` resolves -f relative to the current working
# directory, not the build context. Anchor a relative Dockerfile to
# the context so builds work from any cwd.
if not os.path.isabs(dockerfile):
dockerfile = os.path.join(context, dockerfile)
args.extend(["-f", dockerfile])
args.append(context)
subprocess.run(args, check=True)
def commit_container(container_name: str, image_tag: str) -> None:
"""Snapshot a running Apple Container as a local image.
`container export` requires a stopped container, but Apple Container
removes containers when they stop, making stop-then-export impossible.
Instead, exec into the running container as root and stream the root
filesystem out via tar, then build a new image from that archive.
The bottle continues running after commit.
"""
with tempfile.TemporaryDirectory(prefix="bot-bottle-container-commit.") as tmp:
rootfs_tar = os.path.join(tmp, "rootfs.tar")
dockerfile = os.path.join(tmp, "Dockerfile")
with open(rootfs_tar, "wb") as tar_out:
result = subprocess.run(
[
_CONTAINER, "exec",
"--user", "root",
container_name,
"tar", "--create",
"--exclude=./proc",
"--exclude=./sys",
"--exclude=./dev",
"--exclude=./run",
"--file=-",
"--directory=/",
".",
],
stdout=tar_out,
stderr=subprocess.PIPE,
check=False,
)
if result.returncode != 0:
die(
f"container exec tar {container_name!r} failed: "
f"{(result.stderr or b'').decode().strip() or '<no stderr>'}"
)
with open(dockerfile, "w", encoding="utf-8") as f:
f.write(
"FROM scratch\n"
"ADD rootfs.tar /\n"
"USER node\n"
"WORKDIR /home/node\n"
)
build_image(image_tag, tmp, dockerfile=dockerfile)
info(f"committed {container_name!r}{image_tag!r}")
def _ensure_builder_dns() -> None:
dns = dns_server()
status = _builder_status()
@@ -271,36 +218,6 @@ def container_exists(name: str) -> bool:
return name in {line.strip() for line in result.stdout.splitlines()}
def container_is_running(name: str) -> bool:
"""Return True if the named container is currently running.
`container list` without `--all` lists only running containers."""
result = subprocess.run(
[_CONTAINER, "list", "--quiet"],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
return False
return name in {line.strip() for line in result.stdout.splitlines()}
def stop_container(name: str) -> None:
"""Stop the named container without deleting it."""
result = subprocess.run(
[_CONTAINER, "stop", name],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
die(
f"container stop {name!r} failed: "
f"{(result.stderr or '').strip() or '<no stderr>'}"
)
def force_remove_container(name: str) -> None:
if container_exists(name):
subprocess.run(
+1 -6
View File
@@ -145,12 +145,7 @@ class SmolmachinesBottle(Bottle):
script = exec_shell_script(agent_argv, self.terminal_title, self.terminal_color) if tty else None
if script is None:
return subprocess.run(agent_argv, check=False).returncode
# Use sh -c (not -lc) so the script inherits PATH from the calling
# process. sh -l sources login-shell init files (e.g. /etc/profile)
# which may NOT include smolvm's location when it was installed via
# homebrew. The calling process (./cli.py) already has smolvm on PATH
# (provision steps succeed), so -c is sufficient.
return subprocess.run(["sh", "-c", script], check=False).returncode
return subprocess.run(["sh", "-lc", script], check=False).returncode
# smolvm/libkrun can SIGKILL an otherwise-normal exec during
# early-VM provisioning. Retry once after a short settle so
@@ -1,21 +0,0 @@
"""Egress apply for the smolmachines backend.
The smolmachines sidecar bundle runs as a host-side Docker container,
so egress signalling is identical to the docker backend.
"""
from __future__ import annotations
from ..docker.egress_apply import ( # noqa: F401
DockerEgressApplicator,
EgressApplyError,
applicator,
fetch_current_routes,
)
__all__ = [
"DockerEgressApplicator",
"EgressApplyError",
"applicator",
"fetch_current_routes",
]
-145
View File
@@ -1,145 +0,0 @@
"""SmolmachinesFreezer — snapshot a smolmachines bottle.
`smolvm pack create --from-vm` requires the VM to be stopped, and smolvm
removes VMs when stopped (same issue as Apple Container). Instead, exec
into the running VM as root to write a gzip-compressed tar of the root
filesystem to /var/tmp, then copy it to the host with `smolvm machine cp`,
build a Docker image from the archive, convert it to a smolmachine artifact
via the existing registry pipeline, and record the sidecar path. The VM
stays running throughout."""
from __future__ import annotations
import tempfile
from pathlib import Path
from .. import ActiveAgent
from ..freeze import Freezer
from ..docker import util as docker_mod
from .local_registry import crane_push_tarball, ephemeral_registry
from .smolvm import machine_cp, machine_exec, pack_create
from ...bottle_state import bottle_state_dir
from ...log import die, info
# Temp file written inside the VM during commit. Lives in /var/tmp
# (on-disk, unlike tmpfs /tmp) to survive for machine_cp.
_VM_COMMIT_TAR = "/var/tmp/.bot-bottle-commit.tar.gz"
class SmolmachinesFreezer(Freezer):
"""Freezes a smolmachines bottle via exec-tar + Docker image + smolmachine pack.
The VM is NOT stopped. We exec into the running VM to write a compressed
tar of the root filesystem to /var/tmp, copy it to the host with
machine_cp, build a Docker image (Docker's ADD decompresses .tar.gz
automatically), then run the same imageregistrypack_create pipeline
that _ensure_smolmachine uses for fresh builds."""
backend_name = "smolmachines"
def _freeze(self, agent: ActiveAgent) -> str:
machine = f"bot-bottle-{agent.slug}"
image_ref = f"bot-bottle-committed-{agent.slug}:latest"
output_dir = bottle_state_dir(agent.slug)
output_dir.mkdir(parents=True, exist_ok=True)
binary = output_dir / "committed-smolmachine"
sidecar = output_dir / "committed-smolmachine.smolmachine"
_snapshot_running_vm(machine, image_ref, binary)
return str(sidecar)
def _export_hint(self, slug: str, image_ref: str) -> None:
info(f"to export for migration: cp {image_ref} {slug}.smolmachine")
def _snapshot_running_vm(machine: str, image_ref: str, binary: Path) -> None:
"""Exec-tar the running VM, build a Docker image, and pack to a smolmachine.
binary: destination for the launcher (sibling .smolmachine is the artifact
that machine_create --from consumes, same convention as pack_create).
"""
with tempfile.TemporaryDirectory(prefix="bot-bottle-vm-commit.") as tmp:
tmp_path = Path(tmp)
# Use .tar.gz — Docker ADD decompresses automatically and the
# compressed archive fits in the VM's /var/tmp more easily.
rootfs_tar_gz = tmp_path / "rootfs.tar.gz"
dockerfile = tmp_path / "Dockerfile"
_exec_tar_to_file(machine, rootfs_tar_gz)
dockerfile.write_text(
"FROM scratch\n"
"ADD rootfs.tar.gz /\n"
"USER node\n"
"WORKDIR /home/node\n"
)
docker_mod.build_image(image_ref, str(tmp_path), dockerfile=str(dockerfile))
image_tarball = binary.parent / "committed.image.tar"
docker_mod.save(image_ref, str(image_tarball))
try:
with ephemeral_registry() as handle:
digest = docker_mod.image_id(image_ref).split(":", 1)[-1][:16]
push_ref = f"{handle.push_endpoint}/bot-bottle-committed:{digest}"
pack_ref = f"{handle.pull_endpoint}/bot-bottle-committed:{digest}"
crane_push_tarball(handle, str(image_tarball), push_ref)
pack_create(pack_ref, binary)
finally:
image_tarball.unlink(missing_ok=True)
def _exec_tar_to_file(machine: str, dest: Path) -> None:
"""Snapshot the running VM's root filesystem to dest (.tar.gz).
Writes a gzip-compressed tar to _VM_COMMIT_TAR inside the VM via
machine_exec (same mechanism as provisioning), then copies it to the
host with machine_cp. This avoids binary-stdout piping through the
smolvm exec channel, which does not reliably handle large binary output.
A connectivity probe (machine_exec true) runs first so a concurrent-exec
limitation (smolvm may reject a second exec while -i -t is active) is
reported clearly rather than as a silent failure."""
# Connectivity probe — if smolvm rejects concurrent exec while an
# interactive session is running, fail clearly here.
probe = machine_exec(machine, ["true"])
if probe.returncode != 0:
die(
f"smolvm exec is not available for {machine!r} "
f"(exit {probe.returncode}: {probe.stderr.strip() or probe.stdout.strip() or '<no output>'}). "
f"If an interactive session is active, smolvm may not support concurrent exec."
)
# Create the compressed tar inside the VM.
# tar exits 1 when files change during archiving (normal for a live
# filesystem); only treat exit > 1 as fatal.
tar_result = machine_exec(
machine,
[
"tar", "--create", "--gzip",
"--exclude=./proc",
"--exclude=./sys",
"--exclude=./dev",
"--exclude=./run",
# /tmp and /var/tmp are ephemeral. Their stale contents
# (e.g. /tmp/claude-<uid>) have uid remapped by smolvm's
# pack process, causing Claude Code to refuse to use them
# on resume. Exclude both; _init_vm recreates them with
# mkdir -p + correct ownership on every boot.
"--exclude=./tmp",
"--exclude=./var/tmp",
f"--file={_VM_COMMIT_TAR}",
"--directory=/",
".",
],
)
if tar_result.returncode > 1:
die(
f"smolvm exec tar {machine!r} failed (exit {tar_result.returncode}): "
f"{tar_result.stderr.strip() or tar_result.stdout.strip() or '<no output>'}"
)
# Copy from VM to host, then clean up.
try:
machine_cp(f"{machine}:{_VM_COMMIT_TAR}", str(dest))
finally:
machine_exec(machine, ["rm", "-f", _VM_COMMIT_TAR])
+18 -50
View File
@@ -23,9 +23,7 @@ from typing import Callable, Generator
from ...egress import (
EGRESS_ROUTES_IN_CONTAINER,
egress_agent_env_entries,
egress_resolve_token_values,
egress_sidecar_env_entries,
)
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
from ...util import expand_tilde
@@ -42,12 +40,8 @@ from ..docker.git_gate import (
GIT_GATE_HOOK_IN_CONTAINER,
)
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import info, warn
from ...bottle_state import (
egress_state_dir,
git_gate_state_dir,
read_committed_image,
)
from ...log import warn
from ...bottle_state import egress_state_dir, git_gate_state_dir
from . import loopback_alias as _loopback
from . import sidecar_bundle as _bundle
from . import smolvm as _smolvm
@@ -91,7 +85,14 @@ def launch(
plan = _start_bundle(plan, network, loopback_ip, stack)
plan = _discover_urls(plan, loopback_ip)
agent_from_path = _agent_from_path(plan)
# Build the agent image and pack it into a `.smolmachine`
# artifact (or hit the per-Dockerfile-digest cache). Runs
# here, not in prepare, so the docker-build output doesn't
# garble the dashboard's preflight modal.
agent_from_path = _ensure_smolmachine(
plan.agent_image,
dockerfile=plan.agent_dockerfile_path,
)
_launch_vm(plan, agent_from_path, loopback_ip, stack)
_init_vm(plan)
@@ -103,7 +104,7 @@ def launch(
agent_command=plan.agent_command,
agent_prompt_mode=plan.agent_prompt_mode,
agent_provider_template=plan.agent_provider_template,
terminal_title=f"{plan.spec.label} ({plan.spec.agent_name})" if plan.spec.label else plan.spec.agent_name,
terminal_title=plan.spec.label or plan.spec.agent_name,
terminal_color=plan.spec.color,
agent_workdir=plan.workspace_plan.workdir,
)
@@ -216,23 +217,16 @@ def _discover_urls(
agent_supervise_url = f"http://{loopback_ip}:{supervise_host_port}/"
existing_no_proxy = plan.guest_env.get("NO_PROXY", "localhost,127.0.0.1")
no_proxy = f"{existing_no_proxy},{loopback_ip}"
guest_env = {
**plan.guest_env,
"HTTPS_PROXY": agent_proxy_url,
"HTTP_PROXY": agent_proxy_url,
"https_proxy": agent_proxy_url,
"http_proxy": agent_proxy_url,
"NO_PROXY": no_proxy,
"no_proxy": no_proxy,
"NO_PROXY": f"{existing_no_proxy},{loopback_ip}",
}
if agent_git_gate_host:
guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}"
if agent_supervise_url:
guest_env["MCP_SUPERVISE_URL"] = agent_supervise_url
for entry in egress_agent_env_entries(plan.egress_plan):
name, value = entry.split("=", 1)
guest_env[name] = value
return dataclasses.replace(
plan,
@@ -281,16 +275,10 @@ def _init_vm(plan: SmolmachinesBottlePlan) -> None:
All folded into one sh -c to avoid back-to-back exec calls
immediately after machine_start (libkrun exec-channel race).
mkdir -p guards: when booting from a committed snapshot, /tmp and
/var/tmp are excluded from the archive (they're ephemeral and their
stale contents would have wrong uid after smolvm's uid remap). The
directories must be created before chown/chmod can set permissions.
wait_exec_ready polls until the exec channel is ready for the
subsequent provision calls, replacing the empirical sleep."""
_smolvm.machine_exec(plan.machine_name, [
"sh", "-c",
"mkdir -p /tmp /var/tmp && "
"chown -R node:node /home/node && "
"chown root:root /tmp /var/tmp && "
"chmod 1777 /tmp /var/tmp",
@@ -320,8 +308,12 @@ def _bundle_launch_spec(
ep = plan.egress_plan
volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True))
if ep.routes:
volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True))
env.extend(egress_sidecar_env_entries(ep))
volumes.append((str(ep.routes_path), EGRESS_ROUTES_IN_CONTAINER, True))
# Bare-name entries for upstream-token slots. Their values
# come from the docker-run subprocess env (inherited from
# the operator's shell), never landing on argv.
for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
# --- git-gate ---------------------------------------------
gp = plan.git_gate_plan
@@ -390,30 +382,6 @@ def _resolve_token_env(
return egress_resolve_token_values(plan.egress_plan.token_env_map, effective_env)
def _agent_from_path(plan: SmolmachinesBottlePlan) -> Path:
"""Return the `.smolmachine` artifact used for `machine create --from`.
Prefer a committed VM artifact when one is recorded and still
present. If the file was removed, fall back to the normal image
build + pack cache path.
"""
committed = read_committed_image(plan.slug)
if committed:
committed_path = Path(committed)
if committed_path.is_file():
info(f"using committed smolmachine {str(committed_path)!r}")
return committed_path
# Build the agent image and pack it into a `.smolmachine`
# artifact (or hit the per-Dockerfile-digest cache). Runs here,
# not in prepare, so the docker-build output doesn't garble the
# dashboard's preflight modal.
return _ensure_smolmachine(
plan.agent_image,
dockerfile=plan.agent_dockerfile_path,
)
def _ensure_smolmachine(image_ref: str, *, dockerfile: str = "") -> Path:
"""Build the agent docker image and convert it into a
`.smolmachine` artifact, caching the result under
-26
View File
@@ -25,7 +25,6 @@ smolvm binary."""
from __future__ import annotations
import json
import shutil
import subprocess
import time
@@ -95,16 +94,6 @@ def pack_create(image: str, output: Path) -> None:
_smolvm("pack", "create", "--image", image, "-o", str(output))
def pack_create_from_vm(name: str, output: Path) -> None:
"""`smolvm pack create --from-vm <name> -o <output>`.
Snapshots an existing persistent VM into a pack artifact. As
with `pack_create`, smolvm writes a launcher at `output` and the
bootable sidecar at `output.smolmachine`.
"""
_smolvm("pack", "create", "--from-vm", name, "-o", str(output))
# --- Machine lifecycle ---------------------------------------------------
@@ -154,21 +143,6 @@ def machine_create(
_smolvm(*args)
def machine_is_running(name: str) -> bool:
"""Return True if the named VM is in the 'running' state."""
result = _smolvm("machine", "ls", "--json", check=False)
if result.returncode != 0:
return False
try:
machines = json.loads(result.stdout or "[]")
except ValueError:
return False
return any(
isinstance(m, dict) and m.get("name") == name and m.get("state") == "running"
for m in machines
)
def machine_start(name: str) -> None:
"""`smolvm machine start --name NAME`."""
_smolvm("machine", "start", "--name", name)
-30
View File
@@ -43,7 +43,6 @@ from . import supervise as _supervise
# Directory layout: ~/.bot-bottle/state/<identity>/...
_STATE_SUBDIR = "state"
_PER_BOTTLE_DOCKERFILE_NAME = "Dockerfile"
_COMMITTED_IMAGE_NAME = "committed-image"
_TRANSCRIPT_SUBDIR = "transcript"
# Per-sidecar scratch subdirs. PRD 0018 chunk 2: bind-mount sources
# live here so chunk 3's `docker compose up` can find them at stable
@@ -180,32 +179,6 @@ def write_per_bottle_dockerfile(identity: str, content: str) -> Path:
return p
def committed_image_path(identity: str) -> Path:
return bottle_state_dir(identity) / _COMMITTED_IMAGE_NAME
def write_committed_image(identity: str, image_tag: str) -> Path:
"""Persist the committed image tag for `identity`. The next
`cli.py resume <identity>` will boot from this image instead of
rebuilding from the Dockerfile."""
path = committed_image_path(identity)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(image_tag.strip() + "\n")
path.chmod(0o644)
return path
def read_committed_image(identity: str) -> str | None:
"""Return the committed image tag for `identity`, or None if no
commit has been recorded. Used by the Docker launch step to skip
the Dockerfile build when a committed snapshot exists."""
path = committed_image_path(identity)
if not path.is_file():
return None
tag = path.read_text().strip()
return tag or None
def per_bottle_image_tag(identity: str) -> str:
"""Image tag for a rebuilt bottle. Distinct from the base
bot-bottle-claude:latest so per-bottle rebuilds don't collide in
@@ -341,7 +314,6 @@ __all__ = [
"bottle_state_dir",
"cleanup_state",
"clear_preserve_marker",
"committed_image_path",
"egress_state_dir",
"git_gate_state_dir",
"is_preserved",
@@ -351,11 +323,9 @@ __all__ = [
"per_bottle_dockerfile_path",
"per_bottle_image_tag",
"preserve_marker_path",
"read_committed_image",
"read_metadata",
"supervise_state_dir",
"transcript_snapshot_dir",
"write_committed_image",
"write_metadata",
"write_per_bottle_dockerfile",
]
+1 -4
View File
@@ -1,6 +1,6 @@
"""Main CLI dispatcher.
Commands: cleanup, commit, edit, info, init, list, resume, start, supervise
Commands: cleanup, edit, info, init, list, resume, start, supervise
"""
from __future__ import annotations
@@ -12,7 +12,6 @@ from ..manifest import ManifestError
from ._common import PROG
from . import list as _list_mod
from .cleanup import cmd_cleanup
from .commit import cmd_commit
from .edit import cmd_edit
from .info import cmd_info
from .init import cmd_init
@@ -24,7 +23,6 @@ cmd_list = _list_mod.cmd_list
COMMANDS = {
"cleanup": cmd_cleanup,
"commit": cmd_commit,
"edit": cmd_edit,
"info": cmd_info,
"init": cmd_init,
@@ -39,7 +37,6 @@ def usage() -> None:
sys.stderr.write(f"usage: {PROG} <command> [args...]\n\n")
sys.stderr.write("Commands:\n")
sys.stderr.write(" cleanup stop and remove all active bot-bottle containers\n")
sys.stderr.write(" commit snapshot a running bottle's container state to a Docker image\n")
sys.stderr.write(" edit open an agent in vim for editing\n")
sys.stderr.write(" info print env, skills, and prompt details for a named agent\n")
sys.stderr.write(" init interactively create a new agent and add it to bot-bottle.json\n")
-53
View File
@@ -1,53 +0,0 @@
"""commit: freeze a running bottle's state to a resumable artifact.
Docker bottles are committed to a local Docker image. Macos-container
bottles are exported and rebuilt as a local Apple Container image.
Smolmachines bottles are packed from the running VM into a
`.smolmachine` artifact. The resulting reference is stored in
per-bottle state so the next `./cli.py resume <slug>` boots from the
snapshot instead of rebuilding from the Dockerfile.
"""
from __future__ import annotations
import argparse
from ..backend import enumerate_active_agents
from ..backend.freeze import CommitCancelled, get_freezer
from ..bottle_state import read_metadata
from ..log import die
from ._common import PROG
from . import tui
def cmd_commit(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} commit", add_help=True)
parser.add_argument(
"slug",
nargs="?",
default=None,
help=(
"bottle slug from `cli.py list active` "
"(omit to pick interactively)"
),
)
args = parser.parse_args(argv)
slug = args.slug
if slug is None:
active = enumerate_active_agents()
if not active:
die("no active bottles; start one with `./cli.py start`")
choices = [a.slug for a in active]
slug = tui.filter_select(choices, title="Select bottle to commit")
if slug is None:
return 0
metadata = read_metadata(slug)
backend = metadata.backend if metadata else ""
try:
get_freezer(backend).commit_slug(slug)
except CommitCancelled:
return 0
return 0
+1 -1
View File
@@ -55,7 +55,7 @@ def cmd_list(argv: list[str]) -> int:
# Tab-separated keeps the format stable for shell pipelines.
for b in active:
services = ",".join(b.services) if b.services else "-"
display_name = f"{b.label} ({b.agent_name})" if b.label else b.agent_name
display_name = b.label if b.label else b.agent_name
colored_name = _ansi_label(display_name, b.color)
print(f"{b.backend_name}\t{b.slug}\t{colored_name}\t{services}")
return 0
+2
View File
@@ -28,6 +28,7 @@ from .start import _launch_bottle
def cmd_resume(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--remote-control", action="store_true")
parser.add_argument(
"identity",
help="bottle identity from a prior `start` (see its session-end output)",
@@ -55,5 +56,6 @@ def cmd_resume(argv: list[str]) -> int:
return _launch_bottle(
spec,
dry_run=args.dry_run,
remote_control=args.remote_control,
backend_name=backend_name,
)
+10 -4
View File
@@ -42,6 +42,7 @@ def cmd_start(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} start", add_help=True)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--cwd", action="store_true", help="copy host cwd into the running bottle")
parser.add_argument("--remote-control", action="store_true")
parser.add_argument(
"--backend",
choices=known_backend_names(),
@@ -88,6 +89,7 @@ def cmd_start(argv: list[str]) -> int:
return _launch_bottle(
spec,
dry_run=dry_run,
remote_control=args.remote_control,
backend_name=backend_name,
)
@@ -132,7 +134,7 @@ def prepare_with_preflight(
def attach_agent(
bottle: Bottle, *, resume: bool = False,
bottle: Bottle, *, remote_control: bool = False, resume: bool = False,
agent_provider_template: str = "claude",
startup_args: tuple[str, ...] = (),
) -> int:
@@ -151,6 +153,8 @@ def attach_agent(
"(Ctrl-D or 'exit' to leave; container will be removed)"
)
agent_args = list(runtime.bypass_args)
if remote_control:
agent_args.extend(runtime.remote_control_args)
agent_args.extend(startup_args)
if resume:
agent_args.extend(runtime.resume_args)
@@ -214,9 +218,9 @@ def _text_prompt_yes() -> bool:
return reply in ("y", "Y", "yes", "YES")
def _text_render_preflight():
def _text_render_preflight(*, remote_control: bool):
def _render(plan: DockerBottlePlan) -> None:
plan.print()
plan.print(remote_control=remote_control)
return _render
@@ -224,6 +228,7 @@ def _launch_bottle(
spec: BottleSpec,
*,
dry_run: bool,
remote_control: bool,
backend_name: str | None = None,
) -> int:
"""Shared launch core for `start` and `resume`. Builds the plan,
@@ -235,7 +240,7 @@ def _launch_bottle(
plan, identity = prepare_with_preflight(
spec,
stage_dir=stage_dir,
render_preflight=_text_render_preflight(),
render_preflight=_text_render_preflight(remote_control=remote_control),
prompt_yes=_text_prompt_yes,
dry_run=dry_run,
backend_name=backend_name,
@@ -248,6 +253,7 @@ def _launch_bottle(
agent_provider_template = getattr(plan, "agent_provider_template", "claude")
exit_code = attach_agent(
bottle,
remote_control=remote_control,
agent_provider_template=agent_provider_template,
startup_args=plan.agent_provision.startup_args,
)
+10 -79
View File
@@ -3,8 +3,7 @@ act on them (approve / modify / reject).
Curses-based TUI; modify-then-approve shells out to $EDITOR. The
approval handler wires to PRD 0016 (capability-block), which rebuilds
the bottle Dockerfile. Egress proposals are queued for operator review
as full routes.yaml updates.
the bottle Dockerfile. The egress-block tool was removed in issue #198.
"""
from __future__ import annotations
@@ -21,21 +20,11 @@ from datetime import datetime, timezone
from pathlib import Path
from .. import supervise as _supervise
from ..bottle_state import read_metadata
# from ..bottle_state import read_metadata
# from ..backend.docker.capability_apply import (
# CapabilityApplyError,
# apply_capability_change,
# )
from ..backend.docker.egress_apply import (
EgressApplyError,
applicator as _docker_applicator,
)
from ..backend.macos_container.egress_apply import (
applicator as _macos_applicator,
)
from ..backend.smolmachines.egress_apply import (
applicator as _smolmachines_applicator,
)
from ..log import Die, error, info
@@ -51,10 +40,6 @@ from ..supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_ALLOW,
TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
TOOL_EGRESS_TOKEN_ALLOW,
archive_proposal,
list_pending_proposals,
render_diff,
@@ -66,11 +51,6 @@ from ._common import PROG
_REFRESH_INTERVAL_MS = 1000
# Proposal tools whose payload is a read-only report, not a file the operator
# edits: modify is unavailable and approval requires a recorded reason for the
# audit trail.
_REPORT_ONLY_TOOLS: tuple[str, ...] = (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW)
@dataclass(frozen=True)
class QueuedProposal:
@@ -83,17 +63,7 @@ class QueuedProposal:
# Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses.
ApplyError = (CapabilityApplyError, EgressApplyError)
def apply_routes_change(slug: str, content: str) -> tuple[str, str]:
meta = read_metadata(slug)
backend = meta.backend if meta is not None else ""
if backend == "macos-container":
return _macos_applicator.apply_routes_change(slug, content)
if backend == "smolmachines":
return _smolmachines_applicator.apply_routes_change(slug, content)
return _docker_applicator.apply_routes_change(slug, content)
ApplyError = (CapabilityApplyError,)
def discover_pending() -> list[QueuedProposal]:
@@ -145,10 +115,6 @@ def _detail_lines(
def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
if tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
return ".yaml"
if tool in (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW):
return ".txt"
return ".txt"
@@ -163,7 +129,6 @@ def approve(
) -> None:
"""Apply the proposal, write the waiting response, and audit it."""
status = STATUS_MODIFIED if final_file is not None else STATUS_APPROVED
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
diff_before, diff_after = "", ""
# if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
@@ -177,11 +142,6 @@ def approve(
# diff_before, diff_after = apply_capability_change(
# qp.proposal.bottle_slug, file_to_apply,
# )
if qp.proposal.tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
diff_before, diff_after = apply_routes_change(
qp.proposal.bottle_slug,
file_to_apply,
)
response = Response(
proposal_id=qp.proposal.id,
@@ -210,23 +170,6 @@ def reject(qp: QueuedProposal, *, reason: str) -> None:
_write_audit(qp, action=STATUS_REJECTED, notes=reason, diff_before="", diff_after="")
def _approve_from_tui(
stdscr: "curses._CursesWindow", # type: ignore
qp: QueuedProposal,
*,
final_file: str | None = None,
notes: str = "",
) -> str:
"""Approve from curses, prompting for any tool-specific audit note."""
if qp.proposal.tool in _REPORT_ONLY_TOOLS and final_file is None:
notes = _prompt(stdscr, "allow reason (false positive / legitimately needed): ")
if not notes:
return "approve aborted (empty reason)"
approve(qp, final_file=final_file, notes=notes)
verb = "modified+approved" if final_file is not None else "approved"
return _approval_status(qp, verb)
def _write_audit(
qp: QueuedProposal,
*,
@@ -298,10 +241,7 @@ def cmd_supervise(argv: list[str]) -> int:
return e.code if isinstance(e.code, int) else 1
except Exception as e: # noqa: W0718 — catch supervise crash for logging
log_path = _write_crash_log(e)
error(
f"supervise crashed: {type(e).__name__}: {e}",
context={"error_type": type(e).__name__, "crash_log": str(log_path)},
)
error(f"supervise crashed: {type(e).__name__}: {e}")
error(f"full traceback written to {log_path}")
return 1
return 0
@@ -413,22 +353,18 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
_detail_view(stdscr, qp, green_attr=green_attr)
elif key == ord("a"):
try:
status_line = _approve_from_tui(stdscr, qp)
approve(qp)
status_line = _approval_status(qp, "approved")
except ApplyError as e:
status_line = f"apply failed: {e}"
elif key == ord("m"):
if qp.proposal.tool in _REPORT_ONLY_TOOLS:
status_line = f"modify unavailable for {qp.proposal.tool}"
continue
edited = _modify(stdscr, qp)
if edited is None:
status_line = "modify aborted (no change)"
else:
try:
status_line = _approve_from_tui(
stdscr, qp, final_file=edited,
notes="operator modified before approving",
)
approve(qp, final_file=edited, notes="operator modified before approving")
status_line = _approval_status(qp, "modified+approved")
except ApplyError as e:
status_line = f"apply failed: {e}"
elif key == ord("r"):
@@ -526,20 +462,15 @@ def _detail_view(
offset = max(0, len(lines) - 1)
elif key == ord("a"):
try:
_approve_from_tui(stdscr, qp)
approve(qp)
except ApplyError:
pass
return
elif key == ord("m"):
if qp.proposal.tool in _REPORT_ONLY_TOOLS:
return
edited = _modify(stdscr, qp)
if edited is not None:
try:
_approve_from_tui(
stdscr, qp, final_file=edited,
notes="operator modified before approving",
)
approve(qp, final_file=edited, notes="operator modified before approving")
except ApplyError:
pass
return
+2 -4
View File
@@ -20,7 +20,6 @@ from ...agent_provider import (
AgentProvisionDir,
AgentProvisionFile,
AgentProvisionPlan,
provider_startup_args,
)
from ...backend.docker import util as docker_mod
from ...egress import EgressRoute
@@ -91,6 +90,7 @@ _RUNTIME = AgentProviderRuntime(
prompt_mode="append_file",
bypass_args=("--dangerously-skip-permissions",),
resume_args=("--continue",),
remote_control_args=("--remote-control",),
)
@@ -115,9 +115,8 @@ class ClaudeAgentProvider(AgentProvider):
color: str = "",
provider_settings: dict[str, object] | None = None,
) -> AgentProvisionPlan:
del forward_host_credentials, host_env
del forward_host_credentials, host_env, provider_settings
resolved_guest_env = dict(guest_env or {})
startup_args = provider_startup_args(provider_settings)
guest_home = self.guest_home
trusted_path = trusted_project_path or guest_home
@@ -200,7 +199,6 @@ class ClaudeAgentProvider(AgentProvider):
env_vars=env_vars,
guest_env=resolved_guest_env,
has_prompt=has_prompt,
startup_args=startup_args,
dirs=dirs,
files=tuple(files),
egress_routes=egress_routes,
+6 -9
View File
@@ -1,12 +1,12 @@
# bot-bottle Codex provider image.
#
# Mirrors the default Claude image shape: Node LTS, git/network tooling,
# non-root node user, and the provider CLI installed for that user.
# non-root node user, and the provider CLI installed globally.
FROM node:22-slim
RUN apt-get update \
&& apt-get install -y --no-install-recommends git ca-certificates curl procps \
&& apt-get install -y --no-install-recommends git ca-certificates curl \
&& rm -rf /var/lib/apt/lists/*
# App-specific deps. Python isn't required by codex itself
@@ -17,15 +17,12 @@ RUN apt-get update \
&& apt-get install -y --no-install-recommends python3 python3-pip python3-venv \
&& rm -rf /var/lib/apt/lists/*
RUN npm install -g --no-fund --no-audit @openai/codex@0.136.0 \
&& npm cache clean --force
USER node
WORKDIR /home/node
ENV PATH="/home/node/.local/bin:${PATH}"
# Remote-control support requires the standalone Codex install layout
# under ~/.codex/packages/standalone/current. The npm package can run
# the TUI, but remote-control commands expect this installer-owned path.
RUN mkdir -p /home/node/.codex \
&& curl -fsSL https://chatgpt.com/codex/install.sh | sh
RUN mkdir -p /home/node/.codex
CMD ["codex"]
+5 -7
View File
@@ -22,7 +22,6 @@ from ...agent_provider import (
AgentProvisionCommand,
AgentProvisionFile,
AgentProvisionPlan,
provider_startup_args,
)
from .codex_auth import codex_host_access_token, write_codex_dummy_auth_file
from ...egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
@@ -55,6 +54,7 @@ _RUNTIME = AgentProviderRuntime(
prompt_mode="read_prompt_file",
bypass_args=("--dangerously-bypass-approvals-and-sandbox",),
resume_args=("resume", "--last"),
remote_control_args=(),
)
@@ -79,9 +79,8 @@ class CodexAgentProvider(AgentProvider):
color: str = "",
provider_settings: dict[str, object] | None = None,
) -> AgentProvisionPlan:
del auth_token, label, color
del auth_token, label, color, provider_settings
resolved_guest_env = dict(guest_env or {})
startup_args = provider_startup_args(provider_settings)
guest_home = self.guest_home
trusted_path = trusted_project_path or guest_home
@@ -164,7 +163,6 @@ class CodexAgentProvider(AgentProvider):
env_vars=env_vars,
guest_env=resolved_guest_env,
has_prompt=has_prompt,
startup_args=startup_args,
dirs=tuple(dirs),
files=tuple(files),
pre_copy=tuple(pre_copy),
@@ -263,8 +261,8 @@ class CodexAgentProvider(AgentProvider):
return
info(f"registering supervise MCP server in agent codex config → {supervise_url}")
r = bottle.exec(
f"codex mcp add {_SUPERVISE_MCP_NAME} --url "
f"{shlex.quote(supervise_url)}",
f"codex mcp add --transport http "
f"{_SUPERVISE_MCP_NAME} {supervise_url}",
user="node",
)
if r.returncode != 0:
@@ -272,7 +270,7 @@ class CodexAgentProvider(AgentProvider):
f"`codex mcp add supervise` failed (exit {r.returncode}): "
f"{(r.stderr or r.stdout or '').strip()}. Inside the bottle, "
f"register manually with: "
f"codex mcp add supervise --url {shlex.quote(supervise_url)}"
f"codex mcp add --transport http supervise {supervise_url}"
)
@@ -19,7 +19,7 @@ import urllib.error
import urllib.request
from pathlib import Path
from ...deploy_key_provisioner import DeployKeyCollisionError, DeployKeyProvisioner
from ...deploy_key_provisioner import DeployKeyProvisioner
class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
@@ -71,11 +71,6 @@ class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
body = json.loads(resp.read())
except urllib.error.HTTPError as exc:
_body = _read_error_body(exc)
if exc.code == 422:
raise DeployKeyCollisionError(
f"deploy key collision for {owner_repo!r} "
f"(title={title!r}): key title or content already registered — {_body}"
) from exc
raise RuntimeError(
f"failed to create deploy key for {owner_repo}: "
f"HTTP {exc.code}{_body}"
+1 -3
View File
@@ -21,7 +21,6 @@ from ...agent_provider import (
AgentProvisionDir,
AgentProvisionFile,
AgentProvisionPlan,
provider_startup_args,
)
from ...egress import EgressRoute
from ...log import die, info
@@ -166,6 +165,7 @@ _RUNTIME = AgentProviderRuntime(
prompt_mode="append_system_prompt",
bypass_args=(),
resume_args=(),
remote_control_args=(),
)
@@ -199,7 +199,6 @@ class PiAgentProvider(AgentProvider):
models_payload, base_url, api_key_env, models, provider_name = (
_pi_models_json(settings)
)
extra_startup_args = provider_startup_args(provider_settings)
models_file = state_dir / "pi-models.json"
models_file.write_text(json.dumps(models_payload, indent=2) + "\n")
models_file.chmod(0o600)
@@ -220,7 +219,6 @@ class PiAgentProvider(AgentProvider):
startup_args=(
"--models",
",".join(f"{provider_name}/{model}" for model in models),
*extra_startup_args,
),
dirs=(AgentProvisionDir(f"{guest_home}/.pi/agent"),),
files=(AgentProvisionFile(models_file, _models_path(guest_home)),),
-4
View File
@@ -11,10 +11,6 @@ from __future__ import annotations
from abc import ABC, abstractmethod
class DeployKeyCollisionError(RuntimeError):
"""Raised when a deploy key title or public key already exists on the repo."""
class DeployKeyProvisioner(ABC):
"""Manages a single deploy-key lifecycle on a remote forge."""
+7 -190
View File
@@ -15,8 +15,6 @@ import gzip
import re
import typing
import unicodedata
from math import log2
from collections import Counter
from urllib.parse import quote as url_quote
try:
@@ -80,27 +78,16 @@ TOKEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = (
)
def scan_token_patterns(
text: str,
*,
location: str = "body",
safe_tokens: typing.AbstractSet[str] | None = None,
) -> ScanResult | None:
def scan_token_patterns(text: str, *, location: str = "body") -> ScanResult | None:
normalized = _normalize_text(text)
for name, pattern in TOKEN_PATTERNS:
for m in pattern.finditer(normalized):
value = m.group(0)
# A value the supervisor has approved (PRD 0062) is no longer a
# block — keep scanning so a second, un-approved token in the
# same request is still caught.
if safe_tokens is not None and value in safe_tokens:
continue
m = pattern.search(normalized)
if m is not None:
return ScanResult(
severity="block",
reason=f"{name} found in {location}",
location=location,
context=_snippet(normalized, m.start(), m.end()),
matched=value,
context=_snippet(text, m.start(), m.end()),
)
return None
@@ -109,21 +96,20 @@ def redact_tokens(
text: str,
*,
env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> str:
"""Replace token pattern matches and (if env given) provisioned secrets with REDACT."""
for _, pattern in TOKEN_PATTERNS:
text = pattern.sub(REDACT, text)
if env is not None:
for key, value in env.items():
if any(key.startswith(p) for p in sensitive_prefixes) and value:
if key.startswith("EGRESS_TOKEN_") and value:
for variant in _encoded_variants(value):
text = text.replace(variant, REDACT)
return text
# ---------------------------------------------------------------------------
# Known secrets detector
# Known secrets detector (Phase 1b)
# ---------------------------------------------------------------------------
def _encoded_variants(secret: str) -> list[str]:
@@ -164,179 +150,26 @@ def _encoded_variants(secret: str) -> list[str]:
return variants
# ---------------------------------------------------------------------------
# Fragmentation-resistant helpers
# ---------------------------------------------------------------------------
# Minimum length of alnum projection for projection-based checks to run.
# Short secrets produce too many false positives in projection space.
_ALNUM_MIN_LEN = 8
# Minimum window length for the partial-substring sliding scan.
PARTIAL_MATCH_MIN_LEN = 12
def _alnum_projection(text: str) -> str:
"""Return text with every non-alphanumeric character stripped.
Used for fragmentation-resistant matching: separator-injected secrets
(spaces, hyphens, dots inserted between characters) are identical to
their originals in alnum projection space.
"""
return "".join(c for c in text if c.isalnum())
def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None:
"""Return the position in text_alnum where any min_len-char window of
secret_alnum first appears, or None.
Slides a window of width min_len across secret_alnum and searches for
each window in text_alnum. The first hit position is returned.
"""
if len(secret_alnum) < min_len or len(text_alnum) < min_len:
return None
for i in range(len(secret_alnum) - min_len + 1):
window = secret_alnum[i:i + min_len]
pos = text_alnum.find(window)
if pos >= 0:
return pos
return None
def scan_known_secrets(
text: str,
*,
location: str = "body",
env: typing.Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
safe_tokens: typing.AbstractSet[str] | None = None,
) -> ScanResult | None:
if env is None:
return None
# Pre-compute alnum projection of the scan text once; reused per secret.
text_alnum: str | None = None
for key, value in env.items():
if not any(key.startswith(p) for p in sensitive_prefixes) or not value:
if not key.startswith("EGRESS_TOKEN_") or not value:
continue
# Pass 1: exact match across encoded variants (original behaviour).
approved_exact = False
for variant in _encoded_variants(value):
pos = text.find(variant)
if pos >= 0:
# The supervisor approves the exact encoded variant found
# (PRD 0062); a different encoding of the same secret is a
# fresh block.
if safe_tokens is not None and variant in safe_tokens:
approved_exact = True
continue
return ScanResult(
severity="block",
reason=f"provisioned secret from {key} found in {location}",
location=location,
context=_snippet(text, pos, pos + len(variant)),
matched=variant,
)
if approved_exact:
# Exact match was found and approved; projection passes would
# fire on the same value, so skip them for this secret.
continue
# Pass 2 & 3: fragmentation-resistant projection checks.
secret_alnum = _alnum_projection(value)
if len(secret_alnum) < _ALNUM_MIN_LEN:
continue
if text_alnum is None:
text_alnum = _alnum_projection(text)
# Pass 2: full alnum-projection exact match (catches separator injection).
pos2 = text_alnum.find(secret_alnum)
if pos2 >= 0:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(fragmented match — separator injection)"
),
location=location,
context=_snippet(text_alnum, pos2, pos2 + len(secret_alnum)),
)
# Pass 3: sliding-window partial match (catches chunked-substring leaks).
pos3 = _find_partial_window(secret_alnum, text_alnum, PARTIAL_MATCH_MIN_LEN)
if pos3 is not None:
return ScanResult(
severity="block",
reason=(
f"provisioned secret from {key} found in {location} "
f"(partial match — at least {PARTIAL_MATCH_MIN_LEN} consecutive "
f"alphanumeric chars)"
),
location=location,
context=_snippet(text_alnum, pos3, pos3 + PARTIAL_MATCH_MIN_LEN),
)
return None
# ---------------------------------------------------------------------------
# Entropy detector (warn-only)
# ---------------------------------------------------------------------------
# Sliding window size and step for the entropy scan.
ENTROPY_WINDOW = 64
ENTROPY_STEP = 32
# Bits-per-character threshold. Random ASCII printable ≈ 6.6 bits; random
# lowercase hex ≈ 4 bits; random base64url ≈ 6 bits. 5.5 sits above
# typical structured data (JSON, URLs) while staying below truly random
# content.
ENTROPY_BLOCK_THRESHOLD = 5.5
def _shannon_entropy(text: str) -> float:
if not text:
return 0.0
counts = Counter(text)
n = len(text)
return -sum((c / n) * log2(c / n) for c in counts.values())
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW,
threshold: float = ENTROPY_BLOCK_THRESHOLD,
) -> ScanResult | None:
"""Warn-only detector: flag windows of `window` chars with Shannon entropy
above `threshold` bits per character.
Never blocks; always returns severity='warn'. Disabled by default
routes must opt in via dlp.outbound_detectors=['entropy'].
"""
if not text:
return None
step = max(1, window // 2)
end = len(text)
# Scan overlapping windows; also check the final tail if shorter than window.
positions = list(range(0, end - window + 1, step))
if end < window:
positions = [0]
elif (end - window) % step != 0:
positions.append(end - window)
for i in positions:
chunk = text[i:i + window]
if _shannon_entropy(chunk) >= threshold:
return ScanResult(
severity="warn",
reason=f"high-entropy content in {location} (possible encrypted exfil)",
location=location,
context=_snippet(text, i, i + len(chunk)),
)
return None
@@ -432,14 +265,6 @@ _CRLF_ENCODED_RE = re.compile(r"%0[dD]%0[aA]", re.ASCII)
_CRLF_HEADER_INJECT_RE = re.compile(r"\r\n[A-Za-z][A-Za-z0-9\-]+\s*:", re.ASCII)
def strip_crlf(text: str) -> str:
"""Remove URL-encoded and literal CRLF injection sequences from a request
surface (PRD 0062 redact policy). Used to scrub the request line / headers
so the request can be forwarded instead of hard-blocked."""
text = _CRLF_ENCODED_RE.sub("", text)
return _CRLF_HEADER_INJECT_RE.sub(lambda m: m.group(0)[2:], text)
def scan_crlf_injection(text: str) -> ScanResult | None:
if _CRLF_ENCODED_RE.search(text):
return ScanResult(
@@ -455,20 +280,12 @@ def scan_crlf_injection(text: str) -> ScanResult | None:
__all__ = [
"ENTROPY_BLOCK_THRESHOLD",
"ENTROPY_WINDOW",
"ENTROPY_STEP",
"PARTIAL_MATCH_MIN_LEN",
"REDACT",
"SNIPPET_CONTEXT",
"TOKEN_PATTERNS",
"_alnum_projection",
"_shannon_entropy",
"redact_tokens",
"scan_crlf_injection",
"scan_entropy",
"scan_known_secrets",
"scan_naive_injection",
"scan_token_patterns",
"strip_crlf",
]
+3 -85
View File
@@ -10,14 +10,12 @@ specific and lives on concrete subclasses (see
from __future__ import annotations
import dataclasses
import secrets
from abc import ABC
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
from .egress_addon_core import (
ON_MATCH_REDACT,
HeaderMatch as CoreHeaderMatch,
MatchEntry as CoreMatchEntry,
PathMatch as CorePathMatch,
@@ -33,51 +31,6 @@ CODEX_HOST_CREDENTIAL_TOKEN_REF = "BOT_BOTTLE_CODEX_HOST_ACCESS_TOKEN"
EGRESS_HOSTNAME = "egress"
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name
_CANARY_ENV_WORDS = (
"ACCORD",
"ANCHOR",
"ATLAS",
"CANON",
"CIPHER",
"EMBER",
"FALCON",
"HARBOR",
"LANTERN",
"MARBLE",
"NOVA",
"ORBIT",
"PIVOT",
"RADIUS",
"SUMMIT",
"VECTOR",
)
def _random_canary_env() -> str:
first = secrets.choice(_CANARY_ENV_WORDS)
remaining = tuple(word for word in _CANARY_ENV_WORDS if word != first)
second = secrets.choice(remaining)
return f"{first}_{second}_SECRET"
def egress_sidecar_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return sidecar env entries needed by egress across all backends."""
env: list[str] = []
if plan.routes:
env.extend(sorted(plan.token_env_map.keys()))
if plan.canary and plan.canary_env:
env.append(f"{plan.canary_env}={plan.canary}")
env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.canary_env}")
return tuple(env)
def egress_agent_env_entries(plan: "EgressPlan") -> tuple[str, ...]:
"""Return agent-visible egress env entries shared by all backends."""
if plan.canary and plan.canary_env:
return (f"{plan.canary_env}={plan.canary}",)
return ()
@dataclass(frozen=True)
@@ -110,8 +63,6 @@ class EgressPlan:
mitmproxy_ca_host_path: Path = Path()
mitmproxy_ca_cert_only_host_path: Path = Path()
log: int = 0
canary: str = ""
canary_env: str = ""
def egress_manifest_routes(
@@ -143,7 +94,6 @@ def egress_manifest_routes(
git_fetch=r.GitFetch,
outbound_detectors=r.OutboundDetectors,
inbound_detectors=r.InboundDetectors,
outbound_on_match=r.OutboundOnMatch,
))
return tuple(out)
@@ -154,27 +104,12 @@ def egress_routes_for_bottle(
) -> tuple[EgressRoute, ...]:
manifest = egress_manifest_routes(bottle)
provisioned_hosts = {pr.host.lower() for pr in provider_routes}
merged = list(_default_provider_on_match(provider_routes)) + [
merged = list(provider_routes) + [
r for r in manifest if r.host.lower() not in provisioned_hosts
]
return _assign_token_slots(merged)
def _default_provider_on_match(
provider_routes: tuple[EgressRoute, ...],
) -> tuple[EgressRoute, ...]:
"""Provider routes (the agent talking to its own LLM API) default to the
`redact` on-match policy (PRD 0062): high-volume conversation payloads are
the worst source of token-shaped false positives, so a match is scrubbed
and forwarded rather than hard-blocked or queued for the operator. A
provider that sets `outbound_on_match` explicitly keeps its choice."""
return tuple(
r if r.outbound_on_match
else dataclasses.replace(r, outbound_on_match=ON_MATCH_REDACT)
for r in provider_routes
)
def _assign_token_slots(
routes: list[EgressRoute],
) -> tuple[EgressRoute, ...]:
@@ -241,11 +176,7 @@ def _route_to_yaml_fields(r: Route) -> dict[str, object]:
fields["matches"] = matches_data
if r.git_fetch:
fields["git"] = {"fetch": True}
if (
r.outbound_detectors is not None
or r.inbound_detectors is not None
or r.outbound_on_match
):
if r.outbound_detectors is not None or r.inbound_detectors is not None:
dlp: dict[str, object] = {}
if r.outbound_detectors is not None:
dlp["outbound_detectors"] = (
@@ -257,8 +188,6 @@ def _route_to_yaml_fields(r: Route) -> dict[str, object]:
False if not r.inbound_detectors
else list(r.inbound_detectors)
)
if r.outbound_on_match:
dlp["outbound_on_match"] = r.outbound_on_match
fields["dlp"] = dlp
return fields
@@ -330,8 +259,6 @@ def egress_render_routes(
elif isinstance(dv, list):
items_str = ", ".join(f'"{x}"' for x in dv)
lines.append(f" {dk}: [{items_str}]")
elif isinstance(dv, str):
lines.append(f' {dk}: "{dv}"')
return "\n".join(lines) + "\n"
@@ -368,27 +295,20 @@ class Egress(ABC):
) -> EgressPlan:
routes = egress_routes_for_bottle(bottle, provider_routes)
log = bottle.egress.Log
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
routes_path = stage_dir / "egress_routes.yaml"
routes_path.write_text(egress_render_routes(routes, log=log))
routes_path.chmod(0o600)
# Generate a per-session fake secret under a plausible random env name.
# The sidecar marks that exact env name as sensitive for known-secret
# scanning; the agent receives the same name/value as exfil bait.
canary = secrets.token_urlsafe(32)
return EgressPlan(
slug=slug,
routes_path=routes_path,
routes=routes,
token_env_map=egress_token_env_map(routes),
log=log,
canary=canary,
canary_env=_random_canary_env(),
)
__all__ = [
"CODEX_HOST_CREDENTIAL_TOKEN_REF",
"EGRESS_HOSTNAME",
"EGRESS_ROUTES_FILENAME",
"EGRESS_ROUTES_IN_CONTAINER",
"Egress",
"EgressPlan",
@@ -397,7 +317,5 @@ __all__ = [
"egress_render_routes",
"egress_resolve_token_values",
"egress_routes_for_bottle",
"egress_agent_env_entries",
"egress_sidecar_env_entries",
"egress_token_env_map",
]
+24 -284
View File
@@ -5,7 +5,7 @@ egress container."""
from __future__ import annotations
import asyncio
import dataclasses
import json
import os
import signal
@@ -17,15 +17,9 @@ from mitmproxy import http # type: ignore[import-not-found] # pylint: disable=
from egress_addon_core import ( # type: ignore[import-not-found] # pylint: disable=import-error
LOG_BLOCKS,
LOG_FULL,
DEFAULT_OUTBOUND_ON_MATCH,
ON_MATCH_BLOCK,
ON_MATCH_REDACT,
Config,
Route,
ScanResult,
build_inbound_scan_text,
build_outbound_scan_text,
build_token_allow_payload,
decide,
decide_git_fetch,
is_git_fetch_request,
@@ -33,61 +27,28 @@ from egress_addon_core import ( # type: ignore[import-not-found] # pylint: dis
load_config,
match_route,
outbound_scan_headers,
route_to_yaml_dict,
scan_inbound,
scan_outbound,
)
try:
from dlp_detectors import redact_tokens, strip_crlf # type: ignore[import-not-found]
from dlp_detectors import redact_tokens # type: ignore[import-not-found]
except ImportError: # pragma: no cover - host-side path
from bot_bottle.dlp_detectors import ( # type: ignore[import-not-found]
redact_tokens,
strip_crlf,
)
try:
import supervise as _sv # type: ignore[import-not-found]
except ImportError: # pragma: no cover - host-side path
from bot_bottle import supervise as _sv # type: ignore[import-not-found]
from bot_bottle.dlp_detectors import redact_tokens # type: ignore[import-not-found]
DEFAULT_ROUTES_PATH = "/etc/egress/routes.yaml"
INTROSPECT_HOST = "_egress.local"
# Seconds the egress proxy holds a token-blocked request open waiting for the
# operator's supervisor decision (PRD 0062), overridable via env.
DEFAULT_TOKEN_ALLOW_TIMEOUT_SECONDS = 300.0
# Filesystem poll cadence while awaiting the operator's response.
TOKEN_ALLOW_POLL_INTERVAL_SECONDS = 0.5
# Fixed operator guidance attached to every token-allow proposal.
_TOKEN_ALLOW_JUSTIFICATION = (
"egress DLP blocked an outbound request carrying a detected token. "
"Approve only if this value is a false positive or a credential this "
"request legitimately needs; the value is then allowed for the life of "
"this bottle's egress proxy."
)
class EgressAddon:
def __init__(self) -> None:
self.routes_path = os.environ.get("EGRESS_ROUTES", DEFAULT_ROUTES_PATH)
self.config: Config = Config(routes=())
# Tokens the operator has approved this session (PRD 0062). In-memory
# only — a restart re-prompts. Mutated only from the asyncio loop that
# runs the addon hooks, so no lock is needed.
self.safe_tokens: set[str] = set()
self._supervise_queue_dir = os.environ.get("SUPERVISE_QUEUE_DIR", "").strip()
self._supervise_slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "").strip()
self._token_allow_timeout = _token_allow_timeout_from_env(os.environ)
self._reload(initial=True)
self._install_sighup()
def _supervise_available(self) -> bool:
return bool(self._supervise_queue_dir and self._supervise_slug)
def _reload(self, *, initial: bool = False) -> None:
try:
text = Path(self.routes_path).read_text(encoding="utf-8")
@@ -121,7 +82,7 @@ class EgressAddon:
def _serve_introspection(self, flow: http.HTTPFlow, path: str) -> None:
if path == "/allowlist":
payload = json.dumps(
{"routes": [route_to_yaml_dict(r) for r in self.config.routes]},
{"routes": [dataclasses.asdict(r) for r in self.config.routes]},
indent=2,
).encode("utf-8")
flow.response = http.Response.make(
@@ -160,42 +121,31 @@ class EgressAddon:
)
def _log_request(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.request.headers.items()
if k.lower() != "authorization"
}
body = redact_tokens(flow.request.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_request",
"host": redact_tokens(flow.request.pretty_host, env=os.environ),
"method": flow.request.method,
"path": redact_tokens(flow.request.path, env=os.environ),
"headers": headers,
"body": body,
"headers": dict(flow.request.headers),
"body": flow.request.get_text(strict=False) or "",
})
+ "\n"
)
def _log_response(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.response.headers.items()
}
body = redact_tokens(flow.response.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_response",
"host": flow.request.pretty_host,
"status": flow.response.status_code,
"headers": headers,
"body": body,
"headers": dict(flow.response.headers),
"body": flow.response.get_text(strict=False) or "",
})
+ "\n"
)
async def request(self, flow: http.HTTPFlow) -> None:
def request(self, flow: http.HTTPFlow) -> None:
request_path, _, query = flow.request.path.partition("?")
if flow.request.pretty_host == INTROSPECT_HOST:
@@ -207,11 +157,21 @@ class EgressAddon:
# Hostname is included to catch DNS-tunnelling exfiltration attempts.
route = match_route(self.config.routes, flow.request.pretty_host)
if route is not None:
if not await self._handle_outbound_dlp(flow, route):
body = flow.request.get_text(strict=False) or ""
scan_text = build_outbound_scan_text(
flow.request.pretty_host,
request_path,
query,
outbound_scan_headers(route, dict(flow.request.headers)),
body,
)
dlp_result = scan_outbound(route, scan_text, os.environ)
if dlp_result is not None and dlp_result.severity == "block":
ctx = self._req_ctx(flow)
if dlp_result.context:
ctx = {**ctx, "context": dlp_result.context}
self._block(flow, f"egress DLP: {dlp_result.reason}", ctx=ctx)
return
# The redact policy may have rewritten the request line; recompute
# the path/query the git checks below rely on.
request_path, _, query = flow.request.path.partition("?")
if is_git_push_request(request_path, query):
self._block(
@@ -261,202 +221,6 @@ class EgressAddon:
if self.config.log >= LOG_FULL:
self._log_request(flow)
def _block_dlp(self, flow: http.HTTPFlow, result: ScanResult) -> None:
ctx = self._req_ctx(flow)
if result.context:
ctx = {**ctx, "context": result.context}
self._block(flow, f"egress DLP: {result.reason}", ctx=ctx)
async def _handle_outbound_dlp(
self,
flow: http.HTTPFlow,
route: Route,
) -> bool:
"""Scan the outbound request and apply the route's on-match policy
(PRD 0062). Returns True if the request may be forwarded, False if a
403 response has been written to `flow`.
Loops so the supervise policy can re-scan after each approval a
second, un-approved token in the same request is still caught."""
while True:
request_path, _, query = flow.request.path.partition("?")
body = flow.request.get_text(strict=False) or ""
headers = outbound_scan_headers(route, dict(flow.request.headers))
scan_text = build_outbound_scan_text(
flow.request.pretty_host, request_path, query, headers, body,
)
# CRLF is scanned only over the request line + headers, never the
# body (see scan_outbound) — a body is not an injection vector.
crlf_text = build_outbound_scan_text(
flow.request.pretty_host, request_path, query, headers, "",
)
result = scan_outbound(
route, scan_text, os.environ,
safe_tokens=self.safe_tokens, crlf_text=crlf_text,
)
if result is None or result.severity != "block":
return True
policy = route.outbound_on_match or DEFAULT_OUTBOUND_ON_MATCH
# redact scrubs every detection (tokens and structural CRLF) and
# forwards; it fails closed only if a match survives the scrub.
if policy == ON_MATCH_REDACT:
if self._redact_outbound(flow, route):
if self.config.log >= LOG_BLOCKS:
sys.stderr.write(json.dumps({
"event": "egress_redacted",
"reason": f"egress DLP: {result.reason}",
**self._req_ctx(flow),
}) + "\n")
return True
self._block(
flow,
f"egress DLP: {result.reason}; redaction could not remove "
"all matches (e.g. a match in the hostname)",
ctx=self._req_ctx(flow),
)
return False
# Structural blocks (CRLF, no safelist-able value) cannot be
# supervised — there is nothing to approve and remember — so under
# block/supervise they are a hard 403.
if policy == ON_MATCH_BLOCK or not result.matched:
self._block_dlp(flow, result)
return False
# supervise (default): hold the request for operator approval.
# Fall back to a hard 403 when supervise isn't wired for the bottle.
if not self._supervise_available():
self._block_dlp(flow, result)
return False
approved = await self._supervise_token_block(flow, request_path, result)
if not approved:
return False # _supervise_token_block wrote the 403 response
# loop: the approved value is now in safe_tokens; re-scan.
def _redact_outbound(self, flow: http.HTTPFlow, route: Route) -> bool:
"""Scrub detected tokens (and CRLF injection sequences) from the mutable
request surfaces (body, headers, path/query) and re-scan. Returns True
if the request is now clean; False if a block-severity match remains on
a surface redaction cannot rewrite (the hostname) so the caller fails
closed."""
body = flow.request.get_text(strict=False)
if body:
redacted_body = redact_tokens(body, env=os.environ)
if redacted_body != body:
flow.request.text = redacted_body
for name, value in list(flow.request.headers.items()):
if name.lower() == "host":
continue # routing-critical; never a legitimate token
redacted = strip_crlf(redact_tokens(value, env=os.environ))
if redacted != value:
flow.request.headers[name] = redacted
redacted_path = strip_crlf(redact_tokens(flow.request.path, env=os.environ))
if redacted_path != flow.request.path:
flow.request.path = redacted_path
request_path, _, query = flow.request.path.partition("?")
new_body = flow.request.get_text(strict=False) or ""
headers = outbound_scan_headers(route, dict(flow.request.headers))
scan_text = build_outbound_scan_text(
flow.request.pretty_host, request_path, query, headers, new_body,
)
crlf_text = build_outbound_scan_text(
flow.request.pretty_host, request_path, query, headers, "",
)
result = scan_outbound(route, scan_text, os.environ, crlf_text=crlf_text)
return result is None or result.severity != "block"
async def _supervise_token_block(
self,
flow: http.HTTPFlow,
request_path: str,
result: ScanResult,
) -> bool:
"""Route a token DLP block to the operator's supervisor queue and wait.
Returns True if the operator approved (the matched value is added to
`self.safe_tokens` and the caller re-scans); False if the request must
be blocked (a 403 response has been written to `flow`)."""
host = flow.request.pretty_host
payload = build_token_allow_payload(
redact_tokens(host, env=os.environ),
flow.request.method,
redact_tokens(request_path, env=os.environ),
result,
)
proposal = _sv.Proposal.new(
bottle_slug=self._supervise_slug,
tool=_sv.TOOL_EGRESS_TOKEN_ALLOW,
proposed_file=payload,
justification=_TOKEN_ALLOW_JUSTIFICATION,
current_file_hash=_sv.sha256_hex(payload),
)
queue_dir = Path(self._supervise_queue_dir)
try:
_sv.write_proposal(queue_dir, proposal)
except OSError as e:
sys.stderr.write(
f"egress: could not queue token-allow proposal: {e}; "
"blocking request\n"
)
self._block(flow, f"egress DLP: {result.reason}", ctx=self._req_ctx(flow))
return False
sys.stderr.write(json.dumps({
"event": "egress_token_supervise",
"reason": f"egress DLP: {result.reason}",
"proposal": proposal.id,
**self._req_ctx(flow),
}) + "\n")
response = await self._await_token_response(queue_dir, proposal.id)
_sv.archive_proposal(queue_dir, proposal.id)
if response is not None and response.status in (
_sv.STATUS_APPROVED, _sv.STATUS_MODIFIED,
):
self.safe_tokens.add(result.matched)
if self.config.log >= LOG_BLOCKS:
sys.stderr.write(json.dumps({
"event": "egress_token_allowed",
"reason": f"egress DLP: {result.reason}",
"proposal": proposal.id,
**self._req_ctx(flow),
}) + "\n")
return True
if response is None:
reason = (
f"egress DLP: {result.reason}; supervisor approval timed out "
f"after {self._token_allow_timeout:g}s"
)
else:
reason = f"egress DLP: {result.reason}; supervisor rejected the request"
self._block(flow, reason, ctx=self._req_ctx(flow))
return False
async def _await_token_response(
self,
queue_dir: Path,
proposal_id: str,
) -> "_sv.Response | None":
"""Poll the queue dir for the operator's response without blocking the
proxy event loop. Returns the Response, or None on timeout."""
loop = asyncio.get_running_loop()
deadline = loop.time() + self._token_allow_timeout
while True:
try:
return _sv.read_response(queue_dir, proposal_id)
except (OSError, ValueError, KeyError):
# Not written yet, or a partial/malformed write — retry until
# the deadline, then fail closed.
pass
if loop.time() >= deadline:
return None
await asyncio.sleep(TOKEN_ALLOW_POLL_INTERVAL_SECONDS)
def response(self, flow: http.HTTPFlow) -> None:
"""DLP inbound scan on response headers and body."""
route = match_route(self.config.routes, flow.request.pretty_host)
@@ -508,12 +272,7 @@ class EgressAddon:
message = flow.websocket.messages[-1] # type: ignore[union-attr]
content = message.content.decode("utf-8", errors="replace")
if message.from_client:
# A WebSocket data frame is not an HTTP request line, so CRLF is
# not an injection vector here — scan only for credential leakage.
result = scan_outbound(
route, content, os.environ,
safe_tokens=self.safe_tokens, crlf_text="",
)
result = scan_outbound(route, content, os.environ)
if result is not None and result.severity == "block":
sys.stderr.write(f"egress DLP: {result.reason}\n")
flow.kill() # type: ignore[union-attr]
@@ -527,23 +286,4 @@ class EgressAddon:
sys.stderr.write(f"egress DLP warn: {result.reason}\n")
def _token_allow_timeout_from_env(env: "os._Environ[str]") -> float:
"""Read EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS; fall back to the default on an
unset or invalid value (a bad value should not wedge egress at boot)."""
raw = env.get("EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS", "").strip()
if not raw:
return DEFAULT_TOKEN_ALLOW_TIMEOUT_SECONDS
try:
value = float(raw)
except ValueError:
value = 0.0
if value <= 0:
sys.stderr.write(
"egress: invalid EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS="
f"{raw!r}; using default {DEFAULT_TOKEN_ALLOW_TIMEOUT_SECONDS:g}s\n"
)
return DEFAULT_TOKEN_ALLOW_TIMEOUT_SECONDS
return value
addons = [EgressAddon()]
+22 -159
View File
@@ -34,18 +34,9 @@ VALID_METHODS = frozenset({
"CONNECT",
})
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"})
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"})
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
# Per-route policy for what the proxy does when an outbound DLP detector
# matches a token (PRD 0062).
ON_MATCH_BLOCK = "block" # hard 403, never overridable
ON_MATCH_REDACT = "redact" # scrub the matched value, forward the request
ON_MATCH_SUPERVISE = "supervise" # queue for operator approval, hold the request
OUTBOUND_ON_MATCH_VALUES = (ON_MATCH_BLOCK, ON_MATCH_REDACT, ON_MATCH_SUPERVISE)
# Unset resolves to supervise (fall back to block when supervise is not wired).
DEFAULT_OUTBOUND_ON_MATCH = ON_MATCH_SUPERVISE
@dataclass(frozen=True)
class PathMatch:
@@ -78,8 +69,6 @@ class Route:
git_fetch: bool = False
outbound_detectors: tuple[str, ...] | None = None
inbound_detectors: tuple[str, ...] | None = None
# "" means unset → DEFAULT_OUTBOUND_ON_MATCH. See OUTBOUND_ON_MATCH_VALUES.
outbound_on_match: str = ""
LOG_OFF = 0 # no logging
@@ -106,11 +95,6 @@ class ScanResult:
reason: str
location: str = "" # where the match was found, e.g. "body", "authorization header"
context: str = "" # surrounding text with the match replaced by REDACT
# Raw substring the detector matched. Used inside the sidecar to key the
# supervisor-approved "safe tokens" set (PRD 0062); never logged or written
# to a proposal file. Empty for structural detectors (CRLF) that carry no
# safelist-able value.
matched: str = ""
# ---------------------------------------------------------------------------
@@ -234,12 +218,12 @@ def _parse_detectors(
idx: int,
host: str,
raw_dict: dict[str, object],
) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None, str]:
) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None]:
"""Parse the optional `dlp` block on a route, returning
(outbound_detectors, inbound_detectors, outbound_on_match)."""
(outbound_detectors, inbound_detectors)."""
dlp_raw = raw_dict.get("dlp")
if dlp_raw is None:
return None, None, ""
return None, None
label = f"route[{idx}] ({host})"
if not isinstance(dlp_raw, dict):
raise ValueError(f"{label}: 'dlp' must be an object")
@@ -276,24 +260,13 @@ def _parse_detectors(
outbound = _parse_detector_field("outbound_detectors", OUTBOUND_DETECTOR_NAMES)
inbound = _parse_detector_field("inbound_detectors", INBOUND_DETECTOR_NAMES)
on_match = ""
on_match_raw = dlp.get("outbound_on_match")
if on_match_raw is not None:
if not isinstance(on_match_raw, str) or on_match_raw not in OUTBOUND_ON_MATCH_VALUES:
raise ValueError(
f"{label}: dlp.outbound_on_match must be one of "
f"{', '.join(OUTBOUND_ON_MATCH_VALUES)} (got {on_match_raw!r})"
)
on_match = on_match_raw
for k in dlp:
if k not in ("outbound_detectors", "inbound_detectors", "outbound_on_match"):
if k not in ("outbound_detectors", "inbound_detectors"):
raise ValueError(
f"{label}: dlp has unknown key {k!r}; accepted keys "
f"are 'outbound_detectors', 'inbound_detectors', "
f"'outbound_on_match'"
f"are 'outbound_detectors', 'inbound_detectors'"
)
return outbound, inbound, on_match
return outbound, inbound
def parse_routes(payload: object) -> tuple[Route, ...]:
@@ -364,7 +337,7 @@ def _parse_one(idx: int, raw: object) -> Route:
)
# dlp detectors
outbound_detectors, inbound_detectors, outbound_on_match = _parse_detectors(
outbound_detectors, inbound_detectors = _parse_detectors(
idx, host, raw_dict,
)
@@ -383,60 +356,16 @@ def _parse_one(idx: int, raw: object) -> Route:
git_fetch=git_fetch,
outbound_detectors=outbound_detectors,
inbound_detectors=inbound_detectors,
outbound_on_match=outbound_on_match,
)
def _path_match_to_dict(pm: PathMatch) -> dict[str, object]:
d: dict[str, object] = {"value": pm.value}
if pm.type != "prefix":
d["type"] = pm.type
return d
def _header_match_to_dict(hm: HeaderMatch) -> dict[str, object]:
d: dict[str, object] = {"name": hm.name, "value": hm.value}
if hm.type != "exact":
d["type"] = hm.type
return d
def _match_entry_to_dict(me: MatchEntry) -> dict[str, object]:
d: dict[str, object] = {}
if me.paths:
d["paths"] = [_path_match_to_dict(p) for p in me.paths]
if me.methods:
d["methods"] = list(me.methods)
if me.headers:
d["headers"] = [_header_match_to_dict(h) for h in me.headers]
return d
def route_to_yaml_dict(r: Route) -> dict[str, object]:
"""Serialize a Route to YAML-schema-compatible dict.
Uses the same field names the YAML parser accepts, so the output
can be round-tripped directly into an `allow` or `egress-block`
proposal without translation. Fields that are empty/default are
omitted so the agent doesn't copy irrelevant keys."""
d: dict[str, object] = {"host": r.host}
if r.auth_scheme:
d["auth_scheme"] = r.auth_scheme
d["token_env"] = r.token_env
if r.matches:
d["matches"] = [_match_entry_to_dict(m) for m in r.matches]
if r.git_fetch:
d["git"] = {"fetch": True}
dlp: dict[str, object] = {}
if r.outbound_detectors is not None:
dlp["outbound_detectors"] = list(r.outbound_detectors)
if r.inbound_detectors is not None:
dlp["inbound_detectors"] = list(r.inbound_detectors)
if r.outbound_on_match:
dlp["outbound_on_match"] = r.outbound_on_match
if dlp:
d["dlp"] = dlp
return d
def load_routes(text: str) -> tuple[Route, ...]:
"""Parse YAML text → routes."""
try:
payload = parse_yaml_subset(text)
except YamlSubsetError as e:
raise ValueError(f"routes payload: invalid YAML: {e}") from e
return parse_routes(payload)
def parse_config(payload: object) -> "Config":
@@ -711,103 +640,43 @@ def scan_outbound(
route: Route,
body: str | bytes,
environ: typing.Mapping[str, str],
*,
safe_tokens: typing.AbstractSet[str] | None = None,
crlf_text: str | None = None,
) -> ScanResult | None:
# Lazy import to avoid circular deps and keep dlp_detectors optional
# at import time (the sidecar copies it flat alongside this file).
try:
from dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection,
scan_entropy,
scan_known_secrets,
scan_token_patterns,
)
except ImportError: # pragma: no cover - host-side path
from .dlp_detectors import ( # type: ignore[import-not-found]
scan_crlf_injection,
scan_entropy,
scan_known_secrets,
scan_token_patterns,
)
# Binary bodies: latin-1 is a bijective byte↔codepoint mapping that
# preserves every byte value, so ASCII-range secret strings remain
# findable by str.find / regex. Prefer strict UTF-8 for valid text bodies.
if isinstance(body, bytes):
try:
text = body.decode("utf-8")
except UnicodeDecodeError:
text = body.decode("latin-1")
else:
text = body
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
# CRLF injection is only an attack in the request line + headers, never the
# body: an HTTP body is delimited by Content-Length, so CRLF bytes there
# cannot split the request. Scanning the body produces false positives on
# legitimate form-encoded / multi-line content. Callers pass the
# body-excluded surfaces as `crlf_text`; `None` falls back to the full text
# for backward-compatible callers (host-side tests, websocket frames).
crlf_target = text if crlf_text is None else crlf_text
result = scan_crlf_injection(crlf_target)
# CRLF injection is never legitimate — runs unconditionally, not gated
# by outbound_detectors config.
result = scan_crlf_injection(text)
if result is not None:
return result
if _detector_enabled(route.outbound_detectors, "token_patterns"):
result = scan_token_patterns(text, location="body", safe_tokens=safe_tokens)
result = scan_token_patterns(text, location="body")
if result is not None:
return result
if _detector_enabled(route.outbound_detectors, "known_secrets"):
# BOT_BOTTLE_SENSITIVE_PREFIXES lets operators add extra env prefixes
# beyond EGRESS_TOKEN_* without changing the manifest schema.
extra_raw = environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "")
extra = tuple(p for p in extra_raw.split(",") if p)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
result = scan_known_secrets(
text, location="body", env=environ,
sensitive_prefixes=sensitive_prefixes, safe_tokens=safe_tokens,
)
if result is not None:
return result
# Entropy scanning requires explicit opt-in: it is NOT part of the
# default "all detectors" set because it produces false positives on
# legitimate base64 / binary payloads. Routes must list "entropy" in
# dlp.outbound_detectors to enable it.
if (
route.outbound_detectors is not None
and "entropy" in route.outbound_detectors
):
result = scan_entropy(text, location="body")
result = scan_known_secrets(text, location="body", env=environ)
if result is not None:
return result
return None
def build_token_allow_payload(
host: str,
method: str,
path: str,
result: ScanResult,
) -> str:
"""Render the human-readable supervisor proposal body for an outbound
token block (PRD 0062). Carries the host/method/path, the detector
reason, and the redacted context snippet never the raw token value."""
lines = [
"egress blocked an outbound request carrying a detected token",
f"host: {host}",
f"method: {method}",
f"path: {path}",
f"detector: {result.reason}",
]
if result.context:
lines.append(f"context: {result.context}")
return "\n".join(lines) + "\n"
def scan_inbound(
route: Route,
body: str | bytes,
@@ -829,14 +698,8 @@ def scan_inbound(
__all__ = [
"LOG_BLOCKS",
"route_to_yaml_dict",
"LOG_FULL",
"LOG_OFF",
"ON_MATCH_BLOCK",
"ON_MATCH_REDACT",
"ON_MATCH_SUPERVISE",
"OUTBOUND_ON_MATCH_VALUES",
"DEFAULT_OUTBOUND_ON_MATCH",
"Config",
"Decision",
"HeaderMatch",
@@ -846,13 +709,13 @@ __all__ = [
"ScanResult",
"build_inbound_scan_text",
"build_outbound_scan_text",
"build_token_allow_payload",
"decide",
"decide_git_fetch",
"evaluate_matches",
"is_git_push_request",
"is_git_fetch_request",
"load_config",
"load_routes",
"match_route",
"outbound_scan_headers",
"parse_config",
-161
View File
@@ -247,164 +247,6 @@ cat > "$refs_file"
zero=0000000000000000000000000000000000000000
supervise_gitleaks_allow() {
log_opts=$1
ref=$2
report_file=$(mktemp)
if ! gitleaks git \
--log-opts="$log_opts" \
--no-banner \
--redact \
--ignore-gitleaks-allow \
--report-format=json \
--report-path="$report_file" \
--exit-code 0 \
1>&2; then
rm -f "$report_file"
echo "git-gate: gitleaks inline-suppression scan failed for $ref" >&2
return 1
fi
proposal_id=$(
GITLEAKS_ALLOW_REF="$ref" python3 - "$report_file" <<'PY'
import datetime
import hashlib
import json
import os
import sys
import uuid
from pathlib import Path
report_path = Path(sys.argv[1])
queue_dir = os.environ.get("SUPERVISE_QUEUE_DIR", "")
slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "")
if not queue_dir or not slug:
sys.exit(2)
try:
raw = json.loads(report_path.read_text() or "[]")
except json.JSONDecodeError:
sys.exit(3)
if not isinstance(raw, list):
sys.exit(3)
if not raw:
sys.exit(0)
ref = os.environ.get("GITLEAKS_ALLOW_REF", "")
lines = [
"gitleaks inline suppression requires supervisor approval",
f"ref: {ref}",
"",
]
for i, finding in enumerate(raw, 1):
if not isinstance(finding, dict):
continue
file_path = finding.get("File", "")
line_no = finding.get("StartLine", finding.get("Line", ""))
rule_id = finding.get("RuleID", "")
commit = finding.get("Commit", "")
line = finding.get("Line", "")
lines.extend([
f"finding {i}:",
f" file: {file_path}",
f" line: {line_no}",
f" rule: {rule_id}",
f" commit: {commit}",
f" code: {line}",
"",
])
payload = "\n".join(lines).rstrip() + "\n"
proposal_id = str(uuid.uuid4())
proposal = {
"id": proposal_id,
"bottle_slug": slug,
"tool": "gitleaks-allow",
"proposed_file": payload,
"justification": (
"git-gate found gitleaks findings hidden by # gitleaks:allow; "
"approve only for dummy test fixtures or confirmed false positives"
),
"arrival_timestamp": datetime.datetime.now(
datetime.timezone.utc
).isoformat(),
"current_file_hash": hashlib.sha256(payload.encode("utf-8")).hexdigest(),
}
queue = Path(queue_dir)
queue.mkdir(parents=True, exist_ok=True)
path = queue / f"{proposal_id}.proposal.json"
tmp = path.with_suffix(path.suffix + ".tmp")
with tmp.open("w", encoding="utf-8") as f:
json.dump(proposal, f, indent=2)
f.write("\n")
os.chmod(tmp, 0o600)
os.replace(tmp, path)
print(proposal_id)
PY
)
rc=$?
rm -f "$report_file"
if [ "$rc" -eq 0 ] && [ -z "$proposal_id" ]; then
return 0
fi
if [ "$rc" -ne 0 ]; then
echo "git-gate: cannot route # gitleaks:allow finding to supervisor; refusing push" >&2
return 1
fi
queue_dir=${SUPERVISE_QUEUE_DIR:-}
response_file="$queue_dir/${proposal_id}.response.json"
timeout=${SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS:-300}
case "$timeout" in
''|*[!0-9]*)
echo "git-gate: invalid SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS=$timeout" >&2
return 1
;;
esac
echo "git-gate: queued # gitleaks:allow supervisor approval $proposal_id" >&2
echo "git-gate: approve with './cli.py supervise' to continue this push" >&2
waited=0
while [ "$waited" -lt "$timeout" ]; do
if [ -f "$response_file" ]; then
status=$(python3 - "$response_file" <<'PY'
import json
import sys
try:
with open(sys.argv[1], encoding="utf-8") as f:
raw = json.load(f)
except (OSError, json.JSONDecodeError):
sys.exit(1)
status = raw.get("status")
if not isinstance(status, str):
sys.exit(1)
print(status)
PY
) || status=""
case "$status" in
approved|modified)
mkdir -p "$queue_dir/processed"
mv -f "$queue_dir/${proposal_id}.proposal.json" "$queue_dir/processed/" 2>/dev/null || true
mv -f "$queue_dir/${proposal_id}.response.json" "$queue_dir/processed/" 2>/dev/null || true
echo "git-gate: supervisor approved # gitleaks:allow for $ref" >&2
return 0
;;
rejected)
echo "git-gate: supervisor rejected # gitleaks:allow for $ref" >&2
return 1
;;
*)
echo "git-gate: invalid supervisor response for # gitleaks:allow" >&2
return 1
;;
esac
fi
sleep 1
waited=$((waited + 1))
done
echo "git-gate: supervisor approval timed out for # gitleaks:allow; refusing push" >&2
return 1
}
# Phase 1: gitleaks scan each ref's incoming commits.
while IFS=' ' read -r old new ref; do
[ -z "$ref" ] && continue
@@ -426,9 +268,6 @@ while IFS=' ' read -r old new ref; do
echo "git-gate: gitleaks rejected push to $ref" >&2
exit 1
fi
if ! supervise_gitleaks_allow "$log_opts" "$ref"; then
exit 1
fi
done < "$refs_file"
# Phase 2: forward each ref to the upstream (`origin`, configured
+10 -96
View File
@@ -1,107 +1,21 @@
"""Tiny logging wrappers. All output goes to stderr.
Two capabilities layer onto the bare wrappers (issue #252):
- **Levels.** `debug` / `info` / `warn` / `error` carry an ordered
severity. Output is gated by `BOT_BOTTLE_LOG_LEVEL` (debug | info |
warn | error; default `info`). A message emits when its severity is
at or above the threshold, so `debug` is silent by default and
`error` always surfaces (nothing sits above it) which keeps the
fatal `die` path visible regardless of the configured level.
- **Context.** Every wrapper takes an optional `context` mapping that
renders as a parseable ` [k=v ...]` suffix (keys sorted; values with
whitespace/quotes are quoted), so failures can be filtered and
correlated instead of being flat strings.
With no `context` and the default level, output is byte-identical to the
original `bot-bottle: <msg>` / `bot-bottle: warning: <msg>` /
`bot-bottle: error: <msg>` lines the 100+ existing call sites are
unaffected.
"""
"""Tiny logging wrappers. All output goes to stderr."""
from __future__ import annotations
import os
import sys
from typing import Mapping, NoReturn
# Ordered severities. Gaps left between values so intermediate levels
# can be added later without renumbering.
DEBUG = 10
INFO = 20
WARN = 30
ERROR = 40
_LEVEL_NAMES: dict[str, int] = {
"debug": DEBUG,
"info": INFO,
"warn": WARN,
"warning": WARN,
"error": ERROR,
}
# Default threshold when BOT_BOTTLE_LOG_LEVEL is unset or unrecognised.
_DEFAULT_THRESHOLD = INFO
_LOG_LEVEL_ENV = "BOT_BOTTLE_LOG_LEVEL"
from typing import NoReturn
def _threshold() -> int:
"""Resolve the active level threshold from the environment.
Read per-call (not cached) so the level can be changed at runtime
and so tests can patch `os.environ` without a reload. Unknown values
fall back to the default rather than raising logging must never be
the thing that crashes the process."""
raw = os.environ.get(_LOG_LEVEL_ENV, "")
return _LEVEL_NAMES.get(raw.strip().lower(), _DEFAULT_THRESHOLD)
def info(msg: str) -> None:
print(f"bot-bottle: {msg}", file=sys.stderr)
def _format_context(context: Mapping[str, object] | None) -> str:
"""Render a context mapping as a ` [k=v k2=v2]` suffix.
Keys are sorted for stable, diffable output. Values that are empty or
contain whitespace or a quote are wrapped in double quotes (with inner
quotes escaped) so each `k=v` pair stays parseable. Empty/None context
renders as the empty string."""
if not context:
return ""
parts: list[str] = []
for key in sorted(context):
value = str(context[key])
if value == "" or any(ch.isspace() for ch in value) or '"' in value:
value = '"' + value.replace('"', '\\"') + '"'
parts.append(f"{key}={value}")
return " [" + " ".join(parts) + "]"
def warn(msg: str) -> None:
print(f"bot-bottle: warning: {msg}", file=sys.stderr)
def _emit(
level: int,
label: str,
msg: str,
context: Mapping[str, object] | None,
) -> None:
if level < _threshold():
return
prefix = f"{label}: " if label else ""
sys.stderr.write(f"bot-bottle: {prefix}{msg}{_format_context(context)}\n")
def debug(msg: str, *, context: Mapping[str, object] | None = None) -> None:
_emit(DEBUG, "debug", msg, context)
def info(msg: str, *, context: Mapping[str, object] | None = None) -> None:
_emit(INFO, "", msg, context)
def warn(msg: str, *, context: Mapping[str, object] | None = None) -> None:
_emit(WARN, "warning", msg, context)
def error(msg: str, *, context: Mapping[str, object] | None = None) -> None:
_emit(ERROR, "error", msg, context)
def error(msg: str) -> None:
print(f"bot-bottle: error: {msg}", file=sys.stderr)
class Die(SystemExit):
@@ -117,6 +31,6 @@ class Die(SystemExit):
self.message = message
def die(msg: str, *, context: Mapping[str, object] | None = None) -> NoReturn:
error(msg, context=context)
def die(msg: str) -> NoReturn:
error(msg)
raise Die(1, msg)
+9 -9
View File
@@ -19,7 +19,7 @@ Bottle schema (frontmatter):
repos: { <name>: <git-gate-entry>, ... } # optional
egress: { routes: [ <egress-route>, ... ] }
# route keys: host, matches, auth, role, dlp
supervise: <bool> # optional (default true)
supervise: <bool> # optional
Agent schema (frontmatter):
bottle: <bottle-name> # required
@@ -111,13 +111,13 @@ class ManifestBottle:
# identity without any git-gate.repos upstreams, and vice versa.
git_user: ManifestGitUser = field(default_factory=ManifestGitUser)
egress: ManifestEgressConfig = field(default_factory=ManifestEgressConfig)
# Per-bottle stuck-recovery sidecar (PRD 0013). When true (the
# default, issue #249), the launch step brings up a supervise
# sidecar that exposes MCP tools to the agent (egress-block,
# capability-block) plus mounts the current-config dir read-only
# into the agent at /etc/bot-bottle/current-config. Set
# `supervise: false` to skip the sidecar and mount.
supervise: bool = True
# Opt-in per-bottle stuck-recovery sidecar (PRD 0013). When true,
# the launch step brings up a supervise sidecar that exposes MCP
# tools to the agent (egress-block, capability-block) plus mounts
# the current-config dir read-only into the agent at
# /etc/bot-bottle/current-config. False (the default) skips the
# sidecar and mount.
supervise: bool = False
@classmethod
def from_dict(cls, name: str, raw: object) -> "ManifestBottle":
@@ -190,7 +190,7 @@ class ManifestBottle:
else ManifestEgressConfig()
)
supervise_raw = d.get("supervise", True)
supervise_raw = d.get("supervise", False)
if not isinstance(supervise_raw, bool):
raise ManifestError(
f"bottle '{name}' supervise must be a boolean "
+6 -28
View File
@@ -199,10 +199,13 @@ def _parse_provider_settings(
) -> dict[str, object]:
if raw is None:
return {}
if template != "pi":
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings is only "
"supported for template 'pi'"
)
settings = as_json_object(raw, f"bottle '{bottle_name}' agent_provider.settings")
common_allowed = {"startup_args"}
pi_allowed = {
allowed = {
"provider",
"base_url",
"api",
@@ -215,37 +218,12 @@ def _parse_provider_settings(
"supports_developer_role",
"supports_reasoning_effort",
}
if template == "pi":
allowed = common_allowed | pi_allowed
elif template in ("claude", "codex"):
allowed = common_allowed
elif template not in PROVIDER_TEMPLATES:
return dict(settings)
else:
allowed = common_allowed
for key in settings:
if key not in allowed:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings has unknown "
f"key {key!r}; allowed: {', '.join(sorted(allowed))}"
)
startup_args = settings.get("startup_args")
if startup_args is not None:
if not isinstance(startup_args, list):
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings.startup_args "
f"must be an array of strings"
)
for i, arg in enumerate(startup_args):
if not isinstance(arg, str) or not arg:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.settings."
f"startup_args[{i}] must be a non-empty string"
)
if template != "pi":
return dict(settings)
for key in ("provider", "base_url", "api", "api_key", "api_key_env"):
value = settings.get(key)
if value is not None and (not isinstance(value, str) or not value):
+5 -22
View File
@@ -21,9 +21,6 @@ VALID_METHODS = frozenset({
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"})
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
# What the proxy does on an outbound token match (PRD 0062).
OUTBOUND_ON_MATCH_VALUES = ("block", "redact", "supervise")
def validate_egress_routes(
bottle_name: str,
@@ -70,7 +67,6 @@ class ManifestEgressRoute:
GitFetch: bool = False
OutboundDetectors: tuple[str, ...] | None = None
InboundDetectors: tuple[str, ...] | None = None
OutboundOnMatch: str = ""
@classmethod
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "ManifestEgressRoute":
@@ -165,9 +161,8 @@ class ManifestEgressRoute:
# --- dlp ---
outbound_detectors: tuple[str, ...] | None = None
inbound_detectors: tuple[str, ...] | None = None
outbound_on_match = ""
if "dlp" in d:
outbound_detectors, inbound_detectors, outbound_on_match = _parse_dlp_block(
outbound_detectors, inbound_detectors = _parse_dlp_block(
label, d.get("dlp"),
)
@@ -206,7 +201,6 @@ class ManifestEgressRoute:
GitFetch=git_fetch,
OutboundDetectors=outbound_detectors,
InboundDetectors=inbound_detectors,
OutboundOnMatch=outbound_on_match,
)
@@ -329,7 +323,7 @@ def _parse_header_match(
def _parse_dlp_block(
route_label: str,
raw: object,
) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None, str]:
) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None]:
label = f"{route_label} dlp"
d = as_json_object(raw, label)
@@ -364,24 +358,13 @@ def _parse_dlp_block(
outbound = _parse_field("outbound_detectors", OUTBOUND_DETECTOR_NAMES)
inbound = _parse_field("inbound_detectors", INBOUND_DETECTOR_NAMES)
on_match = ""
on_match_raw = d.get("outbound_on_match")
if on_match_raw is not None:
if not isinstance(on_match_raw, str) or on_match_raw not in OUTBOUND_ON_MATCH_VALUES:
raise ManifestError(
f"{label} outbound_on_match must be one of "
f"{', '.join(OUTBOUND_ON_MATCH_VALUES)} (got {on_match_raw!r})"
)
on_match = on_match_raw
for k in d:
if k not in ("outbound_detectors", "inbound_detectors", "outbound_on_match"):
if k not in ("outbound_detectors", "inbound_detectors"):
raise ManifestError(
f"{label} has unknown key {k!r}; accepted keys are "
f"'outbound_detectors', 'inbound_detectors', "
f"'outbound_on_match'"
f"'outbound_detectors', 'inbound_detectors'"
)
return outbound, inbound, on_match
return outbound, inbound
LOG_LEVELS = frozenset({0, 1, 2})
+12 -29
View File
@@ -5,7 +5,7 @@ queue/audit support. The sidecar (bot_bottle.supervise_server)
sits on the bottle's internal network and exposes three MCP tools the
agent calls when it hits a stuck-recovery category:
* egress-block / allow agent proposes a new routes.yaml
* egress-block agent proposes a new routes.yaml
* capability-block agent proposes a new agent Dockerfile
Each tool call: the agent passes the full proposed file plus a
@@ -49,40 +49,27 @@ SUPERVISE_HOSTNAME = "supervise"
SUPERVISE_PORT = 9100
TOOL_CAPABILITY_BLOCK = "capability-block"
TOOL_EGRESS_BLOCK = "egress-block"
TOOL_EGRESS_ALLOW = "egress-allow"
TOOL_GITLEAKS_ALLOW = "gitleaks-allow"
# Written directly by the egress addon (not an agent-facing MCP tool) when an
# outbound DLP token block is routed to the operator for override (PRD 0062).
TOOL_EGRESS_TOKEN_ALLOW = "egress-token-allow"
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
TOOLS: tuple[str, ...] = (
TOOL_EGRESS_ALLOW,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
TOOL_EGRESS_TOKEN_ALLOW,
TOOL_LIST_EGRESS_ROUTES,
)
# The supervise sidecar uses these to query egress's
# introspection endpoint for the `list-egress-routes` MCP
# tool. The hostname + port match egress's docker network
# listen port (see backend.docker.egress.EGRESS_PORT). The supervise
# daemon runs inside the sidecar bundle alongside egress, so loopback
# is the stable address across docker, smolmachines, and Apple
# Container backends.
EGRESS_FORWARD_PROXY = "http://127.0.0.1:9099"
# alias + listen port (see bot_bottle.egress.EGRESS_HOSTNAME
# and backend.docker.egress.EGRESS_PORT — the values
# are inlined here so the in-container supervise_server doesn't
# need to import the egress package).
EGRESS_FORWARD_PROXY = "http://egress:9099"
EGRESS_INTROSPECT_URL = "http://_egress.local/allowlist"
# capability-block has no on-disk config the operator edits in place
# (the Dockerfile is rebuilt, not patched), so it has no audit log
# here — those changes are captured by git history + the rebuild record
# laid down in PRD 0016.
COMPONENT_FOR_TOOL: dict[str, str] = {
TOOL_EGRESS_ALLOW: "egress",
TOOL_EGRESS_BLOCK: "egress",
}
# here — those changes are captured by git history + the rebuild
# record laid down in PRD 0016. egress-block was removed in issue #198.
COMPONENT_FOR_TOOL: dict[str, str] = {}
STATUS_APPROVED = "approved"
STATUS_MODIFIED = "modified"
@@ -444,9 +431,9 @@ def sha256_hex(content: str) -> str:
# Dockerfile and propose modifications.
#
# routes.yaml + allowlist used to live here too; PRD 0017 chunk 3
# moved them behind the `list-egress-routes` MCP tool (live state
# from egress's introspection endpoint) so the agent always sees
# current data rather than a launch-time snapshot.
# moved them behind the `list-egress-routes` MCP tool (live
# state from egress's introspection endpoint) so the agent
# always sees current data rather than a launch-time snapshot.
CURRENT_CONFIG_DOCKERFILE = "Dockerfile"
@@ -559,10 +546,6 @@ __all__ = [
"EGRESS_FORWARD_PROXY",
"EGRESS_INTROSPECT_URL",
"TOOL_CAPABILITY_BLOCK",
"TOOL_EGRESS_ALLOW",
"TOOL_EGRESS_BLOCK",
"TOOL_GITLEAKS_ALLOW",
"TOOL_EGRESS_TOKEN_ALLOW",
"TOOL_LIST_EGRESS_ROUTES",
"archive_proposal",
"audit_dir",
+10 -115
View File
@@ -1,8 +1,8 @@
"""Supervise sidecar HTTP server (PRD 0013).
Per-bottle MCP server exposing tools the agent calls to propose config
changes when stuck. The tools are `allow`, `egress-block`,
`capability-block`, and `list-egress-routes`.
changes when stuck. The egress-block tool was removed in issue #198;
the remaining tools are `capability-block` and `list-egress-routes`.
Each queued tool call:
@@ -44,15 +44,9 @@ import urllib.request
from dataclasses import dataclass
from pathlib import Path
try:
# Same-directory imports inside the bundle container; these files are
# COPYed flat under /app by Dockerfile.sidecars.
from egress_addon_core import LOG_OFF, load_config
import supervise as _sv
except ModuleNotFoundError:
# Package imports for host-side tests and tooling.
from .egress_addon_core import LOG_OFF, load_config
from . import supervise as _sv
# Same-directory import inside the bundle container; `supervise.py`
# is COPYed alongside this file by Dockerfile.sidecars.
import supervise as _sv
# --- JSON-RPC / MCP plumbing ----------------------------------------------
@@ -148,9 +142,8 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"allowlist. Returns JSON with one entry per allowed host, "
"each carrying its matches rules (if any) and whether "
"the proxy injects Authorization for the route. Use this "
"before composing an `egress-allow` or `egress-block` proposal so "
"the new routes file extends the live one rather than "
"replacing it."
"before composing an `egress-block` proposal so the new "
"routes file extends the live one rather than replacing it."
),
"inputSchema": {
"type": "object",
@@ -158,90 +151,6 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"additionalProperties": False,
},
},
{
"name": _sv.TOOL_EGRESS_ALLOW,
"description": (
"Request operator approval to change the bottle's egress "
"allowlist. Pass the full proposed routes.yaml content, not "
"just the new host, plus a justification. Use "
"`list-egress-routes` first so the proposal preserves existing "
"routes."
),
"inputSchema": {
"type": "object",
"properties": {
"routes_yaml": {
"type": "string",
"description": (
"Full proposed /etc/egress/routes.yaml content. "
"Each route entry accepts these keys:\n"
" host: <hostname> (required)\n"
" auth_scheme: Bearer|token (must pair with token_env)\n"
" token_env: <ENV_VAR_NAME> (must pair with auth_scheme)\n"
" matches: (optional list of match entries)\n"
" - paths: [{type: prefix|exact|regex, value: /...}]\n"
" methods: [GET, POST, ...]\n"
" headers: [{name: X-Hdr, value: val, type: exact|regex}]\n"
" git: (optional; omit to block git clone/fetch)\n"
" fetch: true\n"
" dlp: (optional DLP scanner overrides)\n"
" outbound_detectors: [token_patterns, known_secrets]\n"
" inbound_detectors: [naive_injection_detection]\n"
" outbound_on_match: block|redact|supervise (default supervise)\n"
"Omit any key that should use its default. "
"`list-egress-routes` returns routes in this same format."
),
},
"justification": {
"type": "string",
"description": "Why this egress route is needed.",
},
},
"required": ["routes_yaml", "justification"],
},
},
{
"name": _sv.TOOL_EGRESS_BLOCK,
"description": (
"Request operator approval to change the bottle's egress "
"allowlist after a blocked outbound request. Pass the full "
"proposed routes.yaml content plus a justification. Use "
"`list-egress-routes` first so the proposal preserves existing "
"routes."
),
"inputSchema": {
"type": "object",
"properties": {
"routes_yaml": {
"type": "string",
"description": (
"Full proposed /etc/egress/routes.yaml content. "
"Each route entry accepts these keys:\n"
" host: <hostname> (required)\n"
" auth_scheme: Bearer|token (must pair with token_env)\n"
" token_env: <ENV_VAR_NAME> (must pair with auth_scheme)\n"
" matches: (optional list of match entries)\n"
" - paths: [{type: prefix|exact|regex, value: /...}]\n"
" methods: [GET, POST, ...]\n"
" headers: [{name: X-Hdr, value: val, type: exact|regex}]\n"
" git: (optional; omit to block git clone/fetch)\n"
" fetch: true\n"
" dlp: (optional DLP scanner overrides)\n"
" outbound_detectors: [token_patterns, known_secrets]\n"
" inbound_detectors: [naive_injection_detection]\n"
" outbound_on_match: block|redact|supervise (default supervise)\n"
"Omit any key that should use its default. "
"`list-egress-routes` returns routes in this same format."
),
},
"justification": {
"type": "string",
"description": "Why this egress route is needed.",
},
},
"required": ["routes_yaml", "justification"],
},
},
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"description": (
@@ -273,12 +182,11 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
]
# Map each proposal tool to the input field that carries the agent's
# payload (stored in Proposal.proposed_file).
# Map each non-egress tool to the input field that carries the agent's
# payload (stored in Proposal.proposed_file). egress-block builds its
# payload from structured input fields in `handle_egress_block`.
PROPOSED_FILE_FIELD: dict[str, str] = {
_sv.TOOL_EGRESS_ALLOW: "routes_yaml",
_sv.TOOL_CAPABILITY_BLOCK: "dockerfile",
_sv.TOOL_EGRESS_BLOCK: "routes_yaml",
}
@@ -295,19 +203,6 @@ def validate_proposed_file(tool: str, content: str) -> None:
# Dockerfiles are too varied to validate syntactically beyond
# non-empty. The operator reads the diff in the TUI.
pass
elif tool in (_sv.TOOL_EGRESS_ALLOW, _sv.TOOL_EGRESS_BLOCK):
try:
config = load_config(content)
except ValueError as e:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml is not valid: {e}",
) from e
if config.log != LOG_OFF:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml must not change egress logging",
)
else:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
-159
View File
@@ -1,159 +0,0 @@
# PRD 0060: Commit bottle state to an image
- **Status:** Active
- **Author:** Claude
- **Created:** 2026-06-20
- **Issue:** #194
## Summary
Add a `commit` CLI command that freezes a running bottle's state to a
resumable local artifact. Docker bottles are stored as Docker images;
smolmachines bottles are stored as `.smolmachine` artifacts. Operators
can then resume the bottle from that exact filesystem snapshot, or
export the artifact to migrate work to a different host.
## Problem
When a long-running agent session is interrupted — by a host reboot, a
network failure, or a planned infrastructure migration — the in-progress
container state is lost. `cli.py resume` rebuilds the agent image from
the Dockerfile and reprovi-sions the bottle, but that returns the guest
to its initial state, not to wherever the agent was mid-task.
There is no mechanism today to capture "what's installed / configured
inside the running container right now" and make it reproducible. The
`capability-block` flow writes a new Dockerfile and marks the bottle for
resume, but that only applies when the agent itself has requested a
capability change; it doesn't help the operator who wants to take a
snapshot before a planned host reboot or hardware migration.
## Goals / Success Criteria
- `./cli.py commit [<slug>]` takes a snapshot of the running agent and
stores it as a local artifact.
- Without a slug argument the command shows the same interactive picker
as `start` (the list of active slugs).
- The committed artifact reference is stored in per-bottle state so
that the next `./cli.py resume <slug>` automatically uses the
snapshot instead of rebuilding from the Dockerfile.
- `mark_preserved` is called so the state dir survives the normal
session-end cleanup.
- A backend-specific export hint is printed so operators know how to
migrate the snapshot.
- The command errors clearly on unsupported backends.
## Non-goals
- macOS-container backend support.
- Automatic commit on agent exit.
- Image push to a remote registry.
- Storing the image tag in the manifest or sharing it between operators.
## Design
### Docker image tag
`bot-bottle-committed-<slug>:latest` — namespaced under `bot-bottle-`
to match existing image naming conventions; `committed` distinguishes it
from the build-time image (`bot-bottle-claude:latest`) and the
capability-block rebuild image (`bot-bottle-rebuilt-<identity>:latest`).
### State storage
A new plain-text file `committed-image` is added to the per-bottle state
directory:
```
~/.bot-bottle/state/<identity>/
metadata.json
Dockerfile (capability-block override; optional)
committed-image (committed artifact reference; optional)
transcript/
```
`bottle_state.committed_image_path(identity)` returns the path.
`write_committed_image` / `read_committed_image` are the read/write
helpers, matching the existing `per_bottle_dockerfile` pattern. Docker
stores a Docker tag in this file; smolmachines stores the absolute path
to the committed `.smolmachine` artifact.
### `commit` command
```
./cli.py commit [<slug>]
```
1. Resolve slug (arg or interactive picker from `enumerate_active_agents`).
2. Check metadata and branch by backend.
3. For Docker, derive container name `bot-bottle-<slug>` and run
`docker commit <container> bot-bottle-committed-<slug>:latest`.
4. For smolmachines, derive machine name `bot-bottle-<slug>` and run
`smolvm pack create --from-vm <machine> -o ~/.bot-bottle/state/<slug>/committed-smolmachine`.
5. Write the Docker image tag or smolmachine artifact path to
`~/.bot-bottle/state/<slug>/committed-image`.
6. Call `mark_preserved(<slug>)` so the state dir survives session-end.
7. Print the resume hint and a backend-specific export example.
### Resume from committed image
`bot_bottle/backend/docker/launch.py` already rebuilds the agent image
at the top of the `launch` context manager. The change is a check
immediately before that step:
```python
committed = read_committed_image(plan.slug)
if committed and docker_mod.image_exists(committed):
info(f"using committed image {committed!r}")
plan = dataclasses.replace(
plan,
agent_provision=dataclasses.replace(plan.agent_provision, image=committed),
)
else:
docker_mod.build_image(plan.image, _REPO_DIR, dockerfile=plan.dockerfile_path)
```
Replacing `agent_provision.image` propagates to `plan.image` (a
property) and from there to the Compose spec renderer's `_agent_service`
`image:` field, so the container boots from the committed snapshot.
The build step is skipped entirely when a committed image is found and
exists locally.
If the committed image has been deleted from the local daemon (e.g.
after `docker rmi` or a `docker system prune`), the launch falls back
to a normal Dockerfile build, matching the pre-commit behavior.
### Resume from committed smolmachine
`bot_bottle/backend/smolmachines/launch.py` checks the committed
reference before the normal Docker build -> pack cache path:
```python
committed = read_committed_image(plan.slug)
if committed and Path(committed).is_file():
return Path(committed)
return _ensure_smolmachine(plan.agent_image, dockerfile=plan.agent_dockerfile_path)
```
The returned path is passed to `smolvm machine create --from`, so the
resumed VM boots from the committed snapshot. If the artifact has been
deleted, launch falls back to the normal build and pack flow.
## Testing strategy
- Unit tests for `write_committed_image` / `read_committed_image` in
`tests/unit/test_bottle_state.py`, using the existing `_FakeHomeMixin`
pattern.
- Unit tests for `commit_container` in `tests/unit/test_docker_util_image.py`,
mocking `subprocess.run` and asserting on the `docker commit` argv.
- Unit tests for `cmd_commit` argument parsing, Docker commit,
smolmachines pack, and the unsupported backend error path, mocking
`enumerate_active_agents`, `commit_container`, and
`pack_create_from_vm`.
- Unit tests for the launch-step committed-image branch: patch
`read_committed_image` to return a tag, patch `image_exists` to return
True, and assert that `build_image` is not called and `plan.image` is
overridden.
- Unit tests for the smolmachines launch-step committed-artifact branch:
patch `read_committed_image` to return an existing path and assert the
normal `_ensure_smolmachine` path is skipped.
@@ -1,101 +0,0 @@
# PRD 0061: Gitleaks inline suppression supervision
- **Status:** Active
- **Author:** codex
- **Created:** 2026-06-10
- **Issue:** #208
## Summary
Route every `# gitleaks:allow` inline suppression through the existing
supervisor approval queue before git-gate forwards a push upstream. The
annotation remains available for test fixtures and confirmed false positives,
but it no longer lets an agent silently bypass the gate's secret scan.
## Problem
Gitleaks supports inline suppression comments. With the default behaviour,
a pushed line that contains a secret-like value and `# gitleaks:allow` is
ignored by gitleaks and accepted by git-gate. That is convenient for dummy
credentials in tests, but it also gives an agent a one-line bypass for a real
secret. Once the commit lands, the diff alone cannot prove whether the
suppression was legitimate.
## Goals / Success Criteria
1. Git-gate continues to run the normal gitleaks scan for every incoming ref.
2. After the normal scan passes, git-gate runs a second scan with
`--ignore-gitleaks-allow` and a JSON report so suppressed findings become
visible.
3. If that second scan reports no suppressed findings, the push proceeds
unchanged.
4. If it reports suppressed findings, git-gate creates a `gitleaks-allow`
supervisor proposal containing the ref, file path, line number, rule,
commit, and flagged line for each finding.
5. The push proceeds only when the supervisor explicitly approves the
proposal; rejection, malformed responses, missing supervisor configuration,
and timeout all refuse the push.
6. The supervisor TUI requires a reason when approving a `gitleaks-allow`
proposal, so the audit trail records whether the approval was for a test
fixture or a false positive.
## Non-goals
- Replacing gitleaks or changing the main secret-detection rule set.
- Removing support for `# gitleaks:allow`.
- Automatically classifying fixture files or false positives.
- Adding new supervisor transport or authentication mechanisms.
## Design
### Git-gate flow
`git_gate_render_hook()` emits a `supervise_gitleaks_allow` shell helper.
For each incoming ref, git-gate first runs the existing gitleaks command. If
that scan passes, it runs:
```sh
gitleaks git \
--log-opts="$log_opts" \
--no-banner \
--redact \
--ignore-gitleaks-allow \
--report-format=json \
--report-path="$report_file" \
--exit-code 0
```
The second pass keeps the push path non-interactive while producing a report
of findings that would otherwise have been hidden by inline suppression.
### Supervisor proposal
When the JSON report contains findings, an embedded Python helper writes a
proposal into `SUPERVISE_QUEUE_DIR` using the existing proposal schema. The
proposal uses:
- `tool: "gitleaks-allow"`
- a text payload with the ref and each finding's file, line, rule, commit,
and redacted code line
- a justification that tells the operator to approve only dummy test fixtures
or confirmed false positives
Git-gate then waits for `<proposal-id>.response.json` for
`SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS`, defaulting to 300 seconds.
`approved` and `modified` responses allow the push; `rejected`, invalid
responses, invalid timeout configuration, or timeout refuse it.
### Supervisor UI
`TOOL_GITLEAKS_ALLOW` is added to the supervisor tool registry. The curses
supervisor renders the proposal as text and allows approval or rejection.
Modification is unavailable for this proposal type because there is no file
patch to apply. Approval from the TUI prompts for a non-empty reason and
writes that reason to the response/audit path.
### Tests
Unit tests assert that the rendered git-gate hook includes the second gitleaks
pass, supervisor queue fields, and fail-closed messages. Supervisor tests cover
the new tool constant, proposal archiving, and the required TUI approval
reason.
@@ -1,210 +0,0 @@
# PRD 0062: Supervisor override for egress token blocks
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-24
- **Issue:** #261
## Summary
Give each egress route a policy for what happens when an outbound DLP detector
matches a token, via `dlp.outbound_on_match: block | redact | supervise`
(default `supervise`):
- **`supervise`** (default) — route the block through the existing supervisor
approval queue instead of returning `403` immediately. The proxy holds the
request open until the operator approves or rejects it. On approval the
matched token is added to an in-memory "safe tokens" set so the request — and
any later request carrying the same token — flows through without
re-prompting.
- **`redact`** — scrub the matched value(s) from the request and forward it,
no operator in the loop. For routes where a token-shaped value is noise the
upstream doesn't need (telemetry/log sinks). Fails closed if a match lands on
a surface redaction can't rewrite (the hostname).
- **`block`** — the original hard `403`; never overridable. For routes where a
detected token must always stop.
The motivating goal is reducing friction from false positives without weakening
the default-deny posture: supervise keeps a human in the loop, redact is an
explicit per-route opt-in, and block stays available for sensitive routes.
## Problem
The outbound DLP detectors (`token_patterns`, `known_secrets`) are
deliberately aggressive: any string that looks like a credential is blocked
before it leaves the bottle. That is the right default, but it produces false
positives — a token-shaped value that is not actually a secret, or a credential
the agent legitimately needs to send to a declared host. Today the only
recovery is for the operator to notice the `egress DLP` 403 in the logs and
hand-edit the route's `dlp.outbound_detectors`, which disables the detector for
the whole route rather than allowing the one value.
The operator has no in-the-loop signal that a token block happened and no
fine-grained way to say "this specific value is fine."
## Goals / Success Criteria
1. An outbound DLP **token** block (a `ScanResult` carrying a matched secret
value) creates a supervisor proposal instead of an immediate `403`.
2. The egress proxy holds the blocked request open, polling for the operator's
response up to a bounded timeout.
3. The proposal shows the operator the host, method, path, the detector reason,
and a **redacted** context snippet — never the raw token value.
4. On `approved`/`modified`, the matched token value is added to an in-memory
safe-tokens set and the request proceeds normally; later requests carrying
the same value skip the block.
5. On `rejected`, timeout, malformed response, or missing supervisor wiring,
the request fails closed with the same `403` as today.
6. Structural blocks that carry no token value (CRLF injection) and the
route-not-allowlisted / git blocks are unchanged — they stay hard `403`s and
keep their existing agent-driven `allow` / `egress-block` MCP path.
7. The proxy event loop is not stalled while waiting: the wait is asynchronous,
so other flows keep being served.
## Non-goals
- Persisting the safe-tokens set across egress restarts. It lives in process
memory only; a restart re-prompts. (The issue explicitly defers persistence.)
- Supervising inbound (prompt-injection) blocks or WebSocket frame blocks.
WebSocket frames still honour the safe-tokens set for already-approved values
but cannot wait for approval (there is no response surface after upgrade).
- Generalising an approved secret across encodings. The safe-tokens set matches
the exact value the detector found.
- Replacing the per-route `dlp.outbound_detectors` override. That remains the
way to turn a detector off wholesale.
- Making `redact` the default. Silent redaction of a true false positive
corrupts legitimate data, so it is opt-in per route; `supervise` (human in
the loop) stays the default.
## Scope
### In scope
The minimum cut that ships, in build order:
1. **Core**`ScanResult.matched`; thread `safe_tokens` through
`scan_outbound` / the token detectors; `build_token_allow_payload`.
2. **Supervise + TUI**`TOOL_EGRESS_TOKEN_ALLOW`; TUI suffix, modify guard,
required approval reason.
3. **Addon glue** — async `request`, safe-tokens set, proposal write + async
poll, allow/block decision; pass `safe_tokens` into the WebSocket path.
4. **On-match policy**`dlp.outbound_on_match` through manifest → render →
addon; `redact` surface scrub with fail-closed re-scan; policy dispatch in
the addon's outbound handler.
5. **Tests + docs** — core/supervise/TUI/manifest/render unit tests; README
egress + supervisor notes.
### Out of scope
The deferrals enumerated under **Non-goals** — restart persistence, inbound /
WebSocket-frame supervision, cross-encoding generalisation, replacing
`dlp.outbound_detectors`, and making `redact` the default.
## Proposed Design
### New services / components
A new proposal tool constant `egress-token-allow` (`TOOL_EGRESS_TOKEN_ALLOW`)
is added to `supervise.TOOLS`, and the egress addon gains an in-memory
safe-tokens set plus the policy-dispatch path that drives it.
On an outbound block the addon dispatches on the resolved policy:
- **Structural blocks always 403.** A `ScanResult` with no `matched` value
(CRLF injection) is a hard `403` regardless of policy — there is nothing to
redact or safelist.
- **`redact`** runs `redact_tokens` over the body, non-`host` header values,
and path/query, then re-scans. If the re-scan is clean the (rewritten)
request is forwarded; if a block-severity match remains (e.g. in the
hostname, or a unicode-evasion token redaction can't reach) it fails closed
with a `403`.
- **`block`** writes the `403` immediately.
- **`supervise`** runs the queue-and-wait loop, falling back to `block` when
supervise isn't wired for the bottle.
For `supervise`, the addon writes the proposal directly to
`SUPERVISE_QUEUE_DIR` (the queue is bind-mounted into the sidecar bundle and
shared by every daemon, exactly as git-gate's `gitleaks-allow` proposal in PRD
0061 does). The proposal's `proposed_file` is a human-readable text payload
built by `build_token_allow_payload`:
```
egress blocked an outbound request carrying a detected token
host: api.example.com
method: POST
path: /v1/ingest
detector: OpenAI API key found in body
context: ...before ******** after...
```
The justification tells the operator to approve only if the value is a false
positive or a credential the request legitimately needs. The addon then polls
`<proposal-id>.response.json` for `EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS` (default
300). `approved`/`modified` allow the request and add the value to the
safe-tokens set; `rejected`, malformed responses, and timeout fail the request
closed. The proposal + response are archived to `processed/` after a decision.
Because the wait happens inside mitmproxy's asyncio loop, the addon's `request`
hook is async and polls with `asyncio.sleep`, so concurrent flows are
unaffected.
### Existing code touched
- **Policy threading.** `dlp.outbound_on_match` is a per-route enum threaded
from the bottle manifest (`manifest_egress`) through the resolved route
(`egress.EgressRoute`), the rendered `routes.yaml` (`egress_render_routes`),
and the addon's `Route` (`egress_addon_core`). Unset renders nothing and
resolves to `supervise` at request time. The `list-egress-routes`
introspection endpoint round-trips it so the agent's proposals preserve it.
- **Provider-route default.** Agent-provider routes (the agent talking to its
own LLM API — `api.anthropic.com`, the Codex backend, etc.) are the worst
source of token-shaped false positives because the whole conversation payload
flows through them. `egress_routes_for_bottle` fills `outbound_on_match=redact`
on any provider route that doesn't set it explicitly; a provider that sets the
policy keeps its choice, and manifest routes are unaffected (they default to
`supervise`).
- **Scanners.** `scan_outbound` (and the token detectors `scan_token_patterns`
/ `scan_known_secrets` it calls) accept a `safe_tokens` set. A match whose
value is in `safe_tokens` is skipped, so an approved token no longer blocks;
the scanners keep searching past a safelisted match so a second, un-approved
secret in the same request is still caught. The WebSocket path is passed the
same `safe_tokens` set.
- **Supervisor UI.** `cli/supervise.py` renders `egress-token-allow` like
`gitleaks-allow`: the text payload is shown, modify is unavailable (there is
no file patch to edit), and approval prompts for a non-empty reason recorded
in the response notes. There is no on-disk config diff, so — like
`gitleaks-allow` and `capability-block` — it writes no egress audit-log entry.
- **Failure handling.** If `SUPERVISE_QUEUE_DIR` / `SUPERVISE_BOTTLE_SLUG` are
unset (supervise disabled for the bottle), the addon skips the queue and
returns the existing `403`. Any error writing the proposal or reading the
response also fails closed.
### Data model changes
- New per-route manifest field `dlp.outbound_on_match: block | redact |
supervise`, rendered into `routes.yaml` (omitted when unset).
- `ScanResult` gains a `matched: str = ""` field carrying the raw substring the
detector matched. The token detectors populate it; the structural CRLF
detector leaves it empty. The value stays inside the egress sidecar process —
never written to a log line (logs use the redacted `context`) nor to the
proposal file.
- Proposal text payload (above) plus `<proposal-id>.response.json` in
`SUPERVISE_QUEUE_DIR`, archived to `processed/` after a decision.
- New env var `EGRESS_TOKEN_ALLOW_TIMEOUT_SECONDS` (default 300).
### External dependencies
None. Reuses the existing supervisor queue (`SUPERVISE_QUEUE_DIR`) and the
mitmproxy addon framework already in the egress sidecar.
## Open questions
- Should `known_secrets` (provisioned `EGRESS_TOKEN_*` exfiltration) be
override-able at all, or only `token_patterns`? This PRD allows both —
approval is an explicit operator decision and the safe-tokens set matches the
exact found value — but a future revision could restrict `known_secrets` to
reject-only.
## References
- Issue #261
- PRD 0061 — `gitleaks-allow` supervisor proposal pattern this reuses.
@@ -1,189 +0,0 @@
# PRD 0063: Strengthen outbound exfiltration detection
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-25
- **Issue:** #259
## Summary
The current DLP system has two detection tiers: weak (vendor token regexes)
and strong (exact matching of provisioned `EGRESS_TOKEN_*` values in multiple
encodings). This PRD adds four complementary hardening layers in ranked order
of impact:
1. **Canary tokens** — inject a per-session fake secret into the agent's
environment; any outbound appearance is a zero-false-positive exfil signal.
2. **Broadened known-value scanning** — extend the strong tier beyond
`EGRESS_TOKEN_*` via a `BOT_BOTTLE_SENSITIVE_PREFIXES` sidecar env var.
3. **Fragmentation-resistant matching** — catch separator injection and
long-substring leaks via alnum projection and sliding-window substring
search.
4. **Entropy scoring** — warn-only flag on high-entropy blobs as a
complementary signal for encrypted exfil.
## Problem
After PRD 0056 the outbound scan covers all HTTP request surfaces (host, path,
query, headers, body). But the content-based strong tier only matches
`EGRESS_TOKEN_*` values and their encodings. Three gaps remain:
| Gap | How an agent exploits it |
|-----|--------------------------|
| No canary | There is no "can't be false-positive" signal |
| Narrow prefix filter | Sensitive values provisioned under other env prefixes (MCP keys, API keys injected via the agent provider) are invisible to `scan_known_secrets` |
| Fragmentation blindness | Inserting `-`, space, or `\n` between secret characters turns an exact-match into a miss |
## Goals / Success Criteria
1. Each launched bottle has a unique canary token in the agent's environment
under a randomized `WORD_WORD_SECRET` env var name. The egress sidecar gets
the same env var and registers that exact name through
`BOT_BOTTLE_SENSITIVE_PREFIXES`. Any outbound appearance of the canary
blocks the request as a known-secret match.
2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default:
`("EGRESS_TOKEN_",)`). `scan_outbound` reads
`BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes
in, so operators can mark additional env vars as scanned values without
changing the manifest schema.
3. For every secret that passes exact-match, a secondary alnum-projection pass
checks for the secret with all non-alphanumeric characters stripped. This
catches separator-injection evasion (`MY-SECRET` → body contains
`MY SECRET`).
4. A sliding-window partial-match pass checks for long-enough contiguous
substrings of the secret's alnum projection in the text's alnum projection.
Any match ≥ `PARTIAL_MATCH_MIN_LEN` (12 chars) blocks with reason
`"partial match"`.
5. A new `scan_entropy` detector flags outbound text windows with Shannon
entropy ≥ `ENTROPY_BLOCK_THRESHOLD` (5.5 bits/char) at **warn** severity
only. It is registered under the new detector name `"entropy"` in
`OUTBOUND_DETECTOR_NAMES` and disabled by default (routes must opt in).
6. Binary request bodies are decoded via `latin-1` instead of
`utf-8 errors="replace"`, preserving every byte value and allowing
ASCII-range secrets to be found within binary payloads.
7. All new behaviour is unit-tested; existing tests pass unchanged.
## Non-goals
- Rolling per-host buffer for split-across-requests detection (state in the
stateless addon is complex; deferred).
- Additional vendor regexes.
- ML / embedding-based detection.
- Entropy-based hard blocks (warn only per the issue).
## Design
### Canary token flow
```
Egress.prepare()
canary = secrets.token_urlsafe(32)
canary_env = <random WORD_WORD_SECRET>
EgressPlan(canary=canary, canary_env=canary_env, ...)
Docker compose render:
sidecar env: <canary_env>=<canary>
sidecar env: BOT_BOTTLE_SENSITIVE_PREFIXES=<canary_env>
agent env: <canary_env>=<canary> ← visible to agent as a "secret"
macos-container launch: same literals added to sidecar + agent env entries
```
The sidecar uses `BOT_BOTTLE_SENSITIVE_PREFIXES` to make the random canary env
name part of the existing `scan_known_secrets` detector without adding a
manifest schema field.
### Broadened known-value scanning
`scan_known_secrets` gains a `sensitive_prefixes` parameter:
```python
def scan_known_secrets(
text: str,
*,
location: str = "body",
env: Mapping[str, str] | None = None,
sensitive_prefixes: tuple[str, ...] = ("EGRESS_TOKEN_",),
) -> ScanResult | None:
```
`scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` (comma-separated list
of additional prefixes) from `environ` and appends them:
```python
extra = tuple(
p for p in environ.get("BOT_BOTTLE_SENSITIVE_PREFIXES", "").split(",") if p
)
sensitive_prefixes = ("EGRESS_TOKEN_",) + extra
```
`redact_tokens` receives the same treatment for consistent redaction.
### Fragmentation-resistant matching
A new helper `_alnum_projection(text)` strips all non-alphanumeric characters.
`scan_known_secrets` runs two passes per secret:
1. **Exact pass** — existing encoded-variant loop (unchanged).
2. **Alnum-projection pass** — if the secret's alnum projection has ≥ 8 chars,
check if it appears in the text's alnum projection. Match → block with
`"fragmented match (separator injection)"` reason.
3. **Partial-substring pass** — if the secret's alnum projection has ≥
`PARTIAL_MATCH_MIN_LEN` chars (12), slide a window of that length across the
secret's projection and look for each window in the text's alnum projection.
First match → block with `"partial match"` reason.
All three passes run only for the `"known_secrets"` detector; the token-pattern
and entropy detectors are unchanged.
### Entropy scoring
New public function:
```python
def scan_entropy(
text: str,
*,
location: str = "body",
window: int = ENTROPY_WINDOW, # 64
threshold: float = ENTROPY_BLOCK_THRESHOLD, # 5.5
) -> ScanResult | None:
```
Slides a window of `window` characters across `text` in steps of `window // 2`.
If any window's Shannon entropy exceeds `threshold`, returns a **warn**-severity
`ScanResult`. Never blocks.
`OUTBOUND_DETECTOR_NAMES` gains `"entropy"`. Routes opt in via their `dlp`
block; entropy scanning is **off by default** to avoid false-positive noise on
legitimate binary payloads.
### Binary body handling
In `scan_outbound`, the bytes → str decoding changes from:
```python
body.decode("utf-8", errors="replace")
```
to:
```python
body.decode("utf-8") if body is str else body.decode("latin-1")
```
`latin-1` is a bijective byte↔codepoint mapping; every byte value is preserved
as its corresponding Latin-1 code point, so ASCII-range secret strings remain
intact and `str.find` / regex still locate them correctly. The fallback from
strict UTF-8 is tried first so valid UTF-8 bodies are decoded faithfully.
## Implementation
Delivered in three commits on the same branch:
1. **DLP detector changes**`_alnum_projection`, fragmentation passes,
`scan_entropy`, broadened `scan_known_secrets`, updated `scan_outbound` and
`redact_tokens`; all accompanying unit tests.
2. **Canary injection**`EgressPlan.canary`, `Egress.prepare()`,
Docker compose + macos-container backend injection.
3. **PRD flip**`Status: Draft → Active`.
@@ -1,85 +0,0 @@
# PRD 0064: LOG_FULL egress logging credential redaction
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-25
- **Issue:** #257
## Summary
The `LOG_FULL` egress logging path (`_log_request` and `_log_response` in `egress_addon.py`) writes request/response headers and bodies to stderr without redaction and includes the sidecar-injected upstream `Authorization` header verbatim. This PR applies `redact_tokens` to header values and bodies in both log functions and strips the injected `Authorization` header from request logs entirely.
## Problem
`LOG_FULL` (log level 2) is intended for debugging egress traffic. When active it calls `_log_request` and `_log_response`. Both functions have two related bugs:
1. **Injected `Authorization` header exposure.** `_log_request` is called *after* the sidecar injects upstream credentials (`flow.request.headers["authorization"] = decision.inject_authorization`). The full header dict — including the live credential — is serialized to stderr. Any log collector that ingests the egress container's stderr will receive the upstream bearer token in plaintext.
2. **Unredacted bodies and header values.** Neither `_log_request` nor `_log_response` passes body or header values through `redact_tokens`. By contrast, `_req_ctx` (used for block/warn events) already calls `redact_tokens` on path and host. Any provisioned secret or recognized token pattern that appears in a request body, response body, or non-Authorization header value will be logged verbatim under `LOG_FULL`.
These two bugs compose: an agent that enables `LOG_FULL` and simultaneously triggers a request that carries a known token gains a write path from credentials → egress logs.
## Goals / Success Criteria
- `_log_request` never logs the `authorization` header in any form.
- `_log_request` applies `redact_tokens(value, env=os.environ)` to every other header value before serializing.
- `_log_request` applies `redact_tokens(body, env=os.environ)` to the request body before logging.
- `_log_response` applies `redact_tokens(value, env=os.environ)` to every response header value before logging.
- `_log_response` applies `redact_tokens(body, env=os.environ)` to the response body before logging.
- Unit tests cover each of the five cases above.
## Non-goals
- Redacting host or path in the full-log path (already covered by `_req_ctx` for block/warn events; `_log_request` already calls `redact_tokens` on host and path).
- Suppressing `LOG_FULL` or adding a new log level.
- Changing the outbound DLP scan logic.
## Design
### `_log_request`
```python
def _log_request(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.request.headers.items()
if k.lower() != "authorization"
}
body = redact_tokens(flow.request.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_request",
"host": redact_tokens(flow.request.pretty_host, env=os.environ),
"method": flow.request.method,
"path": redact_tokens(flow.request.path, env=os.environ),
"headers": headers,
"body": body,
})
+ "\n"
)
```
The `authorization` key is excluded because by the time `_log_request` is called the sidecar has already injected the upstream credential (`decision.inject_authorization`). Logging it would write a live bearer token to stderr on every allowed request. There is no safe subset to log — the value is always a live credential or empty.
### `_log_response`
```python
def _log_response(self, flow: http.HTTPFlow) -> None:
headers = {
k: redact_tokens(v, env=os.environ)
for k, v in flow.response.headers.items()
}
body = redact_tokens(flow.response.get_text(strict=False) or "", env=os.environ)
sys.stderr.write(
json.dumps({
"event": "egress_response",
"host": flow.request.pretty_host,
"status": flow.response.status_code,
"headers": headers,
"body": body,
})
+ "\n"
)
```
Response headers don't carry injected credentials, so no header name is suppressed — only the values are scrubbed by `redact_tokens`.
@@ -22,7 +22,7 @@ escapes**, and **whether credentials are short-lived and scoped**.
- Outbound: Docker containers have full internet access by default; no egress monitoring on most home networks
- Lateral movement: compromised container can reach the LAN — NAS, other machines, internal services
- Notable: CVE-2025-59536 (CVSS 8.7, Feb 2026) — a poisoned `.claude/settings.json` in a repo gives RCE when Claude Code opens it. `--dangerously-skip-permissions` removes the last gate.
- Supply chain: MCP servers, skills, and npm packages pulled during agent execution. A Jan 2026 large-scale empirical study of a 98,380-skill snapshot confirmed 157 malicious skills, ~71% of them credential harvesters. Exfiltration was overwhelmingly naive — plaintext HTTP to hardcoded endpoints; under 10% used any code obfuscation, and concealment was mostly at the documentation level, not the code level. ([Malicious Agent Skills in the Wild](https://arxiv.org/html/2602.06547v1), arXiv:2602.06547)
- Supply chain: MCP servers, skills, and npm packages pulled during agent execution. ~20% of ClawHub skills were found malicious in early 2026.
**What local topology protects:**
- No inbound attack surface — nothing listening on a public port
+8 -8
View File
@@ -1,14 +1,14 @@
---
agent_provider:
template: claude
# auth_token names the host env var holding the Claude OAuth token. The
# provider injects a provider-owned api.anthropic.com egress route that
# re-injects this token as the Bearer header; the agent only ever sees a
# placeholder CLAUDE_CODE_OAUTH_TOKEN. DLP defaults (token_patterns,
# known_secrets outbound; naive_injection_detection inbound) apply to
# that route. To scan additional hosts, declare them under egress.routes
# with per-route matches/dlp (see README "Egress route fields").
auth_token: BOT_BOTTLE_CLAUDE_OAUTH_TOKEN
egress:
routes:
- host: api.anthropic.com
role: claude_code_oauth
auth:
scheme: Bearer
token_ref: BOT_BOTTLE_CLAUDE_OAUTH_TOKEN
---
Common Claude provider boundary. Drop this file into
-46
View File
@@ -168,34 +168,6 @@ class TestAgentProviderRuntime(unittest.TestCase):
self.assertEqual("~/.claude/statusline.sh", settings["statusLine"]["command"])
self.assertEqual("custom:bot-bottle-research-ui", settings["theme"])
def test_claude_plan_uses_startup_args_from_provider_settings(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="claude",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
provider_settings={
"startup_args": ["--model", "opus"],
},
)
self.assertEqual(("--model", "opus"), plan.startup_args)
def test_codex_plan_uses_startup_args_from_provider_settings(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="codex",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
provider_settings={
"startup_args": ["--model", "gpt-5-codex"],
},
)
self.assertEqual(("--model", "gpt-5-codex"), plan.startup_args)
def test_codex_forward_host_credentials_populates_egress_routes(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
home = Path(tmp) / "host-codex"
@@ -422,24 +394,6 @@ class TestAgentProviderRuntime(unittest.TestCase):
self.assertNotIn("OPENROUTER_API_KEY", plan.guest_env)
self.assertTrue(provider["compat"]["supportsReasoningEffort"])
def test_pi_plan_appends_startup_args_from_provider_settings(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="pi",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
provider_settings={
"models": ["qwen3:14b"],
"startup_args": ["--no-stream"],
},
)
self.assertEqual(
("--models", "ollama/qwen3:14b", "--no-stream"),
plan.startup_args,
)
def test_pi_prompt_mode_appends_system_prompt_interactively(self):
self.assertEqual(
["--append-system-prompt", "/home/node/.bot-bottle-prompt.txt"],
-216
View File
@@ -1,216 +0,0 @@
"""Unit: Freezer class hierarchy."""
from __future__ import annotations
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from bot_bottle import supervise, bottle_state
from bot_bottle.backend import ActiveAgent
from bot_bottle.backend.freeze import get_freezer
from bot_bottle.backend.docker.freezer import DockerFreezer
from bot_bottle.backend.macos_container.freezer import MacosContainerFreezer
from bot_bottle.backend.smolmachines.freezer import SmolmachinesFreezer
class _FakeHomeMixin:
def _setup_fake_home(self):
self._tmp = tempfile.TemporaryDirectory(prefix="freezer-test.")
original = supervise.bot_bottle_root
def fake_root() -> Path:
return Path(self._tmp.name) / ".bot-bottle"
supervise.bot_bottle_root = fake_root # type: ignore[assignment]
self._restore = lambda: setattr(supervise, "bot_bottle_root", original)
def _teardown_fake_home(self):
self._restore()
self._tmp.cleanup()
def _make_agent(slug: str, backend: str = "docker") -> ActiveAgent:
return ActiveAgent(
backend_name=backend,
slug=slug,
agent_name="dev",
started_at="t",
services=(),
)
class TestGetFreezer(unittest.TestCase):
def test_docker(self):
self.assertIsInstance(get_freezer("docker"), DockerFreezer)
def test_empty_backend_gives_docker(self):
self.assertIsInstance(get_freezer(""), DockerFreezer)
def test_macos_container(self):
self.assertIsInstance(get_freezer("macos-container"), MacosContainerFreezer)
def test_smolmachines(self):
self.assertIsInstance(get_freezer("smolmachines"), SmolmachinesFreezer)
def test_unknown_backend_dies(self):
with patch("bot_bottle.backend.freeze.die", side_effect=SystemExit("die")):
with self.assertRaises(SystemExit):
get_freezer("unknown-backend")
class TestFreezerBaseCommit(_FakeHomeMixin, unittest.TestCase):
"""The base Freezer.commit() owns the shared post-freeze steps."""
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def test_writes_committed_image_and_marks_preserved(self):
slug = "dev-abc12"
bottle_state.write_metadata(bottle_state.BottleMetadata(
identity=slug, agent_name="dev", cwd="", copy_cwd=False,
started_at="t", backend="docker",
))
freezer = get_freezer("docker")
agent = _make_agent(slug)
with patch.object(freezer, "_freeze", return_value="bot-bottle-committed-dev-abc12:latest"), \
patch("bot_bottle.backend.freeze.info"):
freezer.commit(agent)
self.assertEqual(
"bot-bottle-committed-dev-abc12:latest",
bottle_state.read_committed_image(slug),
)
self.assertTrue(bottle_state.is_preserved(slug))
def test_commit_slug_passes_correct_slug_to_freeze(self):
slug = "dev-abc12"
bottle_state.write_metadata(bottle_state.BottleMetadata(
identity=slug, agent_name="dev", cwd="", copy_cwd=False,
started_at="t", backend="docker",
))
freezer = get_freezer("docker")
captured = {}
def capture_freeze(agent: ActiveAgent) -> str:
captured["slug"] = agent.slug
return "some-ref"
with patch.object(freezer, "_freeze", side_effect=capture_freeze), \
patch("bot_bottle.backend.freeze.info"):
freezer.commit_slug(slug)
self.assertEqual(slug, captured["slug"])
class TestDockerFreezer(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def test_commits_container_and_records_image(self):
slug = "dev-abc12"
bottle_state.write_metadata(bottle_state.BottleMetadata(
identity=slug, agent_name="dev", cwd="", copy_cwd=False,
started_at="t", backend="docker",
))
freezer = DockerFreezer()
agent = _make_agent(slug)
with patch("bot_bottle.backend.docker.freezer.commit_container") as mock_commit, \
patch("bot_bottle.backend.freeze.info"), \
patch("bot_bottle.backend.docker.freezer.info"):
freezer.commit(agent)
mock_commit.assert_called_once_with(
f"bot-bottle-{slug}",
f"bot-bottle-committed-{slug}:latest",
)
self.assertEqual(
f"bot-bottle-committed-{slug}:latest",
bottle_state.read_committed_image(slug),
)
self.assertTrue(bottle_state.is_preserved(slug))
class TestMacosContainerFreezer(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def _write_meta(self, slug: str) -> None:
bottle_state.write_metadata(bottle_state.BottleMetadata(
identity=slug, agent_name="dev", cwd="", copy_cwd=False,
started_at="t", backend="macos-container",
))
def test_commits_running_container_without_stopping(self):
"""Commit should exec-tar the running container, not stop it."""
slug = "dev-abc12"
self._write_meta(slug)
freezer = MacosContainerFreezer()
agent = _make_agent(slug, "macos-container")
with patch("bot_bottle.backend.macos_container.freezer.commit_container") as mock_commit, \
patch("bot_bottle.backend.freeze.info"), \
patch("bot_bottle.backend.macos_container.freezer.info"):
freezer.commit(agent)
mock_commit.assert_called_once_with(
f"bot-bottle-{slug}",
f"bot-bottle-committed-{slug}:latest",
)
self.assertEqual(
f"bot-bottle-committed-{slug}:latest",
bottle_state.read_committed_image(slug),
)
self.assertTrue(bottle_state.is_preserved(slug))
class TestSmolmachinesFreezer(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def _write_meta(self, slug: str) -> None:
bottle_state.write_metadata(bottle_state.BottleMetadata(
identity=slug, agent_name="dev", cwd="", copy_cwd=False,
started_at="t", backend="smolmachines",
))
def test_snapshots_running_vm_without_stopping(self):
"""Commit should exec-tar the running VM, not stop it."""
slug = "dev-abc12"
self._write_meta(slug)
freezer = SmolmachinesFreezer()
agent = _make_agent(slug, "smolmachines")
with patch("bot_bottle.backend.smolmachines.freezer._snapshot_running_vm") as mock_snap, \
patch("bot_bottle.backend.freeze.info"), \
patch("bot_bottle.backend.smolmachines.freezer.info"):
freezer.commit(agent)
expected_binary = bottle_state.bottle_state_dir(slug) / "committed-smolmachine"
mock_snap.assert_called_once_with(
f"bot-bottle-{slug}",
f"bot-bottle-committed-{slug}:latest",
expected_binary,
)
expected_sidecar = str(expected_binary.with_suffix(".smolmachine"))
self.assertEqual(expected_sidecar, bottle_state.read_committed_image(slug))
self.assertTrue(bottle_state.is_preserved(slug))
if __name__ == "__main__":
unittest.main()
-51
View File
@@ -277,56 +277,5 @@ class TestBottleMetadataBackend(_FakeHomeMixin, unittest.TestCase):
self.assertEqual("", loaded.backend)
class TestCommittedImage(_FakeHomeMixin, unittest.TestCase):
"""write_committed_image / read_committed_image round-trip."""
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def test_returns_none_when_absent(self):
self.assertIsNone(bottle_state.read_committed_image("dev"))
def test_write_then_read_roundtrip(self):
bottle_state.write_committed_image("dev", "bot-bottle-committed-dev:latest")
self.assertEqual(
"bot-bottle-committed-dev:latest",
bottle_state.read_committed_image("dev"),
)
def test_strips_trailing_newline_on_read(self):
path = bottle_state.committed_image_path("dev")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("bot-bottle-committed-dev:latest\n\n")
self.assertEqual(
"bot-bottle-committed-dev:latest",
bottle_state.read_committed_image("dev"),
)
def test_isolated_per_slug(self):
bottle_state.write_committed_image("dev", "bot-bottle-committed-dev:latest")
bottle_state.write_committed_image("api", "bot-bottle-committed-api:latest")
self.assertEqual(
"bot-bottle-committed-dev:latest",
bottle_state.read_committed_image("dev"),
)
self.assertEqual(
"bot-bottle-committed-api:latest",
bottle_state.read_committed_image("api"),
)
def test_path_under_state_dir(self):
path = bottle_state.committed_image_path("dev")
self.assertTrue(str(path).endswith("/.bot-bottle/state/dev/committed-image"))
def test_empty_content_returns_none(self):
path = bottle_state.committed_image_path("dev")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(" \n")
self.assertIsNone(bottle_state.read_committed_image("dev"))
if __name__ == "__main__":
unittest.main()
-143
View File
@@ -1,143 +0,0 @@
"""Unit: cli.py commit command."""
from __future__ import annotations
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
from bot_bottle.cli.commit import cmd_commit
from bot_bottle import supervise
from bot_bottle import bottle_state
from bot_bottle.backend.freeze import CommitCancelled
class _FakeHomeMixin:
def _setup_fake_home(self):
self._tmp = tempfile.TemporaryDirectory(prefix="cli-commit-test.")
original = supervise.bot_bottle_root
def fake_root() -> Path:
return Path(self._tmp.name) / ".bot-bottle"
supervise.bot_bottle_root = fake_root # type: ignore[assignment]
self._restore = lambda: setattr(supervise, "bot_bottle_root", original)
def _teardown_fake_home(self):
self._restore()
self._tmp.cleanup()
class TestCmdCommitSlugArg(_FakeHomeMixin, unittest.TestCase):
"""cmd_commit with an explicit slug delegates to get_freezer."""
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def _write_meta(self, slug: str, backend: str) -> None:
bottle_state.write_metadata(bottle_state.BottleMetadata(
identity=slug, agent_name="dev", cwd="", copy_cwd=False,
started_at="t", backend=backend,
))
def test_commits_docker_bottle(self):
slug = "dev-abc12"
self._write_meta(slug, "docker")
with patch("bot_bottle.cli.commit.get_freezer") as mock_gf:
mock_freezer = MagicMock()
mock_gf.return_value = mock_freezer
rc = cmd_commit([slug])
self.assertEqual(0, rc)
mock_gf.assert_called_once_with("docker")
mock_freezer.commit_slug.assert_called_once_with(slug)
def test_empty_backend_passed_to_get_freezer(self):
"""Old state dirs without a backend field pass '' to get_freezer."""
slug = "dev-abc12"
self._write_meta(slug, "")
with patch("bot_bottle.cli.commit.get_freezer") as mock_gf:
mock_freezer = MagicMock()
mock_gf.return_value = mock_freezer
rc = cmd_commit([slug])
self.assertEqual(0, rc)
mock_gf.assert_called_once_with("")
def test_commits_macos_container_bottle(self):
slug = "dev-abc12"
self._write_meta(slug, "macos-container")
with patch("bot_bottle.cli.commit.get_freezer") as mock_gf:
mock_freezer = MagicMock()
mock_gf.return_value = mock_freezer
rc = cmd_commit([slug])
self.assertEqual(0, rc)
mock_gf.assert_called_once_with("macos-container")
mock_freezer.commit_slug.assert_called_once_with(slug)
def test_commits_smolmachines_bottle(self):
slug = "dev-abc12"
self._write_meta(slug, "smolmachines")
with patch("bot_bottle.cli.commit.get_freezer") as mock_gf:
mock_freezer = MagicMock()
mock_gf.return_value = mock_freezer
rc = cmd_commit([slug])
self.assertEqual(0, rc)
mock_gf.assert_called_once_with("smolmachines")
def test_returns_zero_on_commit_cancelled(self):
slug = "dev-abc12"
self._write_meta(slug, "macos-container")
with patch("bot_bottle.cli.commit.get_freezer") as mock_gf:
mock_freezer = MagicMock()
mock_freezer.commit_slug.side_effect = CommitCancelled
mock_gf.return_value = mock_freezer
rc = cmd_commit([slug])
self.assertEqual(0, rc)
class TestCmdCommitNoActiveBottles(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def test_dies_when_no_active_bottles_and_no_slug(self):
with patch(
"bot_bottle.cli.commit.enumerate_active_agents", return_value=[],
), patch(
"bot_bottle.cli.commit.die", side_effect=SystemExit("die"),
) as mock_die:
with self.assertRaises(SystemExit):
cmd_commit([])
mock_die.assert_called_once()
def test_returns_zero_when_picker_cancelled(self):
active = MagicMock()
active.slug = "dev-abc12"
with patch(
"bot_bottle.cli.commit.enumerate_active_agents", return_value=[active],
), patch(
"bot_bottle.cli.commit.tui.filter_select", return_value=None,
):
rc = cmd_commit([])
self.assertEqual(0, rc)
if __name__ == "__main__":
unittest.main()
-21
View File
@@ -102,27 +102,6 @@ class TestAttachAgent(unittest.TestCase):
bottle.argv,
)
def test_remote_control_is_provider_startup_arg(self):
class Bottle:
argv: list[str] = []
def exec_agent(self, argv: list[str], *, tty: bool = True) -> int:
self.argv = list(argv)
return 0
bottle = Bottle()
exit_code = start_mod.attach_agent(
bottle, # type: ignore[arg-type]
agent_provider_template="codex",
startup_args=("remote-control",),
)
self.assertEqual(0, exit_code)
self.assertEqual(
["--dangerously-bypass-approvals-and-sandbox", "remote-control"],
bottle.argv,
)
if __name__ == "__main__":
unittest.main()
+3 -24
View File
@@ -80,11 +80,7 @@ def _git_gate_plan(upstreams: tuple[GitGateUpstream, ...] = ()) -> GitGatePlan:
)
def _egress_plan(
routes: tuple[EgressRoute, ...] = (),
*,
canary: bool = False,
) -> EgressPlan:
def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan:
token_env_map = {
r.token_env: r.token_ref
for r in routes
@@ -99,8 +95,6 @@ def _egress_plan(
egress_network=f"bot-bottle-egress-{SLUG}",
mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem",
mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem",
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
)
@@ -118,7 +112,6 @@ def _plan(
with_git: bool = False,
with_egress: bool = False,
supervise: bool = False,
canary: bool = False,
) -> DockerBottlePlan:
"""Build a fully-resolved DockerBottlePlan. Toggles cover the
matrix the renderer's conditional-service logic branches on."""
@@ -157,7 +150,7 @@ def _plan(
slug=SLUG,
forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"},
git_gate_plan=_git_gate_plan(upstreams),
egress_plan=_egress_plan(routes, canary=canary),
egress_plan=_egress_plan(routes),
supervise_plan=_supervise_plan() if supervise else None,
use_runsc=False,
agent_provision=AgentProvisionPlan(
@@ -382,20 +375,6 @@ class TestSidecarBundleShape(unittest.TestCase):
env_strings = sc["environment"]
self.assertNotIn("EGRESS_TOKEN_0", env_strings)
def test_canary_env_registered_as_sensitive_in_sidecar(self):
sc = self._render(canary=True)["services"]["sidecars"]
env_strings = sc["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
env_strings,
)
def test_canary_env_visible_to_agent(self):
agent = self._render(canary=True)["services"]["agent"]
env_strings = agent["environment"]
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
def test_supervise_env_present_when_active(self):
sc = self._render(supervise=True)["services"]["sidecars"]
env_strings = sc["environment"]
@@ -413,7 +392,7 @@ class TestSidecarBundleShape(unittest.TestCase):
"services"]["sidecars"]
targets = {v["target"] for v in sc["volumes"]}
self.assertIn("/home/mitmproxy/.mitmproxy/mitmproxy-ca.pem", targets)
self.assertIn("/etc/egress", targets)
self.assertIn("/etc/egress/routes.yaml", targets)
self.assertIn("/git-gate-entrypoint.sh", targets)
self.assertIn("/git-gate/creds/upstream-known_hosts", targets)
self.assertTrue(any("supervise/queue" in t or t.startswith("/run/supervise")
+4 -13
View File
@@ -29,9 +29,6 @@ from bot_bottle.supervise import SupervisePlan
_URL = "http://supervise:9100/"
_CODEX_DOCKERFILE = (
Path(__file__).resolve().parents[2] / "bot_bottle/contrib/codex/Dockerfile"
)
def _make_bottle(exec_result: ExecResult | None = None) -> MagicMock:
@@ -279,12 +276,6 @@ class TestCodexProvision(unittest.TestCase):
)
class TestCodexDockerfile(unittest.TestCase):
def test_installs_procps_for_remote_control_pid_management(self):
dockerfile = _CODEX_DOCKERFILE.read_text()
self.assertIn("procps", dockerfile)
class TestCodexSuperviseMcp(unittest.TestCase):
def test_noop_when_supervise_disabled(self):
bottle = _make_bottle()
@@ -301,10 +292,10 @@ class TestCodexSuperviseMcp(unittest.TestCase):
bottle.exec.assert_called_once()
script = bottle.exec.call_args.args[0]
self.assertEqual("node", bottle.exec.call_args.kwargs.get("user"))
self.assertEqual(
f"codex mcp add supervise --url {_URL}",
script,
)
self.assertIn("codex mcp add", script)
self.assertIn("--transport http", script)
self.assertIn("supervise", script)
self.assertIn(_URL, script)
def test_logs_warning_on_failure_but_does_not_raise(self):
bottle = _make_bottle(
@@ -12,7 +12,6 @@ from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
_split_owner_repo,
)
from bot_bottle.deploy_key_provisioner import DeployKeyCollisionError
def _provisioner() -> GiteaDeployKeyProvisioner:
@@ -101,30 +100,6 @@ class TestCreate(unittest.TestCase):
provisioner.create("owner/repo", "title")
self.assertIn("403", str(ctx.exception))
def test_create_raises_collision_error_on_422(self):
provisioner = _provisioner()
collision_body = json.dumps({
"errors": ["Key content already exists on this repository"],
"message": "422 Unprocessable Entity",
})
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run"
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen",
side_effect=_http_error(422, collision_body),
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes",
return_value=b"pk",
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text",
return_value="ssh-ed25519 AAAA\n",
):
with self.assertRaises(DeployKeyCollisionError) as ctx:
provisioner.create("owner/repo", "my-title")
msg = str(ctx.exception)
self.assertIn("owner/repo", msg)
self.assertIn("my-title", msg)
class TestDelete(unittest.TestCase):
def test_delete_calls_correct_endpoint(self):
+2 -250
View File
@@ -1,23 +1,18 @@
"""Unit: DLP detectors (PRD 0053).
Tests for token pattern scanning, known secret detection, fragmentation-
resistant matching, entropy scoring, and naive prompt injection detection."""
Tests for token pattern scanning, known secret detection, and
naive prompt injection detection."""
import base64
import gzip
import unittest
from bot_bottle.dlp_detectors import (
ENTROPY_BLOCK_THRESHOLD,
PARTIAL_MATCH_MIN_LEN,
REDACT,
_alnum_projection,
_encoded_variants,
_normalize_text,
_shannon_entropy,
redact_tokens,
scan_crlf_injection,
scan_entropy,
scan_known_secrets,
scan_naive_injection,
scan_token_patterns,
@@ -450,248 +445,5 @@ class TestKnownSecretsNewVariants(unittest.TestCase):
self.assertIsNotNone(result)
class TestMatchedAndSafeTokens(unittest.TestCase):
"""PRD 0062: detectors carry the raw matched value, and a safelisted
value is skipped so the supervisor can approve a specific token."""
def test_token_pattern_sets_matched(self):
token = "ghp_" + "A" * 36
result = scan_token_patterns(f"token: {token}")
assert result is not None
self.assertEqual(token, result.matched)
def test_safe_token_is_skipped(self):
token = "ghp_" + "A" * 36
self.assertIsNone(
scan_token_patterns(f"token: {token}", safe_tokens={token})
)
def test_safe_token_does_not_mask_other_token(self):
safe = "ghp_" + "A" * 36
other = "AKIAIOSFODNN7EXAMPLE"
result = scan_token_patterns(
f"a={safe} b={other}", safe_tokens={safe},
)
assert result is not None
self.assertEqual(other, result.matched)
self.assertIn("AWS", result.reason)
def test_known_secret_sets_matched_and_safelist_skips(self):
secret = "supersecretvalue123"
env = {"EGRESS_TOKEN_FOO": secret}
result = scan_known_secrets(f"x={secret}", env=env)
assert result is not None
self.assertEqual(secret, result.matched)
self.assertIsNone(
scan_known_secrets(f"x={secret}", env=env, safe_tokens={secret})
)
def test_crlf_block_has_no_matched_value(self):
result = scan_crlf_injection("path%0d%0aHost: evil")
assert result is not None
self.assertEqual("", result.matched)
class TestStripCrlf(unittest.TestCase):
def test_removes_url_encoded_crlf(self):
from bot_bottle.dlp_detectors import strip_crlf
out = strip_crlf("next=%0d%0aX-Injected: evil")
self.assertNotRegex(out, r"%0[dD]%0[aA]")
def test_removes_literal_header_injection(self):
from bot_bottle.dlp_detectors import strip_crlf
out = strip_crlf("value\r\nX-Injected: evil")
self.assertIsNone(scan_crlf_injection(out))
def test_leaves_clean_text_unchanged(self):
from bot_bottle.dlp_detectors import strip_crlf
self.assertEqual("/api/v1/data?q=hello", strip_crlf("/api/v1/data?q=hello"))
class TestAlnumProjection(unittest.TestCase):
def test_alphanumeric_unchanged(self):
self.assertEqual("abc123XYZ", _alnum_projection("abc123XYZ"))
def test_strips_hyphens(self):
self.assertEqual("mysecretvalue", _alnum_projection("my-secret-value"))
def test_strips_spaces(self):
self.assertEqual("mysecretvalue", _alnum_projection("my secret value"))
def test_strips_dots_and_underscores(self):
self.assertEqual("mysecretvalue", _alnum_projection("my.secret_value"))
def test_empty_string(self):
self.assertEqual("", _alnum_projection(""))
def test_all_special_chars(self):
self.assertEqual("", _alnum_projection("!@#$%^&*()"))
class TestFragmentationResistantMatching(unittest.TestCase):
"""scan_known_secrets catches separator-injection and partial-substring evasion."""
# Secrets long enough that their alnum projections are ≥ 8 chars.
SECRET = "supersecrettoken99"
ENV = {"EGRESS_TOKEN_0": SECRET}
def test_exact_match_still_works(self):
result = scan_known_secrets(f"key={self.SECRET}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_separator_injection_blocked(self):
# Hyphens inserted between chars of the secret.
fragmented = "-".join(self.SECRET)
result = scan_known_secrets(f"data={fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("separator injection", result.reason)
def test_space_separator_blocked(self):
fragmented = " ".join(self.SECRET)
result = scan_known_secrets(f"body: {fragmented}", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("separator injection", result.reason)
def test_partial_substring_blocked(self):
# First PARTIAL_MATCH_MIN_LEN alnum chars of the secret, no separators.
partial = _alnum_projection(self.SECRET)[:PARTIAL_MATCH_MIN_LEN]
result = scan_known_secrets(f"x={partial}&y=other", env=self.ENV)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("partial match", result.reason)
def test_short_secret_skips_projection(self):
# Secrets shorter than _ALNUM_MIN_LEN in alnum projection are not
# fragmentation-checked (too many false positives).
short_env = {"EGRESS_TOKEN_0": "abc"}
# "a b c" has alnum projection "abc" (3 chars, < 8); should not block.
self.assertIsNone(scan_known_secrets("a b c", env=short_env))
def test_clean_text_not_blocked(self):
self.assertIsNone(scan_known_secrets("nothing to see here", env=self.ENV))
def test_sensitive_prefixes_param_extra_prefix(self):
env = {"MY_CRED_0": self.SECRET, "IGNORED": "other"}
result = scan_known_secrets(
f"key={self.SECRET}",
env=env,
sensitive_prefixes=("MY_CRED_",),
)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("MY_CRED_0", result.reason)
def test_sensitive_prefixes_default_only_egress_token(self):
# A value under a non-EGRESS_TOKEN_ key is ignored with default prefixes.
env = {"MY_CRED_0": self.SECRET}
self.assertIsNone(scan_known_secrets(f"key={self.SECRET}", env=env))
def test_canary_prefix_detected(self):
canary_value = "canary-fake-secret-value-xyz"
env = {"CANON_ALPHA_SECRET": canary_value}
result = scan_known_secrets(
f"x={canary_value}",
env=env,
sensitive_prefixes=("CANON_ALPHA_SECRET",),
)
self.assertIsNotNone(result)
assert result is not None
self.assertIn("CANON_ALPHA_SECRET", result.reason)
class TestRedactTokensBroadenedPrefixes(unittest.TestCase):
SECRET = "my-provisioned-secret"
def test_default_redacts_egress_token(self):
env = {"EGRESS_TOKEN_0": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_extra_prefix_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(
f"val={self.SECRET}",
env=env,
sensitive_prefixes=("MY_SECRET_",),
)
self.assertNotIn(self.SECRET, out)
self.assertIn(REDACT, out)
def test_non_matching_prefix_not_redacted(self):
env = {"MY_SECRET_KEY": self.SECRET}
out = redact_tokens(f"val={self.SECRET}", env=env)
# Default prefixes only include EGRESS_TOKEN_ → secret not redacted
self.assertIn(self.SECRET, out)
class TestShannonEntropy(unittest.TestCase):
def test_empty_string_zero(self):
self.assertEqual(0.0, _shannon_entropy(""))
def test_single_char_zero(self):
self.assertEqual(0.0, _shannon_entropy("aaaaaa"))
def test_two_equal_chars_one_bit(self):
self.assertAlmostEqual(1.0, _shannon_entropy("abababab"), places=10)
def test_high_entropy_random_like(self):
# Uniform 64-char string over 64 distinct symbols has entropy 6 bits.
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
text = alphabet # each char appears exactly once
self.assertAlmostEqual(6.0, _shannon_entropy(text), places=10)
class TestScanEntropy(unittest.TestCase):
def test_empty_returns_none(self):
self.assertIsNone(scan_entropy(""))
def test_low_entropy_returns_none(self):
# Highly repetitive text has low entropy.
self.assertIsNone(scan_entropy("a" * 200))
def test_high_entropy_warns(self):
# Build a 64-char string with entropy > ENTROPY_BLOCK_THRESHOLD.
# Use all 64 distinct printable chars to maximise entropy (~6 bits).
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, threshold=ENTROPY_BLOCK_THRESHOLD)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
self.assertIn("high-entropy", result.reason)
def test_never_blocks(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet)
# scan_entropy is warn-only; it must never return severity="block".
if result is not None:
self.assertNotEqual("block", result.severity)
def test_location_in_result(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_entropy(alphabet, location="authorization header")
if result is not None:
self.assertIn("authorization header", result.location)
def test_structured_json_no_warn(self):
# Typical JSON has low entropy and should not be flagged.
json_body = '{"status": "ok", "message": "hello world", "count": 42}'
self.assertIsNone(scan_entropy(json_body))
def test_short_text_below_window(self):
# Text shorter than the window: checked as one chunk.
# Use a uniform string to ensure it won't be flagged.
self.assertIsNone(scan_entropy("abcde", threshold=ENTROPY_BLOCK_THRESHOLD))
if __name__ == "__main__":
unittest.main()
-10
View File
@@ -136,16 +136,6 @@ class TestClaudeArgv(unittest.TestCase):
argv,
)
def test_codex_remote_control_startup_arg_does_not_receive_initial_prompt(self):
argv = _codex_bottle("/home/node/.bot-bottle-prompt.txt").agent_argv(
["--dangerously-bypass-approvals-and-sandbox", "remote-control"],
)
self.assertEqual(
["docker", "exec", "-it", "bot-bottle-dev-abc", "codex",
"--dangerously-bypass-approvals-and-sandbox", "remote-control"],
argv,
)
def test_codex_resume_does_not_append_initial_prompt(self):
argv = _codex_bottle("/home/node/.bot-bottle-prompt.txt").agent_argv(
["--dangerously-bypass-approvals-and-sandbox", "resume", "--last"],
@@ -1,192 +0,0 @@
"""Unit: Docker launch step uses committed image when available."""
from __future__ import annotations
import contextlib
import io
import tempfile
import unittest
from pathlib import Path
from typing import Any
from unittest import mock
from bot_bottle.agent_provider import AgentProvisionPlan
from bot_bottle.backend import BottleSpec
from bot_bottle.backend.docker import launch as launch_mod
from bot_bottle.backend.docker.bottle_plan import DockerBottlePlan
from bot_bottle.egress import EgressPlan
from bot_bottle.git_gate import GitGatePlan
from bot_bottle.manifest import ManifestIndex
_SLUG = "dev-abc12"
_COMMITTED_TAG = f"bot-bottle-committed-{_SLUG}:latest"
_DEFAULT_IMAGE = "bot-bottle-claude:latest"
_IDX = ManifestIndex.from_json_obj({
"bottles": {"dev": {}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
def _plan(tmp: str) -> DockerBottlePlan:
stage = Path(tmp)
spec = BottleSpec(
manifest=_IDX,
agent_name="demo",
copy_cwd=False,
user_cwd=tmp,
identity=_SLUG,
)
return DockerBottlePlan(
spec=spec,
manifest=_IDX.load_for_agent("demo"),
stage_dir=stage,
git_gate_plan=GitGatePlan(
slug=_SLUG,
entrypoint_script=stage / "entrypoint.sh",
hook_script=stage / "hook.sh",
access_hook_script=stage / "access-hook.sh",
upstreams=(),
),
egress_plan=EgressPlan(
slug=_SLUG,
routes_path=stage / "egress.yaml",
routes=(),
token_env_map={},
),
supervise_plan=None,
agent_provision=AgentProvisionPlan(
template="claude",
command="claude",
prompt_mode="append_file",
image=_DEFAULT_IMAGE,
dockerfile="",
guest_home="/home/node",
instance_name=f"bot-bottle-{_SLUG}",
prompt_file=stage / "prompt.txt",
guest_env={},
),
slug=_SLUG,
forwarded_env={},
use_runsc=False,
)
class TestLaunchCommittedImage(unittest.TestCase):
def setUp(self) -> None:
self._tmp = tempfile.mkdtemp(prefix="launch-committed-test.")
def tearDown(self) -> None:
import shutil
shutil.rmtree(self._tmp, ignore_errors=True)
def _run_launch(
self,
plan: DockerBottlePlan,
*,
committed_tag: str | None = None,
image_present: bool = True,
) -> list[str]:
"""Drive launch() through its full sequence with the committed-image
behaviour controlled by the arguments. Returns the images that were
passed to `build_image` (empty list if it was never called)."""
built: list[str] = []
def fake_build(image: str, ctx: str, *, dockerfile: str = "") -> None:
del ctx, dockerfile
built.append(image)
with mock.patch.object(
launch_mod, "read_committed_image", return_value=committed_tag,
), mock.patch.object(
launch_mod.docker_mod, "image_exists", return_value=image_present,
), mock.patch.object(
launch_mod.docker_mod, "build_image", side_effect=fake_build,
), mock.patch.object(
launch_mod, "egress_tls_init",
return_value=(Path("/egress_ca"), Path("/egress_cert")),
), mock.patch.object(
launch_mod.network_mod, "network_name_for_slug",
return_value="bb-internal",
), mock.patch.object(
launch_mod.network_mod, "network_egress_name_for_slug",
return_value="bb-egress",
), mock.patch.object(
launch_mod, "bottle_plan_to_compose",
return_value={"services": {"agent": {}}},
), mock.patch.object(
launch_mod, "write_compose_file",
return_value=Path("/tmp/compose.yml"),
), mock.patch.object(launch_mod, "compose_up"), \
mock.patch.object(launch_mod, "compose_dump_logs"), \
mock.patch.object(launch_mod, "compose_down"), \
contextlib.redirect_stderr(io.StringIO()):
provision = mock.Mock(return_value=None)
with launch_mod.launch(plan, provision=provision):
pass
return built
def test_skips_build_when_committed_image_present(self) -> None:
plan = _plan(self._tmp)
built = self._run_launch(plan, committed_tag=_COMMITTED_TAG, image_present=True)
self.assertEqual([], built, "build_image should not be called when committed image exists")
def test_uses_committed_image_in_compose_spec(self) -> None:
"""The compose spec renderer receives the committed image tag via
plan.image captured here by checking what bottle_plan_to_compose
was called with."""
plan = _plan(self._tmp)
captured_plans: list[DockerBottlePlan] = []
def fake_compose(p: DockerBottlePlan) -> dict[str, Any]:
captured_plans.append(p)
return {"services": {"agent": {}}}
with mock.patch.object(
launch_mod, "read_committed_image", return_value=_COMMITTED_TAG,
), mock.patch.object(
launch_mod.docker_mod, "image_exists", return_value=True,
), mock.patch.object(
launch_mod.docker_mod, "build_image",
), mock.patch.object(
launch_mod, "egress_tls_init",
return_value=(Path("/egress_ca"), Path("/egress_cert")),
), mock.patch.object(
launch_mod.network_mod, "network_name_for_slug",
return_value="bb-internal",
), mock.patch.object(
launch_mod.network_mod, "network_egress_name_for_slug",
return_value="bb-egress",
), mock.patch.object(
launch_mod, "bottle_plan_to_compose", side_effect=fake_compose,
), mock.patch.object(
launch_mod, "write_compose_file",
return_value=Path("/tmp/compose.yml"),
), mock.patch.object(launch_mod, "compose_up"), \
mock.patch.object(launch_mod, "compose_dump_logs"), \
mock.patch.object(launch_mod, "compose_down"), \
contextlib.redirect_stderr(io.StringIO()):
provision = mock.Mock(return_value=None)
with launch_mod.launch(plan, provision=provision):
pass
self.assertEqual(1, len(captured_plans))
self.assertEqual(_COMMITTED_TAG, captured_plans[0].image)
def test_falls_back_to_build_when_no_committed_image(self) -> None:
plan = _plan(self._tmp)
built = self._run_launch(plan, committed_tag=None)
self.assertEqual([_DEFAULT_IMAGE], built)
def test_falls_back_to_build_when_committed_image_missing_from_daemon(self) -> None:
plan = _plan(self._tmp)
built = self._run_launch(
plan, committed_tag=_COMMITTED_TAG, image_present=False,
)
self.assertEqual([_DEFAULT_IMAGE], built)
if __name__ == "__main__":
unittest.main()
@@ -31,6 +31,7 @@ class _Provider(AgentProvider):
return AgentProviderRuntime(
template="test", command="test", image="",
prompt_mode="append_file", bypass_args=(), resume_args=(),
remote_control_args=(),
)
def provision_plan(self, **kwargs): # type: ignore[override]
raise NotImplementedError
-41
View File
@@ -67,46 +67,5 @@ class TestSave(unittest.TestCase):
)
class TestCommitContainer(unittest.TestCase):
def test_runs_docker_commit(self):
with patch.object(
docker_mod.subprocess, "run", return_value=_ok(),
) as run, patch.object(docker_mod, "info"):
docker_mod.commit_container(
"bot-bottle-dev-abc12",
"bot-bottle-committed-dev-abc12:latest",
)
argv = run.call_args.args[0]
self.assertEqual(
[
"docker", "commit",
"bot-bottle-dev-abc12",
"bot-bottle-committed-dev-abc12:latest",
],
argv,
)
def test_dies_on_docker_commit_failure(self):
with patch.object(
docker_mod.subprocess, "run", return_value=_fail("No such container"),
), patch.object(
docker_mod, "die", side_effect=SystemExit("die"),
) as die:
with self.assertRaises(SystemExit):
docker_mod.commit_container("missing-container", "some:tag")
die.assert_called_once()
self.assertIn("missing-container", die.call_args.args[0])
def test_die_message_includes_image_tag(self):
with patch.object(
docker_mod.subprocess, "run", return_value=_fail("boom"),
), patch.object(
docker_mod, "die", side_effect=SystemExit("die"),
) as die:
with self.assertRaises(SystemExit):
docker_mod.commit_container("ctr", "my-tag:v1")
self.assertIn("my-tag:v1", die.call_args.args[0])
if __name__ == "__main__":
unittest.main()
+6 -160
View File
@@ -1,21 +1,15 @@
"""Unit: Egress route lift + routes.yaml render + token
resolution (PRD 0017, PRD 0053)."""
import tempfile
import unittest
from pathlib import Path
from bot_bottle.egress import (
CODEX_HOST_CREDENTIAL_TOKEN_REF,
Egress,
EgressPlan,
EgressRoute,
egress_agent_env_entries,
egress_manifest_routes,
egress_render_routes,
egress_resolve_token_values,
egress_routes_for_bottle,
egress_sidecar_env_entries,
egress_token_env_map,
)
from bot_bottle.log import Die
@@ -208,23 +202,6 @@ class TestProviderRouteMerge(unittest.TestCase):
self.assertEqual((), routes[0].matches)
self.assertEqual({}, egress_token_env_map(routes))
def test_provider_route_defaults_to_redact_on_match(self):
b = _bottle([])
pr = EgressRoute(host="api.anthropic.com")
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual("redact", routes[0].outbound_on_match)
def test_provider_route_explicit_on_match_preserved(self):
b = _bottle([])
pr = EgressRoute(host="api.anthropic.com", outbound_on_match="supervise")
routes = egress_routes_for_bottle(b, (pr,))
self.assertEqual("supervise", routes[0].outbound_on_match)
def test_manifest_route_does_not_get_redact_default(self):
b = _bottle([{"host": "api.example.com"}])
routes = egress_routes_for_bottle(b)
self.assertEqual("", routes[0].outbound_on_match)
def test_two_provider_routes_with_same_token_ref_share_slot(self):
b = _bottle([])
routes = egress_routes_for_bottle(b, (
@@ -322,7 +299,7 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual([], parse_yaml_subset(rendered)["routes"])
def test_round_trip_through_addon_core(self):
from bot_bottle.egress_addon_core import load_config
from bot_bottle.egress_addon_core import load_routes
b = _bottle([
{"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"},
@@ -333,7 +310,7 @@ class TestRenderRoutes(unittest.TestCase):
{"host": "api.anthropic.com"},
])
routes = egress_routes_for_bottle(b)
addon_routes = load_config(egress_render_routes(routes)).routes
addon_routes = load_routes(egress_render_routes(routes))
self.assertEqual(3, len(addon_routes))
self.assertEqual("Bearer", addon_routes[0].auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", addon_routes[0].token_env)
@@ -341,41 +318,24 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual("", addon_routes[2].auth_scheme)
def test_dlp_round_trips(self):
from bot_bottle.egress_addon_core import load_config
from bot_bottle.egress_addon_core import load_routes
b = _bottle([{"host": "x.example", "dlp": {
"outbound_detectors": ["token_patterns"],
"inbound_detectors": False,
}}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes)
addon_routes = load_config(rendered).routes
addon_routes = load_routes(rendered)
self.assertEqual(("token_patterns",), addon_routes[0].outbound_detectors)
self.assertEqual((), addon_routes[0].inbound_detectors)
def test_outbound_on_match_round_trips(self):
from bot_bottle.egress_addon_core import load_config
b = _bottle([{"host": "logs.example", "dlp": {
"outbound_on_match": "redact",
}}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes)
self.assertIn('outbound_on_match: "redact"', rendered)
addon_routes = load_config(rendered).routes
self.assertEqual("redact", addon_routes[0].outbound_on_match)
def test_outbound_on_match_default_omitted_from_render(self):
b = _bottle([{"host": "x.example"}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes)
self.assertNotIn("outbound_on_match", rendered)
def test_git_fetch_policy_round_trips(self):
from bot_bottle.egress_addon_core import load_config
from bot_bottle.egress_addon_core import load_routes
b = _bottle([{"host": "github.com", "git": {"fetch": True}}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes)
self.assertEqual({"fetch": True}, self._parsed(routes)[0]["git"])
addon_routes = load_config(rendered).routes
addon_routes = load_routes(rendered)
self.assertTrue(addon_routes[0].git_fetch)
def test_log_zero_omitted_from_render(self):
@@ -449,119 +409,5 @@ class TestResolveTokenValues(unittest.TestCase):
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
class TestCanaryGeneration(unittest.TestCase):
"""Egress.prepare() generates a unique canary token per session."""
def _bottle_obj(self):
return ManifestIndex.from_json_obj({
"bottles": {"dev": {"egress": {"routes": []}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
def _make_plan(self) -> EgressPlan:
# Use a concrete no-op subclass so we can call prepare() without
# a real backend.
class _TestEgress(Egress):
pass
e = _TestEgress()
with tempfile.TemporaryDirectory() as td:
return e.prepare(self._bottle_obj(), "test-slug", Path(td))
def test_canary_is_non_empty(self):
plan = self._make_plan()
self.assertIsInstance(plan.canary, str)
self.assertGreater(len(plan.canary), 0)
self.assertRegex(plan.canary_env, r"^[A-Z]+_[A-Z]+_SECRET$")
def test_canary_is_unique_per_session(self):
with tempfile.TemporaryDirectory() as td:
bottle = self._bottle_obj()
class _TestEgress(Egress):
pass
e = _TestEgress()
plan_a = e.prepare(bottle, "slug-a", Path(td))
plan_b = e.prepare(bottle, "slug-b", Path(td))
self.assertNotEqual(plan_a.canary, plan_b.canary)
def test_canary_detected_by_scan_known_secrets(self):
from bot_bottle.dlp_detectors import scan_known_secrets
plan = self._make_plan()
env = {plan.canary_env: plan.canary}
result = scan_known_secrets(
f"exfil={plan.canary}",
env=env,
sensitive_prefixes=(plan.canary_env,),
)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn(plan.canary_env, result.reason)
def test_egress_plan_canary_field_default_empty(self):
# Verify EgressPlan can be constructed with an empty canary (backward compat).
from pathlib import Path
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
)
self.assertEqual("", plan.canary)
self.assertEqual("", plan.canary_env)
class TestEgressEnvEntries(unittest.TestCase):
def test_sidecar_entries_include_route_tokens_and_canary_scan_prefix(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(EgressRoute(host="api.example"),),
token_env_map={"EGRESS_TOKEN_1": "T1", "EGRESS_TOKEN_0": "T0"},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
(
"EGRESS_TOKEN_0",
"EGRESS_TOKEN_1",
"CANON_ALPHA_SECRET=fake-canary-value",
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
),
egress_sidecar_env_entries(plan),
)
def test_agent_entries_include_only_canary_bait(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
canary_env="CANON_ALPHA_SECRET",
)
self.assertEqual(
("CANON_ALPHA_SECRET=fake-canary-value",),
egress_agent_env_entries(plan),
)
def test_canary_entries_omitted_when_name_missing(self):
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
canary="fake-canary-value",
)
self.assertEqual((), egress_sidecar_env_entries(plan))
self.assertEqual((), egress_agent_env_entries(plan))
if __name__ == "__main__":
unittest.main()
+38 -234
View File
@@ -22,16 +22,15 @@ from bot_bottle.egress_addon_core import (
MatchEntry,
PathMatch,
Route,
ScanResult,
build_inbound_scan_text,
build_outbound_scan_text,
build_token_allow_payload,
decide,
decide_git_fetch,
evaluate_matches,
is_git_fetch_request,
is_git_push_request,
load_config,
load_routes,
match_route,
outbound_scan_headers,
parse_config,
@@ -268,24 +267,46 @@ class TestParseDlp(unittest.TestCase):
"dlp": {"wat": True},
}]})
def test_outbound_on_match_default_empty(self):
routes = parse_routes({"routes": [{"host": "x.example"}]})
self.assertEqual("", routes[0].outbound_on_match)
def test_outbound_on_match_parsed(self):
for policy in ("block", "redact", "supervise"):
routes = parse_routes({"routes": [{
"host": "x.example",
"dlp": {"outbound_on_match": policy},
}]})
self.assertEqual(policy, routes[0].outbound_on_match)
# --- load_routes ---------------------------------------------------------
def test_outbound_on_match_invalid_rejected(self):
class TestLoadRoutes(unittest.TestCase):
def test_yaml_text_round_trip(self):
routes = load_routes(
'routes:\n'
' - host: "api.example"\n'
)
self.assertEqual(1, len(routes))
self.assertEqual("api.example", routes[0].host)
def test_full_route_shape_parses(self):
routes = load_routes(
'routes:\n'
' - host: "api.example"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
' matches:\n'
' - paths:\n'
' - value: "/v1/"\n'
' - type: "exact"\n'
' value: "/messages"\n'
)
self.assertEqual(1, len(routes))
r = routes[0]
self.assertEqual("api.example", r.host)
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
self.assertEqual(1, len(r.matches))
self.assertEqual(2, len(r.matches[0].paths))
def test_empty_routes_list(self):
routes = load_routes("routes: []\n")
self.assertEqual((), routes)
def test_invalid_yaml_raises_value_error(self):
with self.assertRaises(ValueError):
parse_routes({"routes": [{
"host": "x.example",
"dlp": {"outbound_on_match": "nope"},
}]})
load_routes("routes:\n\t- host: x\n")
# --- load_config / parse_config ------------------------------------------
@@ -336,33 +357,6 @@ class TestLoadConfig(unittest.TestCase):
with self.assertRaises(ValueError):
parse_config("not a dict")
def test_empty_routes_list(self):
cfg = load_config("routes: []\n")
self.assertEqual((), cfg.routes)
def test_full_route_shape_parses(self):
cfg = load_config(
'routes:\n'
' - host: "api.example"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
' matches:\n'
' - paths:\n'
' - value: "/v1/"\n'
' - type: "exact"\n'
' value: "/messages"\n'
)
r = cfg.routes[0]
self.assertEqual("api.example", r.host)
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
self.assertEqual(1, len(r.matches))
self.assertEqual(2, len(r.matches[0].paths))
def test_invalid_yaml_raises_value_error(self):
with self.assertRaises(ValueError):
load_config("routes:\n\t- host: x\n")
# --- evaluate_matches ---------------------------------------------------
@@ -1173,195 +1167,5 @@ class TestScanInbound(unittest.TestCase):
self.assertEqual("block", result.severity)
class TestScanOutboundSafeTokens(unittest.TestCase):
"""PRD 0062: scan_outbound threads the supervisor-approved safe-tokens
set into the token detectors."""
def test_safe_token_allows_request(self):
text = build_outbound_scan_text(
host="api.example.com", path="/v1/data", query="",
headers={}, body=f"key={_AWS_KEY}",
)
self.assertIsNone(
scan_outbound(_ROUTE, text, {}, safe_tokens={_AWS_KEY})
)
def test_unrelated_safe_token_still_blocks(self):
text = build_outbound_scan_text(
host="api.example.com", path="/v1/data", query="",
headers={}, body=f"key={_AWS_KEY}",
)
result = scan_outbound(_ROUTE, text, {}, safe_tokens={"ghp_" + "A" * 36})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual(_AWS_KEY, result.matched)
class TestScanOutboundCrlfText(unittest.TestCase):
"""PRD 0062: CRLF is scanned only over the request line + headers
(crlf_text), never the body a body is not an injection vector."""
def test_body_crlf_not_flagged_when_crlf_text_excludes_body(self):
# A form-encoded multi-line body legitimately contains %0d%0a.
body = "comment=line1%0d%0aline2"
full = build_outbound_scan_text(
host="api.example.com", path="/submit", query="",
headers={}, body=body,
)
crlf_text = build_outbound_scan_text(
host="api.example.com", path="/submit", query="",
headers={}, body="",
)
self.assertIsNone(scan_outbound(_ROUTE, full, {}, crlf_text=crlf_text))
def test_request_line_crlf_still_flagged(self):
full = build_outbound_scan_text(
host="api.example.com", path="/p", query="next=%0d%0aX:evil",
headers={}, body="",
)
crlf_text = full
result = scan_outbound(_ROUTE, full, {}, crlf_text=crlf_text)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_default_crlf_text_scans_full_blob(self):
# Backward compatibility: crlf_text=None scans everything (body too).
full = build_outbound_scan_text(
host="api.example.com", path="/submit", query="",
headers={}, body="x=%0d%0aX:evil",
)
self.assertIsNotNone(scan_outbound(_ROUTE, full, {}))
class TestBuildTokenAllowPayload(unittest.TestCase):
def test_payload_includes_context_and_no_raw_token(self):
result = ScanResult(
severity="block",
reason="AWS access key found in body",
location="body",
context="key=******** tail",
matched=_AWS_KEY,
)
payload = build_token_allow_payload(
"api.example.com", "POST", "/v1/ingest", result,
)
self.assertIn("host: api.example.com", payload)
self.assertIn("method: POST", payload)
self.assertIn("path: /v1/ingest", payload)
self.assertIn("AWS access key found in body", payload)
self.assertIn("key=******** tail", payload)
# The raw matched value must never appear in the proposal file.
self.assertNotIn(_AWS_KEY, payload)
def test_payload_omits_context_line_when_empty(self):
result = ScanResult(severity="block", reason="r", matched="x")
payload = build_token_allow_payload("h", "GET", "/", result)
self.assertNotIn("context:", payload)
class TestScanOutboundEnhanced(unittest.TestCase):
"""scan_outbound changes: binary decode, entropy detector,
broadened known-value prefixes, fragmentation resistance."""
_ROUTE = Route(host="api.example.com")
_ROUTE_ENTROPY = Route(
host="api.example.com",
outbound_detectors=("entropy",),
)
def test_binary_body_latin1_decode_finds_ascii_secret(self):
# Body contains valid ASCII secret surrounded by non-UTF-8 bytes.
secret = "supersecrettoken99"
env = {"EGRESS_TOKEN_0": secret}
# Wrap the secret in bytes that are invalid UTF-8.
body = b"\x80\x81" + secret.encode("ascii") + b"\xff"
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_binary_body_valid_utf8_decoded_correctly(self):
env = {"EGRESS_TOKEN_0": "mysecret"}
# Valid UTF-8 body — should be decoded as UTF-8, not latin-1.
body = "clean body with mysecret".encode("utf-8")
result = scan_outbound(self._ROUTE, body, env)
self.assertIsNotNone(result)
def test_entropy_detector_off_by_default(self):
import string
# High-entropy content should NOT warn if the route has no entropy detector.
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE, alphabet, {})
self.assertIsNone(result)
def test_entropy_detector_warns_when_enabled(self):
import string
alphabet = (string.ascii_letters + string.digits + "+/")[:64]
result = scan_outbound(self._ROUTE_ENTROPY, alphabet, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("warn", result.severity)
def test_bot_bottle_sensitive_prefixes_env_var(self):
# When the sidecar env contains BOT_BOTTLE_SENSITIVE_PREFIXES,
# scan_outbound should scan those additional prefixes.
secret = "extra-sensitive-value-abc"
env = {
"MY_CRED_KEY": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "MY_CRED_",
}
result = scan_outbound(self._ROUTE, f"x={secret}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_bot_bottle_sensitive_prefixes_multiple(self):
secret = "my-api-key-value-xyz"
env = {
"ANTHROPIC_API_0": secret,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "ANTHROPIC_API_,OTHER_",
}
result = scan_outbound(self._ROUTE, f"auth={secret}", env)
self.assertIsNotNone(result)
def test_canary_detected_via_random_secret_env_name(self):
# The fake secret uses a randomized env name that the sidecar marks
# as sensitive through BOT_BOTTLE_SENSITIVE_PREFIXES.
canary = "canaryvalue12345abcdef"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
result = scan_outbound(self._ROUTE, f"data={canary}", env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("CANON_ALPHA_SECRET", result.reason)
def test_fragmented_canary_blocked(self):
# Canary with separators injected is still caught.
canary = "supersecretcanary99"
env = {
"CANON_ALPHA_SECRET": canary,
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
}
fragmented = "-".join(canary)
result = scan_outbound(self._ROUTE, f"x={fragmented}", env)
self.assertIsNotNone(result)
class TestOutboundDetectorNames(unittest.TestCase):
def test_entropy_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("entropy", OUTBOUND_DETECTOR_NAMES)
def test_known_secrets_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("known_secrets", OUTBOUND_DETECTOR_NAMES)
def test_token_patterns_in_outbound_detector_names(self):
from bot_bottle.egress_addon_core import OUTBOUND_DETECTOR_NAMES
self.assertIn("token_patterns", OUTBOUND_DETECTOR_NAMES)
if __name__ == "__main__":
unittest.main()
@@ -1,274 +0,0 @@
"""Unit: LOG_FULL credential redaction in _log_request / _log_response (issue #257).
egress_addon.py is sidecar-only code that depends on mitmproxy, which is
not installed on the host. This file pre-populates sys.modules with the
minimum mocks needed so EgressAddon can be imported and tested without the
real mitmproxy package."""
from __future__ import annotations
import json
import sys
import types
import unittest
from io import StringIO
from typing import Any
from unittest.mock import patch
# ---------------------------------------------------------------------------
# Sidecar-import shims — must run before importing egress_addon
# ---------------------------------------------------------------------------
def _ensure_shims() -> None:
if "mitmproxy" not in sys.modules:
_mm = types.ModuleType("mitmproxy")
_mh = types.ModuleType("mitmproxy.http")
setattr(_mm, "http", _mh)
sys.modules["mitmproxy"] = _mm
sys.modules["mitmproxy.http"] = _mh
if "egress_addon_core" not in sys.modules:
import bot_bottle.egress_addon_core as _core
sys.modules["egress_addon_core"] = _core
_ensure_shims()
from bot_bottle.egress_addon import EgressAddon # noqa: E402 (import after shims)
from bot_bottle.egress_addon_core import Config, LOG_FULL # noqa: E402
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _addon() -> EgressAddon:
"""Return a bare EgressAddon with LOG_FULL config and no routes file."""
a: EgressAddon = EgressAddon.__new__(EgressAddon)
a.config = Config(routes=(), log=LOG_FULL)
a.safe_tokens = set()
a._supervise_queue_dir = ""
a._supervise_slug = ""
a._token_allow_timeout = 300.0
return a
class _Headers:
def __init__(self, d: dict[str, str]) -> None:
self._d = d
def items(self) -> list[tuple[str, str]]:
return list(self._d.items())
class _Request:
def __init__(
self,
host: str = "api.example.com",
method: str = "POST",
path: str = "/v1/messages",
headers: dict[str, str] | None = None,
body: str = "",
) -> None:
self.pretty_host = host
self.method = method
self.path = path
self.headers = _Headers(headers or {})
self._body = body
def get_text(self, *, strict: bool = True) -> str:
return self._body
class _Response:
def __init__(
self,
status_code: int = 200,
headers: dict[str, str] | None = None,
body: str = "",
) -> None:
self.status_code = status_code
self.headers = _Headers(headers or {})
self._body = body
def get_text(self, *, strict: bool = True) -> str:
return self._body
class _Flow:
def __init__(
self,
request: _Request | None = None,
response: _Response | None = None,
) -> None:
self.request = request or _Request()
self.response = response or _Response()
def _log_request(addon: EgressAddon, flow: _Flow) -> dict[str, Any]:
buf = StringIO()
with patch("sys.stderr", buf):
addon._log_request(flow) # type: ignore[arg-type]
return json.loads(buf.getvalue())
def _log_response(addon: EgressAddon, flow: _Flow) -> dict[str, Any]:
buf = StringIO()
with patch("sys.stderr", buf):
addon._log_response(flow) # type: ignore[arg-type]
return json.loads(buf.getvalue())
# ---------------------------------------------------------------------------
# _log_request — authorization header stripped
# ---------------------------------------------------------------------------
class TestLogRequestAuthorizationStripped(unittest.TestCase):
def test_lowercase_authorization_excluded(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"authorization": "Bearer sk-real-secret"}))
entry = _log_request(addon, flow)
self.assertNotIn("authorization", entry["headers"])
def test_titlecase_authorization_excluded(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"Authorization": "Bearer sk-real-secret"}))
entry = _log_request(addon, flow)
self.assertNotIn("Authorization", entry["headers"])
self.assertNotIn("authorization", entry["headers"])
def test_non_auth_headers_retained(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={
"authorization": "Bearer sk-real-secret",
"content-type": "application/json",
}))
entry = _log_request(addon, flow)
self.assertIn("content-type", entry["headers"])
self.assertEqual("application/json", entry["headers"]["content-type"])
def test_no_authorization_header_logs_all_others(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"x-request-id": "abc"}))
entry = _log_request(addon, flow)
self.assertEqual({"x-request-id": "abc"}, entry["headers"])
# ---------------------------------------------------------------------------
# _log_request — body redaction
# ---------------------------------------------------------------------------
_OPENAI_KEY = "sk-" + "A" * 48
class TestLogRequestBodyRedacted(unittest.TestCase):
def test_token_pattern_in_body_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(body=f"key={_OPENAI_KEY}"))
entry = _log_request(addon, flow)
self.assertNotIn(_OPENAI_KEY, entry["body"])
self.assertIn("********", entry["body"])
def test_provisioned_secret_in_body_scrubbed(self) -> None:
addon = _addon()
secret = "provisioned-egress-secret-xyz"
flow = _Flow(request=_Request(body=f"token={secret}"))
with patch.dict("os.environ", {"EGRESS_TOKEN_0": secret}):
entry = _log_request(addon, flow)
self.assertNotIn(secret, entry["body"])
self.assertIn("********", entry["body"])
def test_clean_body_preserved(self) -> None:
addon = _addon()
payload = '{"model": "claude-3", "max_tokens": 1024}'
flow = _Flow(request=_Request(body=payload))
entry = _log_request(addon, flow)
self.assertEqual(payload, entry["body"])
# ---------------------------------------------------------------------------
# _log_request — non-authorization header value redaction
# ---------------------------------------------------------------------------
class TestLogRequestHeaderValuesRedacted(unittest.TestCase):
def test_token_in_custom_header_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"x-api-key": _OPENAI_KEY}))
entry = _log_request(addon, flow)
self.assertNotIn(_OPENAI_KEY, entry["headers"].get("x-api-key", ""))
self.assertIn("********", entry["headers"].get("x-api-key", ""))
def test_clean_header_value_preserved(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(headers={"accept": "application/json"}))
entry = _log_request(addon, flow)
self.assertEqual("application/json", entry["headers"]["accept"])
# ---------------------------------------------------------------------------
# _log_response — body redaction
# ---------------------------------------------------------------------------
class TestLogResponseBodyRedacted(unittest.TestCase):
def test_token_pattern_in_response_body_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(
request=_Request(),
response=_Response(body=f'{{"key": "{_OPENAI_KEY}"}}'),
)
entry = _log_response(addon, flow)
self.assertNotIn(_OPENAI_KEY, entry["body"])
self.assertIn("********", entry["body"])
def test_provisioned_secret_in_response_body_scrubbed(self) -> None:
addon = _addon()
secret = "provisioned-egress-secret-xyz"
flow = _Flow(
request=_Request(),
response=_Response(body=f'{{"token": "{secret}"}}'),
)
with patch.dict("os.environ", {"EGRESS_TOKEN_0": secret}):
entry = _log_response(addon, flow)
self.assertNotIn(secret, entry["body"])
self.assertIn("********", entry["body"])
def test_clean_response_body_preserved(self) -> None:
addon = _addon()
flow = _Flow(request=_Request(), response=_Response(body='{"result": "ok"}'))
entry = _log_response(addon, flow)
self.assertEqual('{"result": "ok"}', entry["body"])
# ---------------------------------------------------------------------------
# _log_response — response header value redaction
# ---------------------------------------------------------------------------
class TestLogResponseHeaderValuesRedacted(unittest.TestCase):
def test_token_in_response_header_scrubbed(self) -> None:
addon = _addon()
flow = _Flow(
request=_Request(),
response=_Response(headers={"set-cookie": f"token={_OPENAI_KEY}"}),
)
entry = _log_response(addon, flow)
cookie_val = entry["headers"].get("set-cookie", "")
self.assertNotIn(_OPENAI_KEY, cookie_val)
self.assertIn("********", cookie_val)
def test_clean_response_header_preserved(self) -> None:
addon = _addon()
flow = _Flow(
request=_Request(),
response=_Response(headers={"content-type": "application/json"}),
)
entry = _log_response(addon, flow)
self.assertEqual("application/json", entry["headers"]["content-type"])
if __name__ == "__main__":
unittest.main()
+11 -63
View File
@@ -2,15 +2,12 @@
add_route removed; docker exec / cp / kill paths are covered by the
integration test)."""
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
from bot_bottle import supervise
from bot_bottle.backend.egress_apply import EgressApplyError
from bot_bottle.backend.docker.egress_apply import applicator
from bot_bottle.backend.docker.egress_apply import (
EgressApplyError,
validate_routes_content,
)
_ROUTES_EMPTY = "routes: []\n"
@@ -19,11 +16,11 @@ _ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n'
class TestValidateRoutesContent(unittest.TestCase):
def test_accepts_minimal_route_table(self):
applicator.validate_routes_content(_ROUTES_EMPTY)
applicator.validate_routes_content(_ROUTES_ONE)
validate_routes_content(_ROUTES_EMPTY)
validate_routes_content(_ROUTES_ONE)
def test_accepts_full_route_with_matches(self):
applicator.validate_routes_content(
validate_routes_content(
'routes:\n'
' - host: "api.github.com"\n'
' auth_scheme: "Bearer"\n'
@@ -35,74 +32,25 @@ class TestValidateRoutesContent(unittest.TestCase):
def test_rejects_bad_yaml(self):
with self.assertRaises(EgressApplyError) as cm:
applicator.validate_routes_content("routes:\n\t- host: x\n")
validate_routes_content("routes:\n\t- host: x\n")
self.assertIn("not valid", str(cm.exception))
def test_rejects_missing_routes_key(self):
with self.assertRaises(EgressApplyError):
applicator.validate_routes_content("other: []\n")
validate_routes_content("other: []\n")
def test_rejects_non_list_routes(self):
with self.assertRaises(EgressApplyError):
applicator.validate_routes_content('routes: "not a list"\n')
validate_routes_content('routes: "not a list"\n')
def test_rejects_partial_auth_pair(self):
with self.assertRaises(EgressApplyError):
applicator.validate_routes_content(
validate_routes_content(
'routes:\n'
' - host: "x.example"\n'
' auth_scheme: "Bearer"\n'
)
def test_rejects_log_full(self):
with self.assertRaises(EgressApplyError) as cm:
applicator.validate_routes_content(
'log: 2\n'
'routes:\n'
' - host: "x.example"\n'
)
self.assertIn("must not change egress logging", str(cm.exception))
class TestApplyRoutesChange(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="egress-apply-test.")
original = supervise.bot_bottle_root
def fake_root() -> Path:
return Path(self._tmp.name) / ".bot-bottle"
supervise.bot_bottle_root = fake_root # type: ignore[assignment]
self.addCleanup(lambda: setattr(supervise, "bot_bottle_root", original))
self.addCleanup(self._tmp.cleanup)
def test_writes_live_routes_and_signals_reload(self):
calls: list[list[str]] = []
def fake_run(argv: list[str], **kwargs: object) -> SimpleNamespace:
calls.append(list(argv))
return SimpleNamespace(returncode=0, stdout="", stderr="")
with patch(
"bot_bottle.backend.docker.egress_apply.subprocess.run",
side_effect=fake_run,
):
before, after = applicator.apply_routes_change(
"dev",
"routes:\n - host: google.com\n",
)
self.assertEqual("", before)
self.assertEqual("routes:\n - host: google.com\n", after)
self.assertEqual(
"routes:\n - host: google.com\n",
(Path(self._tmp.name) / ".bot-bottle/state/dev/egress/routes.yaml").read_text(encoding="utf-8"),
)
self.assertEqual(
["docker", "kill", "--signal", "HUP", "bot-bottle-sidecars-dev"],
calls[0],
)
if __name__ == "__main__":
unittest.main()
-24
View File
@@ -199,30 +199,6 @@ class TestHookRender(unittest.TestCase):
self.assertIn('set -- "$@" --push-option="$opt"', hook)
self.assertIn('git push "$@" origin "$refspec"', hook)
def test_inline_gitleaks_allow_routes_to_supervisor(self):
hook = git_gate_render_hook()
# First gitleaks runs normally; only if that passes does the
# hook ask gitleaks to ignore inline allow comments and report
# the suppressed findings for human approval.
self.assertIn("--ignore-gitleaks-allow", hook)
self.assertIn("--report-format=json", hook)
self.assertIn('"tool": "gitleaks-allow"', hook)
self.assertIn("SUPERVISE_QUEUE_DIR", hook)
self.assertIn("SUPERVISE_BOTTLE_SLUG", hook)
self.assertIn("supervisor approved # gitleaks:allow", hook)
self.assertIn("supervisor rejected # gitleaks:allow", hook)
def test_inline_gitleaks_allow_fails_closed_without_supervisor(self):
hook = git_gate_render_hook()
self.assertIn(
"cannot route # gitleaks:allow finding to supervisor; refusing push",
hook,
)
self.assertIn(
"supervisor approval timed out for # gitleaks:allow; refusing push",
hook,
)
class TestAccessHookRender(unittest.TestCase):
def test_access_hook_refreshes_origin_on_upload_pack(self):
-127
View File
@@ -1,127 +0,0 @@
"""Unit: leveled + structured logging wrappers (issue #252).
Locks three properties of bot_bottle.log:
- backward compatibility default output is byte-identical to the
original bare wrappers, so the 100+ existing single-string call
sites are unaffected;
- context rendering an optional mapping becomes a parseable
` [k=v ...]` suffix;
- level gating BOT_BOTTLE_LOG_LEVEL filters by severity, debug is
silent by default, and error always surfaces.
"""
from __future__ import annotations
import contextlib
import io
import unittest
from typing import Callable
from unittest import mock
from bot_bottle import log
def _capture(
fn: Callable[..., None],
*args: object,
env: dict[str, str] | None = None,
**kwargs: object,
) -> str:
buf = io.StringIO()
patched = mock.patch.dict("os.environ", env or {}, clear=False)
with patched, contextlib.redirect_stderr(buf):
fn(*args, **kwargs)
return buf.getvalue()
class TestBackwardCompat(unittest.TestCase):
"""No context + default level → exactly the legacy lines."""
def test_info(self):
self.assertEqual("bot-bottle: hello\n", _capture(log.info, "hello"))
def test_warn(self):
self.assertEqual(
"bot-bottle: warning: careful\n", _capture(log.warn, "careful")
)
def test_error(self):
self.assertEqual(
"bot-bottle: error: boom\n", _capture(log.error, "boom")
)
class TestContext(unittest.TestCase):
def test_appends_sorted_parseable_suffix(self):
out = _capture(
log.error, "rpc failed", context={"slug": "abc123", "code": "-32603"}
)
# keys sorted: code before slug
self.assertEqual(
"bot-bottle: error: rpc failed [code=-32603 slug=abc123]\n", out
)
def test_quotes_values_with_whitespace(self):
out = _capture(
log.info, "did thing", context={"path": "/a b/c", "ok": "yes"}
)
self.assertEqual(
'bot-bottle: did thing [ok=yes path="/a b/c"]\n', out
)
def test_empty_context_is_noop_suffix(self):
self.assertEqual(
"bot-bottle: x\n", _capture(log.info, "x", context={})
)
class TestLevels(unittest.TestCase):
def test_debug_silent_by_default(self):
self.assertEqual("", _capture(log.debug, "trace"))
def test_debug_emits_when_level_lowered(self):
out = _capture(log.debug, "trace", env={"BOT_BOTTLE_LOG_LEVEL": "debug"})
self.assertEqual("bot-bottle: debug: trace\n", out)
def test_error_level_suppresses_info_and_warn(self):
env = {"BOT_BOTTLE_LOG_LEVEL": "error"}
self.assertEqual("", _capture(log.info, "i", env=env))
self.assertEqual("", _capture(log.warn, "w", env=env))
# error still surfaces — nothing sits above it
self.assertEqual(
"bot-bottle: error: e\n", _capture(log.error, "e", env=env)
)
def test_unknown_level_falls_back_to_default(self):
# garbage value → default INFO threshold, so info still prints
out = _capture(log.info, "i", env={"BOT_BOTTLE_LOG_LEVEL": "loud"})
self.assertEqual("bot-bottle: i\n", out)
def test_warning_alias_accepted(self):
env = {"BOT_BOTTLE_LOG_LEVEL": "warning"}
self.assertEqual("", _capture(log.info, "i", env=env))
self.assertEqual(
"bot-bottle: warning: w\n", _capture(log.warn, "w", env=env)
)
class TestDie(unittest.TestCase):
def test_die_still_raises_and_prints_error(self):
buf = io.StringIO()
with contextlib.redirect_stderr(buf):
with self.assertRaises(log.Die) as cm:
log.die("fatal thing")
self.assertEqual("fatal thing", cm.exception.message)
self.assertIn("bot-bottle: error: fatal thing", buf.getvalue())
def test_die_surfaces_even_at_error_level(self):
buf = io.StringIO()
with mock.patch.dict("os.environ", {"BOT_BOTTLE_LOG_LEVEL": "error"}):
with contextlib.redirect_stderr(buf):
with self.assertRaises(log.Die):
log.die("still fatal")
self.assertIn("bot-bottle: error: still fatal", buf.getvalue())
if __name__ == "__main__":
unittest.main()
+4 -49
View File
@@ -2,32 +2,26 @@
from __future__ import annotations
import sys
import unittest
from unittest.mock import patch
from bot_bottle.backend.macos_container import bottle as bottle_mod
from bot_bottle.backend.macos_container.bottle import MacosContainerBottle, _PTY_FORWARD_SCRIPT
from bot_bottle.backend.macos_container.bottle import MacosContainerBottle
class TestMacosContainerBottle(unittest.TestCase):
def test_agent_argv_uses_pty_forward_and_container_exec(self):
def test_agent_argv_uses_container_exec(self):
bottle = MacosContainerBottle(
"bot-bottle-dev-abc",
lambda: None,
None,
agent_command="codex",
)
with patch.dict(bottle_mod.os.environ, {}, clear=True):
argv = bottle.agent_argv(["run"])
self.assertEqual(
[
sys.executable, _PTY_FORWARD_SCRIPT, "--",
"container", "exec", "--interactive", "--tty",
"--env", "TERM",
"bot-bottle-dev-abc", "codex", "run",
],
argv,
bottle.agent_argv(["run"]),
)
def test_agent_argv_includes_workdir(self):
@@ -37,54 +31,15 @@ class TestMacosContainerBottle(unittest.TestCase):
None,
agent_workdir="/home/node/workspace",
)
with patch.dict(bottle_mod.os.environ, {}, clear=True):
argv = bottle.agent_argv([])
self.assertEqual(
[
sys.executable, _PTY_FORWARD_SCRIPT, "--",
"container", "exec", "--interactive", "--tty",
"--env", "TERM",
"--workdir", "/home/node/workspace",
"bot-bottle-dev-abc", "claude",
],
argv,
bottle.agent_argv([]),
)
def test_agent_argv_forwards_terminal_env_names_without_values(self):
bottle = MacosContainerBottle("bot-bottle-dev-abc", lambda: None, None)
with patch.dict(
bottle_mod.os.environ,
{
"TERM": "screen-256color",
"TERM_PROGRAM": "WezTerm",
"WEZTERM_PANE": "pane-id",
"SHELL": "/bin/zsh",
},
clear=True,
):
argv = bottle.agent_argv([])
self.assertIn("TERM", argv)
self.assertIn("TERM_PROGRAM", argv)
self.assertIn("WEZTERM_PANE", argv)
self.assertNotIn("SHELL", argv)
self.assertNotIn("TERM=screen-256color", argv)
self.assertNotIn("TERM_PROGRAM=WezTerm", argv)
self.assertNotIn("WEZTERM_PANE=pane-id", argv)
def test_agent_argv_always_forwards_term_name(self):
bottle = MacosContainerBottle("bot-bottle-dev-abc", lambda: None, None)
with patch.dict(bottle_mod.os.environ, {}, clear=True):
argv = bottle.agent_argv([])
self.assertIn("TERM", argv)
def test_agent_argv_no_tty_omits_wrapper_and_tty_flags(self):
bottle = MacosContainerBottle("bot-bottle-dev-abc", lambda: None, None)
argv = bottle.agent_argv([], tty=False)
self.assertNotIn("--tty", argv)
self.assertNotIn("--env", argv)
self.assertNotIn(_PTY_FORWARD_SCRIPT, argv)
self.assertEqual(["container", "exec", "bot-bottle-dev-abc", "claude"], argv)
def test_exec_pipes_script_to_shell(self):
bottle = MacosContainerBottle("bot-bottle-dev-abc", lambda: None, None)
with patch("bot_bottle.backend.macos_container.bottle.subprocess.run") as run:
+7 -104
View File
@@ -9,12 +9,8 @@ from types import SimpleNamespace
from typing import cast
from unittest.mock import patch
from bot_bottle.agent_provider import AgentProvisionPlan
from bot_bottle.backend import BottleSpec
from bot_bottle.backend.macos_container import launch
from bot_bottle.backend.macos_container.bottle_plan import MacosContainerBottlePlan
from bot_bottle.egress import EgressPlan
from bot_bottle.git_gate import GitGatePlan
from bot_bottle.manifest import ManifestIndex
_MANIFEST = ManifestIndex.from_json_obj({
@@ -30,9 +26,8 @@ def _plan(
supervise: bool = False,
agent_git_gate_url: str = "",
agent_supervise_url: str = "",
canary: bool = False,
) -> MacosContainerBottlePlan:
routes_path = stage_dir / "routes.yaml"
routes_path = stage_dir / "source-routes.yaml"
routes_path.write_text("routes: []\n", encoding="utf-8")
ca_dir = stage_dir / "egress-ca"
ca_dir.mkdir(exist_ok=True)
@@ -43,8 +38,6 @@ def _plan(
routes_path=routes_path,
routes=("route",),
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
)
if git:
key_path = stage_dir / "origin-key"
@@ -132,35 +125,20 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
f"type=bind,source={self.stage_dir / 'egress-ca'},target=/home/mitmproxy/.mitmproxy",
argv,
)
routes_dir = self.stage_dir / "macos-container-egress"
self.assertIn(
f"type=bind,source={self.stage_dir},target=/etc/egress,readonly",
f"type=bind,source={routes_dir},target=/etc/egress,readonly",
argv,
)
self.assertEqual(
"routes: []\n",
(routes_dir / "routes.yaml").read_text(encoding="utf-8"),
)
self.assertIn(
"type=bind,source=/state/supervise/queue,target=/run/supervise/queue",
argv,
)
def test_sidecar_argv_registers_canary_env_as_sensitive(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._sidecar_run_argv(
plan,
"bot-bottle-sidecars-dev-abc",
"bot-bottle-net-dev-abc",
"bot-bottle-egress-dev-abc",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
self.assertIn("BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", argv)
def test_agent_argv_receives_canary_env(self):
plan = _plan(stage_dir=self.stage_dir, canary=True)
argv = launch._agent_run_argv(
plan,
"bot-bottle-net-dev-abc",
"192.0.2.10",
)
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
def test_agent_env_points_proxy_at_sidecar_ip(self):
plan = _plan(
stage_dir=self.stage_dir,
@@ -288,80 +266,5 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
)
def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan:
return MacosContainerBottlePlan(
spec=cast(BottleSpec, SimpleNamespace()),
manifest=_MANIFEST,
stage_dir=stage_dir,
git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())),
egress_plan=cast(EgressPlan, SimpleNamespace(canary="")),
supervise_plan=None,
agent_provision=AgentProvisionPlan(
template="claude",
command="claude",
prompt_mode="append_file",
image="bot-bottle-agent:latest",
dockerfile="/repo/Dockerfile",
guest_home="/home/node",
instance_name="bot-bottle-dev-abc",
prompt_file=stage_dir / "prompt.txt",
guest_env={},
),
slug="dev-abc",
forwarded_env={},
)
class TestMacosContainerLaunchCommittedImage(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory()
self.stage_dir = Path(self._tmp.name)
def tearDown(self):
self._tmp.cleanup()
def test_build_images_uses_committed_image_when_present(self):
plan = _build_plan(self.stage_dir)
calls = []
def fake_build(image: str, context: str, *, dockerfile: str = "") -> None:
calls.append((image, context, dockerfile))
with patch.object(
launch, "read_committed_image",
return_value="bot-bottle-committed-dev-abc:latest",
), patch.object(
launch.container_mod, "image_exists", return_value=True,
), patch.object(
launch.container_mod, "build_image", side_effect=fake_build,
), patch.object(launch, "info"):
updated = launch._build_images(plan)
self.assertEqual("bot-bottle-committed-dev-abc:latest", updated.image)
self.assertEqual(1, len(calls))
self.assertEqual(launch.SIDECAR_BUNDLE_IMAGE, calls[0][0])
def test_build_images_builds_agent_when_committed_image_missing(self):
plan = _build_plan(self.stage_dir)
calls = []
def fake_build(image: str, context: str, *, dockerfile: str = "") -> None:
calls.append((image, context, dockerfile))
with patch.object(
launch, "read_committed_image",
return_value="bot-bottle-committed-dev-abc:latest",
), patch.object(
launch.container_mod, "image_exists", return_value=False,
), patch.object(
launch.container_mod, "build_image", side_effect=fake_build,
):
updated = launch._build_images(plan)
self.assertEqual("bot-bottle-agent:latest", updated.image)
self.assertEqual(2, len(calls))
self.assertEqual("bot-bottle-agent:latest", calls[1][0])
if __name__ == "__main__":
unittest.main()
@@ -1,159 +0,0 @@
"""Unit: macos-container pty_forward raw-mode wrapper (issue #245).
Tests argument parsing, non-TTY fallback, and the raw-mode
setup/restore sequence without requiring a real terminal.
"""
from __future__ import annotations
import io
import termios
import unittest
from unittest.mock import ANY, MagicMock, patch
from bot_bottle.backend.macos_container import pty_forward
def _fake_stdin(fd: int = 0) -> MagicMock:
"""Return a mock stdin whose fileno() returns *fd*."""
m = MagicMock()
m.fileno.return_value = fd
return m
class TestArgvParsing(unittest.TestCase):
def test_missing_separator_returns_error_exit_code(self):
with patch.object(pty_forward.sys, "stderr", new=io.StringIO()) as err:
rc = pty_forward.main(["container", "exec"])
self.assertEqual(2, rc)
self.assertIn("usage:", err.getvalue())
def test_too_few_args_returns_error_exit_code(self):
with patch.object(pty_forward.sys, "stderr", new=io.StringIO()):
self.assertEqual(2, pty_forward.main([]))
self.assertEqual(2, pty_forward.main(["--"]))
def test_separator_at_start_with_inner_is_valid(self):
with (
patch.object(pty_forward.sys, "stdin", _fake_stdin()),
patch.object(pty_forward.os, "isatty", return_value=False),
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 0
rc = pty_forward.main(["--", "container", "exec"])
self.assertEqual(0, rc)
run.assert_called_once()
self.assertEqual(["container", "exec"], run.call_args.args[0])
self.assertFalse(run.call_args.kwargs["check"])
class TestNonTtyFallback(unittest.TestCase):
def test_non_tty_stdin_runs_inner_directly(self):
with (
patch.object(pty_forward.sys, "stdin", _fake_stdin()),
patch.object(pty_forward.os, "isatty", return_value=False),
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 42
rc = pty_forward.main(
["--", "container", "exec", "--interactive", "--tty", "c", "claude"]
)
self.assertEqual(42, rc)
run.assert_called_once()
self.assertEqual(
["container", "exec", "--interactive", "--tty", "c", "claude"],
run.call_args.args[0],
)
self.assertFalse(run.call_args.kwargs["check"])
def test_fileno_error_runs_inner_directly(self):
bad_stdin = MagicMock()
bad_stdin.fileno.side_effect = OSError("pseudofile")
with (
patch.object(pty_forward.sys, "stdin", bad_stdin),
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 0
rc = pty_forward.main(["--", "container", "exec"])
run.assert_called_once()
self.assertEqual(["container", "exec"], run.call_args.args[0])
self.assertFalse(run.call_args.kwargs["check"])
self.assertEqual(0, rc)
class TestRawModeSetupAndRestore(unittest.TestCase):
def test_tty_stdin_sets_raw_mode_and_restores_on_exit(self):
saved_attrs = object()
with (
patch.object(pty_forward.sys, "stdin", _fake_stdin()),
patch.object(pty_forward.os, "isatty", return_value=True),
patch.object(pty_forward.termios, "tcgetattr", return_value=saved_attrs),
patch.object(pty_forward.tty, "setraw") as setraw,
patch.object(pty_forward.termios, "tcsetattr") as tcsetattr,
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 0
rc = pty_forward.main(["--", "container", "exec"])
self.assertEqual(0, rc)
setraw.assert_called_once()
tcsetattr.assert_called_once_with(
ANY, termios.TCSADRAIN, saved_attrs,
)
def test_tty_restores_on_subprocess_nonzero_exit(self):
saved_attrs = object()
with (
patch.object(pty_forward.sys, "stdin", _fake_stdin()),
patch.object(pty_forward.os, "isatty", return_value=True),
patch.object(pty_forward.termios, "tcgetattr", return_value=saved_attrs),
patch.object(pty_forward.tty, "setraw"),
patch.object(pty_forward.termios, "tcsetattr") as tcsetattr,
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 1
rc = pty_forward.main(["--", "container", "exec"])
self.assertEqual(1, rc)
tcsetattr.assert_called_once_with(
ANY, termios.TCSADRAIN, saved_attrs,
)
def test_tcgetattr_error_falls_back_to_bare_run(self):
with (
patch.object(pty_forward.sys, "stdin", _fake_stdin()),
patch.object(pty_forward.os, "isatty", return_value=True),
patch.object(
pty_forward.termios, "tcgetattr",
side_effect=termios.error("not a tty"),
),
patch.object(pty_forward.tty, "setraw") as setraw,
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 0
rc = pty_forward.main(["--", "container", "exec"])
setraw.assert_not_called()
run.assert_called_once()
self.assertEqual(["container", "exec"], run.call_args.args[0])
self.assertFalse(run.call_args.kwargs["check"])
self.assertEqual(0, rc)
def test_inner_run_sets_term_default_without_mutating_process_env(self):
with (
patch.dict(pty_forward.os.environ, {}, clear=True),
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 0
rc = pty_forward._run_inner(["container", "exec"])
self.assertNotIn("TERM", pty_forward.os.environ)
self.assertEqual(0, rc)
child_env = run.call_args.kwargs["env"]
self.assertEqual(["TERM"], sorted(child_env.keys()))
self.assertEqual("xterm-256color", child_env["TERM"])
if __name__ == "__main__":
unittest.main()
-74
View File
@@ -73,80 +73,6 @@ resolver #2
)
self.assertTrue(run.call_args_list[-1].kwargs["check"])
def test_build_image_anchors_relative_dockerfile_to_context(self):
status = util.subprocess.CompletedProcess(
args=[],
returncode=0,
stdout=(
'[{"status":{"state":"running"},'
'"configuration":{"dns":{"nameservers":["9.9.9.9"]}}}]'
),
stderr="",
)
with patch.object(util.subprocess, "run", return_value=status) as run, \
patch.object(util.os, "environ", {
"BOT_BOTTLE_MACOS_CONTAINER_DNS": "9.9.9.9",
}):
util.build_image(
"bot-bottle-sidecars:latest",
"/repo",
dockerfile="Dockerfile.sidecars",
)
self.assertEqual(
[
"container", "build", "-t", "bot-bottle-sidecars:latest",
"--dns", "9.9.9.9", "-f", "/repo/Dockerfile.sidecars", "/repo",
],
run.call_args_list[-1].args[0],
)
def test_commit_container_execs_tar_and_builds_image(self):
# stderr is bytes because subprocess.run uses stderr=PIPE without text=True
completed = util.subprocess.CompletedProcess(
args=[], returncode=0, stdout=b"", stderr=b"",
)
dockerfile_text = ""
def fake_build_image(image_tag: str, context: str, *, dockerfile: str = "") -> None:
nonlocal dockerfile_text
with open(dockerfile, encoding="utf-8") as f:
dockerfile_text = f.read()
with patch.object(util.subprocess, "run", return_value=completed) as run, \
patch.object(util, "build_image", side_effect=fake_build_image) as build_image, \
patch.object(util, "info"):
util.commit_container(
"bot-bottle-dev-abc12",
"bot-bottle-committed-dev-abc12:latest",
)
argv = run.call_args.args[0]
self.assertEqual("container", argv[0])
self.assertEqual("exec", argv[1])
self.assertIn("bot-bottle-dev-abc12", argv)
self.assertIn("tar", argv)
self.assertIn("--directory=/", argv)
build_image.assert_called_once()
self.assertEqual(
"bot-bottle-committed-dev-abc12:latest",
build_image.call_args.args[0],
)
self.assertIn("ADD rootfs.tar /\n", dockerfile_text)
self.assertIn("USER node\n", dockerfile_text)
self.assertIn("WORKDIR /home/node\n", dockerfile_text)
def test_commit_container_dies_on_exec_tar_failure(self):
failed = util.subprocess.CompletedProcess(
args=[], returncode=1, stdout=b"", stderr=b"No such container",
)
with patch.object(util.subprocess, "run", return_value=failed), \
patch.object(util, "die", side_effect=SystemExit("die")) as die:
with self.assertRaises(SystemExit):
util.commit_container("missing-container", "some:tag")
die.assert_called_once()
self.assertIn("missing-container", die.call_args.args[0])
def test_build_image_restarts_builder_when_dns_mismatches(self):
status = util.subprocess.CompletedProcess(
args=[],
+1 -46
View File
@@ -167,40 +167,13 @@ class TestAgentProviderHostCredentials(unittest.TestCase):
},
})
def test_startup_args_allowed_for_claude(self):
b = _provider_config_bottle({
"template": "claude",
"settings": {"startup_args": ["--model", "opus"]},
})
self.assertEqual(
{"startup_args": ["--model", "opus"]},
b.agent_provider.settings,
)
def test_startup_args_allowed_for_codex(self):
b = _provider_config_bottle({
"template": "codex",
"settings": {"startup_args": ["--model", "gpt-5-codex"]},
})
self.assertEqual(
{"startup_args": ["--model", "gpt-5-codex"]},
b.agent_provider.settings,
)
def test_provider_specific_settings_still_rejected_for_claude(self):
def test_settings_rejected_for_claude(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "claude",
"settings": {"models": ["qwen2.5-coder:7b"]},
})
def test_startup_args_must_be_string_array(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
"template": "codex",
"settings": {"startup_args": ["--model", 42]},
})
def test_settings_models_must_be_non_empty_string_array(self):
with self.assertRaises(ManifestError):
_provider_config_bottle({
@@ -329,24 +302,6 @@ class TestDlp(unittest.TestCase):
"bogus": True,
}}])
def test_outbound_on_match_omitted_is_empty(self):
b = _bottle([{"host": "x.example"}])
self.assertEqual("", b.egress.routes[0].OutboundOnMatch)
def test_outbound_on_match_accepts_policies(self):
for policy in ("block", "redact", "supervise"):
with self.subTest(policy=policy):
b = _bottle([{"host": "x.example", "dlp": {
"outbound_on_match": policy,
}}])
self.assertEqual(policy, b.egress.routes[0].OutboundOnMatch)
def test_outbound_on_match_rejects_unknown_value(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "dlp": {
"outbound_on_match": "allow",
}}])
class TestGitPolicy(unittest.TestCase):
def test_omitted_means_https_git_fetch_disabled(self):
+1 -1
View File
@@ -130,7 +130,7 @@ def _capture_print(plan: DockerBottlePlan | SmolmachinesBottlePlan) -> list[str]
orig = sys.stderr
sys.stderr = buf
try:
plan.print()
plan.print(remote_control=False)
finally:
sys.stderr = orig
return buf.getvalue().splitlines()
@@ -16,8 +16,6 @@ from __future__ import annotations
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from typing import Any, cast
from unittest.mock import patch
from bot_bottle.backend.smolmachines import launch as _launch_mod
@@ -143,46 +141,5 @@ class TestEnsureSmolmachine(unittest.TestCase):
self.assertTrue(str(pack_args[1]).endswith(f"{digest}.smolmachine"))
class TestAgentFromPath(unittest.TestCase):
def _plan(self) -> Any:
return cast(Any, SimpleNamespace(
slug="dev-abc12",
agent_image="bot-bottle-claude:latest",
agent_dockerfile_path="/repo/Dockerfile",
))
def test_uses_committed_artifact_when_present(self):
with tempfile.TemporaryDirectory(prefix="committed-smolmachine.") as tmp:
artifact = Path(tmp) / "committed-smolmachine.smolmachine"
artifact.write_text("")
with patch.object(
_launch_mod, "read_committed_image", return_value=str(artifact),
), patch.object(
_launch_mod, "_ensure_smolmachine",
) as ensure, patch.object(
_launch_mod, "info",
):
result = _launch_mod._agent_from_path(self._plan())
self.assertEqual(artifact, result)
ensure.assert_not_called()
def test_falls_back_when_committed_artifact_missing(self):
packed = Path("/cache/agent.smolmachine")
with patch.object(
_launch_mod, "read_committed_image",
return_value="/missing/committed.smolmachine",
), patch.object(
_launch_mod, "_ensure_smolmachine", return_value=packed,
) as ensure:
result = _launch_mod._agent_from_path(self._plan())
self.assertEqual(packed, result)
ensure.assert_called_once_with(
"bot-bottle-claude:latest",
dockerfile="/repo/Dockerfile",
)
if __name__ == "__main__":
unittest.main()
+4 -29
View File
@@ -26,7 +26,9 @@ from bot_bottle.backend.smolmachines.bottle import SmolmachinesBottle
from bot_bottle.backend.smolmachines.bottle_plan import (
SmolmachinesBottlePlan,
)
from bot_bottle.backend.smolmachines import launch as _launch
# from bot_bottle.backend.smolmachines.provision import (
# workspace as _workspace,
# )
from bot_bottle.backend.smolmachines.launch import _bundle_launch_spec
from bot_bottle.backend.util import AGENT_CA_PATH
from bot_bottle.egress import EgressPlan, EgressRoute
@@ -42,6 +44,7 @@ class _Provider(AgentProvider):
return AgentProviderRuntime(
template="test", command="test", image="",
prompt_mode="append_file", bypass_args=(), resume_args=(),
remote_control_args=(),
)
def provision_plan(self, **kwargs): # type: ignore[override]
raise NotImplementedError
@@ -83,7 +86,6 @@ def _plan(
stage_dir: Path | None = None,
egress_routes: tuple[EgressRoute, ...] = (),
egress_ca_path: Path = Path(),
canary: bool = False,
supervise: bool = False,
bundle_ip: str = "192.168.50.2",
agent_git_gate_host: str = "127.0.0.1:55555",
@@ -154,8 +156,6 @@ def _plan(
routes=egress_routes,
token_env_map={},
mitmproxy_ca_cert_only_host_path=egress_ca_path,
canary="fake-canary-value" if canary else "",
canary_env="CANON_ALPHA_SECRET" if canary else "",
),
supervise_plan=supervise_plan,
agent_git_gate_host=agent_git_gate_host,
@@ -411,31 +411,6 @@ class TestBundleLaunchSpec(unittest.TestCase):
self.assertIn(9420, spec.ports_to_publish)
self.assertNotIn(9418, spec.ports_to_publish)
def test_canary_env_registered_as_sensitive_in_bundle(self):
plan = _plan(canary=True)
spec = _bundle_launch_spec(plan, "net", "127.0.0.16")
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", spec.environment)
self.assertIn(
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
spec.environment,
)
def test_canary_env_visible_to_smolvm_guest(self):
plan = _plan(canary=True)
with patch.object(
_launch._bundle,
"bundle_host_port",
return_value="65000",
):
stamped = _launch._discover_urls(plan, "127.0.0.16")
self.assertEqual(
"fake-canary-value",
stamped.guest_env["CANON_ALPHA_SECRET"],
)
class TestProvisionGitUser(unittest.TestCase):
"""`provision_git` runs `git config --global` inside the
-20
View File
@@ -24,7 +24,6 @@ from bot_bottle.backend.smolmachines.smolvm import (
machine_start,
machine_stop,
pack_create,
pack_create_from_vm,
wait_exec_ready,
)
@@ -64,17 +63,6 @@ class TestArgvShapes(unittest.TestCase):
argv,
)
def test_pack_create_from_vm_argv(self):
with self._patch_run() as m:
pack_create_from_vm("bot-bottle-dev-abc12", Path("/tmp/committed"))
argv = m.call_args.args[0]
self.assertEqual(
["smolvm", "pack", "create",
"--from-vm", "bot-bottle-dev-abc12",
"-o", "/tmp/committed"],
argv,
)
def test_machine_create_minimal(self):
with self._patch_run() as m:
machine_create("agent-xyz")
@@ -205,14 +193,6 @@ class TestErrorPath(unittest.TestCase):
with self.assertRaises(SmolvmError):
pack_create("missing:tag", Path("/tmp/out"))
def test_pack_create_from_vm_failure_raises(self):
with patch(
"bot_bottle.backend.smolmachines.smolvm.subprocess.run",
return_value=_fail("pack failed"),
):
with self.assertRaises(SmolvmError):
pack_create_from_vm("bot-bottle-dev-abc12", Path("/tmp/out"))
def test_exec_failure_returns_result(self):
# The in-VM command's exit code is what Bottle.exec sees;
# `false` exiting non-zero is not a smolvm failure.
+3 -23
View File
@@ -17,7 +17,6 @@ from bot_bottle.supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_GITLEAKS_ALLOW,
archive_proposal,
audit_log_path,
list_pending_proposals,
@@ -318,34 +317,15 @@ class TestToolConstants(unittest.TestCase):
def test_tools_tuple_matches_individual_constants(self):
self.assertEqual(
(
supervise.TOOL_EGRESS_ALLOW,
TOOL_CAPABILITY_BLOCK,
supervise.TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
supervise.TOOL_EGRESS_TOKEN_ALLOW,
supervise.TOOL_LIST_EGRESS_ROUTES,
),
supervise.TOOLS,
)
def test_token_allow_proposal_roundtrips(self):
p = Proposal.new(
bottle_slug="dev",
tool=supervise.TOOL_EGRESS_TOKEN_ALLOW,
proposed_file="host: api.example.com\n",
justification="false positive",
current_file_hash="h",
)
self.assertEqual(p, Proposal.from_dict(p.to_dict()))
def test_component_map_has_egress_entries(self):
self.assertEqual(
{
supervise.TOOL_EGRESS_ALLOW: "egress",
supervise.TOOL_EGRESS_BLOCK: "egress",
},
supervise.COMPONENT_FOR_TOOL,
)
def test_component_map_has_no_entries(self):
# egress-block removed in issue #198; capability-block never had one.
self.assertEqual({}, supervise.COMPONENT_FOR_TOOL)
class _StubSupervise(supervise.Supervise):
+3 -80
View File
@@ -2,6 +2,9 @@
The curses TUI itself isn't exercised here — these tests cover the
discovery + approve/reject paths that the TUI's key handlers call into.
egress-block (add_route) was removed in issue #198; the TestEgressApplyWiring
class and all stubs for add_route have been dropped accordingly.
"""
import os
@@ -9,7 +12,6 @@ import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import patch
from bot_bottle import supervise
from bot_bottle.cli import supervise as supervise_cli
@@ -19,8 +21,6 @@ from bot_bottle.supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_GITLEAKS_ALLOW,
TOOL_EGRESS_TOKEN_ALLOW,
read_audit_entries,
read_response,
sha256_hex,
@@ -33,10 +33,6 @@ FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
payloads = {
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
supervise.TOOL_EGRESS_ALLOW: "routes:\n - host: example.com\n",
supervise.TOOL_EGRESS_BLOCK: "routes:\n - host: example.com\n",
TOOL_GITLEAKS_ALLOW: "file: tests/test_fixture.py\nline: 3\n",
TOOL_EGRESS_TOKEN_ALLOW: "host: api.example.com\ndetector: token\n",
}
payload = payloads.get(tool, "")
return Proposal.new(
@@ -158,79 +154,6 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
supervise_cli.approve(qp)
self.assertEqual([], read_audit_entries("egress", "dev"))
def test_approve_egress_block_writes_audit_log(self):
qp = self._enqueue(tool=supervise.TOOL_EGRESS_BLOCK)
with patch(
"bot_bottle.cli.supervise.apply_routes_change",
return_value=("routes: []\n", "routes:\n - host: example.com\n"),
) as apply_routes_change:
supervise_cli.approve(qp)
apply_routes_change.assert_called_once_with(
"dev",
"routes:\n - host: example.com\n",
)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertEqual(STATUS_APPROVED, entries[0].operator_action)
self.assertEqual("needed for dev", entries[0].justification)
def test_approve_gitleaks_allow_leaves_response_for_gate(self):
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
supervise_cli.approve(qp, notes="dummy fixture")
# Gate polls the queue dir for the response; TUI must not archive it.
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status)
self.assertEqual("dummy fixture", resp.notes)
self.assertFalse((qp.queue_dir / "processed").exists())
def test_tui_gitleaks_allow_requires_reason(self):
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
with patch.object(supervise_cli, "_prompt", return_value=""):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertEqual("approve aborted (empty reason)", status)
self.assertFalse((qp.queue_dir / "processed").exists())
def test_tui_gitleaks_allow_writes_reason(self):
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
with patch.object(supervise_cli, "_prompt", return_value="test fixture"):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertIn("approved gitleaks-allow", status)
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual("test fixture", resp.notes)
def test_approve_token_allow_leaves_response_for_egress(self):
qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW)
supervise_cli.approve(qp, notes="false positive")
# The egress addon polls the queue dir for the response; the TUI must
# not archive it (the addon archives after reading).
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status)
self.assertEqual("false positive", resp.notes)
self.assertFalse((qp.queue_dir / "processed").exists())
def test_token_allow_writes_no_audit_log(self):
qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW)
supervise_cli.approve(qp, notes="false positive")
self.assertEqual([], read_audit_entries("egress", "dev"))
def test_tui_token_allow_requires_reason(self):
qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW)
with patch.object(supervise_cli, "_prompt", return_value=""):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertEqual("approve aborted (empty reason)", status)
self.assertFalse((qp.queue_dir / "processed").exists())
def test_tui_token_allow_writes_reason(self):
qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW)
with patch.object(supervise_cli, "_prompt", return_value="legit"):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertIn("approved egress-token-allow", status)
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual("legit", resp.notes)
def test_suffix_for_token_allow_is_txt(self):
self.assertEqual(".txt", supervise_cli._suffix_for_tool(TOOL_EGRESS_TOKEN_ALLOW))
# class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
# # DISABLED — capability_apply functionality is currently commented out.
+5 -54
View File
@@ -54,28 +54,13 @@ class TestValidation(unittest.TestCase):
)
def test_empty_proposed_file_rejected_for_tools_with_file_field(self):
# egress-block has structured input (validated in
# _validate_and_bundle_egress_route, not here) and
# list-egress-routes takes no input. Only capability-block
# goes through `validate_proposed_file`.
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_CAPABILITY_BLOCK, " \n\t")
def test_egress_routes_yaml_is_validated(self):
validate_proposed_file(
_sv.TOOL_EGRESS_ALLOW,
"routes:\n - host: example.com\n",
)
def test_invalid_egress_routes_yaml_rejected(self):
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_EGRESS_BLOCK, "routes: nope\n")
def test_egress_routes_yaml_rejects_log_full(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file(
_sv.TOOL_EGRESS_ALLOW,
"log: 2\nroutes:\n - host: example.com\n",
)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("must not change egress logging", cm.exception.message)
# --- JSON-RPC parsing ------------------------------------------------------
@@ -156,9 +141,7 @@ class TestHandleToolsList(unittest.TestCase):
names = [t["name"] for t in result["tools"]] # type: ignore[index]
self.assertEqual(
sorted([
_sv.TOOL_EGRESS_ALLOW,
_sv.TOOL_CAPABILITY_BLOCK,
_sv.TOOL_EGRESS_BLOCK,
_sv.TOOL_LIST_EGRESS_ROUTES,
]),
sorted(names),
@@ -189,17 +172,6 @@ class TestHandleToolsList(unittest.TestCase):
# No `required` array because no inputs are required.
self.assertNotIn("required", schema) # type: ignore[operator]
def test_egress_tools_take_routes_yaml_and_justification(self):
for tool_name in (_sv.TOOL_EGRESS_ALLOW, _sv.TOOL_EGRESS_BLOCK):
with self.subTest(tool_name=tool_name):
tool = next(t for t in TOOL_DEFINITIONS if t["name"] == tool_name)
schema = tool["inputSchema"]
self.assertEqual("object", schema["type"]) # type: ignore[index]
self.assertEqual(
["routes_yaml", "justification"],
schema["required"], # type: ignore[index]
)
class TestHandleToolsCall(unittest.TestCase):
def setUp(self):
@@ -248,26 +220,6 @@ class TestHandleToolsCall(unittest.TestCase):
self.assertIn("status: approved", text)
self.assertIn("notes: lgtm", text)
def test_allow_round_trips_through_queue(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_APPROVED, notes="ok")
try:
result = handle_tools_call(
{
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "need example.com",
},
},
self.config,
)
finally:
responder.join()
self.assertFalse(result["isError"]) # type: ignore[index]
text = result["content"][0]["text"] # type: ignore[index]
self.assertIn("status: approved", text)
self.assertIn("notes: ok", text)
def test_rejected_response_sets_isError(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_REJECTED, notes="nope")
try:
@@ -460,8 +412,7 @@ class TestHttpEndToEnd(unittest.TestCase):
self.assertEqual(1, result["id"])
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
self.assertIn(_sv.TOOL_EGRESS_ALLOW, names)
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
self.assertNotIn("egress-block", names)
def test_unknown_method_returns_jsonrpc_error(self):
result = self._post_jsonrpc(