From a04aed098d050971807133ade05f3575c95099f2 Mon Sep 17 00:00:00 2001 From: didericis Date: Sat, 6 Jun 2026 16:15:30 -0400 Subject: [PATCH] fix(egress): strip Authorization before DLP scan; remove auth_header param from scan_outbound --- bot_bottle/egress_addon.py | 14 +++++++------- bot_bottle/egress_addon_core.py | 10 ---------- tests/unit/test_egress_addon_core.py | 17 +++++++++++++++++ 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/bot_bottle/egress_addon.py b/bot_bottle/egress_addon.py index 73655e1..2def445 100644 --- a/bot_bottle/egress_addon.py +++ b/bot_bottle/egress_addon.py @@ -147,13 +147,16 @@ class EgressAddon: self._serve_introspection(flow, request_path) return - # DLP outbound scan BEFORE stripping auth — catches tokens the - # agent tried to smuggle in the Authorization header. + # Strip inbound Authorization before DLP and matching; the agent cannot + # smuggle tokens, and the route may inject sidecar-owned auth later. + flow.request.headers.pop("authorization", None) + + # DLP outbound scan after auth stripping so placeholder or attempted + # agent auth headers do not become part of the scanned payload. route = match_route(self.config.routes, flow.request.pretty_host) if route is not None: body = flow.request.get_text(strict=False) or "" - auth_header = flow.request.headers.get("authorization", "") - dlp_result = scan_outbound(route, body, os.environ, auth_header=auth_header) + dlp_result = scan_outbound(route, body, os.environ) if dlp_result is not None and dlp_result.severity == "block": ctx = self._req_ctx(flow) if dlp_result.context: @@ -161,9 +164,6 @@ class EgressAddon: self._block(flow, f"egress DLP: {dlp_result.reason}", ctx=ctx) return - # Strip inbound Authorization — agent cannot smuggle tokens. - flow.request.headers.pop("authorization", None) - if is_git_push_request(request_path, query): self._block( flow, diff --git a/bot_bottle/egress_addon_core.py b/bot_bottle/egress_addon_core.py index 8bed9ec..b0fc8fe 100644 --- a/bot_bottle/egress_addon_core.py +++ b/bot_bottle/egress_addon_core.py @@ -532,8 +532,6 @@ def scan_outbound( route: Route, body: str | bytes, environ: typing.Mapping[str, str], - *, - auth_header: str = "", ) -> ScanResult | None: # Lazy import to avoid circular deps and keep dlp_detectors optional # at import time (the sidecar copies it flat alongside this file). @@ -549,19 +547,11 @@ def scan_outbound( text = body if isinstance(body, str) else body.decode("utf-8", errors="replace") if _detector_enabled(route.outbound_detectors, "token_patterns"): - if auth_header: - result = scan_token_patterns(auth_header, location="authorization header") - if result is not None: - return result result = scan_token_patterns(text, location="body") if result is not None: return result if _detector_enabled(route.outbound_detectors, "known_secrets"): - if auth_header: - result = scan_known_secrets(auth_header, location="authorization header", env=environ) - if result is not None: - return result result = scan_known_secrets(text, location="body", env=environ) if result is not None: return result diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index cd39154..2bcfe1e 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -30,6 +30,7 @@ from bot_bottle.egress_addon_core import ( match_route, parse_config, parse_routes, + scan_outbound, ) @@ -599,6 +600,22 @@ class TestDecisionDefaults(unittest.TestCase): self.assertIsNone(d.inject_authorization) +# --- scan_outbound ------------------------------------------------------- + + +class TestScanOutbound(unittest.TestCase): + def test_body_token_patterns_still_block(self): + result = scan_outbound( + Route(host="chatgpt.com"), + "leak sk-" + "A" * 48, + {}, + ) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("body", result.location) + self.assertIn("OpenAI API key", result.reason) + + # --- is_git_push_request ------------------------------------------------