Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 47b75030ee | |||
| 3472e06efb | |||
| 82ce5d3034 | |||
| 7c260eeff9 |
@@ -35,6 +35,7 @@ import json
|
||||
import os
|
||||
import socketserver
|
||||
import sys
|
||||
import time
|
||||
import typing
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
@@ -63,6 +64,10 @@ ERR_METHOD_NOT_FOUND = -32601
|
||||
ERR_INVALID_PARAMS = -32602
|
||||
ERR_INTERNAL = -32603
|
||||
|
||||
DEFAULT_RESPONSE_TIMEOUT_SECONDS = 30.0
|
||||
MIN_RESPONSE_POLL_INTERVAL_SECONDS = 0.05
|
||||
EGRESS_LIST_TIMEOUT_SECONDS = 5.0
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class JsonRpcRequest:
|
||||
@@ -412,6 +417,7 @@ def _validate_and_bundle_egress_route(
|
||||
class ServerConfig:
|
||||
bottle_slug: str
|
||||
queue_dir: Path
|
||||
response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS
|
||||
|
||||
|
||||
def handle_initialize(_params: dict[str, object]) -> dict[str, object]:
|
||||
@@ -442,7 +448,7 @@ def handle_list_egress_routes(
|
||||
})
|
||||
opener = urllib.request.build_opener(proxy_handler)
|
||||
try:
|
||||
with opener.open(_sv.EGRESS_INTROSPECT_URL, timeout=5) as resp:
|
||||
with opener.open(_sv.EGRESS_INTROSPECT_URL, timeout=EGRESS_LIST_TIMEOUT_SECONDS) as resp:
|
||||
body = resp.read().decode("utf-8")
|
||||
except (urllib.error.URLError, OSError) as e:
|
||||
return {
|
||||
@@ -520,7 +526,20 @@ def handle_tools_call(
|
||||
f"for bottle {config.bottle_slug}; waiting for operator...\n"
|
||||
)
|
||||
sys.stderr.flush()
|
||||
response = _sv.wait_for_response(config.queue_dir, proposal.id)
|
||||
deadline = time.monotonic() + config.response_timeout_seconds
|
||||
try:
|
||||
response = _sv.wait_for_response(
|
||||
config.queue_dir,
|
||||
proposal.id,
|
||||
poll_interval=MIN_RESPONSE_POLL_INTERVAL_SECONDS,
|
||||
deadline=deadline,
|
||||
)
|
||||
except TimeoutError:
|
||||
text = format_pending_response_text(config.response_timeout_seconds)
|
||||
return {
|
||||
"content": [{"type": "text", "text": text}],
|
||||
"isError": False,
|
||||
}
|
||||
_sv.archive_proposal(config.queue_dir, proposal.id)
|
||||
|
||||
text = format_response_text(response)
|
||||
@@ -542,6 +561,16 @@ def format_response_text(response: "_sv.Response") -> str:
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def format_pending_response_text(timeout_seconds: float) -> str:
|
||||
return "\n".join([
|
||||
"status: pending",
|
||||
(
|
||||
"notes: operator response timed out after "
|
||||
f"{timeout_seconds:g}s; proposal remains queued"
|
||||
),
|
||||
])
|
||||
|
||||
|
||||
# --- HTTP transport --------------------------------------------------------
|
||||
|
||||
|
||||
@@ -654,10 +683,15 @@ def serve(
|
||||
queue_dir: Path,
|
||||
port: int = _sv.SUPERVISE_PORT,
|
||||
bind: str = "0.0.0.0",
|
||||
response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS,
|
||||
) -> typing.NoReturn:
|
||||
queue_dir.mkdir(parents=True, exist_ok=True)
|
||||
server = MCPServer((bind, port), MCPHandler)
|
||||
server.config = ServerConfig(bottle_slug=bottle_slug, queue_dir=queue_dir)
|
||||
server.config = ServerConfig(
|
||||
bottle_slug=bottle_slug,
|
||||
queue_dir=queue_dir,
|
||||
response_timeout_seconds=response_timeout_seconds,
|
||||
)
|
||||
sys.stderr.write(
|
||||
f"supervise listening on {bind}:{port}; "
|
||||
f"slug={bottle_slug!r}; queue={queue_dir}; "
|
||||
@@ -682,9 +716,37 @@ def main(argv: list[str]) -> int:
|
||||
queue_dir = Path(os.environ.get("SUPERVISE_QUEUE_DIR", _sv.QUEUE_DIR_IN_CONTAINER))
|
||||
port = int(os.environ.get("SUPERVISE_PORT", str(_sv.SUPERVISE_PORT)))
|
||||
bind = os.environ.get("SUPERVISE_BIND", "0.0.0.0")
|
||||
serve(bottle_slug=bottle_slug, queue_dir=queue_dir, port=port, bind=bind)
|
||||
try:
|
||||
response_timeout_seconds = _response_timeout_from_env(os.environ)
|
||||
except ValueError as e:
|
||||
sys.stderr.write(f"supervise: {e}\n")
|
||||
return 2
|
||||
serve(
|
||||
bottle_slug=bottle_slug,
|
||||
queue_dir=queue_dir,
|
||||
port=port,
|
||||
bind=bind,
|
||||
response_timeout_seconds=response_timeout_seconds,
|
||||
)
|
||||
return 0 # serve() does not return
|
||||
|
||||
|
||||
def _response_timeout_from_env(env: typing.Mapping[str, str]) -> float:
|
||||
raw = env.get("SUPERVISE_RESPONSE_TIMEOUT_SECONDS", "").strip()
|
||||
if not raw:
|
||||
return DEFAULT_RESPONSE_TIMEOUT_SECONDS
|
||||
try:
|
||||
value = float(raw)
|
||||
except ValueError as e:
|
||||
raise ValueError(
|
||||
"SUPERVISE_RESPONSE_TIMEOUT_SECONDS must be a positive number"
|
||||
) from e
|
||||
if value <= 0:
|
||||
raise ValueError(
|
||||
"SUPERVISE_RESPONSE_TIMEOUT_SECONDS must be a positive number"
|
||||
)
|
||||
return value
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main(sys.argv))
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
# PRD 0035: Supervise Wait Bounds
|
||||
|
||||
- **Status:** Active
|
||||
- **Author:** didericis-codex
|
||||
- **Created:** 2026-06-02
|
||||
- **Issue:** #128
|
||||
|
||||
## Summary
|
||||
|
||||
Bound the supervise sidecar's request-thread waits so an agent tool call cannot
|
||||
hold an HTTP worker forever while waiting for operator action. Preserve the MCP
|
||||
tool surface, but make timeout behavior explicit, observable, and tested.
|
||||
|
||||
## Problem
|
||||
|
||||
`bot_bottle/supervise_server.py` handles MCP over a threaded stdlib HTTP
|
||||
server. Tool calls validate a proposal, write it to the supervise queue, and
|
||||
then wait for the operator response file. Today that wait can last forever.
|
||||
Each outstanding tool call consumes one server thread until the operator acts.
|
||||
|
||||
The route-listing helper also performs a live HTTP request to egress inside the
|
||||
request thread. It has a short timeout today, but the behavior is not described
|
||||
as part of a broader request-thread budget.
|
||||
|
||||
This is operationally risky in multi-agent or repeated-call scenarios: a stuck
|
||||
or ignored proposal can accumulate blocked threads, and callers do not get a
|
||||
clear "still pending" answer they can reason about.
|
||||
|
||||
## Goals / Success Criteria
|
||||
|
||||
- Add a bounded wait for operator responses to supervise tool calls.
|
||||
- Return a clear JSON-RPC tool result when the proposal remains pending after
|
||||
the timeout; do not treat pending operator action as an internal server error.
|
||||
- Keep the queued proposal on disk after timeout so the operator can still act.
|
||||
- Make the wait duration configurable by environment with a conservative
|
||||
default.
|
||||
- Preserve current success and rejection result shapes for completed operator
|
||||
responses.
|
||||
- Keep `list-egress-routes` bounded and document its timeout behavior.
|
||||
- Add focused tests for approved responses, timed-out pending responses, and
|
||||
route-list timeout/error handling.
|
||||
|
||||
## Non-goals
|
||||
|
||||
- No asynchronous framework or new runtime dependency.
|
||||
- No replacement of the stdlib threaded HTTP server.
|
||||
- No change to the host-side supervise queue format.
|
||||
- No cancellation protocol between the agent and operator UI.
|
||||
- No dashboard or TUI changes.
|
||||
|
||||
## Scope
|
||||
|
||||
In scope:
|
||||
|
||||
- `bot_bottle/supervise_server.py` request handling.
|
||||
- Any small helper in `bot_bottle/supervise.py` needed to support a bounded
|
||||
wait cleanly.
|
||||
- Unit tests around tool-call response waiting and route-list behavior.
|
||||
|
||||
Out of scope:
|
||||
|
||||
- Reworking proposal persistence.
|
||||
- Changing egress apply or pipelock apply flows.
|
||||
- Adding background workers to complete HTTP requests after the client returns.
|
||||
|
||||
## Design
|
||||
|
||||
Introduce a supervise response wait budget,
|
||||
`SUPERVISE_RESPONSE_TIMEOUT_SECONDS`, with a 30 second default. The existing
|
||||
poll loop should stop after that budget and return a normal tool result such as
|
||||
`{"status": "pending", "notes": "operator response timed out; proposal remains queued"}`.
|
||||
The exact field names should fit the existing response schema so agents can
|
||||
handle success, rejection, and pending with one result parser.
|
||||
|
||||
The proposal file must remain in the queue when the HTTP call times out. The
|
||||
operator can still approve or reject it later, but that later response will not
|
||||
resume the original HTTP request.
|
||||
|
||||
Route-listing should continue to use a short HTTP timeout to egress. Errors
|
||||
should be returned as tool results or JSON-RPC errors consistently with the
|
||||
existing server behavior; the implementation should avoid an unbounded socket
|
||||
wait in the request thread.
|
||||
|
||||
## Testing Strategy
|
||||
|
||||
- Unit-test a tool call whose response appears before the timeout.
|
||||
- Unit-test a tool call whose response never appears and assert the request
|
||||
returns a pending result while the proposal remains queued.
|
||||
- Unit-test invalid timeout env values fall back or fail clearly.
|
||||
- Unit-test `list-egress-routes` timeout/error behavior with a fake URL opener.
|
||||
|
||||
Run:
|
||||
|
||||
- `python3 -m unittest tests.unit.test_supervise_server`
|
||||
- `python3 -m unittest discover -s tests/unit`
|
||||
|
||||
## Open Questions
|
||||
|
||||
None.
|
||||
@@ -8,6 +8,7 @@ import threading
|
||||
import time
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
# The server module loads `supervise` via same-directory import inside
|
||||
@@ -29,8 +30,10 @@ from bot_bottle.supervise_server import (
|
||||
ServerConfig,
|
||||
TOOL_DEFINITIONS,
|
||||
_RpcError,
|
||||
_response_timeout_from_env,
|
||||
format_response_text,
|
||||
handle_initialize,
|
||||
handle_list_egress_routes,
|
||||
handle_tools_call,
|
||||
handle_tools_list,
|
||||
jsonrpc_error,
|
||||
@@ -300,6 +303,70 @@ class TestHandleToolsCall(unittest.TestCase):
|
||||
processed = list((self.queue_dir / "processed").glob("*.json"))
|
||||
self.assertEqual(2, len(processed))
|
||||
|
||||
def test_pending_response_times_out_without_archive(self):
|
||||
config = ServerConfig(
|
||||
bottle_slug="dev",
|
||||
queue_dir=self.queue_dir,
|
||||
response_timeout_seconds=0.05,
|
||||
)
|
||||
result = handle_tools_call(
|
||||
{
|
||||
"name": _sv.TOOL_EGRESS_BLOCK,
|
||||
"arguments": {
|
||||
"host": "example.com",
|
||||
"justification": "need a route",
|
||||
},
|
||||
},
|
||||
config,
|
||||
)
|
||||
|
||||
self.assertFalse(result["isError"]) # type: ignore[index]
|
||||
text = result["content"][0]["text"] # type: ignore[index]
|
||||
self.assertIn("status: pending", text)
|
||||
self.assertIn("proposal remains queued", text)
|
||||
self.assertEqual(1, len(_sv.list_pending_proposals(self.queue_dir)))
|
||||
self.assertFalse((self.queue_dir / "processed").exists())
|
||||
|
||||
|
||||
class TestHandleListEgressRoutes(unittest.TestCase):
|
||||
def test_url_error_returns_tool_error(self):
|
||||
class _Opener:
|
||||
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003
|
||||
raise OSError("egress unavailable")
|
||||
|
||||
with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()):
|
||||
result = handle_list_egress_routes(
|
||||
{},
|
||||
ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")),
|
||||
)
|
||||
|
||||
self.assertTrue(result["isError"]) # type: ignore[index]
|
||||
text = result["content"][0]["text"] # type: ignore[index]
|
||||
self.assertIn("could not reach", text)
|
||||
self.assertIn("egress unavailable", text)
|
||||
|
||||
|
||||
class TestResponseTimeoutEnv(unittest.TestCase):
|
||||
def test_unset_uses_default(self):
|
||||
self.assertEqual(
|
||||
supervise_server.DEFAULT_RESPONSE_TIMEOUT_SECONDS,
|
||||
_response_timeout_from_env({}),
|
||||
)
|
||||
|
||||
def test_positive_float_accepted(self):
|
||||
self.assertEqual(
|
||||
12.5,
|
||||
_response_timeout_from_env({"SUPERVISE_RESPONSE_TIMEOUT_SECONDS": "12.5"}),
|
||||
)
|
||||
|
||||
def test_invalid_value_rejected(self):
|
||||
with self.assertRaises(ValueError):
|
||||
_response_timeout_from_env({"SUPERVISE_RESPONSE_TIMEOUT_SECONDS": "soon"})
|
||||
|
||||
def test_nonpositive_value_rejected(self):
|
||||
with self.assertRaises(ValueError):
|
||||
_response_timeout_from_env({"SUPERVISE_RESPONSE_TIMEOUT_SECONDS": "0"})
|
||||
|
||||
|
||||
# --- Response text formatting ---------------------------------------------
|
||||
|
||||
|
||||
Reference in New Issue
Block a user