From 9b81173699339786d62a8e41695d152770f93157 Mon Sep 17 00:00:00 2001 From: claude Date: Wed, 3 Jun 2026 04:16:42 +0000 Subject: [PATCH] refactor: split manifest.py into domain-specific modules Closes #157. Distributes the 1,026-line manifest.py across four focused modules: - _manifest_util.py: ManifestError + _as_json_object (shared base) - manifest_git.py: GitEntry, GitUser, git-gate config helpers - manifest_egress.py: EgressRoute, EgressConfig, PipelockRoutePolicy - manifest_agent.py: AgentProvider, Agent manifest.py is now the residual orchestration layer: Bottle, Manifest, and re-exports of all public names so existing callers are unaffected. All 867 unit tests pass. --- bot_bottle/_manifest_util.py | 24 ++ bot_bottle/manifest.py | 707 ++------------------------------- bot_bottle/manifest_agent.py | 166 ++++++++ bot_bottle/manifest_egress.py | 294 ++++++++++++++ bot_bottle/manifest_extends.py | 5 +- bot_bottle/manifest_git.py | 211 ++++++++++ bot_bottle/manifest_schema.py | 2 +- 7 files changed, 734 insertions(+), 675 deletions(-) create mode 100644 bot_bottle/_manifest_util.py create mode 100644 bot_bottle/manifest_agent.py create mode 100644 bot_bottle/manifest_egress.py create mode 100644 bot_bottle/manifest_git.py diff --git a/bot_bottle/_manifest_util.py b/bot_bottle/_manifest_util.py new file mode 100644 index 0000000..eff6012 --- /dev/null +++ b/bot_bottle/_manifest_util.py @@ -0,0 +1,24 @@ +"""Shared manifest primitives used by all manifest sub-modules.""" + +from __future__ import annotations + +from typing import cast + + +class ManifestError(Exception): + """A manifest file (or the manifest tree) is invalid.""" + + +def _as_json_object(value: object, label: str) -> dict[str, object]: + """Assert that `value` is a JSON object (str-keyed dict) and return + a view typed as `dict[str, object]` so downstream `.get(...)` calls + have a typed surface.""" + if not isinstance(value, dict): + raise ManifestError(f"{label} must be a JSON object (was {type(value).__name__})") + items = cast(dict[object, object], value) + out: dict[str, object] = {} + for k, v in items.items(): + if not isinstance(k, str): + raise ManifestError(f"{label} keys must be strings (found {type(k).__name__})") + out[k] = v + return out diff --git a/bot_bottle/manifest.py b/bot_bottle/manifest.py index 89a3a87..9ec7656 100644 --- a/bot_bottle/manifest.py +++ b/bot_bottle/manifest.py @@ -45,511 +45,51 @@ on-disk files. from __future__ import annotations -import ipaddress import os from dataclasses import dataclass, field, replace from pathlib import Path -from typing import Mapping, cast +from typing import Mapping -from .agent_provider import PROVIDER_TEMPLATES -from .log import warn -from .manifest_schema import AGENT_MODEL_KEYS, BOTTLE_KEYS +from ._manifest_util import ManifestError, _as_json_object +from .manifest_agent import Agent, AgentProvider +from .manifest_egress import ( + EGRESS_AUTH_SCHEMES, + EgressConfig, + EgressRoute, + PipelockRoutePolicy, + _validate_egress_routes, +) +from .manifest_git import GitEntry, GitUser, _parse_git_gate_config +from .manifest_schema import BOTTLE_KEYS - -class ManifestError(Exception): - """A manifest file (or the manifest tree) is invalid.""" +# Re-export everything that callers currently import from this module. +__all__ = [ + "ManifestError", + "GitEntry", + "GitUser", + "AgentProvider", + "EGRESS_AUTH_SCHEMES", + "PipelockRoutePolicy", + "EgressRoute", + "EgressConfig", + "Agent", + "Bottle", + "Manifest", + # private helpers used by manifest_extends / manifest_loader + "_as_json_object", + "_validate_egress_routes", +] def _empty_str_dict() -> dict[str, str]: return {} -@dataclass(frozen=True) -class GitEntry: - """One upstream the per-agent git-gate (PRD 0008) is allowed to - talk to. `Upstream` is the real remote URL the agent would push to - if there were no gate; the gate hosts a bare repo at /git/.git - and `IdentityFile` is the SSH key the gate uses to push that repo - upstream after gitleaks passes. The agent itself never holds the - upstream credential. - - The Upstream URL is parsed once at construction and the pieces are - stashed in the `Upstream*` fields so the git-gate render step - doesn't have to re-parse. - - Manifest source: `git-gate.repos.` (PRD 0047). The YAML keys - are `url`, `identity`, and `host_key`; the internal field names are - stable across that rename.""" - - Name: str - Upstream: str - IdentityFile: str - KnownHostKey: str = "" - RemoteKey: str = "" - UpstreamUser: str = "" - UpstreamHost: str = "" - UpstreamPort: str = "" - UpstreamPath: str = "" - - @classmethod - def from_repos_entry( - cls, bottle_name: str, repo_name: str, raw: object - ) -> "GitEntry": - """Parse one entry from `git-gate.repos.`. - - YAML keys: `url` (required), `identity` (required), - `host_key` (optional). The repo_name becomes `Name`.""" - if not repo_name: - raise ManifestError( - f"bottle '{bottle_name}' git-gate.repos has an empty key" - ) - label = f"git-gate.repos[{repo_name!r}]" - d = _as_json_object(raw, f"bottle '{bottle_name}' {label}") - for k in d: - if k not in {"url", "identity", "host_key"}: - raise ManifestError( - f"bottle '{bottle_name}' {label} has unknown key {k!r}; " - f"allowed: url, identity, host_key" - ) - upstream = d.get("url") - if not isinstance(upstream, str) or not upstream: - raise ManifestError( - f"bottle '{bottle_name}' {label} missing required string field 'url'" - ) - ident = d.get("identity") - if not isinstance(ident, str) or not ident: - raise ManifestError( - f"bottle '{bottle_name}' {label} missing required string field 'identity'" - ) - khk = _opt_str( - d.get("host_key"), - f"bottle '{bottle_name}' {label} host_key", - ) - user, host, port, path = _parse_git_upstream( - upstream, f"bottle '{bottle_name}' {label} url" - ) - return cls( - Name=repo_name, - Upstream=upstream, - IdentityFile=ident, - KnownHostKey=khk, - RemoteKey=host, - UpstreamUser=user, - UpstreamHost=host, - UpstreamPort=port, - UpstreamPath=path, - ) - - -# Auth schemes for the egress route's optional `auth` block. -# Same values cred-proxy accepts today; `token` sidesteps the Gitea -# token-not-Bearer quirk (go-gitea/gitea#16734). -EGRESS_AUTH_SCHEMES = ("Bearer", "token") - - -@dataclass(frozen=True) -class AgentProvider: - """Provider/template for the agent process inside a bottle. - - `template` selects a built-in launch/runtime contract. `dockerfile` - optionally points at a custom agent-image Dockerfile while leaving - bot-bottle's sidecar infrastructure intact. - - `auth_token` names the host env var that holds the provider's OAuth - token (Claude only). The provisioner injects a provider-owned egress - route for api.anthropic.com that re-injects this token as the Bearer - header, and sets a placeholder CLAUDE_CODE_OAUTH_TOKEN in the agent - so the Claude Code CLI starts. - - `forward_host_credentials` forwards the host Codex auth token into - the egress sidecar (Codex only). - """ - - template: str = "claude" - dockerfile: str = "" - auth_token: str = "" - forward_host_credentials: bool = False - - @classmethod - def from_dict(cls, bottle_name: str, raw: object) -> "AgentProvider": - d = _as_json_object(raw, f"bottle '{bottle_name}' agent_provider") - for k in d: - if k not in {"template", "dockerfile", "auth_token", "forward_host_credentials"}: - raise ManifestError( - f"bottle '{bottle_name}' agent_provider has unknown key {k!r}; " - f"allowed: template, dockerfile, auth_token, forward_host_credentials" - ) - template = d.get("template", "claude") - if not isinstance(template, str) or not template: - raise ManifestError( - f"bottle '{bottle_name}' agent_provider.template must be a " - f"non-empty string" - ) - if template not in PROVIDER_TEMPLATES: - raise ManifestError( - f"bottle '{bottle_name}' agent_provider.template {template!r} " - f"is not one of {', '.join(sorted(PROVIDER_TEMPLATES))}" - ) - dockerfile = d.get("dockerfile", "") - if not isinstance(dockerfile, str): - raise ManifestError( - f"bottle '{bottle_name}' agent_provider.dockerfile must be a " - f"string (was {type(dockerfile).__name__})" - ) - auth_token = d.get("auth_token", "") - if not isinstance(auth_token, str): - raise ManifestError( - f"bottle '{bottle_name}' agent_provider.auth_token must be a " - f"string (was {type(auth_token).__name__})" - ) - if auth_token and template != "claude": - raise ManifestError( - f"bottle '{bottle_name}' agent_provider.auth_token is only " - f"supported for template 'claude'" - ) - forward_host_credentials = d.get("forward_host_credentials", False) - if not isinstance(forward_host_credentials, bool): - raise ManifestError( - f"bottle '{bottle_name}' agent_provider.forward_host_credentials " - f"must be a boolean (was {type(forward_host_credentials).__name__})" - ) - if forward_host_credentials and template != "codex": - raise ManifestError( - f"bottle '{bottle_name}' agent_provider.forward_host_credentials " - "is currently only supported for template 'codex'" - ) - return cls( - template=template, - dockerfile=dockerfile, - auth_token=auth_token, - forward_host_credentials=forward_host_credentials, - ) - - -@dataclass(frozen=True) -class GitUser: - """Per-bottle `git config --global user.name` / `user.email` - pair (issue #86). The agent's commits inside the bottle are - attributed to this identity rather than the agent image's - image-baked default (no user, or whatever the image dropped - in). Either or both fields can be set independently. - - `from_dict` is forgiving on shape (a single missing field is - fine — we just skip that config line at provisioning) but - strict on types (string-or-die).""" - - name: str = "" - email: str = "" - - @classmethod - def from_dict(cls, bottle_name: str, raw: object) -> "GitUser": - d = _as_json_object(raw, f"bottle '{bottle_name}' git-gate.user") - for k in d.keys(): - if k not in {"name", "email"}: - raise ManifestError( - f"bottle '{bottle_name}' git-gate.user has unknown key {k!r}; " - f"allowed: name, email" - ) - name = d.get("name", "") - email = d.get("email", "") - if not isinstance(name, str): - raise ManifestError( - f"bottle '{bottle_name}' git-gate.user.name must be a string " - f"(was {type(name).__name__})" - ) - if not isinstance(email, str): - raise ManifestError( - f"bottle '{bottle_name}' git-gate.user.email must be a string " - f"(was {type(email).__name__})" - ) - if not name and not email: - raise ManifestError( - f"bottle '{bottle_name}' git-gate.user is set but neither " - f"name nor email is non-empty; remove the block or " - f"fill at least one field." - ) - return cls(name=name, email=email) - - def is_empty(self) -> bool: - return not self.name and not self.email - - -def _parse_git_gate_config( - bottle_name: str, - raw: object, -) -> tuple[tuple[GitEntry, ...], GitUser]: - d = _as_json_object(raw, f"bottle '{bottle_name}' git-gate") - for k in d.keys(): - if k not in {"user", "repos"}: - raise ManifestError( - f"bottle '{bottle_name}' git-gate has unknown key {k!r}; " - f"allowed: user, repos" - ) - - git_user = ( - GitUser.from_dict(bottle_name, d["user"]) - if "user" in d - else GitUser() - ) - - git: tuple[GitEntry, ...] = () - repos_raw = d.get("repos") - if repos_raw is not None: - repos = _as_json_object(repos_raw, f"bottle '{bottle_name}' git-gate.repos") - git = tuple( - GitEntry.from_repos_entry(bottle_name, name, entry) - for name, entry in repos.items() - ) - _validate_unique_git_names(bottle_name, git) - - return git, git_user - - -@dataclass(frozen=True) -class PipelockRoutePolicy: - """Per-route pipelock policy overrides. - - `TlsPassthrough` adds the route host to pipelock's - `tls_interception.passthrough_domains`, so pipelock still enforces - the hostname allowlist but does not MITM/decrypt request bodies or - headers for that host. - - `SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF - allowlist for private/internal destinations behind this route. - """ - - TlsPassthrough: bool = False - SsrfIpAllowlist: tuple[str, ...] = () - - @classmethod - def from_dict( - cls, bottle_name: str, idx: int, raw: object, - ) -> "PipelockRoutePolicy": - label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock" - d = _as_json_object(raw, label) - for k in d: - if k not in ("tls_passthrough", "ssrf_ip_allowlist"): - raise ManifestError( - f"{label} has unknown key {k!r}; " - f"only 'tls_passthrough' and 'ssrf_ip_allowlist' " - f"are accepted" - ) - tls_passthrough_raw = d.get("tls_passthrough", False) - if not isinstance(tls_passthrough_raw, bool): - raise ManifestError( - f"{label}.tls_passthrough must be a boolean " - f"(was {type(tls_passthrough_raw).__name__})" - ) - ssrf_raw = d.get("ssrf_ip_allowlist", []) - if not isinstance(ssrf_raw, list): - raise ManifestError( - f"{label}.ssrf_ip_allowlist must be an array " - f"(was {type(ssrf_raw).__name__})" - ) - ssrf_ip_allowlist: list[str] = [] - for j, item in enumerate(ssrf_raw): - if not isinstance(item, str) or not item: - raise ManifestError( - f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty " - f"string (was {type(item).__name__})" - ) - try: - ipaddress.ip_network(item, strict=False) - except ValueError as e: - raise ManifestError( - f"{label}.ssrf_ip_allowlist[{j}] must be an IP address " - f"or CIDR (was {item!r}): {e}" - ) - ssrf_ip_allowlist.append(item) - return cls( - TlsPassthrough=tls_passthrough_raw, - SsrfIpAllowlist=tuple(ssrf_ip_allowlist), - ) - - -@dataclass(frozen=True) -class EgressRoute: - """One route on the per-bottle egress sidecar (PRD 0017). - - `Host` matches the request's hostname (case-insensitive). The - optional `PathAllowlist` constrains the URL path to a set of - prefixes; empty tuple means no path-level filtering. The optional - `AuthScheme` / `TokenRef` pair drives credential injection: - when set, the proxy strips any inbound Authorization and injects - ` `. When the - manifest's `auth` block is omitted both fields are empty strings — - no Authorization is written, no token forwarded. - - `Role` is reserved for future use; all role strings are currently - rejected by the validator. - - Validation rules (enforced in `from_dict`): - - `host` required, non-empty. - - `path_allowlist` optional, list of absolute path prefixes. - - `auth` optional. If present, MUST carry both `scheme` and - `token_ref` as non-empty strings; an empty `auth: {}` is an - error rather than a synonym for "no auth" (omit `auth` for - that case). - - `role` optional, reserved — any non-empty value is rejected. - """ - - Host: str - PathAllowlist: tuple[str, ...] = () - AuthScheme: str = "" - TokenRef: str = "" - Role: tuple[str, ...] = () - Pipelock: PipelockRoutePolicy = field(default_factory=PipelockRoutePolicy) - - @classmethod - def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "EgressRoute": - label = f"bottle '{bottle_name}' egress.routes[{idx}]" - d = _as_json_object(raw, label) - host = d.get("host") - if not isinstance(host, str) or not host: - raise ManifestError(f"{label} missing required string field 'host'") - - path_allow_raw = d.get("path_allowlist") - prefixes: tuple[str, ...] = () - if path_allow_raw is not None: - if not isinstance(path_allow_raw, list): - raise ManifestError( - f"{label} path_allowlist must be an array " - f"(was {type(path_allow_raw).__name__})" - ) - path_list = cast(list[object], path_allow_raw) - collected: list[str] = [] - for j, p in enumerate(path_list): - if not isinstance(p, str): - raise ManifestError( - f"{label} path_allowlist[{j}] must be a string " - f"(was {type(p).__name__})" - ) - if not p.startswith("/"): - raise ManifestError( - f"{label} path_allowlist[{j}] {p!r} must be an " - f"absolute path prefix starting with '/'" - ) - collected.append(p) - prefixes = tuple(collected) - - auth_scheme = "" - token_ref = "" - if "auth" in d: - auth_raw = d.get("auth") - auth_d = _as_json_object(auth_raw, f"{label} auth") - if not auth_d: - raise ManifestError( - f"{label} auth is empty ({{}}); omit the 'auth' key " - f"entirely if this route is unauthenticated. Otherwise " - f"both 'scheme' and 'token_ref' are required." - ) - auth_scheme_raw = auth_d.get("scheme") - if not isinstance(auth_scheme_raw, str) or not auth_scheme_raw: - raise ManifestError( - f"{label} auth.scheme is required when 'auth' is set " - f"(non-empty string)" - ) - if auth_scheme_raw not in EGRESS_AUTH_SCHEMES: - raise ManifestError( - f"{label} auth.scheme {auth_scheme_raw!r} is not one of " - f"{', '.join(EGRESS_AUTH_SCHEMES)}" - ) - token_ref_raw = auth_d.get("token_ref") - if not isinstance(token_ref_raw, str) or not token_ref_raw: - raise ManifestError( - f"{label} auth.token_ref is required when 'auth' is set " - f"(name of the host env var holding the token value)" - ) - for k in auth_d: - if k not in ("scheme", "token_ref"): - raise ManifestError( - f"{label} auth has unknown key {k!r}; " - f"only 'scheme' and 'token_ref' are accepted" - ) - auth_scheme = auth_scheme_raw - token_ref = token_ref_raw - - role_raw = d.get("role") - roles: tuple[str, ...] = () - if role_raw is None: - roles = () - elif isinstance(role_raw, str): - roles = (role_raw,) - elif isinstance(role_raw, list): - role_list = cast(list[object], role_raw) - collected_roles: list[str] = [] - for r in role_list: - if not isinstance(r, str): - raise ManifestError(f"{label} role items must be strings (got {type(r).__name__})") - collected_roles.append(r) - roles = tuple(collected_roles) - else: - raise ManifestError( - f"{label} role must be a string or a list of strings " - f"(was {type(role_raw).__name__})" - ) - if roles: - raise ManifestError( - f"{label} role {roles[0]!r} is not accepted; " - f"the 'role' field is reserved for future use" - ) - - pipelock = ( - PipelockRoutePolicy.from_dict(bottle_name, idx, d["pipelock"]) - if "pipelock" in d - else PipelockRoutePolicy() - ) - - for k in d: - if k not in ("host", "path_allowlist", "auth", "role", "pipelock"): - raise ManifestError( - f"{label} has unknown key {k!r}; accepted keys are " - f"'host', 'path_allowlist', 'auth', 'role', 'pipelock'" - ) - - return cls( - Host=host, - PathAllowlist=prefixes, - AuthScheme=auth_scheme, - TokenRef=token_ref, - Role=roles, - Pipelock=pipelock, - ) - - -@dataclass(frozen=True) -class EgressConfig: - """Per-bottle egress configuration. Today this is just the - route table; the nesting under `egress:` leaves room for - per-bottle proxy settings (port override, log level, etc.) in - follow-ups.""" - - routes: tuple[EgressRoute, ...] = () - - @classmethod - def from_dict(cls, bottle_name: str, raw: object) -> "EgressConfig": - d = _as_json_object(raw, f"bottle '{bottle_name}' egress") - routes_raw = d.get("routes") - routes: tuple[EgressRoute, ...] = () - if routes_raw is not None: - if not isinstance(routes_raw, list): - raise ManifestError( - f"bottle '{bottle_name}' egress.routes must be an array " - f"(was {type(routes_raw).__name__})" - ) - routes_list = cast(list[object], routes_raw) - routes = tuple( - EgressRoute.from_dict(bottle_name, i, entry) - for i, entry in enumerate(routes_list) - ) - _validate_egress_routes(bottle_name, routes) - for k in d: - if k != "routes": - raise ManifestError( - f"bottle '{bottle_name}' egress has unknown key {k!r}; " - f"only 'routes' is accepted" - ) - return cls(routes=routes) +def _section_dict(value: object, label: str) -> dict[str, object]: + """Like _as_json_object but treats absent/null as an empty section.""" + if value is None: + return {} + return _as_json_object(value, label) @dataclass(frozen=True) @@ -656,82 +196,6 @@ class Bottle: ) -@dataclass(frozen=True) -class Agent: - bottle: str - skills: tuple[str, ...] = () - prompt: str = "" - # Per-agent git identity (issue #94). Overlays the referenced - # bottle's git-gate.user per-field at `Manifest.bottle_for`. Only - # `user` is allowed at the agent level; `repos` stays bottle-only - # because it carries credentials and host trust. - git_user: GitUser = GitUser() - - @classmethod - def from_dict(cls, name: str, raw: object, bottle_names: set[str]) -> "Agent": - d = _as_json_object(raw, f"agent '{name}'") - unknown = set(d.keys()) - AGENT_MODEL_KEYS - if unknown: - allowed = ", ".join(sorted(AGENT_MODEL_KEYS)) - raise ManifestError( - f"agent '{name}' has unknown key(s) {sorted(unknown)}; " - f"allowed keys are {allowed}." - ) - - bottle = d.get("bottle") - if not isinstance(bottle, str) or not bottle: - raise ManifestError(f"agent '{name}' must declare a 'bottle' field naming a defined bottle") - if bottle not in bottle_names: - available = ", ".join(sorted(bottle_names)) or "(none defined)" - raise ManifestError( - f"agent '{name}' references bottle '{bottle}', which is not defined. " - f"Available: {available}" - ) - - skills: tuple[str, ...] = () - skills_raw = d.get("skills") - if skills_raw is not None: - if not isinstance(skills_raw, list): - raise ManifestError(f"agent '{name}' skills must be an array (was {type(skills_raw).__name__})") - collected: list[str] = [] - skills_list = cast(list[object], skills_raw) - for i, skill in enumerate(skills_list): - if not isinstance(skill, str): - raise ManifestError( - f"agent '{name}' skills[{i}] must be a string " - f"(was {type(skill).__name__})" - ) - collected.append(skill) - skills = tuple(collected) - - prompt_raw = d.get("prompt") - if prompt_raw is None: - prompt = "" - elif isinstance(prompt_raw, str): - prompt = prompt_raw - else: - raise ManifestError(f"agent '{name}' prompt must be a string (was {type(prompt_raw).__name__})") - - # git-gate: agents may declare only `git-gate.user` (name/email). - # `git-gate.repos` is bottle-only — it carries credentials and host trust. - git_user = GitUser() - git_raw = d.get("git-gate") - if git_raw is not None: - gd = _as_json_object(git_raw, f"agent '{name}' git-gate") - for k in gd.keys(): - if k != "user": - raise ManifestError( - f"agent '{name}' git-gate.{k} is not allowed at the " - f"agent level; only git-gate.user (name/email) may be " - f"set on an agent. git-gate.repos is bottle-only " - f"(it carries credentials and host trust)." - ) - if "user" in gd: - git_user = GitUser.from_dict(name, gd["user"]) - - return cls(bottle=bottle, skills=skills, prompt=prompt, git_user=git_user) - - @dataclass(frozen=True) class Manifest: bottles: Mapping[str, Bottle] @@ -816,6 +280,7 @@ class Manifest: files = sorted(stale_bottles.glob("*.md")) if files: names = ", ".join(p.name for p in files) + from .log import warn warn( f"ignoring bottle file(s) under " f"{stale_bottles}: {names}. Bottles can only " @@ -922,105 +387,3 @@ class Manifest: if merged.email: parts.append(f"email={merged.email} ({'agent' if over.email else 'bottle'})") return ", ".join(parts) - - -def _as_json_object(value: object, label: str) -> dict[str, object]: - """Assert that `value` is a JSON object (str-keyed dict) and return - a view typed as `dict[str, object]` so downstream `.get(...)` calls - have a typed surface.""" - if not isinstance(value, dict): - raise ManifestError(f"{label} must be a JSON object (was {type(value).__name__})") - items = cast(dict[object, object], value) - out: dict[str, object] = {} - for k, v in items.items(): - if not isinstance(k, str): - raise ManifestError(f"{label} keys must be strings (found {type(k).__name__})") - out[k] = v - return out - - -def _section_dict(value: object, label: str) -> dict[str, object]: - """Like _as_json_object but treats absent/null as an empty section.""" - if value is None: - return {} - return _as_json_object(value, label) - - -def _opt_str(value: object, label: str) -> str: - if value is None: - return "" - if not isinstance(value, str): - raise ManifestError(f"{label} must be a string (was {type(value).__name__})") - return value - - -def _parse_git_upstream(url: str, label: str) -> tuple[str, str, str, str]: - """Parse `ssh://user@host[:port]/path` into (user, host, port, path). - Dies if `url` doesn't match the ssh:// shape v1 supports. Default - port is 22 (matches OpenSSH).""" - if not url.startswith("ssh://"): - raise ManifestError(f"{label} must be an ssh:// URL (was {url!r})") - rest = url[len("ssh://"):] - if "@" not in rest: - raise ManifestError(f"{label} must include a user (e.g. ssh://git@host/path.git); was {url!r}") - user, _, hostpart = rest.partition("@") - if not user: - raise ManifestError(f"{label} user is empty in {url!r}") - if "/" not in hostpart: - raise ManifestError(f"{label} must include a path (e.g. ssh://git@host/path.git); was {url!r}") - hostport, _, path = hostpart.partition("/") - if not path: - raise ManifestError(f"{label} path is empty in {url!r}") - if ":" in hostport: - host, _, port = hostport.partition(":") - if not port.isdigit(): - raise ManifestError(f"{label} port must be numeric in {url!r}") - else: - host = hostport - port = "22" - if not host: - raise ManifestError(f"{label} host is empty in {url!r}") - return (user, host, port, path) - - -def _is_ip_literal(value: str) -> bool: - try: - ipaddress.ip_address(value) - except ValueError: - return False - return True - - -def _validate_egress_routes( - bottle_name: str, - routes: tuple[EgressRoute, ...], -) -> None: - """Cross-validation for `bottle.egress.routes`: hosts must be unique. - - The proxy matches by exact-host (v1); duplicate hosts leave the - route choice ambiguous so we reject them up front. - - No cross-validation against `bottle.git-gate.repos` is performed. - git-gate (SSH push/fetch) and egress (HTTPS) broker different - protocols; declaring both for the same host is a legitimate dev - setup.""" - seen_hosts: dict[str, None] = {} - for r in routes: - key = r.Host.lower() - if key in seen_hosts: - raise ManifestError( - f"bottle '{bottle_name}' egress.routes has duplicate host " - f"{r.Host!r}; each host must be unique on the proxy." - ) - seen_hosts[key] = None - - -def _validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None: - seen: dict[str, None] = {} - for g in git: - if g.Name in seen: - raise ManifestError( - f"bottle '{bottle_name}' git-gate.repos has duplicate name '{g.Name}'; " - f"each entry maps to a distinct bare repo on the gate." - ) - seen[g.Name] = None diff --git a/bot_bottle/manifest_agent.py b/bot_bottle/manifest_agent.py new file mode 100644 index 0000000..2f0568c --- /dev/null +++ b/bot_bottle/manifest_agent.py @@ -0,0 +1,166 @@ +"""Agent configuration manifest dataclasses.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import cast + +from .agent_provider import PROVIDER_TEMPLATES +from ._manifest_util import ManifestError, _as_json_object +from .manifest_git import GitUser +from .manifest_schema import AGENT_MODEL_KEYS + + +@dataclass(frozen=True) +class AgentProvider: + """Provider/template for the agent process inside a bottle. + + `template` selects a built-in launch/runtime contract. `dockerfile` + optionally points at a custom agent-image Dockerfile while leaving + bot-bottle's sidecar infrastructure intact. + + `auth_token` names the host env var that holds the provider's OAuth + token (Claude only). The provisioner injects a provider-owned egress + route for api.anthropic.com that re-injects this token as the Bearer + header, and sets a placeholder CLAUDE_CODE_OAUTH_TOKEN in the agent + so the Claude Code CLI starts. + + `forward_host_credentials` forwards the host Codex auth token into + the egress sidecar (Codex only). + """ + + template: str = "claude" + dockerfile: str = "" + auth_token: str = "" + forward_host_credentials: bool = False + + @classmethod + def from_dict(cls, bottle_name: str, raw: object) -> "AgentProvider": + d = _as_json_object(raw, f"bottle '{bottle_name}' agent_provider") + for k in d: + if k not in {"template", "dockerfile", "auth_token", "forward_host_credentials"}: + raise ManifestError( + f"bottle '{bottle_name}' agent_provider has unknown key {k!r}; " + f"allowed: template, dockerfile, auth_token, forward_host_credentials" + ) + template = d.get("template", "claude") + if not isinstance(template, str) or not template: + raise ManifestError( + f"bottle '{bottle_name}' agent_provider.template must be a " + f"non-empty string" + ) + if template not in PROVIDER_TEMPLATES: + raise ManifestError( + f"bottle '{bottle_name}' agent_provider.template {template!r} " + f"is not one of {', '.join(sorted(PROVIDER_TEMPLATES))}" + ) + dockerfile = d.get("dockerfile", "") + if not isinstance(dockerfile, str): + raise ManifestError( + f"bottle '{bottle_name}' agent_provider.dockerfile must be a " + f"string (was {type(dockerfile).__name__})" + ) + auth_token = d.get("auth_token", "") + if not isinstance(auth_token, str): + raise ManifestError( + f"bottle '{bottle_name}' agent_provider.auth_token must be a " + f"string (was {type(auth_token).__name__})" + ) + if auth_token and template != "claude": + raise ManifestError( + f"bottle '{bottle_name}' agent_provider.auth_token is only " + f"supported for template 'claude'" + ) + forward_host_credentials = d.get("forward_host_credentials", False) + if not isinstance(forward_host_credentials, bool): + raise ManifestError( + f"bottle '{bottle_name}' agent_provider.forward_host_credentials " + f"must be a boolean (was {type(forward_host_credentials).__name__})" + ) + if forward_host_credentials and template != "codex": + raise ManifestError( + f"bottle '{bottle_name}' agent_provider.forward_host_credentials " + "is currently only supported for template 'codex'" + ) + return cls( + template=template, + dockerfile=dockerfile, + auth_token=auth_token, + forward_host_credentials=forward_host_credentials, + ) + + +@dataclass(frozen=True) +class Agent: + bottle: str + skills: tuple[str, ...] = () + prompt: str = "" + # Per-agent git identity (issue #94). Overlays the referenced + # bottle's git-gate.user per-field at `Manifest.bottle_for`. Only + # `user` is allowed at the agent level; `repos` stays bottle-only + # because it carries credentials and host trust. + git_user: GitUser = GitUser() + + @classmethod + def from_dict(cls, name: str, raw: object, bottle_names: set[str]) -> "Agent": + d = _as_json_object(raw, f"agent '{name}'") + unknown = set(d.keys()) - AGENT_MODEL_KEYS + if unknown: + allowed = ", ".join(sorted(AGENT_MODEL_KEYS)) + raise ManifestError( + f"agent '{name}' has unknown key(s) {sorted(unknown)}; " + f"allowed keys are {allowed}." + ) + + bottle = d.get("bottle") + if not isinstance(bottle, str) or not bottle: + raise ManifestError(f"agent '{name}' must declare a 'bottle' field naming a defined bottle") + if bottle not in bottle_names: + available = ", ".join(sorted(bottle_names)) or "(none defined)" + raise ManifestError( + f"agent '{name}' references bottle '{bottle}', which is not defined. " + f"Available: {available}" + ) + + skills: tuple[str, ...] = () + skills_raw = d.get("skills") + if skills_raw is not None: + if not isinstance(skills_raw, list): + raise ManifestError(f"agent '{name}' skills must be an array (was {type(skills_raw).__name__})") + collected: list[str] = [] + skills_list = cast(list[object], skills_raw) + for i, skill in enumerate(skills_list): + if not isinstance(skill, str): + raise ManifestError( + f"agent '{name}' skills[{i}] must be a string " + f"(was {type(skill).__name__})" + ) + collected.append(skill) + skills = tuple(collected) + + prompt_raw = d.get("prompt") + if prompt_raw is None: + prompt = "" + elif isinstance(prompt_raw, str): + prompt = prompt_raw + else: + raise ManifestError(f"agent '{name}' prompt must be a string (was {type(prompt_raw).__name__})") + + # git-gate: agents may declare only `git-gate.user` (name/email). + # `git-gate.repos` is bottle-only — it carries credentials and host trust. + git_user = GitUser() + git_raw = d.get("git-gate") + if git_raw is not None: + gd = _as_json_object(git_raw, f"agent '{name}' git-gate") + for k in gd.keys(): + if k != "user": + raise ManifestError( + f"agent '{name}' git-gate.{k} is not allowed at the " + f"agent level; only git-gate.user (name/email) may be " + f"set on an agent. git-gate.repos is bottle-only " + f"(it carries credentials and host trust)." + ) + if "user" in gd: + git_user = GitUser.from_dict(name, gd["user"]) + + return cls(bottle=bottle, skills=skills, prompt=prompt, git_user=git_user) diff --git a/bot_bottle/manifest_egress.py b/bot_bottle/manifest_egress.py new file mode 100644 index 0000000..4c1eb28 --- /dev/null +++ b/bot_bottle/manifest_egress.py @@ -0,0 +1,294 @@ +"""Egress routing manifest dataclasses and helpers.""" + +from __future__ import annotations + +import ipaddress +from dataclasses import dataclass, field +from typing import cast + +from ._manifest_util import ManifestError, _as_json_object + + +# Auth schemes for the egress route's optional `auth` block. +# Same values cred-proxy accepts today; `token` sidesteps the Gitea +# token-not-Bearer quirk (go-gitea/gitea#16734). +EGRESS_AUTH_SCHEMES = ("Bearer", "token") + + +def _is_ip_literal(value: str) -> bool: + try: + ipaddress.ip_address(value) + except ValueError: + return False + return True + + +def _validate_egress_routes( + bottle_name: str, + routes: tuple[EgressRoute, ...], +) -> None: + """Cross-validation for `bottle.egress.routes`: hosts must be unique. + + The proxy matches by exact-host (v1); duplicate hosts leave the + route choice ambiguous so we reject them up front. + + No cross-validation against `bottle.git-gate.repos` is performed. + git-gate (SSH push/fetch) and egress (HTTPS) broker different + protocols; declaring both for the same host is a legitimate dev + setup.""" + seen_hosts: dict[str, None] = {} + for r in routes: + key = r.Host.lower() + if key in seen_hosts: + raise ManifestError( + f"bottle '{bottle_name}' egress.routes has duplicate host " + f"{r.Host!r}; each host must be unique on the proxy." + ) + seen_hosts[key] = None + + +@dataclass(frozen=True) +class PipelockRoutePolicy: + """Per-route pipelock policy overrides. + + `TlsPassthrough` adds the route host to pipelock's + `tls_interception.passthrough_domains`, so pipelock still enforces + the hostname allowlist but does not MITM/decrypt request bodies or + headers for that host. + + `SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF + allowlist for private/internal destinations behind this route. + """ + + TlsPassthrough: bool = False + SsrfIpAllowlist: tuple[str, ...] = () + + @classmethod + def from_dict( + cls, bottle_name: str, idx: int, raw: object, + ) -> "PipelockRoutePolicy": + label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock" + d = _as_json_object(raw, label) + for k in d: + if k not in ("tls_passthrough", "ssrf_ip_allowlist"): + raise ManifestError( + f"{label} has unknown key {k!r}; " + f"only 'tls_passthrough' and 'ssrf_ip_allowlist' " + f"are accepted" + ) + tls_passthrough_raw = d.get("tls_passthrough", False) + if not isinstance(tls_passthrough_raw, bool): + raise ManifestError( + f"{label}.tls_passthrough must be a boolean " + f"(was {type(tls_passthrough_raw).__name__})" + ) + ssrf_raw = d.get("ssrf_ip_allowlist", []) + if not isinstance(ssrf_raw, list): + raise ManifestError( + f"{label}.ssrf_ip_allowlist must be an array " + f"(was {type(ssrf_raw).__name__})" + ) + ssrf_ip_allowlist: list[str] = [] + for j, item in enumerate(ssrf_raw): + if not isinstance(item, str) or not item: + raise ManifestError( + f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty " + f"string (was {type(item).__name__})" + ) + try: + ipaddress.ip_network(item, strict=False) + except ValueError as e: + raise ManifestError( + f"{label}.ssrf_ip_allowlist[{j}] must be an IP address " + f"or CIDR (was {item!r}): {e}" + ) + ssrf_ip_allowlist.append(item) + return cls( + TlsPassthrough=tls_passthrough_raw, + SsrfIpAllowlist=tuple(ssrf_ip_allowlist), + ) + + +@dataclass(frozen=True) +class EgressRoute: + """One route on the per-bottle egress sidecar (PRD 0017). + + `Host` matches the request's hostname (case-insensitive). The + optional `PathAllowlist` constrains the URL path to a set of + prefixes; empty tuple means no path-level filtering. The optional + `AuthScheme` / `TokenRef` pair drives credential injection: + when set, the proxy strips any inbound Authorization and injects + ` `. When the + manifest's `auth` block is omitted both fields are empty strings — + no Authorization is written, no token forwarded. + + `Role` is reserved for future use; all role strings are currently + rejected by the validator. + + Validation rules (enforced in `from_dict`): + - `host` required, non-empty. + - `path_allowlist` optional, list of absolute path prefixes. + - `auth` optional. If present, MUST carry both `scheme` and + `token_ref` as non-empty strings; an empty `auth: {}` is an + error rather than a synonym for "no auth" (omit `auth` for + that case). + - `role` optional, reserved — any non-empty value is rejected. + """ + + Host: str + PathAllowlist: tuple[str, ...] = () + AuthScheme: str = "" + TokenRef: str = "" + Role: tuple[str, ...] = () + Pipelock: PipelockRoutePolicy = field(default_factory=PipelockRoutePolicy) + + @classmethod + def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "EgressRoute": + label = f"bottle '{bottle_name}' egress.routes[{idx}]" + d = _as_json_object(raw, label) + host = d.get("host") + if not isinstance(host, str) or not host: + raise ManifestError(f"{label} missing required string field 'host'") + + path_allow_raw = d.get("path_allowlist") + prefixes: tuple[str, ...] = () + if path_allow_raw is not None: + if not isinstance(path_allow_raw, list): + raise ManifestError( + f"{label} path_allowlist must be an array " + f"(was {type(path_allow_raw).__name__})" + ) + path_list = cast(list[object], path_allow_raw) + collected: list[str] = [] + for j, p in enumerate(path_list): + if not isinstance(p, str): + raise ManifestError( + f"{label} path_allowlist[{j}] must be a string " + f"(was {type(p).__name__})" + ) + if not p.startswith("/"): + raise ManifestError( + f"{label} path_allowlist[{j}] {p!r} must be an " + f"absolute path prefix starting with '/'" + ) + collected.append(p) + prefixes = tuple(collected) + + auth_scheme = "" + token_ref = "" + if "auth" in d: + auth_raw = d.get("auth") + auth_d = _as_json_object(auth_raw, f"{label} auth") + if not auth_d: + raise ManifestError( + f"{label} auth is empty ({{}}); omit the 'auth' key " + f"entirely if this route is unauthenticated. Otherwise " + f"both 'scheme' and 'token_ref' are required." + ) + auth_scheme_raw = auth_d.get("scheme") + if not isinstance(auth_scheme_raw, str) or not auth_scheme_raw: + raise ManifestError( + f"{label} auth.scheme is required when 'auth' is set " + f"(non-empty string)" + ) + if auth_scheme_raw not in EGRESS_AUTH_SCHEMES: + raise ManifestError( + f"{label} auth.scheme {auth_scheme_raw!r} is not one of " + f"{', '.join(EGRESS_AUTH_SCHEMES)}" + ) + token_ref_raw = auth_d.get("token_ref") + if not isinstance(token_ref_raw, str) or not token_ref_raw: + raise ManifestError( + f"{label} auth.token_ref is required when 'auth' is set " + f"(name of the host env var holding the token value)" + ) + for k in auth_d: + if k not in ("scheme", "token_ref"): + raise ManifestError( + f"{label} auth has unknown key {k!r}; " + f"only 'scheme' and 'token_ref' are accepted" + ) + auth_scheme = auth_scheme_raw + token_ref = token_ref_raw + + role_raw = d.get("role") + roles: tuple[str, ...] = () + if role_raw is None: + roles = () + elif isinstance(role_raw, str): + roles = (role_raw,) + elif isinstance(role_raw, list): + role_list = cast(list[object], role_raw) + collected_roles: list[str] = [] + for r in role_list: + if not isinstance(r, str): + raise ManifestError(f"{label} role items must be strings (got {type(r).__name__})") + collected_roles.append(r) + roles = tuple(collected_roles) + else: + raise ManifestError( + f"{label} role must be a string or a list of strings " + f"(was {type(role_raw).__name__})" + ) + if roles: + raise ManifestError( + f"{label} role {roles[0]!r} is not accepted; " + f"the 'role' field is reserved for future use" + ) + + pipelock = ( + PipelockRoutePolicy.from_dict(bottle_name, idx, d["pipelock"]) + if "pipelock" in d + else PipelockRoutePolicy() + ) + + for k in d: + if k not in ("host", "path_allowlist", "auth", "role", "pipelock"): + raise ManifestError( + f"{label} has unknown key {k!r}; accepted keys are " + f"'host', 'path_allowlist', 'auth', 'role', 'pipelock'" + ) + + return cls( + Host=host, + PathAllowlist=prefixes, + AuthScheme=auth_scheme, + TokenRef=token_ref, + Role=roles, + Pipelock=pipelock, + ) + + +@dataclass(frozen=True) +class EgressConfig: + """Per-bottle egress configuration. Today this is just the + route table; the nesting under `egress:` leaves room for + per-bottle proxy settings (port override, log level, etc.) in + follow-ups.""" + + routes: tuple[EgressRoute, ...] = () + + @classmethod + def from_dict(cls, bottle_name: str, raw: object) -> "EgressConfig": + d = _as_json_object(raw, f"bottle '{bottle_name}' egress") + routes_raw = d.get("routes") + routes: tuple[EgressRoute, ...] = () + if routes_raw is not None: + if not isinstance(routes_raw, list): + raise ManifestError( + f"bottle '{bottle_name}' egress.routes must be an array " + f"(was {type(routes_raw).__name__})" + ) + routes_list = cast(list[object], routes_raw) + routes = tuple( + EgressRoute.from_dict(bottle_name, i, entry) + for i, entry in enumerate(routes_list) + ) + _validate_egress_routes(bottle_name, routes) + for k in d: + if k != "routes": + raise ManifestError( + f"bottle '{bottle_name}' egress has unknown key {k!r}; " + f"only 'routes' is accepted" + ) + return cls(routes=routes) diff --git a/bot_bottle/manifest_extends.py b/bot_bottle/manifest_extends.py index 4149a19..c1f8099 100644 --- a/bot_bottle/manifest_extends.py +++ b/bot_bottle/manifest_extends.py @@ -71,7 +71,8 @@ def _merge_bottles( name: str, ) -> Bottle: """Apply PRD 0025 merge rules.""" - from .manifest import Bottle, GitUser, _validate_egress_routes + from .manifest import Bottle, GitUser + from .manifest_egress import _validate_egress_routes # Parse the child's declared fields into a Bottle (with the # usual defaults for anything missing). Validation runs the same @@ -122,7 +123,7 @@ def _merge_bottles( def _child_declares_git_gate_repos(child_raw: dict[str, object]) -> bool: - from .manifest import _as_json_object + from ._manifest_util import _as_json_object git_raw = child_raw.get("git-gate") if git_raw is None: diff --git a/bot_bottle/manifest_git.py b/bot_bottle/manifest_git.py new file mode 100644 index 0000000..3341e1e --- /dev/null +++ b/bot_bottle/manifest_git.py @@ -0,0 +1,211 @@ +"""Git-related manifest dataclasses and helpers.""" + +from __future__ import annotations + +from dataclasses import dataclass + +from ._manifest_util import ManifestError, _as_json_object + + +def _opt_str(value: object, label: str) -> str: + if value is None: + return "" + if not isinstance(value, str): + raise ManifestError(f"{label} must be a string (was {type(value).__name__})") + return value + + +def _parse_git_upstream(url: str, label: str) -> tuple[str, str, str, str]: + """Parse `ssh://user@host[:port]/path` into (user, host, port, path). + Dies if `url` doesn't match the ssh:// shape v1 supports. Default + port is 22 (matches OpenSSH).""" + if not url.startswith("ssh://"): + raise ManifestError(f"{label} must be an ssh:// URL (was {url!r})") + rest = url[len("ssh://"):] + if "@" not in rest: + raise ManifestError(f"{label} must include a user (e.g. ssh://git@host/path.git); was {url!r}") + user, _, hostpart = rest.partition("@") + if not user: + raise ManifestError(f"{label} user is empty in {url!r}") + if "/" not in hostpart: + raise ManifestError(f"{label} must include a path (e.g. ssh://git@host/path.git); was {url!r}") + hostport, _, path = hostpart.partition("/") + if not path: + raise ManifestError(f"{label} path is empty in {url!r}") + if ":" in hostport: + host, _, port = hostport.partition(":") + if not port.isdigit(): + raise ManifestError(f"{label} port must be numeric in {url!r}") + else: + host = hostport + port = "22" + if not host: + raise ManifestError(f"{label} host is empty in {url!r}") + return (user, host, port, path) + + +def _validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None: + seen: dict[str, None] = {} + for g in git: + if g.Name in seen: + raise ManifestError( + f"bottle '{bottle_name}' git-gate.repos has duplicate name '{g.Name}'; " + f"each entry maps to a distinct bare repo on the gate." + ) + seen[g.Name] = None + + +@dataclass(frozen=True) +class GitEntry: + """One upstream the per-agent git-gate (PRD 0008) is allowed to + talk to. `Upstream` is the real remote URL the agent would push to + if there were no gate; the gate hosts a bare repo at /git/.git + and `IdentityFile` is the SSH key the gate uses to push that repo + upstream after gitleaks passes. The agent itself never holds the + upstream credential. + + The Upstream URL is parsed once at construction and the pieces are + stashed in the `Upstream*` fields so the git-gate render step + doesn't have to re-parse. + + Manifest source: `git-gate.repos.` (PRD 0047). The YAML keys + are `url`, `identity`, and `host_key`; the internal field names are + stable across that rename.""" + + Name: str + Upstream: str + IdentityFile: str + KnownHostKey: str = "" + RemoteKey: str = "" + UpstreamUser: str = "" + UpstreamHost: str = "" + UpstreamPort: str = "" + UpstreamPath: str = "" + + @classmethod + def from_repos_entry( + cls, bottle_name: str, repo_name: str, raw: object + ) -> "GitEntry": + """Parse one entry from `git-gate.repos.`. + + YAML keys: `url` (required), `identity` (required), + `host_key` (optional). The repo_name becomes `Name`.""" + if not repo_name: + raise ManifestError( + f"bottle '{bottle_name}' git-gate.repos has an empty key" + ) + label = f"git-gate.repos[{repo_name!r}]" + d = _as_json_object(raw, f"bottle '{bottle_name}' {label}") + for k in d: + if k not in {"url", "identity", "host_key"}: + raise ManifestError( + f"bottle '{bottle_name}' {label} has unknown key {k!r}; " + f"allowed: url, identity, host_key" + ) + upstream = d.get("url") + if not isinstance(upstream, str) or not upstream: + raise ManifestError( + f"bottle '{bottle_name}' {label} missing required string field 'url'" + ) + ident = d.get("identity") + if not isinstance(ident, str) or not ident: + raise ManifestError( + f"bottle '{bottle_name}' {label} missing required string field 'identity'" + ) + khk = _opt_str( + d.get("host_key"), + f"bottle '{bottle_name}' {label} host_key", + ) + user, host, port, path = _parse_git_upstream( + upstream, f"bottle '{bottle_name}' {label} url" + ) + return cls( + Name=repo_name, + Upstream=upstream, + IdentityFile=ident, + KnownHostKey=khk, + RemoteKey=host, + UpstreamUser=user, + UpstreamHost=host, + UpstreamPort=port, + UpstreamPath=path, + ) + + +@dataclass(frozen=True) +class GitUser: + """Per-bottle `git config --global user.name` / `user.email` + pair (issue #86). The agent's commits inside the bottle are + attributed to this identity rather than the agent image's + image-baked default (no user, or whatever the image dropped + in). Either or both fields can be set independently. + + `from_dict` is forgiving on shape (a single missing field is + fine — we just skip that config line at provisioning) but + strict on types (string-or-die).""" + + name: str = "" + email: str = "" + + @classmethod + def from_dict(cls, bottle_name: str, raw: object) -> "GitUser": + d = _as_json_object(raw, f"bottle '{bottle_name}' git-gate.user") + for k in d.keys(): + if k not in {"name", "email"}: + raise ManifestError( + f"bottle '{bottle_name}' git-gate.user has unknown key {k!r}; " + f"allowed: name, email" + ) + name = d.get("name", "") + email = d.get("email", "") + if not isinstance(name, str): + raise ManifestError( + f"bottle '{bottle_name}' git-gate.user.name must be a string " + f"(was {type(name).__name__})" + ) + if not isinstance(email, str): + raise ManifestError( + f"bottle '{bottle_name}' git-gate.user.email must be a string " + f"(was {type(email).__name__})" + ) + if not name and not email: + raise ManifestError( + f"bottle '{bottle_name}' git-gate.user is set but neither " + f"name nor email is non-empty; remove the block or " + f"fill at least one field." + ) + return cls(name=name, email=email) + + def is_empty(self) -> bool: + return not self.name and not self.email + + +def _parse_git_gate_config( + bottle_name: str, + raw: object, +) -> tuple[tuple[GitEntry, ...], GitUser]: + d = _as_json_object(raw, f"bottle '{bottle_name}' git-gate") + for k in d.keys(): + if k not in {"user", "repos"}: + raise ManifestError( + f"bottle '{bottle_name}' git-gate has unknown key {k!r}; " + f"allowed: user, repos" + ) + + git_user = ( + GitUser.from_dict(bottle_name, d["user"]) + if "user" in d + else GitUser() + ) + + git: tuple[GitEntry, ...] = () + repos_raw = d.get("repos") + if repos_raw is not None: + repos = _as_json_object(repos_raw, f"bottle '{bottle_name}' git-gate.repos") + git = tuple( + GitEntry.from_repos_entry(bottle_name, name, entry) + for name, entry in repos.items() + ) + _validate_unique_git_names(bottle_name, git) + + return git, git_user diff --git a/bot_bottle/manifest_schema.py b/bot_bottle/manifest_schema.py index 3c963a6..ceb25b5 100644 --- a/bot_bottle/manifest_schema.py +++ b/bot_bottle/manifest_schema.py @@ -58,7 +58,7 @@ def _validate_frontmatter_keys( keys: object, allowed_keys: frozenset[str], ) -> None: - from .manifest import ManifestError + from ._manifest_util import ManifestError key_set = set(keys) unknown = key_set - allowed_keys