diff --git a/bot_bottle/egress_addon.py b/bot_bottle/egress_addon.py index 41abeff..a78f2c8 100644 --- a/bot_bottle/egress_addon.py +++ b/bot_bottle/egress_addon.py @@ -16,6 +16,7 @@ from mitmproxy import http # type: ignore[import-not-found] from egress_addon_core import ( # type: ignore[import-not-found] Route, + build_outbound_scan_text, decide, is_git_push_request, load_routes, @@ -98,14 +99,18 @@ class EgressAddon: return # DLP outbound scan BEFORE stripping auth — catches tokens the - # agent tried to smuggle in the Authorization header. + # agent tried to smuggle in any header, path, query param, or body. + # Hostname is included to catch DNS-tunnelling exfiltration attempts. route = match_route(self.routes, flow.request.pretty_host) if route is not None: body = flow.request.get_text(strict=False) or "" - auth_header = flow.request.headers.get("authorization", "") - scan_text = body - if auth_header: - scan_text = auth_header + "\n" + body + scan_text = build_outbound_scan_text( + flow.request.pretty_host, + request_path, + query, + dict(flow.request.headers), + body, + ) dlp_result = scan_outbound(route, scan_text, os.environ) if dlp_result is not None and dlp_result.severity == "block": self._block(flow, f"egress DLP: {dlp_result.reason}") diff --git a/bot_bottle/egress_addon_core.py b/bot_bottle/egress_addon_core.py index a6b3c09..9cc1e17 100644 --- a/bot_bottle/egress_addon_core.py +++ b/bot_bottle/egress_addon_core.py @@ -477,6 +477,27 @@ def decide( # DLP scan dispatch (PRD 0053) # --------------------------------------------------------------------------- +def build_outbound_scan_text( + host: str, + path: str, + query: str, + headers: typing.Mapping[str, str], + body: str, +) -> str: + """Assemble all outbound request surfaces into one string for DLP scanning. + + Covers hostname (DNS tunnelling), path, query params, all headers, body. + """ + parts: list[str] = [host, path] + if query: + parts.append(query) + for name, value in headers.items(): + parts.append(f"{name}: {value}") + if body: + parts.append(body) + return "\n".join(parts) + + def _detector_enabled( configured: tuple[str, ...] | None, name: str, @@ -541,6 +562,7 @@ __all__ = [ "PathMatch", "Route", "ScanResult", + "build_outbound_scan_text", "decide", "evaluate_matches", "is_git_push_request", diff --git a/docs/prds/0053-extended-outbound-scan.md b/docs/prds/0053-extended-outbound-scan.md index 2cbcf4c..3f2ff23 100644 --- a/docs/prds/0053-extended-outbound-scan.md +++ b/docs/prds/0053-extended-outbound-scan.md @@ -1,6 +1,6 @@ # PRD 0053: Extended outbound DLP scan surfaces -- **Status:** Draft +- **Status:** Active - **Author:** claude - **Created:** 2026-06-06 - **Issue:** #204 diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index 72f5d66..31f17fa 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -18,12 +18,15 @@ from bot_bottle.egress_addon_core import ( MatchEntry, PathMatch, Route, + ScanResult, + build_outbound_scan_text, decide, evaluate_matches, is_git_push_request, load_routes, match_route, parse_routes, + scan_outbound, ) @@ -661,5 +664,190 @@ class TestGitPushBlockFailFast(unittest.TestCase): self.assertIn("403", result.stderr) +# --- build_outbound_scan_text ------------------------------------------- + + +class TestBuildOutboundScanText(unittest.TestCase): + def _build(self, **kwargs): + defaults = dict( + host="api.example.com", + path="/v1/data", + query="", + headers={}, + body="", + ) + defaults.update(kwargs) + return build_outbound_scan_text(**defaults) + + def test_host_appears(self): + text = self._build(host="secret.attacker.com") + self.assertIn("secret.attacker.com", text) + + def test_path_appears(self): + text = self._build(path="/api/token-in-path") + self.assertIn("/api/token-in-path", text) + + def test_query_appears(self): + text = self._build(query="api_key=abc123") + self.assertIn("api_key=abc123", text) + + def test_empty_query_omitted(self): + text = self._build(query="") + self.assertEqual(1, text.count("\n")) # host + path only: one separator + + def test_headers_appear(self): + text = self._build(headers={"x-api-key": "tok", "accept": "application/json"}) + self.assertIn("x-api-key: tok", text) + self.assertIn("accept: application/json", text) + + def test_body_appears(self): + text = self._build(body="hello world") + self.assertIn("hello world", text) + + def test_empty_body_omitted(self): + text = self._build(body="") + self.assertNotIn("\n\n", text) + + def test_all_surfaces_present(self): + text = build_outbound_scan_text( + host="h.example", + path="/p", + query="q=1", + headers={"x-h": "v"}, + body="body", + ) + for fragment in ["h.example", "/p", "q=1", "x-h: v", "body"]: + self.assertIn(fragment, text) + + +# --- scan_outbound ------------------------------------------------------- + +_AWS_KEY = "AKIAIOSFODNN7EXAMPLE" +_ROUTE = Route(host="api.example.com") + + +class TestScanOutbound(unittest.TestCase): + def test_clean_request_returns_none(self): + text = build_outbound_scan_text( + host="api.example.com", + path="/v1/data", + query="limit=10", + headers={"content-type": "application/json"}, + body='{"msg": "hello"}', + ) + self.assertIsNone(scan_outbound(_ROUTE, text, {})) + + def test_token_in_body_blocked(self): + text = build_outbound_scan_text( + host="api.example.com", + path="/v1/data", + query="", + headers={}, + body=f"key={_AWS_KEY}", + ) + result = scan_outbound(_ROUTE, text, {}) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_token_in_path_blocked(self): + text = build_outbound_scan_text( + host="api.example.com", + path=f"/proxy/{_AWS_KEY}/resource", + query="", + headers={}, + body="", + ) + result = scan_outbound(_ROUTE, text, {}) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_token_in_query_param_blocked(self): + text = build_outbound_scan_text( + host="api.example.com", + path="/search", + query=f"aws_key={_AWS_KEY}", + headers={}, + body="", + ) + result = scan_outbound(_ROUTE, text, {}) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_token_in_non_auth_header_blocked(self): + text = build_outbound_scan_text( + host="api.example.com", + path="/v1/data", + query="", + headers={"x-aws-key": _AWS_KEY}, + body="", + ) + result = scan_outbound(_ROUTE, text, {}) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_token_in_hostname_blocked(self): + # DNS-tunnelling: secret encoded in subdomain label + text = build_outbound_scan_text( + host=f"{_AWS_KEY}.attacker.com", + path="/", + query="", + headers={}, + body="", + ) + result = scan_outbound(_ROUTE, text, {}) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_known_secret_in_query_param_blocked(self): + secret = "my-provisioned-secret" + env = {"EGRESS_TOKEN_0": secret} + text = build_outbound_scan_text( + host="api.example.com", + path="/data", + query=f"token={secret}", + headers={}, + body="", + ) + result = scan_outbound(_ROUTE, text, env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_known_secret_in_path_blocked(self): + secret = "my-provisioned-secret" + env = {"EGRESS_TOKEN_0": secret} + text = build_outbound_scan_text( + host="api.example.com", + path=f"/proxy/{secret}/resource", + query="", + headers={}, + body="", + ) + result = scan_outbound(_ROUTE, text, env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + def test_known_secret_in_custom_header_blocked(self): + secret = "my-provisioned-secret" + env = {"EGRESS_TOKEN_0": secret} + text = build_outbound_scan_text( + host="api.example.com", + path="/data", + query="", + headers={"x-secret": secret}, + body="", + ) + result = scan_outbound(_ROUTE, text, env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + + if __name__ == "__main__": unittest.main()