feat(egress): inject per-session canary token into sidecar and agent environments
EgressPlan gains a `canary: str` field (default "") populated in Egress.prepare()
using secrets.token_urlsafe(32). Each launched bottle:
- sidecar receives EGRESS_TOKEN_CANARY=<value> (literal env entry, scanned by
existing known-secrets detector without any detector code changes)
- agent receives BOT_BOTTLE_CANARY=<value> (visible fake secret that signals
exfiltration with zero false positives if it appears in outbound traffic)
Docker compose and macos-container backends updated; smolmachines shares docker
compose and so picks this up automatically. Unit tests cover canary uniqueness,
detection via scan_known_secrets, and EgressPlan backward-compat default.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -137,6 +137,10 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
|
||||
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
|
||||
for token_env in sorted(ep.token_env_map.keys()):
|
||||
env.append(token_env)
|
||||
if ep.canary:
|
||||
# Inject canary as a literal NAME=VALUE (not a bare name) — the
|
||||
# value is a fake secret so it need not be hidden from the compose file.
|
||||
env.append(f"EGRESS_TOKEN_CANARY={ep.canary}")
|
||||
|
||||
# --- git-gate -----------------------------------------------------
|
||||
gp = plan.git_gate_plan
|
||||
@@ -220,6 +224,10 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
|
||||
# never lands on argv or in the compose file.
|
||||
for name in sorted(plan.forwarded_env.keys()):
|
||||
env.append(name)
|
||||
# Canary token: visible to the agent as a fake secret so that any
|
||||
# outbound appearance of this value is a zero-FP exfil signal.
|
||||
if plan.egress_plan.canary:
|
||||
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
|
||||
|
||||
service: dict[str, Any] = {
|
||||
"image": plan.image,
|
||||
|
||||
@@ -353,6 +353,8 @@ def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
|
||||
env: list[str] = []
|
||||
if plan.egress_plan.routes:
|
||||
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
|
||||
if plan.egress_plan.canary:
|
||||
env.append(f"EGRESS_TOKEN_CANARY={plan.egress_plan.canary}")
|
||||
if plan.git_gate_plan.upstreams:
|
||||
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
|
||||
if plan.supervise_plan is not None:
|
||||
@@ -420,6 +422,8 @@ def _agent_env_entries(
|
||||
env.append(f"{name}={value}")
|
||||
for name in sorted(plan.forwarded_env.keys()):
|
||||
env.append(name)
|
||||
if plan.egress_plan.canary:
|
||||
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
|
||||
return tuple(env)
|
||||
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ specific and lives on concrete subclasses (see
|
||||
from __future__ import annotations
|
||||
|
||||
import dataclasses
|
||||
import secrets
|
||||
from abc import ABC
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
@@ -65,6 +66,7 @@ class EgressPlan:
|
||||
mitmproxy_ca_host_path: Path = Path()
|
||||
mitmproxy_ca_cert_only_host_path: Path = Path()
|
||||
log: int = 0
|
||||
canary: str = ""
|
||||
|
||||
|
||||
def egress_manifest_routes(
|
||||
@@ -324,12 +326,17 @@ class Egress(ABC):
|
||||
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
|
||||
routes_path.write_text(egress_render_routes(routes, log=log))
|
||||
routes_path.chmod(0o600)
|
||||
# Generate a per-session canary token. The sidecar receives it as
|
||||
# EGRESS_TOKEN_CANARY (scanned by the existing known-secrets detector);
|
||||
# the agent receives it as BOT_BOTTLE_CANARY (a visible fake secret).
|
||||
canary = secrets.token_urlsafe(32)
|
||||
return EgressPlan(
|
||||
slug=slug,
|
||||
routes_path=routes_path,
|
||||
routes=routes,
|
||||
token_env_map=egress_token_env_map(routes),
|
||||
log=log,
|
||||
canary=canary,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
"""Unit: Egress route lift + routes.yaml render + token
|
||||
resolution (PRD 0017, PRD 0053)."""
|
||||
resolution (PRD 0017, PRD 0053, prd-new)."""
|
||||
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
from bot_bottle.egress import (
|
||||
CODEX_HOST_CREDENTIAL_TOKEN_REF,
|
||||
Egress,
|
||||
EgressPlan,
|
||||
EgressRoute,
|
||||
egress_manifest_routes,
|
||||
egress_render_routes,
|
||||
@@ -443,5 +447,64 @@ class TestResolveTokenValues(unittest.TestCase):
|
||||
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
|
||||
|
||||
|
||||
class TestCanaryGeneration(unittest.TestCase):
|
||||
"""Egress.prepare() generates a unique canary token per session (prd-new)."""
|
||||
|
||||
def _bottle_obj(self):
|
||||
return ManifestIndex.from_json_obj({
|
||||
"bottles": {"dev": {"egress": {"routes": []}}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
}).bottles["dev"]
|
||||
|
||||
def _make_plan(self) -> EgressPlan:
|
||||
# Use a concrete no-op subclass so we can call prepare() without
|
||||
# a real backend.
|
||||
class _TestEgress(Egress):
|
||||
pass
|
||||
|
||||
e = _TestEgress()
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
return e.prepare(self._bottle_obj(), "test-slug", Path(td))
|
||||
|
||||
def test_canary_is_non_empty(self):
|
||||
plan = self._make_plan()
|
||||
self.assertIsInstance(plan.canary, str)
|
||||
self.assertGreater(len(plan.canary), 0)
|
||||
|
||||
def test_canary_is_unique_per_session(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
bottle = self._bottle_obj()
|
||||
|
||||
class _TestEgress(Egress):
|
||||
pass
|
||||
|
||||
e = _TestEgress()
|
||||
plan_a = e.prepare(bottle, "slug-a", Path(td))
|
||||
plan_b = e.prepare(bottle, "slug-b", Path(td))
|
||||
self.assertNotEqual(plan_a.canary, plan_b.canary)
|
||||
|
||||
def test_canary_detected_by_scan_known_secrets(self):
|
||||
from bot_bottle.dlp_detectors import scan_known_secrets
|
||||
|
||||
plan = self._make_plan()
|
||||
env = {"EGRESS_TOKEN_CANARY": plan.canary}
|
||||
result = scan_known_secrets(f"exfil={plan.canary}", env=env)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
|
||||
|
||||
def test_egress_plan_canary_field_default_empty(self):
|
||||
# Verify EgressPlan can be constructed with an empty canary (backward compat).
|
||||
from pathlib import Path
|
||||
plan = EgressPlan(
|
||||
slug="s",
|
||||
routes_path=Path("/tmp/r.yaml"),
|
||||
routes=(),
|
||||
token_env_map={},
|
||||
)
|
||||
self.assertEqual("", plan.canary)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user