refactor: address PR review feedback — de-privatize helpers and rename modules
test / unit (pull_request) Successful in 34s
test / integration (pull_request) Successful in 43s
test / unit (push) Successful in 34s
test / integration (push) Successful in 43s

- Rename _manifest_util.py → manifest_util.py (module isn't private)
- Rename _as_json_object → as_json_object, _parse_git_upstream → parse_git_upstream,
  _parse_git_gate_config → parse_git_gate_config,
  _validate_unique_git_names → validate_unique_git_names,
  _validate_egress_routes → validate_egress_routes (none are private at
  module boundary — underscore prefix was a carry-over from the old
  monolithic manifest.py where everything lived in one namespace)
- Move _is_ip_literal → util.is_ip_literal (generic, belongs in the
  top-level util module)
- Update all import sites across manifest_*.py, manifest_extends.py,
  manifest_schema.py; existing callers of manifest.py are unaffected

All 867 unit tests pass.
This commit was merged in pull request #163.
This commit is contained in:
2026-06-03 04:32:25 +00:00
committed by didericis
parent b9ab1263c2
commit 8c9d4fbc46
8 changed files with 46 additions and 48 deletions
+10 -13
View File
@@ -50,16 +50,16 @@ from dataclasses import dataclass, field, replace
from pathlib import Path
from typing import Mapping
from ._manifest_util import ManifestError, _as_json_object
from .manifest_util import ManifestError, as_json_object
from .manifest_agent import Agent, AgentProvider
from .manifest_egress import (
EGRESS_AUTH_SCHEMES,
EgressConfig,
EgressRoute,
PipelockRoutePolicy,
_validate_egress_routes,
validate_egress_routes,
)
from .manifest_git import GitEntry, GitUser, _parse_git_gate_config
from .manifest_git import GitEntry, GitUser, parse_git_gate_config
from .manifest_schema import BOTTLE_KEYS
# Re-export everything that callers currently import from this module.
@@ -75,9 +75,6 @@ __all__ = [
"Agent",
"Bottle",
"Manifest",
# private helpers used by manifest_extends / manifest_loader
"_as_json_object",
"_validate_egress_routes",
]
@@ -86,10 +83,10 @@ def _empty_str_dict() -> dict[str, str]:
def _section_dict(value: object, label: str) -> dict[str, object]:
"""Like _as_json_object but treats absent/null as an empty section."""
"""Like as_json_object but treats absent/null as an empty section."""
if value is None:
return {}
return _as_json_object(value, label)
return as_json_object(value, label)
@dataclass(frozen=True)
@@ -114,7 +111,7 @@ class Bottle:
@classmethod
def from_dict(cls, name: str, raw: object) -> "Bottle":
d = _as_json_object(raw, f"bottle '{name}'")
d = as_json_object(raw, f"bottle '{name}'")
if "runtime" in d:
raise ManifestError(
@@ -156,7 +153,7 @@ class Bottle:
env: dict[str, str] = {}
env_raw = d.get("env")
if env_raw is not None:
env_dict = _as_json_object(env_raw, f"bottle '{name}' env")
env_dict = as_json_object(env_raw, f"bottle '{name}' env")
for var, value in env_dict.items():
if not isinstance(value, str):
raise ManifestError(
@@ -169,7 +166,7 @@ class Bottle:
git_user = GitUser()
git_raw = d.get("git-gate")
if git_raw is not None:
git, git_user = _parse_git_gate_config(name, git_raw)
git, git_user = parse_git_gate_config(name, git_raw)
agent_provider = (
AgentProvider.from_dict(name, d["agent_provider"])
@@ -298,7 +295,7 @@ class Manifest:
@classmethod
def from_json_obj(cls, obj: object) -> "Manifest":
"""Validate and build a Manifest from a raw JSON-like dict."""
d = _as_json_object(obj, "manifest")
d = as_json_object(obj, "manifest")
raw_bottles_obj = _section_dict(d.get("bottles"), "manifest 'bottles'")
raw_agents = _section_dict(d.get("agents"), "manifest 'agents'")
@@ -307,7 +304,7 @@ class Manifest:
# consistently with the md-loader path.
raw_bottles: dict[str, dict[str, object]] = {}
for n, b in raw_bottles_obj.items():
raw_bottles[n] = _as_json_object(b, f"bottle '{n}'")
raw_bottles[n] = as_json_object(b, f"bottle '{n}'")
from .manifest_extends import resolve_bottles
bottles = resolve_bottles(raw_bottles)
+4 -4
View File
@@ -6,7 +6,7 @@ from dataclasses import dataclass
from typing import cast
from .agent_provider import PROVIDER_TEMPLATES
from ._manifest_util import ManifestError, _as_json_object
from .manifest_util import ManifestError, as_json_object
from .manifest_git import GitUser
from .manifest_schema import AGENT_MODEL_KEYS
@@ -36,7 +36,7 @@ class AgentProvider:
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "AgentProvider":
d = _as_json_object(raw, f"bottle '{bottle_name}' agent_provider")
d = as_json_object(raw, f"bottle '{bottle_name}' agent_provider")
for k in d:
if k not in {"template", "dockerfile", "auth_token", "forward_host_credentials"}:
raise ManifestError(
@@ -103,7 +103,7 @@ class Agent:
@classmethod
def from_dict(cls, name: str, raw: object, bottle_names: set[str]) -> "Agent":
d = _as_json_object(raw, f"agent '{name}'")
d = as_json_object(raw, f"agent '{name}'")
unknown = set(d.keys()) - AGENT_MODEL_KEYS
if unknown:
allowed = ", ".join(sorted(AGENT_MODEL_KEYS))
@@ -151,7 +151,7 @@ class Agent:
git_user = GitUser()
git_raw = d.get("git-gate")
if git_raw is not None:
gd = _as_json_object(git_raw, f"agent '{name}' git-gate")
gd = as_json_object(git_raw, f"agent '{name}' git-gate")
for k in gd.keys():
if k != "user":
raise ManifestError(
+7 -15
View File
@@ -6,7 +6,7 @@ import ipaddress
from dataclasses import dataclass, field
from typing import cast
from ._manifest_util import ManifestError, _as_json_object
from .manifest_util import ManifestError, as_json_object
# Auth schemes for the egress route's optional `auth` block.
@@ -15,15 +15,7 @@ from ._manifest_util import ManifestError, _as_json_object
EGRESS_AUTH_SCHEMES = ("Bearer", "token")
def _is_ip_literal(value: str) -> bool:
try:
ipaddress.ip_address(value)
except ValueError:
return False
return True
def _validate_egress_routes(
def validate_egress_routes(
bottle_name: str,
routes: tuple[EgressRoute, ...],
) -> None:
@@ -68,7 +60,7 @@ class PipelockRoutePolicy:
cls, bottle_name: str, idx: int, raw: object,
) -> "PipelockRoutePolicy":
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
d = _as_json_object(raw, label)
d = as_json_object(raw, label)
for k in d:
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
raise ManifestError(
@@ -145,7 +137,7 @@ class EgressRoute:
@classmethod
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "EgressRoute":
label = f"bottle '{bottle_name}' egress.routes[{idx}]"
d = _as_json_object(raw, label)
d = as_json_object(raw, label)
host = d.get("host")
if not isinstance(host, str) or not host:
raise ManifestError(f"{label} missing required string field 'host'")
@@ -178,7 +170,7 @@ class EgressRoute:
token_ref = ""
if "auth" in d:
auth_raw = d.get("auth")
auth_d = _as_json_object(auth_raw, f"{label} auth")
auth_d = as_json_object(auth_raw, f"{label} auth")
if not auth_d:
raise ManifestError(
f"{label} auth is empty ({{}}); omit the 'auth' key "
@@ -270,7 +262,7 @@ class EgressConfig:
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "EgressConfig":
d = _as_json_object(raw, f"bottle '{bottle_name}' egress")
d = as_json_object(raw, f"bottle '{bottle_name}' egress")
routes_raw = d.get("routes")
routes: tuple[EgressRoute, ...] = ()
if routes_raw is not None:
@@ -284,7 +276,7 @@ class EgressConfig:
EgressRoute.from_dict(bottle_name, i, entry)
for i, entry in enumerate(routes_list)
)
_validate_egress_routes(bottle_name, routes)
validate_egress_routes(bottle_name, routes)
for k in d:
if k != "routes":
raise ManifestError(
+4 -4
View File
@@ -72,7 +72,7 @@ def _merge_bottles(
) -> Bottle:
"""Apply PRD 0025 merge rules."""
from .manifest import Bottle, GitUser
from .manifest_egress import _validate_egress_routes
from .manifest_egress import validate_egress_routes
# Parse the child's declared fields into a Bottle (with the
# usual defaults for anything missing). Validation runs the same
@@ -110,7 +110,7 @@ def _merge_bottles(
merged_supervise = (
child.supervise if "supervise" in child_raw else parent.supervise
)
_validate_egress_routes(name, merged_egress.routes)
validate_egress_routes(name, merged_egress.routes)
return Bottle(
env=merged_env,
@@ -123,12 +123,12 @@ def _merge_bottles(
def _child_declares_git_gate_repos(child_raw: dict[str, object]) -> bool:
from ._manifest_util import _as_json_object
from .manifest_util import as_json_object
git_raw = child_raw.get("git-gate")
if git_raw is None:
return False
git_obj = _as_json_object(git_raw, "child git-gate")
git_obj = as_json_object(git_raw, "child git-gate")
return "repos" in git_obj
+10 -10
View File
@@ -4,7 +4,7 @@ from __future__ import annotations
from dataclasses import dataclass
from ._manifest_util import ManifestError, _as_json_object
from .manifest_util import ManifestError, as_json_object
def _opt_str(value: object, label: str) -> str:
@@ -15,7 +15,7 @@ def _opt_str(value: object, label: str) -> str:
return value
def _parse_git_upstream(url: str, label: str) -> tuple[str, str, str, str]:
def parse_git_upstream(url: str, label: str) -> tuple[str, str, str, str]:
"""Parse `ssh://user@host[:port]/path` into (user, host, port, path).
Dies if `url` doesn't match the ssh:// shape v1 supports. Default
port is 22 (matches OpenSSH)."""
@@ -44,7 +44,7 @@ def _parse_git_upstream(url: str, label: str) -> tuple[str, str, str, str]:
return (user, host, port, path)
def _validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None:
def validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None:
seen: dict[str, None] = {}
for g in git:
if g.Name in seen:
@@ -95,7 +95,7 @@ class GitEntry:
f"bottle '{bottle_name}' git-gate.repos has an empty key"
)
label = f"git-gate.repos[{repo_name!r}]"
d = _as_json_object(raw, f"bottle '{bottle_name}' {label}")
d = as_json_object(raw, f"bottle '{bottle_name}' {label}")
for k in d:
if k not in {"url", "identity", "host_key"}:
raise ManifestError(
@@ -116,7 +116,7 @@ class GitEntry:
d.get("host_key"),
f"bottle '{bottle_name}' {label} host_key",
)
user, host, port, path = _parse_git_upstream(
user, host, port, path = parse_git_upstream(
upstream, f"bottle '{bottle_name}' {label} url"
)
return cls(
@@ -149,7 +149,7 @@ class GitUser:
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "GitUser":
d = _as_json_object(raw, f"bottle '{bottle_name}' git-gate.user")
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate.user")
for k in d.keys():
if k not in {"name", "email"}:
raise ManifestError(
@@ -180,11 +180,11 @@ class GitUser:
return not self.name and not self.email
def _parse_git_gate_config(
def parse_git_gate_config(
bottle_name: str,
raw: object,
) -> tuple[tuple[GitEntry, ...], GitUser]:
d = _as_json_object(raw, f"bottle '{bottle_name}' git-gate")
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate")
for k in d.keys():
if k not in {"user", "repos"}:
raise ManifestError(
@@ -201,11 +201,11 @@ def _parse_git_gate_config(
git: tuple[GitEntry, ...] = ()
repos_raw = d.get("repos")
if repos_raw is not None:
repos = _as_json_object(repos_raw, f"bottle '{bottle_name}' git-gate.repos")
repos = as_json_object(repos_raw, f"bottle '{bottle_name}' git-gate.repos")
git = tuple(
GitEntry.from_repos_entry(bottle_name, name, entry)
for name, entry in repos.items()
)
_validate_unique_git_names(bottle_name, git)
validate_unique_git_names(bottle_name, git)
return git, git_user
+1 -1
View File
@@ -58,7 +58,7 @@ def _validate_frontmatter_keys(
keys: object,
allowed_keys: frozenset[str],
) -> None:
from ._manifest_util import ManifestError
from .manifest_util import ManifestError
key_set = set(keys)
unknown = key_set - allowed_keys
@@ -9,7 +9,7 @@ class ManifestError(Exception):
"""A manifest file (or the manifest tree) is invalid."""
def _as_json_object(value: object, label: str) -> dict[str, object]:
def as_json_object(value: object, label: str) -> dict[str, object]:
"""Assert that `value` is a JSON object (str-keyed dict) and return
a view typed as `dict[str, object]` so downstream `.get(...)` calls
have a typed surface."""
+9
View File
@@ -5,9 +5,18 @@ level deeper, under their backend package."""
from __future__ import annotations
import ipaddress
import os
def is_ip_literal(value: str) -> bool:
try:
ipaddress.ip_address(value)
except ValueError:
return False
return True
def expand_tilde(path: str) -> str:
"""Expand a leading '~' to $HOME. Leaves paths without a leading
tilde unchanged. Falls back to the empty string if $HOME is unset