Compare commits

..

7 Commits

Author SHA1 Message Date
didericis ef0f3109b7 feat(agent-provider): user plugin discovery, Dockerfile cascade, and provider-owned ca/git provisioning
lint / lint (push) Failing after 1m26s
test / unit (pull_request) Successful in 30s
test / integration (pull_request) Successful in 47s
- Add _load_user_plugin: loads AgentProvider subclass from
  ~/.bot-bottle/contrib/<name>/agent_provider.py; get_provider()
  checks there first before falling back to built-ins
- Add Dockerfile cascade to docker prepare: per-bottle override →
  manifest dockerfile → user plugin Dockerfile → provider default
- Move provision_ca and provision_git from backend-specific
  provision/ modules to AgentProvider ABC as overridable defaults;
  delete docker/provision/ca.py, docker/provision/git.py,
  smolmachines/provision/ca.py, smolmachines/provision/git.py
- Add git_gate_insteadof_host/scheme properties to BottlePlan base;
  SmolmachinesBottlePlan overrides them to return agent_git_gate_host
  and "http" so provision_git works correctly on both backends
- Move SIGKILL retry from smolmachines provision/ca.py into
  SmolmachinesBottle.exec via _exec_raw helper — all exec calls
  on smolmachines now transparently retry once on exit 137
- Relax manifest_agent template validation to allow user-defined
  template names; keep auth_token/forward_host_credentials guards
  for built-in-only features
- Update tests: rewrite test_docker_provision_git_user and
  test_smolmachines_provision to call provider methods directly;
  add TestSmolmachinesBottleExec for SIGKILL retry coverage

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-07 10:39:58 -04:00
didericis a6e5f93bbe docs(prd): expand user-provider-plugins to cover Dockerfile convention and provisioning methods
test / unit (pull_request) Successful in 33s
test / integration (pull_request) Successful in 48s
2026-06-07 10:19:03 -04:00
didericis 086da71e6e ci(prd): rename PRD to prd-new placeholder per new convention
test / unit (pull_request) Successful in 31s
test / integration (pull_request) Successful in 47s
2026-06-06 22:10:23 -04:00
didericis-claude 0d4d930b29 docs: bump PRD number from 0052 to 0053
lint / lint (push) Successful in 1m30s
test / unit (pull_request) Successful in 34s
test / integration (pull_request) Successful in 43s
Renames docs/prds/0052-user-provider-plugins.md to 0053-user-provider-plugins.md
and updates the heading inside the file. 0052 is now reserved for the egress
DLP addon.
2026-06-06 16:06:09 -04:00
didericis-claude 3866461bf4 fix: correct broken imports and fileno() guard after rebase
codex_auth.py was moved into contrib/codex/ but still used `.log`/
`.util` relative imports that resolved to the parent bot_bottle
package before the move — update to `...log` / `...util`.

