From 314dc03b0ddd64eb2eb8ef03d63c6fadfad996a4 Mon Sep 17 00:00:00 2001 From: claude Date: Wed, 1 Jul 2026 17:18:28 +0000 Subject: [PATCH] feat: fold bot-bottle-orchestrator into bot_bottle/orchestrator subpackage Moves the orchestrator into bot_bottle/orchestrator/ so one install gets everything. Entry point is now `python -m bot_bottle.orchestrator run`. - Add bot_bottle/orchestrator/ with all 14 modules (verbatim move; internal imports were already relative, so no changes inside orchestrator modules) - Rewrite bootstrap.py: remove the lazy bot_bottle import guard, use direct relative imports from ..contrib.* - Add bot_bottle/contrib/forge/base.py: ScopedForge (read-anywhere / write-scoped) - Add bot_bottle/contrib/gitea/client.py: GiteaClient + GiteaForge (urllib.request only) - Add bot_bottle/contrib/gitea/forge_state.py: ForgeState + SqliteForgeStateStore - Add tests/unit/orchestrator/ (82 tests: 63 migrated + 19 new for contrib modules) Closes #321 --- bot_bottle/contrib/forge/__init__.py | 0 bot_bottle/contrib/forge/base.py | 52 ++++++ bot_bottle/contrib/gitea/client.py | 112 ++++++++++++ bot_bottle/contrib/gitea/forge_state.py | 137 ++++++++++++++ bot_bottle/orchestrator/__init__.py | 8 + bot_bottle/orchestrator/__main__.py | 51 ++++++ bot_bottle/orchestrator/bootstrap.py | 155 ++++++++++++++++ bot_bottle/orchestrator/config.py | 52 ++++++ bot_bottle/orchestrator/events.py | 85 +++++++++ bot_bottle/orchestrator/lifecycle.py | 180 +++++++++++++++++++ bot_bottle/orchestrator/model.py | 108 +++++++++++ bot_bottle/orchestrator/provenance.py | 71 ++++++++ bot_bottle/orchestrator/runner.py | 118 ++++++++++++ bot_bottle/orchestrator/sidecar.py | 171 ++++++++++++++++++ bot_bottle/orchestrator/store.py | 48 +++++ bot_bottle/orchestrator/targeting.py | 51 ++++++ bot_bottle/orchestrator/watchdog.py | 68 +++++++ bot_bottle/orchestrator/webhook.py | 123 +++++++++++++ tests/unit/orchestrator/__init__.py | 0 tests/unit/orchestrator/_fakes.py | 69 +++++++ 
tests/unit/orchestrator/test_config.py | 38 ++++ tests/unit/orchestrator/test_events.py | 64 +++++++ tests/unit/orchestrator/test_forge_state.py | 75 ++++++++ tests/unit/orchestrator/test_lifecycle.py | 140 +++++++++++++++ tests/unit/orchestrator/test_provenance.py | 53 ++++++ tests/unit/orchestrator/test_runner.py | 65 +++++++ tests/unit/orchestrator/test_scoped_forge.py | 75 ++++++++ tests/unit/orchestrator/test_sidecar.py | 108 +++++++++++ tests/unit/orchestrator/test_store.py | 50 ++++++ tests/unit/orchestrator/test_targeting.py | 60 +++++++ tests/unit/orchestrator/test_watchdog.py | 66 +++++++ tests/unit/orchestrator/test_webhook.py | 155 ++++++++++++++++ 32 files changed, 2608 insertions(+) create mode 100644 bot_bottle/contrib/forge/__init__.py create mode 100644 bot_bottle/contrib/forge/base.py create mode 100644 bot_bottle/contrib/gitea/client.py create mode 100644 bot_bottle/contrib/gitea/forge_state.py create mode 100644 bot_bottle/orchestrator/__init__.py create mode 100644 bot_bottle/orchestrator/__main__.py create mode 100644 bot_bottle/orchestrator/bootstrap.py create mode 100644 bot_bottle/orchestrator/config.py create mode 100644 bot_bottle/orchestrator/events.py create mode 100644 bot_bottle/orchestrator/lifecycle.py create mode 100644 bot_bottle/orchestrator/model.py create mode 100644 bot_bottle/orchestrator/provenance.py create mode 100644 bot_bottle/orchestrator/runner.py create mode 100644 bot_bottle/orchestrator/sidecar.py create mode 100644 bot_bottle/orchestrator/store.py create mode 100644 bot_bottle/orchestrator/targeting.py create mode 100644 bot_bottle/orchestrator/watchdog.py create mode 100644 bot_bottle/orchestrator/webhook.py create mode 100644 tests/unit/orchestrator/__init__.py create mode 100644 tests/unit/orchestrator/_fakes.py create mode 100644 tests/unit/orchestrator/test_config.py create mode 100644 tests/unit/orchestrator/test_events.py create mode 100644 tests/unit/orchestrator/test_forge_state.py create mode 100644 
tests/unit/orchestrator/test_lifecycle.py create mode 100644 tests/unit/orchestrator/test_provenance.py create mode 100644 tests/unit/orchestrator/test_runner.py create mode 100644 tests/unit/orchestrator/test_scoped_forge.py create mode 100644 tests/unit/orchestrator/test_sidecar.py create mode 100644 tests/unit/orchestrator/test_store.py create mode 100644 tests/unit/orchestrator/test_targeting.py create mode 100644 tests/unit/orchestrator/test_watchdog.py create mode 100644 tests/unit/orchestrator/test_webhook.py diff --git a/bot_bottle/contrib/forge/__init__.py b/bot_bottle/contrib/forge/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bot_bottle/contrib/forge/base.py b/bot_bottle/contrib/forge/base.py new file mode 100644 index 0000000..50433f0 --- /dev/null +++ b/bot_bottle/contrib/forge/base.py @@ -0,0 +1,52 @@ +"""Scoped forge wrapper: read-anywhere / write-scoped access control. + +`ScopedForge` wraps any forge object and restricts write operations to +the set of issue/PR numbers the agent is explicitly assigned to. Read +operations always pass through unconditionally. 
+""" + +from __future__ import annotations + +from typing import Any + + +class ScopedForge: + """Delegates all forge calls to an inner forge, raising `PermissionError` + on write calls for numbers outside the assigned scope.""" + + def __init__( + self, + forge: Any, + *, + assigned_issue: int, + assigned_prs: list[int], + ) -> None: + self._forge = forge + self._allowed_writes: frozenset[int] = frozenset({assigned_issue, *assigned_prs}) + + def _check_write(self, number: int) -> None: + if number not in self._allowed_writes: + raise PermissionError( + f"write to #{number} is outside the assigned scope " + f"(allowed: {sorted(self._allowed_writes)})" + ) + + def is_org_member(self, org: str, username: str) -> bool: + return self._forge.is_org_member(org, username) + + def read_issue(self, number: int) -> dict[str, Any]: + return self._forge.read_issue(number) + + def read_pr(self, number: int) -> dict[str, Any]: + return self._forge.read_pr(number) + + def read_comments(self, number: int) -> list[dict[str, Any]]: + return self._forge.read_comments(number) + + def post_comment(self, number: int, body: str) -> None: + self._check_write(number) + self._forge.post_comment(number, body) + + def update_description(self, number: int, body: str) -> None: + self._check_write(number) + self._forge.update_description(number, body) diff --git a/bot_bottle/contrib/gitea/client.py b/bot_bottle/contrib/gitea/client.py new file mode 100644 index 0000000..64fa728 --- /dev/null +++ b/bot_bottle/contrib/gitea/client.py @@ -0,0 +1,112 @@ +"""Gitea API client and forge adapter (PRD prd-new: fold orchestrator). + +`GiteaClient` is a thin HTTP wrapper (stdlib `urllib.request` only — no +new runtime dependencies). `GiteaForge` composes a client and exposes +the forge protocol used by the orchestrator's sidecar and lifecycle. 
+ +Required Gitea token scopes: +- Repository: Read & Write (issues, comments, PR descriptions) +- Organization: Read (org membership check) +""" + +from __future__ import annotations + +import json +import urllib.error +import urllib.request +from typing import Any + +_TIMEOUT_SECS = 30 + + +class GiteaClient: + """Low-level HTTP wrapper for the Gitea REST API.""" + + def __init__( + self, *, api_url: str, owner: str, repo: str, token: str + ) -> None: + self._base = api_url.rstrip("/") + self._owner = owner + self._repo = repo + self._headers = { + "Authorization": f"token {token}", + "Content-Type": "application/json", + "Accept": "application/json", + } + + def _request( + self, + method: str, + path: str, + body: dict[str, Any] | None = None, + ) -> Any: + url = f"{self._base}{path}" + data = json.dumps(body).encode() if body is not None else None + req = urllib.request.Request( + url, data=data, headers=self._headers, method=method + ) + with urllib.request.urlopen(req, timeout=_TIMEOUT_SECS) as resp: + raw = resp.read() + return json.loads(raw) if raw else None + + def is_org_member(self, org: str, username: str) -> bool: + url = f"{self._base}/orgs/{org}/members/{username}" + req = urllib.request.Request(url, headers=self._headers, method="GET") + try: + urllib.request.urlopen(req, timeout=_TIMEOUT_SECS).close() + return True + except urllib.error.HTTPError: + return False + + def get_issue(self, number: int) -> dict[str, Any]: + return self._request("GET", f"/repos/{self._owner}/{self._repo}/issues/{number}") + + def get_pull(self, number: int) -> dict[str, Any]: + return self._request("GET", f"/repos/{self._owner}/{self._repo}/pulls/{number}") + + def list_comments(self, number: int) -> list[dict[str, Any]]: + return self._request("GET", f"/repos/{self._owner}/{self._repo}/issues/{number}/comments") + + def create_comment(self, number: int, body: str) -> None: + self._request( + "POST", + f"/repos/{self._owner}/{self._repo}/issues/{number}/comments", + 
{"body": body}, + ) + + def update_issue(self, number: int, body: str) -> None: + self._request( + "PATCH", + f"/repos/{self._owner}/{self._repo}/issues/{number}", + {"body": body}, + ) + + +class GiteaForge: + """Adapts `GiteaClient` to the forge protocol expected by the orchestrator. + + The forge protocol is duck-typed: any object with `is_org_member`, + `read_issue`, `read_pr`, `read_comments`, `post_comment`, and + `update_description` methods satisfies it. + """ + + def __init__(self, client: GiteaClient) -> None: + self._client = client + + def is_org_member(self, org: str, username: str) -> bool: + return self._client.is_org_member(org, username) + + def read_issue(self, number: int) -> dict[str, Any]: + return self._client.get_issue(number) + + def read_pr(self, number: int) -> dict[str, Any]: + return self._client.get_pull(number) + + def read_comments(self, number: int) -> list[dict[str, Any]]: + return self._client.list_comments(number) + + def post_comment(self, number: int, body: str) -> None: + self._client.create_comment(number, body) + + def update_description(self, number: int, body: str) -> None: + self._client.update_issue(number, body) diff --git a/bot_bottle/contrib/gitea/forge_state.py b/bot_bottle/contrib/gitea/forge_state.py new file mode 100644 index 0000000..e6323a9 --- /dev/null +++ b/bot_bottle/contrib/gitea/forge_state.py @@ -0,0 +1,137 @@ +"""Forge state persistence for the orchestrator (PRD prd-new: fold orchestrator). + +`ForgeState` is a dataclass that mirrors the orchestrator's `RunRecord` +field-for-field, held here so the store implementation is in bot-bottle +where the Gitea contrib lives. + +`SqliteForgeStateStore` backs it with a single SQLite table. The DB path +is optional; passing `None` uses `:memory:` (useful for tests and status +commands that don't need persistence). 
+""" + +from __future__ import annotations + +import json +import sqlite3 +from dataclasses import dataclass, field +from pathlib import Path + + +@dataclass +class ForgeState: + """Persisted state for one forge-targeted issue's bottle lifecycle.""" + + owner: str + repo: str + issue_number: int + slug: str + agent_name: str + bottle_names: list[str] = field(default_factory=list) + backend_name: str = "" + agent_git_user: str = "" + pr_number: int | None = None + status: str = "" + last_checkin_at: str = "" + + +_DDL = """ +CREATE TABLE IF NOT EXISTS forge_state ( + owner TEXT NOT NULL, + repo TEXT NOT NULL, + issue_number INTEGER NOT NULL, + slug TEXT NOT NULL, + agent_name TEXT NOT NULL, + bottle_names TEXT NOT NULL DEFAULT '[]', + backend_name TEXT NOT NULL DEFAULT '', + agent_git_user TEXT NOT NULL DEFAULT '', + pr_number INTEGER, + status TEXT NOT NULL DEFAULT '', + last_checkin_at TEXT NOT NULL DEFAULT '', + PRIMARY KEY (owner, repo, issue_number) +) +""" + + +class SqliteForgeStateStore: + """SQLite-backed `ForgeState` store. + + Thread-safety: a single connection is used; callers that share a + store across threads must serialise access externally. + """ + + def __init__(self, db_path: Path | None) -> None: + path = str(db_path) if db_path is not None else ":memory:" + self._conn = sqlite3.connect(path, check_same_thread=False) + self._conn.row_factory = sqlite3.Row + self._conn.execute(_DDL) + self._conn.commit() + + def upsert(self, state: ForgeState) -> None: + self._conn.execute( + """ + INSERT INTO forge_state + (owner, repo, issue_number, slug, agent_name, + bottle_names, backend_name, agent_git_user, + pr_number, status, last_checkin_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(owner, repo, issue_number) DO UPDATE SET + slug = excluded.slug, + agent_name = excluded.agent_name, + bottle_names = excluded.bottle_names, + backend_name = excluded.backend_name, + agent_git_user = excluded.agent_git_user, + pr_number = excluded.pr_number, + status = excluded.status, + last_checkin_at = excluded.last_checkin_at + """, + ( + state.owner, + state.repo, + state.issue_number, + state.slug, + state.agent_name, + json.dumps(state.bottle_names), + state.backend_name, + state.agent_git_user, + state.pr_number, + state.status, + state.last_checkin_at, + ), + ) + self._conn.commit() + + def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None: + row = self._conn.execute( + "SELECT * FROM forge_state WHERE owner=? AND repo=? AND issue_number=?", + (owner, repo, issue_number), + ).fetchone() + return _row_to_state(row) if row is not None else None + + def delete(self, owner: str, repo: str, issue_number: int) -> None: + self._conn.execute( + "DELETE FROM forge_state WHERE owner=? AND repo=? 
AND issue_number=?", + (owner, repo, issue_number), + ) + self._conn.commit() + + def all(self) -> list[ForgeState]: + rows = self._conn.execute( + "SELECT * FROM forge_state ORDER BY owner, repo, issue_number" + ).fetchall() + return [_row_to_state(r) for r in rows] + + +def _row_to_state(row: sqlite3.Row) -> ForgeState: + return ForgeState( + owner=row["owner"], + repo=row["repo"], + issue_number=row["issue_number"], + slug=row["slug"], + agent_name=row["agent_name"], + bottle_names=json.loads(row["bottle_names"]), + backend_name=row["backend_name"], + agent_git_user=row["agent_git_user"], + pr_number=row["pr_number"], + status=row["status"], + last_checkin_at=row["last_checkin_at"], + ) diff --git a/bot_bottle/orchestrator/__init__.py b/bot_bottle/orchestrator/__init__.py new file mode 100644 index 0000000..3c049cb --- /dev/null +++ b/bot_bottle/orchestrator/__init__.py @@ -0,0 +1,8 @@ +"""bot_bottle.orchestrator: forge-native orchestration for bot-bottle. + +The package is stdlib-only. The core (events, targeting, lifecycle, +watchdog, sidecar, webhook) depends on its collaborators — a forge, a +state store, a bottle runner — through duck-typed interfaces, so it runs +and tests without the concrete implementations. `bootstrap` is the single +module that imports from `bot_bottle.contrib` and wires them in. +""" diff --git a/bot_bottle/orchestrator/__main__.py b/bot_bottle/orchestrator/__main__.py new file mode 100644 index 0000000..455742d --- /dev/null +++ b/bot_bottle/orchestrator/__main__.py @@ -0,0 +1,51 @@ +"""CLI entry point: `python -m bot_bottle.orchestrator <command>`. 
+ +Commands: + run start the webhook server + watchdog + done-signal relay + status print the tracked runs (issue -> slug, status) +""" + +from __future__ import annotations + +import argparse +import sys + +from .config import Config + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(prog="python -m bot_bottle.orchestrator") + sub = parser.add_subparsers(dest="command", required=True) + sub.add_parser("run", help="start the webhook server, watchdog, and relay") + sub.add_parser("status", help="list tracked runs") + args = parser.parse_args(argv) + + config = Config.from_env() + + if args.command == "run": + from . import bootstrap # pylint: disable=import-outside-toplevel + + print( + f"orchestrator listening on " + f"http://{config.webhook_host}:{config.webhook_port}/webhook", + file=sys.stderr, + ) + bootstrap.run(config) + return 0 + + if args.command == "status": + from .bootstrap import ( # pylint: disable=import-outside-toplevel + BotBottleStateStore, + ) + + store = BotBottleStateStore(config.db_path) + for r in store.all(): + pr = f"PR#{r.pr_number}" if r.pr_number else "-" + print(f"{r.owner}/{r.repo}#{r.issue_number}\t{r.slug}\t{r.status}\t{pr}") + return 0 + + return 2 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/bot_bottle/orchestrator/bootstrap.py b/bot_bottle/orchestrator/bootstrap.py new file mode 100644 index 0000000..636d7d8 --- /dev/null +++ b/bot_bottle/orchestrator/bootstrap.py @@ -0,0 +1,155 @@ +"""Wire the concrete bot-bottle implementations into the core. + +This is the ONLY module that imports from `bot_bottle.contrib`. It adapts +`SqliteForgeStateStore` to our `StateStore`, builds `GiteaForge`s (and +scope-wrapped forges for sidecars), constructs the `Orchestrator`, and +runs the webhook server + watchdog + done-signal relay. + +Imports are direct (no lazy loading) because the orchestrator is now part +of the same package installation. 
+""" + +from __future__ import annotations + +import os +import threading +from pathlib import Path +from typing import Any + +from ..contrib.forge.base import ScopedForge +from ..contrib.gitea.client import GiteaClient, GiteaForge +from ..contrib.gitea.forge_state import ForgeState, SqliteForgeStateStore +from .config import Config +from .lifecycle import Orchestrator +from .model import RunRecord +from .runner import SubprocessBottleRunner +from .sidecar import ForgeSidecar, OpLog, drain_done_events +from .watchdog import Watchdog +from .webhook import WebhookServer + +_RELAY_TICK_SECS = 2.0 + + +def _token() -> str: + tok = os.environ.get("GITEA_TOKEN") or os.environ.get("FORGE_GITEA_TOKEN") + if not tok: + raise RuntimeError("set GITEA_TOKEN (or FORGE_GITEA_TOKEN)") + return tok + + +class BotBottleStateStore: + """Adapts `SqliteForgeStateStore` to our `StateStore`, translating + `RunRecord` <-> `ForgeState` field-for-field.""" + + def __init__(self, db_path: Path | None) -> None: + self._inner = SqliteForgeStateStore(db_path) + + def upsert(self, record: RunRecord) -> None: + self._inner.upsert(_to_forge_state(record)) + + def get(self, owner: str, repo: str, issue_number: int) -> RunRecord | None: + state = self._inner.get(owner, repo, issue_number) + return _to_record(state) if state is not None else None + + def delete(self, owner: str, repo: str, issue_number: int) -> None: + self._inner.delete(owner, repo, issue_number) + + def all(self) -> list[RunRecord]: + return [_to_record(s) for s in self._inner.all()] + + +def _to_forge_state(r: RunRecord) -> ForgeState: + return ForgeState( + owner=r.owner, repo=r.repo, issue_number=r.issue_number, slug=r.slug, + agent_name=r.agent_name, bottle_names=list(r.bottle_names), + backend_name=r.backend_name, agent_git_user=r.agent_git_user, + pr_number=r.pr_number, status=r.status, last_checkin_at=r.last_checkin_at, + ) + + +def _to_record(s: ForgeState) -> RunRecord: + return RunRecord( + owner=s.owner, repo=s.repo, 
issue_number=s.issue_number, slug=s.slug, + agent_name=s.agent_name, bottle_names=list(s.bottle_names), + backend_name=s.backend_name, agent_git_user=s.agent_git_user, + pr_number=s.pr_number, status=s.status, last_checkin_at=s.last_checkin_at, + ) + + +def make_forge(config: Config, owner: str, repo: str) -> Any: + """A `GiteaForge` bound to one repo.""" + client = GiteaClient( + api_url=config.gitea_api, owner=owner, repo=repo, token=_token() + ) + return GiteaForge(client) + + +def make_sidecar( + config: Config, owner: str, repo: str, issue_number: int, assigned_prs: list[int] +) -> ForgeSidecar: + """A scope-enforced sidecar for one run (read-anywhere / write-scoped).""" + scoped = ScopedForge( + make_forge(config, owner, repo), + assigned_issue=issue_number, + assigned_prs=assigned_prs, + ) + op_log = OpLog(config.queue_dir / f"{owner}-{repo}-{issue_number}.oplog.jsonl") + return ForgeSidecar( + forge=scoped, + op_log=op_log, + queue_dir=config.queue_dir, + run_key=(owner, repo, issue_number), + ) + + +def build(config: Config) -> tuple[WebhookServer, Watchdog, Orchestrator]: + store = BotBottleStateStore(config.db_path) + runner = SubprocessBottleRunner(cli=config.bot_bottle_cli, base_env=dict(os.environ)) + membership_forge = make_forge(config, "_", "_") + orchestrator = Orchestrator( + forge=membership_forge, + store=store, + runner=runner, + org=config.forge_org, + gitea_api=config.gitea_api, + forge_env_base={ + "GITEA_TOKEN": _token(), + "FORGE_QUEUE_DIR": str(config.queue_dir), + "FORGE_SIDECAR_SOCKET": str(config.sidecar_socket), + }, + ) + watchdog = Watchdog( + store=store, runner=runner, timeout_secs=config.watchdog_timeout_secs + ) + server = WebhookServer( + (config.webhook_host, config.webhook_port), + orchestrator=orchestrator, + store=store, + ) + return server, watchdog, orchestrator + + +def _relay_loop(config: Config, orchestrator: Orchestrator, stop: threading.Event) -> None: + while not stop.wait(_RELAY_TICK_SECS): + for ev in 
drain_done_events(config.queue_dir): + orchestrator.on_done_signal( + ev["owner"], ev["repo"], int(ev["issue_number"]), + str(ev.get("status", "")), str(ev.get("summary", "")), + ) + + +def run(config: Config) -> None: + """Blocking run: webhook server + watchdog + done-signal relay.""" + server, watchdog, orchestrator = build(config) + watchdog.start() + stop = threading.Event() + relay = threading.Thread( + target=_relay_loop, args=(config, orchestrator, stop), daemon=True + ) + relay.start() + try: + server.serve_forever() + finally: + stop.set() + watchdog.stop() + server.server_close() diff --git a/bot_bottle/orchestrator/config.py b/bot_bottle/orchestrator/config.py new file mode 100644 index 0000000..e66aa7b --- /dev/null +++ b/bot_bottle/orchestrator/config.py @@ -0,0 +1,52 @@ +"""Configuration, loaded from the environment (stdlib `os` only). + +Everything the orchestrator needs to run is an env var so a deploy is a +process with an environment, no config file to manage. `FORGE_*` names +match the bot-bottle forge-native PRD. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + +# The label that marks an issue as agent-targeted: `bot-bottle:`. +LABEL_PREFIX = "bot-bottle:" +# Optional bottle override: `bot-bottle-bottle:`. 
+BOTTLE_LABEL_PREFIX = "bot-bottle-bottle:" + + +@dataclass(frozen=True) +class Config: + """Resolved orchestrator configuration.""" + + forge_org: str + gitea_api: str + watchdog_timeout_secs: int + webhook_host: str + webhook_port: int + bot_bottle_cli: str + queue_dir: Path + sidecar_socket: Path + db_path: Path | None + + @staticmethod + def from_env(env: dict[str, str] | None = None) -> "Config": + e = os.environ if env is None else env + home = Path(e.get("HOME", str(Path.home()))) + default_root = home / ".bot-bottle" + db = e.get("FORGE_DB_PATH") + return Config( + forge_org=e.get("FORGE_ORG", "bot-bottle"), + gitea_api=e.get("FORGE_GITEA_API", ""), + watchdog_timeout_secs=int(e.get("FORGE_WATCHDOG_TIMEOUT", "1800")), + webhook_host=e.get("FORGE_WEBHOOK_HOST", "127.0.0.1"), + webhook_port=int(e.get("FORGE_WEBHOOK_PORT", "8477")), + bot_bottle_cli=e.get("BOT_BOTTLE_CLI", "cli.py"), + queue_dir=Path(e.get("FORGE_QUEUE_DIR", str(default_root / "forge-queue"))), + sidecar_socket=Path( + e.get("FORGE_SIDECAR_SOCKET", str(default_root / "forge-sidecar.sock")) + ), + db_path=Path(db) if db else None, + ) diff --git a/bot_bottle/orchestrator/events.py b/bot_bottle/orchestrator/events.py new file mode 100644 index 0000000..b426db9 --- /dev/null +++ b/bot_bottle/orchestrator/events.py @@ -0,0 +1,85 @@ +"""Parse Gitea webhook payloads into typed `ForgeEvent`s. + +Only the fields the orchestrator acts on are extracted; unknown payloads +and event types return None so the webhook layer can ignore them. + +Gitea sends the event kind in the `X-Gitea-Event` header and the payload +as JSON. 
The relevant kinds: + +- `issues` with `action == "assigned"` -> IssueAssigned +- `issue_comment` with `action == "created"` -> CommentCreated +- `pull_request` with `action == "closed"` -> PullRequestClosed +""" + +from __future__ import annotations + +from typing import Any + +from .model import CommentCreated, ForgeEvent, IssueAssigned, PullRequestClosed + + +def _repo_owner(payload: dict[str, Any]) -> tuple[str, str]: + repo = payload.get("repository") or {} + owner = (repo.get("owner") or {}).get("login", "") + return str(owner), str(repo.get("name", "")) + + +def parse_event(event_kind: str, payload: dict[str, Any]) -> ForgeEvent | None: + """Map (X-Gitea-Event, payload) to a `ForgeEvent`, or None to ignore.""" + if event_kind == "issues": + return _parse_issue(payload) + if event_kind == "issue_comment": + return _parse_comment(payload) + if event_kind == "pull_request": + return _parse_pull_request(payload) + return None + + +def _parse_issue(payload: dict[str, Any]) -> IssueAssigned | None: + if payload.get("action") != "assigned": + return None + owner, repo = _repo_owner(payload) + issue = payload.get("issue") or {} + assignees = tuple( + str(a.get("login", "")) for a in (issue.get("assignees") or []) + ) + labels = tuple(str(l.get("name", "")) for l in (issue.get("labels") or [])) + return IssueAssigned( + owner=owner, + repo=repo, + issue_number=int(issue.get("number", 0)), + title=str(issue.get("title", "")), + body=str(issue.get("body", "") or ""), + assignees=assignees, + labels=labels, + ) + + +def _parse_comment(payload: dict[str, Any]) -> CommentCreated | None: + if payload.get("action") != "created": + return None + owner, repo = _repo_owner(payload) + issue = payload.get("issue") or {} + comment = payload.get("comment") or {} + return CommentCreated( + owner=owner, + repo=repo, + issue_number=int(issue.get("number", 0)), + comment_id=int(comment.get("id", 0)), + author=str((comment.get("user") or {}).get("login", "")), + 
body=str(comment.get("body", "") or ""), + is_pull=bool(issue.get("pull_request")), + ) + + +def _parse_pull_request(payload: dict[str, Any]) -> PullRequestClosed | None: + if payload.get("action") != "closed": + return None + owner, repo = _repo_owner(payload) + pr = payload.get("pull_request") or {} + return PullRequestClosed( + owner=owner, + repo=repo, + pr_number=int(pr.get("number", 0)), + merged=bool(pr.get("merged", False)), + ) diff --git a/bot_bottle/orchestrator/lifecycle.py b/bot_bottle/orchestrator/lifecycle.py new file mode 100644 index 0000000..5e6f6cb --- /dev/null +++ b/bot_bottle/orchestrator/lifecycle.py @@ -0,0 +1,180 @@ +"""The orchestration lifecycle: forge events -> bottle transitions. + +`Orchestrator.handle(event)` is the single entry point the webhook layer +calls. `on_done_signal(...)` is called by the sidecar relay when an agent +signals completion. All collaborators (forge, store, runner) are +injected and duck-typed; `now` and `label_for` are injectable for tests. + +Transitions: + IssueAssigned (targeted, new) -> start bottle, record = running + signal_done (running) -> freeze bottle, record = frozen + CommentCreated (frozen) -> resume bottle, record = running + PullRequestClosed (tracked) -> destroy bottle, record removed +""" + +from __future__ import annotations + +from collections.abc import Callable +from datetime import datetime + +from .model import ( + STATUS_DESTROYED, + STATUS_FROZEN, + STATUS_RUNNING, + CommentCreated, + ForgeEvent, + IssueAssigned, + PullRequestClosed, + RunRecord, +) +from .runner import BottleRunner +from .store import StateStore +from .targeting import Membership, Target, resolve_target + + +def _iso_now() -> str: + return datetime.now().astimezone().isoformat(timespec="seconds") + + +def _default_label(agent: str, event: IssueAssigned) -> str: + # Embed the issue identity so slugs are unique per issue and never + # get renamed on collision. 
+ return f"{agent}-{event.owner}-{event.repo}-{event.issue_number}" + + +class Orchestrator: + def __init__( + self, + *, + forge: Membership, + store: StateStore, + runner: BottleRunner, + org: str, + gitea_api: str = "", + forge_env_base: dict[str, str] | None = None, + now: Callable[[], str] = _iso_now, + label_for: Callable[[str, IssueAssigned], str] = _default_label, + ) -> None: + self._forge = forge + self._store = store + self._runner = runner + self._org = org + self._gitea_api = gitea_api + self._forge_env_base = forge_env_base or {} + self._now = now + self._label_for = label_for + + # --- entry points ------------------------------------------------------ + + def handle(self, event: ForgeEvent) -> None: + if isinstance(event, IssueAssigned): + self._on_issue_assigned(event) + elif isinstance(event, CommentCreated): + self._on_comment(event) + else: + self._on_pr_closed(event) + + def on_done_signal( # pylint: disable=unused-argument + self, owner: str, repo: str, issue_number: int, status: str, summary: str + ) -> None: + """Sidecar relay: an agent signalled completion. Freeze the bottle. 
+ `status`/`summary` are recorded by provenance (via the op log), not + acted on here.""" + record = self._store.get(owner, repo, issue_number) + if record is None or record.status != STATUS_RUNNING: + return + self._runner.freeze(record.slug) + record.status = STATUS_FROZEN + record.last_checkin_at = self._now() + self._store.upsert(record) + + def link_pr(self, owner: str, repo: str, issue_number: int, pr_number: int) -> None: + """Record the PR a tracked issue produced, so PR comments and the + PR-close event route back to this record.""" + record = self._store.get(owner, repo, issue_number) + if record is not None: + record.pr_number = pr_number + self._store.upsert(record) + + # --- handlers ---------------------------------------------------------- + + def _on_issue_assigned(self, event: IssueAssigned) -> None: + target = resolve_target(event, self._forge, self._org) + if target is None: + return + # Idempotent: a webhook redelivery must not launch a second bottle. + if self._store.get(event.owner, event.repo, event.issue_number) is not None: + return + self._launch(event, target) + + def _launch(self, event: IssueAssigned, target: Target) -> None: + label = self._label_for(target.agent_name, event) + bottles = [target.bottle_override] if target.bottle_override else [] + result = self._runner.start( + agent=target.agent_name, + bottles=bottles, + label=label, + prompt=event.body, + forge_env=self._forge_env(event.owner, event.repo, event.issue_number), + ) + self._store.upsert( + RunRecord( + owner=event.owner, + repo=event.repo, + issue_number=event.issue_number, + slug=result.slug, + agent_name=target.agent_name, + bottle_names=bottles, + status=STATUS_RUNNING, + last_checkin_at=self._now(), + ) + ) + + def _on_comment(self, event: CommentCreated) -> None: + record = self._route_comment(event) + if record is None or record.status != STATUS_FROZEN: + return + # Echo-loop guard: ignore the agent's own comments. 
+ if record.agent_git_user and event.author == record.agent_git_user: + return + self._runner.resume(record.slug, event.body) + record.status = STATUS_RUNNING + record.last_checkin_at = self._now() + self._store.upsert(record) + + def _route_comment(self, event: CommentCreated) -> RunRecord | None: + # A comment on the issue routes by issue number; a comment on a PR + # routes by the recorded pr_number. + direct = self._store.get(event.owner, event.repo, event.issue_number) + if direct is not None: + return direct + if event.is_pull: + return self._find_by_pr(event.owner, event.repo, event.issue_number) + return None + + def _on_pr_closed(self, event: PullRequestClosed) -> None: + record = self._find_by_pr(event.owner, event.repo, event.pr_number) + if record is None: + return + self._runner.destroy(record.slug) + record.status = STATUS_DESTROYED + self._store.delete(record.owner, record.repo, record.issue_number) + + def _find_by_pr(self, owner: str, repo: str, pr_number: int) -> RunRecord | None: + for record in self._store.all(): + if ( + record.owner == owner + and record.repo == repo + and record.pr_number == pr_number + ): + return record + return None + + def _forge_env(self, owner: str, repo: str, issue_number: int) -> dict[str, str]: + env = dict(self._forge_env_base) + if self._gitea_api: + env["FORGE_GITEA_API"] = self._gitea_api + env["FORGE_OWNER"] = owner + env["FORGE_REPO"] = repo + env["FORGE_ISSUE_NUMBER"] = str(issue_number) + return env diff --git a/bot_bottle/orchestrator/model.py b/bot_bottle/orchestrator/model.py new file mode 100644 index 0000000..98bef75 --- /dev/null +++ b/bot_bottle/orchestrator/model.py @@ -0,0 +1,108 @@ +"""Domain model: run records, forge events, provenance. + +These are the orchestrator's own dataclasses. `RunRecord` mirrors +bot-bottle's `ForgeState` field-for-field so the bootstrap adapter can +translate between them with no loss; keeping our own copy is what lets +the core stay import-free of bot-bottle. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass, field + +# Run lifecycle. A bottle is launched (running), frozen on the done +# signal, and destroyed when the PR closes. +STATUS_RUNNING = "running" +STATUS_FROZEN = "frozen" +STATUS_DESTROYED = "destroyed" + + +@dataclass +class RunRecord: + """One forge-targeted issue's bottle lifecycle record.""" + + owner: str + repo: str + issue_number: int + slug: str + agent_name: str + bottle_names: list[str] = field(default_factory=list) + backend_name: str = "" + agent_git_user: str = "" + pr_number: int | None = None + status: str = STATUS_RUNNING + last_checkin_at: str = "" + + +# --- Forge events (parsed webhook payloads) -------------------------------- + + +@dataclass(frozen=True) +class IssueAssigned: + """An issue gained an assignee — the trigger to consider a launch.""" + + owner: str + repo: str + issue_number: int + title: str + body: str + assignees: tuple[str, ...] + labels: tuple[str, ...] + + +@dataclass(frozen=True) +class CommentCreated: + """A comment was posted on an issue or PR — a rehydrate trigger.""" + + owner: str + repo: str + issue_number: int + comment_id: int + author: str + body: str + is_pull: bool + + +@dataclass(frozen=True) +class PullRequestClosed: + """A PR closed (merged or not) — the teardown trigger.""" + + owner: str + repo: str + pr_number: int + merged: bool + + +# Union of everything the webhook layer can emit. +ForgeEvent = IssueAssigned | CommentCreated | PullRequestClosed + + +# --- Provenance ------------------------------------------------------------ + + +@dataclass(frozen=True) +class ForgeOp: + """One semantic forge operation the sidecar recorded.""" + + at: str # ISO timestamp + op: str # e.g. "post_comment", "read_pr", "signal_done" + target: int | None + detail: str + + +@dataclass(frozen=True) +class Provenance: + """The audit record for one run, served by the provenance API. 
Never + posted into the forge.""" + + slug: str + owner: str + repo: str + issue_number: int + agent_name: str + bottle_names: tuple[str, ...] + started_at: str + finished_at: str + exit_code: int | None + watchdog_fired: bool + ops: tuple[ForgeOp, ...] diff --git a/bot_bottle/orchestrator/provenance.py b/bot_bottle/orchestrator/provenance.py new file mode 100644 index 0000000..e9aa508 --- /dev/null +++ b/bot_bottle/orchestrator/provenance.py @@ -0,0 +1,71 @@ +"""Provenance assembly + serialization. + +Provenance is the run's audit record: the `RunRecord` metadata plus the +sidecar's semantic operation log. It is exposed through the provenance +API (see `webhook.ProvenanceHandler`) and deliberately never posted back +into the forge — a mutable PR comment is not an audit record. + +This module only assembles and serializes; retention/signing of the +record is a control-plane concern out of scope here. +""" + +from __future__ import annotations + +from typing import Any + +from .model import ForgeOp, Provenance, RunRecord + + +def ops_from_log(entries: list[dict[str, Any]]) -> tuple[ForgeOp, ...]: + return tuple( + ForgeOp( + at=str(e.get("at", "")), + op=str(e.get("op", "")), + target=e.get("target"), + detail=str(e.get("detail", "")), + ) + for e in entries + ) + + +def build_provenance( + record: RunRecord, + *, + ops: tuple[ForgeOp, ...], + started_at: str, + finished_at: str, + exit_code: int | None, + watchdog_fired: bool, +) -> Provenance: + return Provenance( + slug=record.slug, + owner=record.owner, + repo=record.repo, + issue_number=record.issue_number, + agent_name=record.agent_name, + bottle_names=tuple(record.bottle_names), + started_at=started_at, + finished_at=finished_at, + exit_code=exit_code, + watchdog_fired=watchdog_fired, + ops=ops, + ) + + +def provenance_to_dict(p: Provenance) -> dict[str, Any]: + return { + "slug": p.slug, + "owner": p.owner, + "repo": p.repo, + "issue_number": p.issue_number, + "agent": p.agent_name, + "bottles": 
list(p.bottle_names), + "started_at": p.started_at, + "finished_at": p.finished_at, + "exit_code": p.exit_code, + "watchdog_fired": p.watchdog_fired, + "ops": [ + {"at": o.at, "op": o.op, "target": o.target, "detail": o.detail} + for o in p.ops + ], + } diff --git a/bot_bottle/orchestrator/runner.py b/bot_bottle/orchestrator/runner.py new file mode 100644 index 0000000..2bba90f --- /dev/null +++ b/bot_bottle/orchestrator/runner.py @@ -0,0 +1,118 @@ +"""Bottle runner: drive the bot-bottle CLI to manage a bottle's life. + +`BottleRunner` is the interface the lifecycle depends on; +`SubprocessBottleRunner` shells out to the bot-bottle `cli.py` +(`start --headless`, `commit`, `resume --headless`). The subprocess +callable is injectable so tests never spawn a process. + +The slug is derived from the label via `slugify`, matching bot-bottle's +container-slug rule; the orchestrator picks labels that embed the issue +identity so slugs are unique and collisions never rename them. +""" + +from __future__ import annotations + +import re +import subprocess +import sys +from collections.abc import Callable, Sequence +from dataclasses import dataclass +from typing import Protocol + + +@dataclass(frozen=True) +class RunResult: + slug: str + exit_code: int + + +class BottleRunner(Protocol): + def start( + self, + *, + agent: str, + bottles: Sequence[str], + label: str, + prompt: str, + forge_env: dict[str, str], + ) -> RunResult: ... + + def freeze(self, slug: str) -> int: ... + + def resume(self, slug: str, prompt: str) -> RunResult: ... + + def destroy(self, slug: str) -> int: ... + + +_SLUG_RE = re.compile(r"[^a-z0-9]+") + + +def slugify(label: str) -> str: + """Lowercase, collapse non-alphanumerics to single hyphens, strip + leading/trailing hyphens — matches bot-bottle's slug rule.""" + return _SLUG_RE.sub("-", label.lower()).strip("-") + + +# A subprocess.run-shaped callable, injectable for tests. 
+RunFn = Callable[[Sequence[str], dict[str, str]], int] + + +def _default_run(argv: Sequence[str], env: dict[str, str]) -> int: + return subprocess.run(list(argv), env=env, check=False).returncode + + +class SubprocessBottleRunner: + """Shells the bot-bottle CLI. `cli` is the path to `cli.py`; `python` + is the interpreter to run it with; `base_env` is the environment the + child inherits (the orchestrator's, minus per-run additions).""" + + def __init__( + self, + *, + cli: str, + base_env: dict[str, str], + python: str = sys.executable, + run: RunFn = _default_run, + ) -> None: + self._cli = cli + self._python = python + self._base_env = base_env + self._run = run + + def _argv(self, *args: str) -> list[str]: + return [self._python, self._cli, *args] + + def start( + self, + *, + agent: str, + bottles: Sequence[str], + label: str, + prompt: str, + forge_env: dict[str, str], + ) -> RunResult: + argv = self._argv( + "start", agent, "--headless", "--label", label, "--prompt", prompt + ) + for bottle in bottles: + argv += ["--bottle", bottle] + code = self._run(argv, {**self._base_env, **forge_env}) + return RunResult(slug=slugify(label), exit_code=code) + + def freeze(self, slug: str) -> int: + # bot-bottle's `commit` snapshots a running bottle's state. + return self._run(self._argv("commit", slug), self._base_env) + + def resume(self, slug: str, prompt: str) -> RunResult: + code = self._run( + self._argv("resume", slug, "--headless", "--prompt", prompt), + self._base_env, + ) + return RunResult(slug=slug, exit_code=code) + + def destroy(self, slug: str) -> int: + # NOTE: bot-bottle `cleanup` currently targets all bottles; a + # per-slug teardown command is a known integration follow-up + # (tracked in docs/JOURNAL.md). Kept behind this method so the + # call site does not change when that lands. 
+ return self._run(self._argv("cleanup", slug), self._base_env) diff --git a/bot_bottle/orchestrator/sidecar.py b/bot_bottle/orchestrator/sidecar.py new file mode 100644 index 0000000..abc2db5 --- /dev/null +++ b/bot_bottle/orchestrator/sidecar.py @@ -0,0 +1,171 @@ +"""Forge sidecar: the agent's only door to the forge. + +The agent calls the sidecar over a line-delimited JSON-RPC AF_UNIX +socket; the sidecar dispatches to an injected `forge` (already +scope-wrapped by bootstrap) and holds the token, so the agent never sees +a credential or a forge endpoint. Every call is appended to a semantic +operation log (the provenance raw material). `signal_done` additionally +drops an event file in the queue dir the orchestrator drains. + +`dispatch` is pure and testable; `serve` wraps it in a socket server. +""" + +from __future__ import annotations + +import dataclasses +import json +import socketserver +import uuid +from collections.abc import Callable +from datetime import datetime +from pathlib import Path +from typing import Any + +_READ_METHODS = {"read_issue", "read_pr", "read_comments"} +_WRITE_METHODS = {"post_comment", "update_description"} + + +def _iso_now() -> str: + return datetime.now().astimezone().isoformat(timespec="seconds") + + +def _jsonable(value: Any) -> Any: + if dataclasses.is_dataclass(value) and not isinstance(value, type): + return dataclasses.asdict(value) + if isinstance(value, list): + return [_jsonable(v) for v in value] + return value + + +class OpLog: + """Append-only JSONL log of semantic forge operations.""" + + def __init__(self, path: Path, *, now: Callable[[], str] = _iso_now) -> None: + self._path = path + self._now = now + path.parent.mkdir(parents=True, exist_ok=True) + + def record(self, op: str, target: int | None, detail: str) -> None: + entry = {"at": self._now(), "op": op, "target": target, "detail": detail} + with self._path.open("a", encoding="utf-8") as fh: + fh.write(json.dumps(entry) + "\n") + + def read(self) -> 
list[dict[str, Any]]: + if not self._path.exists(): + return [] + return [ + json.loads(line) + for line in self._path.read_text(encoding="utf-8").splitlines() + if line.strip() + ] + + +def write_done_event(queue_dir: Path, event: dict[str, Any]) -> Path: + """Atomically drop a done-signal event file in the queue dir.""" + queue_dir.mkdir(parents=True, exist_ok=True) + path = queue_dir / f"done-{uuid.uuid4().hex}.json" + tmp = path.with_suffix(".json.tmp") + tmp.write_text(json.dumps(event), encoding="utf-8") + tmp.replace(path) + return path + + +def drain_done_events(queue_dir: Path) -> list[dict[str, Any]]: + """Read and remove every queued done-signal event.""" + if not queue_dir.is_dir(): + return [] + events: list[dict[str, Any]] = [] + for path in sorted(queue_dir.glob("done-*.json")): + try: + events.append(json.loads(path.read_text(encoding="utf-8"))) + except (OSError, ValueError): + continue + finally: + path.unlink(missing_ok=True) + return events + + +class ForgeSidecar: + """Dispatches sidecar protocol calls to the forge, logging each and + relaying `signal_done` to the queue dir. 
`run_key` is the + (owner, repo, issue_number) the run is bound to.""" + + def __init__( + self, + *, + forge: object, + op_log: OpLog, + queue_dir: Path, + run_key: tuple[str, str, int], + ) -> None: + self._forge = forge + self._log = op_log + self._queue_dir = queue_dir + self._owner, self._repo, self._issue = run_key + + def dispatch(self, method: str, params: dict[str, Any]) -> dict[str, Any]: + try: + result = self._invoke(method, params) + except Exception as exc: # noqa: BLE001 — surface as JSON-RPC error + self._log.record(method, params.get("number"), f"error: {exc}") + return {"ok": False, "error": str(exc)} + return {"ok": True, "result": result} + + def _invoke(self, method: str, params: dict[str, Any]) -> Any: + if method in _READ_METHODS: + number = int(params["number"]) + result = getattr(self._forge, method)(number) + self._log.record(method, number, "ok") + return _jsonable(result) + if method in _WRITE_METHODS: + number = int(params["number"]) + getattr(self._forge, method)(number, params["body"]) + self._log.record(method, number, "ok") + return None + if method == "signal_done": + status = str(params.get("status", "")) + summary = str(params.get("summary", "")) + self._log.record("signal_done", None, f"{status}: {summary}") + write_done_event( + self._queue_dir, + { + "owner": self._owner, + "repo": self._repo, + "issue_number": self._issue, + "status": status, + "summary": summary, + }, + ) + return None + raise ValueError(f"unknown method: {method}") + + +class _Handler(socketserver.StreamRequestHandler): + def handle(self) -> None: + line = self.rfile.readline() + if not line: + return + try: + req = json.loads(line) + except ValueError: + self.wfile.write(b'{"ok": false, "error": "invalid json"}\n') + return + resp = self.server.sidecar.dispatch( # type: ignore[attr-defined] + str(req.get("method", "")), dict(req.get("params", {})) + ) + self.wfile.write((json.dumps(resp) + "\n").encode()) + + +class 
_Server(socketserver.ThreadingUnixStreamServer): + def __init__(self, socket_path: str, sidecar: ForgeSidecar) -> None: + super().__init__(socket_path, _Handler) + self.sidecar = sidecar + + +def serve(sidecar: ForgeSidecar, socket_path: Path) -> _Server: + """Bind a threaded AF_UNIX server for `sidecar`. Caller runs + `serve_forever()` (or `handle_request()` in tests) and closes it.""" + if socket_path.exists(): + socket_path.unlink() + socket_path.parent.mkdir(parents=True, exist_ok=True) + return _Server(str(socket_path), sidecar) diff --git a/bot_bottle/orchestrator/store.py b/bot_bottle/orchestrator/store.py new file mode 100644 index 0000000..96ac229 --- /dev/null +++ b/bot_bottle/orchestrator/store.py @@ -0,0 +1,48 @@ +"""State store interface + an in-memory implementation. + +The orchestrator persists one `RunRecord` per forge-targeted issue. At +runtime `bootstrap` supplies an adapter over bot-bottle's +`SqliteForgeStateStore`; the in-memory store here backs tests and a +`--no-bot-bottle` dry mode. +""" + +from __future__ import annotations + +from typing import Protocol + +from .model import RunRecord + + +class StateStore(Protocol): + """Thin CRUD surface. Mirrors bot-bottle's `ForgeStateStore` so the + bootstrap adapter is a straight pass-through.""" + + def upsert(self, record: RunRecord) -> None: ... + + def get(self, owner: str, repo: str, issue_number: int) -> RunRecord | None: ... + + def delete(self, owner: str, repo: str, issue_number: int) -> None: ... + + def all(self) -> list[RunRecord]: ... 
+ + +class InMemoryStateStore: + """Dict-backed `StateStore`, keyed by (owner, repo, issue_number).""" + + def __init__(self) -> None: + self._by_key: dict[tuple[str, str, int], RunRecord] = {} + + def upsert(self, record: RunRecord) -> None: + self._by_key[(record.owner, record.repo, record.issue_number)] = record + + def get(self, owner: str, repo: str, issue_number: int) -> RunRecord | None: + return self._by_key.get((owner, repo, issue_number)) + + def delete(self, owner: str, repo: str, issue_number: int) -> None: + self._by_key.pop((owner, repo, issue_number), None) + + def all(self) -> list[RunRecord]: + return sorted( + self._by_key.values(), + key=lambda r: (r.owner, r.repo, r.issue_number), + ) diff --git a/bot_bottle/orchestrator/targeting.py b/bot_bottle/orchestrator/targeting.py new file mode 100644 index 0000000..8a15a6d --- /dev/null +++ b/bot_bottle/orchestrator/targeting.py @@ -0,0 +1,51 @@ +"""Decide whether an assigned issue is agent-targeted, and for whom. + +An issue is forge-targeted when BOTH hold: +- it carries a `bot-bottle:` label naming the agent, and +- at least one assignee is a member of the configured org. + +An optional `bot-bottle-bottle:` label overrides bottle selection. +The forge is duck-typed: any object with `is_org_member(org, user)`. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Protocol + +from .config import BOTTLE_LABEL_PREFIX, LABEL_PREFIX +from .model import IssueAssigned + + +class Membership(Protocol): + def is_org_member(self, org: str, username: str) -> bool: ... 
+ + +@dataclass(frozen=True) +class Target: + agent_name: str + bottle_override: str | None + + +def parse_labels(labels: tuple[str, ...]) -> tuple[str | None, str | None]: + """Return (agent_name, bottle_override) parsed from labels.""" + agent: str | None = None + bottle: str | None = None + for label in labels: + if label.startswith(BOTTLE_LABEL_PREFIX): + bottle = label[len(BOTTLE_LABEL_PREFIX):] or None + elif label.startswith(LABEL_PREFIX): + agent = label[len(LABEL_PREFIX):] or None + return agent, bottle + + +def resolve_target( + event: IssueAssigned, forge: Membership, org: str +) -> Target | None: + """Return the `Target` for a forge-targeted issue, or None to ignore.""" + agent, bottle = parse_labels(event.labels) + if not agent: + return None + if not any(forge.is_org_member(org, a) for a in event.assignees): + return None + return Target(agent_name=agent, bottle_override=bottle) diff --git a/bot_bottle/orchestrator/watchdog.py b/bot_bottle/orchestrator/watchdog.py new file mode 100644 index 0000000..3961ef9 --- /dev/null +++ b/bot_bottle/orchestrator/watchdog.py @@ -0,0 +1,68 @@ +"""Watchdog: freeze runs whose agent exited without signalling done. + +`sweep(now)` is the pure, testable core: any `running` record whose +`last_checkin_at` is older than the timeout is frozen as +done-without-self-report and returned so provenance can flag it. +`Watchdog.start()` runs `sweep` on a daemon thread once a minute. 
+""" + +from __future__ import annotations + +import threading +from datetime import datetime, timedelta + +from .model import STATUS_FROZEN, STATUS_RUNNING, RunRecord +from .runner import BottleRunner +from .store import StateStore + +_TICK_SECS = 60.0 + + +def _parse(ts: str) -> datetime | None: + try: + return datetime.fromisoformat(ts) + except (ValueError, TypeError): + return None + + +class Watchdog: + def __init__( + self, + *, + store: StateStore, + runner: BottleRunner, + timeout_secs: int, + ) -> None: + self._store = store + self._runner = runner + self._timeout = timedelta(seconds=timeout_secs) + self._stop = threading.Event() + self._thread: threading.Thread | None = None + + def sweep(self, now: datetime) -> list[RunRecord]: + """Freeze stale running records. Returns the ones fired.""" + fired: list[RunRecord] = [] + for record in self._store.all(): + if record.status != STATUS_RUNNING: + continue + checkin = _parse(record.last_checkin_at) + if checkin is None or now - checkin <= self._timeout: + continue + self._runner.freeze(record.slug) + record.status = STATUS_FROZEN + self._store.upsert(record) + fired.append(record) + return fired + + def start(self) -> None: + self._thread = threading.Thread(target=self._loop, daemon=True) + self._thread.start() + + def stop(self) -> None: + self._stop.set() + if self._thread is not None: + self._thread.join(timeout=_TICK_SECS) + + def _loop(self) -> None: + while not self._stop.wait(_TICK_SECS): + self.sweep(datetime.now().astimezone()) diff --git a/bot_bottle/orchestrator/webhook.py b/bot_bottle/orchestrator/webhook.py new file mode 100644 index 0000000..b3a46a0 --- /dev/null +++ b/bot_bottle/orchestrator/webhook.py @@ -0,0 +1,123 @@ +"""HTTP surface: the Gitea webhook receiver and the provenance API. + +`POST /webhook` — a Gitea event; parsed and dispatched to the orchestrator. +`GET /healthz` — liveness. +`GET /provenance?owner=&repo=&issue=` — the run's audit record (never + posted to the forge). 
+ +Webhook signature verification is optional: set a secret and the handler +rejects bodies whose `X-Gitea-Signature` HMAC-SHA256 does not match. +""" + +from __future__ import annotations + +import hmac +import json +from collections.abc import Callable +from hashlib import sha256 +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from typing import Any +from urllib.parse import parse_qs, urlparse + +from .events import parse_event +from .lifecycle import Orchestrator +from .provenance import build_provenance, ops_from_log, provenance_to_dict +from .store import StateStore + +# (record) -> that run's op-log entries, injected by bootstrap. +OpLogReader = Callable[[Any], list[dict[str, Any]]] + + +class WebhookServer(ThreadingHTTPServer): + def __init__( + self, + address: tuple[str, int], + *, + orchestrator: Orchestrator, + store: StateStore, + secret: bytes | None = None, + op_log_reader: OpLogReader | None = None, + ) -> None: + super().__init__(address, _Handler) + self.orchestrator = orchestrator + self.store = store + self.secret = secret + self.op_log_reader = op_log_reader + + +def verify_signature(secret: bytes, body: bytes, signature: str) -> bool: + expected = hmac.new(secret, body, sha256).hexdigest() + return hmac.compare_digest(expected, signature or "") + + +class _Handler(BaseHTTPRequestHandler): + server: WebhookServer # type: ignore[assignment] + + def log_message( # pylint: disable=redefined-builtin + self, format: str, *args: Any + ) -> None: # quiet by default + pass + + def _send(self, code: int, payload: dict[str, Any]) -> None: + body = json.dumps(payload).encode() + self.send_response(code) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_POST(self) -> None: # noqa: N802 # pylint: disable=invalid-name + if urlparse(self.path).path != "/webhook": + self._send(404, {"error": "not found"}) + return + length = 
int(self.headers.get("Content-Length", "0")) + body = self.rfile.read(length) + if self.server.secret is not None: + sig = self.headers.get("X-Gitea-Signature", "") + if not verify_signature(self.server.secret, body, sig): + self._send(401, {"error": "bad signature"}) + return + try: + payload = json.loads(body) + except ValueError: + self._send(400, {"error": "invalid json"}) + return + kind = self.headers.get("X-Gitea-Event", "") + event = parse_event(kind, payload) + if event is not None: + self.server.orchestrator.handle(event) + self._send(200, {"ok": True, "handled": event is not None}) + + def do_GET(self) -> None: # noqa: N802 # pylint: disable=invalid-name + parsed = urlparse(self.path) + if parsed.path == "/healthz": + self._send(200, {"ok": True}) + return + if parsed.path == "/provenance": + self._provenance(parse_qs(parsed.query)) + return + self._send(404, {"error": "not found"}) + + def _provenance(self, query: dict[str, list[str]]) -> None: + try: + owner = query["owner"][0] + repo = query["repo"][0] + issue = int(query["issue"][0]) + except (KeyError, IndexError, ValueError): + self._send(400, {"error": "owner, repo, issue required"}) + return + record = self.server.store.get(owner, repo, issue) + if record is None: + self._send(404, {"error": "no such run"}) + return + reader = self.server.op_log_reader + ops = ops_from_log(reader(record) if reader is not None else []) + prov = build_provenance( + record, + ops=ops, + started_at="", + finished_at=record.last_checkin_at, + exit_code=None, + watchdog_fired=False, + ) + self._send(200, provenance_to_dict(prov)) diff --git a/tests/unit/orchestrator/__init__.py b/tests/unit/orchestrator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/orchestrator/_fakes.py b/tests/unit/orchestrator/_fakes.py new file mode 100644 index 0000000..84dfd83 --- /dev/null +++ b/tests/unit/orchestrator/_fakes.py @@ -0,0 +1,69 @@ +"""Shared test doubles: a duck-typed forge and bottle runner.""" + 
+# Test doubles mirror an API shape; some params are intentionally unused. +# pylint: disable=unused-argument + +from __future__ import annotations + +from collections.abc import Sequence + +from bot_bottle.orchestrator.runner import RunResult, slugify + + +class FakeForge: + def __init__(self, members: tuple[str, ...] = ()) -> None: + self.members = set(members) + self.comments: list[tuple[int, str]] = [] + self.descriptions: list[tuple[int, str]] = [] + self.scope_denied: set[int] = set() + + def is_org_member(self, org: str, username: str) -> bool: + return username in self.members + + def read_issue(self, number: int) -> dict[str, object]: + return {"number": number, "kind": "issue"} + + def read_pr(self, number: int) -> dict[str, object]: + return {"number": number, "merged": False} + + def read_comments(self, number: int) -> list[dict[str, object]]: + return [{"id": 1, "user": "alice", "body": "hi"}] + + def post_comment(self, number: int, body: str) -> None: + if number in self.scope_denied: + raise PermissionError(f"write to #{number} denied") + self.comments.append((number, body)) + + def update_description(self, number: int, body: str) -> None: + if number in self.scope_denied: + raise PermissionError(f"write to #{number} denied") + self.descriptions.append((number, body)) + + +class FakeRunner: + def __init__(self) -> None: + self.calls: list[tuple[object, ...]] = [] + + def start( + self, + *, + agent: str, + bottles: Sequence[str], + label: str, + prompt: str, + forge_env: dict[str, str], + ) -> RunResult: + self.calls.append(("start", agent, tuple(bottles), label, prompt, dict(forge_env))) + return RunResult(slug=slugify(label), exit_code=0) + + def freeze(self, slug: str) -> int: + self.calls.append(("freeze", slug)) + return 0 + + def resume(self, slug: str, prompt: str) -> RunResult: + self.calls.append(("resume", slug, prompt)) + return RunResult(slug=slug, exit_code=0) + + def destroy(self, slug: str) -> int: + self.calls.append(("destroy", 
slug)) + return 0 diff --git a/tests/unit/orchestrator/test_config.py b/tests/unit/orchestrator/test_config.py new file mode 100644 index 0000000..ef7f5b3 --- /dev/null +++ b/tests/unit/orchestrator/test_config.py @@ -0,0 +1,38 @@ +"""Unit: Config.from_env.""" + +from __future__ import annotations + +import unittest +from pathlib import Path + +from bot_bottle.orchestrator.config import Config + + +class ConfigTest(unittest.TestCase): + def test_defaults(self): + c = Config.from_env({"HOME": "/home/x"}) + self.assertEqual("bot-bottle", c.forge_org) + self.assertEqual(1800, c.watchdog_timeout_secs) + self.assertEqual("127.0.0.1", c.webhook_host) + self.assertEqual(8477, c.webhook_port) + self.assertEqual(Path("/home/x/.bot-bottle/forge-queue"), c.queue_dir) + self.assertIsNone(c.db_path) + + def test_overrides(self): + c = Config.from_env({ + "HOME": "/home/x", + "FORGE_ORG": "agents", + "FORGE_WATCHDOG_TIMEOUT": "60", + "FORGE_GITEA_API": "https://g.example/api/v1", + "FORGE_WEBHOOK_PORT": "9000", + "FORGE_DB_PATH": "/data/bb.db", + }) + self.assertEqual("agents", c.forge_org) + self.assertEqual(60, c.watchdog_timeout_secs) + self.assertEqual("https://g.example/api/v1", c.gitea_api) + self.assertEqual(9000, c.webhook_port) + self.assertEqual(Path("/data/bb.db"), c.db_path) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/orchestrator/test_events.py b/tests/unit/orchestrator/test_events.py new file mode 100644 index 0000000..da0c721 --- /dev/null +++ b/tests/unit/orchestrator/test_events.py @@ -0,0 +1,64 @@ +"""Unit: webhook payload parsing.""" + +from __future__ import annotations + +import unittest + +from bot_bottle.orchestrator.events import parse_event +from bot_bottle.orchestrator.model import CommentCreated, IssueAssigned, PullRequestClosed + +_REPO = {"repository": {"name": "bot-bottle", "owner": {"login": "didericis"}}} + + +class ParseEventTest(unittest.TestCase): + def test_issue_assigned(self): + payload = { + **_REPO, + 
"action": "assigned", + "issue": { + "number": 17, + "title": "Fix it", + "body": "please", + "assignees": [{"login": "agent-bot"}], + "labels": [{"name": "bot-bottle:implementer"}], + }, + } + ev = parse_event("issues", payload) + self.assertIsInstance(ev, IssueAssigned) + assert isinstance(ev, IssueAssigned) + self.assertEqual(("didericis", "bot-bottle", 17), (ev.owner, ev.repo, ev.issue_number)) + self.assertEqual(("agent-bot",), ev.assignees) + self.assertEqual(("bot-bottle:implementer",), ev.labels) + + def test_issue_non_assigned_ignored(self): + self.assertIsNone(parse_event("issues", {**_REPO, "action": "opened", "issue": {}})) + + def test_comment_created(self): + payload = { + **_REPO, + "action": "created", + "issue": {"number": 42, "pull_request": {"x": 1}}, + "comment": {"id": 5, "user": {"login": "reviewer"}, "body": "redo"}, + } + ev = parse_event("issue_comment", payload) + assert isinstance(ev, CommentCreated) + self.assertEqual(42, ev.issue_number) + self.assertEqual("reviewer", ev.author) + self.assertTrue(ev.is_pull) + + def test_pull_request_closed(self): + payload = {**_REPO, "action": "closed", "pull_request": {"number": 8, "merged": True}} + ev = parse_event("pull_request", payload) + assert isinstance(ev, PullRequestClosed) + self.assertEqual(8, ev.pr_number) + self.assertTrue(ev.merged) + + def test_pull_request_non_closed_ignored(self): + self.assertIsNone(parse_event("pull_request", {**_REPO, "action": "opened"})) + + def test_unknown_kind_ignored(self): + self.assertIsNone(parse_event("push", {**_REPO})) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/orchestrator/test_forge_state.py b/tests/unit/orchestrator/test_forge_state.py new file mode 100644 index 0000000..1fde1ad --- /dev/null +++ b/tests/unit/orchestrator/test_forge_state.py @@ -0,0 +1,75 @@ +"""Unit: ForgeState + SqliteForgeStateStore.""" + +from __future__ import annotations + +import unittest + +from bot_bottle.contrib.gitea.forge_state import 
ForgeState, SqliteForgeStateStore + + +def _state(**kw: object) -> ForgeState: + defaults: dict[str, object] = dict( + owner="alice", repo="myrepo", issue_number=1, + slug="impl-alice-myrepo-1", agent_name="impl", + ) + defaults.update(kw) + return ForgeState(**defaults) # type: ignore[arg-type] + + +class ForgeStateStoreTest(unittest.TestCase): + def setUp(self): + self.store = SqliteForgeStateStore(None) + + def test_upsert_and_get(self): + s = _state() + self.store.upsert(s) + got = self.store.get("alice", "myrepo", 1) + assert got is not None + self.assertEqual("impl-alice-myrepo-1", got.slug) + self.assertEqual("impl", got.agent_name) + + def test_get_missing(self): + self.assertIsNone(self.store.get("alice", "myrepo", 99)) + + def test_upsert_replaces(self): + self.store.upsert(_state(status="running")) + self.store.upsert(_state(status="frozen")) + got = self.store.get("alice", "myrepo", 1) + assert got is not None + self.assertEqual("frozen", got.status) + + def test_delete(self): + self.store.upsert(_state()) + self.store.delete("alice", "myrepo", 1) + self.assertIsNone(self.store.get("alice", "myrepo", 1)) + + def test_delete_missing_no_error(self): + self.store.delete("alice", "myrepo", 99) + + def test_all_sorted(self): + self.store.upsert(_state(owner="z", issue_number=2)) + self.store.upsert(_state(owner="a", issue_number=1)) + rows = self.store.all() + self.assertEqual(("a", "z"), (rows[0].owner, rows[1].owner)) + + def test_bottle_names_roundtrip(self): + self.store.upsert(_state(bottle_names=["claude", "dev"])) + got = self.store.get("alice", "myrepo", 1) + assert got is not None + self.assertEqual(["claude", "dev"], got.bottle_names) + + def test_pr_number_none_roundtrip(self): + self.store.upsert(_state(pr_number=None)) + got = self.store.get("alice", "myrepo", 1) + assert got is not None + self.assertIsNone(got.pr_number) + + def test_pr_number_int_roundtrip(self): + self.store.upsert(_state(pr_number=42)) + got = self.store.get("alice", 
"myrepo", 1) + assert got is not None + self.assertEqual(42, got.pr_number) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/orchestrator/test_lifecycle.py b/tests/unit/orchestrator/test_lifecycle.py new file mode 100644 index 0000000..449ac3e --- /dev/null +++ b/tests/unit/orchestrator/test_lifecycle.py @@ -0,0 +1,140 @@ +"""Unit: the orchestration lifecycle.""" + +from __future__ import annotations + +import unittest +from typing import cast + +from bot_bottle.orchestrator.lifecycle import Orchestrator +from bot_bottle.orchestrator.model import ( + STATUS_FROZEN, + STATUS_RUNNING, + CommentCreated, + IssueAssigned, + PullRequestClosed, +) +from bot_bottle.orchestrator.store import InMemoryStateStore + +from ._fakes import FakeForge, FakeRunner + + +def _assigned( + labels: tuple[str, ...] = ("bot-bottle:impl",), + assignees: tuple[str, ...] = ("agent-bot",), +) -> IssueAssigned: + return IssueAssigned( + owner="didericis", repo="bot-bottle", issue_number=17, + title="t", body="the task", assignees=tuple(assignees), labels=tuple(labels), + ) + + +class LifecycleTest(unittest.TestCase): + def setUp(self): + self.forge = FakeForge(members=("agent-bot",)) + self.store = InMemoryStateStore() + self.runner = FakeRunner() + self.orch = Orchestrator( + forge=self.forge, store=self.store, runner=self.runner, + org="bot-bottle", gitea_api="https://g/api/v1", + now=lambda: "2026-07-01T00:00:00-04:00", + ) + + def _record(self): + return self.store.get("didericis", "bot-bottle", 17) + + def test_assigned_targeted_launches(self): + self.orch.handle(_assigned()) + rec = self._record() + assert rec is not None + self.assertEqual(STATUS_RUNNING, rec.status) + self.assertEqual("impl-didericis-bot-bottle-17", rec.slug) + self.assertEqual("start", self.runner.calls[0][0]) + # forge context injected into the child env. 
+ env = cast("dict[str, str]", self.runner.calls[0][5]) + self.assertEqual("didericis", env["FORGE_OWNER"]) + self.assertEqual("17", env["FORGE_ISSUE_NUMBER"]) + + def test_untargeted_ignored(self): + self.orch.handle(_assigned(labels=("bug",))) + self.assertIsNone(self._record()) + self.assertEqual([], self.runner.calls) + + def test_assigned_is_idempotent(self): + self.orch.handle(_assigned()) + self.orch.handle(_assigned()) # redelivery + starts = [c for c in self.runner.calls if c[0] == "start"] + self.assertEqual(1, len(starts)) + + def test_done_signal_freezes(self): + self.orch.handle(_assigned()) + self.orch.on_done_signal("didericis", "bot-bottle", 17, "success", "done") + rec = self._record() + assert rec is not None + self.assertEqual(STATUS_FROZEN, rec.status) + self.assertIn(("freeze", "impl-didericis-bot-bottle-17"), self.runner.calls) + + def test_done_signal_ignored_when_not_running(self): + # No record yet -> no freeze. + self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "") + self.assertEqual([], self.runner.calls) + + def test_comment_on_frozen_resumes(self): + self.orch.handle(_assigned()) + self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "") + self.orch.handle(CommentCreated( + owner="didericis", repo="bot-bottle", issue_number=17, + comment_id=1, author="reviewer", body="please redo", is_pull=False, + )) + rec = self._record() + assert rec is not None + self.assertEqual(STATUS_RUNNING, rec.status) + self.assertIn(("resume", "impl-didericis-bot-bottle-17", "please redo"), + self.runner.calls) + + def test_comment_echo_guard(self): + self.orch.handle(_assigned()) + self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "") + rec = self._record() + assert rec is not None + rec.agent_git_user = "agent-bot" + self.store.upsert(rec) + self.orch.handle(CommentCreated( + owner="didericis", repo="bot-bottle", issue_number=17, + comment_id=2, author="agent-bot", body="I finished", is_pull=False, + )) + # Still frozen, no 
resume triggered by the agent's own comment. + self.assertEqual(STATUS_FROZEN, self._record().status) # type: ignore[union-attr] + self.assertNotIn("resume", [c[0] for c in self.runner.calls]) + + def test_comment_on_running_ignored(self): + self.orch.handle(_assigned()) # running + self.orch.handle(CommentCreated( + owner="didericis", repo="bot-bottle", issue_number=17, + comment_id=1, author="reviewer", body="hi", is_pull=False, + )) + self.assertNotIn("resume", [c[0] for c in self.runner.calls]) + + def test_pr_comment_routes_via_link(self): + self.orch.handle(_assigned()) + self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "") + self.orch.link_pr("didericis", "bot-bottle", 17, 42) + # Comment arrives on PR #42 (issue_number == PR number in Gitea). + self.orch.handle(CommentCreated( + owner="didericis", repo="bot-bottle", issue_number=42, + comment_id=9, author="reviewer", body="fix", is_pull=True, + )) + self.assertIn(("resume", "impl-didericis-bot-bottle-17", "fix"), + self.runner.calls) + + def test_pr_closed_destroys_and_removes(self): + self.orch.handle(_assigned()) + self.orch.link_pr("didericis", "bot-bottle", 17, 42) + self.orch.handle(PullRequestClosed( + owner="didericis", repo="bot-bottle", pr_number=42, merged=True, + )) + self.assertIn(("destroy", "impl-didericis-bot-bottle-17"), self.runner.calls) + self.assertIsNone(self._record()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/orchestrator/test_provenance.py b/tests/unit/orchestrator/test_provenance.py new file mode 100644 index 0000000..7c706a8 --- /dev/null +++ b/tests/unit/orchestrator/test_provenance.py @@ -0,0 +1,53 @@ +"""Unit: provenance assembly + serialization.""" + +from __future__ import annotations + +import unittest + +from bot_bottle.orchestrator.model import RunRecord +from bot_bottle.orchestrator.provenance import build_provenance, ops_from_log, provenance_to_dict + + +def _record() -> RunRecord: + return RunRecord( + owner="didericis", 
repo="bot-bottle", issue_number=17, + slug="impl-17", agent_name="impl", bottle_names=["claude"], + last_checkin_at="2026-07-01T00:05:00-04:00", + ) + + +class ProvenanceTest(unittest.TestCase): + def test_ops_from_log(self): + ops = ops_from_log([ + {"at": "T1", "op": "read_pr", "target": 5, "detail": "ok"}, + {"at": "T2", "op": "signal_done", "target": None, "detail": "success: done"}, + ]) + self.assertEqual(2, len(ops)) + self.assertEqual("read_pr", ops[0].op) + self.assertIsNone(ops[1].target) + + def test_build_and_serialize(self): + ops = ops_from_log([{"at": "T1", "op": "post_comment", "target": 17, "detail": "ok"}]) + prov = build_provenance( + _record(), ops=ops, started_at="2026-07-01T00:00:00-04:00", + finished_at="2026-07-01T00:05:00-04:00", exit_code=0, watchdog_fired=False, + ) + d = provenance_to_dict(prov) + self.assertEqual("impl-17", d["slug"]) + self.assertEqual("didericis", d["owner"]) + self.assertEqual(["claude"], d["bottles"]) + self.assertEqual(0, d["exit_code"]) + self.assertFalse(d["watchdog_fired"]) + self.assertEqual(1, len(d["ops"])) + self.assertEqual("post_comment", d["ops"][0]["op"]) + + def test_watchdog_flag_serialized(self): + prov = build_provenance( + _record(), ops=(), started_at="", finished_at="", + exit_code=None, watchdog_fired=True, + ) + self.assertTrue(provenance_to_dict(prov)["watchdog_fired"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/orchestrator/test_runner.py b/tests/unit/orchestrator/test_runner.py new file mode 100644 index 0000000..1b51b33 --- /dev/null +++ b/tests/unit/orchestrator/test_runner.py @@ -0,0 +1,65 @@ +"""Unit: SubprocessBottleRunner + slugify (injected run fn).""" + +from __future__ import annotations + +import unittest +from collections.abc import Sequence + +from bot_bottle.orchestrator.runner import SubprocessBottleRunner, slugify + + +class SlugifyTest(unittest.TestCase): + def test_basic(self): + self.assertEqual("impl-didericis-bot-bottle-17", + 
slugify("impl-didericis-bot-bottle-17")) + + def test_collapses_and_strips(self): + self.assertEqual("a-b-c", slugify(" A_B/C!! ")) + + +class SubprocessRunnerTest(unittest.TestCase): + def setUp(self): + self.argvs: list[list[str]] = [] + self.envs: list[dict[str, str]] = [] + + def fake_run(argv: Sequence[str], env: dict[str, str]) -> int: + self.argvs.append(list(argv)) + self.envs.append(dict(env)) + return 0 + + self.runner = SubprocessBottleRunner( + cli="/x/cli.py", base_env={"PATH": "/bin"}, python="/py", run=fake_run + ) + + def test_start_argv_and_env(self): + result = self.runner.start( + agent="impl", bottles=["claude", "dev"], label="impl-r-17", + prompt="do it", forge_env={"FORGE_OWNER": "didericis"}, + ) + self.assertEqual("impl-r-17", result.slug) + argv = self.argvs[0] + self.assertEqual(["/py", "/x/cli.py", "start", "impl", "--headless", + "--label", "impl-r-17", "--prompt", "do it", + "--bottle", "claude", "--bottle", "dev"], argv) + # forge_env merged over base_env for the child. 
+ self.assertEqual("didericis", self.envs[0]["FORGE_OWNER"]) + self.assertEqual("/bin", self.envs[0]["PATH"]) + + def test_start_no_bottles_omits_flag(self): + self.runner.start(agent="impl", bottles=[], label="l", prompt="p", forge_env={}) + self.assertNotIn("--bottle", self.argvs[0]) + + def test_freeze_calls_commit(self): + self.runner.freeze("slug-1") + self.assertEqual(["/py", "/x/cli.py", "commit", "slug-1"], self.argvs[0]) + + def test_resume_headless(self): + r = self.runner.resume("slug-1", "address review") + self.assertEqual("slug-1", r.slug) + self.assertEqual( + ["/py", "/x/cli.py", "resume", "slug-1", "--headless", "--prompt", + "address review"], self.argvs[0]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/orchestrator/test_scoped_forge.py b/tests/unit/orchestrator/test_scoped_forge.py new file mode 100644 index 0000000..c42e974 --- /dev/null +++ b/tests/unit/orchestrator/test_scoped_forge.py @@ -0,0 +1,75 @@ +"""Unit: ScopedForge — read-anywhere / write-scoped access control.""" + +from __future__ import annotations + +import unittest + +from bot_bottle.contrib.forge.base import ScopedForge + +from ._fakes import FakeForge + + +class ScopedForgeTest(unittest.TestCase): + def setUp(self): + self.inner = FakeForge() + self.scoped = ScopedForge( + self.inner, assigned_issue=10, assigned_prs=[20, 30] + ) + + # --- reads always pass through ----------------------------------------- + + def test_read_issue_allowed_anywhere(self): + for number in (10, 20, 99): + result = self.scoped.read_issue(number) + self.assertEqual(number, result["number"]) + + def test_read_pr_allowed_anywhere(self): + for number in (10, 20, 99): + result = self.scoped.read_pr(number) + self.assertEqual(number, result["number"]) + + def test_read_comments_allowed_anywhere(self): + comments = self.scoped.read_comments(99) + self.assertTrue(len(comments) > 0) + + def test_is_org_member_passes_through(self): + inner = FakeForge(members=("alice",)) + scoped = 
ScopedForge(inner, assigned_issue=1, assigned_prs=[]) + self.assertTrue(scoped.is_org_member("org", "alice")) + self.assertFalse(scoped.is_org_member("org", "bob")) + + # --- writes: assigned numbers allowed ---------------------------------- + + def test_post_comment_on_assigned_issue(self): + self.scoped.post_comment(10, "hi") + self.assertIn((10, "hi"), self.inner.comments) + + def test_post_comment_on_assigned_pr(self): + self.scoped.post_comment(20, "lgtm") + self.assertIn((20, "lgtm"), self.inner.comments) + + def test_update_description_on_assigned(self): + self.scoped.update_description(30, "updated") + self.assertIn((30, "updated"), self.inner.descriptions) + + # --- writes: unassigned numbers denied --------------------------------- + + def test_post_comment_denied_for_unassigned(self): + with self.assertRaises(PermissionError): + self.scoped.post_comment(99, "nope") + self.assertEqual([], self.inner.comments) + + def test_update_description_denied_for_unassigned(self): + with self.assertRaises(PermissionError): + self.scoped.update_description(99, "nope") + self.assertEqual([], self.inner.descriptions) + + def test_error_message_names_number(self): + try: + self.scoped.post_comment(99, "nope") + except PermissionError as exc: + self.assertIn("99", str(exc)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/orchestrator/test_sidecar.py b/tests/unit/orchestrator/test_sidecar.py new file mode 100644 index 0000000..2a6c473 --- /dev/null +++ b/tests/unit/orchestrator/test_sidecar.py @@ -0,0 +1,108 @@ +"""Unit: forge sidecar dispatch, op log, queue relay, socket server.""" + +from __future__ import annotations + +import json +import socket +import tempfile +import threading +import unittest +from pathlib import Path + +from bot_bottle.orchestrator.sidecar import ( + ForgeSidecar, + OpLog, + drain_done_events, + serve, + write_done_event, +) + +from ._fakes import FakeForge + + +class SidecarDispatchTest(unittest.TestCase): + def 
setUp(self): + self.tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with + self.forge = FakeForge() + self.log = OpLog(self.tmp / "ops.jsonl", now=lambda: "T") + self.queue = self.tmp / "queue" + self.sc = ForgeSidecar( + forge=self.forge, op_log=self.log, queue_dir=self.queue, + run_key=("o", "r", 17), + ) + + def test_read_pr_ok_and_logged(self): + resp = self.sc.dispatch("read_pr", {"number": 5}) + self.assertTrue(resp["ok"]) + self.assertEqual(5, resp["result"]["number"]) + self.assertEqual([("read_pr", 5, "ok")], + [(o["op"], o["target"], o["detail"]) for o in self.log.read()]) + + def test_post_comment_writes_and_logs(self): + resp = self.sc.dispatch("post_comment", {"number": 17, "body": "done"}) + self.assertTrue(resp["ok"]) + self.assertEqual([(17, "done")], self.forge.comments) + + def test_scope_denied_write_returns_error_and_audits_rejection(self): + self.forge.scope_denied.add(999) + resp = self.sc.dispatch("post_comment", {"number": 999, "body": "x"}) + self.assertFalse(resp["ok"]) + self.assertIn("denied", resp["error"]) + # The rejection is recorded in the op log, not just the allows. 
+ self.assertIn("error", self.log.read()[-1]["detail"]) + self.assertEqual([], self.forge.comments) + + def test_signal_done_queues_event(self): + resp = self.sc.dispatch("signal_done", {"status": "success", "summary": "ok"}) + self.assertTrue(resp["ok"]) + events = drain_done_events(self.queue) + self.assertEqual(1, len(events)) + self.assertEqual(("o", "r", 17, "success"), + (events[0]["owner"], events[0]["repo"], + events[0]["issue_number"], events[0]["status"])) + + def test_unknown_method(self): + resp = self.sc.dispatch("delete_repo", {}) + self.assertFalse(resp["ok"]) + + +class QueueTest(unittest.TestCase): + def test_drain_removes_events(self): + tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with + write_done_event(tmp, {"owner": "o", "repo": "r", "issue_number": 1}) + self.assertEqual(1, len(drain_done_events(tmp))) + self.assertEqual([], drain_done_events(tmp)) # drained + + def test_drain_missing_dir(self): + self.assertEqual([], drain_done_events(Path("/nonexistent/queue"))) + + +class SocketServerTest(unittest.TestCase): + def test_round_trip_over_unix_socket(self): + tmp = tempfile.mkdtemp() + sock = Path(tmp) / "s.sock" + if len(str(sock)) > 100: # AF_UNIX path limit; skip on long tmp paths + self.skipTest("temp socket path too long for AF_UNIX") + sidecar = ForgeSidecar( + forge=FakeForge(), op_log=OpLog(Path(tmp) / "ops.jsonl"), + queue_dir=Path(tmp) / "q", run_key=("o", "r", 1), + ) + srv = serve(sidecar, sock) + t = threading.Thread(target=srv.handle_request, daemon=True) + t.start() + try: + client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + client.connect(str(sock)) + client.sendall(b'{"method": "read_issue", "params": {"number": 3}}\n') + line = client.makefile().readline() + client.close() + finally: + t.join(timeout=5) + srv.server_close() + resp = json.loads(line) + self.assertTrue(resp["ok"]) + self.assertEqual(3, resp["result"]["number"]) + + +if __name__ == "__main__": + 
unittest.main() diff --git a/tests/unit/orchestrator/test_store.py b/tests/unit/orchestrator/test_store.py new file mode 100644 index 0000000..a58d098 --- /dev/null +++ b/tests/unit/orchestrator/test_store.py @@ -0,0 +1,50 @@ +"""Unit: InMemoryStateStore.""" + +from __future__ import annotations + +import unittest + +from bot_bottle.orchestrator.model import RunRecord +from bot_bottle.orchestrator.store import InMemoryStateStore + + +def _rec(issue: int, owner: str = "o") -> RunRecord: + return RunRecord(owner=owner, repo="r", issue_number=issue, slug=f"s{issue}", + agent_name="a") + + +class InMemoryStoreTest(unittest.TestCase): + def setUp(self): + self.store = InMemoryStateStore() + + def test_upsert_get(self): + self.store.upsert(_rec(1)) + got = self.store.get("o", "r", 1) + assert got is not None + self.assertEqual("s1", got.slug) + + def test_get_missing(self): + self.assertIsNone(self.store.get("o", "r", 99)) + + def test_upsert_replaces(self): + self.store.upsert(_rec(1)) + r = _rec(1) + r.slug = "changed" + self.store.upsert(r) + self.assertEqual("changed", self.store.get("o", "r", 1).slug) # type: ignore[union-attr] + self.assertEqual(1, len(self.store.all())) + + def test_delete(self): + self.store.upsert(_rec(1)) + self.store.delete("o", "r", 1) + self.assertIsNone(self.store.get("o", "r", 1)) + + def test_all_sorted(self): + self.store.upsert(_rec(2, owner="b")) + self.store.upsert(_rec(1, owner="a")) + self.assertEqual([("a", 1), ("b", 2)], + [(r.owner, r.issue_number) for r in self.store.all()]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/orchestrator/test_targeting.py b/tests/unit/orchestrator/test_targeting.py new file mode 100644 index 0000000..c28f969 --- /dev/null +++ b/tests/unit/orchestrator/test_targeting.py @@ -0,0 +1,60 @@ +"""Unit: targeting (labels + org membership).""" + +from __future__ import annotations + +import unittest + +from bot_bottle.orchestrator.model import IssueAssigned +from 
bot_bottle.orchestrator.targeting import parse_labels, resolve_target + +from ._fakes import FakeForge + + +def _issue( + assignees: tuple[str, ...] = ("agent-bot",), + labels: tuple[str, ...] = ("bot-bottle:implementer",), +) -> IssueAssigned: + return IssueAssigned( + owner="didericis", repo="bot-bottle", issue_number=17, + title="t", body="b", assignees=tuple(assignees), labels=tuple(labels), + ) + + +class ParseLabelsTest(unittest.TestCase): + def test_agent_label(self): + self.assertEqual(("implementer", None), parse_labels(("bot-bottle:implementer",))) + + def test_bottle_override_not_confused_with_agent(self): + agent, bottle = parse_labels(("bot-bottle:impl", "bot-bottle-bottle:dev")) + self.assertEqual(("impl", "dev"), (agent, bottle)) + + def test_no_agent_label(self): + self.assertEqual((None, None), parse_labels(("bug", "p1"))) + + +class ResolveTargetTest(unittest.TestCase): + def setUp(self): + self.forge = FakeForge(members=("agent-bot",)) + + def test_targeted(self): + target = resolve_target(_issue(), self.forge, "bot-bottle") + assert target is not None + self.assertEqual("implementer", target.agent_name) + self.assertIsNone(target.bottle_override) + + def test_bottle_override(self): + ev = _issue(labels=("bot-bottle:impl", "bot-bottle-bottle:dev")) + target = resolve_target(ev, self.forge, "bot-bottle") + assert target is not None + self.assertEqual("dev", target.bottle_override) + + def test_no_label_not_targeted(self): + self.assertIsNone(resolve_target(_issue(labels=("bug",)), self.forge, "bot-bottle")) + + def test_non_member_assignee_not_targeted(self): + ev = _issue(assignees=("random-user",)) + self.assertIsNone(resolve_target(ev, self.forge, "bot-bottle")) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/orchestrator/test_watchdog.py b/tests/unit/orchestrator/test_watchdog.py new file mode 100644 index 0000000..acda3c6 --- /dev/null +++ b/tests/unit/orchestrator/test_watchdog.py @@ -0,0 +1,66 @@ +"""Unit: watchdog 
sweep.""" + +from __future__ import annotations + +import unittest +from datetime import datetime, timedelta + +from bot_bottle.orchestrator.model import STATUS_FROZEN, STATUS_RUNNING, RunRecord +from bot_bottle.orchestrator.store import InMemoryStateStore +from bot_bottle.orchestrator.watchdog import Watchdog + +from ._fakes import FakeRunner + +_NOW = datetime(2026, 7, 1, 12, 0, 0).astimezone() + + +def _record(issue: int, status: str, checkin: str) -> RunRecord: + return RunRecord( + owner="o", repo="r", issue_number=issue, slug=f"s{issue}", + agent_name="a", status=status, last_checkin_at=checkin, + ) + + +class WatchdogSweepTest(unittest.TestCase): + def setUp(self): + self.store = InMemoryStateStore() + self.runner = FakeRunner() + self.wd = Watchdog(store=self.store, runner=self.runner, timeout_secs=1800) + + def _status(self, issue: int) -> str: + rec = self.store.get("o", "r", issue) + assert rec is not None + return rec.status + + def test_stale_running_is_frozen(self): + stale = (_NOW - timedelta(minutes=31)).isoformat() + self.store.upsert(_record(1, STATUS_RUNNING, stale)) + fired = self.wd.sweep(_NOW) + self.assertEqual([1], [r.issue_number for r in fired]) + self.assertEqual(STATUS_FROZEN, self._status(1)) + self.assertIn(("freeze", "s1"), self.runner.calls) + + def test_fresh_running_untouched(self): + fresh = (_NOW - timedelta(minutes=5)).isoformat() + self.store.upsert(_record(2, STATUS_RUNNING, fresh)) + self.assertEqual([], self.wd.sweep(_NOW)) + self.assertEqual(STATUS_RUNNING, self._status(2)) + + def test_non_running_ignored(self): + stale = (_NOW - timedelta(hours=2)).isoformat() + self.store.upsert(_record(3, STATUS_FROZEN, stale)) + self.assertEqual([], self.wd.sweep(_NOW)) + + def test_unparseable_checkin_skipped(self): + self.store.upsert(_record(4, STATUS_RUNNING, "not-a-time")) + self.assertEqual([], self.wd.sweep(_NOW)) + + def test_start_and_stop(self): + # Exercises the daemon-thread start/stop path; stop sets the event + # so the 
loop's wait returns immediately. + self.wd.start() + self.wd.stop() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/orchestrator/test_webhook.py b/tests/unit/orchestrator/test_webhook.py new file mode 100644 index 0000000..dd1a844 --- /dev/null +++ b/tests/unit/orchestrator/test_webhook.py @@ -0,0 +1,155 @@ +"""Unit: webhook HTTP surface (signature + routing over a real server).""" + +from __future__ import annotations + +import hashlib +import hmac +import json +import threading +import unittest +import urllib.request +from urllib.error import HTTPError + +from bot_bottle.orchestrator.model import RunRecord +from bot_bottle.orchestrator.store import InMemoryStateStore +from bot_bottle.orchestrator.webhook import WebhookServer, verify_signature + +_ISSUE_ASSIGNED = { + "action": "assigned", + "repository": {"name": "bot-bottle", "owner": {"login": "didericis"}}, + "issue": { + "number": 17, "title": "t", "body": "b", + "assignees": [{"login": "agent-bot"}], + "labels": [{"name": "bot-bottle:impl"}], + }, +} + + +class _RecordingOrch: + def __init__(self) -> None: + self.events: list[object] = [] + + def handle(self, event: object) -> None: + self.events.append(event) + + +class SignatureTest(unittest.TestCase): + def test_verify(self): + secret = b"s3cret" + body = b'{"x":1}' + sig = hmac.new(secret, body, hashlib.sha256).hexdigest() + self.assertTrue(verify_signature(secret, body, sig)) + self.assertFalse(verify_signature(secret, body, "deadbeef")) + + +class WebhookServerTest(unittest.TestCase): + # _serve is the per-test setup; attributes are assigned there. 
+ # pylint: disable=attribute-defined-outside-init + def _serve(self, **kwargs: object) -> None: + self.orch = _RecordingOrch() + kwargs.setdefault("store", InMemoryStateStore()) + self.server = WebhookServer( + ("127.0.0.1", 0), orchestrator=self.orch, **kwargs, # type: ignore[arg-type] + ) + self.port = self.server.server_address[1] + self.thread = threading.Thread(target=self.server.serve_forever, daemon=True) + self.thread.start() + self.addCleanup(self._shutdown) + + def _shutdown(self) -> None: + self.server.shutdown() + self.server.server_close() + self.thread.join(timeout=5) + + def _post( + self, path: str, body: bytes, headers: dict[str, str] | None = None + ) -> tuple[int, dict[str, object]]: + req = urllib.request.Request( + f"http://127.0.0.1:{self.port}{path}", data=body, method="POST", + headers=headers or {}, + ) + with urllib.request.urlopen(req, timeout=5) as resp: + return resp.status, json.loads(resp.read()) + + def _get(self, path: str) -> tuple[int, dict[str, object]]: + with urllib.request.urlopen(f"http://127.0.0.1:{self.port}{path}", timeout=5) as r: + return r.status, json.loads(r.read()) + + def test_webhook_dispatches(self): + self._serve() + body = json.dumps(_ISSUE_ASSIGNED).encode() + status, payload = self._post("/webhook", body, {"X-Gitea-Event": "issues"}) + self.assertEqual(200, status) + self.assertTrue(payload["handled"]) + self.assertEqual(1, len(self.orch.events)) + + def test_unhandled_event_ok_but_not_handled(self): + self._serve() + body = json.dumps({"action": "push"}).encode() + _status, payload = self._post("/webhook", body, {"X-Gitea-Event": "push"}) + self.assertFalse(payload["handled"]) + self.assertEqual([], self.orch.events) + + def test_invalid_json_400(self): + self._serve() + with self.assertRaises(HTTPError) as ctx: + self._post("/webhook", b"{not json", {"X-Gitea-Event": "issues"}) + self.assertEqual(400, ctx.exception.code) + + def test_bad_signature_rejected(self): + self._serve(secret=b"sekret") + body = 
json.dumps(_ISSUE_ASSIGNED).encode() + with self.assertRaises(HTTPError) as ctx: + self._post("/webhook", body, + {"X-Gitea-Event": "issues", "X-Gitea-Signature": "deadbeef"}) + self.assertEqual(401, ctx.exception.code) + self.assertEqual([], self.orch.events) + + def test_good_signature_accepted(self): + self._serve(secret=b"sekret") + body = json.dumps(_ISSUE_ASSIGNED).encode() + sig = hmac.new(b"sekret", body, hashlib.sha256).hexdigest() + status, _payload = self._post( + "/webhook", body, {"X-Gitea-Event": "issues", "X-Gitea-Signature": sig}) + self.assertEqual(200, status) + self.assertEqual(1, len(self.orch.events)) + + def test_healthz(self): + self._serve() + self.assertEqual(200, self._get("/healthz")[0]) + + def test_unknown_path_404(self): + self._serve() + with self.assertRaises(HTTPError) as ctx: + self._post("/nope", b"{}", {"X-Gitea-Event": "issues"}) + self.assertEqual(404, ctx.exception.code) + + def test_provenance_returns_record_and_ops(self): + store = InMemoryStateStore() + store.upsert(RunRecord(owner="didericis", repo="bot-bottle", issue_number=17, + slug="impl-17", agent_name="impl", bottle_names=["claude"])) + + def reader(rec: object) -> list[dict[str, object]]: # pylint: disable=unused-argument + return [{"at": "T", "op": "post_comment", "target": 17, "detail": "ok"}] + + self._serve(store=store, op_log_reader=reader) + status, payload = self._get("/provenance?owner=didericis&repo=bot-bottle&issue=17") + self.assertEqual(200, status) + self.assertEqual("impl-17", payload["slug"]) + self.assertEqual(1, len(payload["ops"])) # type: ignore[arg-type] + + def test_provenance_missing_params_400(self): + self._serve() + with self.assertRaises(HTTPError) as ctx: + self._get("/provenance?owner=didericis") + self.assertEqual(400, ctx.exception.code) + + def test_provenance_unknown_run_404(self): + self._serve() + with self.assertRaises(HTTPError) as ctx: + self._get("/provenance?owner=x&repo=y&issue=1") + self.assertEqual(404, ctx.exception.code) + 
+ +if __name__ == "__main__": + unittest.main()