Convert Manifest to frozen dataclasses #4

Merged
didericis merged 4 commits from convert-manifest-to-dataclass into main 2026-05-10 21:42:35 -04:00
12 changed files with 413 additions and 320 deletions
+20 -34
View File
@@ -5,15 +5,7 @@ from __future__ import annotations
import argparse
from ..log import info
from ..manifest import (
manifest_agent_bottle,
manifest_env_names,
manifest_prompt,
manifest_require_agent,
manifest_resolve,
manifest_skills,
manifest_ssh,
)
from ..manifest import Manifest
from ._common import PROG, USER_CWD
@@ -22,39 +14,33 @@ def cmd_info(argv: list[str]) -> int:
parser.add_argument("name", help="agent name defined in claude-bottle.json")
args = parser.parse_args(argv)
manifest = manifest_resolve(USER_CWD)
manifest_require_agent(manifest, args.name)
manifest = Manifest.resolve(USER_CWD)
manifest.require_agent(args.name)
env_names = manifest_env_names(manifest, args.name)
skill_names = manifest_skills(manifest, args.name)
prompt_content = manifest_prompt(manifest, args.name)
prompt_first_line = prompt_content.splitlines()[0] if prompt_content else ""
bottle_name = manifest_agent_bottle(manifest, args.name)
ssh_entries = manifest_ssh(manifest, args.name)
agent = manifest.agents[args.name]
bottle = manifest.bottle_for(args.name)
env_names = list(bottle.env.keys())
prompt_first_line = agent.prompt.splitlines()[0] if agent.prompt else ""
print()
info(f"agent : {args.name}")
info(f"env (names only): {', '.join(env_names) if env_names else '(none)'}")
info(f"skills : {' '.join(skill_names) if skill_names else '(none)'}")
info(f"skills : {' '.join(agent.skills) if agent.skills else '(none)'}")
info(
f"prompt : {len(prompt_content)} chars; "
f"prompt : {len(agent.prompt)} chars; "
f"first line: {prompt_first_line or '(empty)'}"
)
if bottle_name:
info(f"bottle : {bottle_name}")
if ssh_entries:
for e in ssh_entries:
info(
f" ssh host : {e.get('Host')} "
f"(Hostname={e.get('Hostname')}, User={e.get('User')}, "
f"Port={e.get('Port')}, IdentityFile={e.get('IdentityFile')})"
)
if e.get("KnownHostKey"):
info(f" KnownHostKey: {e['KnownHostKey']}")
else:
info(" ssh hosts : (none)")
info(f"bottle : {agent.bottle}")
if bottle.ssh:
for e in bottle.ssh:
info(
f" ssh host : {e.Host} "
f"(Hostname={e.Hostname}, User={e.User}, "
f"Port={e.Port}, IdentityFile={e.IdentityFile})"
)
if e.KnownHostKey:
info(f" KnownHostKey: {e.KnownHostKey}")
else:
info("bottle : (none)")
info(" ssh hosts : (none)")
print()
return 0
+3 -3
View File
@@ -7,7 +7,7 @@ import subprocess
from .. import docker as docker_mod
from ..log import info
from ..manifest import manifest_resolve
from ..manifest import Manifest
from ._common import PROG, USER_CWD
@@ -17,8 +17,8 @@ def cmd_list(argv: list[str]) -> int:
args = parser.parse_args(argv)
if args.scope == "available":
manifest = manifest_resolve(USER_CWD)
for name in (manifest.get("agents") or {}).keys():
manifest = Manifest.resolve(USER_CWD)
for name in manifest.agents.keys():
print(name)
return 0
+16 -32
View File
@@ -19,17 +19,7 @@ from .. import skills as skills_mod
from .. import ssh as ssh_mod
from ..env_resolve import env_resolve
from ..log import die, info
from ..manifest import (
manifest_agent_bottle,
manifest_bottle_runtime,
manifest_env_names,
manifest_prompt,
manifest_require_agent,
manifest_require_bottle,
manifest_resolve,
manifest_skills,
manifest_ssh,
)
from ..manifest import Manifest
from ._common import PROG, REPO_DIR, USER_CWD, read_tty_line
@@ -57,8 +47,11 @@ def cmd_start(argv: list[str]) -> int:
runtime_image = derived_image
docker_mod.require_docker()
manifest = manifest_resolve(USER_CWD)
manifest_require_agent(manifest, name)
manifest = Manifest.resolve(USER_CWD)
manifest.require_agent(name)
agent = manifest.agents[name]
bottle_name = agent.bottle
bottle = manifest.bottle_for(name)
container = pinned_container or default_container
suffix = 2
@@ -81,7 +74,7 @@ def cmd_start(argv: list[str]) -> int:
)
# --- Plan resolution (host-only, no container yet) ---
env_names = manifest_env_names(manifest, name)
env_names = list(bottle.env.keys())
# CLAUDE_BOTTLE_OAUTH_TOKEN → CLAUDE_CODE_OAUTH_TOKEN forwarding.
# Host-side token is always forwarded so every container can authenticate.
@@ -90,23 +83,14 @@ def cmd_start(argv: list[str]) -> int:
if forward_oauth_token:
display_env_names.append("CLAUDE_CODE_OAUTH_TOKEN")
skill_names = manifest_skills(manifest, name)
if skill_names:
skills_mod.skills_validate_all(skill_names)
if agent.skills:
skills_mod.skills_validate_all(list(agent.skills))
bottle_name = manifest_agent_bottle(manifest, name)
if not bottle_name:
die(
f"agent '{name}' has no 'bottle' field. "
f"Add a bottle association to this agent in claude-bottle.json."
)
manifest_require_bottle(manifest, bottle_name)
runtime = manifest_bottle_runtime(manifest, bottle_name)
runtime = bottle.runtime
if runtime == "runsc":
docker_mod.require_runsc()
ssh_entries = manifest_ssh(manifest, name)
ssh_entries = bottle.ssh
if ssh_entries:
ssh_mod.ssh_validate_entries(ssh_entries)
@@ -153,7 +137,7 @@ def cmd_start(argv: list[str]) -> int:
env_resolve(manifest, name, env_file, args_file)
prompt_content = manifest_prompt(manifest, name)
prompt_content = agent.prompt
prompt_file.write_text(prompt_content)
prompt_first_line = prompt_content.splitlines()[0] if prompt_content else ""
@@ -169,11 +153,11 @@ def cmd_start(argv: list[str]) -> int:
"env (names only): "
+ (", ".join(display_env_names) if display_env_names else "(none)")
)
info("skills : " + (" ".join(skill_names) if skill_names else "(none)"))
info("skills : " + (" ".join(agent.skills) if agent.skills else "(none)"))
info(f"bottle : {bottle_name}")
info(f" runtime : {runtime}{' (gVisor)' if runtime == 'runsc' else ''}")
if ssh_entries:
ssh_names = ", ".join(e.get("Host", "") for e in ssh_entries)
ssh_names = ", ".join(e.Host for e in ssh_entries)
info(f" ssh hosts : {ssh_names}")
else:
info(" ssh hosts : (none)")
@@ -293,8 +277,8 @@ def cmd_start(argv: list[str]) -> int:
check=True,
)
if skill_names:
skills_mod.skills_copy_into(container, skill_names)
if agent.skills:
skills_mod.skills_copy_into(container, list(agent.skills))
if ssh_entries:
proxy_host_port = pipelock.pipelock_proxy_host_port(slug)
+3 -3
View File
@@ -35,7 +35,7 @@ import sys
from pathlib import Path
from .log import die
from .manifest import Manifest, manifest_env_entry, manifest_env_names
from .manifest import Manifest
_INTERPOLATED_RE = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$")
@@ -108,10 +108,10 @@ def env_resolve(
- interpolated: copy host value; export under target name; append `-e NAME`
- literal: append `NAME=VALUE` to env_file
"""
for name in manifest_env_names(manifest, agent):
bottle = manifest.bottle_for(agent)
for name, raw in bottle.env.items():
if not name:
continue
raw = manifest_env_entry(manifest, agent, name)
kind = env_entry_kind(raw)
if kind == "secret":
prompt_body = env_entry_secret_prompt(raw)
+2 -1
View File
@@ -3,6 +3,7 @@
from __future__ import annotations
import sys
from typing import NoReturn
def info(msg: str) -> None:
@@ -18,6 +19,6 @@ class Die(SystemExit):
fatal exit from an unrelated SystemExit."""
def die(msg: str) -> "Die":
def die(msg: str) -> NoReturn:
print(f"claude-bottle: error: {msg}", file=sys.stderr)
raise Die(1)
+299 -169
View File
@@ -1,5 +1,5 @@
"""Manifest helpers. Read claude-bottle.json and pull the definition for a
named agent.
"""Manifest dataclasses. Read claude-bottle.json (cwd + $HOME, deep-merged)
into a frozen, validated Manifest tree.
Schema (see CLAUDE.md "Intended design"):
{
@@ -7,7 +7,8 @@ Schema (see CLAUDE.md "Intended design"):
"<bottle-name>": {
"env": { "<NAME>": <env-entry>, ... },
"ssh": [ <ssh-entry>, ... ],
"egress": { "allowlist": [ "<hostname>", ... ] }
"egress": { "allowlist": [ "<hostname>", ... ] },
"runtime": "runc" | "runsc"
}
},
"agents": {
@@ -21,199 +22,328 @@ Schema (see CLAUDE.md "Intended design"):
Bottles group shared infrastructure (SSH keys, known hosts, egress allowlist)
that multiple agents can reference. Every agent must reference a bottle.
Validation runs once at construction (Manifest.from_json_obj) so getters
can trust the shape.
"""
from __future__ import annotations
import json
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from typing import Literal, Mapping, cast
from .log import die
Manifest = dict[str, Any]
Runtime = Literal["runc", "runsc"]
_SUPPORTED_RUNTIMES: tuple[Runtime, ...] = ("runc", "runsc")
def manifest_resolve(cwd: str) -> Manifest:
"""Look for claude-bottle.json in <cwd> and in $HOME, deep-merge
them (cwd entries override home entries on key conflict for both
bottles and agents). Dies if neither file is found or either is
invalid JSON."""
cwd_file = Path(cwd) / "claude-bottle.json"
home_file = Path(os.environ["HOME"]) / "claude-bottle.json"
cwd_doc = _load_json_or_die(cwd_file) if cwd_file.is_file() else None
home_doc = _load_json_or_die(home_file) if home_file.is_file() else None
if cwd_doc is None and home_doc is None:
die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}")
if cwd_doc is None:
return home_doc # type: ignore[return-value]
if home_doc is None:
return cwd_doc
return {
"bottles": {**(home_doc.get("bottles") or {}), **(cwd_doc.get("bottles") or {})},
"agents": {**(home_doc.get("agents") or {}), **(cwd_doc.get("agents") or {})},
}
def _empty_str_dict() -> dict[str, str]:
return {}
def _load_json_or_die(path: Path) -> Manifest:
try:
with path.open() as f:
doc = json.load(f)
except json.JSONDecodeError:
die(f"claude-bottle.json at {path} is not valid JSON")
if not isinstance(doc, dict):
die(f"claude-bottle.json at {path} must be a JSON object")
return doc
@dataclass(frozen=True)
class SshEntry:
Host: str
IdentityFile: str
Hostname: str = ""
User: str = ""
Port: str = ""
KnownHostKey: str = ""
@classmethod
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "SshEntry":
d = _as_json_object(raw, f"bottle '{bottle_name}' ssh[{idx}]")
host = d.get("Host")
if not isinstance(host, str) or not host:
die(f"bottle '{bottle_name}' ssh[{idx}] missing required string field 'Host'")
ident = d.get("IdentityFile")
if not isinstance(ident, str) or not ident:
die(
f"bottle '{bottle_name}' ssh '{host}' missing required string field "
f"'IdentityFile'"
)
hostname = _opt_str(d.get("Hostname"), f"bottle '{bottle_name}' ssh '{host}' Hostname")
user = _opt_str(d.get("User"), f"bottle '{bottle_name}' ssh '{host}' User")
port = _opt_port(d.get("Port"), f"bottle '{bottle_name}' ssh '{host}' Port")
khk = _opt_str(
d.get("KnownHostKey"),
f"bottle '{bottle_name}' ssh '{host}' KnownHostKey",
)
return cls(
Host=host,
IdentityFile=ident,
Hostname=hostname,
User=user,
Port=port,
KnownHostKey=khk,
)
def manifest_has_agent(manifest: Manifest, name: str) -> bool:
return name in (manifest.get("agents") or {})
@dataclass(frozen=True)
class BottleEgress:
allowlist: tuple[str, ...] = ()
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "BottleEgress":
d = _as_json_object(raw, f"bottle '{bottle_name}' egress")
allow = d.get("allowlist")
if allow is None:
return cls()
if not isinstance(allow, list):
die(
f"bottle '{bottle_name}' egress.allowlist must be an array "
f"(was {type(allow).__name__})"
)
items: list[str] = []
allow_list = cast(list[object], allow)
for i, host in enumerate(allow_list):
if not isinstance(host, str):
die(
f"bottle '{bottle_name}' egress.allowlist[{i}] must be a string "
f"(was {type(host).__name__})"
)
items.append(host)
return cls(allowlist=tuple(items))
def manifest_require_agent(manifest: Manifest, name: str) -> None:
"""Like has_agent but dies with the available agent names listed."""
if manifest_has_agent(manifest, name):
return
available = ", ".join((manifest.get("agents") or {}).keys())
if available:
die(f"agent '{name}' not defined in claude-bottle.json. Available: {available}")
else:
@dataclass(frozen=True)
class Bottle:
env: Mapping[str, str] = field(default_factory=_empty_str_dict)
ssh: tuple[SshEntry, ...] = ()
egress: BottleEgress = field(default_factory=BottleEgress)
runtime: Runtime = "runc"
@classmethod
def from_dict(cls, name: str, raw: object) -> "Bottle":
d = _as_json_object(raw, f"bottle '{name}'")
env: dict[str, str] = {}
env_raw = d.get("env")
if env_raw is not None:
env_dict = _as_json_object(env_raw, f"bottle '{name}' env")
for var, value in env_dict.items():
if not isinstance(value, str):
die(
f"env entry {var} in bottle '{name}' must be a JSON string "
f"(was {type(value).__name__}). Use \"?<message>\" for prompt-at-runtime."
)
env[var] = value
ssh: tuple[SshEntry, ...] = ()
ssh_raw = d.get("ssh")
if ssh_raw is not None:
if not isinstance(ssh_raw, list):
die(f"bottle '{name}' ssh must be an array (was {type(ssh_raw).__name__})")
ssh_list = cast(list[object], ssh_raw)
ssh = tuple(
SshEntry.from_dict(name, i, entry)
for i, entry in enumerate(ssh_list)
)
egress_raw = d.get("egress")
egress = (
BottleEgress.from_dict(name, egress_raw)
if egress_raw is not None
else BottleEgress()
)
runtime_raw = d.get("runtime")
runtime: Runtime
if runtime_raw is None:
runtime = "runc"
else:
if not isinstance(runtime_raw, str):
die(f"bottle '{name}' runtime must be a string (was {type(runtime_raw).__name__})")
if runtime_raw not in _SUPPORTED_RUNTIMES:
die(
f"bottle '{name}' runtime '{runtime_raw}' is not supported. "
f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}."
)
runtime = runtime_raw
return cls(env=env, ssh=ssh, egress=egress, runtime=runtime)
@dataclass(frozen=True)
class Agent:
bottle: str
skills: tuple[str, ...] = ()
prompt: str = ""
@classmethod
def from_dict(cls, name: str, raw: object, bottle_names: set[str]) -> "Agent":
d = _as_json_object(raw, f"agent '{name}'")
bottle = d.get("bottle")
if not isinstance(bottle, str) or not bottle:
die(f"agent '{name}' must declare a 'bottle' field naming a defined bottle")
if bottle not in bottle_names:
available = ", ".join(sorted(bottle_names)) or "(none defined)"
die(
f"agent '{name}' references bottle '{bottle}', which is not defined. "
f"Available: {available}"
)
skills: tuple[str, ...] = ()
skills_raw = d.get("skills")
if skills_raw is not None:
if not isinstance(skills_raw, list):
die(f"agent '{name}' skills must be an array (was {type(skills_raw).__name__})")
collected: list[str] = []
skills_list = cast(list[object], skills_raw)
for i, skill in enumerate(skills_list):
if not isinstance(skill, str):
die(
f"agent '{name}' skills[{i}] must be a string "
f"(was {type(skill).__name__})"
)
collected.append(skill)
skills = tuple(collected)
prompt_raw = d.get("prompt")
if prompt_raw is None:
prompt = ""
elif isinstance(prompt_raw, str):
prompt = prompt_raw
else:
die(f"agent '{name}' prompt must be a string (was {type(prompt_raw).__name__})")
return cls(bottle=bottle, skills=skills, prompt=prompt)
@dataclass(frozen=True)
class Manifest:
bottles: Mapping[str, Bottle]
agents: Mapping[str, Agent]
@classmethod
def resolve(cls, cwd: str) -> "Manifest":
"""Look for claude-bottle.json in <cwd> and in $HOME, deep-merge
them (cwd entries override home entries on key conflict for both
bottles and agents), then validate. Dies if neither file is
found, either is invalid JSON, or the merged shape violates the
schema."""
cwd_file = Path(cwd) / "claude-bottle.json"
home_file = Path(os.environ["HOME"]) / "claude-bottle.json"
cwd_doc = _load_json_or_die(cwd_file) if cwd_file.is_file() else None
home_doc = _load_json_or_die(home_file) if home_file.is_file() else None
if cwd_doc is None and home_doc is None:
die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}")
h: dict[str, object] = home_doc if home_doc is not None else {}
c: dict[str, object] = cwd_doc if cwd_doc is not None else {}
h_bottles = _section_dict(h.get("bottles"), "bottles")
c_bottles = _section_dict(c.get("bottles"), "bottles")
h_agents = _section_dict(h.get("agents"), "agents")
c_agents = _section_dict(c.get("agents"), "agents")
merged: dict[str, object] = {
"bottles": {**h_bottles, **c_bottles},
"agents": {**h_agents, **c_agents},
}
return cls.from_json_obj(merged)
@classmethod
def from_json_obj(cls, obj: object) -> "Manifest":
"""Validate and build a Manifest from a raw JSON-like dict."""
d = _as_json_object(obj, "manifest")
raw_bottles = _section_dict(d.get("bottles"), "manifest 'bottles'")
raw_agents = _section_dict(d.get("agents"), "manifest 'agents'")
bottles: dict[str, Bottle] = {
n: Bottle.from_dict(n, b) for n, b in raw_bottles.items()
}
bottle_names = set(bottles.keys())
agents: dict[str, Agent] = {
n: Agent.from_dict(n, a, bottle_names) for n, a in raw_agents.items()
}
return cls(bottles=bottles, agents=agents)
def has_agent(self, name: str) -> bool:
return name in self.agents
def require_agent(self, name: str) -> None:
if self.has_agent(name):
return
available = ", ".join(self.agents.keys())
if available:
die(f"agent '{name}' not defined in claude-bottle.json. Available: {available}")
die(f"agent '{name}' not defined in claude-bottle.json (manifest is empty).")
def has_bottle(self, name: str) -> bool:
return name in self.bottles
def manifest_env_names(manifest: Manifest, name: str) -> list[str]:
"""Names (not values) of bottles[agent.bottle].env, in declaration
order. Empty list if the agent has no bottle or the bottle has no env."""
agent = (manifest.get("agents") or {}).get(name) or {}
bottle_name = agent.get("bottle") or ""
if not bottle_name:
return []
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
return list((bottle.get("env") or {}).keys())
def require_bottle(self, name: str) -> None:
if self.has_bottle(name):
return
available = ", ".join(self.bottles.keys())
if available:
die(
f"bottle '{name}' not defined in claude-bottle.json. "
f"Available bottles: {available}"
)
die(f"bottle '{name}' not defined in claude-bottle.json (no bottles defined).")
def bottle_for(self, agent_name: str) -> Bottle:
"""Resolve the Bottle the named agent references. The validator
guarantees both lookups succeed for a manifest built via
from_json_obj."""
return self.bottles[self.agents[agent_name].bottle]
def manifest_env_entry(manifest: Manifest, agent: str, var: str) -> str:
"""Raw string value of one env entry. Used by env_resolve, which
classifies the result by sentinel. Dies if the agent has no bottle,
or the entry is not a string."""
agent_def = (manifest.get("agents") or {}).get(agent) or {}
bottle_name = agent_def.get("bottle") or ""
if not bottle_name:
die(f"env entry {var} for agent {agent}: agent has no 'bottle' field")
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
env = bottle.get("env") or {}
value = env.get(var)
def _as_json_object(value: object, label: str) -> dict[str, object]:
"""Assert that `value` is a JSON object (str-keyed dict) and return
a view typed as `dict[str, object]` so downstream `.get(...)` calls
have a typed surface."""
if not isinstance(value, dict):
die(f"{label} must be a JSON object (was {type(value).__name__})")
items = cast(dict[object, object], value)
out: dict[str, object] = {}
for k, v in items.items():
if not isinstance(k, str):
die(f"{label} keys must be strings (found {type(k).__name__})")
out[k] = v
return out
def _section_dict(value: object, label: str) -> dict[str, object]:
"""Like _as_json_object but treats absent/null as an empty section."""
if value is None:
return {}
return _as_json_object(value, label)
def _load_json_or_die(path: Path) -> dict[str, object]:
try:
with path.open() as f:
doc: object = json.load(f)
except json.JSONDecodeError:
die(f"claude-bottle.json at {path} is not valid JSON")
return _as_json_object(doc, f"claude-bottle.json at {path}")
def _opt_str(value: object, label: str) -> str:
if value is None:
return ""
if not isinstance(value, str):
actual = _json_type(value)
die(
f"env entry {var} for agent {agent} must be a JSON string "
f"(was {actual}). Use \"?<message>\" for prompt-at-runtime."
)
die(f"{label} must be a string (was {type(value).__name__})")
return value
def manifest_skills(manifest: Manifest, name: str) -> list[str]:
agent = (manifest.get("agents") or {}).get(name) or {}
return list(agent.get("skills") or [])
def manifest_prompt(manifest: Manifest, name: str) -> str:
agent = (manifest.get("agents") or {}).get(name) or {}
return agent.get("prompt") or ""
def manifest_agent_bottle(manifest: Manifest, name: str) -> str:
agent = (manifest.get("agents") or {}).get(name) or {}
return agent.get("bottle") or ""
def manifest_has_bottle(manifest: Manifest, bottle_name: str) -> bool:
return bottle_name in (manifest.get("bottles") or {})
def manifest_require_bottle(manifest: Manifest, bottle_name: str) -> None:
if manifest_has_bottle(manifest, bottle_name):
return
available = ", ".join((manifest.get("bottles") or {}).keys())
if available:
die(
f"bottle '{bottle_name}' not defined in claude-bottle.json. "
f"Available bottles: {available}"
)
else:
die(f"bottle '{bottle_name}' not defined in claude-bottle.json (no bottles defined).")
def manifest_bottle_ssh(manifest: Manifest, bottle_name: str) -> list[dict[str, Any]]:
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
return list(bottle.get("ssh") or [])
_SUPPORTED_RUNTIMES: tuple[str, ...] = ("runc", "runsc")
def manifest_bottle_runtime(manifest: Manifest, bottle_name: str) -> str:
"""Container runtime for the bottle's agent container. Returns
"runc" (Docker default) or "runsc" (gVisor opt-in). Dies if the
field is present but not one of the supported values."""
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
raw = bottle.get("runtime")
if raw is None:
return "runc"
if not isinstance(raw, str):
die(
f"bottle '{bottle_name}' runtime must be a string "
f"(was {_json_type(raw)})."
)
if raw not in _SUPPORTED_RUNTIMES:
die(
f"bottle '{bottle_name}' runtime '{raw}' is not supported. "
f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}."
)
return raw
def manifest_bottle_egress_allowlist(manifest: Manifest, bottle_name: str) -> list[str]:
"""Hostnames in bottles[bottle_name].egress.allowlist. Dies if the
field is present but not an array. Per-element string typing is
re-checked at use-time in pipelock."""
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
allowlist = (bottle.get("egress") or {}).get("allowlist")
if allowlist is None:
return []
if not isinstance(allowlist, list):
die(
f"bottle '{bottle_name}' egress.allowlist must be an array "
f"(was {_json_type(allowlist)})."
)
return list(allowlist)
def manifest_ssh(manifest: Manifest, agent_name: str) -> list[dict[str, Any]]:
"""SSH entries resolved via the agent's "bottle" field; empty if no bottle set."""
bottle_name = manifest_agent_bottle(manifest, agent_name)
if not bottle_name:
return []
return manifest_bottle_ssh(manifest, bottle_name)
def _json_type(value: Any) -> str:
"""Mirror jq's type names for parity with the bash error messages."""
def _opt_port(value: object, label: str) -> str:
"""Port accepts string or int (JSON-friendly) and is normalized to str."""
if value is None:
return "null"
return ""
if isinstance(value, bool):
return "boolean"
if isinstance(value, (int, float)):
return "number"
die(f"{label} must be a string or number (was boolean)")
if isinstance(value, int):
return str(value)
if isinstance(value, str):
return "string"
if isinstance(value, list):
return "array"
if isinstance(value, dict):
return "object"
return type(value).__name__
return value
die(f"{label} must be a string or number (was {type(value).__name__})")
+4 -31
View File
@@ -18,7 +18,7 @@ import subprocess
from pathlib import Path
from .log import die, info, warn
from .manifest import Manifest, manifest_bottle_egress_allowlist, manifest_bottle_ssh
from .manifest import Manifest
# Pipelock image, pinned by digest. The digest is the multi-arch image
# index for ghcr.io/luckypipewrench/pipelock:2.3.0.
@@ -58,23 +58,12 @@ def pipelock_proxy_host_port(slug: str) -> str:
def pipelock_bottle_allowlist(manifest: Manifest, bottle_name: str) -> list[str]:
"""Hostnames in bottles[<bottle_name>].egress.allowlist. Validates
that each entry is a string."""
raw = manifest_bottle_egress_allowlist(manifest, bottle_name)
for entry in raw:
if not isinstance(entry, str):
t = _json_type(entry)
die(f"bottle '{bottle_name}' egress.allowlist must contain only strings; found a '{t}' entry.")
return list(raw)
"""Hostnames in bottles[<bottle_name>].egress.allowlist."""
return list(manifest.bottles[bottle_name].egress.allowlist)
def pipelock_bottle_ssh_hostnames(manifest: Manifest, bottle_name: str) -> list[str]:
out: list[str] = []
for entry in manifest_bottle_ssh(manifest, bottle_name):
h = entry.get("Hostname") or ""
if h:
out.append(h)
return out
return [e.Hostname for e in manifest.bottles[bottle_name].ssh if e.Hostname]
_IPV4_RE = re.compile(r"^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$")
@@ -256,19 +245,3 @@ def pipelock_stop(slug: str) -> None:
stderr=subprocess.DEVNULL,
).returncode != 0:
warn(f"failed to remove pipelock sidecar {name}; clean up with 'docker rm -f {name}'")
def _json_type(value: object) -> str:
if value is None:
return "null"
if isinstance(value, bool):
return "boolean"
if isinstance(value, (int, float)):
return "number"
if isinstance(value, str):
return "string"
if isinstance(value, list):
return "array"
if isinstance(value, dict):
return "object"
return type(value).__name__
+14 -19
View File
@@ -36,31 +36,26 @@ from __future__ import annotations
import os
import subprocess
from pathlib import Path
from typing import Any
from typing import Sequence
from .log import die, info
from .manifest import SshEntry
def ssh_validate_entries(entries: list[dict[str, Any]]) -> None:
"""Each entry must have Host + IdentityFile, and the IdentityFile
must exist on the host (after expanding leading ~)."""
def ssh_validate_entries(entries: Sequence[SshEntry]) -> None:
"""The IdentityFile must exist on the host (after expanding leading ~).
Host and IdentityFile shape are already enforced by Manifest validation."""
for entry in entries:
name = entry.get("Host", "")
key = entry.get("IdentityFile", "")
if not name:
die(f"ssh entry missing required field 'Host': {entry}")
if not key:
die(f"ssh entry '{name}' missing required field 'IdentityFile'")
key = _expand_tilde(key)
key = _expand_tilde(entry.IdentityFile)
if not os.path.isfile(key):
die(f"ssh key file not found for host '{name}': {key}")
die(f"ssh key file not found for host '{entry.Host}': {key}")
def ssh_setup(
container: str,
stage_dir: Path,
proxy_host_port: str,
entries: list[dict[str, Any]],
entries: Sequence[SshEntry],
) -> None:
"""Set up SSH in the container so node can authenticate using each
entry's key without the key file being readable by node."""
@@ -91,12 +86,12 @@ def ssh_setup(
container_key_paths: list[str] = []
for entry in entries:
name = entry["Host"]
key = _expand_tilde(entry["IdentityFile"])
hostname = entry["Hostname"]
user = entry["User"]
port = str(entry["Port"])
known_host_key = entry.get("KnownHostKey", "")
name = entry.Host
key = _expand_tilde(entry.IdentityFile)
hostname = entry.Hostname
user = entry.User
port = entry.Port
known_host_key = entry.KnownHostKey
key_basename = os.path.basename(key)
container_key_path = f"{keys_dir}/{key_basename}"
+27 -8
View File
@@ -1,4 +1,7 @@
"""Manifest fixtures for the test suite."""
"""Manifest fixtures for the test suite. Each fixture returns a built
Manifest dataclass; callers that need the raw JSON shape (e.g. to write
to a file on disk) can build it themselves or call .from_json_obj on
a dict literal in the test."""
from __future__ import annotations
@@ -7,9 +10,11 @@ import tempfile
from pathlib import Path
from typing import Any
from claude_bottle.manifest import Manifest
def fixture_minimal() -> dict[str, Any]:
"""One bottle, one agent, no env / ssh / skills."""
def fixture_minimal_dict() -> dict[str, Any]:
"""One bottle, one agent, no env / ssh / skills. JSON shape."""
return {
"bottles": {"dev": {}},
"agents": {
@@ -18,8 +23,8 @@ def fixture_minimal() -> dict[str, Any]:
}
def fixture_with_egress() -> dict[str, Any]:
"""Bottle declares an egress.allowlist."""
def fixture_with_egress_dict() -> dict[str, Any]:
"""Bottle declares an egress.allowlist. JSON shape."""
return {
"bottles": {
"dev": {
@@ -32,9 +37,9 @@ def fixture_with_egress() -> dict[str, Any]:
}
def fixture_with_ssh() -> dict[str, Any]:
def fixture_with_ssh_dict() -> dict[str, Any]:
"""Bottle has both an IPv4-literal SSH host (CGNAT) and a hostname host,
exercising both ssrf.ip_allowlist and trusted_domains code paths."""
exercising both ssrf.ip_allowlist and trusted_domains code paths. JSON shape."""
return {
"bottles": {
"dev": {
@@ -60,8 +65,22 @@ def fixture_with_ssh() -> dict[str, Any]:
}
def fixture_minimal() -> Manifest:
return Manifest.from_json_obj(fixture_minimal_dict())
def fixture_with_egress() -> Manifest:
return Manifest.from_json_obj(fixture_with_egress_dict())
def fixture_with_ssh() -> Manifest:
return Manifest.from_json_obj(fixture_with_ssh_dict())
def write_fixture(fn) -> Path:
"""Write fixture dict to a temp file; return the path. Caller must rm."""
"""Write fixture JSON to a temp file; return the path. Caller must rm.
Accepts a function returning either a dict (JSON shape) or a Manifest;
only the dict form is supported here since we need to serialize."""
f = tempfile.NamedTemporaryFile(
mode="w", suffix=".json", delete=False, encoding="utf-8"
)
+18 -15
View File
@@ -1,15 +1,18 @@
"""Unit: manifest_bottle_runtime — defaults to runc, accepts runsc,
rejects unknown values and non-strings."""
"""Unit: bottle runtime — Manifest.from_json_obj defaults runtime to runc,
accepts runsc, and rejects unknown values, non-strings, and empty strings."""
import unittest
from claude_bottle.log import Die
from claude_bottle.manifest import manifest_bottle_runtime
from claude_bottle.manifest import Manifest
def _bottle(runtime_value: object | None) -> dict:
"""Build a minimal manifest with one bottle whose runtime field is
set (or absent if `runtime_value is _ABSENT`)."""
_ABSENT = object()
def _bottle(runtime_value: object) -> dict:
"""Build a minimal manifest JSON shape with one bottle whose runtime
field is set (or absent if `runtime_value is _ABSENT`)."""
bottle: dict = {}
if runtime_value is not _ABSENT:
bottle["runtime"] = runtime_value
@@ -19,30 +22,30 @@ def _bottle(runtime_value: object | None) -> dict:
}
_ABSENT = object()
class TestManifestBottleRuntime(unittest.TestCase):
def test_default_runc_when_absent(self):
self.assertEqual("runc", manifest_bottle_runtime(_bottle(_ABSENT), "dev"))
m = Manifest.from_json_obj(_bottle(_ABSENT))
self.assertEqual("runc", m.bottles["dev"].runtime)
def test_explicit_runc(self):
self.assertEqual("runc", manifest_bottle_runtime(_bottle("runc"), "dev"))
m = Manifest.from_json_obj(_bottle("runc"))
self.assertEqual("runc", m.bottles["dev"].runtime)
def test_explicit_runsc(self):
self.assertEqual("runsc", manifest_bottle_runtime(_bottle("runsc"), "dev"))
m = Manifest.from_json_obj(_bottle("runsc"))
self.assertEqual("runsc", m.bottles["dev"].runtime)
def test_rejects_unknown_runtime(self):
with self.assertRaises(Die):
manifest_bottle_runtime(_bottle("kata-runtime"), "dev")
Manifest.from_json_obj(_bottle("kata-runtime"))
def test_rejects_non_string(self):
with self.assertRaises(Die):
manifest_bottle_runtime(_bottle(42), "dev")
Manifest.from_json_obj(_bottle(42))
def test_rejects_empty_string(self):
with self.assertRaises(Die):
manifest_bottle_runtime(_bottle(""), "dev")
Manifest.from_json_obj(_bottle(""))
if __name__ == "__main__":
+4 -3
View File
@@ -5,6 +5,7 @@ pipelock_bottle_ssh_trusted_domains, pipelock_effective_allowlist."""
import unittest
from claude_bottle.log import Die
from claude_bottle.manifest import Manifest
from claude_bottle.pipelock import (
pipelock_bottle_allowlist,
pipelock_bottle_ssh_hostnames,
@@ -32,7 +33,7 @@ class TestBottleAllowlist(unittest.TestCase):
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}
with self.assertRaises(Die):
pipelock_bottle_allowlist(bad, "dev")
Manifest.from_json_obj(bad)
class TestSSHHostnames(unittest.TestCase):
@@ -54,7 +55,7 @@ class TestSSHHostnames(unittest.TestCase):
class TestEffectiveAllowlist(unittest.TestCase):
def test_union_and_dedup(self):
manifest = {
manifest = Manifest.from_json_obj({
"bottles": {
"dev": {
"egress": {"allowlist": ["registry.npmjs.org"]},
@@ -67,7 +68,7 @@ class TestEffectiveAllowlist(unittest.TestCase):
}
},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}
})
eff = pipelock_effective_allowlist(manifest, "dev")
self.assertIn("api.anthropic.com", eff)
self.assertIn("registry.npmjs.org", eff)
+3 -2
View File
@@ -7,6 +7,7 @@ import tempfile
import unittest
from pathlib import Path
from claude_bottle.manifest import Manifest
from claude_bottle.pipelock import pipelock_write_yaml
from tests.fixtures import fixture_minimal, fixture_with_ssh
@@ -50,7 +51,7 @@ class TestPipelockYaml(unittest.TestCase):
self.assertIn("100.78.141.42", content)
def test_secret_hygiene(self):
manifest = {
manifest = Manifest.from_json_obj({
"bottles": {
"dev": {
"env": {
@@ -61,7 +62,7 @@ class TestPipelockYaml(unittest.TestCase):
}
},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}
})
yaml_path = self.out_dir / "secret.yaml"
pipelock_write_yaml(manifest, "dev", yaml_path)
content = yaml_path.read_text()