Compare commits

...

8 Commits

Author SHA1 Message Date
didericis a229a22d54 feat(forge): forge library layer for native integration (PRD chunks 1-3, 5)
lint / lint (push) Failing after 2m9s
test / unit (pull_request) Successful in 58s
test / integration (pull_request) Successful in 21s
test / coverage (pull_request) Successful in 1m23s
Implements the bot-bottle side of the forge-native PRD that is
self-contained in this repo (the forge sidecar and orchestrate command
belong to the separate bot-bottle-orchestrator, a PRD non-goal):

- contrib/forge/base.py: Forge ABC + ScopedForge enforcing the
  read-anywhere / write-scoped model (writes rejected outside the
  assigned issue/PRs via ForgeScopeError).
- contrib/gitea/client.py: GiteaClient (stdlib-only HTTP, mirrors the
  deploy-key provisioner) + GiteaForge. Token held by the caller (the
  sidecar), not injected by cred-proxy.
- contrib/gitea/forge_state.py: ForgeState dataclass + atomic
  read/write/delete/all under ~/.bot-bottle/forge/<owner>/<repo>/.
- contrib/gitea/provenance.py: build_provenance_footer — collapsed
  markdown audit footer; watchdog/gitleaks/egress rendering.
- cli/resume.py: `resume --headless --prompt` reusing the shipped
  assume_yes + headless_prompt launch core (the new half of chunk 1).

47 new unit tests; pylint 9.98/10, pyright clean. Forge sidecar (chunk
4), orchestrate command (chunk 6), and forge_env plumbing are deferred:
their only consumer is the separate orchestrator and they are untestable
in isolation here.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 19:39:49 -04:00
didericis 738990b2df Merge remote-tracking branch 'origin/main' into forge-native-integration 2026-06-30 19:19:01 -04:00
didericis 4cb106b48d docs(prd): reconcile headless primitives with shipped start --headless
#315 already merged `start --headless` (assume_yes on _launch_bottle +
AgentProvider.headless_prompt). The PRD's proposed start_headless /
attach_agent_headless helpers were redundant with it, and the latter
diverged by hand-rolling --no-interactive/-p instead of using the
headless_prompt provider abstraction. Drop them.

Scope the remaining headless work to what's actually new: a forge_env
hook threaded into the existing _launch_bottle core, and a `resume
--headless` path (resume has no non-interactive entry point today).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 17:46:59 -04:00
didericis ebad90bfa9 docs(prd): adopt forge sidecar (option 3) for native integration
Flip the forge-native-integration PRD from option 2 (agent calls the
Gitea API directly via cred-proxy; done signal parsed from comments) to
option 3 per issue #317 comment 2715: a forge sidecar backed by a Forge
abstract class.

- signal_done(status, summary) replaces comment-parsing as the done signal
- semantic audit trail from the sidecar feeds provenance directly
- read-anywhere / write-scoped enforcement, tighter than repo-wide API keys
- forge-agnostic agent prompts and sidecar protocol
- DeployKeyProvisioner subsumption deferred; share the HTTP client only

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 17:39:33 -04:00
didericis b93fe58523 feat(cli): add headless launch mode for orchestrators
test / unit (pull_request) Successful in 47s
test / integration (pull_request) Successful in 16s
test / coverage (pull_request) Successful in 1m4s
lint / lint (push) Successful in 2m0s
test / unit (push) Successful in 48s
test / integration (push) Successful in 18s
test / coverage (push) Successful in 57s
Update Quality Badges / update-badges (push) Successful in 57s
`--headless` is a non-interactive launch path for `cli.py start`:
agent, bottles, label, and color come from flags + manifest defaults
with no TUI selectors and no y/N preflight (auto-confirmed via a new
`assume_yes` param threaded into the shared `_launch_bottle` core).

- `--bottle` (repeatable) defaults to the agent's own `bottle:`;
  `--label` defaults to the agent name and auto-uniquifies on slug
  collision; `--color` defaults to none.
- `--prompt TEXT` is required in headless mode and is delivered to the
  agent via a new `headless_prompt(prompt)` method on `AgentProvider`,
  implemented for claude (`-p`), codex (positional), and pi (`-p`).
- The agent still execs on inherited stdio/PTY, so whatever allocates
  the PTY drives the live session; only the launch chrome is headless.
- `--headless --dry-run` previews the resolved plan without launching.

Adds unit coverage in tests/unit/test_cli_start_headless.py and
headless_prompt tests for each provider. Also stubs headless_prompt on
the in-test AgentProvider subclasses so the unit suite collects cleanly.

Closes #315.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 15:08:14 -04:00
didericis-claude 1789deaf73 docs: update forge PRD — orchestrator split, done signal, org targeting, forge env vars 2026-06-29 12:39:02 -04:00
didericis-claude b607d68a0e docs: add PRD for forge native integration 2026-06-29 12:10:41 -04:00
didericis 94eca35b4f fix(skills): validate skill names and quote provisioning paths
test / unit (push) Successful in 55s
test / integration (push) Successful in 23s
test / coverage (push) Successful in 1m11s
Update Quality Badges / update-badges (push) Successful in 1m3s
lint / lint (push) Successful in 2m18s
Skill names become host/guest path segments interpolated into the
`bottle.exec` shell strings in each contrib provider's provision_skills.
They were validated only as strings, so a name with shell metacharacters
or path traversal could reach the command.

Layer two defenses:
  - Primary: reject any skill name that isn't kebab-case
    ([a-z][a-z0-9-]*) at manifest load, reusing the convention already
    enforced on bottle/agent filenames (new is_valid_entity_name helper
    in manifest_schema). Fails loud and early, protecting every consumer
    of the name — not just the exec call sites.
  - Failsafe: shlex.quote the interpolated skills_dir / dst paths in the
    claude, codex, and pi providers, so a future unvalidated field can't
    inject shell metacharacters even if it bypasses the load-time check.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01NkwFXLFff9PYPy4wgVBJp9
