Compare commits

..

8 Commits

Author SHA1 Message Date
didericis-claude 7278ee1157 fix(claude): fall back to macOS Keychain for credentials
lint / lint (push) Failing after 2m10s
test / unit (pull_request) Successful in 57s
test / integration (pull_request) Successful in 22s
test / coverage (pull_request) Successful in 1m6s
On macOS, Claude Code stores credentials in the Keychain under
service "Claude Code-credentials" rather than in a file. When
~/.claude/.credentials.json is absent, shell out to:
  security find-generic-password -s "Claude Code-credentials" -w
and parse the result as the same JSON schema.

~/.claude.json holds only profile/UI metadata (oauthAccount has
no token fields). expiresAt in the credentials is milliseconds.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 21:46:53 +00:00
didericis-claude bdd352570b fix(claude): read credentials from ~/.claude/.credentials.json
lint / lint (push) Successful in 2m10s
test / unit (pull_request) Successful in 53s
test / integration (pull_request) Successful in 16s
test / coverage (pull_request) Successful in 1m8s
The actual OAuth token is in ~/.claude/.credentials.json under
claudeAiOauth.accessToken, not in ~/.claude.json.
~/.claude.json holds only UI state and profile metadata (oauthAccount
has no token fields). expiresAt in the credentials file is milliseconds,
not seconds.

Discovered after testing against Claude Code 2.1.198.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 21:33:04 +00:00
didericis-claude f0d27863c2 feat(claude): add forward_host_credentials support
lint / lint (push) Successful in 2m19s
test / unit (pull_request) Successful in 1m2s
test / integration (pull_request) Successful in 22s
test / coverage (pull_request) Successful in 1m14s
Reads the host's Claude OAuth session key from ~/.claude.json at launch
and forwards it only to the egress sidecar (never to the agent), placing
a placeholder CLAUDE_CODE_OAUTH_TOKEN in the agent env so Claude Code
starts without seeing the real credential.

Mirrors the existing Codex forward_host_credentials flow (PRD 0029).
Adds claude_auth.py to extract and validate the sessionKey, a
CLAUDE_HOST_CREDENTIAL_TOKEN_REF constant in egress.py, and updates
manifest_agent.py to allow the flag for both 'codex' and 'claude'
templates. Also adds a mutual-exclusion check that rejects setting
both auth_token and forward_host_credentials together.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 21:14:37 +00:00
didericis-claude 71699b3ecd fix: resolve pylint/pyright issues in new test files
lint / lint (push) Successful in 2m7s
test / unit (pull_request) Successful in 57s
test / integration (pull_request) Successful in 17s
test / coverage (pull_request) Successful in 1m4s
- test_contrib_gitea_client: remove unused Any import, fix _mock_response
  to use return_value instead of lambda (unknown lambda type), narrow
  HTTPError hdrs type, add type annotations to fake_urlopen helpers,
  suppress protected-access for _request tests
- test_bootstrap: annotate **kw as **kw: object, use dict literal,
  unpack server_address via index to avoid tuple type mismatch
- test_main: remove unused MagicMock import
- test_watchdog: guard store.get() result before accessing .status

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 19:47:31 +00:00
didericis-claude 57290da1e8 test: add coverage for orchestrator + gitea client (diff gate 77% → 98%)
lint / lint (push) Failing after 2m5s
test / unit (pull_request) Successful in 53s
test / integration (pull_request) Successful in 24s
test / coverage (pull_request) Successful in 1m12s
Three new unit test modules:
- tests/unit/test_contrib_gitea_client.py — GiteaClient (urllib mocked)
  and GiteaForge delegation
- tests/unit/orchestrator/test_main.py — __main__ run/status commands
- tests/unit/orchestrator/test_bootstrap.py — _token, BotBottleStateStore,
  _to_forge_state/_to_record, make_forge, make_sidecar, build

Augments to existing suites:
- test_events: non-"created" comment action ignored
- test_lifecycle: _iso_now callable, untracked-issue comment ignored,
  untracked-PR closed ignored (covers _find_by_pr return-None path)
- test_runner: destroy command, _default_run via subprocess mock
- test_sidecar: _jsonable dataclass/list branches, OpLog.read on missing
  file, drain_done_events on corrupted file, socket _Handler invalid-JSON
  and empty-line paths, serve() with pre-existing socket path
- test_watchdog: _loop body covered by patching _TICK_SECS to 0.01s
- test_webhook: unknown GET path returns 404

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 19:35:30 +00:00
didericis-claude df1f0e8f70 docs: mark fold-orchestrator PRD as Active
lint / lint (push) Successful in 2m4s
test / unit (pull_request) Successful in 56s
test / integration (pull_request) Successful in 22s
test / coverage (pull_request) Failing after 1m7s
2026-07-01 17:18:38 +00:00
didericis-claude 314dc03b0d feat: fold bot-bottle-orchestrator into bot_bottle/orchestrator subpackage
Moves the orchestrator into bot_bottle/orchestrator/ so one install gets
everything. Entry point is now `python -m bot_bottle.orchestrator run`.

- Add bot_bottle/orchestrator/ with all 14 modules (verbatim move; internal
  imports were already relative, so no changes inside orchestrator modules)
- Rewrite bootstrap.py: remove the lazy bot_bottle import guard, use direct
  relative imports from ..contrib.*
- Add bot_bottle/contrib/forge/base.py: ScopedForge (read-anywhere / write-scoped)
- Add bot_bottle/contrib/gitea/client.py: GiteaClient + GiteaForge (urllib.request only)
- Add bot_bottle/contrib/gitea/forge_state.py: ForgeState + SqliteForgeStateStore
- Add tests/unit/orchestrator/ (82 tests: 63 migrated + 19 new for contrib modules)

