Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1c5d8adb55 |
@@ -35,7 +35,6 @@ import json
|
|||||||
import os
|
import os
|
||||||
import socketserver
|
import socketserver
|
||||||
import sys
|
import sys
|
||||||
import time
|
|
||||||
import typing
|
import typing
|
||||||
import urllib.error
|
import urllib.error
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
@@ -64,10 +63,6 @@ ERR_METHOD_NOT_FOUND = -32601
|
|||||||
ERR_INVALID_PARAMS = -32602
|
ERR_INVALID_PARAMS = -32602
|
||||||
ERR_INTERNAL = -32603
|
ERR_INTERNAL = -32603
|
||||||
|
|
||||||
DEFAULT_RESPONSE_TIMEOUT_SECONDS = 30.0
|
|
||||||
MIN_RESPONSE_POLL_INTERVAL_SECONDS = 0.05
|
|
||||||
EGRESS_LIST_TIMEOUT_SECONDS = 5.0
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class JsonRpcRequest:
|
class JsonRpcRequest:
|
||||||
@@ -417,7 +412,6 @@ def _validate_and_bundle_egress_route(
|
|||||||
class ServerConfig:
|
class ServerConfig:
|
||||||
bottle_slug: str
|
bottle_slug: str
|
||||||
queue_dir: Path
|
queue_dir: Path
|
||||||
response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS
|
|
||||||
|
|
||||||
|
|
||||||
def handle_initialize(_params: dict[str, object]) -> dict[str, object]:
|
def handle_initialize(_params: dict[str, object]) -> dict[str, object]:
|
||||||
@@ -448,7 +442,7 @@ def handle_list_egress_routes(
|
|||||||
})
|
})
|
||||||
opener = urllib.request.build_opener(proxy_handler)
|
opener = urllib.request.build_opener(proxy_handler)
|
||||||
try:
|
try:
|
||||||
with opener.open(_sv.EGRESS_INTROSPECT_URL, timeout=EGRESS_LIST_TIMEOUT_SECONDS) as resp:
|
with opener.open(_sv.EGRESS_INTROSPECT_URL, timeout=5) as resp:
|
||||||
body = resp.read().decode("utf-8")
|
body = resp.read().decode("utf-8")
|
||||||
except (urllib.error.URLError, OSError) as e:
|
except (urllib.error.URLError, OSError) as e:
|
||||||
return {
|
return {
|
||||||
@@ -526,20 +520,7 @@ def handle_tools_call(
|
|||||||
f"for bottle {config.bottle_slug}; waiting for operator...\n"
|
f"for bottle {config.bottle_slug}; waiting for operator...\n"
|
||||||
)
|
)
|
||||||
sys.stderr.flush()
|
sys.stderr.flush()
|
||||||
deadline = time.monotonic() + config.response_timeout_seconds
|
response = _sv.wait_for_response(config.queue_dir, proposal.id)
|
||||||
try:
|
|
||||||
response = _sv.wait_for_response(
|
|
||||||
config.queue_dir,
|
|
||||||
proposal.id,
|
|
||||||
poll_interval=MIN_RESPONSE_POLL_INTERVAL_SECONDS,
|
|
||||||
deadline=deadline,
|
|
||||||
)
|
|
||||||
except TimeoutError:
|
|
||||||
text = format_pending_response_text(config.response_timeout_seconds)
|
|
||||||
return {
|
|
||||||
"content": [{"type": "text", "text": text}],
|
|
||||||
"isError": False,
|
|
||||||
}
|
|
||||||
_sv.archive_proposal(config.queue_dir, proposal.id)
|
_sv.archive_proposal(config.queue_dir, proposal.id)
|
||||||
|
|
||||||
text = format_response_text(response)
|
text = format_response_text(response)
|
||||||
@@ -561,16 +542,6 @@ def format_response_text(response: "_sv.Response") -> str:
|
|||||||
return "\n".join(lines)
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
def format_pending_response_text(timeout_seconds: float) -> str:
|
|
||||||
return "\n".join([
|
|
||||||
"status: pending",
|
|
||||||
(
|
|
||||||
"notes: operator response timed out after "
|
|
||||||
f"{timeout_seconds:g}s; proposal remains queued"
|
|
||||||
),
|
|
||||||
])
|
|
||||||
|
|
||||||
|
|
||||||
# --- HTTP transport --------------------------------------------------------
|
# --- HTTP transport --------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
@@ -683,15 +654,10 @@ def serve(
|
|||||||
queue_dir: Path,
|
queue_dir: Path,
|
||||||
port: int = _sv.SUPERVISE_PORT,
|
port: int = _sv.SUPERVISE_PORT,
|
||||||
bind: str = "0.0.0.0",
|
bind: str = "0.0.0.0",
|
||||||
response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS,
|
|
||||||
) -> typing.NoReturn:
|
) -> typing.NoReturn:
|
||||||
queue_dir.mkdir(parents=True, exist_ok=True)
|
queue_dir.mkdir(parents=True, exist_ok=True)
|
||||||
server = MCPServer((bind, port), MCPHandler)
|
server = MCPServer((bind, port), MCPHandler)
|
||||||
server.config = ServerConfig(
|
server.config = ServerConfig(bottle_slug=bottle_slug, queue_dir=queue_dir)
|
||||||
bottle_slug=bottle_slug,
|
|
||||||
queue_dir=queue_dir,
|
|
||||||
response_timeout_seconds=response_timeout_seconds,
|
|
||||||
)
|
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
f"supervise listening on {bind}:{port}; "
|
f"supervise listening on {bind}:{port}; "
|
||||||
f"slug={bottle_slug!r}; queue={queue_dir}; "
|
f"slug={bottle_slug!r}; queue={queue_dir}; "
|
||||||
@@ -716,37 +682,9 @@ def main(argv: list[str]) -> int:
|
|||||||
queue_dir = Path(os.environ.get("SUPERVISE_QUEUE_DIR", _sv.QUEUE_DIR_IN_CONTAINER))
|
queue_dir = Path(os.environ.get("SUPERVISE_QUEUE_DIR", _sv.QUEUE_DIR_IN_CONTAINER))
|
||||||
port = int(os.environ.get("SUPERVISE_PORT", str(_sv.SUPERVISE_PORT)))
|
port = int(os.environ.get("SUPERVISE_PORT", str(_sv.SUPERVISE_PORT)))
|
||||||
bind = os.environ.get("SUPERVISE_BIND", "0.0.0.0")
|
bind = os.environ.get("SUPERVISE_BIND", "0.0.0.0")
|
||||||
try:
|
serve(bottle_slug=bottle_slug, queue_dir=queue_dir, port=port, bind=bind)
|
||||||
response_timeout_seconds = _response_timeout_from_env(os.environ)
|
|
||||||
except ValueError as e:
|
|
||||||
sys.stderr.write(f"supervise: {e}\n")
|
|
||||||
return 2
|
|
||||||
serve(
|
|
||||||
bottle_slug=bottle_slug,
|
|
||||||
queue_dir=queue_dir,
|
|
||||||
port=port,
|
|
||||||
bind=bind,
|
|
||||||
response_timeout_seconds=response_timeout_seconds,
|
|
||||||
)
|
|
||||||
return 0 # serve() does not return
|
return 0 # serve() does not return
|
||||||
|
|
||||||
|
|
||||||
def _response_timeout_from_env(env: typing.Mapping[str, str]) -> float:
|
|
||||||
raw = env.get("SUPERVISE_RESPONSE_TIMEOUT_SECONDS", "").strip()
|
|
||||||
if not raw:
|
|
||||||
return DEFAULT_RESPONSE_TIMEOUT_SECONDS
|
|
||||||
try:
|
|
||||||
value = float(raw)
|
|
||||||
except ValueError as e:
|
|
||||||
raise ValueError(
|
|
||||||
"SUPERVISE_RESPONSE_TIMEOUT_SECONDS must be a positive number"
|
|
||||||
) from e
|
|
||||||
if value <= 0:
|
|
||||||
raise ValueError(
|
|
||||||
"SUPERVISE_RESPONSE_TIMEOUT_SECONDS must be a positive number"
|
|
||||||
)
|
|
||||||
return value
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
raise SystemExit(main(sys.argv))
|
raise SystemExit(main(sys.argv))
|
||||||
|
|||||||
@@ -1,99 +0,0 @@
|
|||||||
# PRD 0035: Supervise Wait Bounds
|
|
||||||
|
|
||||||
- **Status:** Active
|
|
||||||
- **Author:** didericis-codex
|
|
||||||
- **Created:** 2026-06-02
|
|
||||||
- **Issue:** #128
|
|
||||||
|
|
||||||
## Summary
|
|
||||||
|
|
||||||
Bound the supervise sidecar's request-thread waits so an agent tool call cannot
|
|
||||||
hold an HTTP worker forever while waiting for operator action. Preserve the MCP
|
|
||||||
tool surface, but make timeout behavior explicit, observable, and tested.
|
|
||||||
|
|
||||||
## Problem
|
|
||||||
|
|
||||||
`bot_bottle/supervise_server.py` handles MCP over a threaded stdlib HTTP
|
|
||||||
server. Tool calls validate a proposal, write it to the supervise queue, and
|
|
||||||
then wait for the operator response file. Today that wait can last forever.
|
|
||||||
Each outstanding tool call consumes one server thread until the operator acts.
|
|
||||||
|
|
||||||
The route-listing helper also performs a live HTTP request to egress inside the
|
|
||||||
request thread. It has a short timeout today, but the behavior is not described
|
|
||||||
as part of a broader request-thread budget.
|
|
||||||
|
|
||||||
This is operationally risky in multi-agent or repeated-call scenarios: a stuck
|
|
||||||
or ignored proposal can accumulate blocked threads, and callers do not get a
|
|
||||||
clear "still pending" answer they can reason about.
|
|
||||||
|
|
||||||
## Goals / Success Criteria
|
|
||||||
|
|
||||||
- Add a bounded wait for operator responses to supervise tool calls.
|
|
||||||
- Return a clear JSON-RPC tool result when the proposal remains pending after
|
|
||||||
the timeout; do not treat pending operator action as an internal server error.
|
|
||||||
- Keep the queued proposal on disk after timeout so the operator can still act.
|
|
||||||
- Make the wait duration configurable by environment with a conservative
|
|
||||||
default.
|
|
||||||
- Preserve current success and rejection result shapes for completed operator
|
|
||||||
responses.
|
|
||||||
- Keep `list-egress-routes` bounded and document its timeout behavior.
|
|
||||||
- Add focused tests for approved responses, timed-out pending responses, and
|
|
||||||
route-list timeout/error handling.
|
|
||||||
|
|
||||||
## Non-goals
|
|
||||||
|
|
||||||
- No asynchronous framework or new runtime dependency.
|
|
||||||
- No replacement of the stdlib threaded HTTP server.
|
|
||||||
- No change to the host-side supervise queue format.
|
|
||||||
- No cancellation protocol between the agent and operator UI.
|
|
||||||
- No dashboard or TUI changes.
|
|
||||||
|
|
||||||
## Scope
|
|
||||||
|
|
||||||
In scope:
|
|
||||||
|
|
||||||
- `bot_bottle/supervise_server.py` request handling.
|
|
||||||
- Any small helper in `bot_bottle/supervise.py` needed to support a bounded
|
|
||||||
wait cleanly.
|
|
||||||
- Unit tests around tool-call response waiting and route-list behavior.
|
|
||||||
|
|
||||||
Out of scope:
|
|
||||||
|
|
||||||
- Reworking proposal persistence.
|
|
||||||
- Changing egress apply or pipelock apply flows.
|
|
||||||
- Adding background workers to complete HTTP requests after the client returns.
|
|
||||||
|
|
||||||
## Design
|
|
||||||
|
|
||||||
Introduce a supervise response wait budget,
|
|
||||||
`SUPERVISE_RESPONSE_TIMEOUT_SECONDS`, with a 30 second default. The existing
|
|
||||||
poll loop should stop after that budget and return a normal tool result such as
|
|
||||||
`{"status": "pending", "notes": "operator response timed out; proposal remains queued"}`.
|
|
||||||
The exact field names should fit the existing response schema so agents can
|
|
||||||
handle success, rejection, and pending with one result parser.
|
|
||||||
|
|
||||||
The proposal file must remain in the queue when the HTTP call times out. The
|
|
||||||
operator can still approve or reject it later, but that later response will not
|
|
||||||
resume the original HTTP request.
|
|
||||||
|
|
||||||
Route-listing should continue to use a short HTTP timeout to egress. Errors
|
|
||||||
should be returned as tool results or JSON-RPC errors consistently with the
|
|
||||||
existing server behavior; the implementation should avoid an unbounded socket
|
|
||||||
wait in the request thread.
|
|
||||||
|
|
||||||
## Testing Strategy
|
|
||||||
|
|
||||||
- Unit-test a tool call whose response appears before the timeout.
|
|
||||||
- Unit-test a tool call whose response never appears and assert the request
|
|
||||||
returns a pending result while the proposal remains queued.
|
|
||||||
- Unit-test invalid timeout env values fall back or fail clearly.
|
|
||||||
- Unit-test `list-egress-routes` timeout/error behavior with a fake URL opener.
|
|
||||||
|
|
||||||
Run:
|
|
||||||
|
|
||||||
- `python3 -m unittest tests.unit.test_supervise_server`
|
|
||||||
- `python3 -m unittest discover -s tests/unit`
|
|
||||||
|
|
||||||
## Open Questions
|
|
||||||
|
|
||||||
None.
|
|
||||||
@@ -8,7 +8,6 @@ import threading
|
|||||||
import time
|
import time
|
||||||
import unittest
|
import unittest
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from unittest.mock import patch
|
|
||||||
|
|
||||||
|
|
||||||
# The server module loads `supervise` via same-directory import inside
|
# The server module loads `supervise` via same-directory import inside
|
||||||
@@ -30,10 +29,8 @@ from bot_bottle.supervise_server import (
|
|||||||
ServerConfig,
|
ServerConfig,
|
||||||
TOOL_DEFINITIONS,
|
TOOL_DEFINITIONS,
|
||||||
_RpcError,
|
_RpcError,
|
||||||
_response_timeout_from_env,
|
|
||||||
format_response_text,
|
format_response_text,
|
||||||
handle_initialize,
|
handle_initialize,
|
||||||
handle_list_egress_routes,
|
|
||||||
handle_tools_call,
|
handle_tools_call,
|
||||||
handle_tools_list,
|
handle_tools_list,
|
||||||
jsonrpc_error,
|
jsonrpc_error,
|
||||||
@@ -303,70 +300,6 @@ class TestHandleToolsCall(unittest.TestCase):
|
|||||||
processed = list((self.queue_dir / "processed").glob("*.json"))
|
processed = list((self.queue_dir / "processed").glob("*.json"))
|
||||||
self.assertEqual(2, len(processed))
|
self.assertEqual(2, len(processed))
|
||||||
|
|
||||||
def test_pending_response_times_out_without_archive(self):
|
|
||||||
config = ServerConfig(
|
|
||||||
bottle_slug="dev",
|
|
||||||
queue_dir=self.queue_dir,
|
|
||||||
response_timeout_seconds=0.05,
|
|
||||||
)
|
|
||||||
result = handle_tools_call(
|
|
||||||
{
|
|
||||||
"name": _sv.TOOL_EGRESS_BLOCK,
|
|
||||||
"arguments": {
|
|
||||||
"host": "example.com",
|
|
||||||
"justification": "need a route",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
config,
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertFalse(result["isError"]) # type: ignore[index]
|
|
||||||
text = result["content"][0]["text"] # type: ignore[index]
|
|
||||||
self.assertIn("status: pending", text)
|
|
||||||
self.assertIn("proposal remains queued", text)
|
|
||||||
self.assertEqual(1, len(_sv.list_pending_proposals(self.queue_dir)))
|
|
||||||
self.assertFalse((self.queue_dir / "processed").exists())
|
|
||||||
|
|
||||||
|
|
||||||
class TestHandleListEgressRoutes(unittest.TestCase):
|
|
||||||
def test_url_error_returns_tool_error(self):
|
|
||||||
class _Opener:
|
|
||||||
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003
|
|
||||||
raise OSError("egress unavailable")
|
|
||||||
|
|
||||||
with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()):
|
|
||||||
result = handle_list_egress_routes(
|
|
||||||
{},
|
|
||||||
ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")),
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertTrue(result["isError"]) # type: ignore[index]
|
|
||||||
text = result["content"][0]["text"] # type: ignore[index]
|
|
||||||
self.assertIn("could not reach", text)
|
|
||||||
self.assertIn("egress unavailable", text)
|
|
||||||
|
|
||||||
|
|
||||||
class TestResponseTimeoutEnv(unittest.TestCase):
|
|
||||||
def test_unset_uses_default(self):
|
|
||||||
self.assertEqual(
|
|
||||||
supervise_server.DEFAULT_RESPONSE_TIMEOUT_SECONDS,
|
|
||||||
_response_timeout_from_env({}),
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_positive_float_accepted(self):
|
|
||||||
self.assertEqual(
|
|
||||||
12.5,
|
|
||||||
_response_timeout_from_env({"SUPERVISE_RESPONSE_TIMEOUT_SECONDS": "12.5"}),
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_invalid_value_rejected(self):
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
_response_timeout_from_env({"SUPERVISE_RESPONSE_TIMEOUT_SECONDS": "soon"})
|
|
||||||
|
|
||||||
def test_nonpositive_value_rejected(self):
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
_response_timeout_from_env({"SUPERVISE_RESPONSE_TIMEOUT_SECONDS": "0"})
|
|
||||||
|
|
||||||
|
|
||||||
# --- Response text formatting ---------------------------------------------
|
# --- Response text formatting ---------------------------------------------
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user