Files
bot-bottle/bot_bottle/supervise.py
T
didericis-codex 29904609da
lint / lint (push) Successful in 2m4s
test / unit (pull_request) Successful in 59s
test / integration (pull_request) Successful in 20s
test / coverage (pull_request) Successful in 1m10s
fix(supervise): remove queue directory from db-backed flow
2026-07-01 19:50:38 +00:00

704 lines
22 KiB
Python

"""Per-bottle supervise plane (PRD 0013).
The supervise plane is the per-bottle MCP sidecar plus its host-side
queue/audit support. The sidecar (bot_bottle.supervise_server)
sits on the bottle's internal network and exposes MCP tools the agent
calls when it needs an operator-reviewed egress change:
* egress-block / allow — agent proposes a new routes.yaml
Each tool call: the agent passes the full proposed file plus a
justification text. The sidecar validates the proposal syntactically,
writes it to the host SQLite queue table, and holds the tool-call
connection open. The operator's supervise TUI
(bot_bottle.cli.supervise) sees the proposal, accepts
approve / modify / reject, and writes a response row. The sidecar sees
the response and returns `{status, notes}` to the agent.
This module defines the host-side library: dataclasses for the queue
record shapes, queue read/write helpers, the audit log writer, and the
diff renderer. The in-container sidecar lives in
bot_bottle/supervise_server.py; the supervise daemon's container
lifecycle is owned by the sidecar bundle (PRD 0024).
For 0013 the supervisor's approval handlers are deliberately no-ops:
on approval the audit entry is written and the response row is
delivered to the agent, but no host-side config change happens. The
remediation engines that wire real config changes land in PRDs 0014,
0015, and 0016.
"""
from __future__ import annotations

import contextlib
import dataclasses
import difflib
import hashlib
import os
import sqlite3
import time
import uuid
from abc import ABC
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
# Hostname and port constants for the supervise sidecar.
# NOTE(review): presumably the address the agent uses to reach the MCP
# server on the bottle's internal network — confirm against the backend.
SUPERVISE_HOSTNAME = "supervise"
SUPERVISE_PORT = 9100
# Tool names a queued Proposal may carry; Proposal.from_dict rejects any
# `tool` value that is not listed in TOOLS below.
TOOL_EGRESS_BLOCK = "egress-block"
TOOL_EGRESS_ALLOW = "egress-allow"
TOOL_GITLEAKS_ALLOW = "gitleaks-allow"
# Written directly by the egress addon (not an agent-facing MCP tool) when an
# outbound DLP token block is routed to the operator for override (PRD 0062).
TOOL_EGRESS_TOKEN_ALLOW = "egress-token-allow"
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
# The closed set of accepted tool names.
TOOLS: tuple[str, ...] = (
    TOOL_EGRESS_ALLOW,
    TOOL_EGRESS_BLOCK,
    TOOL_GITLEAKS_ALLOW,
    TOOL_EGRESS_TOKEN_ALLOW,
    TOOL_LIST_EGRESS_ROUTES,
)
# The supervise sidecar uses these to query egress's
# introspection endpoint for the `list-egress-routes` MCP
# tool. The hostname + port match egress's docker network
# listen port (see backend.docker.egress.EGRESS_PORT). The supervise
# daemon runs inside the sidecar bundle alongside egress, so loopback
# is the stable address across docker, smolmachines, and Apple
# Container backends.
EGRESS_FORWARD_PROXY = "http://127.0.0.1:9099"
EGRESS_INTROSPECT_URL = "http://_egress.local/allowlist"
# Tool name -> component name. Only the two egress tools are mapped here.
# NOTE(review): presumably the component feeds AuditEntry.component /
# audit_log_path — confirm against the callers.
COMPONENT_FOR_TOOL: dict[str, str] = {
    TOOL_EGRESS_ALLOW: "egress",
    TOOL_EGRESS_BLOCK: "egress",
}
# Operator decisions; Response.from_dict rejects any `status` value that is
# not listed in STATUSES.
STATUS_APPROVED = "approved"
STATUS_MODIFIED = "modified"
STATUS_REJECTED = "rejected"
STATUSES: tuple[str, ...] = (STATUS_APPROVED, STATUS_MODIFIED, STATUS_REJECTED)
# Operator-initiated audit entries (no tool call). PRD 0014's
# `routes edit <bottle>` verb writes entries with this action.
ACTION_OPERATOR_EDIT = "operator-edit"
# Path of the database inside the sidecar (the host database is bind-mounted
# there, per the SupervisePlan docstring).
DB_PATH_IN_CONTAINER = "/run/supervise/bot-bottle.db"
# Default `poll_interval` (seconds) of wait_for_response.
DEFAULT_POLL_INTERVAL_SEC = 0.5
# File name of the host database under bot_bottle_root().
HOST_DB_FILENAME = "bot-bottle.db"
# --- Paths -----------------------------------------------------------------
def bot_bottle_root() -> Path:
    """Return the per-user state directory, `~/.bot-bottle`.

    The directory is only named here, not created.
    """
    home = Path.home()
    return home.joinpath(".bot-bottle")
