fix(egress): ignore stripped auth header in DLP scan
lint / lint (push) Failing after 1m32s
test / unit (pull_request) Successful in 32s
test / integration (pull_request) Successful in 17s

This commit is contained in:
2026-06-08 15:43:46 -04:00
parent 37a780acf6
commit a397d37bbe
3 changed files with 81 additions and 1 deletions
+57
View File
@@ -30,6 +30,7 @@ from bot_bottle.egress_addon_core import (
load_config,
load_routes,
match_route,
outbound_scan_headers,
parse_config,
parse_routes,
scan_inbound,
@@ -798,6 +799,41 @@ class TestBuildOutboundScanText(unittest.TestCase):
self.assertIn(fragment, text)
class TestOutboundScanHeaders(unittest.TestCase):
def test_authed_route_omits_authorization_header_from_scan(self):
route = Route(
host="chatgpt.com",
auth_scheme="Bearer",
token_env="EGRESS_TOKEN_0",
)
headers = outbound_scan_headers(route, {
"Authorization": "Bearer " + "A" * 60,
"x-api-key": "still-scanned",
})
self.assertNotIn("Authorization", headers)
self.assertEqual({"x-api-key": "still-scanned"}, headers)
def test_authed_route_omits_lowercase_authorization_header_from_scan(self):
route = Route(
host="chatgpt.com",
auth_scheme="Bearer",
token_env="EGRESS_TOKEN_0",
)
headers = outbound_scan_headers(route, {
"authorization": "Bearer " + "A" * 60,
"accept": "application/json",
})
self.assertEqual({"accept": "application/json"}, headers)
def test_unauthenticated_route_keeps_authorization_header_in_scan(self):
route = Route(host="api.example.com")
auth = "Bearer " + "A" * 60
headers = outbound_scan_headers(route, {
"Authorization": auth,
})
self.assertEqual({"Authorization": auth}, headers)
# --- scan_outbound -------------------------------------------------------
_AWS_KEY = "AKIAIOSFODNN7EXAMPLE"
@@ -815,6 +851,27 @@ class TestScanOutbound(unittest.TestCase):
)
self.assertIsNone(scan_outbound(_ROUTE, text, {}))
def test_authed_route_authorization_placeholder_not_scanned(self):
route = Route(
host="chatgpt.com",
auth_scheme="Bearer",
token_env="EGRESS_TOKEN_0",
)
headers = outbound_scan_headers(route, {
"Authorization": "Bearer " + "A" * 60,
"content-type": "application/json",
})
text = build_outbound_scan_text(
host="chatgpt.com",
path="/backend-api/codex/responses",
query="",
headers=headers,
body='{"jsonrpc":"2.0","method":"initialize"}',
)
self.assertIsNone(scan_outbound(route, text, {
"EGRESS_TOKEN_0": "sidecar-owned-secret",
}))
def test_token_in_body_blocked(self):
text = build_outbound_scan_text(
host="api.example.com",