Files
bot-bottle/tests/unit/test_sidecar_init.py
didericis-claude 5a2011c48f fix: close child stdout pipes on restart and loop convergence (PRD 0043)
Closes #140. In restart_daemon, the old process's stdout pipe was never
explicitly closed after p.wait() returned, leaking the fd until the
supervisor object was GC'd. Similarly, when the watch loop converged
(all children dead), no pipe was closed. Both paths now call
p.stdout.close() immediately after the process is confirmed exited.
Tests enforce this with warnings.simplefilter("error", ResourceWarning)
in TestSupervisor.setUp.
2026-06-02 11:48:24 -04:00

516 lines
19 KiB
Python

"""Unit: sidecar bundle init supervisor (PRD 0024 chunk 1).
Tests both the helper functions in `bot_bottle.sidecar_init`
and the supervisor's end-to-end signal / exit-code behavior. The
end-to-end tests use real subprocesses (`/bin/sleep`,
`/bin/sh -c '...'`) — short-lived, no docker required — so they
run under `tests/unit/` rather than `tests/integration/`."""
from __future__ import annotations
import os
import signal
import subprocess
import sys
import time
import unittest
import warnings
from pathlib import Path
from unittest.mock import patch
from bot_bottle.sidecar_init import (
_DaemonSpec,
_Supervisor,
_env_for_daemon,
_selected_daemons,
)
class TestEnvForDaemon(unittest.TestCase):
"""Scope egress-only credential env vars to the egress daemon.
Regression for issue #84: pipelock's `scan_env: true` matched
`EGRESS_TOKEN_*` against egress's just-injected Authorization
header and 403-blocked the legitimate request. The agent
never has access to these slots, so stripping them from
non-egress daemons loses no DLP coverage."""
_BASE = {
"PATH": "/usr/bin",
"EGRESS_TOKEN_0": "tok-anthropic",
"EGRESS_TOKEN_1": "tok-gitea",
"EGRESS_UPSTREAM_PROXY": "http://127.0.0.1:8888",
"SUPERVISE_PORT": "9100",
}
def test_egress_sees_full_env(self):
env = _env_for_daemon("egress", self._BASE)
self.assertEqual(self._BASE, env)
def test_pipelock_loses_egress_tokens(self):
env = _env_for_daemon("pipelock", self._BASE)
self.assertNotIn("EGRESS_TOKEN_0", env)
self.assertNotIn("EGRESS_TOKEN_1", env)
# Non-token bundle env stays — supervise / git-gate / git-http / the
# upstream proxy URL are all load-bearing for other
# daemons.
self.assertEqual("/usr/bin", env["PATH"])
self.assertEqual("http://127.0.0.1:8888", env["EGRESS_UPSTREAM_PROXY"])
self.assertEqual("9100", env["SUPERVISE_PORT"])
def test_git_daemons_and_supervise_also_lose_egress_tokens(self):
for name in ("git-gate", "git-http", "supervise"):
env = _env_for_daemon(name, self._BASE)
self.assertNotIn("EGRESS_TOKEN_0", env)
self.assertNotIn("EGRESS_TOKEN_1", env)
def test_returns_independent_dict(self):
# Caller mutation mustn't affect the original.
env = _env_for_daemon("pipelock", self._BASE)
env["X"] = "y"
self.assertNotIn("X", self._BASE)
class TestSelectedDaemons(unittest.TestCase):
"""Env-var subset filtering. The compose renderer is the source
of truth for which daemons are wired; the supervisor just
honors what it's told."""
_DAEMONS = (
_DaemonSpec("egress", ("/bin/sh", "-c", ":")),
_DaemonSpec("pipelock", ("/bin/sh", "-c", ":")),
_DaemonSpec("git-gate", ("/bin/sh", "-c", ":")),
_DaemonSpec("supervise", ("/bin/sh", "-c", ":")),
)
def test_unset_returns_all(self):
got = _selected_daemons({}, all_daemons=self._DAEMONS)
self.assertEqual([d.name for d in got],
["egress", "pipelock", "git-gate", "supervise"])
def test_empty_returns_all(self):
got = _selected_daemons({"BOT_BOTTLE_SIDECAR_DAEMONS": ""},
all_daemons=self._DAEMONS)
self.assertEqual(4, len(got))
def test_whitespace_only_returns_all(self):
got = _selected_daemons({"BOT_BOTTLE_SIDECAR_DAEMONS": " "},
all_daemons=self._DAEMONS)
self.assertEqual(4, len(got))
def test_explicit_subset(self):
got = _selected_daemons(
{"BOT_BOTTLE_SIDECAR_DAEMONS": "egress,pipelock"},
all_daemons=self._DAEMONS,
)
self.assertEqual([d.name for d in got], ["egress", "pipelock"])
def test_preserves_canonical_order(self):
# Order in the env var doesn't matter; the result follows
# the canonical _DAEMONS order so egress starts before
# pipelock (race-window reason).
got = _selected_daemons(
{"BOT_BOTTLE_SIDECAR_DAEMONS": "supervise,pipelock,egress"},
all_daemons=self._DAEMONS,
)
self.assertEqual([d.name for d in got],
["egress", "pipelock", "supervise"])
def test_unknown_names_ignored(self):
got = _selected_daemons(
{"BOT_BOTTLE_SIDECAR_DAEMONS": "egress,bogus"},
all_daemons=self._DAEMONS,
)
self.assertEqual([d.name for d in got], ["egress"])
def test_whitespace_in_names_stripped(self):
got = _selected_daemons(
{"BOT_BOTTLE_SIDECAR_DAEMONS": " egress , pipelock "},
all_daemons=self._DAEMONS,
)
self.assertEqual([d.name for d in got], ["egress", "pipelock"])
class TestSupervisor(unittest.TestCase):
"""End-to-end: drive `_Supervisor` directly with fake commands.
We don't go through `main()` because main installs signal
handlers process-wide, which collides with the test runner."""
def setUp(self):
warnings.simplefilter("error", ResourceWarning)
self.addCleanup(warnings.resetwarnings)
def _drive(self, sup: _Supervisor, max_wait_s: float = 6.0) -> int:
deadline = time.monotonic() + max_wait_s
while not sup.tick():
if time.monotonic() > deadline:
self.fail("supervisor watch loop did not converge in time")
time.sleep(0.05)
return sup.exit_code()
def test_all_children_succeed_returns_zero(self):
# `sh -c :` exits 0 immediately. With the new failure
# policy a child dying doesn't trigger shutdown, so the
# loop only converges once BOTH have exited on their own.
# Both exit 0 → max(0, 0) = 0.
specs = [
_DaemonSpec("a", ("/bin/sh", "-c", ":")),
_DaemonSpec("b", ("/bin/sh", "-c", ":")),
]
sup = _Supervisor(specs)
sup.start_all()
rc = self._drive(sup)
self.assertEqual(0, rc)
def test_child_crash_does_not_initiate_shutdown(self):
# Failure policy (PRD 0024, interim): a child dying
# unexpectedly is logged but the supervisor does NOT tear
# down the survivors. Verified by giving the crasher
# ~0.3s to die, then asserting the long-runner is still
# up and the supervisor never set shutdown_at.
specs = [
_DaemonSpec("crasher", ("/bin/sh", "-c", "exit 1")),
_DaemonSpec("longrun", ("/bin/sleep", "30")),
]
sup = _Supervisor(specs)
sup.start_all()
# Drive ticks for a while; crasher should die, longrun
# should survive.
deadline = time.monotonic() + 1.0
while time.monotonic() < deadline:
done = sup.tick()
self.assertFalse(done, "loop converged with a child still alive")
if sup.procs[0][1].poll() is not None:
break
time.sleep(0.05)
self.assertEqual(1, sup.procs[0][1].returncode,
"crasher should have exited 1")
self.assertIsNone(sup.procs[1][1].poll(),
"longrun should still be running")
self.assertIsNone(sup.shutdown_at,
"supervisor must not initiate shutdown on child death")
# Clean up — explicit signal-driven shutdown.
sup.request_shutdown(reason="test-teardown")
self._drive(sup)
def test_crash_then_signal_surfaces_nonzero_exit_code(self):
# The crasher's exit code is what reaches the container
# exit even though shutdown was triggered by SIGTERM.
# exit_code() = max(child returncodes) → 1 wins over the
# signal-killed longrun's negative returncode.
specs = [
_DaemonSpec("crasher", ("/bin/sh", "-c", "exit 1")),
_DaemonSpec("longrun", ("/bin/sleep", "30")),
]
sup = _Supervisor(specs)
sup.start_all()
time.sleep(0.3) # let crasher die
sup.request_shutdown(reason="test")
rc = self._drive(sup)
self.assertEqual(1, rc)
def test_all_children_die_unattended_loop_converges(self):
# If nobody sends a signal but every child eventually
# dies on its own, the supervisor still exits — nothing
# left to supervise.
specs = [
_DaemonSpec("a", ("/bin/sh", "-c", "exit 0")),
_DaemonSpec("b", ("/bin/sh", "-c", "exit 2")),
]
sup = _Supervisor(specs)
sup.start_all()
rc = self._drive(sup)
self.assertEqual(2, rc)
self.assertIsNone(sup.shutdown_at)
def test_forward_signal_to_named_child(self):
# SIGHUP needs to reach mitmdump inside the bundle so
# routes.yaml reloads (egress_apply.py issues `docker kill
# --signal HUP <bundle>`). The supervisor forwards by daemon
# name.
#
# The child is Python (not a shell-with-trap) on purpose:
# bash on macOS defers trap execution while a foreground
# builtin like `sleep` is running and stdout is a pipe,
# which makes shell-trap test fixtures flaky. The
# production code path (mitmdump as the bundle child)
# doesn't have a shell in between — egress_entrypoint.sh
# `exec`s mitmdump, so SIGHUP lands directly on mitmdump.
sighup_marker = (
sys.executable, "-c",
"import signal, sys, time\n"
"def _h(*_): sys.exit(42)\n"
"signal.signal(signal.SIGHUP, _h)\n"
"while True: time.sleep(0.1)\n",
)
specs = [
_DaemonSpec("egress", sighup_marker),
_DaemonSpec("other", ("/bin/sleep", "30")),
]
sup = _Supervisor(specs)
sup.start_all()
time.sleep(0.3) # let Python install the handler
delivered = sup.forward_signal(signal.SIGHUP, "egress")
self.assertTrue(delivered)
deadline = time.monotonic() + 3.0
while time.monotonic() < deadline:
if sup.procs[0][1].poll() is not None:
break
time.sleep(0.05)
self.assertEqual(42, sup.procs[0][1].returncode,
"egress did not see SIGHUP")
# The other daemon is untouched.
self.assertIsNone(sup.procs[1][1].poll())
sup.request_shutdown(reason="cleanup")
self._drive(sup)
def test_forward_signal_unknown_daemon_no_op(self):
specs = [_DaemonSpec("a", ("/bin/sleep", "30"))]
sup = _Supervisor(specs)
sup.start_all()
delivered = sup.forward_signal(signal.SIGHUP, "ghost")
self.assertFalse(delivered)
sup.request_shutdown(reason="cleanup")
self._drive(sup)
def test_restart_daemon_replaces_in_place(self):
# pipelock_apply.py sends SIGUSR1 to the bundle, supervisor
# restarts the pipelock daemon, supervise (the other
# daemon's MCP server in production) stays up.
specs = [
_DaemonSpec("pipelock", ("/bin/sleep", "30")),
_DaemonSpec("supervise", ("/bin/sleep", "30")),
]
sup = _Supervisor(specs)
sup.start_all()
time.sleep(0.1)
old_pipelock_pid = sup.procs[0][1].pid
supervise_pid = sup.procs[1][1].pid
ok = sup.restart_daemon("pipelock", grace=2.0)
self.assertTrue(ok)
# Pipelock got a fresh PID — different process.
new_pipelock_pid = sup.procs[0][1].pid
self.assertNotEqual(old_pipelock_pid, new_pipelock_pid)
# Supervise's PID is unchanged — it was NOT restarted.
self.assertEqual(supervise_pid, sup.procs[1][1].pid)
self.assertIsNone(sup.procs[1][1].poll(),
"supervise should still be running")
sup.request_shutdown(reason="cleanup")
self._drive(sup)
def test_request_restart_is_drained_by_tick(self):
specs = [
_DaemonSpec("pipelock", ("/bin/sleep", "30")),
_DaemonSpec("supervise", ("/bin/sleep", "30")),
]
sup = _Supervisor(specs)
sup.start_all()
time.sleep(0.1)
old_pipelock_pid = sup.procs[0][1].pid
supervise_pid = sup.procs[1][1].pid
ok = sup.request_restart("pipelock")
self.assertTrue(ok)
# The non-blocking request path only records intent.
self.assertEqual(old_pipelock_pid, sup.procs[0][1].pid)
done = sup.tick()
self.assertFalse(done)
self.assertNotEqual(old_pipelock_pid, sup.procs[0][1].pid)
self.assertEqual(supervise_pid, sup.procs[1][1].pid)
sup.request_shutdown(reason="cleanup")
self._drive(sup)
def test_repeated_restart_requests_coalesce(self):
specs = [_DaemonSpec("pipelock", ("/bin/sleep", "30"))]
sup = _Supervisor(specs)
sup.start_all()
time.sleep(0.1)
self.assertTrue(sup.request_restart("pipelock"))
self.assertTrue(sup.request_restart("pipelock"))
self.assertEqual({"pipelock"}, sup._restart_requested)
old_pid = sup.procs[0][1].pid
sup.tick()
first_restarted_pid = sup.procs[0][1].pid
self.assertNotEqual(old_pid, first_restarted_pid)
# A second tick should not restart again; the coalesced
# request was consumed by the first tick.
sup.tick()
self.assertEqual(first_restarted_pid, sup.procs[0][1].pid)
sup.request_shutdown(reason="cleanup")
self._drive(sup)
def test_request_restart_unknown_daemon_no_op(self):
specs = [_DaemonSpec("a", ("/bin/sleep", "30"))]
sup = _Supervisor(specs)
sup.start_all()
ok = sup.request_restart("ghost")
self.assertFalse(ok)
self.assertEqual(set(), sup._restart_requested)
sup.request_shutdown(reason="cleanup")
self._drive(sup)
def test_restart_unknown_daemon_no_op(self):
specs = [_DaemonSpec("a", ("/bin/sleep", "30"))]
sup = _Supervisor(specs)
sup.start_all()
ok = sup.restart_daemon("ghost")
self.assertFalse(ok)
sup.request_shutdown(reason="cleanup")
self._drive(sup)
def test_restart_during_shutdown_is_no_op(self):
specs = [_DaemonSpec("pipelock", ("/bin/sleep", "30"))]
sup = _Supervisor(specs)
sup.start_all()
sup.request_shutdown(reason="test")
ok = sup.restart_daemon("pipelock")
self.assertFalse(ok,
"must not respawn a daemon during teardown")
self._drive(sup)
def test_pending_restart_dropped_during_shutdown(self):
specs = [_DaemonSpec("pipelock", ("/bin/sleep", "30"))]
sup = _Supervisor(specs)
sup.start_all()
time.sleep(0.1)
old_pid = sup.procs[0][1].pid
self.assertTrue(sup.request_restart("pipelock"))
sup.request_shutdown(reason="test")
self.assertEqual(set(), sup._restart_requested)
self._drive(sup)
self.assertEqual(old_pid, sup.procs[0][1].pid)
def test_shutdown_after_start_terminates_children(self):
# Two long-running children. Caller requests shutdown;
# both should receive SIGTERM and exit. Signal-only
# shutdown clamps to a zero supervisor exit code.
specs = [
_DaemonSpec("a", ("/bin/sleep", "60")),
_DaemonSpec("b", ("/bin/sleep", "60")),
]
sup = _Supervisor(specs)
sup.start_all()
time.sleep(0.2) # let them actually start
sup.request_shutdown(reason="test")
rc = self._drive(sup)
self.assertEqual(0, rc)
# Both children got the signal — neither survived past
# the grace deadline.
for _, p in sup.procs:
self.assertIsNotNone(p.returncode)
def test_grace_period_escalates_to_sigkill(self):
# A child that ignores SIGTERM. The supervisor's
# _GRACE_SECONDS is 8s globally; we patch it to 0.3 so the
# test stays fast.
ignore_term = (
"/bin/sh", "-c",
"trap '' TERM; sleep 30",
)
specs = [_DaemonSpec("stubborn", ignore_term)]
sup = _Supervisor(specs)
sup.start_all()
time.sleep(0.3) # let `trap` register
sup.request_shutdown(reason="test")
with patch("bot_bottle.sidecar_init._GRACE_SECONDS", 0.3):
rc = self._drive(sup, max_wait_s=4.0)
# Process was SIGKILL'd → returncode -9 on POSIX.
self.assertEqual(-9, sup.procs[0][1].returncode)
self.assertEqual(0, rc)
def test_idempotent_shutdown_requests(self):
specs = [_DaemonSpec("a", ("/bin/sleep", "60"))]
sup = _Supervisor(specs)
sup.start_all()
time.sleep(0.1)
first_at = None
sup.request_shutdown(reason="first")
first_at = sup.shutdown_at
# Second call must NOT reset the deadline (otherwise the
# grace timer is gameable by a noisy signal).
sup.request_shutdown(reason="second")
self.assertEqual(first_at, sup.shutdown_at)
self._drive(sup)
class TestMainEndToEnd(unittest.TestCase):
"""Run sidecar_init.py as a real subprocess to cover the
signal-handler installation path. Skipped on platforms
without /bin/sleep + /bin/sh."""
@classmethod
def setUpClass(cls):
for p in ("/bin/sh", "/bin/sleep"):
if not Path(p).exists():
raise unittest.SkipTest(f"missing {p}")
def _run(self, daemons_csv: str, send_signal: int | None,
wait_before_signal: float = 0.4,
overall_timeout: float = 6.0) -> tuple[int, str]:
"""Spawn sidecar_init.main() in a child process with the
DAEMONS list patched to harmless `sleep 30` commands.
Returns (returncode, captured stdout)."""
helper = (
"import os, runpy, sys\n"
"from bot_bottle import sidecar_init as si\n"
"si._DAEMONS = (\n"
" si._DaemonSpec('alpha', ('/bin/sleep','30')),\n"
" si._DaemonSpec('beta', ('/bin/sleep','30')),\n"
")\n"
"sys.exit(si.main([]))\n"
)
env = {**os.environ, "BOT_BOTTLE_SIDECAR_DAEMONS": daemons_csv}
proc = subprocess.Popen(
[sys.executable, "-c", helper],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
env=env,
)
try:
if send_signal is not None:
time.sleep(wait_before_signal)
proc.send_signal(send_signal)
out_b, _ = proc.communicate(timeout=overall_timeout)
except subprocess.TimeoutExpired:
proc.kill()
out_b, _ = proc.communicate()
self.fail("sidecar_init main() did not exit before timeout")
return proc.returncode, out_b.decode("utf-8", errors="replace")
def test_sigterm_clean_shutdown(self):
rc, out = self._run("alpha,beta", signal.SIGTERM)
self.assertIn("starting alpha", out)
self.assertIn("starting beta", out)
self.assertIn("forwarding SIGTERM", out)
self.assertEqual(0, rc)
def test_empty_daemon_set_exits_zero_immediately(self):
# Use a sentinel value that filters out both alpha+beta.
rc, out = self._run("nothing", send_signal=None,
overall_timeout=2.0)
self.assertEqual(0, rc)
self.assertIn("no daemons selected", out)
if __name__ == "__main__":
unittest.main()