def audit_dir() -> Path:
    """Return the `audit` directory beneath bot_bottle_root()."""
    root = bot_bottle_root()
    return root.joinpath("audit")
def audit_log_path(component: str, slug: str) -> Path:
    """Return `<audit_dir>/<component>-<slug>.log` for one component+bottle."""
    filename = f"{component}-{slug}.log"
    return audit_dir() / filename
def host_db_path() -> Path:
    """Return the host database path: HOST_DB_FILENAME under bot_bottle_root()."""
    root = bot_bottle_root()
    return root / HOST_DB_FILENAME
def queue_db_path() -> Path:
    """Return the queue database path.

    A non-blank `SUPERVISE_DB_PATH` environment variable (surrounding
    whitespace stripped) overrides the location; otherwise the host
    database path is used.
    """
    override = os.environ.get("SUPERVISE_DB_PATH", "").strip()
    if override:
        return Path(override)
    return host_db_path()
# --- Dataclasses -----------------------------------------------------------
@dataclass(frozen=True)
class Proposal:
    """An agent tool call that is queued for an operator decision.

    All fields are strings: `proposed_file` is the full proposed file
    content, `current_file_hash` the hash the caller supplied for the file
    it is replacing, and `arrival_timestamp` an ISO-8601 timestamp.
    """

    id: str
    bottle_slug: str
    tool: str
    proposed_file: str
    justification: str
    arrival_timestamp: str
    current_file_hash: str

    @classmethod
    def new(
        cls,
        *,
        bottle_slug: str,
        tool: str,
        proposed_file: str,
        justification: str,
        current_file_hash: str,
        now: datetime | None = None,
    ) -> "Proposal":
        """Create a proposal with a fresh UUID4 id.

        The arrival timestamp is `now` when given, otherwise the current
        UTC time, rendered with `isoformat()`.
        """
        moment = now if now else datetime.now(timezone.utc)
        fresh_id = str(uuid.uuid4())
        return cls(
            id=fresh_id,
            bottle_slug=bottle_slug,
            tool=tool,
            proposed_file=proposed_file,
            justification=justification,
            arrival_timestamp=moment.isoformat(),
            current_file_hash=current_file_hash,
        )

    def to_dict(self) -> dict[str, object]:
        """Return the fields as a plain dict keyed by field name."""
        return dataclasses.asdict(self)

    @classmethod
    def from_dict(cls, raw: dict[str, object]) -> "Proposal":
        """Rebuild a proposal from `raw`, validating every field.

        Raises ValueError when a field is missing or not a string, or when
        `tool` is not one of TOOLS.
        """
        tool_name = _require_str(raw, "tool")
        if tool_name not in TOOLS:
            raise ValueError(f"tool must be one of {TOOLS}; got {tool_name!r}")
        # Remaining fields are checked in declaration order so the first
        # offending key is the one reported.
        other_keys = (
            "id",
            "bottle_slug",
            "proposed_file",
            "justification",
            "arrival_timestamp",
            "current_file_hash",
        )
        values = {key: _require_str(raw, key) for key in other_keys}
        return cls(tool=tool_name, **values)
