From 4e570e3e2bc36219d4908031c03648ffb398c1c3 Mon Sep 17 00:00:00 2001 From: didericis Date: Mon, 8 Jun 2026 15:43:46 -0400 Subject: [PATCH] fix(egress): ignore stripped auth header in DLP scan --- bot_bottle/egress_addon.py | 3 +- bot_bottle/egress_addon_core.py | 22 +++++++++++ tests/unit/test_egress_addon_core.py | 57 ++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 1 deletion(-) diff --git a/bot_bottle/egress_addon.py b/bot_bottle/egress_addon.py index 2bfaa1a..8e57b69 100644 --- a/bot_bottle/egress_addon.py +++ b/bot_bottle/egress_addon.py @@ -24,6 +24,7 @@ from egress_addon_core import ( # type: ignore[import-not-found] # pylint: dis is_git_push_request, load_config, match_route, + outbound_scan_headers, scan_inbound, scan_outbound, ) @@ -159,7 +160,7 @@ class EgressAddon: flow.request.pretty_host, request_path, query, - dict(flow.request.headers), + outbound_scan_headers(route, dict(flow.request.headers)), body, ) dlp_result = scan_outbound(route, scan_text, os.environ) diff --git a/bot_bottle/egress_addon_core.py b/bot_bottle/egress_addon_core.py index 6112814..4327948 100644 --- a/bot_bottle/egress_addon_core.py +++ b/bot_bottle/egress_addon_core.py @@ -538,6 +538,27 @@ def build_outbound_scan_text( return "\n".join(parts) +def outbound_scan_headers( + route: Route, + headers: typing.Mapping[str, str], +) -> dict[str, str]: + """Return request headers that should be included in outbound DLP. + + Routes that inject sidecar-owned auth always strip the agent's + Authorization header before forwarding. Scanning that header first + creates false positives for provider clients that insist on sending + their own bearer-shaped placeholder, while still not changing what + reaches the upstream. + """ + out: dict[str, str] = {} + skip_auth = bool(route.auth_scheme and route.token_env) + for name, value in headers.items(): + if skip_auth and name.lower() == "authorization": + continue + out[name] = value + return out + + def build_inbound_scan_text( headers: typing.Mapping[str, str], body: str, @@ -644,6 +665,7 @@ __all__ = [ "load_config", "load_routes", "match_route", + "outbound_scan_headers", "parse_config", "parse_routes", "scan_inbound", diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index 2c11486..85f44a9 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -30,6 +30,7 @@ from bot_bottle.egress_addon_core import ( load_config, load_routes, match_route, + outbound_scan_headers, parse_config, parse_routes, scan_inbound, @@ -798,6 +799,41 @@ class TestBuildOutboundScanText(unittest.TestCase): self.assertIn(fragment, text) +class TestOutboundScanHeaders(unittest.TestCase): + def test_authed_route_omits_authorization_header_from_scan(self): + route = Route( + host="chatgpt.com", + auth_scheme="Bearer", + token_env="EGRESS_TOKEN_0", + ) + headers = outbound_scan_headers(route, { + "Authorization": "Bearer " + "A" * 60, + "x-api-key": "still-scanned", + }) + self.assertNotIn("Authorization", headers) + self.assertEqual({"x-api-key": "still-scanned"}, headers) + + def test_authed_route_omits_lowercase_authorization_header_from_scan(self): + route = Route( + host="chatgpt.com", + auth_scheme="Bearer", + token_env="EGRESS_TOKEN_0", + ) + headers = outbound_scan_headers(route, { + "authorization": "Bearer " + "A" * 60, + "accept": "application/json", + }) + self.assertEqual({"accept": "application/json"}, headers) + + def test_unauthenticated_route_keeps_authorization_header_in_scan(self): + route = Route(host="api.example.com") + auth = "Bearer " + "A" * 60 + headers = outbound_scan_headers(route, { + "Authorization": auth, + }) + self.assertEqual({"Authorization": auth}, headers) + + # --- scan_outbound ------------------------------------------------------- _AWS_KEY = "AKIAIOSFODNN7EXAMPLE" @@ -815,6 +851,27 @@ class TestScanOutbound(unittest.TestCase): ) self.assertIsNone(scan_outbound(_ROUTE, text, {})) + def test_authed_route_authorization_placeholder_not_scanned(self): + route = Route( + host="chatgpt.com", + auth_scheme="Bearer", + token_env="EGRESS_TOKEN_0", + ) + headers = outbound_scan_headers(route, { + "Authorization": "Bearer " + "A" * 60, + "content-type": "application/json", + }) + text = build_outbound_scan_text( + host="chatgpt.com", + path="/backend-api/codex/responses", + query="", + headers=headers, + body='{"jsonrpc":"2.0","method":"initialize"}', + ) + self.assertIsNone(scan_outbound(route, text, { + "EGRESS_TOKEN_0": "sidecar-owned-secret", + })) + def test_token_in_body_blocked(self): text = build_outbound_scan_text( host="api.example.com",