feat(egress): extend outbound DLP scan to headers, query params, path, and hostname (PRD 0053)

This commit is contained in:
2026-06-06 17:43:55 +00:00
committed by didericis
parent 2c51bc47e8
commit b1283a0e7b
4 changed files with 226 additions and 8 deletions
+187
View File
@@ -22,6 +22,8 @@ from bot_bottle.egress_addon_core import (
MatchEntry,
PathMatch,
Route,
ScanResult,
build_outbound_scan_text,
decide,
evaluate_matches,
is_git_push_request,
@@ -733,5 +735,190 @@ class TestGitPushBlockFailFast(unittest.TestCase):
self.assertIn("403", result.stderr)
# --- build_outbound_scan_text -------------------------------------------
class TestBuildOutboundScanText(unittest.TestCase):
def _build(self, **kwargs):
defaults = dict(
host="api.example.com",
path="/v1/data",
query="",
headers={},
body="",
)
defaults.update(kwargs)
return build_outbound_scan_text(**defaults)
def test_host_appears(self):
text = self._build(host="secret.attacker.com")
self.assertIn("secret.attacker.com", text)
def test_path_appears(self):
text = self._build(path="/api/token-in-path")
self.assertIn("/api/token-in-path", text)
def test_query_appears(self):
text = self._build(query="api_key=abc123")
self.assertIn("api_key=abc123", text)
def test_empty_query_omitted(self):
text = self._build(query="")
self.assertEqual(1, text.count("\n")) # host + path only: one separator
def test_headers_appear(self):
text = self._build(headers={"x-api-key": "tok", "accept": "application/json"})
self.assertIn("x-api-key: tok", text)
self.assertIn("accept: application/json", text)
def test_body_appears(self):
text = self._build(body="hello world")
self.assertIn("hello world", text)
def test_empty_body_omitted(self):
text = self._build(body="")
self.assertNotIn("\n\n", text)
def test_all_surfaces_present(self):
text = build_outbound_scan_text(
host="h.example",
path="/p",
query="q=1",
headers={"x-h": "v"},
body="body",
)
for fragment in ["h.example", "/p", "q=1", "x-h: v", "body"]:
self.assertIn(fragment, text)
# --- scan_outbound -------------------------------------------------------
_AWS_KEY = "AKIAIOSFODNN7EXAMPLE"
_ROUTE = Route(host="api.example.com")
class TestScanOutbound(unittest.TestCase):
def test_clean_request_returns_none(self):
text = build_outbound_scan_text(
host="api.example.com",
path="/v1/data",
query="limit=10",
headers={"content-type": "application/json"},
body='{"msg": "hello"}',
)
self.assertIsNone(scan_outbound(_ROUTE, text, {}))
def test_token_in_body_blocked(self):
text = build_outbound_scan_text(
host="api.example.com",
path="/v1/data",
query="",
headers={},
body=f"key={_AWS_KEY}",
)
result = scan_outbound(_ROUTE, text, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_token_in_path_blocked(self):
text = build_outbound_scan_text(
host="api.example.com",
path=f"/proxy/{_AWS_KEY}/resource",
query="",
headers={},
body="",
)
result = scan_outbound(_ROUTE, text, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_token_in_query_param_blocked(self):
text = build_outbound_scan_text(
host="api.example.com",
path="/search",
query=f"aws_key={_AWS_KEY}",
headers={},
body="",
)
result = scan_outbound(_ROUTE, text, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_token_in_non_auth_header_blocked(self):
text = build_outbound_scan_text(
host="api.example.com",
path="/v1/data",
query="",
headers={"x-aws-key": _AWS_KEY},
body="",
)
result = scan_outbound(_ROUTE, text, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_token_in_hostname_blocked(self):
# DNS-tunnelling: secret encoded in subdomain label
text = build_outbound_scan_text(
host=f"{_AWS_KEY}.attacker.com",
path="/",
query="",
headers={},
body="",
)
result = scan_outbound(_ROUTE, text, {})
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_known_secret_in_query_param_blocked(self):
secret = "my-provisioned-secret"
env = {"EGRESS_TOKEN_0": secret}
text = build_outbound_scan_text(
host="api.example.com",
path="/data",
query=f"token={secret}",
headers={},
body="",
)
result = scan_outbound(_ROUTE, text, env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_known_secret_in_path_blocked(self):
secret = "my-provisioned-secret"
env = {"EGRESS_TOKEN_0": secret}
text = build_outbound_scan_text(
host="api.example.com",
path=f"/proxy/{secret}/resource",
query="",
headers={},
body="",
)
result = scan_outbound(_ROUTE, text, env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
def test_known_secret_in_custom_header_blocked(self):
secret = "my-provisioned-secret"
env = {"EGRESS_TOKEN_0": secret}
text = build_outbound_scan_text(
host="api.example.com",
path="/data",
query="",
headers={"x-secret": secret},
body="",
)
result = scan_outbound(_ROUTE, text, env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
if __name__ == "__main__":
unittest.main()