Compare commits

...

10 Commits

Author SHA1 Message Date
didericis-claude cc0c952d0b fix(security): harden git_gate.py shell rendering with shlex.quote and name validation
test / unit (pull_request) Successful in 35s
test / integration (pull_request) Successful in 44s
test / unit (push) Successful in 32s
test / integration (push) Successful in 41s
Use shlex.quote() on name and upstream_url in git_gate_render_entrypoint()
so special characters (single quotes, spaces, semicolons) cannot break or
inject into the generated sh script.

Add _GIT_NAME_RE validation in GitEntry.from_repos_entry() to restrict
repo names to [A-Za-z0-9._-]+, making the manifest the first line of
defence and shlex.quote() the belt-and-suspenders backstop.

Closes #155
2026-06-03 04:40:21 +00:00
didericis-claude 8c9d4fbc46 refactor: address PR review feedback — de-privatize helpers and rename modules
test / unit (pull_request) Successful in 34s
test / integration (pull_request) Successful in 43s
test / unit (push) Successful in 34s
test / integration (push) Successful in 43s
- Rename _manifest_util.py → manifest_util.py (module isn't private)
- Rename _as_json_object → as_json_object, _parse_git_upstream → parse_git_upstream,
  _parse_git_gate_config → parse_git_gate_config,
  _validate_unique_git_names → validate_unique_git_names,
  _validate_egress_routes → validate_egress_routes (none are private at
  module boundary — underscore prefix was a carry-over from the old
  monolithic manifest.py where everything lived in one namespace)
- Move _is_ip_literal → util.is_ip_literal (generic, belongs in the
  top-level util module)
- Update all import sites across manifest_*.py, manifest_extends.py,
  manifest_schema.py; existing callers of manifest.py are unaffected

All 867 unit tests pass.
2026-06-03 00:33:02 -04:00
didericis-claude b9ab1263c2 refactor: split manifest.py into domain-specific modules
Closes #157. Distributes the 1,026-line manifest.py across four
focused modules:

- _manifest_util.py: ManifestError + _as_json_object (shared base)
- manifest_git.py: GitEntry, GitUser, git-gate config helpers
- manifest_egress.py: EgressRoute, EgressConfig, PipelockRoutePolicy
- manifest_agent.py: AgentProvider, Agent

manifest.py is now the residual orchestration layer: Bottle, Manifest,
and re-exports of all public names so existing callers are unaffected.
All 867 unit tests pass.
2026-06-03 00:33:02 -04:00
didericis-claude 9282bceaf8 fix: emit WARNING when Docker teardown ExitStack raises (issue #156)
test / unit (pull_request) Successful in 37s
test / integration (pull_request) Successful in 40s
test / unit (push) Successful in 32s
test / integration (push) Successful in 43s
Replace the bare `except BaseException: pass` in the `teardown` closure
with a `warn()` call that includes the container name and operation type
("compose-down"), so cleanup failures are visible in the log rather than
silently discarded.  Non-blocking: the exception is consumed and teardown
continues, preserving the original error-propagation contract.

Add test_docker_launch_teardown.py to lock the new behaviour: it injects
a RuntimeError via a mocked `compose_down` callback and asserts the
WARNING message contains the container name and operation label.
2026-06-03 04:13:53 +00:00
didericis-claude 3e50079bcc docs(prd): activate git-gate manifest redesign
test / unit (pull_request) Successful in 41s
test / integration (pull_request) Successful in 1m10s
test / unit (push) Successful in 39s
test / integration (push) Successful in 54s
PRD 0047 is now shipped to main.
2026-06-02 23:59:34 -04:00
didericis-claude cf9aaf68e7 chore: update demo manifest and example agent to git-gate (PRD 0047)
bot-bottle.demo.json: git array → git-gate.repos with url/identity/host_key
examples/agents/implementer.md: git.user → git-gate.user
2026-06-02 23:59:34 -04:00
didericis-claude 4cf2cfc55d test: update test suite for git-gate manifest redesign (PRD 0047)
- fixtures.py: fixture_with_git_dict uses git-gate.repos + url/identity/host_key
- test_manifest_git: rewrite to use git-gate.repos; replace duplicate-name
  test (names = dict keys, always unique) with two-repos-different-hosts test
- test_manifest_git_user: _manifest → git-gate.user; update error message assertions
- test_manifest_agent_git_user: git → git-gate throughout; repos rejection test
- test_manifest_extends: git.remotes/git.user → git-gate.repos/git-gate.user
- test_provision_git: IP test updated — no host alias, single insteadOf
- test_compose: git.remotes → git-gate.repos + new field names
- test_docker_provision_git_user: git.user → git-gate.user
- test_git_gate: inline manifest dict updated to git-gate.repos
- test_smolmachines_provision: git_json → git_gate_json; remove _remote_host
2026-06-02 23:59:34 -04:00
didericis-claude 7c285fde7a feat(manifest): replace git key with git-gate (PRD 0047)
- BOTTLE_KEYS and AGENT_KEYS_OPTIONAL: "git" → "git-gate"
- GitEntry: remove from_dict/from_remote_dict; add from_repos_entry
  parsing url/identity/host_key with repo name as the dict key
- GitUser.from_dict: error messages updated to git-gate.user
- _parse_git_config → _parse_git_gate_config; repos/user subkeys
- Bottle.from_dict: reads git-gate key; "git" key raises a migration error
- Agent.from_dict: reads git-gate key; repos rejected at agent level
- manifest_extends: _child_declares_git_remotes → _child_declares_git_gate_repos
- manifest_loader: threads git-gate frontmatter key into agent_dict
2026-06-02 23:59:34 -04:00
didericis-claude 64ac204c05 docs(prd): consolidate git.user into git-gate per review
Move git.user under git-gate and remove git as a top-level key
entirely, so all git configuration lives under a single section.
2026-06-02 23:59:34 -04:00
didericis-claude 59fd132b9d docs(prd): add git-gate manifest redesign plan
PRD 0047 proposes replacing git.remotes with a top-level git-gate.repos
section and snake_case field names to make clear the config is
specifically for git-gate routing, not generic git or SSH config.

Closes #160
2026-06-02 23:59:34 -04:00
25 changed files with 1438 additions and 1010 deletions
+8 -7
View File
@@ -4,14 +4,15 @@
"env": {
"FAKE_TOKEN": "ghp_aB3cD4eF5gH6iJ7kL8mN9oP0qR1sT2uV3wX4yZ"
},
"git": [
{
"Name": "foo",
"Upstream": "ssh://git@upstream.invalid/path.git",
"IdentityFile": "~/.cache/bot-bottle-demo/fake-key",
"KnownHostKey": "ssh-ed25519 AAAAEXAMPLE"
"git-gate": {
"repos": {
"foo": {
"url": "ssh://git@upstream.invalid/path.git",
"identity": "~/.cache/bot-bottle-demo/fake-key",
"host_key": "ssh-ed25519 AAAAEXAMPLE"
}
}
]
}
}
},
+6 -5
View File
@@ -43,7 +43,7 @@ from pathlib import Path
from typing import Callable, Generator
from ...egress import egress_resolve_token_values
from ...log import info
from ...log import info, warn
from . import network as network_mod
from . import util as docker_mod
from .bottle import DockerBottle
@@ -87,10 +87,11 @@ def launch(
def teardown() -> None:
try:
stack.close()
except BaseException:
# Teardown must not raise; swallow so the caller's
# __exit__ path can still propagate the original error.
pass
except BaseException as exc:
warn(
f"teardown failed for container {plan.container_name}"
f" (compose-down): {exc!r}"
)
try:
# Step 1: agent image build. Sidecar images get built lazily by
+2 -4
View File
@@ -29,6 +29,7 @@ backend-specific and lives on concrete subclasses (see
from __future__ import annotations
import shlex
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
@@ -207,10 +208,7 @@ def git_gate_render_entrypoint(upstreams: tuple[GitGateUpstream, ...]) -> str:
"mkdir -p /git",
]
for u in upstreams:
# Single-quote args so URL/path content (containing : and /)
# passes through ash unmangled. Names came through the manifest
# validator so they don't contain a single quote.
lines.append(f"init_repo '{u.name}' '{u.upstream_url}'")
lines.append(f"init_repo {shlex.quote(u.name)} {shlex.quote(u.upstream_url)}")
lines.extend([
"",
"exec git daemon \\",
+56 -706
View File
@@ -14,9 +14,9 @@ the system prompt, for bottles the body is human documentation
Bottle schema (frontmatter):
extends: <bottle-name> # optional (PRD 0025)
env: { <NAME>: <env-entry>, ... }
git:
git-gate: # optional (PRD 0047)
user: { name: <str>, email: <str> } # optional
remotes: { <host>: <git-entry>, ... } # optional
repos: { <name>: <git-gate-entry>, ... } # optional
egress: { routes: [ <egress-route>, ... ] }
# route keys: host, path_allowlist, auth, role, pipelock
# pipelock: { tls_passthrough: <bool>, ssrf_ip_allowlist: [<cidr>, ...] }
@@ -25,6 +25,8 @@ Bottle schema (frontmatter):
Agent schema (frontmatter):
bottle: <bottle-name> # required
skills: [ <skill-name>, ... ] # optional
git-gate:
user: { name: <str>, email: <str> } # optional; overlays bottle
# Claude Code subagent passthrough fields — accepted, ignored:
name, description, model, color, memory
@@ -43,528 +45,48 @@ on-disk files.
from __future__ import annotations
import ipaddress
import os
from dataclasses import dataclass, field, replace
from pathlib import Path
from typing import Mapping, cast
from typing import Mapping
from .agent_provider import PROVIDER_TEMPLATES
from .log import warn
from .manifest_schema import AGENT_MODEL_KEYS, BOTTLE_KEYS
from .manifest_util import ManifestError, as_json_object
from .manifest_agent import Agent, AgentProvider
from .manifest_egress import (
EGRESS_AUTH_SCHEMES,
EgressConfig,
EgressRoute,
PipelockRoutePolicy,
validate_egress_routes,
)
from .manifest_git import GitEntry, GitUser, parse_git_gate_config
from .manifest_schema import BOTTLE_KEYS
class ManifestError(Exception):
"""A manifest file (or the manifest tree) is invalid."""
# Re-export everything that callers currently import from this module.
__all__ = [
"ManifestError",
"GitEntry",
"GitUser",
"AgentProvider",
"EGRESS_AUTH_SCHEMES",
"PipelockRoutePolicy",
"EgressRoute",
"EgressConfig",
"Agent",
"Bottle",
"Manifest",
]
def _empty_str_dict() -> dict[str, str]:
return {}
@dataclass(frozen=True)
class GitEntry:
"""One upstream the per-agent git-gate (PRD 0008) is allowed to
talk to. `Upstream` is the real remote URL the agent would push to
if there were no gate; the gate hosts a bare repo at /git/<Name>.git
and `IdentityFile` is the SSH key the gate uses to push that repo
upstream after gitleaks passes. The agent itself never holds the
upstream credential.
The Upstream URL is parsed once at construction and the pieces are
stashed in the `Upstream*` fields so the git-gate render step
doesn't have to re-parse."""
Name: str
Upstream: str
IdentityFile: str
KnownHostKey: str = ""
RemoteKey: str = ""
UpstreamUser: str = ""
UpstreamHost: str = ""
UpstreamPort: str = ""
UpstreamPath: str = ""
@classmethod
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "GitEntry":
d = _as_json_object(raw, f"bottle '{bottle_name}' git[{idx}]")
return cls._from_object(bottle_name, d, f"git[{idx}]", None)
@classmethod
def from_remote_dict(
cls, bottle_name: str, host_key: str, raw: object
) -> "GitEntry":
if not host_key:
raise ManifestError(f"bottle '{bottle_name}' git.remotes has an empty host key")
d = _as_json_object(raw, f"bottle '{bottle_name}' git.remotes[{host_key!r}]")
return cls._from_object(
bottle_name, d, f"git.remotes[{host_key!r}]", host_key,
)
@classmethod
def _from_object(
cls,
bottle_name: str,
d: dict[str, object],
label: str,
host_key: str | None,
) -> "GitEntry":
name = d.get("Name")
if not isinstance(name, str) or not name:
raise ManifestError(
f"bottle '{bottle_name}' {label} missing required string "
f"field 'Name'"
)
upstream = d.get("Upstream")
if not isinstance(upstream, str) or not upstream:
raise ManifestError(
f"bottle '{bottle_name}' {label} '{name}' missing required string field "
f"'Upstream'"
)
ident = d.get("IdentityFile")
if not isinstance(ident, str) or not ident:
raise ManifestError(
f"bottle '{bottle_name}' {label} '{name}' missing required string field "
f"'IdentityFile'"
)
khk = _opt_str(
d.get("KnownHostKey"),
f"bottle '{bottle_name}' {label} '{name}' KnownHostKey",
)
user, host, port, path = _parse_git_upstream(
upstream, f"bottle '{bottle_name}' {label} '{name}' Upstream"
)
if (
host_key is not None
and host_key != host
and not _is_ip_literal(host)
):
raise ManifestError(
f"bottle '{bottle_name}' git.remotes key {host_key!r} "
f"does not match Upstream host {host!r}"
)
return cls(
Name=name,
Upstream=upstream,
IdentityFile=ident,
KnownHostKey=khk,
RemoteKey=host_key or host,
UpstreamUser=user,
UpstreamHost=host,
UpstreamPort=port,
UpstreamPath=path,
)
# Auth schemes for the egress route's optional `auth` block.
# Same values cred-proxy accepts today; `token` sidesteps the Gitea
# token-not-Bearer quirk (go-gitea/gitea#16734).
EGRESS_AUTH_SCHEMES = ("Bearer", "token")
@dataclass(frozen=True)
class AgentProvider:
"""Provider/template for the agent process inside a bottle.
`template` selects a built-in launch/runtime contract. `dockerfile`
optionally points at a custom agent-image Dockerfile while leaving
bot-bottle's sidecar infrastructure intact.
`auth_token` names the host env var that holds the provider's OAuth
token (Claude only). The provisioner injects a provider-owned egress
route for api.anthropic.com that re-injects this token as the Bearer
header, and sets a placeholder CLAUDE_CODE_OAUTH_TOKEN in the agent
so the Claude Code CLI starts.
`forward_host_credentials` forwards the host Codex auth token into
the egress sidecar (Codex only).
"""
template: str = "claude"
dockerfile: str = ""
auth_token: str = ""
forward_host_credentials: bool = False
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "AgentProvider":
d = _as_json_object(raw, f"bottle '{bottle_name}' agent_provider")
for k in d:
if k not in {"template", "dockerfile", "auth_token", "forward_host_credentials"}:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider has unknown key {k!r}; "
f"allowed: template, dockerfile, auth_token, forward_host_credentials"
)
template = d.get("template", "claude")
if not isinstance(template, str) or not template:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.template must be a "
f"non-empty string"
)
if template not in PROVIDER_TEMPLATES:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.template {template!r} "
f"is not one of {', '.join(sorted(PROVIDER_TEMPLATES))}"
)
dockerfile = d.get("dockerfile", "")
if not isinstance(dockerfile, str):
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.dockerfile must be a "
f"string (was {type(dockerfile).__name__})"
)
auth_token = d.get("auth_token", "")
if not isinstance(auth_token, str):
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.auth_token must be a "
f"string (was {type(auth_token).__name__})"
)
if auth_token and template != "claude":
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.auth_token is only "
f"supported for template 'claude'"
)
forward_host_credentials = d.get("forward_host_credentials", False)
if not isinstance(forward_host_credentials, bool):
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.forward_host_credentials "
f"must be a boolean (was {type(forward_host_credentials).__name__})"
)
if forward_host_credentials and template != "codex":
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.forward_host_credentials "
"is currently only supported for template 'codex'"
)
return cls(
template=template,
dockerfile=dockerfile,
auth_token=auth_token,
forward_host_credentials=forward_host_credentials,
)
@dataclass(frozen=True)
class GitUser:
"""Per-bottle `git config --global user.name` / `user.email`
pair (issue #86). The agent's commits inside the bottle are
attributed to this identity rather than the agent image's
image-baked default (no user, or whatever the image dropped
in). Either or both fields can be set independently.
`from_dict` is forgiving on shape (a single missing field is
fine — we just skip that config line at provisioning) but
strict on types (string-or-die)."""
name: str = ""
email: str = ""
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "GitUser":
d = _as_json_object(raw, f"bottle '{bottle_name}' git.user")
for k in d.keys():
if k not in {"name", "email"}:
raise ManifestError(
f"bottle '{bottle_name}' git.user has unknown key {k!r}; "
f"allowed: name, email"
)
name = d.get("name", "")
email = d.get("email", "")
if not isinstance(name, str):
raise ManifestError(
f"bottle '{bottle_name}' git.user.name must be a string "
f"(was {type(name).__name__})"
)
if not isinstance(email, str):
raise ManifestError(
f"bottle '{bottle_name}' git.user.email must be a string "
f"(was {type(email).__name__})"
)
if not name and not email:
raise ManifestError(
f"bottle '{bottle_name}' git.user is set but neither "
f"name nor email is non-empty; remove the block or "
f"fill at least one field."
)
return cls(name=name, email=email)
def is_empty(self) -> bool:
return not self.name and not self.email
def _parse_git_config(
bottle_name: str,
raw: object,
) -> tuple[tuple[GitEntry, ...], GitUser]:
d = _as_json_object(raw, f"bottle '{bottle_name}' git")
for k in d.keys():
if k not in {"user", "remotes"}:
raise ManifestError(
f"bottle '{bottle_name}' git has unknown key {k!r}; "
f"allowed: user, remotes"
)
git_user = (
GitUser.from_dict(bottle_name, d["user"])
if "user" in d
else GitUser()
)
git: tuple[GitEntry, ...] = ()
remotes_raw = d.get("remotes")
if remotes_raw is not None:
remotes = _as_json_object(remotes_raw, f"bottle '{bottle_name}' git.remotes")
git = tuple(
GitEntry.from_remote_dict(bottle_name, host, entry)
for host, entry in remotes.items()
)
_validate_unique_git_names(bottle_name, git)
return git, git_user
@dataclass(frozen=True)
class PipelockRoutePolicy:
"""Per-route pipelock policy overrides.
`TlsPassthrough` adds the route host to pipelock's
`tls_interception.passthrough_domains`, so pipelock still enforces
the hostname allowlist but does not MITM/decrypt request bodies or
headers for that host.
`SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF
allowlist for private/internal destinations behind this route.
"""
TlsPassthrough: bool = False
SsrfIpAllowlist: tuple[str, ...] = ()
@classmethod
def from_dict(
cls, bottle_name: str, idx: int, raw: object,
) -> "PipelockRoutePolicy":
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
d = _as_json_object(raw, label)
for k in d:
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
raise ManifestError(
f"{label} has unknown key {k!r}; "
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
f"are accepted"
)
tls_passthrough_raw = d.get("tls_passthrough", False)
if not isinstance(tls_passthrough_raw, bool):
raise ManifestError(
f"{label}.tls_passthrough must be a boolean "
f"(was {type(tls_passthrough_raw).__name__})"
)
ssrf_raw = d.get("ssrf_ip_allowlist", [])
if not isinstance(ssrf_raw, list):
raise ManifestError(
f"{label}.ssrf_ip_allowlist must be an array "
f"(was {type(ssrf_raw).__name__})"
)
ssrf_ip_allowlist: list[str] = []
for j, item in enumerate(ssrf_raw):
if not isinstance(item, str) or not item:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
f"string (was {type(item).__name__})"
)
try:
ipaddress.ip_network(item, strict=False)
except ValueError as e:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
f"or CIDR (was {item!r}): {e}"
)
ssrf_ip_allowlist.append(item)
return cls(
TlsPassthrough=tls_passthrough_raw,
SsrfIpAllowlist=tuple(ssrf_ip_allowlist),
)
@dataclass(frozen=True)
class EgressRoute:
"""One route on the per-bottle egress sidecar (PRD 0017).
`Host` matches the request's hostname (case-insensitive). The
optional `PathAllowlist` constrains the URL path to a set of
prefixes; empty tuple means no path-level filtering. The optional
`AuthScheme` / `TokenRef` pair drives credential injection:
when set, the proxy strips any inbound Authorization and injects
`<AuthScheme> <value-of-host-env-named-by-TokenRef>`. When the
manifest's `auth` block is omitted both fields are empty strings —
no Authorization is written, no token forwarded.
`Role` is reserved for future use; all role strings are currently
rejected by the validator.
Validation rules (enforced in `from_dict`):
- `host` required, non-empty.
- `path_allowlist` optional, list of absolute path prefixes.
- `auth` optional. If present, MUST carry both `scheme` and
`token_ref` as non-empty strings; an empty `auth: {}` is an
error rather than a synonym for "no auth" (omit `auth` for
that case).
- `role` optional, reserved — any non-empty value is rejected.
"""
Host: str
PathAllowlist: tuple[str, ...] = ()
AuthScheme: str = ""
TokenRef: str = ""
Role: tuple[str, ...] = ()
Pipelock: PipelockRoutePolicy = field(default_factory=PipelockRoutePolicy)
@classmethod
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "EgressRoute":
label = f"bottle '{bottle_name}' egress.routes[{idx}]"
d = _as_json_object(raw, label)
host = d.get("host")
if not isinstance(host, str) or not host:
raise ManifestError(f"{label} missing required string field 'host'")
path_allow_raw = d.get("path_allowlist")
prefixes: tuple[str, ...] = ()
if path_allow_raw is not None:
if not isinstance(path_allow_raw, list):
raise ManifestError(
f"{label} path_allowlist must be an array "
f"(was {type(path_allow_raw).__name__})"
)
path_list = cast(list[object], path_allow_raw)
collected: list[str] = []
for j, p in enumerate(path_list):
if not isinstance(p, str):
raise ManifestError(
f"{label} path_allowlist[{j}] must be a string "
f"(was {type(p).__name__})"
)
if not p.startswith("/"):
raise ManifestError(
f"{label} path_allowlist[{j}] {p!r} must be an "
f"absolute path prefix starting with '/'"
)
collected.append(p)
prefixes = tuple(collected)
auth_scheme = ""
token_ref = ""
if "auth" in d:
auth_raw = d.get("auth")
auth_d = _as_json_object(auth_raw, f"{label} auth")
if not auth_d:
raise ManifestError(
f"{label} auth is empty ({{}}); omit the 'auth' key "
f"entirely if this route is unauthenticated. Otherwise "
f"both 'scheme' and 'token_ref' are required."
)
auth_scheme_raw = auth_d.get("scheme")
if not isinstance(auth_scheme_raw, str) or not auth_scheme_raw:
raise ManifestError(
f"{label} auth.scheme is required when 'auth' is set "
f"(non-empty string)"
)
if auth_scheme_raw not in EGRESS_AUTH_SCHEMES:
raise ManifestError(
f"{label} auth.scheme {auth_scheme_raw!r} is not one of "
f"{', '.join(EGRESS_AUTH_SCHEMES)}"
)
token_ref_raw = auth_d.get("token_ref")
if not isinstance(token_ref_raw, str) or not token_ref_raw:
raise ManifestError(
f"{label} auth.token_ref is required when 'auth' is set "
f"(name of the host env var holding the token value)"
)
for k in auth_d:
if k not in ("scheme", "token_ref"):
raise ManifestError(
f"{label} auth has unknown key {k!r}; "
f"only 'scheme' and 'token_ref' are accepted"
)
auth_scheme = auth_scheme_raw
token_ref = token_ref_raw
role_raw = d.get("role")
roles: tuple[str, ...] = ()
if role_raw is None:
roles = ()
elif isinstance(role_raw, str):
roles = (role_raw,)
elif isinstance(role_raw, list):
role_list = cast(list[object], role_raw)
collected_roles: list[str] = []
for r in role_list:
if not isinstance(r, str):
raise ManifestError(f"{label} role items must be strings (got {type(r).__name__})")
collected_roles.append(r)
roles = tuple(collected_roles)
else:
raise ManifestError(
f"{label} role must be a string or a list of strings "
f"(was {type(role_raw).__name__})"
)
if roles:
raise ManifestError(
f"{label} role {roles[0]!r} is not accepted; "
f"the 'role' field is reserved for future use"
)
pipelock = (
PipelockRoutePolicy.from_dict(bottle_name, idx, d["pipelock"])
if "pipelock" in d
else PipelockRoutePolicy()
)
for k in d:
if k not in ("host", "path_allowlist", "auth", "role", "pipelock"):
raise ManifestError(
f"{label} has unknown key {k!r}; accepted keys are "
f"'host', 'path_allowlist', 'auth', 'role', 'pipelock'"
)
return cls(
Host=host,
PathAllowlist=prefixes,
AuthScheme=auth_scheme,
TokenRef=token_ref,
Role=roles,
Pipelock=pipelock,
)
@dataclass(frozen=True)
class EgressConfig:
"""Per-bottle egress configuration. Today this is just the
route table; the nesting under `egress:` leaves room for
per-bottle proxy settings (port override, log level, etc.) in
follow-ups."""
routes: tuple[EgressRoute, ...] = ()
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "EgressConfig":
d = _as_json_object(raw, f"bottle '{bottle_name}' egress")
routes_raw = d.get("routes")
routes: tuple[EgressRoute, ...] = ()
if routes_raw is not None:
if not isinstance(routes_raw, list):
raise ManifestError(
f"bottle '{bottle_name}' egress.routes must be an array "
f"(was {type(routes_raw).__name__})"
)
routes_list = cast(list[object], routes_raw)
routes = tuple(
EgressRoute.from_dict(bottle_name, i, entry)
for i, entry in enumerate(routes_list)
)
_validate_egress_routes(bottle_name, routes)
for k in d:
if k != "routes":
raise ManifestError(
f"bottle '{bottle_name}' egress has unknown key {k!r}; "
f"only 'routes' is accepted"
)
return cls(routes=routes)
def _section_dict(value: object, label: str) -> dict[str, object]:
"""Like as_json_object but treats absent/null as an empty section."""
if value is None:
return {}
return as_json_object(value, label)
@dataclass(frozen=True)
@@ -573,10 +95,9 @@ class Bottle:
agent_provider: AgentProvider = field(default_factory=AgentProvider)
git: tuple[GitEntry, ...] = ()
# Per-bottle git identity (issue #86). Empty default — bottles
# that don't set `git.user:` in the manifest skip the
# `git config --global` step entirely. Set independently of
# the `git.remotes:` upstream map above: a bottle can declare a user
# identity without any git-gate upstreams, and vice versa.
# that don't set `git-gate.user:` in the manifest skip the
# `git config --global` step entirely. A bottle can declare a user
# identity without any git-gate.repos upstreams, and vice versa.
git_user: GitUser = field(default_factory=GitUser)
egress: EgressConfig = field(default_factory=EgressConfig)
# Opt-in per-bottle stuck-recovery sidecar (PRD 0013). When true,
@@ -590,7 +111,7 @@ class Bottle:
@classmethod
def from_dict(cls, name: str, raw: object) -> "Bottle":
d = _as_json_object(raw, f"bottle '{name}'")
d = as_json_object(raw, f"bottle '{name}'")
if "runtime" in d:
raise ManifestError(
@@ -603,16 +124,22 @@ class Bottle:
if "ssh" in d:
raise ManifestError(
f"bottle '{name}' has an 'ssh' field, which has been removed "
f"(PRD 0009). Move each entry to 'git': declare the upstream "
f"as a git remote with Name + Upstream URL + IdentityFile, "
f"and the per-bottle git-gate (PRD 0008) will hold the "
f"credential and gitleaks-scan pushes."
f"(PRD 0009). Declare upstreams under 'git-gate.repos' with "
f"url + identity + host_key; the git-gate sidecar (PRD 0008) "
f"holds the credential and gitleaks-scans pushes."
)
if "git" in d:
raise ManifestError(
f"bottle '{name}' uses 'git' which has been replaced by "
f"'git-gate' (PRD 0047). Move git.user → git-gate.user "
f"and git.remotes → git-gate.repos (fields: url, identity, host_key)."
)
if "git_user" in d:
raise ManifestError(
f"bottle '{name}' has a 'git_user' field, which has been "
f"removed. Move it under 'git.user'."
f"removed. Move it under 'git-gate.user'."
)
unknown = set(d.keys()) - BOTTLE_KEYS
@@ -626,7 +153,7 @@ class Bottle:
env: dict[str, str] = {}
env_raw = d.get("env")
if env_raw is not None:
env_dict = _as_json_object(env_raw, f"bottle '{name}' env")
env_dict = as_json_object(env_raw, f"bottle '{name}' env")
for var, value in env_dict.items():
if not isinstance(value, str):
raise ManifestError(
@@ -637,9 +164,9 @@ class Bottle:
git: tuple[GitEntry, ...] = ()
git_user = GitUser()
git_raw = d.get("git")
git_raw = d.get("git-gate")
if git_raw is not None:
git, git_user = _parse_git_config(name, git_raw)
git, git_user = parse_git_gate_config(name, git_raw)
agent_provider = (
AgentProvider.from_dict(name, d["agent_provider"])
@@ -666,83 +193,6 @@ class Bottle:
)
@dataclass(frozen=True)
class Agent:
bottle: str
skills: tuple[str, ...] = ()
prompt: str = ""
# Per-agent git identity (issue #94). Overlays the referenced
# bottle's git.user per-field at `Manifest.bottle_for`. Only the
# `user` block is allowed at the agent level; `git.remotes` stays
# bottle-only because it carries credentials and host trust.
git_user: GitUser = GitUser()
@classmethod
def from_dict(cls, name: str, raw: object, bottle_names: set[str]) -> "Agent":
d = _as_json_object(raw, f"agent '{name}'")
unknown = set(d.keys()) - AGENT_MODEL_KEYS
if unknown:
allowed = ", ".join(sorted(AGENT_MODEL_KEYS))
raise ManifestError(
f"agent '{name}' has unknown key(s) {sorted(unknown)}; "
f"allowed keys are {allowed}."
)
bottle = d.get("bottle")
if not isinstance(bottle, str) or not bottle:
raise ManifestError(f"agent '{name}' must declare a 'bottle' field naming a defined bottle")
if bottle not in bottle_names:
available = ", ".join(sorted(bottle_names)) or "(none defined)"
raise ManifestError(
f"agent '{name}' references bottle '{bottle}', which is not defined. "
f"Available: {available}"
)
skills: tuple[str, ...] = ()
skills_raw = d.get("skills")
if skills_raw is not None:
if not isinstance(skills_raw, list):
raise ManifestError(f"agent '{name}' skills must be an array (was {type(skills_raw).__name__})")
collected: list[str] = []
skills_list = cast(list[object], skills_raw)
for i, skill in enumerate(skills_list):
if not isinstance(skill, str):
raise ManifestError(
f"agent '{name}' skills[{i}] must be a string "
f"(was {type(skill).__name__})"
)
collected.append(skill)
skills = tuple(collected)
prompt_raw = d.get("prompt")
if prompt_raw is None:
prompt = ""
elif isinstance(prompt_raw, str):
prompt = prompt_raw
else:
raise ManifestError(f"agent '{name}' prompt must be a string (was {type(prompt_raw).__name__})")
# git: agents may declare only `git.user` (name/email). Any
# other git key — notably `remotes` — is rejected: remotes
# carry credentials and host trust and stay bottle-only.
git_user = GitUser()
git_raw = d.get("git")
if git_raw is not None:
gd = _as_json_object(git_raw, f"agent '{name}' git")
for k in gd.keys():
if k != "user":
raise ManifestError(
f"agent '{name}' git.{k} is not allowed at the "
f"agent level; only git.user (name/email) may be "
f"set on an agent. git.remotes is bottle-only "
f"(it carries credentials and host trust)."
)
if "user" in gd:
git_user = GitUser.from_dict(name, gd["user"])
return cls(bottle=bottle, skills=skills, prompt=prompt, git_user=git_user)
@dataclass(frozen=True)
class Manifest:
bottles: Mapping[str, Bottle]
@@ -827,6 +277,7 @@ class Manifest:
files = sorted(stale_bottles.glob("*.md"))
if files:
names = ", ".join(p.name for p in files)
from .log import warn
warn(
f"ignoring bottle file(s) under "
f"{stale_bottles}: {names}. Bottles can only "
@@ -844,7 +295,7 @@ class Manifest:
@classmethod
def from_json_obj(cls, obj: object) -> "Manifest":
"""Validate and build a Manifest from a raw JSON-like dict."""
d = _as_json_object(obj, "manifest")
d = as_json_object(obj, "manifest")
raw_bottles_obj = _section_dict(d.get("bottles"), "manifest 'bottles'")
raw_agents = _section_dict(d.get("agents"), "manifest 'agents'")
@@ -853,7 +304,7 @@ class Manifest:
# consistently with the md-loader path.
raw_bottles: dict[str, dict[str, object]] = {}
for n, b in raw_bottles_obj.items():
raw_bottles[n] = _as_json_object(b, f"bottle '{n}'")
raw_bottles[n] = as_json_object(b, f"bottle '{n}'")
from .manifest_extends import resolve_bottles
bottles = resolve_bottles(raw_bottles)
@@ -933,104 +384,3 @@ class Manifest:
if merged.email:
parts.append(f"email={merged.email} ({'agent' if over.email else 'bottle'})")
return ", ".join(parts)
def _as_json_object(value: object, label: str) -> dict[str, object]:
"""Assert that `value` is a JSON object (str-keyed dict) and return
a view typed as `dict[str, object]` so downstream `.get(...)` calls
have a typed surface."""
if not isinstance(value, dict):
raise ManifestError(f"{label} must be a JSON object (was {type(value).__name__})")
items = cast(dict[object, object], value)
out: dict[str, object] = {}
for k, v in items.items():
if not isinstance(k, str):
raise ManifestError(f"{label} keys must be strings (found {type(k).__name__})")
out[k] = v
return out
def _section_dict(value: object, label: str) -> dict[str, object]:
"""Like _as_json_object but treats absent/null as an empty section."""
if value is None:
return {}
return _as_json_object(value, label)
def _opt_str(value: object, label: str) -> str:
if value is None:
return ""
if not isinstance(value, str):
raise ManifestError(f"{label} must be a string (was {type(value).__name__})")
return value
def _parse_git_upstream(url: str, label: str) -> tuple[str, str, str, str]:
"""Parse `ssh://user@host[:port]/path` into (user, host, port, path).
Dies if `url` doesn't match the ssh:// shape v1 supports. Default
port is 22 (matches OpenSSH)."""
if not url.startswith("ssh://"):
raise ManifestError(f"{label} must be an ssh:// URL (was {url!r})")
rest = url[len("ssh://"):]
if "@" not in rest:
raise ManifestError(f"{label} must include a user (e.g. ssh://git@host/path.git); was {url!r}")
user, _, hostpart = rest.partition("@")
if not user:
raise ManifestError(f"{label} user is empty in {url!r}")
if "/" not in hostpart:
raise ManifestError(f"{label} must include a path (e.g. ssh://git@host/path.git); was {url!r}")
hostport, _, path = hostpart.partition("/")
if not path:
raise ManifestError(f"{label} path is empty in {url!r}")
if ":" in hostport:
host, _, port = hostport.partition(":")
if not port.isdigit():
raise ManifestError(f"{label} port must be numeric in {url!r}")
else:
host = hostport
port = "22"
if not host:
raise ManifestError(f"{label} host is empty in {url!r}")
return (user, host, port, path)
def _is_ip_literal(value: str) -> bool:
try:
ipaddress.ip_address(value)
except ValueError:
return False
return True
def _validate_egress_routes(
bottle_name: str,
routes: tuple[EgressRoute, ...],
) -> None:
"""Cross-validation for `bottle.egress.routes`: hosts must be unique.
The proxy matches by exact-host (v1); duplicate hosts leave the
route choice ambiguous so we reject them up front.
No cross-validation against `bottle.git` is performed. git-gate
(SSH push/fetch) and egress (HTTPS) broker different protocols;
declaring both for the same host is a legitimate dev setup."""
seen_hosts: dict[str, None] = {}
for r in routes:
key = r.Host.lower()
if key in seen_hosts:
raise ManifestError(
f"bottle '{bottle_name}' egress.routes has duplicate host "
f"{r.Host!r}; each host must be unique on the proxy."
)
seen_hosts[key] = None
def _validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None:
seen: dict[str, None] = {}
for g in git:
if g.Name in seen:
raise ManifestError(
f"bottle '{bottle_name}' git entries have duplicate Name '{g.Name}'; "
f"each entry maps to a distinct bare repo on the gate."
)
seen[g.Name] = None
+166
View File
@@ -0,0 +1,166 @@
"""Agent configuration manifest dataclasses."""
from __future__ import annotations
from dataclasses import dataclass
from typing import cast
from .agent_provider import PROVIDER_TEMPLATES
from .manifest_util import ManifestError, as_json_object
from .manifest_git import GitUser
from .manifest_schema import AGENT_MODEL_KEYS
@dataclass(frozen=True)
class AgentProvider:
"""Provider/template for the agent process inside a bottle.
`template` selects a built-in launch/runtime contract. `dockerfile`
optionally points at a custom agent-image Dockerfile while leaving
bot-bottle's sidecar infrastructure intact.
`auth_token` names the host env var that holds the provider's OAuth
token (Claude only). The provisioner injects a provider-owned egress
route for api.anthropic.com that re-injects this token as the Bearer
header, and sets a placeholder CLAUDE_CODE_OAUTH_TOKEN in the agent
so the Claude Code CLI starts.
`forward_host_credentials` forwards the host Codex auth token into
the egress sidecar (Codex only).
"""
template: str = "claude"
dockerfile: str = ""
auth_token: str = ""
forward_host_credentials: bool = False
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "AgentProvider":
d = as_json_object(raw, f"bottle '{bottle_name}' agent_provider")
for k in d:
if k not in {"template", "dockerfile", "auth_token", "forward_host_credentials"}:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider has unknown key {k!r}; "
f"allowed: template, dockerfile, auth_token, forward_host_credentials"
)
template = d.get("template", "claude")
if not isinstance(template, str) or not template:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.template must be a "
f"non-empty string"
)
if template not in PROVIDER_TEMPLATES:
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.template {template!r} "
f"is not one of {', '.join(sorted(PROVIDER_TEMPLATES))}"
)
dockerfile = d.get("dockerfile", "")
if not isinstance(dockerfile, str):
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.dockerfile must be a "
f"string (was {type(dockerfile).__name__})"
)
auth_token = d.get("auth_token", "")
if not isinstance(auth_token, str):
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.auth_token must be a "
f"string (was {type(auth_token).__name__})"
)
if auth_token and template != "claude":
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.auth_token is only "
f"supported for template 'claude'"
)
forward_host_credentials = d.get("forward_host_credentials", False)
if not isinstance(forward_host_credentials, bool):
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.forward_host_credentials "
f"must be a boolean (was {type(forward_host_credentials).__name__})"
)
if forward_host_credentials and template != "codex":
raise ManifestError(
f"bottle '{bottle_name}' agent_provider.forward_host_credentials "
"is currently only supported for template 'codex'"
)
return cls(
template=template,
dockerfile=dockerfile,
auth_token=auth_token,
forward_host_credentials=forward_host_credentials,
)
@dataclass(frozen=True)
class Agent:
bottle: str
skills: tuple[str, ...] = ()
prompt: str = ""
# Per-agent git identity (issue #94). Overlays the referenced
# bottle's git-gate.user per-field at `Manifest.bottle_for`. Only
# `user` is allowed at the agent level; `repos` stays bottle-only
# because it carries credentials and host trust.
git_user: GitUser = GitUser()
@classmethod
def from_dict(cls, name: str, raw: object, bottle_names: set[str]) -> "Agent":
d = as_json_object(raw, f"agent '{name}'")
unknown = set(d.keys()) - AGENT_MODEL_KEYS
if unknown:
allowed = ", ".join(sorted(AGENT_MODEL_KEYS))
raise ManifestError(
f"agent '{name}' has unknown key(s) {sorted(unknown)}; "
f"allowed keys are {allowed}."
)
bottle = d.get("bottle")
if not isinstance(bottle, str) or not bottle:
raise ManifestError(f"agent '{name}' must declare a 'bottle' field naming a defined bottle")
if bottle not in bottle_names:
available = ", ".join(sorted(bottle_names)) or "(none defined)"
raise ManifestError(
f"agent '{name}' references bottle '{bottle}', which is not defined. "
f"Available: {available}"
)
skills: tuple[str, ...] = ()
skills_raw = d.get("skills")
if skills_raw is not None:
if not isinstance(skills_raw, list):
raise ManifestError(f"agent '{name}' skills must be an array (was {type(skills_raw).__name__})")
collected: list[str] = []
skills_list = cast(list[object], skills_raw)
for i, skill in enumerate(skills_list):
if not isinstance(skill, str):
raise ManifestError(
f"agent '{name}' skills[{i}] must be a string "
f"(was {type(skill).__name__})"
)
collected.append(skill)
skills = tuple(collected)
prompt_raw = d.get("prompt")
if prompt_raw is None:
prompt = ""
elif isinstance(prompt_raw, str):
prompt = prompt_raw
else:
raise ManifestError(f"agent '{name}' prompt must be a string (was {type(prompt_raw).__name__})")
# git-gate: agents may declare only `git-gate.user` (name/email).
# `git-gate.repos` is bottle-only — it carries credentials and host trust.
git_user = GitUser()
git_raw = d.get("git-gate")
if git_raw is not None:
gd = as_json_object(git_raw, f"agent '{name}' git-gate")
for k in gd.keys():
if k != "user":
raise ManifestError(
f"agent '{name}' git-gate.{k} is not allowed at the "
f"agent level; only git-gate.user (name/email) may be "
f"set on an agent. git-gate.repos is bottle-only "
f"(it carries credentials and host trust)."
)
if "user" in gd:
git_user = GitUser.from_dict(name, gd["user"])
return cls(bottle=bottle, skills=skills, prompt=prompt, git_user=git_user)
+286
View File
@@ -0,0 +1,286 @@
"""Egress routing manifest dataclasses and helpers."""
from __future__ import annotations
import ipaddress
from dataclasses import dataclass, field
from typing import cast
from .manifest_util import ManifestError, as_json_object
# Auth schemes for the egress route's optional `auth` block.
# Same values cred-proxy accepts today; `token` sidesteps the Gitea
# token-not-Bearer quirk (go-gitea/gitea#16734).
EGRESS_AUTH_SCHEMES = ("Bearer", "token")
def validate_egress_routes(
bottle_name: str,
routes: tuple[EgressRoute, ...],
) -> None:
"""Cross-validation for `bottle.egress.routes`: hosts must be unique.
The proxy matches by exact-host (v1); duplicate hosts leave the
route choice ambiguous so we reject them up front.
No cross-validation against `bottle.git-gate.repos` is performed.
git-gate (SSH push/fetch) and egress (HTTPS) broker different
protocols; declaring both for the same host is a legitimate dev
setup."""
seen_hosts: dict[str, None] = {}
for r in routes:
key = r.Host.lower()
if key in seen_hosts:
raise ManifestError(
f"bottle '{bottle_name}' egress.routes has duplicate host "
f"{r.Host!r}; each host must be unique on the proxy."
)
seen_hosts[key] = None
@dataclass(frozen=True)
class PipelockRoutePolicy:
"""Per-route pipelock policy overrides.
`TlsPassthrough` adds the route host to pipelock's
`tls_interception.passthrough_domains`, so pipelock still enforces
the hostname allowlist but does not MITM/decrypt request bodies or
headers for that host.
`SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF
allowlist for private/internal destinations behind this route.
"""
TlsPassthrough: bool = False
SsrfIpAllowlist: tuple[str, ...] = ()
@classmethod
def from_dict(
cls, bottle_name: str, idx: int, raw: object,
) -> "PipelockRoutePolicy":
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
d = as_json_object(raw, label)
for k in d:
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
raise ManifestError(
f"{label} has unknown key {k!r}; "
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
f"are accepted"
)
tls_passthrough_raw = d.get("tls_passthrough", False)
if not isinstance(tls_passthrough_raw, bool):
raise ManifestError(
f"{label}.tls_passthrough must be a boolean "
f"(was {type(tls_passthrough_raw).__name__})"
)
ssrf_raw = d.get("ssrf_ip_allowlist", [])
if not isinstance(ssrf_raw, list):
raise ManifestError(
f"{label}.ssrf_ip_allowlist must be an array "
f"(was {type(ssrf_raw).__name__})"
)
ssrf_ip_allowlist: list[str] = []
for j, item in enumerate(ssrf_raw):
if not isinstance(item, str) or not item:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
f"string (was {type(item).__name__})"
)
try:
ipaddress.ip_network(item, strict=False)
except ValueError as e:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
f"or CIDR (was {item!r}): {e}"
)
ssrf_ip_allowlist.append(item)
return cls(
TlsPassthrough=tls_passthrough_raw,
SsrfIpAllowlist=tuple(ssrf_ip_allowlist),
)
@dataclass(frozen=True)
class EgressRoute:
"""One route on the per-bottle egress sidecar (PRD 0017).
`Host` matches the request's hostname (case-insensitive). The
optional `PathAllowlist` constrains the URL path to a set of
prefixes; empty tuple means no path-level filtering. The optional
`AuthScheme` / `TokenRef` pair drives credential injection:
when set, the proxy strips any inbound Authorization and injects
`<AuthScheme> <value-of-host-env-named-by-TokenRef>`. When the
manifest's `auth` block is omitted both fields are empty strings —
no Authorization is written, no token forwarded.
`Role` is reserved for future use; all role strings are currently
rejected by the validator.
Validation rules (enforced in `from_dict`):
- `host` required, non-empty.
- `path_allowlist` optional, list of absolute path prefixes.
- `auth` optional. If present, MUST carry both `scheme` and
`token_ref` as non-empty strings; an empty `auth: {}` is an
error rather than a synonym for "no auth" (omit `auth` for
that case).
- `role` optional, reserved — any non-empty value is rejected.
"""
Host: str
PathAllowlist: tuple[str, ...] = ()
AuthScheme: str = ""
TokenRef: str = ""
Role: tuple[str, ...] = ()
Pipelock: PipelockRoutePolicy = field(default_factory=PipelockRoutePolicy)
@classmethod
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "EgressRoute":
label = f"bottle '{bottle_name}' egress.routes[{idx}]"
d = as_json_object(raw, label)
host = d.get("host")
if not isinstance(host, str) or not host:
raise ManifestError(f"{label} missing required string field 'host'")
path_allow_raw = d.get("path_allowlist")
prefixes: tuple[str, ...] = ()
if path_allow_raw is not None:
if not isinstance(path_allow_raw, list):
raise ManifestError(
f"{label} path_allowlist must be an array "
f"(was {type(path_allow_raw).__name__})"
)
path_list = cast(list[object], path_allow_raw)
collected: list[str] = []
for j, p in enumerate(path_list):
if not isinstance(p, str):
raise ManifestError(
f"{label} path_allowlist[{j}] must be a string "
f"(was {type(p).__name__})"
)
if not p.startswith("/"):
raise ManifestError(
f"{label} path_allowlist[{j}] {p!r} must be an "
f"absolute path prefix starting with '/'"
)
collected.append(p)
prefixes = tuple(collected)
auth_scheme = ""
token_ref = ""
if "auth" in d:
auth_raw = d.get("auth")
auth_d = as_json_object(auth_raw, f"{label} auth")
if not auth_d:
raise ManifestError(
f"{label} auth is empty ({{}}); omit the 'auth' key "
f"entirely if this route is unauthenticated. Otherwise "
f"both 'scheme' and 'token_ref' are required."
)
auth_scheme_raw = auth_d.get("scheme")
if not isinstance(auth_scheme_raw, str) or not auth_scheme_raw:
raise ManifestError(
f"{label} auth.scheme is required when 'auth' is set "
f"(non-empty string)"
)
if auth_scheme_raw not in EGRESS_AUTH_SCHEMES:
raise ManifestError(
f"{label} auth.scheme {auth_scheme_raw!r} is not one of "
f"{', '.join(EGRESS_AUTH_SCHEMES)}"
)
token_ref_raw = auth_d.get("token_ref")
if not isinstance(token_ref_raw, str) or not token_ref_raw:
raise ManifestError(
f"{label} auth.token_ref is required when 'auth' is set "
f"(name of the host env var holding the token value)"
)
for k in auth_d:
if k not in ("scheme", "token_ref"):
raise ManifestError(
f"{label} auth has unknown key {k!r}; "
f"only 'scheme' and 'token_ref' are accepted"
)
auth_scheme = auth_scheme_raw
token_ref = token_ref_raw
role_raw = d.get("role")
roles: tuple[str, ...] = ()
if role_raw is None:
roles = ()
elif isinstance(role_raw, str):
roles = (role_raw,)
elif isinstance(role_raw, list):
role_list = cast(list[object], role_raw)
collected_roles: list[str] = []
for r in role_list:
if not isinstance(r, str):
raise ManifestError(f"{label} role items must be strings (got {type(r).__name__})")
collected_roles.append(r)
roles = tuple(collected_roles)
else:
raise ManifestError(
f"{label} role must be a string or a list of strings "
f"(was {type(role_raw).__name__})"
)
if roles:
raise ManifestError(
f"{label} role {roles[0]!r} is not accepted; "
f"the 'role' field is reserved for future use"
)
pipelock = (
PipelockRoutePolicy.from_dict(bottle_name, idx, d["pipelock"])
if "pipelock" in d
else PipelockRoutePolicy()
)
for k in d:
if k not in ("host", "path_allowlist", "auth", "role", "pipelock"):
raise ManifestError(
f"{label} has unknown key {k!r}; accepted keys are "
f"'host', 'path_allowlist', 'auth', 'role', 'pipelock'"
)
return cls(
Host=host,
PathAllowlist=prefixes,
AuthScheme=auth_scheme,
TokenRef=token_ref,
Role=roles,
Pipelock=pipelock,
)
@dataclass(frozen=True)
class EgressConfig:
"""Per-bottle egress configuration. Today this is just the
route table; the nesting under `egress:` leaves room for
per-bottle proxy settings (port override, log level, etc.) in
follow-ups."""
routes: tuple[EgressRoute, ...] = ()
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "EgressConfig":
d = as_json_object(raw, f"bottle '{bottle_name}' egress")
routes_raw = d.get("routes")
routes: tuple[EgressRoute, ...] = ()
if routes_raw is not None:
if not isinstance(routes_raw, list):
raise ManifestError(
f"bottle '{bottle_name}' egress.routes must be an array "
f"(was {type(routes_raw).__name__})"
)
routes_list = cast(list[object], routes_raw)
routes = tuple(
EgressRoute.from_dict(bottle_name, i, entry)
for i, entry in enumerate(routes_list)
)
validate_egress_routes(bottle_name, routes)
for k in d:
if k != "routes":
raise ManifestError(
f"bottle '{bottle_name}' egress has unknown key {k!r}; "
f"only 'routes' is accepted"
)
return cls(routes=routes)
+12 -11
View File
@@ -71,7 +71,8 @@ def _merge_bottles(
name: str,
) -> Bottle:
"""Apply PRD 0025 merge rules."""
from .manifest import Bottle, GitUser, _validate_egress_routes
from .manifest import Bottle, GitUser
from .manifest_egress import validate_egress_routes
# Parse the child's declared fields into a Bottle (with the
# usual defaults for anything missing). Validation runs the same
@@ -81,19 +82,19 @@ def _merge_bottles(
# env: dict merge, child wins on collision.
merged_env = {**parent.env, **child.env}
# git.user: per-field overlay. Each non-empty field on child
# git-gate.user: per-field overlay. Each non-empty field on child
# wins; empties fall through to parent. The default GitUser()
# is two empty strings, so a child that omits git.user
# is two empty strings, so a child that omits git-gate.user
# inherits the parent's user verbatim.
merged_git_user = GitUser(
name=child.git_user.name or parent.git_user.name,
email=child.git_user.email or parent.git_user.email,
)
# git.remotes: missing means inherit; an explicit empty object
# git-gate.repos: missing means inherit; an explicit empty object
# clears; otherwise parent and child merge by UpstreamHost with
# child entries replacing duplicate hosts.
if _child_declares_git_remotes(child_raw):
if _child_declares_git_gate_repos(child_raw):
merged_git = _merge_git_remotes(parent.git, child.git) if child.git else ()
else:
merged_git = parent.git
@@ -109,7 +110,7 @@ def _merge_bottles(
merged_supervise = (
child.supervise if "supervise" in child_raw else parent.supervise
)
_validate_egress_routes(name, merged_egress.routes)
validate_egress_routes(name, merged_egress.routes)
return Bottle(
env=merged_env,
@@ -121,14 +122,14 @@ def _merge_bottles(
)
def _child_declares_git_remotes(child_raw: dict[str, object]) -> bool:
from .manifest import _as_json_object
def _child_declares_git_gate_repos(child_raw: dict[str, object]) -> bool:
from .manifest_util import as_json_object
git_raw = child_raw.get("git")
git_raw = child_raw.get("git-gate")
if git_raw is None:
return False
git_obj = _as_json_object(git_raw, "child git")
return "remotes" in git_obj
git_obj = as_json_object(git_raw, "child git-gate")
return "repos" in git_obj
def _merge_git_remotes(
+222
View File
@@ -0,0 +1,222 @@
"""Git-related manifest dataclasses and helpers."""
from __future__ import annotations
import re
from dataclasses import dataclass
from .manifest_util import ManifestError, as_json_object
# Shell-safe characters for git-gate repo names. Names are embedded in
# the generated entrypoint shell script (shlex.quote is the primary
# defence; this regex is belt-and-suspenders and documents intent).
_GIT_NAME_RE = re.compile(r"^[A-Za-z0-9._-]+$")
def _opt_str(value: object, label: str) -> str:
if value is None:
return ""
if not isinstance(value, str):
raise ManifestError(f"{label} must be a string (was {type(value).__name__})")
return value
def parse_git_upstream(url: str, label: str) -> tuple[str, str, str, str]:
"""Parse `ssh://user@host[:port]/path` into (user, host, port, path).
Dies if `url` doesn't match the ssh:// shape v1 supports. Default
port is 22 (matches OpenSSH)."""
if not url.startswith("ssh://"):
raise ManifestError(f"{label} must be an ssh:// URL (was {url!r})")
rest = url[len("ssh://"):]
if "@" not in rest:
raise ManifestError(f"{label} must include a user (e.g. ssh://git@host/path.git); was {url!r}")
user, _, hostpart = rest.partition("@")
if not user:
raise ManifestError(f"{label} user is empty in {url!r}")
if "/" not in hostpart:
raise ManifestError(f"{label} must include a path (e.g. ssh://git@host/path.git); was {url!r}")
hostport, _, path = hostpart.partition("/")
if not path:
raise ManifestError(f"{label} path is empty in {url!r}")
if ":" in hostport:
host, _, port = hostport.partition(":")
if not port.isdigit():
raise ManifestError(f"{label} port must be numeric in {url!r}")
else:
host = hostport
port = "22"
if not host:
raise ManifestError(f"{label} host is empty in {url!r}")
return (user, host, port, path)
def validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None:
seen: dict[str, None] = {}
for g in git:
if g.Name in seen:
raise ManifestError(
f"bottle '{bottle_name}' git-gate.repos has duplicate name '{g.Name}'; "
f"each entry maps to a distinct bare repo on the gate."
)
seen[g.Name] = None
@dataclass(frozen=True)
class GitEntry:
"""One upstream the per-agent git-gate (PRD 0008) is allowed to
talk to. `Upstream` is the real remote URL the agent would push to
if there were no gate; the gate hosts a bare repo at /git/<Name>.git
and `IdentityFile` is the SSH key the gate uses to push that repo
upstream after gitleaks passes. The agent itself never holds the
upstream credential.
The Upstream URL is parsed once at construction and the pieces are
stashed in the `Upstream*` fields so the git-gate render step
doesn't have to re-parse.
Manifest source: `git-gate.repos.<Name>` (PRD 0047). The YAML keys
are `url`, `identity`, and `host_key`; the internal field names are
stable across that rename."""
Name: str
Upstream: str
IdentityFile: str
KnownHostKey: str = ""
RemoteKey: str = ""
UpstreamUser: str = ""
UpstreamHost: str = ""
UpstreamPort: str = ""
UpstreamPath: str = ""
@classmethod
def from_repos_entry(
cls, bottle_name: str, repo_name: str, raw: object
) -> "GitEntry":
"""Parse one entry from `git-gate.repos.<repo_name>`.
YAML keys: `url` (required), `identity` (required),
`host_key` (optional). The repo_name becomes `Name`."""
if not repo_name:
raise ManifestError(
f"bottle '{bottle_name}' git-gate.repos has an empty key"
)
if not _GIT_NAME_RE.match(repo_name):
raise ManifestError(
f"bottle '{bottle_name}' git-gate.repos name {repo_name!r} is invalid; "
f"allowed characters: A-Z a-z 0-9 . _ -"
)
label = f"git-gate.repos[{repo_name!r}]"
d = as_json_object(raw, f"bottle '{bottle_name}' {label}")
for k in d:
if k not in {"url", "identity", "host_key"}:
raise ManifestError(
f"bottle '{bottle_name}' {label} has unknown key {k!r}; "
f"allowed: url, identity, host_key"
)
upstream = d.get("url")
if not isinstance(upstream, str) or not upstream:
raise ManifestError(
f"bottle '{bottle_name}' {label} missing required string field 'url'"
)
ident = d.get("identity")
if not isinstance(ident, str) or not ident:
raise ManifestError(
f"bottle '{bottle_name}' {label} missing required string field 'identity'"
)
khk = _opt_str(
d.get("host_key"),
f"bottle '{bottle_name}' {label} host_key",
)
user, host, port, path = parse_git_upstream(
upstream, f"bottle '{bottle_name}' {label} url"
)
return cls(
Name=repo_name,
Upstream=upstream,
IdentityFile=ident,
KnownHostKey=khk,
RemoteKey=host,
UpstreamUser=user,
UpstreamHost=host,
UpstreamPort=port,
UpstreamPath=path,
)
@dataclass(frozen=True)
class GitUser:
"""Per-bottle `git config --global user.name` / `user.email`
pair (issue #86). The agent's commits inside the bottle are
attributed to this identity rather than the agent image's
image-baked default (no user, or whatever the image dropped
in). Either or both fields can be set independently.
`from_dict` is forgiving on shape (a single missing field is
fine — we just skip that config line at provisioning) but
strict on types (string-or-die)."""
name: str = ""
email: str = ""
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "GitUser":
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate.user")
for k in d.keys():
if k not in {"name", "email"}:
raise ManifestError(
f"bottle '{bottle_name}' git-gate.user has unknown key {k!r}; "
f"allowed: name, email"
)
name = d.get("name", "")
email = d.get("email", "")
if not isinstance(name, str):
raise ManifestError(
f"bottle '{bottle_name}' git-gate.user.name must be a string "
f"(was {type(name).__name__})"
)
if not isinstance(email, str):
raise ManifestError(
f"bottle '{bottle_name}' git-gate.user.email must be a string "
f"(was {type(email).__name__})"
)
if not name and not email:
raise ManifestError(
f"bottle '{bottle_name}' git-gate.user is set but neither "
f"name nor email is non-empty; remove the block or "
f"fill at least one field."
)
return cls(name=name, email=email)
def is_empty(self) -> bool:
return not self.name and not self.email
def parse_git_gate_config(
bottle_name: str,
raw: object,
) -> tuple[tuple[GitEntry, ...], GitUser]:
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate")
for k in d.keys():
if k not in {"user", "repos"}:
raise ManifestError(
f"bottle '{bottle_name}' git-gate has unknown key {k!r}; "
f"allowed: user, repos"
)
git_user = (
GitUser.from_dict(bottle_name, d["user"])
if "user" in d
else GitUser()
)
git: tuple[GitEntry, ...] = ()
repos_raw = d.get("repos")
if repos_raw is not None:
repos = as_json_object(repos_raw, f"bottle '{bottle_name}' git-gate.repos")
git = tuple(
GitEntry.from_repos_entry(bottle_name, name, entry)
for name, entry in repos.items()
)
validate_unique_git_names(bottle_name, git)
return git, git_user
+3 -3
View File
@@ -93,13 +93,13 @@ def load_agents_from_dir(
validate_agent_frontmatter_keys(path, fm.keys())
# Build the dict Agent.from_dict expects. The body becomes
# prompt; Claude Code passthrough fields stay in fm and get
# ignored by Agent.from_dict (which reads bottle/skills/git/prompt).
# ignored by Agent.from_dict (reads bottle/skills/git-gate/prompt).
agent_dict: dict[str, object] = {
"bottle": fm.get("bottle"),
"skills": fm.get("skills", []),
"prompt": body.strip(),
}
if "git" in fm:
agent_dict["git"] = fm["git"]
if "git-gate" in fm:
agent_dict["git-gate"] = fm["git-gate"]
out[name] = Agent.from_dict(name, agent_dict, bottle_names)
return out
+3 -3
View File
@@ -16,10 +16,10 @@ _FILENAME_RX = re.compile(r"^[a-z][a-z0-9-]*$")
# sets dies with a "did you mean" pointer: typos should not silently
# ghost into an empty config.
BOTTLE_KEYS = frozenset(
{"env", "extends", "agent_provider", "git", "egress", "supervise"}
{"env", "extends", "agent_provider", "git-gate", "egress", "supervise"}
)
AGENT_KEYS_REQUIRED = frozenset({"bottle"})
AGENT_KEYS_OPTIONAL = frozenset({"skills", "git"})
AGENT_KEYS_OPTIONAL = frozenset({"skills", "git-gate"})
# Claude Code subagent fields bot-bottle ignores at launch but does
# not reject. This lets the same file double as
@@ -58,7 +58,7 @@ def _validate_frontmatter_keys(
keys: object,
allowed_keys: frozenset[str],
) -> None:
from .manifest import ManifestError
from .manifest_util import ManifestError
key_set = set(keys)
unknown = key_set - allowed_keys
+24
View File
@@ -0,0 +1,24 @@
"""Shared manifest primitives used by all manifest sub-modules."""
from __future__ import annotations
from typing import cast
class ManifestError(Exception):
"""A manifest file (or the manifest tree) is invalid."""
def as_json_object(value: object, label: str) -> dict[str, object]:
"""Assert that `value` is a JSON object (str-keyed dict) and return
a view typed as `dict[str, object]` so downstream `.get(...)` calls
have a typed surface."""
if not isinstance(value, dict):
raise ManifestError(f"{label} must be a JSON object (was {type(value).__name__})")
items = cast(dict[object, object], value)
out: dict[str, object] = {}
for k, v in items.items():
if not isinstance(k, str):
raise ManifestError(f"{label} keys must be strings (found {type(k).__name__})")
out[k] = v
return out
+9
View File
@@ -5,9 +5,18 @@ level deeper, under their backend package."""
from __future__ import annotations
import ipaddress
import os
def is_ip_literal(value: str) -> bool:
try:
ipaddress.ip_address(value)
except ValueError:
return False
return True
def expand_tilde(path: str) -> str:
"""Expand a leading '~' to $HOME. Leaves paths without a leading
tilde unchanged. Falls back to the empty string if $HOME is unset
@@ -0,0 +1,170 @@
# PRD 0047: Git-gate Manifest Redesign
- **Status:** Active
- **Author:** didericis
- **Created:** 2026-06-03
- **Issue:** #160
## Summary
Replace the `git` top-level key in bottle and agent manifests with `git-gate`,
consolidating git-identity configuration (`user`) and git-gate sidecar
configuration (`repos`) under a single section. Within `repos`, field names
move to lowercase snake_case and the local repo name is promoted to the YAML
key. The change removes the ambiguity in the current `git` block: its fields
are not generic git or SSH config — they are specifically the credential,
host-trust, and identity material that is managed in relation to git-gate.
## Problem
The current bottle manifest uses a `git` top-level key that mixes two concerns:
- `git.user``git config --global user.name / user.email` identity, which
the provisioner injects into the agent's shell.
- `git.remotes` — upstream URL, identity file, and host key material that the
git-gate sidecar consumes; the agent never sees these values.
That grouping suggests the `remotes` entries behave like an SSH config or a
generic `.gitconfig` remote declaration. They do not. The gate reads the
credential material to push upstream after gitleaks passes; the agent's
`.gitconfig` receives only the `insteadOf` rewrite that redirects traffic
through the gate. Nothing in the current key name or field names signals this.
Splitting `git.user` into a separate section from `git.remotes` also doesn't
help: both concepts exist because of git-gate, and keeping them under a single
`git-gate` key makes their relationship and purpose explicit.
The field names inside each remote entry also use PascalCase (`Name`,
`Upstream`, `IdentityFile`, `KnownHostKey`), inconsistent with every other
manifest section, which uses snake_case.
The current `git.remotes` dict is keyed by upstream host, which works for
simple remotes but forces a separate `Name` field to give the gate's bare repo
a local label. The host key and `Name` field are often redundant or confusing
(e.g., IP-literal upstreams where the key carries no semantic meaning).
## Goals / Success Criteria
- `git-gate` is accepted as a top-level bottle and agent key; `git` is removed
from both allowed-key sets.
- `git-gate.repos` is a named map where each key is the local repo name
exposed by the gate (bottle-only; rejected at the agent level).
- Each entry in `git-gate.repos` accepts exactly: `url` (required), `identity`
(required), `host_key` (optional).
- `git-gate.user` replaces `git.user` on both bottles and agents, with the
same `name` / `email` fields and overlay semantics.
- The manifest parser rejects `git.remotes` and `git.user` with errors that
point to the new keys.
- `GitEntry` internal fields are updated to match the new names; all callers
(provisioner, git-gate render, plan, tests) compile and pass.
- Existing unit tests in `tests/unit/test_manifest_git.py` and
`tests/unit/test_manifest_git_user.py` are rewritten to use the new YAML
shape; all other manifest unit tests remain green.
- The demo manifest (`bot-bottle.demo.json`) and any examples using the old
shape are updated.
## Non-goals
- No change to `git.user` / `git-gate.user` semantics or field names (`name`,
`email`).
- No change to git-gate runtime behavior (mirroring, gitleaks, access-hook
refresh).
- No change to the `insteadOf` rewrite the provisioner emits.
- No migration shim: the old `git.*` shape is rejected immediately with clear
error messages pointing to the new keys.
- No change to how agent-level user config overlays the bottle-level value.
## Design
### New manifest shape
**Before** (bottle frontmatter):
```yaml
git:
user:
name: implementer-bot
email: eric+implementer@dideric.is
remotes:
gitea.dideric.is:
Name: bot-bottle
Upstream: ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git
IdentityFile: ~/.ssh/gitea-delos-2.pem
KnownHostKey: "ssh-rsa AAAA..."
```
**After**:
```yaml
git-gate:
user:
name: implementer-bot
email: eric+implementer@dideric.is
repos:
bot-bottle:
url: ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git
identity: ~/.ssh/gitea-delos-2.pem
host_key: "ssh-rsa AAAA..."
```
`git-gate` is the single optional top-level key for all git configuration.
Bottles that previously used only `git.user` now use only `git-gate.user`;
those that used only `git.remotes` now use only `git-gate.repos`.
### Key-name-as-repo-name
The YAML key in `git-gate.repos` becomes the local repo name (previously
`Name`). The upstream host is no longer the primary key; the provisioner and
gate derive it from the `url` field during parse. IP-literal upstreams work
without an artificial host-as-key constraint.
### Field renames
| Old field | New field |
|-----------|-----------|
| `Name` (from dict key) | YAML key in `repos` |
| `Upstream` | `url` |
| `IdentityFile` | `identity` |
| `KnownHostKey` | `host_key` |
### Parser changes
- `manifest_schema.py`: replace `"git"` with `"git-gate"` in `BOTTLE_KEYS`
and `AGENT_KEYS_OPTIONAL`.
- `manifest.py`: replace `_parse_git_config` with `_parse_git_gate_config`
that validates both `user` and `repos` subkeys. Update `Bottle.from_dict`
and `Agent.from_dict` to call it for the `"git-gate"` key.
- `Agent.from_dict` continues to reject `repos` at the agent level with a
clear error.
- Remove `from_remote_dict` and update `GitEntry._from_object` to accept the
new field names. Internal dataclass field names (`UpstreamUser`, etc.) are
unchanged — they are internal plumbing, not user-facing.
- Any existing `"git"` key raises a targeted error:
```
bottle 'dev' uses 'git' which has been replaced by 'git-gate' (PRD 0047).
Move git.user → git-gate.user and git.remotes → git-gate.repos.
```
## Testing Strategy
Run:
```
python3 -m unittest discover -s tests/unit
```
Test files to update:
- `tests/unit/test_manifest_git.py` — rewrite fixtures and assertions to use
`git-gate.repos` / lowercase fields. Cover: minimal entry, optional
`host_key`, missing `url`, missing `identity`, unknown key, IP-literal
upstreams, duplicate name rejection, old `git.remotes` and bare `git` key
both rejected.
- `tests/unit/test_manifest_git_user.py` and
`tests/unit/test_manifest_agent_git_user.py` — update fixtures to use
`git-gate.user` at both bottle and agent level.
## Open Questions
None.
+1 -1
View File
@@ -5,7 +5,7 @@ model: opus
bottle: dev
skills:
- init-prd
git:
git-gate:
user:
name: implementer-bot
email: eric+implementer@dideric.is
+11 -13
View File
@@ -38,23 +38,21 @@ def fixture_with_egress_dict() -> dict[str, Any]:
def fixture_with_git_dict() -> dict[str, Any]:
"""Bottle declares a git-gate upstream. JSON shape."""
"""Bottle declares git-gate upstreams. JSON shape."""
return {
"bottles": {
"dev": {
"git": {
"remotes": {
"gitea.dideric.is": {
"Name": "bot-bottle",
"Upstream": "ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git",
"IdentityFile": "/dev/null",
"KnownHostKey": "ssh-ed25519 AAAA...",
"git-gate": {
"repos": {
"bot-bottle": {
"url": "ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git",
"identity": "/dev/null",
"host_key": "ssh-ed25519 AAAA...",
},
"github.com": {
"Name": "foo",
"Upstream": "ssh://git@github.com/didericis/foo.git",
"IdentityFile": "/dev/null",
"KnownHostKey": "ssh-ed25519 BBBB...",
"foo": {
"url": "ssh://git@github.com/didericis/foo.git",
"identity": "/dev/null",
"host_key": "ssh-ed25519 BBBB...",
},
},
}
+4 -5
View File
@@ -49,11 +49,10 @@ def _manifest(*, supervise: bool, with_git: bool, with_egress: bool) -> Manifest
if supervise:
bottle["supervise"] = True
if with_git:
bottle["git"] = {"remotes": {
"example.com": {
"Name": "upstream",
"Upstream": "ssh://git@example.com:22/x/y.git",
"IdentityFile": "/etc/hostname", # any existing file
bottle["git-gate"] = {"repos": {
"upstream": {
"url": "ssh://git@example.com:22/x/y.git",
"identity": "/etc/hostname", # any existing file
},
}}
if with_egress:
+145
View File
@@ -0,0 +1,145 @@
"""Unit: Docker launch teardown warning on ExitStack failure (issue #156).
When a callback registered in the ExitStack raises during teardown,
the teardown function must emit a WARNING-level message that includes
the container name and operation type, rather than silently discarding
the exception.
"""
from __future__ import annotations
import contextlib
import io
import tempfile
import unittest
from pathlib import Path
from unittest import mock
from bot_bottle.agent_provider import AgentProvisionPlan
from bot_bottle.backend import BottleSpec
from bot_bottle.backend.docker import launch as launch_mod
from bot_bottle.backend.docker.bottle_plan import DockerBottlePlan
from bot_bottle.egress import EgressPlan
from bot_bottle.git_gate import GitGatePlan
from bot_bottle.manifest import Manifest
from bot_bottle.pipelock import PipelockProxyPlan
from bot_bottle.workspace import workspace_plan
def _manifest() -> Manifest:
return Manifest.from_json_obj({
"bottles": {"dev": {}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
def _plan(tmp: str) -> DockerBottlePlan:
stage = Path(tmp)
manifest = _manifest()
spec = BottleSpec(
manifest=manifest,
agent_name="demo",
copy_cwd=False,
user_cwd=tmp,
identity="test-teardown-00001",
)
return DockerBottlePlan(
spec=spec,
stage_dir=stage,
git_gate_plan=GitGatePlan(
slug="test-teardown-00001",
entrypoint_script=stage / "entrypoint.sh",
hook_script=stage / "hook.sh",
access_hook_script=stage / "access-hook.sh",
upstreams=(),
),
egress_plan=EgressPlan(
slug="test-teardown-00001",
routes_path=stage / "egress.yaml",
routes=(),
token_env_map={},
),
supervise_plan=None,
agent_provision=AgentProvisionPlan(
template="claude",
command="claude",
prompt_mode="append_file",
image="",
dockerfile="",
guest_env={},
),
workspace_plan=workspace_plan(spec, guest_home="/home/node"),
slug="test-teardown-00001",
container_name="bot-bottle-test-teardown-abc",
container_name_pinned=False,
image="bot-bottle-claude:latest",
derived_image="",
runtime_image="bot-bottle-claude:latest",
dockerfile_path="",
env_file=stage / "env",
forwarded_env={},
prompt_file=stage / "prompt.txt",
proxy_plan=PipelockProxyPlan(
yaml_path=stage / "pipelock.yaml",
slug="test-teardown-00001",
),
use_runsc=False,
)
class TestTeardownWarning(unittest.TestCase):
def setUp(self) -> None:
self._tmp = tempfile.mkdtemp(prefix="docker-launch-teardown-test.")
def tearDown(self) -> None:
import shutil
shutil.rmtree(self._tmp, ignore_errors=True)
def test_teardown_failure_emits_warning_with_container_and_operation(self):
plan = _plan(self._tmp)
buf = io.StringIO()
with mock.patch.object(launch_mod.docker_mod, "build_image"), \
mock.patch.object(
launch_mod, "pipelock_tls_init",
return_value=(Path("/ca.crt"), Path("/ca.key")),
), \
mock.patch.object(
launch_mod, "egress_tls_init",
return_value=(Path("/egress_ca"), Path("/egress_cert")),
), \
mock.patch.object(
launch_mod.network_mod, "network_name_for_slug",
return_value="bb-internal-test",
), \
mock.patch.object(
launch_mod.network_mod, "network_egress_name_for_slug",
return_value="bb-egress-test",
), \
mock.patch.object(
launch_mod, "bottle_plan_to_compose",
return_value={"services": {"agent": {}}},
), \
mock.patch.object(
launch_mod, "write_compose_file",
return_value=Path("/tmp/compose.yml"),
), \
mock.patch.object(launch_mod, "compose_up"), \
mock.patch.object(launch_mod, "compose_dump_logs"), \
mock.patch.object(
launch_mod, "compose_down",
side_effect=RuntimeError("network remove failed"),
), \
contextlib.redirect_stderr(buf):
provision = mock.Mock(return_value=None)
with launch_mod.launch(plan, provision=provision):
pass
output = buf.getvalue()
self.assertIn("bot-bottle: warning:", output)
self.assertIn("bot-bottle-test-teardown-abc", output)
self.assertIn("compose-down", output)
if __name__ == "__main__":
unittest.main()
+1 -1
View File
@@ -30,7 +30,7 @@ def _plan(*, git_user: dict | None = None,
stage_dir: Path | None = None) -> DockerBottlePlan:
bottle_json: dict = {}
if git_user is not None:
bottle_json["git"] = {"user": git_user}
bottle_json["git-gate"] = {"user": git_user}
manifest = Manifest.from_json_obj({
"bottles": {"dev": bottle_json},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
+50 -12
View File
@@ -76,14 +76,18 @@ class TestEntrypointRender(unittest.TestCase):
)
script = git_gate_render_entrypoint(ups)
self.assertIn("#!/bin/sh", script)
self.assertIn(
"init_repo 'bot-bottle' "
"'ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git'",
script,
# shlex.quote leaves safe strings unquoted; verify via token parse.
import shlex as _shlex
lines_with_init = [l for l in script.splitlines() if l.startswith("init_repo ")]
self.assertEqual(2, len(lines_with_init))
self.assertEqual(
["init_repo", "bot-bottle",
"ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git"],
_shlex.split(lines_with_init[0]),
)
self.assertIn(
"init_repo 'foo' 'ssh://git@github.com/didericis/foo.git'",
script,
self.assertEqual(
["init_repo", "foo", "ssh://git@github.com/didericis/foo.git"],
_shlex.split(lines_with_init[1]),
)
# Daemon line is what keeps PID 1 alive.
self.assertIn("exec git daemon", script)
@@ -108,6 +112,41 @@ class TestEntrypointRender(unittest.TestCase):
self.assertNotIn("init_repo '", script)
self.assertIn("exec git daemon", script)
def test_single_quote_in_upstream_url_is_escaped(self):
ups = (GitGateUpstream(
name="myrepo",
upstream_url="ssh://git@host/path'with'quotes.git",
upstream_host="host",
upstream_port="22",
identity_file="/key",
known_host_key="",
),)
script = git_gate_render_entrypoint(ups)
self.assertNotIn(
"init_repo 'myrepo' 'ssh://git@host/path'with'quotes.git'",
script,
)
self.assertIn("init_repo", script)
self.assertIn("path", script)
def test_space_and_semicolon_in_upstream_url_are_escaped(self):
import shlex as _shlex
raw_url = "ssh://git@host/path with spaces;evil.git"
ups = (GitGateUpstream(
name="myrepo",
upstream_url=raw_url,
upstream_host="host",
upstream_port="22",
identity_file="/key",
known_host_key="",
),)
script = git_gate_render_entrypoint(ups)
line = next(l for l in script.splitlines() if l.startswith("init_repo "))
tokens = _shlex.split(line)
self.assertEqual(3, len(tokens))
self.assertEqual("myrepo", tokens[1])
self.assertEqual(raw_url, tokens[2])
class TestHookRender(unittest.TestCase):
def test_pre_receive_hook_has_two_phases(self):
@@ -220,11 +259,10 @@ class TestPrepare(unittest.TestCase):
def test_prepare_skips_known_hosts_file_when_key_missing(self):
manifest = Manifest.from_json_obj({
"bottles": {"dev": {"git": {"remotes": {
"github.com": {
"Name": "foo",
"Upstream": "ssh://git@github.com/didericis/foo.git",
"IdentityFile": "/dev/null",
"bottles": {"dev": {"git-gate": {"repos": {
"foo": {
"url": "ssh://git@github.com/didericis/foo.git",
"identity": "/dev/null",
},
}}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
+27 -33
View File
@@ -1,14 +1,14 @@
"""Unit: agent-level git.user overlay + provenance (PRD 0027, issue #94).
"""Unit: agent-level git-gate.user overlay + provenance (PRD 0027, PRD 0047).
An agent file may declare `git.user` (name/email). At
An agent file may declare `git-gate.user` (name/email). At
`Manifest.bottle_for()` it overlays the referenced bottle's
`git.user` per-field, agent-wins-on-non-empty. `git.remotes` is
`git-gate.user` per-field, agent-wins-on-non-empty. `git-gate.repos` is
rejected on agents. `Manifest.git_identity_summary()` reports the
effective identity with per-field `(agent)`/`(bottle)` provenance.
The `from_json_obj` path drives `Agent.from_dict` + `bottle_for`;
a temp-dir case locks the md loader (the `_AGENT_KEYS` allow + the
`git` threading into `agent_dict`)."""
`git-gate` threading into `agent_dict`)."""
from __future__ import annotations
@@ -34,10 +34,10 @@ def _error_message(callable_, *args, **kwargs) -> str:
def _manifest(*, bottle_user=None, agent_git=None) -> Manifest:
bottle: dict = {}
if bottle_user is not None:
bottle = {"git": {"user": bottle_user}}
bottle = {"git-gate": {"user": bottle_user}}
agent: dict = {"skills": [], "prompt": "", "bottle": "dev"}
if agent_git is not None:
agent["git"] = agent_git
agent["git-gate"] = agent_git
return Manifest.from_json_obj({
"bottles": {"dev": bottle},
"agents": {"impl": agent},
@@ -71,7 +71,6 @@ class TestAgentGitUserOverlay(unittest.TestCase):
def test_agent_identity_with_bottle_declaring_none(self):
m = _manifest(agent_git={"user": {"name": "a", "email": "a@b"}})
# The underlying bottle declares no identity; the merged one does.
self.assertTrue(m.bottles["dev"].git_user.is_empty())
self.assertFalse(m.bottle_for("impl").git_user.is_empty())
@@ -82,14 +81,10 @@ class TestAgentGitUserOverlay(unittest.TestCase):
self.assertEqual("b@c", u.email)
def test_bottle_for_returns_same_instance_when_no_overlay(self):
# No agent git.user → no replace(); the cached Bottle is
# returned as-is (identity check guards against churn).
m = _manifest(bottle_user={"name": "B"})
self.assertIs(m.bottles["dev"], m.bottle_for("impl"))
def test_bottle_for_returns_same_instance_when_overlay_is_noop(self):
# Agent restates exactly what the bottle already has → merged
# == bottle.git_user → same instance, no replace().
m = _manifest(
bottle_user={"name": "B", "email": "b@c"},
agent_git={"user": {"name": "B", "email": "b@c"}},
@@ -101,11 +96,11 @@ class TestAgentGitUserOverlay(unittest.TestCase):
"bottles": {"dev": {
"env": {"FOO": "bar"},
"supervise": True,
"git": {"user": {"name": "B"}},
"git-gate": {"user": {"name": "B"}},
}},
"agents": {"impl": {
"bottle": "dev", "skills": [], "prompt": "",
"git": {"user": {"name": "a"}},
"git-gate": {"user": {"name": "a"}},
}},
})
b = m.bottle_for("impl")
@@ -115,11 +110,11 @@ class TestAgentGitUserOverlay(unittest.TestCase):
class TestAgentGitUserRejections(unittest.TestCase):
def test_agent_remotes_dies_bottle_only(self):
def test_agent_repos_dies_bottle_only(self):
msg = _error_message(_manifest, agent_git={
"remotes": {"h": {"Name": "r", "Upstream": "ssh://x/y.git"}},
"repos": {"r": {"url": "ssh://git@x/y.git", "identity": "/dev/null"}},
})
self.assertIn("git.remotes", msg)
self.assertIn("git-gate.repos", msg)
self.assertIn("bottle-only", msg)
def test_agent_unknown_git_subkey_dies(self):
@@ -127,7 +122,6 @@ class TestAgentGitUserRejections(unittest.TestCase):
self.assertIn("not allowed at the agent level", msg)
def test_agent_git_user_both_empty_dies(self):
# Reuses GitUser.from_dict validation.
msg = _error_message(_manifest, agent_git={"user": {"name": "", "email": ""}})
self.assertIn("neither name nor email", msg)
@@ -164,7 +158,7 @@ class TestGitIdentitySummary(unittest.TestCase):
_BOTTLE_DEV = """
---
git:
git-gate:
user:
name: bottle-name
email: bottle@example.com
@@ -176,7 +170,7 @@ _BOTTLE_DEV = """
_AGENT_WITH_GIT = """
---
bottle: dev
git:
git-gate:
user:
name: agent-name
---
@@ -184,14 +178,14 @@ _AGENT_WITH_GIT = """
impl agent.
"""
_AGENT_WITH_REMOTES = """
_AGENT_WITH_REPOS = """
---
bottle: dev
git:
remotes:
h:
Name: r
Upstream: ssh://x/y.git
git-gate:
repos:
r:
url: ssh://git@x/y.git
identity: /dev/null
---
bad agent.
@@ -199,9 +193,9 @@ _AGENT_WITH_REMOTES = """
class TestAgentGitUserMdLoader(unittest.TestCase):
"""Locks the md path: `git` is an accepted agent key and threads
into the parsed Agent (not rejected as an unknown frontmatter
key), and agent `git.remotes` dies through the same loader."""
"""Locks the md path: `git-gate` is an accepted agent key and threads
into the parsed Agent (not rejected as an unknown frontmatter key),
and agent `git-gate.repos` dies through the same loader."""
def setUp(self) -> None:
self.home = Path(tempfile.mkdtemp(prefix="cb-home-"))
@@ -225,18 +219,18 @@ class TestAgentGitUserMdLoader(unittest.TestCase):
self._write("agents/impl.md", _AGENT_WITH_GIT)
m = Manifest.resolve(str(self.home))
u = m.bottle_for("impl").git_user
self.assertEqual("agent-name", u.name) # agent wins
self.assertEqual("bottle@example.com", u.email) # bottle falls through
self.assertEqual("agent-name", u.name)
self.assertEqual("bottle@example.com", u.email)
self.assertEqual(
"name=agent-name (agent), email=bottle@example.com (bottle)",
m.git_identity_summary("impl"),
)
def test_md_agent_remotes_dies(self):
def test_md_agent_repos_dies(self):
self._write("bottles/dev.md", _BOTTLE_DEV)
self._write("agents/impl.md", _AGENT_WITH_REMOTES)
self._write("agents/impl.md", _AGENT_WITH_REPOS)
msg = _error_message(Manifest.resolve, str(self.home))
self.assertIn("git.remotes", msg)
self.assertIn("git-gate.repos", msg)
self.assertIn("bottle-only", msg)
+30 -45
View File
@@ -113,42 +113,30 @@ class TestExtendsEnvMerge(unittest.TestCase):
class TestExtendsGitMerge(unittest.TestCase):
"""git.user overlays by field; git.remotes merges by upstream
"""git-gate.user overlays by field; git-gate.repos merges by upstream
host, with child entries replacing duplicate hosts."""
_GIT_ENTRY_A = {
"Name": "a",
"Upstream": "ssh://git@host-a/a.git",
"IdentityFile": "/dev/null",
}
_GIT_ENTRY_B = {
"Name": "b",
"Upstream": "ssh://git@host-b/b.git",
"IdentityFile": "/dev/null",
}
_GIT_ENTRY_A = {"url": "ssh://git@host-a/a.git", "identity": "/dev/null"}
_GIT_ENTRY_B = {"url": "ssh://git@host-b/b.git", "identity": "/dev/null"}
def test_child_git_remotes_merge_with_parent(self):
def test_child_git_repos_merge_with_parent(self):
m = _build(
base={"git": {"remotes": {"host-a": self._GIT_ENTRY_A}}},
base={"git-gate": {"repos": {"a": self._GIT_ENTRY_A}}},
child={
"extends": "base",
"git": {"remotes": {"host-b": self._GIT_ENTRY_B}},
"git-gate": {"repos": {"b": self._GIT_ENTRY_B}},
},
)
names = [e.Name for e in m.bottles["child"].git]
self.assertEqual(["a", "b"], names)
def test_child_git_remote_replaces_same_host(self):
replacement = {
"Name": "a2",
"Upstream": "ssh://git@host-a/replacement.git",
"IdentityFile": "/dev/null",
}
def test_child_git_repo_replaces_same_host(self):
replacement = {"url": "ssh://git@host-a/replacement.git", "identity": "/dev/null"}
m = _build(
base={"git": {"remotes": {"host-a": self._GIT_ENTRY_A}}},
base={"git-gate": {"repos": {"a": self._GIT_ENTRY_A}}},
child={
"extends": "base",
"git": {"remotes": {"host-a": replacement}},
"git-gate": {"repos": {"a2": replacement}},
},
)
entries = m.bottles["child"].git
@@ -156,30 +144,30 @@ class TestExtendsGitMerge(unittest.TestCase):
self.assertEqual("a2", entries[0].Name)
self.assertEqual("replacement.git", entries[0].UpstreamPath)
def test_child_omits_git_inherits_full_list(self):
def test_child_omits_git_gate_inherits_full_list(self):
m = _build(
base={"git": {"remotes": {
"host-a": self._GIT_ENTRY_A,
"host-b": self._GIT_ENTRY_B,
base={"git-gate": {"repos": {
"a": self._GIT_ENTRY_A,
"b": self._GIT_ENTRY_B,
}}},
child={"extends": "base"},
)
names = [e.Name for e in m.bottles["child"].git]
self.assertEqual(["a", "b"], names)
def test_child_explicit_empty_git_clears_parent(self):
# `git.remotes: {}` is the documented way to say "drop
# the parent's remotes" rather than "inherit them".
def test_child_explicit_empty_repos_clears_parent(self):
# `git-gate.repos: {}` is the documented way to say "drop
# the parent's repos" rather than "inherit them".
m = _build(
base={"git": {"remotes": {"host-a": self._GIT_ENTRY_A}}},
child={"extends": "base", "git": {"remotes": {}}},
base={"git-gate": {"repos": {"a": self._GIT_ENTRY_A}}},
child={"extends": "base", "git-gate": {"repos": {}}},
)
self.assertEqual((), m.bottles["child"].git)
def test_child_git_user_inherits_parent_remotes(self):
def test_child_git_user_inherits_parent_repos(self):
m = _build(
base={"git": {"remotes": {"host-a": self._GIT_ENTRY_A}}},
child={"extends": "base", "git": {"user": {"name": "Child"}}},
base={"git-gate": {"repos": {"a": self._GIT_ENTRY_A}}},
child={"extends": "base", "git-gate": {"user": {"name": "Child"}}},
)
self.assertEqual(["a"], [e.Name for e in m.bottles["child"].git])
self.assertEqual("Child", m.bottles["child"].git_user.name)
@@ -209,12 +197,12 @@ class TestExtendsListsFullReplace(unittest.TestCase):
class TestExtendsGitUserOverlay(unittest.TestCase):
"""git.user: per-field overlay. Each non-empty field on child
"""git-gate.user: per-field overlay. Each non-empty field on child
wins; empties fall through to parent."""
def test_parent_full_child_omits(self):
m = _build(
base={"git": {"user": {"name": "Parent", "email": "p@x"}}},
base={"git-gate": {"user": {"name": "Parent", "email": "p@x"}}},
child={"extends": "base"},
)
u = m.bottles["child"].git_user
@@ -223,10 +211,10 @@ class TestExtendsGitUserOverlay(unittest.TestCase):
def test_child_overrides_both(self):
m = _build(
base={"git": {"user": {"name": "Parent", "email": "p@x"}}},
base={"git-gate": {"user": {"name": "Parent", "email": "p@x"}}},
child={
"extends": "base",
"git": {"user": {"name": "Child", "email": "c@x"}},
"git-gate": {"user": {"name": "Child", "email": "c@x"}},
},
)
u = m.bottles["child"].git_user
@@ -234,11 +222,9 @@ class TestExtendsGitUserOverlay(unittest.TestCase):
self.assertEqual("c@x", u.email)
def test_child_adds_email_inherits_name(self):
# Parent sets only name; child sets only email. Both end
# up populated on the child.
m = _build(
base={"git": {"user": {"name": "Parent"}}},
child={"extends": "base", "git": {"user": {"email": "c@x"}}},
base={"git-gate": {"user": {"name": "Parent"}}},
child={"extends": "base", "git-gate": {"user": {"email": "c@x"}}},
)
u = m.bottles["child"].git_user
self.assertEqual("Parent", u.name)
@@ -246,11 +232,10 @@ class TestExtendsGitUserOverlay(unittest.TestCase):
def test_child_overrides_only_email(self):
m = _build(
base={"git": {"user": {"name": "Parent", "email": "p@x"}}},
child={"extends": "base", "git": {"user": {"email": "c@x"}}},
base={"git-gate": {"user": {"name": "Parent", "email": "p@x"}}},
child={"extends": "base", "git-gate": {"user": {"email": "c@x"}}},
)
u = m.bottles["child"].git_user
# Child overrides email; name inherited from parent.
self.assertEqual("Parent", u.name)
self.assertEqual("c@x", u.email)
+181 -131
View File
@@ -1,39 +1,25 @@
"""Unit: Bottle.git manifest parsing + validation (PRD 0008)."""
"""Unit: git-gate.repos manifest parsing + validation (PRD 0047)."""
import unittest
from bot_bottle.manifest import ManifestError, Manifest
def _manifest(git_entries):
def _manifest(repos: dict) -> dict:
return {
"bottles": {"dev": {"git": {"remotes": {
_host_for(entry): entry for entry in git_entries
}}}},
"bottles": {"dev": {"git-gate": {"repos": repos}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}
def _host_for(entry):
upstream = entry.get("Upstream", "")
if "@a.example" in upstream:
return "a.example"
if "@b.example" in upstream:
return "b.example"
if "@github.com" in upstream:
return "github.com"
if "@gitea.dideric.is" in upstream:
return "gitea.dideric.is"
return "example.com"
class TestGitEntryParsing(unittest.TestCase):
def test_parses_minimal_entry(self):
m = Manifest.from_json_obj(_manifest([{
"Name": "bot-bottle",
"Upstream": "ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git",
"IdentityFile": "/dev/null",
}]))
m = Manifest.from_json_obj(_manifest({
"bot-bottle": {
"url": "ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git",
"identity": "/dev/null",
},
}))
entries = m.bottles["dev"].git
self.assertEqual(1, len(entries))
e = entries[0]
@@ -44,138 +30,145 @@ class TestGitEntryParsing(unittest.TestCase):
self.assertEqual("didericis/bot-bottle.git", e.UpstreamPath)
def test_default_port_is_22(self):
m = Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com/didericis/foo.git",
"IdentityFile": "/dev/null",
}]))
m = Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/didericis/foo.git",
"identity": "/dev/null",
},
}))
e = m.bottles["dev"].git[0]
self.assertEqual("22", e.UpstreamPort)
self.assertEqual("github.com", e.UpstreamHost)
def test_known_host_key_optional(self):
m = Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com/foo.git",
"IdentityFile": "/dev/null",
}]))
def test_host_key_optional(self):
m = Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
},
}))
self.assertEqual("", m.bottles["dev"].git[0].KnownHostKey)
def test_missing_name_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Upstream": "ssh://git@github.com/foo.git",
"IdentityFile": "/dev/null",
}]))
def test_host_key_stored(self):
m = Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
"host_key": "ssh-ed25519 AAAA",
},
}))
self.assertEqual("ssh-ed25519 AAAA", m.bottles["dev"].git[0].KnownHostKey)
def test_missing_upstream_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"IdentityFile": "/dev/null",
}]))
def test_repo_name_becomes_Name(self):
m = Manifest.from_json_obj(_manifest({
"my-repo": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
},
}))
self.assertEqual("my-repo", m.bottles["dev"].git[0].Name)
def test_missing_identity_file_dies(self):
def test_missing_url_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com/foo.git",
}]))
Manifest.from_json_obj(_manifest({
"foo": {"identity": "/dev/null"},
}))
def test_non_ssh_upstream_dies(self):
def test_missing_identity_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "https://github.com/didericis/foo.git",
"IdentityFile": "/dev/null",
}]))
Manifest.from_json_obj(_manifest({
"foo": {"url": "ssh://git@github.com/foo.git"},
}))
def test_scp_style_upstream_dies(self):
# SCP-style "git@host:path" is intentionally not supported in
# v1 — ssh:// only.
def test_unknown_key_in_entry_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "git@github.com:didericis/foo.git",
"IdentityFile": "/dev/null",
}]))
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
"IdentityFile": "/dev/null", # old PascalCase key
},
}))
def test_upstream_without_user_dies(self):
def test_non_ssh_url_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://github.com/foo.git",
"IdentityFile": "/dev/null",
}]))
Manifest.from_json_obj(_manifest({
"foo": {
"url": "https://github.com/didericis/foo.git",
"identity": "/dev/null",
},
}))
def test_upstream_without_path_dies(self):
def test_scp_style_url_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com",
"IdentityFile": "/dev/null",
}]))
Manifest.from_json_obj(_manifest({
"foo": {
"url": "git@github.com:didericis/foo.git",
"identity": "/dev/null",
},
}))
def test_url_without_user_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://github.com/foo.git",
"identity": "/dev/null",
},
}))
def test_url_without_path_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com",
"identity": "/dev/null",
},
}))
def test_non_numeric_port_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest([{
"Name": "foo",
"Upstream": "ssh://git@github.com:notaport/foo.git",
"IdentityFile": "/dev/null",
}]))
Manifest.from_json_obj(_manifest({
"foo": {
"url": "ssh://git@github.com:notaport/foo.git",
"identity": "/dev/null",
},
}))
def test_ip_literal_upstream(self):
m = Manifest.from_json_obj(_manifest({
"bot-bottle": {
"url": "ssh://git@100.78.141.42:30009/didericis/bot-bottle.git",
"identity": "/dev/null",
},
}))
e = m.bottles["dev"].git[0]
self.assertEqual("100.78.141.42", e.UpstreamHost)
self.assertEqual("30009", e.UpstreamPort)
self.assertEqual("bot-bottle", e.Name)
class TestGitEntryCrossValidation(unittest.TestCase):
def test_duplicate_name_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj({
"bottles": {"dev": {"git": {"remotes": {
"a.example": {
"Name": "foo",
"Upstream": "ssh://git@a.example/x.git",
"IdentityFile": "/dev/null",
},
"b.example": {
"Name": "foo",
"Upstream": "ssh://git@b.example/y.git",
"IdentityFile": "/dev/null",
},
}}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
def test_remote_key_must_match_upstream_host(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj({
"bottles": {"dev": {"git": {"remotes": {
"wrong.example": {
"Name": "foo",
"Upstream": "ssh://git@github.com/foo.git",
"IdentityFile": "/dev/null",
},
}}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
def test_remote_key_can_name_logical_host_for_ip_upstream(self):
def test_two_repos_different_hosts_both_parsed(self):
# Repo names come from dict keys; two distinct keys always produce
# two distinct entries (uniqueness is guaranteed at the YAML/dict level).
m = Manifest.from_json_obj({
"bottles": {"dev": {"git": {"remotes": {
"gitea.dideric.is": {
"Name": "bot-bottle",
"Upstream": "ssh://git@100.78.141.42:30009/didericis/bot-bottle.git",
"IdentityFile": "/dev/null",
"bottles": {"dev": {"git-gate": {"repos": {
"foo": {
"url": "ssh://git@a.example/x.git",
"identity": "/dev/null",
},
"bar": {
"url": "ssh://git@b.example/y.git",
"identity": "/dev/null",
},
}}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
e = m.bottles["dev"].git[0]
self.assertEqual("gitea.dideric.is", e.RemoteKey)
self.assertEqual("100.78.141.42", e.UpstreamHost)
self.assertEqual("30009", e.UpstreamPort)
names = {e.Name for e in m.bottles["dev"].git}
self.assertEqual({"foo", "bar"}, names)
def test_legacy_ssh_field_dies_with_hint(self):
# PRD 0009: bottle.ssh is removed; manifests carrying it must
# fail loudly with a hint pointing at bottle.git.
with self.assertRaises(ManifestError):
Manifest.from_json_obj({
"bottles": {
@@ -192,25 +185,82 @@ class TestGitEntryCrossValidation(unittest.TestCase):
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
def test_name_with_single_quote_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"o'reilly": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
},
}))
class TestEmptyGitField(unittest.TestCase):
def test_no_git_field_yields_empty_tuple(self):
def test_name_with_space_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"my repo": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
},
}))
def test_name_with_semicolon_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"foo;bar": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
},
}))
def test_name_with_dollar_dies(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj(_manifest({
"foo$bar": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
},
}))
def test_valid_name_with_dots_and_hyphens_accepted(self):
m = Manifest.from_json_obj(_manifest({
"my.repo-name_1": {
"url": "ssh://git@github.com/foo.git",
"identity": "/dev/null",
},
}))
self.assertEqual("my.repo-name_1", m.bottles["dev"].git[0].Name)
def test_legacy_git_key_dies_with_hint(self):
msg = ""
try:
Manifest.from_json_obj({
"bottles": {"dev": {"git": {"remotes": {}}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
except ManifestError as e:
msg = str(e)
self.assertIn("git-gate", msg)
self.assertIn("PRD 0047", msg)
class TestEmptyGitGateField(unittest.TestCase):
def test_no_git_gate_field_yields_empty_tuple(self):
m = Manifest.from_json_obj({
"bottles": {"dev": {}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
self.assertEqual((), m.bottles["dev"].git)
def test_git_object_type_required(self):
def test_git_gate_object_type_required(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj({
"bottles": {"dev": {"git": "not-a-list"}},
"bottles": {"dev": {"git-gate": "not-a-dict"}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
def test_empty_remotes_yields_empty_tuple(self):
def test_empty_repos_yields_empty_tuple(self):
m = Manifest.from_json_obj({
"bottles": {"dev": {"git": {"remotes": {}}}},
"bottles": {"dev": {"git-gate": {"repos": {}}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
self.assertEqual((), m.bottles["dev"].git)
+5 -5
View File
@@ -1,4 +1,4 @@
"""Unit: Bottle git.user manifest parsing + validation (issue #86)."""
"""Unit: Bottle git-gate.user manifest parsing + validation (issue #86, PRD 0047)."""
import unittest
@@ -16,7 +16,7 @@ def _error_message(callable_, *args, **kwargs) -> str:
def _manifest(git_user):
return {
"bottles": {"dev": {"git": {"user": git_user}}},
"bottles": {"dev": {"git-gate": {"user": git_user}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}
@@ -75,13 +75,13 @@ class TestGitUserParsing(unittest.TestCase):
msg = _error_message(
Manifest.from_json_obj, _manifest({"name": 42}),
)
self.assertIn("git.user.name must be a string", msg)
self.assertIn("git-gate.user.name must be a string", msg)
def test_non_string_email_dies(self):
msg = _error_message(
Manifest.from_json_obj, _manifest({"email": ["x@y.z"]}),
)
self.assertIn("git.user.email must be a string", msg)
self.assertIn("git-gate.user.email must be a string", msg)
def test_legacy_top_level_git_user_dies(self):
msg = _error_message(
@@ -92,7 +92,7 @@ class TestGitUserParsing(unittest.TestCase):
},
)
self.assertIn("git_user", msg)
self.assertIn("git.user", msg)
self.assertIn("git-gate.user", msg)
class TestGitUserDirect(unittest.TestCase):
+8 -11
View File
@@ -69,13 +69,14 @@ class TestGitGateGitconfigRender(unittest.TestCase):
'[url "http://127.0.0.16:57001/bot-bottle.git"]', out,
)
def test_ip_upstream_also_rewrites_logical_remote_key(self):
def test_ip_upstream_emits_single_insteadof(self):
# In the new format the dict key is the repo name, not a host
# alias, so there is only one insteadOf rule — for the IP URL.
m = Manifest.from_json_obj({
"bottles": {"dev": {"git": {"remotes": {
"gitea.dideric.is": {
"Name": "bot-bottle",
"Upstream": "ssh://git@100.78.141.42:30009/didericis/bot-bottle.git",
"IdentityFile": "/dev/null",
"bottles": {"dev": {"git-gate": {"repos": {
"bot-bottle": {
"url": "ssh://git@100.78.141.42:30009/didericis/bot-bottle.git",
"identity": "/dev/null",
},
}}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
@@ -86,11 +87,7 @@ class TestGitGateGitconfigRender(unittest.TestCase):
"ssh://git@100.78.141.42:30009/didericis/bot-bottle.git",
out,
)
self.assertIn(
"\tinsteadOf = "
"ssh://git@gitea.dideric.is:30009/didericis/bot-bottle.git",
out,
)
self.assertNotIn("gitea.dideric.is", out)
if __name__ == "__main__":
+8 -14
View File
@@ -42,11 +42,6 @@ from bot_bottle.supervise import SupervisePlan
from bot_bottle.workspace import workspace_plan
def _remote_host(g: GitEntry) -> str:
if g.UpstreamHost:
return g.UpstreamHost
return g.Upstream.split("@", 1)[1].split("/", 1)[0].split(":", 1)[0]
def _plan(
*,
@@ -69,20 +64,19 @@ def _plan(
guest_env: dict[str, str] | None = None,
) -> SmolmachinesBottlePlan:
bottle_json: dict = {}
git_json: dict = {}
git_gate_json: dict = {}
if git:
git_json["remotes"] = {
_remote_host(g): {
"Name": g.Name,
"Upstream": g.Upstream,
"IdentityFile": g.IdentityFile,
git_gate_json["repos"] = {
g.Name: {
"url": g.Upstream,
"identity": g.IdentityFile,
}
for g in git
}
if git_user is not None:
git_json["user"] = git_user
if git_json:
bottle_json["git"] = git_json
git_gate_json["user"] = git_user
if git_gate_json:
bottle_json["git-gate"] = git_gate_json
if supervise:
bottle_json["supervise"] = True
manifest = Manifest.from_json_obj({