_read_winsize() called sys.stdin.fileno() outside the OSError guard;
pytest's redirected stdin raises UnsupportedOperation (an OSError
subclass) there, breaking test_returns_first_tty_size. Move fileno()
inside the try block so any non-TTY stream is skipped cleanly.
2026-06-06 16:06:09 -04:00
didericis-claude 664c67a272 docs: PRD 0052 — user-defined agent provider plugins
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-06 16:06:09 -04:00
didericis-claude 26a1a51cbb refactor: move codex_auth into contrib/codex
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-06 16:06:09 -04:00
18 changed files with 692 additions and 267 deletions
-39
View File
@@ -1,39 +0,0 @@
# Block PRs that add prd-new-*.md files directly to main.
#
# prd-new-*.md files are placeholders — they must go through a PR so
# the post-merge prd-number workflow can assign a sequential number and
# rename the file. A direct push or a PR that slips through without
# triggering the check would leave an un-numbered PRD on main.
name: prd-check
on:
pull_request:
branches:
- main
paths:
- 'docs/prds/prd-new-*.md'
jobs:
no-prd-new-on-main:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Fail if prd-new-*.md files are present in the diff
run: |
base="${{ github.event.pull_request.base.sha }}"
head="${{ github.event.pull_request.head.sha }}"
new_prds=$(git diff --name-only --diff-filter=A "$base" "$head" \
| grep -E '^docs/prds/prd-new-.+\.md$' || true)
if [ -n "$new_prds" ]; then
echo "ERROR: PRs to main must not add prd-new-*.md files directly."
echo "These files must be merged via a feature branch so the"
echo "prd-number workflow can assign a sequential number on merge:"
echo "$new_prds"
exit 1
fi
echo "OK: no prd-new-*.md files added in this PR."
-123
View File
@@ -1,123 +0,0 @@
# Assign sequential numbers to prd-new-*.md files on merge to main.
#
# When a PR merges to main and includes prd-new-*.md files this workflow:
# 1. Finds the next available NNNN number by scanning existing PRDs.
# 2. Renames each prd-new-*.md to NNNN-<slug>.md.
# 3. Updates the title header (# PRD prd-new: → # PRD NNNN:).
# 4. Flips Status: Draft → Active when the merge commit also touched
# files outside docs/prds/ (i.e. the implementation shipped together
# with the PRD).
# 5. Commits the renaming back to main.
#
# No-op if the push contained no prd-new-*.md files.
name: prd-number
on:
push:
branches:
- main
paths:
- 'docs/prds/prd-new-*.md'
jobs:
assign-numbers:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 2
token: ${{ secrets.GITHUB_TOKEN }}
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Configure git
run: |
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
- name: Assign PRD numbers
run: |
python3 - <<'EOF'
import os
import re
import subprocess
import sys
from pathlib import Path
prds_dir = Path("docs/prds")
# Files added in the latest commit (HEAD vs HEAD~1).
result = subprocess.run(
["git", "diff", "--name-only", "--diff-filter=A", "HEAD~1", "HEAD"],
capture_output=True, text=True, check=True,
)
added = [Path(p) for p in result.stdout.splitlines()]
new_prds = [p for p in added if p.parent == prds_dir
and re.match(r"prd-new-.+\.md$", p.name)]
if not new_prds:
print("No prd-new-*.md files added in this commit — nothing to do.")
sys.exit(0)
# Determine whether non-PRD files were also changed (for Status flip).
all_changed = subprocess.run(
["git", "diff", "--name-only", "HEAD~1", "HEAD"],
capture_output=True, text=True, check=True,
).stdout.splitlines()
non_prd_changed = any(
not f.startswith("docs/prds/") for f in all_changed
)
# Find next available number.
existing = sorted(
int(m.group(1))
for p in prds_dir.glob("*.md")
if (m := re.match(r"^(\d{4})-", p.name))
)
next_num = (max(existing) + 1) if existing else 1
for prd_path in sorted(new_prds):
slug = re.sub(r"^prd-new-", "", prd_path.stem)
new_name = f"{next_num:04d}-{slug}.md"
new_path = prds_dir / new_name
print(f" {prd_path.name} → {new_name}")
content = prd_path.read_text()
# Update title header.
content = re.sub(
r"^(#\s+PRD\s+)prd-new(:)",
rf"\g<1>{next_num:04d}\2",
content,
count=1,
flags=re.MULTILINE,
)
# Conditionally flip Status.
if non_prd_changed:
content = re.sub(
r"(\*\*Status:\*\*\s*)Draft",
r"\g<1>Active",
content,
count=1,
)
new_path.write_text(content)
subprocess.run(["git", "rm", str(prd_path)], check=True)
subprocess.run(["git", "add", str(new_path)], check=True)
next_num += 1
subprocess.run(
["git", "commit", "-m", "ci(prd): assign sequential numbers to new PRDs"],
check=True,
)
subprocess.run(["git", "push"], check=True)
EOF
+4 -5
View File
@@ -36,11 +36,10 @@ the container lifecycle and the copying of skills and env vars into it.
- Three kinds of doc, each with its own conventions in-folder; see
`docs/README.md` for when to write which:
- **PRDs** (`docs/prds/`) — one feature per file. While a PR is open
the file is named `prd-new-<kebab>.md`; CI assigns a sequential
number on merge to `main` and renames it. A `Status:` line tracks
lifecycle: Draft → Active (shipped to `main`) →
Superseded/Retargeted. Format in `docs/prds/README.md`.
- **PRDs** (`docs/prds/`) — one feature per file, numbered
`NNNN-kebab.md`. A `Status:` line tracks lifecycle: Draft → Active
(shipped to `main`) → Superseded/Retargeted. Format in
`docs/prds/README.md`.
- **Research notes** (`docs/research/`) — opinionated investigations;
unnumbered kebab-case, freeform and verdict-first. See
`docs/research/README.md`.
+7 -13
View File
@@ -16,20 +16,14 @@ FROM node:22-slim
# features (status checks, commits, PR creation) — without git in the
# image, those features fail in surprising ways once the user does any
# real work. ca-certificates is already in the slim base; listed for
# clarity in case the base ever drops it. curl is here so any
# HTTPS_PROXY-aware tool (curl itself, plus anything that shells out
# to it) works against egress's bumped TLS without the agent needing
# local DNS.
# clarity in case the base ever drops it. socat is the privileged
# forwarder for the in-container ssh-agent (see bot_bottle/ssh.py): the agent
# runs as root and rejects non-root connections, so socat sits between
# node and the agent socket. curl is here so any HTTPS_PROXY-aware
# tool (curl itself, plus anything that shells out to it) works
# against egress's bumped TLS without the agent needing local DNS.
RUN apt-get update \
&& apt-get install -y --no-install-recommends git ca-certificates curl \
&& rm -rf /var/lib/apt/lists/*
# App-specific deps. Python isn't required by claude-code itself
# (claude-code is a Node CLI), but is convenient for the agent to
# shell out to for ad-hoc scripts. Kept on its own layer so it can
# be moved to a downstream image if the base ever needs to shrink.
RUN apt-get update \
&& apt-get install -y --no-install-recommends python3 python3-pip python3-venv \
&& apt-get install -y --no-install-recommends git ca-certificates openssh-client socat curl dnsutils python3 python3-pip python3-venv \
&& rm -rf /var/lib/apt/lists/*
# Install claude-code globally. Pinned to the version verified in the v1
+1 -9
View File
@@ -6,15 +6,7 @@
FROM node:22-slim
RUN apt-get update \
&& apt-get install -y --no-install-recommends git ca-certificates curl \
&& rm -rf /var/lib/apt/lists/*
# App-specific deps. Python isn't required by codex itself
# (codex is a Node CLI), but is convenient for the agent to shell
# out to for ad-hoc scripts. Kept on its own layer so it can be
# moved to a downstream image if the base ever needs to shrink.
RUN apt-get update \
&& apt-get install -y --no-install-recommends python3 python3-pip python3-venv \
&& apt-get install -y --no-install-recommends git ca-certificates openssh-client socat curl dnsutils python3 python3-pip python3-venv \
&& rm -rf /var/lib/apt/lists/*
RUN npm install -g --no-fund --no-audit @openai/codex@0.136.0 \
+2 -2
View File
@@ -411,8 +411,8 @@ class BottleBackend(ABC, Generic[PlanT, CleanupT]):
# Import concrete backend classes AFTER the base types are defined, so
# each backend module can pull BottleSpec / BottlePlan / BottleBackend
# via `from . import ...` without hitting a partially-initialized module.
from .docker import DockerBottleBackend # noqa: E402 # pylint: disable=wrong-import-position
from .smolmachines import SmolmachinesBottleBackend # noqa: E402 # pylint: disable=wrong-import-position
from .docker import DockerBottleBackend # noqa: E402
from .smolmachines import SmolmachinesBottleBackend # noqa: E402
# The dict is heterogeneous: each value is a BottleBackend specialized
+1 -1
View File
@@ -25,7 +25,7 @@ from pathlib import Path
from typing import Generator, Sequence
from ...supervise import SUPERVISE_HOSTNAME, SUPERVISE_PORT
from .. import ActiveAgent, BottleBackend, BottleSpec
from .. import ActiveAgent, Bottle, BottleBackend, BottleSpec
from . import cleanup as _cleanup
from . import enumerate as _enumerate
from . import launch as _launch
+191 -5
View File
@@ -1,20 +1,70 @@
"""Host-side helper for egress sidecar inspection (issue #198).
"""Host-side helper to apply a routes.yaml change to a running
egress sidecar (PRD 0014 retargeted by PRD 0017 chunk 3, PRD 0053).
`_merge_single_route`, `add_route`, and `apply_routes_change` were
removed when the egress-block MCP tool was dropped. The remaining
helpers support runtime inspection and validation of the routes file
without modifying it at runtime.
Used by the supervise dashboard when the operator approves an
egress-block proposal. Fetches current routes.yaml, validates,
writes into the sidecar, then SIGHUPs to reload.
"""
from __future__ import annotations
import json
import subprocess
from pathlib import Path
from typing import cast
from ...egress import EGRESS_ROUTES_IN_CONTAINER
from ...egress_addon_core import load_routes
from ...yaml_subset import YamlSubsetError, parse_yaml_subset
from .bottle_state import egress_state_dir
from .sidecar_bundle import sidecar_bundle_container_name
def _render_routes_payload(routes_list: list[dict[str, object]]) -> str:
"""Render a list-of-dicts routes payload as YAML matching the
shape `egress_render_routes` produces."""
if not routes_list:
return "routes: []\n"
lines: list[str] = ["routes:"]
for entry in routes_list:
host = str(entry.get("host", ""))
lines.append(f' - host: "{host}"')
auth_scheme = entry.get("auth_scheme")
token_env = entry.get("token_env")
if auth_scheme and token_env:
lines.append(f' auth_scheme: "{auth_scheme}"')
lines.append(f' token_env: "{token_env}"')
matches_obj = entry.get("matches")
if isinstance(matches_obj, list) and matches_obj:
lines.append(" matches:")
for match_entry in matches_obj:
me = cast(dict[str, object], match_entry)
first_key = True
if "paths" in me:
lines.append(" - paths:")
first_key = False
for pd in cast(list[dict[str, str]], me["paths"]):
if "type" in pd:
lines.append(f' - type: "{pd["type"]}"')
lines.append(f' value: "{pd["value"]}"')
else:
lines.append(f' - value: "{pd["value"]}"')
if "methods" in me:
methods_str = ", ".join(
f'"{m}"' for m in cast(list[str], me["methods"])
)
prefix = " - " if first_key else " "
lines.append(f'{prefix}methods: [{methods_str}]')
first_key = False
if first_key:
lines.append(" - {}")
return "\n".join(lines) + "\n"
def _egress_routes_host_path(slug: str) -> Path:
return egress_state_dir(slug) / "egress_routes.yaml"
class EgressApplyError(RuntimeError):
pass
@@ -42,8 +92,144 @@ def validate_routes_content(content: str) -> None:
) from e
def apply_routes_change(slug: str, new_content: str) -> tuple[str, str]:
container = sidecar_bundle_container_name(slug)
before = fetch_current_routes(slug)
validate_routes_content(new_content)
target = _egress_routes_host_path(slug)
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(new_content)
target.chmod(0o644)
sig = subprocess.run(
["docker", "kill", "--signal", "HUP", container],
capture_output=True, text=True, check=False,
)
if sig.returncode != 0:
raise EgressApplyError(
f"failed to SIGHUP {container}: "
f"{(sig.stderr or '').strip()}"
)
return before, new_content
def _merge_single_route(
current_yaml: str, new_route: dict[str, object],
) -> str:
"""Merge a single proposed route into the current routes.yaml.
- Host absent → append the route.
- Host present → union the match paths (proposed existing).
Auth is preserved from existing route.
"""
try:
cfg = parse_yaml_subset(current_yaml)
except YamlSubsetError as e:
raise EgressApplyError(
f"current routes.yaml is not valid YAML: {e}"
) from e
routes = cfg.get("routes")
if not isinstance(routes, list):
raise EgressApplyError(
"current routes.yaml: 'routes' is not a list"
)
routes_typed = cast(list[object], routes)
new_host = str(new_route.get("host", "")).lower()
if not new_host:
raise EgressApplyError(
"proposed route is missing 'host'"
)
# Build proposed matches from the input
proposed_matches = new_route.get("matches")
if proposed_matches is None:
# Accept legacy path_allowlist from agent proposals and convert
proposed_paths = new_route.get("path_allowlist")
if isinstance(proposed_paths, list) and proposed_paths:
proposed_matches = [{"paths": [{"value": p} for p in proposed_paths]}]
for entry in routes_typed:
if not isinstance(entry, dict):
continue
entry_typed = cast(dict[str, object], entry)
if str(entry_typed.get("host", "")).lower() == new_host:
# Merge matches: union path values from proposed into existing
if isinstance(proposed_matches, list) and proposed_matches:
existing_matches = entry_typed.get("matches")
if not isinstance(existing_matches, list):
existing_matches = []
# Simple merge: collect all existing path values, add new ones
existing_paths: set[str] = set()
for me in existing_matches:
me_typed = cast(dict[str, object], me) if isinstance(me, dict) else {}
paths = me_typed.get("paths")
if isinstance(paths, list):
for p in paths:
p_typed = cast(dict[str, object], p) if isinstance(p, dict) else {}
val = p_typed.get("value")
if isinstance(val, str):
existing_paths.add(val)
new_paths: list[str] = []
for me in proposed_matches:
me_typed = cast(dict[str, object], me) if isinstance(me, dict) else {}
paths = me_typed.get("paths")
if isinstance(paths, list):
for p in paths:
p_typed = cast(dict[str, object], p) if isinstance(p, dict) else {}
val = p_typed.get("value")
if isinstance(val, str) and val not in existing_paths:
new_paths.append(val)
existing_paths.add(val)
if new_paths:
existing_matches.append(
{"paths": [{"value": p} for p in new_paths]}
)
entry_typed["matches"] = existing_matches
break
else:
entry_typed: dict[str, object] = {"host": new_route.get("host")} # type: ignore
if isinstance(proposed_matches, list) and proposed_matches:
entry_typed["matches"] = proposed_matches
auth = new_route.get("auth")
if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): # type: ignore
auth_typed = cast(dict[str, object], auth)
existing_slots = sorted({
str(r_entry.get("token_env", ""))
for r_entry_obj in routes_typed
if isinstance(r_entry_obj, dict)
for r_entry in [cast(dict[str, object], r_entry_obj)]
if r_entry.get("token_env")
})
next_idx = len(existing_slots)
entry_typed["auth_scheme"] = str(cast(object, auth_typed.get("scheme")))
entry_typed["token_env"] = f"EGRESS_TOKEN_{next_idx}"
routes_typed.append(entry_typed)
return _render_routes_payload(cast(list[dict[str, object]], routes_typed))
def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]:
try:
proposed = json.loads(proposed_route_json)
except json.JSONDecodeError as e:
raise EgressApplyError(
f"proposed route is not valid JSON: {e}"
) from e
if not isinstance(proposed, dict):
raise EgressApplyError(
"proposed route must be a JSON object"
)
current = fetch_current_routes(slug)
merged = _merge_single_route(current, proposed)
return apply_routes_change(slug, merged)
__all__ = [
"EgressApplyError",
"add_route",
"apply_routes_change",
"fetch_current_routes",
"validate_routes_content",
]
+14 -5
View File
@@ -2,8 +2,9 @@
act on them (approve / modify / reject).
Curses-based TUI; modify-then-approve shells out to $EDITOR. The
approval handler wires to PRD 0016 (capability-block), which rebuilds
the bottle Dockerfile. The egress-block tool was removed in issue #198.
approval handlers wire to the per-tool remediation engines:
PRD 0014 (egress) writes routes.yaml + SIGHUPs egress; PRD 0016
(capability) rebuilds the bottle Dockerfile.
"""
from __future__ import annotations
@@ -25,6 +26,7 @@ from ..backend.docker.capability_apply import (
CapabilityApplyError,
apply_capability_change,
)
from ..backend.docker.egress_apply import EgressApplyError, add_route
from ..log import Die, error, info
from ..supervise import (
COMPONENT_FOR_TOOL,
@@ -35,6 +37,7 @@ from ..supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
archive_proposal,
list_pending_proposals,
render_diff,
@@ -58,7 +61,7 @@ class QueuedProposal:
# Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses.
ApplyError = (CapabilityApplyError,)
ApplyError = (EgressApplyError, CapabilityApplyError)
def discover_pending() -> list[QueuedProposal]:
@@ -79,7 +82,9 @@ def discover_pending() -> list[QueuedProposal]:
def _approval_status(qp: QueuedProposal, verb: str) -> str:
"""Status-line text after a successful approval."""
base = f"{verb} {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
return f"{base}; resume: ./cli.py resume {qp.proposal.bottle_slug}"
return base
def _detail_lines(
@@ -127,7 +132,11 @@ def approve(
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
diff_before, diff_after = "", ""
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
if qp.proposal.tool == TOOL_EGRESS_BLOCK:
diff_before, diff_after = add_route(
qp.proposal.bottle_slug, file_to_apply,
)
elif qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
_meta = read_metadata(qp.proposal.bottle_slug)
if _meta is not None and not _meta.compose_project:
raise CapabilityApplyError(
+7 -2
View File
@@ -48,9 +48,11 @@ from pathlib import Path
SUPERVISE_HOSTNAME = "supervise"
SUPERVISE_PORT = 9100
TOOL_EGRESS_BLOCK = "egress-block"
TOOL_CAPABILITY_BLOCK = "capability-block"
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
TOOLS: tuple[str, ...] = (
TOOL_EGRESS_BLOCK,
TOOL_CAPABILITY_BLOCK,
TOOL_LIST_EGRESS_ROUTES,
)
@@ -68,8 +70,10 @@ EGRESS_INTROSPECT_URL = "http://_egress.local/allowlist"
# capability-block has no on-disk config the operator edits in place
# (the Dockerfile is rebuilt, not patched), so it has no audit log
# here — those changes are captured by git history + the rebuild
# record laid down in PRD 0016. egress-block was removed in issue #198.
COMPONENT_FOR_TOOL: dict[str, str] = {}
# record laid down in PRD 0016.
COMPONENT_FOR_TOOL: dict[str, str] = {
TOOL_EGRESS_BLOCK: "egress",
}
STATUS_APPROVED = "approved"
STATUS_MODIFIED = "modified"
@@ -551,6 +555,7 @@ __all__ = [
"EGRESS_FORWARD_PROXY",
"EGRESS_INTROSPECT_URL",
"TOOL_CAPABILITY_BLOCK",
"TOOL_EGRESS_BLOCK",
"TOOL_LIST_EGRESS_ROUTES",
"archive_proposal",
"audit_dir",
+142 -6
View File
@@ -1,10 +1,8 @@
"""Supervise sidecar HTTP server (PRD 0013).
Per-bottle MCP server exposing tools the agent calls to propose config
changes when stuck. The egress-block tool was removed in issue #198;
the remaining tools are `capability-block` and `list-egress-routes`.
Each queued tool call:
Per-bottle MCP server exposing two tools — `egress-block`,
`capability-block` — that the agent calls to propose config changes
when stuck. Each tool call:
1. Validates the proposed file syntactically.
2. Writes a Proposal to /run/supervise/queue/ (bind-mounted from
@@ -135,6 +133,69 @@ def jsonrpc_error(request_id: object, code: int, message: str) -> bytes:
TOOL_DEFINITIONS: list[dict[str, object]] = [
{
"name": _sv.TOOL_EGRESS_BLOCK,
"description": (
"Call when egress refused your HTTPS request — host "
"without a matching route, or a request that did not match "
"the route's matches rules (typically a 403 from the "
"proxy). Propose a SINGLE route to add: the host you "
"need + (optionally) a path_allowlist of path prefixes + "
"(optionally) an auth block. The supervisor merges the "
"route into the live table at approval time — you do NOT "
"need to see or reproduce the existing routes. If the "
"host already has a route, the proposed paths are unioned "
"with the existing ones (host stays single-route). The "
"operator approves or rejects in the supervise TUI. On "
"approval the supervisor writes the merged routes.yaml "
"and SIGHUPs egress (no dropped connections)."
),
"inputSchema": {
"type": "object",
"properties": {
"host": {
"type": "string",
"description": (
"The hostname to allow (e.g. 'api.github.com'). "
"Case-insensitive on match."
),
},
"path_allowlist": {
"type": "array",
"items": {"type": "string"},
"description": (
"Optional URL path prefixes the route permits. "
"Each must start with '/'. Omit to allow all "
"paths under this host (bare-pass route). "
"Internally converted to matches entries."
),
},
"auth": {
"type": "object",
"description": (
"Optional credential injection. {scheme, "
"token_ref}: scheme is 'Bearer' or 'token'; "
"token_ref names the host env var holding the "
"secret value. Omit to add a host without "
"credential injection. Ignored if the host "
"already has a route (operator decides auth "
"changes, not the agent)."
),
"properties": {
"scheme": {"type": "string"},
"token_ref": {"type": "string"},
},
"required": ["scheme", "token_ref"],
"additionalProperties": False,
},
"justification": {
"type": "string",
"description": "Why this host needs to be allowed.",
},
},
"required": ["host", "justification"],
},
},
{
"name": _sv.TOOL_LIST_EGRESS_ROUTES,
"description": (
@@ -193,6 +254,11 @@ PROPOSED_FILE_FIELD: dict[str, str] = {
# --- Validation ------------------------------------------------------------
# Auth schemes accepted on egress-block proposals — match the
# manifest-side EGRESS_AUTH_SCHEMES.
_AUTH_SCHEMES = ("Bearer", "token")
def validate_proposed_file(tool: str, content: str) -> None:
"""Syntactic validation. The operator is the real gate; this just
catches obvious paste-errors / wrong-tool selections before they
@@ -207,6 +273,70 @@ def validate_proposed_file(tool: str, content: str) -> None:
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
def _validate_and_bundle_egress_route(
args: dict[str, object],
) -> str:
"""Validate egress-block input fields and bundle them into
a JSON string that becomes the Proposal.proposed_file. Raises
_RpcError on bad input — the agent retries with a fixed shape."""
tool = _sv.TOOL_EGRESS_BLOCK
host = args.get("host")
if not isinstance(host, str) or not host.strip():
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: 'host' is required and must be a non-empty string",
)
payload: dict[str, object] = {"host": host}
path_allow_raw = args.get("path_allowlist")
if path_allow_raw is not None:
if not isinstance(path_allow_raw, list):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: 'path_allowlist' must be an array of strings",
)
prefixes: list[str] = []
for i, p in enumerate(path_allow_raw):
if not isinstance(p, str):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: path_allowlist[{i}] must be a string",
)
if not p.startswith("/"):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: path_allowlist[{i}] {p!r} must start with '/'",
)
prefixes.append(p)
if prefixes:
payload["path_allowlist"] = prefixes
auth_raw = args.get("auth")
if auth_raw is not None:
if not isinstance(auth_raw, dict):
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: 'auth' must be an object with 'scheme' and 'token_ref'",
)
scheme = auth_raw.get("scheme")
token_ref = auth_raw.get("token_ref")
if not isinstance(scheme, str) or scheme not in _AUTH_SCHEMES:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: auth.scheme must be one of "
f"{', '.join(_AUTH_SCHEMES)} (got {scheme!r})",
)
if not isinstance(token_ref, str) or not token_ref:
raise _RpcError(
ERR_INVALID_PARAMS,
f"{tool}: auth.token_ref must be a non-empty string "
f"naming the host env var holding the token",
)
payload["auth"] = {"scheme": scheme, "token_ref": token_ref}
return json.dumps(payload, indent=2) + "\n"
# --- MCP handlers ----------------------------------------------------------
@@ -292,7 +422,13 @@ def handle_tools_call(
f"{name}: 'justification' is required and must be a non-empty string",
)
if name in PROPOSED_FILE_FIELD:
if name == _sv.TOOL_EGRESS_BLOCK:
# Structured input → JSON bundle on Proposal.proposed_file.
# The dashboard's apply step (egress_apply.add_route)
# parses this JSON, fetches the current routes, merges in
# the new one, and writes the merged file.
proposed_file = _validate_and_bundle_egress_route(args_raw)
elif name in PROPOSED_FILE_FIELD:
file_field = PROPOSED_FILE_FIELD[name]
proposed_file = args_raw.get(file_field)
if not isinstance(proposed_file, str):
+4 -7
View File
@@ -7,12 +7,9 @@ document vs. a research note or a decision record).
## Naming and numbering
New PRDs use a `prd-new-<kebab-title>.md` placeholder name while the PR
is open. On merge to `main` a CI workflow assigns the next sequential
number (`0024-…`, `0025-…`), renames the file, and updates the title
header. Numbers are never reused; gaps are fine.
Once numbered, the filename stays fixed for the life of the doc.
`NNNN-kebab-title.md`, zero-padded and sequential (`0024-…`, `0025-…`).
Numbers are never reused; gaps are fine (there is no 0005). The number
is assigned at creation and stays fixed for the life of the doc.
## Status
@@ -26,7 +23,7 @@ The `Status:` line near the top tracks the PRD's lifecycle:
## Format
```markdown
# PRD prd-new: <short title> ← placeholder; CI fills in the number on merge
# PRD NNNN: <short title>
- **Status:** Draft
- **Author:** <who>
+1 -1
View File
@@ -120,7 +120,7 @@ def _git_config_exec_calls(bottle: MagicMock) -> list[tuple[str, str]]:
class TestProvisionGitUser(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-git-user.") # pylint: disable=consider-using-with
self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-git-user.")
self.stage = Path(self._tmp.name)
def tearDown(self):
+136 -3
View File
@@ -1,19 +1,27 @@
"""Unit: validate_routes_content (issue #198: _merge_single_route and
add_route removed; docker exec / cp / kill paths are covered by the
integration test)."""
"""Unit: validate_routes_content (PRD 0014 retargeted by PRD 0017
chunk 3, PRD 0053). docker exec / cp / kill paths are covered by the
integration test."""
import unittest
from bot_bottle.backend.docker.egress_apply import (
EgressApplyError,
_merge_single_route,
validate_routes_content,
)
from bot_bottle.yaml_subset import parse_yaml_subset
_ROUTES_EMPTY = "routes: []\n"
_ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n'
def _routes(parsed: str) -> list[dict]: # type: ignore
"""Parse a YAML routes string and pull out the routes list, so
tests can assert on shape directly."""
return parse_yaml_subset(parsed)["routes"] # type: ignore
class TestValidateRoutesContent(unittest.TestCase):
def test_accepts_minimal_route_table(self):
validate_routes_content(_ROUTES_EMPTY)
@@ -52,5 +60,130 @@ class TestValidateRoutesContent(unittest.TestCase):
)
class TestMergeSingleRoute(unittest.TestCase):
BASE = _ROUTES_ONE
def test_appends_route_when_host_absent(self):
merged = _merge_single_route(self.BASE, {"host": "github.com"})
hosts = [r["host"] for r in _routes(merged)]
self.assertEqual(["api.anthropic.com", "github.com"], hosts)
def test_appends_matches(self):
merged = _merge_single_route(
self.BASE,
{"host": "github.com", "matches": [
{"paths": [{"value": "/repos/x/"}]}
]},
)
new_route = _routes(merged)[-1]
self.assertIn("matches", new_route)
def test_appends_legacy_path_allowlist_as_matches(self):
merged = _merge_single_route(
self.BASE,
{"host": "github.com", "path_allowlist": ["/repos/x/"]},
)
new_route = _routes(merged)[-1]
self.assertIn("matches", new_route)
def test_appends_auth_with_token_env_slot(self):
merged = _merge_single_route(
self.BASE,
{
"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH"},
},
)
new_route = _routes(merged)[-1]
self.assertEqual("Bearer", new_route["auth_scheme"])
self.assertEqual("EGRESS_TOKEN_0", new_route["token_env"])
def test_auth_slot_increments_past_existing(self):
base = (
'routes:\n'
' - host: "api.anthropic.com"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
)
merged = _merge_single_route(base, {
"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "GH"},
})
new_route = _routes(merged)[-1]
self.assertEqual("EGRESS_TOKEN_1", new_route["token_env"])
def test_existing_host_merges_match_paths_as_union(self):
base = (
'routes:\n'
' - host: "github.com"\n'
' matches:\n'
' - paths:\n'
' - value: "/a/"\n'
)
merged = _merge_single_route(base, {
"host": "github.com",
"matches": [{"paths": [{"value": "/b/"}]}],
})
routes = _routes(merged)
self.assertEqual(1, len(routes))
all_paths: list[str] = []
for me in routes[0].get("matches", []):
for p in me.get("paths", []):
all_paths.append(p["value"])
self.assertIn("/a/", all_paths)
self.assertIn("/b/", all_paths)
def test_existing_host_dedup_match_paths(self):
base = (
'routes:\n'
' - host: "github.com"\n'
' matches:\n'
' - paths:\n'
' - value: "/a/"\n'
)
merged = _merge_single_route(base, {
"host": "github.com",
"matches": [{"paths": [{"value": "/a/"}, {"value": "/b/"}]}],
})
all_paths: list[str] = []
for me in _routes(merged)[0].get("matches", []):
for p in me.get("paths", []):
all_paths.append(p["value"])
self.assertEqual(1, all_paths.count("/a/"))
self.assertIn("/b/", all_paths)
def test_existing_host_preserves_existing_auth_ignores_proposed(self):
base = (
'routes:\n'
' - host: "api.github.com"\n'
' auth_scheme: "Bearer"\n'
' token_env: "EGRESS_TOKEN_0"\n'
)
merged = _merge_single_route(base, {
"host": "api.github.com",
"auth": {"scheme": "token", "token_ref": "DIFFERENT"},
})
route = _routes(merged)[0]
self.assertEqual("Bearer", route["auth_scheme"])
self.assertEqual("EGRESS_TOKEN_0", route["token_env"])
def test_host_match_is_case_insensitive(self):
base = 'routes:\n - host: "GitHub.com"\n'
merged = _merge_single_route(base, {
"host": "github.com",
"matches": [{"paths": [{"value": "/x/"}]}],
})
routes = _routes(merged)
self.assertEqual(1, len(routes))
def test_missing_host_raises(self):
with self.assertRaises(EgressApplyError):
_merge_single_route(self.BASE, {})
def test_invalid_current_yaml_raises(self):
with self.assertRaises(EgressApplyError):
_merge_single_route("routes:\n\tbad", {"host": "x.example"})
if __name__ == "__main__":
unittest.main()
+4 -4
View File
@@ -254,7 +254,7 @@ class TestProvisionCA(unittest.TestCase):
cp_in + exec in the right order."""
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-ca.") # pylint: disable=consider-using-with
self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-ca.")
self.tmp = Path(self._tmp.name)
self.egress_ca = self.tmp / "egress-ca.pem"
_write_self_signed_cert(self.egress_ca)
@@ -346,7 +346,7 @@ class TestProvisionGit(unittest.TestCase):
when its condition doesn't hold."""
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-git.") # pylint: disable=consider-using-with
self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-git.")
self.stage = Path(self._tmp.name)
def tearDown(self):
@@ -407,7 +407,7 @@ class TestProvisionGit(unittest.TestCase):
cp_call = bottle.cp_in.call_args
staged_path = Path(cp_call.args[0])
self.assertEqual(self.stage, staged_path.parent)
content = staged_path.read_text(encoding="utf-8")
content = staged_path.read_text()
self.assertIn(
'[url "http://127.0.0.1:9418/bot-bottle.git"]', content,
)
@@ -507,7 +507,7 @@ class TestProvisionGitUser(unittest.TestCase):
class TestProvisionWorkspace(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-workspace.") # pylint: disable=consider-using-with
self._tmp = tempfile.TemporaryDirectory(prefix="cb-prov-workspace.")
self.stage = Path(self._tmp.name)
def tearDown(self):
+14 -12
View File
@@ -17,6 +17,7 @@ from bot_bottle.supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
archive_proposal,
audit_log_path,
list_pending_proposals,
@@ -36,16 +37,16 @@ FIXED_TS = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(
tool: str = TOOL_CAPABILITY_BLOCK,
proposed: str = "FROM python:3.13\n",
justification: str = "need a capability",
tool: str = TOOL_EGRESS_BLOCK,
proposed: str = "{}",
justification: str = "need a route",
) -> Proposal:
return Proposal.new(
bottle_slug="dev",
tool=tool,
proposed_file=proposed,
justification=justification,
current_file_hash=sha256_hex(proposed),
current_file_hash=sha256_hex("{}"),
now=FIXED_TS,
)
@@ -56,7 +57,7 @@ class TestProposalRoundtrip(unittest.TestCase):
self.assertTrue(p.id)
self.assertEqual("2026-05-25T12:00:00+00:00", p.arrival_timestamp)
self.assertEqual("dev", p.bottle_slug)
self.assertEqual(TOOL_CAPABILITY_BLOCK, p.tool)
self.assertEqual(TOOL_EGRESS_BLOCK, p.tool)
def test_to_from_dict_roundtrip(self):
p = _proposal()
@@ -141,14 +142,14 @@ class TestQueueIO(unittest.TestCase):
def test_list_pending_sorted_by_arrival(self):
# Fabricate two with explicit timestamps.
a = Proposal.new(
bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="FROM python:3.13\n", justification="early",
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK,
proposed_file="{}", justification="early",
current_file_hash="x",
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
)
b = Proposal.new(
bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="FROM python:3.13\n", justification="late",
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK,
proposed_file="{}", justification="late",
current_file_hash="x",
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
)
@@ -317,15 +318,16 @@ class TestToolConstants(unittest.TestCase):
def test_tools_tuple_matches_individual_constants(self):
self.assertEqual(
(
TOOL_EGRESS_BLOCK,
TOOL_CAPABILITY_BLOCK,
supervise.TOOL_LIST_EGRESS_ROUTES,
),
supervise.TOOLS,
)
def test_component_map_has_no_entries(self):
# egress-block removed in issue #198; capability-block never had one.
self.assertEqual({}, supervise.COMPONENT_FOR_TOOL)
def test_component_map_covers_egress_remediation_only(self):
self.assertIn(TOOL_EGRESS_BLOCK, supervise.COMPONENT_FOR_TOOL)
self.assertNotIn(TOOL_CAPABILITY_BLOCK, supervise.COMPONENT_FOR_TOOL)
class _StubSupervise(supervise.Supervise):
+152 -18
View File
@@ -1,10 +1,12 @@
"""Unit: supervise headless paths (PRD 0013 phase 4, PRD 0016).
"""Unit: supervise headless paths (PRD 0013 phase 4, PRD 0014).
The curses TUI itself isn't exercised here — these tests cover the
discovery + approve/reject paths that the TUI's key handlers call into.
discovery + approve/reject + audit-write paths that the TUI's key
handlers call into.
egress-block (add_route) was removed in issue #198; the TestEgressApplyWiring
class and all stubs for add_route have been dropped accordingly.
add_route is stubbed at the supervise CLI module level so the tests
don't need a running egress sidecar; the real docker exec/cp/SIGHUP
plumbing is covered by the integration test.
"""
import os
@@ -15,6 +17,7 @@ from pathlib import Path
from bot_bottle import supervise
from bot_bottle.backend.docker.capability_apply import CapabilityApplyError
from bot_bottle.backend.docker.egress_apply import EgressApplyError
from bot_bottle.cli import supervise as supervise_cli
from bot_bottle.supervise import (
Proposal,
@@ -22,6 +25,7 @@ from bot_bottle.supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_EGRESS_BLOCK,
read_audit_entries,
read_response,
sha256_hex,
@@ -31,8 +35,9 @@ from bot_bottle.supervise import (
FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
def _proposal(slug: str = "dev", tool: str = TOOL_EGRESS_BLOCK) -> Proposal:
payloads = {
TOOL_EGRESS_BLOCK: '{"routes": []}\n',
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
}
payload = payloads.get(tool, "")
@@ -83,14 +88,14 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
def test_sorted_by_arrival_across_bottles(self):
early = Proposal.new(
bottle_slug="api", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="FROM python:3.13\n", justification="early",
bottle_slug="api", tool=TOOL_EGRESS_BLOCK,
proposed_file="{}", justification="early",
current_file_hash="h",
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
)
late = Proposal.new(
bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
proposed_file="FROM python:3.13\n", justification="late",
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK,
proposed_file="{}", justification="late",
current_file_hash="h",
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
)
@@ -115,38 +120,48 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
self._original_add_route = supervise_cli.add_route
self._original_apply_capability = supervise_cli.apply_capability_change
# Default stubs: succeed with deterministic before/after so the
# audit log shows a non-empty diff.
supervise_cli.add_route = lambda slug, content: ( # type: ignore
'{"routes": []}\n', '{"routes": [{"host": "x"}]}\n',
)
supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore
"FROM old\n", content,
)
def tearDown(self):
supervise_cli.add_route = self._original_add_route
supervise_cli.apply_capability_change = self._original_apply_capability
self._teardown_fake_home()
def _enqueue(self, tool: str = TOOL_CAPABILITY_BLOCK):
def _enqueue(self, tool: str = TOOL_EGRESS_BLOCK):
p = _proposal(tool=tool)
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p)
return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir)
def test_approve_writes_response(self):
def test_approve_writes_response_and_audit(self):
qp = self._enqueue()
supervise_cli.approve(qp)
# capability-block is archived on approve, so the response file
# moves to processed/ before the caller can read it.
resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status)
self.assertIsNone(resp.final_file)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertEqual("approved", entries[0].operator_action)
def test_approve_with_final_file_marks_modified(self):
qp = self._enqueue()
supervise_cli.approve(qp, final_file="FROM bookworm\n", notes="tweaked")
resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
supervise_cli.approve(qp, final_file='{"routes": [{"path": "/x/"}]}\n', notes="tweaked")
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_MODIFIED, resp.status)
self.assertEqual("FROM bookworm\n", resp.final_file)
self.assertEqual('{"routes": [{"path": "/x/"}]}\n', resp.final_file)
self.assertEqual("tweaked", resp.notes)
entries = read_audit_entries("egress", "dev")
self.assertEqual("modified", entries[0].operator_action)
def test_reject_writes_rejection(self):
qp = self._enqueue()
@@ -154,13 +169,113 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_REJECTED, resp.status)
self.assertEqual("nope", resp.notes)
entries = read_audit_entries("egress", "dev")
self.assertEqual("rejected", entries[0].operator_action)
self.assertEqual("nope", entries[0].operator_notes)
def test_no_audit_log_for_capability_block(self):
def test_capability_block_skips_audit_log(self):
qp = self._enqueue(tool=TOOL_CAPABILITY_BLOCK)
supervise_cli.approve(qp)
# No audit log for capability-block (per PRD 0013 / 0016).
self.assertEqual([], read_audit_entries("egress", "dev"))
class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
"""PRD 0017 chunk 3: approve() on an egress-block proposal
must call add_route (single-route merge) with the right args
and surface its failures."""
def setUp(self):
self._setup_fake_home()
self._original_add_route = supervise_cli.add_route
def tearDown(self):
supervise_cli.add_route = self._original_add_route
self._teardown_fake_home()
def _enqueue_egress(self, proposed: str = '{"host": "x.example"}\n'):
p = Proposal.new(
bottle_slug="dev", tool=TOOL_EGRESS_BLOCK,
proposed_file=proposed,
justification="need a route",
current_file_hash=sha256_hex(proposed),
now=FIXED,
)
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p)
return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir)
def test_egress_block_calls_add_route_with_proposed_json(self):
calls = []
supervise_cli.add_route = lambda slug, content: ( # type: ignore
calls.append((slug, content)) or ("before", "after")
)
qp = self._enqueue_egress(
proposed='{"host": "new.example", "path_allowlist": ["/x/"]}\n'
)
supervise_cli.approve(qp)
self.assertEqual(1, len(calls))
slug, content = calls[0]
self.assertEqual("dev", slug)
# The single-route JSON the agent proposed reaches add_route
# unchanged — add_route fetches current state + merges.
self.assertEqual(
'{"host": "new.example", "path_allowlist": ["/x/"]}\n',
content,
)
def test_modify_passes_final_file_to_add_route(self):
calls = []
supervise_cli.add_route = lambda slug, content: ( # type: ignore
calls.append(content) or ("before", "after")
)
qp = self._enqueue_egress()
supervise_cli.approve(
qp,
final_file='{"host": "edited.example"}\n',
notes="tweaked",
)
self.assertEqual(['{"host": "edited.example"}\n'], calls)
def test_apply_failure_blocks_response_and_audit(self):
supervise_cli.add_route = lambda slug, content: (_ for _ in ()).throw( # type: ignore
EgressApplyError("docker exec failed")
)
qp = self._enqueue_egress()
with self.assertRaises(EgressApplyError):
supervise_cli.approve(qp)
# No response file (proposal stays pending).
self.assertEqual(
[qp.proposal.id],
[p.id for p in supervise.list_pending_proposals(qp.queue_dir)],
)
# No audit entry.
self.assertEqual([], read_audit_entries("egress", "dev"))
def test_real_diff_lands_in_audit(self):
supervise_cli.add_route = lambda slug, content: ( # type: ignore
'{"routes": []}\n', # before
'{"routes": [{"host": "new.example"}]}\n', # after
)
qp = self._enqueue_egress(proposed='{"host": "new.example"}\n')
supervise_cli.approve(qp)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertIn('+{"routes": [{"host": "new.example"}]}', entries[0].diff)
self.assertIn('-{"routes": []}', entries[0].diff)
def test_reject_does_not_call_apply(self):
qp = self._enqueue_egress()
supervise_cli.reject(qp, reason="no thanks")
# Reject still writes a response + audit entry with empty diff.
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_REJECTED, resp.status)
entries = read_audit_entries("egress", "dev")
self.assertEqual(1, len(entries))
self.assertEqual("", entries[0].diff)
class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
"""PRD 0016 Phase 3: approve() on a capability-block proposal
calls apply_capability_change, archives the proposal afterward
@@ -213,12 +328,17 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore
qp = self._enqueue_capability()
supervise_cli.approve(qp)
# capability-block has no audit log per PRD 0013 — its record
# lives in the per-bottle Dockerfile + transcript state.
self.assertEqual([], read_audit_entries("egress", "dev"))
def test_proposal_archived_after_apply(self):
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore
qp = self._enqueue_capability()
supervise_cli.approve(qp)
# Sidecar would normally archive after delivering the response,
# but it's gone by then. The supervise TUI archives so
# discover_pending stops surfacing the resolved proposal.
self.assertEqual([], supervise.list_pending_proposals(qp.queue_dir))
processed = list((qp.queue_dir / "processed").glob("*.json"))
self.assertEqual(2, len(processed))
@@ -226,8 +346,20 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
class TestEditInEditor(unittest.TestCase):
def test_runs_editor_returns_edited_content(self):
# Fake "editor" is /bin/sh -c 'cat <<EOF > $1 ... EOF'
original_editor = os.environ.get("EDITOR")
try:
# Use a fake editor that overwrites the file with a known
# marker. EDITOR is split with shlex equivalence by
# subprocess.run when invoked as a list — keep it as a
# single program path that takes the file as argv[1].
os.environ["EDITOR"] = (
"/bin/sh -c 'printf %s \"edited\" > \"$0\"'"
)
# subprocess.run with the str as the first list element
# would try to find a binary literally named "/bin/sh -c ..."
# — that won't work. Use shell mode trick: wrap in a script.
# Easier: build a tiny helper script.
with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False, prefix="fake-editor.",
) as script:
@@ -249,6 +381,7 @@ class TestEditInEditor(unittest.TestCase):
def test_returns_none_when_unchanged(self):
original_editor = os.environ.get("EDITOR")
try:
# No-op editor: touch the file (leaves it unchanged).
with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False, prefix="noop-editor.",
) as script:
@@ -312,6 +445,7 @@ class TestCapabilityBlockSmolmachinesGuard(_FakeHomeMixin, unittest.TestCase):
supervise_cli.approve(qp) # must not raise
def test_no_metadata_falls_through_to_docker_path(self):
# No metadata at all → assume Docker (backward-compatible).
qp = self._enqueue_capability("dev")
supervise_cli.approve(qp) # must not raise
+12 -12
View File
@@ -141,6 +141,7 @@ class TestHandleToolsList(unittest.TestCase):
names = [t["name"] for t in result["tools"]] # type: ignore[index]
self.assertEqual(
sorted([
_sv.TOOL_EGRESS_BLOCK,
_sv.TOOL_CAPABILITY_BLOCK,
_sv.TOOL_LIST_EGRESS_ROUTES,
]),
@@ -205,10 +206,10 @@ class TestHandleToolsCall(unittest.TestCase):
try:
result = handle_tools_call(
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"name": _sv.TOOL_EGRESS_BLOCK,
"arguments": {
"dockerfile": "FROM python:3.13\n",
"justification": "need git",
"host": "example.com",
"justification": "need a route",
},
},
self.config,
@@ -249,8 +250,8 @@ class TestHandleToolsCall(unittest.TestCase):
with self.assertRaises(_RpcError):
handle_tools_call(
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"arguments": {"dockerfile": "FROM python:3.13\n"},
"name": _sv.TOOL_EGRESS_BLOCK,
"arguments": {"host": "example.com"},
},
self.config,
)
@@ -260,9 +261,9 @@ class TestHandleToolsCall(unittest.TestCase):
try:
handle_tools_call(
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"name": _sv.TOOL_EGRESS_BLOCK,
"arguments": {
"dockerfile": "FROM python:3.13\n",
"host": "example.com",
"justification": "x",
},
},
@@ -284,10 +285,10 @@ class TestHandleToolsCall(unittest.TestCase):
)
result = handle_tools_call(
{
"name": _sv.TOOL_CAPABILITY_BLOCK,
"name": _sv.TOOL_EGRESS_BLOCK,
"arguments": {
"dockerfile": "FROM python:3.13\n",
"justification": "need a capability",
"host": "example.com",
"justification": "need a route",
},
},
config,
@@ -411,8 +412,7 @@ class TestHttpEndToEnd(unittest.TestCase):
self.assertEqual("2.0", result["jsonrpc"])
self.assertEqual(1, result["id"])
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
self.assertNotIn("egress-block", names)
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
def test_unknown_method_returns_jsonrpc_error(self):
result = self._post_jsonrpc(