feat(egress): inject per-session canary token into sidecar and agent environments

EgressPlan gains a `canary: str` field (default "") populated in Egress.prepare()
using secrets.token_urlsafe(32).  Each launched bottle:

  - sidecar receives EGRESS_TOKEN_CANARY=<value> (literal env entry, scanned by
    existing known-secrets detector without any detector code changes)
  - agent receives BOT_BOTTLE_CANARY=<value> (visible fake secret that signals
    exfiltration with zero false positives if it appears in outbound traffic)

Docker compose and macos-container backends updated; smolmachines shares docker
compose and so picks this up automatically.  Unit tests cover canary uniqueness,
detection via scan_known_secrets, and EgressPlan backward-compat default.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-25 01:02:44 +00:00
parent aab450f2f7
commit 99236d4a01
4 changed files with 83 additions and 1 deletions
+8
View File
@@ -137,6 +137,10 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
for token_env in sorted(ep.token_env_map.keys()):
env.append(token_env)
if ep.canary:
# Inject canary as a literal NAME=VALUE (not a bare name) — the
# value is a fake secret so it need not be hidden from the compose file.
env.append(f"EGRESS_TOKEN_CANARY={ep.canary}")
# --- git-gate -----------------------------------------------------
gp = plan.git_gate_plan
@@ -220,6 +224,10 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
# never lands on argv or in the compose file.
for name in sorted(plan.forwarded_env.keys()):
env.append(name)
# Canary token: visible to the agent as a fake secret so that any
# outbound appearance of this value is a zero-FP exfil signal.
if plan.egress_plan.canary:
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
service: dict[str, Any] = {
"image": plan.image,
@@ -353,6 +353,8 @@ def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
env: list[str] = []
if plan.egress_plan.routes:
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
if plan.egress_plan.canary:
env.append(f"EGRESS_TOKEN_CANARY={plan.egress_plan.canary}")
if plan.git_gate_plan.upstreams:
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
if plan.supervise_plan is not None:
@@ -420,6 +422,8 @@ def _agent_env_entries(
env.append(f"{name}={value}")
for name in sorted(plan.forwarded_env.keys()):
env.append(name)
if plan.egress_plan.canary:
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
return tuple(env)
+7
View File
@@ -10,6 +10,7 @@ specific and lives on concrete subclasses (see
from __future__ import annotations
import dataclasses
import secrets
from abc import ABC
from dataclasses import dataclass
from pathlib import Path
@@ -65,6 +66,7 @@ class EgressPlan:
mitmproxy_ca_host_path: Path = Path()
mitmproxy_ca_cert_only_host_path: Path = Path()
log: int = 0
canary: str = ""
def egress_manifest_routes(
@@ -324,12 +326,17 @@ class Egress(ABC):
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
routes_path.write_text(egress_render_routes(routes, log=log))
routes_path.chmod(0o600)
# Generate a per-session canary token. The sidecar receives it as
# EGRESS_TOKEN_CANARY (scanned by the existing known-secrets detector);
# the agent receives it as BOT_BOTTLE_CANARY (a visible fake secret).
canary = secrets.token_urlsafe(32)
return EgressPlan(
slug=slug,
routes_path=routes_path,
routes=routes,
token_env_map=egress_token_env_map(routes),
log=log,
canary=canary,
)
__all__ = [
+64 -1
View File
@@ -1,10 +1,14 @@
"""Unit: Egress route lift + routes.yaml render + token
resolution (PRD 0017, PRD 0053)."""
resolution (PRD 0017, PRD 0053, prd-new)."""
import tempfile
import unittest
from pathlib import Path
from bot_bottle.egress import (
CODEX_HOST_CREDENTIAL_TOKEN_REF,
Egress,
EgressPlan,
EgressRoute,
egress_manifest_routes,
egress_render_routes,
@@ -443,5 +447,64 @@ class TestResolveTokenValues(unittest.TestCase):
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
class TestCanaryGeneration(unittest.TestCase):
"""Egress.prepare() generates a unique canary token per session (prd-new)."""
def _bottle_obj(self):
return ManifestIndex.from_json_obj({
"bottles": {"dev": {"egress": {"routes": []}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
def _make_plan(self) -> EgressPlan:
# Use a concrete no-op subclass so we can call prepare() without
# a real backend.
class _TestEgress(Egress):
pass
e = _TestEgress()
with tempfile.TemporaryDirectory() as td:
return e.prepare(self._bottle_obj(), "test-slug", Path(td))
def test_canary_is_non_empty(self):
plan = self._make_plan()
self.assertIsInstance(plan.canary, str)
self.assertGreater(len(plan.canary), 0)
def test_canary_is_unique_per_session(self):
with tempfile.TemporaryDirectory() as td:
bottle = self._bottle_obj()
class _TestEgress(Egress):
pass
e = _TestEgress()
plan_a = e.prepare(bottle, "slug-a", Path(td))
plan_b = e.prepare(bottle, "slug-b", Path(td))
self.assertNotEqual(plan_a.canary, plan_b.canary)
def test_canary_detected_by_scan_known_secrets(self):
from bot_bottle.dlp_detectors import scan_known_secrets
plan = self._make_plan()
env = {"EGRESS_TOKEN_CANARY": plan.canary}
result = scan_known_secrets(f"exfil={plan.canary}", env=env)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual("block", result.severity)
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
def test_egress_plan_canary_field_default_empty(self):
# Verify EgressPlan can be constructed with an empty canary (backward compat).
from pathlib import Path
plan = EgressPlan(
slug="s",
routes_path=Path("/tmp/r.yaml"),
routes=(),
token_env_map={},
)
self.assertEqual("", plan.canary)
if __name__ == "__main__":
unittest.main()