feat(egress): replace log bool with integer log levels (0/1/2)
lint / lint (push) Failing after 1m25s
test / unit (pull_request) Successful in 31s
test / integration (pull_request) Successful in 42s

Level 0 (off, default): no stderr output beyond boot line.
Level 1 (blocks): each block/warn emitted as JSON with reason and
request context (host, method, path, response_status for inbound).
Level 2 (full): level-1 events + egress_request and egress_response
JSON lines for every forwarded connection.

Block logging at level 1+ replaces the previous plain-text stderr write.
DLP warn logging is also gated on level 1+. All block call sites now pass
_req_ctx(flow) so the blocked request is visible in the log entry.
Boot message shows log level label (off/blocks/full).

Adds PRD 0053 documenting wire format, manifest format, and all log event
shapes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-06 14:16:12 -04:00
parent e72247a1b5
commit a165752edb
8 changed files with 287 additions and 53 deletions
+42 -10
View File
@@ -15,6 +15,8 @@ from pathlib import Path
from mitmproxy import http # type: ignore[import-not-found]
from egress_addon_core import ( # type: ignore[import-not-found]
LOG_BLOCKS,
LOG_FULL,
Config,
Route,
decide,
@@ -51,10 +53,11 @@ class EgressAddon:
self.config = Config(routes=())
return
self.config = new_config
log_label = ("off", "blocks", "full")[self.config.log]
sys.stderr.write(
f"egress: loaded {len(self.config.routes)} route(s): "
f"{', '.join(r.host for r in self.config.routes)}"
f"{' [log=on]' if self.config.log else ''}\n"
f" [log={log_label}]\n"
)
def _install_sighup(self) -> None:
@@ -84,8 +87,24 @@ class EgressAddon:
{"Content-Type": "text/plain; charset=utf-8"},
)
def _block(self, flow: http.HTTPFlow, reason: str) -> None:
sys.stderr.write(f"{reason}\n")
def _req_ctx(self, flow: http.HTTPFlow) -> dict[str, object]:
return {
"host": flow.request.pretty_host,
"method": flow.request.method,
"path": flow.request.path,
}
def _block(
self,
flow: http.HTTPFlow,
reason: str,
ctx: dict[str, object] | None = None,
) -> None:
if self.config.log >= LOG_BLOCKS:
entry: dict[str, object] = {"event": "egress_block", "reason": reason}
if ctx:
entry.update(ctx)
sys.stderr.write(json.dumps(entry) + "\n")
flow.response = http.Response.make(
403,
reason.encode("utf-8"),
@@ -135,7 +154,11 @@ class EgressAddon:
scan_text = auth_header + "\n" + body
dlp_result = scan_outbound(route, scan_text, os.environ)
if dlp_result is not None and dlp_result.severity == "block":
self._block(flow, f"egress DLP: {dlp_result.reason}")
self._block(
flow,
f"egress DLP: {dlp_result.reason}",
ctx=self._req_ctx(flow),
)
return
# Strip inbound Authorization — agent cannot smuggle tokens.
@@ -147,6 +170,7 @@ class EgressAddon:
"egress: git push over HTTPS is not supported; "
"use the bottle.git SSH path (gitleaks-scanned by "
"git-gate's pre-receive hook).",
ctx=self._req_ctx(flow),
)
return
@@ -163,13 +187,13 @@ class EgressAddon:
)
if decision.action == "block":
self._block(flow, decision.reason)
self._block(flow, decision.reason, ctx=self._req_ctx(flow))
return
if decision.inject_authorization is not None:
flow.request.headers["authorization"] = decision.inject_authorization
if self.config.log:
if self.config.log >= LOG_FULL:
self._log_request(flow)
def response(self, flow: http.HTTPFlow) -> None:
@@ -179,7 +203,7 @@ class EgressAddon:
return
if flow.response is None:
return
if self.config.log:
if self.config.log >= LOG_FULL:
self._log_response(flow)
body = flow.response.get_text(strict=False) or ""
if not body:
@@ -187,10 +211,18 @@ class EgressAddon:
result = scan_inbound(route, body)
if result is None:
return
resp_ctx = {**self._req_ctx(flow), "response_status": flow.response.status_code}
if result.severity == "block":
self._block(flow, f"egress DLP: {result.reason}")
elif result.severity == "warn":
sys.stderr.write(f"egress DLP warn: {result.reason}\n")
self._block(flow, f"egress DLP: {result.reason}", ctx=resp_ctx)
elif result.severity == "warn" and self.config.log >= LOG_BLOCKS:
sys.stderr.write(
json.dumps({
"event": "egress_warn",
"reason": f"egress DLP: {result.reason}",
**resp_ctx,
})
+ "\n"
)
addons = [EgressAddon()]