diff --git a/bot_bottle/egress_addon.py b/bot_bottle/egress_addon.py index 4b3cbdc..41abeff 100644 --- a/bot_bottle/egress_addon.py +++ b/bot_bottle/egress_addon.py @@ -82,6 +82,14 @@ class EgressAddon: {"Content-Type": "text/plain; charset=utf-8"}, ) + def _block(self, flow: http.HTTPFlow, reason: str) -> None: + sys.stderr.write(f"{reason}\n") + flow.response = http.Response.make( + 403, + reason.encode("utf-8"), + {"Content-Type": "text/plain; charset=utf-8"}, + ) + def request(self, flow: http.HTTPFlow) -> None: request_path, _, query = flow.request.path.partition("?") @@ -100,25 +108,18 @@ class EgressAddon: scan_text = auth_header + "\n" + body dlp_result = scan_outbound(route, scan_text, os.environ) if dlp_result is not None and dlp_result.severity == "block": - flow.response = http.Response.make( - 403, - f"egress DLP: {dlp_result.reason}".encode("utf-8"), - {"Content-Type": "text/plain; charset=utf-8"}, - ) + self._block(flow, f"egress DLP: {dlp_result.reason}") return # Strip inbound Authorization — agent cannot smuggle tokens. flow.request.headers.pop("authorization", None) if is_git_push_request(request_path, query): - flow.response = http.Response.make( - 403, - ( - b"egress: git push over HTTPS is not supported; " - b"use the bottle.git SSH path (gitleaks-scanned by " - b"git-gate's pre-receive hook)." - ), - {"Content-Type": "text/plain; charset=utf-8"}, + self._block( + flow, + "egress: git push over HTTPS is not supported; " + "use the bottle.git SSH path (gitleaks-scanned by " + "git-gate's pre-receive hook).", ) return @@ -135,12 +136,7 @@ class EgressAddon: ) if decision.action == "block": - sys.stderr.write(f"{decision.reason}\n") - flow.response = http.Response.make( - 403, - decision.reason.encode("utf-8"), - {"Content-Type": "text/plain; charset=utf-8"}, - ) + self._block(flow, decision.reason) return if decision.inject_authorization is not None: @@ -160,11 +156,7 @@ class EgressAddon: if result is None: return if result.severity == "block": - flow.response = http.Response.make( - 403, - f"egress DLP: {result.reason}".encode("utf-8"), - {"Content-Type": "text/plain; charset=utf-8"}, - ) + self._block(flow, f"egress DLP: {result.reason}") elif result.severity == "warn": sys.stderr.write(f"egress DLP warn: {result.reason}\n")