b9ab1263c2
Closes #157. Distributes the 1,026-line manifest.py across four focused modules: - _manifest_util.py: ManifestError + _as_json_object (shared base) - manifest_git.py: GitEntry, GitUser, git-gate config helpers - manifest_egress.py: EgressRoute, EgressConfig, PipelockRoutePolicy - manifest_agent.py: AgentProvider, Agent manifest.py is now the residual orchestration layer: Bottle, Manifest, and re-exports of all public names so existing callers are unaffected. All 867 unit tests pass.
295 lines
11 KiB
Python
295 lines
11 KiB
Python
"""Egress routing manifest dataclasses and helpers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ipaddress
|
|
from dataclasses import dataclass, field
|
|
from typing import cast
|
|
|
|
from ._manifest_util import ManifestError, _as_json_object
|
|
|
|
|
|
# Auth schemes for the egress route's optional `auth` block.
|
|
# Same values cred-proxy accepts today; `token` sidesteps the Gitea
|
|
# token-not-Bearer quirk (go-gitea/gitea#16734).
|
|
EGRESS_AUTH_SCHEMES = ("Bearer", "token")
|
|
|
|
|
|
def _is_ip_literal(value: str) -> bool:
|
|
try:
|
|
ipaddress.ip_address(value)
|
|
except ValueError:
|
|
return False
|
|
return True
|
|
|
|
|
|
def _validate_egress_routes(
|
|
bottle_name: str,
|
|
routes: tuple[EgressRoute, ...],
|
|
) -> None:
|
|
"""Cross-validation for `bottle.egress.routes`: hosts must be unique.
|
|
|
|
The proxy matches by exact-host (v1); duplicate hosts leave the
|
|
route choice ambiguous so we reject them up front.
|
|
|
|
No cross-validation against `bottle.git-gate.repos` is performed.
|
|
git-gate (SSH push/fetch) and egress (HTTPS) broker different
|
|
protocols; declaring both for the same host is a legitimate dev
|
|
setup."""
|
|
seen_hosts: dict[str, None] = {}
|
|
for r in routes:
|
|
key = r.Host.lower()
|
|
if key in seen_hosts:
|
|
raise ManifestError(
|
|
f"bottle '{bottle_name}' egress.routes has duplicate host "
|
|
f"{r.Host!r}; each host must be unique on the proxy."
|
|
)
|
|
seen_hosts[key] = None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class PipelockRoutePolicy:
|
|
"""Per-route pipelock policy overrides.
|
|
|
|
`TlsPassthrough` adds the route host to pipelock's
|
|
`tls_interception.passthrough_domains`, so pipelock still enforces
|
|
the hostname allowlist but does not MITM/decrypt request bodies or
|
|
headers for that host.
|
|
|
|
`SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF
|
|
allowlist for private/internal destinations behind this route.
|
|
"""
|
|
|
|
TlsPassthrough: bool = False
|
|
SsrfIpAllowlist: tuple[str, ...] = ()
|
|
|
|
@classmethod
|
|
def from_dict(
|
|
cls, bottle_name: str, idx: int, raw: object,
|
|
) -> "PipelockRoutePolicy":
|
|
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
|
|
d = _as_json_object(raw, label)
|
|
for k in d:
|
|
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
|
|
raise ManifestError(
|
|
f"{label} has unknown key {k!r}; "
|
|
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
|
|
f"are accepted"
|
|
)
|
|
tls_passthrough_raw = d.get("tls_passthrough", False)
|
|
if not isinstance(tls_passthrough_raw, bool):
|
|
raise ManifestError(
|
|
f"{label}.tls_passthrough must be a boolean "
|
|
f"(was {type(tls_passthrough_raw).__name__})"
|
|
)
|
|
ssrf_raw = d.get("ssrf_ip_allowlist", [])
|
|
if not isinstance(ssrf_raw, list):
|
|
raise ManifestError(
|
|
f"{label}.ssrf_ip_allowlist must be an array "
|
|
f"(was {type(ssrf_raw).__name__})"
|
|
)
|
|
ssrf_ip_allowlist: list[str] = []
|
|
for j, item in enumerate(ssrf_raw):
|
|
if not isinstance(item, str) or not item:
|
|
raise ManifestError(
|
|
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
|
|
f"string (was {type(item).__name__})"
|
|
)
|
|
try:
|
|
ipaddress.ip_network(item, strict=False)
|
|
except ValueError as e:
|
|
raise ManifestError(
|
|
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
|
|
f"or CIDR (was {item!r}): {e}"
|
|
)
|
|
ssrf_ip_allowlist.append(item)
|
|
return cls(
|
|
TlsPassthrough=tls_passthrough_raw,
|
|
SsrfIpAllowlist=tuple(ssrf_ip_allowlist),
|
|
)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class EgressRoute:
|
|
"""One route on the per-bottle egress sidecar (PRD 0017).
|
|
|
|
`Host` matches the request's hostname (case-insensitive). The
|
|
optional `PathAllowlist` constrains the URL path to a set of
|
|
prefixes; empty tuple means no path-level filtering. The optional
|
|
`AuthScheme` / `TokenRef` pair drives credential injection:
|
|
when set, the proxy strips any inbound Authorization and injects
|
|
`<AuthScheme> <value-of-host-env-named-by-TokenRef>`. When the
|
|
manifest's `auth` block is omitted both fields are empty strings —
|
|
no Authorization is written, no token forwarded.
|
|
|
|
`Role` is reserved for future use; all role strings are currently
|
|
rejected by the validator.
|
|
|
|
Validation rules (enforced in `from_dict`):
|
|
- `host` required, non-empty.
|
|
- `path_allowlist` optional, list of absolute path prefixes.
|
|
- `auth` optional. If present, MUST carry both `scheme` and
|
|
`token_ref` as non-empty strings; an empty `auth: {}` is an
|
|
error rather than a synonym for "no auth" (omit `auth` for
|
|
that case).
|
|
- `role` optional, reserved — any non-empty value is rejected.
|
|
"""
|
|
|
|
Host: str
|
|
PathAllowlist: tuple[str, ...] = ()
|
|
AuthScheme: str = ""
|
|
TokenRef: str = ""
|
|
Role: tuple[str, ...] = ()
|
|
Pipelock: PipelockRoutePolicy = field(default_factory=PipelockRoutePolicy)
|
|
|
|
@classmethod
|
|
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "EgressRoute":
|
|
label = f"bottle '{bottle_name}' egress.routes[{idx}]"
|
|
d = _as_json_object(raw, label)
|
|
host = d.get("host")
|
|
if not isinstance(host, str) or not host:
|
|
raise ManifestError(f"{label} missing required string field 'host'")
|
|
|
|
path_allow_raw = d.get("path_allowlist")
|
|
prefixes: tuple[str, ...] = ()
|
|
if path_allow_raw is not None:
|
|
if not isinstance(path_allow_raw, list):
|
|
raise ManifestError(
|
|
f"{label} path_allowlist must be an array "
|
|
f"(was {type(path_allow_raw).__name__})"
|
|
)
|
|
path_list = cast(list[object], path_allow_raw)
|
|
collected: list[str] = []
|
|
for j, p in enumerate(path_list):
|
|
if not isinstance(p, str):
|
|
raise ManifestError(
|
|
f"{label} path_allowlist[{j}] must be a string "
|
|
f"(was {type(p).__name__})"
|
|
)
|
|
if not p.startswith("/"):
|
|
raise ManifestError(
|
|
f"{label} path_allowlist[{j}] {p!r} must be an "
|
|
f"absolute path prefix starting with '/'"
|
|
)
|
|
collected.append(p)
|
|
prefixes = tuple(collected)
|
|
|
|
auth_scheme = ""
|
|
token_ref = ""
|
|
if "auth" in d:
|
|
auth_raw = d.get("auth")
|
|
auth_d = _as_json_object(auth_raw, f"{label} auth")
|
|
if not auth_d:
|
|
raise ManifestError(
|
|
f"{label} auth is empty ({{}}); omit the 'auth' key "
|
|
f"entirely if this route is unauthenticated. Otherwise "
|
|
f"both 'scheme' and 'token_ref' are required."
|
|
)
|
|
auth_scheme_raw = auth_d.get("scheme")
|
|
if not isinstance(auth_scheme_raw, str) or not auth_scheme_raw:
|
|
raise ManifestError(
|
|
f"{label} auth.scheme is required when 'auth' is set "
|
|
f"(non-empty string)"
|
|
)
|
|
if auth_scheme_raw not in EGRESS_AUTH_SCHEMES:
|
|
raise ManifestError(
|
|
f"{label} auth.scheme {auth_scheme_raw!r} is not one of "
|
|
f"{', '.join(EGRESS_AUTH_SCHEMES)}"
|
|
)
|
|
token_ref_raw = auth_d.get("token_ref")
|
|
if not isinstance(token_ref_raw, str) or not token_ref_raw:
|
|
raise ManifestError(
|
|
f"{label} auth.token_ref is required when 'auth' is set "
|
|
f"(name of the host env var holding the token value)"
|
|
)
|
|
for k in auth_d:
|
|
if k not in ("scheme", "token_ref"):
|
|
raise ManifestError(
|
|
f"{label} auth has unknown key {k!r}; "
|
|
f"only 'scheme' and 'token_ref' are accepted"
|
|
)
|
|
auth_scheme = auth_scheme_raw
|
|
token_ref = token_ref_raw
|
|
|
|
role_raw = d.get("role")
|
|
roles: tuple[str, ...] = ()
|
|
if role_raw is None:
|
|
roles = ()
|
|
elif isinstance(role_raw, str):
|
|
roles = (role_raw,)
|
|
elif isinstance(role_raw, list):
|
|
role_list = cast(list[object], role_raw)
|
|
collected_roles: list[str] = []
|
|
for r in role_list:
|
|
if not isinstance(r, str):
|
|
raise ManifestError(f"{label} role items must be strings (got {type(r).__name__})")
|
|
collected_roles.append(r)
|
|
roles = tuple(collected_roles)
|
|
else:
|
|
raise ManifestError(
|
|
f"{label} role must be a string or a list of strings "
|
|
f"(was {type(role_raw).__name__})"
|
|
)
|
|
if roles:
|
|
raise ManifestError(
|
|
f"{label} role {roles[0]!r} is not accepted; "
|
|
f"the 'role' field is reserved for future use"
|
|
)
|
|
|
|
pipelock = (
|
|
PipelockRoutePolicy.from_dict(bottle_name, idx, d["pipelock"])
|
|
if "pipelock" in d
|
|
else PipelockRoutePolicy()
|
|
)
|
|
|
|
for k in d:
|
|
if k not in ("host", "path_allowlist", "auth", "role", "pipelock"):
|
|
raise ManifestError(
|
|
f"{label} has unknown key {k!r}; accepted keys are "
|
|
f"'host', 'path_allowlist', 'auth', 'role', 'pipelock'"
|
|
)
|
|
|
|
return cls(
|
|
Host=host,
|
|
PathAllowlist=prefixes,
|
|
AuthScheme=auth_scheme,
|
|
TokenRef=token_ref,
|
|
Role=roles,
|
|
Pipelock=pipelock,
|
|
)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class EgressConfig:
|
|
"""Per-bottle egress configuration. Today this is just the
|
|
route table; the nesting under `egress:` leaves room for
|
|
per-bottle proxy settings (port override, log level, etc.) in
|
|
follow-ups."""
|
|
|
|
routes: tuple[EgressRoute, ...] = ()
|
|
|
|
@classmethod
|
|
def from_dict(cls, bottle_name: str, raw: object) -> "EgressConfig":
|
|
d = _as_json_object(raw, f"bottle '{bottle_name}' egress")
|
|
routes_raw = d.get("routes")
|
|
routes: tuple[EgressRoute, ...] = ()
|
|
if routes_raw is not None:
|
|
if not isinstance(routes_raw, list):
|
|
raise ManifestError(
|
|
f"bottle '{bottle_name}' egress.routes must be an array "
|
|
f"(was {type(routes_raw).__name__})"
|
|
)
|
|
routes_list = cast(list[object], routes_raw)
|
|
routes = tuple(
|
|
EgressRoute.from_dict(bottle_name, i, entry)
|
|
for i, entry in enumerate(routes_list)
|
|
)
|
|
_validate_egress_routes(bottle_name, routes)
|
|
for k in d:
|
|
if k != "routes":
|
|
raise ManifestError(
|
|
f"bottle '{bottle_name}' egress has unknown key {k!r}; "
|
|
f"only 'routes' is accepted"
|
|
)
|
|
return cls(routes=routes)
|