Compare commits
15 Commits
a455ed6e00
...
dfc035959e
| Author | SHA1 | Date | |
|---|---|---|---|
| dfc035959e | |||
| 91f67c79ba | |||
| bb21c294b4 | |||
| 6283542a7b | |||
| 5824aa6b8d | |||
| 720d69d6ea | |||
| bcbdf7fdec | |||
| 36c5b7025b | |||
| 515a95a79d | |||
| 0bace7615a | |||
| c0d3f16519 | |||
| 508c537deb | |||
| d99dba037c | |||
| 9a878bd885 | |||
| 0f72843150 |
@@ -72,6 +72,9 @@ class BottleSpec:
|
||||
identity: str = ""
|
||||
label: str = ""
|
||||
color: str = ""
|
||||
# Ordered bottle names selected at launch (issue #269). When non-empty
|
||||
# they are merged in order and replace the agent's `bottle:` field.
|
||||
bottle_names: tuple[str, ...] = ()
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@@ -129,7 +132,11 @@ class BottlePlan(ABC):
|
||||
info(f"provider : {self.agent_provision.template}")
|
||||
print_multi("env ", env_names)
|
||||
print_multi("skills ", list(agent.skills))
|
||||
info(f"bottle : {agent.bottle}")
|
||||
effective_bottles = (
|
||||
list(spec.bottle_names) if spec.bottle_names
|
||||
else ([agent.bottle] if agent.bottle else [])
|
||||
)
|
||||
print_multi("bottle ", effective_bottles)
|
||||
|
||||
identity = manifest.git_identity_summary()
|
||||
if identity:
|
||||
@@ -363,7 +370,7 @@ class BottleBackend(ABC, Generic[PlanT, CleanupT]):
|
||||
Returns the loaded Manifest for the selected agent. Subclasses with
|
||||
additional preconditions should override and call
|
||||
`super()._validate(spec)` first."""
|
||||
manifest = spec.manifest.load_for_agent(spec.agent_name)
|
||||
manifest = spec.manifest.load_for_agent(spec.agent_name, spec.bottle_names)
|
||||
self._validate_skills(manifest.agent.skills)
|
||||
self._validate_agent_provider_dockerfile(spec, manifest)
|
||||
return manifest
|
||||
@@ -389,9 +396,12 @@ class BottleBackend(ABC, Generic[PlanT, CleanupT]):
|
||||
if not path.is_absolute():
|
||||
path = Path(spec.user_cwd) / path
|
||||
if not path.is_file():
|
||||
effective = (
|
||||
", ".join(spec.bottle_names) if spec.bottle_names else manifest.agent.bottle
|
||||
)
|
||||
die(
|
||||
f"agent_provider.dockerfile for bottle "
|
||||
f"'{manifest.agent.bottle}' not found: {path}"
|
||||
f"'{effective}' not found: {path}"
|
||||
)
|
||||
|
||||
@abstractmethod
|
||||
|
||||
@@ -68,6 +68,11 @@ def build_image(ref: str, context: str, *, dockerfile: str = "") -> None:
|
||||
_ensure_builder_dns()
|
||||
args = [_CONTAINER, "build", "-t", ref, "--dns", dns_server()]
|
||||
if dockerfile:
|
||||
# `container build` resolves -f relative to the current working
|
||||
# directory, not the build context. Anchor a relative Dockerfile to
|
||||
# the context so builds work from any cwd.
|
||||
if not os.path.isabs(dockerfile):
|
||||
dockerfile = os.path.join(context, dockerfile)
|
||||
args.extend(["-f", dockerfile])
|
||||
args.append(context)
|
||||
subprocess.run(args, check=True)
|
||||
|
||||
@@ -63,6 +63,7 @@ def write_launch_metadata(
|
||||
backend=backend,
|
||||
label=spec.label,
|
||||
color=spec.color,
|
||||
bottle_names=spec.bottle_names,
|
||||
))
|
||||
|
||||
|
||||
|
||||
@@ -112,6 +112,10 @@ class BottleMetadata:
|
||||
backend: str = ""
|
||||
label: str = ""
|
||||
color: str = ""
|
||||
# Ordered bottle names selected at launch (issue #269). Empty tuple
|
||||
# for state dirs written before this change; resume falls back to
|
||||
# the agent's `bottle:` field in that case.
|
||||
bottle_names: tuple[str, ...] = ()
|
||||
|
||||
|
||||
def metadata_path(identity: str) -> Path:
|
||||
@@ -139,6 +143,10 @@ def read_metadata(identity: str) -> BottleMetadata | None:
|
||||
if not isinstance(raw, dict):
|
||||
return None
|
||||
raw_typed = cast(dict[str, object], raw)
|
||||
raw_bottle_names = raw_typed.get("bottle_names", [])
|
||||
bottle_names: tuple[str, ...] = ()
|
||||
if isinstance(raw_bottle_names, list):
|
||||
bottle_names = tuple(str(n) for n in raw_bottle_names if isinstance(n, str))
|
||||
return BottleMetadata(
|
||||
identity=str(raw_typed.get("identity", identity)),
|
||||
agent_name=str(raw_typed.get("agent_name", "")),
|
||||
@@ -149,6 +157,7 @@ def read_metadata(identity: str) -> BottleMetadata | None:
|
||||
backend=str(raw_typed.get("backend", "")),
|
||||
label=str(raw_typed.get("label", "")),
|
||||
color=str(raw_typed.get("color", "")),
|
||||
bottle_names=bottle_names,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -50,6 +50,7 @@ def cmd_resume(argv: list[str]) -> int:
|
||||
copy_cwd=metadata.copy_cwd,
|
||||
user_cwd=metadata.cwd or USER_CWD,
|
||||
identity=metadata.identity,
|
||||
bottle_names=tuple(metadata.bottle_names),
|
||||
)
|
||||
backend_name = metadata.backend or None
|
||||
return _launch_bottle(
|
||||
|
||||
+154
-2
@@ -33,7 +33,7 @@ from ..bottle_state import (
|
||||
)
|
||||
# from ..backend.docker.capability_apply import snapshot_transcript
|
||||
from ..log import info
|
||||
from ..manifest import ManifestIndex
|
||||
from ..manifest import Manifest, ManifestIndex
|
||||
from ._common import PROG, USER_CWD, read_tty_line
|
||||
from . import tui
|
||||
|
||||
@@ -74,6 +74,23 @@ def cmd_start(argv: list[str]) -> int:
|
||||
|
||||
backend_name: str | None = args.backend
|
||||
|
||||
# Bottle multiselect: always show after agent selection so operators
|
||||
# can compose bottles at launch time without editing agent manifests.
|
||||
available_bottles = manifest.all_bottle_names
|
||||
lineage_map = _bottle_lineage(manifest)
|
||||
display_labels = [lineage_map.get(n, n) for n in available_bottles]
|
||||
label_to_name = {lineage_map.get(n, n): n for n in available_bottles}
|
||||
initial_bottle = _peek_agent_bottle(manifest, agent_name)
|
||||
initial_labels = [lineage_map.get(initial_bottle, initial_bottle)] if initial_bottle else []
|
||||
selected_labels = tui.filter_multiselect(
|
||||
display_labels,
|
||||
title="Select bottles",
|
||||
initial=initial_labels,
|
||||
)
|
||||
if selected_labels is None:
|
||||
return 0
|
||||
bottle_names = tuple(label_to_name.get(lbl, lbl) for lbl in selected_labels)
|
||||
|
||||
label, color = tui.name_color_modal(default_label=agent_name)
|
||||
label, color = _resolve_unique_label(label, color)
|
||||
|
||||
@@ -84,6 +101,7 @@ def cmd_start(argv: list[str]) -> int:
|
||||
user_cwd=USER_CWD,
|
||||
label=label,
|
||||
color=color,
|
||||
bottle_names=bottle_names,
|
||||
)
|
||||
return _launch_bottle(
|
||||
spec,
|
||||
@@ -190,6 +208,38 @@ def _identity_from_plan(plan: object) -> str:
|
||||
return getattr(plan, "slug", "")
|
||||
|
||||
|
||||
def _peek_agent_bottle(manifest: ManifestIndex, agent_name: str) -> str:
|
||||
"""Return the `bottle:` value from the named agent's frontmatter without
|
||||
fully parsing the agent file, or "" when absent or unreadable.
|
||||
|
||||
Used to pre-populate the bottle multiselect with the agent's default
|
||||
bottle so operators who haven't removed `bottle:` from their manifests
|
||||
don't need to re-select it every time."""
|
||||
if manifest.home_md is None:
|
||||
# Eager mode (from_json_obj): agent is pre-parsed.
|
||||
if agent_name in manifest.agents:
|
||||
return manifest.agents[agent_name].bottle
|
||||
return ""
|
||||
|
||||
from ..manifest_loader import scan_agent_names
|
||||
from ..yaml_subset import YamlSubsetError, parse_frontmatter
|
||||
|
||||
home_agents = scan_agent_names(manifest.home_md / "agents")
|
||||
cwd_agents: dict[str, Path] = {}
|
||||
if manifest.cwd_md is not None:
|
||||
cwd_agents = scan_agent_names(manifest.cwd_md / "agents")
|
||||
merged = {**home_agents, **cwd_agents}
|
||||
path = merged.get(agent_name)
|
||||
if path is None:
|
||||
return ""
|
||||
try:
|
||||
fm, _ = parse_frontmatter(path.read_text())
|
||||
bottle = fm.get("bottle", "")
|
||||
return str(bottle) if isinstance(bottle, str) else ""
|
||||
except (OSError, YamlSubsetError):
|
||||
return ""
|
||||
|
||||
|
||||
def _resolve_unique_label(label: str, color: str) -> tuple[str, str]:
|
||||
"""Re-prompt with a disclaimer until the label's slug is not already
|
||||
in use among running bottles. Passes through unchanged when no
|
||||
@@ -216,10 +266,112 @@ def _text_prompt_yes() -> bool:
|
||||
|
||||
def _text_render_preflight():
|
||||
def _render(plan: DockerBottlePlan) -> None:
|
||||
plan.print()
|
||||
print(file=sys.stderr)
|
||||
print(_manifest_to_yaml(plan.manifest), file=sys.stderr)
|
||||
return _render
|
||||
|
||||
|
||||
def _bottle_lineage(manifest: ManifestIndex) -> dict[str, str]:
|
||||
"""Return {bottle_name: lineage_label} for bottles that have an extends chain.
|
||||
|
||||
Bottles without a parent are omitted (the caller falls back to the bare name).
|
||||
Labels show the chain root-first: e.g. 'claude-dev <- bot-bottle-dev <- dev'."""
|
||||
if manifest.home_md is None:
|
||||
return {}
|
||||
bottles_dir = manifest.home_md / "bottles"
|
||||
if not bottles_dir.is_dir():
|
||||
return {}
|
||||
|
||||
from ..yaml_subset import YamlSubsetError, parse_frontmatter
|
||||
|
||||
extends_of: dict[str, str] = {}
|
||||
for path in bottles_dir.glob("*.md"):
|
||||
try:
|
||||
fm, _ = parse_frontmatter(path.read_text())
|
||||
parent = fm.get("extends", "")
|
||||
if isinstance(parent, str) and parent:
|
||||
extends_of[path.stem] = parent
|
||||
except (OSError, YamlSubsetError):
|
||||
pass
|
||||
|
||||
labels: dict[str, str] = {}
|
||||
for name in extends_of:
|
||||
chain = [name]
|
||||
seen = {name}
|
||||
cur = name
|
||||
while cur in extends_of:
|
||||
par = extends_of[cur]
|
||||
if par in seen:
|
||||
break
|
||||
chain.append(par)
|
||||
seen.add(par)
|
||||
cur = par
|
||||
labels[name] = " <- ".join(reversed(chain))
|
||||
|
||||
return labels
|
||||
|
||||
|
||||
def _manifest_to_yaml(manifest: Manifest) -> str:
|
||||
"""Serialize the resolved Manifest to a YAML string for preflight display."""
|
||||
lines: list[str] = []
|
||||
|
||||
agent = manifest.agent
|
||||
lines.append("agent:")
|
||||
if agent.skills:
|
||||
lines.append(" skills:")
|
||||
for s in agent.skills:
|
||||
lines.append(f" - {s}")
|
||||
if not agent.git_user.is_empty():
|
||||
lines.append(" git-gate:")
|
||||
lines.append(" user:")
|
||||
if agent.git_user.name:
|
||||
lines.append(f" name: {agent.git_user.name}")
|
||||
if agent.git_user.email:
|
||||
lines.append(f" email: {agent.git_user.email}")
|
||||
|
||||
bottle = manifest.bottle
|
||||
lines.append("bottle:")
|
||||
|
||||
if bottle.agent_provider.template != "claude" or bottle.agent_provider.dockerfile:
|
||||
lines.append(" agent_provider:")
|
||||
lines.append(f" template: {bottle.agent_provider.template}")
|
||||
if bottle.agent_provider.dockerfile:
|
||||
lines.append(f" dockerfile: {bottle.agent_provider.dockerfile}")
|
||||
|
||||
if bottle.env:
|
||||
lines.append(" env:")
|
||||
for k, v in sorted(bottle.env.items()):
|
||||
lines.append(f" {k}: {v}")
|
||||
|
||||
has_git_gate = not bottle.git_user.is_empty() or bottle.git
|
||||
if has_git_gate:
|
||||
lines.append(" git-gate:")
|
||||
if not bottle.git_user.is_empty():
|
||||
lines.append(" user:")
|
||||
if bottle.git_user.name:
|
||||
lines.append(f" name: {bottle.git_user.name}")
|
||||
if bottle.git_user.email:
|
||||
lines.append(f" email: {bottle.git_user.email}")
|
||||
if bottle.git:
|
||||
lines.append(" repos:")
|
||||
for entry in bottle.git:
|
||||
lines.append(f" {entry.Name}:")
|
||||
lines.append(f" url: {entry.Upstream}")
|
||||
|
||||
if bottle.egress.routes:
|
||||
lines.append(" egress:")
|
||||
lines.append(" routes:")
|
||||
for r in bottle.egress.routes:
|
||||
lines.append(f" - host: {r.Host}")
|
||||
if r.AuthScheme:
|
||||
lines.append(f" auth:")
|
||||
lines.append(f" scheme: {r.AuthScheme}")
|
||||
|
||||
lines.append(f" supervise: {'true' if bottle.supervise else 'false'}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _launch_bottle(
|
||||
spec: BottleSpec,
|
||||
*,
|
||||
|
||||
@@ -17,6 +17,43 @@ import sys
|
||||
from typing import Any, Optional
|
||||
|
||||
|
||||
def filter_multiselect(
|
||||
items: list[str],
|
||||
*,
|
||||
title: str = "",
|
||||
initial: Optional[list[str]] = None,
|
||||
tty_path: str = "/dev/tty",
|
||||
) -> Optional[list[str]]:
|
||||
"""Render a multi-select picker over *items*.
|
||||
|
||||
Returns the ordered list of selected items, or ``None`` if the user
|
||||
cancelled (Esc / ``q`` / Ctrl-C / Ctrl-D with no items).
|
||||
|
||||
Press Space to toggle the item under the cursor.
|
||||
Press Enter to confirm the current selection.
|
||||
Press Ctrl-D to confirm the current selection (returns even if empty).
|
||||
Press Esc/q to cancel (returns None).
|
||||
|
||||
*initial* pre-populates the selection in insertion order. Items
|
||||
added are appended; removed items leave the remaining order unchanged.
|
||||
"""
|
||||
if not items:
|
||||
return []
|
||||
|
||||
try:
|
||||
tty_fd = open(tty_path, "r+b", buffering=0)
|
||||
except OSError:
|
||||
return None
|
||||
|
||||
try:
|
||||
fd_dup = os.dup(tty_fd.fileno())
|
||||
return _run_multiselect(
|
||||
items, title=title, initial=list(initial or []), tty_fd=fd_dup
|
||||
)
|
||||
finally:
|
||||
tty_fd.close()
|
||||
|
||||
|
||||
def filter_select(
|
||||
items: list[str],
|
||||
*,
|
||||
@@ -221,6 +258,261 @@ def _addstr_safe(screen: Any, row: int, col: int, text: str, attr: int = curses.
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# filter_multiselect internals
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_KEY_SPACE = 32
|
||||
|
||||
|
||||
def _run_multiselect(
|
||||
items: list[str], *, title: str, initial: list[str], tty_fd: int
|
||||
) -> Optional[list[str]]:
|
||||
"""Drive a curses multi-select session on *tty_fd*."""
|
||||
os.environ.setdefault("TERM", "xterm-256color")
|
||||
|
||||
orig_stdin = sys.__stdin__
|
||||
orig_stdout = sys.__stdout__
|
||||
|
||||
try:
|
||||
import io
|
||||
tty_text = io.TextIOWrapper(io.FileIO(tty_fd, mode='r+'), write_through=True)
|
||||
sys.__stdin__ = tty_text # type: ignore[assignment]
|
||||
sys.__stdout__ = tty_text # type: ignore[assignment]
|
||||
|
||||
screen = curses.initscr()
|
||||
curses.noecho()
|
||||
curses.cbreak()
|
||||
screen.keypad(True)
|
||||
|
||||
try:
|
||||
result = _multiselect_loop(screen, items, title=title, initial=initial)
|
||||
finally:
|
||||
screen.keypad(False)
|
||||
curses.nocbreak()
|
||||
curses.echo()
|
||||
curses.endwin()
|
||||
except Exception: # noqa: W0718
|
||||
return None
|
||||
finally:
|
||||
sys.__stdin__ = orig_stdin # type: ignore[assignment]
|
||||
sys.__stdout__ = orig_stdout # type: ignore[assignment]
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _multiselect_loop(
|
||||
screen: Any, items: list[str], *, title: str, initial: list[str]
|
||||
) -> Optional[list[str]]:
|
||||
query = ""
|
||||
cursor = 0
|
||||
selected: list[str] = [s for s in initial if s in items]
|
||||
# focus = "filter": navigate + toggle items in the filterable list
|
||||
# focus = "order": navigate + reorder items in the selected list
|
||||
focus = "filter"
|
||||
order_cursor = 0
|
||||
|
||||
while True:
|
||||
filtered = _filter_items(items, query)
|
||||
|
||||
if not filtered:
|
||||
cursor = 0
|
||||
elif cursor >= len(filtered):
|
||||
cursor = len(filtered) - 1
|
||||
|
||||
if not selected:
|
||||
order_cursor = 0
|
||||
if focus == "order":
|
||||
focus = "filter"
|
||||
elif order_cursor >= len(selected):
|
||||
order_cursor = len(selected) - 1
|
||||
|
||||
try:
|
||||
_render_multiselect(
|
||||
screen, filtered, cursor,
|
||||
query=query, title=title, selected=selected,
|
||||
focus=focus, order_cursor=order_cursor,
|
||||
)
|
||||
except curses.error:
|
||||
return None
|
||||
|
||||
try:
|
||||
key = screen.getch()
|
||||
except KeyboardInterrupt:
|
||||
return None
|
||||
|
||||
if key in (_KEY_ESC, _KEY_CTRL_C, ord("q")):
|
||||
return None
|
||||
|
||||
if key == _KEY_CTRL_D:
|
||||
return list(selected)
|
||||
|
||||
# Tab toggles between filter and order focus.
|
||||
if key == ord("\t"):
|
||||
if focus == "filter" and selected:
|
||||
focus = "order"
|
||||
order_cursor = 0
|
||||
else:
|
||||
focus = "filter"
|
||||
continue
|
||||
|
||||
if focus == "filter":
|
||||
if key in (curses.KEY_ENTER, _KEY_ENTER_ALT, ord("\r")):
|
||||
return list(selected)
|
||||
|
||||
elif key == _KEY_SPACE:
|
||||
if filtered:
|
||||
item = filtered[cursor]
|
||||
if item in selected:
|
||||
selected.remove(item)
|
||||
else:
|
||||
selected.append(item)
|
||||
|
||||
elif key in (curses.KEY_UP, ord("k")):
|
||||
if cursor > 0:
|
||||
cursor -= 1
|
||||
|
||||
elif key in (curses.KEY_DOWN, ord("j")):
|
||||
if cursor < len(filtered) - 1:
|
||||
cursor += 1
|
||||
|
||||
elif key in (curses.KEY_BACKSPACE, _KEY_BACKSPACE_WIN, 127):
|
||||
query = query[:-1]
|
||||
new_filtered = _filter_items(items, query)
|
||||
if cursor >= len(new_filtered):
|
||||
cursor = max(0, len(new_filtered) - 1)
|
||||
|
||||
elif 32 <= key <= 126 and key != _KEY_SPACE:
|
||||
query += chr(key)
|
||||
cursor = 0
|
||||
|
||||
else: # focus == "order"
|
||||
if key in (curses.KEY_UP, ord("k")):
|
||||
if order_cursor > 0:
|
||||
order_cursor -= 1
|
||||
|
||||
elif key in (curses.KEY_DOWN, ord("j")):
|
||||
if order_cursor < len(selected) - 1:
|
||||
order_cursor += 1
|
||||
|
||||
elif key == ord("K"):
|
||||
# Move selected item up (earlier in order).
|
||||
if order_cursor > 0:
|
||||
i = order_cursor
|
||||
selected[i - 1], selected[i] = selected[i], selected[i - 1]
|
||||
order_cursor -= 1
|
||||
|
||||
elif key == ord("J"):
|
||||
# Move selected item down (later in order).
|
||||
if order_cursor < len(selected) - 1:
|
||||
i = order_cursor
|
||||
selected[i], selected[i + 1] = selected[i + 1], selected[i]
|
||||
order_cursor += 1
|
||||
|
||||
elif key in (curses.KEY_ENTER, _KEY_ENTER_ALT, ord("\r"), _KEY_SPACE):
|
||||
# Remove item from selection while in order mode.
|
||||
del selected[order_cursor]
|
||||
if order_cursor >= len(selected) and order_cursor > 0:
|
||||
order_cursor -= 1
|
||||
|
||||
|
||||
def _render_multiselect(
|
||||
screen: Any,
|
||||
filtered: list[str],
|
||||
cursor: int,
|
||||
*,
|
||||
query: str,
|
||||
title: str,
|
||||
selected: list[str],
|
||||
focus: str = "filter",
|
||||
order_cursor: int = 0,
|
||||
) -> None:
|
||||
screen.erase()
|
||||
rows, cols = screen.getmaxyx()
|
||||
min_rows = 7
|
||||
|
||||
if rows < min_rows:
|
||||
raise curses.error("terminal too small")
|
||||
|
||||
sep = "─" * min(cols - 1, 40)
|
||||
row = 0
|
||||
|
||||
if title and row < rows - 1:
|
||||
_addstr_safe(screen, row, 0, title[:cols - 1], curses.A_BOLD)
|
||||
row += 1
|
||||
|
||||
# Filter line — dim when focus is on the order panel.
|
||||
filter_label = f"Filter: {query}"
|
||||
filter_hint = " [Tab: reorder]" if focus == "filter" and selected else ""
|
||||
filter_attr = curses.A_DIM if focus == "order" else curses.A_NORMAL
|
||||
if row < rows - 1:
|
||||
_addstr_safe(screen, row, 0, (filter_label + filter_hint)[:cols - 1], filter_attr)
|
||||
row += 1
|
||||
|
||||
if row < rows - 1:
|
||||
_addstr_safe(screen, row, 0, sep)
|
||||
row += 1
|
||||
|
||||
# Compute how many rows the bottom order panel needs.
|
||||
# Cap the visible selected list to keep the filter list legible.
|
||||
order_rows = min(len(selected), max(1, (rows - row) // 3)) if selected else 0
|
||||
# Bottom reserved: sep + order_rows + sep + help = order_rows + 3
|
||||
bottom_reserved = order_rows + 3
|
||||
|
||||
list_start = row
|
||||
list_rows = rows - list_start - bottom_reserved
|
||||
if list_rows < 1:
|
||||
list_rows = 1
|
||||
|
||||
selected_set = set(selected)
|
||||
filter_dim = focus == "order"
|
||||
scroll = max(0, cursor - list_rows + 1)
|
||||
visible = filtered[scroll: scroll + list_rows]
|
||||
|
||||
for idx, item in enumerate(visible):
|
||||
abs_idx = scroll + idx
|
||||
mark = "[*]" if item in selected_set else "[ ]"
|
||||
prefix = "> " if (abs_idx == cursor and focus == "filter") else " "
|
||||
line = (prefix + mark + " " + item)[:cols - 1]
|
||||
item_attr = curses.A_DIM if filter_dim else (
|
||||
curses.A_REVERSE if abs_idx == cursor else curses.A_NORMAL
|
||||
)
|
||||
if row < rows - bottom_reserved:
|
||||
_addstr_safe(screen, row, 0, line, item_attr)
|
||||
row += 1
|
||||
|
||||
# Separator before the order panel.
|
||||
if row < rows - (order_rows + 2):
|
||||
_addstr_safe(screen, row, 0, sep)
|
||||
row += 1
|
||||
|
||||
# Order panel.
|
||||
order_scroll = max(0, order_cursor - order_rows + 1)
|
||||
order_visible = selected[order_scroll: order_scroll + order_rows]
|
||||
for idx, item in enumerate(order_visible):
|
||||
abs_idx = order_scroll + idx
|
||||
is_active = focus == "order" and abs_idx == order_cursor
|
||||
prefix = "> " if is_active else " "
|
||||
line = (prefix + item)[:cols - 1]
|
||||
attr = curses.A_REVERSE if is_active else curses.A_NORMAL
|
||||
if row < rows - 2:
|
||||
_addstr_safe(screen, row, 0, line, attr)
|
||||
row += 1
|
||||
|
||||
if row < rows - 1:
|
||||
_addstr_safe(screen, row, 0, sep)
|
||||
row += 1
|
||||
|
||||
if focus == "filter":
|
||||
help_line = "[↑↓/jk] move [Space] toggle [Enter] confirm [Tab] reorder [Esc/q] cancel"
|
||||
else:
|
||||
help_line = "[↑↓/jk] cursor [K/J] reorder [Space/Enter] remove [Tab] back [Ctrl-D] done"
|
||||
if row < rows:
|
||||
_addstr_safe(screen, min(rows - 1, row), 0, help_line[:cols - 1])
|
||||
|
||||
screen.refresh()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# name_color_modal — two-step label + color picker
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -21,7 +21,7 @@ FROM node:22-slim
|
||||
# to it) works against egress's bumped TLS without the agent needing
|
||||
# local DNS.
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y --no-install-recommends git ca-certificates curl \
|
||||
&& apt-get install -y --no-install-recommends git ca-certificates curl ripgrep \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# App-specific deps. Python isn't required by claude-code itself
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
FROM node:22-slim
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y --no-install-recommends git ca-certificates curl procps \
|
||||
&& apt-get install -y --no-install-recommends git ca-certificates curl procps ripgrep \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# App-specific deps. Python isn't required by codex itself
|
||||
|
||||
@@ -21,6 +21,11 @@ from pathlib import Path
|
||||
|
||||
from ...deploy_key_provisioner import DeployKeyCollisionError, DeployKeyProvisioner
|
||||
|
||||
# Timeout for ssh-keygen and Gitea API HTTP calls. A hung Gitea instance at
|
||||
# prepare time would stall bottle launch indefinitely without this bound.
|
||||
_API_TIMEOUT_SECS = 30
|
||||
_KEYGEN_TIMEOUT_SECS = 10
|
||||
|
||||
|
||||
class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
|
||||
"""Manages deploy keys on a Gitea instance."""
|
||||
@@ -46,6 +51,7 @@ class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
|
||||
check=True,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
timeout=_KEYGEN_TIMEOUT_SECS,
|
||||
)
|
||||
private_key = key_path.read_bytes()
|
||||
public_key = key_path.with_suffix(".pub").read_text().strip()
|
||||
@@ -67,7 +73,7 @@ class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
|
||||
method="POST",
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req) as resp:
|
||||
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS) as resp:
|
||||
body = json.loads(resp.read())
|
||||
except urllib.error.HTTPError as exc:
|
||||
_body = _read_error_body(exc)
|
||||
@@ -98,7 +104,7 @@ class GiteaDeployKeyProvisioner(DeployKeyProvisioner):
|
||||
method="DELETE",
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req):
|
||||
with urllib.request.urlopen(req, timeout=_API_TIMEOUT_SECS):
|
||||
pass
|
||||
except urllib.error.HTTPError as exc:
|
||||
if exc.code == 404:
|
||||
|
||||
+21
-10
@@ -210,6 +210,17 @@ def egress_token_env_map(
|
||||
return out
|
||||
|
||||
|
||||
def _yaml_str_escape(s: str) -> str:
|
||||
"""Escape a string for use inside a YAML double-quoted scalar."""
|
||||
return (
|
||||
s.replace("\\", "\\\\")
|
||||
.replace('"', '\\"')
|
||||
.replace("\n", "\\n")
|
||||
.replace("\r", "\\r")
|
||||
.replace("\t", "\\t")
|
||||
)
|
||||
|
||||
|
||||
def _route_to_yaml_fields(r: Route) -> dict[str, object]:
|
||||
fields: dict[str, object] = {"host": r.host}
|
||||
if r.auth_scheme and r.token_env:
|
||||
@@ -272,12 +283,12 @@ def _render_match_entry(entry: dict[str, object]) -> list[str]:
|
||||
for pd in entry["paths"]: # type: ignore[union-attr]
|
||||
pd_dict: dict[str, str] = pd # type: ignore[assignment]
|
||||
if "type" in pd_dict:
|
||||
lines.append(f' - type: "{pd_dict["type"]}"')
|
||||
lines.append(f' value: "{pd_dict["value"]}"')
|
||||
lines.append(f' - type: "{_yaml_str_escape(pd_dict["type"])}"')
|
||||
lines.append(f' value: "{_yaml_str_escape(pd_dict["value"])}"')
|
||||
else:
|
||||
lines.append(f' - value: "{pd_dict["value"]}"')
|
||||
lines.append(f' - value: "{_yaml_str_escape(pd_dict["value"])}"')
|
||||
if "methods" in entry:
|
||||
methods_str = ", ".join(f'"{m}"' for m in entry["methods"]) # type: ignore[union-attr]
|
||||
methods_str = ", ".join(f'"{_yaml_str_escape(m)}"' for m in entry["methods"]) # type: ignore[union-attr]
|
||||
prefix = " - " if first_key else " "
|
||||
lines.append(f'{prefix}methods: [{methods_str}]')
|
||||
first_key = False
|
||||
@@ -287,8 +298,8 @@ def _render_match_entry(entry: dict[str, object]) -> list[str]:
|
||||
first_key = False
|
||||
for hd in entry["headers"]: # type: ignore[union-attr]
|
||||
hd_dict: dict[str, str] = hd # type: ignore[assignment]
|
||||
lines.append(f' - name: "{hd_dict["name"]}"')
|
||||
lines.append(f' value: "{hd_dict["value"]}"')
|
||||
lines.append(f' - name: "{_yaml_str_escape(hd_dict["name"])}"')
|
||||
lines.append(f' value: "{_yaml_str_escape(hd_dict["value"])}"')
|
||||
if first_key:
|
||||
lines.append(" - {}")
|
||||
return lines
|
||||
@@ -308,10 +319,10 @@ def egress_render_routes(
|
||||
return "\n".join(lines) + "\n"
|
||||
for r in routes:
|
||||
f = _route_to_yaml_fields(r)
|
||||
lines.append(f' - host: "{f["host"]}"')
|
||||
lines.append(f' - host: "{_yaml_str_escape(str(f["host"]))}"')
|
||||
if "auth_scheme" in f:
|
||||
lines.append(f' auth_scheme: "{f["auth_scheme"]}"')
|
||||
lines.append(f' token_env: "{f["token_env"]}"')
|
||||
lines.append(f' auth_scheme: "{_yaml_str_escape(str(f["auth_scheme"]))}"')
|
||||
lines.append(f' token_env: "{_yaml_str_escape(str(f["token_env"]))}"')
|
||||
if "matches" in f:
|
||||
lines.append(" matches:")
|
||||
for entry in f["matches"]: # type: ignore[union-attr]
|
||||
@@ -331,7 +342,7 @@ def egress_render_routes(
|
||||
items_str = ", ".join(f'"{x}"' for x in dv)
|
||||
lines.append(f" {dk}: [{items_str}]")
|
||||
elif isinstance(dv, str):
|
||||
lines.append(f' {dk}: "{dv}"')
|
||||
lines.append(f' {dk}: "{_yaml_str_escape(dv)}"')
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
|
||||
+17
-6
@@ -43,10 +43,10 @@ from .manifest import ManifestBottle, ManifestGitEntry
|
||||
# Short network alias for git-gate inside the sidecar bundle. The
|
||||
# agent's `.gitconfig` insteadOf rewrites resolve through this name.
|
||||
GIT_GATE_HOSTNAME = "git-gate"
|
||||
# Bound half-open git client sessions. If an agent/tool runner is
|
||||
# interrupted during push, git daemon should reap the receive-pack
|
||||
# child instead of keeping the gate wedged indefinitely.
|
||||
GIT_GATE_DAEMON_TIMEOUT_SECS = 15
|
||||
# Shared timeout (seconds) for all git-gate subprocess and CGI calls:
|
||||
# git daemon (--timeout/--init-timeout), the access-hook subprocess in
|
||||
# git_http_backend, and the git http-backend CGI subprocess.
|
||||
GIT_GATE_TIMEOUT_SECS = 15
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@@ -112,6 +112,15 @@ def git_gate_upstreams_for_bottle(bottle: ManifestBottle) -> tuple[GitGateUpstre
|
||||
)
|
||||
|
||||
|
||||
def _gitconfig_validate_value(field: str, value: str) -> None:
|
||||
"""Raise ValueError if value contains characters that break gitconfig line syntax."""
|
||||
if "\n" in value or "\r" in value:
|
||||
raise ValueError(
|
||||
f"git-gate: {field} contains a newline, which would inject "
|
||||
f"arbitrary gitconfig keys; rejecting manifest entry"
|
||||
)
|
||||
|
||||
|
||||
def git_gate_render_gitconfig(
|
||||
entries: tuple[ManifestGitEntry, ...], gate_host: str, *, scheme: str = "git",
|
||||
) -> str:
|
||||
@@ -136,6 +145,7 @@ def git_gate_render_gitconfig(
|
||||
"# fetch-from-upstream-before-every-upload-pack via access-hook).\n",
|
||||
]
|
||||
for entry in entries:
|
||||
_gitconfig_validate_value(f"repos[{entry.Name!r}].url", entry.Upstream)
|
||||
out.append(f'[url "{scheme}://{gate_host}/{entry.Name}.git"]\n')
|
||||
out.append(f"\tinsteadOf = {entry.Upstream}\n")
|
||||
if entry.RemoteKey and entry.RemoteKey != entry.UpstreamHost:
|
||||
@@ -148,6 +158,7 @@ def git_gate_render_gitconfig(
|
||||
f"ssh://{entry.UpstreamUser}@{entry.RemoteKey}{port}/"
|
||||
f"{entry.UpstreamPath}"
|
||||
)
|
||||
_gitconfig_validate_value(f"repos[{entry.Name!r}].url (resolved alias)", alias)
|
||||
out.append(f"\tinsteadOf = {alias}\n")
|
||||
return "".join(out)
|
||||
|
||||
@@ -217,8 +228,8 @@ def git_gate_render_entrypoint(upstreams: tuple[GitGateUpstream, ...]) -> str:
|
||||
"",
|
||||
"exec git daemon \\",
|
||||
" --reuseaddr \\",
|
||||
f" --timeout={GIT_GATE_DAEMON_TIMEOUT_SECS} \\",
|
||||
f" --init-timeout={GIT_GATE_DAEMON_TIMEOUT_SECS} \\",
|
||||
f" --timeout={GIT_GATE_TIMEOUT_SECS} \\",
|
||||
f" --init-timeout={GIT_GATE_TIMEOUT_SECS} \\",
|
||||
" --base-path=/git \\",
|
||||
" --export-all \\",
|
||||
" --enable=receive-pack \\",
|
||||
|
||||
@@ -16,6 +16,8 @@ from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlsplit
|
||||
|
||||
from .git_gate import GIT_GATE_TIMEOUT_SECS
|
||||
|
||||
|
||||
DEFAULT_PORT = 9420
|
||||
|
||||
@@ -47,6 +49,7 @@ class GitHttpHandler(BaseHTTPRequestHandler):
|
||||
[hook_path, "upload-pack", str(repo_dir), peer, peer],
|
||||
capture_output=True,
|
||||
check=False,
|
||||
timeout=GIT_GATE_TIMEOUT_SECS,
|
||||
)
|
||||
if hook.returncode != 0:
|
||||
detail = (hook.stderr or hook.stdout).decode(
|
||||
@@ -110,6 +113,7 @@ class GitHttpHandler(BaseHTTPRequestHandler):
|
||||
env=env,
|
||||
capture_output=True,
|
||||
check=False,
|
||||
timeout=GIT_GATE_TIMEOUT_SECS,
|
||||
)
|
||||
self._write_cgi_response(proc.stdout)
|
||||
|
||||
@@ -148,7 +152,13 @@ class GitHttpHandler(BaseHTTPRequestHandler):
|
||||
key, _, value = line.decode("latin1").partition(":")
|
||||
value = value.strip()
|
||||
if key.lower() == "status":
|
||||
status = int(value.split()[0])
|
||||
try:
|
||||
status = int(value.split()[0])
|
||||
except (ValueError, IndexError):
|
||||
self.log_message(
|
||||
"malformed CGI Status header %r; using 500", value,
|
||||
)
|
||||
status = 500
|
||||
else:
|
||||
headers.append((key, value))
|
||||
self.send_response(status)
|
||||
|
||||
+103
-14
@@ -215,6 +215,65 @@ def _merge_git_user(
|
||||
)
|
||||
|
||||
|
||||
def _resolve_effective_bottle_eager(
|
||||
agent_name: str,
|
||||
agent: "ManifestAgent",
|
||||
bottle_names: "tuple[str, ...]",
|
||||
bottles: "Mapping[str, ManifestBottle]",
|
||||
) -> "ManifestBottle":
|
||||
"""Return the effective ManifestBottle for the eager (from_json_obj) path.
|
||||
|
||||
When bottle_names is non-empty they are merged in order. When empty, falls
|
||||
back to agent.bottle. Raises ManifestError when neither is set."""
|
||||
from .manifest_extends import merge_bottles_runtime
|
||||
|
||||
if bottle_names:
|
||||
resolved: list[ManifestBottle] = []
|
||||
for bn in bottle_names:
|
||||
if bn not in bottles:
|
||||
available = ", ".join(sorted(bottles.keys())) or "(none)"
|
||||
raise ManifestError(
|
||||
f"bottle '{bn}' not defined. Available: {available}"
|
||||
)
|
||||
resolved.append(bottles[bn])
|
||||
return merge_bottles_runtime(resolved)
|
||||
|
||||
if not agent.bottle:
|
||||
raise ManifestError(
|
||||
f"agent '{agent_name}' has no 'bottle' field and no bottles were "
|
||||
f"selected at launch. Select at least one bottle or add "
|
||||
f"'bottle: <name>' to the agent manifest."
|
||||
)
|
||||
return bottles[agent.bottle]
|
||||
|
||||
|
||||
def _resolve_effective_bottle_lazy(
|
||||
agent_name: str,
|
||||
agent_bottle: str,
|
||||
bottle_names: "tuple[str, ...]",
|
||||
bottles_dir: "Path",
|
||||
) -> "ManifestBottle":
|
||||
"""Return the effective ManifestBottle for the lazy (from_md_dirs) path.
|
||||
|
||||
When bottle_names is non-empty they are resolved from disk and merged in
|
||||
order. When empty, falls back to agent_bottle. Raises ManifestError when
|
||||
neither is set."""
|
||||
from .manifest_extends import merge_bottles_runtime
|
||||
from .manifest_loader import load_bottle_chain_from_dir
|
||||
|
||||
if bottle_names:
|
||||
resolved = [load_bottle_chain_from_dir(bn, bottles_dir) for bn in bottle_names]
|
||||
return merge_bottles_runtime(resolved)
|
||||
|
||||
if not agent_bottle:
|
||||
raise ManifestError(
|
||||
f"agent '{agent_name}' has no 'bottle' field and no bottles were "
|
||||
f"selected at launch. Select at least one bottle or add "
|
||||
f"'bottle: <name>' to the agent manifest."
|
||||
)
|
||||
return load_bottle_chain_from_dir(agent_bottle, bottles_dir)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Manifest:
|
||||
"""Single-agent/bottle value type. Returned by ManifestIndex.load_for_agent().
|
||||
@@ -360,6 +419,18 @@ class ManifestIndex:
|
||||
}
|
||||
return cls(bottles=bottles, agents=agents)
|
||||
|
||||
@property
|
||||
def all_bottle_names(self) -> list[str]:
|
||||
"""Sorted list of all discoverable bottle names.
|
||||
|
||||
In names-only mode (from resolve/from_md_dirs) this scans bottle
|
||||
filenames without reading their content. In eager mode (from
|
||||
from_json_obj) it returns the pre-parsed bottles' names."""
|
||||
if self.home_md is not None:
|
||||
from .manifest_loader import scan_bottle_names
|
||||
return scan_bottle_names(self.home_md / "bottles")
|
||||
return sorted(self.bottles.keys())
|
||||
|
||||
@property
|
||||
def all_agent_names(self) -> list[str]:
|
||||
"""Sorted list of all discoverable agent names.
|
||||
@@ -376,9 +447,18 @@ class ManifestIndex:
|
||||
return sorted(home_names | cwd_names)
|
||||
return sorted(self.agents.keys())
|
||||
|
||||
def load_for_agent(self, agent_name: str) -> "Manifest":
|
||||
def load_for_agent(
|
||||
self,
|
||||
agent_name: str,
|
||||
bottle_names: "tuple[str, ...] | None" = None,
|
||||
) -> "Manifest":
|
||||
"""Parse the named agent and its bottle; return a single-value Manifest.
|
||||
|
||||
`bottle_names` is an ordered list of bottles selected at launch time.
|
||||
When non-empty they are resolved and merged in order (index 0 = base;
|
||||
later entries override). When empty or None, falls back to the agent's
|
||||
own `bottle:` field. Raises ManifestError when neither is set.
|
||||
|
||||
In lazy mode (from resolve/from_md_dirs) the agent file and its
|
||||
bottle chain are read from disk for the first time here. In eager
|
||||
mode (from_json_obj) the data is already parsed; this just filters
|
||||
@@ -389,6 +469,8 @@ class ManifestIndex:
|
||||
|
||||
Always raises ManifestError if the agent is unknown or invalid.
|
||||
Backends call this at preflight inside _validate."""
|
||||
effective_bottle_names: tuple[str, ...] = bottle_names or ()
|
||||
|
||||
if self.home_md is None:
|
||||
# Eager manifest (from_json_obj): data already parsed; filter to
|
||||
# the one requested agent and its bottle so the returned Manifest
|
||||
@@ -399,12 +481,14 @@ class ManifestIndex:
|
||||
f"agent '{agent_name}' not defined. Available: {available}"
|
||||
)
|
||||
agent = self.agents[agent_name]
|
||||
raw_bottle = self.bottles[agent.bottle]
|
||||
raw_bottle = _resolve_effective_bottle_eager(
|
||||
agent_name, agent, effective_bottle_names, self.bottles
|
||||
)
|
||||
merged = _merge_git_user(agent.git_user, raw_bottle.git_user)
|
||||
bottle = raw_bottle if merged == raw_bottle.git_user else replace(raw_bottle, git_user=merged)
|
||||
return Manifest(agent=agent, bottle=bottle)
|
||||
|
||||
from .manifest_loader import load_bottle_chain_from_dir, scan_agent_names
|
||||
from .manifest_loader import scan_agent_names
|
||||
from .manifest_schema import validate_agent_frontmatter_keys
|
||||
from .yaml_subset import YamlSubsetError, parse_frontmatter
|
||||
|
||||
@@ -431,26 +515,31 @@ class ManifestIndex:
|
||||
|
||||
validate_agent_frontmatter_keys(agent_path, fm.keys())
|
||||
|
||||
bottle_name = fm.get("bottle")
|
||||
if not isinstance(bottle_name, str) or not bottle_name:
|
||||
raise ManifestError(
|
||||
f"agent '{agent_name}' must declare a 'bottle' field "
|
||||
f"naming a defined bottle"
|
||||
)
|
||||
|
||||
# Load the bottle chain (may raise ManifestError).
|
||||
# Determine the effective bottle name(s).
|
||||
agent_bottle = fm.get("bottle") or ""
|
||||
bottles_dir = self.home_md / "bottles"
|
||||
raw_bottle = load_bottle_chain_from_dir(bottle_name, bottles_dir)
|
||||
raw_bottle = _resolve_effective_bottle_lazy(
|
||||
agent_name, str(agent_bottle), effective_bottle_names, bottles_dir
|
||||
)
|
||||
effective_bottle_name = (
|
||||
effective_bottle_names[-1] if effective_bottle_names
|
||||
else str(agent_bottle)
|
||||
)
|
||||
|
||||
# Build and validate the full ManifestAgent.
|
||||
agent_dict: dict[str, object] = {
|
||||
"bottle": bottle_name,
|
||||
"skills": fm.get("skills", []),
|
||||
"prompt": body.strip(),
|
||||
}
|
||||
if agent_bottle:
|
||||
agent_dict["bottle"] = agent_bottle
|
||||
if "git-gate" in fm:
|
||||
agent_dict["git-gate"] = fm["git-gate"]
|
||||
agent = ManifestAgent.from_dict(agent_name, agent_dict, {bottle_name})
|
||||
# Pass the effective bottle name as the known-bottles set so agents
|
||||
# that have bottle: set are validated; agents without bottle: pass {}
|
||||
# since bottle_names were already resolved above.
|
||||
known = {effective_bottle_name} if effective_bottle_name else set()
|
||||
agent = ManifestAgent.from_dict(agent_name, agent_dict, known)
|
||||
|
||||
merged_user = _merge_git_user(agent.git_user, raw_bottle.git_user)
|
||||
bottle = raw_bottle if merged_user == raw_bottle.git_user else replace(raw_bottle, git_user=merged_user)
|
||||
|
||||
@@ -109,7 +109,8 @@ class ManifestAgentProvider:
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ManifestAgent:
|
||||
bottle: str
|
||||
# Optional: when empty the operator selects bottles at launch time.
|
||||
bottle: str = ""
|
||||
skills: tuple[str, ...] = ()
|
||||
prompt: str = ""
|
||||
# Per-agent git identity (issue #94). Overlays the referenced
|
||||
@@ -129,18 +130,20 @@ class ManifestAgent:
|
||||
f"allowed keys are {allowed}."
|
||||
)
|
||||
|
||||
bottle = d.get("bottle")
|
||||
if not isinstance(bottle, str) or not bottle:
|
||||
raise ManifestError(
|
||||
f"agent '{name}' must declare a 'bottle' field naming a "
|
||||
f"defined bottle"
|
||||
)
|
||||
if bottle not in bottle_names:
|
||||
available = ", ".join(sorted(bottle_names)) or "(none defined)"
|
||||
raise ManifestError(
|
||||
f"agent '{name}' references bottle '{bottle}', which is not defined. "
|
||||
f"Available: {available}"
|
||||
)
|
||||
bottle_raw = d.get("bottle")
|
||||
bottle = ""
|
||||
if bottle_raw is not None:
|
||||
if not isinstance(bottle_raw, str) or not bottle_raw:
|
||||
raise ManifestError(
|
||||
f"agent '{name}' bottle must be a non-empty string when declared"
|
||||
)
|
||||
if bottle_raw not in bottle_names:
|
||||
available = ", ".join(sorted(bottle_names)) or "(none defined)"
|
||||
raise ManifestError(
|
||||
f"agent '{name}' references bottle '{bottle_raw}', which is not defined. "
|
||||
f"Available: {available}"
|
||||
)
|
||||
bottle = bottle_raw
|
||||
|
||||
skills: tuple[str, ...] = ()
|
||||
skills_raw = d.get("skills")
|
||||
|
||||
@@ -9,6 +9,58 @@ if TYPE_CHECKING:
|
||||
from .manifest_egress import ManifestEgressConfig
|
||||
|
||||
|
||||
def merge_bottles_runtime(bottles: "list[ManifestBottle]") -> "ManifestBottle":
|
||||
"""Merge an ordered list of pre-resolved ManifestBottle objects.
|
||||
|
||||
Index 0 is the base; each subsequent entry is applied on top using
|
||||
the same field-merge rules as the file-based extends machinery:
|
||||
env: dict merge, later wins; git_user: per-field overlay, later
|
||||
wins on non-empty; git (repos): union by name, later wins; egress
|
||||
routes: concatenate; agent_provider, supervise: later replaces.
|
||||
"""
|
||||
if not bottles:
|
||||
raise ValueError("merge_bottles_runtime requires at least one bottle")
|
||||
result = bottles[0]
|
||||
for override in bottles[1:]:
|
||||
result = _merge_two_bottles_runtime(result, override)
|
||||
return result
|
||||
|
||||
|
||||
def _merge_two_bottles_runtime(base: "ManifestBottle", override: "ManifestBottle") -> "ManifestBottle":
|
||||
from .manifest import ManifestBottle, ManifestGitUser
|
||||
from .manifest_egress import ManifestEgressConfig
|
||||
|
||||
merged_env = {**base.env, **override.env}
|
||||
|
||||
merged_git_user = ManifestGitUser(
|
||||
name=override.git_user.name or base.git_user.name,
|
||||
email=override.git_user.email or base.git_user.email,
|
||||
)
|
||||
|
||||
# git repos: union keyed by Name, override wins per-name.
|
||||
base_repos_by_name = {entry.Name: entry for entry in base.git}
|
||||
override_repos_by_name = {entry.Name: entry for entry in override.git}
|
||||
merged_repos_names = list(base_repos_by_name) + [
|
||||
n for n in override_repos_by_name if n not in base_repos_by_name
|
||||
]
|
||||
merged_git = tuple(
|
||||
override_repos_by_name.get(n, base_repos_by_name[n])
|
||||
for n in merged_repos_names
|
||||
)
|
||||
|
||||
merged_routes = base.egress.routes + override.egress.routes
|
||||
merged_egress = ManifestEgressConfig(routes=merged_routes, Log=override.egress.Log)
|
||||
|
||||
return ManifestBottle(
|
||||
env=merged_env,
|
||||
agent_provider=override.agent_provider,
|
||||
git=merged_git,
|
||||
git_user=merged_git_user,
|
||||
egress=merged_egress,
|
||||
supervise=override.supervise,
|
||||
)
|
||||
|
||||
|
||||
def resolve_bottles(raws: dict[str, dict[str, object]]) -> dict[str, ManifestBottle]:
|
||||
"""Apply `extends:` chains and return resolved ManifestBottle objects."""
|
||||
cache: dict[str, ManifestBottle] = {}
|
||||
|
||||
@@ -32,6 +32,25 @@ def check_stale_json(dir_path: Path, md_dir: Path, label: str) -> None:
|
||||
)
|
||||
|
||||
|
||||
def scan_bottle_names(bottles_dir: Path) -> list[str]:
|
||||
"""Scan `<bottles_dir>/*.md` for valid filenames and return sorted bottle names.
|
||||
|
||||
No file content is read. Invalid filenames are skipped with a warning."""
|
||||
result: list[str] = []
|
||||
if not bottles_dir.is_dir():
|
||||
return result
|
||||
for path in sorted(bottles_dir.glob("*.md")):
|
||||
name = entity_name_from_path(path)
|
||||
if name is None:
|
||||
warn(
|
||||
f"skipping {path}: filename must match "
|
||||
f"[a-z][a-z0-9-]*.md (got {path.name!r})"
|
||||
)
|
||||
continue
|
||||
result.append(name)
|
||||
return result
|
||||
|
||||
|
||||
def scan_agent_names(agents_dir: Path) -> dict[str, Path]:
|
||||
"""Scan `<agents_dir>/*.md` for valid filenames and return `{name: path}`.
|
||||
|
||||
|
||||
@@ -18,8 +18,8 @@ _FILENAME_RX = re.compile(r"^[a-z][a-z0-9-]*$")
|
||||
BOTTLE_KEYS = frozenset(
|
||||
{"env", "extends", "agent_provider", "git-gate", "egress", "supervise"}
|
||||
)
|
||||
AGENT_KEYS_REQUIRED = frozenset({"bottle"})
|
||||
AGENT_KEYS_OPTIONAL = frozenset({"skills", "git-gate"})
|
||||
AGENT_KEYS_REQUIRED: frozenset[str] = frozenset()
|
||||
AGENT_KEYS_OPTIONAL = frozenset({"bottle", "skills", "git-gate"})
|
||||
|
||||
# Claude Code subagent fields bot-bottle ignores at launch but does
|
||||
# not reject. This lets the same file double as
|
||||
|
||||
@@ -90,19 +90,19 @@ def parse_jsonrpc(body: bytes) -> JsonRpcRequest:
|
||||
try:
|
||||
raw = json.loads(body)
|
||||
except json.JSONDecodeError as e:
|
||||
raise _RpcError(ERR_PARSE, f"parse error: {e}") from e
|
||||
raise _RpcClientError(ERR_PARSE, f"parse error: {e}") from e
|
||||
if not isinstance(raw, dict):
|
||||
raise _RpcError(ERR_INVALID_REQUEST, "request must be a JSON object")
|
||||
raise _RpcClientError(ERR_INVALID_REQUEST, "request must be a JSON object")
|
||||
if raw.get("jsonrpc") != JSONRPC_VERSION:
|
||||
raise _RpcError(ERR_INVALID_REQUEST, "jsonrpc field must be '2.0'")
|
||||
raise _RpcClientError(ERR_INVALID_REQUEST, "jsonrpc field must be '2.0'")
|
||||
method = raw.get("method")
|
||||
if not isinstance(method, str):
|
||||
raise _RpcError(ERR_INVALID_REQUEST, "method must be a string")
|
||||
raise _RpcClientError(ERR_INVALID_REQUEST, "method must be a string")
|
||||
params = raw.get("params", {})
|
||||
if params is None:
|
||||
params = {}
|
||||
if not isinstance(params, dict):
|
||||
raise _RpcError(ERR_INVALID_PARAMS, "params must be an object")
|
||||
raise _RpcClientError(ERR_INVALID_PARAMS, "params must be an object")
|
||||
rpc_id = raw.get("id", _NO_ID)
|
||||
is_notification = rpc_id is _NO_ID
|
||||
return JsonRpcRequest(
|
||||
@@ -117,12 +117,23 @@ _NO_ID = object()
|
||||
|
||||
|
||||
class _RpcError(Exception):
|
||||
"""Base class for all typed RPC errors that surface as JSON-RPC error responses."""
|
||||
def __init__(self, code: int, message: str):
|
||||
super().__init__(message)
|
||||
self.code = code
|
||||
self.message = message
|
||||
|
||||
|
||||
class _RpcClientError(_RpcError):
|
||||
"""Caller sent a bad request; returned verbatim, no server-side logging."""
|
||||
|
||||
|
||||
class _RpcInternalError(_RpcError):
|
||||
"""Server-side fault; logged at ERROR with cause, always returns ERR_INTERNAL."""
|
||||
def __init__(self, message: str) -> None:
|
||||
super().__init__(ERR_INTERNAL, message)
|
||||
|
||||
|
||||
def jsonrpc_result(request_id: object, result: object) -> bytes:
|
||||
payload = {"jsonrpc": JSONRPC_VERSION, "id": request_id, "result": result}
|
||||
return (json.dumps(payload) + "\n").encode("utf-8")
|
||||
@@ -290,7 +301,7 @@ def validate_proposed_file(tool: str, content: str) -> None:
|
||||
catches obvious paste-errors / wrong-tool selections before they
|
||||
enter the queue."""
|
||||
if not content.strip():
|
||||
raise _RpcError(ERR_INVALID_PARAMS, f"{tool}: proposed file is empty")
|
||||
raise _RpcClientError(ERR_INVALID_PARAMS, f"{tool}: proposed file is empty")
|
||||
if tool == _sv.TOOL_CAPABILITY_BLOCK:
|
||||
# Dockerfiles are too varied to validate syntactically beyond
|
||||
# non-empty. The operator reads the diff in the TUI.
|
||||
@@ -299,17 +310,17 @@ def validate_proposed_file(tool: str, content: str) -> None:
|
||||
try:
|
||||
config = load_config(content)
|
||||
except ValueError as e:
|
||||
raise _RpcError(
|
||||
raise _RpcClientError(
|
||||
ERR_INVALID_PARAMS,
|
||||
f"{tool}: proposed routes.yaml is not valid: {e}",
|
||||
) from e
|
||||
if config.log != LOG_OFF:
|
||||
raise _RpcError(
|
||||
raise _RpcClientError(
|
||||
ERR_INVALID_PARAMS,
|
||||
f"{tool}: proposed routes.yaml must not change egress logging",
|
||||
)
|
||||
else:
|
||||
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
|
||||
raise _RpcClientError(ERR_INVALID_PARAMS, f"unknown tool {tool!r}")
|
||||
|
||||
|
||||
# --- MCP handlers ----------------------------------------------------------
|
||||
@@ -382,17 +393,17 @@ def handle_tools_call(
|
||||
doesn't need operator approval."""
|
||||
name = params.get("name")
|
||||
if not isinstance(name, str):
|
||||
raise _RpcError(ERR_INVALID_PARAMS, "tools/call missing 'name'")
|
||||
raise _RpcClientError(ERR_INVALID_PARAMS, "tools/call missing 'name'")
|
||||
if name == _sv.TOOL_LIST_EGRESS_ROUTES:
|
||||
return handle_list_egress_routes(typing.cast(dict[str, object], params.get("arguments", {})), config)
|
||||
|
||||
args_raw = params.get("arguments", {})
|
||||
if not isinstance(args_raw, dict):
|
||||
raise _RpcError(ERR_INVALID_PARAMS, "tools/call 'arguments' must be an object")
|
||||
raise _RpcClientError(ERR_INVALID_PARAMS, "tools/call 'arguments' must be an object")
|
||||
|
||||
justification = args_raw.get("justification")
|
||||
if not isinstance(justification, str) or not justification.strip():
|
||||
raise _RpcError(
|
||||
raise _RpcClientError(
|
||||
ERR_INVALID_PARAMS,
|
||||
f"{name}: 'justification' is required and must be a non-empty string",
|
||||
)
|
||||
@@ -401,13 +412,13 @@ def handle_tools_call(
|
||||
file_field = PROPOSED_FILE_FIELD[name]
|
||||
proposed_file = args_raw.get(file_field)
|
||||
if not isinstance(proposed_file, str):
|
||||
raise _RpcError(
|
||||
raise _RpcClientError(
|
||||
ERR_INVALID_PARAMS,
|
||||
f"{name}: '{file_field}' is required and must be a string",
|
||||
)
|
||||
validate_proposed_file(name, proposed_file)
|
||||
else:
|
||||
raise _RpcError(ERR_INVALID_PARAMS, f"unknown tool {name!r}")
|
||||
raise _RpcClientError(ERR_INVALID_PARAMS, f"unknown tool {name!r}")
|
||||
|
||||
proposal = _sv.Proposal.new(
|
||||
bottle_slug=config.bottle_slug,
|
||||
@@ -416,7 +427,10 @@ def handle_tools_call(
|
||||
justification=justification,
|
||||
current_file_hash=_sv.sha256_hex(proposed_file),
|
||||
)
|
||||
_sv.write_proposal(config.queue_dir, proposal)
|
||||
try:
|
||||
_sv.write_proposal(config.queue_dir, proposal)
|
||||
except OSError as e:
|
||||
raise _RpcInternalError(f"failed to write proposal to queue: {e}") from e
|
||||
sys.stderr.write(
|
||||
f"supervise: queued proposal {proposal.id} ({name}) "
|
||||
f"for bottle {config.bottle_slug}; waiting for operator...\n"
|
||||
@@ -436,7 +450,10 @@ def handle_tools_call(
|
||||
"content": [{"type": "text", "text": text}],
|
||||
"isError": False,
|
||||
}
|
||||
_sv.archive_proposal(config.queue_dir, proposal.id)
|
||||
try:
|
||||
_sv.archive_proposal(config.queue_dir, proposal.id)
|
||||
except OSError as e:
|
||||
raise _RpcInternalError(f"failed to archive proposal: {e}") from e
|
||||
|
||||
text = format_response_text(response)
|
||||
return {
|
||||
@@ -512,7 +529,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler):
|
||||
|
||||
try:
|
||||
req = parse_jsonrpc(body)
|
||||
except _RpcError as e:
|
||||
except _RpcClientError as e:
|
||||
self._write_jsonrpc(jsonrpc_error(None, e.code, e.message))
|
||||
return
|
||||
|
||||
@@ -520,11 +537,19 @@ class MCPHandler(http.server.BaseHTTPRequestHandler):
|
||||
|
||||
try:
|
||||
result = self._dispatch(req, config)
|
||||
except _RpcError as e:
|
||||
except _RpcClientError as e:
|
||||
self._write_jsonrpc(jsonrpc_error(req.id, e.code, e.message))
|
||||
return
|
||||
except Exception as e: # noqa: W0718 — catch-all for RPC dispatch errors
|
||||
sys.stderr.write(f"supervise: internal error: {e}\n")
|
||||
except _RpcInternalError as e:
|
||||
cause = e.__cause__
|
||||
detail = f": {cause}" if cause else ""
|
||||
sys.stderr.write(f"supervise: internal error: {e.message}{detail}\n")
|
||||
sys.stderr.flush()
|
||||
self._write_jsonrpc(jsonrpc_error(req.id, ERR_INTERNAL, "internal error"))
|
||||
return
|
||||
except Exception as e: # noqa: W0718 — unexpected errors
|
||||
sys.stderr.write(f"supervise: unexpected error: {type(e).__name__}: {e}\n")
|
||||
sys.stderr.flush()
|
||||
self._write_jsonrpc(jsonrpc_error(req.id, ERR_INTERNAL, "internal error"))
|
||||
return
|
||||
|
||||
@@ -543,7 +568,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler):
|
||||
return handle_tools_list(req.params)
|
||||
if method == "tools/call":
|
||||
return handle_tools_call(req.params, config)
|
||||
raise _RpcError(ERR_METHOD_NOT_FOUND, f"method not found: {method}")
|
||||
raise _RpcClientError(ERR_METHOD_NOT_FOUND, f"method not found: {method}")
|
||||
|
||||
def _write_jsonrpc(self, body: bytes) -> None:
|
||||
self.send_response(200)
|
||||
|
||||
@@ -0,0 +1,216 @@
|
||||
# PRD prd-new: Separate agent and bottle selection
|
||||
|
||||
- **Status:** Active
|
||||
- **Author:** claude
|
||||
- **Created:** 2026-06-25
|
||||
- **Issue:** #269
|
||||
|
||||
## Summary
|
||||
|
||||
Agents and bottles are two separate concerns: agents carry a system prompt and
|
||||
skills; bottles carry infrastructure configuration (egress, git-gate, env,
|
||||
agent provider). Today an agent's manifest file hard-codes a single `bottle:`
|
||||
reference, which prevents the same agent prompt from being reused across
|
||||
projects that need different bottle configurations. This PRD decouples them: at
|
||||
launch time, after choosing the agent, the operator picks an ordered list of
|
||||
bottles via a multi-select picker. The selected bottles are merged in order
|
||||
(later entries override earlier ones) to produce the effective bottle for the
|
||||
session.
|
||||
|
||||
## Problem
|
||||
|
||||
The current `bottle: <name>` field on an agent manifest file binds the agent
|
||||
permanently to one bottle. To use the same system prompt with a different bottle
|
||||
(e.g. `claude-implementer` at home vs. at a client site that needs a different
|
||||
egress policy), the operator must duplicate the agent file and change the
|
||||
`bottle:` field. Duplicate agent files drift out of sync.
|
||||
|
||||
## Goals / Success Criteria
|
||||
|
||||
1. `bottle:` in an agent's frontmatter becomes optional. Existing manifests with
|
||||
`bottle:` continue to work unchanged (backward compat).
|
||||
2. After selecting an agent (via the existing single-select picker), a new
|
||||
multi-select bottle picker appears showing all available bottles.
|
||||
3. The multi-select picker pre-populates with the agent's `bottle:` value when
|
||||
present.
|
||||
4. Confirming with one or more bottles selected uses those bottles, merged in
|
||||
selection order, as the effective bottle for the session.
|
||||
5. Confirming with an empty selection falls back to the agent's `bottle:` field.
|
||||
If neither is set, a ManifestError is raised pointing the operator at the fix.
|
||||
6. The ordered bottle list is stored in launch metadata so `./cli.py resume`
|
||||
uses the same bottles.
|
||||
7. The preflight summary (`y/N` screen) shows the effective bottle name(s).
|
||||
8. The multi-select picker supports incremental filtering, Space/Enter to toggle
|
||||
selection, an ordered "Selected: ..." summary line, Ctrl-D to confirm, and
|
||||
Esc/q to cancel the whole start operation.
|
||||
9. Unit tests cover: multi-select widget (filter, toggle, confirm, cancel),
|
||||
the `cmd_start` bottle-picker step, and the manifest `load_for_agent`
|
||||
runtime-bottle-merge path.
|
||||
|
||||
## Non-goals
|
||||
|
||||
- Reordering the selection list from within the picker (order = insertion order;
|
||||
drag-and-drop is out of scope).
|
||||
- Storing bottle selection history / MRU.
|
||||
- Changes to `./cli.py edit`, `./cli.py list`, or `./cli.py info`.
|
||||
- Removing the `bottle:` key from the agent schema (it stays, now optional).
|
||||
|
||||
## Design
|
||||
|
||||
### `bot_bottle/cli/tui.py` — `filter_multiselect`
|
||||
|
||||
```python
|
||||
def filter_multiselect(
|
||||
items: list[str],
|
||||
*,
|
||||
title: str = "",
|
||||
initial: list[str] | None = None,
|
||||
tty_path: str = "/dev/tty",
|
||||
) -> list[str] | None:
|
||||
"""Multi-select variant of filter_select.
|
||||
|
||||
Returns the ordered list of selected items, or None on cancel.
|
||||
Press Space/Enter to toggle the item under the cursor.
|
||||
Press Ctrl-D to confirm. Press Esc/q to cancel.
|
||||
"""
|
||||
```
|
||||
|
||||
Layout:
|
||||
|
||||
```
|
||||
Select bottles
|
||||
Filter: _
|
||||
─────────────────────────────────────────
|
||||
> [*] claude
|
||||
[ ] dev
|
||||
[ ] codex
|
||||
─────────────────────────────────────────
|
||||
Selected (in order): claude
|
||||
─────────────────────────────────────────
|
||||
[↑↓/jk] move [Space] toggle [Ctrl-D] done [Esc] cancel
|
||||
```
|
||||
|
||||
`initial` pre-populates the ordered selection. `None` means no pre-selection.
|
||||
Items added are appended in insertion order; items removed leave the remaining
|
||||
order unchanged.
|
||||
|
||||
### `bot_bottle/manifest_schema.py` — optional `bottle:`
|
||||
|
||||
`bottle` moves from `AGENT_KEYS_REQUIRED` to `AGENT_KEYS_OPTIONAL`.
|
||||
|
||||
### `bot_bottle/manifest_agent.py` — optional `bottle:`
|
||||
|
||||
`ManifestAgent.bottle` changes from `str` (required) to `str = ""`.
|
||||
`from_dict` no longer requires the key to be present; the bottle-exists
|
||||
validation is skipped when the key is absent.
|
||||
|
||||
### `bot_bottle/manifest_loader.py` — `scan_bottle_names`
|
||||
|
||||
```python
|
||||
def scan_bottle_names(bottles_dir: Path) -> list[str]:
|
||||
"""Scan <bottles_dir>/*.md and return sorted bottle names."""
|
||||
```
|
||||
|
||||
### `bot_bottle/manifest.py` — `ManifestIndex` changes
|
||||
|
||||
**`all_bottle_names` property** — analogous to `all_agent_names`; scans
|
||||
`home_md / "bottles"` in lazy mode, returns `sorted(self.bottles.keys())` in
|
||||
eager mode.
|
||||
|
||||
**`load_for_agent(agent_name, bottle_names: tuple[str, ...] = ())`** — new
|
||||
`bottle_names` parameter. When non-empty, the listed bottles are resolved and
|
||||
merged in order (index 0 is the base; each subsequent bottle is applied on top
|
||||
using the same field-merge rules as `extends:`). The result replaces the bottle
|
||||
that `agent.bottle` would have provided. When empty, falls back to `agent.bottle`.
|
||||
Raises ManifestError if neither `bottle_names` nor `agent.bottle` is set.
|
||||
|
||||
### `bot_bottle/manifest_extends.py` — `merge_bottles_runtime`
|
||||
|
||||
```python
|
||||
def merge_bottles_runtime(bottles: list[ManifestBottle]) -> ManifestBottle:
|
||||
"""Merge an ordered list of pre-resolved ManifestBottle objects.
|
||||
|
||||
Index 0 is the base; each subsequent entry overrides the previous using
|
||||
the same rules as the file-based extends machinery:
|
||||
- env: dict merge, later wins
|
||||
- git_user: per-field overlay, later wins on non-empty
|
||||
- git (repos): union by name, later wins per-name
|
||||
- egress.routes: concatenate
|
||||
- agent_provider, supervise: later bottle's value replaces earlier
|
||||
"""
|
||||
```
|
||||
|
||||
This function operates on already-parsed `ManifestBottle` objects, so it does
|
||||
not need to touch the raw-dict path.
|
||||
|
||||
### `bot_bottle/backend/__init__.py` — `BottleSpec` + `_validate`
|
||||
|
||||
`BottleSpec` gains `bottle_names: tuple[str, ...] = ()`.
|
||||
|
||||
`BottleBackend._validate` passes `spec.bottle_names` to `load_for_agent`:
|
||||
|
||||
```python
|
||||
manifest = spec.manifest.load_for_agent(spec.agent_name, spec.bottle_names)
|
||||
```
|
||||
|
||||
The preflight print updates `info(f"bottle: {agent.bottle}")` to display the
|
||||
effective bottle name(s). When `spec.bottle_names` is non-empty those are
|
||||
shown; when empty and `agent.bottle` is set, the agent's `bottle:` is shown.
|
||||
|
||||
### `bot_bottle/bottle_state.py` — persist bottle names
|
||||
|
||||
`BottleMetadata` gains `bottle_names: tuple[str, ...] = ()`. `read_metadata`
|
||||
reads this from JSON (default `()`). `write_launch_metadata` passes
|
||||
`spec.bottle_names` through.
|
||||
|
||||
### `bot_bottle/cli/start.py` — bottle multiselect step
|
||||
|
||||
After agent selection, before the name/color modal:
|
||||
|
||||
```python
|
||||
available_bottle_names = manifest.all_bottle_names
|
||||
# Peek at agent's bottle default for pre-population
|
||||
initial_bottle = _peek_agent_bottle(manifest, agent_name)
|
||||
initial = [initial_bottle] if initial_bottle else []
|
||||
|
||||
bottle_names_list = tui.filter_multiselect(
|
||||
available_bottle_names,
|
||||
title="Select bottles",
|
||||
initial=initial,
|
||||
)
|
||||
if bottle_names_list is None:
|
||||
return 0 # user cancelled
|
||||
bottle_names = tuple(bottle_names_list)
|
||||
```
|
||||
|
||||
`_peek_agent_bottle` reads the agent file's frontmatter without full parsing,
|
||||
returning the `bottle:` value or `""` when absent.
|
||||
|
||||
`BottleSpec` is built with `bottle_names=bottle_names`.
|
||||
|
||||
### `bot_bottle/cli/resume.py` — bottle names from metadata
|
||||
|
||||
```python
|
||||
spec = BottleSpec(
|
||||
...
|
||||
bottle_names=tuple(metadata.bottle_names),
|
||||
)
|
||||
```
|
||||
|
||||
## Implementation chunks
|
||||
|
||||
1. **Schema + model** — `manifest_schema.py`, `manifest_agent.py` (optional
|
||||
`bottle:`), `manifest_loader.py` (`scan_bottle_names`), `manifest.py`
|
||||
(`all_bottle_names`, `load_for_agent` signature), `manifest_extends.py`
|
||||
(`merge_bottles_runtime`), `bottle_state.py` (`bottle_names` field),
|
||||
`resolve_common.py` (thread through).
|
||||
2. **Backend** — `BottleSpec.bottle_names`, `_validate`, preflight print.
|
||||
3. **TUI** — `filter_multiselect` in `tui.py` + unit tests.
|
||||
4. **CLI wiring** — `start.py` bottle picker step, `resume.py` metadata load.
|
||||
5. **Tests** — `test_cli_start_selector.py` bottle-picker cases,
|
||||
`test_manifest_agent.py` optional-bottle cases, new
|
||||
`test_manifest_bottle_merge.py` for `merge_bottles_runtime`.
|
||||
|
||||
## Open questions
|
||||
|
||||
None.
|
||||
@@ -1,7 +1,8 @@
|
||||
"""Unit: cmd_start selector dispatch (PRD 0051).
|
||||
"""Unit: cmd_start selector dispatch (PRD 0051, issue #269).
|
||||
|
||||
Tests that cmd_start calls filter_select only when the agent name is
|
||||
absent, skips it when the agent is explicit, and returns 0 on cancel.
|
||||
absent, shows the bottle multiselect after agent selection, and skips
|
||||
pickers when both are explicitly set.
|
||||
|
||||
All actual launch work is stubbed so no container is created.
|
||||
"""
|
||||
@@ -10,6 +11,7 @@ from __future__ import annotations
|
||||
|
||||
import os
|
||||
import unittest
|
||||
from collections.abc import Mapping, Sequence
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import bot_bottle.cli.start as start_mod
|
||||
@@ -17,10 +19,16 @@ import bot_bottle.cli.tui as tui_mod
|
||||
from bot_bottle.backend import ActiveAgent
|
||||
|
||||
|
||||
def _make_manifest(agent_names: list[str]):
|
||||
def _make_manifest(
|
||||
agent_names: list[str],
|
||||
bottle_names: list[str] | None = None,
|
||||
agent_bottle: str = "",
|
||||
):
|
||||
manifest = MagicMock()
|
||||
manifest.agents = {name: MagicMock() for name in agent_names}
|
||||
manifest.agents = {name: MagicMock(bottle=agent_bottle) for name in agent_names}
|
||||
manifest.all_agent_names = sorted(agent_names)
|
||||
manifest.all_bottle_names = sorted(bottle_names or [])
|
||||
manifest.home_md = None # eager mode so _peek_agent_bottle uses agents dict
|
||||
return manifest
|
||||
|
||||
|
||||
@@ -28,27 +36,27 @@ class TestCmdStartSelector(unittest.TestCase):
|
||||
"""Drive cmd_start with a minimal set of stubs."""
|
||||
|
||||
def setUp(self):
|
||||
# Stub Manifest.resolve so no on-disk manifest is needed.
|
||||
self._manifest = _make_manifest(["researcher", "implementer"])
|
||||
self._manifest = _make_manifest(["researcher", "implementer"], ["claude", "dev"])
|
||||
self._resolve_patch = patch(
|
||||
"bot_bottle.cli.start.ManifestIndex.resolve",
|
||||
return_value=self._manifest,
|
||||
)
|
||||
self._resolve_patch.start()
|
||||
|
||||
# Stub _launch_bottle so no real container work happens.
|
||||
self._launch_patch = patch(
|
||||
"bot_bottle.cli.start._launch_bottle",
|
||||
return_value=0,
|
||||
)
|
||||
self._launch_mock = self._launch_patch.start()
|
||||
|
||||
# Stub filter_select to avoid opening /dev/tty.
|
||||
self._tui_patch = patch.object(tui_mod, "filter_select")
|
||||
self._tui_mock = self._tui_patch.start()
|
||||
# Stub filter_select (agent picker) and filter_multiselect (bottle picker).
|
||||
self._agent_picker_patch = patch.object(tui_mod, "filter_select")
|
||||
self._agent_picker_mock = self._agent_picker_patch.start()
|
||||
|
||||
self._bottle_picker_patch = patch.object(tui_mod, "filter_multiselect")
|
||||
self._bottle_picker_mock = self._bottle_picker_patch.start()
|
||||
self._bottle_picker_mock.return_value = ["claude"] # default: one bottle selected
|
||||
|
||||
# Ensure BOT_BOTTLE_BACKEND is absent so omitted --backend
|
||||
# flows through to the resolver default.
|
||||
self._env_patch = patch.dict(os.environ, {}, clear=False)
|
||||
self._env_patch.start()
|
||||
os.environ.pop("BOT_BOTTLE_BACKEND", None)
|
||||
@@ -56,50 +64,108 @@ class TestCmdStartSelector(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
self._resolve_patch.stop()
|
||||
self._launch_patch.stop()
|
||||
self._tui_patch.stop()
|
||||
self._agent_picker_patch.stop()
|
||||
self._bottle_picker_patch.stop()
|
||||
self._env_patch.stop()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Both explicit — no picker shown
|
||||
# Agent explicit — agent picker skipped; bottle picker always shown
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def test_both_explicit_skips_picker(self):
|
||||
self._tui_mock.return_value = "researcher"
|
||||
def test_explicit_agent_skips_agent_picker(self):
|
||||
rc = start_mod.cmd_start(["--backend=docker", "researcher"])
|
||||
self.assertEqual(0, rc)
|
||||
self._tui_mock.assert_not_called()
|
||||
self._agent_picker_mock.assert_not_called()
|
||||
self._bottle_picker_mock.assert_called_once()
|
||||
self._launch_mock.assert_called_once()
|
||||
_, kwargs = self._launch_mock.call_args
|
||||
self.assertEqual("docker", kwargs["backend_name"])
|
||||
|
||||
def test_explicit_agent_bottle_picker_shows_available_bottles(self):
|
||||
start_mod.cmd_start(["researcher"])
|
||||
call_kwargs = self._bottle_picker_mock.call_args
|
||||
self.assertEqual(["claude", "dev"], call_kwargs[0][0])
|
||||
self.assertIn("bottle", call_kwargs[1]["title"].lower())
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Agent absent → agent picker fires; backend explicit
|
||||
# Agent absent → agent picker fires; bottle picker always follows
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def test_agent_absent_shows_agent_picker(self):
|
||||
self._tui_mock.return_value = "researcher"
|
||||
self._agent_picker_mock.return_value = "researcher"
|
||||
rc = start_mod.cmd_start(["--backend=docker"])
|
||||
self.assertEqual(0, rc)
|
||||
self._tui_mock.assert_called_once()
|
||||
call_kwargs = self._tui_mock.call_args
|
||||
self._agent_picker_mock.assert_called_once()
|
||||
call_kwargs = self._agent_picker_mock.call_args
|
||||
self.assertEqual(["implementer", "researcher"], call_kwargs[0][0])
|
||||
self.assertIn("agent", call_kwargs[1]["title"].lower())
|
||||
# Bottle picker must also fire after agent selection.
|
||||
self._bottle_picker_mock.assert_called_once()
|
||||
|
||||
def test_agent_picker_cancel_returns_0(self):
|
||||
self._tui_mock.return_value = None
|
||||
def test_agent_picker_cancel_skips_bottle_picker(self):
|
||||
self._agent_picker_mock.return_value = None
|
||||
rc = start_mod.cmd_start(["--backend=docker"])
|
||||
self.assertEqual(0, rc)
|
||||
self._bottle_picker_mock.assert_not_called()
|
||||
self._launch_mock.assert_not_called()
|
||||
|
||||
def test_bottle_picker_cancel_returns_0(self):
|
||||
self._bottle_picker_mock.return_value = None
|
||||
rc = start_mod.cmd_start(["researcher"])
|
||||
self.assertEqual(0, rc)
|
||||
self._launch_mock.assert_not_called()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Agent explicit, backend absent → no picker
|
||||
# Bottle selection is forwarded to BottleSpec
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def test_backend_absent_uses_default_without_picker(self):
|
||||
rc = start_mod.cmd_start(["researcher"])
|
||||
self.assertEqual(0, rc)
|
||||
self._tui_mock.assert_not_called()
|
||||
def test_selected_bottles_forwarded_to_spec(self):
|
||||
self._bottle_picker_mock.return_value = ["claude", "dev"]
|
||||
start_mod.cmd_start(["researcher"])
|
||||
self._launch_mock.assert_called_once()
|
||||
spec = self._launch_mock.call_args[0][0]
|
||||
self.assertEqual(("claude", "dev"), spec.bottle_names)
|
||||
|
||||
def test_empty_bottle_selection_forwarded(self):
|
||||
self._bottle_picker_mock.return_value = []
|
||||
start_mod.cmd_start(["researcher"])
|
||||
self._launch_mock.assert_called_once()
|
||||
spec = self._launch_mock.call_args[0][0]
|
||||
self.assertEqual((), spec.bottle_names)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Agent default bottle pre-populates the picker
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def test_agent_bottle_prepopulates_bottle_picker(self):
|
||||
manifest = _make_manifest(
|
||||
["implementer"], ["claude", "dev"], agent_bottle="claude"
|
||||
)
|
||||
with patch(
|
||||
"bot_bottle.cli.start.ManifestIndex.resolve", return_value=manifest
|
||||
):
|
||||
start_mod.cmd_start(["implementer"])
|
||||
call_kwargs = self._bottle_picker_mock.call_args
|
||||
self.assertEqual(["claude"], call_kwargs[1]["initial"])
|
||||
|
||||
def test_no_agent_bottle_empty_initial(self):
|
||||
manifest = _make_manifest(["researcher"], ["claude", "dev"], agent_bottle="")
|
||||
with patch(
|
||||
"bot_bottle.cli.start.ManifestIndex.resolve", return_value=manifest
|
||||
):
|
||||
start_mod.cmd_start(["researcher"])
|
||||
call_kwargs = self._bottle_picker_mock.call_args
|
||||
self.assertEqual([], call_kwargs[1]["initial"])
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Backend wiring
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def test_explicit_backend_forwarded(self):
|
||||
start_mod.cmd_start(["--backend=docker", "researcher"])
|
||||
_, kwargs = self._launch_mock.call_args
|
||||
self.assertEqual("docker", kwargs["backend_name"])
|
||||
|
||||
def test_absent_backend_uses_default(self):
|
||||
start_mod.cmd_start(["researcher"])
|
||||
_, kwargs = self._launch_mock.call_args
|
||||
self.assertIsNone(kwargs["backend_name"])
|
||||
|
||||
@@ -110,28 +176,21 @@ class TestCmdStartSelector(unittest.TestCase):
|
||||
finally:
|
||||
os.environ.pop("BOT_BOTTLE_BACKEND", None)
|
||||
self.assertEqual(0, rc)
|
||||
self._tui_mock.assert_not_called()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Both absent → only agent picker
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def test_both_absent_shows_only_agent_picker(self):
|
||||
self._tui_mock.return_value = "researcher"
|
||||
def test_both_absent_shows_agent_picker_then_bottle_picker(self):
|
||||
self._agent_picker_mock.return_value = "researcher"
|
||||
rc = start_mod.cmd_start([])
|
||||
self.assertEqual(0, rc)
|
||||
self._tui_mock.assert_called_once()
|
||||
title = self._tui_mock.call_args[1]["title"].lower()
|
||||
self.assertIn("agent", title)
|
||||
self._agent_picker_mock.assert_called_once()
|
||||
self._bottle_picker_mock.assert_called_once()
|
||||
self._launch_mock.assert_called_once()
|
||||
_, kwargs = self._launch_mock.call_args
|
||||
self.assertIsNone(kwargs["backend_name"])
|
||||
|
||||
def test_both_absent_agent_cancel_skips_backend_picker(self):
|
||||
self._tui_mock.side_effect = [None]
|
||||
def test_both_absent_agent_cancel_skips_bottle_and_launch(self):
|
||||
self._agent_picker_mock.return_value = None
|
||||
rc = start_mod.cmd_start([])
|
||||
self.assertEqual(0, rc)
|
||||
self.assertEqual(1, self._tui_mock.call_count)
|
||||
self._agent_picker_mock.assert_called_once()
|
||||
self._bottle_picker_mock.assert_not_called()
|
||||
self._launch_mock.assert_not_called()
|
||||
|
||||
|
||||
@@ -149,11 +208,13 @@ class TestCmdStartLabelCollision(unittest.TestCase):
|
||||
"""cmd_start re-prompts when the label's slug is already running."""
|
||||
|
||||
def setUp(self):
|
||||
self._manifest = _make_manifest(["researcher"])
|
||||
self._manifest = _make_manifest(["researcher"], ["claude"])
|
||||
patch("bot_bottle.cli.start.ManifestIndex.resolve", return_value=self._manifest).start()
|
||||
self._launch_mock = patch(
|
||||
"bot_bottle.cli.start._launch_bottle", return_value=0,
|
||||
).start()
|
||||
# Stub the bottle picker to always return a selection.
|
||||
patch.object(tui_mod, "filter_multiselect", return_value=["claude"]).start()
|
||||
self.addCleanup(patch.stopall)
|
||||
|
||||
def test_no_collision_proceeds_without_reprompt(self):
|
||||
@@ -193,5 +254,107 @@ class TestCmdStartLabelCollision(unittest.TestCase):
|
||||
self.assertIn("already in use", second_call_kwargs.get("disclaimer", ""))
|
||||
|
||||
|
||||
class TestBottleLineage(unittest.TestCase):
|
||||
"""Unit tests for _bottle_lineage."""
|
||||
|
||||
def test_returns_empty_in_eager_mode(self):
|
||||
manifest = _make_manifest(["agent"], ["base", "dev"])
|
||||
# home_md is None in eager mode → no file reads, returns {}
|
||||
result = start_mod._bottle_lineage(manifest)
|
||||
self.assertEqual({}, result)
|
||||
|
||||
def test_reads_extends_chain_from_files(self):
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
bottles_dir = Path(tmp) / "bottles"
|
||||
bottles_dir.mkdir()
|
||||
(bottles_dir / "base.md").write_text("---\n{}\n---\n")
|
||||
(bottles_dir / "mid.md").write_text("---\nextends: base\n---\n")
|
||||
(bottles_dir / "leaf.md").write_text("---\nextends: mid\n---\n")
|
||||
|
||||
manifest = MagicMock()
|
||||
manifest.home_md = Path(tmp)
|
||||
|
||||
result = start_mod._bottle_lineage(manifest)
|
||||
|
||||
self.assertNotIn("base", result) # no parent → not in map
|
||||
self.assertEqual("base <- mid", result["mid"])
|
||||
self.assertEqual("base <- mid <- leaf", result["leaf"])
|
||||
|
||||
def test_cycle_protection(self):
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
bottles_dir = Path(tmp) / "bottles"
|
||||
bottles_dir.mkdir()
|
||||
(bottles_dir / "a.md").write_text("---\nextends: b\n---\n")
|
||||
(bottles_dir / "b.md").write_text("---\nextends: a\n---\n")
|
||||
|
||||
manifest = MagicMock()
|
||||
manifest.home_md = Path(tmp)
|
||||
|
||||
result = start_mod._bottle_lineage(manifest)
|
||||
|
||||
# Cycle must not hang; each should get a two-element chain.
|
||||
for name in ("a", "b"):
|
||||
self.assertIn(name, result)
|
||||
self.assertIn("<-", result[name])
|
||||
|
||||
|
||||
class TestManifestToYaml(unittest.TestCase):
|
||||
"""Unit tests for _manifest_to_yaml."""
|
||||
|
||||
def _make_manifest_obj(
|
||||
self,
|
||||
*,
|
||||
skills: Sequence[str] = (),
|
||||
env: Mapping[str, str] | None = None,
|
||||
supervise: bool = True,
|
||||
agent_provider_template: str = "claude",
|
||||
):
|
||||
from bot_bottle.manifest import Manifest, ManifestBottle
|
||||
from bot_bottle.manifest_agent import ManifestAgent, ManifestAgentProvider
|
||||
|
||||
agent = ManifestAgent(skills=tuple(skills))
|
||||
bottle = ManifestBottle(
|
||||
env=env or {},
|
||||
supervise=supervise,
|
||||
agent_provider=ManifestAgentProvider(template=agent_provider_template),
|
||||
)
|
||||
return Manifest(agent=agent, bottle=bottle)
|
||||
|
||||
def test_includes_agent_section(self):
|
||||
m = self._make_manifest_obj(skills=["researcher"])
|
||||
yaml = start_mod._manifest_to_yaml(m)
|
||||
self.assertIn("agent:", yaml)
|
||||
self.assertIn("- researcher", yaml)
|
||||
|
||||
def test_includes_bottle_section(self):
|
||||
m = self._make_manifest_obj(env={"FOO": "bar"})
|
||||
yaml = start_mod._manifest_to_yaml(m)
|
||||
self.assertIn("bottle:", yaml)
|
||||
self.assertIn("FOO: bar", yaml)
|
||||
|
||||
def test_supervise_rendered(self):
|
||||
m_true = self._make_manifest_obj(supervise=True)
|
||||
m_false = self._make_manifest_obj(supervise=False)
|
||||
self.assertIn("supervise: true", start_mod._manifest_to_yaml(m_true))
|
||||
self.assertIn("supervise: false", start_mod._manifest_to_yaml(m_false))
|
||||
|
||||
def test_non_claude_provider_shown(self):
|
||||
m = self._make_manifest_obj(agent_provider_template="codex")
|
||||
yaml = start_mod._manifest_to_yaml(m)
|
||||
self.assertIn("agent_provider:", yaml)
|
||||
self.assertIn("template: codex", yaml)
|
||||
|
||||
def test_default_claude_provider_omitted(self):
|
||||
m = self._make_manifest_obj(agent_provider_template="claude")
|
||||
yaml = start_mod._manifest_to_yaml(m)
|
||||
self.assertNotIn("agent_provider:", yaml)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
+128
-2
@@ -1,4 +1,4 @@
|
||||
"""Unit tests for bot_bottle.cli.tui — filter_select internals.
|
||||
"""Unit tests for bot_bottle.cli.tui — filter_select and filter_multiselect.
|
||||
|
||||
We test the pure-Python logic (_filter_items, cursor movement, confirm,
|
||||
cancel) by exercising the internal helpers directly, without spinning up
|
||||
@@ -8,8 +8,15 @@ a real curses session (which requires a TTY).
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
from typing import Any, Optional
|
||||
|
||||
from bot_bottle.cli.tui import _filter_items, filter_select
|
||||
from bot_bottle.cli.tui import _filter_items, _multiselect_loop, filter_multiselect, filter_select
|
||||
|
||||
_KEY_SPACE = 32
|
||||
_KEY_ENTER = 10
|
||||
|
||||
_KEY_ESC = 27
|
||||
_KEY_CTRL_D = 4
|
||||
|
||||
|
||||
class TestFilterItems(unittest.TestCase):
|
||||
@@ -46,5 +53,124 @@ class TestFilterSelectEmptyItems(unittest.TestCase):
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestFilterMultiselectEmptyItems(unittest.TestCase):
|
||||
def test_returns_empty_list_for_empty_items(self):
|
||||
# No TTY needed — short-circuits before opening tty.
|
||||
result = filter_multiselect([], title="Select", tty_path="/dev/null")
|
||||
self.assertEqual([], result)
|
||||
|
||||
def test_returns_none_when_tty_unavailable(self):
|
||||
result = filter_multiselect(["a", "b"], tty_path="/nonexistent/tty")
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestMultiselectLoopReordering(unittest.TestCase):
|
||||
"""Exercise _multiselect_loop key handling without a real curses terminal.
|
||||
|
||||
We drive the loop via a fake screen that feeds a pre-recorded key sequence
|
||||
and records what was drawn — we only need the return value, so the fake
|
||||
screen's getch() raises StopIteration after the key list is exhausted, and
|
||||
the loop is expected to return before that via Ctrl-D.
|
||||
"""
|
||||
|
||||
def _run(self, keys: list[int], items: list[str], initial: list[str]) -> Optional[list[str]]:
|
||||
"""Run _multiselect_loop with a synthetic screen feeding `keys`."""
|
||||
key_iter = iter(keys)
|
||||
|
||||
class FakeScreen:
|
||||
def erase(self) -> None: pass
|
||||
def getmaxyx(self) -> tuple[int, int]: return (40, 80)
|
||||
def refresh(self) -> None: pass
|
||||
def getch(self) -> int: return next(key_iter)
|
||||
def addstr(self, *a: Any) -> None: pass
|
||||
def keypad(self, *a: Any) -> None: pass
|
||||
|
||||
return _multiselect_loop(FakeScreen(), items, title="", initial=initial) # type: ignore[arg-type]
|
||||
|
||||
def test_ctrl_d_confirms_initial_selection(self):
|
||||
result = self._run([_KEY_CTRL_D], ["a", "b", "c"], ["a", "b"])
|
||||
self.assertEqual(["a", "b"], result)
|
||||
|
||||
def test_esc_cancels(self):
|
||||
result = self._run([_KEY_ESC], ["a", "b"], ["a"])
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_tab_then_K_moves_item_up(self):
|
||||
# Start: selected = ["a", "b", "c"]
|
||||
# Tab → order mode (order_cursor=0 on "a")
|
||||
# ↓ → order_cursor=1 (on "b")
|
||||
# K → swap b and a → ["b", "a", "c"], order_cursor=0
|
||||
# Ctrl-D → confirm
|
||||
DOWN = ord("j")
|
||||
result = self._run(
|
||||
[ord("\t"), DOWN, ord("K"), _KEY_CTRL_D],
|
||||
["a", "b", "c"],
|
||||
["a", "b", "c"],
|
||||
)
|
||||
self.assertEqual(["b", "a", "c"], result)
|
||||
|
||||
def test_tab_then_J_moves_item_down(self):
|
||||
# selected = ["a", "b", "c"], focus order, cursor=0
|
||||
# J → swap a and b → ["b", "a", "c"], cursor=1
|
||||
# Ctrl-D → confirm
|
||||
result = self._run(
|
||||
[ord("\t"), ord("J"), _KEY_CTRL_D],
|
||||
["a", "b", "c"],
|
||||
["a", "b", "c"],
|
||||
)
|
||||
self.assertEqual(["b", "a", "c"], result)
|
||||
|
||||
def test_K_at_top_is_no_op(self):
|
||||
# cursor already at 0, K should not change order
|
||||
result = self._run(
|
||||
[ord("\t"), ord("K"), _KEY_CTRL_D],
|
||||
["a", "b"],
|
||||
["a", "b"],
|
||||
)
|
||||
self.assertEqual(["a", "b"], result)
|
||||
|
||||
def test_J_at_bottom_is_no_op(self):
|
||||
DOWN = ord("j")
|
||||
result = self._run(
|
||||
[ord("\t"), DOWN, ord("J"), _KEY_CTRL_D],
|
||||
["a", "b"],
|
||||
["a", "b"],
|
||||
)
|
||||
self.assertEqual(["a", "b"], result)
|
||||
|
||||
def test_tab_back_to_filter_then_confirm(self):
|
||||
# Tab → order, Tab → filter, Ctrl-D confirms unchanged
|
||||
result = self._run(
|
||||
[ord("\t"), ord("\t"), _KEY_CTRL_D],
|
||||
["a", "b"],
|
||||
["a", "b"],
|
||||
)
|
||||
self.assertEqual(["a", "b"], result)
|
||||
|
||||
def test_space_toggles_item_on(self):
|
||||
# Space on an unselected item selects it; Ctrl-D confirms.
|
||||
result = self._run([_KEY_SPACE, _KEY_CTRL_D], ["a", "b"], [])
|
||||
self.assertEqual(["a"], result)
|
||||
|
||||
def test_space_toggles_item_off(self):
|
||||
# Space on a selected item deselects it; Ctrl-D confirms empty.
|
||||
result = self._run([_KEY_SPACE, _KEY_CTRL_D], ["a", "b"], ["a"])
|
||||
self.assertEqual([], result)
|
||||
|
||||
def test_enter_confirms_without_toggle(self):
|
||||
# Enter immediately confirms the current selection without toggling.
|
||||
result = self._run([_KEY_ENTER], ["a", "b"], ["a"])
|
||||
self.assertEqual(["a"], result)
|
||||
|
||||
def test_enter_confirms_empty_selection(self):
|
||||
result = self._run([_KEY_ENTER], ["a", "b"], [])
|
||||
self.assertEqual([], result)
|
||||
|
||||
def test_space_then_enter_confirms(self):
|
||||
# Space selects "a", Enter confirms.
|
||||
result = self._run([_KEY_SPACE, _KEY_ENTER], ["a", "b"], [])
|
||||
self.assertEqual(["a"], result)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -10,6 +10,8 @@ from unittest.mock import MagicMock, patch
|
||||
|
||||
from bot_bottle.contrib.gitea.deploy_key_provisioner import (
|
||||
GiteaDeployKeyProvisioner,
|
||||
_API_TIMEOUT_SECS,
|
||||
_KEYGEN_TIMEOUT_SECS,
|
||||
_split_owner_repo,
|
||||
)
|
||||
from bot_bottle.deploy_key_provisioner import DeployKeyCollisionError
|
||||
@@ -83,6 +85,25 @@ class TestCreate(unittest.TestCase):
|
||||
self.assertEqual(str(fake_key_id), key_id)
|
||||
self.assertEqual(fake_private, private_bytes)
|
||||
|
||||
def test_create_passes_timeout_to_ssh_keygen_and_urlopen(self):
|
||||
provisioner = _provisioner()
|
||||
with patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.subprocess.run"
|
||||
) as mock_run, patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen"
|
||||
) as mock_urlopen, patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_bytes",
|
||||
return_value=b"PRIVATE",
|
||||
), patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.Path.read_text",
|
||||
return_value="ssh-ed25519 AAAA\n",
|
||||
):
|
||||
mock_urlopen.return_value = _urlopen_response({"id": 1})
|
||||
provisioner.create("owner/repo", "title")
|
||||
|
||||
self.assertEqual(_KEYGEN_TIMEOUT_SECS, mock_run.call_args.kwargs.get("timeout"))
|
||||
self.assertEqual(_API_TIMEOUT_SECS, mock_urlopen.call_args.kwargs.get("timeout"))
|
||||
|
||||
def test_create_raises_on_http_error(self):
|
||||
provisioner = _provisioner()
|
||||
with patch(
|
||||
@@ -139,6 +160,16 @@ class TestDelete(unittest.TestCase):
|
||||
self.assertIn("/api/v1/repos/didericis/bot-bottle/keys/99", req.full_url)
|
||||
self.assertEqual("DELETE", req.get_method())
|
||||
|
||||
def test_delete_passes_timeout_to_urlopen(self):
|
||||
provisioner = _provisioner()
|
||||
with patch(
|
||||
"bot_bottle.contrib.gitea.deploy_key_provisioner.urllib.request.urlopen"
|
||||
) as mock_urlopen:
|
||||
mock_urlopen.return_value = _urlopen_response({})
|
||||
provisioner.delete("owner/repo", "7")
|
||||
|
||||
self.assertEqual(_API_TIMEOUT_SECS, mock_urlopen.call_args.kwargs.get("timeout"))
|
||||
|
||||
def test_delete_tolerates_404(self):
|
||||
provisioner = _provisioner()
|
||||
with patch(
|
||||
|
||||
@@ -10,6 +10,7 @@ from bot_bottle.egress import (
|
||||
Egress,
|
||||
EgressPlan,
|
||||
EgressRoute,
|
||||
_yaml_str_escape,
|
||||
egress_agent_env_entries,
|
||||
egress_manifest_routes,
|
||||
egress_render_routes,
|
||||
@@ -419,6 +420,76 @@ class TestRenderRoutes(unittest.TestCase):
|
||||
self.assertEqual(LOG_BLOCKS, cfg.log)
|
||||
|
||||
|
||||
class TestYamlStrEscape(unittest.TestCase):
|
||||
"""_yaml_str_escape produces safe YAML double-quoted scalar content."""
|
||||
|
||||
def test_plain_string_unchanged(self):
|
||||
self.assertEqual("api.example.com", _yaml_str_escape("api.example.com"))
|
||||
|
||||
def test_double_quote_escaped(self):
|
||||
self.assertEqual('\\"', _yaml_str_escape('"'))
|
||||
|
||||
def test_backslash_escaped(self):
|
||||
self.assertEqual("\\\\", _yaml_str_escape("\\"))
|
||||
|
||||
def test_newline_escaped(self):
|
||||
self.assertEqual("\\n", _yaml_str_escape("\n"))
|
||||
|
||||
def test_carriage_return_escaped(self):
|
||||
self.assertEqual("\\r", _yaml_str_escape("\r"))
|
||||
|
||||
def test_tab_escaped(self):
|
||||
self.assertEqual("\\t", _yaml_str_escape("\t"))
|
||||
|
||||
def test_combined(self):
|
||||
self.assertEqual('\\"\\n\\\\', _yaml_str_escape('"\n\\'))
|
||||
|
||||
|
||||
class TestRenderRoutesEscaping(unittest.TestCase):
|
||||
"""Stray quotes/newlines in manifest strings do not corrupt routes.yaml."""
|
||||
|
||||
@staticmethod
|
||||
def _parsed(routes) -> list[dict]: # type: ignore
|
||||
return parse_yaml_subset(egress_render_routes(routes))["routes"] # type: ignore
|
||||
|
||||
def test_host_with_double_quote_round_trips(self):
|
||||
routes = (EgressRoute(host='bad"host.example'),)
|
||||
parsed = self._parsed(routes)
|
||||
self.assertEqual('bad"host.example', parsed[0]["host"])
|
||||
|
||||
def test_host_with_newline_round_trips(self):
|
||||
routes = (EgressRoute(host="host\nextra.example"),)
|
||||
parsed = self._parsed(routes)
|
||||
self.assertEqual("host\nextra.example", parsed[0]["host"])
|
||||
|
||||
def test_auth_scheme_with_double_quote_round_trips(self):
|
||||
routes = (EgressRoute(
|
||||
host="api.example",
|
||||
auth_scheme='Bear"er',
|
||||
token_env="EGRESS_TOKEN_0",
|
||||
),)
|
||||
parsed = self._parsed(routes)
|
||||
self.assertEqual('Bear"er', parsed[0]["auth_scheme"])
|
||||
|
||||
def test_path_value_with_double_quote_round_trips(self):
|
||||
from bot_bottle.egress_addon_core import PathMatch, MatchEntry
|
||||
routes = (EgressRoute(
|
||||
host="api.example",
|
||||
matches=(MatchEntry(paths=(PathMatch(type="prefix", value='/v1/"quoted"/'),)),),
|
||||
),)
|
||||
parsed = self._parsed(routes)
|
||||
self.assertEqual('/v1/"quoted"/', parsed[0]["matches"][0]["paths"][0]["value"])
|
||||
|
||||
def test_header_value_with_double_quote_round_trips(self):
|
||||
from bot_bottle.egress_addon_core import HeaderMatch, MatchEntry
|
||||
routes = (EgressRoute(
|
||||
host="api.example",
|
||||
matches=(MatchEntry(headers=(HeaderMatch(name="x-h", value='val"ue'),)),),
|
||||
),)
|
||||
parsed = self._parsed(routes)
|
||||
self.assertEqual('val"ue', parsed[0]["matches"][0]["headers"][0]["value"])
|
||||
|
||||
|
||||
class TestResolveTokenValues(unittest.TestCase):
|
||||
def test_reads_host_env(self):
|
||||
out = egress_resolve_token_values(
|
||||
|
||||
@@ -9,6 +9,7 @@ import urllib.request
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
from bot_bottle.git_gate import GIT_GATE_TIMEOUT_SECS
|
||||
from bot_bottle.git_http_backend import GitHttpHandler, MAX_BODY_BYTES
|
||||
|
||||
|
||||
@@ -150,6 +151,61 @@ class TestGitHttpBackend(unittest.TestCase):
|
||||
)
|
||||
self.assertEqual("git/test", env["HTTP_USER_AGENT"])
|
||||
|
||||
def test_subprocess_calls_include_timeout(self):
|
||||
"""Both subprocess.run calls (access-hook and git http-backend) must
|
||||
pass timeout= so a hung upstream cannot wedge the sidecar."""
|
||||
from http.server import ThreadingHTTPServer
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
root = Path(tmp)
|
||||
(root / "repo.git").mkdir()
|
||||
|
||||
old_root = os.environ.get("GIT_PROJECT_ROOT")
|
||||
os.environ["GIT_PROJECT_ROOT"] = str(root)
|
||||
self.addCleanup(self._restore_env, old_root)
|
||||
old_hook = os.environ.get("GIT_GATE_ACCESS_HOOK")
|
||||
hook = root / "access-hook"
|
||||
hook.write_text("#!/bin/sh\nexit 0\n")
|
||||
hook.chmod(0o700)
|
||||
os.environ["GIT_GATE_ACCESS_HOOK"] = str(hook)
|
||||
self.addCleanup(self._restore_hook, old_hook)
|
||||
|
||||
server = ThreadingHTTPServer(("127.0.0.1", 0), GitHttpHandler)
|
||||
thread = threading.Thread(target=server.serve_forever, daemon=True)
|
||||
thread.start()
|
||||
self.addCleanup(server.shutdown)
|
||||
self.addCleanup(server.server_close)
|
||||
|
||||
backend_response = (
|
||||
b"Status: 200 OK\r\n"
|
||||
b"Content-Type: application/x-git-upload-pack-result\r\n"
|
||||
b"\r\n"
|
||||
b"0000"
|
||||
)
|
||||
calls = [
|
||||
subprocess.CompletedProcess(["hook"], 0, b"", b""),
|
||||
subprocess.CompletedProcess(["git"], 0, backend_response, b""),
|
||||
]
|
||||
with mock.patch(
|
||||
"bot_bottle.git_http_backend.subprocess.run",
|
||||
side_effect=calls,
|
||||
) as run:
|
||||
req = urllib.request.Request(
|
||||
f"http://127.0.0.1:{server.server_port}"
|
||||
"/repo.git/git-upload-pack",
|
||||
data=b"",
|
||||
method="POST",
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=5):
|
||||
pass
|
||||
|
||||
for call in run.call_args_list:
|
||||
self.assertEqual(
|
||||
GIT_GATE_TIMEOUT_SECS,
|
||||
call.kwargs.get("timeout"),
|
||||
f"subprocess.run call missing timeout: {call}",
|
||||
)
|
||||
|
||||
def test_access_hook_denial_is_logged_to_stdout(self):
|
||||
"""When the access-hook exits non-zero we still return 403 to the
|
||||
client, but the hook's stderr must also appear on the handler's
|
||||
@@ -256,6 +312,57 @@ class TestGitHttpBackend(unittest.TestCase):
|
||||
os.environ["GIT_GATE_ACCESS_HOOK"] = value
|
||||
|
||||
|
||||
class TestMalformedStatusHeader(unittest.TestCase):
|
||||
"""Malformed CGI Status: headers must not propagate as unhandled exceptions;
|
||||
the handler should fall back to HTTP 500."""
|
||||
|
||||
def setUp(self):
|
||||
from http.server import ThreadingHTTPServer
|
||||
import tempfile
|
||||
self._tmp = tempfile.mkdtemp()
|
||||
os.environ["GIT_PROJECT_ROOT"] = self._tmp
|
||||
self._server = ThreadingHTTPServer(("127.0.0.1", 0), GitHttpHandler)
|
||||
self._thread = threading.Thread(
|
||||
target=self._server.serve_forever, daemon=True,
|
||||
)
|
||||
self._thread.start()
|
||||
self._port = self._server.server_port
|
||||
|
||||
def tearDown(self):
|
||||
self._server.shutdown()
|
||||
self._server.server_close()
|
||||
os.environ.pop("GIT_PROJECT_ROOT", None)
|
||||
import shutil
|
||||
shutil.rmtree(self._tmp, ignore_errors=True)
|
||||
|
||||
def _get_with_backend_response(self, cgi_response: bytes) -> int:
|
||||
with mock.patch(
|
||||
"bot_bottle.git_http_backend.subprocess.run",
|
||||
return_value=mock.Mock(returncode=0, stdout=cgi_response),
|
||||
):
|
||||
req = urllib.request.Request(
|
||||
f"http://127.0.0.1:{self._port}/repo.git/info/refs",
|
||||
method="GET",
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=3) as resp:
|
||||
return resp.status
|
||||
except urllib.error.HTTPError as e: # type: ignore
|
||||
return e.code
|
||||
|
||||
def test_empty_status_value_returns_500(self):
|
||||
status = self._get_with_backend_response(
|
||||
b"Status: \r\nContent-Type: text/plain\r\n\r\n"
|
||||
)
|
||||
self.assertEqual(500, status)
|
||||
|
||||
def test_non_numeric_status_returns_500(self):
|
||||
status = self._get_with_backend_response(
|
||||
b"Status: bad\r\nContent-Type: text/plain\r\n\r\n"
|
||||
)
|
||||
self.assertEqual(500, status)
|
||||
|
||||
|
||||
class TestContentLengthBounds(unittest.TestCase):
|
||||
"""PRD 0041: malformed or oversized Content-Length is rejected before
|
||||
git http-backend is invoked."""
|
||||
|
||||
@@ -73,6 +73,33 @@ resolver #2
|
||||
)
|
||||
self.assertTrue(run.call_args_list[-1].kwargs["check"])
|
||||
|
||||
def test_build_image_anchors_relative_dockerfile_to_context(self):
|
||||
status = util.subprocess.CompletedProcess(
|
||||
args=[],
|
||||
returncode=0,
|
||||
stdout=(
|
||||
'[{"status":{"state":"running"},'
|
||||
'"configuration":{"dns":{"nameservers":["9.9.9.9"]}}}]'
|
||||
),
|
||||
stderr="",
|
||||
)
|
||||
with patch.object(util.subprocess, "run", return_value=status) as run, \
|
||||
patch.object(util.os, "environ", {
|
||||
"BOT_BOTTLE_MACOS_CONTAINER_DNS": "9.9.9.9",
|
||||
}):
|
||||
util.build_image(
|
||||
"bot-bottle-sidecars:latest",
|
||||
"/repo",
|
||||
dockerfile="Dockerfile.sidecars",
|
||||
)
|
||||
self.assertEqual(
|
||||
[
|
||||
"container", "build", "-t", "bot-bottle-sidecars:latest",
|
||||
"--dns", "9.9.9.9", "-f", "/repo/Dockerfile.sidecars", "/repo",
|
||||
],
|
||||
run.call_args_list[-1].args[0],
|
||||
)
|
||||
|
||||
def test_commit_container_execs_tar_and_builds_image(self):
|
||||
# stderr is bytes because subprocess.run uses stderr=PIPE without text=True
|
||||
completed = util.subprocess.CompletedProcess(
|
||||
|
||||
@@ -0,0 +1,200 @@
|
||||
"""Unit: runtime bottle composition (issue #269).
|
||||
|
||||
Tests for merge_bottles_runtime and ManifestIndex.load_for_agent with
|
||||
the new bottle_names parameter.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import textwrap
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
from bot_bottle.manifest import ManifestBottle, ManifestError, ManifestIndex
|
||||
from bot_bottle.manifest_extends import merge_bottles_runtime
|
||||
|
||||
|
||||
def _index(bottles: dict[str, object], agents: dict[str, object]) -> ManifestIndex:
|
||||
return ManifestIndex.from_json_obj({"bottles": bottles, "agents": agents})
|
||||
|
||||
|
||||
def _bottle(**kwargs: object) -> ManifestBottle:
|
||||
return ManifestBottle.from_dict("test", kwargs)
|
||||
|
||||
|
||||
class TestMergeBottlesRuntime(unittest.TestCase):
|
||||
def test_single_bottle_returns_as_is(self):
|
||||
b = _bottle(env={"FOO": "1"})
|
||||
result = merge_bottles_runtime([b])
|
||||
self.assertEqual({"FOO": "1"}, dict(result.env))
|
||||
|
||||
def test_env_later_wins(self):
|
||||
base = _bottle(env={"FOO": "base", "ONLY_BASE": "x"})
|
||||
override = _bottle(env={"FOO": "override", "ONLY_OVERRIDE": "y"})
|
||||
result = merge_bottles_runtime([base, override])
|
||||
self.assertEqual("override", result.env["FOO"])
|
||||
self.assertEqual("x", result.env["ONLY_BASE"])
|
||||
self.assertEqual("y", result.env["ONLY_OVERRIDE"])
|
||||
|
||||
def test_egress_routes_concatenated(self):
|
||||
from bot_bottle.manifest_egress import ManifestEgressConfig, ManifestEgressRoute
|
||||
r1 = ManifestEgressRoute(Host="api.a.com")
|
||||
r2 = ManifestEgressRoute(Host="api.b.com")
|
||||
base = ManifestBottle(egress=ManifestEgressConfig(routes=(r1,)))
|
||||
override = ManifestBottle(egress=ManifestEgressConfig(routes=(r2,)))
|
||||
result = merge_bottles_runtime([base, override])
|
||||
hosts = [r.Host for r in result.egress.routes]
|
||||
self.assertIn("api.a.com", hosts)
|
||||
self.assertIn("api.b.com", hosts)
|
||||
|
||||
def test_supervise_later_wins(self):
|
||||
base = _bottle(supervise=True)
|
||||
override = _bottle(supervise=False)
|
||||
result = merge_bottles_runtime([base, override])
|
||||
self.assertFalse(result.supervise)
|
||||
|
||||
def test_three_bottles_merged_left_to_right(self):
|
||||
b1 = _bottle(env={"A": "1", "B": "1", "C": "1"})
|
||||
b2 = _bottle(env={"B": "2", "C": "2"})
|
||||
b3 = _bottle(env={"C": "3"})
|
||||
result = merge_bottles_runtime([b1, b2, b3])
|
||||
self.assertEqual("1", result.env["A"])
|
||||
self.assertEqual("2", result.env["B"])
|
||||
self.assertEqual("3", result.env["C"])
|
||||
|
||||
def test_empty_list_raises(self):
|
||||
with self.assertRaises(ValueError):
|
||||
merge_bottles_runtime([])
|
||||
|
||||
|
||||
class TestLoadForAgentWithBottleNames(unittest.TestCase):
|
||||
def test_bottle_names_override_agent_bottle(self):
|
||||
idx = _index(
|
||||
bottles={
|
||||
"base": {"env": {"X": "base"}},
|
||||
"override": {"env": {"X": "override"}},
|
||||
},
|
||||
agents={"impl": {"bottle": "base", "skills": [], "prompt": ""}},
|
||||
)
|
||||
m = idx.load_for_agent("impl", ("override",))
|
||||
self.assertEqual("override", m.bottle.env["X"])
|
||||
|
||||
def test_bottle_names_merged_in_order(self):
|
||||
idx = _index(
|
||||
bottles={
|
||||
"a": {"env": {"X": "a", "A": "only-a"}},
|
||||
"b": {"env": {"X": "b", "B": "only-b"}},
|
||||
},
|
||||
agents={"impl": {"bottle": "a", "skills": [], "prompt": ""}},
|
||||
)
|
||||
m = idx.load_for_agent("impl", ("a", "b"))
|
||||
self.assertEqual("b", m.bottle.env["X"])
|
||||
self.assertEqual("only-a", m.bottle.env["A"])
|
||||
self.assertEqual("only-b", m.bottle.env["B"])
|
||||
|
||||
def test_empty_bottle_names_uses_agent_bottle(self):
|
||||
idx = _index(
|
||||
bottles={"base": {"env": {"X": "base"}}},
|
||||
agents={"impl": {"bottle": "base", "skills": [], "prompt": ""}},
|
||||
)
|
||||
m = idx.load_for_agent("impl", ())
|
||||
self.assertEqual("base", m.bottle.env["X"])
|
||||
|
||||
def test_no_bottle_and_no_bottle_names_raises(self):
|
||||
idx = _index(
|
||||
bottles={"base": {}},
|
||||
agents={"impl": {"skills": [], "prompt": ""}},
|
||||
)
|
||||
with self.assertRaises(ManifestError) as ctx:
|
||||
idx.load_for_agent("impl", ())
|
||||
self.assertIn("no 'bottle' field", str(ctx.exception))
|
||||
|
||||
def test_unknown_bottle_name_raises(self):
|
||||
idx = _index(
|
||||
bottles={"base": {}},
|
||||
agents={"impl": {"bottle": "base", "skills": [], "prompt": ""}},
|
||||
)
|
||||
with self.assertRaises(ManifestError) as ctx:
|
||||
idx.load_for_agent("impl", ("nonexistent",))
|
||||
self.assertIn("nonexistent", str(ctx.exception))
|
||||
|
||||
def test_agent_without_bottle_works_with_bottle_names(self):
|
||||
idx = _index(
|
||||
bottles={"base": {"env": {"X": "base"}}},
|
||||
agents={"impl": {"skills": [], "prompt": ""}},
|
||||
)
|
||||
m = idx.load_for_agent("impl", ("base",))
|
||||
self.assertEqual("base", m.bottle.env["X"])
|
||||
|
||||
|
||||
class TestAllBottleNames(unittest.TestCase):
|
||||
def test_eager_mode_returns_bottle_names(self):
|
||||
idx = _index(
|
||||
bottles={"alpha": {}, "beta": {}, "gamma": {}},
|
||||
agents={"impl": {"bottle": "alpha", "skills": [], "prompt": ""}},
|
||||
)
|
||||
self.assertEqual(["alpha", "beta", "gamma"], idx.all_bottle_names)
|
||||
|
||||
def test_lazy_mode_scans_files(self):
|
||||
home = Path(tempfile.mkdtemp(prefix="cb-home-"))
|
||||
orig_home = os.environ.get("HOME")
|
||||
os.environ["HOME"] = str(home)
|
||||
try:
|
||||
bottles_dir = home / ".bot-bottle" / "bottles"
|
||||
agents_dir = home / ".bot-bottle" / "agents"
|
||||
bottles_dir.mkdir(parents=True)
|
||||
agents_dir.mkdir(parents=True)
|
||||
(bottles_dir / "claude.md").write_text("---\n---\n")
|
||||
(bottles_dir / "dev.md").write_text("---\n---\n")
|
||||
(agents_dir / "impl.md").write_text("---\nbottle: claude\n---\n")
|
||||
idx = ManifestIndex.resolve(str(home))
|
||||
self.assertEqual(["claude", "dev"], idx.all_bottle_names)
|
||||
finally:
|
||||
if orig_home is None:
|
||||
os.environ.pop("HOME", None)
|
||||
else:
|
||||
os.environ["HOME"] = orig_home
|
||||
shutil.rmtree(home, ignore_errors=True)
|
||||
|
||||
|
||||
class TestAgentOptionalBottleMd(unittest.TestCase):
|
||||
"""Agent file without bottle: works when bottle_names are provided at launch."""
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.home = Path(tempfile.mkdtemp(prefix="cb-home-"))
|
||||
self._orig_home = os.environ.get("HOME")
|
||||
os.environ["HOME"] = str(self.home)
|
||||
|
||||
def tearDown(self) -> None:
|
||||
if self._orig_home is None:
|
||||
os.environ.pop("HOME", None)
|
||||
else:
|
||||
os.environ["HOME"] = self._orig_home
|
||||
shutil.rmtree(self.home, ignore_errors=True)
|
||||
|
||||
def _write(self, rel: str, text: str) -> None:
|
||||
p = self.home / ".bot-bottle" / rel
|
||||
p.parent.mkdir(parents=True, exist_ok=True)
|
||||
p.write_text(textwrap.dedent(text).lstrip("\n"))
|
||||
|
||||
def test_agent_without_bottle_resolves_with_bottle_names(self):
|
||||
self._write("bottles/dev.md", "---\nenv:\n X: dev\n---\n")
|
||||
self._write("agents/impl.md", "---\n---\nimpl agent.\n")
|
||||
idx = ManifestIndex.resolve(str(self.home))
|
||||
m = idx.load_for_agent("impl", ("dev",))
|
||||
self.assertEqual("dev", m.bottle.env["X"])
|
||||
|
||||
def test_agent_without_bottle_fails_without_bottle_names(self):
|
||||
self._write("bottles/dev.md", "---\n---\n")
|
||||
self._write("agents/impl.md", "---\n---\nimpl agent.\n")
|
||||
idx = ManifestIndex.resolve(str(self.home))
|
||||
with self.assertRaises(ManifestError) as ctx:
|
||||
idx.load_for_agent("impl", ())
|
||||
self.assertIn("no 'bottle' field", str(ctx.exception))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -8,6 +8,7 @@ import unittest
|
||||
|
||||
from bot_bottle.git_gate import (
|
||||
GIT_GATE_HOSTNAME,
|
||||
_gitconfig_validate_value,
|
||||
git_gate_render_gitconfig,
|
||||
)
|
||||
from bot_bottle.manifest import ManifestIndex
|
||||
@@ -90,5 +91,42 @@ class TestGitGateGitconfigRender(unittest.TestCase):
|
||||
self.assertNotIn("gitea.dideric.is", out)
|
||||
|
||||
|
||||
class TestGitconfigValidateValue(unittest.TestCase):
|
||||
"""_gitconfig_validate_value rejects values that would inject gitconfig keys."""
|
||||
|
||||
def test_normal_url_passes(self):
|
||||
_gitconfig_validate_value("url", "ssh://git@github.com/owner/repo.git")
|
||||
|
||||
def test_newline_in_url_raises(self):
|
||||
with self.assertRaises(ValueError):
|
||||
_gitconfig_validate_value("url", "ssh://git@github.com/owner/\nrepo.git")
|
||||
|
||||
def test_carriage_return_in_url_raises(self):
|
||||
with self.assertRaises(ValueError):
|
||||
_gitconfig_validate_value("url", "ssh://git@github.com/\rrepo.git")
|
||||
|
||||
def test_error_message_names_field(self):
|
||||
with self.assertRaises(ValueError, msg="error should name the field") as ctx:
|
||||
_gitconfig_validate_value("repos['bad'].url", "ssh://host/\npath")
|
||||
self.assertIn("repos['bad'].url", str(ctx.exception))
|
||||
|
||||
|
||||
class TestGitconfigRenderRejectsNewlineInUpstream(unittest.TestCase):
|
||||
"""git_gate_render_gitconfig raises on Upstream values with newlines."""
|
||||
|
||||
def test_newline_in_upstream_raises(self):
|
||||
m = ManifestIndex.from_json_obj({
|
||||
"bottles": {"dev": {"git-gate": {"repos": {
|
||||
"evil": {
|
||||
"url": "ssh://git@github.com/owner/\nfake-key = injected\nrepo.git",
|
||||
"key": {"provider": "static", "path": "/dev/null"},
|
||||
},
|
||||
}}}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
})
|
||||
with self.assertRaises(ValueError):
|
||||
git_gate_render_gitconfig(m.bottles["dev"].git, GIT_GATE_HOSTNAME)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -20,6 +20,7 @@ import supervise as _sv # noqa: E402 # type: ignore
|
||||
|
||||
from bot_bottle import supervise_server # noqa: E402
|
||||
from bot_bottle.supervise_server import (
|
||||
ERR_INTERNAL,
|
||||
ERR_INVALID_PARAMS,
|
||||
ERR_INVALID_REQUEST,
|
||||
ERR_METHOD_NOT_FOUND,
|
||||
@@ -29,7 +30,9 @@ from bot_bottle.supervise_server import (
|
||||
PROPOSED_FILE_FIELD,
|
||||
ServerConfig,
|
||||
TOOL_DEFINITIONS,
|
||||
_RpcClientError,
|
||||
_RpcError,
|
||||
_RpcInternalError,
|
||||
_response_timeout_from_env,
|
||||
format_response_text,
|
||||
handle_initialize,
|
||||
@@ -77,6 +80,65 @@ class TestValidation(unittest.TestCase):
|
||||
self.assertIn("must not change egress logging", cm.exception.message)
|
||||
|
||||
|
||||
# --- Error taxonomy --------------------------------------------------------
|
||||
|
||||
|
||||
class TestRpcErrorTaxonomy(unittest.TestCase):
|
||||
def test_rpc_client_error_is_rpc_error(self):
|
||||
e = _RpcClientError(ERR_INVALID_PARAMS, "bad param")
|
||||
self.assertIsInstance(e, _RpcError)
|
||||
self.assertEqual(ERR_INVALID_PARAMS, e.code)
|
||||
self.assertEqual("bad param", e.message)
|
||||
|
||||
def test_rpc_internal_error_is_rpc_error(self):
|
||||
e = _RpcInternalError("disk full")
|
||||
self.assertIsInstance(e, _RpcError)
|
||||
self.assertEqual(ERR_INTERNAL, e.code)
|
||||
self.assertEqual("disk full", e.message)
|
||||
|
||||
def test_rpc_internal_error_preserves_cause(self):
|
||||
cause = OSError("no space left on device")
|
||||
try:
|
||||
raise _RpcInternalError("failed to write") from cause
|
||||
except _RpcInternalError as e:
|
||||
self.assertIs(cause, e.__cause__)
|
||||
|
||||
def test_parse_error_is_client_error(self):
|
||||
with self.assertRaises(_RpcClientError):
|
||||
parse_jsonrpc(b"{bad json")
|
||||
|
||||
def test_validation_error_is_client_error(self):
|
||||
with self.assertRaises(_RpcClientError):
|
||||
validate_proposed_file(_sv.TOOL_EGRESS_ALLOW, "routes: nope\n")
|
||||
|
||||
def test_unknown_tool_in_tools_call_is_client_error(self):
|
||||
config = ServerConfig(bottle_slug="dev", queue_dir=Path("/unused"))
|
||||
with self.assertRaises(_RpcClientError) as cm:
|
||||
handle_tools_call({"name": "no-such-tool", "arguments": {}}, config)
|
||||
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
|
||||
|
||||
|
||||
class TestRpcInternalErrorOnIoFailure(unittest.TestCase):
|
||||
def test_write_proposal_os_error_raises_internal(self):
|
||||
config = ServerConfig(
|
||||
bottle_slug="dev",
|
||||
queue_dir=Path("/dev/null/cannot-exist"),
|
||||
)
|
||||
with self.assertRaises(_RpcInternalError) as cm:
|
||||
handle_tools_call(
|
||||
{
|
||||
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||
"arguments": {
|
||||
"dockerfile": "FROM python:3.13\n",
|
||||
"justification": "x",
|
||||
},
|
||||
},
|
||||
config,
|
||||
)
|
||||
self.assertEqual(ERR_INTERNAL, cm.exception.code)
|
||||
self.assertIsNotNone(cm.exception.__cause__)
|
||||
|
||||
|
||||
# --- JSON-RPC parsing ------------------------------------------------------
|
||||
|
||||
|
||||
@@ -469,6 +531,26 @@ class TestHttpEndToEnd(unittest.TestCase):
|
||||
)
|
||||
self.assertEqual(ERR_METHOD_NOT_FOUND, result["error"]["code"]) # type: ignore[index]
|
||||
|
||||
def test_internal_error_returns_err_internal_over_http(self):
|
||||
with patch.object(
|
||||
supervise_server._sv, "write_proposal",
|
||||
side_effect=OSError("disk full"),
|
||||
):
|
||||
result = self._post_jsonrpc({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 99,
|
||||
"method": "tools/call",
|
||||
"params": {
|
||||
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||
"arguments": {
|
||||
"dockerfile": "FROM python:3.13\n",
|
||||
"justification": "x",
|
||||
},
|
||||
},
|
||||
})
|
||||
self.assertIn("error", result)
|
||||
self.assertEqual(ERR_INTERNAL, result["error"]["code"]) # type: ignore[index]
|
||||
|
||||
def test_health_endpoint(self):
|
||||
conn = http.client.HTTPConnection("127.0.0.1", self.port, timeout=5)
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user