refactor(forge): address PR #318 review — PR/Issue split, sqlite state, drop footer
lint / lint (push) Successful in 1m59s
test / unit (pull_request) Successful in 48s
test / integration (pull_request) Successful in 17s
test / coverage (pull_request) Successful in 58s

Addresses the five review comments on PR #318:

- Split PullRequest from Issue and add a dedicated read_pr method on
  Forge/ScopedForge/GiteaForge (a PR carries merge state an issue does
  not); is_pr_open now derives from read_pr.
- Replace the JSON-file forge state with a thin swappable CRUD interface
  (ForgeStateStore) backed by SQLite (SqliteForgeStateStore) at
  ~/.bot-bottle/bot-bottle.db.
- Remove the provenance footer (provenance.py + its test): a mutable,
  unsigned PR comment is not an audit record.
- Reword the PRD: provenance is exposed via an API, not surfaced in the
  PR; document the Issue/PullRequest split and the SQLite store.

pyright clean (whole repo), pylint 10/10, 38 forge/resume unit tests pass;
no remaining refs to the removed provenance module or old JSON state API.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
This commit is contained in:
2026-07-01 08:37:25 -04:00
parent f211ece6bf
commit 42004d37fd
9 changed files with 332 additions and 432 deletions
+23 -3
View File
@@ -26,8 +26,7 @@ from dataclasses import dataclass
@dataclass(frozen=True)
class Issue:
"""A forge issue or PR (forges model PRs as issues with the same
number)."""
"""A forge issue (not a PR — see `PullRequest`)."""
number: int
title: str
@@ -35,6 +34,20 @@ class Issue:
state: str # "open" | "closed"
@dataclass(frozen=True)
class PullRequest:
"""A forge pull request. Kept distinct from `Issue` even though some
forges model PRs as issues on the wire: the domain objects carry
different data (a PR has merge state) and are read through different
methods (`read_pr` vs `read_issue`)."""
number: int
title: str
body: str
state: str # "open" | "closed"
merged: bool
@dataclass(frozen=True)
class Comment:
id: int
@@ -53,7 +66,11 @@ class Forge(abc.ABC):
@abc.abstractmethod
def read_issue(self, number: int) -> Issue:
"""Read an issue or PR body (read-anywhere)."""
"""Read an issue body (read-anywhere)."""
@abc.abstractmethod
def read_pr(self, number: int) -> PullRequest:
"""Read a pull request, including its merge state (read-anywhere)."""
@abc.abstractmethod
def read_comments(self, number: int) -> list[Comment]:
@@ -122,6 +139,9 @@ class ScopedForge(Forge):
def read_issue(self, number: int) -> Issue:
return self._inner.read_issue(number)
def read_pr(self, number: int) -> PullRequest:
return self._inner.read_pr(number)
def read_comments(self, number: int) -> list[Comment]:
return self._inner.read_comments(number)
+12 -2
View File
@@ -20,7 +20,7 @@ import urllib.error
import urllib.request
from typing import Any
from ..forge.base import Comment, Forge, Issue
from ..forge.base import Comment, Forge, Issue, PullRequest
# Bound every Gitea call: a hung instance must not stall the sidecar.
_API_TIMEOUT_SECS = 30
@@ -124,6 +124,16 @@ class GiteaForge(Forge):
state=str(raw.get("state", "")),
)
def read_pr(self, number: int) -> PullRequest:
raw = self._client.get_pull(number)
return PullRequest(
number=int(raw.get("number", number)),
title=str(raw.get("title", "")),
body=str(raw.get("body", "") or ""),
state=str(raw.get("state", "")),
merged=bool(raw.get("merged", False)),
)
def read_comments(self, number: int) -> list[Comment]:
return [
Comment(
@@ -154,7 +164,7 @@ class GiteaForge(Forge):
return None
def is_pr_open(self, number: int) -> bool:
return self._client.get_pull(number).get("state") == "open"
return self.read_pr(number).state == "open"
def _read_error_body(exc: urllib.error.HTTPError) -> str:
+126 -60
View File
@@ -2,26 +2,25 @@
The orchestrator tracks one record per forge-targeted issue so it can
map an incoming webhook back to the bottle handling it, drive the
freeze / rehydrate loop, and run the watchdog. State lives on disk and
survives orchestrator restarts:
freeze / rehydrate loop, and run the watchdog.
~/.bot-bottle/forge/<owner>/<repo>/issue-<n>.json
Writes are atomic (`os.replace`) so a crash mid-write never leaves a
truncated record.
State is stored in a local SQLite database in `~/.bot-bottle/`. Access
goes through the thin `ForgeStateStore` CRUD interface so the backing
store (location or engine) can be swapped without touching callers;
`SqliteForgeStateStore` is the first implementation.
"""
from __future__ import annotations
import abc
import json
import os
from dataclasses import asdict, dataclass, field, fields
from typing import Any
import sqlite3
from dataclasses import dataclass, field
from pathlib import Path
from ...supervise import bot_bottle_root
_FORGE_SUBDIR = "forge"
_DB_FILENAME = "bot-bottle.db"
# Lifecycle: a bottle is launched (running), frozen on the done signal,
# and destroyed when the PR closes.
@@ -46,60 +45,127 @@ class ForgeState:
status: str = STATUS_RUNNING
last_checkin_at: str = ""
def to_json(self) -> str:
return json.dumps(asdict(self), indent=2, sort_keys=True)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ForgeState":
# Tolerate unknown keys (forward-compat) by filtering to fields.
known = {f.name for f in fields(cls)}
return cls(**{k: v for k, v in data.items() if k in known})
class ForgeStateStore(abc.ABC):
"""Thin CRUD surface over forge state. Implementations back it with a
concrete store; callers depend only on this interface so the storage
location/engine is swappable."""
@abc.abstractmethod
def upsert(self, state: ForgeState) -> None:
"""Insert or replace the record keyed by (owner, repo, issue)."""
@abc.abstractmethod
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
"""Fetch one record, or None when absent."""
@abc.abstractmethod
def delete(self, owner: str, repo: str, issue_number: int) -> None:
"""Remove a record. Missing is success (idempotent)."""
@abc.abstractmethod
def all(self) -> list[ForgeState]:
"""Every record, for the status table and the watchdog sweep."""
def _forge_root() -> Path:
return bot_bottle_root() / _FORGE_SUBDIR
def default_db_path() -> Path:
return bot_bottle_root() / _DB_FILENAME
def forge_state_path(owner: str, repo: str, issue_number: int) -> Path:
return _forge_root() / owner / repo / f"issue-{issue_number}.json"
class SqliteForgeStateStore(ForgeStateStore):
"""SQLite-backed `ForgeStateStore`. The database lives at
`~/.bot-bottle/bot-bottle.db` by default; pass `db_path` to point at
a different location (tests, alternate homes)."""
def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or default_db_path()
self._db_path.parent.mkdir(parents=True, exist_ok=True)
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS forge_state (
owner TEXT NOT NULL,
repo TEXT NOT NULL,
issue_number INTEGER NOT NULL,
slug TEXT NOT NULL,
agent_name TEXT NOT NULL,
bottle_names TEXT NOT NULL,
backend_name TEXT NOT NULL,
agent_git_user TEXT NOT NULL,
pr_number INTEGER,
status TEXT NOT NULL,
last_checkin_at TEXT NOT NULL,
PRIMARY KEY (owner, repo, issue_number)
)
"""
)
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self._db_path)
conn.row_factory = sqlite3.Row
return conn
def upsert(self, state: ForgeState) -> None:
with self._connect() as conn:
conn.execute(
"""
INSERT OR REPLACE INTO forge_state (
owner, repo, issue_number, slug, agent_name,
bottle_names, backend_name, agent_git_user,
pr_number, status, last_checkin_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
state.owner,
state.repo,
state.issue_number,
state.slug,
state.agent_name,
json.dumps(state.bottle_names),
state.backend_name,
state.agent_git_user,
state.pr_number,
state.status,
state.last_checkin_at,
),
)
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
with self._connect() as conn:
row = conn.execute(
"SELECT * FROM forge_state "
"WHERE owner = ? AND repo = ? AND issue_number = ?",
(owner, repo, issue_number),
).fetchone()
return _row_to_state(row) if row is not None else None
def delete(self, owner: str, repo: str, issue_number: int) -> None:
with self._connect() as conn:
conn.execute(
"DELETE FROM forge_state "
"WHERE owner = ? AND repo = ? AND issue_number = ?",
(owner, repo, issue_number),
)
def all(self) -> list[ForgeState]:
with self._connect() as conn:
rows = conn.execute(
"SELECT * FROM forge_state ORDER BY owner, repo, issue_number"
).fetchall()
return [_row_to_state(row) for row in rows]
def write_forge_state(state: ForgeState) -> None:
"""Persist `state` atomically. Creates parent dirs as needed."""
path = forge_state_path(state.owner, state.repo, state.issue_number)
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(".json.tmp")
tmp.write_text(state.to_json())
os.replace(tmp, path)
def read_forge_state(owner: str, repo: str, issue_number: int) -> ForgeState | None:
"""Load state for one issue, or None when no record exists."""
path = forge_state_path(owner, repo, issue_number)
try:
data = json.loads(path.read_text())
except FileNotFoundError:
return None
return ForgeState.from_dict(data)
def delete_forge_state(owner: str, repo: str, issue_number: int) -> None:
"""Remove an issue's record. Missing file is success (idempotent)."""
path = forge_state_path(owner, repo, issue_number)
path.unlink(missing_ok=True)
def all_forge_states() -> list[ForgeState]:
"""Every persisted record, for the orchestrate-status table and the
watchdog sweep. Unreadable files are skipped rather than aborting the
whole listing."""
root = _forge_root()
if not root.is_dir():
return []
states: list[ForgeState] = []
for path in sorted(root.glob("*/*/issue-*.json")):
try:
states.append(ForgeState.from_dict(json.loads(path.read_text())))
except (OSError, ValueError, TypeError):
continue
return states
def _row_to_state(row: sqlite3.Row) -> ForgeState:
return ForgeState(
owner=row["owner"],
repo=row["repo"],
issue_number=row["issue_number"],
slug=row["slug"],
agent_name=row["agent_name"],
bottle_names=json.loads(row["bottle_names"]),
backend_name=row["backend_name"],
agent_git_user=row["agent_git_user"],
pr_number=row["pr_number"],
status=row["status"],
last_checkin_at=row["last_checkin_at"],
)
-103
View File
@@ -1,103 +0,0 @@
"""Provenance footer (PRD forge-native-integration, chunk 5).
Every orchestrator-posted comment ends with this footer — non-optional
and not configurable off. It renders the run's audit trail (agent,
bottle, timing, exit, gitleaks, done-signal source, egress) as a
collapsed markdown block the reviewer sees at the moment of the merge
decision.
The function is pure: the orchestrator, which holds the run context,
supplies the values. In particular `egress_routes` is the pre-rendered
list of allowed-route lines the orchestrator computed from the run's
resolved egress policy — this module does not parse backend-specific
egress state. (The PRD sketch named an `egress_log_path`; passing the
already-rendered lines keeps the footer builder pure and fully testable
and leaves egress-state parsing where the data lives.)
"""
from __future__ import annotations
from datetime import datetime
def _parse(ts: str) -> datetime | None:
try:
return datetime.fromisoformat(ts)
except (ValueError, TypeError):
return None
def _format_duration(started_at: str, finished_at: str) -> str:
start = _parse(started_at)
end = _parse(finished_at)
if start is None or end is None:
return "unknown"
secs = int((end - start).total_seconds())
if secs < 0:
return "unknown"
if secs < 60:
return f"{secs}s"
return f"{secs // 60}m {secs % 60}s"
def build_provenance_footer(
slug: str,
*,
agent_name: str,
bottle_names: tuple[str, ...],
started_at: str,
finished_at: str,
exit_code: int,
watchdog_fired: bool = False,
gitleaks_clean: bool | None = None,
egress_routes: list[str] | None = None,
) -> str:
"""Return a markdown string for appending to a Gitea comment body.
`watchdog_fired=True` marks runs where the agent did not signal
completion, so reviewers know the audit trail may be incomplete.
`gitleaks_clean=None` renders the gitleaks row as "not run".
`egress_routes` is omitted entirely when None/empty.
"""
bottle_label = ", ".join(f"`{b}`" for b in bottle_names) if bottle_names else ""
exit_cell = f"{exit_code} {'' if exit_code == 0 else ''}"
if gitleaks_clean is None:
gitleaks_cell = "— not run"
elif gitleaks_clean:
gitleaks_cell = "✓ no secrets detected"
else:
gitleaks_cell = "✗ secrets detected"
if watchdog_fired:
done_cell = "watchdog — agent did not signal"
else:
done_cell = "sidecar `signal_done`"
lines = [
"<details><summary>🔬 Run provenance</summary>",
"",
"| Field | Value |",
"|---|---|",
f"| agent | `{agent_name}` |",
f"| bottle | {bottle_label} |",
f"| slug | `{slug}` |",
f"| started | {started_at} |",
f"| duration | {_format_duration(started_at, finished_at)} |",
f"| exit | {exit_cell} |",
f"| gitleaks | {gitleaks_cell} |",
f"| done signal | {done_cell} |",
]
if egress_routes:
lines.append("")
lines.append(
f"**Egress** (deny-by-default; {len(egress_routes)} "
f"route{'s' if len(egress_routes) != 1 else ''} allowed)"
)
for route in egress_routes:
lines.append(f"- {route}")
lines.append("")
lines.append("</details>")
return "\n".join(lines)
+100 -121
View File
@@ -17,8 +17,8 @@ calls `signal_done(status, summary)` on the sidecar when a work unit is
complete; the sidecar relays that to the orchestrator over a queue dir (the same
pattern as the supervise sidecar), so completion is an unambiguous in-band
signal rather than a comment the orchestrator has to parse. The orchestrator
freezes the bottle and attaches a provenance footer. Subsequent PR comments
rehydrate the frozen bottle. The bottle is destroyed when the PR closes.
freezes the bottle. Subsequent PR comments rehydrate the frozen bottle. The
bottle is destroyed when the PR closes.
The forge sidecar is backed by a `Forge` abstract class with per-provider
implementations (Gitea first), so the agent's prompts and the sidecar protocol
@@ -28,12 +28,17 @@ post-hoc egress-byte parsing, and enforces a **read-anywhere / write-scoped**
permission model: the agent may read for context but may only write to the
issue and PRs it was assigned.
Run provenance is exposed through a **provenance API** (the sidecar's structured
operation log plus the run's metadata), not posted back into the forge. We do
not surface a provenance footer in the PR — the audit record lives behind the
API where it can be retained and queried, rather than as an editable comment.
The separation of concerns across the two layers: bot-bottle owns the headless
launch primitives, the forge sidecar + `Forge` abstraction, forge state, and the
provenance builder. `bot-bottle-orchestrator` (separate binary) owns the webhook
listener, bottle lifecycle loop, and monitoring dashboard; it calls into
bot-bottle via `./cli.py orchestrate`, a thin wrapper command. This PRD covers
bot-bottle's side of that contract.
launch primitives, the forge sidecar + `Forge` abstraction, and forge state.
`bot-bottle-orchestrator` (separate binary) owns the webhook listener, bottle
lifecycle loop, and monitoring dashboard; it calls into bot-bottle via
`./cli.py orchestrate`, a thin wrapper command. This PRD covers bot-bottle's
side of that contract.
## Problem
@@ -84,18 +89,19 @@ accepted as the price of those properties.
This is the done signal — no comment parsing. A watchdog timeout
(configurable, default 30 min) causes the orchestrator to treat the run as
done-without-self-report if the agent exits without signalling.
6. Every orchestrator-posted comment ends with a provenance footer: agent name,
bottle name(s), slug, start time, duration, exit code, gitleaks result, and
egress summary.
7. Forge state (issue → slug, status) is persisted to disk and survives
orchestrator restarts.
6. Run provenance (agent name, bottle name(s), slug, timing, exit code,
gitleaks result, egress summary, and the sidecar's semantic operation log)
is available through a provenance API. It is **not** surfaced as a PR footer
or any other forge comment.
7. Forge state (issue → slug, status) is persisted in a local SQLite database
under `~/.bot-bottle/` and survives orchestrator restarts.
8. `./cli.py orchestrate status` lists active forge-managed bottles and their
issue/PR URLs.
9. Unit tests cover: label parsing, org-membership check path, forge state
read/write, provenance footer rendering, headless launch arg construction,
forge env var injection, sidecar request dispatch through the `Forge`
abstraction, write-scope enforcement (reject writes outside the assigned
issue/PRs), and `signal_done` queue relay.
store CRUD (SQLite), headless launch arg construction, forge env var
injection, sidecar request dispatch through the `Forge` abstraction,
write-scope enforcement (reject writes outside the assigned issue/PRs), and
`signal_done` queue relay.
## Non-goals
@@ -205,12 +211,16 @@ forge targeting.
| Method | Scope | Purpose |
|---|---|---|
| `read_issue(number)` | read-anywhere | Read issue/PR body for context |
| `read_issue(number)` | read-anywhere | Read an issue body for context |
| `read_pr(number)` | read-anywhere | Read a PR (incl. merge state) for context |
| `read_comments(number)` | read-anywhere | Read a thread for context |
| `post_comment(number, body)` | write-scoped | Post to the assigned issue/PR |
| `update_description(number, body)` | write-scoped | Edit the assigned issue/PR body |
| `signal_done(status, summary)` | — | Relay completion to the orchestrator |
Issues and PRs are distinct domain objects (`Issue` vs `PullRequest`) read
through distinct methods; a PR carries merge state an issue does not.
**Scope enforcement** is read-anywhere / write-scoped: read methods accept any
issue/PR number for context; write methods are rejected unless the target is the
assigned issue or one of its PRs. This is tighter than Gitea's repo-wide API-key
@@ -233,6 +243,8 @@ class Forge(abc.ABC):
@abc.abstractmethod
def read_issue(self, number: int) -> Issue: ...
@abc.abstractmethod
def read_pr(self, number: int) -> PullRequest: ...
@abc.abstractmethod
def read_comments(self, number: int) -> list[Comment]: ...
@abc.abstractmethod
def post_comment(self, number: int, body: str) -> None: ...
@@ -246,6 +258,11 @@ class Forge(abc.ABC):
def is_pr_open(self, number: int) -> bool: ...
```
`Issue` and `PullRequest` are separate frozen dataclasses — a PR adds `merged`.
`ScopedForge` wraps a concrete `Forge` to enforce the read-anywhere /
write-scoped model (`post_comment` / `update_description` raise `ForgeScopeError`
outside the assigned issue and PRs).
`GiteaForge` is the first and only concrete implementation in this PRD. It wraps
the Gitea HTTP client (below). Adding GitHub or GitLab later is a new subclass;
the sidecar, protocol, and agent prompt are untouched.
@@ -284,8 +301,8 @@ it and:
1. Reads the forge state for `(owner, repo, issue_number)`.
2. If `status == "running"`, treats the event as the done signal: freezes the
bottle, posts a summary comment with the provenance footer, sets
`status = "frozen"`.
bottle and sets `status = "frozen"`. Provenance is recorded via the
provenance API — no comment is posted to the forge.
Because completion is an explicit `signal_done` call, the orchestrator does not
parse comment text to detect "done", and intermediate comments the agent posts
@@ -295,102 +312,61 @@ mid-run cannot be mistaken for completion.
on each sidecar event. A background thread wakes every minute. If
`now - last_checkin_at > FORGE_WATCHDOG_TIMEOUT` (default 30 min, configurable
via env) and `status == "running"`, the orchestrator treats the run as
done-without-self-report: it posts the provenance footer (with `watchdog_fired`
set) and freezes the bottle.
done-without-self-report and freezes the bottle, flagging the run as incomplete
in the provenance record.
**Sidecar-death failure mode**: if the forge sidecar crashes mid-run the agent
loses forge access while the bottle is otherwise healthy. The orchestrator
detects a dead sidecar (socket/queue gone) the same way it detects a stalled
agent and falls back to the watchdog path, posting a footer that flags the
incomplete run.
agent and falls back to the watchdog path.
### Forge state — `bot_bottle/contrib/gitea/forge_state.py`
```
~/.bot-bottle/forge/
<owner>/
<repo>/
issue-<n>.json
```
State is stored in a local SQLite database at `~/.bot-bottle/bot-bottle.db`.
Access goes through a thin CRUD interface, `ForgeStateStore`, so the storage
location/engine can be swapped without touching callers. `SqliteForgeStateStore`
is the first implementation.
Schema:
```json
{
"slug": "implementer-abc12",
"pr_number": 42,
"agent_name": "implementer",
"bottle_names": ["claude"],
"backend_name": "docker",
"agent_git_user": "didericis-claude",
"issue_number": 17,
"owner": "didericis",
"repo": "bot-bottle",
"status": "frozen",
"last_checkin_at": "2026-06-29T12:04:12-04:00"
}
```
The `forge_state` table is keyed by `(owner, repo, issue_number)` and carries:
`slug`, `agent_name`, `bottle_names` (JSON), `backend_name`, `agent_git_user`,
`pr_number` (nullable), `status`, `last_checkin_at`.
`status`: `"running"` | `"frozen"` | `"destroyed"`.
Public API:
Store interface:
```python
def write_forge_state(state: ForgeState) -> None: ...
def read_forge_state(owner: str, repo: str, issue_number: int) -> ForgeState | None: ...
def delete_forge_state(owner: str, repo: str, issue_number: int) -> None: ...
def all_forge_states() -> list[ForgeState]: ...
class ForgeStateStore(abc.ABC):
def upsert(self, state: ForgeState) -> None: ...
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None: ...
def delete(self, owner: str, repo: str, issue_number: int) -> None: ...
def all(self) -> list[ForgeState]: ...
class SqliteForgeStateStore(ForgeStateStore):
def __init__(self, db_path: Path | None = None) -> None: ...
```
Writes use atomic rename (`os.replace`) for crash safety.
`upsert` uses `INSERT OR REPLACE` so a re-run for the same issue overwrites in
place. The schema is created on first open.
### Provenance — `bot_bottle/contrib/gitea/provenance.py`
### Provenance API
```python
def build_provenance_footer(
slug: str,
*,
agent_name: str,
bottle_names: tuple[str, ...],
started_at: str,
finished_at: str,
exit_code: int,
watchdog_fired: bool = False,
egress_log_path: Path | None = None,
) -> str:
"""Return a markdown string for appending to a Gitea comment body."""
```
Run provenance — agent, bottle(s), slug, timing, exit code, gitleaks result,
egress summary, watchdog-fired flag, and the sidecar's semantic operation log —
is exposed through a **provenance API**, not posted into the forge. There is no
provenance footer or run-summary comment.
Output (collapsed by default):
The rationale (per the monetization positioning): a PR comment is mutable by any
maintainer, unsigned, and per-PR, so it is worthless as an audit record and
invites false trust. The authoritative record therefore lives behind the API,
where it can be retained, queried, and (eventually) signed. Whether any
projection of it ever appears in the forge is a separate, out-of-scope decision;
this PR does not build one.
```markdown
<details><summary>🔬 Run provenance</summary>
| Field | Value |
|---|---|
| agent | `implementer` |
| bottle | `claude` |
| slug | `implementer-abc12` |
| started | 2026-06-29T12:00:00-04:00 |
| duration | 4m 12s |
| exit | 0 ✓ |
| gitleaks | ✓ no secrets detected |
| done signal | sidecar `signal_done` *(or: watchdog — agent did not signal)* |
**Egress** (deny-by-default; 2 routes allowed)
- `api.anthropic.com` — Bearer auth
- `pypi.org` — unauthenticated
Forge traffic is not an agent egress route — the forge sidecar holds the token
and makes those calls out of band. The provenance footer's forge operations come
from the sidecar's semantic audit log.
</details>
```
The egress summary is read from `~/.bot-bottle/state/<slug>/egress/`. When
unavailable the section is omitted. `watchdog_fired=True` changes the
"done signal" row to warn reviewers.
The API surface itself (schema, transport, signing, retention) is **out of scope
for this PRD** and belongs with the orchestrator / control-plane work. bot-bottle
here only produces the raw material: the sidecar's semantic operation log and the
run metadata the orchestrator collects.
### Gitea HTTP client — `bot_bottle/contrib/gitea/client.py`
@@ -403,15 +379,17 @@ inject it, because the agent never makes forge calls.
class GiteaClient:
def __init__(self, *, api_url: str, owner: str, repo: str, token: str) -> None: ...
def is_org_member(self, org: str, username: str) -> bool: ...
def post_comment(self, issue_number: int, body: str) -> None: ...
def update_comment_body(self, issue_number: int, body: str) -> None: ...
def get_pr_for_issue(self, issue_number: int) -> int | None: ...
def is_pr_open(self, pr_number: int) -> bool: ...
def get_issue(self, number: int) -> dict: ...
def get_comments(self, number: int) -> list[dict]: ...
def post_comment(self, number: int, body: str) -> None: ...
def patch_issue_body(self, number: int, body: str) -> None: ...
def get_pull(self, number: int) -> dict: ...
```
Sharing only the HTTP client (not an abstract base) is the deliberate boundary
between the sidecar and the deploy-key provisioner — see the deferral note under
the `Forge` abstraction.
`GiteaForge` adapts this client to the `Forge` surface (mapping raw JSON to
`Issue` / `PullRequest` / `Comment`). Sharing only the HTTP client (not an
abstract base) is the deliberate boundary between the sidecar and the deploy-key
provisioner — see the deferral note under the `Forge` abstraction.
### Implementation chunks
@@ -423,14 +401,17 @@ the `Forge` abstraction.
the TUI and y/N preflight and returns the agent exit code.
2. **Forge state**`contrib/gitea/forge_state.py`: `ForgeState` dataclass,
read/write/delete/all helpers, atomic rename. Tests: round-trip JSON, missing
file → None, atomic write.
`ForgeStateStore` CRUD interface, `SqliteForgeStateStore`. Tests: round-trip,
missing → None, `INSERT OR REPLACE` upsert, delete idempotent, `all()`
ordering, persistence across store instances.
3. **`Forge` abstraction + Gitea client** — `contrib/forge/base.py` (`Forge`
ABC) and `contrib/gitea/client.py` + `GiteaForge`: `is_org_member`,
`read_issue`, `read_comments`, `post_comment`, `update_description`,
ABC, `ScopedForge`, `Issue` / `PullRequest` / `Comment`) and
`contrib/gitea/client.py` + `GiteaForge`: `is_org_member`, `read_issue`,
`read_pr`, `read_comments`, `post_comment`, `update_description`,
`get_pr_for_issue`, `is_pr_open`. Tests: mock `urllib.request.urlopen`,
assert payloads and 404-as-false for membership.
assert payloads and 404-as-false for membership; `ScopedForge` write-scope
enforcement.
4. **Forge sidecar** — sidecar process exposing the protocol over a Unix socket,
queue-dir relay, write-scope enforcement, semantic op log, `signal_done`.
@@ -438,23 +419,21 @@ the `Forge` abstraction.
the `Forge`, reject out-of-scope writes, `signal_done` writes a queue event,
scope-rejection is logged.
5. **Provenance**`contrib/gitea/provenance.py`: `build_provenance_footer`.
Tests: required fields present, watchdog row text, egress omitted when log
absent.
6. **`./cli.py orchestrate`** — `cli/orchestrate.py` with `start`, `resume`,
5. **`./cli.py orchestrate`** — `cli/orchestrate.py` with `start`, `resume`,
`status` subcommands wired into `cli.py`; `start` launches the forge sidecar
alongside the agent for forge-targeted runs. Tests: arg parsing, `start`
delegates to `start --headless`, `resume` delegates to `resume --headless`.
## Provenance as the product
## Provenance
Every orchestrator-posted comment ends with the provenance footer — non-optional
and not configurable off. PRs that land without a footer were not produced by
this integration. The `watchdog_fired` flag in the footer flags runs where the
agent did not self-report completion, so reviewers know the audit trail may be
Run provenance is captured (sidecar semantic operation log + run metadata) and
exposed through a provenance API. It is deliberately **not** surfaced in the
forge — no footer, no run-summary comment. A mutable, unsigned PR comment is not
an audit record; the authoritative record lives behind the API where it can be
retained and signed. The `watchdog_fired` flag marks runs where the agent did
not self-report completion so consumers of the API know the record may be
incomplete.
The footer links to the bot-bottle repo pinned to the commit SHA active during
the run (not `main`), so the policy that governed the run is permanently
anchored in the PR history.
The provenance API's schema, transport, signing, and retention are out of scope
for this PRD (control-plane work); bot-bottle here produces the raw material
only.
+12
View File
@@ -9,6 +9,7 @@ from bot_bottle.contrib.forge.base import (
Forge,
ForgeScopeError,
Issue,
PullRequest,
ScopedForge,
)
@@ -23,6 +24,11 @@ class _RecordingForge(Forge):
def read_issue(self, number: int) -> Issue:
return Issue(number=number, title="t", body="b", state="open")
def read_pr(self, number: int) -> PullRequest:
return PullRequest(
number=number, title="pr", body="b", state="open", merged=False
)
def read_comments(self, number: int) -> list[Comment]:
return [Comment(id=1, user="alice", body="hi")]
@@ -52,6 +58,12 @@ class TestScopedForgeReads(unittest.TestCase):
self.assertEqual(123, self.scoped.read_issue(123).number)
self.assertEqual("alice", self.scoped.read_comments(500)[0].user)
def test_read_pr_passes_through(self):
pr = self.scoped.read_pr(999)
self.assertIsInstance(pr, PullRequest)
self.assertEqual(999, pr.number)
self.assertFalse(pr.merged)
def test_membership_and_pr_lookups_delegate(self):
self.assertTrue(self.scoped.is_org_member("bot-bottle", "member"))
self.assertFalse(self.scoped.is_org_member("bot-bottle", "stranger"))
+14
View File
@@ -126,6 +126,20 @@ class TestPRHelpers(unittest.TestCase):
with patch(_URLOPEN, return_value=_resp({"state": "closed"})):
self.assertFalse(GiteaForge(_client()).is_pr_open(18))
def test_read_pr_maps_fields_including_merged(self):
raw = {"number": 18, "title": "Fix", "body": "patch",
"state": "closed", "merged": True}
with patch(_URLOPEN, return_value=_resp(raw)) as m:
pr = GiteaForge(_client()).read_pr(18)
self.assertEqual((18, "Fix", "patch", "closed", True),
(pr.number, pr.title, pr.body, pr.state, pr.merged))
self.assertIn("/repos/didericis/bot-bottle/pulls/18",
m.call_args.args[0].full_url)
def test_read_pr_merged_defaults_false(self):
with patch(_URLOPEN, return_value=_resp({"number": 18, "state": "open"})):
self.assertFalse(GiteaForge(_client()).read_pr(18).merged)
if __name__ == "__main__":
unittest.main()
+45 -49
View File
@@ -1,4 +1,4 @@
"""Unit: forge state persistence (PRD forge-native-integration)."""
"""Unit: SQLite forge state store (PRD forge-native-integration)."""
from __future__ import annotations
@@ -6,13 +6,12 @@ import tempfile
import unittest
from dataclasses import replace
from pathlib import Path
from unittest.mock import patch
from bot_bottle.contrib.gitea import forge_state as fs
from bot_bottle.contrib.gitea.forge_state import (
STATUS_FROZEN,
STATUS_RUNNING,
ForgeState,
SqliteForgeStateStore,
)
@@ -33,71 +32,68 @@ def _state(**over: object) -> ForgeState:
return replace(base, **over)
class ForgeStateTest(unittest.TestCase):
class ForgeStateStoreTest(unittest.TestCase):
def setUp(self) -> None:
# enterContext handles cleanup; pylint doesn't recognize it as CM-aware.
root = Path(self.enterContext( # pylint: disable=consider-using-with
tempfile.TemporaryDirectory()))
patcher = patch.object(fs, "bot_bottle_root", return_value=root)
patcher.start()
self.addCleanup(patcher.stop)
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
self.store = SqliteForgeStateStore(tmp / "sub" / "bot-bottle.db")
def test_round_trip(self):
fs.write_forge_state(_state())
got = fs.read_forge_state("didericis", "bot-bottle", 17)
self.assertEqual(_state(), got)
self.store.upsert(_state())
self.assertEqual(_state(), self.store.get("didericis", "bot-bottle", 17))
def test_missing_returns_none(self):
self.assertIsNone(fs.read_forge_state("nobody", "nope", 1))
self.assertIsNone(self.store.get("nobody", "nope", 1))
def test_path_layout(self):
path = fs.forge_state_path("didericis", "bot-bottle", 17)
self.assertTrue(str(path).endswith("forge/didericis/bot-bottle/issue-17.json"))
def test_creates_db_parent_dirs(self):
# setUp pointed at a non-existent 'sub/' dir; init must create it.
self.assertIsNone(self.store.get("x", "y", 1)) # no raise
def test_write_is_atomic_no_tmp_left(self):
fs.write_forge_state(_state())
path = fs.forge_state_path("didericis", "bot-bottle", 17)
self.assertFalse(path.with_suffix(".json.tmp").exists())
self.assertTrue(path.exists())
def test_update_overwrites(self):
fs.write_forge_state(_state(status=STATUS_RUNNING))
fs.write_forge_state(_state(status=STATUS_FROZEN))
got = fs.read_forge_state("didericis", "bot-bottle", 17)
def test_upsert_replaces(self):
self.store.upsert(_state(status=STATUS_RUNNING))
self.store.upsert(_state(status=STATUS_FROZEN))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertEqual(STATUS_FROZEN, got.status)
# Still one row, not two.
self.assertEqual(1, len(self.store.all()))
def test_delete_is_idempotent(self):
fs.write_forge_state(_state())
fs.delete_forge_state("didericis", "bot-bottle", 17)
fs.delete_forge_state("didericis", "bot-bottle", 17) # no raise
self.assertIsNone(fs.read_forge_state("didericis", "bot-bottle", 17))
self.store.upsert(_state())
self.store.delete("didericis", "bot-bottle", 17)
self.store.delete("didericis", "bot-bottle", 17) # no raise
self.assertIsNone(self.store.get("didericis", "bot-bottle", 17))
def test_all_forge_states_lists_across_repos(self):
fs.write_forge_state(_state(issue_number=17))
fs.write_forge_state(_state(issue_number=18, slug="other"))
fs.write_forge_state(_state(owner="acme", repo="widget", issue_number=3))
states = fs.all_forge_states()
def test_all_lists_across_repos_sorted(self):
self.store.upsert(_state(issue_number=18, slug="other"))
self.store.upsert(_state(issue_number=17))
self.store.upsert(_state(owner="acme", repo="widget", issue_number=3))
states = self.store.all()
self.assertEqual(3, len(states))
self.assertEqual({17, 18, 3}, {s.issue_number for s in states})
self.assertEqual(
[("acme", 3), ("didericis", 17), ("didericis", 18)],
[(s.owner, s.issue_number) for s in states],
)
def test_all_forge_states_empty_when_no_dir(self):
self.assertEqual([], fs.all_forge_states())
def test_all_empty(self):
self.assertEqual([], self.store.all())
def test_from_dict_ignores_unknown_keys(self):
st = ForgeState.from_dict({
"owner": "o", "repo": "r", "issue_number": 1, "slug": "s",
"agent_name": "a", "future_field": "ignored",
})
self.assertEqual("o", st.owner)
self.assertIsNone(st.pr_number)
def test_bottle_names_list_preserved(self):
self.store.upsert(_state(bottle_names=["claude", "dev"]))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertEqual(["claude", "dev"], got.bottle_names)
def test_pr_number_optional(self):
fs.write_forge_state(_state(pr_number=None))
got = fs.read_forge_state("didericis", "bot-bottle", 17)
def test_pr_number_nullable(self):
self.store.upsert(_state(pr_number=None))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertIsNone(got.pr_number)
def test_persists_across_store_instances(self):
self.store.upsert(_state())
reopened = SqliteForgeStateStore(self.store._db_path) # pylint: disable=protected-access
self.assertEqual(_state(), reopened.get("didericis", "bot-bottle", 17))
if __name__ == "__main__":
unittest.main()
@@ -1,94 +0,0 @@
"""Unit: provenance footer (PRD forge-native-integration)."""
from __future__ import annotations
import unittest
from bot_bottle.contrib.gitea.provenance import build_provenance_footer
def _footer(
slug: str = "implementer-abc12",
*,
agent_name: str = "implementer",
bottle_names: tuple[str, ...] = ("claude",),
started_at: str = "2026-06-29T12:00:00-04:00",
finished_at: str = "2026-06-29T12:04:12-04:00",
exit_code: int = 0,
watchdog_fired: bool = False,
gitleaks_clean: bool | None = None,
egress_routes: list[str] | None = None,
) -> str:
return build_provenance_footer(
slug,
agent_name=agent_name,
bottle_names=bottle_names,
started_at=started_at,
finished_at=finished_at,
exit_code=exit_code,
watchdog_fired=watchdog_fired,
gitleaks_clean=gitleaks_clean,
egress_routes=egress_routes,
)
class ProvenanceTest(unittest.TestCase):
def test_required_fields_present(self):
out = _footer()
for token in ("Run provenance", "`implementer`", "`claude`",
"`implementer-abc12`", "| exit | 0 ✓ |"):
self.assertIn(token, out)
def test_collapsed_details_block(self):
out = _footer()
self.assertTrue(out.startswith("<details>"))
self.assertIn("</details>", out)
def test_duration_minutes_seconds(self):
self.assertIn("| duration | 4m 12s |", _footer())
def test_duration_under_a_minute(self):
out = _footer(finished_at="2026-06-29T12:00:30-04:00")
self.assertIn("| duration | 30s |", out)
def test_duration_unknown_on_bad_timestamp(self):
out = _footer(finished_at="not-a-time")
self.assertIn("| duration | unknown |", out)
def test_nonzero_exit_marked(self):
self.assertIn("| exit | 1 ✗ |", _footer(exit_code=1))
def test_watchdog_changes_done_signal_row(self):
normal = _footer()
self.assertIn("sidecar `signal_done`", normal)
fired = _footer(watchdog_fired=True)
self.assertIn("watchdog — agent did not signal", fired)
self.assertNotIn("sidecar `signal_done`", fired)
def test_gitleaks_states(self):
self.assertIn("not run", _footer())
self.assertIn("✓ no secrets detected", _footer(gitleaks_clean=True))
self.assertIn("✗ secrets detected", _footer(gitleaks_clean=False))
def test_egress_omitted_when_absent(self):
self.assertNotIn("**Egress**", _footer())
def test_egress_rendered_when_present(self):
out = _footer(egress_routes=[
"`api.anthropic.com` — Bearer auth",
"`pypi.org` — unauthenticated",
])
self.assertIn("**Egress** (deny-by-default; 2 routes allowed)", out)
self.assertIn("- `api.anthropic.com` — Bearer auth", out)
def test_egress_singular_route(self):
out = _footer(egress_routes=["`api.anthropic.com` — Bearer auth"])
self.assertIn("1 route allowed", out)
def test_multiple_bottles_listed(self):
out = _footer(bottle_names=("claude", "dev"))
self.assertIn("`claude`, `dev`", out)
if __name__ == "__main__":
unittest.main()