feat(forge): forge library layer for native integration (PRD chunks 1-3, 5)
lint / lint (push) Failing after 2m9s
test / unit (pull_request) Successful in 58s
test / integration (pull_request) Successful in 21s
test / coverage (pull_request) Successful in 1m23s

Implements the bot-bottle side of the forge-native PRD that is
self-contained in this repo (the forge sidecar and orchestrate command
belong to the separate bot-bottle-orchestrator, a PRD non-goal):

- contrib/forge/base.py: Forge ABC + ScopedForge enforcing the
  read-anywhere / write-scoped model (writes rejected outside the
  assigned issue/PRs via ForgeScopeError).
- contrib/gitea/client.py: GiteaClient (stdlib-only HTTP, mirrors the
  deploy-key provisioner) + GiteaForge. Token held by the caller (the
  sidecar), not injected by cred-proxy.
- contrib/gitea/forge_state.py: ForgeState dataclass + atomic
  read/write/delete/all under ~/.bot-bottle/forge/<owner>/<repo>/.
- contrib/gitea/provenance.py: build_provenance_footer — collapsed
  markdown audit footer; watchdog/gitleaks/egress rendering.
- cli/resume.py: `resume --headless --prompt` reusing the shipped
  assume_yes + headless_prompt launch core (the new half of chunk 1).

