Compare commits

..

5 Commits

Author SHA1 Message Date
didericis-claude 61e101178d fix(types): resolve pyright errors introduced in #269 changes
lint / lint (push) Successful in 1m47s
test / unit (pull_request) Successful in 30s
test / integration (pull_request) Successful in 16s
- manifest.py: remove unused load_bottle_chain_from_dir import
- manifest_extends.py: drop redundant ManifestEgressRoute annotation
- test_cli_start_selector.py: remove unused call import
- test_cli_tui.py: move Optional/constants to top, annotate FakeScreen,
  remove unused curses import
- test_manifest_bottle_merge.py: add type args to dict, annotate **kwargs
2026-06-25 03:46:32 -04:00
didericis-claude 1ca00d8f30 feat(tui): add reordering to filter_multiselect
Tab switches focus to the selected-order panel; K/J shift the
highlighted item up/down; Space/Enter removes it. The filter list dims
while the order panel is active. Help line updates per focus mode.
2026-06-25 03:46:32 -04:00
didericis-claude dc8978a309 docs(prd): activate PRD for separate agent/bottle selection 2026-06-25 03:46:32 -04:00
didericis-claude c28f3609fc feat(#269): separate agent and bottle selection at launch time
- `bottle:` in agent frontmatter is now optional; agents without it
  are portable and require bottles to be selected at launch.
- Adds `filter_multiselect` to `tui.py`: multi-select picker with
  ordered selection list, Space/Enter to toggle, Ctrl-D to confirm.
- `ManifestIndex` gains `all_bottle_names` and `load_for_agent` accepts
  `bottle_names: tuple[str, ...]` to merge bottles in order at runtime.
- `merge_bottles_runtime` in `manifest_extends.py` applies the same
  field-merge rules as `extends:` to pre-resolved bottle objects.
- `BottleSpec` gains `bottle_names`; `_validate` and `write_launch_metadata`
  thread it through so `resume` replays the same bottle configuration.
- `cmd_start` shows the bottle multiselect after agent selection,
  pre-populated from the agent's `bottle:` field when present.
- Existing agents with `bottle:` declared continue to work unchanged.
2026-06-25 03:46:32 -04:00
didericis-claude 637ab4e89a docs(prd): draft PRD for separate agent/bottle selection
Closes #269.
2026-06-25 03:46:32 -04:00
50 changed files with 637 additions and 1480 deletions
-9
View File
@@ -1,9 +0,0 @@
[run]
branch = True
source = .
[report]
omit =
bot_bottle/egress_addon.py
bot_bottle/cli/tui.py
tests/*
+1 -7
View File
@@ -39,14 +39,8 @@ jobs:
with:
python-version: "3.12"
- name: Install dev requirements
run: python3 -m pip install -r requirements-dev.txt
- name: Run unit tests
run: python3 -m coverage run -m unittest discover -t . -s tests/unit -v
- name: Report unit coverage
run: python3 -m coverage report -m
run: python3 -m unittest discover -t . -s tests/unit -v
integration:
runs-on: ubuntu-latest
+2 -15
View File
@@ -8,7 +8,6 @@ on:
- '**.py'
- '.pylintrc'
- 'pyrightconfig.json'
- '.coveragerc'
workflow_dispatch:
jobs:
@@ -46,19 +45,10 @@ jobs:
echo "errors=$ERRORS" >> $GITHUB_OUTPUT
echo "Pyright errors: $ERRORS"
- name: Run coverage and extract percentage
id: coverage
run: |
python -m coverage run -m unittest discover -t . -s tests/unit > /dev/null 2>&1 || true
PERCENT=$(python -m coverage report 2>/dev/null | grep '^TOTAL' | grep -oP '\d+(?=%)' | tail -1)
echo "percent=$PERCENT" >> $GITHUB_OUTPUT
echo "Coverage: $PERCENT%"
- name: Update badges in README
run: |
PYLINT_SCORE="${{ steps.pylint.outputs.score }}"
PYRIGHT_ERRORS="${{ steps.pyright.outputs.errors }}"
COVERAGE_PERCENT="${{ steps.coverage.outputs.percent }}"
PYLINT_SCORE_ENCODED=$(echo "$PYLINT_SCORE" | sed 's|/|%2F|g')
@@ -68,12 +58,9 @@ jobs:
if [ -n "$PYRIGHT_ERRORS" ]; then
sed -i "s|/badge/pyright-[^)]*|/badge/pyright-${PYRIGHT_ERRORS}%20errors-brightgreen|" README.md
fi
if [ -n "$COVERAGE_PERCENT" ]; then
sed -i "s|/badge/coverage-[^)]*|/badge/coverage-${COVERAGE_PERCENT}%25-brightgreen|" README.md
fi
echo "Updated badges:"
grep -E "pylint|pyright|coverage" README.md | head -3
grep -E "pylint|pyright" README.md | head -2
- name: Commit and push badge updates
run: |
@@ -86,7 +73,7 @@ jobs:
else
echo "Badge changes detected, committing..."
git add README.md
MSG="chore: update quality badges"$'\n\n'"- Pylint: ${{ steps.pylint.outputs.score }}"$'\n'"- Pyright: ${{ steps.pyright.outputs.errors }} errors"$'\n'"- Coverage: ${{ steps.coverage.outputs.percent }}%"$'\n\n'"[skip ci]"
MSG="chore: update quality badges"$'\n\n'"- Pylint: ${{ steps.pylint.outputs.score }}"$'\n'"- Pyright: ${{ steps.pyright.outputs.errors }} errors"$'\n\n'"[skip ci]"
git commit -m "$MSG"
git push
fi
-1
View File
@@ -22,4 +22,3 @@ venv/
.pytest_cache/
.mypy_cache/
.ruff_cache/
.coverage
-1
View File
@@ -7,7 +7,6 @@
[![test](https://gitea.dideric.is/didericis/bot-bottle/actions/workflows/test.yml/badge.svg?branch=main)](https://gitea.dideric.is/didericis/bot-bottle/actions?workflow=test.yml)
[![pylint](https://img.shields.io/badge/pylint-9.93%2F10-brightgreen)](https://github.com/PyCQA/pylint)
[![pyright](https://img.shields.io/badge/pyright-0%20errors-brightgreen)](https://github.com/microsoft/pyright)
[![coverage](https://img.shields.io/badge/coverage-79%25-brightgreen)](https://coverage.readthedocs.io/)
**Problem:** Developer wants to run a coding agent without supervision, but they don't want a prompt injected or misbehaving agent wrecking their environment or exfiltrating sensitive data.
@@ -0,0 +1,211 @@
"""capability_apply — host-side orchestrator for capability-block
remediation (PRD 0016).
On approval of a capability-block proposal, the dashboard calls
apply_capability_change(slug, new_dockerfile) which:
1. Snapshots the agent's transcript dir to
~/.bot-bottle/state/<slug>/transcript/ (best-effort).
2. Pushes the agent's working tree via `git push` (best-effort —
no upstream / no commits / no git repo all skip with a log).
3. Writes the new Dockerfile to
~/.bot-bottle/state/<slug>/Dockerfile (PRD 0016 Phase 1
state). The next `cli.py start <agent>` picks it up.
4. Force-removes the agent container + all sidecars + the
per-bottle networks. Idempotent — missing resources are not
errors.
Returns (before, after) Dockerfile contents so the dashboard can
record / render the diff. (capability-block has no audit log per
PRD 0013 — the per-bottle Dockerfile state is its own record.)
This is "fire-and-forget" from the agent's perspective: by the time
the dashboard writes the response file the supervise sidecar is
gone, so the agent's tool call connection drops without ever
receiving the response. The replacement agent (next manual
`cli.py start`) sees the new Dockerfile and starts from there.
v1 does not auto-relaunch — see PRD 0016's capability-block return
semantics open question.
"""
from __future__ import annotations
import shutil
import subprocess
from ...agent_provider import get_provider
from ...log import info, warn
from ...bottle_state import (
mark_preserved,
per_bottle_dockerfile,
transcript_snapshot_dir,
write_per_bottle_dockerfile,
)
from .sidecar_bundle import sidecar_bundle_container_name
# Agent home inside the container (per the repo Dockerfile's
# `USER node` + `WORKDIR /home/node`). Used to locate the transcript
# dir + the workspace dir for git push.
_AGENT_HOME_IN_CONTAINER = "/home/node"
_AGENT_TRANSCRIPT_IN_CONTAINER = f"{_AGENT_HOME_IN_CONTAINER}/.claude"
_AGENT_WORKSPACE_IN_CONTAINER = f"{_AGENT_HOME_IN_CONTAINER}/workspace"
# Per-bottle resource name patterns (mirroring prepare.py).
def _agent_container_name(slug: str) -> str:
return f"bot-bottle-{slug}"
def _per_bottle_container_names(slug: str) -> list[str]:
"""All container names that belong to this bottle. Missing
containers are silently skipped by the teardown helper, so it's
fine to include names that don't exist for a given bottle."""
return [
_agent_container_name(slug),
sidecar_bundle_container_name(slug),
]
def _per_bottle_network_names(slug: str) -> list[str]:
return [
f"bot-bottle-net-{slug}",
f"bot-bottle-egress-{slug}",
]
class CapabilityApplyError(RuntimeError):
"""Raised when the apply fails in a way that should keep the
proposal pending (so the operator can retry). Best-effort
failures (transcript snapshot, git push) do not raise — they
just log and proceed."""
# --- Public helpers --------------------------------------------------------
def fetch_current_dockerfile(slug: str) -> str:
"""Return the Dockerfile content the next `cli.py start <agent>`
would use for this bottle. If a per-bottle override exists, that
one; otherwise the repo's Dockerfile.
Used by the operator-edit verb to show the current source of
truth, and by apply_capability_change for the before-diff."""
override = per_bottle_dockerfile(slug)
if override is not None:
return override
repo_dockerfile = get_provider("claude").dockerfile
if repo_dockerfile.is_file():
return repo_dockerfile.read_text()
raise CapabilityApplyError(
f"no per-bottle Dockerfile for {slug} and no provider Dockerfile at "
f"{repo_dockerfile}"
)
def apply_capability_change(slug: str, new_dockerfile: str) -> tuple[str, str]:
"""End-to-end capability-block remediation. See module docstring
for the sequence. Returns (before, after) Dockerfile content."""
if not new_dockerfile.strip():
raise CapabilityApplyError("proposed Dockerfile is empty")
before = fetch_current_dockerfile(slug)
snapshot_transcript(slug)
_push_working_tree(slug)
write_per_bottle_dockerfile(slug, new_dockerfile)
# Set the preserve marker BEFORE teardown so cli.py's session-end
# cleanup sees it and keeps the state dir intact for the
# operator's `cli.py resume <identity>`. Without the marker the
# state dir would be deleted as part of normal session end.
mark_preserved(slug)
_teardown_bottle(slug)
return before, new_dockerfile
# --- Internals -------------------------------------------------------------
def snapshot_transcript(slug: str) -> None:
"""`docker cp` /home/node/.claude out of the agent container into
~/.bot-bottle/state/<slug>/transcript/. Best-effort: missing
container, missing dir, or cp error all log a warning and return.
The transcript is what `claude --resume` reads to pick up where
the agent left off.
Called from two places:
- capability-apply, before tearing the bottle down.
- cli.py's session-end path, before the launch context closes,
so a crash or normal exit also leaves a transcript on disk
(deleted along with the state dir on clean exit, kept on
crash or capability-block per the preserve marker)."""
container = _agent_container_name(slug)
dest = transcript_snapshot_dir(slug)
if dest.exists():
# Remove any prior snapshot so the new one is a clean copy.
shutil.rmtree(dest, ignore_errors=True)
dest.parent.mkdir(parents=True, exist_ok=True)
r = subprocess.run(
["docker", "cp", f"{container}:{_AGENT_TRANSCRIPT_IN_CONTAINER}", str(dest)],
capture_output=True, text=True, check=False,
)
if r.returncode != 0:
warn(
f"transcript snapshot skipped "
f"({(r.stderr or '').strip() or 'no transcript dir in container?'})"
)
return
info(f"transcript snapshotted to {dest}")
def _push_working_tree(slug: str) -> None:
"""`docker exec <agent> git push` from /home/node/workspace.
Best-effort: not-a-git-repo, no upstream, nothing-to-push, no
network all log a warning and return. The replacement bottle
will pick up whatever's actually upstream."""
container = _agent_container_name(slug)
r = subprocess.run(
[
"docker", "exec", container, "sh", "-c",
f"cd {_AGENT_WORKSPACE_IN_CONTAINER} && "
f"git rev-parse --is-inside-work-tree >/dev/null 2>&1 && "
f"git push origin HEAD 2>&1 || true",
],
capture_output=True, text=True, check=False,
)
if r.returncode != 0:
warn(
f"capability-apply: git push skipped "
f"({(r.stderr or '').strip() or 'docker exec failed'})"
)
return
output = (r.stdout or "").strip()
if output:
info(f"capability-apply: git push: {output}")
else:
info("capability-apply: git push ran (no output — likely not a git workspace)")
def _teardown_bottle(slug: str) -> None:
"""Force-remove all per-bottle docker resources. Idempotent —
`docker rm -f` / `docker network rm` silently ignore missing
names, so this can be called even mid-rebuild."""
info(f"capability-apply: tearing down bottle {slug}")
for name in _per_bottle_container_names(slug):
subprocess.run(
["docker", "rm", "-f", name],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False,
)
for net in _per_bottle_network_names(slug):
subprocess.run(
["docker", "network", "rm", net],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False,
)
__all__ = [
"CapabilityApplyError",
"apply_capability_change",
"fetch_current_dockerfile",
"snapshot_transcript",
]
+10
View File
@@ -34,6 +34,7 @@ from ...egress import (
from ...git_gate import GIT_GATE_HOSTNAME
from ...log import die, warn
from ...supervise import (
CURRENT_CONFIG_DIR_IN_AGENT,
QUEUE_DIR_IN_CONTAINER,
SUPERVISE_HOSTNAME,
SUPERVISE_PORT,
@@ -232,6 +233,15 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
if plan.use_runsc:
service["runtime"] = "runsc"
volumes: list[dict[str, Any]] = []
if plan.supervise_plan is not None:
volumes.append(_bind(
plan.supervise_plan.current_config_dir,
CURRENT_CONFIG_DIR_IN_AGENT,
))
if volumes:
service["volumes"] = volumes
# The init supervisor inside the bundle owns intra-bundle
# daemon ordering, so the agent only waits for the bundle
# container itself.
@@ -68,11 +68,6 @@ def build_image(ref: str, context: str, *, dockerfile: str = "") -> None:
_ensure_builder_dns()
args = [_CONTAINER, "build", "-t", ref, "--dns", dns_server()]
if dockerfile:
# `container build` resolves -f relative to the current working
# directory, not the build context. Anchor a relative Dockerfile to
# the context so builds work from any cwd.
if not os.path.isabs(dockerfile):
dockerfile = os.path.join(context, dockerfile)
args.extend(["-f", dockerfile])
args.append(context)
subprocess.run(args, check=True)
+16 -10
View File
@@ -1,7 +1,8 @@
"""Per-bottle persistent state.
"""Per-bottle persistent state (PRD 0016).
Holds optional per-bottle Dockerfile overrides, the transcript snapshot
the state-preservation helper saves before teardown, and the launch metadata that lets
Holds the per-bottle Dockerfile override that capability-block
remediation writes, the transcript snapshot the state-preservation
helper saves before teardown, and the launch metadata that lets
`cli.py resume <identity>` reconstruct a bottle's spec. State
lives at:
@@ -60,7 +61,7 @@ _METADATA_NAME = "metadata.json"
_LIVE_CONFIG_SUBDIR = "live-config"
LIVE_CONFIG_ROUTES_NAME = "routes.yaml"
LIVE_CONFIG_ALLOWLIST_NAME = "allowlist"
# Empty marker file. Session preservation writes it before teardown so
# Empty marker file. capability_apply writes it before teardown so
# cli.py's session-end cleanup knows to preserve the state dir for
# `cli.py resume <identity>`. Absent = clean up.
_PRESERVE_MARKER = ".preserve"
@@ -172,7 +173,8 @@ def per_bottle_dockerfile_path(identity: str) -> Path:
def per_bottle_dockerfile(identity: str) -> str | None:
"""Return the per-bottle Dockerfile content if present, else
None. None means: use the provider or manifest Dockerfile."""
None. None means: use the repo's Dockerfile (the original
pre-capability-block behavior)."""
p = per_bottle_dockerfile_path(identity)
if p.is_file():
return p.read_text()
@@ -256,7 +258,9 @@ def write_live_config(
def transcript_snapshot_dir(identity: str) -> Path:
"""Where agent session snapshots are kept for resume flows."""
"""Where capability_apply stashes the agent's transcript before
teardown, so the next `cli.py start <agent>` can offer to
resume from it."""
return bottle_state_dir(identity) / _TRANSCRIPT_SUBDIR
@@ -283,7 +287,8 @@ def git_gate_state_dir(identity: str) -> Path:
def supervise_state_dir(identity: str) -> Path:
"""State subdir reserved for supervise sidecar bind-mount sources.
"""State subdir for the supervise sidecar's current-config dir
(bind-mounted into the agent at /etc/bot-bottle/current-config).
The queue dir is intentionally NOT under here — it lives at
~/.bot-bottle/queue/<slug>/ alongside the audit logs, so it
survives state-dir cleanup."""
@@ -305,8 +310,9 @@ def preserve_marker_path(identity: str) -> Path:
def mark_preserved(identity: str) -> Path:
"""Mark this bottle's state for preservation across session
teardown so cli.py's session-end cleanup leaves the state dir
intact for a subsequent `cli.py resume`."""
teardown. Written by capability_apply.apply_capability_change so
cli.py's session-end cleanup leaves the state dir intact for a
subsequent `cli.py resume`."""
path = preserve_marker_path(identity)
path.parent.mkdir(parents=True, exist_ok=True)
path.touch()
@@ -319,7 +325,7 @@ def is_preserved(identity: str) -> bool:
def clear_preserve_marker(identity: str) -> None:
"""Idempotent removal. Called at fresh launch (start or resume)
so a marker left from a prior preserved session doesn't keep
so a marker left from a prior capability-block doesn't keep
state alive past the next normal session-end."""
try:
preserve_marker_path(identity).unlink()
+3 -2
View File
@@ -13,8 +13,9 @@ dirs are shared layout, so docker is the single owner of that
bucket.
State dirs with `.preserve` are intentionally never touched they
hold preserved sessions the operator may want to `resume`. Manual
`rm -rf ~/.bot-bottle/state/<identity>` is the path for those.
hold capability-block rebuilds or crash snapshots the operator may
want to `resume`. Manual `rm -rf ~/.bot-bottle/state/<identity>`
is the path for those.
"""
from __future__ import annotations
+5 -4
View File
@@ -4,12 +4,13 @@ Reads ~/.bot-bottle/state/<identity>/metadata.json to recover the
(agent_name, cwd, copy_cwd) the bottle was originally started with,
then runs the same launch core as `start` but pinned to the
recorded identity so the new bottle picks up any per-bottle Dockerfile
override and transcript snapshot under the same state dir.
(from capability-block apply) and transcript snapshot under the same
state dir.
Use case: an interrupted or preserved bottle needs to be relaunched;
the operator runs
Use case: an agent calls capability-block, the dashboard approves
and tears down the bottle, the operator runs
./cli.py resume <identity>
to bring up the replacement from the recorded state.
to bring up the replacement with the new capabilities baked in.
"""
from __future__ import annotations
+15 -115
View File
@@ -31,8 +31,9 @@ from ..bottle_state import (
is_preserved,
mark_preserved,
)
# from ..backend.docker.capability_apply import snapshot_transcript
from ..log import info
from ..manifest import Manifest, ManifestIndex
from ..manifest import ManifestIndex
from ._common import PROG, USER_CWD, read_tty_line
from . import tui
@@ -76,19 +77,16 @@ def cmd_start(argv: list[str]) -> int:
# Bottle multiselect: always show after agent selection so operators
# can compose bottles at launch time without editing agent manifests.
available_bottles = manifest.all_bottle_names
lineage_map = _bottle_lineage(manifest)
display_labels = [lineage_map.get(n, n) for n in available_bottles]
label_to_name = {lineage_map.get(n, n): n for n in available_bottles}
initial_bottle = _peek_agent_bottle(manifest, agent_name)
initial_labels = [lineage_map.get(initial_bottle, initial_bottle)] if initial_bottle else []
selected_labels = tui.filter_multiselect(
display_labels,
initial_bottles = [initial_bottle] if initial_bottle else []
selected_bottles = tui.filter_multiselect(
available_bottles,
title="Select bottles",
initial=initial_labels,
initial=initial_bottles,
)
if selected_labels is None:
if selected_bottles is None:
return 0
bottle_names = tuple(label_to_name.get(lbl, lbl) for lbl in selected_labels)
bottle_names = tuple(selected_bottles)
label, color = tui.name_color_modal(default_label=agent_name)
label, color = _resolve_unique_label(label, color)
@@ -265,112 +263,10 @@ def _text_prompt_yes() -> bool:
def _text_render_preflight():
def _render(plan: DockerBottlePlan) -> None:
print(file=sys.stderr)
print(_manifest_to_yaml(plan.manifest), file=sys.stderr)
plan.print()
return _render
def _bottle_lineage(manifest: ManifestIndex) -> dict[str, str]:
"""Return {bottle_name: lineage_label} for bottles that have an extends chain.
Bottles without a parent are omitted (the caller falls back to the bare name).
Labels show the chain root-first: e.g. 'dev -> bot-bottle-dev -> claude-dev'."""
if manifest.home_md is None:
return {}
bottles_dir = manifest.home_md / "bottles"
if not bottles_dir.is_dir():
return {}
from ..yaml_subset import YamlSubsetError, parse_frontmatter
extends_of: dict[str, str] = {}
for path in bottles_dir.glob("*.md"):
try:
fm, _ = parse_frontmatter(path.read_text())
parent = fm.get("extends", "")
if isinstance(parent, str) and parent:
extends_of[path.stem] = parent
except (OSError, YamlSubsetError):
pass
labels: dict[str, str] = {}
for name in extends_of:
chain = [name]
seen = {name}
cur = name
while cur in extends_of:
par = extends_of[cur]
if par in seen:
break
chain.append(par)
seen.add(par)
cur = par
labels[name] = " -> ".join(reversed(chain))
return labels
def _manifest_to_yaml(manifest: Manifest) -> str:
"""Serialize the resolved Manifest to a YAML string for preflight display."""
lines: list[str] = []
agent = manifest.agent
lines.append("agent:")
if agent.skills:
lines.append(" skills:")
for s in agent.skills:
lines.append(f" - {s}")
if not agent.git_user.is_empty():
lines.append(" git-gate:")
lines.append(" user:")
if agent.git_user.name:
lines.append(f" name: {agent.git_user.name}")
if agent.git_user.email:
lines.append(f" email: {agent.git_user.email}")
bottle = manifest.bottle
lines.append("bottle:")
if bottle.agent_provider.template != "claude" or bottle.agent_provider.dockerfile:
lines.append(" agent_provider:")
lines.append(f" template: {bottle.agent_provider.template}")
if bottle.agent_provider.dockerfile:
lines.append(f" dockerfile: {bottle.agent_provider.dockerfile}")
if bottle.env:
lines.append(" env:")
for k, v in sorted(bottle.env.items()):
lines.append(f" {k}: {v}")
has_git_gate = not bottle.git_user.is_empty() or bottle.git
if has_git_gate:
lines.append(" git-gate:")
if not bottle.git_user.is_empty():
lines.append(" user:")
if bottle.git_user.name:
lines.append(f" name: {bottle.git_user.name}")
if bottle.git_user.email:
lines.append(f" email: {bottle.git_user.email}")
if bottle.git:
lines.append(" repos:")
for entry in bottle.git:
lines.append(f" {entry.Name}:")
lines.append(f" url: {entry.Upstream}")
if bottle.egress.routes:
lines.append(" egress:")
lines.append(" routes:")
for r in bottle.egress.routes:
lines.append(f" - host: {r.Host}")
if r.AuthScheme:
lines.append(f" auth:")
lines.append(f" scheme: {r.AuthScheme}")
lines.append(f" supervise: {'true' if bottle.supervise else 'false'}")
return "\n".join(lines)
def _launch_bottle(
spec: BottleSpec,
*,
@@ -408,8 +304,12 @@ def _launch_bottle(
)
# While the container is still alive: always snapshot the
# transcript and — if the agent exited non-zero — mark
# the state for preservation. This picks up crashes /
# Ctrl-Cs / OOM kills before cleanup removes the state dir.
# the state for preservation. Capability-block already
# did both before triggering teardown from the dashboard;
# this picks up crashes / Ctrl-Cs / OOM kills the same
# way. snapshot_transcript is best-effort so the
# capability-block path's prior snapshot isn't clobbered
# when the container is already gone.
if agent_provider_template == "claude":
capture_claude_session_state(identity, exit_code)
return 0
+36 -9
View File
@@ -2,8 +2,9 @@
act on them (approve / modify / reject).
Curses-based TUI; modify-then-approve shells out to $EDITOR. The
Egress proposals are queued for operator review as full routes.yaml
updates.
approval handler wires to PRD 0016 (capability-block), which rebuilds
the bottle Dockerfile. Egress proposals are queued for operator review
as full routes.yaml updates.
"""
from __future__ import annotations
@@ -21,6 +22,10 @@ from pathlib import Path
from .. import supervise as _supervise
from ..bottle_state import read_metadata
# from ..backend.docker.capability_apply import (
# CapabilityApplyError,
# apply_capability_change,
# )
from ..backend.docker.egress_apply import (
EgressApplyError,
applicator as _docker_applicator,
@@ -33,6 +38,10 @@ from ..backend.smolmachines.egress_apply import (
)
from ..log import Die, error, info
class CapabilityApplyError(RuntimeError):
"""Placeholder while capability_apply is disabled."""
from ..supervise import (
COMPONENT_FOR_TOOL,
AuditEntry,
@@ -41,10 +50,12 @@ from ..supervise import (
STATUS_APPROVED,
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_ALLOW,
TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
TOOL_EGRESS_TOKEN_ALLOW,
archive_proposal,
list_pending_proposals,
render_diff,
write_audit_entry,
@@ -72,7 +83,7 @@ class QueuedProposal:
# Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses.
ApplyError = (EgressApplyError,)
ApplyError = (CapabilityApplyError, EgressApplyError)
def apply_routes_change(slug: str, content: str) -> tuple[str, str]:
@@ -132,6 +143,8 @@ def _detail_lines(
def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
if tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
return ".yaml"
if tool in (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW):
@@ -153,6 +166,17 @@ def approve(
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
diff_before, diff_after = "", ""
# if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
# _meta = read_metadata(qp.proposal.bottle_slug)
# if _meta is not None and not _meta.compose_project:
# raise CapabilityApplyError(
# "capability-block remediation is not supported for smolmachines "
# "bottles. Reject this proposal or handle the capability change "
# "manually, then restart the bottle."
# )
# diff_before, diff_after = apply_capability_change(
# qp.proposal.bottle_slug, file_to_apply,
# )
if qp.proposal.tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
diff_before, diff_after = apply_routes_change(
qp.proposal.bottle_slug,
@@ -170,6 +194,9 @@ def approve(
qp, action=status, notes=notes,
diff_before=diff_before, diff_after=diff_after,
)
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
archive_proposal(qp.queue_dir, qp.proposal.id)
def reject(qp: QueuedProposal, *, reason: str) -> None:
"""Write a rejection response and an audit entry."""
@@ -319,7 +346,7 @@ def _list_once() -> int:
return 0
def _try_init_green() -> int: # pragma: no cover
def _try_init_green() -> int:
"""Initialise a green color pair and return its attr, or 0."""
try:
curses.start_color()
@@ -330,7 +357,7 @@ def _try_init_green() -> int: # pragma: no cover
return 0
def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore # pragma: no cover
def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
curses.curs_set(0)
stdscr.timeout(_REFRESH_INTERVAL_MS)
green_attr = _try_init_green()
@@ -420,7 +447,7 @@ def _render(
status_line: str,
*,
green_attr: int = 0, # noqa: F841 — unused, but required by interface
) -> None: # pragma: no cover
) -> None:
stdscr.erase()
h, w = stdscr.getmaxyx()
header = f"bot-bottle supervise ({len(pending)} pending)"
@@ -471,7 +498,7 @@ def _detail_view(
qp: QueuedProposal,
*,
green_attr: int = 0,
) -> None: # pragma: no cover
) -> None:
"""Render the full proposal. Scrollable. Press q to return."""
lines = _detail_lines(qp, green_attr=green_attr)
offset = 0
@@ -523,7 +550,7 @@ def _detail_view(
return
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore # pragma: no cover
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore
"""Suspend curses, open $EDITOR on the proposed file, return edited content."""
suffix = _suffix_for_tool(qp.proposal.tool)
curses.endwin()
@@ -534,7 +561,7 @@ def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
return edited
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore # pragma: no cover
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore
"""One-line input at the bottom of the screen."""
curses.curs_set(1)
h, _ = stdscr.getmaxyx()
+3 -7
View File
@@ -29,8 +29,7 @@ def filter_multiselect(
Returns the ordered list of selected items, or ``None`` if the user
cancelled (Esc / ``q`` / Ctrl-C / Ctrl-D with no items).
Press Space to toggle the item under the cursor.
Press Enter to confirm the current selection.
Press Space or Enter to toggle the item under the cursor.
Press Ctrl-D to confirm the current selection (returns even if empty).
Press Esc/q to cancel (returns None).
@@ -357,10 +356,7 @@ def _multiselect_loop(
continue
if focus == "filter":
if key in (curses.KEY_ENTER, _KEY_ENTER_ALT, ord("\r")):
return list(selected)
elif key == _KEY_SPACE:
if key in (curses.KEY_ENTER, _KEY_ENTER_ALT, ord("\r"), _KEY_SPACE):
if filtered:
item = filtered[cursor]
if item in selected:
@@ -504,7 +500,7 @@ def _render_multiselect(
row += 1
if focus == "filter":
help_line = "[↑↓/jk] move [Space] toggle [Enter] confirm [Tab] reorder [Esc/q] cancel"
help_line = "[↑↓/jk] move [Space/Enter] toggle [Tab] reorder [Ctrl-D] done [Esc/q] cancel"
else:
help_line = "[↑↓/jk] cursor [K/J] reorder [Space/Enter] remove [Tab] back [Ctrl-D] done"
if row < rows:
+1 -1
View File
@@ -21,7 +21,7 @@ FROM node:22-slim
# to it) works against egress's bumped TLS without the agent needing
# local DNS.
RUN apt-get update \
&& apt-get install -y --no-install-recommends git ca-certificates curl ripgrep \
&& apt-get install -y --no-install-recommends git ca-certificates curl \
&& rm -rf /var/lib/apt/lists/*
# App-specific deps. Python isn't required by claude-code itself
+1 -1
View File
@@ -6,7 +6,7 @@
FROM node:22-slim
RUN apt-get update \
&& apt-get install -y --no-install-recommends git ca-certificates curl procps ripgrep \
&& apt-get install -y --no-install-recommends git ca-certificates curl procps \
&& rm -rf /var/lib/apt/lists/*
# App-specific deps. Python isn't required by codex itself
@@ -21,11 +21,6 @@ from pathlib import Path
from ...deploy_key_provisioner import DeployKeyCollisionError, DeployKeyProvisioner
# Timeout for ssh-keygen and Gitea API HTTP calls. A hung Gitea instance at
# prepare time would stall bottle launch indefinitely without this bound.
_API_TIMEOUT_SECS = 30
_KEYGEN_TIMEOUT_SECS = 10
class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
"""Manages deploy keys on a Gitea instance."""
@@ -51,7 +46,6 @@ class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=_KEYGEN_TIMEOUT_SECS,
)
private_key = key_path.read_bytes()
public_key = key_path.with_suffix(".pub").read_text().strip()
@@ -73,7 +67,7 @@ class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS) as resp:
with urllib.request.urlopen(req) as resp:
body = json.loads(resp.read())
except urllib.error.HTTPError as exc:
_body = _read_error_body(exc)
@@ -104,7 +98,7 @@ class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
method="DELETE",
)
try:
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS):
with urllib.request.urlopen(req):
pass
except urllib.error.HTTPError as exc:
if exc.code == 404:
+10 -21
View File
@@ -210,17 +210,6 @@ def egress_token_env_map(
return out
def _yaml_str_escape(s: str) -> str:
"""Escape a string for use inside a YAML double-quoted scalar."""
return (
s.replace("\\", "\\\\")
.replace('"', '\\"')
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t")
)
def _route_to_yaml_fields(r: Route) -> dict[str, object]:
fields: dict[str, object] = {"host": r.host}
if r.auth_scheme and r.token_env:
@@ -283,12 +272,12 @@ def _render_match_entry(entry: dict[str, object]) -> list[str]:
for pd in entry["paths"]: # type: ignore[union-attr]
pd_dict: dict[str, str] = pd # type: ignore[assignment]
if "type" in pd_dict:
lines.append(f' - type: "{_yaml_str_escape(pd_dict["type"])}"')
lines.append(f' value: "{_yaml_str_escape(pd_dict["value"])}"')
lines.append(f' - type: "{pd_dict["type"]}"')
lines.append(f' value: "{pd_dict["value"]}"')
else:
lines.append(f' - value: "{_yaml_str_escape(pd_dict["value"])}"')
lines.append(f' - value: "{pd_dict["value"]}"')
if "methods" in entry:
methods_str = ", ".join(f'"{_yaml_str_escape(m)}"' for m in entry["methods"]) # type: ignore[union-attr]
methods_str = ", ".join(f'"{m}"' for m in entry["methods"]) # type: ignore[union-attr]
prefix = " - " if first_key else " "
lines.append(f'{prefix}methods: [{methods_str}]')
first_key = False
@@ -298,8 +287,8 @@ def _render_match_entry(entry: dict[str, object]) -> list[str]:
first_key = False
for hd in entry["headers"]: # type: ignore[union-attr]
hd_dict: dict[str, str] = hd # type: ignore[assignment]
lines.append(f' - name: "{_yaml_str_escape(hd_dict["name"])}"')
lines.append(f' value: "{_yaml_str_escape(hd_dict["value"])}"')
lines.append(f' - name: "{hd_dict["name"]}"')
lines.append(f' value: "{hd_dict["value"]}"')
if first_key:
lines.append(" - {}")
return lines
@@ -319,10 +308,10 @@ def egress_render_routes(
return "\n".join(lines) + "\n"
for r in routes:
f = _route_to_yaml_fields(r)
lines.append(f' - host: "{_yaml_str_escape(str(f["host"]))}"')
lines.append(f' - host: "{f["host"]}"')
if "auth_scheme" in f:
lines.append(f' auth_scheme: "{_yaml_str_escape(str(f["auth_scheme"]))}"')
lines.append(f' token_env: "{_yaml_str_escape(str(f["token_env"]))}"')
lines.append(f' auth_scheme: "{f["auth_scheme"]}"')
lines.append(f' token_env: "{f["token_env"]}"')
if "matches" in f:
lines.append(" matches:")
for entry in f["matches"]: # type: ignore[union-attr]
@@ -342,7 +331,7 @@ def egress_render_routes(
items_str = ", ".join(f'"{x}"' for x in dv)
lines.append(f" {dk}: [{items_str}]")
elif isinstance(dv, str):
lines.append(f' {dk}: "{_yaml_str_escape(dv)}"')
lines.append(f' {dk}: "{dv}"')
return "\n".join(lines) + "\n"
+6 -17
View File
@@ -43,10 +43,10 @@ from .manifest import ManifestBottle, ManifestGitEntry
# Short network alias for git-gate inside the sidecar bundle. The
# agent's `.gitconfig` insteadOf rewrites resolve through this name.
GIT_GATE_HOSTNAME = "git-gate"
# Shared timeout (seconds) for all git-gate subprocess and CGI calls:
# git daemon (--timeout/--init-timeout), the access-hook subprocess in
# git_http_backend, and the git http-backend CGI subprocess.
GIT_GATE_TIMEOUT_SECS = 15
# Bound half-open git client sessions. If an agent/tool runner is
# interrupted during push, git daemon should reap the receive-pack
# child instead of keeping the gate wedged indefinitely.
GIT_GATE_DAEMON_TIMEOUT_SECS = 15
@dataclass(frozen=True)
@@ -112,15 +112,6 @@ def git_gate_upstreams_for_bottle(bottle: ManifestBottle) -> tuple[GitGateUpstre
)
def _gitconfig_validate_value(field: str, value: str) -> None:
"""Raise ValueError if value contains characters that break gitconfig line syntax."""
if "\n" in value or "\r" in value:
raise ValueError(
f"git-gate: {field} contains a newline, which would inject "
f"arbitrary gitconfig keys; rejecting manifest entry"
)
def git_gate_render_gitconfig(
entries: tuple[ManifestGitEntry, ...], gate_host: str, *, scheme: str = "git",
) -> str:
@@ -145,7 +136,6 @@ def git_gate_render_gitconfig(
"# fetch-from-upstream-before-every-upload-pack via access-hook).\n",
]
for entry in entries:
_gitconfig_validate_value(f"repos[{entry.Name!r}].url", entry.Upstream)
out.append(f'[url "{scheme}://{gate_host}/{entry.Name}.git"]\n')
out.append(f"\tinsteadOf = {entry.Upstream}\n")
if entry.RemoteKey and entry.RemoteKey != entry.UpstreamHost:
@@ -158,7 +148,6 @@ def git_gate_render_gitconfig(
f"ssh://{entry.UpstreamUser}@{entry.RemoteKey}{port}/"
f"{entry.UpstreamPath}"
)
_gitconfig_validate_value(f"repos[{entry.Name!r}].url (resolved alias)", alias)
out.append(f"\tinsteadOf = {alias}\n")
return "".join(out)
@@ -228,8 +217,8 @@ def git_gate_render_entrypoint(upstreams: tuple[GitGateUpstream, ...]) -> str:
"",
"exec git daemon \\",
" --reuseaddr \\",
f" --timeout={GIT_GATE_TIMEOUT_SECS} \\",
f" --init-timeout={GIT_GATE_TIMEOUT_SECS} \\",
f" --timeout={GIT_GATE_DAEMON_TIMEOUT_SECS} \\",
f" --init-timeout={GIT_GATE_DAEMON_TIMEOUT_SECS} \\",
" --base-path=/git \\",
" --export-all \\",
" --enable=receive-pack \\",
+1 -11
View File
@@ -16,8 +16,6 @@ from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import urlsplit
from .git_gate import GIT_GATE_TIMEOUT_SECS
DEFAULT_PORT = 9420
@@ -49,7 +47,6 @@ class GitHttpHandler(BaseHTTPRequestHandler):
[hook_path, "upload-pack", str(repo_dir), peer, peer],
capture_output=True,
check=False,
timeout=GIT_GATE_TIMEOUT_SECS,
)
if hook.returncode != 0:
detail = (hook.stderr or hook.stdout).decode(
@@ -113,7 +110,6 @@ class GitHttpHandler(BaseHTTPRequestHandler):
env=env,
capture_output=True,
check=False,
timeout=GIT_GATE_TIMEOUT_SECS,
)
self._write_cgi_response(proc.stdout)
@@ -152,13 +148,7 @@ class GitHttpHandler(BaseHTTPRequestHandler):
key, _, value = line.decode("latin1").partition(":")
value = value.strip()
if key.lower() == "status":
try:
status = int(value.split()[0])
except (ValueError, IndexError):
self.log_message(
"malformed CGI Status header %r; using 500", value,
)
status = 500
status = int(value.split()[0])
else:
headers.append((key, value))
self.send_response(status)
+4 -2
View File
@@ -113,8 +113,10 @@ class ManifestBottle:
egress: ManifestEgressConfig = field(default_factory=ManifestEgressConfig)
# Per-bottle stuck-recovery sidecar (PRD 0013). When true (the
# default, issue #249), the launch step brings up a supervise
# sidecar that exposes egress MCP tools to the agent. Set
# `supervise: false` to skip the sidecar.
# sidecar that exposes MCP tools to the agent (egress-block,
# capability-block) plus mounts the current-config dir read-only
# into the agent at /etc/bot-bottle/current-config. Set
# `supervise: false` to skip the sidecar and mount.
supervise: bool = True
@classmethod
+18 -110
View File
@@ -101,125 +101,33 @@ def _resolve_one_bottle(
repos_cache[name] = _resolve_repos_raw({}, child_raw)
return bottle
# Normalize to list, accepting both str and list[str].
raw_list: list[object]
if isinstance(parent_name_raw, str):
raw_list = [parent_name_raw]
elif isinstance(parent_name_raw, list):
raw_list = parent_name_raw
else:
if not isinstance(parent_name_raw, str):
raise ManifestError(
f"bottle '{name}' extends must be a string or list of strings "
f"bottle '{name}' extends must be a string "
f"(was {type(parent_name_raw).__name__})"
)
# Validate each entry before resolving any of them.
parent_names: list[str] = []
for i, pname in enumerate(raw_list):
if not isinstance(pname, str):
raise ManifestError(
f"bottle '{name}' extends[{i}] must be a string "
f"(was {type(pname).__name__})"
)
parent_names.append(pname)
if pname == name:
raise ManifestError(
f"bottle '{name}' extends itself; remove the self-reference"
)
if pname not in raws:
avail = ", ".join(sorted(raws.keys())) or "(none)"
raise ManifestError(
f"bottle '{name}' extends '{pname}' which is not "
f"defined. Available bottles: {avail}"
)
combined_parent, combined_repos_raw = _fold_parents(
parent_names, raws, cache, repos_cache, seen + (name,)
parent_name: str = parent_name_raw
if parent_name == name:
raise ManifestError(
f"bottle '{name}' extends itself; remove the "
f"self-reference"
)
if parent_name not in raws:
avail = ", ".join(sorted(raws.keys())) or "(none)"
raise ManifestError(
f"bottle '{name}' extends '{parent_name}' which is not "
f"defined. Available bottles: {avail}"
)
parent = _resolve_one_bottle(
parent_name, raws, cache, repos_cache, seen + (name,)
)
merged_repos_raw = _resolve_repos_raw(combined_repos_raw, child_raw)
bottle = _merge_bottles(combined_parent, child_raw, merged_repos_raw, name)
merged_repos_raw = _resolve_repos_raw(repos_cache[parent_name], child_raw)
bottle = _merge_bottles(parent, child_raw, merged_repos_raw, name)
cache[name] = bottle
repos_cache[name] = merged_repos_raw
return bottle
def _fold_parents(
parent_names: list[str],
raws: dict[str, dict[str, object]],
cache: dict[str, ManifestBottle],
repos_cache: dict[str, dict[str, object]],
seen: tuple[str, ...],
) -> tuple[ManifestBottle, dict[str, object]]:
"""Resolve each parent and fold them left-to-right.
Later parents win over earlier ones on conflict. The `seen` tuple
carries the current bottle's name so cycle detection works across
every parent edge in the multi-parent graph."""
first = parent_names[0]
effective = _resolve_one_bottle(first, raws, cache, repos_cache, seen)
effective_repos_raw = repos_cache[first]
for pname in parent_names[1:]:
later = _resolve_one_bottle(pname, raws, cache, repos_cache, seen)
later_repos_raw = repos_cache[pname]
effective, effective_repos_raw = _fold_two_bottles(
effective, effective_repos_raw, later, later_repos_raw
)
return effective, effective_repos_raw
def _fold_two_bottles(
earlier: ManifestBottle,
earlier_repos_raw: dict[str, object],
later: ManifestBottle,
later_repos_raw: dict[str, object],
) -> tuple[ManifestBottle, dict[str, object]]:
"""Combine two resolved parent bottles; later wins over earlier."""
from .manifest import ManifestBottle, ManifestGitUser
from .manifest_egress import ManifestEgressConfig
from .manifest_git import parse_git_gate_config
from .manifest_util import as_json_object
merged_env = {**earlier.env, **later.env}
merged_git_user = ManifestGitUser(
name=later.git_user.name or earlier.git_user.name,
email=later.git_user.email or earlier.git_user.email,
)
# Repos: union by name; for same-name entries, later wins per-field.
# Unlike _resolve_repos_raw, an empty later_repos_raw means "no repos
# declared" — it does NOT clear the earlier parent's repos.
names = list(earlier_repos_raw) + [
n for n in later_repos_raw if n not in earlier_repos_raw
]
merged_repos_raw: dict[str, object] = {
n: {
**as_json_object(earlier_repos_raw.get(n, {}), "earlier parent repo"),
**as_json_object(later_repos_raw.get(n, {}), "later parent repo"),
}
for n in names
}
if merged_repos_raw:
merged_git, _ = parse_git_gate_config("_fold", {"repos": merged_repos_raw})
else:
merged_git = ()
# Egress: routes concatenate; scalar fields use last-wins.
merged_egress = ManifestEgressConfig(
routes=earlier.egress.routes + later.egress.routes,
Log=later.egress.Log,
)
return ManifestBottle(
env=merged_env,
agent_provider=later.agent_provider,
git=merged_git,
git_user=merged_git_user,
egress=merged_egress,
supervise=later.supervise,
), merged_repos_raw
def _merge_bottles(
parent: ManifestBottle,
child_raw: dict[str, object],
-2
View File
@@ -106,7 +106,5 @@ def load_bottle_chain_from_dir(
parent = fm.get("extends")
if isinstance(parent, str):
to_load.append(parent)
elif isinstance(parent, list):
to_load.extend(p for p in parent if isinstance(p, str))
return resolve_bottles(raws)[bottle_name]
+42 -10
View File
@@ -2,10 +2,11 @@
The supervise plane is the per-bottle MCP sidecar plus its host-side
queue/audit support. The sidecar (bot_bottle.supervise_server)
sits on the bottle's internal network and exposes MCP tools the agent
calls when it needs an operator-reviewed egress change:
sits on the bottle's internal network and exposes three MCP tools the
agent calls when it hits a stuck-recovery category:
* egress-block / allow agent proposes a new routes.yaml
* capability-block agent proposes a new agent Dockerfile
Each tool call: the agent passes the full proposed file plus a
justification text. The sidecar validates the proposal syntactically,
@@ -47,6 +48,7 @@ from pathlib import Path
SUPERVISE_HOSTNAME = "supervise"
SUPERVISE_PORT = 9100
TOOL_CAPABILITY_BLOCK = "capability-block"
TOOL_EGRESS_BLOCK = "egress-block"
TOOL_EGRESS_ALLOW = "egress-allow"
TOOL_GITLEAKS_ALLOW = "gitleaks-allow"
@@ -56,6 +58,7 @@ TOOL_EGRESS_TOKEN_ALLOW = "egress-token-allow"
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
TOOLS: tuple[str, ...] = (
TOOL_EGRESS_ALLOW,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
TOOL_EGRESS_TOKEN_ALLOW,
@@ -72,6 +75,10 @@ TOOLS: tuple[str, ...] = (
EGRESS_FORWARD_PROXY = "http://127.0.0.1:9099"
EGRESS_INTROSPECT_URL = "http://_egress.local/allowlist"
# capability-block has no on-disk config the operator edits in place
# (the Dockerfile is rebuilt, not patched), so it has no audit log
# here — those changes are captured by git history + the rebuild record
# laid down in PRD 0016.
COMPONENT_FOR_TOOL: dict[str, str] = {
TOOL_EGRESS_ALLOW: "egress",
TOOL_EGRESS_BLOCK: "egress",
@@ -87,6 +94,8 @@ STATUSES: tuple[str, ...] = (STATUS_APPROVED, STATUS_MODIFIED, STATUS_REJECTED)
ACTION_OPERATOR_EDIT = "operator-edit"
QUEUE_DIR_IN_CONTAINER = "/run/supervise/queue"
CURRENT_CONFIG_DIR_IN_AGENT = "/etc/bot-bottle/current-config"
DEFAULT_POLL_INTERVAL_SEC = 0.5
@@ -429,39 +438,59 @@ def sha256_hex(content: str) -> str:
# --- Sidecar plan + abstract lifecycle -------------------------------------
# Filename of the staged Dockerfile inside the agent's read-only
# current-config mount. The capability-block tool's description
# points the agent at this exact path so it can read the current
# Dockerfile and propose modifications.
#
# routes.yaml + allowlist used to live here too; PRD 0017 chunk 3
# moved them behind the `list-egress-routes` MCP tool (live state
# from egress's introspection endpoint) so the agent always sees
# current data rather than a launch-time snapshot.
CURRENT_CONFIG_DOCKERFILE = "Dockerfile"
@dataclass(frozen=True)
class SupervisePlan:
"""Output of Supervise.prepare; consumed by .start.
`queue_dir` is the host directory bind-mounted into the sidecar
at /run/supervise/queue. `internal_network` is empty at prepare
time; the backend's launch step fills it via dataclasses.replace
before calling .start."""
at /run/supervise/queue. `current_config_dir` is the host
directory bind-mounted (read-only) into the *agent* container
at /etc/bot-bottle/current-config currently holds only the
Dockerfile snapshot (routes.yaml + allowlist moved to the
`list-egress-routes` MCP tool). `internal_network` is
empty at prepare time; the backend's launch step fills it via
dataclasses.replace before calling .start."""
slug: str
queue_dir: Path
current_config_dir: Path
internal_network: str = ""
class Supervise(ABC):
"""Per-bottle supervise sidecar. Encapsulates the host-side
prepare (queue dir staging); the sidecar's start/stop lifecycle
is backend-specific."""
prepare (queue dir + current-config staging); the sidecar's
start/stop lifecycle is backend-specific."""
def prepare(
self,
slug: str,
stage_dir: Path,
) -> SupervisePlan:
"""Stage the per-bottle queue dir on the host. Returns the
plan; `internal_network` must be set by the launch step before
"""Stage the per-bottle queue dir on the host and the
current-config dir under `stage_dir`. Returns the plan;
`internal_network` must be set by the launch step before
.start runs."""
del stage_dir
queue_dir = queue_dir_for_slug(slug)
queue_dir.mkdir(parents=True, exist_ok=True)
current_config_dir = stage_dir / "current-config"
current_config_dir.mkdir(parents=True, exist_ok=True)
return SupervisePlan(
slug=slug,
queue_dir=queue_dir,
current_config_dir=current_config_dir,
)
# --- Helpers ---------------------------------------------------------------
@@ -512,6 +541,8 @@ __all__ = [
"ACTION_OPERATOR_EDIT",
"AuditEntry",
"COMPONENT_FOR_TOOL",
"CURRENT_CONFIG_DIR_IN_AGENT",
"CURRENT_CONFIG_DOCKERFILE",
"DEFAULT_POLL_INTERVAL_SEC",
"Proposal",
"QUEUE_DIR_IN_CONTAINER",
@@ -527,6 +558,7 @@ __all__ = [
"TOOLS",
"EGRESS_FORWARD_PROXY",
"EGRESS_INTROSPECT_URL",
"TOOL_CAPABILITY_BLOCK",
"TOOL_EGRESS_ALLOW",
"TOOL_EGRESS_BLOCK",
"TOOL_GITLEAKS_ALLOW",
+61 -52
View File
@@ -1,8 +1,8 @@
"""Supervise sidecar HTTP server (PRD 0013).
Per-bottle MCP server exposing tools the agent calls to propose egress
config changes when stuck. The tools are `egress-allow`,
`egress-block`, and `list-egress-routes`.
Per-bottle MCP server exposing tools the agent calls to propose config
changes when stuck. The tools are `allow`, `egress-block`,
`capability-block`, and `list-egress-routes`.
Each queued tool call:
@@ -90,19 +90,19 @@ def parse_jsonrpc(body: bytes) -> JsonRpcRequest:
try:
raw = json.loads(body)
except json.JSONDecodeError as e:
raise _RpcClientError(ERR_PARSE, f"parse error: {e}") from e
raise _RpcError(ERR_PARSE, f"parse error: {e}") from e
if not isinstance(raw, dict):
raise _RpcClientError(ERR_INVALID_REQUEST, "request must be a JSON object")
raise _RpcError(ERR_INVALID_REQUEST, "request must be a JSON object")
if raw.get("jsonrpc") != JSONRPC_VERSION:
raise _RpcClientError(ERR_INVALID_REQUEST, "jsonrpc field must be '2.0'")
raise _RpcError(ERR_INVALID_REQUEST, "jsonrpc field must be '2.0'")
method = raw.get("method")
if not isinstance(method, str):
raise _RpcClientError(ERR_INVALID_REQUEST, "method must be a string")
raise _RpcError(ERR_INVALID_REQUEST, "method must be a string")
params = raw.get("params", {})
if params is None:
params = {}
if not isinstance(params, dict):
raise _RpcClientError(ERR_INVALID_PARAMS, "params must be an object")
raise _RpcError(ERR_INVALID_PARAMS, "params must be an object")
rpc_id = raw.get("id", _NO_ID)
is_notification = rpc_id is _NO_ID
return JsonRpcRequest(
@@ -117,23 +117,12 @@ _NO_ID = object()
class _RpcError(Exception):
"""Base class for all typed RPC errors that surface as JSON-RPC error responses."""
def __init__(self, code: int, message: str):
super().__init__(message)
self.code = code
self.message = message
class _RpcClientError(_RpcError):
"""Caller sent a bad request; returned verbatim, no server-side logging."""
class _RpcInternalError(_RpcError):
"""Server-side fault; logged at ERROR with cause, always returns ERR_INTERNAL."""
def __init__(self, message: str) -> None:
super().__init__(ERR_INTERNAL, message)
def jsonrpc_result(request_id: object, result: object) -> bytes:
payload = {"jsonrpc": JSONRPC_VERSION, "id": request_id, "result": result}
return (json.dumps(payload) + "\n").encode("utf-8")
@@ -253,6 +242,34 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"required": ["routes_yaml", "justification"],
},
},
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"description": (
"Call when the bottle is missing a tool, skill, permission, "
"or env var you need — something that lives in the agent "
"Dockerfile rather than in the egress routes. "
"Read the current Dockerfile from "
"/etc/bot-bottle/current-config/Dockerfile, compose a "
"modified version, and pass the full new file plus a "
"justification. On approval the supervisor rebuilds the "
"bottle from the new Dockerfile and starts a replacement on "
"the same branch (wired in PRD 0016; v1 acknowledges only)."
),
"inputSchema": {
"type": "object",
"properties": {
"dockerfile": {
"type": "string",
"description": "Full proposed Dockerfile content.",
},
"justification": {
"type": "string",
"description": "Why this capability is needed.",
},
},
"required": ["dockerfile", "justification"],
},
},
]
@@ -260,6 +277,7 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
# payload (stored in Proposal.proposed_file).
PROPOSED_FILE_FIELD: dict[str, str] = {
_sv.TOOL_EGRESS_ALLOW: "routes_yaml",
_sv.TOOL_CAPABILITY_BLOCK: "dockerfile",
_sv.TOOL_EGRESS_BLOCK: "routes_yaml",
}
@@ -272,22 +290,26 @@ def validate_proposed_file(tool: str, content: str) -> None:
catches obvious paste-errors / wrong-tool selections before they
enter the queue."""
if not content.strip():
raise _RpcClientError(ERR_INVALID_PARAMS, f"{tool}: proposed file is empty")
if tool in (_sv.TOOL_EGRESS_ALLOW, _sv.TOOL_EGRESS_BLOCK):
raise _RpcError(ERR_INVALID_PARAMS, f"{tool}: proposed file is empty")
if tool == _sv.TOOL_CAPABILITY_BLOCK:
# Dockerfiles are too varied to validate syntactically beyond
# non-empty. The operator reads the diff in the TUI.
pass
elif tool in (_sv.TOOL_EGRESS_ALLOW, _sv.TOOL_EGRESS_BLOCK):
try:
config = load_config(content)
except ValueError as e:
raise _RpcClientError(
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml is not valid: {e}",
) from e
if config.log != LOG_OFF:
raise _RpcClientError(
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: proposed routes.yaml must not change egress logging",
)
else:
raise _RpcClientError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
# --- MCP handlers ----------------------------------------------------------
@@ -360,17 +382,17 @@ def handle_tools_call(
doesn't need operator approval."""
name = params.get("name")
if not isinstance(name, str):
raise _RpcClientError(ERR_INVALID_PARAMS, "tools/call missing 'name'")
raise _RpcError(ERR_INVALID_PARAMS, "tools/call missing 'name'")
if name == _sv.TOOL_LIST_EGRESS_ROUTES:
return handle_list_egress_routes(typing.cast(dict[str, object], params.get("arguments", {})), config)
args_raw = params.get("arguments", {})
if not isinstance(args_raw, dict):
raise _RpcClientError(ERR_INVALID_PARAMS, "tools/call 'arguments' must be an object")
raise _RpcError(ERR_INVALID_PARAMS, "tools/call 'arguments' must be an object")
justification = args_raw.get("justification")
if not isinstance(justification, str) or not justification.strip():
raise _RpcClientError(
raise _RpcError(
ERR_INVALID_PARAMS,
f"{name}: 'justification' is required and must be a non-empty string",
)
@@ -379,13 +401,13 @@ def handle_tools_call(
file_field = PROPOSED_FILE_FIELD[name]
proposed_file = args_raw.get(file_field)
if not isinstance(proposed_file, str):
raise _RpcClientError(
raise _RpcError(
ERR_INVALID_PARAMS,
f"{name}: '{file_field}' is required and must be a string",
)
validate_proposed_file(name, proposed_file)
else:
raise _RpcClientError(ERR_INVALID_PARAMS, f"unknown tool {name!r}")
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {name!r}")
proposal = _sv.Proposal.new(
bottle_slug=config.bottle_slug,
@@ -394,10 +416,7 @@ def handle_tools_call(
justification=justification,
current_file_hash=_sv.sha256_hex(proposed_file),
)
try:
_sv.write_proposal(config.queue_dir, proposal)
except OSError as e:
raise _RpcInternalError(f"failed to write proposal to queue: {e}") from e
_sv.write_proposal(config.queue_dir, proposal)
sys.stderr.write(
f"supervise: queued proposal {proposal.id} ({name}) "
f"for bottle {config.bottle_slug}; waiting for operator...\n"
@@ -417,10 +436,7 @@ def handle_tools_call(
"content": [{"type": "text", "text": text}],
"isError": False,
}
try:
_sv.archive_proposal(config.queue_dir, proposal.id)
except OSError as e:
raise _RpcInternalError(f"failed to archive proposal: {e}") from e
_sv.archive_proposal(config.queue_dir, proposal.id)
text = format_response_text(response)
return {
@@ -454,8 +470,9 @@ def format_pending_response_text(timeout_seconds: float) -> str:
# --- HTTP transport --------------------------------------------------------
# Max request body the server accepts. 1 MB is well above any realistic
# routes.yaml proposal.
# Max request body the server accepts. Generous because Dockerfile
# proposals can be a few KB; routes.json is small. 1 MB is well above
# any realistic config file.
MAX_BODY_BYTES = 1 * 1024 * 1024
@@ -495,7 +512,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler):
try:
req = parse_jsonrpc(body)
except _RpcClientError as e:
except _RpcError as e:
self._write_jsonrpc(jsonrpc_error(None, e.code, e.message))
return
@@ -503,19 +520,11 @@ class MCPHandler(http.server.BaseHTTPRequestHandler):
try:
result = self._dispatch(req, config)
except _RpcClientError as e:
except _RpcError as e:
self._write_jsonrpc(jsonrpc_error(req.id, e.code, e.message))
return
except _RpcInternalError as e:
cause = e.__cause__
detail = f": {cause}" if cause else ""
sys.stderr.write(f"supervise: internal error: {e.message}{detail}\n")
sys.stderr.flush()
self._write_jsonrpc(jsonrpc_error(req.id, ERR_INTERNAL, "internal error"))
return
except Exception as e: # noqa: W0718 — unexpected errors
sys.stderr.write(f"supervise: unexpected error: {type(e).__name__}: {e}\n")
sys.stderr.flush()
except Exception as e: # noqa: W0718 — catch-all for RPC dispatch errors
sys.stderr.write(f"supervise: internal error: {e}\n")
self._write_jsonrpc(jsonrpc_error(req.id, ERR_INTERNAL, "internal error"))
return
@@ -534,7 +543,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler):
return handle_tools_list(req.params)
if method == "tools/call":
return handle_tools_call(req.params, config)
raise _RpcClientError(ERR_METHOD_NOT_FOUND, f"method not found: {method}")
raise _RpcError(ERR_METHOD_NOT_FOUND, f"method not found: {method}")
def _write_jsonrpc(self, body: bytes) -> None:
self.send_response(200)
-166
View File
@@ -1,166 +0,0 @@
# PRD 0065: Multi-parent `extends:` for bottles
- **Status:** Active
- **Author:** didericis
- **Created:** 2026-06-25
- **Issue:** #268
- **Extends:** PRD 0025 (`0025-bottle-extends.md`)
## Summary
Allow a bottle's `extends:` field to accept either a single bottle name (existing
behavior) or a list of bottle names (new). Multiple parents are resolved
independently and folded left-to-right into a single effective parent before the
child is merged on top. This lets orthogonal concerns (base env, networking/egress,
agent provider) live in separate bottles and be composed without forcing them into a
linear chain.
## Problem
PRD 0025 shipped single-parent `extends:` and listed "No multi-parent inheritance"
as a non-goal. In practice, users want to compose multiple orthogonal bottles — a
base environment, a networking profile, and an agent-provider override — without
creating a three-level linear chain that couples unrelated parents to each other.
The linear chain workaround has two problems:
1. **Ordering constraint.** `networking extends base` works, but then
`agent extends networking` can't also pick up `base` without going through
`networking`, coupling two unrelated concerns.
2. **Quadratic duplication.** N orthogonal bottles require O(N²) chain variants
(one chain per permutation of applied concerns).
Multi-parent `extends:` removes both constraints: each orthogonal concern stays in
its own bottle, and the child bottle is the only place that names the combination.
## Goals / Success Criteria
- `extends:` accepts a list of strings in addition to a plain string.
- Backward compat: existing single-string `extends:` is unchanged.
- Parents are resolved left-to-right; later entries win on conflict.
- Child wins over all parents (unchanged from PRD 0025).
- Cycle detection covers multi-parent graphs, not just linear chains.
- Diamond inheritance: a shared ancestor is resolved once (via the existing cache).
- Invalid list entries (non-string, undefined bottle, self-reference) die at parse
with clear messages.
- `manifest_loader.py`'s `load_bottle_chain_from_dir` enqueues all parents from a
list `extends:` so the resolver sees every bottle in the graph.
## Non-goals
- No change to the agent-vs-bottle trust boundary (PRD 0025 "Alternatives
considered" option 2 stays rejected).
- No MRO / C3 linearization. Left-to-right fold is sufficient for the expected use
cases.
- No preflight display of per-field provenance across multiple parents (same open
question as PRD 0025; remains a follow-up).
## Design
### Schema
`extends:` now accepts either form:
```yaml
# single parent (unchanged)
extends: base
# multiple parents (new)
extends: [base, networking]
```
Both forms are normalized to a list internally. A list with one element behaves
identically to the string form.
### Merge rules for multi-parent fold
Parents are folded pairwise left-to-right before the child merge. For each step in
the fold, the "earlier" bottle is the running accumulator and the "later" bottle is
the next parent. Rules per field:
| Field | Fold rule |
|--------------------|--------------------------------------------------------------|
| `env` | dict merge; later wins on key collision |
| `git-gate.user` | per-field overlay; later's non-empty fields win |
| `git-gate.repos` | union by name; for same-name entries, later wins per-field |
| `egress.routes` | concatenate (earlier first, later appended) |
| `egress.log` | later wins (last-wins) |
| `agent_provider` | later wins (last-wins) |
| `supervise` | later wins (last-wins) |
After the fold, the combined parent is merged against the child using the existing
PRD 0025 rules (child always wins). The child's `egress.routes` appends to the
combined parent's concatenated routes; `validate_egress_routes` runs once on the
final merged set and catches duplicate hosts.
### Algorithm
```
extends: [p1, p2, p3]
fold:
combined = resolve(p1)
combined = fold_two(combined, resolve(p2))
combined = fold_two(combined, resolve(p3))
merge:
result = _merge_bottles(combined, child_raw, name)
```
`fold_two(earlier, later)` applies the rules in the table above. Cycle detection
(the `seen` tuple) is passed to each parent resolution call unchanged — if any
parent's chain circles back to the current bottle, it is caught. The `cache` dict
ensures a shared ancestor is only resolved once across all parents.
### Error cases
| Condition | Error message shape |
|----------------------------------------|------------------------------------------------------------------|
| `extends` is not a string or list | `extends must be a string or list of strings (was <type>)` |
| A list entry is not a string | `extends[<i>] must be a string (was <type>)` |
| A list entry names an undefined bottle | `extends '<name>' which is not defined. Available bottles: ...` |
| A list entry is the bottle itself | `extends itself; remove the self-reference` |
| Cycle through any parent edge | `is in an extends cycle: <chain>` |
## Implementation
### `bot_bottle/manifest_extends.py`
- `_resolve_one_bottle`: accept `str | list[str]` for `extends`; normalize to list;
validate each entry; for a single-entry list fall through to the existing
single-parent path; for multiple entries call `_fold_parents` then
`_merge_bottles`.
- `_fold_parents(parent_names, raws, cache, repos_cache, seen)`: resolve each
parent and fold pairwise left-to-right; return `(effective_bottle,
effective_repos_raw)`.
- `_fold_two_bottles(earlier, earlier_repos_raw, later, later_repos_raw)`: apply
the fold rules above; return `(folded_bottle, folded_repos_raw)`.
### `bot_bottle/manifest_loader.py`
- `load_bottle_chain_from_dir`: when `extends` is a list, enqueue all parent names
for loading (previously only `isinstance(parent, str)` was handled).
### `tests/unit/test_manifest_extends.py`
- `TestExtendsErrors.test_non_string_extends_dies`: update to use an integer
`extends` value (a list is now valid).
- New class `TestExtendsMultiParent` covering all cases listed in the issue.
## Testing strategy
Unit tests via `ManifestIndex.from_json_obj` (same resolver surface used by all
paths). No integration test changes needed — downstream code consumes the already-
merged bottle and is unchanged.
Test cases:
- Two-parent list: env union, egress routes concat, git repos union
- Last-parent-wins on scalar (supervise, agent_provider)
- Child wins over all parents on conflict
- Diamond: two parents share an ancestor; ancestor resolved once
- Single-element list: identical to string form
- Non-string extends value → ManifestError
- Non-string list entry → ManifestError
- Undefined bottle in list → ManifestError
- Self-reference in list → ManifestError
- Cycle through multi-parent edge → ManifestError
@@ -1,4 +1,4 @@
# PRD 0066: Separate agent and bottle selection
# PRD prd-new: Separate agent and bottle selection
- **Status:** Active
- **Author:** claude
-1
View File
@@ -4,4 +4,3 @@
pylint>=3.0.0
pyright>=1.1.300
coverage>=7.0.0
+4 -7
View File
@@ -92,9 +92,9 @@ class TestSandboxEscape(unittest.TestCase):
"on PATH: curl -sSL https://smolmachines.com/install.sh | sh"
)
# Throwaway static key for the git-gate fixture. It need not
# be a real SSH key: test 5 reaches gitleaks before any SSH
# attempt anyway.
# Throwaway "identity file" for the git-gate's `identity` field.
# It need not be a real SSH key: test 5 reaches gitleaks before
# any SSH attempt anyway.
fd, kp = tempfile.mkstemp(prefix="sandbox-test-key.")
os.close(fd)
cls._key_path = Path(kp)
@@ -123,10 +123,7 @@ class TestSandboxEscape(unittest.TestCase):
"git-gate": {"repos": {
"throwaway": {
"url": "ssh://git@unreachable.invalid:22/throwaway.git",
"key": {
"provider": "static",
"path": str(cls._key_path),
},
"identity": str(cls._key_path),
},
}},
},
@@ -198,7 +198,6 @@ class TestSmolmachinesLaunch(unittest.TestCase):
# connect fails, which is the property chunk 3 will
# preserve once egress is actually running.
r = self.bottle.exec(
"env -u HTTPS_PROXY -u HTTP_PROXY -u https_proxy -u http_proxy "
f"curl -s --show-error --max-time 3 http://{self.plan.bundle_ip}:9099 "
"2>&1 || true"
)
+2 -2
View File
@@ -115,8 +115,8 @@ class TestBottleIdentity(unittest.TestCase):
class TestPreserveMarker(_FakeHomeMixin, unittest.TestCase):
"""The .preserve marker tells cli.py's session-end cleanup to keep
the state dir instead of removing it."""
"""The .preserve marker is how capability_apply tells cli.py's
session-end cleanup to keep the state dir instead of removing it."""
def setUp(self):
self._setup_fake_home()
-103
View File
@@ -11,7 +11,6 @@ from __future__ import annotations
import os
import unittest
from collections.abc import Mapping, Sequence
from unittest.mock import MagicMock, patch
import bot_bottle.cli.start as start_mod
@@ -254,107 +253,5 @@ class TestCmdStartLabelCollision(unittest.TestCase):
self.assertIn("already in use", second_call_kwargs.get("disclaimer", ""))
class TestBottleLineage(unittest.TestCase):
"""Unit tests for _bottle_lineage."""
def test_returns_empty_in_eager_mode(self):
manifest = _make_manifest(["agent"], ["base", "dev"])
# home_md is None in eager mode → no file reads, returns {}
result = start_mod._bottle_lineage(manifest)
self.assertEqual({}, result)
def test_reads_extends_chain_from_files(self):
import tempfile
from pathlib import Path
with tempfile.TemporaryDirectory() as tmp:
bottles_dir = Path(tmp) / "bottles"
bottles_dir.mkdir()
(bottles_dir / "base.md").write_text("---\n{}\n---\n")
(bottles_dir / "mid.md").write_text("---\nextends: base\n---\n")
(bottles_dir / "leaf.md").write_text("---\nextends: mid\n---\n")
manifest = MagicMock()
manifest.home_md = Path(tmp)
result = start_mod._bottle_lineage(manifest)
self.assertNotIn("base", result) # no parent → not in map
self.assertEqual("base -> mid", result["mid"])
self.assertEqual("base -> mid -> leaf", result["leaf"])
def test_cycle_protection(self):
import tempfile
from pathlib import Path
with tempfile.TemporaryDirectory() as tmp:
bottles_dir = Path(tmp) / "bottles"
bottles_dir.mkdir()
(bottles_dir / "a.md").write_text("---\nextends: b\n---\n")
(bottles_dir / "b.md").write_text("---\nextends: a\n---\n")
manifest = MagicMock()
manifest.home_md = Path(tmp)
result = start_mod._bottle_lineage(manifest)
# Cycle must not hang; each should get a two-element chain.
for name in ("a", "b"):
self.assertIn(name, result)
self.assertIn("->", result[name])
class TestManifestToYaml(unittest.TestCase):
"""Unit tests for _manifest_to_yaml."""
def _make_manifest_obj(
self,
*,
skills: Sequence[str] = (),
env: Mapping[str, str] | None = None,
supervise: bool = True,
agent_provider_template: str = "claude",
):
from bot_bottle.manifest import Manifest, ManifestBottle
from bot_bottle.manifest_agent import ManifestAgent, ManifestAgentProvider
agent = ManifestAgent(skills=tuple(skills))
bottle = ManifestBottle(
env=env or {},
supervise=supervise,
agent_provider=ManifestAgentProvider(template=agent_provider_template),
)
return Manifest(agent=agent, bottle=bottle)
def test_includes_agent_section(self):
m = self._make_manifest_obj(skills=["researcher"])
yaml = start_mod._manifest_to_yaml(m)
self.assertIn("agent:", yaml)
self.assertIn("- researcher", yaml)
def test_includes_bottle_section(self):
m = self._make_manifest_obj(env={"FOO": "bar"})
yaml = start_mod._manifest_to_yaml(m)
self.assertIn("bottle:", yaml)
self.assertIn("FOO: bar", yaml)
def test_supervise_rendered(self):
m_true = self._make_manifest_obj(supervise=True)
m_false = self._make_manifest_obj(supervise=False)
self.assertIn("supervise: true", start_mod._manifest_to_yaml(m_true))
self.assertIn("supervise: false", start_mod._manifest_to_yaml(m_false))
def test_non_claude_provider_shown(self):
m = self._make_manifest_obj(agent_provider_template="codex")
yaml = start_mod._manifest_to_yaml(m)
self.assertIn("agent_provider:", yaml)
self.assertIn("template: codex", yaml)
def test_default_claude_provider_omitted(self):
m = self._make_manifest_obj(agent_provider_template="claude")
yaml = start_mod._manifest_to_yaml(m)
self.assertNotIn("agent_provider:", yaml)
if __name__ == "__main__":
unittest.main()
+2 -2
View File
@@ -29,8 +29,8 @@ class _FakeHomeMixin:
class TestCaptureSessionState(_FakeHomeMixin, unittest.TestCase):
# capture_claude_session_state handles the preserve marker for
# non-zero agent exits.
# snapshot_transcript is commented out (capability_apply is disabled);
# capture_claude_session_state now only handles the preserve marker.
def setUp(self):
self._setup_fake_home()
-25
View File
@@ -12,9 +12,6 @@ from typing import Any, Optional
from bot_bottle.cli.tui import _filter_items, _multiselect_loop, filter_multiselect, filter_select
_KEY_SPACE = 32
_KEY_ENTER = 10
_KEY_ESC = 27
_KEY_CTRL_D = 4
@@ -147,29 +144,7 @@ class TestMultiselectLoopReordering(unittest.TestCase):
)
self.assertEqual(["a", "b"], result)
def test_space_toggles_item_on(self):
# Space on an unselected item selects it; Ctrl-D confirms.
result = self._run([_KEY_SPACE, _KEY_CTRL_D], ["a", "b"], [])
self.assertEqual(["a"], result)
def test_space_toggles_item_off(self):
# Space on a selected item deselects it; Ctrl-D confirms empty.
result = self._run([_KEY_SPACE, _KEY_CTRL_D], ["a", "b"], ["a"])
self.assertEqual([], result)
def test_enter_confirms_without_toggle(self):
# Enter immediately confirms the current selection without toggling.
result = self._run([_KEY_ENTER], ["a", "b"], ["a"])
self.assertEqual(["a"], result)
def test_enter_confirms_empty_selection(self):
result = self._run([_KEY_ENTER], ["a", "b"], [])
self.assertEqual([], result)
def test_space_then_enter_confirms(self):
# Space selects "a", Enter confirms.
result = self._run([_KEY_SPACE, _KEY_ENTER], ["a", "b"], [])
self.assertEqual(["a"], result)
if __name__ == "__main__":
+11 -3
View File
@@ -108,6 +108,7 @@ def _supervise_plan() -> SupervisePlan:
return SupervisePlan(
slug=SLUG,
queue_dir=STATE / "supervise" / "queue",
current_config_dir=STATE / "supervise" / "current-config",
internal_network=f"bot-bottle-net-{SLUG}",
)
@@ -270,11 +271,18 @@ class TestAgentAlwaysPresent(unittest.TestCase):
s = bottle_plan_to_compose(_plan(**kwargs))["services"]["agent"]
self.assertEqual(["sidecars"], s["depends_on"])
def test_agent_has_no_current_config_mount_with_supervise(self):
def test_agent_current_config_mount_only_with_supervise(self):
with_sv = bottle_plan_to_compose(_plan(supervise=True))["services"]["agent"]
self.assertNotIn("volumes", with_sv)
self.assertTrue(any(
v["target"] == "/etc/bot-bottle/current-config"
for v in with_sv.get("volumes", [])
))
without_sv = bottle_plan_to_compose(_plan(supervise=False))["services"]["agent"]
self.assertNotIn("volumes", without_sv)
# Either no volumes key at all, or no current-config target.
self.assertFalse(any(
v["target"] == "/etc/bot-bottle/current-config"
for v in without_sv.get("volumes", [])
))
class TestSidecarBundleShape(unittest.TestCase):
@@ -75,6 +75,7 @@ def _plan(
supervise_plan = SupervisePlan(
slug="demo-abc12",
queue_dir=Path("/tmp/queue"),
current_config_dir=Path("/tmp/current-config"),
)
return DockerBottlePlan(
spec=spec,
@@ -78,6 +78,7 @@ def _plan(
supervise_plan = SupervisePlan(
slug="demo-abc12",
queue_dir=Path("/tmp/queue"),
current_config_dir=Path("/tmp/current-config"),
)
return DockerBottlePlan(
spec=spec,
@@ -10,8 +10,6 @@ from unittest.mock import MagicMock, patch
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
GiteaDeployKeyProvisioner,
_API_TIMEOUT_SECS,
_KEYGEN_TIMEOUT_SECS,
_split_owner_repo,
)
from bot_bottle.deploy_key_provisioner import DeployKeyCollisionError
@@ -85,25 +83,6 @@ class TestCreate(unittest.TestCase):
self.assertEqual(str(fake_key_id), key_id)
self.assertEqual(fake_private, private_bytes)
def test_create_passes_timeout_to_ssh_keygen_and_urlopen(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run"
) as mock_run, patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen"
) as mock_urlopen, patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes",
return_value=b"PRIVATE",
), patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text",
return_value="ssh-ed25519 AAAA\n",
):
mock_urlopen.return_value = _urlopen_response({"id": 1})
provisioner.create("owner/repo", "title")
self.assertEqual(_KEYGEN_TIMEOUT_SECS, mock_run.call_args.kwargs.get("timeout"))
self.assertEqual(_API_TIMEOUT_SECS, mock_urlopen.call_args.kwargs.get("timeout"))
def test_create_raises_on_http_error(self):
provisioner = _provisioner()
with patch(
@@ -160,16 +139,6 @@ class TestDelete(unittest.TestCase):
self.assertIn("/api/v1/repos/didericis/bot-bottle/keys/99", req.full_url)
self.assertEqual("DELETE", req.get_method())
def test_delete_passes_timeout_to_urlopen(self):
provisioner = _provisioner()
with patch(
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen"
) as mock_urlopen:
mock_urlopen.return_value = _urlopen_response({})
provisioner.delete("owner/repo", "7")
self.assertEqual(_API_TIMEOUT_SECS, mock_urlopen.call_args.kwargs.get("timeout"))
def test_delete_tolerates_404(self):
provisioner = _provisioner()
with patch(
+92 -29
View File
@@ -24,36 +24,61 @@ from bot_bottle.dlp_detectors import (
)
# (case id, sample body carrying the token, substring expected in the reason).
# One row per known token shape; all are block-severity credential matches.
# `# gitleaks:allow` marks the synthetic tokens so a source scan won't flag them.
_TOKEN_PATTERN_CASES: list[tuple[str, str, str]] = [
("aws_access_key", "key=AKIAIOSFODNN7EXAMPLE", "AWS access key"),
("github_classic", "token: ghp_" + "A" * 36, "GitHub token"), # gitleaks:allow
("github_fine_grained", "pat=github_pat_" + "A" * 82, "fine-grained"), # gitleaks:allow
("anthropic", "auth: sk-ant-" + "A" * 93, "Anthropic"), # gitleaks:allow
("openai", "key=sk-" + "A" * 48, "OpenAI"), # gitleaks:allow
("stripe_live", "stripe: sk_live_" + "A" * 24, "Stripe"), # gitleaks:allow
("bearer_jwt", "Authorization: Bearer " + "A" * 60, "Bearer JWT"), # gitleaks:allow
("openai_project", "key=sk-proj-" + "A" * 48, "OpenAI project"), # gitleaks:allow
("huggingface", "token=hf_" + "A" * 34, "HuggingFace"), # gitleaks:allow
("databricks", "dapi" + "a" * 32, "Databricks"), # gitleaks:allow
("slack_bot", "xoxb-00000000000-00000000000-" + "A" * 24, "Slack"), # gitleaks:allow
("npm", "npm_" + "A" * 36, "npm"), # gitleaks:allow
("sendgrid", "SG." + "A" * 22 + "." + "B" * 43, "SendGrid"), # gitleaks:allow
("pypi", "pypi-" + "A" * 80, "PyPI"), # gitleaks:allow
("vault", "hvs." + "A" * 24, "Vault"), # gitleaks:allow
]
class TestScanTokenPatterns(unittest.TestCase):
def test_detects_each_token_pattern(self):
for case_id, sample, expected in _TOKEN_PATTERN_CASES:
with self.subTest(case_id):
result = scan_token_patterns(sample)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn(expected, result.reason)
def test_aws_access_key(self):
result = scan_token_patterns("key=AKIAIOSFODNN7EXAMPLE")
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("AWS access key", result.reason)
def test_github_classic_token(self):
result = scan_token_patterns(
"token: ghp_" + "A" * 36,
)
assert result is not None
self.assertIn("GitHub token", result.reason)
def test_github_fine_grained_token(self):
result = scan_token_patterns(
"pat=github_pat_" + "A" * 82,
)
assert result is not None
self.assertIn("fine-grained", result.reason)
def test_anthropic_api_key(self):
result = scan_token_patterns(
"auth: sk-ant-" + "A" * 93,
)
assert result is not None
self.assertIn("Anthropic", result.reason)
def test_openai_api_key(self):
result = scan_token_patterns(
"key=sk-" + "A" * 48,
)
assert result is not None
self.assertIn("OpenAI", result.reason)
def test_stripe_live_key(self):
result = scan_token_patterns(
"stripe: sk_live_" + "A" * 24,
)
assert result is not None
self.assertIn("Stripe", result.reason)
def test_bearer_jwt(self):
result = scan_token_patterns(
"Authorization: Bearer " + "A" * 60,
)
assert result is not None
self.assertIn("Bearer JWT", result.reason)
def test_openai_project_key(self):
result = scan_token_patterns(
"key=sk-proj-" + "A" * 48,
)
assert result is not None
self.assertIn("OpenAI project", result.reason)
def test_clean_text_returns_none(self):
self.assertIsNone(scan_token_patterns("hello world"))
@@ -282,6 +307,44 @@ class TestEncodedVariants(unittest.TestCase):
self.assertEqual(len(v), len(set(v)))
class TestScanTokenPatternsExtended(unittest.TestCase):
def test_huggingface_token(self):
result = scan_token_patterns("token=hf_" + "A" * 34) # gitleaks:allow
assert result is not None
self.assertIn("HuggingFace", result.reason)
def test_databricks_token(self):
result = scan_token_patterns("dapi" + "a" * 32) # gitleaks:allow
assert result is not None
self.assertIn("Databricks", result.reason)
def test_slack_bot_token(self):
# Use all-zero numeric segments to keep entropy low
result = scan_token_patterns("xoxb-00000000000-00000000000-" + "A" * 24) # gitleaks:allow
assert result is not None
self.assertIn("Slack", result.reason)
def test_npm_token(self):
result = scan_token_patterns("npm_" + "A" * 36) # gitleaks:allow
assert result is not None
self.assertIn("npm", result.reason)
def test_sendgrid_key(self):
result = scan_token_patterns("SG." + "A" * 22 + "." + "B" * 43) # gitleaks:allow
assert result is not None
self.assertIn("SendGrid", result.reason)
def test_pypi_token(self):
result = scan_token_patterns("pypi-" + "A" * 80) # gitleaks:allow
assert result is not None
self.assertIn("PyPI", result.reason)
def test_vault_token(self):
result = scan_token_patterns("hvs." + "A" * 24) # gitleaks:allow
assert result is not None
self.assertIn("Vault", result.reason)
class TestUnicodeNormalization(unittest.TestCase):
def test_fullwidth_chars_normalized(self):
# Fullwidth ASCII chars (U+FF21..U+FF3A) should map to ASCII
+2 -2
View File
@@ -65,8 +65,8 @@ class TestOrphanStateDirs(_FakeHomeMixin, unittest.TestCase):
)
def test_preserve_marker_skips_dir(self):
# Preserve marker means the user explicitly wanted this dir
# kept for `resume`.
# Preserve marker = capability-block or crash auto-preserve;
# the user explicitly wanted this dir kept for `resume`.
bottle_state.write_per_bottle_dockerfile("kept-ccc", "FROM x\n")
bottle_state.mark_preserved("kept-ccc")
self.assertEqual(
-71
View File
@@ -10,7 +10,6 @@ from bot_bottle.egress import (
Egress,
EgressPlan,
EgressRoute,
_yaml_str_escape,
egress_agent_env_entries,
egress_manifest_routes,
egress_render_routes,
@@ -420,76 +419,6 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual(LOG_BLOCKS, cfg.log)
class TestYamlStrEscape(unittest.TestCase):
"""_yaml_str_escape produces safe YAML double-quoted scalar content."""
def test_plain_string_unchanged(self):
self.assertEqual("api.example.com", _yaml_str_escape("api.example.com"))
def test_double_quote_escaped(self):
self.assertEqual('\\"', _yaml_str_escape('"'))
def test_backslash_escaped(self):
self.assertEqual("\\\\", _yaml_str_escape("\\"))
def test_newline_escaped(self):
self.assertEqual("\\n", _yaml_str_escape("\n"))
def test_carriage_return_escaped(self):
self.assertEqual("\\r", _yaml_str_escape("\r"))
def test_tab_escaped(self):
self.assertEqual("\\t", _yaml_str_escape("\t"))
def test_combined(self):
self.assertEqual('\\"\\n\\\\', _yaml_str_escape('"\n\\'))
class TestRenderRoutesEscaping(unittest.TestCase):
"""Stray quotes/newlines in manifest strings do not corrupt routes.yaml."""
@staticmethod
def _parsed(routes) -> list[dict]: # type: ignore
return parse_yaml_subset(egress_render_routes(routes))["routes"] # type: ignore
def test_host_with_double_quote_round_trips(self):
routes = (EgressRoute(host='bad"host.example'),)
parsed = self._parsed(routes)
self.assertEqual('bad"host.example', parsed[0]["host"])
def test_host_with_newline_round_trips(self):
routes = (EgressRoute(host="host\nextra.example"),)
parsed = self._parsed(routes)
self.assertEqual("host\nextra.example", parsed[0]["host"])
def test_auth_scheme_with_double_quote_round_trips(self):
routes = (EgressRoute(
host="api.example",
auth_scheme='Bear"er',
token_env="EGRESS_TOKEN_0",
),)
parsed = self._parsed(routes)
self.assertEqual('Bear"er', parsed[0]["auth_scheme"])
def test_path_value_with_double_quote_round_trips(self):
from bot_bottle.egress_addon_core import PathMatch, MatchEntry
routes = (EgressRoute(
host="api.example",
matches=(MatchEntry(paths=(PathMatch(type="prefix", value='/v1/"quoted"/'),)),),
),)
parsed = self._parsed(routes)
self.assertEqual('/v1/"quoted"/', parsed[0]["matches"][0]["paths"][0]["value"])
def test_header_value_with_double_quote_round_trips(self):
from bot_bottle.egress_addon_core import HeaderMatch, MatchEntry
routes = (EgressRoute(
host="api.example",
matches=(MatchEntry(headers=(HeaderMatch(name="x-h", value='val"ue'),)),),
),)
parsed = self._parsed(routes)
self.assertEqual('val"ue', parsed[0]["matches"][0]["headers"][0]["value"])
class TestResolveTokenValues(unittest.TestCase):
def test_reads_host_env(self):
out = egress_resolve_token_values(
-65
View File
@@ -4,7 +4,6 @@ import os
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from bot_bottle.git_gate import (
GitGate,
@@ -14,8 +13,6 @@ from bot_bottle.git_gate import (
git_gate_render_access_hook,
git_gate_render_entrypoint,
git_gate_render_hook,
revoke_git_gate_provisioned_keys,
_resolve_identity_file,
git_gate_upstreams_for_bottle,
)
from bot_bottle.manifest import ManifestIndex
@@ -331,68 +328,6 @@ class TestPrepare(unittest.TestCase):
self.assertIn("exec git daemon", content)
class TestDynamicKeyProvisioning(unittest.TestCase):
def setUp(self):
self.stage = Path(tempfile.mkdtemp())
def tearDown(self):
import shutil
shutil.rmtree(self.stage, ignore_errors=True)
def _gitea_manifest(self):
return ManifestIndex.from_json_obj({
"bottles": {
"dev": {
"git-gate": {
"repos": {
"repo": {
"url": "ssh://git@gitea.example.com/org/repo.git",
"key": {
"provider": "gitea",
"forge_token_env": "GITEA_TOKEN",
},
"host_key": "ssh-ed25519 AAAA...",
},
},
}
}
},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
def test_resolve_identity_file_static_uses_entry_path(self):
entry = fixture_with_git().bottles["dev"].git[0]
self.assertEqual(entry.IdentityFile, _resolve_identity_file(entry, "demo", self.stage))
def test_resolve_identity_file_gitea_provisions_key(self):
entry = self._gitea_manifest().bottles["dev"].git[0]
with patch("bot_bottle.git_gate._provision_dynamic_key", return_value="/tmp/provisioned-key") as mock_provision:
self.assertEqual("/tmp/provisioned-key", _resolve_identity_file(entry, "demo", self.stage))
mock_provision.assert_called_once()
def test_revoke_skips_non_gitea_and_missing_id_file(self):
revoke_git_gate_provisioned_keys(fixture_with_git().bottles["dev"], self.stage)
def test_revoke_calls_delete_for_gitea_entry(self):
bottle = self._gitea_manifest().bottles["dev"]
(self.stage / "repo-deploy-key-id").write_text("123\n")
with patch.dict("os.environ", {"GITEA_TOKEN": "token"}), patch(
"bot_bottle.deploy_key_provisioner.get_provisioner"
) as mock_get_provisioner:
provisioner = mock_get_provisioner.return_value
revoke_git_gate_provisioned_keys(bottle, self.stage)
mock_get_provisioner.assert_called_once()
provisioner.delete.assert_called_once_with("org/repo", "123")
def test_revoke_missing_token_raises(self):
bottle = self._gitea_manifest().bottles["dev"]
(self.stage / "repo-deploy-key-id").write_text("123\n")
with patch.dict("os.environ", {}, clear=True), self.assertRaises(RuntimeError) as cm:
revoke_git_gate_provisioned_keys(bottle, self.stage)
self.assertIn("env var is not set", str(cm.exception))
class TestShellEscaping(unittest.TestCase):
"""Regression tests: all three render functions must produce syntactically
valid sh code even when names and upstream URLs contain shell-special
-107
View File
@@ -9,7 +9,6 @@ import urllib.request
from pathlib import Path
from unittest import mock
from bot_bottle.git_gate import GIT_GATE_TIMEOUT_SECS
from bot_bottle.git_http_backend import GitHttpHandler, MAX_BODY_BYTES
@@ -151,61 +150,6 @@ class TestGitHttpBackend(unittest.TestCase):
)
self.assertEqual("git/test", env["HTTP_USER_AGENT"])
def test_subprocess_calls_include_timeout(self):
"""Both subprocess.run calls (access-hook and git http-backend) must
pass timeout= so a hung upstream cannot wedge the sidecar."""
from http.server import ThreadingHTTPServer
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
(root / "repo.git").mkdir()
old_root = os.environ.get("GIT_PROJECT_ROOT")
os.environ["GIT_PROJECT_ROOT"] = str(root)
self.addCleanup(self._restore_env, old_root)
old_hook = os.environ.get("GIT_GATE_ACCESS_HOOK")
hook = root / "access-hook"
hook.write_text("#!/bin/sh\nexit 0\n")
hook.chmod(0o700)
os.environ["GIT_GATE_ACCESS_HOOK"] = str(hook)
self.addCleanup(self._restore_hook, old_hook)
server = ThreadingHTTPServer(("127.0.0.1", 0), GitHttpHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
self.addCleanup(server.shutdown)
self.addCleanup(server.server_close)
backend_response = (
b"Status: 200 OK\r\n"
b"Content-Type: application/x-git-upload-pack-result\r\n"
b"\r\n"
b"0000"
)
calls = [
subprocess.CompletedProcess(["hook"], 0, b"", b""),
subprocess.CompletedProcess(["git"], 0, backend_response, b""),
]
with mock.patch(
"bot_bottle.git_http_backend.subprocess.run",
side_effect=calls,
) as run:
req = urllib.request.Request(
f"http://127.0.0.1:{server.server_port}"
"/repo.git/git-upload-pack",
data=b"",
method="POST",
)
with urllib.request.urlopen(req, timeout=5):
pass
for call in run.call_args_list:
self.assertEqual(
GIT_GATE_TIMEOUT_SECS,
call.kwargs.get("timeout"),
f"subprocess.run call missing timeout: {call}",
)
def test_access_hook_denial_is_logged_to_stdout(self):
"""When the access-hook exits non-zero we still return 403 to the
client, but the hook's stderr must also appear on the handler's
@@ -312,57 +256,6 @@ class TestGitHttpBackend(unittest.TestCase):
os.environ["GIT_GATE_ACCESS_HOOK"] = value
class TestMalformedStatusHeader(unittest.TestCase):
"""Malformed CGI Status: headers must not propagate as unhandled exceptions;
the handler should fall back to HTTP 500."""
def setUp(self):
from http.server import ThreadingHTTPServer
import tempfile
self._tmp = tempfile.mkdtemp()
os.environ["GIT_PROJECT_ROOT"] = self._tmp
self._server = ThreadingHTTPServer(("127.0.0.1", 0), GitHttpHandler)
self._thread = threading.Thread(
target=self._server.serve_forever, daemon=True,
)
self._thread.start()
self._port = self._server.server_port
def tearDown(self):
self._server.shutdown()
self._server.server_close()
os.environ.pop("GIT_PROJECT_ROOT", None)
import shutil
shutil.rmtree(self._tmp, ignore_errors=True)
def _get_with_backend_response(self, cgi_response: bytes) -> int:
with mock.patch(
"bot_bottle.git_http_backend.subprocess.run",
return_value=mock.Mock(returncode=0, stdout=cgi_response),
):
req = urllib.request.Request(
f"http://127.0.0.1:{self._port}/repo.git/info/refs",
method="GET",
)
try:
with urllib.request.urlopen(req, timeout=3) as resp:
return resp.status
except urllib.error.HTTPError as e: # type: ignore
return e.code
def test_empty_status_value_returns_500(self):
status = self._get_with_backend_response(
b"Status: \r\nContent-Type: text/plain\r\n\r\n"
)
self.assertEqual(500, status)
def test_non_numeric_status_returns_500(self):
status = self._get_with_backend_response(
b"Status: bad\r\nContent-Type: text/plain\r\n\r\n"
)
self.assertEqual(500, status)
class TestContentLengthBounds(unittest.TestCase):
"""PRD 0041: malformed or oversized Content-Length is rejected before
git http-backend is invoked."""
-27
View File
@@ -73,33 +73,6 @@ resolver #2
)
self.assertTrue(run.call_args_list[-1].kwargs["check"])
def test_build_image_anchors_relative_dockerfile_to_context(self):
status = util.subprocess.CompletedProcess(
args=[],
returncode=0,
stdout=(
'[{"status":{"state":"running"},'
'"configuration":{"dns":{"nameservers":["9.9.9.9"]}}}]'
),
stderr="",
)
with patch.object(util.subprocess, "run", return_value=status) as run, \
patch.object(util.os, "environ", {
"BOT_BOTTLE_MACOS_CONTAINER_DNS": "9.9.9.9",
}):
util.build_image(
"bot-bottle-sidecars:latest",
"/repo",
dockerfile="Dockerfile.sidecars",
)
self.assertEqual(
[
"container", "build", "-t", "bot-bottle-sidecars:latest",
"--dns", "9.9.9.9", "-f", "/repo/Dockerfile.sidecars", "/repo",
],
run.call_args_list[-1].args[0],
)
def test_commit_container_execs_tar_and_builds_image(self):
# stderr is bytes because subprocess.run uses stderr=PIPE without text=True
completed = util.subprocess.CompletedProcess(
+3 -176
View File
@@ -423,182 +423,9 @@ class TestExtendsErrors(unittest.TestCase):
)
self.assertIn("extends cycle", msg)
def test_non_string_non_list_extends_dies(self):
msg = _error_message(_build, child={"extends": 123})
self.assertIn("extends must be a string or list of strings", msg)
def test_list_entry_non_string_dies(self):
msg = _error_message(_build, child={"extends": [123]})
self.assertIn("extends[0] must be a string", msg)
class TestExtendsMultiParent(unittest.TestCase):
"""extends: [p1, p2, ...] — multi-parent composition (issue #268)."""
_GIT_A = {"url": "ssh://git@host-a/a.git", "key": {"provider": "static", "path": "/k"}}
_GIT_B = {"url": "ssh://git@host-b/b.git", "key": {"provider": "static", "path": "/k"}}
def test_single_element_list_same_as_string(self):
m = _build(
base={"env": {"X": "1"}},
child={"extends": ["base"]},
)
self.assertEqual({"X": "1"}, dict(m.bottles["child"].env))
def test_two_parents_env_union(self):
m = _build(
p1={"env": {"A": "1"}},
p2={"env": {"B": "2"}},
child={"extends": ["p1", "p2"]},
)
self.assertEqual({"A": "1", "B": "2"}, dict(m.bottles["child"].env))
def test_two_parents_env_last_wins_on_collision(self):
m = _build(
p1={"env": {"X": "from-p1"}},
p2={"env": {"X": "from-p2"}},
child={"extends": ["p1", "p2"]},
)
self.assertEqual("from-p2", m.bottles["child"].env["X"])
def test_child_wins_over_all_parents(self):
m = _build(
p1={"env": {"X": "from-p1"}},
p2={"env": {"X": "from-p2"}},
child={"extends": ["p1", "p2"], "env": {"X": "from-child"}},
)
self.assertEqual("from-child", m.bottles["child"].env["X"])
def test_two_parents_supervise_last_wins(self):
m = _build(
p1={"supervise": False},
p2={"supervise": True},
child={"extends": ["p1", "p2"]},
)
self.assertTrue(m.bottles["child"].supervise)
def test_child_supervise_overrides_all_parents(self):
m = _build(
p1={"supervise": True},
p2={"supervise": True},
child={"extends": ["p1", "p2"], "supervise": False},
)
self.assertFalse(m.bottles["child"].supervise)
def test_two_parents_egress_routes_concatenated(self):
m = _build(
p1={"egress": {"routes": [{"host": "a.example.com"}]}},
p2={"egress": {"routes": [{"host": "b.example.com"}]}},
child={"extends": ["p1", "p2"]},
)
hosts = [r.Host for r in m.bottles["child"].egress.routes]
self.assertEqual(["a.example.com", "b.example.com"], hosts)
def test_child_egress_appends_after_combined_parents(self):
m = _build(
p1={"egress": {"routes": [{"host": "a.example.com"}]}},
p2={"egress": {"routes": [{"host": "b.example.com"}]}},
child={
"extends": ["p1", "p2"],
"egress": {"routes": [{"host": "c.example.com"}]},
},
)
hosts = [r.Host for r in m.bottles["child"].egress.routes]
self.assertEqual(["a.example.com", "b.example.com", "c.example.com"], hosts)
def test_two_parents_git_repos_union(self):
m = _build(
p1={"git-gate": {"repos": {"a": self._GIT_A}}},
p2={"git-gate": {"repos": {"b": self._GIT_B}}},
child={"extends": ["p1", "p2"]},
)
names = {e.Name for e in m.bottles["child"].git}
self.assertEqual({"a", "b"}, names)
def test_two_parents_git_same_name_later_wins_per_field(self):
# Both parents declare the same repo name. p2's `key` wins; p1's
# `host_key` is preserved because p2 doesn't override it.
p1_entry = {
"url": "ssh://git@host-a/repo.git",
"host_key": "ecdsa AAAA",
"key": {"provider": "static", "path": "/k1"},
}
p2_entry = {
"url": "ssh://git@host-a/repo.git", # required, same url
"key": {"provider": "gitea", "forge_token_env": "TOK"},
}
m = _build(
p1={"git-gate": {"repos": {"repo": p1_entry}}},
p2={"git-gate": {"repos": {"repo": p2_entry}}},
child={"extends": ["p1", "p2"]},
)
entries = m.bottles["child"].git
self.assertEqual(1, len(entries))
e = entries[0]
self.assertEqual("ssh://git@host-a/repo.git", e.Upstream)
self.assertEqual("ecdsa AAAA", e.KnownHostKey)
self.assertEqual("gitea", e.Key.provider)
def test_p1_repos_preserved_when_p2_has_none(self):
m = _build(
p1={"git-gate": {"repos": {"a": self._GIT_A}}},
p2={"env": {"X": "1"}},
child={"extends": ["p1", "p2"]},
)
names = [e.Name for e in m.bottles["child"].git]
self.assertEqual(["a"], names)
def test_diamond_shared_ancestor_resolved_once(self):
# a <- b, a <- c; child extends [b, c]
# `a` must be resolved once and cached.
m = _build(
a={"env": {"FROM_A": "1"}, "supervise": False},
b={"extends": "a", "env": {"FROM_B": "1"}},
c={"extends": "a", "env": {"FROM_C": "1"}},
child={"extends": ["b", "c"]},
)
child = m.bottles["child"]
self.assertEqual("1", child.env["FROM_A"])
self.assertEqual("1", child.env["FROM_B"])
self.assertEqual("1", child.env["FROM_C"])
# supervise=False from `a` threads through both b and c; c is the
# later parent so its effective supervise (False) wins.
self.assertFalse(child.supervise)
def test_three_parents_env_fold_order(self):
m = _build(
p1={"env": {"X": "1", "A": "a"}},
p2={"env": {"X": "2", "B": "b"}},
p3={"env": {"X": "3", "C": "c"}},
child={"extends": ["p1", "p2", "p3"]},
)
env = dict(m.bottles["child"].env)
self.assertEqual("3", env["X"])
self.assertEqual("a", env["A"])
self.assertEqual("b", env["B"])
self.assertEqual("c", env["C"])
def test_undefined_bottle_in_list_dies(self):
msg = _error_message(
_build,
base={"env": {}},
child={"extends": ["base", "ghost"]},
)
self.assertIn("extends 'ghost'", msg)
self.assertIn("not defined", msg)
def test_self_reference_in_list_dies(self):
msg = _error_message(_build, child={"extends": ["child"]})
self.assertIn("extends itself", msg)
def test_cycle_through_multi_parent_edge_dies(self):
msg = _error_message(
_build,
a={"extends": ["b", "c"]},
b={},
c={"extends": "a"},
)
self.assertIn("extends cycle", msg)
def test_non_string_extends_dies(self):
msg = _error_message(_build, child={"extends": ["base"]})
self.assertIn("extends must be a string", msg)
class TestExtendsAvailableInBottleKeys(unittest.TestCase):
-38
View File
@@ -8,7 +8,6 @@ import unittest
from bot_bottle.git_gate import (
GIT_GATE_HOSTNAME,
_gitconfig_validate_value,
git_gate_render_gitconfig,
)
from bot_bottle.manifest import ManifestIndex
@@ -91,42 +90,5 @@ class TestGitGateGitconfigRender(unittest.TestCase):
self.assertNotIn("gitea.dideric.is", out)
class TestGitconfigValidateValue(unittest.TestCase):
"""_gitconfig_validate_value rejects values that would inject gitconfig keys."""
def test_normal_url_passes(self):
_gitconfig_validate_value("url", "ssh://git@github.com/owner/repo.git")
def test_newline_in_url_raises(self):
with self.assertRaises(ValueError):
_gitconfig_validate_value("url", "ssh://git@github.com/owner/\nrepo.git")
def test_carriage_return_in_url_raises(self):
with self.assertRaises(ValueError):
_gitconfig_validate_value("url", "ssh://git@github.com/\rrepo.git")
def test_error_message_names_field(self):
with self.assertRaises(ValueError, msg="error should name the field") as ctx:
_gitconfig_validate_value("repos['bad'].url", "ssh://host/\npath")
self.assertIn("repos['bad'].url", str(ctx.exception))
class TestGitconfigRenderRejectsNewlineInUpstream(unittest.TestCase):
"""git_gate_render_gitconfig raises on Upstream values with newlines."""
def test_newline_in_upstream_raises(self):
m = ManifestIndex.from_json_obj({
"bottles": {"dev": {"git-gate": {"repos": {
"evil": {
"url": "ssh://git@github.com/owner/\nfake-key = injected\nrepo.git",
"key": {"provider": "static", "path": "/dev/null"},
},
}}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
with self.assertRaises(ValueError):
git_gate_render_gitconfig(m.bottles["dev"].git, GIT_GATE_HOSTNAME)
if __name__ == "__main__":
unittest.main()
@@ -130,6 +130,7 @@ def _plan(
supervise_plan = SupervisePlan(
slug="demo-abc12",
queue_dir=Path("/tmp/queue"),
current_config_dir=Path("/tmp/current-config"),
)
return SmolmachinesBottlePlan(
spec=spec,
+18 -13
View File
@@ -16,7 +16,7 @@ from bot_bottle.supervise import (
STATUS_APPROVED,
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_EGRESS_ALLOW,
TOOL_CAPABILITY_BLOCK,
TOOL_GITLEAKS_ALLOW,
archive_proposal,
audit_log_path,
@@ -37,9 +37,9 @@ FIXED_TS = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(
tool: str = TOOL_EGRESS_ALLOW,
proposed: str = "routes:\n - host: example.com\n",
justification: str = "need egress",
tool: str = TOOL_CAPABILITY_BLOCK,
proposed: str = "FROM python:3.13\n",
justification: str = "need a capability",
) -> Proposal:
return Proposal.new(
bottle_slug="dev",
@@ -57,7 +57,7 @@ class TestProposalRoundtrip(unittest.TestCase):
self.assertTrue(p.id)
self.assertEqual("2026-05-25T12:00:00+00:00", p.arrival_timestamp)
self.assertEqual("dev", p.bottle_slug)
self.assertEqual(TOOL_EGRESS_ALLOW, p.tool)
self.assertEqual(TOOL_CAPABILITY_BLOCK, p.tool)
def test_to_from_dict_roundtrip(self):
p = _proposal()
@@ -142,14 +142,14 @@ class TestQueueIO(unittest.TestCase):
def test_list_pending_sorted_by_arrival(self):
# Fabricate two with explicit timestamps.
a = Proposal.new(
bottle_slug="dev", tool=TOOL_EGRESS_ALLOW,
proposed_file="routes:\n - host: early.example.com\n", justification="early",
bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="FROM python:3.13\n", justification="early",
current_file_hash="x",
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
)
b = Proposal.new(
bottle_slug="dev", tool=TOOL_EGRESS_ALLOW,
proposed_file="routes:\n - host: late.example.com\n", justification="late",
bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="FROM python:3.13\n", justification="late",
current_file_hash="x",
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
)
@@ -319,6 +319,7 @@ class TestToolConstants(unittest.TestCase):
self.assertEqual(
(
supervise.TOOL_EGRESS_ALLOW,
TOOL_CAPABILITY_BLOCK,
supervise.TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
supervise.TOOL_EGRESS_TOKEN_ALLOW,
@@ -377,16 +378,20 @@ class TestSupervisePrepare(unittest.TestCase):
supervise.bot_bottle_root = fake_root # type: ignore[assignment]
return lambda: setattr(supervise, "bot_bottle_root", original)
def test_prepare_creates_queue(self):
def test_prepare_creates_queue_and_current_config(self):
plan = _StubSupervise().prepare("dev", self.stage_dir)
self.assertTrue(plan.queue_dir.is_dir())
self.assertTrue(plan.current_config_dir.is_dir())
self.assertEqual("dev", plan.slug)
self.assertEqual("", plan.internal_network)
def test_prepare_does_not_create_current_config_dir(self):
def test_prepare_writes_no_files_to_current_config(self):
# dockerfile_content is no longer accepted by prepare.
# routes.yaml + allowlist live behind the
# `list-egress-routes` MCP tool (PRD 0017 chunk 3).
plan = _StubSupervise().prepare("dev", self.stage_dir)
self.assertFalse((self.stage_dir / "current-config").exists())
self.assertFalse(hasattr(plan, "current_config_dir"))
files = sorted(p.name for p in plan.current_config_dir.iterdir())
self.assertEqual([], files)
if __name__ == "__main__":
+30 -24
View File
@@ -18,7 +18,7 @@ from bot_bottle.supervise import (
STATUS_APPROVED,
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_EGRESS_ALLOW,
TOOL_CAPABILITY_BLOCK,
TOOL_GITLEAKS_ALLOW,
TOOL_EGRESS_TOKEN_ALLOW,
read_audit_entries,
@@ -30,8 +30,9 @@ from bot_bottle.supervise import (
FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_EGRESS_ALLOW) -> Proposal:
def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
payloads = {
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
supervise.TOOL_EGRESS_ALLOW: "routes:\n - host: example.com\n",
supervise.TOOL_EGRESS_BLOCK: "routes:\n - host: example.com\n",
TOOL_GITLEAKS_ALLOW: "file: tests/test_fixture.py\nline: 3\n",
@@ -85,14 +86,14 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
def test_sorted_by_arrival_across_bottles(self):
early = Proposal.new(
bottle_slug="api", tool=TOOL_EGRESS_ALLOW,
proposed_file="routes:\n - host: early.example.com\n", justification="early",
bottle_slug="api", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="FROM python:3.13\n", justification="early",
current_file_hash="h",
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
)
late = Proposal.new(
bottle_slug="dev", tool=TOOL_EGRESS_ALLOW,
proposed_file="routes:\n - host: late.example.com\n", justification="late",
bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="FROM python:3.13\n", justification="late",
current_file_hash="h",
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
)
@@ -121,7 +122,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def tearDown(self):
self._teardown_fake_home()
def _enqueue(self, tool: str = TOOL_EGRESS_ALLOW):
def _enqueue(self, tool: str = TOOL_CAPABILITY_BLOCK):
p = _proposal(tool=tool)
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True, exist_ok=True)
@@ -130,29 +131,19 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def test_approve_writes_response(self):
qp = self._enqueue()
with patch(
"bot_bottle.cli.supervise.apply_routes_change",
return_value=("routes: []\n", "routes:\n - host: example.com\n"),
):
supervise_cli.approve(qp)
resp = read_response(qp.queue_dir, qp.proposal.id)
supervise_cli.approve(qp)
# capability-block is archived on approve, so the response file
# moves to processed/ before the caller can read it.
resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status)
self.assertIsNone(resp.final_file)
def test_approve_with_final_file_marks_modified(self):
qp = self._enqueue()
with patch(
"bot_bottle.cli.supervise.apply_routes_change",
return_value=("routes: []\n", "routes:\n - host: edited.example.com\n"),
):
supervise_cli.approve(
qp,
final_file="routes:\n - host: edited.example.com\n",
notes="tweaked",
)
resp = read_response(qp.queue_dir, qp.proposal.id)
supervise_cli.approve(qp, final_file="FROM bookworm\n", notes="tweaked")
resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
self.assertEqual(STATUS_MODIFIED, resp.status)
self.assertEqual("routes:\n - host: edited.example.com\n", resp.final_file)
self.assertEqual("FROM bookworm\n", resp.final_file)
self.assertEqual("tweaked", resp.notes)
def test_reject_writes_rejection(self):
@@ -162,6 +153,11 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
self.assertEqual(STATUS_REJECTED, resp.status)
self.assertEqual("nope", resp.notes)
def test_no_audit_log_for_capability_block(self):
qp = self._enqueue(tool=TOOL_CAPABILITY_BLOCK)
supervise_cli.approve(qp)
self.assertEqual([], read_audit_entries("egress", "dev"))
def test_approve_egress_block_writes_audit_log(self):
qp = self._enqueue(tool=supervise.TOOL_EGRESS_BLOCK)
with patch(
@@ -236,6 +232,11 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
self.assertEqual(".txt", supervise_cli._suffix_for_tool(TOOL_EGRESS_TOKEN_ALLOW))
# class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
# # DISABLED — capability_apply functionality is currently commented out.
# pass
class TestEditInEditor(unittest.TestCase):
def test_runs_editor_returns_edited_content(self):
original_editor = os.environ.get("EDITOR")
@@ -280,5 +281,10 @@ class TestEditInEditor(unittest.TestCase):
os.environ["EDITOR"] = original_editor
# class TestCapabilityBlockSmolmachinesGuard(_FakeHomeMixin, unittest.TestCase):
# # DISABLED — capability_apply functionality is currently commented out.
# pass
if __name__ == "__main__":
unittest.main()
+21 -166
View File
@@ -20,7 +20,6 @@ import supervise as _sv # noqa: E402 # type: ignore
from bot_bottle import supervise_server # noqa: E402
from bot_bottle.supervise_server import (
ERR_INTERNAL,
ERR_INVALID_PARAMS,
ERR_INVALID_REQUEST,
ERR_METHOD_NOT_FOUND,
@@ -30,9 +29,7 @@ from bot_bottle.supervise_server import (
PROPOSED_FILE_FIELD,
ServerConfig,
TOOL_DEFINITIONS,
_RpcClientError,
_RpcError,
_RpcInternalError,
_response_timeout_from_env,
format_response_text,
handle_initialize,
@@ -50,15 +47,15 @@ from bot_bottle.supervise_server import (
class TestValidation(unittest.TestCase):
def test_capability_block_accepts_anything_nonempty(self):
validate_proposed_file(
_sv.TOOL_CAPABILITY_BLOCK,
"FROM python:3.13\nRUN apk add git\n",
)
def test_empty_proposed_file_rejected_for_tools_with_file_field(self):
with self.assertRaises(_RpcError):
validate_proposed_file(_sv.TOOL_EGRESS_ALLOW, " \n\t")
def test_capability_block_rejected_as_unknown_tool(self):
with self.assertRaises(_RpcError) as cm:
validate_proposed_file("capability-block", "FROM python:3.13\n")
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("unknown tool", cm.exception.message)
validate_proposed_file(_sv.TOOL_CAPABILITY_BLOCK, " \n\t")
def test_egress_routes_yaml_is_validated(self):
validate_proposed_file(
@@ -80,65 +77,6 @@ class TestValidation(unittest.TestCase):
self.assertIn("must not change egress logging", cm.exception.message)
# --- Error taxonomy --------------------------------------------------------
class TestRpcErrorTaxonomy(unittest.TestCase):
def test_rpc_client_error_is_rpc_error(self):
e = _RpcClientError(ERR_INVALID_PARAMS, "bad param")
self.assertIsInstance(e, _RpcError)
self.assertEqual(ERR_INVALID_PARAMS, e.code)
self.assertEqual("bad param", e.message)
def test_rpc_internal_error_is_rpc_error(self):
e = _RpcInternalError("disk full")
self.assertIsInstance(e, _RpcError)
self.assertEqual(ERR_INTERNAL, e.code)
self.assertEqual("disk full", e.message)
def test_rpc_internal_error_preserves_cause(self):
cause = OSError("no space left on device")
try:
raise _RpcInternalError("failed to write") from cause
except _RpcInternalError as e:
self.assertIs(cause, e.__cause__)
def test_parse_error_is_client_error(self):
with self.assertRaises(_RpcClientError):
parse_jsonrpc(b"{bad json")
def test_validation_error_is_client_error(self):
with self.assertRaises(_RpcClientError):
validate_proposed_file(_sv.TOOL_EGRESS_ALLOW, "routes: nope\n")
def test_unknown_tool_in_tools_call_is_client_error(self):
config = ServerConfig(bottle_slug="dev", queue_dir=Path("/unused"))
with self.assertRaises(_RpcClientError) as cm:
handle_tools_call({"name": "no-such-tool", "arguments": {}}, config)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
class TestRpcInternalErrorOnIoFailure(unittest.TestCase):
def test_write_proposal_os_error_raises_internal(self):
config = ServerConfig(
bottle_slug="dev",
queue_dir=Path("/dev/null/cannot-exist"),
)
with self.assertRaises(_RpcInternalError) as cm:
handle_tools_call(
{
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "x",
},
},
config,
)
self.assertEqual(ERR_INTERNAL, cm.exception.code)
self.assertIsNotNone(cm.exception.__cause__)
# --- JSON-RPC parsing ------------------------------------------------------
@@ -219,6 +157,7 @@ class TestHandleToolsList(unittest.TestCase):
self.assertEqual(
sorted([
_sv.TOOL_EGRESS_ALLOW,
_sv.TOOL_CAPABILITY_BLOCK,
_sv.TOOL_EGRESS_BLOCK,
_sv.TOOL_LIST_EGRESS_ROUTES,
]),
@@ -294,10 +233,10 @@ class TestHandleToolsCall(unittest.TestCase):
try:
result = handle_tools_call(
{
"name": _sv.TOOL_EGRESS_BLOCK,
"name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "need example.com",
"dockerfile": "FROM python:3.13\n",
"justification": "need git",
},
},
self.config,
@@ -334,9 +273,9 @@ class TestHandleToolsCall(unittest.TestCase):
try:
result = handle_tools_call(
{
"name": _sv.TOOL_EGRESS_ALLOW,
"name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"dockerfile": "FROM python:3.13\n",
"justification": "needed for tests",
},
},
@@ -358,52 +297,20 @@ class TestHandleToolsCall(unittest.TestCase):
with self.assertRaises(_RpcError):
handle_tools_call(
{
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": {"routes_yaml": "routes:\n - host: example.com\n"},
"name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": {"dockerfile": "FROM python:3.13\n"},
},
self.config,
)
def test_missing_name_raises(self):
with self.assertRaises(_RpcError) as cm:
handle_tools_call({"arguments": {}}, self.config)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
def test_arguments_must_be_object(self):
with self.assertRaises(_RpcError) as cm:
handle_tools_call(
{
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": [],
},
self.config,
)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("must be an object", cm.exception.message)
def test_capability_block_call_raises_unknown_tool(self):
with self.assertRaises(_RpcError) as cm:
handle_tools_call(
{
"name": "capability-block",
"arguments": {
"dockerfile": "FROM python:3.13\n",
"justification": "need git",
},
},
self.config,
)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
self.assertIn("unknown tool", cm.exception.message)
def test_archives_proposal_after_response(self):
responder = self._respond_when_proposal_appears(_sv.STATUS_APPROVED)
try:
handle_tools_call(
{
"name": _sv.TOOL_EGRESS_ALLOW,
"name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"dockerfile": "FROM python:3.13\n",
"justification": "x",
},
},
@@ -425,10 +332,10 @@ class TestHandleToolsCall(unittest.TestCase):
)
result = handle_tools_call(
{
"name": _sv.TOOL_EGRESS_ALLOW,
"name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "need egress",
"dockerfile": "FROM python:3.13\n",
"justification": "need a capability",
},
},
config,
@@ -443,31 +350,6 @@ class TestHandleToolsCall(unittest.TestCase):
class TestHandleListEgressRoutes(unittest.TestCase):
def test_success_returns_body_text(self):
class _Resp:
def __enter__(self):
return self
def __exit__(self, exc_type: type[BaseException] | None, exc: BaseException | None, tb: object) -> bool:
return False
def read(self):
return b"[{\"host\": \"example.com\"}]"
class _Opener:
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore
return _Resp()
with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()):
result = handle_list_egress_routes(
{},
ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")),
)
self.assertFalse(result["isError"]) # type: ignore[index]
text = result["content"][0]["text"] # type: ignore[index]
self.assertIn("example.com", text)
def test_url_error_returns_tool_error(self):
class _Opener:
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore
@@ -527,13 +409,6 @@ class TestFormatResponseText(unittest.TestCase):
self.assertIn("the operator modified", text.lower())
class TestFormatPendingResponseText(unittest.TestCase):
def test_formats_timeout_message(self):
text = supervise_server.format_pending_response_text(12.5)
self.assertIn("status: pending", text)
self.assertIn("12.5s", text)
# --- End-to-end HTTP sanity ------------------------------------------------
@@ -584,7 +459,7 @@ class TestHttpEndToEnd(unittest.TestCase):
self.assertEqual("2.0", result["jsonrpc"])
self.assertEqual(1, result["id"])
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
self.assertNotIn("capability-block", names)
self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
self.assertIn(_sv.TOOL_EGRESS_ALLOW, names)
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
@@ -594,26 +469,6 @@ class TestHttpEndToEnd(unittest.TestCase):
)
self.assertEqual(ERR_METHOD_NOT_FOUND, result["error"]["code"]) # type: ignore[index]
def test_internal_error_returns_err_internal_over_http(self):
with patch.object(
supervise_server._sv, "write_proposal",
side_effect=OSError("disk full"),
):
result = self._post_jsonrpc({
"jsonrpc": "2.0",
"id": 99,
"method": "tools/call",
"params": {
"name": _sv.TOOL_EGRESS_ALLOW,
"arguments": {
"routes_yaml": "routes:\n - host: example.com\n",
"justification": "x",
},
},
})
self.assertIn("error", result)
self.assertEqual(ERR_INTERNAL, result["error"]["code"]) # type: ignore[index]
def test_health_endpoint(self):
conn = http.client.HTTPConnection("127.0.0.1", self.port, timeout=5)
try: