Compare commits

..

8 Commits

Author SHA1 Message Date
didericis-claude cadaa5dc25 refactor(types): move loaded manifest from BottleSpec to BottlePlan
lint / lint (push) Successful in 1m44s
test / unit (pull_request) Successful in 34s
test / integration (pull_request) Successful in 19s
BottleSpec.manifest was ManifestIndex | Manifest — a union encoding
two lifecycle stages in one field. The union was unjustifiable:
it forced a type-narrowing workaround (loaded_manifest property)
on every consumer.

Clean split:
- BottleSpec.manifest: ManifestIndex (always; CLI-supplied intent)
- BottlePlan.manifest: Manifest (always; loaded by _validate())

_validate() returns the loaded Manifest directly. prepare() passes
it to _resolve_plan(), which stores it on the plan. All provisioner
code now reads plan.manifest.agent / plan.manifest.bottle — no
union, no asserts, no type: ignore.
2026-06-23 02:22:10 +00:00
didericis-claude 910b601a5e fix(types): add BottleSpec.loaded_manifest to satisfy pyright on union type
lint / lint (push) Successful in 1m43s
test / unit (pull_request) Successful in 36s
test / integration (pull_request) Successful in 18s
BottleSpec.manifest is ManifestIndex | Manifest (pre/post _validate()).
Downstream code always runs post-validate so it needs Manifest, but
pyright flagged every .agent/.bottle access. The new loaded_manifest
property asserts isinstance and returns Manifest, giving pyright a
narrowed type without scattering type: ignore everywhere.

Also remove unused Manifest imports from test files and annotate the
_index() helper in test_manifest_agent_git_user.
2026-06-23 02:08:27 +00:00
didericis-claude fa0a5fbb9c refactor(manifest): split Manifest into ManifestIndex + Manifest single-value type
lint / lint (push) Failing after 1m42s
test / unit (pull_request) Successful in 35s
test / integration (pull_request) Successful in 18s
Manifest now holds exactly one agent and one effective bottle (with
git_user overlay already applied). The old multi-agent/bottle
collection is renamed ManifestIndex. BottleSpec.manifest starts as
ManifestIndex from the CLI and becomes Manifest after _validate()
calls load_for_agent(); all provisioning code downstream reads
spec.manifest.agent / spec.manifest.bottle instead of indexing by name.
2026-06-23 00:56:30 +00:00
didericis-claude cd93fc71f7 docs: clarify load_for_agent invariant in docstring
lint / lint (push) Successful in 1m47s
test / unit (pull_request) Successful in 31s
test / integration (pull_request) Successful in 17s
2026-06-22 19:54:41 +00:00
didericis-claude 3e94472e78 fix: load_for_agent always returns single-agent manifest
lint / lint (push) Successful in 1m42s
test / unit (pull_request) Successful in 32s
test / integration (pull_request) Successful in 19s
Filter to exactly one agent and one bottle in both the lazy (md-dirs)
and eager (from_json_obj) paths so the returned manifest invariant
holds regardless of how the manifest was constructed.
2026-06-22 19:54:21 +00:00
didericis-claude 0609813ba0 refactor: scan filenames at resolve, parse only selected agent at preflight
lint / lint (push) Successful in 1m37s
test / unit (pull_request) Successful in 32s
test / integration (pull_request) Successful in 17s
Manifest.resolve() now returns an empty-dict manifest with only directory
paths recorded (home_md, cwd_md). No content is read from any .md file
until load_for_agent() is called for a specific agent at preflight.

- Manifest.from_md_dirs: scan-only, no frontmatter parsing
- Manifest.load_for_agent: parses the selected agent file and its bottle
  chain; works on eager (from_json_obj) manifests too by returning self
- Manifest.all_agent_names: scans filenames in lazy mode
- backend._validate: calls load_for_agent and propagates upgraded spec
- cli/info.py, cli/list.py, cli/start.py: use load_for_agent / all_agent_names
- manifest_extends.py: reverted to original (no partial-resolve helpers)
- manifest_loader.py: only scan_agent_names + load_bottle_chain_from_dir
- Tests updated to call load_for_agent before accessing agents/bottles;
  test_md_agent_repos_deferred renamed to test_md_agent_repos_fails_at_preflight
2026-06-22 19:42:49 +00:00
didericis-claude 09e04359e3 fix: resolve pyright reportUnusedImport in manifest_extends
Import ManifestError at module level from manifest_util (no circular
dep) and remove the redundant local imports from function bodies that
were shadowing it. ManifestBottle retains its local import pattern to
avoid the circular manifest ↔ manifest_extends dependency.
2026-06-22 19:42:49 +00:00
didericis-claude fe32f65fa4 feat: defer broken manifest parse errors to preflight
Broken bottle/agent files no longer block the agent selector or prevent
unrelated agents from loading. Per-file parse errors are collected in
`Manifest.broken_agents`; the CLI selector includes them via
`all_agent_names`, and the error surfaces only when the specific agent
is selected and launch is attempted (in `require_agent`/`bottle_for`).

