1659f738ef
`egress_addon_core.py` mixed the per-route `dlp:` block parser (`_parse_detectors` plus the detector-name and `outbound_on_match` constants) in with the request-time scan/decision flow. Move that config-parsing layer into a new stdlib-only `egress_dlp_config.py` as `parse_dlp_block`, so the decision path in the core module reads top-to-bottom without scrolling past config plumbing. The constants and parser are re-exported from `egress_addon_core` (and listed in `__all__`) so existing `from egress_addon_core import ON_MATCH_*` / `OUTBOUND_DETECTOR_NAMES` callers are unchanged. The new module ships flat into the sidecar bundle (Dockerfile.sidecars) and uses the same flat/package import shim as its siblings. Pure refactor; behavior and wire format unchanged. Closes #287 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01NkwFXLFff9PYPy4wgVBJp9
93 lines
3.7 KiB
Python
93 lines
3.7 KiB
Python
"""DLP detector-config parsing for egress routes (PRD 0053, PRD 0062).
|
|
|
|
A route's optional `dlp:` block names which outbound/inbound detectors run
|
|
and what the proxy does when an outbound detector matches a token
|
|
(`outbound_on_match`). This module owns parsing and validating that block,
|
|
kept apart from the request-time scan/decision flow in `egress_addon_core`
|
|
so each half reads top-to-bottom without scrolling past the other.
|
|
|
|
Stdlib-only; ships flat into the sidecar bundle image alongside
|
|
`egress_addon_core.py` — see `Dockerfile.sidecars`."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import typing
|
|
|
|
# Detector names accepted in a route's `dlp.outbound_detectors` and
# `dlp.inbound_detectors` lists; any other name is rejected at parse time.
OUTBOUND_DETECTOR_NAMES = frozenset(
    ("token_patterns", "known_secrets", "entropy")
)
INBOUND_DETECTOR_NAMES = frozenset(("naive_injection_detection",))

# `dlp.outbound_on_match` policies (PRD 0062): what the proxy does when an
# outbound DLP detector matches a token.
#   block     - hard 403, never overridable
#   redact    - scrub the matched value, forward the request
#   supervise - queue for operator approval, hold the request
ON_MATCH_BLOCK = "block"
ON_MATCH_REDACT = "redact"
ON_MATCH_SUPERVISE = "supervise"
OUTBOUND_ON_MATCH_VALUES = (
    ON_MATCH_BLOCK,
    ON_MATCH_REDACT,
    ON_MATCH_SUPERVISE,
)

# Unset resolves to supervise (fall back to block when supervise is not wired).
DEFAULT_OUTBOUND_ON_MATCH = ON_MATCH_SUPERVISE
|
|
|
|
|
|
def parse_dlp_block(
|
|
idx: int,
|
|
host: str,
|
|
raw_dict: dict[str, object],
|
|
) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None, str]:
|
|
"""Parse the optional `dlp` block on a route, returning
|
|
(outbound_detectors, inbound_detectors, outbound_on_match)."""
|
|
dlp_raw = raw_dict.get("dlp")
|
|
if dlp_raw is None:
|
|
return None, None, ""
|
|
label = f"route[{idx}] ({host})"
|
|
if not isinstance(dlp_raw, dict):
|
|
raise ValueError(f"{label}: 'dlp' must be an object")
|
|
dlp = typing.cast(dict[str, object], dlp_raw)
|
|
|
|
def _parse_detector_field(
|
|
field: str,
|
|
valid_names: frozenset[str],
|
|
) -> tuple[str, ...] | None:
|
|
val = dlp.get(field)
|
|
if val is None:
|
|
return None
|
|
if val is False:
|
|
return ()
|
|
if not isinstance(val, list):
|
|
raise ValueError(
|
|
f"{label}: dlp.{field} must be false, a list, or omitted"
|
|
)
|
|
items = typing.cast(list[object], val)
|
|
names: list[str] = []
|
|
for j, item in enumerate(items):
|
|
if not isinstance(item, str):
|
|
raise ValueError(
|
|
f"{label}: dlp.{field}[{j}] must be a string"
|
|
)
|
|
if item not in valid_names:
|
|
raise ValueError(
|
|
f"{label}: dlp.{field}[{j}] {item!r} is not a valid "
|
|
f"detector name; valid names: {', '.join(sorted(valid_names))}"
|
|
)
|
|
names.append(item)
|
|
return tuple(names)
|
|
|
|
outbound = _parse_detector_field("outbound_detectors", OUTBOUND_DETECTOR_NAMES)
|
|
inbound = _parse_detector_field("inbound_detectors", INBOUND_DETECTOR_NAMES)
|
|
|
|
on_match = ""
|
|
on_match_raw = dlp.get("outbound_on_match")
|
|
if on_match_raw is not None:
|
|
if not isinstance(on_match_raw, str) or on_match_raw not in OUTBOUND_ON_MATCH_VALUES:
|
|
raise ValueError(
|
|
f"{label}: dlp.outbound_on_match must be one of "
|
|
f"{', '.join(OUTBOUND_ON_MATCH_VALUES)} (got {on_match_raw!r})"
|
|
)
|
|
on_match = on_match_raw
|
|
|
|
for k in dlp:
|
|
if k not in ("outbound_detectors", "inbound_detectors", "outbound_on_match"):
|
|
raise ValueError(
|
|
f"{label}: dlp has unknown key {k!r}; accepted keys "
|
|
f"are 'outbound_detectors', 'inbound_detectors', "
|
|
f"'outbound_on_match'"
|
|
)
|
|
return outbound, inbound, on_match
|