feat(egress): replace log bool with integer log levels (0/1/2)

Level 0 (off, default): no stderr output beyond boot line.
Level 1 (blocks): each block/warn emitted as JSON with reason and
request context (host, method, path, response_status for inbound).
Level 2 (full): level-1 events + egress_request and egress_response
JSON lines for every forwarded connection.

Block logging at level 1+ replaces the previous plain-text stderr write.
DLP warn logging is also gated on level 1+. All block call sites now pass
_req_ctx(flow) so the blocked request is visible in the log entry.
Boot message shows log level label (off/blocks/full).

Adds PRD 0053 documenting wire format, manifest format, and all log event
shapes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-06 14:16:12 -04:00
parent 76dd153760
commit 79212481c9
8 changed files with 287 additions and 53 deletions
+3 -3
View File
@@ -62,7 +62,7 @@ class EgressPlan:
egress_network: str = ""
mitmproxy_ca_host_path: Path = Path()
mitmproxy_ca_cert_only_host_path: Path = Path()
log: bool = False
log: int = 0
def egress_manifest_routes(
@@ -192,11 +192,11 @@ def _route_to_yaml_fields(r: Route) -> dict[str, object]:
def egress_render_routes(
routes: tuple[EgressRoute, ...],
*,
log: bool = False,
log: int = 0,
) -> str:
lines: list[str] = []
if log:
lines.append("log: true")
lines.append(f"log: {log}")
lines.append("routes:")
if not routes:
lines[-1] = "routes: []"
+42 -10
View File
@@ -15,6 +15,8 @@ from pathlib import Path
from mitmproxy import http # type: ignore[import-not-found]
from egress_addon_core import ( # type: ignore[import-not-found]
LOG_BLOCKS,
LOG_FULL,
Config,
Route,
decide,
@@ -51,10 +53,11 @@ class EgressAddon:
self.config = Config(routes=())
return
self.config = new_config
log_label = ("off", "blocks", "full")[self.config.log]
sys.stderr.write(
f"egress: loaded {len(self.config.routes)} route(s): "
f"{', '.join(r.host for r in self.config.routes)}"
f"{' [log=on]' if self.config.log else ''}\n"
f" [log={log_label}]\n"
)
def _install_sighup(self) -> None:
@@ -84,8 +87,24 @@ class EgressAddon:
{"Content-Type": "text/plain; charset=utf-8"},
)
def _block(self, flow: http.HTTPFlow, reason: str) -> None:
sys.stderr.write(f"{reason}\n")
def _req_ctx(self, flow: http.HTTPFlow) -> dict[str, object]:
return {
"host": flow.request.pretty_host,
"method": flow.request.method,
"path": flow.request.path,
}
def _block(
self,
flow: http.HTTPFlow,
reason: str,
ctx: dict[str, object] | None = None,
) -> None:
if self.config.log >= LOG_BLOCKS:
entry: dict[str, object] = {"event": "egress_block", "reason": reason}
if ctx:
entry.update(ctx)
sys.stderr.write(json.dumps(entry) + "\n")
flow.response = http.Response.make(
403,
reason.encode("utf-8"),
@@ -135,7 +154,11 @@ class EgressAddon:
scan_text = auth_header + "\n" + body
dlp_result = scan_outbound(route, scan_text, os.environ)
if dlp_result is not None and dlp_result.severity == "block":
self._block(flow, f"egress DLP: {dlp_result.reason}")
self._block(
flow,
f"egress DLP: {dlp_result.reason}",
ctx=self._req_ctx(flow),
)
return
# Strip inbound Authorization — agent cannot smuggle tokens.
@@ -147,6 +170,7 @@ class EgressAddon:
"egress: git push over HTTPS is not supported; "
"use the bottle.git SSH path (gitleaks-scanned by "
"git-gate's pre-receive hook).",
ctx=self._req_ctx(flow),
)
return
@@ -163,13 +187,13 @@ class EgressAddon:
)
if decision.action == "block":
self._block(flow, decision.reason)
self._block(flow, decision.reason, ctx=self._req_ctx(flow))
return
if decision.inject_authorization is not None:
flow.request.headers["authorization"] = decision.inject_authorization
if self.config.log:
if self.config.log >= LOG_FULL:
self._log_request(flow)
def response(self, flow: http.HTTPFlow) -> None:
@@ -179,7 +203,7 @@ class EgressAddon:
return
if flow.response is None:
return
if self.config.log:
if self.config.log >= LOG_FULL:
self._log_response(flow)
body = flow.response.get_text(strict=False) or ""
if not body:
@@ -187,10 +211,18 @@ class EgressAddon:
result = scan_inbound(route, body)
if result is None:
return
resp_ctx = {**self._req_ctx(flow), "response_status": flow.response.status_code}
if result.severity == "block":
self._block(flow, f"egress DLP: {result.reason}")
elif result.severity == "warn":
sys.stderr.write(f"egress DLP warn: {result.reason}\n")
self._block(flow, f"egress DLP: {result.reason}", ctx=resp_ctx)
elif result.severity == "warn" and self.config.log >= LOG_BLOCKS:
sys.stderr.write(
json.dumps({
"event": "egress_warn",
"reason": f"egress DLP: {result.reason}",
**resp_ctx,
})
+ "\n"
)
addons = [EgressAddon()]
+16 -5
View File
@@ -70,10 +70,15 @@ class Route:
inbound_detectors: tuple[str, ...] | None = None
LOG_OFF = 0 # no logging
LOG_BLOCKS = 1 # log block/warn events with request context
LOG_FULL = 2 # log block/warn events + full request and response bodies
@dataclass(frozen=True)
class Config:
routes: tuple[Route, ...]
log: bool = False
log: int = LOG_OFF
@dataclass(frozen=True)
@@ -341,14 +346,17 @@ def load_routes(text: str) -> tuple[Route, ...]:
def parse_config(payload: object) -> "Config":
"""Parse a full egress config payload (top-level log flag + routes)."""
"""Parse a full egress config payload (top-level log level + routes)."""
if not isinstance(payload, dict):
raise ValueError("routes payload: top-level must be an object")
payload_dict: dict[str, object] = typing.cast(dict[str, object], payload)
log_raw: object = payload_dict.get("log", False)
if not isinstance(log_raw, bool):
raise ValueError("routes payload: 'log' must be true or false")
log_raw: object = payload_dict.get("log", LOG_OFF)
if log_raw is True or log_raw is False or not isinstance(log_raw, int) \
or log_raw not in (LOG_OFF, LOG_BLOCKS, LOG_FULL):
raise ValueError(
f"routes payload: 'log' must be {LOG_OFF}, {LOG_BLOCKS}, or {LOG_FULL}"
)
routes = parse_routes(payload)
return Config(routes=routes, log=log_raw)
@@ -564,6 +572,9 @@ def scan_inbound(
__all__ = [
"LOG_BLOCKS",
"LOG_FULL",
"LOG_OFF",
"Config",
"Decision",
"HeaderMatch",
+8 -4
View File
@@ -346,10 +346,13 @@ def _parse_dlp_block(
return outbound, inbound
LOG_LEVELS = frozenset({0, 1, 2})
@dataclass(frozen=True)
class EgressConfig:
routes: tuple[EgressRoute, ...] = ()
Log: bool = False
Log: int = 0
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "EgressConfig":
@@ -368,10 +371,11 @@ class EgressConfig:
for i, entry in enumerate(routes_list)
)
validate_egress_routes(bottle_name, routes)
log_raw = d.get("log", False)
if not isinstance(log_raw, bool):
log_raw = d.get("log", 0)
if isinstance(log_raw, bool) or not isinstance(log_raw, int) \
or log_raw not in LOG_LEVELS:
raise ManifestError(
f"bottle '{bottle_name}' egress.log must be true or false"
f"bottle '{bottle_name}' egress.log must be 0, 1, or 2"
)
for k in d:
if k not in ("routes", "log"):
+148
View File
@@ -0,0 +1,148 @@
# PRD 0053: Egress traffic logging
- **Status:** Active
- **Author:** claude
- **Created:** 2026-06-06
- **PR:** #207
## Summary
Adds structured log levels to the egress proxy so operators can observe
traffic and security decisions without modifying any application code.
Three integer levels control verbosity: `0` (off), `1` (security events
only), and `2` (full request/response capture). All log events are JSON lines
written to stderr; the boot line remains plain text.
## Problem
The egress proxy makes per-request allow/block decisions and DLP scans, but
until now those decisions were invisible unless a request was actively blocked
and the caller inspected the 403 body. Debugging unexpected blocks, auditing
what an agent is sending upstream, and verifying DLP detector behaviour all
require adding ad-hoc instrumentation or tailing the sidecar container logs
with no structure to grep against.
## Goals / Success Criteria
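1. **Level 0 (off, default):** no egress output to stderr beyond the boot
line. This is the default for production deployments. Note that it is quieter
than the previous behaviour: block reasons and DLP warnings, which used to be
written to stderr as plain text unconditionally, are now emitted only at
level 1 and above.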
2. **Level 1 (blocks):** every block or DLP warn event is emitted to stderr
as a JSON line with the event type, human-readable reason (including the
secret type detected for DLP hits), and the request context (host, method,
path; plus upstream status code for response-phase events). No traffic
bodies are logged.
3. **Level 2 (full):** all level-1 events, plus an `egress_request` JSON line
for every forwarded request (method, path, headers, body after auth
injection) and an `egress_response` JSON line for every upstream response to
a forwarded request (status, headers, body), written before inbound DLP
scanning runs.
4. The log level is a single integer field `log` at the top of the egress
config (routes.yaml in the sidecar; `egress.log` in the bottle manifest).
Values other than 0, 1, 2 are rejected at parse time on both sides.
5. The boot message includes the active log level label (`off`, `blocks`,
`full`).
## Non-goals
- Log rotation or file sinks — stderr output is captured by the container
runtime (Docker, smolmachines) and goes wherever the operator routes it.
- Per-route log levels — all routes share the global level.
- Redacting secrets from the level-2 body dump — at level 2 the operator
has explicitly requested full visibility; redaction belongs in the
log consumer, not the proxy.
## Design
### Wire format
`routes.yaml` gains an optional top-level `log` key:
```yaml
log: 1 # 0 = off (default), 1 = blocks, 2 = full
routes:
- host: "api.anthropic.com"
...
```
The field is omitted entirely when the level is 0 (default).
### Manifest format
```yaml
egress:
log: 1
routes:
- host: "api.anthropic.com"
...
```
`egress.log` accepts integers 0, 1, or 2. Booleans and strings are rejected.
### Log events
**Block / DLP block (level ≥ 1):**
```json
{
"event": "egress_block",
"reason": "egress DLP: GitHub token (classic) found in request",
"host": "api.github.com",
"method": "POST",
"path": "/gists"
}
```
A response-phase block additionally includes `"response_status"` (the upstream HTTP status code).
**DLP warn (level ≥ 1):**
```json
{
"event": "egress_warn",
"reason": "egress DLP: possible prompt injection detected",
"host": "api.anthropic.com",
"method": "POST",
"path": "/v1/messages",
"response_status": 200
}
```
**Forwarded request (level 2):**
```json
{
"event": "egress_request",
"host": "api.anthropic.com",
"method": "POST",
"path": "/v1/messages",
"headers": { "authorization": "Bearer sk-ant-...", "content-type": "application/json" },
"body": "{\"model\": \"claude-opus-4-8\", ...}"
}
```
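The request is logged after auth injection, so the injected outgoing
`Authorization` header is present in the log entry; treat level-2 logs as
containing live credentials. The agent's original `Authorization` header is
stripped earlier in the request hook, so it is not part of the logged request.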
**Response (level 2):**
```json
{
"event": "egress_response",
"host": "api.anthropic.com",
"status": 200,
"headers": { "content-type": "application/json" },
"body": "{\"id\": \"msg_...\", ...}"
}
```
Responses are logged before DLP scanning, so the body is always the raw
upstream response.
### Implementation
- **`egress_addon_core.py`**: `Config.log: int = LOG_OFF` (`LOG_OFF=0`,
`LOG_BLOCKS=1`, `LOG_FULL=2`). `parse_config()` validates the integer and
rejects booleans.
- **`egress_addon.py`**: `_block()` emits JSON when `log >= LOG_BLOCKS`. The
`_req_ctx()` helper builds `{host, method, path}` for every call site.
`_log_request()` / `_log_response()` fire when `log >= LOG_FULL`.
- **`manifest_egress.py`**: `EgressConfig.Log: int = 0`, parsed from
`egress.log`, validated against `{0, 1, 2}`.
- **`egress.py`**: `egress_render_routes(routes, *, log: int = 0)` emits
`log: N` at the top of routes.yaml when N > 0. `EgressPlan.log: int = 0`.
+15 -13
View File
@@ -324,43 +324,45 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual(("token_patterns",), addon_routes[0].outbound_detectors)
self.assertEqual((), addon_routes[0].inbound_detectors)
def test_log_false_omitted_from_render(self):
def test_log_zero_omitted_from_render(self):
b = _bottle([{"host": "x.example"}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes, log=False)
rendered = egress_render_routes(routes, log=0)
self.assertNotIn("log:", rendered)
def test_log_true_emitted_at_top_level(self):
def test_log_level_emitted_at_top_level(self):
b = _bottle([{"host": "x.example"}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes, log=True)
self.assertTrue(rendered.startswith("log: true\n"))
for level in (1, 2):
with self.subTest(level=level):
rendered = egress_render_routes(routes, log=level)
self.assertTrue(rendered.startswith(f"log: {level}\n"))
def test_log_true_round_trips_to_addon_core(self):
from bot_bottle.egress_addon_core import load_config
def test_log_level_round_trips_to_addon_core(self):
from bot_bottle.egress_addon_core import load_config, LOG_FULL
b = _bottle([{"host": "x.example"}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes, log=True)
rendered = egress_render_routes(routes, log=LOG_FULL)
cfg = load_config(rendered)
self.assertTrue(cfg.log)
self.assertEqual(LOG_FULL, cfg.log)
self.assertEqual("x.example", cfg.routes[0].host)
def test_log_via_manifest_flows_to_render(self):
from bot_bottle.manifest import Manifest
from bot_bottle.egress_addon_core import load_config
from bot_bottle.egress_addon_core import load_config, LOG_BLOCKS
m = Manifest.from_json_obj({
"bottles": {"dev": {"egress": {
"log": True,
"log": 1,
"routes": [{"host": "x.example"}],
}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
bottle = m.bottles["dev"]
self.assertTrue(bottle.egress.Log)
self.assertEqual(LOG_BLOCKS, bottle.egress.Log)
routes = egress_routes_for_bottle(bottle)
rendered = egress_render_routes(routes, log=bottle.egress.Log)
cfg = load_config(rendered)
self.assertTrue(cfg.log)
self.assertEqual(LOG_BLOCKS, cfg.log)
class TestResolveTokenValues(unittest.TestCase):
+25 -11
View File
@@ -13,6 +13,9 @@ from pathlib import Path
from urllib.parse import urlsplit
from bot_bottle.egress_addon_core import (
LOG_BLOCKS,
LOG_FULL,
LOG_OFF,
Config,
Decision,
HeaderMatch,
@@ -278,23 +281,34 @@ class TestLoadRoutes(unittest.TestCase):
class TestLoadConfig(unittest.TestCase):
def test_log_defaults_to_false(self):
def test_log_defaults_to_off(self):
cfg = load_config('routes:\n - host: "api.example"\n')
self.assertFalse(cfg.log)
self.assertEqual(LOG_OFF, cfg.log)
self.assertEqual(1, len(cfg.routes))
def test_log_true_parsed(self):
cfg = load_config('log: true\nroutes:\n - host: "api.example"\n')
self.assertTrue(cfg.log)
self.assertEqual(1, len(cfg.routes))
def test_log_level_1_parsed(self):
cfg = load_config('log: 1\nroutes:\n - host: "api.example"\n')
self.assertEqual(LOG_BLOCKS, cfg.log)
def test_log_false_explicit(self):
cfg = load_config('log: false\nroutes:\n - host: "api.example"\n')
self.assertFalse(cfg.log)
def test_log_level_2_parsed(self):
cfg = load_config('log: 2\nroutes:\n - host: "api.example"\n')
self.assertEqual(LOG_FULL, cfg.log)
def test_log_non_bool_rejected(self):
def test_log_level_0_explicit(self):
cfg = load_config('log: 0\nroutes:\n - host: "api.example"\n')
self.assertEqual(LOG_OFF, cfg.log)
def test_log_invalid_level_rejected(self):
with self.assertRaises(ValueError):
load_config('log: "yes"\nroutes: []\n')
load_config('log: 3\nroutes: []\n')
def test_log_bool_rejected(self):
with self.assertRaises(ValueError):
load_config('log: true\nroutes: []\n')
def test_log_string_rejected(self):
with self.assertRaises(ValueError):
load_config('log: "full"\nroutes: []\n')
def test_routes_accessible_via_config(self):
cfg = load_config('routes:\n - host: "x.example"\n')
+30 -7
View File
@@ -346,21 +346,44 @@ class TestConfigShape(unittest.TestCase):
"bottle": "dev"}},
})
def test_log_defaults_false(self):
def test_log_defaults_zero(self):
b = _bottle([])
self.assertFalse(b.egress.Log)
self.assertEqual(0, b.egress.Log)
def test_log_true_accepted(self):
def test_log_level_1_accepted(self):
b = Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"log": True, "routes": []}}},
"bottles": {"dev": {"egress": {"log": 1, "routes": []}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
self.assertTrue(b.egress.Log)
self.assertEqual(1, b.egress.Log)
def test_log_non_bool_rejected(self):
def test_log_level_2_accepted(self):
b = Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"log": 2, "routes": []}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
self.assertEqual(2, b.egress.Log)
def test_log_invalid_level_rejected(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"log": "yes"}}},
"bottles": {"dev": {"egress": {"log": 3}}},
"agents": {"demo": {"skills": [], "prompt": "",
"bottle": "dev"}},
})
def test_log_bool_rejected(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"log": True}}},
"agents": {"demo": {"skills": [], "prompt": "",
"bottle": "dev"}},
})
def test_log_string_rejected(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"log": "full"}}},
"agents": {"demo": {"skills": [], "prompt": "",
"bottle": "dev"}},
})