diff --git a/claude_bottle/cli/info.py b/claude_bottle/cli/info.py index 77f2545..b479bd0 100644 --- a/claude_bottle/cli/info.py +++ b/claude_bottle/cli/info.py @@ -5,15 +5,7 @@ from __future__ import annotations import argparse from ..log import info -from ..manifest import ( - manifest_agent_bottle, - manifest_env_names, - manifest_prompt, - manifest_require_agent, - manifest_resolve, - manifest_skills, - manifest_ssh, -) +from ..manifest import Manifest from ._common import PROG, USER_CWD @@ -22,39 +14,33 @@ def cmd_info(argv: list[str]) -> int: parser.add_argument("name", help="agent name defined in claude-bottle.json") args = parser.parse_args(argv) - manifest = manifest_resolve(USER_CWD) - manifest_require_agent(manifest, args.name) + manifest = Manifest.resolve(USER_CWD) + manifest.require_agent(args.name) - env_names = manifest_env_names(manifest, args.name) - skill_names = manifest_skills(manifest, args.name) - prompt_content = manifest_prompt(manifest, args.name) - prompt_first_line = prompt_content.splitlines()[0] if prompt_content else "" - - bottle_name = manifest_agent_bottle(manifest, args.name) - ssh_entries = manifest_ssh(manifest, args.name) + agent = manifest.agents[args.name] + bottle = manifest.bottle_for(args.name) + env_names = list(bottle.env.keys()) + prompt_first_line = agent.prompt.splitlines()[0] if agent.prompt else "" print() info(f"agent : {args.name}") info(f"env (names only): {', '.join(env_names) if env_names else '(none)'}") - info(f"skills : {' '.join(skill_names) if skill_names else '(none)'}") + info(f"skills : {' '.join(agent.skills) if agent.skills else '(none)'}") info( - f"prompt : {len(prompt_content)} chars; " + f"prompt : {len(agent.prompt)} chars; " f"first line: {prompt_first_line or '(empty)'}" ) - if bottle_name: - info(f"bottle : {bottle_name}") - if ssh_entries: - for e in ssh_entries: - info( - f" ssh host : {e.get('Host')} " - f"(Hostname={e.get('Hostname')}, User={e.get('User')}, " - f"Port={e.get('Port')}, IdentityFile={e.get('IdentityFile')})" - ) - if e.get("KnownHostKey"): - info(f" KnownHostKey: {e['KnownHostKey']}") - else: - info(" ssh hosts : (none)") + info(f"bottle : {agent.bottle}") + if bottle.ssh: + for e in bottle.ssh: + info( + f" ssh host : {e.Host} " + f"(Hostname={e.Hostname}, User={e.User}, " + f"Port={e.Port}, IdentityFile={e.IdentityFile})" + ) + if e.KnownHostKey: + info(f" KnownHostKey: {e.KnownHostKey}") else: - info("bottle : (none)") + info(" ssh hosts : (none)") print() return 0 diff --git a/claude_bottle/cli/list.py b/claude_bottle/cli/list.py index 06f2c97..a60143b 100644 --- a/claude_bottle/cli/list.py +++ b/claude_bottle/cli/list.py @@ -7,7 +7,7 @@ import subprocess from .. import docker as docker_mod from ..log import info -from ..manifest import manifest_resolve +from ..manifest import Manifest from ._common import PROG, USER_CWD @@ -17,8 +17,8 @@ def cmd_list(argv: list[str]) -> int: args = parser.parse_args(argv) if args.scope == "available": - manifest = manifest_resolve(USER_CWD) - for name in (manifest.get("agents") or {}).keys(): + manifest = Manifest.resolve(USER_CWD) + for name in manifest.agents.keys(): print(name) return 0 diff --git a/claude_bottle/cli/start.py b/claude_bottle/cli/start.py index 5cca959..2685df6 100644 --- a/claude_bottle/cli/start.py +++ b/claude_bottle/cli/start.py @@ -19,17 +19,7 @@ from .. import skills as skills_mod from .. import ssh as ssh_mod from ..env_resolve import env_resolve from ..log import die, info -from ..manifest import ( - manifest_agent_bottle, - manifest_bottle_runtime, - manifest_env_names, - manifest_prompt, - manifest_require_agent, - manifest_require_bottle, - manifest_resolve, - manifest_skills, - manifest_ssh, -) +from ..manifest import Manifest from ._common import PROG, REPO_DIR, USER_CWD, read_tty_line @@ -57,8 +47,11 @@ def cmd_start(argv: list[str]) -> int: runtime_image = derived_image docker_mod.require_docker() - manifest = manifest_resolve(USER_CWD) - manifest_require_agent(manifest, name) + manifest = Manifest.resolve(USER_CWD) + manifest.require_agent(name) + agent = manifest.agents[name] + bottle_name = agent.bottle + bottle = manifest.bottle_for(name) container = pinned_container or default_container suffix = 2 @@ -81,7 +74,7 @@ def cmd_start(argv: list[str]) -> int: ) # --- Plan resolution (host-only, no container yet) --- - env_names = manifest_env_names(manifest, name) + env_names = list(bottle.env.keys()) # CLAUDE_BOTTLE_OAUTH_TOKEN → CLAUDE_CODE_OAUTH_TOKEN forwarding. # Host-side token is always forwarded so every container can authenticate. @@ -90,23 +83,14 @@ def cmd_start(argv: list[str]) -> int: if forward_oauth_token: display_env_names.append("CLAUDE_CODE_OAUTH_TOKEN") - skill_names = manifest_skills(manifest, name) - if skill_names: - skills_mod.skills_validate_all(skill_names) + if agent.skills: + skills_mod.skills_validate_all(list(agent.skills)) - bottle_name = manifest_agent_bottle(manifest, name) - if not bottle_name: - die( - f"agent '{name}' has no 'bottle' field. " - f"Add a bottle association to this agent in claude-bottle.json." - ) - manifest_require_bottle(manifest, bottle_name) - - runtime = manifest_bottle_runtime(manifest, bottle_name) + runtime = bottle.runtime if runtime == "runsc": docker_mod.require_runsc() - ssh_entries = manifest_ssh(manifest, name) + ssh_entries = bottle.ssh if ssh_entries: ssh_mod.ssh_validate_entries(ssh_entries) @@ -153,7 +137,7 @@ def cmd_start(argv: list[str]) -> int: env_resolve(manifest, name, env_file, args_file) - prompt_content = manifest_prompt(manifest, name) + prompt_content = agent.prompt prompt_file.write_text(prompt_content) prompt_first_line = prompt_content.splitlines()[0] if prompt_content else "" @@ -169,11 +153,11 @@ def cmd_start(argv: list[str]) -> int: "env (names only): " + (", ".join(display_env_names) if display_env_names else "(none)") ) - info("skills : " + (" ".join(skill_names) if skill_names else "(none)")) + info("skills : " + (" ".join(agent.skills) if agent.skills else "(none)")) info(f"bottle : {bottle_name}") info(f" runtime : {runtime}{' (gVisor)' if runtime == 'runsc' else ''}") if ssh_entries: - ssh_names = ", ".join(e.get("Host", "") for e in ssh_entries) + ssh_names = ", ".join(e.Host for e in ssh_entries) info(f" ssh hosts : {ssh_names}") else: info(" ssh hosts : (none)") @@ -293,8 +277,8 @@ def cmd_start(argv: list[str]) -> int: check=True, ) - if skill_names: - skills_mod.skills_copy_into(container, skill_names) + if agent.skills: + skills_mod.skills_copy_into(container, list(agent.skills)) if ssh_entries: proxy_host_port = pipelock.pipelock_proxy_host_port(slug) diff --git a/claude_bottle/env_resolve.py b/claude_bottle/env_resolve.py index 82f2980..0e0b3e7 100644 --- a/claude_bottle/env_resolve.py +++ b/claude_bottle/env_resolve.py @@ -35,7 +35,7 @@ import sys from pathlib import Path from .log import die -from .manifest import Manifest, manifest_env_entry, manifest_env_names +from .manifest import Manifest _INTERPOLATED_RE = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$") @@ -108,10 +108,10 @@ def env_resolve( - interpolated: copy host value; export under target name; append `-e NAME` - literal: append `NAME=VALUE` to env_file """ - for name in manifest_env_names(manifest, agent): + bottle = manifest.bottle_for(agent) + for name, raw in bottle.env.items(): if not name: continue - raw = manifest_env_entry(manifest, agent, name) kind = env_entry_kind(raw) if kind == "secret": prompt_body = env_entry_secret_prompt(raw) diff --git a/claude_bottle/log.py b/claude_bottle/log.py index b666e1e..4fc93cf 100644 --- a/claude_bottle/log.py +++ b/claude_bottle/log.py @@ -3,6 +3,7 @@ from __future__ import annotations import sys +from typing import NoReturn def info(msg: str) -> None: @@ -18,6 +19,6 @@ class Die(SystemExit): fatal exit from an unrelated SystemExit.""" -def die(msg: str) -> "Die": +def die(msg: str) -> NoReturn: print(f"claude-bottle: error: {msg}", file=sys.stderr) raise Die(1) diff --git a/claude_bottle/manifest.py b/claude_bottle/manifest.py index 91eaa39..20b6975 100644 --- a/claude_bottle/manifest.py +++ b/claude_bottle/manifest.py @@ -1,5 +1,5 @@ -"""Manifest helpers. Read claude-bottle.json and pull the definition for a -named agent. +"""Manifest dataclasses. Read claude-bottle.json (cwd + $HOME, deep-merged) +into a frozen, validated Manifest tree. Schema (see CLAUDE.md "Intended design"): { @@ -7,7 +7,8 @@ Schema (see CLAUDE.md "Intended design"): "": { "env": { "": , ... }, "ssh": [ , ... ], - "egress": { "allowlist": [ "", ... ] } + "egress": { "allowlist": [ "", ... ] }, + "runtime": "runc" | "runsc" } }, "agents": { @@ -21,199 +22,328 @@ Schema (see CLAUDE.md "Intended design"): Bottles group shared infrastructure (SSH keys, known hosts, egress allowlist) that multiple agents can reference. Every agent must reference a bottle. + +Validation runs once at construction (Manifest.from_json_obj) so getters +can trust the shape. """ from __future__ import annotations import json import os +from dataclasses import dataclass, field from pathlib import Path -from typing import Any +from typing import Literal, Mapping, cast from .log import die -Manifest = dict[str, Any] + +Runtime = Literal["runc", "runsc"] +_SUPPORTED_RUNTIMES: tuple[Runtime, ...] = ("runc", "runsc") -def manifest_resolve(cwd: str) -> Manifest: - """Look for claude-bottle.json in and in $HOME, deep-merge - them (cwd entries override home entries on key conflict for both - bottles and agents). Dies if neither file is found or either is - invalid JSON.""" - cwd_file = Path(cwd) / "claude-bottle.json" - home_file = Path(os.environ["HOME"]) / "claude-bottle.json" - - cwd_doc = _load_json_or_die(cwd_file) if cwd_file.is_file() else None - home_doc = _load_json_or_die(home_file) if home_file.is_file() else None - - if cwd_doc is None and home_doc is None: - die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}") - - if cwd_doc is None: - return home_doc # type: ignore[return-value] - if home_doc is None: - return cwd_doc - - return { - "bottles": {**(home_doc.get("bottles") or {}), **(cwd_doc.get("bottles") or {})}, - "agents": {**(home_doc.get("agents") or {}), **(cwd_doc.get("agents") or {})}, - } +def _empty_str_dict() -> dict[str, str]: + return {} -def _load_json_or_die(path: Path) -> Manifest: - try: - with path.open() as f: - doc = json.load(f) - except json.JSONDecodeError: - die(f"claude-bottle.json at {path} is not valid JSON") - if not isinstance(doc, dict): - die(f"claude-bottle.json at {path} must be a JSON object") - return doc +@dataclass(frozen=True) +class SshEntry: + Host: str + IdentityFile: str + Hostname: str = "" + User: str = "" + Port: str = "" + KnownHostKey: str = "" + + @classmethod + def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "SshEntry": + d = _as_json_object(raw, f"bottle '{bottle_name}' ssh[{idx}]") + host = d.get("Host") + if not isinstance(host, str) or not host: + die(f"bottle '{bottle_name}' ssh[{idx}] missing required string field 'Host'") + ident = d.get("IdentityFile") + if not isinstance(ident, str) or not ident: + die( + f"bottle '{bottle_name}' ssh '{host}' missing required string field " + f"'IdentityFile'" + ) + hostname = _opt_str(d.get("Hostname"), f"bottle '{bottle_name}' ssh '{host}' Hostname") + user = _opt_str(d.get("User"), f"bottle '{bottle_name}' ssh '{host}' User") + port = _opt_port(d.get("Port"), f"bottle '{bottle_name}' ssh '{host}' Port") + khk = _opt_str( + d.get("KnownHostKey"), + f"bottle '{bottle_name}' ssh '{host}' KnownHostKey", + ) + return cls( + Host=host, + IdentityFile=ident, + Hostname=hostname, + User=user, + Port=port, + KnownHostKey=khk, + ) -def manifest_has_agent(manifest: Manifest, name: str) -> bool: - return name in (manifest.get("agents") or {}) +@dataclass(frozen=True) +class BottleEgress: + allowlist: tuple[str, ...] = () + + @classmethod + def from_dict(cls, bottle_name: str, raw: object) -> "BottleEgress": + d = _as_json_object(raw, f"bottle '{bottle_name}' egress") + allow = d.get("allowlist") + if allow is None: + return cls() + if not isinstance(allow, list): + die( + f"bottle '{bottle_name}' egress.allowlist must be an array " + f"(was {type(allow).__name__})" + ) + items: list[str] = [] + allow_list = cast(list[object], allow) + for i, host in enumerate(allow_list): + if not isinstance(host, str): + die( + f"bottle '{bottle_name}' egress.allowlist[{i}] must be a string " + f"(was {type(host).__name__})" + ) + items.append(host) + return cls(allowlist=tuple(items)) -def manifest_require_agent(manifest: Manifest, name: str) -> None: - """Like has_agent but dies with the available agent names listed.""" - if manifest_has_agent(manifest, name): - return - available = ", ".join((manifest.get("agents") or {}).keys()) - if available: - die(f"agent '{name}' not defined in claude-bottle.json. Available: {available}") - else: +@dataclass(frozen=True) +class Bottle: + env: Mapping[str, str] = field(default_factory=_empty_str_dict) + ssh: tuple[SshEntry, ...] = () + egress: BottleEgress = field(default_factory=BottleEgress) + runtime: Runtime = "runc" + + @classmethod + def from_dict(cls, name: str, raw: object) -> "Bottle": + d = _as_json_object(raw, f"bottle '{name}'") + + env: dict[str, str] = {} + env_raw = d.get("env") + if env_raw is not None: + env_dict = _as_json_object(env_raw, f"bottle '{name}' env") + for var, value in env_dict.items(): + if not isinstance(value, str): + die( + f"env entry {var} in bottle '{name}' must be a JSON string " + f"(was {type(value).__name__}). Use \"?\" for prompt-at-runtime." + ) + env[var] = value + + ssh: tuple[SshEntry, ...] = () + ssh_raw = d.get("ssh") + if ssh_raw is not None: + if not isinstance(ssh_raw, list): + die(f"bottle '{name}' ssh must be an array (was {type(ssh_raw).__name__})") + ssh_list = cast(list[object], ssh_raw) + ssh = tuple( + SshEntry.from_dict(name, i, entry) + for i, entry in enumerate(ssh_list) + ) + + egress_raw = d.get("egress") + egress = ( + BottleEgress.from_dict(name, egress_raw) + if egress_raw is not None + else BottleEgress() + ) + + runtime_raw = d.get("runtime") + runtime: Runtime + if runtime_raw is None: + runtime = "runc" + else: + if not isinstance(runtime_raw, str): + die(f"bottle '{name}' runtime must be a string (was {type(runtime_raw).__name__})") + if runtime_raw not in _SUPPORTED_RUNTIMES: + die( + f"bottle '{name}' runtime '{runtime_raw}' is not supported. " + f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}." + ) + runtime = runtime_raw + + return cls(env=env, ssh=ssh, egress=egress, runtime=runtime) + + +@dataclass(frozen=True) +class Agent: + bottle: str + skills: tuple[str, ...] = () + prompt: str = "" + + @classmethod + def from_dict(cls, name: str, raw: object, bottle_names: set[str]) -> "Agent": + d = _as_json_object(raw, f"agent '{name}'") + + bottle = d.get("bottle") + if not isinstance(bottle, str) or not bottle: + die(f"agent '{name}' must declare a 'bottle' field naming a defined bottle") + if bottle not in bottle_names: + available = ", ".join(sorted(bottle_names)) or "(none defined)" + die( + f"agent '{name}' references bottle '{bottle}', which is not defined. " + f"Available: {available}" + ) + + skills: tuple[str, ...] = () + skills_raw = d.get("skills") + if skills_raw is not None: + if not isinstance(skills_raw, list): + die(f"agent '{name}' skills must be an array (was {type(skills_raw).__name__})") + collected: list[str] = [] + skills_list = cast(list[object], skills_raw) + for i, skill in enumerate(skills_list): + if not isinstance(skill, str): + die( + f"agent '{name}' skills[{i}] must be a string " + f"(was {type(skill).__name__})" + ) + collected.append(skill) + skills = tuple(collected) + + prompt_raw = d.get("prompt") + if prompt_raw is None: + prompt = "" + elif isinstance(prompt_raw, str): + prompt = prompt_raw + else: + die(f"agent '{name}' prompt must be a string (was {type(prompt_raw).__name__})") + + return cls(bottle=bottle, skills=skills, prompt=prompt) + + +@dataclass(frozen=True) +class Manifest: + bottles: Mapping[str, Bottle] + agents: Mapping[str, Agent] + + @classmethod + def resolve(cls, cwd: str) -> "Manifest": + """Look for claude-bottle.json in and in $HOME, deep-merge + them (cwd entries override home entries on key conflict for both + bottles and agents), then validate. Dies if neither file is + found, either is invalid JSON, or the merged shape violates the + schema.""" + cwd_file = Path(cwd) / "claude-bottle.json" + home_file = Path(os.environ["HOME"]) / "claude-bottle.json" + + cwd_doc = _load_json_or_die(cwd_file) if cwd_file.is_file() else None + home_doc = _load_json_or_die(home_file) if home_file.is_file() else None + + if cwd_doc is None and home_doc is None: + die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}") + + h: dict[str, object] = home_doc if home_doc is not None else {} + c: dict[str, object] = cwd_doc if cwd_doc is not None else {} + h_bottles = _section_dict(h.get("bottles"), "bottles") + c_bottles = _section_dict(c.get("bottles"), "bottles") + h_agents = _section_dict(h.get("agents"), "agents") + c_agents = _section_dict(c.get("agents"), "agents") + merged: dict[str, object] = { + "bottles": {**h_bottles, **c_bottles}, + "agents": {**h_agents, **c_agents}, + } + return cls.from_json_obj(merged) + + @classmethod + def from_json_obj(cls, obj: object) -> "Manifest": + """Validate and build a Manifest from a raw JSON-like dict.""" + d = _as_json_object(obj, "manifest") + raw_bottles = _section_dict(d.get("bottles"), "manifest 'bottles'") + raw_agents = _section_dict(d.get("agents"), "manifest 'agents'") + + bottles: dict[str, Bottle] = { + n: Bottle.from_dict(n, b) for n, b in raw_bottles.items() + } + bottle_names = set(bottles.keys()) + agents: dict[str, Agent] = { + n: Agent.from_dict(n, a, bottle_names) for n, a in raw_agents.items() + } + return cls(bottles=bottles, agents=agents) + + def has_agent(self, name: str) -> bool: + return name in self.agents + + def require_agent(self, name: str) -> None: + if self.has_agent(name): + return + available = ", ".join(self.agents.keys()) + if available: + die(f"agent '{name}' not defined in claude-bottle.json. Available: {available}") die(f"agent '{name}' not defined in claude-bottle.json (manifest is empty).") + def has_bottle(self, name: str) -> bool: + return name in self.bottles -def manifest_env_names(manifest: Manifest, name: str) -> list[str]: - """Names (not values) of bottles[agent.bottle].env, in declaration - order. Empty list if the agent has no bottle or the bottle has no env.""" - agent = (manifest.get("agents") or {}).get(name) or {} - bottle_name = agent.get("bottle") or "" - if not bottle_name: - return [] - bottle = (manifest.get("bottles") or {}).get(bottle_name) or {} - return list((bottle.get("env") or {}).keys()) + def require_bottle(self, name: str) -> None: + if self.has_bottle(name): + return + available = ", ".join(self.bottles.keys()) + if available: + die( + f"bottle '{name}' not defined in claude-bottle.json. " + f"Available bottles: {available}" + ) + die(f"bottle '{name}' not defined in claude-bottle.json (no bottles defined).") + + def bottle_for(self, agent_name: str) -> Bottle: + """Resolve the Bottle the named agent references. The validator + guarantees both lookups succeed for a manifest built via + from_json_obj.""" + return self.bottles[self.agents[agent_name].bottle] -def manifest_env_entry(manifest: Manifest, agent: str, var: str) -> str: - """Raw string value of one env entry. Used by env_resolve, which - classifies the result by sentinel. Dies if the agent has no bottle, - or the entry is not a string.""" - agent_def = (manifest.get("agents") or {}).get(agent) or {} - bottle_name = agent_def.get("bottle") or "" - if not bottle_name: - die(f"env entry {var} for agent {agent}: agent has no 'bottle' field") - bottle = (manifest.get("bottles") or {}).get(bottle_name) or {} - env = bottle.get("env") or {} - value = env.get(var) +def _as_json_object(value: object, label: str) -> dict[str, object]: + """Assert that `value` is a JSON object (str-keyed dict) and return + a view typed as `dict[str, object]` so downstream `.get(...)` calls + have a typed surface.""" + if not isinstance(value, dict): + die(f"{label} must be a JSON object (was {type(value).__name__})") + items = cast(dict[object, object], value) + out: dict[str, object] = {} + for k, v in items.items(): + if not isinstance(k, str): + die(f"{label} keys must be strings (found {type(k).__name__})") + out[k] = v + return out + + +def _section_dict(value: object, label: str) -> dict[str, object]: + """Like _as_json_object but treats absent/null as an empty section.""" + if value is None: + return {} + return _as_json_object(value, label) + + +def _load_json_or_die(path: Path) -> dict[str, object]: + try: + with path.open() as f: + doc: object = json.load(f) + except json.JSONDecodeError: + die(f"claude-bottle.json at {path} is not valid JSON") + return _as_json_object(doc, f"claude-bottle.json at {path}") + + +def _opt_str(value: object, label: str) -> str: + if value is None: + return "" if not isinstance(value, str): - actual = _json_type(value) - die( - f"env entry {var} for agent {agent} must be a JSON string " - f"(was {actual}). Use \"?\" for prompt-at-runtime." - ) + die(f"{label} must be a string (was {type(value).__name__})") return value -def manifest_skills(manifest: Manifest, name: str) -> list[str]: - agent = (manifest.get("agents") or {}).get(name) or {} - return list(agent.get("skills") or []) - - -def manifest_prompt(manifest: Manifest, name: str) -> str: - agent = (manifest.get("agents") or {}).get(name) or {} - return agent.get("prompt") or "" - - -def manifest_agent_bottle(manifest: Manifest, name: str) -> str: - agent = (manifest.get("agents") or {}).get(name) or {} - return agent.get("bottle") or "" - - -def manifest_has_bottle(manifest: Manifest, bottle_name: str) -> bool: - return bottle_name in (manifest.get("bottles") or {}) - - -def manifest_require_bottle(manifest: Manifest, bottle_name: str) -> None: - if manifest_has_bottle(manifest, bottle_name): - return - available = ", ".join((manifest.get("bottles") or {}).keys()) - if available: - die( - f"bottle '{bottle_name}' not defined in claude-bottle.json. " - f"Available bottles: {available}" - ) - else: - die(f"bottle '{bottle_name}' not defined in claude-bottle.json (no bottles defined).") - - -def manifest_bottle_ssh(manifest: Manifest, bottle_name: str) -> list[dict[str, Any]]: - bottle = (manifest.get("bottles") or {}).get(bottle_name) or {} - return list(bottle.get("ssh") or []) - - -_SUPPORTED_RUNTIMES: tuple[str, ...] = ("runc", "runsc") - - -def manifest_bottle_runtime(manifest: Manifest, bottle_name: str) -> str: - """Container runtime for the bottle's agent container. Returns - "runc" (Docker default) or "runsc" (gVisor opt-in). Dies if the - field is present but not one of the supported values.""" - bottle = (manifest.get("bottles") or {}).get(bottle_name) or {} - raw = bottle.get("runtime") - if raw is None: - return "runc" - if not isinstance(raw, str): - die( - f"bottle '{bottle_name}' runtime must be a string " - f"(was {_json_type(raw)})." - ) - if raw not in _SUPPORTED_RUNTIMES: - die( - f"bottle '{bottle_name}' runtime '{raw}' is not supported. " - f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}." - ) - return raw - - -def manifest_bottle_egress_allowlist(manifest: Manifest, bottle_name: str) -> list[str]: - """Hostnames in bottles[bottle_name].egress.allowlist. Dies if the - field is present but not an array. Per-element string typing is - re-checked at use-time in pipelock.""" - bottle = (manifest.get("bottles") or {}).get(bottle_name) or {} - allowlist = (bottle.get("egress") or {}).get("allowlist") - if allowlist is None: - return [] - if not isinstance(allowlist, list): - die( - f"bottle '{bottle_name}' egress.allowlist must be an array " - f"(was {_json_type(allowlist)})." - ) - return list(allowlist) - - -def manifest_ssh(manifest: Manifest, agent_name: str) -> list[dict[str, Any]]: - """SSH entries resolved via the agent's "bottle" field; empty if no bottle set.""" - bottle_name = manifest_agent_bottle(manifest, agent_name) - if not bottle_name: - return [] - return manifest_bottle_ssh(manifest, bottle_name) - - -def _json_type(value: Any) -> str: - """Mirror jq's type names for parity with the bash error messages.""" +def _opt_port(value: object, label: str) -> str: + """Port accepts string or int (JSON-friendly) and is normalized to str.""" if value is None: - return "null" + return "" if isinstance(value, bool): - return "boolean" - if isinstance(value, (int, float)): - return "number" + die(f"{label} must be a string or number (was boolean)") + if isinstance(value, int): + return str(value) if isinstance(value, str): - return "string" - if isinstance(value, list): - return "array" - if isinstance(value, dict): - return "object" - return type(value).__name__ + return value + die(f"{label} must be a string or number (was {type(value).__name__})") diff --git a/claude_bottle/pipelock.py b/claude_bottle/pipelock.py index b4b2256..e6f2bb9 100644 --- a/claude_bottle/pipelock.py +++ b/claude_bottle/pipelock.py @@ -18,7 +18,7 @@ import subprocess from pathlib import Path from .log import die, info, warn -from .manifest import Manifest, manifest_bottle_egress_allowlist, manifest_bottle_ssh +from .manifest import Manifest # Pipelock image, pinned by digest. The digest is the multi-arch image # index for ghcr.io/luckypipewrench/pipelock:2.3.0. @@ -58,23 +58,12 @@ def pipelock_proxy_host_port(slug: str) -> str: def pipelock_bottle_allowlist(manifest: Manifest, bottle_name: str) -> list[str]: - """Hostnames in bottles[].egress.allowlist. Validates - that each entry is a string.""" - raw = manifest_bottle_egress_allowlist(manifest, bottle_name) - for entry in raw: - if not isinstance(entry, str): - t = _json_type(entry) - die(f"bottle '{bottle_name}' egress.allowlist must contain only strings; found a '{t}' entry.") - return list(raw) + """Hostnames in bottles[].egress.allowlist.""" + return list(manifest.bottles[bottle_name].egress.allowlist) def pipelock_bottle_ssh_hostnames(manifest: Manifest, bottle_name: str) -> list[str]: - out: list[str] = [] - for entry in manifest_bottle_ssh(manifest, bottle_name): - h = entry.get("Hostname") or "" - if h: - out.append(h) - return out + return [e.Hostname for e in manifest.bottles[bottle_name].ssh if e.Hostname] _IPV4_RE = re.compile(r"^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$") @@ -256,19 +245,3 @@ def pipelock_stop(slug: str) -> None: stderr=subprocess.DEVNULL, ).returncode != 0: warn(f"failed to remove pipelock sidecar {name}; clean up with 'docker rm -f {name}'") - - -def _json_type(value: object) -> str: - if value is None: - return "null" - if isinstance(value, bool): - return "boolean" - if isinstance(value, (int, float)): - return "number" - if isinstance(value, str): - return "string" - if isinstance(value, list): - return "array" - if isinstance(value, dict): - return "object" - return type(value).__name__ diff --git a/claude_bottle/ssh.py b/claude_bottle/ssh.py index 3c37e16..634958f 100644 --- a/claude_bottle/ssh.py +++ b/claude_bottle/ssh.py @@ -36,31 +36,26 @@ from __future__ import annotations import os import subprocess from pathlib import Path -from typing import Any +from typing import Sequence from .log import die, info +from .manifest import SshEntry -def ssh_validate_entries(entries: list[dict[str, Any]]) -> None: - """Each entry must have Host + IdentityFile, and the IdentityFile - must exist on the host (after expanding leading ~).""" +def ssh_validate_entries(entries: Sequence[SshEntry]) -> None: + """The IdentityFile must exist on the host (after expanding leading ~). + Host and IdentityFile shape are already enforced by Manifest validation.""" for entry in entries: - name = entry.get("Host", "") - key = entry.get("IdentityFile", "") - if not name: - die(f"ssh entry missing required field 'Host': {entry}") - if not key: - die(f"ssh entry '{name}' missing required field 'IdentityFile'") - key = _expand_tilde(key) + key = _expand_tilde(entry.IdentityFile) if not os.path.isfile(key): - die(f"ssh key file not found for host '{name}': {key}") + die(f"ssh key file not found for host '{entry.Host}': {key}") def ssh_setup( container: str, stage_dir: Path, proxy_host_port: str, - entries: list[dict[str, Any]], + entries: Sequence[SshEntry], ) -> None: """Set up SSH in the container so node can authenticate using each entry's key without the key file being readable by node.""" @@ -91,12 +86,12 @@ def ssh_setup( container_key_paths: list[str] = [] for entry in entries: - name = entry["Host"] - key = _expand_tilde(entry["IdentityFile"]) - hostname = entry["Hostname"] - user = entry["User"] - port = str(entry["Port"]) - known_host_key = entry.get("KnownHostKey", "") + name = entry.Host + key = _expand_tilde(entry.IdentityFile) + hostname = entry.Hostname + user = entry.User + port = entry.Port + known_host_key = entry.KnownHostKey key_basename = os.path.basename(key) container_key_path = f"{keys_dir}/{key_basename}" diff --git a/tests/fixtures.py b/tests/fixtures.py index d4c6432..fae2f66 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -1,4 +1,7 @@ -"""Manifest fixtures for the test suite.""" +"""Manifest fixtures for the test suite. Each fixture returns a built +Manifest dataclass; callers that need the raw JSON shape (e.g. to write +to a file on disk) can build it themselves or call .from_json_obj on +a dict literal in the test.""" from __future__ import annotations @@ -7,9 +10,11 @@ import tempfile from pathlib import Path from typing import Any +from claude_bottle.manifest import Manifest -def fixture_minimal() -> dict[str, Any]: - """One bottle, one agent, no env / ssh / skills.""" + +def fixture_minimal_dict() -> dict[str, Any]: + """One bottle, one agent, no env / ssh / skills. JSON shape.""" return { "bottles": {"dev": {}}, "agents": { @@ -18,8 +23,8 @@ def fixture_minimal() -> dict[str, Any]: } -def fixture_with_egress() -> dict[str, Any]: - """Bottle declares an egress.allowlist.""" +def fixture_with_egress_dict() -> dict[str, Any]: + """Bottle declares an egress.allowlist. JSON shape.""" return { "bottles": { "dev": { @@ -32,9 +37,9 @@ def fixture_with_egress() -> dict[str, Any]: } -def fixture_with_ssh() -> dict[str, Any]: +def fixture_with_ssh_dict() -> dict[str, Any]: """Bottle has both an IPv4-literal SSH host (CGNAT) and a hostname host, - exercising both ssrf.ip_allowlist and trusted_domains code paths.""" + exercising both ssrf.ip_allowlist and trusted_domains code paths. JSON shape.""" return { "bottles": { "dev": { @@ -60,8 +65,22 @@ def fixture_with_ssh() -> dict[str, Any]: } +def fixture_minimal() -> Manifest: + return Manifest.from_json_obj(fixture_minimal_dict()) + + +def fixture_with_egress() -> Manifest: + return Manifest.from_json_obj(fixture_with_egress_dict()) + + +def fixture_with_ssh() -> Manifest: + return Manifest.from_json_obj(fixture_with_ssh_dict()) + + def write_fixture(fn) -> Path: - """Write fixture dict to a temp file; return the path. Caller must rm.""" + """Write fixture JSON to a temp file; return the path. Caller must rm. + Accepts a function returning either a dict (JSON shape) or a Manifest; + only the dict form is supported here since we need to serialize.""" f = tempfile.NamedTemporaryFile( mode="w", suffix=".json", delete=False, encoding="utf-8" ) diff --git a/tests/test_manifest_runtime.py b/tests/test_manifest_runtime.py index 4ba013d..829ce67 100644 --- a/tests/test_manifest_runtime.py +++ b/tests/test_manifest_runtime.py @@ -1,15 +1,18 @@ -"""Unit: manifest_bottle_runtime — defaults to runc, accepts runsc, -rejects unknown values and non-strings.""" +"""Unit: bottle runtime — Manifest.from_json_obj defaults runtime to runc, +accepts runsc, and rejects unknown values, non-strings, and empty strings.""" import unittest from claude_bottle.log import Die -from claude_bottle.manifest import manifest_bottle_runtime +from claude_bottle.manifest import Manifest -def _bottle(runtime_value: object | None) -> dict: - """Build a minimal manifest with one bottle whose runtime field is - set (or absent if `runtime_value is _ABSENT`).""" +_ABSENT = object() + + +def _bottle(runtime_value: object) -> dict: + """Build a minimal manifest JSON shape with one bottle whose runtime + field is set (or absent if `runtime_value is _ABSENT`).""" bottle: dict = {} if runtime_value is not _ABSENT: bottle["runtime"] = runtime_value @@ -19,30 +22,30 @@ def _bottle(runtime_value: object | None) -> dict: } -_ABSENT = object() - - class TestManifestBottleRuntime(unittest.TestCase): def test_default_runc_when_absent(self): - self.assertEqual("runc", manifest_bottle_runtime(_bottle(_ABSENT), "dev")) + m = Manifest.from_json_obj(_bottle(_ABSENT)) + self.assertEqual("runc", m.bottles["dev"].runtime) def test_explicit_runc(self): - self.assertEqual("runc", manifest_bottle_runtime(_bottle("runc"), "dev")) + m = Manifest.from_json_obj(_bottle("runc")) + self.assertEqual("runc", m.bottles["dev"].runtime) def test_explicit_runsc(self): - self.assertEqual("runsc", manifest_bottle_runtime(_bottle("runsc"), "dev")) + m = Manifest.from_json_obj(_bottle("runsc")) + self.assertEqual("runsc", m.bottles["dev"].runtime) def test_rejects_unknown_runtime(self): with self.assertRaises(Die): - manifest_bottle_runtime(_bottle("kata-runtime"), "dev") + Manifest.from_json_obj(_bottle("kata-runtime")) def test_rejects_non_string(self): with self.assertRaises(Die): - manifest_bottle_runtime(_bottle(42), "dev") + Manifest.from_json_obj(_bottle(42)) def test_rejects_empty_string(self): with self.assertRaises(Die): - manifest_bottle_runtime(_bottle(""), "dev") + Manifest.from_json_obj(_bottle("")) if __name__ == "__main__": diff --git a/tests/test_pipelock_allowlist.py b/tests/test_pipelock_allowlist.py index 08911fd..887cfcf 100644 --- a/tests/test_pipelock_allowlist.py +++ b/tests/test_pipelock_allowlist.py @@ -5,6 +5,7 @@ pipelock_bottle_ssh_trusted_domains, pipelock_effective_allowlist.""" import unittest from claude_bottle.log import Die +from claude_bottle.manifest import Manifest from claude_bottle.pipelock import ( pipelock_bottle_allowlist, pipelock_bottle_ssh_hostnames, @@ -32,7 +33,7 @@ class TestBottleAllowlist(unittest.TestCase): "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, } with self.assertRaises(Die): - pipelock_bottle_allowlist(bad, "dev") + Manifest.from_json_obj(bad) class TestSSHHostnames(unittest.TestCase): @@ -54,7 +55,7 @@ class TestSSHHostnames(unittest.TestCase): class TestEffectiveAllowlist(unittest.TestCase): def test_union_and_dedup(self): - manifest = { + manifest = Manifest.from_json_obj({ "bottles": { "dev": { "egress": {"allowlist": ["registry.npmjs.org"]}, @@ -67,7 +68,7 @@ class TestEffectiveAllowlist(unittest.TestCase): } }, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, - } + }) eff = pipelock_effective_allowlist(manifest, "dev") self.assertIn("api.anthropic.com", eff) self.assertIn("registry.npmjs.org", eff) diff --git a/tests/test_pipelock_yaml.py b/tests/test_pipelock_yaml.py index 184bf83..9602458 100644 --- a/tests/test_pipelock_yaml.py +++ b/tests/test_pipelock_yaml.py @@ -7,6 +7,7 @@ import tempfile import unittest from pathlib import Path +from claude_bottle.manifest import Manifest from claude_bottle.pipelock import pipelock_write_yaml from tests.fixtures import fixture_minimal, fixture_with_ssh @@ -50,7 +51,7 @@ class TestPipelockYaml(unittest.TestCase): self.assertIn("100.78.141.42", content) def test_secret_hygiene(self): - manifest = { + manifest = Manifest.from_json_obj({ "bottles": { "dev": { "env": { @@ -61,7 +62,7 @@ class TestPipelockYaml(unittest.TestCase): } }, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, - } + }) yaml_path = self.out_dir / "secret.yaml" pipelock_write_yaml(manifest, "dev", yaml_path) content = yaml_path.read_text()