diff --git a/bot_bottle/supervise_server.py b/bot_bottle/supervise_server.py index 7600925..c10b731 100644 --- a/bot_bottle/supervise_server.py +++ b/bot_bottle/supervise_server.py @@ -35,6 +35,7 @@ import json import os import socketserver import sys +import time import typing import urllib.error import urllib.parse @@ -63,6 +64,10 @@ ERR_METHOD_NOT_FOUND = -32601 ERR_INVALID_PARAMS = -32602 ERR_INTERNAL = -32603 +DEFAULT_RESPONSE_TIMEOUT_SECONDS = 30.0 +MIN_RESPONSE_POLL_INTERVAL_SECONDS = 0.05 +EGRESS_LIST_TIMEOUT_SECONDS = 5.0 + @dataclass(frozen=True) class JsonRpcRequest: @@ -412,6 +417,7 @@ def _validate_and_bundle_egress_route( class ServerConfig: bottle_slug: str queue_dir: Path + response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS def handle_initialize(_params: dict[str, object]) -> dict[str, object]: @@ -442,7 +448,7 @@ def handle_list_egress_routes( }) opener = urllib.request.build_opener(proxy_handler) try: - with opener.open(_sv.EGRESS_INTROSPECT_URL, timeout=5) as resp: + with opener.open(_sv.EGRESS_INTROSPECT_URL, timeout=EGRESS_LIST_TIMEOUT_SECONDS) as resp: body = resp.read().decode("utf-8") except (urllib.error.URLError, OSError) as e: return { @@ -520,7 +526,20 @@ def handle_tools_call( f"for bottle {config.bottle_slug}; waiting for operator...\n" ) sys.stderr.flush() - response = _sv.wait_for_response(config.queue_dir, proposal.id) + deadline = time.monotonic() + config.response_timeout_seconds + try: + response = _sv.wait_for_response( + config.queue_dir, + proposal.id, + poll_interval=MIN_RESPONSE_POLL_INTERVAL_SECONDS, + deadline=deadline, + ) + except TimeoutError: + text = format_pending_response_text(config.response_timeout_seconds) + return { + "content": [{"type": "text", "text": text}], + "isError": False, + } _sv.archive_proposal(config.queue_dir, proposal.id) text = format_response_text(response) @@ -542,6 +561,16 @@ def format_response_text(response: "_sv.Response") -> str: return "\n".join(lines) +def 
format_pending_response_text(timeout_seconds: float) -> str: + return "\n".join([ + "status: pending", + ( + "notes: operator response timed out after " + f"{timeout_seconds:g}s; proposal remains queued" + ), + ]) + + # --- HTTP transport -------------------------------------------------------- @@ -654,10 +683,15 @@ def serve( queue_dir: Path, port: int = _sv.SUPERVISE_PORT, bind: str = "0.0.0.0", + response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS, ) -> typing.NoReturn: queue_dir.mkdir(parents=True, exist_ok=True) server = MCPServer((bind, port), MCPHandler) - server.config = ServerConfig(bottle_slug=bottle_slug, queue_dir=queue_dir) + server.config = ServerConfig( + bottle_slug=bottle_slug, + queue_dir=queue_dir, + response_timeout_seconds=response_timeout_seconds, + ) sys.stderr.write( f"supervise listening on {bind}:{port}; " f"slug={bottle_slug!r}; queue={queue_dir}; " @@ -682,9 +716,37 @@ def main(argv: list[str]) -> int: queue_dir = Path(os.environ.get("SUPERVISE_QUEUE_DIR", _sv.QUEUE_DIR_IN_CONTAINER)) port = int(os.environ.get("SUPERVISE_PORT", str(_sv.SUPERVISE_PORT))) bind = os.environ.get("SUPERVISE_BIND", "0.0.0.0") - serve(bottle_slug=bottle_slug, queue_dir=queue_dir, port=port, bind=bind) + try: + response_timeout_seconds = _response_timeout_from_env(os.environ) + except ValueError as e: + sys.stderr.write(f"supervise: {e}\n") + return 2 + serve( + bottle_slug=bottle_slug, + queue_dir=queue_dir, + port=port, + bind=bind, + response_timeout_seconds=response_timeout_seconds, + ) return 0 # serve() does not return +def _response_timeout_from_env(env: typing.Mapping[str, str]) -> float: + raw = env.get("SUPERVISE_RESPONSE_TIMEOUT_SECONDS", "").strip() + if not raw: + return DEFAULT_RESPONSE_TIMEOUT_SECONDS + try: + value = float(raw) + except ValueError as e: + raise ValueError( + "SUPERVISE_RESPONSE_TIMEOUT_SECONDS must be a positive number" + ) from e + if value <= 0: + raise ValueError( + "SUPERVISE_RESPONSE_TIMEOUT_SECONDS must be a 
positive number" + ) + return value + + if __name__ == "__main__": raise SystemExit(main(sys.argv)) diff --git a/docs/prds/0035-supervise-wait-bounds.md b/docs/prds/0035-supervise-wait-bounds.md new file mode 100644 index 0000000..be19dc9 --- /dev/null +++ b/docs/prds/0035-supervise-wait-bounds.md @@ -0,0 +1,99 @@ +# PRD 0035: Supervise Wait Bounds + +- **Status:** Active +- **Author:** didericis-codex +- **Created:** 2026-06-02 +- **Issue:** #128 + +## Summary + +Bound the supervise sidecar's request-thread waits so an agent tool call cannot +hold an HTTP worker forever while waiting for operator action. Preserve the MCP +tool surface, but make timeout behavior explicit, observable, and tested. + +## Problem + +`bot_bottle/supervise_server.py` handles MCP over a threaded stdlib HTTP +server. Tool calls validate a proposal, write it to the supervise queue, and +then wait for the operator response file. Today that wait can last forever. +Each outstanding tool call consumes one server thread until the operator acts. + +The route-listing helper also performs a live HTTP request to egress inside the +request thread. It has a short timeout today, but the behavior is not described +as part of a broader request-thread budget. + +This is operationally risky in multi-agent or repeated-call scenarios: a stuck +or ignored proposal can accumulate blocked threads, and callers do not get a +clear "still pending" answer they can reason about. + +## Goals / Success Criteria + +- Add a bounded wait for operator responses to supervise tool calls. +- Return a clear JSON-RPC tool result when the proposal remains pending after + the timeout; do not treat pending operator action as an internal server error. +- Keep the queued proposal on disk after timeout so the operator can still act. +- Make the wait duration configurable by environment with a conservative + default. +- Preserve current success and rejection result shapes for completed operator + responses. 
+- Keep `list-egress-routes` bounded and document its timeout behavior. +- Add focused tests for approved responses, timed-out pending responses, and + route-list timeout/error handling. + +## Non-goals + +- No asynchronous framework or new runtime dependency. +- No replacement of the stdlib threaded HTTP server. +- No change to the host-side supervise queue format. +- No cancellation protocol between the agent and operator UI. +- No dashboard or TUI changes. + +## Scope + +In scope: + +- `bot_bottle/supervise_server.py` request handling. +- Any small helper in `bot_bottle/supervise.py` needed to support a bounded + wait cleanly. +- Unit tests around tool-call response waiting and route-list behavior. + +Out of scope: + +- Reworking proposal persistence. +- Changing egress apply or pipelock apply flows. +- Adding background workers to complete HTTP requests after the client returns. + +## Design + +Introduce a supervise response wait budget, +`SUPERVISE_RESPONSE_TIMEOUT_SECONDS`, with a 30-second default. The existing +poll loop should stop after that budget and return a normal tool result such as +`{"status": "pending", "notes": "operator response timed out; proposal remains queued"}`. +The exact field names should fit the existing response schema so agents can +handle success, rejection, and pending with one result parser. + +The proposal file must remain in the queue when the HTTP call times out. The +operator can still approve or reject it later, but that later response will not +resume the original HTTP request. + +Route listing should continue to use a short (5-second) HTTP timeout when calling egress. Errors +should be returned as tool results or JSON-RPC errors consistently with the +existing server behavior; the implementation should avoid an unbounded socket +wait in the request thread. + +## Testing Strategy + +- Unit-test a tool call whose response appears before the timeout.
+- Unit-test a tool call whose response never appears and assert the request + returns a pending result while the proposal remains queued. +- Unit-test invalid timeout env values fall back or fail clearly. +- Unit-test `list-egress-routes` timeout/error behavior with a fake URL opener. + +Run: + +- `python3 -m unittest tests.unit.test_supervise_server` +- `python3 -m unittest discover -s tests/unit` + +## Open Questions + +None. diff --git a/tests/unit/test_supervise_server.py b/tests/unit/test_supervise_server.py index af6cd22..474d65e 100644 --- a/tests/unit/test_supervise_server.py +++ b/tests/unit/test_supervise_server.py @@ -8,6 +8,7 @@ import threading import time import unittest from pathlib import Path +from unittest.mock import patch # The server module loads `supervise` via same-directory import inside @@ -29,8 +30,10 @@ from bot_bottle.supervise_server import ( ServerConfig, TOOL_DEFINITIONS, _RpcError, + _response_timeout_from_env, format_response_text, handle_initialize, + handle_list_egress_routes, handle_tools_call, handle_tools_list, jsonrpc_error, @@ -300,6 +303,70 @@ class TestHandleToolsCall(unittest.TestCase): processed = list((self.queue_dir / "processed").glob("*.json")) self.assertEqual(2, len(processed)) + def test_pending_response_times_out_without_archive(self): + config = ServerConfig( + bottle_slug="dev", + queue_dir=self.queue_dir, + response_timeout_seconds=0.05, + ) + result = handle_tools_call( + { + "name": _sv.TOOL_EGRESS_BLOCK, + "arguments": { + "host": "example.com", + "justification": "need a route", + }, + }, + config, + ) + + self.assertFalse(result["isError"]) # type: ignore[index] + text = result["content"][0]["text"] # type: ignore[index] + self.assertIn("status: pending", text) + self.assertIn("proposal remains queued", text) + self.assertEqual(1, len(_sv.list_pending_proposals(self.queue_dir))) + self.assertFalse((self.queue_dir / "processed").exists()) + + +class TestHandleListEgressRoutes(unittest.TestCase): + def 
test_url_error_returns_tool_error(self): + class _Opener: + def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 + raise OSError("egress unavailable") + + with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()): + result = handle_list_egress_routes( + {}, + ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")), + ) + + self.assertTrue(result["isError"]) # type: ignore[index] + text = result["content"][0]["text"] # type: ignore[index] + self.assertIn("could not reach", text) + self.assertIn("egress unavailable", text) + + +class TestResponseTimeoutEnv(unittest.TestCase): + def test_unset_uses_default(self): + self.assertEqual( + supervise_server.DEFAULT_RESPONSE_TIMEOUT_SECONDS, + _response_timeout_from_env({}), + ) + + def test_positive_float_accepted(self): + self.assertEqual( + 12.5, + _response_timeout_from_env({"SUPERVISE_RESPONSE_TIMEOUT_SECONDS": "12.5"}), + ) + + def test_invalid_value_rejected(self): + with self.assertRaises(ValueError): + _response_timeout_from_env({"SUPERVISE_RESPONSE_TIMEOUT_SECONDS": "soon"}) + + def test_nonpositive_value_rejected(self): + with self.assertRaises(ValueError): + _response_timeout_from_env({"SUPERVISE_RESPONSE_TIMEOUT_SECONDS": "0"}) + # --- Response text formatting ---------------------------------------------