@dataclass(frozen=True)
class Response:
    """The operator's verdict on one proposal.

    The TUI stores one of these in the queue table; the sidecar reads it
    back and hands the `{status, notes}` pair to the agent's tool call.

    `final_file` is the content the supervisor will actually apply:
    the proposal's `proposed_file` for `approved`, the operator's edited
    version for `modified` (so the audit diff is current → final_file,
    not current → proposed_file), and None for `rejected`.
    """

    proposal_id: str
    status: str
    notes: str
    final_file: str | None = None

    def to_dict(self) -> dict[str, object]:
        """Return the fields as a plain dict keyed by field name."""
        return dataclasses.asdict(self)

    @classmethod
    def from_dict(cls, raw: dict[str, object]) -> "Response":
        """Rebuild a response from `raw`, validating every field.

        Raises ValueError when `status` is not one of STATUSES, when
        `final_file` is neither a string nor None, or when a required
        string field is missing.
        """
        verdict = _require_str(raw, "status")
        if verdict not in STATUSES:
            raise ValueError(
                f"response status must be one of {STATUSES}; got {verdict!r}"
            )
        final = raw.get("final_file")
        if not (final is None or isinstance(final, str)):
            raise ValueError(
                f"final_file must be a string or null; got {type(final).__name__}"
            )
        proposal_id = _require_str(raw, "proposal_id")
        notes = _require_str(raw, "notes")
        return cls(
            proposal_id=proposal_id,
            status=verdict,
            notes=notes,
            final_file=final,
        )
@dataclass(frozen=True)
class AuditEntry:
    """One record of the per-bottle audit log.

    Entries are appended through write_audit_entry and read back, filtered
    by component and bottle slug, through read_audit_entries.
    """

    timestamp: str
    bottle_slug: str
    component: str
    operator_action: str
    operator_notes: str
    justification: str
    diff: str

    def to_dict(self) -> dict[str, object]:
        """Return the fields as a plain dict keyed by field name."""
        as_mapping = dataclasses.asdict(self)
        return as_mapping
# --- Queue I/O -------------------------------------------------------------
def write_proposal(proposal: Proposal) -> Path:
    """Store `proposal` in the queue database and return the database path.

    The row is filed under the proposal's own `bottle_slug`. The store
    creates the database's parent directory if needed and attempts to set
    the database file mode to 0o600.
    """
    store = _QueueStore(proposal.bottle_slug)
    return store.write_proposal(proposal)
def read_proposal(bottle_slug: str, proposal_id: str) -> Proposal:
    """Load the un-archived proposal `proposal_id` queued for `bottle_slug`.

    Raises FileNotFoundError when no such row exists.
    """
    store = _QueueStore(bottle_slug)
    return store.read_proposal(proposal_id)
def list_pending_proposals(bottle_slug: str) -> list[Proposal]:
    """Return the proposals for `bottle_slug` that have no response yet.

    Ordered by `arrival_timestamp` (then id) so the operator works the
    queue first-in, first-out.
    """
    store = _QueueStore(bottle_slug)
    return store.list_pending_proposals()
def list_all_pending_proposals() -> list[Proposal]:
    """Return every unanswered proposal across all bottles, oldest first."""
    # The query ignores the queue key, so an empty key is sufficient here.
    store = _QueueStore("")
    return store.list_all_pending_proposals()
def write_response(bottle_slug: str, response: Response) -> Path:
    """Store the operator's `response` in the queue for `bottle_slug`.

    Returns the path of the queue database.
    """
    store = _QueueStore(bottle_slug)
    return store.write_response(response)
def read_response(bottle_slug: str, proposal_id: str) -> Response:
    """Load the un-archived response to `proposal_id` for `bottle_slug`.

    Raises FileNotFoundError when no response has been recorded.
    """
    store = _QueueStore(bottle_slug)
    return store.read_response(proposal_id)
