Compare commits

..

11 Commits

Author SHA1 Message Date
didericis bba24d87f7 fix(lint): resolve pyright and pylint issues in provider/backend changes
lint / lint (push) Successful in 1m31s
prd-check / no-prd-new-on-main (pull_request) Failing after 21s
test / unit (pull_request) Successful in 28s
test / integration (pull_request) Successful in 43s
- Remove unused Bottle import from docker/backend.py (pyright)
- Suppress wrong-import-position on circular-import-avoiding
  deferred imports in backend/__init__.py (pylint C0413)
- Add encoding="utf-8" to read_text() in smolmachines provision
  test (pylint W1514)
- Suppress consider-using-with on TemporaryDirectory setUp pattern
  in both provision test files (pylint R1732)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-07 11:38:54 -04:00
didericis efb3af4a93 feat(agent-provider): user plugin discovery, Dockerfile cascade, and provider-owned ca/git provisioning
- Add _load_user_plugin: loads AgentProvider subclass from
  ~/.bot-bottle/contrib/<name>/agent_provider.py; get_provider()
  checks there first before falling back to built-ins
- Add Dockerfile cascade to docker prepare: per-bottle override →
  manifest dockerfile → user plugin Dockerfile → provider default
- Move provision_ca and provision_git from backend-specific
  provision/ modules to AgentProvider ABC as overridable defaults;
  delete docker/provision/ca.py, docker/provision/git.py,
  smolmachines/provision/ca.py, smolmachines/provision/git.py
- Add git_gate_insteadof_host/scheme properties to BottlePlan base;
  SmolmachinesBottlePlan overrides them to return agent_git_gate_host
  and "http" so provision_git works correctly on both backends
- Move SIGKILL retry from smolmachines provision/ca.py into
  SmolmachinesBottle.exec via _exec_raw helper — all exec calls
  on smolmachines now transparently retry once on exit 137
- Relax manifest_agent template validation to allow user-defined
  template names; keep auth_token/forward_host_credentials guards
  for built-in-only features
- Update tests: rewrite test_docker_provision_git_user and
  test_smolmachines_provision to call provider methods directly;
  add TestSmolmachinesBottleExec for SIGKILL retry coverage

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-07 11:35:35 -04:00
didericis 65746af720 docs(prd): expand user-provider-plugins to cover Dockerfile convention and provisioning methods 2026-06-07 11:35:35 -04:00
didericis d9e9d27e01 ci(prd): rename PRD to prd-new placeholder per new convention 2026-06-07 11:35:35 -04:00
didericis-claude 83351606c6 docs: bump PRD number from 0052 to 0053
Renames docs/prds/0052-user-provider-plugins.md to 0053-user-provider-plugins.md
and updates the heading inside the file. 0052 is now reserved for the egress
DLP addon.
2026-06-07 11:35:35 -04:00
didericis-claude d528f578aa fix: correct broken imports and fileno() guard after rebase
codex_auth.py was moved into contrib/codex/ but still used `.log`/
`.util` relative imports that resolved to the parent bot_bottle
package before the move — update to `...log` / `...util`.

_read_winsize() called sys.stdin.fileno() outside the OSError guard;
pytest's redirected stdin raises UnsupportedOperation (an OSError
subclass) there, breaking test_returns_first_tty_size. Move fileno()
inside the try block so any non-TTY stream is skipped cleanly.
2026-06-07 11:35:35 -04:00
didericis-claude cf3310e818 docs: PRD 0052 — user-defined agent provider plugins
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-07 11:35:35 -04:00
didericis-claude 74d6b25183 refactor: move codex_auth into contrib/codex
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-07 11:35:35 -04:00
didericis-claude dc837a5400 feat(supervise)!: remove egress-block MCP tool and runtime route-mutation
lint / lint (push) Successful in 1m39s
test / unit (push) Successful in 40s
test / integration (push) Successful in 1m1s
test / unit (pull_request) Successful in 40s
Update Quality Badges / update-badges (push) Successful in 1m45s
test / integration (pull_request) Successful in 57s
Drops `egress-block` from the supervise sidecar, removes
`_merge_single_route`, `add_route`, and `apply_routes_change` from
egress_apply.py, and strips the proposal/approve/reject flow for egress
from the supervise CLI. The list-egress-routes and capability-block tools
are unaffected. Tests updated throughout.

Closes #198
2026-06-07 09:56:39 -04:00
didericis-claude 4eff49c9c5 build: drop unused agent-image apt deps
Removes socat, openssh-client, and dnsutils from Dockerfile.claude
and Dockerfile.codex.

- socat was the privileged forwarder for the in-container ssh-agent
  that PRD 0009 removed; nothing in bot_bottle references it.
- openssh-client was needed back when the agent talked ssh:// to
  upstreams; git-gate's insteadOf rewrites now route every upstream
  through HTTP/git-protocol, and ssh-keygen runs host-side from the
  deploy-key provisioner.
- dnsutils was only used by tests/integration/test_sandbox_escape.py
  (attack 4b runs dig from inside the agent container).

