fix(dashboard): surface launch/crash failures (#100) #102

Merged
didericis merged 3 commits from fix/dashboard-crash-logging into main 2026-05-29 00:27:22 -04:00
12 changed files with 400 additions and 207 deletions
+7 -1
View File
@@ -7,7 +7,8 @@ from __future__ import annotations
import sys
from ..log import Die, die
from ..log import Die, die, error
from ..manifest import ManifestError
from ._common import PROG
from . import list as _list_mod
from .cleanup import cmd_cleanup
@@ -63,6 +64,11 @@ def main(argv: list[str] | None = None) -> int:
die(f"unknown command: {command}")
try:
return handler(rest) or 0
except ManifestError as e:
# Manifest/config problems surface as a catchable exception;
# print the reason and exit non-zero (same UX die() used to give).
error(str(e))
return 1
except Die as e:
return e.code if isinstance(e.code, int) else 1
except KeyboardInterrupt:
+65 -4
View File
@@ -21,6 +21,7 @@ import subprocess
import sys
import tempfile
import time
import traceback
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
@@ -52,8 +53,8 @@ from ..backend.docker.pipelock_apply import (
parse_allowlist_content,
render_allowlist_content,
)
from ..log import info
from ..manifest import Manifest
from ..log import Die, error, info
from ..manifest import Manifest, ManifestError
from ..supervise import (
ACTION_OPERATOR_EDIT,
COMPONENT_FOR_TOOL,
@@ -1277,9 +1278,57 @@ def cmd_dashboard(argv: list[str]) -> int:
curses.wrapper(_main_loop)
except KeyboardInterrupt:
return 130
except Die as e:
# die() printed the reason to stderr, but that happened while
# curses owned the terminal — the text landed on the alternate
# screen and was wiped when the terminal was restored. Re-surface
# it now that we're back on the normal screen.
if e.message:
error(e.message)
else:
error("dashboard exited on a fatal error (no detail captured).")
return e.code if isinstance(e.code, int) else 1
except Exception as e:
# Any other crash inside the TUI. The traceback would otherwise
# vanish with the alternate screen, so persist it and tell the
# operator where to look.
log_path = _write_crash_log(e)
error(f"dashboard crashed: {type(e).__name__}: {e}")
error(f"full traceback written to {log_path}")
return 1
return 0
def _write_crash_log(exc: BaseException) -> Path:
"""Persist `exc`'s traceback to a stable file under ~/.bot-bottle/
and return its path.
The dashboard runs under curses, so a crash's stderr/traceback is
painted onto the alternate screen and lost when the terminal is
restored — this leaves the operator a durable record of *why* it
died. Best-effort: falls back to a tempfile if the home dir can't
be written."""
stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
body = "".join(
traceback.format_exception(type(exc), exc, exc.__traceback__)
)
entry = f"=== dashboard crash {stamp} ===\n{body}\n"
try:
log_dir = _supervise.bot_bottle_root() / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
path = log_dir / "dashboard-crash.log"
with path.open("a", encoding="utf-8") as fh:
fh.write(entry)
return path
except OSError:
fd, tmp = tempfile.mkstemp(
prefix="bot-bottle-dashboard-crash-", suffix=".log",
)
with os.fdopen(fd, "w", encoding="utf-8") as fh:
fh.write(entry)
return Path(tmp)
def _list_once() -> int:
pending = discover_pending()
if not pending:
@@ -1407,8 +1456,17 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
if manifest_cache[0] is None:
manifest_cache[0] = Manifest.resolve(USER_CWD, missing_ok=True)
return manifest_cache[0]
if not _get_manifest().bottles and not _get_manifest().agents:
status_line = "warning: no bot-bottle config/agents found; new-agent picker is empty"
# A malformed manifest must not take the whole dashboard down — the
# operator may just be watching running bottles. Degrade to a
# status-line warning instead. (Any non-config error propagates to
Outdated
Review

Is there a good reason to die when a manifest can't be read? Seems like that should be an exception.

Is there a good reason to die when a manifest can't be read? Seems like that should be an exception.
# cmd_dashboard's crash handler.)
try:
_loaded = _get_manifest()
except ManifestError as e:
status_line = f"config error: {e}"
else:
if not _loaded.bottles and not _loaded.agents:
status_line = "warning: no bot-bottle config/agents found; new-agent picker is empty"
# First-tick guard: a brand-new dashboard finds any
# pre-existing queue entries on its first poll; those
# shouldn't ring the bell as if they just arrived.
@@ -1494,6 +1552,9 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
# bottle running.
try:
manifest = _get_manifest()
except ManifestError as e:
status_line = f"config error: {e}"
continue
except Exception as e:
status_line = f"manifest load failed: {e}"
continue
+15 -3
View File
@@ -14,11 +14,23 @@ def warn(msg: str) -> None:
print(f"bot-bottle: warning: {msg}", file=sys.stderr)
def error(msg: str) -> None:
print(f"bot-bottle: error: {msg}", file=sys.stderr)
class Die(SystemExit):
"""Raised by die() so callers (and tests) can distinguish a deliberate
fatal exit from an unrelated SystemExit."""
fatal exit from an unrelated SystemExit.
Carries the human-facing message so a caller that suppressed stderr
— e.g. the curses dashboard, whose alternate screen is wiped when the
terminal is restored — can re-surface the reason after the fact."""
def __init__(self, code: int = 1, message: str = "") -> None:
super().__init__(code)
self.message = message
def die(msg: str) -> NoReturn:
print(f"bot-bottle: error: {msg}", file=sys.stderr)
raise Die(1)
error(msg)
raise Die(1, msg)
+85 -91
View File
@@ -29,7 +29,7 @@ Agent schema (frontmatter):
name, description, model, color, memory
The agent file's Markdown body is the system prompt (stripped).
Unknown top-level frontmatter keys die with a hint.
Unknown top-level frontmatter keys raise ManifestError with a hint.
Bottles can ONLY live under $HOME. A bottles/ dir under $CWD is a
warn at load time and contributes nothing. The trust boundary is
@@ -44,7 +44,6 @@ on-disk files.
from __future__ import annotations
import ipaddress
import json
import os
import re
from dataclasses import dataclass, field, replace
@@ -52,10 +51,14 @@ from pathlib import Path
from typing import Mapping, cast
from .agent_provider import PROVIDER_TEMPLATES
from .log import die, warn
from .log import warn
from .yaml_subset import YamlSubsetError, parse_frontmatter
class ManifestError(Exception):
"""A manifest file (or the manifest tree) is invalid."""
def _empty_str_dict() -> dict[str, str]:
return {}
@@ -101,7 +104,7 @@ class GitEntry:
cls, bottle_name: str, host_key: str, raw: object
) -> "GitEntry":
if not host_key:
die(f"bottle '{bottle_name}' git.remotes has an empty host key")
raise ManifestError(f"bottle '{bottle_name}' git.remotes has an empty host key")
d = _as_json_object(raw, f"bottle '{bottle_name}' git.remotes[{host_key!r}]")
return cls._from_object(
bottle_name, d, f"git.remotes[{host_key!r}]", host_key,
@@ -117,19 +120,19 @@ class GitEntry:
) -> "GitEntry":
name = d.get("Name")
if not isinstance(name, str) or not name:
die(
raise ManifestError(
f"bottle '{bottle_name}' {label} missing required string "
f"field 'Name'"
)
upstream = d.get("Upstream")
if not isinstance(upstream, str) or not upstream:
die(
raise ManifestError(
f"bottle '{bottle_name}' {label} '{name}' missing required string field "
f"'Upstream'"
)
ident = d.get("IdentityFile")
if not isinstance(ident, str) or not ident:
die(
raise ManifestError(
f"bottle '{bottle_name}' {label} '{name}' missing required string field "
f"'IdentityFile'"
)
@@ -149,7 +152,7 @@ class GitEntry:
and host_key != host
and not _is_ip_literal(host)
):
die(
raise ManifestError(
f"bottle '{bottle_name}' git.remotes key {host_key!r} "
f"does not match Upstream host {host!r}"
)
@@ -225,24 +228,24 @@ class AgentProvider:
d = _as_json_object(raw, f"bottle '{bottle_name}' agent_provider")
for k in d:
if k not in {"template", "dockerfile"}:
die(
raise ManifestError(
f"bottle '{bottle_name}' agent_provider has unknown key {k!r}; "
f"allowed: template, dockerfile"
)
template = d.get("template", "claude")
if not isinstance(template, str) or not template:
die(
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.template must be a "
f"non-empty string"
)
if template not in PROVIDER_TEMPLATES:
die(
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.template {template!r} "
f"is not one of {', '.join(sorted(PROVIDER_TEMPLATES))}"
)
dockerfile = d.get("dockerfile", "")
if not isinstance(dockerfile, str):
die(
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.dockerfile must be a "
f"string (was {type(dockerfile).__name__})"
)
@@ -269,24 +272,24 @@ class GitUser:
d = _as_json_object(raw, f"bottle '{bottle_name}' git.user")
for k in d.keys():
if k not in {"name", "email"}:
die(
raise ManifestError(
f"bottle '{bottle_name}' git.user has unknown key {k!r}; "
f"allowed: name, email"
)
name = d.get("name", "")
email = d.get("email", "")
if not isinstance(name, str):
die(
raise ManifestError(
f"bottle '{bottle_name}' git.user.name must be a string "
f"(was {type(name).__name__})"
)
if not isinstance(email, str):
die(
raise ManifestError(
f"bottle '{bottle_name}' git.user.email must be a string "
f"(was {type(email).__name__})"
)
if not name and not email:
die(
raise ManifestError(
f"bottle '{bottle_name}' git.user is set but neither "
f"name nor email is non-empty; remove the block or "
f"fill at least one field."
@@ -304,7 +307,7 @@ def _parse_git_config(
d = _as_json_object(raw, f"bottle '{bottle_name}' git")
for k in d.keys():
if k not in {"user", "remotes"}:
die(
raise ManifestError(
f"bottle '{bottle_name}' git has unknown key {k!r}; "
f"allowed: user, remotes"
)
@@ -352,34 +355,34 @@ class PipelockRoutePolicy:
d = _as_json_object(raw, label)
for k in d:
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
die(
raise ManifestError(
f"{label} has unknown key {k!r}; "
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
f"are accepted"
)
tls_passthrough_raw = d.get("tls_passthrough", False)
if not isinstance(tls_passthrough_raw, bool):
die(
raise ManifestError(
f"{label}.tls_passthrough must be a boolean "
f"(was {type(tls_passthrough_raw).__name__})"
)
ssrf_raw = d.get("ssrf_ip_allowlist", [])
if not isinstance(ssrf_raw, list):
die(
raise ManifestError(
f"{label}.ssrf_ip_allowlist must be an array "
f"(was {type(ssrf_raw).__name__})"
)
ssrf_ip_allowlist: list[str] = []
for j, item in enumerate(ssrf_raw):
if not isinstance(item, str) or not item:
die(
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
f"string (was {type(item).__name__})"
)
try:
ipaddress.ip_network(item, strict=False)
except ValueError as e:
die(
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
f"or CIDR (was {item!r}): {e}"
)
@@ -434,13 +437,13 @@ class EgressRoute:
d = _as_json_object(raw, label)
host = d.get("host")
if not isinstance(host, str) or not host:
die(f"{label} missing required string field 'host'")
raise ManifestError(f"{label} missing required string field 'host'")
path_allow_raw = d.get("path_allowlist")
prefixes: tuple[str, ...] = ()
if path_allow_raw is not None:
if not isinstance(path_allow_raw, list):
die(
raise ManifestError(
f"{label} path_allowlist must be an array "
f"(was {type(path_allow_raw).__name__})"
)
@@ -448,12 +451,12 @@ class EgressRoute:
collected: list[str] = []
for j, p in enumerate(path_list):
if not isinstance(p, str):
die(
raise ManifestError(
f"{label} path_allowlist[{j}] must be a string "
f"(was {type(p).__name__})"
)
if not p.startswith("/"):
die(
raise ManifestError(
f"{label} path_allowlist[{j}] {p!r} must be an "
f"absolute path prefix starting with '/'"
)
@@ -466,31 +469,31 @@ class EgressRoute:
auth_raw = d.get("auth")
auth_d = _as_json_object(auth_raw, f"{label} auth")
if not auth_d:
die(
raise ManifestError(
f"{label} auth is empty ({{}}); omit the 'auth' key "
f"entirely if this route is unauthenticated. Otherwise "
f"both 'scheme' and 'token_ref' are required."
)
auth_scheme_raw = auth_d.get("scheme")
if not isinstance(auth_scheme_raw, str) or not auth_scheme_raw:
die(
raise ManifestError(
f"{label} auth.scheme is required when 'auth' is set "
f"(non-empty string)"
)
if auth_scheme_raw not in EGRESS_AUTH_SCHEMES:
die(
raise ManifestError(
f"{label} auth.scheme {auth_scheme_raw!r} is not one of "
f"{', '.join(EGRESS_AUTH_SCHEMES)}"
)
token_ref_raw = auth_d.get("token_ref")
if not isinstance(token_ref_raw, str) or not token_ref_raw:
die(
raise ManifestError(
f"{label} auth.token_ref is required when 'auth' is set "
f"(name of the host env var holding the token value)"
)
for k in auth_d:
if k not in ("scheme", "token_ref"):
die(
raise ManifestError(
f"{label} auth has unknown key {k!r}; "
f"only 'scheme' and 'token_ref' are accepted"
)
@@ -508,17 +511,17 @@ class EgressRoute:
collected_roles: list[str] = []
for r in role_list:
if not isinstance(r, str):
die(f"{label} role items must be strings (got {type(r).__name__})")
raise ManifestError(f"{label} role items must be strings (got {type(r).__name__})")
collected_roles.append(r)
roles = tuple(collected_roles)
else:
die(
raise ManifestError(
f"{label} role must be a string or a list of strings "
f"(was {type(role_raw).__name__})"
)
for r in roles:
if r not in EGRESS_ROLES:
die(
raise ManifestError(
f"{label} role {r!r} is not one of "
f"{', '.join(sorted(EGRESS_ROLES))}"
)
@@ -531,7 +534,7 @@ class EgressRoute:
for k in d:
if k not in ("host", "path_allowlist", "auth", "role", "pipelock"):
die(
raise ManifestError(
f"{label} has unknown key {k!r}; accepted keys are "
f"'host', 'path_allowlist', 'auth', 'role', 'pipelock'"
)
@@ -564,7 +567,7 @@ class EgressConfig:
routes: tuple[EgressRoute, ...] = ()
if routes_raw is not None:
if not isinstance(routes_raw, list):
die(
raise ManifestError(
f"bottle '{bottle_name}' egress.routes must be an array "
f"(was {type(routes_raw).__name__})"
)
@@ -578,7 +581,7 @@ class EgressConfig:
)
for k in d:
if k != "routes":
die(
raise ManifestError(
f"bottle '{bottle_name}' egress has unknown key {k!r}; "
f"only 'routes' is accepted"
)
@@ -611,7 +614,7 @@ class Bottle:
d = _as_json_object(raw, f"bottle '{name}'")
if "runtime" in d:
die(
raise ManifestError(
f"bottle '{name}' has a 'runtime' field, which is no longer "
f"supported. gVisor (runsc) is now auto-detected by the "
f"backend; remove the 'runtime' field from the bottle "
@@ -619,7 +622,7 @@ class Bottle:
)
if "ssh" in d:
die(
raise ManifestError(
f"bottle '{name}' has an 'ssh' field, which has been removed "
f"(PRD 0009). Move each entry to 'git': declare the upstream "
f"as a git remote with Name + Upstream URL + IdentityFile, "
@@ -628,7 +631,7 @@ class Bottle:
)
if "git_user" in d:
die(
raise ManifestError(
f"bottle '{name}' has a 'git_user' field, which has been "
f"removed. Move it under 'git.user'."
)
@@ -636,7 +639,7 @@ class Bottle:
unknown = set(d.keys()) - _BOTTLE_KEYS
if unknown:
allowed = ", ".join(sorted(_BOTTLE_KEYS))
die(
raise ManifestError(
f"bottle '{name}' has unknown key(s) {sorted(unknown)}; "
f"allowed keys are {allowed}."
)
@@ -647,7 +650,7 @@ class Bottle:
env_dict = _as_json_object(env_raw, f"bottle '{name}' env")
for var, value in env_dict.items():
if not isinstance(value, str):
die(
raise ManifestError(
f"env entry {var} in bottle '{name}' must be a JSON string "
f"(was {type(value).__name__}). Use \"?<message>\" for prompt-at-runtime."
)
@@ -676,7 +679,7 @@ class Bottle:
supervise_raw = d.get("supervise", False)
if not isinstance(supervise_raw, bool):
die(
raise ManifestError(
f"bottle '{name}' supervise must be a boolean "
f"(was {type(supervise_raw).__name__})"
)
@@ -704,10 +707,10 @@ class Agent:
bottle = d.get("bottle")
if not isinstance(bottle, str) or not bottle:
die(f"agent '{name}' must declare a 'bottle' field naming a defined bottle")
raise ManifestError(f"agent '{name}' must declare a 'bottle' field naming a defined bottle")
if bottle not in bottle_names:
available = ", ".join(sorted(bottle_names)) or "(none defined)"
die(
raise ManifestError(
f"agent '{name}' references bottle '{bottle}', which is not defined. "
f"Available: {available}"
)
@@ -716,12 +719,12 @@ class Agent:
skills_raw = d.get("skills")
if skills_raw is not None:
if not isinstance(skills_raw, list):
die(f"agent '{name}' skills must be an array (was {type(skills_raw).__name__})")
raise ManifestError(f"agent '{name}' skills must be an array (was {type(skills_raw).__name__})")
collected: list[str] = []
skills_list = cast(list[object], skills_raw)
for i, skill in enumerate(skills_list):
if not isinstance(skill, str):
die(
raise ManifestError(
f"agent '{name}' skills[{i}] must be a string "
f"(was {type(skill).__name__})"
)
@@ -734,7 +737,7 @@ class Agent:
elif isinstance(prompt_raw, str):
prompt = prompt_raw
else:
die(f"agent '{name}' prompt must be a string (was {type(prompt_raw).__name__})")
raise ManifestError(f"agent '{name}' prompt must be a string (was {type(prompt_raw).__name__})")
# git: agents may declare only `git.user` (name/email). Any
# other git key — notably `remotes` — is rejected: remotes
@@ -745,7 +748,7 @@ class Agent:
gd = _as_json_object(git_raw, f"agent '{name}' git")
for k in gd.keys():
if k != "user":
die(
raise ManifestError(
f"agent '{name}' git.{k} is not allowed at the "
f"agent level; only git.user (name/email) may be "
f"set on an agent. git.remotes is bottle-only "
@@ -798,7 +801,7 @@ class Manifest:
if not home_md.is_dir():
if missing_ok:
return cls.from_json_obj({"bottles": {}, "agents": {}})
die(
raise ManifestError(
f"no manifest found: {home_md} does not exist. "
f"See README.md for the per-file Markdown layout "
f"(PRD 0011)."
@@ -880,8 +883,8 @@ class Manifest:
return
available = ", ".join(self.agents.keys())
if available:
die(f"agent '{name}' not defined in bot-bottle.json. Available: {available}")
die(f"agent '{name}' not defined in bot-bottle.json (manifest is empty).")
raise ManifestError(f"agent '{name}' not defined in bot-bottle.json. Available: {available}")
raise ManifestError(f"agent '{name}' not defined in bot-bottle.json (manifest is empty).")
def has_bottle(self, name: str) -> bool:
return name in self.bottles
@@ -891,11 +894,11 @@ class Manifest:
return
available = ", ".join(self.bottles.keys())
if available:
die(
raise ManifestError(
f"bottle '{name}' not defined in bot-bottle.json. "
f"Available bottles: {available}"
)
die(f"bottle '{name}' not defined in bot-bottle.json (no bottles defined).")
raise ManifestError(f"bottle '{name}' not defined in bot-bottle.json (no bottles defined).")
def _effective_git_user(self, agent_name: str) -> GitUser:
"""Merge the agent's git.user over the referenced bottle's,
@@ -948,12 +951,12 @@ def _as_json_object(value: object, label: str) -> dict[str, object]:
a view typed as `dict[str, object]` so downstream `.get(...)` calls
have a typed surface."""
if not isinstance(value, dict):
die(f"{label} must be a JSON object (was {type(value).__name__})")
raise ManifestError(f"{label} must be a JSON object (was {type(value).__name__})")
items = cast(dict[object, object], value)
out: dict[str, object] = {}
for k, v in items.items():
if not isinstance(k, str):
die(f"{label} keys must be strings (found {type(k).__name__})")
raise ManifestError(f"{label} keys must be strings (found {type(k).__name__})")
out[k] = v
return out
@@ -965,20 +968,11 @@ def _section_dict(value: object, label: str) -> dict[str, object]:
return _as_json_object(value, label)
def _load_json_or_die(path: Path) -> dict[str, object]:
try:
with path.open() as f:
doc: object = json.load(f)
except json.JSONDecodeError:
die(f"bot-bottle.json at {path} is not valid JSON")
return _as_json_object(doc, f"bot-bottle.json at {path}")
def _opt_str(value: object, label: str) -> str:
if value is None:
return ""
if not isinstance(value, str):
die(f"{label} must be a string (was {type(value).__name__})")
raise ManifestError(f"{label} must be a string (was {type(value).__name__})")
return value
@@ -993,11 +987,11 @@ def _opt_extra_hosts(value: object, label: str) -> dict[str, str]:
out: dict[str, str] = {}
for host, ip in obj.items():
if not host:
die(f"{label} contains an empty hostname key")
raise ManifestError(f"{label} contains an empty hostname key")
if not isinstance(ip, str):
die(f"{label}['{host}'] must be a string (was {type(ip).__name__})")
raise ManifestError(f"{label}['{host}'] must be a string (was {type(ip).__name__})")
if not ip:
die(f"{label}['{host}'] must be a non-empty string")
raise ManifestError(f"{label}['{host}'] must be a non-empty string")
out[host] = ip
return out
@@ -1007,27 +1001,27 @@ def _parse_git_upstream(url: str, label: str) -> tuple[str, str, str, str]:
Dies if `url` doesn't match the ssh:// shape v1 supports. Default
port is 22 (matches OpenSSH)."""
if not url.startswith("ssh://"):
die(f"{label} must be an ssh:// URL (was {url!r})")
raise ManifestError(f"{label} must be an ssh:// URL (was {url!r})")
rest = url[len("ssh://"):]
if "@" not in rest:
die(f"{label} must include a user (e.g. ssh://git@host/path.git); was {url!r}")
raise ManifestError(f"{label} must include a user (e.g. ssh://git@host/path.git); was {url!r}")
user, _, hostpart = rest.partition("@")
if not user:
die(f"{label} user is empty in {url!r}")
raise ManifestError(f"{label} user is empty in {url!r}")
if "/" not in hostpart:
die(f"{label} must include a path (e.g. ssh://git@host/path.git); was {url!r}")
raise ManifestError(f"{label} must include a path (e.g. ssh://git@host/path.git); was {url!r}")
hostport, _, path = hostpart.partition("/")
if not path:
die(f"{label} path is empty in {url!r}")
raise ManifestError(f"{label} path is empty in {url!r}")
if ":" in hostport:
host, _, port = hostport.partition(":")
if not port.isdigit():
die(f"{label} port must be numeric in {url!r}")
raise ManifestError(f"{label} port must be numeric in {url!r}")
else:
host = hostport
port = "22"
if not host:
die(f"{label} host is empty in {url!r}")
raise ManifestError(f"{label} host is empty in {url!r}")
return (user, host, port, path)
@@ -1061,7 +1055,7 @@ def _validate_egress_routes(
for r in routes:
key = r.Host.lower()
if key in seen_hosts:
die(
raise ManifestError(
f"bottle '{bottle_name}' egress.routes has duplicate host "
f"{r.Host!r}; each host must be unique on the proxy."
)
@@ -1070,7 +1064,7 @@ def _validate_egress_routes(
with_role = [r for r in routes if role in r.Role]
if len(with_role) > 1:
hosts = ", ".join(r.Host for r in with_role)
die(
raise ManifestError(
f"bottle '{bottle_name}' egress.routes has {len(with_role)} "
f"routes with role {role!r} (hosts: {hosts}); this role drives a "
f"single launch-step side effect — pick one."
@@ -1079,7 +1073,7 @@ def _validate_egress_routes(
for route in routes:
for role in route.Role:
if role not in allowed_roles:
die(
raise ManifestError(
f"bottle '{bottle_name}' egress route for host "
f"{route.Host!r} has role {role!r}, but provider "
f"{agent_provider_template!r} only accepts roles "
@@ -1091,7 +1085,7 @@ def _validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> N
seen: dict[str, None] = {}
for g in git:
if g.Name in seen:
die(
raise ManifestError(
f"bottle '{bottle_name}' git entries have duplicate Name '{g.Name}'; "
f"each entry maps to a distinct bare repo on the gate."
)
@@ -1132,7 +1126,7 @@ def _check_stale_json(dir_path: Path, md_dir: Path, label: str) -> None:
to silently leave the JSON content unused."""
legacy = dir_path / "bot-bottle.json"
if legacy.is_file() and not md_dir.exists():
die(
raise ManifestError(
f"found {legacy} but {md_dir} does not exist. The manifest "
f"format changed in PRD 0011 — rewrite the JSON content "
f"as per-file Markdown under {md_dir}/bottles/ and "
@@ -1176,13 +1170,13 @@ def _load_bottles_from_dir(bottles_dir: Path) -> dict[str, Bottle]:
try:
fm, _body = parse_frontmatter(path.read_text())
except OSError as e:
die(f"could not read {path}: {e}")
raise ManifestError(f"could not read {path}: {e}")
except YamlSubsetError as e:
die(f"{path}: {e}")
raise ManifestError(f"{path}: {e}")
unknown = set(fm.keys()) - _BOTTLE_KEYS
if unknown:
allowed = ", ".join(sorted(_BOTTLE_KEYS))
die(
raise ManifestError(
f"bottle file {path}: unknown frontmatter key(s) "
f"{sorted(unknown)}; allowed keys are {allowed}."
)
@@ -1214,7 +1208,7 @@ def _resolve_one_bottle(
return cache[name]
if name in seen:
chain = " -> ".join(seen + (name,))
die(f"bottle '{name}' is in an extends cycle: {chain}")
raise ManifestError(f"bottle '{name}' is in an extends cycle: {chain}")
raw = raws[name]
parent_name_raw = raw.get("extends")
# Strip `extends:` before passing to Bottle.from_dict so it
@@ -1228,19 +1222,19 @@ def _resolve_one_bottle(
return bottle
if not isinstance(parent_name_raw, str):
die(
raise ManifestError(
f"bottle '{name}' extends must be a string "
f"(was {type(parent_name_raw).__name__})"
)
parent_name: str = parent_name_raw
if parent_name == name:
die(
raise ManifestError(
f"bottle '{name}' extends itself; remove the "
f"self-reference"
)
if parent_name not in raws:
avail = ", ".join(sorted(raws.keys())) or "(none)"
die(
raise ManifestError(
f"bottle '{name}' extends '{parent_name}' which is not "
f"defined. Available bottles: {avail}"
)
@@ -1351,13 +1345,13 @@ def _load_agents_from_dir(
try:
fm, body = parse_frontmatter(path.read_text())
except OSError as e:
die(f"could not read {path}: {e}")
raise ManifestError(f"could not read {path}: {e}")
except YamlSubsetError as e:
die(f"{path}: {e}")
raise ManifestError(f"{path}: {e}")
unknown = set(fm.keys()) - _AGENT_KEYS
if unknown:
allowed = ", ".join(sorted(_AGENT_KEYS))
die(
raise ManifestError(
f"agent file {path}: unknown frontmatter key(s) "
f"{sorted(unknown)}; allowed keys are {allowed}."
)
+140
View File
@@ -0,0 +1,140 @@
"""Unit: dashboard launch/crash failure logging (issue #100).
The dashboard runs under curses, so anything written to stderr while the
TUI owns the terminal is wiped when the terminal is restored. These
tests lock the recovery paths: a config error (`Die`) is re-surfaced
after the wrapper returns, and an unexpected crash is persisted to a
log file the operator can read.
"""
from __future__ import annotations
import contextlib
import io
import tempfile
import unittest
from pathlib import Path
from unittest import mock
from bot_bottle import supervise
from bot_bottle.cli import dashboard
from bot_bottle.log import Die, die
class TestDieCarriesMessage(unittest.TestCase):
def test_die_attaches_message_and_code(self):
buf = io.StringIO()
with contextlib.redirect_stderr(buf):
with self.assertRaises(Die) as cm:
die("bad manifest: unknown key 'foo'")
self.assertEqual("bad manifest: unknown key 'foo'", cm.exception.message)
self.assertEqual(1, cm.exception.code)
self.assertIn(
"bot-bottle: error: bad manifest: unknown key 'foo'", buf.getvalue()
)
def test_die_default_message_is_empty(self):
self.assertEqual("", Die(1).message)
self.assertEqual(1, Die(1).code)
class _FakeHomeMixin:
"""Point supervise.bot_bottle_root (what _write_crash_log resolves
through) at a temp dir so the crash log doesn't touch the real
~/.bot-bottle."""
def _setup_fake_home(self):
self._tmp = tempfile.TemporaryDirectory(prefix="dash-crash-test.")
self._orig_root = supervise.bot_bottle_root
self._root = Path(self._tmp.name) / ".bot-bottle"
supervise.bot_bottle_root = lambda: self._root # type: ignore[assignment]
def _teardown_fake_home(self):
supervise.bot_bottle_root = self._orig_root # type: ignore[assignment]
self._tmp.cleanup()
class TestCmdDashboardErrorPaths(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def test_keyboard_interrupt_returns_130(self):
with mock.patch.object(
dashboard.curses, "wrapper", side_effect=KeyboardInterrupt
):
self.assertEqual(130, dashboard.cmd_dashboard([]))
def test_die_resurfaces_message_after_curses(self):
buf = io.StringIO()
with mock.patch.object(
dashboard.curses, "wrapper",
side_effect=Die(1, "manifest parse error at line 3"),
):
with contextlib.redirect_stderr(buf):
rc = dashboard.cmd_dashboard([])
self.assertEqual(1, rc)
self.assertIn("manifest parse error at line 3", buf.getvalue())
def test_die_without_message_has_fallback(self):
buf = io.StringIO()
with mock.patch.object(dashboard.curses, "wrapper", side_effect=Die(1)):
with contextlib.redirect_stderr(buf):
rc = dashboard.cmd_dashboard([])
self.assertEqual(1, rc)
self.assertIn("fatal error", buf.getvalue())
def test_unexpected_exception_writes_crash_log(self):
buf = io.StringIO()
with mock.patch.object(
dashboard.curses, "wrapper",
side_effect=ValueError("kaboom in render"),
):
with contextlib.redirect_stderr(buf):
rc = dashboard.cmd_dashboard([])
self.assertEqual(1, rc)
out = buf.getvalue()
self.assertIn("dashboard crashed: ValueError: kaboom in render", out)
self.assertIn("full traceback written to", out)
log_path = self._root / "logs" / "dashboard-crash.log"
self.assertTrue(log_path.exists())
content = log_path.read_text()
self.assertIn("kaboom in render", content)
self.assertIn("Traceback (most recent call last)", content)
class TestWriteCrashLog(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
def tearDown(self):
self._teardown_fake_home()
def test_appends_traceback_with_header(self):
try:
raise RuntimeError("explode")
except RuntimeError as e:
path = dashboard._write_crash_log(e)
self.assertEqual(self._root / "logs" / "dashboard-crash.log", path)
text = path.read_text()
self.assertIn("=== dashboard crash", text)
self.assertIn("RuntimeError: explode", text)
def test_falls_back_to_tempfile_when_home_unwritable(self):
# bot_bottle_root points at a *file*, so mkdir under it raises
# OSError and the helper must fall back to a tempfile.
bad = Path(self._tmp.name) / "not-a-dir"
bad.write_text("x")
supervise.bot_bottle_root = lambda: bad # type: ignore[assignment]
try:
raise RuntimeError("explode2")
except RuntimeError as e:
path = dashboard._write_crash_log(e)
self.assertTrue(path.exists())
self.assertIn("explode2", path.read_text())
if __name__ == "__main__":
unittest.main()
+12 -16
View File
@@ -12,8 +12,6 @@ a temp-dir case locks the md loader (the `_AGENT_KEYS` allow + the
from __future__ import annotations
import contextlib
import io
import os
import shutil
import tempfile
@@ -21,18 +19,16 @@ import textwrap
import unittest
from pathlib import Path
from bot_bottle.log import Die
from bot_bottle.manifest import Manifest
from bot_bottle.manifest import ManifestError, Manifest
def _die_message(callable_, *args, **kwargs) -> str:
buf = io.StringIO()
with contextlib.redirect_stderr(buf):
try:
callable_(*args, **kwargs)
except Die:
return buf.getvalue()
raise AssertionError("expected Die was not raised")
def _error_message(callable_, *args, **kwargs) -> str:
"""Run `callable_` expecting a ManifestError; return its message."""
try:
callable_(*args, **kwargs)
except ManifestError as e:
return str(e)
raise AssertionError("expected ManifestError was not raised")
def _manifest(*, bottle_user=None, agent_git=None) -> Manifest:
@@ -120,19 +116,19 @@ class TestAgentGitUserOverlay(unittest.TestCase):
class TestAgentGitUserRejections(unittest.TestCase):
def test_agent_remotes_dies_bottle_only(self):
msg = _die_message(_manifest, agent_git={
msg = _error_message(_manifest, agent_git={
"remotes": {"h": {"Name": "r", "Upstream": "ssh://x/y.git"}},
})
self.assertIn("git.remotes", msg)
self.assertIn("bottle-only", msg)
def test_agent_unknown_git_subkey_dies(self):
msg = _die_message(_manifest, agent_git={"nope": {}})
msg = _error_message(_manifest, agent_git={"nope": {}})
self.assertIn("not allowed at the agent level", msg)
def test_agent_git_user_both_empty_dies(self):
# Reuses GitUser.from_dict validation.
msg = _die_message(_manifest, agent_git={"user": {"name": "", "email": ""}})
msg = _error_message(_manifest, agent_git={"user": {"name": "", "email": ""}})
self.assertIn("neither name nor email", msg)
@@ -239,7 +235,7 @@ class TestAgentGitUserMdLoader(unittest.TestCase):
def test_md_agent_remotes_dies(self):
self._write("bottles/dev.md", _BOTTLE_DEV)
self._write("agents/impl.md", _AGENT_WITH_REMOTES)
msg = _die_message(Manifest.resolve, str(self.home))
msg = _error_message(Manifest.resolve, str(self.home))
self.assertIn("git.remotes", msg)
self.assertIn("bottle-only", msg)
+26 -27
View File
@@ -7,8 +7,7 @@ auth omission means unauthenticated."""
import unittest
from bot_bottle.log import Die
from bot_bottle.manifest import EgressRoute, Manifest
from bot_bottle.manifest import ManifestError, EgressRoute, Manifest
def _bottle(routes):
@@ -41,15 +40,15 @@ class TestMinimalRoute(unittest.TestCase):
self.assertEqual("", r.TokenRef)
def test_host_required(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{}])
def test_host_must_be_non_empty(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{"host": ""}])
def test_unknown_top_level_key_dies(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "wat": "yes"}])
@@ -59,15 +58,15 @@ class TestPathAllowlist(unittest.TestCase):
self.assertEqual((), b.egress.routes[0].PathAllowlist)
def test_must_be_array(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "path_allowlist": "/x/"}])
def test_items_must_be_strings(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "path_allowlist": [42]}])
def test_items_must_be_absolute_paths(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "path_allowlist": ["nope/"]}])
def test_full_list(self):
@@ -100,25 +99,25 @@ class TestAuth(unittest.TestCase):
def test_empty_auth_block_rejected(self):
# Per PRD 0017: `auth: {}` is an error, not a synonym for
# "no auth" — that's what omission is for.
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "auth": {}}])
def test_missing_scheme_rejected(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"auth": {"token_ref": "T"},
}])
def test_missing_token_ref_rejected(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"auth": {"scheme": "Bearer"},
}])
def test_unknown_scheme_rejected(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"auth": {"scheme": "Basic", "token_ref": "T"},
@@ -133,7 +132,7 @@ class TestAuth(unittest.TestCase):
self.assertEqual("token", b.egress.routes[0].AuthScheme)
def test_unknown_auth_key_rejected(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"auth": {"scheme": "Bearer", "token_ref": "T", "extra": "no"},
@@ -166,22 +165,22 @@ class TestRole(unittest.TestCase):
def test_unknown_role_rejected(self):
# The role enum is locked down — typos shouldn't silently
# become no-op markers.
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "role": "totally-made-up"}])
def test_non_string_role_rejected(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "role": 42}])
def test_list_with_non_string_item_rejected(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example",
"role": ["claude_code_oauth", 42]}])
def test_singleton_claude_code_oauth_enforced(self):
# Two routes both claiming the role would make "which one
# drives the placeholder env?" ambiguous.
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([
{"host": "api.anthropic.com", "role": "claude_code_oauth",
"auth": {"scheme": "Bearer", "token_ref": "T1"}},
@@ -199,7 +198,7 @@ class TestRole(unittest.TestCase):
self.assertEqual(("codex_auth",), b.egress.routes[0].Role)
def test_claude_role_rejected_for_codex_provider(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_provider_bottle("codex", [{
"host": "api.anthropic.com",
"role": "claude_code_oauth",
@@ -207,7 +206,7 @@ class TestRole(unittest.TestCase):
}])
def test_codex_role_rejected_for_default_claude_provider(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{
"host": "api.openai.com",
"role": "codex_auth",
@@ -239,32 +238,32 @@ class TestPipelockPolicy(unittest.TestCase):
self.assertEqual((), b.egress.routes[0].Pipelock.SsrfIpAllowlist)
def test_pipelock_policy_must_be_object(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": True}])
def test_tls_passthrough_must_be_bool(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"tls_passthrough": "yes"},
}])
def test_ssrf_ip_allowlist_must_be_array(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": "100.78.141.42/32"},
}])
def test_ssrf_ip_allowlist_items_must_be_cidr_or_ip(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": ["not-an-ip"]},
}])
def test_unknown_pipelock_key_rejected(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": {"wat": True}}])
@@ -273,14 +272,14 @@ class TestRouteValidation(unittest.TestCase):
# Routes match by exact host; duplicates leave the choice
# ambiguous, so we reject them up front rather than picking
# the first/last silently.
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([
{"host": "github.com"},
{"host": "github.com", "path_allowlist": ["/x/"]},
])
def test_duplicate_host_case_insensitive(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
_bottle([
{"host": "GitHub.com"},
{"host": "github.com"},
@@ -301,7 +300,7 @@ class TestRouteValidation(unittest.TestCase):
class TestConfigShape(unittest.TestCase):
def test_unknown_egress_key_rejected(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"wat": []}}},
"agents": {"demo": {"skills": [], "prompt": "",
+13 -17
View File
@@ -10,22 +10,18 @@ it here covers both."""
from __future__ import annotations
import contextlib
import io
import unittest
from bot_bottle.log import Die
from bot_bottle.manifest import Manifest
from bot_bottle.manifest import ManifestError, Manifest
def _die_message(callable_, *args, **kwargs) -> str:
buf = io.StringIO()
with contextlib.redirect_stderr(buf):
try:
callable_(*args, **kwargs)
except Die:
return buf.getvalue()
raise AssertionError("expected Die was not raised")
def _error_message(callable_, *args, **kwargs) -> str:
"""Run `callable_` expecting a ManifestError; return its message."""
try:
callable_(*args, **kwargs)
except ManifestError as e:
return str(e)
raise AssertionError("expected ManifestError was not raised")
def _build(**bottles) -> Manifest:
@@ -295,16 +291,16 @@ class TestExtendsChain(unittest.TestCase):
class TestExtendsErrors(unittest.TestCase):
def test_missing_parent_dies(self):
msg = _die_message(_build, child={"extends": "ghost"})
msg = _error_message(_build, child={"extends": "ghost"})
self.assertIn("extends 'ghost'", msg)
self.assertIn("not defined", msg)
def test_self_extends_dies(self):
msg = _die_message(_build, loop={"extends": "loop"})
msg = _error_message(_build, loop={"extends": "loop"})
self.assertIn("extends itself", msg)
def test_two_node_cycle_dies(self):
msg = _die_message(
msg = _error_message(
_build,
a={"extends": "b"},
b={"extends": "a"},
@@ -315,7 +311,7 @@ class TestExtendsErrors(unittest.TestCase):
self.assertIn("b", msg)
def test_three_node_cycle_dies(self):
msg = _die_message(
msg = _error_message(
_build,
a={"extends": "b"},
b={"extends": "c"},
@@ -324,7 +320,7 @@ class TestExtendsErrors(unittest.TestCase):
self.assertIn("extends cycle", msg)
def test_non_string_extends_dies(self):
msg = _die_message(_build, child={"extends": ["base"]})
msg = _error_message(_build, child={"extends": ["base"]})
self.assertIn("extends must be a string", msg)
+16 -17
View File
@@ -2,8 +2,7 @@
import unittest
from bot_bottle.log import Die
from bot_bottle.manifest import Manifest
from bot_bottle.manifest import ManifestError, Manifest
def _manifest(git_entries):
@@ -63,28 +62,28 @@ class TestGitEntryParsing(unittest.TestCase):
self.assertEqual("", m.bottles["dev"].git[0].KnownHostKey)
def test_missing_name_dies(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Upstream": "ssh://git@github.com/foo.git",
"IdentityFile": "/dev/null",
}]))
def test_missing_upstream_dies(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"IdentityFile": "/dev/null",
}]))
def test_missing_identity_file_dies(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com/foo.git",
}]))
def test_non_ssh_upstream_dies(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "https://github.com/didericis/foo.git",
@@ -94,7 +93,7 @@ class TestGitEntryParsing(unittest.TestCase):
def test_scp_style_upstream_dies(self):
# SCP-style "git@host:path" is intentionally not supported in
# v1 — ssh:// only.
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "git@github.com:didericis/foo.git",
@@ -102,7 +101,7 @@ class TestGitEntryParsing(unittest.TestCase):
}]))
def test_upstream_without_user_dies(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://github.com/foo.git",
@@ -110,7 +109,7 @@ class TestGitEntryParsing(unittest.TestCase):
}]))
def test_upstream_without_path_dies(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com",
@@ -118,7 +117,7 @@ class TestGitEntryParsing(unittest.TestCase):
}]))
def test_non_numeric_port_dies(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com:notaport/foo.git",
@@ -146,7 +145,7 @@ class TestGitEntryExtraHosts(unittest.TestCase):
self.assertEqual({"gitea.dideric.is": "100.78.141.42"}, eh)
def test_extra_hosts_must_be_object(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com/foo.git",
@@ -155,7 +154,7 @@ class TestGitEntryExtraHosts(unittest.TestCase):
}]))
def test_extra_hosts_ip_must_be_string(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com/foo.git",
@@ -164,7 +163,7 @@ class TestGitEntryExtraHosts(unittest.TestCase):
}]))
def test_extra_hosts_empty_ip_dies(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com/foo.git",
@@ -175,7 +174,7 @@ class TestGitEntryExtraHosts(unittest.TestCase):
class TestGitEntryCrossValidation(unittest.TestCase):
def test_duplicate_name_dies(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj({
"bottles": {"dev": {"git": {"remotes": {
"a.example": {
@@ -193,7 +192,7 @@ class TestGitEntryCrossValidation(unittest.TestCase):
})
def test_remote_key_must_match_upstream_host(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj({
"bottles": {"dev": {"git": {"remotes": {
"wrong.example": {
@@ -224,7 +223,7 @@ class TestGitEntryCrossValidation(unittest.TestCase):
def test_legacy_ssh_field_dies_with_hint(self):
# PRD 0009: bottle.ssh is removed; manifests carrying it must
# fail loudly with a hint pointing at bottle.git.
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj({
"bottles": {
"dev": {
@@ -250,7 +249,7 @@ class TestEmptyGitField(unittest.TestCase):
self.assertEqual((), m.bottles["dev"].git)
def test_git_object_type_required(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj({
"bottles": {"dev": {"git": "not-a-list"}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
+13 -21
View File
@@ -1,25 +1,17 @@
"""Unit: Bottle git.user manifest parsing + validation (issue #86)."""
import contextlib
import io
import unittest
from bot_bottle.log import Die
from bot_bottle.manifest import GitUser, Manifest
from bot_bottle.manifest import ManifestError, GitUser, Manifest
def _die_message(callable_, *args, **kwargs) -> str:
"""Run `callable_` expecting it to die, return the stderr text
so tests can assert specifics. `die()` prints to stderr then
raises Die(1) the exit code is in the exception, the human
message is in stderr."""
buf = io.StringIO()
with contextlib.redirect_stderr(buf):
try:
callable_(*args, **kwargs)
except Die:
return buf.getvalue()
raise AssertionError("expected Die was not raised")
def _error_message(callable_, *args, **kwargs) -> str:
"""Run `callable_` expecting a ManifestError; return its message."""
try:
callable_(*args, **kwargs)
except ManifestError as e:
return str(e)
raise AssertionError("expected ManifestError was not raised")
def _manifest(git_user):
@@ -66,13 +58,13 @@ class TestGitUserParsing(unittest.TestCase):
# An explicit `git.user: {name: "", email: ""}` is a typo
# / half-finished edit; fail loudly rather than silently
# no-op (the operator clearly meant to configure something).
msg = _die_message(
msg = _error_message(
Manifest.from_json_obj, _manifest({"name": "", "email": ""}),
)
self.assertIn("neither name nor email", msg)
def test_unknown_key_dies(self):
msg = _die_message(
msg = _error_message(
Manifest.from_json_obj,
_manifest({"name": "Bot", "username": "bot"}),
)
@@ -80,19 +72,19 @@ class TestGitUserParsing(unittest.TestCase):
self.assertIn("username", msg)
def test_non_string_name_dies(self):
msg = _die_message(
msg = _error_message(
Manifest.from_json_obj, _manifest({"name": 42}),
)
self.assertIn("git.user.name must be a string", msg)
def test_non_string_email_dies(self):
msg = _die_message(
msg = _error_message(
Manifest.from_json_obj, _manifest({"email": ["x@y.z"]}),
)
self.assertIn("git.user.email must be a string", msg)
def test_legacy_top_level_git_user_dies(self):
msg = _die_message(
msg = _error_message(
Manifest.from_json_obj,
{
"bottles": {"dev": {"git_user": {"name": "Bot"}}},
+6 -7
View File
@@ -11,8 +11,7 @@ import textwrap
import unittest
from pathlib import Path
from bot_bottle.log import Die
from bot_bottle.manifest import Manifest
from bot_bottle.manifest import ManifestError, Manifest
def _write(p: Path, text: str) -> None:
@@ -238,7 +237,7 @@ class TestUnknownAgentKeyDies(_ResolveCase):
...
""",
)
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
self.resolve()
@@ -257,7 +256,7 @@ class TestUnknownBottleKeyDies(_ResolveCase):
""",
)
_write(self.home_cb / "agents" / "implementer.md", _AGENT_IMPL)
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
self.resolve()
@@ -268,7 +267,7 @@ class TestStaleJsonDies(_ResolveCase):
def test_dies(self):
(self.home_root / "bot-bottle.json").write_text('{"bottles": {}}')
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
self.resolve()
@@ -277,7 +276,7 @@ class TestNoManifestDies(_ResolveCase):
pointer at the new layout."""
def test_dies(self):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
self.resolve()
def test_missing_ok_returns_empty_manifest(self):
@@ -300,7 +299,7 @@ class TestUnknownBottleReferenceDies(_ResolveCase):
---
""",
)
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
self.resolve()
+2 -3
View File
@@ -7,8 +7,7 @@ silently ignoring."""
import unittest
from typing import Any
from bot_bottle.log import Die
from bot_bottle.manifest import Bottle, Manifest
from bot_bottle.manifest import ManifestError, Bottle, Manifest
def _manifest_with_runtime(value: object) -> dict[str, Any]:
@@ -32,7 +31,7 @@ class TestManifestRuntimeRemoved(unittest.TestCase):
def test_any_runtime_value_is_rejected(self):
for value in ("runsc", "runc", "kata-runtime", "", 42, None):
with self.subTest(value=value):
with self.assertRaises(Die):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest_with_runtime(value))