Compare commits

..

4 Commits

Author SHA1 Message Date
didericis-codex 47b75030ee docs(prd): add pipelock yaml contract
test / unit (pull_request) Successful in 31s
test / integration (pull_request) Successful in 40s
2026-06-02 04:09:55 -04:00
didericis-codex 3472e06efb complete(prd): mark PRD 0035 active
test / integration (pull_request) Successful in 1m4s
test / unit (pull_request) Successful in 45s
test / unit (push) Successful in 36s
test / integration (push) Successful in 46s
2026-06-02 08:06:53 +00:00
didericis-codex 82ce5d3034 fix(supervise): bound response waits 2026-06-02 08:06:45 +00:00
didericis-codex 7c260eeff9 docs(prd): add supervise wait bounds
test / unit (pull_request) Successful in 36s
test / integration (pull_request) Successful in 54s
2026-06-02 07:58:39 +00:00
3 changed files with 232 additions and 4 deletions
+66 -4
View File
@@ -35,6 +35,7 @@ import json
import os
import socketserver
import sys
import time
import typing
import urllib.error
import urllib.parse
@@ -63,6 +64,10 @@ ERR_METHOD_NOT_FOUND = -32601
ERR_INVALID_PARAMS = -32602
ERR_INTERNAL = -32603
DEFAULT_RESPONSE_TIMEOUT_SECONDS = 30.0
MIN_RESPONSE_POLL_INTERVAL_SECONDS = 0.05
EGRESS_LIST_TIMEOUT_SECONDS = 5.0
@dataclass(frozen=True)
class JsonRpcRequest:
@@ -412,6 +417,7 @@ def _validate_and_bundle_egress_route(
class ServerConfig:
bottle_slug: str
queue_dir: Path
response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS
def handle_initialize(_params: dict[str, object]) -> dict[str, object]:
@@ -442,7 +448,7 @@ def handle_list_egress_routes(
})
opener = urllib.request.build_opener(proxy_handler)
try:
with opener.open(_sv.EGRESS_INTROSPECT_URL, timeout=5) as resp:
with opener.open(_sv.EGRESS_INTROSPECT_URL, timeout=EGRESS_LIST_TIMEOUT_SECONDS) as resp:
body = resp.read().decode("utf-8")
except (urllib.error.URLError, OSError) as e:
return {
@@ -520,7 +526,20 @@ def handle_tools_call(
f"for bottle {config.bottle_slug}; waiting for operator...\n"
)
sys.stderr.flush()
response = _sv.wait_for_response(config.queue_dir, proposal.id)
deadline = time.monotonic() + config.response_timeout_seconds
try:
response = _sv.wait_for_response(
config.queue_dir,
proposal.id,
poll_interval=MIN_RESPONSE_POLL_INTERVAL_SECONDS,
deadline=deadline,
)
except TimeoutError:
text = format_pending_response_text(config.response_timeout_seconds)
return {
"content": [{"type": "text", "text": text}],
"isError": False,
}
_sv.archive_proposal(config.queue_dir, proposal.id)
text = format_response_text(response)
@@ -542,6 +561,16 @@ def format_response_text(response: "_sv.Response") -> str:
return "\n".join(lines)
def format_pending_response_text(timeout_seconds: float) -> str:
return "\n".join([
"status: pending",
(
"notes: operator response timed out after "
f"{timeout_seconds:g}s; proposal remains queued"
),
])
# --- HTTP transport --------------------------------------------------------
@@ -654,10 +683,15 @@ def serve(
queue_dir: Path,
port: int = _sv.SUPERVISE_PORT,
bind: str = "0.0.0.0",
response_timeout_seconds: float = DEFAULT_RESPONSE_TIMEOUT_SECONDS,
) -> typing.NoReturn:
queue_dir.mkdir(parents=True, exist_ok=True)
server = MCPServer((bind, port), MCPHandler)
server.config = ServerConfig(bottle_slug=bottle_slug, queue_dir=queue_dir)
server.config = ServerConfig(
bottle_slug=bottle_slug,
queue_dir=queue_dir,
response_timeout_seconds=response_timeout_seconds,
)
sys.stderr.write(
f"supervise listening on {bind}:{port}; "
f"slug={bottle_slug!r}; queue={queue_dir}; "
@@ -682,9 +716,37 @@ def main(argv: list[str]) -> int:
queue_dir = Path(os.environ.get("SUPERVISE_QUEUE_DIR", _sv.QUEUE_DIR_IN_CONTAINER))
port = int(os.environ.get("SUPERVISE_PORT", str(_sv.SUPERVISE_PORT)))
bind = os.environ.get("SUPERVISE_BIND", "0.0.0.0")
serve(bottle_slug=bottle_slug, queue_dir=queue_dir, port=port, bind=bind)
try:
response_timeout_seconds = _response_timeout_from_env(os.environ)
except ValueError as e:
sys.stderr.write(f"supervise: {e}\n")
return 2
serve(
bottle_slug=bottle_slug,
queue_dir=queue_dir,
port=port,
bind=bind,
response_timeout_seconds=response_timeout_seconds,
)
return 0 # serve() does not return
def _response_timeout_from_env(env: typing.Mapping[str, str]) -> float:
raw = env.get("SUPERVISE_RESPONSE_TIMEOUT_SECONDS", "").strip()
if not raw:
return DEFAULT_RESPONSE_TIMEOUT_SECONDS
try:
value = float(raw)
except ValueError as e:
raise ValueError(
"SUPERVISE_RESPONSE_TIMEOUT_SECONDS must be a positive number"
) from e
if value <= 0:
raise ValueError(
"SUPERVISE_RESPONSE_TIMEOUT_SECONDS must be a positive number"
)
return value
if __name__ == "__main__":
raise SystemExit(main(sys.argv))
+99
View File
@@ -0,0 +1,99 @@
# PRD 0035: Supervise Wait Bounds
- **Status:** Active
- **Author:** didericis-codex
- **Created:** 2026-06-02
- **Issue:** #128
## Summary
Bound the supervise sidecar's request-thread waits so an agent tool call cannot
hold an HTTP worker forever while waiting for operator action. Preserve the MCP
tool surface, but make timeout behavior explicit, observable, and tested.
## Problem
`bot_bottle/supervise_server.py` handles MCP over a threaded stdlib HTTP
server. Tool calls validate a proposal, write it to the supervise queue, and
then wait for the operator response file. Today that wait can last forever.
Each outstanding tool call consumes one server thread until the operator acts.
The route-listing helper also performs a live HTTP request to egress inside the
request thread. It has a short timeout today, but the behavior is not described
as part of a broader request-thread budget.
This is operationally risky in multi-agent or repeated-call scenarios: a stuck
or ignored proposal can accumulate blocked threads, and callers do not get a
clear "still pending" answer they can reason about.
## Goals / Success Criteria
- Add a bounded wait for operator responses to supervise tool calls.
- Return a clear JSON-RPC tool result when the proposal remains pending after
the timeout; do not treat pending operator action as an internal server error.
- Keep the queued proposal on disk after timeout so the operator can still act.
- Make the wait duration configurable by environment with a conservative
default.
- Preserve current success and rejection result shapes for completed operator
responses.
- Keep `list-egress-routes` bounded and document its timeout behavior.
- Add focused tests for approved responses, timed-out pending responses, and
route-list timeout/error handling.
## Non-goals
- No asynchronous framework or new runtime dependency.
- No replacement of the stdlib threaded HTTP server.
- No change to the host-side supervise queue format.
- No cancellation protocol between the agent and operator UI.
- No dashboard or TUI changes.
## Scope
In scope:
- `bot_bottle/supervise_server.py` request handling.
- Any small helper in `bot_bottle/supervise.py` needed to support a bounded
wait cleanly.
- Unit tests around tool-call response waiting and route-list behavior.
Out of scope:
- Reworking proposal persistence.
- Changing egress apply or pipelock apply flows.
- Adding background workers to complete HTTP requests after the client returns.
## Design
Introduce a supervise response wait budget,
`SUPERVISE_RESPONSE_TIMEOUT_SECONDS`, with a 30 second default. The existing
poll loop should stop after that budget and return a normal tool result such as
`{"status": "pending", "notes": "operator response timed out; proposal remains queued"}`.
The exact field names should fit the existing response schema so agents can
handle success, rejection, and pending with one result parser.
The proposal file must remain in the queue when the HTTP call times out. The
operator can still approve or reject it later, but that later response will not
resume the original HTTP request.
Route-listing should continue to use a short HTTP timeout to egress. Errors
should be returned as tool results or JSON-RPC errors consistently with the
existing server behavior; the implementation should avoid an unbounded socket
wait in the request thread.
## Testing Strategy
- Unit-test a tool call whose response appears before the timeout.
- Unit-test a tool call whose response never appears and assert the request
returns a pending result while the proposal remains queued.
- Unit-test invalid timeout env values fall back or fail clearly.
- Unit-test `list-egress-routes` timeout/error behavior with a fake URL opener.
Run:
- `python3 -m unittest tests.unit.test_supervise_server`
- `python3 -m unittest discover -s tests/unit`
## Open Questions
None.
+67
View File
@@ -8,6 +8,7 @@ import threading
import time
import unittest
from pathlib import Path
from unittest.mock import patch
# The server module loads `supervise` via same-directory import inside
@@ -29,8 +30,10 @@ from bot_bottle.supervise_server import (
ServerConfig,
TOOL_DEFINITIONS,
_RpcError,
_response_timeout_from_env,
format_response_text,
handle_initialize,
handle_list_egress_routes,
handle_tools_call,
handle_tools_list,
jsonrpc_error,
@@ -300,6 +303,70 @@ class TestHandleToolsCall(unittest.TestCase):
processed = list((self.queue_dir / "processed").glob("*.json"))
self.assertEqual(2, len(processed))
def test_pending_response_times_out_without_archive(self):
config = ServerConfig(
bottle_slug="dev",
queue_dir=self.queue_dir,
response_timeout_seconds=0.05,
)
result = handle_tools_call(
{
"name": _sv.TOOL_EGRESS_BLOCK,
"arguments": {
"host": "example.com",
"justification": "need a route",
},
},
config,
)
self.assertFalse(result["isError"]) # type: ignore[index]
text = result["content"][0]["text"] # type: ignore[index]
self.assertIn("status: pending", text)
self.assertIn("proposal remains queued", text)
self.assertEqual(1, len(_sv.list_pending_proposals(self.queue_dir)))
self.assertFalse((self.queue_dir / "processed").exists())
class TestHandleListEgressRoutes(unittest.TestCase):
def test_url_error_returns_tool_error(self):
class _Opener:
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003
raise OSError("egress unavailable")
with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()):
result = handle_list_egress_routes(
{},
ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")),
)
self.assertTrue(result["isError"]) # type: ignore[index]
text = result["content"][0]["text"] # type: ignore[index]
self.assertIn("could not reach", text)
self.assertIn("egress unavailable", text)
class TestResponseTimeoutEnv(unittest.TestCase):
def test_unset_uses_default(self):
self.assertEqual(
supervise_server.DEFAULT_RESPONSE_TIMEOUT_SECONDS,
_response_timeout_from_env({}),
)
def test_positive_float_accepted(self):
self.assertEqual(
12.5,
_response_timeout_from_env({"SUPERVISE_RESPONSE_TIMEOUT_SECONDS": "12.5"}),
)
def test_invalid_value_rejected(self):
with self.assertRaises(ValueError):
_response_timeout_from_env({"SUPERVISE_RESPONSE_TIMEOUT_SECONDS": "soon"})
def test_nonpositive_value_rejected(self):
with self.assertRaises(ValueError):
_response_timeout_from_env({"SUPERVISE_RESPONSE_TIMEOUT_SECONDS": "0"})
# --- Response text formatting ---------------------------------------------