Compare commits

..

9 Commits

Author SHA1 Message Date
didericis-claude 520e6f545d feat: expose stable Python API for programmatic bottle orchestration
lint / lint (push) Failing after 2m1s
test / unit (pull_request) Successful in 50s
test / integration (pull_request) Successful in 18s
test / coverage (pull_request) Failing after 1m8s
Add bot_bottle/api.py with four public functions the orchestrator uses:
start_headless, resume_headless, freeze, and destroy. These let a
ProgrammaticBottleRunner call directly into bot_bottle instead of
shelling out to the CLI; call sites in lifecycle.py stay unchanged.

Key changes:
- BottleSpec gains forge_env field for forge sidecar credentials
- _launch_bottle returns (slug, exit_code) instead of int so start_headless
  can return the slug to callers
- All four API functions convert Die and non-zero exits to BottleError
- 27 new unit tests; existing tests updated for the new return type

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 17:11:13 +00:00
didericis 42004d37fd refactor(forge): address PR #318 review — PR/Issue split, sqlite state, drop footer
lint / lint (push) Successful in 1m59s
test / unit (pull_request) Successful in 54s
test / integration (pull_request) Successful in 19s
test / coverage (pull_request) Successful in 1m4s
Addresses the five review comments on PR #318:

- Split PullRequest from Issue and add a dedicated read_pr method on
  Forge/ScopedForge/GiteaForge (a PR carries merge state an issue does
  not); is_pr_open now derives from read_pr.
- Replace the JSON-file forge state with a thin swappable CRUD interface
  (ForgeStateStore) backed by SQLite (SqliteForgeStateStore) at
  ~/.bot-bottle/bot-bottle.db.
- Remove the provenance footer (provenance.py + its test): a mutable,
  unsigned PR comment is not an audit record.
- Reword the PRD: provenance is exposed via an API, not surfaced in the
  PR; document the Issue/PullRequest split and the SQLite store.

pyright clean (whole repo), pylint 10/10, 38 forge/resume unit tests pass;
no remaining refs to the removed provenance module or old JSON state API.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-07-01 08:37:25 -04:00
didericis f211ece6bf fix(tests): resolve pyright strict errors in forge test helpers
lint / lint (push) Successful in 2m18s
test / unit (pull_request) Successful in 1m1s
test / integration (pull_request) Successful in 22s
test / coverage (pull_request) Successful in 1m19s
CI runs `pyright .` over the whole repo including tests; the earlier
run only checked the source paths. The test helpers used `**over`
dict-splat into typed constructors, which pyright strict rejects.

- forge_state: build a typed ForgeState base and dataclasses.replace(**over)
- provenance: explicit typed keyword params instead of a **over dict
- resume: _launch_kwargs returns dict[str, Any] (copy call_args.kwargs)
- forge_base: assert PermissionError in __mro__ (avoids always-true issubclass)
- client: annotate _resp body param; type: ignore the mock __enter__ lambda

pyright . now 0 errors; 47 tests still pass; pylint 9.97/10.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-07-01 07:58:55 -04:00
didericis a229a22d54 feat(forge): forge library layer for native integration (PRD chunks 1-3, 5)
lint / lint (push) Failing after 2m9s
test / unit (pull_request) Successful in 58s
test / integration (pull_request) Successful in 21s
test / coverage (pull_request) Successful in 1m23s
Implements the bot-bottle side of the forge-native PRD that is
self-contained in this repo (the forge sidecar and orchestrate command
belong to the separate bot-bottle-orchestrator, a PRD non-goal):

- contrib/forge/base.py: Forge ABC + ScopedForge enforcing the
  read-anywhere / write-scoped model (writes rejected outside the
  assigned issue/PRs via ForgeScopeError).
- contrib/gitea/client.py: GiteaClient (stdlib-only HTTP, mirrors the
  deploy-key provisioner) + GiteaForge. Token held by the caller (the
  sidecar), not injected by cred-proxy.
- contrib/gitea/forge_state.py: ForgeState dataclass + atomic
  read/write/delete/all under ~/.bot-bottle/forge/<owner>/<repo>/.
- contrib/gitea/provenance.py: build_provenance_footer — collapsed
  markdown audit footer; watchdog/gitleaks/egress rendering.
- cli/resume.py: `resume --headless --prompt` reusing the shipped
  assume_yes + headless_prompt launch core (the new half of chunk 1).

47 new unit tests; pylint 9.98/10, pyright clean. Forge sidecar (chunk
4), orchestrate command (chunk 6), and forge_env plumbing are deferred:
their only consumer is the separate orchestrator and they are untestable
in isolation here.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 19:39:49 -04:00
didericis 738990b2df Merge remote-tracking branch 'origin/main' into forge-native-integration 2026-06-30 19:19:01 -04:00
didericis 4cb106b48d docs(prd): reconcile headless primitives with shipped start --headless
#315 already merged `start --headless` (assume_yes on _launch_bottle +
AgentProvider.headless_prompt). The PRD's proposed start_headless /
attach_agent_headless helpers were redundant with it, and the latter
diverged by hand-rolling --no-interactive/-p instead of using the
headless_prompt provider abstraction. Drop them.

Scope the remaining headless work to what's actually new: a forge_env
hook threaded into the existing _launch_bottle core, and a `resume
--headless` path (resume has no non-interactive entry point today).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 17:46:59 -04:00
didericis ebad90bfa9 docs(prd): adopt forge sidecar (option 3) for native integration
Flip the forge-native-integration PRD from option 2 (agent calls the
Gitea API directly via cred-proxy; done signal parsed from comments) to
option 3 per issue #317 comment 2715: a forge sidecar backed by a Forge
abstract class.

