Files
bot-bottle/bot_bottle/queue_store.py
didericis-claude a4d82e5ff2
lint / lint (push) Successful in 2m0s
test / unit (pull_request) Successful in 1m4s
test / integration (pull_request) Successful in 23s
test / coverage (pull_request) Successful in 1m9s
refactor: extract TableMigrations and DbStore base class
Adds bot_bottle/migrations.py (TableMigrations) and bot_bottle/db_store.py
(DbStore) per PR review. Both stores now inherit from DbStore and hold a
TableMigrations instance instead of duplicating schema-version logic inline.
2026-07-02 21:04:34 +00:00

241 lines
8.5 KiB
Python

"""SQLite-backed queue store for supervise proposals and responses (PRD 0013)."""
from __future__ import annotations
import os
import sqlite3
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .supervise import Proposal, Response
try:
from .db_store import DbStore
from .migrations import TableMigrations
except ImportError:
from db_store import DbStore # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module
from migrations import TableMigrations # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module
def get_supervise_mod() -> object:
    """Return the ``supervise`` module that matches this module's identity.

    The import is deferred to call time to avoid a circular import at module
    init; by the time any ``QueueStore`` method runs, both modules are loaded.

    Which name is preferred depends on how *this* module was imported:

    * loaded flat as ``queue_store`` (sidecar flat context, or tests that put
      ``bot_bottle/`` on ``sys.path``) -> prefer the flat ``supervise`` module,
      so patches such as ``supervise.bot_bottle_root`` propagate;
    * loaded as ``bot_bottle.queue_store`` -> prefer ``bot_bottle.supervise``.

    Returns:
        The module object. It is typed ``object`` because importing the real
        type here would recreate the import cycle.

    Raises:
        ImportError: if neither spelling of the module can be imported.
    """
    import importlib
    import sys

    if __name__ == "queue_store":
        preferred, fallback = "supervise", "bot_bottle.supervise"
    else:
        preferred, fallback = "bot_bottle.supervise", "supervise"

    # Fast path: the identity-matched module is normally already loaded.
    if preferred in sys.modules:
        return sys.modules[preferred]

    # Import the identity-matched name first. Always trying the packaged name
    # first could hand a flat ``queue_store`` the *other* copy of the module,
    # which defeats the mirroring described above.
    try:
        return importlib.import_module(preferred)
    except ImportError:
        return importlib.import_module(fallback)
# Schema history for this store. List position is the schema version:
# _MIGRATIONS.migrations[0] takes a fresh DB to version 1, [1] takes it to
# version 2, and so on. The list is append-only: add new migrations at the
# end and never edit an existing entry.

# v1 — proposals table
_V1_CREATE_PROPOSALS = """
CREATE TABLE IF NOT EXISTS supervise_proposals (
queue_key TEXT NOT NULL,
id TEXT NOT NULL,
bottle_slug TEXT NOT NULL,
tool TEXT NOT NULL,
proposed_file TEXT NOT NULL,
justification TEXT NOT NULL,
arrival_timestamp TEXT NOT NULL,
current_file_hash TEXT NOT NULL,
archived INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (queue_key, id)
)
"""

# v2 — responses table
_V2_CREATE_RESPONSES = """
CREATE TABLE IF NOT EXISTS supervise_responses (
queue_key TEXT NOT NULL,
proposal_id TEXT NOT NULL,
status TEXT NOT NULL,
notes TEXT NOT NULL,
final_file TEXT,
archived INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (queue_key, proposal_id)
)
"""