def wait_for_response(
    bottle_slug: str,
    proposal_id: str,
    *,
    poll_interval: float = DEFAULT_POLL_INTERVAL_SEC,
    deadline: float | None = None,
) -> Response:
    """Block until a response row exists for `proposal_id`, then return it.

    Args:
        bottle_slug: Queue key the proposal was filed under.
        proposal_id: Id of the proposal being waited on.
        poll_interval: Seconds to sleep between database polls.
        deadline: Absolute `time.monotonic()` value after which the wait
            gives up. None waits forever — the natural shape, since the
            operator's response time is unbounded.

    Raises:
        TimeoutError: `deadline` passed without a response appearing.

    Polls SQLite so the implementation stays portable and stdlib-only.
    """
    store = _QueueStore(bottle_slug)
    while True:
        try:
            return store.read_response(proposal_id)
        except FileNotFoundError:
            # The store signals "no response row yet" with FileNotFoundError.
            pass
        if deadline is None:
            time.sleep(poll_interval)
            continue
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"no response for proposal {proposal_id!r}")
        # Never sleep past the deadline: the last poll then happens at the
        # deadline instead of up to one full poll_interval after it.
        time.sleep(min(poll_interval, remaining))
def archive_proposal(bottle_slug: str, proposal_id: str) -> None:
    """Flag the proposal row and its response row as archived.

    Safe to call repeatedly; rows that do not exist are simply not touched.
    """
    store = _QueueStore(bottle_slug)
    store.archive_proposal(proposal_id)
# --- Audit log -------------------------------------------------------------
def write_audit_entry(entry: AuditEntry) -> Path:
    """Append `entry` to the audit table of the host database.

    Returns the path of that database.
    """
    store = _AuditStore()
    return store.write_audit_entry(entry)
def read_audit_entries(component: str, slug: str) -> list[AuditEntry]:
    """Return every audit entry recorded for `component` and bottle `slug`."""
    store = _AuditStore()
    return store.read_audit_entries(component, slug)
# --- Diff rendering --------------------------------------------------------
def render_diff(before: str, after: str, *, label: str = "config") -> str:
    """Render a unified diff of `before` → `after` for the audit log and TUI.

    The two sides are titled `<label> (current)` and `<label> (proposed)`.
    Identical inputs yield the empty string; otherwise the diff is returned
    without a trailing newline.
    """
    old_lines = before.splitlines(keepends=True)
    new_lines = after.splitlines(keepends=True)
    pieces: list[str] = []
    for piece in difflib.unified_diff(
        old_lines,
        new_lines,
        fromfile=f"{label} (current)",
        tofile=f"{label} (proposed)",
        lineterm="",
    ):
        # Header/hunk lines carry no terminator (lineterm="") and a final
        # content line may lack one too; normalise so each piece ends in \n.
        if not piece.endswith("\n"):
            piece += "\n"
        pieces.append(piece)
    if not pieces:
        return ""
    return "".join(pieces).rstrip("\n")
