Compare commits

..

9 Commits

Author SHA1 Message Date
didericis 83cb64378c docs(prd): unwrap 80-column hard-wrapping in the forge PRD
test / unit (pull_request) Successful in 50s
test / integration (pull_request) Successful in 18s
test / coverage (pull_request) Successful in 1m6s
Reflow the PRD prose to single logical lines per paragraph / list item /
blockquote instead of hard-wrapping at ~80 columns. Code fences, tables,
headings, and blank lines are untouched; content is unchanged (only the
redundant `>` prefixes on the wrapped blockquote were consolidated).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-07-01 11:16:19 -04:00
didericis 42004d37fd refactor(forge): address PR #318 review — PR/Issue split, sqlite state, drop footer
lint / lint (push) Successful in 1m59s
test / unit (pull_request) Successful in 54s
test / integration (pull_request) Successful in 19s
test / coverage (pull_request) Successful in 1m4s
Addresses the five review comments on PR #318:

- Split PullRequest from Issue and add a dedicated read_pr method on
  Forge/ScopedForge/GiteaForge (a PR carries merge state an issue does
  not); is_pr_open now derives from read_pr.
- Replace the JSON-file forge state with a thin swappable CRUD interface
  (ForgeStateStore) backed by SQLite (SqliteForgeStateStore) at
  ~/.bot-bottle/bot-bottle.db.
- Remove the provenance footer (provenance.py + its test): a mutable,
  unsigned PR comment is not an audit record.
- Reword the PRD: provenance is exposed via an API, not surfaced in the
  PR; document the Issue/PullRequest split and the SQLite store.

pyright clean (whole repo), pylint 10/10, 38 forge/resume unit tests pass;
no remaining refs to the removed provenance module or old JSON state API.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-07-01 08:37:25 -04:00
didericis f211ece6bf fix(tests): resolve pyright strict errors in forge test helpers
lint / lint (push) Successful in 2m18s
test / unit (pull_request) Successful in 1m1s
test / integration (pull_request) Successful in 22s
test / coverage (pull_request) Successful in 1m19s
CI runs `pyright .` over the whole repo including tests; the earlier
run only checked the source paths. The test helpers used `**over`
dict-splat into typed constructors, which pyright strict rejects.

- forge_state: build a typed ForgeState base and dataclasses.replace(**over)
- provenance: explicit typed keyword params instead of a **over dict
- resume: _launch_kwargs returns dict[str, Any] (copy call_args.kwargs)
- forge_base: assert PermissionError in __mro__ (avoids always-true issubclass)
- client: annotate _resp body param; type: ignore the mock __enter__ lambda

pyright . now 0 errors; 47 tests still pass; pylint 9.97/10.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-07-01 07:58:55 -04:00
didericis a229a22d54 feat(forge): forge library layer for native integration (PRD chunks 1-3, 5)
lint / lint (push) Failing after 2m9s
test / unit (pull_request) Successful in 58s
test / integration (pull_request) Successful in 21s
test / coverage (pull_request) Successful in 1m23s
Implements the bot-bottle side of the forge-native PRD that is
self-contained in this repo (the forge sidecar and orchestrate command
belong to the separate bot-bottle-orchestrator, a PRD non-goal):

- contrib/forge/base.py: Forge ABC + ScopedForge enforcing the
  read-anywhere / write-scoped model (writes rejected outside the
  assigned issue/PRs via ForgeScopeError).
- contrib/gitea/client.py: GiteaClient (stdlib-only HTTP, mirrors the
  deploy-key provisioner) + GiteaForge. Token held by the caller (the
  sidecar), not injected by cred-proxy.
- contrib/gitea/forge_state.py: ForgeState dataclass + atomic
  read/write/delete/all under ~/.bot-bottle/forge/<owner>/<repo>/.
- contrib/gitea/provenance.py: build_provenance_footer — collapsed
  markdown audit footer; watchdog/gitleaks/egress rendering.
- cli/resume.py: `resume --headless --prompt` reusing the shipped
  assume_yes + headless_prompt launch core (the new half of chunk 1).

47 new unit tests; pylint 9.98/10, pyright clean. Forge sidecar (chunk
4), orchestrate command (chunk 6), and forge_env plumbing are deferred:
their only consumer is the separate orchestrator and they are untestable
in isolation here.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 19:39:49 -04:00
didericis 738990b2df Merge remote-tracking branch 'origin/main' into forge-native-integration 2026-06-30 19:19:01 -04:00
didericis 4cb106b48d docs(prd): reconcile headless primitives with shipped start --headless
#315 already merged `start --headless` (assume_yes on _launch_bottle +
AgentProvider.headless_prompt). The PRD's proposed start_headless /
attach_agent_headless helpers were redundant with it, and the latter
diverged by hand-rolling --no-interactive/-p instead of using the
headless_prompt provider abstraction. Drop them.

Scope the remaining headless work to what's actually new: a forge_env
hook threaded into the existing _launch_bottle core, and a `resume
--headless` path (resume has no non-interactive entry point today).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 17:46:59 -04:00
didericis ebad90bfa9 docs(prd): adopt forge sidecar (option 3) for native integration
Flip the forge-native-integration PRD from option 2 (agent calls the
Gitea API directly via cred-proxy; done signal parsed from comments) to
option 3 per issue #317 comment 2715: a forge sidecar backed by a Forge
abstract class.

