diff --git a/bot_bottle/supervise.py b/bot_bottle/supervise.py index 12d4b6c..ddd6c4f 100644 --- a/bot_bottle/supervise.py +++ b/bot_bottle/supervise.py @@ -34,8 +34,7 @@ from __future__ import annotations import dataclasses import difflib import hashlib -import json -import os +import sqlite3 import time import uuid from abc import ABC @@ -88,6 +87,8 @@ ACTION_OPERATOR_EDIT = "operator-edit" QUEUE_DIR_IN_CONTAINER = "/run/supervise/queue" DEFAULT_POLL_INTERVAL_SEC = 0.5 +HOST_DB_FILENAME = "bot-bottle.db" +QUEUE_DB_FILENAME = "supervise.db" # --- Paths ----------------------------------------------------------------- @@ -109,6 +110,14 @@ def audit_log_path(component: str, slug: str) -> Path: return audit_dir() / f"{component}-{slug}.log" +def host_db_path() -> Path: + return bot_bottle_root() / HOST_DB_FILENAME + + +def queue_db_path(queue_dir: Path) -> Path: + return queue_dir / QUEUE_DB_FILENAME + + # --- Dataclasses ----------------------------------------------------------- @@ -226,83 +235,29 @@ class AuditEntry: # --- Queue I/O ------------------------------------------------------------- -def _proposal_filename(proposal_id: str) -> str: - return f"{proposal_id}.proposal.json" - - -def _response_filename(proposal_id: str) -> str: - return f"{proposal_id}.response.json" - - -def _id_from_proposal_filename(path: Path) -> str | None: - name = path.name - if not name.endswith(".proposal.json"): - return None - return name[: -len(".proposal.json")] - - def write_proposal(queue_dir: Path, proposal: Proposal) -> Path: - """Persist `proposal` as JSON in the queue dir, mode 0o600. + """Persist `proposal` in the queue database, mode 0o600. 
Directory is created if missing.""" - queue_dir.mkdir(parents=True, exist_ok=True) - path = queue_dir / _proposal_filename(proposal.id) - payload = json.dumps(proposal.to_dict(), indent=2) + "\n" - _atomic_write(path, payload, mode=0o600) - return path + return _QueueStore(queue_dir).write_proposal(proposal) def read_proposal(queue_dir: Path, proposal_id: str) -> Proposal: - path = queue_dir / _proposal_filename(proposal_id) - with path.open() as f: - raw = json.load(f) - if not isinstance(raw, dict): - raise ValueError(f"{path}: top-level must be an object") - return Proposal.from_dict(raw) + return _QueueStore(queue_dir).read_proposal(proposal_id) def list_pending_proposals(queue_dir: Path) -> list[Proposal]: """All proposals in `queue_dir` that do not yet have a matching - response file. Sorted by `arrival_timestamp` so the operator + response. Sorted by `arrival_timestamp` so the operator sees the queue FIFO.""" - if not queue_dir.is_dir(): - return [] - out: list[Proposal] = [] - for path in sorted(queue_dir.glob("*.proposal.json")): - proposal_id = _id_from_proposal_filename(path) - if proposal_id is None: - continue - if (queue_dir / _response_filename(proposal_id)).exists(): - continue - try: - with path.open() as f: - raw = json.load(f) - except (OSError, json.JSONDecodeError): - continue - if not isinstance(raw, dict): - continue - try: - out.append(Proposal.from_dict(raw)) - except (KeyError, ValueError): - continue - out.sort(key=lambda p: p.arrival_timestamp) - return out + return _QueueStore(queue_dir).list_pending_proposals() def write_response(queue_dir: Path, response: Response) -> Path: - queue_dir.mkdir(parents=True, exist_ok=True) - path = queue_dir / _response_filename(response.proposal_id) - payload = json.dumps(response.to_dict(), indent=2) + "\n" - _atomic_write(path, payload, mode=0o600) - return path + return _QueueStore(queue_dir).write_response(response) def read_response(queue_dir: Path, proposal_id: str) -> Response: - path = queue_dir 
/ _response_filename(proposal_id) - with path.open() as f: - raw = json.load(f) - if not isinstance(raw, dict): - raise ValueError(f"{path}: top-level must be an object") - return Response.from_dict(raw) + return _QueueStore(queue_dir).read_response(proposal_id) def wait_for_response( @@ -317,90 +272,35 @@ def wait_for_response( which the wait raises TimeoutError. None waits forever — the natural shape, since the operator's response time is unbounded. - Polls the filesystem so the implementation stays portable and - stdlib-only.""" - path = queue_dir / _response_filename(proposal_id) + Polls SQLite so the implementation stays portable and stdlib-only.""" + store = _QueueStore(queue_dir) while True: - if path.exists(): - try: - with path.open() as f: - raw = json.load(f) - except (OSError, json.JSONDecodeError): - raw = None - if isinstance(raw, dict): - try: - return Response.from_dict(raw) - except (KeyError, ValueError): - pass + try: + return store.read_response(proposal_id) + except FileNotFoundError: + pass if deadline is not None and time.monotonic() >= deadline: raise TimeoutError(f"no response for proposal {proposal_id!r}") time.sleep(poll_interval) def archive_proposal(queue_dir: Path, proposal_id: str) -> None: - """Move both proposal and response files to `/processed/`. - Idempotent — missing files are silently skipped.""" - processed = queue_dir / "processed" - processed.mkdir(parents=True, exist_ok=True) - for name in (_proposal_filename(proposal_id), _response_filename(proposal_id)): - src = queue_dir / name - if src.exists(): - src.rename(processed / name) + """Mark both proposal and response rows processed. + Idempotent — missing rows are silently skipped.""" + _QueueStore(queue_dir).archive_proposal(proposal_id) # --- Audit log ------------------------------------------------------------- def write_audit_entry(entry: AuditEntry) -> Path: - """Append `entry` as one JSON-Lines record to the per-bottle - audit log. 
# --- SQLite storage --------------------------------------------------------


