cdfaaa3de8
Give each egress route a policy for what the proxy does when an outbound DLP detector matches a token, defaulting to the supervise flow added in the previous commit. The goal is cutting false-positive friction without weakening default-deny. - redact: scrub the matched value(s) from the body, non-host headers, and path/query via redact_tokens, then re-scan. Forward if clean; fail closed with a 403 if a match remains on a surface redaction can't rewrite (the hostname, or a unicode-evasion token). For routes where a token-shaped value is noise the upstream doesn't need. - block: the original hard 403, never overridable. - supervise (default, unset): hold the request for operator approval. Structural blocks (CRLF, no safelist-able value) stay hard 403s under every policy. Threads outbound_on_match from the bottle manifest (manifest_egress) through the resolved EgressRoute and rendered routes.yaml (egress.py) to the addon's Route (egress_addon_core), and round-trips it via the list-egress-routes introspection endpoint. The allow/egress-block tool descriptions document the new key. Tests: manifest parse/validation, core parse/validation, full manifest->render->addon round-trip for redact. README + PRD 0062 updated. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01HnvBjPZC5V7qeQpFbQdDmS
425 lines
15 KiB
Python
425 lines
15 KiB
Python
"""Egress routing manifest dataclasses and helpers (PRD 0017, PRD 0053)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from dataclasses import dataclass
|
|
from typing import cast
|
|
|
|
from .manifest_util import ManifestError, as_json_object
|
|
|
|
# Accepted values for a route's `auth.scheme`; anything else is rejected
# when the route is parsed.
EGRESS_AUTH_SCHEMES = ("Bearer", "token")

# Accepted `type` values for a `matches[].paths[]` entry.
PATH_MATCH_TYPES = ("exact", "prefix", "regex")
# Accepted `type` values for a `matches[].headers[]` entry.
HEADER_MATCH_TYPES = ("exact", "regex")

# HTTP methods accepted in `matches[].methods`; manifest values are
# upper-cased before being compared against this set.
VALID_METHODS = frozenset({
    "GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "TRACE",
    "CONNECT",
})

# Detector names accepted in `dlp.outbound_detectors`.
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets"})
# Detector names accepted in `dlp.inbound_detectors`.
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})

# What the proxy does on an outbound token match (PRD 0062).
# These are the only values accepted for `dlp.outbound_on_match`.
OUTBOUND_ON_MATCH_VALUES = ("block", "redact", "supervise")
|
|
|
|
|
|
def validate_egress_routes(
    bottle_name: str,
    routes: tuple[ManifestEgressRoute, ...],
) -> None:
    """Reject a route list in which two routes name the same host.

    Hosts are compared after lower-casing, so ``Example.com`` and
    ``example.com`` count as the same host.

    Args:
        bottle_name: Bottle name, used only in the error message.
        routes: Parsed routes, checked in order.

    Raises:
        ManifestError: At the first route whose host repeats an earlier one.
    """
    claimed: set[str] = set()
    for route in routes:
        folded = route.Host.lower()
        if folded not in claimed:
            claimed.add(folded)
            continue
        # The message shows the host as written, not the lower-cased key.
        raise ManifestError(
            f"bottle '{bottle_name}' egress.routes has duplicate host "
            f"{route.Host!r}; each host must be unique on the proxy."
        )
|
|
|
|
|
|
@dataclass(frozen=True)
class ManifestPathMatch:
    """Immutable record of one path matcher (a ``type`` / ``value`` pair)."""

    # Matching mode; defaults to "prefix".
    Type: str = "prefix"
    # The path or pattern text to match against.
    Value: str = ""
|
|
|
|
|
|
@dataclass(frozen=True)
class ManifestHeaderMatch:
    """Immutable record of one header matcher (``name`` / ``value`` / ``type``)."""

    # Header name to inspect.
    Name: str = ""
    # Expected value, or a pattern when Type is "regex".
    Value: str = ""
    # Matching mode; defaults to "exact".
    Type: str = "exact"
|
|
|
|
|
|
@dataclass(frozen=True)
class ManifestMatchEntry:
    """Immutable record of one ``matches[]`` entry of an egress route.

    Each field is an empty tuple when the corresponding manifest key is
    absent.
    """

    # Path matchers from the entry's `paths` list.
    Paths: tuple[ManifestPathMatch, ...] = ()
    # HTTP methods from the entry's `methods` list.
    Methods: tuple[str, ...] = ()
    # Header matchers from the entry's `headers` list.
    Headers: tuple[ManifestHeaderMatch, ...] = ()
|
|
|
|
|
|
@dataclass(frozen=True)
class ManifestEgressRoute:
    """Immutable record of one ``egress.routes[]`` entry of a bottle manifest."""

    # Required, non-empty host name for this route.
    Host: str
    # Parsed `matches` entries; empty when the key is absent.
    Matches: tuple[ManifestMatchEntry, ...] = ()
    # `auth.scheme`; empty string when the route has no `auth` block.
    AuthScheme: str = ""
    # `auth.token_ref`: name of the host env var holding the token value.
    TokenRef: str = ""
    # Reserved: from_dict rejects any non-empty role, so it always yields ().
    Role: tuple[str, ...] = ()
    # Value of `git.fetch`; False when the `git` block is absent.
    GitFetch: bool = False
    # None when the dlp field is omitted, () when it is `false`,
    # otherwise the listed detector names.
    OutboundDetectors: tuple[str, ...] | None = None
    InboundDetectors: tuple[str, ...] | None = None
    # `dlp.outbound_on_match`; empty string when unset.
    OutboundOnMatch: str = ""

    @classmethod
    def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "ManifestEgressRoute":
        """Build a route from its raw manifest value, validating every key.

        Sections are checked in a fixed order (host, matches, auth, role,
        dlp, git, then unknown top-level keys); the first problem found is
        the one reported.

        Args:
            bottle_name: Bottle name, used in error labels.
            idx: Position of this route within ``egress.routes``.
            raw: The undecoded route value; handed to ``as_json_object``.

        Returns:
            The parsed, frozen route.

        Raises:
            ManifestError: On any missing, malformed, or unknown field.
        """
        where = f"bottle '{bottle_name}' egress.routes[{idx}]"
        spec = as_json_object(raw, where)

        host = spec.get("host")
        if not (isinstance(host, str) and host):
            raise ManifestError(f"{where} missing required string field 'host'")

        # --- matches ---
        raw_matches = spec.get("matches")
        if raw_matches is None:
            matches: tuple[ManifestMatchEntry, ...] = ()
        elif isinstance(raw_matches, list):
            matches = tuple(
                _parse_match_entry(where, pos, item)
                for pos, item in enumerate(cast(list[object], raw_matches))
            )
        else:
            raise ManifestError(
                f"{where} matches must be an array "
                f"(was {type(raw_matches).__name__})"
            )

        # --- auth ---
        scheme = ""
        token_ref = ""
        if "auth" in spec:
            auth = as_json_object(spec.get("auth"), f"{where} auth")
            if not auth:
                raise ManifestError(
                    f"{where} auth is empty ({{}}); omit the 'auth' key "
                    f"entirely if this route is unauthenticated. Otherwise "
                    f"both 'scheme' and 'token_ref' are required."
                )
            scheme_in = auth.get("scheme")
            if not (isinstance(scheme_in, str) and scheme_in):
                raise ManifestError(
                    f"{where} auth.scheme is required when 'auth' is set "
                    f"(non-empty string)"
                )
            if scheme_in not in EGRESS_AUTH_SCHEMES:
                raise ManifestError(
                    f"{where} auth.scheme {scheme_in!r} is not one of "
                    f"{', '.join(EGRESS_AUTH_SCHEMES)}"
                )
            token_in = auth.get("token_ref")
            if not (isinstance(token_in, str) and token_in):
                raise ManifestError(
                    f"{where} auth.token_ref is required when 'auth' is set "
                    f"(name of the host env var holding the token value)"
                )
            # Unknown keys are reported only after the required ones pass.
            stray_auth = [
                name for name in auth if name not in ("scheme", "token_ref")
            ]
            if stray_auth:
                raise ManifestError(
                    f"{where} auth has unknown key {stray_auth[0]!r}; "
                    f"only 'scheme' and 'token_ref' are accepted"
                )
            scheme, token_ref = scheme_in, token_in

        # --- role (reserved) ---
        role_in = spec.get("role")
        roles: tuple[str, ...]
        if role_in is None:
            roles = ()
        elif isinstance(role_in, str):
            roles = (role_in,)
        elif isinstance(role_in, list):
            role_names: list[str] = []
            for member in cast(list[object], role_in):
                if not isinstance(member, str):
                    raise ManifestError(
                        f"{where} role items must be strings (got {type(member).__name__})"
                    )
                role_names.append(member)
            roles = tuple(role_names)
        else:
            raise ManifestError(
                f"{where} role must be a string or a list of strings "
                f"(was {type(role_in).__name__})"
            )
        # The shape is validated above, but any actual role is still refused.
        if roles:
            raise ManifestError(
                f"{where} role {roles[0]!r} is not accepted; "
                f"the 'role' field is reserved for future use"
            )

        # --- dlp ---
        outbound, inbound, on_match = (
            _parse_dlp_block(where, spec.get("dlp"))
            if "dlp" in spec
            else (None, None, "")
        )

        # --- git-over-HTTPS policy ---
        git_fetch = False
        if "git" in spec:
            git = as_json_object(spec.get("git"), f"{where} git")
            fetch_in = git.get("fetch", False)
            if not isinstance(fetch_in, bool):
                raise ManifestError(
                    f"{where} git.fetch must be a boolean "
                    f"(was {type(fetch_in).__name__})"
                )
            git_fetch = fetch_in
            stray_git = [name for name in git if name != "fetch"]
            if stray_git:
                raise ManifestError(
                    f"{where} git has unknown key {stray_git[0]!r}; "
                    f"only 'fetch' is accepted"
                )

        accepted = ("host", "matches", "auth", "role", "dlp", "git")
        stray = [name for name in spec if name not in accepted]
        if stray:
            raise ManifestError(
                f"{where} has unknown key {stray[0]!r}; accepted keys are "
                f"'host', 'matches', 'auth', 'role', 'dlp', 'git'"
            )

        return cls(
            Host=host,
            Matches=matches,
            AuthScheme=scheme,
            TokenRef=token_ref,
            Role=roles,
            GitFetch=git_fetch,
            OutboundDetectors=outbound,
            InboundDetectors=inbound,
            OutboundOnMatch=on_match,
        )
|
|
|
|
|
|
def _parse_match_entry(
    route_label: str, k: int, raw: object,
) -> ManifestMatchEntry:
    """Parse one ``matches[k]`` entry of a route.

    ``paths``, ``methods`` and ``headers`` are validated in that order, and
    unknown keys are reported last. Method names are upper-cased before
    being checked against ``VALID_METHODS`` and stored.

    Args:
        route_label: Error-label prefix identifying the enclosing route.
        k: Index of this entry within the route's ``matches`` list.
        raw: The undecoded entry; handed to ``as_json_object``.

    Raises:
        ManifestError: On a non-array field, bad item, or unknown key.
    """
    where = f"{route_label} matches[{k}]"
    spec = as_json_object(raw, where)

    raw_paths = spec.get("paths")
    if raw_paths is None:
        paths: tuple[ManifestPathMatch, ...] = ()
    elif isinstance(raw_paths, list):
        paths = tuple(
            _parse_path_match(where, pos, item)
            for pos, item in enumerate(cast(list[object], raw_paths))
        )
    else:
        raise ManifestError(f"{where} paths must be an array")

    verbs: list[str] = []
    raw_methods = spec.get("methods")
    if raw_methods is not None:
        if not isinstance(raw_methods, list):
            raise ManifestError(f"{where} methods must be an array")
        for pos, verb in enumerate(cast(list[object], raw_methods)):
            if not isinstance(verb, str):
                raise ManifestError(
                    f"{where} methods[{pos}] must be a string"
                )
            canonical = verb.upper()
            if canonical not in VALID_METHODS:
                # Report the method as written, not the upper-cased form.
                raise ManifestError(
                    f"{where} methods[{pos}] {verb!r} is not a valid HTTP method"
                )
            verbs.append(canonical)

    raw_headers = spec.get("headers")
    if raw_headers is None:
        headers: tuple[ManifestHeaderMatch, ...] = ()
    elif isinstance(raw_headers, list):
        headers = tuple(
            _parse_header_match(where, pos, item)
            for pos, item in enumerate(cast(list[object], raw_headers))
        )
    else:
        raise ManifestError(f"{where} headers must be an array")

    stray = [name for name in spec if name not in ("paths", "methods", "headers")]
    if stray:
        raise ManifestError(f"{where} has unknown key {stray[0]!r}")

    return ManifestMatchEntry(Paths=paths, Methods=tuple(verbs), Headers=headers)
|
|
|
|
|
|
def _parse_path_match(
    entry_label: str, j: int, raw: object,
) -> ManifestPathMatch:
    """Parse one ``paths[j]`` matcher of a match entry.

    ``type`` defaults to ``"prefix"`` and must be one of
    ``PATH_MATCH_TYPES``. ``value`` must be a non-empty string: it must
    start with ``/`` for exact/prefix matchers and must compile as a regular
    expression for regex matchers.

    Args:
        entry_label: Error-label prefix identifying the enclosing entry.
        j: Index of this matcher within the entry's ``paths`` list.
        raw: The undecoded matcher; handed to ``as_json_object``.

    Raises:
        ManifestError: On an invalid type or value, or an unknown key.
    """
    where = f"{entry_label} paths[{j}]"
    spec = as_json_object(raw, where)

    kind = spec.get("type", "prefix")
    if not (isinstance(kind, str) and kind in PATH_MATCH_TYPES):
        raise ManifestError(
            f"{where} type must be one of {', '.join(PATH_MATCH_TYPES)} "
            f"(got {kind!r})"
        )

    pattern = spec.get("value")
    if not (isinstance(pattern, str) and pattern):
        raise ManifestError(f"{where} value must be a non-empty string")

    if kind in ("exact", "prefix") and not pattern.startswith("/"):
        raise ManifestError(
            f"{where} value {pattern!r} must start with '/' for type {kind!r}"
        )

    if kind == "regex":
        # Compile only to validate; the compiled pattern is discarded.
        try:
            re.compile(pattern)
        except re.error as exc:
            raise ManifestError(
                f"{where} regex {pattern!r} failed to compile: {exc}"
            ) from exc

    stray = [name for name in spec if name not in ("type", "value")]
    if stray:
        raise ManifestError(f"{where} has unknown key {stray[0]!r}")

    return ManifestPathMatch(Type=kind, Value=pattern)
|
|
|
|
|
|
def _parse_header_match(
    entry_label: str, j: int, raw: object,
) -> ManifestHeaderMatch:
    """Parse one ``headers[j]`` matcher of a match entry.

    ``name`` must be a non-empty string and ``value`` a string (which may be
    empty). ``type`` defaults to ``"exact"`` and must be one of
    ``HEADER_MATCH_TYPES``; for ``"regex"`` the value must compile.

    Args:
        entry_label: Error-label prefix identifying the enclosing entry.
        j: Index of this matcher within the entry's ``headers`` list.
        raw: The undecoded matcher; handed to ``as_json_object``.

    Raises:
        ManifestError: On an invalid name, value or type, or an unknown key.
    """
    where = f"{entry_label} headers[{j}]"
    spec = as_json_object(raw, where)

    header_name = spec.get("name")
    if not (isinstance(header_name, str) and header_name):
        raise ManifestError(f"{where} name must be a non-empty string")

    expected = spec.get("value")
    if not isinstance(expected, str):
        raise ManifestError(f"{where} value must be a string")

    kind = spec.get("type", "exact")
    if not (isinstance(kind, str) and kind in HEADER_MATCH_TYPES):
        raise ManifestError(
            f"{where} type must be one of {', '.join(HEADER_MATCH_TYPES)} "
            f"(got {kind!r})"
        )

    if kind == "regex":
        # Compile only to validate; the compiled pattern is discarded.
        try:
            re.compile(expected)
        except re.error as exc:
            raise ManifestError(
                f"{where} regex {expected!r} failed to compile: {exc}"
            ) from exc

    stray = [name for name in spec if name not in ("name", "value", "type")]
    if stray:
        raise ManifestError(f"{where} has unknown key {stray[0]!r}")

    return ManifestHeaderMatch(Name=header_name, Value=expected, Type=kind)
|
|
|
|
|
|
def _parse_dlp_block(
    route_label: str,
    raw: object,
) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None, str]:
    """Parse a route's ``dlp`` block.

    Args:
        route_label: Error-label prefix identifying the enclosing route.
        raw: The undecoded ``dlp`` value; handed to ``as_json_object``.

    Returns:
        ``(outbound_detectors, inbound_detectors, outbound_on_match)``.
        Each detector field is ``None`` when omitted, ``()`` when given as
        ``false``, otherwise the listed names in manifest order.
        ``outbound_on_match`` is ``""`` when the key is absent or null.

    Raises:
        ManifestError: On a malformed field, unknown detector name, invalid
            ``outbound_on_match`` value, or unknown key.
    """
    where = f"{route_label} dlp"
    spec = as_json_object(raw, where)

    def _detector_names(
        field: str,
        valid_names: frozenset[str],
    ) -> tuple[str, ...] | None:
        """Read one detector list: None if omitted, () if ``false``."""
        given = spec.get(field)
        if given is None:
            return None
        # Identity check: only the literal boolean False disables detection.
        if given is False:
            return ()
        if not isinstance(given, list):
            raise ManifestError(
                f"{where} {field} must be false, a list, or omitted"
            )
        accepted: list[str] = []
        for pos, entry in enumerate(cast(list[object], given)):
            if not isinstance(entry, str):
                raise ManifestError(
                    f"{where} {field}[{pos}] must be a string"
                )
            if entry not in valid_names:
                raise ManifestError(
                    f"{where} {field}[{pos}] {entry!r} is not a valid "
                    f"detector; valid: {', '.join(sorted(valid_names))}"
                )
            accepted.append(entry)
        return tuple(accepted)

    outbound = _detector_names("outbound_detectors", OUTBOUND_DETECTOR_NAMES)
    inbound = _detector_names("inbound_detectors", INBOUND_DETECTOR_NAMES)

    policy = ""
    policy_in = spec.get("outbound_on_match")
    if policy_in is not None:
        if not (isinstance(policy_in, str) and policy_in in OUTBOUND_ON_MATCH_VALUES):
            raise ManifestError(
                f"{where} outbound_on_match must be one of "
                f"{', '.join(OUTBOUND_ON_MATCH_VALUES)} (got {policy_in!r})"
            )
        policy = policy_in

    known = ("outbound_detectors", "inbound_detectors", "outbound_on_match")
    stray = [name for name in spec if name not in known]
    if stray:
        raise ManifestError(
            f"{where} has unknown key {stray[0]!r}; accepted keys are "
            f"'outbound_detectors', 'inbound_detectors', "
            f"'outbound_on_match'"
        )

    return outbound, inbound, policy
|
|
|
|
|
|
# Accepted integer values for the `egress.log` setting.
LOG_LEVELS = frozenset({0, 1, 2})
|
|
|
|
|
|
@dataclass(frozen=True)
class ManifestEgressConfig:
    """Immutable record of a bottle's ``egress`` manifest section."""

    # Parsed `routes` entries; empty when the key is absent.
    routes: tuple[ManifestEgressRoute, ...] = ()
    # Value of `log` (one of LOG_LEVELS); defaults to 0.
    Log: int = 0

    @classmethod
    def from_dict(cls, bottle_name: str, raw: object) -> "ManifestEgressConfig":
        """Build the egress config from its raw manifest value.

        ``routes`` is parsed and checked for duplicate hosts first, then
        ``log`` is validated, and unknown keys are reported last.

        Args:
            bottle_name: Bottle name, used in error messages.
            raw: The undecoded ``egress`` value; handed to ``as_json_object``.

        Raises:
            ManifestError: On a non-array ``routes``, an invalid route, a
                duplicate host, an invalid ``log`` value, or an unknown key.
        """
        spec = as_json_object(raw, f"bottle '{bottle_name}' egress")

        parsed: tuple[ManifestEgressRoute, ...] = ()
        raw_routes = spec.get("routes")
        if raw_routes is not None:
            if not isinstance(raw_routes, list):
                raise ManifestError(
                    f"bottle '{bottle_name}' egress.routes must be an array "
                    f"(was {type(raw_routes).__name__})"
                )
            parsed = tuple(
                ManifestEgressRoute.from_dict(bottle_name, pos, item)
                for pos, item in enumerate(cast(list[object], raw_routes))
            )
            validate_egress_routes(bottle_name, parsed)

        level = spec.get("log", 0)
        # bool is a subclass of int, so True/False are excluded explicitly.
        level_ok = (
            not isinstance(level, bool)
            and isinstance(level, int)
            and level in LOG_LEVELS
        )
        if not level_ok:
            raise ManifestError(
                f"bottle '{bottle_name}' egress.log must be 0, 1, or 2"
            )

        stray = [name for name in spec if name not in ("routes", "log")]
        if stray:
            raise ManifestError(
                f"bottle '{bottle_name}' egress has unknown key {stray[0]!r}; "
                f"accepted keys are 'routes', 'log'"
            )

        return cls(routes=parsed, Log=level)
|