From 11cf12188dd8f66dc304c75eecab868e1e4b12e2 Mon Sep 17 00:00:00 2001 From: claude Date: Thu, 25 Jun 2026 01:02:44 +0000 Subject: [PATCH] feat(egress): inject per-session canary token into sidecar and agent environments EgressPlan gains a `canary: str` field (default "") populated in Egress.prepare() using secrets.token_urlsafe(32). Each launched bottle: - sidecar receives EGRESS_TOKEN_CANARY= (literal env entry, scanned by existing known-secrets detector without any detector code changes) - agent receives BOT_BOTTLE_CANARY= (visible fake secret that signals exfiltration with zero false positives if it appears in outbound traffic) Docker compose and macos-container backends updated; smolmachines shares docker compose and so picks this up automatically. Unit tests cover canary uniqueness, detection via scan_known_secrets, and EgressPlan backward-compat default. Co-Authored-By: Claude Sonnet 4.6 --- bot_bottle/backend/docker/compose.py | 8 +++ bot_bottle/backend/macos_container/launch.py | 4 ++ bot_bottle/egress.py | 7 +++ tests/unit/test_egress.py | 65 +++++++++++++++++++- 4 files changed, 83 insertions(+), 1 deletion(-) diff --git a/bot_bottle/backend/docker/compose.py b/bot_bottle/backend/docker/compose.py index 9ad0011..121fc02 100644 --- a/bot_bottle/backend/docker/compose.py +++ b/bot_bottle/backend/docker/compose.py @@ -137,6 +137,10 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]: volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent))) for token_env in sorted(ep.token_env_map.keys()): env.append(token_env) + if ep.canary: + # Inject canary as a literal NAME=VALUE (not a bare name) — the + # value is a fake secret so it need not be hidden from the compose file. + env.append(f"EGRESS_TOKEN_CANARY={ep.canary}") # --- git-gate ----------------------------------------------------- gp = plan.git_gate_plan @@ -220,6 +224,10 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]: # never lands on argv or in the compose file. for name in sorted(plan.forwarded_env.keys()): env.append(name) + # Canary token: visible to the agent as a fake secret so that any + # outbound appearance of this value is a zero-FP exfil signal. + if plan.egress_plan.canary: + env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}") service: dict[str, Any] = { "image": plan.image, diff --git a/bot_bottle/backend/macos_container/launch.py b/bot_bottle/backend/macos_container/launch.py index 8de4385..9c0f145 100644 --- a/bot_bottle/backend/macos_container/launch.py +++ b/bot_bottle/backend/macos_container/launch.py @@ -353,6 +353,8 @@ def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]: env: list[str] = [] if plan.egress_plan.routes: env.extend(sorted(plan.egress_plan.token_env_map.keys())) + if plan.egress_plan.canary: + env.append(f"EGRESS_TOKEN_CANARY={plan.egress_plan.canary}") if plan.git_gate_plan.upstreams: env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}") if plan.supervise_plan is not None: @@ -420,6 +422,8 @@ def _agent_env_entries( env.append(f"{name}={value}") for name in sorted(plan.forwarded_env.keys()): env.append(name) + if plan.egress_plan.canary: + env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}") return tuple(env) diff --git a/bot_bottle/egress.py b/bot_bottle/egress.py index 8943049..ea10183 100644 --- a/bot_bottle/egress.py +++ b/bot_bottle/egress.py @@ -10,6 +10,7 @@ specific and lives on concrete subclasses (see from __future__ import annotations import dataclasses +import secrets from abc import ABC from dataclasses import dataclass from pathlib import Path @@ -65,6 +66,7 @@ class EgressPlan: mitmproxy_ca_host_path: Path = Path() mitmproxy_ca_cert_only_host_path: Path = Path() log: int = 0 + canary: str = "" def egress_manifest_routes( @@ -324,12 +326,17 @@ class Egress(ABC): routes_path = stage_dir / EGRESS_ROUTES_FILENAME routes_path.write_text(egress_render_routes(routes, log=log)) routes_path.chmod(0o600) + # Generate a per-session canary token. The sidecar receives it as + # EGRESS_TOKEN_CANARY (scanned by the existing known-secrets detector); + # the agent receives it as BOT_BOTTLE_CANARY (a visible fake secret). + canary = secrets.token_urlsafe(32) return EgressPlan( slug=slug, routes_path=routes_path, routes=routes, token_env_map=egress_token_env_map(routes), log=log, + canary=canary, ) __all__ = [ diff --git a/tests/unit/test_egress.py b/tests/unit/test_egress.py index 4fdae02..73d4c9a 100644 --- a/tests/unit/test_egress.py +++ b/tests/unit/test_egress.py @@ -1,10 +1,14 @@ """Unit: Egress route lift + routes.yaml render + token -resolution (PRD 0017, PRD 0053).""" +resolution (PRD 0017, PRD 0053, prd-new).""" +import tempfile import unittest +from pathlib import Path from bot_bottle.egress import ( CODEX_HOST_CREDENTIAL_TOKEN_REF, + Egress, + EgressPlan, EgressRoute, egress_manifest_routes, egress_render_routes, @@ -443,5 +447,64 @@ class TestResolveTokenValues(unittest.TestCase): self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out) +class TestCanaryGeneration(unittest.TestCase): + """Egress.prepare() generates a unique canary token per session (prd-new).""" + + def _bottle_obj(self): + return ManifestIndex.from_json_obj({ + "bottles": {"dev": {"egress": {"routes": []}}}, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + }).bottles["dev"] + + def _make_plan(self) -> EgressPlan: + # Use a concrete no-op subclass so we can call prepare() without + # a real backend. + class _TestEgress(Egress): + pass + + e = _TestEgress() + with tempfile.TemporaryDirectory() as td: + return e.prepare(self._bottle_obj(), "test-slug", Path(td)) + + def test_canary_is_non_empty(self): + plan = self._make_plan() + self.assertIsInstance(plan.canary, str) + self.assertGreater(len(plan.canary), 0) + + def test_canary_is_unique_per_session(self): + with tempfile.TemporaryDirectory() as td: + bottle = self._bottle_obj() + + class _TestEgress(Egress): + pass + + e = _TestEgress() + plan_a = e.prepare(bottle, "slug-a", Path(td)) + plan_b = e.prepare(bottle, "slug-b", Path(td)) + self.assertNotEqual(plan_a.canary, plan_b.canary) + + def test_canary_detected_by_scan_known_secrets(self): + from bot_bottle.dlp_detectors import scan_known_secrets + + plan = self._make_plan() + env = {"EGRESS_TOKEN_CANARY": plan.canary} + result = scan_known_secrets(f"exfil={plan.canary}", env=env) + self.assertIsNotNone(result) + assert result is not None + self.assertEqual("block", result.severity) + self.assertIn("EGRESS_TOKEN_CANARY", result.reason) + + def test_egress_plan_canary_field_default_empty(self): + # Verify EgressPlan can be constructed with an empty canary (backward compat). + from pathlib import Path + plan = EgressPlan( + slug="s", + routes_path=Path("/tmp/r.yaml"), + routes=(), + token_env_map={}, + ) + self.assertEqual("", plan.canary) + + if __name__ == "__main__": unittest.main()