feat: fold bot-bottle-orchestrator into bot_bottle/orchestrator subpackage
Moves the orchestrator into bot_bottle/orchestrator/ so one install gets everything. Entry point is now `python -m bot_bottle.orchestrator run`. - Add bot_bottle/orchestrator/ with all 14 modules (verbatim move; internal imports were already relative, so no changes inside orchestrator modules) - Rewrite bootstrap.py: remove the lazy bot_bottle import guard, use direct relative imports from ..contrib.* - Add bot_bottle/contrib/forge/base.py: ScopedForge (read-anywhere / write-scoped) - Add bot_bottle/contrib/gitea/client.py: GiteaClient + GiteaForge (urllib.request only) - Add bot_bottle/contrib/gitea/forge_state.py: ForgeState + SqliteForgeStateStore - Add tests/unit/orchestrator/ (82 tests: 63 migrated + 19 new for contrib modules) Closes #321
This commit is contained in:
@@ -0,0 +1,52 @@
|
||||
"""Scoped forge wrapper: read-anywhere / write-scoped access control.
|
||||
|
||||
`ScopedForge` wraps any forge object and restricts write operations to
|
||||
the set of issue/PR numbers the agent is explicitly assigned to. Read
|
||||
operations always pass through unconditionally.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
class ScopedForge:
|
||||
"""Delegates all forge calls to an inner forge, raising `PermissionError`
|
||||
on write calls for numbers outside the assigned scope."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
forge: Any,
|
||||
*,
|
||||
assigned_issue: int,
|
||||
assigned_prs: list[int],
|
||||
) -> None:
|
||||
self._forge = forge
|
||||
self._allowed_writes: frozenset[int] = frozenset({assigned_issue, *assigned_prs})
|
||||
|
||||
def _check_write(self, number: int) -> None:
|
||||
if number not in self._allowed_writes:
|
||||
raise PermissionError(
|
||||
f"write to #{number} is outside the assigned scope "
|
||||
f"(allowed: {sorted(self._allowed_writes)})"
|
||||
)
|
||||
|
||||
def is_org_member(self, org: str, username: str) -> bool:
|
||||
return self._forge.is_org_member(org, username)
|
||||
|
||||
def read_issue(self, number: int) -> dict[str, Any]:
|
||||
return self._forge.read_issue(number)
|
||||
|
||||
def read_pr(self, number: int) -> dict[str, Any]:
|
||||
return self._forge.read_pr(number)
|
||||
|
||||
def read_comments(self, number: int) -> list[dict[str, Any]]:
|
||||
return self._forge.read_comments(number)
|
||||
|
||||
def post_comment(self, number: int, body: str) -> None:
|
||||
self._check_write(number)
|
||||
self._forge.post_comment(number, body)
|
||||
|
||||
def update_description(self, number: int, body: str) -> None:
|
||||
self._check_write(number)
|
||||
self._forge.update_description(number, body)
|
||||
@@ -0,0 +1,112 @@
|
||||
"""Gitea API client and forge adapter (PRD prd-new: fold orchestrator).
|
||||
|
||||
`GiteaClient` is a thin HTTP wrapper (stdlib `urllib.request` only — no
|
||||
new runtime dependencies). `GiteaForge` composes a client and exposes
|
||||
the forge protocol used by the orchestrator's sidecar and lifecycle.
|
||||
|
||||
Required Gitea token scopes:
|
||||
- Repository: Read & Write (issues, comments, PR descriptions)
|
||||
- Organization: Read (org membership check)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from typing import Any
|
||||
|
||||
_TIMEOUT_SECS = 30
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
"""Low-level HTTP wrapper for the Gitea REST API."""
|
||||
|
||||
def __init__(
|
||||
self, *, api_url: str, owner: str, repo: str, token: str
|
||||
) -> None:
|
||||
self._base = api_url.rstrip("/")
|
||||
self._owner = owner
|
||||
self._repo = repo
|
||||
self._headers = {
|
||||
"Authorization": f"token {token}",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
}
|
||||
|
||||
def _request(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
body: dict[str, Any] | None = None,
|
||||
) -> Any:
|
||||
url = f"{self._base}{path}"
|
||||
data = json.dumps(body).encode() if body is not None else None
|
||||
req = urllib.request.Request(
|
||||
url, data=data, headers=self._headers, method=method
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=_TIMEOUT_SECS) as resp:
|
||||
raw = resp.read()
|
||||
return json.loads(raw) if raw else None
|
||||
|
||||
def is_org_member(self, org: str, username: str) -> bool:
|
||||
url = f"{self._base}/orgs/{org}/members/{username}"
|
||||
req = urllib.request.Request(url, headers=self._headers, method="GET")
|
||||
try:
|
||||
urllib.request.urlopen(req, timeout=_TIMEOUT_SECS).close()
|
||||
return True
|
||||
except urllib.error.HTTPError:
|
||||
return False
|
||||
|
||||
def get_issue(self, number: int) -> dict[str, Any]:
|
||||
return self._request("GET", f"/repos/{self._owner}/{self._repo}/issues/{number}")
|
||||
|
||||
def get_pull(self, number: int) -> dict[str, Any]:
|
||||
return self._request("GET", f"/repos/{self._owner}/{self._repo}/pulls/{number}")
|
||||
|
||||
def list_comments(self, number: int) -> list[dict[str, Any]]:
|
||||
return self._request("GET", f"/repos/{self._owner}/{self._repo}/issues/{number}/comments")
|
||||
|
||||
def create_comment(self, number: int, body: str) -> None:
|
||||
self._request(
|
||||
"POST",
|
||||
f"/repos/{self._owner}/{self._repo}/issues/{number}/comments",
|
||||
{"body": body},
|
||||
)
|
||||
|
||||
def update_issue(self, number: int, body: str) -> None:
|
||||
self._request(
|
||||
"PATCH",
|
||||
f"/repos/{self._owner}/{self._repo}/issues/{number}",
|
||||
{"body": body},
|
||||
)
|
||||
|
||||
|
||||
class GiteaForge:
|
||||
"""Adapts `GiteaClient` to the forge protocol expected by the orchestrator.
|
||||
|
||||
The forge protocol is duck-typed: any object with `is_org_member`,
|
||||
`read_issue`, `read_pr`, `read_comments`, `post_comment`, and
|
||||
`update_description` methods satisfies it.
|
||||
"""
|
||||
|
||||
def __init__(self, client: GiteaClient) -> None:
|
||||
self._client = client
|
||||
|
||||
def is_org_member(self, org: str, username: str) -> bool:
|
||||
return self._client.is_org_member(org, username)
|
||||
|
||||
def read_issue(self, number: int) -> dict[str, Any]:
|
||||
return self._client.get_issue(number)
|
||||
|
||||
def read_pr(self, number: int) -> dict[str, Any]:
|
||||
return self._client.get_pull(number)
|
||||
|
||||
def read_comments(self, number: int) -> list[dict[str, Any]]:
|
||||
return self._client.list_comments(number)
|
||||
|
||||
def post_comment(self, number: int, body: str) -> None:
|
||||
self._client.create_comment(number, body)
|
||||
|
||||
def update_description(self, number: int, body: str) -> None:
|
||||
self._client.update_issue(number, body)
|
||||
@@ -0,0 +1,137 @@
|
||||
"""Forge state persistence for the orchestrator (PRD prd-new: fold orchestrator).
|
||||
|
||||
`ForgeState` is a dataclass that mirrors the orchestrator's `RunRecord`
|
||||
field-for-field, held here so the store implementation is in bot-bottle
|
||||
where the Gitea contrib lives.
|
||||
|
||||
`SqliteForgeStateStore` backs it with a single SQLite table. The DB path
|
||||
is optional; passing `None` uses `:memory:` (useful for tests and status
|
||||
commands that don't need persistence).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
@dataclass
|
||||
class ForgeState:
|
||||
"""Persisted state for one forge-targeted issue's bottle lifecycle."""
|
||||
|
||||
owner: str
|
||||
repo: str
|
||||
issue_number: int
|
||||
slug: str
|
||||
agent_name: str
|
||||
bottle_names: list[str] = field(default_factory=list)
|
||||
backend_name: str = ""
|
||||
agent_git_user: str = ""
|
||||
pr_number: int | None = None
|
||||
status: str = ""
|
||||
last_checkin_at: str = ""
|
||||
|
||||
|
||||
_DDL = """
|
||||
CREATE TABLE IF NOT EXISTS forge_state (
|
||||
owner TEXT NOT NULL,
|
||||
repo TEXT NOT NULL,
|
||||
issue_number INTEGER NOT NULL,
|
||||
slug TEXT NOT NULL,
|
||||
agent_name TEXT NOT NULL,
|
||||
bottle_names TEXT NOT NULL DEFAULT '[]',
|
||||
backend_name TEXT NOT NULL DEFAULT '',
|
||||
agent_git_user TEXT NOT NULL DEFAULT '',
|
||||
pr_number INTEGER,
|
||||
status TEXT NOT NULL DEFAULT '',
|
||||
last_checkin_at TEXT NOT NULL DEFAULT '',
|
||||
PRIMARY KEY (owner, repo, issue_number)
|
||||
)
|
||||
"""
|
||||
|
||||
|
||||
class SqliteForgeStateStore:
|
||||
"""SQLite-backed `ForgeState` store.
|
||||
|
||||
Thread-safety: a single connection is used; callers that share a
|
||||
store across threads must serialise access externally.
|
||||
"""
|
||||
|
||||
def __init__(self, db_path: Path | None) -> None:
|
||||
path = str(db_path) if db_path is not None else ":memory:"
|
||||
self._conn = sqlite3.connect(path, check_same_thread=False)
|
||||
self._conn.row_factory = sqlite3.Row
|
||||
self._conn.execute(_DDL)
|
||||
self._conn.commit()
|
||||
|
||||
def upsert(self, state: ForgeState) -> None:
|
||||
self._conn.execute(
|
||||
"""
|
||||
INSERT INTO forge_state
|
||||
(owner, repo, issue_number, slug, agent_name,
|
||||
bottle_names, backend_name, agent_git_user,
|
||||
pr_number, status, last_checkin_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(owner, repo, issue_number) DO UPDATE SET
|
||||
slug = excluded.slug,
|
||||
agent_name = excluded.agent_name,
|
||||
bottle_names = excluded.bottle_names,
|
||||
backend_name = excluded.backend_name,
|
||||
agent_git_user = excluded.agent_git_user,
|
||||
pr_number = excluded.pr_number,
|
||||
status = excluded.status,
|
||||
last_checkin_at = excluded.last_checkin_at
|
||||
""",
|
||||
(
|
||||
state.owner,
|
||||
state.repo,
|
||||
state.issue_number,
|
||||
state.slug,
|
||||
state.agent_name,
|
||||
json.dumps(state.bottle_names),
|
||||
state.backend_name,
|
||||
state.agent_git_user,
|
||||
state.pr_number,
|
||||
state.status,
|
||||
state.last_checkin_at,
|
||||
),
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
|
||||
row = self._conn.execute(
|
||||
"SELECT * FROM forge_state WHERE owner=? AND repo=? AND issue_number=?",
|
||||
(owner, repo, issue_number),
|
||||
).fetchone()
|
||||
return _row_to_state(row) if row is not None else None
|
||||
|
||||
def delete(self, owner: str, repo: str, issue_number: int) -> None:
|
||||
self._conn.execute(
|
||||
"DELETE FROM forge_state WHERE owner=? AND repo=? AND issue_number=?",
|
||||
(owner, repo, issue_number),
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def all(self) -> list[ForgeState]:
|
||||
rows = self._conn.execute(
|
||||
"SELECT * FROM forge_state ORDER BY owner, repo, issue_number"
|
||||
).fetchall()
|
||||
return [_row_to_state(r) for r in rows]
|
||||
|
||||
|
||||
def _row_to_state(row: sqlite3.Row) -> ForgeState:
|
||||
return ForgeState(
|
||||
owner=row["owner"],
|
||||
repo=row["repo"],
|
||||
issue_number=row["issue_number"],
|
||||
slug=row["slug"],
|
||||
agent_name=row["agent_name"],
|
||||
bottle_names=json.loads(row["bottle_names"]),
|
||||
backend_name=row["backend_name"],
|
||||
agent_git_user=row["agent_git_user"],
|
||||
pr_number=row["pr_number"],
|
||||
status=row["status"],
|
||||
last_checkin_at=row["last_checkin_at"],
|
||||
)
|
||||
@@ -0,0 +1,8 @@
|
||||
"""bot-bottle-orchestrator: forge-native orchestration for bot-bottle.
|
||||
|
||||
The package is stdlib-only. The core (events, targeting, lifecycle,
|
||||
watchdog, sidecar, webhook) depends on its collaborators — a forge, a
|
||||
state store, a bottle runner — through duck-typed interfaces, so it runs
|
||||
and tests without bot-bottle installed. `bootstrap` is the single module
|
||||
that imports `bot_bottle` and wires the concrete implementations.
|
||||
"""
|
||||
@@ -0,0 +1,51 @@
|
||||
"""CLI entry point: `python -m bot_bottle.orchestrator <command>`.
|
||||
|
||||
Commands:
|
||||
run start the webhook server + watchdog + done-signal relay
|
||||
status print the tracked runs (issue -> slug, status)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
from .config import Config
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
parser = argparse.ArgumentParser(prog="python -m bot_bottle.orchestrator")
|
||||
sub = parser.add_subparsers(dest="command", required=True)
|
||||
sub.add_parser("run", help="start the webhook server, watchdog, and relay")
|
||||
sub.add_parser("status", help="list tracked runs")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
config = Config.from_env()
|
||||
|
||||
if args.command == "run":
|
||||
from . import bootstrap # pylint: disable=import-outside-toplevel
|
||||
|
||||
print(
|
||||
f"orchestrator listening on "
|
||||
f"http://{config.webhook_host}:{config.webhook_port}/webhook",
|
||||
file=sys.stderr,
|
||||
)
|
||||
bootstrap.run(config)
|
||||
return 0
|
||||
|
||||
if args.command == "status":
|
||||
from .bootstrap import ( # pylint: disable=import-outside-toplevel
|
||||
BotBottleStateStore,
|
||||
)
|
||||
|
||||
store = BotBottleStateStore(config.db_path)
|
||||
for r in store.all():
|
||||
pr = f"PR#{r.pr_number}" if r.pr_number else "-"
|
||||
print(f"{r.owner}/{r.repo}#{r.issue_number}\t{r.slug}\t{r.status}\t{pr}")
|
||||
return 0
|
||||
|
||||
return 2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -0,0 +1,155 @@
|
||||
"""Wire the concrete bot-bottle implementations into the core.
|
||||
|
||||
This is the ONLY module that imports from `bot_bottle.contrib`. It adapts
|
||||
`SqliteForgeStateStore` to our `StateStore`, builds `GiteaForge`s (and
|
||||
scope-wrapped forges for sidecars), constructs the `Orchestrator`, and
|
||||
runs the webhook server + watchdog + done-signal relay.
|
||||
|
||||
Imports are direct (no lazy loading) because the orchestrator is now part
|
||||
of the same package installation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from ..contrib.forge.base import ScopedForge
|
||||
from ..contrib.gitea.client import GiteaClient, GiteaForge
|
||||
from ..contrib.gitea.forge_state import ForgeState, SqliteForgeStateStore
|
||||
from .config import Config
|
||||
from .lifecycle import Orchestrator
|
||||
from .model import RunRecord
|
||||
from .runner import SubprocessBottleRunner
|
||||
from .sidecar import ForgeSidecar, OpLog, drain_done_events
|
||||
from .watchdog import Watchdog
|
||||
from .webhook import WebhookServer
|
||||
|
||||
_RELAY_TICK_SECS = 2.0
|
||||
|
||||
|
||||
def _token() -> str:
|
||||
tok = os.environ.get("GITEA_TOKEN") or os.environ.get("FORGE_GITEA_TOKEN")
|
||||
if not tok:
|
||||
raise RuntimeError("set GITEA_TOKEN (or FORGE_GITEA_TOKEN)")
|
||||
return tok
|
||||
|
||||
|
||||
class BotBottleStateStore:
|
||||
"""Adapts `SqliteForgeStateStore` to our `StateStore`, translating
|
||||
`RunRecord` <-> `ForgeState` field-for-field."""
|
||||
|
||||
def __init__(self, db_path: Path | None) -> None:
|
||||
self._inner = SqliteForgeStateStore(db_path)
|
||||
|
||||
def upsert(self, record: RunRecord) -> None:
|
||||
self._inner.upsert(_to_forge_state(record))
|
||||
|
||||
def get(self, owner: str, repo: str, issue_number: int) -> RunRecord | None:
|
||||
state = self._inner.get(owner, repo, issue_number)
|
||||
return _to_record(state) if state is not None else None
|
||||
|
||||
def delete(self, owner: str, repo: str, issue_number: int) -> None:
|
||||
self._inner.delete(owner, repo, issue_number)
|
||||
|
||||
def all(self) -> list[RunRecord]:
|
||||
return [_to_record(s) for s in self._inner.all()]
|
||||
|
||||
|
||||
def _to_forge_state(r: RunRecord) -> ForgeState:
|
||||
return ForgeState(
|
||||
owner=r.owner, repo=r.repo, issue_number=r.issue_number, slug=r.slug,
|
||||
agent_name=r.agent_name, bottle_names=list(r.bottle_names),
|
||||
backend_name=r.backend_name, agent_git_user=r.agent_git_user,
|
||||
pr_number=r.pr_number, status=r.status, last_checkin_at=r.last_checkin_at,
|
||||
)
|
||||
|
||||
|
||||
def _to_record(s: ForgeState) -> RunRecord:
|
||||
return RunRecord(
|
||||
owner=s.owner, repo=s.repo, issue_number=s.issue_number, slug=s.slug,
|
||||
agent_name=s.agent_name, bottle_names=list(s.bottle_names),
|
||||
backend_name=s.backend_name, agent_git_user=s.agent_git_user,
|
||||
pr_number=s.pr_number, status=s.status, last_checkin_at=s.last_checkin_at,
|
||||
)
|
||||
|
||||
|
||||
def make_forge(config: Config, owner: str, repo: str) -> Any:
|
||||
"""A `GiteaForge` bound to one repo."""
|
||||
client = GiteaClient(
|
||||
api_url=config.gitea_api, owner=owner, repo=repo, token=_token()
|
||||
)
|
||||
return GiteaForge(client)
|
||||
|
||||
|
||||
def make_sidecar(
|
||||
config: Config, owner: str, repo: str, issue_number: int, assigned_prs: list[int]
|
||||
) -> ForgeSidecar:
|
||||
"""A scope-enforced sidecar for one run (read-anywhere / write-scoped)."""
|
||||
scoped = ScopedForge(
|
||||
make_forge(config, owner, repo),
|
||||
assigned_issue=issue_number,
|
||||
assigned_prs=assigned_prs,
|
||||
)
|
||||
op_log = OpLog(config.queue_dir / f"{owner}-{repo}-{issue_number}.oplog.jsonl")
|
||||
return ForgeSidecar(
|
||||
forge=scoped,
|
||||
op_log=op_log,
|
||||
queue_dir=config.queue_dir,
|
||||
run_key=(owner, repo, issue_number),
|
||||
)
|
||||
|
||||
|
||||
def build(config: Config) -> tuple[WebhookServer, Watchdog, Orchestrator]:
|
||||
store = BotBottleStateStore(config.db_path)
|
||||
runner = SubprocessBottleRunner(cli=config.bot_bottle_cli, base_env=dict(os.environ))
|
||||
membership_forge = make_forge(config, "_", "_")
|
||||
orchestrator = Orchestrator(
|
||||
forge=membership_forge,
|
||||
store=store,
|
||||
runner=runner,
|
||||
org=config.forge_org,
|
||||
gitea_api=config.gitea_api,
|
||||
forge_env_base={
|
||||
"GITEA_TOKEN": _token(),
|
||||
"FORGE_QUEUE_DIR": str(config.queue_dir),
|
||||
"FORGE_SIDECAR_SOCKET": str(config.sidecar_socket),
|
||||
},
|
||||
)
|
||||
watchdog = Watchdog(
|
||||
store=store, runner=runner, timeout_secs=config.watchdog_timeout_secs
|
||||
)
|
||||
server = WebhookServer(
|
||||
(config.webhook_host, config.webhook_port),
|
||||
orchestrator=orchestrator,
|
||||
store=store,
|
||||
)
|
||||
return server, watchdog, orchestrator
|
||||
|
||||
|
||||
def _relay_loop(config: Config, orchestrator: Orchestrator, stop: threading.Event) -> None:
|
||||
while not stop.wait(_RELAY_TICK_SECS):
|
||||
for ev in drain_done_events(config.queue_dir):
|
||||
orchestrator.on_done_signal(
|
||||
ev["owner"], ev["repo"], int(ev["issue_number"]),
|
||||
str(ev.get("status", "")), str(ev.get("summary", "")),
|
||||
)
|
||||
|
||||
|
||||
def run(config: Config) -> None:
|
||||
"""Blocking run: webhook server + watchdog + done-signal relay."""
|
||||
server, watchdog, orchestrator = build(config)
|
||||
watchdog.start()
|
||||
stop = threading.Event()
|
||||
relay = threading.Thread(
|
||||
target=_relay_loop, args=(config, orchestrator, stop), daemon=True
|
||||
)
|
||||
relay.start()
|
||||
try:
|
||||
server.serve_forever()
|
||||
finally:
|
||||
stop.set()
|
||||
watchdog.stop()
|
||||
server.server_close()
|
||||
@@ -0,0 +1,52 @@
|
||||
"""Configuration, loaded from the environment (stdlib `os` only).
|
||||
|
||||
Everything the orchestrator needs to run is an env var so a deploy is a
|
||||
process with an environment, no config file to manage. `FORGE_*` names
|
||||
match the bot-bottle forge-native PRD.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
# The label that marks an issue as agent-targeted: `bot-bottle:<agent>`.
|
||||
LABEL_PREFIX = "bot-bottle:"
|
||||
# Optional bottle override: `bot-bottle-bottle:<name>`.
|
||||
BOTTLE_LABEL_PREFIX = "bot-bottle-bottle:"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Config:
|
||||
"""Resolved orchestrator configuration."""
|
||||
|
||||
forge_org: str
|
||||
gitea_api: str
|
||||
watchdog_timeout_secs: int
|
||||
webhook_host: str
|
||||
webhook_port: int
|
||||
bot_bottle_cli: str
|
||||
queue_dir: Path
|
||||
sidecar_socket: Path
|
||||
db_path: Path | None
|
||||
|
||||
@staticmethod
|
||||
def from_env(env: dict[str, str] | None = None) -> "Config":
|
||||
e = os.environ if env is None else env
|
||||
home = Path(e.get("HOME", str(Path.home())))
|
||||
default_root = home / ".bot-bottle"
|
||||
db = e.get("FORGE_DB_PATH")
|
||||
return Config(
|
||||
forge_org=e.get("FORGE_ORG", "bot-bottle"),
|
||||
gitea_api=e.get("FORGE_GITEA_API", ""),
|
||||
watchdog_timeout_secs=int(e.get("FORGE_WATCHDOG_TIMEOUT", "1800")),
|
||||
webhook_host=e.get("FORGE_WEBHOOK_HOST", "127.0.0.1"),
|
||||
webhook_port=int(e.get("FORGE_WEBHOOK_PORT", "8477")),
|
||||
bot_bottle_cli=e.get("BOT_BOTTLE_CLI", "cli.py"),
|
||||
queue_dir=Path(e.get("FORGE_QUEUE_DIR", str(default_root / "forge-queue"))),
|
||||
sidecar_socket=Path(
|
||||
e.get("FORGE_SIDECAR_SOCKET", str(default_root / "forge-sidecar.sock"))
|
||||
),
|
||||
db_path=Path(db) if db else None,
|
||||
)
|
||||
@@ -0,0 +1,85 @@
|
||||
"""Parse Gitea webhook payloads into typed `ForgeEvent`s.
|
||||
|
||||
Only the fields the orchestrator acts on are extracted; unknown payloads
|
||||
and event types return None so the webhook layer can ignore them.
|
||||
|
||||
Gitea sends the event kind in the `X-Gitea-Event` header and the payload
|
||||
as JSON. The relevant kinds:
|
||||
|
||||
- `issues` with `action == "assigned"` -> IssueAssigned
|
||||
- `issue_comment` with `action == "created"` -> CommentCreated
|
||||
- `pull_request` with `action == "closed"` -> PullRequestClosed
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from .model import CommentCreated, ForgeEvent, IssueAssigned, PullRequestClosed
|
||||
|
||||
|
||||
def _repo_owner(payload: dict[str, Any]) -> tuple[str, str]:
|
||||
repo = payload.get("repository") or {}
|
||||
owner = (repo.get("owner") or {}).get("login", "")
|
||||
return str(owner), str(repo.get("name", ""))
|
||||
|
||||
|
||||
def parse_event(event_kind: str, payload: dict[str, Any]) -> ForgeEvent | None:
|
||||
"""Map (X-Gitea-Event, payload) to a `ForgeEvent`, or None to ignore."""
|
||||
if event_kind == "issues":
|
||||
return _parse_issue(payload)
|
||||
if event_kind == "issue_comment":
|
||||
return _parse_comment(payload)
|
||||
if event_kind == "pull_request":
|
||||
return _parse_pull_request(payload)
|
||||
return None
|
||||
|
||||
|
||||
def _parse_issue(payload: dict[str, Any]) -> IssueAssigned | None:
|
||||
if payload.get("action") != "assigned":
|
||||
return None
|
||||
owner, repo = _repo_owner(payload)
|
||||
issue = payload.get("issue") or {}
|
||||
assignees = tuple(
|
||||
str(a.get("login", "")) for a in (issue.get("assignees") or [])
|
||||
)
|
||||
labels = tuple(str(l.get("name", "")) for l in (issue.get("labels") or []))
|
||||
return IssueAssigned(
|
||||
owner=owner,
|
||||
repo=repo,
|
||||
issue_number=int(issue.get("number", 0)),
|
||||
title=str(issue.get("title", "")),
|
||||
body=str(issue.get("body", "") or ""),
|
||||
assignees=assignees,
|
||||
labels=labels,
|
||||
)
|
||||
|
||||
|
||||
def _parse_comment(payload: dict[str, Any]) -> CommentCreated | None:
|
||||
if payload.get("action") != "created":
|
||||
return None
|
||||
owner, repo = _repo_owner(payload)
|
||||
issue = payload.get("issue") or {}
|
||||
comment = payload.get("comment") or {}
|
||||
return CommentCreated(
|
||||
owner=owner,
|
||||
repo=repo,
|
||||
issue_number=int(issue.get("number", 0)),
|
||||
comment_id=int(comment.get("id", 0)),
|
||||
author=str((comment.get("user") or {}).get("login", "")),
|
||||
body=str(comment.get("body", "") or ""),
|
||||
is_pull=bool(issue.get("pull_request")),
|
||||
)
|
||||
|
||||
|
||||
def _parse_pull_request(payload: dict[str, Any]) -> PullRequestClosed | None:
|
||||
if payload.get("action") != "closed":
|
||||
return None
|
||||
owner, repo = _repo_owner(payload)
|
||||
pr = payload.get("pull_request") or {}
|
||||
return PullRequestClosed(
|
||||
owner=owner,
|
||||
repo=repo,
|
||||
pr_number=int(pr.get("number", 0)),
|
||||
merged=bool(pr.get("merged", False)),
|
||||
)
|
||||
@@ -0,0 +1,180 @@
|
||||
"""The orchestration lifecycle: forge events -> bottle transitions.
|
||||
|
||||
`Orchestrator.handle(event)` is the single entry point the webhook layer
|
||||
calls. `on_done_signal(...)` is called by the sidecar relay when an agent
|
||||
signals completion. All collaborators (forge, store, runner) are
|
||||
injected and duck-typed; `now` and `label_for` are injectable for tests.
|
||||
|
||||
Transitions:
|
||||
IssueAssigned (targeted, new) -> start bottle, record = running
|
||||
signal_done (running) -> freeze bottle, record = frozen
|
||||
CommentCreated (frozen) -> resume bottle, record = running
|
||||
PullRequestClosed (tracked) -> destroy bottle, record removed
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from datetime import datetime
|
||||
|
||||
from .model import (
|
||||
STATUS_DESTROYED,
|
||||
STATUS_FROZEN,
|
||||
STATUS_RUNNING,
|
||||
CommentCreated,
|
||||
ForgeEvent,
|
||||
IssueAssigned,
|
||||
PullRequestClosed,
|
||||
RunRecord,
|
||||
)
|
||||
from .runner import BottleRunner
|
||||
from .store import StateStore
|
||||
from .targeting import Membership, Target, resolve_target
|
||||
|
||||
|
||||
def _iso_now() -> str:
|
||||
return datetime.now().astimezone().isoformat(timespec="seconds")
|
||||
|
||||
|
||||
def _default_label(agent: str, event: IssueAssigned) -> str:
|
||||
# Embed the issue identity so slugs are unique per issue and never
|
||||
# get renamed on collision.
|
||||
return f"{agent}-{event.owner}-{event.repo}-{event.issue_number}"
|
||||
|
||||
|
||||
class Orchestrator:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
forge: Membership,
|
||||
store: StateStore,
|
||||
runner: BottleRunner,
|
||||
org: str,
|
||||
gitea_api: str = "",
|
||||
forge_env_base: dict[str, str] | None = None,
|
||||
now: Callable[[], str] = _iso_now,
|
||||
label_for: Callable[[str, IssueAssigned], str] = _default_label,
|
||||
) -> None:
|
||||
self._forge = forge
|
||||
self._store = store
|
||||
self._runner = runner
|
||||
self._org = org
|
||||
self._gitea_api = gitea_api
|
||||
self._forge_env_base = forge_env_base or {}
|
||||
self._now = now
|
||||
self._label_for = label_for
|
||||
|
||||
# --- entry points ------------------------------------------------------
|
||||
|
||||
def handle(self, event: ForgeEvent) -> None:
|
||||
if isinstance(event, IssueAssigned):
|
||||
self._on_issue_assigned(event)
|
||||
elif isinstance(event, CommentCreated):
|
||||
self._on_comment(event)
|
||||
else:
|
||||
self._on_pr_closed(event)
|
||||
|
||||
def on_done_signal( # pylint: disable=unused-argument
|
||||
self, owner: str, repo: str, issue_number: int, status: str, summary: str
|
||||
) -> None:
|
||||
"""Sidecar relay: an agent signalled completion. Freeze the bottle.
|
||||
`status`/`summary` are recorded by provenance (via the op log), not
|
||||
acted on here."""
|
||||
record = self._store.get(owner, repo, issue_number)
|
||||
if record is None or record.status != STATUS_RUNNING:
|
||||
return
|
||||
self._runner.freeze(record.slug)
|
||||
record.status = STATUS_FROZEN
|
||||
record.last_checkin_at = self._now()
|
||||
self._store.upsert(record)
|
||||
|
||||
def link_pr(self, owner: str, repo: str, issue_number: int, pr_number: int) -> None:
|
||||
"""Record the PR a tracked issue produced, so PR comments and the
|
||||
PR-close event route back to this record."""
|
||||
record = self._store.get(owner, repo, issue_number)
|
||||
if record is not None:
|
||||
record.pr_number = pr_number
|
||||
self._store.upsert(record)
|
||||
|
||||
# --- handlers ----------------------------------------------------------
|
||||
|
||||
def _on_issue_assigned(self, event: IssueAssigned) -> None:
|
||||
target = resolve_target(event, self._forge, self._org)
|
||||
if target is None:
|
||||
return
|
||||
# Idempotent: a webhook redelivery must not launch a second bottle.
|
||||
if self._store.get(event.owner, event.repo, event.issue_number) is not None:
|
||||
return
|
||||
self._launch(event, target)
|
||||
|
||||
def _launch(self, event: IssueAssigned, target: Target) -> None:
|
||||
label = self._label_for(target.agent_name, event)
|
||||
bottles = [target.bottle_override] if target.bottle_override else []
|
||||
result = self._runner.start(
|
||||
agent=target.agent_name,
|
||||
bottles=bottles,
|
||||
label=label,
|
||||
prompt=event.body,
|
||||
forge_env=self._forge_env(event.owner, event.repo, event.issue_number),
|
||||
)
|
||||
self._store.upsert(
|
||||
RunRecord(
|
||||
owner=event.owner,
|
||||
repo=event.repo,
|
||||
issue_number=event.issue_number,
|
||||
slug=result.slug,
|
||||
agent_name=target.agent_name,
|
||||
bottle_names=bottles,
|
||||
status=STATUS_RUNNING,
|
||||
last_checkin_at=self._now(),
|
||||
)
|
||||
)
|
||||
|
||||
def _on_comment(self, event: CommentCreated) -> None:
|
||||
record = self._route_comment(event)
|
||||
if record is None or record.status != STATUS_FROZEN:
|
||||
return
|
||||
# Echo-loop guard: ignore the agent's own comments.
|
||||
if record.agent_git_user and event.author == record.agent_git_user:
|
||||
return
|
||||
self._runner.resume(record.slug, event.body)
|
||||
record.status = STATUS_RUNNING
|
||||
record.last_checkin_at = self._now()
|
||||
self._store.upsert(record)
|
||||
|
||||
def _route_comment(self, event: CommentCreated) -> RunRecord | None:
|
||||
# A comment on the issue routes by issue number; a comment on a PR
|
||||
# routes by the recorded pr_number.
|
||||
direct = self._store.get(event.owner, event.repo, event.issue_number)
|
||||
if direct is not None:
|
||||
return direct
|
||||
if event.is_pull:
|
||||
return self._find_by_pr(event.owner, event.repo, event.issue_number)
|
||||
return None
|
||||
|
||||
def _on_pr_closed(self, event: PullRequestClosed) -> None:
|
||||
record = self._find_by_pr(event.owner, event.repo, event.pr_number)
|
||||
if record is None:
|
||||
return
|
||||
self._runner.destroy(record.slug)
|
||||
record.status = STATUS_DESTROYED
|
||||
self._store.delete(record.owner, record.repo, record.issue_number)
|
||||
|
||||
def _find_by_pr(self, owner: str, repo: str, pr_number: int) -> RunRecord | None:
|
||||
for record in self._store.all():
|
||||
if (
|
||||
record.owner == owner
|
||||
and record.repo == repo
|
||||
and record.pr_number == pr_number
|
||||
):
|
||||
return record
|
||||
return None
|
||||
|
||||
def _forge_env(self, owner: str, repo: str, issue_number: int) -> dict[str, str]:
|
||||
env = dict(self._forge_env_base)
|
||||
if self._gitea_api:
|
||||
env["FORGE_GITEA_API"] = self._gitea_api
|
||||
env["FORGE_OWNER"] = owner
|
||||
env["FORGE_REPO"] = repo
|
||||
env["FORGE_ISSUE_NUMBER"] = str(issue_number)
|
||||
return env
|
||||
@@ -0,0 +1,108 @@
|
||||
"""Domain model: run records, forge events, provenance.
|
||||
|
||||
These are the orchestrator's own dataclasses. `RunRecord` mirrors
|
||||
bot-bottle's `ForgeState` field-for-field so the bootstrap adapter can
|
||||
translate between them with no loss; keeping our own copy is what lets
|
||||
the core stay import-free of bot-bottle.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
# Run lifecycle. A bottle is launched (running), frozen on the done
|
||||
# signal, and destroyed when the PR closes.
|
||||
STATUS_RUNNING = "running"
|
||||
STATUS_FROZEN = "frozen"
|
||||
STATUS_DESTROYED = "destroyed"
|
||||
|
||||
|
||||
@dataclass
|
||||
class RunRecord:
|
||||
"""One forge-targeted issue's bottle lifecycle record."""
|
||||
|
||||
owner: str
|
||||
repo: str
|
||||
issue_number: int
|
||||
slug: str
|
||||
agent_name: str
|
||||
bottle_names: list[str] = field(default_factory=list)
|
||||
backend_name: str = ""
|
||||
agent_git_user: str = ""
|
||||
pr_number: int | None = None
|
||||
status: str = STATUS_RUNNING
|
||||
last_checkin_at: str = ""
|
||||
|
||||
|
||||
# --- Forge events (parsed webhook payloads) --------------------------------
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class IssueAssigned:
|
||||
"""An issue gained an assignee — the trigger to consider a launch."""
|
||||
|
||||
owner: str
|
||||
repo: str
|
||||
issue_number: int
|
||||
title: str
|
||||
body: str
|
||||
assignees: tuple[str, ...]
|
||||
labels: tuple[str, ...]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class CommentCreated:
|
||||
"""A comment was posted on an issue or PR — a rehydrate trigger."""
|
||||
|
||||
owner: str
|
||||
repo: str
|
||||
issue_number: int
|
||||
comment_id: int
|
||||
author: str
|
||||
body: str
|
||||
is_pull: bool
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PullRequestClosed:
|
||||
"""A PR closed (merged or not) — the teardown trigger."""
|
||||
|
||||
owner: str
|
||||
repo: str
|
||||
pr_number: int
|
||||
merged: bool
|
||||
|
||||
|
||||
# Union of everything the webhook layer can emit.
|
||||
ForgeEvent = IssueAssigned | CommentCreated | PullRequestClosed
|
||||
|
||||
|
||||
# --- Provenance ------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ForgeOp:
|
||||
"""One semantic forge operation the sidecar recorded."""
|
||||
|
||||
at: str # ISO timestamp
|
||||
op: str # e.g. "post_comment", "read_pr", "signal_done"
|
||||
target: int | None
|
||||
detail: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Provenance:
|
||||
"""The audit record for one run, served by the provenance API. Never
|
||||
posted into the forge."""
|
||||
|
||||
slug: str
|
||||
owner: str
|
||||
repo: str
|
||||
issue_number: int
|
||||
agent_name: str
|
||||
bottle_names: tuple[str, ...]
|
||||
started_at: str
|
||||
finished_at: str
|
||||
exit_code: int | None
|
||||
watchdog_fired: bool
|
||||
ops: tuple[ForgeOp, ...]
|
||||
@@ -0,0 +1,71 @@
|
||||
"""Provenance assembly + serialization.
|
||||
|
||||
Provenance is the run's audit record: the `RunRecord` metadata plus the
|
||||
sidecar's semantic operation log. It is exposed through the provenance
|
||||
API (see `webhook.ProvenanceHandler`) and deliberately never posted back
|
||||
into the forge — a mutable PR comment is not an audit record.
|
||||
|
||||
This module only assembles and serializes; retention/signing of the
|
||||
record is a control-plane concern out of scope here.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from .model import ForgeOp, Provenance, RunRecord
|
||||
|
||||
|
||||
def ops_from_log(entries: list[dict[str, Any]]) -> tuple[ForgeOp, ...]:
|
||||
return tuple(
|
||||
ForgeOp(
|
||||
at=str(e.get("at", "")),
|
||||
op=str(e.get("op", "")),
|
||||
target=e.get("target"),
|
||||
detail=str(e.get("detail", "")),
|
||||
)
|
||||
for e in entries
|
||||
)
|
||||
|
||||
|
||||
def build_provenance(
|
||||
record: RunRecord,
|
||||
*,
|
||||
ops: tuple[ForgeOp, ...],
|
||||
started_at: str,
|
||||
finished_at: str,
|
||||
exit_code: int | None,
|
||||
watchdog_fired: bool,
|
||||
) -> Provenance:
|
||||
return Provenance(
|
||||
slug=record.slug,
|
||||
owner=record.owner,
|
||||
repo=record.repo,
|
||||
issue_number=record.issue_number,
|
||||
agent_name=record.agent_name,
|
||||
bottle_names=tuple(record.bottle_names),
|
||||
started_at=started_at,
|
||||
finished_at=finished_at,
|
||||
exit_code=exit_code,
|
||||
watchdog_fired=watchdog_fired,
|
||||
ops=ops,
|
||||
)
|
||||
|
||||
|
||||
def provenance_to_dict(p: Provenance) -> dict[str, Any]:
|
||||
return {
|
||||
"slug": p.slug,
|
||||
"owner": p.owner,
|
||||
"repo": p.repo,
|
||||
"issue_number": p.issue_number,
|
||||
"agent": p.agent_name,
|
||||
"bottles": list(p.bottle_names),
|
||||
"started_at": p.started_at,
|
||||
"finished_at": p.finished_at,
|
||||
"exit_code": p.exit_code,
|
||||
"watchdog_fired": p.watchdog_fired,
|
||||
"ops": [
|
||||
{"at": o.at, "op": o.op, "target": o.target, "detail": o.detail}
|
||||
for o in p.ops
|
||||
],
|
||||
}
|
||||
@@ -0,0 +1,118 @@
|
||||
"""Bottle runner: drive the bot-bottle CLI to manage a bottle's life.
|
||||
|
||||
`BottleRunner` is the interface the lifecycle depends on;
|
||||
`SubprocessBottleRunner` shells out to the bot-bottle `cli.py`
|
||||
(`start --headless`, `commit`, `resume --headless`). The subprocess
|
||||
callable is injectable so tests never spawn a process.
|
||||
|
||||
The slug is derived from the label via `slugify`, matching bot-bottle's
|
||||
container-slug rule; the orchestrator picks labels that embed the issue
|
||||
identity so slugs are unique and collisions never rename them.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from collections.abc import Callable, Sequence
|
||||
from dataclasses import dataclass
|
||||
from typing import Protocol
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RunResult:
|
||||
slug: str
|
||||
exit_code: int
|
||||
|
||||
|
||||
class BottleRunner(Protocol):
|
||||
def start(
|
||||
self,
|
||||
*,
|
||||
agent: str,
|
||||
bottles: Sequence[str],
|
||||
label: str,
|
||||
prompt: str,
|
||||
forge_env: dict[str, str],
|
||||
) -> RunResult: ...
|
||||
|
||||
def freeze(self, slug: str) -> int: ...
|
||||
|
||||
def resume(self, slug: str, prompt: str) -> RunResult: ...
|
||||
|
||||
def destroy(self, slug: str) -> int: ...
|
||||
|
||||
|
||||
_SLUG_RE = re.compile(r"[^a-z0-9]+")
|
||||
|
||||
|
||||
def slugify(label: str) -> str:
|
||||
"""Lowercase, collapse non-alphanumerics to single hyphens, strip
|
||||
leading/trailing hyphens — matches bot-bottle's slug rule."""
|
||||
return _SLUG_RE.sub("-", label.lower()).strip("-")
|
||||
|
||||
|
||||
# A subprocess.run-shaped callable, injectable for tests.
|
||||
RunFn = Callable[[Sequence[str], dict[str, str]], int]
|
||||
|
||||
|
||||
def _default_run(argv: Sequence[str], env: dict[str, str]) -> int:
|
||||
return subprocess.run(list(argv), env=env, check=False).returncode
|
||||
|
||||
|
||||
class SubprocessBottleRunner:
|
||||
"""Shells the bot-bottle CLI. `cli` is the path to `cli.py`; `python`
|
||||
is the interpreter to run it with; `base_env` is the environment the
|
||||
child inherits (the orchestrator's, minus per-run additions)."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
cli: str,
|
||||
base_env: dict[str, str],
|
||||
python: str = sys.executable,
|
||||
run: RunFn = _default_run,
|
||||
) -> None:
|
||||
self._cli = cli
|
||||
self._python = python
|
||||
self._base_env = base_env
|
||||
self._run = run
|
||||
|
||||
def _argv(self, *args: str) -> list[str]:
|
||||
return [self._python, self._cli, *args]
|
||||
|
||||
def start(
|
||||
self,
|
||||
*,
|
||||
agent: str,
|
||||
bottles: Sequence[str],
|
||||
label: str,
|
||||
prompt: str,
|
||||
forge_env: dict[str, str],
|
||||
) -> RunResult:
|
||||
argv = self._argv(
|
||||
"start", agent, "--headless", "--label", label, "--prompt", prompt
|
||||
)
|
||||
for bottle in bottles:
|
||||
argv += ["--bottle", bottle]
|
||||
code = self._run(argv, {**self._base_env, **forge_env})
|
||||
return RunResult(slug=slugify(label), exit_code=code)
|
||||
|
||||
def freeze(self, slug: str) -> int:
|
||||
# bot-bottle's `commit` snapshots a running bottle's state.
|
||||
return self._run(self._argv("commit", slug), self._base_env)
|
||||
|
||||
def resume(self, slug: str, prompt: str) -> RunResult:
|
||||
code = self._run(
|
||||
self._argv("resume", slug, "--headless", "--prompt", prompt),
|
||||
self._base_env,
|
||||
)
|
||||
return RunResult(slug=slug, exit_code=code)
|
||||
|
||||
def destroy(self, slug: str) -> int:
|
||||
# NOTE: bot-bottle `cleanup` currently targets all bottles; a
|
||||
# per-slug teardown command is a known integration follow-up
|
||||
# (tracked in docs/JOURNAL.md). Kept behind this method so the
|
||||
# call site does not change when that lands.
|
||||
return self._run(self._argv("cleanup", slug), self._base_env)
|
||||
@@ -0,0 +1,171 @@
|
||||
"""Forge sidecar: the agent's only door to the forge.
|
||||
|
||||
The agent calls the sidecar over a line-delimited JSON-RPC AF_UNIX
|
||||
socket; the sidecar dispatches to an injected `forge` (already
|
||||
scope-wrapped by bootstrap) and holds the token, so the agent never sees
|
||||
a credential or a forge endpoint. Every call is appended to a semantic
|
||||
operation log (the provenance raw material). `signal_done` additionally
|
||||
drops an event file in the queue dir the orchestrator drains.
|
||||
|
||||
`dispatch` is pure and testable; `serve` wraps it in a socket server.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import dataclasses
|
||||
import json
|
||||
import socketserver
|
||||
import uuid
|
||||
from collections.abc import Callable
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
_READ_METHODS = {"read_issue", "read_pr", "read_comments"}
|
||||
_WRITE_METHODS = {"post_comment", "update_description"}
|
||||
|
||||
|
||||
def _iso_now() -> str:
|
||||
return datetime.now().astimezone().isoformat(timespec="seconds")
|
||||
|
||||
|
||||
def _jsonable(value: Any) -> Any:
|
||||
if dataclasses.is_dataclass(value) and not isinstance(value, type):
|
||||
return dataclasses.asdict(value)
|
||||
if isinstance(value, list):
|
||||
return [_jsonable(v) for v in value]
|
||||
return value
|
||||
|
||||
|
||||
class OpLog:
|
||||
"""Append-only JSONL log of semantic forge operations."""
|
||||
|
||||
def __init__(self, path: Path, *, now: Callable[[], str] = _iso_now) -> None:
|
||||
self._path = path
|
||||
self._now = now
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def record(self, op: str, target: int | None, detail: str) -> None:
|
||||
entry = {"at": self._now(), "op": op, "target": target, "detail": detail}
|
||||
with self._path.open("a", encoding="utf-8") as fh:
|
||||
fh.write(json.dumps(entry) + "\n")
|
||||
|
||||
def read(self) -> list[dict[str, Any]]:
|
||||
if not self._path.exists():
|
||||
return []
|
||||
return [
|
||||
json.loads(line)
|
||||
for line in self._path.read_text(encoding="utf-8").splitlines()
|
||||
if line.strip()
|
||||
]
|
||||
|
||||
|
||||
def write_done_event(queue_dir: Path, event: dict[str, Any]) -> Path:
|
||||
"""Atomically drop a done-signal event file in the queue dir."""
|
||||
queue_dir.mkdir(parents=True, exist_ok=True)
|
||||
path = queue_dir / f"done-{uuid.uuid4().hex}.json"
|
||||
tmp = path.with_suffix(".json.tmp")
|
||||
tmp.write_text(json.dumps(event), encoding="utf-8")
|
||||
tmp.replace(path)
|
||||
return path
|
||||
|
||||
|
||||
def drain_done_events(queue_dir: Path) -> list[dict[str, Any]]:
|
||||
"""Read and remove every queued done-signal event."""
|
||||
if not queue_dir.is_dir():
|
||||
return []
|
||||
events: list[dict[str, Any]] = []
|
||||
for path in sorted(queue_dir.glob("done-*.json")):
|
||||
try:
|
||||
events.append(json.loads(path.read_text(encoding="utf-8")))
|
||||
except (OSError, ValueError):
|
||||
continue
|
||||
finally:
|
||||
path.unlink(missing_ok=True)
|
||||
return events
|
||||
|
||||
|
||||
class ForgeSidecar:
|
||||
"""Dispatches sidecar protocol calls to the forge, logging each and
|
||||
relaying `signal_done` to the queue dir. `run_key` is the
|
||||
(owner, repo, issue_number) the run is bound to."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
forge: object,
|
||||
op_log: OpLog,
|
||||
queue_dir: Path,
|
||||
run_key: tuple[str, str, int],
|
||||
) -> None:
|
||||
self._forge = forge
|
||||
self._log = op_log
|
||||
self._queue_dir = queue_dir
|
||||
self._owner, self._repo, self._issue = run_key
|
||||
|
||||
def dispatch(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
|
||||
try:
|
||||
result = self._invoke(method, params)
|
||||
except Exception as exc: # noqa: BLE001 — surface as JSON-RPC error
|
||||
self._log.record(method, params.get("number"), f"error: {exc}")
|
||||
return {"ok": False, "error": str(exc)}
|
||||
return {"ok": True, "result": result}
|
||||
|
||||
def _invoke(self, method: str, params: dict[str, Any]) -> Any:
|
||||
if method in _READ_METHODS:
|
||||
number = int(params["number"])
|
||||
result = getattr(self._forge, method)(number)
|
||||
self._log.record(method, number, "ok")
|
||||
return _jsonable(result)
|
||||
if method in _WRITE_METHODS:
|
||||
number = int(params["number"])
|
||||
getattr(self._forge, method)(number, params["body"])
|
||||
self._log.record(method, number, "ok")
|
||||
return None
|
||||
if method == "signal_done":
|
||||
status = str(params.get("status", ""))
|
||||
summary = str(params.get("summary", ""))
|
||||
self._log.record("signal_done", None, f"{status}: {summary}")
|
||||
write_done_event(
|
||||
self._queue_dir,
|
||||
{
|
||||
"owner": self._owner,
|
||||
"repo": self._repo,
|
||||
"issue_number": self._issue,
|
||||
"status": status,
|
||||
"summary": summary,
|
||||
},
|
||||
)
|
||||
return None
|
||||
raise ValueError(f"unknown method: {method}")
|
||||
|
||||
|
||||
class _Handler(socketserver.StreamRequestHandler):
|
||||
def handle(self) -> None:
|
||||
line = self.rfile.readline()
|
||||
if not line:
|
||||
return
|
||||
try:
|
||||
req = json.loads(line)
|
||||
except ValueError:
|
||||
self.wfile.write(b'{"ok": false, "error": "invalid json"}\n')
|
||||
return
|
||||
resp = self.server.sidecar.dispatch( # type: ignore[attr-defined]
|
||||
str(req.get("method", "")), dict(req.get("params", {}))
|
||||
)
|
||||
self.wfile.write((json.dumps(resp) + "\n").encode())
|
||||
|
||||
|
||||
class _Server(socketserver.ThreadingUnixStreamServer):
|
||||
def __init__(self, socket_path: str, sidecar: ForgeSidecar) -> None:
|
||||
super().__init__(socket_path, _Handler)
|
||||
self.sidecar = sidecar
|
||||
|
||||
|
||||
def serve(sidecar: ForgeSidecar, socket_path: Path) -> _Server:
|
||||
"""Bind a threaded AF_UNIX server for `sidecar`. Caller runs
|
||||
`serve_forever()` (or `handle_request()` in tests) and closes it."""
|
||||
if socket_path.exists():
|
||||
socket_path.unlink()
|
||||
socket_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
return _Server(str(socket_path), sidecar)
|
||||
@@ -0,0 +1,48 @@
|
||||
"""State store interface + an in-memory implementation.
|
||||
|
||||
The orchestrator persists one `RunRecord` per forge-targeted issue. At
|
||||
runtime `bootstrap` supplies an adapter over bot-bottle's
|
||||
`SqliteForgeStateStore`; the in-memory store here backs tests and a
|
||||
`--no-bot-bottle` dry mode.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Protocol
|
||||
|
||||
from .model import RunRecord
|
||||
|
||||
|
||||
class StateStore(Protocol):
|
||||
"""Thin CRUD surface. Mirrors bot-bottle's `ForgeStateStore` so the
|
||||
bootstrap adapter is a straight pass-through."""
|
||||
|
||||
def upsert(self, record: RunRecord) -> None: ...
|
||||
|
||||
def get(self, owner: str, repo: str, issue_number: int) -> RunRecord | None: ...
|
||||
|
||||
def delete(self, owner: str, repo: str, issue_number: int) -> None: ...
|
||||
|
||||
def all(self) -> list[RunRecord]: ...
|
||||
|
||||
|
||||
class InMemoryStateStore:
|
||||
"""Dict-backed `StateStore`, keyed by (owner, repo, issue_number)."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._by_key: dict[tuple[str, str, int], RunRecord] = {}
|
||||
|
||||
def upsert(self, record: RunRecord) -> None:
|
||||
self._by_key[(record.owner, record.repo, record.issue_number)] = record
|
||||
|
||||
def get(self, owner: str, repo: str, issue_number: int) -> RunRecord | None:
|
||||
return self._by_key.get((owner, repo, issue_number))
|
||||
|
||||
def delete(self, owner: str, repo: str, issue_number: int) -> None:
|
||||
self._by_key.pop((owner, repo, issue_number), None)
|
||||
|
||||
def all(self) -> list[RunRecord]:
|
||||
return sorted(
|
||||
self._by_key.values(),
|
||||
key=lambda r: (r.owner, r.repo, r.issue_number),
|
||||
)
|
||||
@@ -0,0 +1,51 @@
|
||||
"""Decide whether an assigned issue is agent-targeted, and for whom.
|
||||
|
||||
An issue is forge-targeted when BOTH hold:
|
||||
- it carries a `bot-bottle:<agent>` label naming the agent, and
|
||||
- at least one assignee is a member of the configured org.
|
||||
|
||||
An optional `bot-bottle-bottle:<name>` label overrides bottle selection.
|
||||
The forge is duck-typed: any object with `is_org_member(org, user)`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Protocol
|
||||
|
||||
from .config import BOTTLE_LABEL_PREFIX, LABEL_PREFIX
|
||||
from .model import IssueAssigned
|
||||
|
||||
|
||||
class Membership(Protocol):
|
||||
def is_org_member(self, org: str, username: str) -> bool: ...
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Target:
|
||||
agent_name: str
|
||||
bottle_override: str | None
|
||||
|
||||
|
||||
def parse_labels(labels: tuple[str, ...]) -> tuple[str | None, str | None]:
|
||||
"""Return (agent_name, bottle_override) parsed from labels."""
|
||||
agent: str | None = None
|
||||
bottle: str | None = None
|
||||
for label in labels:
|
||||
if label.startswith(BOTTLE_LABEL_PREFIX):
|
||||
bottle = label[len(BOTTLE_LABEL_PREFIX):] or None
|
||||
elif label.startswith(LABEL_PREFIX):
|
||||
agent = label[len(LABEL_PREFIX):] or None
|
||||
return agent, bottle
|
||||
|
||||
|
||||
def resolve_target(
|
||||
event: IssueAssigned, forge: Membership, org: str
|
||||
) -> Target | None:
|
||||
"""Return the `Target` for a forge-targeted issue, or None to ignore."""
|
||||
agent, bottle = parse_labels(event.labels)
|
||||
if not agent:
|
||||
return None
|
||||
if not any(forge.is_org_member(org, a) for a in event.assignees):
|
||||
return None
|
||||
return Target(agent_name=agent, bottle_override=bottle)
|
||||
@@ -0,0 +1,68 @@
|
||||
"""Watchdog: freeze runs whose agent exited without signalling done.
|
||||
|
||||
`sweep(now)` is the pure, testable core: any `running` record whose
|
||||
`last_checkin_at` is older than the timeout is frozen as
|
||||
done-without-self-report and returned so provenance can flag it.
|
||||
`Watchdog.start()` runs `sweep` on a daemon thread once a minute.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from .model import STATUS_FROZEN, STATUS_RUNNING, RunRecord
|
||||
from .runner import BottleRunner
|
||||
from .store import StateStore
|
||||
|
||||
_TICK_SECS = 60.0
|
||||
|
||||
|
||||
def _parse(ts: str) -> datetime | None:
|
||||
try:
|
||||
return datetime.fromisoformat(ts)
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
class Watchdog:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
store: StateStore,
|
||||
runner: BottleRunner,
|
||||
timeout_secs: int,
|
||||
) -> None:
|
||||
self._store = store
|
||||
self._runner = runner
|
||||
self._timeout = timedelta(seconds=timeout_secs)
|
||||
self._stop = threading.Event()
|
||||
self._thread: threading.Thread | None = None
|
||||
|
||||
def sweep(self, now: datetime) -> list[RunRecord]:
|
||||
"""Freeze stale running records. Returns the ones fired."""
|
||||
fired: list[RunRecord] = []
|
||||
for record in self._store.all():
|
||||
if record.status != STATUS_RUNNING:
|
||||
continue
|
||||
checkin = _parse(record.last_checkin_at)
|
||||
if checkin is None or now - checkin <= self._timeout:
|
||||
continue
|
||||
self._runner.freeze(record.slug)
|
||||
record.status = STATUS_FROZEN
|
||||
self._store.upsert(record)
|
||||
fired.append(record)
|
||||
return fired
|
||||
|
||||
def start(self) -> None:
|
||||
self._thread = threading.Thread(target=self._loop, daemon=True)
|
||||
self._thread.start()
|
||||
|
||||
def stop(self) -> None:
|
||||
self._stop.set()
|
||||
if self._thread is not None:
|
||||
self._thread.join(timeout=_TICK_SECS)
|
||||
|
||||
def _loop(self) -> None:
|
||||
while not self._stop.wait(_TICK_SECS):
|
||||
self.sweep(datetime.now().astimezone())
|
||||
@@ -0,0 +1,123 @@
|
||||
"""HTTP surface: the Gitea webhook receiver and the provenance API.
|
||||
|
||||
`POST /webhook` — a Gitea event; parsed and dispatched to the orchestrator.
|
||||
`GET /healthz` — liveness.
|
||||
`GET /provenance?owner=&repo=&issue=` — the run's audit record (never
|
||||
posted to the forge).
|
||||
|
||||
Webhook signature verification is optional: set a secret and the handler
|
||||
rejects bodies whose `X-Gitea-Signature` HMAC-SHA256 does not match.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hmac
|
||||
import json
|
||||
from collections.abc import Callable
|
||||
from hashlib import sha256
|
||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
from typing import Any
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
from .events import parse_event
|
||||
from .lifecycle import Orchestrator
|
||||
from .provenance import build_provenance, ops_from_log, provenance_to_dict
|
||||
from .store import StateStore
|
||||
|
||||
# (record) -> that run's op-log entries, injected by bootstrap.
|
||||
OpLogReader = Callable[[Any], list[dict[str, Any]]]
|
||||
|
||||
|
||||
class WebhookServer(ThreadingHTTPServer):
|
||||
def __init__(
|
||||
self,
|
||||
address: tuple[str, int],
|
||||
*,
|
||||
orchestrator: Orchestrator,
|
||||
store: StateStore,
|
||||
secret: bytes | None = None,
|
||||
op_log_reader: OpLogReader | None = None,
|
||||
) -> None:
|
||||
super().__init__(address, _Handler)
|
||||
self.orchestrator = orchestrator
|
||||
self.store = store
|
||||
self.secret = secret
|
||||
self.op_log_reader = op_log_reader
|
||||
|
||||
|
||||
def verify_signature(secret: bytes, body: bytes, signature: str) -> bool:
|
||||
expected = hmac.new(secret, body, sha256).hexdigest()
|
||||
return hmac.compare_digest(expected, signature or "")
|
||||
|
||||
|
||||
class _Handler(BaseHTTPRequestHandler):
|
||||
server: WebhookServer # type: ignore[assignment]
|
||||
|
||||
def log_message( # pylint: disable=redefined-builtin
|
||||
self, format: str, *args: Any
|
||||
) -> None: # quiet by default
|
||||
pass
|
||||
|
||||
def _send(self, code: int, payload: dict[str, Any]) -> None:
|
||||
body = json.dumps(payload).encode()
|
||||
self.send_response(code)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.send_header("Content-Length", str(len(body)))
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
|
||||
def do_POST(self) -> None: # noqa: N802 # pylint: disable=invalid-name
|
||||
if urlparse(self.path).path != "/webhook":
|
||||
self._send(404, {"error": "not found"})
|
||||
return
|
||||
length = int(self.headers.get("Content-Length", "0"))
|
||||
body = self.rfile.read(length)
|
||||
if self.server.secret is not None:
|
||||
sig = self.headers.get("X-Gitea-Signature", "")
|
||||
if not verify_signature(self.server.secret, body, sig):
|
||||
self._send(401, {"error": "bad signature"})
|
||||
return
|
||||
try:
|
||||
payload = json.loads(body)
|
||||
except ValueError:
|
||||
self._send(400, {"error": "invalid json"})
|
||||
return
|
||||
kind = self.headers.get("X-Gitea-Event", "")
|
||||
event = parse_event(kind, payload)
|
||||
if event is not None:
|
||||
self.server.orchestrator.handle(event)
|
||||
self._send(200, {"ok": True, "handled": event is not None})
|
||||
|
||||
def do_GET(self) -> None: # noqa: N802 # pylint: disable=invalid-name
|
||||
parsed = urlparse(self.path)
|
||||
if parsed.path == "/healthz":
|
||||
self._send(200, {"ok": True})
|
||||
return
|
||||
if parsed.path == "/provenance":
|
||||
self._provenance(parse_qs(parsed.query))
|
||||
return
|
||||
self._send(404, {"error": "not found"})
|
||||
|
||||
def _provenance(self, query: dict[str, list[str]]) -> None:
|
||||
try:
|
||||
owner = query["owner"][0]
|
||||
repo = query["repo"][0]
|
||||
issue = int(query["issue"][0])
|
||||
except (KeyError, IndexError, ValueError):
|
||||
self._send(400, {"error": "owner, repo, issue required"})
|
||||
return
|
||||
record = self.server.store.get(owner, repo, issue)
|
||||
if record is None:
|
||||
self._send(404, {"error": "no such run"})
|
||||
return
|
||||
reader = self.server.op_log_reader
|
||||
ops = ops_from_log(reader(record) if reader is not None else [])
|
||||
prov = build_provenance(
|
||||
record,
|
||||
ops=ops,
|
||||
started_at="",
|
||||
finished_at=record.last_checkin_at,
|
||||
exit_code=None,
|
||||
watchdog_fired=False,
|
||||
)
|
||||
self._send(200, provenance_to_dict(prov))
|
||||
@@ -0,0 +1,69 @@
|
||||
"""Shared test doubles: a duck-typed forge and bottle runner."""
|
||||
|
||||
# Test doubles mirror an API shape; some params are intentionally unused.
|
||||
# pylint: disable=unused-argument
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Sequence
|
||||
|
||||
from bot_bottle.orchestrator.runner import RunResult, slugify
|
||||
|
||||
|
||||
class FakeForge:
|
||||
def __init__(self, members: tuple[str, ...] = ()) -> None:
|
||||
self.members = set(members)
|
||||
self.comments: list[tuple[int, str]] = []
|
||||
self.descriptions: list[tuple[int, str]] = []
|
||||
self.scope_denied: set[int] = set()
|
||||
|
||||
def is_org_member(self, org: str, username: str) -> bool:
|
||||
return username in self.members
|
||||
|
||||
def read_issue(self, number: int) -> dict[str, object]:
|
||||
return {"number": number, "kind": "issue"}
|
||||
|
||||
def read_pr(self, number: int) -> dict[str, object]:
|
||||
return {"number": number, "merged": False}
|
||||
|
||||
def read_comments(self, number: int) -> list[dict[str, object]]:
|
||||
return [{"id": 1, "user": "alice", "body": "hi"}]
|
||||
|
||||
def post_comment(self, number: int, body: str) -> None:
|
||||
if number in self.scope_denied:
|
||||
raise PermissionError(f"write to #{number} denied")
|
||||
self.comments.append((number, body))
|
||||
|
||||
def update_description(self, number: int, body: str) -> None:
|
||||
if number in self.scope_denied:
|
||||
raise PermissionError(f"write to #{number} denied")
|
||||
self.descriptions.append((number, body))
|
||||
|
||||
|
||||
class FakeRunner:
|
||||
def __init__(self) -> None:
|
||||
self.calls: list[tuple[object, ...]] = []
|
||||
|
||||
def start(
|
||||
self,
|
||||
*,
|
||||
agent: str,
|
||||
bottles: Sequence[str],
|
||||
label: str,
|
||||
prompt: str,
|
||||
forge_env: dict[str, str],
|
||||
) -> RunResult:
|
||||
self.calls.append(("start", agent, tuple(bottles), label, prompt, dict(forge_env)))
|
||||
return RunResult(slug=slugify(label), exit_code=0)
|
||||
|
||||
def freeze(self, slug: str) -> int:
|
||||
self.calls.append(("freeze", slug))
|
||||
return 0
|
||||
|
||||
def resume(self, slug: str, prompt: str) -> RunResult:
|
||||
self.calls.append(("resume", slug, prompt))
|
||||
return RunResult(slug=slug, exit_code=0)
|
||||
|
||||
def destroy(self, slug: str) -> int:
|
||||
self.calls.append(("destroy", slug))
|
||||
return 0
|
||||
@@ -0,0 +1,38 @@
|
||||
"""Unit: Config.from_env."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
from bot_bottle.orchestrator.config import Config
|
||||
|
||||
|
||||
class ConfigTest(unittest.TestCase):
|
||||
def test_defaults(self):
|
||||
c = Config.from_env({"HOME": "/home/x"})
|
||||
self.assertEqual("bot-bottle", c.forge_org)
|
||||
self.assertEqual(1800, c.watchdog_timeout_secs)
|
||||
self.assertEqual("127.0.0.1", c.webhook_host)
|
||||
self.assertEqual(8477, c.webhook_port)
|
||||
self.assertEqual(Path("/home/x/.bot-bottle/forge-queue"), c.queue_dir)
|
||||
self.assertIsNone(c.db_path)
|
||||
|
||||
def test_overrides(self):
|
||||
c = Config.from_env({
|
||||
"HOME": "/home/x",
|
||||
"FORGE_ORG": "agents",
|
||||
"FORGE_WATCHDOG_TIMEOUT": "60",
|
||||
"FORGE_GITEA_API": "https://g.example/api/v1",
|
||||
"FORGE_WEBHOOK_PORT": "9000",
|
||||
"FORGE_DB_PATH": "/data/bb.db",
|
||||
})
|
||||
self.assertEqual("agents", c.forge_org)
|
||||
self.assertEqual(60, c.watchdog_timeout_secs)
|
||||
self.assertEqual("https://g.example/api/v1", c.gitea_api)
|
||||
self.assertEqual(9000, c.webhook_port)
|
||||
self.assertEqual(Path("/data/bb.db"), c.db_path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,64 @@
|
||||
"""Unit: webhook payload parsing."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
|
||||
from bot_bottle.orchestrator.events import parse_event
|
||||
from bot_bottle.orchestrator.model import CommentCreated, IssueAssigned, PullRequestClosed
|
||||
|
||||
_REPO = {"repository": {"name": "bot-bottle", "owner": {"login": "didericis"}}}
|
||||
|
||||
|
||||
class ParseEventTest(unittest.TestCase):
|
||||
def test_issue_assigned(self):
|
||||
payload = {
|
||||
**_REPO,
|
||||
"action": "assigned",
|
||||
"issue": {
|
||||
"number": 17,
|
||||
"title": "Fix it",
|
||||
"body": "please",
|
||||
"assignees": [{"login": "agent-bot"}],
|
||||
"labels": [{"name": "bot-bottle:implementer"}],
|
||||
},
|
||||
}
|
||||
ev = parse_event("issues", payload)
|
||||
self.assertIsInstance(ev, IssueAssigned)
|
||||
assert isinstance(ev, IssueAssigned)
|
||||
self.assertEqual(("didericis", "bot-bottle", 17), (ev.owner, ev.repo, ev.issue_number))
|
||||
self.assertEqual(("agent-bot",), ev.assignees)
|
||||
self.assertEqual(("bot-bottle:implementer",), ev.labels)
|
||||
|
||||
def test_issue_non_assigned_ignored(self):
|
||||
self.assertIsNone(parse_event("issues", {**_REPO, "action": "opened", "issue": {}}))
|
||||
|
||||
def test_comment_created(self):
|
||||
payload = {
|
||||
**_REPO,
|
||||
"action": "created",
|
||||
"issue": {"number": 42, "pull_request": {"x": 1}},
|
||||
"comment": {"id": 5, "user": {"login": "reviewer"}, "body": "redo"},
|
||||
}
|
||||
ev = parse_event("issue_comment", payload)
|
||||
assert isinstance(ev, CommentCreated)
|
||||
self.assertEqual(42, ev.issue_number)
|
||||
self.assertEqual("reviewer", ev.author)
|
||||
self.assertTrue(ev.is_pull)
|
||||
|
||||
def test_pull_request_closed(self):
|
||||
payload = {**_REPO, "action": "closed", "pull_request": {"number": 8, "merged": True}}
|
||||
ev = parse_event("pull_request", payload)
|
||||
assert isinstance(ev, PullRequestClosed)
|
||||
self.assertEqual(8, ev.pr_number)
|
||||
self.assertTrue(ev.merged)
|
||||
|
||||
def test_pull_request_non_closed_ignored(self):
|
||||
self.assertIsNone(parse_event("pull_request", {**_REPO, "action": "opened"}))
|
||||
|
||||
def test_unknown_kind_ignored(self):
|
||||
self.assertIsNone(parse_event("push", {**_REPO}))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,75 @@
|
||||
"""Unit: ForgeState + SqliteForgeStateStore."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
|
||||
from bot_bottle.contrib.gitea.forge_state import ForgeState, SqliteForgeStateStore
|
||||
|
||||
|
||||
def _state(**kw: object) -> ForgeState:
|
||||
defaults: dict[str, object] = dict(
|
||||
owner="alice", repo="myrepo", issue_number=1,
|
||||
slug="impl-alice-myrepo-1", agent_name="impl",
|
||||
)
|
||||
defaults.update(kw)
|
||||
return ForgeState(**defaults) # type: ignore[arg-type]
|
||||
|
||||
|
||||
class ForgeStateStoreTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.store = SqliteForgeStateStore(None)
|
||||
|
||||
def test_upsert_and_get(self):
|
||||
s = _state()
|
||||
self.store.upsert(s)
|
||||
got = self.store.get("alice", "myrepo", 1)
|
||||
assert got is not None
|
||||
self.assertEqual("impl-alice-myrepo-1", got.slug)
|
||||
self.assertEqual("impl", got.agent_name)
|
||||
|
||||
def test_get_missing(self):
|
||||
self.assertIsNone(self.store.get("alice", "myrepo", 99))
|
||||
|
||||
def test_upsert_replaces(self):
|
||||
self.store.upsert(_state(status="running"))
|
||||
self.store.upsert(_state(status="frozen"))
|
||||
got = self.store.get("alice", "myrepo", 1)
|
||||
assert got is not None
|
||||
self.assertEqual("frozen", got.status)
|
||||
|
||||
def test_delete(self):
|
||||
self.store.upsert(_state())
|
||||
self.store.delete("alice", "myrepo", 1)
|
||||
self.assertIsNone(self.store.get("alice", "myrepo", 1))
|
||||
|
||||
def test_delete_missing_no_error(self):
|
||||
self.store.delete("alice", "myrepo", 99)
|
||||
|
||||
def test_all_sorted(self):
|
||||
self.store.upsert(_state(owner="z", issue_number=2))
|
||||
self.store.upsert(_state(owner="a", issue_number=1))
|
||||
rows = self.store.all()
|
||||
self.assertEqual(("a", "z"), (rows[0].owner, rows[1].owner))
|
||||
|
||||
def test_bottle_names_roundtrip(self):
|
||||
self.store.upsert(_state(bottle_names=["claude", "dev"]))
|
||||
got = self.store.get("alice", "myrepo", 1)
|
||||
assert got is not None
|
||||
self.assertEqual(["claude", "dev"], got.bottle_names)
|
||||
|
||||
def test_pr_number_none_roundtrip(self):
|
||||
self.store.upsert(_state(pr_number=None))
|
||||
got = self.store.get("alice", "myrepo", 1)
|
||||
assert got is not None
|
||||
self.assertIsNone(got.pr_number)
|
||||
|
||||
def test_pr_number_int_roundtrip(self):
|
||||
self.store.upsert(_state(pr_number=42))
|
||||
got = self.store.get("alice", "myrepo", 1)
|
||||
assert got is not None
|
||||
self.assertEqual(42, got.pr_number)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,140 @@
|
||||
"""Unit: the orchestration lifecycle."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
from typing import cast
|
||||
|
||||
from bot_bottle.orchestrator.lifecycle import Orchestrator
|
||||
from bot_bottle.orchestrator.model import (
|
||||
STATUS_FROZEN,
|
||||
STATUS_RUNNING,
|
||||
CommentCreated,
|
||||
IssueAssigned,
|
||||
PullRequestClosed,
|
||||
)
|
||||
from bot_bottle.orchestrator.store import InMemoryStateStore
|
||||
|
||||
from ._fakes import FakeForge, FakeRunner
|
||||
|
||||
|
||||
def _assigned(
|
||||
labels: tuple[str, ...] = ("bot-bottle:impl",),
|
||||
assignees: tuple[str, ...] = ("agent-bot",),
|
||||
) -> IssueAssigned:
|
||||
return IssueAssigned(
|
||||
owner="didericis", repo="bot-bottle", issue_number=17,
|
||||
title="t", body="the task", assignees=tuple(assignees), labels=tuple(labels),
|
||||
)
|
||||
|
||||
|
||||
class LifecycleTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.forge = FakeForge(members=("agent-bot",))
|
||||
self.store = InMemoryStateStore()
|
||||
self.runner = FakeRunner()
|
||||
self.orch = Orchestrator(
|
||||
forge=self.forge, store=self.store, runner=self.runner,
|
||||
org="bot-bottle", gitea_api="https://g/api/v1",
|
||||
now=lambda: "2026-07-01T00:00:00-04:00",
|
||||
)
|
||||
|
||||
def _record(self):
|
||||
return self.store.get("didericis", "bot-bottle", 17)
|
||||
|
||||
def test_assigned_targeted_launches(self):
|
||||
self.orch.handle(_assigned())
|
||||
rec = self._record()
|
||||
assert rec is not None
|
||||
self.assertEqual(STATUS_RUNNING, rec.status)
|
||||
self.assertEqual("impl-didericis-bot-bottle-17", rec.slug)
|
||||
self.assertEqual("start", self.runner.calls[0][0])
|
||||
# forge context injected into the child env.
|
||||
env = cast("dict[str, str]", self.runner.calls[0][5])
|
||||
self.assertEqual("didericis", env["FORGE_OWNER"])
|
||||
self.assertEqual("17", env["FORGE_ISSUE_NUMBER"])
|
||||
|
||||
def test_untargeted_ignored(self):
|
||||
self.orch.handle(_assigned(labels=("bug",)))
|
||||
self.assertIsNone(self._record())
|
||||
self.assertEqual([], self.runner.calls)
|
||||
|
||||
def test_assigned_is_idempotent(self):
|
||||
self.orch.handle(_assigned())
|
||||
self.orch.handle(_assigned()) # redelivery
|
||||
starts = [c for c in self.runner.calls if c[0] == "start"]
|
||||
self.assertEqual(1, len(starts))
|
||||
|
||||
def test_done_signal_freezes(self):
|
||||
self.orch.handle(_assigned())
|
||||
self.orch.on_done_signal("didericis", "bot-bottle", 17, "success", "done")
|
||||
rec = self._record()
|
||||
assert rec is not None
|
||||
self.assertEqual(STATUS_FROZEN, rec.status)
|
||||
self.assertIn(("freeze", "impl-didericis-bot-bottle-17"), self.runner.calls)
|
||||
|
||||
def test_done_signal_ignored_when_not_running(self):
|
||||
# No record yet -> no freeze.
|
||||
self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "")
|
||||
self.assertEqual([], self.runner.calls)
|
||||
|
||||
def test_comment_on_frozen_resumes(self):
|
||||
self.orch.handle(_assigned())
|
||||
self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "")
|
||||
self.orch.handle(CommentCreated(
|
||||
owner="didericis", repo="bot-bottle", issue_number=17,
|
||||
comment_id=1, author="reviewer", body="please redo", is_pull=False,
|
||||
))
|
||||
rec = self._record()
|
||||
assert rec is not None
|
||||
self.assertEqual(STATUS_RUNNING, rec.status)
|
||||
self.assertIn(("resume", "impl-didericis-bot-bottle-17", "please redo"),
|
||||
self.runner.calls)
|
||||
|
||||
def test_comment_echo_guard(self):
|
||||
self.orch.handle(_assigned())
|
||||
self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "")
|
||||
rec = self._record()
|
||||
assert rec is not None
|
||||
rec.agent_git_user = "agent-bot"
|
||||
self.store.upsert(rec)
|
||||
self.orch.handle(CommentCreated(
|
||||
owner="didericis", repo="bot-bottle", issue_number=17,
|
||||
comment_id=2, author="agent-bot", body="I finished", is_pull=False,
|
||||
))
|
||||
# Still frozen, no resume triggered by the agent's own comment.
|
||||
self.assertEqual(STATUS_FROZEN, self._record().status) # type: ignore[union-attr]
|
||||
self.assertNotIn("resume", [c[0] for c in self.runner.calls])
|
||||
|
||||
def test_comment_on_running_ignored(self):
|
||||
self.orch.handle(_assigned()) # running
|
||||
self.orch.handle(CommentCreated(
|
||||
owner="didericis", repo="bot-bottle", issue_number=17,
|
||||
comment_id=1, author="reviewer", body="hi", is_pull=False,
|
||||
))
|
||||
self.assertNotIn("resume", [c[0] for c in self.runner.calls])
|
||||
|
||||
def test_pr_comment_routes_via_link(self):
|
||||
self.orch.handle(_assigned())
|
||||
self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "")
|
||||
self.orch.link_pr("didericis", "bot-bottle", 17, 42)
|
||||
# Comment arrives on PR #42 (issue_number == PR number in Gitea).
|
||||
self.orch.handle(CommentCreated(
|
||||
owner="didericis", repo="bot-bottle", issue_number=42,
|
||||
comment_id=9, author="reviewer", body="fix", is_pull=True,
|
||||
))
|
||||
self.assertIn(("resume", "impl-didericis-bot-bottle-17", "fix"),
|
||||
self.runner.calls)
|
||||
|
||||
def test_pr_closed_destroys_and_removes(self):
|
||||
self.orch.handle(_assigned())
|
||||
self.orch.link_pr("didericis", "bot-bottle", 17, 42)
|
||||
self.orch.handle(PullRequestClosed(
|
||||
owner="didericis", repo="bot-bottle", pr_number=42, merged=True,
|
||||
))
|
||||
self.assertIn(("destroy", "impl-didericis-bot-bottle-17"), self.runner.calls)
|
||||
self.assertIsNone(self._record())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,53 @@
|
||||
"""Unit: provenance assembly + serialization."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
|
||||
from bot_bottle.orchestrator.model import RunRecord
|
||||
from bot_bottle.orchestrator.provenance import build_provenance, ops_from_log, provenance_to_dict
|
||||
|
||||
|
||||
def _record() -> RunRecord:
|
||||
return RunRecord(
|
||||
owner="didericis", repo="bot-bottle", issue_number=17,
|
||||
slug="impl-17", agent_name="impl", bottle_names=["claude"],
|
||||
last_checkin_at="2026-07-01T00:05:00-04:00",
|
||||
)
|
||||
|
||||
|
||||
class ProvenanceTest(unittest.TestCase):
|
||||
def test_ops_from_log(self):
|
||||
ops = ops_from_log([
|
||||
{"at": "T1", "op": "read_pr", "target": 5, "detail": "ok"},
|
||||
{"at": "T2", "op": "signal_done", "target": None, "detail": "success: done"},
|
||||
])
|
||||
self.assertEqual(2, len(ops))
|
||||
self.assertEqual("read_pr", ops[0].op)
|
||||
self.assertIsNone(ops[1].target)
|
||||
|
||||
def test_build_and_serialize(self):
|
||||
ops = ops_from_log([{"at": "T1", "op": "post_comment", "target": 17, "detail": "ok"}])
|
||||
prov = build_provenance(
|
||||
_record(), ops=ops, started_at="2026-07-01T00:00:00-04:00",
|
||||
finished_at="2026-07-01T00:05:00-04:00", exit_code=0, watchdog_fired=False,
|
||||
)
|
||||
d = provenance_to_dict(prov)
|
||||
self.assertEqual("impl-17", d["slug"])
|
||||
self.assertEqual("didericis", d["owner"])
|
||||
self.assertEqual(["claude"], d["bottles"])
|
||||
self.assertEqual(0, d["exit_code"])
|
||||
self.assertFalse(d["watchdog_fired"])
|
||||
self.assertEqual(1, len(d["ops"]))
|
||||
self.assertEqual("post_comment", d["ops"][0]["op"])
|
||||
|
||||
def test_watchdog_flag_serialized(self):
|
||||
prov = build_provenance(
|
||||
_record(), ops=(), started_at="", finished_at="",
|
||||
exit_code=None, watchdog_fired=True,
|
||||
)
|
||||
self.assertTrue(provenance_to_dict(prov)["watchdog_fired"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,65 @@
|
||||
"""Unit: SubprocessBottleRunner + slugify (injected run fn)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
from collections.abc import Sequence
|
||||
|
||||
from bot_bottle.orchestrator.runner import SubprocessBottleRunner, slugify
|
||||
|
||||
|
||||
class SlugifyTest(unittest.TestCase):
|
||||
def test_basic(self):
|
||||
self.assertEqual("impl-didericis-bot-bottle-17",
|
||||
slugify("impl-didericis-bot-bottle-17"))
|
||||
|
||||
def test_collapses_and_strips(self):
|
||||
self.assertEqual("a-b-c", slugify(" A_B/C!! "))
|
||||
|
||||
|
||||
class SubprocessRunnerTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.argvs: list[list[str]] = []
|
||||
self.envs: list[dict[str, str]] = []
|
||||
|
||||
def fake_run(argv: Sequence[str], env: dict[str, str]) -> int:
|
||||
self.argvs.append(list(argv))
|
||||
self.envs.append(dict(env))
|
||||
return 0
|
||||
|
||||
self.runner = SubprocessBottleRunner(
|
||||
cli="/x/cli.py", base_env={"PATH": "/bin"}, python="/py", run=fake_run
|
||||
)
|
||||
|
||||
def test_start_argv_and_env(self):
|
||||
result = self.runner.start(
|
||||
agent="impl", bottles=["claude", "dev"], label="impl-r-17",
|
||||
prompt="do it", forge_env={"FORGE_OWNER": "didericis"},
|
||||
)
|
||||
self.assertEqual("impl-r-17", result.slug)
|
||||
argv = self.argvs[0]
|
||||
self.assertEqual(["/py", "/x/cli.py", "start", "impl", "--headless",
|
||||
"--label", "impl-r-17", "--prompt", "do it",
|
||||
"--bottle", "claude", "--bottle", "dev"], argv)
|
||||
# forge_env merged over base_env for the child.
|
||||
self.assertEqual("didericis", self.envs[0]["FORGE_OWNER"])
|
||||
self.assertEqual("/bin", self.envs[0]["PATH"])
|
||||
|
||||
def test_start_no_bottles_omits_flag(self):
|
||||
self.runner.start(agent="impl", bottles=[], label="l", prompt="p", forge_env={})
|
||||
self.assertNotIn("--bottle", self.argvs[0])
|
||||
|
||||
def test_freeze_calls_commit(self):
|
||||
self.runner.freeze("slug-1")
|
||||
self.assertEqual(["/py", "/x/cli.py", "commit", "slug-1"], self.argvs[0])
|
||||
|
||||
def test_resume_headless(self):
|
||||
r = self.runner.resume("slug-1", "address review")
|
||||
self.assertEqual("slug-1", r.slug)
|
||||
self.assertEqual(
|
||||
["/py", "/x/cli.py", "resume", "slug-1", "--headless", "--prompt",
|
||||
"address review"], self.argvs[0])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,75 @@
|
||||
"""Unit: ScopedForge — read-anywhere / write-scoped access control."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
|
||||
from bot_bottle.contrib.forge.base import ScopedForge
|
||||
|
||||
from ._fakes import FakeForge
|
||||
|
||||
|
||||
class ScopedForgeTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.inner = FakeForge()
|
||||
self.scoped = ScopedForge(
|
||||
self.inner, assigned_issue=10, assigned_prs=[20, 30]
|
||||
)
|
||||
|
||||
# --- reads always pass through -----------------------------------------
|
||||
|
||||
def test_read_issue_allowed_anywhere(self):
|
||||
for number in (10, 20, 99):
|
||||
result = self.scoped.read_issue(number)
|
||||
self.assertEqual(number, result["number"])
|
||||
|
||||
def test_read_pr_allowed_anywhere(self):
|
||||
for number in (10, 20, 99):
|
||||
result = self.scoped.read_pr(number)
|
||||
self.assertEqual(number, result["number"])
|
||||
|
||||
def test_read_comments_allowed_anywhere(self):
|
||||
comments = self.scoped.read_comments(99)
|
||||
self.assertTrue(len(comments) > 0)
|
||||
|
||||
def test_is_org_member_passes_through(self):
|
||||
inner = FakeForge(members=("alice",))
|
||||
scoped = ScopedForge(inner, assigned_issue=1, assigned_prs=[])
|
||||
self.assertTrue(scoped.is_org_member("org", "alice"))
|
||||
self.assertFalse(scoped.is_org_member("org", "bob"))
|
||||
|
||||
# --- writes: assigned numbers allowed ----------------------------------
|
||||
|
||||
def test_post_comment_on_assigned_issue(self):
|
||||
self.scoped.post_comment(10, "hi")
|
||||
self.assertIn((10, "hi"), self.inner.comments)
|
||||
|
||||
def test_post_comment_on_assigned_pr(self):
|
||||
self.scoped.post_comment(20, "lgtm")
|
||||
self.assertIn((20, "lgtm"), self.inner.comments)
|
||||
|
||||
def test_update_description_on_assigned(self):
|
||||
self.scoped.update_description(30, "updated")
|
||||
self.assertIn((30, "updated"), self.inner.descriptions)
|
||||
|
||||
# --- writes: unassigned numbers denied ---------------------------------
|
||||
|
||||
def test_post_comment_denied_for_unassigned(self):
|
||||
with self.assertRaises(PermissionError):
|
||||
self.scoped.post_comment(99, "nope")
|
||||
self.assertEqual([], self.inner.comments)
|
||||
|
||||
def test_update_description_denied_for_unassigned(self):
|
||||
with self.assertRaises(PermissionError):
|
||||
self.scoped.update_description(99, "nope")
|
||||
self.assertEqual([], self.inner.descriptions)
|
||||
|
||||
def test_error_message_names_number(self):
|
||||
try:
|
||||
self.scoped.post_comment(99, "nope")
|
||||
except PermissionError as exc:
|
||||
self.assertIn("99", str(exc))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,108 @@
|
||||
"""Unit: forge sidecar dispatch, op log, queue relay, socket server."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import socket
|
||||
import tempfile
|
||||
import threading
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
from bot_bottle.orchestrator.sidecar import (
|
||||
ForgeSidecar,
|
||||
OpLog,
|
||||
drain_done_events,
|
||||
serve,
|
||||
write_done_event,
|
||||
)
|
||||
|
||||
from ._fakes import FakeForge
|
||||
|
||||
|
||||
class SidecarDispatchTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
|
||||
self.forge = FakeForge()
|
||||
self.log = OpLog(self.tmp / "ops.jsonl", now=lambda: "T")
|
||||
self.queue = self.tmp / "queue"
|
||||
self.sc = ForgeSidecar(
|
||||
forge=self.forge, op_log=self.log, queue_dir=self.queue,
|
||||
run_key=("o", "r", 17),
|
||||
)
|
||||
|
||||
def test_read_pr_ok_and_logged(self):
|
||||
resp = self.sc.dispatch("read_pr", {"number": 5})
|
||||
self.assertTrue(resp["ok"])
|
||||
self.assertEqual(5, resp["result"]["number"])
|
||||
self.assertEqual([("read_pr", 5, "ok")],
|
||||
[(o["op"], o["target"], o["detail"]) for o in self.log.read()])
|
||||
|
||||
def test_post_comment_writes_and_logs(self):
|
||||
resp = self.sc.dispatch("post_comment", {"number": 17, "body": "done"})
|
||||
self.assertTrue(resp["ok"])
|
||||
self.assertEqual([(17, "done")], self.forge.comments)
|
||||
|
||||
def test_scope_denied_write_returns_error_and_audits_rejection(self):
|
||||
self.forge.scope_denied.add(999)
|
||||
resp = self.sc.dispatch("post_comment", {"number": 999, "body": "x"})
|
||||
self.assertFalse(resp["ok"])
|
||||
self.assertIn("denied", resp["error"])
|
||||
# The rejection is recorded in the op log, not just the allows.
|
||||
self.assertIn("error", self.log.read()[-1]["detail"])
|
||||
self.assertEqual([], self.forge.comments)
|
||||
|
||||
def test_signal_done_queues_event(self):
|
||||
resp = self.sc.dispatch("signal_done", {"status": "success", "summary": "ok"})
|
||||
self.assertTrue(resp["ok"])
|
||||
events = drain_done_events(self.queue)
|
||||
self.assertEqual(1, len(events))
|
||||
self.assertEqual(("o", "r", 17, "success"),
|
||||
(events[0]["owner"], events[0]["repo"],
|
||||
events[0]["issue_number"], events[0]["status"]))
|
||||
|
||||
def test_unknown_method(self):
|
||||
resp = self.sc.dispatch("delete_repo", {})
|
||||
self.assertFalse(resp["ok"])
|
||||
|
||||
|
||||
class QueueTest(unittest.TestCase):
|
||||
def test_drain_removes_events(self):
|
||||
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
|
||||
write_done_event(tmp, {"owner": "o", "repo": "r", "issue_number": 1})
|
||||
self.assertEqual(1, len(drain_done_events(tmp)))
|
||||
self.assertEqual([], drain_done_events(tmp)) # drained
|
||||
|
||||
def test_drain_missing_dir(self):
|
||||
self.assertEqual([], drain_done_events(Path("/nonexistent/queue")))
|
||||
|
||||
|
||||
class SocketServerTest(unittest.TestCase):
|
||||
def test_round_trip_over_unix_socket(self):
|
||||
tmp = tempfile.mkdtemp()
|
||||
sock = Path(tmp) / "s.sock"
|
||||
if len(str(sock)) > 100: # AF_UNIX path limit; skip on long tmp paths
|
||||
self.skipTest("temp socket path too long for AF_UNIX")
|
||||
sidecar = ForgeSidecar(
|
||||
forge=FakeForge(), op_log=OpLog(Path(tmp) / "ops.jsonl"),
|
||||
queue_dir=Path(tmp) / "q", run_key=("o", "r", 1),
|
||||
)
|
||||
srv = serve(sidecar, sock)
|
||||
t = threading.Thread(target=srv.handle_request, daemon=True)
|
||||
t.start()
|
||||
try:
|
||||
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
client.connect(str(sock))
|
||||
client.sendall(b'{"method": "read_issue", "params": {"number": 3}}\n')
|
||||
line = client.makefile().readline()
|
||||
client.close()
|
||||
finally:
|
||||
t.join(timeout=5)
|
||||
srv.server_close()
|
||||
resp = json.loads(line)
|
||||
self.assertTrue(resp["ok"])
|
||||
self.assertEqual(3, resp["result"]["number"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,50 @@
|
||||
"""Unit: InMemoryStateStore."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
|
||||
from bot_bottle.orchestrator.model import RunRecord
|
||||
from bot_bottle.orchestrator.store import InMemoryStateStore
|
||||
|
||||
|
||||
def _rec(issue: int, owner: str = "o") -> RunRecord:
|
||||
return RunRecord(owner=owner, repo="r", issue_number=issue, slug=f"s{issue}",
|
||||
agent_name="a")
|
||||
|
||||
|
||||
class InMemoryStoreTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.store = InMemoryStateStore()
|
||||
|
||||
def test_upsert_get(self):
|
||||
self.store.upsert(_rec(1))
|
||||
got = self.store.get("o", "r", 1)
|
||||
assert got is not None
|
||||
self.assertEqual("s1", got.slug)
|
||||
|
||||
def test_get_missing(self):
|
||||
self.assertIsNone(self.store.get("o", "r", 99))
|
||||
|
||||
def test_upsert_replaces(self):
|
||||
self.store.upsert(_rec(1))
|
||||
r = _rec(1)
|
||||
r.slug = "changed"
|
||||
self.store.upsert(r)
|
||||
self.assertEqual("changed", self.store.get("o", "r", 1).slug) # type: ignore[union-attr]
|
||||
self.assertEqual(1, len(self.store.all()))
|
||||
|
||||
def test_delete(self):
|
||||
self.store.upsert(_rec(1))
|
||||
self.store.delete("o", "r", 1)
|
||||
self.assertIsNone(self.store.get("o", "r", 1))
|
||||
|
||||
def test_all_sorted(self):
|
||||
self.store.upsert(_rec(2, owner="b"))
|
||||
self.store.upsert(_rec(1, owner="a"))
|
||||
self.assertEqual([("a", 1), ("b", 2)],
|
||||
[(r.owner, r.issue_number) for r in self.store.all()])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,60 @@
|
||||
"""Unit: targeting (labels + org membership)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
|
||||
from bot_bottle.orchestrator.model import IssueAssigned
|
||||
from bot_bottle.orchestrator.targeting import parse_labels, resolve_target
|
||||
|
||||
from ._fakes import FakeForge
|
||||
|
||||
|
||||
def _issue(
|
||||
assignees: tuple[str, ...] = ("agent-bot",),
|
||||
labels: tuple[str, ...] = ("bot-bottle:implementer",),
|
||||
) -> IssueAssigned:
|
||||
return IssueAssigned(
|
||||
owner="didericis", repo="bot-bottle", issue_number=17,
|
||||
title="t", body="b", assignees=tuple(assignees), labels=tuple(labels),
|
||||
)
|
||||
|
||||
|
||||
class ParseLabelsTest(unittest.TestCase):
|
||||
def test_agent_label(self):
|
||||
self.assertEqual(("implementer", None), parse_labels(("bot-bottle:implementer",)))
|
||||
|
||||
def test_bottle_override_not_confused_with_agent(self):
|
||||
agent, bottle = parse_labels(("bot-bottle:impl", "bot-bottle-bottle:dev"))
|
||||
self.assertEqual(("impl", "dev"), (agent, bottle))
|
||||
|
||||
def test_no_agent_label(self):
|
||||
self.assertEqual((None, None), parse_labels(("bug", "p1")))
|
||||
|
||||
|
||||
class ResolveTargetTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.forge = FakeForge(members=("agent-bot",))
|
||||
|
||||
def test_targeted(self):
|
||||
target = resolve_target(_issue(), self.forge, "bot-bottle")
|
||||
assert target is not None
|
||||
self.assertEqual("implementer", target.agent_name)
|
||||
self.assertIsNone(target.bottle_override)
|
||||
|
||||
def test_bottle_override(self):
|
||||
ev = _issue(labels=("bot-bottle:impl", "bot-bottle-bottle:dev"))
|
||||
target = resolve_target(ev, self.forge, "bot-bottle")
|
||||
assert target is not None
|
||||
self.assertEqual("dev", target.bottle_override)
|
||||
|
||||
def test_no_label_not_targeted(self):
|
||||
self.assertIsNone(resolve_target(_issue(labels=("bug",)), self.forge, "bot-bottle"))
|
||||
|
||||
def test_non_member_assignee_not_targeted(self):
|
||||
ev = _issue(assignees=("random-user",))
|
||||
self.assertIsNone(resolve_target(ev, self.forge, "bot-bottle"))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,66 @@
|
||||
"""Unit: watchdog sweep."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from bot_bottle.orchestrator.model import STATUS_FROZEN, STATUS_RUNNING, RunRecord
|
||||
from bot_bottle.orchestrator.store import InMemoryStateStore
|
||||
from bot_bottle.orchestrator.watchdog import Watchdog
|
||||
|
||||
from ._fakes import FakeRunner
|
||||
|
||||
_NOW = datetime(2026, 7, 1, 12, 0, 0).astimezone()
|
||||
|
||||
|
||||
def _record(issue: int, status: str, checkin: str) -> RunRecord:
|
||||
return RunRecord(
|
||||
owner="o", repo="r", issue_number=issue, slug=f"s{issue}",
|
||||
agent_name="a", status=status, last_checkin_at=checkin,
|
||||
)
|
||||
|
||||
|
||||
class WatchdogSweepTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.store = InMemoryStateStore()
|
||||
self.runner = FakeRunner()
|
||||
self.wd = Watchdog(store=self.store, runner=self.runner, timeout_secs=1800)
|
||||
|
||||
def _status(self, issue: int) -> str:
|
||||
rec = self.store.get("o", "r", issue)
|
||||
assert rec is not None
|
||||
return rec.status
|
||||
|
||||
def test_stale_running_is_frozen(self):
|
||||
stale = (_NOW - timedelta(minutes=31)).isoformat()
|
||||
self.store.upsert(_record(1, STATUS_RUNNING, stale))
|
||||
fired = self.wd.sweep(_NOW)
|
||||
self.assertEqual([1], [r.issue_number for r in fired])
|
||||
self.assertEqual(STATUS_FROZEN, self._status(1))
|
||||
self.assertIn(("freeze", "s1"), self.runner.calls)
|
||||
|
||||
def test_fresh_running_untouched(self):
|
||||
fresh = (_NOW - timedelta(minutes=5)).isoformat()
|
||||
self.store.upsert(_record(2, STATUS_RUNNING, fresh))
|
||||
self.assertEqual([], self.wd.sweep(_NOW))
|
||||
self.assertEqual(STATUS_RUNNING, self._status(2))
|
||||
|
||||
def test_non_running_ignored(self):
|
||||
stale = (_NOW - timedelta(hours=2)).isoformat()
|
||||
self.store.upsert(_record(3, STATUS_FROZEN, stale))
|
||||
self.assertEqual([], self.wd.sweep(_NOW))
|
||||
|
||||
def test_unparseable_checkin_skipped(self):
|
||||
self.store.upsert(_record(4, STATUS_RUNNING, "not-a-time"))
|
||||
self.assertEqual([], self.wd.sweep(_NOW))
|
||||
|
||||
def test_start_and_stop(self):
|
||||
# Exercises the daemon-thread start/stop path; stop sets the event
|
||||
# so the loop's wait returns immediately.
|
||||
self.wd.start()
|
||||
self.wd.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,155 @@
|
||||
"""Unit: webhook HTTP surface (signature + routing over a real server)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import threading
|
||||
import unittest
|
||||
import urllib.request
|
||||
from urllib.error import HTTPError
|
||||
|
||||
from bot_bottle.orchestrator.model import RunRecord
|
||||
from bot_bottle.orchestrator.store import InMemoryStateStore
|
||||
from bot_bottle.orchestrator.webhook import WebhookServer, verify_signature
|
||||
|
||||
_ISSUE_ASSIGNED = {
|
||||
"action": "assigned",
|
||||
"repository": {"name": "bot-bottle", "owner": {"login": "didericis"}},
|
||||
"issue": {
|
||||
"number": 17, "title": "t", "body": "b",
|
||||
"assignees": [{"login": "agent-bot"}],
|
||||
"labels": [{"name": "bot-bottle:impl"}],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class _RecordingOrch:
|
||||
def __init__(self) -> None:
|
||||
self.events: list[object] = []
|
||||
|
||||
def handle(self, event: object) -> None:
|
||||
self.events.append(event)
|
||||
|
||||
|
||||
class SignatureTest(unittest.TestCase):
|
||||
def test_verify(self):
|
||||
secret = b"s3cret"
|
||||
body = b'{"x":1}'
|
||||
sig = hmac.new(secret, body, hashlib.sha256).hexdigest()
|
||||
self.assertTrue(verify_signature(secret, body, sig))
|
||||
self.assertFalse(verify_signature(secret, body, "deadbeef"))
|
||||
|
||||
|
||||
class WebhookServerTest(unittest.TestCase):
|
||||
# _serve is the per-test setup; attributes are assigned there.
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
def _serve(self, **kwargs: object) -> None:
|
||||
self.orch = _RecordingOrch()
|
||||
kwargs.setdefault("store", InMemoryStateStore())
|
||||
self.server = WebhookServer(
|
||||
("127.0.0.1", 0), orchestrator=self.orch, **kwargs, # type: ignore[arg-type]
|
||||
)
|
||||
self.port = self.server.server_address[1]
|
||||
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
|
||||
self.thread.start()
|
||||
self.addCleanup(self._shutdown)
|
||||
|
||||
def _shutdown(self) -> None:
|
||||
self.server.shutdown()
|
||||
self.server.server_close()
|
||||
self.thread.join(timeout=5)
|
||||
|
||||
def _post(
|
||||
self, path: str, body: bytes, headers: dict[str, str] | None = None
|
||||
) -> tuple[int, dict[str, object]]:
|
||||
req = urllib.request.Request(
|
||||
f"http://127.0.0.1:{self.port}{path}", data=body, method="POST",
|
||||
headers=headers or {},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
return resp.status, json.loads(resp.read())
|
||||
|
||||
def _get(self, path: str) -> tuple[int, dict[str, object]]:
|
||||
with urllib.request.urlopen(f"http://127.0.0.1:{self.port}{path}", timeout=5) as r:
|
||||
return r.status, json.loads(r.read())
|
||||
|
||||
def test_webhook_dispatches(self):
|
||||
self._serve()
|
||||
body = json.dumps(_ISSUE_ASSIGNED).encode()
|
||||
status, payload = self._post("/webhook", body, {"X-Gitea-Event": "issues"})
|
||||
self.assertEqual(200, status)
|
||||
self.assertTrue(payload["handled"])
|
||||
self.assertEqual(1, len(self.orch.events))
|
||||
|
||||
def test_unhandled_event_ok_but_not_handled(self):
|
||||
self._serve()
|
||||
body = json.dumps({"action": "push"}).encode()
|
||||
_status, payload = self._post("/webhook", body, {"X-Gitea-Event": "push"})
|
||||
self.assertFalse(payload["handled"])
|
||||
self.assertEqual([], self.orch.events)
|
||||
|
||||
def test_invalid_json_400(self):
|
||||
self._serve()
|
||||
with self.assertRaises(HTTPError) as ctx:
|
||||
self._post("/webhook", b"{not json", {"X-Gitea-Event": "issues"})
|
||||
self.assertEqual(400, ctx.exception.code)
|
||||
|
||||
def test_bad_signature_rejected(self):
|
||||
self._serve(secret=b"sekret")
|
||||
body = json.dumps(_ISSUE_ASSIGNED).encode()
|
||||
with self.assertRaises(HTTPError) as ctx:
|
||||
self._post("/webhook", body,
|
||||
{"X-Gitea-Event": "issues", "X-Gitea-Signature": "deadbeef"})
|
||||
self.assertEqual(401, ctx.exception.code)
|
||||
self.assertEqual([], self.orch.events)
|
||||
|
||||
def test_good_signature_accepted(self):
|
||||
self._serve(secret=b"sekret")
|
||||
body = json.dumps(_ISSUE_ASSIGNED).encode()
|
||||
sig = hmac.new(b"sekret", body, hashlib.sha256).hexdigest()
|
||||
status, _payload = self._post(
|
||||
"/webhook", body, {"X-Gitea-Event": "issues", "X-Gitea-Signature": sig})
|
||||
self.assertEqual(200, status)
|
||||
self.assertEqual(1, len(self.orch.events))
|
||||
|
||||
def test_healthz(self):
|
||||
self._serve()
|
||||
self.assertEqual(200, self._get("/healthz")[0])
|
||||
|
||||
def test_unknown_path_404(self):
|
||||
self._serve()
|
||||
with self.assertRaises(HTTPError) as ctx:
|
||||
self._post("/nope", b"{}", {"X-Gitea-Event": "issues"})
|
||||
self.assertEqual(404, ctx.exception.code)
|
||||
|
||||
def test_provenance_returns_record_and_ops(self):
|
||||
store = InMemoryStateStore()
|
||||
store.upsert(RunRecord(owner="didericis", repo="bot-bottle", issue_number=17,
|
||||
slug="impl-17", agent_name="impl", bottle_names=["claude"]))
|
||||
|
||||
def reader(rec: object) -> list[dict[str, object]]: # pylint: disable=unused-argument
|
||||
return [{"at": "T", "op": "post_comment", "target": 17, "detail": "ok"}]
|
||||
|
||||
self._serve(store=store, op_log_reader=reader)
|
||||
status, payload = self._get("/provenance?owner=didericis&repo=bot-bottle&issue=17")
|
||||
self.assertEqual(200, status)
|
||||
self.assertEqual("impl-17", payload["slug"])
|
||||
self.assertEqual(1, len(payload["ops"])) # type: ignore[arg-type]
|
||||
|
||||
def test_provenance_missing_params_400(self):
|
||||
self._serve()
|
||||
with self.assertRaises(HTTPError) as ctx:
|
||||
self._get("/provenance?owner=didericis")
|
||||
self.assertEqual(400, ctx.exception.code)
|
||||
|
||||
def test_provenance_unknown_run_404(self):
|
||||
self._serve()
|
||||
with self.assertRaises(HTTPError) as ctx:
|
||||
self._get("/provenance?owner=x&repo=y&issue=1")
|
||||
self.assertEqual(404, ctx.exception.code)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user