- signal_done(status, summary) replaces comment-parsing as the done signal
- semantic audit trail from the sidecar feeds provenance directly
- read-anywhere / write-scoped enforcement, tighter than repo-wide API keys
- forge-agnostic agent prompts and sidecar protocol
- DeployKeyProvisioner subsumption deferred; share the HTTP client only

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 17:39:33 -04:00
didericis-claude 1789deaf73 docs: update forge PRD — orchestrator split, done signal, org targeting, forge env vars 2026-06-29 12:39:02 -04:00
didericis-claude b607d68a0e docs: add PRD for forge native integration 2026-06-29 12:10:41 -04:00
40 changed files with 1674 additions and 1045 deletions
+2 -4
View File
@@ -18,7 +18,7 @@
# /git-gate-entrypoint.sh docker-cp'd at start time
# /git-gate/creds/* docker-cp'd at start time
# /git/* bare repos, populated at runtime
# /run/supervise/bot-bottle.db bind-mounted at run time
# /run/supervise/queue/ bind-mounted at run time
# /home/mitmproxy/.mitmproxy/ mitmproxy CA dir
#
# Exposed ports inside the container:
@@ -66,8 +66,6 @@ COPY bot_bottle/egress_dlp_config.py /app/egress_dlp_config.py
COPY bot_bottle/egress_addon.py /app/egress_addon.py
COPY bot_bottle/dlp_detectors.py /app/dlp_detectors.py
COPY bot_bottle/yaml_subset.py /app/yaml_subset.py
COPY bot_bottle/queue_store.py /app/queue_store.py
COPY bot_bottle/audit_store.py /app/audit_store.py
COPY bot_bottle/supervise.py /app/supervise.py
COPY bot_bottle/supervise_server.py /app/supervise_server.py
COPY bot_bottle/sidecar_init.py /app/sidecar_init.py
@@ -83,7 +81,7 @@ RUN mkdir -p \
/etc/git-gate \
/git-gate/creds \
/git \
/run/supervise \
/run/supervise/queue \
/home/mitmproxy/.mitmproxy
# Documentation only — the compose renderer publishes whichever
+2 -2
View File
@@ -5,8 +5,8 @@
# bot-bottle
[![test](https://gitea.dideric.is/didericis/bot-bottle/actions/workflows/test.yml/badge.svg?branch=main)](https://gitea.dideric.is/didericis/bot-bottle/actions?workflow=test.yml)
[![coverage](https://img.shields.io/badge/coverage-83%25-brightgreen)](https://coverage.readthedocs.io/)
[![core coverage](https://img.shields.io/badge/core%20coverage-95%25-brightgreen)](https://gitea.dideric.is/didericis/bot-bottle/src/branch/main/docs/decisions/0004-coverage-policy.md)
[![coverage](https://img.shields.io/badge/coverage-84%25-brightgreen)](https://coverage.readthedocs.io/)
[![core coverage](https://img.shields.io/badge/core%20coverage-96%25-brightgreen)](https://gitea.dideric.is/didericis/bot-bottle/src/branch/main/docs/decisions/0004-coverage-policy.md)
**Problem:** Developer wants to run a coding agent without supervision, but they don't want a prompt injected or misbehaving agent wrecking their environment or exfiltrating sensitive data.
-143
View File
@@ -1,143 +0,0 @@
"""SQLite-backed audit store for supervise (PRD 0013)."""
from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .supervise import AuditEntry
def get_supervise_mod() -> object:
"""Lazy import of supervise to avoid a circular-import at module init time.
Mirrors our own module identity so patches on supervise.bot_bottle_root
propagate correctly in both flat (sidecar / sys.path-injection tests) and
package contexts."""
import sys
sv_name = "supervise" if __name__ == "audit_store" else "bot_bottle.supervise"
if sv_name in sys.modules:
return sys.modules[sv_name]
try:
import bot_bottle.supervise as _m
except ImportError:
import supervise as _m # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module
return _m
# One entry per schema version: _MIGRATIONS[0] brings a fresh DB (user_version=0)
# to version 1, _MIGRATIONS[1] to version 2, and so on. Add new migrations at
# the end; never edit existing ones.
_MIGRATIONS: list[str] = [
# v1 — initial schema
"""
CREATE TABLE IF NOT EXISTS supervise_audit_entries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
bottle_slug TEXT NOT NULL,
component TEXT NOT NULL,
operator_action TEXT NOT NULL,
operator_notes TEXT NOT NULL,
justification TEXT NOT NULL,
diff TEXT NOT NULL
)
""",
]
class AuditStore:
"""SQLite-backed persistent store for supervise audit entries."""
def __init__(self, db_path: Path | None = None) -> None:
self.db_path = db_path or get_supervise_mod().host_db_path() # type: ignore[attr-defined]
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init()
def write_audit_entry(self, entry: AuditEntry) -> Path:
with self._connect() as conn:
conn.execute(
"""
INSERT INTO supervise_audit_entries (
timestamp, bottle_slug, component, operator_action,
operator_notes, justification, diff
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
entry.timestamp,
entry.bottle_slug,
entry.component,
entry.operator_action,
entry.operator_notes,
entry.justification,
entry.diff,
),
)
self._chmod()
return self.db_path
def read_audit_entries(self, component: str, slug: str) -> list[AuditEntry]:
if not self.db_path.is_file():
return []
with self._connect() as conn:
rows = conn.execute(
"""
SELECT * FROM supervise_audit_entries
WHERE component = ? AND bottle_slug = ?
ORDER BY id
""",
(component, slug),
).fetchall()
return [self._row_to_entry(row) for row in rows]
@staticmethod
def _row_to_entry(row: sqlite3.Row) -> AuditEntry:
m = get_supervise_mod()
return m.AuditEntry( # type: ignore[attr-defined]
timestamp=row["timestamp"],
bottle_slug=row["bottle_slug"],
component=row["component"],
operator_action=row["operator_action"],
operator_notes=row["operator_notes"],
justification=row["justification"],
diff=row["diff"],
)
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
_SCHEMA_KEY = "audit_store"
def _init(self) -> None:
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS schema_versions (
module TEXT PRIMARY KEY,
version INTEGER NOT NULL DEFAULT 0
)
"""
)
row = conn.execute(
"SELECT version FROM schema_versions WHERE module = ?",
(self._SCHEMA_KEY,),
).fetchone()
version = row[0] if row else 0
for i, sql in enumerate(_MIGRATIONS[version:], start=version + 1):
conn.execute(sql)
conn.execute(
"INSERT OR REPLACE INTO schema_versions (module, version) VALUES (?, ?)",
(self._SCHEMA_KEY, i),
)
self._chmod()
def _chmod(self) -> None:
try:
self.db_path.chmod(0o600)
except OSError:
pass
__all__ = ["AuditStore"]
+5 -4
View File
@@ -34,7 +34,7 @@ from ...egress import (
from ...git_gate import GIT_GATE_HOSTNAME
from ...log import die, warn
from ...supervise import (
DB_PATH_IN_CONTAINER,
QUEUE_DIR_IN_CONTAINER,
SUPERVISE_HOSTNAME,
SUPERVISE_PORT,
)
@@ -163,15 +163,16 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
if sp is not None:
env += [
f"SUPERVISE_BOTTLE_SLUG={plan.slug}",
f"SUPERVISE_DB_PATH={DB_PATH_IN_CONTAINER}",
f"SUPERVISE_QUEUE_DIR={QUEUE_DIR_IN_CONTAINER}",
f"SUPERVISE_PORT={SUPERVISE_PORT}",
]
volumes.append({
"type": "bind",
"source": str(sp.db_path),
"target": DB_PATH_IN_CONTAINER,
"source": str(sp.queue_dir),
"target": QUEUE_DIR_IN_CONTAINER,
"read_only": False,
})
internal_aliases = [EGRESS_HOSTNAME]
if gp.upstreams:
internal_aliases.append(GIT_GATE_HOSTNAME)
+1 -9
View File
@@ -37,10 +37,7 @@ from pathlib import Path
from typing import Callable, Generator
from ...egress import egress_resolve_token_values
from ...git_gate import (
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys,
)
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import info, warn
from . import network as network_mod
from . import util as docker_mod
@@ -121,11 +118,6 @@ def launch(
git_gate_plan = plan.git_gate_plan
if git_gate_plan.upstreams:
git_gate_plan = provision_git_gate_dynamic_keys(
plan.manifest.bottle,
git_gate_plan,
git_gate_state_dir(plan.slug),
)
git_gate_plan = dataclasses.replace(
git_gate_plan,
internal_network=internal_network,
+4 -22
View File
@@ -28,12 +28,9 @@ from ...egress import (
egress_resolve_token_values,
egress_sidecar_env_entries,
)
from ...git_gate import (
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys,
)
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import die, info, warn
from ...supervise import DB_PATH_IN_CONTAINER, SUPERVISE_PORT
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
from ...util import expand_tilde
from ..docker.egress import EGRESS_CA_IN_CONTAINER, EGRESS_PORT
from ..docker.git_gate import (
@@ -101,8 +98,6 @@ def launch(
egress_network = egress_network_name(plan.slug)
_create_networks(internal_network, egress_network, stack)
plan = _provision_git_gate_keys(plan)
sidecar_name = sidecar_container_name(plan.slug)
container_mod.force_remove_container(sidecar_name)
_start_sidecar_bundle(plan, sidecar_name, internal_network, egress_network)
@@ -246,19 +241,6 @@ def _stamp_agent_urls(
)
def _provision_git_gate_keys(
plan: MacosContainerBottlePlan,
) -> MacosContainerBottlePlan:
if not plan.git_gate_plan.upstreams:
return plan
git_gate_plan = provision_git_gate_dynamic_keys(
plan.manifest.bottle,
plan.git_gate_plan,
git_gate_state_dir(plan.slug),
)
return dataclasses.replace(plan, git_gate_plan=git_gate_plan)
def _stage_git_gate(plan: MacosContainerBottlePlan, sidecar_name: str) -> None:
gp = plan.git_gate_plan
if not gp.upstreams:
@@ -379,7 +361,7 @@ def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
if plan.supervise_plan is not None:
env += [
f"SUPERVISE_BOTTLE_SLUG={plan.slug}",
f"SUPERVISE_DB_PATH={DB_PATH_IN_CONTAINER}",
f"SUPERVISE_QUEUE_DIR={QUEUE_DIR_IN_CONTAINER}",
f"SUPERVISE_PORT={SUPERVISE_PORT}",
]
return tuple(env)
@@ -405,7 +387,7 @@ def _sidecar_mounts(
sp = plan.supervise_plan
if sp is not None:
mounts.append((str(sp.db_path), DB_PATH_IN_CONTAINER, False))
mounts.append((str(sp.queue_dir), QUEUE_DIR_IN_CONTAINER, False))
return tuple(mounts)
+4 -21
View File
@@ -27,7 +27,7 @@ from ...egress import (
egress_resolve_token_values,
egress_sidecar_env_entries,
)
from ...supervise import DB_PATH_IN_CONTAINER, SUPERVISE_PORT
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
from ...util import expand_tilde
from ..docker import util as docker_mod
from ..docker.egress import (
@@ -41,10 +41,7 @@ from ..docker.git_gate import (
GIT_GATE_ENTRYPOINT_IN_CONTAINER,
GIT_GATE_HOOK_IN_CONTAINER,
)
from ...git_gate import (
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys,
)
from ...git_gate import revoke_git_gate_provisioned_keys
from ...log import info, warn
from ...bottle_state import (
egress_state_dir,
@@ -177,7 +174,6 @@ def _start_bundle(
) -> SmolmachinesBottlePlan:
"""Build the BundleLaunchSpec, resolve token env, start the
sidecar bundle container, and register teardown."""
plan = _provision_git_gate_keys(plan)
bundle_spec = _bundle_launch_spec(plan, network, loopback_ip)
token_env = _resolve_token_env(plan, dict(os.environ))
_bundle.ensure_bundle_image(bundle_spec.image)
@@ -186,19 +182,6 @@ def _start_bundle(
return plan
def _provision_git_gate_keys(
plan: SmolmachinesBottlePlan,
) -> SmolmachinesBottlePlan:
if not plan.git_gate_plan.upstreams:
return plan
git_gate_plan = provision_git_gate_dynamic_keys(
plan.manifest.bottle,
plan.git_gate_plan,
git_gate_state_dir(plan.slug),
)
return dataclasses.replace(plan, git_gate_plan=git_gate_plan)
def _discover_urls(
plan: SmolmachinesBottlePlan,
loopback_ip: str,
@@ -369,10 +352,10 @@ def _bundle_launch_spec(
daemons.append("supervise")
env += [
f"SUPERVISE_BOTTLE_SLUG={plan.slug}",
f"SUPERVISE_DB_PATH={DB_PATH_IN_CONTAINER}",
f"SUPERVISE_QUEUE_DIR={QUEUE_DIR_IN_CONTAINER}",
f"SUPERVISE_PORT={SUPERVISE_PORT}",
]
volumes.append((str(sp.db_path), DB_PATH_IN_CONTAINER, False))
volumes.append((str(sp.queue_dir), QUEUE_DIR_IN_CONTAINER, False))
# Container ports the agent reaches from the smolvm guest —
# published on host loopback so the guest can dial via TSI +
+3 -2
View File
@@ -284,8 +284,9 @@ def git_gate_state_dir(identity: str) -> Path:
def supervise_state_dir(identity: str) -> Path:
"""State subdir reserved for supervise sidecar bind-mount sources.
Runtime queue/audit rows live in the host-level bot-bottle SQLite
database, so they survive state-dir cleanup."""
The queue dir is intentionally NOT under here — it lives at
~/.bot-bottle/queue/<slug>/ alongside the audit logs, so it
survives state-dir cleanup."""
return bottle_state_dir(identity) / _SUPERVISE_SUBDIR
+24
View File
@@ -27,12 +27,34 @@ from .start import _launch_bottle
def cmd_resume(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument(
"--headless",
action="store_true",
help=(
"non-interactive rehydrate: deliver --prompt to the agent and "
"skip the y/N preflight. For orchestrators / the freeze-rehydrate "
"loop."
),
)
parser.add_argument(
"--prompt",
default=None,
help="follow-up prompt delivered to the agent (required with --headless)",
)
parser.add_argument(
"identity",
help="bottle identity from a prior `start` (see its session-end output)",
)
args = parser.parse_args(argv)
if args.prompt and not args.headless:
die("--prompt is only valid with --headless")
if args.headless and not args.prompt:
die(
"--headless requires --prompt: "
"./cli.py resume <identity> --headless --prompt 'Address the review'"
)
metadata = read_metadata(args.identity)
if metadata is None:
die(
@@ -56,4 +78,6 @@ def cmd_resume(argv: list[str]) -> int:
spec,
dry_run=args.dry_run,
backend_name=backend_name,
assume_yes=args.headless,
headless_prompt_text=args.prompt or "",
)
+16 -9
View File
@@ -45,7 +45,7 @@ from ..supervise import (
TOOL_EGRESS_BLOCK,
TOOL_GITLEAKS_ALLOW,
TOOL_EGRESS_TOKEN_ALLOW,
list_all_pending_proposals,
list_pending_proposals,
render_diff,
write_audit_entry,
write_response,
@@ -63,9 +63,10 @@ _REPORT_ONLY_TOOLS: tuple[str, ...] = (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_AL
@dataclass(frozen=True)
class QueuedProposal:
"""A pending proposal from the supervise queue."""
"""A pending proposal plus the queue dir it was found in."""
proposal: Proposal
queue_dir: Path
# Errors any remediation engine may raise. Caught by the TUI key
@@ -85,11 +86,16 @@ def apply_routes_change(slug: str, content: str) -> tuple[str, str]:
def discover_pending() -> list[QueuedProposal]:
"""Collect pending proposals across bottles."""
out = [
QueuedProposal(proposal=proposal)
for proposal in list_all_pending_proposals()
]
"""Walk ~/.bot-bottle/queue/* and collect pending proposals."""
queue_root = _supervise.bot_bottle_root() / "queue"
if not queue_root.is_dir():
return []
out: list[QueuedProposal] = []
for slug_dir in sorted(queue_root.iterdir()):
if not slug_dir.is_dir():
continue
for proposal in list_pending_proposals(slug_dir):
out.append(QueuedProposal(proposal=proposal, queue_dir=slug_dir))
out.sort(key=lambda q: q.proposal.arrival_timestamp)
return out
@@ -112,6 +118,7 @@ def _detail_lines(
(f"tool: {p.tool}", 0),
(f"id: {p.id}", 0),
(f"arrived: {p.arrival_timestamp}", 0),
(f"queue: {qp.queue_dir}", 0),
("", 0),
("justification:", 0),
]
@@ -158,7 +165,7 @@ def approve(
notes=notes,
final_file=final_file,
)
write_response(qp.proposal.bottle_slug, response)
write_response(qp.queue_dir, response)
_write_audit(
qp, action=status, notes=notes,
diff_before=diff_before, diff_after=diff_after,
@@ -172,7 +179,7 @@ def reject(qp: QueuedProposal, *, reason: str) -> None:
notes=reason,
final_file=None,
)
write_response(qp.proposal.bottle_slug, response)
write_response(qp.queue_dir, response)
_write_audit(qp, action=STATUS_REJECTED, notes=reason, diff_before="", diff_after="")
+165
View File
@@ -0,0 +1,165 @@
"""Forge abstraction (PRD forge-native-integration, chunk 3).
The `Forge` abstract class is the provider-agnostic surface a forge
sidecar dispatches to: read issues/comments, post comments, edit
descriptions, and the membership / PR lookups the orchestrator needs.
Each forge (Gitea first) implements it; the sidecar protocol and the
agent prompt stay forge-agnostic.
`signal_done` is deliberately *not* a `Forge` method completion is a
sidecar concept relayed to the orchestrator over a queue dir, not a
forge API operation.
`ScopedForge` enforces the PRD's **read-anywhere / write-scoped** model:
reads pass through to any issue/PR for context; writes are rejected
unless the target is the assigned issue or one of its PRs. This bounds
the blast radius of a prompt-injected agent below repo-wide API-key
permissions.
"""
from __future__ import annotations
import abc
from collections.abc import Iterable
from dataclasses import dataclass
@dataclass(frozen=True)
class Issue:
"""A forge issue (not a PR — see `PullRequest`)."""
number: int
title: str
body: str
state: str # "open" | "closed"
@dataclass(frozen=True)
class PullRequest:
"""A forge pull request. Kept distinct from `Issue` even though some
forges model PRs as issues on the wire: the domain objects carry
different data (a PR has merge state) and are read through different
methods (`read_pr` vs `read_issue`)."""
number: int
title: str
body: str
state: str # "open" | "closed"
merged: bool
@dataclass(frozen=True)
class Comment:
id: int
user: str # login of the comment author
body: str
class ForgeScopeError(PermissionError):
"""Raised by `ScopedForge` when a write targets an issue/PR outside
the assigned scope."""
class Forge(abc.ABC):
"""Provider-agnostic forge operations. Implementations wrap a
per-provider HTTP client and translate to `Issue` / `Comment`."""
@abc.abstractmethod
def read_issue(self, number: int) -> Issue:
"""Read an issue body (read-anywhere)."""
@abc.abstractmethod
def read_pr(self, number: int) -> PullRequest:
"""Read a pull request, including its merge state (read-anywhere)."""
@abc.abstractmethod
def read_comments(self, number: int) -> list[Comment]:
"""Read a thread's comments (read-anywhere)."""
@abc.abstractmethod
def post_comment(self, number: int, body: str) -> None:
"""Post a comment to an issue or PR (write-scoped)."""
@abc.abstractmethod
def update_description(self, number: int, body: str) -> None:
"""Replace an issue or PR body (write-scoped)."""
@abc.abstractmethod
def is_org_member(self, org: str, username: str) -> bool:
"""Whether `username` is a member of `org`."""
@abc.abstractmethod
def get_pr_for_issue(self, number: int) -> int | None:
"""The PR number linked to an issue, or None when there is none."""
@abc.abstractmethod
def is_pr_open(self, number: int) -> bool:
"""Whether the given PR is still open."""
class ScopedForge(Forge):
"""Read-anywhere / write-scoped wrapper around a concrete `Forge`.
`post_comment` and `update_description` are rejected with
`ForgeScopeError` unless the target number is the assigned issue or
one of the assigned PRs. Every other method delegates unchanged, so
reads, membership checks, and PR lookups work against any number for
context.
The writable set is fixed at construction. The sidecar reconstructs
a `ScopedForge` when a PR is discovered (`get_pr_for_issue`) so the
new PR becomes writable; this class does not mutate its own scope.
"""
def __init__(
self,
inner: Forge,
*,
assigned_issue: int,
assigned_prs: Iterable[int] = (),
) -> None:
self._inner = inner
self._assigned_issue = assigned_issue
self._writable = {assigned_issue, *assigned_prs}
@property
def writable(self) -> frozenset[int]:
return frozenset(self._writable)
def _check_write(self, number: int) -> None:
if number not in self._writable:
allowed = ", ".join(str(n) for n in sorted(self._writable))
raise ForgeScopeError(
f"write to #{number} denied: out of assigned scope "
f"(writable: {allowed})"
)
# --- read-anywhere: pass through --------------------------------------
def read_issue(self, number: int) -> Issue:
return self._inner.read_issue(number)
def read_pr(self, number: int) -> PullRequest:
return self._inner.read_pr(number)
def read_comments(self, number: int) -> list[Comment]:
return self._inner.read_comments(number)
def is_org_member(self, org: str, username: str) -> bool:
return self._inner.is_org_member(org, username)
def get_pr_for_issue(self, number: int) -> int | None:
return self._inner.get_pr_for_issue(number)
def is_pr_open(self, number: int) -> bool:
return self._inner.is_pr_open(number)
# --- write-scoped: check then delegate --------------------------------
def post_comment(self, number: int, body: str) -> None:
self._check_write(number)
self._inner.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._check_write(number)
self._inner.update_description(number, body)
+174
View File
@@ -0,0 +1,174 @@
"""Gitea HTTP client + `GiteaForge` (PRD forge-native-integration, chunk 3).
`GiteaClient` is the thin stdlib-only HTTP transport (mirrors
`deploy_key_provisioner.py`: `urllib.request`, bounded timeouts,
structured error bodies). `GiteaForge` adapts it to the provider-agnostic
`Forge` surface.
Unlike the option-2 design, the token is held here (the sidecar process
owns it) and passed to the client directly there is no agent-side
cred-proxy route, because the agent never makes forge calls. The HTTP
client is the one piece shared with `GiteaDeployKeyProvisioner`; the two
are deliberately *not* unified behind a common abstract base (see the
deferral note in the PRD).
"""
from __future__ import annotations
import json
import urllib.error
import urllib.request
from typing import Any
from ..forge.base import Comment, Forge, Issue, PullRequest
# Bound every Gitea call: a hung instance must not stall the sidecar.
_API_TIMEOUT_SECS = 30
class GiteaClient:
"""Thin authenticated HTTP client for one repo's Gitea API.
`api_url` is the API base *including* `/api/v1` (matching the
`FORGE_GITEA_API` env var), e.g. `https://gitea.example.com/api/v1`.
"""
def __init__(self, *, api_url: str, owner: str, repo: str, token: str) -> None:
self._api_url = api_url.rstrip("/")
self._owner = owner
self._repo = repo
self._token = token
# --- low-level request -------------------------------------------------
def _request(
self, method: str, path: str, *, body: dict[str, Any] | None = None
) -> tuple[int, Any]:
"""Issue an authenticated request. Returns `(status, parsed_json)`;
parsed_json is None when the response has no body. Raises
`RuntimeError` on any non-2xx except where callers special-case
the HTTPError themselves (membership 404)."""
url = f"{self._api_url}{path}"
data = json.dumps(body).encode() if body is not None else None
headers = {"Authorization": f"token {self._token}"}
if data is not None:
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method)
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS) as resp:
raw = resp.read()
parsed = json.loads(raw) if raw else None
return resp.status, parsed
def _repo_path(self, suffix: str) -> str:
return f"/repos/{self._owner}/{self._repo}{suffix}"
# --- operations --------------------------------------------------------
def is_org_member(self, org: str, username: str) -> bool:
"""GET /orgs/{org}/members/{username}: 2xx → member, 404 → not.
Other errors propagate so a misconfigured token fails loudly."""
url = f"{self._api_url}/orgs/{org}/members/{username}"
req = urllib.request.Request(
url, headers={"Authorization": f"token {self._token}"}, method="GET"
)
try:
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS):
return True
except urllib.error.HTTPError as exc:
if exc.code == 404:
return False
raise RuntimeError(
f"org membership check failed for {org}/{username}: "
f"HTTP {exc.code}{_read_error_body(exc)}"
) from exc
def get_issue(self, number: int) -> dict[str, Any]:
_status, body = self._request("GET", self._repo_path(f"/issues/{number}"))
return body or {}
def get_comments(self, number: int) -> list[dict[str, Any]]:
_status, body = self._request(
"GET", self._repo_path(f"/issues/{number}/comments")
)
return body or []
def post_comment(self, number: int, body: str) -> None:
self._request(
"POST",
self._repo_path(f"/issues/{number}/comments"),
body={"body": body},
)
def patch_issue_body(self, number: int, body: str) -> None:
self._request(
"PATCH", self._repo_path(f"/issues/{number}"), body={"body": body}
)
def get_pull(self, number: int) -> dict[str, Any]:
_status, body = self._request("GET", self._repo_path(f"/pulls/{number}"))
return body or {}
class GiteaForge(Forge):
"""`Forge` over a `GiteaClient`."""
def __init__(self, client: GiteaClient) -> None:
self._client = client
def read_issue(self, number: int) -> Issue:
raw = self._client.get_issue(number)
return Issue(
number=int(raw.get("number", number)),
title=str(raw.get("title", "")),
body=str(raw.get("body", "") or ""),
state=str(raw.get("state", "")),
)
def read_pr(self, number: int) -> PullRequest:
raw = self._client.get_pull(number)
return PullRequest(
number=int(raw.get("number", number)),
title=str(raw.get("title", "")),
body=str(raw.get("body", "") or ""),
state=str(raw.get("state", "")),
merged=bool(raw.get("merged", False)),
)
def read_comments(self, number: int) -> list[Comment]:
return [
Comment(
id=int(c.get("id", 0)),
user=str((c.get("user") or {}).get("login", "")),
body=str(c.get("body", "") or ""),
)
for c in self._client.get_comments(number)
]
def post_comment(self, number: int, body: str) -> None:
self._client.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._client.patch_issue_body(number, body)
def is_org_member(self, org: str, username: str) -> bool:
return self._client.is_org_member(org, username)
def get_pr_for_issue(self, number: int) -> int | None:
"""Gitea models a PR as an issue with the same number, exposing a
`pull_request` object on the issue. When the queried number is
itself a PR, return it; otherwise None. (The orchestrator tracks
the issuePR mapping in forge state for the cross-number case.)"""
raw = self._client.get_issue(number)
if raw.get("pull_request"):
return int(raw.get("number", number))
return None
def is_pr_open(self, number: int) -> bool:
return self.read_pr(number).state == "open"
def _read_error_body(exc: urllib.error.HTTPError) -> str:
try:
return exc.read().decode("utf-8", errors="replace")
except Exception: # pylint: disable=broad-exception-caught
return ""
+171
View File
@@ -0,0 +1,171 @@
"""Forge state persistence (PRD forge-native-integration, chunk 2).
The orchestrator tracks one record per forge-targeted issue so it can
map an incoming webhook back to the bottle handling it, drive the
freeze / rehydrate loop, and run the watchdog.
State is stored in a local SQLite database in `~/.bot-bottle/`. Access
goes through the thin `ForgeStateStore` CRUD interface so the backing
store (location or engine) can be swapped without touching callers;
`SqliteForgeStateStore` is the first implementation.
"""
from __future__ import annotations
import abc
import json
import sqlite3
from dataclasses import dataclass, field
from pathlib import Path
from ...supervise import bot_bottle_root
_DB_FILENAME = "bot-bottle.db"
# Lifecycle: a bottle is launched (running), frozen on the done signal,
# and destroyed when the PR closes.
STATUS_RUNNING = "running"
STATUS_FROZEN = "frozen"
STATUS_DESTROYED = "destroyed"
@dataclass
class ForgeState:
"""One forge-targeted issue's bottle lifecycle record."""
owner: str
repo: str
issue_number: int
slug: str
agent_name: str
bottle_names: list[str] = field(default_factory=list)
backend_name: str = ""
agent_git_user: str = ""
pr_number: int | None = None
status: str = STATUS_RUNNING
last_checkin_at: str = ""
class ForgeStateStore(abc.ABC):
"""Thin CRUD surface over forge state. Implementations back it with a
concrete store; callers depend only on this interface so the storage
location/engine is swappable."""
@abc.abstractmethod
def upsert(self, state: ForgeState) -> None:
"""Insert or replace the record keyed by (owner, repo, issue)."""
@abc.abstractmethod
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
"""Fetch one record, or None when absent."""
@abc.abstractmethod
def delete(self, owner: str, repo: str, issue_number: int) -> None:
"""Remove a record. Missing is success (idempotent)."""
@abc.abstractmethod
def all(self) -> list[ForgeState]:
"""Every record, for the status table and the watchdog sweep."""
def default_db_path() -> Path:
return bot_bottle_root() / _DB_FILENAME
class SqliteForgeStateStore(ForgeStateStore):
"""SQLite-backed `ForgeStateStore`. The database lives at
`~/.bot-bottle/bot-bottle.db` by default; pass `db_path` to point at
a different location (tests, alternate homes)."""
def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or default_db_path()
self._db_path.parent.mkdir(parents=True, exist_ok=True)
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS forge_state (
owner TEXT NOT NULL,
repo TEXT NOT NULL,
issue_number INTEGER NOT NULL,
slug TEXT NOT NULL,
agent_name TEXT NOT NULL,
bottle_names TEXT NOT NULL,
backend_name TEXT NOT NULL,
agent_git_user TEXT NOT NULL,
pr_number INTEGER,
status TEXT NOT NULL,
last_checkin_at TEXT NOT NULL,
PRIMARY KEY (owner, repo, issue_number)
)
"""
)
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self._db_path)
conn.row_factory = sqlite3.Row
return conn
def upsert(self, state: ForgeState) -> None:
with self._connect() as conn:
conn.execute(
"""
INSERT OR REPLACE INTO forge_state (
owner, repo, issue_number, slug, agent_name,
bottle_names, backend_name, agent_git_user,
pr_number, status, last_checkin_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
state.owner,
state.repo,
state.issue_number,
state.slug,
state.agent_name,
json.dumps(state.bottle_names),
state.backend_name,
state.agent_git_user,
state.pr_number,
state.status,
state.last_checkin_at,
),
)
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
with self._connect() as conn:
row = conn.execute(
"SELECT * FROM forge_state "
"WHERE owner = ? AND repo = ? AND issue_number = ?",
(owner, repo, issue_number),
).fetchone()
return _row_to_state(row) if row is not None else None
def delete(self, owner: str, repo: str, issue_number: int) -> None:
with self._connect() as conn:
conn.execute(
"DELETE FROM forge_state "
"WHERE owner = ? AND repo = ? AND issue_number = ?",
(owner, repo, issue_number),
)
def all(self) -> list[ForgeState]:
with self._connect() as conn:
rows = conn.execute(
"SELECT * FROM forge_state ORDER BY owner, repo, issue_number"
).fetchall()
return [_row_to_state(row) for row in rows]
def _row_to_state(row: sqlite3.Row) -> ForgeState:
return ForgeState(
owner=row["owner"],
repo=row["repo"],
issue_number=row["issue_number"],
slug=row["slug"],
agent_name=row["agent_name"],
bottle_names=json.loads(row["bottle_names"]),
backend_name=row["backend_name"],
agent_git_user=row["agent_git_user"],
pr_number=row["pr_number"],
status=row["status"],
last_checkin_at=row["last_checkin_at"],
)
+9 -6
View File
@@ -79,13 +79,14 @@ class EgressAddon:
# only — a restart re-prompts. Mutated only from the asyncio loop that
# runs the addon hooks, so no lock is needed.
self.safe_tokens: set[str] = set()
self._supervise_queue_dir = os.environ.get("SUPERVISE_QUEUE_DIR", "").strip()
self._supervise_slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "").strip()
self._token_allow_timeout = _token_allow_timeout_from_env(os.environ)
self._reload(initial=True)
self._install_sighup()
def _supervise_available(self) -> bool:
return bool(self._supervise_slug)
return bool(self._supervise_queue_dir and self._supervise_slug)
def _reload(self, *, initial: bool = False) -> None:
try:
@@ -392,8 +393,9 @@ class EgressAddon:
justification=_TOKEN_ALLOW_JUSTIFICATION,
current_file_hash=_sv.sha256_hex(payload),
)
queue_dir = Path(self._supervise_queue_dir)
try:
_sv.write_proposal(proposal)
_sv.write_proposal(queue_dir, proposal)
except OSError as e:
sys.stderr.write(
f"egress: could not queue token-allow proposal: {e}; "
@@ -409,8 +411,8 @@ class EgressAddon:
**self._req_ctx(flow),
}) + "\n")
response = await self._await_token_response(proposal.id)
_sv.archive_proposal(self._supervise_slug, proposal.id)
response = await self._await_token_response(queue_dir, proposal.id)
_sv.archive_proposal(queue_dir, proposal.id)
if response is not None and response.status in (
_sv.STATUS_APPROVED, _sv.STATUS_MODIFIED,
@@ -437,15 +439,16 @@ class EgressAddon:
async def _await_token_response(
self,
queue_dir: Path,
proposal_id: str,
) -> "_sv.Response | None":
"""Poll the DB for the operator's response without blocking the
"""Poll the queue dir for the operator's response without blocking the
proxy event loop. Returns the Response, or None on timeout."""
loop = asyncio.get_running_loop()
deadline = loop.time() + self._token_allow_timeout
while True:
try:
return _sv.read_response(self._supervise_slug, proposal_id)
return _sv.read_response(queue_dir, proposal_id)
except (OSError, ValueError, KeyError):
# Not written yet, or a partial/malformed write — retry until
# the deadline, then fail closed.
+11 -6
View File
@@ -30,6 +30,7 @@ backend-specific and lives on concrete subclasses (see
from __future__ import annotations
import dataclasses
from abc import ABC
from dataclasses import dataclass
from pathlib import Path
@@ -52,7 +53,6 @@ from .git_gate_render import (
_gitconfig_validate_value,
)
from .git_gate_provision import (
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys,
_provision_dynamic_key,
_resolve_identity_file,
@@ -93,14 +93,20 @@ class GitGate(ABC):
entrypoint, pre-receive hook, and access-hook scripts (mode
600) under `stage_dir`. Pure host-side, no docker subprocess.
For `gitea` key entries, the returned upstream intentionally
has an empty identity file. Backend launch fills that in after
the operator confirms the preflight.
For `gitea` key entries, also generates and registers
a fresh deploy key via the forge API and writes the private key
+ key ID to `stage_dir`.
Returned plan is incomplete: the launch step must fill
`internal_network` / `egress_network` via `dataclasses.replace`
before passing the plan to `.start`."""
upstreams = git_gate_upstreams_for_bottle(bottle)
upstreams_list = list(git_gate_upstreams_for_bottle(bottle))
for i, entry in enumerate(bottle.git):
upstreams_list[i] = dataclasses.replace(
upstreams_list[i],
identity_file=_resolve_identity_file(entry, slug, stage_dir),
)
upstreams = tuple(upstreams_list)
entrypoint = stage_dir / "git_gate_entrypoint.sh"
entrypoint.write_text(git_gate_render_entrypoint(upstreams))
entrypoint.chmod(0o600)
@@ -156,7 +162,6 @@ __all__ = [
"git_gate_render_entrypoint",
"git_gate_render_hook",
"git_gate_render_access_hook",
"provision_git_gate_dynamic_keys",
"revoke_git_gate_provisioned_keys",
"_gitconfig_validate_value",
"_provision_dynamic_key",
-43
View File
@@ -9,16 +9,10 @@ imported (`deploy_key_provisioner`) to keep its cost off the host path.
from __future__ import annotations
import os
import dataclasses
from pathlib import Path
from typing import TYPE_CHECKING
from .log import info
from .manifest import ManifestBottle, ManifestGitEntry
from .git_gate_render import GitGateUpstream
if TYPE_CHECKING:
from .git_gate import GitGatePlan
def _provision_dynamic_key(
entry: ManifestGitEntry,
@@ -101,45 +95,8 @@ def _resolve_identity_file(entry: ManifestGitEntry, slug: str, stage_dir: Path)
return entry.IdentityFile
def provision_git_gate_dynamic_keys(
bottle: ManifestBottle,
plan: "GitGatePlan",
stage_dir: Path,
) -> "GitGatePlan":
"""Provision dynamic git-gate keys and return an updated plan.
This runs during backend launch, after the operator confirms the
preflight. Plan preparation intentionally stays side-effect-light:
dry-runs and aborted launches must not create remote deploy keys.
"""
if not plan.upstreams:
return plan
upstreams_by_name: dict[str, GitGateUpstream] = {
upstream.name: upstream for upstream in plan.upstreams
}
updated: list[GitGateUpstream] = []
for entry in bottle.git:
upstream = upstreams_by_name.get(entry.Name)
if upstream is None:
continue
if entry.Key.provider == "gitea":
identity_file = _provision_dynamic_key(entry, plan.slug, stage_dir)
upstream = dataclasses.replace(upstream, identity_file=identity_file)
updated.append(upstream)
if len(updated) != len(plan.upstreams):
updated_names = {u.name for u in updated}
for upstream in plan.upstreams:
if upstream.name not in updated_names:
updated.append(upstream)
return dataclasses.replace(plan, upstreams=tuple(updated))
__all__ = [
"revoke_git_gate_provisioned_keys",
"provision_git_gate_dynamic_keys",
"_provision_dynamic_key",
"_resolve_identity_file",
]
+43 -38
View File
@@ -234,13 +234,13 @@ import hashlib
import json
import os
import sys
import uuid
from pathlib import Path
from bot_bottle import supervise as _sv
report_path = Path(sys.argv[1])
queue_dir = os.environ.get("SUPERVISE_QUEUE_DIR", "")
slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "")
if not slug:
if not queue_dir or not slug:
sys.exit(2)
try:
@@ -277,19 +277,31 @@ for i, finding in enumerate(raw, 1):
])
payload = "\n".join(lines).rstrip() + "\n"
proposal = _sv.Proposal.new(
bottle_slug=slug,
tool=_sv.TOOL_GITLEAKS_ALLOW,
proposed_file=payload,
justification=(
proposal_id = str(uuid.uuid4())
proposal = {
"id": proposal_id,
"bottle_slug": slug,
"tool": "gitleaks-allow",
"proposed_file": payload,
"justification": (
"git-gate found gitleaks findings hidden by # gitleaks:allow; "
"approve only for dummy test fixtures or confirmed false positives"
),
current_file_hash=hashlib.sha256(payload.encode("utf-8")).hexdigest(),
now=datetime.datetime.now(datetime.timezone.utc),
)
_sv.write_proposal(proposal)
print(proposal.id)
"arrival_timestamp": datetime.datetime.now(
datetime.timezone.utc
).isoformat(),
"current_file_hash": hashlib.sha256(payload.encode("utf-8")).hexdigest(),
}
queue = Path(queue_dir)
queue.mkdir(parents=True, exist_ok=True)
path = queue / f"{proposal_id}.proposal.json"
tmp = path.with_suffix(path.suffix + ".tmp")
with tmp.open("w", encoding="utf-8") as f:
json.dump(proposal, f, indent=2)
f.write("\n")
os.chmod(tmp, 0o600)
os.replace(tmp, path)
print(proposal_id)
PY
)
rc=$?
@@ -302,7 +314,8 @@ PY
return 1
fi
slug=${SUPERVISE_BOTTLE_SLUG:-}
queue_dir=${SUPERVISE_QUEUE_DIR:-}
response_file="$queue_dir/${proposal_id}.response.json"
timeout=${SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS:-300}
case "$timeout" in
''|*[!0-9]*)
@@ -314,35 +327,26 @@ PY
echo "git-gate: approve with './cli.py supervise' to continue this push" >&2
waited=0
while [ "$waited" -lt "$timeout" ]; do
status=$(python3 - "$slug" "$proposal_id" <<'PY'
if [ -f "$response_file" ]; then
status=$(python3 - "$response_file" <<'PY'
import json
import sys
from bot_bottle import supervise as _sv
slug = sys.argv[1]
try:
response = _sv.read_response(slug, sys.argv[2])
except FileNotFoundError:
sys.exit(2)
print(response.status)
with open(sys.argv[1], encoding="utf-8") as f:
raw = json.load(f)
except (OSError, json.JSONDecodeError):
sys.exit(1)
status = raw.get("status")
if not isinstance(status, str):
sys.exit(1)
print(status)
PY
)
rc=$?
if [ "$rc" -eq 2 ]; then
status=""
elif [ "$rc" -ne 0 ]; then
status="invalid"
fi
if [ -n "$status" ]; then
) || status=""
case "$status" in
approved|modified)
python3 - "$slug" "$proposal_id" <<'PY' || true
import sys
from bot_bottle import supervise as _sv
_sv.archive_proposal(sys.argv[1], sys.argv[2])
PY
mkdir -p "$queue_dir/processed"
mv -f "$queue_dir/${proposal_id}.proposal.json" "$queue_dir/processed/" 2>/dev/null || true
mv -f "$queue_dir/${proposal_id}.response.json" "$queue_dir/processed/" 2>/dev/null || true
echo "git-gate: supervisor approved # gitleaks:allow for $ref" >&2
return 0
;;
@@ -495,3 +499,4 @@ if ! git -C "$repo_dir" rev-parse --verify HEAD >/dev/null 2>&1; then
fi
exit 0
"""
+2 -7
View File
@@ -16,16 +16,11 @@ from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import urlsplit
from .git_gate import GIT_GATE_TIMEOUT_SECS
DEFAULT_PORT = 9420
# Mirrors git_gate_render.GIT_GATE_TIMEOUT_SECS. Duplicated rather than
# imported: this module ships as a flat top-level sibling in the sidecar
# bundle image (see Dockerfile.sidecars), not as part of the bot_bottle
# package, so `bot_bottle.git_gate` and its dependency chain aren't
# available at runtime.
GIT_GATE_TIMEOUT_SECS = 15
# Bound memory use while still allowing ordinary git push packfiles.
MAX_BODY_BYTES = 100 * 1024 * 1024
-270
View File
@@ -1,270 +0,0 @@
"""SQLite-backed queue store for supervise proposals and responses (PRD 0013)."""
from __future__ import annotations
import os
import sqlite3
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .supervise import Proposal, Response
def get_supervise_mod() -> object:
"""Lazy import of supervise to avoid a circular-import at module init time.
By the time any QueueStore method is called, both modules are fully loaded.
Mirrors our own module identity: when we are 'queue_store' (sidecar flat
context or tests that inject bot_bottle/ into sys.path) we use the flat
'supervise' module so that patches on supervise.bot_bottle_root propagate
correctly. When we are 'bot_bottle.queue_store' we use 'bot_bottle.supervise'."""
import sys
sv_name = "supervise" if __name__ == "queue_store" else "bot_bottle.supervise"
if sv_name in sys.modules:
return sys.modules[sv_name]
try:
import bot_bottle.supervise as _m
except ImportError:
import supervise as _m # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module
return _m
# One entry per schema version: _MIGRATIONS[0] brings a fresh DB (user_version=0)
# to version 1, _MIGRATIONS[1] to version 2, and so on. Add new migrations at
# the end; never edit existing ones.
_MIGRATIONS: list[str] = [
# v1 — proposals table
"""
CREATE TABLE IF NOT EXISTS supervise_proposals (
queue_key TEXT NOT NULL,
id TEXT NOT NULL,
bottle_slug TEXT NOT NULL,
tool TEXT NOT NULL,
proposed_file TEXT NOT NULL,
justification TEXT NOT NULL,
arrival_timestamp TEXT NOT NULL,
current_file_hash TEXT NOT NULL,
archived INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (queue_key, id)
)
""",
# v2 — responses table
"""
CREATE TABLE IF NOT EXISTS supervise_responses (
queue_key TEXT NOT NULL,
proposal_id TEXT NOT NULL,
status TEXT NOT NULL,
notes TEXT NOT NULL,
final_file TEXT,
archived INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (queue_key, proposal_id)
)
""",
]
class QueueStore:
"""SQLite-backed persistent store for supervise proposals and responses."""
def __init__(self, queue_key: str, db_path: Path | None = None) -> None:
self.queue_key = queue_key
if db_path is not None:
self.db_path = db_path
else:
# In the sidecar container SUPERVISE_DB_PATH points at the
# bind-mounted host DB. On the host this env var is never set,
# so we always fall through to host_db_path().
env_path = os.environ.get("SUPERVISE_DB_PATH", "").strip()
self.db_path = Path(env_path) if env_path else get_supervise_mod().host_db_path() # type: ignore[attr-defined]
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init()
def write_proposal(self, proposal: Proposal) -> Path:
with self._connect() as conn:
conn.execute(
"""
INSERT OR REPLACE INTO supervise_proposals (
queue_key, id, bottle_slug, tool, proposed_file, justification,
arrival_timestamp, current_file_hash, archived
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0)
""",
(
self.queue_key,
proposal.id,
proposal.bottle_slug,
proposal.tool,
proposal.proposed_file,
proposal.justification,
proposal.arrival_timestamp,
proposal.current_file_hash,
),
)
self._chmod()
return self.db_path
def read_proposal(self, proposal_id: str) -> Proposal:
with self._connect() as conn:
row = conn.execute(
"""
SELECT * FROM supervise_proposals
WHERE queue_key = ? AND id = ? AND archived = 0
""",
(self.queue_key, proposal_id),
).fetchone()
if row is None:
raise FileNotFoundError(proposal_id)
return self._row_to_proposal(row)
def list_pending_proposals(self) -> list[Proposal]:
if not self.db_path.is_file():
return []
with self._connect() as conn:
rows = conn.execute(
"""
SELECT p.* FROM supervise_proposals p
WHERE p.archived = 0
AND p.queue_key = ?
AND NOT EXISTS (
SELECT 1 FROM supervise_responses r
WHERE r.queue_key = p.queue_key
AND r.proposal_id = p.id
AND r.archived = 0
)
ORDER BY p.arrival_timestamp, p.id
""",
(self.queue_key,),
).fetchall()
return [self._row_to_proposal(row) for row in rows]
def list_all_pending_proposals(self) -> list[Proposal]:
if not self.db_path.is_file():
return []
with self._connect() as conn:
rows = conn.execute(
"""
SELECT p.* FROM supervise_proposals p
WHERE p.archived = 0
AND NOT EXISTS (
SELECT 1 FROM supervise_responses r
WHERE r.queue_key = p.queue_key
AND r.proposal_id = p.id
AND r.archived = 0
)
ORDER BY p.arrival_timestamp, p.id
"""
).fetchall()
return [self._row_to_proposal(row) for row in rows]
def write_response(self, response: Response) -> Path:
with self._connect() as conn:
conn.execute(
"""
INSERT OR REPLACE INTO supervise_responses (
queue_key, proposal_id, status, notes, final_file, archived
) VALUES (?, ?, ?, ?, ?, 0)
""",
(
self.queue_key,
response.proposal_id,
response.status,
response.notes,
response.final_file,
),
)
self._chmod()
return self.db_path
def read_response(self, proposal_id: str) -> Response:
with self._connect() as conn:
row = conn.execute(
"""
SELECT * FROM supervise_responses
WHERE queue_key = ? AND proposal_id = ? AND archived = 0
""",
(self.queue_key, proposal_id),
).fetchone()
if row is None:
raise FileNotFoundError(proposal_id)
return self._row_to_response(row)
def archive_proposal(self, proposal_id: str) -> None:
if not self.db_path.is_file():
return
with self._connect() as conn:
conn.execute(
"""
UPDATE supervise_proposals SET archived = 1
WHERE queue_key = ? AND id = ?
""",
(self.queue_key, proposal_id),
)
conn.execute(
"""
UPDATE supervise_responses SET archived = 1
WHERE queue_key = ? AND proposal_id = ?
""",
(self.queue_key, proposal_id),
)
@staticmethod
def _row_to_proposal(row: sqlite3.Row) -> Proposal:
m = get_supervise_mod()
return m.Proposal( # type: ignore[attr-defined]
id=row["id"],
bottle_slug=row["bottle_slug"],
tool=row["tool"],
proposed_file=row["proposed_file"],
justification=row["justification"],
arrival_timestamp=row["arrival_timestamp"],
current_file_hash=row["current_file_hash"],
)
@staticmethod
def _row_to_response(row: sqlite3.Row) -> Response:
m = get_supervise_mod()
return m.Response( # type: ignore[attr-defined]
proposal_id=row["proposal_id"],
status=row["status"],
notes=row["notes"],
final_file=row["final_file"],
)
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
_SCHEMA_KEY = "queue_store"
def _init(self) -> None:
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS schema_versions (
module TEXT PRIMARY KEY,
version INTEGER NOT NULL DEFAULT 0
)
"""
)
row = conn.execute(
"SELECT version FROM schema_versions WHERE module = ?",
(self._SCHEMA_KEY,),
).fetchone()
version = row[0] if row else 0
for i, sql in enumerate(_MIGRATIONS[version:], start=version + 1):
conn.execute(sql)
conn.execute(
"INSERT OR REPLACE INTO schema_versions (module, version) VALUES (?, ?)",
(self._SCHEMA_KEY, i),
)
self._chmod()
def _chmod(self) -> None:
try:
self.db_path.chmod(0o600)
except OSError:
pass
__all__ = ["QueueStore"]
+203 -72
View File
@@ -9,14 +9,15 @@ calls when it needs an operator-reviewed egress change:
Each tool call: the agent passes the full proposed file plus a
justification text. The sidecar validates the proposal syntactically,
writes it to the host SQLite queue table, and holds the tool-call
writes it to the host's per-bottle queue dir, and holds the tool-call
connection open. The operator's supervise TUI
(bot_bottle.cli.supervise) sees the proposal, accepts
approve / modify / reject, and writes a response row. The sidecar sees
the response and returns `{status, notes}` to the agent.
approve / modify / reject, and writes a response file alongside the
proposal. The sidecar sees the response and returns `{status, notes}`
to the agent.
This module defines the host-side library: dataclasses for the queue
record shapes, queue read/write helpers, the audit log writer, and the
file shapes, queue read/write helpers, the audit log writer, and the
diff renderer. The in-container sidecar lives in
bot_bottle/supervise_server.py; the supervise daemon's container
lifecycle is owned by the sidecar bundle (PRD 0024).
@@ -33,6 +34,8 @@ from __future__ import annotations
import dataclasses
import difflib
import hashlib
import json
import os
import time
import uuid
from abc import ABC
@@ -83,9 +86,8 @@ STATUSES: tuple[str, ...] = (STATUS_APPROVED, STATUS_MODIFIED, STATUS_REJECTED)
# `routes edit <bottle>` verb writes entries with this action.
ACTION_OPERATOR_EDIT = "operator-edit"
DB_PATH_IN_CONTAINER = "/run/supervise/bot-bottle.db"
QUEUE_DIR_IN_CONTAINER = "/run/supervise/queue"
DEFAULT_POLL_INTERVAL_SEC = 0.5
HOST_DB_FILENAME = "bot-bottle.db"
# --- Paths -----------------------------------------------------------------
@@ -95,6 +97,10 @@ def bot_bottle_root() -> Path:
return Path.home() / ".bot-bottle"
def queue_dir_for_slug(slug: str) -> Path:
return bot_bottle_root() / "queue" / slug
def audit_dir() -> Path:
return bot_bottle_root() / "audit"
@@ -103,16 +109,14 @@ def audit_log_path(component: str, slug: str) -> Path:
return audit_dir() / f"{component}-{slug}.log"
def host_db_path() -> Path:
return bot_bottle_root() / HOST_DB_FILENAME
# --- Dataclasses -----------------------------------------------------------
@dataclass(frozen=True)
class Proposal:
"""One pending tool-call from the agent."""
"""One pending tool-call from the agent. The sidecar writes one
of these to the queue dir on a tool call; the operator's TUI
reads them; the sidecar polls for a matching Response."""
id: str
bottle_slug: str
@@ -166,7 +170,7 @@ class Proposal:
@dataclass(frozen=True)
class Response:
"""The operator's decision on a proposal. The TUI writes one of
these to the queue table; the sidecar reads it and returns the
these to the queue dir; the sidecar reads it and returns the
`{status, notes}` pair to the agent's tool call.
`final_file` carries the file content the supervisor will
@@ -219,50 +223,90 @@ class AuditEntry:
return dataclasses.asdict(self)
try:
from .queue_store import QueueStore
from .audit_store import AuditStore
except ImportError:
# Sidecar bundle: files are flat-copied under /app, not a package.
from queue_store import QueueStore # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module
from audit_store import AuditStore # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module
# --- Queue I/O -------------------------------------------------------------
def write_proposal(proposal: Proposal) -> Path:
"""Persist `proposal` in the queue database, mode 0o600.
def _proposal_filename(proposal_id: str) -> str:
return f"{proposal_id}.proposal.json"
def _response_filename(proposal_id: str) -> str:
return f"{proposal_id}.response.json"
def _id_from_proposal_filename(path: Path) -> str | None:
name = path.name
if not name.endswith(".proposal.json"):
return None
return name[: -len(".proposal.json")]
def write_proposal(queue_dir: Path, proposal: Proposal) -> Path:
"""Persist `proposal` as JSON in the queue dir, mode 0o600.
Directory is created if missing."""
return QueueStore(proposal.bottle_slug).write_proposal(proposal)
queue_dir.mkdir(parents=True, exist_ok=True)
path = queue_dir / _proposal_filename(proposal.id)
payload = json.dumps(proposal.to_dict(), indent=2) + "\n"
_atomic_write(path, payload, mode=0o600)
return path
def read_proposal(bottle_slug: str, proposal_id: str) -> Proposal:
return QueueStore(bottle_slug).read_proposal(proposal_id)
def read_proposal(queue_dir: Path, proposal_id: str) -> Proposal:
path = queue_dir / _proposal_filename(proposal_id)
with path.open() as f:
raw = json.load(f)
if not isinstance(raw, dict):
raise ValueError(f"{path}: top-level must be an object")
return Proposal.from_dict(raw)
def list_pending_proposals(bottle_slug: str) -> list[Proposal]:
"""All proposals for `bottle_slug` that do not yet have a matching
response. Sorted by `arrival_timestamp` so the operator
def list_pending_proposals(queue_dir: Path) -> list[Proposal]:
"""All proposals in `queue_dir` that do not yet have a matching
response file. Sorted by `arrival_timestamp` so the operator
sees the queue FIFO."""
return QueueStore(bottle_slug).list_pending_proposals()
if not queue_dir.is_dir():
return []
out: list[Proposal] = []
for path in sorted(queue_dir.glob("*.proposal.json")):
proposal_id = _id_from_proposal_filename(path)
if proposal_id is None:
continue
if (queue_dir / _response_filename(proposal_id)).exists():
continue
try:
with path.open() as f:
raw = json.load(f)
except (OSError, json.JSONDecodeError):
continue
if not isinstance(raw, dict):
continue
try:
out.append(Proposal.from_dict(raw))
except (KeyError, ValueError):
continue
out.sort(key=lambda p: p.arrival_timestamp)
return out
def list_all_pending_proposals() -> list[Proposal]:
"""All pending proposals across bottles, sorted FIFO."""
return QueueStore("").list_all_pending_proposals()
def write_response(queue_dir: Path, response: Response) -> Path:
queue_dir.mkdir(parents=True, exist_ok=True)
path = queue_dir / _response_filename(response.proposal_id)
payload = json.dumps(response.to_dict(), indent=2) + "\n"
_atomic_write(path, payload, mode=0o600)
return path
def write_response(bottle_slug: str, response: Response) -> Path:
return QueueStore(bottle_slug).write_response(response)
def read_response(bottle_slug: str, proposal_id: str) -> Response:
return QueueStore(bottle_slug).read_response(proposal_id)
def read_response(queue_dir: Path, proposal_id: str) -> Response:
path = queue_dir / _response_filename(proposal_id)
with path.open() as f:
raw = json.load(f)
if not isinstance(raw, dict):
raise ValueError(f"{path}: top-level must be an object")
return Response.from_dict(raw)
def wait_for_response(
bottle_slug: str,
queue_dir: Path,
proposal_id: str,
*,
poll_interval: float = DEFAULT_POLL_INTERVAL_SEC,
@@ -273,35 +317,90 @@ def wait_for_response(
which the wait raises TimeoutError. None waits forever the
natural shape, since the operator's response time is unbounded.
Polls SQLite so the implementation stays portable and stdlib-only."""
store = QueueStore(bottle_slug)
Polls the filesystem so the implementation stays portable and
stdlib-only."""
path = queue_dir / _response_filename(proposal_id)
while True:
try:
return store.read_response(proposal_id)
except FileNotFoundError:
pass
if path.exists():
try:
with path.open() as f:
raw = json.load(f)
except (OSError, json.JSONDecodeError):
raw = None
if isinstance(raw, dict):
try:
return Response.from_dict(raw)
except (KeyError, ValueError):
pass
if deadline is not None and time.monotonic() >= deadline:
raise TimeoutError(f"no response for proposal {proposal_id!r}")
time.sleep(poll_interval)
def archive_proposal(bottle_slug: str, proposal_id: str) -> None:
"""Mark both proposal and response rows processed.
Idempotent missing rows are silently skipped."""
QueueStore(bottle_slug).archive_proposal(proposal_id)
def archive_proposal(queue_dir: Path, proposal_id: str) -> None:
"""Move both proposal and response files to `<queue_dir>/processed/`.
Idempotent missing files are silently skipped."""
processed = queue_dir / "processed"
processed.mkdir(parents=True, exist_ok=True)
for name in (_proposal_filename(proposal_id), _response_filename(proposal_id)):
src = queue_dir / name
if src.exists():
src.rename(processed / name)
# --- Audit log -------------------------------------------------------------
def write_audit_entry(entry: AuditEntry) -> Path:
"""Append `entry` to the host supervise audit table."""
return AuditStore().write_audit_entry(entry)
"""Append `entry` as one JSON-Lines record to the per-bottle
audit log. Acquires an advisory exclusive lock so concurrent
writers don't interleave bytes."""
path = audit_log_path(entry.component, entry.bottle_slug)
path.parent.mkdir(parents=True, exist_ok=True)
line = json.dumps(entry.to_dict(), sort_keys=False) + "\n"
fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o600)
try:
_try_flock(fd)
try:
os.write(fd, line.encode("utf-8"))
finally:
_try_funlock(fd)
finally:
os.close(fd)
return path
def read_audit_entries(component: str, slug: str) -> list[AuditEntry]:
"""Load all audit entries for the given component+slug."""
return AuditStore().read_audit_entries(component, slug)
"""Load all audit entries for the given component+slug. Empty
list if the log doesn't exist."""
path = audit_log_path(component, slug)
if not path.is_file():
return []
out: list[AuditEntry] = []
with path.open() as f:
for raw_line in f:
raw_line = raw_line.strip()
if not raw_line:
continue
try:
raw = json.loads(raw_line)
except json.JSONDecodeError:
continue
if not isinstance(raw, dict):
continue
try:
out.append(AuditEntry(
timestamp=_require_str(raw, "timestamp"),
bottle_slug=_require_str(raw, "bottle_slug"),
component=_require_str(raw, "component"),
operator_action=_require_str(raw, "operator_action"),
operator_notes=_require_str(raw, "operator_notes"),
justification=_require_str(raw, "justification"),
diff=_require_str(raw, "diff"),
))
except ValueError:
continue
return out
# --- Diff rendering --------------------------------------------------------
@@ -334,34 +433,35 @@ def sha256_hex(content: str) -> str:
class SupervisePlan:
"""Output of Supervise.prepare; consumed by .start.
`db_path` is the host database bind-mounted into the sidecar at
/run/supervise/bot-bottle.db. `internal_network` is empty at
prepare time; the backend's launch step fills it via
dataclasses.replace before calling .start."""
`queue_dir` is the host directory bind-mounted into the sidecar
at /run/supervise/queue. `internal_network` is empty at prepare
time; the backend's launch step fills it via dataclasses.replace
before calling .start."""
slug: str
db_path: Path
queue_dir: Path
internal_network: str = ""
class Supervise(ABC):
"""Per-bottle supervise sidecar. Encapsulates host-side database
staging; the sidecar's start/stop lifecycle is backend-specific."""
"""Per-bottle supervise sidecar. Encapsulates the host-side
prepare (queue dir staging); the sidecar's start/stop lifecycle
is backend-specific."""
def prepare(
self,
slug: str,
stage_dir: Path,
) -> SupervisePlan:
"""Stage the host database. Returns the plan; `internal_network`
must be set by the launch step before .start runs."""
"""Stage the per-bottle queue dir on the host. Returns the
plan; `internal_network` must be set by the launch step before
.start runs."""
del stage_dir
db_path = host_db_path()
QueueStore(slug)
AuditStore(db_path)
queue_dir = queue_dir_for_slug(slug)
queue_dir.mkdir(parents=True, exist_ok=True)
return SupervisePlan(
slug=slug,
db_path=db_path,
queue_dir=queue_dir,
)
# --- Helpers ---------------------------------------------------------------
@@ -374,15 +474,47 @@ def _require_str(raw: dict[str, object], key: str) -> str:
return value
def _atomic_write(path: Path, content: str, *, mode: int) -> None:
"""Atomic: write to a sibling tmp file, fsync, rename."""
tmp = path.with_suffix(path.suffix + ".tmp")
fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
try:
os.write(fd, content.encode("utf-8"))
os.fsync(fd)
finally:
os.close(fd)
os.replace(tmp, path)
try:
import fcntl as _fcntl
def _try_flock(fd: int) -> None: # type: ignore[reportRedeclaration]
try:
_fcntl.flock(fd, _fcntl.LOCK_EX)
except OSError:
pass
def _try_funlock(fd: int) -> None: # type: ignore[reportRedeclaration]
try:
_fcntl.flock(fd, _fcntl.LOCK_UN)
except OSError:
pass
except ImportError: # pragma: no cover — Windows path
def _try_flock(fd: int) -> None: # noqa: F841 — Windows fallback
return None
def _try_funlock(fd: int) -> None: # noqa: F841 — Windows fallback
return None
__all__ = [
"ACTION_OPERATOR_EDIT",
"AuditEntry",
"AuditStore",
"COMPONENT_FOR_TOOL",
"DEFAULT_POLL_INTERVAL_SEC",
"DB_PATH_IN_CONTAINER",
"Proposal",
"QueueStore",
"QUEUE_DIR_IN_CONTAINER",
"Response",
"STATUSES",
"STATUS_APPROVED",
@@ -404,9 +536,8 @@ __all__ = [
"audit_dir",
"audit_log_path",
"bot_bottle_root",
"host_db_path",
"list_pending_proposals",
"list_all_pending_proposals",
"queue_dir_for_slug",
"read_audit_entries",
"read_proposal",
"read_response",
+17 -9
View File
@@ -7,13 +7,14 @@ config changes when stuck. The tools are `egress-allow`,
Each queued tool call:
1. Validates the proposed file syntactically.
2. Writes a Proposal to the host SQLite database.
3. Blocks polling for a matching Response row.
2. Writes a Proposal to /run/supervise/queue/ (bind-mounted from
the host's ~/.bot-bottle/queue/<slug>/).
3. Blocks polling for a matching Response file.
4. Returns the operator's `{status, notes}` to the agent.
The bottle slug arrives via SUPERVISE_BOTTLE_SLUG env (stamped at
container creation by the backend's start step). SUPERVISE_DB_PATH
points at the bind-mounted host database.
container creation by the backend's start step). The queue dir comes
from SUPERVISE_QUEUE_DIR (default `/run/supervise/queue`).
Speaks MCP over HTTP+JSON-RPC. Methods handled:
@@ -41,6 +42,7 @@ import typing
import urllib.error
import urllib.request
from dataclasses import dataclass
from pathlib import Path
try:
# Same-directory imports inside the bundle container; these files are
@@ -275,6 +277,7 @@ def validate_proposed_file(tool: str, content: str) -> None:
@dataclass(frozen=True)
class ServerConfig:
bottle_slug: str
queue_dir: Path
response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS
@@ -373,7 +376,7 @@ def handle_tools_call(
current_file_hash=_sv.sha256_hex(proposed_file),
)
try:
_sv.write_proposal(proposal)
_sv.write_proposal(config.queue_dir, proposal)
except OSError as e:
raise _RpcInternalError(f"failed to write proposal to queue: {e}") from e
sys.stderr.write(
@@ -384,7 +387,7 @@ def handle_tools_call(
deadline = time.monotonic() + config.response_timeout_seconds
try:
response = _sv.wait_for_response(
config.bottle_slug,
config.queue_dir,
proposal.id,
poll_interval=MIN_RESPONSE_POLL_INTERVAL_SECONDS,
deadline=deadline,
@@ -396,7 +399,7 @@ def handle_tools_call(
"isError": False,
}
try:
_sv.archive_proposal(config.bottle_slug, proposal.id)
_sv.archive_proposal(config.queue_dir, proposal.id)
except OSError as e:
raise _RpcInternalError(f"failed to archive proposal: {e}") from e
@@ -536,7 +539,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler):
class MCPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
allow_reuse_address = True
daemon_threads = True
config: ServerConfig = ServerConfig(bottle_slug="")
config: ServerConfig = ServerConfig(bottle_slug="", queue_dir=Path())
# --- Entry point -----------------------------------------------------------
@@ -545,18 +548,21 @@ class MCPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
def serve(
*,
bottle_slug: str,
queue_dir: Path,
port: int = _sv.SUPERVISE_PORT,
bind: str = "0.0.0.0",
response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS,
) -> typing.NoReturn:
queue_dir.mkdir(parents=True, exist_ok=True)
server = MCPServer((bind, port), MCPHandler)
server.config = ServerConfig(
bottle_slug=bottle_slug,
queue_dir=queue_dir,
response_timeout_seconds=response_timeout_seconds,
)
sys.stderr.write(
f"supervise listening on {bind}:{port}; "
f"slug={bottle_slug!r}; "
f"slug={bottle_slug!r}; queue={queue_dir}; "
f"tools: {', '.join(t['name'] for t in TOOL_DEFINITIONS)}\n" # type: ignore[arg-type]
)
sys.stderr.flush()
@@ -575,6 +581,7 @@ def main(argv: list[str]) -> int:
if not bottle_slug:
sys.stderr.write("supervise: SUPERVISE_BOTTLE_SLUG env is unset\n")
return 2
queue_dir = Path(os.environ.get("SUPERVISE_QUEUE_DIR", _sv.QUEUE_DIR_IN_CONTAINER))
port = int(os.environ.get("SUPERVISE_PORT", str(_sv.SUPERVISE_PORT)))
bind = os.environ.get("SUPERVISE_BIND", "0.0.0.0")
try:
@@ -584,6 +591,7 @@ def main(argv: list[str]) -> int:
return 2
serve(
bottle_slug=bottle_slug,
queue_dir=queue_dir,
port=port,
bind=bind,
response_timeout_seconds=response_timeout_seconds,
@@ -0,0 +1,239 @@
# PRD prd-new: Forge native integration
- **Status:** Draft
- **Author:** claude
- **Created:** 2026-06-29
- **Issue:** #317
## Summary
Add a webhook-driven orchestration layer that lets Gitea issues and PR comments drive bot-bottle sessions end-to-end with no operator in the loop for the happy path. An issue assigned to a member of the configured agent org and labelled with an agent name triggers a headless bottle launch; the bottle processes the issue, opens a PR, and interacts with the forge through a **forge sidecar** — the agent never touches the Gitea API or its credentials directly. The agent calls `signal_done(status, summary)` on the sidecar when a work unit is complete; the sidecar relays that to the orchestrator over a queue dir (the same pattern as the supervise sidecar), so completion is an unambiguous in-band signal rather than a comment the orchestrator has to parse. The orchestrator freezes the bottle. Subsequent PR comments rehydrate the frozen bottle. The bottle is destroyed when the PR closes.
The forge sidecar is backed by a `Forge` abstract class with per-provider implementations (Gitea first), so the agent's prompts and the sidecar protocol stay forge-agnostic. The sidecar logs forge operations semantically ("read PR description", "posted comment", "signalled done"), giving richer provenance than post-hoc egress-byte parsing, and enforces a **read-anywhere / write-scoped** permission model: the agent may read for context but may only write to the issue and PRs it was assigned.
Run provenance is exposed through a **provenance API** (the sidecar's structured operation log plus the run's metadata), not posted back into the forge. We do not surface a provenance footer in the PR — the audit record lives behind the API where it can be retained and queried, rather than as an editable comment.
The separation of concerns across the two layers: bot-bottle owns the headless launch primitives, the forge sidecar + `Forge` abstraction, and forge state. `bot-bottle-orchestrator` (separate binary) owns the webhook listener, bottle lifecycle loop, and monitoring dashboard; it calls into bot-bottle via `./cli.py orchestrate`, a thin wrapper command. This PRD covers bot-bottle's side of that contract.
## Problem
Today an operator must open the TUI, select an agent and bottle, confirm the preflight, and type prompts interactively. This blocks "issue → PR" automation and produces no durable audit record of what the agent did. The security model already provides the right isolation and egress controls, and `start --headless` (#315) already gives `bot-bottle-orchestrator` a non-interactive launch path. The missing pieces are a headless `resume` counterpart for rehydrating frozen bottles, a forge-interaction surface the agent uses to read context, post comments, and signal completion, and the provenance trail that makes the audit story legible to reviewers on every PR.
That forge-interaction surface could be built two ways: (2) give the agent the Gitea API directly with cred-proxy injecting the token, or (3) put a forge sidecar between the agent and the forge. This PRD takes **option 3**. The deciding factors: a sidecar `signal_done` call is an unambiguous completion signal where comment-parsing is a correctness risk that surfaces in production; the sidecar produces a semantic audit trail rather than HTTP bytes, which is load-bearing for provenance (the stated product priority); and the sidecar can enforce scope tighter than repo-wide API-key permissions, reducing blast radius for a prompt-injected agent. The costs — a second sidecar process per forge run, a new failure mode if it crashes, and per-forge implementation cost — are accepted as the price of those properties.
## Goals / Success Criteria
1. Headless launch already exists: `./cli.py start <agent> --headless --prompt` (#315) runs non-interactively with no TUI selectors or y/N preflight. This PRD builds on it rather than re-introducing it. The remaining gap is a matching headless `resume` path (`./cli.py resume --headless`), since rehydrating a frozen bottle for a new prompt is required by the freeze / rehydrate loop and `resume` has no non-interactive entry point today.
2. An issue assigned to a member of the configured org (`FORGE_ORG`, default `bot-bottle`) and labelled `bot-bottle:<agent-name>` is the trigger convention. Org membership is verified via the Gitea API at event time.
3. Forge-targeted bottles run a **forge sidecar** that exposes a small, forge-agnostic API (comment/issue/PR CRUD plus `signal_done`) over the same queue-dir + HTTP/JSON-RPC machinery as the supervise sidecar. The agent calls the sidecar; it never sees the forge token or forge-specific endpoints.
4. The sidecar is backed by a `Forge` abstract class. Gitea is the first concrete implementation; adding a forge means a new subclass, not changes to the agent prompt or sidecar protocol. The sidecar enforces a read-anywhere / write-scoped model: writes are limited to the assigned issue and its PRs; reads are unrestricted for context.
5. The agent calls `signal_done(status, summary)` on the sidecar when a work unit is complete; the sidecar relays it to the orchestrator over a queue dir. This is the done signal — no comment parsing. A watchdog timeout (configurable, default 30 min) causes the orchestrator to treat the run as done-without-self-report if the agent exits without signalling.
6. Run provenance (agent name, bottle name(s), slug, timing, exit code, gitleaks result, egress summary, and the sidecar's semantic operation log) is available through a provenance API. It is **not** surfaced as a PR footer or any other forge comment.
7. Forge state (issue → slug, status) is persisted in a local SQLite database under `~/.bot-bottle/` and survives orchestrator restarts.
8. `./cli.py orchestrate status` lists active forge-managed bottles and their issue/PR URLs.
9. Unit tests cover: label parsing, org-membership check path, forge state store CRUD (SQLite), headless launch arg construction, forge env var injection, sidecar request dispatch through the `Forge` abstraction, write-scope enforcement (reject writes outside the assigned issue/PRs), and `signal_done` queue relay.
## Non-goals
- Webhook signature verification (HMAC-SHA256). Added as a follow-up.
- The `bot-bottle-orchestrator` binary itself — this PRD covers bot-bottle's side of the interface only. The orchestrator is a separate project.
- GitHub or GitLab support.
- Multiple simultaneous forge bottles per issue.
- Automatic retry on agent error exit.
- Bottle destruction on issue close (PR close only; issue close is ambiguous).
- Concurrent multi-issue handling (one blocking run per orchestrator process).
- A monitoring dashboard (orchestrator-side concern).
- Folding `DeployKeyProvisioner` into the `Forge` abstraction. Deploy-key provisioning runs at bottle-provision time on the host; the forge sidecar runs inside the bottle at agent time. The two have different lifecycles and actors, so coupling them into one class is deferred to a follow-up. This PRD only shares the Gitea HTTP client between them.
## Design
### Targeting convention
An issue is forge-targeted when **both** hold:
- At least one assignee is a member of the Gitea org named by `FORGE_ORG` (default `bot-bottle`). Checked via `GET /api/v1/orgs/{org}/members/{user}`.
- At least one label has the prefix `bot-bottle:`. The suffix names the agent manifest, e.g. `bot-bottle:implementer` → agent `implementer`.
`FORGE_ORG` is read at orchestrate-command startup. It is not embedded in manifests or state files; the orchestrator stamps its value into log output for auditability.
An optional label `bot-bottle-bottle:<name>` overrides bottle selection. When absent the agent's default bottle is used.
### `./cli.py orchestrate` — the thin wrapper
```
./cli.py orchestrate start --agent AGENT [--bottle BOTTLE ...] --prompt PROMPT
[--label LABEL] [--backend BACKEND]
./cli.py orchestrate resume --slug SLUG --prompt PROMPT [--backend BACKEND]
./cli.py orchestrate status
```
`orchestrate start` is a thin shim over the already-shipped `start --headless` (#315): it forwards agent / bottle / label / prompt and adds the forge-specific wiring (`forge_env`, sidecar launch). It does not re-implement headless launch. The caller (`bot-bottle-orchestrator`) manages freeze, state, and the forge sidecar's done signal around it.
`orchestrate resume` is the shim over the new `resume --headless` (below).
`orchestrate status` prints the forge state table.
### Headless primitives — what exists vs. what's new
Headless **start** already shipped in #315 and this PRD reuses it as-is:
- `./cli.py start <agent> --headless --prompt TEXT` — no TUI selectors, no y/N preflight. Internally `_start_headless()` calls the shared `_launch_bottle()` with `assume_yes=True` and `headless_prompt_text=prompt`.
- The prompt is delivered through `AgentProvider.headless_prompt(prompt)` — claude `-p`, codex positional, pi `-p`. The orchestrator does **not** hand-roll agent args; it relies on this provider abstraction. (An earlier draft proposed `start_headless` / `attach_agent_headless` helpers that constructed `--no-interactive`/`-p` directly — those are dropped as redundant with, and divergent from, what #315 merged.)
Two additions are needed on top of #315:
**1. A `forge_env` hook on the headless launch path.** The orchestrator needs to pass forge context + token through to the forge sidecar launched alongside the agent. This is a parameter threaded into `_launch_bottle` (the same core `start --headless` already uses), not a parallel launch function. The agent process itself does not receive the token.
**2. `resume --headless`** — new in `bot_bottle/cli/resume.py`, mirroring the `--headless` flag on `start`:
```
./cli.py resume <slug> --headless --prompt TEXT
```
It rehydrates a frozen bottle and runs one headless prompt via the same `assume_yes` + `headless_prompt` path, returning the agent's exit code. `resume` has no non-interactive entry point today, so this is genuinely new work rather than a rename of an existing helper.
### Forge sidecar
Forge-targeted bottles run a forge sidecar alongside the agent, mirroring the supervise sidecar: a per-bottle process that exposes an HTTP/JSON-RPC endpoint over a Unix socket and relays events to the orchestrator through a queue dir. The agent calls the sidecar; the sidecar holds the forge token and makes the actual forge API calls. The agent never receives the credential and never sees a forge-specific endpoint — swapping Gitea for another forge does not change the agent prompt or the sidecar protocol.
The sidecar is configured at launch from the forge context (owner, repo, issue, PR) and the token, supplied by the orchestrator — not baked into the agent manifest. Because the sidecar owns the token, forge traffic does not need a cred-proxy egress route on the agent; the agent's egress policy is unchanged by forge targeting.
**Sidecar protocol** (forge-agnostic; each method maps to a `Forge` call):
| Method | Scope | Purpose |
|---|---|---|
| `read_issue(number)` | read-anywhere | Read an issue body for context |
| `read_pr(number)` | read-anywhere | Read a PR (incl. merge state) for context |
| `read_comments(number)` | read-anywhere | Read a thread for context |
| `post_comment(number, body)` | write-scoped | Post to the assigned issue/PR |
| `update_description(number, body)` | write-scoped | Edit the assigned issue/PR body |
| `signal_done(status, summary)` | — | Relay completion to the orchestrator |
Issues and PRs are distinct domain objects (`Issue` vs `PullRequest`) read through distinct methods; a PR carries merge state an issue does not.
**Scope enforcement** is read-anywhere / write-scoped: read methods accept any issue/PR number for context; write methods are rejected unless the target is the assigned issue or one of its PRs. This is tighter than Gitea's repo-wide API-key permissions and bounds the blast radius of a prompt-injected agent. Rejections are logged semantically (operation, target, reason) so the audit trail records attempted out-of-scope writes, not just allowed ones.
**Semantic audit**: every sidecar call is logged as a structured operation ("read PR #318 description", "posted comment to #317", "signalled done: success") rather than as opaque HTTP bytes. This log feeds provenance directly, with no post-hoc egress-log parsing.
### `Forge` abstraction — `bot_bottle/contrib/forge/`
The sidecar dispatches to a `Forge` abstract class. Each provider implements the operations behind the sidecar protocol:
```python
class Forge(abc.ABC):
@abc.abstractmethod
def read_issue(self, number: int) -> Issue: ...
@abc.abstractmethod
def read_pr(self, number: int) -> PullRequest: ...
@abc.abstractmethod
def read_comments(self, number: int) -> list[Comment]: ...
@abc.abstractmethod
def post_comment(self, number: int, body: str) -> None: ...
@abc.abstractmethod
def update_description(self, number: int, body: str) -> None: ...
@abc.abstractmethod
def is_org_member(self, org: str, username: str) -> bool: ...
@abc.abstractmethod
def get_pr_for_issue(self, number: int) -> int | None: ...
@abc.abstractmethod
def is_pr_open(self, number: int) -> bool: ...
```
`Issue` and `PullRequest` are separate frozen dataclasses — a PR adds `merged`. `ScopedForge` wraps a concrete `Forge` to enforce the read-anywhere / write-scoped model (`post_comment` / `update_description` raise `ForgeScopeError` outside the assigned issue and PRs).
`GiteaForge` is the first and only concrete implementation in this PRD. It wraps the Gitea HTTP client (below). Adding GitHub or GitLab later is a new subclass; the sidecar, protocol, and agent prompt are untouched.
> **Deferred:** `DeployKeyProvisioner` is *not* folded into `Forge` here. Deploy-key provisioning runs on the host at provision time; the sidecar runs in the bottle at agent time. They have different lifecycles and actors, so a shared abstract base would couple two unrelated auth contexts. For now they only share the Gitea HTTP client; a later PRD can revisit unification.
### Forge env vars
The orchestrator passes forge context to the **sidecar** (not the agent) at launch. The agent does not need owner/repo/issue env vars to construct API calls, since it only names issue/PR numbers to the sidecar:
| Var | Example | Purpose |
|---|---|---|
| `FORGE_GITEA_API` | `https://gitea.dideric.is/api/v1` | Base URL the sidecar calls |
| `FORGE_OWNER` | `didericis` | Repo owner |
| `FORGE_REPO` | `bot-bottle` | Repo name |
| `FORGE_ISSUE_NUMBER` | `317` | Assigned issue (defines write scope) |
| `FORGE_PR_NUMBER` | `318` | Assigned PR (empty until PR exists) |
The agent's forge-specific prompt instructs it to call `signal_done` on the sidecar when a work unit is complete, and to use the sidecar for any comment/description writes. The instruction is forge-agnostic and is part of the forge prompt overlay, not the base agent manifest, so non-forge runs are unaffected.
### Done signal and watchdog
The agent calls `signal_done(status, summary)` on the sidecar when it finishes a work unit. The sidecar writes the event to its queue dir; the orchestrator reads it and:
1. Reads the forge state for `(owner, repo, issue_number)`.
2. If `status == "running"`, treats the event as the done signal: freezes the bottle and sets `status = "frozen"`. Provenance is recorded via the provenance API — no comment is posted to the forge.
Because completion is an explicit `signal_done` call, the orchestrator does not parse comment text to detect "done", and intermediate comments the agent posts mid-run cannot be mistaken for completion.
**Watchdog**: the orchestrator tracks `last_checkin_at` in forge state, updated on each sidecar event. A background thread wakes every minute. If `now - last_checkin_at > FORGE_WATCHDOG_TIMEOUT` (default 30 min, configurable via env) and `status == "running"`, the orchestrator treats the run as done-without-self-report and freezes the bottle, flagging the run as incomplete in the provenance record.
**Sidecar-death failure mode**: if the forge sidecar crashes mid-run the agent loses forge access while the bottle is otherwise healthy. The orchestrator detects a dead sidecar (socket/queue gone) the same way it detects a stalled agent and falls back to the watchdog path.
### Forge state — `bot_bottle/contrib/gitea/forge_state.py`
State is stored in a local SQLite database at `~/.bot-bottle/bot-bottle.db`. Access goes through a thin CRUD interface, `ForgeStateStore`, so the storage location/engine can be swapped without touching callers. `SqliteForgeStateStore` is the first implementation.
The `forge_state` table is keyed by `(owner, repo, issue_number)` and carries: `slug`, `agent_name`, `bottle_names` (JSON), `backend_name`, `agent_git_user`, `pr_number` (nullable), `status`, `last_checkin_at`.
`status`: `"running"` | `"frozen"` | `"destroyed"`.
Store interface:
```python
class ForgeStateStore(abc.ABC):
def upsert(self, state: ForgeState) -> None: ...
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None: ...
def delete(self, owner: str, repo: str, issue_number: int) -> None: ...
def all(self) -> list[ForgeState]: ...
class SqliteForgeStateStore(ForgeStateStore):
def __init__(self, db_path: Path | None = None) -> None: ...
```
`upsert` uses `INSERT OR REPLACE` so a re-run for the same issue overwrites in place. The schema is created on first open.
### Provenance API
Run provenance — agent, bottle(s), slug, timing, exit code, gitleaks result, egress summary, watchdog-fired flag, and the sidecar's semantic operation log — is exposed through a **provenance API**, not posted into the forge. There is no provenance footer or run-summary comment.
The rationale (per the monetization positioning): a PR comment is mutable by any maintainer, unsigned, and per-PR, so it is worthless as an audit record and invites false trust. The authoritative record therefore lives behind the API, where it can be retained, queried, and (eventually) signed. Whether any projection of it ever appears in the forge is a separate, out-of-scope decision; this PR does not build one.
The API surface itself (schema, transport, signing, retention) is **out of scope for this PRD** and belongs with the orchestrator / control-plane work. bot-bottle here only produces the raw material: the sidecar's semantic operation log and the run metadata the orchestrator collects.
### Gitea HTTP client — `bot_bottle/contrib/gitea/client.py`
`GiteaForge` (and the existing `GiteaDeployKeyProvisioner`) share one thin HTTP client. Unlike the option-2 design, the token is held by the sidecar process and passed to the client directly — there is no agent-side cred-proxy route to inject it, because the agent never makes forge calls.
```python
class GiteaClient:
def __init__(self, *, api_url: str, owner: str, repo: str, token: str) -> None: ...
def is_org_member(self, org: str, username: str) -> bool: ...
def get_issue(self, number: int) -> dict: ...
def get_comments(self, number: int) -> list[dict]: ...
def post_comment(self, number: int, body: str) -> None: ...
def patch_issue_body(self, number: int, body: str) -> None: ...
def get_pull(self, number: int) -> dict: ...
```
`GiteaForge` adapts this client to the `Forge` surface (mapping raw JSON to `Issue` / `PullRequest` / `Comment`). Sharing only the HTTP client (not an abstract base) is the deliberate boundary between the sidecar and the deploy-key provisioner — see the deferral note under the `Forge` abstraction.
### Implementation chunks
1. **Headless additions on top of #315** — thread a `forge_env` parameter into the existing `_launch_bottle` core (the one `start --headless` already uses); add a `--headless` path to `cli/resume.py` reusing `assume_yes` + `headless_prompt`. No new `start_headless`/`attach_agent_headless` helpers. Tests: `forge_env` reaches the sidecar/`guest_env`; `resume --headless` skips the TUI and y/N preflight and returns the agent exit code.
2. **Forge state**`contrib/gitea/forge_state.py`: `ForgeState` dataclass, `ForgeStateStore` CRUD interface, `SqliteForgeStateStore`. Tests: round-trip, missing → None, `INSERT OR REPLACE` upsert, delete idempotent, `all()` ordering, persistence across store instances.
3. **`Forge` abstraction + Gitea client** — `contrib/forge/base.py` (`Forge` ABC, `ScopedForge`, `Issue` / `PullRequest` / `Comment`) and `contrib/gitea/client.py` + `GiteaForge`: `is_org_member`, `read_issue`, `read_pr`, `read_comments`, `post_comment`, `update_description`, `get_pr_for_issue`, `is_pr_open`. Tests: mock `urllib.request.urlopen`, assert payloads and 404-as-false for membership; `ScopedForge` write-scope enforcement.
4. **Forge sidecar** — sidecar process exposing the protocol over a Unix socket, queue-dir relay, write-scope enforcement, semantic op log, `signal_done`. Reuses the supervise sidecar bundle machinery. Tests: dispatch each method to the `Forge`, reject out-of-scope writes, `signal_done` writes a queue event, scope-rejection is logged.
5. **`./cli.py orchestrate`** — `cli/orchestrate.py` with `start`, `resume`, `status` subcommands wired into `cli.py`; `start` launches the forge sidecar alongside the agent for forge-targeted runs. Tests: arg parsing, `start` delegates to `start --headless`, `resume` delegates to `resume --headless`.
## Provenance
Run provenance is captured (sidecar semantic operation log + run metadata) and exposed through a provenance API. It is deliberately **not** surfaced in the forge — no footer, no run-summary comment. A mutable, unsigned PR comment is not an audit record; the authoritative record lives behind the API where it can be retained and signed. The `watchdog_fired` flag marks runs where the agent did not self-report completion so consumers of the API know the record may be incomplete.
The provenance API's schema, transport, signing, and retention are out of scope for this PRD (control-plane work); bot-bottle here produces the raw material only.
-135
View File
@@ -1,135 +0,0 @@
# PRD prd-new: SQLite local storage
- **Status:** Active
- **Author:** codex
- **Created:** 2026-07-01
- **Issue:** #319
## Summary
Add a small stdlib SQLite storage layer for bot-bottle host runtime state,
starting with the supervise queue and audit log. This replaces scattered JSON
queue files and JSONL audit logs with structured tables while preserving the
existing public supervise helper functions and sidecar queue mount contract.
## Problem
Bot-bottle currently stores supervise proposals and responses as individual JSON
files under `~/.bot-bottle/queue/<slug>/`, and audit entries as JSONL files
under `~/.bot-bottle/audit/`. That worked for the original interactive TUI, but
new forge-native orchestration needs durable, queryable local state for queues,
audit trails, watchdogs, and lifecycle records. PR #318 started introducing
SQLite-shaped boilerplate for forge state; the storage foundation should live in
its own PR so forge work can build on the shared runtime store instead of adding
one-off persistence.
## Goals / Success Criteria
1. Supervise proposals and responses are persisted through SQLite.
2. Audit entries are persisted through SQLite.
3. Supervise queue helpers use the bottle slug / queue key instead of a queue
directory path.
4. The sidecar receives the host database mount across docker, smolmachines,
and macOS-container backends.
5. The implementation stays stdlib-only.
6. Schema migrations use a `PRAGMA user_version` runner — no third-party deps.
7. Unit tests cover queue round-trips, pending discovery, response waits,
archive semantics, audit round-trips, and path creation.
## Non-goals
- Migrating old JSON queue files or JSONL audit logs.
- Adding forge orchestration state tables.
- Adding egress metering or budget tables.
- Changing the supervise TUI workflow or remediation behavior.
- Introducing a third-party ORM or migration library.
## Design
### Database locations
Queue and audit state use the host-level local database:
```text
~/.bot-bottle/bot-bottle.db
```
The supervise sidecar receives that database as a writable bind mount at
`/run/supervise/bot-bottle.db` and gets the path through `SUPERVISE_DB_PATH`.
No per-slug queue directory is mounted into the sidecar. This creates the shared
host database that later forge/native lifecycle work can extend in separate
PRDs.
### Tables
`supervise_proposals` lives in the host database:
```sql
CREATE TABLE supervise_proposals (
queue_key TEXT NOT NULL,
id TEXT NOT NULL,
bottle_slug TEXT NOT NULL,
tool TEXT NOT NULL,
proposed_file TEXT NOT NULL,
justification TEXT NOT NULL,
arrival_timestamp TEXT NOT NULL,
current_file_hash TEXT NOT NULL,
archived INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (queue_key, id)
);
```
`supervise_responses` lives in the host database:
```sql
CREATE TABLE supervise_responses (
queue_key TEXT NOT NULL,
proposal_id TEXT NOT NULL,
status TEXT NOT NULL,
notes TEXT NOT NULL,
final_file TEXT,
archived INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (queue_key, proposal_id)
);
```
`supervise_audit_entries` lives in the host database:
```sql
CREATE TABLE supervise_audit_entries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
bottle_slug TEXT NOT NULL,
component TEXT NOT NULL,
operator_action TEXT NOT NULL,
operator_notes TEXT NOT NULL,
justification TEXT NOT NULL,
diff TEXT NOT NULL
);
```
### Compatibility
The queue helpers take a bottle slug / queue key and perform equivalent
operations against `~/.bot-bottle/bot-bottle.db`:
- `list_pending_proposals` returns non-archived proposals without a non-archived
response, sorted by arrival time.
- `archive_proposal` marks matching proposal/response rows archived instead of
moving files into `processed/`.
- `wait_for_response` keeps the current polling behavior but polls SQLite.
The old audit path helpers (`audit_dir`, `audit_log_path`) stay available for
compatibility. `audit_log_path` no longer describes the active storage location;
callers should use `read_audit_entries`.
## Implementation chunks
1. Add SQLite store helpers for supervise queue and audit state.
2. Rewire `bot_bottle.supervise` queue/audit functions to the store.
3. Update supervise CLI discovery tests and queue/audit unit tests.
4. Run unit tests, pyright, and pylint for touched modules.
## Open questions
None.
+75
View File
@@ -0,0 +1,75 @@
"""Unit: `cli.py resume --headless` non-interactive rehydrate path.
The freeze / rehydrate loop needs a non-interactive `resume`: deliver a
follow-up prompt and skip the y/N preflight, reusing the same launch
core (`assume_yes` + `headless_prompt_text`) as `start --headless`.
"""
from __future__ import annotations
import unittest
from typing import Any
from unittest.mock import MagicMock, patch
import bot_bottle.cli.resume as resume_mod
from bot_bottle.log import Die
def _metadata():
md = MagicMock()
md.agent_name = "implementer"
md.copy_cwd = False
md.cwd = "/repo"
md.identity = "implementer-abc12"
md.bottle_names = ["claude"]
md.backend = "docker"
return md
class ResumeHeadlessTest(unittest.TestCase):
def setUp(self) -> None:
self._launch = patch.object(
resume_mod, "_launch_bottle", return_value=0
).start()
patch.object(
resume_mod, "read_metadata", return_value=_metadata()
).start()
manifest = MagicMock()
manifest.require_agent = MagicMock(return_value=None)
patch.object(
resume_mod.ManifestIndex, "resolve", return_value=manifest
).start()
self.addCleanup(patch.stopall)
def _launch_kwargs(self) -> dict[str, Any]:
self._launch.assert_called_once()
return dict(self._launch.call_args.kwargs)
def test_headless_passes_assume_yes_and_prompt(self):
rc = resume_mod.cmd_resume(
["implementer-abc12", "--headless", "--prompt", "Address the review"]
)
self.assertEqual(0, rc)
kwargs = self._launch_kwargs()
self.assertTrue(kwargs["assume_yes"])
self.assertEqual("Address the review", kwargs["headless_prompt_text"])
def test_interactive_resume_unchanged(self):
resume_mod.cmd_resume(["implementer-abc12"])
kwargs = self._launch_kwargs()
self.assertFalse(kwargs["assume_yes"])
self.assertEqual("", kwargs["headless_prompt_text"])
def test_headless_without_prompt_errors(self):
with self.assertRaises(Die):
resume_mod.cmd_resume(["implementer-abc12", "--headless"])
self._launch.assert_not_called()
def test_prompt_without_headless_errors(self):
with self.assertRaises(Die):
resume_mod.cmd_resume(["implementer-abc12", "--prompt", "hi"])
self._launch.assert_not_called()
if __name__ == "__main__":
unittest.main()
+4 -3
View File
@@ -107,7 +107,7 @@ def _egress_plan(
def _supervise_plan() -> SupervisePlan:
return SupervisePlan(
slug=SLUG,
db_path=STATE / "bot-bottle.db",
queue_dir=STATE / "supervise" / "queue",
internal_network=f"bot-bottle-net-{SLUG}",
)
@@ -392,7 +392,7 @@ class TestSidecarBundleShape(unittest.TestCase):
sc = self._render(supervise=True)["services"]["sidecars"]
env_strings = sc["environment"]
self.assertIn(f"SUPERVISE_BOTTLE_SLUG={SLUG}", env_strings)
self.assertIn("SUPERVISE_DB_PATH=/run/supervise/bot-bottle.db", env_strings)
self.assertTrue(any(e.startswith("SUPERVISE_QUEUE_DIR=") for e in env_strings))
self.assertTrue(any(e.startswith("SUPERVISE_PORT=") for e in env_strings))
def test_volumes_always_includes_egress_ca(self):
@@ -408,7 +408,8 @@ class TestSidecarBundleShape(unittest.TestCase):
self.assertIn("/etc/egress", targets)
self.assertIn("/git-gate-entrypoint.sh", targets)
self.assertIn("/git-gate/creds/upstream-known_hosts", targets)
self.assertIn("/run/supervise/bot-bottle.db", targets)
self.assertTrue(any("supervise/queue" in t or t.startswith("/run/supervise")
for t in targets))
def test_extra_hosts_omitted_for_git_upstreams(self):
sc = self._render(with_git=True)["services"]["sidecars"]
+1 -1
View File
@@ -74,7 +74,7 @@ def _plan(
if supervise:
supervise_plan = SupervisePlan(
slug="demo-abc12",
db_path=Path("/tmp/bot-bottle.db"),
queue_dir=Path("/tmp/queue"),
)
return DockerBottlePlan(
spec=spec,
+1 -1
View File
@@ -77,7 +77,7 @@ def _plan(
if supervise:
supervise_plan = SupervisePlan(
slug="demo-abc12",
db_path=Path("/tmp/bot-bottle.db"),
queue_dir=Path("/tmp/queue"),
)
return DockerBottlePlan(
spec=spec,
+107
View File
@@ -0,0 +1,107 @@
"""Unit: Forge abstraction + ScopedForge (PRD forge-native-integration)."""
from __future__ import annotations
import unittest
from bot_bottle.contrib.forge.base import (
Comment,
Forge,
ForgeScopeError,
Issue,
PullRequest,
ScopedForge,
)
class _RecordingForge(Forge):
"""In-memory fake that records writes."""
def __init__(self) -> None:
self.comments: list[tuple[int, str]] = []
self.descriptions: list[tuple[int, str]] = []
def read_issue(self, number: int) -> Issue:
return Issue(number=number, title="t", body="b", state="open")
def read_pr(self, number: int) -> PullRequest:
return PullRequest(
number=number, title="pr", body="b", state="open", merged=False
)
def read_comments(self, number: int) -> list[Comment]:
return [Comment(id=1, user="alice", body="hi")]
def post_comment(self, number: int, body: str) -> None:
self.comments.append((number, body))
def update_description(self, number: int, body: str) -> None:
self.descriptions.append((number, body))
def is_org_member(self, org: str, username: str) -> bool:
return username == "member"
def get_pr_for_issue(self, number: int) -> int | None:
return 99 if number == 17 else None
def is_pr_open(self, number: int) -> bool:
return True
class TestScopedForgeReads(unittest.TestCase):
def setUp(self) -> None:
self.inner = _RecordingForge()
self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42])
def test_reads_pass_through_to_any_number(self):
# A number well outside the writable scope still reads fine.
self.assertEqual(123, self.scoped.read_issue(123).number)
self.assertEqual("alice", self.scoped.read_comments(500)[0].user)
def test_read_pr_passes_through(self):
pr = self.scoped.read_pr(999)
self.assertIsInstance(pr, PullRequest)
self.assertEqual(999, pr.number)
self.assertFalse(pr.merged)
def test_membership_and_pr_lookups_delegate(self):
self.assertTrue(self.scoped.is_org_member("bot-bottle", "member"))
self.assertFalse(self.scoped.is_org_member("bot-bottle", "stranger"))
self.assertEqual(99, self.scoped.get_pr_for_issue(17))
self.assertTrue(self.scoped.is_pr_open(8000))
class TestScopedForgeWrites(unittest.TestCase):
def setUp(self) -> None:
self.inner = _RecordingForge()
self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42])
def test_writable_set_is_issue_plus_prs(self):
self.assertEqual(frozenset({17, 42}), self.scoped.writable)
def test_write_to_assigned_issue_allowed(self):
self.scoped.post_comment(17, "done")
self.assertEqual([(17, "done")], self.inner.comments)
def test_write_to_assigned_pr_allowed(self):
self.scoped.update_description(42, "new body")
self.assertEqual([(42, "new body")], self.inner.descriptions)
def test_comment_outside_scope_rejected(self):
with self.assertRaises(ForgeScopeError) as ctx:
self.scoped.post_comment(500, "spam")
self.assertIn("500", str(ctx.exception))
self.assertEqual([], self.inner.comments)
def test_description_outside_scope_rejected(self):
with self.assertRaises(ForgeScopeError):
self.scoped.update_description(500, "tamper")
self.assertEqual([], self.inner.descriptions)
def test_scope_error_is_permission_error(self):
# Sidecars can catch the stdlib base type.
self.assertIn(PermissionError, ForgeScopeError.__mro__)
if __name__ == "__main__":
unittest.main()
+145
View File
@@ -0,0 +1,145 @@
"""Unit: GiteaClient + GiteaForge (PRD forge-native-integration)."""
from __future__ import annotations
import json
import unittest
import urllib.error
from io import BytesIO
from unittest.mock import MagicMock, patch
from bot_bottle.contrib.gitea.client import GiteaClient, GiteaForge
def _client() -> GiteaClient:
return GiteaClient(
api_url="https://gitea.example.com/api/v1",
owner="didericis",
repo="bot-bottle",
token="test-token",
)
def _resp(body: object, status: int = 200) -> MagicMock:
resp = MagicMock()
resp.read.return_value = json.dumps(body).encode() if body is not None else b""
resp.status = status
resp.__enter__ = lambda s: s # type: ignore
resp.__exit__ = MagicMock(return_value=False)
return resp
def _http_error(code: int, body: str = "") -> urllib.error.HTTPError:
return urllib.error.HTTPError(
url="http://x", code=code, msg="err", hdrs=None, # type: ignore[arg-type]
fp=BytesIO(body.encode()),
)
_URLOPEN = "bot_bottle.contrib.gitea.client.urllib.request.urlopen"
class TestOrgMembership(unittest.TestCase):
def test_member_returns_true_on_2xx(self):
with patch(_URLOPEN, return_value=_resp(None, 204)) as m:
self.assertTrue(_client().is_org_member("bot-bottle", "alice"))
req = m.call_args.args[0]
self.assertIn("/orgs/bot-bottle/members/alice", req.full_url)
def test_nonmember_returns_false_on_404(self):
with patch(_URLOPEN, side_effect=_http_error(404)):
self.assertFalse(_client().is_org_member("bot-bottle", "stranger"))
def test_other_http_error_raises(self):
with patch(_URLOPEN, side_effect=_http_error(403, "forbidden")):
with self.assertRaises(RuntimeError) as ctx:
_client().is_org_member("bot-bottle", "alice")
self.assertIn("403", str(ctx.exception))
class TestForgeReads(unittest.TestCase):
def test_read_issue_maps_fields(self):
raw = {"number": 17, "title": "Bug", "body": "broken", "state": "open"}
with patch(_URLOPEN, return_value=_resp(raw)) as m:
issue = GiteaForge(_client()).read_issue(17)
self.assertEqual((17, "Bug", "broken", "open"),
(issue.number, issue.title, issue.body, issue.state))
self.assertIn("/repos/didericis/bot-bottle/issues/17",
m.call_args.args[0].full_url)
def test_read_issue_tolerates_null_body(self):
raw = {"number": 17, "title": "T", "body": None, "state": "open"}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertEqual("", GiteaForge(_client()).read_issue(17).body)
def test_read_comments_maps_user_login(self):
raw = [
{"id": 1, "user": {"login": "alice"}, "body": "hi"},
{"id": 2, "user": {"login": "bob"}, "body": "yo"},
]
with patch(_URLOPEN, return_value=_resp(raw)):
comments = GiteaForge(_client()).read_comments(17)
self.assertEqual(["alice", "bob"], [c.user for c in comments])
self.assertEqual([1, 2], [c.id for c in comments])
class TestForgeWrites(unittest.TestCase):
def test_post_comment_payload_and_url(self):
with patch(_URLOPEN, return_value=_resp(None, 201)) as m:
GiteaForge(_client()).post_comment(17, "done ✓")
req = m.call_args.args[0]
self.assertEqual("POST", req.method)
self.assertIn("/repos/didericis/bot-bottle/issues/17/comments", req.full_url)
self.assertEqual("done ✓", json.loads(req.data)["body"])
def test_update_description_patches_issue(self):
with patch(_URLOPEN, return_value=_resp(None, 200)) as m:
GiteaForge(_client()).update_description(17, "edited")
req = m.call_args.args[0]
self.assertEqual("PATCH", req.method)
self.assertTrue(req.full_url.endswith("/issues/17"))
self.assertEqual("edited", json.loads(req.data)["body"])
def test_auth_header_sent(self):
with patch(_URLOPEN, return_value=_resp(None, 201)) as m:
GiteaForge(_client()).post_comment(17, "x")
self.assertEqual("token test-token",
m.call_args.args[0].headers["Authorization"])
class TestPRHelpers(unittest.TestCase):
def test_get_pr_for_issue_returns_number_when_issue_is_pr(self):
raw = {"number": 18, "pull_request": {"merged": False}}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertEqual(18, GiteaForge(_client()).get_pr_for_issue(18))
def test_get_pr_for_issue_none_for_plain_issue(self):
raw = {"number": 17, "pull_request": None}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertIsNone(GiteaForge(_client()).get_pr_for_issue(17))
def test_is_pr_open_true_when_state_open(self):
with patch(_URLOPEN, return_value=_resp({"state": "open"})):
self.assertTrue(GiteaForge(_client()).is_pr_open(18))
def test_is_pr_open_false_when_closed(self):
with patch(_URLOPEN, return_value=_resp({"state": "closed"})):
self.assertFalse(GiteaForge(_client()).is_pr_open(18))
def test_read_pr_maps_fields_including_merged(self):
raw = {"number": 18, "title": "Fix", "body": "patch",
"state": "closed", "merged": True}
with patch(_URLOPEN, return_value=_resp(raw)) as m:
pr = GiteaForge(_client()).read_pr(18)
self.assertEqual((18, "Fix", "patch", "closed", True),
(pr.number, pr.title, pr.body, pr.state, pr.merged))
self.assertIn("/repos/didericis/bot-bottle/pulls/18",
m.call_args.args[0].full_url)
def test_read_pr_merged_defaults_false(self):
with patch(_URLOPEN, return_value=_resp({"number": 18, "state": "open"})):
self.assertFalse(GiteaForge(_client()).read_pr(18).merged)
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,99 @@
"""Unit: SQLite forge state store (PRD forge-native-integration)."""
from __future__ import annotations
import tempfile
import unittest
from dataclasses import replace
from pathlib import Path
from bot_bottle.contrib.gitea.forge_state import (
STATUS_FROZEN,
STATUS_RUNNING,
ForgeState,
SqliteForgeStateStore,
)
def _state(**over: object) -> ForgeState:
base = ForgeState(
owner="didericis",
repo="bot-bottle",
issue_number=17,
slug="implementer-abc12",
agent_name="implementer",
bottle_names=["claude"],
backend_name="docker",
agent_git_user="didericis-claude",
pr_number=42,
status=STATUS_FROZEN,
last_checkin_at="2026-06-29T12:04:12-04:00",
)
return replace(base, **over)
class ForgeStateStoreTest(unittest.TestCase):
def setUp(self) -> None:
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
self.store = SqliteForgeStateStore(tmp / "sub" / "bot-bottle.db")
def test_round_trip(self):
self.store.upsert(_state())
self.assertEqual(_state(), self.store.get("didericis", "bot-bottle", 17))
def test_missing_returns_none(self):
self.assertIsNone(self.store.get("nobody", "nope", 1))
def test_creates_db_parent_dirs(self):
# setUp pointed at a non-existent 'sub/' dir; init must create it.
self.assertIsNone(self.store.get("x", "y", 1)) # no raise
def test_upsert_replaces(self):
self.store.upsert(_state(status=STATUS_RUNNING))
self.store.upsert(_state(status=STATUS_FROZEN))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertEqual(STATUS_FROZEN, got.status)
# Still one row, not two.
self.assertEqual(1, len(self.store.all()))
def test_delete_is_idempotent(self):
self.store.upsert(_state())
self.store.delete("didericis", "bot-bottle", 17)
self.store.delete("didericis", "bot-bottle", 17) # no raise
self.assertIsNone(self.store.get("didericis", "bot-bottle", 17))
def test_all_lists_across_repos_sorted(self):
self.store.upsert(_state(issue_number=18, slug="other"))
self.store.upsert(_state(issue_number=17))
self.store.upsert(_state(owner="acme", repo="widget", issue_number=3))
states = self.store.all()
self.assertEqual(3, len(states))
self.assertEqual(
[("acme", 3), ("didericis", 17), ("didericis", 18)],
[(s.owner, s.issue_number) for s in states],
)
def test_all_empty(self):
self.assertEqual([], self.store.all())
def test_bottle_names_list_preserved(self):
self.store.upsert(_state(bottle_names=["claude", "dev"]))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertEqual(["claude", "dev"], got.bottle_names)
def test_pr_number_nullable(self):
self.store.upsert(_state(pr_number=None))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertIsNone(got.pr_number)
def test_persists_across_store_instances(self):
self.store.upsert(_state())
reopened = SqliteForgeStateStore(self.store._db_path) # pylint: disable=protected-access
self.assertEqual(_state(), reopened.get("didericis", "bot-bottle", 17))
if __name__ == "__main__":
unittest.main()
@@ -47,6 +47,7 @@ def _addon() -> EgressAddon:
a: EgressAddon = EgressAddon.__new__(EgressAddon)
a.config = Config(routes=(), log=LOG_FULL)
a.safe_tokens = set()
a._supervise_queue_dir = ""
a._supervise_slug = ""
a._token_allow_timeout = 300.0
return a
+6 -3
View File
@@ -212,6 +212,7 @@ def _addon(config: Config) -> EgressAddon:
a: EgressAddon = EgressAddon.__new__(EgressAddon)
a.config = config
a.safe_tokens = set()
a._supervise_queue_dir = ""
a._supervise_slug = ""
a._token_allow_timeout = 300.0
a.routes_path = "/nonexistent/routes.yaml"
@@ -385,10 +386,10 @@ def _fake_sv(response_status: str | None) -> types.SimpleNamespace:
def _sha256_hex(_payload: Any) -> str:
return "hash"
def _noop(*_args: Any) -> None:
def _noop(_a: Any, _b: Any) -> None:
return None
def _read_response(_slug: Any, _pid: Any) -> Any:
def _read_response(_qd: Any, _pid: Any) -> Any:
if response_status is None:
raise OSError("not written yet") # forces poll -> timeout
return types.SimpleNamespace(status=response_status)
@@ -408,6 +409,7 @@ def _fake_sv(response_status: str | None) -> types.SimpleNamespace:
class TestSuperviseBranch(unittest.TestCase):
def _supervised_addon(self) -> EgressAddon:
addon = _addon(Config(routes=(Route(host="api.example.com"),)))
addon._supervise_queue_dir = "/tmp/egress-queue"
addon._supervise_slug = "test-bottle"
addon._token_allow_timeout = 0.05
return addon
@@ -630,13 +632,14 @@ class TestRedactSurfaces(unittest.TestCase):
class TestSuperviseWriteFailure(unittest.TestCase):
def test_write_proposal_oserror_blocks(self) -> None:
addon = _addon(Config(routes=(Route(host="api.example.com"),)))
addon._supervise_queue_dir = "/tmp/egress-queue"
addon._supervise_slug = "test-bottle"
addon._token_allow_timeout = 0.05
flow = _Flow(_Request(host="api.example.com", method="POST", body=f"k={_OPENAI_KEY}"))
fake = _fake_sv("approved")
def _raise(_p: Any) -> None:
def _raise(_qd: Any, _p: Any) -> None:
raise OSError("disk full")
fake.write_proposal = _raise
+2 -25
View File
@@ -14,7 +14,6 @@ from bot_bottle.git_gate import (
git_gate_render_access_hook,
git_gate_render_entrypoint,
git_gate_render_hook,
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys,
_resolve_identity_file,
git_gate_upstreams_for_bottle,
@@ -210,9 +209,8 @@ class TestHookRender(unittest.TestCase):
# the suppressed findings for human approval.
self.assertIn("--ignore-gitleaks-allow", hook)
self.assertIn("--report-format=json", hook)
self.assertIn("tool=_sv.TOOL_GITLEAKS_ALLOW", hook)
self.assertIn("_sv.write_proposal", hook)
self.assertIn("_sv.read_response", hook)
self.assertIn('"tool": "gitleaks-allow"', hook)
self.assertIn("SUPERVISE_QUEUE_DIR", hook)
self.assertIn("SUPERVISE_BOTTLE_SLUG", hook)
self.assertIn("supervisor approved # gitleaks:allow", hook)
self.assertIn("supervisor rejected # gitleaks:allow", hook)
@@ -373,27 +371,6 @@ class TestDynamicKeyProvisioning(unittest.TestCase):
self.assertEqual("/tmp/provisioned-key", _resolve_identity_file(entry, "demo", self.stage))
mock_provision.assert_called_once()
def test_prepare_defers_gitea_key_provisioning(self):
bottle = self._gitea_manifest().bottles["dev"]
with patch("bot_bottle.git_gate_provision._provision_dynamic_key") as mock_provision:
plan = _StubGate().prepare(bottle, "demo", self.stage)
mock_provision.assert_not_called()
self.assertEqual("", plan.upstreams[0].identity_file)
def test_launch_time_helper_provisions_gitea_keys(self):
bottle = self._gitea_manifest().bottles["dev"]
plan = _StubGate().prepare(bottle, "demo", self.stage)
with patch(
"bot_bottle.git_gate_provision._provision_dynamic_key",
return_value="/tmp/provisioned-key",
) as mock_provision:
updated = provision_git_gate_dynamic_keys(bottle, plan, self.stage)
mock_provision.assert_called_once_with(bottle.git[0], "demo", self.stage)
self.assertEqual("/tmp/provisioned-key", updated.upstreams[0].identity_file)
def test_revoke_skips_non_gitea_and_missing_id_file(self):
revoke_git_gate_provisioned_keys(fixture_with_git().bottles["dev"], self.stage)
+2 -4
View File
@@ -71,9 +71,7 @@ def _plan(
else:
git_gate_plan = SimpleNamespace(upstreams=())
supervise_plan = (
SimpleNamespace(
db_path=Path("/state/bot-bottle.db"),
)
SimpleNamespace(queue_dir=Path("/state/supervise/queue"))
if supervise else None
)
agent_provision = SimpleNamespace(
@@ -139,7 +137,7 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
argv,
)
self.assertIn(
"type=bind,source=/state/bot-bottle.db,target=/run/supervise/bot-bottle.db",
"type=bind,source=/state/supervise/queue,target=/run/supervise/queue",
argv,
)
+1 -9
View File
@@ -130,7 +130,7 @@ def _plan(
if supervise:
supervise_plan = SupervisePlan(
slug="demo-abc12",
db_path=Path("/tmp/bot-bottle.db"),
queue_dir=Path("/tmp/queue"),
)
return SmolmachinesBottlePlan(
spec=spec,
@@ -422,14 +422,6 @@ class TestBundleLaunchSpec(unittest.TestCase):
spec.environment,
)
def test_supervise_adds_daemon_volume_and_env(self):
from bot_bottle.supervise import DB_PATH_IN_CONTAINER
plan = _plan(supervise=True)
spec = _bundle_launch_spec(plan, "net", "127.0.0.16")
self.assertIn("supervise", spec.daemons_csv)
self.assertIn(f"SUPERVISE_DB_PATH={DB_PATH_IN_CONTAINER}", spec.environment)
self.assertIn(("/tmp/bot-bottle.db", DB_PATH_IN_CONTAINER, False), spec.volumes)
def test_canary_env_visible_to_smolvm_guest(self):
plan = _plan(canary=True)
with patch.object(
+36 -48
View File
@@ -1,5 +1,6 @@
"""Unit: supervise queue + audit log + diff helpers (PRD 0013)."""
import json
import tempfile
import threading
import time
@@ -18,7 +19,7 @@ from bot_bottle.supervise import (
TOOL_EGRESS_ALLOW,
TOOL_GITLEAKS_ALLOW,
archive_proposal,
host_db_path,
audit_log_path,
list_pending_proposals,
read_audit_entries,
read_proposal,
@@ -111,44 +112,32 @@ class TestResponseRoundtrip(unittest.TestCase):
class TestQueueIO(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="bot-bottle-supervise-test.")
self._home_patch = self._patch_home(Path(self._tmp.name))
self.slug = "dev"
self.queue_dir = Path(self._tmp.name)
def tearDown(self):
self._home_patch()
self._tmp.cleanup()
def _patch_home(self, fake_home: Path):
original = supervise.bot_bottle_root
def fake_root() -> Path:
return fake_home / ".bot-bottle"
supervise.bot_bottle_root = fake_root # type: ignore[assignment]
return lambda: setattr(supervise, "bot_bottle_root", original)
def test_write_and_read_proposal(self):
p = _proposal()
path = write_proposal(p)
path = write_proposal(self.queue_dir, p)
self.assertTrue(path.exists())
self.assertEqual(host_db_path(), path)
self.assertEqual(0o600, path.stat().st_mode & 0o777)
loaded = read_proposal(self.slug, p.id)
loaded = read_proposal(self.queue_dir, p.id)
self.assertEqual(p, loaded)
def test_list_pending_excludes_responded(self):
a = _proposal(justification="first")
b = _proposal(justification="second")
write_proposal(a)
write_proposal(b)
write_response(self.slug, Response(
write_proposal(self.queue_dir, a)
write_proposal(self.queue_dir, b)
write_response(self.queue_dir, Response(
proposal_id=a.id, status=STATUS_APPROVED, notes="",
))
pending = list_pending_proposals(self.slug)
pending = list_pending_proposals(self.queue_dir)
self.assertEqual([b.id], [p.id for p in pending])
def test_list_pending_returns_empty_for_missing_slug(self):
self.assertEqual([], list_pending_proposals("nope"))
def test_list_pending_returns_empty_for_missing_dir(self):
self.assertEqual([], list_pending_proposals(self.queue_dir / "nope"))
def test_list_pending_sorted_by_arrival(self):
# Fabricate two with explicit timestamps.
@@ -165,30 +154,30 @@ class TestQueueIO(unittest.TestCase):
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
)
# Write in reverse order.
write_proposal(b)
write_proposal(a)
ordered = list_pending_proposals(self.slug)
write_proposal(self.queue_dir, b)
write_proposal(self.queue_dir, a)
ordered = list_pending_proposals(self.queue_dir)
self.assertEqual([a.id, b.id], [p.id for p in ordered])
def test_write_and_read_response(self):
r = Response(proposal_id="xyz", status=STATUS_REJECTED, notes="no")
write_response(self.slug, r)
self.assertEqual(r, read_response(self.slug, "xyz"))
write_response(self.queue_dir, r)
self.assertEqual(r, read_response(self.queue_dir, "xyz"))
def test_wait_for_response_returns_when_file_appears(self):
p = _proposal()
write_proposal(p)
write_proposal(self.queue_dir, p)
def write_after_delay():
time.sleep(0.05)
write_response(self.slug, Response(
write_response(self.queue_dir, Response(
proposal_id=p.id, status=STATUS_APPROVED, notes="ok",
))
t = threading.Thread(target=write_after_delay)
t.start()
try:
r = wait_for_response(self.slug, p.id, poll_interval=0.01)
r = wait_for_response(self.queue_dir, p.id, poll_interval=0.01)
finally:
t.join()
self.assertEqual(STATUS_APPROVED, r.status)
@@ -198,24 +187,25 @@ class TestQueueIO(unittest.TestCase):
deadline = time.monotonic() + 0.05
with self.assertRaises(TimeoutError):
wait_for_response(
self.slug, "never",
self.queue_dir, "never",
poll_interval=0.01, deadline=deadline,
)
def test_archive_proposal_hides_rows(self):
def test_archive_proposal_moves_both_files(self):
p = _proposal()
write_proposal(p)
write_response(self.slug, Response(
write_proposal(self.queue_dir, p)
write_response(self.queue_dir, Response(
proposal_id=p.id, status=STATUS_APPROVED, notes="",
))
archive_proposal(self.slug, p.id)
self.assertEqual([], list_pending_proposals(self.slug))
with self.assertRaises(FileNotFoundError):
read_response(self.slug, p.id)
archive_proposal(self.queue_dir, p.id)
self.assertFalse((self.queue_dir / f"{p.id}.proposal.json").exists())
self.assertFalse((self.queue_dir / f"{p.id}.response.json").exists())
self.assertTrue((self.queue_dir / "processed" / f"{p.id}.proposal.json").exists())
self.assertTrue((self.queue_dir / "processed" / f"{p.id}.response.json").exists())
def test_archive_is_idempotent_on_missing_files(self):
# Should not raise.
archive_proposal(self.slug, "nope")
archive_proposal(self.queue_dir, "nope")
class TestAuditLog(unittest.TestCase):
@@ -247,7 +237,6 @@ class TestAuditLog(unittest.TestCase):
diff="--- before\n+++ after\n",
)
path = write_audit_entry(e)
self.assertEqual(host_db_path(), path)
self.assertEqual(0o600, path.stat().st_mode & 0o777)
loaded = read_audit_entries("cred-proxy", "dev")
self.assertEqual([e], loaded)
@@ -263,13 +252,12 @@ class TestAuditLog(unittest.TestCase):
justification="",
diff="",
))
entries = read_audit_entries("egress", "dev")
self.assertEqual(3, len(entries))
self.assertEqual(
["2026-05-25T12:00:00+00:00", "2026-05-25T12:00:01+00:00",
"2026-05-25T12:00:02+00:00"],
[entry.timestamp for entry in entries],
)
path = audit_log_path("egress", "dev")
with path.open() as f:
lines = [line for line in f if line.strip()]
self.assertEqual(3, len(lines))
for line in lines:
self.assertTrue(json.loads(line)) # each line is valid JSON
def test_separate_logs_per_component_slug(self):
write_audit_entry(AuditEntry(
@@ -391,7 +379,7 @@ class TestSupervisePrepare(unittest.TestCase):
def test_prepare_creates_queue(self):
plan = _StubSupervise().prepare("dev", self.stage_dir)
self.assertTrue(plan.db_path.is_file())
self.assertTrue(plan.queue_dir.is_dir())
self.assertEqual("dev", plan.slug)
self.assertEqual("", plan.internal_network)
+27 -15
View File
@@ -77,7 +77,9 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
def test_walks_all_slug_subdirs(self):
for slug in ("dev", "api"):
supervise.write_proposal(_proposal(slug=slug))
qdir = supervise.queue_dir_for_slug(slug)
qdir.mkdir(parents=True)
supervise.write_proposal(qdir, _proposal(slug=slug))
pending = supervise_cli.discover_pending()
self.assertEqual({"dev", "api"}, {qp.proposal.bottle_slug for qp in pending})
@@ -95,14 +97,18 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
)
for p in (late, early):
supervise.write_proposal(p)
qdir = supervise.queue_dir_for_slug(p.bottle_slug)
qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p)
pending = supervise_cli.discover_pending()
self.assertEqual([early.id, late.id], [qp.proposal.id for qp in pending])
def test_excludes_already_responded(self):
p = _proposal()
supervise.write_proposal(p)
supervise.write_response("dev", supervise.Response(
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True)
supervise.write_proposal(qdir, p)
supervise.write_response(qdir, supervise.Response(
proposal_id=p.id, status=STATUS_APPROVED, notes="",
))
self.assertEqual([], supervise_cli.discover_pending())
@@ -117,8 +123,10 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def _enqueue(self, tool: str = TOOL_EGRESS_ALLOW):
p = _proposal(tool=tool)
supervise.write_proposal(p)
return supervise_cli.QueuedProposal(proposal=p)
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p)
return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir)
def test_approve_writes_response(self):
qp = self._enqueue()
@@ -127,7 +135,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
return_value=("routes: []\n", "routes:\n - host: example.com\n"),
):
supervise_cli.approve(qp)
resp = read_response(qp.proposal.bottle_slug, qp.proposal.id)
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status)
self.assertIsNone(resp.final_file)
@@ -142,7 +150,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
final_file="routes:\n - host: edited.example.com\n",
notes="tweaked",
)
resp = read_response(qp.proposal.bottle_slug, qp.proposal.id)
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_MODIFIED, resp.status)
self.assertEqual("routes:\n - host: edited.example.com\n", resp.final_file)
self.assertEqual("tweaked", resp.notes)
@@ -150,7 +158,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def test_reject_writes_rejection(self):
qp = self._enqueue()
supervise_cli.reject(qp, reason="nope")
resp = read_response(qp.proposal.bottle_slug, qp.proposal.id)
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_REJECTED, resp.status)
self.assertEqual("nope", resp.notes)
@@ -173,33 +181,36 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def test_approve_gitleaks_allow_leaves_response_for_gate(self):
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
supervise_cli.approve(qp, notes="dummy fixture")
# Gate polls the DB for the response; TUI must not archive it.
resp = read_response(qp.proposal.bottle_slug, qp.proposal.id)
# Gate polls the queue dir for the response; TUI must not archive it.
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status)
self.assertEqual("dummy fixture", resp.notes)
self.assertFalse((qp.queue_dir / "processed").exists())
def test_tui_gitleaks_allow_requires_reason(self):
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
with patch.object(supervise_cli, "_prompt", return_value=""):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertEqual("approve aborted (empty reason)", status)
self.assertFalse((qp.queue_dir / "processed").exists())
def test_tui_gitleaks_allow_writes_reason(self):
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
with patch.object(supervise_cli, "_prompt", return_value="test fixture"):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertIn("approved gitleaks-allow", status)
resp = read_response(qp.proposal.bottle_slug, qp.proposal.id)
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual("test fixture", resp.notes)
def test_approve_token_allow_leaves_response_for_egress(self):
qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW)
supervise_cli.approve(qp, notes="false positive")
# The egress addon polls the DB for the response; the TUI must
# The egress addon polls the queue dir for the response; the TUI must
# not archive it (the addon archives after reading).
resp = read_response(qp.proposal.bottle_slug, qp.proposal.id)
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status)
self.assertEqual("false positive", resp.notes)
self.assertFalse((qp.queue_dir / "processed").exists())
def test_token_allow_writes_no_audit_log(self):
qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW)
@@ -211,13 +222,14 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
with patch.object(supervise_cli, "_prompt", return_value=""):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertEqual("approve aborted (empty reason)", status)
self.assertFalse((qp.queue_dir / "processed").exists())
def test_tui_token_allow_writes_reason(self):
qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW)
with patch.object(supervise_cli, "_prompt", return_value="legit"):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertIn("approved egress-token-allow", status)
resp = read_response(qp.proposal.bottle_slug, qp.proposal.id)
resp = read_response(qp.queue_dir, qp.proposal.id)
self.assertEqual("legit", resp.notes)
def test_suffix_for_token_allow_is_txt(self):
+54 -112
View File
@@ -4,6 +4,7 @@ fallback paths."""
from __future__ import annotations
import os
import tempfile
import time
import unittest
@@ -11,19 +12,14 @@ from pathlib import Path
from unittest.mock import patch
from bot_bottle import supervise
from bot_bottle.audit_store import AuditStore
from bot_bottle.queue_store import QueueStore
from bot_bottle.supervise import (
AuditEntry,
Proposal,
STATUS_APPROVED,
TOOL_EGRESS_ALLOW,
list_pending_proposals,
read_audit_entries,
read_proposal,
read_response,
wait_for_response,
write_audit_entry,
)
@@ -41,53 +37,58 @@ class TestPathHelpers(unittest.TestCase):
def test_bot_bottle_root(self) -> None:
self.assertTrue(str(supervise.bot_bottle_root()).endswith(".bot-bottle"))
def test_queue_dir_for_slug(self) -> None:
self.assertIn("slug", str(supervise.queue_dir_for_slug("slug")))
def test_id_from_non_proposal_filename(self) -> None:
self.assertIsNone(supervise._id_from_proposal_filename(Path("x.response.json")))
class TestReadMalformed(unittest.TestCase):
def test_read_proposal_missing_row(self) -> None:
def test_read_proposal_non_dict(self) -> None:
with tempfile.TemporaryDirectory() as d:
with patch.dict("os.environ", {"HOME": d}), \
self.assertRaises(FileNotFoundError):
read_proposal("slug", "p")
(Path(d) / "p.proposal.json").write_text("[]")
with self.assertRaises(ValueError):
read_proposal(Path(d), "p")
def test_read_response_missing_row(self) -> None:
def test_read_response_non_dict(self) -> None:
with tempfile.TemporaryDirectory() as d:
with patch.dict("os.environ", {"HOME": d}), \
self.assertRaises(FileNotFoundError):
read_response("slug", "p")
(Path(d) / "p.response.json").write_text("[]")
with self.assertRaises(ValueError):
read_response(Path(d), "p")
def test_list_pending_reads_db_only(self) -> None:
def test_list_pending_skips_malformed(self) -> None:
with tempfile.TemporaryDirectory() as d:
with patch.dict("os.environ", {"HOME": d}):
supervise.write_proposal(_proposal())
pending = list_pending_proposals("slug")
qd = Path(d)
(qd / "bad.proposal.json").write_text("{ not json")
(qd / "arr.proposal.json").write_text("[]")
(qd / "incomplete.proposal.json").write_text("{}") # from_dict raises
supervise.write_proposal(qd, _proposal()) # one valid
pending = list_pending_proposals(qd)
self.assertEqual(1, len(pending))
self.assertEqual("slug", pending[0].bottle_slug)
def test_list_pending_skips_when_response_present(self) -> None:
with tempfile.TemporaryDirectory() as d:
with patch.dict("os.environ", {"HOME": d}):
p = _proposal()
supervise.write_proposal(p)
supervise.write_response("slug", supervise.Response(
proposal_id=p.id,
status=STATUS_APPROVED,
notes="",
))
self.assertEqual([], list_pending_proposals("slug"))
qd = Path(d)
p = _proposal()
supervise.write_proposal(qd, p)
(qd / f"{p.id}.response.json").write_text("{}") # response exists -> skipped
self.assertEqual([], list_pending_proposals(qd))
class TestWaitForResponse(unittest.TestCase):
def test_missing_response_times_out(self) -> None:
def test_malformed_response_then_timeout(self) -> None:
with tempfile.TemporaryDirectory() as d:
with patch.dict("os.environ", {"HOME": d}), \
self.assertRaises(TimeoutError):
wait_for_response("slug", "p", deadline=time.monotonic())
(Path(d) / "p.response.json").write_text("{ not json")
with self.assertRaises(TimeoutError):
wait_for_response(Path(d), "p", deadline=time.monotonic())
def test_empty_db_response_does_not_count(self) -> None:
def test_incomplete_response_then_timeout(self) -> None:
with tempfile.TemporaryDirectory() as d:
with patch.dict("os.environ", {"HOME": d}), \
self.assertRaises(TimeoutError):
wait_for_response("slug", "p", deadline=time.monotonic())
(Path(d) / "p.response.json").write_text("{}") # dict but from_dict raises
with self.assertRaises(TimeoutError):
wait_for_response(Path(d), "p", deadline=time.monotonic())
class TestReadAuditEntries(unittest.TestCase):
@@ -96,94 +97,35 @@ class TestReadAuditEntries(unittest.TestCase):
patch.dict("os.environ", {"HOME": home}):
self.assertEqual([], read_audit_entries("egress", "nope"))
def test_reads_entries_from_db(self) -> None:
with tempfile.TemporaryDirectory() as home, \
patch.dict("os.environ", {"HOME": home}):
write_audit_entry(AuditEntry(
timestamp="t",
bottle_slug="slug",
component="egress",
operator_action="approve",
operator_notes="",
justification="",
diff="",
))
write_audit_entry(AuditEntry(
timestamp="t",
bottle_slug="other",
component="egress",
operator_action="reject",
operator_notes="",
justification="",
diff="",
))
entries = read_audit_entries("egress", "slug")
self.assertEqual(1, len(entries))
self.assertEqual("approve", entries[0].operator_action)
def test_legacy_audit_log_file_does_not_count(self) -> None:
def test_skips_malformed_lines(self) -> None:
with tempfile.TemporaryDirectory() as home, \
patch.dict("os.environ", {"HOME": home}):
path = supervise.audit_log_path("egress", "slug")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
valid = (
'{"timestamp": "t", "bottle_slug": "slug", "component": "egress",'
' "operator_action": "approve", "operator_notes": "",'
' "justification": "", "diff": ""}\n'
' "justification": "", "diff": ""}'
)
path.write_text(
"\n" # blank line skipped
"{ not json\n" # JSONDecodeError skipped
"[]\n" # not a dict skipped
"{}\n" # missing fields -> ValueError skipped
+ valid + "\n"
)
entries = read_audit_entries("egress", "slug")
self.assertEqual([], entries)
self.assertEqual(1, len(entries))
self.assertEqual("approve", entries[0].operator_action)
class TestStoreGuardBranches(unittest.TestCase):
"""Direct QueueStore / AuditStore construction and early-return guard branches."""
def test_queue_store_explicit_db_path(self):
with tempfile.TemporaryDirectory() as d:
db = Path(d) / "q.db"
store = QueueStore("key", db_path=db)
self.assertTrue(db.is_file())
self.assertEqual(db, store.db_path)
def test_queue_store_missing_db_list_pending_returns_empty(self):
with tempfile.TemporaryDirectory() as d:
db = Path(d) / "q.db"
store = QueueStore("key", db_path=db)
db.unlink()
self.assertEqual([], store.list_pending_proposals())
def test_queue_store_missing_db_list_all_returns_empty(self):
with tempfile.TemporaryDirectory() as d:
db = Path(d) / "q.db"
store = QueueStore("key", db_path=db)
db.unlink()
self.assertEqual([], store.list_all_pending_proposals())
def test_queue_store_missing_db_archive_is_noop(self):
with tempfile.TemporaryDirectory() as d:
db = Path(d) / "q.db"
store = QueueStore("key", db_path=db)
db.unlink()
store.archive_proposal("anything") # must not raise
def test_queue_store_chmod_oserror_is_swallowed(self):
with tempfile.TemporaryDirectory() as d:
db = Path(d) / "q.db"
with patch("pathlib.Path.chmod", side_effect=OSError("ro")):
QueueStore("key", db_path=db) # must not raise
def test_audit_store_missing_db_read_returns_empty(self):
with tempfile.TemporaryDirectory() as d:
db = Path(d) / "a.db"
store = AuditStore(db_path=db)
db.unlink()
self.assertEqual([], store.read_audit_entries("egress", "slug"))
def test_audit_store_chmod_oserror_is_swallowed(self):
with tempfile.TemporaryDirectory() as d:
db = Path(d) / "a.db"
with patch("pathlib.Path.chmod", side_effect=OSError("ro")):
AuditStore(db_path=db) # must not raise
class TestFlockFallback(unittest.TestCase):
def test_flock_on_closed_fd_is_swallowed(self) -> None:
# flock on a closed fd raises OSError(EBADF), which the helpers swallow.
fd = os.open(os.devnull, os.O_RDONLY)
os.close(fd)
supervise._try_flock(fd)
supervise._try_funlock(fd)
if __name__ == "__main__":
+18 -22
View File
@@ -112,7 +112,7 @@ class TestRpcErrorTaxonomy(unittest.TestCase):
validate_proposed_file(_sv.TOOL_EGRESS_ALLOW, "routes: nope\n")
def test_unknown_tool_in_tools_call_is_client_error(self):
config = ServerConfig(bottle_slug="dev")
config = ServerConfig(bottle_slug="dev", queue_dir=Path("/unused"))
with self.assertRaises(_RpcClientError) as cm:
handle_tools_call({"name": "no-such-tool", "arguments": {}}, config)
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
@@ -122,9 +122,9 @@ class TestRpcInternalErrorOnIoFailure(unittest.TestCase):
def test_write_proposal_os_error_raises_internal(self):
config = ServerConfig(
bottle_slug="dev",
queue_dir=Path("/dev/null/cannot-exist"),
)
with patch.object(_sv, "write_proposal", side_effect=OSError("disk full")), \
self.assertRaises(_RpcInternalError) as cm:
with self.assertRaises(_RpcInternalError) as cm:
handle_tools_call(
{
"name": _sv.TOOL_EGRESS_ALLOW,
@@ -265,31 +265,21 @@ class TestHandleToolsList(unittest.TestCase):
class TestHandleToolsCall(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="supervise-server-test.")
self._home_patch = self._patch_home(Path(self._tmp.name))
self.config = ServerConfig(bottle_slug="dev")
self.queue_dir = Path(self._tmp.name)
self.config = ServerConfig(bottle_slug="dev", queue_dir=self.queue_dir)
def tearDown(self):
self._home_patch()
self._tmp.cleanup()
def _patch_home(self, fake_home: Path):
original = _sv.bot_bottle_root
def fake_root() -> Path:
return fake_home / ".bot-bottle"
_sv.bot_bottle_root = fake_root # type: ignore[assignment]
return lambda: setattr(_sv, "bot_bottle_root", original)
def _respond_when_proposal_appears(self, status: str, notes: str = "") -> threading.Thread:
"""Background thread: poll the queue for a fresh proposal, write a
matching response. Returns the thread so the test can join it."""
def runner():
for _ in range(200):
pending = _sv.list_pending_proposals("dev")
pending = _sv.list_pending_proposals(self.queue_dir)
if pending:
p = pending[0]
_sv.write_response("dev", _sv.Response(
_sv.write_response(self.queue_dir, _sv.Response(
proposal_id=p.id, status=status, notes=notes,
))
return
@@ -422,11 +412,15 @@ class TestHandleToolsCall(unittest.TestCase):
finally:
responder.join()
# No pending proposals left after archive.
self.assertEqual([], _sv.list_pending_proposals("dev"))
self.assertEqual([], _sv.list_pending_proposals(self.queue_dir))
# Both files moved to processed/.
processed = list((self.queue_dir / "processed").glob("*.json"))
self.assertEqual(2, len(processed))
def test_pending_response_times_out_without_archive(self):
config = ServerConfig(
bottle_slug="dev",
queue_dir=self.queue_dir,
response_timeout_seconds=0.05,
)
result = handle_tools_call(
@@ -444,7 +438,8 @@ class TestHandleToolsCall(unittest.TestCase):
text = result["content"][0]["text"] # type: ignore[index]
self.assertIn("status: pending", text)
self.assertIn("proposal remains queued", text)
self.assertEqual(1, len(_sv.list_pending_proposals("dev")))
self.assertEqual(1, len(_sv.list_pending_proposals(self.queue_dir)))
self.assertFalse((self.queue_dir / "processed").exists())
class TestHandleListEgressRoutes(unittest.TestCase):
@@ -466,7 +461,7 @@ class TestHandleListEgressRoutes(unittest.TestCase):
with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()):
result = handle_list_egress_routes(
{},
ServerConfig(bottle_slug="dev"),
ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")),
)
self.assertFalse(result["isError"]) # type: ignore[index]
@@ -481,7 +476,7 @@ class TestHandleListEgressRoutes(unittest.TestCase):
with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()):
result = handle_list_egress_routes(
{},
ServerConfig(bottle_slug="dev"),
ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")),
)
self.assertTrue(result["isError"]) # type: ignore[index]
@@ -549,6 +544,7 @@ class TestHttpEndToEnd(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="supervise-http-test.")
self.queue_dir = Path(self._tmp.name)
# Pick a random port by binding to :0 first.
import socket
s = socket.socket()
@@ -556,7 +552,7 @@ class TestHttpEndToEnd(unittest.TestCase):
self.port = s.getsockname()[1]
s.close()
self.server = MCPServer(("127.0.0.1", self.port), MCPHandler)
self.server.config = ServerConfig(bottle_slug="dev")
self.server.config = ServerConfig(bottle_slug="dev", queue_dir=self.queue_dir)
self.thread = threading.Thread(
target=self.server.serve_forever, daemon=True,
)