"""Unit: sidecar bundle init supervisor (PRD 0024 chunk 1). Tests both the helper functions in `bot_bottle.sidecar_init` and the supervisor's end-to-end signal / exit-code behavior. The end-to-end tests use real subprocesses (`/bin/sleep`, `/bin/sh -c '...'`) — short-lived, no docker required — so they run under `tests/unit/` rather than `tests/integration/`.""" from __future__ import annotations import os import signal import subprocess import sys import time import unittest from pathlib import Path from unittest.mock import patch from bot_bottle.sidecar_init import ( _DaemonSpec, _Supervisor, _env_for_daemon, _selected_daemons, ) class TestEnvForDaemon(unittest.TestCase): """Scope egress-only credential env vars to the egress daemon. Regression for issue #84: pipelock's `scan_env: true` matched `EGRESS_TOKEN_*` against egress's just-injected Authorization header and 403-blocked the legitimate request. The agent never has access to these slots, so stripping them from non-egress daemons loses no DLP coverage.""" _BASE = { "PATH": "/usr/bin", "EGRESS_TOKEN_0": "tok-anthropic", "EGRESS_TOKEN_1": "tok-gitea", "EGRESS_UPSTREAM_PROXY": "http://127.0.0.1:8888", "SUPERVISE_PORT": "9100", } def test_egress_sees_full_env(self): env = _env_for_daemon("egress", self._BASE) self.assertEqual(self._BASE, env) def test_pipelock_loses_egress_tokens(self): env = _env_for_daemon("pipelock", self._BASE) self.assertNotIn("EGRESS_TOKEN_0", env) self.assertNotIn("EGRESS_TOKEN_1", env) # Non-token bundle env stays — supervise / git-gate / the # upstream proxy URL are all load-bearing for other # daemons. self.assertEqual("/usr/bin", env["PATH"]) self.assertEqual("http://127.0.0.1:8888", env["EGRESS_UPSTREAM_PROXY"]) self.assertEqual("9100", env["SUPERVISE_PORT"]) def test_git_gate_and_supervise_also_lose_egress_tokens(self): for name in ("git-gate", "supervise"): env = _env_for_daemon(name, self._BASE) self.assertNotIn("EGRESS_TOKEN_0", env) self.assertNotIn("EGRESS_TOKEN_1", env) def test_returns_independent_dict(self): # Caller mutation mustn't affect the original. env = _env_for_daemon("pipelock", self._BASE) env["X"] = "y" self.assertNotIn("X", self._BASE) class TestSelectedDaemons(unittest.TestCase): """Env-var subset filtering. The compose renderer is the source of truth for which daemons are wired; the supervisor just honors what it's told.""" _DAEMONS = ( _DaemonSpec("egress", ("/bin/sh", "-c", ":")), _DaemonSpec("pipelock", ("/bin/sh", "-c", ":")), _DaemonSpec("git-gate", ("/bin/sh", "-c", ":")), _DaemonSpec("supervise", ("/bin/sh", "-c", ":")), ) def test_unset_returns_all(self): got = _selected_daemons({}, all_daemons=self._DAEMONS) self.assertEqual([d.name for d in got], ["egress", "pipelock", "git-gate", "supervise"]) def test_empty_returns_all(self): got = _selected_daemons({"BOT_BOTTLE_SIDECAR_DAEMONS": ""}, all_daemons=self._DAEMONS) self.assertEqual(4, len(got)) def test_whitespace_only_returns_all(self): got = _selected_daemons({"BOT_BOTTLE_SIDECAR_DAEMONS": " "}, all_daemons=self._DAEMONS) self.assertEqual(4, len(got)) def test_explicit_subset(self): got = _selected_daemons( {"BOT_BOTTLE_SIDECAR_DAEMONS": "egress,pipelock"}, all_daemons=self._DAEMONS, ) self.assertEqual([d.name for d in got], ["egress", "pipelock"]) def test_preserves_canonical_order(self): # Order in the env var doesn't matter; the result follows # the canonical _DAEMONS order so egress starts before # pipelock (race-window reason). got = _selected_daemons( {"BOT_BOTTLE_SIDECAR_DAEMONS": "supervise,pipelock,egress"}, all_daemons=self._DAEMONS, ) self.assertEqual([d.name for d in got], ["egress", "pipelock", "supervise"]) def test_unknown_names_ignored(self): got = _selected_daemons( {"BOT_BOTTLE_SIDECAR_DAEMONS": "egress,bogus"}, all_daemons=self._DAEMONS, ) self.assertEqual([d.name for d in got], ["egress"]) def test_whitespace_in_names_stripped(self): got = _selected_daemons( {"BOT_BOTTLE_SIDECAR_DAEMONS": " egress , pipelock "}, all_daemons=self._DAEMONS, ) self.assertEqual([d.name for d in got], ["egress", "pipelock"]) class TestSupervisor(unittest.TestCase): """End-to-end: drive `_Supervisor` directly with fake commands. We don't go through `main()` because main installs signal handlers process-wide, which collides with the test runner.""" def _drive(self, sup: _Supervisor, max_wait_s: float = 6.0) -> int: deadline = time.monotonic() + max_wait_s while not sup.tick(): if time.monotonic() > deadline: self.fail("supervisor watch loop did not converge in time") time.sleep(0.05) return sup.exit_code() def test_all_children_succeed_returns_zero(self): # `sh -c :` exits 0 immediately. With the new failure # policy a child dying doesn't trigger shutdown, so the # loop only converges once BOTH have exited on their own. # Both exit 0 → max(0, 0) = 0. specs = [ _DaemonSpec("a", ("/bin/sh", "-c", ":")), _DaemonSpec("b", ("/bin/sh", "-c", ":")), ] sup = _Supervisor(specs) sup.start_all() rc = self._drive(sup) self.assertEqual(0, rc) def test_child_crash_does_not_initiate_shutdown(self): # Failure policy (PRD 0024, interim): a child dying # unexpectedly is logged but the supervisor does NOT tear # down the survivors. Verified by giving the crasher # ~0.3s to die, then asserting the long-runner is still # up and the supervisor never set shutdown_at. specs = [ _DaemonSpec("crasher", ("/bin/sh", "-c", "exit 1")), _DaemonSpec("longrun", ("/bin/sleep", "30")), ] sup = _Supervisor(specs) sup.start_all() # Drive ticks for a while; crasher should die, longrun # should survive. deadline = time.monotonic() + 1.0 while time.monotonic() < deadline: done = sup.tick() self.assertFalse(done, "loop converged with a child still alive") if sup.procs[0][1].poll() is not None: break time.sleep(0.05) self.assertEqual(1, sup.procs[0][1].returncode, "crasher should have exited 1") self.assertIsNone(sup.procs[1][1].poll(), "longrun should still be running") self.assertIsNone(sup.shutdown_at, "supervisor must not initiate shutdown on child death") # Clean up — explicit signal-driven shutdown. sup.request_shutdown(reason="test-teardown") self._drive(sup) def test_crash_then_signal_surfaces_nonzero_exit_code(self): # The crasher's exit code is what reaches the container # exit even though shutdown was triggered by SIGTERM. # exit_code() = max(child returncodes) → 1 wins over the # signal-killed longrun's negative returncode. specs = [ _DaemonSpec("crasher", ("/bin/sh", "-c", "exit 1")), _DaemonSpec("longrun", ("/bin/sleep", "30")), ] sup = _Supervisor(specs) sup.start_all() time.sleep(0.3) # let crasher die sup.request_shutdown(reason="test") rc = self._drive(sup) self.assertEqual(1, rc) def test_all_children_die_unattended_loop_converges(self): # If nobody sends a signal but every child eventually # dies on its own, the supervisor still exits — nothing # left to supervise. specs = [ _DaemonSpec("a", ("/bin/sh", "-c", "exit 0")), _DaemonSpec("b", ("/bin/sh", "-c", "exit 2")), ] sup = _Supervisor(specs) sup.start_all() rc = self._drive(sup) self.assertEqual(2, rc) self.assertIsNone(sup.shutdown_at) def test_forward_signal_to_named_child(self): # SIGHUP needs to reach mitmdump inside the bundle so # routes.yaml reloads (egress_apply.py issues `docker kill # --signal HUP `). The supervisor forwards by daemon # name. # # The child is Python (not a shell-with-trap) on purpose: # bash on macOS defers trap execution while a foreground # builtin like `sleep` is running and stdout is a pipe, # which makes shell-trap test fixtures flaky. The # production code path (mitmdump as the bundle child) # doesn't have a shell in between — egress_entrypoint.sh # `exec`s mitmdump, so SIGHUP lands directly on mitmdump. sighup_marker = ( sys.executable, "-c", "import signal, sys, time\n" "def _h(*_): sys.exit(42)\n" "signal.signal(signal.SIGHUP, _h)\n" "while True: time.sleep(0.1)\n", ) specs = [ _DaemonSpec("egress", sighup_marker), _DaemonSpec("other", ("/bin/sleep", "30")), ] sup = _Supervisor(specs) sup.start_all() time.sleep(0.3) # let Python install the handler delivered = sup.forward_signal(signal.SIGHUP, "egress") self.assertTrue(delivered) deadline = time.monotonic() + 3.0 while time.monotonic() < deadline: if sup.procs[0][1].poll() is not None: break time.sleep(0.05) self.assertEqual(42, sup.procs[0][1].returncode, "egress did not see SIGHUP") # The other daemon is untouched. self.assertIsNone(sup.procs[1][1].poll()) sup.request_shutdown(reason="cleanup") self._drive(sup) def test_forward_signal_unknown_daemon_no_op(self): specs = [_DaemonSpec("a", ("/bin/sleep", "30"))] sup = _Supervisor(specs) sup.start_all() delivered = sup.forward_signal(signal.SIGHUP, "ghost") self.assertFalse(delivered) sup.request_shutdown(reason="cleanup") self._drive(sup) def test_restart_daemon_replaces_in_place(self): # pipelock_apply.py sends SIGUSR1 to the bundle, supervisor # restarts the pipelock daemon, supervise (the other # daemon's MCP server in production) stays up. specs = [ _DaemonSpec("pipelock", ("/bin/sleep", "30")), _DaemonSpec("supervise", ("/bin/sleep", "30")), ] sup = _Supervisor(specs) sup.start_all() time.sleep(0.1) old_pipelock_pid = sup.procs[0][1].pid supervise_pid = sup.procs[1][1].pid ok = sup.restart_daemon("pipelock", grace=2.0) self.assertTrue(ok) # Pipelock got a fresh PID — different process. new_pipelock_pid = sup.procs[0][1].pid self.assertNotEqual(old_pipelock_pid, new_pipelock_pid) # Supervise's PID is unchanged — it was NOT restarted. self.assertEqual(supervise_pid, sup.procs[1][1].pid) self.assertIsNone(sup.procs[1][1].poll(), "supervise should still be running") sup.request_shutdown(reason="cleanup") self._drive(sup) def test_restart_unknown_daemon_no_op(self): specs = [_DaemonSpec("a", ("/bin/sleep", "30"))] sup = _Supervisor(specs) sup.start_all() ok = sup.restart_daemon("ghost") self.assertFalse(ok) sup.request_shutdown(reason="cleanup") self._drive(sup) def test_restart_during_shutdown_is_no_op(self): specs = [_DaemonSpec("pipelock", ("/bin/sleep", "30"))] sup = _Supervisor(specs) sup.start_all() sup.request_shutdown(reason="test") ok = sup.restart_daemon("pipelock") self.assertFalse(ok, "must not respawn a daemon during teardown") self._drive(sup) def test_shutdown_after_start_terminates_children(self): # Two long-running children. Caller requests shutdown; # both should receive SIGTERM and exit. exit_code() is # max of (returncodes) — both signal-killed (negative), # so max() picks 0 in the typical case (or the # platform-specific signal returncode). specs = [ _DaemonSpec("a", ("/bin/sleep", "60")), _DaemonSpec("b", ("/bin/sleep", "60")), ] sup = _Supervisor(specs) sup.start_all() time.sleep(0.2) # let them actually start sup.request_shutdown(reason="test") rc = self._drive(sup) self.assertIsNotNone(rc) # Both children got the signal — neither survived past # the grace deadline. for _, p in sup.procs: self.assertIsNotNone(p.returncode) def test_grace_period_escalates_to_sigkill(self): # A child that ignores SIGTERM. The supervisor's # _GRACE_SECONDS is 8s globally; we patch it to 0.3 so the # test stays fast. ignore_term = ( "/bin/sh", "-c", "trap '' TERM; sleep 30", ) specs = [_DaemonSpec("stubborn", ignore_term)] sup = _Supervisor(specs) sup.start_all() time.sleep(0.3) # let `trap` register sup.request_shutdown(reason="test") with patch("bot_bottle.sidecar_init._GRACE_SECONDS", 0.3): rc = self._drive(sup, max_wait_s=4.0) # Process was SIGKILL'd → returncode -9 on POSIX. self.assertEqual(-9, sup.procs[0][1].returncode) self.assertIsNotNone(rc) def test_idempotent_shutdown_requests(self): specs = [_DaemonSpec("a", ("/bin/sleep", "60"))] sup = _Supervisor(specs) sup.start_all() time.sleep(0.1) first_at = None sup.request_shutdown(reason="first") first_at = sup.shutdown_at # Second call must NOT reset the deadline (otherwise the # grace timer is gameable by a noisy signal). sup.request_shutdown(reason="second") self.assertEqual(first_at, sup.shutdown_at) self._drive(sup) class TestMainEndToEnd(unittest.TestCase): """Run sidecar_init.py as a real subprocess to cover the signal-handler installation path. Skipped on platforms without /bin/sleep + /bin/sh.""" @classmethod def setUpClass(cls): for p in ("/bin/sh", "/bin/sleep"): if not Path(p).exists(): raise unittest.SkipTest(f"missing {p}") def _run(self, daemons_csv: str, send_signal: int | None, wait_before_signal: float = 0.4, overall_timeout: float = 6.0) -> tuple[int, str]: """Spawn sidecar_init.main() in a child process with the DAEMONS list patched to harmless `sleep 30` commands. Returns (returncode, captured stdout).""" helper = ( "import os, runpy, sys\n" "from bot_bottle import sidecar_init as si\n" "si._DAEMONS = (\n" " si._DaemonSpec('alpha', ('/bin/sleep','30')),\n" " si._DaemonSpec('beta', ('/bin/sleep','30')),\n" ")\n" "sys.exit(si.main([]))\n" ) env = {**os.environ, "BOT_BOTTLE_SIDECAR_DAEMONS": daemons_csv} proc = subprocess.Popen( [sys.executable, "-c", helper], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env, ) try: if send_signal is not None: time.sleep(wait_before_signal) proc.send_signal(send_signal) out_b, _ = proc.communicate(timeout=overall_timeout) except subprocess.TimeoutExpired: proc.kill() out_b, _ = proc.communicate() self.fail("sidecar_init main() did not exit before timeout") return proc.returncode, out_b.decode("utf-8", errors="replace") def test_sigterm_clean_shutdown(self): rc, out = self._run("alpha,beta", signal.SIGTERM) self.assertIn("starting alpha", out) self.assertIn("starting beta", out) self.assertIn("forwarding SIGTERM", out) # Sleep terminated by SIGTERM exits with returncode -15; # supervisor surfaces that via max(...) and main() # returns -15 → process exit becomes 256-15 = 241. # On macOS bash may convert to 143. Either way, nonzero # AND the child finished — we don't pin the exact code. self.assertNotEqual(0, rc) def test_empty_daemon_set_exits_zero_immediately(self): # Use a sentinel value that filters out both alpha+beta. rc, out = self._run("nothing", send_signal=None, overall_timeout=2.0) self.assertEqual(0, rc) self.assertIn("no daemons selected", out) if __name__ == "__main__": unittest.main()