diff --git a/Dockerfile.sidecars b/Dockerfile.sidecars index 6960848..a94960e 100644 --- a/Dockerfile.sidecars +++ b/Dockerfile.sidecars @@ -62,6 +62,7 @@ COPY --from=gitleaks-src /usr/bin/gitleaks /usr/bin/gitleaks # top-level siblings (absolute imports), matching the prior # Dockerfile.egress / Dockerfile.supervise layout. COPY bot_bottle/egress_addon_core.py /app/egress_addon_core.py +COPY bot_bottle/egress_dlp_config.py /app/egress_dlp_config.py COPY bot_bottle/egress_addon.py /app/egress_addon.py COPY bot_bottle/dlp_detectors.py /app/dlp_detectors.py COPY bot_bottle/yaml_subset.py /app/yaml_subset.py diff --git a/bot_bottle/egress_addon_core.py b/bot_bottle/egress_addon_core.py index 7ced873..af4cc39 100644 --- a/bot_bottle/egress_addon_core.py +++ b/bot_bottle/egress_addon_core.py @@ -21,6 +21,32 @@ try: except ImportError: # pragma: no cover - host-side path from .yaml_subset import YamlSubsetError, parse_yaml_subset +# DLP detector-config parsing lives in a sibling module (also flat-bundled +# into the sidecar — see Dockerfile.sidecars). Re-exported below so existing +# `from egress_addon_core import ON_MATCH_*` callers keep working. 
+try: + from egress_dlp_config import ( # type: ignore[import-not-found] + DEFAULT_OUTBOUND_ON_MATCH, + INBOUND_DETECTOR_NAMES, + ON_MATCH_BLOCK, + ON_MATCH_REDACT, + ON_MATCH_SUPERVISE, + OUTBOUND_DETECTOR_NAMES, + OUTBOUND_ON_MATCH_VALUES, + parse_dlp_block, + ) +except ImportError: # pragma: no cover - host-side path + from .egress_dlp_config import ( + DEFAULT_OUTBOUND_ON_MATCH, + INBOUND_DETECTOR_NAMES, + ON_MATCH_BLOCK, + ON_MATCH_REDACT, + ON_MATCH_SUPERVISE, + OUTBOUND_DETECTOR_NAMES, + OUTBOUND_ON_MATCH_VALUES, + parse_dlp_block, + ) + # --------------------------------------------------------------------------- # Match types (Gateway API HTTPRoute vocabulary, PRD 0053) @@ -34,18 +60,6 @@ VALID_METHODS = frozenset({ "CONNECT", }) -OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"}) -INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"}) - -# Per-route policy for what the proxy does when an outbound DLP detector -# matches a token (PRD 0062). -ON_MATCH_BLOCK = "block" # hard 403, never overridable -ON_MATCH_REDACT = "redact" # scrub the matched value, forward the request -ON_MATCH_SUPERVISE = "supervise" # queue for operator approval, hold the request -OUTBOUND_ON_MATCH_VALUES = (ON_MATCH_BLOCK, ON_MATCH_REDACT, ON_MATCH_SUPERVISE) -# Unset resolves to supervise (fall back to block when supervise is not wired). -DEFAULT_OUTBOUND_ON_MATCH = ON_MATCH_SUPERVISE - @dataclass(frozen=True) class PathMatch: @@ -230,72 +244,6 @@ def _parse_match_entry(idx: int, k: int, raw: object) -> MatchEntry: return MatchEntry(paths=paths, methods=methods, headers=headers) -def _parse_detectors( - idx: int, - host: str, - raw_dict: dict[str, object], -) -> tuple[tuple[str, ...] | None, tuple[str, ...] 
| None, str]: - """Parse the optional `dlp` block on a route, returning - (outbound_detectors, inbound_detectors, outbound_on_match).""" - dlp_raw = raw_dict.get("dlp") - if dlp_raw is None: - return None, None, "" - label = f"route[{idx}] ({host})" - if not isinstance(dlp_raw, dict): - raise ValueError(f"{label}: 'dlp' must be an object") - dlp = typing.cast(dict[str, object], dlp_raw) - - def _parse_detector_field( - field: str, - valid_names: frozenset[str], - ) -> tuple[str, ...] | None: - val = dlp.get(field) - if val is None: - return None - if val is False: - return () - if not isinstance(val, list): - raise ValueError( - f"{label}: dlp.{field} must be false, a list, or omitted" - ) - items = typing.cast(list[object], val) - names: list[str] = [] - for j, item in enumerate(items): - if not isinstance(item, str): - raise ValueError( - f"{label}: dlp.{field}[{j}] must be a string" - ) - if item not in valid_names: - raise ValueError( - f"{label}: dlp.{field}[{j}] {item!r} is not a valid " - f"detector name; valid names: {', '.join(sorted(valid_names))}" - ) - names.append(item) - return tuple(names) - - outbound = _parse_detector_field("outbound_detectors", OUTBOUND_DETECTOR_NAMES) - inbound = _parse_detector_field("inbound_detectors", INBOUND_DETECTOR_NAMES) - - on_match = "" - on_match_raw = dlp.get("outbound_on_match") - if on_match_raw is not None: - if not isinstance(on_match_raw, str) or on_match_raw not in OUTBOUND_ON_MATCH_VALUES: - raise ValueError( - f"{label}: dlp.outbound_on_match must be one of " - f"{', '.join(OUTBOUND_ON_MATCH_VALUES)} (got {on_match_raw!r})" - ) - on_match = on_match_raw - - for k in dlp: - if k not in ("outbound_detectors", "inbound_detectors", "outbound_on_match"): - raise ValueError( - f"{label}: dlp has unknown key {k!r}; accepted keys " - f"are 'outbound_detectors', 'inbound_detectors', " - f"'outbound_on_match'" - ) - return outbound, inbound, on_match - - def parse_routes(payload: object) -> tuple[Route, ...]: if not 
isinstance(payload, dict): raise ValueError("routes payload: top-level must be an object") @@ -364,7 +312,7 @@ def _parse_one(idx: int, raw: object) -> Route: ) # dlp detectors - outbound_detectors, inbound_detectors, outbound_on_match = _parse_detectors( + outbound_detectors, inbound_detectors, outbound_on_match = parse_dlp_block( idx, host, raw_dict, ) @@ -837,6 +785,9 @@ __all__ = [ "ON_MATCH_SUPERVISE", "OUTBOUND_ON_MATCH_VALUES", "DEFAULT_OUTBOUND_ON_MATCH", + "OUTBOUND_DETECTOR_NAMES", + "INBOUND_DETECTOR_NAMES", + "parse_dlp_block", "Config", "Decision", "HeaderMatch", diff --git a/bot_bottle/egress_dlp_config.py b/bot_bottle/egress_dlp_config.py new file mode 100644 index 0000000..29892ee --- /dev/null +++ b/bot_bottle/egress_dlp_config.py @@ -0,0 +1,92 @@ +"""DLP detector-config parsing for egress routes (PRD 0053, PRD 0062). + +A route's optional `dlp:` block names which outbound/inbound detectors run +and what the proxy does when an outbound detector matches a token +(`outbound_on_match`). This module owns parsing and validating that block, +kept apart from the request-time scan/decision flow in `egress_addon_core` +so each half reads top-to-bottom without scrolling past the other. + +Stdlib-only; ships flat into the sidecar bundle image alongside +`egress_addon_core.py` — see `Dockerfile.sidecars`.""" + +from __future__ import annotations + +import typing + +OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"}) +INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"}) + +# Per-route policy for what the proxy does when an outbound DLP detector +# matches a token (PRD 0062). 
ON_MATCH_BLOCK = "block"          # hard 403, never overridable
ON_MATCH_REDACT = "redact"        # scrub the matched value, forward the request
ON_MATCH_SUPERVISE = "supervise"  # queue for operator approval, hold the request
OUTBOUND_ON_MATCH_VALUES = (ON_MATCH_BLOCK, ON_MATCH_REDACT, ON_MATCH_SUPERVISE)
# Unset resolves to supervise (fall back to block when supervise is not wired).
DEFAULT_OUTBOUND_ON_MATCH = ON_MATCH_SUPERVISE


