feat(egress): inject per-session canary token into sidecar and agent environments
EgressPlan gains a `canary: str` field (default "") populated in Egress.prepare()
using secrets.token_urlsafe(32). Each launched bottle:
- sidecar receives EGRESS_TOKEN_CANARY=<value> (literal env entry, scanned by
existing known-secrets detector without any detector code changes)
- agent receives BOT_BOTTLE_CANARY=<value> (visible fake secret that signals
exfiltration with zero false positives if it appears in outbound traffic)
Docker compose and macos-container backends updated; smolmachines shares docker
compose and so picks this up automatically. Unit tests cover canary uniqueness,
detection via scan_known_secrets, and EgressPlan backward-compat default.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -137,6 +137,10 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]:
|
|||||||
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
|
volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent)))
|
||||||
for token_env in sorted(ep.token_env_map.keys()):
|
for token_env in sorted(ep.token_env_map.keys()):
|
||||||
env.append(token_env)
|
env.append(token_env)
|
||||||
|
if ep.canary:
|
||||||
|
# Inject canary as a literal NAME=VALUE (not a bare name) — the
|
||||||
|
# value is a fake secret so it need not be hidden from the compose file.
|
||||||
|
env.append(f"EGRESS_TOKEN_CANARY={ep.canary}")
|
||||||
|
|
||||||
# --- git-gate -----------------------------------------------------
|
# --- git-gate -----------------------------------------------------
|
||||||
gp = plan.git_gate_plan
|
gp = plan.git_gate_plan
|
||||||
@@ -220,6 +224,10 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]:
|
|||||||
# never lands on argv or in the compose file.
|
# never lands on argv or in the compose file.
|
||||||
for name in sorted(plan.forwarded_env.keys()):
|
for name in sorted(plan.forwarded_env.keys()):
|
||||||
env.append(name)
|
env.append(name)
|
||||||
|
# Canary token: visible to the agent as a fake secret so that any
|
||||||
|
# outbound appearance of this value is a zero-FP exfil signal.
|
||||||
|
if plan.egress_plan.canary:
|
||||||
|
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
|
||||||
|
|
||||||
service: dict[str, Any] = {
|
service: dict[str, Any] = {
|
||||||
"image": plan.image,
|
"image": plan.image,
|
||||||
|
|||||||
@@ -353,6 +353,8 @@ def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]:
|
|||||||
env: list[str] = []
|
env: list[str] = []
|
||||||
if plan.egress_plan.routes:
|
if plan.egress_plan.routes:
|
||||||
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
|
env.extend(sorted(plan.egress_plan.token_env_map.keys()))
|
||||||
|
if plan.egress_plan.canary:
|
||||||
|
env.append(f"EGRESS_TOKEN_CANARY={plan.egress_plan.canary}")
|
||||||
if plan.git_gate_plan.upstreams:
|
if plan.git_gate_plan.upstreams:
|
||||||
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
|
env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}")
|
||||||
if plan.supervise_plan is not None:
|
if plan.supervise_plan is not None:
|
||||||
@@ -420,6 +422,8 @@ def _agent_env_entries(
|
|||||||
env.append(f"{name}={value}")
|
env.append(f"{name}={value}")
|
||||||
for name in sorted(plan.forwarded_env.keys()):
|
for name in sorted(plan.forwarded_env.keys()):
|
||||||
env.append(name)
|
env.append(name)
|
||||||
|
if plan.egress_plan.canary:
|
||||||
|
env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}")
|
||||||
return tuple(env)
|
return tuple(env)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ specific and lives on concrete subclasses (see
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import dataclasses
|
import dataclasses
|
||||||
|
import secrets
|
||||||
from abc import ABC
|
from abc import ABC
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -64,6 +65,7 @@ class EgressPlan:
|
|||||||
mitmproxy_ca_host_path: Path = Path()
|
mitmproxy_ca_host_path: Path = Path()
|
||||||
mitmproxy_ca_cert_only_host_path: Path = Path()
|
mitmproxy_ca_cert_only_host_path: Path = Path()
|
||||||
log: int = 0
|
log: int = 0
|
||||||
|
canary: str = ""
|
||||||
|
|
||||||
|
|
||||||
def egress_manifest_routes(
|
def egress_manifest_routes(
|
||||||
@@ -299,12 +301,17 @@ class Egress(ABC):
|
|||||||
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
|
routes_path = stage_dir / EGRESS_ROUTES_FILENAME
|
||||||
routes_path.write_text(egress_render_routes(routes, log=log))
|
routes_path.write_text(egress_render_routes(routes, log=log))
|
||||||
routes_path.chmod(0o600)
|
routes_path.chmod(0o600)
|
||||||
|
# Generate a per-session canary token. The sidecar receives it as
|
||||||
|
# EGRESS_TOKEN_CANARY (scanned by the existing known-secrets detector);
|
||||||
|
# the agent receives it as BOT_BOTTLE_CANARY (a visible fake secret).
|
||||||
|
canary = secrets.token_urlsafe(32)
|
||||||
return EgressPlan(
|
return EgressPlan(
|
||||||
slug=slug,
|
slug=slug,
|
||||||
routes_path=routes_path,
|
routes_path=routes_path,
|
||||||
routes=routes,
|
routes=routes,
|
||||||
token_env_map=egress_token_env_map(routes),
|
token_env_map=egress_token_env_map(routes),
|
||||||
log=log,
|
log=log,
|
||||||
|
canary=canary,
|
||||||
)
|
)
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
|||||||
@@ -1,10 +1,14 @@
|
|||||||
"""Unit: Egress route lift + routes.yaml render + token
|
"""Unit: Egress route lift + routes.yaml render + token
|
||||||
resolution (PRD 0017, PRD 0053)."""
|
resolution (PRD 0017, PRD 0053, prd-new)."""
|
||||||
|
|
||||||
|
import tempfile
|
||||||
import unittest
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from bot_bottle.egress import (
|
from bot_bottle.egress import (
|
||||||
CODEX_HOST_CREDENTIAL_TOKEN_REF,
|
CODEX_HOST_CREDENTIAL_TOKEN_REF,
|
||||||
|
Egress,
|
||||||
|
EgressPlan,
|
||||||
EgressRoute,
|
EgressRoute,
|
||||||
egress_manifest_routes,
|
egress_manifest_routes,
|
||||||
egress_render_routes,
|
egress_render_routes,
|
||||||
@@ -409,5 +413,64 @@ class TestResolveTokenValues(unittest.TestCase):
|
|||||||
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
|
self.assertEqual({"EGRESS_TOKEN_0": "codex-access-token"}, out)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCanaryGeneration(unittest.TestCase):
|
||||||
|
"""Egress.prepare() generates a unique canary token per session (prd-new)."""
|
||||||
|
|
||||||
|
def _bottle_obj(self):
|
||||||
|
return ManifestIndex.from_json_obj({
|
||||||
|
"bottles": {"dev": {"egress": {"routes": []}}},
|
||||||
|
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||||
|
}).bottles["dev"]
|
||||||
|
|
||||||
|
def _make_plan(self) -> EgressPlan:
|
||||||
|
# Use a concrete no-op subclass so we can call prepare() without
|
||||||
|
# a real backend.
|
||||||
|
class _TestEgress(Egress):
|
||||||
|
pass
|
||||||
|
|
||||||
|
e = _TestEgress()
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
return e.prepare(self._bottle_obj(), "test-slug", Path(td))
|
||||||
|
|
||||||
|
def test_canary_is_non_empty(self):
|
||||||
|
plan = self._make_plan()
|
||||||
|
self.assertIsInstance(plan.canary, str)
|
||||||
|
self.assertGreater(len(plan.canary), 0)
|
||||||
|
|
||||||
|
def test_canary_is_unique_per_session(self):
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
bottle = self._bottle_obj()
|
||||||
|
|
||||||
|
class _TestEgress(Egress):
|
||||||
|
pass
|
||||||
|
|
||||||
|
e = _TestEgress()
|
||||||
|
plan_a = e.prepare(bottle, "slug-a", Path(td))
|
||||||
|
plan_b = e.prepare(bottle, "slug-b", Path(td))
|
||||||
|
self.assertNotEqual(plan_a.canary, plan_b.canary)
|
||||||
|
|
||||||
|
def test_canary_detected_by_scan_known_secrets(self):
|
||||||
|
from bot_bottle.dlp_detectors import scan_known_secrets
|
||||||
|
|
||||||
|
plan = self._make_plan()
|
||||||
|
env = {"EGRESS_TOKEN_CANARY": plan.canary}
|
||||||
|
result = scan_known_secrets(f"exfil={plan.canary}", env=env)
|
||||||
|
self.assertIsNotNone(result)
|
||||||
|
assert result is not None
|
||||||
|
self.assertEqual("block", result.severity)
|
||||||
|
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
|
||||||
|
|
||||||
|
def test_egress_plan_canary_field_default_empty(self):
|
||||||
|
# Verify EgressPlan can be constructed with an empty canary (backward compat).
|
||||||
|
from pathlib import Path
|
||||||
|
plan = EgressPlan(
|
||||||
|
slug="s",
|
||||||
|
routes_path=Path("/tmp/r.yaml"),
|
||||||
|
routes=(),
|
||||||
|
token_env_map={},
|
||||||
|
)
|
||||||
|
self.assertEqual("", plan.canary)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
Reference in New Issue
Block a user