feat(egress): extend outbound DLP scan to headers, query params, path, and hostname (PRD 0053)
lint / lint (push) Failing after 1m30s
test / unit (pull_request) Successful in 35s
test / integration (pull_request) Successful in 43s

This commit is contained in:
2026-06-06 17:43:55 +00:00
parent 9f3991164c
commit 4515c9e8ad
4 changed files with 221 additions and 6 deletions
+188
View File
@@ -18,12 +18,15 @@ from bot_bottle.egress_addon_core import (
MatchEntry,
PathMatch,
Route,
ScanResult,
build_outbound_scan_text,
decide,
evaluate_matches,
is_git_push_request,
load_routes,
match_route,
parse_routes,
scan_outbound,
)
@@ -661,5 +664,190 @@ class TestGitPushBlockFailFast(unittest.TestCase):
self.assertIn("403", result.stderr)
# --- build_outbound_scan_text -------------------------------------------
class TestBuildOutboundScanText(unittest.TestCase):
def _build(self, **kwargs):
defaults = dict(
host="api.example.com",
path="/v1/data",
query="",
headers={},
body="",
)
defaults.update(kwargs)
return build_outbound_scan_text(**defaults)
def test_host_appears(self):
text = self._build(host="secret.attacker.com")
self.assertIn("secret.attacker.com", text)
def test_path_appears(self):
text = self._build(path="/api/token-in-path")
self.assertIn("/api/token-in-path", text)
def test_query_appears(self):
text = self._build(query="api_key=abc123")
self.assertIn("api_key=abc123", text)
def test_empty_query_omitted(self):
text = self._build(query="")
self.assertEqual(1, text.count("\n")) # host + path only: one separator
def test_headers_appear(self):
text = self._build(headers={"x-api-key": "tok", "accept": "application/json"})
self.assertIn("x-api-key: tok", text)
self.assertIn("accept: application/json", text)
def test_body_appears(self):
text = self._build(body="hello world")
self.assertIn("hello world", text)
def test_empty_body_omitted(self):
text = self._build(body="")
self.assertNotIn("\n\n", text)
def test_all_surfaces_present(self):
text = build_outbound_scan_text(
host="h.example",
path="/p",
query="q=1",
headers={"x-h": "v"},
body="body",
)
for fragment in ["h.example", "/p", "q=1", "x-h: v", "body"]:
self.assertIn(fragment, text)
# --- scan_outbound -------------------------------------------------------
_AWS_KEY = "AKIAIOSFODNN7EXAMPLE"
_ROUTE = Route(host="api.example.com")
class TestScanOutbound(unittest.TestCase):
def test_clean_request_returns_none(self):
text = build_outbound_scan_text(
host="api.example.com",
path="/v1/data",
query="limit=10",
headers={"content-type": "application/json"},
body='{"msg": "hello"}',
)
self.assertIsNone(scan_outbound(_ROUTE, text, {}))
def test_token_in_body_blocked(self):
text = build_outbound_scan_text(
host="api.example.com",
path="/v1/data",
query="",
headers={},
body=f"key={_AWS_KEY}",
)
result = scan_outbound(_ROUTE, text, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_token_in_path_blocked(self):
text = build_outbound_scan_text(
host="api.example.com",
path=f"/proxy/{_AWS_KEY}/resource",
query="",
headers={},
body="",
)
result = scan_outbound(_ROUTE, text, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_token_in_query_param_blocked(self):
text = build_outbound_scan_text(
host="api.example.com",
path="/search",
query=f"aws_key={_AWS_KEY}",
headers={},
body="",
)
result = scan_outbound(_ROUTE, text, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_token_in_non_auth_header_blocked(self):
text = build_outbound_scan_text(
host="api.example.com",
path="/v1/data",
query="",
headers={"x-aws-key": _AWS_KEY},
body="",
)
result = scan_outbound(_ROUTE, text, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_token_in_hostname_blocked(self):
# DNS-tunnelling: secret encoded in subdomain label
text = build_outbound_scan_text(
host=f"{_AWS_KEY}.attacker.com",
path="/",
query="",
headers={},
body="",
)
result = scan_outbound(_ROUTE, text, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_known_secret_in_query_param_blocked(self):
secret = "my-provisioned-secret"
env = {"EGRESS_TOKEN_0": secret}
text = build_outbound_scan_text(
host="api.example.com",
path="/data",
query=f"token={secret}",
headers={},
body="",
)
result = scan_outbound(_ROUTE, text, env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_known_secret_in_path_blocked(self):
secret = "my-provisioned-secret"
env = {"EGRESS_TOKEN_0": secret}
text = build_outbound_scan_text(
host="api.example.com",
path=f"/proxy/{secret}/resource",
query="",
headers={},
body="",
)
result = scan_outbound(_ROUTE, text, env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_known_secret_in_custom_header_blocked(self):
secret = "my-provisioned-secret"
env = {"EGRESS_TOKEN_0": secret}
text = build_outbound_scan_text(
host="api.example.com",
path="/data",
query="",
headers={"x-secret": secret},
body="",
)
result = scan_outbound(_ROUTE, text, env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
if __name__ == "__main__":
unittest.main()