diff --git a/bot_bottle/backend/docker/compose.py b/bot_bottle/backend/docker/compose.py index 121fc02..2806509 100644 --- a/bot_bottle/backend/docker/compose.py +++ b/bot_bottle/backend/docker/compose.py @@ -137,10 +137,11 @@ def _sidecar_bundle_service(plan: DockerBottlePlan) -> dict[str, Any]: volumes.append(_bind(ep.routes_path.parent, str(Path(EGRESS_ROUTES_IN_CONTAINER).parent))) for token_env in sorted(ep.token_env_map.keys()): env.append(token_env) - if ep.canary: - # Inject canary as a literal NAME=VALUE (not a bare name) — the - # value is a fake secret so it need not be hidden from the compose file. - env.append(f"EGRESS_TOKEN_CANARY={ep.canary}") + if ep.canary and ep.canary_env: + # Inject the fake secret as a literal NAME=VALUE (not a bare name) and + # tell the sidecar to scan that exact env var as sensitive. + env.append(f"{ep.canary_env}={ep.canary}") + env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={ep.canary_env}") # --- git-gate ----------------------------------------------------- gp = plan.git_gate_plan @@ -226,8 +227,8 @@ def _agent_service(plan: DockerBottlePlan) -> dict[str, Any]: env.append(name) # Canary token: visible to the agent as a fake secret so that any # outbound appearance of this value is a zero-FP exfil signal. - if plan.egress_plan.canary: - env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}") + if plan.egress_plan.canary and plan.egress_plan.canary_env: + env.append(f"{plan.egress_plan.canary_env}={plan.egress_plan.canary}") service: dict[str, Any] = { "image": plan.image, diff --git a/bot_bottle/backend/macos_container/launch.py b/bot_bottle/backend/macos_container/launch.py index 9c0f145..26c8d17 100644 --- a/bot_bottle/backend/macos_container/launch.py +++ b/bot_bottle/backend/macos_container/launch.py @@ -353,8 +353,9 @@ def _sidecar_env_entries(plan: MacosContainerBottlePlan) -> tuple[str, ...]: env: list[str] = [] if plan.egress_plan.routes: env.extend(sorted(plan.egress_plan.token_env_map.keys())) - if plan.egress_plan.canary: - env.append(f"EGRESS_TOKEN_CANARY={plan.egress_plan.canary}") + if plan.egress_plan.canary and plan.egress_plan.canary_env: + env.append(f"{plan.egress_plan.canary_env}={plan.egress_plan.canary}") + env.append(f"BOT_BOTTLE_SENSITIVE_PREFIXES={plan.egress_plan.canary_env}") if plan.git_gate_plan.upstreams: env.append(f"BOT_BOTTLE_GIT_GATE_READY_FILE={_GIT_GATE_READY_FILE}") if plan.supervise_plan is not None: @@ -422,8 +423,8 @@ def _agent_env_entries( env.append(f"{name}={value}") for name in sorted(plan.forwarded_env.keys()): env.append(name) - if plan.egress_plan.canary: - env.append(f"BOT_BOTTLE_CANARY={plan.egress_plan.canary}") + if plan.egress_plan.canary and plan.egress_plan.canary_env: + env.append(f"{plan.egress_plan.canary_env}={plan.egress_plan.canary}") return tuple(env) diff --git a/bot_bottle/egress.py b/bot_bottle/egress.py index ea10183..cd78790 100644 --- a/bot_bottle/egress.py +++ b/bot_bottle/egress.py @@ -35,6 +35,32 @@ EGRESS_HOSTNAME = "egress" EGRESS_ROUTES_IN_CONTAINER = "/etc/egress/routes.yaml" EGRESS_ROUTES_FILENAME = Path(EGRESS_ROUTES_IN_CONTAINER).name +_CANARY_ENV_WORDS = ( + "ACCORD", + "ANCHOR", + "ATLAS", + "CANON", + "CIPHER", + "EMBER", + "FALCON", + "HARBOR", + "LANTERN", + "MARBLE", + "NOVA", + "ORBIT", + "PIVOT", + "RADIUS", + "SUMMIT", + "VECTOR", +) + + +def _random_canary_env() -> str: + first = secrets.choice(_CANARY_ENV_WORDS) + remaining = tuple(word for word in _CANARY_ENV_WORDS if word != first) + second = secrets.choice(remaining) + return f"{first}_{second}_SECRET" + @dataclass(frozen=True) class EgressRoute(Route): @@ -67,6 +93,7 @@ class EgressPlan: mitmproxy_ca_cert_only_host_path: Path = Path() log: int = 0 canary: str = "" + canary_env: str = "" def egress_manifest_routes( @@ -326,9 +353,9 @@ class Egress(ABC): routes_path = stage_dir / EGRESS_ROUTES_FILENAME routes_path.write_text(egress_render_routes(routes, log=log)) routes_path.chmod(0o600) - # Generate a per-session canary token. The sidecar receives it as - # EGRESS_TOKEN_CANARY (scanned by the existing known-secrets detector); - # the agent receives it as BOT_BOTTLE_CANARY (a visible fake secret). + # Generate a per-session fake secret under a plausible random env name. + # The sidecar marks that exact env name as sensitive for known-secret + # scanning; the agent receives the same name/value as exfil bait. canary = secrets.token_urlsafe(32) return EgressPlan( slug=slug, @@ -337,6 +364,7 @@ class Egress(ABC): token_env_map=egress_token_env_map(routes), log=log, canary=canary, + canary_env=_random_canary_env(), ) __all__ = [ diff --git a/docs/prds/prd-new-strengthen-outbound-exfil-detection.md b/docs/prds/prd-new-strengthen-outbound-exfil-detection.md index 7cf49d5..490e519 100644 --- a/docs/prds/prd-new-strengthen-outbound-exfil-detection.md +++ b/docs/prds/prd-new-strengthen-outbound-exfil-detection.md @@ -37,9 +37,10 @@ query, headers, body). But the content-based strong tier only matches ## Goals / Success Criteria 1. Each launched bottle has a unique canary token in the agent's environment - (`BOT_BOTTLE_CANARY`) and the egress sidecar's environment - (`EGRESS_TOKEN_CANARY`). Any outbound appearance of the canary blocks the - request with reason `"canary token"`. + under a randomized `WORD_WORD_SECRET` env var name. The egress sidecar gets + the same env var and registers that exact name through + `BOT_BOTTLE_SENSITIVE_PREFIXES`. Any outbound appearance of the canary + blocks the request as a known-secret match. 2. `scan_known_secrets` accepts a `sensitive_prefixes` parameter (default: `("EGRESS_TOKEN_",)`). `scan_outbound` reads `BOT_BOTTLE_SENSITIVE_PREFIXES` from `environ` and merges those prefixes @@ -77,18 +78,20 @@ query, headers, body). But the content-based strong tier only matches ``` Egress.prepare() canary = secrets.token_urlsafe(32) - EgressPlan(canary=canary, ...) + canary_env = + EgressPlan(canary=canary, canary_env=canary_env, ...) Docker compose render: - sidecar env: EGRESS_TOKEN_CANARY= ← scanned by existing known-secrets detector - agent env: BOT_BOTTLE_CANARY= ← visible to agent as a "secret" + sidecar env: = + sidecar env: BOT_BOTTLE_SENSITIVE_PREFIXES= + agent env: = ← visible to agent as a "secret" macos-container launch: same literals added to sidecar + agent env entries ``` -`EGRESS_TOKEN_CANARY` matches the `EGRESS_TOKEN_` prefix already scanned by -`scan_known_secrets`, so no detector code changes are required for canary -detection — only the injection path. +The sidecar uses `BOT_BOTTLE_SENSITIVE_PREFIXES` to make the random canary env +name part of the existing `scan_known_secrets` detector without adding a +manifest schema field. ### Broadened known-value scanning diff --git a/tests/unit/test_compose.py b/tests/unit/test_compose.py index 8b10eec..0e5d743 100644 --- a/tests/unit/test_compose.py +++ b/tests/unit/test_compose.py @@ -80,7 +80,11 @@ def _git_gate_plan(upstreams: tuple[GitGateUpstream, ...] = ()) -> GitGatePlan: ) -def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan: +def _egress_plan( + routes: tuple[EgressRoute, ...] = (), + *, + canary: bool = False, +) -> EgressPlan: token_env_map = { r.token_env: r.token_ref for r in routes @@ -95,6 +99,8 @@ def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan: egress_network=f"bot-bottle-egress-{SLUG}", mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem", mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem", + canary="fake-canary-value" if canary else "", + canary_env="CANON_ALPHA_SECRET" if canary else "", ) @@ -112,6 +118,7 @@ def _plan( with_git: bool = False, with_egress: bool = False, supervise: bool = False, + canary: bool = False, ) -> DockerBottlePlan: """Build a fully-resolved DockerBottlePlan. Toggles cover the matrix the renderer's conditional-service logic branches on.""" @@ -150,7 +157,7 @@ def _plan( slug=SLUG, forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"}, git_gate_plan=_git_gate_plan(upstreams), - egress_plan=_egress_plan(routes), + egress_plan=_egress_plan(routes, canary=canary), supervise_plan=_supervise_plan() if supervise else None, use_runsc=False, agent_provision=AgentProvisionPlan( @@ -375,6 +382,20 @@ class TestSidecarBundleShape(unittest.TestCase): env_strings = sc["environment"] self.assertNotIn("EGRESS_TOKEN_0", env_strings) + def test_canary_env_registered_as_sensitive_in_sidecar(self): + sc = self._render(canary=True)["services"]["sidecars"] + env_strings = sc["environment"] + self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings) + self.assertIn( + "BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", + env_strings, + ) + + def test_canary_env_visible_to_agent(self): + agent = self._render(canary=True)["services"]["agent"] + env_strings = agent["environment"] + self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings) + def test_supervise_env_present_when_active(self): sc = self._render(supervise=True)["services"]["sidecars"] env_strings = sc["environment"] diff --git a/tests/unit/test_dlp_detectors.py b/tests/unit/test_dlp_detectors.py index f7f593c..bf6ebd6 100644 --- a/tests/unit/test_dlp_detectors.py +++ b/tests/unit/test_dlp_detectors.py @@ -593,11 +593,15 @@ class TestFragmentationResistantMatching(unittest.TestCase): def test_canary_prefix_detected(self): canary_value = "canary-fake-secret-value-xyz" - env = {"EGRESS_TOKEN_CANARY": canary_value} - result = scan_known_secrets(f"x={canary_value}", env=env) + env = {"CANON_ALPHA_SECRET": canary_value} + result = scan_known_secrets( + f"x={canary_value}", + env=env, + sensitive_prefixes=("CANON_ALPHA_SECRET",), + ) self.assertIsNotNone(result) assert result is not None - self.assertIn("EGRESS_TOKEN_CANARY", result.reason) + self.assertIn("CANON_ALPHA_SECRET", result.reason) class TestRedactTokensBroadenedPrefixes(unittest.TestCase): diff --git a/tests/unit/test_egress.py b/tests/unit/test_egress.py index 73d4c9a..ffcfe38 100644 --- a/tests/unit/test_egress.py +++ b/tests/unit/test_egress.py @@ -470,6 +470,7 @@ class TestCanaryGeneration(unittest.TestCase): plan = self._make_plan() self.assertIsInstance(plan.canary, str) self.assertGreater(len(plan.canary), 0) + self.assertRegex(plan.canary_env, r"^[A-Z]+_[A-Z]+_SECRET$") def test_canary_is_unique_per_session(self): with tempfile.TemporaryDirectory() as td: @@ -487,12 +488,16 @@ class TestCanaryGeneration(unittest.TestCase): from bot_bottle.dlp_detectors import scan_known_secrets plan = self._make_plan() - env = {"EGRESS_TOKEN_CANARY": plan.canary} - result = scan_known_secrets(f"exfil={plan.canary}", env=env) + env = {plan.canary_env: plan.canary} + result = scan_known_secrets( + f"exfil={plan.canary}", + env=env, + sensitive_prefixes=(plan.canary_env,), + ) self.assertIsNotNone(result) assert result is not None self.assertEqual("block", result.severity) - self.assertIn("EGRESS_TOKEN_CANARY", result.reason) + self.assertIn(plan.canary_env, result.reason) def test_egress_plan_canary_field_default_empty(self): # Verify EgressPlan can be constructed with an empty canary (backward compat). @@ -504,6 +509,7 @@ class TestCanaryGeneration(unittest.TestCase): token_env_map={}, ) self.assertEqual("", plan.canary) + self.assertEqual("", plan.canary_env) if __name__ == "__main__": diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index 023f0ca..460d018 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -1338,20 +1338,27 @@ class TestScanOutboundEnhanced(unittest.TestCase): result = scan_outbound(self._ROUTE, f"auth={secret}", env) self.assertIsNotNone(result) - def test_canary_detected_via_egress_token_canary(self): - # The canary (injected as EGRESS_TOKEN_CANARY) is caught by known_secrets. + def test_canary_detected_via_random_secret_env_name(self): + # The fake secret uses a randomized env name that the sidecar marks + # as sensitive through BOT_BOTTLE_SENSITIVE_PREFIXES. canary = "canaryvalue12345abcdef" - env = {"EGRESS_TOKEN_CANARY": canary} + env = { + "CANON_ALPHA_SECRET": canary, + "BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET", + } result = scan_outbound(self._ROUTE, f"data={canary}", env) self.assertIsNotNone(result) assert result is not None self.assertEqual("block", result.severity) - self.assertIn("EGRESS_TOKEN_CANARY", result.reason) + self.assertIn("CANON_ALPHA_SECRET", result.reason) def test_fragmented_canary_blocked(self): # Canary with separators injected is still caught. canary = "supersecretcanary99" - env = {"EGRESS_TOKEN_CANARY": canary} + env = { + "CANON_ALPHA_SECRET": canary, + "BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET", + } fragmented = "-".join(canary) result = scan_outbound(self._ROUTE, f"x={fragmented}", env) self.assertIsNotNone(result) diff --git a/tests/unit/test_macos_container_launch.py b/tests/unit/test_macos_container_launch.py index 4f66005..3e1038e 100644 --- a/tests/unit/test_macos_container_launch.py +++ b/tests/unit/test_macos_container_launch.py @@ -30,6 +30,7 @@ def _plan( supervise: bool = False, agent_git_gate_url: str = "", agent_supervise_url: str = "", + canary: bool = False, ) -> MacosContainerBottlePlan: routes_path = stage_dir / "routes.yaml" routes_path.write_text("routes: []\n", encoding="utf-8") @@ -42,7 +43,8 @@ def _plan( routes_path=routes_path, routes=("route",), token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"}, - canary="", + canary="fake-canary-value" if canary else "", + canary_env="CANON_ALPHA_SECRET" if canary else "", ) if git: key_path = stage_dir / "origin-key" @@ -139,6 +141,26 @@ class TestMacosContainerLaunchArgv(unittest.TestCase): argv, ) + def test_sidecar_argv_registers_canary_env_as_sensitive(self): + plan = _plan(stage_dir=self.stage_dir, canary=True) + argv = launch._sidecar_run_argv( + plan, + "bot-bottle-sidecars-dev-abc", + "bot-bottle-net-dev-abc", + "bot-bottle-egress-dev-abc", + ) + self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv) + self.assertIn("BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", argv) + + def test_agent_argv_receives_canary_env(self): + plan = _plan(stage_dir=self.stage_dir, canary=True) + argv = launch._agent_run_argv( + plan, + "bot-bottle-net-dev-abc", + "192.0.2.10", + ) + self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv) + def test_agent_env_points_proxy_at_sidecar_ip(self): plan = _plan( stage_dir=self.stage_dir,