Compare commits

...

12 Commits

Author SHA1 Message Date
didericis-claude 520e6f545d feat: expose stable Python API for programmatic bottle orchestration
lint / lint (push) Failing after 2m1s
test / unit (pull_request) Successful in 50s
test / integration (pull_request) Successful in 18s
test / coverage (pull_request) Failing after 1m8s
Add bot_bottle/api.py with four public functions the orchestrator uses:
start_headless, resume_headless, freeze, and destroy. These let a
ProgrammaticBottleRunner call directly into bot_bottle instead of
shelling out to the CLI; call sites in lifecycle.py stay unchanged.

Key changes:
- BottleSpec gains forge_env field for forge sidecar credentials
- _launch_bottle returns (slug, exit_code) instead of int so start_headless
  can return the slug to callers
- All four API functions convert Die and non-zero exits to BottleError
- 27 new unit tests; existing tests updated for the new return type

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 17:11:13 +00:00
didericis 42004d37fd refactor(forge): address PR #318 review — PR/Issue split, sqlite state, drop footer
lint / lint (push) Successful in 1m59s
test / unit (pull_request) Successful in 54s
test / integration (pull_request) Successful in 19s
test / coverage (pull_request) Successful in 1m4s
Addresses the five review comments on PR #318:

- Split PullRequest from Issue and add a dedicated read_pr method on
  Forge/ScopedForge/GiteaForge (a PR carries merge state an issue does
  not); is_pr_open now derives from read_pr.
- Replace the JSON-file forge state with a thin swappable CRUD interface
  (ForgeStateStore) backed by SQLite (SqliteForgeStateStore) at
  ~/.bot-bottle/bot-bottle.db.
- Remove the provenance footer (provenance.py + its test): a mutable,
  unsigned PR comment is not an audit record.
- Reword the PRD: provenance is exposed via an API, not surfaced in the
  PR; document the Issue/PullRequest split and the SQLite store.

pyright clean (whole repo), pylint 10/10, 38 forge/resume unit tests pass;
no remaining refs to the removed provenance module or old JSON state API.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-07-01 08:37:25 -04:00
didericis f211ece6bf fix(tests): resolve pyright strict errors in forge test helpers
lint / lint (push) Successful in 2m18s
test / unit (pull_request) Successful in 1m1s
test / integration (pull_request) Successful in 22s
test / coverage (pull_request) Successful in 1m19s
CI runs `pyright .` over the whole repo including tests; the earlier
run only checked the source paths. The test helpers used `**over`
dict-splat into typed constructors, which pyright strict rejects.

- forge_state: build a typed ForgeState base and dataclasses.replace(**over)
- provenance: explicit typed keyword params instead of a **over dict
- resume: _launch_kwargs returns dict[str, Any] (copy call_args.kwargs)
- forge_base: assert PermissionError in __mro__ (avoids always-true issubclass)
- client: annotate _resp body param; type: ignore the mock __enter__ lambda

pyright . now 0 errors; 47 tests still pass; pylint 9.97/10.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-07-01 07:58:55 -04:00
didericis a229a22d54 feat(forge): forge library layer for native integration (PRD chunks 1-3, 5)
lint / lint (push) Failing after 2m9s
test / unit (pull_request) Successful in 58s
test / integration (pull_request) Successful in 21s
test / coverage (pull_request) Successful in 1m23s
Implements the bot-bottle side of the forge-native PRD that is
self-contained in this repo (the forge sidecar and orchestrate command
belong to the separate bot-bottle-orchestrator, a PRD non-goal):

- contrib/forge/base.py: Forge ABC + ScopedForge enforcing the
  read-anywhere / write-scoped model (writes rejected outside the
  assigned issue/PRs via ForgeScopeError).
- contrib/gitea/client.py: GiteaClient (stdlib-only HTTP, mirrors the
  deploy-key provisioner) + GiteaForge. Token held by the caller (the
  sidecar), not injected by cred-proxy.
- contrib/gitea/forge_state.py: ForgeState dataclass + atomic
  read/write/delete/all under ~/.bot-bottle/forge/<owner>/<repo>/.
- contrib/gitea/provenance.py: build_provenance_footer — collapsed
  markdown audit footer; watchdog/gitleaks/egress rendering.
- cli/resume.py: `resume --headless --prompt` reusing the shipped
  assume_yes + headless_prompt launch core (the new half of chunk 1).

47 new unit tests; pylint 9.98/10, pyright clean. Forge sidecar (chunk
4), orchestrate command (chunk 6), and forge_env plumbing are deferred:
their only consumer is the separate orchestrator and they are untestable
in isolation here.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 19:39:49 -04:00
didericis 738990b2df Merge remote-tracking branch 'origin/main' into forge-native-integration 2026-06-30 19:19:01 -04:00
didericis 4cb106b48d docs(prd): reconcile headless primitives with shipped start --headless
#315 already merged `start --headless` (assume_yes on _launch_bottle +
AgentProvider.headless_prompt). The PRD's proposed start_headless /
attach_agent_headless helpers were redundant with it, and the latter
diverged by hand-rolling --no-interactive/-p instead of using the
headless_prompt provider abstraction. Drop them.

Scope the remaining headless work to what's actually new: a forge_env
hook threaded into the existing _launch_bottle core, and a `resume
--headless` path (resume has no non-interactive entry point today).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 17:46:59 -04:00
didericis ebad90bfa9 docs(prd): adopt forge sidecar (option 3) for native integration
Flip the forge-native-integration PRD from option 2 (agent calls the
Gitea API directly via cred-proxy; done signal parsed from comments) to
option 3 per issue #317 comment 2715: a forge sidecar backed by a Forge
abstract class.

- signal_done(status, summary) replaces comment-parsing as the done signal
- semantic audit trail from the sidecar feeds provenance directly
- read-anywhere / write-scoped enforcement, tighter than repo-wide API keys
- forge-agnostic agent prompts and sidecar protocol
- DeployKeyProvisioner subsumption deferred; share the HTTP client only

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 17:39:33 -04:00
didericis b93fe58523 feat(cli): add headless launch mode for orchestrators
test / unit (pull_request) Successful in 47s
test / integration (pull_request) Successful in 16s
test / coverage (pull_request) Successful in 1m4s
lint / lint (push) Successful in 2m0s
test / unit (push) Successful in 48s
test / integration (push) Successful in 18s
test / coverage (push) Successful in 57s
Update Quality Badges / update-badges (push) Successful in 57s
`--headless` is a non-interactive launch path for `cli.py start`:
agent, bottles, label, and color come from flags + manifest defaults
with no TUI selectors and no y/N preflight (auto-confirmed via a new
`assume_yes` param threaded into the shared `_launch_bottle` core).

- `--bottle` (repeatable) defaults to the agent's own `bottle:`;
  `--label` defaults to the agent name and auto-uniquifies on slug
  collision; `--color` defaults to none.
- `--prompt TEXT` is required in headless mode and is delivered to the
  agent via a new `headless_prompt(prompt)` method on `AgentProvider`,
  implemented for claude (`-p`), codex (positional), and pi (`-p`).
- The agent still execs on inherited stdio/PTY, so whatever allocates
  the PTY drives the live session; only the launch chrome is headless.
- `--headless --dry-run` previews the resolved plan without launching.

Adds unit coverage in tests/unit/test_cli_start_headless.py and
headless_prompt tests for each provider. Also stubs headless_prompt on
the in-test AgentProvider subclasses so the unit suite collects cleanly.

Closes #315.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 15:08:14 -04:00
didericis-claude 1789deaf73 docs: update forge PRD — orchestrator split, done signal, org targeting, forge env vars 2026-06-29 12:39:02 -04:00
didericis-claude b607d68a0e docs: add PRD for forge native integration 2026-06-29 12:10:41 -04:00
didericis 94eca35b4f fix(skills): validate skill names and quote provisioning paths
test / unit (push) Successful in 55s
test / integration (push) Successful in 23s
test / coverage (push) Successful in 1m11s
Update Quality Badges / update-badges (push) Successful in 1m3s
lint / lint (push) Successful in 2m18s
Skill names become host/guest path segments interpolated into the
`bottle.exec` shell strings in each contrib provider's provision_skills.
They were validated only as strings, so a name with shell metacharacters
or path traversal could reach the command.

Layer two defenses:
  - Primary: reject any skill name that isn't kebab-case
    ([a-z][a-z0-9-]*) at manifest load, reusing the convention already
    enforced on bottle/agent filenames (new is_valid_entity_name helper
    in manifest_schema). Fails loud and early, protecting every consumer
    of the name — not just the exec call sites.
  - Failsafe: shlex.quote the interpolated skills_dir / dst paths in the
    claude, codex, and pi providers, so a future unvalidated field can't
    inject shell metacharacters even if it bypasses the load-time check.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01NkwFXLFff9PYPy4wgVBJp9
2026-06-27 02:15:30 -04:00
didericis f787764364 refactor(manifest): break import cycle by extracting ManifestBottle to a leaf module
test / unit (pull_request) Successful in 57s
test / integration (pull_request) Successful in 27s
test / coverage (pull_request) Successful in 1m23s
lint / lint (push) Successful in 2m24s
test / unit (push) Successful in 59s
test / integration (push) Successful in 26s
test / coverage (push) Successful in 1m17s
Update Quality Badges / update-badges (push) Successful in 1m13s
manifest.py imported the extends/loader resolvers, while those resolvers
needed ManifestBottle back from manifest.py — a true bidirectional cycle
papered over with in-function imports and TYPE_CHECKING guards (not clear
dependency inversion).

Extract ManifestBottle into a new leaf module manifest_bottle.py that depends
only on the other leaf modules (manifest_util/agent/egress/git/schema).
manifest.py re-exports ManifestBottle, so `from .manifest import ManifestBottle`
callers are unaffected. With the cycle gone:

- manifest_extends and manifest_loader import ManifestBottle from
  manifest_bottle and their other deps from the real source modules, all at
  top level (TYPE_CHECKING block removed).
- manifest.py imports the extends/loader/schema/yaml_subset/log helpers at
  module top; all per-function lazy imports in the cluster are removed.

