"""Manifest dataclasses (PRD 0011 layout). Reads the per-file manifest tree: $HOME/.bot-bottle/bottles/.md — one bottle per file $HOME/.bot-bottle/agents/.md — home-resident agents $CWD/.bot-bottle/agents/.md — cwd-supplied agents Each file is Markdown with YAML frontmatter. The frontmatter holds the structured config (see schema below); for agents the body is the system prompt, for bottles the body is human documentation (ignored by the parser). Bottle schema (frontmatter): extends: # optional (PRD 0025) env: { : , ... } git: user: { name: , email: } # optional remotes: { : , ... } # optional egress: { routes: [ , ... ] } # route keys: host, path_allowlist, auth, role, pipelock # pipelock: { tls_passthrough: , ssrf_ip_allowlist: [, ...] } supervise: # optional Agent schema (frontmatter): bottle: # required skills: [ , ... ] # optional # Claude Code subagent passthrough fields — accepted, ignored: name, description, model, color, memory The agent file's Markdown body is the system prompt (stripped). Unknown top-level frontmatter keys raise ManifestError with a hint. Bottles can ONLY live under $HOME. A bottles/ dir under $CWD is a warn at load time and contributes nothing. The trust boundary is expressed as filesystem layout rather than resolver logic. Validation runs once at load. Manifest.from_json_obj is preserved as a programmatic entry point (used by tests) that takes a dict with the same field names — useful for building manifests without on-disk files. """ from __future__ import annotations import ipaddress import os from dataclasses import dataclass, field, replace from pathlib import Path from typing import Mapping, cast from .agent_provider import PROVIDER_TEMPLATES from .log import warn from .manifest_schema import AGENT_MODEL_KEYS, BOTTLE_KEYS class ManifestError(Exception): """A manifest file (or the manifest tree) is invalid.""" def _empty_str_dict() -> dict[str, str]: return {} @dataclass(frozen=True) class GitEntry: """One upstream the per-agent git-gate (PRD 0008) is allowed to talk to. `Upstream` is the real remote URL the agent would push to if there were no gate; the gate hosts a bare repo at /git/.git and `IdentityFile` is the SSH key the gate uses to push that repo upstream after gitleaks passes. The agent itself never holds the upstream credential. The Upstream URL is parsed once at construction and the pieces are stashed in the `Upstream*` fields so the git-gate render step doesn't have to re-parse.""" Name: str Upstream: str IdentityFile: str KnownHostKey: str = "" RemoteKey: str = "" UpstreamUser: str = "" UpstreamHost: str = "" UpstreamPort: str = "" UpstreamPath: str = "" @classmethod def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "GitEntry": d = _as_json_object(raw, f"bottle '{bottle_name}' git[{idx}]") return cls._from_object(bottle_name, d, f"git[{idx}]", None) @classmethod def from_remote_dict( cls, bottle_name: str, host_key: str, raw: object ) -> "GitEntry": if not host_key: raise ManifestError(f"bottle '{bottle_name}' git.remotes has an empty host key") d = _as_json_object(raw, f"bottle '{bottle_name}' git.remotes[{host_key!r}]") return cls._from_object( bottle_name, d, f"git.remotes[{host_key!r}]", host_key, ) @classmethod def _from_object( cls, bottle_name: str, d: dict[str, object], label: str, host_key: str | None, ) -> "GitEntry": name = d.get("Name") if not isinstance(name, str) or not name: raise ManifestError( f"bottle '{bottle_name}' {label} missing required string " f"field 'Name'" ) upstream = d.get("Upstream") if not isinstance(upstream, str) or not upstream: raise ManifestError( f"bottle '{bottle_name}' {label} '{name}' missing required string field " f"'Upstream'" ) ident = d.get("IdentityFile") if not isinstance(ident, str) or not ident: raise ManifestError( f"bottle '{bottle_name}' {label} '{name}' missing required string field " f"'IdentityFile'" ) khk = _opt_str( d.get("KnownHostKey"), f"bottle '{bottle_name}' {label} '{name}' KnownHostKey", ) user, host, port, path = _parse_git_upstream( upstream, f"bottle '{bottle_name}' {label} '{name}' Upstream" ) if ( host_key is not None and host_key != host and not _is_ip_literal(host) ): raise ManifestError( f"bottle '{bottle_name}' git.remotes key {host_key!r} " f"does not match Upstream host {host!r}" ) return cls( Name=name, Upstream=upstream, IdentityFile=ident, KnownHostKey=khk, RemoteKey=host_key or host, UpstreamUser=user, UpstreamHost=host, UpstreamPort=port, UpstreamPath=path, ) # Auth schemes for the egress route's optional `auth` block. # Same values cred-proxy accepts today; `token` sidesteps the Gitea # token-not-Bearer quirk (go-gitea/gitea#16734). EGRESS_AUTH_SCHEMES = ("Bearer", "token") @dataclass(frozen=True) class AgentProvider: """Provider/template for the agent process inside a bottle. `template` selects a built-in launch/runtime contract. `dockerfile` optionally points at a custom agent-image Dockerfile while leaving bot-bottle's sidecar infrastructure intact. `auth_token` names the host env var that holds the provider's OAuth token (Claude only). The provisioner injects a provider-owned egress route for api.anthropic.com that re-injects this token as the Bearer header, and sets a placeholder CLAUDE_CODE_OAUTH_TOKEN in the agent so the Claude Code CLI starts. `forward_host_credentials` forwards the host Codex auth token into the egress sidecar (Codex only). """ template: str = "claude" dockerfile: str = "" auth_token: str = "" forward_host_credentials: bool = False @classmethod def from_dict(cls, bottle_name: str, raw: object) -> "AgentProvider": d = _as_json_object(raw, f"bottle '{bottle_name}' agent_provider") for k in d: if k not in {"template", "dockerfile", "auth_token", "forward_host_credentials"}: raise ManifestError( f"bottle '{bottle_name}' agent_provider has unknown key {k!r}; " f"allowed: template, dockerfile, auth_token, forward_host_credentials" ) template = d.get("template", "claude") if not isinstance(template, str) or not template: raise ManifestError( f"bottle '{bottle_name}' agent_provider.template must be a " f"non-empty string" ) if template not in PROVIDER_TEMPLATES: raise ManifestError( f"bottle '{bottle_name}' agent_provider.template {template!r} " f"is not one of {', '.join(sorted(PROVIDER_TEMPLATES))}" ) dockerfile = d.get("dockerfile", "") if not isinstance(dockerfile, str): raise ManifestError( f"bottle '{bottle_name}' agent_provider.dockerfile must be a " f"string (was {type(dockerfile).__name__})" ) auth_token = d.get("auth_token", "") if not isinstance(auth_token, str): raise ManifestError( f"bottle '{bottle_name}' agent_provider.auth_token must be a " f"string (was {type(auth_token).__name__})" ) if auth_token and template != "claude": raise ManifestError( f"bottle '{bottle_name}' agent_provider.auth_token is only " f"supported for template 'claude'" ) forward_host_credentials = d.get("forward_host_credentials", False) if not isinstance(forward_host_credentials, bool): raise ManifestError( f"bottle '{bottle_name}' agent_provider.forward_host_credentials " f"must be a boolean (was {type(forward_host_credentials).__name__})" ) if forward_host_credentials and template != "codex": raise ManifestError( f"bottle '{bottle_name}' agent_provider.forward_host_credentials " "is currently only supported for template 'codex'" ) return cls( template=template, dockerfile=dockerfile, auth_token=auth_token, forward_host_credentials=forward_host_credentials, ) @dataclass(frozen=True) class GitUser: """Per-bottle `git config --global user.name` / `user.email` pair (issue #86). The agent's commits inside the bottle are attributed to this identity rather than the agent image's image-baked default (no user, or whatever the image dropped in). Either or both fields can be set independently. `from_dict` is forgiving on shape (a single missing field is fine — we just skip that config line at provisioning) but strict on types (string-or-die).""" name: str = "" email: str = "" @classmethod def from_dict(cls, bottle_name: str, raw: object) -> "GitUser": d = _as_json_object(raw, f"bottle '{bottle_name}' git.user") for k in d.keys(): if k not in {"name", "email"}: raise ManifestError( f"bottle '{bottle_name}' git.user has unknown key {k!r}; " f"allowed: name, email" ) name = d.get("name", "") email = d.get("email", "") if not isinstance(name, str): raise ManifestError( f"bottle '{bottle_name}' git.user.name must be a string " f"(was {type(name).__name__})" ) if not isinstance(email, str): raise ManifestError( f"bottle '{bottle_name}' git.user.email must be a string " f"(was {type(email).__name__})" ) if not name and not email: raise ManifestError( f"bottle '{bottle_name}' git.user is set but neither " f"name nor email is non-empty; remove the block or " f"fill at least one field." ) return cls(name=name, email=email) def is_empty(self) -> bool: return not self.name and not self.email def _parse_git_config( bottle_name: str, raw: object, ) -> tuple[tuple[GitEntry, ...], GitUser]: d = _as_json_object(raw, f"bottle '{bottle_name}' git") for k in d.keys(): if k not in {"user", "remotes"}: raise ManifestError( f"bottle '{bottle_name}' git has unknown key {k!r}; " f"allowed: user, remotes" ) git_user = ( GitUser.from_dict(bottle_name, d["user"]) if "user" in d else GitUser() ) git: tuple[GitEntry, ...] = () remotes_raw = d.get("remotes") if remotes_raw is not None: remotes = _as_json_object(remotes_raw, f"bottle '{bottle_name}' git.remotes") git = tuple( GitEntry.from_remote_dict(bottle_name, host, entry) for host, entry in remotes.items() ) _validate_unique_git_names(bottle_name, git) return git, git_user @dataclass(frozen=True) class PipelockRoutePolicy: """Per-route pipelock policy overrides. `TlsPassthrough` adds the route host to pipelock's `tls_interception.passthrough_domains`, so pipelock still enforces the hostname allowlist but does not MITM/decrypt request bodies or headers for that host. `SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF allowlist for private/internal destinations behind this route. """ TlsPassthrough: bool = False SsrfIpAllowlist: tuple[str, ...] = () @classmethod def from_dict( cls, bottle_name: str, idx: int, raw: object, ) -> "PipelockRoutePolicy": label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock" d = _as_json_object(raw, label) for k in d: if k not in ("tls_passthrough", "ssrf_ip_allowlist"): raise ManifestError( f"{label} has unknown key {k!r}; " f"only 'tls_passthrough' and 'ssrf_ip_allowlist' " f"are accepted" ) tls_passthrough_raw = d.get("tls_passthrough", False) if not isinstance(tls_passthrough_raw, bool): raise ManifestError( f"{label}.tls_passthrough must be a boolean " f"(was {type(tls_passthrough_raw).__name__})" ) ssrf_raw = d.get("ssrf_ip_allowlist", []) if not isinstance(ssrf_raw, list): raise ManifestError( f"{label}.ssrf_ip_allowlist must be an array " f"(was {type(ssrf_raw).__name__})" ) ssrf_ip_allowlist: list[str] = [] for j, item in enumerate(ssrf_raw): if not isinstance(item, str) or not item: raise ManifestError( f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty " f"string (was {type(item).__name__})" ) try: ipaddress.ip_network(item, strict=False) except ValueError as e: raise ManifestError( f"{label}.ssrf_ip_allowlist[{j}] must be an IP address " f"or CIDR (was {item!r}): {e}" ) ssrf_ip_allowlist.append(item) return cls( TlsPassthrough=tls_passthrough_raw, SsrfIpAllowlist=tuple(ssrf_ip_allowlist), ) @dataclass(frozen=True) class EgressRoute: """One route on the per-bottle egress sidecar (PRD 0017). `Host` matches the request's hostname (case-insensitive). The optional `PathAllowlist` constrains the URL path to a set of prefixes; empty tuple means no path-level filtering. The optional `AuthScheme` / `TokenRef` pair drives credential injection: when set, the proxy strips any inbound Authorization and injects ` `. When the manifest's `auth` block is omitted both fields are empty strings — no Authorization is written, no token forwarded. `Role` is reserved for future use; all role strings are currently rejected by the validator. Validation rules (enforced in `from_dict`): - `host` required, non-empty. - `path_allowlist` optional, list of absolute path prefixes. - `auth` optional. If present, MUST carry both `scheme` and `token_ref` as non-empty strings; an empty `auth: {}` is an error rather than a synonym for "no auth" (omit `auth` for that case). - `role` optional, reserved — any non-empty value is rejected. """ Host: str PathAllowlist: tuple[str, ...] = () AuthScheme: str = "" TokenRef: str = "" Role: tuple[str, ...] = () Pipelock: PipelockRoutePolicy = field(default_factory=PipelockRoutePolicy) @classmethod def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "EgressRoute": label = f"bottle '{bottle_name}' egress.routes[{idx}]" d = _as_json_object(raw, label) host = d.get("host") if not isinstance(host, str) or not host: raise ManifestError(f"{label} missing required string field 'host'") path_allow_raw = d.get("path_allowlist") prefixes: tuple[str, ...] = () if path_allow_raw is not None: if not isinstance(path_allow_raw, list): raise ManifestError( f"{label} path_allowlist must be an array " f"(was {type(path_allow_raw).__name__})" ) path_list = cast(list[object], path_allow_raw) collected: list[str] = [] for j, p in enumerate(path_list): if not isinstance(p, str): raise ManifestError( f"{label} path_allowlist[{j}] must be a string " f"(was {type(p).__name__})" ) if not p.startswith("/"): raise ManifestError( f"{label} path_allowlist[{j}] {p!r} must be an " f"absolute path prefix starting with '/'" ) collected.append(p) prefixes = tuple(collected) auth_scheme = "" token_ref = "" if "auth" in d: auth_raw = d.get("auth") auth_d = _as_json_object(auth_raw, f"{label} auth") if not auth_d: raise ManifestError( f"{label} auth is empty ({{}}); omit the 'auth' key " f"entirely if this route is unauthenticated. Otherwise " f"both 'scheme' and 'token_ref' are required." ) auth_scheme_raw = auth_d.get("scheme") if not isinstance(auth_scheme_raw, str) or not auth_scheme_raw: raise ManifestError( f"{label} auth.scheme is required when 'auth' is set " f"(non-empty string)" ) if auth_scheme_raw not in EGRESS_AUTH_SCHEMES: raise ManifestError( f"{label} auth.scheme {auth_scheme_raw!r} is not one of " f"{', '.join(EGRESS_AUTH_SCHEMES)}" ) token_ref_raw = auth_d.get("token_ref") if not isinstance(token_ref_raw, str) or not token_ref_raw: raise ManifestError( f"{label} auth.token_ref is required when 'auth' is set " f"(name of the host env var holding the token value)" ) for k in auth_d: if k not in ("scheme", "token_ref"): raise ManifestError( f"{label} auth has unknown key {k!r}; " f"only 'scheme' and 'token_ref' are accepted" ) auth_scheme = auth_scheme_raw token_ref = token_ref_raw role_raw = d.get("role") roles: tuple[str, ...] = () if role_raw is None: roles = () elif isinstance(role_raw, str): roles = (role_raw,) elif isinstance(role_raw, list): role_list = cast(list[object], role_raw) collected_roles: list[str] = [] for r in role_list: if not isinstance(r, str): raise ManifestError(f"{label} role items must be strings (got {type(r).__name__})") collected_roles.append(r) roles = tuple(collected_roles) else: raise ManifestError( f"{label} role must be a string or a list of strings " f"(was {type(role_raw).__name__})" ) if roles: raise ManifestError( f"{label} role {roles[0]!r} is not accepted; " f"the 'role' field is reserved for future use" ) pipelock = ( PipelockRoutePolicy.from_dict(bottle_name, idx, d["pipelock"]) if "pipelock" in d else PipelockRoutePolicy() ) for k in d: if k not in ("host", "path_allowlist", "auth", "role", "pipelock"): raise ManifestError( f"{label} has unknown key {k!r}; accepted keys are " f"'host', 'path_allowlist', 'auth', 'role', 'pipelock'" ) return cls( Host=host, PathAllowlist=prefixes, AuthScheme=auth_scheme, TokenRef=token_ref, Role=roles, Pipelock=pipelock, ) @dataclass(frozen=True) class EgressConfig: """Per-bottle egress configuration. Today this is just the route table; the nesting under `egress:` leaves room for per-bottle proxy settings (port override, log level, etc.) in follow-ups.""" routes: tuple[EgressRoute, ...] = () @classmethod def from_dict(cls, bottle_name: str, raw: object) -> "EgressConfig": d = _as_json_object(raw, f"bottle '{bottle_name}' egress") routes_raw = d.get("routes") routes: tuple[EgressRoute, ...] = () if routes_raw is not None: if not isinstance(routes_raw, list): raise ManifestError( f"bottle '{bottle_name}' egress.routes must be an array " f"(was {type(routes_raw).__name__})" ) routes_list = cast(list[object], routes_raw) routes = tuple( EgressRoute.from_dict(bottle_name, i, entry) for i, entry in enumerate(routes_list) ) _validate_egress_routes(bottle_name, routes) for k in d: if k != "routes": raise ManifestError( f"bottle '{bottle_name}' egress has unknown key {k!r}; " f"only 'routes' is accepted" ) return cls(routes=routes) @dataclass(frozen=True) class Bottle: env: Mapping[str, str] = field(default_factory=_empty_str_dict) agent_provider: AgentProvider = field(default_factory=AgentProvider) git: tuple[GitEntry, ...] = () # Per-bottle git identity (issue #86). Empty default — bottles # that don't set `git.user:` in the manifest skip the # `git config --global` step entirely. Set independently of # the `git.remotes:` upstream map above: a bottle can declare a user # identity without any git-gate upstreams, and vice versa. git_user: GitUser = field(default_factory=GitUser) egress: EgressConfig = field(default_factory=EgressConfig) # Opt-in per-bottle stuck-recovery sidecar (PRD 0013). When true, # the launch step brings up a supervise sidecar that exposes three # MCP tools to the agent (cred-proxy-block, pipelock-block, # capability-block; the cred-proxy-block tool is renamed and # retargeted at egress in PRD 0017 chunk 3) plus mounts the # current-config dir read-only into the agent at /etc/bot-bottle/ # current-config. False (the default) skips the sidecar and mount. supervise: bool = False @classmethod def from_dict(cls, name: str, raw: object) -> "Bottle": d = _as_json_object(raw, f"bottle '{name}'") if "runtime" in d: raise ManifestError( f"bottle '{name}' has a 'runtime' field, which is no longer " f"supported. gVisor (runsc) is now auto-detected by the " f"backend; remove the 'runtime' field from the bottle " f"definition." ) if "ssh" in d: raise ManifestError( f"bottle '{name}' has an 'ssh' field, which has been removed " f"(PRD 0009). Move each entry to 'git': declare the upstream " f"as a git remote with Name + Upstream URL + IdentityFile, " f"and the per-bottle git-gate (PRD 0008) will hold the " f"credential and gitleaks-scan pushes." ) if "git_user" in d: raise ManifestError( f"bottle '{name}' has a 'git_user' field, which has been " f"removed. Move it under 'git.user'." ) unknown = set(d.keys()) - BOTTLE_KEYS if unknown: allowed = ", ".join(sorted(BOTTLE_KEYS)) raise ManifestError( f"bottle '{name}' has unknown key(s) {sorted(unknown)}; " f"allowed keys are {allowed}." ) env: dict[str, str] = {} env_raw = d.get("env") if env_raw is not None: env_dict = _as_json_object(env_raw, f"bottle '{name}' env") for var, value in env_dict.items(): if not isinstance(value, str): raise ManifestError( f"env entry {var} in bottle '{name}' must be a JSON string " f"(was {type(value).__name__}). Use \"?\" for prompt-at-runtime." ) env[var] = value git: tuple[GitEntry, ...] = () git_user = GitUser() git_raw = d.get("git") if git_raw is not None: git, git_user = _parse_git_config(name, git_raw) agent_provider = ( AgentProvider.from_dict(name, d["agent_provider"]) if "agent_provider" in d else AgentProvider() ) egress = ( EgressConfig.from_dict(name, d["egress"]) if "egress" in d else EgressConfig() ) supervise_raw = d.get("supervise", False) if not isinstance(supervise_raw, bool): raise ManifestError( f"bottle '{name}' supervise must be a boolean " f"(was {type(supervise_raw).__name__})" ) return cls( env=env, agent_provider=agent_provider, git=git, git_user=git_user, egress=egress, supervise=supervise_raw, ) @dataclass(frozen=True) class Agent: bottle: str skills: tuple[str, ...] = () prompt: str = "" # Per-agent git identity (issue #94). Overlays the referenced # bottle's git.user per-field at `Manifest.bottle_for`. Only the # `user` block is allowed at the agent level; `git.remotes` stays # bottle-only because it carries credentials and host trust. git_user: GitUser = GitUser() @classmethod def from_dict(cls, name: str, raw: object, bottle_names: set[str]) -> "Agent": d = _as_json_object(raw, f"agent '{name}'") unknown = set(d.keys()) - AGENT_MODEL_KEYS if unknown: allowed = ", ".join(sorted(AGENT_MODEL_KEYS)) raise ManifestError( f"agent '{name}' has unknown key(s) {sorted(unknown)}; " f"allowed keys are {allowed}." ) bottle = d.get("bottle") if not isinstance(bottle, str) or not bottle: raise ManifestError(f"agent '{name}' must declare a 'bottle' field naming a defined bottle") if bottle not in bottle_names: available = ", ".join(sorted(bottle_names)) or "(none defined)" raise ManifestError( f"agent '{name}' references bottle '{bottle}', which is not defined. " f"Available: {available}" ) skills: tuple[str, ...] = () skills_raw = d.get("skills") if skills_raw is not None: if not isinstance(skills_raw, list): raise ManifestError(f"agent '{name}' skills must be an array (was {type(skills_raw).__name__})") collected: list[str] = [] skills_list = cast(list[object], skills_raw) for i, skill in enumerate(skills_list): if not isinstance(skill, str): raise ManifestError( f"agent '{name}' skills[{i}] must be a string " f"(was {type(skill).__name__})" ) collected.append(skill) skills = tuple(collected) prompt_raw = d.get("prompt") if prompt_raw is None: prompt = "" elif isinstance(prompt_raw, str): prompt = prompt_raw else: raise ManifestError(f"agent '{name}' prompt must be a string (was {type(prompt_raw).__name__})") # git: agents may declare only `git.user` (name/email). Any # other git key — notably `remotes` — is rejected: remotes # carry credentials and host trust and stay bottle-only. git_user = GitUser() git_raw = d.get("git") if git_raw is not None: gd = _as_json_object(git_raw, f"agent '{name}' git") for k in gd.keys(): if k != "user": raise ManifestError( f"agent '{name}' git.{k} is not allowed at the " f"agent level; only git.user (name/email) may be " f"set on an agent. git.remotes is bottle-only " f"(it carries credentials and host trust)." ) if "user" in gd: git_user = GitUser.from_dict(name, gd["user"]) return cls(bottle=bottle, skills=skills, prompt=prompt, git_user=git_user) @dataclass(frozen=True) class Manifest: bottles: Mapping[str, Bottle] agents: Mapping[str, Agent] @classmethod def resolve(cls, cwd: str, *, missing_ok: bool = False) -> "Manifest": """Walk the per-file manifest tree and build a Manifest. Layout (PRD 0011): $HOME/.bot-bottle/bottles/.md — bottles (home-only) $HOME/.bot-bottle/agents/.md — home agents $CWD/.bot-bottle/agents/.md — cwd agents Cwd agents merge into the home agents on the same name (cwd wins). A bottles/ subdir under $CWD is logged as a warning and ignored — the filesystem layout IS the trust boundary. If `missing_ok` is true, a missing `$HOME/.bot-bottle/` returns an empty manifest instead of dying. This is for passive UI surfaces like the dashboard, which can still monitor already-running agents without launch config. If `bot-bottle.json` exists alongside a missing `.bot-bottle/` directory at either side, dies with a clear pointer at the README's manifest section — the manifest format changed in PRD 0011 and we don't silently fall back.""" home_dir = Path(os.environ["HOME"]) cwd_dir = Path(cwd) home_md = home_dir / ".bot-bottle" cwd_md = cwd_dir / ".bot-bottle" from .manifest_loader import check_stale_json check_stale_json(home_dir, home_md, "$HOME") if cwd_dir.resolve() != home_dir.resolve(): check_stale_json(cwd_dir, cwd_md, "$CWD") if not home_md.is_dir(): if missing_ok: return cls.from_json_obj({"bottles": {}, "agents": {}}) raise ManifestError( f"no manifest found: {home_md} does not exist. " f"See README.md for the per-file Markdown layout " f"(PRD 0011)." ) # When CWD == HOME (running from $HOME directly), pass the # same dir for both — _load_md_dirs will dedupe. cwd_md_arg = cwd_md if cwd_md.is_dir() and cwd_dir.resolve() != home_dir.resolve() else None return cls.from_md_dirs(home_md, cwd_md_arg) @classmethod def from_md_dirs( cls, home_dir: Path, cwd_dir: Path | None, ) -> "Manifest": """Programmatic entry point. Loads bottles from `/bottles/`, home agents from `/agents/`, and (if `cwd_dir` is passed) cwd agents from `/agents/`. Cwd agents override home agents on name collision. A `bottles/` subdir under `cwd_dir` is logged as a warning and ignored. Used by tests to build a Manifest from fixture directories without touching `os.environ`.""" bottles_dir = home_dir / "bottles" from .manifest_loader import load_agents_from_dir, load_bottles_from_dir bottles = load_bottles_from_dir(bottles_dir) bottle_names = set(bottles.keys()) agents_dir = home_dir / "agents" agents = load_agents_from_dir(agents_dir, bottle_names, source="$HOME") if cwd_dir is not None: stale_bottles = cwd_dir / "bottles" if stale_bottles.is_dir(): files = sorted(stale_bottles.glob("*.md")) if files: names = ", ".join(p.name for p in files) warn( f"ignoring bottle file(s) under " f"{stale_bottles}: {names}. Bottles can only " f"live under $HOME/.bot-bottle/bottles/ " f"(PRD 0011). Move them or delete." ) cwd_agents_dir = cwd_dir / "agents" cwd_agents = load_agents_from_dir( cwd_agents_dir, bottle_names, source="$CWD" ) agents = {**agents, **cwd_agents} return cls(bottles=bottles, agents=agents) @classmethod def from_json_obj(cls, obj: object) -> "Manifest": """Validate and build a Manifest from a raw JSON-like dict.""" d = _as_json_object(obj, "manifest") raw_bottles_obj = _section_dict(d.get("bottles"), "manifest 'bottles'") raw_agents = _section_dict(d.get("agents"), "manifest 'agents'") # Coerce each bottle's raw to dict[str, object] so the # PRD 0025 resolver can apply extends-merge rules # consistently with the md-loader path. raw_bottles: dict[str, dict[str, object]] = {} for n, b in raw_bottles_obj.items(): raw_bottles[n] = _as_json_object(b, f"bottle '{n}'") from .manifest_extends import resolve_bottles bottles = resolve_bottles(raw_bottles) bottle_names = set(bottles.keys()) agents: dict[str, Agent] = { n: Agent.from_dict(n, a, bottle_names) for n, a in raw_agents.items() } return cls(bottles=bottles, agents=agents) def has_agent(self, name: str) -> bool: return name in self.agents def require_agent(self, name: str) -> None: if self.has_agent(name): return available = ", ".join(self.agents.keys()) if available: raise ManifestError(f"agent '{name}' not defined in bot-bottle.json. Available: {available}") raise ManifestError(f"agent '{name}' not defined in bot-bottle.json (manifest is empty).") def has_bottle(self, name: str) -> bool: return name in self.bottles def require_bottle(self, name: str) -> None: if self.has_bottle(name): return available = ", ".join(self.bottles.keys()) if available: raise ManifestError( f"bottle '{name}' not defined in bot-bottle.json. " f"Available bottles: {available}" ) raise ManifestError(f"bottle '{name}' not defined in bot-bottle.json (no bottles defined).") def _effective_git_user(self, agent_name: str) -> GitUser: """Merge the agent's git.user over the referenced bottle's, per-field, agent-wins-on-non-empty (issue #94). Same overlay the `extends:` resolver applies between bottles (`_merge_bottles`).""" agent = self.agents[agent_name] base = self.bottles[agent.bottle].git_user over = agent.git_user if over.is_empty(): return base return GitUser( name=over.name or base.name, email=over.email or base.email, ) def bottle_for(self, agent_name: str) -> Bottle: """Resolve the Bottle the named agent references, with the agent's git.user overlaid on top. The validator guarantees both lookups succeed for a manifest built via from_json_obj. The overlay lives here, the single point both backends call to resolve an agent's bottle, so the docker / smolmachines git provisioners pick up the merged identity unchanged.""" bottle = self.bottles[self.agents[agent_name].bottle] merged = self._effective_git_user(agent_name) if merged == bottle.git_user: return bottle return replace(bottle, git_user=merged) def git_identity_summary(self, agent_name: str) -> str | None: """One-line effective git identity with per-field provenance for launch summaries, e.g. `name=claude (agent), email=eric@dideric.is (bottle)`. Returns None when neither agent nor bottle sets an identity.""" over = self.agents[agent_name].git_user merged = self._effective_git_user(agent_name) if merged.is_empty(): return None parts: list[str] = [] if merged.name: parts.append(f"name={merged.name} ({'agent' if over.name else 'bottle'})") if merged.email: parts.append(f"email={merged.email} ({'agent' if over.email else 'bottle'})") return ", ".join(parts) def _as_json_object(value: object, label: str) -> dict[str, object]: """Assert that `value` is a JSON object (str-keyed dict) and return a view typed as `dict[str, object]` so downstream `.get(...)` calls have a typed surface.""" if not isinstance(value, dict): raise ManifestError(f"{label} must be a JSON object (was {type(value).__name__})") items = cast(dict[object, object], value) out: dict[str, object] = {} for k, v in items.items(): if not isinstance(k, str): raise ManifestError(f"{label} keys must be strings (found {type(k).__name__})") out[k] = v return out def _section_dict(value: object, label: str) -> dict[str, object]: """Like _as_json_object but treats absent/null as an empty section.""" if value is None: return {} return _as_json_object(value, label) def _opt_str(value: object, label: str) -> str: if value is None: return "" if not isinstance(value, str): raise ManifestError(f"{label} must be a string (was {type(value).__name__})") return value def _parse_git_upstream(url: str, label: str) -> tuple[str, str, str, str]: """Parse `ssh://user@host[:port]/path` into (user, host, port, path). Dies if `url` doesn't match the ssh:// shape v1 supports. Default port is 22 (matches OpenSSH).""" if not url.startswith("ssh://"): raise ManifestError(f"{label} must be an ssh:// URL (was {url!r})") rest = url[len("ssh://"):] if "@" not in rest: raise ManifestError(f"{label} must include a user (e.g. ssh://git@host/path.git); was {url!r}") user, _, hostpart = rest.partition("@") if not user: raise ManifestError(f"{label} user is empty in {url!r}") if "/" not in hostpart: raise ManifestError(f"{label} must include a path (e.g. ssh://git@host/path.git); was {url!r}") hostport, _, path = hostpart.partition("/") if not path: raise ManifestError(f"{label} path is empty in {url!r}") if ":" in hostport: host, _, port = hostport.partition(":") if not port.isdigit(): raise ManifestError(f"{label} port must be numeric in {url!r}") else: host = hostport port = "22" if not host: raise ManifestError(f"{label} host is empty in {url!r}") return (user, host, port, path) def _is_ip_literal(value: str) -> bool: try: ipaddress.ip_address(value) except ValueError: return False return True def _validate_egress_routes( bottle_name: str, routes: tuple[EgressRoute, ...], ) -> None: """Cross-validation for `bottle.egress.routes`: hosts must be unique. The proxy matches by exact-host (v1); duplicate hosts leave the route choice ambiguous so we reject them up front. No cross-validation against `bottle.git` is performed. git-gate (SSH push/fetch) and egress (HTTPS) broker different protocols; declaring both for the same host is a legitimate dev setup.""" seen_hosts: dict[str, None] = {} for r in routes: key = r.Host.lower() if key in seen_hosts: raise ManifestError( f"bottle '{bottle_name}' egress.routes has duplicate host " f"{r.Host!r}; each host must be unique on the proxy." ) seen_hosts[key] = None def _validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None: seen: dict[str, None] = {} for g in git: if g.Name in seen: raise ManifestError( f"bottle '{bottle_name}' git entries have duplicate Name '{g.Name}'; " f"each entry maps to a distinct bare repo on the gate." ) seen[g.Name] = None