Compare commits

...

12 Commits

Author SHA1 Message Date
didericis-claude 18e610c7a8 fix: resolve pylint/pyright issues in runner, sidecar, and test_runner
lint / lint (push) Successful in 2m1s
test / unit (pull_request) Successful in 50s
test / integration (pull_request) Successful in 17s
test / coverage (pull_request) Successful in 1m3s
runner.py: use 'from bot_bottle import api' (satisfies R0402) with
type: ignore and pylint disable for the cross-branch dependency on
bot_bottle.api (added in PR #318, which merges before this one).
sidecar.py: add pylint disable for intentional broad-exception-caught.
test_runner.py: annotate _make_api_stub(**overrides: object) -> Any and
type stub variable as Any to allow attribute assignment without
type: ignore per-line.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 20:27:57 +00:00
didericis-claude d5fb159857 refactor(orchestrator): swap SubprocessBottleRunner → ProgrammaticBottleRunner
lint / lint (push) Failing after 2m15s
test / unit (pull_request) Successful in 51s
test / integration (pull_request) Successful in 21s
test / coverage (pull_request) Successful in 1m7s
BottleRunner Protocol tightened: start() → str, freeze/resume/destroy → None.
RunResult removed. lifecycle.py unpacks the slug directly. FakeRunner and
test_runner updated to match. Config.bot_bottle_cli dropped (nothing uses it).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 19:48:06 +00:00
didericis-claude 71699b3ecd fix: resolve pylint/pyright issues in new test files
lint / lint (push) Successful in 2m7s
test / unit (pull_request) Successful in 57s
test / integration (pull_request) Successful in 17s
test / coverage (pull_request) Successful in 1m4s
- test_contrib_gitea_client: remove unused Any import, fix _mock_response
  to use return_value instead of lambda (unknown lambda type), narrow
  HTTPError hdrs type, add type annotations to fake_urlopen helpers,
  suppress protected-access for _request tests
- test_bootstrap: annotate **kw as **kw: object, use dict literal,
  unpack server_address via index to avoid tuple type mismatch
- test_main: remove unused MagicMock import
- test_watchdog: guard store.get() result before accessing .status

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 19:47:31 +00:00
didericis-claude 57290da1e8 test: add coverage for orchestrator + gitea client (diff gate 77% → 98%)
lint / lint (push) Failing after 2m5s
test / unit (pull_request) Successful in 53s
test / integration (pull_request) Successful in 24s
test / coverage (pull_request) Successful in 1m12s
Three new unit test modules:
- tests/unit/test_contrib_gitea_client.py — GiteaClient (urllib mocked)
  and GiteaForge delegation
- tests/unit/orchestrator/test_main.py — __main__ run/status commands
- tests/unit/orchestrator/test_bootstrap.py — _token, BotBottleStateStore,
  _to_forge_state/_to_record, make_forge, make_sidecar, build

Augments to existing suites:
- test_events: non-"created" comment action ignored
- test_lifecycle: _iso_now callable, untracked-issue comment ignored,
  untracked-PR closed ignored (covers _find_by_pr return-None path)
- test_runner: destroy command, _default_run via subprocess mock
- test_sidecar: _jsonable dataclass/list branches, OpLog.read on missing
  file, drain_done_events on corrupted file, socket _Handler invalid-JSON
  and empty-line paths, serve() with pre-existing socket path
- test_watchdog: _loop body covered by patching _TICK_SECS to 0.01s
- test_webhook: unknown GET path returns 404

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 19:35:30 +00:00
didericis-claude df1f0e8f70 docs: mark fold-orchestrator PRD as Active
lint / lint (push) Successful in 2m4s
test / unit (pull_request) Successful in 56s
test / integration (pull_request) Successful in 22s
test / coverage (pull_request) Failing after 1m7s
2026-07-01 17:18:38 +00:00
didericis-claude 314dc03b0d feat: fold bot-bottle-orchestrator into bot_bottle/orchestrator subpackage
Moves the orchestrator into bot_bottle/orchestrator/ so one install gets
everything. Entry point is now `python -m bot_bottle.orchestrator run`.

- Add bot_bottle/orchestrator/ with all 14 modules (verbatim move; internal
  imports were already relative, so no changes inside orchestrator modules)
- Rewrite bootstrap.py: remove the lazy bot_bottle import guard, use direct
  relative imports from ..contrib.*
- Add bot_bottle/contrib/forge/base.py: ScopedForge (read-anywhere / write-scoped)
- Add bot_bottle/contrib/gitea/client.py: GiteaClient + GiteaForge (urllib.request only)
- Add bot_bottle/contrib/gitea/forge_state.py: ForgeState + SqliteForgeStateStore
- Add tests/unit/orchestrator/ (82 tests: 63 migrated + 19 new for contrib modules)

Closes #321
2026-07-01 17:18:28 +00:00
didericis-claude 06025687ed docs: add PRD for folding orchestrator into bot-bottle subpackage 2026-07-01 17:14:43 +00:00
Quality Badge Bot 5970b785aa chore: update quality badges
- Coverage: 83%
- Core coverage: 95%

[skip ci]
2026-07-01 16:51:08 +00:00
didericis 2f5cf81cf5 fix(git-gate): defer dynamic key provisioning
lint / lint (push) Successful in 1m59s
test / unit (push) Successful in 49s
test / integration (push) Successful in 23s
test / coverage (push) Successful in 1m0s
Update Quality Badges / update-badges (push) Successful in 53s
2026-07-01 12:45:46 -04:00
didericis 4a1e667306 fix(git-gate): inline GIT_GATE_TIMEOUT_SECS to fix git-http ImportError
lint / lint (push) Successful in 1m56s
test / unit (push) Successful in 48s
test / integration (push) Successful in 20s
test / coverage (push) Successful in 1m2s
Update Quality Badges / update-badges (push) Successful in 52s
git_http_backend.py is copied flat into the sidecar bundle image as a
standalone script, not as part of the bot_bottle package, and
git_gate.py/git_gate_render.py are never copied in. Its relative
import of GIT_GATE_TIMEOUT_SECS crashed the git-http daemon (port
9420) on every startup, silently leaving the smart-HTTP git-gate
transport down while the other sidecar daemons stayed up.

Co-Authored-By: Claude Sonnet 5 <noreply@anthropic.com>
2026-07-01 11:53:26 -04:00
didericis b93fe58523 feat(cli): add headless launch mode for orchestrators
test / unit (pull_request) Successful in 47s
test / integration (pull_request) Successful in 16s
test / coverage (pull_request) Successful in 1m4s
lint / lint (push) Successful in 2m0s
test / unit (push) Successful in 48s
test / integration (push) Successful in 18s
test / coverage (push) Successful in 57s
Update Quality Badges / update-badges (push) Successful in 57s
`--headless` is a non-interactive launch path for `cli.py start`:
agent, bottles, label, and color come from flags + manifest defaults
with no TUI selectors and no y/N preflight (auto-confirmed via a new
`assume_yes` param threaded into the shared `_launch_bottle` core).

- `--bottle` (repeatable) defaults to the agent's own `bottle:`;
  `--label` defaults to the agent name and auto-uniquifies on slug
  collision; `--color` defaults to none.
- `--prompt TEXT` is required in headless mode and is delivered to the
  agent via a new `headless_prompt(prompt)` method on `AgentProvider`,
  implemented for claude (`-p`), codex (positional), and pi (`-p`).
- The agent still execs on inherited stdio/PTY, so whatever allocates
  the PTY drives the live session; only the launch chrome is headless.
- `--headless --dry-run` previews the resolved plan without launching.

Adds unit coverage in tests/unit/test_cli_start_headless.py and
headless_prompt tests for each provider. Also stubs headless_prompt on
the in-test AgentProvider subclasses so the unit suite collects cleanly.

Closes #315.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01WL77TgFxKbs3cidGMG9dz7
2026-06-30 15:08:14 -04:00
didericis 94eca35b4f fix(skills): validate skill names and quote provisioning paths
test / unit (push) Successful in 55s
test / integration (push) Successful in 23s
test / coverage (push) Successful in 1m11s
Update Quality Badges / update-badges (push) Successful in 1m3s
lint / lint (push) Successful in 2m18s
Skill names become host/guest path segments interpolated into the
`bottle.exec` shell strings in each contrib provider's provision_skills.
They were validated only as strings, so a name with shell metacharacters
or path traversal could reach the command.

Layer two defenses:
  - Primary: reject any skill name that isn't kebab-case
    ([a-z][a-z0-9-]*) at manifest load, reusing the convention already
    enforced on bottle/agent filenames (new is_valid_entity_name helper
    in manifest_schema). Fails loud and early, protecting every consumer
    of the name — not just the exec call sites.
  - Failsafe: shlex.quote the interpolated skills_dir / dst paths in the
    claude, codex, and pi providers, so a future unvalidated field can't
    inject shell metacharacters even if it bypasses the load-time check.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01NkwFXLFff9PYPy4wgVBJp9
2026-06-27 02:15:30 -04:00
58 changed files with 3851 additions and 36 deletions
+2 -2
View File
@@ -5,8 +5,8 @@
# bot-bottle # bot-bottle
[![test](https://gitea.dideric.is/didericis/bot-bottle/actions/workflows/test.yml/badge.svg?branch=main)](https://gitea.dideric.is/didericis/bot-bottle/actions?workflow=test.yml) [![test](https://gitea.dideric.is/didericis/bot-bottle/actions/workflows/test.yml/badge.svg?branch=main)](https://gitea.dideric.is/didericis/bot-bottle/actions?workflow=test.yml)
[![coverage](https://img.shields.io/badge/coverage-84%25-brightgreen)](https://coverage.readthedocs.io/) [![coverage](https://img.shields.io/badge/coverage-83%25-brightgreen)](https://coverage.readthedocs.io/)
[![core coverage](https://img.shields.io/badge/core%20coverage-96%25-brightgreen)](https://gitea.dideric.is/didericis/bot-bottle/src/branch/main/docs/decisions/0004-coverage-policy.md) [![core coverage](https://img.shields.io/badge/core%20coverage-95%25-brightgreen)](https://gitea.dideric.is/didericis/bot-bottle/src/branch/main/docs/decisions/0004-coverage-policy.md)
**Problem:** Developer wants to run a coding agent without supervision, but they don't want a prompt injected or misbehaving agent wrecking their environment or exfiltrating sensitive data. **Problem:** Developer wants to run a coding agent without supervision, but they don't want a prompt injected or misbehaving agent wrecking their environment or exfiltrating sensitive data.
+9
View File
@@ -209,6 +209,15 @@ class AgentProvider(ABC):
the supervise sidecar is reachable. No-op when the supervise sidecar is reachable. No-op when
`plan.supervise_plan is None`.""" `plan.supervise_plan is None`."""
@abstractmethod
def headless_prompt(self, prompt: str) -> list[str]:
"""Return the agent CLI args that deliver `prompt` as the
initial task in a non-interactive (headless) session.
Called only when ``--prompt`` is passed to
``./cli.py start --headless``; the returned args are appended
after the provider's ``bypass_args`` and ``startup_args``."""
def provision_ca(self, bottle: "Bottle", plan: "BottlePlan") -> None: def provision_ca(self, bottle: "Bottle", plan: "BottlePlan") -> None:
"""Install the egress MITM CA into the agent's trust store. """Install the egress MITM CA into the agent's trust store.
+9 -1
View File
@@ -37,7 +37,10 @@ from pathlib import Path
from typing import Callable, Generator from typing import Callable, Generator
from ...egress import egress_resolve_token_values from ...egress import egress_resolve_token_values
from ...git_gate import revoke_git_gate_provisioned_keys from ...git_gate import (
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys,
)
from ...log import info, warn from ...log import info, warn
from . import network as network_mod from . import network as network_mod
from . import util as docker_mod from . import util as docker_mod
@@ -118,6 +121,11 @@ def launch(
git_gate_plan = plan.git_gate_plan git_gate_plan = plan.git_gate_plan
if git_gate_plan.upstreams: if git_gate_plan.upstreams:
git_gate_plan = provision_git_gate_dynamic_keys(
plan.manifest.bottle,
git_gate_plan,
git_gate_state_dir(plan.slug),
)
git_gate_plan = dataclasses.replace( git_gate_plan = dataclasses.replace(
git_gate_plan, git_gate_plan,
internal_network=internal_network, internal_network=internal_network,
+19 -1
View File
@@ -28,7 +28,10 @@ from ...egress import (
egress_resolve_token_values, egress_resolve_token_values,
egress_sidecar_env_entries, egress_sidecar_env_entries,
) )
from ...git_gate import revoke_git_gate_provisioned_keys from ...git_gate import (
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys,
)
from ...log import die, info, warn from ...log import die, info, warn
from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT from ...supervise import QUEUE_DIR_IN_CONTAINER, SUPERVISE_PORT
from ...util import expand_tilde from ...util import expand_tilde
@@ -98,6 +101,8 @@ def launch(
egress_network = egress_network_name(plan.slug) egress_network = egress_network_name(plan.slug)
_create_networks(internal_network, egress_network, stack) _create_networks(internal_network, egress_network, stack)
plan = _provision_git_gate_keys(plan)
sidecar_name = sidecar_container_name(plan.slug) sidecar_name = sidecar_container_name(plan.slug)
container_mod.force_remove_container(sidecar_name) container_mod.force_remove_container(sidecar_name)
_start_sidecar_bundle(plan, sidecar_name, internal_network, egress_network) _start_sidecar_bundle(plan, sidecar_name, internal_network, egress_network)
@@ -241,6 +246,19 @@ def _stamp_agent_urls(
) )
def _provision_git_gate_keys(
plan: MacosContainerBottlePlan,
) -> MacosContainerBottlePlan:
if not plan.git_gate_plan.upstreams:
return plan
git_gate_plan = provision_git_gate_dynamic_keys(
plan.manifest.bottle,
plan.git_gate_plan,
git_gate_state_dir(plan.slug),
)
return dataclasses.replace(plan, git_gate_plan=git_gate_plan)
def _stage_git_gate(plan: MacosContainerBottlePlan, sidecar_name: str) -> None: def _stage_git_gate(plan: MacosContainerBottlePlan, sidecar_name: str) -> None:
gp = plan.git_gate_plan gp = plan.git_gate_plan
if not gp.upstreams: if not gp.upstreams:
+18 -1
View File
@@ -41,7 +41,10 @@ from ..docker.git_gate import (
GIT_GATE_ENTRYPOINT_IN_CONTAINER, GIT_GATE_ENTRYPOINT_IN_CONTAINER,
GIT_GATE_HOOK_IN_CONTAINER, GIT_GATE_HOOK_IN_CONTAINER,
) )
from ...git_gate import revoke_git_gate_provisioned_keys from ...git_gate import (
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys,
)
from ...log import info, warn from ...log import info, warn
from ...bottle_state import ( from ...bottle_state import (
egress_state_dir, egress_state_dir,
@@ -174,6 +177,7 @@ def _start_bundle(
) -> SmolmachinesBottlePlan: ) -> SmolmachinesBottlePlan:
"""Build the BundleLaunchSpec, resolve token env, start the """Build the BundleLaunchSpec, resolve token env, start the
sidecar bundle container, and register teardown.""" sidecar bundle container, and register teardown."""
plan = _provision_git_gate_keys(plan)
bundle_spec = _bundle_launch_spec(plan, network, loopback_ip) bundle_spec = _bundle_launch_spec(plan, network, loopback_ip)
token_env = _resolve_token_env(plan, dict(os.environ)) token_env = _resolve_token_env(plan, dict(os.environ))
_bundle.ensure_bundle_image(bundle_spec.image) _bundle.ensure_bundle_image(bundle_spec.image)
@@ -182,6 +186,19 @@ def _start_bundle(
return plan return plan
def _provision_git_gate_keys(
plan: SmolmachinesBottlePlan,
) -> SmolmachinesBottlePlan:
if not plan.git_gate_plan.upstreams:
return plan
git_gate_plan = provision_git_gate_dynamic_keys(
plan.manifest.bottle,
plan.git_gate_plan,
git_gate_state_dir(plan.slug),
)
return dataclasses.replace(plan, git_gate_plan=git_gate_plan)
def _discover_urls( def _discover_urls(
plan: SmolmachinesBottlePlan, plan: SmolmachinesBottlePlan,
loopback_ip: str, loopback_ip: str,
+142 -7
View File
@@ -2,6 +2,11 @@
interactive claude-code session. The container is torn down when the interactive claude-code session. The container is torn down when the
session ends. session ends.
`--headless` selects a non-interactive launch (agent/bottles/label from
flags, no TUI selectors, no y/N prompt) for orchestrators,
CI, and webhook dispatch. The agent still execs on the inherited
stdio/PTY, so an orchestrator that allocates the PTY drives the session.
The launch core is shared with `cli.py resume <identity>` through The launch core is shared with `cli.py resume <identity>` through
the private orchestrator `_launch_bottle`. the private orchestrator `_launch_bottle`.
""" """
@@ -16,7 +21,7 @@ import tempfile
from pathlib import Path from pathlib import Path
from typing import Callable from typing import Callable
from ..agent_provider import runtime_for from ..agent_provider import get_provider, runtime_for
from ..backend import ( from ..backend import (
Bottle, Bottle,
BottleSpec, BottleSpec,
@@ -31,7 +36,7 @@ from ..bottle_state import (
is_preserved, is_preserved,
mark_preserved, mark_preserved,
) )
from ..log import info from ..log import info, die
from ..manifest import Manifest, ManifestIndex from ..manifest import Manifest, ManifestIndex
from ._common import PROG, USER_CWD, read_tty_line from ._common import PROG, USER_CWD, read_tty_line
from . import tui from . import tui
@@ -50,6 +55,39 @@ def cmd_start(argv: list[str]) -> int:
"or host auto-selection). Overrides the env var when set." "or host auto-selection). Overrides the env var when set."
), ),
) )
parser.add_argument(
"--headless",
action="store_true",
help=(
"non-interactive launch: take agent/bottles/label from flags, "
"skip all prompts. For orchestrators, CI, and webhooks."
),
)
parser.add_argument(
"--bottle",
action="append",
default=None,
metavar="NAME",
help=(
"bottle to compose, repeatable (order = merge order). In "
"--headless, defaults to the agent's own bottle when omitted."
),
)
parser.add_argument(
"--label",
default=None,
help="bottle label / terminal title (--headless default: agent name)",
)
parser.add_argument(
"--color",
default=None,
help="bottle color, one of the 16 ANSI color names (--headless default: none)",
)
parser.add_argument(
"--prompt",
default=None,
help="initial task prompt delivered to the agent (required with --headless)",
)
parser.add_argument( parser.add_argument(
"name", "name",
nargs="?", nargs="?",
@@ -61,6 +99,12 @@ def cmd_start(argv: list[str]) -> int:
dry_run = args.dry_run or os.environ.get("BOT_BOTTLE_DRY_RUN") == "1" dry_run = args.dry_run or os.environ.get("BOT_BOTTLE_DRY_RUN") == "1"
manifest = ManifestIndex.resolve(USER_CWD) manifest = ManifestIndex.resolve(USER_CWD)
backend_name: str | None = args.backend
if args.headless:
return _start_headless(
manifest, args, dry_run=dry_run, backend_name=backend_name
)
agent_name: str | None = args.name agent_name: str | None = args.name
if agent_name is None: if agent_name is None:
@@ -71,8 +115,6 @@ def cmd_start(argv: list[str]) -> int:
if agent_name is None: if agent_name is None:
return 0 return 0
backend_name: str | None = args.backend
# Bottle multiselect: always show after agent selection so operators # Bottle multiselect: always show after agent selection so operators
# can compose bottles at launch time without editing agent manifests. # can compose bottles at launch time without editing agent manifests.
available_bottles = manifest.all_bottle_names available_bottles = manifest.all_bottle_names
@@ -109,6 +151,83 @@ def cmd_start(argv: list[str]) -> int:
) )
# --- Headless launch -----------------------------------------------------
def _start_headless(
manifest: ManifestIndex,
args: argparse.Namespace,
*,
dry_run: bool,
backend_name: str | None,
) -> int:
"""Non-interactive launch path for orchestrators / CI / webhooks.
Resolves agent, bottles, label, and color from flags + manifest
defaults instead of the TUI selectors, and auto-confirms the
preflight. Otherwise runs the same launch core as the interactive
path, so the agent still execs on the inherited stdio/PTY — an
orchestrator allocates that PTY and relays it to its
desktop/mobile clients."""
agent_name = args.name
if not agent_name:
die("--headless requires an agent name: ./cli.py start <agent> --headless")
manifest.require_agent(agent_name) # raises ManifestError if unknown
prompt = args.prompt
if not prompt:
die(
"--headless requires --prompt: "
"./cli.py start <agent> --headless --prompt 'Do the thing'"
)
if args.bottle:
bottle_names: tuple[str, ...] = tuple(args.bottle)
else:
default_bottle = _peek_agent_bottle(manifest, agent_name)
if not default_bottle:
die(
f"--headless: agent '{agent_name}' has no default bottle; "
f"pass one or more --bottle NAME"
)
bottle_names = (default_bottle,)
label = _uniquify_label_headless(args.label or agent_name)
spec = BottleSpec(
manifest=manifest,
agent_name=agent_name,
copy_cwd=args.cwd,
user_cwd=USER_CWD,
label=label,
color=args.color or "",
bottle_names=bottle_names,
)
return _launch_bottle(
spec,
dry_run=dry_run,
backend_name=backend_name,
assume_yes=True,
headless_prompt_text=prompt,
)
def _uniquify_label_headless(label: str) -> str:
"""Non-interactive analog of `_resolve_unique_label`: if the label's
slug collides with a running bottle, append -2, -3, … until free,
logging the chosen label. Orchestrators fire-and-forget many bottles,
so silently picking a free name beats erroring on every collision."""
active_slugs = {a.slug for a in enumerate_active_agents()}
if docker_mod.slugify(label) not in active_slugs:
return label
n = 2
while docker_mod.slugify(f"{label}-{n}") in active_slugs:
n += 1
chosen = f"{label}-{n}"
info(f"label '{label}' already in use; using '{chosen}'")
return chosen
# --- Launch helpers ------------------------------------------------------ # --- Launch helpers ------------------------------------------------------
@@ -376,10 +495,19 @@ def _launch_bottle(
*, *,
dry_run: bool, dry_run: bool,
backend_name: str | None = None, backend_name: str | None = None,
assume_yes: bool = False,
headless_prompt_text: str = "",
) -> int: ) -> int:
"""Shared launch core for `start` and `resume`. Builds the plan, """Shared launch core for `start` and `resume`. Builds the plan,
prints / dry-runs / prompts as appropriate, brings the bottle up, prints / dry-runs / prompts as appropriate, brings the bottle up,
attaches claude, and prints the resume hint on session end.""" attaches claude, and prints the resume hint on session end.
`assume_yes` skips the interactive y/N confirmation (headless /
orchestrator launches), where there is no human at the prompt.
`headless_prompt_text` is passed to the provider's `headless_prompt`
method and the resulting args are appended to startup_args so the
agent receives the initial task without interactive input."""
stage_dir = Path(tempfile.mkdtemp(prefix="bot-bottle-stage.")) stage_dir = Path(tempfile.mkdtemp(prefix="bot-bottle-stage."))
identity = "" identity = ""
try: try:
@@ -387,7 +515,7 @@ def _launch_bottle(
spec, spec,
stage_dir=stage_dir, stage_dir=stage_dir,
render_preflight=_text_render_preflight(), render_preflight=_text_render_preflight(),
prompt_yes=_text_prompt_yes, prompt_yes=(lambda: True) if assume_yes else _text_prompt_yes,
dry_run=dry_run, dry_run=dry_run,
backend_name=backend_name, backend_name=backend_name,
) )
@@ -397,10 +525,17 @@ def _launch_bottle(
backend = get_bottle_backend(backend_name) backend = get_bottle_backend(backend_name)
with backend.launch(plan) as bottle: with backend.launch(plan) as bottle:
agent_provider_template = getattr(plan, "agent_provider_template", "claude") agent_provider_template = getattr(plan, "agent_provider_template", "claude")
extra_args: tuple[str, ...] = ()
if headless_prompt_text:
extra_args = tuple(
get_provider(agent_provider_template).headless_prompt(
headless_prompt_text
)
)
exit_code = attach_agent( exit_code = attach_agent(
bottle, bottle,
agent_provider_template=agent_provider_template, agent_provider_template=agent_provider_template,
startup_args=plan.agent_provision.startup_args, startup_args=plan.agent_provision.startup_args + extra_args,
) )
info( info(
f"session ended (exit {exit_code}); " f"session ended (exit {exit_code}); "
+10 -3
View File
@@ -217,7 +217,7 @@ class ClaudeAgentProvider(AgentProvider):
if not agent.skills: if not agent.skills:
return return
skills_dir = _skills_dir(plan.guest_home) skills_dir = _skills_dir(plan.guest_home)
bottle.exec(f"mkdir -p {skills_dir}", user="root") bottle.exec(f"mkdir -p {shlex.quote(skills_dir)}", user="root")
for name in agent.skills: for name in agent.skills:
src = host_skill_dir(name) src = host_skill_dir(name)
if not os.path.isdir(src): if not os.path.isdir(src):
@@ -227,9 +227,13 @@ class ClaudeAgentProvider(AgentProvider):
) )
dst = f"{skills_dir}/{name}" dst = f"{skills_dir}/{name}"
info(f"copying skill {name} into {bottle.name}:{dst}") info(f"copying skill {name} into {bottle.name}:{dst}")
bottle.exec(f"rm -rf {dst} && mkdir -p {dst}", user="root") # Defense in depth: skill names are validated kebab-case at
# manifest load, but quote the path so a future unvalidated
# field can't inject shell metacharacters here either.
dst_q = shlex.quote(dst)
bottle.exec(f"rm -rf {dst_q} && mkdir -p {dst_q}", user="root")
bottle.cp_in(f"{src}/.", f"{dst}/") bottle.cp_in(f"{src}/.", f"{dst}/")
bottle.exec(f"chown -R node:node {dst}", user="root") bottle.exec(f"chown -R node:node {dst_q}", user="root")
def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None: def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None:
"""Copy the prompt file into the guest, fix ownership/mode. """Copy the prompt file into the guest, fix ownership/mode.
@@ -309,6 +313,9 @@ class ClaudeAgentProvider(AgentProvider):
f"claude mcp add --scope user --transport http supervise {supervise_url}" f"claude mcp add --scope user --transport http supervise {supervise_url}"
) )
def headless_prompt(self, prompt: str) -> list[str]:
return ["-p", prompt]
def _exec(bottle: "Bottle", script: str, error: str) -> None: def _exec(bottle: "Bottle", script: str, error: str) -> None:
result = bottle.exec(script, user="root") result = bottle.exec(script, user="root")
+10 -3
View File
@@ -183,7 +183,7 @@ class CodexAgentProvider(AgentProvider):
if not agent.skills: if not agent.skills:
return return
skills_dir = _skills_dir(plan.guest_home) skills_dir = _skills_dir(plan.guest_home)
bottle.exec(f"mkdir -p {skills_dir}", user="root") bottle.exec(f"mkdir -p {shlex.quote(skills_dir)}", user="root")
for name in agent.skills: for name in agent.skills:
src = host_skill_dir(name) src = host_skill_dir(name)
if not os.path.isdir(src): if not os.path.isdir(src):
@@ -193,9 +193,13 @@ class CodexAgentProvider(AgentProvider):
) )
dst = f"{skills_dir}/{name}" dst = f"{skills_dir}/{name}"
info(f"copying skill {name} into {bottle.name}:{dst}") info(f"copying skill {name} into {bottle.name}:{dst}")
bottle.exec(f"rm -rf {dst} && mkdir -p {dst}", user="root") # Defense in depth: skill names are validated kebab-case at
# manifest load, but quote the path so a future unvalidated
# field can't inject shell metacharacters here either.
dst_q = shlex.quote(dst)
bottle.exec(f"rm -rf {dst_q} && mkdir -p {dst_q}", user="root")
bottle.cp_in(f"{src}/.", f"{dst}/") bottle.cp_in(f"{src}/.", f"{dst}/")
bottle.exec(f"chown -R node:node {dst}", user="root") bottle.exec(f"chown -R node:node {dst_q}", user="root")
def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None: def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None:
"""Copy the prompt file into the guest, fix ownership/mode. """Copy the prompt file into the guest, fix ownership/mode.
@@ -275,6 +279,9 @@ class CodexAgentProvider(AgentProvider):
f"codex mcp add supervise --url {shlex.quote(supervise_url)}" f"codex mcp add supervise --url {shlex.quote(supervise_url)}"
) )
def headless_prompt(self, prompt: str) -> list[str]:
return [prompt]
def _exec(bottle: "Bottle", script: str, error: str) -> None: def _exec(bottle: "Bottle", script: str, error: str) -> None:
result = bottle.exec(script, user="root") result = bottle.exec(script, user="root")
+52
View File
@@ -0,0 +1,52 @@
"""Scoped forge wrapper: read-anywhere / write-scoped access control.
`ScopedForge` wraps any forge object and restricts write operations to
the set of issue/PR numbers the agent is explicitly assigned to. Read
operations always pass through unconditionally.
"""
from __future__ import annotations
from typing import Any
class ScopedForge:
"""Delegates all forge calls to an inner forge, raising `PermissionError`
on write calls for numbers outside the assigned scope."""
def __init__(
self,
forge: Any,
*,
assigned_issue: int,
assigned_prs: list[int],
) -> None:
self._forge = forge
self._allowed_writes: frozenset[int] = frozenset({assigned_issue, *assigned_prs})
def _check_write(self, number: int) -> None:
if number not in self._allowed_writes:
raise PermissionError(
f"write to #{number} is outside the assigned scope "
f"(allowed: {sorted(self._allowed_writes)})"
)
def is_org_member(self, org: str, username: str) -> bool:
return self._forge.is_org_member(org, username)
def read_issue(self, number: int) -> dict[str, Any]:
return self._forge.read_issue(number)
def read_pr(self, number: int) -> dict[str, Any]:
return self._forge.read_pr(number)
def read_comments(self, number: int) -> list[dict[str, Any]]:
return self._forge.read_comments(number)
def post_comment(self, number: int, body: str) -> None:
self._check_write(number)
self._forge.post_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._check_write(number)
self._forge.update_description(number, body)
+112
View File
@@ -0,0 +1,112 @@
"""Gitea API client and forge adapter (PRD prd-new: fold orchestrator).
`GiteaClient` is a thin HTTP wrapper (stdlib `urllib.request` only no
new runtime dependencies). `GiteaForge` composes a client and exposes
the forge protocol used by the orchestrator's sidecar and lifecycle.
Required Gitea token scopes:
- Repository: Read & Write (issues, comments, PR descriptions)
- Organization: Read (org membership check)
"""
from __future__ import annotations
import json
import urllib.error
import urllib.request
from typing import Any
_TIMEOUT_SECS = 30
class GiteaClient:
"""Low-level HTTP wrapper for the Gitea REST API."""
def __init__(
self, *, api_url: str, owner: str, repo: str, token: str
) -> None:
self._base = api_url.rstrip("/")
self._owner = owner
self._repo = repo
self._headers = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
def _request(
self,
method: str,
path: str,
body: dict[str, Any] | None = None,
) -> Any:
url = f"{self._base}{path}"
data = json.dumps(body).encode() if body is not None else None
req = urllib.request.Request(
url, data=data, headers=self._headers, method=method
)
with urllib.request.urlopen(req, timeout=_TIMEOUT_SECS) as resp:
raw = resp.read()
return json.loads(raw) if raw else None
def is_org_member(self, org: str, username: str) -> bool:
url = f"{self._base}/orgs/{org}/members/{username}"
req = urllib.request.Request(url, headers=self._headers, method="GET")
try:
urllib.request.urlopen(req, timeout=_TIMEOUT_SECS).close()
return True
except urllib.error.HTTPError:
return False
def get_issue(self, number: int) -> dict[str, Any]:
return self._request("GET", f"/repos/{self._owner}/{self._repo}/issues/{number}")
def get_pull(self, number: int) -> dict[str, Any]:
return self._request("GET", f"/repos/{self._owner}/{self._repo}/pulls/{number}")
def list_comments(self, number: int) -> list[dict[str, Any]]:
return self._request("GET", f"/repos/{self._owner}/{self._repo}/issues/{number}/comments")
def create_comment(self, number: int, body: str) -> None:
self._request(
"POST",
f"/repos/{self._owner}/{self._repo}/issues/{number}/comments",
{"body": body},
)
def update_issue(self, number: int, body: str) -> None:
self._request(
"PATCH",
f"/repos/{self._owner}/{self._repo}/issues/{number}",
{"body": body},
)
class GiteaForge:
"""Adapts `GiteaClient` to the forge protocol expected by the orchestrator.
The forge protocol is duck-typed: any object with `is_org_member`,
`read_issue`, `read_pr`, `read_comments`, `post_comment`, and
`update_description` methods satisfies it.
"""
def __init__(self, client: GiteaClient) -> None:
self._client = client
def is_org_member(self, org: str, username: str) -> bool:
return self._client.is_org_member(org, username)
def read_issue(self, number: int) -> dict[str, Any]:
return self._client.get_issue(number)
def read_pr(self, number: int) -> dict[str, Any]:
return self._client.get_pull(number)
def read_comments(self, number: int) -> list[dict[str, Any]]:
return self._client.list_comments(number)
def post_comment(self, number: int, body: str) -> None:
self._client.create_comment(number, body)
def update_description(self, number: int, body: str) -> None:
self._client.update_issue(number, body)
+137
View File
@@ -0,0 +1,137 @@
"""Forge state persistence for the orchestrator (PRD prd-new: fold orchestrator).
`ForgeState` is a dataclass that mirrors the orchestrator's `RunRecord`
field-for-field, held here so the store implementation is in bot-bottle
where the Gitea contrib lives.
`SqliteForgeStateStore` backs it with a single SQLite table. The DB path
is optional; passing `None` uses `:memory:` (useful for tests and status
commands that don't need persistence).
"""
from __future__ import annotations
import json
import sqlite3
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class ForgeState:
"""Persisted state for one forge-targeted issue's bottle lifecycle."""
owner: str
repo: str
issue_number: int
slug: str
agent_name: str
bottle_names: list[str] = field(default_factory=list)
backend_name: str = ""
agent_git_user: str = ""
pr_number: int | None = None
status: str = ""
last_checkin_at: str = ""
_DDL = """
CREATE TABLE IF NOT EXISTS forge_state (
owner TEXT NOT NULL,
repo TEXT NOT NULL,
issue_number INTEGER NOT NULL,
slug TEXT NOT NULL,
agent_name TEXT NOT NULL,
bottle_names TEXT NOT NULL DEFAULT '[]',
backend_name TEXT NOT NULL DEFAULT '',
agent_git_user TEXT NOT NULL DEFAULT '',
pr_number INTEGER,
status TEXT NOT NULL DEFAULT '',
last_checkin_at TEXT NOT NULL DEFAULT '',
PRIMARY KEY (owner, repo, issue_number)
)
"""
class SqliteForgeStateStore:
"""SQLite-backed `ForgeState` store.
Thread-safety: a single connection is used; callers that share a
store across threads must serialise access externally.
"""
def __init__(self, db_path: Path | None) -> None:
path = str(db_path) if db_path is not None else ":memory:"
self._conn = sqlite3.connect(path, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
self._conn.execute(_DDL)
self._conn.commit()
def upsert(self, state: ForgeState) -> None:
self._conn.execute(
"""
INSERT INTO forge_state
(owner, repo, issue_number, slug, agent_name,
bottle_names, backend_name, agent_git_user,
pr_number, status, last_checkin_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(owner, repo, issue_number) DO UPDATE SET
slug = excluded.slug,
agent_name = excluded.agent_name,
bottle_names = excluded.bottle_names,
backend_name = excluded.backend_name,
agent_git_user = excluded.agent_git_user,
pr_number = excluded.pr_number,
status = excluded.status,
last_checkin_at = excluded.last_checkin_at
""",
(
state.owner,
state.repo,
state.issue_number,
state.slug,
state.agent_name,
json.dumps(state.bottle_names),
state.backend_name,
state.agent_git_user,
state.pr_number,
state.status,
state.last_checkin_at,
),
)
self._conn.commit()
def get(self, owner: str, repo: str, issue_number: int) -> ForgeState | None:
row = self._conn.execute(
"SELECT * FROM forge_state WHERE owner=? AND repo=? AND issue_number=?",
(owner, repo, issue_number),
).fetchone()
return _row_to_state(row) if row is not None else None
def delete(self, owner: str, repo: str, issue_number: int) -> None:
self._conn.execute(
"DELETE FROM forge_state WHERE owner=? AND repo=? AND issue_number=?",
(owner, repo, issue_number),
)
self._conn.commit()
def all(self) -> list[ForgeState]:
rows = self._conn.execute(
"SELECT * FROM forge_state ORDER BY owner, repo, issue_number"
).fetchall()
return [_row_to_state(r) for r in rows]
def _row_to_state(row: sqlite3.Row) -> ForgeState:
return ForgeState(
owner=row["owner"],
repo=row["repo"],
issue_number=row["issue_number"],
slug=row["slug"],
agent_name=row["agent_name"],
bottle_names=json.loads(row["bottle_names"]),
backend_name=row["backend_name"],
agent_git_user=row["agent_git_user"],
pr_number=row["pr_number"],
status=row["status"],
last_checkin_at=row["last_checkin_at"],
)
+10 -3
View File
@@ -238,7 +238,7 @@ class PiAgentProvider(AgentProvider):
if not agent.skills: if not agent.skills:
return return
skills_dir = _skills_dir(plan.guest_home) skills_dir = _skills_dir(plan.guest_home)
bottle.exec(f"mkdir -p {skills_dir}", user="root") bottle.exec(f"mkdir -p {shlex.quote(skills_dir)}", user="root")
for name in agent.skills: for name in agent.skills:
src = host_skill_dir(name) src = host_skill_dir(name)
if not os.path.isdir(src): if not os.path.isdir(src):
@@ -248,9 +248,13 @@ class PiAgentProvider(AgentProvider):
) )
dst = f"{skills_dir}/{name}" dst = f"{skills_dir}/{name}"
info(f"copying skill {name} into {bottle.name}:{dst}") info(f"copying skill {name} into {bottle.name}:{dst}")
bottle.exec(f"rm -rf {dst} && mkdir -p {dst}", user="root") # Defense in depth: skill names are validated kebab-case at
# manifest load, but quote the path so a future unvalidated
# field can't inject shell metacharacters here either.
dst_q = shlex.quote(dst)
bottle.exec(f"rm -rf {dst_q} && mkdir -p {dst_q}", user="root")
bottle.cp_in(f"{src}/.", f"{dst}/") bottle.cp_in(f"{src}/.", f"{dst}/")
bottle.exec(f"chown -R node:node {dst}", user="root") bottle.exec(f"chown -R node:node {dst_q}", user="root")
def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None: def provision_prompt(self, plan: "BottlePlan", bottle: "Bottle") -> str | None:
prompt_path = _prompt_path(plan.guest_home) prompt_path = _prompt_path(plan.guest_home)
@@ -311,6 +315,9 @@ class PiAgentProvider(AgentProvider):
) -> None: ) -> None:
del plan, bottle, supervise_url del plan, bottle, supervise_url
def headless_prompt(self, prompt: str) -> list[str]:
return ["-p", prompt]
def _exec(bottle: "Bottle", script: str, error: str) -> None: def _exec(bottle: "Bottle", script: str, error: str) -> None:
result = bottle.exec(script, user="root") result = bottle.exec(script, user="root")
+6 -11
View File
@@ -30,7 +30,6 @@ backend-specific and lives on concrete subclasses (see
from __future__ import annotations from __future__ import annotations
import dataclasses
from abc import ABC from abc import ABC
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@@ -53,6 +52,7 @@ from .git_gate_render import (
_gitconfig_validate_value, _gitconfig_validate_value,
) )
from .git_gate_provision import ( from .git_gate_provision import (
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys, revoke_git_gate_provisioned_keys,
_provision_dynamic_key, _provision_dynamic_key,
_resolve_identity_file, _resolve_identity_file,
@@ -93,20 +93,14 @@ class GitGate(ABC):
entrypoint, pre-receive hook, and access-hook scripts (mode entrypoint, pre-receive hook, and access-hook scripts (mode
600) under `stage_dir`. Pure host-side, no docker subprocess. 600) under `stage_dir`. Pure host-side, no docker subprocess.
For `gitea` key entries, also generates and registers For `gitea` key entries, the returned upstream intentionally
a fresh deploy key via the forge API and writes the private key has an empty identity file. Backend launch fills that in after
+ key ID to `stage_dir`. the operator confirms the preflight.
Returned plan is incomplete: the launch step must fill Returned plan is incomplete: the launch step must fill
`internal_network` / `egress_network` via `dataclasses.replace` `internal_network` / `egress_network` via `dataclasses.replace`
before passing the plan to `.start`.""" before passing the plan to `.start`."""
upstreams_list = list(git_gate_upstreams_for_bottle(bottle)) upstreams = git_gate_upstreams_for_bottle(bottle)
for i, entry in enumerate(bottle.git):
upstreams_list[i] = dataclasses.replace(
upstreams_list[i],
identity_file=_resolve_identity_file(entry, slug, stage_dir),
)
upstreams = tuple(upstreams_list)
entrypoint = stage_dir / "git_gate_entrypoint.sh" entrypoint = stage_dir / "git_gate_entrypoint.sh"
entrypoint.write_text(git_gate_render_entrypoint(upstreams)) entrypoint.write_text(git_gate_render_entrypoint(upstreams))
entrypoint.chmod(0o600) entrypoint.chmod(0o600)
@@ -162,6 +156,7 @@ __all__ = [
"git_gate_render_entrypoint", "git_gate_render_entrypoint",
"git_gate_render_hook", "git_gate_render_hook",
"git_gate_render_access_hook", "git_gate_render_access_hook",
"provision_git_gate_dynamic_keys",
"revoke_git_gate_provisioned_keys", "revoke_git_gate_provisioned_keys",
"_gitconfig_validate_value", "_gitconfig_validate_value",
"_provision_dynamic_key", "_provision_dynamic_key",
+43
View File
@@ -9,10 +9,16 @@ imported (`deploy_key_provisioner`) to keep its cost off the host path.
from __future__ import annotations from __future__ import annotations
import os import os
import dataclasses
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING
from .log import info from .log import info
from .manifest import ManifestBottle, ManifestGitEntry from .manifest import ManifestBottle, ManifestGitEntry
from .git_gate_render import GitGateUpstream
if TYPE_CHECKING:
from .git_gate import GitGatePlan
def _provision_dynamic_key( def _provision_dynamic_key(
entry: ManifestGitEntry, entry: ManifestGitEntry,
@@ -95,8 +101,45 @@ def _resolve_identity_file(entry: ManifestGitEntry, slug: str, stage_dir: Path)
return entry.IdentityFile return entry.IdentityFile
def provision_git_gate_dynamic_keys(
bottle: ManifestBottle,
plan: "GitGatePlan",
stage_dir: Path,
) -> "GitGatePlan":
"""Provision dynamic git-gate keys and return an updated plan.
This runs during backend launch, after the operator confirms the
preflight. Plan preparation intentionally stays side-effect-light:
dry-runs and aborted launches must not create remote deploy keys.
"""
if not plan.upstreams:
return plan
upstreams_by_name: dict[str, GitGateUpstream] = {
upstream.name: upstream for upstream in plan.upstreams
}
updated: list[GitGateUpstream] = []
for entry in bottle.git:
upstream = upstreams_by_name.get(entry.Name)
if upstream is None:
continue
if entry.Key.provider == "gitea":
identity_file = _provision_dynamic_key(entry, plan.slug, stage_dir)
upstream = dataclasses.replace(upstream, identity_file=identity_file)
updated.append(upstream)
if len(updated) != len(plan.upstreams):
updated_names = {u.name for u in updated}
for upstream in plan.upstreams:
if upstream.name not in updated_names:
updated.append(upstream)
return dataclasses.replace(plan, upstreams=tuple(updated))
__all__ = [ __all__ = [
"revoke_git_gate_provisioned_keys", "revoke_git_gate_provisioned_keys",
"provision_git_gate_dynamic_keys",
"_provision_dynamic_key", "_provision_dynamic_key",
"_resolve_identity_file", "_resolve_identity_file",
] ]
+7 -2
View File
@@ -16,11 +16,16 @@ from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path from pathlib import Path
from urllib.parse import urlsplit from urllib.parse import urlsplit
from .git_gate import GIT_GATE_TIMEOUT_SECS
DEFAULT_PORT = 9420 DEFAULT_PORT = 9420
# Mirrors git_gate_render.GIT_GATE_TIMEOUT_SECS. Duplicated rather than
# imported: this module ships as a flat top-level sibling in the sidecar
# bundle image (see Dockerfile.sidecars), not as part of the bot_bottle
# package, so `bot_bottle.git_gate` and its dependency chain aren't
# available at runtime.
GIT_GATE_TIMEOUT_SECS = 15
# Bound memory use while still allowing ordinary git push packfiles. # Bound memory use while still allowing ordinary git push packfiles.
MAX_BODY_BYTES = 100 * 1024 * 1024 MAX_BODY_BYTES = 100 * 1024 * 1024
+11 -1
View File
@@ -8,7 +8,7 @@ from typing import cast
from .agent_provider import PROVIDER_TEMPLATES from .agent_provider import PROVIDER_TEMPLATES
from .manifest_util import ManifestError, as_json_object from .manifest_util import ManifestError, as_json_object
from .manifest_git import ManifestGitUser from .manifest_git import ManifestGitUser
from .manifest_schema import AGENT_MODEL_KEYS from .manifest_schema import AGENT_MODEL_KEYS, is_valid_entity_name
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -161,6 +161,16 @@ class ManifestAgent:
f"agent '{name}' skills[{i}] must be a string " f"agent '{name}' skills[{i}] must be a string "
f"(was {type(skill).__name__})" f"(was {type(skill).__name__})"
) )
# Skill names become host/guest path segments and are
# interpolated into provisioning shell commands, so they
# must fit the same kebab-case convention as bottle/agent
# filenames — rejecting anything that could break out of a
# path segment or inject shell metacharacters.
if not is_valid_entity_name(skill):
raise ManifestError(
f"agent '{name}' skills[{i}] {skill!r} is not a valid "
f"skill name; must match [a-z][a-z0-9-]*"
)
collected.append(skill) collected.append(skill)
skills = tuple(collected) skills = tuple(collected)
+8 -1
View File
@@ -33,13 +33,20 @@ AGENT_KEYS = (
AGENT_MODEL_KEYS = AGENT_KEYS | frozenset({"prompt"}) AGENT_MODEL_KEYS = AGENT_KEYS | frozenset({"prompt"})
def is_valid_entity_name(name: str) -> bool:
"""True if `name` fits the kebab-case `[a-z][a-z0-9-]*` convention
shared by bottle/agent filenames and skill names. Names that satisfy
this are also safe to interpolate into a host/guest path segment."""
return bool(_FILENAME_RX.match(name))
def entity_name_from_path(path: Path) -> str | None: def entity_name_from_path(path: Path) -> str | None:
"""Return the entity name implied by the filename, or None if the """Return the entity name implied by the filename, or None if the
filename does not fit the [a-z][a-z0-9-]* convention.""" filename does not fit the [a-z][a-z0-9-]* convention."""
if path.suffix != ".md": if path.suffix != ".md":
return None return None
stem = path.stem stem = path.stem
if not _FILENAME_RX.match(stem): if not is_valid_entity_name(stem):
return None return None
return stem return stem
+8
View File
@@ -0,0 +1,8 @@
"""bot-bottle-orchestrator: forge-native orchestration for bot-bottle.
The package is stdlib-only. The core (events, targeting, lifecycle,
watchdog, sidecar, webhook) depends on its collaborators a forge, a
state store, a bottle runner through duck-typed interfaces, so it runs
and tests without bot-bottle installed. `bootstrap` is the single module
that imports `bot_bottle` and wires the concrete implementations.
"""
+51
View File
@@ -0,0 +1,51 @@
"""CLI entry point: `python -m bot_bottle.orchestrator <command>`.
Commands:
run start the webhook server + watchdog + done-signal relay
status print the tracked runs (issue -> slug, status)
"""
from __future__ import annotations
import argparse
import sys
from .config import Config
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(prog="python -m bot_bottle.orchestrator")
sub = parser.add_subparsers(dest="command", required=True)
sub.add_parser("run", help="start the webhook server, watchdog, and relay")
sub.add_parser("status", help="list tracked runs")
args = parser.parse_args(argv)
config = Config.from_env()
if args.command == "run":
from . import bootstrap # pylint: disable=import-outside-toplevel
print(
f"orchestrator listening on "
f"http://{config.webhook_host}:{config.webhook_port}/webhook",
file=sys.stderr,
)
bootstrap.run(config)
return 0
if args.command == "status":
from .bootstrap import ( # pylint: disable=import-outside-toplevel
BotBottleStateStore,
)
store = BotBottleStateStore(config.db_path)
for r in store.all():
pr = f"PR#{r.pr_number}" if r.pr_number else "-"
print(f"{r.owner}/{r.repo}#{r.issue_number}\t{r.slug}\t{r.status}\t{pr}")
return 0
return 2
if __name__ == "__main__":
sys.exit(main())
+155
View File
@@ -0,0 +1,155 @@
"""Wire the concrete bot-bottle implementations into the core.
This is the ONLY module that imports from `bot_bottle.contrib`. It adapts
`SqliteForgeStateStore` to our `StateStore`, builds `GiteaForge`s (and
scope-wrapped forges for sidecars), constructs the `Orchestrator`, and
runs the webhook server + watchdog + done-signal relay.
Imports are direct (no lazy loading) because the orchestrator is now part
of the same package installation.
"""
from __future__ import annotations
import os
import threading
from pathlib import Path
from typing import Any
from ..contrib.forge.base import ScopedForge
from ..contrib.gitea.client import GiteaClient, GiteaForge
from ..contrib.gitea.forge_state import ForgeState, SqliteForgeStateStore
from .config import Config
from .lifecycle import Orchestrator
from .model import RunRecord
from .runner import ProgrammaticBottleRunner
from .sidecar import ForgeSidecar, OpLog, drain_done_events
from .watchdog import Watchdog
from .webhook import WebhookServer
_RELAY_TICK_SECS = 2.0
def _token() -> str:
tok = os.environ.get("GITEA_TOKEN") or os.environ.get("FORGE_GITEA_TOKEN")
if not tok:
raise RuntimeError("set GITEA_TOKEN (or FORGE_GITEA_TOKEN)")
return tok
class BotBottleStateStore:
"""Adapts `SqliteForgeStateStore` to our `StateStore`, translating
`RunRecord` <-> `ForgeState` field-for-field."""
def __init__(self, db_path: Path | None) -> None:
self._inner = SqliteForgeStateStore(db_path)
def upsert(self, record: RunRecord) -> None:
self._inner.upsert(_to_forge_state(record))
def get(self, owner: str, repo: str, issue_number: int) -> RunRecord | None:
state = self._inner.get(owner, repo, issue_number)
return _to_record(state) if state is not None else None
def delete(self, owner: str, repo: str, issue_number: int) -> None:
self._inner.delete(owner, repo, issue_number)
def all(self) -> list[RunRecord]:
return [_to_record(s) for s in self._inner.all()]
def _to_forge_state(r: RunRecord) -> ForgeState:
return ForgeState(
owner=r.owner, repo=r.repo, issue_number=r.issue_number, slug=r.slug,
agent_name=r.agent_name, bottle_names=list(r.bottle_names),
backend_name=r.backend_name, agent_git_user=r.agent_git_user,
pr_number=r.pr_number, status=r.status, last_checkin_at=r.last_checkin_at,
)
def _to_record(s: ForgeState) -> RunRecord:
return RunRecord(
owner=s.owner, repo=s.repo, issue_number=s.issue_number, slug=s.slug,
agent_name=s.agent_name, bottle_names=list(s.bottle_names),
backend_name=s.backend_name, agent_git_user=s.agent_git_user,
pr_number=s.pr_number, status=s.status, last_checkin_at=s.last_checkin_at,
)
def make_forge(config: Config, owner: str, repo: str) -> Any:
"""A `GiteaForge` bound to one repo."""
client = GiteaClient(
api_url=config.gitea_api, owner=owner, repo=repo, token=_token()
)
return GiteaForge(client)
def make_sidecar(
config: Config, owner: str, repo: str, issue_number: int, assigned_prs: list[int]
) -> ForgeSidecar:
"""A scope-enforced sidecar for one run (read-anywhere / write-scoped)."""
scoped = ScopedForge(
make_forge(config, owner, repo),
assigned_issue=issue_number,
assigned_prs=assigned_prs,
)
op_log = OpLog(config.queue_dir / f"{owner}-{repo}-{issue_number}.oplog.jsonl")
return ForgeSidecar(
forge=scoped,
op_log=op_log,
queue_dir=config.queue_dir,
run_key=(owner, repo, issue_number),
)
def build(config: Config) -> tuple[WebhookServer, Watchdog, Orchestrator]:
store = BotBottleStateStore(config.db_path)
runner = ProgrammaticBottleRunner()
membership_forge = make_forge(config, "_", "_")
orchestrator = Orchestrator(
forge=membership_forge,
store=store,
runner=runner,
org=config.forge_org,
gitea_api=config.gitea_api,
forge_env_base={
"GITEA_TOKEN": _token(),
"FORGE_QUEUE_DIR": str(config.queue_dir),
"FORGE_SIDECAR_SOCKET": str(config.sidecar_socket),
},
)
watchdog = Watchdog(
store=store, runner=runner, timeout_secs=config.watchdog_timeout_secs
)
server = WebhookServer(
(config.webhook_host, config.webhook_port),
orchestrator=orchestrator,
store=store,
)
return server, watchdog, orchestrator
def _relay_loop(config: Config, orchestrator: Orchestrator, stop: threading.Event) -> None:
while not stop.wait(_RELAY_TICK_SECS):
for ev in drain_done_events(config.queue_dir):
orchestrator.on_done_signal(
ev["owner"], ev["repo"], int(ev["issue_number"]),
str(ev.get("status", "")), str(ev.get("summary", "")),
)
def run(config: Config) -> None:
"""Blocking run: webhook server + watchdog + done-signal relay."""
server, watchdog, orchestrator = build(config)
watchdog.start()
stop = threading.Event()
relay = threading.Thread(
target=_relay_loop, args=(config, orchestrator, stop), daemon=True
)
relay.start()
try:
server.serve_forever()
finally:
stop.set()
watchdog.stop()
server.server_close()
+50
View File
@@ -0,0 +1,50 @@
"""Configuration, loaded from the environment (stdlib `os` only).
Everything the orchestrator needs to run is an env var so a deploy is a
process with an environment, no config file to manage. `FORGE_*` names
match the bot-bottle forge-native PRD.
"""
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
# The label that marks an issue as agent-targeted: `bot-bottle:<agent>`.
LABEL_PREFIX = "bot-bottle:"
# Optional bottle override: `bot-bottle-bottle:<name>`.
BOTTLE_LABEL_PREFIX = "bot-bottle-bottle:"
@dataclass(frozen=True)
class Config:
"""Resolved orchestrator configuration."""
forge_org: str
gitea_api: str
watchdog_timeout_secs: int
webhook_host: str
webhook_port: int
queue_dir: Path
sidecar_socket: Path
db_path: Path | None
@staticmethod
def from_env(env: dict[str, str] | None = None) -> "Config":
e = os.environ if env is None else env
home = Path(e.get("HOME", str(Path.home())))
default_root = home / ".bot-bottle"
db = e.get("FORGE_DB_PATH")
return Config(
forge_org=e.get("FORGE_ORG", "bot-bottle"),
gitea_api=e.get("FORGE_GITEA_API", ""),
watchdog_timeout_secs=int(e.get("FORGE_WATCHDOG_TIMEOUT", "1800")),
webhook_host=e.get("FORGE_WEBHOOK_HOST", "127.0.0.1"),
webhook_port=int(e.get("FORGE_WEBHOOK_PORT", "8477")),
queue_dir=Path(e.get("FORGE_QUEUE_DIR", str(default_root / "forge-queue"))),
sidecar_socket=Path(
e.get("FORGE_SIDECAR_SOCKET", str(default_root / "forge-sidecar.sock"))
),
db_path=Path(db) if db else None,
)
+85
View File
@@ -0,0 +1,85 @@
"""Parse Gitea webhook payloads into typed `ForgeEvent`s.
Only the fields the orchestrator acts on are extracted; unknown payloads
and event types return None so the webhook layer can ignore them.
Gitea sends the event kind in the `X-Gitea-Event` header and the payload
as JSON. The relevant kinds:
- `issues` with `action == "assigned"` -> IssueAssigned
- `issue_comment` with `action == "created"` -> CommentCreated
- `pull_request` with `action == "closed"` -> PullRequestClosed
"""
from __future__ import annotations
from typing import Any
from .model import CommentCreated, ForgeEvent, IssueAssigned, PullRequestClosed
def _repo_owner(payload: dict[str, Any]) -> tuple[str, str]:
repo = payload.get("repository") or {}
owner = (repo.get("owner") or {}).get("login", "")
return str(owner), str(repo.get("name", ""))
def parse_event(event_kind: str, payload: dict[str, Any]) -> ForgeEvent | None:
"""Map (X-Gitea-Event, payload) to a `ForgeEvent`, or None to ignore."""
if event_kind == "issues":
return _parse_issue(payload)
if event_kind == "issue_comment":
return _parse_comment(payload)
if event_kind == "pull_request":
return _parse_pull_request(payload)
return None
def _parse_issue(payload: dict[str, Any]) -> IssueAssigned | None:
if payload.get("action") != "assigned":
return None
owner, repo = _repo_owner(payload)
issue = payload.get("issue") or {}
assignees = tuple(
str(a.get("login", "")) for a in (issue.get("assignees") or [])
)
labels = tuple(str(l.get("name", "")) for l in (issue.get("labels") or []))
return IssueAssigned(
owner=owner,
repo=repo,
issue_number=int(issue.get("number", 0)),
title=str(issue.get("title", "")),
body=str(issue.get("body", "") or ""),
assignees=assignees,
labels=labels,
)
def _parse_comment(payload: dict[str, Any]) -> CommentCreated | None:
if payload.get("action") != "created":
return None
owner, repo = _repo_owner(payload)
issue = payload.get("issue") or {}
comment = payload.get("comment") or {}
return CommentCreated(
owner=owner,
repo=repo,
issue_number=int(issue.get("number", 0)),
comment_id=int(comment.get("id", 0)),
author=str((comment.get("user") or {}).get("login", "")),
body=str(comment.get("body", "") or ""),
is_pull=bool(issue.get("pull_request")),
)
def _parse_pull_request(payload: dict[str, Any]) -> PullRequestClosed | None:
if payload.get("action") != "closed":
return None
owner, repo = _repo_owner(payload)
pr = payload.get("pull_request") or {}
return PullRequestClosed(
owner=owner,
repo=repo,
pr_number=int(pr.get("number", 0)),
merged=bool(pr.get("merged", False)),
)
+180
View File
@@ -0,0 +1,180 @@
"""The orchestration lifecycle: forge events -> bottle transitions.
`Orchestrator.handle(event)` is the single entry point the webhook layer
calls. `on_done_signal(...)` is called by the sidecar relay when an agent
signals completion. All collaborators (forge, store, runner) are
injected and duck-typed; `now` and `label_for` are injectable for tests.
Transitions:
IssueAssigned (targeted, new) -> start bottle, record = running
signal_done (running) -> freeze bottle, record = frozen
CommentCreated (frozen) -> resume bottle, record = running
PullRequestClosed (tracked) -> destroy bottle, record removed
"""
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime
from .model import (
STATUS_DESTROYED,
STATUS_FROZEN,
STATUS_RUNNING,
CommentCreated,
ForgeEvent,
IssueAssigned,
PullRequestClosed,
RunRecord,
)
from .runner import BottleRunner
from .store import StateStore
from .targeting import Membership, Target, resolve_target
def _iso_now() -> str:
return datetime.now().astimezone().isoformat(timespec="seconds")
def _default_label(agent: str, event: IssueAssigned) -> str:
# Embed the issue identity so slugs are unique per issue and never
# get renamed on collision.
return f"{agent}-{event.owner}-{event.repo}-{event.issue_number}"
class Orchestrator:
def __init__(
self,
*,
forge: Membership,
store: StateStore,
runner: BottleRunner,
org: str,
gitea_api: str = "",
forge_env_base: dict[str, str] | None = None,
now: Callable[[], str] = _iso_now,
label_for: Callable[[str, IssueAssigned], str] = _default_label,
) -> None:
self._forge = forge
self._store = store
self._runner = runner
self._org = org
self._gitea_api = gitea_api
self._forge_env_base = forge_env_base or {}
self._now = now
self._label_for = label_for
# --- entry points ------------------------------------------------------
def handle(self, event: ForgeEvent) -> None:
if isinstance(event, IssueAssigned):
self._on_issue_assigned(event)
elif isinstance(event, CommentCreated):
self._on_comment(event)
else:
self._on_pr_closed(event)
def on_done_signal( # pylint: disable=unused-argument
self, owner: str, repo: str, issue_number: int, status: str, summary: str
) -> None:
"""Sidecar relay: an agent signalled completion. Freeze the bottle.
`status`/`summary` are recorded by provenance (via the op log), not
acted on here."""
record = self._store.get(owner, repo, issue_number)
if record is None or record.status != STATUS_RUNNING:
return
self._runner.freeze(record.slug)
record.status = STATUS_FROZEN
record.last_checkin_at = self._now()
self._store.upsert(record)
def link_pr(self, owner: str, repo: str, issue_number: int, pr_number: int) -> None:
"""Record the PR a tracked issue produced, so PR comments and the
PR-close event route back to this record."""
record = self._store.get(owner, repo, issue_number)
if record is not None:
record.pr_number = pr_number
self._store.upsert(record)
# --- handlers ----------------------------------------------------------
def _on_issue_assigned(self, event: IssueAssigned) -> None:
target = resolve_target(event, self._forge, self._org)
if target is None:
return
# Idempotent: a webhook redelivery must not launch a second bottle.
if self._store.get(event.owner, event.repo, event.issue_number) is not None:
return
self._launch(event, target)
def _launch(self, event: IssueAssigned, target: Target) -> None:
label = self._label_for(target.agent_name, event)
bottles = [target.bottle_override] if target.bottle_override else []
slug = self._runner.start(
agent=target.agent_name,
bottles=bottles,
label=label,
prompt=event.body,
forge_env=self._forge_env(event.owner, event.repo, event.issue_number),
)
self._store.upsert(
RunRecord(
owner=event.owner,
repo=event.repo,
issue_number=event.issue_number,
slug=slug,
agent_name=target.agent_name,
bottle_names=bottles,
status=STATUS_RUNNING,
last_checkin_at=self._now(),
)
)
def _on_comment(self, event: CommentCreated) -> None:
record = self._route_comment(event)
if record is None or record.status != STATUS_FROZEN:
return
# Echo-loop guard: ignore the agent's own comments.
if record.agent_git_user and event.author == record.agent_git_user:
return
self._runner.resume(record.slug, event.body)
record.status = STATUS_RUNNING
record.last_checkin_at = self._now()
self._store.upsert(record)
def _route_comment(self, event: CommentCreated) -> RunRecord | None:
# A comment on the issue routes by issue number; a comment on a PR
# routes by the recorded pr_number.
direct = self._store.get(event.owner, event.repo, event.issue_number)
if direct is not None:
return direct
if event.is_pull:
return self._find_by_pr(event.owner, event.repo, event.issue_number)
return None
def _on_pr_closed(self, event: PullRequestClosed) -> None:
record = self._find_by_pr(event.owner, event.repo, event.pr_number)
if record is None:
return
self._runner.destroy(record.slug)
record.status = STATUS_DESTROYED
self._store.delete(record.owner, record.repo, record.issue_number)
def _find_by_pr(self, owner: str, repo: str, pr_number: int) -> RunRecord | None:
for record in self._store.all():
if (
record.owner == owner
and record.repo == repo
and record.pr_number == pr_number
):
return record
return None
def _forge_env(self, owner: str, repo: str, issue_number: int) -> dict[str, str]:
env = dict(self._forge_env_base)
if self._gitea_api:
env["FORGE_GITEA_API"] = self._gitea_api
env["FORGE_OWNER"] = owner
env["FORGE_REPO"] = repo
env["FORGE_ISSUE_NUMBER"] = str(issue_number)
return env
+108
View File
@@ -0,0 +1,108 @@
"""Domain model: run records, forge events, provenance.
These are the orchestrator's own dataclasses. `RunRecord` mirrors
bot-bottle's `ForgeState` field-for-field so the bootstrap adapter can
translate between them with no loss; keeping our own copy is what lets
the core stay import-free of bot-bottle.
"""
from __future__ import annotations
from dataclasses import dataclass, field
# Run lifecycle. A bottle is launched (running), frozen on the done
# signal, and destroyed when the PR closes.
STATUS_RUNNING = "running"
STATUS_FROZEN = "frozen"
STATUS_DESTROYED = "destroyed"
@dataclass
class RunRecord:
"""One forge-targeted issue's bottle lifecycle record."""
owner: str
repo: str
issue_number: int
slug: str
agent_name: str
bottle_names: list[str] = field(default_factory=list)
backend_name: str = ""
agent_git_user: str = ""
pr_number: int | None = None
status: str = STATUS_RUNNING
last_checkin_at: str = ""
# --- Forge events (parsed webhook payloads) --------------------------------
@dataclass(frozen=True)
class IssueAssigned:
"""An issue gained an assignee — the trigger to consider a launch."""
owner: str
repo: str
issue_number: int
title: str
body: str
assignees: tuple[str, ...]
labels: tuple[str, ...]
@dataclass(frozen=True)
class CommentCreated:
"""A comment was posted on an issue or PR — a rehydrate trigger."""
owner: str
repo: str
issue_number: int
comment_id: int
author: str
body: str
is_pull: bool
@dataclass(frozen=True)
class PullRequestClosed:
"""A PR closed (merged or not) — the teardown trigger."""
owner: str
repo: str
pr_number: int
merged: bool
# Union of everything the webhook layer can emit.
ForgeEvent = IssueAssigned | CommentCreated | PullRequestClosed
# --- Provenance ------------------------------------------------------------
@dataclass(frozen=True)
class ForgeOp:
"""One semantic forge operation the sidecar recorded."""
at: str # ISO timestamp
op: str # e.g. "post_comment", "read_pr", "signal_done"
target: int | None
detail: str
@dataclass(frozen=True)
class Provenance:
"""The audit record for one run, served by the provenance API. Never
posted into the forge."""
slug: str
owner: str
repo: str
issue_number: int
agent_name: str
bottle_names: tuple[str, ...]
started_at: str
finished_at: str
exit_code: int | None
watchdog_fired: bool
ops: tuple[ForgeOp, ...]
+71
View File
@@ -0,0 +1,71 @@
"""Provenance assembly + serialization.
Provenance is the run's audit record: the `RunRecord` metadata plus the
sidecar's semantic operation log. It is exposed through the provenance
API (see `webhook.ProvenanceHandler`) and deliberately never posted back
into the forge a mutable PR comment is not an audit record.
This module only assembles and serializes; retention/signing of the
record is a control-plane concern out of scope here.
"""
from __future__ import annotations
from typing import Any
from .model import ForgeOp, Provenance, RunRecord
def ops_from_log(entries: list[dict[str, Any]]) -> tuple[ForgeOp, ...]:
return tuple(
ForgeOp(
at=str(e.get("at", "")),
op=str(e.get("op", "")),
target=e.get("target"),
detail=str(e.get("detail", "")),
)
for e in entries
)
def build_provenance(
record: RunRecord,
*,
ops: tuple[ForgeOp, ...],
started_at: str,
finished_at: str,
exit_code: int | None,
watchdog_fired: bool,
) -> Provenance:
return Provenance(
slug=record.slug,
owner=record.owner,
repo=record.repo,
issue_number=record.issue_number,
agent_name=record.agent_name,
bottle_names=tuple(record.bottle_names),
started_at=started_at,
finished_at=finished_at,
exit_code=exit_code,
watchdog_fired=watchdog_fired,
ops=ops,
)
def provenance_to_dict(p: Provenance) -> dict[str, Any]:
return {
"slug": p.slug,
"owner": p.owner,
"repo": p.repo,
"issue_number": p.issue_number,
"agent": p.agent_name,
"bottles": list(p.bottle_names),
"started_at": p.started_at,
"finished_at": p.finished_at,
"exit_code": p.exit_code,
"watchdog_fired": p.watchdog_fired,
"ops": [
{"at": o.at, "op": o.op, "target": o.target, "detail": o.detail}
for o in p.ops
],
}
+83
View File
@@ -0,0 +1,83 @@
"""Bottle runner: drive bot_bottle to manage a bottle's life.
`BottleRunner` is the interface the lifecycle depends on;
`ProgrammaticBottleRunner` calls into the bot_bottle Python API directly
(no subprocess). The slug returned by `start` is the actual slug minted
at launch time not a post-hoc derivation from the label so it is
authoritative even if bot-bottle's slugification logic changes.
`slugify` is retained for `FakeRunner` (tests) and for the label scheme
the orchestrator uses to predict collision-free slugs.
"""
from __future__ import annotations
import re
from collections.abc import Sequence
from typing import Protocol
class BottleRunner(Protocol):
def start(
self,
*,
agent: str,
bottles: Sequence[str],
label: str,
prompt: str,
forge_env: dict[str, str],
) -> str: ...
def freeze(self, slug: str) -> None: ...
def resume(self, slug: str, prompt: str) -> None: ...
def destroy(self, slug: str) -> None: ...
_SLUG_RE = re.compile(r"[^a-z0-9]+")
def slugify(label: str) -> str:
"""Lowercase, collapse non-alphanumerics to single hyphens, strip
leading/trailing hyphens matches bot-bottle's slug rule."""
return _SLUG_RE.sub("-", label.lower()).strip("-")
class ProgrammaticBottleRunner:
"""Calls into the bot_bottle Python API directly — no subprocess.
Imports are deferred to call time so tests can inject a mock into
sys.modules['bot_bottle.api'] before calling runner methods.
bot_bottle.api is added in the forge-native-integration PR (#318),
which merges before this one."""
def start(
self,
*,
agent: str,
bottles: Sequence[str],
label: str,
prompt: str,
forge_env: dict[str, str],
) -> str:
from bot_bottle import api # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module
return api.start_headless(
agent,
prompt=prompt,
bottles=list(bottles) or None,
label=label,
forge_env=forge_env,
)
def freeze(self, slug: str) -> None:
from bot_bottle import api # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module
api.freeze(slug)
def resume(self, slug: str, prompt: str) -> None:
from bot_bottle import api # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module
api.resume_headless(slug, prompt=prompt)
def destroy(self, slug: str) -> None:
from bot_bottle import api # type: ignore[import-not-found] # pylint: disable=import-error,no-name-in-module
api.destroy(slug)
+171
View File
@@ -0,0 +1,171 @@
"""Forge sidecar: the agent's only door to the forge.
The agent calls the sidecar over a line-delimited JSON-RPC AF_UNIX
socket; the sidecar dispatches to an injected `forge` (already
scope-wrapped by bootstrap) and holds the token, so the agent never sees
a credential or a forge endpoint. Every call is appended to a semantic
operation log (the provenance raw material). `signal_done` additionally
drops an event file in the queue dir the orchestrator drains.
`dispatch` is pure and testable; `serve` wraps it in a socket server.
"""
from __future__ import annotations
import dataclasses
import json
import socketserver
import uuid
from collections.abc import Callable
from datetime import datetime
from pathlib import Path
from typing import Any
_READ_METHODS = {"read_issue", "read_pr", "read_comments"}
_WRITE_METHODS = {"post_comment", "update_description"}
def _iso_now() -> str:
return datetime.now().astimezone().isoformat(timespec="seconds")
def _jsonable(value: Any) -> Any:
if dataclasses.is_dataclass(value) and not isinstance(value, type):
return dataclasses.asdict(value)
if isinstance(value, list):
return [_jsonable(v) for v in value]
return value
class OpLog:
"""Append-only JSONL log of semantic forge operations."""
def __init__(self, path: Path, *, now: Callable[[], str] = _iso_now) -> None:
self._path = path
self._now = now
path.parent.mkdir(parents=True, exist_ok=True)
def record(self, op: str, target: int | None, detail: str) -> None:
entry = {"at": self._now(), "op": op, "target": target, "detail": detail}
with self._path.open("a", encoding="utf-8") as fh:
fh.write(json.dumps(entry) + "\n")
def read(self) -> list[dict[str, Any]]:
if not self._path.exists():
return []
return [
json.loads(line)
for line in self._path.read_text(encoding="utf-8").splitlines()
if line.strip()
]
def write_done_event(queue_dir: Path, event: dict[str, Any]) -> Path:
"""Atomically drop a done-signal event file in the queue dir."""
queue_dir.mkdir(parents=True, exist_ok=True)
path = queue_dir / f"done-{uuid.uuid4().hex}.json"
tmp = path.with_suffix(".json.tmp")
tmp.write_text(json.dumps(event), encoding="utf-8")
tmp.replace(path)
return path
def drain_done_events(queue_dir: Path) -> list[dict[str, Any]]:
"""Read and remove every queued done-signal event."""
if not queue_dir.is_dir():
return []
events: list[dict[str, Any]] = []
for path in sorted(queue_dir.glob("done-*.json")):
try:
events.append(json.loads(path.read_text(encoding="utf-8")))
except (OSError, ValueError):
continue
finally:
path.unlink(missing_ok=True)
return events
class ForgeSidecar:
"""Dispatches sidecar protocol calls to the forge, logging each and
relaying `signal_done` to the queue dir. `run_key` is the
(owner, repo, issue_number) the run is bound to."""
def __init__(
self,
*,
forge: object,
op_log: OpLog,
queue_dir: Path,
run_key: tuple[str, str, int],
) -> None:
self._forge = forge
self._log = op_log
self._queue_dir = queue_dir
self._owner, self._repo, self._issue = run_key
def dispatch(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
try:
result = self._invoke(method, params)
except Exception as exc: # noqa: BLE001 # pylint: disable=broad-exception-caught
self._log.record(method, params.get("number"), f"error: {exc}")
return {"ok": False, "error": str(exc)}
return {"ok": True, "result": result}
def _invoke(self, method: str, params: dict[str, Any]) -> Any:
if method in _READ_METHODS:
number = int(params["number"])
result = getattr(self._forge, method)(number)
self._log.record(method, number, "ok")
return _jsonable(result)
if method in _WRITE_METHODS:
number = int(params["number"])
getattr(self._forge, method)(number, params["body"])
self._log.record(method, number, "ok")
return None
if method == "signal_done":
status = str(params.get("status", ""))
summary = str(params.get("summary", ""))
self._log.record("signal_done", None, f"{status}: {summary}")
write_done_event(
self._queue_dir,
{
"owner": self._owner,
"repo": self._repo,
"issue_number": self._issue,
"status": status,
"summary": summary,
},
)
return None
raise ValueError(f"unknown method: {method}")
class _Handler(socketserver.StreamRequestHandler):
def handle(self) -> None:
line = self.rfile.readline()
if not line:
return
try:
req = json.loads(line)
except ValueError:
self.wfile.write(b'{"ok": false, "error": "invalid json"}\n')
return
resp = self.server.sidecar.dispatch( # type: ignore[attr-defined]
str(req.get("method", "")), dict(req.get("params", {}))
)
self.wfile.write((json.dumps(resp) + "\n").encode())
class _Server(socketserver.ThreadingUnixStreamServer):
def __init__(self, socket_path: str, sidecar: ForgeSidecar) -> None:
super().__init__(socket_path, _Handler)
self.sidecar = sidecar
def serve(sidecar: ForgeSidecar, socket_path: Path) -> _Server:
"""Bind a threaded AF_UNIX server for `sidecar`. Caller runs
`serve_forever()` (or `handle_request()` in tests) and closes it."""
if socket_path.exists():
socket_path.unlink()
socket_path.parent.mkdir(parents=True, exist_ok=True)
return _Server(str(socket_path), sidecar)
+48
View File
@@ -0,0 +1,48 @@
"""State store interface + an in-memory implementation.
The orchestrator persists one `RunRecord` per forge-targeted issue. At
runtime `bootstrap` supplies an adapter over bot-bottle's
`SqliteForgeStateStore`; the in-memory store here backs tests and a
`--no-bot-bottle` dry mode.
"""
from __future__ import annotations
from typing import Protocol
from .model import RunRecord
class StateStore(Protocol):
"""Thin CRUD surface. Mirrors bot-bottle's `ForgeStateStore` so the
bootstrap adapter is a straight pass-through."""
def upsert(self, record: RunRecord) -> None: ...
def get(self, owner: str, repo: str, issue_number: int) -> RunRecord | None: ...
def delete(self, owner: str, repo: str, issue_number: int) -> None: ...
def all(self) -> list[RunRecord]: ...
class InMemoryStateStore:
"""Dict-backed `StateStore`, keyed by (owner, repo, issue_number)."""
def __init__(self) -> None:
self._by_key: dict[tuple[str, str, int], RunRecord] = {}
def upsert(self, record: RunRecord) -> None:
self._by_key[(record.owner, record.repo, record.issue_number)] = record
def get(self, owner: str, repo: str, issue_number: int) -> RunRecord | None:
return self._by_key.get((owner, repo, issue_number))
def delete(self, owner: str, repo: str, issue_number: int) -> None:
self._by_key.pop((owner, repo, issue_number), None)
def all(self) -> list[RunRecord]:
return sorted(
self._by_key.values(),
key=lambda r: (r.owner, r.repo, r.issue_number),
)
+51
View File
@@ -0,0 +1,51 @@
"""Decide whether an assigned issue is agent-targeted, and for whom.
An issue is forge-targeted when BOTH hold:
- it carries a `bot-bottle:<agent>` label naming the agent, and
- at least one assignee is a member of the configured org.
An optional `bot-bottle-bottle:<name>` label overrides bottle selection.
The forge is duck-typed: any object with `is_org_member(org, user)`.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol
from .config import BOTTLE_LABEL_PREFIX, LABEL_PREFIX
from .model import IssueAssigned
class Membership(Protocol):
def is_org_member(self, org: str, username: str) -> bool: ...
@dataclass(frozen=True)
class Target:
agent_name: str
bottle_override: str | None
def parse_labels(labels: tuple[str, ...]) -> tuple[str | None, str | None]:
"""Return (agent_name, bottle_override) parsed from labels."""
agent: str | None = None
bottle: str | None = None
for label in labels:
if label.startswith(BOTTLE_LABEL_PREFIX):
bottle = label[len(BOTTLE_LABEL_PREFIX):] or None
elif label.startswith(LABEL_PREFIX):
agent = label[len(LABEL_PREFIX):] or None
return agent, bottle
def resolve_target(
event: IssueAssigned, forge: Membership, org: str
) -> Target | None:
"""Return the `Target` for a forge-targeted issue, or None to ignore."""
agent, bottle = parse_labels(event.labels)
if not agent:
return None
if not any(forge.is_org_member(org, a) for a in event.assignees):
return None
return Target(agent_name=agent, bottle_override=bottle)
+68
View File
@@ -0,0 +1,68 @@
"""Watchdog: freeze runs whose agent exited without signalling done.
`sweep(now)` is the pure, testable core: any `running` record whose
`last_checkin_at` is older than the timeout is frozen as
done-without-self-report and returned so provenance can flag it.
`Watchdog.start()` runs `sweep` on a daemon thread once a minute.
"""
from __future__ import annotations
import threading
from datetime import datetime, timedelta
from .model import STATUS_FROZEN, STATUS_RUNNING, RunRecord
from .runner import BottleRunner
from .store import StateStore
_TICK_SECS = 60.0
def _parse(ts: str) -> datetime | None:
try:
return datetime.fromisoformat(ts)
except (ValueError, TypeError):
return None
class Watchdog:
def __init__(
self,
*,
store: StateStore,
runner: BottleRunner,
timeout_secs: int,
) -> None:
self._store = store
self._runner = runner
self._timeout = timedelta(seconds=timeout_secs)
self._stop = threading.Event()
self._thread: threading.Thread | None = None
def sweep(self, now: datetime) -> list[RunRecord]:
"""Freeze stale running records. Returns the ones fired."""
fired: list[RunRecord] = []
for record in self._store.all():
if record.status != STATUS_RUNNING:
continue
checkin = _parse(record.last_checkin_at)
if checkin is None or now - checkin <= self._timeout:
continue
self._runner.freeze(record.slug)
record.status = STATUS_FROZEN
self._store.upsert(record)
fired.append(record)
return fired
def start(self) -> None:
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
def stop(self) -> None:
self._stop.set()
if self._thread is not None:
self._thread.join(timeout=_TICK_SECS)
def _loop(self) -> None:
while not self._stop.wait(_TICK_SECS):
self.sweep(datetime.now().astimezone())
+123
View File
@@ -0,0 +1,123 @@
"""HTTP surface: the Gitea webhook receiver and the provenance API.
`POST /webhook` a Gitea event; parsed and dispatched to the orchestrator.
`GET /healthz` liveness.
`GET /provenance?owner=&repo=&issue=` the run's audit record (never
posted to the forge).
Webhook signature verification is optional: set a secret and the handler
rejects bodies whose `X-Gitea-Signature` HMAC-SHA256 does not match.
"""
from __future__ import annotations
import hmac
import json
from collections.abc import Callable
from hashlib import sha256
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
from urllib.parse import parse_qs, urlparse
from .events import parse_event
from .lifecycle import Orchestrator
from .provenance import build_provenance, ops_from_log, provenance_to_dict
from .store import StateStore
# (record) -> that run's op-log entries, injected by bootstrap.
OpLogReader = Callable[[Any], list[dict[str, Any]]]
class WebhookServer(ThreadingHTTPServer):
def __init__(
self,
address: tuple[str, int],
*,
orchestrator: Orchestrator,
store: StateStore,
secret: bytes | None = None,
op_log_reader: OpLogReader | None = None,
) -> None:
super().__init__(address, _Handler)
self.orchestrator = orchestrator
self.store = store
self.secret = secret
self.op_log_reader = op_log_reader
def verify_signature(secret: bytes, body: bytes, signature: str) -> bool:
expected = hmac.new(secret, body, sha256).hexdigest()
return hmac.compare_digest(expected, signature or "")
class _Handler(BaseHTTPRequestHandler):
server: WebhookServer # type: ignore[assignment]
def log_message( # pylint: disable=redefined-builtin
self, format: str, *args: Any
) -> None: # quiet by default
pass
def _send(self, code: int, payload: dict[str, Any]) -> None:
body = json.dumps(payload).encode()
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_POST(self) -> None: # noqa: N802 # pylint: disable=invalid-name
if urlparse(self.path).path != "/webhook":
self._send(404, {"error": "not found"})
return
length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(length)
if self.server.secret is not None:
sig = self.headers.get("X-Gitea-Signature", "")
if not verify_signature(self.server.secret, body, sig):
self._send(401, {"error": "bad signature"})
return
try:
payload = json.loads(body)
except ValueError:
self._send(400, {"error": "invalid json"})
return
kind = self.headers.get("X-Gitea-Event", "")
event = parse_event(kind, payload)
if event is not None:
self.server.orchestrator.handle(event)
self._send(200, {"ok": True, "handled": event is not None})
def do_GET(self) -> None: # noqa: N802 # pylint: disable=invalid-name
parsed = urlparse(self.path)
if parsed.path == "/healthz":
self._send(200, {"ok": True})
return
if parsed.path == "/provenance":
self._provenance(parse_qs(parsed.query))
return
self._send(404, {"error": "not found"})
def _provenance(self, query: dict[str, list[str]]) -> None:
try:
owner = query["owner"][0]
repo = query["repo"][0]
issue = int(query["issue"][0])
except (KeyError, IndexError, ValueError):
self._send(400, {"error": "owner, repo, issue required"})
return
record = self.server.store.get(owner, repo, issue)
if record is None:
self._send(404, {"error": "no such run"})
return
reader = self.server.op_log_reader
ops = ops_from_log(reader(record) if reader is not None else [])
prov = build_provenance(
record,
ops=ops,
started_at="",
finished_at=record.last_checkin_at,
exit_code=None,
watchdog_fired=False,
)
self._send(200, provenance_to_dict(prov))
@@ -0,0 +1,132 @@
# PRD prd-new: Fold bot-bottle-orchestrator into this repo
- **Status:** Active
- **Author:** didericis
- **Created:** 2026-07-01
- **Issue:** #321
## Summary
Move the `bot-bottle-orchestrator` binary into `bot_bottle/orchestrator/` as a
first-class subpackage. `pip install bot-bottle` gets you everything; the
orchestrator's entry point becomes `python -m bot_bottle.orchestrator run`. The
cross-repo CLI contract becomes an internal boundary, and the forge integration
layer (`GiteaClient`, `ScopedForge`, `SqliteForgeStateStore`) is promoted to
`bot_bottle/contrib/` where it belongs.
## Problem
The orchestrator and bot-bottle are tightly coupled:
- It always deploys on the same host.
- It imports from `bot_bottle` for the forge/state layer.
- Its runner shims (`start --headless`, `commit`, `resume`) map 1:1 to CLI
commands in `cli.py` — a breaking CLI change silently breaks the orchestrator
with no CI signal.
- Two repos means two version pins, two CI pipelines, and two install steps
every time the deploy environment is rebuilt.
## Goals / Success Criteria
- All orchestrator modules live under `bot_bottle/orchestrator/` and the package
is importable as `from bot_bottle.orchestrator import ...`.
- `python -m bot_bottle.orchestrator run` starts the webhook server.
- `python -m bot_bottle.orchestrator status` prints tracked runs.
- The forge integration layer (`GiteaClient`, `GiteaForge`, `ScopedForge`,
`ForgeState`, `SqliteForgeStateStore`) lives in `bot_bottle/contrib/` and is
covered by tests in `tests/unit/orchestrator/`.
- All orchestrator unit tests pass under bot-bottle's existing CI
(`python -m unittest discover -s tests/unit`).
- No functional change to the orchestrator's external behaviour: same
HTTP surface, same webhook protocol, same env-var config, same CLI flags.
## Non-goals
- Replacing `SubprocessBottleRunner` with a direct programmatic runner — the
subprocess shim stays; the `BottleRunner` protocol remains the internal
abstraction point.
- Merging the orchestrator's SQLite DB with any other bot-bottle state store.
- Archiving `bot-bottle-orchestrator` (that happens after this ships and the
deploy is updated; out of scope for this PR).
## Design
### Package layout
```
bot_bottle/
orchestrator/
__init__.py
__main__.py # python -m bot_bottle.orchestrator
bootstrap.py # wires contrib modules → orchestrator core
config.py
events.py
lifecycle.py
model.py
provenance.py
runner.py
sidecar.py
store.py
targeting.py
watchdog.py
webhook.py
contrib/
forge/
__init__.py
base.py # ScopedForge: read-anywhere / write-scoped wrapper
gitea/
client.py # GiteaClient (urllib.request), GiteaForge
forge_state.py # ForgeState dataclass + SqliteForgeStateStore
tests/unit/orchestrator/
__init__.py
_fakes.py
test_config.py
test_events.py
test_lifecycle.py
test_provenance.py
test_runner.py
test_sidecar.py
test_store.py
test_targeting.py
test_watchdog.py
test_webhook.py
```
### Module moves
Every `orchestrator/` source file moves verbatim into `bot_bottle/orchestrator/`.
Internal imports are already relative (`from .config import Config`) so no
changes are needed inside the orchestrator modules themselves.
`bootstrap.py` is the only file that changes meaningfully: the lazy `bot_bottle`
imports become direct relative imports (`from ..contrib.gitea.client import …`),
and the `_require_bot_bottle()` guard is removed since the package is always
present.
### New contrib modules
**`bot_bottle/contrib/forge/base.py``ScopedForge`**
Wraps any forge object and enforces read-anywhere / write-scoped access: reads
pass through unconditionally; `post_comment` and `update_description` raise
`PermissionError` for issue/PR numbers outside the assigned set.
**`bot_bottle/contrib/gitea/client.py``GiteaClient`, `GiteaForge`**
`GiteaClient` is a thin `urllib.request`-only HTTP wrapper (no new Python
dependencies). `GiteaForge` composes a client and exposes the forge protocol:
`is_org_member`, `read_issue`, `read_pr`, `read_comments`, `post_comment`,
`update_description`.
**`bot_bottle/contrib/gitea/forge_state.py``ForgeState`, `SqliteForgeStateStore`**
`ForgeState` is a dataclass mirroring `RunRecord` field-for-field. `SqliteForgeStateStore`
backs it with SQLite (stdlib `sqlite3`): a single `forge_state` table with one
row per (owner, repo, issue\_number).
### Test migration
All orchestrator test files move to `tests/unit/orchestrator/` with absolute
imports updated from `orchestrator.X` to `bot_bottle.orchestrator.X`. The unit
discovery command (`-s tests/unit`) picks them up automatically — no CI changes
required.
View File
+66
View File
@@ -0,0 +1,66 @@
"""Shared test doubles: a duck-typed forge and bottle runner."""
# Test doubles mirror an API shape; some params are intentionally unused.
# pylint: disable=unused-argument
from __future__ import annotations
from collections.abc import Sequence
from bot_bottle.orchestrator.runner import slugify
class FakeForge:
def __init__(self, members: tuple[str, ...] = ()) -> None:
self.members = set(members)
self.comments: list[tuple[int, str]] = []
self.descriptions: list[tuple[int, str]] = []
self.scope_denied: set[int] = set()
def is_org_member(self, org: str, username: str) -> bool:
return username in self.members
def read_issue(self, number: int) -> dict[str, object]:
return {"number": number, "kind": "issue"}
def read_pr(self, number: int) -> dict[str, object]:
return {"number": number, "merged": False}
def read_comments(self, number: int) -> list[dict[str, object]]:
return [{"id": 1, "user": "alice", "body": "hi"}]
def post_comment(self, number: int, body: str) -> None:
if number in self.scope_denied:
raise PermissionError(f"write to #{number} denied")
self.comments.append((number, body))
def update_description(self, number: int, body: str) -> None:
if number in self.scope_denied:
raise PermissionError(f"write to #{number} denied")
self.descriptions.append((number, body))
class FakeRunner:
def __init__(self) -> None:
self.calls: list[tuple[object, ...]] = []
def start(
self,
*,
agent: str,
bottles: Sequence[str],
label: str,
prompt: str,
forge_env: dict[str, str],
) -> str:
self.calls.append(("start", agent, tuple(bottles), label, prompt, dict(forge_env)))
return slugify(label)
def freeze(self, slug: str) -> None:
self.calls.append(("freeze", slug))
def resume(self, slug: str, prompt: str) -> None:
self.calls.append(("resume", slug, prompt))
def destroy(self, slug: str) -> None:
self.calls.append(("destroy", slug))
+178
View File
@@ -0,0 +1,178 @@
"""Unit: BotBottleStateStore, _token, conversions, make_forge/make_sidecar, build."""
from __future__ import annotations
import os
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from bot_bottle.orchestrator.bootstrap import (
BotBottleStateStore,
_to_forge_state,
_to_record,
_token,
build,
make_forge,
make_sidecar,
)
from bot_bottle.orchestrator.config import Config
from bot_bottle.orchestrator.model import RunRecord
def _config(tmp: str) -> Config:
return Config(
forge_org="org",
gitea_api="http://g/api/v1",
watchdog_timeout_secs=1800,
webhook_host="127.0.0.1",
webhook_port=0,
queue_dir=Path(tmp) / "q",
sidecar_socket=Path(tmp) / "s.sock",
db_path=None,
)
def _record(**kw: object) -> RunRecord:
defaults: dict[str, object] = {
"owner": "o", "repo": "r", "issue_number": 1, "slug": "s1", "agent_name": "a",
"bottle_names": ["claude"], "backend_name": "docker", "agent_git_user": "bot",
"pr_number": 5, "status": "running", "last_checkin_at": "2026-01-01T00:00:00+00:00",
}
defaults.update(kw)
return RunRecord(**defaults) # type: ignore[arg-type]
class TokenTest(unittest.TestCase):
def test_gitea_token_env(self):
with patch.dict(os.environ, {"GITEA_TOKEN": "tok123"}):
self.assertEqual("tok123", _token())
def test_forge_gitea_token_fallback(self):
clean = {k: v for k, v in os.environ.items()
if k not in ("GITEA_TOKEN", "FORGE_GITEA_TOKEN")}
with patch.dict(os.environ, {**clean, "FORGE_GITEA_TOKEN": "tok456"}, clear=True):
self.assertEqual("tok456", _token())
def test_missing_token_raises(self):
clean = {k: v for k, v in os.environ.items()
if k not in ("GITEA_TOKEN", "FORGE_GITEA_TOKEN")}
with patch.dict(os.environ, clean, clear=True):
with self.assertRaises(RuntimeError):
_token()
class ConversionRoundTripTest(unittest.TestCase):
def test_record_survives_forge_state_roundtrip(self):
rec = _record()
result = _to_record(_to_forge_state(rec))
self.assertEqual(rec.owner, result.owner)
self.assertEqual(rec.repo, result.repo)
self.assertEqual(rec.issue_number, result.issue_number)
self.assertEqual(rec.slug, result.slug)
self.assertEqual(rec.agent_name, result.agent_name)
self.assertEqual(rec.bottle_names, result.bottle_names)
self.assertEqual(rec.backend_name, result.backend_name)
self.assertEqual(rec.agent_git_user, result.agent_git_user)
self.assertEqual(rec.pr_number, result.pr_number)
self.assertEqual(rec.status, result.status)
self.assertEqual(rec.last_checkin_at, result.last_checkin_at)
def test_none_pr_number_preserved(self):
rec = _record(pr_number=None)
result = _to_record(_to_forge_state(rec))
self.assertIsNone(result.pr_number)
class BotBottleStateStoreTest(unittest.TestCase):
def setUp(self):
self.store = BotBottleStateStore(None)
def test_upsert_and_get(self):
self.store.upsert(_record())
got = self.store.get("o", "r", 1)
assert got is not None
self.assertEqual("s1", got.slug)
def test_get_missing(self):
self.assertIsNone(self.store.get("o", "r", 99))
def test_upsert_replaces(self):
self.store.upsert(_record())
self.store.upsert(_record(slug="new-slug"))
got = self.store.get("o", "r", 1)
assert got is not None
self.assertEqual("new-slug", got.slug)
def test_delete(self):
self.store.upsert(_record())
self.store.delete("o", "r", 1)
self.assertIsNone(self.store.get("o", "r", 1))
def test_all_returns_all_records(self):
self.store.upsert(_record(issue_number=1, slug="s1"))
self.store.upsert(_record(issue_number=2, slug="s2"))
recs = self.store.all()
self.assertEqual(2, len(recs))
slugs = {r.slug for r in recs}
self.assertEqual({"s1", "s2"}, slugs)
def test_all_empty(self):
self.assertEqual([], self.store.all())
def test_bottle_names_preserved(self):
self.store.upsert(_record(bottle_names=["claude", "dev"]))
got = self.store.get("o", "r", 1)
assert got is not None
self.assertEqual(["claude", "dev"], got.bottle_names)
class MakeForgeTest(unittest.TestCase):
def test_returns_gitea_forge(self):
with tempfile.TemporaryDirectory() as tmp:
config = _config(tmp)
with patch.dict(os.environ, {"GITEA_TOKEN": "tok"}):
forge = make_forge(config, "owner", "repo")
from bot_bottle.contrib.gitea.client import GiteaForge
self.assertIsInstance(forge, GiteaForge)
class MakeSidecarTest(unittest.TestCase):
def test_returns_forge_sidecar(self):
with tempfile.TemporaryDirectory() as tmp:
config = _config(tmp)
with patch.dict(os.environ, {"GITEA_TOKEN": "tok"}):
sidecar = make_sidecar(config, "owner", "repo", 1, [])
from bot_bottle.orchestrator.sidecar import ForgeSidecar
self.assertIsInstance(sidecar, ForgeSidecar)
class BuildTest(unittest.TestCase):
def test_returns_server_watchdog_orchestrator(self):
with tempfile.TemporaryDirectory() as tmp:
config = _config(tmp)
with patch.dict(os.environ, {"GITEA_TOKEN": "tok"}):
server, watchdog, orch = build(config)
server.server_close()
from bot_bottle.orchestrator.lifecycle import Orchestrator
from bot_bottle.orchestrator.watchdog import Watchdog
from bot_bottle.orchestrator.webhook import WebhookServer
self.assertIsInstance(server, WebhookServer)
self.assertIsInstance(watchdog, Watchdog)
self.assertIsInstance(orch, Orchestrator)
def test_server_binds_to_configured_host(self):
with tempfile.TemporaryDirectory() as tmp:
config = _config(tmp)
with patch.dict(os.environ, {"GITEA_TOKEN": "tok"}):
server, _, _ = build(config)
addr = server.server_address
server.server_close()
self.assertEqual("127.0.0.1", addr[0])
self.assertGreater(addr[1], 0)
if __name__ == "__main__":
unittest.main()
+38
View File
@@ -0,0 +1,38 @@
"""Unit: Config.from_env."""
from __future__ import annotations
import unittest
from pathlib import Path
from bot_bottle.orchestrator.config import Config
class ConfigTest(unittest.TestCase):
def test_defaults(self):
c = Config.from_env({"HOME": "/home/x"})
self.assertEqual("bot-bottle", c.forge_org)
self.assertEqual(1800, c.watchdog_timeout_secs)
self.assertEqual("127.0.0.1", c.webhook_host)
self.assertEqual(8477, c.webhook_port)
self.assertEqual(Path("/home/x/.bot-bottle/forge-queue"), c.queue_dir)
self.assertIsNone(c.db_path)
def test_overrides(self):
c = Config.from_env({
"HOME": "/home/x",
"FORGE_ORG": "agents",
"FORGE_WATCHDOG_TIMEOUT": "60",
"FORGE_GITEA_API": "https://g.example/api/v1",
"FORGE_WEBHOOK_PORT": "9000",
"FORGE_DB_PATH": "/data/bb.db",
})
self.assertEqual("agents", c.forge_org)
self.assertEqual(60, c.watchdog_timeout_secs)
self.assertEqual("https://g.example/api/v1", c.gitea_api)
self.assertEqual(9000, c.webhook_port)
self.assertEqual(Path("/data/bb.db"), c.db_path)
if __name__ == "__main__":
unittest.main()
+68
View File
@@ -0,0 +1,68 @@
"""Unit: webhook payload parsing."""
from __future__ import annotations
import unittest
from bot_bottle.orchestrator.events import parse_event
from bot_bottle.orchestrator.model import CommentCreated, IssueAssigned, PullRequestClosed
_REPO = {"repository": {"name": "bot-bottle", "owner": {"login": "didericis"}}}
class ParseEventTest(unittest.TestCase):
def test_issue_assigned(self):
payload = {
**_REPO,
"action": "assigned",
"issue": {
"number": 17,
"title": "Fix it",
"body": "please",
"assignees": [{"login": "agent-bot"}],
"labels": [{"name": "bot-bottle:implementer"}],
},
}
ev = parse_event("issues", payload)
self.assertIsInstance(ev, IssueAssigned)
assert isinstance(ev, IssueAssigned)
self.assertEqual(("didericis", "bot-bottle", 17), (ev.owner, ev.repo, ev.issue_number))
self.assertEqual(("agent-bot",), ev.assignees)
self.assertEqual(("bot-bottle:implementer",), ev.labels)
def test_issue_non_assigned_ignored(self):
self.assertIsNone(parse_event("issues", {**_REPO, "action": "opened", "issue": {}}))
def test_comment_created(self):
payload = {
**_REPO,
"action": "created",
"issue": {"number": 42, "pull_request": {"x": 1}},
"comment": {"id": 5, "user": {"login": "reviewer"}, "body": "redo"},
}
ev = parse_event("issue_comment", payload)
assert isinstance(ev, CommentCreated)
self.assertEqual(42, ev.issue_number)
self.assertEqual("reviewer", ev.author)
self.assertTrue(ev.is_pull)
def test_pull_request_closed(self):
payload = {**_REPO, "action": "closed", "pull_request": {"number": 8, "merged": True}}
ev = parse_event("pull_request", payload)
assert isinstance(ev, PullRequestClosed)
self.assertEqual(8, ev.pr_number)
self.assertTrue(ev.merged)
def test_pull_request_non_closed_ignored(self):
self.assertIsNone(parse_event("pull_request", {**_REPO, "action": "opened"}))
def test_comment_non_created_action_ignored(self):
payload = {**_REPO, "action": "edited", "issue": {}, "comment": {}}
self.assertIsNone(parse_event("issue_comment", payload))
def test_unknown_kind_ignored(self):
self.assertIsNone(parse_event("push", {**_REPO}))
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,75 @@
"""Unit: ForgeState + SqliteForgeStateStore."""
from __future__ import annotations
import unittest
from bot_bottle.contrib.gitea.forge_state import ForgeState, SqliteForgeStateStore
def _state(**kw: object) -> ForgeState:
defaults: dict[str, object] = dict(
owner="alice", repo="myrepo", issue_number=1,
slug="impl-alice-myrepo-1", agent_name="impl",
)
defaults.update(kw)
return ForgeState(**defaults) # type: ignore[arg-type]
class ForgeStateStoreTest(unittest.TestCase):
def setUp(self):
self.store = SqliteForgeStateStore(None)
def test_upsert_and_get(self):
s = _state()
self.store.upsert(s)
got = self.store.get("alice", "myrepo", 1)
assert got is not None
self.assertEqual("impl-alice-myrepo-1", got.slug)
self.assertEqual("impl", got.agent_name)
def test_get_missing(self):
self.assertIsNone(self.store.get("alice", "myrepo", 99))
def test_upsert_replaces(self):
self.store.upsert(_state(status="running"))
self.store.upsert(_state(status="frozen"))
got = self.store.get("alice", "myrepo", 1)
assert got is not None
self.assertEqual("frozen", got.status)
def test_delete(self):
self.store.upsert(_state())
self.store.delete("alice", "myrepo", 1)
self.assertIsNone(self.store.get("alice", "myrepo", 1))
def test_delete_missing_no_error(self):
self.store.delete("alice", "myrepo", 99)
def test_all_sorted(self):
self.store.upsert(_state(owner="z", issue_number=2))
self.store.upsert(_state(owner="a", issue_number=1))
rows = self.store.all()
self.assertEqual(("a", "z"), (rows[0].owner, rows[1].owner))
def test_bottle_names_roundtrip(self):
self.store.upsert(_state(bottle_names=["claude", "dev"]))
got = self.store.get("alice", "myrepo", 1)
assert got is not None
self.assertEqual(["claude", "dev"], got.bottle_names)
def test_pr_number_none_roundtrip(self):
self.store.upsert(_state(pr_number=None))
got = self.store.get("alice", "myrepo", 1)
assert got is not None
self.assertIsNone(got.pr_number)
def test_pr_number_int_roundtrip(self):
self.store.upsert(_state(pr_number=42))
got = self.store.get("alice", "myrepo", 1)
assert got is not None
self.assertEqual(42, got.pr_number)
if __name__ == "__main__":
unittest.main()
+163
View File
@@ -0,0 +1,163 @@
"""Unit: the orchestration lifecycle."""
from __future__ import annotations
import unittest
from typing import cast
from bot_bottle.orchestrator.lifecycle import Orchestrator
from bot_bottle.orchestrator.model import (
STATUS_FROZEN,
STATUS_RUNNING,
CommentCreated,
IssueAssigned,
PullRequestClosed,
)
from bot_bottle.orchestrator.store import InMemoryStateStore
from ._fakes import FakeForge, FakeRunner
def _assigned(
labels: tuple[str, ...] = ("bot-bottle:impl",),
assignees: tuple[str, ...] = ("agent-bot",),
) -> IssueAssigned:
return IssueAssigned(
owner="didericis", repo="bot-bottle", issue_number=17,
title="t", body="the task", assignees=tuple(assignees), labels=tuple(labels),
)
class LifecycleTest(unittest.TestCase):
def setUp(self):
self.forge = FakeForge(members=("agent-bot",))
self.store = InMemoryStateStore()
self.runner = FakeRunner()
self.orch = Orchestrator(
forge=self.forge, store=self.store, runner=self.runner,
org="bot-bottle", gitea_api="https://g/api/v1",
now=lambda: "2026-07-01T00:00:00-04:00",
)
def _record(self):
return self.store.get("didericis", "bot-bottle", 17)
def test_assigned_targeted_launches(self):
self.orch.handle(_assigned())
rec = self._record()
assert rec is not None
self.assertEqual(STATUS_RUNNING, rec.status)
self.assertEqual("impl-didericis-bot-bottle-17", rec.slug)
self.assertEqual("start", self.runner.calls[0][0])
# forge context injected into the child env.
env = cast("dict[str, str]", self.runner.calls[0][5])
self.assertEqual("didericis", env["FORGE_OWNER"])
self.assertEqual("17", env["FORGE_ISSUE_NUMBER"])
def test_untargeted_ignored(self):
self.orch.handle(_assigned(labels=("bug",)))
self.assertIsNone(self._record())
self.assertEqual([], self.runner.calls)
def test_assigned_is_idempotent(self):
self.orch.handle(_assigned())
self.orch.handle(_assigned()) # redelivery
starts = [c for c in self.runner.calls if c[0] == "start"]
self.assertEqual(1, len(starts))
def test_done_signal_freezes(self):
self.orch.handle(_assigned())
self.orch.on_done_signal("didericis", "bot-bottle", 17, "success", "done")
rec = self._record()
assert rec is not None
self.assertEqual(STATUS_FROZEN, rec.status)
self.assertIn(("freeze", "impl-didericis-bot-bottle-17"), self.runner.calls)
def test_done_signal_ignored_when_not_running(self):
# No record yet -> no freeze.
self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "")
self.assertEqual([], self.runner.calls)
def test_comment_on_frozen_resumes(self):
self.orch.handle(_assigned())
self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "")
self.orch.handle(CommentCreated(
owner="didericis", repo="bot-bottle", issue_number=17,
comment_id=1, author="reviewer", body="please redo", is_pull=False,
))
rec = self._record()
assert rec is not None
self.assertEqual(STATUS_RUNNING, rec.status)
self.assertIn(("resume", "impl-didericis-bot-bottle-17", "please redo"),
self.runner.calls)
def test_comment_echo_guard(self):
self.orch.handle(_assigned())
self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "")
rec = self._record()
assert rec is not None
rec.agent_git_user = "agent-bot"
self.store.upsert(rec)
self.orch.handle(CommentCreated(
owner="didericis", repo="bot-bottle", issue_number=17,
comment_id=2, author="agent-bot", body="I finished", is_pull=False,
))
# Still frozen, no resume triggered by the agent's own comment.
self.assertEqual(STATUS_FROZEN, self._record().status) # type: ignore[union-attr]
self.assertNotIn("resume", [c[0] for c in self.runner.calls])
def test_comment_on_running_ignored(self):
self.orch.handle(_assigned()) # running
self.orch.handle(CommentCreated(
owner="didericis", repo="bot-bottle", issue_number=17,
comment_id=1, author="reviewer", body="hi", is_pull=False,
))
self.assertNotIn("resume", [c[0] for c in self.runner.calls])
def test_pr_comment_routes_via_link(self):
self.orch.handle(_assigned())
self.orch.on_done_signal("didericis", "bot-bottle", 17, "s", "")
self.orch.link_pr("didericis", "bot-bottle", 17, 42)
# Comment arrives on PR #42 (issue_number == PR number in Gitea).
self.orch.handle(CommentCreated(
owner="didericis", repo="bot-bottle", issue_number=42,
comment_id=9, author="reviewer", body="fix", is_pull=True,
))
self.assertIn(("resume", "impl-didericis-bot-bottle-17", "fix"),
self.runner.calls)
def test_pr_closed_destroys_and_removes(self):
self.orch.handle(_assigned())
self.orch.link_pr("didericis", "bot-bottle", 17, 42)
self.orch.handle(PullRequestClosed(
owner="didericis", repo="bot-bottle", pr_number=42, merged=True,
))
self.assertIn(("destroy", "impl-didericis-bot-bottle-17"), self.runner.calls)
self.assertIsNone(self._record())
def test_comment_on_untracked_issue_ignored(self):
# No record in store and is_pull=False -> _route_comment returns None.
self.orch.handle(CommentCreated(
owner="didericis", repo="bot-bottle", issue_number=99,
comment_id=1, author="reviewer", body="hi", is_pull=False,
))
self.assertEqual([], self.runner.calls)
def test_pr_closed_untracked_pr_ignored(self):
# _find_by_pr finds nothing -> _on_pr_closed exits early.
self.orch.handle(PullRequestClosed(
owner="didericis", repo="bot-bottle", pr_number=999, merged=True,
))
self.assertEqual([], self.runner.calls)
class IsoNowTest(unittest.TestCase):
def test_returns_iso_string(self):
from bot_bottle.orchestrator.lifecycle import _iso_now
ts = _iso_now()
self.assertIsInstance(ts, str)
self.assertIn("T", ts)
if __name__ == "__main__":
unittest.main()
+88
View File
@@ -0,0 +1,88 @@
"""Unit: __main__ CLI entry points (run and status commands)."""
from __future__ import annotations
import io
import unittest
from unittest.mock import patch
from bot_bottle.orchestrator.__main__ import main
from bot_bottle.orchestrator.config import Config
from bot_bottle.orchestrator.model import RunRecord
def _config() -> Config:
return Config.from_env({"HOME": "/tmp"})
class MainRunTest(unittest.TestCase):
def test_run_delegates_to_bootstrap(self):
config = _config()
with patch.object(Config, "from_env", return_value=config), \
patch("bot_bottle.orchestrator.bootstrap.run") as mock_run:
rc = main(["run"])
self.assertEqual(0, rc)
mock_run.assert_called_once_with(config)
def test_run_prints_listen_address_to_stderr(self):
config = _config()
err = io.StringIO()
with patch.object(Config, "from_env", return_value=config), \
patch("bot_bottle.orchestrator.bootstrap.run"), \
patch("sys.stderr", err):
main(["run"])
self.assertIn(str(config.webhook_port), err.getvalue())
class MainStatusTest(unittest.TestCase):
def test_status_empty_store(self):
config = _config()
with patch.object(Config, "from_env", return_value=config), \
patch("bot_bottle.orchestrator.bootstrap.BotBottleStateStore") as MockStore:
MockStore.return_value.all.return_value = []
rc = main(["status"])
self.assertEqual(0, rc)
def test_status_prints_records(self):
config = _config()
rec = RunRecord(
owner="o", repo="r", issue_number=1, slug="my-slug",
agent_name="a", pr_number=7, status="frozen",
)
out = io.StringIO()
with patch.object(Config, "from_env", return_value=config), \
patch("bot_bottle.orchestrator.bootstrap.BotBottleStateStore") as MockStore, \
patch("sys.stdout", out):
MockStore.return_value.all.return_value = [rec]
rc = main(["status"])
self.assertEqual(0, rc)
self.assertIn("my-slug", out.getvalue())
self.assertIn("PR#7", out.getvalue())
def test_status_no_pr_prints_dash(self):
config = _config()
rec = RunRecord(
owner="o", repo="r", issue_number=2, slug="s2",
agent_name="a", pr_number=None, status="running",
)
out = io.StringIO()
with patch.object(Config, "from_env", return_value=config), \
patch("bot_bottle.orchestrator.bootstrap.BotBottleStateStore") as MockStore, \
patch("sys.stdout", out):
MockStore.return_value.all.return_value = [rec]
main(["status"])
self.assertIn("-", out.getvalue())
class MainArgparseTest(unittest.TestCase):
def test_no_command_exits(self):
with self.assertRaises(SystemExit):
main([])
def test_unknown_command_exits(self):
with self.assertRaises(SystemExit):
main(["bogus"])
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,53 @@
"""Unit: provenance assembly + serialization."""
from __future__ import annotations
import unittest
from bot_bottle.orchestrator.model import RunRecord
from bot_bottle.orchestrator.provenance import build_provenance, ops_from_log, provenance_to_dict
def _record() -> RunRecord:
return RunRecord(
owner="didericis", repo="bot-bottle", issue_number=17,
slug="impl-17", agent_name="impl", bottle_names=["claude"],
last_checkin_at="2026-07-01T00:05:00-04:00",
)
class ProvenanceTest(unittest.TestCase):
def test_ops_from_log(self):
ops = ops_from_log([
{"at": "T1", "op": "read_pr", "target": 5, "detail": "ok"},
{"at": "T2", "op": "signal_done", "target": None, "detail": "success: done"},
])
self.assertEqual(2, len(ops))
self.assertEqual("read_pr", ops[0].op)
self.assertIsNone(ops[1].target)
def test_build_and_serialize(self):
ops = ops_from_log([{"at": "T1", "op": "post_comment", "target": 17, "detail": "ok"}])
prov = build_provenance(
_record(), ops=ops, started_at="2026-07-01T00:00:00-04:00",
finished_at="2026-07-01T00:05:00-04:00", exit_code=0, watchdog_fired=False,
)
d = provenance_to_dict(prov)
self.assertEqual("impl-17", d["slug"])
self.assertEqual("didericis", d["owner"])
self.assertEqual(["claude"], d["bottles"])
self.assertEqual(0, d["exit_code"])
self.assertFalse(d["watchdog_fired"])
self.assertEqual(1, len(d["ops"]))
self.assertEqual("post_comment", d["ops"][0]["op"])
def test_watchdog_flag_serialized(self):
prov = build_provenance(
_record(), ops=(), started_at="", finished_at="",
exit_code=None, watchdog_fired=True,
)
self.assertTrue(provenance_to_dict(prov)["watchdog_fired"])
if __name__ == "__main__":
unittest.main()
+95
View File
@@ -0,0 +1,95 @@
"""Unit: ProgrammaticBottleRunner + slugify."""
from __future__ import annotations
import sys
import types
import unittest
from typing import Any
from unittest.mock import MagicMock
from bot_bottle.orchestrator.runner import ProgrammaticBottleRunner, slugify
class SlugifyTest(unittest.TestCase):
def test_basic(self):
self.assertEqual("impl-didericis-bot-bottle-17",
slugify("impl-didericis-bot-bottle-17"))
def test_collapses_and_strips(self):
self.assertEqual("a-b-c", slugify(" A_B/C!! "))
def _make_api_stub(**overrides: object) -> Any:
"""Return a mock bot_bottle.api module with sensible defaults."""
stub: Any = types.ModuleType("bot_bottle.api")
stub.start_headless = MagicMock(return_value="impl-r-17")
stub.freeze = MagicMock()
stub.resume_headless = MagicMock()
stub.destroy = MagicMock()
for k, v in overrides.items():
setattr(stub, k, v)
return stub
class ProgrammaticRunnerTest(unittest.TestCase):
def setUp(self) -> None:
self._api: Any = _make_api_stub()
sys.modules["bot_bottle.api"] = self._api
self.runner = ProgrammaticBottleRunner()
def tearDown(self) -> None:
sys.modules.pop("bot_bottle.api", None)
def test_start_returns_slug_from_api(self) -> None:
slug = self.runner.start(
agent="impl", bottles=["claude", "dev"], label="impl-r-17",
prompt="do it", forge_env={"FORGE_OWNER": "didericis"},
)
self.assertEqual("impl-r-17", slug)
def test_start_forwards_all_args(self) -> None:
self.runner.start(
agent="impl", bottles=["claude", "dev"], label="impl-r-17",
prompt="do it", forge_env={"FORGE_OWNER": "didericis"},
)
self._api.start_headless.assert_called_once_with(
"impl",
prompt="do it",
bottles=["claude", "dev"],
label="impl-r-17",
forge_env={"FORGE_OWNER": "didericis"},
)
def test_start_no_bottles_passes_none(self) -> None:
self.runner.start(agent="impl", bottles=[], label="l", prompt="p", forge_env={})
call_kwargs = self._api.start_headless.call_args[1]
self.assertIsNone(call_kwargs["bottles"])
def test_freeze_delegates_to_api(self) -> None:
self.runner.freeze("slug-1")
self._api.freeze.assert_called_once_with("slug-1")
def test_freeze_returns_none(self) -> None:
result = self.runner.freeze("slug-1")
self.assertIsNone(result)
def test_resume_delegates_to_api(self) -> None:
self.runner.resume("slug-1", "address review")
self._api.resume_headless.assert_called_once_with("slug-1", prompt="address review")
def test_resume_returns_none(self) -> None:
result = self.runner.resume("slug-1", "p")
self.assertIsNone(result)
def test_destroy_delegates_to_api(self) -> None:
self.runner.destroy("slug-7")
self._api.destroy.assert_called_once_with("slug-7")
def test_destroy_returns_none(self) -> None:
result = self.runner.destroy("slug-7")
self.assertIsNone(result)
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,75 @@
"""Unit: ScopedForge — read-anywhere / write-scoped access control."""
from __future__ import annotations
import unittest
from bot_bottle.contrib.forge.base import ScopedForge
from ._fakes import FakeForge
class ScopedForgeTest(unittest.TestCase):
def setUp(self):
self.inner = FakeForge()
self.scoped = ScopedForge(
self.inner, assigned_issue=10, assigned_prs=[20, 30]
)
# --- reads always pass through -----------------------------------------
def test_read_issue_allowed_anywhere(self):
for number in (10, 20, 99):
result = self.scoped.read_issue(number)
self.assertEqual(number, result["number"])
def test_read_pr_allowed_anywhere(self):
for number in (10, 20, 99):
result = self.scoped.read_pr(number)
self.assertEqual(number, result["number"])
def test_read_comments_allowed_anywhere(self):
comments = self.scoped.read_comments(99)
self.assertTrue(len(comments) > 0)
def test_is_org_member_passes_through(self):
inner = FakeForge(members=("alice",))
scoped = ScopedForge(inner, assigned_issue=1, assigned_prs=[])
self.assertTrue(scoped.is_org_member("org", "alice"))
self.assertFalse(scoped.is_org_member("org", "bob"))
# --- writes: assigned numbers allowed ----------------------------------
def test_post_comment_on_assigned_issue(self):
self.scoped.post_comment(10, "hi")
self.assertIn((10, "hi"), self.inner.comments)
def test_post_comment_on_assigned_pr(self):
self.scoped.post_comment(20, "lgtm")
self.assertIn((20, "lgtm"), self.inner.comments)
def test_update_description_on_assigned(self):
self.scoped.update_description(30, "updated")
self.assertIn((30, "updated"), self.inner.descriptions)
# --- writes: unassigned numbers denied ---------------------------------
def test_post_comment_denied_for_unassigned(self):
with self.assertRaises(PermissionError):
self.scoped.post_comment(99, "nope")
self.assertEqual([], self.inner.comments)
def test_update_description_denied_for_unassigned(self):
with self.assertRaises(PermissionError):
self.scoped.update_description(99, "nope")
self.assertEqual([], self.inner.descriptions)
def test_error_message_names_number(self):
try:
self.scoped.post_comment(99, "nope")
except PermissionError as exc:
self.assertIn("99", str(exc))
if __name__ == "__main__":
unittest.main()
+204
View File
@@ -0,0 +1,204 @@
"""Unit: forge sidecar dispatch, op log, queue relay, socket server."""
from __future__ import annotations
import dataclasses
import json
import socket
import tempfile
import threading
import unittest
from pathlib import Path
from bot_bottle.orchestrator.sidecar import (
ForgeSidecar,
OpLog,
_jsonable,
drain_done_events,
serve,
write_done_event,
)
from ._fakes import FakeForge
class SidecarDispatchTest(unittest.TestCase):
def setUp(self):
self.tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
self.forge = FakeForge()
self.log = OpLog(self.tmp / "ops.jsonl", now=lambda: "T")
self.queue = self.tmp / "queue"
self.sc = ForgeSidecar(
forge=self.forge, op_log=self.log, queue_dir=self.queue,
run_key=("o", "r", 17),
)
def test_read_pr_ok_and_logged(self):
resp = self.sc.dispatch("read_pr", {"number": 5})
self.assertTrue(resp["ok"])
self.assertEqual(5, resp["result"]["number"])
self.assertEqual([("read_pr", 5, "ok")],
[(o["op"], o["target"], o["detail"]) for o in self.log.read()])
def test_post_comment_writes_and_logs(self):
resp = self.sc.dispatch("post_comment", {"number": 17, "body": "done"})
self.assertTrue(resp["ok"])
self.assertEqual([(17, "done")], self.forge.comments)
def test_scope_denied_write_returns_error_and_audits_rejection(self):
self.forge.scope_denied.add(999)
resp = self.sc.dispatch("post_comment", {"number": 999, "body": "x"})
self.assertFalse(resp["ok"])
self.assertIn("denied", resp["error"])
# The rejection is recorded in the op log, not just the allows.
self.assertIn("error", self.log.read()[-1]["detail"])
self.assertEqual([], self.forge.comments)
def test_signal_done_queues_event(self):
resp = self.sc.dispatch("signal_done", {"status": "success", "summary": "ok"})
self.assertTrue(resp["ok"])
events = drain_done_events(self.queue)
self.assertEqual(1, len(events))
self.assertEqual(("o", "r", 17, "success"),
(events[0]["owner"], events[0]["repo"],
events[0]["issue_number"], events[0]["status"]))
def test_unknown_method(self):
resp = self.sc.dispatch("delete_repo", {})
self.assertFalse(resp["ok"])
class JsonableTest(unittest.TestCase):
def test_plain_value_passthrough(self):
self.assertEqual(42, _jsonable(42))
self.assertEqual("s", _jsonable("s"))
def test_dataclass_converted_to_dict(self):
@dataclasses.dataclass
class Thing:
x: int
y: str = "hi"
self.assertEqual({"x": 99, "y": "hi"}, _jsonable(Thing(x=99)))
def test_list_recursed(self):
self.assertEqual([1, 2, 3], _jsonable([1, 2, 3]))
def test_list_of_dataclasses(self):
@dataclasses.dataclass
class Item:
v: int
result = _jsonable([Item(v=1), Item(v=2)])
self.assertEqual([{"v": 1}, {"v": 2}], result)
class QueueTest(unittest.TestCase):
def test_drain_removes_events(self):
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
write_done_event(tmp, {"owner": "o", "repo": "r", "issue_number": 1})
self.assertEqual(1, len(drain_done_events(tmp)))
self.assertEqual([], drain_done_events(tmp)) # drained
def test_drain_missing_dir(self):
self.assertEqual([], drain_done_events(Path("/nonexistent/queue")))
def test_drain_skips_corrupted_file(self):
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
(tmp / "done-bad.json").write_text("not json", encoding="utf-8")
events = drain_done_events(tmp)
self.assertEqual([], events)
# The corrupted file is removed by the finally block.
self.assertFalse((tmp / "done-bad.json").exists())
class OpLogReadTest(unittest.TestCase):
def test_read_missing_file_returns_empty(self):
with tempfile.TemporaryDirectory() as tmp:
log = OpLog(Path(tmp) / "sub" / "ops.jsonl")
# File not written yet — read() should return [].
self.assertEqual([], log.read())
class SocketServerTest(unittest.TestCase):
def _make_server(self, tmp: Path):
sock = tmp / "s.sock"
if len(str(sock)) > 100:
self.skipTest("temp socket path too long for AF_UNIX")
sidecar = ForgeSidecar(
forge=FakeForge(), op_log=OpLog(tmp / "ops.jsonl"),
queue_dir=tmp / "q", run_key=("o", "r", 1),
)
return serve(sidecar, sock), sock
def test_round_trip_over_unix_socket(self):
tmp = tempfile.mkdtemp()
sock = Path(tmp) / "s.sock"
if len(str(sock)) > 100: # AF_UNIX path limit; skip on long tmp paths
self.skipTest("temp socket path too long for AF_UNIX")
sidecar = ForgeSidecar(
forge=FakeForge(), op_log=OpLog(Path(tmp) / "ops.jsonl"),
queue_dir=Path(tmp) / "q", run_key=("o", "r", 1),
)
srv = serve(sidecar, sock)
t = threading.Thread(target=srv.handle_request, daemon=True)
t.start()
try:
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(str(sock))
client.sendall(b'{"method": "read_issue", "params": {"number": 3}}\n')
line = client.makefile().readline()
client.close()
finally:
t.join(timeout=5)
srv.server_close()
resp = json.loads(line)
self.assertTrue(resp["ok"])
self.assertEqual(3, resp["result"]["number"])
def test_handler_invalid_json_returns_error(self):
tmp = Path(tempfile.mkdtemp())
srv, sock = self._make_server(tmp)
t = threading.Thread(target=srv.handle_request, daemon=True)
t.start()
try:
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(str(sock))
client.sendall(b"not valid json!\n")
line = client.makefile().readline()
client.close()
finally:
t.join(timeout=5)
srv.server_close()
resp = json.loads(line)
self.assertFalse(resp["ok"])
self.assertIn("invalid json", resp["error"])
def test_handler_empty_line_closes_silently(self):
tmp = Path(tempfile.mkdtemp())
srv, sock = self._make_server(tmp)
t = threading.Thread(target=srv.handle_request, daemon=True)
t.start()
try:
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(str(sock))
client.close() # immediate EOF -> readline() returns b""
finally:
t.join(timeout=5)
srv.server_close()
def test_serve_removes_existing_socket_path(self):
tmp = Path(tempfile.mkdtemp())
sock = tmp / "existing.sock"
if len(str(sock)) > 100:
self.skipTest("temp socket path too long for AF_UNIX")
sock.touch() # pre-existing file at socket path
sidecar = ForgeSidecar(
forge=FakeForge(), op_log=OpLog(tmp / "ops.jsonl"),
queue_dir=tmp / "q", run_key=("o", "r", 1),
)
srv = serve(sidecar, sock) # should unlink the pre-existing file
srv.server_close()
if __name__ == "__main__":
unittest.main()
+50
View File
@@ -0,0 +1,50 @@
"""Unit: InMemoryStateStore."""
from __future__ import annotations
import unittest
from bot_bottle.orchestrator.model import RunRecord
from bot_bottle.orchestrator.store import InMemoryStateStore
def _rec(issue: int, owner: str = "o") -> RunRecord:
return RunRecord(owner=owner, repo="r", issue_number=issue, slug=f"s{issue}",
agent_name="a")
class InMemoryStoreTest(unittest.TestCase):
def setUp(self):
self.store = InMemoryStateStore()
def test_upsert_get(self):
self.store.upsert(_rec(1))
got = self.store.get("o", "r", 1)
assert got is not None
self.assertEqual("s1", got.slug)
def test_get_missing(self):
self.assertIsNone(self.store.get("o", "r", 99))
def test_upsert_replaces(self):
self.store.upsert(_rec(1))
r = _rec(1)
r.slug = "changed"
self.store.upsert(r)
self.assertEqual("changed", self.store.get("o", "r", 1).slug) # type: ignore[union-attr]
self.assertEqual(1, len(self.store.all()))
def test_delete(self):
self.store.upsert(_rec(1))
self.store.delete("o", "r", 1)
self.assertIsNone(self.store.get("o", "r", 1))
def test_all_sorted(self):
self.store.upsert(_rec(2, owner="b"))
self.store.upsert(_rec(1, owner="a"))
self.assertEqual([("a", 1), ("b", 2)],
[(r.owner, r.issue_number) for r in self.store.all()])
if __name__ == "__main__":
unittest.main()
+60
View File
@@ -0,0 +1,60 @@
"""Unit: targeting (labels + org membership)."""
from __future__ import annotations
import unittest
from bot_bottle.orchestrator.model import IssueAssigned
from bot_bottle.orchestrator.targeting import parse_labels, resolve_target
from ._fakes import FakeForge
def _issue(
assignees: tuple[str, ...] = ("agent-bot",),
labels: tuple[str, ...] = ("bot-bottle:implementer",),
) -> IssueAssigned:
return IssueAssigned(
owner="didericis", repo="bot-bottle", issue_number=17,
title="t", body="b", assignees=tuple(assignees), labels=tuple(labels),
)
class ParseLabelsTest(unittest.TestCase):
def test_agent_label(self):
self.assertEqual(("implementer", None), parse_labels(("bot-bottle:implementer",)))
def test_bottle_override_not_confused_with_agent(self):
agent, bottle = parse_labels(("bot-bottle:impl", "bot-bottle-bottle:dev"))
self.assertEqual(("impl", "dev"), (agent, bottle))
def test_no_agent_label(self):
self.assertEqual((None, None), parse_labels(("bug", "p1")))
class ResolveTargetTest(unittest.TestCase):
def setUp(self):
self.forge = FakeForge(members=("agent-bot",))
def test_targeted(self):
target = resolve_target(_issue(), self.forge, "bot-bottle")
assert target is not None
self.assertEqual("implementer", target.agent_name)
self.assertIsNone(target.bottle_override)
def test_bottle_override(self):
ev = _issue(labels=("bot-bottle:impl", "bot-bottle-bottle:dev"))
target = resolve_target(ev, self.forge, "bot-bottle")
assert target is not None
self.assertEqual("dev", target.bottle_override)
def test_no_label_not_targeted(self):
self.assertIsNone(resolve_target(_issue(labels=("bug",)), self.forge, "bot-bottle"))
def test_non_member_assignee_not_targeted(self):
ev = _issue(assignees=("random-user",))
self.assertIsNone(resolve_target(ev, self.forge, "bot-bottle"))
if __name__ == "__main__":
unittest.main()
+80
View File
@@ -0,0 +1,80 @@
"""Unit: watchdog sweep."""
from __future__ import annotations
import time
import unittest
import unittest.mock
from datetime import datetime, timedelta
from bot_bottle.orchestrator.model import STATUS_FROZEN, STATUS_RUNNING, RunRecord
from bot_bottle.orchestrator.store import InMemoryStateStore
from bot_bottle.orchestrator.watchdog import Watchdog
from ._fakes import FakeRunner
_NOW = datetime(2026, 7, 1, 12, 0, 0).astimezone()
def _record(issue: int, status: str, checkin: str) -> RunRecord:
return RunRecord(
owner="o", repo="r", issue_number=issue, slug=f"s{issue}",
agent_name="a", status=status, last_checkin_at=checkin,
)
class WatchdogSweepTest(unittest.TestCase):
def setUp(self):
self.store = InMemoryStateStore()
self.runner = FakeRunner()
self.wd = Watchdog(store=self.store, runner=self.runner, timeout_secs=1800)
def _status(self, issue: int) -> str:
rec = self.store.get("o", "r", issue)
assert rec is not None
return rec.status
def test_stale_running_is_frozen(self):
stale = (_NOW - timedelta(minutes=31)).isoformat()
self.store.upsert(_record(1, STATUS_RUNNING, stale))
fired = self.wd.sweep(_NOW)
self.assertEqual([1], [r.issue_number for r in fired])
self.assertEqual(STATUS_FROZEN, self._status(1))
self.assertIn(("freeze", "s1"), self.runner.calls)
def test_fresh_running_untouched(self):
fresh = (_NOW - timedelta(minutes=5)).isoformat()
self.store.upsert(_record(2, STATUS_RUNNING, fresh))
self.assertEqual([], self.wd.sweep(_NOW))
self.assertEqual(STATUS_RUNNING, self._status(2))
def test_non_running_ignored(self):
stale = (_NOW - timedelta(hours=2)).isoformat()
self.store.upsert(_record(3, STATUS_FROZEN, stale))
self.assertEqual([], self.wd.sweep(_NOW))
def test_unparseable_checkin_skipped(self):
self.store.upsert(_record(4, STATUS_RUNNING, "not-a-time"))
self.assertEqual([], self.wd.sweep(_NOW))
def test_start_and_stop(self):
# Exercises the daemon-thread start/stop path; stop sets the event
# so the loop's wait returns immediately.
self.wd.start()
self.wd.stop()
def test_loop_sweeps_stale_record(self):
# Patch tick to near-zero so the loop iterates quickly.
stale = (_NOW - timedelta(hours=1)).isoformat()
self.store.upsert(_record(5, STATUS_RUNNING, stale))
with unittest.mock.patch("bot_bottle.orchestrator.watchdog._TICK_SECS", 0.01):
self.wd.start()
time.sleep(0.05) # enough for several iterations at 0.01s tick
self.wd.stop()
rec = self.store.get("o", "r", 5)
assert rec is not None
self.assertEqual(STATUS_FROZEN, rec.status)
if __name__ == "__main__":
unittest.main()
+161
View File
@@ -0,0 +1,161 @@
"""Unit: webhook HTTP surface (signature + routing over a real server)."""
from __future__ import annotations
import hashlib
import hmac
import json
import threading
import unittest
import urllib.request
from urllib.error import HTTPError
from bot_bottle.orchestrator.model import RunRecord
from bot_bottle.orchestrator.store import InMemoryStateStore
from bot_bottle.orchestrator.webhook import WebhookServer, verify_signature
_ISSUE_ASSIGNED = {
"action": "assigned",
"repository": {"name": "bot-bottle", "owner": {"login": "didericis"}},
"issue": {
"number": 17, "title": "t", "body": "b",
"assignees": [{"login": "agent-bot"}],
"labels": [{"name": "bot-bottle:impl"}],
},
}
class _RecordingOrch:
def __init__(self) -> None:
self.events: list[object] = []
def handle(self, event: object) -> None:
self.events.append(event)
class SignatureTest(unittest.TestCase):
def test_verify(self):
secret = b"s3cret"
body = b'{"x":1}'
sig = hmac.new(secret, body, hashlib.sha256).hexdigest()
self.assertTrue(verify_signature(secret, body, sig))
self.assertFalse(verify_signature(secret, body, "deadbeef"))
class WebhookServerTest(unittest.TestCase):
# _serve is the per-test setup; attributes are assigned there.
# pylint: disable=attribute-defined-outside-init
def _serve(self, **kwargs: object) -> None:
self.orch = _RecordingOrch()
kwargs.setdefault("store", InMemoryStateStore())
self.server = WebhookServer(
("127.0.0.1", 0), orchestrator=self.orch, **kwargs, # type: ignore[arg-type]
)
self.port = self.server.server_address[1]
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
self.addCleanup(self._shutdown)
def _shutdown(self) -> None:
self.server.shutdown()
self.server.server_close()
self.thread.join(timeout=5)
def _post(
self, path: str, body: bytes, headers: dict[str, str] | None = None
) -> tuple[int, dict[str, object]]:
req = urllib.request.Request(
f"http://127.0.0.1:{self.port}{path}", data=body, method="POST",
headers=headers or {},
)
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status, json.loads(resp.read())
def _get(self, path: str) -> tuple[int, dict[str, object]]:
with urllib.request.urlopen(f"http://127.0.0.1:{self.port}{path}", timeout=5) as r:
return r.status, json.loads(r.read())
def test_webhook_dispatches(self):
self._serve()
body = json.dumps(_ISSUE_ASSIGNED).encode()
status, payload = self._post("/webhook", body, {"X-Gitea-Event": "issues"})
self.assertEqual(200, status)
self.assertTrue(payload["handled"])
self.assertEqual(1, len(self.orch.events))
def test_unhandled_event_ok_but_not_handled(self):
self._serve()
body = json.dumps({"action": "push"}).encode()
_status, payload = self._post("/webhook", body, {"X-Gitea-Event": "push"})
self.assertFalse(payload["handled"])
self.assertEqual([], self.orch.events)
def test_invalid_json_400(self):
self._serve()
with self.assertRaises(HTTPError) as ctx:
self._post("/webhook", b"{not json", {"X-Gitea-Event": "issues"})
self.assertEqual(400, ctx.exception.code)
def test_bad_signature_rejected(self):
self._serve(secret=b"sekret")
body = json.dumps(_ISSUE_ASSIGNED).encode()
with self.assertRaises(HTTPError) as ctx:
self._post("/webhook", body,
{"X-Gitea-Event": "issues", "X-Gitea-Signature": "deadbeef"})
self.assertEqual(401, ctx.exception.code)
self.assertEqual([], self.orch.events)
def test_good_signature_accepted(self):
self._serve(secret=b"sekret")
body = json.dumps(_ISSUE_ASSIGNED).encode()
sig = hmac.new(b"sekret", body, hashlib.sha256).hexdigest()
status, _payload = self._post(
"/webhook", body, {"X-Gitea-Event": "issues", "X-Gitea-Signature": sig})
self.assertEqual(200, status)
self.assertEqual(1, len(self.orch.events))
def test_healthz(self):
self._serve()
self.assertEqual(200, self._get("/healthz")[0])
def test_unknown_path_404(self):
self._serve()
with self.assertRaises(HTTPError) as ctx:
self._post("/nope", b"{}", {"X-Gitea-Event": "issues"})
self.assertEqual(404, ctx.exception.code)
def test_provenance_returns_record_and_ops(self):
store = InMemoryStateStore()
store.upsert(RunRecord(owner="didericis", repo="bot-bottle", issue_number=17,
slug="impl-17", agent_name="impl", bottle_names=["claude"]))
def reader(rec: object) -> list[dict[str, object]]: # pylint: disable=unused-argument
return [{"at": "T", "op": "post_comment", "target": 17, "detail": "ok"}]
self._serve(store=store, op_log_reader=reader)
status, payload = self._get("/provenance?owner=didericis&repo=bot-bottle&issue=17")
self.assertEqual(200, status)
self.assertEqual("impl-17", payload["slug"])
self.assertEqual(1, len(payload["ops"])) # type: ignore[arg-type]
def test_provenance_missing_params_400(self):
self._serve()
with self.assertRaises(HTTPError) as ctx:
self._get("/provenance?owner=didericis")
self.assertEqual(400, ctx.exception.code)
def test_provenance_unknown_run_404(self):
self._serve()
with self.assertRaises(HTTPError) as ctx:
self._get("/provenance?owner=x&repo=y&issue=1")
self.assertEqual(404, ctx.exception.code)
def test_unknown_get_path_404(self):
self._serve()
with self.assertRaises(HTTPError) as ctx:
self._get("/nope")
self.assertEqual(404, ctx.exception.code)
if __name__ == "__main__":
unittest.main()
+188
View File
@@ -0,0 +1,188 @@
"""Unit: `cli.py start --headless` non-interactive launch path.
Headless is the keystone for orchestrators, CI, and webhook
dispatch: agent/bottles/label come from flags + manifest defaults, no
TUI selectors fire, and the preflight y/N is auto-confirmed
(`assume_yes=True`). All actual launch work is stubbed so no container
is created.
"""
from __future__ import annotations
import os
import unittest
from unittest.mock import MagicMock, patch
import bot_bottle.cli.start as start_mod
import bot_bottle.cli.tui as tui_mod
from bot_bottle.backend import ActiveAgent
from bot_bottle.log import Die
from bot_bottle.manifest import ManifestError
def _make_manifest(
agent_names: list[str],
bottle_names: list[str] | None = None,
agent_bottle: str = "",
):
manifest = MagicMock()
manifest.agents = {name: MagicMock(bottle=agent_bottle) for name in agent_names}
manifest.all_agent_names = sorted(agent_names)
manifest.all_bottle_names = sorted(bottle_names or [])
manifest.home_md = None # eager mode so _peek_agent_bottle uses agents dict
manifest.require_agent = MagicMock(return_value=None)
return manifest
def _active_agent(slug: str) -> ActiveAgent:
return ActiveAgent(
backend_name="docker",
slug=slug,
agent_name="demo",
started_at="2026-01-01T00:00:00+00:00",
services=(),
)
class TestCmdStartHeadless(unittest.TestCase):
"""Drive `cmd_start --headless` with launch + TUI stubbed out."""
def setUp(self):
self._manifest = _make_manifest(
["researcher", "implementer"], ["claude", "dev"], agent_bottle="claude"
)
patch(
"bot_bottle.cli.start.ManifestIndex.resolve",
return_value=self._manifest,
).start()
self._launch_mock = patch(
"bot_bottle.cli.start._launch_bottle", return_value=0
).start()
# No bottles running by default → no label collision.
patch(
"bot_bottle.cli.start.enumerate_active_agents", return_value=[]
).start()
# If any TUI picker fires in headless mode, that's a bug.
self._agent_picker = patch.object(tui_mod, "filter_select").start()
self._bottle_picker = patch.object(tui_mod, "filter_multiselect").start()
self._modal = patch.object(tui_mod, "name_color_modal").start()
patch.dict(os.environ, {}, clear=False).start()
os.environ.pop("BOT_BOTTLE_BACKEND", None)
self.addCleanup(patch.stopall)
def _spec(self):
self._launch_mock.assert_called_once()
return self._launch_mock.call_args[0][0]
# -- no TUI in headless --------------------------------------------
def test_headless_fires_no_pickers(self):
rc = start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude", "--prompt", "Do it"]
)
self.assertEqual(0, rc)
self._agent_picker.assert_not_called()
self._bottle_picker.assert_not_called()
self._modal.assert_not_called()
def test_headless_assume_yes_forwarded(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude", "--prompt", "Do it"]
)
self.assertTrue(self._launch_mock.call_args[1]["assume_yes"])
# -- prompt --------------------------------------------------------
def test_headless_without_prompt_dies(self):
with self.assertRaises(Die):
start_mod.cmd_start(["--headless", "researcher", "--bottle", "claude"])
self._launch_mock.assert_not_called()
def test_headless_prompt_forwarded_to_launch(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude",
"--prompt", "Implement issue #42"]
)
self.assertEqual(
"Implement issue #42",
self._launch_mock.call_args[1]["headless_prompt_text"],
)
# -- bottle resolution ---------------------------------------------
def test_explicit_bottles_forwarded_in_order(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "dev", "--bottle", "claude",
"--prompt", "Do it"]
)
self.assertEqual(("dev", "claude"), self._spec().bottle_names)
def test_omitted_bottle_falls_back_to_agent_default(self):
start_mod.cmd_start(["--headless", "implementer", "--prompt", "Do it"])
self.assertEqual(("claude",), self._spec().bottle_names)
def test_no_bottle_and_no_default_dies(self):
manifest = _make_manifest(["researcher"], ["claude"], agent_bottle="")
with patch(
"bot_bottle.cli.start.ManifestIndex.resolve", return_value=manifest
):
with self.assertRaises(Die):
start_mod.cmd_start(
["--headless", "researcher", "--prompt", "Do it"]
)
self._launch_mock.assert_not_called()
# -- agent resolution ----------------------------------------------
def test_missing_agent_name_dies(self):
with self.assertRaises(Die):
start_mod.cmd_start(["--headless"])
self._launch_mock.assert_not_called()
def test_unknown_agent_raises_manifest_error(self):
self._manifest.require_agent.side_effect = ManifestError("agent 'x' not defined")
with self.assertRaises(ManifestError):
start_mod.cmd_start(
["--headless", "x", "--bottle", "claude", "--prompt", "Do it"]
)
self._launch_mock.assert_not_called()
# -- label / color -------------------------------------------------
def test_label_defaults_to_agent_name(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude", "--prompt", "Do it"]
)
self.assertEqual("researcher", self._spec().label)
def test_explicit_label_and_color_forwarded(self):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude",
"--label", "nightly", "--color", "green", "--prompt", "Do it"]
)
spec = self._spec()
self.assertEqual("nightly", spec.label)
self.assertEqual("green", spec.color)
def test_label_collision_uniquifies(self):
with patch(
"bot_bottle.cli.start.enumerate_active_agents",
return_value=[_active_agent("researcher")],
):
start_mod.cmd_start(
["--headless", "researcher", "--bottle", "claude", "--prompt", "Do it"]
)
self.assertEqual("researcher-2", self._spec().label)
# -- backend wiring ------------------------------------------------
def test_backend_flag_forwarded(self):
start_mod.cmd_start(
["--headless", "--backend=docker", "researcher", "--bottle", "claude",
"--prompt", "Do it"]
)
self.assertEqual("docker", self._launch_mock.call_args[1]["backend_name"])
if __name__ == "__main__":
unittest.main()
@@ -343,5 +343,14 @@ class TestClaudeSuperviseMcp(unittest.TestCase):
) )
class TestClaudeHeadlessPrompt(unittest.TestCase):
def test_returns_p_flag_and_prompt(self):
self.assertEqual(["-p", "Do the task"], ClaudeAgentProvider().headless_prompt("Do the task"))
def test_preserves_prompt_text_verbatim(self):
text = "Fix issue #42: the widget breaks on empty input"
self.assertEqual(["-p", text], ClaudeAgentProvider().headless_prompt(text))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
@@ -314,5 +314,14 @@ class TestCodexSuperviseMcp(unittest.TestCase):
) )
class TestCodexHeadlessPrompt(unittest.TestCase):
def test_returns_prompt_as_positional_arg(self):
self.assertEqual(["Do the task"], CodexAgentProvider().headless_prompt("Do the task"))
def test_preserves_prompt_text_verbatim(self):
text = "Fix issue #42: the widget breaks on empty input"
self.assertEqual([text], CodexAgentProvider().headless_prompt(text))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+153
View File
@@ -0,0 +1,153 @@
"""Unit: GiteaClient and GiteaForge (urllib mocked — no network)."""
from __future__ import annotations
import json
import unittest
import urllib.error
from unittest.mock import MagicMock, patch
from bot_bottle.contrib.gitea.client import GiteaClient, GiteaForge
def _client() -> GiteaClient:
return GiteaClient(api_url="http://g/api/v1", owner="o", repo="r", token="tok")
def _mock_response(body: bytes) -> MagicMock:
resp = MagicMock()
resp.read.return_value = body
resp.__enter__.return_value = resp
resp.__exit__.return_value = False
return resp
class GiteaClientTest(unittest.TestCase):
# pylint: disable=protected-access
def setUp(self):
self.client = _client()
def test_request_returns_parsed_json(self):
payload = {"number": 42}
resp = _mock_response(json.dumps(payload).encode())
with patch("urllib.request.urlopen", return_value=resp):
result = self.client._request("GET", "/repos/o/r/issues/42")
self.assertEqual(payload, result)
def test_request_empty_body_returns_none(self):
resp = _mock_response(b"")
with patch("urllib.request.urlopen", return_value=resp):
result = self.client._request("POST", "/some/path", {"x": 1})
self.assertIsNone(result)
def test_is_org_member_true_on_200(self):
mock_resp = MagicMock()
mock_resp.close = MagicMock()
with patch("urllib.request.urlopen", return_value=mock_resp):
self.assertTrue(self.client.is_org_member("myorg", "alice"))
def test_is_org_member_false_on_http_error(self):
err = urllib.error.HTTPError("url", 404, "Not Found", None, None) # type: ignore[arg-type]
with patch("urllib.request.urlopen", side_effect=err):
self.assertFalse(self.client.is_org_member("myorg", "nobody"))
def test_get_issue(self):
resp = _mock_response(json.dumps({"number": 1}).encode())
with patch("urllib.request.urlopen", return_value=resp):
result = self.client.get_issue(1)
self.assertEqual(1, result["number"])
def test_get_pull(self):
resp = _mock_response(json.dumps({"number": 7, "merged": False}).encode())
with patch("urllib.request.urlopen", return_value=resp):
result = self.client.get_pull(7)
self.assertEqual(7, result["number"])
def test_list_comments(self):
resp = _mock_response(json.dumps([{"id": 1, "body": "hi"}]).encode())
with patch("urllib.request.urlopen", return_value=resp):
result = self.client.list_comments(1)
self.assertEqual(1, len(result))
self.assertEqual(1, result[0]["id"])
def test_create_comment(self):
resp = _mock_response(b"")
with patch("urllib.request.urlopen", return_value=resp) as mock_open:
self.client.create_comment(1, "hello")
mock_open.assert_called_once()
def test_update_issue(self):
resp = _mock_response(b"")
with patch("urllib.request.urlopen", return_value=resp) as mock_open:
self.client.update_issue(1, "new body")
mock_open.assert_called_once()
def test_request_builds_correct_url(self):
import urllib.request as ureq
captured: list[ureq.Request] = []
def fake_urlopen(req: ureq.Request, timeout: float) -> MagicMock: # pylint: disable=unused-argument
captured.append(req)
return _mock_response(b"{}")
with patch("urllib.request.urlopen", side_effect=fake_urlopen):
self.client.get_issue(5)
self.assertIn("/issues/5", captured[0].full_url)
def test_request_sends_auth_header(self):
import urllib.request as ureq
captured: list[ureq.Request] = []
def fake_urlopen(req: ureq.Request, timeout: float) -> MagicMock: # pylint: disable=unused-argument
captured.append(req)
return _mock_response(b"{}")
with patch("urllib.request.urlopen", side_effect=fake_urlopen):
self.client.get_issue(1)
self.assertEqual("token tok", captured[0].get_header("Authorization"))
class GiteaForgeTest(unittest.TestCase):
def setUp(self):
self.client = MagicMock(spec=GiteaClient)
self.forge = GiteaForge(self.client)
def test_is_org_member_delegates(self):
self.client.is_org_member.return_value = True
self.assertTrue(self.forge.is_org_member("org", "alice"))
self.client.is_org_member.assert_called_once_with("org", "alice")
def test_is_org_member_false(self):
self.client.is_org_member.return_value = False
self.assertFalse(self.forge.is_org_member("org", "outsider"))
def test_read_issue_delegates(self):
self.client.get_issue.return_value = {"number": 3}
self.assertEqual({"number": 3}, self.forge.read_issue(3))
self.client.get_issue.assert_called_once_with(3)
def test_read_pr_delegates(self):
self.client.get_pull.return_value = {"number": 5, "merged": False}
result = self.forge.read_pr(5)
self.assertEqual(5, result["number"])
self.client.get_pull.assert_called_once_with(5)
def test_read_comments_delegates(self):
self.client.list_comments.return_value = [{"id": 1}]
comments = self.forge.read_comments(1)
self.assertEqual([{"id": 1}], comments)
self.client.list_comments.assert_called_once_with(1)
def test_post_comment_delegates(self):
self.forge.post_comment(1, "looks good")
self.client.create_comment.assert_called_once_with(1, "looks good")
def test_update_description_delegates(self):
self.forge.update_description(1, "updated body")
self.client.update_issue.assert_called_once_with(1, "updated body")
if __name__ == "__main__":
unittest.main()
+9
View File
@@ -223,5 +223,14 @@ class TestPiDockerfile(unittest.TestCase):
self.assertIn("chmod 1777 /tmp /var/tmp", dockerfile) self.assertIn("chmod 1777 /tmp /var/tmp", dockerfile)
class TestPiHeadlessPrompt(unittest.TestCase):
def test_returns_p_flag_and_prompt(self):
self.assertEqual(["-p", "Do the task"], PiAgentProvider().headless_prompt("Do the task"))
def test_preserves_prompt_text_verbatim(self):
text = "Fix issue #42: the widget breaks on empty input"
self.assertEqual(["-p", text], PiAgentProvider().headless_prompt(text))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
@@ -38,6 +38,7 @@ class _Provider(AgentProvider):
def provision_prompt(self, plan, bottle): ... # type: ignore[override] def provision_prompt(self, plan, bottle): ... # type: ignore[override]
def provision(self, plan, bottle): ... # type: ignore[override] def provision(self, plan, bottle): ... # type: ignore[override]
def provision_supervise_mcp(self, plan, bottle, supervise_url): ... # type: ignore[override] def provision_supervise_mcp(self, plan, bottle, supervise_url): ... # type: ignore[override]
def headless_prompt(self, prompt): return [] # type: ignore[override]
_PROVIDER = _Provider() _PROVIDER = _Provider()
+22
View File
@@ -14,6 +14,7 @@ from bot_bottle.git_gate import (
git_gate_render_access_hook, git_gate_render_access_hook,
git_gate_render_entrypoint, git_gate_render_entrypoint,
git_gate_render_hook, git_gate_render_hook,
provision_git_gate_dynamic_keys,
revoke_git_gate_provisioned_keys, revoke_git_gate_provisioned_keys,
_resolve_identity_file, _resolve_identity_file,
git_gate_upstreams_for_bottle, git_gate_upstreams_for_bottle,
@@ -371,6 +372,27 @@ class TestDynamicKeyProvisioning(unittest.TestCase):
self.assertEqual("/tmp/provisioned-key", _resolve_identity_file(entry, "demo", self.stage)) self.assertEqual("/tmp/provisioned-key", _resolve_identity_file(entry, "demo", self.stage))
mock_provision.assert_called_once() mock_provision.assert_called_once()
def test_prepare_defers_gitea_key_provisioning(self):
bottle = self._gitea_manifest().bottles["dev"]
with patch("bot_bottle.git_gate_provision._provision_dynamic_key") as mock_provision:
plan = _StubGate().prepare(bottle, "demo", self.stage)
mock_provision.assert_not_called()
self.assertEqual("", plan.upstreams[0].identity_file)
def test_launch_time_helper_provisions_gitea_keys(self):
bottle = self._gitea_manifest().bottles["dev"]
plan = _StubGate().prepare(bottle, "demo", self.stage)
with patch(
"bot_bottle.git_gate_provision._provision_dynamic_key",
return_value="/tmp/provisioned-key",
) as mock_provision:
updated = provision_git_gate_dynamic_keys(bottle, plan, self.stage)
mock_provision.assert_called_once_with(bottle.git[0], "demo", self.stage)
self.assertEqual("/tmp/provisioned-key", updated.upstreams[0].identity_file)
def test_revoke_skips_non_gitea_and_missing_id_file(self): def test_revoke_skips_non_gitea_and_missing_id_file(self):
revoke_git_gate_provisioned_keys(fixture_with_git().bottles["dev"], self.stage) revoke_git_gate_provisioned_keys(fixture_with_git().bottles["dev"], self.stage)
+16
View File
@@ -165,6 +165,22 @@ class TestAgentValidation(unittest.TestCase):
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
ManifestAgent.from_dict("a", {"skills": [5]}, set()) ManifestAgent.from_dict("a", {"skills": [5]}, set())
def test_skill_name_rejects_shell_metacharacters(self) -> None:
# Skill names become host/guest path segments interpolated into
# provisioning shell commands; anything outside kebab-case is
# rejected at load so it can never reach a `bottle.exec` string.
for bad in ("foo; rm -rf /", "../escape", "foo bar", "Foo", "-leading"):
with self.assertRaises(ManifestError):
ManifestAgent.from_dict("a", {"skills": [bad]}, set())
def test_skill_name_accepts_kebab_case(self) -> None:
agent = ManifestAgent.from_dict(
"a", {"skills": ["init-entry", "quality-eval", "skill0"]}, set()
)
self.assertEqual(
agent.skills, ("init-entry", "quality-eval", "skill0")
)
def test_prompt_not_string(self) -> None: def test_prompt_not_string(self) -> None:
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
ManifestAgent.from_dict("a", {"prompt": 5}, set()) ManifestAgent.from_dict("a", {"prompt": 5}, set())
@@ -49,6 +49,7 @@ class _Provider(AgentProvider):
def provision_prompt(self, plan, bottle): ... # type: ignore[override] def provision_prompt(self, plan, bottle): ... # type: ignore[override]
def provision(self, plan, bottle): ... # type: ignore[override] def provision(self, plan, bottle): ... # type: ignore[override]
def provision_supervise_mcp(self, plan, bottle, supervise_url): ... # type: ignore[override] def provision_supervise_mcp(self, plan, bottle, supervise_url): ... # type: ignore[override]
def headless_prompt(self, prompt): return [] # type: ignore[override]
_PROVIDER = _Provider() _PROVIDER = _Provider()