fix(egress): strip Authorization before DLP scan #212
@@ -147,13 +147,16 @@ class EgressAddon:
|
||||
self._serve_introspection(flow, request_path)
|
||||
return
|
||||
|
||||
# DLP outbound scan BEFORE stripping auth — catches tokens the
|
||||
# agent tried to smuggle in the Authorization header.
|
||||
# Strip inbound Authorization before DLP and matching; the agent cannot
|
||||
# smuggle tokens, and the route may inject sidecar-owned auth later.
|
||||
flow.request.headers.pop("authorization", None)
|
||||
|
||||
# DLP outbound scan after auth stripping so placeholder or attempted
|
||||
# agent auth headers do not become part of the scanned payload.
|
||||
route = match_route(self.config.routes, flow.request.pretty_host)
|
||||
if route is not None:
|
||||
body = flow.request.get_text(strict=False) or ""
|
||||
auth_header = flow.request.headers.get("authorization", "")
|
||||
dlp_result = scan_outbound(route, body, os.environ, auth_header=auth_header)
|
||||
dlp_result = scan_outbound(route, body, os.environ)
|
||||
if dlp_result is not None and dlp_result.severity == "block":
|
||||
ctx = self._req_ctx(flow)
|
||||
if dlp_result.context:
|
||||
@@ -161,9 +164,6 @@ class EgressAddon:
|
||||
self._block(flow, f"egress DLP: {dlp_result.reason}", ctx=ctx)
|
||||
return
|
||||
|
||||
# Strip inbound Authorization — agent cannot smuggle tokens.
|
||||
flow.request.headers.pop("authorization", None)
|
||||
|
||||
if is_git_push_request(request_path, query):
|
||||
self._block(
|
||||
flow,
|
||||
|
||||
@@ -532,8 +532,6 @@ def scan_outbound(
|
||||
route: Route,
|
||||
body: str | bytes,
|
||||
environ: typing.Mapping[str, str],
|
||||
*,
|
||||
auth_header: str = "",
|
||||
) -> ScanResult | None:
|
||||
# Lazy import to avoid circular deps and keep dlp_detectors optional
|
||||
# at import time (the sidecar copies it flat alongside this file).
|
||||
@@ -549,19 +547,11 @@ def scan_outbound(
|
||||
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
|
||||
|
||||
if _detector_enabled(route.outbound_detectors, "token_patterns"):
|
||||
if auth_header:
|
||||
result = scan_token_patterns(auth_header, location="authorization header")
|
||||
if result is not None:
|
||||
return result
|
||||
result = scan_token_patterns(text, location="body")
|
||||
if result is not None:
|
||||
return result
|
||||
|
||||
if _detector_enabled(route.outbound_detectors, "known_secrets"):
|
||||
if auth_header:
|
||||
result = scan_known_secrets(auth_header, location="authorization header", env=environ)
|
||||
if result is not None:
|
||||
return result
|
||||
result = scan_known_secrets(text, location="body", env=environ)
|
||||
if result is not None:
|
||||
return result
|
||||
|
||||
@@ -30,6 +30,7 @@ from bot_bottle.egress_addon_core import (
|
||||
match_route,
|
||||
parse_config,
|
||||
parse_routes,
|
||||
scan_outbound,
|
||||
)
|
||||
|
||||
|
||||
@@ -599,6 +600,22 @@ class TestDecisionDefaults(unittest.TestCase):
|
||||
self.assertIsNone(d.inject_authorization)
|
||||
|
||||
|
||||
# --- scan_outbound -------------------------------------------------------
|
||||
|
||||
|
||||
class TestScanOutbound(unittest.TestCase):
|
||||
def test_body_token_patterns_still_block(self):
|
||||
result = scan_outbound(
|
||||
Route(host="chatgpt.com"),
|
||||
"leak sk-" + "A" * 48,
|
||||
{},
|
||||
)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("body", result.location)
|
||||
self.assertIn("OpenAI API key", result.reason)
|
||||
|
||||
|
||||
# --- is_git_push_request ------------------------------------------------
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user