Closes #236
2026-06-22 19:42:49 +00:00
48 changed files with 134 additions and 2411 deletions
-8
View File
@@ -526,11 +526,6 @@ from .docker import DockerBottleBackend # noqa: E402 # pylint: disable=wrong-i
from .macos_container import MacosContainerBottleBackend # noqa: E402 # pylint: disable=wrong-import-position
from .smolmachines import SmolmachinesBottleBackend # noqa: E402 # pylint: disable=wrong-import-position
# Freezer is imported after the backend classes for the same reason:
# Freezer.commit_slug constructs ActiveAgent, which must be fully
# defined first.
from .freeze import CommitCancelled, Freezer, get_freezer # noqa: E402 # pylint: disable=wrong-import-position
# The dict is heterogeneous: each value is a BottleBackend specialized
# over its own plan type. Concrete plan types are erased here because
@@ -618,12 +613,9 @@ __all__ = [
"BottleCleanupPlan",
"BottlePlan",
"BottleSpec",
"CommitCancelled",
"ExecResult",
"Freezer",
"enumerate_active_agents",
"get_bottle_backend",
"get_freezer",
"has_backend",
"known_backend_names",
]
+1 -1
View File
@@ -134,7 +134,7 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
ep = plan.egress_plan
volumes.append(_bind(ep.mitmproxy_ca_host_path, EGRESS_CA_IN_CONTAINER))
if ep.routes:
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
volumes.append(_bind(ep.routes_path, EGRESS_ROUTES_IN_CONTAINER))
for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
+18 -29
View File
@@ -1,21 +1,24 @@
"""Host-side helper for egress sidecar inspection and live updates.
"""Host-side helper for egress sidecar inspection (issue #198).
The approve path uses this module to validate a proposed routes file,
write it to the bottle's live egress state dir, and signal the sidecar
bundle so the mitmproxy addon reloads it.
`_merge_single_route`, `add_route`, and `apply_routes_change` were
removed when the egress-block MCP tool was dropped. The remaining
helpers support runtime inspection and validation of the routes file
without modifying it at runtime.
"""
from __future__ import annotations
import os
import subprocess
from ...egress import EGRESS_ROUTES_IN_CONTAINER
from ...log import warn
from ..egress_apply import EgressApplicator, EgressApplyError
from ...egress_addon_core import load_routes
from .sidecar_bundle import sidecar_bundle_container_name
class EgressApplyError(RuntimeError):
pass
def fetch_current_routes(slug: str) -> str:
container = sidecar_bundle_container_name(slug)
r = subprocess.run(
@@ -30,31 +33,17 @@ def fetch_current_routes(slug: str) -> str:
return r.stdout
class DockerEgressApplicator(EgressApplicator):
def _signal_bundle_reload(self, slug: str) -> None:
container = sidecar_bundle_container_name(slug)
result = subprocess.run(
["docker", "kill", "--signal", "HUP", container],
capture_output=True, text=True, check=False, env=os.environ,
)
if result.returncode != 0:
last_error = (result.stderr or "").strip() or (result.stdout or "").strip()
warn(
f"egress: routes updated on disk for {slug}, but bundle reload failed: "
f"{last_error or 'docker kill failed'}"
)
raise EgressApplyError(
f"could not reload egress bundle {container}: "
f"{last_error or 'docker kill failed'}"
)
applicator = DockerEgressApplicator()
def validate_routes_content(content: str) -> None:
try:
load_routes(content)
except ValueError as e:
raise EgressApplyError(
f"proposed routes.yaml is not valid: {e}"
) from e
__all__ = [
"DockerEgressApplicator",
"EgressApplyError",
"applicator",
"fetch_current_routes",
"validate_routes_content",
]
-23
View File
@@ -1,23 +0,0 @@
"""DockerFreezer — snapshot a Docker bottle via `docker commit`."""
from __future__ import annotations
from .. import ActiveAgent
from ..freeze import Freezer
from .util import commit_container
from ...log import info
class DockerFreezer(Freezer):
"""Freezes a Docker bottle by running `docker commit`."""
backend_name = "docker"
def _freeze(self, agent: ActiveAgent) -> str:
container = f"bot-bottle-{agent.slug}"
image_tag = f"bot-bottle-committed-{agent.slug}:latest"
commit_container(container, image_tag)
return image_tag
def _export_hint(self, slug: str, image_ref: str) -> None:
info(f"to export for migration: docker save {image_ref} -o {slug}.tar")
+7 -18
View File
@@ -47,7 +47,6 @@ from ...bottle_state import (
bottle_state_dir,
egress_state_dir,
git_gate_state_dir,
read_committed_image,
)
from .compose import (
bottle_plan_to_compose,
@@ -92,22 +91,12 @@ def launch(
)
try:
# Step 1: agent image. Use a committed snapshot when one exists
# and is present in the local daemon; otherwise build from the
# Dockerfile. Sidecar images get built lazily by `docker compose
# up` via the renderer's `build:` directives.
committed = read_committed_image(plan.slug)
if committed and docker_mod.image_exists(committed):
info(f"using committed image {committed!r}")
plan = dataclasses.replace(
plan,
agent_provision=dataclasses.replace(plan.agent_provision, image=committed),
)
else:
docker_mod.build_image(
plan.image, _REPO_DIR,
dockerfile=plan.dockerfile_path,
)
# Step 1: agent image build. Sidecar images get built lazily by
# `docker compose up` via the renderer's `build:` directives.
docker_mod.build_image(
plan.image, _REPO_DIR,
dockerfile=plan.dockerfile_path,
)
internal_network = network_mod.network_name_for_slug(plan.slug)
egress_network = network_mod.network_egress_name_for_slug(plan.slug)
@@ -187,7 +176,7 @@ def launch(
agent_command=plan.agent_command,
agent_prompt_mode=plan.agent_prompt_mode,
agent_provider_template=plan.agent_provider_template,
terminal_title=f"{plan.spec.label} ({plan.spec.agent_name})" if plan.spec.label else plan.spec.agent_name,
terminal_title=plan.spec.label or plan.spec.agent_name,
terminal_color=plan.spec.color,
agent_workdir=plan.workspace_plan.workdir,
)
-15
View File
@@ -152,21 +152,6 @@ def build_image(ref: str, context: str, *, dockerfile: str = "") -> None:
# )
def commit_container(container_name: str, image_tag: str) -> None:
"""Run `docker commit <container_name> <image_tag>` to snapshot the
running container's filesystem state as a local Docker image."""
result = subprocess.run(
["docker", "commit", container_name, image_tag],
capture_output=True, text=True, check=False,
)
if result.returncode != 0:
die(
f"docker commit {container_name!r}{image_tag!r} failed: "
f"{(result.stderr or '').strip() or '<no stderr>'}"
)
info(f"committed {container_name!r}{image_tag!r}")
def image_id(ref: str) -> str:
"""Return the content-addressed image ID (e.g.
`sha256:abcd...`) for `ref`. The smolmachines backend keys its
-50
View File
@@ -1,50 +0,0 @@
"""Shared base class for host-side egress apply across backends.
Each backend subclasses EgressApplicator and overrides _signal_bundle_reload
with the backend-specific kill command.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
from ..bottle_state import egress_state_dir
from ..egress import EGRESS_ROUTES_FILENAME
from ..egress_addon_core import load_routes
class EgressApplyError(RuntimeError):
pass
class EgressApplicator(ABC):
def apply_routes_change(self, slug: str, content: str) -> tuple[str, str]:
"""Persist `content` to the live routes file and reload egress."""
self.validate_routes_content(content)
routes_path = self._routes_path(slug)
routes_path.parent.mkdir(parents=True, exist_ok=True)
before = routes_path.read_text(encoding="utf-8") if routes_path.exists() else ""
routes_path.write_text(content, encoding="utf-8")
routes_path.chmod(0o600)
self._signal_bundle_reload(slug)
return before, content
@staticmethod
def validate_routes_content(content: str) -> None:
try:
load_routes(content)
except ValueError as e:
raise EgressApplyError(
f"proposed routes.yaml is not valid: {e}"
) from e
@staticmethod
def _routes_path(slug: str) -> Path:
return egress_state_dir(slug) / EGRESS_ROUTES_FILENAME
@abstractmethod
def _signal_bundle_reload(self, slug: str) -> None: ...
__all__ = ["EgressApplicator", "EgressApplyError"]
-100
View File
@@ -1,100 +0,0 @@
"""Freezer — snapshot a running bottle to a resumable artifact.
Follows the same pattern as BottleBackend: a shared base class with
common post-freeze steps (write committed-image path, mark preserved,
print resume hint) and backend-specific subclasses in their respective
backend directories.
Entry points:
Freezer.commit(agent) — freeze by ActiveAgent
Freezer.commit_slug(slug) — convenience wrapper for cmd_commit
get_freezer(backend_name) — factory
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from . import ActiveAgent
from ..bottle_state import mark_preserved, write_committed_image
from ..log import die, info
class CommitCancelled(Exception):
"""Raised by Freezer._freeze when the user declines a confirmation prompt."""
class Freezer(ABC):
"""Freezes a running bottle to a resumable artifact.
The base class owns the shared post-commit steps:
- write_committed_image — records the artifact path in per-bottle state
- mark_preserved — prevents teardown from removing the state dir
- resume hint — printed to stderr after the snapshot
Subclasses implement _freeze with the backend-specific snapshot
operation and optionally override _export_hint for migration hints.
"""
backend_name: str
def commit(self, agent: ActiveAgent) -> None:
"""Freeze the bottle for `agent` to a resumable artifact.
Calls _freeze for the backend-specific snapshot, then writes the
committed image reference to per-bottle state and marks the bottle
preserved so the next `./cli.py resume` boots from the snapshot.
Raises CommitCancelled if the user declines an interactive
confirmation prompt (e.g. the macos-container stop prompt).
"""
image_ref = self._freeze(agent)
write_committed_image(agent.slug, image_ref)
mark_preserved(agent.slug)
info(f"to resume from this snapshot: ./cli.py resume {agent.slug}")
self._export_hint(agent.slug, image_ref)
@abstractmethod
def _freeze(self, agent: ActiveAgent) -> str:
"""Backend-specific snapshot. Returns the image tag or artifact path
stored by write_committed_image. Raises CommitCancelled if the user
declines a stop-confirmation prompt."""
def _export_hint(self, slug: str, image_ref: str) -> None:
"""Optionally print an export-for-migration hint after committing.
Overridden by backends that provide a meaningful export command."""
def commit_slug(self, slug: str) -> None:
"""Convenience entry for cmd_commit when only a slug is available."""
from ..bottle_state import read_metadata
metadata = read_metadata(slug)
agent = ActiveAgent(
backend_name=self.backend_name,
slug=slug,
agent_name=metadata.agent_name if metadata else "",
started_at=metadata.started_at if metadata else "",
services=(),
)
self.commit(agent)
def get_freezer(backend_name: str) -> Freezer:
"""Return the Freezer for the named backend.
backend_name "" is treated as "docker" for backward compatibility
with state dirs written before the backend field was added."""
resolved = backend_name or "docker"
if resolved == "docker":
from .docker.freezer import DockerFreezer
return DockerFreezer()
if resolved == "macos-container":
from .macos_container.freezer import MacosContainerFreezer
return MacosContainerFreezer()
if resolved == "smolmachines":
from .smolmachines.freezer import SmolmachinesFreezer
return SmolmachinesFreezer()
die(
f"commit is only supported for docker, macos-container, and "
f"smolmachines; backend {backend_name!r} has no freezer"
)
raise AssertionError("unreachable")
+5 -45
View File
@@ -2,41 +2,12 @@
from __future__ import annotations
import os
import subprocess
import sys
from typing import Callable, cast
from ...agent_provider import PromptMode, prompt_args
from .. import Bottle, ExecResult
from ..terminal import exec_shell_script
from . import pty_forward as _pty_forward
_PTY_FORWARD_SCRIPT = _pty_forward.__file__
_TERMINAL_ENV_NAMES = (
"TERM",
"COLORTERM",
"TERM_PROGRAM",
"TERM_PROGRAM_VERSION",
"KITTY_WINDOW_ID",
"KITTY_PID",
"WEZTERM_PANE",
"WEZTERM_UNIX_SOCKET",
"GHOSTTY_BIN_DIR",
"GHOSTTY_RESOURCES_DIR",
"ITERM_SESSION_ID",
"VTE_VERSION",
"KONSOLE_VERSION",
"ALACRITTY_WINDOW_ID",
)
def _terminal_env_names() -> tuple[str, ...]:
return tuple(
name for name in _TERMINAL_ENV_NAMES
if name == "TERM" or os.environ.get(name)
)
class MacosContainerBottle(Bottle):
@@ -73,24 +44,13 @@ class MacosContainerBottle(Bottle):
argv=full_argv,
)
)
container_exec = ["container", "exec"]
cmd = ["container", "exec"]
if tty:
container_exec.extend(["--interactive", "--tty"])
# Forward terminal capability hints so TUIs can enable modified-key
# protocols. Use bare env names: values stay in the child env, not
# on argv, and pty_forward supplies a TERM fallback when needed.
for name in _terminal_env_names():
container_exec.extend(["--env", name])
cmd.extend(["--interactive", "--tty"])
if self.agent_workdir and self.agent_workdir != "/home/node":
container_exec.extend(["--workdir", self.agent_workdir])
container_exec.extend([self.name, self.agent_command, *full_argv])
if tty:
# Wrap with the raw-mode forwarder: container exec does not put
# the host terminal into raw mode itself, so the line discipline
# buffers modifier-key sequences until CR. The wrapper sets raw
# mode before exec and restores it on exit.
return [sys.executable, _PTY_FORWARD_SCRIPT, "--", *container_exec]
return container_exec
cmd.extend(["--workdir", self.agent_workdir])
cmd.extend([self.name, self.agent_command, *full_argv])
return cmd
def exec_agent(self, argv: list[str], *, tty: bool = True) -> int:
agent_argv = self.agent_argv(argv, tty=tty)
@@ -1,39 +0,0 @@
"""Host-side egress apply for the macos-container backend.
Uses `container kill --signal HUP` (Apple Container framework) instead
of `docker kill` to signal the sidecar bundle.
"""
from __future__ import annotations
import os
import subprocess
from ...log import warn
from ..egress_apply import EgressApplicator, EgressApplyError
from .launch import sidecar_container_name
class MacOSContainerEgressApplicator(EgressApplicator):
def _signal_bundle_reload(self, slug: str) -> None:
container = sidecar_container_name(slug)
result = subprocess.run(
["container", "kill", "--signal", "HUP", container],
capture_output=True, text=True, check=False, env=os.environ,
)
if result.returncode != 0:
last_error = (result.stderr or "").strip() or (result.stdout or "").strip()
warn(
f"egress: routes updated on disk for {slug}, but bundle reload failed: "
f"{last_error or 'container kill failed'}"
)
raise EgressApplyError(
f"could not reload egress bundle {container}: "
f"{last_error or 'container kill failed'}"
)
applicator = MacOSContainerEgressApplicator()
__all__ = ["MacOSContainerEgressApplicator", "EgressApplyError", "applicator"]
@@ -1,31 +0,0 @@
"""MacosContainerFreezer — snapshot a macOS container bottle.
Apple Container removes containers when they stop, making stop-then-export
impossible. Instead, commit_container execs into the running container and
streams the root filesystem via tar. The bottle continues running after commit.
"""
from __future__ import annotations
from .. import ActiveAgent
from ..freeze import Freezer
from .util import commit_container
from ...log import info
class MacosContainerFreezer(Freezer):
"""Freezes a macOS-container bottle via exec-tar + image rebuild."""
backend_name = "macos-container"
def _freeze(self, agent: ActiveAgent) -> str:
container = f"bot-bottle-{agent.slug}"
image_tag = f"bot-bottle-committed-{agent.slug}:latest"
commit_container(container, image_tag)
return image_tag
def _export_hint(self, slug: str, image_ref: str) -> None:
info(
f"to export for migration: "
f"container image save {image_ref} -o {slug}.tar"
)
+18 -20
View File
@@ -12,16 +12,13 @@ from __future__ import annotations
import dataclasses
import os
import shutil
import subprocess
from contextlib import ExitStack, contextmanager
from pathlib import Path
from typing import Callable, Generator
from ...bottle_state import (
egress_state_dir,
git_gate_state_dir,
read_committed_image,
)
from ...bottle_state import egress_state_dir, git_gate_state_dir
from ...egress import EGRESS_ROUTES_IN_CONTAINER, egress_resolve_token_values
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import die, info, warn
@@ -87,7 +84,7 @@ def launch(
try:
plan = _mint_certs(plan)
plan = _build_images(plan)
_build_images(plan)
internal_network = internal_network_name(plan.slug)
egress_network = egress_network_name(plan.slug)
@@ -115,7 +112,7 @@ def launch(
agent_command=plan.agent_command,
agent_prompt_mode=plan.agent_prompt_mode,
agent_provider_template=plan.agent_provider_template,
terminal_title=f"{plan.spec.label} ({plan.spec.agent_name})" if plan.spec.label else plan.spec.agent_name,
terminal_title=plan.spec.label or plan.spec.agent_name,
terminal_color=plan.spec.color,
agent_workdir=plan.workspace_plan.workdir,
)
@@ -138,28 +135,17 @@ def _mint_certs(plan: MacosContainerBottlePlan) -> MacosContainerBottlePlan:
return dataclasses.replace(plan, egress_plan=egress_plan)
def _build_images(plan: MacosContainerBottlePlan) -> MacosContainerBottlePlan:
def _build_images(plan: MacosContainerBottlePlan) -> None:
container_mod.build_image(
SIDECAR_BUNDLE_IMAGE,
_REPO_DIR,
dockerfile=SIDECAR_BUNDLE_DOCKERFILE,
)
committed = read_committed_image(plan.slug)
if committed and container_mod.image_exists(committed):
info(f"using committed image {committed!r}")
return dataclasses.replace(
plan,
agent_provision=dataclasses.replace(
plan.agent_provision,
image=committed,
),
)
container_mod.build_image(
plan.image,
_REPO_DIR,
dockerfile=plan.dockerfile_path,
)
return plan
def _create_networks(
@@ -328,6 +314,7 @@ def _agent_run_argv(
"container", "run",
"--name", plan.container_name,
"--detach",
"--rm",
"--network", internal_network,
]
for entry in _agent_env_entries(plan, sidecar_ip):
@@ -377,7 +364,7 @@ def _sidecar_mounts(
))
if ep.routes:
mounts.append((
str(ep.routes_path.parent),
str(_stage_routes_dir(plan)),
str(Path(EGRESS_ROUTES_IN_CONTAINER).parent),
True,
))
@@ -388,6 +375,17 @@ def _sidecar_mounts(
return tuple(mounts)
def _stage_routes_dir(plan: MacosContainerBottlePlan) -> Path:
routes_dir = plan.stage_dir / "macos-container-egress"
routes_dir.mkdir(parents=True, exist_ok=True)
shutil.copyfile(
plan.egress_plan.routes_path,
routes_dir / Path(EGRESS_ROUTES_IN_CONTAINER).name,
)
return routes_dir
def _mount_spec(host_path: str, container_path: str, read_only: bool) -> str:
spec = f"type=bind,source={host_path},target={container_path}"
if read_only:
@@ -1,70 +0,0 @@
"""Host-side raw-mode wrapper for `container exec --interactive --tty`.
Apple's `container exec --interactive --tty` does not set the host terminal to
raw mode before starting its I/O relay. Without raw mode the kernel line
discipline buffers modifier-key escape sequences (e.g. Shift+Enter in
modifyOtherKeys mode produces \\x1b[13;2~) until a carriage-return arrives, so
they never reach Claude Code inside the container.
This module sets the host terminal to raw mode, spawns the inner argv (the
container exec command), and restores the original terminal attributes on
exit. When stdin is not a TTY (piped invocations, CI) it falls through to a
bare subprocess.run so callers do not need to special-case non-interactive
contexts.
Usage (the `--` separator is the API contract — everything after it is the
inner command):
python pty_forward.py -- container exec --interactive --tty <name> <cmd>
"""
from __future__ import annotations
import os
import subprocess
import sys
import termios
import tty
def _inner_env() -> dict[str, str]:
env = dict(os.environ)
env.setdefault("TERM", "xterm-256color")
return env
def _run_inner(inner: list[str]) -> int:
return subprocess.run(inner, check=False, env=_inner_env()).returncode
def main(argv: list[str]) -> int:
"""Entry point. ``argv`` shape: ``-- <inner-argv...>``."""
if len(argv) < 2 or argv[0] != "--":
sys.stderr.write(
"usage: python pty_forward.py -- <container-exec-argv...>\n"
)
return 2
inner = argv[1:]
try:
fd = sys.stdin.fileno()
except OSError:
return _run_inner(inner)
if not os.isatty(fd):
return _run_inner(inner)
try:
old = termios.tcgetattr(fd)
except termios.error:
return _run_inner(inner)
try:
tty.setraw(fd)
return _run_inner(inner)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old)
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
@@ -8,7 +8,6 @@ import ipaddress
import platform
import shutil
import subprocess
import tempfile
import time
from typing import Iterable
@@ -73,53 +72,6 @@ def build_image(ref: str, context: str, *, dockerfile: str = "") -> None:
subprocess.run(args, check=True)
def commit_container(container_name: str, image_tag: str) -> None:
"""Snapshot a running Apple Container as a local image.
`container export` requires a stopped container, but Apple Container
removes containers when they stop, making stop-then-export impossible.
Instead, exec into the running container as root and stream the root
filesystem out via tar, then build a new image from that archive.
The bottle continues running after commit.
"""
with tempfile.TemporaryDirectory(prefix="bot-bottle-container-commit.") as tmp:
rootfs_tar = os.path.join(tmp, "rootfs.tar")
dockerfile = os.path.join(tmp, "Dockerfile")
with open(rootfs_tar, "wb") as tar_out:
result = subprocess.run(
[
_CONTAINER, "exec",
"--user", "root",
container_name,
"tar", "--create",
"--exclude=./proc",
"--exclude=./sys",
"--exclude=./dev",
"--exclude=./run",
"--file=-",
"--directory=/",
".",
],
stdout=tar_out,
stderr=subprocess.PIPE,
check=False,
)
if result.returncode != 0:
die(
f"container exec tar {container_name!r} failed: "
f"{(result.stderr or b'').decode().strip() or '<no stderr>'}"
)
with open(dockerfile, "w", encoding="utf-8") as f:
f.write(
"FROM scratch\n"
"ADD rootfs.tar /\n"
"USER node\n"
"WORKDIR /home/node\n"
)
build_image(image_tag, tmp, dockerfile=dockerfile)
info(f"committed {container_name!r}{image_tag!r}")
def _ensure_builder_dns() -> None:
dns = dns_server()
status = _builder_status()
@@ -266,36 +218,6 @@ def container_exists(name: str) -> bool:
return name in {line.strip() for line in result.stdout.splitlines()}
def container_is_running(name: str) -> bool:
"""Return True if the named container is currently running.
`container list` without `--all` lists only running containers."""
result = subprocess.run(
[_CONTAINER, "list", "--quiet"],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
return False
return name in {line.strip() for line in result.stdout.splitlines()}
def stop_container(name: str) -> None:
"""Stop the named container without deleting it."""
result = subprocess.run(
[_CONTAINER, "stop", name],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
die(
f"container stop {name!r} failed: "
f"{(result.stderr or '').strip() or '<no stderr>'}"
)
def force_remove_container(name: str) -> None:
if container_exists(name):
subprocess.run(
+1 -6
View File
@@ -145,12 +145,7 @@ class SmolmachinesBottle(Bottle):
script = exec_shell_script(agent_argv, self.terminal_title, self.terminal_color) if tty else None
if script is None:
return subprocess.run(agent_argv, check=False).returncode
# Use sh -c (not -lc) so the script inherits PATH from the calling
# process. sh -l sources login-shell init files (e.g. /etc/profile)
# which may NOT include smolvm's location when it was installed via
# homebrew. The calling process (./cli.py) already has smolvm on PATH
# (provision steps succeed), so -c is sufficient.
return subprocess.run(["sh", "-c", script], check=False).returncode
return subprocess.run(["sh", "-lc", script], check=False).returncode
# smolvm/libkrun can SIGKILL an otherwise-normal exec during
# early-VM provisioning. Retry once after a short settle so
@@ -1,21 +0,0 @@
"""Egress apply for the smolmachines backend.
The smolmachines sidecar bundle runs as a host-side Docker container,
so egress signalling is identical to the docker backend.
"""
from __future__ import annotations
from ..docker.egress_apply import ( # noqa: F401
DockerEgressApplicator,
EgressApplyError,
applicator,
fetch_current_routes,
)
__all__ = [
"DockerEgressApplicator",
"EgressApplyError",
"applicator",
"fetch_current_routes",
]
-145
View File
@@ -1,145 +0,0 @@
"""SmolmachinesFreezer — snapshot a smolmachines bottle.
`smolvm pack create --from-vm` requires the VM to be stopped, and smolvm
removes VMs when stopped (same issue as Apple Container). Instead, exec
into the running VM as root to write a gzip-compressed tar of the root
filesystem to /var/tmp, then copy it to the host with `smolvm machine cp`,
build a Docker image from the archive, convert it to a smolmachine artifact
via the existing registry pipeline, and record the sidecar path. The VM
stays running throughout."""
from __future__ import annotations
import tempfile
from pathlib import Path
from .. import ActiveAgent
from ..freeze import Freezer
from ..docker import util as docker_mod
from .local_registry import crane_push_tarball, ephemeral_registry
from .smolvm import machine_cp, machine_exec, pack_create
from ...bottle_state import bottle_state_dir
from ...log import die, info
# Temp file written inside the VM during commit. Lives in /var/tmp
# (on-disk, unlike tmpfs /tmp) to survive for machine_cp.
_VM_COMMIT_TAR = "/var/tmp/.bot-bottle-commit.tar.gz"
class SmolmachinesFreezer(Freezer):
"""Freezes a smolmachines bottle via exec-tar + Docker image + smolmachine pack.
The VM is NOT stopped. We exec into the running VM to write a compressed
tar of the root filesystem to /var/tmp, copy it to the host with
machine_cp, build a Docker image (Docker's ADD decompresses .tar.gz
automatically), then run the same image→registry→pack_create pipeline
that _ensure_smolmachine uses for fresh builds."""
backend_name = "smolmachines"
def _freeze(self, agent: ActiveAgent) -> str:
machine = f"bot-bottle-{agent.slug}"
image_ref = f"bot-bottle-committed-{agent.slug}:latest"
output_dir = bottle_state_dir(agent.slug)
output_dir.mkdir(parents=True, exist_ok=True)
binary = output_dir / "committed-smolmachine"
sidecar = output_dir / "committed-smolmachine.smolmachine"
_snapshot_running_vm(machine, image_ref, binary)
return str(sidecar)
def _export_hint(self, slug: str, image_ref: str) -> None:
info(f"to export for migration: cp {image_ref} {slug}.smolmachine")
def _snapshot_running_vm(machine: str, image_ref: str, binary: Path) -> None:
"""Exec-tar the running VM, build a Docker image, and pack to a smolmachine.
binary: destination for the launcher (sibling .smolmachine is the artifact
that machine_create --from consumes, same convention as pack_create).
"""
with tempfile.TemporaryDirectory(prefix="bot-bottle-vm-commit.") as tmp:
tmp_path = Path(tmp)
# Use .tar.gz — Docker ADD decompresses automatically and the
# compressed archive fits in the VM's /var/tmp more easily.
rootfs_tar_gz = tmp_path / "rootfs.tar.gz"
dockerfile = tmp_path / "Dockerfile"
_exec_tar_to_file(machine, rootfs_tar_gz)
dockerfile.write_text(
"FROM scratch\n"
"ADD rootfs.tar.gz /\n"
"USER node\n"
"WORKDIR /home/node\n"
)
docker_mod.build_image(image_ref, str(tmp_path), dockerfile=str(dockerfile))
image_tarball = binary.parent / "committed.image.tar"
docker_mod.save(image_ref, str(image_tarball))
try:
with ephemeral_registry() as handle:
digest = docker_mod.image_id(image_ref).split(":", 1)[-1][:16]
push_ref = f"{handle.push_endpoint}/bot-bottle-committed:{digest}"
pack_ref = f"{handle.pull_endpoint}/bot-bottle-committed:{digest}"
crane_push_tarball(handle, str(image_tarball), push_ref)
pack_create(pack_ref, binary)
finally:
image_tarball.unlink(missing_ok=True)
def _exec_tar_to_file(machine: str, dest: Path) -> None:
"""Snapshot the running VM's root filesystem to dest (.tar.gz).
Writes a gzip-compressed tar to _VM_COMMIT_TAR inside the VM via
machine_exec (same mechanism as provisioning), then copies it to the
host with machine_cp. This avoids binary-stdout piping through the
smolvm exec channel, which does not reliably handle large binary output.
A connectivity probe (machine_exec true) runs first so a concurrent-exec
limitation (smolvm may reject a second exec while -i -t is active) is
reported clearly rather than as a silent failure."""
# Connectivity probe — if smolvm rejects concurrent exec while an
# interactive session is running, fail clearly here.
probe = machine_exec(machine, ["true"])
if probe.returncode != 0:
die(
f"smolvm exec is not available for {machine!r} "
f"(exit {probe.returncode}: {probe.stderr.strip() or probe.stdout.strip() or '<no output>'}). "
f"If an interactive session is active, smolvm may not support concurrent exec."
)
# Create the compressed tar inside the VM.
# tar exits 1 when files change during archiving (normal for a live
# filesystem); only treat exit > 1 as fatal.
tar_result = machine_exec(
machine,
[
"tar", "--create", "--gzip",
"--exclude=./proc",
"--exclude=./sys",
"--exclude=./dev",
"--exclude=./run",
# /tmp and /var/tmp are ephemeral. Their stale contents
# (e.g. /tmp/claude-<uid>) have uid remapped by smolvm's
# pack process, causing Claude Code to refuse to use them
# on resume. Exclude both; _init_vm recreates them with
# mkdir -p + correct ownership on every boot.
"--exclude=./tmp",
"--exclude=./var/tmp",
f"--file={_VM_COMMIT_TAR}",
"--directory=/",
".",
],
)
if tar_result.returncode > 1:
die(
f"smolvm exec tar {machine!r} failed (exit {tar_result.returncode}): "
f"{tar_result.stderr.strip() or tar_result.stdout.strip() or '<no output>'}"
)
# Copy from VM to host, then clean up.
try:
machine_cp(f"{machine}:{_VM_COMMIT_TAR}", str(dest))
finally:
machine_exec(machine, ["rm", "-f", _VM_COMMIT_TAR])
+13 -44
View File
@@ -40,12 +40,8 @@ from ..docker.git_gate import (
GIT_GATE_HOOK_IN_CONTAINER,
)
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import info, warn
from ...bottle_state import (
egress_state_dir,
git_gate_state_dir,
read_committed_image,
)
from ...log import warn
from ...bottle_state import egress_state_dir, git_gate_state_dir
from . import loopback_alias as _loopback
from . import sidecar_bundle as _bundle
from . import smolvm as _smolvm
@@ -89,7 +85,14 @@ def launch(
plan = _start_bundle(plan, network, loopback_ip, stack)
plan = _discover_urls(plan, loopback_ip)
agent_from_path = _agent_from_path(plan)
# Build the agent image and pack it into a `.smolmachine`
# artifact (or hit the per-Dockerfile-digest cache). Runs
# here, not in prepare, so the docker-build output doesn't
# garble the dashboard's preflight modal.
agent_from_path = _ensure_smolmachine(
plan.agent_image,
dockerfile=plan.agent_dockerfile_path,
)
_launch_vm(plan, agent_from_path, loopback_ip, stack)
_init_vm(plan)
@@ -101,7 +104,7 @@ def launch(
agent_command=plan.agent_command,
agent_prompt_mode=plan.agent_prompt_mode,
agent_provider_template=plan.agent_provider_template,
terminal_title=f"{plan.spec.label} ({plan.spec.agent_name})" if plan.spec.label else plan.spec.agent_name,
terminal_title=plan.spec.label or plan.spec.agent_name,
terminal_color=plan.spec.color,
agent_workdir=plan.workspace_plan.workdir,
)
@@ -214,15 +217,11 @@ def _discover_urls(
agent_supervise_url = f"http://{loopback_ip}:{supervise_host_port}/"
existing_no_proxy = plan.guest_env.get("NO_PROXY", "localhost,127.0.0.1")
no_proxy = f"{existing_no_proxy},{loopback_ip}"
guest_env = {
**plan.guest_env,
"HTTPS_PROXY": agent_proxy_url,
"HTTP_PROXY": agent_proxy_url,
"https_proxy": agent_proxy_url,
"http_proxy": agent_proxy_url,
"NO_PROXY": no_proxy,
"no_proxy": no_proxy,
"NO_PROXY": f"{existing_no_proxy},{loopback_ip}",
}
if agent_git_gate_host:
guest_env["GIT_GATE_URL"] = f"http://{agent_git_gate_host}"
@@ -276,16 +275,10 @@ def _init_vm(plan: SmolmachinesBottlePlan) -> None:
All folded into one sh -c to avoid back-to-back exec calls
immediately after machine_start (libkrun exec-channel race).
mkdir -p guards: when booting from a committed snapshot, /tmp and
/var/tmp are excluded from the archive (they're ephemeral and their
stale contents would have wrong uid after smolvm's uid remap). The
directories must be created before chown/chmod can set permissions.
wait_exec_ready polls until the exec channel is ready for the
subsequent provision calls, replacing the empirical sleep."""
_smolvm.machine_exec(plan.machine_name, [
"sh", "-c",
"mkdir -p /tmp /var/tmp && "
"chown -R node:node /home/node && "
"chown root:root /tmp /var/tmp && "
"chmod 1777 /tmp /var/tmp",
@@ -315,7 +308,7 @@ def _bundle_launch_spec(
ep = plan.egress_plan
volumes.append((str(ep.mitmproxy_ca_host_path), EGRESS_CA_IN_CONTAINER, True))
if ep.routes:
volumes.append((str(ep.routes_path.parent), str(Path(EGRESS_ROUTES_IN_CONTAINER).parent), True))
volumes.append((str(ep.routes_path), EGRESS_ROUTES_IN_CONTAINER, True))
# Bare-name entries for upstream-token slots. Their values
# come from the docker-run subprocess env (inherited from
# the operator's shell), never landing on argv.
@@ -389,30 +382,6 @@ def _resolve_token_env(
return egress_resolve_token_values(plan.egress_plan.token_env_map, effective_env)
def _agent_from_path(plan: SmolmachinesBottlePlan) -> Path:
"""Return the `.smolmachine` artifact used for `machine create --from`.
Prefer a committed VM artifact when one is recorded and still
present. If the file was removed, fall back to the normal image
build + pack cache path.
"""
committed = read_committed_image(plan.slug)
if committed:
committed_path = Path(committed)
if committed_path.is_file():
info(f"using committed smolmachine {str(committed_path)!r}")
return committed_path
# Build the agent image and pack it into a `.smolmachine`
# artifact (or hit the per-Dockerfile-digest cache). Runs here,
# not in prepare, so the docker-build output doesn't garble the
# dashboard's preflight modal.
return _ensure_smolmachine(
plan.agent_image,
dockerfile=plan.agent_dockerfile_path,
)
def _ensure_smolmachine(image_ref: str, *, dockerfile: str = "") -> Path:
"""Build the agent docker image and convert it into a
`.smolmachine` artifact, caching the result under
-26
View File
@@ -25,7 +25,6 @@ smolvm binary."""
from __future__ import annotations
import json
import shutil
import subprocess
import time
@@ -95,16 +94,6 @@ def pack_create(image: str, output: Path) -> None:
_smolvm("pack", "create", "--image", image, "-o", str(output))
def pack_create_from_vm(name: str, output: Path) -> None:
"""`smolvm pack create --from-vm <name> -o <output>`.
Snapshots an existing persistent VM into a pack artifact. As
with `pack_create`, smolvm writes a launcher at `output` and the
bootable sidecar at `output.smolmachine`.
"""
_smolvm("pack", "create", "--from-vm", name, "-o", str(output))
# --- Machine lifecycle ---------------------------------------------------
@@ -154,21 +143,6 @@ def machine_create(
_smolvm(*args)
def machine_is_running(name: str) -> bool:
"""Return True if the named VM is in the 'running' state."""
result = _smolvm("machine", "ls", "--json", check=False)
if result.returncode != 0:
return False
try:
machines = json.loads(result.stdout or "[]")
except ValueError:
return False
return any(
isinstance(m, dict) and m.get("name") == name and m.get("state") == "running"
for m in machines
)
def machine_start(name: str) -> None:
"""`smolvm machine start --name NAME`."""
_smolvm("machine", "start", "--name", name)
-30
View File
@@ -43,7 +43,6 @@ from . import supervise as _supervise
# Directory layout: ~/.bot-bottle/state/<identity>/...
_STATE_SUBDIR = "state"
_PER_BOTTLE_DOCKERFILE_NAME = "Dockerfile"
_COMMITTED_IMAGE_NAME = "committed-image"
_TRANSCRIPT_SUBDIR = "transcript"
# Per-sidecar scratch subdirs. PRD 0018 chunk 2: bind-mount sources
# live here so chunk 3's `docker compose up` can find them at stable
@@ -180,32 +179,6 @@ def write_per_bottle_dockerfile(identity: str, content: str) -> Path:
return p
def committed_image_path(identity: str) -> Path:
return bottle_state_dir(identity) / _COMMITTED_IMAGE_NAME
def write_committed_image(identity: str, image_tag: str) -> Path:
"""Persist the committed image tag for `identity`. The next
`cli.py resume <identity>` will boot from this image instead of
rebuilding from the Dockerfile."""
path = committed_image_path(identity)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(image_tag.strip() + "\n")
path.chmod(0o644)
return path
def read_committed_image(identity: str) -> str | None:
"""Return the committed image tag for `identity`, or None if no
commit has been recorded. Used by the Docker launch step to skip
the Dockerfile build when a committed snapshot exists."""
path = committed_image_path(identity)
if not path.is_file():
return None
tag = path.read_text().strip()
return tag or None
def per_bottle_image_tag(identity: str) -> str:
"""Image tag for a rebuilt bottle. Distinct from the base
bot-bottle-claude:latest so per-bottle rebuilds don't collide in
@@ -341,7 +314,6 @@ __all__ = [
"bottle_state_dir",
"cleanup_state",
"clear_preserve_marker",
"committed_image_path",
"egress_state_dir",
"git_gate_state_dir",
"is_preserved",
@@ -351,11 +323,9 @@ __all__ = [
"per_bottle_dockerfile_path",
"per_bottle_image_tag",
"preserve_marker_path",
"read_committed_image",
"read_metadata",
"supervise_state_dir",
"transcript_snapshot_dir",
"write_committed_image",
"write_metadata",
"write_per_bottle_dockerfile",
]
+1 -4
View File
@@ -1,6 +1,6 @@
"""Main CLI dispatcher.
Commands: cleanup, commit, edit, info, init, list, resume, start, supervise
Commands: cleanup, edit, info, init, list, resume, start, supervise
"""
from __future__ import annotations
@@ -12,7 +12,6 @@ from ..manifest import ManifestError
from ._common import PROG
from . import list as _list_mod
from .cleanup import cmd_cleanup
from .commit import cmd_commit
from .edit import cmd_edit
from .info import cmd_info
from .init import cmd_init
@@ -24,7 +23,6 @@ cmd_list = _list_mod.cmd_list
COMMANDS = {
"cleanup": cmd_cleanup,
"commit": cmd_commit,
"edit": cmd_edit,
"info": cmd_info,
"init": cmd_init,
@@ -39,7 +37,6 @@ def usage() -> None:
sys.stderr.write(f"usage: {PROG} <command> [args...]\n\n")
sys.stderr.write("Commands:\n")
sys.stderr.write(" cleanup stop and remove all active bot-bottle containers\n")
sys.stderr.write(" commit snapshot a running bottle's container state to a Docker image\n")
sys.stderr.write(" edit open an agent in vim for editing\n")
sys.stderr.write(" info print env, skills, and prompt details for a named agent\n")
sys.stderr.write(" init interactively create a new agent and add it to bot-bottle.json\n")
-53
View File
@@ -1,53 +0,0 @@
"""commit: freeze a running bottle's state to a resumable artifact.
Docker bottles are committed to a local Docker image. Macos-container
bottles are exported and rebuilt as a local Apple Container image.
Smolmachines bottles are packed from the running VM into a
`.smolmachine` artifact. The resulting reference is stored in
per-bottle state so the next `./cli.py resume <slug>` boots from the
snapshot instead of rebuilding from the Dockerfile.
"""
from __future__ import annotations
import argparse
from ..backend import enumerate_active_agents
from ..backend.freeze import CommitCancelled, get_freezer
from ..bottle_state import read_metadata
from ..log import die
from ._common import PROG
from . import tui
def cmd_commit(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} commit", add_help=True)
parser.add_argument(
"slug",
nargs="?",
default=None,
help=(
"bottle slug from `cli.py list active` "
"(omit to pick interactively)"
),
)
args = parser.parse_args(argv)
slug = args.slug
if slug is None:
active = enumerate_active_agents()
if not active:
die("no active bottles; start one with `./cli.py start`")
choices = [a.slug for a in active]
slug = tui.filter_select(choices, title="Select bottle to commit")
if slug is None:
return 0
metadata = read_metadata(slug)
backend = metadata.backend if metadata else ""
try:
get_freezer(backend).commit_slug(slug)
except CommitCancelled:
return 0
return 0
+1 -1
View File
@@ -55,7 +55,7 @@ def cmd_list(argv: list[str]) -> int:
# Tab-separated keeps the format stable for shell pipelines.
for b in active:
services = ",".join(b.services) if b.services else "-"
display_name = f"{b.label} ({b.agent_name})" if b.label else b.agent_name
display_name = b.label if b.label else b.agent_name
colored_name = _ansi_label(display_name, b.color)
print(f"{b.backend_name}\t{b.slug}\t{colored_name}\t{services}")
return 0
+3 -34
View File
@@ -3,8 +3,7 @@ act on them (approve / modify / reject).
Curses-based TUI; modify-then-approve shells out to $EDITOR. The
approval handler wires to PRD 0016 (capability-block), which rebuilds
the bottle Dockerfile. Egress proposals are queued for operator review
as full routes.yaml updates.
the bottle Dockerfile. The egress-block tool was removed in issue #198.
"""
from __future__ import annotations
@@ -21,21 +20,11 @@ from datetime import datetime, timezone
from pathlib import Path
from .. import supervise as _supervise
from ..bottle_state import read_metadata
# from ..bottle_state import read_metadata
# from ..backend.docker.capability_apply import (
# CapabilityApplyError,
# apply_capability_change,
# )
from ..backend.docker.egress_apply import (
EgressApplyError,
applicator as _docker_applicator,
)
from ..backend.macos_container.egress_apply import (
applicator as _macos_applicator,
)
from ..backend.smolmachines.egress_apply import (
applicator as _smolmachines_applicator,
)
from ..log import Die, error, info
@@ -51,8 +40,6 @@ from ..supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_ALLOW,
TOOL_EGRESS_BLOCK,
archive_proposal,
list_pending_proposals,
render_diff,
@@ -76,17 +63,7 @@ class QueuedProposal:
# Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses.
ApplyError = (CapabilityApplyError, EgressApplyError)
def apply_routes_change(slug: str, content: str) -> tuple[str, str]:
meta = read_metadata(slug)
backend = meta.backend if meta is not None else ""
if backend == "macos-container":
return _macos_applicator.apply_routes_change(slug, content)
if backend == "smolmachines":
return _smolmachines_applicator.apply_routes_change(slug, content)
return _docker_applicator.apply_routes_change(slug, content)
ApplyError = (CapabilityApplyError,)
def discover_pending() -> list[QueuedProposal]:
@@ -138,8 +115,6 @@ def _detail_lines(
def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
if tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK):
return ".yaml"
return ".txt"
@@ -154,7 +129,6 @@ def approve(
) -> None:
"""Apply the proposal, write the waiting response, and audit it."""
status = STATUS_MODIFIED if final_file is not None else STATUS_APPROVED
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
diff_before, diff_after = "", ""
# if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
@@ -168,11 +142,6 @@ def approve(
# diff_before, diff_after = apply_capability_change(
# qp.proposal.bottle_slug, file_to_apply,
# )
if qp.proposal.tool in (TOOL_ALLOW, TOOL_EGRESS_BLOCK):
diff_before, diff_after = apply_routes_change(
qp.proposal.bottle_slug,
file_to_apply,
)
response = Response(
proposal_id=qp.proposal.id,
+3 -3
View File
@@ -261,8 +261,8 @@ class CodexAgentProvider(AgentProvider):
return
info(f"registering supervise MCP server in agent codex config → {supervise_url}")
r = bottle.exec(
f"codex mcp add {_SUPERVISE_MCP_NAME} --url "
f"{shlex.quote(supervise_url)}",
f"codex mcp add --transport http "
f"{_SUPERVISE_MCP_NAME} {supervise_url}",
user="node",
)
if r.returncode != 0:
@@ -270,7 +270,7 @@ class CodexAgentProvider(AgentProvider):
f"`codex mcp add supervise` failed (exit {r.returncode}): "
f"{(r.stderr or r.stdout or '').strip()}. Inside the bottle, "
f"register manually with: "
f"codex mcp add supervise --url {shlex.quote(supervise_url)}"
f"codex mcp add --transport http supervise {supervise_url}"
)
+1 -3
View File
@@ -31,7 +31,6 @@ CODEX_HOST_CREDENTIAL_TOKEN_REF = "BOT_BOTTLE_CODEX_HOST_ACCESS_TOKEN"
EGRESS_HOSTNAME = "egress"
EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml"
EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name
@dataclass(frozen=True)
@@ -296,7 +295,7 @@ class Egress(ABC):
) -> EgressPlan:
routes = egress_routes_for_bottle(bottle, provider_routes)
log = bottle.egress.Log
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
routes_path = stage_dir / "egress_routes.yaml"
routes_path.write_text(egress_render_routes(routes, log=log))
routes_path.chmod(0o600)
return EgressPlan(
@@ -310,7 +309,6 @@ class Egress(ABC):
__all__ = [
"CODEX_HOST_CREDENTIAL_TOKEN_REF",
"EGRESS_HOSTNAME",
"EGRESS_ROUTES_FILENAME",
"EGRESS_ROUTES_IN_CONTAINER",
"Egress",
"EgressPlan",
+2 -2
View File
@@ -5,6 +5,7 @@ egress container."""
from __future__ import annotations
import dataclasses
import json
import os
import signal
@@ -26,7 +27,6 @@ from egress_addon_core import ( # type: ignore[import-not-found] # pylint: dis
load_config,
match_route,
outbound_scan_headers,
route_to_yaml_dict,
scan_inbound,
scan_outbound,
)
@@ -82,7 +82,7 @@ class EgressAddon:
def _serve_introspection(self, flow: http.HTTPFlow, path: str) -> None:
if path == "/allowlist":
payload = json.dumps(
{"routes": [route_to_yaml_dict(r) for r in self.config.routes]},
{"routes": [dataclasses.asdict(r) for r in self.config.routes]},
indent=2,
).encode("utf-8")
flow.response = http.Response.make(
-51
View File
@@ -359,56 +359,6 @@ def _parse_one(idx: int, raw: object) -> Route:
)
def _path_match_to_dict(pm: PathMatch) -> dict[str, object]:
d: dict[str, object] = {"value": pm.value}
if pm.type != "prefix":
d["type"] = pm.type
return d
def _header_match_to_dict(hm: HeaderMatch) -> dict[str, object]:
d: dict[str, object] = {"name": hm.name, "value": hm.value}
if hm.type != "exact":
d["type"] = hm.type
return d
def _match_entry_to_dict(me: MatchEntry) -> dict[str, object]:
d: dict[str, object] = {}
if me.paths:
d["paths"] = [_path_match_to_dict(p) for p in me.paths]
if me.methods:
d["methods"] = list(me.methods)
if me.headers:
d["headers"] = [_header_match_to_dict(h) for h in me.headers]
return d
def route_to_yaml_dict(r: Route) -> dict[str, object]:
"""Serialize a Route to YAML-schema-compatible dict.
Uses the same field names the YAML parser accepts, so the output
can be round-tripped directly into an `allow` or `egress-block`
proposal without translation. Fields that are empty/default are
omitted so the agent doesn't copy irrelevant keys."""
d: dict[str, object] = {"host": r.host}
if r.auth_scheme:
d["auth_scheme"] = r.auth_scheme
d["token_env"] = r.token_env
if r.matches:
d["matches"] = [_match_entry_to_dict(m) for m in r.matches]
if r.git_fetch:
d["git"] = {"fetch": True}
dlp: dict[str, object] = {}
if r.outbound_detectors is not None:
dlp["outbound_detectors"] = list(r.outbound_detectors)
if r.inbound_detectors is not None:
dlp["inbound_detectors"] = list(r.inbound_detectors)
if dlp:
d["dlp"] = dlp
return d
def load_routes(text: str) -> tuple[Route, ...]:
"""Parse YAML text → routes."""
try:
@@ -748,7 +698,6 @@ def scan_inbound(
__all__ = [
"LOG_BLOCKS",
"route_to_yaml_dict",
"LOG_FULL",
"LOG_OFF",
"Config",
+12 -19
View File
@@ -5,7 +5,7 @@ queue/audit support. The sidecar (bot_bottle.supervise_server)
sits on the bottle's internal network and exposes three MCP tools the
agent calls when it hits a stuck-recovery category:
* egress-block / allow agent proposes a new routes.yaml
* egress-block agent proposes a new routes.yaml
* capability-block agent proposes a new agent Dockerfile
Each tool call: the agent passes the full proposed file plus a
@@ -49,34 +49,27 @@ SUPERVISE_HOSTNAME = "supervise"
SUPERVISE_PORT = 9100
TOOL_CAPABILITY_BLOCK = "capability-block"
TOOL_EGRESS_BLOCK = "egress-block"
TOOL_ALLOW = "allow"
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
TOOLS: tuple[str, ...] = (
TOOL_ALLOW,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
TOOL_LIST_EGRESS_ROUTES,
)
# The supervise sidecar uses these to query egress's
# introspection endpoint for the `list-egress-routes` MCP
# tool. The hostname + port match egress's docker network
# listen port (see backend.docker.egress.EGRESS_PORT). The supervise
# daemon runs inside the sidecar bundle alongside egress, so loopback
# is the stable address across docker, smolmachines, and Apple
# Container backends.
EGRESS_FORWARD_PROXY = "http://127.0.0.1:9099"
# alias + listen port (see bot_bottle.egress.EGRESS_HOSTNAME
# and backend.docker.egress.EGRESS_PORT — the values
# are inlined here so the in-container supervise_server doesn't
# need to import the egress package).
EGRESS_FORWARD_PROXY = "http://egress:9099"
EGRESS_INTROSPECT_URL = "http://_egress.local/allowlist"
# capability-block has no on-disk config the operator edits in place
# (the Dockerfile is rebuilt, not patched), so it has no audit log
# here — those changes are captured by git history + the rebuild record
# laid down in PRD 0016.
COMPONENT_FOR_TOOL: dict[str, str] = {
TOOL_ALLOW: "egress",
TOOL_EGRESS_BLOCK: "egress",
}
# here — those changes are captured by git history + the rebuild
# record laid down in PRD 0016. egress-block was removed in issue #198.
COMPONENT_FOR_TOOL: dict[str, str] = {}
STATUS_APPROVED = "approved"
STATUS_MODIFIED = "modified"
@@ -438,9 +431,9 @@ def sha256_hex(content: str) -> str:
# Dockerfile and propose modifications.
#
# routes.yaml + allowlist used to live here too; PRD 0017 chunk 3
# moved them behind the `list-egress-routes` MCP tool (live state
# from egress's introspection endpoint) so the agent always sees
# current data rather than a launch-time snapshot.
# moved them behind the `list-egress-routes` MCP tool (live
# state from egress's introspection endpoint) so the agent
# always sees current data rather than a launch-time snapshot.
CURRENT_CONFIG_DOCKERFILE = "Dockerfile"
+10 -108
View File
@@ -1,8 +1,8 @@
"""Supervise sidecar HTTP server (PRD 0013).
Per-bottle MCP server exposing tools the agent calls to propose config
changes when stuck. The tools are `allow`, `egress-block`,
`capability-block`, and `list-egress-routes`.
changes when stuck. The egress-block tool was removed in issue #198;
the remaining tools are `capability-block` and `list-egress-routes`.
Each queued tool call:
@@ -44,15 +44,9 @@ import urllib.request
from dataclasses import dataclass
from pathlib import Path
try:
# Same-directory imports inside the bundle container; these files are
# COPYed flat under /app by Dockerfile.sidecars.
from egress_addon_core import load_routes
import supervise as _sv
except ModuleNotFoundError:
# Package imports for host-side tests and tooling.
from .egress_addon_core import load_routes
from . import supervise as _sv
# Same-directory import inside the bundle container; `supervise.py`
# is COPYed alongside this file by Dockerfile.sidecars.
import supervise as _sv
# --- JSON-RPC / MCP plumbing ----------------------------------------------
@@ -148,9 +142,8 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"allowlist. Returns JSON with one entry per allowed host, "
"each carrying its matches rules (if any) and whether "
"the proxy injects Authorization for the route. Use this "
"before composing an `allow` or `egress-block` proposal so "
"the new routes file extends the live one rather than "
"replacing it."
"before composing an `egress-block` proposal so the new "
"routes file extends the live one rather than replacing it."
),
"inputSchema": {
"type": "object",
@@ -158,88 +151,6 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"additionalProperties": False,
},
},
{
"name": _sv.TOOL_ALLOW,
"description": (
"Request operator approval to change the bottle's egress "
"allowlist. Pass the full proposed routes.yaml content, not "
"just the new host, plus a justification. Use "
"`list-egress-routes` first so the proposal preserves existing "
"routes."
),
"inputSchema": {
"type": "object",
"properties": {
"routes_yaml": {
"type": "string",
"description": (
"Full proposed /etc/egress/routes.yaml content. "
"Each route entry accepts these keys:\n"
" host: <hostname> (required)\n"
" auth_scheme: Bearer|token (must pair with token_env)\n"
" token_env: <ENV_VAR_NAME> (must pair with auth_scheme)\n"
" matches: (optional list of match entries)\n"
" - paths: [{type: prefix|exact|regex, value: /...}]\n"
" methods: [GET, POST, ...]\n"
" headers: [{name: X-Hdr, value: val, type: exact|regex}]\n"
" git: (optional; omit to block git clone/fetch)\n"
" fetch: true\n"
" dlp: (optional DLP scanner overrides)\n"
" outbound_detectors: [token_patterns, known_secrets]\n"
" inbound_detectors: [naive_injection_detection]\n"
"Omit any key that should use its default. "
"`list-egress-routes` returns routes in this same format."
),
},
"justification": {
"type": "string",
"description": "Why this egress route is needed.",
},
},
"required": ["routes_yaml", "justification"],
},
},
{
"name": _sv.TOOL_EGRESS_BLOCK,
"description": (
"Request operator approval to change the bottle's egress "
"allowlist after a blocked outbound request. Pass the full "
"proposed routes.yaml content plus a justification. Use "
"`list-egress-routes` first so the proposal preserves existing "
"routes."
),
"inputSchema": {
"type": "object",
"properties": {
"routes_yaml": {
"type": "string",
"description": (
"Full proposed /etc/egress/routes.yaml content. "
"Each route entry accepts these keys:\n"
" host: <hostname> (required)\n"
" auth_scheme: Bearer|token (must pair with token_env)\n"
" token_env: <ENV_VAR_NAME> (must pair with auth_scheme)\n"
" matches: (optional list of match entries)\n"
" - paths: [{type: prefix|exact|regex, value: /...}]\n"
" methods: [GET, POST, ...]\n"
" headers: [{name: X-Hdr, value: val, type: exact|regex}]\n"
" git: (optional; omit to block git clone/fetch)\n"
" fetch: true\n"
" dlp: (optional DLP scanner overrides)\n"
" outbound_detectors: [token_patterns, known_secrets]\n"
" inbound_detectors: [naive_injection_detection]\n"
"Omit any key that should use its default. "
"`list-egress-routes` returns routes in this same format."
),
},
"justification": {
"type": "string",
"description": "Why this egress route is needed.",
},
},
"required": ["routes_yaml", "justification"],
},
},
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"description": (
@@ -271,12 +182,11 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
]
# Map each proposal tool to the input field that carries the agent's
# payload (stored in Proposal.proposed_file).
# Map each non-egress tool to the input field that carries the agent's
# payload (stored in Proposal.proposed_file). egress-block builds its
# payload from structured input fields in `handle_egress_block`.
PROPOSED_FILE_FIELD: dict[str, str] = {
_sv.TOOL_ALLOW: "routes_yaml",
_sv.TOOL_CAPABILITY_BLOCK: "dockerfile",
_sv.TOOL_EGRESS_BLOCK: "routes_yaml",
}
@@ -293,14 +203,6 @@ def validate_proposed_file(tool: str, content: str) -> None:
# Dockerfiles are too varied to validate syntactically beyond
# non-empty. The operator reads the diff in the TUI.
pass
elif tool in (_sv.TOOL_ALLOW, _sv.TOOL_EGRESS_BLOCK):
try:
load_routes(content)
except ValueError as e:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml is not valid: {e}",
) from e
else:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
-159
View File
@@ -1,159 +0,0 @@
# PRD prd-new: Commit bottle state to an image
- **Status:** Active
- **Author:** Claude
- **Created:** 2026-06-20
- **Issue:** #194
## Summary
Add a `commit` CLI command that freezes a running bottle's state to a
resumable local artifact. Docker bottles are stored as Docker images;
smolmachines bottles are stored as `.smolmachine` artifacts. Operators
can then resume the bottle from that exact filesystem snapshot, or
export the artifact to migrate work to a different host.
## Problem
When a long-running agent session is interrupted — by a host reboot, a
network failure, or a planned infrastructure migration — the in-progress
container state is lost. `cli.py resume` rebuilds the agent image from
the Dockerfile and reprovi-sions the bottle, but that returns the guest
to its initial state, not to wherever the agent was mid-task.
There is no mechanism today to capture "what's installed / configured
inside the running container right now" and make it reproducible. The
`capability-block` flow writes a new Dockerfile and marks the bottle for
resume, but that only applies when the agent itself has requested a
capability change; it doesn't help the operator who wants to take a
snapshot before a planned host reboot or hardware migration.
## Goals / Success Criteria
- `./cli.py commit [<slug>]` takes a snapshot of the running agent and
stores it as a local artifact.
- Without a slug argument the command shows the same interactive picker
as `start` (the list of active slugs).
- The committed artifact reference is stored in per-bottle state so
that the next `./cli.py resume <slug>` automatically uses the
snapshot instead of rebuilding from the Dockerfile.
- `mark_preserved` is called so the state dir survives the normal
session-end cleanup.
- A backend-specific export hint is printed so operators know how to
migrate the snapshot.
- The command errors clearly on unsupported backends.
## Non-goals
- macOS-container backend support.
- Automatic commit on agent exit.
- Image push to a remote registry.
- Storing the image tag in the manifest or sharing it between operators.
## Design
### Docker image tag
`bot-bottle-committed-<slug>:latest` — namespaced under `bot-bottle-`
to match existing image naming conventions; `committed` distinguishes it
from the build-time image (`bot-bottle-claude:latest`) and the
capability-block rebuild image (`bot-bottle-rebuilt-<identity>:latest`).
### State storage
A new plain-text file `committed-image` is added to the per-bottle state
directory:
```
~/.bot-bottle/state/<identity>/
metadata.json
Dockerfile (capability-block override; optional)
committed-image (committed artifact reference; optional)
transcript/
```
`bottle_state.committed_image_path(identity)` returns the path.
`write_committed_image` / `read_committed_image` are the read/write
helpers, matching the existing `per_bottle_dockerfile` pattern. Docker
stores a Docker tag in this file; smolmachines stores the absolute path
to the committed `.smolmachine` artifact.
### `commit` command
```
./cli.py commit [<slug>]
```
1. Resolve slug (arg or interactive picker from `enumerate_active_agents`).
2. Check metadata and branch by backend.
3. For Docker, derive container name `bot-bottle-<slug>` and run
`docker commit <container> bot-bottle-committed-<slug>:latest`.
4. For smolmachines, derive machine name `bot-bottle-<slug>` and run
`smolvm pack create --from-vm <machine> -o ~/.bot-bottle/state/<slug>/committed-smolmachine`.
5. Write the Docker image tag or smolmachine artifact path to
`~/.bot-bottle/state/<slug>/committed-image`.
6. Call `mark_preserved(<slug>)` so the state dir survives session-end.
7. Print the resume hint and a backend-specific export example.
### Resume from committed image
`bot_bottle/backend/docker/launch.py` already rebuilds the agent image
at the top of the `launch` context manager. The change is a check
immediately before that step:
```python
committed = read_committed_image(plan.slug)
if committed and docker_mod.image_exists(committed):
info(f"using committed image {committed!r}")
plan = dataclasses.replace(
plan,
agent_provision=dataclasses.replace(plan.agent_provision, image=committed),
)
else:
docker_mod.build_image(plan.image, _REPO_DIR, dockerfile=plan.dockerfile_path)
```
Replacing `agent_provision.image` propagates to `plan.image` (a
property) and from there to the Compose spec renderer's `_agent_service`
`image:` field, so the container boots from the committed snapshot.
The build step is skipped entirely when a committed image is found and
exists locally.
If the committed image has been deleted from the local daemon (e.g.
after `docker rmi` or a `docker system prune`), the launch falls back
to a normal Dockerfile build, matching the pre-commit behavior.
### Resume from committed smolmachine
`bot_bottle/backend/smolmachines/launch.py` checks the committed
reference before the normal Docker build -> pack cache path:
```python
committed = read_committed_image(plan.slug)
if committed and Path(committed).is_file():
return Path(committed)
return _ensure_smolmachine(plan.agent_image, dockerfile=plan.agent_dockerfile_path)
```
The returned path is passed to `smolvm machine create --from`, so the
resumed VM boots from the committed snapshot. If the artifact has been
deleted, launch falls back to the normal build and pack flow.
## Testing strategy
- Unit tests for `write_committed_image` / `read_committed_image` in
`tests/unit/test_bottle_state.py`, using the existing `_FakeHomeMixin`
pattern.
- Unit tests for `commit_container` in `tests/unit/test_docker_util_image.py`,
mocking `subprocess.run` and asserting on the `docker commit` argv.
- Unit tests for `cmd_commit` argument parsing, Docker commit,
smolmachines pack, and the unsupported backend error path, mocking
`enumerate_active_agents`, `commit_container`, and
`pack_create_from_vm`.
- Unit tests for the launch-step committed-image branch: patch
`read_committed_image` to return a tag, patch `image_exists` to return
True, and assert that `build_image` is not called and `plan.image` is
overridden.
- Unit tests for the smolmachines launch-step committed-artifact branch:
patch `read_committed_image` to return an existing path and assert the
normal `_ensure_smolmachine` path is skipped.
-216
View File
@@ -1,216 +0,0 @@
"""Unit: Freezer class hierarchy."""
from __future__ import annotations
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from bot_bottle import supervise, bottle_state
from bot_bottle.backend import ActiveAgent
from bot_bottle.backend.freeze import get_freezer
from bot_bottle.backend.docker.freezer import DockerFreezer
from bot_bottle.backend.macos_container.freezer import MacosContainerFreezer
from bot_bottle.backend.smolmachines.freezer import SmolmachinesFreezer
class _FakeHomeMixin:
def _setup_fake_home(self):
self._tmp = tempfile.TemporaryDirectory(prefix="freezer-test.")
original = supervise.bot_bottle_root
def fake_root() -> Path:
return Path(self._tmp.name) / ".bot-bottle"
supervise.bot_bottle_root = fake_root # type: ignore[assignment]
self._restore = lambda: setattr(supervise, "bot_bottle_root", original)
def _teardown_fake_home(self):
self._restore()
self._tmp.cleanup()
def _make_agent(slug: str, backend: str = "docker") -> ActiveAgent:
return ActiveAgent(
backend_name=backend,
slug=slug,
agent_name="dev",
started_at="t",
services=(),
)
class TestGetFreezer(unittest.TestCase):
def test_docker(self):
self.assertIsInstance(get_freezer("docker"), DockerFreezer)
def test_empty_backend_gives_docker(self):
self.assertIsInstance(get_freezer(""), DockerFreezer)
def test_macos_container(self):
self.assertIsInstance(get_freezer("macos-container"), MacosContainerFreezer)
def test_smolmachines(self):
self.assertIsInstance(get_freezer("smolmachines"), SmolmachinesFreezer)
def test_unknown_backend_dies(self):
with patch("bot_bottle.backend.freeze.die", side_effect=SystemExit("die")):
with self.assertRaises(SystemExit):
get_freezer("unknown-backend")
class TestFreezerBaseCommit(_FakeHomeMixin, unittest.TestCase):
"""The base Freezer.commit() owns the shared post-freeze steps."""
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def test_writes_committed_image_and_marks_preserved(self):
slug = "dev-abc12"
bottle_state.write_metadata(bottle_state.BottleMetadata(
identity=slug, agent_name="dev", cwd="", copy_cwd=False,
started_at="t", backend="docker",
))
freezer = get_freezer("docker")
agent = _make_agent(slug)
with patch.object(freezer, "_freeze", return_value="bot-bottle-committed-dev-abc12:latest"), \
patch("bot_bottle.backend.freeze.info"):
freezer.commit(agent)
self.assertEqual(
"bot-bottle-committed-dev-abc12:latest",
bottle_state.read_committed_image(slug),
)
self.assertTrue(bottle_state.is_preserved(slug))
def test_commit_slug_passes_correct_slug_to_freeze(self):
slug = "dev-abc12"
bottle_state.write_metadata(bottle_state.BottleMetadata(
identity=slug, agent_name="dev", cwd="", copy_cwd=False,
started_at="t", backend="docker",
))
freezer = get_freezer("docker")
captured = {}
def capture_freeze(agent: ActiveAgent) -> str:
captured["slug"] = agent.slug
return "some-ref"
with patch.object(freezer, "_freeze", side_effect=capture_freeze), \
patch("bot_bottle.backend.freeze.info"):
freezer.commit_slug(slug)
self.assertEqual(slug, captured["slug"])
class TestDockerFreezer(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def test_commits_container_and_records_image(self):
slug = "dev-abc12"
bottle_state.write_metadata(bottle_state.BottleMetadata(
identity=slug, agent_name="dev", cwd="", copy_cwd=False,
started_at="t", backend="docker",
))
freezer = DockerFreezer()
agent = _make_agent(slug)
with patch("bot_bottle.backend.docker.freezer.commit_container") as mock_commit, \
patch("bot_bottle.backend.freeze.info"), \
patch("bot_bottle.backend.docker.freezer.info"):
freezer.commit(agent)
mock_commit.assert_called_once_with(
f"bot-bottle-{slug}",
f"bot-bottle-committed-{slug}:latest",
)
self.assertEqual(
f"bot-bottle-committed-{slug}:latest",
bottle_state.read_committed_image(slug),
)
self.assertTrue(bottle_state.is_preserved(slug))
class TestMacosContainerFreezer(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def _write_meta(self, slug: str) -> None:
bottle_state.write_metadata(bottle_state.BottleMetadata(
identity=slug, agent_name="dev", cwd="", copy_cwd=False,
started_at="t", backend="macos-container",
))
def test_commits_running_container_without_stopping(self):
"""Commit should exec-tar the running container, not stop it."""
slug = "dev-abc12"
self._write_meta(slug)
freezer = MacosContainerFreezer()
agent = _make_agent(slug, "macos-container")
with patch("bot_bottle.backend.macos_container.freezer.commit_container") as mock_commit, \
patch("bot_bottle.backend.freeze.info"), \
patch("bot_bottle.backend.macos_container.freezer.info"):
freezer.commit(agent)
mock_commit.assert_called_once_with(
f"bot-bottle-{slug}",
f"bot-bottle-committed-{slug}:latest",
)
self.assertEqual(
f"bot-bottle-committed-{slug}:latest",
bottle_state.read_committed_image(slug),
)
self.assertTrue(bottle_state.is_preserved(slug))
class TestSmolmachinesFreezer(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def _write_meta(self, slug: str) -> None:
bottle_state.write_metadata(bottle_state.BottleMetadata(
identity=slug, agent_name="dev", cwd="", copy_cwd=False,
started_at="t", backend="smolmachines",
))
def test_snapshots_running_vm_without_stopping(self):
"""Commit should exec-tar the running VM, not stop it."""
slug = "dev-abc12"
self._write_meta(slug)
freezer = SmolmachinesFreezer()
agent = _make_agent(slug, "smolmachines")
with patch("bot_bottle.backend.smolmachines.freezer._snapshot_running_vm") as mock_snap, \
patch("bot_bottle.backend.freeze.info"), \
patch("bot_bottle.backend.smolmachines.freezer.info"):
freezer.commit(agent)
expected_binary = bottle_state.bottle_state_dir(slug) / "committed-smolmachine"
mock_snap.assert_called_once_with(
f"bot-bottle-{slug}",
f"bot-bottle-committed-{slug}:latest",
expected_binary,
)
expected_sidecar = str(expected_binary.with_suffix(".smolmachine"))
self.assertEqual(expected_sidecar, bottle_state.read_committed_image(slug))
self.assertTrue(bottle_state.is_preserved(slug))
if __name__ == "__main__":
unittest.main()
-51
View File
@@ -277,56 +277,5 @@ class TestBottleMetadataBackend(_FakeHomeMixin, unittest.TestCase):
self.assertEqual("", loaded.backend)
class TestCommittedImage(_FakeHomeMixin, unittest.TestCase):
"""write_committed_image / read_committed_image round-trip."""
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def test_returns_none_when_absent(self):
self.assertIsNone(bottle_state.read_committed_image("dev"))
def test_write_then_read_roundtrip(self):
bottle_state.write_committed_image("dev", "bot-bottle-committed-dev:latest")
self.assertEqual(
"bot-bottle-committed-dev:latest",
bottle_state.read_committed_image("dev"),
)
def test_strips_trailing_newline_on_read(self):
path = bottle_state.committed_image_path("dev")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("bot-bottle-committed-dev:latest\n\n")
self.assertEqual(
"bot-bottle-committed-dev:latest",
bottle_state.read_committed_image("dev"),
)
def test_isolated_per_slug(self):
bottle_state.write_committed_image("dev", "bot-bottle-committed-dev:latest")
bottle_state.write_committed_image("api", "bot-bottle-committed-api:latest")
self.assertEqual(
"bot-bottle-committed-dev:latest",
bottle_state.read_committed_image("dev"),
)
self.assertEqual(
"bot-bottle-committed-api:latest",
bottle_state.read_committed_image("api"),
)
def test_path_under_state_dir(self):
path = bottle_state.committed_image_path("dev")
self.assertTrue(str(path).endswith("/.bot-bottle/state/dev/committed-image"))
def test_empty_content_returns_none(self):
path = bottle_state.committed_image_path("dev")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(" \n")
self.assertIsNone(bottle_state.read_committed_image("dev"))
if __name__ == "__main__":
unittest.main()
-143
View File
@@ -1,143 +0,0 @@
"""Unit: cli.py commit command."""
from __future__ import annotations
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
from bot_bottle.cli.commit import cmd_commit
from bot_bottle import supervise
from bot_bottle import bottle_state
from bot_bottle.backend.freeze import CommitCancelled
class _FakeHomeMixin:
def _setup_fake_home(self):
self._tmp = tempfile.TemporaryDirectory(prefix="cli-commit-test.")
original = supervise.bot_bottle_root
def fake_root() -> Path:
return Path(self._tmp.name) / ".bot-bottle"
supervise.bot_bottle_root = fake_root # type: ignore[assignment]
self._restore = lambda: setattr(supervise, "bot_bottle_root", original)
def _teardown_fake_home(self):
self._restore()
self._tmp.cleanup()
class TestCmdCommitSlugArg(_FakeHomeMixin, unittest.TestCase):
"""cmd_commit with an explicit slug delegates to get_freezer."""
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def _write_meta(self, slug: str, backend: str) -> None:
bottle_state.write_metadata(bottle_state.BottleMetadata(
identity=slug, agent_name="dev", cwd="", copy_cwd=False,
started_at="t", backend=backend,
))
def test_commits_docker_bottle(self):
slug = "dev-abc12"
self._write_meta(slug, "docker")
with patch("bot_bottle.cli.commit.get_freezer") as mock_gf:
mock_freezer = MagicMock()
mock_gf.return_value = mock_freezer
rc = cmd_commit([slug])
self.assertEqual(0, rc)
mock_gf.assert_called_once_with("docker")
mock_freezer.commit_slug.assert_called_once_with(slug)
def test_empty_backend_passed_to_get_freezer(self):
"""Old state dirs without a backend field pass '' to get_freezer."""
slug = "dev-abc12"
self._write_meta(slug, "")
with patch("bot_bottle.cli.commit.get_freezer") as mock_gf:
mock_freezer = MagicMock()
mock_gf.return_value = mock_freezer
rc = cmd_commit([slug])
self.assertEqual(0, rc)
mock_gf.assert_called_once_with("")
def test_commits_macos_container_bottle(self):
slug = "dev-abc12"
self._write_meta(slug, "macos-container")
with patch("bot_bottle.cli.commit.get_freezer") as mock_gf:
mock_freezer = MagicMock()
mock_gf.return_value = mock_freezer
rc = cmd_commit([slug])
self.assertEqual(0, rc)
mock_gf.assert_called_once_with("macos-container")
mock_freezer.commit_slug.assert_called_once_with(slug)
def test_commits_smolmachines_bottle(self):
slug = "dev-abc12"
self._write_meta(slug, "smolmachines")
with patch("bot_bottle.cli.commit.get_freezer") as mock_gf:
mock_freezer = MagicMock()
mock_gf.return_value = mock_freezer
rc = cmd_commit([slug])
self.assertEqual(0, rc)
mock_gf.assert_called_once_with("smolmachines")
def test_returns_zero_on_commit_cancelled(self):
slug = "dev-abc12"
self._write_meta(slug, "macos-container")
with patch("bot_bottle.cli.commit.get_freezer") as mock_gf:
mock_freezer = MagicMock()
mock_freezer.commit_slug.side_effect = CommitCancelled
mock_gf.return_value = mock_freezer
rc = cmd_commit([slug])
self.assertEqual(0, rc)
class TestCmdCommitNoActiveBottles(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def test_dies_when_no_active_bottles_and_no_slug(self):
with patch(
"bot_bottle.cli.commit.enumerate_active_agents", return_value=[],
), patch(
"bot_bottle.cli.commit.die", side_effect=SystemExit("die"),
) as mock_die:
with self.assertRaises(SystemExit):
cmd_commit([])
mock_die.assert_called_once()
def test_returns_zero_when_picker_cancelled(self):
active = MagicMock()
active.slug = "dev-abc12"
with patch(
"bot_bottle.cli.commit.enumerate_active_agents", return_value=[active],
), patch(
"bot_bottle.cli.commit.tui.filter_select", return_value=None,
):
rc = cmd_commit([])
self.assertEqual(0, rc)
if __name__ == "__main__":
unittest.main()
+1 -1
View File
@@ -392,7 +392,7 @@ class TestSidecarBundleShape(unittest.TestCase):
"services"]["sidecars"]
targets = {v["target"] for v in sc["volumes"]}
self.assertIn("/home/mitmproxy/.mitmproxy/mitmproxy-ca.pem", targets)
self.assertIn("/etc/egress", targets)
self.assertIn("/etc/egress/routes.yaml", targets)
self.assertIn("/git-gate-entrypoint.sh", targets)
self.assertIn("/git-gate/creds/upstream-known_hosts", targets)
self.assertTrue(any("supervise/queue" in t or t.startswith("/run/supervise")
+4 -4
View File
@@ -292,10 +292,10 @@ class TestCodexSuperviseMcp(unittest.TestCase):
bottle.exec.assert_called_once()
script = bottle.exec.call_args.args[0]
self.assertEqual("node", bottle.exec.call_args.kwargs.get("user"))
self.assertEqual(
f"codex mcp add supervise --url {_URL}",
script,
)
self.assertIn("codex mcp add", script)
self.assertIn("--transport http", script)
self.assertIn("supervise", script)
self.assertIn(_URL, script)
def test_logs_warning_on_failure_but_does_not_raise(self):
bottle = _make_bottle(
@@ -1,192 +0,0 @@
"""Unit: Docker launch step uses committed image when available."""
from __future__ import annotations
import contextlib
import io
import tempfile
import unittest
from pathlib import Path
from typing import Any
from unittest import mock
from bot_bottle.agent_provider import AgentProvisionPlan
from bot_bottle.backend import BottleSpec
from bot_bottle.backend.docker import launch as launch_mod
from bot_bottle.backend.docker.bottle_plan import DockerBottlePlan
from bot_bottle.egress import EgressPlan
from bot_bottle.git_gate import GitGatePlan
from bot_bottle.manifest import ManifestIndex
_SLUG = "dev-abc12"
_COMMITTED_TAG = f"bot-bottle-committed-{_SLUG}:latest"
_DEFAULT_IMAGE = "bot-bottle-claude:latest"
_IDX = ManifestIndex.from_json_obj({
"bottles": {"dev": {}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
def _plan(tmp: str) -> DockerBottlePlan:
stage = Path(tmp)
spec = BottleSpec(
manifest=_IDX,
agent_name="demo",
copy_cwd=False,
user_cwd=tmp,
identity=_SLUG,
)
return DockerBottlePlan(
spec=spec,
manifest=_IDX.load_for_agent("demo"),
stage_dir=stage,
git_gate_plan=GitGatePlan(
slug=_SLUG,
entrypoint_script=stage / "entrypoint.sh",
hook_script=stage / "hook.sh",
access_hook_script=stage / "access-hook.sh",
upstreams=(),
),
egress_plan=EgressPlan(
slug=_SLUG,
routes_path=stage / "egress.yaml",
routes=(),
token_env_map={},
),
supervise_plan=None,
agent_provision=AgentProvisionPlan(
template="claude",
command="claude",
prompt_mode="append_file",
image=_DEFAULT_IMAGE,
dockerfile="",
guest_home="/home/node",
instance_name=f"bot-bottle-{_SLUG}",
prompt_file=stage / "prompt.txt",
guest_env={},
),
slug=_SLUG,
forwarded_env={},
use_runsc=False,
)
class TestLaunchCommittedImage(unittest.TestCase):
def setUp(self) -> None:
self._tmp = tempfile.mkdtemp(prefix="launch-committed-test.")
def tearDown(self) -> None:
import shutil
shutil.rmtree(self._tmp, ignore_errors=True)
def _run_launch(
self,
plan: DockerBottlePlan,
*,
committed_tag: str | None = None,
image_present: bool = True,
) -> list[str]:
"""Drive launch() through its full sequence with the committed-image
behaviour controlled by the arguments. Returns the images that were
passed to `build_image` (empty list if it was never called)."""
built: list[str] = []
def fake_build(image: str, ctx: str, *, dockerfile: str = "") -> None:
del ctx, dockerfile
built.append(image)
with mock.patch.object(
launch_mod, "read_committed_image", return_value=committed_tag,
), mock.patch.object(
launch_mod.docker_mod, "image_exists", return_value=image_present,
), mock.patch.object(
launch_mod.docker_mod, "build_image", side_effect=fake_build,
), mock.patch.object(
launch_mod, "egress_tls_init",
return_value=(Path("/egress_ca"), Path("/egress_cert")),
), mock.patch.object(
launch_mod.network_mod, "network_name_for_slug",
return_value="bb-internal",
), mock.patch.object(
launch_mod.network_mod, "network_egress_name_for_slug",
return_value="bb-egress",
), mock.patch.object(
launch_mod, "bottle_plan_to_compose",
return_value={"services": {"agent": {}}},
), mock.patch.object(
launch_mod, "write_compose_file",
return_value=Path("/tmp/compose.yml"),
), mock.patch.object(launch_mod, "compose_up"), \
mock.patch.object(launch_mod, "compose_dump_logs"), \
mock.patch.object(launch_mod, "compose_down"), \
contextlib.redirect_stderr(io.StringIO()):
provision = mock.Mock(return_value=None)
with launch_mod.launch(plan, provision=provision):
pass
return built
def test_skips_build_when_committed_image_present(self) -> None:
plan = _plan(self._tmp)
built = self._run_launch(plan, committed_tag=_COMMITTED_TAG, image_present=True)
self.assertEqual([], built, "build_image should not be called when committed image exists")
def test_uses_committed_image_in_compose_spec(self) -> None:
"""The compose spec renderer receives the committed image tag via
plan.image captured here by checking what bottle_plan_to_compose
was called with."""
plan = _plan(self._tmp)
captured_plans: list[DockerBottlePlan] = []
def fake_compose(p: DockerBottlePlan) -> dict[str, Any]:
captured_plans.append(p)
return {"services": {"agent": {}}}
with mock.patch.object(
launch_mod, "read_committed_image", return_value=_COMMITTED_TAG,
), mock.patch.object(
launch_mod.docker_mod, "image_exists", return_value=True,
), mock.patch.object(
launch_mod.docker_mod, "build_image",
), mock.patch.object(
launch_mod, "egress_tls_init",
return_value=(Path("/egress_ca"), Path("/egress_cert")),
), mock.patch.object(
launch_mod.network_mod, "network_name_for_slug",
return_value="bb-internal",
), mock.patch.object(
launch_mod.network_mod, "network_egress_name_for_slug",
return_value="bb-egress",
), mock.patch.object(
launch_mod, "bottle_plan_to_compose", side_effect=fake_compose,
), mock.patch.object(
launch_mod, "write_compose_file",
return_value=Path("/tmp/compose.yml"),
), mock.patch.object(launch_mod, "compose_up"), \
mock.patch.object(launch_mod, "compose_dump_logs"), \
mock.patch.object(launch_mod, "compose_down"), \
contextlib.redirect_stderr(io.StringIO()):
provision = mock.Mock(return_value=None)
with launch_mod.launch(plan, provision=provision):
pass
self.assertEqual(1, len(captured_plans))
self.assertEqual(_COMMITTED_TAG, captured_plans[0].image)
def test_falls_back_to_build_when_no_committed_image(self) -> None:
plan = _plan(self._tmp)
built = self._run_launch(plan, committed_tag=None)
self.assertEqual([_DEFAULT_IMAGE], built)
def test_falls_back_to_build_when_committed_image_missing_from_daemon(self) -> None:
plan = _plan(self._tmp)
built = self._run_launch(
plan, committed_tag=_COMMITTED_TAG, image_present=False,
)
self.assertEqual([_DEFAULT_IMAGE], built)
if __name__ == "__main__":
unittest.main()
-41
View File
@@ -67,46 +67,5 @@ class TestSave(unittest.TestCase):
)
class TestCommitContainer(unittest.TestCase):
def test_runs_docker_commit(self):
with patch.object(
docker_mod.subprocess, "run", return_value=_ok(),
) as run, patch.object(docker_mod, "info"):
docker_mod.commit_container(
"bot-bottle-dev-abc12",
"bot-bottle-committed-dev-abc12:latest",
)
argv = run.call_args.args[0]
self.assertEqual(
[
"docker", "commit",
"bot-bottle-dev-abc12",
"bot-bottle-committed-dev-abc12:latest",
],
argv,
)
def test_dies_on_docker_commit_failure(self):
with patch.object(
docker_mod.subprocess, "run", return_value=_fail("No such container"),
), patch.object(
docker_mod, "die", side_effect=SystemExit("die"),
) as die:
with self.assertRaises(SystemExit):
docker_mod.commit_container("missing-container", "some:tag")
die.assert_called_once()
self.assertIn("missing-container", die.call_args.args[0])
def test_die_message_includes_image_tag(self):
with patch.object(
docker_mod.subprocess, "run", return_value=_fail("boom"),
), patch.object(
docker_mod, "die", side_effect=SystemExit("die"),
) as die:
with self.assertRaises(SystemExit):
docker_mod.commit_container("ctr", "my-tag:v1")
self.assertIn("my-tag:v1", die.call_args.args[0])
if __name__ == "__main__":
unittest.main()
+11 -54
View File
@@ -2,15 +2,12 @@
add_route removed; docker exec / cp / kill paths are covered by the
integration test)."""
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
from bot_bottle import supervise
from bot_bottle.backend.egress_apply import EgressApplyError
from bot_bottle.backend.docker.egress_apply import applicator
from bot_bottle.backend.docker.egress_apply import (
EgressApplyError,
validate_routes_content,
)
_ROUTES_EMPTY = "routes: []\n"
@@ -19,11 +16,11 @@ _ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n'
class TestValidateRoutesContent(unittest.TestCase):
def test_accepts_minimal_route_table(self):
applicator.validate_routes_content(_ROUTES_EMPTY)
applicator.validate_routes_content(_ROUTES_ONE)
validate_routes_content(_ROUTES_EMPTY)
validate_routes_content(_ROUTES_ONE)
def test_accepts_full_route_with_matches(self):
applicator.validate_routes_content(
validate_routes_content(
'routes:\n'
' - host: "api.github.com"\n'
' auth_scheme: "Bearer"\n'
@@ -35,65 +32,25 @@ class TestValidateRoutesContent(unittest.TestCase):
def test_rejects_bad_yaml(self):
with self.assertRaises(EgressApplyError) as cm:
applicator.validate_routes_content("routes:\n\t- host: x\n")
validate_routes_content("routes:\n\t- host: x\n")
self.assertIn("not valid", str(cm.exception))
def test_rejects_missing_routes_key(self):
with self.assertRaises(EgressApplyError):
applicator.validate_routes_content("other: []\n")
validate_routes_content("other: []\n")
def test_rejects_non_list_routes(self):
with self.assertRaises(EgressApplyError):
applicator.validate_routes_content('routes: "not a list"\n')
validate_routes_content('routes: "not a list"\n')
def test_rejects_partial_auth_pair(self):
with self.assertRaises(EgressApplyError):
applicator.validate_routes_content(
validate_routes_content(
'routes:\n'
' - host: "x.example"\n'
' auth_scheme: "Bearer"\n'
)
class TestApplyRoutesChange(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="egress-apply-test.")
original = supervise.bot_bottle_root
def fake_root() -> Path:
return Path(self._tmp.name) / ".bot-bottle"
supervise.bot_bottle_root = fake_root # type: ignore[assignment]
self.addCleanup(lambda: setattr(supervise, "bot_bottle_root", original))
self.addCleanup(self._tmp.cleanup)
def test_writes_live_routes_and_signals_reload(self):
calls: list[list[str]] = []
def fake_run(argv: list[str], **kwargs: object) -> SimpleNamespace:
calls.append(list(argv))
return SimpleNamespace(returncode=0, stdout="", stderr="")
with patch(
"bot_bottle.backend.docker.egress_apply.subprocess.run",
side_effect=fake_run,
):
before, after = applicator.apply_routes_change(
"dev",
"routes:\n - host: google.com\n",
)
self.assertEqual("", before)
self.assertEqual("routes:\n - host: google.com\n", after)
self.assertEqual(
"routes:\n - host: google.com\n",
(Path(self._tmp.name) / ".bot-bottle/state/dev/egress/routes.yaml").read_text(encoding="utf-8"),
)
self.assertEqual(
["docker", "kill", "--signal", "HUP", "bot-bottle-sidecars-dev"],
calls[0],
)
if __name__ == "__main__":
unittest.main()
+4 -49
View File
@@ -2,32 +2,26 @@
from __future__ import annotations
import sys
import unittest
from unittest.mock import patch
from bot_bottle.backend.macos_container import bottle as bottle_mod
from bot_bottle.backend.macos_container.bottle import MacosContainerBottle, _PTY_FORWARD_SCRIPT
from bot_bottle.backend.macos_container.bottle import MacosContainerBottle
class TestMacosContainerBottle(unittest.TestCase):
def test_agent_argv_uses_pty_forward_and_container_exec(self):
def test_agent_argv_uses_container_exec(self):
bottle = MacosContainerBottle(
"bot-bottle-dev-abc",
lambda: None,
None,
agent_command="codex",
)
with patch.dict(bottle_mod.os.environ, {}, clear=True):
argv = bottle.agent_argv(["run"])
self.assertEqual(
[
sys.executable, _PTY_FORWARD_SCRIPT, "--",
"container", "exec", "--interactive", "--tty",
"--env", "TERM",
"bot-bottle-dev-abc", "codex", "run",
],
argv,
bottle.agent_argv(["run"]),
)
def test_agent_argv_includes_workdir(self):
@@ -37,54 +31,15 @@ class TestMacosContainerBottle(unittest.TestCase):
None,
agent_workdir="/home/node/workspace",
)
with patch.dict(bottle_mod.os.environ, {}, clear=True):
argv = bottle.agent_argv([])
self.assertEqual(
[
sys.executable, _PTY_FORWARD_SCRIPT, "--",
"container", "exec", "--interactive", "--tty",
"--env", "TERM",
"--workdir", "/home/node/workspace",
"bot-bottle-dev-abc", "claude",
],
argv,
bottle.agent_argv([]),
)
def test_agent_argv_forwards_terminal_env_names_without_values(self):
bottle = MacosContainerBottle("bot-bottle-dev-abc", lambda: None, None)
with patch.dict(
bottle_mod.os.environ,
{
"TERM": "screen-256color",
"TERM_PROGRAM": "WezTerm",
"WEZTERM_PANE": "pane-id",
"SHELL": "/bin/zsh",
},
clear=True,
):
argv = bottle.agent_argv([])
self.assertIn("TERM", argv)
self.assertIn("TERM_PROGRAM", argv)
self.assertIn("WEZTERM_PANE", argv)
self.assertNotIn("SHELL", argv)
self.assertNotIn("TERM=screen-256color", argv)
self.assertNotIn("TERM_PROGRAM=WezTerm", argv)
self.assertNotIn("WEZTERM_PANE=pane-id", argv)
def test_agent_argv_always_forwards_term_name(self):
bottle = MacosContainerBottle("bot-bottle-dev-abc", lambda: None, None)
with patch.dict(bottle_mod.os.environ, {}, clear=True):
argv = bottle.agent_argv([])
self.assertIn("TERM", argv)
def test_agent_argv_no_tty_omits_wrapper_and_tty_flags(self):
bottle = MacosContainerBottle("bot-bottle-dev-abc", lambda: None, None)
argv = bottle.agent_argv([], tty=False)
self.assertNotIn("--tty", argv)
self.assertNotIn("--env", argv)
self.assertNotIn(_PTY_FORWARD_SCRIPT, argv)
self.assertEqual(["container", "exec", "bot-bottle-dev-abc", "claude"], argv)
def test_exec_pipes_script_to_shell(self):
bottle = MacosContainerBottle("bot-bottle-dev-abc", lambda: None, None)
with patch("bot_bottle.backend.macos_container.bottle.subprocess.run") as run:
+7 -81
View File
@@ -9,12 +9,8 @@ from types import SimpleNamespace
from typing import cast
from unittest.mock import patch
from bot_bottle.agent_provider import AgentProvisionPlan
from bot_bottle.backend import BottleSpec
from bot_bottle.backend.macos_container import launch
from bot_bottle.backend.macos_container.bottle_plan import MacosContainerBottlePlan
from bot_bottle.egress import EgressPlan
from bot_bottle.git_gate import GitGatePlan
from bot_bottle.manifest import ManifestIndex
_MANIFEST = ManifestIndex.from_json_obj({
@@ -31,7 +27,7 @@ def _plan(
agent_git_gate_url: str = "",
agent_supervise_url: str = "",
) -> MacosContainerBottlePlan:
routes_path = stage_dir / "routes.yaml"
routes_path = stage_dir / "source-routes.yaml"
routes_path.write_text("routes: []\n", encoding="utf-8")
ca_dir = stage_dir / "egress-ca"
ca_dir.mkdir(exist_ok=True)
@@ -129,10 +125,15 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
f"type=bind,source={self.stage_dir / 'egress-ca'},target=/home/mitmproxy/.mitmproxy",
argv,
)
routes_dir = self.stage_dir / "macos-container-egress"
self.assertIn(
f"type=bind,source={self.stage_dir},target=/etc/egress,readonly",
f"type=bind,source={routes_dir},target=/etc/egress,readonly",
argv,
)
self.assertEqual(
"routes: []\n",
(routes_dir / "routes.yaml").read_text(encoding="utf-8"),
)
self.assertIn(
"type=bind,source=/state/supervise/queue,target=/run/supervise/queue",
argv,
@@ -265,80 +266,5 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
)
def _build_plan(stage_dir: Path) -> MacosContainerBottlePlan:
return MacosContainerBottlePlan(
spec=cast(BottleSpec, SimpleNamespace()),
manifest=_MANIFEST,
stage_dir=stage_dir,
git_gate_plan=cast(GitGatePlan, SimpleNamespace(upstreams=())),
egress_plan=cast(EgressPlan, SimpleNamespace()),
supervise_plan=None,
agent_provision=AgentProvisionPlan(
template="claude",
command="claude",
prompt_mode="append_file",
image="bot-bottle-agent:latest",
dockerfile="/repo/Dockerfile",
guest_home="/home/node",
instance_name="bot-bottle-dev-abc",
prompt_file=stage_dir / "prompt.txt",
guest_env={},
),
slug="dev-abc",
forwarded_env={},
)
class TestMacosContainerLaunchCommittedImage(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory()
self.stage_dir = Path(self._tmp.name)
def tearDown(self):
self._tmp.cleanup()
def test_build_images_uses_committed_image_when_present(self):
plan = _build_plan(self.stage_dir)
calls = []
def fake_build(image: str, context: str, *, dockerfile: str = "") -> None:
calls.append((image, context, dockerfile))
with patch.object(
launch, "read_committed_image",
return_value="bot-bottle-committed-dev-abc:latest",
), patch.object(
launch.container_mod, "image_exists", return_value=True,
), patch.object(
launch.container_mod, "build_image", side_effect=fake_build,
), patch.object(launch, "info"):
updated = launch._build_images(plan)
self.assertEqual("bot-bottle-committed-dev-abc:latest", updated.image)
self.assertEqual(1, len(calls))
self.assertEqual(launch.SIDECAR_BUNDLE_IMAGE, calls[0][0])
def test_build_images_builds_agent_when_committed_image_missing(self):
plan = _build_plan(self.stage_dir)
calls = []
def fake_build(image: str, context: str, *, dockerfile: str = "") -> None:
calls.append((image, context, dockerfile))
with patch.object(
launch, "read_committed_image",
return_value="bot-bottle-committed-dev-abc:latest",
), patch.object(
launch.container_mod, "image_exists", return_value=False,
), patch.object(
launch.container_mod, "build_image", side_effect=fake_build,
):
updated = launch._build_images(plan)
self.assertEqual("bot-bottle-agent:latest", updated.image)
self.assertEqual(2, len(calls))
self.assertEqual("bot-bottle-agent:latest", calls[1][0])
if __name__ == "__main__":
unittest.main()
@@ -1,159 +0,0 @@
"""Unit: macos-container pty_forward raw-mode wrapper (issue #245).
Tests argument parsing, non-TTY fallback, and the raw-mode
setup/restore sequence without requiring a real terminal.
"""
from __future__ import annotations
import io
import termios
import unittest
from unittest.mock import ANY, MagicMock, patch
from bot_bottle.backend.macos_container import pty_forward
def _fake_stdin(fd: int = 0) -> MagicMock:
"""Return a mock stdin whose fileno() returns *fd*."""
m = MagicMock()
m.fileno.return_value = fd
return m
class TestArgvParsing(unittest.TestCase):
def test_missing_separator_returns_error_exit_code(self):
with patch.object(pty_forward.sys, "stderr", new=io.StringIO()) as err:
rc = pty_forward.main(["container", "exec"])
self.assertEqual(2, rc)
self.assertIn("usage:", err.getvalue())
def test_too_few_args_returns_error_exit_code(self):
with patch.object(pty_forward.sys, "stderr", new=io.StringIO()):
self.assertEqual(2, pty_forward.main([]))
self.assertEqual(2, pty_forward.main(["--"]))
def test_separator_at_start_with_inner_is_valid(self):
with (
patch.object(pty_forward.sys, "stdin", _fake_stdin()),
patch.object(pty_forward.os, "isatty", return_value=False),
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 0
rc = pty_forward.main(["--", "container", "exec"])
self.assertEqual(0, rc)
run.assert_called_once()
self.assertEqual(["container", "exec"], run.call_args.args[0])
self.assertFalse(run.call_args.kwargs["check"])
class TestNonTtyFallback(unittest.TestCase):
def test_non_tty_stdin_runs_inner_directly(self):
with (
patch.object(pty_forward.sys, "stdin", _fake_stdin()),
patch.object(pty_forward.os, "isatty", return_value=False),
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 42
rc = pty_forward.main(
["--", "container", "exec", "--interactive", "--tty", "c", "claude"]
)
self.assertEqual(42, rc)
run.assert_called_once()
self.assertEqual(
["container", "exec", "--interactive", "--tty", "c", "claude"],
run.call_args.args[0],
)
self.assertFalse(run.call_args.kwargs["check"])
def test_fileno_error_runs_inner_directly(self):
bad_stdin = MagicMock()
bad_stdin.fileno.side_effect = OSError("pseudofile")
with (
patch.object(pty_forward.sys, "stdin", bad_stdin),
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 0
rc = pty_forward.main(["--", "container", "exec"])
run.assert_called_once()
self.assertEqual(["container", "exec"], run.call_args.args[0])
self.assertFalse(run.call_args.kwargs["check"])
self.assertEqual(0, rc)
class TestRawModeSetupAndRestore(unittest.TestCase):
def test_tty_stdin_sets_raw_mode_and_restores_on_exit(self):
saved_attrs = object()
with (
patch.object(pty_forward.sys, "stdin", _fake_stdin()),
patch.object(pty_forward.os, "isatty", return_value=True),
patch.object(pty_forward.termios, "tcgetattr", return_value=saved_attrs),
patch.object(pty_forward.tty, "setraw") as setraw,
patch.object(pty_forward.termios, "tcsetattr") as tcsetattr,
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 0
rc = pty_forward.main(["--", "container", "exec"])
self.assertEqual(0, rc)
setraw.assert_called_once()
tcsetattr.assert_called_once_with(
ANY, termios.TCSADRAIN, saved_attrs,
)
def test_tty_restores_on_subprocess_nonzero_exit(self):
saved_attrs = object()
with (
patch.object(pty_forward.sys, "stdin", _fake_stdin()),
patch.object(pty_forward.os, "isatty", return_value=True),
patch.object(pty_forward.termios, "tcgetattr", return_value=saved_attrs),
patch.object(pty_forward.tty, "setraw"),
patch.object(pty_forward.termios, "tcsetattr") as tcsetattr,
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 1
rc = pty_forward.main(["--", "container", "exec"])
self.assertEqual(1, rc)
tcsetattr.assert_called_once_with(
ANY, termios.TCSADRAIN, saved_attrs,
)
def test_tcgetattr_error_falls_back_to_bare_run(self):
with (
patch.object(pty_forward.sys, "stdin", _fake_stdin()),
patch.object(pty_forward.os, "isatty", return_value=True),
patch.object(
pty_forward.termios, "tcgetattr",
side_effect=termios.error("not a tty"),
),
patch.object(pty_forward.tty, "setraw") as setraw,
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 0
rc = pty_forward.main(["--", "container", "exec"])
setraw.assert_not_called()
run.assert_called_once()
self.assertEqual(["container", "exec"], run.call_args.args[0])
self.assertFalse(run.call_args.kwargs["check"])
self.assertEqual(0, rc)
def test_inner_run_sets_term_default_without_mutating_process_env(self):
with (
patch.dict(pty_forward.os.environ, {}, clear=True),
patch.object(pty_forward.subprocess, "run") as run,
):
run.return_value.returncode = 0
rc = pty_forward._run_inner(["container", "exec"])
self.assertNotIn("TERM", pty_forward.os.environ)
self.assertEqual(0, rc)
child_env = run.call_args.kwargs["env"]
self.assertEqual(["TERM"], sorted(child_env.keys()))
self.assertEqual("xterm-256color", child_env["TERM"])
if __name__ == "__main__":
unittest.main()
-47
View File
@@ -73,53 +73,6 @@ resolver #2
)
self.assertTrue(run.call_args_list[-1].kwargs["check"])
def test_commit_container_execs_tar_and_builds_image(self):
# stderr is bytes because subprocess.run uses stderr=PIPE without text=True
completed = util.subprocess.CompletedProcess(
args=[], returncode=0, stdout=b"", stderr=b"",
)
dockerfile_text = ""
def fake_build_image(image_tag: str, context: str, *, dockerfile: str = "") -> None:
nonlocal dockerfile_text
with open(dockerfile, encoding="utf-8") as f:
dockerfile_text = f.read()
with patch.object(util.subprocess, "run", return_value=completed) as run, \
patch.object(util, "build_image", side_effect=fake_build_image) as build_image, \
patch.object(util, "info"):
util.commit_container(
"bot-bottle-dev-abc12",
"bot-bottle-committed-dev-abc12:latest",
)
argv = run.call_args.args[0]
self.assertEqual("container", argv[0])
self.assertEqual("exec", argv[1])
self.assertIn("bot-bottle-dev-abc12", argv)
self.assertIn("tar", argv)
self.assertIn("--directory=/", argv)
build_image.assert_called_once()
self.assertEqual(
"bot-bottle-committed-dev-abc12:latest",
build_image.call_args.args[0],
)
self.assertIn("ADD rootfs.tar /\n", dockerfile_text)
self.assertIn("USER node\n", dockerfile_text)
self.assertIn("WORKDIR /home/node\n", dockerfile_text)
def test_commit_container_dies_on_exec_tar_failure(self):
failed = util.subprocess.CompletedProcess(
args=[], returncode=1, stdout=b"", stderr=b"No such container",
)
with patch.object(util.subprocess, "run", return_value=failed), \
patch.object(util, "die", side_effect=SystemExit("die")) as die:
with self.assertRaises(SystemExit):
util.commit_container("missing-container", "some:tag")
die.assert_called_once()
self.assertIn("missing-container", die.call_args.args[0])
def test_build_image_restarts_builder_when_dns_mismatches(self):
status = util.subprocess.CompletedProcess(
args=[],
@@ -16,8 +16,6 @@ from __future__ import annotations
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from typing import Any, cast
from unittest.mock import patch
from bot_bottle.backend.smolmachines import launch as _launch_mod
@@ -143,46 +141,5 @@ class TestEnsureSmolmachine(unittest.TestCase):
self.assertTrue(str(pack_args[1]).endswith(f"{digest}.smolmachine"))
class TestAgentFromPath(unittest.TestCase):
def _plan(self) -> Any:
return cast(Any, SimpleNamespace(
slug="dev-abc12",
agent_image="bot-bottle-claude:latest",
agent_dockerfile_path="/repo/Dockerfile",
))
def test_uses_committed_artifact_when_present(self):
with tempfile.TemporaryDirectory(prefix="committed-smolmachine.") as tmp:
artifact = Path(tmp) / "committed-smolmachine.smolmachine"
artifact.write_text("")
with patch.object(
_launch_mod, "read_committed_image", return_value=str(artifact),
), patch.object(
_launch_mod, "_ensure_smolmachine",
) as ensure, patch.object(
_launch_mod, "info",
):
result = _launch_mod._agent_from_path(self._plan())
self.assertEqual(artifact, result)
ensure.assert_not_called()
def test_falls_back_when_committed_artifact_missing(self):
packed = Path("/cache/agent.smolmachine")
with patch.object(
_launch_mod, "read_committed_image",
return_value="/missing/committed.smolmachine",
), patch.object(
_launch_mod, "_ensure_smolmachine", return_value=packed,
) as ensure:
result = _launch_mod._agent_from_path(self._plan())
self.assertEqual(packed, result)
ensure.assert_called_once_with(
"bot-bottle-claude:latest",
dockerfile="/repo/Dockerfile",
)
if __name__ == "__main__":
unittest.main()
-20
View File
@@ -24,7 +24,6 @@ from bot_bottle.backend.smolmachines.smolvm import (
machine_start,
machine_stop,
pack_create,
pack_create_from_vm,
wait_exec_ready,
)
@@ -64,17 +63,6 @@ class TestArgvShapes(unittest.TestCase):
argv,
)
def test_pack_create_from_vm_argv(self):
with self._patch_run() as m:
pack_create_from_vm("bot-bottle-dev-abc12", Path("/tmp/committed"))
argv = m.call_args.args[0]
self.assertEqual(
["smolvm", "pack", "create",
"--from-vm", "bot-bottle-dev-abc12",
"-o", "/tmp/committed"],
argv,
)
def test_machine_create_minimal(self):
with self._patch_run() as m:
machine_create("agent-xyz")
@@ -205,14 +193,6 @@ class TestErrorPath(unittest.TestCase):
with self.assertRaises(SmolvmError):
pack_create("missing:tag", Path("/tmp/out"))
def test_pack_create_from_vm_failure_raises(self):
with patch(
"bot_bottle.backend.smolmachines.smolvm.subprocess.run",
return_value=_fail("pack failed"),
):
with self.assertRaises(SmolvmError):
pack_create_from_vm("bot-bottle-dev-abc12", Path("/tmp/out"))
def test_exec_failure_returns_result(self):
# The in-VM command's exit code is what Bottle.exec sees;
# `false` exiting non-zero is not a smolvm failure.
+3 -10
View File
@@ -317,22 +317,15 @@ class TestToolConstants(unittest.TestCase):
def test_tools_tuple_matches_individual_constants(self):
self.assertEqual(
(
supervise.TOOL_ALLOW,
TOOL_CAPABILITY_BLOCK,
supervise.TOOL_EGRESS_BLOCK,
supervise.TOOL_LIST_EGRESS_ROUTES,
),
supervise.TOOLS,
)
def test_component_map_has_egress_entries(self):
self.assertEqual(
{
supervise.TOOL_ALLOW: "egress",
supervise.TOOL_EGRESS_BLOCK: "egress",
},
supervise.COMPONENT_FOR_TOOL,
)
def test_component_map_has_no_entries(self):
# egress-block removed in issue #198; capability-block never had one.
self.assertEqual({}, supervise.COMPONENT_FOR_TOOL)
class _StubSupervise(supervise.Supervise):
+3 -19
View File
@@ -2,6 +2,9 @@
The curses TUI itself isn't exercised here — these tests cover the
discovery + approve/reject paths that the TUI's key handlers call into.
egress-block (add_route) was removed in issue #198; the TestEgressApplyWiring
class and all stubs for add_route have been dropped accordingly.
"""
import os
@@ -9,7 +12,6 @@ import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import patch
from bot_bottle import supervise
from bot_bottle.cli import supervise as supervise_cli
@@ -31,8 +33,6 @@ FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
payloads = {
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
supervise.TOOL_ALLOW: "routes:\n - host: example.com\n",
supervise.TOOL_EGRESS_BLOCK: "routes:\n - host: example.com\n",
}
payload = payloads.get(tool, "")
return Proposal.new(
@@ -154,22 +154,6 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
supervise_cli.approve(qp)
self.assertEqual([], read_audit_entries("egress", "dev"))
def test_approve_egress_block_writes_audit_log(self):
qp = self._enqueue(tool=supervise.TOOL_EGRESS_BLOCK)
with patch(
"bot_bottle.cli.supervise.apply_routes_change",
return_value=("routes: []\n", "routes:\n - host: example.com\n"),
) as apply_routes_change:
supervise_cli.approve(qp)
apply_routes_change.assert_called_once_with(
"dev",
"routes:\n - host: example.com\n",
)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertEqual(STATUS_APPROVED, entries[0].operator_action)
self.assertEqual("needed for dev", entries[0].justification)
# class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
# # DISABLED — capability_apply functionality is currently commented out.
+5 -45
View File
@@ -54,19 +54,13 @@ class TestValidation(unittest.TestCase):
)
def test_empty_proposed_file_rejected_for_tools_with_file_field(self):
# egress-block has structured input (validated in
# _validate_and_bundle_egress_route, not here) and
# list-egress-routes takes no input. Only capability-block
# goes through `validate_proposed_file`.
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_CAPABILITY_BLOCK, " \n\t")
def test_egress_routes_yaml_is_validated(self):
validate_proposed_file(
_sv.TOOL_ALLOW,
"routes:\n - host: example.com\n",
)
def test_invalid_egress_routes_yaml_rejected(self):
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_EGRESS_BLOCK, "routes: nope\n")
# --- JSON-RPC parsing ------------------------------------------------------
@@ -147,9 +141,7 @@ class TestHandleToolsList(unittest.TestCase):
names = [t["name"] for t in result["tools"]] # type: ignore[index]
self.assertEqual(
sorted([
_sv.TOOL_ALLOW,
_sv.TOOL_CAPABILITY_BLOCK,
_sv.TOOL_EGRESS_BLOCK,
_sv.TOOL_LIST_EGRESS_ROUTES,
]),
sorted(names),
@@ -180,17 +172,6 @@ class TestHandleToolsList(unittest.TestCase):
# No `required` array because no inputs are required.
self.assertNotIn("required", schema) # type: ignore[operator]
def test_egress_tools_take_routes_yaml_and_justification(self):
for tool_name in (_sv.TOOL_ALLOW, _sv.TOOL_EGRESS_BLOCK):
with self.subTest(tool_name=tool_name):
tool = next(t for t in TOOL_DEFINITIONS if t["name"] == tool_name)
schema = tool["inputSchema"]
self.assertEqual("object", schema["type"]) # type: ignore[index]
self.assertEqual(
["routes_yaml", "justification"],
schema["required"], # type: ignore[index]
)
class TestHandleToolsCall(unittest.TestCase):
def setUp(self):
@@ -239,26 +220,6 @@ class TestHandleToolsCall(unittest.TestCase):
self.assertIn("status: approved", text)
self.assertIn("notes: lgtm", text)
def test_allow_round_trips_through_queue(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_APPROVED, notes="ok")
try:
result = handle_tools_call(
{
"name": _sv.TOOL_ALLOW,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "need example.com",
},
},
self.config,
)
finally:
responder.join()
self.assertFalse(result["isError"]) # type: ignore[index]
text = result["content"][0]["text"] # type: ignore[index]
self.assertIn("status: approved", text)
self.assertIn("notes: ok", text)
def test_rejected_response_sets_isError(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_REJECTED, notes="nope")
try:
@@ -451,8 +412,7 @@ class TestHttpEndToEnd(unittest.TestCase):
self.assertEqual(1, result["id"])
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
self.assertIn(_sv.TOOL_ALLOW, names)
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
self.assertNotIn("egress-block", names)
def test_unknown_method_returns_jsonrpc_error(self):
result = self._post_jsonrpc(