_MIGRATIONS = TableMigrations(
    "queue_store",
    [
        _V1_CREATE_PROPOSALS,
        _V2_CREATE_RESPONSES,
    ],
)
class QueueStore(DbStore):
    """SQLite-backed persistent store for supervise proposals and responses.

    Each instance is bound to one ``queue_key``. Every method except
    ``list_all_pending_proposals`` reads and writes only rows carrying that
    key. Rows are never deleted: ``archive_proposal`` sets ``archived = 1``
    and the read/list queries only return rows with ``archived = 0``.

    Connection handling (``_connect``), permission handling (``_chmod``) and
    ``db_path`` come from ``DbStore``, which is defined outside this module.
    """

    # SQL statements are hoisted here so the method bodies read as plain
    # control flow. The statement text is kept flush-left on purpose.
    _SQL_UPSERT_PROPOSAL = """
INSERT OR REPLACE INTO supervise_proposals (
queue_key, id, bottle_slug, tool, proposed_file, justification,
arrival_timestamp, current_file_hash, archived
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0)
"""

    _SQL_SELECT_PROPOSAL = """
SELECT * FROM supervise_proposals
WHERE queue_key = ? AND id = ? AND archived = 0
"""

    _SQL_PENDING_FOR_QUEUE = """
SELECT p.* FROM supervise_proposals p
WHERE p.archived = 0
AND p.queue_key = ?
AND NOT EXISTS (
SELECT 1 FROM supervise_responses r
WHERE r.queue_key = p.queue_key
AND r.proposal_id = p.id
AND r.archived = 0
)
ORDER BY p.arrival_timestamp, p.id
"""

    _SQL_PENDING_ALL_QUEUES = """
SELECT p.* FROM supervise_proposals p
WHERE p.archived = 0
AND NOT EXISTS (
SELECT 1 FROM supervise_responses r
WHERE r.queue_key = p.queue_key
AND r.proposal_id = p.id
AND r.archived = 0
)
ORDER BY p.arrival_timestamp, p.id
"""

    _SQL_UPSERT_RESPONSE = """
INSERT OR REPLACE INTO supervise_responses (
queue_key, proposal_id, status, notes, final_file, archived
) VALUES (?, ?, ?, ?, ?, 0)
"""

    _SQL_SELECT_RESPONSE = """
SELECT * FROM supervise_responses
WHERE queue_key = ? AND proposal_id = ? AND archived = 0
"""

    _SQL_ARCHIVE_PROPOSAL = """
UPDATE supervise_proposals SET archived = 1
WHERE queue_key = ? AND id = ?
"""

    _SQL_ARCHIVE_RESPONSE = """
UPDATE supervise_responses SET archived = 1
WHERE queue_key = ? AND proposal_id = ?
"""

    def __init__(self, queue_key: str, db_path: Path | None = None) -> None:
        """Bind the store to *queue_key* and resolve the database location.

        Args:
            queue_key: Value written to and matched against the ``queue_key``
                column of both tables.
            db_path: Explicit database path. When ``None``, the
                ``SUPERVISE_DB_PATH`` environment variable is used if it is
                non-blank; otherwise ``supervise.host_db_path()`` is called.
        """
        self.queue_key = queue_key
        if db_path is not None:
            resolved = db_path
        else:
            # In the sidecar container SUPERVISE_DB_PATH points at the
            # bind-mounted host DB. On the host this env var is never set,
            # so we always fall through to host_db_path().
            env_path = os.environ.get("SUPERVISE_DB_PATH", "").strip()
            if env_path:
                resolved = Path(env_path)
            else:
                resolved = get_supervise_mod().host_db_path()  # type: ignore[attr-defined]
        super().__init__(resolved, _MIGRATIONS)

    def write_proposal(self, proposal: Proposal) -> Path:
        """Insert *proposal* for this queue, replacing any row with the same id.

        The row is always stored with ``archived = 0``, so re-writing a
        previously archived proposal makes it visible again.

        Returns:
            The database path (``self.db_path``).
        """
        with self._connect() as conn:
            conn.execute(
                self._SQL_UPSERT_PROPOSAL,
                (
                    self.queue_key,
                    proposal.id,
                    proposal.bottle_slug,
                    proposal.tool,
                    proposal.proposed_file,
                    proposal.justification,
                    proposal.arrival_timestamp,
                    proposal.current_file_hash,
                ),
            )
        # Inherited hook; presumably re-applies file permissions after a
        # write. Confirm against DbStore.
        self._chmod()
        return self.db_path

    def read_proposal(self, proposal_id: str) -> Proposal:
        """Return the unarchived proposal *proposal_id* from this queue.

        Raises:
            FileNotFoundError: if no such unarchived row exists, or if the
                database file itself does not exist yet.
        """
        row = self._fetch_one(self._SQL_SELECT_PROPOSAL, proposal_id)
        if row is None:
            raise FileNotFoundError(proposal_id)
        return self._row_to_proposal(row)

    def list_pending_proposals(self) -> list[Proposal]:
        """Return this queue's unarchived proposals that have no unarchived response.

        Results are ordered by ``arrival_timestamp`` and then ``id``. Returns
        an empty list when the database file does not exist.
        """
        return self._fetch_pending(self._SQL_PENDING_FOR_QUEUE, (self.queue_key,))

    def list_all_pending_proposals(self) -> list[Proposal]:
        """Return pending proposals across every queue, not only this one.

        The filtering and ordering match ``list_pending_proposals`` except
        that no ``queue_key`` restriction is applied. Returns an empty list
        when the database file does not exist.
        """
        return self._fetch_pending(self._SQL_PENDING_ALL_QUEUES)

    def write_response(self, response: Response) -> Path:
        """Insert *response* for this queue, replacing any response to the same proposal.

        The row is always stored with ``archived = 0``.

        Returns:
            The database path (``self.db_path``).
        """
        with self._connect() as conn:
            conn.execute(
                self._SQL_UPSERT_RESPONSE,
                (
                    self.queue_key,
                    response.proposal_id,
                    response.status,
                    response.notes,
                    response.final_file,
                ),
            )
        # Inherited hook; presumably re-applies file permissions after a
        # write. Confirm against DbStore.
        self._chmod()
        return self.db_path

    def read_response(self, proposal_id: str) -> Response:
        """Return the unarchived response to *proposal_id* in this queue.

        Raises:
            FileNotFoundError: if no such unarchived row exists, or if the
                database file itself does not exist yet.
        """
        row = self._fetch_one(self._SQL_SELECT_RESPONSE, proposal_id)
        if row is None:
            raise FileNotFoundError(proposal_id)
        return self._row_to_response(row)

    def archive_proposal(self, proposal_id: str) -> None:
        """Mark the proposal and any response to it as archived.

        Does nothing when the database file does not exist. Rows are kept;
        only their ``archived`` flag is set to 1.
        """
        if not self.db_path.is_file():
            return
        key = (self.queue_key, proposal_id)
        with self._connect() as conn:
            conn.execute(self._SQL_ARCHIVE_PROPOSAL, key)
            conn.execute(self._SQL_ARCHIVE_RESPONSE, key)

    def _fetch_one(self, sql: str, proposal_id: str) -> sqlite3.Row | None:
        """Run a single-row lookup keyed by ``(queue_key, proposal_id)``."""
        # Same guard as the list/archive methods: with no database file there
        # is nothing to find, so do not open a connection just to report that.
        if not self.db_path.is_file():
            return None
        with self._connect() as conn:
            return conn.execute(sql, (self.queue_key, proposal_id)).fetchone()

    def _fetch_pending(self, sql: str, params: tuple = ()) -> list[Proposal]:
        """Run a pending-proposals query and convert each row to a ``Proposal``."""
        if not self.db_path.is_file():
            return []
        with self._connect() as conn:
            rows = conn.execute(sql, params).fetchall()
        return [self._row_to_proposal(row) for row in rows]

    @staticmethod
    def _row_to_proposal(row: sqlite3.Row) -> Proposal:
        """Build a ``supervise.Proposal`` from a ``supervise_proposals`` row.

        The ``queue_key`` and ``archived`` columns are not passed on.
        """
        supervise = get_supervise_mod()
        return supervise.Proposal(  # type: ignore[attr-defined]
            id=row["id"],
            bottle_slug=row["bottle_slug"],
            tool=row["tool"],
            proposed_file=row["proposed_file"],
            justification=row["justification"],
            arrival_timestamp=row["arrival_timestamp"],
            current_file_hash=row["current_file_hash"],
        )

    @staticmethod
    def _row_to_response(row: sqlite3.Row) -> Response:
        """Build a ``supervise.Response`` from a ``supervise_responses`` row.

        The ``queue_key`` and ``archived`` columns are not passed on.
        """
        supervise = get_supervise_mod()
        return supervise.Response(  # type: ignore[attr-defined]
            proposal_id=row["proposal_id"],
            status=row["status"],
            notes=row["notes"],
            final_file=row["final_file"],
        )
# Public API: only the store class is exported by ``from ... import *``.
# Other module-level names stay importable by explicit name.
__all__ = ["QueueStore"]