def parse_dlp_block(
    idx: int,
    host: str,
    raw_dict: dict[str, object],
) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None, str]:
    """Validate a route's optional `dlp` mapping and return its settings.

    Args:
        idx: Position of the route; used only to label error messages.
        host: Host of the route; used only to label error messages.
        raw_dict: The raw route mapping, read through its `dlp` key.

    Returns:
        `(outbound_detectors, inbound_detectors, outbound_on_match)`.
        A detector entry is `None` when its key is absent (or null), `()`
        when it is the literal `false`, and otherwise the listed names in
        their original order. `outbound_on_match` is `""` when unset.
        A missing or null `dlp` yields `(None, None, "")`.

    Raises:
        ValueError: `dlp` is not a mapping; a detector field is neither
            `false` nor a list; a list item is not a string or not a known
            detector name; `outbound_on_match` is not one of
            `OUTBOUND_ON_MATCH_VALUES`; or `dlp` carries an unknown key.
    """
    block = raw_dict.get("dlp")
    if block is None:
        return None, None, ""

    where = f"route[{idx}] ({host})"
    if not isinstance(block, dict):
        raise ValueError(f"{where}: 'dlp' must be an object")
    settings = typing.cast(dict[str, object], block)

    def _names_for(
        key: str,
        allowed: frozenset[str],
    ) -> tuple[str, ...] | None:
        # None -> key not configured, () -> explicitly disabled with `false`.
        raw = settings.get(key)
        if raw is None:
            return None
        if raw is False:
            return ()
        if not isinstance(raw, list):
            raise ValueError(
                f"{where}: dlp.{key} must be false, a list, or omitted"
            )

        def _vetted(pos: int, entry: object) -> str:
            # Type check comes first so the membership test only sees strings.
            if not isinstance(entry, str):
                raise ValueError(
                    f"{where}: dlp.{key}[{pos}] must be a string"
                )
            if entry not in allowed:
                raise ValueError(
                    f"{where}: dlp.{key}[{pos}] {entry!r} is not a valid "
                    f"detector name; valid names: {', '.join(sorted(allowed))}"
                )
            return entry

        return tuple(
            _vetted(pos, entry)
            for pos, entry in enumerate(typing.cast(list[object], raw))
        )

    # Field checks run in a fixed order (outbound, inbound, on-match policy,
    # then stray keys), which decides which error surfaces first.
    outbound = _names_for("outbound_detectors", OUTBOUND_DETECTOR_NAMES)
    inbound = _names_for("inbound_detectors", INBOUND_DETECTOR_NAMES)

    policy = settings.get("outbound_on_match")
    if policy is None:
        on_match = ""
    elif isinstance(policy, str) and policy in OUTBOUND_ON_MATCH_VALUES:
        on_match = policy
    else:
        raise ValueError(
            f"{where}: dlp.outbound_on_match must be one of "
            f"{', '.join(OUTBOUND_ON_MATCH_VALUES)} (got {policy!r})"
        )

    recognised = ("outbound_detectors", "inbound_detectors", "outbound_on_match")
    for key in settings:
        if key in recognised:
            continue
        raise ValueError(
            f"{where}: dlp has unknown key {key!r}; accepted keys "
            f"are 'outbound_detectors', 'inbound_detectors', "
            f"'outbound_on_match'"
        )

    return outbound, inbound, on_match