Merge pull request 'refactor(manifest): convert to frozen dataclasses' (#4) from convert-manifest-to-dataclass into main
test / run tests/run_tests.py (push) Successful in 16s
test / run tests/run_tests.py (push) Successful in 16s
This commit was merged in pull request #4.
This commit is contained in:
+20
-34
@@ -5,15 +5,7 @@ from __future__ import annotations
|
|||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
from ..log import info
|
from ..log import info
|
||||||
from ..manifest import (
|
from ..manifest import Manifest
|
||||||
manifest_agent_bottle,
|
|
||||||
manifest_env_names,
|
|
||||||
manifest_prompt,
|
|
||||||
manifest_require_agent,
|
|
||||||
manifest_resolve,
|
|
||||||
manifest_skills,
|
|
||||||
manifest_ssh,
|
|
||||||
)
|
|
||||||
from ._common import PROG, USER_CWD
|
from ._common import PROG, USER_CWD
|
||||||
|
|
||||||
|
|
||||||
@@ -22,39 +14,33 @@ def cmd_info(argv: list[str]) -> int:
|
|||||||
parser.add_argument("name", help="agent name defined in claude-bottle.json")
|
parser.add_argument("name", help="agent name defined in claude-bottle.json")
|
||||||
args = parser.parse_args(argv)
|
args = parser.parse_args(argv)
|
||||||
|
|
||||||
manifest = manifest_resolve(USER_CWD)
|
manifest = Manifest.resolve(USER_CWD)
|
||||||
manifest_require_agent(manifest, args.name)
|
manifest.require_agent(args.name)
|
||||||
|
|
||||||
env_names = manifest_env_names(manifest, args.name)
|
agent = manifest.agents[args.name]
|
||||||
skill_names = manifest_skills(manifest, args.name)
|
bottle = manifest.bottle_for(args.name)
|
||||||
prompt_content = manifest_prompt(manifest, args.name)
|
env_names = list(bottle.env.keys())
|
||||||
prompt_first_line = prompt_content.splitlines()[0] if prompt_content else ""
|
prompt_first_line = agent.prompt.splitlines()[0] if agent.prompt else ""
|
||||||
|
|
||||||
bottle_name = manifest_agent_bottle(manifest, args.name)
|
|
||||||
ssh_entries = manifest_ssh(manifest, args.name)
|
|
||||||
|
|
||||||
print()
|
print()
|
||||||
info(f"agent : {args.name}")
|
info(f"agent : {args.name}")
|
||||||
info(f"env (names only): {', '.join(env_names) if env_names else '(none)'}")
|
info(f"env (names only): {', '.join(env_names) if env_names else '(none)'}")
|
||||||
info(f"skills : {' '.join(skill_names) if skill_names else '(none)'}")
|
info(f"skills : {' '.join(agent.skills) if agent.skills else '(none)'}")
|
||||||
info(
|
info(
|
||||||
f"prompt : {len(prompt_content)} chars; "
|
f"prompt : {len(agent.prompt)} chars; "
|
||||||
f"first line: {prompt_first_line or '(empty)'}"
|
f"first line: {prompt_first_line or '(empty)'}"
|
||||||
)
|
)
|
||||||
if bottle_name:
|
info(f"bottle : {agent.bottle}")
|
||||||
info(f"bottle : {bottle_name}")
|
if bottle.ssh:
|
||||||
if ssh_entries:
|
for e in bottle.ssh:
|
||||||
for e in ssh_entries:
|
info(
|
||||||
info(
|
f" ssh host : {e.Host} "
|
||||||
f" ssh host : {e.get('Host')} "
|
f"(Hostname={e.Hostname}, User={e.User}, "
|
||||||
f"(Hostname={e.get('Hostname')}, User={e.get('User')}, "
|
f"Port={e.Port}, IdentityFile={e.IdentityFile})"
|
||||||
f"Port={e.get('Port')}, IdentityFile={e.get('IdentityFile')})"
|
)
|
||||||
)
|
if e.KnownHostKey:
|
||||||
if e.get("KnownHostKey"):
|
info(f" KnownHostKey: {e.KnownHostKey}")
|
||||||
info(f" KnownHostKey: {e['KnownHostKey']}")
|
|
||||||
else:
|
|
||||||
info(" ssh hosts : (none)")
|
|
||||||
else:
|
else:
|
||||||
info("bottle : (none)")
|
info(" ssh hosts : (none)")
|
||||||
print()
|
print()
|
||||||
return 0
|
return 0
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import subprocess
|
|||||||
|
|
||||||
from .. import docker as docker_mod
|
from .. import docker as docker_mod
|
||||||
from ..log import info
|
from ..log import info
|
||||||
from ..manifest import manifest_resolve
|
from ..manifest import Manifest
|
||||||
from ._common import PROG, USER_CWD
|
from ._common import PROG, USER_CWD
|
||||||
|
|
||||||
|
|
||||||
@@ -17,8 +17,8 @@ def cmd_list(argv: list[str]) -> int:
|
|||||||
args = parser.parse_args(argv)
|
args = parser.parse_args(argv)
|
||||||
|
|
||||||
if args.scope == "available":
|
if args.scope == "available":
|
||||||
manifest = manifest_resolve(USER_CWD)
|
manifest = Manifest.resolve(USER_CWD)
|
||||||
for name in (manifest.get("agents") or {}).keys():
|
for name in manifest.agents.keys():
|
||||||
print(name)
|
print(name)
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|||||||
+16
-32
@@ -19,17 +19,7 @@ from .. import skills as skills_mod
|
|||||||
from .. import ssh as ssh_mod
|
from .. import ssh as ssh_mod
|
||||||
from ..env_resolve import env_resolve
|
from ..env_resolve import env_resolve
|
||||||
from ..log import die, info
|
from ..log import die, info
|
||||||
from ..manifest import (
|
from ..manifest import Manifest
|
||||||
manifest_agent_bottle,
|
|
||||||
manifest_bottle_runtime,
|
|
||||||
manifest_env_names,
|
|
||||||
manifest_prompt,
|
|
||||||
manifest_require_agent,
|
|
||||||
manifest_require_bottle,
|
|
||||||
manifest_resolve,
|
|
||||||
manifest_skills,
|
|
||||||
manifest_ssh,
|
|
||||||
)
|
|
||||||
from ._common import PROG, REPO_DIR, USER_CWD, read_tty_line
|
from ._common import PROG, REPO_DIR, USER_CWD, read_tty_line
|
||||||
|
|
||||||
|
|
||||||
@@ -57,8 +47,11 @@ def cmd_start(argv: list[str]) -> int:
|
|||||||
runtime_image = derived_image
|
runtime_image = derived_image
|
||||||
|
|
||||||
docker_mod.require_docker()
|
docker_mod.require_docker()
|
||||||
manifest = manifest_resolve(USER_CWD)
|
manifest = Manifest.resolve(USER_CWD)
|
||||||
manifest_require_agent(manifest, name)
|
manifest.require_agent(name)
|
||||||
|
agent = manifest.agents[name]
|
||||||
|
bottle_name = agent.bottle
|
||||||
|
bottle = manifest.bottle_for(name)
|
||||||
|
|
||||||
container = pinned_container or default_container
|
container = pinned_container or default_container
|
||||||
suffix = 2
|
suffix = 2
|
||||||
@@ -81,7 +74,7 @@ def cmd_start(argv: list[str]) -> int:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# --- Plan resolution (host-only, no container yet) ---
|
# --- Plan resolution (host-only, no container yet) ---
|
||||||
env_names = manifest_env_names(manifest, name)
|
env_names = list(bottle.env.keys())
|
||||||
|
|
||||||
# CLAUDE_BOTTLE_OAUTH_TOKEN → CLAUDE_CODE_OAUTH_TOKEN forwarding.
|
# CLAUDE_BOTTLE_OAUTH_TOKEN → CLAUDE_CODE_OAUTH_TOKEN forwarding.
|
||||||
# Host-side token is always forwarded so every container can authenticate.
|
# Host-side token is always forwarded so every container can authenticate.
|
||||||
@@ -90,23 +83,14 @@ def cmd_start(argv: list[str]) -> int:
|
|||||||
if forward_oauth_token:
|
if forward_oauth_token:
|
||||||
display_env_names.append("CLAUDE_CODE_OAUTH_TOKEN")
|
display_env_names.append("CLAUDE_CODE_OAUTH_TOKEN")
|
||||||
|
|
||||||
skill_names = manifest_skills(manifest, name)
|
if agent.skills:
|
||||||
if skill_names:
|
skills_mod.skills_validate_all(list(agent.skills))
|
||||||
skills_mod.skills_validate_all(skill_names)
|
|
||||||
|
|
||||||
bottle_name = manifest_agent_bottle(manifest, name)
|
runtime = bottle.runtime
|
||||||
if not bottle_name:
|
|
||||||
die(
|
|
||||||
f"agent '{name}' has no 'bottle' field. "
|
|
||||||
f"Add a bottle association to this agent in claude-bottle.json."
|
|
||||||
)
|
|
||||||
manifest_require_bottle(manifest, bottle_name)
|
|
||||||
|
|
||||||
runtime = manifest_bottle_runtime(manifest, bottle_name)
|
|
||||||
if runtime == "runsc":
|
if runtime == "runsc":
|
||||||
docker_mod.require_runsc()
|
docker_mod.require_runsc()
|
||||||
|
|
||||||
ssh_entries = manifest_ssh(manifest, name)
|
ssh_entries = bottle.ssh
|
||||||
if ssh_entries:
|
if ssh_entries:
|
||||||
ssh_mod.ssh_validate_entries(ssh_entries)
|
ssh_mod.ssh_validate_entries(ssh_entries)
|
||||||
|
|
||||||
@@ -153,7 +137,7 @@ def cmd_start(argv: list[str]) -> int:
|
|||||||
|
|
||||||
env_resolve(manifest, name, env_file, args_file)
|
env_resolve(manifest, name, env_file, args_file)
|
||||||
|
|
||||||
prompt_content = manifest_prompt(manifest, name)
|
prompt_content = agent.prompt
|
||||||
prompt_file.write_text(prompt_content)
|
prompt_file.write_text(prompt_content)
|
||||||
prompt_first_line = prompt_content.splitlines()[0] if prompt_content else ""
|
prompt_first_line = prompt_content.splitlines()[0] if prompt_content else ""
|
||||||
|
|
||||||
@@ -169,11 +153,11 @@ def cmd_start(argv: list[str]) -> int:
|
|||||||
"env (names only): "
|
"env (names only): "
|
||||||
+ (", ".join(display_env_names) if display_env_names else "(none)")
|
+ (", ".join(display_env_names) if display_env_names else "(none)")
|
||||||
)
|
)
|
||||||
info("skills : " + (" ".join(skill_names) if skill_names else "(none)"))
|
info("skills : " + (" ".join(agent.skills) if agent.skills else "(none)"))
|
||||||
info(f"bottle : {bottle_name}")
|
info(f"bottle : {bottle_name}")
|
||||||
info(f" runtime : {runtime}{' (gVisor)' if runtime == 'runsc' else ''}")
|
info(f" runtime : {runtime}{' (gVisor)' if runtime == 'runsc' else ''}")
|
||||||
if ssh_entries:
|
if ssh_entries:
|
||||||
ssh_names = ", ".join(e.get("Host", "") for e in ssh_entries)
|
ssh_names = ", ".join(e.Host for e in ssh_entries)
|
||||||
info(f" ssh hosts : {ssh_names}")
|
info(f" ssh hosts : {ssh_names}")
|
||||||
else:
|
else:
|
||||||
info(" ssh hosts : (none)")
|
info(" ssh hosts : (none)")
|
||||||
@@ -293,8 +277,8 @@ def cmd_start(argv: list[str]) -> int:
|
|||||||
check=True,
|
check=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
if skill_names:
|
if agent.skills:
|
||||||
skills_mod.skills_copy_into(container, skill_names)
|
skills_mod.skills_copy_into(container, list(agent.skills))
|
||||||
|
|
||||||
if ssh_entries:
|
if ssh_entries:
|
||||||
proxy_host_port = pipelock.pipelock_proxy_host_port(slug)
|
proxy_host_port = pipelock.pipelock_proxy_host_port(slug)
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ import sys
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from .log import die
|
from .log import die
|
||||||
from .manifest import Manifest, manifest_env_entry, manifest_env_names
|
from .manifest import Manifest
|
||||||
|
|
||||||
_INTERPOLATED_RE = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$")
|
_INTERPOLATED_RE = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$")
|
||||||
|
|
||||||
@@ -108,10 +108,10 @@ def env_resolve(
|
|||||||
- interpolated: copy host value; export under target name; append `-e NAME`
|
- interpolated: copy host value; export under target name; append `-e NAME`
|
||||||
- literal: append `NAME=VALUE` to env_file
|
- literal: append `NAME=VALUE` to env_file
|
||||||
"""
|
"""
|
||||||
for name in manifest_env_names(manifest, agent):
|
bottle = manifest.bottle_for(agent)
|
||||||
|
for name, raw in bottle.env.items():
|
||||||
if not name:
|
if not name:
|
||||||
continue
|
continue
|
||||||
raw = manifest_env_entry(manifest, agent, name)
|
|
||||||
kind = env_entry_kind(raw)
|
kind = env_entry_kind(raw)
|
||||||
if kind == "secret":
|
if kind == "secret":
|
||||||
prompt_body = env_entry_secret_prompt(raw)
|
prompt_body = env_entry_secret_prompt(raw)
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
from typing import NoReturn
|
||||||
|
|
||||||
|
|
||||||
def info(msg: str) -> None:
|
def info(msg: str) -> None:
|
||||||
@@ -18,6 +19,6 @@ class Die(SystemExit):
|
|||||||
fatal exit from an unrelated SystemExit."""
|
fatal exit from an unrelated SystemExit."""
|
||||||
|
|
||||||
|
|
||||||
def die(msg: str) -> "Die":
|
def die(msg: str) -> NoReturn:
|
||||||
print(f"claude-bottle: error: {msg}", file=sys.stderr)
|
print(f"claude-bottle: error: {msg}", file=sys.stderr)
|
||||||
raise Die(1)
|
raise Die(1)
|
||||||
|
|||||||
+299
-169
@@ -1,5 +1,5 @@
|
|||||||
"""Manifest helpers. Read claude-bottle.json and pull the definition for a
|
"""Manifest dataclasses. Read claude-bottle.json (cwd + $HOME, deep-merged)
|
||||||
named agent.
|
into a frozen, validated Manifest tree.
|
||||||
|
|
||||||
Schema (see CLAUDE.md "Intended design"):
|
Schema (see CLAUDE.md "Intended design"):
|
||||||
{
|
{
|
||||||
@@ -7,7 +7,8 @@ Schema (see CLAUDE.md "Intended design"):
|
|||||||
"<bottle-name>": {
|
"<bottle-name>": {
|
||||||
"env": { "<NAME>": <env-entry>, ... },
|
"env": { "<NAME>": <env-entry>, ... },
|
||||||
"ssh": [ <ssh-entry>, ... ],
|
"ssh": [ <ssh-entry>, ... ],
|
||||||
"egress": { "allowlist": [ "<hostname>", ... ] }
|
"egress": { "allowlist": [ "<hostname>", ... ] },
|
||||||
|
"runtime": "runc" | "runsc"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"agents": {
|
"agents": {
|
||||||
@@ -21,199 +22,328 @@ Schema (see CLAUDE.md "Intended design"):
|
|||||||
|
|
||||||
Bottles group shared infrastructure (SSH keys, known hosts, egress allowlist)
|
Bottles group shared infrastructure (SSH keys, known hosts, egress allowlist)
|
||||||
that multiple agents can reference. Every agent must reference a bottle.
|
that multiple agents can reference. Every agent must reference a bottle.
|
||||||
|
|
||||||
|
Validation runs once at construction (Manifest.from_json_obj) so getters
|
||||||
|
can trust the shape.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
from dataclasses import dataclass, field
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Literal, Mapping, cast
|
||||||
|
|
||||||
from .log import die
|
from .log import die
|
||||||
|
|
||||||
Manifest = dict[str, Any]
|
|
||||||
|
Runtime = Literal["runc", "runsc"]
|
||||||
|
_SUPPORTED_RUNTIMES: tuple[Runtime, ...] = ("runc", "runsc")
|
||||||
|
|
||||||
|
|
||||||
def manifest_resolve(cwd: str) -> Manifest:
|
def _empty_str_dict() -> dict[str, str]:
|
||||||
"""Look for claude-bottle.json in <cwd> and in $HOME, deep-merge
|
return {}
|
||||||
them (cwd entries override home entries on key conflict for both
|
|
||||||
bottles and agents). Dies if neither file is found or either is
|
|
||||||
invalid JSON."""
|
|
||||||
cwd_file = Path(cwd) / "claude-bottle.json"
|
|
||||||
home_file = Path(os.environ["HOME"]) / "claude-bottle.json"
|
|
||||||
|
|
||||||
cwd_doc = _load_json_or_die(cwd_file) if cwd_file.is_file() else None
|
|
||||||
home_doc = _load_json_or_die(home_file) if home_file.is_file() else None
|
|
||||||
|
|
||||||
if cwd_doc is None and home_doc is None:
|
|
||||||
die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}")
|
|
||||||
|
|
||||||
if cwd_doc is None:
|
|
||||||
return home_doc # type: ignore[return-value]
|
|
||||||
if home_doc is None:
|
|
||||||
return cwd_doc
|
|
||||||
|
|
||||||
return {
|
|
||||||
"bottles": {**(home_doc.get("bottles") or {}), **(cwd_doc.get("bottles") or {})},
|
|
||||||
"agents": {**(home_doc.get("agents") or {}), **(cwd_doc.get("agents") or {})},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def _load_json_or_die(path: Path) -> Manifest:
|
@dataclass(frozen=True)
|
||||||
try:
|
class SshEntry:
|
||||||
with path.open() as f:
|
Host: str
|
||||||
doc = json.load(f)
|
IdentityFile: str
|
||||||
except json.JSONDecodeError:
|
Hostname: str = ""
|
||||||
die(f"claude-bottle.json at {path} is not valid JSON")
|
User: str = ""
|
||||||
if not isinstance(doc, dict):
|
Port: str = ""
|
||||||
die(f"claude-bottle.json at {path} must be a JSON object")
|
KnownHostKey: str = ""
|
||||||
return doc
|
|
||||||
|
@classmethod
|
||||||
|
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "SshEntry":
|
||||||
|
d = _as_json_object(raw, f"bottle '{bottle_name}' ssh[{idx}]")
|
||||||
|
host = d.get("Host")
|
||||||
|
if not isinstance(host, str) or not host:
|
||||||
|
die(f"bottle '{bottle_name}' ssh[{idx}] missing required string field 'Host'")
|
||||||
|
ident = d.get("IdentityFile")
|
||||||
|
if not isinstance(ident, str) or not ident:
|
||||||
|
die(
|
||||||
|
f"bottle '{bottle_name}' ssh '{host}' missing required string field "
|
||||||
|
f"'IdentityFile'"
|
||||||
|
)
|
||||||
|
hostname = _opt_str(d.get("Hostname"), f"bottle '{bottle_name}' ssh '{host}' Hostname")
|
||||||
|
user = _opt_str(d.get("User"), f"bottle '{bottle_name}' ssh '{host}' User")
|
||||||
|
port = _opt_port(d.get("Port"), f"bottle '{bottle_name}' ssh '{host}' Port")
|
||||||
|
khk = _opt_str(
|
||||||
|
d.get("KnownHostKey"),
|
||||||
|
f"bottle '{bottle_name}' ssh '{host}' KnownHostKey",
|
||||||
|
)
|
||||||
|
return cls(
|
||||||
|
Host=host,
|
||||||
|
IdentityFile=ident,
|
||||||
|
Hostname=hostname,
|
||||||
|
User=user,
|
||||||
|
Port=port,
|
||||||
|
KnownHostKey=khk,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def manifest_has_agent(manifest: Manifest, name: str) -> bool:
|
@dataclass(frozen=True)
|
||||||
return name in (manifest.get("agents") or {})
|
class BottleEgress:
|
||||||
|
allowlist: tuple[str, ...] = ()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_dict(cls, bottle_name: str, raw: object) -> "BottleEgress":
|
||||||
|
d = _as_json_object(raw, f"bottle '{bottle_name}' egress")
|
||||||
|
allow = d.get("allowlist")
|
||||||
|
if allow is None:
|
||||||
|
return cls()
|
||||||
|
if not isinstance(allow, list):
|
||||||
|
die(
|
||||||
|
f"bottle '{bottle_name}' egress.allowlist must be an array "
|
||||||
|
f"(was {type(allow).__name__})"
|
||||||
|
)
|
||||||
|
items: list[str] = []
|
||||||
|
allow_list = cast(list[object], allow)
|
||||||
|
for i, host in enumerate(allow_list):
|
||||||
|
if not isinstance(host, str):
|
||||||
|
die(
|
||||||
|
f"bottle '{bottle_name}' egress.allowlist[{i}] must be a string "
|
||||||
|
f"(was {type(host).__name__})"
|
||||||
|
)
|
||||||
|
items.append(host)
|
||||||
|
return cls(allowlist=tuple(items))
|
||||||
|
|
||||||
|
|
||||||
def manifest_require_agent(manifest: Manifest, name: str) -> None:
|
@dataclass(frozen=True)
|
||||||
"""Like has_agent but dies with the available agent names listed."""
|
class Bottle:
|
||||||
if manifest_has_agent(manifest, name):
|
env: Mapping[str, str] = field(default_factory=_empty_str_dict)
|
||||||
return
|
ssh: tuple[SshEntry, ...] = ()
|
||||||
available = ", ".join((manifest.get("agents") or {}).keys())
|
egress: BottleEgress = field(default_factory=BottleEgress)
|
||||||
if available:
|
runtime: Runtime = "runc"
|
||||||
die(f"agent '{name}' not defined in claude-bottle.json. Available: {available}")
|
|
||||||
else:
|
@classmethod
|
||||||
|
def from_dict(cls, name: str, raw: object) -> "Bottle":
|
||||||
|
d = _as_json_object(raw, f"bottle '{name}'")
|
||||||
|
|
||||||
|
env: dict[str, str] = {}
|
||||||
|
env_raw = d.get("env")
|
||||||
|
if env_raw is not None:
|
||||||
|
env_dict = _as_json_object(env_raw, f"bottle '{name}' env")
|
||||||
|
for var, value in env_dict.items():
|
||||||
|
if not isinstance(value, str):
|
||||||
|
die(
|
||||||
|
f"env entry {var} in bottle '{name}' must be a JSON string "
|
||||||
|
f"(was {type(value).__name__}). Use \"?<message>\" for prompt-at-runtime."
|
||||||
|
)
|
||||||
|
env[var] = value
|
||||||
|
|
||||||
|
ssh: tuple[SshEntry, ...] = ()
|
||||||
|
ssh_raw = d.get("ssh")
|
||||||
|
if ssh_raw is not None:
|
||||||
|
if not isinstance(ssh_raw, list):
|
||||||
|
die(f"bottle '{name}' ssh must be an array (was {type(ssh_raw).__name__})")
|
||||||
|
ssh_list = cast(list[object], ssh_raw)
|
||||||
|
ssh = tuple(
|
||||||
|
SshEntry.from_dict(name, i, entry)
|
||||||
|
for i, entry in enumerate(ssh_list)
|
||||||
|
)
|
||||||
|
|
||||||
|
egress_raw = d.get("egress")
|
||||||
|
egress = (
|
||||||
|
BottleEgress.from_dict(name, egress_raw)
|
||||||
|
if egress_raw is not None
|
||||||
|
else BottleEgress()
|
||||||
|
)
|
||||||
|
|
||||||
|
runtime_raw = d.get("runtime")
|
||||||
|
runtime: Runtime
|
||||||
|
if runtime_raw is None:
|
||||||
|
runtime = "runc"
|
||||||
|
else:
|
||||||
|
if not isinstance(runtime_raw, str):
|
||||||
|
die(f"bottle '{name}' runtime must be a string (was {type(runtime_raw).__name__})")
|
||||||
|
if runtime_raw not in _SUPPORTED_RUNTIMES:
|
||||||
|
die(
|
||||||
|
f"bottle '{name}' runtime '{runtime_raw}' is not supported. "
|
||||||
|
f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}."
|
||||||
|
)
|
||||||
|
runtime = runtime_raw
|
||||||
|
|
||||||
|
return cls(env=env, ssh=ssh, egress=egress, runtime=runtime)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class Agent:
|
||||||
|
bottle: str
|
||||||
|
skills: tuple[str, ...] = ()
|
||||||
|
prompt: str = ""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_dict(cls, name: str, raw: object, bottle_names: set[str]) -> "Agent":
|
||||||
|
d = _as_json_object(raw, f"agent '{name}'")
|
||||||
|
|
||||||
|
bottle = d.get("bottle")
|
||||||
|
if not isinstance(bottle, str) or not bottle:
|
||||||
|
die(f"agent '{name}' must declare a 'bottle' field naming a defined bottle")
|
||||||
|
if bottle not in bottle_names:
|
||||||
|
available = ", ".join(sorted(bottle_names)) or "(none defined)"
|
||||||
|
die(
|
||||||
|
f"agent '{name}' references bottle '{bottle}', which is not defined. "
|
||||||
|
f"Available: {available}"
|
||||||
|
)
|
||||||
|
|
||||||
|
skills: tuple[str, ...] = ()
|
||||||
|
skills_raw = d.get("skills")
|
||||||
|
if skills_raw is not None:
|
||||||
|
if not isinstance(skills_raw, list):
|
||||||
|
die(f"agent '{name}' skills must be an array (was {type(skills_raw).__name__})")
|
||||||
|
collected: list[str] = []
|
||||||
|
skills_list = cast(list[object], skills_raw)
|
||||||
|
for i, skill in enumerate(skills_list):
|
||||||
|
if not isinstance(skill, str):
|
||||||
|
die(
|
||||||
|
f"agent '{name}' skills[{i}] must be a string "
|
||||||
|
f"(was {type(skill).__name__})"
|
||||||
|
)
|
||||||
|
collected.append(skill)
|
||||||
|
skills = tuple(collected)
|
||||||
|
|
||||||
|
prompt_raw = d.get("prompt")
|
||||||
|
if prompt_raw is None:
|
||||||
|
prompt = ""
|
||||||
|
elif isinstance(prompt_raw, str):
|
||||||
|
prompt = prompt_raw
|
||||||
|
else:
|
||||||
|
die(f"agent '{name}' prompt must be a string (was {type(prompt_raw).__name__})")
|
||||||
|
|
||||||
|
return cls(bottle=bottle, skills=skills, prompt=prompt)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class Manifest:
|
||||||
|
bottles: Mapping[str, Bottle]
|
||||||
|
agents: Mapping[str, Agent]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def resolve(cls, cwd: str) -> "Manifest":
|
||||||
|
"""Look for claude-bottle.json in <cwd> and in $HOME, deep-merge
|
||||||
|
them (cwd entries override home entries on key conflict for both
|
||||||
|
bottles and agents), then validate. Dies if neither file is
|
||||||
|
found, either is invalid JSON, or the merged shape violates the
|
||||||
|
schema."""
|
||||||
|
cwd_file = Path(cwd) / "claude-bottle.json"
|
||||||
|
home_file = Path(os.environ["HOME"]) / "claude-bottle.json"
|
||||||
|
|
||||||
|
cwd_doc = _load_json_or_die(cwd_file) if cwd_file.is_file() else None
|
||||||
|
home_doc = _load_json_or_die(home_file) if home_file.is_file() else None
|
||||||
|
|
||||||
|
if cwd_doc is None and home_doc is None:
|
||||||
|
die(f"no claude-bottle.json found in {cwd} or {os.environ['HOME']}")
|
||||||
|
|
||||||
|
h: dict[str, object] = home_doc if home_doc is not None else {}
|
||||||
|
c: dict[str, object] = cwd_doc if cwd_doc is not None else {}
|
||||||
|
h_bottles = _section_dict(h.get("bottles"), "bottles")
|
||||||
|
c_bottles = _section_dict(c.get("bottles"), "bottles")
|
||||||
|
h_agents = _section_dict(h.get("agents"), "agents")
|
||||||
|
c_agents = _section_dict(c.get("agents"), "agents")
|
||||||
|
merged: dict[str, object] = {
|
||||||
|
"bottles": {**h_bottles, **c_bottles},
|
||||||
|
"agents": {**h_agents, **c_agents},
|
||||||
|
}
|
||||||
|
return cls.from_json_obj(merged)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_json_obj(cls, obj: object) -> "Manifest":
|
||||||
|
"""Validate and build a Manifest from a raw JSON-like dict."""
|
||||||
|
d = _as_json_object(obj, "manifest")
|
||||||
|
raw_bottles = _section_dict(d.get("bottles"), "manifest 'bottles'")
|
||||||
|
raw_agents = _section_dict(d.get("agents"), "manifest 'agents'")
|
||||||
|
|
||||||
|
bottles: dict[str, Bottle] = {
|
||||||
|
n: Bottle.from_dict(n, b) for n, b in raw_bottles.items()
|
||||||
|
}
|
||||||
|
bottle_names = set(bottles.keys())
|
||||||
|
agents: dict[str, Agent] = {
|
||||||
|
n: Agent.from_dict(n, a, bottle_names) for n, a in raw_agents.items()
|
||||||
|
}
|
||||||
|
return cls(bottles=bottles, agents=agents)
|
||||||
|
|
||||||
|
def has_agent(self, name: str) -> bool:
|
||||||
|
return name in self.agents
|
||||||
|
|
||||||
|
def require_agent(self, name: str) -> None:
|
||||||
|
if self.has_agent(name):
|
||||||
|
return
|
||||||
|
available = ", ".join(self.agents.keys())
|
||||||
|
if available:
|
||||||
|
die(f"agent '{name}' not defined in claude-bottle.json. Available: {available}")
|
||||||
die(f"agent '{name}' not defined in claude-bottle.json (manifest is empty).")
|
die(f"agent '{name}' not defined in claude-bottle.json (manifest is empty).")
|
||||||
|
|
||||||
|
def has_bottle(self, name: str) -> bool:
|
||||||
|
return name in self.bottles
|
||||||
|
|
||||||
def manifest_env_names(manifest: Manifest, name: str) -> list[str]:
|
def require_bottle(self, name: str) -> None:
|
||||||
"""Names (not values) of bottles[agent.bottle].env, in declaration
|
if self.has_bottle(name):
|
||||||
order. Empty list if the agent has no bottle or the bottle has no env."""
|
return
|
||||||
agent = (manifest.get("agents") or {}).get(name) or {}
|
available = ", ".join(self.bottles.keys())
|
||||||
bottle_name = agent.get("bottle") or ""
|
if available:
|
||||||
if not bottle_name:
|
die(
|
||||||
return []
|
f"bottle '{name}' not defined in claude-bottle.json. "
|
||||||
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
|
f"Available bottles: {available}"
|
||||||
return list((bottle.get("env") or {}).keys())
|
)
|
||||||
|
die(f"bottle '{name}' not defined in claude-bottle.json (no bottles defined).")
|
||||||
|
|
||||||
|
def bottle_for(self, agent_name: str) -> Bottle:
|
||||||
|
"""Resolve the Bottle the named agent references. The validator
|
||||||
|
guarantees both lookups succeed for a manifest built via
|
||||||
|
from_json_obj."""
|
||||||
|
return self.bottles[self.agents[agent_name].bottle]
|
||||||
|
|
||||||
|
|
||||||
def manifest_env_entry(manifest: Manifest, agent: str, var: str) -> str:
|
def _as_json_object(value: object, label: str) -> dict[str, object]:
|
||||||
"""Raw string value of one env entry. Used by env_resolve, which
|
"""Assert that `value` is a JSON object (str-keyed dict) and return
|
||||||
classifies the result by sentinel. Dies if the agent has no bottle,
|
a view typed as `dict[str, object]` so downstream `.get(...)` calls
|
||||||
or the entry is not a string."""
|
have a typed surface."""
|
||||||
agent_def = (manifest.get("agents") or {}).get(agent) or {}
|
if not isinstance(value, dict):
|
||||||
bottle_name = agent_def.get("bottle") or ""
|
die(f"{label} must be a JSON object (was {type(value).__name__})")
|
||||||
if not bottle_name:
|
items = cast(dict[object, object], value)
|
||||||
die(f"env entry {var} for agent {agent}: agent has no 'bottle' field")
|
out: dict[str, object] = {}
|
||||||
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
|
for k, v in items.items():
|
||||||
env = bottle.get("env") or {}
|
if not isinstance(k, str):
|
||||||
value = env.get(var)
|
die(f"{label} keys must be strings (found {type(k).__name__})")
|
||||||
|
out[k] = v
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _section_dict(value: object, label: str) -> dict[str, object]:
|
||||||
|
"""Like _as_json_object but treats absent/null as an empty section."""
|
||||||
|
if value is None:
|
||||||
|
return {}
|
||||||
|
return _as_json_object(value, label)
|
||||||
|
|
||||||
|
|
||||||
|
def _load_json_or_die(path: Path) -> dict[str, object]:
|
||||||
|
try:
|
||||||
|
with path.open() as f:
|
||||||
|
doc: object = json.load(f)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
die(f"claude-bottle.json at {path} is not valid JSON")
|
||||||
|
return _as_json_object(doc, f"claude-bottle.json at {path}")
|
||||||
|
|
||||||
|
|
||||||
|
def _opt_str(value: object, label: str) -> str:
|
||||||
|
if value is None:
|
||||||
|
return ""
|
||||||
if not isinstance(value, str):
|
if not isinstance(value, str):
|
||||||
actual = _json_type(value)
|
die(f"{label} must be a string (was {type(value).__name__})")
|
||||||
die(
|
|
||||||
f"env entry {var} for agent {agent} must be a JSON string "
|
|
||||||
f"(was {actual}). Use \"?<message>\" for prompt-at-runtime."
|
|
||||||
)
|
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
def manifest_skills(manifest: Manifest, name: str) -> list[str]:
|
def _opt_port(value: object, label: str) -> str:
|
||||||
agent = (manifest.get("agents") or {}).get(name) or {}
|
"""Port accepts string or int (JSON-friendly) and is normalized to str."""
|
||||||
return list(agent.get("skills") or [])
|
|
||||||
|
|
||||||
|
|
||||||
def manifest_prompt(manifest: Manifest, name: str) -> str:
|
|
||||||
agent = (manifest.get("agents") or {}).get(name) or {}
|
|
||||||
return agent.get("prompt") or ""
|
|
||||||
|
|
||||||
|
|
||||||
def manifest_agent_bottle(manifest: Manifest, name: str) -> str:
|
|
||||||
agent = (manifest.get("agents") or {}).get(name) or {}
|
|
||||||
return agent.get("bottle") or ""
|
|
||||||
|
|
||||||
|
|
||||||
def manifest_has_bottle(manifest: Manifest, bottle_name: str) -> bool:
|
|
||||||
return bottle_name in (manifest.get("bottles") or {})
|
|
||||||
|
|
||||||
|
|
||||||
def manifest_require_bottle(manifest: Manifest, bottle_name: str) -> None:
|
|
||||||
if manifest_has_bottle(manifest, bottle_name):
|
|
||||||
return
|
|
||||||
available = ", ".join((manifest.get("bottles") or {}).keys())
|
|
||||||
if available:
|
|
||||||
die(
|
|
||||||
f"bottle '{bottle_name}' not defined in claude-bottle.json. "
|
|
||||||
f"Available bottles: {available}"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
die(f"bottle '{bottle_name}' not defined in claude-bottle.json (no bottles defined).")
|
|
||||||
|
|
||||||
|
|
||||||
def manifest_bottle_ssh(manifest: Manifest, bottle_name: str) -> list[dict[str, Any]]:
|
|
||||||
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
|
|
||||||
return list(bottle.get("ssh") or [])
|
|
||||||
|
|
||||||
|
|
||||||
_SUPPORTED_RUNTIMES: tuple[str, ...] = ("runc", "runsc")
|
|
||||||
|
|
||||||
|
|
||||||
def manifest_bottle_runtime(manifest: Manifest, bottle_name: str) -> str:
|
|
||||||
"""Container runtime for the bottle's agent container. Returns
|
|
||||||
"runc" (Docker default) or "runsc" (gVisor opt-in). Dies if the
|
|
||||||
field is present but not one of the supported values."""
|
|
||||||
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
|
|
||||||
raw = bottle.get("runtime")
|
|
||||||
if raw is None:
|
|
||||||
return "runc"
|
|
||||||
if not isinstance(raw, str):
|
|
||||||
die(
|
|
||||||
f"bottle '{bottle_name}' runtime must be a string "
|
|
||||||
f"(was {_json_type(raw)})."
|
|
||||||
)
|
|
||||||
if raw not in _SUPPORTED_RUNTIMES:
|
|
||||||
die(
|
|
||||||
f"bottle '{bottle_name}' runtime '{raw}' is not supported. "
|
|
||||||
f"Use one of: {', '.join(_SUPPORTED_RUNTIMES)}."
|
|
||||||
)
|
|
||||||
return raw
|
|
||||||
|
|
||||||
|
|
||||||
def manifest_bottle_egress_allowlist(manifest: Manifest, bottle_name: str) -> list[str]:
|
|
||||||
"""Hostnames in bottles[bottle_name].egress.allowlist. Dies if the
|
|
||||||
field is present but not an array. Per-element string typing is
|
|
||||||
re-checked at use-time in pipelock."""
|
|
||||||
bottle = (manifest.get("bottles") or {}).get(bottle_name) or {}
|
|
||||||
allowlist = (bottle.get("egress") or {}).get("allowlist")
|
|
||||||
if allowlist is None:
|
|
||||||
return []
|
|
||||||
if not isinstance(allowlist, list):
|
|
||||||
die(
|
|
||||||
f"bottle '{bottle_name}' egress.allowlist must be an array "
|
|
||||||
f"(was {_json_type(allowlist)})."
|
|
||||||
)
|
|
||||||
return list(allowlist)
|
|
||||||
|
|
||||||
|
|
||||||
def manifest_ssh(manifest: Manifest, agent_name: str) -> list[dict[str, Any]]:
|
|
||||||
"""SSH entries resolved via the agent's "bottle" field; empty if no bottle set."""
|
|
||||||
bottle_name = manifest_agent_bottle(manifest, agent_name)
|
|
||||||
if not bottle_name:
|
|
||||||
return []
|
|
||||||
return manifest_bottle_ssh(manifest, bottle_name)
|
|
||||||
|
|
||||||
|
|
||||||
def _json_type(value: Any) -> str:
|
|
||||||
"""Mirror jq's type names for parity with the bash error messages."""
|
|
||||||
if value is None:
|
if value is None:
|
||||||
return "null"
|
return ""
|
||||||
if isinstance(value, bool):
|
if isinstance(value, bool):
|
||||||
return "boolean"
|
die(f"{label} must be a string or number (was boolean)")
|
||||||
if isinstance(value, (int, float)):
|
if isinstance(value, int):
|
||||||
return "number"
|
return str(value)
|
||||||
if isinstance(value, str):
|
if isinstance(value, str):
|
||||||
return "string"
|
return value
|
||||||
if isinstance(value, list):
|
die(f"{label} must be a string or number (was {type(value).__name__})")
|
||||||
return "array"
|
|
||||||
if isinstance(value, dict):
|
|
||||||
return "object"
|
|
||||||
return type(value).__name__
|
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ import subprocess
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from .log import die, info, warn
|
from .log import die, info, warn
|
||||||
from .manifest import Manifest, manifest_bottle_egress_allowlist, manifest_bottle_ssh
|
from .manifest import Manifest
|
||||||
|
|
||||||
# Pipelock image, pinned by digest. The digest is the multi-arch image
|
# Pipelock image, pinned by digest. The digest is the multi-arch image
|
||||||
# index for ghcr.io/luckypipewrench/pipelock:2.3.0.
|
# index for ghcr.io/luckypipewrench/pipelock:2.3.0.
|
||||||
@@ -58,23 +58,12 @@ def pipelock_proxy_host_port(slug: str) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def pipelock_bottle_allowlist(manifest: Manifest, bottle_name: str) -> list[str]:
|
def pipelock_bottle_allowlist(manifest: Manifest, bottle_name: str) -> list[str]:
|
||||||
"""Hostnames in bottles[<bottle_name>].egress.allowlist. Validates
|
"""Hostnames in bottles[<bottle_name>].egress.allowlist."""
|
||||||
that each entry is a string."""
|
return list(manifest.bottles[bottle_name].egress.allowlist)
|
||||||
raw = manifest_bottle_egress_allowlist(manifest, bottle_name)
|
|
||||||
for entry in raw:
|
|
||||||
if not isinstance(entry, str):
|
|
||||||
t = _json_type(entry)
|
|
||||||
die(f"bottle '{bottle_name}' egress.allowlist must contain only strings; found a '{t}' entry.")
|
|
||||||
return list(raw)
|
|
||||||
|
|
||||||
|
|
||||||
def pipelock_bottle_ssh_hostnames(manifest: Manifest, bottle_name: str) -> list[str]:
|
def pipelock_bottle_ssh_hostnames(manifest: Manifest, bottle_name: str) -> list[str]:
|
||||||
out: list[str] = []
|
return [e.Hostname for e in manifest.bottles[bottle_name].ssh if e.Hostname]
|
||||||
for entry in manifest_bottle_ssh(manifest, bottle_name):
|
|
||||||
h = entry.get("Hostname") or ""
|
|
||||||
if h:
|
|
||||||
out.append(h)
|
|
||||||
return out
|
|
||||||
|
|
||||||
|
|
||||||
_IPV4_RE = re.compile(r"^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$")
|
_IPV4_RE = re.compile(r"^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$")
|
||||||
@@ -256,19 +245,3 @@ def pipelock_stop(slug: str) -> None:
|
|||||||
stderr=subprocess.DEVNULL,
|
stderr=subprocess.DEVNULL,
|
||||||
).returncode != 0:
|
).returncode != 0:
|
||||||
warn(f"failed to remove pipelock sidecar {name}; clean up with 'docker rm -f {name}'")
|
warn(f"failed to remove pipelock sidecar {name}; clean up with 'docker rm -f {name}'")
|
||||||
|
|
||||||
|
|
||||||
def _json_type(value: object) -> str:
|
|
||||||
if value is None:
|
|
||||||
return "null"
|
|
||||||
if isinstance(value, bool):
|
|
||||||
return "boolean"
|
|
||||||
if isinstance(value, (int, float)):
|
|
||||||
return "number"
|
|
||||||
if isinstance(value, str):
|
|
||||||
return "string"
|
|
||||||
if isinstance(value, list):
|
|
||||||
return "array"
|
|
||||||
if isinstance(value, dict):
|
|
||||||
return "object"
|
|
||||||
return type(value).__name__
|
|
||||||
|
|||||||
+14
-19
@@ -36,31 +36,26 @@ from __future__ import annotations
|
|||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Sequence
|
||||||
|
|
||||||
from .log import die, info
|
from .log import die, info
|
||||||
|
from .manifest import SshEntry
|
||||||
|
|
||||||
|
|
||||||
def ssh_validate_entries(entries: list[dict[str, Any]]) -> None:
|
def ssh_validate_entries(entries: Sequence[SshEntry]) -> None:
|
||||||
"""Each entry must have Host + IdentityFile, and the IdentityFile
|
"""The IdentityFile must exist on the host (after expanding leading ~).
|
||||||
must exist on the host (after expanding leading ~)."""
|
Host and IdentityFile shape are already enforced by Manifest validation."""
|
||||||
for entry in entries:
|
for entry in entries:
|
||||||
name = entry.get("Host", "")
|
key = _expand_tilde(entry.IdentityFile)
|
||||||
key = entry.get("IdentityFile", "")
|
|
||||||
if not name:
|
|
||||||
die(f"ssh entry missing required field 'Host': {entry}")
|
|
||||||
if not key:
|
|
||||||
die(f"ssh entry '{name}' missing required field 'IdentityFile'")
|
|
||||||
key = _expand_tilde(key)
|
|
||||||
if not os.path.isfile(key):
|
if not os.path.isfile(key):
|
||||||
die(f"ssh key file not found for host '{name}': {key}")
|
die(f"ssh key file not found for host '{entry.Host}': {key}")
|
||||||
|
|
||||||
|
|
||||||
def ssh_setup(
|
def ssh_setup(
|
||||||
container: str,
|
container: str,
|
||||||
stage_dir: Path,
|
stage_dir: Path,
|
||||||
proxy_host_port: str,
|
proxy_host_port: str,
|
||||||
entries: list[dict[str, Any]],
|
entries: Sequence[SshEntry],
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Set up SSH in the container so node can authenticate using each
|
"""Set up SSH in the container so node can authenticate using each
|
||||||
entry's key without the key file being readable by node."""
|
entry's key without the key file being readable by node."""
|
||||||
@@ -91,12 +86,12 @@ def ssh_setup(
|
|||||||
|
|
||||||
container_key_paths: list[str] = []
|
container_key_paths: list[str] = []
|
||||||
for entry in entries:
|
for entry in entries:
|
||||||
name = entry["Host"]
|
name = entry.Host
|
||||||
key = _expand_tilde(entry["IdentityFile"])
|
key = _expand_tilde(entry.IdentityFile)
|
||||||
hostname = entry["Hostname"]
|
hostname = entry.Hostname
|
||||||
user = entry["User"]
|
user = entry.User
|
||||||
port = str(entry["Port"])
|
port = entry.Port
|
||||||
known_host_key = entry.get("KnownHostKey", "")
|
known_host_key = entry.KnownHostKey
|
||||||
|
|
||||||
key_basename = os.path.basename(key)
|
key_basename = os.path.basename(key)
|
||||||
container_key_path = f"{keys_dir}/{key_basename}"
|
container_key_path = f"{keys_dir}/{key_basename}"
|
||||||
|
|||||||
+27
-8
@@ -1,4 +1,7 @@
|
|||||||
"""Manifest fixtures for the test suite."""
|
"""Manifest fixtures for the test suite. Each fixture returns a built
|
||||||
|
Manifest dataclass; callers that need the raw JSON shape (e.g. to write
|
||||||
|
to a file on disk) can build it themselves or call .from_json_obj on
|
||||||
|
a dict literal in the test."""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
@@ -7,9 +10,11 @@ import tempfile
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
from claude_bottle.manifest import Manifest
|
||||||
|
|
||||||
def fixture_minimal() -> dict[str, Any]:
|
|
||||||
"""One bottle, one agent, no env / ssh / skills."""
|
def fixture_minimal_dict() -> dict[str, Any]:
|
||||||
|
"""One bottle, one agent, no env / ssh / skills. JSON shape."""
|
||||||
return {
|
return {
|
||||||
"bottles": {"dev": {}},
|
"bottles": {"dev": {}},
|
||||||
"agents": {
|
"agents": {
|
||||||
@@ -18,8 +23,8 @@ def fixture_minimal() -> dict[str, Any]:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def fixture_with_egress() -> dict[str, Any]:
|
def fixture_with_egress_dict() -> dict[str, Any]:
|
||||||
"""Bottle declares an egress.allowlist."""
|
"""Bottle declares an egress.allowlist. JSON shape."""
|
||||||
return {
|
return {
|
||||||
"bottles": {
|
"bottles": {
|
||||||
"dev": {
|
"dev": {
|
||||||
@@ -32,9 +37,9 @@ def fixture_with_egress() -> dict[str, Any]:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def fixture_with_ssh() -> dict[str, Any]:
|
def fixture_with_ssh_dict() -> dict[str, Any]:
|
||||||
"""Bottle has both an IPv4-literal SSH host (CGNAT) and a hostname host,
|
"""Bottle has both an IPv4-literal SSH host (CGNAT) and a hostname host,
|
||||||
exercising both ssrf.ip_allowlist and trusted_domains code paths."""
|
exercising both ssrf.ip_allowlist and trusted_domains code paths. JSON shape."""
|
||||||
return {
|
return {
|
||||||
"bottles": {
|
"bottles": {
|
||||||
"dev": {
|
"dev": {
|
||||||
@@ -60,8 +65,22 @@ def fixture_with_ssh() -> dict[str, Any]:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def fixture_minimal() -> Manifest:
|
||||||
|
return Manifest.from_json_obj(fixture_minimal_dict())
|
||||||
|
|
||||||
|
|
||||||
|
def fixture_with_egress() -> Manifest:
|
||||||
|
return Manifest.from_json_obj(fixture_with_egress_dict())
|
||||||
|
|
||||||
|
|
||||||
|
def fixture_with_ssh() -> Manifest:
|
||||||
|
return Manifest.from_json_obj(fixture_with_ssh_dict())
|
||||||
|
|
||||||
|
|
||||||
def write_fixture(fn) -> Path:
|
def write_fixture(fn) -> Path:
|
||||||
"""Write fixture dict to a temp file; return the path. Caller must rm."""
|
"""Write fixture JSON to a temp file; return the path. Caller must rm.
|
||||||
|
Accepts a function returning either a dict (JSON shape) or a Manifest;
|
||||||
|
only the dict form is supported here since we need to serialize."""
|
||||||
f = tempfile.NamedTemporaryFile(
|
f = tempfile.NamedTemporaryFile(
|
||||||
mode="w", suffix=".json", delete=False, encoding="utf-8"
|
mode="w", suffix=".json", delete=False, encoding="utf-8"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,15 +1,18 @@
|
|||||||
"""Unit: manifest_bottle_runtime — defaults to runc, accepts runsc,
|
"""Unit: bottle runtime — Manifest.from_json_obj defaults runtime to runc,
|
||||||
rejects unknown values and non-strings."""
|
accepts runsc, and rejects unknown values, non-strings, and empty strings."""
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from claude_bottle.log import Die
|
from claude_bottle.log import Die
|
||||||
from claude_bottle.manifest import manifest_bottle_runtime
|
from claude_bottle.manifest import Manifest
|
||||||
|
|
||||||
|
|
||||||
def _bottle(runtime_value: object | None) -> dict:
|
_ABSENT = object()
|
||||||
"""Build a minimal manifest with one bottle whose runtime field is
|
|
||||||
set (or absent if `runtime_value is _ABSENT`)."""
|
|
||||||
|
def _bottle(runtime_value: object) -> dict:
|
||||||
|
"""Build a minimal manifest JSON shape with one bottle whose runtime
|
||||||
|
field is set (or absent if `runtime_value is _ABSENT`)."""
|
||||||
bottle: dict = {}
|
bottle: dict = {}
|
||||||
if runtime_value is not _ABSENT:
|
if runtime_value is not _ABSENT:
|
||||||
bottle["runtime"] = runtime_value
|
bottle["runtime"] = runtime_value
|
||||||
@@ -19,30 +22,30 @@ def _bottle(runtime_value: object | None) -> dict:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
_ABSENT = object()
|
|
||||||
|
|
||||||
|
|
||||||
class TestManifestBottleRuntime(unittest.TestCase):
|
class TestManifestBottleRuntime(unittest.TestCase):
|
||||||
def test_default_runc_when_absent(self):
|
def test_default_runc_when_absent(self):
|
||||||
self.assertEqual("runc", manifest_bottle_runtime(_bottle(_ABSENT), "dev"))
|
m = Manifest.from_json_obj(_bottle(_ABSENT))
|
||||||
|
self.assertEqual("runc", m.bottles["dev"].runtime)
|
||||||
|
|
||||||
def test_explicit_runc(self):
|
def test_explicit_runc(self):
|
||||||
self.assertEqual("runc", manifest_bottle_runtime(_bottle("runc"), "dev"))
|
m = Manifest.from_json_obj(_bottle("runc"))
|
||||||
|
self.assertEqual("runc", m.bottles["dev"].runtime)
|
||||||
|
|
||||||
def test_explicit_runsc(self):
|
def test_explicit_runsc(self):
|
||||||
self.assertEqual("runsc", manifest_bottle_runtime(_bottle("runsc"), "dev"))
|
m = Manifest.from_json_obj(_bottle("runsc"))
|
||||||
|
self.assertEqual("runsc", m.bottles["dev"].runtime)
|
||||||
|
|
||||||
def test_rejects_unknown_runtime(self):
|
def test_rejects_unknown_runtime(self):
|
||||||
with self.assertRaises(Die):
|
with self.assertRaises(Die):
|
||||||
manifest_bottle_runtime(_bottle("kata-runtime"), "dev")
|
Manifest.from_json_obj(_bottle("kata-runtime"))
|
||||||
|
|
||||||
def test_rejects_non_string(self):
|
def test_rejects_non_string(self):
|
||||||
with self.assertRaises(Die):
|
with self.assertRaises(Die):
|
||||||
manifest_bottle_runtime(_bottle(42), "dev")
|
Manifest.from_json_obj(_bottle(42))
|
||||||
|
|
||||||
def test_rejects_empty_string(self):
|
def test_rejects_empty_string(self):
|
||||||
with self.assertRaises(Die):
|
with self.assertRaises(Die):
|
||||||
manifest_bottle_runtime(_bottle(""), "dev")
|
Manifest.from_json_obj(_bottle(""))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ pipelock_bottle_ssh_trusted_domains, pipelock_effective_allowlist."""
|
|||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from claude_bottle.log import Die
|
from claude_bottle.log import Die
|
||||||
|
from claude_bottle.manifest import Manifest
|
||||||
from claude_bottle.pipelock import (
|
from claude_bottle.pipelock import (
|
||||||
pipelock_bottle_allowlist,
|
pipelock_bottle_allowlist,
|
||||||
pipelock_bottle_ssh_hostnames,
|
pipelock_bottle_ssh_hostnames,
|
||||||
@@ -32,7 +33,7 @@ class TestBottleAllowlist(unittest.TestCase):
|
|||||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||||
}
|
}
|
||||||
with self.assertRaises(Die):
|
with self.assertRaises(Die):
|
||||||
pipelock_bottle_allowlist(bad, "dev")
|
Manifest.from_json_obj(bad)
|
||||||
|
|
||||||
|
|
||||||
class TestSSHHostnames(unittest.TestCase):
|
class TestSSHHostnames(unittest.TestCase):
|
||||||
@@ -54,7 +55,7 @@ class TestSSHHostnames(unittest.TestCase):
|
|||||||
|
|
||||||
class TestEffectiveAllowlist(unittest.TestCase):
|
class TestEffectiveAllowlist(unittest.TestCase):
|
||||||
def test_union_and_dedup(self):
|
def test_union_and_dedup(self):
|
||||||
manifest = {
|
manifest = Manifest.from_json_obj({
|
||||||
"bottles": {
|
"bottles": {
|
||||||
"dev": {
|
"dev": {
|
||||||
"egress": {"allowlist": ["registry.npmjs.org"]},
|
"egress": {"allowlist": ["registry.npmjs.org"]},
|
||||||
@@ -67,7 +68,7 @@ class TestEffectiveAllowlist(unittest.TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||||
}
|
})
|
||||||
eff = pipelock_effective_allowlist(manifest, "dev")
|
eff = pipelock_effective_allowlist(manifest, "dev")
|
||||||
self.assertIn("api.anthropic.com", eff)
|
self.assertIn("api.anthropic.com", eff)
|
||||||
self.assertIn("registry.npmjs.org", eff)
|
self.assertIn("registry.npmjs.org", eff)
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import tempfile
|
|||||||
import unittest
|
import unittest
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
from claude_bottle.manifest import Manifest
|
||||||
from claude_bottle.pipelock import pipelock_write_yaml
|
from claude_bottle.pipelock import pipelock_write_yaml
|
||||||
from tests.fixtures import fixture_minimal, fixture_with_ssh
|
from tests.fixtures import fixture_minimal, fixture_with_ssh
|
||||||
|
|
||||||
@@ -50,7 +51,7 @@ class TestPipelockYaml(unittest.TestCase):
|
|||||||
self.assertIn("100.78.141.42", content)
|
self.assertIn("100.78.141.42", content)
|
||||||
|
|
||||||
def test_secret_hygiene(self):
|
def test_secret_hygiene(self):
|
||||||
manifest = {
|
manifest = Manifest.from_json_obj({
|
||||||
"bottles": {
|
"bottles": {
|
||||||
"dev": {
|
"dev": {
|
||||||
"env": {
|
"env": {
|
||||||
@@ -61,7 +62,7 @@ class TestPipelockYaml(unittest.TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||||
}
|
})
|
||||||
yaml_path = self.out_dir / "secret.yaml"
|
yaml_path = self.out_dir / "secret.yaml"
|
||||||
pipelock_write_yaml(manifest, "dev", yaml_path)
|
pipelock_write_yaml(manifest, "dev", yaml_path)
|
||||||
content = yaml_path.read_text()
|
content = yaml_path.read_text()
|
||||||
|
|||||||
Reference in New Issue
Block a user