Files
bot-bottle/bot_bottle/egress_addon.py
T
didericis a04aed098d
test / unit (pull_request) Successful in 32s
test / integration (pull_request) Successful in 46s
lint / lint (push) Successful in 1m27s
test / unit (push) Successful in 35s
test / integration (push) Successful in 42s
Update Quality Badges / update-badges (push) Successful in 1m20s
fix(egress): strip Authorization before DLP scan; remove auth_header param from scan_outbound
2026-06-07 22:30:10 -04:00

234 lines
7.9 KiB
Python

"""mitmproxy addon entrypoint for the egress sidecar (PRD 0017, PRD 0053).
Loaded by `mitmdump -s /app/egress_addon.py` inside the
egress container."""
from __future__ import annotations
import dataclasses
import json
import os
import signal
import sys
from pathlib import Path
from mitmproxy import http # type: ignore[import-not-found] # pylint: disable=import-error
from egress_addon_core import ( # type: ignore[import-not-found] # pylint: disable=import-error
LOG_BLOCKS,
LOG_FULL,
Config,
decide,
is_git_push_request,
load_config,
match_route,
scan_inbound,
scan_outbound,
)
try:
from dlp_detectors import redact_tokens # type: ignore[import-not-found]
except ImportError: # pragma: no cover - host-side path
from bot_bottle.dlp_detectors import redact_tokens # type: ignore[import-not-found]
# Routes file read when the EGRESS_ROUTES environment variable is not set.
DEFAULT_ROUTES_PATH = "/etc/egress/routes.yaml"
# Requests addressed to this host are answered by the addon itself
# (see EgressAddon._serve_introspection) instead of being forwarded.
INTROSPECT_HOST = "_egress.local"
class EgressAddon:
    """mitmproxy addon that applies the egress route policy to each flow.

    The route table is read from the file named by the ``EGRESS_ROUTES``
    environment variable (falling back to ``DEFAULT_ROUTES_PATH``) and is
    re-read whenever the process receives SIGHUP.  All diagnostics are
    written to stderr, one line per event.
    """

    def __init__(self) -> None:
        """Resolve the routes path, load it once and install the SIGHUP hook."""
        self.routes_path = os.environ.get("EGRESS_ROUTES", DEFAULT_ROUTES_PATH)
        # Start from an empty route table so the addon has a valid config
        # even when the initial load below fails.
        self.config: Config = Config(routes=())
        self._reload(initial=True)
        self._install_sighup()

    def _reload(self, *, initial: bool = False) -> None:
        """(Re)load the routes file into ``self.config``.

        Args:
            initial: True for the load performed at construction time.  Only
                affects the failure path: a failed initial load resets the
                config to an empty route table, whereas a failed SIGHUP
                reload keeps whatever config is currently active.

        Read errors (``OSError``) and ``ValueError`` raised while reading or
        parsing are reported on stderr and never propagate.
        """
        try:
            text = Path(self.routes_path).read_text(encoding="utf-8")
            new_config = load_config(text)
        except (OSError, ValueError) as e:
            tag = "boot" if initial else "SIGHUP"
            sys.stderr.write(
                f"egress: {tag} load failed: {e}\n"
            )
            if initial:
                self.config = Config(routes=())
            return
        self.config = new_config
        # The log level is used as an index, so it is expected to be 0, 1 or 2.
        log_label = ("off", "blocks", "full")[self.config.log]
        sys.stderr.write(
            f"egress: loaded {len(self.config.routes)} route(s): "
            f"{', '.join(r.host for r in self.config.routes)}"
            f" [log={log_label}]\n"
        )

    def _install_sighup(self) -> None:
        """Register a SIGHUP handler that reloads the routes file.

        Does nothing on platforms whose ``signal`` module lacks ``SIGHUP``.
        """
        if not hasattr(signal, "SIGHUP"):
            return

        def handler(signum: int, frame: object) -> None:
            del signum, frame  # required by the handler signature, unused
            self._reload()

        signal.signal(signal.SIGHUP, handler)

    def _serve_introspection(self, flow: http.HTTPFlow, path: str) -> None:
        """Answer a request addressed to the introspection host.

        ``/allowlist`` returns the active routes as indented JSON; every other
        path gets a plain-text 404.  The reply is set on ``flow.response``.
        """
        if path == "/allowlist":
            payload = json.dumps(
                {"routes": [dataclasses.asdict(r) for r in self.config.routes]},
                indent=2,
            ).encode("utf-8")
            flow.response = http.Response.make(
                200, payload,
                {"Content-Type": "application/json"},
            )
            return
        flow.response = http.Response.make(
            404,
            f"egress introspection: no such endpoint {path!r}".encode(),
            {"Content-Type": "text/plain; charset=utf-8"},
        )

    def _req_ctx(self, flow: http.HTTPFlow) -> dict[str, object]:
        """Return the request's host, method and path for log entries.

        Host and path are passed through ``redact_tokens`` before logging.
        """
        return {
            "host": redact_tokens(flow.request.pretty_host, env=os.environ),
            "method": flow.request.method,
            "path": redact_tokens(flow.request.path, env=os.environ),
        }

    def _block(
        self,
        flow: http.HTTPFlow,
        reason: str,
        ctx: dict[str, object] | None = None,
    ) -> None:
        """Replace the flow's response with a 403 carrying ``reason``.

        Args:
            flow: The flow to block.
            reason: Text used as the 403 body and as the log ``reason``.
            ctx: Optional extra fields merged into the log entry.

        An ``egress_block`` JSON line is written to stderr when the
        configured log level is at least ``LOG_BLOCKS``.
        """
        if self.config.log >= LOG_BLOCKS:
            entry: dict[str, object] = {"event": "egress_block", "reason": reason}
            if ctx:
                entry.update(ctx)
            sys.stderr.write(json.dumps(entry) + "\n")
        flow.response = http.Response.make(
            403,
            reason.encode("utf-8"),
            {"Content-Type": "text/plain; charset=utf-8"},
        )

    @staticmethod
    def _loggable_headers(headers: http.Headers) -> dict[str, str]:
        """Return ``headers`` as a dict with any Authorization value masked.

        The request is logged after the sidecar may have injected its own
        Authorization header, so the credential must not reach the log.
        The header name is kept so the log still shows that auth was sent.
        """
        return {
            name: "[redacted]" if name.lower() == "authorization" else value
            for name, value in dict(headers).items()
        }

    def _log_request(self, flow: http.HTTPFlow) -> None:
        """Write an ``egress_request`` JSON line (full-logging mode)."""
        entry: dict[str, object] = {
            "event": "egress_request",
            **self._req_ctx(flow),
            "headers": self._loggable_headers(flow.request.headers),
            "body": flow.request.get_text(strict=False) or "",
        }
        sys.stderr.write(json.dumps(entry) + "\n")

    def _log_response(self, flow: http.HTTPFlow) -> None:
        """Write an ``egress_response`` JSON line (full-logging mode).

        The caller must ensure ``flow.response`` is not None.
        """
        sys.stderr.write(
            json.dumps({
                "event": "egress_response",
                # Redacted the same way as in the request and block entries.
                "host": redact_tokens(flow.request.pretty_host, env=os.environ),
                "status": flow.response.status_code,
                "headers": dict(flow.response.headers),
                "body": flow.response.get_text(strict=False) or "",
            })
            + "\n"
        )

    def request(self, flow: http.HTTPFlow) -> None:
        """mitmproxy request hook: apply the egress policy to one request.

        Steps, in order:
          1. Serve introspection requests locally.
          2. Drop any Authorization header supplied by the client.
          3. Run the outbound DLP scan on the body when a route matches.
          4. Refuse git push over HTTPS.
          5. Ask ``decide`` whether to block, and inject the route's
             Authorization header when one is returned.
          6. Log the request when the log level is ``LOG_FULL``.
        """
        request_path, _, query = flow.request.path.partition("?")
        if flow.request.pretty_host == INTROSPECT_HOST:
            self._serve_introspection(flow, request_path)
            return
        # Strip inbound Authorization before DLP and matching; the agent cannot
        # smuggle tokens, and the route may inject sidecar-owned auth later.
        flow.request.headers.pop("authorization", None)
        # DLP outbound scan after auth stripping so placeholder or attempted
        # agent auth headers do not become part of the scanned payload.
        route = match_route(self.config.routes, flow.request.pretty_host)
        if route is not None:
            body = flow.request.get_text(strict=False) or ""
            dlp_result = scan_outbound(route, body, os.environ)
            if dlp_result is not None and dlp_result.severity == "block":
                ctx = self._req_ctx(flow)
                if dlp_result.context:
                    ctx = {**ctx, "context": dlp_result.context}
                self._block(flow, f"egress DLP: {dlp_result.reason}", ctx=ctx)
                return
        if is_git_push_request(request_path, query):
            self._block(
                flow,
                "egress: git push over HTTPS is not supported; "
                "use the bottle.git SSH path (gitleaks-scanned by "
                "git-gate's pre-receive hook).",
                ctx=self._req_ctx(flow),
            )
            return
        # Build headers mapping for match evaluation
        req_headers = {k.lower(): v for k, v in flow.request.headers.items()}
        decision = decide(
            self.config.routes,
            flow.request.pretty_host,
            request_path,
            os.environ,
            request_method=flow.request.method,
            request_headers=req_headers,
        )
        if decision.action == "block":
            self._block(flow, decision.reason, ctx=self._req_ctx(flow))
            return
        if decision.inject_authorization is not None:
            flow.request.headers["authorization"] = decision.inject_authorization
        if self.config.log >= LOG_FULL:
            self._log_request(flow)

    def response(self, flow: http.HTTPFlow) -> None:
        """DLP inbound scan on response bodies (PRD 0053).

        Only responses whose request host matches a configured route are
        examined.  A ``block`` finding replaces the response with a 403; a
        ``warn`` finding is only logged (at ``LOG_BLOCKS`` or higher).
        """
        route = match_route(self.config.routes, flow.request.pretty_host)
        if route is None:
            return
        if flow.response is None:
            return
        if self.config.log >= LOG_FULL:
            self._log_response(flow)
        body = flow.response.get_text(strict=False) or ""
        if not body:
            return
        result = scan_inbound(route, body)
        if result is None:
            return
        resp_ctx: dict[str, object] = {
            **self._req_ctx(flow),
            "response_status": flow.response.status_code,
        }
        if result.context:
            resp_ctx = {**resp_ctx, "context": result.context}
        if result.severity == "block":
            self._block(flow, f"egress DLP: {result.reason}", ctx=resp_ctx)
        elif result.severity == "warn" and self.config.log >= LOG_BLOCKS:
            sys.stderr.write(
                json.dumps({
                    "event": "egress_warn",
                    "reason": f"egress DLP: {result.reason}",
                    **resp_ctx,
                })
                + "\n"
            )
# Module-level addon list picked up by mitmproxy when this script is loaded
# with `mitmdump -s`. Constructing the addon here reads the routes file and
# installs the SIGHUP handler at import time.
addons = [EgressAddon()]