Part I: Forge native integration: PRD + forge library layer #318

Open
didericis-claude wants to merge 11 commits from forge-native-integration into main
17 changed files with 2082 additions and 10 deletions
+10
View File
@@ -1 +1,11 @@
"""bot-bottle: Python implementation of the agent container launcher."""
from .api import BottleError, destroy, freeze, resume_headless, start_headless
__all__ = [
"BottleError",
"destroy",
"freeze",
"resume_headless",
"start_headless",
]
+258
View File
@@ -0,0 +1,258 @@
"""Public Python API for programmatic bottle orchestration.
Stable surface for bot-bottle-orchestrator (and other Python callers) to
drive bottles without invoking the CLI as a subprocess. Every function
converts ``Die`` and non-zero agent exit codes to ``BottleError`` so
callers use exception handling rather than inspecting return values.
The Protocol the orchestrator's ``BottleRunner`` targets looks like::
class BottleRunner(Protocol):
def start(self, agent: str, *, prompt: str, ...) -> str: ...
def resume(self, slug: str, *, prompt: str) -> None: ...
def freeze(self, slug: str) -> None: ...
def destroy(self, slug: str) -> None: ...
A ``SubprocessBottleRunner`` calls ``./cli.py`` for each operation. A
``ProgrammaticBottleRunner`` calls these functions directly; the Protocol
call sites in ``lifecycle.py`` are unchanged.
"""
from __future__ import annotations
from typing import Sequence, cast
from .backend import BottleSpec
from .backend.freeze import CommitCancelled, get_freezer
from .bottle_state import cleanup_state, clear_preserve_marker, read_metadata
from .cli._common import USER_CWD
from .cli.start import _launch_bottle, _peek_agent_bottle, _uniquify_label_headless
from .log import Die
from .manifest import ManifestError, ManifestIndex
class BottleError(Exception):
"""Raised when a bottle operation fails.
``exit_code`` carries the agent process's exit code when the failure is
a non-zero agent exit; 1 for all other failure modes (missing state,
backend errors, etc.)."""
def __init__(self, message: str, *, exit_code: int = 1) -> None:
super().__init__(message)
self.exit_code = exit_code
def start_headless(
agent_name: str,
*,
prompt: str,
bottles: Sequence[str] | None = None,
label: str | None = None,
color: str | None = None,
backend_name: str | None = None,
copy_cwd: bool = False,
forge_env: dict[str, str] | None = None,
user_cwd: str | None = None,
) -> str:
"""Launch a new bottle headlessly. Returns the bottle slug.
``forge_env`` is passed through to the forge sidecar (not the agent)
when the bottle is forge-targeted; it carries the credentials and
context the sidecar needs to call the forge API.
Raises ``BottleError`` on configuration errors or if the agent exits
non-zero. The returned slug can be passed to ``freeze()``,
``resume_headless()``, or ``destroy()`` for subsequent lifecycle
operations."""
cwd = user_cwd or USER_CWD
try:
manifest = ManifestIndex.resolve(cwd)
manifest.require_agent(agent_name)
except (Die, ManifestError) as exc:
raise BottleError(str(exc)) from exc
if bottles:
bottle_names: tuple[str, ...] = tuple(bottles)
else:
default_bottle = _peek_agent_bottle(manifest, agent_name)
if not default_bottle:
raise BottleError(
f"agent '{agent_name}' has no default bottle; "
f"pass bottles=[...]"
)
bottle_names = (default_bottle,)
spec = BottleSpec(
manifest=manifest,
agent_name=agent_name,
copy_cwd=copy_cwd,
user_cwd=cwd,
label=_uniquify_label_headless(label or agent_name),
color=color or "",
bottle_names=bottle_names,
forge_env=dict(forge_env) if forge_env else {},
)
try:
slug, exit_code = _launch_bottle(
spec,
dry_run=False,
backend_name=backend_name,
assume_yes=True,
headless_prompt_text=prompt,
)
except Die as exc:
raise BottleError(exc.message, exit_code=cast(int, exc.code)) from exc
if exit_code != 0:
raise BottleError(
f"agent exited {exit_code} (slug={slug!r})", exit_code=exit_code
)
return slug
def resume_headless(
slug: str,
*,
prompt: str,
backend_name: str | None = None,
forge_env: dict[str, str] | None = None,
) -> None:
"""Resume a frozen bottle headlessly with ``prompt``.
``forge_env`` re-supplies forge context for the new session (the
sidecar is relaunched alongside the agent on resume).
Raises ``BottleError`` on missing state, backend errors, or non-zero
agent exit."""
metadata = read_metadata(slug)
if metadata is None:
raise BottleError(
f"no state recorded for slug {slug!r}; "
f"check ~/.bot-bottle/state/ or call start_headless() to create a new bottle"
)
try:
manifest = ManifestIndex.resolve(metadata.cwd or USER_CWD)
manifest.require_agent(metadata.agent_name)
except (Die, ManifestError) as exc:
raise BottleError(str(exc)) from exc
spec = BottleSpec(
manifest=manifest,
agent_name=metadata.agent_name,
copy_cwd=metadata.copy_cwd,
user_cwd=metadata.cwd or USER_CWD,
identity=metadata.identity,
bottle_names=tuple(metadata.bottle_names),
forge_env=dict(forge_env) if forge_env else {},
)
try:
_, exit_code = _launch_bottle(
spec,
dry_run=False,
backend_name=backend_name or metadata.backend or None,
assume_yes=True,
headless_prompt_text=prompt,
)
except Die as exc:
raise BottleError(exc.message, exit_code=cast(int, exc.code)) from exc
if exit_code != 0:
raise BottleError(
f"agent exited {exit_code} resuming {slug!r}", exit_code=exit_code
)
def freeze(slug: str, *, backend_name: str | None = None) -> None:
"""Freeze the named bottle to a resumable artifact.
Reads the bottle's backend from its metadata when ``backend_name`` is
not supplied. Raises ``BottleError`` if the freeze fails."""
metadata = read_metadata(slug)
resolved_backend = backend_name or (metadata.backend if metadata else "") or "docker"
try:
get_freezer(resolved_backend).commit_slug(slug)
except CommitCancelled as exc:
raise BottleError(f"freeze cancelled for {slug!r}") from exc
except Die as exc:
raise BottleError(exc.message, exit_code=cast(int, exc.code)) from exc
def destroy(slug: str, *, backend_name: str | None = None) -> None:
"""Destroy the named bottle, removing all resources and state.
Brings down any running resources for ``slug``, then removes the
per-bottle state directory. Idempotent: a slug with no running
resources or no state directory is not an error."""
metadata = read_metadata(slug)
resolved_backend = backend_name or (metadata.backend if metadata else "") or "docker"
try:
if resolved_backend == "docker":
_destroy_docker(slug)
elif resolved_backend == "smolmachines":
_destroy_smolmachines(slug)
# macos-container: the container is torn down inside the launch
# context manager; no persistent VM survives, so nothing extra is
# needed at destroy time beyond the state-dir removal below.
except Die as exc:
raise BottleError(exc.message, exit_code=cast(int, exc.code)) from exc
clear_preserve_marker(slug)
cleanup_state(slug)
# --- backend-specific helpers -----------------------------------------------
def _destroy_docker(slug: str) -> None:
"""Best-effort ``docker compose down`` for a Docker bottle.
No-op when the compose file is absent — the project was already
brought down (normal for a frozen bottle) or was never created."""
from .backend.docker.compose import (
compose_down,
compose_file_path,
compose_project_name,
)
from .bottle_state import bottle_state_dir
state_dir = bottle_state_dir(slug)
compose_file = compose_file_path(state_dir)
if compose_file.exists():
compose_down(compose_project_name(slug), compose_file)
def _destroy_smolmachines(slug: str) -> None:
"""Best-effort stop + delete for a smolmachines bottle.
Both steps are best-effort: a machine that is already gone does not
cause an error; partial failures are logged as warnings."""
import subprocess
from .log import warn
machine = f"bot-bottle-{slug}"
subprocess.run(
["smolvm", "machine", "stop", "--name", machine],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
r = subprocess.run(
["smolvm", "machine", "delete", "-f", machine],
capture_output=True,
text=True,
check=False,
)
if r.returncode != 0:
warn(
f"smolvm machine delete -f {machine!r} failed "
f"(may already be gone): {(r.stderr or '').strip()}"
)
__all__ = [
"BottleError",
"destroy",
"freeze",
"resume_headless",
"start_headless",
]
+6 -1
View File
@@ -37,7 +37,7 @@ import shlex
import sys
from abc import ABC, abstractmethod
from contextlib import AbstractContextManager
from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Generic, Sequence, TypeVar
@@ -75,6 +75,11 @@ class BottleSpec:
# Ordered bottle names selected at launch (issue #269). When non-empty
# they are merged in order and replace the agent's `bottle:` field.
bottle_names: tuple[str, ...] = ()
# Forge sidecar env vars (PRD forge-native-integration, chunk 1).
# Passed by the orchestrator at launch time; the forge sidecar reads
# them to connect to Gitea. Empty for non-forge runs. The agent
# process itself does not receive these.
forge_env: dict[str, str] = field(default_factory=dict)
@dataclass(frozen=True)
+26 -1
View File
@@ -27,12 +27,34 @@ from .start import _launch_bottle
def cmd_resume(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument(
"--headless",
action="store_true",
help=(
"non-interactive rehydrate: deliver --prompt to the agent and "
"skip the y/N preflight. For orchestrators / the freeze-rehydrate "
"loop."
),
)
parser.add_argument(
"--prompt",
default=None,
help="follow-up prompt delivered to the agent (required with --headless)",
)
parser.add_argument(
"identity",
help="bottle identity from a prior `start` (see its session-end output)",
)
args = parser.parse_args(argv)
if args.prompt and not args.headless:
die("--prompt is only valid with --headless")
if args.headless and not args.prompt:
die(
"--headless requires --prompt: "
"./cli.py resume <identity> --headless --prompt 'Address the review'"
)
metadata = read_metadata(args.identity)
if metadata is None:
die(
@@ -52,8 +74,11 @@ def cmd_resume(argv: list[str]) -> int:
bottle_names=tuple(metadata.bottle_names),
)
backend_name = metadata.backend or None
return _launch_bottle(
_, rc = _launch_bottle(
spec,
dry_run=args.dry_run,
backend_name=backend_name,
assume_yes=args.headless,
headless_prompt_text=args.prompt or "",
)
return rc
+13 -5
View File
@@ -144,11 +144,12 @@ def cmd_start(argv: list[str]) -> int:
color=color,
bottle_names=bottle_names,
)
return _launch_bottle(
_, rc = _launch_bottle(
spec,
dry_run=dry_run,
backend_name=backend_name,
)
return rc
# --- Headless launch -----------------------------------------------------
@@ -203,13 +204,14 @@ def _start_headless(
color=args.color or "",
bottle_names=bottle_names,
)
return _launch_bottle(
_, rc = _launch_bottle(
spec,
dry_run=dry_run,
backend_name=backend_name,
assume_yes=True,
headless_prompt_text=prompt,
)
return rc
def _uniquify_label_headless(label: str) -> str:
@@ -497,11 +499,16 @@ def _launch_bottle(
backend_name: str | None = None,
assume_yes: bool = False,
headless_prompt_text: str = "",
) -> int:
) -> tuple[str, int]:
"""Shared launch core for `start` and `resume`. Builds the plan,
prints / dry-runs / prompts as appropriate, brings the bottle up,
attaches claude, and prints the resume hint on session end.
Returns ``(slug, exit_code)`` where ``slug`` is the bottle identity
(empty string when the launch was aborted before a slug was minted)
and ``exit_code`` is the agent process's exit code (0 on clean exit
or when launch was aborted before the agent ran).
`assume_yes` skips the interactive y/N confirmation (headless /
orchestrator launches), where there is no human at the prompt.
@@ -510,6 +517,7 @@ def _launch_bottle(
agent receives the initial task without interactive input."""
stage_dir = Path(tempfile.mkdtemp(prefix="bot-bottle-stage."))
identity = ""
exit_code = 0
try:
plan, identity = prepare_with_preflight(
spec,
@@ -520,7 +528,7 @@ def _launch_bottle(
backend_name=backend_name,
)
if plan is None:
return 0
return identity, 0
backend = get_bottle_backend(backend_name)
with backend.launch(plan) as bottle:
@@ -547,7 +555,7 @@ def _launch_bottle(
# Ctrl-Cs / OOM kills before cleanup removes the state dir.
if agent_provider_template == "claude":
capture_claude_session_state(identity, exit_code)
return 0
return identity, exit_code
finally:
# PRD 0018 chunk 2: prepare now writes the bottle's bind-mount
# sources under state/<slug>/. If we never reached the
+165
View File
@@ -0,0 +1,165 @@
"""Forge abstraction (PRD forge-native-integration, chunk 3).
The `Forge` abstract class is the provider-agnostic surface a forge
sidecar dispatches to: read issues/comments, post comments, edit
descriptions, and the membership / PR lookups the orchestrator needs.
Each forge (Gitea first) implements it; the sidecar protocol and the
agent prompt stay forge-agnostic.
`signal_done` is deliberately *not* a `Forge` method — completion is a
sidecar concept relayed to the orchestrator over a queue dir, not a
forge API operation.
`ScopedForge` enforces the PRD's **read-anywhere / write-scoped** model:
reads pass through to any issue/PR for context; writes are rejected
unless the target is the assigned issue or one of its PRs. This bounds
the blast radius of a prompt-injected agent below repo-wide API-key
permissions.
"""
from __future__ import annotations
import abc
from collections.abc import Iterable
from dataclasses import dataclass
@dataclass(frozen=True)
class Issue:
"""A forge issue (not a PR — see `PullRequest`)."""
Review

There should also be a “PullRequest” dataclass/issues and PullRequests should not use the same object

There should also be a “PullRequest” dataclass/issues and PullRequests should not use the same object
number: int
title: str
body: str
state: str # "open" | "closed"
@dataclass(frozen=True)
class PullRequest:
"""A forge pull request. Kept distinct from `Issue` even though some
forges model PRs as issues on the wire: the domain objects carry
different data (a PR has merge state) and are read through different
methods (`read_pr` vs `read_issue`)."""
number: int
title: str
body: str
state: str # "open" | "closed"
merged: bool
@dataclass(frozen=True)
class Comment:
id: int
user: str # login of the comment author
body: str
didericis marked this conversation as resolved
Review

Should be a separate “read_pr” method

Should be a separate “read_pr” method
class ForgeScopeError(PermissionError):
"""Raised by `ScopedForge` when a write targets an issue/PR outside
the assigned scope."""
class Forge(abc.ABC):
"""Provider-agnostic forge operations. Implementations wrap a
per-provider HTTP client and translate to `Issue` / `Comment`."""
@abc.abstractmethod
def read_issue(self, number: int) -> Issue:
"""Read an issue body (read-anywhere)."""
@abc.abstractmethod
def read_pr(self, number: int) -> PullRequest:
"""Read a pull request, including its merge state (read-anywhere)."""
@abc.abstractmethod
def read_comments(self, number: int) -> list[Comment]:
"""Read a thread's comments (read-anywhere)."""
@abc.abstractmethod
def post_comment(self, number: int, body: str) -> None:
"""Post a comment to an issue or PR (write-scoped)."""
@abc.abstractmethod
def update_description(self, number: int, body: str) -> None:
"""Replace an issue or PR body (write-scoped)."""
@abc.abstractmethod
def is_org_member(self, org: str, username: str) -> bool:
"""Whether `username` is a member of `org`."""
@abc.abstractmethod
def get_pr_for_issue(self, number: int) -> int | None:
"""The PR number linked to an issue, or None when there is none."""
@abc.abstractmethod
def is_pr_open(self, number: int) -> bool:
"""Whether the given PR is still open."""
class ScopedForge(Forge):
"""Read-anywhere / write-scoped wrapper around a concrete `Forge`.
`post_comment` and `update_description` are rejected with
`ForgeScopeError` unless the target number is the assigned issue or
one of the assigned PRs. Every other method delegates unchanged, so
reads, membership checks, and PR lookups work against any number for
context.
The writable set is fixed at construction. The sidecar reconstructs
a `ScopedForge` when a PR is discovered (`get_pr_for_issue`) so the
new PR becomes writable; this class does not mutate its own scope.
"""
def __init__(
self,
inner: Forge,
*,
assigned_issue: int,
assigned_prs: Iterable[int] = (),
) -> None:
self._inner = inner
self._assigned_issue = assigned_issue
self._writable = {assigned_issue, *assigned_prs}
@property
def writable(self) -> frozenset[int]:
return frozenset(self._writable)
def _check_write(self, number: int) -> None:
if number not in self._writable:
allowed = ", ".join(str(n) for n in sorted(self._writable))
raise ForgeScopeError(
f"write to #{number} denied: out of assigned scope "
f"(writable: {allowed})"
)
# --- read-anywhere: pass through --------------------------------------
def read_issue(self, number: int) -> Issue:
return self._inner.read_issue(number)
def read_pr(self, number: int) -> PullRequest:
return self._inner.read_pr(number)
def read_comments(self, number: int) -> list[Comment]:
return self._inner.read_comments(number)
def is_org_member(self, org: str, username: str) -> bool:
return self._inner.is_org_member(org, username)
def get_pr_for_issue(self, number: int) -> int | None:
return self._inner.get_pr_for_issue(number)
def is_pr_open(self, number: int) -> bool:
return self._inner.is_pr_open(number)
# --- write-scoped: check then delegate --------------------------------
def post_comment(self, number: int, body: str) -> None:
self._check_write(number)
self._inner.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._check_write(number)
self._inner.update_description(number, body)
+174
View File
@@ -0,0 +1,174 @@
"""Gitea HTTP client + `GiteaForge` (PRD forge-native-integration, chunk 3).
`GiteaClient` is the thin stdlib-only HTTP transport (mirrors
`deploy_key_provisioner.py`: `urllib.request`, bounded timeouts,
structured error bodies). `GiteaForge` adapts it to the provider-agnostic
`Forge` surface.
Unlike the option-2 design, the token is held here (the sidecar process
owns it) and passed to the client directly there is no agent-side
cred-proxy route, because the agent never makes forge calls. The HTTP
client is the one piece shared with `GiteaDeployKeyProvisioner`; the two
are deliberately *not* unified behind a common abstract base (see the
deferral note in the PRD).
"""
from __future__ import annotations
import json
import urllib.error
import urllib.request
from typing import Any
from ..forge.base import Comment, Forge, Issue, PullRequest
# Bound every Gitea call: a hung instance must not stall the sidecar.
_API_TIMEOUT_SECS = 30
class GiteaClient:
"""Thin authenticated HTTP client for one repo's Gitea API.
`api_url` is the API base *including* `/api/v1` (matching the
`FORGE_GITEA_API` env var), e.g. `https://gitea.example.com/api/v1`.
"""
def __init__(self, *, api_url: str, owner: str, repo: str, token: str) -> None:
self._api_url = api_url.rstrip("/")
self._owner = owner
self._repo = repo
self._token = token
# --- low-level request -------------------------------------------------
def _request(
self, method: str, path: str, *, body: dict[str, Any] | None = None
) -> tuple[int, Any]:
"""Issue an authenticated request. Returns `(status, parsed_json)`;
parsed_json is None when the response has no body. Raises
`RuntimeError` on any non-2xx except where callers special-case
the HTTPError themselves (membership 404)."""
url = f"{self._api_url}{path}"
data = json.dumps(body).encode() if body is not None else None
headers = {"Authorization": f"token {self._token}"}
if data is not None:
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method)
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS) as resp:
raw = resp.read()
parsed = json.loads(raw) if raw else None
return resp.status, parsed
def _repo_path(self, suffix: str) -> str:
return f"/repos/{self._owner}/{self._repo}{suffix}"
# --- operations --------------------------------------------------------
def is_org_member(self, org: str, username: str) -> bool:
"""GET /orgs/{org}/members/{username}: 2xx → member, 404 → not.
Other errors propagate so a misconfigured token fails loudly."""
url = f"{self._api_url}/orgs/{org}/members/{username}"
req = urllib.request.Request(
url, headers={"Authorization": f"token {self._token}"}, method="GET"
)
try:
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS):
return True
except urllib.error.HTTPError as exc:
if exc.code == 404:
return False
raise RuntimeError(
f"org membership check failed for {org}/{username}: "
f"HTTP {exc.code}{_read_error_body(exc)}"
) from exc
def get_issue(self, number: int) -> dict[str, Any]:
_status, body = self._request("GET", self._repo_path(f"/issues/{number}"))
return body or {}
def get_comments(self, number: int) -> list[dict[str, Any]]:
_status, body = self._request(
"GET", self._repo_path(f"/issues/{number}/comments")
)
return body or []
def post_comment(self, number: int, body: str) -> None:
self._request(
"POST",
self._repo_path(f"/issues/{number}/comments"),
body={"body": body},
)
def patch_issue_body(self, number: int, body: str) -> None:
self._request(
"PATCH", self._repo_path(f"/issues/{number}"), body={"body": body}
)
def get_pull(self, number: int) -> dict[str, Any]:
_status, body = self._request("GET", self._repo_path(f"/pulls/{number}"))
return body or {}
class GiteaForge(Forge):
"""`Forge` over a `GiteaClient`."""
def __init__(self, client: GiteaClient) -> None:
self._client = client
def read_issue(self, number: int) -> Issue:
raw = self._client.get_issue(number)
return Issue(
number=int(raw.get("number", number)),
title=str(raw.get("title", "")),
body=str(raw.get("body", "") or ""),
state=str(raw.get("state", "")),
)
def read_pr(self, number: int) -> PullRequest:
raw = self._client.get_pull(number)
return PullRequest(
number=int(raw.get("number", number)),
title=str(raw.get("title", "")),
body=str(raw.get("body", "") or ""),
state=str(raw.get("state", "")),
merged=bool(raw.get("merged", False)),
)
def read_comments(self, number: int) -> list[Comment]:
return [
Comment(
id=int(c.get("id", 0)),
user=str((c.get("user") or {}).get("login", "")),
body=str(c.get("body", "") or ""),
)
for c in self._client.get_comments(number)
]
def post_comment(self, number: int, body: str) -> None:
self._client.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._client.patch_issue_body(number, body)
def is_org_member(self, org: str, username: str) -> bool:
return self._client.is_org_member(org, username)
def get_pr_for_issue(self, number: int) -> int | None:
"""Gitea models a PR as an issue with the same number, exposing a
`pull_request` object on the issue. When the queried number is
itself a PR, return it; otherwise None. (The orchestrator tracks
the issuePR mapping in forge state for the cross-number case.)"""
raw = self._client.get_issue(number)
if raw.get("pull_request"):
return int(raw.get("number", number))
return None
def is_pr_open(self, number: int) -> bool:
return self.read_pr(number).state == "open"
def _read_error_body(exc: urllib.error.HTTPError) -> str:
try:
return exc.read().decode("utf-8", errors="replace")
except Exception: # pylint: disable=broad-exception-caught
return ""
+171
View File
@@ -0,0 +1,171 @@
"""Forge state persistence (PRD forge-native-integration, chunk 2).
The orchestrator tracks one record per forge-targeted issue so it can
map an incoming webhook back to the bottle handling it, drive the
freeze / rehydrate loop, and run the watchdog.
State is stored in a local SQLite database in `~/.bot-bottle/`. Access
goes through the thin `ForgeStateStore` CRUD interface so the backing
Outdated
Review

We should introduce sqlite and start writing state in a local db now. We should do it the following way:

  1. db lives in the home ~/.bot-bottle folder
  2. there’s a thin api for performing crud against the db so we can swap out different storage locations
We should introduce sqlite and start writing state in a local db now. We should do it the following way: 1. db lives in the home ~/.bot-bottle folder 2. there’s a thin api for performing crud against the db so we can swap out different storage locations
store (location or engine) can be swapped without touching callers;
`SqliteForgeStateStore` is the first implementation.
"""
from __future__ import annotations
import abc
import json
import sqlite3
from dataclasses import dataclass, field
from pathlib import Path
from ...supervise import bot_bottle_root
_DB_FILENAME = "bot-bottle.db"
# Lifecycle: a bottle is launched (running), frozen on the done signal,
# and destroyed when the PR closes.
STATUS_RUNNING = "running"
STATUS_FROZEN = "frozen"
STATUS_DESTROYED = "destroyed"
@dataclass
class ForgeState:
"""One forge-targeted issue's bottle lifecycle record."""
owner: str
repo: str
issue_number: int
slug: str
agent_name: str
bottle_names: list[str] = field(default_factory=list)
backend_name: str = ""
agent_git_user: str = ""
pr_number: int | None = None
status: str = STATUS_RUNNING
last_checkin_at: str = ""
class ForgeStateStore(abc.ABC):
"""Thin CRUD surface over forge state. Implementations back it with a
concrete store; callers depend only on this interface so the storage
location/engine is swappable."""
@abc.abstractmethod
def upsert(self, state: ForgeState) -> None:
"""Insert or replace the record keyed by (owner, repo, issue)."""
@abc.abstractmethod
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
"""Fetch one record, or None when absent."""
@abc.abstractmethod
def delete(self, owner: str, repo: str, issue_number: int) -> None:
"""Remove a record. Missing is success (idempotent)."""
@abc.abstractmethod
def all(self) -> list[ForgeState]:
"""Every record, for the status table and the watchdog sweep."""
def default_db_path() -> Path:
return bot_bottle_root() / _DB_FILENAME
class SqliteForgeStateStore(ForgeStateStore):
"""SQLite-backed `ForgeStateStore`. The database lives at
`~/.bot-bottle/bot-bottle.db` by default; pass `db_path` to point at
a different location (tests, alternate homes)."""
def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or default_db_path()
self._db_path.parent.mkdir(parents=True, exist_ok=True)
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS forge_state (
owner TEXT NOT NULL,
repo TEXT NOT NULL,
issue_number INTEGER NOT NULL,
slug TEXT NOT NULL,
agent_name TEXT NOT NULL,
bottle_names TEXT NOT NULL,
backend_name TEXT NOT NULL,
agent_git_user TEXT NOT NULL,
pr_number INTEGER,
status TEXT NOT NULL,
last_checkin_at TEXT NOT NULL,
PRIMARY KEY (owner, repo, issue_number)
)
"""
)
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self._db_path)
conn.row_factory = sqlite3.Row
return conn
def upsert(self, state: ForgeState) -> None:
with self._connect() as conn:
conn.execute(
"""
INSERT OR REPLACE INTO forge_state (
owner, repo, issue_number, slug, agent_name,
bottle_names, backend_name, agent_git_user,
pr_number, status, last_checkin_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
state.owner,
state.repo,
state.issue_number,
state.slug,
state.agent_name,
json.dumps(state.bottle_names),
state.backend_name,
state.agent_git_user,
state.pr_number,
state.status,
state.last_checkin_at,
),
)
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
with self._connect() as conn:
row = conn.execute(
"SELECT * FROM forge_state "
"WHERE owner = ? AND repo = ? AND issue_number = ?",
(owner, repo, issue_number),
).fetchone()
return _row_to_state(row) if row is not None else None
def delete(self, owner: str, repo: str, issue_number: int) -> None:
with self._connect() as conn:
conn.execute(
"DELETE FROM forge_state "
"WHERE owner = ? AND repo = ? AND issue_number = ?",
(owner, repo, issue_number),
)
def all(self) -> list[ForgeState]:
with self._connect() as conn:
rows = conn.execute(
"SELECT * FROM forge_state ORDER BY owner, repo, issue_number"
).fetchall()
return [_row_to_state(row) for row in rows]
def _row_to_state(row: sqlite3.Row) -> ForgeState:
return ForgeState(
owner=row["owner"],
repo=row["repo"],
issue_number=row["issue_number"],
slug=row["slug"],
agent_name=row["agent_name"],
bottle_names=json.loads(row["bottle_names"]),
backend_name=row["backend_name"],
agent_git_user=row["agent_git_user"],
pr_number=row["pr_number"],
status=row["status"],
last_checkin_at=row["last_checkin_at"],
)
@@ -0,0 +1,439 @@
# PRD prd-new: Forge native integration
- **Status:** Draft
- **Author:** claude
- **Created:** 2026-06-29
- **Issue:** #317
## Summary
Add a webhook-driven orchestration layer that lets Gitea issues and PR comments
drive bot-bottle sessions end-to-end with no operator in the loop for the happy
path. An issue assigned to a member of the configured agent org and labelled
with an agent name triggers a headless bottle launch; the bottle processes the
issue, opens a PR, and interacts with the forge through a **forge sidecar**
the agent never touches the Gitea API or its credentials directly. The agent
calls `signal_done(status, summary)` on the sidecar when a work unit is
complete; the sidecar relays that to the orchestrator over a queue dir (the same
pattern as the supervise sidecar), so completion is an unambiguous in-band
signal rather than a comment the orchestrator has to parse. The orchestrator
freezes the bottle. Subsequent PR comments rehydrate the frozen bottle. The
bottle is destroyed when the PR closes.
The forge sidecar is backed by a `Forge` abstract class with per-provider
implementations (Gitea first), so the agent's prompts and the sidecar protocol
stay forge-agnostic. The sidecar logs forge operations semantically ("read PR
description", "posted comment", "signalled done"), giving richer provenance than
post-hoc egress-byte parsing, and enforces a **read-anywhere / write-scoped**
permission model: the agent may read for context but may only write to the
issue and PRs it was assigned.
Run provenance is exposed through a **provenance API** (the sidecar's structured
operation log plus the run's metadata), not posted back into the forge. We do
not surface a provenance footer in the PR — the audit record lives behind the
API where it can be retained and queried, rather than as an editable comment.
The separation of concerns across the two layers: bot-bottle owns the headless
launch primitives, the forge sidecar + `Forge` abstraction, and forge state.
`bot-bottle-orchestrator` (separate binary) owns the webhook listener, bottle
lifecycle loop, and monitoring dashboard; it calls into bot-bottle via
`./cli.py orchestrate`, a thin wrapper command. This PRD covers bot-bottle's
side of that contract.
## Problem
Today an operator must open the TUI, select an agent and bottle, confirm the
preflight, and type prompts interactively. This blocks "issue → PR" automation
and produces no durable audit record of what the agent did. The security model
already provides the right isolation and egress controls, and `start --headless`
(#315) already gives `bot-bottle-orchestrator` a non-interactive launch path.
The missing pieces are a headless `resume` counterpart for rehydrating frozen
bottles, a forge-interaction surface the agent uses to read context, post
comments, and signal completion, and the provenance trail that makes the audit
story legible to reviewers on every PR.
That forge-interaction surface could be built two ways: (2) give the agent the
Gitea API directly with cred-proxy injecting the token, or (3) put a forge
sidecar between the agent and the forge. This PRD takes **option 3**. The
deciding factors: a sidecar `signal_done` call is an unambiguous completion
signal where comment-parsing is a correctness risk that surfaces in production;
the sidecar produces a semantic audit trail rather than HTTP bytes, which is
load-bearing for provenance (the stated product priority); and the sidecar can
enforce scope tighter than repo-wide API-key permissions, reducing blast radius
for a prompt-injected agent. The costs — a second sidecar process per forge run,
a new failure mode if it crashes, and per-forge implementation cost — are
accepted as the price of those properties.
## Goals / Success Criteria
1. Headless launch already exists: `./cli.py start <agent> --headless --prompt`
(#315) runs non-interactively with no TUI selectors or y/N preflight. This
PRD builds on it rather than re-introducing it. The remaining gap is a
matching headless `resume` path (`./cli.py resume --headless`), since
rehydrating a frozen bottle for a new prompt is required by the freeze /
rehydrate loop and `resume` has no non-interactive entry point today.
2. An issue assigned to a member of the configured org (`FORGE_ORG`, default
`bot-bottle`) and labelled `bot-bottle:<agent-name>` is the trigger
convention. Org membership is verified via the Gitea API at event time.
3. Forge-targeted bottles run a **forge sidecar** that exposes a small,
forge-agnostic API (comment/issue/PR CRUD plus `signal_done`) over the same
queue-dir + HTTP/JSON-RPC machinery as the supervise sidecar. The agent calls
the sidecar; it never sees the forge token or forge-specific endpoints.
4. The sidecar is backed by a `Forge` abstract class. Gitea is the first
concrete implementation; adding a forge means a new subclass, not changes to
the agent prompt or sidecar protocol. The sidecar enforces a read-anywhere /
write-scoped model: writes are limited to the assigned issue and its PRs;
reads are unrestricted for context.
5. The agent calls `signal_done(status, summary)` on the sidecar when a work
unit is complete; the sidecar relays it to the orchestrator over a queue dir.
This is the done signal — no comment parsing. A watchdog timeout
(configurable, default 30 min) causes the orchestrator to treat the run as
done-without-self-report if the agent exits without signalling.
6. Run provenance (agent name, bottle name(s), slug, timing, exit code,
gitleaks result, egress summary, and the sidecar's semantic operation log)
is available through a provenance API. It is **not** surfaced as a PR footer
or any other forge comment.
7. Forge state (issue → slug, status) is persisted in a local SQLite database
under `~/.bot-bottle/` and survives orchestrator restarts.
8. `./cli.py orchestrate status` lists active forge-managed bottles and their
issue/PR URLs.
9. Unit tests cover: label parsing, org-membership check path, forge state
store CRUD (SQLite), headless launch arg construction, forge env var
injection, sidecar request dispatch through the `Forge` abstraction,
write-scope enforcement (reject writes outside the assigned issue/PRs), and
`signal_done` queue relay.
## Non-goals
- Webhook signature verification (HMAC-SHA256). Added as a follow-up.
- The `bot-bottle-orchestrator` binary itself — this PRD covers bot-bottle's
side of the interface only. The orchestrator is a separate project.
- GitHub or GitLab support.
- Multiple simultaneous forge bottles per issue.
- Automatic retry on agent error exit.
- Bottle destruction on issue close (PR close only; issue close is ambiguous).
- Concurrent multi-issue handling (one blocking run per orchestrator process).
- A monitoring dashboard (orchestrator-side concern).
- Folding `DeployKeyProvisioner` into the `Forge` abstraction. Deploy-key
provisioning runs at bottle-provision time on the host; the forge sidecar runs
inside the bottle at agent time. The two have different lifecycles and actors,
so coupling them into one class is deferred to a follow-up. This PRD only
shares the Gitea HTTP client between them.
## Design
### Targeting convention
An issue is forge-targeted when **both** hold:
- At least one assignee is a member of the Gitea org named by `FORGE_ORG`
(default `bot-bottle`). Checked via `GET /api/v1/orgs/{org}/members/{user}`.
- At least one label has the prefix `bot-bottle:`. The suffix names the agent
manifest, e.g. `bot-bottle:implementer` → agent `implementer`.
`FORGE_ORG` is read at orchestrate-command startup. It is not embedded in
manifests or state files; the orchestrator stamps its value into log output for
auditability.
An optional label `bot-bottle-bottle:<name>` overrides bottle selection. When
absent the agent's default bottle is used.
### `./cli.py orchestrate` — the thin wrapper
```
./cli.py orchestrate start --agent AGENT [--bottle BOTTLE ...] --prompt PROMPT
[--label LABEL] [--backend BACKEND]
./cli.py orchestrate resume --slug SLUG --prompt PROMPT [--backend BACKEND]
./cli.py orchestrate status
```
`orchestrate start` is a thin shim over the already-shipped `start --headless`
(#315): it forwards agent / bottle / label / prompt and adds the forge-specific
wiring (`forge_env`, sidecar launch). It does not re-implement headless launch.
The caller (`bot-bottle-orchestrator`) manages freeze, state, and the forge
sidecar's done signal around it.
`orchestrate resume` is the shim over the new `resume --headless` (below).
`orchestrate status` prints the forge state table.
### Headless primitives — what exists vs. what's new
Headless **start** already shipped in #315 and this PRD reuses it as-is:
- `./cli.py start <agent> --headless --prompt TEXT` — no TUI selectors, no y/N
preflight. Internally `_start_headless()` calls the shared `_launch_bottle()`
with `assume_yes=True` and `headless_prompt_text=prompt`.
- The prompt is delivered through `AgentProvider.headless_prompt(prompt)`
claude `-p`, codex positional, pi `-p`. The orchestrator does **not** hand-roll
agent args; it relies on this provider abstraction. (An earlier draft proposed
`start_headless` / `attach_agent_headless` helpers that constructed
`--no-interactive`/`-p` directly — those are dropped as redundant with, and
divergent from, what #315 merged.)
Two additions are needed on top of #315:
**1. A `forge_env` hook on the headless launch path.** The orchestrator needs to
pass forge context + token through to the forge sidecar launched alongside the
agent. This is a parameter threaded into `_launch_bottle` (the same core
`start --headless` already uses), not a parallel launch function. The agent
process itself does not receive the token.
**2. `resume --headless`** — new in `bot_bottle/cli/resume.py`, mirroring the
`--headless` flag on `start`:
```
./cli.py resume <slug> --headless --prompt TEXT
```
It rehydrates a frozen bottle and runs one headless prompt via the same
`assume_yes` + `headless_prompt` path, returning the agent's exit code. `resume`
has no non-interactive entry point today, so this is genuinely new work rather
than a rename of an existing helper.
### Forge sidecar
Forge-targeted bottles run a forge sidecar alongside the agent, mirroring the
supervise sidecar: a per-bottle process that exposes an HTTP/JSON-RPC endpoint
over a Unix socket and relays events to the orchestrator through a queue dir.
The agent calls the sidecar; the sidecar holds the forge token and makes the
actual forge API calls. The agent never receives the credential and never sees a
forge-specific endpoint — swapping Gitea for another forge does not change the
agent prompt or the sidecar protocol.
The sidecar is configured at launch from the forge context (owner, repo, issue,
PR) and the token, supplied by the orchestrator — not baked into the agent
manifest. Because the sidecar owns the token, forge traffic does not need a
cred-proxy egress route on the agent; the agent's egress policy is unchanged by
forge targeting.
**Sidecar protocol** (forge-agnostic; each method maps to a `Forge` call):
| Method | Scope | Purpose |
|---|---|---|
| `read_issue(number)` | read-anywhere | Read an issue body for context |
| `read_pr(number)` | read-anywhere | Read a PR (incl. merge state) for context |
| `read_comments(number)` | read-anywhere | Read a thread for context |
| `post_comment(number, body)` | write-scoped | Post to the assigned issue/PR |
| `update_description(number, body)` | write-scoped | Edit the assigned issue/PR body |
| `signal_done(status, summary)` | — | Relay completion to the orchestrator |
Issues and PRs are distinct domain objects (`Issue` vs `PullRequest`) read
through distinct methods; a PR carries merge state an issue does not.
**Scope enforcement** is read-anywhere / write-scoped: read methods accept any
issue/PR number for context; write methods are rejected unless the target is the
assigned issue or one of its PRs. This is tighter than Gitea's repo-wide API-key
permissions and bounds the blast radius of a prompt-injected agent. Rejections
are logged semantically (operation, target, reason) so the audit trail records
attempted out-of-scope writes, not just allowed ones.
**Semantic audit**: every sidecar call is logged as a structured operation
("read PR #318 description", "posted comment to #317", "signalled done:
success") rather than as opaque HTTP bytes. This log feeds provenance directly,
with no post-hoc egress-log parsing.
### `Forge` abstraction — `bot_bottle/contrib/forge/`
The sidecar dispatches to a `Forge` abstract class. Each provider implements the
operations behind the sidecar protocol:
```python
class Forge(abc.ABC):
@abc.abstractmethod
def read_issue(self, number: int) -> Issue: ...
@abc.abstractmethod
def read_pr(self, number: int) -> PullRequest: ...
@abc.abstractmethod
def read_comments(self, number: int) -> list[Comment]: ...
@abc.abstractmethod
def post_comment(self, number: int, body: str) -> None: ...
@abc.abstractmethod
def update_description(self, number: int, body: str) -> None: ...
@abc.abstractmethod
def is_org_member(self, org: str, username: str) -> bool: ...
@abc.abstractmethod
def get_pr_for_issue(self, number: int) -> int | None: ...
@abc.abstractmethod
def is_pr_open(self, number: int) -> bool: ...
```
`Issue` and `PullRequest` are separate frozen dataclasses — a PR adds `merged`.
`ScopedForge` wraps a concrete `Forge` to enforce the read-anywhere /
write-scoped model (`post_comment` / `update_description` raise `ForgeScopeError`
outside the assigned issue and PRs).
`GiteaForge` is the first and only concrete implementation in this PRD. It wraps
the Gitea HTTP client (below). Adding GitHub or GitLab later is a new subclass;
the sidecar, protocol, and agent prompt are untouched.
> **Deferred:** `DeployKeyProvisioner` is *not* folded into `Forge` here.
> Deploy-key provisioning runs on the host at provision time; the sidecar runs
> in the bottle at agent time. They have different lifecycles and actors, so a
> shared abstract base would couple two unrelated auth contexts. For now they
> only share the Gitea HTTP client; a later PRD can revisit unification.
### Forge env vars
The orchestrator passes forge context to the **sidecar** (not the agent) at
launch. The agent does not need owner/repo/issue env vars to construct API
calls, since it only names issue/PR numbers to the sidecar:
| Var | Example | Purpose |
|---|---|---|
| `FORGE_GITEA_API` | `https://gitea.dideric.is/api/v1` | Base URL the sidecar calls |
| `FORGE_OWNER` | `didericis` | Repo owner |
| `FORGE_REPO` | `bot-bottle` | Repo name |
| `FORGE_ISSUE_NUMBER` | `317` | Assigned issue (defines write scope) |
| `FORGE_PR_NUMBER` | `318` | Assigned PR (empty until PR exists) |
The agent's forge-specific prompt instructs it to call `signal_done` on the
sidecar when a work unit is complete, and to use the sidecar for any
comment/description writes. The instruction is forge-agnostic and is part of the
forge prompt overlay, not the base agent manifest, so non-forge runs are
unaffected.
### Done signal and watchdog
The agent calls `signal_done(status, summary)` on the sidecar when it finishes a
work unit. The sidecar writes the event to its queue dir; the orchestrator reads
it and:
1. Reads the forge state for `(owner, repo, issue_number)`.
2. If `status == "running"`, treats the event as the done signal: freezes the
bottle and sets `status = "frozen"`. Provenance is recorded via the
provenance API — no comment is posted to the forge.
Because completion is an explicit `signal_done` call, the orchestrator does not
parse comment text to detect "done", and intermediate comments the agent posts
mid-run cannot be mistaken for completion.
**Watchdog**: the orchestrator tracks `last_checkin_at` in forge state, updated
on each sidecar event. A background thread wakes every minute. If
`now - last_checkin_at > FORGE_WATCHDOG_TIMEOUT` (default 30 min, configurable
via env) and `status == "running"`, the orchestrator treats the run as
done-without-self-report and freezes the bottle, flagging the run as incomplete
in the provenance record.
**Sidecar-death failure mode**: if the forge sidecar crashes mid-run the agent
loses forge access while the bottle is otherwise healthy. The orchestrator
detects a dead sidecar (socket/queue gone) the same way it detects a stalled
agent and falls back to the watchdog path.
### Forge state — `bot_bottle/contrib/gitea/forge_state.py`
State is stored in a local SQLite database at `~/.bot-bottle/bot-bottle.db`.
Access goes through a thin CRUD interface, `ForgeStateStore`, so the storage
location/engine can be swapped without touching callers. `SqliteForgeStateStore`
is the first implementation.
The `forge_state` table is keyed by `(owner, repo, issue_number)` and carries:
`slug`, `agent_name`, `bottle_names` (JSON), `backend_name`, `agent_git_user`,
`pr_number` (nullable), `status`, `last_checkin_at`.
`status`: `"running"` | `"frozen"` | `"destroyed"`.
Store interface:
```python
class ForgeStateStore(abc.ABC):
def upsert(self, state: ForgeState) -> None: ...
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None: ...
def delete(self, owner: str, repo: str, issue_number: int) -> None: ...
def all(self) -> list[ForgeState]: ...
class SqliteForgeStateStore(ForgeStateStore):
def __init__(self, db_path: Path | None = None) -> None: ...
```
`upsert` uses `INSERT OR REPLACE` so a re-run for the same issue overwrites in
place. The schema is created on first open.
### Provenance API
Run provenance — agent, bottle(s), slug, timing, exit code, gitleaks result,
egress summary, watchdog-fired flag, and the sidecar's semantic operation log —
is exposed through a **provenance API**, not posted into the forge. There is no
provenance footer or run-summary comment.
The rationale (per the monetization positioning): a PR comment is mutable by any
maintainer, unsigned, and per-PR, so it is worthless as an audit record and
invites false trust. The authoritative record therefore lives behind the API,
where it can be retained, queried, and (eventually) signed. Whether any
projection of it ever appears in the forge is a separate, out-of-scope decision;
this PR does not build one.
The API surface itself (schema, transport, signing, retention) is **out of scope
for this PRD** and belongs with the orchestrator / control-plane work. bot-bottle
here only produces the raw material: the sidecar's semantic operation log and the
run metadata the orchestrator collects.
### Gitea HTTP client — `bot_bottle/contrib/gitea/client.py`
`GiteaForge` (and the existing `GiteaDeployKeyProvisioner`) share one thin HTTP
client. Unlike the option-2 design, the token is held by the sidecar process and
passed to the client directly — there is no agent-side cred-proxy route to
inject it, because the agent never makes forge calls.
```python
class GiteaClient:
def __init__(self, *, api_url: str, owner: str, repo: str, token: str) -> None: ...
def is_org_member(self, org: str, username: str) -> bool: ...
def get_issue(self, number: int) -> dict: ...
def get_comments(self, number: int) -> list[dict]: ...
def post_comment(self, number: int, body: str) -> None: ...
def patch_issue_body(self, number: int, body: str) -> None: ...
def get_pull(self, number: int) -> dict: ...
```
`GiteaForge` adapts this client to the `Forge` surface (mapping raw JSON to
`Issue` / `PullRequest` / `Comment`). Sharing only the HTTP client (not an
abstract base) is the deliberate boundary between the sidecar and the deploy-key
provisioner — see the deferral note under the `Forge` abstraction.
### Implementation chunks
1. **Headless additions on top of #315** — thread a `forge_env` parameter into
the existing `_launch_bottle` core (the one `start --headless` already uses);
add a `--headless` path to `cli/resume.py` reusing `assume_yes` +
`headless_prompt`. No new `start_headless`/`attach_agent_headless` helpers.
Tests: `forge_env` reaches the sidecar/`guest_env`; `resume --headless` skips
the TUI and y/N preflight and returns the agent exit code.
2. **Forge state**`contrib/gitea/forge_state.py`: `ForgeState` dataclass,
`ForgeStateStore` CRUD interface, `SqliteForgeStateStore`. Tests: round-trip,
missing → None, `INSERT OR REPLACE` upsert, delete idempotent, `all()`
ordering, persistence across store instances.
3. **`Forge` abstraction + Gitea client** — `contrib/forge/base.py` (`Forge`
ABC, `ScopedForge`, `Issue` / `PullRequest` / `Comment`) and
`contrib/gitea/client.py` + `GiteaForge`: `is_org_member`, `read_issue`,
`read_pr`, `read_comments`, `post_comment`, `update_description`,
`get_pr_for_issue`, `is_pr_open`. Tests: mock `urllib.request.urlopen`,
assert payloads and 404-as-false for membership; `ScopedForge` write-scope
enforcement.
4. **Forge sidecar** — sidecar process exposing the protocol over a Unix socket,
queue-dir relay, write-scope enforcement, semantic op log, `signal_done`.
Reuses the supervise sidecar bundle machinery. Tests: dispatch each method to
the `Forge`, reject out-of-scope writes, `signal_done` writes a queue event,
scope-rejection is logged.
5. **`./cli.py orchestrate`** — `cli/orchestrate.py` with `start`, `resume`,
`status` subcommands wired into `cli.py`; `start` launches the forge sidecar
alongside the agent for forge-targeted runs. Tests: arg parsing, `start`
delegates to `start --headless`, `resume` delegates to `resume --headless`.
## Provenance
Run provenance is captured (sidecar semantic operation log + run metadata) and
exposed through a provenance API. It is deliberately **not** surfaced in the
forge — no footer, no run-summary comment. A mutable, unsigned PR comment is not
an audit record; the authoritative record lives behind the API where it can be
retained and signed. The `watchdog_fired` flag marks runs where the agent did
not self-report completion so consumers of the API know the record may be
incomplete.
The provenance API's schema, transport, signing, and retention are out of scope
for this PRD (control-plane work); bot-bottle here produces the raw material
only.
+391
View File
@@ -0,0 +1,391 @@
"""Unit: bot_bottle public Python API (bot_bottle/__init__.py surface).
Covers start_headless, resume_headless, freeze, and destroy the four
operations the bot-bottle-orchestrator's ProgrammaticBottleRunner uses.
All I/O is stubbed so no container is created.
"""
from __future__ import annotations
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
from bot_bottle import BottleError, destroy, freeze, resume_headless, start_headless
from bot_bottle.log import Die
# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------
def _make_manifest(agent_name: str = "implementer", bottle_name: str = "claude"):
manifest = MagicMock()
manifest.agents = {agent_name: MagicMock(bottle=bottle_name)}
manifest.all_agent_names = [agent_name]
manifest.all_bottle_names = [bottle_name]
manifest.home_md = None # eager mode — _peek_agent_bottle uses agents dict
manifest.require_agent = MagicMock(return_value=None)
return manifest
def _metadata(
slug: str = "implementer-abc12",
agent_name: str = "implementer",
backend: str = "docker",
):
md = MagicMock()
md.identity = slug
md.agent_name = agent_name
md.cwd = "/repo"
md.copy_cwd = False
md.bottle_names = ["claude"]
md.backend = backend
return md
# ---------------------------------------------------------------------------
# start_headless
# ---------------------------------------------------------------------------
class TestStartHeadless(unittest.TestCase):
def setUp(self) -> None:
self._manifest = _make_manifest()
patch("bot_bottle.api.ManifestIndex.resolve", return_value=self._manifest).start()
self._launch = patch(
"bot_bottle.api._launch_bottle", return_value=("implementer-abc12", 0)
).start()
patch(
"bot_bottle.api._uniquify_label_headless", side_effect=str
).start()
self.addCleanup(patch.stopall)
def _spec(self):
self._launch.assert_called_once()
return self._launch.call_args[0][0]
def test_returns_slug_on_success(self):
slug = start_headless("implementer", prompt="Do it")
self.assertEqual("implementer-abc12", slug)
def test_passes_assume_yes_and_prompt(self):
start_headless("implementer", prompt="Do it")
kwargs = self._launch.call_args[1]
self.assertTrue(kwargs["assume_yes"])
self.assertEqual("Do it", kwargs["headless_prompt_text"])
def test_explicit_bottles_forwarded(self):
start_headless("implementer", prompt="Do it", bottles=["dev", "claude"])
self.assertEqual(("dev", "claude"), self._spec().bottle_names)
def test_default_bottle_resolved_from_manifest(self):
start_headless("implementer", prompt="Do it")
self.assertEqual(("claude",), self._spec().bottle_names)
def test_forge_env_on_spec(self):
env = {"FORGE_GITEA_API": "https://gitea.example.com/api/v1", "FORGE_OWNER": "acme"}
start_headless("implementer", prompt="Do it", forge_env=env)
self.assertEqual(env, self._spec().forge_env)
def test_no_forge_env_defaults_to_empty_dict(self):
start_headless("implementer", prompt="Do it")
self.assertEqual({}, self._spec().forge_env)
def test_nonzero_exit_raises_bottle_error(self):
self._launch.return_value = ("implementer-abc12", 1)
with self.assertRaises(BottleError) as ctx:
start_headless("implementer", prompt="Do it")
self.assertEqual(1, ctx.exception.exit_code)
def test_no_default_bottle_raises_bottle_error(self):
manifest = _make_manifest(bottle_name="")
with patch("bot_bottle.api.ManifestIndex.resolve", return_value=manifest):
with self.assertRaises(BottleError):
start_headless("implementer", prompt="Do it")
self._launch.assert_not_called()
def test_manifest_error_in_resolve_raises_bottle_error(self):
from bot_bottle.manifest import ManifestError
patch(
"bot_bottle.api.ManifestIndex.resolve", side_effect=ManifestError("bad")
).start()
with self.assertRaises(BottleError):
start_headless("implementer", prompt="Do it")
self._launch.assert_not_called()
def test_die_from_launch_bottle_raises_bottle_error(self):
self._launch.side_effect = Die(3, "backend exploded")
with self.assertRaises(BottleError) as ctx:
start_headless("implementer", prompt="Do it")
self.assertEqual(3, ctx.exception.exit_code)
def test_backend_name_forwarded(self):
start_headless("implementer", prompt="Do it", backend_name="docker")
self.assertEqual("docker", self._launch.call_args[1]["backend_name"])
def test_label_forwarded_to_spec(self):
start_headless("implementer", prompt="Do it", label="nightly")
self.assertEqual("nightly", self._spec().label)
def test_color_forwarded_to_spec(self):
start_headless("implementer", prompt="Do it", color="green")
self.assertEqual("green", self._spec().color)
# ---------------------------------------------------------------------------
# resume_headless
# ---------------------------------------------------------------------------
class TestResumeHeadless(unittest.TestCase):
def setUp(self) -> None:
self._md = _metadata()
patch("bot_bottle.api.read_metadata", return_value=self._md).start()
manifest = _make_manifest()
patch("bot_bottle.api.ManifestIndex.resolve", return_value=manifest).start()
self._launch = patch(
"bot_bottle.api._launch_bottle", return_value=("implementer-abc12", 0)
).start()
self.addCleanup(patch.stopall)
def _spec(self):
self._launch.assert_called_once()
return self._launch.call_args[0][0]
def test_passes_assume_yes_and_prompt(self):
resume_headless("implementer-abc12", prompt="Address review")
kwargs = self._launch.call_args[1]
self.assertTrue(kwargs["assume_yes"])
self.assertEqual("Address review", kwargs["headless_prompt_text"])
def test_identity_set_on_spec(self):
resume_headless("implementer-abc12", prompt="Prompt")
self.assertEqual("implementer-abc12", self._spec().identity)
def test_forge_env_on_spec(self):
env = {"FORGE_ISSUE_NUMBER": "42"}
resume_headless("implementer-abc12", prompt="Prompt", forge_env=env)
self.assertEqual(env, self._spec().forge_env)
def test_missing_state_raises_bottle_error(self):
with patch("bot_bottle.api.read_metadata", return_value=None):
with self.assertRaises(BottleError):
resume_headless("no-such-abc12", prompt="Prompt")
self._launch.assert_not_called()
def test_nonzero_exit_raises_bottle_error(self):
self._launch.return_value = ("implementer-abc12", 2)
with self.assertRaises(BottleError) as ctx:
resume_headless("implementer-abc12", prompt="Prompt")
self.assertEqual(2, ctx.exception.exit_code)
def test_manifest_error_in_resolve_raises_bottle_error(self):
from bot_bottle.manifest import ManifestError
patch(
"bot_bottle.api.ManifestIndex.resolve", side_effect=ManifestError("bad")
).start()
with self.assertRaises(BottleError):
resume_headless("implementer-abc12", prompt="Prompt")
self._launch.assert_not_called()
def test_die_from_launch_bottle_raises_bottle_error(self):
self._launch.side_effect = Die(5, "resume failed")
with self.assertRaises(BottleError) as ctx:
resume_headless("implementer-abc12", prompt="Prompt")
self.assertEqual(5, ctx.exception.exit_code)
def test_backend_from_metadata_when_not_supplied(self):
resume_headless("implementer-abc12", prompt="Prompt")
self.assertEqual("docker", self._launch.call_args[1]["backend_name"])
def test_explicit_backend_overrides_metadata(self):
resume_headless(
"implementer-abc12", prompt="Prompt", backend_name="smolmachines"
)
self.assertEqual("smolmachines", self._launch.call_args[1]["backend_name"])
# ---------------------------------------------------------------------------
# freeze
# ---------------------------------------------------------------------------
class TestFreeze(unittest.TestCase):
def setUp(self) -> None:
patch("bot_bottle.api.read_metadata", return_value=_metadata()).start()
self._freezer = MagicMock()
self._get_freezer = patch(
"bot_bottle.api.get_freezer", return_value=self._freezer
).start()
self.addCleanup(patch.stopall)
def test_calls_commit_slug(self):
freeze("implementer-abc12")
self._freezer.commit_slug.assert_called_once_with("implementer-abc12")
def test_backend_from_metadata_when_not_supplied(self):
freeze("implementer-abc12")
self._get_freezer.assert_called_once_with("docker")
def test_explicit_backend_used(self):
freeze("implementer-abc12", backend_name="smolmachines")
self._get_freezer.assert_called_once_with("smolmachines")
def test_commit_cancelled_raises_bottle_error(self):
from bot_bottle.backend.freeze import CommitCancelled
self._freezer.commit_slug.side_effect = CommitCancelled("declined")
with self.assertRaises(BottleError):
freeze("implementer-abc12")
def test_die_from_freezer_raises_bottle_error(self):
self._freezer.commit_slug.side_effect = Die(2, "commit exploded")
with self.assertRaises(BottleError) as ctx:
freeze("implementer-abc12")
self.assertEqual(2, ctx.exception.exit_code)
# ---------------------------------------------------------------------------
# destroy
# ---------------------------------------------------------------------------
class TestDestroy(unittest.TestCase):
def setUp(self) -> None:
patch("bot_bottle.api.read_metadata", return_value=_metadata()).start()
self._dd = patch("bot_bottle.api._destroy_docker").start()
patch("bot_bottle.api.clear_preserve_marker").start()
self._cleanup = patch("bot_bottle.api.cleanup_state").start()
self.addCleanup(patch.stopall)
def test_docker_backend_calls_destroy_docker(self):
destroy("implementer-abc12")
self._dd.assert_called_once_with("implementer-abc12")
def test_state_dir_always_cleaned(self):
destroy("implementer-abc12")
self._cleanup.assert_called_once_with("implementer-abc12")
def test_smolmachines_backend_calls_destroy_smolmachines(self):
patch(
"bot_bottle.api.read_metadata",
return_value=_metadata(backend="smolmachines"),
).start()
ds = patch("bot_bottle.api._destroy_smolmachines").start()
destroy("implementer-abc12")
ds.assert_called_once_with("implementer-abc12")
self._dd.assert_not_called()
def test_other_backend_skips_docker_and_smolmachines(self):
patch(
"bot_bottle.api.read_metadata",
return_value=_metadata(backend="macos-container"),
).start()
ds = patch("bot_bottle.api._destroy_smolmachines").start()
destroy("implementer-abc12")
self._dd.assert_not_called()
ds.assert_not_called()
self._cleanup.assert_called_once_with("implementer-abc12")
def test_missing_metadata_defaults_to_docker(self):
patch("bot_bottle.api.read_metadata", return_value=None).start()
destroy("no-state-abc12")
self._dd.assert_called_once_with("no-state-abc12")
def test_explicit_backend_overrides_metadata(self):
ds = patch("bot_bottle.api._destroy_smolmachines").start()
destroy("implementer-abc12", backend_name="smolmachines")
ds.assert_called_once_with("implementer-abc12")
self._dd.assert_not_called()
def test_die_from_backend_raises_bottle_error(self):
self._dd.side_effect = Die(1, "compose failed")
with self.assertRaises(BottleError):
destroy("implementer-abc12")
# ---------------------------------------------------------------------------
# _destroy_docker (helper)
# ---------------------------------------------------------------------------
class TestDestroyDocker(unittest.TestCase):
def setUp(self) -> None:
self._compose_down = patch(
"bot_bottle.backend.docker.compose.compose_down"
).start()
self._compose_project_name = patch(
"bot_bottle.backend.docker.compose.compose_project_name",
return_value="bb-proj",
).start()
self._state_dir = patch(
"bot_bottle.bottle_state.bottle_state_dir",
return_value=Path("/fake/state"),
).start()
self.addCleanup(patch.stopall)
def _run(self, exists: bool) -> None:
fake_file = MagicMock()
fake_file.exists.return_value = exists
with patch(
"bot_bottle.backend.docker.compose.compose_file_path",
return_value=fake_file,
):
from bot_bottle.api import _destroy_docker
_destroy_docker("slug-1")
def test_calls_compose_down_when_file_exists(self) -> None:
self._run(exists=True)
self._compose_down.assert_called_once()
def test_noop_when_compose_file_absent(self) -> None:
self._run(exists=False)
self._compose_down.assert_not_called()
# ---------------------------------------------------------------------------
# _destroy_smolmachines (helper)
# ---------------------------------------------------------------------------
class TestDestroySmolmachines(unittest.TestCase):
def setUp(self) -> None:
self._run = patch("subprocess.run").start()
self._run.return_value = MagicMock(returncode=0, stderr="", stdout="")
self.addCleanup(patch.stopall)
def _call(self) -> None:
from bot_bottle.api import _destroy_smolmachines
_destroy_smolmachines("slug-7")
def test_issues_stop_then_delete(self) -> None:
self._call()
self.assertEqual(2, self._run.call_count)
first_argv = self._run.call_args_list[0][0][0]
self.assertIn("stop", first_argv)
second_argv = self._run.call_args_list[1][0][0]
self.assertIn("delete", second_argv)
def test_nonzero_delete_does_not_raise(self) -> None:
self._run.side_effect = [
MagicMock(returncode=0),
MagicMock(returncode=1, stderr="not found", stdout=""),
]
self._call() # must not raise
# ---------------------------------------------------------------------------
# public surface exported from bot_bottle.__init__
# ---------------------------------------------------------------------------
class TestPublicSurface(unittest.TestCase):
def test_importable_from_package(self):
import bot_bottle
for name in ("BottleError", "start_headless", "resume_headless", "freeze", "destroy"):
self.assertTrue(hasattr(bot_bottle, name), f"missing: {name}")
if __name__ == "__main__":
unittest.main()
+75
View File
@@ -0,0 +1,75 @@
"""Unit: `cli.py resume --headless` non-interactive rehydrate path.
The freeze / rehydrate loop needs a non-interactive `resume`: deliver a
follow-up prompt and skip the y/N preflight, reusing the same launch
core (`assume_yes` + `headless_prompt_text`) as `start --headless`.
"""
from __future__ import annotations
import unittest
from typing import Any
from unittest.mock import MagicMock, patch
import bot_bottle.cli.resume as resume_mod
from bot_bottle.log import Die
def _metadata():
md = MagicMock()
md.agent_name = "implementer"
md.copy_cwd = False
md.cwd = "/repo"
md.identity = "implementer-abc12"
md.bottle_names = ["claude"]
md.backend = "docker"
return md
class ResumeHeadlessTest(unittest.TestCase):
def setUp(self) -> None:
self._launch = patch.object(
resume_mod, "_launch_bottle", return_value=("implementer-abc12", 0)
).start()
patch.object(
resume_mod, "read_metadata", return_value=_metadata()
).start()
manifest = MagicMock()
manifest.require_agent = MagicMock(return_value=None)
patch.object(
resume_mod.ManifestIndex, "resolve", return_value=manifest
).start()
self.addCleanup(patch.stopall)
def _launch_kwargs(self) -> dict[str, Any]:
self._launch.assert_called_once()
return dict(self._launch.call_args.kwargs)
def test_headless_passes_assume_yes_and_prompt(self):
rc = resume_mod.cmd_resume(
["implementer-abc12", "--headless", "--prompt", "Address the review"]
)
self.assertEqual(0, rc)
kwargs = self._launch_kwargs()
self.assertTrue(kwargs["assume_yes"])
self.assertEqual("Address the review", kwargs["headless_prompt_text"])
def test_interactive_resume_unchanged(self):
resume_mod.cmd_resume(["implementer-abc12"])
kwargs = self._launch_kwargs()
self.assertFalse(kwargs["assume_yes"])
self.assertEqual("", kwargs["headless_prompt_text"])
def test_headless_without_prompt_errors(self):
with self.assertRaises(Die):
resume_mod.cmd_resume(["implementer-abc12", "--headless"])
self._launch.assert_not_called()
def test_prompt_without_headless_errors(self):
with self.assertRaises(Die):
resume_mod.cmd_resume(["implementer-abc12", "--prompt", "hi"])
self._launch.assert_not_called()
if __name__ == "__main__":
unittest.main()
+1 -1
View File
@@ -56,7 +56,7 @@ class TestCmdStartHeadless(unittest.TestCase):
return_value=self._manifest,
).start()
self._launch_mock = patch(
"bot_bottle.cli.start._launch_bottle", return_value=0
"bot_bottle.cli.start._launch_bottle", return_value=("", 0)
).start()
# No bottles running by default → no label collision.
patch(
+2 -2
View File
@@ -45,7 +45,7 @@ class TestCmdStartSelector(unittest.TestCase):
self._launch_patch = patch(
"bot_bottle.cli.start._launch_bottle",
return_value=0,
return_value=("", 0),
)
self._launch_mock = self._launch_patch.start()
@@ -211,7 +211,7 @@ class TestCmdStartLabelCollision(unittest.TestCase):
self._manifest = _make_manifest(["researcher"], ["claude"])
patch("bot_bottle.cli.start.ManifestIndex.resolve", return_value=self._manifest).start()
self._launch_mock = patch(
"bot_bottle.cli.start._launch_bottle", return_value=0,
"bot_bottle.cli.start._launch_bottle", return_value=("", 0),
).start()
# Stub the bottle picker to always return a selection.
patch.object(tui_mod, "filter_multiselect", return_value=["claude"]).start()
+107
View File
@@ -0,0 +1,107 @@
"""Unit: Forge abstraction + ScopedForge (PRD forge-native-integration)."""
from __future__ import annotations
import unittest
from bot_bottle.contrib.forge.base import (
Comment,
Forge,
ForgeScopeError,
Issue,
PullRequest,
ScopedForge,
)
class _RecordingForge(Forge):
"""In-memory fake that records writes."""
def __init__(self) -> None:
self.comments: list[tuple[int, str]] = []
self.descriptions: list[tuple[int, str]] = []
def read_issue(self, number: int) -> Issue:
return Issue(number=number, title="t", body="b", state="open")
def read_pr(self, number: int) -> PullRequest:
return PullRequest(
number=number, title="pr", body="b", state="open", merged=False
)
def read_comments(self, number: int) -> list[Comment]:
return [Comment(id=1, user="alice", body="hi")]
def post_comment(self, number: int, body: str) -> None:
self.comments.append((number, body))
def update_description(self, number: int, body: str) -> None:
self.descriptions.append((number, body))
def is_org_member(self, org: str, username: str) -> bool:
return username == "member"
def get_pr_for_issue(self, number: int) -> int | None:
return 99 if number == 17 else None
def is_pr_open(self, number: int) -> bool:
return True
class TestScopedForgeReads(unittest.TestCase):
def setUp(self) -> None:
self.inner = _RecordingForge()
self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42])
def test_reads_pass_through_to_any_number(self):
# A number well outside the writable scope still reads fine.
self.assertEqual(123, self.scoped.read_issue(123).number)
self.assertEqual("alice", self.scoped.read_comments(500)[0].user)
def test_read_pr_passes_through(self):
pr = self.scoped.read_pr(999)
self.assertIsInstance(pr, PullRequest)
self.assertEqual(999, pr.number)
self.assertFalse(pr.merged)
def test_membership_and_pr_lookups_delegate(self):
self.assertTrue(self.scoped.is_org_member("bot-bottle", "member"))
self.assertFalse(self.scoped.is_org_member("bot-bottle", "stranger"))
self.assertEqual(99, self.scoped.get_pr_for_issue(17))
self.assertTrue(self.scoped.is_pr_open(8000))
class TestScopedForgeWrites(unittest.TestCase):
def setUp(self) -> None:
self.inner = _RecordingForge()
self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42])
def test_writable_set_is_issue_plus_prs(self):
self.assertEqual(frozenset({17, 42}), self.scoped.writable)
def test_write_to_assigned_issue_allowed(self):
self.scoped.post_comment(17, "done")
self.assertEqual([(17, "done")], self.inner.comments)
def test_write_to_assigned_pr_allowed(self):
self.scoped.update_description(42, "new body")
self.assertEqual([(42, "new body")], self.inner.descriptions)
def test_comment_outside_scope_rejected(self):
with self.assertRaises(ForgeScopeError) as ctx:
self.scoped.post_comment(500, "spam")
self.assertIn("500", str(ctx.exception))
self.assertEqual([], self.inner.comments)
def test_description_outside_scope_rejected(self):
with self.assertRaises(ForgeScopeError):
self.scoped.update_description(500, "tamper")
self.assertEqual([], self.inner.descriptions)
def test_scope_error_is_permission_error(self):
# Sidecars can catch the stdlib base type.
self.assertIn(PermissionError, ForgeScopeError.__mro__)
if __name__ == "__main__":
unittest.main()
+145
View File
@@ -0,0 +1,145 @@
"""Unit: GiteaClient + GiteaForge (PRD forge-native-integration)."""
from __future__ import annotations
import json
import unittest
import urllib.error
from io import BytesIO
from unittest.mock import MagicMock, patch
from bot_bottle.contrib.gitea.client import GiteaClient, GiteaForge
def _client() -> GiteaClient:
return GiteaClient(
api_url="https://gitea.example.com/api/v1",
owner="didericis",
repo="bot-bottle",
token="test-token",
)
def _resp(body: object, status: int = 200) -> MagicMock:
resp = MagicMock()
resp.read.return_value = json.dumps(body).encode() if body is not None else b""
resp.status = status
resp.__enter__ = lambda s: s # type: ignore
resp.__exit__ = MagicMock(return_value=False)
return resp
def _http_error(code: int, body: str = "") -> urllib.error.HTTPError:
return urllib.error.HTTPError(
url="http://x", code=code, msg="err", hdrs=None, # type: ignore[arg-type]
fp=BytesIO(body.encode()),
)
_URLOPEN = "bot_bottle.contrib.gitea.client.urllib.request.urlopen"
class TestOrgMembership(unittest.TestCase):
def test_member_returns_true_on_2xx(self):
with patch(_URLOPEN, return_value=_resp(None, 204)) as m:
self.assertTrue(_client().is_org_member("bot-bottle", "alice"))
req = m.call_args.args[0]
self.assertIn("/orgs/bot-bottle/members/alice", req.full_url)
def test_nonmember_returns_false_on_404(self):
with patch(_URLOPEN, side_effect=_http_error(404)):
self.assertFalse(_client().is_org_member("bot-bottle", "stranger"))
def test_other_http_error_raises(self):
with patch(_URLOPEN, side_effect=_http_error(403, "forbidden")):
with self.assertRaises(RuntimeError) as ctx:
_client().is_org_member("bot-bottle", "alice")
self.assertIn("403", str(ctx.exception))
class TestForgeReads(unittest.TestCase):
def test_read_issue_maps_fields(self):
raw = {"number": 17, "title": "Bug", "body": "broken", "state": "open"}
with patch(_URLOPEN, return_value=_resp(raw)) as m:
issue = GiteaForge(_client()).read_issue(17)
self.assertEqual((17, "Bug", "broken", "open"),
(issue.number, issue.title, issue.body, issue.state))
self.assertIn("/repos/didericis/bot-bottle/issues/17",
m.call_args.args[0].full_url)
def test_read_issue_tolerates_null_body(self):
raw = {"number": 17, "title": "T", "body": None, "state": "open"}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertEqual("", GiteaForge(_client()).read_issue(17).body)
def test_read_comments_maps_user_login(self):
raw = [
{"id": 1, "user": {"login": "alice"}, "body": "hi"},
{"id": 2, "user": {"login": "bob"}, "body": "yo"},
]
with patch(_URLOPEN, return_value=_resp(raw)):
comments = GiteaForge(_client()).read_comments(17)
self.assertEqual(["alice", "bob"], [c.user for c in comments])
self.assertEqual([1, 2], [c.id for c in comments])
class TestForgeWrites(unittest.TestCase):
def test_post_comment_payload_and_url(self):
with patch(_URLOPEN, return_value=_resp(None, 201)) as m:
GiteaForge(_client()).post_comment(17, "done ✓")
req = m.call_args.args[0]
self.assertEqual("POST", req.method)
self.assertIn("/repos/didericis/bot-bottle/issues/17/comments", req.full_url)
self.assertEqual("done ✓", json.loads(req.data)["body"])
def test_update_description_patches_issue(self):
with patch(_URLOPEN, return_value=_resp(None, 200)) as m:
GiteaForge(_client()).update_description(17, "edited")
req = m.call_args.args[0]
self.assertEqual("PATCH", req.method)
self.assertTrue(req.full_url.endswith("/issues/17"))
self.assertEqual("edited", json.loads(req.data)["body"])
def test_auth_header_sent(self):
with patch(_URLOPEN, return_value=_resp(None, 201)) as m:
GiteaForge(_client()).post_comment(17, "x")
self.assertEqual("token test-token",
m.call_args.args[0].headers["Authorization"])
class TestPRHelpers(unittest.TestCase):
def test_get_pr_for_issue_returns_number_when_issue_is_pr(self):
raw = {"number": 18, "pull_request": {"merged": False}}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertEqual(18, GiteaForge(_client()).get_pr_for_issue(18))
def test_get_pr_for_issue_none_for_plain_issue(self):
raw = {"number": 17, "pull_request": None}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertIsNone(GiteaForge(_client()).get_pr_for_issue(17))
def test_is_pr_open_true_when_state_open(self):
with patch(_URLOPEN, return_value=_resp({"state": "open"})):
self.assertTrue(GiteaForge(_client()).is_pr_open(18))
def test_is_pr_open_false_when_closed(self):
with patch(_URLOPEN, return_value=_resp({"state": "closed"})):
self.assertFalse(GiteaForge(_client()).is_pr_open(18))
def test_read_pr_maps_fields_including_merged(self):
raw = {"number": 18, "title": "Fix", "body": "patch",
"state": "closed", "merged": True}
with patch(_URLOPEN, return_value=_resp(raw)) as m:
pr = GiteaForge(_client()).read_pr(18)
self.assertEqual((18, "Fix", "patch", "closed", True),
(pr.number, pr.title, pr.body, pr.state, pr.merged))
self.assertIn("/repos/didericis/bot-bottle/pulls/18",
m.call_args.args[0].full_url)
def test_read_pr_merged_defaults_false(self):
with patch(_URLOPEN, return_value=_resp({"number": 18, "state": "open"})):
self.assertFalse(GiteaForge(_client()).read_pr(18).merged)
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,99 @@
"""Unit: SQLite forge state store (PRD forge-native-integration)."""
from __future__ import annotations
import tempfile
import unittest
from dataclasses import replace
from pathlib import Path
from bot_bottle.contrib.gitea.forge_state import (
STATUS_FROZEN,
STATUS_RUNNING,
ForgeState,
SqliteForgeStateStore,
)
def _state(**over: object) -> ForgeState:
base = ForgeState(
owner="didericis",
repo="bot-bottle",
issue_number=17,
slug="implementer-abc12",
agent_name="implementer",
bottle_names=["claude"],
backend_name="docker",
agent_git_user="didericis-claude",
pr_number=42,
status=STATUS_FROZEN,
last_checkin_at="2026-06-29T12:04:12-04:00",
)
return replace(base, **over)
class ForgeStateStoreTest(unittest.TestCase):
def setUp(self) -> None:
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
self.store = SqliteForgeStateStore(tmp / "sub" / "bot-bottle.db")
def test_round_trip(self):
self.store.upsert(_state())
self.assertEqual(_state(), self.store.get("didericis", "bot-bottle", 17))
def test_missing_returns_none(self):
self.assertIsNone(self.store.get("nobody", "nope", 1))
def test_creates_db_parent_dirs(self):
# setUp pointed at a non-existent 'sub/' dir; init must create it.
self.assertIsNone(self.store.get("x", "y", 1)) # no raise
def test_upsert_replaces(self):
self.store.upsert(_state(status=STATUS_RUNNING))
self.store.upsert(_state(status=STATUS_FROZEN))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertEqual(STATUS_FROZEN, got.status)
# Still one row, not two.
self.assertEqual(1, len(self.store.all()))
def test_delete_is_idempotent(self):
self.store.upsert(_state())
self.store.delete("didericis", "bot-bottle", 17)
self.store.delete("didericis", "bot-bottle", 17) # no raise
self.assertIsNone(self.store.get("didericis", "bot-bottle", 17))
def test_all_lists_across_repos_sorted(self):
self.store.upsert(_state(issue_number=18, slug="other"))
self.store.upsert(_state(issue_number=17))
self.store.upsert(_state(owner="acme", repo="widget", issue_number=3))
states = self.store.all()
self.assertEqual(3, len(states))
self.assertEqual(
[("acme", 3), ("didericis", 17), ("didericis", 18)],
[(s.owner, s.issue_number) for s in states],
)
def test_all_empty(self):
self.assertEqual([], self.store.all())
def test_bottle_names_list_preserved(self):
self.store.upsert(_state(bottle_names=["claude", "dev"]))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertEqual(["claude", "dev"], got.bottle_names)
def test_pr_number_nullable(self):
self.store.upsert(_state(pr_number=None))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertIsNone(got.pr_number)
def test_persists_across_store_instances(self):
self.store.upsert(_state())
reopened = SqliteForgeStateStore(self.store._db_path) # pylint: disable=protected-access
self.assertEqual(_state(), reopened.get("didericis", "bot-bottle", 17))
if __name__ == "__main__":
unittest.main()