No behavior change; full unit suite green, pyright clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01NkwFXLFff9PYPy4wgVBJp9
2026-06-26 23:42:03 -04:00
33 changed files with 2536 additions and 182 deletions
+10
View File
@@ -1 +1,11 @@
"""bot-bottle: Python implementation of the agent container launcher.""" """bot-bottle: Python implementation of the agent container launcher."""
from .api import BottleError, destroy, freeze, resume_headless, start_headless
__all__ = [
"BottleError",
"destroy",
"freeze",
"resume_headless",
"start_headless",
]
+9
View File
@@ -209,6 +209,15 @@ class AgentProvider(ABC):
the supervise sidecar is reachable. No-op when the supervise sidecar is reachable. No-op when
`plan.supervise_plan is None`.""" `plan.supervise_plan is None`."""
@abstractmethod
def headless_prompt(self, prompt: str) -> list[str]:
"""Return the agent CLI args that deliver `prompt` as the
initial task in a non-interactive (headless) session.
Called only when ``--prompt`` is passed to
``./cli.py start --headless``; the returned args are appended
after the provider's ``bypass_args`` and ``startup_args``."""
def provision_ca(self, bottle: "Bottle", plan: "BottlePlan") -> None: def provision_ca(self, bottle: "Bottle", plan: "BottlePlan") -> None:
"""Install the egress MITM CA into the agent's trust store. """Install the egress MITM CA into the agent's trust store.
+258
View File
@@ -0,0 +1,258 @@
"""Public Python API for programmatic bottle orchestration.
Stable surface for bot-bottle-orchestrator (and other Python callers) to
drive bottles without invoking the CLI as a subprocess. Every function
converts ``Die`` and non-zero agent exit codes to ``BottleError`` so
callers use exception handling rather than inspecting return values.
The Protocol the orchestrator's ``BottleRunner`` targets looks like::
class BottleRunner(Protocol):
def start(self, agent: str, *, prompt: str, ...) -> str: ...
def resume(self, slug: str, *, prompt: str) -> None: ...
def freeze(self, slug: str) -> None: ...
def destroy(self, slug: str) -> None: ...
A ``SubprocessBottleRunner`` calls ``./cli.py`` for each operation. A
``ProgrammaticBottleRunner`` calls these functions directly; the Protocol
call sites in ``lifecycle.py`` are unchanged.
"""
from __future__ import annotations
from typing import Sequence
from .backend import BottleSpec
from .backend.freeze import CommitCancelled, get_freezer
from .bottle_state import cleanup_state, clear_preserve_marker, read_metadata
from .cli._common import USER_CWD
from .cli.start import _launch_bottle, _peek_agent_bottle, _uniquify_label_headless
from .log import Die
from .manifest import ManifestError, ManifestIndex
class BottleError(Exception):
"""Raised when a bottle operation fails.
``exit_code`` carries the agent process's exit code when the failure is
a non-zero agent exit; 1 for all other failure modes (missing state,
backend errors, etc.)."""
def __init__(self, message: str, *, exit_code: int = 1) -> None:
super().__init__(message)
self.exit_code = exit_code
def start_headless(
agent_name: str,
*,
prompt: str,
bottles: Sequence[str] | None = None,
label: str | None = None,
color: str | None = None,
backend_name: str | None = None,
copy_cwd: bool = False,
forge_env: dict[str, str] | None = None,
user_cwd: str | None = None,
) -> str:
"""Launch a new bottle headlessly. Returns the bottle slug.
``forge_env`` is passed through to the forge sidecar (not the agent)
when the bottle is forge-targeted; it carries the credentials and
context the sidecar needs to call the forge API.
Raises ``BottleError`` on configuration errors or if the agent exits
non-zero. The returned slug can be passed to ``freeze()``,
``resume_headless()``, or ``destroy()`` for subsequent lifecycle
operations."""
cwd = user_cwd or USER_CWD
try:
manifest = ManifestIndex.resolve(cwd)
manifest.require_agent(agent_name)
except (Die, ManifestError) as exc:
raise BottleError(str(exc)) from exc
if bottles:
bottle_names: tuple[str, ...] = tuple(bottles)
else:
default_bottle = _peek_agent_bottle(manifest, agent_name)
if not default_bottle:
raise BottleError(
f"agent '{agent_name}' has no default bottle; "
f"pass bottles=[...]"
)
bottle_names = (default_bottle,)
spec = BottleSpec(
manifest=manifest,
agent_name=agent_name,
copy_cwd=copy_cwd,
user_cwd=cwd,
label=_uniquify_label_headless(label or agent_name),
color=color or "",
bottle_names=bottle_names,
forge_env=dict(forge_env) if forge_env else {},
)
try:
slug, exit_code = _launch_bottle(
spec,
dry_run=False,
backend_name=backend_name,
assume_yes=True,
headless_prompt_text=prompt,
)
except Die as exc:
raise BottleError(exc.message, exit_code=exc.code) from exc
if exit_code != 0:
raise BottleError(
f"agent exited {exit_code} (slug={slug!r})", exit_code=exit_code
)
return slug
def resume_headless(
slug: str,
*,
prompt: str,
backend_name: str | None = None,
forge_env: dict[str, str] | None = None,
) -> None:
"""Resume a frozen bottle headlessly with ``prompt``.
``forge_env`` re-supplies forge context for the new session (the
sidecar is relaunched alongside the agent on resume).
Raises ``BottleError`` on missing state, backend errors, or non-zero
agent exit."""
metadata = read_metadata(slug)
if metadata is None:
raise BottleError(
f"no state recorded for slug {slug!r}; "
f"check ~/.bot-bottle/state/ or call start_headless() to create a new bottle"
)
try:
manifest = ManifestIndex.resolve(metadata.cwd or USER_CWD)
manifest.require_agent(metadata.agent_name)
except (Die, ManifestError) as exc:
raise BottleError(str(exc)) from exc
spec = BottleSpec(
manifest=manifest,
agent_name=metadata.agent_name,
copy_cwd=metadata.copy_cwd,
user_cwd=metadata.cwd or USER_CWD,
identity=metadata.identity,
bottle_names=tuple(metadata.bottle_names),
forge_env=dict(forge_env) if forge_env else {},
)
try:
_, exit_code = _launch_bottle(
spec,
dry_run=False,
backend_name=backend_name or metadata.backend or None,
assume_yes=True,
headless_prompt_text=prompt,
)
except Die as exc:
raise BottleError(exc.message, exit_code=exc.code) from exc
if exit_code != 0:
raise BottleError(
f"agent exited {exit_code} resuming {slug!r}", exit_code=exit_code
)
def freeze(slug: str, *, backend_name: str | None = None) -> None:
"""Freeze the named bottle to a resumable artifact.
Reads the bottle's backend from its metadata when ``backend_name`` is
not supplied. Raises ``BottleError`` if the freeze fails."""
metadata = read_metadata(slug)
resolved_backend = backend_name or (metadata.backend if metadata else "") or "docker"
try:
get_freezer(resolved_backend).commit_slug(slug)
except CommitCancelled as exc:
raise BottleError(f"freeze cancelled for {slug!r}") from exc
except Die as exc:
raise BottleError(exc.message, exit_code=exc.code) from exc
def destroy(slug: str, *, backend_name: str | None = None) -> None:
"""Destroy the named bottle, removing all resources and state.
Brings down any running resources for ``slug``, then removes the
per-bottle state directory. Idempotent: a slug with no running
resources or no state directory is not an error."""
metadata = read_metadata(slug)
resolved_backend = backend_name or (metadata.backend if metadata else "") or "docker"
try:
if resolved_backend == "docker":
_destroy_docker(slug)
elif resolved_backend == "smolmachines":
_destroy_smolmachines(slug)
# macos-container: the container is torn down inside the launch
# context manager; no persistent VM survives, so nothing extra is
# needed at destroy time beyond the state-dir removal below.
except Die as exc:
raise BottleError(exc.message, exit_code=exc.code) from exc
clear_preserve_marker(slug)
cleanup_state(slug)
# --- backend-specific helpers -----------------------------------------------
def _destroy_docker(slug: str) -> None:
"""Best-effort ``docker compose down`` for a Docker bottle.
No-op when the compose file is absent — the project was already
brought down (normal for a frozen bottle) or was never created."""
from .backend.docker.compose import (
compose_down,
compose_file_path,
compose_project_name,
)
from .bottle_state import bottle_state_dir
state_dir = bottle_state_dir(slug)
compose_file = compose_file_path(state_dir)
if compose_file.exists():
compose_down(compose_project_name(slug), compose_file)
def _destroy_smolmachines(slug: str) -> None:
"""Best-effort stop + delete for a smolmachines bottle.
Both steps are best-effort: a machine that is already gone does not
cause an error; partial failures are logged as warnings."""
import subprocess
from .log import warn
machine = f"bot-bottle-{slug}"
subprocess.run(
["smolvm", "machine", "stop", "--name", machine],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
r = subprocess.run(
["smolvm", "machine", "delete", "-f", machine],
capture_output=True,
text=True,
check=False,
)
if r.returncode != 0:
warn(
f"smolvm machine delete -f {machine!r} failed "
f"(may already be gone): {(r.stderr or '').strip()}"
)
__all__ = [
"BottleError",
"destroy",
"freeze",
"resume_headless",
"start_headless",
]
+6 -1
View File
@@ -37,7 +37,7 @@ import shlex
import sys import sys
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from contextlib import AbstractContextManager from contextlib import AbstractContextManager
from dataclasses import dataclass from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any, Generic, Sequence, TypeVar from typing import Any, Generic, Sequence, TypeVar
@@ -75,6 +75,11 @@ class BottleSpec:
# Ordered bottle names selected at launch (issue #269). When non-empty # Ordered bottle names selected at launch (issue #269). When non-empty
# they are merged in order and replace the agent's `bottle:` field. # they are merged in order and replace the agent's `bottle:` field.
bottle_names: tuple[str, ...] = () bottle_names: tuple[str, ...] = ()
# Forge sidecar env vars (PRD forge-native-integration, chunk 1).
# Passed by the orchestrator at launch time; the forge sidecar reads
# them to connect to Gitea. Empty for non-forge runs. The agent
# process itself does not receive these.
forge_env: dict[str, str] = field(default_factory=dict)
@dataclass(frozen=True) @dataclass(frozen=True)
+26 -1
View File
@@ -27,12 +27,34 @@ from .start import _launch_bottle
def cmd_resume(argv: list[str]) -> int: def cmd_resume(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True) parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True)
parser.add_argument("--dry-run", action="store_true") parser.add_argument("--dry-run", action="store_true")
parser.add_argument(
"--headless",
action="store_true",
help=(
"non-interactive rehydrate: deliver --prompt to the agent and "
"skip the y/N preflight. For orchestrators / the freeze-rehydrate "
"loop."
),
)
parser.add_argument(
"--prompt",
default=None,
help="follow-up prompt delivered to the agent (required with --headless)",
)
parser.add_argument( parser.add_argument(
"identity", "identity",
help="bottle identity from a prior `start` (see its session-end output)", help="bottle identity from a prior `start` (see its session-end output)",
) )
args = parser.parse_args(argv) args = parser.parse_args(argv)
if args.prompt and not args.headless:
die("--prompt is only valid with --headless")
if args.headless and not args.prompt:
die(
"--headless requires --prompt: "
"./cli.py resume <identity> --headless --prompt 'Address the review'"
)
metadata = read_metadata(args.identity) metadata = read_metadata(args.identity)
if metadata is None: if metadata is None:
die( die(
@@ -52,8 +74,11 @@ def cmd_resume(argv: list[str]) -> int:
bottle_names=tuple(metadata.bottle_names), bottle_names=tuple(metadata.bottle_names),
) )
backend_name = metadata.backend or None backend_name = metadata.backend or None
return _launch_bottle( _, rc = _launch_bottle(
spec, spec,
dry_run=args.dry_run, dry_run=args.dry_run,
backend_name=backend_name, backend_name=backend_name,
assume_yes=args.headless,
headless_prompt_text=args.prompt or "",
) )
return rc
+154 -11
View File
@@ -2,6 +2,11 @@
interactive claude-code session. The container is torn down when the interactive claude-code session. The container is torn down when the
session ends. session ends.
`--headless` selects a non-interactive launch (agent/bottles/label from
flags, no TUI selectors, no y/N prompt) for orchestrators,
CI, and webhook dispatch. The agent still execs on the inherited
stdio/PTY, so an orchestrator that allocates the PTY drives the session.
The launch core is shared with `cli.py resume <identity>` through The launch core is shared with `cli.py resume <identity>` through
the private orchestrator `_launch_bottle`. the private orchestrator `_launch_bottle`.
""" """
@@ -16,7 +21,7 @@ import tempfile
from pathlib import Path from pathlib import Path
from typing import Callable from typing import Callable
from ..agent_provider import runtime_for from ..agent_provider import get_provider, runtime_for
from ..backend import ( from ..backend import (
Bottle, Bottle,
BottleSpec, BottleSpec,
@@ -31,7 +36,7 @@ from ..bottle_state import (
is_preserved, is_preserved,
mark_preserved, mark_preserved,
) )
from ..log import info from ..log import info, die
from ..manifest import Manifest, ManifestIndex from ..manifest import Manifest, ManifestIndex
from ._common import PROG, USER_CWD, read_tty_line from ._common import PROG, USER_CWD, read_tty_line
from . import tui from . import tui
@@ -50,6 +55,39 @@ def cmd_start(argv: list[str]) -> int:
"or host auto-selection). Overrides the env var when set." "or host auto-selection). Overrides the env var when set."
), ),
) )
parser.add_argument(
"--headless",
action="store_true",
help=(
"non-interactive launch: take agent/bottles/label from flags, "
"skip all prompts. For orchestrators, CI, and webhooks."
),
)
parser.add_argument(
"--bottle",
action="append",
default=None,
metavar="NAME",
help=(
"bottle to compose, repeatable (order = merge order). In "
"--headless, defaults to the agent's own bottle when omitted."
),
)
parser.add_argument(
"--label",
default=None,
help="bottle label / terminal title (--headless default: agent name)",
)
parser.add_argument(
"--color",
default=None,
help="bottle color, one of the 16 ANSI color names (--headless default: none)",
)
parser.add_argument(
"--prompt",
default=None,
help="initial task prompt delivered to the agent (required with --headless)",
)
parser.add_argument( parser.add_argument(
"name", "name",
nargs="?", nargs="?",
@@ -61,6 +99,12 @@ def cmd_start(argv: list[str]) -> int:
dry_run = args.dry_run or os.environ.get("BOT_BOTTLE_DRY_RUN") == "1" dry_run = args.dry_run or os.environ.get("BOT_BOTTLE_DRY_RUN") == "1"
manifest = ManifestIndex.resolve(USER_CWD) manifest = ManifestIndex.resolve(USER_CWD)
backend_name: str | None = args.backend
if args.headless:
return _start_headless(
manifest, args, dry_run=dry_run, backend_name=backend_name
)
agent_name: str | None = args.name agent_name: str | None = args.name
if agent_name is None: if agent_name is None:
@@ -71,8 +115,6 @@ def cmd_start(argv: list[str]) -> int:
if agent_name is None: if agent_name is None:
return 0 return 0
backend_name: str | None = args.backend
# Bottle multiselect: always show after agent selection so operators # Bottle multiselect: always show after agent selection so operators
# can compose bottles at launch time without editing agent manifests. # can compose bottles at launch time without editing agent manifests.
available_bottles = manifest.all_bottle_names available_bottles = manifest.all_bottle_names
@@ -102,11 +144,90 @@ def cmd_start(argv: list[str]) -> int:
color=color, color=color,
bottle_names=bottle_names, bottle_names=bottle_names,
) )
return _launch_bottle( _, rc = _launch_bottle(
spec, spec,
dry_run=dry_run, dry_run=dry_run,
backend_name=backend_name, backend_name=backend_name,
) )
return rc
# --- Headless launch -----------------------------------------------------
def _start_headless(
manifest: ManifestIndex,
args: argparse.Namespace,
*,
dry_run: bool,
backend_name: str | None,
) -> int:
"""Non-interactive launch path for orchestrators / CI / webhooks.
Resolves agent, bottles, label, and color from flags + manifest
defaults instead of the TUI selectors, and auto-confirms the
preflight. Otherwise runs the same launch core as the interactive
path, so the agent still execs on the inherited stdio/PTY — an
orchestrator allocates that PTY and relays it to its
desktop/mobile clients."""
agent_name = args.name
if not agent_name:
die("--headless requires an agent name: ./cli.py start <agent> --headless")
manifest.require_agent(agent_name) # raises ManifestError if unknown
prompt = args.prompt
if not prompt:
die(
"--headless requires --prompt: "
"./cli.py start <agent> --headless --prompt 'Do the thing'"
)
if args.bottle:
bottle_names: tuple[str, ...] = tuple(args.bottle)
else:
default_bottle = _peek_agent_bottle(manifest, agent_name)
if not default_bottle:
die(
f"--headless: agent '{agent_name}' has no default bottle; "
f"pass one or more --bottle NAME"
)
bottle_names = (default_bottle,)
label = _uniquify_label_headless(args.label or agent_name)
spec = BottleSpec(
manifest=manifest,
agent_name=agent_name,
copy_cwd=args.cwd,
user_cwd=USER_CWD,
label=label,
color=args.color or "",
bottle_names=bottle_names,
)
_, rc = _launch_bottle(
spec,
dry_run=dry_run,
backend_name=backend_name,
assume_yes=True,
headless_prompt_text=prompt,
)
return rc
def _uniquify_label_headless(label: str) -> str:
"""Non-interactive analog of `_resolve_unique_label`: if the label's
slug collides with a running bottle, append -2, -3, … until free,
logging the chosen label. Orchestrators fire-and-forget many bottles,
so silently picking a free name beats erroring on every collision."""
active_slugs = {a.slug for a in enumerate_active_agents()}
if docker_mod.slugify(label) not in active_slugs:
return label
n = 2
while docker_mod.slugify(f"{label}-{n}") in active_slugs:
n += 1
chosen = f"{label}-{n}"
info(f"label '{label}' already in use; using '{chosen}'")
return chosen
# --- Launch helpers ------------------------------------------------------ # --- Launch helpers ------------------------------------------------------
@@ -376,31 +497,53 @@ def _launch_bottle(
*, *,
dry_run: bool, dry_run: bool,
backend_name: str | None = None, backend_name: str | None = None,
) -> int: assume_yes: bool = False,
headless_prompt_text: str = "",
) -> tuple[str, int]:
"""Shared launch core for `start` and `resume`. Builds the plan, """Shared launch core for `start` and `resume`. Builds the plan,
prints / dry-runs / prompts as appropriate, brings the bottle up, prints / dry-runs / prompts as appropriate, brings the bottle up,
attaches claude, and prints the resume hint on session end.""" attaches claude, and prints the resume hint on session end.
Returns ``(slug, exit_code)`` where ``slug`` is the bottle identity
(empty string when the launch was aborted before a slug was minted)
and ``exit_code`` is the agent process's exit code (0 on clean exit
or when launch was aborted before the agent ran).
`assume_yes` skips the interactive y/N confirmation (headless /
orchestrator launches), where there is no human at the prompt.
`headless_prompt_text` is passed to the provider's `headless_prompt`
method and the resulting args are appended to startup_args so the
agent receives the initial task without interactive input."""
stage_dir = Path(tempfile.mkdtemp(prefix="bot-bottle-stage.")) stage_dir = Path(tempfile.mkdtemp(prefix="bot-bottle-stage."))
identity = "" identity = ""
exit_code = 0
try: try:
plan, identity = prepare_with_preflight( plan, identity = prepare_with_preflight(
spec, spec,
stage_dir=stage_dir, stage_dir=stage_dir,
render_preflight=_text_render_preflight(), render_preflight=_text_render_preflight(),
prompt_yes=_text_prompt_yes, prompt_yes=(lambda: True) if assume_yes else _text_prompt_yes,
dry_run=dry_run, dry_run=dry_run,
backend_name=backend_name, backend_name=backend_name,
) )
if plan is None: if plan is None:
return 0 return identity, 0
backend = get_bottle_backend(backend_name) backend = get_bottle_backend(backend_name)
with backend.launch(plan) as bottle: with backend.launch(plan) as bottle:
agent_provider_template = getattr(plan, "agent_provider_template", "claude") agent_provider_template = getattr(plan, "agent_provider_template", "claude")
extra_args: tuple[str, ...] = ()
if headless_prompt_text:
extra_args = tuple(
get_provider(agent_provider_template).headless_prompt(
headless_prompt_text
)
)
exit_code = attach_agent( exit_code = attach_agent(
bottle, bottle,
agent_provider_template=agent_provider_template, agent_provider_template=agent_provider_template,
startup_args=plan.agent_provision.startup_args, startup_args=plan.agent_provision.startup_args + extra_args,
) )
info( info(
f"session ended (exit {exit_code}); " f"session ended (exit {exit_code}); "
@@ -412,7 +555,7 @@ def _launch_bottle(
# Ctrl-Cs / OOM kills before cleanup removes the state dir. # Ctrl-Cs / OOM kills before cleanup removes the state dir.
if agent_provider_template == "claude": if agent_provider_template == "claude":
capture_claude_session_state(identity, exit_code) capture_claude_session_state(identity, exit_code)
return 0 return identity, exit_code
finally: finally:
# PRD 0018 chunk 2: prepare now writes the bottle's bind-mount # PRD 0018 chunk 2: prepare now writes the bottle's bind-mount
# sources under state/<slug>/. If we never reached the # sources under state/<slug>/. If we never reached the
+10 -3
View File
@@ -217,7 +217,7 @@ class ClaudeAgentProvider(AgentProvider):
if not agent.skills: if not agent.skills:
return return
skills_dir = _skills_dir(plan.guest_home) skills_dir = _skills_dir(plan.guest_home)
bottle.exec(f"mkdir -p {skills_dir}", user="root") bottle.exec(f"mkdir -p {shlex.quote(skills_dir)}", user="root")
for name in agent.skills: for name in agent.skills:
src = host_skill_dir(name) src = host_skill_dir(name)
if not os.path.isdir(src): if not os.path.isdir(src):
@@ -227,9 +227,13 @@ class ClaudeAgentProvider(AgentProvider):
) )
dst = f"{skills_dir}/{name}" dst = f"{skills_dir}/{name}"
info(f"copying skill {name} into {bottle.name}:{dst}") info(f"copying skill {name} into {bottle.name}:{dst}")
bottle.exec(f"rm -rf {dst} && mkdir -p {dst}", user="root") # Defense in depth: skill names are validated kebab-case at
# manifest load, but quote the path so a future unvalidated
# field can't inject shell metacharacters here either.
dst_q = shlex.quote(dst)
bottle.exec(f"rm -rf {dst_q} && mkdir -p {dst_q}", user="root")
bottle.cp_in(f"{src}/.", f"{dst}/") bottle.cp_in(f"{src}/.", f"{dst}/")
bottle.exec(f"chown -R node:node {dst}", user="root") bottle.exec(f"chown -R node:node {dst_q}", user="root")
def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None: def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None:
"""Copy the prompt file into the guest, fix ownership/mode. """Copy the prompt file into the guest, fix ownership/mode.
@@ -309,6 +313,9 @@ class ClaudeAgentProvider(AgentProvider):
f"claude mcp add --scope user --transport http supervise {supervise_url}" f"claude mcp add --scope user --transport http supervise {supervise_url}"
) )
def headless_prompt(self, prompt: str) -> list[str]:
return ["-p", prompt]
def _exec(bottle: "Bottle", script: str, error: str) -> None: def _exec(bottle: "Bottle", script: str, error: str) -> None:
result = bottle.exec(script, user="root") result = bottle.exec(script, user="root")
+10 -3
View File
@@ -183,7 +183,7 @@ class CodexAgentProvider(AgentProvider):
if not agent.skills: if not agent.skills:
return return
skills_dir = _skills_dir(plan.guest_home) skills_dir = _skills_dir(plan.guest_home)
bottle.exec(f"mkdir -p {skills_dir}", user="root") bottle.exec(f"mkdir -p {shlex.quote(skills_dir)}", user="root")
for name in agent.skills: for name in agent.skills:
src = host_skill_dir(name) src = host_skill_dir(name)
if not os.path.isdir(src): if not os.path.isdir(src):
@@ -193,9 +193,13 @@ class CodexAgentProvider(AgentProvider):
) )
dst = f"{skills_dir}/{name}" dst = f"{skills_dir}/{name}"
info(f"copying skill {name} into {bottle.name}:{dst}") info(f"copying skill {name} into {bottle.name}:{dst}")
bottle.exec(f"rm -rf {dst} && mkdir -p {dst}", user="root") # Defense in depth: skill names are validated kebab-case at
# manifest load, but quote the path so a future unvalidated
# field can't inject shell metacharacters here either.
dst_q = shlex.quote(dst)
bottle.exec(f"rm -rf {dst_q} && mkdir -p {dst_q}", user="root")
bottle.cp_in(f"{src}/.", f"{dst}/") bottle.cp_in(f"{src}/.", f"{dst}/")
bottle.exec(f"chown -R node:node {dst}", user="root") bottle.exec(f"chown -R node:node {dst_q}", user="root")
def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None: def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None:
"""Copy the prompt file into the guest, fix ownership/mode. """Copy the prompt file into the guest, fix ownership/mode.
@@ -275,6 +279,9 @@ class CodexAgentProvider(AgentProvider):
f"codex mcp add supervise --url {shlex.quote(supervise_url)}" f"codex mcp add supervise --url {shlex.quote(supervise_url)}"
) )
def headless_prompt(self, prompt: str) -> list[str]:
return [prompt]
def _exec(bottle: "Bottle", script: str, error: str) -> None: def _exec(bottle: "Bottle", script: str, error: str) -> None:
result = bottle.exec(script, user="root") result = bottle.exec(script, user="root")
+165
View File
@@ -0,0 +1,165 @@
"""Forge abstraction (PRD forge-native-integration, chunk 3).
The `Forge` abstract class is the provider-agnostic surface a forge
sidecar dispatches to: read issues/comments, post comments, edit
descriptions, and the membership / PR lookups the orchestrator needs.
Each forge (Gitea first) implements it; the sidecar protocol and the
agent prompt stay forge-agnostic.
`signal_done` is deliberately *not* a `Forge` method completion is a
sidecar concept relayed to the orchestrator over a queue dir, not a
forge API operation.
`ScopedForge` enforces the PRD's **read-anywhere / write-scoped** model:
reads pass through to any issue/PR for context; writes are rejected
unless the target is the assigned issue or one of its PRs. This bounds
the blast radius of a prompt-injected agent below repo-wide API-key
permissions.
"""
from __future__ import annotations
import abc
from collections.abc import Iterable
from dataclasses import dataclass
@dataclass(frozen=True)
class Issue:
"""A forge issue (not a PR — see `PullRequest`)."""
number: int
title: str
body: str
state: str # "open" | "closed"
@dataclass(frozen=True)
class PullRequest:
"""A forge pull request. Kept distinct from `Issue` even though some
forges model PRs as issues on the wire: the domain objects carry
different data (a PR has merge state) and are read through different
methods (`read_pr` vs `read_issue`)."""
number: int
title: str
body: str
state: str # "open" | "closed"
merged: bool
@dataclass(frozen=True)
class Comment:
id: int
user: str # login of the comment author
body: str
class ForgeScopeError(PermissionError):
"""Raised by `ScopedForge` when a write targets an issue/PR outside
the assigned scope."""
class Forge(abc.ABC):
"""Provider-agnostic forge operations. Implementations wrap a
per-provider HTTP client and translate to `Issue` / `Comment`."""
@abc.abstractmethod
def read_issue(self, number: int) -> Issue:
"""Read an issue body (read-anywhere)."""
@abc.abstractmethod
def read_pr(self, number: int) -> PullRequest:
"""Read a pull request, including its merge state (read-anywhere)."""
@abc.abstractmethod
def read_comments(self, number: int) -> list[Comment]:
"""Read a thread's comments (read-anywhere)."""
@abc.abstractmethod
def post_comment(self, number: int, body: str) -> None:
"""Post a comment to an issue or PR (write-scoped)."""
@abc.abstractmethod
def update_description(self, number: int, body: str) -> None:
"""Replace an issue or PR body (write-scoped)."""
@abc.abstractmethod
def is_org_member(self, org: str, username: str) -> bool:
"""Whether `username` is a member of `org`."""
@abc.abstractmethod
def get_pr_for_issue(self, number: int) -> int | None:
"""The PR number linked to an issue, or None when there is none."""
@abc.abstractmethod
def is_pr_open(self, number: int) -> bool:
"""Whether the given PR is still open."""
class ScopedForge(Forge):
"""Read-anywhere / write-scoped wrapper around a concrete `Forge`.
`post_comment` and `update_description` are rejected with
`ForgeScopeError` unless the target number is the assigned issue or
one of the assigned PRs. Every other method delegates unchanged, so
reads, membership checks, and PR lookups work against any number for
context.
The writable set is fixed at construction. The sidecar reconstructs
a `ScopedForge` when a PR is discovered (`get_pr_for_issue`) so the
new PR becomes writable; this class does not mutate its own scope.
"""
def __init__(
self,
inner: Forge,
*,
assigned_issue: int,
assigned_prs: Iterable[int] = (),
) -> None:
self._inner = inner
self._assigned_issue = assigned_issue
self._writable = {assigned_issue, *assigned_prs}
@property
def writable(self) -> frozenset[int]:
return frozenset(self._writable)
def _check_write(self, number: int) -> None:
if number not in self._writable:
allowed = ", ".join(str(n) for n in sorted(self._writable))
raise ForgeScopeError(
f"write to #{number} denied: out of assigned scope "
f"(writable: {allowed})"
)
# --- read-anywhere: pass through --------------------------------------
def read_issue(self, number: int) -> Issue:
return self._inner.read_issue(number)
def read_pr(self, number: int) -> PullRequest:
return self._inner.read_pr(number)
def read_comments(self, number: int) -> list[Comment]:
return self._inner.read_comments(number)
def is_org_member(self, org: str, username: str) -> bool:
return self._inner.is_org_member(org, username)
def get_pr_for_issue(self, number: int) -> int | None:
return self._inner.get_pr_for_issue(number)
def is_pr_open(self, number: int) -> bool:
return self._inner.is_pr_open(number)
# --- write-scoped: check then delegate --------------------------------
def post_comment(self, number: int, body: str) -> None:
self._check_write(number)
self._inner.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._check_write(number)
self._inner.update_description(number, body)
+174
View File
@@ -0,0 +1,174 @@
"""Gitea HTTP client + `GiteaForge` (PRD forge-native-integration, chunk 3).
`GiteaClient` is the thin stdlib-only HTTP transport (mirrors
`deploy_key_provisioner.py`: `urllib.request`, bounded timeouts,
structured error bodies). `GiteaForge` adapts it to the provider-agnostic
`Forge` surface.
Unlike the option-2 design, the token is held here (the sidecar process
owns it) and passed to the client directly there is no agent-side
cred-proxy route, because the agent never makes forge calls. The HTTP
client is the one piece shared with `GiteaDeployKeyProvisioner`; the two
are deliberately *not* unified behind a common abstract base (see the
deferral note in the PRD).
"""
from __future__ import annotations
import json
import urllib.error
import urllib.request
from typing import Any
from ..forge.base import Comment, Forge, Issue, PullRequest
# Bound every Gitea call: a hung instance must not stall the sidecar.
_API_TIMEOUT_SECS = 30
class GiteaClient:
"""Thin authenticated HTTP client for one repo's Gitea API.
`api_url` is the API base *including* `/api/v1` (matching the
`FORGE_GITEA_API` env var), e.g. `https://gitea.example.com/api/v1`.
"""
def __init__(self, *, api_url: str, owner: str, repo: str, token: str) -> None:
self._api_url = api_url.rstrip("/")
self._owner = owner
self._repo = repo
self._token = token
# --- low-level request -------------------------------------------------
def _request(
self, method: str, path: str, *, body: dict[str, Any] | None = None
) -> tuple[int, Any]:
"""Issue an authenticated request. Returns `(status, parsed_json)`;
parsed_json is None when the response has no body. Raises
`RuntimeError` on any non-2xx except where callers special-case
the HTTPError themselves (membership 404)."""
url = f"{self._api_url}{path}"
data = json.dumps(body).encode() if body is not None else None
headers = {"Authorization": f"token {self._token}"}
if data is not None:
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method)
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS) as resp:
raw = resp.read()
parsed = json.loads(raw) if raw else None
return resp.status, parsed
def _repo_path(self, suffix: str) -> str:
return f"/repos/{self._owner}/{self._repo}{suffix}"
# --- operations --------------------------------------------------------
def is_org_member(self, org: str, username: str) -> bool:
"""GET /orgs/{org}/members/{username}: 2xx → member, 404 → not.
Other errors propagate so a misconfigured token fails loudly."""
url = f"{self._api_url}/orgs/{org}/members/{username}"
req = urllib.request.Request(
url, headers={"Authorization": f"token {self._token}"}, method="GET"
)
try:
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS):
return True
except urllib.error.HTTPError as exc:
if exc.code == 404:
return False
raise RuntimeError(
f"org membership check failed for {org}/{username}: "
f"HTTP {exc.code}{_read_error_body(exc)}"
) from exc
def get_issue(self, number: int) -> dict[str, Any]:
_status, body = self._request("GET", self._repo_path(f"/issues/{number}"))
return body or {}
def get_comments(self, number: int) -> list[dict[str, Any]]:
_status, body = self._request(
"GET", self._repo_path(f"/issues/{number}/comments")
)
return body or []
def post_comment(self, number: int, body: str) -> None:
self._request(
"POST",
self._repo_path(f"/issues/{number}/comments"),
body={"body": body},
)
def patch_issue_body(self, number: int, body: str) -> None:
self._request(
"PATCH", self._repo_path(f"/issues/{number}"), body={"body": body}
)
def get_pull(self, number: int) -> dict[str, Any]:
_status, body = self._request("GET", self._repo_path(f"/pulls/{number}"))
return body or {}
class GiteaForge(Forge):
"""`Forge` over a `GiteaClient`."""
def __init__(self, client: GiteaClient) -> None:
self._client = client
def read_issue(self, number: int) -> Issue:
raw = self._client.get_issue(number)
return Issue(
number=int(raw.get("number", number)),
title=str(raw.get("title", "")),
body=str(raw.get("body", "") or ""),
state=str(raw.get("state", "")),
)
def read_pr(self, number: int) -> PullRequest:
raw = self._client.get_pull(number)
return PullRequest(
number=int(raw.get("number", number)),
title=str(raw.get("title", "")),
body=str(raw.get("body", "") or ""),
state=str(raw.get("state", "")),
merged=bool(raw.get("merged", False)),
)
def read_comments(self, number: int) -> list[Comment]:
return [
Comment(
id=int(c.get("id", 0)),
user=str((c.get("user") or {}).get("login", "")),
body=str(c.get("body", "") or ""),
)
for c in self._client.get_comments(number)
]
def post_comment(self, number: int, body: str) -> None:
self._client.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._client.patch_issue_body(number, body)
def is_org_member(self, org: str, username: str) -> bool:
return self._client.is_org_member(org, username)
def get_pr_for_issue(self, number: int) -> int | None:
"""Gitea models a PR as an issue with the same number, exposing a
`pull_request` object on the issue. When the queried number is
itself a PR, return it; otherwise None. (The orchestrator tracks
the issuePR mapping in forge state for the cross-number case.)"""
raw = self._client.get_issue(number)
if raw.get("pull_request"):
return int(raw.get("number", number))
return None
def is_pr_open(self, number: int) -> bool:
return self.read_pr(number).state == "open"
def _read_error_body(exc: urllib.error.HTTPError) -> str:
try:
return exc.read().decode("utf-8", errors="replace")
except Exception: # pylint: disable=broad-exception-caught
return ""
+171
View File
@@ -0,0 +1,171 @@
"""Forge state persistence (PRD forge-native-integration, chunk 2).
The orchestrator tracks one record per forge-targeted issue so it can
map an incoming webhook back to the bottle handling it, drive the
freeze / rehydrate loop, and run the watchdog.
State is stored in a local SQLite database in `~/.bot-bottle/`. Access
goes through the thin `ForgeStateStore` CRUD interface so the backing
store (location or engine) can be swapped without touching callers;
`SqliteForgeStateStore` is the first implementation.
"""
from __future__ import annotations
import abc
import json
import sqlite3
from dataclasses import dataclass, field
from pathlib import Path
from ...supervise import bot_bottle_root
_DB_FILENAME = "bot-bottle.db"
# Lifecycle: a bottle is launched (running), frozen on the done signal,
# and destroyed when the PR closes.
STATUS_RUNNING = "running"
STATUS_FROZEN = "frozen"
STATUS_DESTROYED = "destroyed"
@dataclass
class ForgeState:
"""One forge-targeted issue's bottle lifecycle record."""
owner: str
repo: str
issue_number: int
slug: str
agent_name: str
bottle_names: list[str] = field(default_factory=list)
backend_name: str = ""
agent_git_user: str = ""
pr_number: int | None = None
status: str = STATUS_RUNNING
last_checkin_at: str = ""
class ForgeStateStore(abc.ABC):
"""Thin CRUD surface over forge state. Implementations back it with a
concrete store; callers depend only on this interface so the storage
location/engine is swappable."""
@abc.abstractmethod
def upsert(self, state: ForgeState) -> None:
"""Insert or replace the record keyed by (owner, repo, issue)."""
@abc.abstractmethod
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
"""Fetch one record, or None when absent."""
@abc.abstractmethod
def delete(self, owner: str, repo: str, issue_number: int) -> None:
"""Remove a record. Missing is success (idempotent)."""
@abc.abstractmethod
def all(self) -> list[ForgeState]:
"""Every record, for the status table and the watchdog sweep."""
def default_db_path() -> Path:
return bot_bottle_root() / _DB_FILENAME
class SqliteForgeStateStore(ForgeStateStore):
"""SQLite-backed `ForgeStateStore`. The database lives at
`~/.bot-bottle/bot-bottle.db` by default; pass `db_path` to point at
a different location (tests, alternate homes)."""
def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or default_db_path()
self._db_path.parent.mkdir(parents=True, exist_ok=True)
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS forge_state (
owner TEXT NOT NULL,
repo TEXT NOT NULL,
issue_number INTEGER NOT NULL,
slug TEXT NOT NULL,
agent_name TEXT NOT NULL,
bottle_names TEXT NOT NULL,
backend_name TEXT NOT NULL,
agent_git_user TEXT NOT NULL,
pr_number INTEGER,
status TEXT NOT NULL,
last_checkin_at TEXT NOT NULL,
PRIMARY KEY (owner, repo, issue_number)
)
"""
)
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self._db_path)
conn.row_factory = sqlite3.Row
return conn
def upsert(self, state: ForgeState) -> None:
with self._connect() as conn:
conn.execute(
"""
INSERT OR REPLACE INTO forge_state (
owner, repo, issue_number, slug, agent_name,
bottle_names, backend_name, agent_git_user,
pr_number, status, last_checkin_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
state.owner,
state.repo,
state.issue_number,
state.slug,
state.agent_name,
json.dumps(state.bottle_names),
state.backend_name,
state.agent_git_user,
state.pr_number,
state.status,
state.last_checkin_at,
),
)
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
with self._connect() as conn:
row = conn.execute(
"SELECT * FROM forge_state "
"WHERE owner = ? AND repo = ? AND issue_number = ?",
(owner, repo, issue_number),
).fetchone()
return _row_to_state(row) if row is not None else None
def delete(self, owner: str, repo: str, issue_number: int) -> None:
with self._connect() as conn:
conn.execute(
"DELETE FROM forge_state "
"WHERE owner = ? AND repo = ? AND issue_number = ?",
(owner, repo, issue_number),
)
def all(self) -> list[ForgeState]:
with self._connect() as conn:
rows = conn.execute(
"SELECT * FROM forge_state ORDER BY owner, repo, issue_number"
).fetchall()
return [_row_to_state(row) for row in rows]
def _row_to_state(row: sqlite3.Row) -> ForgeState:
return ForgeState(
owner=row["owner"],
repo=row["repo"],
issue_number=row["issue_number"],
slug=row["slug"],
agent_name=row["agent_name"],
bottle_names=json.loads(row["bottle_names"]),
backend_name=row["backend_name"],
agent_git_user=row["agent_git_user"],
pr_number=row["pr_number"],
status=row["status"],
last_checkin_at=row["last_checkin_at"],
)
+10 -3
View File
@@ -238,7 +238,7 @@ class PiAgentProvider(AgentProvider):
if not agent.skills: if not agent.skills:
return return
skills_dir = _skills_dir(plan.guest_home) skills_dir = _skills_dir(plan.guest_home)
bottle.exec(f"mkdir -p {skills_dir}", user="root") bottle.exec(f"mkdir -p {shlex.quote(skills_dir)}", user="root")
for name in agent.skills: for name in agent.skills:
src = host_skill_dir(name) src = host_skill_dir(name)
if not os.path.isdir(src): if not os.path.isdir(src):
@@ -248,9 +248,13 @@ class PiAgentProvider(AgentProvider):
) )
dst = f"{skills_dir}/{name}" dst = f"{skills_dir}/{name}"
info(f"copying skill {name} into {bottle.name}:{dst}") info(f"copying skill {name} into {bottle.name}:{dst}")
bottle.exec(f"rm -rf {dst} && mkdir -p {dst}", user="root") # Defense in depth: skill names are validated kebab-case at
# manifest load, but quote the path so a future unvalidated
# field can't inject shell metacharacters here either.
dst_q = shlex.quote(dst)
bottle.exec(f"rm -rf {dst_q} && mkdir -p {dst_q}", user="root")
bottle.cp_in(f"{src}/.", f"{dst}/") bottle.cp_in(f"{src}/.", f"{dst}/")
bottle.exec(f"chown -R node:node {dst}", user="root") bottle.exec(f"chown -R node:node {dst_q}", user="root")
def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None: def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None:
prompt_path = _prompt_path(plan.guest_home) prompt_path = _prompt_path(plan.guest_home)
@@ -311,6 +315,9 @@ class PiAgentProvider(AgentProvider):
) -> None: ) -> None:
del plan, bottle, supervise_url del plan, bottle, supervise_url
def headless_prompt(self, prompt: str) -> list[str]:
return ["-p", prompt]
def _exec(bottle: "Bottle", script: str, error: str) -> None: def _exec(bottle: "Bottle", script: str, error: str) -> None:
result = bottle.exec(script, user="root") result = bottle.exec(script, user="root")
+12 -122
View File
@@ -62,15 +62,25 @@ from dataclasses import dataclass, field, replace
from pathlib import Path from pathlib import Path
from typing import Mapping from typing import Mapping
from .log import warn
from .manifest_util import ManifestError, as_json_object from .manifest_util import ManifestError, as_json_object
from .manifest_agent import ManifestAgent, ManifestAgentProvider from .manifest_agent import ManifestAgent, ManifestAgentProvider
from .manifest_bottle import ManifestBottle
from .manifest_egress import ( from .manifest_egress import (
EGRESS_AUTH_SCHEMES, EGRESS_AUTH_SCHEMES,
ManifestEgressConfig, ManifestEgressConfig,
ManifestEgressRoute, ManifestEgressRoute,
) )
from .manifest_git import ManifestGitEntry, ManifestGitUser, ManifestKeyConfig, parse_git_gate_config from .manifest_extends import merge_bottles_runtime, resolve_bottles
from .manifest_schema import BOTTLE_KEYS from .manifest_git import ManifestGitEntry, ManifestGitUser, ManifestKeyConfig
from .manifest_loader import (
check_stale_json,
load_bottle_chain_from_dir,
scan_agent_names,
scan_bottle_names,
)
from .manifest_schema import validate_agent_frontmatter_keys
from .yaml_subset import YamlSubsetError, parse_frontmatter
# Re-export everything that callers currently import from this module. # Re-export everything that callers currently import from this module.
__all__ = [ __all__ = [
@@ -89,10 +99,6 @@ __all__ = [
] ]
def _empty_str_dict() -> dict[str, str]:
return {}
def _section_dict(value: object, label: str) -> dict[str, object]: def _section_dict(value: object, label: str) -> dict[str, object]:
"""Like as_json_object but treats absent/null as an empty section.""" """Like as_json_object but treats absent/null as an empty section."""
if value is None: if value is None:
@@ -100,107 +106,6 @@ def _section_dict(value: object, label: str) -> dict[str, object]:
return as_json_object(value, label) return as_json_object(value, label)
@dataclass(frozen=True)
class ManifestBottle:
env: Mapping[str, str] = field(default_factory=_empty_str_dict)
agent_provider: ManifestAgentProvider = field(default_factory=ManifestAgentProvider)
git: tuple[ManifestGitEntry, ...] = ()
# Per-bottle git identity (issue #86). Empty default — bottles
# that don't set `git-gate.user:` in the manifest skip the
# `git config --global` step entirely. A bottle can declare a user
# identity without any git-gate.repos upstreams, and vice versa.
git_user: ManifestGitUser = field(default_factory=ManifestGitUser)
egress: ManifestEgressConfig = field(default_factory=ManifestEgressConfig)
# Per-bottle stuck-recovery sidecar (PRD 0013). When true (the
# default, issue #249), the launch step brings up a supervise
# sidecar that exposes egress MCP tools to the agent. Set
# `supervise: false` to skip the sidecar.
supervise: bool = True
@classmethod
def from_dict(cls, name: str, raw: object) -> "ManifestBottle":
d = as_json_object(raw, f"bottle '{name}'")
if "runtime" in d:
raise ManifestError(
f"bottle '{name}' has a 'runtime' field, which is no longer "
f"supported. gVisor (runsc) is now auto-detected by the "
f"backend; remove the 'runtime' field from the bottle "
f"definition."
)
if "ssh" in d:
raise ManifestError(
f"bottle '{name}' has an 'ssh' field, which has been removed "
f"(PRD 0009). Declare upstreams under 'git-gate.repos' with "
f"url + identity + host_key; the git-gate sidecar (PRD 0008) "
f"holds the credential and gitleaks-scans pushes."
)
if "git" in d:
raise ManifestError(
f"bottle '{name}' uses 'git' which has been replaced by "
f"'git-gate' (PRD 0047). Move git.user → git-gate.user "
f"and git.remotes → git-gate.repos (fields: url, identity, host_key)."
)
if "git_user" in d:
raise ManifestError(
f"bottle '{name}' has a 'git_user' field, which has been "
f"removed. Move it under 'git-gate.user'."
)
unknown = set(d.keys()) - BOTTLE_KEYS
if unknown:
allowed = ", ".join(sorted(BOTTLE_KEYS))
raise ManifestError(
f"bottle '{name}' has unknown key(s) {sorted(unknown)}; "
f"allowed keys are {allowed}."
)
env: dict[str, str] = {}
env_raw = d.get("env")
if env_raw is not None:
env_dict = as_json_object(env_raw, f"bottle '{name}' env")
for var, value in env_dict.items():
if not isinstance(value, str):
raise ManifestError(
f"env entry {var} in bottle '{name}' must be a JSON string "
f"(was {type(value).__name__}). Use \"?<message>\" for prompt-at-runtime."
)
env[var] = value
git: tuple[ManifestGitEntry, ...] = ()
git_user = ManifestGitUser()
git_raw = d.get("git-gate")
if git_raw is not None:
git, git_user = parse_git_gate_config(name, git_raw)
agent_provider = (
ManifestAgentProvider.from_dict(name, d["agent_provider"])
if "agent_provider" in d
else ManifestAgentProvider()
)
egress = (
ManifestEgressConfig.from_dict(name, d["egress"])
if "egress" in d
else ManifestEgressConfig()
)
supervise_raw = d.get("supervise", True)
if not isinstance(supervise_raw, bool):
raise ManifestError(
f"bottle '{name}' supervise must be a boolean "
f"(was {type(supervise_raw).__name__})"
)
return cls(
env=env, agent_provider=agent_provider, git=git,
git_user=git_user, egress=egress, supervise=supervise_raw,
)
def _merge_git_user( def _merge_git_user(
agent_user: ManifestGitUser, base_user: ManifestGitUser agent_user: ManifestGitUser, base_user: ManifestGitUser
) -> ManifestGitUser: ) -> ManifestGitUser:
@@ -237,8 +142,6 @@ def _resolve_effective_bottle_eager(
When bottle_names is non-empty they are merged in order. When empty, falls When bottle_names is non-empty they are merged in order. When empty, falls
back to agent.bottle. Raises ManifestError when neither is set.""" back to agent.bottle. Raises ManifestError when neither is set."""
from .manifest_extends import merge_bottles_runtime
if bottle_names: if bottle_names:
resolved: list[ManifestBottle] = [] resolved: list[ManifestBottle] = []
for bn in bottle_names: for bn in bottle_names:
@@ -270,9 +173,6 @@ def _resolve_effective_bottle_lazy(
When bottle_names is non-empty they are resolved from disk and merged in When bottle_names is non-empty they are resolved from disk and merged in
order. When empty, falls back to agent_bottle. Raises ManifestError when order. When empty, falls back to agent_bottle. Raises ManifestError when
neither is set.""" neither is set."""
from .manifest_extends import merge_bottles_runtime
from .manifest_loader import load_bottle_chain_from_dir
if bottle_names: if bottle_names:
resolved = [load_bottle_chain_from_dir(bn, bottles_dir) for bn in bottle_names] resolved = [load_bottle_chain_from_dir(bn, bottles_dir) for bn in bottle_names]
return merge_bottles_runtime(resolved) return merge_bottles_runtime(resolved)
@@ -358,8 +258,6 @@ class ManifestIndex:
home_md = home_dir / ".bot-bottle" home_md = home_dir / ".bot-bottle"
cwd_md = cwd_dir / ".bot-bottle" cwd_md = cwd_dir / ".bot-bottle"
from .manifest_loader import check_stale_json
check_stale_json(home_dir, home_md, "$HOME") check_stale_json(home_dir, home_md, "$HOME")
if cwd_dir.resolve() != home_dir.resolve(): if cwd_dir.resolve() != home_dir.resolve():
check_stale_json(cwd_dir, cwd_md, "$CWD") check_stale_json(cwd_dir, cwd_md, "$CWD")
@@ -399,7 +297,6 @@ class ManifestIndex:
files = sorted(stale_bottles.glob("*.md")) files = sorted(stale_bottles.glob("*.md"))
if files: if files:
names = ", ".join(p.name for p in files) names = ", ".join(p.name for p in files)
from .log import warn
warn( warn(
f"ignoring bottle file(s) under " f"ignoring bottle file(s) under "
f"{stale_bottles}: {names}. Bottles can only " f"{stale_bottles}: {names}. Bottles can only "
@@ -421,7 +318,6 @@ class ManifestIndex:
raw_bottles: dict[str, dict[str, object]] = {} raw_bottles: dict[str, dict[str, object]] = {}
for n, b in raw_bottles_obj.items(): for n, b in raw_bottles_obj.items():
raw_bottles[n] = as_json_object(b, f"bottle '{n}'") raw_bottles[n] = as_json_object(b, f"bottle '{n}'")
from .manifest_extends import resolve_bottles
bottles = resolve_bottles(raw_bottles) bottles = resolve_bottles(raw_bottles)
@@ -439,7 +335,6 @@ class ManifestIndex:
filenames without reading their content. In eager mode (from filenames without reading their content. In eager mode (from
from_json_obj) it returns the pre-parsed bottles' names.""" from_json_obj) it returns the pre-parsed bottles' names."""
if self.home_md is not None: if self.home_md is not None:
from .manifest_loader import scan_bottle_names
return scan_bottle_names(self.home_md / "bottles") return scan_bottle_names(self.home_md / "bottles")
return sorted(self.bottles.keys()) return sorted(self.bottles.keys())
@@ -451,7 +346,6 @@ class ManifestIndex:
filenames without reading their content. In eager mode (from filenames without reading their content. In eager mode (from
from_json_obj) it returns the pre-parsed agents' names.""" from_json_obj) it returns the pre-parsed agents' names."""
if self.home_md is not None: if self.home_md is not None:
from .manifest_loader import scan_agent_names
home_names = set(scan_agent_names(self.home_md / "agents").keys()) home_names = set(scan_agent_names(self.home_md / "agents").keys())
cwd_names: set[str] = set() cwd_names: set[str] = set()
if self.cwd_md is not None: if self.cwd_md is not None:
@@ -509,10 +403,6 @@ class ManifestIndex:
"""Lazy path (resolve/from_md_dirs): read and parse the agent file and """Lazy path (resolve/from_md_dirs): read and parse the agent file and
its bottle chain from disk for the first time here.""" its bottle chain from disk for the first time here."""
assert self.home_md is not None # guaranteed by load_for_agent dispatch assert self.home_md is not None # guaranteed by load_for_agent dispatch
from .manifest_loader import scan_agent_names
from .manifest_schema import validate_agent_frontmatter_keys
from .yaml_subset import YamlSubsetError, parse_frontmatter
# Locate the agent file; cwd wins over home on name collision. # Locate the agent file; cwd wins over home on name collision.
home_agents = scan_agent_names(self.home_md / "agents") home_agents = scan_agent_names(self.home_md / "agents")
cwd_agents: dict[str, Path] = {} cwd_agents: dict[str, Path] = {}
+11 -1
View File
@@ -8,7 +8,7 @@ from typing import cast
from .agent_provider import PROVIDER_TEMPLATES from .agent_provider import PROVIDER_TEMPLATES
from .manifest_util import ManifestError, as_json_object from .manifest_util import ManifestError, as_json_object
from .manifest_git import ManifestGitUser from .manifest_git import ManifestGitUser
from .manifest_schema import AGENT_MODEL_KEYS from .manifest_schema import AGENT_MODEL_KEYS, is_valid_entity_name
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -161,6 +161,16 @@ class ManifestAgent:
f"agent '{name}' skills[{i}] must be a string " f"agent '{name}' skills[{i}] must be a string "
f"(was {type(skill).__name__})" f"(was {type(skill).__name__})"
) )
# Skill names become host/guest path segments and are
# interpolated into provisioning shell commands, so they
# must fit the same kebab-case convention as bottle/agent
# filenames — rejecting anything that could break out of a
# path segment or inject shell metacharacters.
if not is_valid_entity_name(skill):
raise ManifestError(
f"agent '{name}' skills[{i}] {skill!r} is not a valid "
f"skill name; must match [a-z][a-z0-9-]*"
)
collected.append(skill) collected.append(skill)
skills = tuple(collected) skills = tuple(collected)
+129
View File
@@ -0,0 +1,129 @@
"""The `ManifestBottle` value type.
Split out of `manifest.py` so the `extends:`/loader resolvers can import it
without a circular dependency: `manifest.py` imports those resolvers, while
they only need this value type. Everything here depends on leaf modules
(`manifest_util`, `manifest_agent`, `manifest_egress`, `manifest_git`,
`manifest_schema`), so this module sits at the bottom of the manifest layer.
`manifest.py` re-exports `ManifestBottle`, so existing
`from .manifest import ManifestBottle` callers are unaffected.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Mapping
from .manifest_util import ManifestError, as_json_object
from .manifest_agent import ManifestAgentProvider
from .manifest_egress import ManifestEgressConfig
from .manifest_git import ManifestGitEntry, ManifestGitUser, parse_git_gate_config
from .manifest_schema import BOTTLE_KEYS
__all__ = ["ManifestBottle"]
def _empty_str_dict() -> dict[str, str]:
return {}
@dataclass(frozen=True)
class ManifestBottle:
env: Mapping[str, str] = field(default_factory=_empty_str_dict)
agent_provider: ManifestAgentProvider = field(default_factory=ManifestAgentProvider)
git: tuple[ManifestGitEntry, ...] = ()
# Per-bottle git identity (issue #86). Empty default — bottles
# that don't set `git-gate.user:` in the manifest skip the
# `git config --global` step entirely. A bottle can declare a user
# identity without any git-gate.repos upstreams, and vice versa.
git_user: ManifestGitUser = field(default_factory=ManifestGitUser)
egress: ManifestEgressConfig = field(default_factory=ManifestEgressConfig)
# Per-bottle stuck-recovery sidecar (PRD 0013). When true (the
# default, issue #249), the launch step brings up a supervise
# sidecar that exposes egress MCP tools to the agent. Set
# `supervise: false` to skip the sidecar.
supervise: bool = True
@classmethod
def from_dict(cls, name: str, raw: object) -> "ManifestBottle":
d = as_json_object(raw, f"bottle '{name}'")
if "runtime" in d:
raise ManifestError(
f"bottle '{name}' has a 'runtime' field, which is no longer "
f"supported. gVisor (runsc) is now auto-detected by the "
f"backend; remove the 'runtime' field from the bottle "
f"definition."
)
if "ssh" in d:
raise ManifestError(
f"bottle '{name}' has an 'ssh' field, which has been removed "
f"(PRD 0009). Declare upstreams under 'git-gate.repos' with "
f"url + identity + host_key; the git-gate sidecar (PRD 0008) "
f"holds the credential and gitleaks-scans pushes."
)
if "git" in d:
raise ManifestError(
f"bottle '{name}' uses 'git' which has been replaced by "
f"'git-gate' (PRD 0047). Move git.user → git-gate.user "
f"and git.remotes → git-gate.repos (fields: url, identity, host_key)."
)
if "git_user" in d:
raise ManifestError(
f"bottle '{name}' has a 'git_user' field, which has been "
f"removed. Move it under 'git-gate.user'."
)
unknown = set(d.keys()) - BOTTLE_KEYS
if unknown:
allowed = ", ".join(sorted(BOTTLE_KEYS))
raise ManifestError(
f"bottle '{name}' has unknown key(s) {sorted(unknown)}; "
f"allowed keys are {allowed}."
)
env: dict[str, str] = {}
env_raw = d.get("env")
if env_raw is not None:
env_dict = as_json_object(env_raw, f"bottle '{name}' env")
for var, value in env_dict.items():
if not isinstance(value, str):
raise ManifestError(
f"env entry {var} in bottle '{name}' must be a JSON string "
f"(was {type(value).__name__}). Use \"?<message>\" for prompt-at-runtime."
)
env[var] = value
git: tuple[ManifestGitEntry, ...] = ()
git_user = ManifestGitUser()
git_raw = d.get("git-gate")
if git_raw is not None:
git, git_user = parse_git_gate_config(name, git_raw)
agent_provider = (
ManifestAgentProvider.from_dict(name, d["agent_provider"])
if "agent_provider" in d
else ManifestAgentProvider()
)
egress = (
ManifestEgressConfig.from_dict(name, d["egress"])
if "egress" in d
else ManifestEgressConfig()
)
supervise_raw = d.get("supervise", True)
if not isinstance(supervise_raw, bool):
raise ManifestError(
f"bottle '{name}' supervise must be a boolean "
f"(was {type(supervise_raw).__name__})"
)
return cls(
env=env, agent_provider=agent_provider, git=git,
git_user=git_user, egress=egress, supervise=supervise_raw,
)
+4 -28
View File
@@ -2,11 +2,10 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from .manifest_bottle import ManifestBottle
from .manifest_egress import ManifestEgressConfig, validate_egress_routes
if TYPE_CHECKING: from .manifest_git import ManifestGitUser, parse_git_gate_config
from .manifest import ManifestBottle from .manifest_util import ManifestError, as_json_object
from .manifest_egress import ManifestEgressConfig
def merge_bottles_runtime(bottles: "list[ManifestBottle]") -> "ManifestBottle": def merge_bottles_runtime(bottles: "list[ManifestBottle]") -> "ManifestBottle":
@@ -27,9 +26,6 @@ def merge_bottles_runtime(bottles: "list[ManifestBottle]") -> "ManifestBottle":
def _merge_two_bottles_runtime(base: "ManifestBottle", override: "ManifestBottle") -> "ManifestBottle": def _merge_two_bottles_runtime(base: "ManifestBottle", override: "ManifestBottle") -> "ManifestBottle":
from .manifest import ManifestBottle, ManifestGitUser
from .manifest_egress import ManifestEgressConfig
merged_env = {**base.env, **override.env} merged_env = {**base.env, **override.env}
merged_git_user = ManifestGitUser( merged_git_user = ManifestGitUser(
@@ -81,8 +77,6 @@ def _resolve_one_bottle(
repos_cache: dict[str, dict[str, object]], repos_cache: dict[str, dict[str, object]],
seen: tuple[str, ...], seen: tuple[str, ...],
) -> ManifestBottle: ) -> ManifestBottle:
from .manifest import ManifestBottle, ManifestError
if name in cache: if name in cache:
return cache[name] return cache[name]
if name in seen: if name in seen:
@@ -174,11 +168,6 @@ def _fold_two_bottles(
later_repos_raw: dict[str, object], later_repos_raw: dict[str, object],
) -> tuple[ManifestBottle, dict[str, object]]: ) -> tuple[ManifestBottle, dict[str, object]]:
"""Combine two resolved parent bottles; later wins over earlier.""" """Combine two resolved parent bottles; later wins over earlier."""
from .manifest import ManifestBottle, ManifestGitUser
from .manifest_egress import ManifestEgressConfig
from .manifest_git import parse_git_gate_config
from .manifest_util import as_json_object
merged_env = {**earlier.env, **later.env} merged_env = {**earlier.env, **later.env}
merged_git_user = ManifestGitUser( merged_git_user = ManifestGitUser(
@@ -227,10 +216,6 @@ def _merge_bottles(
name: str, name: str,
) -> ManifestBottle: ) -> ManifestBottle:
"""Apply PRD 0025 merge rules.""" """Apply PRD 0025 merge rules."""
from .manifest import ManifestBottle, ManifestGitUser
from .manifest_egress import validate_egress_routes
from .manifest_util import as_json_object
# git-gate.repos: when the child declares repos, inject the already # git-gate.repos: when the child declares repos, inject the already
# name-merged repo set (computed by _resolve_repos_raw) so the child # name-merged repo set (computed by _resolve_repos_raw) so the child
# parses with the full inherited+overridden list (issue #237). # parses with the full inherited+overridden list (issue #237).
@@ -303,8 +288,6 @@ def _resolve_repos_raw(
inherits the parent's set verbatim; an explicit empty dict clears it. inherits the parent's set verbatim; an explicit empty dict clears it.
Otherwise parent and child unite by name, with same-name entries Otherwise parent and child unite by name, with same-name entries
field-merged (parent fields are defaults, child fields win).""" field-merged (parent fields are defaults, child fields win)."""
from .manifest_util import as_json_object
if not _child_declares_git_gate_repos(child_raw): if not _child_declares_git_gate_repos(child_raw):
return parent_repos return parent_repos
child_repos = _declared_repos_raw(child_raw) child_repos = _declared_repos_raw(child_raw)
@@ -324,8 +307,6 @@ def _resolve_repos_raw(
def _declared_repos_raw(child_raw: dict[str, object]) -> dict[str, object]: def _declared_repos_raw(child_raw: dict[str, object]) -> dict[str, object]:
"""Return the child's explicitly declared git-gate.repos as raw dicts, """Return the child's explicitly declared git-gate.repos as raw dicts,
or an empty dict when none are declared.""" or an empty dict when none are declared."""
from .manifest_util import as_json_object
if not _child_declares_git_gate_repos(child_raw): if not _child_declares_git_gate_repos(child_raw):
return {} return {}
git_raw = as_json_object(child_raw.get("git-gate", {}), "child git-gate") git_raw = as_json_object(child_raw.get("git-gate", {}), "child git-gate")
@@ -333,8 +314,6 @@ def _declared_repos_raw(child_raw: dict[str, object]) -> dict[str, object]:
def _child_declares_git_gate_repos(child_raw: dict[str, object]) -> bool: def _child_declares_git_gate_repos(child_raw: dict[str, object]) -> bool:
from .manifest_util import as_json_object
git_raw = child_raw.get("git-gate") git_raw = child_raw.get("git-gate")
if git_raw is None: if git_raw is None:
return False return False
@@ -347,9 +326,6 @@ def _merge_egress(
child: ManifestEgressConfig, child: ManifestEgressConfig,
child_raw: dict[str, object], child_raw: dict[str, object],
) -> ManifestEgressConfig: ) -> ManifestEgressConfig:
from .manifest_egress import ManifestEgressConfig
from .manifest_util import as_json_object
child_egress_raw = as_json_object(child_raw.get("egress"), "child egress") child_egress_raw = as_json_object(child_raw.get("egress"), "child egress")
routes = parent.routes + child.routes routes = parent.routes + child.routes
log = child.Log if "log" in child_egress_raw else parent.Log log = child.Log if "log" in child_egress_raw else parent.Log
+2 -6
View File
@@ -3,9 +3,10 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING
from .log import warn from .log import warn
from .manifest_bottle import ManifestBottle
from .manifest_extends import resolve_bottles
from .manifest_schema import ( from .manifest_schema import (
entity_name_from_path, entity_name_from_path,
validate_bottle_frontmatter_keys, validate_bottle_frontmatter_keys,
@@ -13,9 +14,6 @@ from .manifest_schema import (
from .manifest_util import ManifestError from .manifest_util import ManifestError
from .yaml_subset import YamlSubsetError, parse_frontmatter from .yaml_subset import YamlSubsetError, parse_frontmatter
if TYPE_CHECKING:
from .manifest import ManifestBottle
def check_stale_json(dir_path: Path, md_dir: Path, label: str) -> None: def check_stale_json(dir_path: Path, md_dir: Path, label: str) -> None:
"""Die if `<dir_path>/bot-bottle.json` exists but `md_dir` does """Die if `<dir_path>/bot-bottle.json` exists but `md_dir` does
@@ -78,8 +76,6 @@ def load_bottle_chain_from_dir(
Only the files in the extends chain are read unrelated bottle files Only the files in the extends chain are read unrelated bottle files
are never touched. Raises ManifestError on parse or validation failure.""" are never touched. Raises ManifestError on parse or validation failure."""
from .manifest_extends import resolve_bottles
raws: dict[str, dict[str, object]] = {} raws: dict[str, dict[str, object]] = {}
to_load = [bottle_name] to_load = [bottle_name]
while to_load: while to_load:
+8 -1
View File
@@ -33,13 +33,20 @@ AGENT_KEYS = (
AGENT_MODEL_KEYS = AGENT_KEYS | frozenset({"prompt"}) AGENT_MODEL_KEYS = AGENT_KEYS | frozenset({"prompt"})
def is_valid_entity_name(name: str) -> bool:
"""True if `name` fits the kebab-case `[a-z][a-z0-9-]*` convention
shared by bottle/agent filenames and skill names. Names that satisfy
this are also safe to interpolate into a host/guest path segment."""
return bool(_FILENAME_RX.match(name))
def entity_name_from_path(path: Path) -> str | None: def entity_name_from_path(path: Path) -> str | None:
"""Return the entity name implied by the filename, or None if the """Return the entity name implied by the filename, or None if the
filename does not fit the [a-z][a-z0-9-]* convention.""" filename does not fit the [a-z][a-z0-9-]* convention."""
if path.suffix != ".md": if path.suffix != ".md":
return None return None
stem = path.stem stem = path.stem
if not _FILENAME_RX.match(stem): if not is_valid_entity_name(stem):
return None return None
return stem return stem
@@ -0,0 +1,439 @@
# PRD prd-new: Forge native integration
- **Status:** Draft
- **Author:** claude
- **Created:** 2026-06-29
- **Issue:** #317
## Summary
Add a webhook-driven orchestration layer that lets Gitea issues and PR comments
drive bot-bottle sessions end-to-end with no operator in the loop for the happy
path. An issue assigned to a member of the configured agent org and labelled
with an agent name triggers a headless bottle launch; the bottle processes the
issue, opens a PR, and interacts with the forge through a **forge sidecar**
the agent never touches the Gitea API or its credentials directly. The agent
calls `signal_done(status, summary)` on the sidecar when a work unit is
complete; the sidecar relays that to the orchestrator over a queue dir (the same
pattern as the supervise sidecar), so completion is an unambiguous in-band
signal rather than a comment the orchestrator has to parse. The orchestrator
freezes the bottle. Subsequent PR comments rehydrate the frozen bottle. The
bottle is destroyed when the PR closes.
The forge sidecar is backed by a `Forge` abstract class with per-provider
implementations (Gitea first), so the agent's prompts and the sidecar protocol
stay forge-agnostic. The sidecar logs forge operations semantically ("read PR
description", "posted comment", "signalled done"), giving richer provenance than
post-hoc egress-byte parsing, and enforces a **read-anywhere / write-scoped**
permission model: the agent may read for context but may only write to the
issue and PRs it was assigned.
Run provenance is exposed through a **provenance API** (the sidecar's structured
operation log plus the run's metadata), not posted back into the forge. We do
not surface a provenance footer in the PR — the audit record lives behind the
API where it can be retained and queried, rather than as an editable comment.
The separation of concerns across the two layers: bot-bottle owns the headless
launch primitives, the forge sidecar + `Forge` abstraction, and forge state.
`bot-bottle-orchestrator` (separate binary) owns the webhook listener, bottle
lifecycle loop, and monitoring dashboard; it calls into bot-bottle via
`./cli.py orchestrate`, a thin wrapper command. This PRD covers bot-bottle's
side of that contract.
## Problem
Today an operator must open the TUI, select an agent and bottle, confirm the
preflight, and type prompts interactively. This blocks "issue → PR" automation
and produces no durable audit record of what the agent did. The security model
already provides the right isolation and egress controls, and `start --headless`
(#315) already gives `bot-bottle-orchestrator` a non-interactive launch path.
The missing pieces are a headless `resume` counterpart for rehydrating frozen
bottles, a forge-interaction surface the agent uses to read context, post
comments, and signal completion, and the provenance trail that makes the audit
story legible to reviewers on every PR.
That forge-interaction surface could be built two ways: (2) give the agent the
Gitea API directly with cred-proxy injecting the token, or (3) put a forge
sidecar between the agent and the forge. This PRD takes **option 3**. The
deciding factors: a sidecar `signal_done` call is an unambiguous completion
signal where comment-parsing is a correctness risk that surfaces in production;
the sidecar produces a semantic audit trail rather than HTTP bytes, which is
load-bearing for provenance (the stated product priority); and the sidecar can
enforce scope tighter than repo-wide API-key permissions, reducing blast radius
for a prompt-injected agent. The costs — a second sidecar process per forge run,
a new failure mode if it crashes, and per-forge implementation cost — are
accepted as the price of those properties.
## Goals / Success Criteria
1. Headless launch already exists: `./cli.py start <agent> --headless --prompt`
(#315) runs non-interactively with no TUI selectors or y/N preflight. This
PRD builds on it rather than re-introducing it. The remaining gap is a
matching headless `resume` path (`./cli.py resume --headless`), since
rehydrating a frozen bottle for a new prompt is required by the freeze /
rehydrate loop and `resume` has no non-interactive entry point today.
2. An issue assigned to a member of the configured org (`FORGE_ORG`, default
`bot-bottle`) and labelled `bot-bottle:<agent-name>` is the trigger
convention. Org membership is verified via the Gitea API at event time.
3. Forge-targeted bottles run a **forge sidecar** that exposes a small,
forge-agnostic API (comment/issue/PR CRUD plus `signal_done`) over the same
queue-dir + HTTP/JSON-RPC machinery as the supervise sidecar. The agent calls
the sidecar; it never sees the forge token or forge-specific endpoints.
4. The sidecar is backed by a `Forge` abstract class. Gitea is the first
concrete implementation; adding a forge means a new subclass, not changes to
the agent prompt or sidecar protocol. The sidecar enforces a read-anywhere /
write-scoped model: writes are limited to the assigned issue and its PRs;
reads are unrestricted for context.
5. The agent calls `signal_done(status, summary)` on the sidecar when a work
unit is complete; the sidecar relays it to the orchestrator over a queue dir.
This is the done signal — no comment parsing. A watchdog timeout
(configurable, default 30 min) causes the orchestrator to treat the run as
done-without-self-report if the agent exits without signalling.
6. Run provenance (agent name, bottle name(s), slug, timing, exit code,
gitleaks result, egress summary, and the sidecar's semantic operation log)
is available through a provenance API. It is **not** surfaced as a PR footer
or any other forge comment.
7. Forge state (issue → slug, status) is persisted in a local SQLite database
under `~/.bot-bottle/` and survives orchestrator restarts.
8. `./cli.py orchestrate status` lists active forge-managed bottles and their
issue/PR URLs.
9. Unit tests cover: label parsing, org-membership check path, forge state
store CRUD (SQLite), headless launch arg construction, forge env var
injection, sidecar request dispatch through the `Forge` abstraction,
write-scope enforcement (reject writes outside the assigned issue/PRs), and
`signal_done` queue relay.
## Non-goals
- Webhook signature verification (HMAC-SHA256). Added as a follow-up.
- The `bot-bottle-orchestrator` binary itself — this PRD covers bot-bottle's
side of the interface only. The orchestrator is a separate project.
- GitHub or GitLab support.
- Multiple simultaneous forge bottles per issue.
- Automatic retry on agent error exit.
- Bottle destruction on issue close (PR close only; issue close is ambiguous).
- Concurrent multi-issue handling (one blocking run per orchestrator process).
- A monitoring dashboard (orchestrator-side concern).
- Folding `DeployKeyProvisioner` into the `Forge` abstraction. Deploy-key
provisioning runs at bottle-provision time on the host; the forge sidecar runs
inside the bottle at agent time. The two have different lifecycles and actors,
so coupling them into one class is deferred to a follow-up. This PRD only
shares the Gitea HTTP client between them.
## Design
### Targeting convention
An issue is forge-targeted when **both** hold:
- At least one assignee is a member of the Gitea org named by `FORGE_ORG`
(default `bot-bottle`). Checked via `GET /api/v1/orgs/{org}/members/{user}`.
- At least one label has the prefix `bot-bottle:`. The suffix names the agent
manifest, e.g. `bot-bottle:implementer` → agent `implementer`.
`FORGE_ORG` is read at orchestrate-command startup. It is not embedded in
manifests or state files; the orchestrator stamps its value into log output for
auditability.
An optional label `bot-bottle-bottle:<name>` overrides bottle selection. When
absent the agent's default bottle is used.
### `./cli.py orchestrate` — the thin wrapper
```
./cli.py orchestrate start --agent AGENT [--bottle BOTTLE ...] --prompt PROMPT
[--label LABEL] [--backend BACKEND]
./cli.py orchestrate resume --slug SLUG --prompt PROMPT [--backend BACKEND]
./cli.py orchestrate status
```
`orchestrate start` is a thin shim over the already-shipped `start --headless`
(#315): it forwards agent / bottle / label / prompt and adds the forge-specific
wiring (`forge_env`, sidecar launch). It does not re-implement headless launch.
The caller (`bot-bottle-orchestrator`) manages freeze, state, and the forge
sidecar's done signal around it.
`orchestrate resume` is the shim over the new `resume --headless` (below).
`orchestrate status` prints the forge state table.
### Headless primitives — what exists vs. what's new
Headless **start** already shipped in #315 and this PRD reuses it as-is:
- `./cli.py start <agent> --headless --prompt TEXT` — no TUI selectors, no y/N
preflight. Internally `_start_headless()` calls the shared `_launch_bottle()`
with `assume_yes=True` and `headless_prompt_text=prompt`.
- The prompt is delivered through `AgentProvider.headless_prompt(prompt)`
claude `-p`, codex positional, pi `-p`. The orchestrator does **not** hand-roll
agent args; it relies on this provider abstraction. (An earlier draft proposed
`start_headless` / `attach_agent_headless` helpers that constructed
`--no-interactive`/`-p` directly — those are dropped as redundant with, and
divergent from, what #315 merged.)
Two additions are needed on top of #315:
**1. A `forge_env` hook on the headless launch path.** The orchestrator needs to
pass forge context + token through to the forge sidecar launched alongside the
agent. This is a parameter threaded into `_launch_bottle` (the same core
`start --headless` already uses), not a parallel launch function. The agent
process itself does not receive the token.
**2. `resume --headless`** — new in `bot_bottle/cli/resume.py`, mirroring the
`--headless` flag on `start`:
```
./cli.py resume <slug> --headless --prompt TEXT
```
It rehydrates a frozen bottle and runs one headless prompt via the same
`assume_yes` + `headless_prompt` path, returning the agent's exit code. `resume`
has no non-interactive entry point today, so this is genuinely new work rather
than a rename of an existing helper.
### Forge sidecar
Forge-targeted bottles run a forge sidecar alongside the agent, mirroring the
supervise sidecar: a per-bottle process that exposes an HTTP/JSON-RPC endpoint
over a Unix socket and relays events to the orchestrator through a queue dir.
The agent calls the sidecar; the sidecar holds the forge token and makes the
actual forge API calls. The agent never receives the credential and never sees a
forge-specific endpoint — swapping Gitea for another forge does not change the
agent prompt or the sidecar protocol.
The sidecar is configured at launch from the forge context (owner, repo, issue,
PR) and the token, supplied by the orchestrator — not baked into the agent
manifest. Because the sidecar owns the token, forge traffic does not need a
cred-proxy egress route on the agent; the agent's egress policy is unchanged by
forge targeting.
**Sidecar protocol** (forge-agnostic; each method maps to a `Forge` call):
| Method | Scope | Purpose |
|---|---|---|
| `read_issue(number)` | read-anywhere | Read an issue body for context |
| `read_pr(number)` | read-anywhere | Read a PR (incl. merge state) for context |
| `read_comments(number)` | read-anywhere | Read a thread for context |
| `post_comment(number, body)` | write-scoped | Post to the assigned issue/PR |
| `update_description(number, body)` | write-scoped | Edit the assigned issue/PR body |
| `signal_done(status, summary)` | — | Relay completion to the orchestrator |
Issues and PRs are distinct domain objects (`Issue` vs `PullRequest`) read
through distinct methods; a PR carries merge state an issue does not.
**Scope enforcement** is read-anywhere / write-scoped: read methods accept any
issue/PR number for context; write methods are rejected unless the target is the
assigned issue or one of its PRs. This is tighter than Gitea's repo-wide API-key
permissions and bounds the blast radius of a prompt-injected agent. Rejections
are logged semantically (operation, target, reason) so the audit trail records
attempted out-of-scope writes, not just allowed ones.
**Semantic audit**: every sidecar call is logged as a structured operation
("read PR #318 description", "posted comment to #317", "signalled done:
success") rather than as opaque HTTP bytes. This log feeds provenance directly,
with no post-hoc egress-log parsing.
### `Forge` abstraction — `bot_bottle/contrib/forge/`
The sidecar dispatches to a `Forge` abstract class. Each provider implements the
operations behind the sidecar protocol:
```python
class Forge(abc.ABC):
@abc.abstractmethod
def read_issue(self, number: int) -> Issue: ...
@abc.abstractmethod
def read_pr(self, number: int) -> PullRequest: ...
@abc.abstractmethod
def read_comments(self, number: int) -> list[Comment]: ...
@abc.abstractmethod
def post_comment(self, number: int, body: str) -> None: ...
@abc.abstractmethod
def update_description(self, number: int, body: str) -> None: ...
@abc.abstractmethod
def is_org_member(self, org: str, username: str) -> bool: ...
@abc.abstractmethod
def get_pr_for_issue(self, number: int) -> int | None: ...
@abc.abstractmethod
def is_pr_open(self, number: int) -> bool: ...
```
`Issue` and `PullRequest` are separate frozen dataclasses — a PR adds `merged`.
`ScopedForge` wraps a concrete `Forge` to enforce the read-anywhere /
write-scoped model (`post_comment` / `update_description` raise `ForgeScopeError`
outside the assigned issue and PRs).
`GiteaForge` is the first and only concrete implementation in this PRD. It wraps
the Gitea HTTP client (below). Adding GitHub or GitLab later is a new subclass;
the sidecar, protocol, and agent prompt are untouched.
> **Deferred:** `DeployKeyProvisioner` is *not* folded into `Forge` here.
> Deploy-key provisioning runs on the host at provision time; the sidecar runs
> in the bottle at agent time. They have different lifecycles and actors, so a
> shared abstract base would couple two unrelated auth contexts. For now they
> only share the Gitea HTTP client; a later PRD can revisit unification.
### Forge env vars
The orchestrator passes forge context to the **sidecar** (not the agent) at
launch. The agent does not need owner/repo/issue env vars to construct API
calls, since it only names issue/PR numbers to the sidecar:
| Var | Example | Purpose |
|---|---|---|
| `FORGE_GITEA_API` | `https://gitea.dideric.is/api/v1` | Base URL the sidecar calls |
| `FORGE_OWNER` | `didericis` | Repo owner |
| `FORGE_REPO` | `bot-bottle` | Repo name |
| `FORGE_ISSUE_NUMBER` | `317` | Assigned issue (defines write scope) |
| `FORGE_PR_NUMBER` | `318` | Assigned PR (empty until PR exists) |
The agent's forge-specific prompt instructs it to call `signal_done` on the
sidecar when a work unit is complete, and to use the sidecar for any
comment/description writes. The instruction is forge-agnostic and is part of the
forge prompt overlay, not the base agent manifest, so non-forge runs are
unaffected.
### Done signal and watchdog
The agent calls `signal_done(status, summary)` on the sidecar when it finishes a
work unit. The sidecar writes the event to its queue dir; the orchestrator reads
it and:
1. Reads the forge state for `(owner, repo, issue_number)`.
2. If `status == "running"`, treats the event as the done signal: freezes the
bottle and sets `status = "frozen"`. Provenance is recorded via the
provenance API — no comment is posted to the forge.
Because completion is an explicit `signal_done` call, the orchestrator does not
parse comment text to detect "done", and intermediate comments the agent posts
mid-run cannot be mistaken for completion.
**Watchdog**: the orchestrator tracks `last_checkin_at` in forge state, updated
on each sidecar event. A background thread wakes every minute. If
`now - last_checkin_at > FORGE_WATCHDOG_TIMEOUT` (default 30 min, configurable
via env) and `status == "running"`, the orchestrator treats the run as
done-without-self-report and freezes the bottle, flagging the run as incomplete
in the provenance record.
**Sidecar-death failure mode**: if the forge sidecar crashes mid-run the agent
loses forge access while the bottle is otherwise healthy. The orchestrator
detects a dead sidecar (socket/queue gone) the same way it detects a stalled
agent and falls back to the watchdog path.
### Forge state — `bot_bottle/contrib/gitea/forge_state.py`
State is stored in a local SQLite database at `~/.bot-bottle/bot-bottle.db`.
Access goes through a thin CRUD interface, `ForgeStateStore`, so the storage
location/engine can be swapped without touching callers. `SqliteForgeStateStore`
is the first implementation.
The `forge_state` table is keyed by `(owner, repo, issue_number)` and carries:
`slug`, `agent_name`, `bottle_names` (JSON), `backend_name`, `agent_git_user`,
`pr_number` (nullable), `status`, `last_checkin_at`.
`status`: `"running"` | `"frozen"` | `"destroyed"`.
Store interface:
```python
class ForgeStateStore(abc.ABC):
def upsert(self, state: ForgeState) -> None: ...
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None: ...
def delete(self, owner: str, repo: str, issue_number: int) -> None: ...
def all(self) -> list[ForgeState]: ...
class SqliteForgeStateStore(ForgeStateStore):
def __init__(self, db_path: Path | None = None) -> None: ...
```
`upsert` uses `INSERT OR REPLACE` so a re-run for the same issue overwrites in
place. The schema is created on first open.
### Provenance API
Run provenance — agent, bottle(s), slug, timing, exit code, gitleaks result,
egress summary, watchdog-fired flag, and the sidecar's semantic operation log —
is exposed through a **provenance API**, not posted into the forge. There is no
provenance footer or run-summary comment.
The rationale (per the monetization positioning): a PR comment is mutable by any
maintainer, unsigned, and per-PR, so it is worthless as an audit record and
invites false trust. The authoritative record therefore lives behind the API,
where it can be retained, queried, and (eventually) signed. Whether any
projection of it ever appears in the forge is a separate, out-of-scope decision;
this PR does not build one.
The API surface itself (schema, transport, signing, retention) is **out of scope
for this PRD** and belongs with the orchestrator / control-plane work. bot-bottle
here only produces the raw material: the sidecar's semantic operation log and the
run metadata the orchestrator collects.
### Gitea HTTP client — `bot_bottle/contrib/gitea/client.py`
`GiteaForge` (and the existing `GiteaDeployKeyProvisioner`) share one thin HTTP
client. Unlike the option-2 design, the token is held by the sidecar process and
passed to the client directly — there is no agent-side cred-proxy route to
inject it, because the agent never makes forge calls.
```python
class GiteaClient:
def __init__(self, *, api_url: str, owner: str, repo: str, token: str) -> None: ...
def is_org_member(self, org: str, username: str) -> bool: ...
def get_issue(self, number: int) -> dict: ...
def get_comments(self, number: int) -> list[dict]: ...
def post_comment(self, number: int, body: str) -> None: ...
def patch_issue_body(self, number: int, body: str) -> None: ...
def get_pull(self, number: int) -> dict: ...
```
`GiteaForge` adapts this client to the `Forge` surface (mapping raw JSON to
`Issue` / `PullRequest` / `Comment`). Sharing only the HTTP client (not an
abstract base) is the deliberate boundary between the sidecar and the deploy-key
provisioner — see the deferral note under the `Forge` abstraction.
### Implementation chunks
1. **Headless additions on top of #315** — thread a `forge_env` parameter into
the existing `_launch_bottle` core (the one `start --headless` already uses);
add a `--headless` path to `cli/resume.py` reusing `assume_yes` +
`headless_prompt`. No new `start_headless`/`attach_agent_headless` helpers.
Tests: `forge_env` reaches the sidecar/`guest_env`; `resume --headless` skips
the TUI and y/N preflight and returns the agent exit code.
2. **Forge state**`contrib/gitea/forge_state.py`: `ForgeState` dataclass,
`ForgeStateStore` CRUD interface, `SqliteForgeStateStore`. Tests: round-trip,
missing → None, `INSERT OR REPLACE` upsert, delete idempotent, `all()`
ordering, persistence across store instances.
3. **`Forge` abstraction + Gitea client** — `contrib/forge/base.py` (`Forge`
ABC, `ScopedForge`, `Issue` / `PullRequest` / `Comment`) and
`contrib/gitea/client.py` + `GiteaForge`: `is_org_member`, `read_issue`,
`read_pr`, `read_comments`, `post_comment`, `update_description`,
`get_pr_for_issue`, `is_pr_open`. Tests: mock `urllib.request.urlopen`,
assert payloads and 404-as-false for membership; `ScopedForge` write-scope
enforcement.
4. **Forge sidecar** — sidecar process exposing the protocol over a Unix socket,
queue-dir relay, write-scope enforcement, semantic op log, `signal_done`.
Reuses the supervise sidecar bundle machinery. Tests: dispatch each method to
the `Forge`, reject out-of-scope writes, `signal_done` writes a queue event,
scope-rejection is logged.
5. **`./cli.py orchestrate`** — `cli/orchestrate.py` with `start`, `resume`,
`status` subcommands wired into `cli.py`; `start` launches the forge sidecar
alongside the agent for forge-targeted runs. Tests: arg parsing, `start`
delegates to `start --headless`, `resume` delegates to `resume --headless`.
## Provenance
Run provenance is captured (sidecar semantic operation log + run metadata) and
exposed through a provenance API. It is deliberately **not** surfaced in the
forge — no footer, no run-summary comment. A mutable, unsigned PR comment is not
an audit record; the authoritative record lives behind the API where it can be
retained and signed. The `watchdog_fired` flag marks runs where the agent did
not self-report completion so consumers of the API know the record may be
incomplete.
The provenance API's schema, transport, signing, and retention are out of scope
for this PRD (control-plane work); bot-bottle here produces the raw material
only.
+267
View File
@@ -0,0 +1,267 @@
"""Unit: bot_bottle public Python API (bot_bottle/__init__.py surface).
Covers start_headless, resume_headless, freeze, and destroy the four
operations the bot-bottle-orchestrator's ProgrammaticBottleRunner uses.
All I/O is stubbed so no container is created.
"""
from __future__ import annotations
import unittest
from unittest.mock import MagicMock, patch
from bot_bottle import BottleError, destroy, freeze, resume_headless, start_headless
# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------
def _make_manifest(agent_name: str = "implementer", bottle_name: str = "claude"):
manifest = MagicMock()
manifest.agents = {agent_name: MagicMock(bottle=bottle_name)}
manifest.all_agent_names = [agent_name]
manifest.all_bottle_names = [bottle_name]
manifest.home_md = None # eager mode — _peek_agent_bottle uses agents dict
manifest.require_agent = MagicMock(return_value=None)
return manifest
def _metadata(
slug: str = "implementer-abc12",
agent_name: str = "implementer",
backend: str = "docker",
):
md = MagicMock()
md.identity = slug
md.agent_name = agent_name
md.cwd = "/repo"
md.copy_cwd = False
md.bottle_names = ["claude"]
md.backend = backend
return md
# ---------------------------------------------------------------------------
# start_headless
# ---------------------------------------------------------------------------
class TestStartHeadless(unittest.TestCase):
def setUp(self) -> None:
self._manifest = _make_manifest()
patch("bot_bottle.api.ManifestIndex.resolve", return_value=self._manifest).start()
self._launch = patch(
"bot_bottle.api._launch_bottle", return_value=("implementer-abc12", 0)
).start()
patch(
"bot_bottle.api._uniquify_label_headless", side_effect=lambda lbl: lbl
).start()
self.addCleanup(patch.stopall)
def _spec(self):
self._launch.assert_called_once()
return self._launch.call_args[0][0]
def test_returns_slug_on_success(self):
slug = start_headless("implementer", prompt="Do it")
self.assertEqual("implementer-abc12", slug)
def test_passes_assume_yes_and_prompt(self):
start_headless("implementer", prompt="Do it")
kwargs = self._launch.call_args[1]
self.assertTrue(kwargs["assume_yes"])
self.assertEqual("Do it", kwargs["headless_prompt_text"])
def test_explicit_bottles_forwarded(self):
start_headless("implementer", prompt="Do it", bottles=["dev", "claude"])
self.assertEqual(("dev", "claude"), self._spec().bottle_names)
def test_default_bottle_resolved_from_manifest(self):
start_headless("implementer", prompt="Do it")
self.assertEqual(("claude",), self._spec().bottle_names)
def test_forge_env_on_spec(self):
env = {"FORGE_GITEA_API": "https://gitea.example.com/api/v1", "FORGE_OWNER": "acme"}
start_headless("implementer", prompt="Do it", forge_env=env)
self.assertEqual(env, self._spec().forge_env)
def test_no_forge_env_defaults_to_empty_dict(self):
start_headless("implementer", prompt="Do it")
self.assertEqual({}, self._spec().forge_env)
def test_nonzero_exit_raises_bottle_error(self):
self._launch.return_value = ("implementer-abc12", 1)
with self.assertRaises(BottleError) as ctx:
start_headless("implementer", prompt="Do it")
self.assertEqual(1, ctx.exception.exit_code)
def test_no_default_bottle_raises_bottle_error(self):
manifest = _make_manifest(bottle_name="")
with patch("bot_bottle.api.ManifestIndex.resolve", return_value=manifest):
with self.assertRaises(BottleError):
start_headless("implementer", prompt="Do it")
self._launch.assert_not_called()
def test_backend_name_forwarded(self):
start_headless("implementer", prompt="Do it", backend_name="docker")
self.assertEqual("docker", self._launch.call_args[1]["backend_name"])
def test_label_forwarded_to_spec(self):
start_headless("implementer", prompt="Do it", label="nightly")
self.assertEqual("nightly", self._spec().label)
def test_color_forwarded_to_spec(self):
start_headless("implementer", prompt="Do it", color="green")
self.assertEqual("green", self._spec().color)
# ---------------------------------------------------------------------------
# resume_headless
# ---------------------------------------------------------------------------
class TestResumeHeadless(unittest.TestCase):
def setUp(self) -> None:
self._md = _metadata()
patch("bot_bottle.api.read_metadata", return_value=self._md).start()
manifest = _make_manifest()
patch("bot_bottle.api.ManifestIndex.resolve", return_value=manifest).start()
self._launch = patch(
"bot_bottle.api._launch_bottle", return_value=("implementer-abc12", 0)
).start()
self.addCleanup(patch.stopall)
def _spec(self):
self._launch.assert_called_once()
return self._launch.call_args[0][0]
def test_passes_assume_yes_and_prompt(self):
resume_headless("implementer-abc12", prompt="Address review")
kwargs = self._launch.call_args[1]
self.assertTrue(kwargs["assume_yes"])
self.assertEqual("Address review", kwargs["headless_prompt_text"])
def test_identity_set_on_spec(self):
resume_headless("implementer-abc12", prompt="Prompt")
self.assertEqual("implementer-abc12", self._spec().identity)
def test_forge_env_on_spec(self):
env = {"FORGE_ISSUE_NUMBER": "42"}
resume_headless("implementer-abc12", prompt="Prompt", forge_env=env)
self.assertEqual(env, self._spec().forge_env)
def test_missing_state_raises_bottle_error(self):
with patch("bot_bottle.api.read_metadata", return_value=None):
with self.assertRaises(BottleError):
resume_headless("no-such-abc12", prompt="Prompt")
self._launch.assert_not_called()
def test_nonzero_exit_raises_bottle_error(self):
self._launch.return_value = ("implementer-abc12", 2)
with self.assertRaises(BottleError) as ctx:
resume_headless("implementer-abc12", prompt="Prompt")
self.assertEqual(2, ctx.exception.exit_code)
def test_backend_from_metadata_when_not_supplied(self):
resume_headless("implementer-abc12", prompt="Prompt")
self.assertEqual("docker", self._launch.call_args[1]["backend_name"])
def test_explicit_backend_overrides_metadata(self):
resume_headless(
"implementer-abc12", prompt="Prompt", backend_name="smolmachines"
)
self.assertEqual("smolmachines", self._launch.call_args[1]["backend_name"])
# ---------------------------------------------------------------------------
# freeze
# ---------------------------------------------------------------------------
class TestFreeze(unittest.TestCase):
def setUp(self) -> None:
patch("bot_bottle.api.read_metadata", return_value=_metadata()).start()
self._freezer = MagicMock()
self._get_freezer = patch(
"bot_bottle.api.get_freezer", return_value=self._freezer
).start()
self.addCleanup(patch.stopall)
def test_calls_commit_slug(self):
freeze("implementer-abc12")
self._freezer.commit_slug.assert_called_once_with("implementer-abc12")
def test_backend_from_metadata_when_not_supplied(self):
freeze("implementer-abc12")
self._get_freezer.assert_called_once_with("docker")
def test_explicit_backend_used(self):
freeze("implementer-abc12", backend_name="smolmachines")
self._get_freezer.assert_called_once_with("smolmachines")
def test_commit_cancelled_raises_bottle_error(self):
from bot_bottle.backend.freeze import CommitCancelled
self._freezer.commit_slug.side_effect = CommitCancelled("declined")
with self.assertRaises(BottleError):
freeze("implementer-abc12")
# ---------------------------------------------------------------------------
# destroy
# ---------------------------------------------------------------------------
class TestDestroy(unittest.TestCase):
def setUp(self) -> None:
patch("bot_bottle.api.read_metadata", return_value=_metadata()).start()
self._dd = patch("bot_bottle.api._destroy_docker").start()
patch("bot_bottle.api.clear_preserve_marker").start()
self._cleanup = patch("bot_bottle.api.cleanup_state").start()
self.addCleanup(patch.stopall)
def test_docker_backend_calls_destroy_docker(self):
destroy("implementer-abc12")
self._dd.assert_called_once_with("implementer-abc12")
def test_state_dir_always_cleaned(self):
destroy("implementer-abc12")
self._cleanup.assert_called_once_with("implementer-abc12")
def test_smolmachines_backend_calls_destroy_smolmachines(self):
patch(
"bot_bottle.api.read_metadata",
return_value=_metadata(backend="smolmachines"),
).start()
ds = patch("bot_bottle.api._destroy_smolmachines").start()
destroy("implementer-abc12")
ds.assert_called_once_with("implementer-abc12")
self._dd.assert_not_called()
def test_missing_metadata_defaults_to_docker(self):
patch("bot_bottle.api.read_metadata", return_value=None).start()
destroy("no-state-abc12")
self._dd.assert_called_once_with("no-state-abc12")
def test_explicit_backend_overrides_metadata(self):
ds = patch("bot_bottle.api._destroy_smolmachines").start()
destroy("implementer-abc12", backend_name="smolmachines")
ds.assert_called_once_with("implementer-abc12")
self._dd.assert_not_called()
# ---------------------------------------------------------------------------
# public surface exported from bot_bottle.__init__
# ---------------------------------------------------------------------------
class TestPublicSurface(unittest.TestCase):
def test_importable_from_package(self):
import bot_bottle
for name in ("BottleError", "start_headless", "resume_headless", "freeze", "destroy"):
self.assertTrue(hasattr(bot_bottle, name), f"missing: {name}")
if __name__ == "__main__":
unittest.main()
+75
View File
@@ -0,0 +1,75 @@
"""Unit: `cli.py resume --headless` non-interactive rehydrate path.
The freeze / rehydrate loop needs a non-interactive `resume`: deliver a
follow-up prompt and skip the y/N preflight, reusing the same launch
core (`assume_yes` + `headless_prompt_text`) as `start --headless`.
"""
from __future__ import annotations
import unittest
from typing import Any
from unittest.mock import MagicMock, patch
import bot_bottle.cli.resume as resume_mod
from bot_bottle.log import Die
def _metadata():
md = MagicMock()
md.agent_name = "implementer"
md.copy_cwd = False
md.cwd = "/repo"
md.identity = "implementer-abc12"
md.bottle_names = ["claude"]
md.backend = "docker"
return md
class ResumeHeadlessTest(unittest.TestCase):
def setUp(self) -> None:
self._launch = patch.object(
resume_mod, "_launch_bottle", return_value=("implementer-abc12", 0)
).start()
patch.object(
resume_mod, "read_metadata", return_value=_metadata()
).start()
manifest = MagicMock()
manifest.require_agent = MagicMock(return_value=None)
patch.object(
resume_mod.ManifestIndex, "resolve", return_value=manifest
).start()
self.addCleanup(patch.stopall)
def _launch_kwargs(self) -> dict[str, Any]:
self._launch.assert_called_once()
return dict(self._launch.call_args.kwargs)
def test_headless_passes_assume_yes_and_prompt(self):
rc = resume_mod.cmd_resume(
["implementer-abc12", "--headless", "--prompt", "Address the review"]
)
self.assertEqual(0, rc)
kwargs = self._launch_kwargs()
self.assertTrue(kwargs["assume_yes"])
self.assertEqual("Address the review", kwargs["headless_prompt_text"])
def test_interactive_resume_unchanged(self):
resume_mod.cmd_resume(["implementer-abc12"])
kwargs = self._launch_kwargs()
self.assertFalse(kwargs["assume_yes"])
self.assertEqual("", kwargs["headless_prompt_text"])
def test_headless_without_prompt_errors(self):
with self.assertRaises(Die):
resume_mod.cmd_resume(["implementer-abc12", "--headless"])
self._launch.assert_not_called()
def test_prompt_without_headless_errors(self):
with self.assertRaises(Die):
resume_mod.cmd_resume(["implementer-abc12", "--prompt", "hi"])
self._launch.assert_not_called()
if __name__ == "__main__":
unittest.main()
+188
View File
@@ -0,0 +1,188 @@
"""Unit: `cli.py start --headless` non-interactive launch path.
Headless is the keystone for orchestrators, CI, and webhook
dispatch: agent/bottles/label come from flags + manifest defaults, no
TUI selectors fire, and the preflight y/N is auto-confirmed
(`assume_yes=True`). All actual launch work is stubbed so no container
is created.
"""
from __future__ import annotations
import os
import unittest
from unittest.mock import MagicMock, patch
import bot_bottle.cli.start as start_mod
import bot_bottle.cli.tui as tui_mod
from bot_bottle.backend import ActiveAgent
from bot_bottle.log import Die
from bot_bottle.manifest import ManifestError
def _make_manifest(
agent_names: list[str],
bottle_names: list[str] | None = None,
agent_bottle: str = "",
):
manifest = MagicMock()
manifest.agents = {name: MagicMock(bottle=agent_bottle) for name in agent_names}
manifest.all_agent_names = sorted(agent_names)
manifest.all_bottle_names = sorted(bottle_names or [])
manifest.home_md = None # eager mode so _peek_agent_bottle uses agents dict
manifest.require_agent = MagicMock(return_value=None)
return manifest
def _active_agent(slug: str) -> ActiveAgent:
return ActiveAgent(
backend_name="docker",
slug=slug,
agent_name="demo",
started_at="2026-01-01T00:00:00+00:00",
services=(),
)
class TestCmdStartHeadless(unittest.TestCase):
"""Drive `cmd_start --headless` with launch + TUI stubbed out."""
def setUp(self):
self._manifest = _make_manifest(
["researcher", "implementer"], ["claude", "dev"], agent_bottle="claude"
)
patch(
"bot_bottle.cli.start.ManifestIndex.resolve",
return_value=self._manifest,
).start()
self._launch_mock = patch(
"bot_bottle.cli.start._launch_bottle", return_value=("", 0)
).start()
# No bottles running by default → no label collision.
patch(
"bot_bottle.cli.start.enumerate_active_agents", return_value=[]
).start()
# If any TUI picker fires in headless mode, that's a bug.
self._agent_picker = patch.object(tui_mod, "filter_select").start()
self._bottle_picker = patch.object(tui_mod, "filter_multiselect").start()
self._modal = patch.object(tui_mod, "name_color_modal").start()
patch.dict(os.environ, {}, clear=False).start()
os.environ.pop("BOT_BOTTLE_BACKEND", None)
self.addCleanup(patch.stopall)
def _spec(self):
self._launch_mock.assert_called_once()
return self._launch_mock.call_args[0][0]
# -- no TUI in headless --------------------------------------------
def test_headless_fires_no_pickers(self):
rc = start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude", "--prompt", "Do it"]
)
self.assertEqual(0, rc)
self._agent_picker.assert_not_called()
self._bottle_picker.assert_not_called()
self._modal.assert_not_called()
def test_headless_assume_yes_forwarded(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude", "--prompt", "Do it"]
)
self.assertTrue(self._launch_mock.call_args[1]["assume_yes"])
# -- prompt --------------------------------------------------------
def test_headless_without_prompt_dies(self):
with self.assertRaises(Die):
start_mod.cmd_start(["--headless", "researcher", "--bottle", "claude"])
self._launch_mock.assert_not_called()
def test_headless_prompt_forwarded_to_launch(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude",
"--prompt", "Implement issue #42"]
)
self.assertEqual(
"Implement issue #42",
self._launch_mock.call_args[1]["headless_prompt_text"],
)
# -- bottle resolution ---------------------------------------------
def test_explicit_bottles_forwarded_in_order(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "dev", "--bottle", "claude",
"--prompt", "Do it"]
)
self.assertEqual(("dev", "claude"), self._spec().bottle_names)
def test_omitted_bottle_falls_back_to_agent_default(self):
start_mod.cmd_start(["--headless", "implementer", "--prompt", "Do it"])
self.assertEqual(("claude",), self._spec().bottle_names)
def test_no_bottle_and_no_default_dies(self):
manifest = _make_manifest(["researcher"], ["claude"], agent_bottle="")
with patch(
"bot_bottle.cli.start.ManifestIndex.resolve", return_value=manifest
):
with self.assertRaises(Die):
start_mod.cmd_start(
["--headless", "researcher", "--prompt", "Do it"]
)
self._launch_mock.assert_not_called()
# -- agent resolution ----------------------------------------------
def test_missing_agent_name_dies(self):
with self.assertRaises(Die):
start_mod.cmd_start(["--headless"])
self._launch_mock.assert_not_called()
def test_unknown_agent_raises_manifest_error(self):
self._manifest.require_agent.side_effect = ManifestError("agent 'x' not defined")
with self.assertRaises(ManifestError):
start_mod.cmd_start(
["--headless", "x", "--bottle", "claude", "--prompt", "Do it"]
)
self._launch_mock.assert_not_called()
# -- label / color -------------------------------------------------
def test_label_defaults_to_agent_name(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude", "--prompt", "Do it"]
)
self.assertEqual("researcher", self._spec().label)
def test_explicit_label_and_color_forwarded(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude",
"--label", "nightly", "--color", "green", "--prompt", "Do it"]
)
spec = self._spec()
self.assertEqual("nightly", spec.label)
self.assertEqual("green", spec.color)
def test_label_collision_uniquifies(self):
with patch(
"bot_bottle.cli.start.enumerate_active_agents",
return_value=[_active_agent("researcher")],
):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude", "--prompt", "Do it"]
)
self.assertEqual("researcher-2", self._spec().label)
# -- backend wiring ------------------------------------------------
def test_backend_flag_forwarded(self):
start_mod.cmd_start(
["--headless", "--backend=docker", "researcher", "--bottle", "claude",
"--prompt", "Do it"]
)
self.assertEqual("docker", self._launch_mock.call_args[1]["backend_name"])
if __name__ == "__main__":
unittest.main()
+2 -2
View File
@@ -45,7 +45,7 @@ class TestCmdStartSelector(unittest.TestCase):
self._launch_patch = patch( self._launch_patch = patch(
"bot_bottle.cli.start._launch_bottle", "bot_bottle.cli.start._launch_bottle",
return_value=0, return_value=("", 0),
) )
self._launch_mock = self._launch_patch.start() self._launch_mock = self._launch_patch.start()
@@ -211,7 +211,7 @@ class TestCmdStartLabelCollision(unittest.TestCase):
self._manifest = _make_manifest(["researcher"], ["claude"]) self._manifest = _make_manifest(["researcher"], ["claude"])
patch("bot_bottle.cli.start.ManifestIndex.resolve", return_value=self._manifest).start() patch("bot_bottle.cli.start.ManifestIndex.resolve", return_value=self._manifest).start()
self._launch_mock = patch( self._launch_mock = patch(
"bot_bottle.cli.start._launch_bottle", return_value=0, "bot_bottle.cli.start._launch_bottle", return_value=("", 0),
).start() ).start()
# Stub the bottle picker to always return a selection. # Stub the bottle picker to always return a selection.
patch.object(tui_mod, "filter_multiselect", return_value=["claude"]).start() patch.object(tui_mod, "filter_multiselect", return_value=["claude"]).start()
@@ -343,5 +343,14 @@ class TestClaudeSuperviseMcp(unittest.TestCase):
) )
class TestClaudeHeadlessPrompt(unittest.TestCase):
def test_returns_p_flag_and_prompt(self):
self.assertEqual(["-p", "Do the task"], ClaudeAgentProvider().headless_prompt("Do the task"))
def test_preserves_prompt_text_verbatim(self):
text = "Fix issue #42: the widget breaks on empty input"
self.assertEqual(["-p", text], ClaudeAgentProvider().headless_prompt(text))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
@@ -314,5 +314,14 @@ class TestCodexSuperviseMcp(unittest.TestCase):
) )
class TestCodexHeadlessPrompt(unittest.TestCase):
def test_returns_prompt_as_positional_arg(self):
self.assertEqual(["Do the task"], CodexAgentProvider().headless_prompt("Do the task"))
def test_preserves_prompt_text_verbatim(self):
text = "Fix issue #42: the widget breaks on empty input"
self.assertEqual([text], CodexAgentProvider().headless_prompt(text))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+107
View File
@@ -0,0 +1,107 @@
"""Unit: Forge abstraction + ScopedForge (PRD forge-native-integration)."""
from __future__ import annotations
import unittest
from bot_bottle.contrib.forge.base import (
Comment,
Forge,
ForgeScopeError,
Issue,
PullRequest,
ScopedForge,
)
class _RecordingForge(Forge):
"""In-memory fake that records writes."""
def __init__(self) -> None:
self.comments: list[tuple[int, str]] = []
self.descriptions: list[tuple[int, str]] = []
def read_issue(self, number: int) -> Issue:
return Issue(number=number, title="t", body="b", state="open")
def read_pr(self, number: int) -> PullRequest:
return PullRequest(
number=number, title="pr", body="b", state="open", merged=False
)
def read_comments(self, number: int) -> list[Comment]:
return [Comment(id=1, user="alice", body="hi")]
def post_comment(self, number: int, body: str) -> None:
self.comments.append((number, body))
def update_description(self, number: int, body: str) -> None:
self.descriptions.append((number, body))
def is_org_member(self, org: str, username: str) -> bool:
return username == "member"
def get_pr_for_issue(self, number: int) -> int | None:
return 99 if number == 17 else None
def is_pr_open(self, number: int) -> bool:
return True
class TestScopedForgeReads(unittest.TestCase):
def setUp(self) -> None:
self.inner = _RecordingForge()
self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42])
def test_reads_pass_through_to_any_number(self):
# A number well outside the writable scope still reads fine.
self.assertEqual(123, self.scoped.read_issue(123).number)
self.assertEqual("alice", self.scoped.read_comments(500)[0].user)
def test_read_pr_passes_through(self):
pr = self.scoped.read_pr(999)
self.assertIsInstance(pr, PullRequest)
self.assertEqual(999, pr.number)
self.assertFalse(pr.merged)
def test_membership_and_pr_lookups_delegate(self):
self.assertTrue(self.scoped.is_org_member("bot-bottle", "member"))
self.assertFalse(self.scoped.is_org_member("bot-bottle", "stranger"))
self.assertEqual(99, self.scoped.get_pr_for_issue(17))
self.assertTrue(self.scoped.is_pr_open(8000))
class TestScopedForgeWrites(unittest.TestCase):
def setUp(self) -> None:
self.inner = _RecordingForge()
self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42])
def test_writable_set_is_issue_plus_prs(self):
self.assertEqual(frozenset({17, 42}), self.scoped.writable)
def test_write_to_assigned_issue_allowed(self):
self.scoped.post_comment(17, "done")
self.assertEqual([(17, "done")], self.inner.comments)
def test_write_to_assigned_pr_allowed(self):
self.scoped.update_description(42, "new body")
self.assertEqual([(42, "new body")], self.inner.descriptions)
def test_comment_outside_scope_rejected(self):
with self.assertRaises(ForgeScopeError) as ctx:
self.scoped.post_comment(500, "spam")
self.assertIn("500", str(ctx.exception))
self.assertEqual([], self.inner.comments)
def test_description_outside_scope_rejected(self):
with self.assertRaises(ForgeScopeError):
self.scoped.update_description(500, "tamper")
self.assertEqual([], self.inner.descriptions)
def test_scope_error_is_permission_error(self):
# Sidecars can catch the stdlib base type.
self.assertIn(PermissionError, ForgeScopeError.__mro__)
if __name__ == "__main__":
unittest.main()
+145
View File
@@ -0,0 +1,145 @@
"""Unit: GiteaClient + GiteaForge (PRD forge-native-integration)."""
from __future__ import annotations
import json
import unittest
import urllib.error
from io import BytesIO
from unittest.mock import MagicMock, patch
from bot_bottle.contrib.gitea.client import GiteaClient, GiteaForge
def _client() -> GiteaClient:
return GiteaClient(
api_url="https://gitea.example.com/api/v1",
owner="didericis",
repo="bot-bottle",
token="test-token",
)
def _resp(body: object, status: int = 200) -> MagicMock:
resp = MagicMock()
resp.read.return_value = json.dumps(body).encode() if body is not None else b""
resp.status = status
resp.__enter__ = lambda s: s # type: ignore
resp.__exit__ = MagicMock(return_value=False)
return resp
def _http_error(code: int, body: str = "") -> urllib.error.HTTPError:
return urllib.error.HTTPError(
url="http://x", code=code, msg="err", hdrs=None, # type: ignore[arg-type]
fp=BytesIO(body.encode()),
)
_URLOPEN = "bot_bottle.contrib.gitea.client.urllib.request.urlopen"
class TestOrgMembership(unittest.TestCase):
def test_member_returns_true_on_2xx(self):
with patch(_URLOPEN, return_value=_resp(None, 204)) as m:
self.assertTrue(_client().is_org_member("bot-bottle", "alice"))
req = m.call_args.args[0]
self.assertIn("/orgs/bot-bottle/members/alice", req.full_url)
def test_nonmember_returns_false_on_404(self):
with patch(_URLOPEN, side_effect=_http_error(404)):
self.assertFalse(_client().is_org_member("bot-bottle", "stranger"))
def test_other_http_error_raises(self):
with patch(_URLOPEN, side_effect=_http_error(403, "forbidden")):
with self.assertRaises(RuntimeError) as ctx:
_client().is_org_member("bot-bottle", "alice")
self.assertIn("403", str(ctx.exception))
class TestForgeReads(unittest.TestCase):
def test_read_issue_maps_fields(self):
raw = {"number": 17, "title": "Bug", "body": "broken", "state": "open"}
with patch(_URLOPEN, return_value=_resp(raw)) as m:
issue = GiteaForge(_client()).read_issue(17)
self.assertEqual((17, "Bug", "broken", "open"),
(issue.number, issue.title, issue.body, issue.state))
self.assertIn("/repos/didericis/bot-bottle/issues/17",
m.call_args.args[0].full_url)
def test_read_issue_tolerates_null_body(self):
raw = {"number": 17, "title": "T", "body": None, "state": "open"}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertEqual("", GiteaForge(_client()).read_issue(17).body)
def test_read_comments_maps_user_login(self):
raw = [
{"id": 1, "user": {"login": "alice"}, "body": "hi"},
{"id": 2, "user": {"login": "bob"}, "body": "yo"},
]
with patch(_URLOPEN, return_value=_resp(raw)):
comments = GiteaForge(_client()).read_comments(17)
self.assertEqual(["alice", "bob"], [c.user for c in comments])
self.assertEqual([1, 2], [c.id for c in comments])
class TestForgeWrites(unittest.TestCase):
def test_post_comment_payload_and_url(self):
with patch(_URLOPEN, return_value=_resp(None, 201)) as m:
GiteaForge(_client()).post_comment(17, "done ✓")
req = m.call_args.args[0]
self.assertEqual("POST", req.method)
self.assertIn("/repos/didericis/bot-bottle/issues/17/comments", req.full_url)
self.assertEqual("done ✓", json.loads(req.data)["body"])
def test_update_description_patches_issue(self):
with patch(_URLOPEN, return_value=_resp(None, 200)) as m:
GiteaForge(_client()).update_description(17, "edited")
req = m.call_args.args[0]
self.assertEqual("PATCH", req.method)
self.assertTrue(req.full_url.endswith("/issues/17"))
self.assertEqual("edited", json.loads(req.data)["body"])
def test_auth_header_sent(self):
with patch(_URLOPEN, return_value=_resp(None, 201)) as m:
GiteaForge(_client()).post_comment(17, "x")
self.assertEqual("token test-token",
m.call_args.args[0].headers["Authorization"])
class TestPRHelpers(unittest.TestCase):
def test_get_pr_for_issue_returns_number_when_issue_is_pr(self):
raw = {"number": 18, "pull_request": {"merged": False}}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertEqual(18, GiteaForge(_client()).get_pr_for_issue(18))
def test_get_pr_for_issue_none_for_plain_issue(self):
raw = {"number": 17, "pull_request": None}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertIsNone(GiteaForge(_client()).get_pr_for_issue(17))
def test_is_pr_open_true_when_state_open(self):
with patch(_URLOPEN, return_value=_resp({"state": "open"})):
self.assertTrue(GiteaForge(_client()).is_pr_open(18))
def test_is_pr_open_false_when_closed(self):
with patch(_URLOPEN, return_value=_resp({"state": "closed"})):
self.assertFalse(GiteaForge(_client()).is_pr_open(18))
def test_read_pr_maps_fields_including_merged(self):
raw = {"number": 18, "title": "Fix", "body": "patch",
"state": "closed", "merged": True}
with patch(_URLOPEN, return_value=_resp(raw)) as m:
pr = GiteaForge(_client()).read_pr(18)
self.assertEqual((18, "Fix", "patch", "closed", True),
(pr.number, pr.title, pr.body, pr.state, pr.merged))
self.assertIn("/repos/didericis/bot-bottle/pulls/18",
m.call_args.args[0].full_url)
def test_read_pr_merged_defaults_false(self):
with patch(_URLOPEN, return_value=_resp({"number": 18, "state": "open"})):
self.assertFalse(GiteaForge(_client()).read_pr(18).merged)
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,99 @@
"""Unit: SQLite forge state store (PRD forge-native-integration)."""
from __future__ import annotations
import tempfile
import unittest
from dataclasses import replace
from pathlib import Path
from bot_bottle.contrib.gitea.forge_state import (
STATUS_FROZEN,
STATUS_RUNNING,
ForgeState,
SqliteForgeStateStore,
)
def _state(**over: object) -> ForgeState:
base = ForgeState(
owner="didericis",
repo="bot-bottle",
issue_number=17,
slug="implementer-abc12",
agent_name="implementer",
bottle_names=["claude"],
backend_name="docker",
agent_git_user="didericis-claude",
pr_number=42,
status=STATUS_FROZEN,
last_checkin_at="2026-06-29T12:04:12-04:00",
)
return replace(base, **over)
class ForgeStateStoreTest(unittest.TestCase):
def setUp(self) -> None:
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
self.store = SqliteForgeStateStore(tmp / "sub" / "bot-bottle.db")
def test_round_trip(self):
self.store.upsert(_state())
self.assertEqual(_state(), self.store.get("didericis", "bot-bottle", 17))
def test_missing_returns_none(self):
self.assertIsNone(self.store.get("nobody", "nope", 1))
def test_creates_db_parent_dirs(self):
# setUp pointed at a non-existent 'sub/' dir; init must create it.
self.assertIsNone(self.store.get("x", "y", 1)) # no raise
def test_upsert_replaces(self):
self.store.upsert(_state(status=STATUS_RUNNING))
self.store.upsert(_state(status=STATUS_FROZEN))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertEqual(STATUS_FROZEN, got.status)
# Still one row, not two.
self.assertEqual(1, len(self.store.all()))
def test_delete_is_idempotent(self):
self.store.upsert(_state())
self.store.delete("didericis", "bot-bottle", 17)
self.store.delete("didericis", "bot-bottle", 17) # no raise
self.assertIsNone(self.store.get("didericis", "bot-bottle", 17))
def test_all_lists_across_repos_sorted(self):
self.store.upsert(_state(issue_number=18, slug="other"))
self.store.upsert(_state(issue_number=17))
self.store.upsert(_state(owner="acme", repo="widget", issue_number=3))
states = self.store.all()
self.assertEqual(3, len(states))
self.assertEqual(
[("acme", 3), ("didericis", 17), ("didericis", 18)],
[(s.owner, s.issue_number) for s in states],
)
def test_all_empty(self):
self.assertEqual([], self.store.all())
def test_bottle_names_list_preserved(self):
self.store.upsert(_state(bottle_names=["claude", "dev"]))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertEqual(["claude", "dev"], got.bottle_names)
def test_pr_number_nullable(self):
self.store.upsert(_state(pr_number=None))
got = self.store.get("didericis", "bot-bottle", 17)
assert got is not None
self.assertIsNone(got.pr_number)
def test_persists_across_store_instances(self):
self.store.upsert(_state())
reopened = SqliteForgeStateStore(self.store._db_path) # pylint: disable=protected-access
self.assertEqual(_state(), reopened.get("didericis", "bot-bottle", 17))
if __name__ == "__main__":
unittest.main()
+9
View File
@@ -223,5 +223,14 @@ class TestPiDockerfile(unittest.TestCase):
self.assertIn("chmod 1777 /tmp /var/tmp", dockerfile) self.assertIn("chmod 1777 /tmp /var/tmp", dockerfile)
class TestPiHeadlessPrompt(unittest.TestCase):
def test_returns_p_flag_and_prompt(self):
self.assertEqual(["-p", "Do the task"], PiAgentProvider().headless_prompt("Do the task"))
def test_preserves_prompt_text_verbatim(self):
text = "Fix issue #42: the widget breaks on empty input"
self.assertEqual(["-p", text], PiAgentProvider().headless_prompt(text))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
@@ -38,6 +38,7 @@ class _Provider(AgentProvider):
def provision_prompt(self, plan, bottle): ... # type: ignore[override] def provision_prompt(self, plan, bottle): ... # type: ignore[override]
def provision(self, plan, bottle): ... # type: ignore[override] def provision(self, plan, bottle): ... # type: ignore[override]
def provision_supervise_mcp(self, plan, bottle, supervise_url): ... # type: ignore[override] def provision_supervise_mcp(self, plan, bottle, supervise_url): ... # type: ignore[override]
def headless_prompt(self, prompt): return [] # type: ignore[override]
_PROVIDER = _Provider() _PROVIDER = _Provider()
+16
View File
@@ -165,6 +165,22 @@ class TestAgentValidation(unittest.TestCase):
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
ManifestAgent.from_dict("a", {"skills": [5]}, set()) ManifestAgent.from_dict("a", {"skills": [5]}, set())
def test_skill_name_rejects_shell_metacharacters(self) -> None:
# Skill names become host/guest path segments interpolated into
# provisioning shell commands; anything outside kebab-case is
# rejected at load so it can never reach a `bottle.exec` string.
for bad in ("foo; rm -rf /", "../escape", "foo bar", "Foo", "-leading"):
with self.assertRaises(ManifestError):
ManifestAgent.from_dict("a", {"skills": [bad]}, set())
def test_skill_name_accepts_kebab_case(self) -> None:
agent = ManifestAgent.from_dict(
"a", {"skills": ["init-entry", "quality-eval", "skill0"]}, set()
)
self.assertEqual(
agent.skills, ("init-entry", "quality-eval", "skill0")
)
def test_prompt_not_string(self) -> None: def test_prompt_not_string(self) -> None:
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
ManifestAgent.from_dict("a", {"prompt": 5}, set()) ManifestAgent.from_dict("a", {"prompt": 5}, set())
@@ -49,6 +49,7 @@ class _Provider(AgentProvider):
def provision_prompt(self, plan, bottle): ... # type: ignore[override] def provision_prompt(self, plan, bottle): ... # type: ignore[override]
def provision(self, plan, bottle): ... # type: ignore[override] def provision(self, plan, bottle): ... # type: ignore[override]
def provision_supervise_mcp(self, plan, bottle, supervise_url): ... # type: ignore[override] def provision_supervise_mcp(self, plan, bottle, supervise_url): ... # type: ignore[override]
def headless_prompt(self, prompt): return [] # type: ignore[override]
_PROVIDER = _Provider() _PROVIDER = _Provider()