fix(egress): strip Authorization before DLP scan; remove auth_header param from scan_outbound
lint / lint (push) Failing after 1m24s
test / unit (pull_request) Successful in 31s
test / integration (pull_request) Successful in 44s

This commit is contained in:
2026-06-06 16:15:30 -04:00
parent ffa651c91d
commit fcdcfe9a48
3 changed files with 24 additions and 17 deletions
+7 -7
View File
@@ -148,13 +148,16 @@ class EgressAddon:
self._serve_introspection(flow, request_path)
return
# DLP outbound scan BEFORE stripping auth — catches tokens the
# agent tried to smuggle in the Authorization header.
# Strip inbound Authorization before DLP and matching; the agent cannot
# smuggle tokens, and the route may inject sidecar-owned auth later.
flow.request.headers.pop("authorization", None)
# DLP outbound scan after auth stripping so placeholder or attempted
# agent auth headers do not become part of the scanned payload.
route = match_route(self.config.routes, flow.request.pretty_host)
if route is not None:
body = flow.request.get_text(strict=False) or ""
auth_header = flow.request.headers.get("authorization", "")
dlp_result = scan_outbound(route, body, os.environ, auth_header=auth_header)
dlp_result = scan_outbound(route, body, os.environ)
if dlp_result is not None and dlp_result.severity == "block":
ctx = self._req_ctx(flow)
if dlp_result.context:
@@ -162,9 +165,6 @@ class EgressAddon:
self._block(flow, f"egress DLP: {dlp_result.reason}", ctx=ctx)
return
# Strip inbound Authorization — agent cannot smuggle tokens.
flow.request.headers.pop("authorization", None)
if is_git_push_request(request_path, query):
self._block(
flow,
-10
View File
@@ -531,8 +531,6 @@ def scan_outbound(
route: Route,
body: str | bytes,
environ: typing.Mapping[str, str],
*,
auth_header: str = "",
) -> ScanResult | None:
# Lazy import to avoid circular deps and keep dlp_detectors optional
# at import time (the sidecar copies it flat alongside this file).
@@ -544,19 +542,11 @@ def scan_outbound(
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
if _detector_enabled(route.outbound_detectors, "token_patterns"):
if auth_header:
result = scan_token_patterns(auth_header, location="authorization header")
if result is not None:
return result
result = scan_token_patterns(text, location="body")
if result is not None:
return result
if _detector_enabled(route.outbound_detectors, "known_secrets"):
if auth_header:
result = scan_known_secrets(auth_header, location="authorization header", env=environ)
if result is not None:
return result
result = scan_known_secrets(text, location="body", env=environ)
if result is not None:
return result
+17
View File
@@ -30,6 +30,7 @@ from bot_bottle.egress_addon_core import (
match_route,
parse_config,
parse_routes,
scan_outbound,
)
@@ -589,6 +590,22 @@ class TestDecisionDefaults(unittest.TestCase):
self.assertIsNone(d.inject_authorization)
# --- scan_outbound -------------------------------------------------------
class TestScanOutbound(unittest.TestCase):
def test_body_token_patterns_still_block(self):
result = scan_outbound(
Route(host="chatgpt.com"),
"leak sk-" + "A" * 48,
{},
)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("body", result.location)
self.assertIn("OpenAI API key", result.reason)
# --- is_git_push_request ------------------------------------------------