Splits python3/python3-pip/python3-venv onto a separate layer with
a comment noting they're app-specific and a candidate to move to a
downstream image.
2026-06-07 09:50:27 -04:00
didericis 965d5073c3 ci(prd): add prd-new placeholder convention and numbering workflow
Implements #213: PRDs use prd-new-<slug>.md while a PR is open; a
post-merge workflow on main assigns sequential numbers and renames the
file. A required PR check blocks prd-new-*.md from landing on main
without going through the workflow.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-06 22:02:21 -04:00
18 changed files with 267 additions and 692 deletions
+39
View File
@@ -0,0 +1,39 @@
# Block PRs that add prd-new-*.md files directly to main.
#
# prd-new-*.md files are placeholders — they must go through a PR so
# the post-merge prd-number workflow can assign a sequential number and
# rename the file. A direct push or a PR that slips through without
# triggering the check would leave an un-numbered PRD on main.
name: prd-check
on:
pull_request:
branches:
- main
paths:
- 'docs/prds/prd-new-*.md'
jobs:
no-prd-new-on-main:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Fail if prd-new-*.md files are present in the diff
run: |
base="${{ github.event.pull_request.base.sha }}"
head="${{ github.event.pull_request.head.sha }}"
new_prds=$(git diff --name-only --diff-filter=A "$base" "$head" \
| grep -E '^docs/prds/prd-new-.+\.md$' || true)
if [ -n "$new_prds" ]; then
echo "ERROR: PRs to main must not add prd-new-*.md files directly."
echo "These files must be merged via a feature branch so the"
echo "prd-number workflow can assign a sequential number on merge:"
echo "$new_prds"
exit 1
fi
echo "OK: no prd-new-*.md files added in this PR."
+123
View File
@@ -0,0 +1,123 @@
# Assign sequential numbers to prd-new-*.md files on merge to main.
#
# When a PR merges to main and includes prd-new-*.md files this workflow:
# 1. Finds the next available NNNN number by scanning existing PRDs.
# 2. Renames each prd-new-*.md to NNNN-<slug>.md.
# 3. Updates the title header (# PRD prd-new: → # PRD NNNN:).
# 4. Flips Status: Draft → Active when the merge commit also touched
# files outside docs/prds/ (i.e. the implementation shipped together
# with the PRD).
# 5. Commits the renaming back to main.
#
# No-op if the push contained no prd-new-*.md files.
name: prd-number
on:
push:
branches:
- main
paths:
- 'docs/prds/prd-new-*.md'
jobs:
assign-numbers:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 2
token: ${{ secrets.GITHUB_TOKEN }}
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Configure git
run: |
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
- name: Assign PRD numbers
run: |
python3 - <<'EOF'
import os
import re
import subprocess
import sys
from pathlib import Path
prds_dir = Path("docs/prds")
# Files added in the latest commit (HEAD vs HEAD~1).
result = subprocess.run(
["git", "diff", "--name-only", "--diff-filter=A", "HEAD~1", "HEAD"],
capture_output=True, text=True, check=True,
)
added = [Path(p) for p in result.stdout.splitlines()]
new_prds = [p for p in added if p.parent == prds_dir
and re.match(r"prd-new-.+\.md$", p.name)]
if not new_prds:
print("No prd-new-*.md files added in this commit — nothing to do.")
sys.exit(0)
# Determine whether non-PRD files were also changed (for Status flip).
all_changed = subprocess.run(
["git", "diff", "--name-only", "HEAD~1", "HEAD"],
capture_output=True, text=True, check=True,
).stdout.splitlines()
non_prd_changed = any(
not f.startswith("docs/prds/") for f in all_changed
)
# Find next available number.
existing = sorted(
int(m.group(1))
for p in prds_dir.glob("*.md")
if (m := re.match(r"^(\d{4})-", p.name))
)
next_num = (max(existing) + 1) if existing else 1
for prd_path in sorted(new_prds):
slug = re.sub(r"^prd-new-", "", prd_path.stem)
new_name = f"{next_num:04d}-{slug}.md"
new_path = prds_dir / new_name
print(f" {prd_path.name} → {new_name}")
content = prd_path.read_text()
# Update title header.
content = re.sub(
r"^(#\s+PRD\s+)prd-new(:)",
rf"\g<1>{next_num:04d}\2",
content,
count=1,
flags=re.MULTILINE,
)
# Conditionally flip Status.
if non_prd_changed:
content = re.sub(
r"(\*\*Status:\*\*\s*)Draft",
r"\g<1>Active",
content,
count=1,
)
new_path.write_text(content)
subprocess.run(["git", "rm", str(prd_path)], check=True)
subprocess.run(["git", "add", str(new_path)], check=True)
next_num += 1
subprocess.run(
["git", "commit", "-m", "ci(prd): assign sequential numbers to new PRDs"],
check=True,
)
subprocess.run(["git", "push"], check=True)
EOF
+5 -4
View File
@@ -36,10 +36,11 @@ the container lifecycle and the copying of skills and env vars into it.
- Three kinds of doc, each with its own conventions in-folder; see - Three kinds of doc, each with its own conventions in-folder; see
`docs/README.md` for when to write which: `docs/README.md` for when to write which:
- **PRDs** (`docs/prds/`) — one feature per file, numbered - **PRDs** (`docs/prds/`) — one feature per file. While a PR is open
`NNNN-kebab.md`. A `Status:` line tracks lifecycle: Draft → Active the file is named `prd-new-<kebab>.md`; CI assigns a sequential
(shipped to `main`) → Superseded/Retargeted. Format in number on merge to `main` and renames it. A `Status:` line tracks
`docs/prds/README.md`. lifecycle: Draft → Active (shipped to `main`) →
Superseded/Retargeted. Format in `docs/prds/README.md`.
- **Research notes** (`docs/research/`) — opinionated investigations; - **Research notes** (`docs/research/`) — opinionated investigations;
unnumbered kebab-case, freeform and verdict-first. See unnumbered kebab-case, freeform and verdict-first. See
`docs/research/README.md`. `docs/research/README.md`.
+13 -7
View File
@@ -16,14 +16,20 @@ FROM node:22-slim
# features (status checks, commits, PR creation) — without git in the # features (status checks, commits, PR creation) — without git in the
# image, those features fail in surprising ways once the user does any # image, those features fail in surprising ways once the user does any
# real work. ca-certificates is already in the slim base; listed for # real work. ca-certificates is already in the slim base; listed for
# clarity in case the base ever drops it. socat is the privileged # clarity in case the base ever drops it. curl is here so any
# forwarder for the in-container ssh-agent (see bot_bottle/ssh.py): the agent # HTTPS_PROXY-aware tool (curl itself, plus anything that shells out
# runs as root and rejects non-root connections, so socat sits between # to it) works against egress's bumped TLS without the agent needing
# node and the agent socket. curl is here so any HTTPS_PROXY-aware # local DNS.
# tool (curl itself, plus anything that shells out to it) works
# against egress's bumped TLS without the agent needing local DNS.
RUN apt-get update \ RUN apt-get update \
&& apt-get install -y --no-install-recommends git ca-certificates openssh-client socat curl dnsutils python3 python3-pip python3-venv \ && apt-get install -y --no-install-recommends git ca-certificates curl \
&& rm -rf /var/lib/apt/lists/*
# App-specific deps. Python isn't required by claude-code itself
# (claude-code is a Node CLI), but is convenient for the agent to
# shell out to for ad-hoc scripts. Kept on its own layer so it can
# be moved to a downstream image if the base ever needs to shrink.
RUN apt-get update \
&& apt-get install -y --no-install-recommends python3 python3-pip python3-venv \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# Install claude-code globally. Pinned to the version verified in the v1 # Install claude-code globally. Pinned to the version verified in the v1
+9 -1
View File
@@ -6,7 +6,15 @@
FROM node:22-slim FROM node:22-slim
RUN apt-get update \ RUN apt-get update \
&& apt-get install -y --no-install-recommends git ca-certificates openssh-client socat curl dnsutils python3 python3-pip python3-venv \ && apt-get install -y --no-install-recommends git ca-certificates curl \
&& rm -rf /var/lib/apt/lists/*
# App-specific deps. Python isn't required by codex itself
# (codex is a Node CLI), but is convenient for the agent to shell
# out to for ad-hoc scripts. Kept on its own layer so it can be
# moved to a downstream image if the base ever needs to shrink.
RUN apt-get update \
&& apt-get install -y --no-install-recommends python3 python3-pip python3-venv \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
RUN npm install -g --no-fund --no-audit @openai/codex@0.136.0 \ RUN npm install -g --no-fund --no-audit @openai/codex@0.136.0 \
+2 -2
View File
@@ -411,8 +411,8 @@ class BottleBackend(ABC, Generic[PlanT, CleanupT]):
# Import concrete backend classes AFTER the base types are defined, so # Import concrete backend classes AFTER the base types are defined, so
# each backend module can pull BottleSpec / BottlePlan / BottleBackend # each backend module can pull BottleSpec / BottlePlan / BottleBackend
# via `from . import ...` without hitting a partially-initialized module. # via `from . import ...` without hitting a partially-initialized module.
from .docker import DockerBottleBackend # noqa: E402 from .docker import DockerBottleBackend # noqa: E402 # pylint: disable=wrong-import-position
from .smolmachines import SmolmachinesBottleBackend # noqa: E402 from .smolmachines import SmolmachinesBottleBackend # noqa: E402 # pylint: disable=wrong-import-position
# The dict is heterogeneous: each value is a BottleBackend specialized # The dict is heterogeneous: each value is a BottleBackend specialized
+1 -1
View File
@@ -25,7 +25,7 @@ from pathlib import Path
from typing import Generator, Sequence from typing import Generator, Sequence
from ...supervise import SUPERVISE_HOSTNAME, SUPERVISE_PORT from ...supervise import SUPERVISE_HOSTNAME, SUPERVISE_PORT
from .. import ActiveAgent, Bottle, BottleBackend, BottleSpec from .. import ActiveAgent, BottleBackend, BottleSpec
from . import cleanup as _cleanup from . import cleanup as _cleanup
from . import enumerate as _enumerate from . import enumerate as _enumerate
from . import launch as _launch from . import launch as _launch
+5 -191
View File
@@ -1,70 +1,20 @@
"""Host-side helper to apply a routes.yaml change to a running """Host-side helper for egress sidecar inspection (issue #198).
egress sidecar (PRD 0014 retargeted by PRD 0017 chunk 3, PRD 0053).
Used by the supervise dashboard when the operator approves an `_merge_single_route`, `add_route`, and `apply_routes_change` were
egress-block proposal. Fetches current routes.yaml, validates, removed when the egress-block MCP tool was dropped. The remaining
writes into the sidecar, then SIGHUPs to reload. helpers support runtime inspection and validation of the routes file
without modifying it at runtime.
""" """
from __future__ import annotations from __future__ import annotations
import json
import subprocess import subprocess
from pathlib import Path
from typing import cast
from ...egress import EGRESS_ROUTES_IN_CONTAINER from ...egress import EGRESS_ROUTES_IN_CONTAINER
from ...egress_addon_core import load_routes from ...egress_addon_core import load_routes
from ...yaml_subset import YamlSubsetError, parse_yaml_subset
from .bottle_state import egress_state_dir
from .sidecar_bundle import sidecar_bundle_container_name from .sidecar_bundle import sidecar_bundle_container_name
def _render_routes_payload(routes_list: list[dict[str, object]]) -> str:
"""Render a list-of-dicts routes payload as YAML matching the
shape `egress_render_routes` produces."""
if not routes_list:
return "routes: []\n"
lines: list[str] = ["routes:"]
for entry in routes_list:
host = str(entry.get("host", ""))
lines.append(f' - host: "{host}"')
auth_scheme = entry.get("auth_scheme")
token_env = entry.get("token_env")
if auth_scheme and token_env:
lines.append(f' auth_scheme: "{auth_scheme}"')
lines.append(f' token_env: "{token_env}"')
matches_obj = entry.get("matches")
if isinstance(matches_obj, list) and matches_obj:
lines.append(" matches:")
for match_entry in matches_obj:
me = cast(dict[str, object], match_entry)
first_key = True
if "paths" in me:
lines.append(" - paths:")
first_key = False
for pd in cast(list[dict[str, str]], me["paths"]):
if "type" in pd:
lines.append(f' - type: "{pd["type"]}"')
lines.append(f' value: "{pd["value"]}"')
else:
lines.append(f' - value: "{pd["value"]}"')
if "methods" in me:
methods_str = ", ".join(
f'"{m}"' for m in cast(list[str], me["methods"])
)
prefix = " - " if first_key else " "
lines.append(f'{prefix}methods: [{methods_str}]')
first_key = False
if first_key:
lines.append(" - {}")
return "\n".join(lines) + "\n"
def _egress_routes_host_path(slug: str) -> Path:
return egress_state_dir(slug) / "egress_routes.yaml"
class EgressApplyError(RuntimeError): class EgressApplyError(RuntimeError):
pass pass
@@ -92,144 +42,8 @@ def validate_routes_content(content: str) -> None:
) from e ) from e
def apply_routes_change(slug: str, new_content: str) -> tuple[str, str]:
container = sidecar_bundle_container_name(slug)
before = fetch_current_routes(slug)
validate_routes_content(new_content)
target = _egress_routes_host_path(slug)
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(new_content)
target.chmod(0o644)
sig = subprocess.run(
["docker", "kill", "--signal", "HUP", container],
capture_output=True, text=True, check=False,
)
if sig.returncode != 0:
raise EgressApplyError(
f"failed to SIGHUP {container}: "
f"{(sig.stderr or '').strip()}"
)
return before, new_content
def _merge_single_route(
current_yaml: str, new_route: dict[str, object],
) -> str:
"""Merge a single proposed route into the current routes.yaml.
- Host absent → append the route.
- Host present → union the match paths (proposed existing).
Auth is preserved from existing route.
"""
try:
cfg = parse_yaml_subset(current_yaml)
except YamlSubsetError as e:
raise EgressApplyError(
f"current routes.yaml is not valid YAML: {e}"
) from e
routes = cfg.get("routes")
if not isinstance(routes, list):
raise EgressApplyError(
"current routes.yaml: 'routes' is not a list"
)
routes_typed = cast(list[object], routes)
new_host = str(new_route.get("host", "")).lower()
if not new_host:
raise EgressApplyError(
"proposed route is missing 'host'"
)
# Build proposed matches from the input
proposed_matches = new_route.get("matches")
if proposed_matches is None:
# Accept legacy path_allowlist from agent proposals and convert
proposed_paths = new_route.get("path_allowlist")
if isinstance(proposed_paths, list) and proposed_paths:
proposed_matches = [{"paths": [{"value": p} for p in proposed_paths]}]
for entry in routes_typed:
if not isinstance(entry, dict):
continue
entry_typed = cast(dict[str, object], entry)
if str(entry_typed.get("host", "")).lower() == new_host:
# Merge matches: union path values from proposed into existing
if isinstance(proposed_matches, list) and proposed_matches:
existing_matches = entry_typed.get("matches")
if not isinstance(existing_matches, list):
existing_matches = []
# Simple merge: collect all existing path values, add new ones
existing_paths: set[str] = set()
for me in existing_matches:
me_typed = cast(dict[str, object], me) if isinstance(me, dict) else {}
paths = me_typed.get("paths")
if isinstance(paths, list):
for p in paths:
p_typed = cast(dict[str, object], p) if isinstance(p, dict) else {}
val = p_typed.get("value")
if isinstance(val, str):
existing_paths.add(val)
new_paths: list[str] = []
for me in proposed_matches:
me_typed = cast(dict[str, object], me) if isinstance(me, dict) else {}
paths = me_typed.get("paths")
if isinstance(paths, list):
for p in paths:
p_typed = cast(dict[str, object], p) if isinstance(p, dict) else {}
val = p_typed.get("value")
if isinstance(val, str) and val not in existing_paths:
new_paths.append(val)
existing_paths.add(val)
if new_paths:
existing_matches.append(
{"paths": [{"value": p} for p in new_paths]}
)
entry_typed["matches"] = existing_matches
break
else:
entry_typed: dict[str, object] = {"host": new_route.get("host")} # type: ignore
if isinstance(proposed_matches, list) and proposed_matches:
entry_typed["matches"] = proposed_matches
auth = new_route.get("auth")
if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): # type: ignore
auth_typed = cast(dict[str, object], auth)
existing_slots = sorted({
str(r_entry.get("token_env", ""))
for r_entry_obj in routes_typed
if isinstance(r_entry_obj, dict)
for r_entry in [cast(dict[str, object], r_entry_obj)]
if r_entry.get("token_env")
})
next_idx = len(existing_slots)
entry_typed["auth_scheme"] = str(cast(object, auth_typed.get("scheme")))
entry_typed["token_env"] = f"EGRESS_TOKEN_{next_idx}"
routes_typed.append(entry_typed)
return _render_routes_payload(cast(list[dict[str, object]], routes_typed))
def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]:
try:
proposed = json.loads(proposed_route_json)
except json.JSONDecodeError as e:
raise EgressApplyError(
f"proposed route is not valid JSON: {e}"
) from e
if not isinstance(proposed, dict):
raise EgressApplyError(
"proposed route must be a JSON object"
)
current = fetch_current_routes(slug)
merged = _merge_single_route(current, proposed)
return apply_routes_change(slug, merged)
__all__ = [ __all__ = [
"EgressApplyError", "EgressApplyError",
"add_route",
"apply_routes_change",
"fetch_current_routes", "fetch_current_routes",
"validate_routes_content", "validate_routes_content",
] ]
+5 -14
View File
@@ -2,9 +2,8 @@
act on them (approve / modify / reject). act on them (approve / modify / reject).
Curses-based TUI; modify-then-approve shells out to $EDITOR. The Curses-based TUI; modify-then-approve shells out to $EDITOR. The
approval handlers wire to the per-tool remediation engines: approval handler wires to PRD 0016 (capability-block), which rebuilds
PRD 0014 (egress) writes routes.yaml + SIGHUPs egress; PRD 0016 the bottle Dockerfile. The egress-block tool was removed in issue #198.
(capability) rebuilds the bottle Dockerfile.
""" """
from __future__ import annotations from __future__ import annotations
@@ -26,7 +25,6 @@ from ..backend.docker.capability_apply import (
CapabilityApplyError, CapabilityApplyError,
apply_capability_change, apply_capability_change,
) )
from ..backend.docker.egress_apply import EgressApplyError, add_route
from ..log import Die, error, info from ..log import Die, error, info
from ..supervise import ( from ..supervise import (
COMPONENT_FOR_TOOL, COMPONENT_FOR_TOOL,
@@ -37,7 +35,6 @@ from ..supervise import (
STATUS_MODIFIED, STATUS_MODIFIED,
STATUS_REJECTED, STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK, TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
archive_proposal, archive_proposal,
list_pending_proposals, list_pending_proposals,
render_diff, render_diff,
@@ -61,7 +58,7 @@ class QueuedProposal:
# Errors any remediation engine may raise. Caught by the TUI key # Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps # handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses. # the proposal pending rather than crashing curses.
ApplyError = (EgressApplyError, CapabilityApplyError) ApplyError = (CapabilityApplyError,)
def discover_pending() -> list[QueuedProposal]: def discover_pending() -> list[QueuedProposal]:
@@ -82,9 +79,7 @@ def discover_pending() -> list[QueuedProposal]:
def _approval_status(qp: QueuedProposal, verb: str) -> str: def _approval_status(qp: QueuedProposal, verb: str) -> str:
"""Status-line text after a successful approval.""" """Status-line text after a successful approval."""
base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]" base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK: return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
return base
def _detail_lines( def _detail_lines(
@@ -132,11 +127,7 @@ def approve(
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
diff_before, diff_after = "", "" diff_before, diff_after = "", ""
if qp.proposal.tool == TOOL_EGRESS_BLOCK: if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
diff_before, diff_after = add_route(
qp.proposal.bottle_slug, file_to_apply,
)
elif qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
_meta = read_metadata(qp.proposal.bottle_slug) _meta = read_metadata(qp.proposal.bottle_slug)
if _meta is not None and not _meta.compose_project: if _meta is not None and not _meta.compose_project:
raise CapabilityApplyError( raise CapabilityApplyError(
+2 -7
View File
@@ -48,11 +48,9 @@ from pathlib import Path
SUPERVISE_HOSTNAME = "supervise" SUPERVISE_HOSTNAME = "supervise"
SUPERVISE_PORT = 9100 SUPERVISE_PORT = 9100
TOOL_EGRESS_BLOCK = "egress-block"
TOOL_CAPABILITY_BLOCK = "capability-block" TOOL_CAPABILITY_BLOCK = "capability-block"
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes" TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
TOOLS: tuple[str, ...] = ( TOOLS: tuple[str, ...] = (
TOOL_EGRESS_BLOCK,
TOOL_CAPABILITY_BLOCK, TOOL_CAPABILITY_BLOCK,
TOOL_LIST_EGRESS_ROUTES, TOOL_LIST_EGRESS_ROUTES,
) )
@@ -70,10 +68,8 @@ EGRESS_INTROSPECT_URL = "http://_egress.local/allowlist"
# capability-block has no on-disk config the operator edits in place # capability-block has no on-disk config the operator edits in place
# (the Dockerfile is rebuilt, not patched), so it has no audit log # (the Dockerfile is rebuilt, not patched), so it has no audit log
# here — those changes are captured by git history + the rebuild # here — those changes are captured by git history + the rebuild
# record laid down in PRD 0016. # record laid down in PRD 0016. egress-block was removed in issue #198.
COMPONENT_FOR_TOOL: dict[str, str] = { COMPONENT_FOR_TOOL: dict[str, str] = {}
TOOL_EGRESS_BLOCK: "egress",
}
STATUS_APPROVED = "approved" STATUS_APPROVED = "approved"
STATUS_MODIFIED = "modified" STATUS_MODIFIED = "modified"
@@ -555,7 +551,6 @@ __all__ = [
"EGRESS_FORWARD_PROXY", "EGRESS_FORWARD_PROXY",
"EGRESS_INTROSPECT_URL", "EGRESS_INTROSPECT_URL",
"TOOL_CAPABILITY_BLOCK", "TOOL_CAPABILITY_BLOCK",
"TOOL_EGRESS_BLOCK",
"TOOL_LIST_EGRESS_ROUTES", "TOOL_LIST_EGRESS_ROUTES",
"archive_proposal", "archive_proposal",
"audit_dir", "audit_dir",
+6 -142
View File
@@ -1,8 +1,10 @@
"""Supervise sidecar HTTP server (PRD 0013). """Supervise sidecar HTTP server (PRD 0013).
Per-bottle MCP server exposing two tools `egress-block`, Per-bottle MCP server exposing tools the agent calls to propose config
`capability-block` that the agent calls to propose config changes changes when stuck. The egress-block tool was removed in issue #198;
when stuck. Each tool call: the remaining tools are `capability-block` and `list-egress-routes`.
Each queued tool call:
1. Validates the proposed file syntactically. 1. Validates the proposed file syntactically.
2. Writes a Proposal to /run/supervise/queue/ (bind-mounted from 2. Writes a Proposal to /run/supervise/queue/ (bind-mounted from
@@ -133,69 +135,6 @@ def jsonrpc_error(request_id: object, code: int, message: str) -> bytes:
TOOL_DEFINITIONS: list[dict[str, object]] = [ TOOL_DEFINITIONS: list[dict[str, object]] = [
{
"name": _sv.TOOL_EGRESS_BLOCK,
"description": (
"Call when egress refused your HTTPS request — host "
"without a matching route, or a request that did not match "
"the route's matches rules (typically a 403 from the "
"proxy). Propose a SINGLE route to add: the host you "
"need + (optionally) a path_allowlist of path prefixes + "
"(optionally) an auth block. The supervisor merges the "
"route into the live table at approval time — you do NOT "
"need to see or reproduce the existing routes. If the "
"host already has a route, the proposed paths are unioned "
"with the existing ones (host stays single-route). The "
"operator approves or rejects in the supervise TUI. On "
"approval the supervisor writes the merged routes.yaml "
"and SIGHUPs egress (no dropped connections)."
),
"inputSchema": {
"type": "object",
"properties": {
"host": {
"type": "string",
"description": (
"The hostname to allow (e.g. 'api.github.com'). "
"Case-insensitive on match."
),
},
"path_allowlist": {
"type": "array",
"items": {"type": "string"},
"description": (
"Optional URL path prefixes the route permits. "
"Each must start with '/'. Omit to allow all "
"paths under this host (bare-pass route). "
"Internally converted to matches entries."
),
},
"auth": {
"type": "object",
"description": (
"Optional credential injection. {scheme, "
"token_ref}: scheme is 'Bearer' or 'token'; "
"token_ref names the host env var holding the "
"secret value. Omit to add a host without "
"credential injection. Ignored if the host "
"already has a route (operator decides auth "
"changes, not the agent)."
),
"properties": {
"scheme": {"type": "string"},
"token_ref": {"type": "string"},
},
"required": ["scheme", "token_ref"],
"additionalProperties": False,
},
"justification": {
"type": "string",
"description": "Why this host needs to be allowed.",
},
},
"required": ["host", "justification"],
},
},
{ {
"name": _sv.TOOL_LIST_EGRESS_ROUTES, "name": _sv.TOOL_LIST_EGRESS_ROUTES,
"description": ( "description": (
@@ -254,11 +193,6 @@ PROPOSED_FILE_FIELD: dict[str, str] = {
# --- Validation ------------------------------------------------------------ # --- Validation ------------------------------------------------------------
# Auth schemes accepted on egress-block proposals — match the
# manifest-side EGRESS_AUTH_SCHEMES.
_AUTH_SCHEMES = ("Bearer", "token")
def validate_proposed_file(tool: str, content: str) -> None: def validate_proposed_file(tool: str, content: str) -> None:
"""Syntactic validation. The operator is the real gate; this just """Syntactic validation. The operator is the real gate; this just
catches obvious paste-errors / wrong-tool selections before they catches obvious paste-errors / wrong-tool selections before they
@@ -273,70 +207,6 @@ def validate_proposed_file(tool: str, content: str) -> None:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}") raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
def _validate_and_bundle_egress_route(
args: dict[str, object],
) -> str:
"""Validate egress-block input fields and bundle them into
a JSON string that becomes the Proposal.proposed_file. Raises
_RpcError on bad input the agent retries with a fixed shape."""
tool = _sv.TOOL_EGRESS_BLOCK
host = args.get("host")
if not isinstance(host, str) or not host.strip():
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: 'host' is required and must be a non-empty string",
)
payload: dict[str, object] = {"host": host}
path_allow_raw = args.get("path_allowlist")
if path_allow_raw is not None:
if not isinstance(path_allow_raw, list):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: 'path_allowlist' must be an array of strings",
)
prefixes: list[str] = []
for i, p in enumerate(path_allow_raw):
if not isinstance(p, str):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: path_allowlist[{i}] must be a string",
)
if not p.startswith("/"):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: path_allowlist[{i}] {p!r} must start with '/'",
)
prefixes.append(p)
if prefixes:
payload["path_allowlist"] = prefixes
auth_raw = args.get("auth")
if auth_raw is not None:
if not isinstance(auth_raw, dict):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: 'auth' must be an object with 'scheme' and 'token_ref'",
)
scheme = auth_raw.get("scheme")
token_ref = auth_raw.get("token_ref")
if not isinstance(scheme, str) or scheme not in _AUTH_SCHEMES:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: auth.scheme must be one of "
f"{', '.join(_AUTH_SCHEMES)} (got {scheme!r})",
)
if not isinstance(token_ref, str) or not token_ref:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: auth.token_ref must be a non-empty string "
f"naming the host env var holding the token",
)
payload["auth"] = {"scheme": scheme, "token_ref": token_ref}
return json.dumps(payload, indent=2) + "\n"
# --- MCP handlers ---------------------------------------------------------- # --- MCP handlers ----------------------------------------------------------
@@ -422,13 +292,7 @@ def handle_tools_call(
f"{name}: 'justification' is required and must be a non-empty string", f"{name}: 'justification' is required and must be a non-empty string",
) )
if name == _sv.TOOL_EGRESS_BLOCK: if name in PROPOSED_FILE_FIELD:
# Structured input → JSON bundle on Proposal.proposed_file.
# The dashboard's apply step (egress_apply.add_route)
# parses this JSON, fetches the current routes, merges in
# the new one, and writes the merged file.
proposed_file = _validate_and_bundle_egress_route(args_raw)
elif name in PROPOSED_FILE_FIELD:
file_field = PROPOSED_FILE_FIELD[name] file_field = PROPOSED_FILE_FIELD[name]
proposed_file = args_raw.get(file_field) proposed_file = args_raw.get(file_field)
if not isinstance(proposed_file, str): if not isinstance(proposed_file, str):
+7 -4
View File
@@ -7,9 +7,12 @@ document vs. a research note or a decision record).
## Naming and numbering ## Naming and numbering
`NNNN-kebab-title.md`, zero-padded and sequential (`0024-…`, `0025-…`). New PRDs use a `prd-new-<kebab-title>.md` placeholder name while the PR
Numbers are never reused; gaps are fine (there is no 0005). The number is open. On merge to `main` a CI workflow assigns the next sequential
is assigned at creation and stays fixed for the life of the doc. number (`0024-…`, `0025-…`), renames the file, and updates the title
header. Numbers are never reused; gaps are fine.
Once numbered, the filename stays fixed for the life of the doc.
## Status ## Status
@@ -23,7 +26,7 @@ The `Status:` line near the top tracks the PRD's lifecycle:
## Format ## Format
```markdown ```markdown
# PRD NNNN: <short title> # PRD prd-new: <short title> ← placeholder; CI fills in the number on merge
- **Status:** Draft - **Status:** Draft
- **Author:** <who> - **Author:** <who>
+1 -1
View File
@@ -120,7 +120,7 @@ def _git_config_exec_calls(bottle: MagicMock) -> list[tuple[str, str]]:
class TestProvisionGitUser(unittest.TestCase): class TestProvisionGitUser(unittest.TestCase):
def setUp(self): def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-git-user.") self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-git-user.") # pylint: disable=consider-using-with
self.stage = Path(self._tmp.name) self.stage = Path(self._tmp.name)
def tearDown(self): def tearDown(self):
+3 -136
View File
@@ -1,27 +1,19 @@
"""Unit: validate_routes_content (PRD 0014 retargeted by PRD 0017 """Unit: validate_routes_content (issue #198: _merge_single_route and
chunk 3, PRD 0053). docker exec / cp / kill paths are covered by the add_route removed; docker exec / cp / kill paths are covered by the
integration test.""" integration test)."""
import unittest import unittest
from bot_bottle.backend.docker.egress_apply import ( from bot_bottle.backend.docker.egress_apply import (
EgressApplyError, EgressApplyError,
_merge_single_route,
validate_routes_content, validate_routes_content,
) )
from bot_bottle.yaml_subset import parse_yaml_subset
_ROUTES_EMPTY = "routes: []\n" _ROUTES_EMPTY = "routes: []\n"
_ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n' _ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n'
def _routes(parsed: str) -> list[dict]: # type: ignore
"""Parse a YAML routes string and pull out the routes list, so
tests can assert on shape directly."""
return parse_yaml_subset(parsed)["routes"] # type: ignore
class TestValidateRoutesContent(unittest.TestCase): class TestValidateRoutesContent(unittest.TestCase):
def test_accepts_minimal_route_table(self): def test_accepts_minimal_route_table(self):
validate_routes_content(_ROUTES_EMPTY) validate_routes_content(_ROUTES_EMPTY)
@@ -60,130 +52,5 @@ class TestValidateRoutesContent(unittest.TestCase):
) )
class TestMergeSingleRoute(unittest.TestCase):
BASE = _ROUTES_ONE
def test_appends_route_when_host_absent(self):
merged = _merge_single_route(self.BASE, {"host": "github.com"})
hosts = [r["host"] for r in _routes(merged)]
self.assertEqual(["api.anthropic.com", "github.com"], hosts)
def test_appends_matches(self):
merged = _merge_single_route(
self.BASE,
{"host": "github.com", "matches": [
{"paths": [{"value": "/repos/x/"}]}
]},
)
new_route = _routes(merged)[-1]
self.assertIn("matches", new_route)
def test_appends_legacy_path_allowlist_as_matches(self):
merged = _merge_single_route(
self.BASE,
{"host": "github.com", "path_allowlist": ["/repos/x/"]},
)
new_route = _routes(merged)[-1]
self.assertIn("matches", new_route)
def test_appends_auth_with_token_env_slot(self):
merged = _merge_single_route(
self.BASE,
{
"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH"},
},
)
new_route = _routes(merged)[-1]
self.assertEqual("Bearer", new_route["auth_scheme"])
self.assertEqual("EGRESS_TOKEN_0", new_route["token_env"])
def test_auth_slot_increments_past_existing(self):
base = (
'routes:\n'
' - host: "api.anthropic.com"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
)
merged = _merge_single_route(base, {
"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH"},
})
new_route = _routes(merged)[-1]
self.assertEqual("EGRESS_TOKEN_1", new_route["token_env"])
def test_existing_host_merges_match_paths_as_union(self):
base = (
'routes:\n'
' - host: "github.com"\n'
' matches:\n'
' - paths:\n'
' - value: "/a/"\n'
)
merged = _merge_single_route(base, {
"host": "github.com",
"matches": [{"paths": [{"value": "/b/"}]}],
})
routes = _routes(merged)
self.assertEqual(1, len(routes))
all_paths: list[str] = []
for me in routes[0].get("matches", []):
for p in me.get("paths", []):
all_paths.append(p["value"])
self.assertIn("/a/", all_paths)
self.assertIn("/b/", all_paths)
def test_existing_host_dedup_match_paths(self):
base = (
'routes:\n'
' - host: "github.com"\n'
' matches:\n'
' - paths:\n'
' - value: "/a/"\n'
)
merged = _merge_single_route(base, {
"host": "github.com",
"matches": [{"paths": [{"value": "/a/"}, {"value": "/b/"}]}],
})
all_paths: list[str] = []
for me in _routes(merged)[0].get("matches", []):
for p in me.get("paths", []):
all_paths.append(p["value"])
self.assertEqual(1, all_paths.count("/a/"))
self.assertIn("/b/", all_paths)
def test_existing_host_preserves_existing_auth_ignores_proposed(self):
base = (
'routes:\n'
' - host: "api.github.com"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
)
merged = _merge_single_route(base, {
"host": "api.github.com",
"auth": {"scheme": "token", "token_ref": "DIFFERENT"},
})
route = _routes(merged)[0]
self.assertEqual("Bearer", route["auth_scheme"])
self.assertEqual("EGRESS_TOKEN_0", route["token_env"])
def test_host_match_is_case_insensitive(self):
base = 'routes:\n - host: "GitHub.com"\n'
merged = _merge_single_route(base, {
"host": "github.com",
"matches": [{"paths": [{"value": "/x/"}]}],
})
routes = _routes(merged)
self.assertEqual(1, len(routes))
def test_missing_host_raises(self):
with self.assertRaises(EgressApplyError):
_merge_single_route(self.BASE, {})
def test_invalid_current_yaml_raises(self):
with self.assertRaises(EgressApplyError):
_merge_single_route("routes:\n\tbad", {"host": "x.example"})
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+4 -4
View File
@@ -254,7 +254,7 @@ class TestProvisionCA(unittest.TestCase):
cp_in + exec in the right order.""" cp_in + exec in the right order."""
def setUp(self): def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-ca.") self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-ca.") # pylint: disable=consider-using-with
self.tmp = Path(self._tmp.name) self.tmp = Path(self._tmp.name)
self.egress_ca = self.tmp / "egress-ca.pem" self.egress_ca = self.tmp / "egress-ca.pem"
_write_self_signed_cert(self.egress_ca) _write_self_signed_cert(self.egress_ca)
@@ -346,7 +346,7 @@ class TestProvisionGit(unittest.TestCase):
when its condition doesn't hold.""" when its condition doesn't hold."""
def setUp(self): def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-git.") self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-git.") # pylint: disable=consider-using-with
self.stage = Path(self._tmp.name) self.stage = Path(self._tmp.name)
def tearDown(self): def tearDown(self):
@@ -407,7 +407,7 @@ class TestProvisionGit(unittest.TestCase):
cp_call = bottle.cp_in.call_args cp_call = bottle.cp_in.call_args
staged_path = Path(cp_call.args[0]) staged_path = Path(cp_call.args[0])
self.assertEqual(self.stage, staged_path.parent) self.assertEqual(self.stage, staged_path.parent)
content = staged_path.read_text() content = staged_path.read_text(encoding="utf-8")
self.assertIn( self.assertIn(
'[url "http://127.0.0.1:9418/bot-bottle.git"]', content, '[url "http://127.0.0.1:9418/bot-bottle.git"]', content,
) )
@@ -507,7 +507,7 @@ class TestProvisionGitUser(unittest.TestCase):
class TestProvisionWorkspace(unittest.TestCase): class TestProvisionWorkspace(unittest.TestCase):
def setUp(self): def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-workspace.") self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-workspace.") # pylint: disable=consider-using-with
self.stage = Path(self._tmp.name) self.stage = Path(self._tmp.name)
def tearDown(self): def tearDown(self):
+12 -14
View File
@@ -17,7 +17,6 @@ from bot_bottle.supervise import (
STATUS_MODIFIED, STATUS_MODIFIED,
STATUS_REJECTED, STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK, TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
archive_proposal, archive_proposal,
audit_log_path, audit_log_path,
list_pending_proposals, list_pending_proposals,
@@ -37,16 +36,16 @@ FIXED_TS = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal( def _proposal(
tool: str = TOOL_EGRESS_BLOCK, tool: str = TOOL_CAPABILITY_BLOCK,
proposed: str = "{}", proposed: str = "FROM python:3.13\n",
justification: str = "need a route", justification: str = "need a capability",
) -> Proposal: ) -> Proposal:
return Proposal.new( return Proposal.new(
bottle_slug="dev", bottle_slug="dev",
tool=tool, tool=tool,
proposed_file=proposed, proposed_file=proposed,
justification=justification, justification=justification,
current_file_hash=sha256_hex("{}"), current_file_hash=sha256_hex(proposed),
now=FIXED_TS, now=FIXED_TS,
) )
@@ -57,7 +56,7 @@ class TestProposalRoundtrip(unittest.TestCase):
self.assertTrue(p.id) self.assertTrue(p.id)
self.assertEqual("2026-05-25T12:00:00+00:00", p.arrival_timestamp) self.assertEqual("2026-05-25T12:00:00+00:00", p.arrival_timestamp)
self.assertEqual("dev", p.bottle_slug) self.assertEqual("dev", p.bottle_slug)
self.assertEqual(TOOL_EGRESS_BLOCK, p.tool) self.assertEqual(TOOL_CAPABILITY_BLOCK, p.tool)
def test_to_from_dict_roundtrip(self): def test_to_from_dict_roundtrip(self):
p = _proposal() p = _proposal()
@@ -142,14 +141,14 @@ class TestQueueIO(unittest.TestCase):
def test_list_pending_sorted_by_arrival(self): def test_list_pending_sorted_by_arrival(self):
# Fabricate two with explicit timestamps. # Fabricate two with explicit timestamps.
a = Proposal.new( a = Proposal.new(
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK, bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="{}", justification="early", proposed_file="FROM python:3.13\n", justification="early",
current_file_hash="x", current_file_hash="x",
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc), now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
) )
b = Proposal.new( b = Proposal.new(
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK, bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="{}", justification="late", proposed_file="FROM python:3.13\n", justification="late",
current_file_hash="x", current_file_hash="x",
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc), now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
) )
@@ -318,16 +317,15 @@ class TestToolConstants(unittest.TestCase):
def test_tools_tuple_matches_individual_constants(self): def test_tools_tuple_matches_individual_constants(self):
self.assertEqual( self.assertEqual(
( (
TOOL_EGRESS_BLOCK,
TOOL_CAPABILITY_BLOCK, TOOL_CAPABILITY_BLOCK,
supervise.TOOL_LIST_EGRESS_ROUTES, supervise.TOOL_LIST_EGRESS_ROUTES,
), ),
supervise.TOOLS, supervise.TOOLS,
) )
def test_component_map_covers_egress_remediation_only(self): def test_component_map_has_no_entries(self):
self.assertIn(TOOL_EGRESS_BLOCK, supervise.COMPONENT_FOR_TOOL) # egress-block removed in issue #198; capability-block never had one.
self.assertNotIn(TOOL_CAPABILITY_BLOCK, supervise.COMPONENT_FOR_TOOL) self.assertEqual({}, supervise.COMPONENT_FOR_TOOL)
class _StubSupervise(supervise.Supervise): class _StubSupervise(supervise.Supervise):
+18 -152
View File
@@ -1,12 +1,10 @@
"""Unit: supervise headless paths (PRD 0013 phase 4, PRD 0014). """Unit: supervise headless paths (PRD 0013 phase 4, PRD 0016).
The curses TUI itself isn't exercised here — these tests cover the The curses TUI itself isn't exercised here — these tests cover the
discovery + approve/reject + audit-write paths that the TUI's key discovery + approve/reject paths that the TUI's key handlers call into.
handlers call into.
add_route is stubbed at the supervise CLI module level so the tests egress-block (add_route) was removed in issue #198; the TestEgressApplyWiring
don't need a running egress sidecar; the real docker exec/cp/SIGHUP class and all stubs for add_route have been dropped accordingly.
plumbing is covered by the integration test.
""" """
import os import os
@@ -17,7 +15,6 @@ from pathlib import Path
from bot_bottle import supervise from bot_bottle import supervise
from bot_bottle.backend.docker.capability_apply import CapabilityApplyError from bot_bottle.backend.docker.capability_apply import CapabilityApplyError
from bot_bottle.backend.docker.egress_apply import EgressApplyError
from bot_bottle.cli import supervise as supervise_cli from bot_bottle.cli import supervise as supervise_cli
from bot_bottle.supervise import ( from bot_bottle.supervise import (
Proposal, Proposal,
@@ -25,7 +22,6 @@ from bot_bottle.supervise import (
STATUS_MODIFIED, STATUS_MODIFIED,
STATUS_REJECTED, STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK, TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
read_audit_entries, read_audit_entries,
read_response, read_response,
sha256_hex, sha256_hex,
@@ -35,9 +31,8 @@ from bot_bottle.supervise import (
FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc) FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_EGRESS_BLOCK) -> Proposal: def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
payloads = { payloads = {
TOOL_EGRESS_BLOCK: '{"routes": []}\n',
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n", TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
} }
payload = payloads.get(tool, "") payload = payloads.get(tool, "")
@@ -88,14 +83,14 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
def test_sorted_by_arrival_across_bottles(self): def test_sorted_by_arrival_across_bottles(self):
early = Proposal.new( early = Proposal.new(
bottle_slug="api", tool=TOOL_EGRESS_BLOCK, bottle_slug="api", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="{}", justification="early", proposed_file="FROM python:3.13\n", justification="early",
current_file_hash="h", current_file_hash="h",
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc), now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
) )
late = Proposal.new( late = Proposal.new(
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK, bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="{}", justification="late", proposed_file="FROM python:3.13\n", justification="late",
current_file_hash="h", current_file_hash="h",
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc), now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
) )
@@ -120,48 +115,38 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
class TestApproveReject(_FakeHomeMixin, unittest.TestCase): class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def setUp(self): def setUp(self):
self._setup_fake_home() self._setup_fake_home()
self._original_add_route = supervise_cli.add_route
self._original_apply_capability = supervise_cli.apply_capability_change self._original_apply_capability = supervise_cli.apply_capability_change
# Default stubs: succeed with deterministic before/after so the
# audit log shows a non-empty diff.
supervise_cli.add_route = lambda slug, content: ( # type: ignore
'{"routes": []}\n', '{"routes": [{"host": "x"}]}\n',
)
supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore
"FROM old\n", content, "FROM old\n", content,
) )
def tearDown(self): def tearDown(self):
supervise_cli.add_route = self._original_add_route
supervise_cli.apply_capability_change = self._original_apply_capability supervise_cli.apply_capability_change = self._original_apply_capability
self._teardown_fake_home() self._teardown_fake_home()
def _enqueue(self, tool: str = TOOL_EGRESS_BLOCK): def _enqueue(self, tool: str = TOOL_CAPABILITY_BLOCK):
p = _proposal(tool=tool) p = _proposal(tool=tool)
qdir = supervise.queue_dir_for_slug("dev") qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True, exist_ok=True) qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p) supervise.write_proposal(qdir, p)
return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir) return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir)
def test_approve_writes_response_and_audit(self): def test_approve_writes_response(self):
qp = self._enqueue() qp = self._enqueue()
supervise_cli.approve(qp) supervise_cli.approve(qp)
resp = read_response(qp.queue_dir, qp.proposal.id) # capability-block is archived on approve, so the response file
# moves to processed/ before the caller can read it.
resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status) self.assertEqual(STATUS_APPROVED, resp.status)
self.assertIsNone(resp.final_file) self.assertIsNone(resp.final_file)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertEqual("approved", entries[0].operator_action)
def test_approve_with_final_file_marks_modified(self): def test_approve_with_final_file_marks_modified(self):
qp = self._enqueue() qp = self._enqueue()
supervise_cli.approve(qp, final_file='{"routes": [{"path": "/x/"}]}\n', notes="tweaked") supervise_cli.approve(qp, final_file="FROM bookworm\n", notes="tweaked")
resp = read_response(qp.queue_dir, qp.proposal.id) resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
self.assertEqual(STATUS_MODIFIED, resp.status) self.assertEqual(STATUS_MODIFIED, resp.status)
self.assertEqual('{"routes": [{"path": "/x/"}]}\n', resp.final_file) self.assertEqual("FROM bookworm\n", resp.final_file)
self.assertEqual("tweaked", resp.notes) self.assertEqual("tweaked", resp.notes)
entries = read_audit_entries("egress", "dev")
self.assertEqual("modified", entries[0].operator_action)
def test_reject_writes_rejection(self): def test_reject_writes_rejection(self):
qp = self._enqueue() qp = self._enqueue()
@@ -169,113 +154,13 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
resp = read_response(qp.queue_dir, qp.proposal.id) resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_REJECTED, resp.status) self.assertEqual(STATUS_REJECTED, resp.status)
self.assertEqual("nope", resp.notes) self.assertEqual("nope", resp.notes)
entries = read_audit_entries("egress", "dev")
self.assertEqual("rejected", entries[0].operator_action)
self.assertEqual("nope", entries[0].operator_notes)
def test_capability_block_skips_audit_log(self): def test_no_audit_log_for_capability_block(self):
qp = self._enqueue(tool=TOOL_CAPABILITY_BLOCK) qp = self._enqueue(tool=TOOL_CAPABILITY_BLOCK)
supervise_cli.approve(qp) supervise_cli.approve(qp)
# No audit log for capability-block (per PRD 0013 / 0016).
self.assertEqual([], read_audit_entries("egress", "dev")) self.assertEqual([], read_audit_entries("egress", "dev"))
class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
"""PRD 0017 chunk 3: approve() on an egress-block proposal
must call add_route (single-route merge) with the right args
and surface its failures."""
def setUp(self):
self._setup_fake_home()
self._original_add_route = supervise_cli.add_route
def tearDown(self):
supervise_cli.add_route = self._original_add_route
self._teardown_fake_home()
def _enqueue_egress(self, proposed: str = '{"host": "x.example"}\n'):
p = Proposal.new(
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK,
proposed_file=proposed,
justification="need a route",
current_file_hash=sha256_hex(proposed),
now=FIXED,
)
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p)
return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir)
def test_egress_block_calls_add_route_with_proposed_json(self):
calls = []
supervise_cli.add_route = lambda slug, content: ( # type: ignore
calls.append((slug, content)) or ("before", "after")
)
qp = self._enqueue_egress(
proposed='{"host": "new.example", "path_allowlist": ["/x/"]}\n'
)
supervise_cli.approve(qp)
self.assertEqual(1, len(calls))
slug, content = calls[0]
self.assertEqual("dev", slug)
# The single-route JSON the agent proposed reaches add_route
# unchanged — add_route fetches current state + merges.
self.assertEqual(
'{"host": "new.example", "path_allowlist": ["/x/"]}\n',
content,
)
def test_modify_passes_final_file_to_add_route(self):
calls = []
supervise_cli.add_route = lambda slug, content: ( # type: ignore
calls.append(content) or ("before", "after")
)
qp = self._enqueue_egress()
supervise_cli.approve(
qp,
final_file='{"host": "edited.example"}\n',
notes="tweaked",
)
self.assertEqual(['{"host": "edited.example"}\n'], calls)
def test_apply_failure_blocks_response_and_audit(self):
supervise_cli.add_route = lambda slug, content: (_ for _ in ()).throw( # type: ignore
EgressApplyError("docker exec failed")
)
qp = self._enqueue_egress()
with self.assertRaises(EgressApplyError):
supervise_cli.approve(qp)
# No response file (proposal stays pending).
self.assertEqual(
[qp.proposal.id],
[p.id for p in supervise.list_pending_proposals(qp.queue_dir)],
)
# No audit entry.
self.assertEqual([], read_audit_entries("egress", "dev"))
def test_real_diff_lands_in_audit(self):
supervise_cli.add_route = lambda slug, content: ( # type: ignore
'{"routes": []}\n', # before
'{"routes": [{"host": "new.example"}]}\n', # after
)
qp = self._enqueue_egress(proposed='{"host": "new.example"}\n')
supervise_cli.approve(qp)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertIn('+{"routes": [{"host": "new.example"}]}', entries[0].diff)
self.assertIn('-{"routes": []}', entries[0].diff)
def test_reject_does_not_call_apply(self):
qp = self._enqueue_egress()
supervise_cli.reject(qp, reason="no thanks")
# Reject still writes a response + audit entry with empty diff.
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_REJECTED, resp.status)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertEqual("", entries[0].diff)
class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase): class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
"""PRD 0016 Phase 3: approve() on a capability-block proposal """PRD 0016 Phase 3: approve() on a capability-block proposal
calls apply_capability_change, archives the proposal afterward calls apply_capability_change, archives the proposal afterward
@@ -328,17 +213,12 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore
qp = self._enqueue_capability() qp = self._enqueue_capability()
supervise_cli.approve(qp) supervise_cli.approve(qp)
# capability-block has no audit log per PRD 0013 — its record
# lives in the per-bottle Dockerfile + transcript state.
self.assertEqual([], read_audit_entries("egress", "dev")) self.assertEqual([], read_audit_entries("egress", "dev"))
def test_proposal_archived_after_apply(self): def test_proposal_archived_after_apply(self):
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore
qp = self._enqueue_capability() qp = self._enqueue_capability()
supervise_cli.approve(qp) supervise_cli.approve(qp)
# Sidecar would normally archive after delivering the response,
# but it's gone by then. The supervise TUI archives so
# discover_pending stops surfacing the resolved proposal.
self.assertEqual([], supervise.list_pending_proposals(qp.queue_dir)) self.assertEqual([], supervise.list_pending_proposals(qp.queue_dir))
processed = list((qp.queue_dir / "processed").glob("*.json")) processed = list((qp.queue_dir / "processed").glob("*.json"))
self.assertEqual(2, len(processed)) self.assertEqual(2, len(processed))
@@ -346,20 +226,8 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
class TestEditInEditor(unittest.TestCase): class TestEditInEditor(unittest.TestCase):
def test_runs_editor_returns_edited_content(self): def test_runs_editor_returns_edited_content(self):
# Fake "editor" is /bin/sh -c 'cat <<EOF > $1 ... EOF'
original_editor = os.environ.get("EDITOR") original_editor = os.environ.get("EDITOR")
try: try:
# Use a fake editor that overwrites the file with a known
# marker. EDITOR is split with shlex equivalence by
# subprocess.run when invoked as a list — keep it as a
# single program path that takes the file as argv[1].
os.environ["EDITOR"] = (
"/bin/sh -c 'printf %s \"edited\" > \"$0\"'"
)
# subprocess.run with the str as the first list element
# would try to find a binary literally named "/bin/sh -c ..."
# — that won't work. Use shell mode trick: wrap in a script.
# Easier: build a tiny helper script.
with tempfile.NamedTemporaryFile( with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False, prefix="fake-editor.", mode="w", suffix=".sh", delete=False, prefix="fake-editor.",
) as script: ) as script:
@@ -381,7 +249,6 @@ class TestEditInEditor(unittest.TestCase):
def test_returns_none_when_unchanged(self): def test_returns_none_when_unchanged(self):
original_editor = os.environ.get("EDITOR") original_editor = os.environ.get("EDITOR")
try: try:
# No-op editor: touch the file (leaves it unchanged).
with tempfile.NamedTemporaryFile( with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False, prefix="noop-editor.", mode="w", suffix=".sh", delete=False, prefix="noop-editor.",
) as script: ) as script:
@@ -445,7 +312,6 @@ class TestCapabilityBlockSmolmachinesGuard(_FakeHomeMixin, unittest.TestCase):
supervise_cli.approve(qp) # must not raise supervise_cli.approve(qp) # must not raise
def test_no_metadata_falls_through_to_docker_path(self): def test_no_metadata_falls_through_to_docker_path(self):
# No metadata at all → assume Docker (backward-compatible).
qp = self._enqueue_capability("dev") qp = self._enqueue_capability("dev")
supervise_cli.approve(qp) # must not raise supervise_cli.approve(qp) # must not raise
+12 -12
View File
@@ -141,7 +141,6 @@ class TestHandleToolsList(unittest.TestCase):
names = [t["name"] for t in result["tools"]] # type: ignore[index] names = [t["name"] for t in result["tools"]] # type: ignore[index]
self.assertEqual( self.assertEqual(
sorted([ sorted([
_sv.TOOL_EGRESS_BLOCK,
_sv.TOOL_CAPABILITY_BLOCK, _sv.TOOL_CAPABILITY_BLOCK,
_sv.TOOL_LIST_EGRESS_ROUTES, _sv.TOOL_LIST_EGRESS_ROUTES,
]), ]),
@@ -206,10 +205,10 @@ class TestHandleToolsCall(unittest.TestCase):
try: try:
result = handle_tools_call( result = handle_tools_call(
{ {
"name": _sv.TOOL_EGRESS_BLOCK, "name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": { "arguments": {
"host": "example.com", "dockerfile": "FROM python:3.13\n",
"justification": "need a route", "justification": "need git",
}, },
}, },
self.config, self.config,
@@ -250,8 +249,8 @@ class TestHandleToolsCall(unittest.TestCase):
with self.assertRaises(_RpcError): with self.assertRaises(_RpcError):
handle_tools_call( handle_tools_call(
{ {
"name": _sv.TOOL_EGRESS_BLOCK, "name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": {"host": "example.com"}, "arguments": {"dockerfile": "FROM python:3.13\n"},
}, },
self.config, self.config,
) )
@@ -261,9 +260,9 @@ class TestHandleToolsCall(unittest.TestCase):
try: try:
handle_tools_call( handle_tools_call(
{ {
"name": _sv.TOOL_EGRESS_BLOCK, "name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": { "arguments": {
"host": "example.com", "dockerfile": "FROM python:3.13\n",
"justification": "x", "justification": "x",
}, },
}, },
@@ -285,10 +284,10 @@ class TestHandleToolsCall(unittest.TestCase):
) )
result = handle_tools_call( result = handle_tools_call(
{ {
"name": _sv.TOOL_EGRESS_BLOCK, "name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": { "arguments": {
"host": "example.com", "dockerfile": "FROM python:3.13\n",
"justification": "need a route", "justification": "need a capability",
}, },
}, },
config, config,
@@ -412,7 +411,8 @@ class TestHttpEndToEnd(unittest.TestCase):
self.assertEqual("2.0", result["jsonrpc"]) self.assertEqual("2.0", result["jsonrpc"])
self.assertEqual(1, result["id"]) self.assertEqual(1, result["id"])
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index] names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names) self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
self.assertNotIn("egress-block", names)
def test_unknown_method_returns_jsonrpc_error(self): def test_unknown_method_returns_jsonrpc_error(self):
result = self._post_jsonrpc( result = self._post_jsonrpc(