refactor: address PR review feedback — de-privatize helpers and rename modules
- Rename _manifest_util.py → manifest_util.py (module isn't private) - Rename _as_json_object → as_json_object, _parse_git_upstream → parse_git_upstream, _parse_git_gate_config → parse_git_gate_config, _validate_unique_git_names → validate_unique_git_names, _validate_egress_routes → validate_egress_routes (none are private at module boundary — underscore prefix was a carry-over from the old monolithic manifest.py where everything lived in one namespace) - Move _is_ip_literal → util.is_ip_literal (generic, belongs in the top-level util module) - Update all import sites across manifest_*.py, manifest_extends.py, manifest_schema.py; existing callers of manifest.py are unaffected All 867 unit tests pass.
This commit is contained in:
@@ -6,7 +6,7 @@ import ipaddress
|
||||
from dataclasses import dataclass, field
|
||||
from typing import cast
|
||||
|
||||
from ._manifest_util import ManifestError, _as_json_object
|
||||
from .manifest_util import ManifestError, as_json_object
|
||||
|
||||
|
||||
# Auth schemes for the egress route's optional `auth` block.
|
||||
@@ -15,15 +15,7 @@ from ._manifest_util import ManifestError, _as_json_object
|
||||
EGRESS_AUTH_SCHEMES = ("Bearer", "token")
|
||||
|
||||
|
||||
def _is_ip_literal(value: str) -> bool:
|
||||
try:
|
||||
ipaddress.ip_address(value)
|
||||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _validate_egress_routes(
|
||||
def validate_egress_routes(
|
||||
bottle_name: str,
|
||||
routes: tuple[EgressRoute, ...],
|
||||
) -> None:
|
||||
@@ -68,7 +60,7 @@ class PipelockRoutePolicy:
|
||||
cls, bottle_name: str, idx: int, raw: object,
|
||||
) -> "PipelockRoutePolicy":
|
||||
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
|
||||
d = _as_json_object(raw, label)
|
||||
d = as_json_object(raw, label)
|
||||
for k in d:
|
||||
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
|
||||
raise ManifestError(
|
||||
@@ -145,7 +137,7 @@ class EgressRoute:
|
||||
@classmethod
|
||||
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "EgressRoute":
|
||||
label = f"bottle '{bottle_name}' egress.routes[{idx}]"
|
||||
d = _as_json_object(raw, label)
|
||||
d = as_json_object(raw, label)
|
||||
host = d.get("host")
|
||||
if not isinstance(host, str) or not host:
|
||||
raise ManifestError(f"{label} missing required string field 'host'")
|
||||
@@ -178,7 +170,7 @@ class EgressRoute:
|
||||
token_ref = ""
|
||||
if "auth" in d:
|
||||
auth_raw = d.get("auth")
|
||||
auth_d = _as_json_object(auth_raw, f"{label} auth")
|
||||
auth_d = as_json_object(auth_raw, f"{label} auth")
|
||||
if not auth_d:
|
||||
raise ManifestError(
|
||||
f"{label} auth is empty ({{}}); omit the 'auth' key "
|
||||
@@ -270,7 +262,7 @@ class EgressConfig:
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, bottle_name: str, raw: object) -> "EgressConfig":
|
||||
d = _as_json_object(raw, f"bottle '{bottle_name}' egress")
|
||||
d = as_json_object(raw, f"bottle '{bottle_name}' egress")
|
||||
routes_raw = d.get("routes")
|
||||
routes: tuple[EgressRoute, ...] = ()
|
||||
if routes_raw is not None:
|
||||
@@ -284,7 +276,7 @@ class EgressConfig:
|
||||
EgressRoute.from_dict(bottle_name, i, entry)
|
||||
for i, entry in enumerate(routes_list)
|
||||
)
|
||||
_validate_egress_routes(bottle_name, routes)
|
||||
validate_egress_routes(bottle_name, routes)
|
||||
for k in d:
|
||||
if k != "routes":
|
||||
raise ManifestError(
|
||||
|
||||
Reference in New Issue
Block a user