From 36cb0c53bf2c2226874059f50294c5dc5dffa9e8 Mon Sep 17 00:00:00 2001 From: didericis Date: Sun, 10 May 2026 21:08:54 -0400 Subject: [PATCH 1/4] refactor(manifest): add TypedDict schema and eager validation Move schema checks out of per-access getters into a single manifest_validate pass invoked by manifest_resolve. Getters can now assume bottles/agents are well-typed dicts and every agent has a defined bottle, so the .get(...) or {} chains collapse. Behavior change: a bad runtime / shape error anywhere in the manifest now fails at load instead of on the N-th read. Intermediate step toward replacing TypedDict with a dataclass. Co-Authored-By: Claude Opus 4.7 --- claude_bottle/manifest.py | 279 +++++++++++++++++++++++---------- tests/test_manifest_runtime.py | 13 +- 2 files changed, 199 insertions(+), 93 deletions(-) diff --git a/claude_bottle/manifest.py b/claude_bottle/manifest.py index 91eaa39..f1674ec 100644 --- a/claude_bottle/manifest.py +++ b/claude_bottle/manifest.py @@ -28,18 +28,58 @@ from __future__ import annotations import json import os from pathlib import Path -from typing import Any +from typing import Any, Literal, TypedDict, cast from .log import die -Manifest = dict[str, Any] + +class SshEntry(TypedDict, total=False): + """One entry from bottle.ssh. Host and IdentityFile are required at + runtime (enforced by manifest_validate); the rest configure the + generated ~/.ssh/config block inside the container.""" + + Host: str + IdentityFile: str + Hostname: str + User: str + Port: int | str + KnownHostKey: str + + +class BottleEgress(TypedDict, total=False): + allowlist: list[str] + + +class Bottle(TypedDict, total=False): + """Shared infrastructure referenced by agents. env values are + sentinel-prefixed strings (see env_resolve); runtime defaults to + "runc" when absent.""" + + env: dict[str, str] + ssh: list[SshEntry] + egress: BottleEgress + runtime: Literal["runc", "runsc"] + + +class Agent(TypedDict, total=False): + skills: list[str] + prompt: str + bottle: str + + +class Manifest(TypedDict, total=False): + bottles: dict[str, Bottle] + agents: dict[str, Agent] + + +_SUPPORTED_RUNTIMES: tuple[str, ...] = ("runc", "runsc") def manifest_resolve(cwd: str) -> Manifest: """Look for claude-bottle.json in and in $HOME, deep-merge them (cwd entries override home entries on key conflict for both - bottles and agents). Dies if neither file is found or either is - invalid JSON.""" + bottles and agents), then validate. Dies if neither file is found, + either is invalid JSON, or the merged shape violates the schema.""" cwd_file = Path(cwd) / "claude-bottle.json" home_file = Path(os.environ["HOME"]) / "claude-bottle.json" @@ -49,18 +89,135 @@ def manifest_resolve(cwd: str) -> Manifest: if cwd_doc is None and home_doc is None: die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}") - if cwd_doc is None: - return home_doc # type: ignore[return-value] - if home_doc is None: - return cwd_doc - - return { - "bottles": {**(home_doc.get("bottles") or {}), **(cwd_doc.get("bottles") or {})}, - "agents": {**(home_doc.get("agents") or {}), **(cwd_doc.get("agents") or {})}, + h = home_doc if home_doc is not None else cast(Manifest, {}) + c = cwd_doc if cwd_doc is not None else cast(Manifest, {}) + manifest: Manifest = { + "bottles": {**(h.get("bottles") or {}), **(c.get("bottles") or {})}, + "agents": {**(h.get("agents") or {}), **(c.get("agents") or {})}, } + manifest_validate(manifest) + return manifest -def _load_json_or_die(path: Path) -> Manifest: +def manifest_validate(manifest: Manifest) -> None: + """Deep-validate the manifest, dying on the first schema violation. + After this returns, getters can trust that bottles and agents are + dicts, every agent has a 'bottle' field that references a defined + bottle, and optional fields (when present) have correct types.""" + bottles = manifest.get("bottles") or {} + agents = manifest.get("agents") or {} + + for bname, bottle in bottles.items(): + _validate_bottle(bname, bottle) + + for aname, agent in agents.items(): + _validate_agent(aname, agent, bottles) + + +def _validate_bottle(name: str, bottle: Any) -> None: + if not isinstance(bottle, dict): + die(f"bottle '{name}' must be a JSON object (was {_json_type(bottle)})") + bottle = cast(dict[Any, Any], bottle) + + env = bottle.get("env") + if env is not None: + if not isinstance(env, dict): + die(f"bottle '{name}' env must be a JSON object (was {_json_type(env)})") + env = cast(dict[Any, Any], env) + for var, value in env.items(): + if not isinstance(value, str): + die( + f"env entry {var} in bottle '{name}' must be a JSON string " + f"(was {_json_type(value)}). Use \"?\" for prompt-at-runtime." + ) + + ssh = bottle.get("ssh") + if ssh is not None: + if not isinstance(ssh, list): + die(f"bottle '{name}' ssh must be an array (was {_json_type(ssh)})") + ssh = cast(list[Any], ssh) + for i, entry in enumerate(ssh): + if not isinstance(entry, dict): + die( + f"bottle '{name}' ssh[{i}] must be a JSON object " + f"(was {_json_type(entry)})" + ) + entry = cast(dict[Any, Any], entry) + host = entry.get("Host") + if not isinstance(host, str) or not host: + die(f"bottle '{name}' ssh[{i}] missing required string field 'Host'") + ident = entry.get("IdentityFile") + if not isinstance(ident, str) or not ident: + die( + f"bottle '{name}' ssh '{host}' missing required string field " + f"'IdentityFile'" + ) + + egress = bottle.get("egress") + if egress is not None: + if not isinstance(egress, dict): + die(f"bottle '{name}' egress must be a JSON object (was {_json_type(egress)})") + egress = cast(dict[Any, Any], egress) + allowlist = egress.get("allowlist") + if allowlist is not None: + if not isinstance(allowlist, list): + die( + f"bottle '{name}' egress.allowlist must be an array " + f"(was {_json_type(allowlist)})" + ) + allowlist = cast(list[Any], allowlist) + for i, host in enumerate(allowlist): + if not isinstance(host, str): + die( + f"bottle '{name}' egress.allowlist[{i}] must be a string " + f"(was {_json_type(host)})" + ) + + runtime = bottle.get("runtime") + if runtime is not None: + if not isinstance(runtime, str): + die(f"bottle '{name}' runtime must be a string (was {_json_type(runtime)})") + if runtime not in _SUPPORTED_RUNTIMES: + die( + f"bottle '{name}' runtime '{runtime}' is not supported. " + f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}." + ) + + +def _validate_agent(name: str, agent: Any, bottles: dict[str, Any]) -> None: + if not isinstance(agent, dict): + die(f"agent '{name}' must be a JSON object (was {_json_type(agent)})") + agent = cast(dict[Any, Any], agent) + + skills = agent.get("skills") + if skills is not None: + if not isinstance(skills, list): + die(f"agent '{name}' skills must be an array (was {_json_type(skills)})") + skills = cast(list[Any], skills) + for i, skill in enumerate(skills): + if not isinstance(skill, str): + die(f"agent '{name}' skills[{i}] must be a string (was {_json_type(skill)})") + + prompt = agent.get("prompt") + if prompt is not None and not isinstance(prompt, str): + die(f"agent '{name}' prompt must be a string (was {_json_type(prompt)})") + + bottle = agent.get("bottle") + if not isinstance(bottle, str) or not bottle: + die(f"agent '{name}' must declare a 'bottle' field naming a defined bottle") + if bottle not in bottles: + available = ", ".join(bottles.keys()) or "(none defined)" + die( + f"agent '{name}' references bottle '{bottle}', which is not defined. " + f"Available: {available}" + ) + + +def _load_json_or_die(path: Path) -> dict[Any, Any]: + """Load and shallow-check. Confirms bottles/agents (when present) + are JSON objects so the merge in manifest_resolve cannot TypeError + before manifest_validate runs.""" + doc: Any = None try: with path.open() as f: doc = json.load(f) @@ -68,18 +225,18 @@ def _load_json_or_die(path: Path) -> Manifest: die(f"claude-bottle.json at {path} is not valid JSON") if not isinstance(doc, dict): die(f"claude-bottle.json at {path} must be a JSON object") - return doc + return cast(dict[Any, Any], doc) def manifest_has_agent(manifest: Manifest, name: str) -> bool: - return name in (manifest.get("agents") or {}) + return name in manifest["agents"] def manifest_require_agent(manifest: Manifest, name: str) -> None: """Like has_agent but dies with the available agent names listed.""" if manifest_has_agent(manifest, name): return - available = ", ".join((manifest.get("agents") or {}).keys()) + available = ", ".join(manifest["agents"].keys()) if available: die(f"agent '{name}' not defined in claude-bottle.json. Available: {available}") else: @@ -88,58 +245,38 @@ def manifest_require_agent(manifest: Manifest, name: str) -> None: def manifest_env_names(manifest: Manifest, name: str) -> list[str]: """Names (not values) of bottles[agent.bottle].env, in declaration - order. Empty list if the agent has no bottle or the bottle has no env.""" - agent = (manifest.get("agents") or {}).get(name) or {} - bottle_name = agent.get("bottle") or "" - if not bottle_name: - return [] - bottle = (manifest.get("bottles") or {}).get(bottle_name) or {} - return list((bottle.get("env") or {}).keys()) + order. Empty if the agent's bottle has no env.""" + bottle_name = manifest["agents"][name]["bottle"] + return list(manifest["bottles"][bottle_name].get("env", {}).keys()) def manifest_env_entry(manifest: Manifest, agent: str, var: str) -> str: """Raw string value of one env entry. Used by env_resolve, which - classifies the result by sentinel. Dies if the agent has no bottle, - or the entry is not a string.""" - agent_def = (manifest.get("agents") or {}).get(agent) or {} - bottle_name = agent_def.get("bottle") or "" - if not bottle_name: - die(f"env entry {var} for agent {agent}: agent has no 'bottle' field") - bottle = (manifest.get("bottles") or {}).get(bottle_name) or {} - env = bottle.get("env") or {} - value = env.get(var) - if not isinstance(value, str): - actual = _json_type(value) - die( - f"env entry {var} for agent {agent} must be a JSON string " - f"(was {actual}). Use \"?\" for prompt-at-runtime." - ) - return value + classifies the result by sentinel.""" + bottle_name = manifest["agents"][agent]["bottle"] + return manifest["bottles"][bottle_name]["env"][var] def manifest_skills(manifest: Manifest, name: str) -> list[str]: - agent = (manifest.get("agents") or {}).get(name) or {} - return list(agent.get("skills") or []) + return list(manifest["agents"][name].get("skills", [])) def manifest_prompt(manifest: Manifest, name: str) -> str: - agent = (manifest.get("agents") or {}).get(name) or {} - return agent.get("prompt") or "" + return manifest["agents"][name].get("prompt", "") def manifest_agent_bottle(manifest: Manifest, name: str) -> str: - agent = (manifest.get("agents") or {}).get(name) or {} - return agent.get("bottle") or "" + return manifest["agents"][name]["bottle"] def manifest_has_bottle(manifest: Manifest, bottle_name: str) -> bool: - return bottle_name in (manifest.get("bottles") or {}) + return bottle_name in manifest["bottles"] def manifest_require_bottle(manifest: Manifest, bottle_name: str) -> None: if manifest_has_bottle(manifest, bottle_name): return - available = ", ".join((manifest.get("bottles") or {}).keys()) + available = ", ".join(manifest["bottles"].keys()) if available: die( f"bottle '{bottle_name}' not defined in claude-bottle.json. " @@ -149,56 +286,24 @@ def manifest_require_bottle(manifest: Manifest, bottle_name: str) -> None: die(f"bottle '{bottle_name}' not defined in claude-bottle.json (no bottles defined).") -def manifest_bottle_ssh(manifest: Manifest, bottle_name: str) -> list[dict[str, Any]]: - bottle = (manifest.get("bottles") or {}).get(bottle_name) or {} - return list(bottle.get("ssh") or []) - - -_SUPPORTED_RUNTIMES: tuple[str, ...] = ("runc", "runsc") +def manifest_bottle_ssh(manifest: Manifest, bottle_name: str) -> list[SshEntry]: + return list(manifest["bottles"][bottle_name].get("ssh", [])) def manifest_bottle_runtime(manifest: Manifest, bottle_name: str) -> str: """Container runtime for the bottle's agent container. Returns - "runc" (Docker default) or "runsc" (gVisor opt-in). Dies if the - field is present but not one of the supported values.""" - bottle = (manifest.get("bottles") or {}).get(bottle_name) or {} - raw = bottle.get("runtime") - if raw is None: - return "runc" - if not isinstance(raw, str): - die( - f"bottle '{bottle_name}' runtime must be a string " - f"(was {_json_type(raw)})." - ) - if raw not in _SUPPORTED_RUNTIMES: - die( - f"bottle '{bottle_name}' runtime '{raw}' is not supported. " - f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}." - ) - return raw + "runc" (Docker default) or "runsc" (gVisor opt-in).""" + return manifest["bottles"][bottle_name].get("runtime", "runc") def manifest_bottle_egress_allowlist(manifest: Manifest, bottle_name: str) -> list[str]: - """Hostnames in bottles[bottle_name].egress.allowlist. Dies if the - field is present but not an array. Per-element string typing is - re-checked at use-time in pipelock.""" - bottle = (manifest.get("bottles") or {}).get(bottle_name) or {} - allowlist = (bottle.get("egress") or {}).get("allowlist") - if allowlist is None: - return [] - if not isinstance(allowlist, list): - die( - f"bottle '{bottle_name}' egress.allowlist must be an array " - f"(was {_json_type(allowlist)})." - ) - return list(allowlist) + """Hostnames in bottles[bottle_name].egress.allowlist.""" + return list(manifest["bottles"][bottle_name].get("egress", {}).get("allowlist", [])) -def manifest_ssh(manifest: Manifest, agent_name: str) -> list[dict[str, Any]]: - """SSH entries resolved via the agent's "bottle" field; empty if no bottle set.""" +def manifest_ssh(manifest: Manifest, agent_name: str) -> list[SshEntry]: + """SSH entries resolved via the agent's "bottle" field.""" bottle_name = manifest_agent_bottle(manifest, agent_name) - if not bottle_name: - return [] return manifest_bottle_ssh(manifest, bottle_name) diff --git a/tests/test_manifest_runtime.py b/tests/test_manifest_runtime.py index 4ba013d..eeec914 100644 --- a/tests/test_manifest_runtime.py +++ b/tests/test_manifest_runtime.py @@ -1,10 +1,11 @@ -"""Unit: manifest_bottle_runtime — defaults to runc, accepts runsc, -rejects unknown values and non-strings.""" +"""Unit: bottle runtime — manifest_bottle_runtime returns the configured +runtime (defaulting to runc); manifest_validate rejects unknown values, +non-strings, and empty strings.""" import unittest from claude_bottle.log import Die -from claude_bottle.manifest import manifest_bottle_runtime +from claude_bottle.manifest import manifest_bottle_runtime, manifest_validate def _bottle(runtime_value: object | None) -> dict: @@ -34,15 +35,15 @@ class TestManifestBottleRuntime(unittest.TestCase): def test_rejects_unknown_runtime(self): with self.assertRaises(Die): - manifest_bottle_runtime(_bottle("kata-runtime"), "dev") + manifest_validate(_bottle("kata-runtime")) def test_rejects_non_string(self): with self.assertRaises(Die): - manifest_bottle_runtime(_bottle(42), "dev") + manifest_validate(_bottle(42)) def test_rejects_empty_string(self): with self.assertRaises(Die): - manifest_bottle_runtime(_bottle(""), "dev") + manifest_validate(_bottle("")) if __name__ == "__main__": From 1f36d53f7b14e742117690ed0e8867707b3d6190 Mon Sep 17 00:00:00 2001 From: didericis Date: Sun, 10 May 2026 21:20:15 -0400 Subject: [PATCH 2/4] refactor(manifest): convert TypedDict to frozen dataclasses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the TypedDict + 14 manifest_* free functions with frozen dataclasses (SshEntry, BottleEgress, Bottle, Agent, Manifest) carrying their own validators and constructors. Call sites import Manifest and chain attribute access; the manifest_* helpers and manifest_validate are gone. Behavior changes worth flagging: - Agent.bottle is now required (was optional with a "(none)" fallback). Manifest.from_json_obj dies if any agent lacks a 'bottle' field or references an undefined bottle, where previously start.py raised the error lazily for the specific agent being launched. - ssh.py now takes SshEntry instances; Host/IdentityFile shape checks moved upstream into Manifest construction, leaving only the IdentityFile filesystem-existence check in ssh_validate_entries. - pipelock_bottle_allowlist's per-element string check is dropped — the Manifest validator enforces it at load. Co-Authored-By: Claude Opus 4.7 --- claude_bottle/cli/info.py | 54 ++-- claude_bottle/cli/list.py | 6 +- claude_bottle/cli/start.py | 48 +-- claude_bottle/env_resolve.py | 6 +- claude_bottle/manifest.py | 532 ++++++++++++++++--------------- claude_bottle/pipelock.py | 35 +- claude_bottle/ssh.py | 33 +- tests/fixtures.py | 35 +- tests/test_manifest_runtime.py | 34 +- tests/test_pipelock_allowlist.py | 7 +- tests/test_pipelock_yaml.py | 5 +- 11 files changed, 387 insertions(+), 408 deletions(-) diff --git a/claude_bottle/cli/info.py b/claude_bottle/cli/info.py index 77f2545..b479bd0 100644 --- a/claude_bottle/cli/info.py +++ b/claude_bottle/cli/info.py @@ -5,15 +5,7 @@ from __future__ import annotations import argparse from ..log import info -from ..manifest import ( - manifest_agent_bottle, - manifest_env_names, - manifest_prompt, - manifest_require_agent, - manifest_resolve, - manifest_skills, - manifest_ssh, -) +from ..manifest import Manifest from ._common import PROG, USER_CWD @@ -22,39 +14,33 @@ def cmd_info(argv: list[str]) -> int: parser.add_argument("name", help="agent name defined in claude-bottle.json") args = parser.parse_args(argv) - manifest = manifest_resolve(USER_CWD) - manifest_require_agent(manifest, args.name) + manifest = Manifest.resolve(USER_CWD) + manifest.require_agent(args.name) - env_names = manifest_env_names(manifest, args.name) - skill_names = manifest_skills(manifest, args.name) - prompt_content = manifest_prompt(manifest, args.name) - prompt_first_line = prompt_content.splitlines()[0] if prompt_content else "" - - bottle_name = manifest_agent_bottle(manifest, args.name) - ssh_entries = manifest_ssh(manifest, args.name) + agent = manifest.agents[args.name] + bottle = manifest.bottle_for(args.name) + env_names = list(bottle.env.keys()) + prompt_first_line = agent.prompt.splitlines()[0] if agent.prompt else "" print() info(f"agent : {args.name}") info(f"env (names only): {', '.join(env_names) if env_names else '(none)'}") - info(f"skills : {' '.join(skill_names) if skill_names else '(none)'}") + info(f"skills : {' '.join(agent.skills) if agent.skills else '(none)'}") info( - f"prompt : {len(prompt_content)} chars; " + f"prompt : {len(agent.prompt)} chars; " f"first line: {prompt_first_line or '(empty)'}" ) - if bottle_name: - info(f"bottle : {bottle_name}") - if ssh_entries: - for e in ssh_entries: - info( - f" ssh host : {e.get('Host')} " - f"(Hostname={e.get('Hostname')}, User={e.get('User')}, " - f"Port={e.get('Port')}, IdentityFile={e.get('IdentityFile')})" - ) - if e.get("KnownHostKey"): - info(f" KnownHostKey: {e['KnownHostKey']}") - else: - info(" ssh hosts : (none)") + info(f"bottle : {agent.bottle}") + if bottle.ssh: + for e in bottle.ssh: + info( + f" ssh host : {e.Host} " + f"(Hostname={e.Hostname}, User={e.User}, " + f"Port={e.Port}, IdentityFile={e.IdentityFile})" + ) + if e.KnownHostKey: + info(f" KnownHostKey: {e.KnownHostKey}") else: - info("bottle : (none)") + info(" ssh hosts : (none)") print() return 0 diff --git a/claude_bottle/cli/list.py b/claude_bottle/cli/list.py index 06f2c97..a60143b 100644 --- a/claude_bottle/cli/list.py +++ b/claude_bottle/cli/list.py @@ -7,7 +7,7 @@ import subprocess from .. import docker as docker_mod from ..log import info -from ..manifest import manifest_resolve +from ..manifest import Manifest from ._common import PROG, USER_CWD @@ -17,8 +17,8 @@ def cmd_list(argv: list[str]) -> int: args = parser.parse_args(argv) if args.scope == "available": - manifest = manifest_resolve(USER_CWD) - for name in (manifest.get("agents") or {}).keys(): + manifest = Manifest.resolve(USER_CWD) + for name in manifest.agents.keys(): print(name) return 0 diff --git a/claude_bottle/cli/start.py b/claude_bottle/cli/start.py index 5cca959..2685df6 100644 --- a/claude_bottle/cli/start.py +++ b/claude_bottle/cli/start.py @@ -19,17 +19,7 @@ from .. import skills as skills_mod from .. import ssh as ssh_mod from ..env_resolve import env_resolve from ..log import die, info -from ..manifest import ( - manifest_agent_bottle, - manifest_bottle_runtime, - manifest_env_names, - manifest_prompt, - manifest_require_agent, - manifest_require_bottle, - manifest_resolve, - manifest_skills, - manifest_ssh, -) +from ..manifest import Manifest from ._common import PROG, REPO_DIR, USER_CWD, read_tty_line @@ -57,8 +47,11 @@ def cmd_start(argv: list[str]) -> int: runtime_image = derived_image docker_mod.require_docker() - manifest = manifest_resolve(USER_CWD) - manifest_require_agent(manifest, name) + manifest = Manifest.resolve(USER_CWD) + manifest.require_agent(name) + agent = manifest.agents[name] + bottle_name = agent.bottle + bottle = manifest.bottle_for(name) container = pinned_container or default_container suffix = 2 @@ -81,7 +74,7 @@ def cmd_start(argv: list[str]) -> int: ) # --- Plan resolution (host-only, no container yet) --- - env_names = manifest_env_names(manifest, name) + env_names = list(bottle.env.keys()) # CLAUDE_BOTTLE_OAUTH_TOKEN → CLAUDE_CODE_OAUTH_TOKEN forwarding. # Host-side token is always forwarded so every container can authenticate. @@ -90,23 +83,14 @@ def cmd_start(argv: list[str]) -> int: if forward_oauth_token: display_env_names.append("CLAUDE_CODE_OAUTH_TOKEN") - skill_names = manifest_skills(manifest, name) - if skill_names: - skills_mod.skills_validate_all(skill_names) + if agent.skills: + skills_mod.skills_validate_all(list(agent.skills)) - bottle_name = manifest_agent_bottle(manifest, name) - if not bottle_name: - die( - f"agent '{name}' has no 'bottle' field. " - f"Add a bottle association to this agent in claude-bottle.json." - ) - manifest_require_bottle(manifest, bottle_name) - - runtime = manifest_bottle_runtime(manifest, bottle_name) + runtime = bottle.runtime if runtime == "runsc": docker_mod.require_runsc() - ssh_entries = manifest_ssh(manifest, name) + ssh_entries = bottle.ssh if ssh_entries: ssh_mod.ssh_validate_entries(ssh_entries) @@ -153,7 +137,7 @@ def cmd_start(argv: list[str]) -> int: env_resolve(manifest, name, env_file, args_file) - prompt_content = manifest_prompt(manifest, name) + prompt_content = agent.prompt prompt_file.write_text(prompt_content) prompt_first_line = prompt_content.splitlines()[0] if prompt_content else "" @@ -169,11 +153,11 @@ def cmd_start(argv: list[str]) -> int: "env (names only): " + (", ".join(display_env_names) if display_env_names else "(none)") ) - info("skills : " + (" ".join(skill_names) if skill_names else "(none)")) + info("skills : " + (" ".join(agent.skills) if agent.skills else "(none)")) info(f"bottle : {bottle_name}") info(f" runtime : {runtime}{' (gVisor)' if runtime == 'runsc' else ''}") if ssh_entries: - ssh_names = ", ".join(e.get("Host", "") for e in ssh_entries) + ssh_names = ", ".join(e.Host for e in ssh_entries) info(f" ssh hosts : {ssh_names}") else: info(" ssh hosts : (none)") @@ -293,8 +277,8 @@ def cmd_start(argv: list[str]) -> int: check=True, ) - if skill_names: - skills_mod.skills_copy_into(container, skill_names) + if agent.skills: + skills_mod.skills_copy_into(container, list(agent.skills)) if ssh_entries: proxy_host_port = pipelock.pipelock_proxy_host_port(slug) diff --git a/claude_bottle/env_resolve.py b/claude_bottle/env_resolve.py index 82f2980..0e0b3e7 100644 --- a/claude_bottle/env_resolve.py +++ b/claude_bottle/env_resolve.py @@ -35,7 +35,7 @@ import sys from pathlib import Path from .log import die -from .manifest import Manifest, manifest_env_entry, manifest_env_names +from .manifest import Manifest _INTERPOLATED_RE = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$") @@ -108,10 +108,10 @@ def env_resolve( - interpolated: copy host value; export under target name; append `-e NAME` - literal: append `NAME=VALUE` to env_file """ - for name in manifest_env_names(manifest, agent): + bottle = manifest.bottle_for(agent) + for name, raw in bottle.env.items(): if not name: continue - raw = manifest_env_entry(manifest, agent, name) kind = env_entry_kind(raw) if kind == "secret": prompt_body = env_entry_secret_prompt(raw) diff --git a/claude_bottle/manifest.py b/claude_bottle/manifest.py index f1674ec..fc87a76 100644 --- a/claude_bottle/manifest.py +++ b/claude_bottle/manifest.py @@ -1,5 +1,5 @@ -"""Manifest helpers. Read claude-bottle.json and pull the definition for a -named agent. +"""Manifest dataclasses. Read claude-bottle.json (cwd + $HOME, deep-merged) +into a frozen, validated Manifest tree. Schema (see CLAUDE.md "Intended design"): { @@ -7,7 +7,8 @@ Schema (see CLAUDE.md "Intended design"): "": { "env": { "": , ... }, "ssh": [ , ... ], - "egress": { "allowlist": [ "", ... ] } + "egress": { "allowlist": [ "", ... ] }, + "runtime": "runc" | "runsc" } }, "agents": { @@ -21,203 +22,278 @@ Schema (see CLAUDE.md "Intended design"): Bottles group shared infrastructure (SSH keys, known hosts, egress allowlist) that multiple agents can reference. Every agent must reference a bottle. + +Validation runs once at construction (Manifest.from_json_obj) so getters +can trust the shape. """ from __future__ import annotations import json import os +from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Literal, TypedDict, cast +from typing import Any, Literal, Mapping from .log import die -class SshEntry(TypedDict, total=False): - """One entry from bottle.ssh. Host and IdentityFile are required at - runtime (enforced by manifest_validate); the rest configure the - generated ~/.ssh/config block inside the container.""" +_SUPPORTED_RUNTIMES: tuple[str, ...] = ("runc", "runsc") +Runtime = Literal["runc", "runsc"] + +@dataclass(frozen=True) +class SshEntry: Host: str IdentityFile: str - Hostname: str - User: str - Port: int | str - KnownHostKey: str + Hostname: str = "" + User: str = "" + Port: str = "" + KnownHostKey: str = "" - -class BottleEgress(TypedDict, total=False): - allowlist: list[str] - - -class Bottle(TypedDict, total=False): - """Shared infrastructure referenced by agents. env values are - sentinel-prefixed strings (see env_resolve); runtime defaults to - "runc" when absent.""" - - env: dict[str, str] - ssh: list[SshEntry] - egress: BottleEgress - runtime: Literal["runc", "runsc"] - - -class Agent(TypedDict, total=False): - skills: list[str] - prompt: str - bottle: str - - -class Manifest(TypedDict, total=False): - bottles: dict[str, Bottle] - agents: dict[str, Agent] - - -_SUPPORTED_RUNTIMES: tuple[str, ...] = ("runc", "runsc") - - -def manifest_resolve(cwd: str) -> Manifest: - """Look for claude-bottle.json in and in $HOME, deep-merge - them (cwd entries override home entries on key conflict for both - bottles and agents), then validate. Dies if neither file is found, - either is invalid JSON, or the merged shape violates the schema.""" - cwd_file = Path(cwd) / "claude-bottle.json" - home_file = Path(os.environ["HOME"]) / "claude-bottle.json" - - cwd_doc = _load_json_or_die(cwd_file) if cwd_file.is_file() else None - home_doc = _load_json_or_die(home_file) if home_file.is_file() else None - - if cwd_doc is None and home_doc is None: - die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}") - - h = home_doc if home_doc is not None else cast(Manifest, {}) - c = cwd_doc if cwd_doc is not None else cast(Manifest, {}) - manifest: Manifest = { - "bottles": {**(h.get("bottles") or {}), **(c.get("bottles") or {})}, - "agents": {**(h.get("agents") or {}), **(c.get("agents") or {})}, - } - manifest_validate(manifest) - return manifest - - -def manifest_validate(manifest: Manifest) -> None: - """Deep-validate the manifest, dying on the first schema violation. - After this returns, getters can trust that bottles and agents are - dicts, every agent has a 'bottle' field that references a defined - bottle, and optional fields (when present) have correct types.""" - bottles = manifest.get("bottles") or {} - agents = manifest.get("agents") or {} - - for bname, bottle in bottles.items(): - _validate_bottle(bname, bottle) - - for aname, agent in agents.items(): - _validate_agent(aname, agent, bottles) - - -def _validate_bottle(name: str, bottle: Any) -> None: - if not isinstance(bottle, dict): - die(f"bottle '{name}' must be a JSON object (was {_json_type(bottle)})") - bottle = cast(dict[Any, Any], bottle) - - env = bottle.get("env") - if env is not None: - if not isinstance(env, dict): - die(f"bottle '{name}' env must be a JSON object (was {_json_type(env)})") - env = cast(dict[Any, Any], env) - for var, value in env.items(): - if not isinstance(value, str): - die( - f"env entry {var} in bottle '{name}' must be a JSON string " - f"(was {_json_type(value)}). Use \"?\" for prompt-at-runtime." - ) - - ssh = bottle.get("ssh") - if ssh is not None: - if not isinstance(ssh, list): - die(f"bottle '{name}' ssh must be an array (was {_json_type(ssh)})") - ssh = cast(list[Any], ssh) - for i, entry in enumerate(ssh): - if not isinstance(entry, dict): - die( - f"bottle '{name}' ssh[{i}] must be a JSON object " - f"(was {_json_type(entry)})" - ) - entry = cast(dict[Any, Any], entry) - host = entry.get("Host") - if not isinstance(host, str) or not host: - die(f"bottle '{name}' ssh[{i}] missing required string field 'Host'") - ident = entry.get("IdentityFile") - if not isinstance(ident, str) or not ident: - die( - f"bottle '{name}' ssh '{host}' missing required string field " - f"'IdentityFile'" - ) - - egress = bottle.get("egress") - if egress is not None: - if not isinstance(egress, dict): - die(f"bottle '{name}' egress must be a JSON object (was {_json_type(egress)})") - egress = cast(dict[Any, Any], egress) - allowlist = egress.get("allowlist") - if allowlist is not None: - if not isinstance(allowlist, list): - die( - f"bottle '{name}' egress.allowlist must be an array " - f"(was {_json_type(allowlist)})" - ) - allowlist = cast(list[Any], allowlist) - for i, host in enumerate(allowlist): - if not isinstance(host, str): - die( - f"bottle '{name}' egress.allowlist[{i}] must be a string " - f"(was {_json_type(host)})" - ) - - runtime = bottle.get("runtime") - if runtime is not None: - if not isinstance(runtime, str): - die(f"bottle '{name}' runtime must be a string (was {_json_type(runtime)})") - if runtime not in _SUPPORTED_RUNTIMES: + @classmethod + def _from_dict(cls, bottle_name: str, idx: int, raw: Any) -> "SshEntry": + if not isinstance(raw, dict): die( - f"bottle '{name}' runtime '{runtime}' is not supported. " - f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}." + f"bottle '{bottle_name}' ssh[{idx}] must be a JSON object " + f"(was {_json_type(raw)})" ) - - -def _validate_agent(name: str, agent: Any, bottles: dict[str, Any]) -> None: - if not isinstance(agent, dict): - die(f"agent '{name}' must be a JSON object (was {_json_type(agent)})") - agent = cast(dict[Any, Any], agent) - - skills = agent.get("skills") - if skills is not None: - if not isinstance(skills, list): - die(f"agent '{name}' skills must be an array (was {_json_type(skills)})") - skills = cast(list[Any], skills) - for i, skill in enumerate(skills): - if not isinstance(skill, str): - die(f"agent '{name}' skills[{i}] must be a string (was {_json_type(skill)})") - - prompt = agent.get("prompt") - if prompt is not None and not isinstance(prompt, str): - die(f"agent '{name}' prompt must be a string (was {_json_type(prompt)})") - - bottle = agent.get("bottle") - if not isinstance(bottle, str) or not bottle: - die(f"agent '{name}' must declare a 'bottle' field naming a defined bottle") - if bottle not in bottles: - available = ", ".join(bottles.keys()) or "(none defined)" - die( - f"agent '{name}' references bottle '{bottle}', which is not defined. " - f"Available: {available}" + host = raw.get("Host") + if not isinstance(host, str) or not host: + die(f"bottle '{bottle_name}' ssh[{idx}] missing required string field 'Host'") + ident = raw.get("IdentityFile") + if not isinstance(ident, str) or not ident: + die( + f"bottle '{bottle_name}' ssh '{host}' missing required string field " + f"'IdentityFile'" + ) + hostname = _opt_str(raw.get("Hostname"), f"bottle '{bottle_name}' ssh '{host}' Hostname") + user = _opt_str(raw.get("User"), f"bottle '{bottle_name}' ssh '{host}' User") + port = _opt_port(raw.get("Port"), f"bottle '{bottle_name}' ssh '{host}' Port") + khk = _opt_str( + raw.get("KnownHostKey"), + f"bottle '{bottle_name}' ssh '{host}' KnownHostKey", + ) + return cls( + Host=host, + IdentityFile=ident, + Hostname=hostname, + User=user, + Port=port, + KnownHostKey=khk, ) -def _load_json_or_die(path: Path) -> dict[Any, Any]: - """Load and shallow-check. Confirms bottles/agents (when present) - are JSON objects so the merge in manifest_resolve cannot TypeError - before manifest_validate runs.""" - doc: Any = None +@dataclass(frozen=True) +class BottleEgress: + allowlist: tuple[str, ...] = () + + @classmethod + def _from_dict(cls, bottle_name: str, raw: Any) -> "BottleEgress": + if not isinstance(raw, dict): + die( + f"bottle '{bottle_name}' egress must be a JSON object " + f"(was {_json_type(raw)})" + ) + allow = raw.get("allowlist") + if allow is None: + return cls() + if not isinstance(allow, list): + die( + f"bottle '{bottle_name}' egress.allowlist must be an array " + f"(was {_json_type(allow)})" + ) + for i, host in enumerate(allow): + if not isinstance(host, str): + die( + f"bottle '{bottle_name}' egress.allowlist[{i}] must be a string " + f"(was {_json_type(host)})" + ) + return cls(allowlist=tuple(allow)) + + +@dataclass(frozen=True) +class Bottle: + env: Mapping[str, str] = field(default_factory=dict) + ssh: tuple[SshEntry, ...] = () + egress: BottleEgress = field(default_factory=BottleEgress) + runtime: Runtime = "runc" + + @classmethod + def _from_dict(cls, name: str, raw: Any) -> "Bottle": + if not isinstance(raw, dict): + die(f"bottle '{name}' must be a JSON object (was {_json_type(raw)})") + + env_raw = raw.get("env") + env: dict[str, str] = {} + if env_raw is not None: + if not isinstance(env_raw, dict): + die(f"bottle '{name}' env must be a JSON object (was {_json_type(env_raw)})") + for var, value in env_raw.items(): + if not isinstance(value, str): + die( + f"env entry {var} in bottle '{name}' must be a JSON string " + f"(was {_json_type(value)}). Use \"?\" for prompt-at-runtime." + ) + env[var] = value + + ssh_raw = raw.get("ssh") + ssh: tuple[SshEntry, ...] = () + if ssh_raw is not None: + if not isinstance(ssh_raw, list): + die(f"bottle '{name}' ssh must be an array (was {_json_type(ssh_raw)})") + ssh = tuple( + SshEntry._from_dict(name, i, entry) for i, entry in enumerate(ssh_raw) + ) + + egress_raw = raw.get("egress") + egress = ( + BottleEgress._from_dict(name, egress_raw) + if egress_raw is not None + else BottleEgress() + ) + + runtime_raw = raw.get("runtime") + if runtime_raw is None: + runtime: Runtime = "runc" + else: + if not isinstance(runtime_raw, str): + die(f"bottle '{name}' runtime must be a string (was {_json_type(runtime_raw)})") + if runtime_raw not in _SUPPORTED_RUNTIMES: + die( + f"bottle '{name}' runtime '{runtime_raw}' is not supported. " + f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}." + ) + runtime = runtime_raw # type: ignore[assignment] + + return cls(env=env, ssh=ssh, egress=egress, runtime=runtime) + + +@dataclass(frozen=True) +class Agent: + bottle: str + skills: tuple[str, ...] = () + prompt: str = "" + + @classmethod + def _from_dict(cls, name: str, raw: Any, bottle_names: set[str]) -> "Agent": + if not isinstance(raw, dict): + die(f"agent '{name}' must be a JSON object (was {_json_type(raw)})") + + bottle = raw.get("bottle") + if not isinstance(bottle, str) or not bottle: + die(f"agent '{name}' must declare a 'bottle' field naming a defined bottle") + if bottle not in bottle_names: + available = ", ".join(sorted(bottle_names)) or "(none defined)" + die( + f"agent '{name}' references bottle '{bottle}', which is not defined. " + f"Available: {available}" + ) + + skills_raw = raw.get("skills") + skills: tuple[str, ...] = () + if skills_raw is not None: + if not isinstance(skills_raw, list): + die(f"agent '{name}' skills must be an array (was {_json_type(skills_raw)})") + for i, skill in enumerate(skills_raw): + if not isinstance(skill, str): + die( + f"agent '{name}' skills[{i}] must be a string " + f"(was {_json_type(skill)})" + ) + skills = tuple(skills_raw) + + prompt_raw = raw.get("prompt") + if prompt_raw is None: + prompt = "" + elif isinstance(prompt_raw, str): + prompt = prompt_raw + else: + die(f"agent '{name}' prompt must be a string (was {_json_type(prompt_raw)})") + + return cls(bottle=bottle, skills=skills, prompt=prompt) + + +@dataclass(frozen=True) +class Manifest: + bottles: Mapping[str, Bottle] + agents: Mapping[str, Agent] + + @classmethod + def resolve(cls, cwd: str) -> "Manifest": + """Look for claude-bottle.json in and in $HOME, deep-merge + them (cwd entries override home entries on key conflict for both + bottles and agents), then validate. Dies if neither file is + found, either is invalid JSON, or the merged shape violates the + schema.""" + cwd_file = Path(cwd) / "claude-bottle.json" + home_file = Path(os.environ["HOME"]) / "claude-bottle.json" + + cwd_doc = _load_json_or_die(cwd_file) if cwd_file.is_file() else None + home_doc = _load_json_or_die(home_file) if home_file.is_file() else None + + if cwd_doc is None and home_doc is None: + die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}") + + h = home_doc or {} + c = cwd_doc or {} + merged: dict[str, Any] = { + "bottles": {**(h.get("bottles") or {}), **(c.get("bottles") or {})}, + "agents": {**(h.get("agents") or {}), **(c.get("agents") or {})}, + } + return cls.from_json_obj(merged) + + @classmethod + def from_json_obj(cls, obj: Any) -> "Manifest": + """Validate and build a Manifest from a raw JSON-like dict.""" + if not isinstance(obj, dict): + die(f"manifest must be a JSON object (was {_json_type(obj)})") + + raw_bottles = obj.get("bottles") or {} + raw_agents = obj.get("agents") or {} + if not isinstance(raw_bottles, dict): + die(f"manifest 'bottles' must be a JSON object (was {_json_type(raw_bottles)})") + if not isinstance(raw_agents, dict): + die(f"manifest 'agents' must be a JSON object (was {_json_type(raw_agents)})") + + bottles = {n: Bottle._from_dict(n, b) for n, b in raw_bottles.items()} + bottle_names = set(bottles.keys()) + agents = {n: Agent._from_dict(n, a, bottle_names) for n, a in raw_agents.items()} + return cls(bottles=bottles, agents=agents) + + def has_agent(self, name: str) -> bool: + return name in self.agents + + def require_agent(self, name: str) -> None: + if self.has_agent(name): + return + available = ", ".join(self.agents.keys()) + if available: + die(f"agent '{name}' not defined in claude-bottle.json. Available: {available}") + die(f"agent '{name}' not defined in claude-bottle.json (manifest is empty).") + + def has_bottle(self, name: str) -> bool: + return name in self.bottles + + def require_bottle(self, name: str) -> None: + if self.has_bottle(name): + return + available = ", ".join(self.bottles.keys()) + if available: + die( + f"bottle '{name}' not defined in claude-bottle.json. " + f"Available bottles: {available}" + ) + die(f"bottle '{name}' not defined in claude-bottle.json (no bottles defined).") + + def bottle_for(self, agent_name: str) -> Bottle: + """Resolve the Bottle the named agent references. The validator + guarantees both lookups succeed for a manifest built via + from_json_obj.""" + return self.bottles[self.agents[agent_name].bottle] + + +def _load_json_or_die(path: Path) -> dict[str, Any]: try: with path.open() as f: doc = json.load(f) @@ -225,90 +301,32 @@ def _load_json_or_die(path: Path) -> dict[Any, Any]: die(f"claude-bottle.json at {path} is not valid JSON") if not isinstance(doc, dict): die(f"claude-bottle.json at {path} must be a JSON object") - return cast(dict[Any, Any], doc) + return doc -def manifest_has_agent(manifest: Manifest, name: str) -> bool: - return name in manifest["agents"] +def _opt_str(value: Any, label: str) -> str: + if value is None: + return "" + if not isinstance(value, str): + die(f"{label} must be a string (was {_json_type(value)})") + return value -def manifest_require_agent(manifest: Manifest, name: str) -> None: - """Like has_agent but dies with the available agent names listed.""" - if manifest_has_agent(manifest, name): - return - available = ", ".join(manifest["agents"].keys()) - if available: - die(f"agent '{name}' not defined in claude-bottle.json. Available: {available}") - else: - die(f"agent '{name}' not defined in claude-bottle.json (manifest is empty).") - - -def manifest_env_names(manifest: Manifest, name: str) -> list[str]: - """Names (not values) of bottles[agent.bottle].env, in declaration - order. Empty if the agent's bottle has no env.""" - bottle_name = manifest["agents"][name]["bottle"] - return list(manifest["bottles"][bottle_name].get("env", {}).keys()) - - -def manifest_env_entry(manifest: Manifest, agent: str, var: str) -> str: - """Raw string value of one env entry. Used by env_resolve, which - classifies the result by sentinel.""" - bottle_name = manifest["agents"][agent]["bottle"] - return manifest["bottles"][bottle_name]["env"][var] - - -def manifest_skills(manifest: Manifest, name: str) -> list[str]: - return list(manifest["agents"][name].get("skills", [])) - - -def manifest_prompt(manifest: Manifest, name: str) -> str: - return manifest["agents"][name].get("prompt", "") - - -def manifest_agent_bottle(manifest: Manifest, name: str) -> str: - return manifest["agents"][name]["bottle"] - - -def manifest_has_bottle(manifest: Manifest, bottle_name: str) -> bool: - return bottle_name in manifest["bottles"] - - -def manifest_require_bottle(manifest: Manifest, bottle_name: str) -> None: - if manifest_has_bottle(manifest, bottle_name): - return - available = ", ".join(manifest["bottles"].keys()) - if available: - die( - f"bottle '{bottle_name}' not defined in claude-bottle.json. " - f"Available bottles: {available}" - ) - else: - die(f"bottle '{bottle_name}' not defined in claude-bottle.json (no bottles defined).") - - -def manifest_bottle_ssh(manifest: Manifest, bottle_name: str) -> list[SshEntry]: - return list(manifest["bottles"][bottle_name].get("ssh", [])) - - -def manifest_bottle_runtime(manifest: Manifest, bottle_name: str) -> str: - """Container runtime for the bottle's agent container. Returns - "runc" (Docker default) or "runsc" (gVisor opt-in).""" - return manifest["bottles"][bottle_name].get("runtime", "runc") - - -def manifest_bottle_egress_allowlist(manifest: Manifest, bottle_name: str) -> list[str]: - """Hostnames in bottles[bottle_name].egress.allowlist.""" - return list(manifest["bottles"][bottle_name].get("egress", {}).get("allowlist", [])) - - -def manifest_ssh(manifest: Manifest, agent_name: str) -> list[SshEntry]: - """SSH entries resolved via the agent's "bottle" field.""" - bottle_name = manifest_agent_bottle(manifest, agent_name) - return manifest_bottle_ssh(manifest, bottle_name) +def _opt_port(value: Any, label: str) -> str: + """Port accepts string or int (JSON-friendly) and is normalized to str.""" + if value is None: + return "" + if isinstance(value, bool): + die(f"{label} must be a string or number (was boolean)") + if isinstance(value, int): + return str(value) + if isinstance(value, str): + return value + die(f"{label} must be a string or number (was {_json_type(value)})") def _json_type(value: Any) -> str: - """Mirror jq's type names for parity with the bash error messages.""" + """Mirror jq's type names for parity with the original bash error messages.""" if value is None: return "null" if isinstance(value, bool): diff --git a/claude_bottle/pipelock.py b/claude_bottle/pipelock.py index b4b2256..e6f2bb9 100644 --- a/claude_bottle/pipelock.py +++ b/claude_bottle/pipelock.py @@ -18,7 +18,7 @@ import subprocess from pathlib import Path from .log import die, info, warn -from .manifest import Manifest, manifest_bottle_egress_allowlist, manifest_bottle_ssh +from .manifest import Manifest # Pipelock image, pinned by digest. The digest is the multi-arch image # index for ghcr.io/luckypipewrench/pipelock:2.3.0. @@ -58,23 +58,12 @@ def pipelock_proxy_host_port(slug: str) -> str: def pipelock_bottle_allowlist(manifest: Manifest, bottle_name: str) -> list[str]: - """Hostnames in bottles[].egress.allowlist. Validates - that each entry is a string.""" - raw = manifest_bottle_egress_allowlist(manifest, bottle_name) - for entry in raw: - if not isinstance(entry, str): - t = _json_type(entry) - die(f"bottle '{bottle_name}' egress.allowlist must contain only strings; found a '{t}' entry.") - return list(raw) + """Hostnames in bottles[].egress.allowlist.""" + return list(manifest.bottles[bottle_name].egress.allowlist) def pipelock_bottle_ssh_hostnames(manifest: Manifest, bottle_name: str) -> list[str]: - out: list[str] = [] - for entry in manifest_bottle_ssh(manifest, bottle_name): - h = entry.get("Hostname") or "" - if h: - out.append(h) - return out + return [e.Hostname for e in manifest.bottles[bottle_name].ssh if e.Hostname] _IPV4_RE = re.compile(r"^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$") @@ -256,19 +245,3 @@ def pipelock_stop(slug: str) -> None: stderr=subprocess.DEVNULL, ).returncode != 0: warn(f"failed to remove pipelock sidecar {name}; clean up with 'docker rm -f {name}'") - - -def _json_type(value: object) -> str: - if value is None: - return "null" - if isinstance(value, bool): - return "boolean" - if isinstance(value, (int, float)): - return "number" - if isinstance(value, str): - return "string" - if isinstance(value, list): - return "array" - if isinstance(value, dict): - return "object" - return type(value).__name__ diff --git a/claude_bottle/ssh.py b/claude_bottle/ssh.py index 3c37e16..634958f 100644 --- a/claude_bottle/ssh.py +++ b/claude_bottle/ssh.py @@ -36,31 +36,26 @@ from __future__ import annotations import os import subprocess from pathlib import Path -from typing import Any +from typing import Sequence from .log import die, info +from .manifest import SshEntry -def ssh_validate_entries(entries: list[dict[str, Any]]) -> None: - """Each entry must have Host + IdentityFile, and the IdentityFile - must exist on the host (after expanding leading ~).""" +def ssh_validate_entries(entries: Sequence[SshEntry]) -> None: + """The IdentityFile must exist on the host (after expanding leading ~). + Host and IdentityFile shape are already enforced by Manifest validation.""" for entry in entries: - name = entry.get("Host", "") - key = entry.get("IdentityFile", "") - if not name: - die(f"ssh entry missing required field 'Host': {entry}") - if not key: - die(f"ssh entry '{name}' missing required field 'IdentityFile'") - key = _expand_tilde(key) + key = _expand_tilde(entry.IdentityFile) if not os.path.isfile(key): - die(f"ssh key file not found for host '{name}': {key}") + die(f"ssh key file not found for host '{entry.Host}': {key}") def ssh_setup( container: str, stage_dir: Path, proxy_host_port: str, - entries: list[dict[str, Any]], + entries: Sequence[SshEntry], ) -> None: """Set up SSH in the container so node can authenticate using each entry's key without the key file being readable by node.""" @@ -91,12 +86,12 @@ def ssh_setup( container_key_paths: list[str] = [] for entry in entries: - name = entry["Host"] - key = _expand_tilde(entry["IdentityFile"]) - hostname = entry["Hostname"] - user = entry["User"] - port = str(entry["Port"]) - known_host_key = entry.get("KnownHostKey", "") + name = entry.Host + key = _expand_tilde(entry.IdentityFile) + hostname = entry.Hostname + user = entry.User + port = entry.Port + known_host_key = entry.KnownHostKey key_basename = os.path.basename(key) container_key_path = f"{keys_dir}/{key_basename}" diff --git a/tests/fixtures.py b/tests/fixtures.py index d4c6432..fae2f66 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -1,4 +1,7 @@ -"""Manifest fixtures for the test suite.""" +"""Manifest fixtures for the test suite. Each fixture returns a built +Manifest dataclass; callers that need the raw JSON shape (e.g. to write +to a file on disk) can build it themselves or call .from_json_obj on +a dict literal in the test.""" from __future__ import annotations @@ -7,9 +10,11 @@ import tempfile from pathlib import Path from typing import Any +from claude_bottle.manifest import Manifest -def fixture_minimal() -> dict[str, Any]: - """One bottle, one agent, no env / ssh / skills.""" + +def fixture_minimal_dict() -> dict[str, Any]: + """One bottle, one agent, no env / ssh / skills. JSON shape.""" return { "bottles": {"dev": {}}, "agents": { @@ -18,8 +23,8 @@ def fixture_minimal() -> dict[str, Any]: } -def fixture_with_egress() -> dict[str, Any]: - """Bottle declares an egress.allowlist.""" +def fixture_with_egress_dict() -> dict[str, Any]: + """Bottle declares an egress.allowlist. JSON shape.""" return { "bottles": { "dev": { @@ -32,9 +37,9 @@ def fixture_with_egress() -> dict[str, Any]: } -def fixture_with_ssh() -> dict[str, Any]: +def fixture_with_ssh_dict() -> dict[str, Any]: """Bottle has both an IPv4-literal SSH host (CGNAT) and a hostname host, - exercising both ssrf.ip_allowlist and trusted_domains code paths.""" + exercising both ssrf.ip_allowlist and trusted_domains code paths. JSON shape.""" return { "bottles": { "dev": { @@ -60,8 +65,22 @@ def fixture_with_ssh() -> dict[str, Any]: } +def fixture_minimal() -> Manifest: + return Manifest.from_json_obj(fixture_minimal_dict()) + + +def fixture_with_egress() -> Manifest: + return Manifest.from_json_obj(fixture_with_egress_dict()) + + +def fixture_with_ssh() -> Manifest: + return Manifest.from_json_obj(fixture_with_ssh_dict()) + + def write_fixture(fn) -> Path: - """Write fixture dict to a temp file; return the path. Caller must rm.""" + """Write fixture JSON to a temp file; return the path. Caller must rm. + Accepts a function returning either a dict (JSON shape) or a Manifest; + only the dict form is supported here since we need to serialize.""" f = tempfile.NamedTemporaryFile( mode="w", suffix=".json", delete=False, encoding="utf-8" ) diff --git a/tests/test_manifest_runtime.py b/tests/test_manifest_runtime.py index eeec914..829ce67 100644 --- a/tests/test_manifest_runtime.py +++ b/tests/test_manifest_runtime.py @@ -1,16 +1,18 @@ -"""Unit: bottle runtime — manifest_bottle_runtime returns the configured -runtime (defaulting to runc); manifest_validate rejects unknown values, -non-strings, and empty strings.""" +"""Unit: bottle runtime — Manifest.from_json_obj defaults runtime to runc, +accepts runsc, and rejects unknown values, non-strings, and empty strings.""" import unittest from claude_bottle.log import Die -from claude_bottle.manifest import manifest_bottle_runtime, manifest_validate +from claude_bottle.manifest import Manifest -def _bottle(runtime_value: object | None) -> dict: - """Build a minimal manifest with one bottle whose runtime field is - set (or absent if `runtime_value is _ABSENT`).""" +_ABSENT = object() + + +def _bottle(runtime_value: object) -> dict: + """Build a minimal manifest JSON shape with one bottle whose runtime + field is set (or absent if `runtime_value is _ABSENT`).""" bottle: dict = {} if runtime_value is not _ABSENT: bottle["runtime"] = runtime_value @@ -20,30 +22,30 @@ def _bottle(runtime_value: object | None) -> dict: } -_ABSENT = object() - - class TestManifestBottleRuntime(unittest.TestCase): def test_default_runc_when_absent(self): - self.assertEqual("runc", manifest_bottle_runtime(_bottle(_ABSENT), "dev")) + m = Manifest.from_json_obj(_bottle(_ABSENT)) + self.assertEqual("runc", m.bottles["dev"].runtime) def test_explicit_runc(self): - self.assertEqual("runc", manifest_bottle_runtime(_bottle("runc"), "dev")) + m = Manifest.from_json_obj(_bottle("runc")) + self.assertEqual("runc", m.bottles["dev"].runtime) def test_explicit_runsc(self): - self.assertEqual("runsc", manifest_bottle_runtime(_bottle("runsc"), "dev")) + m = Manifest.from_json_obj(_bottle("runsc")) + self.assertEqual("runsc", m.bottles["dev"].runtime) def test_rejects_unknown_runtime(self): with self.assertRaises(Die): - manifest_validate(_bottle("kata-runtime")) + Manifest.from_json_obj(_bottle("kata-runtime")) def test_rejects_non_string(self): with self.assertRaises(Die): - manifest_validate(_bottle(42)) + Manifest.from_json_obj(_bottle(42)) def test_rejects_empty_string(self): with self.assertRaises(Die): - manifest_validate(_bottle("")) + Manifest.from_json_obj(_bottle("")) if __name__ == "__main__": diff --git a/tests/test_pipelock_allowlist.py b/tests/test_pipelock_allowlist.py index 08911fd..887cfcf 100644 --- a/tests/test_pipelock_allowlist.py +++ b/tests/test_pipelock_allowlist.py @@ -5,6 +5,7 @@ pipelock_bottle_ssh_trusted_domains, pipelock_effective_allowlist.""" import unittest from claude_bottle.log import Die +from claude_bottle.manifest import Manifest from claude_bottle.pipelock import ( pipelock_bottle_allowlist, pipelock_bottle_ssh_hostnames, @@ -32,7 +33,7 @@ class TestBottleAllowlist(unittest.TestCase): "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, } with self.assertRaises(Die): - pipelock_bottle_allowlist(bad, "dev") + Manifest.from_json_obj(bad) class TestSSHHostnames(unittest.TestCase): @@ -54,7 +55,7 @@ class TestSSHHostnames(unittest.TestCase): class TestEffectiveAllowlist(unittest.TestCase): def test_union_and_dedup(self): - manifest = { + manifest = Manifest.from_json_obj({ "bottles": { "dev": { "egress": {"allowlist": ["registry.npmjs.org"]}, @@ -67,7 +68,7 @@ class TestEffectiveAllowlist(unittest.TestCase): } }, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, - } + }) eff = pipelock_effective_allowlist(manifest, "dev") self.assertIn("api.anthropic.com", eff) self.assertIn("registry.npmjs.org", eff) diff --git a/tests/test_pipelock_yaml.py b/tests/test_pipelock_yaml.py index 184bf83..9602458 100644 --- a/tests/test_pipelock_yaml.py +++ b/tests/test_pipelock_yaml.py @@ -7,6 +7,7 @@ import tempfile import unittest from pathlib import Path +from claude_bottle.manifest import Manifest from claude_bottle.pipelock import pipelock_write_yaml from tests.fixtures import fixture_minimal, fixture_with_ssh @@ -50,7 +51,7 @@ class TestPipelockYaml(unittest.TestCase): self.assertIn("100.78.141.42", content) def test_secret_hygiene(self): - manifest = { + manifest = Manifest.from_json_obj({ "bottles": { "dev": { "env": { @@ -61,7 +62,7 @@ class TestPipelockYaml(unittest.TestCase): } }, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, - } + }) yaml_path = self.out_dir / "secret.yaml" pipelock_write_yaml(manifest, "dev", yaml_path) content = yaml_path.read_text() From e9a3de49aff436ccfe28618d4ba649fa477a3553 Mon Sep 17 00:00:00 2001 From: didericis Date: Sun, 10 May 2026 21:34:03 -0400 Subject: [PATCH 3/4] fix(types): make manifest.py clean under pyright strict - log.die() typed NoReturn so pyright knows it terminates control flow (was returning the unreachable Die instance type). - manifest.py: raw inputs typed object (not Any) and narrowed via a new _as_json_object helper that validates str keys and returns dict[str, object]. Eliminates the Unknown cascade through .get() calls under strict. - _from_dict classmethods renamed to from_dict so cross-class construction (Bottle.from_dict from Manifest.from_json_obj, etc.) doesn't trip reportPrivateUsage. - _SUPPORTED_RUNTIMES typed tuple[Runtime, ...] so the membership check narrows runtime_raw to Literal["runc", "runsc"] and the # type: ignore[assignment] is no longer needed. - Bottle.env uses a typed _empty_str_dict factory; bare dict resolves to dict[Unknown, Unknown] under strict. Co-Authored-By: Claude Opus 4.7 --- claude_bottle/log.py | 3 +- claude_bottle/manifest.py | 166 ++++++++++++++++++++++---------------- 2 files changed, 97 insertions(+), 72 deletions(-) diff --git a/claude_bottle/log.py b/claude_bottle/log.py index b666e1e..4fc93cf 100644 --- a/claude_bottle/log.py +++ b/claude_bottle/log.py @@ -3,6 +3,7 @@ from __future__ import annotations import sys +from typing import NoReturn def info(msg: str) -> None: @@ -18,6 +19,6 @@ class Die(SystemExit): fatal exit from an unrelated SystemExit.""" -def die(msg: str) -> "Die": +def die(msg: str) -> NoReturn: print(f"claude-bottle: error: {msg}", file=sys.stderr) raise Die(1) diff --git a/claude_bottle/manifest.py b/claude_bottle/manifest.py index fc87a76..5f30d35 100644 --- a/claude_bottle/manifest.py +++ b/claude_bottle/manifest.py @@ -33,13 +33,17 @@ import json import os from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Literal, Mapping +from typing import Literal, Mapping, cast from .log import die -_SUPPORTED_RUNTIMES: tuple[str, ...] = ("runc", "runsc") Runtime = Literal["runc", "runsc"] +_SUPPORTED_RUNTIMES: tuple[Runtime, ...] = ("runc", "runsc") + + +def _empty_str_dict() -> dict[str, str]: + return {} @dataclass(frozen=True) @@ -52,26 +56,22 @@ class SshEntry: KnownHostKey: str = "" @classmethod - def _from_dict(cls, bottle_name: str, idx: int, raw: Any) -> "SshEntry": - if not isinstance(raw, dict): - die( - f"bottle '{bottle_name}' ssh[{idx}] must be a JSON object " - f"(was {_json_type(raw)})" - ) - host = raw.get("Host") + def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "SshEntry": + d = _as_json_object(raw, f"bottle '{bottle_name}' ssh[{idx}]") + host = d.get("Host") if not isinstance(host, str) or not host: die(f"bottle '{bottle_name}' ssh[{idx}] missing required string field 'Host'") - ident = raw.get("IdentityFile") + ident = d.get("IdentityFile") if not isinstance(ident, str) or not ident: die( f"bottle '{bottle_name}' ssh '{host}' missing required string field " f"'IdentityFile'" ) - hostname = _opt_str(raw.get("Hostname"), f"bottle '{bottle_name}' ssh '{host}' Hostname") - user = _opt_str(raw.get("User"), f"bottle '{bottle_name}' ssh '{host}' User") - port = _opt_port(raw.get("Port"), f"bottle '{bottle_name}' ssh '{host}' Port") + hostname = _opt_str(d.get("Hostname"), f"bottle '{bottle_name}' ssh '{host}' Hostname") + user = _opt_str(d.get("User"), f"bottle '{bottle_name}' ssh '{host}' User") + port = _opt_port(d.get("Port"), f"bottle '{bottle_name}' ssh '{host}' Port") khk = _opt_str( - raw.get("KnownHostKey"), + d.get("KnownHostKey"), f"bottle '{bottle_name}' ssh '{host}' KnownHostKey", ) return cls( @@ -89,13 +89,9 @@ class BottleEgress: allowlist: tuple[str, ...] = () @classmethod - def _from_dict(cls, bottle_name: str, raw: Any) -> "BottleEgress": - if not isinstance(raw, dict): - die( - f"bottle '{bottle_name}' egress must be a JSON object " - f"(was {_json_type(raw)})" - ) - allow = raw.get("allowlist") + def from_dict(cls, bottle_name: str, raw: object) -> "BottleEgress": + d = _as_json_object(raw, f"bottle '{bottle_name}' egress") + allow = d.get("allowlist") if allow is None: return cls() if not isinstance(allow, list): @@ -103,33 +99,34 @@ class BottleEgress: f"bottle '{bottle_name}' egress.allowlist must be an array " f"(was {_json_type(allow)})" ) - for i, host in enumerate(allow): + items: list[str] = [] + allow_list = cast(list[object], allow) + for i, host in enumerate(allow_list): if not isinstance(host, str): die( f"bottle '{bottle_name}' egress.allowlist[{i}] must be a string " f"(was {_json_type(host)})" ) - return cls(allowlist=tuple(allow)) + items.append(host) + return cls(allowlist=tuple(items)) @dataclass(frozen=True) class Bottle: - env: Mapping[str, str] = field(default_factory=dict) + env: Mapping[str, str] = field(default_factory=_empty_str_dict) ssh: tuple[SshEntry, ...] = () egress: BottleEgress = field(default_factory=BottleEgress) runtime: Runtime = "runc" @classmethod - def _from_dict(cls, name: str, raw: Any) -> "Bottle": - if not isinstance(raw, dict): - die(f"bottle '{name}' must be a JSON object (was {_json_type(raw)})") + def from_dict(cls, name: str, raw: object) -> "Bottle": + d = _as_json_object(raw, f"bottle '{name}'") - env_raw = raw.get("env") env: dict[str, str] = {} + env_raw = d.get("env") if env_raw is not None: - if not isinstance(env_raw, dict): - die(f"bottle '{name}' env must be a JSON object (was {_json_type(env_raw)})") - for var, value in env_raw.items(): + env_dict = _as_json_object(env_raw, f"bottle '{name}' env") + for var, value in env_dict.items(): if not isinstance(value, str): die( f"env entry {var} in bottle '{name}' must be a JSON string " @@ -137,25 +134,28 @@ class Bottle: ) env[var] = value - ssh_raw = raw.get("ssh") ssh: tuple[SshEntry, ...] = () + ssh_raw = d.get("ssh") if ssh_raw is not None: if not isinstance(ssh_raw, list): die(f"bottle '{name}' ssh must be an array (was {_json_type(ssh_raw)})") + ssh_list = cast(list[object], ssh_raw) ssh = tuple( - SshEntry._from_dict(name, i, entry) for i, entry in enumerate(ssh_raw) + SshEntry.from_dict(name, i, entry) + for i, entry in enumerate(ssh_list) ) - egress_raw = raw.get("egress") + egress_raw = d.get("egress") egress = ( - BottleEgress._from_dict(name, egress_raw) + BottleEgress.from_dict(name, egress_raw) if egress_raw is not None else BottleEgress() ) - runtime_raw = raw.get("runtime") + runtime_raw = d.get("runtime") + runtime: Runtime if runtime_raw is None: - runtime: Runtime = "runc" + runtime = "runc" else: if not isinstance(runtime_raw, str): die(f"bottle '{name}' runtime must be a string (was {_json_type(runtime_raw)})") @@ -164,7 +164,7 @@ class Bottle: f"bottle '{name}' runtime '{runtime_raw}' is not supported. " f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}." ) - runtime = runtime_raw # type: ignore[assignment] + runtime = runtime_raw return cls(env=env, ssh=ssh, egress=egress, runtime=runtime) @@ -176,11 +176,10 @@ class Agent: prompt: str = "" @classmethod - def _from_dict(cls, name: str, raw: Any, bottle_names: set[str]) -> "Agent": - if not isinstance(raw, dict): - die(f"agent '{name}' must be a JSON object (was {_json_type(raw)})") + def from_dict(cls, name: str, raw: object, bottle_names: set[str]) -> "Agent": + d = _as_json_object(raw, f"agent '{name}'") - bottle = raw.get("bottle") + bottle = d.get("bottle") if not isinstance(bottle, str) or not bottle: die(f"agent '{name}' must declare a 'bottle' field naming a defined bottle") if bottle not in bottle_names: @@ -190,20 +189,23 @@ class Agent: f"Available: {available}" ) - skills_raw = raw.get("skills") skills: tuple[str, ...] = () + skills_raw = d.get("skills") if skills_raw is not None: if not isinstance(skills_raw, list): die(f"agent '{name}' skills must be an array (was {_json_type(skills_raw)})") - for i, skill in enumerate(skills_raw): + collected: list[str] = [] + skills_list = cast(list[object], skills_raw) + for i, skill in enumerate(skills_list): if not isinstance(skill, str): die( f"agent '{name}' skills[{i}] must be a string " f"(was {_json_type(skill)})" ) - skills = tuple(skills_raw) + collected.append(skill) + skills = tuple(collected) - prompt_raw = raw.get("prompt") + prompt_raw = d.get("prompt") if prompt_raw is None: prompt = "" elif isinstance(prompt_raw, str): @@ -235,30 +237,32 @@ class Manifest: if cwd_doc is None and home_doc is None: die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}") - h = home_doc or {} - c = cwd_doc or {} - merged: dict[str, Any] = { - "bottles": {**(h.get("bottles") or {}), **(c.get("bottles") or {})}, - "agents": {**(h.get("agents") or {}), **(c.get("agents") or {})}, + h: dict[str, object] = home_doc if home_doc is not None else {} + c: dict[str, object] = cwd_doc if cwd_doc is not None else {} + h_bottles = _section_dict(h.get("bottles"), "bottles") + c_bottles = _section_dict(c.get("bottles"), "bottles") + h_agents = _section_dict(h.get("agents"), "agents") + c_agents = _section_dict(c.get("agents"), "agents") + merged: dict[str, object] = { + "bottles": {**h_bottles, **c_bottles}, + "agents": {**h_agents, **c_agents}, } return cls.from_json_obj(merged) @classmethod - def from_json_obj(cls, obj: Any) -> "Manifest": + def from_json_obj(cls, obj: object) -> "Manifest": """Validate and build a Manifest from a raw JSON-like dict.""" - if not isinstance(obj, dict): - die(f"manifest must be a JSON object (was {_json_type(obj)})") + d = _as_json_object(obj, "manifest") + raw_bottles = _section_dict(d.get("bottles"), "manifest 'bottles'") + raw_agents = _section_dict(d.get("agents"), "manifest 'agents'") - raw_bottles = obj.get("bottles") or {} - raw_agents = obj.get("agents") or {} - if not isinstance(raw_bottles, dict): - die(f"manifest 'bottles' must be a JSON object (was {_json_type(raw_bottles)})") - if not isinstance(raw_agents, dict): - die(f"manifest 'agents' must be a JSON object (was {_json_type(raw_agents)})") - - bottles = {n: Bottle._from_dict(n, b) for n, b in raw_bottles.items()} + bottles: dict[str, Bottle] = { + n: Bottle.from_dict(n, b) for n, b in raw_bottles.items() + } bottle_names = set(bottles.keys()) - agents = {n: Agent._from_dict(n, a, bottle_names) for n, a in raw_agents.items()} + agents: dict[str, Agent] = { + n: Agent.from_dict(n, a, bottle_names) for n, a in raw_agents.items() + } return cls(bottles=bottles, agents=agents) def has_agent(self, name: str) -> bool: @@ -293,18 +297,38 @@ class Manifest: return self.bottles[self.agents[agent_name].bottle] -def _load_json_or_die(path: Path) -> dict[str, Any]: +def _as_json_object(value: object, label: str) -> dict[str, object]: + """Assert that `value` is a JSON object (str-keyed dict) and return + a view typed as `dict[str, object]` so downstream `.get(...)` calls + have a typed surface.""" + if not isinstance(value, dict): + die(f"{label} must be a JSON object (was {_json_type(value)})") + items = cast(dict[object, object], value) + out: dict[str, object] = {} + for k, v in items.items(): + if not isinstance(k, str): + die(f"{label} keys must be strings (found {_json_type(k)})") + out[k] = v + return out + + +def _section_dict(value: object, label: str) -> dict[str, object]: + """Like _as_json_object but treats absent/null as an empty section.""" + if value is None: + return {} + return _as_json_object(value, label) + + +def _load_json_or_die(path: Path) -> dict[str, object]: try: with path.open() as f: - doc = json.load(f) + doc: object = json.load(f) except json.JSONDecodeError: die(f"claude-bottle.json at {path} is not valid JSON") - if not isinstance(doc, dict): - die(f"claude-bottle.json at {path} must be a JSON object") - return doc + return _as_json_object(doc, f"claude-bottle.json at {path}") -def _opt_str(value: Any, label: str) -> str: +def _opt_str(value: object, label: str) -> str: if value is None: return "" if not isinstance(value, str): @@ -312,7 +336,7 @@ def _opt_str(value: Any, label: str) -> str: return value -def _opt_port(value: Any, label: str) -> str: +def _opt_port(value: object, label: str) -> str: """Port accepts string or int (JSON-friendly) and is normalized to str.""" if value is None: return "" @@ -325,7 +349,7 @@ def _opt_port(value: Any, label: str) -> str: die(f"{label} must be a string or number (was {_json_type(value)})") -def _json_type(value: Any) -> str: +def _json_type(value: object) -> str: """Mirror jq's type names for parity with the original bash error messages.""" if value is None: return "null" From 9343f6f21d3ca0f5c84fd5a4dba4e34d364a4488 Mon Sep 17 00:00:00 2001 From: didericis Date: Sun, 10 May 2026 21:36:38 -0400 Subject: [PATCH 4/4] refactor(manifest): drop _json_type, use type(x).__name__ in error messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The jq-style mapping (bool→"boolean", list→"array", None→"null", etc.) existed only to match the original bash error wording. Not worth the extra function; Python's native type names are clear enough. Co-Authored-By: Claude Opus 4.7 --- claude_bottle/manifest.py | 41 ++++++++++++--------------------------- 1 file changed, 12 insertions(+), 29 deletions(-) diff --git a/claude_bottle/manifest.py b/claude_bottle/manifest.py index 5f30d35..20b6975 100644 --- a/claude_bottle/manifest.py +++ b/claude_bottle/manifest.py @@ -97,7 +97,7 @@ class BottleEgress: if not isinstance(allow, list): die( f"bottle '{bottle_name}' egress.allowlist must be an array " - f"(was {_json_type(allow)})" + f"(was {type(allow).__name__})" ) items: list[str] = [] allow_list = cast(list[object], allow) @@ -105,7 +105,7 @@ class BottleEgress: if not isinstance(host, str): die( f"bottle '{bottle_name}' egress.allowlist[{i}] must be a string " - f"(was {_json_type(host)})" + f"(was {type(host).__name__})" ) items.append(host) return cls(allowlist=tuple(items)) @@ -130,7 +130,7 @@ class Bottle: if not isinstance(value, str): die( f"env entry {var} in bottle '{name}' must be a JSON string " - f"(was {_json_type(value)}). Use \"?\" for prompt-at-runtime." + f"(was {type(value).__name__}). Use \"?\" for prompt-at-runtime." ) env[var] = value @@ -138,7 +138,7 @@ class Bottle: ssh_raw = d.get("ssh") if ssh_raw is not None: if not isinstance(ssh_raw, list): - die(f"bottle '{name}' ssh must be an array (was {_json_type(ssh_raw)})") + die(f"bottle '{name}' ssh must be an array (was {type(ssh_raw).__name__})") ssh_list = cast(list[object], ssh_raw) ssh = tuple( SshEntry.from_dict(name, i, entry) @@ -158,7 +158,7 @@ class Bottle: runtime = "runc" else: if not isinstance(runtime_raw, str): - die(f"bottle '{name}' runtime must be a string (was {_json_type(runtime_raw)})") + die(f"bottle '{name}' runtime must be a string (was {type(runtime_raw).__name__})") if runtime_raw not in _SUPPORTED_RUNTIMES: die( f"bottle '{name}' runtime '{runtime_raw}' is not supported. " @@ -193,14 +193,14 @@ class Agent: skills_raw = d.get("skills") if skills_raw is not None: if not isinstance(skills_raw, list): - die(f"agent '{name}' skills must be an array (was {_json_type(skills_raw)})") + die(f"agent '{name}' skills must be an array (was {type(skills_raw).__name__})") collected: list[str] = [] skills_list = cast(list[object], skills_raw) for i, skill in enumerate(skills_list): if not isinstance(skill, str): die( f"agent '{name}' skills[{i}] must be a string " - f"(was {_json_type(skill)})" + f"(was {type(skill).__name__})" ) collected.append(skill) skills = tuple(collected) @@ -211,7 +211,7 @@ class Agent: elif isinstance(prompt_raw, str): prompt = prompt_raw else: - die(f"agent '{name}' prompt must be a string (was {_json_type(prompt_raw)})") + die(f"agent '{name}' prompt must be a string (was {type(prompt_raw).__name__})") return cls(bottle=bottle, skills=skills, prompt=prompt) @@ -302,12 +302,12 @@ def _as_json_object(value: object, label: str) -> dict[str, object]: a view typed as `dict[str, object]` so downstream `.get(...)` calls have a typed surface.""" if not isinstance(value, dict): - die(f"{label} must be a JSON object (was {_json_type(value)})") + die(f"{label} must be a JSON object (was {type(value).__name__})") items = cast(dict[object, object], value) out: dict[str, object] = {} for k, v in items.items(): if not isinstance(k, str): - die(f"{label} keys must be strings (found {_json_type(k)})") + die(f"{label} keys must be strings (found {type(k).__name__})") out[k] = v return out @@ -332,7 +332,7 @@ def _opt_str(value: object, label: str) -> str: if value is None: return "" if not isinstance(value, str): - die(f"{label} must be a string (was {_json_type(value)})") + die(f"{label} must be a string (was {type(value).__name__})") return value @@ -346,21 +346,4 @@ def _opt_port(value: object, label: str) -> str: return str(value) if isinstance(value, str): return value - die(f"{label} must be a string or number (was {_json_type(value)})") - - -def _json_type(value: object) -> str: - """Mirror jq's type names for parity with the original bash error messages.""" - if value is None: - return "null" - if isinstance(value, bool): - return "boolean" - if isinstance(value, (int, float)): - return "number" - if isinstance(value, str): - return "string" - if isinstance(value, list): - return "array" - if isinstance(value, dict): - return "object" - return type(value).__name__ + die(f"{label} must be a string or number (was {type(value).__name__})")