diff --git a/Dockerfile.sidecars b/Dockerfile.sidecars index a94960e..2e28b1a 100644 --- a/Dockerfile.sidecars +++ b/Dockerfile.sidecars @@ -18,7 +18,7 @@ # /git-gate-entrypoint.sh docker-cp'd at start time # /git-gate/creds/* docker-cp'd at start time # /git/* bare repos, populated at runtime -# /run/supervise/queue/ bind-mounted at run time +# /run/supervise/bot-bottle.db bind-mounted at run time # /home/mitmproxy/.mitmproxy/ mitmproxy CA dir # # Exposed ports inside the container: @@ -66,6 +66,10 @@ COPY bot_bottle/egress_dlp_config.py /app/egress_dlp_config.py COPY bot_bottle/egress_addon.py /app/egress_addon.py COPY bot_bottle/dlp_detectors.py /app/dlp_detectors.py COPY bot_bottle/yaml_subset.py /app/yaml_subset.py +COPY bot_bottle/migrations.py /app/migrations.py +COPY bot_bottle/db_store.py /app/db_store.py +COPY bot_bottle/queue_store.py /app/queue_store.py +COPY bot_bottle/audit_store.py /app/audit_store.py COPY bot_bottle/supervise.py /app/supervise.py COPY bot_bottle/supervise_server.py /app/supervise_server.py COPY bot_bottle/sidecar_init.py /app/sidecar_init.py @@ -81,7 +85,7 @@ RUN mkdir -p \ /etc/git-gate \ /git-gate/creds \ /git \ - /run/supervise/queue \ + /run/supervise \ /home/mitmproxy/.mitmproxy # Documentation only — the compose renderer publishes whichever diff --git a/bot_bottle/audit_store.py b/bot_bottle/audit_store.py new file mode 100644 index 0000000..5f848d0 --- /dev/null +++ b/bot_bottle/audit_store.py @@ -0,0 +1,113 @@ +"""SQLite-backed audit store for supervise (PRD 0013).""" + +from __future__ import annotations + +import sqlite3 +from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .supervise import AuditEntry + +try: + from .db_store import DbStore + from .migrations import TableMigrations +except ImportError: + from db_store import DbStore # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module + from migrations import TableMigrations # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module + + +def get_supervise_mod() -> object: + """Lazy import of supervise to avoid a circular-import at module init time. + Mirrors our own module identity so patches on supervise.bot_bottle_root + propagate correctly in both flat (sidecar / sys.path-injection tests) and + package contexts.""" + import sys + sv_name = "supervise" if __name__ == "audit_store" else "bot_bottle.supervise" + if sv_name in sys.modules: + return sys.modules[sv_name] + try: + import bot_bottle.supervise as _m + except ImportError: + import supervise as _m # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module + return _m + + +# One entry per schema version: _MIGRATIONS.migrations[0] brings a fresh DB +# to version 1, [1] to version 2, and so on. Add new migrations at the end; +# never edit existing ones. +_MIGRATIONS = TableMigrations("audit_store", [ + # v1 — initial schema + """ + CREATE TABLE IF NOT EXISTS supervise_audit_entries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, + bottle_slug TEXT NOT NULL, + component TEXT NOT NULL, + operator_action TEXT NOT NULL, + operator_notes TEXT NOT NULL, + justification TEXT NOT NULL, + diff TEXT NOT NULL + ) + """, +]) + + +class AuditStore(DbStore): + """SQLite-backed persistent store for supervise audit entries.""" + + def __init__(self, db_path: Path | None = None) -> None: + resolved = db_path or get_supervise_mod().host_db_path() # type: ignore[attr-defined] + super().__init__(resolved, _MIGRATIONS) + + def write_audit_entry(self, entry: AuditEntry) -> Path: + with self._connect() as conn: + conn.execute( + """ + INSERT INTO supervise_audit_entries ( + timestamp, bottle_slug, component, operator_action, + operator_notes, justification, diff + ) VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + entry.timestamp, + entry.bottle_slug, + entry.component, + entry.operator_action, + entry.operator_notes, + entry.justification, + entry.diff, + ), + ) + self._chmod() + return self.db_path + + def read_audit_entries(self, component: str, slug: str) -> list[AuditEntry]: + if not self.db_path.is_file(): + return [] + with self._connect() as conn: + rows = conn.execute( + """ + SELECT * FROM supervise_audit_entries + WHERE component = ? AND bottle_slug = ? + ORDER BY id + """, + (component, slug), + ).fetchall() + return [self._row_to_entry(row) for row in rows] + + @staticmethod + def _row_to_entry(row: sqlite3.Row) -> AuditEntry: + m = get_supervise_mod() + return m.AuditEntry( # type: ignore[attr-defined] + timestamp=row["timestamp"], + bottle_slug=row["bottle_slug"], + component=row["component"], + operator_action=row["operator_action"], + operator_notes=row["operator_notes"], + justification=row["justification"], + diff=row["diff"], + ) + + +__all__ = ["AuditStore"] diff --git a/bot_bottle/backend/docker/compose.py b/bot_bottle/backend/docker/compose.py index 11ebc2c..9929610 100644 --- a/bot_bottle/backend/docker/compose.py +++ b/bot_bottle/backend/docker/compose.py @@ -34,7 +34,7 @@ from ...egress import ( from ...git_gate import GIT_GATE_HOSTNAME from ...log import die, warn from ...supervise import ( - QUEUE_DIR_IN_CONTAINER, + DB_PATH_IN_CONTAINER, SUPERVISE_HOSTNAME, SUPERVISE_PORT, ) @@ -163,16 +163,15 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]: if sp is not None: env += [ f"SUPERVISE_BOTTLE_SLUG={plan.slug}", - f"SUPERVISE_QUEUE_DIR={QUEUE_DIR_IN_CONTAINER}", + f"SUPERVISE_DB_PATH={DB_PATH_IN_CONTAINER}", f"SUPERVISE_PORT={SUPERVISE_PORT}", ] volumes.append({ "type": "bind", - "source": str(sp.queue_dir), - "target": QUEUE_DIR_IN_CONTAINER, + "source": str(sp.db_path), + "target": DB_PATH_IN_CONTAINER, "read_only": False, }) - internal_aliases = [EGRESS_HOSTNAME] if gp.upstreams: internal_aliases.append(GIT_GATE_HOSTNAME) diff --git a/bot_bottle/backend/macos_container/launch.py b/bot_bottle/backend/macos_container/launch.py index e0fe254..10976d9 100644 --- a/bot_bottle/backend/macos_container/launch.py +++ b/bot_bottle/backend/macos_container/launch.py @@ -33,7 +33,7 @@ from ...git_gate import ( revoke_git_gate_provisioned_keys, ) from ...log import die, info, warn -from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT +from ...supervise import DB_PATH_IN_CONTAINER, SUPERVISE_PORT from ...util import expand_tilde from ..docker.egress import EGRESS_CA_IN_CONTAINER, EGRESS_PORT from ..docker.git_gate import ( @@ -379,7 +379,7 @@ def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]: if plan.supervise_plan is not None: env += [ f"SUPERVISE_BOTTLE_SLUG={plan.slug}", - f"SUPERVISE_QUEUE_DIR={QUEUE_DIR_IN_CONTAINER}", + f"SUPERVISE_DB_PATH={DB_PATH_IN_CONTAINER}", f"SUPERVISE_PORT={SUPERVISE_PORT}", ] return tuple(env) @@ -405,7 +405,7 @@ def _sidecar_mounts( sp = plan.supervise_plan if sp is not None: - mounts.append((str(sp.queue_dir), QUEUE_DIR_IN_CONTAINER, False)) + mounts.append((str(sp.db_path), DB_PATH_IN_CONTAINER, False)) return tuple(mounts) diff --git a/bot_bottle/backend/smolmachines/launch.py b/bot_bottle/backend/smolmachines/launch.py index 483cf85..8736567 100644 --- a/bot_bottle/backend/smolmachines/launch.py +++ b/bot_bottle/backend/smolmachines/launch.py @@ -27,7 +27,7 @@ from ...egress import ( egress_resolve_token_values, egress_sidecar_env_entries, ) -from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT +from ...supervise import DB_PATH_IN_CONTAINER, SUPERVISE_PORT from ...util import expand_tilde from ..docker import util as docker_mod from ..docker.egress import ( @@ -369,10 +369,10 @@ def _bundle_launch_spec( daemons.append("supervise") env += [ f"SUPERVISE_BOTTLE_SLUG={plan.slug}", - f"SUPERVISE_QUEUE_DIR={QUEUE_DIR_IN_CONTAINER}", + f"SUPERVISE_DB_PATH={DB_PATH_IN_CONTAINER}", f"SUPERVISE_PORT={SUPERVISE_PORT}", ] - volumes.append((str(sp.queue_dir), QUEUE_DIR_IN_CONTAINER, False)) + volumes.append((str(sp.db_path), DB_PATH_IN_CONTAINER, False)) # Container ports the agent reaches from the smolvm guest — # published on host loopback so the guest can dial via TSI + diff --git a/bot_bottle/bottle_state.py b/bot_bottle/bottle_state.py index 02fe3b1..7eb3316 100644 --- a/bot_bottle/bottle_state.py +++ b/bot_bottle/bottle_state.py @@ -284,9 +284,8 @@ def git_gate_state_dir(identity: str) -> Path: def supervise_state_dir(identity: str) -> Path: """State subdir reserved for supervise sidecar bind-mount sources. - The queue dir is intentionally NOT under here — it lives at - ~/.bot-bottle/queue// alongside the audit logs, so it - survives state-dir cleanup.""" + Runtime queue/audit rows live in the host-level bot-bottle SQLite + database, so they survive state-dir cleanup.""" return bottle_state_dir(identity) / _SUPERVISE_SUBDIR diff --git a/bot_bottle/cli/supervise.py b/bot_bottle/cli/supervise.py index 3eeaeb6..6f341bd 100644 --- a/bot_bottle/cli/supervise.py +++ b/bot_bottle/cli/supervise.py @@ -45,7 +45,7 @@ from ..supervise import ( TOOL_EGRESS_BLOCK, TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW, - list_pending_proposals, + list_all_pending_proposals, render_diff, write_audit_entry, write_response, @@ -63,10 +63,9 @@ _REPORT_ONLY_TOOLS: tuple[str, ...] = (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_AL @dataclass(frozen=True) class QueuedProposal: - """A pending proposal plus the queue dir it was found in.""" + """A pending proposal from the supervise queue.""" proposal: Proposal - queue_dir: Path # Errors any remediation engine may raise. Caught by the TUI key @@ -86,16 +85,11 @@ def apply_routes_change(slug: str, content: str) -> tuple[str, str]: def discover_pending() -> list[QueuedProposal]: - """Walk ~/.bot-bottle/queue/* and collect pending proposals.""" - queue_root = _supervise.bot_bottle_root() / "queue" - if not queue_root.is_dir(): - return [] - out: list[QueuedProposal] = [] - for slug_dir in sorted(queue_root.iterdir()): - if not slug_dir.is_dir(): - continue - for proposal in list_pending_proposals(slug_dir): - out.append(QueuedProposal(proposal=proposal, queue_dir=slug_dir)) + """Collect pending proposals across bottles.""" + out = [ + QueuedProposal(proposal=proposal) + for proposal in list_all_pending_proposals() + ] out.sort(key=lambda q: q.proposal.arrival_timestamp) return out @@ -118,7 +112,6 @@ def _detail_lines( (f"tool: {p.tool}", 0), (f"id: {p.id}", 0), (f"arrived: {p.arrival_timestamp}", 0), - (f"queue: {qp.queue_dir}", 0), ("", 0), ("justification:", 0), ] @@ -165,7 +158,7 @@ def approve( notes=notes, final_file=final_file, ) - write_response(qp.queue_dir, response) + write_response(qp.proposal.bottle_slug, response) _write_audit( qp, action=status, notes=notes, diff_before=diff_before, diff_after=diff_after, @@ -179,7 +172,7 @@ def reject(qp: QueuedProposal, *, reason: str) -> None: notes=reason, final_file=None, ) - write_response(qp.queue_dir, response) + write_response(qp.proposal.bottle_slug, response) _write_audit(qp, action=STATUS_REJECTED, notes=reason, diff_before="", diff_after="") diff --git a/bot_bottle/db_store.py b/bot_bottle/db_store.py new file mode 100644 index 0000000..7cc1849 --- /dev/null +++ b/bot_bottle/db_store.py @@ -0,0 +1,40 @@ +"""Shared SQLite-backed store base class for bot-bottle (PRD 0013).""" + +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +try: + from .migrations import TableMigrations +except ImportError: + from migrations import TableMigrations # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module + + +class DbStore: + """Base for SQLite-backed stores. Subclasses resolve db_path then call super().__init__.""" + + def __init__(self, db_path: Path, migrations: TableMigrations) -> None: + self.db_path = db_path + self._migrations = migrations + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self._init() + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + return conn + + def _init(self) -> None: + with self._connect() as conn: + self._migrations.apply(conn) + self._chmod() + + def _chmod(self) -> None: + try: + self.db_path.chmod(0o600) + except OSError: + pass + + +__all__ = ["DbStore"] diff --git a/bot_bottle/egress_addon.py b/bot_bottle/egress_addon.py index a28094e..d0cf0c7 100644 --- a/bot_bottle/egress_addon.py +++ b/bot_bottle/egress_addon.py @@ -79,14 +79,13 @@ class EgressAddon: # only — a restart re-prompts. Mutated only from the asyncio loop that # runs the addon hooks, so no lock is needed. self.safe_tokens: set[str] = set() - self._supervise_queue_dir = os.environ.get("SUPERVISE_QUEUE_DIR", "").strip() self._supervise_slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "").strip() self._token_allow_timeout = _token_allow_timeout_from_env(os.environ) self._reload(initial=True) self._install_sighup() def _supervise_available(self) -> bool: - return bool(self._supervise_queue_dir and self._supervise_slug) + return bool(self._supervise_slug) def _reload(self, *, initial: bool = False) -> None: try: @@ -393,9 +392,8 @@ class EgressAddon: justification=_TOKEN_ALLOW_JUSTIFICATION, current_file_hash=_sv.sha256_hex(payload), ) - queue_dir = Path(self._supervise_queue_dir) try: - _sv.write_proposal(queue_dir, proposal) + _sv.write_proposal(proposal) except OSError as e: sys.stderr.write( f"egress: could not queue token-allow proposal: {e}; " @@ -411,8 +409,8 @@ class EgressAddon: **self._req_ctx(flow), }) + "\n") - response = await self._await_token_response(queue_dir, proposal.id) - _sv.archive_proposal(queue_dir, proposal.id) + response = await self._await_token_response(proposal.id) + _sv.archive_proposal(self._supervise_slug, proposal.id) if response is not None and response.status in ( _sv.STATUS_APPROVED, _sv.STATUS_MODIFIED, @@ -439,16 +437,15 @@ class EgressAddon: async def _await_token_response( self, - queue_dir: Path, proposal_id: str, ) -> "_sv.Response | None": - """Poll the queue dir for the operator's response without blocking the + """Poll the DB for the operator's response without blocking the proxy event loop. Returns the Response, or None on timeout.""" loop = asyncio.get_running_loop() deadline = loop.time() + self._token_allow_timeout while True: try: - return _sv.read_response(queue_dir, proposal_id) + return _sv.read_response(self._supervise_slug, proposal_id) except (OSError, ValueError, KeyError): # Not written yet, or a partial/malformed write — retry until # the deadline, then fail closed. diff --git a/bot_bottle/git_gate_render.py b/bot_bottle/git_gate_render.py index 8a442b8..ba11926 100644 --- a/bot_bottle/git_gate_render.py +++ b/bot_bottle/git_gate_render.py @@ -234,13 +234,13 @@ import hashlib import json import os import sys -import uuid from pathlib import Path +from bot_bottle import supervise as _sv + report_path = Path(sys.argv[1]) -queue_dir = os.environ.get("SUPERVISE_QUEUE_DIR", "") slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "") -if not queue_dir or not slug: +if not slug: sys.exit(2) try: @@ -277,31 +277,19 @@ for i, finding in enumerate(raw, 1): ]) payload = "\n".join(lines).rstrip() + "\n" -proposal_id = str(uuid.uuid4()) -proposal = { - "id": proposal_id, - "bottle_slug": slug, - "tool": "gitleaks-allow", - "proposed_file": payload, - "justification": ( +proposal = _sv.Proposal.new( + bottle_slug=slug, + tool=_sv.TOOL_GITLEAKS_ALLOW, + proposed_file=payload, + justification=( "git-gate found gitleaks findings hidden by # gitleaks:allow; " "approve only for dummy test fixtures or confirmed false positives" ), - "arrival_timestamp": datetime.datetime.now( - datetime.timezone.utc - ).isoformat(), - "current_file_hash": hashlib.sha256(payload.encode("utf-8")).hexdigest(), -} -queue = Path(queue_dir) -queue.mkdir(parents=True, exist_ok=True) -path = queue / f"{proposal_id}.proposal.json" -tmp = path.with_suffix(path.suffix + ".tmp") -with tmp.open("w", encoding="utf-8") as f: - json.dump(proposal, f, indent=2) - f.write("\n") -os.chmod(tmp, 0o600) -os.replace(tmp, path) -print(proposal_id) + current_file_hash=hashlib.sha256(payload.encode("utf-8")).hexdigest(), + now=datetime.datetime.now(datetime.timezone.utc), +) +_sv.write_proposal(proposal) +print(proposal.id) PY ) rc=$? @@ -314,8 +302,7 @@ PY return 1 fi - queue_dir=${SUPERVISE_QUEUE_DIR:-} - response_file="$queue_dir/${proposal_id}.response.json" + slug=${SUPERVISE_BOTTLE_SLUG:-} timeout=${SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS:-300} case "$timeout" in ''|*[!0-9]*) @@ -327,26 +314,35 @@ PY echo "git-gate: approve with './cli.py supervise' to continue this push" >&2 waited=0 while [ "$waited" -lt "$timeout" ]; do - if [ -f "$response_file" ]; then - status=$(python3 - "$response_file" <<'PY' -import json + status=$(python3 - "$slug" "$proposal_id" <<'PY' import sys + +from bot_bottle import supervise as _sv + +slug = sys.argv[1] try: - with open(sys.argv[1], encoding="utf-8") as f: - raw = json.load(f) -except (OSError, json.JSONDecodeError): - sys.exit(1) -status = raw.get("status") -if not isinstance(status, str): - sys.exit(1) -print(status) + response = _sv.read_response(slug, sys.argv[2]) +except FileNotFoundError: + sys.exit(2) +print(response.status) PY - ) || status="" + ) + rc=$? + if [ "$rc" -eq 2 ]; then + status="" + elif [ "$rc" -ne 0 ]; then + status="invalid" + fi + if [ -n "$status" ]; then case "$status" in approved|modified) - mkdir -p "$queue_dir/processed" - mv -f "$queue_dir/${proposal_id}.proposal.json" "$queue_dir/processed/" 2>/dev/null || true - mv -f "$queue_dir/${proposal_id}.response.json" "$queue_dir/processed/" 2>/dev/null || true + python3 - "$slug" "$proposal_id" <<'PY' || true +import sys + +from bot_bottle import supervise as _sv + +_sv.archive_proposal(sys.argv[1], sys.argv[2]) +PY echo "git-gate: supervisor approved # gitleaks:allow for $ref" >&2 return 0 ;; @@ -499,4 +495,3 @@ if ! git -C "$repo_dir" rev-parse --verify HEAD >/dev/null 2>&1; then fi exit 0 """ - diff --git a/bot_bottle/migrations.py b/bot_bottle/migrations.py new file mode 100644 index 0000000..83fccf6 --- /dev/null +++ b/bot_bottle/migrations.py @@ -0,0 +1,37 @@ +"""SQLite migration runner for bot-bottle stores.""" + +from __future__ import annotations + +import sqlite3 + + +class TableMigrations: + """Runs a sequential list of DDL migrations tracked by schema_key in schema_versions.""" + + def __init__(self, schema_key: str, migrations: list[str]) -> None: + self.schema_key = schema_key + self.migrations = migrations + + def apply(self, conn: sqlite3.Connection) -> None: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS schema_versions ( + module TEXT PRIMARY KEY, + version INTEGER NOT NULL DEFAULT 0 + ) + """ + ) + row = conn.execute( + "SELECT version FROM schema_versions WHERE module = ?", + (self.schema_key,), + ).fetchone() + version = row[0] if row else 0 + for i, sql in enumerate(self.migrations[version:], start=version + 1): + conn.execute(sql) + conn.execute( + "INSERT OR REPLACE INTO schema_versions (module, version) VALUES (?, ?)", + (self.schema_key, i), + ) + + +__all__ = ["TableMigrations"] diff --git a/bot_bottle/queue_store.py b/bot_bottle/queue_store.py new file mode 100644 index 0000000..76a02da --- /dev/null +++ b/bot_bottle/queue_store.py @@ -0,0 +1,240 @@ +"""SQLite-backed queue store for supervise proposals and responses (PRD 0013).""" + +from __future__ import annotations + +import os +import sqlite3 +from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .supervise import Proposal, Response + +try: + from .db_store import DbStore + from .migrations import TableMigrations +except ImportError: + from db_store import DbStore # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module + from migrations import TableMigrations # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module + + +def get_supervise_mod() -> object: + """Lazy import of supervise to avoid a circular-import at module init time. + By the time any QueueStore method is called, both modules are fully loaded. + + Mirrors our own module identity: when we are 'queue_store' (sidecar flat + context or tests that inject bot_bottle/ into sys.path) we use the flat + 'supervise' module so that patches on supervise.bot_bottle_root propagate + correctly. When we are 'bot_bottle.queue_store' we use 'bot_bottle.supervise'.""" + import sys + sv_name = "supervise" if __name__ == "queue_store" else "bot_bottle.supervise" + if sv_name in sys.modules: + return sys.modules[sv_name] + try: + import bot_bottle.supervise as _m + except ImportError: + import supervise as _m # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module + return _m + + +# One entry per schema version: _MIGRATIONS.migrations[0] brings a fresh DB +# to version 1, [1] to version 2, and so on. Add new migrations at the end; +# never edit existing ones. +_MIGRATIONS = TableMigrations("queue_store", [ + # v1 — proposals table + """ + CREATE TABLE IF NOT EXISTS supervise_proposals ( + queue_key TEXT NOT NULL, + id TEXT NOT NULL, + bottle_slug TEXT NOT NULL, + tool TEXT NOT NULL, + proposed_file TEXT NOT NULL, + justification TEXT NOT NULL, + arrival_timestamp TEXT NOT NULL, + current_file_hash TEXT NOT NULL, + archived INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (queue_key, id) + ) + """, + # v2 — responses table + """ + CREATE TABLE IF NOT EXISTS supervise_responses ( + queue_key TEXT NOT NULL, + proposal_id TEXT NOT NULL, + status TEXT NOT NULL, + notes TEXT NOT NULL, + final_file TEXT, + archived INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (queue_key, proposal_id) + ) + """, +]) + + +class QueueStore(DbStore): + """SQLite-backed persistent store for supervise proposals and responses.""" + + def __init__(self, queue_key: str, db_path: Path | None = None) -> None: + self.queue_key = queue_key + if db_path is not None: + resolved = db_path + else: + # In the sidecar container SUPERVISE_DB_PATH points at the + # bind-mounted host DB. On the host this env var is never set, + # so we always fall through to host_db_path(). + env_path = os.environ.get("SUPERVISE_DB_PATH", "").strip() + resolved = Path(env_path) if env_path else get_supervise_mod().host_db_path() # type: ignore[attr-defined] + super().__init__(resolved, _MIGRATIONS) + + def write_proposal(self, proposal: Proposal) -> Path: + with self._connect() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO supervise_proposals ( + queue_key, id, bottle_slug, tool, proposed_file, justification, + arrival_timestamp, current_file_hash, archived + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0) + """, + ( + self.queue_key, + proposal.id, + proposal.bottle_slug, + proposal.tool, + proposal.proposed_file, + proposal.justification, + proposal.arrival_timestamp, + proposal.current_file_hash, + ), + ) + self._chmod() + return self.db_path + + def read_proposal(self, proposal_id: str) -> Proposal: + with self._connect() as conn: + row = conn.execute( + """ + SELECT * FROM supervise_proposals + WHERE queue_key = ? AND id = ? AND archived = 0 + """, + (self.queue_key, proposal_id), + ).fetchone() + if row is None: + raise FileNotFoundError(proposal_id) + return self._row_to_proposal(row) + + def list_pending_proposals(self) -> list[Proposal]: + if not self.db_path.is_file(): + return [] + with self._connect() as conn: + rows = conn.execute( + """ + SELECT p.* FROM supervise_proposals p + WHERE p.archived = 0 + AND p.queue_key = ? + AND NOT EXISTS ( + SELECT 1 FROM supervise_responses r + WHERE r.queue_key = p.queue_key + AND r.proposal_id = p.id + AND r.archived = 0 + ) + ORDER BY p.arrival_timestamp, p.id + """, + (self.queue_key,), + ).fetchall() + return [self._row_to_proposal(row) for row in rows] + + def list_all_pending_proposals(self) -> list[Proposal]: + if not self.db_path.is_file(): + return [] + with self._connect() as conn: + rows = conn.execute( + """ + SELECT p.* FROM supervise_proposals p + WHERE p.archived = 0 + AND NOT EXISTS ( + SELECT 1 FROM supervise_responses r + WHERE r.queue_key = p.queue_key + AND r.proposal_id = p.id + AND r.archived = 0 + ) + ORDER BY p.arrival_timestamp, p.id + """ + ).fetchall() + return [self._row_to_proposal(row) for row in rows] + + def write_response(self, response: Response) -> Path: + with self._connect() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO supervise_responses ( + queue_key, proposal_id, status, notes, final_file, archived + ) VALUES (?, ?, ?, ?, ?, 0) + """, + ( + self.queue_key, + response.proposal_id, + response.status, + response.notes, + response.final_file, + ), + ) + self._chmod() + return self.db_path + + def read_response(self, proposal_id: str) -> Response: + with self._connect() as conn: + row = conn.execute( + """ + SELECT * FROM supervise_responses + WHERE queue_key = ? AND proposal_id = ? AND archived = 0 + """, + (self.queue_key, proposal_id), + ).fetchone() + if row is None: + raise FileNotFoundError(proposal_id) + return self._row_to_response(row) + + def archive_proposal(self, proposal_id: str) -> None: + if not self.db_path.is_file(): + return + with self._connect() as conn: + conn.execute( + """ + UPDATE supervise_proposals SET archived = 1 + WHERE queue_key = ? AND id = ? + """, + (self.queue_key, proposal_id), + ) + conn.execute( + """ + UPDATE supervise_responses SET archived = 1 + WHERE queue_key = ? AND proposal_id = ? + """, + (self.queue_key, proposal_id), + ) + + @staticmethod + def _row_to_proposal(row: sqlite3.Row) -> Proposal: + m = get_supervise_mod() + return m.Proposal( # type: ignore[attr-defined] + id=row["id"], + bottle_slug=row["bottle_slug"], + tool=row["tool"], + proposed_file=row["proposed_file"], + justification=row["justification"], + arrival_timestamp=row["arrival_timestamp"], + current_file_hash=row["current_file_hash"], + ) + + @staticmethod + def _row_to_response(row: sqlite3.Row) -> Response: + m = get_supervise_mod() + return m.Response( # type: ignore[attr-defined] + proposal_id=row["proposal_id"], + status=row["status"], + notes=row["notes"], + final_file=row["final_file"], + ) + + +__all__ = ["QueueStore"] diff --git a/bot_bottle/supervise.py b/bot_bottle/supervise.py index 12d4b6c..ba00b50 100644 --- a/bot_bottle/supervise.py +++ b/bot_bottle/supervise.py @@ -9,15 +9,14 @@ calls when it needs an operator-reviewed egress change: Each tool call: the agent passes the full proposed file plus a justification text. The sidecar validates the proposal syntactically, -writes it to the host's per-bottle queue dir, and holds the tool-call +writes it to the host SQLite queue table, and holds the tool-call connection open. The operator's supervise TUI (bot_bottle.cli.supervise) sees the proposal, accepts -approve / modify / reject, and writes a response file alongside the -proposal. The sidecar sees the response and returns `{status, notes}` -to the agent. +approve / modify / reject, and writes a response row. The sidecar sees +the response and returns `{status, notes}` to the agent. This module defines the host-side library: dataclasses for the queue -file shapes, queue read/write helpers, the audit log writer, and the +record shapes, queue read/write helpers, the audit log writer, and the diff renderer. The in-container sidecar lives in bot_bottle/supervise_server.py; the supervise daemon's container lifecycle is owned by the sidecar bundle (PRD 0024). @@ -34,8 +33,6 @@ from __future__ import annotations import dataclasses import difflib import hashlib -import json -import os import time import uuid from abc import ABC @@ -86,8 +83,9 @@ STATUSES: tuple[str, ...] = (STATUS_APPROVED, STATUS_MODIFIED, STATUS_REJECTED) # `routes edit ` verb writes entries with this action. ACTION_OPERATOR_EDIT = "operator-edit" -QUEUE_DIR_IN_CONTAINER = "/run/supervise/queue" +DB_PATH_IN_CONTAINER = "/run/supervise/bot-bottle.db" DEFAULT_POLL_INTERVAL_SEC = 0.5 +HOST_DB_FILENAME = "bot-bottle.db" # --- Paths ----------------------------------------------------------------- @@ -97,10 +95,6 @@ def bot_bottle_root() -> Path: return Path.home() / ".bot-bottle" -def queue_dir_for_slug(slug: str) -> Path: - return bot_bottle_root() / "queue" / slug - - def audit_dir() -> Path: return bot_bottle_root() / "audit" @@ -109,14 +103,16 @@ def audit_log_path(component: str, slug: str) -> Path: return audit_dir() / f"{component}-{slug}.log" +def host_db_path() -> Path: + return bot_bottle_root() / HOST_DB_FILENAME + + # --- Dataclasses ----------------------------------------------------------- @dataclass(frozen=True) class Proposal: - """One pending tool-call from the agent. The sidecar writes one - of these to the queue dir on a tool call; the operator's TUI - reads them; the sidecar polls for a matching Response.""" + """One pending tool-call from the agent.""" id: str bottle_slug: str @@ -170,7 +166,7 @@ class Proposal: @dataclass(frozen=True) class Response: """The operator's decision on a proposal. The TUI writes one of - these to the queue dir; the sidecar reads it and returns the + these to the queue table; the sidecar reads it and returns the `{status, notes}` pair to the agent's tool call. `final_file` carries the file content the supervisor will @@ -223,90 +219,50 @@ class AuditEntry: return dataclasses.asdict(self) +try: + from .queue_store import QueueStore + from .audit_store import AuditStore +except ImportError: + # Sidecar bundle: files are flat-copied under /app, not a package. + from queue_store import QueueStore # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module + from audit_store import AuditStore # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module + + # --- Queue I/O ------------------------------------------------------------- -def _proposal_filename(proposal_id: str) -> str: - return f"{proposal_id}.proposal.json" - - -def _response_filename(proposal_id: str) -> str: - return f"{proposal_id}.response.json" - - -def _id_from_proposal_filename(path: Path) -> str | None: - name = path.name - if not name.endswith(".proposal.json"): - return None - return name[: -len(".proposal.json")] - - -def write_proposal(queue_dir: Path, proposal: Proposal) -> Path: - """Persist `proposal` as JSON in the queue dir, mode 0o600. +def write_proposal(proposal: Proposal) -> Path: + """Persist `proposal` in the queue database, mode 0o600. Directory is created if missing.""" - queue_dir.mkdir(parents=True, exist_ok=True) - path = queue_dir / _proposal_filename(proposal.id) - payload = json.dumps(proposal.to_dict(), indent=2) + "\n" - _atomic_write(path, payload, mode=0o600) - return path + return QueueStore(proposal.bottle_slug).write_proposal(proposal) -def read_proposal(queue_dir: Path, proposal_id: str) -> Proposal: - path = queue_dir / _proposal_filename(proposal_id) - with path.open() as f: - raw = json.load(f) - if not isinstance(raw, dict): - raise ValueError(f"{path}: top-level must be an object") - return Proposal.from_dict(raw) +def read_proposal(bottle_slug: str, proposal_id: str) -> Proposal: + return QueueStore(bottle_slug).read_proposal(proposal_id) -def list_pending_proposals(queue_dir: Path) -> list[Proposal]: - """All proposals in `queue_dir` that do not yet have a matching - response file. Sorted by `arrival_timestamp` so the operator +def list_pending_proposals(bottle_slug: str) -> list[Proposal]: + """All proposals for `bottle_slug` that do not yet have a matching + response. Sorted by `arrival_timestamp` so the operator sees the queue FIFO.""" - if not queue_dir.is_dir(): - return [] - out: list[Proposal] = [] - for path in sorted(queue_dir.glob("*.proposal.json")): - proposal_id = _id_from_proposal_filename(path) - if proposal_id is None: - continue - if (queue_dir / _response_filename(proposal_id)).exists(): - continue - try: - with path.open() as f: - raw = json.load(f) - except (OSError, json.JSONDecodeError): - continue - if not isinstance(raw, dict): - continue - try: - out.append(Proposal.from_dict(raw)) - except (KeyError, ValueError): - continue - out.sort(key=lambda p: p.arrival_timestamp) - return out + return QueueStore(bottle_slug).list_pending_proposals() -def write_response(queue_dir: Path, response: Response) -> Path: - queue_dir.mkdir(parents=True, exist_ok=True) - path = queue_dir / _response_filename(response.proposal_id) - payload = json.dumps(response.to_dict(), indent=2) + "\n" - _atomic_write(path, payload, mode=0o600) - return path +def list_all_pending_proposals() -> list[Proposal]: + """All pending proposals across bottles, sorted FIFO.""" + return QueueStore("").list_all_pending_proposals() -def read_response(queue_dir: Path, proposal_id: str) -> Response: - path = queue_dir / _response_filename(proposal_id) - with path.open() as f: - raw = json.load(f) - if not isinstance(raw, dict): - raise ValueError(f"{path}: top-level must be an object") - return Response.from_dict(raw) +def write_response(bottle_slug: str, response: Response) -> Path: + return QueueStore(bottle_slug).write_response(response) + + +def read_response(bottle_slug: str, proposal_id: str) -> Response: + return QueueStore(bottle_slug).read_response(proposal_id) def wait_for_response( - queue_dir: Path, + bottle_slug: str, proposal_id: str, *, poll_interval: float = DEFAULT_POLL_INTERVAL_SEC, @@ -317,90 +273,35 @@ def wait_for_response( which the wait raises TimeoutError. None waits forever — the natural shape, since the operator's response time is unbounded. - Polls the filesystem so the implementation stays portable and - stdlib-only.""" - path = queue_dir / _response_filename(proposal_id) + Polls SQLite so the implementation stays portable and stdlib-only.""" + store = QueueStore(bottle_slug) while True: - if path.exists(): - try: - with path.open() as f: - raw = json.load(f) - except (OSError, json.JSONDecodeError): - raw = None - if isinstance(raw, dict): - try: - return Response.from_dict(raw) - except (KeyError, ValueError): - pass + try: + return store.read_response(proposal_id) + except FileNotFoundError: + pass if deadline is not None and time.monotonic() >= deadline: raise TimeoutError(f"no response for proposal {proposal_id!r}") time.sleep(poll_interval) -def archive_proposal(queue_dir: Path, proposal_id: str) -> None: - """Move both proposal and response files to `/processed/`. - Idempotent — missing files are silently skipped.""" - processed = queue_dir / "processed" - processed.mkdir(parents=True, exist_ok=True) - for name in (_proposal_filename(proposal_id), _response_filename(proposal_id)): - src = queue_dir / name - if src.exists(): - src.rename(processed / name) +def archive_proposal(bottle_slug: str, proposal_id: str) -> None: + """Mark both proposal and response rows processed. + Idempotent — missing rows are silently skipped.""" + QueueStore(bottle_slug).archive_proposal(proposal_id) # --- Audit log ------------------------------------------------------------- def write_audit_entry(entry: AuditEntry) -> Path: - """Append `entry` as one JSON-Lines record to the per-bottle - audit log. Acquires an advisory exclusive lock so concurrent - writers don't interleave bytes.""" - path = audit_log_path(entry.component, entry.bottle_slug) - path.parent.mkdir(parents=True, exist_ok=True) - line = json.dumps(entry.to_dict(), sort_keys=False) + "\n" - fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o600) - try: - _try_flock(fd) - try: - os.write(fd, line.encode("utf-8")) - finally: - _try_funlock(fd) - finally: - os.close(fd) - return path + """Append `entry` to the host supervise audit table.""" + return AuditStore().write_audit_entry(entry) def read_audit_entries(component: str, slug: str) -> list[AuditEntry]: - """Load all audit entries for the given component+slug. Empty - list if the log doesn't exist.""" - path = audit_log_path(component, slug) - if not path.is_file(): - return [] - out: list[AuditEntry] = [] - with path.open() as f: - for raw_line in f: - raw_line = raw_line.strip() - if not raw_line: - continue - try: - raw = json.loads(raw_line) - except json.JSONDecodeError: - continue - if not isinstance(raw, dict): - continue - try: - out.append(AuditEntry( - timestamp=_require_str(raw, "timestamp"), - bottle_slug=_require_str(raw, "bottle_slug"), - component=_require_str(raw, "component"), - operator_action=_require_str(raw, "operator_action"), - operator_notes=_require_str(raw, "operator_notes"), - justification=_require_str(raw, "justification"), - diff=_require_str(raw, "diff"), - )) - except ValueError: - continue - return out + """Load all audit entries for the given component+slug.""" + return AuditStore().read_audit_entries(component, slug) # --- Diff rendering -------------------------------------------------------- @@ -433,35 +334,34 @@ def sha256_hex(content: str) -> str: class SupervisePlan: """Output of Supervise.prepare; consumed by .start. - `queue_dir` is the host directory bind-mounted into the sidecar - at /run/supervise/queue. `internal_network` is empty at prepare - time; the backend's launch step fills it via dataclasses.replace - before calling .start.""" + `db_path` is the host database bind-mounted into the sidecar at + /run/supervise/bot-bottle.db. `internal_network` is empty at + prepare time; the backend's launch step fills it via + dataclasses.replace before calling .start.""" slug: str - queue_dir: Path + db_path: Path internal_network: str = "" class Supervise(ABC): - """Per-bottle supervise sidecar. Encapsulates the host-side - prepare (queue dir staging); the sidecar's start/stop lifecycle - is backend-specific.""" + """Per-bottle supervise sidecar. Encapsulates host-side database + staging; the sidecar's start/stop lifecycle is backend-specific.""" def prepare( self, slug: str, stage_dir: Path, ) -> SupervisePlan: - """Stage the per-bottle queue dir on the host. Returns the - plan; `internal_network` must be set by the launch step before - .start runs.""" + """Stage the host database. Returns the plan; `internal_network` + must be set by the launch step before .start runs.""" del stage_dir - queue_dir = queue_dir_for_slug(slug) - queue_dir.mkdir(parents=True, exist_ok=True) + db_path = host_db_path() + QueueStore(slug) + AuditStore(db_path) return SupervisePlan( slug=slug, - queue_dir=queue_dir, + db_path=db_path, ) # --- Helpers --------------------------------------------------------------- @@ -474,47 +374,15 @@ def _require_str(raw: dict[str, object], key: str) -> str: return value -def _atomic_write(path: Path, content: str, *, mode: int) -> None: - """Atomic: write to a sibling tmp file, fsync, rename.""" - tmp = path.with_suffix(path.suffix + ".tmp") - fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode) - try: - os.write(fd, content.encode("utf-8")) - os.fsync(fd) - finally: - os.close(fd) - os.replace(tmp, path) - - -try: - import fcntl as _fcntl - - def _try_flock(fd: int) -> None: # type: ignore[reportRedeclaration] - try: - _fcntl.flock(fd, _fcntl.LOCK_EX) - except OSError: - pass - - def _try_funlock(fd: int) -> None: # type: ignore[reportRedeclaration] - try: - _fcntl.flock(fd, _fcntl.LOCK_UN) - except OSError: - pass -except ImportError: # pragma: no cover — Windows path - def _try_flock(fd: int) -> None: # noqa: F841 — Windows fallback - return None - - def _try_funlock(fd: int) -> None: # noqa: F841 — Windows fallback - return None - - __all__ = [ "ACTION_OPERATOR_EDIT", "AuditEntry", + "AuditStore", "COMPONENT_FOR_TOOL", "DEFAULT_POLL_INTERVAL_SEC", + "DB_PATH_IN_CONTAINER", "Proposal", - "QUEUE_DIR_IN_CONTAINER", + "QueueStore", "Response", "STATUSES", "STATUS_APPROVED", @@ -536,8 +404,9 @@ __all__ = [ "audit_dir", "audit_log_path", "bot_bottle_root", + "host_db_path", "list_pending_proposals", - "queue_dir_for_slug", + "list_all_pending_proposals", "read_audit_entries", "read_proposal", "read_response", diff --git a/bot_bottle/supervise_server.py b/bot_bottle/supervise_server.py index 5fca484..089e106 100644 --- a/bot_bottle/supervise_server.py +++ b/bot_bottle/supervise_server.py @@ -7,14 +7,13 @@ config changes when stuck. The tools are `egress-allow`, Each queued tool call: 1. Validates the proposed file syntactically. - 2. Writes a Proposal to /run/supervise/queue/ (bind-mounted from - the host's ~/.bot-bottle/queue//). - 3. Blocks polling for a matching Response file. + 2. Writes a Proposal to the host SQLite database. + 3. Blocks polling for a matching Response row. 4. Returns the operator's `{status, notes}` to the agent. The bottle slug arrives via SUPERVISE_BOTTLE_SLUG env (stamped at -container creation by the backend's start step). The queue dir comes -from SUPERVISE_QUEUE_DIR (default `/run/supervise/queue`). +container creation by the backend's start step). SUPERVISE_DB_PATH +points at the bind-mounted host database. Speaks MCP over HTTP+JSON-RPC. Methods handled: @@ -42,7 +41,6 @@ import typing import urllib.error import urllib.request from dataclasses import dataclass -from pathlib import Path try: # Same-directory imports inside the bundle container; these files are @@ -277,7 +275,6 @@ def validate_proposed_file(tool: str, content: str) -> None: @dataclass(frozen=True) class ServerConfig: bottle_slug: str - queue_dir: Path response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS @@ -376,7 +373,7 @@ def handle_tools_call( current_file_hash=_sv.sha256_hex(proposed_file), ) try: - _sv.write_proposal(config.queue_dir, proposal) + _sv.write_proposal(proposal) except OSError as e: raise _RpcInternalError(f"failed to write proposal to queue: {e}") from e sys.stderr.write( @@ -387,7 +384,7 @@ def handle_tools_call( deadline = time.monotonic() + config.response_timeout_seconds try: response = _sv.wait_for_response( - config.queue_dir, + config.bottle_slug, proposal.id, poll_interval=MIN_RESPONSE_POLL_INTERVAL_SECONDS, deadline=deadline, @@ -399,7 +396,7 @@ def handle_tools_call( "isError": False, } try: - _sv.archive_proposal(config.queue_dir, proposal.id) + _sv.archive_proposal(config.bottle_slug, proposal.id) except OSError as e: raise _RpcInternalError(f"failed to archive proposal: {e}") from e @@ -539,7 +536,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler): class MCPServer(socketserver.ThreadingMixIn, http.server.HTTPServer): allow_reuse_address = True daemon_threads = True - config: ServerConfig = ServerConfig(bottle_slug="", queue_dir=Path()) + config: ServerConfig = ServerConfig(bottle_slug="") # --- Entry point ----------------------------------------------------------- @@ -548,21 +545,18 @@ class MCPServer(socketserver.ThreadingMixIn, http.server.HTTPServer): def serve( *, bottle_slug: str, - queue_dir: Path, port: int = _sv.SUPERVISE_PORT, bind: str = "0.0.0.0", response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS, ) -> typing.NoReturn: - queue_dir.mkdir(parents=True, exist_ok=True) server = MCPServer((bind, port), MCPHandler) server.config = ServerConfig( bottle_slug=bottle_slug, - queue_dir=queue_dir, response_timeout_seconds=response_timeout_seconds, ) sys.stderr.write( f"supervise listening on {bind}:{port}; " - f"slug={bottle_slug!r}; queue={queue_dir}; " + f"slug={bottle_slug!r}; " f"tools: {', '.join(t['name'] for t in TOOL_DEFINITIONS)}\n" # type: ignore[arg-type] ) sys.stderr.flush() @@ -581,7 +575,6 @@ def main(argv: list[str]) -> int: if not bottle_slug: sys.stderr.write("supervise: SUPERVISE_BOTTLE_SLUG env is unset\n") return 2 - queue_dir = Path(os.environ.get("SUPERVISE_QUEUE_DIR", _sv.QUEUE_DIR_IN_CONTAINER)) port = int(os.environ.get("SUPERVISE_PORT", str(_sv.SUPERVISE_PORT))) bind = os.environ.get("SUPERVISE_BIND", "0.0.0.0") try: @@ -591,7 +584,6 @@ def main(argv: list[str]) -> int: return 2 serve( bottle_slug=bottle_slug, - queue_dir=queue_dir, port=port, bind=bind, response_timeout_seconds=response_timeout_seconds, diff --git a/docs/prds/prd-new-sqlite-local-storage.md b/docs/prds/prd-new-sqlite-local-storage.md new file mode 100644 index 0000000..1055a8a --- /dev/null +++ b/docs/prds/prd-new-sqlite-local-storage.md @@ -0,0 +1,135 @@ +# PRD prd-new: SQLite local storage + +- **Status:** Active +- **Author:** codex +- **Created:** 2026-07-01 +- **Issue:** #319 + +## Summary + +Add a small stdlib SQLite storage layer for bot-bottle host runtime state, +starting with the supervise queue and audit log. This replaces scattered JSON +queue files and JSONL audit logs with structured tables while preserving the +existing public supervise helper functions and sidecar queue mount contract. + +## Problem + +Bot-bottle currently stores supervise proposals and responses as individual JSON +files under `~/.bot-bottle/queue//`, and audit entries as JSONL files +under `~/.bot-bottle/audit/`. That worked for the original interactive TUI, but +new forge-native orchestration needs durable, queryable local state for queues, +audit trails, watchdogs, and lifecycle records. PR #318 started introducing +SQLite-shaped boilerplate for forge state; the storage foundation should live in +its own PR so forge work can build on the shared runtime store instead of adding +one-off persistence. + +## Goals / Success Criteria + +1. Supervise proposals and responses are persisted through SQLite. +2. Audit entries are persisted through SQLite. +3. Supervise queue helpers use the bottle slug / queue key instead of a queue + directory path. +4. The sidecar receives the host database mount across docker, smolmachines, + and macOS-container backends. +5. The implementation stays stdlib-only. +6. Schema migrations use a `PRAGMA user_version` runner — no third-party deps. +7. Unit tests cover queue round-trips, pending discovery, response waits, + archive semantics, audit round-trips, and path creation. + +## Non-goals + +- Migrating old JSON queue files or JSONL audit logs. +- Adding forge orchestration state tables. +- Adding egress metering or budget tables. +- Changing the supervise TUI workflow or remediation behavior. +- Introducing a third-party ORM or migration library. + +## Design + +### Database locations + +Queue and audit state use the host-level local database: + +```text +~/.bot-bottle/bot-bottle.db +``` + +The supervise sidecar receives that database as a writable bind mount at +`/run/supervise/bot-bottle.db` and gets the path through `SUPERVISE_DB_PATH`. +No per-slug queue directory is mounted into the sidecar. This creates the shared +host database that later forge/native lifecycle work can extend in separate +PRDs. + +### Tables + +`supervise_proposals` lives in the host database: + +```sql +CREATE TABLE supervise_proposals ( + queue_key TEXT NOT NULL, + id TEXT NOT NULL, + bottle_slug TEXT NOT NULL, + tool TEXT NOT NULL, + proposed_file TEXT NOT NULL, + justification TEXT NOT NULL, + arrival_timestamp TEXT NOT NULL, + current_file_hash TEXT NOT NULL, + archived INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (queue_key, id) +); +``` + +`supervise_responses` lives in the host database: + +```sql +CREATE TABLE supervise_responses ( + queue_key TEXT NOT NULL, + proposal_id TEXT NOT NULL, + status TEXT NOT NULL, + notes TEXT NOT NULL, + final_file TEXT, + archived INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (queue_key, proposal_id) +); +``` + +`supervise_audit_entries` lives in the host database: + +```sql +CREATE TABLE supervise_audit_entries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, + bottle_slug TEXT NOT NULL, + component TEXT NOT NULL, + operator_action TEXT NOT NULL, + operator_notes TEXT NOT NULL, + justification TEXT NOT NULL, + diff TEXT NOT NULL +); +``` + +### Compatibility + +The queue helpers take a bottle slug / queue key and perform equivalent +operations against `~/.bot-bottle/bot-bottle.db`: + +- `list_pending_proposals` returns non-archived proposals without a non-archived + response, sorted by arrival time. +- `archive_proposal` marks matching proposal/response rows archived instead of + moving files into `processed/`. +- `wait_for_response` keeps the current polling behavior but polls SQLite. + +The old audit path helpers (`audit_dir`, `audit_log_path`) stay available for +compatibility. `audit_log_path` no longer describes the active storage location; +callers should use `read_audit_entries`. + +## Implementation chunks + +1. Add SQLite store helpers for supervise queue and audit state. +2. Rewire `bot_bottle.supervise` queue/audit functions to the store. +3. Update supervise CLI discovery tests and queue/audit unit tests. +4. Run unit tests, pyright, and pylint for touched modules. + +## Open questions + +None. diff --git a/tests/unit/test_compose.py b/tests/unit/test_compose.py index 9f75be5..fa83cb3 100644 --- a/tests/unit/test_compose.py +++ b/tests/unit/test_compose.py @@ -107,7 +107,7 @@ def _egress_plan( def _supervise_plan() -> SupervisePlan: return SupervisePlan( slug=SLUG, - queue_dir=STATE / "supervise" / "queue", + db_path=STATE / "bot-bottle.db", internal_network=f"bot-bottle-net-{SLUG}", ) @@ -392,7 +392,7 @@ class TestSidecarBundleShape(unittest.TestCase): sc = self._render(supervise=True)["services"]["sidecars"] env_strings = sc["environment"] self.assertIn(f"SUPERVISE_BOTTLE_SLUG={SLUG}", env_strings) - self.assertTrue(any(e.startswith("SUPERVISE_QUEUE_DIR=") for e in env_strings)) + self.assertIn("SUPERVISE_DB_PATH=/run/supervise/bot-bottle.db", env_strings) self.assertTrue(any(e.startswith("SUPERVISE_PORT=") for e in env_strings)) def test_volumes_always_includes_egress_ca(self): @@ -408,8 +408,7 @@ class TestSidecarBundleShape(unittest.TestCase): self.assertIn("/etc/egress", targets) self.assertIn("/git-gate-entrypoint.sh", targets) self.assertIn("/git-gate/creds/upstream-known_hosts", targets) - self.assertTrue(any("supervise/queue" in t or t.startswith("/run/supervise") - for t in targets)) + self.assertIn("/run/supervise/bot-bottle.db", targets) def test_extra_hosts_omitted_for_git_upstreams(self): sc = self._render(with_git=True)["services"]["sidecars"] diff --git a/tests/unit/test_contrib_claude_provider.py b/tests/unit/test_contrib_claude_provider.py index 91bf73e..0e8589b 100644 --- a/tests/unit/test_contrib_claude_provider.py +++ b/tests/unit/test_contrib_claude_provider.py @@ -74,7 +74,7 @@ def _plan( if supervise: supervise_plan = SupervisePlan( slug="demo-abc12", - queue_dir=Path("/tmp/queue"), + db_path=Path("/tmp/bot-bottle.db"), ) return DockerBottlePlan( spec=spec, diff --git a/tests/unit/test_contrib_codex_provider.py b/tests/unit/test_contrib_codex_provider.py index 3d3cf1c..3ccfbfb 100644 --- a/tests/unit/test_contrib_codex_provider.py +++ b/tests/unit/test_contrib_codex_provider.py @@ -77,7 +77,7 @@ def _plan( if supervise: supervise_plan = SupervisePlan( slug="demo-abc12", - queue_dir=Path("/tmp/queue"), + db_path=Path("/tmp/bot-bottle.db"), ) return DockerBottlePlan( spec=spec, diff --git a/tests/unit/test_egress_addon_log_redaction.py b/tests/unit/test_egress_addon_log_redaction.py index 87290c3..dbff136 100644 --- a/tests/unit/test_egress_addon_log_redaction.py +++ b/tests/unit/test_egress_addon_log_redaction.py @@ -47,7 +47,6 @@ def _addon() -> EgressAddon: a: EgressAddon = EgressAddon.__new__(EgressAddon) a.config = Config(routes=(), log=LOG_FULL) a.safe_tokens = set() - a._supervise_queue_dir = "" a._supervise_slug = "" a._token_allow_timeout = 300.0 return a diff --git a/tests/unit/test_egress_addon_request_flow.py b/tests/unit/test_egress_addon_request_flow.py index f374dd5..ed8d8b2 100644 --- a/tests/unit/test_egress_addon_request_flow.py +++ b/tests/unit/test_egress_addon_request_flow.py @@ -212,7 +212,6 @@ def _addon(config: Config) -> EgressAddon: a: EgressAddon = EgressAddon.__new__(EgressAddon) a.config = config a.safe_tokens = set() - a._supervise_queue_dir = "" a._supervise_slug = "" a._token_allow_timeout = 300.0 a.routes_path = "/nonexistent/routes.yaml" @@ -386,10 +385,10 @@ def _fake_sv(response_status: str | None) -> types.SimpleNamespace: def _sha256_hex(_payload: Any) -> str: return "hash" - def _noop(_a: Any, _b: Any) -> None: + def _noop(*_args: Any) -> None: return None - def _read_response(_qd: Any, _pid: Any) -> Any: + def _read_response(_slug: Any, _pid: Any) -> Any: if response_status is None: raise OSError("not written yet") # forces poll -> timeout return types.SimpleNamespace(status=response_status) @@ -409,7 +408,6 @@ def _fake_sv(response_status: str | None) -> types.SimpleNamespace: class TestSuperviseBranch(unittest.TestCase): def _supervised_addon(self) -> EgressAddon: addon = _addon(Config(routes=(Route(host="api.example.com"),))) - addon._supervise_queue_dir = "/tmp/egress-queue" addon._supervise_slug = "test-bottle" addon._token_allow_timeout = 0.05 return addon @@ -632,14 +630,13 @@ class TestRedactSurfaces(unittest.TestCase): class TestSuperviseWriteFailure(unittest.TestCase): def test_write_proposal_oserror_blocks(self) -> None: addon = _addon(Config(routes=(Route(host="api.example.com"),))) - addon._supervise_queue_dir = "/tmp/egress-queue" addon._supervise_slug = "test-bottle" addon._token_allow_timeout = 0.05 flow = _Flow(_Request(host="api.example.com", method="POST", body=f"k={_OPENAI_KEY}")) fake = _fake_sv("approved") - def _raise(_qd: Any, _p: Any) -> None: + def _raise(_p: Any) -> None: raise OSError("disk full") fake.write_proposal = _raise diff --git a/tests/unit/test_git_gate.py b/tests/unit/test_git_gate.py index be6fc32..6cc0a99 100644 --- a/tests/unit/test_git_gate.py +++ b/tests/unit/test_git_gate.py @@ -210,8 +210,9 @@ class TestHookRender(unittest.TestCase): # the suppressed findings for human approval. self.assertIn("--ignore-gitleaks-allow", hook) self.assertIn("--report-format=json", hook) - self.assertIn('"tool": "gitleaks-allow"', hook) - self.assertIn("SUPERVISE_QUEUE_DIR", hook) + self.assertIn("tool=_sv.TOOL_GITLEAKS_ALLOW", hook) + self.assertIn("_sv.write_proposal", hook) + self.assertIn("_sv.read_response", hook) self.assertIn("SUPERVISE_BOTTLE_SLUG", hook) self.assertIn("supervisor approved # gitleaks:allow", hook) self.assertIn("supervisor rejected # gitleaks:allow", hook) diff --git a/tests/unit/test_macos_container_launch.py b/tests/unit/test_macos_container_launch.py index 3e1038e..4d1ecb0 100644 --- a/tests/unit/test_macos_container_launch.py +++ b/tests/unit/test_macos_container_launch.py @@ -71,7 +71,9 @@ def _plan( else: git_gate_plan = SimpleNamespace(upstreams=()) supervise_plan = ( - SimpleNamespace(queue_dir=Path("/state/supervise/queue")) + SimpleNamespace( + db_path=Path("/state/bot-bottle.db"), + ) if supervise else None ) agent_provision = SimpleNamespace( @@ -137,7 +139,7 @@ class TestMacosContainerLaunchArgv(unittest.TestCase): argv, ) self.assertIn( - "type=bind,source=/state/supervise/queue,target=/run/supervise/queue", + "type=bind,source=/state/bot-bottle.db,target=/run/supervise/bot-bottle.db", argv, ) diff --git a/tests/unit/test_smolmachines_provision.py b/tests/unit/test_smolmachines_provision.py index d6b28c5..3c7c25e 100644 --- a/tests/unit/test_smolmachines_provision.py +++ b/tests/unit/test_smolmachines_provision.py @@ -130,7 +130,7 @@ def _plan( if supervise: supervise_plan = SupervisePlan( slug="demo-abc12", - queue_dir=Path("/tmp/queue"), + db_path=Path("/tmp/bot-bottle.db"), ) return SmolmachinesBottlePlan( spec=spec, @@ -422,6 +422,14 @@ class TestBundleLaunchSpec(unittest.TestCase): spec.environment, ) + def test_supervise_adds_daemon_volume_and_env(self): + from bot_bottle.supervise import DB_PATH_IN_CONTAINER + plan = _plan(supervise=True) + spec = _bundle_launch_spec(plan, "net", "127.0.0.16") + self.assertIn("supervise", spec.daemons_csv) + self.assertIn(f"SUPERVISE_DB_PATH={DB_PATH_IN_CONTAINER}", spec.environment) + self.assertIn(("/tmp/bot-bottle.db", DB_PATH_IN_CONTAINER, False), spec.volumes) + def test_canary_env_visible_to_smolvm_guest(self): plan = _plan(canary=True) with patch.object( diff --git a/tests/unit/test_supervise.py b/tests/unit/test_supervise.py index dfe8bd4..f41ceba 100644 --- a/tests/unit/test_supervise.py +++ b/tests/unit/test_supervise.py @@ -1,6 +1,5 @@ """Unit: supervise queue + audit log + diff helpers (PRD 0013).""" -import json import tempfile import threading import time @@ -19,7 +18,7 @@ from bot_bottle.supervise import ( TOOL_EGRESS_ALLOW, TOOL_GITLEAKS_ALLOW, archive_proposal, - audit_log_path, + host_db_path, list_pending_proposals, read_audit_entries, read_proposal, @@ -112,32 +111,44 @@ class TestResponseRoundtrip(unittest.TestCase): class TestQueueIO(unittest.TestCase): def setUp(self): self._tmp = tempfile.TemporaryDirectory(prefix="bot-bottle-supervise-test.") - self.queue_dir = Path(self._tmp.name) + self._home_patch = self._patch_home(Path(self._tmp.name)) + self.slug = "dev" def tearDown(self): + self._home_patch() self._tmp.cleanup() + def _patch_home(self, fake_home: Path): + original = supervise.bot_bottle_root + + def fake_root() -> Path: + return fake_home / ".bot-bottle" + + supervise.bot_bottle_root = fake_root # type: ignore[assignment] + return lambda: setattr(supervise, "bot_bottle_root", original) + def test_write_and_read_proposal(self): p = _proposal() - path = write_proposal(self.queue_dir, p) + path = write_proposal(p) self.assertTrue(path.exists()) + self.assertEqual(host_db_path(), path) self.assertEqual(0o600, path.stat().st_mode & 0o777) - loaded = read_proposal(self.queue_dir, p.id) + loaded = read_proposal(self.slug, p.id) self.assertEqual(p, loaded) def test_list_pending_excludes_responded(self): a = _proposal(justification="first") b = _proposal(justification="second") - write_proposal(self.queue_dir, a) - write_proposal(self.queue_dir, b) - write_response(self.queue_dir, Response( + write_proposal(a) + write_proposal(b) + write_response(self.slug, Response( proposal_id=a.id, status=STATUS_APPROVED, notes="", )) - pending = list_pending_proposals(self.queue_dir) + pending = list_pending_proposals(self.slug) self.assertEqual([b.id], [p.id for p in pending]) - def test_list_pending_returns_empty_for_missing_dir(self): - self.assertEqual([], list_pending_proposals(self.queue_dir / "nope")) + def test_list_pending_returns_empty_for_missing_slug(self): + self.assertEqual([], list_pending_proposals("nope")) def test_list_pending_sorted_by_arrival(self): # Fabricate two with explicit timestamps. @@ -154,30 +165,30 @@ class TestQueueIO(unittest.TestCase): now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc), ) # Write in reverse order. - write_proposal(self.queue_dir, b) - write_proposal(self.queue_dir, a) - ordered = list_pending_proposals(self.queue_dir) + write_proposal(b) + write_proposal(a) + ordered = list_pending_proposals(self.slug) self.assertEqual([a.id, b.id], [p.id for p in ordered]) def test_write_and_read_response(self): r = Response(proposal_id="xyz", status=STATUS_REJECTED, notes="no") - write_response(self.queue_dir, r) - self.assertEqual(r, read_response(self.queue_dir, "xyz")) + write_response(self.slug, r) + self.assertEqual(r, read_response(self.slug, "xyz")) def test_wait_for_response_returns_when_file_appears(self): p = _proposal() - write_proposal(self.queue_dir, p) + write_proposal(p) def write_after_delay(): time.sleep(0.05) - write_response(self.queue_dir, Response( + write_response(self.slug, Response( proposal_id=p.id, status=STATUS_APPROVED, notes="ok", )) t = threading.Thread(target=write_after_delay) t.start() try: - r = wait_for_response(self.queue_dir, p.id, poll_interval=0.01) + r = wait_for_response(self.slug, p.id, poll_interval=0.01) finally: t.join() self.assertEqual(STATUS_APPROVED, r.status) @@ -187,25 +198,24 @@ class TestQueueIO(unittest.TestCase): deadline = time.monotonic() + 0.05 with self.assertRaises(TimeoutError): wait_for_response( - self.queue_dir, "never", + self.slug, "never", poll_interval=0.01, deadline=deadline, ) - def test_archive_proposal_moves_both_files(self): + def test_archive_proposal_hides_rows(self): p = _proposal() - write_proposal(self.queue_dir, p) - write_response(self.queue_dir, Response( + write_proposal(p) + write_response(self.slug, Response( proposal_id=p.id, status=STATUS_APPROVED, notes="", )) - archive_proposal(self.queue_dir, p.id) - self.assertFalse((self.queue_dir / f"{p.id}.proposal.json").exists()) - self.assertFalse((self.queue_dir / f"{p.id}.response.json").exists()) - self.assertTrue((self.queue_dir / "processed" / f"{p.id}.proposal.json").exists()) - self.assertTrue((self.queue_dir / "processed" / f"{p.id}.response.json").exists()) + archive_proposal(self.slug, p.id) + self.assertEqual([], list_pending_proposals(self.slug)) + with self.assertRaises(FileNotFoundError): + read_response(self.slug, p.id) def test_archive_is_idempotent_on_missing_files(self): # Should not raise. - archive_proposal(self.queue_dir, "nope") + archive_proposal(self.slug, "nope") class TestAuditLog(unittest.TestCase): @@ -237,6 +247,7 @@ class TestAuditLog(unittest.TestCase): diff="--- before\n+++ after\n", ) path = write_audit_entry(e) + self.assertEqual(host_db_path(), path) self.assertEqual(0o600, path.stat().st_mode & 0o777) loaded = read_audit_entries("cred-proxy", "dev") self.assertEqual([e], loaded) @@ -252,12 +263,13 @@ class TestAuditLog(unittest.TestCase): justification="", diff="", )) - path = audit_log_path("egress", "dev") - with path.open() as f: - lines = [line for line in f if line.strip()] - self.assertEqual(3, len(lines)) - for line in lines: - self.assertTrue(json.loads(line)) # each line is valid JSON + entries = read_audit_entries("egress", "dev") + self.assertEqual(3, len(entries)) + self.assertEqual( + ["2026-05-25T12:00:00+00:00", "2026-05-25T12:00:01+00:00", + "2026-05-25T12:00:02+00:00"], + [entry.timestamp for entry in entries], + ) def test_separate_logs_per_component_slug(self): write_audit_entry(AuditEntry( @@ -379,7 +391,7 @@ class TestSupervisePrepare(unittest.TestCase): def test_prepare_creates_queue(self): plan = _StubSupervise().prepare("dev", self.stage_dir) - self.assertTrue(plan.queue_dir.is_dir()) + self.assertTrue(plan.db_path.is_file()) self.assertEqual("dev", plan.slug) self.assertEqual("", plan.internal_network) diff --git a/tests/unit/test_supervise_cli.py b/tests/unit/test_supervise_cli.py index 47de267..836a221 100644 --- a/tests/unit/test_supervise_cli.py +++ b/tests/unit/test_supervise_cli.py @@ -77,9 +77,7 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase): def test_walks_all_slug_subdirs(self): for slug in ("dev", "api"): - qdir = supervise.queue_dir_for_slug(slug) - qdir.mkdir(parents=True) - supervise.write_proposal(qdir, _proposal(slug=slug)) + supervise.write_proposal(_proposal(slug=slug)) pending = supervise_cli.discover_pending() self.assertEqual({"dev", "api"}, {qp.proposal.bottle_slug for qp in pending}) @@ -97,18 +95,14 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase): now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc), ) for p in (late, early): - qdir = supervise.queue_dir_for_slug(p.bottle_slug) - qdir.mkdir(parents=True, exist_ok=True) - supervise.write_proposal(qdir, p) + supervise.write_proposal(p) pending = supervise_cli.discover_pending() self.assertEqual([early.id, late.id], [qp.proposal.id for qp in pending]) def test_excludes_already_responded(self): p = _proposal() - qdir = supervise.queue_dir_for_slug("dev") - qdir.mkdir(parents=True) - supervise.write_proposal(qdir, p) - supervise.write_response(qdir, supervise.Response( + supervise.write_proposal(p) + supervise.write_response("dev", supervise.Response( proposal_id=p.id, status=STATUS_APPROVED, notes="", )) self.assertEqual([], supervise_cli.discover_pending()) @@ -123,10 +117,8 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): def _enqueue(self, tool: str = TOOL_EGRESS_ALLOW): p = _proposal(tool=tool) - qdir = supervise.queue_dir_for_slug("dev") - qdir.mkdir(parents=True, exist_ok=True) - supervise.write_proposal(qdir, p) - return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir) + supervise.write_proposal(p) + return supervise_cli.QueuedProposal(proposal=p) def test_approve_writes_response(self): qp = self._enqueue() @@ -135,7 +127,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): return_value=("routes: []\n", "routes:\n - host: example.com\n"), ): supervise_cli.approve(qp) - resp = read_response(qp.queue_dir, qp.proposal.id) + resp = read_response(qp.proposal.bottle_slug, qp.proposal.id) self.assertEqual(STATUS_APPROVED, resp.status) self.assertIsNone(resp.final_file) @@ -150,7 +142,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): final_file="routes:\n - host: edited.example.com\n", notes="tweaked", ) - resp = read_response(qp.queue_dir, qp.proposal.id) + resp = read_response(qp.proposal.bottle_slug, qp.proposal.id) self.assertEqual(STATUS_MODIFIED, resp.status) self.assertEqual("routes:\n - host: edited.example.com\n", resp.final_file) self.assertEqual("tweaked", resp.notes) @@ -158,7 +150,7 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): def test_reject_writes_rejection(self): qp = self._enqueue() supervise_cli.reject(qp, reason="nope") - resp = read_response(qp.queue_dir, qp.proposal.id) + resp = read_response(qp.proposal.bottle_slug, qp.proposal.id) self.assertEqual(STATUS_REJECTED, resp.status) self.assertEqual("nope", resp.notes) @@ -181,36 +173,33 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): def test_approve_gitleaks_allow_leaves_response_for_gate(self): qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW) supervise_cli.approve(qp, notes="dummy fixture") - # Gate polls the queue dir for the response; TUI must not archive it. - resp = read_response(qp.queue_dir, qp.proposal.id) + # Gate polls the DB for the response; TUI must not archive it. + resp = read_response(qp.proposal.bottle_slug, qp.proposal.id) self.assertEqual(STATUS_APPROVED, resp.status) self.assertEqual("dummy fixture", resp.notes) - self.assertFalse((qp.queue_dir / "processed").exists()) def test_tui_gitleaks_allow_requires_reason(self): qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW) with patch.object(supervise_cli, "_prompt", return_value=""): status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type] self.assertEqual("approve aborted (empty reason)", status) - self.assertFalse((qp.queue_dir / "processed").exists()) def test_tui_gitleaks_allow_writes_reason(self): qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW) with patch.object(supervise_cli, "_prompt", return_value="test fixture"): status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type] self.assertIn("approved gitleaks-allow", status) - resp = read_response(qp.queue_dir, qp.proposal.id) + resp = read_response(qp.proposal.bottle_slug, qp.proposal.id) self.assertEqual("test fixture", resp.notes) def test_approve_token_allow_leaves_response_for_egress(self): qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW) supervise_cli.approve(qp, notes="false positive") - # The egress addon polls the queue dir for the response; the TUI must + # The egress addon polls the DB for the response; the TUI must # not archive it (the addon archives after reading). - resp = read_response(qp.queue_dir, qp.proposal.id) + resp = read_response(qp.proposal.bottle_slug, qp.proposal.id) self.assertEqual(STATUS_APPROVED, resp.status) self.assertEqual("false positive", resp.notes) - self.assertFalse((qp.queue_dir / "processed").exists()) def test_token_allow_writes_no_audit_log(self): qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW) @@ -222,14 +211,13 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): with patch.object(supervise_cli, "_prompt", return_value=""): status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type] self.assertEqual("approve aborted (empty reason)", status) - self.assertFalse((qp.queue_dir / "processed").exists()) def test_tui_token_allow_writes_reason(self): qp = self._enqueue(tool=TOOL_EGRESS_TOKEN_ALLOW) with patch.object(supervise_cli, "_prompt", return_value="legit"): status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type] self.assertIn("approved egress-token-allow", status) - resp = read_response(qp.queue_dir, qp.proposal.id) + resp = read_response(qp.proposal.bottle_slug, qp.proposal.id) self.assertEqual("legit", resp.notes) def test_suffix_for_token_allow_is_txt(self): diff --git a/tests/unit/test_supervise_edge.py b/tests/unit/test_supervise_edge.py index 7f05450..2f69920 100644 --- a/tests/unit/test_supervise_edge.py +++ b/tests/unit/test_supervise_edge.py @@ -4,7 +4,6 @@ fallback paths.""" from __future__ import annotations -import os import tempfile import time import unittest @@ -12,14 +11,19 @@ from pathlib import Path from unittest.mock import patch from bot_bottle import supervise +from bot_bottle.audit_store import AuditStore +from bot_bottle.queue_store import QueueStore from bot_bottle.supervise import ( + AuditEntry, Proposal, + STATUS_APPROVED, TOOL_EGRESS_ALLOW, list_pending_proposals, read_audit_entries, read_proposal, read_response, wait_for_response, + write_audit_entry, ) @@ -37,58 +41,53 @@ class TestPathHelpers(unittest.TestCase): def test_bot_bottle_root(self) -> None: self.assertTrue(str(supervise.bot_bottle_root()).endswith(".bot-bottle")) - def test_queue_dir_for_slug(self) -> None: - self.assertIn("slug", str(supervise.queue_dir_for_slug("slug"))) - - def test_id_from_non_proposal_filename(self) -> None: - self.assertIsNone(supervise._id_from_proposal_filename(Path("x.response.json"))) - class TestReadMalformed(unittest.TestCase): - def test_read_proposal_non_dict(self) -> None: + def test_read_proposal_missing_row(self) -> None: with tempfile.TemporaryDirectory() as d: - (Path(d) / "p.proposal.json").write_text("[]") - with self.assertRaises(ValueError): - read_proposal(Path(d), "p") + with patch.dict("os.environ", {"HOME": d}), \ + self.assertRaises(FileNotFoundError): + read_proposal("slug", "p") - def test_read_response_non_dict(self) -> None: + def test_read_response_missing_row(self) -> None: with tempfile.TemporaryDirectory() as d: - (Path(d) / "p.response.json").write_text("[]") - with self.assertRaises(ValueError): - read_response(Path(d), "p") + with patch.dict("os.environ", {"HOME": d}), \ + self.assertRaises(FileNotFoundError): + read_response("slug", "p") - def test_list_pending_skips_malformed(self) -> None: + def test_list_pending_reads_db_only(self) -> None: with tempfile.TemporaryDirectory() as d: - qd = Path(d) - (qd / "bad.proposal.json").write_text("{ not json") - (qd / "arr.proposal.json").write_text("[]") - (qd / "incomplete.proposal.json").write_text("{}") # from_dict raises - supervise.write_proposal(qd, _proposal()) # one valid - pending = list_pending_proposals(qd) + with patch.dict("os.environ", {"HOME": d}): + supervise.write_proposal(_proposal()) + pending = list_pending_proposals("slug") self.assertEqual(1, len(pending)) self.assertEqual("slug", pending[0].bottle_slug) def test_list_pending_skips_when_response_present(self) -> None: with tempfile.TemporaryDirectory() as d: - qd = Path(d) - p = _proposal() - supervise.write_proposal(qd, p) - (qd / f"{p.id}.response.json").write_text("{}") # response exists -> skipped - self.assertEqual([], list_pending_proposals(qd)) + with patch.dict("os.environ", {"HOME": d}): + p = _proposal() + supervise.write_proposal(p) + supervise.write_response("slug", supervise.Response( + proposal_id=p.id, + status=STATUS_APPROVED, + notes="", + )) + self.assertEqual([], list_pending_proposals("slug")) class TestWaitForResponse(unittest.TestCase): - def test_malformed_response_then_timeout(self) -> None: + def test_missing_response_times_out(self) -> None: with tempfile.TemporaryDirectory() as d: - (Path(d) / "p.response.json").write_text("{ not json") - with self.assertRaises(TimeoutError): - wait_for_response(Path(d), "p", deadline=time.monotonic()) + with patch.dict("os.environ", {"HOME": d}), \ + self.assertRaises(TimeoutError): + wait_for_response("slug", "p", deadline=time.monotonic()) - def test_incomplete_response_then_timeout(self) -> None: + def test_empty_db_response_does_not_count(self) -> None: with tempfile.TemporaryDirectory() as d: - (Path(d) / "p.response.json").write_text("{}") # dict but from_dict raises - with self.assertRaises(TimeoutError): - wait_for_response(Path(d), "p", deadline=time.monotonic()) + with patch.dict("os.environ", {"HOME": d}), \ + self.assertRaises(TimeoutError): + wait_for_response("slug", "p", deadline=time.monotonic()) class TestReadAuditEntries(unittest.TestCase): @@ -97,35 +96,94 @@ class TestReadAuditEntries(unittest.TestCase): patch.dict("os.environ", {"HOME": home}): self.assertEqual([], read_audit_entries("egress", "nope")) - def test_skips_malformed_lines(self) -> None: + def test_reads_entries_from_db(self) -> None: with tempfile.TemporaryDirectory() as home, \ patch.dict("os.environ", {"HOME": home}): - path = supervise.audit_log_path("egress", "slug") - path.parent.mkdir(parents=True, exist_ok=True) - valid = ( - '{"timestamp": "t", "bottle_slug": "slug", "component": "egress",' - ' "operator_action": "approve", "operator_notes": "",' - ' "justification": "", "diff": ""}' - ) - path.write_text( - "\n" # blank line skipped - "{ not json\n" # JSONDecodeError skipped - "[]\n" # not a dict skipped - "{}\n" # missing fields -> ValueError skipped - + valid + "\n" - ) + write_audit_entry(AuditEntry( + timestamp="t", + bottle_slug="slug", + component="egress", + operator_action="approve", + operator_notes="", + justification="", + diff="", + )) + write_audit_entry(AuditEntry( + timestamp="t", + bottle_slug="other", + component="egress", + operator_action="reject", + operator_notes="", + justification="", + diff="", + )) entries = read_audit_entries("egress", "slug") self.assertEqual(1, len(entries)) self.assertEqual("approve", entries[0].operator_action) + def test_legacy_audit_log_file_does_not_count(self) -> None: + with tempfile.TemporaryDirectory() as home, \ + patch.dict("os.environ", {"HOME": home}): + path = supervise.audit_log_path("egress", "slug") + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text( + '{"timestamp": "t", "bottle_slug": "slug", "component": "egress",' + ' "operator_action": "approve", "operator_notes": "",' + ' "justification": "", "diff": ""}\n' + ) + entries = read_audit_entries("egress", "slug") + self.assertEqual([], entries) -class TestFlockFallback(unittest.TestCase): - def test_flock_on_closed_fd_is_swallowed(self) -> None: - # flock on a closed fd raises OSError(EBADF), which the helpers swallow. - fd = os.open(os.devnull, os.O_RDONLY) - os.close(fd) - supervise._try_flock(fd) - supervise._try_funlock(fd) + +class TestStoreGuardBranches(unittest.TestCase): + """Direct QueueStore / AuditStore construction and early-return guard branches.""" + + def test_queue_store_explicit_db_path(self): + with tempfile.TemporaryDirectory() as d: + db = Path(d) / "q.db" + store = QueueStore("key", db_path=db) + self.assertTrue(db.is_file()) + self.assertEqual(db, store.db_path) + + def test_queue_store_missing_db_list_pending_returns_empty(self): + with tempfile.TemporaryDirectory() as d: + db = Path(d) / "q.db" + store = QueueStore("key", db_path=db) + db.unlink() + self.assertEqual([], store.list_pending_proposals()) + + def test_queue_store_missing_db_list_all_returns_empty(self): + with tempfile.TemporaryDirectory() as d: + db = Path(d) / "q.db" + store = QueueStore("key", db_path=db) + db.unlink() + self.assertEqual([], store.list_all_pending_proposals()) + + def test_queue_store_missing_db_archive_is_noop(self): + with tempfile.TemporaryDirectory() as d: + db = Path(d) / "q.db" + store = QueueStore("key", db_path=db) + db.unlink() + store.archive_proposal("anything") # must not raise + + def test_queue_store_chmod_oserror_is_swallowed(self): + with tempfile.TemporaryDirectory() as d: + db = Path(d) / "q.db" + with patch("pathlib.Path.chmod", side_effect=OSError("ro")): + QueueStore("key", db_path=db) # must not raise + + def test_audit_store_missing_db_read_returns_empty(self): + with tempfile.TemporaryDirectory() as d: + db = Path(d) / "a.db" + store = AuditStore(db_path=db) + db.unlink() + self.assertEqual([], store.read_audit_entries("egress", "slug")) + + def test_audit_store_chmod_oserror_is_swallowed(self): + with tempfile.TemporaryDirectory() as d: + db = Path(d) / "a.db" + with patch("pathlib.Path.chmod", side_effect=OSError("ro")): + AuditStore(db_path=db) # must not raise if __name__ == "__main__": diff --git a/tests/unit/test_supervise_server.py b/tests/unit/test_supervise_server.py index 0eb11da..e41894f 100644 --- a/tests/unit/test_supervise_server.py +++ b/tests/unit/test_supervise_server.py @@ -112,7 +112,7 @@ class TestRpcErrorTaxonomy(unittest.TestCase): validate_proposed_file(_sv.TOOL_EGRESS_ALLOW, "routes: nope\n") def test_unknown_tool_in_tools_call_is_client_error(self): - config = ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")) + config = ServerConfig(bottle_slug="dev") with self.assertRaises(_RpcClientError) as cm: handle_tools_call({"name": "no-such-tool", "arguments": {}}, config) self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code) @@ -122,9 +122,9 @@ class TestRpcInternalErrorOnIoFailure(unittest.TestCase): def test_write_proposal_os_error_raises_internal(self): config = ServerConfig( bottle_slug="dev", - queue_dir=Path("/dev/null/cannot-exist"), ) - with self.assertRaises(_RpcInternalError) as cm: + with patch.object(_sv, "write_proposal", side_effect=OSError("disk full")), \ + self.assertRaises(_RpcInternalError) as cm: handle_tools_call( { "name": _sv.TOOL_EGRESS_ALLOW, @@ -265,21 +265,31 @@ class TestHandleToolsList(unittest.TestCase): class TestHandleToolsCall(unittest.TestCase): def setUp(self): self._tmp = tempfile.TemporaryDirectory(prefix="supervise-server-test.") - self.queue_dir = Path(self._tmp.name) - self.config = ServerConfig(bottle_slug="dev", queue_dir=self.queue_dir) + self._home_patch = self._patch_home(Path(self._tmp.name)) + self.config = ServerConfig(bottle_slug="dev") def tearDown(self): + self._home_patch() self._tmp.cleanup() + def _patch_home(self, fake_home: Path): + original = _sv.bot_bottle_root + + def fake_root() -> Path: + return fake_home / ".bot-bottle" + + _sv.bot_bottle_root = fake_root # type: ignore[assignment] + return lambda: setattr(_sv, "bot_bottle_root", original) + def _respond_when_proposal_appears(self, status: str, notes: str = "") -> threading.Thread: """Background thread: poll the queue for a fresh proposal, write a matching response. Returns the thread so the test can join it.""" def runner(): for _ in range(200): - pending = _sv.list_pending_proposals(self.queue_dir) + pending = _sv.list_pending_proposals("dev") if pending: p = pending[0] - _sv.write_response(self.queue_dir, _sv.Response( + _sv.write_response("dev", _sv.Response( proposal_id=p.id, status=status, notes=notes, )) return @@ -412,15 +422,11 @@ class TestHandleToolsCall(unittest.TestCase): finally: responder.join() # No pending proposals left after archive. - self.assertEqual([], _sv.list_pending_proposals(self.queue_dir)) - # Both files moved to processed/. - processed = list((self.queue_dir / "processed").glob("*.json")) - self.assertEqual(2, len(processed)) + self.assertEqual([], _sv.list_pending_proposals("dev")) def test_pending_response_times_out_without_archive(self): config = ServerConfig( bottle_slug="dev", - queue_dir=self.queue_dir, response_timeout_seconds=0.05, ) result = handle_tools_call( @@ -438,8 +444,7 @@ class TestHandleToolsCall(unittest.TestCase): text = result["content"][0]["text"] # type: ignore[index] self.assertIn("status: pending", text) self.assertIn("proposal remains queued", text) - self.assertEqual(1, len(_sv.list_pending_proposals(self.queue_dir))) - self.assertFalse((self.queue_dir / "processed").exists()) + self.assertEqual(1, len(_sv.list_pending_proposals("dev"))) class TestHandleListEgressRoutes(unittest.TestCase): @@ -461,7 +466,7 @@ class TestHandleListEgressRoutes(unittest.TestCase): with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()): result = handle_list_egress_routes( {}, - ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")), + ServerConfig(bottle_slug="dev"), ) self.assertFalse(result["isError"]) # type: ignore[index] @@ -476,7 +481,7 @@ class TestHandleListEgressRoutes(unittest.TestCase): with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()): result = handle_list_egress_routes( {}, - ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")), + ServerConfig(bottle_slug="dev"), ) self.assertTrue(result["isError"]) # type: ignore[index] @@ -544,7 +549,6 @@ class TestHttpEndToEnd(unittest.TestCase): def setUp(self): self._tmp = tempfile.TemporaryDirectory(prefix="supervise-http-test.") - self.queue_dir = Path(self._tmp.name) # Pick a random port by binding to :0 first. import socket s = socket.socket() @@ -552,7 +556,7 @@ class TestHttpEndToEnd(unittest.TestCase): self.port = s.getsockname()[1] s.close() self.server = MCPServer(("127.0.0.1", self.port), MCPHandler) - self.server.config = ServerConfig(bottle_slug="dev", queue_dir=self.queue_dir) + self.server.config = ServerConfig(bottle_slug="dev") self.thread = threading.Thread( target=self.server.serve_forever, daemon=True, )