feat(egress): add global log option for full request/response logging
lint / lint (push) Failing after 1m25s
test / unit (pull_request) Successful in 29s
test / integration (pull_request) Successful in 44s

Adds a top-level `log: true` option to the egress config that logs the
full request (method, path, headers, body) and response (status, headers,
body) for every forwarded connection as JSON lines on stderr.

Wire format: `log: true` at the root of routes.yaml, parsed into the new
`Config` dataclass alongside `routes`. The sidecar addon switches from
`self.routes` to `self.config` and writes `_log_request` / `_log_response`
JSON lines when `self.config.log` is set.

Manifest: `egress.log: true` in bottle YAML flows through `EgressConfig.Log`
→ `Egress.prepare()` → `egress_render_routes(..., log=)` → routes.yaml.
`EgressPlan` also carries the flag for introspection.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-06 13:59:48 -04:00
parent e82bbb587f
commit e72247a1b5
7 changed files with 183 additions and 17 deletions
+11 -3
View File
@@ -62,6 +62,7 @@ class EgressPlan:
egress_network: str = ""
mitmproxy_ca_host_path: Path = Path()
mitmproxy_ca_cert_only_host_path: Path = Path()
log: bool = False
def egress_manifest_routes(
@@ -190,10 +191,15 @@ def _route_to_yaml_fields(r: Route) -> dict[str, object]:
def egress_render_routes(
routes: tuple[EgressRoute, ...],
*,
log: bool = False,
) -> str:
lines: list[str] = ["routes:"]
lines: list[str] = []
if log:
lines.append("log: true")
lines.append("routes:")
if not routes:
lines[0] = "routes: []"
lines[-1] = "routes: []"
return "\n".join(lines) + "\n"
for r in routes:
f = _route_to_yaml_fields(r)
@@ -279,14 +285,16 @@ class Egress(ABC):
provider_routes: tuple[EgressRoute, ...] = (),
) -> EgressPlan:
routes = egress_routes_for_bottle(bottle, provider_routes)
log = bottle.egress.Log
routes_path = stage_dir / "egress_routes.yaml"
routes_path.write_text(egress_render_routes(routes))
routes_path.write_text(egress_render_routes(routes, log=log))
routes_path.chmod(0o600)
return EgressPlan(
slug=slug,
routes_path=routes_path,
routes=routes,
token_env_map=egress_token_env_map(routes),
log=log,
)
__all__ = [
+43 -11
View File
@@ -15,10 +15,11 @@ from pathlib import Path
from mitmproxy import http # type: ignore[import-not-found]
from egress_addon_core import ( # type: ignore[import-not-found]
Config,
Route,
decide,
is_git_push_request,
load_routes,
load_config,
match_route,
scan_inbound,
scan_outbound,
@@ -33,26 +34,27 @@ INTROSPECT_HOST = "_egress.local"
class EgressAddon:
def __init__(self) -> None:
self.routes_path = os.environ.get("EGRESS_ROUTES", DEFAULT_ROUTES_PATH)
self.routes: tuple[Route, ...] = ()
self.config: Config = Config(routes=())
self._reload(initial=True)
self._install_sighup()
def _reload(self, *, initial: bool = False) -> None:
try:
text = Path(self.routes_path).read_text(encoding="utf-8")
new_routes = load_routes(text)
new_config = load_config(text)
except (OSError, ValueError) as e:
tag = "boot" if initial else "SIGHUP"
sys.stderr.write(
f"egress: {tag} load failed: {e}\n"
)
if initial:
self.routes = ()
self.config = Config(routes=())
return
self.routes = new_routes
self.config = new_config
sys.stderr.write(
f"egress: loaded {len(self.routes)} route(s): "
f"{', '.join(r.host for r in self.routes)}\n"
f"egress: loaded {len(self.config.routes)} route(s): "
f"{', '.join(r.host for r in self.config.routes)}"
f"{' [log=on]' if self.config.log else ''}\n"
)
def _install_sighup(self) -> None:
@@ -68,7 +70,7 @@ class EgressAddon:
def _serve_introspection(self, flow: http.HTTPFlow, path: str) -> None:
if path == "/allowlist":
payload = json.dumps(
{"routes": [dataclasses.asdict(r) for r in self.routes]},
{"routes": [dataclasses.asdict(r) for r in self.config.routes]},
indent=2,
).encode("utf-8")
flow.response = http.Response.make(
@@ -90,6 +92,31 @@ class EgressAddon:
{"Content-Type": "text/plain; charset=utf-8"},
)
def _log_request(self, flow: http.HTTPFlow) -> None:
sys.stderr.write(
json.dumps({
"event": "egress_request",
"host": flow.request.pretty_host,
"method": flow.request.method,
"path": flow.request.path,
"headers": dict(flow.request.headers),
"body": flow.request.get_text(strict=False) or "",
})
+ "\n"
)
def _log_response(self, flow: http.HTTPFlow) -> None:
sys.stderr.write(
json.dumps({
"event": "egress_response",
"host": flow.request.pretty_host,
"status": flow.response.status_code,
"headers": dict(flow.response.headers),
"body": flow.response.get_text(strict=False) or "",
})
+ "\n"
)
def request(self, flow: http.HTTPFlow) -> None:
request_path, _, query = flow.request.path.partition("?")
@@ -99,7 +126,7 @@ class EgressAddon:
# DLP outbound scan BEFORE stripping auth — catches tokens the
# agent tried to smuggle in the Authorization header.
route = match_route(self.routes, flow.request.pretty_host)
route = match_route(self.config.routes, flow.request.pretty_host)
if route is not None:
body = flow.request.get_text(strict=False) or ""
auth_header = flow.request.headers.get("authorization", "")
@@ -127,7 +154,7 @@ class EgressAddon:
req_headers = {k.lower(): v for k, v in flow.request.headers.items()}
decision = decide(
self.routes,
self.config.routes,
flow.request.pretty_host,
request_path,
os.environ,
@@ -142,13 +169,18 @@ class EgressAddon:
if decision.inject_authorization is not None:
flow.request.headers["authorization"] = decision.inject_authorization
if self.config.log:
self._log_request(flow)
def response(self, flow: http.HTTPFlow) -> None:
"""DLP inbound scan on response bodies (PRD 0053)."""
route = match_route(self.routes, flow.request.pretty_host)
route = match_route(self.config.routes, flow.request.pretty_host)
if route is None:
return
if flow.response is None:
return
if self.config.log:
self._log_response(flow)
body = flow.response.get_text(strict=False) or ""
if not body:
return
+32
View File
@@ -70,6 +70,12 @@ class Route:
inbound_detectors: tuple[str, ...] | None = None
@dataclass(frozen=True)
class Config:
routes: tuple[Route, ...]
log: bool = False
@dataclass(frozen=True)
class Decision:
action: str # "forward" or "block"
@@ -334,6 +340,29 @@ def load_routes(text: str) -> tuple[Route, ...]:
return parse_routes(payload)
def parse_config(payload: object) -> "Config":
"""Parse a full egress config payload (top-level log flag + routes)."""
if not isinstance(payload, dict):
raise ValueError("routes payload: top-level must be an object")
payload_dict: dict[str, object] = typing.cast(dict[str, object], payload)
log_raw: object = payload_dict.get("log", False)
if not isinstance(log_raw, bool):
raise ValueError("routes payload: 'log' must be true or false")
routes = parse_routes(payload)
return Config(routes=routes, log=log_raw)
def load_config(text: str) -> "Config":
"""Parse YAML text → Config (routes + log flag)."""
try:
payload = parse_yaml_subset(text)
except YamlSubsetError as e:
raise ValueError(f"routes payload: invalid YAML: {e}") from e
return parse_config(payload)
# ---------------------------------------------------------------------------
# Match evaluation
# ---------------------------------------------------------------------------
@@ -535,6 +564,7 @@ def scan_inbound(
__all__ = [
"Config",
"Decision",
"HeaderMatch",
"MatchEntry",
@@ -544,8 +574,10 @@ __all__ = [
"decide",
"evaluate_matches",
"is_git_push_request",
"load_config",
"load_routes",
"match_route",
"parse_config",
"parse_routes",
"scan_inbound",
"scan_outbound",
+9 -3
View File
@@ -349,6 +349,7 @@ def _parse_dlp_block(
@dataclass(frozen=True)
class EgressConfig:
routes: tuple[EgressRoute, ...] = ()
Log: bool = False
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "EgressConfig":
@@ -367,10 +368,15 @@ class EgressConfig:
for i, entry in enumerate(routes_list)
)
validate_egress_routes(bottle_name, routes)
log_raw = d.get("log", False)
if not isinstance(log_raw, bool):
raise ManifestError(
f"bottle '{bottle_name}' egress.log must be true or false"
)
for k in d:
if k != "routes":
if k not in ("routes", "log"):
raise ManifestError(
f"bottle '{bottle_name}' egress has unknown key {k!r}; "
f"only 'routes' is accepted"
f"accepted keys are 'routes', 'log'"
)
return cls(routes=routes)
return cls(routes=routes, Log=log_raw)
+38
View File
@@ -324,6 +324,44 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual(("token_patterns",), addon_routes[0].outbound_detectors)
self.assertEqual((), addon_routes[0].inbound_detectors)
def test_log_false_omitted_from_render(self):
b = _bottle([{"host": "x.example"}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes, log=False)
self.assertNotIn("log:", rendered)
def test_log_true_emitted_at_top_level(self):
b = _bottle([{"host": "x.example"}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes, log=True)
self.assertTrue(rendered.startswith("log: true\n"))
def test_log_true_round_trips_to_addon_core(self):
from bot_bottle.egress_addon_core import load_config
b = _bottle([{"host": "x.example"}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes, log=True)
cfg = load_config(rendered)
self.assertTrue(cfg.log)
self.assertEqual("x.example", cfg.routes[0].host)
def test_log_via_manifest_flows_to_render(self):
from bot_bottle.manifest import Manifest
from bot_bottle.egress_addon_core import load_config
m = Manifest.from_json_obj({
"bottles": {"dev": {"egress": {
"log": True,
"routes": [{"host": "x.example"}],
}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
bottle = m.bottles["dev"]
self.assertTrue(bottle.egress.Log)
routes = egress_routes_for_bottle(bottle)
rendered = egress_render_routes(routes, log=bottle.egress.Log)
cfg = load_config(rendered)
self.assertTrue(cfg.log)
class TestResolveTokenValues(unittest.TestCase):
def test_reads_host_env(self):
+31
View File
@@ -13,6 +13,7 @@ from pathlib import Path
from urllib.parse import urlsplit
from bot_bottle.egress_addon_core import (
Config,
Decision,
HeaderMatch,
MatchEntry,
@@ -21,8 +22,10 @@ from bot_bottle.egress_addon_core import (
decide,
evaluate_matches,
is_git_push_request,
load_config,
load_routes,
match_route,
parse_config,
parse_routes,
)
@@ -271,6 +274,34 @@ class TestLoadRoutes(unittest.TestCase):
load_routes("routes:\n\t- host: x\n")
# --- load_config / parse_config ------------------------------------------
class TestLoadConfig(unittest.TestCase):
def test_log_defaults_to_false(self):
cfg = load_config('routes:\n - host: "api.example"\n')
self.assertFalse(cfg.log)
self.assertEqual(1, len(cfg.routes))
def test_log_true_parsed(self):
cfg = load_config('log: true\nroutes:\n - host: "api.example"\n')
self.assertTrue(cfg.log)
self.assertEqual(1, len(cfg.routes))
def test_log_false_explicit(self):
cfg = load_config('log: false\nroutes:\n - host: "api.example"\n')
self.assertFalse(cfg.log)
def test_log_non_bool_rejected(self):
with self.assertRaises(ValueError):
load_config('log: "yes"\nroutes: []\n')
def test_routes_accessible_via_config(self):
cfg = load_config('routes:\n - host: "x.example"\n')
self.assertIsInstance(cfg, Config)
self.assertEqual("x.example", cfg.routes[0].host)
# --- evaluate_matches ---------------------------------------------------
+19
View File
@@ -346,6 +346,25 @@ class TestConfigShape(unittest.TestCase):
"bottle": "dev"}},
})
def test_log_defaults_false(self):
b = _bottle([])
self.assertFalse(b.egress.Log)
def test_log_true_accepted(self):
b = Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"log": True, "routes": []}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
self.assertTrue(b.egress.Log)
def test_log_non_bool_rejected(self):
with self.assertRaises(ManifestError):
Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"log": "yes"}}},
"agents": {"demo": {"skills": [], "prompt": "",
"bottle": "dev"}},
})
if __name__ == "__main__":
unittest.main()