2026-06-27 02:15:30 -04:00
26 changed files with 1918 additions and 18 deletions
+9
View File
@@ -209,6 +209,15 @@ class AgentProvider(ABC):
the supervise sidecar is reachable. No-op when the supervise sidecar is reachable. No-op when
`plan.supervise_plan is None`.""" `plan.supervise_plan is None`."""
@abstractmethod
def headless_prompt(self, prompt: str) -> list[str]:
"""Return the agent CLI args that deliver `prompt` as the
initial task in a non-interactive (headless) session.
Called only when ``--prompt`` is passed to
``./cli.py start --headless``; the returned args are appended
after the provider's ``bypass_args`` and ``startup_args``."""
def provision_ca(self, bottle: "Bottle", plan: "BottlePlan") -> None: def provision_ca(self, bottle: "Bottle", plan: "BottlePlan") -> None:
"""Install the egress MITM CA into the agent's trust store. """Install the egress MITM CA into the agent's trust store.
+24
View File
@@ -27,12 +27,34 @@ from .start import _launch_bottle
def cmd_resume(argv: list[str]) -> int: def cmd_resume(argv: list[str]) -> int:
parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True) parser = argparse.ArgumentParser(prog=f"{PROG} resume", add_help=True)
parser.add_argument("--dry-run", action="store_true") parser.add_argument("--dry-run", action="store_true")
parser.add_argument(
"--headless",
action="store_true",
help=(
"non-interactive rehydrate: deliver --prompt to the agent and "
"skip the y/N preflight. For orchestrators / the freeze-rehydrate "
"loop."
),
)
parser.add_argument(
"--prompt",
default=None,
help="follow-up prompt delivered to the agent (required with --headless)",
)
parser.add_argument( parser.add_argument(
"identity", "identity",
help="bottle identity from a prior `start` (see its session-end output)", help="bottle identity from a prior `start` (see its session-end output)",
) )
args = parser.parse_args(argv) args = parser.parse_args(argv)
if args.prompt and not args.headless:
die("--prompt is only valid with --headless")
if args.headless and not args.prompt:
die(
"--headless requires --prompt: "
"./cli.py resume <identity> --headless --prompt 'Address the review'"
)
metadata = read_metadata(args.identity) metadata = read_metadata(args.identity)
if metadata is None: if metadata is None:
die( die(
@@ -56,4 +78,6 @@ def cmd_resume(argv: list[str]) -> int:
spec, spec,
dry_run=args.dry_run, dry_run=args.dry_run,
backend_name=backend_name, backend_name=backend_name,
assume_yes=args.headless,
headless_prompt_text=args.prompt or "",
) )
+142 -7
View File
@@ -2,6 +2,11 @@
interactive claude-code session. The container is torn down when the interactive claude-code session. The container is torn down when the
session ends. session ends.
`--headless` selects a non-interactive launch (agent/bottles/label from
flags, no TUI selectors, no y/N prompt) for orchestrators,
CI, and webhook dispatch. The agent still execs on the inherited
stdio/PTY, so an orchestrator that allocates the PTY drives the session.
The launch core is shared with `cli.py resume <identity>` through The launch core is shared with `cli.py resume <identity>` through
the private orchestrator `_launch_bottle`. the private orchestrator `_launch_bottle`.
""" """
@@ -16,7 +21,7 @@ import tempfile
from pathlib import Path from pathlib import Path
from typing import Callable from typing import Callable
from ..agent_provider import runtime_for from ..agent_provider import get_provider, runtime_for
from ..backend import ( from ..backend import (
Bottle, Bottle,
BottleSpec, BottleSpec,
@@ -31,7 +36,7 @@ from ..bottle_state import (
is_preserved, is_preserved,
mark_preserved, mark_preserved,
) )
from ..log import info from ..log import info, die
from ..manifest import Manifest, ManifestIndex from ..manifest import Manifest, ManifestIndex
from ._common import PROG, USER_CWD, read_tty_line from ._common import PROG, USER_CWD, read_tty_line
from . import tui from . import tui
@@ -50,6 +55,39 @@ def cmd_start(argv: list[str]) -> int:
"or host auto-selection). Overrides the env var when set." "or host auto-selection). Overrides the env var when set."
), ),
) )
parser.add_argument(
"--headless",
action="store_true",
help=(
"non-interactive launch: take agent/bottles/label from flags, "
"skip all prompts. For orchestrators, CI, and webhooks."
),
)
parser.add_argument(
"--bottle",
action="append",
default=None,
metavar="NAME",
help=(
"bottle to compose, repeatable (order = merge order). In "
"--headless, defaults to the agent's own bottle when omitted."
),
)
parser.add_argument(
"--label",
default=None,
help="bottle label / terminal title (--headless default: agent name)",
)
parser.add_argument(
"--color",
default=None,
help="bottle color, one of the 16 ANSI color names (--headless default: none)",
)
parser.add_argument(
"--prompt",
default=None,
help="initial task prompt delivered to the agent (required with --headless)",
)
parser.add_argument( parser.add_argument(
"name", "name",
nargs="?", nargs="?",
@@ -61,6 +99,12 @@ def cmd_start(argv: list[str]) -> int:
dry_run = args.dry_run or os.environ.get("BOT_BOTTLE_DRY_RUN") == "1" dry_run = args.dry_run or os.environ.get("BOT_BOTTLE_DRY_RUN") == "1"
manifest = ManifestIndex.resolve(USER_CWD) manifest = ManifestIndex.resolve(USER_CWD)
backend_name: str | None = args.backend
if args.headless:
return _start_headless(
manifest, args, dry_run=dry_run, backend_name=backend_name
)
agent_name: str | None = args.name agent_name: str | None = args.name
if agent_name is None: if agent_name is None:
@@ -71,8 +115,6 @@ def cmd_start(argv: list[str]) -> int:
if agent_name is None: if agent_name is None:
return 0 return 0
backend_name: str | None = args.backend
# Bottle multiselect: always show after agent selection so operators # Bottle multiselect: always show after agent selection so operators
# can compose bottles at launch time without editing agent manifests. # can compose bottles at launch time without editing agent manifests.
available_bottles = manifest.all_bottle_names available_bottles = manifest.all_bottle_names
@@ -109,6 +151,83 @@ def cmd_start(argv: list[str]) -> int:
) )
# --- Headless launch -----------------------------------------------------
def _start_headless(
manifest: ManifestIndex,
args: argparse.Namespace,
*,
dry_run: bool,
backend_name: str | None,
) -> int:
"""Non-interactive launch path for orchestrators / CI / webhooks.
Resolves agent, bottles, label, and color from flags + manifest
defaults instead of the TUI selectors, and auto-confirms the
preflight. Otherwise runs the same launch core as the interactive
path, so the agent still execs on the inherited stdio/PTY — an
orchestrator allocates that PTY and relays it to its
desktop/mobile clients."""
agent_name = args.name
if not agent_name:
die("--headless requires an agent name: ./cli.py start <agent> --headless")
manifest.require_agent(agent_name) # raises ManifestError if unknown
prompt = args.prompt
if not prompt:
die(
"--headless requires --prompt: "
"./cli.py start <agent> --headless --prompt 'Do the thing'"
)
if args.bottle:
bottle_names: tuple[str, ...] = tuple(args.bottle)
else:
default_bottle = _peek_agent_bottle(manifest, agent_name)
if not default_bottle:
die(
f"--headless: agent '{agent_name}' has no default bottle; "
f"pass one or more --bottle NAME"
)
bottle_names = (default_bottle,)
label = _uniquify_label_headless(args.label or agent_name)
spec = BottleSpec(
manifest=manifest,
agent_name=agent_name,
copy_cwd=args.cwd,
user_cwd=USER_CWD,
label=label,
color=args.color or "",
bottle_names=bottle_names,
)
return _launch_bottle(
spec,
dry_run=dry_run,
backend_name=backend_name,
assume_yes=True,
headless_prompt_text=prompt,
)
def _uniquify_label_headless(label: str) -> str:
"""Non-interactive analog of `_resolve_unique_label`: if the label's
slug collides with a running bottle, append -2, -3, … until free,
logging the chosen label. Orchestrators fire-and-forget many bottles,
so silently picking a free name beats erroring on every collision."""
active_slugs = {a.slug for a in enumerate_active_agents()}
if docker_mod.slugify(label) not in active_slugs:
return label
n = 2
while docker_mod.slugify(f"{label}-{n}") in active_slugs:
n += 1
chosen = f"{label}-{n}"
info(f"label '{label}' already in use; using '{chosen}'")
return chosen
# --- Launch helpers ------------------------------------------------------ # --- Launch helpers ------------------------------------------------------
@@ -376,10 +495,19 @@ def _launch_bottle(
*, *,
dry_run: bool, dry_run: bool,
backend_name: str | None = None, backend_name: str | None = None,
assume_yes: bool = False,
headless_prompt_text: str = "",
) -> int: ) -> int:
"""Shared launch core for `start` and `resume`. Builds the plan, """Shared launch core for `start` and `resume`. Builds the plan,
prints / dry-runs / prompts as appropriate, brings the bottle up, prints / dry-runs / prompts as appropriate, brings the bottle up,
attaches claude, and prints the resume hint on session end.""" attaches claude, and prints the resume hint on session end.
`assume_yes` skips the interactive y/N confirmation (headless /
orchestrator launches), where there is no human at the prompt.
`headless_prompt_text` is passed to the provider's `headless_prompt`
method and the resulting args are appended to startup_args so the
agent receives the initial task without interactive input."""
stage_dir = Path(tempfile.mkdtemp(prefix="bot-bottle-stage.")) stage_dir = Path(tempfile.mkdtemp(prefix="bot-bottle-stage."))
identity = "" identity = ""
try: try:
@@ -387,7 +515,7 @@ def _launch_bottle(
spec, spec,
stage_dir=stage_dir, stage_dir=stage_dir,
render_preflight=_text_render_preflight(), render_preflight=_text_render_preflight(),
prompt_yes=_text_prompt_yes, prompt_yes=(lambda: True) if assume_yes else _text_prompt_yes,
dry_run=dry_run, dry_run=dry_run,
backend_name=backend_name, backend_name=backend_name,
) )
@@ -397,10 +525,17 @@ def _launch_bottle(
backend = get_bottle_backend(backend_name) backend = get_bottle_backend(backend_name)
with backend.launch(plan) as bottle: with backend.launch(plan) as bottle:
agent_provider_template = getattr(plan, "agent_provider_template", "claude") agent_provider_template = getattr(plan, "agent_provider_template", "claude")
extra_args: tuple[str, ...] = ()
if headless_prompt_text:
extra_args = tuple(
get_provider(agent_provider_template).headless_prompt(
headless_prompt_text
)
)
exit_code = attach_agent( exit_code = attach_agent(
bottle, bottle,
agent_provider_template=agent_provider_template, agent_provider_template=agent_provider_template,
startup_args=plan.agent_provision.startup_args, startup_args=plan.agent_provision.startup_args + extra_args,
) )
info( info(
f"session ended (exit {exit_code}); " f"session ended (exit {exit_code}); "
+10 -3
View File
@@ -217,7 +217,7 @@ class ClaudeAgentProvider(AgentProvider):
if not agent.skills: if not agent.skills:
return return
skills_dir = _skills_dir(plan.guest_home) skills_dir = _skills_dir(plan.guest_home)
bottle.exec(f"mkdir -p {skills_dir}", user="root") bottle.exec(f"mkdir -p {shlex.quote(skills_dir)}", user="root")
for name in agent.skills: for name in agent.skills:
src = host_skill_dir(name) src = host_skill_dir(name)
if not os.path.isdir(src): if not os.path.isdir(src):
@@ -227,9 +227,13 @@ class ClaudeAgentProvider(AgentProvider):
) )
dst = f"{skills_dir}/{name}" dst = f"{skills_dir}/{name}"
info(f"copying skill {name} into {bottle.name}:{dst}") info(f"copying skill {name} into {bottle.name}:{dst}")
bottle.exec(f"rm -rf {dst} && mkdir -p {dst}", user="root") # Defense in depth: skill names are validated kebab-case at
# manifest load, but quote the path so a future unvalidated
# field can't inject shell metacharacters here either.
dst_q = shlex.quote(dst)
bottle.exec(f"rm -rf {dst_q} && mkdir -p {dst_q}", user="root")
bottle.cp_in(f"{src}/.", f"{dst}/") bottle.cp_in(f"{src}/.", f"{dst}/")
bottle.exec(f"chown -R node:node {dst}", user="root") bottle.exec(f"chown -R node:node {dst_q}", user="root")
def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None: def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None:
"""Copy the prompt file into the guest, fix ownership/mode. """Copy the prompt file into the guest, fix ownership/mode.
@@ -309,6 +313,9 @@ class ClaudeAgentProvider(AgentProvider):
f"claude mcp add --scope user --transport http supervise {supervise_url}" f"claude mcp add --scope user --transport http supervise {supervise_url}"
) )
def headless_prompt(self, prompt: str) -> list[str]:
return ["-p", prompt]
def _exec(bottle: "Bottle", script: str, error: str) -> None: def _exec(bottle: "Bottle", script: str, error: str) -> None:
result = bottle.exec(script, user="root") result = bottle.exec(script, user="root")
+10 -3
View File
@@ -183,7 +183,7 @@ class CodexAgentProvider(AgentProvider):
if not agent.skills: if not agent.skills:
return return
skills_dir = _skills_dir(plan.guest_home) skills_dir = _skills_dir(plan.guest_home)
bottle.exec(f"mkdir -p {skills_dir}", user="root") bottle.exec(f"mkdir -p {shlex.quote(skills_dir)}", user="root")
for name in agent.skills: for name in agent.skills:
src = host_skill_dir(name) src = host_skill_dir(name)
if not os.path.isdir(src): if not os.path.isdir(src):
@@ -193,9 +193,13 @@ class CodexAgentProvider(AgentProvider):
) )
dst = f"{skills_dir}/{name}" dst = f"{skills_dir}/{name}"
info(f"copying skill {name} into {bottle.name}:{dst}") info(f"copying skill {name} into {bottle.name}:{dst}")
bottle.exec(f"rm -rf {dst} && mkdir -p {dst}", user="root") # Defense in depth: skill names are validated kebab-case at
# manifest load, but quote the path so a future unvalidated
# field can't inject shell metacharacters here either.
dst_q = shlex.quote(dst)
bottle.exec(f"rm -rf {dst_q} && mkdir -p {dst_q}", user="root")
bottle.cp_in(f"{src}/.", f"{dst}/") bottle.cp_in(f"{src}/.", f"{dst}/")
bottle.exec(f"chown -R node:node {dst}", user="root") bottle.exec(f"chown -R node:node {dst_q}", user="root")
def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None: def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None:
"""Copy the prompt file into the guest, fix ownership/mode. """Copy the prompt file into the guest, fix ownership/mode.
@@ -275,6 +279,9 @@ class CodexAgentProvider(AgentProvider):
f"codex mcp add supervise --url {shlex.quote(supervise_url)}" f"codex mcp add supervise --url {shlex.quote(supervise_url)}"
) )
def headless_prompt(self, prompt: str) -> list[str]:
return [prompt]
def _exec(bottle: "Bottle", script: str, error: str) -> None: def _exec(bottle: "Bottle", script: str, error: str) -> None:
result = bottle.exec(script, user="root") result = bottle.exec(script, user="root")
+145
View File
@@ -0,0 +1,145 @@
"""Forge abstraction (PRD forge-native-integration, chunk 3).
The `Forge` abstract class is the provider-agnostic surface a forge
sidecar dispatches to: read issues/comments, post comments, edit
descriptions, and the membership / PR lookups the orchestrator needs.
Each forge (Gitea first) implements it; the sidecar protocol and the
agent prompt stay forge-agnostic.
`signal_done` is deliberately *not* a `Forge` method — completion is a
sidecar concept relayed to the orchestrator over a queue dir, not a
forge API operation.
`ScopedForge` enforces the PRD's **read-anywhere / write-scoped** model:
reads pass through to any issue/PR for context; writes are rejected
unless the target is the assigned issue or one of its PRs. This bounds
the blast radius of a prompt-injected agent below repo-wide API-key
permissions.
"""
from __future__ import annotations
import abc
from collections.abc import Iterable
from dataclasses import dataclass
@dataclass(frozen=True)
class Issue:
"""A forge issue or PR (forges model PRs as issues with the same
number)."""
number: int
title: str
body: str
state: str # "open" | "closed"
@dataclass(frozen=True)
class Comment:
id: int
user: str # login of the comment author
body: str
class ForgeScopeError(PermissionError):
"""Raised by `ScopedForge` when a write targets an issue/PR outside
the assigned scope."""
class Forge(abc.ABC):
"""Provider-agnostic forge operations. Implementations wrap a
per-provider HTTP client and translate to `Issue` / `Comment`."""
@abc.abstractmethod
def read_issue(self, number: int) -> Issue:
"""Read an issue or PR body (read-anywhere)."""
@abc.abstractmethod
def read_comments(self, number: int) -> list[Comment]:
"""Read a thread's comments (read-anywhere)."""
@abc.abstractmethod
def post_comment(self, number: int, body: str) -> None:
"""Post a comment to an issue or PR (write-scoped)."""
@abc.abstractmethod
def update_description(self, number: int, body: str) -> None:
"""Replace an issue or PR body (write-scoped)."""
@abc.abstractmethod
def is_org_member(self, org: str, username: str) -> bool:
"""Whether `username` is a member of `org`."""
@abc.abstractmethod
def get_pr_for_issue(self, number: int) -> int | None:
"""The PR number linked to an issue, or None when there is none."""
@abc.abstractmethod
def is_pr_open(self, number: int) -> bool:
"""Whether the given PR is still open."""
class ScopedForge(Forge):
"""Read-anywhere / write-scoped wrapper around a concrete `Forge`.
`post_comment` and `update_description` are rejected with
`ForgeScopeError` unless the target number is the assigned issue or
one of the assigned PRs. Every other method delegates unchanged, so
reads, membership checks, and PR lookups work against any number for
context.
The writable set is fixed at construction. The sidecar reconstructs
a `ScopedForge` when a PR is discovered (`get_pr_for_issue`) so the
new PR becomes writable; this class does not mutate its own scope.
"""
def __init__(
self,
inner: Forge,
*,
assigned_issue: int,
assigned_prs: Iterable[int] = (),
) -> None:
self._inner = inner
self._assigned_issue = assigned_issue
self._writable = {assigned_issue, *assigned_prs}
@property
def writable(self) -> frozenset[int]:
return frozenset(self._writable)
def _check_write(self, number: int) -> None:
if number not in self._writable:
allowed = ", ".join(str(n) for n in sorted(self._writable))
raise ForgeScopeError(
f"write to #{number} denied: out of assigned scope "
f"(writable: {allowed})"
)
# --- read-anywhere: pass through --------------------------------------
def read_issue(self, number: int) -> Issue:
return self._inner.read_issue(number)
def read_comments(self, number: int) -> list[Comment]:
return self._inner.read_comments(number)
def is_org_member(self, org: str, username: str) -> bool:
return self._inner.is_org_member(org, username)
def get_pr_for_issue(self, number: int) -> int | None:
return self._inner.get_pr_for_issue(number)
def is_pr_open(self, number: int) -> bool:
return self._inner.is_pr_open(number)
# --- write-scoped: check then delegate --------------------------------
def post_comment(self, number: int, body: str) -> None:
self._check_write(number)
self._inner.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._check_write(number)
self._inner.update_description(number, body)
+164
View File
@@ -0,0 +1,164 @@
"""Gitea HTTP client + `GiteaForge` (PRD forge-native-integration, chunk 3).
`GiteaClient` is the thin stdlib-only HTTP transport (mirrors
`deploy_key_provisioner.py`: `urllib.request`, bounded timeouts,
structured error bodies). `GiteaForge` adapts it to the provider-agnostic
`Forge` surface.
Unlike the option-2 design, the token is held here (the sidecar process
owns it) and passed to the client directly — there is no agent-side
cred-proxy route, because the agent never makes forge calls. The HTTP
client is the one piece shared with `GiteaDeployKeyProvisioner`; the two
are deliberately *not* unified behind a common abstract base (see the
deferral note in the PRD).
"""
from __future__ import annotations
import json
import urllib.error
import urllib.request
from typing import Any
from ..forge.base import Comment, Forge, Issue
# Bound every Gitea call: a hung instance must not stall the sidecar.
_API_TIMEOUT_SECS = 30
class GiteaClient:
"""Thin authenticated HTTP client for one repo's Gitea API.
`api_url` is the API base *including* `/api/v1` (matching the
`FORGE_GITEA_API` env var), e.g. `https://gitea.example.com/api/v1`.
"""
def __init__(self, *, api_url: str, owner: str, repo: str, token: str) -> None:
self._api_url = api_url.rstrip("/")
self._owner = owner
self._repo = repo
self._token = token
# --- low-level request -------------------------------------------------
def _request(
self, method: str, path: str, *, body: dict[str, Any] | None = None
) -> tuple[int, Any]:
"""Issue an authenticated request. Returns `(status, parsed_json)`;
parsed_json is None when the response has no body. Raises
`RuntimeError` on any non-2xx except where callers special-case
the HTTPError themselves (membership 404)."""
url = f"{self._api_url}{path}"
data = json.dumps(body).encode() if body is not None else None
headers = {"Authorization": f"token {self._token}"}
if data is not None:
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method)
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS) as resp:
raw = resp.read()
parsed = json.loads(raw) if raw else None
return resp.status, parsed
def _repo_path(self, suffix: str) -> str:
return f"/repos/{self._owner}/{self._repo}{suffix}"
# --- operations --------------------------------------------------------
def is_org_member(self, org: str, username: str) -> bool:
"""GET /orgs/{org}/members/{username}: 2xx → member, 404 → not.
Other errors propagate so a misconfigured token fails loudly."""
url = f"{self._api_url}/orgs/{org}/members/{username}"
req = urllib.request.Request(
url, headers={"Authorization": f"token {self._token}"}, method="GET"
)
try:
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS):
return True
except urllib.error.HTTPError as exc:
if exc.code == 404:
return False
raise RuntimeError(
f"org membership check failed for {org}/{username}: "
f"HTTP {exc.code}{_read_error_body(exc)}"
) from exc
def get_issue(self, number: int) -> dict[str, Any]:
_status, body = self._request("GET", self._repo_path(f"/issues/{number}"))
return body or {}
def get_comments(self, number: int) -> list[dict[str, Any]]:
_status, body = self._request(
"GET", self._repo_path(f"/issues/{number}/comments")
)
return body or []
def post_comment(self, number: int, body: str) -> None:
self._request(
"POST",
self._repo_path(f"/issues/{number}/comments"),
body={"body": body},
)
def patch_issue_body(self, number: int, body: str) -> None:
self._request(
"PATCH", self._repo_path(f"/issues/{number}"), body={"body": body}
)
def get_pull(self, number: int) -> dict[str, Any]:
_status, body = self._request("GET", self._repo_path(f"/pulls/{number}"))
return body or {}
class GiteaForge(Forge):
"""`Forge` over a `GiteaClient`."""
def __init__(self, client: GiteaClient) -> None:
self._client = client
def read_issue(self, number: int) -> Issue:
raw = self._client.get_issue(number)
return Issue(
number=int(raw.get("number", number)),
title=str(raw.get("title", "")),
body=str(raw.get("body", "") or ""),
state=str(raw.get("state", "")),
)
def read_comments(self, number: int) -> list[Comment]:
return [
Comment(
id=int(c.get("id", 0)),
user=str((c.get("user") or {}).get("login", "")),
body=str(c.get("body", "") or ""),
)
for c in self._client.get_comments(number)
]
def post_comment(self, number: int, body: str) -> None:
self._client.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._client.patch_issue_body(number, body)
def is_org_member(self, org: str, username: str) -> bool:
return self._client.is_org_member(org, username)
def get_pr_for_issue(self, number: int) -> int | None:
"""Gitea models a PR as an issue with the same number, exposing a
`pull_request` object on the issue. When the queried number is
itself a PR, return it; otherwise None. (The orchestrator tracks
the issue→PR mapping in forge state for the cross-number case.)"""
raw = self._client.get_issue(number)
if raw.get("pull_request"):
return int(raw.get("number", number))
return None
def is_pr_open(self, number: int) -> bool:
return self._client.get_pull(number).get("state") == "open"
def _read_error_body(exc: urllib.error.HTTPError) -> str:
try:
return exc.read().decode("utf-8", errors="replace")
except Exception: # pylint: disable=broad-exception-caught
return ""
+105
View File
@@ -0,0 +1,105 @@
"""Forge state persistence (PRD forge-native-integration, chunk 2).
The orchestrator tracks one record per forge-targeted issue so it can
map an incoming webhook back to the bottle handling it, drive the
freeze / rehydrate loop, and run the watchdog. State lives on disk and
survives orchestrator restarts:
~/.bot-bottle/forge/<owner>/<repo>/issue-<n>.json
Writes are atomic (`os.replace`) so a crash mid-write never leaves a
truncated record.
"""
from __future__ import annotations
import json
import os
from dataclasses import asdict, dataclass, field, fields
from typing import Any
from pathlib import Path
from ...supervise import bot_bottle_root
_FORGE_SUBDIR = "forge"
# Lifecycle: a bottle is launched (running), frozen on the done signal,
# and destroyed when the PR closes.
STATUS_RUNNING = "running"
STATUS_FROZEN = "frozen"
STATUS_DESTROYED = "destroyed"
@dataclass
class ForgeState:
"""One forge-targeted issue's bottle lifecycle record."""
owner: str
repo: str
issue_number: int
slug: str
agent_name: str
bottle_names: list[str] = field(default_factory=list)
backend_name: str = ""
agent_git_user: str = ""
pr_number: int | None = None
status: str = STATUS_RUNNING
last_checkin_at: str = ""
def to_json(self) -> str:
return json.dumps(asdict(self), indent=2, sort_keys=True)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ForgeState":
# Tolerate unknown keys (forward-compat) by filtering to fields.
known = {f.name for f in fields(cls)}
return cls(**{k: v for k, v in data.items() if k in known})
def _forge_root() -> Path:
return bot_bottle_root() / _FORGE_SUBDIR
def forge_state_path(owner: str, repo: str, issue_number: int) -> Path:
return _forge_root() / owner / repo / f"issue-{issue_number}.json"
def write_forge_state(state: ForgeState) -> None:
"""Persist `state` atomically. Creates parent dirs as needed."""
path = forge_state_path(state.owner, state.repo, state.issue_number)
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(".json.tmp")
tmp.write_text(state.to_json())
os.replace(tmp, path)
def read_forge_state(owner: str, repo: str, issue_number: int) -> ForgeState | None:
"""Load state for one issue, or None when no record exists."""
path = forge_state_path(owner, repo, issue_number)
try:
data = json.loads(path.read_text())
except FileNotFoundError:
return None
return ForgeState.from_dict(data)
def delete_forge_state(owner: str, repo: str, issue_number: int) -> None:
"""Remove an issue's record. Missing file is success (idempotent)."""
path = forge_state_path(owner, repo, issue_number)
path.unlink(missing_ok=True)
def all_forge_states() -> list[ForgeState]:
"""Every persisted record, for the orchestrate-status table and the
watchdog sweep. Unreadable files are skipped rather than aborting the
whole listing."""
root = _forge_root()
if not root.is_dir():
return []
states: list[ForgeState] = []
for path in sorted(root.glob("*/*/issue-*.json")):
try:
states.append(ForgeState.from_dict(json.loads(path.read_text())))
except (OSError, ValueError, TypeError):
continue
return states
+103
View File
@@ -0,0 +1,103 @@
"""Provenance footer (PRD forge-native-integration, chunk 5).
Every orchestrator-posted comment ends with this footer — non-optional
and not configurable off. It renders the run's audit trail (agent,
bottle, timing, exit, gitleaks, done-signal source, egress) as a
collapsed markdown block the reviewer sees at the moment of the merge
decision.
The function is pure: the orchestrator, which holds the run context,
supplies the values. In particular `egress_routes` is the pre-rendered
list of allowed-route lines the orchestrator computed from the run's
resolved egress policy — this module does not parse backend-specific
egress state. (The PRD sketch named an `egress_log_path`; passing the
already-rendered lines keeps the footer builder pure and fully testable
and leaves egress-state parsing where the data lives.)
"""
from __future__ import annotations
from datetime import datetime
def _parse(ts: str) -> datetime | None:
try:
return datetime.fromisoformat(ts)
except (ValueError, TypeError):
return None
def _format_duration(started_at: str, finished_at: str) -> str:
start = _parse(started_at)
end = _parse(finished_at)
if start is None or end is None:
return "unknown"
secs = int((end - start).total_seconds())
if secs < 0:
return "unknown"
if secs < 60:
return f"{secs}s"
return f"{secs // 60}m {secs % 60}s"
def build_provenance_footer(
slug: str,
*,
agent_name: str,
bottle_names: tuple[str, ...],
started_at: str,
finished_at: str,
exit_code: int,
watchdog_fired: bool = False,
gitleaks_clean: bool | None = None,
egress_routes: list[str] | None = None,
) -> str:
"""Return a markdown string for appending to a Gitea comment body.
`watchdog_fired=True` marks runs where the agent did not signal
completion, so reviewers know the audit trail may be incomplete.
`gitleaks_clean=None` renders the gitleaks row as "not run".
`egress_routes` is omitted entirely when None/empty.
"""
bottle_label = ", ".join(f"`{b}`" for b in bottle_names) if bottle_names else ""
exit_cell = f"{exit_code} {'' if exit_code == 0 else ''}"
if gitleaks_clean is None:
gitleaks_cell = "— not run"
elif gitleaks_clean:
gitleaks_cell = "✓ no secrets detected"
else:
gitleaks_cell = "✗ secrets detected"
if watchdog_fired:
done_cell = "watchdog — agent did not signal"
else:
done_cell = "sidecar `signal_done`"
lines = [
"<details><summary>🔬 Run provenance</summary>",
"",
"| Field | Value |",
"|---|---|",
f"| agent | `{agent_name}` |",
f"| bottle | {bottle_label} |",
f"| slug | `{slug}` |",
f"| started | {started_at} |",
f"| duration | {_format_duration(started_at, finished_at)} |",
f"| exit | {exit_cell} |",
f"| gitleaks | {gitleaks_cell} |",
f"| done signal | {done_cell} |",
]
if egress_routes:
lines.append("")
lines.append(
f"**Egress** (deny-by-default; {len(egress_routes)} "
f"route{'s' if len(egress_routes) != 1 else ''} allowed)"
)
for route in egress_routes:
lines.append(f"- {route}")
lines.append("")
lines.append("</details>")
return "\n".join(lines)
+10 -3
View File
@@ -238,7 +238,7 @@ class PiAgentProvider(AgentProvider):
if not agent.skills: if not agent.skills:
return return
skills_dir = _skills_dir(plan.guest_home) skills_dir = _skills_dir(plan.guest_home)
bottle.exec(f"mkdir -p {skills_dir}", user="root") bottle.exec(f"mkdir -p {shlex.quote(skills_dir)}", user="root")
for name in agent.skills: for name in agent.skills:
src = host_skill_dir(name) src = host_skill_dir(name)
if not os.path.isdir(src): if not os.path.isdir(src):
@@ -248,9 +248,13 @@ class PiAgentProvider(AgentProvider):
) )
dst = f"{skills_dir}/{name}" dst = f"{skills_dir}/{name}"
info(f"copying skill {name} into {bottle.name}:{dst}") info(f"copying skill {name} into {bottle.name}:{dst}")
bottle.exec(f"rm -rf {dst} && mkdir -p {dst}", user="root") # Defense in depth: skill names are validated kebab-case at
# manifest load, but quote the path so a future unvalidated
# field can't inject shell metacharacters here either.
dst_q = shlex.quote(dst)
bottle.exec(f"rm -rf {dst_q} && mkdir -p {dst_q}", user="root")
bottle.cp_in(f"{src}/.", f"{dst}/") bottle.cp_in(f"{src}/.", f"{dst}/")
bottle.exec(f"chown -R node:node {dst}", user="root") bottle.exec(f"chown -R node:node {dst_q}", user="root")
def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None: def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None:
prompt_path = _prompt_path(plan.guest_home) prompt_path = _prompt_path(plan.guest_home)
@@ -311,6 +315,9 @@ class PiAgentProvider(AgentProvider):
) -> None: ) -> None:
del plan, bottle, supervise_url del plan, bottle, supervise_url
def headless_prompt(self, prompt: str) -> list[str]:
return ["-p", prompt]
def _exec(bottle: "Bottle", script: str, error: str) -> None: def _exec(bottle: "Bottle", script: str, error: str) -> None:
result = bottle.exec(script, user="root") result = bottle.exec(script, user="root")
+11 -1
View File
@@ -8,7 +8,7 @@ from typing import cast
from .agent_provider import PROVIDER_TEMPLATES from .agent_provider import PROVIDER_TEMPLATES
from .manifest_util import ManifestError, as_json_object from .manifest_util import ManifestError, as_json_object
from .manifest_git import ManifestGitUser from .manifest_git import ManifestGitUser
from .manifest_schema import AGENT_MODEL_KEYS from .manifest_schema import AGENT_MODEL_KEYS, is_valid_entity_name
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -161,6 +161,16 @@ class ManifestAgent:
f"agent '{name}' skills[{i}] must be a string " f"agent '{name}' skills[{i}] must be a string "
f"(was {type(skill).__name__})" f"(was {type(skill).__name__})"
) )
# Skill names become host/guest path segments and are
# interpolated into provisioning shell commands, so they
# must fit the same kebab-case convention as bottle/agent
# filenames — rejecting anything that could break out of a
# path segment or inject shell metacharacters.
if not is_valid_entity_name(skill):
raise ManifestError(
f"agent '{name}' skills[{i}] {skill!r} is not a valid "
f"skill name; must match [a-z][a-z0-9-]*"
)
collected.append(skill) collected.append(skill)
skills = tuple(collected) skills = tuple(collected)
+8 -1
View File
@@ -33,13 +33,20 @@ AGENT_KEYS = (
AGENT_MODEL_KEYS = AGENT_KEYS | frozenset({"prompt"}) AGENT_MODEL_KEYS = AGENT_KEYS | frozenset({"prompt"})
def is_valid_entity_name(name: str) -> bool:
"""True if `name` fits the kebab-case `[a-z][a-z0-9-]*` convention
shared by bottle/agent filenames and skill names. Names that satisfy
this are also safe to interpolate into a host/guest path segment."""
return bool(_FILENAME_RX.match(name))
def entity_name_from_path(path: Path) -> str | None: def entity_name_from_path(path: Path) -> str | None:
"""Return the entity name implied by the filename, or None if the """Return the entity name implied by the filename, or None if the
filename does not fit the [a-z][a-z0-9-]* convention.""" filename does not fit the [a-z][a-z0-9-]* convention."""
if path.suffix != ".md": if path.suffix != ".md":
return None return None
stem = path.stem stem = path.stem
if not _FILENAME_RX.match(stem): if not is_valid_entity_name(stem):
return None return None
return stem return stem
@@ -0,0 +1,460 @@
# PRD prd-new: Forge native integration
- **Status:** Draft
- **Author:** claude
- **Created:** 2026-06-29
- **Issue:** #317
## Summary
Add a webhook-driven orchestration layer that lets Gitea issues and PR comments
drive bot-bottle sessions end-to-end with no operator in the loop for the happy
path. An issue assigned to a member of the configured agent org and labelled
with an agent name triggers a headless bottle launch; the bottle processes the
issue, opens a PR, and interacts with the forge through a **forge sidecar**
the agent never touches the Gitea API or its credentials directly. The agent
calls `signal_done(status, summary)` on the sidecar when a work unit is
complete; the sidecar relays that to the orchestrator over a queue dir (the same
pattern as the supervise sidecar), so completion is an unambiguous in-band
signal rather than a comment the orchestrator has to parse. The orchestrator
freezes the bottle and attaches a provenance footer. Subsequent PR comments
rehydrate the frozen bottle. The bottle is destroyed when the PR closes.
The forge sidecar is backed by a `Forge` abstract class with per-provider
implementations (Gitea first), so the agent's prompts and the sidecar protocol
stay forge-agnostic. The sidecar logs forge operations semantically ("read PR
description", "posted comment", "signalled done"), giving richer provenance than
post-hoc egress-byte parsing, and enforces a **read-anywhere / write-scoped**
permission model: the agent may read for context but may only write to the
issue and PRs it was assigned.
The separation of concerns across the two layers: bot-bottle owns the headless
launch primitives, the forge sidecar + `Forge` abstraction, forge state, and the
provenance builder. `bot-bottle-orchestrator` (separate binary) owns the webhook
listener, bottle lifecycle loop, and monitoring dashboard; it calls into
bot-bottle via `./cli.py orchestrate`, a thin wrapper command. This PRD covers
bot-bottle's side of that contract.
## Problem
Today an operator must open the TUI, select an agent and bottle, confirm the
preflight, and type prompts interactively. This blocks "issue → PR" automation
and produces no durable audit record of what the agent did. The security model
already provides the right isolation and egress controls, and `start --headless`
(#315) already gives `bot-bottle-orchestrator` a non-interactive launch path.
The missing pieces are a headless `resume` counterpart for rehydrating frozen
bottles, a forge-interaction surface the agent uses to read context, post
comments, and signal completion, and the provenance trail that makes the audit
story legible to reviewers on every PR.
That forge-interaction surface could be built two ways: (2) give the agent the
Gitea API directly with cred-proxy injecting the token, or (3) put a forge
sidecar between the agent and the forge. This PRD takes **option 3**. The
deciding factors: a sidecar `signal_done` call is an unambiguous completion
signal where comment-parsing is a correctness risk that surfaces in production;
the sidecar produces a semantic audit trail rather than HTTP bytes, which is
load-bearing for provenance (the stated product priority); and the sidecar can
enforce scope tighter than repo-wide API-key permissions, reducing blast radius
for a prompt-injected agent. The costs — a second sidecar process per forge run,
a new failure mode if it crashes, and per-forge implementation cost — are
accepted as the price of those properties.
## Goals / Success Criteria
1. Headless launch already exists: `./cli.py start <agent> --headless --prompt`
(#315) runs non-interactively with no TUI selectors or y/N preflight. This
PRD builds on it rather than re-introducing it. The remaining gap is a
matching headless `resume` path (`./cli.py resume --headless`), since
rehydrating a frozen bottle for a new prompt is required by the freeze /
rehydrate loop and `resume` has no non-interactive entry point today.
2. An issue assigned to a member of the configured org (`FORGE_ORG`, default
`bot-bottle`) and labelled `bot-bottle:<agent-name>` is the trigger
convention. Org membership is verified via the Gitea API at event time.
3. Forge-targeted bottles run a **forge sidecar** that exposes a small,
forge-agnostic API (comment/issue/PR CRUD plus `signal_done`) over the same
queue-dir + HTTP/JSON-RPC machinery as the supervise sidecar. The agent calls
the sidecar; it never sees the forge token or forge-specific endpoints.
4. The sidecar is backed by a `Forge` abstract class. Gitea is the first
concrete implementation; adding a forge means a new subclass, not changes to
the agent prompt or sidecar protocol. The sidecar enforces a read-anywhere /
write-scoped model: writes are limited to the assigned issue and its PRs;
reads are unrestricted for context.
5. The agent calls `signal_done(status, summary)` on the sidecar when a work
unit is complete; the sidecar relays it to the orchestrator over a queue dir.
This is the done signal — no comment parsing. A watchdog timeout
(configurable, default 30 min) causes the orchestrator to treat the run as
done-without-self-report if the agent exits without signalling.
6. Every orchestrator-posted comment ends with a provenance footer: agent name,
bottle name(s), slug, start time, duration, exit code, gitleaks result, and
egress summary.
7. Forge state (issue → slug, status) is persisted to disk and survives
orchestrator restarts.
8. `./cli.py orchestrate status` lists active forge-managed bottles and their
issue/PR URLs.
9. Unit tests cover: label parsing, org-membership check path, forge state
read/write, provenance footer rendering, headless launch arg construction,
forge env var injection, sidecar request dispatch through the `Forge`
abstraction, write-scope enforcement (reject writes outside the assigned
issue/PRs), and `signal_done` queue relay.
## Non-goals
- Webhook signature verification (HMAC-SHA256). Added as a follow-up.
- The `bot-bottle-orchestrator` binary itself — this PRD covers bot-bottle's
side of the interface only. The orchestrator is a separate project.
- GitHub or GitLab support.
- Multiple simultaneous forge bottles per issue.
- Automatic retry on agent error exit.
- Bottle destruction on issue close (PR close only; issue close is ambiguous).
- Concurrent multi-issue handling (one blocking run per orchestrator process).
- A monitoring dashboard (orchestrator-side concern).
- Folding `DeployKeyProvisioner` into the `Forge` abstraction. Deploy-key
provisioning runs at bottle-provision time on the host; the forge sidecar runs
inside the bottle at agent time. The two have different lifecycles and actors,
so coupling them into one class is deferred to a follow-up. This PRD only
shares the Gitea HTTP client between them.
## Design
### Targeting convention
An issue is forge-targeted when **both** hold:
- At least one assignee is a member of the Gitea org named by `FORGE_ORG`
(default `bot-bottle`). Checked via `GET /api/v1/orgs/{org}/members/{user}`.
- At least one label has the prefix `bot-bottle:`. The suffix names the agent
manifest, e.g. `bot-bottle:implementer` → agent `implementer`.
`FORGE_ORG` is read at orchestrate-command startup. It is not embedded in
manifests or state files; the orchestrator stamps its value into log output for
auditability.
An optional label `bot-bottle-bottle:<name>` overrides bottle selection. When
absent the agent's default bottle is used.
### `./cli.py orchestrate` — the thin wrapper
```
./cli.py orchestrate start --agent AGENT [--bottle BOTTLE ...] --prompt PROMPT
[--label LABEL] [--backend BACKEND]
./cli.py orchestrate resume --slug SLUG --prompt PROMPT [--backend BACKEND]
./cli.py orchestrate status
```
`orchestrate start` is a thin shim over the already-shipped `start --headless`
(#315): it forwards agent / bottle / label / prompt and adds the forge-specific
wiring (`forge_env`, sidecar launch). It does not re-implement headless launch.
The caller (`bot-bottle-orchestrator`) manages freeze, state, and the forge
sidecar's done signal around it.
`orchestrate resume` is the shim over the new `resume --headless` (below).
`orchestrate status` prints the forge state table.
### Headless primitives — what exists vs. what's new
Headless **start** already shipped in #315 and this PRD reuses it as-is:
- `./cli.py start <agent> --headless --prompt TEXT` — no TUI selectors, no y/N
preflight. Internally `_start_headless()` calls the shared `_launch_bottle()`
with `assume_yes=True` and `headless_prompt_text=prompt`.
- The prompt is delivered through `AgentProvider.headless_prompt(prompt)`
claude `-p`, codex positional, pi `-p`. The orchestrator does **not** hand-roll
agent args; it relies on this provider abstraction. (An earlier draft proposed
`start_headless` / `attach_agent_headless` helpers that constructed
`--no-interactive`/`-p` directly — those are dropped as redundant with, and
divergent from, what #315 merged.)
Two additions are needed on top of #315:
**1. A `forge_env` hook on the headless launch path.** The orchestrator needs to
pass forge context + token through to the forge sidecar launched alongside the
agent. This is a parameter threaded into `_launch_bottle` (the same core
`start --headless` already uses), not a parallel launch function. The agent
process itself does not receive the token.
**2. `resume --headless`** — new in `bot_bottle/cli/resume.py`, mirroring the
`--headless` flag on `start`:
```
./cli.py resume <slug> --headless --prompt TEXT
```
It rehydrates a frozen bottle and runs one headless prompt via the same
`assume_yes` + `headless_prompt` path, returning the agent's exit code. `resume`
has no non-interactive entry point today, so this is genuinely new work rather
than a rename of an existing helper.
### Forge sidecar
Forge-targeted bottles run a forge sidecar alongside the agent, mirroring the
supervise sidecar: a per-bottle process that exposes an HTTP/JSON-RPC endpoint
over a Unix socket and relays events to the orchestrator through a queue dir.
The agent calls the sidecar; the sidecar holds the forge token and makes the
actual forge API calls. The agent never receives the credential and never sees a
forge-specific endpoint — swapping Gitea for another forge does not change the
agent prompt or the sidecar protocol.
The sidecar is configured at launch from the forge context (owner, repo, issue,
PR) and the token, supplied by the orchestrator — not baked into the agent
manifest. Because the sidecar owns the token, forge traffic does not need a
cred-proxy egress route on the agent; the agent's egress policy is unchanged by
forge targeting.
**Sidecar protocol** (forge-agnostic; each method maps to a `Forge` call):
| Method | Scope | Purpose |
|---|---|---|
| `read_issue(number)` | read-anywhere | Read issue/PR body for context |
| `read_comments(number)` | read-anywhere | Read a thread for context |
| `post_comment(number, body)` | write-scoped | Post to the assigned issue/PR |
| `update_description(number, body)` | write-scoped | Edit the assigned issue/PR body |
| `signal_done(status, summary)` | — | Relay completion to the orchestrator |
**Scope enforcement** is read-anywhere / write-scoped: read methods accept any
issue/PR number for context; write methods are rejected unless the target is the
assigned issue or one of its PRs. This is tighter than Gitea's repo-wide API-key
permissions and bounds the blast radius of a prompt-injected agent. Rejections
are logged semantically (operation, target, reason) so the audit trail records
attempted out-of-scope writes, not just allowed ones.
**Semantic audit**: every sidecar call is logged as a structured operation
("read PR #318 description", "posted comment to #317", "signalled done:
success") rather than as opaque HTTP bytes. This log feeds provenance directly,
with no post-hoc egress-log parsing.
### `Forge` abstraction — `bot_bottle/contrib/forge/`
The sidecar dispatches to a `Forge` abstract class. Each provider implements the
operations behind the sidecar protocol:
```python
class Forge(abc.ABC):
@abc.abstractmethod
def read_issue(self, number: int) -> Issue: ...
@abc.abstractmethod
def read_comments(self, number: int) -> list[Comment]: ...
@abc.abstractmethod
def post_comment(self, number: int, body: str) -> None: ...
@abc.abstractmethod
def update_description(self, number: int, body: str) -> None: ...
@abc.abstractmethod
def is_org_member(self, org: str, username: str) -> bool: ...
@abc.abstractmethod
def get_pr_for_issue(self, number: int) -> int | None: ...
@abc.abstractmethod
def is_pr_open(self, number: int) -> bool: ...
```
`GiteaForge` is the first and only concrete implementation in this PRD. It wraps
the Gitea HTTP client (below). Adding GitHub or GitLab later is a new subclass;
the sidecar, protocol, and agent prompt are untouched.
> **Deferred:** `DeployKeyProvisioner` is *not* folded into `Forge` here.
> Deploy-key provisioning runs on the host at provision time; the sidecar runs
> in the bottle at agent time. They have different lifecycles and actors, so a
> shared abstract base would couple two unrelated auth contexts. For now they
> only share the Gitea HTTP client; a later PRD can revisit unification.
### Forge env vars
The orchestrator passes forge context to the **sidecar** (not the agent) at
launch. The agent does not need owner/repo/issue env vars to construct API
calls, since it only names issue/PR numbers to the sidecar:
| Var | Example | Purpose |
|---|---|---|
| `FORGE_GITEA_API` | `https://gitea.dideric.is/api/v1` | Base URL the sidecar calls |
| `FORGE_OWNER` | `didericis` | Repo owner |
| `FORGE_REPO` | `bot-bottle` | Repo name |
| `FORGE_ISSUE_NUMBER` | `317` | Assigned issue (defines write scope) |
| `FORGE_PR_NUMBER` | `318` | Assigned PR (empty until PR exists) |
The agent's forge-specific prompt instructs it to call `signal_done` on the
sidecar when a work unit is complete, and to use the sidecar for any
comment/description writes. The instruction is forge-agnostic and is part of the
forge prompt overlay, not the base agent manifest, so non-forge runs are
unaffected.
### Done signal and watchdog
The agent calls `signal_done(status, summary)` on the sidecar when it finishes a
work unit. The sidecar writes the event to its queue dir; the orchestrator reads
it and:
1. Reads the forge state for `(owner, repo, issue_number)`.
2. If `status == "running"`, treats the event as the done signal: freezes the
bottle, posts a summary comment with the provenance footer, sets
`status = "frozen"`.
Because completion is an explicit `signal_done` call, the orchestrator does not
parse comment text to detect "done", and intermediate comments the agent posts
mid-run cannot be mistaken for completion.
**Watchdog**: the orchestrator tracks `last_checkin_at` in forge state, updated
on each sidecar event. A background thread wakes every minute. If
`now - last_checkin_at > FORGE_WATCHDOG_TIMEOUT` (default 30 min, configurable
via env) and `status == "running"`, the orchestrator treats the run as
done-without-self-report: it posts the provenance footer (with `watchdog_fired`
set) and freezes the bottle.
**Sidecar-death failure mode**: if the forge sidecar crashes mid-run the agent
loses forge access while the bottle is otherwise healthy. The orchestrator
detects a dead sidecar (socket/queue gone) the same way it detects a stalled
agent and falls back to the watchdog path, posting a footer that flags the
incomplete run.
### Forge state — `bot_bottle/contrib/gitea/forge_state.py`
```
~/.bot-bottle/forge/
<owner>/
<repo>/
issue-<n>.json
```
Schema:
```json
{
"slug": "implementer-abc12",
"pr_number": 42,
"agent_name": "implementer",
"bottle_names": ["claude"],
"backend_name": "docker",
"agent_git_user": "didericis-claude",
"issue_number": 17,
"owner": "didericis",
"repo": "bot-bottle",
"status": "frozen",
"last_checkin_at": "2026-06-29T12:04:12-04:00"
}
```
`status`: `"running"` | `"frozen"` | `"destroyed"`.
Public API:
```python
def write_forge_state(state: ForgeState) -> None: ...
def read_forge_state(owner: str, repo: str, issue_number: int) -> ForgeState | None: ...
def delete_forge_state(owner: str, repo: str, issue_number: int) -> None: ...
def all_forge_states() -> list[ForgeState]: ...
```
Writes use atomic rename (`os.replace`) for crash safety.
### Provenance — `bot_bottle/contrib/gitea/provenance.py`
```python
def build_provenance_footer(
slug: str,
*,
agent_name: str,
bottle_names: tuple[str, ...],
started_at: str,
finished_at: str,
exit_code: int,
watchdog_fired: bool = False,
egress_log_path: Path | None = None,
) -> str:
"""Return a markdown string for appending to a Gitea comment body."""
```
Output (collapsed by default):
```markdown
<details><summary>🔬 Run provenance</summary>
| Field | Value |
|---|---|
| agent | `implementer` |
| bottle | `claude` |
| slug | `implementer-abc12` |
| started | 2026-06-29T12:00:00-04:00 |
| duration | 4m 12s |
| exit | 0 ✓ |
| gitleaks | ✓ no secrets detected |
| done signal | sidecar `signal_done` *(or: watchdog — agent did not signal)* |
**Egress** (deny-by-default; 2 routes allowed)
- `api.anthropic.com` — Bearer auth
- `pypi.org` — unauthenticated
Forge traffic is not an agent egress route — the forge sidecar holds the token
and makes those calls out of band. The provenance footer's forge operations come
from the sidecar's semantic audit log.
</details>
```
The egress summary is read from `~/.bot-bottle/state/<slug>/egress/`. When
unavailable the section is omitted. `watchdog_fired=True` changes the
"done signal" row to warn reviewers.
### Gitea HTTP client — `bot_bottle/contrib/gitea/client.py`
`GiteaForge` (and the existing `GiteaDeployKeyProvisioner`) share one thin HTTP
client. Unlike the option-2 design, the token is held by the sidecar process and
passed to the client directly — there is no agent-side cred-proxy route to
inject it, because the agent never makes forge calls.
```python
class GiteaClient:
def __init__(self, *, api_url: str, owner: str, repo: str, token: str) -> None: ...
def is_org_member(self, org: str, username: str) -> bool: ...
def post_comment(self, issue_number: int, body: str) -> None: ...
def update_comment_body(self, issue_number: int, body: str) -> None: ...
def get_pr_for_issue(self, issue_number: int) -> int | None: ...
def is_pr_open(self, pr_number: int) -> bool: ...
```
Sharing only the HTTP client (not an abstract base) is the deliberate boundary
between the sidecar and the deploy-key provisioner — see the deferral note under
the `Forge` abstraction.
### Implementation chunks
1. **Headless additions on top of #315** — thread a `forge_env` parameter into
the existing `_launch_bottle` core (the one `start --headless` already uses);
add a `--headless` path to `cli/resume.py` reusing `assume_yes` +
`headless_prompt`. No new `start_headless`/`attach_agent_headless` helpers.
Tests: `forge_env` reaches the sidecar/`guest_env`; `resume --headless` skips
the TUI and y/N preflight and returns the agent exit code.
2. **Forge state**`contrib/gitea/forge_state.py`: `ForgeState` dataclass,
read/write/delete/all helpers, atomic rename. Tests: round-trip JSON, missing
file → None, atomic write.
3. **`Forge` abstraction + Gitea client** — `contrib/forge/base.py` (`Forge`
ABC) and `contrib/gitea/client.py` + `GiteaForge`: `is_org_member`,
`read_issue`, `read_comments`, `post_comment`, `update_description`,
`get_pr_for_issue`, `is_pr_open`. Tests: mock `urllib.request.urlopen`,
assert payloads and 404-as-false for membership.
4. **Forge sidecar** — sidecar process exposing the protocol over a Unix socket,
queue-dir relay, write-scope enforcement, semantic op log, `signal_done`.
Reuses the supervise sidecar bundle machinery. Tests: dispatch each method to
the `Forge`, reject out-of-scope writes, `signal_done` writes a queue event,
scope-rejection is logged.
5. **Provenance**`contrib/gitea/provenance.py`: `build_provenance_footer`.
Tests: required fields present, watchdog row text, egress omitted when log
absent.
6. **`./cli.py orchestrate`** — `cli/orchestrate.py` with `start`, `resume`,
`status` subcommands wired into `cli.py`; `start` launches the forge sidecar
alongside the agent for forge-targeted runs. Tests: arg parsing, `start`
delegates to `start --headless`, `resume` delegates to `resume --headless`.
## Provenance as the product
Every orchestrator-posted comment ends with the provenance footer — non-optional
and not configurable off. PRs that land without a footer were not produced by
this integration. The `watchdog_fired` flag in the footer flags runs where the
agent did not self-report completion, so reviewers know the audit trail may be
incomplete.
The footer links to the bot-bottle repo pinned to the commit SHA active during
the run (not `main`), so the policy that governed the run is permanently
anchored in the PR history.
+74
View File
@@ -0,0 +1,74 @@
"""Unit: `cli.py resume --headless` non-interactive rehydrate path.
The freeze / rehydrate loop needs a non-interactive `resume`: deliver a
follow-up prompt and skip the y/N preflight, reusing the same launch
core (`assume_yes` + `headless_prompt_text`) as `start --headless`.
"""
from __future__ import annotations
import unittest
from unittest.mock import MagicMock, patch
import bot_bottle.cli.resume as resume_mod
from bot_bottle.log import Die
def _metadata():
md = MagicMock()
md.agent_name = "implementer"
md.copy_cwd = False
md.cwd = "/repo"
md.identity = "implementer-abc12"
md.bottle_names = ["claude"]
md.backend = "docker"
return md
class ResumeHeadlessTest(unittest.TestCase):
def setUp(self) -> None:
self._launch = patch.object(
resume_mod, "_launch_bottle", return_value=0
).start()
patch.object(
resume_mod, "read_metadata", return_value=_metadata()
).start()
manifest = MagicMock()
manifest.require_agent = MagicMock(return_value=None)
patch.object(
resume_mod.ManifestIndex, "resolve", return_value=manifest
).start()
self.addCleanup(patch.stopall)
def _launch_kwargs(self) -> dict:
self._launch.assert_called_once()
return self._launch.call_args.kwargs
def test_headless_passes_assume_yes_and_prompt(self):
rc = resume_mod.cmd_resume(
["implementer-abc12", "--headless", "--prompt", "Address the review"]
)
self.assertEqual(0, rc)
kwargs = self._launch_kwargs()
self.assertTrue(kwargs["assume_yes"])
self.assertEqual("Address the review", kwargs["headless_prompt_text"])
def test_interactive_resume_unchanged(self):
resume_mod.cmd_resume(["implementer-abc12"])
kwargs = self._launch_kwargs()
self.assertFalse(kwargs["assume_yes"])
self.assertEqual("", kwargs["headless_prompt_text"])
def test_headless_without_prompt_errors(self):
with self.assertRaises(Die):
resume_mod.cmd_resume(["implementer-abc12", "--headless"])
self._launch.assert_not_called()
def test_prompt_without_headless_errors(self):
with self.assertRaises(Die):
resume_mod.cmd_resume(["implementer-abc12", "--prompt", "hi"])
self._launch.assert_not_called()
if __name__ == "__main__":
unittest.main()
+188
View File
@@ -0,0 +1,188 @@
"""Unit: `cli.py start --headless` non-interactive launch path.
Headless is the keystone for orchestrators, CI, and webhook
dispatch: agent/bottles/label come from flags + manifest defaults, no
TUI selectors fire, and the preflight y/N is auto-confirmed
(`assume_yes=True`). All actual launch work is stubbed so no container
is created.
"""
from __future__ import annotations
import os
import unittest
from unittest.mock import MagicMock, patch
import bot_bottle.cli.start as start_mod
import bot_bottle.cli.tui as tui_mod
from bot_bottle.backend import ActiveAgent
from bot_bottle.log import Die
from bot_bottle.manifest import ManifestError
def _make_manifest(
agent_names: list[str],
bottle_names: list[str] | None = None,
agent_bottle: str = "",
):
manifest = MagicMock()
manifest.agents = {name: MagicMock(bottle=agent_bottle) for name in agent_names}
manifest.all_agent_names = sorted(agent_names)
manifest.all_bottle_names = sorted(bottle_names or [])
manifest.home_md = None # eager mode so _peek_agent_bottle uses agents dict
manifest.require_agent = MagicMock(return_value=None)
return manifest
def _active_agent(slug: str) -> ActiveAgent:
return ActiveAgent(
backend_name="docker",
slug=slug,
agent_name="demo",
started_at="2026-01-01T00:00:00+00:00",
services=(),
)
class TestCmdStartHeadless(unittest.TestCase):
"""Drive `cmd_start --headless` with launch + TUI stubbed out."""
def setUp(self):
self._manifest = _make_manifest(
["researcher", "implementer"], ["claude", "dev"], agent_bottle="claude"
)
patch(
"bot_bottle.cli.start.ManifestIndex.resolve",
return_value=self._manifest,
).start()
self._launch_mock = patch(
"bot_bottle.cli.start._launch_bottle", return_value=0
).start()
# No bottles running by default → no label collision.
patch(
"bot_bottle.cli.start.enumerate_active_agents", return_value=[]
).start()
# If any TUI picker fires in headless mode, that's a bug.
self._agent_picker = patch.object(tui_mod, "filter_select").start()
self._bottle_picker = patch.object(tui_mod, "filter_multiselect").start()
self._modal = patch.object(tui_mod, "name_color_modal").start()
patch.dict(os.environ, {}, clear=False).start()
os.environ.pop("BOT_BOTTLE_BACKEND", None)
self.addCleanup(patch.stopall)
def _spec(self):
self._launch_mock.assert_called_once()
return self._launch_mock.call_args[0][0]
# -- no TUI in headless --------------------------------------------
def test_headless_fires_no_pickers(self):
rc = start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude", "--prompt", "Do it"]
)
self.assertEqual(0, rc)
self._agent_picker.assert_not_called()
self._bottle_picker.assert_not_called()
self._modal.assert_not_called()
def test_headless_assume_yes_forwarded(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude", "--prompt", "Do it"]
)
self.assertTrue(self._launch_mock.call_args[1]["assume_yes"])
# -- prompt --------------------------------------------------------
def test_headless_without_prompt_dies(self):
with self.assertRaises(Die):
start_mod.cmd_start(["--headless", "researcher", "--bottle", "claude"])
self._launch_mock.assert_not_called()
def test_headless_prompt_forwarded_to_launch(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude",
"--prompt", "Implement issue #42"]
)
self.assertEqual(
"Implement issue #42",
self._launch_mock.call_args[1]["headless_prompt_text"],
)
# -- bottle resolution ---------------------------------------------
def test_explicit_bottles_forwarded_in_order(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "dev", "--bottle", "claude",
"--prompt", "Do it"]
)
self.assertEqual(("dev", "claude"), self._spec().bottle_names)
def test_omitted_bottle_falls_back_to_agent_default(self):
start_mod.cmd_start(["--headless", "implementer", "--prompt", "Do it"])
self.assertEqual(("claude",), self._spec().bottle_names)
def test_no_bottle_and_no_default_dies(self):
manifest = _make_manifest(["researcher"], ["claude"], agent_bottle="")
with patch(
"bot_bottle.cli.start.ManifestIndex.resolve", return_value=manifest
):
with self.assertRaises(Die):
start_mod.cmd_start(
["--headless", "researcher", "--prompt", "Do it"]
)
self._launch_mock.assert_not_called()
# -- agent resolution ----------------------------------------------
def test_missing_agent_name_dies(self):
with self.assertRaises(Die):
start_mod.cmd_start(["--headless"])
self._launch_mock.assert_not_called()
def test_unknown_agent_raises_manifest_error(self):
self._manifest.require_agent.side_effect = ManifestError("agent 'x' not defined")
with self.assertRaises(ManifestError):
start_mod.cmd_start(
["--headless", "x", "--bottle", "claude", "--prompt", "Do it"]
)
self._launch_mock.assert_not_called()
# -- label / color -------------------------------------------------
def test_label_defaults_to_agent_name(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude", "--prompt", "Do it"]
)
self.assertEqual("researcher", self._spec().label)
def test_explicit_label_and_color_forwarded(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude",
"--label", "nightly", "--color", "green", "--prompt", "Do it"]
)
spec = self._spec()
self.assertEqual("nightly", spec.label)
self.assertEqual("green", spec.color)
def test_label_collision_uniquifies(self):
with patch(
"bot_bottle.cli.start.enumerate_active_agents",
return_value=[_active_agent("researcher")],
):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude", "--prompt", "Do it"]
)
self.assertEqual("researcher-2", self._spec().label)
# -- backend wiring ------------------------------------------------
def test_backend_flag_forwarded(self):
start_mod.cmd_start(
["--headless", "--backend=docker", "researcher", "--bottle", "claude",
"--prompt", "Do it"]
)
self.assertEqual("docker", self._launch_mock.call_args[1]["backend_name"])
if __name__ == "__main__":
unittest.main()
@@ -343,5 +343,14 @@ class TestClaudeSuperviseMcp(unittest.TestCase):
) )
class TestClaudeHeadlessPrompt(unittest.TestCase):
def test_returns_p_flag_and_prompt(self):
self.assertEqual(["-p", "Do the task"], ClaudeAgentProvider().headless_prompt("Do the task"))
def test_preserves_prompt_text_verbatim(self):
text = "Fix issue #42: the widget breaks on empty input"
self.assertEqual(["-p", text], ClaudeAgentProvider().headless_prompt(text))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
@@ -314,5 +314,14 @@ class TestCodexSuperviseMcp(unittest.TestCase):
) )
class TestCodexHeadlessPrompt(unittest.TestCase):
def test_returns_prompt_as_positional_arg(self):
self.assertEqual(["Do the task"], CodexAgentProvider().headless_prompt("Do the task"))
def test_preserves_prompt_text_verbatim(self):
text = "Fix issue #42: the widget breaks on empty input"
self.assertEqual([text], CodexAgentProvider().headless_prompt(text))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+95
View File
@@ -0,0 +1,95 @@
"""Unit: Forge abstraction + ScopedForge (PRD forge-native-integration)."""
from __future__ import annotations
import unittest
from bot_bottle.contrib.forge.base import (
Comment,
Forge,
ForgeScopeError,
Issue,
ScopedForge,
)
class _RecordingForge(Forge):
"""In-memory fake that records writes."""
def __init__(self) -> None:
self.comments: list[tuple[int, str]] = []
self.descriptions: list[tuple[int, str]] = []
def read_issue(self, number: int) -> Issue:
return Issue(number=number, title="t", body="b", state="open")
def read_comments(self, number: int) -> list[Comment]:
return [Comment(id=1, user="alice", body="hi")]
def post_comment(self, number: int, body: str) -> None:
self.comments.append((number, body))
def update_description(self, number: int, body: str) -> None:
self.descriptions.append((number, body))
def is_org_member(self, org: str, username: str) -> bool:
return username == "member"
def get_pr_for_issue(self, number: int) -> int | None:
return 99 if number == 17 else None
def is_pr_open(self, number: int) -> bool:
return True
class TestScopedForgeReads(unittest.TestCase):
def setUp(self) -> None:
self.inner = _RecordingForge()
self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42])
def test_reads_pass_through_to_any_number(self):
# A number well outside the writable scope still reads fine.
self.assertEqual(123, self.scoped.read_issue(123).number)
self.assertEqual("alice", self.scoped.read_comments(500)[0].user)
def test_membership_and_pr_lookups_delegate(self):
self.assertTrue(self.scoped.is_org_member("bot-bottle", "member"))
self.assertFalse(self.scoped.is_org_member("bot-bottle", "stranger"))
self.assertEqual(99, self.scoped.get_pr_for_issue(17))
self.assertTrue(self.scoped.is_pr_open(8000))
class TestScopedForgeWrites(unittest.TestCase):
def setUp(self) -> None:
self.inner = _RecordingForge()
self.scoped = ScopedForge(self.inner, assigned_issue=17, assigned_prs=[42])
def test_writable_set_is_issue_plus_prs(self):
self.assertEqual(frozenset({17, 42}), self.scoped.writable)
def test_write_to_assigned_issue_allowed(self):
self.scoped.post_comment(17, "done")
self.assertEqual([(17, "done")], self.inner.comments)
def test_write_to_assigned_pr_allowed(self):
self.scoped.update_description(42, "new body")
self.assertEqual([(42, "new body")], self.inner.descriptions)
def test_comment_outside_scope_rejected(self):
with self.assertRaises(ForgeScopeError) as ctx:
self.scoped.post_comment(500, "spam")
self.assertIn("500", str(ctx.exception))
self.assertEqual([], self.inner.comments)
def test_description_outside_scope_rejected(self):
with self.assertRaises(ForgeScopeError):
self.scoped.update_description(500, "tamper")
self.assertEqual([], self.inner.descriptions)
def test_scope_error_is_permission_error(self):
# Sidecars can catch the stdlib base type.
self.assertTrue(issubclass(ForgeScopeError, PermissionError))
if __name__ == "__main__":
unittest.main()
+131
View File
@@ -0,0 +1,131 @@
"""Unit: GiteaClient + GiteaForge (PRD forge-native-integration)."""
from __future__ import annotations
import json
import unittest
import urllib.error
from io import BytesIO
from unittest.mock import MagicMock, patch
from bot_bottle.contrib.gitea.client import GiteaClient, GiteaForge
def _client() -> GiteaClient:
return GiteaClient(
api_url="https://gitea.example.com/api/v1",
owner="didericis",
repo="bot-bottle",
token="test-token",
)
def _resp(body, status: int = 200) -> MagicMock:
resp = MagicMock()
resp.read.return_value = json.dumps(body).encode() if body is not None else b""
resp.status = status
resp.__enter__ = lambda s: s
resp.__exit__ = MagicMock(return_value=False)
return resp
def _http_error(code: int, body: str = "") -> urllib.error.HTTPError:
return urllib.error.HTTPError(
url="http://x", code=code, msg="err", hdrs=None, # type: ignore[arg-type]
fp=BytesIO(body.encode()),
)
_URLOPEN = "bot_bottle.contrib.gitea.client.urllib.request.urlopen"
class TestOrgMembership(unittest.TestCase):
def test_member_returns_true_on_2xx(self):
with patch(_URLOPEN, return_value=_resp(None, 204)) as m:
self.assertTrue(_client().is_org_member("bot-bottle", "alice"))
req = m.call_args.args[0]
self.assertIn("/orgs/bot-bottle/members/alice", req.full_url)
def test_nonmember_returns_false_on_404(self):
with patch(_URLOPEN, side_effect=_http_error(404)):
self.assertFalse(_client().is_org_member("bot-bottle", "stranger"))
def test_other_http_error_raises(self):
with patch(_URLOPEN, side_effect=_http_error(403, "forbidden")):
with self.assertRaises(RuntimeError) as ctx:
_client().is_org_member("bot-bottle", "alice")
self.assertIn("403", str(ctx.exception))
class TestForgeReads(unittest.TestCase):
def test_read_issue_maps_fields(self):
raw = {"number": 17, "title": "Bug", "body": "broken", "state": "open"}
with patch(_URLOPEN, return_value=_resp(raw)) as m:
issue = GiteaForge(_client()).read_issue(17)
self.assertEqual((17, "Bug", "broken", "open"),
(issue.number, issue.title, issue.body, issue.state))
self.assertIn("/repos/didericis/bot-bottle/issues/17",
m.call_args.args[0].full_url)
def test_read_issue_tolerates_null_body(self):
raw = {"number": 17, "title": "T", "body": None, "state": "open"}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertEqual("", GiteaForge(_client()).read_issue(17).body)
def test_read_comments_maps_user_login(self):
raw = [
{"id": 1, "user": {"login": "alice"}, "body": "hi"},
{"id": 2, "user": {"login": "bob"}, "body": "yo"},
]
with patch(_URLOPEN, return_value=_resp(raw)):
comments = GiteaForge(_client()).read_comments(17)
self.assertEqual(["alice", "bob"], [c.user for c in comments])
self.assertEqual([1, 2], [c.id for c in comments])
class TestForgeWrites(unittest.TestCase):
def test_post_comment_payload_and_url(self):
with patch(_URLOPEN, return_value=_resp(None, 201)) as m:
GiteaForge(_client()).post_comment(17, "done ✓")
req = m.call_args.args[0]
self.assertEqual("POST", req.method)
self.assertIn("/repos/didericis/bot-bottle/issues/17/comments", req.full_url)
self.assertEqual("done ✓", json.loads(req.data)["body"])
def test_update_description_patches_issue(self):
with patch(_URLOPEN, return_value=_resp(None, 200)) as m:
GiteaForge(_client()).update_description(17, "edited")
req = m.call_args.args[0]
self.assertEqual("PATCH", req.method)
self.assertTrue(req.full_url.endswith("/issues/17"))
self.assertEqual("edited", json.loads(req.data)["body"])
def test_auth_header_sent(self):
with patch(_URLOPEN, return_value=_resp(None, 201)) as m:
GiteaForge(_client()).post_comment(17, "x")
self.assertEqual("token test-token",
m.call_args.args[0].headers["Authorization"])
class TestPRHelpers(unittest.TestCase):
def test_get_pr_for_issue_returns_number_when_issue_is_pr(self):
raw = {"number": 18, "pull_request": {"merged": False}}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertEqual(18, GiteaForge(_client()).get_pr_for_issue(18))
def test_get_pr_for_issue_none_for_plain_issue(self):
raw = {"number": 17, "pull_request": None}
with patch(_URLOPEN, return_value=_resp(raw)):
self.assertIsNone(GiteaForge(_client()).get_pr_for_issue(17))
def test_is_pr_open_true_when_state_open(self):
with patch(_URLOPEN, return_value=_resp({"state": "open"})):
self.assertTrue(GiteaForge(_client()).is_pr_open(18))
def test_is_pr_open_false_when_closed(self):
with patch(_URLOPEN, return_value=_resp({"state": "closed"})):
self.assertFalse(GiteaForge(_client()).is_pr_open(18))
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,103 @@
"""Unit: forge state persistence (PRD forge-native-integration)."""
from __future__ import annotations
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from bot_bottle.contrib.gitea import forge_state as fs
from bot_bottle.contrib.gitea.forge_state import (
STATUS_FROZEN,
STATUS_RUNNING,
ForgeState,
)
def _state(**over) -> ForgeState:
base = {
"owner": "didericis",
"repo": "bot-bottle",
"issue_number": 17,
"slug": "implementer-abc12",
"agent_name": "implementer",
"bottle_names": ["claude"],
"backend_name": "docker",
"agent_git_user": "didericis-claude",
"pr_number": 42,
"status": STATUS_FROZEN,
"last_checkin_at": "2026-06-29T12:04:12-04:00",
}
base.update(over)
return ForgeState(**base)
class ForgeStateTest(unittest.TestCase):
def setUp(self) -> None:
# enterContext handles cleanup; pylint doesn't recognize it as CM-aware.
root = Path(self.enterContext( # pylint: disable=consider-using-with
tempfile.TemporaryDirectory()))
patcher = patch.object(fs, "bot_bottle_root", return_value=root)
patcher.start()
self.addCleanup(patcher.stop)
def test_round_trip(self):
fs.write_forge_state(_state())
got = fs.read_forge_state("didericis", "bot-bottle", 17)
self.assertEqual(_state(), got)
def test_missing_returns_none(self):
self.assertIsNone(fs.read_forge_state("nobody", "nope", 1))
def test_path_layout(self):
path = fs.forge_state_path("didericis", "bot-bottle", 17)
self.assertTrue(str(path).endswith("forge/didericis/bot-bottle/issue-17.json"))
def test_write_is_atomic_no_tmp_left(self):
fs.write_forge_state(_state())
path = fs.forge_state_path("didericis", "bot-bottle", 17)
self.assertFalse(path.with_suffix(".json.tmp").exists())
self.assertTrue(path.exists())
def test_update_overwrites(self):
fs.write_forge_state(_state(status=STATUS_RUNNING))
fs.write_forge_state(_state(status=STATUS_FROZEN))
got = fs.read_forge_state("didericis", "bot-bottle", 17)
assert got is not None
self.assertEqual(STATUS_FROZEN, got.status)
def test_delete_is_idempotent(self):
fs.write_forge_state(_state())
fs.delete_forge_state("didericis", "bot-bottle", 17)
fs.delete_forge_state("didericis", "bot-bottle", 17) # no raise
self.assertIsNone(fs.read_forge_state("didericis", "bot-bottle", 17))
def test_all_forge_states_lists_across_repos(self):
fs.write_forge_state(_state(issue_number=17))
fs.write_forge_state(_state(issue_number=18, slug="other"))
fs.write_forge_state(_state(owner="acme", repo="widget", issue_number=3))
states = fs.all_forge_states()
self.assertEqual(3, len(states))
self.assertEqual({17, 18, 3}, {s.issue_number for s in states})
def test_all_forge_states_empty_when_no_dir(self):
self.assertEqual([], fs.all_forge_states())
def test_from_dict_ignores_unknown_keys(self):
st = ForgeState.from_dict({
"owner": "o", "repo": "r", "issue_number": 1, "slug": "s",
"agent_name": "a", "future_field": "ignored",
})
self.assertEqual("o", st.owner)
self.assertIsNone(st.pr_number)
def test_pr_number_optional(self):
fs.write_forge_state(_state(pr_number=None))
got = fs.read_forge_state("didericis", "bot-bottle", 17)
assert got is not None
self.assertIsNone(got.pr_number)
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,81 @@
"""Unit: provenance footer (PRD forge-native-integration)."""
from __future__ import annotations
import unittest
from bot_bottle.contrib.gitea.provenance import build_provenance_footer
def _footer(slug: str = "implementer-abc12", **over) -> str:
base = {
"agent_name": "implementer",
"bottle_names": ("claude",),
"started_at": "2026-06-29T12:00:00-04:00",
"finished_at": "2026-06-29T12:04:12-04:00",
"exit_code": 0,
}
base.update(over)
return build_provenance_footer(slug, **base)
class ProvenanceTest(unittest.TestCase):
def test_required_fields_present(self):
out = _footer()
for token in ("Run provenance", "`implementer`", "`claude`",
"`implementer-abc12`", "| exit | 0 ✓ |"):
self.assertIn(token, out)
def test_collapsed_details_block(self):
out = _footer()
self.assertTrue(out.startswith("<details>"))
self.assertIn("</details>", out)
def test_duration_minutes_seconds(self):
self.assertIn("| duration | 4m 12s |", _footer())
def test_duration_under_a_minute(self):
out = _footer(finished_at="2026-06-29T12:00:30-04:00")
self.assertIn("| duration | 30s |", out)
def test_duration_unknown_on_bad_timestamp(self):
out = _footer(finished_at="not-a-time")
self.assertIn("| duration | unknown |", out)
def test_nonzero_exit_marked(self):
self.assertIn("| exit | 1 ✗ |", _footer(exit_code=1))
def test_watchdog_changes_done_signal_row(self):
normal = _footer()
self.assertIn("sidecar `signal_done`", normal)
fired = _footer(watchdog_fired=True)
self.assertIn("watchdog — agent did not signal", fired)
self.assertNotIn("sidecar `signal_done`", fired)
def test_gitleaks_states(self):
self.assertIn("not run", _footer())
self.assertIn("✓ no secrets detected", _footer(gitleaks_clean=True))
self.assertIn("✗ secrets detected", _footer(gitleaks_clean=False))
def test_egress_omitted_when_absent(self):
self.assertNotIn("**Egress**", _footer())
def test_egress_rendered_when_present(self):
out = _footer(egress_routes=[
"`api.anthropic.com` — Bearer auth",
"`pypi.org` — unauthenticated",
])
self.assertIn("**Egress** (deny-by-default; 2 routes allowed)", out)
self.assertIn("- `api.anthropic.com` — Bearer auth", out)
def test_egress_singular_route(self):
out = _footer(egress_routes=["`api.anthropic.com` — Bearer auth"])
self.assertIn("1 route allowed", out)
def test_multiple_bottles_listed(self):
out = _footer(bottle_names=("claude", "dev"))
self.assertIn("`claude`, `dev`", out)
if __name__ == "__main__":
unittest.main()
+9
View File
@@ -223,5 +223,14 @@ class TestPiDockerfile(unittest.TestCase):
self.assertIn("chmod 1777 /tmp /var/tmp", dockerfile) self.assertIn("chmod 1777 /tmp /var/tmp", dockerfile)
class TestPiHeadlessPrompt(unittest.TestCase):
def test_returns_p_flag_and_prompt(self):
self.assertEqual(["-p", "Do the task"], PiAgentProvider().headless_prompt("Do the task"))
def test_preserves_prompt_text_verbatim(self):
text = "Fix issue #42: the widget breaks on empty input"
self.assertEqual(["-p", text], PiAgentProvider().headless_prompt(text))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
@@ -38,6 +38,7 @@ class _Provider(AgentProvider):
def provision_prompt(self, plan, bottle): ... # type: ignore[override] def provision_prompt(self, plan, bottle): ... # type: ignore[override]
def provision(self, plan, bottle): ... # type: ignore[override] def provision(self, plan, bottle): ... # type: ignore[override]
def provision_supervise_mcp(self, plan, bottle, supervise_url): ... # type: ignore[override] def provision_supervise_mcp(self, plan, bottle, supervise_url): ... # type: ignore[override]
def headless_prompt(self, prompt): return [] # type: ignore[override]
_PROVIDER = _Provider() _PROVIDER = _Provider()
+16
View File
@@ -165,6 +165,22 @@ class TestAgentValidation(unittest.TestCase):
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
ManifestAgent.from_dict("a", {"skills": [5]}, set()) ManifestAgent.from_dict("a", {"skills": [5]}, set())
def test_skill_name_rejects_shell_metacharacters(self) -> None:
# Skill names become host/guest path segments interpolated into
# provisioning shell commands; anything outside kebab-case is
# rejected at load so it can never reach a `bottle.exec` string.
for bad in ("foo; rm -rf /", "../escape", "foo bar", "Foo", "-leading"):
with self.assertRaises(ManifestError):
ManifestAgent.from_dict("a", {"skills": [bad]}, set())
def test_skill_name_accepts_kebab_case(self) -> None:
agent = ManifestAgent.from_dict(
"a", {"skills": ["init-entry", "quality-eval", "skill0"]}, set()
)
self.assertEqual(
agent.skills, ("init-entry", "quality-eval", "skill0")
)
def test_prompt_not_string(self) -> None: def test_prompt_not_string(self) -> None:
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
ManifestAgent.from_dict("a", {"prompt": 5}, set()) ManifestAgent.from_dict("a", {"prompt": 5}, set())
@@ -49,6 +49,7 @@ class _Provider(AgentProvider):
def provision_prompt(self, plan, bottle): ... # type: ignore[override] def provision_prompt(self, plan, bottle): ... # type: ignore[override]
def provision(self, plan, bottle): ... # type: ignore[override] def provision(self, plan, bottle): ... # type: ignore[override]
def provision_supervise_mcp(self, plan, bottle, supervise_url): ... # type: ignore[override] def provision_supervise_mcp(self, plan, bottle, supervise_url): ... # type: ignore[override]
def headless_prompt(self, prompt): return [] # type: ignore[override]
_PROVIDER = _Provider() _PROVIDER = _Provider()