fix(egress): randomize canary secret env name
This commit is contained in:
@@ -80,7 +80,11 @@ def _git_gate_plan(upstreams: tuple[GitGateUpstream, ...] = ()) -> GitGatePlan:
|
||||
)
|
||||
|
||||
|
||||
def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan:
|
||||
def _egress_plan(
|
||||
routes: tuple[EgressRoute, ...] = (),
|
||||
*,
|
||||
canary: bool = False,
|
||||
) -> EgressPlan:
|
||||
token_env_map = {
|
||||
r.token_env: r.token_ref
|
||||
for r in routes
|
||||
@@ -95,6 +99,8 @@ def _egress_plan(routes: tuple[EgressRoute, ...] = ()) -> EgressPlan:
|
||||
egress_network=f"bot-bottle-egress-{SLUG}",
|
||||
mitmproxy_ca_host_path=STATE / "egress-ca" / "mitmproxy-ca.pem",
|
||||
mitmproxy_ca_cert_only_host_path=STATE / "egress-ca" / "ca.pem",
|
||||
canary="fake-canary-value" if canary else "",
|
||||
canary_env="CANON_ALPHA_SECRET" if canary else "",
|
||||
)
|
||||
|
||||
|
||||
@@ -112,6 +118,7 @@ def _plan(
|
||||
with_git: bool = False,
|
||||
with_egress: bool = False,
|
||||
supervise: bool = False,
|
||||
canary: bool = False,
|
||||
) -> DockerBottlePlan:
|
||||
"""Build a fully-resolved DockerBottlePlan. Toggles cover the
|
||||
matrix the renderer's conditional-service logic branches on."""
|
||||
@@ -150,7 +157,7 @@ def _plan(
|
||||
slug=SLUG,
|
||||
forwarded_env={"CLAUDE_CODE_OAUTH_TOKEN": "x"},
|
||||
git_gate_plan=_git_gate_plan(upstreams),
|
||||
egress_plan=_egress_plan(routes),
|
||||
egress_plan=_egress_plan(routes, canary=canary),
|
||||
supervise_plan=_supervise_plan() if supervise else None,
|
||||
use_runsc=False,
|
||||
agent_provision=AgentProvisionPlan(
|
||||
@@ -375,6 +382,20 @@ class TestSidecarBundleShape(unittest.TestCase):
|
||||
env_strings = sc["environment"]
|
||||
self.assertNotIn("EGRESS_TOKEN_0", env_strings)
|
||||
|
||||
def test_canary_env_registered_as_sensitive_in_sidecar(self):
|
||||
sc = self._render(canary=True)["services"]["sidecars"]
|
||||
env_strings = sc["environment"]
|
||||
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
|
||||
self.assertIn(
|
||||
"BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET",
|
||||
env_strings,
|
||||
)
|
||||
|
||||
def test_canary_env_visible_to_agent(self):
|
||||
agent = self._render(canary=True)["services"]["agent"]
|
||||
env_strings = agent["environment"]
|
||||
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", env_strings)
|
||||
|
||||
def test_supervise_env_present_when_active(self):
|
||||
sc = self._render(supervise=True)["services"]["sidecars"]
|
||||
env_strings = sc["environment"]
|
||||
|
||||
@@ -593,11 +593,15 @@ class TestFragmentationResistantMatching(unittest.TestCase):
|
||||
|
||||
def test_canary_prefix_detected(self):
|
||||
canary_value = "canary-fake-secret-value-xyz"
|
||||
env = {"EGRESS_TOKEN_CANARY": canary_value}
|
||||
result = scan_known_secrets(f"x={canary_value}", env=env)
|
||||
env = {"CANON_ALPHA_SECRET": canary_value}
|
||||
result = scan_known_secrets(
|
||||
f"x={canary_value}",
|
||||
env=env,
|
||||
sensitive_prefixes=("CANON_ALPHA_SECRET",),
|
||||
)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
|
||||
self.assertIn("CANON_ALPHA_SECRET", result.reason)
|
||||
|
||||
|
||||
class TestRedactTokensBroadenedPrefixes(unittest.TestCase):
|
||||
|
||||
@@ -470,6 +470,7 @@ class TestCanaryGeneration(unittest.TestCase):
|
||||
plan = self._make_plan()
|
||||
self.assertIsInstance(plan.canary, str)
|
||||
self.assertGreater(len(plan.canary), 0)
|
||||
self.assertRegex(plan.canary_env, r"^[A-Z]+_[A-Z]+_SECRET$")
|
||||
|
||||
def test_canary_is_unique_per_session(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
@@ -487,12 +488,16 @@ class TestCanaryGeneration(unittest.TestCase):
|
||||
from bot_bottle.dlp_detectors import scan_known_secrets
|
||||
|
||||
plan = self._make_plan()
|
||||
env = {"EGRESS_TOKEN_CANARY": plan.canary}
|
||||
result = scan_known_secrets(f"exfil={plan.canary}", env=env)
|
||||
env = {plan.canary_env: plan.canary}
|
||||
result = scan_known_secrets(
|
||||
f"exfil={plan.canary}",
|
||||
env=env,
|
||||
sensitive_prefixes=(plan.canary_env,),
|
||||
)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
|
||||
self.assertIn(plan.canary_env, result.reason)
|
||||
|
||||
def test_egress_plan_canary_field_default_empty(self):
|
||||
# Verify EgressPlan can be constructed with an empty canary (backward compat).
|
||||
@@ -504,6 +509,7 @@ class TestCanaryGeneration(unittest.TestCase):
|
||||
token_env_map={},
|
||||
)
|
||||
self.assertEqual("", plan.canary)
|
||||
self.assertEqual("", plan.canary_env)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1338,20 +1338,27 @@ class TestScanOutboundEnhanced(unittest.TestCase):
|
||||
result = scan_outbound(self._ROUTE, f"auth={secret}", env)
|
||||
self.assertIsNotNone(result)
|
||||
|
||||
def test_canary_detected_via_egress_token_canary(self):
|
||||
# The canary (injected as EGRESS_TOKEN_CANARY) is caught by known_secrets.
|
||||
def test_canary_detected_via_random_secret_env_name(self):
|
||||
# The fake secret uses a randomized env name that the sidecar marks
|
||||
# as sensitive through BOT_BOTTLE_SENSITIVE_PREFIXES.
|
||||
canary = "canaryvalue12345abcdef"
|
||||
env = {"EGRESS_TOKEN_CANARY": canary}
|
||||
env = {
|
||||
"CANON_ALPHA_SECRET": canary,
|
||||
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
|
||||
}
|
||||
result = scan_outbound(self._ROUTE, f"data={canary}", env)
|
||||
self.assertIsNotNone(result)
|
||||
assert result is not None
|
||||
self.assertEqual("block", result.severity)
|
||||
self.assertIn("EGRESS_TOKEN_CANARY", result.reason)
|
||||
self.assertIn("CANON_ALPHA_SECRET", result.reason)
|
||||
|
||||
def test_fragmented_canary_blocked(self):
|
||||
# Canary with separators injected is still caught.
|
||||
canary = "supersecretcanary99"
|
||||
env = {"EGRESS_TOKEN_CANARY": canary}
|
||||
env = {
|
||||
"CANON_ALPHA_SECRET": canary,
|
||||
"BOT_BOTTLE_SENSITIVE_PREFIXES": "CANON_ALPHA_SECRET",
|
||||
}
|
||||
fragmented = "-".join(canary)
|
||||
result = scan_outbound(self._ROUTE, f"x={fragmented}", env)
|
||||
self.assertIsNotNone(result)
|
||||
|
||||
@@ -30,6 +30,7 @@ def _plan(
|
||||
supervise: bool = False,
|
||||
agent_git_gate_url: str = "",
|
||||
agent_supervise_url: str = "",
|
||||
canary: bool = False,
|
||||
) -> MacosContainerBottlePlan:
|
||||
routes_path = stage_dir / "routes.yaml"
|
||||
routes_path.write_text("routes: []\n", encoding="utf-8")
|
||||
@@ -42,7 +43,8 @@ def _plan(
|
||||
routes_path=routes_path,
|
||||
routes=("route",),
|
||||
token_env_map={"EGRESS_TOKEN_0": "HOST_TOKEN"},
|
||||
canary="",
|
||||
canary="fake-canary-value" if canary else "",
|
||||
canary_env="CANON_ALPHA_SECRET" if canary else "",
|
||||
)
|
||||
if git:
|
||||
key_path = stage_dir / "origin-key"
|
||||
@@ -139,6 +141,26 @@ class TestMacosContainerLaunchArgv(unittest.TestCase):
|
||||
argv,
|
||||
)
|
||||
|
||||
def test_sidecar_argv_registers_canary_env_as_sensitive(self):
|
||||
plan = _plan(stage_dir=self.stage_dir, canary=True)
|
||||
argv = launch._sidecar_run_argv(
|
||||
plan,
|
||||
"bot-bottle-sidecars-dev-abc",
|
||||
"bot-bottle-net-dev-abc",
|
||||
"bot-bottle-egress-dev-abc",
|
||||
)
|
||||
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
|
||||
self.assertIn("BOT_BOTTLE_SENSITIVE_PREFIXES=CANON_ALPHA_SECRET", argv)
|
||||
|
||||
def test_agent_argv_receives_canary_env(self):
|
||||
plan = _plan(stage_dir=self.stage_dir, canary=True)
|
||||
argv = launch._agent_run_argv(
|
||||
plan,
|
||||
"bot-bottle-net-dev-abc",
|
||||
"192.0.2.10",
|
||||
)
|
||||
self.assertIn("CANON_ALPHA_SECRET=fake-canary-value", argv)
|
||||
|
||||
def test_agent_env_points_proxy_at_sidecar_ip(self):
|
||||
plan = _plan(
|
||||
stage_dir=self.stage_dir,
|
||||
|
||||
Reference in New Issue
Block a user