fix(egress): strip Authorization before DLP scan #212

Merged
didericis merged 1 commits from fix/egress-strip-auth-before-scan into main 2026-06-07 22:36:01 -04:00
3 changed files with 24 additions and 17 deletions
+7 -7
View File
@@ -147,13 +147,16 @@ class EgressAddon:
self._serve_introspection(flow, request_path)
return
# DLP outbound scan BEFORE stripping auth — catches tokens the
# agent tried to smuggle in the Authorization header.
# Strip inbound Authorization before DLP and matching; the agent cannot
# smuggle tokens, and the route may inject sidecar-owned auth later.
flow.request.headers.pop("authorization", None)
# DLP outbound scan after auth stripping so placeholder or attempted
# agent auth headers do not become part of the scanned payload.
route = match_route(self.config.routes, flow.request.pretty_host)
if route is not None:
body = flow.request.get_text(strict=False) or ""
auth_header = flow.request.headers.get("authorization", "")
dlp_result = scan_outbound(route, body, os.environ, auth_header=auth_header)
dlp_result = scan_outbound(route, body, os.environ)
if dlp_result is not None and dlp_result.severity == "block":
ctx = self._req_ctx(flow)
if dlp_result.context:
@@ -161,9 +164,6 @@ class EgressAddon:
self._block(flow, f"egress DLP: {dlp_result.reason}", ctx=ctx)
return
# Strip inbound Authorization — agent cannot smuggle tokens.
flow.request.headers.pop("authorization", None)
if is_git_push_request(request_path, query):
self._block(
flow,
-10
View File
@@ -532,8 +532,6 @@ def scan_outbound(
route: Route,
body: str | bytes,
environ: typing.Mapping[str, str],
*,
auth_header: str = "",
) -> ScanResult | None:
# Lazy import to avoid circular deps and keep dlp_detectors optional
# at import time (the sidecar copies it flat alongside this file).
@@ -549,19 +547,11 @@ def scan_outbound(
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
if _detector_enabled(route.outbound_detectors, "token_patterns"):
if auth_header:
result = scan_token_patterns(auth_header, location="authorization header")
if result is not None:
return result
result = scan_token_patterns(text, location="body")
if result is not None:
return result
if _detector_enabled(route.outbound_detectors, "known_secrets"):
if auth_header:
result = scan_known_secrets(auth_header, location="authorization header", env=environ)
if result is not None:
return result
result = scan_known_secrets(text, location="body", env=environ)
if result is not None:
return result
+17
View File
@@ -30,6 +30,7 @@ from bot_bottle.egress_addon_core import (
match_route,
parse_config,
parse_routes,
scan_outbound,
)
@@ -599,6 +600,22 @@ class TestDecisionDefaults(unittest.TestCase):
self.assertIsNone(d.inject_authorization)
# --- scan_outbound -------------------------------------------------------
class TestScanOutbound(unittest.TestCase):
def test_body_token_patterns_still_block(self):
result = scan_outbound(
Route(host="chatgpt.com"),
"leak sk-" + "A" * 48,
{},
)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("body", result.location)
self.assertIn("OpenAI API key", result.reason)
# --- is_git_push_request ------------------------------------------------