fix(supervise): bound response waits
This commit is contained in:
@@ -35,6 +35,7 @@ import json
|
|||||||
import os
|
import os
|
||||||
import socketserver
|
import socketserver
|
||||||
import sys
|
import sys
|
||||||
|
import time
|
||||||
import typing
|
import typing
|
||||||
import urllib.error
|
import urllib.error
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
@@ -63,6 +64,10 @@ ERR_METHOD_NOT_FOUND = -32601
|
|||||||
ERR_INVALID_PARAMS = -32602
|
ERR_INVALID_PARAMS = -32602
|
||||||
ERR_INTERNAL = -32603
|
ERR_INTERNAL = -32603
|
||||||
|
|
||||||
|
DEFAULT_RESPONSE_TIMEOUT_SECONDS = 30.0
|
||||||
|
MIN_RESPONSE_POLL_INTERVAL_SECONDS = 0.05
|
||||||
|
EGRESS_LIST_TIMEOUT_SECONDS = 5.0
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class JsonRpcRequest:
|
class JsonRpcRequest:
|
||||||
@@ -412,6 +417,7 @@ def _validate_and_bundle_egress_route(
|
|||||||
class ServerConfig:
|
class ServerConfig:
|
||||||
bottle_slug: str
|
bottle_slug: str
|
||||||
queue_dir: Path
|
queue_dir: Path
|
||||||
|
response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS
|
||||||
|
|
||||||
|
|
||||||
def handle_initialize(_params: dict[str, object]) -> dict[str, object]:
|
def handle_initialize(_params: dict[str, object]) -> dict[str, object]:
|
||||||
@@ -442,7 +448,7 @@ def handle_list_egress_routes(
|
|||||||
})
|
})
|
||||||
opener = urllib.request.build_opener(proxy_handler)
|
opener = urllib.request.build_opener(proxy_handler)
|
||||||
try:
|
try:
|
||||||
with opener.open(_sv.EGRESS_INTROSPECT_URL, timeout=5) as resp:
|
with opener.open(_sv.EGRESS_INTROSPECT_URL, timeout=EGRESS_LIST_TIMEOUT_SECONDS) as resp:
|
||||||
body = resp.read().decode("utf-8")
|
body = resp.read().decode("utf-8")
|
||||||
except (urllib.error.URLError, OSError) as e:
|
except (urllib.error.URLError, OSError) as e:
|
||||||
return {
|
return {
|
||||||
@@ -520,7 +526,20 @@ def handle_tools_call(
|
|||||||
f"for bottle {config.bottle_slug}; waiting for operator...\n"
|
f"for bottle {config.bottle_slug}; waiting for operator...\n"
|
||||||
)
|
)
|
||||||
sys.stderr.flush()
|
sys.stderr.flush()
|
||||||
response = _sv.wait_for_response(config.queue_dir, proposal.id)
|
deadline = time.monotonic() + config.response_timeout_seconds
|
||||||
|
try:
|
||||||
|
response = _sv.wait_for_response(
|
||||||
|
config.queue_dir,
|
||||||
|
proposal.id,
|
||||||
|
poll_interval=MIN_RESPONSE_POLL_INTERVAL_SECONDS,
|
||||||
|
deadline=deadline,
|
||||||
|
)
|
||||||
|
except TimeoutError:
|
||||||
|
text = format_pending_response_text(config.response_timeout_seconds)
|
||||||
|
return {
|
||||||
|
"content": [{"type": "text", "text": text}],
|
||||||
|
"isError": False,
|
||||||
|
}
|
||||||
_sv.archive_proposal(config.queue_dir, proposal.id)
|
_sv.archive_proposal(config.queue_dir, proposal.id)
|
||||||
|
|
||||||
text = format_response_text(response)
|
text = format_response_text(response)
|
||||||
@@ -542,6 +561,16 @@ def format_response_text(response: "_sv.Response") -> str:
|
|||||||
return "\n".join(lines)
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
def format_pending_response_text(timeout_seconds: float) -> str:
    """Build the tool-result text for a wait that ended without a response.

    The text has two newline-separated lines: a ``status: pending`` line and
    a ``notes:`` line stating the elapsed budget and that the proposal is
    still queued.

    Args:
        timeout_seconds: The wait budget to report. It is rendered with the
            ``g`` format, so ``30.0`` appears as ``30`` and ``0.05`` as
            ``0.05``.

    Returns:
        The two-line status text.
    """
    status_line = "status: pending"
    notes_line = (
        "notes: operator response timed out after "
        f"{timeout_seconds:g}s; proposal remains queued"
    )
    return f"{status_line}\n{notes_line}"
|
||||||
|
|
||||||
|
|
||||||
# --- HTTP transport --------------------------------------------------------
|
# --- HTTP transport --------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
@@ -654,10 +683,15 @@ def serve(
|
|||||||
queue_dir: Path,
|
queue_dir: Path,
|
||||||
port: int = _sv.SUPERVISE_PORT,
|
port: int = _sv.SUPERVISE_PORT,
|
||||||
bind: str = "0.0.0.0",
|
bind: str = "0.0.0.0",
|
||||||
|
response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS,
|
||||||
) -> typing.NoReturn:
|
) -> typing.NoReturn:
|
||||||
queue_dir.mkdir(parents=True, exist_ok=True)
|
queue_dir.mkdir(parents=True, exist_ok=True)
|
||||||
server = MCPServer((bind, port), MCPHandler)
|
server = MCPServer((bind, port), MCPHandler)
|
||||||
server.config = ServerConfig(bottle_slug=bottle_slug, queue_dir=queue_dir)
|
server.config = ServerConfig(
|
||||||
|
bottle_slug=bottle_slug,
|
||||||
|
queue_dir=queue_dir,
|
||||||
|
response_timeout_seconds=response_timeout_seconds,
|
||||||
|
)
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
f"supervise listening on {bind}:{port}; "
|
f"supervise listening on {bind}:{port}; "
|
||||||
f"slug={bottle_slug!r}; queue={queue_dir}; "
|
f"slug={bottle_slug!r}; queue={queue_dir}; "
|
||||||
@@ -682,9 +716,37 @@ def main(argv: list[str]) -> int:
|
|||||||
queue_dir = Path(os.environ.get("SUPERVISE_QUEUE_DIR", _sv.QUEUE_DIR_IN_CONTAINER))
|
queue_dir = Path(os.environ.get("SUPERVISE_QUEUE_DIR", _sv.QUEUE_DIR_IN_CONTAINER))
|
||||||
port = int(os.environ.get("SUPERVISE_PORT", str(_sv.SUPERVISE_PORT)))
|
port = int(os.environ.get("SUPERVISE_PORT", str(_sv.SUPERVISE_PORT)))
|
||||||
bind = os.environ.get("SUPERVISE_BIND", "0.0.0.0")
|
bind = os.environ.get("SUPERVISE_BIND", "0.0.0.0")
|
||||||
serve(bottle_slug=bottle_slug, queue_dir=queue_dir, port=port, bind=bind)
|
try:
|
||||||
|
response_timeout_seconds = _response_timeout_from_env(os.environ)
|
||||||
|
except ValueError as e:
|
||||||
|
sys.stderr.write(f"supervise: {e}\n")
|
||||||
|
return 2
|
||||||
|
serve(
|
||||||
|
bottle_slug=bottle_slug,
|
||||||
|
queue_dir=queue_dir,
|
||||||
|
port=port,
|
||||||
|
bind=bind,
|
||||||
|
response_timeout_seconds=response_timeout_seconds,
|
||||||
|
)
|
||||||
return 0 # serve() does not return
|
return 0 # serve() does not return
|
||||||
|
|
||||||
|
|
||||||
|
def _response_timeout_from_env(env: typing.Mapping[str, str]) -> float:
|
||||||
|
raw = env.get("SUPERVISE_RESPONSE_TIMEOUT_SECONDS", "").strip()
|
||||||
|
if not raw:
|
||||||
|
return DEFAULT_RESPONSE_TIMEOUT_SECONDS
|
||||||
|
try:
|
||||||
|
value = float(raw)
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError(
|
||||||
|
"SUPERVISE_RESPONSE_TIMEOUT_SECONDS must be a positive number"
|
||||||
|
) from e
|
||||||
|
if value <= 0:
|
||||||
|
raise ValueError(
|
||||||
|
"SUPERVISE_RESPONSE_TIMEOUT_SECONDS must be a positive number"
|
||||||
|
)
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
raise SystemExit(main(sys.argv))
|
raise SystemExit(main(sys.argv))
|
||||||
|
|||||||
@@ -65,8 +65,8 @@ Out of scope:
|
|||||||
|
|
||||||
## Design
|
## Design
|
||||||
|
|
||||||
Introduce a supervise response wait budget, for example
|
Introduce a supervise response wait budget,
|
||||||
`SUPERVISE_RESPONSE_TIMEOUT_SECONDS`, with a documented default. The existing
|
`SUPERVISE_RESPONSE_TIMEOUT_SECONDS`, with a 30-second default. The existing
|
||||||
poll loop should stop after that budget and return a normal tool result such as
|
poll loop should stop after that budget and return a normal tool result such as
|
||||||
`{"status": "pending", "notes": "operator response timed out; proposal remains queued"}`.
|
`{"status": "pending", "notes": "operator response timed out; proposal remains queued"}`.
|
||||||
The exact field names should fit the existing response schema so agents can
|
The exact field names should fit the existing response schema so agents can
|
||||||
@@ -96,6 +96,4 @@ Run:
|
|||||||
|
|
||||||
## Open Questions
|
## Open Questions
|
||||||
|
|
||||||
- What default wait budget is best for agent ergonomics? A short timeout keeps
|
None.
|
||||||
worker threads free; a longer timeout gives an operator more time to respond
|
|
||||||
inline before the agent has to retry.
|
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import threading
|
|||||||
import time
|
import time
|
||||||
import unittest
|
import unittest
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
|
||||||
# The server module loads `supervise` via same-directory import inside
|
# The server module loads `supervise` via same-directory import inside
|
||||||
@@ -29,8 +30,10 @@ from bot_bottle.supervise_server import (
|
|||||||
ServerConfig,
|
ServerConfig,
|
||||||
TOOL_DEFINITIONS,
|
TOOL_DEFINITIONS,
|
||||||
_RpcError,
|
_RpcError,
|
||||||
|
_response_timeout_from_env,
|
||||||
format_response_text,
|
format_response_text,
|
||||||
handle_initialize,
|
handle_initialize,
|
||||||
|
handle_list_egress_routes,
|
||||||
handle_tools_call,
|
handle_tools_call,
|
||||||
handle_tools_list,
|
handle_tools_list,
|
||||||
jsonrpc_error,
|
jsonrpc_error,
|
||||||
@@ -300,6 +303,70 @@ class TestHandleToolsCall(unittest.TestCase):
|
|||||||
processed = list((self.queue_dir / "processed").glob("*.json"))
|
processed = list((self.queue_dir / "processed").glob("*.json"))
|
||||||
self.assertEqual(2, len(processed))
|
self.assertEqual(2, len(processed))
|
||||||
|
|
||||||
|
def test_pending_response_times_out_without_archive(self):
    """A call that gets no response in time reports pending and archives nothing."""
    # Very short budget so the wait expires almost immediately.
    config = ServerConfig(
        bottle_slug="dev",
        queue_dir=self.queue_dir,
        response_timeout_seconds=0.05,
    )
    arguments = {
        "host": "example.com",
        "justification": "need a route",
    }
    call_params = {"name": _sv.TOOL_EGRESS_BLOCK, "arguments": arguments}

    result = handle_tools_call(call_params, config)

    # The timeout is reported as a normal (non-error) tool result.
    self.assertFalse(result["isError"])  # type: ignore[index]
    text = result["content"][0]["text"]  # type: ignore[index]
    for fragment in ("status: pending", "proposal remains queued"):
        self.assertIn(fragment, text)

    # The proposal is still pending and no processed directory was created.
    pending = _sv.list_pending_proposals(self.queue_dir)
    self.assertEqual(1, len(pending))
    processed_dir = self.queue_dir / "processed"
    self.assertFalse(processed_dir.exists())
|
||||||
|
|
||||||
|
|
||||||
|
class TestHandleListEgressRoutes(unittest.TestCase):
    """Covers handle_list_egress_routes when the opener cannot be used."""

    def test_url_error_returns_tool_error(self):
        """An OSError raised while opening the URL is reported as a tool error."""

        class _FailingOpener:
            """Stand-in opener whose open() always raises."""

            def open(self, *args, **kwargs):  # noqa: ANN001, ANN002, ANN003
                raise OSError("egress unavailable")

        config = ServerConfig(bottle_slug="dev", queue_dir=Path("/unused"))
        build_opener_patch = patch.object(
            supervise_server.urllib.request,
            "build_opener",
            return_value=_FailingOpener(),
        )
        with build_opener_patch:
            result = handle_list_egress_routes({}, config)

        self.assertTrue(result["isError"])  # type: ignore[index]
        message = result["content"][0]["text"]  # type: ignore[index]
        for fragment in ("could not reach", "egress unavailable"):
            self.assertIn(fragment, message)
|
||||||
|
|
||||||
|
|
||||||
|
class TestResponseTimeoutEnv(unittest.TestCase):
    """Covers parsing of SUPERVISE_RESPONSE_TIMEOUT_SECONDS from a mapping."""

    # Name of the environment variable under test.
    _VAR = "SUPERVISE_RESPONSE_TIMEOUT_SECONDS"

    def test_unset_uses_default(self):
        """An empty mapping falls back to the module default."""
        parsed = _response_timeout_from_env({})
        self.assertEqual(
            supervise_server.DEFAULT_RESPONSE_TIMEOUT_SECONDS,
            parsed,
        )

    def test_positive_float_accepted(self):
        """A positive decimal string is returned as a float."""
        parsed = _response_timeout_from_env({self._VAR: "12.5"})
        self.assertEqual(12.5, parsed)

    def test_invalid_value_rejected(self):
        """A non-numeric string raises ValueError."""
        env = {self._VAR: "soon"}
        with self.assertRaises(ValueError):
            _response_timeout_from_env(env)

    def test_nonpositive_value_rejected(self):
        """Zero raises ValueError."""
        env = {self._VAR: "0"}
        with self.assertRaises(ValueError):
            _response_timeout_from_env(env)
|
||||||
|
|
||||||
|
|
||||||
# --- Response text formatting ---------------------------------------------
|
# --- Response text formatting ---------------------------------------------
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user