def _create_private_db_file(db_path: Path) -> None:
    """Create `db_path` as an empty file with mode 0o600 if it is missing.

    sqlite3.connect() would otherwise create the file with umask-default
    permissions, leaving it readable by other users until the follow-up
    chmod. An existing file is left untouched."""
    try:
        db_path.touch(mode=0o600, exist_ok=False)
    except OSError:
        # FileExistsError is the normal case. Anything else is left for
        # sqlite3.connect() to report with a more useful error.
        pass


def _run_and_close(
    conn: sqlite3.Connection,
    statements: tuple[tuple[str, tuple[object, ...]], ...],
) -> list[sqlite3.Row]:
    """Execute `statements` on `conn` as one transaction, then close it.

    Each statement is a `(sql, params)` pair. Returns the rows produced
    by the last statement (an empty list for writes and DDL). The
    transaction is committed on success and rolled back on error."""
    try:
        rows: list[sqlite3.Row] = []
        # `with conn` only commits / rolls back; it does NOT close the
        # connection, hence the explicit close() in `finally`.
        with conn:
            for sql, params in statements:
                rows = conn.execute(sql, params).fetchall()
        return rows
    finally:
        conn.close()


class _QueueStore:
    """Proposal/response queue backed by the SQLite file returned by
    `queue_db_path(queue_dir)`. Every call opens a short-lived
    connection and closes it before returning."""

    def __init__(self, queue_dir: Path) -> None:
        self.db_path = queue_db_path(queue_dir)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init()

    def write_proposal(self, proposal: Proposal) -> Path:
        """Insert (or overwrite by id) `proposal` as a non-archived row.
        Returns the database path."""
        self._run((
            """
            INSERT OR REPLACE INTO supervise_proposals (
                id, bottle_slug, tool, proposed_file, justification,
                arrival_timestamp, current_file_hash, archived
            ) VALUES (?, ?, ?, ?, ?, ?, ?, 0)
            """,
            (
                proposal.id,
                proposal.bottle_slug,
                proposal.tool,
                proposal.proposed_file,
                proposal.justification,
                proposal.arrival_timestamp,
                proposal.current_file_hash,
            ),
        ))
        self._chmod()
        return self.db_path

    def read_proposal(self, proposal_id: str) -> Proposal:
        """Return the non-archived proposal with `proposal_id`.
        Raises FileNotFoundError when there is no such row."""
        rows = self._run((
            """
            SELECT * FROM supervise_proposals
            WHERE id = ? AND archived = 0
            """,
            (proposal_id,),
        ))
        if not rows:
            raise FileNotFoundError(proposal_id)
        return _proposal_from_row(rows[0])

    def list_pending_proposals(self) -> list[Proposal]:
        """Return non-archived proposals that have no non-archived
        response, ordered by arrival timestamp, then id."""
        # Defensive: only reachable if the file vanished after __init__.
        if not self.db_path.is_file():
            return []
        rows = self._run((
            """
            SELECT p.* FROM supervise_proposals p
            WHERE p.archived = 0
              AND NOT EXISTS (
                SELECT 1 FROM supervise_responses r
                WHERE r.proposal_id = p.id AND r.archived = 0
              )
            ORDER BY p.arrival_timestamp, p.id
            """,
            (),
        ))
        return [_proposal_from_row(row) for row in rows]

    def write_response(self, response: Response) -> Path:
        """Insert (or overwrite by proposal id) `response` as a
        non-archived row. Returns the database path."""
        self._run((
            """
            INSERT OR REPLACE INTO supervise_responses (
                proposal_id, status, notes, final_file, archived
            ) VALUES (?, ?, ?, ?, 0)
            """,
            (
                response.proposal_id,
                response.status,
                response.notes,
                response.final_file,
            ),
        ))
        self._chmod()
        return self.db_path

    def read_response(self, proposal_id: str) -> Response:
        """Return the non-archived response for `proposal_id`.
        Raises FileNotFoundError when there is no such row."""
        rows = self._run((
            """
            SELECT * FROM supervise_responses
            WHERE proposal_id = ? AND archived = 0
            """,
            (proposal_id,),
        ))
        if not rows:
            raise FileNotFoundError(proposal_id)
        return _response_from_row(rows[0])

    def archive_proposal(self, proposal_id: str) -> None:
        """Flag the proposal row and its response row as archived, in a
        single transaction. Unknown ids are a no-op."""
        if not self.db_path.is_file():
            return
        self._run(
            (
                "UPDATE supervise_proposals SET archived = 1 WHERE id = ?",
                (proposal_id,),
            ),
            (
                """
                UPDATE supervise_responses SET archived = 1
                WHERE proposal_id = ?
                """,
                (proposal_id,),
            ),
        )

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection whose rows are `sqlite3.Row` objects.
        The caller owns the connection and must close it."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def _run(
        self, *statements: tuple[str, tuple[object, ...]]
    ) -> list[sqlite3.Row]:
        """Run `statements` in one transaction on a fresh connection
        and return the rows of the last one."""
        return _run_and_close(self._connect(), statements)

    def _init(self) -> None:
        """Create the database file (0o600) and both tables if missing."""
        _create_private_db_file(self.db_path)
        self._run(
            (
                """
                CREATE TABLE IF NOT EXISTS supervise_proposals (
                    id TEXT PRIMARY KEY,
                    bottle_slug TEXT NOT NULL,
                    tool TEXT NOT NULL,
                    proposed_file TEXT NOT NULL,
                    justification TEXT NOT NULL,
                    arrival_timestamp TEXT NOT NULL,
                    current_file_hash TEXT NOT NULL,
                    archived INTEGER NOT NULL DEFAULT 0
                )
                """,
                (),
            ),
            (
                """
                CREATE TABLE IF NOT EXISTS supervise_responses (
                    proposal_id TEXT PRIMARY KEY,
                    status TEXT NOT NULL,
                    notes TEXT NOT NULL,
                    final_file TEXT,
                    archived INTEGER NOT NULL DEFAULT 0
                )
                """,
                (),
            ),
        )
        self._chmod()

    def _chmod(self) -> None:
        """Best-effort: restrict the database file to its owner."""
        try:
            self.db_path.chmod(0o600)
        except OSError:
            pass