def sha256_hex(content: str) -> str:
    """Return the hex SHA-256 digest of `content` encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(content.encode("utf-8"))
    return digest.hexdigest()
# --- SQLite storage --------------------------------------------------------
class _QueueStore:
    """SQLite-backed proposal/response queue scoped to one `queue_key`.

    Every operation opens a short-lived connection to `queue_db_path()`.
    A sqlite3 connection used as a context manager only commits or rolls
    back — it does not close — so each operation wraps the connection in
    `contextlib.closing` as well. Without that, the polling loop in
    wait_for_response would leave one open connection behind per poll.
    """

    def __init__(self, queue_key: str) -> None:
        """Bind to `queue_key`, create the parent directory and tables."""
        self.queue_key = queue_key
        self.db_path = queue_db_path()
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init()

    def write_proposal(self, proposal: Proposal) -> Path:
        """Insert (or replace) `proposal` as un-archived; return the DB path."""
        with contextlib.closing(self._connect()) as conn, conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO supervise_proposals (
                    queue_key, id, bottle_slug, tool, proposed_file, justification,
                    arrival_timestamp, current_file_hash, archived
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0)
                """,
                (
                    self.queue_key,
                    proposal.id,
                    proposal.bottle_slug,
                    proposal.tool,
                    proposal.proposed_file,
                    proposal.justification,
                    proposal.arrival_timestamp,
                    proposal.current_file_hash,
                ),
            )
        self._chmod()
        return self.db_path

    def read_proposal(self, proposal_id: str) -> Proposal:
        """Return the un-archived proposal `proposal_id` in this queue.

        Raises FileNotFoundError when no such row exists.
        """
        with contextlib.closing(self._connect()) as conn, conn:
            row = conn.execute(
                """
                SELECT * FROM supervise_proposals
                WHERE queue_key = ? AND id = ? AND archived = 0
                """,
                (self.queue_key, proposal_id),
            ).fetchone()
        if row is None:
            raise FileNotFoundError(proposal_id)
        return _proposal_from_row(row)

    def list_pending_proposals(self) -> list[Proposal]:
        """Return this queue's un-archived proposals that have no un-archived
        response, ordered by arrival timestamp then id."""
        if not self.db_path.is_file():
            return []
        with contextlib.closing(self._connect()) as conn, conn:
            rows = conn.execute(
                """
                SELECT p.* FROM supervise_proposals p
                WHERE p.archived = 0
                  AND p.queue_key = ?
                  AND NOT EXISTS (
                    SELECT 1 FROM supervise_responses r
                    WHERE r.queue_key = p.queue_key
                      AND r.proposal_id = p.id
                      AND r.archived = 0
                  )
                ORDER BY p.arrival_timestamp, p.id
                """,
                (self.queue_key,),
            ).fetchall()
        return [_proposal_from_row(row) for row in rows]

    def list_all_pending_proposals(self) -> list[Proposal]:
        """Like list_pending_proposals, but across every queue key."""
        if not self.db_path.is_file():
            return []
        with contextlib.closing(self._connect()) as conn, conn:
            rows = conn.execute(
                """
                SELECT p.* FROM supervise_proposals p
                WHERE p.archived = 0
                  AND NOT EXISTS (
                    SELECT 1 FROM supervise_responses r
                    WHERE r.queue_key = p.queue_key
                      AND r.proposal_id = p.id
                      AND r.archived = 0
                  )
                ORDER BY p.arrival_timestamp, p.id
                """
            ).fetchall()
        return [_proposal_from_row(row) for row in rows]

    def write_response(self, response: Response) -> Path:
        """Insert (or replace) `response` as un-archived; return the DB path."""
        with contextlib.closing(self._connect()) as conn, conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO supervise_responses (
                    queue_key, proposal_id, status, notes, final_file, archived
                ) VALUES (?, ?, ?, ?, ?, 0)
                """,
                (
                    self.queue_key,
                    response.proposal_id,
                    response.status,
                    response.notes,
                    response.final_file,
                ),
            )
        self._chmod()
        return self.db_path

    def read_response(self, proposal_id: str) -> Response:
        """Return the un-archived response to `proposal_id` in this queue.

        Raises FileNotFoundError when no such row exists.
        """
        with contextlib.closing(self._connect()) as conn, conn:
            row = conn.execute(
                """
                SELECT * FROM supervise_responses
                WHERE queue_key = ? AND proposal_id = ? AND archived = 0
                """,
                (self.queue_key, proposal_id),
            ).fetchone()
        if row is None:
            raise FileNotFoundError(proposal_id)
        return _response_from_row(row)

    def archive_proposal(self, proposal_id: str) -> None:
        """Set `archived = 1` on the proposal row and its response row.

        Missing rows (or a missing database file) are silently skipped.
        """
        if not self.db_path.is_file():
            return
        # Both updates share one transaction, committed when `conn` exits.
        with contextlib.closing(self._connect()) as conn, conn:
            conn.execute(
                """
                UPDATE supervise_proposals SET archived = 1
                WHERE queue_key = ? AND id = ?
                """,
                (self.queue_key, proposal_id),
            )
            conn.execute(
                """
                UPDATE supervise_responses SET archived = 1
                WHERE queue_key = ? AND proposal_id = ?
                """,
                (self.queue_key, proposal_id),
            )

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection whose rows support access by column name.

        The caller is responsible for closing the returned connection.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def _init(self) -> None:
        """Create the proposal and response tables if they do not exist."""
        with contextlib.closing(self._connect()) as conn, conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS supervise_proposals (
                    queue_key TEXT NOT NULL,
                    id TEXT NOT NULL,
                    bottle_slug TEXT NOT NULL,
                    tool TEXT NOT NULL,
                    proposed_file TEXT NOT NULL,
                    justification TEXT NOT NULL,
                    arrival_timestamp TEXT NOT NULL,
                    current_file_hash TEXT NOT NULL,
                    archived INTEGER NOT NULL DEFAULT 0,
                    PRIMARY KEY (queue_key, id)
                )
                """
            )
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS supervise_responses (
                    queue_key TEXT NOT NULL,
                    proposal_id TEXT NOT NULL,
                    status TEXT NOT NULL,
                    notes TEXT NOT NULL,
                    final_file TEXT,
                    archived INTEGER NOT NULL DEFAULT 0,
                    PRIMARY KEY (queue_key, proposal_id)
                )
                """
            )
        self._chmod()

    def _chmod(self) -> None:
        """Best-effort restriction of the database file to mode 0o600."""
        try:
            self.db_path.chmod(0o600)
        except OSError:
            # Deliberately ignored: a failed chmod must not fail the queue
            # operation that triggered it.
            pass
class _AuditStore:
    """SQLite-backed, append-only audit table.

    Every operation opens a short-lived connection. A sqlite3 connection
    used as a context manager only commits or rolls back — it does not
    close — so each operation also wraps the connection in
    `contextlib.closing` to release it promptly.
    """

    def __init__(self, db_path: Path | None = None) -> None:
        """Bind to `db_path` (default: the host database), create the parent
        directory and the audit table."""
        self.db_path = db_path or host_db_path()
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init()

    def write_audit_entry(self, entry: AuditEntry) -> Path:
        """Append `entry` as a new row and return the database path."""
        with contextlib.closing(self._connect()) as conn, conn:
            conn.execute(
                """
                INSERT INTO supervise_audit_entries (
                    timestamp, bottle_slug, component, operator_action,
                    operator_notes, justification, diff
                ) VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    entry.timestamp,
                    entry.bottle_slug,
                    entry.component,
                    entry.operator_action,
                    entry.operator_notes,
                    entry.justification,
                    entry.diff,
                ),
            )
        self._chmod()
        return self.db_path

    def read_audit_entries(self, component: str, slug: str) -> list[AuditEntry]:
        """Return the entries for `component` + `slug` in insertion order."""
        if not self.db_path.is_file():
            return []
        with contextlib.closing(self._connect()) as conn, conn:
            rows = conn.execute(
                """
                SELECT * FROM supervise_audit_entries
                WHERE component = ? AND bottle_slug = ?
                ORDER BY id
                """,
                (component, slug),
            ).fetchall()
        return [_audit_entry_from_row(row) for row in rows]

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection whose rows support access by column name.

        The caller is responsible for closing the returned connection.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def _init(self) -> None:
        """Create the audit table if it does not exist."""
        with contextlib.closing(self._connect()) as conn, conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS supervise_audit_entries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    bottle_slug TEXT NOT NULL,
                    component TEXT NOT NULL,
                    operator_action TEXT NOT NULL,
                    operator_notes TEXT NOT NULL,
                    justification TEXT NOT NULL,
                    diff TEXT NOT NULL
                )
                """
            )
        self._chmod()

    def _chmod(self) -> None:
        """Best-effort restriction of the database file to mode 0o600."""
        try:
            self.db_path.chmod(0o600)
        except OSError:
            # Deliberately ignored: a failed chmod must not fail the audit
            # operation that triggered it.
            pass
def _proposal_from_row(row: sqlite3.Row) -> Proposal:
    """Build a Proposal from a `supervise_proposals` row.

    Only the proposal's own columns are read; `queue_key` and `archived`
    are bookkeeping columns and are ignored.
    """
    columns = (
        "id",
        "bottle_slug",
        "tool",
        "proposed_file",
        "justification",
        "arrival_timestamp",
        "current_file_hash",
    )
    return Proposal(**{column: row[column] for column in columns})
def _response_from_row(row: sqlite3.Row) -> Response:
    """Build a Response from a `supervise_responses` row."""
    columns = ("proposal_id", "status", "notes", "final_file")
    return Response(**{column: row[column] for column in columns})
def _audit_entry_from_row(row: sqlite3.Row) -> AuditEntry:
    """Build an AuditEntry from a `supervise_audit_entries` row.

    The autoincrement `id` column is not part of AuditEntry and is skipped.
    """
    columns = (
        "timestamp",
        "bottle_slug",
        "component",
        "operator_action",
        "operator_notes",
        "justification",
        "diff",
    )
    return AuditEntry(**{column: row[column] for column in columns})
# --- Sidecar plan + abstract lifecycle -------------------------------------
@dataclass(frozen=True)
class SupervisePlan:
    """What Supervise.prepare produces and .start consumes.

    `db_path` is the host database that gets bind-mounted into the sidecar
    at /run/supervise/bot-bottle.db. `internal_network` starts out empty;
    the backend's launch step supplies it with dataclasses.replace before
    it calls .start.
    """

    slug: str
    db_path: Path
    internal_network: str = ""
class Supervise(ABC):
    """Per-bottle supervise sidecar.

    This base class covers the host-side database staging; starting and
    stopping the sidecar is left to the backend-specific subclasses.
    """

    def prepare(self, slug: str, stage_dir: Path) -> SupervisePlan:
        """Stage the host database and return the plan for `slug`.

        Constructing the queue and audit stores creates their tables.
        `stage_dir` is accepted for interface compatibility but unused.
        The returned plan has an empty `internal_network`; the launch step
        must fill it in before .start runs.
        """
        del stage_dir
        database = host_db_path()
        # Instantiated only for their side effect of creating the tables.
        _QueueStore(slug)
        _AuditStore(database)
        return SupervisePlan(slug=slug, db_path=database)
# --- Helpers ---------------------------------------------------------------
def _require_str(raw: dict[str, object], key: str) -> str:
    """Return `raw[key]`, insisting that it is present and a string.

    Raises ValueError when the key is absent or its value is not a str.
    """
    candidate = raw.get(key)
    if isinstance(candidate, str):
        return candidate
    raise ValueError(f"missing or non-string field {key!r}")
# Public API of this module: the names bound by `from ... import *`.
# NOTE(review): HOST_DB_FILENAME is defined at module level but is not
# listed here — confirm that leaving it unexported is intentional.
__all__ = [
    "ACTION_OPERATOR_EDIT",
    "AuditEntry",
    "COMPONENT_FOR_TOOL",
    "DEFAULT_POLL_INTERVAL_SEC",
    "DB_PATH_IN_CONTAINER",
    "Proposal",
    "Response",
    "STATUSES",
    "STATUS_APPROVED",
    "STATUS_MODIFIED",
    "STATUS_REJECTED",
    "SUPERVISE_HOSTNAME",
    "SUPERVISE_PORT",
    "Supervise",
    "SupervisePlan",
    "TOOLS",
    "EGRESS_FORWARD_PROXY",
    "EGRESS_INTROSPECT_URL",
    "TOOL_EGRESS_ALLOW",
    "TOOL_EGRESS_BLOCK",
    "TOOL_GITLEAKS_ALLOW",
    "TOOL_EGRESS_TOKEN_ALLOW",
    "TOOL_LIST_EGRESS_ROUTES",
    "archive_proposal",
    "audit_dir",
    "audit_log_path",
    "bot_bottle_root",
    "host_db_path",
    "list_pending_proposals",
    "list_all_pending_proposals",
    "queue_db_path",
    "read_audit_entries",
    "read_proposal",
    "read_response",
    "render_diff",
    "sha256_hex",
    "wait_for_response",
    "write_audit_entry",
    "write_proposal",
    "write_response",
]