diff --git a/claude_bottle/cli/info.py b/claude_bottle/cli/info.py index 77f2545..b479bd0 100644 --- a/claude_bottle/cli/info.py +++ b/claude_bottle/cli/info.py @@ -5,15 +5,7 @@ from __future__ import annotations import argparse from ..log import info -from ..manifest import ( - manifest_agent_bottle, - manifest_env_names, - manifest_prompt, - manifest_require_agent, - manifest_resolve, - manifest_skills, - manifest_ssh, -) +from ..manifest import Manifest from ._common import PROG, USER_CWD @@ -22,39 +14,33 @@ def cmd_info(argv: list[str]) -> int: parser.add_argument("name", help="agent name defined in claude-bottle.json") args = parser.parse_args(argv) - manifest = manifest_resolve(USER_CWD) - manifest_require_agent(manifest, args.name) + manifest = Manifest.resolve(USER_CWD) + manifest.require_agent(args.name) - env_names = manifest_env_names(manifest, args.name) - skill_names = manifest_skills(manifest, args.name) - prompt_content = manifest_prompt(manifest, args.name) - prompt_first_line = prompt_content.splitlines()[0] if prompt_content else "" - - bottle_name = manifest_agent_bottle(manifest, args.name) - ssh_entries = manifest_ssh(manifest, args.name) + agent = manifest.agents[args.name] + bottle = manifest.bottle_for(args.name) + env_names = list(bottle.env.keys()) + prompt_first_line = agent.prompt.splitlines()[0] if agent.prompt else "" print() info(f"agent : {args.name}") info(f"env (names only): {', '.join(env_names) if env_names else '(none)'}") - info(f"skills : {' '.join(skill_names) if skill_names else '(none)'}") + info(f"skills : {' '.join(agent.skills) if agent.skills else '(none)'}") info( - f"prompt : {len(prompt_content)} chars; " + f"prompt : {len(agent.prompt)} chars; " f"first line: {prompt_first_line or '(empty)'}" ) - if bottle_name: - info(f"bottle : {bottle_name}") - if ssh_entries: - for e in ssh_entries: - info( - f" ssh host : {e.get('Host')} " - f"(Hostname={e.get('Hostname')}, User={e.get('User')}, " - f"Port={e.get('Port')}, IdentityFile={e.get('IdentityFile')})" - ) - if e.get("KnownHostKey"): - info(f" KnownHostKey: {e['KnownHostKey']}") - else: - info(" ssh hosts : (none)") + info(f"bottle : {agent.bottle}") + if bottle.ssh: + for e in bottle.ssh: + info( + f" ssh host : {e.Host} " + f"(Hostname={e.Hostname}, User={e.User}, " + f"Port={e.Port}, IdentityFile={e.IdentityFile})" + ) + if e.KnownHostKey: + info(f" KnownHostKey: {e.KnownHostKey}") else: - info("bottle : (none)") + info(" ssh hosts : (none)") print() return 0 diff --git a/claude_bottle/cli/list.py b/claude_bottle/cli/list.py index 06f2c97..a60143b 100644 --- a/claude_bottle/cli/list.py +++ b/claude_bottle/cli/list.py @@ -7,7 +7,7 @@ import subprocess from .. import docker as docker_mod from ..log import info -from ..manifest import manifest_resolve +from ..manifest import Manifest from ._common import PROG, USER_CWD @@ -17,8 +17,8 @@ def cmd_list(argv: list[str]) -> int: args = parser.parse_args(argv) if args.scope == "available": - manifest = manifest_resolve(USER_CWD) - for name in (manifest.get("agents") or {}).keys(): + manifest = Manifest.resolve(USER_CWD) + for name in manifest.agents.keys(): print(name) return 0 diff --git a/claude_bottle/cli/start.py b/claude_bottle/cli/start.py index 5cca959..2685df6 100644 --- a/claude_bottle/cli/start.py +++ b/claude_bottle/cli/start.py @@ -19,17 +19,7 @@ from .. import skills as skills_mod from .. import ssh as ssh_mod from ..env_resolve import env_resolve from ..log import die, info -from ..manifest import ( - manifest_agent_bottle, - manifest_bottle_runtime, - manifest_env_names, - manifest_prompt, - manifest_require_agent, - manifest_require_bottle, - manifest_resolve, - manifest_skills, - manifest_ssh, -) +from ..manifest import Manifest from ._common import PROG, REPO_DIR, USER_CWD, read_tty_line @@ -57,8 +47,11 @@ def cmd_start(argv: list[str]) -> int: runtime_image = derived_image docker_mod.require_docker() - manifest = manifest_resolve(USER_CWD) - manifest_require_agent(manifest, name) + manifest = Manifest.resolve(USER_CWD) + manifest.require_agent(name) + agent = manifest.agents[name] + bottle_name = agent.bottle + bottle = manifest.bottle_for(name) container = pinned_container or default_container suffix = 2 @@ -81,7 +74,7 @@ def cmd_start(argv: list[str]) -> int: ) # --- Plan resolution (host-only, no container yet) --- - env_names = manifest_env_names(manifest, name) + env_names = list(bottle.env.keys()) # CLAUDE_BOTTLE_OAUTH_TOKEN → CLAUDE_CODE_OAUTH_TOKEN forwarding. # Host-side token is always forwarded so every container can authenticate. @@ -90,23 +83,14 @@ def cmd_start(argv: list[str]) -> int: if forward_oauth_token: display_env_names.append("CLAUDE_CODE_OAUTH_TOKEN") - skill_names = manifest_skills(manifest, name) - if skill_names: - skills_mod.skills_validate_all(skill_names) + if agent.skills: + skills_mod.skills_validate_all(list(agent.skills)) - bottle_name = manifest_agent_bottle(manifest, name) - if not bottle_name: - die( - f"agent '{name}' has no 'bottle' field. " - f"Add a bottle association to this agent in claude-bottle.json." - ) - manifest_require_bottle(manifest, bottle_name) - - runtime = manifest_bottle_runtime(manifest, bottle_name) + runtime = bottle.runtime if runtime == "runsc": docker_mod.require_runsc() - ssh_entries = manifest_ssh(manifest, name) + ssh_entries = bottle.ssh if ssh_entries: ssh_mod.ssh_validate_entries(ssh_entries) @@ -153,7 +137,7 @@ def cmd_start(argv: list[str]) -> int: env_resolve(manifest, name, env_file, args_file) - prompt_content = manifest_prompt(manifest, name) + prompt_content = agent.prompt prompt_file.write_text(prompt_content) prompt_first_line = prompt_content.splitlines()[0] if prompt_content else "" @@ -169,11 +153,11 @@ def cmd_start(argv: list[str]) -> int: "env (names only): " + (", ".join(display_env_names) if display_env_names else "(none)") ) - info("skills : " + (" ".join(skill_names) if skill_names else "(none)")) + info("skills : " + (" ".join(agent.skills) if agent.skills else "(none)")) info(f"bottle : {bottle_name}") info(f" runtime : {runtime}{' (gVisor)' if runtime == 'runsc' else ''}") if ssh_entries: - ssh_names = ", ".join(e.get("Host", "") for e in ssh_entries) + ssh_names = ", ".join(e.Host for e in ssh_entries) info(f" ssh hosts : {ssh_names}") else: info(" ssh hosts : (none)") @@ -293,8 +277,8 @@ def cmd_start(argv: list[str]) -> int: check=True, ) - if skill_names: - skills_mod.skills_copy_into(container, skill_names) + if agent.skills: + skills_mod.skills_copy_into(container, list(agent.skills)) if ssh_entries: proxy_host_port = pipelock.pipelock_proxy_host_port(slug) diff --git a/claude_bottle/env_resolve.py b/claude_bottle/env_resolve.py index 82f2980..0e0b3e7 100644 --- a/claude_bottle/env_resolve.py +++ b/claude_bottle/env_resolve.py @@ -35,7 +35,7 @@ import sys from pathlib import Path from .log import die -from .manifest import Manifest, manifest_env_entry, manifest_env_names +from .manifest import Manifest _INTERPOLATED_RE = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$") @@ -108,10 +108,10 @@ def env_resolve( - interpolated: copy host value; export under target name; append `-e NAME` - literal: append `NAME=VALUE` to env_file """ - for name in manifest_env_names(manifest, agent): + bottle = manifest.bottle_for(agent) + for name, raw in bottle.env.items(): if not name: continue - raw = manifest_env_entry(manifest, agent, name) kind = env_entry_kind(raw) if kind == "secret": prompt_body = env_entry_secret_prompt(raw) diff --git a/claude_bottle/manifest.py b/claude_bottle/manifest.py index f1674ec..fc87a76 100644 --- a/claude_bottle/manifest.py +++ b/claude_bottle/manifest.py @@ -1,5 +1,5 @@ -"""Manifest helpers. Read claude-bottle.json and pull the definition for a -named agent. +"""Manifest dataclasses. Read claude-bottle.json (cwd + $HOME, deep-merged) +into a frozen, validated Manifest tree. Schema (see CLAUDE.md "Intended design"): { @@ -7,7 +7,8 @@ Schema (see CLAUDE.md "Intended design"): "": { "env": { "": , ... }, "ssh": [ , ... ], - "egress": { "allowlist": [ "", ... ] } + "egress": { "allowlist": [ "", ... ] }, + "runtime": "runc" | "runsc" } }, "agents": { @@ -21,203 +22,278 @@ Schema (see CLAUDE.md "Intended design"): Bottles group shared infrastructure (SSH keys, known hosts, egress allowlist) that multiple agents can reference. Every agent must reference a bottle. + +Validation runs once at construction (Manifest.from_json_obj) so getters +can trust the shape. """ from __future__ import annotations import json import os +from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Literal, TypedDict, cast +from typing import Any, Literal, Mapping from .log import die -class SshEntry(TypedDict, total=False): - """One entry from bottle.ssh. Host and IdentityFile are required at - runtime (enforced by manifest_validate); the rest configure the - generated ~/.ssh/config block inside the container.""" +_SUPPORTED_RUNTIMES: tuple[str, ...] = ("runc", "runsc") +Runtime = Literal["runc", "runsc"] + +@dataclass(frozen=True) +class SshEntry: Host: str IdentityFile: str - Hostname: str - User: str - Port: int | str - KnownHostKey: str + Hostname: str = "" + User: str = "" + Port: str = "" + KnownHostKey: str = "" - -class BottleEgress(TypedDict, total=False): - allowlist: list[str] - - -class Bottle(TypedDict, total=False): - """Shared infrastructure referenced by agents. env values are - sentinel-prefixed strings (see env_resolve); runtime defaults to - "runc" when absent.""" - - env: dict[str, str] - ssh: list[SshEntry] - egress: BottleEgress - runtime: Literal["runc", "runsc"] - - -class Agent(TypedDict, total=False): - skills: list[str] - prompt: str - bottle: str - - -class Manifest(TypedDict, total=False): - bottles: dict[str, Bottle] - agents: dict[str, Agent] - - -_SUPPORTED_RUNTIMES: tuple[str, ...] = ("runc", "runsc") - - -def manifest_resolve(cwd: str) -> Manifest: - """Look for claude-bottle.json in and in $HOME, deep-merge - them (cwd entries override home entries on key conflict for both - bottles and agents), then validate. Dies if neither file is found, - either is invalid JSON, or the merged shape violates the schema.""" - cwd_file = Path(cwd) / "claude-bottle.json" - home_file = Path(os.environ["HOME"]) / "claude-bottle.json" - - cwd_doc = _load_json_or_die(cwd_file) if cwd_file.is_file() else None - home_doc = _load_json_or_die(home_file) if home_file.is_file() else None - - if cwd_doc is None and home_doc is None: - die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}") - - h = home_doc if home_doc is not None else cast(Manifest, {}) - c = cwd_doc if cwd_doc is not None else cast(Manifest, {}) - manifest: Manifest = { - "bottles": {**(h.get("bottles") or {}), **(c.get("bottles") or {})}, - "agents": {**(h.get("agents") or {}), **(c.get("agents") or {})}, - } - manifest_validate(manifest) - return manifest - - -def manifest_validate(manifest: Manifest) -> None: - """Deep-validate the manifest, dying on the first schema violation. - After this returns, getters can trust that bottles and agents are - dicts, every agent has a 'bottle' field that references a defined - bottle, and optional fields (when present) have correct types.""" - bottles = manifest.get("bottles") or {} - agents = manifest.get("agents") or {} - - for bname, bottle in bottles.items(): - _validate_bottle(bname, bottle) - - for aname, agent in agents.items(): - _validate_agent(aname, agent, bottles) - - -def _validate_bottle(name: str, bottle: Any) -> None: - if not isinstance(bottle, dict): - die(f"bottle '{name}' must be a JSON object (was {_json_type(bottle)})") - bottle = cast(dict[Any, Any], bottle) - - env = bottle.get("env") - if env is not None: - if not isinstance(env, dict): - die(f"bottle '{name}' env must be a JSON object (was {_json_type(env)})") - env = cast(dict[Any, Any], env) - for var, value in env.items(): - if not isinstance(value, str): - die( - f"env entry {var} in bottle '{name}' must be a JSON string " - f"(was {_json_type(value)}). Use \"?\" for prompt-at-runtime." - ) - - ssh = bottle.get("ssh") - if ssh is not None: - if not isinstance(ssh, list): - die(f"bottle '{name}' ssh must be an array (was {_json_type(ssh)})") - ssh = cast(list[Any], ssh) - for i, entry in enumerate(ssh): - if not isinstance(entry, dict): - die( - f"bottle '{name}' ssh[{i}] must be a JSON object " - f"(was {_json_type(entry)})" - ) - entry = cast(dict[Any, Any], entry) - host = entry.get("Host") - if not isinstance(host, str) or not host: - die(f"bottle '{name}' ssh[{i}] missing required string field 'Host'") - ident = entry.get("IdentityFile") - if not isinstance(ident, str) or not ident: - die( - f"bottle '{name}' ssh '{host}' missing required string field " - f"'IdentityFile'" - ) - - egress = bottle.get("egress") - if egress is not None: - if not isinstance(egress, dict): - die(f"bottle '{name}' egress must be a JSON object (was {_json_type(egress)})") - egress = cast(dict[Any, Any], egress) - allowlist = egress.get("allowlist") - if allowlist is not None: - if not isinstance(allowlist, list): - die( - f"bottle '{name}' egress.allowlist must be an array " - f"(was {_json_type(allowlist)})" - ) - allowlist = cast(list[Any], allowlist) - for i, host in enumerate(allowlist): - if not isinstance(host, str): - die( - f"bottle '{name}' egress.allowlist[{i}] must be a string " - f"(was {_json_type(host)})" - ) - - runtime = bottle.get("runtime") - if runtime is not None: - if not isinstance(runtime, str): - die(f"bottle '{name}' runtime must be a string (was {_json_type(runtime)})") - if runtime not in _SUPPORTED_RUNTIMES: + @classmethod + def _from_dict(cls, bottle_name: str, idx: int, raw: Any) -> "SshEntry": + if not isinstance(raw, dict): die( - f"bottle '{name}' runtime '{runtime}' is not supported. " - f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}." + f"bottle '{bottle_name}' ssh[{idx}] must be a JSON object " + f"(was {_json_type(raw)})" ) - - -def _validate_agent(name: str, agent: Any, bottles: dict[str, Any]) -> None: - if not isinstance(agent, dict): - die(f"agent '{name}' must be a JSON object (was {_json_type(agent)})") - agent = cast(dict[Any, Any], agent) - - skills = agent.get("skills") - if skills is not None: - if not isinstance(skills, list): - die(f"agent '{name}' skills must be an array (was {_json_type(skills)})") - skills = cast(list[Any], skills) - for i, skill in enumerate(skills): - if not isinstance(skill, str): - die(f"agent '{name}' skills[{i}] must be a string (was {_json_type(skill)})") - - prompt = agent.get("prompt") - if prompt is not None and not isinstance(prompt, str): - die(f"agent '{name}' prompt must be a string (was {_json_type(prompt)})") - - bottle = agent.get("bottle") - if not isinstance(bottle, str) or not bottle: - die(f"agent '{name}' must declare a 'bottle' field naming a defined bottle") - if bottle not in bottles: - available = ", ".join(bottles.keys()) or "(none defined)" - die( - f"agent '{name}' references bottle '{bottle}', which is not defined. " - f"Available: {available}" + host = raw.get("Host") + if not isinstance(host, str) or not host: + die(f"bottle '{bottle_name}' ssh[{idx}] missing required string field 'Host'") + ident = raw.get("IdentityFile") + if not isinstance(ident, str) or not ident: + die( + f"bottle '{bottle_name}' ssh '{host}' missing required string field " + f"'IdentityFile'" + ) + hostname = _opt_str(raw.get("Hostname"), f"bottle '{bottle_name}' ssh '{host}' Hostname") + user = _opt_str(raw.get("User"), f"bottle '{bottle_name}' ssh '{host}' User") + port = _opt_port(raw.get("Port"), f"bottle '{bottle_name}' ssh '{host}' Port") + khk = _opt_str( + raw.get("KnownHostKey"), + f"bottle '{bottle_name}' ssh '{host}' KnownHostKey", + ) + return cls( + Host=host, + IdentityFile=ident, + Hostname=hostname, + User=user, + Port=port, + KnownHostKey=khk, ) -def _load_json_or_die(path: Path) -> dict[Any, Any]: - """Load and shallow-check. Confirms bottles/agents (when present) - are JSON objects so the merge in manifest_resolve cannot TypeError - before manifest_validate runs.""" - doc: Any = None +@dataclass(frozen=True) +class BottleEgress: + allowlist: tuple[str, ...] = () + + @classmethod + def _from_dict(cls, bottle_name: str, raw: Any) -> "BottleEgress": + if not isinstance(raw, dict): + die( + f"bottle '{bottle_name}' egress must be a JSON object " + f"(was {_json_type(raw)})" + ) + allow = raw.get("allowlist") + if allow is None: + return cls() + if not isinstance(allow, list): + die( + f"bottle '{bottle_name}' egress.allowlist must be an array " + f"(was {_json_type(allow)})" + ) + for i, host in enumerate(allow): + if not isinstance(host, str): + die( + f"bottle '{bottle_name}' egress.allowlist[{i}] must be a string " + f"(was {_json_type(host)})" + ) + return cls(allowlist=tuple(allow)) + + +@dataclass(frozen=True) +class Bottle: + env: Mapping[str, str] = field(default_factory=dict) + ssh: tuple[SshEntry, ...] = () + egress: BottleEgress = field(default_factory=BottleEgress) + runtime: Runtime = "runc" + + @classmethod + def _from_dict(cls, name: str, raw: Any) -> "Bottle": + if not isinstance(raw, dict): + die(f"bottle '{name}' must be a JSON object (was {_json_type(raw)})") + + env_raw = raw.get("env") + env: dict[str, str] = {} + if env_raw is not None: + if not isinstance(env_raw, dict): + die(f"bottle '{name}' env must be a JSON object (was {_json_type(env_raw)})") + for var, value in env_raw.items(): + if not isinstance(value, str): + die( + f"env entry {var} in bottle '{name}' must be a JSON string " + f"(was {_json_type(value)}). Use \"?\" for prompt-at-runtime." + ) + env[var] = value + + ssh_raw = raw.get("ssh") + ssh: tuple[SshEntry, ...] = () + if ssh_raw is not None: + if not isinstance(ssh_raw, list): + die(f"bottle '{name}' ssh must be an array (was {_json_type(ssh_raw)})") + ssh = tuple( + SshEntry._from_dict(name, i, entry) for i, entry in enumerate(ssh_raw) + ) + + egress_raw = raw.get("egress") + egress = ( + BottleEgress._from_dict(name, egress_raw) + if egress_raw is not None + else BottleEgress() + ) + + runtime_raw = raw.get("runtime") + if runtime_raw is None: + runtime: Runtime = "runc" + else: + if not isinstance(runtime_raw, str): + die(f"bottle '{name}' runtime must be a string (was {_json_type(runtime_raw)})") + if runtime_raw not in _SUPPORTED_RUNTIMES: + die( + f"bottle '{name}' runtime '{runtime_raw}' is not supported. " + f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}." + ) + runtime = runtime_raw # type: ignore[assignment] + + return cls(env=env, ssh=ssh, egress=egress, runtime=runtime) + + +@dataclass(frozen=True) +class Agent: + bottle: str + skills: tuple[str, ...] = () + prompt: str = "" + + @classmethod + def _from_dict(cls, name: str, raw: Any, bottle_names: set[str]) -> "Agent": + if not isinstance(raw, dict): + die(f"agent '{name}' must be a JSON object (was {_json_type(raw)})") + + bottle = raw.get("bottle") + if not isinstance(bottle, str) or not bottle: + die(f"agent '{name}' must declare a 'bottle' field naming a defined bottle") + if bottle not in bottle_names: + available = ", ".join(sorted(bottle_names)) or "(none defined)" + die( + f"agent '{name}' references bottle '{bottle}', which is not defined. " + f"Available: {available}" + ) + + skills_raw = raw.get("skills") + skills: tuple[str, ...] = () + if skills_raw is not None: + if not isinstance(skills_raw, list): + die(f"agent '{name}' skills must be an array (was {_json_type(skills_raw)})") + for i, skill in enumerate(skills_raw): + if not isinstance(skill, str): + die( + f"agent '{name}' skills[{i}] must be a string " + f"(was {_json_type(skill)})" + ) + skills = tuple(skills_raw) + + prompt_raw = raw.get("prompt") + if prompt_raw is None: + prompt = "" + elif isinstance(prompt_raw, str): + prompt = prompt_raw + else: + die(f"agent '{name}' prompt must be a string (was {_json_type(prompt_raw)})") + + return cls(bottle=bottle, skills=skills, prompt=prompt) + + +@dataclass(frozen=True) +class Manifest: + bottles: Mapping[str, Bottle] + agents: Mapping[str, Agent] + + @classmethod + def resolve(cls, cwd: str) -> "Manifest": + """Look for claude-bottle.json in and in $HOME, deep-merge + them (cwd entries override home entries on key conflict for both + bottles and agents), then validate. Dies if neither file is + found, either is invalid JSON, or the merged shape violates the + schema.""" + cwd_file = Path(cwd) / "claude-bottle.json" + home_file = Path(os.environ["HOME"]) / "claude-bottle.json" + + cwd_doc = _load_json_or_die(cwd_file) if cwd_file.is_file() else None + home_doc = _load_json_or_die(home_file) if home_file.is_file() else None + + if cwd_doc is None and home_doc is None: + die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}") + + h = home_doc or {} + c = cwd_doc or {} + merged: dict[str, Any] = { + "bottles": {**(h.get("bottles") or {}), **(c.get("bottles") or {})}, + "agents": {**(h.get("agents") or {}), **(c.get("agents") or {})}, + } + return cls.from_json_obj(merged) + + @classmethod + def from_json_obj(cls, obj: Any) -> "Manifest": + """Validate and build a Manifest from a raw JSON-like dict.""" + if not isinstance(obj, dict): + die(f"manifest must be a JSON object (was {_json_type(obj)})") + + raw_bottles = obj.get("bottles") or {} + raw_agents = obj.get("agents") or {} + if not isinstance(raw_bottles, dict): + die(f"manifest 'bottles' must be a JSON object (was {_json_type(raw_bottles)})") + if not isinstance(raw_agents, dict): + die(f"manifest 'agents' must be a JSON object (was {_json_type(raw_agents)})") + + bottles = {n: Bottle._from_dict(n, b) for n, b in raw_bottles.items()} + bottle_names = set(bottles.keys()) + agents = {n: Agent._from_dict(n, a, bottle_names) for n, a in raw_agents.items()} + return cls(bottles=bottles, agents=agents) + + def has_agent(self, name: str) -> bool: + return name in self.agents + + def require_agent(self, name: str) -> None: + if self.has_agent(name): + return + available = ", ".join(self.agents.keys()) + if available: + die(f"agent '{name}' not defined in claude-bottle.json. Available: {available}") + die(f"agent '{name}' not defined in claude-bottle.json (manifest is empty).") + + def has_bottle(self, name: str) -> bool: + return name in self.bottles + + def require_bottle(self, name: str) -> None: + if self.has_bottle(name): + return + available = ", ".join(self.bottles.keys()) + if available: + die( + f"bottle '{name}' not defined in claude-bottle.json. " + f"Available bottles: {available}" + ) + die(f"bottle '{name}' not defined in claude-bottle.json (no bottles defined).") + + def bottle_for(self, agent_name: str) -> Bottle: + """Resolve the Bottle the named agent references. The validator + guarantees both lookups succeed for a manifest built via + from_json_obj.""" + return self.bottles[self.agents[agent_name].bottle] + + +def _load_json_or_die(path: Path) -> dict[str, Any]: try: with path.open() as f: doc = json.load(f) @@ -225,90 +301,32 @@ def _load_json_or_die(path: Path) -> dict[Any, Any]: die(f"claude-bottle.json at {path} is not valid JSON") if not isinstance(doc, dict): die(f"claude-bottle.json at {path} must be a JSON object") - return cast(dict[Any, Any], doc) + return doc -def manifest_has_agent(manifest: Manifest, name: str) -> bool: - return name in manifest["agents"] +def _opt_str(value: Any, label: str) -> str: + if value is None: + return "" + if not isinstance(value, str): + die(f"{label} must be a string (was {_json_type(value)})") + return value -def manifest_require_agent(manifest: Manifest, name: str) -> None: - """Like has_agent but dies with the available agent names listed.""" - if manifest_has_agent(manifest, name): - return - available = ", ".join(manifest["agents"].keys()) - if available: - die(f"agent '{name}' not defined in claude-bottle.json. Available: {available}") - else: - die(f"agent '{name}' not defined in claude-bottle.json (manifest is empty).") - - -def manifest_env_names(manifest: Manifest, name: str) -> list[str]: - """Names (not values) of bottles[agent.bottle].env, in declaration - order. Empty if the agent's bottle has no env.""" - bottle_name = manifest["agents"][name]["bottle"] - return list(manifest["bottles"][bottle_name].get("env", {}).keys()) - - -def manifest_env_entry(manifest: Manifest, agent: str, var: str) -> str: - """Raw string value of one env entry. Used by env_resolve, which - classifies the result by sentinel.""" - bottle_name = manifest["agents"][agent]["bottle"] - return manifest["bottles"][bottle_name]["env"][var] - - -def manifest_skills(manifest: Manifest, name: str) -> list[str]: - return list(manifest["agents"][name].get("skills", [])) - - -def manifest_prompt(manifest: Manifest, name: str) -> str: - return manifest["agents"][name].get("prompt", "") - - -def manifest_agent_bottle(manifest: Manifest, name: str) -> str: - return manifest["agents"][name]["bottle"] - - -def manifest_has_bottle(manifest: Manifest, bottle_name: str) -> bool: - return bottle_name in manifest["bottles"] - - -def manifest_require_bottle(manifest: Manifest, bottle_name: str) -> None: - if manifest_has_bottle(manifest, bottle_name): - return - available = ", ".join(manifest["bottles"].keys()) - if available: - die( - f"bottle '{bottle_name}' not defined in claude-bottle.json. " - f"Available bottles: {available}" - ) - else: - die(f"bottle '{bottle_name}' not defined in claude-bottle.json (no bottles defined).") - - -def manifest_bottle_ssh(manifest: Manifest, bottle_name: str) -> list[SshEntry]: - return list(manifest["bottles"][bottle_name].get("ssh", [])) - - -def manifest_bottle_runtime(manifest: Manifest, bottle_name: str) -> str: - """Container runtime for the bottle's agent container. Returns - "runc" (Docker default) or "runsc" (gVisor opt-in).""" - return manifest["bottles"][bottle_name].get("runtime", "runc") - - -def manifest_bottle_egress_allowlist(manifest: Manifest, bottle_name: str) -> list[str]: - """Hostnames in bottles[bottle_name].egress.allowlist.""" - return list(manifest["bottles"][bottle_name].get("egress", {}).get("allowlist", [])) - - -def manifest_ssh(manifest: Manifest, agent_name: str) -> list[SshEntry]: - """SSH entries resolved via the agent's "bottle" field.""" - bottle_name = manifest_agent_bottle(manifest, agent_name) - return manifest_bottle_ssh(manifest, bottle_name) +def _opt_port(value: Any, label: str) -> str: + """Port accepts string or int (JSON-friendly) and is normalized to str.""" + if value is None: + return "" + if isinstance(value, bool): + die(f"{label} must be a string or number (was boolean)") + if isinstance(value, int): + return str(value) + if isinstance(value, str): + return value + die(f"{label} must be a string or number (was {_json_type(value)})") def _json_type(value: Any) -> str: - """Mirror jq's type names for parity with the bash error messages.""" + """Mirror jq's type names for parity with the original bash error messages.""" if value is None: return "null" if isinstance(value, bool): diff --git a/claude_bottle/pipelock.py b/claude_bottle/pipelock.py index b4b2256..e6f2bb9 100644 --- a/claude_bottle/pipelock.py +++ b/claude_bottle/pipelock.py @@ -18,7 +18,7 @@ import subprocess from pathlib import Path from .log import die, info, warn -from .manifest import Manifest, manifest_bottle_egress_allowlist, manifest_bottle_ssh +from .manifest import Manifest # Pipelock image, pinned by digest. The digest is the multi-arch image # index for ghcr.io/luckypipewrench/pipelock:2.3.0. @@ -58,23 +58,12 @@ def pipelock_proxy_host_port(slug: str) -> str: def pipelock_bottle_allowlist(manifest: Manifest, bottle_name: str) -> list[str]: - """Hostnames in bottles[].egress.allowlist. Validates - that each entry is a string.""" - raw = manifest_bottle_egress_allowlist(manifest, bottle_name) - for entry in raw: - if not isinstance(entry, str): - t = _json_type(entry) - die(f"bottle '{bottle_name}' egress.allowlist must contain only strings; found a '{t}' entry.") - return list(raw) + """Hostnames in bottles[].egress.allowlist.""" + return list(manifest.bottles[bottle_name].egress.allowlist) def pipelock_bottle_ssh_hostnames(manifest: Manifest, bottle_name: str) -> list[str]: - out: list[str] = [] - for entry in manifest_bottle_ssh(manifest, bottle_name): - h = entry.get("Hostname") or "" - if h: - out.append(h) - return out + return [e.Hostname for e in manifest.bottles[bottle_name].ssh if e.Hostname] _IPV4_RE = re.compile(r"^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$") @@ -256,19 +245,3 @@ def pipelock_stop(slug: str) -> None: stderr=subprocess.DEVNULL, ).returncode != 0: warn(f"failed to remove pipelock sidecar {name}; clean up with 'docker rm -f {name}'") - - -def _json_type(value: object) -> str: - if value is None: - return "null" - if isinstance(value, bool): - return "boolean" - if isinstance(value, (int, float)): - return "number" - if isinstance(value, str): - return "string" - if isinstance(value, list): - return "array" - if isinstance(value, dict): - return "object" - return type(value).__name__ diff --git a/claude_bottle/ssh.py b/claude_bottle/ssh.py index 3c37e16..634958f 100644 --- a/claude_bottle/ssh.py +++ b/claude_bottle/ssh.py @@ -36,31 +36,26 @@ from __future__ import annotations import os import subprocess from pathlib import Path -from typing import Any +from typing import Sequence from .log import die, info +from .manifest import SshEntry -def ssh_validate_entries(entries: list[dict[str, Any]]) -> None: - """Each entry must have Host + IdentityFile, and the IdentityFile - must exist on the host (after expanding leading ~).""" +def ssh_validate_entries(entries: Sequence[SshEntry]) -> None: + """The IdentityFile must exist on the host (after expanding leading ~). + Host and IdentityFile shape are already enforced by Manifest validation.""" for entry in entries: - name = entry.get("Host", "") - key = entry.get("IdentityFile", "") - if not name: - die(f"ssh entry missing required field 'Host': {entry}") - if not key: - die(f"ssh entry '{name}' missing required field 'IdentityFile'") - key = _expand_tilde(key) + key = _expand_tilde(entry.IdentityFile) if not os.path.isfile(key): - die(f"ssh key file not found for host '{name}': {key}") + die(f"ssh key file not found for host '{entry.Host}': {key}") def ssh_setup( container: str, stage_dir: Path, proxy_host_port: str, - entries: list[dict[str, Any]], + entries: Sequence[SshEntry], ) -> None: """Set up SSH in the container so node can authenticate using each entry's key without the key file being readable by node.""" @@ -91,12 +86,12 @@ def ssh_setup( container_key_paths: list[str] = [] for entry in entries: - name = entry["Host"] - key = _expand_tilde(entry["IdentityFile"]) - hostname = entry["Hostname"] - user = entry["User"] - port = str(entry["Port"]) - known_host_key = entry.get("KnownHostKey", "") + name = entry.Host + key = _expand_tilde(entry.IdentityFile) + hostname = entry.Hostname + user = entry.User + port = entry.Port + known_host_key = entry.KnownHostKey key_basename = os.path.basename(key) container_key_path = f"{keys_dir}/{key_basename}" diff --git a/tests/fixtures.py b/tests/fixtures.py index d4c6432..fae2f66 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -1,4 +1,7 @@ -"""Manifest fixtures for the test suite.""" +"""Manifest fixtures for the test suite. Each fixture returns a built +Manifest dataclass; callers that need the raw JSON shape (e.g. to write +to a file on disk) can build it themselves or call .from_json_obj on +a dict literal in the test.""" from __future__ import annotations @@ -7,9 +10,11 @@ import tempfile from pathlib import Path from typing import Any +from claude_bottle.manifest import Manifest -def fixture_minimal() -> dict[str, Any]: - """One bottle, one agent, no env / ssh / skills.""" + +def fixture_minimal_dict() -> dict[str, Any]: + """One bottle, one agent, no env / ssh / skills. JSON shape.""" return { "bottles": {"dev": {}}, "agents": { @@ -18,8 +23,8 @@ def fixture_minimal() -> dict[str, Any]: } -def fixture_with_egress() -> dict[str, Any]: - """Bottle declares an egress.allowlist.""" +def fixture_with_egress_dict() -> dict[str, Any]: + """Bottle declares an egress.allowlist. JSON shape.""" return { "bottles": { "dev": { @@ -32,9 +37,9 @@ def fixture_with_egress() -> dict[str, Any]: } -def fixture_with_ssh() -> dict[str, Any]: +def fixture_with_ssh_dict() -> dict[str, Any]: """Bottle has both an IPv4-literal SSH host (CGNAT) and a hostname host, - exercising both ssrf.ip_allowlist and trusted_domains code paths.""" + exercising both ssrf.ip_allowlist and trusted_domains code paths. JSON shape.""" return { "bottles": { "dev": { @@ -60,8 +65,22 @@ def fixture_with_ssh() -> dict[str, Any]: } +def fixture_minimal() -> Manifest: + return Manifest.from_json_obj(fixture_minimal_dict()) + + +def fixture_with_egress() -> Manifest: + return Manifest.from_json_obj(fixture_with_egress_dict()) + + +def fixture_with_ssh() -> Manifest: + return Manifest.from_json_obj(fixture_with_ssh_dict()) + + def write_fixture(fn) -> Path: - """Write fixture dict to a temp file; return the path. Caller must rm.""" + """Write fixture JSON to a temp file; return the path. Caller must rm. + Accepts a function returning either a dict (JSON shape) or a Manifest; + only the dict form is supported here since we need to serialize.""" f = tempfile.NamedTemporaryFile( mode="w", suffix=".json", delete=False, encoding="utf-8" ) diff --git a/tests/test_manifest_runtime.py b/tests/test_manifest_runtime.py index eeec914..829ce67 100644 --- a/tests/test_manifest_runtime.py +++ b/tests/test_manifest_runtime.py @@ -1,16 +1,18 @@ -"""Unit: bottle runtime — manifest_bottle_runtime returns the configured -runtime (defaulting to runc); manifest_validate rejects unknown values, -non-strings, and empty strings.""" +"""Unit: bottle runtime — Manifest.from_json_obj defaults runtime to runc, +accepts runsc, and rejects unknown values, non-strings, and empty strings.""" import unittest from claude_bottle.log import Die -from claude_bottle.manifest import manifest_bottle_runtime, manifest_validate +from claude_bottle.manifest import Manifest -def _bottle(runtime_value: object | None) -> dict: - """Build a minimal manifest with one bottle whose runtime field is - set (or absent if `runtime_value is _ABSENT`).""" +_ABSENT = object() + + +def _bottle(runtime_value: object) -> dict: + """Build a minimal manifest JSON shape with one bottle whose runtime + field is set (or absent if `runtime_value is _ABSENT`).""" bottle: dict = {} if runtime_value is not _ABSENT: bottle["runtime"] = runtime_value @@ -20,30 +22,30 @@ def _bottle(runtime_value: object | None) -> dict: } -_ABSENT = object() - - class TestManifestBottleRuntime(unittest.TestCase): def test_default_runc_when_absent(self): - self.assertEqual("runc", manifest_bottle_runtime(_bottle(_ABSENT), "dev")) + m = Manifest.from_json_obj(_bottle(_ABSENT)) + self.assertEqual("runc", m.bottles["dev"].runtime) def test_explicit_runc(self): - self.assertEqual("runc", manifest_bottle_runtime(_bottle("runc"), "dev")) + m = Manifest.from_json_obj(_bottle("runc")) + self.assertEqual("runc", m.bottles["dev"].runtime) def test_explicit_runsc(self): - self.assertEqual("runsc", manifest_bottle_runtime(_bottle("runsc"), "dev")) + m = Manifest.from_json_obj(_bottle("runsc")) + self.assertEqual("runsc", m.bottles["dev"].runtime) def test_rejects_unknown_runtime(self): with self.assertRaises(Die): - manifest_validate(_bottle("kata-runtime")) + Manifest.from_json_obj(_bottle("kata-runtime")) def test_rejects_non_string(self): with self.assertRaises(Die): - manifest_validate(_bottle(42)) + Manifest.from_json_obj(_bottle(42)) def test_rejects_empty_string(self): with self.assertRaises(Die): - manifest_validate(_bottle("")) + Manifest.from_json_obj(_bottle("")) if __name__ == "__main__": diff --git a/tests/test_pipelock_allowlist.py b/tests/test_pipelock_allowlist.py index 08911fd..887cfcf 100644 --- a/tests/test_pipelock_allowlist.py +++ b/tests/test_pipelock_allowlist.py @@ -5,6 +5,7 @@ pipelock_bottle_ssh_trusted_domains, pipelock_effective_allowlist.""" import unittest from claude_bottle.log import Die +from claude_bottle.manifest import Manifest from claude_bottle.pipelock import ( pipelock_bottle_allowlist, pipelock_bottle_ssh_hostnames, @@ -32,7 +33,7 @@ class TestBottleAllowlist(unittest.TestCase): "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, } with self.assertRaises(Die): - pipelock_bottle_allowlist(bad, "dev") + Manifest.from_json_obj(bad) class TestSSHHostnames(unittest.TestCase): @@ -54,7 +55,7 @@ class TestSSHHostnames(unittest.TestCase): class TestEffectiveAllowlist(unittest.TestCase): def test_union_and_dedup(self): - manifest = { + manifest = Manifest.from_json_obj({ "bottles": { "dev": { "egress": {"allowlist": ["registry.npmjs.org"]}, @@ -67,7 +68,7 @@ class TestEffectiveAllowlist(unittest.TestCase): } }, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, - } + }) eff = pipelock_effective_allowlist(manifest, "dev") self.assertIn("api.anthropic.com", eff) self.assertIn("registry.npmjs.org", eff) diff --git a/tests/test_pipelock_yaml.py b/tests/test_pipelock_yaml.py index 184bf83..9602458 100644 --- a/tests/test_pipelock_yaml.py +++ b/tests/test_pipelock_yaml.py @@ -7,6 +7,7 @@ import tempfile import unittest from pathlib import Path +from claude_bottle.manifest import Manifest from claude_bottle.pipelock import pipelock_write_yaml from tests.fixtures import fixture_minimal, fixture_with_ssh @@ -50,7 +51,7 @@ class TestPipelockYaml(unittest.TestCase): self.assertIn("100.78.141.42", content) def test_secret_hygiene(self): - manifest = { + manifest = Manifest.from_json_obj({ "bottles": { "dev": { "env": { @@ -61,7 +62,7 @@ class TestPipelockYaml(unittest.TestCase): } }, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, - } + }) yaml_path = self.out_dir / "secret.yaml" pipelock_write_yaml(manifest, "dev", yaml_path) content = yaml_path.read_text()