class _AuditStore:
    """Append-only audit table stored in the SQLite file at `db_path`
    (default: `host_db_path()`). Every call opens a short-lived
    connection and closes it before returning."""

    def __init__(self, db_path: Path | None = None) -> None:
        self.db_path = db_path or host_db_path()
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init()

    def write_audit_entry(self, entry: AuditEntry) -> Path:
        """Append `entry` as a new row. Returns the database path."""
        self._run((
            """
            INSERT INTO supervise_audit_entries (
                timestamp, bottle_slug, component, operator_action,
                operator_notes, justification, diff
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (
                entry.timestamp,
                entry.bottle_slug,
                entry.component,
                entry.operator_action,
                entry.operator_notes,
                entry.justification,
                entry.diff,
            ),
        ))
        self._chmod()
        return self.db_path

    def read_audit_entries(self, component: str, slug: str) -> list[AuditEntry]:
        """Return the entries for `component` + `slug` in insertion
        order (by autoincrement id)."""
        # Defensive: only reachable if the file vanished after __init__.
        if not self.db_path.is_file():
            return []
        rows = self._run((
            """
            SELECT * FROM supervise_audit_entries
            WHERE component = ? AND bottle_slug = ?
            ORDER BY id
            """,
            (component, slug),
        ))
        return [_audit_entry_from_row(row) for row in rows]

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection whose rows are `sqlite3.Row` objects.
        The caller owns the connection and must close it."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def _run(
        self, *statements: tuple[str, tuple[object, ...]]
    ) -> list[sqlite3.Row]:
        """Run `statements` in one transaction on a fresh connection
        and return the rows of the last one."""
        return _run_and_close(self._connect(), statements)

    def _init(self) -> None:
        """Create the database file (0o600) and the audit table if
        missing."""
        _create_private_db_file(self.db_path)
        self._run((
            """
            CREATE TABLE IF NOT EXISTS supervise_audit_entries (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                bottle_slug TEXT NOT NULL,
                component TEXT NOT NULL,
                operator_action TEXT NOT NULL,
                operator_notes TEXT NOT NULL,
                justification TEXT NOT NULL,
                diff TEXT NOT NULL
            )
            """,
            (),
        ))
        self._chmod()

    def _chmod(self) -> None:
        """Best-effort: restrict the database file to its owner."""
        try:
            self.db_path.chmod(0o600)
        except OSError:
            pass


def _proposal_from_row(row: sqlite3.Row) -> Proposal:
    """Build a Proposal from a `supervise_proposals` row."""
    return Proposal(
        id=row["id"],
        bottle_slug=row["bottle_slug"],
        tool=row["tool"],
        proposed_file=row["proposed_file"],
        justification=row["justification"],
        arrival_timestamp=row["arrival_timestamp"],
        current_file_hash=row["current_file_hash"],
    )


def _response_from_row(row: sqlite3.Row) -> Response:
    """Build a Response from a `supervise_responses` row."""
    return Response(
        proposal_id=row["proposal_id"],
        status=row["status"],
        notes=row["notes"],
        final_file=row["final_file"],
    )


def _audit_entry_from_row(row: sqlite3.Row) -> AuditEntry:
    """Build an AuditEntry from a `supervise_audit_entries` row."""
    return AuditEntry(
        timestamp=row["timestamp"],
        bottle_slug=row["bottle_slug"],
        component=row["component"],
        operator_action=row["operator_action"],
        operator_notes=row["operator_notes"],
        justification=row["justification"],
        diff=row["diff"],
    )
justification=row["justification"], + diff=row["diff"], + ) + + # --- Sidecar plan + abstract lifecycle ------------------------------------- @@ -474,40 +628,6 @@ def _require_str(raw: dict[str, object], key: str) -> str: return value -def _atomic_write(path: Path, content: str, *, mode: int) -> None: - """Atomic: write to a sibling tmp file, fsync, rename.""" - tmp = path.with_suffix(path.suffix + ".tmp") - fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode) - try: - os.write(fd, content.encode("utf-8")) - os.fsync(fd) - finally: - os.close(fd) - os.replace(tmp, path) - - -try: - import fcntl as _fcntl - - def _try_flock(fd: int) -> None: # type: ignore[reportRedeclaration] - try: - _fcntl.flock(fd, _fcntl.LOCK_EX) - except OSError: - pass - - def _try_funlock(fd: int) -> None: # type: ignore[reportRedeclaration] - try: - _fcntl.flock(fd, _fcntl.LOCK_UN) - except OSError: - pass -except ImportError: # pragma: no cover — Windows path - def _try_flock(fd: int) -> None: # noqa: F841 — Windows fallback - return None - - def _try_funlock(fd: int) -> None: # noqa: F841 — Windows fallback - return None - - __all__ = [ "ACTION_OPERATOR_EDIT", "AuditEntry", @@ -536,7 +656,9 @@ __all__ = [ "audit_dir", "audit_log_path", "bot_bottle_root", + "host_db_path", "list_pending_proposals", + "queue_db_path", "queue_dir_for_slug", "read_audit_entries", "read_proposal", diff --git a/tests/unit/test_supervise.py b/tests/unit/test_supervise.py index dfe8bd4..0e97d32 100644 --- a/tests/unit/test_supervise.py +++ b/tests/unit/test_supervise.py @@ -1,6 +1,5 @@ """Unit: supervise queue + audit log + diff helpers (PRD 0013).""" -import json import tempfile import threading import time @@ -19,8 +18,9 @@ from bot_bottle.supervise import ( TOOL_EGRESS_ALLOW, TOOL_GITLEAKS_ALLOW, archive_proposal, - audit_log_path, + host_db_path, list_pending_proposals, + queue_db_path, read_audit_entries, read_proposal, read_response, @@ -121,6 +121,7 @@ class TestQueueIO(unittest.TestCase): p = 
_proposal() path = write_proposal(self.queue_dir, p) self.assertTrue(path.exists()) + self.assertEqual(queue_db_path(self.queue_dir), path) self.assertEqual(0o600, path.stat().st_mode & 0o777) loaded = read_proposal(self.queue_dir, p.id) self.assertEqual(p, loaded) @@ -198,10 +199,9 @@ class TestQueueIO(unittest.TestCase): proposal_id=p.id, status=STATUS_APPROVED, notes="", )) archive_proposal(self.queue_dir, p.id) - self.assertFalse((self.queue_dir / f"{p.id}.proposal.json").exists()) - self.assertFalse((self.queue_dir / f"{p.id}.response.json").exists()) - self.assertTrue((self.queue_dir / "processed" / f"{p.id}.proposal.json").exists()) - self.assertTrue((self.queue_dir / "processed" / f"{p.id}.response.json").exists()) + self.assertEqual([], list_pending_proposals(self.queue_dir)) + with self.assertRaises(FileNotFoundError): + read_response(self.queue_dir, p.id) def test_archive_is_idempotent_on_missing_files(self): # Should not raise. @@ -237,6 +237,7 @@ class TestAuditLog(unittest.TestCase): diff="--- before\n+++ after\n", ) path = write_audit_entry(e) + self.assertEqual(host_db_path(), path) self.assertEqual(0o600, path.stat().st_mode & 0o777) loaded = read_audit_entries("cred-proxy", "dev") self.assertEqual([e], loaded) @@ -252,12 +253,13 @@ class TestAuditLog(unittest.TestCase): justification="", diff="", )) - path = audit_log_path("egress", "dev") - with path.open() as f: - lines = [line for line in f if line.strip()] - self.assertEqual(3, len(lines)) - for line in lines: - self.assertTrue(json.loads(line)) # each line is valid JSON + entries = read_audit_entries("egress", "dev") + self.assertEqual(3, len(entries)) + self.assertEqual( + ["2026-05-25T12:00:00+00:00", "2026-05-25T12:00:01+00:00", + "2026-05-25T12:00:02+00:00"], + [entry.timestamp for entry in entries], + ) def test_separate_logs_per_component_slug(self): write_audit_entry(AuditEntry( diff --git a/tests/unit/test_supervise_server.py b/tests/unit/test_supervise_server.py index 
0eb11da..088dd39 100644 --- a/tests/unit/test_supervise_server.py +++ b/tests/unit/test_supervise_server.py @@ -413,9 +413,7 @@ class TestHandleToolsCall(unittest.TestCase): responder.join() # No pending proposals left after archive. self.assertEqual([], _sv.list_pending_proposals(self.queue_dir)) - # Both files moved to processed/. - processed = list((self.queue_dir / "processed").glob("*.json")) - self.assertEqual(2, len(processed)) + self.assertFalse((self.queue_dir / "processed").exists()) def test_pending_response_times_out_without_archive(self): config = ServerConfig(