Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c97c01d300 |
@@ -62,7 +62,6 @@ COPY --from=gitleaks-src /usr/bin/gitleaks /usr/bin/gitleaks
|
||||
# top-level siblings (absolute imports), matching the prior
|
||||
# Dockerfile.egress / Dockerfile.supervise layout.
|
||||
COPY bot_bottle/egress_addon_core.py /app/egress_addon_core.py
|
||||
COPY bot_bottle/egress_dlp_config.py /app/egress_dlp_config.py
|
||||
COPY bot_bottle/egress_addon.py /app/egress_addon.py
|
||||
COPY bot_bottle/dlp_detectors.py /app/dlp_detectors.py
|
||||
COPY bot_bottle/yaml_subset.py /app/yaml_subset.py
|
||||
|
||||
@@ -21,32 +21,6 @@ try:
|
||||
except ImportError: # pragma: no cover - host-side path
|
||||
from .yaml_subset import YamlSubsetError, parse_yaml_subset
|
||||
|
||||
# DLP detector-config parsing lives in a sibling module (also flat-bundled
|
||||
# into the sidecar — see Dockerfile.sidecars). Re-exported below so existing
|
||||
# `from egress_addon_core import ON_MATCH_*` callers keep working.
|
||||
try:
|
||||
from egress_dlp_config import ( # type: ignore[import-not-found]
|
||||
DEFAULT_OUTBOUND_ON_MATCH,
|
||||
INBOUND_DETECTOR_NAMES,
|
||||
ON_MATCH_BLOCK,
|
||||
ON_MATCH_REDACT,
|
||||
ON_MATCH_SUPERVISE,
|
||||
OUTBOUND_DETECTOR_NAMES,
|
||||
OUTBOUND_ON_MATCH_VALUES,
|
||||
parse_dlp_block,
|
||||
)
|
||||
except ImportError: # pragma: no cover - host-side path
|
||||
from .egress_dlp_config import (
|
||||
DEFAULT_OUTBOUND_ON_MATCH,
|
||||
INBOUND_DETECTOR_NAMES,
|
||||
ON_MATCH_BLOCK,
|
||||
ON_MATCH_REDACT,
|
||||
ON_MATCH_SUPERVISE,
|
||||
OUTBOUND_DETECTOR_NAMES,
|
||||
OUTBOUND_ON_MATCH_VALUES,
|
||||
parse_dlp_block,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Match types (Gateway API HTTPRoute vocabulary, PRD 0053)
|
||||
@@ -60,6 +34,18 @@ VALID_METHODS = frozenset({
|
||||
"CONNECT",
|
||||
})
|
||||
|
||||
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"})
|
||||
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
|
||||
|
||||
# Per-route policy for what the proxy does when an outbound DLP detector
|
||||
# matches a token (PRD 0062).
|
||||
ON_MATCH_BLOCK = "block" # hard 403, never overridable
|
||||
ON_MATCH_REDACT = "redact" # scrub the matched value, forward the request
|
||||
ON_MATCH_SUPERVISE = "supervise" # queue for operator approval, hold the request
|
||||
OUTBOUND_ON_MATCH_VALUES = (ON_MATCH_BLOCK, ON_MATCH_REDACT, ON_MATCH_SUPERVISE)
|
||||
# Unset resolves to supervise (fall back to block when supervise is not wired).
|
||||
DEFAULT_OUTBOUND_ON_MATCH = ON_MATCH_SUPERVISE
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PathMatch:
|
||||
@@ -244,6 +230,72 @@ def _parse_match_entry(idx: int, k: int, raw: object) -> MatchEntry:
|
||||
return MatchEntry(paths=paths, methods=methods, headers=headers)
|
||||
|
||||
|
||||
def _parse_detectors(
|
||||
idx: int,
|
||||
host: str,
|
||||
raw_dict: dict[str, object],
|
||||
) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None, str]:
|
||||
"""Parse the optional `dlp` block on a route, returning
|
||||
(outbound_detectors, inbound_detectors, outbound_on_match)."""
|
||||
dlp_raw = raw_dict.get("dlp")
|
||||
if dlp_raw is None:
|
||||
return None, None, ""
|
||||
label = f"route[{idx}] ({host})"
|
||||
if not isinstance(dlp_raw, dict):
|
||||
raise ValueError(f"{label}: 'dlp' must be an object")
|
||||
dlp = typing.cast(dict[str, object], dlp_raw)
|
||||
|
||||
def _parse_detector_field(
|
||||
field: str,
|
||||
valid_names: frozenset[str],
|
||||
) -> tuple[str, ...] | None:
|
||||
val = dlp.get(field)
|
||||
if val is None:
|
||||
return None
|
||||
if val is False:
|
||||
return ()
|
||||
if not isinstance(val, list):
|
||||
raise ValueError(
|
||||
f"{label}: dlp.{field} must be false, a list, or omitted"
|
||||
)
|
||||
items = typing.cast(list[object], val)
|
||||
names: list[str] = []
|
||||
for j, item in enumerate(items):
|
||||
if not isinstance(item, str):
|
||||
raise ValueError(
|
||||
f"{label}: dlp.{field}[{j}] must be a string"
|
||||
)
|
||||
if item not in valid_names:
|
||||
raise ValueError(
|
||||
f"{label}: dlp.{field}[{j}] {item!r} is not a valid "
|
||||
f"detector name; valid names: {', '.join(sorted(valid_names))}"
|
||||
)
|
||||
names.append(item)
|
||||
return tuple(names)
|
||||
|
||||
outbound = _parse_detector_field("outbound_detectors", OUTBOUND_DETECTOR_NAMES)
|
||||
inbound = _parse_detector_field("inbound_detectors", INBOUND_DETECTOR_NAMES)
|
||||
|
||||
on_match = ""
|
||||
on_match_raw = dlp.get("outbound_on_match")
|
||||
if on_match_raw is not None:
|
||||
if not isinstance(on_match_raw, str) or on_match_raw not in OUTBOUND_ON_MATCH_VALUES:
|
||||
raise ValueError(
|
||||
f"{label}: dlp.outbound_on_match must be one of "
|
||||
f"{', '.join(OUTBOUND_ON_MATCH_VALUES)} (got {on_match_raw!r})"
|
||||
)
|
||||
on_match = on_match_raw
|
||||
|
||||
for k in dlp:
|
||||
if k not in ("outbound_detectors", "inbound_detectors", "outbound_on_match"):
|
||||
raise ValueError(
|
||||
f"{label}: dlp has unknown key {k!r}; accepted keys "
|
||||
f"are 'outbound_detectors', 'inbound_detectors', "
|
||||
f"'outbound_on_match'"
|
||||
)
|
||||
return outbound, inbound, on_match
|
||||
|
||||
|
||||
def parse_routes(payload: object) -> tuple[Route, ...]:
|
||||
if not isinstance(payload, dict):
|
||||
raise ValueError("routes payload: top-level must be an object")
|
||||
@@ -312,7 +364,7 @@ def _parse_one(idx: int, raw: object) -> Route:
|
||||
)
|
||||
|
||||
# dlp detectors
|
||||
outbound_detectors, inbound_detectors, outbound_on_match = parse_dlp_block(
|
||||
outbound_detectors, inbound_detectors, outbound_on_match = _parse_detectors(
|
||||
idx, host, raw_dict,
|
||||
)
|
||||
|
||||
@@ -785,9 +837,6 @@ __all__ = [
|
||||
"ON_MATCH_SUPERVISE",
|
||||
"OUTBOUND_ON_MATCH_VALUES",
|
||||
"DEFAULT_OUTBOUND_ON_MATCH",
|
||||
"OUTBOUND_DETECTOR_NAMES",
|
||||
"INBOUND_DETECTOR_NAMES",
|
||||
"parse_dlp_block",
|
||||
"Config",
|
||||
"Decision",
|
||||
"HeaderMatch",
|
||||
|
||||
@@ -1,92 +0,0 @@
|
||||
"""DLP detector-config parsing for egress routes (PRD 0053, PRD 0062).
|
||||
|
||||
A route's optional `dlp:` block names which outbound/inbound detectors run
|
||||
and what the proxy does when an outbound detector matches a token
|
||||
(`outbound_on_match`). This module owns parsing and validating that block,
|
||||
kept apart from the request-time scan/decision flow in `egress_addon_core`
|
||||
so each half reads top-to-bottom without scrolling past the other.
|
||||
|
||||
Stdlib-only; ships flat into the sidecar bundle image alongside
|
||||
`egress_addon_core.py` — see `Dockerfile.sidecars`."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
|
||||
OUTBOUND_DETECTOR_NAMES = frozenset({"token_patterns", "known_secrets", "entropy"})
|
||||
INBOUND_DETECTOR_NAMES = frozenset({"naive_injection_detection"})
|
||||
|
||||
# Per-route policy for what the proxy does when an outbound DLP detector
|
||||
# matches a token (PRD 0062).
|
||||
ON_MATCH_BLOCK = "block" # hard 403, never overridable
|
||||
ON_MATCH_REDACT = "redact" # scrub the matched value, forward the request
|
||||
ON_MATCH_SUPERVISE = "supervise" # queue for operator approval, hold the request
|
||||
OUTBOUND_ON_MATCH_VALUES = (ON_MATCH_BLOCK, ON_MATCH_REDACT, ON_MATCH_SUPERVISE)
|
||||
# Unset resolves to supervise (fall back to block when supervise is not wired).
|
||||
DEFAULT_OUTBOUND_ON_MATCH = ON_MATCH_SUPERVISE
|
||||
|
||||
|
||||
def parse_dlp_block(
|
||||
idx: int,
|
||||
host: str,
|
||||
raw_dict: dict[str, object],
|
||||
) -> tuple[tuple[str, ...] | None, tuple[str, ...] | None, str]:
|
||||
"""Parse the optional `dlp` block on a route, returning
|
||||
(outbound_detectors, inbound_detectors, outbound_on_match)."""
|
||||
dlp_raw = raw_dict.get("dlp")
|
||||
if dlp_raw is None:
|
||||
return None, None, ""
|
||||
label = f"route[{idx}] ({host})"
|
||||
if not isinstance(dlp_raw, dict):
|
||||
raise ValueError(f"{label}: 'dlp' must be an object")
|
||||
dlp = typing.cast(dict[str, object], dlp_raw)
|
||||
|
||||
def _parse_detector_field(
|
||||
field: str,
|
||||
valid_names: frozenset[str],
|
||||
) -> tuple[str, ...] | None:
|
||||
val = dlp.get(field)
|
||||
if val is None:
|
||||
return None
|
||||
if val is False:
|
||||
return ()
|
||||
if not isinstance(val, list):
|
||||
raise ValueError(
|
||||
f"{label}: dlp.{field} must be false, a list, or omitted"
|
||||
)
|
||||
items = typing.cast(list[object], val)
|
||||
names: list[str] = []
|
||||
for j, item in enumerate(items):
|
||||
if not isinstance(item, str):
|
||||
raise ValueError(
|
||||
f"{label}: dlp.{field}[{j}] must be a string"
|
||||
)
|
||||
if item not in valid_names:
|
||||
raise ValueError(
|
||||
f"{label}: dlp.{field}[{j}] {item!r} is not a valid "
|
||||
f"detector name; valid names: {', '.join(sorted(valid_names))}"
|
||||
)
|
||||
names.append(item)
|
||||
return tuple(names)
|
||||
|
||||
outbound = _parse_detector_field("outbound_detectors", OUTBOUND_DETECTOR_NAMES)
|
||||
inbound = _parse_detector_field("inbound_detectors", INBOUND_DETECTOR_NAMES)
|
||||
|
||||
on_match = ""
|
||||
on_match_raw = dlp.get("outbound_on_match")
|
||||
if on_match_raw is not None:
|
||||
if not isinstance(on_match_raw, str) or on_match_raw not in OUTBOUND_ON_MATCH_VALUES:
|
||||
raise ValueError(
|
||||
f"{label}: dlp.outbound_on_match must be one of "
|
||||
f"{', '.join(OUTBOUND_ON_MATCH_VALUES)} (got {on_match_raw!r})"
|
||||
)
|
||||
on_match = on_match_raw
|
||||
|
||||
for k in dlp:
|
||||
if k not in ("outbound_detectors", "inbound_detectors", "outbound_on_match"):
|
||||
raise ValueError(
|
||||
f"{label}: dlp has unknown key {k!r}; accepted keys "
|
||||
f"are 'outbound_detectors', 'inbound_detectors', "
|
||||
f"'outbound_on_match'"
|
||||
)
|
||||
return outbound, inbound, on_match
|
||||
@@ -24,61 +24,36 @@ from bot_bottle.dlp_detectors import (
|
||||
)
|
||||
|
||||
|
||||
# (case id, sample body carrying the token, substring expected in the reason).
|
||||
# One row per known token shape; all are block-severity credential matches.
|
||||
# `# gitleaks:allow` marks the synthetic tokens so a source scan won't flag them.
|
||||
_TOKEN_PATTERN_CASES: list[tuple[str, str, str]] = [
|
||||
("aws_access_key", "key=AKIAIOSFODNN7EXAMPLE", "AWS access key"),
|
||||
("github_classic", "token: ghp_" + "A" * 36, "GitHub token"), # gitleaks:allow
|
||||
("github_fine_grained", "pat=github_pat_" + "A" * 82, "fine-grained"), # gitleaks:allow
|
||||
("anthropic", "auth: sk-ant-" + "A" * 93, "Anthropic"), # gitleaks:allow
|
||||
("openai", "key=sk-" + "A" * 48, "OpenAI"), # gitleaks:allow
|
||||
("stripe_live", "stripe: sk_live_" + "A" * 24, "Stripe"), # gitleaks:allow
|
||||
("bearer_jwt", "Authorization: Bearer " + "A" * 60, "Bearer JWT"), # gitleaks:allow
|
||||
("openai_project", "key=sk-proj-" + "A" * 48, "OpenAI project"), # gitleaks:allow
|
||||
("huggingface", "token=hf_" + "A" * 34, "HuggingFace"), # gitleaks:allow
|
||||
("databricks", "dapi" + "a" * 32, "Databricks"), # gitleaks:allow
|
||||
("slack_bot", "xoxb-00000000000-00000000000-" + "A" * 24, "Slack"), # gitleaks:allow
|
||||
("npm", "npm_" + "A" * 36, "npm"), # gitleaks:allow
|
||||
("sendgrid", "SG." + "A" * 22 + "." + "B" * 43, "SendGrid"), # gitleaks:allow
|
||||
("pypi", "pypi-" + "A" * 80, "PyPI"), # gitleaks:allow
|
||||
("vault", "hvs." + "A" * 24, "Vault"), # gitleaks:allow
|
||||
]
|
||||
|
||||
|
||||
class TestScanTokenPatterns(unittest.TestCase):
|
||||
def test_aws_access_key(self):
|
||||
result = scan_token_patterns("key=AKIAIOSFODNN7EXAMPLE")
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
self.assertIn("AWS access key", result.reason)
|
||||
|
||||
def test_github_classic_token(self):
|
||||
result = scan_token_patterns(
|
||||
"token: ghp_" + "A" * 36,
|
||||
)
|
||||
assert result is not None
|
||||
self.assertIn("GitHub token", result.reason)
|
||||
|
||||
def test_github_fine_grained_token(self):
|
||||
result = scan_token_patterns(
|
||||
"pat=github_pat_" + "A" * 82,
|
||||
)
|
||||
assert result is not None
|
||||
self.assertIn("fine-grained", result.reason)
|
||||
|
||||
def test_anthropic_api_key(self):
|
||||
result = scan_token_patterns(
|
||||
"auth: sk-ant-" + "A" * 93,
|
||||
)
|
||||
assert result is not None
|
||||
self.assertIn("Anthropic", result.reason)
|
||||
|
||||
def test_openai_api_key(self):
|
||||
result = scan_token_patterns(
|
||||
"key=sk-" + "A" * 48,
|
||||
)
|
||||
assert result is not None
|
||||
self.assertIn("OpenAI", result.reason)
|
||||
|
||||
def test_stripe_live_key(self):
|
||||
result = scan_token_patterns(
|
||||
"stripe: sk_live_" + "A" * 24,
|
||||
)
|
||||
assert result is not None
|
||||
self.assertIn("Stripe", result.reason)
|
||||
|
||||
def test_bearer_jwt(self):
|
||||
result = scan_token_patterns(
|
||||
"Authorization: Bearer " + "A" * 60,
|
||||
)
|
||||
assert result is not None
|
||||
self.assertIn("Bearer JWT", result.reason)
|
||||
|
||||
def test_openai_project_key(self):
|
||||
result = scan_token_patterns(
|
||||
"key=sk-proj-" + "A" * 48,
|
||||
)
|
||||
assert result is not None
|
||||
self.assertIn("OpenAI project", result.reason)
|
||||
def test_detects_each_token_pattern(self):
|
||||
for case_id, sample, expected in _TOKEN_PATTERN_CASES:
|
||||
with self.subTest(case_id):
|
||||
result = scan_token_patterns(sample)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
self.assertIn(expected, result.reason)
|
||||
|
||||
def test_clean_text_returns_none(self):
|
||||
self.assertIsNone(scan_token_patterns("hello world"))
|
||||
@@ -307,44 +282,6 @@ class TestEncodedVariants(unittest.TestCase):
|
||||
self.assertEqual(len(v), len(set(v)))
|
||||
|
||||
|
||||
class TestScanTokenPatternsExtended(unittest.TestCase):
|
||||
def test_huggingface_token(self):
|
||||
result = scan_token_patterns("token=hf_" + "A" * 34) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("HuggingFace", result.reason)
|
||||
|
||||
def test_databricks_token(self):
|
||||
result = scan_token_patterns("dapi" + "a" * 32) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("Databricks", result.reason)
|
||||
|
||||
def test_slack_bot_token(self):
|
||||
# Use all-zero numeric segments to keep entropy low
|
||||
result = scan_token_patterns("xoxb-00000000000-00000000000-" + "A" * 24) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("Slack", result.reason)
|
||||
|
||||
def test_npm_token(self):
|
||||
result = scan_token_patterns("npm_" + "A" * 36) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("npm", result.reason)
|
||||
|
||||
def test_sendgrid_key(self):
|
||||
result = scan_token_patterns("SG." + "A" * 22 + "." + "B" * 43) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("SendGrid", result.reason)
|
||||
|
||||
def test_pypi_token(self):
|
||||
result = scan_token_patterns("pypi-" + "A" * 80) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("PyPI", result.reason)
|
||||
|
||||
def test_vault_token(self):
|
||||
result = scan_token_patterns("hvs." + "A" * 24) # gitleaks:allow
|
||||
assert result is not None
|
||||
self.assertIn("Vault", result.reason)
|
||||
|
||||
|
||||
class TestUnicodeNormalization(unittest.TestCase):
|
||||
def test_fullwidth_chars_normalized(self):
|
||||
# Fullwidth ASCII chars (U+FF21..U+FF3A) should map to ASCII
|
||||
|
||||
Reference in New Issue
Block a user