perf(dlp): linearize injection proximity check; bound variant cache; dedup supervise schema
lint / lint (push) Successful in 2m21s
test / unit (pull_request) Successful in 1m1s
test / integration (pull_request) Successful in 27s
test / coverage (pull_request) Successful in 1m15s

- dlp_detectors._closest_pair: replace the O(n*m) cross product with an
  O(n log n) sort + O(n) two-pointer merge, and early-out once a pair
  falls within the proximity threshold. The inputs are attacker-controlled
  response-body matches past the body-size cap, so the quadratic form was a
  latent DoS. Extract _match_gap to share the span-gap calc with the caller.
- dlp_detectors._compute_encoded_variants: back the memo with a bounded
  functools.lru_cache instead of an unbounded module dict, so a long-lived
  proxy seeing rotating secrets evicts rather than growing without limit.
- supervise_server: extract the duplicated routes.yaml inputSchema into
  _proposal_input_schema()/_ROUTES_YAML_DESCRIPTION so the egress-allow and
  egress-block tools can't drift.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01NkwFXLFff9PYPy4wgVBJp9
This commit is contained in:
2026-06-26 23:22:18 -04:00
parent 09755c3e24
commit b7f5f6439e
3 changed files with 118 additions and 81 deletions
+45 -64
View File
@@ -151,6 +151,49 @@ def jsonrpc_error(request_id: object, code: int, message: str) -> bytes:
# --- Tool definitions ------------------------------------------------------
# Shared by both proposal tools (egress-allow / egress-block): they take the
# same arguments and differ only in their top-level tool description. Kept as a
# single source of truth so the schema can't drift between the two tools.
_ROUTES_YAML_DESCRIPTION = (
"Full proposed /etc/egress/routes.yaml content. "
"Each route entry accepts these keys:\n"
" host: <hostname> (required)\n"
" auth_scheme: Bearer|token (must pair with token_env)\n"
" token_env: <ENV_VAR_NAME> (must pair with auth_scheme)\n"
" matches: (optional list of match entries)\n"
" - paths: [{type: prefix|exact|regex, value: /...}]\n"
" methods: [GET, POST, ...]\n"
" headers: [{name: X-Hdr, value: val, type: exact|regex}]\n"
" git: (optional; omit to block git clone/fetch)\n"
" fetch: true\n"
" dlp: (optional DLP scanner overrides)\n"
" outbound_detectors: [token_patterns, known_secrets]\n"
" inbound_detectors: [naive_injection_detection]\n"
" outbound_on_match: block|redact|supervise (default supervise)\n"
"Omit any key that should use its default. "
"`list-egress-routes` returns routes in this same format."
)
def _proposal_input_schema() -> dict[str, object]:
"""Build a fresh input schema for a routes.yaml proposal tool. Returns a
new dict per call so the two tool definitions don't alias one object."""
return {
"type": "object",
"properties": {
"routes_yaml": {
"type": "string",
"description": _ROUTES_YAML_DESCRIPTION,
},
"justification": {
"type": "string",
"description": "Why this egress route is needed.",
},
},
"required": ["routes_yaml", "justification"],
}
TOOL_DEFINITIONS: list[dict[str, object]] = [
{
"name": _sv.TOOL_LIST_EGRESS_ROUTES,
@@ -178,38 +221,7 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"`list-egress-routes` first so the proposal preserves existing "
"routes."
),
"inputSchema": {
"type": "object",
"properties": {
"routes_yaml": {
"type": "string",
"description": (
"Full proposed /etc/egress/routes.yaml content. "
"Each route entry accepts these keys:\n"
" host: <hostname> (required)\n"
" auth_scheme: Bearer|token (must pair with token_env)\n"
" token_env: <ENV_VAR_NAME> (must pair with auth_scheme)\n"
" matches: (optional list of match entries)\n"
" - paths: [{type: prefix|exact|regex, value: /...}]\n"
" methods: [GET, POST, ...]\n"
" headers: [{name: X-Hdr, value: val, type: exact|regex}]\n"
" git: (optional; omit to block git clone/fetch)\n"
" fetch: true\n"
" dlp: (optional DLP scanner overrides)\n"
" outbound_detectors: [token_patterns, known_secrets]\n"
" inbound_detectors: [naive_injection_detection]\n"
" outbound_on_match: block|redact|supervise (default supervise)\n"
"Omit any key that should use its default. "
"`list-egress-routes` returns routes in this same format."
),
},
"justification": {
"type": "string",
"description": "Why this egress route is needed.",
},
},
"required": ["routes_yaml", "justification"],
},
"inputSchema": _proposal_input_schema(),
},
{
"name": _sv.TOOL_EGRESS_BLOCK,
@@ -220,38 +232,7 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [
"`list-egress-routes` first so the proposal preserves existing "
"routes."
),
"inputSchema": {
"type": "object",
"properties": {
"routes_yaml": {
"type": "string",
"description": (
"Full proposed /etc/egress/routes.yaml content. "
"Each route entry accepts these keys:\n"
" host: <hostname> (required)\n"
" auth_scheme: Bearer|token (must pair with token_env)\n"
" token_env: <ENV_VAR_NAME> (must pair with auth_scheme)\n"
" matches: (optional list of match entries)\n"
" - paths: [{type: prefix|exact|regex, value: /...}]\n"
" methods: [GET, POST, ...]\n"
" headers: [{name: X-Hdr, value: val, type: exact|regex}]\n"
" git: (optional; omit to block git clone/fetch)\n"
" fetch: true\n"
" dlp: (optional DLP scanner overrides)\n"
" outbound_detectors: [token_patterns, known_secrets]\n"
" inbound_detectors: [naive_injection_detection]\n"
" outbound_on_match: block|redact|supervise (default supervise)\n"
"Omit any key that should use its default. "
"`list-egress-routes` returns routes in this same format."
),
},
"justification": {
"type": "string",
"description": "Why this egress route is needed.",
},
},
"required": ["routes_yaml", "justification"],
},
"inputSchema": _proposal_input_schema(),
},
]