diff --git a/bot_bottle/cli/resume.py b/bot_bottle/cli/resume.py index 5dd421b..d454593 100644 --- a/bot_bottle/cli/resume.py +++ b/bot_bottle/cli/resume.py @@ -27,12 +27,34 @@ from .start import _launch_bottle def cmd_resume(argv: list[str]) -> int: parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True) parser.add_argument("--dry-run", action="store_true") + parser.add_argument( + "--headless", + action="store_true", + help=( + "non-interactive rehydrate: deliver --prompt to the agent and " + "skip the y/N preflight. For orchestrators / the freeze-rehydrate " + "loop." + ), + ) + parser.add_argument( + "--prompt", + default=None, + help="follow-up prompt delivered to the agent (required with --headless)", + ) parser.add_argument( "identity", help="bottle identity from a prior `start` (see its session-end output)", ) args = parser.parse_args(argv) + if args.prompt and not args.headless: + die("--prompt is only valid with --headless") + if args.headless and not args.prompt: + die( + "--headless requires --prompt: " + "./cli.py resume --headless --prompt 'Address the review'" + ) + metadata = read_metadata(args.identity) if metadata is None: die( @@ -56,4 +78,6 @@ def cmd_resume(argv: list[str]) -> int: spec, dry_run=args.dry_run, backend_name=backend_name, + assume_yes=args.headless, + headless_prompt_text=args.prompt or "", ) diff --git a/bot_bottle/contrib/forge/__init__.py b/bot_bottle/contrib/forge/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bot_bottle/contrib/forge/base.py b/bot_bottle/contrib/forge/base.py new file mode 100644 index 0000000..b6d53d8 --- /dev/null +++ b/bot_bottle/contrib/forge/base.py @@ -0,0 +1,145 @@ +"""Forge abstraction (PRD forge-native-integration, chunk 3). + +The `Forge` abstract class is the provider-agnostic surface a forge +sidecar dispatches to: read issues/comments, post comments, edit +descriptions, and the membership / PR lookups the orchestrator needs. +Each forge (Gitea first) implements it; the sidecar protocol and the +agent prompt stay forge-agnostic. + +`signal_done` is deliberately *not* a `Forge` method — completion is a +sidecar concept relayed to the orchestrator over a queue dir, not a +forge API operation. + +`ScopedForge` enforces the PRD's **read-anywhere / write-scoped** model: +reads pass through to any issue/PR for context; writes are rejected +unless the target is the assigned issue or one of its PRs. This bounds +the blast radius of a prompt-injected agent below repo-wide API-key +permissions. +""" + +from __future__ import annotations + +import abc +from collections.abc import Iterable +from dataclasses import dataclass + + +@dataclass(frozen=True) +class Issue: + """A forge issue or PR (forges model PRs as issues with the same + number).""" + + number: int + title: str + body: str + state: str # "open" | "closed" + + +@dataclass(frozen=True) +class Comment: + id: int + user: str # login of the comment author + body: str + + +class ForgeScopeError(PermissionError): + """Raised by `ScopedForge` when a write targets an issue/PR outside + the assigned scope.""" + + +class Forge(abc.ABC): + """Provider-agnostic forge operations. Implementations wrap a + per-provider HTTP client and translate to `Issue` / `Comment`.""" + + @abc.abstractmethod + def read_issue(self, number: int) -> Issue: + """Read an issue or PR body (read-anywhere).""" + + @abc.abstractmethod + def read_comments(self, number: int) -> list[Comment]: + """Read a thread's comments (read-anywhere).""" + + @abc.abstractmethod + def post_comment(self, number: int, body: str) -> None: + """Post a comment to an issue or PR (write-scoped).""" + + @abc.abstractmethod + def update_description(self, number: int, body: str) -> None: + """Replace an issue or PR body (write-scoped).""" + + @abc.abstractmethod + def is_org_member(self, org: str, username: str) -> bool: + """Whether `username` is a member of `org`.""" + + @abc.abstractmethod + def get_pr_for_issue(self, number: int) -> int | None: + """The PR number linked to an issue, or None when there is none.""" + + @abc.abstractmethod + def is_pr_open(self, number: int) -> bool: + """Whether the given PR is still open.""" + + +class ScopedForge(Forge): + """Read-anywhere / write-scoped wrapper around a concrete `Forge`. + + `post_comment` and `update_description` are rejected with + `ForgeScopeError` unless the target number is the assigned issue or + one of the assigned PRs. Every other method delegates unchanged, so + reads, membership checks, and PR lookups work against any number for + context. + + The writable set is fixed at construction. The sidecar reconstructs + a `ScopedForge` when a PR is discovered (`get_pr_for_issue`) so the + new PR becomes writable; this class does not mutate its own scope. + """ + + def __init__( + self, + inner: Forge, + *, + assigned_issue: int, + assigned_prs: Iterable[int] = (), + ) -> None: + self._inner = inner + self._assigned_issue = assigned_issue + self._writable = {assigned_issue, *assigned_prs} + + @property + def writable(self) -> frozenset[int]: + return frozenset(self._writable) + + def _check_write(self, number: int) -> None: + if number not in self._writable: + allowed = ", ".join(str(n) for n in sorted(self._writable)) + raise ForgeScopeError( + f"write to #{number} denied: out of assigned scope " + f"(writable: {allowed})" + ) + + # --- read-anywhere: pass through -------------------------------------- + + def read_issue(self, number: int) -> Issue: + return self._inner.read_issue(number) + + def read_comments(self, number: int) -> list[Comment]: + return self._inner.read_comments(number) + + def is_org_member(self, org: str, username: str) -> bool: + return self._inner.is_org_member(org, username) + + def get_pr_for_issue(self, number: int) -> int | None: + return self._inner.get_pr_for_issue(number) + + def is_pr_open(self, number: int) -> bool: + return self._inner.is_pr_open(number) + + # --- write-scoped: check then delegate -------------------------------- + + def post_comment(self, number: int, body: str) -> None: + self._check_write(number) + self._inner.post_comment(number, body) + + def update_description(self, number: int, body: str) -> None: + self._check_write(number) + self._inner.update_description(number, body) diff --git a/bot_bottle/contrib/gitea/client.py b/bot_bottle/contrib/gitea/client.py new file mode 100644 index 0000000..5963249 --- /dev/null +++ b/bot_bottle/contrib/gitea/client.py @@ -0,0 +1,164 @@ +"""Gitea HTTP client + `GiteaForge` (PRD forge-native-integration, chunk 3). + +`GiteaClient` is the thin stdlib-only HTTP transport (mirrors +`deploy_key_provisioner.py`: `urllib.request`, bounded timeouts, +structured error bodies). `GiteaForge` adapts it to the provider-agnostic +`Forge` surface. + +Unlike the option-2 design, the token is held here (the sidecar process +owns it) and passed to the client directly — there is no agent-side +cred-proxy route, because the agent never makes forge calls. The HTTP +client is the one piece shared with `GiteaDeployKeyProvisioner`; the two +are deliberately *not* unified behind a common abstract base (see the +deferral note in the PRD). +""" + +from __future__ import annotations + +import json +import urllib.error +import urllib.request +from typing import Any + +from ..forge.base import Comment, Forge, Issue + +# Bound every Gitea call: a hung instance must not stall the sidecar. +_API_TIMEOUT_SECS = 30 + + +class GiteaClient: + """Thin authenticated HTTP client for one repo's Gitea API. + + `api_url` is the API base *including* `/api/v1` (matching the + `FORGE_GITEA_API` env var), e.g. `https://gitea.example.com/api/v1`. + """ + + def __init__(self, *, api_url: str, owner: str, repo: str, token: str) -> None: + self._api_url = api_url.rstrip("/") + self._owner = owner + self._repo = repo + self._token = token + + # --- low-level request ------------------------------------------------- + + def _request( + self, method: str, path: str, *, body: dict[str, Any] | None = None + ) -> tuple[int, Any]: + """Issue an authenticated request. Returns `(status, parsed_json)`; + parsed_json is None when the response has no body. Raises + `RuntimeError` on any non-2xx except where callers special-case + the HTTPError themselves (membership 404).""" + url = f"{self._api_url}{path}" + data = json.dumps(body).encode() if body is not None else None + headers = {"Authorization": f"token {self._token}"} + if data is not None: + headers["Content-Type"] = "application/json" + req = urllib.request.Request(url, data=data, headers=headers, method=method) + with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS) as resp: + raw = resp.read() + parsed = json.loads(raw) if raw else None + return resp.status, parsed + + def _repo_path(self, suffix: str) -> str: + return f"/repos/{self._owner}/{self._repo}{suffix}" + + # --- operations -------------------------------------------------------- + + def is_org_member(self, org: str, username: str) -> bool: + """GET /orgs/{org}/members/{username}: 2xx → member, 404 → not. + Other errors propagate so a misconfigured token fails loudly.""" + url = f"{self._api_url}/orgs/{org}/members/{username}" + req = urllib.request.Request( + url, headers={"Authorization": f"token {self._token}"}, method="GET" + ) + try: + with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS): + return True + except urllib.error.HTTPError as exc: + if exc.code == 404: + return False + raise RuntimeError( + f"org membership check failed for {org}/{username}: " + f"HTTP {exc.code} — {_read_error_body(exc)}" + ) from exc + + def get_issue(self, number: int) -> dict[str, Any]: + _status, body = self._request("GET", self._repo_path(f"/issues/{number}")) + return body or {} + + def get_comments(self, number: int) -> list[dict[str, Any]]: + _status, body = self._request( + "GET", self._repo_path(f"/issues/{number}/comments") + ) + return body or [] + + def post_comment(self, number: int, body: str) -> None: + self._request( + "POST", + self._repo_path(f"/issues/{number}/comments"), + body={"body": body}, + ) + + def patch_issue_body(self, number: int, body: str) -> None: + self._request( + "PATCH", self._repo_path(f"/issues/{number}"), body={"body": body} + ) + + def get_pull(self, number: int) -> dict[str, Any]: + _status, body = self._request("GET", self._repo_path(f"/pulls/{number}")) + return body or {} + + +class GiteaForge(Forge): + """`Forge` over a `GiteaClient`.""" + + def __init__(self, client: GiteaClient) -> None: + self._client = client + + def read_issue(self, number: int) -> Issue: + raw = self._client.get_issue(number) + return Issue( + number=int(raw.get("number", number)), + title=str(raw.get("title", "")), + body=str(raw.get("body", "") or ""), + state=str(raw.get("state", "")), + ) + + def read_comments(self, number: int) -> list[Comment]: + return [ + Comment( + id=int(c.get("id", 0)), + user=str((c.get("user") or {}).get("login", "")), + body=str(c.get("body", "") or ""), + ) + for c in self._client.get_comments(number) + ] + + def post_comment(self, number: int, body: str) -> None: + self._client.post_comment(number, body) + + def update_description(self, number: int, body: str) -> None: + self._client.patch_issue_body(number, body) + + def is_org_member(self, org: str, username: str) -> bool: + return self._client.is_org_member(org, username) + + def get_pr_for_issue(self, number: int) -> int | None: + """Gitea models a PR as an issue with the same number, exposing a + `pull_request` object on the issue. When the queried number is + itself a PR, return it; otherwise None. (The orchestrator tracks + the issue→PR mapping in forge state for the cross-number case.)""" + raw = self._client.get_issue(number) + if raw.get("pull_request"): + return int(raw.get("number", number)) + return None + + def is_pr_open(self, number: int) -> bool: + return self._client.get_pull(number).get("state") == "open" + + +def _read_error_body(exc: urllib.error.HTTPError) -> str: + try: + return exc.read().decode("utf-8", errors="replace") + except Exception: # pylint: disable=broad-exception-caught + return "" diff --git a/bot_bottle/contrib/gitea/forge_state.py b/bot_bottle/contrib/gitea/forge_state.py new file mode 100644 index 0000000..d7c69ee --- /dev/null +++ b/bot_bottle/contrib/gitea/forge_state.py @@ -0,0 +1,105 @@ +"""Forge state persistence (PRD forge-native-integration, chunk 2). + +The orchestrator tracks one record per forge-targeted issue so it can +map an incoming webhook back to the bottle handling it, drive the +freeze / rehydrate loop, and run the watchdog. State lives on disk and +survives orchestrator restarts: + + ~/.bot-bottle/forge///issue-.json + +Writes are atomic (`os.replace`) so a crash mid-write never leaves a +truncated record. +""" + +from __future__ import annotations + +import json +import os +from dataclasses import asdict, dataclass, field, fields +from typing import Any +from pathlib import Path + +from ...supervise import bot_bottle_root + +_FORGE_SUBDIR = "forge" + +# Lifecycle: a bottle is launched (running), frozen on the done signal, +# and destroyed when the PR closes. +STATUS_RUNNING = "running" +STATUS_FROZEN = "frozen" +STATUS_DESTROYED = "destroyed" + + +@dataclass +class ForgeState: + """One forge-targeted issue's bottle lifecycle record.""" + + owner: str + repo: str + issue_number: int + slug: str + agent_name: str + bottle_names: list[str] = field(default_factory=list) + backend_name: str = "" + agent_git_user: str = "" + pr_number: int | None = None + status: str = STATUS_RUNNING + last_checkin_at: str = "" + + def to_json(self) -> str: + return json.dumps(asdict(self), indent=2, sort_keys=True) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "ForgeState": + # Tolerate unknown keys (forward-compat) by filtering to fields. + known = {f.name for f in fields(cls)} + return cls(**{k: v for k, v in data.items() if k in known}) + + +def _forge_root() -> Path: + return bot_bottle_root() / _FORGE_SUBDIR + + +def forge_state_path(owner: str, repo: str, issue_number: int) -> Path: + return _forge_root() / owner / repo / f"issue-{issue_number}.json" + + +def write_forge_state(state: ForgeState) -> None: + """Persist `state` atomically. Creates parent dirs as needed.""" + path = forge_state_path(state.owner, state.repo, state.issue_number) + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(".json.tmp") + tmp.write_text(state.to_json()) + os.replace(tmp, path) + + +def read_forge_state(owner: str, repo: str, issue_number: int) -> ForgeState | None: + """Load state for one issue, or None when no record exists.""" + path = forge_state_path(owner, repo, issue_number) + try: + data = json.loads(path.read_text()) + except FileNotFoundError: + return None + return ForgeState.from_dict(data) + + +def delete_forge_state(owner: str, repo: str, issue_number: int) -> None: + """Remove an issue's record. Missing file is success (idempotent).""" + path = forge_state_path(owner, repo, issue_number) + path.unlink(missing_ok=True) + + +def all_forge_states() -> list[ForgeState]: + """Every persisted record, for the orchestrate-status table and the + watchdog sweep. Unreadable files are skipped rather than aborting the + whole listing.""" + root = _forge_root() + if not root.is_dir(): + return [] + states: list[ForgeState] = [] + for path in sorted(root.glob("*/*/issue-*.json")): + try: + states.append(ForgeState.from_dict(json.loads(path.read_text()))) + except (OSError, ValueError, TypeError): + continue + return states diff --git a/bot_bottle/contrib/gitea/provenance.py b/bot_bottle/contrib/gitea/provenance.py new file mode 100644 index 0000000..a443ebb --- /dev/null +++ b/bot_bottle/contrib/gitea/provenance.py @@ -0,0 +1,103 @@ +"""Provenance footer (PRD forge-native-integration, chunk 5). + +Every orchestrator-posted comment ends with this footer — non-optional +and not configurable off. It renders the run's audit trail (agent, +bottle, timing, exit, gitleaks, done-signal source, egress) as a +collapsed markdown block the reviewer sees at the moment of the merge +decision. + +The function is pure: the orchestrator, which holds the run context, +supplies the values. In particular `egress_routes` is the pre-rendered +list of allowed-route lines the orchestrator computed from the run's +resolved egress policy — this module does not parse backend-specific +egress state. (The PRD sketch named an `egress_log_path`; passing the +already-rendered lines keeps the footer builder pure and fully testable +and leaves egress-state parsing where the data lives.) +""" + +from __future__ import annotations + +from datetime import datetime + + +def _parse(ts: str) -> datetime | None: + try: + return datetime.fromisoformat(ts) + except (ValueError, TypeError): + return None + + +def _format_duration(started_at: str, finished_at: str) -> str: + start = _parse(started_at) + end = _parse(finished_at) + if start is None or end is None: + return "unknown" + secs = int((end - start).total_seconds()) + if secs < 0: + return "unknown" + if secs < 60: + return f"{secs}s" + return f"{secs // 60}m {secs % 60}s" + + +def build_provenance_footer( + slug: str, + *, + agent_name: str, + bottle_names: tuple[str, ...], + started_at: str, + finished_at: str, + exit_code: int, + watchdog_fired: bool = False, + gitleaks_clean: bool | None = None, + egress_routes: list[str] | None = None, +) -> str: + """Return a markdown string for appending to a Gitea comment body. + + `watchdog_fired=True` marks runs where the agent did not signal + completion, so reviewers know the audit trail may be incomplete. + `gitleaks_clean=None` renders the gitleaks row as "not run". + `egress_routes` is omitted entirely when None/empty. + """ + bottle_label = ", ".join(f"`{b}`" for b in bottle_names) if bottle_names else "—" + exit_cell = f"{exit_code} {'✓' if exit_code == 0 else '✗'}" + + if gitleaks_clean is None: + gitleaks_cell = "— not run" + elif gitleaks_clean: + gitleaks_cell = "✓ no secrets detected" + else: + gitleaks_cell = "✗ secrets detected" + + if watchdog_fired: + done_cell = "watchdog — agent did not signal" + else: + done_cell = "sidecar `signal_done`" + + lines = [ + "
🔬 Run provenance", + "", + "| Field | Value |", + "|---|---|", + f"| agent | `{agent_name}` |", + f"| bottle | {bottle_label} |", + f"| slug | `{slug}` |", + f"| started | {started_at} |", + f"| duration | {_format_duration(started_at, finished_at)} |", + f"| exit | {exit_cell} |", + f"| gitleaks | {gitleaks_cell} |", + f"| done signal | {done_cell} |", + ] + + if egress_routes: + lines.append("") + lines.append( + f"**Egress** (deny-by-default; {len(egress_routes)} " + f"route{'s' if len(egress_routes) != 1 else ''} allowed)" + ) + for route in egress_routes: + lines.append(f"- {route}") + + lines.append("") + lines.append("
") + return "\n".join(lines) diff --git a/tests/unit/test_cli_resume_headless.py b/tests/unit/test_cli_resume_headless.py new file mode 100644 index 0000000..3d1399b --- /dev/null +++ b/tests/unit/test_cli_resume_headless.py @@ -0,0 +1,74 @@ +"""Unit: `cli.py resume --headless` non-interactive rehydrate path. + +The freeze / rehydrate loop needs a non-interactive `resume`: deliver a +follow-up prompt and skip the y/N preflight, reusing the same launch +core (`assume_yes` + `headless_prompt_text`) as `start --headless`. +""" + +from __future__ import annotations + +import unittest +from unittest.mock import MagicMock, patch + +import bot_bottle.cli.resume as resume_mod +from bot_bottle.log import Die + + +def _metadata(): + md = MagicMock() + md.agent_name = "implementer" + md.copy_cwd = False + md.cwd = "/repo" + md.identity = "implementer-abc12" + md.bottle_names = ["claude"] + md.backend = "docker" + return md + + +class ResumeHeadlessTest(unittest.TestCase): + def setUp(self) -> None: + self._launch = patch.object( + resume_mod, "_launch_bottle", return_value=0 + ).start() + patch.object( + resume_mod, "read_metadata", return_value=_metadata() + ).start() + manifest = MagicMock() + manifest.require_agent = MagicMock(return_value=None) + patch.object( + resume_mod.ManifestIndex, "resolve", return_value=manifest + ).start() + self.addCleanup(patch.stopall) + + def _launch_kwargs(self) -> dict: + self._launch.assert_called_once() + return self._launch.call_args.kwargs + + def test_headless_passes_assume_yes_and_prompt(self): + rc = resume_mod.cmd_resume( + ["implementer-abc12", "--headless", "--prompt", "Address the review"] + ) + self.assertEqual(0, rc) + kwargs = self._launch_kwargs() + self.assertTrue(kwargs["assume_yes"]) + self.assertEqual("Address the review", kwargs["headless_prompt_text"]) + + def test_interactive_resume_unchanged(self): + resume_mod.cmd_resume(["implementer-abc12"]) + kwargs = self._launch_kwargs() + self.assertFalse(kwargs["assume_yes"]) + self.assertEqual("", kwargs["headless_prompt_text"]) + + def test_headless_without_prompt_errors(self): + with self.assertRaises(Die): + resume_mod.cmd_resume(["implementer-abc12", "--headless"]) + self._launch.assert_not_called() + + def test_prompt_without_headless_errors(self): + with self.assertRaises(Die): + resume_mod.cmd_resume(["implementer-abc12", "--prompt", "hi"]) + self._launch.assert_not_called() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_contrib_forge_base.py b/tests/unit/test_contrib_forge_base.py new file mode 100644 index 0000000..108d360 --- /dev/null +++ b/tests/unit/test_contrib_forge_base.py @@ -0,0 +1,95 @@ +"""Unit: Forge abstraction + ScopedForge (PRD forge-native-integration).""" + +from __future__ import annotations + +import unittest + +from bot_bottle.contrib.forge.base import ( + Comment, + Forge, + ForgeScopeError, + Issue, + ScopedForge, +) + + +class _RecordingForge(Forge): + """In-memory fake that records writes.""" + + def __init__(self) -> None: + self.comments: list[tuple[int, str]] = [] + self.descriptions: list[tuple[int, str]] = [] + + def read_issue(self, number: int) -> Issue: + return Issue(number=number, title="t", body="b", state="open") + + def read_comments(self, number: int) -> list[Comment]: + return [Comment(id=1, user="alice", body="hi")] + + def post_comment(self, number: int, body: str) -> None: + self.comments.append((number, body)) + + def update_description(self, number: int, body: str) -> None: + self.descriptions.append((number, body)) + + def is_org_member(self, org: str, username: str) -> bool: + return username == "member" + + def get_pr_for_issue(self, number: int) -> int | None: + return 99 if number == 17 else None + + def is_pr_open(self, number: int) -> bool: + return True + + +class TestScopedForgeReads(unittest.TestCase): + def setUp(self) -> None: + self.inner = _RecordingForge() + self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42]) + + def test_reads_pass_through_to_any_number(self): + # A number well outside the writable scope still reads fine. + self.assertEqual(123, self.scoped.read_issue(123).number) + self.assertEqual("alice", self.scoped.read_comments(500)[0].user) + + def test_membership_and_pr_lookups_delegate(self): + self.assertTrue(self.scoped.is_org_member("bot-bottle", "member")) + self.assertFalse(self.scoped.is_org_member("bot-bottle", "stranger")) + self.assertEqual(99, self.scoped.get_pr_for_issue(17)) + self.assertTrue(self.scoped.is_pr_open(8000)) + + +class TestScopedForgeWrites(unittest.TestCase): + def setUp(self) -> None: + self.inner = _RecordingForge() + self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42]) + + def test_writable_set_is_issue_plus_prs(self): + self.assertEqual(frozenset({17, 42}), self.scoped.writable) + + def test_write_to_assigned_issue_allowed(self): + self.scoped.post_comment(17, "done") + self.assertEqual([(17, "done")], self.inner.comments) + + def test_write_to_assigned_pr_allowed(self): + self.scoped.update_description(42, "new body") + self.assertEqual([(42, "new body")], self.inner.descriptions) + + def test_comment_outside_scope_rejected(self): + with self.assertRaises(ForgeScopeError) as ctx: + self.scoped.post_comment(500, "spam") + self.assertIn("500", str(ctx.exception)) + self.assertEqual([], self.inner.comments) + + def test_description_outside_scope_rejected(self): + with self.assertRaises(ForgeScopeError): + self.scoped.update_description(500, "tamper") + self.assertEqual([], self.inner.descriptions) + + def test_scope_error_is_permission_error(self): + # Sidecars can catch the stdlib base type. + self.assertTrue(issubclass(ForgeScopeError, PermissionError)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_contrib_gitea_client.py b/tests/unit/test_contrib_gitea_client.py new file mode 100644 index 0000000..7660bcb --- /dev/null +++ b/tests/unit/test_contrib_gitea_client.py @@ -0,0 +1,131 @@ +"""Unit: GiteaClient + GiteaForge (PRD forge-native-integration).""" + +from __future__ import annotations + +import json +import unittest +import urllib.error +from io import BytesIO +from unittest.mock import MagicMock, patch + +from bot_bottle.contrib.gitea.client import GiteaClient, GiteaForge + + +def _client() -> GiteaClient: + return GiteaClient( + api_url="https://gitea.example.com/api/v1", + owner="didericis", + repo="bot-bottle", + token="test-token", + ) + + +def _resp(body, status: int = 200) -> MagicMock: + resp = MagicMock() + resp.read.return_value = json.dumps(body).encode() if body is not None else b"" + resp.status = status + resp.__enter__ = lambda s: s + resp.__exit__ = MagicMock(return_value=False) + return resp + + +def _http_error(code: int, body: str = "") -> urllib.error.HTTPError: + return urllib.error.HTTPError( + url="http://x", code=code, msg="err", hdrs=None, # type: ignore[arg-type] + fp=BytesIO(body.encode()), + ) + + +_URLOPEN = "bot_bottle.contrib.gitea.client.urllib.request.urlopen" + + +class TestOrgMembership(unittest.TestCase): + def test_member_returns_true_on_2xx(self): + with patch(_URLOPEN, return_value=_resp(None, 204)) as m: + self.assertTrue(_client().is_org_member("bot-bottle", "alice")) + req = m.call_args.args[0] + self.assertIn("/orgs/bot-bottle/members/alice", req.full_url) + + def test_nonmember_returns_false_on_404(self): + with patch(_URLOPEN, side_effect=_http_error(404)): + self.assertFalse(_client().is_org_member("bot-bottle", "stranger")) + + def test_other_http_error_raises(self): + with patch(_URLOPEN, side_effect=_http_error(403, "forbidden")): + with self.assertRaises(RuntimeError) as ctx: + _client().is_org_member("bot-bottle", "alice") + self.assertIn("403", str(ctx.exception)) + + +class TestForgeReads(unittest.TestCase): + def test_read_issue_maps_fields(self): + raw = {"number": 17, "title": "Bug", "body": "broken", "state": "open"} + with patch(_URLOPEN, return_value=_resp(raw)) as m: + issue = GiteaForge(_client()).read_issue(17) + self.assertEqual((17, "Bug", "broken", "open"), + (issue.number, issue.title, issue.body, issue.state)) + self.assertIn("/repos/didericis/bot-bottle/issues/17", + m.call_args.args[0].full_url) + + def test_read_issue_tolerates_null_body(self): + raw = {"number": 17, "title": "T", "body": None, "state": "open"} + with patch(_URLOPEN, return_value=_resp(raw)): + self.assertEqual("", GiteaForge(_client()).read_issue(17).body) + + def test_read_comments_maps_user_login(self): + raw = [ + {"id": 1, "user": {"login": "alice"}, "body": "hi"}, + {"id": 2, "user": {"login": "bob"}, "body": "yo"}, + ] + with patch(_URLOPEN, return_value=_resp(raw)): + comments = GiteaForge(_client()).read_comments(17) + self.assertEqual(["alice", "bob"], [c.user for c in comments]) + self.assertEqual([1, 2], [c.id for c in comments]) + + +class TestForgeWrites(unittest.TestCase): + def test_post_comment_payload_and_url(self): + with patch(_URLOPEN, return_value=_resp(None, 201)) as m: + GiteaForge(_client()).post_comment(17, "done ✓") + req = m.call_args.args[0] + self.assertEqual("POST", req.method) + self.assertIn("/repos/didericis/bot-bottle/issues/17/comments", req.full_url) + self.assertEqual("done ✓", json.loads(req.data)["body"]) + + def test_update_description_patches_issue(self): + with patch(_URLOPEN, return_value=_resp(None, 200)) as m: + GiteaForge(_client()).update_description(17, "edited") + req = m.call_args.args[0] + self.assertEqual("PATCH", req.method) + self.assertTrue(req.full_url.endswith("/issues/17")) + self.assertEqual("edited", json.loads(req.data)["body"]) + + def test_auth_header_sent(self): + with patch(_URLOPEN, return_value=_resp(None, 201)) as m: + GiteaForge(_client()).post_comment(17, "x") + self.assertEqual("token test-token", + m.call_args.args[0].headers["Authorization"]) + + +class TestPRHelpers(unittest.TestCase): + def test_get_pr_for_issue_returns_number_when_issue_is_pr(self): + raw = {"number": 18, "pull_request": {"merged": False}} + with patch(_URLOPEN, return_value=_resp(raw)): + self.assertEqual(18, GiteaForge(_client()).get_pr_for_issue(18)) + + def test_get_pr_for_issue_none_for_plain_issue(self): + raw = {"number": 17, "pull_request": None} + with patch(_URLOPEN, return_value=_resp(raw)): + self.assertIsNone(GiteaForge(_client()).get_pr_for_issue(17)) + + def test_is_pr_open_true_when_state_open(self): + with patch(_URLOPEN, return_value=_resp({"state": "open"})): + self.assertTrue(GiteaForge(_client()).is_pr_open(18)) + + def test_is_pr_open_false_when_closed(self): + with patch(_URLOPEN, return_value=_resp({"state": "closed"})): + self.assertFalse(GiteaForge(_client()).is_pr_open(18)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_contrib_gitea_forge_state.py b/tests/unit/test_contrib_gitea_forge_state.py new file mode 100644 index 0000000..4d540f4 --- /dev/null +++ b/tests/unit/test_contrib_gitea_forge_state.py @@ -0,0 +1,103 @@ +"""Unit: forge state persistence (PRD forge-native-integration).""" + +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + +from bot_bottle.contrib.gitea import forge_state as fs +from bot_bottle.contrib.gitea.forge_state import ( + STATUS_FROZEN, + STATUS_RUNNING, + ForgeState, +) + + +def _state(**over) -> ForgeState: + base = { + "owner": "didericis", + "repo": "bot-bottle", + "issue_number": 17, + "slug": "implementer-abc12", + "agent_name": "implementer", + "bottle_names": ["claude"], + "backend_name": "docker", + "agent_git_user": "didericis-claude", + "pr_number": 42, + "status": STATUS_FROZEN, + "last_checkin_at": "2026-06-29T12:04:12-04:00", + } + base.update(over) + return ForgeState(**base) + + +class ForgeStateTest(unittest.TestCase): + def setUp(self) -> None: + # enterContext handles cleanup; pylint doesn't recognize it as CM-aware. + root = Path(self.enterContext( # pylint: disable=consider-using-with + tempfile.TemporaryDirectory())) + patcher = patch.object(fs, "bot_bottle_root", return_value=root) + patcher.start() + self.addCleanup(patcher.stop) + + def test_round_trip(self): + fs.write_forge_state(_state()) + got = fs.read_forge_state("didericis", "bot-bottle", 17) + self.assertEqual(_state(), got) + + def test_missing_returns_none(self): + self.assertIsNone(fs.read_forge_state("nobody", "nope", 1)) + + def test_path_layout(self): + path = fs.forge_state_path("didericis", "bot-bottle", 17) + self.assertTrue(str(path).endswith("forge/didericis/bot-bottle/issue-17.json")) + + def test_write_is_atomic_no_tmp_left(self): + fs.write_forge_state(_state()) + path = fs.forge_state_path("didericis", "bot-bottle", 17) + self.assertFalse(path.with_suffix(".json.tmp").exists()) + self.assertTrue(path.exists()) + + def test_update_overwrites(self): + fs.write_forge_state(_state(status=STATUS_RUNNING)) + fs.write_forge_state(_state(status=STATUS_FROZEN)) + got = fs.read_forge_state("didericis", "bot-bottle", 17) + assert got is not None + self.assertEqual(STATUS_FROZEN, got.status) + + def test_delete_is_idempotent(self): + fs.write_forge_state(_state()) + fs.delete_forge_state("didericis", "bot-bottle", 17) + fs.delete_forge_state("didericis", "bot-bottle", 17) # no raise + self.assertIsNone(fs.read_forge_state("didericis", "bot-bottle", 17)) + + def test_all_forge_states_lists_across_repos(self): + fs.write_forge_state(_state(issue_number=17)) + fs.write_forge_state(_state(issue_number=18, slug="other")) + fs.write_forge_state(_state(owner="acme", repo="widget", issue_number=3)) + states = fs.all_forge_states() + self.assertEqual(3, len(states)) + self.assertEqual({17, 18, 3}, {s.issue_number for s in states}) + + def test_all_forge_states_empty_when_no_dir(self): + self.assertEqual([], fs.all_forge_states()) + + def test_from_dict_ignores_unknown_keys(self): + st = ForgeState.from_dict({ + "owner": "o", "repo": "r", "issue_number": 1, "slug": "s", + "agent_name": "a", "future_field": "ignored", + }) + self.assertEqual("o", st.owner) + self.assertIsNone(st.pr_number) + + def test_pr_number_optional(self): + fs.write_forge_state(_state(pr_number=None)) + got = fs.read_forge_state("didericis", "bot-bottle", 17) + assert got is not None + self.assertIsNone(got.pr_number) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_contrib_gitea_provenance.py b/tests/unit/test_contrib_gitea_provenance.py new file mode 100644 index 0000000..b652558 --- /dev/null +++ b/tests/unit/test_contrib_gitea_provenance.py @@ -0,0 +1,81 @@ +"""Unit: provenance footer (PRD forge-native-integration).""" + +from __future__ import annotations + +import unittest + +from bot_bottle.contrib.gitea.provenance import build_provenance_footer + + +def _footer(slug: str = "implementer-abc12", **over) -> str: + base = { + "agent_name": "implementer", + "bottle_names": ("claude",), + "started_at": "2026-06-29T12:00:00-04:00", + "finished_at": "2026-06-29T12:04:12-04:00", + "exit_code": 0, + } + base.update(over) + return build_provenance_footer(slug, **base) + + +class ProvenanceTest(unittest.TestCase): + def test_required_fields_present(self): + out = _footer() + for token in ("Run provenance", "`implementer`", "`claude`", + "`implementer-abc12`", "| exit | 0 ✓ |"): + self.assertIn(token, out) + + def test_collapsed_details_block(self): + out = _footer() + self.assertTrue(out.startswith("
")) + self.assertIn("
", out) + + def test_duration_minutes_seconds(self): + self.assertIn("| duration | 4m 12s |", _footer()) + + def test_duration_under_a_minute(self): + out = _footer(finished_at="2026-06-29T12:00:30-04:00") + self.assertIn("| duration | 30s |", out) + + def test_duration_unknown_on_bad_timestamp(self): + out = _footer(finished_at="not-a-time") + self.assertIn("| duration | unknown |", out) + + def test_nonzero_exit_marked(self): + self.assertIn("| exit | 1 ✗ |", _footer(exit_code=1)) + + def test_watchdog_changes_done_signal_row(self): + normal = _footer() + self.assertIn("sidecar `signal_done`", normal) + fired = _footer(watchdog_fired=True) + self.assertIn("watchdog — agent did not signal", fired) + self.assertNotIn("sidecar `signal_done`", fired) + + def test_gitleaks_states(self): + self.assertIn("not run", _footer()) + self.assertIn("✓ no secrets detected", _footer(gitleaks_clean=True)) + self.assertIn("✗ secrets detected", _footer(gitleaks_clean=False)) + + def test_egress_omitted_when_absent(self): + self.assertNotIn("**Egress**", _footer()) + + def test_egress_rendered_when_present(self): + out = _footer(egress_routes=[ + "`api.anthropic.com` — Bearer auth", + "`pypi.org` — unauthenticated", + ]) + self.assertIn("**Egress** (deny-by-default; 2 routes allowed)", out) + self.assertIn("- `api.anthropic.com` — Bearer auth", out) + + def test_egress_singular_route(self): + out = _footer(egress_routes=["`api.anthropic.com` — Bearer auth"]) + self.assertIn("1 route allowed", out) + + def test_multiple_bottles_listed(self): + out = _footer(bottle_names=("claude", "dev")) + self.assertIn("`claude`, `dev`", out) + + +if __name__ == "__main__": + unittest.main()