- signal_done(status, summary) replaces comment-parsing as the done signal
- semantic audit trail from the sidecar feeds provenance directly
- read-anywhere / write-scoped enforcement, tighter than repo-wide API keys
- forge-agnostic agent prompts and sidecar protocol
- DeployKeyProvisioner subsumption deferred; share the HTTP client only

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 17:39:33 -04:00
didericis-claude 1789deaf73 docs: update forge PRD — orchestrator split, done signal, org targeting, forge env vars 2026-06-29 12:39:02 -04:00
didericis-claude b607d68a0e docs: add PRD for forge native integration 2026-06-29 12:10:41 -04:00
25 changed files with 1976 additions and 136 deletions
+2 -2
View File
@@ -5,8 +5,8 @@
# bot-bottle # bot-bottle
[![test](https://gitea.dideric.is/didericis/bot-bottle/actions/workflows/test.yml/badge.svg?branch=main)](https://gitea.dideric.is/didericis/bot-bottle/actions?workflow=test.yml) [![test](https://gitea.dideric.is/didericis/bot-bottle/actions/workflows/test.yml/badge.svg?branch=main)](https://gitea.dideric.is/didericis/bot-bottle/actions?workflow=test.yml)
[![coverage](https://img.shields.io/badge/coverage-83%25-brightgreen)](https://coverage.readthedocs.io/) [![coverage](https://img.shields.io/badge/coverage-84%25-brightgreen)](https://coverage.readthedocs.io/)
[![core coverage](https://img.shields.io/badge/core%20coverage-95%25-brightgreen)](https://gitea.dideric.is/didericis/bot-bottle/src/branch/main/docs/decisions/0004-coverage-policy.md) [![core coverage](https://img.shields.io/badge/core%20coverage-96%25-brightgreen)](https://gitea.dideric.is/didericis/bot-bottle/src/branch/main/docs/decisions/0004-coverage-policy.md)
**Problem:** Developer wants to run a coding agent without supervision, but they don't want a prompt injected or misbehaving agent wrecking their environment or exfiltrating sensitive data. **Problem:** Developer wants to run a coding agent without supervision, but they don't want a prompt injected or misbehaving agent wrecking their environment or exfiltrating sensitive data.
+10
View File
@@ -1 +1,11 @@
"""bot-bottle: Python implementation of the agent container launcher.""" """bot-bottle: Python implementation of the agent container launcher."""
from .api import BottleError, destroy, freeze, resume_headless, start_headless
__all__ = [
"BottleError",
"destroy",
"freeze",
"resume_headless",
"start_headless",
]
+258
View File
@@ -0,0 +1,258 @@
"""Public Python API for programmatic bottle orchestration.
Stable surface for bot-bottle-orchestrator (and other Python callers) to
drive bottles without invoking the CLI as a subprocess. Every function
converts ``Die`` and non-zero agent exit codes to ``BottleError`` so
callers use exception handling rather than inspecting return values.
The Protocol the orchestrator's ``BottleRunner`` targets looks like::
class BottleRunner(Protocol):
def start(self, agent: str, *, prompt: str, ...) -> str: ...
def resume(self, slug: str, *, prompt: str) -> None: ...
def freeze(self, slug: str) -> None: ...
def destroy(self, slug: str) -> None: ...
A ``SubprocessBottleRunner`` calls ``./cli.py`` for each operation. A
``ProgrammaticBottleRunner`` calls these functions directly; the Protocol
call sites in ``lifecycle.py`` are unchanged.
"""
from __future__ import annotations
from typing import Sequence
from .backend import BottleSpec
from .backend.freeze import CommitCancelled, get_freezer
from .bottle_state import cleanup_state, clear_preserve_marker, read_metadata
from .cli._common import USER_CWD
from .cli.start import _launch_bottle, _peek_agent_bottle, _uniquify_label_headless
from .log import Die
from .manifest import ManifestError, ManifestIndex
class BottleError(Exception):
"""Raised when a bottle operation fails.
``exit_code`` carries the agent process's exit code when the failure is
a non-zero agent exit; 1 for all other failure modes (missing state,
backend errors, etc.)."""
def __init__(self, message: str, *, exit_code: int = 1) -> None:
super().__init__(message)
self.exit_code = exit_code
def start_headless(
agent_name: str,
*,
prompt: str,
bottles: Sequence[str] | None = None,
label: str | None = None,
color: str | None = None,
backend_name: str | None = None,
copy_cwd: bool = False,
forge_env: dict[str, str] | None = None,
user_cwd: str | None = None,
) -> str:
"""Launch a new bottle headlessly. Returns the bottle slug.
``forge_env`` is passed through to the forge sidecar (not the agent)
when the bottle is forge-targeted; it carries the credentials and
context the sidecar needs to call the forge API.
Raises ``BottleError`` on configuration errors or if the agent exits
non-zero. The returned slug can be passed to ``freeze()``,
``resume_headless()``, or ``destroy()`` for subsequent lifecycle
operations."""
cwd = user_cwd or USER_CWD
try:
manifest = ManifestIndex.resolve(cwd)
manifest.require_agent(agent_name)
except (Die, ManifestError) as exc:
raise BottleError(str(exc)) from exc
if bottles:
bottle_names: tuple[str, ...] = tuple(bottles)
else:
default_bottle = _peek_agent_bottle(manifest, agent_name)
if not default_bottle:
raise BottleError(
f"agent '{agent_name}' has no default bottle; "
f"pass bottles=[...]"
)
bottle_names = (default_bottle,)
spec = BottleSpec(
manifest=manifest,
agent_name=agent_name,
copy_cwd=copy_cwd,
user_cwd=cwd,
label=_uniquify_label_headless(label or agent_name),
color=color or "",
bottle_names=bottle_names,
forge_env=dict(forge_env) if forge_env else {},
)
try:
slug, exit_code = _launch_bottle(
spec,
dry_run=False,
backend_name=backend_name,
assume_yes=True,
headless_prompt_text=prompt,
)
except Die as exc:
raise BottleError(exc.message, exit_code=exc.code) from exc
if exit_code != 0:
raise BottleError(
f"agent exited {exit_code} (slug={slug!r})", exit_code=exit_code
)
return slug
def resume_headless(
slug: str,
*,
prompt: str,
backend_name: str | None = None,
forge_env: dict[str, str] | None = None,
) -> None:
"""Resume a frozen bottle headlessly with ``prompt``.
``forge_env`` re-supplies forge context for the new session (the
sidecar is relaunched alongside the agent on resume).
Raises ``BottleError`` on missing state, backend errors, or non-zero
agent exit."""
metadata = read_metadata(slug)
if metadata is None:
raise BottleError(
f"no state recorded for slug {slug!r}; "
f"check ~/.bot-bottle/state/ or call start_headless() to create a new bottle"
)
try:
manifest = ManifestIndex.resolve(metadata.cwd or USER_CWD)
manifest.require_agent(metadata.agent_name)
except (Die, ManifestError) as exc:
raise BottleError(str(exc)) from exc
spec = BottleSpec(
manifest=manifest,
agent_name=metadata.agent_name,
copy_cwd=metadata.copy_cwd,
user_cwd=metadata.cwd or USER_CWD,
identity=metadata.identity,
bottle_names=tuple(metadata.bottle_names),
forge_env=dict(forge_env) if forge_env else {},
)
try:
_, exit_code = _launch_bottle(
spec,
dry_run=False,
backend_name=backend_name or metadata.backend or None,
assume_yes=True,
headless_prompt_text=prompt,
)
except Die as exc:
raise BottleError(exc.message, exit_code=exc.code) from exc
if exit_code != 0:
raise BottleError(
f"agent exited {exit_code} resuming {slug!r}", exit_code=exit_code
)
def freeze(slug: str, *, backend_name: str | None = None) -> None:
"""Freeze the named bottle to a resumable artifact.
Reads the bottle's backend from its metadata when ``backend_name`` is
not supplied. Raises ``BottleError`` if the freeze fails."""
metadata = read_metadata(slug)
resolved_backend = backend_name or (metadata.backend if metadata else "") or "docker"
try:
get_freezer(resolved_backend).commit_slug(slug)
except CommitCancelled as exc:
raise BottleError(f"freeze cancelled for {slug!r}") from exc
except Die as exc:
raise BottleError(exc.message, exit_code=exc.code) from exc
def destroy(slug: str, *, backend_name: str | None = None) -> None:
"""Destroy the named bottle, removing all resources and state.
Brings down any running resources for ``slug``, then removes the
per-bottle state directory. Idempotent: a slug with no running
resources or no state directory is not an error."""
metadata = read_metadata(slug)
resolved_backend = backend_name or (metadata.backend if metadata else "") or "docker"
try:
if resolved_backend == "docker":
_destroy_docker(slug)
elif resolved_backend == "smolmachines":
_destroy_smolmachines(slug)
# macos-container: the container is torn down inside the launch
# context manager; no persistent VM survives, so nothing extra is
# needed at destroy time beyond the state-dir removal below.
except Die as exc:
raise BottleError(exc.message, exit_code=exc.code) from exc
clear_preserve_marker(slug)
cleanup_state(slug)
# --- backend-specific helpers -----------------------------------------------
def _destroy_docker(slug: str) -> None:
"""Best-effort ``docker compose down`` for a Docker bottle.
No-op when the compose file is absent — the project was already
brought down (normal for a frozen bottle) or was never created."""
from .backend.docker.compose import (
compose_down,
compose_file_path,
compose_project_name,
)
from .bottle_state import bottle_state_dir
state_dir = bottle_state_dir(slug)
compose_file = compose_file_path(state_dir)
if compose_file.exists():
compose_down(compose_project_name(slug), compose_file)
def _destroy_smolmachines(slug: str) -> None:
"""Best-effort stop + delete for a smolmachines bottle.
Both steps are best-effort: a machine that is already gone does not
cause an error; partial failures are logged as warnings."""
import subprocess
from .log import warn
machine = f"bot-bottle-{slug}"
subprocess.run(
["smolvm", "machine", "stop", "--name", machine],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
r = subprocess.run(
["smolvm", "machine", "delete", "-f", machine],
capture_output=True,
text=True,
check=False,
)
if r.returncode != 0:
warn(
f"smolvm machine delete -f {machine!r} failed "
f"(may already be gone): {(r.stderr or '').strip()}"
)
__all__ = [
"BottleError",
"destroy",
"freeze",
"resume_headless",
"start_headless",
]
+6 -1
View File
@@ -37,7 +37,7 @@ import shlex
import sys import sys
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from contextlib import AbstractContextManager from contextlib import AbstractContextManager
from dataclasses import dataclass from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any, Generic, Sequence, TypeVar from typing import Any, Generic, Sequence, TypeVar
@@ -75,6 +75,11 @@ class BottleSpec:
# Ordered bottle names selected at launch (issue #269). When non-empty # Ordered bottle names selected at launch (issue #269). When non-empty
# they are merged in order and replace the agent's `bottle:` field. # they are merged in order and replace the agent's `bottle:` field.
bottle_names: tuple[str, ...] = () bottle_names: tuple[str, ...] = ()
# Forge sidecar env vars (PRD forge-native-integration, chunk 1).
# Passed by the orchestrator at launch time; the forge sidecar reads
# them to connect to Gitea. Empty for non-forge runs. The agent
# process itself does not receive these.
forge_env: dict[str, str] = field(default_factory=dict)
@dataclass(frozen=True) @dataclass(frozen=True)
+1 -9
View File
@@ -37,10 +37,7 @@ from pathlib import Path
from typing import Callable, Generator from typing import Callable, Generator
from ...egress import egress_resolve_token_values from ...egress import egress_resolve_token_values
from ...git_gate import ( from ...git_gate import revoke_git_gate_provisioned_keys
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys,
)
from ...log import info, warn from ...log import info, warn
from . import network as network_mod from . import network as network_mod
from . import util as docker_mod from . import util as docker_mod
@@ -121,11 +118,6 @@ def launch(
git_gate_plan = plan.git_gate_plan git_gate_plan = plan.git_gate_plan
if git_gate_plan.upstreams: if git_gate_plan.upstreams:
git_gate_plan = provision_git_gate_dynamic_keys(
plan.manifest.bottle,
git_gate_plan,
git_gate_state_dir(plan.slug),
)
git_gate_plan = dataclasses.replace( git_gate_plan = dataclasses.replace(
git_gate_plan, git_gate_plan,
internal_network=internal_network, internal_network=internal_network,
+1 -19
View File
@@ -28,10 +28,7 @@ from ...egress import (
egress_resolve_token_values, egress_resolve_token_values,
egress_sidecar_env_entries, egress_sidecar_env_entries,
) )
from ...git_gate import ( from ...git_gate import revoke_git_gate_provisioned_keys
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys,
)
from ...log import die, info, warn from ...log import die, info, warn
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
from ...util import expand_tilde from ...util import expand_tilde
@@ -101,8 +98,6 @@ def launch(
egress_network = egress_network_name(plan.slug) egress_network = egress_network_name(plan.slug)
_create_networks(internal_network, egress_network, stack) _create_networks(internal_network, egress_network, stack)
plan = _provision_git_gate_keys(plan)
sidecar_name = sidecar_container_name(plan.slug) sidecar_name = sidecar_container_name(plan.slug)
container_mod.force_remove_container(sidecar_name) container_mod.force_remove_container(sidecar_name)
_start_sidecar_bundle(plan, sidecar_name, internal_network, egress_network) _start_sidecar_bundle(plan, sidecar_name, internal_network, egress_network)
@@ -246,19 +241,6 @@ def _stamp_agent_urls(
) )
def _provision_git_gate_keys(
plan: MacosContainerBottlePlan,
) -> MacosContainerBottlePlan:
if not plan.git_gate_plan.upstreams:
return plan
git_gate_plan = provision_git_gate_dynamic_keys(
plan.manifest.bottle,
plan.git_gate_plan,
git_gate_state_dir(plan.slug),
)
return dataclasses.replace(plan, git_gate_plan=git_gate_plan)
def _stage_git_gate(plan: MacosContainerBottlePlan, sidecar_name: str) -> None: def _stage_git_gate(plan: MacosContainerBottlePlan, sidecar_name: str) -> None:
gp = plan.git_gate_plan gp = plan.git_gate_plan
if not gp.upstreams: if not gp.upstreams:
+1 -18
View File
@@ -41,10 +41,7 @@ from ..docker.git_gate import (
GIT_GATE_ENTRYPOINT_IN_CONTAINER, GIT_GATE_ENTRYPOINT_IN_CONTAINER,
GIT_GATE_HOOK_IN_CONTAINER, GIT_GATE_HOOK_IN_CONTAINER,
) )
from ...git_gate import ( from ...git_gate import revoke_git_gate_provisioned_keys
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys,
)
from ...log import info, warn from ...log import info, warn
from ...bottle_state import ( from ...bottle_state import (
egress_state_dir, egress_state_dir,
@@ -177,7 +174,6 @@ def _start_bundle(
) -> SmolmachinesBottlePlan: ) -> SmolmachinesBottlePlan:
"""Build the BundleLaunchSpec, resolve token env, start the """Build the BundleLaunchSpec, resolve token env, start the
sidecar bundle container, and register teardown.""" sidecar bundle container, and register teardown."""
plan = _provision_git_gate_keys(plan)
bundle_spec = _bundle_launch_spec(plan, network, loopback_ip) bundle_spec = _bundle_launch_spec(plan, network, loopback_ip)
token_env = _resolve_token_env(plan, dict(os.environ)) token_env = _resolve_token_env(plan, dict(os.environ))
_bundle.ensure_bundle_image(bundle_spec.image) _bundle.ensure_bundle_image(bundle_spec.image)
@@ -186,19 +182,6 @@ def _start_bundle(
return plan return plan
def _provision_git_gate_keys(
plan: SmolmachinesBottlePlan,
) -> SmolmachinesBottlePlan:
if not plan.git_gate_plan.upstreams:
return plan
git_gate_plan = provision_git_gate_dynamic_keys(
plan.manifest.bottle,
plan.git_gate_plan,
git_gate_state_dir(plan.slug),
)
return dataclasses.replace(plan, git_gate_plan=git_gate_plan)
def _discover_urls( def _discover_urls(
plan: SmolmachinesBottlePlan, plan: SmolmachinesBottlePlan,
loopback_ip: str, loopback_ip: str,
+26 -1
View File
@@ -27,12 +27,34 @@ from .start import _launch_bottle
def cmd_resume(argv: list[str]) -> int: def cmd_resume(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True) parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True)
parser.add_argument("--dry-run", action="store_true") parser.add_argument("--dry-run", action="store_true")
parser.add_argument(
"--headless",
action="store_true",
help=(
"non-interactive rehydrate: deliver --prompt to the agent and "
"skip the y/N preflight. For orchestrators / the freeze-rehydrate "
"loop."
),
)
parser.add_argument(
"--prompt",
default=None,
help="follow-up prompt delivered to the agent (required with --headless)",
)
parser.add_argument( parser.add_argument(
"identity", "identity",
help="bottle identity from a prior `start` (see its session-end output)", help="bottle identity from a prior `start` (see its session-end output)",
) )
args = parser.parse_args(argv) args = parser.parse_args(argv)
if args.prompt and not args.headless:
die("--prompt is only valid with --headless")
if args.headless and not args.prompt:
die(
"--headless requires --prompt: "
"./cli.py resume <identity> --headless --prompt 'Address the review'"
)
metadata = read_metadata(args.identity) metadata = read_metadata(args.identity)
if metadata is None: if metadata is None:
die( die(
@@ -52,8 +74,11 @@ def cmd_resume(argv: list[str]) -> int:
bottle_names=tuple(metadata.bottle_names), bottle_names=tuple(metadata.bottle_names),
) )
backend_name = metadata.backend or None backend_name = metadata.backend or None
return _launch_bottle( _, rc = _launch_bottle(
spec, spec,
dry_run=args.dry_run, dry_run=args.dry_run,
backend_name=backend_name, backend_name=backend_name,
assume_yes=args.headless,
headless_prompt_text=args.prompt or "",
) )
return rc
+13 -5
View File
@@ -144,11 +144,12 @@ def cmd_start(argv: list[str]) -> int:
color=color, color=color,
bottle_names=bottle_names, bottle_names=bottle_names,
) )
return _launch_bottle( _, rc = _launch_bottle(
spec, spec,
dry_run=dry_run, dry_run=dry_run,
backend_name=backend_name, backend_name=backend_name,
) )
return rc
# --- Headless launch ----------------------------------------------------- # --- Headless launch -----------------------------------------------------
@@ -203,13 +204,14 @@ def _start_headless(
color=args.color or "", color=args.color or "",
bottle_names=bottle_names, bottle_names=bottle_names,
) )
return _launch_bottle( _, rc = _launch_bottle(
spec, spec,
dry_run=dry_run, dry_run=dry_run,
backend_name=backend_name, backend_name=backend_name,
assume_yes=True, assume_yes=True,
headless_prompt_text=prompt, headless_prompt_text=prompt,
) )
return rc
def _uniquify_label_headless(label: str) -> str: def _uniquify_label_headless(label: str) -> str:
@@ -497,11 +499,16 @@ def _launch_bottle(
backend_name: str | None = None, backend_name: str | None = None,
assume_yes: bool = False, assume_yes: bool = False,
headless_prompt_text: str = "", headless_prompt_text: str = "",
) -> int: ) -> tuple[str, int]:
"""Shared launch core for `start` and `resume`. Builds the plan, """Shared launch core for `start` and `resume`. Builds the plan,
prints / dry-runs / prompts as appropriate, brings the bottle up, prints / dry-runs / prompts as appropriate, brings the bottle up,
attaches claude, and prints the resume hint on session end. attaches claude, and prints the resume hint on session end.
Returns ``(slug, exit_code)`` where ``slug`` is the bottle identity
(empty string when the launch was aborted before a slug was minted)
and ``exit_code`` is the agent process's exit code (0 on clean exit
or when launch was aborted before the agent ran).
`assume_yes` skips the interactive y/N confirmation (headless / `assume_yes` skips the interactive y/N confirmation (headless /
orchestrator launches), where there is no human at the prompt. orchestrator launches), where there is no human at the prompt.
@@ -510,6 +517,7 @@ def _launch_bottle(
agent receives the initial task without interactive input.""" agent receives the initial task without interactive input."""
stage_dir = Path(tempfile.mkdtemp(prefix="bot-bottle-stage.")) stage_dir = Path(tempfile.mkdtemp(prefix="bot-bottle-stage."))
identity = "" identity = ""
exit_code = 0
try: try:
plan, identity = prepare_with_preflight( plan, identity = prepare_with_preflight(
spec, spec,
@@ -520,7 +528,7 @@ def _launch_bottle(
backend_name=backend_name, backend_name=backend_name,
) )
if plan is None: if plan is None:
return 0 return identity, 0
backend = get_bottle_backend(backend_name) backend = get_bottle_backend(backend_name)
with backend.launch(plan) as bottle: with backend.launch(plan) as bottle:
@@ -547,7 +555,7 @@ def _launch_bottle(
# Ctrl-Cs / OOM kills before cleanup removes the state dir. # Ctrl-Cs / OOM kills before cleanup removes the state dir.
if agent_provider_template == "claude": if agent_provider_template == "claude":
capture_claude_session_state(identity, exit_code) capture_claude_session_state(identity, exit_code)
return 0 return identity, exit_code
finally: finally:
# PRD 0018 chunk 2: prepare now writes the bottle's bind-mount # PRD 0018 chunk 2: prepare now writes the bottle's bind-mount
# sources under state/<slug>/. If we never reached the # sources under state/<slug>/. If we never reached the
+165
View File
@@ -0,0 +1,165 @@
"""Forge abstraction (PRD forge-native-integration, chunk 3).
The `Forge` abstract class is the provider-agnostic surface a forge
sidecar dispatches to: read issues/comments, post comments, edit
descriptions, and the membership / PR lookups the orchestrator needs.
Each forge (Gitea first) implements it; the sidecar protocol and the
agent prompt stay forge-agnostic.
`signal_done` is deliberately *not* a `Forge` method — completion is a
sidecar concept relayed to the orchestrator over a queue dir, not a
forge API operation.
`ScopedForge` enforces the PRD's **read-anywhere / write-scoped** model:
reads pass through to any issue/PR for context; writes are rejected
unless the target is the assigned issue or one of its PRs. This bounds
the blast radius of a prompt-injected agent below repo-wide API-key
permissions.
"""
from __future__ import annotations
import abc
from collections.abc import Iterable
from dataclasses import dataclass
@dataclass(frozen=True)
class Issue:
"""A forge issue (not a PR — see `PullRequest`)."""
number: int
title: str
body: str
state: str # "open" | "closed"
@dataclass(frozen=True)
class PullRequest:
"""A forge pull request. Kept distinct from `Issue` even though some
forges model PRs as issues on the wire: the domain objects carry
different data (a PR has merge state) and are read through different
methods (`read_pr` vs `read_issue`)."""
number: int
title: str
body: str
state: str # "open" | "closed"
merged: bool
@dataclass(frozen=True)
class Comment:
id: int
user: str # login of the comment author
body: str
class ForgeScopeError(PermissionError):
"""Raised by `ScopedForge` when a write targets an issue/PR outside
the assigned scope."""
class Forge(abc.ABC):
"""Provider-agnostic forge operations. Implementations wrap a
per-provider HTTP client and translate to `Issue` / `Comment`."""
@abc.abstractmethod
def read_issue(self, number: int) -> Issue:
"""Read an issue body (read-anywhere)."""
@abc.abstractmethod
def read_pr(self, number: int) -> PullRequest:
"""Read a pull request, including its merge state (read-anywhere)."""
@abc.abstractmethod
def read_comments(self, number: int) -> list[Comment]:
"""Read a thread's comments (read-anywhere)."""
@abc.abstractmethod
def post_comment(self, number: int, body: str) -> None:
"""Post a comment to an issue or PR (write-scoped)."""
@abc.abstractmethod
def update_description(self, number: int, body: str) -> None:
"""Replace an issue or PR body (write-scoped)."""
@abc.abstractmethod
def is_org_member(self, org: str, username: str) -> bool:
"""Whether `username` is a member of `org`."""
@abc.abstractmethod
def get_pr_for_issue(self, number: int) -> int | None:
"""The PR number linked to an issue, or None when there is none."""
@abc.abstractmethod
def is_pr_open(self, number: int) -> bool:
"""Whether the given PR is still open."""
class ScopedForge(Forge):
"""Read-anywhere / write-scoped wrapper around a concrete `Forge`.
`post_comment` and `update_description` are rejected with
`ForgeScopeError` unless the target number is the assigned issue or
one of the assigned PRs. Every other method delegates unchanged, so
reads, membership checks, and PR lookups work against any number for
context.
The writable set is fixed at construction. The sidecar reconstructs
a `ScopedForge` when a PR is discovered (`get_pr_for_issue`) so the
new PR becomes writable; this class does not mutate its own scope.
"""
def __init__(
self,
inner: Forge,
*,
assigned_issue: int,
assigned_prs: Iterable[int] = (),
) -> None:
self._inner = inner
self._assigned_issue = assigned_issue
self._writable = {assigned_issue, *assigned_prs}
@property
def writable(self) -> frozenset[int]:
return frozenset(self._writable)
def _check_write(self, number: int) -> None:
if number not in self._writable:
allowed = ", ".join(str(n) for n in sorted(self._writable))
raise ForgeScopeError(
f"write to #{number} denied: out of assigned scope "
f"(writable: {allowed})"
)
# --- read-anywhere: pass through --------------------------------------
def read_issue(self, number: int) -> Issue:
return self._inner.read_issue(number)
def read_pr(self, number: int) -> PullRequest:
return self._inner.read_pr(number)
def read_comments(self, number: int) -> list[Comment]:
return self._inner.read_comments(number)
def is_org_member(self, org: str, username: str) -> bool:
return self._inner.is_org_member(org, username)
def get_pr_for_issue(self, number: int) -> int | None:
return self._inner.get_pr_for_issue(number)
def is_pr_open(self, number: int) -> bool:
return self._inner.is_pr_open(number)
# --- write-scoped: check then delegate --------------------------------
def post_comment(self, number: int, body: str) -> None:
self._check_write(number)
self._inner.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._check_write(number)
self._inner.update_description(number, body)
+174
View File
@@ -0,0 +1,174 @@
"""Gitea HTTP client + `GiteaForge` (PRD forge-native-integration, chunk 3).
`GiteaClient` is the thin stdlib-only HTTP transport (mirrors
`deploy_key_provisioner.py`: `urllib.request`, bounded timeouts,
structured error bodies). `GiteaForge` adapts it to the provider-agnostic
`Forge` surface.
Unlike the option-2 design, the token is held here (the sidecar process
owns it) and passed to the client directly — there is no agent-side
cred-proxy route, because the agent never makes forge calls. The HTTP
client is the one piece shared with `GiteaDeployKeyProvisioner`; the two
are deliberately *not* unified behind a common abstract base (see the
deferral note in the PRD).
"""
from __future__ import annotations
import json
import urllib.error
import urllib.request
from typing import Any
from ..forge.base import Comment, Forge, Issue, PullRequest
# Bound every Gitea call: a hung instance must not stall the sidecar.
_API_TIMEOUT_SECS = 30
class GiteaClient:
"""Thin authenticated HTTP client for one repo's Gitea API.
`api_url` is the API base *including* `/api/v1` (matching the
`FORGE_GITEA_API` env var), e.g. `https://gitea.example.com/api/v1`.
"""
def __init__(self, *, api_url: str, owner: str, repo: str, token: str) -> None:
self._api_url = api_url.rstrip("/")
self._owner = owner
self._repo = repo
self._token = token
# --- low-level request -------------------------------------------------
def _request(
self, method: str, path: str, *, body: dict[str, Any] | None = None
) -> tuple[int, Any]:
"""Issue an authenticated request. Returns `(status, parsed_json)`;
parsed_json is None when the response has no body. Raises
`RuntimeError` on any non-2xx except where callers special-case
the HTTPError themselves (membership 404)."""
url = f"{self._api_url}{path}"
data = json.dumps(body).encode() if body is not None else None
headers = {"Authorization": f"token {self._token}"}
if data is not None:
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method)
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS) as resp:
raw = resp.read()
parsed = json.loads(raw) if raw else None
return resp.status, parsed
def _repo_path(self, suffix: str) -> str:
return f"/repos/{self._owner}/{self._repo}{suffix}"
# --- operations --------------------------------------------------------
def is_org_member(self, org: str, username: str) -> bool:
"""GET /orgs/{org}/members/{username}: 2xx → member, 404 → not.
Other errors propagate so a misconfigured token fails loudly."""
url = f"{self._api_url}/orgs/{org}/members/{username}"
req = urllib.request.Request(
url, headers={"Authorization": f"token {self._token}"}, method="GET"
)
try:
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS):
return True
except urllib.error.HTTPError as exc:
if exc.code == 404:
return False
raise RuntimeError(
f"org membership check failed for {org}/{username}: "
f"HTTP {exc.code}{_read_error_body(exc)}"
) from exc
def get_issue(self, number: int) -> dict[str, Any]:
_status, body = self._request("GET", self._repo_path(f"/issues/{number}"))
return body or {}
def get_comments(self, number: int) -> list[dict[str, Any]]:
_status, body = self._request(
"GET", self._repo_path(f"/issues/{number}/comments")
)
return body or []
def post_comment(self, number: int, body: str) -> None:
self._request(
"POST",
self._repo_path(f"/issues/{number}/comments"),
body={"body": body},
)
def patch_issue_body(self, number: int, body: str) -> None:
self._request(
"PATCH", self._repo_path(f"/issues/{number}"), body={"body": body}
)
def get_pull(self, number: int) -> dict[str, Any]:
_status, body = self._request("GET", self._repo_path(f"/pulls/{number}"))
return body or {}
class GiteaForge(Forge):
"""`Forge` over a `GiteaClient`."""
def __init__(self, client: GiteaClient) -> None:
self._client = client
def read_issue(self, number: int) -> Issue:
raw = self._client.get_issue(number)
return Issue(
number=int(raw.get("number", number)),
title=str(raw.get("title", "")),
body=str(raw.get("body", "") or ""),
state=str(raw.get("state", "")),
)
def read_pr(self, number: int) -> PullRequest:
raw = self._client.get_pull(number)
return PullRequest(
number=int(raw.get("number", number)),
title=str(raw.get("title", "")),
body=str(raw.get("body", "") or ""),
state=str(raw.get("state", "")),
merged=bool(raw.get("merged", False)),
)
def read_comments(self, number: int) -> list[Comment]:
return [
Comment(
id=int(c.get("id", 0)),
user=str((c.get("user") or {}).get("login", "")),
body=str(c.get("body", "") or ""),
)
for c in self._client.get_comments(number)
]
def post_comment(self, number: int, body: str) -> None:
self._client.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._client.patch_issue_body(number, body)
def is_org_member(self, org: str, username: str) -> bool:
return self._client.is_org_member(org, username)
def get_pr_for_issue(self, number: int) -> int | None:
"""Gitea models a PR as an issue with the same number, exposing a
`pull_request` object on the issue. When the queried number is
itself a PR, return it; otherwise None. (The orchestrator tracks
the issue→PR mapping in forge state for the cross-number case.)"""
raw = self._client.get_issue(number)
if raw.get("pull_request"):
return int(raw.get("number", number))
return None
def is_pr_open(self, number: int) -> bool:
return self.read_pr(number).state == "open"
def _read_error_body(exc: urllib.error.HTTPError) -> str:
try:
return exc.read().decode("utf-8", errors="replace")
except Exception: # pylint: disable=broad-exception-caught
return ""
+171
View File
@@ -0,0 +1,171 @@
"""Forge state persistence (PRD forge-native-integration, chunk 2).
The orchestrator tracks one record per forge-targeted issue so it can
map an incoming webhook back to the bottle handling it, drive the
freeze / rehydrate loop, and run the watchdog.
State is stored in a local SQLite database in `~/.bot-bottle/`. Access
goes through the thin `ForgeStateStore` CRUD interface so the backing
store (location or engine) can be swapped without touching callers;
`SqliteForgeStateStore` is the first implementation.
"""
from __future__ import annotations
import abc
import json
import sqlite3
from dataclasses import dataclass, field
from pathlib import Path
from ...supervise import bot_bottle_root
_DB_FILENAME = "bot-bottle.db"
# Lifecycle: a bottle is launched (running), frozen on the done signal,
# and destroyed when the PR closes.
STATUS_RUNNING = "running"
STATUS_FROZEN = "frozen"
STATUS_DESTROYED = "destroyed"
@dataclass
class ForgeState:
"""One forge-targeted issue's bottle lifecycle record."""
owner: str
repo: str
issue_number: int
slug: str
agent_name: str
bottle_names: list[str] = field(default_factory=list)
backend_name: str = ""
agent_git_user: str = ""
pr_number: int | None = None
status: str = STATUS_RUNNING
last_checkin_at: str = ""
class ForgeStateStore(abc.ABC):
"""Thin CRUD surface over forge state. Implementations back it with a
concrete store; callers depend only on this interface so the storage
location/engine is swappable."""
@abc.abstractmethod
def upsert(self, state: ForgeState) -> None:
"""Insert or replace the record keyed by (owner, repo, issue)."""
@abc.abstractmethod
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
"""Fetch one record, or None when absent."""
@abc.abstractmethod
def delete(self, owner: str, repo: str, issue_number: int) -> None:
"""Remove a record. Missing is success (idempotent)."""
@abc.abstractmethod
def all(self) -> list[ForgeState]:
"""Every record, for the status table and the watchdog sweep."""
def default_db_path() -> Path:
return bot_bottle_root() / _DB_FILENAME
class SqliteForgeStateStore(ForgeStateStore):
"""SQLite-backed `ForgeStateStore`. The database lives at
`~/.bot-bottle/bot-bottle.db` by default; pass `db_path` to point at
a different location (tests, alternate homes)."""
def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or default_db_path()
self._db_path.parent.mkdir(parents=True, exist_ok=True)
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS forge_state (
owner TEXT NOT NULL,
repo TEXT NOT NULL,
issue_number INTEGER NOT NULL,
slug TEXT NOT NULL,
agent_name TEXT NOT NULL,
bottle_names TEXT NOT NULL,
backend_name TEXT NOT NULL,
agent_git_user TEXT NOT NULL,
pr_number INTEGER,
status TEXT NOT NULL,
last_checkin_at TEXT NOT NULL,
PRIMARY KEY (owner, repo, issue_number)
)
"""
)
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self._db_path)
conn.row_factory = sqlite3.Row
return conn
def upsert(self, state: ForgeState) -> None:
with self._connect() as conn:
conn.execute(
"""
INSERT OR REPLACE INTO forge_state (
owner, repo, issue_number, slug, agent_name,
bottle_names, backend_name, agent_git_user,
pr_number, status, last_checkin_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
state.owner,
state.repo,
state.issue_number,
state.slug,
state.agent_name,
json.dumps(state.bottle_names),
state.backend_name,
state.agent_git_user,
state.pr_number,
state.status,
state.last_checkin_at,
),
)
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
with self._connect() as conn:
row = conn.execute(
"SELECT * FROM forge_state "
"WHERE owner = ? AND repo = ? AND issue_number = ?",
(owner, repo, issue_number),
).fetchone()
return _row_to_state(row) if row is not None else None
def delete(self, owner: str, repo: str, issue_number: int) -> None:
with self._connect() as conn:
conn.execute(
"DELETE FROM forge_state "
"WHERE owner = ? AND repo = ? AND issue_number = ?",
(owner, repo, issue_number),
)
def all(self) -> list[ForgeState]:
with self._connect() as conn:
rows = conn.execute(
"SELECT * FROM forge_state ORDER BY owner, repo, issue_number"
).fetchall()
return [_row_to_state(row) for row in rows]
def _row_to_state(row: sqlite3.Row) -> ForgeState:
return ForgeState(
owner=row["owner"],
repo=row["repo"],
issue_number=row["issue_number"],
slug=row["slug"],
agent_name=row["agent_name"],
bottle_names=json.loads(row["bottle_names"]),
backend_name=row["backend_name"],
agent_git_user=row["agent_git_user"],
pr_number=row["pr_number"],
status=row["status"],
last_checkin_at=row["last_checkin_at"],
)
+11 -6
View File
@@ -30,6 +30,7 @@ backend-specific and lives on concrete subclasses (see
from __future__ import annotations from __future__ import annotations
import dataclasses
from abc import ABC from abc import ABC
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@@ -52,7 +53,6 @@ from .git_gate_render import (
_gitconfig_validate_value, _gitconfig_validate_value,
) )
from .git_gate_provision import ( from .git_gate_provision import (
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys, revoke_git_gate_provisioned_keys,
_provision_dynamic_key, _provision_dynamic_key,
_resolve_identity_file, _resolve_identity_file,
@@ -93,14 +93,20 @@ class GitGate(ABC):
entrypoint, pre-receive hook, and access-hook scripts (mode entrypoint, pre-receive hook, and access-hook scripts (mode
600) under `stage_dir`. Pure host-side, no docker subprocess. 600) under `stage_dir`. Pure host-side, no docker subprocess.
For `gitea` key entries, the returned upstream intentionally For `gitea` key entries, also generates and registers
has an empty identity file. Backend launch fills that in after a fresh deploy key via the forge API and writes the private key
the operator confirms the preflight. + key ID to `stage_dir`.
Returned plan is incomplete: the launch step must fill Returned plan is incomplete: the launch step must fill
`internal_network` / `egress_network` via `dataclasses.replace` `internal_network` / `egress_network` via `dataclasses.replace`
before passing the plan to `.start`.""" before passing the plan to `.start`."""
upstreams = git_gate_upstreams_for_bottle(bottle) upstreams_list = list(git_gate_upstreams_for_bottle(bottle))
for i, entry in enumerate(bottle.git):
upstreams_list[i] = dataclasses.replace(
upstreams_list[i],
identity_file=_resolve_identity_file(entry, slug, stage_dir),
)
upstreams = tuple(upstreams_list)
entrypoint = stage_dir / "git_gate_entrypoint.sh" entrypoint = stage_dir / "git_gate_entrypoint.sh"
entrypoint.write_text(git_gate_render_entrypoint(upstreams)) entrypoint.write_text(git_gate_render_entrypoint(upstreams))
entrypoint.chmod(0o600) entrypoint.chmod(0o600)
@@ -156,7 +162,6 @@ __all__ = [
"git_gate_render_entrypoint", "git_gate_render_entrypoint",
"git_gate_render_hook", "git_gate_render_hook",
"git_gate_render_access_hook", "git_gate_render_access_hook",
"provision_git_gate_dynamic_keys",
"revoke_git_gate_provisioned_keys", "revoke_git_gate_provisioned_keys",
"_gitconfig_validate_value", "_gitconfig_validate_value",
"_provision_dynamic_key", "_provision_dynamic_key",
-43
View File
@@ -9,16 +9,10 @@ imported (`deploy_key_provisioner`) to keep its cost off the host path.
from __future__ import annotations from __future__ import annotations
import os import os
import dataclasses
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING
from .log import info from .log import info
from .manifest import ManifestBottle, ManifestGitEntry from .manifest import ManifestBottle, ManifestGitEntry
from .git_gate_render import GitGateUpstream
if TYPE_CHECKING:
from .git_gate import GitGatePlan
def _provision_dynamic_key( def _provision_dynamic_key(
entry: ManifestGitEntry, entry: ManifestGitEntry,
@@ -101,45 +95,8 @@ def _resolve_identity_file(entry: ManifestGitEntry, slug: str, stage_dir: Path)
return entry.IdentityFile return entry.IdentityFile
def provision_git_gate_dynamic_keys(
bottle: ManifestBottle,
plan: "GitGatePlan",
stage_dir: Path,
) -> "GitGatePlan":
"""Provision dynamic git-gate keys and return an updated plan.
This runs during backend launch, after the operator confirms the
preflight. Plan preparation intentionally stays side-effect-light:
dry-runs and aborted launches must not create remote deploy keys.
"""
if not plan.upstreams:
return plan
upstreams_by_name: dict[str, GitGateUpstream] = {
upstream.name: upstream for upstream in plan.upstreams
}
updated: list[GitGateUpstream] = []
for entry in bottle.git:
upstream = upstreams_by_name.get(entry.Name)
if upstream is None:
continue
if entry.Key.provider == "gitea":
identity_file = _provision_dynamic_key(entry, plan.slug, stage_dir)
upstream = dataclasses.replace(upstream, identity_file=identity_file)
updated.append(upstream)
if len(updated) != len(plan.upstreams):
updated_names = {u.name for u in updated}
for upstream in plan.upstreams:
if upstream.name not in updated_names:
updated.append(upstream)
return dataclasses.replace(plan, upstreams=tuple(updated))
__all__ = [ __all__ = [
"revoke_git_gate_provisioned_keys", "revoke_git_gate_provisioned_keys",
"provision_git_gate_dynamic_keys",
"_provision_dynamic_key", "_provision_dynamic_key",
"_resolve_identity_file", "_resolve_identity_file",
] ]
+2 -7
View File
@@ -16,16 +16,11 @@ from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path from pathlib import Path
from urllib.parse import urlsplit from urllib.parse import urlsplit
from .git_gate import GIT_GATE_TIMEOUT_SECS
DEFAULT_PORT = 9420 DEFAULT_PORT = 9420
# Mirrors git_gate_render.GIT_GATE_TIMEOUT_SECS. Duplicated rather than
# imported: this module ships as a flat top-level sibling in the sidecar
# bundle image (see Dockerfile.sidecars), not as part of the bot_bottle
# package, so `bot_bottle.git_gate` and its dependency chain aren't
# available at runtime.
GIT_GATE_TIMEOUT_SECS = 15
# Bound memory use while still allowing ordinary git push packfiles. # Bound memory use while still allowing ordinary git push packfiles.
MAX_BODY_BYTES = 100 * 1024 * 1024 MAX_BODY_BYTES = 100 * 1024 * 1024
@@ -0,0 +1,439 @@
# PRD prd-new: Forge native integration
- **Status:** Draft
- **Author:** claude
- **Created:** 2026-06-29
- **Issue:** #317
## Summary
Add a webhook-driven orchestration layer that lets Gitea issues and PR comments
drive bot-bottle sessions end-to-end with no operator in the loop for the happy
path. An issue assigned to a member of the configured agent org and labelled
with an agent name triggers a headless bottle launch; the bottle processes the
issue, opens a PR, and interacts with the forge through a **forge sidecar**
the agent never touches the Gitea API or its credentials directly. The agent
calls `signal_done(status, summary)` on the sidecar when a work unit is
complete; the sidecar relays that to the orchestrator over a queue dir (the same
pattern as the supervise sidecar), so completion is an unambiguous in-band
signal rather than a comment the orchestrator has to parse. The orchestrator
freezes the bottle. Subsequent PR comments rehydrate the frozen bottle. The
bottle is destroyed when the PR closes.
The forge sidecar is backed by a `Forge` abstract class with per-provider
implementations (Gitea first), so the agent's prompts and the sidecar protocol
stay forge-agnostic. The sidecar logs forge operations semantically ("read PR
description", "posted comment", "signalled done"), giving richer provenance than
post-hoc egress-byte parsing, and enforces a **read-anywhere / write-scoped**
permission model: the agent may read for context but may only write to the
issue and PRs it was assigned.
Run provenance is exposed through a **provenance API** (the sidecar's structured
operation log plus the run's metadata), not posted back into the forge. We do
not surface a provenance footer in the PR — the audit record lives behind the
API where it can be retained and queried, rather than as an editable comment.
The separation of concerns across the two layers: bot-bottle owns the headless
launch primitives, the forge sidecar + `Forge` abstraction, and forge state.
`bot-bottle-orchestrator` (separate binary) owns the webhook listener, bottle
lifecycle loop, and monitoring dashboard; it calls into bot-bottle via
`./cli.py orchestrate`, a thin wrapper command. This PRD covers bot-bottle's
side of that contract.
## Problem
Today an operator must open the TUI, select an agent and bottle, confirm the
preflight, and type prompts interactively. This blocks "issue → PR" automation
and produces no durable audit record of what the agent did. The security model
already provides the right isolation and egress controls, and `start --headless`
(#315) already gives `bot-bottle-orchestrator` a non-interactive launch path.
The missing pieces are a headless `resume` counterpart for rehydrating frozen
bottles, a forge-interaction surface the agent uses to read context, post
comments, and signal completion, and the provenance trail that makes the audit
story legible to reviewers on every PR.
That forge-interaction surface could be built two ways: (2) give the agent the
Gitea API directly with cred-proxy injecting the token, or (3) put a forge
sidecar between the agent and the forge. This PRD takes **option 3**. The
deciding factors: a sidecar `signal_done` call is an unambiguous completion
signal where comment-parsing is a correctness risk that surfaces in production;
the sidecar produces a semantic audit trail rather than HTTP bytes, which is
load-bearing for provenance (the stated product priority); and the sidecar can
enforce scope tighter than repo-wide API-key permissions, reducing blast radius
for a prompt-injected agent. The costs — a second sidecar process per forge run,
a new failure mode if it crashes, and per-forge implementation cost — are
accepted as the price of those properties.
## Goals / Success Criteria
1. Headless launch already exists: `./cli.py start <agent> --headless --prompt`
(#315) runs non-interactively with no TUI selectors or y/N preflight. This
PRD builds on it rather than re-introducing it. The remaining gap is a
matching headless `resume` path (`./cli.py resume --headless`), since
rehydrating a frozen bottle for a new prompt is required by the freeze /
rehydrate loop and `resume` has no non-interactive entry point today.
2. An issue assigned to a member of the configured org (`FORGE_ORG`, default
`bot-bottle`) and labelled `bot-bottle:<agent-name>` is the trigger
convention. Org membership is verified via the Gitea API at event time.
3. Forge-targeted bottles run a **forge sidecar** that exposes a small,
forge-agnostic API (comment/issue/PR CRUD plus `signal_done`) over the same
queue-dir + HTTP/JSON-RPC machinery as the supervise sidecar. The agent calls
the sidecar; it never sees the forge token or forge-specific endpoints.
4. The sidecar is backed by a `Forge` abstract class. Gitea is the first
concrete implementation; adding a forge means a new subclass, not changes to
the agent prompt or sidecar protocol. The sidecar enforces a read-anywhere /
write-scoped model: writes are limited to the assigned issue and its PRs;
reads are unrestricted for context.
5. The agent calls `signal_done(status, summary)` on the sidecar when a work
unit is complete; the sidecar relays it to the orchestrator over a queue dir.
This is the done signal — no comment parsing. A watchdog timeout
(configurable, default 30 min) causes the orchestrator to treat the run as
done-without-self-report if the agent exits without signalling.
6. Run provenance (agent name, bottle name(s), slug, timing, exit code,
gitleaks result, egress summary, and the sidecar's semantic operation log)
is available through a provenance API. It is **not** surfaced as a PR footer
or any other forge comment.
7. Forge state (issue → slug, status) is persisted in a local SQLite database
under `~/.bot-bottle/` and survives orchestrator restarts.
8. `./cli.py orchestrate status` lists active forge-managed bottles and their
issue/PR URLs.
9. Unit tests cover: label parsing, org-membership check path, forge state
store CRUD (SQLite), headless launch arg construction, forge env var
injection, sidecar request dispatch through the `Forge` abstraction,
write-scope enforcement (reject writes outside the assigned issue/PRs), and
`signal_done` queue relay.
## Non-goals
- Webhook signature verification (HMAC-SHA256). Added as a follow-up.
- The `bot-bottle-orchestrator` binary itself — this PRD covers bot-bottle's
side of the interface only. The orchestrator is a separate project.
- GitHub or GitLab support.
- Multiple simultaneous forge bottles per issue.
- Automatic retry on agent error exit.
- Bottle destruction on issue close (PR close only; issue close is ambiguous).
- Concurrent multi-issue handling (one blocking run per orchestrator process).
- A monitoring dashboard (orchestrator-side concern).
- Folding `DeployKeyProvisioner` into the `Forge` abstraction. Deploy-key
provisioning runs at bottle-provision time on the host; the forge sidecar runs
inside the bottle at agent time. The two have different lifecycles and actors,
so coupling them into one class is deferred to a follow-up. This PRD only
shares the Gitea HTTP client between them.
## Design
### Targeting convention
An issue is forge-targeted when **both** hold:
- At least one assignee is a member of the Gitea org named by `FORGE_ORG`
(default `bot-bottle`). Checked via `GET /api/v1/orgs/{org}/members/{user}`.
- At least one label has the prefix `bot-bottle:`. The suffix names the agent
manifest, e.g. `bot-bottle:implementer` → agent `implementer`.
`FORGE_ORG` is read at orchestrate-command startup. It is not embedded in
manifests or state files; the orchestrator stamps its value into log output for
auditability.
An optional label `bot-bottle-bottle:<name>` overrides bottle selection. When
absent the agent's default bottle is used.
### `./cli.py orchestrate` — the thin wrapper
```
./cli.py orchestrate start --agent AGENT [--bottle BOTTLE ...] --prompt PROMPT
[--label LABEL] [--backend BACKEND]
./cli.py orchestrate resume --slug SLUG --prompt PROMPT [--backend BACKEND]
./cli.py orchestrate status
```
`orchestrate start` is a thin shim over the already-shipped `start --headless`
(#315): it forwards agent / bottle / label / prompt and adds the forge-specific
wiring (`forge_env`, sidecar launch). It does not re-implement headless launch.
The caller (`bot-bottle-orchestrator`) manages freeze, state, and the forge
sidecar's done signal around it.
`orchestrate resume` is the shim over the new `resume --headless` (below).
`orchestrate status` prints the forge state table.
### Headless primitives — what exists vs. what's new
Headless **start** already shipped in #315 and this PRD reuses it as-is:
- `./cli.py start <agent> --headless --prompt TEXT` — no TUI selectors, no y/N
preflight. Internally `_start_headless()` calls the shared `_launch_bottle()`
with `assume_yes=True` and `headless_prompt_text=prompt`.
- The prompt is delivered through `AgentProvider.headless_prompt(prompt)`
claude `-p`, codex positional, pi `-p`. The orchestrator does **not** hand-roll
agent args; it relies on this provider abstraction. (An earlier draft proposed
`start_headless` / `attach_agent_headless` helpers that constructed
`--no-interactive`/`-p` directly — those are dropped as redundant with, and
divergent from, what #315 merged.)
Two additions are needed on top of #315:
**1. A `forge_env` hook on the headless launch path.** The orchestrator needs to
pass forge context + token through to the forge sidecar launched alongside the
agent. This is a parameter threaded into `_launch_bottle` (the same core
`start --headless` already uses), not a parallel launch function. The agent
process itself does not receive the token.
**2. `resume --headless`** — new in `bot_bottle/cli/resume.py`, mirroring the
`--headless` flag on `start`:
```
./cli.py resume <slug> --headless --prompt TEXT
```
It rehydrates a frozen bottle and runs one headless prompt via the same
`assume_yes` + `headless_prompt` path, returning the agent's exit code. `resume`
has no non-interactive entry point today, so this is genuinely new work rather
than a rename of an existing helper.
### Forge sidecar
Forge-targeted bottles run a forge sidecar alongside the agent, mirroring the
supervise sidecar: a per-bottle process that exposes an HTTP/JSON-RPC endpoint
over a Unix socket and relays events to the orchestrator through a queue dir.
The agent calls the sidecar; the sidecar holds the forge token and makes the
actual forge API calls. The agent never receives the credential and never sees a
forge-specific endpoint — swapping Gitea for another forge does not change the
agent prompt or the sidecar protocol.
The sidecar is configured at launch from the forge context (owner, repo, issue,
PR) and the token, supplied by the orchestrator — not baked into the agent
manifest. Because the sidecar owns the token, forge traffic does not need a
cred-proxy egress route on the agent; the agent's egress policy is unchanged by
forge targeting.
**Sidecar protocol** (forge-agnostic; each method maps to a `Forge` call):
| Method | Scope | Purpose |
|---|---|---|
| `read_issue(number)` | read-anywhere | Read an issue body for context |
| `read_pr(number)` | read-anywhere | Read a PR (incl. merge state) for context |
| `read_comments(number)` | read-anywhere | Read a thread for context |
| `post_comment(number, body)` | write-scoped | Post to the assigned issue/PR |
| `update_description(number, body)` | write-scoped | Edit the assigned issue/PR body |
| `signal_done(status, summary)` | — | Relay completion to the orchestrator |
Issues and PRs are distinct domain objects (`Issue` vs `PullRequest`) read
through distinct methods; a PR carries merge state an issue does not.
**Scope enforcement** is read-anywhere / write-scoped: read methods accept any
issue/PR number for context; write methods are rejected unless the target is the
assigned issue or one of its PRs. This is tighter than Gitea's repo-wide API-key
permissions and bounds the blast radius of a prompt-injected agent. Rejections
are logged semantically (operation, target, reason) so the audit trail records
attempted out-of-scope writes, not just allowed ones.
**Semantic audit**: every sidecar call is logged as a structured operation
("read PR #318 description", "posted comment to #317", "signalled done:
success") rather than as opaque HTTP bytes. This log feeds provenance directly,
with no post-hoc egress-log parsing.
### `Forge` abstraction — `bot_bottle/contrib/forge/`
The sidecar dispatches to a `Forge` abstract class. Each provider implements the
operations behind the sidecar protocol:
```python
class Forge(abc.ABC):
@abc.abstractmethod
def read_issue(self, number: int) -> Issue: ...
@abc.abstractmethod
def read_pr(self, number: int) -> PullRequest: ...
@abc.abstractmethod
def read_comments(self, number: int) -> list[Comment]: ...
@abc.abstractmethod
def post_comment(self, number: int, body: str) -> None: ...
@abc.abstractmethod
def update_description(self, number: int, body: str) -> None: ...
@abc.abstractmethod
def is_org_member(self, org: str, username: str) -> bool: ...
@abc.abstractmethod
def get_pr_for_issue(self, number: int) -> int | None: ...
@abc.abstractmethod
def is_pr_open(self, number: int) -> bool: ...
```
`Issue` and `PullRequest` are separate frozen dataclasses — a PR adds `merged`.
`ScopedForge` wraps a concrete `Forge` to enforce the read-anywhere /
write-scoped model (`post_comment` / `update_description` raise `ForgeScopeError`
outside the assigned issue and PRs).
`GiteaForge` is the first and only concrete implementation in this PRD. It wraps
the Gitea HTTP client (below). Adding GitHub or GitLab later is a new subclass;
the sidecar, protocol, and agent prompt are untouched.
> **Deferred:** `DeployKeyProvisioner` is *not* folded into `Forge` here.
> Deploy-key provisioning runs on the host at provision time; the sidecar runs
> in the bottle at agent time. They have different lifecycles and actors, so a
> shared abstract base would couple two unrelated auth contexts. For now they
> only share the Gitea HTTP client; a later PRD can revisit unification.
### Forge env vars
The orchestrator passes forge context to the **sidecar** (not the agent) at
launch. The agent does not need owner/repo/issue env vars to construct API
calls, since it only names issue/PR numbers to the sidecar:
| Var | Example | Purpose |
|---|---|---|
| `FORGE_GITEA_API` | `https://gitea.dideric.is/api/v1` | Base URL the sidecar calls |
| `FORGE_OWNER` | `didericis` | Repo owner |
| `FORGE_REPO` | `bot-bottle` | Repo name |
| `FORGE_ISSUE_NUMBER` | `317` | Assigned issue (defines write scope) |
| `FORGE_PR_NUMBER` | `318` | Assigned PR (empty until PR exists) |
The agent's forge-specific prompt instructs it to call `signal_done` on the
sidecar when a work unit is complete, and to use the sidecar for any
comment/description writes. The instruction is forge-agnostic and is part of the
forge prompt overlay, not the base agent manifest, so non-forge runs are
unaffected.
### Done signal and watchdog
The agent calls `signal_done(status, summary)` on the sidecar when it finishes a
work unit. The sidecar writes the event to its queue dir; the orchestrator reads
it and:
1. Reads the forge state for `(owner, repo, issue_number)`.
2. If `status == "running"`, treats the event as the done signal: freezes the
bottle and sets `status = "frozen"`. Provenance is recorded via the
provenance API — no comment is posted to the forge.
Because completion is an explicit `signal_done` call, the orchestrator does not
parse comment text to detect "done", and intermediate comments the agent posts
mid-run cannot be mistaken for completion.
**Watchdog**: the orchestrator tracks `last_checkin_at` in forge state, updated
on each sidecar event. A background thread wakes every minute. If
`now - last_checkin_at > FORGE_WATCHDOG_TIMEOUT` (default 30 min, configurable
via env) and `status == "running"`, the orchestrator treats the run as
done-without-self-report and freezes the bottle, flagging the run as incomplete
in the provenance record.
**Sidecar-death failure mode**: if the forge sidecar crashes mid-run the agent
loses forge access while the bottle is otherwise healthy. The orchestrator
detects a dead sidecar (socket/queue gone) the same way it detects a stalled
agent and falls back to the watchdog path.
### Forge state — `bot_bottle/contrib/gitea/forge_state.py`
State is stored in a local SQLite database at `~/.bot-bottle/bot-bottle.db`.
Access goes through a thin CRUD interface, `ForgeStateStore`, so the storage
location/engine can be swapped without touching callers. `SqliteForgeStateStore`
is the first implementation.
The `forge_state` table is keyed by `(owner, repo, issue_number)` and carries:
`slug`, `agent_name`, `bottle_names` (JSON), `backend_name`, `agent_git_user`,
`pr_number` (nullable), `status`, `last_checkin_at`.
`status`: `"running"` | `"frozen"` | `"destroyed"`.
Store interface:
```python
class ForgeStateStore(abc.ABC):
def upsert(self, state: ForgeState) -> None: ...
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None: ...
def delete(self, owner: str, repo: str, issue_number: int) -> None: ...
def all(self) -> list[ForgeState]: ...
class SqliteForgeStateStore(ForgeStateStore):
def __init__(self, db_path: Path | None = None) -> None: ...
```
`upsert` uses `INSERT OR REPLACE` so a re-run for the same issue overwrites in
place. The schema is created on first open.
### Provenance API
Run provenance — agent, bottle(s), slug, timing, exit code, gitleaks result,
egress summary, watchdog-fired flag, and the sidecar's semantic operation log —
is exposed through a **provenance API**, not posted into the forge. There is no
provenance footer or run-summary comment.
The rationale (per the monetization positioning): a PR comment is mutable by any
maintainer, unsigned, and per-PR, so it is worthless as an audit record and
invites false trust. The authoritative record therefore lives behind the API,
where it can be retained, queried, and (eventually) signed. Whether any
projection of it ever appears in the forge is a separate, out-of-scope decision;
this PR does not build one.
The API surface itself (schema, transport, signing, retention) is **out of scope
for this PRD** and belongs with the orchestrator / control-plane work. bot-bottle
here only produces the raw material: the sidecar's semantic operation log and the
run metadata the orchestrator collects.
### Gitea HTTP client — `bot_bottle/contrib/gitea/client.py`
`GiteaForge` (and the existing `GiteaDeployKeyProvisioner`) share one thin HTTP
client. Unlike the option-2 design, the token is held by the sidecar process and
passed to the client directly — there is no agent-side cred-proxy route to
inject it, because the agent never makes forge calls.
```python
class GiteaClient:
def __init__(self, *, api_url: str, owner: str, repo: str, token: str) -> None: ...
def is_org_member(self, org: str, username: str) -> bool: ...
def get_issue(self, number: int) -> dict: ...
def get_comments(self, number: int) -> list[dict]: ...
def post_comment(self, number: int, body: str) -> None: ...
def patch_issue_body(self, number: int, body: str) -> None: ...
def get_pull(self, number: int) -> dict: ...
```
`GiteaForge` adapts this client to the `Forge` surface (mapping raw JSON to
`Issue` / `PullRequest` / `Comment`). Sharing only the HTTP client (not an
abstract base) is the deliberate boundary between the sidecar and the deploy-key
provisioner — see the deferral note under the `Forge` abstraction.
### Implementation chunks
1. **Headless additions on top of #315** — thread a `forge_env` parameter into
the existing `_launch_bottle` core (the one `start --headless` already uses);
add a `--headless` path to `cli/resume.py` reusing `assume_yes` +
`headless_prompt`. No new `start_headless`/`attach_agent_headless` helpers.
Tests: `forge_env` reaches the sidecar/`guest_env`; `resume --headless` skips
the TUI and y/N preflight and returns the agent exit code.
2. **Forge state**`contrib/gitea/forge_state.py`: `ForgeState` dataclass,
`ForgeStateStore` CRUD interface, `SqliteForgeStateStore`. Tests: round-trip,
missing → None, `INSERT OR REPLACE` upsert, delete idempotent, `all()`
ordering, persistence across store instances.
3. **`Forge` abstraction + Gitea client** — `contrib/forge/base.py` (`Forge`
ABC, `ScopedForge`, `Issue` / `PullRequest` / `Comment`) and
`contrib/gitea/client.py` + `GiteaForge`: `is_org_member`, `read_issue`,
`read_pr`, `read_comments`, `post_comment`, `update_description`,
`get_pr_for_issue`, `is_pr_open`. Tests: mock `urllib.request.urlopen`,
assert payloads and 404-as-false for membership; `ScopedForge` write-scope
enforcement.
4. **Forge sidecar** — sidecar process exposing the protocol over a Unix socket,
queue-dir relay, write-scope enforcement, semantic op log, `signal_done`.
Reuses the supervise sidecar bundle machinery. Tests: dispatch each method to
the `Forge`, reject out-of-scope writes, `signal_done` writes a queue event,
scope-rejection is logged.
5. **`./cli.py orchestrate`** — `cli/orchestrate.py` with `start`, `resume`,
`status` subcommands wired into `cli.py`; `start` launches the forge sidecar
alongside the agent for forge-targeted runs. Tests: arg parsing, `start`
delegates to `start --headless`, `resume` delegates to `resume --headless`.
## Provenance
Run provenance is captured (sidecar semantic operation log + run metadata) and
exposed through a provenance API. It is deliberately **not** surfaced in the
forge — no footer, no run-summary comment. A mutable, unsigned PR comment is not
an audit record; the authoritative record lives behind the API where it can be
retained and signed. The `watchdog_fired` flag marks runs where the agent did
not self-report completion so consumers of the API know the record may be
incomplete.
The provenance API's schema, transport, signing, and retention are out of scope
for this PRD (control-plane work); bot-bottle here produces the raw material
only.
+267
View File
@@ -0,0 +1,267 @@
"""Unit: bot_bottle public Python API (bot_bottle/__init__.py surface).
Covers start_headless, resume_headless, freeze, and destroy the four
operations the bot-bottle-orchestrator's ProgrammaticBottleRunner uses.
All I/O is stubbed so no container is created.
"""
from __future__ import annotations
import unittest
from unittest.mock import MagicMock, patch
from bot_bottle import BottleError, destroy, freeze, resume_headless, start_headless
# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------
def _make_manifest(agent_name: str = "implementer", bottle_name: str = "claude"):
manifest = MagicMock()
manifest.agents = {agent_name: MagicMock(bottle=bottle_name)}
manifest.all_agent_names = [agent_name]
manifest.all_bottle_names = [bottle_name]
manifest.home_md = None # eager mode — _peek_agent_bottle uses agents dict
manifest.require_agent = MagicMock(return_value=None)
return manifest
def _metadata(
slug: str = "implementer-abc12",
agent_name: str = "implementer",
backend: str = "docker",
):
md = MagicMock()
md.identity = slug
md.agent_name = agent_name
md.cwd = "/repo"
md.copy_cwd = False
md.bottle_names = ["claude"]
md.backend = backend
return md
# ---------------------------------------------------------------------------
# start_headless
# ---------------------------------------------------------------------------
class TestStartHeadless(unittest.TestCase):
def setUp(self) -> None:
self._manifest = _make_manifest()
patch("bot_bottle.api.ManifestIndex.resolve", return_value=self._manifest).start()
self._launch = patch(
"bot_bottle.api._launch_bottle", return_value=("implementer-abc12", 0)
).start()
patch(
"bot_bottle.api._uniquify_label_headless", side_effect=lambda lbl: lbl
).start()
self.addCleanup(patch.stopall)
def _spec(self):
self._launch.assert_called_once()
return self._launch.call_args[0][0]
def test_returns_slug_on_success(self):
slug = start_headless("implementer", prompt="Do it")
self.assertEqual("implementer-abc12", slug)
def test_passes_assume_yes_and_prompt(self):
start_headless("implementer", prompt="Do it")
kwargs = self._launch.call_args[1]
self.assertTrue(kwargs["assume_yes"])
self.assertEqual("Do it", kwargs["headless_prompt_text"])
def test_explicit_bottles_forwarded(self):
start_headless("implementer", prompt="Do it", bottles=["dev", "claude"])
self.assertEqual(("dev", "claude"), self._spec().bottle_names)
def test_default_bottle_resolved_from_manifest(self):
start_headless("implementer", prompt="Do it")
self.assertEqual(("claude",), self._spec().bottle_names)
def test_forge_env_on_spec(self):
env = {"FORGE_GITEA_API": "https://gitea.example.com/api/v1", "FORGE_OWNER": "acme"}
start_headless("implementer", prompt="Do it", forge_env=env)
self.assertEqual(env, self._spec().forge_env)
def test_no_forge_env_defaults_to_empty_dict(self):
start_headless("implementer", prompt="Do it")
self.assertEqual({}, self._spec().forge_env)
def test_nonzero_exit_raises_bottle_error(self):
self._launch.return_value = ("implementer-abc12", 1)
with self.assertRaises(BottleError) as ctx:
start_headless("implementer", prompt="Do it")
self.assertEqual(1, ctx.exception.exit_code)
def test_no_default_bottle_raises_bottle_error(self):
manifest = _make_manifest(bottle_name="")
with patch("bot_bottle.api.ManifestIndex.resolve", return_value=manifest):
with self.assertRaises(BottleError):
start_headless("implementer", prompt="Do it")
self._launch.assert_not_called()
def test_backend_name_forwarded(self):
start_headless("implementer", prompt="Do it", backend_name="docker")
self.assertEqual("docker", self._launch.call_args[1]["backend_name"])
def test_label_forwarded_to_spec(self):
start_headless("implementer", prompt="Do it", label="nightly")
self.assertEqual("nightly", self._spec().label)
def test_color_forwarded_to_spec(self):
start_headless("implementer", prompt="Do it", color="green")
self.assertEqual("green", self._spec().color)
# ---------------------------------------------------------------------------
# resume_headless
# ---------------------------------------------------------------------------
class TestResumeHeadless(unittest.TestCase):
def setUp(self) -> None:
self._md = _metadata()
patch("bot_bottle.api.read_metadata", return_value=self._md).start()
manifest = _make_manifest()
patch("bot_bottle.api.ManifestIndex.resolve", return_value=manifest).start()
self._launch = patch(
"bot_bottle.api._launch_bottle", return_value=("implementer-abc12", 0)
).start()
self.addCleanup(patch.stopall)
def _spec(self):
self._launch.assert_called_once()
return self._launch.call_args[0][0]
def test_passes_assume_yes_and_prompt(self):
resume_headless("implementer-abc12", prompt="Address review")
kwargs = self._launch.call_args[1]
self.assertTrue(kwargs["assume_yes"])
self.assertEqual("Address review", kwargs["headless_prompt_text"])
def test_identity_set_on_spec(self):
resume_headless("implementer-abc12", prompt="Prompt")
self.assertEqual("implementer-abc12", self._spec().identity)
def test_forge_env_on_spec(self):
env = {"FORGE_ISSUE_NUMBER": "42"}
resume_headless("implementer-abc12", prompt="Prompt", forge_env=env)
self.assertEqual(env, self._spec().forge_env)
def test_missing_state_raises_bottle_error(self):
with patch("bot_bottle.api.read_metadata", return_value=None):
with self.assertRaises(BottleError):
resume_headless("no-such-abc12", prompt="Prompt")
self._launch.assert_not_called()
def test_nonzero_exit_raises_bottle_error(self):
self._launch.return_value = ("implementer-abc12", 2)
with self.assertRaises(BottleError) as ctx:
resume_headless("implementer-abc12", prompt="Prompt")
self.assertEqual(2, ctx.exception.exit_code)
def test_backend_from_metadata_when_not_supplied(self):
resume_headless("implementer-abc12", prompt="Prompt")
self.assertEqual("docker", self._launch.call_args[1]["backend_name"])
def test_explicit_backend_overrides_metadata(self):
resume_headless(
"implementer-abc12", prompt="Prompt", backend_name="smolmachines"
)
self.assertEqual("smolmachines", self._launch.call_args[1]["backend_name"])
# ---------------------------------------------------------------------------
# freeze
# ---------------------------------------------------------------------------
class TestFreeze(unittest.TestCase):
def setUp(self) -> None:
patch("bot_bottle.api.read_metadata", return_value=_metadata()).start()
self._freezer = MagicMock()
self._get_freezer = patch(
"bot_bottle.api.get_freezer", return_value=self._freezer
).start()
self.addCleanup(patch.stopall)
def test_calls_commit_slug(self):
freeze("implementer-abc12")
self._freezer.commit_slug.assert_called_once_with("implementer-abc12")
def test_backend_from_metadata_when_not_supplied(self):
freeze("implementer-abc12")
self._get_freezer.assert_called_once_with("docker")
def test_explicit_backend_used(self):
freeze("implementer-abc12", backend_name="smolmachines")
self._get_freezer.assert_called_once_with("smolmachines")
def test_commit_cancelled_raises_bottle_error(self):
from bot_bottle.backend.freeze import CommitCancelled
self._freezer.commit_slug.side_effect = CommitCancelled("declined")
with self.assertRaises(BottleError):
freeze("implementer-abc12")
# ---------------------------------------------------------------------------
# destroy
# ---------------------------------------------------------------------------
class TestDestroy(unittest.TestCase):
def setUp(self) -> None:
patch("bot_bottle.api.read_metadata", return_value=_metadata()).start()
self._dd = patch("bot_bottle.api._destroy_docker").start()
patch("bot_bottle.api.clear_preserve_marker").start()
self._cleanup = patch("bot_bottle.api.cleanup_state").start()
self.addCleanup(patch.stopall)
def test_docker_backend_calls_destroy_docker(self):
destroy("implementer-abc12")
self._dd.assert_called_once_with("implementer-abc12")
def test_state_dir_always_cleaned(self):
destroy("implementer-abc12")
self._cleanup.assert_called_once_with("implementer-abc12")
def test_smolmachines_backend_calls_destroy_smolmachines(self):
patch(
"bot_bottle.api.read_metadata",
return_value=_metadata(backend="smolmachines"),
).start()
ds = patch("bot_bottle.api._destroy_smolmachines").start()
destroy("implementer-abc12")
ds.assert_called_once_with("implementer-abc12")
self._dd.assert_not_called()
def test_missing_metadata_defaults_to_docker(self):
patch("bot_bottle.api.read_metadata", return_value=None).start()
destroy("no-state-abc12")
self._dd.assert_called_once_with("no-state-abc12")
def test_explicit_backend_overrides_metadata(self):
ds = patch("bot_bottle.api._destroy_smolmachines").start()
destroy("implementer-abc12", backend_name="smolmachines")
ds.assert_called_once_with("implementer-abc12")
self._dd.assert_not_called()
# ---------------------------------------------------------------------------
# public surface exported from bot_bottle.__init__
# ---------------------------------------------------------------------------
class TestPublicSurface(unittest.TestCase):
def test_importable_from_package(self):
import bot_bottle
for name in ("BottleError", "start_headless", "resume_headless", "freeze", "destroy"):
self.assertTrue(hasattr(bot_bottle, name), f"missing: {name}")
if __name__ == "__main__":
unittest.main()
+75
View File
@@ -0,0 +1,75 @@
"""Unit: `cli.py resume --headless` non-interactive rehydrate path.
The freeze / rehydrate loop needs a non-interactive `resume`: deliver a
follow-up prompt and skip the y/N preflight, reusing the same launch
core (`assume_yes` + `headless_prompt_text`) as `start --headless`.
"""
from __future__ import annotations
import unittest
from typing import Any
from unittest.mock import MagicMock, patch
import bot_bottle.cli.resume as resume_mod
from bot_bottle.log import Die
def _metadata():
md = MagicMock()
md.agent_name = "implementer"
md.copy_cwd = False
md.cwd = "/repo"
md.identity = "implementer-abc12"
md.bottle_names = ["claude"]
md.backend = "docker"
return md
class ResumeHeadlessTest(unittest.TestCase):
def setUp(self) -> None:
self._launch = patch.object(
resume_mod, "_launch_bottle", return_value=("implementer-abc12", 0)
).start()
patch.object(
resume_mod, "read_metadata", return_value=_metadata()
).start()
manifest = MagicMock()
manifest.require_agent = MagicMock(return_value=None)
patch.object(
resume_mod.ManifestIndex, "resolve", return_value=manifest
).start()
self.addCleanup(patch.stopall)
def _launch_kwargs(self) -> dict[str, Any]:
self._launch.assert_called_once()
return dict(self._launch.call_args.kwargs)
def test_headless_passes_assume_yes_and_prompt(self):
rc = resume_mod.cmd_resume(
["implementer-abc12", "--headless", "--prompt", "Address the review"]
)
self.assertEqual(0, rc)
kwargs = self._launch_kwargs()
self.assertTrue(kwargs["assume_yes"])
self.assertEqual("Address the review", kwargs["headless_prompt_text"])
def test_interactive_resume_unchanged(self):
resume_mod.cmd_resume(["implementer-abc12"])
kwargs = self._launch_kwargs()
self.assertFalse(kwargs["assume_yes"])
self.assertEqual("", kwargs["headless_prompt_text"])
def test_headless_without_prompt_errors(self):
with self.assertRaises(Die):
resume_mod.cmd_resume(["implementer-abc12", "--headless"])
self._launch.assert_not_called()
def test_prompt_without_headless_errors(self):
with self.assertRaises(Die):
resume_mod.cmd_resume(["implementer-abc12", "--prompt", "hi"])
self._launch.assert_not_called()
if __name__ == "__main__":
unittest.main()
+1 -1
View File
@@ -56,7 +56,7 @@ class TestCmdStartHeadless(unittest.TestCase):
return_value=self._manifest, return_value=self._manifest,
).start() ).start()
self._launch_mock = patch( self._launch_mock = patch(
"bot_bottle.cli.start._launch_bottle", return_value=0 "bot_bottle.cli.start._launch_bottle", return_value=("", 0)
).start() ).start()
# No bottles running by default → no label collision. # No bottles running by default → no label collision.
patch( patch(
+2 -2
View File
@@ -45,7 +45,7 @@ class TestCmdStartSelector(unittest.TestCase):
self._launch_patch = patch( self._launch_patch = patch(
"bot_bottle.cli.start._launch_bottle", "bot_bottle.cli.start._launch_bottle",
return_value=0, return_value=("", 0),
) )
self._launch_mock = self._launch_patch.start() self._launch_mock = self._launch_patch.start()
@@ -211,7 +211,7 @@ class TestCmdStartLabelCollision(unittest.TestCase):
self._manifest = _make_manifest(["researcher"], ["claude"]) self._manifest = _make_manifest(["researcher"], ["claude"])
patch("bot_bottle.cli.start.ManifestIndex.resolve", return_value=self._manifest).start() patch("bot_bottle.cli.start.ManifestIndex.resolve", return_value=self._manifest).start()
self._launch_mock = patch( self._launch_mock = patch(
"bot_bottle.cli.start._launch_bottle", return_value=0, "bot_bottle.cli.start._launch_bottle", return_value=("", 0),
).start() ).start()
# Stub the bottle picker to always return a selection. # Stub the bottle picker to always return a selection.
patch.object(tui_mod, "filter_multiselect", return_value=["claude"]).start() patch.object(tui_mod, "filter_multiselect", return_value=["claude"]).start()
+107
View File
@@ -0,0 +1,107 @@
"""Unit: Forge abstraction + ScopedForge (PRD forge-native-integration)."""
from __future__ import annotations
import unittest
from bot_bottle.contrib.forge.base import (
Comment,
Forge,
ForgeScopeError,
Issue,
PullRequest,
ScopedForge,
)
class _RecordingForge(Forge):
"""In-memory fake that records writes."""
def __init__(self) -> None:
self.comments: list[tuple[int, str]] = []
self.descriptions: list[tuple[int, str]] = []
def read_issue(self, number: int) -> Issue:
return Issue(number=number, title="t", body="b", state="open")
def read_pr(self, number: int) -> PullRequest:
return PullRequest(
number=number, title="pr", body="b", state="open", merged=False
)
def read_comments(self, number: int) -> list[Comment]:
return [Comment(id=1, user="alice", body="hi")]
def post_comment(self, number: int, body: str) -> None:
self.comments.append((number, body))
def update_description(self, number: int, body: str) -> None:
self.descriptions.append((number, body))
def is_org_member(self, org: str, username: str) -> bool:
return username == "member"
def get_pr_for_issue(self, number: int) -> int | None:
return 99 if number == 17 else None
def is_pr_open(self, number: int) -> bool:
return True
class TestScopedForgeReads(unittest.TestCase):
def setUp(self) -> None:
self.inner = _RecordingForge()
self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42])
def test_reads_pass_through_to_any_number(self):
# A number well outside the writable scope still reads fine.
self.assertEqual(123, self.scoped.read_issue(123).number)
self.assertEqual("alice", self.scoped.read_comments(500)[0].user)
def test_read_pr_passes_through(self):
pr = self.scoped.read_pr(999)
self.assertIsInstance(pr, PullRequest)
self.assertEqual(999, pr.number)
self.assertFalse(pr.merged)
def test_membership_and_pr_lookups_delegate(self):
self.assertTrue(self.scoped.is_org_member("bot-bottle", "member"))
self.assertFalse(self.scoped.is_org_member("bot-bottle", "stranger"))
self.assertEqual(99, self.scoped.get_pr_for_issue(17))
self.assertTrue(self.scoped.is_pr_open(8000))
class TestScopedForgeWrites(unittest.TestCase):
def setUp(self) -> None:
self.inner = _RecordingForge()
self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42])
def test_writable_set_is_issue_plus_prs(self):
self.assertEqual(frozenset({17, 42}), self.scoped.writable)
def test_write_to_assigned_issue_allowed(self):
self.scoped.post_comment(17, "done")
self.assertEqual([(17, "done")], self.inner.comments)
def test_write_to_assigned_pr_allowed(self):
self.scoped.update_description(42, "new body")
self.assertEqual([(42, "new body")], self.inner.descriptions)
def test_comment_outside_scope_rejected(self):
with self.assertRaises(ForgeScopeError) as ctx:
self.scoped.post_comment(500, "spam")
self.assertIn("500", str(ctx.exception))
self.assertEqual([], self.inner.comments)
def test_description_outside_scope_rejected(self):
with self.assertRaises(ForgeScopeError):
self.scoped.update_description(500, "tamper")
self.assertEqual([], self.inner.descriptions)
def test_scope_error_is_permission_error(self):
# Sidecars can catch the stdlib base type.
self.assertIn(PermissionError, ForgeScopeError.__mro__)
if __name__ == "__main__":
unittest.main()
+145
View File
@@ -0,0 +1,145 @@
"""Unit: GiteaClient + GiteaForge (PRD forge-native-integration)."""
from __future__ import annotations
import json
import unittest
import urllib.error
from io import BytesIO
from unittest.mock import MagicMock, patch
from bot_bottle.contrib.gitea.client import GiteaClient, GiteaForge
def _client() -> GiteaClient:
return GiteaClient(
api_url="https://gitea.example.com/api/v1",
owner="didericis",
repo="bot-bottle",
token="test-token",
)
def _resp(body: object, status: int = 200) -> MagicMock:
resp = MagicMock()
resp.read.return_value = json.dumps(body).encode() if body is not None else b""
resp.status = status
resp.__enter__ = lambda s: s # type: ignore
resp.__exit__ = MagicMock(return_value=False)
return resp
def _http_error(code: int, body: str = "") -> urllib.error.HTTPError:
return urllib.error.HTTPError(
url="http://x", code=code, msg="err", hdrs=None, # type: ignore[arg-type]
fp=BytesIO(body.encode()),
)
_URLOPEN = "bot_bottle.contrib.gitea.client.urllib.request.urlopen"
class TestOrgMembership(unittest.TestCase):
def test_member_returns_true_on_2xx(self):
with patch(_URLOPEN, return_value=_resp(None, 204)) as m:
self.assertTrue(_client().is_org_member("bot-bottle", "alice"))
req = m.call_args.args[0]
self.assertIn("/orgs/bot-bottle/members/alice", req.full_url)
def test_nonmember_returns_false_on_404(self):
with patch(_URLOPEN, side_effect=_http_error(404)):
self.assertFalse(_client().is_org_member("bot-bottle", "stranger"))
def test_other_http_error_raises(self):
with patch(_URLOPEN, side_effect=_http_error(403, "forbidden")):
with self.assertRaises(RuntimeError) as ctx:
_client().is_org_member("bot-bottle", "alice")
self.assertIn("403", str(ctx.exception))
class TestForgeReads(unittest.TestCase):
def test_read_issue_maps_fields(self):
raw = {"number": 17, "title": "Bug", "body": "broken", "state": "open"}
with patch(_URLOPEN, return_value=_resp(raw)) as m:
issue = GiteaForge(_client()).read_issue(17)
self.assertEqual((17, "Bug", "broken", "open"),
(issue.number, issue.title, issue.body, issue.state))
self.assertIn("/repos/didericis/bot-bottle/issues/17",
m.call_args.args[0].full_url)
def test_read_issue_tolerates_null_body(self):
raw = {"number": 17, "title": "T", "body": None, "state": "open"}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertEqual("", GiteaForge(_client()).read_issue(17).body)
def test_read_comments_maps_user_login(self):
raw = [
{"id": 1, "user": {"login": "alice"}, "body": "hi"},
{"id": 2, "user": {"login": "bob"}, "body": "yo"},
]
with patch(_URLOPEN, return_value=_resp(raw)):
comments = GiteaForge(_client()).read_comments(17)
self.assertEqual(["alice", "bob"], [c.user for c in comments])
self.assertEqual([1, 2], [c.id for c in comments])
class TestForgeWrites(unittest.TestCase):
def test_post_comment_payload_and_url(self):
with patch(_URLOPEN, return_value=_resp(None, 201)) as m:
GiteaForge(_client()).post_comment(17, "done ✓")
req = m.call_args.args[0]
self.assertEqual("POST", req.method)
self.assertIn("/repos/didericis/bot-bottle/issues/17/comments", req.full_url)
self.assertEqual("done ✓", json.loads(req.data)["body"])
def test_update_description_patches_issue(self):
with patch(_URLOPEN, return_value=_resp(None, 200)) as m:
GiteaForge(_client()).update_description(17, "edited")
req = m.call_args.args[0]
self.assertEqual("PATCH", req.method)
self.assertTrue(req.full_url.endswith("/issues/17"))
self.assertEqual("edited", json.loads(req.data)["body"])
def test_auth_header_sent(self):
with patch(_URLOPEN, return_value=_resp(None, 201)) as m:
GiteaForge(_client()).post_comment(17, "x")
self.assertEqual("token test-token",
m.call_args.args[0].headers["Authorization"])
class TestPRHelpers(unittest.TestCase):
def test_get_pr_for_issue_returns_number_when_issue_is_pr(self):
raw = {"number": 18, "pull_request": {"merged": False}}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertEqual(18, GiteaForge(_client()).get_pr_for_issue(18))
def test_get_pr_for_issue_none_for_plain_issue(self):
raw = {"number": 17, "pull_request": None}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertIsNone(GiteaForge(_client()).get_pr_for_issue(17))
def test_is_pr_open_true_when_state_open(self):
with patch(_URLOPEN, return_value=_resp({"state": "open"})):
self.assertTrue(GiteaForge(_client()).is_pr_open(18))
def test_is_pr_open_false_when_closed(self):
with patch(_URLOPEN, return_value=_resp({"state": "closed"})):
self.assertFalse(GiteaForge(_client()).is_pr_open(18))
def test_read_pr_maps_fields_including_merged(self):
raw = {"number": 18, "title": "Fix", "body": "patch",
"state": "closed", "merged": True}
with patch(_URLOPEN, return_value=_resp(raw)) as m:
pr = GiteaForge(_client()).read_pr(18)
self.assertEqual((18, "Fix", "patch", "closed", True),
(pr.number, pr.title, pr.body, pr.state, pr.merged))
self.assertIn("/repos/didericis/bot-bottle/pulls/18",
m.call_args.args[0].full_url)
def test_read_pr_merged_defaults_false(self):
with patch(_URLOPEN, return_value=_resp({"number": 18, "state": "open"})):
self.assertFalse(GiteaForge(_client()).read_pr(18).merged)
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,99 @@
"""Unit: SQLite forge state store (PRD forge-native-integration)."""
from __future__ import annotations
import tempfile
import unittest
from dataclasses import replace
from pathlib import Path
from bot_bottle.contrib.gitea.forge_state import (
STATUS_FROZEN,
STATUS_RUNNING,
ForgeState,
SqliteForgeStateStore,
)
def _state(**over: object) -> ForgeState:
base = ForgeState(
owner="didericis",
repo="bot-bottle",
issue_number=17,
slug="implementer-abc12",
agent_name="implementer",
bottle_names=["claude"],
backend_name="docker",
agent_git_user="didericis-claude",
pr_number=42,
status=STATUS_FROZEN,
last_checkin_at="2026-06-29T12:04:12-04:00",
)
return replace(base, **over)
class ForgeStateStoreTest(unittest.TestCase):
def setUp(self) -> None:
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
self.store = SqliteForgeStateStore(tmp / "sub" / "bot-bottle.db")
def test_round_trip(self):
self.store.upsert(_state())
self.assertEqual(_state(), self.store.get("didericis", "bot-bottle", 17))
def test_missing_returns_none(self):
self.assertIsNone(self.store.get("nobody", "nope", 1))
def test_creates_db_parent_dirs(self):
# setUp pointed at a non-existent 'sub/' dir; init must create it.
self.assertIsNone(self.store.get("x", "y", 1)) # no raise
def test_upsert_replaces(self):
self.store.upsert(_state(status=STATUS_RUNNING))
self.store.upsert(_state(status=STATUS_FROZEN))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertEqual(STATUS_FROZEN, got.status)
# Still one row, not two.
self.assertEqual(1, len(self.store.all()))
def test_delete_is_idempotent(self):
self.store.upsert(_state())
self.store.delete("didericis", "bot-bottle", 17)
self.store.delete("didericis", "bot-bottle", 17) # no raise
self.assertIsNone(self.store.get("didericis", "bot-bottle", 17))
def test_all_lists_across_repos_sorted(self):
self.store.upsert(_state(issue_number=18, slug="other"))
self.store.upsert(_state(issue_number=17))
self.store.upsert(_state(owner="acme", repo="widget", issue_number=3))
states = self.store.all()
self.assertEqual(3, len(states))
self.assertEqual(
[("acme", 3), ("didericis", 17), ("didericis", 18)],
[(s.owner, s.issue_number) for s in states],
)
def test_all_empty(self):
self.assertEqual([], self.store.all())
def test_bottle_names_list_preserved(self):
self.store.upsert(_state(bottle_names=["claude", "dev"]))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertEqual(["claude", "dev"], got.bottle_names)
def test_pr_number_nullable(self):
self.store.upsert(_state(pr_number=None))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertIsNone(got.pr_number)
def test_persists_across_store_instances(self):
self.store.upsert(_state())
reopened = SqliteForgeStateStore(self.store._db_path) # pylint: disable=protected-access
self.assertEqual(_state(), reopened.get("didericis", "bot-bottle", 17))
if __name__ == "__main__":
unittest.main()
-22
View File
@@ -14,7 +14,6 @@ from bot_bottle.git_gate import (
git_gate_render_access_hook, git_gate_render_access_hook,
git_gate_render_entrypoint, git_gate_render_entrypoint,
git_gate_render_hook, git_gate_render_hook,
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys, revoke_git_gate_provisioned_keys,
_resolve_identity_file, _resolve_identity_file,
git_gate_upstreams_for_bottle, git_gate_upstreams_for_bottle,
@@ -372,27 +371,6 @@ class TestDynamicKeyProvisioning(unittest.TestCase):
self.assertEqual("/tmp/provisioned-key", _resolve_identity_file(entry, "demo", self.stage)) self.assertEqual("/tmp/provisioned-key", _resolve_identity_file(entry, "demo", self.stage))
mock_provision.assert_called_once() mock_provision.assert_called_once()
def test_prepare_defers_gitea_key_provisioning(self):
bottle = self._gitea_manifest().bottles["dev"]
with patch("bot_bottle.git_gate_provision._provision_dynamic_key") as mock_provision:
plan = _StubGate().prepare(bottle, "demo", self.stage)
mock_provision.assert_not_called()
self.assertEqual("", plan.upstreams[0].identity_file)
def test_launch_time_helper_provisions_gitea_keys(self):
bottle = self._gitea_manifest().bottles["dev"]
plan = _StubGate().prepare(bottle, "demo", self.stage)
with patch(
"bot_bottle.git_gate_provision._provision_dynamic_key",
return_value="/tmp/provisioned-key",
) as mock_provision:
updated = provision_git_gate_dynamic_keys(bottle, plan, self.stage)
mock_provision.assert_called_once_with(bottle.git[0], "demo", self.stage)
self.assertEqual("/tmp/provisioned-key", updated.upstreams[0].identity_file)
def test_revoke_skips_non_gitea_and_missing_id_file(self): def test_revoke_skips_non_gitea_and_missing_id_file(self):
revoke_git_gate_provisioned_keys(fixture_with_git().bottles["dev"], self.stage) revoke_git_gate_provisioned_keys(fixture_with_git().bottles["dev"], self.stage)