Closes #321
2026-07-01 17:18:28 +00:00
didericis-claude 06025687ed docs: add PRD for folding orchestrator into bot-bottle subpackage 2026-07-01 17:14:43 +00:00
46 changed files with 3888 additions and 13 deletions
+4
View File
@@ -45,6 +45,10 @@ PROVIDER_TEMPLATES = frozenset({PROVIDER_CLAUDE, PROVIDER_CODEX, PROVIDER_PI})
# forward_host_credentials is enabled. Pipelock must pass these through # forward_host_credentials is enabled. Pipelock must pass these through
# (no TLS MITM) or its header DLP blocks the injected JWT. # (no TLS MITM) or its header DLP blocks the injected JWT.
CODEX_HOST_CREDENTIAL_HOSTS = ("api.openai.com", "chatgpt.com") CODEX_HOST_CREDENTIAL_HOSTS = ("api.openai.com", "chatgpt.com")
# Host that egress injects the host Claude bearer on when Claude
# forward_host_credentials is enabled.
CLAUDE_HOST_CREDENTIAL_HOSTS = ("api.anthropic.com",)
PromptMode = Literal[ PromptMode = Literal[
"append_file", "append_file",
"read_prompt_file", "read_prompt_file",
+17 -5
View File
@@ -23,8 +23,9 @@ from ...agent_provider import (
provider_startup_args, provider_startup_args,
) )
from ...backend.docker import util as docker_mod from ...backend.docker import util as docker_mod
from ...egress import EgressRoute from ...egress import CLAUDE_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
from ...log import die, info, warn from ...log import die, info, warn
from .claude_auth import claude_host_access_token
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -115,7 +116,6 @@ class ClaudeAgentProvider(AgentProvider):
color: str = "", color: str = "",
provider_settings: dict[str, object] | None = None, provider_settings: dict[str, object] | None = None,
) -> AgentProvisionPlan: ) -> AgentProvisionPlan:
del forward_host_credentials, host_env
resolved_guest_env = dict(guest_env or {}) resolved_guest_env = dict(guest_env or {})
startup_args = provider_startup_args(provider_settings) startup_args = provider_startup_args(provider_settings)
guest_home = self.guest_home guest_home = self.guest_home
@@ -177,13 +177,24 @@ class ClaudeAgentProvider(AgentProvider):
claude_settings, claude_settings,
f"{guest_home}/.claude/settings.json", f"{guest_home}/.claude/settings.json",
)) ))
provisioned_env: dict[str, str] = {}
if forward_host_credentials:
_host_env = host_env or dict(os.environ)
provisioned_env[CLAUDE_HOST_CREDENTIAL_TOKEN_REF] = (
claude_host_access_token(_host_env)
)
cred_token_ref = (
CLAUDE_HOST_CREDENTIAL_TOKEN_REF if forward_host_credentials
else auth_token
)
egress_routes = (EgressRoute( egress_routes = (EgressRoute(
host="api.anthropic.com", host="api.anthropic.com",
auth_scheme="Bearer" if auth_token else "", auth_scheme="Bearer" if (auth_token or forward_host_credentials) else "",
token_ref=auth_token, token_ref=cred_token_ref,
),) ),)
hidden_env_names: frozenset[str] = frozenset() hidden_env_names: frozenset[str] = frozenset()
if auth_token: if auth_token or forward_host_credentials:
env_vars["CLAUDE_CODE_OAUTH_TOKEN"] = "egress-placeholder" env_vars["CLAUDE_CODE_OAUTH_TOKEN"] = "egress-placeholder"
hidden_env_names = frozenset({"CLAUDE_CODE_OAUTH_TOKEN"}) hidden_env_names = frozenset({"CLAUDE_CODE_OAUTH_TOKEN"})
@@ -205,6 +216,7 @@ class ClaudeAgentProvider(AgentProvider):
files=tuple(files), files=tuple(files),
egress_routes=egress_routes, egress_routes=egress_routes,
hidden_env_names=hidden_env_names, hidden_env_names=hidden_env_names,
provisioned_env=provisioned_env,
) )
def provision_skills(self, plan: "BottlePlan", bottle: "Bottle") -> None: def provision_skills(self, plan: "BottlePlan", bottle: "Bottle") -> None:
+114
View File
@@ -0,0 +1,114 @@
"""Host Claude auth helpers.
Reads the host's Claude Code credentials and returns only the access
token needed by egress. Does not expose refresh tokens or raw payloads.
Credential storage by platform:
Linux — ~/.claude/.credentials.json
macOS — macOS Keychain, service "Claude Code-credentials"
(file path is tried first; Keychain is the fallback)
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from ...log import die
_KEYCHAIN_SERVICE = "Claude Code-credentials"
def claude_auth_path(host_env: dict[str, str] | None = None) -> Path:
env = os.environ if host_env is None else host_env
home = env.get("HOME")
if home:
return Path(home) / ".claude" / ".credentials.json"
return Path.home() / ".claude" / ".credentials.json"
def _read_keychain() -> dict[str, object] | None:
"""Try the macOS Keychain. Returns parsed JSON dict or None."""
if sys.platform != "darwin":
return None
try:
result = subprocess.run(
["security", "find-generic-password", "-s", _KEYCHAIN_SERVICE, "-w"],
capture_output=True,
text=True,
timeout=10,
)
except (FileNotFoundError, subprocess.TimeoutExpired):
return None
if result.returncode != 0 or not result.stdout.strip():
return None
try:
raw = json.loads(result.stdout.strip())
except json.JSONDecodeError:
return None
return raw if isinstance(raw, dict) else None
def claude_host_access_token(
host_env: dict[str, str] | None = None,
*,
now: datetime | None = None,
) -> str:
path = claude_auth_path(host_env)
raw: dict[str, object] | None = None
if path.is_file():
try:
raw = json.loads(path.read_text())
except (OSError, json.JSONDecodeError) as e:
die(f"claude host credentials: could not read valid JSON at {path}: {e}")
if not isinstance(raw, dict):
die(f"claude host credentials: {path} must contain a JSON object")
else:
raw = _read_keychain()
if raw is None:
die(
f"claude host credentials: auth file missing at {path} and "
f"macOS Keychain lookup for '{_KEYCHAIN_SERVICE}' failed. "
"Run `claude login` on the host or disable "
"agent_provider.forward_host_credentials."
)
oauth = raw.get("claudeAiOauth")
if not isinstance(oauth, dict):
die(
"claude host credentials: claudeAiOauth is missing from credentials. "
"Run `claude login` on the host or disable "
"agent_provider.forward_host_credentials."
)
access_token = oauth.get("accessToken")
if not isinstance(access_token, str) or not access_token:
die(
"claude host credentials: claudeAiOauth.accessToken is missing or empty. "
"Run `claude login` on the host and restart the bottle."
)
# expiresAt is in milliseconds
expires_at = oauth.get("expiresAt")
if isinstance(expires_at, (int, float)):
check_now = now or datetime.now(timezone.utc)
exp_dt = datetime.fromtimestamp(float(expires_at) / 1000.0, timezone.utc)
if exp_dt <= check_now:
die(
"claude host credentials: host Claude access token is expired. "
"Run `claude login` on the host and restart the bottle."
)
return access_token
__all__ = [
"claude_auth_path",
"claude_host_access_token",
]
+52
View File
@@ -0,0 +1,52 @@
"""Scoped forge wrapper: read-anywhere / write-scoped access control.
`ScopedForge` wraps any forge object and restricts write operations to
the set of issue/PR numbers the agent is explicitly assigned to. Read
operations always pass through unconditionally.
"""
from __future__ import annotations
from typing import Any
class ScopedForge:
"""Delegates all forge calls to an inner forge, raising `PermissionError`
on write calls for numbers outside the assigned scope."""
def __init__(
self,
forge: Any,
*,
assigned_issue: int,
assigned_prs: list[int],
) -> None:
self._forge = forge
self._allowed_writes: frozenset[int] = frozenset({assigned_issue, *assigned_prs})
def _check_write(self, number: int) -> None:
if number not in self._allowed_writes:
raise PermissionError(
f"write to #{number} is outside the assigned scope "
f"(allowed: {sorted(self._allowed_writes)})"
)
def is_org_member(self, org: str, username: str) -> bool:
return self._forge.is_org_member(org, username)
def read_issue(self, number: int) -> dict[str, Any]:
return self._forge.read_issue(number)
def read_pr(self, number: int) -> dict[str, Any]:
return self._forge.read_pr(number)
def read_comments(self, number: int) -> list[dict[str, Any]]:
return self._forge.read_comments(number)
def post_comment(self, number: int, body: str) -> None:
self._check_write(number)
self._forge.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._check_write(number)
self._forge.update_description(number, body)
+112
View File
@@ -0,0 +1,112 @@
"""Gitea API client and forge adapter (PRD prd-new: fold orchestrator).
`GiteaClient` is a thin HTTP wrapper (stdlib `urllib.request` only — no
new runtime dependencies). `GiteaForge` composes a client and exposes
the forge protocol used by the orchestrator's sidecar and lifecycle.
Required Gitea token scopes:
- Repository: Read & Write (issues, comments, PR descriptions)
- Organization: Read (org membership check)
"""
from __future__ import annotations
import json
import urllib.error
import urllib.request
from typing import Any
_TIMEOUT_SECS = 30
class GiteaClient:
"""Low-level HTTP wrapper for the Gitea REST API."""
def __init__(
self, *, api_url: str, owner: str, repo: str, token: str
) -> None:
self._base = api_url.rstrip("/")
self._owner = owner
self._repo = repo
self._headers = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
def _request(
self,
method: str,
path: str,
body: dict[str, Any] | None = None,
) -> Any:
url = f"{self._base}{path}"
data = json.dumps(body).encode() if body is not None else None
req = urllib.request.Request(
url, data=data, headers=self._headers, method=method
)
with urllib.request.urlopen(req, timeout=_TIMEOUT_SECS) as resp:
raw = resp.read()
return json.loads(raw) if raw else None
def is_org_member(self, org: str, username: str) -> bool:
url = f"{self._base}/orgs/{org}/members/{username}"
req = urllib.request.Request(url, headers=self._headers, method="GET")
try:
urllib.request.urlopen(req, timeout=_TIMEOUT_SECS).close()
return True
except urllib.error.HTTPError:
return False
def get_issue(self, number: int) -> dict[str, Any]:
return self._request("GET", f"/repos/{self._owner}/{self._repo}/issues/{number}")
def get_pull(self, number: int) -> dict[str, Any]:
return self._request("GET", f"/repos/{self._owner}/{self._repo}/pulls/{number}")
def list_comments(self, number: int) -> list[dict[str, Any]]:
return self._request("GET", f"/repos/{self._owner}/{self._repo}/issues/{number}/comments")
def create_comment(self, number: int, body: str) -> None:
self._request(
"POST",
f"/repos/{self._owner}/{self._repo}/issues/{number}/comments",
{"body": body},
)
def update_issue(self, number: int, body: str) -> None:
self._request(
"PATCH",
f"/repos/{self._owner}/{self._repo}/issues/{number}",
{"body": body},
)
class GiteaForge:
"""Adapts `GiteaClient` to the forge protocol expected by the orchestrator.
The forge protocol is duck-typed: any object with `is_org_member`,
`read_issue`, `read_pr`, `read_comments`, `post_comment`, and
`update_description` methods satisfies it.
"""
def __init__(self, client: GiteaClient) -> None:
self._client = client
def is_org_member(self, org: str, username: str) -> bool:
return self._client.is_org_member(org, username)
def read_issue(self, number: int) -> dict[str, Any]:
return self._client.get_issue(number)
def read_pr(self, number: int) -> dict[str, Any]:
return self._client.get_pull(number)
def read_comments(self, number: int) -> list[dict[str, Any]]:
return self._client.list_comments(number)
def post_comment(self, number: int, body: str) -> None:
self._client.create_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._client.update_issue(number, body)
+137
View File
@@ -0,0 +1,137 @@
"""Forge state persistence for the orchestrator (PRD prd-new: fold orchestrator).
`ForgeState` is a dataclass that mirrors the orchestrator's `RunRecord`
field-for-field, held here so the store implementation is in bot-bottle
where the Gitea contrib lives.
`SqliteForgeStateStore` backs it with a single SQLite table. The DB path
is optional; passing `None` uses `:memory:` (useful for tests and status
commands that don't need persistence).
"""
from __future__ import annotations
import json
import sqlite3
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class ForgeState:
"""Persisted state for one forge-targeted issue's bottle lifecycle."""
owner: str
repo: str
issue_number: int
slug: str
agent_name: str
bottle_names: list[str] = field(default_factory=list)
backend_name: str = ""
agent_git_user: str = ""
pr_number: int | None = None
status: str = ""
last_checkin_at: str = ""
_DDL = """
CREATE TABLE IF NOT EXISTS forge_state (
owner TEXT NOT NULL,
repo TEXT NOT NULL,
issue_number INTEGER NOT NULL,
slug TEXT NOT NULL,
agent_name TEXT NOT NULL,
bottle_names TEXT NOT NULL DEFAULT '[]',
backend_name TEXT NOT NULL DEFAULT '',
agent_git_user TEXT NOT NULL DEFAULT '',
pr_number INTEGER,
status TEXT NOT NULL DEFAULT '',
last_checkin_at TEXT NOT NULL DEFAULT '',
PRIMARY KEY (owner, repo, issue_number)
)
"""
class SqliteForgeStateStore:
"""SQLite-backed `ForgeState` store.
Thread-safety: a single connection is used; callers that share a
store across threads must serialise access externally.
"""
def __init__(self, db_path: Path | None) -> None:
path = str(db_path) if db_path is not None else ":memory:"
self._conn = sqlite3.connect(path, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
self._conn.execute(_DDL)
self._conn.commit()
def upsert(self, state: ForgeState) -> None:
self._conn.execute(
"""
INSERT INTO forge_state
(owner, repo, issue_number, slug, agent_name,
bottle_names, backend_name, agent_git_user,
pr_number, status, last_checkin_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(owner, repo, issue_number) DO UPDATE SET
slug = excluded.slug,
agent_name = excluded.agent_name,
bottle_names = excluded.bottle_names,
backend_name = excluded.backend_name,
agent_git_user = excluded.agent_git_user,
pr_number = excluded.pr_number,
status = excluded.status,
last_checkin_at = excluded.last_checkin_at
""",
(
state.owner,
state.repo,
state.issue_number,
state.slug,
state.agent_name,
json.dumps(state.bottle_names),
state.backend_name,
state.agent_git_user,
state.pr_number,
state.status,
state.last_checkin_at,
),
)
self._conn.commit()
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
row = self._conn.execute(
"SELECT * FROM forge_state WHERE owner=? AND repo=? AND issue_number=?",
(owner, repo, issue_number),
).fetchone()
return _row_to_state(row) if row is not None else None
def delete(self, owner: str, repo: str, issue_number: int) -> None:
self._conn.execute(
"DELETE FROM forge_state WHERE owner=? AND repo=? AND issue_number=?",
(owner, repo, issue_number),
)
self._conn.commit()
def all(self) -> list[ForgeState]:
rows = self._conn.execute(
"SELECT * FROM forge_state ORDER BY owner, repo, issue_number"
).fetchall()
return [_row_to_state(r) for r in rows]
def _row_to_state(row: sqlite3.Row) -> ForgeState:
return ForgeState(
owner=row["owner"],
repo=row["repo"],
issue_number=row["issue_number"],
slug=row["slug"],
agent_name=row["agent_name"],
bottle_names=json.loads(row["bottle_names"]),
backend_name=row["backend_name"],
agent_git_user=row["agent_git_user"],
pr_number=row["pr_number"],
status=row["status"],
last_checkin_at=row["last_checkin_at"],
)
+2
View File
@@ -29,6 +29,7 @@ if TYPE_CHECKING:
from .manifest import ManifestBottle from .manifest import ManifestBottle
CODEX_HOST_CREDENTIAL_TOKEN_REF = "BOT_BOTTLE_CODEX_HOST_ACCESS_TOKEN" CODEX_HOST_CREDENTIAL_TOKEN_REF = "BOT_BOTTLE_CODEX_HOST_ACCESS_TOKEN"
CLAUDE_HOST_CREDENTIAL_TOKEN_REF = "BOT_BOTTLE_CLAUDE_HOST_ACCESS_TOKEN"
EGRESS_HOSTNAME = "egress" EGRESS_HOSTNAME = "egress"
@@ -397,6 +398,7 @@ class Egress(ABC):
) )
__all__ = [ __all__ = [
"CLAUDE_HOST_CREDENTIAL_TOKEN_REF",
"CODEX_HOST_CREDENTIAL_TOKEN_REF", "CODEX_HOST_CREDENTIAL_TOKEN_REF",
"EGRESS_HOSTNAME", "EGRESS_HOSTNAME",
"EGRESS_ROUTES_FILENAME", "EGRESS_ROUTES_FILENAME",
+10 -4
View File
@@ -25,8 +25,9 @@ class ManifestAgentProvider:
header, and sets a placeholder CLAUDE_CODE_OAUTH_TOKEN in the agent header, and sets a placeholder CLAUDE_CODE_OAUTH_TOKEN in the agent
so the Claude Code CLI starts. so the Claude Code CLI starts.
`forward_host_credentials` forwards the host Codex auth token into `forward_host_credentials` forwards the host provider auth token into
the egress sidecar (Codex only). the egress sidecar (Codex and Claude). For Codex this reads
`~/.codex/auth.json`; for Claude it reads `~/.claude.json`.
""" """
template: str = "claude" template: str = "claude"
@@ -92,10 +93,15 @@ class ManifestAgentProvider:
f"is only supported for built-in templates " f"is only supported for built-in templates "
f"({', '.join(sorted(PROVIDER_TEMPLATES))})" f"({', '.join(sorted(PROVIDER_TEMPLATES))})"
) )
if forward_host_credentials and template != "codex": if forward_host_credentials and template not in {"codex", "claude"}:
raise ManifestError( raise ManifestError(
f"bottle '{bottle_name}' agent_provider.forward_host_credentials " f"bottle '{bottle_name}' agent_provider.forward_host_credentials "
"is currently only supported for template 'codex'" "is only supported for templates 'codex' and 'claude'"
)
if forward_host_credentials and auth_token:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.forward_host_credentials "
"and auth_token both set; use one or the other"
) )
settings = _parse_provider_settings(bottle_name, template, d.get("settings")) settings = _parse_provider_settings(bottle_name, template, d.get("settings"))
return cls( return cls(
+8
View File
@@ -0,0 +1,8 @@
"""bot-bottle-orchestrator: forge-native orchestration for bot-bottle.
The package is stdlib-only. The core (events, targeting, lifecycle,
watchdog, sidecar, webhook) depends on its collaborators — a forge, a
state store, a bottle runner — through duck-typed interfaces, so it runs
and tests without bot-bottle installed. `bootstrap` is the single module
that imports `bot_bottle` and wires the concrete implementations.
"""
+51
View File
@@ -0,0 +1,51 @@
"""CLI entry point: `python -m bot_bottle.orchestrator <command>`.
Commands:
run start the webhook server + watchdog + done-signal relay
status print the tracked runs (issue -> slug, status)
"""
from __future__ import annotations
import argparse
import sys
from .config import Config
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(prog="python -m bot_bottle.orchestrator")
sub = parser.add_subparsers(dest="command", required=True)
sub.add_parser("run", help="start the webhook server, watchdog, and relay")
sub.add_parser("status", help="list tracked runs")
args = parser.parse_args(argv)
config = Config.from_env()
if args.command == "run":
from . import bootstrap # pylint: disable=import-outside-toplevel
print(
f"orchestrator listening on "
f"http://{config.webhook_host}:{config.webhook_port}/webhook",
file=sys.stderr,
)
bootstrap.run(config)
return 0
if args.command == "status":
from .bootstrap import ( # pylint: disable=import-outside-toplevel
BotBottleStateStore,
)
store = BotBottleStateStore(config.db_path)
for r in store.all():
pr = f"PR#{r.pr_number}" if r.pr_number else "-"
print(f"{r.owner}/{r.repo}#{r.issue_number}\t{r.slug}\t{r.status}\t{pr}")
return 0
return 2
if __name__ == "__main__":
sys.exit(main())
+155
View File
@@ -0,0 +1,155 @@
"""Wire the concrete bot-bottle implementations into the core.
This is the ONLY module that imports from `bot_bottle.contrib`. It adapts
`SqliteForgeStateStore` to our `StateStore`, builds `GiteaForge`s (and
scope-wrapped forges for sidecars), constructs the `Orchestrator`, and
runs the webhook server + watchdog + done-signal relay.
Imports are direct (no lazy loading) because the orchestrator is now part
of the same package installation.
"""
from __future__ import annotations
import os
import threading
from pathlib import Path
from typing import Any
from ..contrib.forge.base import ScopedForge
from ..contrib.gitea.client import GiteaClient, GiteaForge
from ..contrib.gitea.forge_state import ForgeState, SqliteForgeStateStore
from .config import Config
from .lifecycle import Orchestrator
from .model import RunRecord
from .runner import SubprocessBottleRunner
from .sidecar import ForgeSidecar, OpLog, drain_done_events
from .watchdog import Watchdog
from .webhook import WebhookServer
_RELAY_TICK_SECS = 2.0
def _token() -> str:
tok = os.environ.get("GITEA_TOKEN") or os.environ.get("FORGE_GITEA_TOKEN")
if not tok:
raise RuntimeError("set GITEA_TOKEN (or FORGE_GITEA_TOKEN)")
return tok
class BotBottleStateStore:
"""Adapts `SqliteForgeStateStore` to our `StateStore`, translating
`RunRecord` <-> `ForgeState` field-for-field."""
def __init__(self, db_path: Path | None) -> None:
self._inner = SqliteForgeStateStore(db_path)
def upsert(self, record: RunRecord) -> None:
self._inner.upsert(_to_forge_state(record))
def get(self, owner: str, repo: str, issue_number: int) -> RunRecord | None:
state = self._inner.get(owner, repo, issue_number)
return _to_record(state) if state is not None else None
def delete(self, owner: str, repo: str, issue_number: int) -> None:
self._inner.delete(owner, repo, issue_number)
def all(self) -> list[RunRecord]:
return [_to_record(s) for s in self._inner.all()]
def _to_forge_state(r: RunRecord) -> ForgeState:
return ForgeState(
owner=r.owner, repo=r.repo, issue_number=r.issue_number, slug=r.slug,
agent_name=r.agent_name, bottle_names=list(r.bottle_names),
backend_name=r.backend_name, agent_git_user=r.agent_git_user,
pr_number=r.pr_number, status=r.status, last_checkin_at=r.last_checkin_at,
)
def _to_record(s: ForgeState) -> RunRecord:
return RunRecord(
owner=s.owner, repo=s.repo, issue_number=s.issue_number, slug=s.slug,
agent_name=s.agent_name, bottle_names=list(s.bottle_names),
backend_name=s.backend_name, agent_git_user=s.agent_git_user,
pr_number=s.pr_number, status=s.status, last_checkin_at=s.last_checkin_at,
)
def make_forge(config: Config, owner: str, repo: str) -> Any:
"""A `GiteaForge` bound to one repo."""
client = GiteaClient(
api_url=config.gitea_api, owner=owner, repo=repo, token=_token()
)
return GiteaForge(client)
def make_sidecar(
config: Config, owner: str, repo: str, issue_number: int, assigned_prs: list[int]
) -> ForgeSidecar:
"""A scope-enforced sidecar for one run (read-anywhere / write-scoped)."""
scoped = ScopedForge(
make_forge(config, owner, repo),
assigned_issue=issue_number,
assigned_prs=assigned_prs,
)
op_log = OpLog(config.queue_dir / f"{owner}-{repo}-{issue_number}.oplog.jsonl")
return ForgeSidecar(
forge=scoped,
op_log=op_log,
queue_dir=config.queue_dir,
run_key=(owner, repo, issue_number),
)
def build(config: Config) -> tuple[WebhookServer, Watchdog, Orchestrator]:
store = BotBottleStateStore(config.db_path)
runner = SubprocessBottleRunner(cli=config.bot_bottle_cli, base_env=dict(os.environ))
membership_forge = make_forge(config, "_", "_")
orchestrator = Orchestrator(
forge=membership_forge,
store=store,
runner=runner,
org=config.forge_org,
gitea_api=config.gitea_api,
forge_env_base={
"GITEA_TOKEN": _token(),
"FORGE_QUEUE_DIR": str(config.queue_dir),
"FORGE_SIDECAR_SOCKET": str(config.sidecar_socket),
},
)
watchdog = Watchdog(
store=store, runner=runner, timeout_secs=config.watchdog_timeout_secs
)
server = WebhookServer(
(config.webhook_host, config.webhook_port),
orchestrator=orchestrator,
store=store,
)
return server, watchdog, orchestrator
def _relay_loop(config: Config, orchestrator: Orchestrator, stop: threading.Event) -> None:
while not stop.wait(_RELAY_TICK_SECS):
for ev in drain_done_events(config.queue_dir):
orchestrator.on_done_signal(
ev["owner"], ev["repo"], int(ev["issue_number"]),
str(ev.get("status", "")), str(ev.get("summary", "")),
)
def run(config: Config) -> None:
"""Blocking run: webhook server + watchdog + done-signal relay."""
server, watchdog, orchestrator = build(config)
watchdog.start()
stop = threading.Event()
relay = threading.Thread(
target=_relay_loop, args=(config, orchestrator, stop), daemon=True
)
relay.start()
try:
server.serve_forever()
finally:
stop.set()
watchdog.stop()
server.server_close()
+52
View File
@@ -0,0 +1,52 @@
"""Configuration, loaded from the environment (stdlib `os` only).
Everything the orchestrator needs to run is an env var so a deploy is a
process with an environment, no config file to manage. `FORGE_*` names
match the bot-bottle forge-native PRD.
"""
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
# The label that marks an issue as agent-targeted: `bot-bottle:<agent>`.
LABEL_PREFIX = "bot-bottle:"
# Optional bottle override: `bot-bottle-bottle:<name>`.
BOTTLE_LABEL_PREFIX = "bot-bottle-bottle:"
@dataclass(frozen=True)
class Config:
"""Resolved orchestrator configuration."""
forge_org: str
gitea_api: str
watchdog_timeout_secs: int
webhook_host: str
webhook_port: int
bot_bottle_cli: str
queue_dir: Path
sidecar_socket: Path
db_path: Path | None
@staticmethod
def from_env(env: dict[str, str] | None = None) -> "Config":
e = os.environ if env is None else env
home = Path(e.get("HOME", str(Path.home())))
default_root = home / ".bot-bottle"
db = e.get("FORGE_DB_PATH")
return Config(
forge_org=e.get("FORGE_ORG", "bot-bottle"),
gitea_api=e.get("FORGE_GITEA_API", ""),
watchdog_timeout_secs=int(e.get("FORGE_WATCHDOG_TIMEOUT", "1800")),
webhook_host=e.get("FORGE_WEBHOOK_HOST", "127.0.0.1"),
webhook_port=int(e.get("FORGE_WEBHOOK_PORT", "8477")),
bot_bottle_cli=e.get("BOT_BOTTLE_CLI", "cli.py"),
queue_dir=Path(e.get("FORGE_QUEUE_DIR", str(default_root / "forge-queue"))),
sidecar_socket=Path(
e.get("FORGE_SIDECAR_SOCKET", str(default_root / "forge-sidecar.sock"))
),
db_path=Path(db) if db else None,
)
+85
View File
@@ -0,0 +1,85 @@
"""Parse Gitea webhook payloads into typed `ForgeEvent`s.
Only the fields the orchestrator acts on are extracted; unknown payloads
and event types return None so the webhook layer can ignore them.
Gitea sends the event kind in the `X-Gitea-Event` header and the payload
as JSON. The relevant kinds:
- `issues` with `action == "assigned"` -> IssueAssigned
- `issue_comment` with `action == "created"` -> CommentCreated
- `pull_request` with `action == "closed"` -> PullRequestClosed
"""
from __future__ import annotations
from typing import Any
from .model import CommentCreated, ForgeEvent, IssueAssigned, PullRequestClosed
def _repo_owner(payload: dict[str, Any]) -> tuple[str, str]:
repo = payload.get("repository") or {}
owner = (repo.get("owner") or {}).get("login", "")
return str(owner), str(repo.get("name", ""))
def parse_event(event_kind: str, payload: dict[str, Any]) -> ForgeEvent | None:
"""Map (X-Gitea-Event, payload) to a `ForgeEvent`, or None to ignore."""
if event_kind == "issues":
return _parse_issue(payload)
if event_kind == "issue_comment":
return _parse_comment(payload)
if event_kind == "pull_request":
return _parse_pull_request(payload)
return None
def _parse_issue(payload: dict[str, Any]) -> IssueAssigned | None:
if payload.get("action") != "assigned":
return None
owner, repo = _repo_owner(payload)
issue = payload.get("issue") or {}
assignees = tuple(
str(a.get("login", "")) for a in (issue.get("assignees") or [])
)
labels = tuple(str(l.get("name", "")) for l in (issue.get("labels") or []))
return IssueAssigned(
owner=owner,
repo=repo,
issue_number=int(issue.get("number", 0)),
title=str(issue.get("title", "")),
body=str(issue.get("body", "") or ""),
assignees=assignees,
labels=labels,
)
def _parse_comment(payload: dict[str, Any]) -> CommentCreated | None:
if payload.get("action") != "created":
return None
owner, repo = _repo_owner(payload)
issue = payload.get("issue") or {}
comment = payload.get("comment") or {}
return CommentCreated(
owner=owner,
repo=repo,
issue_number=int(issue.get("number", 0)),
comment_id=int(comment.get("id", 0)),
author=str((comment.get("user") or {}).get("login", "")),
body=str(comment.get("body", "") or ""),
is_pull=bool(issue.get("pull_request")),
)
def _parse_pull_request(payload: dict[str, Any]) -> PullRequestClosed | None:
if payload.get("action") != "closed":
return None
owner, repo = _repo_owner(payload)
pr = payload.get("pull_request") or {}
return PullRequestClosed(
owner=owner,
repo=repo,
pr_number=int(pr.get("number", 0)),
merged=bool(pr.get("merged", False)),
)
+180
View File
@@ -0,0 +1,180 @@
"""The orchestration lifecycle: forge events -> bottle transitions.
`Orchestrator.handle(event)` is the single entry point the webhook layer
calls. `on_done_signal(...)` is called by the sidecar relay when an agent
signals completion. All collaborators (forge, store, runner) are
injected and duck-typed; `now` and `label_for` are injectable for tests.
Transitions:
IssueAssigned (targeted, new) -> start bottle, record = running
signal_done (running) -> freeze bottle, record = frozen
CommentCreated (frozen) -> resume bottle, record = running
PullRequestClosed (tracked) -> destroy bottle, record removed
"""
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime
from .model import (
STATUS_DESTROYED,
STATUS_FROZEN,
STATUS_RUNNING,
CommentCreated,
ForgeEvent,
IssueAssigned,
PullRequestClosed,
RunRecord,
)
from .runner import BottleRunner
from .store import StateStore
from .targeting import Membership, Target, resolve_target
def _iso_now() -> str:
return datetime.now().astimezone().isoformat(timespec="seconds")
def _default_label(agent: str, event: IssueAssigned) -> str:
# Embed the issue identity so slugs are unique per issue and never
# get renamed on collision.
return f"{agent}-{event.owner}-{event.repo}-{event.issue_number}"
class Orchestrator:
def __init__(
self,
*,
forge: Membership,
store: StateStore,
runner: BottleRunner,
org: str,
gitea_api: str = "",
forge_env_base: dict[str, str] | None = None,
now: Callable[[], str] = _iso_now,
label_for: Callable[[str, IssueAssigned], str] = _default_label,
) -> None:
self._forge = forge
self._store = store
self._runner = runner
self._org = org
self._gitea_api = gitea_api
self._forge_env_base = forge_env_base or {}
self._now = now
self._label_for = label_for
# --- entry points ------------------------------------------------------
def handle(self, event: ForgeEvent) -> None:
if isinstance(event, IssueAssigned):
self._on_issue_assigned(event)
elif isinstance(event, CommentCreated):
self._on_comment(event)
else:
self._on_pr_closed(event)
def on_done_signal( # pylint: disable=unused-argument
self, owner: str, repo: str, issue_number: int, status: str, summary: str
) -> None:
"""Sidecar relay: an agent signalled completion. Freeze the bottle.
`status`/`summary` are recorded by provenance (via the op log), not
acted on here."""
record = self._store.get(owner, repo, issue_number)
if record is None or record.status != STATUS_RUNNING:
return
self._runner.freeze(record.slug)
record.status = STATUS_FROZEN
record.last_checkin_at = self._now()
self._store.upsert(record)
def link_pr(self, owner: str, repo: str, issue_number: int, pr_number: int) -> None:
"""Record the PR a tracked issue produced, so PR comments and the
PR-close event route back to this record."""
record = self._store.get(owner, repo, issue_number)
if record is not None:
record.pr_number = pr_number
self._store.upsert(record)
# --- handlers ----------------------------------------------------------
def _on_issue_assigned(self, event: IssueAssigned) -> None:
target = resolve_target(event, self._forge, self._org)
if target is None:
return
# Idempotent: a webhook redelivery must not launch a second bottle.
if self._store.get(event.owner, event.repo, event.issue_number) is not None:
return
self._launch(event, target)
def _launch(self, event: IssueAssigned, target: Target) -> None:
label = self._label_for(target.agent_name, event)
bottles = [target.bottle_override] if target.bottle_override else []
result = self._runner.start(
agent=target.agent_name,
bottles=bottles,
label=label,
prompt=event.body,
forge_env=self._forge_env(event.owner, event.repo, event.issue_number),
)
self._store.upsert(
RunRecord(
owner=event.owner,
repo=event.repo,
issue_number=event.issue_number,
slug=result.slug,
agent_name=target.agent_name,
bottle_names=bottles,
status=STATUS_RUNNING,
last_checkin_at=self._now(),
)
)
def _on_comment(self, event: CommentCreated) -> None:
record = self._route_comment(event)
if record is None or record.status != STATUS_FROZEN:
return
# Echo-loop guard: ignore the agent's own comments.
if record.agent_git_user and event.author == record.agent_git_user:
return
self._runner.resume(record.slug, event.body)
record.status = STATUS_RUNNING
record.last_checkin_at = self._now()
self._store.upsert(record)
def _route_comment(self, event: CommentCreated) -> RunRecord | None:
# A comment on the issue routes by issue number; a comment on a PR
# routes by the recorded pr_number.
direct = self._store.get(event.owner, event.repo, event.issue_number)
if direct is not None:
return direct
if event.is_pull:
return self._find_by_pr(event.owner, event.repo, event.issue_number)
return None
def _on_pr_closed(self, event: PullRequestClosed) -> None:
record = self._find_by_pr(event.owner, event.repo, event.pr_number)
if record is None:
return
self._runner.destroy(record.slug)
record.status = STATUS_DESTROYED
self._store.delete(record.owner, record.repo, record.issue_number)
def _find_by_pr(self, owner: str, repo: str, pr_number: int) -> RunRecord | None:
for record in self._store.all():
if (
record.owner == owner
and record.repo == repo
and record.pr_number == pr_number
):
return record
return None
def _forge_env(self, owner: str, repo: str, issue_number: int) -> dict[str, str]:
env = dict(self._forge_env_base)
if self._gitea_api:
env["FORGE_GITEA_API"] = self._gitea_api
env["FORGE_OWNER"] = owner
env["FORGE_REPO"] = repo
env["FORGE_ISSUE_NUMBER"] = str(issue_number)
return env
+108
View File
@@ -0,0 +1,108 @@
"""Domain model: run records, forge events, provenance.
These are the orchestrator's own dataclasses. `RunRecord` mirrors
bot-bottle's `ForgeState` field-for-field so the bootstrap adapter can
translate between them with no loss; keeping our own copy is what lets
the core stay import-free of bot-bottle.
"""
from __future__ import annotations
from dataclasses import dataclass, field
# Run lifecycle. A bottle is launched (running), frozen on the done
# signal, and destroyed when the PR closes.
STATUS_RUNNING = "running"
STATUS_FROZEN = "frozen"
STATUS_DESTROYED = "destroyed"
@dataclass
class RunRecord:
"""One forge-targeted issue's bottle lifecycle record."""
owner: str
repo: str
issue_number: int
slug: str
agent_name: str
bottle_names: list[str] = field(default_factory=list)
backend_name: str = ""
agent_git_user: str = ""
pr_number: int | None = None
status: str = STATUS_RUNNING
last_checkin_at: str = ""
# --- Forge events (parsed webhook payloads) --------------------------------
@dataclass(frozen=True)
class IssueAssigned:
"""An issue gained an assignee — the trigger to consider a launch."""
owner: str
repo: str
issue_number: int
title: str
body: str
assignees: tuple[str, ...]
labels: tuple[str, ...]
@dataclass(frozen=True)
class CommentCreated:
"""A comment was posted on an issue or PR — a rehydrate trigger."""
owner: str
repo: str
issue_number: int
comment_id: int
author: str
body: str
is_pull: bool
@dataclass(frozen=True)
class PullRequestClosed:
"""A PR closed (merged or not) — the teardown trigger."""
owner: str
repo: str
pr_number: int
merged: bool
# Union of everything the webhook layer can emit.
ForgeEvent = IssueAssigned | CommentCreated | PullRequestClosed
# --- Provenance ------------------------------------------------------------
@dataclass(frozen=True)
class ForgeOp:
"""One semantic forge operation the sidecar recorded."""
at: str # ISO timestamp
op: str # e.g. "post_comment", "read_pr", "signal_done"
target: int | None
detail: str
@dataclass(frozen=True)
class Provenance:
"""The audit record for one run, served by the provenance API. Never
posted into the forge."""
slug: str
owner: str
repo: str
issue_number: int
agent_name: str
bottle_names: tuple[str, ...]
started_at: str
finished_at: str
exit_code: int | None
watchdog_fired: bool
ops: tuple[ForgeOp, ...]
+71
View File
@@ -0,0 +1,71 @@
"""Provenance assembly + serialization.
Provenance is the run's audit record: the `RunRecord` metadata plus the
sidecar's semantic operation log. It is exposed through the provenance
API (see `webhook.ProvenanceHandler`) and deliberately never posted back
into the forge — a mutable PR comment is not an audit record.
This module only assembles and serializes; retention/signing of the
record is a control-plane concern out of scope here.
"""
from __future__ import annotations
from typing import Any
from .model import ForgeOp, Provenance, RunRecord
def ops_from_log(entries: list[dict[str, Any]]) -> tuple[ForgeOp, ...]:
return tuple(
ForgeOp(
at=str(e.get("at", "")),
op=str(e.get("op", "")),
target=e.get("target"),
detail=str(e.get("detail", "")),
)
for e in entries
)
def build_provenance(
record: RunRecord,
*,
ops: tuple[ForgeOp, ...],
started_at: str,
finished_at: str,
exit_code: int | None,
watchdog_fired: bool,
) -> Provenance:
return Provenance(
slug=record.slug,
owner=record.owner,
repo=record.repo,
issue_number=record.issue_number,
agent_name=record.agent_name,
bottle_names=tuple(record.bottle_names),
started_at=started_at,
finished_at=finished_at,
exit_code=exit_code,
watchdog_fired=watchdog_fired,
ops=ops,
)
def provenance_to_dict(p: Provenance) -> dict[str, Any]:
return {
"slug": p.slug,
"owner": p.owner,
"repo": p.repo,
"issue_number": p.issue_number,
"agent": p.agent_name,
"bottles": list(p.bottle_names),
"started_at": p.started_at,
"finished_at": p.finished_at,
"exit_code": p.exit_code,
"watchdog_fired": p.watchdog_fired,
"ops": [
{"at": o.at, "op": o.op, "target": o.target, "detail": o.detail}
for o in p.ops
],
}
+118
View File
@@ -0,0 +1,118 @@
"""Bottle runner: drive the bot-bottle CLI to manage a bottle's life.
`BottleRunner` is the interface the lifecycle depends on;
`SubprocessBottleRunner` shells out to the bot-bottle `cli.py`
(`start --headless`, `commit`, `resume --headless`). The subprocess
callable is injectable so tests never spawn a process.
The slug is derived from the label via `slugify`, matching bot-bottle's
container-slug rule; the orchestrator picks labels that embed the issue
identity so slugs are unique and collisions never rename them.
"""
from __future__ import annotations
import re
import subprocess
import sys
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from typing import Protocol
@dataclass(frozen=True)
class RunResult:
slug: str
exit_code: int
class BottleRunner(Protocol):
def start(
self,
*,
agent: str,
bottles: Sequence[str],
label: str,
prompt: str,
forge_env: dict[str, str],
) -> RunResult: ...
def freeze(self, slug: str) -> int: ...
def resume(self, slug: str, prompt: str) -> RunResult: ...
def destroy(self, slug: str) -> int: ...
_SLUG_RE = re.compile(r"[^a-z0-9]+")
def slugify(label: str) -> str:
"""Lowercase, collapse non-alphanumerics to single hyphens, strip
leading/trailing hyphens — matches bot-bottle's slug rule."""
return _SLUG_RE.sub("-", label.lower()).strip("-")
# A subprocess.run-shaped callable, injectable for tests.
RunFn = Callable[[Sequence[str], dict[str, str]], int]
def _default_run(argv: Sequence[str], env: dict[str, str]) -> int:
return subprocess.run(list(argv), env=env, check=False).returncode
class SubprocessBottleRunner:
"""Shells the bot-bottle CLI. `cli` is the path to `cli.py`; `python`
is the interpreter to run it with; `base_env` is the environment the
child inherits (the orchestrator's, minus per-run additions)."""
def __init__(
self,
*,
cli: str,
base_env: dict[str, str],
python: str = sys.executable,
run: RunFn = _default_run,
) -> None:
self._cli = cli
self._python = python
self._base_env = base_env
self._run = run
def _argv(self, *args: str) -> list[str]:
return [self._python, self._cli, *args]
def start(
self,
*,
agent: str,
bottles: Sequence[str],
label: str,
prompt: str,
forge_env: dict[str, str],
) -> RunResult:
argv = self._argv(
"start", agent, "--headless", "--label", label, "--prompt", prompt
)
for bottle in bottles:
argv += ["--bottle", bottle]
code = self._run(argv, {**self._base_env, **forge_env})
return RunResult(slug=slugify(label), exit_code=code)
def freeze(self, slug: str) -> int:
# bot-bottle's `commit` snapshots a running bottle's state.
return self._run(self._argv("commit", slug), self._base_env)
def resume(self, slug: str, prompt: str) -> RunResult:
code = self._run(
self._argv("resume", slug, "--headless", "--prompt", prompt),
self._base_env,
)
return RunResult(slug=slug, exit_code=code)
def destroy(self, slug: str) -> int:
# NOTE: bot-bottle `cleanup` currently targets all bottles; a
# per-slug teardown command is a known integration follow-up
# (tracked in docs/JOURNAL.md). Kept behind this method so the
# call site does not change when that lands.
return self._run(self._argv("cleanup", slug), self._base_env)
+171
View File
@@ -0,0 +1,171 @@
"""Forge sidecar: the agent's only door to the forge.
The agent calls the sidecar over a line-delimited JSON-RPC AF_UNIX
socket; the sidecar dispatches to an injected `forge` (already
scope-wrapped by bootstrap) and holds the token, so the agent never sees
a credential or a forge endpoint. Every call is appended to a semantic
operation log (the provenance raw material). `signal_done` additionally
drops an event file in the queue dir the orchestrator drains.
`dispatch` is pure and testable; `serve` wraps it in a socket server.
"""
from __future__ import annotations
import dataclasses
import json
import socketserver
import uuid
from collections.abc import Callable
from datetime import datetime
from pathlib import Path
from typing import Any
_READ_METHODS = {"read_issue", "read_pr", "read_comments"}
_WRITE_METHODS = {"post_comment", "update_description"}
def _iso_now() -> str:
return datetime.now().astimezone().isoformat(timespec="seconds")
def _jsonable(value: Any) -> Any:
if dataclasses.is_dataclass(value) and not isinstance(value, type):
return dataclasses.asdict(value)
if isinstance(value, list):
return [_jsonable(v) for v in value]
return value
class OpLog:
"""Append-only JSONL log of semantic forge operations."""
def __init__(self, path: Path, *, now: Callable[[], str] = _iso_now) -> None:
self._path = path
self._now = now
path.parent.mkdir(parents=True, exist_ok=True)
def record(self, op: str, target: int | None, detail: str) -> None:
entry = {"at": self._now(), "op": op, "target": target, "detail": detail}
with self._path.open("a", encoding="utf-8") as fh:
fh.write(json.dumps(entry) + "\n")
def read(self) -> list[dict[str, Any]]:
if not self._path.exists():
return []
return [
json.loads(line)
for line in self._path.read_text(encoding="utf-8").splitlines()
if line.strip()
]
def write_done_event(queue_dir: Path, event: dict[str, Any]) -> Path:
"""Atomically drop a done-signal event file in the queue dir."""
queue_dir.mkdir(parents=True, exist_ok=True)
path = queue_dir / f"done-{uuid.uuid4().hex}.json"
tmp = path.with_suffix(".json.tmp")
tmp.write_text(json.dumps(event), encoding="utf-8")
tmp.replace(path)
return path
def drain_done_events(queue_dir: Path) -> list[dict[str, Any]]:
"""Read and remove every queued done-signal event."""
if not queue_dir.is_dir():
return []
events: list[dict[str, Any]] = []
for path in sorted(queue_dir.glob("done-*.json")):
try:
events.append(json.loads(path.read_text(encoding="utf-8")))
except (OSError, ValueError):
continue
finally:
path.unlink(missing_ok=True)
return events
class ForgeSidecar:
"""Dispatches sidecar protocol calls to the forge, logging each and
relaying `signal_done` to the queue dir. `run_key` is the
(owner, repo, issue_number) the run is bound to."""
def __init__(
self,
*,
forge: object,
op_log: OpLog,
queue_dir: Path,
run_key: tuple[str, str, int],
) -> None:
self._forge = forge
self._log = op_log
self._queue_dir = queue_dir
self._owner, self._repo, self._issue = run_key
def dispatch(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
try:
result = self._invoke(method, params)
except Exception as exc: # noqa: BLE001 — surface as JSON-RPC error
self._log.record(method, params.get("number"), f"error: {exc}")
return {"ok": False, "error": str(exc)}
return {"ok": True, "result": result}
def _invoke(self, method: str, params: dict[str, Any]) -> Any:
if method in _READ_METHODS:
number = int(params["number"])
result = getattr(self._forge, method)(number)
self._log.record(method, number, "ok")
return _jsonable(result)
if method in _WRITE_METHODS:
number = int(params["number"])
getattr(self._forge, method)(number, params["body"])
self._log.record(method, number, "ok")
return None
if method == "signal_done":
status = str(params.get("status", ""))
summary = str(params.get("summary", ""))
self._log.record("signal_done", None, f"{status}: {summary}")
write_done_event(
self._queue_dir,
{
"owner": self._owner,
"repo": self._repo,
"issue_number": self._issue,
"status": status,
"summary": summary,
},
)
return None
raise ValueError(f"unknown method: {method}")
class _Handler(socketserver.StreamRequestHandler):
def handle(self) -> None:
line = self.rfile.readline()
if not line:
return
try:
req = json.loads(line)
except ValueError:
self.wfile.write(b'{"ok": false, "error": "invalid json"}\n')
return
resp = self.server.sidecar.dispatch( # type: ignore[attr-defined]
str(req.get("method", "")), dict(req.get("params", {}))
)
self.wfile.write((json.dumps(resp) + "\n").encode())
class _Server(socketserver.ThreadingUnixStreamServer):
def __init__(self, socket_path: str, sidecar: ForgeSidecar) -> None:
super().__init__(socket_path, _Handler)
self.sidecar = sidecar
def serve(sidecar: ForgeSidecar, socket_path: Path) -> _Server:
"""Bind a threaded AF_UNIX server for `sidecar`. Caller runs
`serve_forever()` (or `handle_request()` in tests) and closes it."""
if socket_path.exists():
socket_path.unlink()
socket_path.parent.mkdir(parents=True, exist_ok=True)
return _Server(str(socket_path), sidecar)
+48
View File
@@ -0,0 +1,48 @@
"""State store interface + an in-memory implementation.
The orchestrator persists one `RunRecord` per forge-targeted issue. At
runtime `bootstrap` supplies an adapter over bot-bottle's
`SqliteForgeStateStore`; the in-memory store here backs tests and a
`--no-bot-bottle` dry mode.
"""
from __future__ import annotations
from typing import Protocol
from .model import RunRecord
class StateStore(Protocol):
"""Thin CRUD surface. Mirrors bot-bottle's `ForgeStateStore` so the
bootstrap adapter is a straight pass-through."""
def upsert(self, record: RunRecord) -> None: ...
def get(self, owner: str, repo: str, issue_number: int) -> RunRecord | None: ...
def delete(self, owner: str, repo: str, issue_number: int) -> None: ...
def all(self) -> list[RunRecord]: ...
class InMemoryStateStore:
"""Dict-backed `StateStore`, keyed by (owner, repo, issue_number)."""
def __init__(self) -> None:
self._by_key: dict[tuple[str, str, int], RunRecord] = {}
def upsert(self, record: RunRecord) -> None:
self._by_key[(record.owner, record.repo, record.issue_number)] = record
def get(self, owner: str, repo: str, issue_number: int) -> RunRecord | None:
return self._by_key.get((owner, repo, issue_number))
def delete(self, owner: str, repo: str, issue_number: int) -> None:
self._by_key.pop((owner, repo, issue_number), None)
def all(self) -> list[RunRecord]:
return sorted(
self._by_key.values(),
key=lambda r: (r.owner, r.repo, r.issue_number),
)
+51
View File
@@ -0,0 +1,51 @@
"""Decide whether an assigned issue is agent-targeted, and for whom.
An issue is forge-targeted when BOTH hold:
- it carries a `bot-bottle:<agent>` label naming the agent, and
- at least one assignee is a member of the configured org.
An optional `bot-bottle-bottle:<name>` label overrides bottle selection.
The forge is duck-typed: any object with `is_org_member(org, user)`.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol
from .config import BOTTLE_LABEL_PREFIX, LABEL_PREFIX
from .model import IssueAssigned
class Membership(Protocol):
def is_org_member(self, org: str, username: str) -> bool: ...
@dataclass(frozen=True)
class Target:
agent_name: str
bottle_override: str | None
def parse_labels(labels: tuple[str, ...]) -> tuple[str | None, str | None]:
"""Return (agent_name, bottle_override) parsed from labels."""
agent: str | None = None
bottle: str | None = None
for label in labels:
if label.startswith(BOTTLE_LABEL_PREFIX):
bottle = label[len(BOTTLE_LABEL_PREFIX):] or None
elif label.startswith(LABEL_PREFIX):
agent = label[len(LABEL_PREFIX):] or None
return agent, bottle
def resolve_target(
event: IssueAssigned, forge: Membership, org: str
) -> Target | None:
"""Return the `Target` for a forge-targeted issue, or None to ignore."""
agent, bottle = parse_labels(event.labels)
if not agent:
return None
if not any(forge.is_org_member(org, a) for a in event.assignees):
return None
return Target(agent_name=agent, bottle_override=bottle)
+68
View File
@@ -0,0 +1,68 @@
"""Watchdog: freeze runs whose agent exited without signalling done.
`sweep(now)` is the pure, testable core: any `running` record whose
`last_checkin_at` is older than the timeout is frozen as
done-without-self-report and returned so provenance can flag it.
`Watchdog.start()` runs `sweep` on a daemon thread once a minute.
"""
from __future__ import annotations
import threading
from datetime import datetime, timedelta
from .model import STATUS_FROZEN, STATUS_RUNNING, RunRecord
from .runner import BottleRunner
from .store import StateStore
_TICK_SECS = 60.0
def _parse(ts: str) -> datetime | None:
try:
return datetime.fromisoformat(ts)
except (ValueError, TypeError):
return None
class Watchdog:
def __init__(
self,
*,
store: StateStore,
runner: BottleRunner,
timeout_secs: int,
) -> None:
self._store = store
self._runner = runner
self._timeout = timedelta(seconds=timeout_secs)
self._stop = threading.Event()
self._thread: threading.Thread | None = None
def sweep(self, now: datetime) -> list[RunRecord]:
"""Freeze stale running records. Returns the ones fired."""
fired: list[RunRecord] = []
for record in self._store.all():
if record.status != STATUS_RUNNING:
continue
checkin = _parse(record.last_checkin_at)
if checkin is None or now - checkin <= self._timeout:
continue
self._runner.freeze(record.slug)
record.status = STATUS_FROZEN
self._store.upsert(record)
fired.append(record)
return fired
def start(self) -> None:
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
def stop(self) -> None:
self._stop.set()
if self._thread is not None:
self._thread.join(timeout=_TICK_SECS)
def _loop(self) -> None:
while not self._stop.wait(_TICK_SECS):
self.sweep(datetime.now().astimezone())
+123
View File
@@ -0,0 +1,123 @@
"""HTTP surface: the Gitea webhook receiver and the provenance API.
`POST /webhook` a Gitea event; parsed and dispatched to the orchestrator.
`GET /healthz` liveness.
`GET /provenance?owner=&repo=&issue=` the run's audit record (never
posted to the forge).
Webhook signature verification is optional: set a secret and the handler
rejects bodies whose `X-Gitea-Signature` HMAC-SHA256 does not match.
"""
from __future__ import annotations
import hmac
import json
from collections.abc import Callable
from hashlib import sha256
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
from urllib.parse import parse_qs, urlparse
from .events import parse_event
from .lifecycle import Orchestrator
from .provenance import build_provenance, ops_from_log, provenance_to_dict
from .store import StateStore
# (record) -> that run's op-log entries, injected by bootstrap.
OpLogReader = Callable[[Any], list[dict[str, Any]]]
class WebhookServer(ThreadingHTTPServer):
def __init__(
self,
address: tuple[str, int],
*,
orchestrator: Orchestrator,
store: StateStore,
secret: bytes | None = None,
op_log_reader: OpLogReader | None = None,
) -> None:
super().__init__(address, _Handler)
self.orchestrator = orchestrator
self.store = store
self.secret = secret
self.op_log_reader = op_log_reader
def verify_signature(secret: bytes, body: bytes, signature: str) -> bool:
expected = hmac.new(secret, body, sha256).hexdigest()
return hmac.compare_digest(expected, signature or "")
class _Handler(BaseHTTPRequestHandler):
server: WebhookServer # type: ignore[assignment]
def log_message( # pylint: disable=redefined-builtin
self, format: str, *args: Any
) -> None: # quiet by default
pass
def _send(self, code: int, payload: dict[str, Any]) -> None:
body = json.dumps(payload).encode()
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_POST(self) -> None: # noqa: N802 # pylint: disable=invalid-name
if urlparse(self.path).path != "/webhook":
self._send(404, {"error": "not found"})
return
length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(length)
if self.server.secret is not None:
sig = self.headers.get("X-Gitea-Signature", "")
if not verify_signature(self.server.secret, body, sig):
self._send(401, {"error": "bad signature"})
return
try:
payload = json.loads(body)
except ValueError:
self._send(400, {"error": "invalid json"})
return
kind = self.headers.get("X-Gitea-Event", "")
event = parse_event(kind, payload)
if event is not None:
self.server.orchestrator.handle(event)
self._send(200, {"ok": True, "handled": event is not None})
def do_GET(self) -> None: # noqa: N802 # pylint: disable=invalid-name
parsed = urlparse(self.path)
if parsed.path == "/healthz":
self._send(200, {"ok": True})
return
if parsed.path == "/provenance":
self._provenance(parse_qs(parsed.query))
return
self._send(404, {"error": "not found"})
def _provenance(self, query: dict[str, list[str]]) -> None:
try:
owner = query["owner"][0]
repo = query["repo"][0]
issue = int(query["issue"][0])
except (KeyError, IndexError, ValueError):
self._send(400, {"error": "owner, repo, issue required"})
return
record = self.server.store.get(owner, repo, issue)
if record is None:
self._send(404, {"error": "no such run"})
return
reader = self.server.op_log_reader
ops = ops_from_log(reader(record) if reader is not None else [])
prov = build_provenance(
record,
ops=ops,
started_at="",
finished_at=record.last_checkin_at,
exit_code=None,
watchdog_fired=False,
)
self._send(200, provenance_to_dict(prov))
@@ -0,0 +1,146 @@
# PRD prd-new: Claude forward_host_credentials
- **Status:** Draft
- **Author:** claude
- **Created:** 2026-07-01
- **Issue:** #325
## Summary
Add `agent_provider.forward_host_credentials: true` support for the
`claude` template, mirroring the existing Codex flow. When enabled,
bot-bottle reads the host's Claude OAuth session key from
`~/.claude.json` at launch, forwards it only to the egress sidecar,
and injects a placeholder `CLAUDE_CODE_OAUTH_TOKEN` into the agent so
Claude Code starts without ever seeing the real credential.
## Problem
Running a Claude agent in a container today requires the operator to
manually extract a long-lived OAuth token (`claude setup-token`), export
it as `BOT_BOTTLE_CLAUDE_OAUTH_TOKEN`, and reference it explicitly in
the manifest with `agent_provider.auth_token:
"BOT_BOTTLE_CLAUDE_OAUTH_TOKEN"`. This is a two-step manual ceremony
that is easy to skip or do incorrectly.
The host already stores a valid Claude session in `~/.claude.json` after
`claude login` or `claude setup-token`. Codex already automates an
equivalent extraction from `~/.codex/auth.json`. There is no reason
Claude bottles cannot do the same.
## Goals / Success Criteria
- A Claude bottle with `forward_host_credentials: true` in the manifest
uses the host's `~/.claude.json` session key at launch with no
additional operator steps.
- The agent container receives only `CLAUDE_CODE_OAUTH_TOKEN=egress-placeholder`
— never the real token.
- The real session key lives only in the egress sidecar's environment.
- Missing, malformed, or expired host Claude auth fails launch with a
clear operator-facing message.
- Existing `auth_token` behavior is unchanged.
- `forward_host_credentials: true` is rejected in the manifest when both
`auth_token` and `forward_host_credentials` are set, since they serve
the same purpose.
## Non-goals
- Refreshing Claude OAuth tokens in the sidecar.
- Writing a dummy `~/.claude.json` auth state to the agent (unlike the
Codex flow, Claude Code reads its credential from `CLAUDE_CODE_OAUTH_TOKEN`
in env, not from an auth file — no guest-side auth marker is needed).
- Supporting `forward_host_credentials` for providers other than `codex`
and `claude`.
## Design
### Manifest schema
```yaml
agent_provider:
template: claude
forward_host_credentials: true
```
Rejects in manifest validation when:
- Template is not `codex` or `claude`.
- Both `auth_token` and `forward_host_credentials` are set.
### Host auth extraction (`contrib/claude/claude_auth.py`)
Claude Code credential storage varies by platform:
- **Linux**: `~/.claude/.credentials.json`
- **macOS**: macOS Keychain, service `"Claude Code-credentials"`
(the file path is tried first; Keychain is the fallback when the file
is absent)
`~/.claude.json` contains only UI state and profile metadata — no token.
The credentials JSON schema (same whether from file or Keychain):
```json
{
"claudeAiOauth": {
"accessToken": "sk-ant-oat01-...",
"refreshToken": "sk-ant-ort01-...",
"expiresAt": 1748276587173,
"scopes": ["user:inference", "user:profile"]
}
}
```
`expiresAt` is in **milliseconds** (not seconds).
At prepare/launch time, when `forward_host_credentials: true`:
1. Try `~/.claude/.credentials.json`; on macOS, if absent, run
`security find-generic-password -s "Claude Code-credentials" -w`
and parse its stdout as JSON.
2. Require a `claudeAiOauth` dict.
3. Require a non-empty `claudeAiOauth.accessToken` string.
4. If `claudeAiOauth.expiresAt` is present, divide by 1000 and require
the result to be in the future.
5. Return only the access token to the launch path.
Errors name the missing or invalid condition and point the operator at
`claude login`, without printing token values.
### Egress route
When `forward_host_credentials: true`:
- Provision the session key in `provisioned_env` under
`BOT_BOTTLE_CLAUDE_HOST_ACCESS_TOKEN` (new constant in `egress.py`).
- Set up the `api.anthropic.com` egress route with `auth_scheme: Bearer`
and `token_ref: BOT_BOTTLE_CLAUDE_HOST_ACCESS_TOKEN`.
- Set `CLAUDE_CODE_OAUTH_TOKEN=egress-placeholder` in the agent env and
add it to `hidden_env_names`.
No dummy auth file and no `verify` step are needed — Claude Code reads
the credential from the env var, not from a file.
### Constants
- `CLAUDE_HOST_CREDENTIAL_TOKEN_REF = "BOT_BOTTLE_CLAUDE_HOST_ACCESS_TOKEN"`
in `egress.py` (alongside the existing `CODEX_HOST_CREDENTIAL_TOKEN_REF`).
- `CLAUDE_HOST_CREDENTIAL_HOSTS = ("api.anthropic.com",)` in
`agent_provider.py` (alongside the existing `CODEX_HOST_CREDENTIAL_HOSTS`).
### Data flow
```
Host ~/.claude.json → bot-bottle launch
├──► egress sidecar env (real token only)
└──► agent env: CLAUDE_CODE_OAUTH_TOKEN=egress-placeholder
Agent → HTTPS to api.anthropic.com (via egress)
Egress → injects Authorization: Bearer <real token>
Egress → forwards to api.anthropic.com
```
## Open questions
None — the Codex precedent makes the design clear.
@@ -0,0 +1,132 @@
# PRD prd-new: Fold bot-bottle-orchestrator into this repo
- **Status:** Active
- **Author:** didericis
- **Created:** 2026-07-01
- **Issue:** #321
## Summary
Move the `bot-bottle-orchestrator` binary into `bot_bottle/orchestrator/` as a
first-class subpackage. `pip install bot-bottle` gets you everything; the
orchestrator's entry point becomes `python -m bot_bottle.orchestrator run`. The
cross-repo CLI contract becomes an internal boundary, and the forge integration
layer (`GiteaClient`, `ScopedForge`, `SqliteForgeStateStore`) is promoted to
`bot_bottle/contrib/` where it belongs.
## Problem
The orchestrator and bot-bottle are tightly coupled:
- It always deploys on the same host.
- It imports from `bot_bottle` for the forge/state layer.
- Its runner shims (`start --headless`, `commit`, `resume`) map 1:1 to CLI
commands in `cli.py` — a breaking CLI change silently breaks the orchestrator
with no CI signal.
- Two repos means two version pins, two CI pipelines, and two install steps
every time the deploy environment is rebuilt.
## Goals / Success Criteria
- All orchestrator modules live under `bot_bottle/orchestrator/` and the package
is importable as `from bot_bottle.orchestrator import ...`.
- `python -m bot_bottle.orchestrator run` starts the webhook server.
- `python -m bot_bottle.orchestrator status` prints tracked runs.
- The forge integration layer (`GiteaClient`, `GiteaForge`, `ScopedForge`,
`ForgeState`, `SqliteForgeStateStore`) lives in `bot_bottle/contrib/` and is
covered by tests in `tests/unit/orchestrator/`.
- All orchestrator unit tests pass under bot-bottle's existing CI
(`python -m unittest discover -s tests/unit`).
- No functional change to the orchestrator's external behaviour: same
HTTP surface, same webhook protocol, same env-var config, same CLI flags.
## Non-goals
- Replacing `SubprocessBottleRunner` with a direct programmatic runner — the
subprocess shim stays; the `BottleRunner` protocol remains the internal
abstraction point.
- Merging the orchestrator's SQLite DB with any other bot-bottle state store.
- Archiving `bot-bottle-orchestrator` (that happens after this ships and the
deploy is updated; out of scope for this PR).
## Design
### Package layout
```
bot_bottle/
orchestrator/
__init__.py
__main__.py # python -m bot_bottle.orchestrator
bootstrap.py # wires contrib modules → orchestrator core
config.py
events.py
lifecycle.py
model.py
provenance.py
runner.py
sidecar.py
store.py
targeting.py
watchdog.py
webhook.py
contrib/
forge/
__init__.py
base.py # ScopedForge: read-anywhere / write-scoped wrapper
gitea/
client.py # GiteaClient (urllib.request), GiteaForge
forge_state.py # ForgeState dataclass + SqliteForgeStateStore
tests/unit/orchestrator/
__init__.py
_fakes.py
test_config.py
test_events.py
test_lifecycle.py
test_provenance.py
test_runner.py
test_sidecar.py
test_store.py
test_targeting.py
test_watchdog.py
test_webhook.py
```
### Module moves
Every `orchestrator/` source file moves verbatim into `bot_bottle/orchestrator/`.
Internal imports are already relative (`from .config import Config`) so no
changes are needed inside the orchestrator modules themselves.
`bootstrap.py` is the only file that changes meaningfully: the lazy `bot_bottle`
imports become direct relative imports (`from ..contrib.gitea.client import …`),
and the `_require_bot_bottle()` guard is removed since the package is always
present.
### New contrib modules
**`bot_bottle/contrib/forge/base.py``ScopedForge`**
Wraps any forge object and enforces read-anywhere / write-scoped access: reads
pass through unconditionally; `post_comment` and `update_description` raise
`PermissionError` for issue/PR numbers outside the assigned set.
**`bot_bottle/contrib/gitea/client.py``GiteaClient`, `GiteaForge`**
`GiteaClient` is a thin `urllib.request`-only HTTP wrapper (no new Python
dependencies). `GiteaForge` composes a client and exposes the forge protocol:
`is_org_member`, `read_issue`, `read_pr`, `read_comments`, `post_comment`,
`update_description`.
**`bot_bottle/contrib/gitea/forge_state.py``ForgeState`, `SqliteForgeStateStore`**
`ForgeState` is a dataclass mirroring `RunRecord` field-for-field. `SqliteForgeStateStore`
backs it with SQLite (stdlib `sqlite3`): a single `forge_state` table with one
row per (owner, repo, issue\_number).
### Test migration
All orchestrator test files move to `tests/unit/orchestrator/` with absolute
imports updated from `orchestrator.X` to `bot_bottle.orchestrator.X`. The unit
discovery command (`-s tests/unit`) picks them up automatically — no CI changes
required.
View File
+69
View File
@@ -0,0 +1,69 @@
"""Shared test doubles: a duck-typed forge and bottle runner."""
# Test doubles mirror an API shape; some params are intentionally unused.
# pylint: disable=unused-argument
from __future__ import annotations
from collections.abc import Sequence
from bot_bottle.orchestrator.runner import RunResult, slugify
class FakeForge:
def __init__(self, members: tuple[str, ...] = ()) -> None:
self.members = set(members)
self.comments: list[tuple[int, str]] = []
self.descriptions: list[tuple[int, str]] = []
self.scope_denied: set[int] = set()
def is_org_member(self, org: str, username: str) -> bool:
return username in self.members
def read_issue(self, number: int) -> dict[str, object]:
return {"number": number, "kind": "issue"}
def read_pr(self, number: int) -> dict[str, object]:
return {"number": number, "merged": False}
def read_comments(self, number: int) -> list[dict[str, object]]:
return [{"id": 1, "user": "alice", "body": "hi"}]
def post_comment(self, number: int, body: str) -> None:
if number in self.scope_denied:
raise PermissionError(f"write to #{number} denied")
self.comments.append((number, body))
def update_description(self, number: int, body: str) -> None:
if number in self.scope_denied:
raise PermissionError(f"write to #{number} denied")
self.descriptions.append((number, body))
class FakeRunner:
def __init__(self) -> None:
self.calls: list[tuple[object, ...]] = []
def start(
self,
*,
agent: str,
bottles: Sequence[str],
label: str,
prompt: str,
forge_env: dict[str, str],
) -> RunResult:
self.calls.append(("start", agent, tuple(bottles), label, prompt, dict(forge_env)))
return RunResult(slug=slugify(label), exit_code=0)
def freeze(self, slug: str) -> int:
self.calls.append(("freeze", slug))
return 0
def resume(self, slug: str, prompt: str) -> RunResult:
self.calls.append(("resume", slug, prompt))
return RunResult(slug=slug, exit_code=0)
def destroy(self, slug: str) -> int:
self.calls.append(("destroy", slug))
return 0
+179
View File
@@ -0,0 +1,179 @@
"""Unit: BotBottleStateStore, _token, conversions, make_forge/make_sidecar, build."""
from __future__ import annotations
import os
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from bot_bottle.orchestrator.bootstrap import (
BotBottleStateStore,
_to_forge_state,
_to_record,
_token,
build,
make_forge,
make_sidecar,
)
from bot_bottle.orchestrator.config import Config
from bot_bottle.orchestrator.model import RunRecord
def _config(tmp: str) -> Config:
return Config(
forge_org="org",
gitea_api="http://g/api/v1",
watchdog_timeout_secs=1800,
webhook_host="127.0.0.1",
webhook_port=0,
bot_bottle_cli="cli.py",
queue_dir=Path(tmp) / "q",
sidecar_socket=Path(tmp) / "s.sock",
db_path=None,
)
def _record(**kw: object) -> RunRecord:
defaults: dict[str, object] = {
"owner": "o", "repo": "r", "issue_number": 1, "slug": "s1", "agent_name": "a",
"bottle_names": ["claude"], "backend_name": "docker", "agent_git_user": "bot",
"pr_number": 5, "status": "running", "last_checkin_at": "2026-01-01T00:00:00+00:00",
}
defaults.update(kw)
return RunRecord(**defaults) # type: ignore[arg-type]
class TokenTest(unittest.TestCase):
def test_gitea_token_env(self):
with patch.dict(os.environ, {"GITEA_TOKEN": "tok123"}):
self.assertEqual("tok123", _token())
def test_forge_gitea_token_fallback(self):
clean = {k: v for k, v in os.environ.items()
if k not in ("GITEA_TOKEN", "FORGE_GITEA_TOKEN")}
with patch.dict(os.environ, {**clean, "FORGE_GITEA_TOKEN": "tok456"}, clear=True):
self.assertEqual("tok456", _token())
def test_missing_token_raises(self):
clean = {k: v for k, v in os.environ.items()
if k not in ("GITEA_TOKEN", "FORGE_GITEA_TOKEN")}
with patch.dict(os.environ, clean, clear=True):
with self.assertRaises(RuntimeError):
_token()
class ConversionRoundTripTest(unittest.TestCase):
def test_record_survives_forge_state_roundtrip(self):
rec = _record()
result = _to_record(_to_forge_state(rec))
self.assertEqual(rec.owner, result.owner)
self.assertEqual(rec.repo, result.repo)
self.assertEqual(rec.issue_number, result.issue_number)
self.assertEqual(rec.slug, result.slug)
self.assertEqual(rec.agent_name, result.agent_name)
self.assertEqual(rec.bottle_names, result.bottle_names)
self.assertEqual(rec.backend_name, result.backend_name)
self.assertEqual(rec.agent_git_user, result.agent_git_user)
self.assertEqual(rec.pr_number, result.pr_number)
self.assertEqual(rec.status, result.status)
self.assertEqual(rec.last_checkin_at, result.last_checkin_at)
def test_none_pr_number_preserved(self):
rec = _record(pr_number=None)
result = _to_record(_to_forge_state(rec))
self.assertIsNone(result.pr_number)
class BotBottleStateStoreTest(unittest.TestCase):
def setUp(self):
self.store = BotBottleStateStore(None)
def test_upsert_and_get(self):
self.store.upsert(_record())
got = self.store.get("o", "r", 1)
assert got is not None
self.assertEqual("s1", got.slug)
def test_get_missing(self):
self.assertIsNone(self.store.get("o", "r", 99))
def test_upsert_replaces(self):
self.store.upsert(_record())
self.store.upsert(_record(slug="new-slug"))
got = self.store.get("o", "r", 1)
assert got is not None
self.assertEqual("new-slug", got.slug)
def test_delete(self):
self.store.upsert(_record())
self.store.delete("o", "r", 1)
self.assertIsNone(self.store.get("o", "r", 1))
def test_all_returns_all_records(self):
self.store.upsert(_record(issue_number=1, slug="s1"))
self.store.upsert(_record(issue_number=2, slug="s2"))
recs = self.store.all()
self.assertEqual(2, len(recs))
slugs = {r.slug for r in recs}
self.assertEqual({"s1", "s2"}, slugs)
def test_all_empty(self):
self.assertEqual([], self.store.all())
def test_bottle_names_preserved(self):
self.store.upsert(_record(bottle_names=["claude", "dev"]))
got = self.store.get("o", "r", 1)
assert got is not None
self.assertEqual(["claude", "dev"], got.bottle_names)
class MakeForgeTest(unittest.TestCase):
def test_returns_gitea_forge(self):
with tempfile.TemporaryDirectory() as tmp:
config = _config(tmp)
with patch.dict(os.environ, {"GITEA_TOKEN": "tok"}):
forge = make_forge(config, "owner", "repo")
from bot_bottle.contrib.gitea.client import GiteaForge
self.assertIsInstance(forge, GiteaForge)
class MakeSidecarTest(unittest.TestCase):
def test_returns_forge_sidecar(self):
with tempfile.TemporaryDirectory() as tmp:
config = _config(tmp)
with patch.dict(os.environ, {"GITEA_TOKEN": "tok"}):
sidecar = make_sidecar(config, "owner", "repo", 1, [])
from bot_bottle.orchestrator.sidecar import ForgeSidecar
self.assertIsInstance(sidecar, ForgeSidecar)
class BuildTest(unittest.TestCase):
def test_returns_server_watchdog_orchestrator(self):
with tempfile.TemporaryDirectory() as tmp:
config = _config(tmp)
with patch.dict(os.environ, {"GITEA_TOKEN": "tok"}):
server, watchdog, orch = build(config)
server.server_close()
from bot_bottle.orchestrator.lifecycle import Orchestrator
from bot_bottle.orchestrator.watchdog import Watchdog
from bot_bottle.orchestrator.webhook import WebhookServer
self.assertIsInstance(server, WebhookServer)
self.assertIsInstance(watchdog, Watchdog)
self.assertIsInstance(orch, Orchestrator)
def test_server_binds_to_configured_host(self):
with tempfile.TemporaryDirectory() as tmp:
config = _config(tmp)
with patch.dict(os.environ, {"GITEA_TOKEN": "tok"}):
server, _, _ = build(config)
addr = server.server_address
server.server_close()
self.assertEqual("127.0.0.1", addr[0])
self.assertGreater(addr[1], 0)
if __name__ == "__main__":
unittest.main()
+38
View File
@@ -0,0 +1,38 @@
"""Unit: Config.from_env."""
from __future__ import annotations
import unittest
from pathlib import Path
from bot_bottle.orchestrator.config import Config
class ConfigTest(unittest.TestCase):
def test_defaults(self):
c = Config.from_env({"HOME": "/home/x"})
self.assertEqual("bot-bottle", c.forge_org)
self.assertEqual(1800, c.watchdog_timeout_secs)
self.assertEqual("127.0.0.1", c.webhook_host)
self.assertEqual(8477, c.webhook_port)
self.assertEqual(Path("/home/x/.bot-bottle/forge-queue"), c.queue_dir)
self.assertIsNone(c.db_path)
def test_overrides(self):
c = Config.from_env({
"HOME": "/home/x",
"FORGE_ORG": "agents",
"FORGE_WATCHDOG_TIMEOUT": "60",
"FORGE_GITEA_API": "https://g.example/api/v1",
"FORGE_WEBHOOK_PORT": "9000",
"FORGE_DB_PATH": "/data/bb.db",
})
self.assertEqual("agents", c.forge_org)
self.assertEqual(60, c.watchdog_timeout_secs)
self.assertEqual("https://g.example/api/v1", c.gitea_api)
self.assertEqual(9000, c.webhook_port)
self.assertEqual(Path("/data/bb.db"), c.db_path)
if __name__ == "__main__":
unittest.main()
+68
View File
@@ -0,0 +1,68 @@
"""Unit: webhook payload parsing."""
from __future__ import annotations
import unittest
from bot_bottle.orchestrator.events import parse_event
from bot_bottle.orchestrator.model import CommentCreated, IssueAssigned, PullRequestClosed
_REPO = {"repository": {"name": "bot-bottle", "owner": {"login": "didericis"}}}
class ParseEventTest(unittest.TestCase):
def test_issue_assigned(self):
payload = {
**_REPO,
"action": "assigned",
"issue": {
"number": 17,
"title": "Fix it",
"body": "please",
"assignees": [{"login": "agent-bot"}],
"labels": [{"name": "bot-bottle:implementer"}],
},
}
ev = parse_event("issues", payload)
self.assertIsInstance(ev, IssueAssigned)
assert isinstance(ev, IssueAssigned)
self.assertEqual(("didericis", "bot-bottle", 17), (ev.owner, ev.repo, ev.issue_number))
self.assertEqual(("agent-bot",), ev.assignees)
self.assertEqual(("bot-bottle:implementer",), ev.labels)
def test_issue_non_assigned_ignored(self):
self.assertIsNone(parse_event("issues", {**_REPO, "action": "opened", "issue": {}}))
def test_comment_created(self):
payload = {
**_REPO,
"action": "created",
"issue": {"number": 42, "pull_request": {"x": 1}},
"comment": {"id": 5, "user": {"login": "reviewer"}, "body": "redo"},
}
ev = parse_event("issue_comment", payload)
assert isinstance(ev, CommentCreated)
self.assertEqual(42, ev.issue_number)
self.assertEqual("reviewer", ev.author)
self.assertTrue(ev.is_pull)
def test_pull_request_closed(self):
payload = {**_REPO, "action": "closed", "pull_request": {"number": 8, "merged": True}}
ev = parse_event("pull_request", payload)
assert isinstance(ev, PullRequestClosed)
self.assertEqual(8, ev.pr_number)
self.assertTrue(ev.merged)
def test_pull_request_non_closed_ignored(self):
self.assertIsNone(parse_event("pull_request", {**_REPO, "action": "opened"}))
def test_comment_non_created_action_ignored(self):
payload = {**_REPO, "action": "edited", "issue": {}, "comment": {}}
self.assertIsNone(parse_event("issue_comment", payload))
def test_unknown_kind_ignored(self):
self.assertIsNone(parse_event("push", {**_REPO}))
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,75 @@
"""Unit: ForgeState + SqliteForgeStateStore."""
from __future__ import annotations
import unittest
from bot_bottle.contrib.gitea.forge_state import ForgeState, SqliteForgeStateStore
def _state(**kw: object) -> ForgeState:
defaults: dict[str, object] = dict(
owner="alice", repo="myrepo", issue_number=1,
slug="impl-alice-myrepo-1", agent_name="impl",
)
defaults.update(kw)
return ForgeState(**defaults) # type: ignore[arg-type]
class ForgeStateStoreTest(unittest.TestCase):
def setUp(self):
self.store = SqliteForgeStateStore(None)
def test_upsert_and_get(self):
s = _state()
self.store.upsert(s)
got = self.store.get("alice", "myrepo", 1)
assert got is not None
self.assertEqual("impl-alice-myrepo-1", got.slug)
self.assertEqual("impl", got.agent_name)
def test_get_missing(self):
self.assertIsNone(self.store.get("alice", "myrepo", 99))
def test_upsert_replaces(self):
self.store.upsert(_state(status="running"))
self.store.upsert(_state(status="frozen"))
got = self.store.get("alice", "myrepo", 1)
assert got is not None
self.assertEqual("frozen", got.status)
def test_delete(self):
self.store.upsert(_state())
self.store.delete("alice", "myrepo", 1)
self.assertIsNone(self.store.get("alice", "myrepo", 1))
def test_delete_missing_no_error(self):
self.store.delete("alice", "myrepo", 99)
def test_all_sorted(self):
self.store.upsert(_state(owner="z", issue_number=2))
self.store.upsert(_state(owner="a", issue_number=1))
rows = self.store.all()
self.assertEqual(("a", "z"), (rows[0].owner, rows[1].owner))
def test_bottle_names_roundtrip(self):
self.store.upsert(_state(bottle_names=["claude", "dev"]))
got = self.store.get("alice", "myrepo", 1)
assert got is not None
self.assertEqual(["claude", "dev"], got.bottle_names)
def test_pr_number_none_roundtrip(self):
self.store.upsert(_state(pr_number=None))
got = self.store.get("alice", "myrepo", 1)
assert got is not None
self.assertIsNone(got.pr_number)
def test_pr_number_int_roundtrip(self):
self.store.upsert(_state(pr_number=42))
got = self.store.get("alice", "myrepo", 1)
assert got is not None
self.assertEqual(42, got.pr_number)
if __name__ == "__main__":
unittest.main()
+163
View File
@@ -0,0 +1,163 @@
"""Unit: the orchestration lifecycle."""
from __future__ import annotations
import unittest
from typing import cast
from bot_bottle.orchestrator.lifecycle import Orchestrator
from bot_bottle.orchestrator.model import (
STATUS_FROZEN,
STATUS_RUNNING,
CommentCreated,
IssueAssigned,
PullRequestClosed,
)
from bot_bottle.orchestrator.store import InMemoryStateStore
from ._fakes import FakeForge, FakeRunner
def _assigned(
labels: tuple[str, ...] = ("bot-bottle:impl",),
assignees: tuple[str, ...] = ("agent-bot",),
) -> IssueAssigned:
return IssueAssigned(
owner="didericis", repo="bot-bottle", issue_number=17,
title="t", body="the task", assignees=tuple(assignees), labels=tuple(labels),
)
class LifecycleTest(unittest.TestCase):
def setUp(self):
self.forge = FakeForge(members=("agent-bot",))
self.store = InMemoryStateStore()
self.runner = FakeRunner()
self.orch = Orchestrator(
forge=self.forge, store=self.store, runner=self.runner,
org="bot-bottle", gitea_api="https://g/api/v1",
now=lambda: "2026-07-01T00:00:00-04:00",
)
def _record(self):
return self.store.get("didericis", "bot-bottle", 17)
def test_assigned_targeted_launches(self):
self.orch.handle(_assigned())
rec = self._record()
assert rec is not None
self.assertEqual(STATUS_RUNNING, rec.status)
self.assertEqual("impl-didericis-bot-bottle-17", rec.slug)
self.assertEqual("start", self.runner.calls[0][0])
# forge context injected into the child env.
env = cast("dict[str, str]", self.runner.calls[0][5])
self.assertEqual("didericis", env["FORGE_OWNER"])
self.assertEqual("17", env["FORGE_ISSUE_NUMBER"])
def test_untargeted_ignored(self):
self.orch.handle(_assigned(labels=("bug",)))
self.assertIsNone(self._record())
self.assertEqual([], self.runner.calls)
def test_assigned_is_idempotent(self):
self.orch.handle(_assigned())
self.orch.handle(_assigned()) # redelivery
starts = [c for c in self.runner.calls if c[0] == "start"]
self.assertEqual(1, len(starts))
def test_done_signal_freezes(self):
self.orch.handle(_assigned())
self.orch.on_done_signal("didericis", "bot-bottle", 17, "success", "done")
rec = self._record()
assert rec is not None
self.assertEqual(STATUS_FROZEN, rec.status)
self.assertIn(("freeze", "impl-didericis-bot-bottle-17"), self.runner.calls)
def test_done_signal_ignored_when_not_running(self):
# No record yet -> no freeze.
self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "")
self.assertEqual([], self.runner.calls)
def test_comment_on_frozen_resumes(self):
self.orch.handle(_assigned())
self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "")
self.orch.handle(CommentCreated(
owner="didericis", repo="bot-bottle", issue_number=17,
comment_id=1, author="reviewer", body="please redo", is_pull=False,
))
rec = self._record()
assert rec is not None
self.assertEqual(STATUS_RUNNING, rec.status)
self.assertIn(("resume", "impl-didericis-bot-bottle-17", "please redo"),
self.runner.calls)
def test_comment_echo_guard(self):
self.orch.handle(_assigned())
self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "")
rec = self._record()
assert rec is not None
rec.agent_git_user = "agent-bot"
self.store.upsert(rec)
self.orch.handle(CommentCreated(
owner="didericis", repo="bot-bottle", issue_number=17,
comment_id=2, author="agent-bot", body="I finished", is_pull=False,
))
# Still frozen, no resume triggered by the agent's own comment.
self.assertEqual(STATUS_FROZEN, self._record().status) # type: ignore[union-attr]
self.assertNotIn("resume", [c[0] for c in self.runner.calls])
def test_comment_on_running_ignored(self):
self.orch.handle(_assigned()) # running
self.orch.handle(CommentCreated(
owner="didericis", repo="bot-bottle", issue_number=17,
comment_id=1, author="reviewer", body="hi", is_pull=False,
))
self.assertNotIn("resume", [c[0] for c in self.runner.calls])
def test_pr_comment_routes_via_link(self):
self.orch.handle(_assigned())
self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "")
self.orch.link_pr("didericis", "bot-bottle", 17, 42)
# Comment arrives on PR #42 (issue_number == PR number in Gitea).
self.orch.handle(CommentCreated(
owner="didericis", repo="bot-bottle", issue_number=42,
comment_id=9, author="reviewer", body="fix", is_pull=True,
))
self.assertIn(("resume", "impl-didericis-bot-bottle-17", "fix"),
self.runner.calls)
def test_pr_closed_destroys_and_removes(self):
self.orch.handle(_assigned())
self.orch.link_pr("didericis", "bot-bottle", 17, 42)
self.orch.handle(PullRequestClosed(
owner="didericis", repo="bot-bottle", pr_number=42, merged=True,
))
self.assertIn(("destroy", "impl-didericis-bot-bottle-17"), self.runner.calls)
self.assertIsNone(self._record())
def test_comment_on_untracked_issue_ignored(self):
# No record in store and is_pull=False -> _route_comment returns None.
self.orch.handle(CommentCreated(
owner="didericis", repo="bot-bottle", issue_number=99,
comment_id=1, author="reviewer", body="hi", is_pull=False,
))
self.assertEqual([], self.runner.calls)
def test_pr_closed_untracked_pr_ignored(self):
# _find_by_pr finds nothing -> _on_pr_closed exits early.
self.orch.handle(PullRequestClosed(
owner="didericis", repo="bot-bottle", pr_number=999, merged=True,
))
self.assertEqual([], self.runner.calls)
class IsoNowTest(unittest.TestCase):
def test_returns_iso_string(self):
from bot_bottle.orchestrator.lifecycle import _iso_now
ts = _iso_now()
self.assertIsInstance(ts, str)
self.assertIn("T", ts)
if __name__ == "__main__":
unittest.main()
+88
View File
@@ -0,0 +1,88 @@
"""Unit: __main__ CLI entry points (run and status commands)."""
from __future__ import annotations
import io
import unittest
from unittest.mock import patch
from bot_bottle.orchestrator.__main__ import main
from bot_bottle.orchestrator.config import Config
from bot_bottle.orchestrator.model import RunRecord
def _config() -> Config:
return Config.from_env({"HOME": "/tmp"})
class MainRunTest(unittest.TestCase):
def test_run_delegates_to_bootstrap(self):
config = _config()
with patch.object(Config, "from_env", return_value=config), \
patch("bot_bottle.orchestrator.bootstrap.run") as mock_run:
rc = main(["run"])
self.assertEqual(0, rc)
mock_run.assert_called_once_with(config)
def test_run_prints_listen_address_to_stderr(self):
config = _config()
err = io.StringIO()
with patch.object(Config, "from_env", return_value=config), \
patch("bot_bottle.orchestrator.bootstrap.run"), \
patch("sys.stderr", err):
main(["run"])
self.assertIn(str(config.webhook_port), err.getvalue())
class MainStatusTest(unittest.TestCase):
def test_status_empty_store(self):
config = _config()
with patch.object(Config, "from_env", return_value=config), \
patch("bot_bottle.orchestrator.bootstrap.BotBottleStateStore") as MockStore:
MockStore.return_value.all.return_value = []
rc = main(["status"])
self.assertEqual(0, rc)
def test_status_prints_records(self):
config = _config()
rec = RunRecord(
owner="o", repo="r", issue_number=1, slug="my-slug",
agent_name="a", pr_number=7, status="frozen",
)
out = io.StringIO()
with patch.object(Config, "from_env", return_value=config), \
patch("bot_bottle.orchestrator.bootstrap.BotBottleStateStore") as MockStore, \
patch("sys.stdout", out):
MockStore.return_value.all.return_value = [rec]
rc = main(["status"])
self.assertEqual(0, rc)
self.assertIn("my-slug", out.getvalue())
self.assertIn("PR#7", out.getvalue())
def test_status_no_pr_prints_dash(self):
config = _config()
rec = RunRecord(
owner="o", repo="r", issue_number=2, slug="s2",
agent_name="a", pr_number=None, status="running",
)
out = io.StringIO()
with patch.object(Config, "from_env", return_value=config), \
patch("bot_bottle.orchestrator.bootstrap.BotBottleStateStore") as MockStore, \
patch("sys.stdout", out):
MockStore.return_value.all.return_value = [rec]
main(["status"])
self.assertIn("-", out.getvalue())
class MainArgparseTest(unittest.TestCase):
def test_no_command_exits(self):
with self.assertRaises(SystemExit):
main([])
def test_unknown_command_exits(self):
with self.assertRaises(SystemExit):
main(["bogus"])
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,53 @@
"""Unit: provenance assembly + serialization."""
from __future__ import annotations
import unittest
from bot_bottle.orchestrator.model import RunRecord
from bot_bottle.orchestrator.provenance import build_provenance, ops_from_log, provenance_to_dict
def _record() -> RunRecord:
return RunRecord(
owner="didericis", repo="bot-bottle", issue_number=17,
slug="impl-17", agent_name="impl", bottle_names=["claude"],
last_checkin_at="2026-07-01T00:05:00-04:00",
)
class ProvenanceTest(unittest.TestCase):
def test_ops_from_log(self):
ops = ops_from_log([
{"at": "T1", "op": "read_pr", "target": 5, "detail": "ok"},
{"at": "T2", "op": "signal_done", "target": None, "detail": "success: done"},
])
self.assertEqual(2, len(ops))
self.assertEqual("read_pr", ops[0].op)
self.assertIsNone(ops[1].target)
def test_build_and_serialize(self):
ops = ops_from_log([{"at": "T1", "op": "post_comment", "target": 17, "detail": "ok"}])
prov = build_provenance(
_record(), ops=ops, started_at="2026-07-01T00:00:00-04:00",
finished_at="2026-07-01T00:05:00-04:00", exit_code=0, watchdog_fired=False,
)
d = provenance_to_dict(prov)
self.assertEqual("impl-17", d["slug"])
self.assertEqual("didericis", d["owner"])
self.assertEqual(["claude"], d["bottles"])
self.assertEqual(0, d["exit_code"])
self.assertFalse(d["watchdog_fired"])
self.assertEqual(1, len(d["ops"]))
self.assertEqual("post_comment", d["ops"][0]["op"])
def test_watchdog_flag_serialized(self):
prov = build_provenance(
_record(), ops=(), started_at="", finished_at="",
exit_code=None, watchdog_fired=True,
)
self.assertTrue(provenance_to_dict(prov)["watchdog_fired"])
if __name__ == "__main__":
unittest.main()
+81
View File
@@ -0,0 +1,81 @@
"""Unit: SubprocessBottleRunner + slugify (injected run fn)."""
from __future__ import annotations
import unittest
from collections.abc import Sequence
from bot_bottle.orchestrator.runner import SubprocessBottleRunner, slugify
class SlugifyTest(unittest.TestCase):
def test_basic(self):
self.assertEqual("impl-didericis-bot-bottle-17",
slugify("impl-didericis-bot-bottle-17"))
def test_collapses_and_strips(self):
self.assertEqual("a-b-c", slugify(" A_B/C!! "))
class SubprocessRunnerTest(unittest.TestCase):
def setUp(self):
self.argvs: list[list[str]] = []
self.envs: list[dict[str, str]] = []
def fake_run(argv: Sequence[str], env: dict[str, str]) -> int:
self.argvs.append(list(argv))
self.envs.append(dict(env))
return 0
self.runner = SubprocessBottleRunner(
cli="/x/cli.py", base_env={"PATH": "/bin"}, python="/py", run=fake_run
)
def test_start_argv_and_env(self):
result = self.runner.start(
agent="impl", bottles=["claude", "dev"], label="impl-r-17",
prompt="do it", forge_env={"FORGE_OWNER": "didericis"},
)
self.assertEqual("impl-r-17", result.slug)
argv = self.argvs[0]
self.assertEqual(["/py", "/x/cli.py", "start", "impl", "--headless",
"--label", "impl-r-17", "--prompt", "do it",
"--bottle", "claude", "--bottle", "dev"], argv)
# forge_env merged over base_env for the child.
self.assertEqual("didericis", self.envs[0]["FORGE_OWNER"])
self.assertEqual("/bin", self.envs[0]["PATH"])
def test_start_no_bottles_omits_flag(self):
self.runner.start(agent="impl", bottles=[], label="l", prompt="p", forge_env={})
self.assertNotIn("--bottle", self.argvs[0])
def test_freeze_calls_commit(self):
self.runner.freeze("slug-1")
self.assertEqual(["/py", "/x/cli.py", "commit", "slug-1"], self.argvs[0])
def test_resume_headless(self):
r = self.runner.resume("slug-1", "address review")
self.assertEqual("slug-1", r.slug)
self.assertEqual(
["/py", "/x/cli.py", "resume", "slug-1", "--headless", "--prompt",
"address review"], self.argvs[0])
def test_destroy_calls_cleanup(self):
code = self.runner.destroy("slug-7")
self.assertEqual(0, code)
self.assertEqual(["/py", "/x/cli.py", "cleanup", "slug-7"], self.argvs[0])
class DefaultRunTest(unittest.TestCase):
def test_calls_subprocess_and_returns_code(self):
from unittest.mock import MagicMock, patch
from bot_bottle.orchestrator.runner import _default_run
with patch("subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=42)
code = _default_run(["echo", "hi"], {"PATH": "/bin"})
self.assertEqual(42, code)
mock_run.assert_called_once_with(["echo", "hi"], env={"PATH": "/bin"}, check=False)
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,75 @@
"""Unit: ScopedForge — read-anywhere / write-scoped access control."""
from __future__ import annotations
import unittest
from bot_bottle.contrib.forge.base import ScopedForge
from ._fakes import FakeForge
class ScopedForgeTest(unittest.TestCase):
def setUp(self):
self.inner = FakeForge()
self.scoped = ScopedForge(
self.inner, assigned_issue=10, assigned_prs=[20, 30]
)
# --- reads always pass through -----------------------------------------
def test_read_issue_allowed_anywhere(self):
for number in (10, 20, 99):
result = self.scoped.read_issue(number)
self.assertEqual(number, result["number"])
def test_read_pr_allowed_anywhere(self):
for number in (10, 20, 99):
result = self.scoped.read_pr(number)
self.assertEqual(number, result["number"])
def test_read_comments_allowed_anywhere(self):
comments = self.scoped.read_comments(99)
self.assertTrue(len(comments) > 0)
def test_is_org_member_passes_through(self):
inner = FakeForge(members=("alice",))
scoped = ScopedForge(inner, assigned_issue=1, assigned_prs=[])
self.assertTrue(scoped.is_org_member("org", "alice"))
self.assertFalse(scoped.is_org_member("org", "bob"))
# --- writes: assigned numbers allowed ----------------------------------
def test_post_comment_on_assigned_issue(self):
self.scoped.post_comment(10, "hi")
self.assertIn((10, "hi"), self.inner.comments)
def test_post_comment_on_assigned_pr(self):
self.scoped.post_comment(20, "lgtm")
self.assertIn((20, "lgtm"), self.inner.comments)
def test_update_description_on_assigned(self):
self.scoped.update_description(30, "updated")
self.assertIn((30, "updated"), self.inner.descriptions)
# --- writes: unassigned numbers denied ---------------------------------
def test_post_comment_denied_for_unassigned(self):
with self.assertRaises(PermissionError):
self.scoped.post_comment(99, "nope")
self.assertEqual([], self.inner.comments)
def test_update_description_denied_for_unassigned(self):
with self.assertRaises(PermissionError):
self.scoped.update_description(99, "nope")
self.assertEqual([], self.inner.descriptions)
def test_error_message_names_number(self):
try:
self.scoped.post_comment(99, "nope")
except PermissionError as exc:
self.assertIn("99", str(exc))
if __name__ == "__main__":
unittest.main()
+204
View File
@@ -0,0 +1,204 @@
"""Unit: forge sidecar dispatch, op log, queue relay, socket server."""
from __future__ import annotations
import dataclasses
import json
import socket
import tempfile
import threading
import unittest
from pathlib import Path
from bot_bottle.orchestrator.sidecar import (
ForgeSidecar,
OpLog,
_jsonable,
drain_done_events,
serve,
write_done_event,
)
from ._fakes import FakeForge
class SidecarDispatchTest(unittest.TestCase):
def setUp(self):
self.tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
self.forge = FakeForge()
self.log = OpLog(self.tmp / "ops.jsonl", now=lambda: "T")
self.queue = self.tmp / "queue"
self.sc = ForgeSidecar(
forge=self.forge, op_log=self.log, queue_dir=self.queue,
run_key=("o", "r", 17),
)
def test_read_pr_ok_and_logged(self):
resp = self.sc.dispatch("read_pr", {"number": 5})
self.assertTrue(resp["ok"])
self.assertEqual(5, resp["result"]["number"])
self.assertEqual([("read_pr", 5, "ok")],
[(o["op"], o["target"], o["detail"]) for o in self.log.read()])
def test_post_comment_writes_and_logs(self):
resp = self.sc.dispatch("post_comment", {"number": 17, "body": "done"})
self.assertTrue(resp["ok"])
self.assertEqual([(17, "done")], self.forge.comments)
def test_scope_denied_write_returns_error_and_audits_rejection(self):
self.forge.scope_denied.add(999)
resp = self.sc.dispatch("post_comment", {"number": 999, "body": "x"})
self.assertFalse(resp["ok"])
self.assertIn("denied", resp["error"])
# The rejection is recorded in the op log, not just the allows.
self.assertIn("error", self.log.read()[-1]["detail"])
self.assertEqual([], self.forge.comments)
def test_signal_done_queues_event(self):
resp = self.sc.dispatch("signal_done", {"status": "success", "summary": "ok"})
self.assertTrue(resp["ok"])
events = drain_done_events(self.queue)
self.assertEqual(1, len(events))
self.assertEqual(("o", "r", 17, "success"),
(events[0]["owner"], events[0]["repo"],
events[0]["issue_number"], events[0]["status"]))
def test_unknown_method(self):
resp = self.sc.dispatch("delete_repo", {})
self.assertFalse(resp["ok"])
class JsonableTest(unittest.TestCase):
def test_plain_value_passthrough(self):
self.assertEqual(42, _jsonable(42))
self.assertEqual("s", _jsonable("s"))
def test_dataclass_converted_to_dict(self):
@dataclasses.dataclass
class Thing:
x: int
y: str = "hi"
self.assertEqual({"x": 99, "y": "hi"}, _jsonable(Thing(x=99)))
def test_list_recursed(self):
self.assertEqual([1, 2, 3], _jsonable([1, 2, 3]))
def test_list_of_dataclasses(self):
@dataclasses.dataclass
class Item:
v: int
result = _jsonable([Item(v=1), Item(v=2)])
self.assertEqual([{"v": 1}, {"v": 2}], result)
class QueueTest(unittest.TestCase):
def test_drain_removes_events(self):
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
write_done_event(tmp, {"owner": "o", "repo": "r", "issue_number": 1})
self.assertEqual(1, len(drain_done_events(tmp)))
self.assertEqual([], drain_done_events(tmp)) # drained
def test_drain_missing_dir(self):
self.assertEqual([], drain_done_events(Path("/nonexistent/queue")))
def test_drain_skips_corrupted_file(self):
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
(tmp / "done-bad.json").write_text("not json", encoding="utf-8")
events = drain_done_events(tmp)
self.assertEqual([], events)
# The corrupted file is removed by the finally block.
self.assertFalse((tmp / "done-bad.json").exists())
class OpLogReadTest(unittest.TestCase):
def test_read_missing_file_returns_empty(self):
with tempfile.TemporaryDirectory() as tmp:
log = OpLog(Path(tmp) / "sub" / "ops.jsonl")
# File not written yet — read() should return [].
self.assertEqual([], log.read())
class SocketServerTest(unittest.TestCase):
def _make_server(self, tmp: Path):
sock = tmp / "s.sock"
if len(str(sock)) > 100:
self.skipTest("temp socket path too long for AF_UNIX")
sidecar = ForgeSidecar(
forge=FakeForge(), op_log=OpLog(tmp / "ops.jsonl"),
queue_dir=tmp / "q", run_key=("o", "r", 1),
)
return serve(sidecar, sock), sock
def test_round_trip_over_unix_socket(self):
tmp = tempfile.mkdtemp()
sock = Path(tmp) / "s.sock"
if len(str(sock)) > 100: # AF_UNIX path limit; skip on long tmp paths
self.skipTest("temp socket path too long for AF_UNIX")
sidecar = ForgeSidecar(
forge=FakeForge(), op_log=OpLog(Path(tmp) / "ops.jsonl"),
queue_dir=Path(tmp) / "q", run_key=("o", "r", 1),
)
srv = serve(sidecar, sock)
t = threading.Thread(target=srv.handle_request, daemon=True)
t.start()
try:
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(str(sock))
client.sendall(b'{"method": "read_issue", "params": {"number": 3}}\n')
line = client.makefile().readline()
client.close()
finally:
t.join(timeout=5)
srv.server_close()
resp = json.loads(line)
self.assertTrue(resp["ok"])
self.assertEqual(3, resp["result"]["number"])
def test_handler_invalid_json_returns_error(self):
tmp = Path(tempfile.mkdtemp())
srv, sock = self._make_server(tmp)
t = threading.Thread(target=srv.handle_request, daemon=True)
t.start()
try:
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(str(sock))
client.sendall(b"not valid json!\n")
line = client.makefile().readline()
client.close()
finally:
t.join(timeout=5)
srv.server_close()
resp = json.loads(line)
self.assertFalse(resp["ok"])
self.assertIn("invalid json", resp["error"])
def test_handler_empty_line_closes_silently(self):
tmp = Path(tempfile.mkdtemp())
srv, sock = self._make_server(tmp)
t = threading.Thread(target=srv.handle_request, daemon=True)
t.start()
try:
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(str(sock))
client.close() # immediate EOF -> readline() returns b""
finally:
t.join(timeout=5)
srv.server_close()
def test_serve_removes_existing_socket_path(self):
tmp = Path(tempfile.mkdtemp())
sock = tmp / "existing.sock"
if len(str(sock)) > 100:
self.skipTest("temp socket path too long for AF_UNIX")
sock.touch() # pre-existing file at socket path
sidecar = ForgeSidecar(
forge=FakeForge(), op_log=OpLog(tmp / "ops.jsonl"),
queue_dir=tmp / "q", run_key=("o", "r", 1),
)
srv = serve(sidecar, sock) # should unlink the pre-existing file
srv.server_close()
if __name__ == "__main__":
unittest.main()
+50
View File
@@ -0,0 +1,50 @@
"""Unit: InMemoryStateStore."""
from __future__ import annotations
import unittest
from bot_bottle.orchestrator.model import RunRecord
from bot_bottle.orchestrator.store import InMemoryStateStore
def _rec(issue: int, owner: str = "o") -> RunRecord:
return RunRecord(owner=owner, repo="r", issue_number=issue, slug=f"s{issue}",
agent_name="a")
class InMemoryStoreTest(unittest.TestCase):
def setUp(self):
self.store = InMemoryStateStore()
def test_upsert_get(self):
self.store.upsert(_rec(1))
got = self.store.get("o", "r", 1)
assert got is not None
self.assertEqual("s1", got.slug)
def test_get_missing(self):
self.assertIsNone(self.store.get("o", "r", 99))
def test_upsert_replaces(self):
self.store.upsert(_rec(1))
r = _rec(1)
r.slug = "changed"
self.store.upsert(r)
self.assertEqual("changed", self.store.get("o", "r", 1).slug) # type: ignore[union-attr]
self.assertEqual(1, len(self.store.all()))
def test_delete(self):
self.store.upsert(_rec(1))
self.store.delete("o", "r", 1)
self.assertIsNone(self.store.get("o", "r", 1))
def test_all_sorted(self):
self.store.upsert(_rec(2, owner="b"))
self.store.upsert(_rec(1, owner="a"))
self.assertEqual([("a", 1), ("b", 2)],
[(r.owner, r.issue_number) for r in self.store.all()])
if __name__ == "__main__":
unittest.main()
+60
View File
@@ -0,0 +1,60 @@
"""Unit: targeting (labels + org membership)."""
from __future__ import annotations
import unittest
from bot_bottle.orchestrator.model import IssueAssigned
from bot_bottle.orchestrator.targeting import parse_labels, resolve_target
from ._fakes import FakeForge
def _issue(
assignees: tuple[str, ...] = ("agent-bot",),
labels: tuple[str, ...] = ("bot-bottle:implementer",),
) -> IssueAssigned:
return IssueAssigned(
owner="didericis", repo="bot-bottle", issue_number=17,
title="t", body="b", assignees=tuple(assignees), labels=tuple(labels),
)
class ParseLabelsTest(unittest.TestCase):
def test_agent_label(self):
self.assertEqual(("implementer", None), parse_labels(("bot-bottle:implementer",)))
def test_bottle_override_not_confused_with_agent(self):
agent, bottle = parse_labels(("bot-bottle:impl", "bot-bottle-bottle:dev"))
self.assertEqual(("impl", "dev"), (agent, bottle))
def test_no_agent_label(self):
self.assertEqual((None, None), parse_labels(("bug", "p1")))
class ResolveTargetTest(unittest.TestCase):
def setUp(self):
self.forge = FakeForge(members=("agent-bot",))
def test_targeted(self):
target = resolve_target(_issue(), self.forge, "bot-bottle")
assert target is not None
self.assertEqual("implementer", target.agent_name)
self.assertIsNone(target.bottle_override)
def test_bottle_override(self):
ev = _issue(labels=("bot-bottle:impl", "bot-bottle-bottle:dev"))
target = resolve_target(ev, self.forge, "bot-bottle")
assert target is not None
self.assertEqual("dev", target.bottle_override)
def test_no_label_not_targeted(self):
self.assertIsNone(resolve_target(_issue(labels=("bug",)), self.forge, "bot-bottle"))
def test_non_member_assignee_not_targeted(self):
ev = _issue(assignees=("random-user",))
self.assertIsNone(resolve_target(ev, self.forge, "bot-bottle"))
if __name__ == "__main__":
unittest.main()
+80
View File
@@ -0,0 +1,80 @@
"""Unit: watchdog sweep."""
from __future__ import annotations
import time
import unittest
import unittest.mock
from datetime import datetime, timedelta
from bot_bottle.orchestrator.model import STATUS_FROZEN, STATUS_RUNNING, RunRecord
from bot_bottle.orchestrator.store import InMemoryStateStore
from bot_bottle.orchestrator.watchdog import Watchdog
from ._fakes import FakeRunner
_NOW = datetime(2026, 7, 1, 12, 0, 0).astimezone()
def _record(issue: int, status: str, checkin: str) -> RunRecord:
return RunRecord(
owner="o", repo="r", issue_number=issue, slug=f"s{issue}",
agent_name="a", status=status, last_checkin_at=checkin,
)
class WatchdogSweepTest(unittest.TestCase):
def setUp(self):
self.store = InMemoryStateStore()
self.runner = FakeRunner()
self.wd = Watchdog(store=self.store, runner=self.runner, timeout_secs=1800)
def _status(self, issue: int) -> str:
rec = self.store.get("o", "r", issue)
assert rec is not None
return rec.status
def test_stale_running_is_frozen(self):
stale = (_NOW - timedelta(minutes=31)).isoformat()
self.store.upsert(_record(1, STATUS_RUNNING, stale))
fired = self.wd.sweep(_NOW)
self.assertEqual([1], [r.issue_number for r in fired])
self.assertEqual(STATUS_FROZEN, self._status(1))
self.assertIn(("freeze", "s1"), self.runner.calls)
def test_fresh_running_untouched(self):
fresh = (_NOW - timedelta(minutes=5)).isoformat()
self.store.upsert(_record(2, STATUS_RUNNING, fresh))
self.assertEqual([], self.wd.sweep(_NOW))
self.assertEqual(STATUS_RUNNING, self._status(2))
def test_non_running_ignored(self):
stale = (_NOW - timedelta(hours=2)).isoformat()
self.store.upsert(_record(3, STATUS_FROZEN, stale))
self.assertEqual([], self.wd.sweep(_NOW))
def test_unparseable_checkin_skipped(self):
self.store.upsert(_record(4, STATUS_RUNNING, "not-a-time"))
self.assertEqual([], self.wd.sweep(_NOW))
def test_start_and_stop(self):
# Exercises the daemon-thread start/stop path; stop sets the event
# so the loop's wait returns immediately.
self.wd.start()
self.wd.stop()
def test_loop_sweeps_stale_record(self):
# Patch tick to near-zero so the loop iterates quickly.
stale = (_NOW - timedelta(hours=1)).isoformat()
self.store.upsert(_record(5, STATUS_RUNNING, stale))
with unittest.mock.patch("bot_bottle.orchestrator.watchdog._TICK_SECS", 0.01):
self.wd.start()
time.sleep(0.05) # enough for several iterations at 0.01s tick
self.wd.stop()
rec = self.store.get("o", "r", 5)
assert rec is not None
self.assertEqual(STATUS_FROZEN, rec.status)
if __name__ == "__main__":
unittest.main()
+161
View File
@@ -0,0 +1,161 @@
"""Unit: webhook HTTP surface (signature + routing over a real server)."""
from __future__ import annotations
import hashlib
import hmac
import json
import threading
import unittest
import urllib.request
from urllib.error import HTTPError
from bot_bottle.orchestrator.model import RunRecord
from bot_bottle.orchestrator.store import InMemoryStateStore
from bot_bottle.orchestrator.webhook import WebhookServer, verify_signature
_ISSUE_ASSIGNED = {
"action": "assigned",
"repository": {"name": "bot-bottle", "owner": {"login": "didericis"}},
"issue": {
"number": 17, "title": "t", "body": "b",
"assignees": [{"login": "agent-bot"}],
"labels": [{"name": "bot-bottle:impl"}],
},
}
class _RecordingOrch:
def __init__(self) -> None:
self.events: list[object] = []
def handle(self, event: object) -> None:
self.events.append(event)
class SignatureTest(unittest.TestCase):
def test_verify(self):
secret = b"s3cret"
body = b'{"x":1}'
sig = hmac.new(secret, body, hashlib.sha256).hexdigest()
self.assertTrue(verify_signature(secret, body, sig))
self.assertFalse(verify_signature(secret, body, "deadbeef"))
class WebhookServerTest(unittest.TestCase):
# _serve is the per-test setup; attributes are assigned there.
# pylint: disable=attribute-defined-outside-init
def _serve(self, **kwargs: object) -> None:
self.orch = _RecordingOrch()
kwargs.setdefault("store", InMemoryStateStore())
self.server = WebhookServer(
("127.0.0.1", 0), orchestrator=self.orch, **kwargs, # type: ignore[arg-type]
)
self.port = self.server.server_address[1]
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
self.addCleanup(self._shutdown)
def _shutdown(self) -> None:
self.server.shutdown()
self.server.server_close()
self.thread.join(timeout=5)
def _post(
self, path: str, body: bytes, headers: dict[str, str] | None = None
) -> tuple[int, dict[str, object]]:
req = urllib.request.Request(
f"http://127.0.0.1:{self.port}{path}", data=body, method="POST",
headers=headers or {},
)
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status, json.loads(resp.read())
def _get(self, path: str) -> tuple[int, dict[str, object]]:
with urllib.request.urlopen(f"http://127.0.0.1:{self.port}{path}", timeout=5) as r:
return r.status, json.loads(r.read())
def test_webhook_dispatches(self):
self._serve()
body = json.dumps(_ISSUE_ASSIGNED).encode()
status, payload = self._post("/webhook", body, {"X-Gitea-Event": "issues"})
self.assertEqual(200, status)
self.assertTrue(payload["handled"])
self.assertEqual(1, len(self.orch.events))
def test_unhandled_event_ok_but_not_handled(self):
self._serve()
body = json.dumps({"action": "push"}).encode()
_status, payload = self._post("/webhook", body, {"X-Gitea-Event": "push"})
self.assertFalse(payload["handled"])
self.assertEqual([], self.orch.events)
def test_invalid_json_400(self):
self._serve()
with self.assertRaises(HTTPError) as ctx:
self._post("/webhook", b"{not json", {"X-Gitea-Event": "issues"})
self.assertEqual(400, ctx.exception.code)
def test_bad_signature_rejected(self):
self._serve(secret=b"sekret")
body = json.dumps(_ISSUE_ASSIGNED).encode()
with self.assertRaises(HTTPError) as ctx:
self._post("/webhook", body,
{"X-Gitea-Event": "issues", "X-Gitea-Signature": "deadbeef"})
self.assertEqual(401, ctx.exception.code)
self.assertEqual([], self.orch.events)
def test_good_signature_accepted(self):
self._serve(secret=b"sekret")
body = json.dumps(_ISSUE_ASSIGNED).encode()
sig = hmac.new(b"sekret", body, hashlib.sha256).hexdigest()
status, _payload = self._post(
"/webhook", body, {"X-Gitea-Event": "issues", "X-Gitea-Signature": sig})
self.assertEqual(200, status)
self.assertEqual(1, len(self.orch.events))
def test_healthz(self):
self._serve()
self.assertEqual(200, self._get("/healthz")[0])
def test_unknown_path_404(self):
self._serve()
with self.assertRaises(HTTPError) as ctx:
self._post("/nope", b"{}", {"X-Gitea-Event": "issues"})
self.assertEqual(404, ctx.exception.code)
def test_provenance_returns_record_and_ops(self):
store = InMemoryStateStore()
store.upsert(RunRecord(owner="didericis", repo="bot-bottle", issue_number=17,
slug="impl-17", agent_name="impl", bottle_names=["claude"]))
def reader(rec: object) -> list[dict[str, object]]: # pylint: disable=unused-argument
return [{"at": "T", "op": "post_comment", "target": 17, "detail": "ok"}]
self._serve(store=store, op_log_reader=reader)
status, payload = self._get("/provenance?owner=didericis&repo=bot-bottle&issue=17")
self.assertEqual(200, status)
self.assertEqual("impl-17", payload["slug"])
self.assertEqual(1, len(payload["ops"])) # type: ignore[arg-type]
def test_provenance_missing_params_400(self):
self._serve()
with self.assertRaises(HTTPError) as ctx:
self._get("/provenance?owner=didericis")
self.assertEqual(400, ctx.exception.code)
def test_provenance_unknown_run_404(self):
self._serve()
with self.assertRaises(HTTPError) as ctx:
self._get("/provenance?owner=x&repo=y&issue=1")
self.assertEqual(404, ctx.exception.code)
def test_unknown_get_path_404(self):
self._serve()
with self.assertRaises(HTTPError) as ctx:
self._get("/nope")
self.assertEqual(404, ctx.exception.code)
if __name__ == "__main__":
unittest.main()
+66 -1
View File
@@ -9,11 +9,15 @@ import unittest
from pathlib import Path from pathlib import Path
from bot_bottle.agent_provider import ( from bot_bottle.agent_provider import (
CLAUDE_HOST_CREDENTIAL_HOSTS,
CODEX_HOST_CREDENTIAL_HOSTS, CODEX_HOST_CREDENTIAL_HOSTS,
build_agent_provision_plan, build_agent_provision_plan,
prompt_args, prompt_args,
) )
from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF from bot_bottle.egress import (
CLAUDE_HOST_CREDENTIAL_TOKEN_REF,
CODEX_HOST_CREDENTIAL_TOKEN_REF,
)
def _jwt(exp: int) -> str: def _jwt(exp: int) -> str:
@@ -289,6 +293,67 @@ class TestAgentProviderRuntime(unittest.TestCase):
) )
self.assertEqual({}, plan.provisioned_env) self.assertEqual({}, plan.provisioned_env)
def test_claude_forward_host_credentials_populates_egress_route(self):
access_token = "sk-ant-oat01-test-key"
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
home = Path(tmp) / "host-claude"
cred_dir = home / ".claude"
cred_dir.mkdir(parents=True)
(cred_dir / ".credentials.json").write_text(json.dumps({
"claudeAiOauth": {"accessToken": access_token},
}))
plan = build_agent_provision_plan(
template="claude",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
forward_host_credentials=True,
host_env={"HOME": str(home)},
)
self.assertEqual(1, len(plan.egress_routes))
route = plan.egress_routes[0]
self.assertIn(route.host, CLAUDE_HOST_CREDENTIAL_HOSTS)
self.assertEqual("Bearer", route.auth_scheme)
self.assertEqual(CLAUDE_HOST_CREDENTIAL_TOKEN_REF, route.token_ref)
self.assertEqual("egress-placeholder", plan.env_vars["CLAUDE_CODE_OAUTH_TOKEN"])
self.assertEqual(frozenset({"CLAUDE_CODE_OAUTH_TOKEN"}), plan.hidden_env_names)
def test_claude_forward_host_credentials_populates_provisioned_env(self):
access_token = "sk-ant-oat01-test-key"
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
home = Path(tmp) / "host-claude"
cred_dir = home / ".claude"
cred_dir.mkdir(parents=True)
(cred_dir / ".credentials.json").write_text(json.dumps({
"claudeAiOauth": {"accessToken": access_token},
}))
plan = build_agent_provision_plan(
template="claude",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
forward_host_credentials=True,
host_env={"HOME": str(home)},
)
self.assertEqual(
{CLAUDE_HOST_CREDENTIAL_TOKEN_REF: access_token},
plan.provisioned_env,
)
def test_claude_without_forward_host_credentials_has_empty_provisioned_env(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan(
template="claude",
dockerfile="",
state_dir=Path(tmp),
instance_name="bot-bottle-test",
prompt_file=Path(tmp) / "prompt.txt",
forward_host_credentials=False,
)
self.assertEqual({}, plan.provisioned_env)
def test_pi_plan_writes_default_ollama_models(self): def test_pi_plan_writes_default_ollama_models(self):
with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp: with tempfile.TemporaryDirectory(prefix="bb-provider.") as tmp:
plan = build_agent_provision_plan( plan = build_agent_provision_plan(
+187
View File
@@ -0,0 +1,187 @@
"""Unit: host Claude auth extraction."""
from __future__ import annotations
import json
import subprocess
import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import MagicMock, patch
from bot_bottle.contrib.claude.claude_auth import (
claude_auth_path,
claude_host_access_token,
)
from bot_bottle.log import Die
def _cred_json(access_token: str, **extra) -> str: # type: ignore[no-untyped-def]
payload: dict = {"claudeAiOauth": {"accessToken": access_token, **extra}}
return json.dumps(payload)
class TestClaudeHostAccessToken(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.TemporaryDirectory(prefix="bb-claude-auth.")
self.home = Path(self.tmp.name)
self.cred_dir = self.home / ".claude"
self.cred_dir.mkdir()
self.auth_path = self.cred_dir / ".credentials.json"
def tearDown(self):
self.tmp.cleanup()
def _write(self, payload: dict) -> None: # type: ignore[no-untyped-def]
self.auth_path.write_text(json.dumps(payload))
def test_auth_path_uses_home_env(self):
self.assertEqual(
self.auth_path,
claude_auth_path({"HOME": str(self.home)}),
)
# --- file-based (Linux) ---
def test_file_returns_access_token(self):
key = "sk-ant-oat01-real-key"
self._write({"claudeAiOauth": {"accessToken": key}})
out = claude_host_access_token({"HOME": str(self.home)})
self.assertEqual(key, out)
def test_file_missing_claude_ai_oauth_dies(self):
self._write({"hasCompletedOnboarding": True})
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(self.home)})
def test_file_missing_access_token_dies(self):
self._write({"claudeAiOauth": {"expiresAt": 2000000000000}})
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(self.home)})
def test_file_empty_access_token_dies(self):
self._write({"claudeAiOauth": {"accessToken": ""}})
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(self.home)})
def test_file_expired_token_dies(self):
# expiresAt is milliseconds; 1_000_000 ms is year 1970
self._write({
"claudeAiOauth": {"accessToken": "sk-ant-oat01-x", "expiresAt": 1_000_000},
})
with self.assertRaises(Die):
claude_host_access_token(
{"HOME": str(self.home)},
now=datetime(2026, 1, 1, tzinfo=timezone.utc),
)
def test_file_future_expiry_is_accepted(self):
key = "sk-ant-oat01-y"
# 2_000_000_000_000 ms ≈ year 2033
self._write({
"claudeAiOauth": {"accessToken": key, "expiresAt": 2_000_000_000_000},
})
out = claude_host_access_token(
{"HOME": str(self.home)},
now=datetime(2026, 1, 1, tzinfo=timezone.utc),
)
self.assertEqual(key, out)
def test_file_absent_expiry_is_accepted(self):
key = "sk-ant-oat01-z"
self._write({"claudeAiOauth": {"accessToken": key}})
out = claude_host_access_token({"HOME": str(self.home)})
self.assertEqual(key, out)
def test_file_non_json_dies(self):
self.auth_path.write_text("not json {{{")
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(self.home)})
def test_file_json_array_root_dies(self):
self.auth_path.write_text("[]")
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(self.home)})
def test_file_extra_fields_are_ignored(self):
key = "sk-ant-oat01-real"
self._write({
"claudeAiOauth": {
"accessToken": key,
"refreshToken": "sk-ant-ort01-secret",
"scopes": ["user:inference"],
"expiresAt": 2_000_000_000_000,
},
})
out = claude_host_access_token({"HOME": str(self.home)})
self.assertEqual(key, out)
# --- macOS Keychain fallback ---
def _home_without_creds(self) -> Path:
"""A home dir that has .claude/ but no .credentials.json."""
empty = self.home / "no-creds"
(empty / ".claude").mkdir(parents=True)
return empty
def _mock_keychain(self, stdout: str, returncode: int = 0) -> MagicMock:
mock = MagicMock()
mock.returncode = returncode
mock.stdout = stdout
return mock
def test_keychain_used_when_file_absent(self):
key = "sk-ant-oat01-keychain"
home = self._home_without_creds()
with patch(
"bot_bottle.contrib.claude.claude_auth.subprocess.run",
return_value=self._mock_keychain(_cred_json(key)),
), patch(
"bot_bottle.contrib.claude.claude_auth.sys.platform", "darwin",
):
out = claude_host_access_token({"HOME": str(home)})
self.assertEqual(key, out)
def test_keychain_failure_when_file_absent_dies(self):
home = self._home_without_creds()
with patch(
"bot_bottle.contrib.claude.claude_auth.subprocess.run",
return_value=self._mock_keychain("", returncode=44),
), patch(
"bot_bottle.contrib.claude.claude_auth.sys.platform", "darwin",
):
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(home)})
def test_no_file_no_keychain_on_linux_dies(self):
home = self._home_without_creds()
with patch("bot_bottle.contrib.claude.claude_auth.sys.platform", "linux"):
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(home)})
def test_keychain_non_json_dies(self):
home = self._home_without_creds()
with patch(
"bot_bottle.contrib.claude.claude_auth.subprocess.run",
return_value=self._mock_keychain("not-json"),
), patch(
"bot_bottle.contrib.claude.claude_auth.sys.platform", "darwin",
):
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(home)})
def test_keychain_security_not_found_dies(self):
home = self._home_without_creds()
with patch(
"bot_bottle.contrib.claude.claude_auth.subprocess.run",
side_effect=FileNotFoundError,
), patch(
"bot_bottle.contrib.claude.claude_auth.sys.platform", "darwin",
):
with self.assertRaises(Die):
claude_host_access_token({"HOME": str(home)})
if __name__ == "__main__":
unittest.main()
+153
View File
@@ -0,0 +1,153 @@
"""Unit: GiteaClient and GiteaForge (urllib mocked — no network)."""
from __future__ import annotations
import json
import unittest
import urllib.error
from unittest.mock import MagicMock, patch
from bot_bottle.contrib.gitea.client import GiteaClient, GiteaForge
def _client() -> GiteaClient:
return GiteaClient(api_url="http://g/api/v1", owner="o", repo="r", token="tok")
def _mock_response(body: bytes) -> MagicMock:
resp = MagicMock()
resp.read.return_value = body
resp.__enter__.return_value = resp
resp.__exit__.return_value = False
return resp
class GiteaClientTest(unittest.TestCase):
# pylint: disable=protected-access
def setUp(self):
self.client = _client()
def test_request_returns_parsed_json(self):
payload = {"number": 42}
resp = _mock_response(json.dumps(payload).encode())
with patch("urllib.request.urlopen", return_value=resp):
result = self.client._request("GET", "/repos/o/r/issues/42")
self.assertEqual(payload, result)
def test_request_empty_body_returns_none(self):
resp = _mock_response(b"")
with patch("urllib.request.urlopen", return_value=resp):
result = self.client._request("POST", "/some/path", {"x": 1})
self.assertIsNone(result)
def test_is_org_member_true_on_200(self):
mock_resp = MagicMock()
mock_resp.close = MagicMock()
with patch("urllib.request.urlopen", return_value=mock_resp):
self.assertTrue(self.client.is_org_member("myorg", "alice"))
def test_is_org_member_false_on_http_error(self):
err = urllib.error.HTTPError("url", 404, "Not Found", None, None) # type: ignore[arg-type]
with patch("urllib.request.urlopen", side_effect=err):
self.assertFalse(self.client.is_org_member("myorg", "nobody"))
def test_get_issue(self):
resp = _mock_response(json.dumps({"number": 1}).encode())
with patch("urllib.request.urlopen", return_value=resp):
result = self.client.get_issue(1)
self.assertEqual(1, result["number"])
def test_get_pull(self):
resp = _mock_response(json.dumps({"number": 7, "merged": False}).encode())
with patch("urllib.request.urlopen", return_value=resp):
result = self.client.get_pull(7)
self.assertEqual(7, result["number"])
def test_list_comments(self):
resp = _mock_response(json.dumps([{"id": 1, "body": "hi"}]).encode())
with patch("urllib.request.urlopen", return_value=resp):
result = self.client.list_comments(1)
self.assertEqual(1, len(result))
self.assertEqual(1, result[0]["id"])
def test_create_comment(self):
resp = _mock_response(b"")
with patch("urllib.request.urlopen", return_value=resp) as mock_open:
self.client.create_comment(1, "hello")
mock_open.assert_called_once()
def test_update_issue(self):
resp = _mock_response(b"")
with patch("urllib.request.urlopen", return_value=resp) as mock_open:
self.client.update_issue(1, "new body")
mock_open.assert_called_once()
def test_request_builds_correct_url(self):
import urllib.request as ureq
captured: list[ureq.Request] = []
def fake_urlopen(req: ureq.Request, timeout: float) -> MagicMock: # pylint: disable=unused-argument
captured.append(req)
return _mock_response(b"{}")
with patch("urllib.request.urlopen", side_effect=fake_urlopen):
self.client.get_issue(5)
self.assertIn("/issues/5", captured[0].full_url)
def test_request_sends_auth_header(self):
import urllib.request as ureq
captured: list[ureq.Request] = []
def fake_urlopen(req: ureq.Request, timeout: float) -> MagicMock: # pylint: disable=unused-argument
captured.append(req)
return _mock_response(b"{}")
with patch("urllib.request.urlopen", side_effect=fake_urlopen):
self.client.get_issue(1)
self.assertEqual("token tok", captured[0].get_header("Authorization"))
class GiteaForgeTest(unittest.TestCase):
def setUp(self):
self.client = MagicMock(spec=GiteaClient)
self.forge = GiteaForge(self.client)
def test_is_org_member_delegates(self):
self.client.is_org_member.return_value = True
self.assertTrue(self.forge.is_org_member("org", "alice"))
self.client.is_org_member.assert_called_once_with("org", "alice")
def test_is_org_member_false(self):
self.client.is_org_member.return_value = False
self.assertFalse(self.forge.is_org_member("org", "outsider"))
def test_read_issue_delegates(self):
self.client.get_issue.return_value = {"number": 3}
self.assertEqual({"number": 3}, self.forge.read_issue(3))
self.client.get_issue.assert_called_once_with(3)
def test_read_pr_delegates(self):
self.client.get_pull.return_value = {"number": 5, "merged": False}
result = self.forge.read_pr(5)
self.assertEqual(5, result["number"])
self.client.get_pull.assert_called_once_with(5)
def test_read_comments_delegates(self):
self.client.list_comments.return_value = [{"id": 1}]
comments = self.forge.read_comments(1)
self.assertEqual([{"id": 1}], comments)
self.client.list_comments.assert_called_once_with(1)
def test_post_comment_delegates(self):
self.forge.post_comment(1, "looks good")
self.client.create_comment.assert_called_once_with(1, "looks good")
def test_update_description_delegates(self):
self.forge.update_description(1, "updated body")
self.client.update_issue.assert_called_once_with(1, "updated body")
if __name__ == "__main__":
unittest.main()
+9 -1
View File
@@ -80,11 +80,19 @@ class TestAgentProviderHostCredentials(unittest.TestCase):
"forward_host_credentials": "yes", "forward_host_credentials": "yes",
}) })
def test_forward_host_credentials_rejected_for_claude(self): def test_forward_host_credentials_allowed_for_claude(self):
b = _provider_config_bottle({
"template": "claude",
"forward_host_credentials": True,
})
self.assertTrue(b.agent_provider.forward_host_credentials)
def test_forward_host_credentials_and_auth_token_rejected_together(self):
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
_provider_config_bottle({ _provider_config_bottle({
"template": "claude", "template": "claude",
"forward_host_credentials": True, "forward_host_credentials": True,
"auth_token": "SOME_TOKEN",
}) })
def test_auth_token_defaults_empty(self): def test_auth_token_defaults_empty(self):
+14 -2
View File
@@ -82,10 +82,22 @@ class TestAgentProviderValidation(unittest.TestCase):
"b", {"forward_host_credentials": True, "template": "weird"} "b", {"forward_host_credentials": True, "template": "weird"}
) )
def test_forward_creds_non_codex_template(self) -> None: def test_forward_creds_pi_template_rejected(self) -> None:
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
ManifestAgentProvider.from_dict( ManifestAgentProvider.from_dict(
"b", {"forward_host_credentials": True, "template": "claude"} "b", {"forward_host_credentials": True, "template": "pi"}
)
def test_forward_creds_claude_allowed(self) -> None:
p = ManifestAgentProvider.from_dict(
"b", {"forward_host_credentials": True, "template": "claude"}
)
self.assertTrue(p.forward_host_credentials)
def test_forward_creds_and_auth_token_rejected(self) -> None:
with self.assertRaises(ManifestError):
ManifestAgentProvider.from_dict(
"b", {"forward_host_credentials": True, "auth_token": "T", "template": "claude"}
) )
def test_valid_claude_auth_token(self) -> None: def test_valid_claude_auth_token(self) -> None: