Convert Manifest to frozen dataclasses #4
+20
-34
@@ -5,15 +5,7 @@ from __future__ import annotations
|
||||
import argparse
|
||||
|
||||
from ..log import info
|
||||
from ..manifest import (
|
||||
manifest_agent_bottle,
|
||||
manifest_env_names,
|
||||
manifest_prompt,
|
||||
manifest_require_agent,
|
||||
manifest_resolve,
|
||||
manifest_skills,
|
||||
manifest_ssh,
|
||||
)
|
||||
from ..manifest import Manifest
|
||||
from ._common import PROG, USER_CWD
|
||||
|
||||
|
||||
@@ -22,39 +14,33 @@ def cmd_info(argv: list[str]) -> int:
|
||||
parser.add_argument("name", help="agent name defined in claude-bottle.json")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
manifest = manifest_resolve(USER_CWD)
|
||||
manifest_require_agent(manifest, args.name)
|
||||
manifest = Manifest.resolve(USER_CWD)
|
||||
manifest.require_agent(args.name)
|
||||
|
||||
env_names = manifest_env_names(manifest, args.name)
|
||||
skill_names = manifest_skills(manifest, args.name)
|
||||
prompt_content = manifest_prompt(manifest, args.name)
|
||||
prompt_first_line = prompt_content.splitlines()[0] if prompt_content else ""
|
||||
|
||||
bottle_name = manifest_agent_bottle(manifest, args.name)
|
||||
ssh_entries = manifest_ssh(manifest, args.name)
|
||||
agent = manifest.agents[args.name]
|
||||
bottle = manifest.bottle_for(args.name)
|
||||
env_names = list(bottle.env.keys())
|
||||
prompt_first_line = agent.prompt.splitlines()[0] if agent.prompt else ""
|
||||
|
||||
print()
|
||||
info(f"agent : {args.name}")
|
||||
info(f"env (names only): {', '.join(env_names) if env_names else '(none)'}")
|
||||
info(f"skills : {' '.join(skill_names) if skill_names else '(none)'}")
|
||||
info(f"skills : {' '.join(agent.skills) if agent.skills else '(none)'}")
|
||||
info(
|
||||
f"prompt : {len(prompt_content)} chars; "
|
||||
f"prompt : {len(agent.prompt)} chars; "
|
||||
f"first line: {prompt_first_line or '(empty)'}"
|
||||
)
|
||||
if bottle_name:
|
||||
info(f"bottle : {bottle_name}")
|
||||
if ssh_entries:
|
||||
for e in ssh_entries:
|
||||
info(
|
||||
f" ssh host : {e.get('Host')} "
|
||||
f"(Hostname={e.get('Hostname')}, User={e.get('User')}, "
|
||||
f"Port={e.get('Port')}, IdentityFile={e.get('IdentityFile')})"
|
||||
)
|
||||
if e.get("KnownHostKey"):
|
||||
info(f" KnownHostKey: {e['KnownHostKey']}")
|
||||
else:
|
||||
info(" ssh hosts : (none)")
|
||||
info(f"bottle : {agent.bottle}")
|
||||
if bottle.ssh:
|
||||
for e in bottle.ssh:
|
||||
info(
|
||||
f" ssh host : {e.Host} "
|
||||
f"(Hostname={e.Hostname}, User={e.User}, "
|
||||
f"Port={e.Port}, IdentityFile={e.IdentityFile})"
|
||||
)
|
||||
if e.KnownHostKey:
|
||||
info(f" KnownHostKey: {e.KnownHostKey}")
|
||||
else:
|
||||
info("bottle : (none)")
|
||||
info(" ssh hosts : (none)")
|
||||
print()
|
||||
return 0
|
||||
|
||||
@@ -7,7 +7,7 @@ import subprocess
|
||||
|
||||
from .. import docker as docker_mod
|
||||
from ..log import info
|
||||
from ..manifest import manifest_resolve
|
||||
from ..manifest import Manifest
|
||||
from ._common import PROG, USER_CWD
|
||||
|
||||
|
||||
@@ -17,8 +17,8 @@ def cmd_list(argv: list[str]) -> int:
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
if args.scope == "available":
|
||||
manifest = manifest_resolve(USER_CWD)
|
||||
for name in (manifest.get("agents") or {}).keys():
|
||||
manifest = Manifest.resolve(USER_CWD)
|
||||
for name in manifest.agents.keys():
|
||||
print(name)
|
||||
return 0
|
||||
|
||||
|
||||
+16
-32
@@ -19,17 +19,7 @@ from .. import skills as skills_mod
|
||||
from .. import ssh as ssh_mod
|
||||
from ..env_resolve import env_resolve
|
||||
from ..log import die, info
|
||||
from ..manifest import (
|
||||
manifest_agent_bottle,
|
||||
manifest_bottle_runtime,
|
||||
manifest_env_names,
|
||||
manifest_prompt,
|
||||
manifest_require_agent,
|
||||
manifest_require_bottle,
|
||||
manifest_resolve,
|
||||
manifest_skills,
|
||||
manifest_ssh,
|
||||
)
|
||||
from ..manifest import Manifest
|
||||
from ._common import PROG, REPO_DIR, USER_CWD, read_tty_line
|
||||
|
||||
|
||||
@@ -57,8 +47,11 @@ def cmd_start(argv: list[str]) -> int:
|
||||
runtime_image = derived_image
|
||||
|
||||
docker_mod.require_docker()
|
||||
manifest = manifest_resolve(USER_CWD)
|
||||
manifest_require_agent(manifest, name)
|
||||
manifest = Manifest.resolve(USER_CWD)
|
||||
manifest.require_agent(name)
|
||||
agent = manifest.agents[name]
|
||||
bottle_name = agent.bottle
|
||||
bottle = manifest.bottle_for(name)
|
||||
|
||||
container = pinned_container or default_container
|
||||
suffix = 2
|
||||
@@ -81,7 +74,7 @@ def cmd_start(argv: list[str]) -> int:
|
||||
)
|
||||
|
||||
# --- Plan resolution (host-only, no container yet) ---
|
||||
env_names = manifest_env_names(manifest, name)
|
||||
env_names = list(bottle.env.keys())
|
||||
|
||||
# CLAUDE_BOTTLE_OAUTH_TOKEN → CLAUDE_CODE_OAUTH_TOKEN forwarding.
|
||||
# Host-side token is always forwarded so every container can authenticate.
|
||||
@@ -90,23 +83,14 @@ def cmd_start(argv: list[str]) -> int:
|
||||
if forward_oauth_token:
|
||||
display_env_names.append("CLAUDE_CODE_OAUTH_TOKEN")
|
||||
|
||||
skill_names = manifest_skills(manifest, name)
|
||||
if skill_names:
|
||||
skills_mod.skills_validate_all(skill_names)
|
||||
if agent.skills:
|
||||
skills_mod.skills_validate_all(list(agent.skills))
|
||||
|
||||
bottle_name = manifest_agent_bottle(manifest, name)
|
||||
if not bottle_name:
|
||||
die(
|
||||
f"agent '{name}' has no 'bottle' field. "
|
||||
f"Add a bottle association to this agent in claude-bottle.json."
|
||||
)
|
||||
manifest_require_bottle(manifest, bottle_name)
|
||||
|
||||
runtime = manifest_bottle_runtime(manifest, bottle_name)
|
||||
runtime = bottle.runtime
|
||||
if runtime == "runsc":
|
||||
docker_mod.require_runsc()
|
||||
|
||||
ssh_entries = manifest_ssh(manifest, name)
|
||||
ssh_entries = bottle.ssh
|
||||
if ssh_entries:
|
||||
ssh_mod.ssh_validate_entries(ssh_entries)
|
||||
|
||||
@@ -153,7 +137,7 @@ def cmd_start(argv: list[str]) -> int:
|
||||
|
||||
env_resolve(manifest, name, env_file, args_file)
|
||||
|
||||
prompt_content = manifest_prompt(manifest, name)
|
||||
prompt_content = agent.prompt
|
||||
prompt_file.write_text(prompt_content)
|
||||
prompt_first_line = prompt_content.splitlines()[0] if prompt_content else ""
|
||||
|
||||
@@ -169,11 +153,11 @@ def cmd_start(argv: list[str]) -> int:
|
||||
"env (names only): "
|
||||
+ (", ".join(display_env_names) if display_env_names else "(none)")
|
||||
)
|
||||
info("skills : " + (" ".join(skill_names) if skill_names else "(none)"))
|
||||
info("skills : " + (" ".join(agent.skills) if agent.skills else "(none)"))
|
||||
info(f"bottle : {bottle_name}")
|
||||
info(f" runtime : {runtime}{' (gVisor)' if runtime == 'runsc' else ''}")
|
||||
if ssh_entries:
|
||||
ssh_names = ", ".join(e.get("Host", "") for e in ssh_entries)
|
||||
ssh_names = ", ".join(e.Host for e in ssh_entries)
|
||||
info(f" ssh hosts : {ssh_names}")
|
||||
else:
|
||||
info(" ssh hosts : (none)")
|
||||
@@ -293,8 +277,8 @@ def cmd_start(argv: list[str]) -> int:
|
||||
check=True,
|
||||
)
|
||||
|
||||
if skill_names:
|
||||
skills_mod.skills_copy_into(container, skill_names)
|
||||
if agent.skills:
|
||||
skills_mod.skills_copy_into(container, list(agent.skills))
|
||||
|
||||
if ssh_entries:
|
||||
proxy_host_port = pipelock.pipelock_proxy_host_port(slug)
|
||||
|
||||
@@ -35,7 +35,7 @@ import sys
|
||||
from pathlib import Path
|
||||
|
||||
from .log import die
|
||||
from .manifest import Manifest, manifest_env_entry, manifest_env_names
|
||||
from .manifest import Manifest
|
||||
|
||||
_INTERPOLATED_RE = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$")
|
||||
|
||||
@@ -108,10 +108,10 @@ def env_resolve(
|
||||
- interpolated: copy host value; export under target name; append `-e NAME`
|
||||
- literal: append `NAME=VALUE` to env_file
|
||||
"""
|
||||
for name in manifest_env_names(manifest, agent):
|
||||
bottle = manifest.bottle_for(agent)
|
||||
for name, raw in bottle.env.items():
|
||||
if not name:
|
||||
continue
|
||||
raw = manifest_env_entry(manifest, agent, name)
|
||||
kind = env_entry_kind(raw)
|
||||
if kind == "secret":
|
||||
prompt_body = env_entry_secret_prompt(raw)
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from typing import NoReturn
|
||||
|
||||
|
||||
def info(msg: str) -> None:
|
||||
@@ -18,6 +19,6 @@ class Die(SystemExit):
|
||||
fatal exit from an unrelated SystemExit."""
|
||||
|
||||
|
||||
def die(msg: str) -> "Die":
|
||||
def die(msg: str) -> NoReturn:
|
||||
print(f"claude-bottle: error: {msg}", file=sys.stderr)
|
||||
raise Die(1)
|
||||
|
||||
+299
-169
@@ -1,5 +1,5 @@
|
||||
"""Manifest helpers. Read claude-bottle.json and pull the definition for a
|
||||
named agent.
|
||||
"""Manifest dataclasses. Read claude-bottle.json (cwd + $HOME, deep-merged)
|
||||
into a frozen, validated Manifest tree.
|
||||
|
||||
Schema (see CLAUDE.md "Intended design"):
|
||||
{
|
||||
@@ -7,7 +7,8 @@ Schema (see CLAUDE.md "Intended design"):
|
||||
"<bottle-name>": {
|
||||
"env": { "<NAME>": <env-entry>, ... },
|
||||
"ssh": [ <ssh-entry>, ... ],
|
||||
"egress": { "allowlist": [ "<hostname>", ... ] }
|
||||
"egress": { "allowlist": [ "<hostname>", ... ] },
|
||||
"runtime": "runc" | "runsc"
|
||||
}
|
||||
},
|
||||
"agents": {
|
||||
@@ -21,199 +22,328 @@ Schema (see CLAUDE.md "Intended design"):
|
||||
|
||||
Bottles group shared infrastructure (SSH keys, known hosts, egress allowlist)
|
||||
that multiple agents can reference. Every agent must reference a bottle.
|
||||
|
||||
Validation runs once at construction (Manifest.from_json_obj) so getters
|
||||
can trust the shape.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import Literal, Mapping, cast
|
||||
|
||||
from .log import die
|
||||
|
||||
Manifest = dict[str, Any]
|
||||
|
||||
Runtime = Literal["runc", "runsc"]
|
||||
_SUPPORTED_RUNTIMES: tuple[Runtime, ...] = ("runc", "runsc")
|
||||
|
||||
|
||||
def manifest_resolve(cwd: str) -> Manifest:
|
||||
"""Look for claude-bottle.json in <cwd> and in $HOME, deep-merge
|
||||
them (cwd entries override home entries on key conflict for both
|
||||
bottles and agents). Dies if neither file is found or either is
|
||||
invalid JSON."""
|
||||
cwd_file = Path(cwd) / "claude-bottle.json"
|
||||
home_file = Path(os.environ["HOME"]) / "claude-bottle.json"
|
||||
|
||||
cwd_doc = _load_json_or_die(cwd_file) if cwd_file.is_file() else None
|
||||
home_doc = _load_json_or_die(home_file) if home_file.is_file() else None
|
||||
|
||||
if cwd_doc is None and home_doc is None:
|
||||
die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}")
|
||||
|
||||
if cwd_doc is None:
|
||||
return home_doc # type: ignore[return-value]
|
||||
if home_doc is None:
|
||||
return cwd_doc
|
||||
|
||||
return {
|
||||
"bottles": {**(home_doc.get("bottles") or {}), **(cwd_doc.get("bottles") or {})},
|
||||
"agents": {**(home_doc.get("agents") or {}), **(cwd_doc.get("agents") or {})},
|
||||
}
|
||||
def _empty_str_dict() -> dict[str, str]:
|
||||
return {}
|
||||
|
||||
|
||||
def _load_json_or_die(path: Path) -> Manifest:
|
||||
try:
|
||||
with path.open() as f:
|
||||
doc = json.load(f)
|
||||
except json.JSONDecodeError:
|
||||
die(f"claude-bottle.json at {path} is not valid JSON")
|
||||
if not isinstance(doc, dict):
|
||||
die(f"claude-bottle.json at {path} must be a JSON object")
|
||||
return doc
|
||||
@dataclass(frozen=True)
|
||||
class SshEntry:
|
||||
Host: str
|
||||
IdentityFile: str
|
||||
Hostname: str = ""
|
||||
User: str = ""
|
||||
Port: str = ""
|
||||
KnownHostKey: str = ""
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "SshEntry":
|
||||
d = _as_json_object(raw, f"bottle '{bottle_name}' ssh[{idx}]")
|
||||
host = d.get("Host")
|
||||
if not isinstance(host, str) or not host:
|
||||
die(f"bottle '{bottle_name}' ssh[{idx}] missing required string field 'Host'")
|
||||
ident = d.get("IdentityFile")
|
||||
if not isinstance(ident, str) or not ident:
|
||||
die(
|
||||
f"bottle '{bottle_name}' ssh '{host}' missing required string field "
|
||||
f"'IdentityFile'"
|
||||
)
|
||||
hostname = _opt_str(d.get("Hostname"), f"bottle '{bottle_name}' ssh '{host}' Hostname")
|
||||
user = _opt_str(d.get("User"), f"bottle '{bottle_name}' ssh '{host}' User")
|
||||
port = _opt_port(d.get("Port"), f"bottle '{bottle_name}' ssh '{host}' Port")
|
||||
khk = _opt_str(
|
||||
d.get("KnownHostKey"),
|
||||
f"bottle '{bottle_name}' ssh '{host}' KnownHostKey",
|
||||
)
|
||||
return cls(
|
||||
Host=host,
|
||||
IdentityFile=ident,
|
||||
Hostname=hostname,
|
||||
User=user,
|
||||
Port=port,
|
||||
KnownHostKey=khk,
|
||||
)
|
||||
|
||||
|
||||
def manifest_has_agent(manifest: Manifest, name: str) -> bool:
|
||||
return name in (manifest.get("agents") or {})
|
||||
@dataclass(frozen=True)
|
||||
class BottleEgress:
|
||||
allowlist: tuple[str, ...] = ()
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, bottle_name: str, raw: object) -> "BottleEgress":
|
||||
d = _as_json_object(raw, f"bottle '{bottle_name}' egress")
|
||||
allow = d.get("allowlist")
|
||||
if allow is None:
|
||||
return cls()
|
||||
if not isinstance(allow, list):
|
||||
die(
|
||||
f"bottle '{bottle_name}' egress.allowlist must be an array "
|
||||
f"(was {type(allow).__name__})"
|
||||
)
|
||||
items: list[str] = []
|
||||
allow_list = cast(list[object], allow)
|
||||
for i, host in enumerate(allow_list):
|
||||
if not isinstance(host, str):
|
||||
die(
|
||||
f"bottle '{bottle_name}' egress.allowlist[{i}] must be a string "
|
||||
f"(was {type(host).__name__})"
|
||||
)
|
||||
items.append(host)
|
||||
return cls(allowlist=tuple(items))
|
||||
|
||||
|
||||
def manifest_require_agent(manifest: Manifest, name: str) -> None:
|
||||
"""Like has_agent but dies with the available agent names listed."""
|
||||
if manifest_has_agent(manifest, name):
|
||||
return
|
||||
available = ", ".join((manifest.get("agents") or {}).keys())
|
||||
if available:
|
||||
die(f"agent '{name}' not defined in claude-bottle.json. Available: {available}")
|
||||
else:
|
||||
@dataclass(frozen=True)
|
||||
class Bottle:
|
||||
env: Mapping[str, str] = field(default_factory=_empty_str_dict)
|
||||
ssh: tuple[SshEntry, ...] = ()
|
||||
egress: BottleEgress = field(default_factory=BottleEgress)
|
||||
runtime: Runtime = "runc"
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, name: str, raw: object) -> "Bottle":
|
||||
d = _as_json_object(raw, f"bottle '{name}'")
|
||||
|
||||
env: dict[str, str] = {}
|
||||
env_raw = d.get("env")
|
||||
if env_raw is not None:
|
||||
env_dict = _as_json_object(env_raw, f"bottle '{name}' env")
|
||||
for var, value in env_dict.items():
|
||||
if not isinstance(value, str):
|
||||
die(
|
||||
f"env entry {var} in bottle '{name}' must be a JSON string "
|
||||
f"(was {type(value).__name__}). Use \"?<message>\" for prompt-at-runtime."
|
||||
)
|
||||
env[var] = value
|
||||
|
||||
ssh: tuple[SshEntry, ...] = ()
|
||||
ssh_raw = d.get("ssh")
|
||||
if ssh_raw is not None:
|
||||
if not isinstance(ssh_raw, list):
|
||||
die(f"bottle '{name}' ssh must be an array (was {type(ssh_raw).__name__})")
|
||||
ssh_list = cast(list[object], ssh_raw)
|
||||
ssh = tuple(
|
||||
SshEntry.from_dict(name, i, entry)
|
||||
for i, entry in enumerate(ssh_list)
|
||||
)
|
||||
|
||||
egress_raw = d.get("egress")
|
||||
egress = (
|
||||
BottleEgress.from_dict(name, egress_raw)
|
||||
if egress_raw is not None
|
||||
else BottleEgress()
|
||||
)
|
||||
|
||||
runtime_raw = d.get("runtime")
|
||||
runtime: Runtime
|
||||
if runtime_raw is None:
|
||||
runtime = "runc"
|
||||
else:
|
||||
if not isinstance(runtime_raw, str):
|
||||
die(f"bottle '{name}' runtime must be a string (was {type(runtime_raw).__name__})")
|
||||
if runtime_raw not in _SUPPORTED_RUNTIMES:
|
||||
die(
|
||||
f"bottle '{name}' runtime '{runtime_raw}' is not supported. "
|
||||
f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}."
|
||||
)
|
||||
runtime = runtime_raw
|
||||
|
||||
return cls(env=env, ssh=ssh, egress=egress, runtime=runtime)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Agent:
|
||||
bottle: str
|
||||
skills: tuple[str, ...] = ()
|
||||
prompt: str = ""
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, name: str, raw: object, bottle_names: set[str]) -> "Agent":
|
||||
d = _as_json_object(raw, f"agent '{name}'")
|
||||
|
||||
bottle = d.get("bottle")
|
||||
if not isinstance(bottle, str) or not bottle:
|
||||
die(f"agent '{name}' must declare a 'bottle' field naming a defined bottle")
|
||||
if bottle not in bottle_names:
|
||||
available = ", ".join(sorted(bottle_names)) or "(none defined)"
|
||||
die(
|
||||
f"agent '{name}' references bottle '{bottle}', which is not defined. "
|
||||
f"Available: {available}"
|
||||
)
|
||||
|
||||
skills: tuple[str, ...] = ()
|
||||
skills_raw = d.get("skills")
|
||||
if skills_raw is not None:
|
||||
if not isinstance(skills_raw, list):
|
||||
die(f"agent '{name}' skills must be an array (was {type(skills_raw).__name__})")
|
||||
collected: list[str] = []
|
||||
skills_list = cast(list[object], skills_raw)
|
||||
for i, skill in enumerate(skills_list):
|
||||
if not isinstance(skill, str):
|
||||
die(
|
||||
f"agent '{name}' skills[{i}] must be a string "
|
||||
f"(was {type(skill).__name__})"
|
||||
)
|
||||
collected.append(skill)
|
||||
skills = tuple(collected)
|
||||
|
||||
prompt_raw = d.get("prompt")
|
||||
if prompt_raw is None:
|
||||
prompt = ""
|
||||
elif isinstance(prompt_raw, str):
|
||||
prompt = prompt_raw
|
||||
else:
|
||||
die(f"agent '{name}' prompt must be a string (was {type(prompt_raw).__name__})")
|
||||
|
||||
return cls(bottle=bottle, skills=skills, prompt=prompt)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Manifest:
|
||||
bottles: Mapping[str, Bottle]
|
||||
agents: Mapping[str, Agent]
|
||||
|
||||
@classmethod
|
||||
def resolve(cls, cwd: str) -> "Manifest":
|
||||
"""Look for claude-bottle.json in <cwd> and in $HOME, deep-merge
|
||||
them (cwd entries override home entries on key conflict for both
|
||||
bottles and agents), then validate. Dies if neither file is
|
||||
found, either is invalid JSON, or the merged shape violates the
|
||||
schema."""
|
||||
cwd_file = Path(cwd) / "claude-bottle.json"
|
||||
home_file = Path(os.environ["HOME"]) / "claude-bottle.json"
|
||||
|
||||
cwd_doc = _load_json_or_die(cwd_file) if cwd_file.is_file() else None
|
||||
home_doc = _load_json_or_die(home_file) if home_file.is_file() else None
|
||||
|
||||
if cwd_doc is None and home_doc is None:
|
||||
die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}")
|
||||
|
||||
h: dict[str, object] = home_doc if home_doc is not None else {}
|
||||
c: dict[str, object] = cwd_doc if cwd_doc is not None else {}
|
||||
h_bottles = _section_dict(h.get("bottles"), "bottles")
|
||||
c_bottles = _section_dict(c.get("bottles"), "bottles")
|
||||
h_agents = _section_dict(h.get("agents"), "agents")
|
||||
c_agents = _section_dict(c.get("agents"), "agents")
|
||||
merged: dict[str, object] = {
|
||||
"bottles": {**h_bottles, **c_bottles},
|
||||
"agents": {**h_agents, **c_agents},
|
||||
}
|
||||
return cls.from_json_obj(merged)
|
||||
|
||||
@classmethod
|
||||
def from_json_obj(cls, obj: object) -> "Manifest":
|
||||
"""Validate and build a Manifest from a raw JSON-like dict."""
|
||||
d = _as_json_object(obj, "manifest")
|
||||
raw_bottles = _section_dict(d.get("bottles"), "manifest 'bottles'")
|
||||
raw_agents = _section_dict(d.get("agents"), "manifest 'agents'")
|
||||
|
||||
bottles: dict[str, Bottle] = {
|
||||
n: Bottle.from_dict(n, b) for n, b in raw_bottles.items()
|
||||
}
|
||||
bottle_names = set(bottles.keys())
|
||||
agents: dict[str, Agent] = {
|
||||
n: Agent.from_dict(n, a, bottle_names) for n, a in raw_agents.items()
|
||||
}
|
||||
return cls(bottles=bottles, agents=agents)
|
||||
|
||||
def has_agent(self, name: str) -> bool:
|
||||
return name in self.agents
|
||||
|
||||
def require_agent(self, name: str) -> None:
|
||||
if self.has_agent(name):
|
||||
return
|
||||
available = ", ".join(self.agents.keys())
|
||||
if available:
|
||||
die(f"agent '{name}' not defined in claude-bottle.json. Available: {available}")
|
||||
die(f"agent '{name}' not defined in claude-bottle.json (manifest is empty).")
|
||||
|
||||
def has_bottle(self, name: str) -> bool:
|
||||
return name in self.bottles
|
||||
|
||||
def manifest_env_names(manifest: Manifest, name: str) -> list[str]:
|
||||
"""Names (not values) of bottles[agent.bottle].env, in declaration
|
||||
order. Empty list if the agent has no bottle or the bottle has no env."""
|
||||
agent = (manifest.get("agents") or {}).get(name) or {}
|
||||
bottle_name = agent.get("bottle") or ""
|
||||
if not bottle_name:
|
||||
return []
|
||||
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
|
||||
return list((bottle.get("env") or {}).keys())
|
||||
def require_bottle(self, name: str) -> None:
|
||||
if self.has_bottle(name):
|
||||
return
|
||||
available = ", ".join(self.bottles.keys())
|
||||
if available:
|
||||
die(
|
||||
f"bottle '{name}' not defined in claude-bottle.json. "
|
||||
f"Available bottles: {available}"
|
||||
)
|
||||
die(f"bottle '{name}' not defined in claude-bottle.json (no bottles defined).")
|
||||
|
||||
def bottle_for(self, agent_name: str) -> Bottle:
|
||||
"""Resolve the Bottle the named agent references. The validator
|
||||
guarantees both lookups succeed for a manifest built via
|
||||
from_json_obj."""
|
||||
return self.bottles[self.agents[agent_name].bottle]
|
||||
|
||||
|
||||
def manifest_env_entry(manifest: Manifest, agent: str, var: str) -> str:
|
||||
"""Raw string value of one env entry. Used by env_resolve, which
|
||||
classifies the result by sentinel. Dies if the agent has no bottle,
|
||||
or the entry is not a string."""
|
||||
agent_def = (manifest.get("agents") or {}).get(agent) or {}
|
||||
bottle_name = agent_def.get("bottle") or ""
|
||||
if not bottle_name:
|
||||
die(f"env entry {var} for agent {agent}: agent has no 'bottle' field")
|
||||
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
|
||||
env = bottle.get("env") or {}
|
||||
value = env.get(var)
|
||||
def _as_json_object(value: object, label: str) -> dict[str, object]:
|
||||
"""Assert that `value` is a JSON object (str-keyed dict) and return
|
||||
a view typed as `dict[str, object]` so downstream `.get(...)` calls
|
||||
have a typed surface."""
|
||||
if not isinstance(value, dict):
|
||||
die(f"{label} must be a JSON object (was {type(value).__name__})")
|
||||
items = cast(dict[object, object], value)
|
||||
out: dict[str, object] = {}
|
||||
for k, v in items.items():
|
||||
if not isinstance(k, str):
|
||||
die(f"{label} keys must be strings (found {type(k).__name__})")
|
||||
out[k] = v
|
||||
return out
|
||||
|
||||
|
||||
def _section_dict(value: object, label: str) -> dict[str, object]:
|
||||
"""Like _as_json_object but treats absent/null as an empty section."""
|
||||
if value is None:
|
||||
return {}
|
||||
return _as_json_object(value, label)
|
||||
|
||||
|
||||
def _load_json_or_die(path: Path) -> dict[str, object]:
|
||||
try:
|
||||
with path.open() as f:
|
||||
doc: object = json.load(f)
|
||||
except json.JSONDecodeError:
|
||||
die(f"claude-bottle.json at {path} is not valid JSON")
|
||||
return _as_json_object(doc, f"claude-bottle.json at {path}")
|
||||
|
||||
|
||||
def _opt_str(value: object, label: str) -> str:
|
||||
if value is None:
|
||||
return ""
|
||||
if not isinstance(value, str):
|
||||
actual = _json_type(value)
|
||||
die(
|
||||
f"env entry {var} for agent {agent} must be a JSON string "
|
||||
f"(was {actual}). Use \"?<message>\" for prompt-at-runtime."
|
||||
)
|
||||
die(f"{label} must be a string (was {type(value).__name__})")
|
||||
return value
|
||||
|
||||
|
||||
def manifest_skills(manifest: Manifest, name: str) -> list[str]:
|
||||
agent = (manifest.get("agents") or {}).get(name) or {}
|
||||
return list(agent.get("skills") or [])
|
||||
|
||||
|
||||
def manifest_prompt(manifest: Manifest, name: str) -> str:
|
||||
agent = (manifest.get("agents") or {}).get(name) or {}
|
||||
return agent.get("prompt") or ""
|
||||
|
||||
|
||||
def manifest_agent_bottle(manifest: Manifest, name: str) -> str:
|
||||
agent = (manifest.get("agents") or {}).get(name) or {}
|
||||
return agent.get("bottle") or ""
|
||||
|
||||
|
||||
def manifest_has_bottle(manifest: Manifest, bottle_name: str) -> bool:
|
||||
return bottle_name in (manifest.get("bottles") or {})
|
||||
|
||||
|
||||
def manifest_require_bottle(manifest: Manifest, bottle_name: str) -> None:
|
||||
if manifest_has_bottle(manifest, bottle_name):
|
||||
return
|
||||
available = ", ".join((manifest.get("bottles") or {}).keys())
|
||||
if available:
|
||||
die(
|
||||
f"bottle '{bottle_name}' not defined in claude-bottle.json. "
|
||||
f"Available bottles: {available}"
|
||||
)
|
||||
else:
|
||||
die(f"bottle '{bottle_name}' not defined in claude-bottle.json (no bottles defined).")
|
||||
|
||||
|
||||
def manifest_bottle_ssh(manifest: Manifest, bottle_name: str) -> list[dict[str, Any]]:
|
||||
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
|
||||
return list(bottle.get("ssh") or [])
|
||||
|
||||
|
||||
_SUPPORTED_RUNTIMES: tuple[str, ...] = ("runc", "runsc")
|
||||
|
||||
|
||||
def manifest_bottle_runtime(manifest: Manifest, bottle_name: str) -> str:
|
||||
"""Container runtime for the bottle's agent container. Returns
|
||||
"runc" (Docker default) or "runsc" (gVisor opt-in). Dies if the
|
||||
field is present but not one of the supported values."""
|
||||
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
|
||||
raw = bottle.get("runtime")
|
||||
if raw is None:
|
||||
return "runc"
|
||||
if not isinstance(raw, str):
|
||||
die(
|
||||
f"bottle '{bottle_name}' runtime must be a string "
|
||||
f"(was {_json_type(raw)})."
|
||||
)
|
||||
if raw not in _SUPPORTED_RUNTIMES:
|
||||
die(
|
||||
f"bottle '{bottle_name}' runtime '{raw}' is not supported. "
|
||||
f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}."
|
||||
)
|
||||
return raw
|
||||
|
||||
|
||||
def manifest_bottle_egress_allowlist(manifest: Manifest, bottle_name: str) -> list[str]:
|
||||
"""Hostnames in bottles[bottle_name].egress.allowlist. Dies if the
|
||||
field is present but not an array. Per-element string typing is
|
||||
re-checked at use-time in pipelock."""
|
||||
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
|
||||
allowlist = (bottle.get("egress") or {}).get("allowlist")
|
||||
if allowlist is None:
|
||||
return []
|
||||
if not isinstance(allowlist, list):
|
||||
die(
|
||||
f"bottle '{bottle_name}' egress.allowlist must be an array "
|
||||
f"(was {_json_type(allowlist)})."
|
||||
)
|
||||
return list(allowlist)
|
||||
|
||||
|
||||
def manifest_ssh(manifest: Manifest, agent_name: str) -> list[dict[str, Any]]:
|
||||
"""SSH entries resolved via the agent's "bottle" field; empty if no bottle set."""
|
||||
bottle_name = manifest_agent_bottle(manifest, agent_name)
|
||||
if not bottle_name:
|
||||
return []
|
||||
return manifest_bottle_ssh(manifest, bottle_name)
|
||||
|
||||
|
||||
def _json_type(value: Any) -> str:
|
||||
"""Mirror jq's type names for parity with the bash error messages."""
|
||||
def _opt_port(value: object, label: str) -> str:
|
||||
"""Port accepts string or int (JSON-friendly) and is normalized to str."""
|
||||
if value is None:
|
||||
return "null"
|
||||
return ""
|
||||
if isinstance(value, bool):
|
||||
return "boolean"
|
||||
if isinstance(value, (int, float)):
|
||||
return "number"
|
||||
die(f"{label} must be a string or number (was boolean)")
|
||||
if isinstance(value, int):
|
||||
return str(value)
|
||||
if isinstance(value, str):
|
||||
return "string"
|
||||
if isinstance(value, list):
|
||||
return "array"
|
||||
if isinstance(value, dict):
|
||||
return "object"
|
||||
return type(value).__name__
|
||||
return value
|
||||
die(f"{label} must be a string or number (was {type(value).__name__})")
|
||||
|
||||
@@ -18,7 +18,7 @@ import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from .log import die, info, warn
|
||||
from .manifest import Manifest, manifest_bottle_egress_allowlist, manifest_bottle_ssh
|
||||
from .manifest import Manifest
|
||||
|
||||
# Pipelock image, pinned by digest. The digest is the multi-arch image
|
||||
# index for ghcr.io/luckypipewrench/pipelock:2.3.0.
|
||||
@@ -58,23 +58,12 @@ def pipelock_proxy_host_port(slug: str) -> str:
|
||||
|
||||
|
||||
def pipelock_bottle_allowlist(manifest: Manifest, bottle_name: str) -> list[str]:
|
||||
"""Hostnames in bottles[<bottle_name>].egress.allowlist. Validates
|
||||
that each entry is a string."""
|
||||
raw = manifest_bottle_egress_allowlist(manifest, bottle_name)
|
||||
for entry in raw:
|
||||
if not isinstance(entry, str):
|
||||
t = _json_type(entry)
|
||||
die(f"bottle '{bottle_name}' egress.allowlist must contain only strings; found a '{t}' entry.")
|
||||
return list(raw)
|
||||
"""Hostnames in bottles[<bottle_name>].egress.allowlist."""
|
||||
return list(manifest.bottles[bottle_name].egress.allowlist)
|
||||
|
||||
|
||||
def pipelock_bottle_ssh_hostnames(manifest: Manifest, bottle_name: str) -> list[str]:
|
||||
out: list[str] = []
|
||||
for entry in manifest_bottle_ssh(manifest, bottle_name):
|
||||
h = entry.get("Hostname") or ""
|
||||
if h:
|
||||
out.append(h)
|
||||
return out
|
||||
return [e.Hostname for e in manifest.bottles[bottle_name].ssh if e.Hostname]
|
||||
|
||||
|
||||
_IPV4_RE = re.compile(r"^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$")
|
||||
@@ -256,19 +245,3 @@ def pipelock_stop(slug: str) -> None:
|
||||
stderr=subprocess.DEVNULL,
|
||||
).returncode != 0:
|
||||
warn(f"failed to remove pipelock sidecar {name}; clean up with 'docker rm -f {name}'")
|
||||
|
||||
|
||||
def _json_type(value: object) -> str:
|
||||
if value is None:
|
||||
return "null"
|
||||
if isinstance(value, bool):
|
||||
return "boolean"
|
||||
if isinstance(value, (int, float)):
|
||||
return "number"
|
||||
if isinstance(value, str):
|
||||
return "string"
|
||||
if isinstance(value, list):
|
||||
return "array"
|
||||
if isinstance(value, dict):
|
||||
return "object"
|
||||
return type(value).__name__
|
||||
|
||||
+14
-19
@@ -36,31 +36,26 @@ from __future__ import annotations
|
||||
import os
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import Sequence
|
||||
|
||||
from .log import die, info
|
||||
from .manifest import SshEntry
|
||||
|
||||
|
||||
def ssh_validate_entries(entries: list[dict[str, Any]]) -> None:
|
||||
"""Each entry must have Host + IdentityFile, and the IdentityFile
|
||||
must exist on the host (after expanding leading ~)."""
|
||||
def ssh_validate_entries(entries: Sequence[SshEntry]) -> None:
|
||||
"""The IdentityFile must exist on the host (after expanding leading ~).
|
||||
Host and IdentityFile shape are already enforced by Manifest validation."""
|
||||
for entry in entries:
|
||||
name = entry.get("Host", "")
|
||||
key = entry.get("IdentityFile", "")
|
||||
if not name:
|
||||
die(f"ssh entry missing required field 'Host': {entry}")
|
||||
if not key:
|
||||
die(f"ssh entry '{name}' missing required field 'IdentityFile'")
|
||||
key = _expand_tilde(key)
|
||||
key = _expand_tilde(entry.IdentityFile)
|
||||
if not os.path.isfile(key):
|
||||
die(f"ssh key file not found for host '{name}': {key}")
|
||||
die(f"ssh key file not found for host '{entry.Host}': {key}")
|
||||
|
||||
|
||||
def ssh_setup(
|
||||
container: str,
|
||||
stage_dir: Path,
|
||||
proxy_host_port: str,
|
||||
entries: list[dict[str, Any]],
|
||||
entries: Sequence[SshEntry],
|
||||
) -> None:
|
||||
"""Set up SSH in the container so node can authenticate using each
|
||||
entry's key without the key file being readable by node."""
|
||||
@@ -91,12 +86,12 @@ def ssh_setup(
|
||||
|
||||
container_key_paths: list[str] = []
|
||||
for entry in entries:
|
||||
name = entry["Host"]
|
||||
key = _expand_tilde(entry["IdentityFile"])
|
||||
hostname = entry["Hostname"]
|
||||
user = entry["User"]
|
||||
port = str(entry["Port"])
|
||||
known_host_key = entry.get("KnownHostKey", "")
|
||||
name = entry.Host
|
||||
key = _expand_tilde(entry.IdentityFile)
|
||||
hostname = entry.Hostname
|
||||
user = entry.User
|
||||
port = entry.Port
|
||||
known_host_key = entry.KnownHostKey
|
||||
|
||||
key_basename = os.path.basename(key)
|
||||
container_key_path = f"{keys_dir}/{key_basename}"
|
||||
|
||||
+27
-8
@@ -1,4 +1,7 @@
|
||||
"""Manifest fixtures for the test suite."""
|
||||
"""Manifest fixtures for the test suite. Each fixture returns a built
|
||||
Manifest dataclass; callers that need the raw JSON shape (e.g. to write
|
||||
to a file on disk) can build it themselves or call .from_json_obj on
|
||||
a dict literal in the test."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -7,9 +10,11 @@ import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from claude_bottle.manifest import Manifest
|
||||
|
||||
def fixture_minimal() -> dict[str, Any]:
|
||||
"""One bottle, one agent, no env / ssh / skills."""
|
||||
|
||||
def fixture_minimal_dict() -> dict[str, Any]:
|
||||
"""One bottle, one agent, no env / ssh / skills. JSON shape."""
|
||||
return {
|
||||
"bottles": {"dev": {}},
|
||||
"agents": {
|
||||
@@ -18,8 +23,8 @@ def fixture_minimal() -> dict[str, Any]:
|
||||
}
|
||||
|
||||
|
||||
def fixture_with_egress() -> dict[str, Any]:
|
||||
"""Bottle declares an egress.allowlist."""
|
||||
def fixture_with_egress_dict() -> dict[str, Any]:
|
||||
"""Bottle declares an egress.allowlist. JSON shape."""
|
||||
return {
|
||||
"bottles": {
|
||||
"dev": {
|
||||
@@ -32,9 +37,9 @@ def fixture_with_egress() -> dict[str, Any]:
|
||||
}
|
||||
|
||||
|
||||
def fixture_with_ssh() -> dict[str, Any]:
|
||||
def fixture_with_ssh_dict() -> dict[str, Any]:
|
||||
"""Bottle has both an IPv4-literal SSH host (CGNAT) and a hostname host,
|
||||
exercising both ssrf.ip_allowlist and trusted_domains code paths."""
|
||||
exercising both ssrf.ip_allowlist and trusted_domains code paths. JSON shape."""
|
||||
return {
|
||||
"bottles": {
|
||||
"dev": {
|
||||
@@ -60,8 +65,22 @@ def fixture_with_ssh() -> dict[str, Any]:
|
||||
}
|
||||
|
||||
|
||||
def fixture_minimal() -> Manifest:
|
||||
return Manifest.from_json_obj(fixture_minimal_dict())
|
||||
|
||||
|
||||
def fixture_with_egress() -> Manifest:
|
||||
return Manifest.from_json_obj(fixture_with_egress_dict())
|
||||
|
||||
|
||||
def fixture_with_ssh() -> Manifest:
|
||||
return Manifest.from_json_obj(fixture_with_ssh_dict())
|
||||
|
||||
|
||||
def write_fixture(fn) -> Path:
|
||||
"""Write fixture dict to a temp file; return the path. Caller must rm."""
|
||||
"""Write fixture JSON to a temp file; return the path. Caller must rm.
|
||||
Accepts a function returning either a dict (JSON shape) or a Manifest;
|
||||
only the dict form is supported here since we need to serialize."""
|
||||
f = tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".json", delete=False, encoding="utf-8"
|
||||
)
|
||||
|
||||
@@ -1,15 +1,18 @@
|
||||
"""Unit: manifest_bottle_runtime — defaults to runc, accepts runsc,
|
||||
rejects unknown values and non-strings."""
|
||||
"""Unit: bottle runtime — Manifest.from_json_obj defaults runtime to runc,
|
||||
accepts runsc, and rejects unknown values, non-strings, and empty strings."""
|
||||
|
||||
import unittest
|
||||
|
||||
from claude_bottle.log import Die
|
||||
from claude_bottle.manifest import manifest_bottle_runtime
|
||||
from claude_bottle.manifest import Manifest
|
||||
|
||||
|
||||
def _bottle(runtime_value: object | None) -> dict:
|
||||
"""Build a minimal manifest with one bottle whose runtime field is
|
||||
set (or absent if `runtime_value is _ABSENT`)."""
|
||||
_ABSENT = object()
|
||||
|
||||
|
||||
def _bottle(runtime_value: object) -> dict:
|
||||
"""Build a minimal manifest JSON shape with one bottle whose runtime
|
||||
field is set (or absent if `runtime_value is _ABSENT`)."""
|
||||
bottle: dict = {}
|
||||
if runtime_value is not _ABSENT:
|
||||
bottle["runtime"] = runtime_value
|
||||
@@ -19,30 +22,30 @@ def _bottle(runtime_value: object | None) -> dict:
|
||||
}
|
||||
|
||||
|
||||
_ABSENT = object()
|
||||
|
||||
|
||||
class TestManifestBottleRuntime(unittest.TestCase):
|
||||
def test_default_runc_when_absent(self):
|
||||
self.assertEqual("runc", manifest_bottle_runtime(_bottle(_ABSENT), "dev"))
|
||||
m = Manifest.from_json_obj(_bottle(_ABSENT))
|
||||
self.assertEqual("runc", m.bottles["dev"].runtime)
|
||||
|
||||
def test_explicit_runc(self):
|
||||
self.assertEqual("runc", manifest_bottle_runtime(_bottle("runc"), "dev"))
|
||||
m = Manifest.from_json_obj(_bottle("runc"))
|
||||
self.assertEqual("runc", m.bottles["dev"].runtime)
|
||||
|
||||
def test_explicit_runsc(self):
|
||||
self.assertEqual("runsc", manifest_bottle_runtime(_bottle("runsc"), "dev"))
|
||||
m = Manifest.from_json_obj(_bottle("runsc"))
|
||||
self.assertEqual("runsc", m.bottles["dev"].runtime)
|
||||
|
||||
def test_rejects_unknown_runtime(self):
|
||||
with self.assertRaises(Die):
|
||||
manifest_bottle_runtime(_bottle("kata-runtime"), "dev")
|
||||
Manifest.from_json_obj(_bottle("kata-runtime"))
|
||||
|
||||
def test_rejects_non_string(self):
|
||||
with self.assertRaises(Die):
|
||||
manifest_bottle_runtime(_bottle(42), "dev")
|
||||
Manifest.from_json_obj(_bottle(42))
|
||||
|
||||
def test_rejects_empty_string(self):
|
||||
with self.assertRaises(Die):
|
||||
manifest_bottle_runtime(_bottle(""), "dev")
|
||||
Manifest.from_json_obj(_bottle(""))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -5,6 +5,7 @@ pipelock_bottle_ssh_trusted_domains, pipelock_effective_allowlist."""
|
||||
import unittest
|
||||
|
||||
from claude_bottle.log import Die
|
||||
from claude_bottle.manifest import Manifest
|
||||
from claude_bottle.pipelock import (
|
||||
pipelock_bottle_allowlist,
|
||||
pipelock_bottle_ssh_hostnames,
|
||||
@@ -32,7 +33,7 @@ class TestBottleAllowlist(unittest.TestCase):
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
}
|
||||
with self.assertRaises(Die):
|
||||
pipelock_bottle_allowlist(bad, "dev")
|
||||
Manifest.from_json_obj(bad)
|
||||
|
||||
|
||||
class TestSSHHostnames(unittest.TestCase):
|
||||
@@ -54,7 +55,7 @@ class TestSSHHostnames(unittest.TestCase):
|
||||
|
||||
class TestEffectiveAllowlist(unittest.TestCase):
|
||||
def test_union_and_dedup(self):
|
||||
manifest = {
|
||||
manifest = Manifest.from_json_obj({
|
||||
"bottles": {
|
||||
"dev": {
|
||||
"egress": {"allowlist": ["registry.npmjs.org"]},
|
||||
@@ -67,7 +68,7 @@ class TestEffectiveAllowlist(unittest.TestCase):
|
||||
}
|
||||
},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
}
|
||||
})
|
||||
eff = pipelock_effective_allowlist(manifest, "dev")
|
||||
self.assertIn("api.anthropic.com", eff)
|
||||
self.assertIn("registry.npmjs.org", eff)
|
||||
|
||||
@@ -7,6 +7,7 @@ import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
from claude_bottle.manifest import Manifest
|
||||
from claude_bottle.pipelock import pipelock_write_yaml
|
||||
from tests.fixtures import fixture_minimal, fixture_with_ssh
|
||||
|
||||
@@ -50,7 +51,7 @@ class TestPipelockYaml(unittest.TestCase):
|
||||
self.assertIn("100.78.141.42", content)
|
||||
|
||||
def test_secret_hygiene(self):
|
||||
manifest = {
|
||||
manifest = Manifest.from_json_obj({
|
||||
"bottles": {
|
||||
"dev": {
|
||||
"env": {
|
||||
@@ -61,7 +62,7 @@ class TestPipelockYaml(unittest.TestCase):
|
||||
}
|
||||
},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
}
|
||||
})
|
||||
yaml_path = self.out_dir / "secret.yaml"
|
||||
pipelock_write_yaml(manifest, "dev", yaml_path)
|
||||
content = yaml_path.read_text()
|
||||
|
||||
Reference in New Issue
Block a user