47 new unit tests; pylint 9.98/10, pyright clean. Forge sidecar (chunk
4), orchestrate command (chunk 6), and forge_env plumbing are deferred:
their only consumer is the separate orchestrator and they are untestable
in isolation here.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
This commit is contained in:
2026-06-30 19:39:49 -04:00
parent 738990b2df
commit a229a22d54
11 changed files with 1025 additions and 0 deletions
+24
View File
@@ -27,12 +27,34 @@ from .start import _launch_bottle
def cmd_resume(argv: list[str]) -> int: def cmd_resume(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True) parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True)
parser.add_argument("--dry-run", action="store_true") parser.add_argument("--dry-run", action="store_true")
parser.add_argument(
"--headless",
action="store_true",
help=(
"non-interactive rehydrate: deliver --prompt to the agent and "
"skip the y/N preflight. For orchestrators / the freeze-rehydrate "
"loop."
),
)
parser.add_argument(
"--prompt",
default=None,
help="follow-up prompt delivered to the agent (required with --headless)",
)
parser.add_argument( parser.add_argument(
"identity", "identity",
help="bottle identity from a prior `start` (see its session-end output)", help="bottle identity from a prior `start` (see its session-end output)",
) )
args = parser.parse_args(argv) args = parser.parse_args(argv)
if args.prompt and not args.headless:
die("--prompt is only valid with --headless")
if args.headless and not args.prompt:
die(
"--headless requires --prompt: "
"./cli.py resume <identity> --headless --prompt 'Address the review'"
)
metadata = read_metadata(args.identity) metadata = read_metadata(args.identity)
if metadata is None: if metadata is None:
die( die(
@@ -56,4 +78,6 @@ def cmd_resume(argv: list[str]) -> int:
spec, spec,
dry_run=args.dry_run, dry_run=args.dry_run,
backend_name=backend_name, backend_name=backend_name,
assume_yes=args.headless,
headless_prompt_text=args.prompt or "",
) )
+145
View File
@@ -0,0 +1,145 @@
"""Forge abstraction (PRD forge-native-integration, chunk 3).
The `Forge` abstract class is the provider-agnostic surface a forge
sidecar dispatches to: read issues/comments, post comments, edit
descriptions, and the membership / PR lookups the orchestrator needs.
Each forge (Gitea first) implements it; the sidecar protocol and the
agent prompt stay forge-agnostic.
`signal_done` is deliberately *not* a `Forge` method — completion is a
sidecar concept relayed to the orchestrator over a queue dir, not a
forge API operation.
`ScopedForge` enforces the PRD's **read-anywhere / write-scoped** model:
reads pass through to any issue/PR for context; writes are rejected
unless the target is the assigned issue or one of its PRs. This bounds
the blast radius of a prompt-injected agent below repo-wide API-key
permissions.
"""
from __future__ import annotations
import abc
from collections.abc import Iterable
from dataclasses import dataclass
@dataclass(frozen=True)
class Issue:
"""A forge issue or PR (forges model PRs as issues with the same
number)."""
number: int
title: str
body: str
state: str # "open" | "closed"
@dataclass(frozen=True)
class Comment:
id: int
user: str # login of the comment author
body: str
class ForgeScopeError(PermissionError):
"""Raised by `ScopedForge` when a write targets an issue/PR outside
the assigned scope."""
class Forge(abc.ABC):
"""Provider-agnostic forge operations. Implementations wrap a
per-provider HTTP client and translate to `Issue` / `Comment`."""
@abc.abstractmethod
def read_issue(self, number: int) -> Issue:
"""Read an issue or PR body (read-anywhere)."""
@abc.abstractmethod
def read_comments(self, number: int) -> list[Comment]:
"""Read a thread's comments (read-anywhere)."""
@abc.abstractmethod
def post_comment(self, number: int, body: str) -> None:
"""Post a comment to an issue or PR (write-scoped)."""
@abc.abstractmethod
def update_description(self, number: int, body: str) -> None:
"""Replace an issue or PR body (write-scoped)."""
@abc.abstractmethod
def is_org_member(self, org: str, username: str) -> bool:
"""Whether `username` is a member of `org`."""
@abc.abstractmethod
def get_pr_for_issue(self, number: int) -> int | None:
"""The PR number linked to an issue, or None when there is none."""
@abc.abstractmethod
def is_pr_open(self, number: int) -> bool:
"""Whether the given PR is still open."""
class ScopedForge(Forge):
"""Read-anywhere / write-scoped wrapper around a concrete `Forge`.
`post_comment` and `update_description` are rejected with
`ForgeScopeError` unless the target number is the assigned issue or
one of the assigned PRs. Every other method delegates unchanged, so
reads, membership checks, and PR lookups work against any number for
context.
The writable set is fixed at construction. The sidecar reconstructs
a `ScopedForge` when a PR is discovered (`get_pr_for_issue`) so the
new PR becomes writable; this class does not mutate its own scope.
"""
def __init__(
self,
inner: Forge,
*,
assigned_issue: int,
assigned_prs: Iterable[int] = (),
) -> None:
self._inner = inner
self._assigned_issue = assigned_issue
self._writable = {assigned_issue, *assigned_prs}
@property
def writable(self) -> frozenset[int]:
return frozenset(self._writable)
def _check_write(self, number: int) -> None:
if number not in self._writable:
allowed = ", ".join(str(n) for n in sorted(self._writable))
raise ForgeScopeError(
f"write to #{number} denied: out of assigned scope "
f"(writable: {allowed})"
)
# --- read-anywhere: pass through --------------------------------------
def read_issue(self, number: int) -> Issue:
return self._inner.read_issue(number)
def read_comments(self, number: int) -> list[Comment]:
return self._inner.read_comments(number)
def is_org_member(self, org: str, username: str) -> bool:
return self._inner.is_org_member(org, username)
def get_pr_for_issue(self, number: int) -> int | None:
return self._inner.get_pr_for_issue(number)
def is_pr_open(self, number: int) -> bool:
return self._inner.is_pr_open(number)
# --- write-scoped: check then delegate --------------------------------
def post_comment(self, number: int, body: str) -> None:
self._check_write(number)
self._inner.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._check_write(number)
self._inner.update_description(number, body)
+164
View File
@@ -0,0 +1,164 @@
"""Gitea HTTP client + `GiteaForge` (PRD forge-native-integration, chunk 3).
`GiteaClient` is the thin stdlib-only HTTP transport (mirrors
`deploy_key_provisioner.py`: `urllib.request`, bounded timeouts,
structured error bodies). `GiteaForge` adapts it to the provider-agnostic
`Forge` surface.
Unlike the option-2 design, the token is held here (the sidecar process
owns it) and passed to the client directly — there is no agent-side
cred-proxy route, because the agent never makes forge calls. The HTTP
client is the one piece shared with `GiteaDeployKeyProvisioner`; the two
are deliberately *not* unified behind a common abstract base (see the
deferral note in the PRD).
"""
from __future__ import annotations
import json
import urllib.error
import urllib.request
from typing import Any
from ..forge.base import Comment, Forge, Issue
# Bound every Gitea call: a hung instance must not stall the sidecar.
_API_TIMEOUT_SECS = 30
class GiteaClient:
"""Thin authenticated HTTP client for one repo's Gitea API.
`api_url` is the API base *including* `/api/v1` (matching the
`FORGE_GITEA_API` env var), e.g. `https://gitea.example.com/api/v1`.
"""
def __init__(self, *, api_url: str, owner: str, repo: str, token: str) -> None:
self._api_url = api_url.rstrip("/")
self._owner = owner
self._repo = repo
self._token = token
# --- low-level request -------------------------------------------------
def _request(
self, method: str, path: str, *, body: dict[str, Any] | None = None
) -> tuple[int, Any]:
"""Issue an authenticated request. Returns `(status, parsed_json)`;
parsed_json is None when the response has no body. Raises
`RuntimeError` on any non-2xx except where callers special-case
the HTTPError themselves (membership 404)."""
url = f"{self._api_url}{path}"
data = json.dumps(body).encode() if body is not None else None
headers = {"Authorization": f"token {self._token}"}
if data is not None:
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method)
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS) as resp:
raw = resp.read()
parsed = json.loads(raw) if raw else None
return resp.status, parsed
def _repo_path(self, suffix: str) -> str:
return f"/repos/{self._owner}/{self._repo}{suffix}"
# --- operations --------------------------------------------------------
def is_org_member(self, org: str, username: str) -> bool:
"""GET /orgs/{org}/members/{username}: 2xx → member, 404 → not.
Other errors propagate so a misconfigured token fails loudly."""
url = f"{self._api_url}/orgs/{org}/members/{username}"
req = urllib.request.Request(
url, headers={"Authorization": f"token {self._token}"}, method="GET"
)
try:
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS):
return True
except urllib.error.HTTPError as exc:
if exc.code == 404:
return False
raise RuntimeError(
f"org membership check failed for {org}/{username}: "
f"HTTP {exc.code}{_read_error_body(exc)}"
) from exc
def get_issue(self, number: int) -> dict[str, Any]:
_status, body = self._request("GET", self._repo_path(f"/issues/{number}"))
return body or {}
def get_comments(self, number: int) -> list[dict[str, Any]]:
_status, body = self._request(
"GET", self._repo_path(f"/issues/{number}/comments")
)
return body or []
def post_comment(self, number: int, body: str) -> None:
self._request(
"POST",
self._repo_path(f"/issues/{number}/comments"),
body={"body": body},
)
def patch_issue_body(self, number: int, body: str) -> None:
self._request(
"PATCH", self._repo_path(f"/issues/{number}"), body={"body": body}
)
def get_pull(self, number: int) -> dict[str, Any]:
_status, body = self._request("GET", self._repo_path(f"/pulls/{number}"))
return body or {}
class GiteaForge(Forge):
"""`Forge` over a `GiteaClient`."""
def __init__(self, client: GiteaClient) -> None:
self._client = client
def read_issue(self, number: int) -> Issue:
raw = self._client.get_issue(number)
return Issue(
number=int(raw.get("number", number)),
title=str(raw.get("title", "")),
body=str(raw.get("body", "") or ""),
state=str(raw.get("state", "")),
)
def read_comments(self, number: int) -> list[Comment]:
return [
Comment(
id=int(c.get("id", 0)),
user=str((c.get("user") or {}).get("login", "")),
body=str(c.get("body", "") or ""),
)
for c in self._client.get_comments(number)
]
def post_comment(self, number: int, body: str) -> None:
self._client.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._client.patch_issue_body(number, body)
def is_org_member(self, org: str, username: str) -> bool:
return self._client.is_org_member(org, username)
def get_pr_for_issue(self, number: int) -> int | None:
"""Gitea models a PR as an issue with the same number, exposing a
`pull_request` object on the issue. When the queried number is
itself a PR, return it; otherwise None. (The orchestrator tracks
the issue→PR mapping in forge state for the cross-number case.)"""
raw = self._client.get_issue(number)
if raw.get("pull_request"):
return int(raw.get("number", number))
return None
def is_pr_open(self, number: int) -> bool:
return self._client.get_pull(number).get("state") == "open"
def _read_error_body(exc: urllib.error.HTTPError) -> str:
try:
return exc.read().decode("utf-8", errors="replace")
except Exception: # pylint: disable=broad-exception-caught
return ""
+105
View File
@@ -0,0 +1,105 @@
"""Forge state persistence (PRD forge-native-integration, chunk 2).
The orchestrator tracks one record per forge-targeted issue so it can
map an incoming webhook back to the bottle handling it, drive the
freeze / rehydrate loop, and run the watchdog. State lives on disk and
survives orchestrator restarts:
~/.bot-bottle/forge/<owner>/<repo>/issue-<n>.json
Writes are atomic (`os.replace`) so a crash mid-write never leaves a
truncated record.
"""
from __future__ import annotations
import json
import os
from dataclasses import asdict, dataclass, field, fields
from typing import Any
from pathlib import Path
from ...supervise import bot_bottle_root
_FORGE_SUBDIR = "forge"
# Lifecycle: a bottle is launched (running), frozen on the done signal,
# and destroyed when the PR closes.
STATUS_RUNNING = "running"
STATUS_FROZEN = "frozen"
STATUS_DESTROYED = "destroyed"
@dataclass
class ForgeState:
"""One forge-targeted issue's bottle lifecycle record."""
owner: str
repo: str
issue_number: int
slug: str
agent_name: str
bottle_names: list[str] = field(default_factory=list)
backend_name: str = ""
agent_git_user: str = ""
pr_number: int | None = None
status: str = STATUS_RUNNING
last_checkin_at: str = ""
def to_json(self) -> str:
return json.dumps(asdict(self), indent=2, sort_keys=True)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ForgeState":
# Tolerate unknown keys (forward-compat) by filtering to fields.
known = {f.name for f in fields(cls)}
return cls(**{k: v for k, v in data.items() if k in known})
def _forge_root() -> Path:
return bot_bottle_root() / _FORGE_SUBDIR
def forge_state_path(owner: str, repo: str, issue_number: int) -> Path:
return _forge_root() / owner / repo / f"issue-{issue_number}.json"
def write_forge_state(state: ForgeState) -> None:
"""Persist `state` atomically. Creates parent dirs as needed."""
path = forge_state_path(state.owner, state.repo, state.issue_number)
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(".json.tmp")
tmp.write_text(state.to_json())
os.replace(tmp, path)
def read_forge_state(owner: str, repo: str, issue_number: int) -> ForgeState | None:
"""Load state for one issue, or None when no record exists."""
path = forge_state_path(owner, repo, issue_number)
try:
data = json.loads(path.read_text())
except FileNotFoundError:
return None
return ForgeState.from_dict(data)
def delete_forge_state(owner: str, repo: str, issue_number: int) -> None:
"""Remove an issue's record. Missing file is success (idempotent)."""
path = forge_state_path(owner, repo, issue_number)
path.unlink(missing_ok=True)
def all_forge_states() -> list[ForgeState]:
"""Every persisted record, for the orchestrate-status table and the
watchdog sweep. Unreadable files are skipped rather than aborting the
whole listing."""
root = _forge_root()
if not root.is_dir():
return []
states: list[ForgeState] = []
for path in sorted(root.glob("*/*/issue-*.json")):
try:
states.append(ForgeState.from_dict(json.loads(path.read_text())))
except (OSError, ValueError, TypeError):
continue
return states
+103
View File
@@ -0,0 +1,103 @@
"""Provenance footer (PRD forge-native-integration, chunk 5).
Every orchestrator-posted comment ends with this footer — non-optional
and not configurable off. It renders the run's audit trail (agent,
bottle, timing, exit, gitleaks, done-signal source, egress) as a
collapsed markdown block the reviewer sees at the moment of the merge
decision.
The function is pure: the orchestrator, which holds the run context,
supplies the values. In particular `egress_routes` is the pre-rendered
list of allowed-route lines the orchestrator computed from the run's
resolved egress policy — this module does not parse backend-specific
egress state. (The PRD sketch named an `egress_log_path`; passing the
already-rendered lines keeps the footer builder pure and fully testable
and leaves egress-state parsing where the data lives.)
"""
from __future__ import annotations
from datetime import datetime
def _parse(ts: str) -> datetime | None:
try:
return datetime.fromisoformat(ts)
except (ValueError, TypeError):
return None
def _format_duration(started_at: str, finished_at: str) -> str:
start = _parse(started_at)
end = _parse(finished_at)
if start is None or end is None:
return "unknown"
secs = int((end - start).total_seconds())
if secs < 0:
return "unknown"
if secs < 60:
return f"{secs}s"
return f"{secs // 60}m {secs % 60}s"
def build_provenance_footer(
slug: str,
*,
agent_name: str,
bottle_names: tuple[str, ...],
started_at: str,
finished_at: str,
exit_code: int,
watchdog_fired: bool = False,
gitleaks_clean: bool | None = None,
egress_routes: list[str] | None = None,
) -> str:
"""Return a markdown string for appending to a Gitea comment body.
`watchdog_fired=True` marks runs where the agent did not signal
completion, so reviewers know the audit trail may be incomplete.
`gitleaks_clean=None` renders the gitleaks row as "not run".
`egress_routes` is omitted entirely when None/empty.
"""
bottle_label = ", ".join(f"`{b}`" for b in bottle_names) if bottle_names else ""
exit_cell = f"{exit_code} {'' if exit_code == 0 else ''}"
if gitleaks_clean is None:
gitleaks_cell = "— not run"
elif gitleaks_clean:
gitleaks_cell = "✓ no secrets detected"
else:
gitleaks_cell = "✗ secrets detected"
if watchdog_fired:
done_cell = "watchdog — agent did not signal"
else:
done_cell = "sidecar `signal_done`"
lines = [
"<details><summary>🔬 Run provenance</summary>",
"",
"| Field | Value |",
"|---|---|",
f"| agent | `{agent_name}` |",
f"| bottle | {bottle_label} |",
f"| slug | `{slug}` |",
f"| started | {started_at} |",
f"| duration | {_format_duration(started_at, finished_at)} |",
f"| exit | {exit_cell} |",
f"| gitleaks | {gitleaks_cell} |",
f"| done signal | {done_cell} |",
]
if egress_routes:
lines.append("")
lines.append(
f"**Egress** (deny-by-default; {len(egress_routes)} "
f"route{'s' if len(egress_routes) != 1 else ''} allowed)"
)
for route in egress_routes:
lines.append(f"- {route}")
lines.append("")
lines.append("</details>")
return "\n".join(lines)
+74
View File
@@ -0,0 +1,74 @@
"""Unit: `cli.py resume --headless` non-interactive rehydrate path.
The freeze / rehydrate loop needs a non-interactive `resume`: deliver a
follow-up prompt and skip the y/N preflight, reusing the same launch
core (`assume_yes` + `headless_prompt_text`) as `start --headless`.
"""
from __future__ import annotations
import unittest
from unittest.mock import MagicMock, patch
import bot_bottle.cli.resume as resume_mod
from bot_bottle.log import Die
def _metadata():
md = MagicMock()
md.agent_name = "implementer"
md.copy_cwd = False
md.cwd = "/repo"
md.identity = "implementer-abc12"
md.bottle_names = ["claude"]
md.backend = "docker"
return md
class ResumeHeadlessTest(unittest.TestCase):
def setUp(self) -> None:
self._launch = patch.object(
resume_mod, "_launch_bottle", return_value=0
).start()
patch.object(
resume_mod, "read_metadata", return_value=_metadata()
).start()
manifest = MagicMock()
manifest.require_agent = MagicMock(return_value=None)
patch.object(
resume_mod.ManifestIndex, "resolve", return_value=manifest
).start()
self.addCleanup(patch.stopall)
def _launch_kwargs(self) -> dict:
self._launch.assert_called_once()
return self._launch.call_args.kwargs
def test_headless_passes_assume_yes_and_prompt(self):
rc = resume_mod.cmd_resume(
["implementer-abc12", "--headless", "--prompt", "Address the review"]
)
self.assertEqual(0, rc)
kwargs = self._launch_kwargs()
self.assertTrue(kwargs["assume_yes"])
self.assertEqual("Address the review", kwargs["headless_prompt_text"])
def test_interactive_resume_unchanged(self):
resume_mod.cmd_resume(["implementer-abc12"])
kwargs = self._launch_kwargs()
self.assertFalse(kwargs["assume_yes"])
self.assertEqual("", kwargs["headless_prompt_text"])
def test_headless_without_prompt_errors(self):
with self.assertRaises(Die):
resume_mod.cmd_resume(["implementer-abc12", "--headless"])
self._launch.assert_not_called()
def test_prompt_without_headless_errors(self):
with self.assertRaises(Die):
resume_mod.cmd_resume(["implementer-abc12", "--prompt", "hi"])
self._launch.assert_not_called()
if __name__ == "__main__":
unittest.main()
+95
View File
@@ -0,0 +1,95 @@
"""Unit: Forge abstraction + ScopedForge (PRD forge-native-integration)."""
from __future__ import annotations
import unittest
from bot_bottle.contrib.forge.base import (
Comment,
Forge,
ForgeScopeError,
Issue,
ScopedForge,
)
class _RecordingForge(Forge):
"""In-memory fake that records writes."""
def __init__(self) -> None:
self.comments: list[tuple[int, str]] = []
self.descriptions: list[tuple[int, str]] = []
def read_issue(self, number: int) -> Issue:
return Issue(number=number, title="t", body="b", state="open")
def read_comments(self, number: int) -> list[Comment]:
return [Comment(id=1, user="alice", body="hi")]
def post_comment(self, number: int, body: str) -> None:
self.comments.append((number, body))
def update_description(self, number: int, body: str) -> None:
self.descriptions.append((number, body))
def is_org_member(self, org: str, username: str) -> bool:
return username == "member"
def get_pr_for_issue(self, number: int) -> int | None:
return 99 if number == 17 else None
def is_pr_open(self, number: int) -> bool:
return True
class TestScopedForgeReads(unittest.TestCase):
def setUp(self) -> None:
self.inner = _RecordingForge()
self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42])
def test_reads_pass_through_to_any_number(self):
# A number well outside the writable scope still reads fine.
self.assertEqual(123, self.scoped.read_issue(123).number)
self.assertEqual("alice", self.scoped.read_comments(500)[0].user)
def test_membership_and_pr_lookups_delegate(self):
self.assertTrue(self.scoped.is_org_member("bot-bottle", "member"))
self.assertFalse(self.scoped.is_org_member("bot-bottle", "stranger"))
self.assertEqual(99, self.scoped.get_pr_for_issue(17))
self.assertTrue(self.scoped.is_pr_open(8000))
class TestScopedForgeWrites(unittest.TestCase):
def setUp(self) -> None:
self.inner = _RecordingForge()
self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42])
def test_writable_set_is_issue_plus_prs(self):
self.assertEqual(frozenset({17, 42}), self.scoped.writable)
def test_write_to_assigned_issue_allowed(self):
self.scoped.post_comment(17, "done")
self.assertEqual([(17, "done")], self.inner.comments)
def test_write_to_assigned_pr_allowed(self):
self.scoped.update_description(42, "new body")
self.assertEqual([(42, "new body")], self.inner.descriptions)
def test_comment_outside_scope_rejected(self):
with self.assertRaises(ForgeScopeError) as ctx:
self.scoped.post_comment(500, "spam")
self.assertIn("500", str(ctx.exception))
self.assertEqual([], self.inner.comments)
def test_description_outside_scope_rejected(self):
with self.assertRaises(ForgeScopeError):
self.scoped.update_description(500, "tamper")
self.assertEqual([], self.inner.descriptions)
def test_scope_error_is_permission_error(self):
# Sidecars can catch the stdlib base type.
self.assertTrue(issubclass(ForgeScopeError, PermissionError))
if __name__ == "__main__":
unittest.main()
+131
View File
@@ -0,0 +1,131 @@
"""Unit: GiteaClient + GiteaForge (PRD forge-native-integration)."""
from __future__ import annotations
import json
import unittest
import urllib.error
from io import BytesIO
from unittest.mock import MagicMock, patch
from bot_bottle.contrib.gitea.client import GiteaClient, GiteaForge
def _client() -> GiteaClient:
return GiteaClient(
api_url="https://gitea.example.com/api/v1",
owner="didericis",
repo="bot-bottle",
token="test-token",
)
def _resp(body, status: int = 200) -> MagicMock:
resp = MagicMock()
resp.read.return_value = json.dumps(body).encode() if body is not None else b""
resp.status = status
resp.__enter__ = lambda s: s
resp.__exit__ = MagicMock(return_value=False)
return resp
def _http_error(code: int, body: str = "") -> urllib.error.HTTPError:
return urllib.error.HTTPError(
url="http://x", code=code, msg="err", hdrs=None, # type: ignore[arg-type]
fp=BytesIO(body.encode()),
)
_URLOPEN = "bot_bottle.contrib.gitea.client.urllib.request.urlopen"
class TestOrgMembership(unittest.TestCase):
def test_member_returns_true_on_2xx(self):
with patch(_URLOPEN, return_value=_resp(None, 204)) as m:
self.assertTrue(_client().is_org_member("bot-bottle", "alice"))
req = m.call_args.args[0]
self.assertIn("/orgs/bot-bottle/members/alice", req.full_url)
def test_nonmember_returns_false_on_404(self):
with patch(_URLOPEN, side_effect=_http_error(404)):
self.assertFalse(_client().is_org_member("bot-bottle", "stranger"))
def test_other_http_error_raises(self):
with patch(_URLOPEN, side_effect=_http_error(403, "forbidden")):
with self.assertRaises(RuntimeError) as ctx:
_client().is_org_member("bot-bottle", "alice")
self.assertIn("403", str(ctx.exception))
class TestForgeReads(unittest.TestCase):
def test_read_issue_maps_fields(self):
raw = {"number": 17, "title": "Bug", "body": "broken", "state": "open"}
with patch(_URLOPEN, return_value=_resp(raw)) as m:
issue = GiteaForge(_client()).read_issue(17)
self.assertEqual((17, "Bug", "broken", "open"),
(issue.number, issue.title, issue.body, issue.state))
self.assertIn("/repos/didericis/bot-bottle/issues/17",
m.call_args.args[0].full_url)
def test_read_issue_tolerates_null_body(self):
raw = {"number": 17, "title": "T", "body": None, "state": "open"}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertEqual("", GiteaForge(_client()).read_issue(17).body)
def test_read_comments_maps_user_login(self):
raw = [
{"id": 1, "user": {"login": "alice"}, "body": "hi"},
{"id": 2, "user": {"login": "bob"}, "body": "yo"},
]
with patch(_URLOPEN, return_value=_resp(raw)):
comments = GiteaForge(_client()).read_comments(17)
self.assertEqual(["alice", "bob"], [c.user for c in comments])
self.assertEqual([1, 2], [c.id for c in comments])
class TestForgeWrites(unittest.TestCase):
def test_post_comment_payload_and_url(self):
with patch(_URLOPEN, return_value=_resp(None, 201)) as m:
GiteaForge(_client()).post_comment(17, "done ✓")
req = m.call_args.args[0]
self.assertEqual("POST", req.method)
self.assertIn("/repos/didericis/bot-bottle/issues/17/comments", req.full_url)
self.assertEqual("done ✓", json.loads(req.data)["body"])
def test_update_description_patches_issue(self):
with patch(_URLOPEN, return_value=_resp(None, 200)) as m:
GiteaForge(_client()).update_description(17, "edited")
req = m.call_args.args[0]
self.assertEqual("PATCH", req.method)
self.assertTrue(req.full_url.endswith("/issues/17"))
self.assertEqual("edited", json.loads(req.data)["body"])
def test_auth_header_sent(self):
with patch(_URLOPEN, return_value=_resp(None, 201)) as m:
GiteaForge(_client()).post_comment(17, "x")
self.assertEqual("token test-token",
m.call_args.args[0].headers["Authorization"])
class TestPRHelpers(unittest.TestCase):
def test_get_pr_for_issue_returns_number_when_issue_is_pr(self):
raw = {"number": 18, "pull_request": {"merged": False}}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertEqual(18, GiteaForge(_client()).get_pr_for_issue(18))
def test_get_pr_for_issue_none_for_plain_issue(self):
raw = {"number": 17, "pull_request": None}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertIsNone(GiteaForge(_client()).get_pr_for_issue(17))
def test_is_pr_open_true_when_state_open(self):
with patch(_URLOPEN, return_value=_resp({"state": "open"})):
self.assertTrue(GiteaForge(_client()).is_pr_open(18))
def test_is_pr_open_false_when_closed(self):
with patch(_URLOPEN, return_value=_resp({"state": "closed"})):
self.assertFalse(GiteaForge(_client()).is_pr_open(18))
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,103 @@
"""Unit: forge state persistence (PRD forge-native-integration)."""
from __future__ import annotations
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from bot_bottle.contrib.gitea import forge_state as fs
from bot_bottle.contrib.gitea.forge_state import (
STATUS_FROZEN,
STATUS_RUNNING,
ForgeState,
)
def _state(**over) -> ForgeState:
base = {
"owner": "didericis",
"repo": "bot-bottle",
"issue_number": 17,
"slug": "implementer-abc12",
"agent_name": "implementer",
"bottle_names": ["claude"],
"backend_name": "docker",
"agent_git_user": "didericis-claude",
"pr_number": 42,
"status": STATUS_FROZEN,
"last_checkin_at": "2026-06-29T12:04:12-04:00",
}
base.update(over)
return ForgeState(**base)
class ForgeStateTest(unittest.TestCase):
def setUp(self) -> None:
# enterContext handles cleanup; pylint doesn't recognize it as CM-aware.
root = Path(self.enterContext( # pylint: disable=consider-using-with
tempfile.TemporaryDirectory()))
patcher = patch.object(fs, "bot_bottle_root", return_value=root)
patcher.start()
self.addCleanup(patcher.stop)
def test_round_trip(self):
fs.write_forge_state(_state())
got = fs.read_forge_state("didericis", "bot-bottle", 17)
self.assertEqual(_state(), got)
def test_missing_returns_none(self):
self.assertIsNone(fs.read_forge_state("nobody", "nope", 1))
def test_path_layout(self):
path = fs.forge_state_path("didericis", "bot-bottle", 17)
self.assertTrue(str(path).endswith("forge/didericis/bot-bottle/issue-17.json"))
def test_write_is_atomic_no_tmp_left(self):
fs.write_forge_state(_state())
path = fs.forge_state_path("didericis", "bot-bottle", 17)
self.assertFalse(path.with_suffix(".json.tmp").exists())
self.assertTrue(path.exists())
def test_update_overwrites(self):
fs.write_forge_state(_state(status=STATUS_RUNNING))
fs.write_forge_state(_state(status=STATUS_FROZEN))
got = fs.read_forge_state("didericis", "bot-bottle", 17)
assert got is not None
self.assertEqual(STATUS_FROZEN, got.status)
def test_delete_is_idempotent(self):
fs.write_forge_state(_state())
fs.delete_forge_state("didericis", "bot-bottle", 17)
fs.delete_forge_state("didericis", "bot-bottle", 17) # no raise
self.assertIsNone(fs.read_forge_state("didericis", "bot-bottle", 17))
def test_all_forge_states_lists_across_repos(self):
fs.write_forge_state(_state(issue_number=17))
fs.write_forge_state(_state(issue_number=18, slug="other"))
fs.write_forge_state(_state(owner="acme", repo="widget", issue_number=3))
states = fs.all_forge_states()
self.assertEqual(3, len(states))
self.assertEqual({17, 18, 3}, {s.issue_number for s in states})
def test_all_forge_states_empty_when_no_dir(self):
self.assertEqual([], fs.all_forge_states())
def test_from_dict_ignores_unknown_keys(self):
st = ForgeState.from_dict({
"owner": "o", "repo": "r", "issue_number": 1, "slug": "s",
"agent_name": "a", "future_field": "ignored",
})
self.assertEqual("o", st.owner)
self.assertIsNone(st.pr_number)
def test_pr_number_optional(self):
fs.write_forge_state(_state(pr_number=None))
got = fs.read_forge_state("didericis", "bot-bottle", 17)
assert got is not None
self.assertIsNone(got.pr_number)
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,81 @@
"""Unit: provenance footer (PRD forge-native-integration)."""
from __future__ import annotations
import unittest
from bot_bottle.contrib.gitea.provenance import build_provenance_footer
def _footer(slug: str = "implementer-abc12", **over) -> str:
base = {
"agent_name": "implementer",
"bottle_names": ("claude",),
"started_at": "2026-06-29T12:00:00-04:00",
"finished_at": "2026-06-29T12:04:12-04:00",
"exit_code": 0,
}
base.update(over)
return build_provenance_footer(slug, **base)
class ProvenanceTest(unittest.TestCase):
def test_required_fields_present(self):
out = _footer()
for token in ("Run provenance", "`implementer`", "`claude`",
"`implementer-abc12`", "| exit | 0 ✓ |"):
self.assertIn(token, out)
def test_collapsed_details_block(self):
out = _footer()
self.assertTrue(out.startswith("<details>"))
self.assertIn("</details>", out)
def test_duration_minutes_seconds(self):
self.assertIn("| duration | 4m 12s |", _footer())
def test_duration_under_a_minute(self):
out = _footer(finished_at="2026-06-29T12:00:30-04:00")
self.assertIn("| duration | 30s |", out)
def test_duration_unknown_on_bad_timestamp(self):
out = _footer(finished_at="not-a-time")
self.assertIn("| duration | unknown |", out)
def test_nonzero_exit_marked(self):
self.assertIn("| exit | 1 ✗ |", _footer(exit_code=1))
def test_watchdog_changes_done_signal_row(self):
normal = _footer()
self.assertIn("sidecar `signal_done`", normal)
fired = _footer(watchdog_fired=True)
self.assertIn("watchdog — agent did not signal", fired)
self.assertNotIn("sidecar `signal_done`", fired)
def test_gitleaks_states(self):
self.assertIn("not run", _footer())
self.assertIn("✓ no secrets detected", _footer(gitleaks_clean=True))
self.assertIn("✗ secrets detected", _footer(gitleaks_clean=False))
def test_egress_omitted_when_absent(self):
self.assertNotIn("**Egress**", _footer())
def test_egress_rendered_when_present(self):
out = _footer(egress_routes=[
"`api.anthropic.com` — Bearer auth",
"`pypi.org` — unauthenticated",
])
self.assertIn("**Egress** (deny-by-default; 2 routes allowed)", out)
self.assertIn("- `api.anthropic.com` — Bearer auth", out)
def test_egress_singular_route(self):
out = _footer(egress_routes=["`api.anthropic.com` — Bearer auth"])
self.assertIn("1 route allowed", out)
def test_multiple_bottles_listed(self):
out = _footer(bottle_names=("claude", "dev"))
self.assertIn("`claude`, `dev`", out)
if __name__ == "__main__":
unittest.main()