"""Forge state persistence (PRD forge-native-integration, chunk 2).

The orchestrator tracks one record per forge-targeted issue so it can
map an incoming webhook back to the bottle handling it, drive the
freeze / rehydrate loop, and run the watchdog.

State is stored in a local SQLite database in `~/.bot-bottle/`. Access
goes through the thin `ForgeStateStore` CRUD interface so the backing
store (location or engine) can be swapped without touching callers;
`SqliteForgeStateStore` is the first implementation.
"""

from __future__ import annotations

import abc
import contextlib
import json
import sqlite3
from collections.abc import Iterator
from dataclasses import dataclass, field
from pathlib import Path

from ...supervise import bot_bottle_root

_DB_FILENAME = "bot-bottle.db"

# Lifecycle: a bottle is launched (running), frozen on the done signal,
# and destroyed when the PR closes.
STATUS_RUNNING = "running"
STATUS_FROZEN = "frozen"
STATUS_DESTROYED = "destroyed"


@dataclass
class ForgeState:
    """One forge-targeted issue's bottle lifecycle record."""

    # (owner, repo, issue_number) identifies the record; the SQLite store
    # uses this triple as its primary key.
    owner: str
    repo: str
    issue_number: int
    slug: str
    agent_name: str
    # Persisted by the SQLite store as a JSON-encoded array.
    bottle_names: list[str] = field(default_factory=list)
    backend_name: str = ""
    agent_git_user: str = ""
    # Nullable in the schema; None means no PR number has been recorded.
    pr_number: int | None = None
    # Expected to be one of the STATUS_* constants (not enforced here).
    status: str = STATUS_RUNNING
    # Stored as free-form text; the format is not defined in this module
    # (presumably an ISO-8601 timestamp -- TODO confirm against callers).
    last_checkin_at: str = ""


class ForgeStateStore(abc.ABC):
    """Thin CRUD surface over forge state.

    Implementations back it with a concrete store; callers depend only
    on this interface so the storage location/engine is swappable.
    """

    @abc.abstractmethod
    def upsert(self, state: ForgeState) -> None:
        """Insert or replace the record keyed by (owner, repo, issue)."""

    @abc.abstractmethod
    def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
        """Fetch one record, or None when absent."""

    @abc.abstractmethod
    def delete(self, owner: str, repo: str, issue_number: int) -> None:
        """Remove a record. Missing is success (idempotent)."""

    @abc.abstractmethod
    def all(self) -> list[ForgeState]:
        """Every record, for the status table and the watchdog sweep."""


def default_db_path() -> Path:
    """Return the default database location under the bot-bottle root."""
    return bot_bottle_root() / _DB_FILENAME


class SqliteForgeStateStore(ForgeStateStore):
    """SQLite-backed `ForgeStateStore`.

    The database lives at `~/.bot-bottle/bot-bottle.db` by default; pass
    `db_path` to point at a different location (tests, alternate homes).

    Every operation opens its own short-lived connection, runs inside a
    transaction, and closes the connection again before returning.
    """

    def __init__(self, db_path: Path | None = None) -> None:
        """Create the parent directory and the `forge_state` table if missing.

        Args:
            db_path: Database file to use; falls back to `default_db_path()`.
        """
        self._db_path = db_path or default_db_path()
        self._db_path.parent.mkdir(parents=True, exist_ok=True)
        with self._transaction() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS forge_state (
                    owner TEXT NOT NULL,
                    repo TEXT NOT NULL,
                    issue_number INTEGER NOT NULL,
                    slug TEXT NOT NULL,
                    agent_name TEXT NOT NULL,
                    bottle_names TEXT NOT NULL,
                    backend_name TEXT NOT NULL,
                    agent_git_user TEXT NOT NULL,
                    pr_number INTEGER,
                    status TEXT NOT NULL,
                    last_checkin_at TEXT NOT NULL,
                    PRIMARY KEY (owner, repo, issue_number)
                )
                """
            )

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection whose rows support access by column name.

        The caller owns the returned connection and must close it.
        """
        conn = sqlite3.connect(self._db_path)
        conn.row_factory = sqlite3.Row
        return conn

    @contextlib.contextmanager
    def _transaction(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection inside a transaction, then always close it.

        `sqlite3.Connection` used as a context manager only commits (or
        rolls back on error); it does not close the connection. Wrapping
        it here keeps that commit/rollback behaviour and also releases
        the connection, so no handle is left open per call.
        """
        conn = self._connect()
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def upsert(self, state: ForgeState) -> None:
        """Insert `state`, replacing any row with the same primary key."""
        with self._transaction() as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO forge_state (
                    owner, repo, issue_number, slug, agent_name,
                    bottle_names, backend_name, agent_git_user,
                    pr_number, status, last_checkin_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    state.owner,
                    state.repo,
                    state.issue_number,
                    state.slug,
                    state.agent_name,
                    # Lists have no native SQLite type; store as JSON text.
                    json.dumps(state.bottle_names),
                    state.backend_name,
                    state.agent_git_user,
                    state.pr_number,
                    state.status,
                    state.last_checkin_at,
                ),
            )

    def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
        """Return the record for (owner, repo, issue_number), or None."""
        with self._transaction() as conn:
            row = conn.execute(
                "SELECT * FROM forge_state "
                "WHERE owner = ? AND repo = ? AND issue_number = ?",
                (owner, repo, issue_number),
            ).fetchone()
        return _row_to_state(row) if row is not None else None

    def delete(self, owner: str, repo: str, issue_number: int) -> None:
        """Delete the matching record; deleting a missing record is a no-op."""
        with self._transaction() as conn:
            conn.execute(
                "DELETE FROM forge_state "
                "WHERE owner = ? AND repo = ? AND issue_number = ?",
                (owner, repo, issue_number),
            )

    def all(self) -> list[ForgeState]:
        """Return every record, ordered by owner, repo, then issue number."""
        with self._transaction() as conn:
            rows = conn.execute(
                "SELECT * FROM forge_state ORDER BY owner, repo, issue_number"
            ).fetchall()
        return [_row_to_state(row) for row in rows]


def _row_to_state(row: sqlite3.Row) -> ForgeState:
    """Build a `ForgeState` from a `forge_state` table row."""
    return ForgeState(
        owner=row["owner"],
        repo=row["repo"],
        issue_number=row["issue_number"],
        slug=row["slug"],
        agent_name=row["agent_name"],
        # Inverse of the json.dumps applied when the row was written.
        bottle_names=json.loads(row["bottle_names"]),
        backend_name=row["backend_name"],
        agent_git_user=row["agent_git_user"],
        pr_number=row["pr_number"],
        status=row["status"],
        last_checkin_at=row["last_checkin_at"],
    )