feat(supervise): store queue and audit data in sqlite
lint / lint (push) Failing after 1m53s
test / unit (pull_request) Failing after 45s
test / integration (pull_request) Successful in 17s
test / coverage (pull_request) Failing after 50s

This commit is contained in:
2026-07-01 16:56:23 +00:00
parent 9af02831ea
commit 08918f9a8a
3 changed files with 302 additions and 180 deletions
+287 -165
View File
@@ -34,8 +34,7 @@ from __future__ import annotations
import dataclasses import dataclasses
import difflib import difflib
import hashlib import hashlib
import json import sqlite3
import os
import time import time
import uuid import uuid
from abc import ABC from abc import ABC
@@ -88,6 +87,8 @@ ACTION_OPERATOR_EDIT = "operator-edit"
QUEUE_DIR_IN_CONTAINER = "/run/supervise/queue" QUEUE_DIR_IN_CONTAINER = "/run/supervise/queue"
DEFAULT_POLL_INTERVAL_SEC = 0.5 DEFAULT_POLL_INTERVAL_SEC = 0.5
HOST_DB_FILENAME = "bot-bottle.db"
QUEUE_DB_FILENAME = "supervise.db"
# --- Paths ----------------------------------------------------------------- # --- Paths -----------------------------------------------------------------
@@ -109,6 +110,14 @@ def audit_log_path(component: str, slug: str) -> Path:
return audit_dir() / f"{component}-{slug}.log" return audit_dir() / f"{component}-{slug}.log"
def host_db_path() -> Path:
    """Return the path of the host-side SQLite database.

    The file is named `HOST_DB_FILENAME` and sits directly under
    `bot_bottle_root()`. Nothing is created on disk by this call.
    """
    root = bot_bottle_root()
    return root / HOST_DB_FILENAME
def queue_db_path(queue_dir: Path) -> Path:
    """Return the path of the queue SQLite database inside `queue_dir`.

    The file is named `QUEUE_DB_FILENAME`. Nothing is created on disk by
    this call.
    """
    db_file = queue_dir / QUEUE_DB_FILENAME
    return db_file
# --- Dataclasses ----------------------------------------------------------- # --- Dataclasses -----------------------------------------------------------
@@ -226,83 +235,29 @@ class AuditEntry:
# --- Queue I/O ------------------------------------------------------------- # --- Queue I/O -------------------------------------------------------------
def _proposal_filename(proposal_id: str) -> str:
return f"{proposal_id}.proposal.json"
def _response_filename(proposal_id: str) -> str:
return f"{proposal_id}.response.json"
def _id_from_proposal_filename(path: Path) -> str | None:
name = path.name
if not name.endswith(".proposal.json"):
return None
return name[: -len(".proposal.json")]
def write_proposal(queue_dir: Path, proposal: Proposal) -> Path: def write_proposal(queue_dir: Path, proposal: Proposal) -> Path:
"""Persist `proposal` as JSON in the queue dir, mode 0o600. """Persist `proposal` in the queue database, mode 0o600.
Directory is created if missing.""" Directory is created if missing."""
queue_dir.mkdir(parents=True, exist_ok=True) return _QueueStore(queue_dir).write_proposal(proposal)
path = queue_dir / _proposal_filename(proposal.id)
payload = json.dumps(proposal.to_dict(), indent=2) + "\n"
_atomic_write(path, payload, mode=0o600)
return path
def read_proposal(queue_dir: Path, proposal_id: str) -> Proposal: def read_proposal(queue_dir: Path, proposal_id: str) -> Proposal:
path = queue_dir / _proposal_filename(proposal_id) return _QueueStore(queue_dir).read_proposal(proposal_id)
with path.open() as f:
raw = json.load(f)
if not isinstance(raw, dict):
raise ValueError(f"{path}: top-level must be an object")
return Proposal.from_dict(raw)
def list_pending_proposals(queue_dir: Path) -> list[Proposal]: def list_pending_proposals(queue_dir: Path) -> list[Proposal]:
"""All proposals in `queue_dir` that do not yet have a matching """All proposals in `queue_dir` that do not yet have a matching
response file. Sorted by `arrival_timestamp` so the operator response. Sorted by `arrival_timestamp` so the operator
sees the queue FIFO.""" sees the queue FIFO."""
if not queue_dir.is_dir(): return _QueueStore(queue_dir).list_pending_proposals()
return []
out: list[Proposal] = []
for path in sorted(queue_dir.glob("*.proposal.json")):
proposal_id = _id_from_proposal_filename(path)
if proposal_id is None:
continue
if (queue_dir / _response_filename(proposal_id)).exists():
continue
try:
with path.open() as f:
raw = json.load(f)
except (OSError, json.JSONDecodeError):
continue
if not isinstance(raw, dict):
continue
try:
out.append(Proposal.from_dict(raw))
except (KeyError, ValueError):
continue
out.sort(key=lambda p: p.arrival_timestamp)
return out
def write_response(queue_dir: Path, response: Response) -> Path: def write_response(queue_dir: Path, response: Response) -> Path:
queue_dir.mkdir(parents=True, exist_ok=True) return _QueueStore(queue_dir).write_response(response)
path = queue_dir / _response_filename(response.proposal_id)
payload = json.dumps(response.to_dict(), indent=2) + "\n"
_atomic_write(path, payload, mode=0o600)
return path
def read_response(queue_dir: Path, proposal_id: str) -> Response: def read_response(queue_dir: Path, proposal_id: str) -> Response:
path = queue_dir / _response_filename(proposal_id) return _QueueStore(queue_dir).read_response(proposal_id)
with path.open() as f:
raw = json.load(f)
if not isinstance(raw, dict):
raise ValueError(f"{path}: top-level must be an object")
return Response.from_dict(raw)
def wait_for_response( def wait_for_response(
@@ -317,90 +272,35 @@ def wait_for_response(
which the wait raises TimeoutError. None waits forever — the which the wait raises TimeoutError. None waits forever — the
natural shape, since the operator's response time is unbounded. natural shape, since the operator's response time is unbounded.
Polls the filesystem so the implementation stays portable and Polls SQLite so the implementation stays portable and stdlib-only."""
stdlib-only.""" store = _QueueStore(queue_dir)
path = queue_dir / _response_filename(proposal_id)
while True: while True:
if path.exists(): try:
try: return store.read_response(proposal_id)
with path.open() as f: except FileNotFoundError:
raw = json.load(f) pass
except (OSError, json.JSONDecodeError):
raw = None
if isinstance(raw, dict):
try:
return Response.from_dict(raw)
except (KeyError, ValueError):
pass
if deadline is not None and time.monotonic() >= deadline: if deadline is not None and time.monotonic() >= deadline:
raise TimeoutError(f"no response for proposal {proposal_id!r}") raise TimeoutError(f"no response for proposal {proposal_id!r}")
time.sleep(poll_interval) time.sleep(poll_interval)
def archive_proposal(queue_dir: Path, proposal_id: str) -> None: def archive_proposal(queue_dir: Path, proposal_id: str) -> None:
"""Move both proposal and response files to `<queue_dir>/processed/`. """Mark both proposal and response rows processed.
Idempotent — missing files are silently skipped.""" Idempotent — missing rows are silently skipped."""
processed = queue_dir / "processed" _QueueStore(queue_dir).archive_proposal(proposal_id)
processed.mkdir(parents=True, exist_ok=True)
for name in (_proposal_filename(proposal_id), _response_filename(proposal_id)):
src = queue_dir / name
if src.exists():
src.rename(processed / name)
# --- Audit log ------------------------------------------------------------- # --- Audit log -------------------------------------------------------------
def write_audit_entry(entry: AuditEntry) -> Path: def write_audit_entry(entry: AuditEntry) -> Path:
"""Append `entry` as one JSON-Lines record to the per-bottle """Append `entry` to the host supervise audit table."""
audit log. Acquires an advisory exclusive lock so concurrent return _AuditStore().write_audit_entry(entry)
writers don't interleave bytes."""
path = audit_log_path(entry.component, entry.bottle_slug)
path.parent.mkdir(parents=True, exist_ok=True)
line = json.dumps(entry.to_dict(), sort_keys=False) + "\n"
fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o600)
try:
_try_flock(fd)
try:
os.write(fd, line.encode("utf-8"))
finally:
_try_funlock(fd)
finally:
os.close(fd)
return path
def read_audit_entries(component: str, slug: str) -> list[AuditEntry]: def read_audit_entries(component: str, slug: str) -> list[AuditEntry]:
"""Load all audit entries for the given component+slug. Empty """Load all audit entries for the given component+slug."""
list if the log doesn't exist.""" return _AuditStore().read_audit_entries(component, slug)
path = audit_log_path(component, slug)
if not path.is_file():
return []
out: list[AuditEntry] = []
with path.open() as f:
for raw_line in f:
raw_line = raw_line.strip()
if not raw_line:
continue
try:
raw = json.loads(raw_line)
except json.JSONDecodeError:
continue
if not isinstance(raw, dict):
continue
try:
out.append(AuditEntry(
timestamp=_require_str(raw, "timestamp"),
bottle_slug=_require_str(raw, "bottle_slug"),
component=_require_str(raw, "component"),
operator_action=_require_str(raw, "operator_action"),
operator_notes=_require_str(raw, "operator_notes"),
justification=_require_str(raw, "justification"),
diff=_require_str(raw, "diff"),
))
except ValueError:
continue
return out
# --- Diff rendering -------------------------------------------------------- # --- Diff rendering --------------------------------------------------------
@@ -426,6 +326,260 @@ def sha256_hex(content: str) -> str:
return hashlib.sha256(content.encode("utf-8")).hexdigest() return hashlib.sha256(content.encode("utf-8")).hexdigest()
# --- SQLite storage --------------------------------------------------------
class _QueueStore:
    """SQLite-backed proposal/response queue kept inside one queue directory.

    The database lives at `queue_db_path(queue_dir)`. Constructing a store
    creates the directory, the database file and both tables when they are
    missing. Every operation opens its own short-lived connection and closes
    it before returning, so no connection outlives a method call.
    """

    def __init__(self, queue_dir: Path) -> None:
        """Resolve the database path under `queue_dir` and ensure the schema."""
        self.db_path = queue_db_path(queue_dir)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init()

    def write_proposal(self, proposal: Proposal) -> Path:
        """Insert `proposal`, replacing any existing row with the same id.

        The row is always written with `archived = 0`. Returns the path of
        the database file.
        """
        self._execute(
            (
                """
                INSERT OR REPLACE INTO supervise_proposals (
                    id, bottle_slug, tool, proposed_file, justification,
                    arrival_timestamp, current_file_hash, archived
                ) VALUES (?, ?, ?, ?, ?, ?, ?, 0)
                """,
                (
                    proposal.id,
                    proposal.bottle_slug,
                    proposal.tool,
                    proposal.proposed_file,
                    proposal.justification,
                    proposal.arrival_timestamp,
                    proposal.current_file_hash,
                ),
            )
        )
        self._chmod()
        return self.db_path

    def read_proposal(self, proposal_id: str) -> Proposal:
        """Return the un-archived proposal with `proposal_id`.

        Raises:
            FileNotFoundError: no un-archived row has that id.
        """
        rows = self._execute(
            (
                """
                SELECT * FROM supervise_proposals
                WHERE id = ? AND archived = 0
                """,
                (proposal_id,),
            )
        )
        if not rows:
            raise FileNotFoundError(proposal_id)
        # `id` is the primary key, so at most one row can match.
        return _proposal_from_row(rows[0])

    def list_pending_proposals(self) -> list[Proposal]:
        """Return un-archived proposals that have no un-archived response.

        Ordered by `arrival_timestamp`, then `id`. Returns an empty list
        when the database file does not exist.
        """
        if not self.db_path.is_file():
            return []
        rows = self._execute(
            (
                """
                SELECT p.* FROM supervise_proposals p
                WHERE p.archived = 0
                AND NOT EXISTS (
                    SELECT 1 FROM supervise_responses r
                    WHERE r.proposal_id = p.id AND r.archived = 0
                )
                ORDER BY p.arrival_timestamp, p.id
                """,
                (),
            )
        )
        return [_proposal_from_row(row) for row in rows]

    def write_response(self, response: Response) -> Path:
        """Insert `response`, replacing any existing row for the same proposal.

        The row is always written with `archived = 0`. Returns the path of
        the database file.
        """
        self._execute(
            (
                """
                INSERT OR REPLACE INTO supervise_responses (
                    proposal_id, status, notes, final_file, archived
                ) VALUES (?, ?, ?, ?, 0)
                """,
                (
                    response.proposal_id,
                    response.status,
                    response.notes,
                    response.final_file,
                ),
            )
        )
        self._chmod()
        return self.db_path

    def read_response(self, proposal_id: str) -> Response:
        """Return the un-archived response for `proposal_id`.

        Raises:
            FileNotFoundError: no un-archived response row exists for it.
        """
        rows = self._execute(
            (
                """
                SELECT * FROM supervise_responses
                WHERE proposal_id = ? AND archived = 0
                """,
                (proposal_id,),
            )
        )
        if not rows:
            raise FileNotFoundError(proposal_id)
        # `proposal_id` is the primary key, so at most one row can match.
        return _response_from_row(rows[0])

    def archive_proposal(self, proposal_id: str) -> None:
        """Flag the proposal row and its response row as archived.

        Both updates run in one transaction. Ids that match no row, and a
        missing database file, are ignored.
        """
        if not self.db_path.is_file():
            return
        self._execute(
            (
                "UPDATE supervise_proposals SET archived = 1 WHERE id = ?",
                (proposal_id,),
            ),
            (
                """
                UPDATE supervise_responses SET archived = 1
                WHERE proposal_id = ?
                """,
                (proposal_id,),
            ),
        )

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection whose rows support access by column name."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def _execute(
        self, *statements: tuple[str, tuple[object, ...]]
    ) -> list[sqlite3.Row]:
        """Run `statements` in one transaction and return the last one's rows.

        Each statement is a `(sql, params)` pair. The transaction commits
        when every statement succeeds and rolls back if one raises.
        """
        conn = self._connect()
        try:
            # `with conn` only commits or rolls back; it never closes the
            # connection, hence the explicit close() in `finally`.
            with conn:
                rows: list[sqlite3.Row] = []
                for sql, params in statements:
                    rows = conn.execute(sql, params).fetchall()
            return rows
        finally:
            conn.close()

    def _init(self) -> None:
        """Create the proposal and response tables if missing, then chmod."""
        self._execute(
            (
                """
                CREATE TABLE IF NOT EXISTS supervise_proposals (
                    id TEXT PRIMARY KEY,
                    bottle_slug TEXT NOT NULL,
                    tool TEXT NOT NULL,
                    proposed_file TEXT NOT NULL,
                    justification TEXT NOT NULL,
                    arrival_timestamp TEXT NOT NULL,
                    current_file_hash TEXT NOT NULL,
                    archived INTEGER NOT NULL DEFAULT 0
                )
                """,
                (),
            ),
            (
                """
                CREATE TABLE IF NOT EXISTS supervise_responses (
                    proposal_id TEXT PRIMARY KEY,
                    status TEXT NOT NULL,
                    notes TEXT NOT NULL,
                    final_file TEXT,
                    archived INTEGER NOT NULL DEFAULT 0
                )
                """,
                (),
            ),
        )
        self._chmod()

    def _chmod(self) -> None:
        """Restrict the database file to mode 0o600, ignoring OS errors."""
        try:
            self.db_path.chmod(0o600)
        except OSError:
            # Deliberately best-effort: a failed chmod must not fail the write.
            pass
class _AuditStore:
    """SQLite-backed, append-only store for supervise audit entries.

    Entries go into the `supervise_audit_entries` table of the database at
    `db_path` (default: `host_db_path()`). Constructing a store creates the
    parent directory, the database file and the table when they are missing.
    Every operation opens its own short-lived connection and closes it before
    returning.
    """

    def __init__(self, db_path: Path | None = None) -> None:
        """Use `db_path`, or `host_db_path()` when it is None, and ensure the schema."""
        self.db_path = host_db_path() if db_path is None else db_path
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init()

    def write_audit_entry(self, entry: AuditEntry) -> Path:
        """Append `entry` as a new row and return the database path."""
        self._execute(
            """
            INSERT INTO supervise_audit_entries (
                timestamp, bottle_slug, component, operator_action,
                operator_notes, justification, diff
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (
                entry.timestamp,
                entry.bottle_slug,
                entry.component,
                entry.operator_action,
                entry.operator_notes,
                entry.justification,
                entry.diff,
            ),
        )
        self._chmod()
        return self.db_path

    def read_audit_entries(self, component: str, slug: str) -> list[AuditEntry]:
        """Return all entries for `component` and `slug` in insertion order.

        Rows are ordered by their autoincrement `id`. Returns an empty list
        when the database file does not exist.
        """
        if not self.db_path.is_file():
            return []
        rows = self._execute(
            """
            SELECT * FROM supervise_audit_entries
            WHERE component = ? AND bottle_slug = ?
            ORDER BY id
            """,
            (component, slug),
        )
        return [_audit_entry_from_row(row) for row in rows]

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection whose rows support access by column name."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def _execute(
        self, sql: str, params: tuple[object, ...] = ()
    ) -> list[sqlite3.Row]:
        """Run one statement in its own transaction and return its rows.

        The transaction commits on success and rolls back if the statement
        raises.
        """
        conn = self._connect()
        try:
            # `with conn` only commits or rolls back; it never closes the
            # connection, hence the explicit close() in `finally`.
            with conn:
                return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    def _init(self) -> None:
        """Create the audit table if missing, then chmod the database file."""
        self._execute(
            """
            CREATE TABLE IF NOT EXISTS supervise_audit_entries (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                bottle_slug TEXT NOT NULL,
                component TEXT NOT NULL,
                operator_action TEXT NOT NULL,
                operator_notes TEXT NOT NULL,
                justification TEXT NOT NULL,
                diff TEXT NOT NULL
            )
            """
        )
        self._chmod()

    def _chmod(self) -> None:
        """Restrict the database file to mode 0o600, ignoring OS errors."""
        try:
            self.db_path.chmod(0o600)
        except OSError:
            # Deliberately best-effort: a failed chmod must not fail the write.
            pass
def _proposal_from_row(row: sqlite3.Row) -> Proposal:
    """Build a `Proposal` from a `supervise_proposals` row.

    Only the columns named below are copied; any other column in `row`
    (such as `archived`) is not passed to `Proposal`.
    """
    columns = (
        "id",
        "bottle_slug",
        "tool",
        "proposed_file",
        "justification",
        "arrival_timestamp",
        "current_file_hash",
    )
    fields = {column: row[column] for column in columns}
    return Proposal(**fields)
def _response_from_row(row: sqlite3.Row) -> Response:
    """Build a `Response` from a `supervise_responses` row.

    Only the columns named below are copied; any other column in `row`
    (such as `archived`) is not passed to `Response`.
    """
    columns = ("proposal_id", "status", "notes", "final_file")
    fields = {column: row[column] for column in columns}
    return Response(**fields)
def _audit_entry_from_row(row: sqlite3.Row) -> AuditEntry:
    """Build an `AuditEntry` from a `supervise_audit_entries` row.

    Only the columns named below are copied; any other column in `row`
    (such as `id`) is not passed to `AuditEntry`.
    """
    columns = (
        "timestamp",
        "bottle_slug",
        "component",
        "operator_action",
        "operator_notes",
        "justification",
        "diff",
    )
    fields = {column: row[column] for column in columns}
    return AuditEntry(**fields)
# --- Sidecar plan + abstract lifecycle ------------------------------------- # --- Sidecar plan + abstract lifecycle -------------------------------------
@@ -474,40 +628,6 @@ def _require_str(raw: dict[str, object], key: str) -> str:
return value return value
def _atomic_write(path: Path, content: str, *, mode: int) -> None:
"""Atomic: write to a sibling tmp file, fsync, rename."""
tmp = path.with_suffix(path.suffix + ".tmp")
fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
try:
os.write(fd, content.encode("utf-8"))
os.fsync(fd)
finally:
os.close(fd)
os.replace(tmp, path)
try:
import fcntl as _fcntl
def _try_flock(fd: int) -> None: # type: ignore[reportRedeclaration]
try:
_fcntl.flock(fd, _fcntl.LOCK_EX)
except OSError:
pass
def _try_funlock(fd: int) -> None: # type: ignore[reportRedeclaration]
try:
_fcntl.flock(fd, _fcntl.LOCK_UN)
except OSError:
pass
except ImportError: # pragma: no cover — Windows path
def _try_flock(fd: int) -> None: # noqa: F841 — Windows fallback
return None
def _try_funlock(fd: int) -> None: # noqa: F841 — Windows fallback
return None
__all__ = [ __all__ = [
"ACTION_OPERATOR_EDIT", "ACTION_OPERATOR_EDIT",
"AuditEntry", "AuditEntry",
@@ -536,7 +656,9 @@ __all__ = [
"audit_dir", "audit_dir",
"audit_log_path", "audit_log_path",
"bot_bottle_root", "bot_bottle_root",
"host_db_path",
"list_pending_proposals", "list_pending_proposals",
"queue_db_path",
"queue_dir_for_slug", "queue_dir_for_slug",
"read_audit_entries", "read_audit_entries",
"read_proposal", "read_proposal",
+14 -12
View File
@@ -1,6 +1,5 @@
"""Unit: supervise queue + audit log + diff helpers (PRD 0013).""" """Unit: supervise queue + audit log + diff helpers (PRD 0013)."""
import json
import tempfile import tempfile
import threading import threading
import time import time
@@ -19,8 +18,9 @@ from bot_bottle.supervise import (
TOOL_EGRESS_ALLOW, TOOL_EGRESS_ALLOW,
TOOL_GITLEAKS_ALLOW, TOOL_GITLEAKS_ALLOW,
archive_proposal, archive_proposal,
audit_log_path, host_db_path,
list_pending_proposals, list_pending_proposals,
queue_db_path,
read_audit_entries, read_audit_entries,
read_proposal, read_proposal,
read_response, read_response,
@@ -121,6 +121,7 @@ class TestQueueIO(unittest.TestCase):
p = _proposal() p = _proposal()
path = write_proposal(self.queue_dir, p) path = write_proposal(self.queue_dir, p)
self.assertTrue(path.exists()) self.assertTrue(path.exists())
self.assertEqual(queue_db_path(self.queue_dir), path)
self.assertEqual(0o600, path.stat().st_mode & 0o777) self.assertEqual(0o600, path.stat().st_mode & 0o777)
loaded = read_proposal(self.queue_dir, p.id) loaded = read_proposal(self.queue_dir, p.id)
self.assertEqual(p, loaded) self.assertEqual(p, loaded)
@@ -198,10 +199,9 @@ class TestQueueIO(unittest.TestCase):
proposal_id=p.id, status=STATUS_APPROVED, notes="", proposal_id=p.id, status=STATUS_APPROVED, notes="",
)) ))
archive_proposal(self.queue_dir, p.id) archive_proposal(self.queue_dir, p.id)
self.assertFalse((self.queue_dir / f"{p.id}.proposal.json").exists()) self.assertEqual([], list_pending_proposals(self.queue_dir))
self.assertFalse((self.queue_dir / f"{p.id}.response.json").exists()) with self.assertRaises(FileNotFoundError):
self.assertTrue((self.queue_dir / "processed" / f"{p.id}.proposal.json").exists()) read_response(self.queue_dir, p.id)
self.assertTrue((self.queue_dir / "processed" / f"{p.id}.response.json").exists())
def test_archive_is_idempotent_on_missing_files(self): def test_archive_is_idempotent_on_missing_files(self):
# Should not raise. # Should not raise.
@@ -237,6 +237,7 @@ class TestAuditLog(unittest.TestCase):
diff="--- before\n+++ after\n", diff="--- before\n+++ after\n",
) )
path = write_audit_entry(e) path = write_audit_entry(e)
self.assertEqual(host_db_path(), path)
self.assertEqual(0o600, path.stat().st_mode & 0o777) self.assertEqual(0o600, path.stat().st_mode & 0o777)
loaded = read_audit_entries("cred-proxy", "dev") loaded = read_audit_entries("cred-proxy", "dev")
self.assertEqual([e], loaded) self.assertEqual([e], loaded)
@@ -252,12 +253,13 @@ class TestAuditLog(unittest.TestCase):
justification="", justification="",
diff="", diff="",
)) ))
path = audit_log_path("egress", "dev") entries = read_audit_entries("egress", "dev")
with path.open() as f: self.assertEqual(3, len(entries))
lines = [line for line in f if line.strip()] self.assertEqual(
self.assertEqual(3, len(lines)) ["2026-05-25T12:00:00+00:00", "2026-05-25T12:00:01+00:00",
for line in lines: "2026-05-25T12:00:02+00:00"],
self.assertTrue(json.loads(line)) # each line is valid JSON [entry.timestamp for entry in entries],
)
def test_separate_logs_per_component_slug(self): def test_separate_logs_per_component_slug(self):
write_audit_entry(AuditEntry( write_audit_entry(AuditEntry(
+1 -3
View File
@@ -413,9 +413,7 @@ class TestHandleToolsCall(unittest.TestCase):
responder.join() responder.join()
# No pending proposals left after archive. # No pending proposals left after archive.
self.assertEqual([], _sv.list_pending_proposals(self.queue_dir)) self.assertEqual([], _sv.list_pending_proposals(self.queue_dir))
# Both files moved to processed/. self.assertFalse((self.queue_dir / "processed").exists())
processed = list((self.queue_dir / "processed").glob("*.json"))
self.assertEqual(2, len(processed))
def test_pending_response_times_out_without_archive(self): def test_pending_response_times_out_without_archive(self):
config = ServerConfig( config = ServerConfig(