57290da1e8
Three new unit test modules: - tests/unit/test_contrib_gitea_client.py — GiteaClient (urllib mocked) and GiteaForge delegation - tests/unit/orchestrator/test_main.py — __main__ run/status commands - tests/unit/orchestrator/test_bootstrap.py — _token, BotBottleStateStore, _to_forge_state/_to_record, make_forge, make_sidecar, build Augments to existing suites: - test_events: non-"created" comment action ignored - test_lifecycle: _iso_now callable, untracked-issue comment ignored, untracked-PR closed ignored (covers _find_by_pr return-None path) - test_runner: destroy command, _default_run via subprocess mock - test_sidecar: _jsonable dataclass/list branches, OpLog.read on missing file, drain_done_events on corrupted file, socket _Handler invalid-JSON and empty-line paths, serve() with pre-existing socket path - test_watchdog: _loop body covered by patching _TICK_SECS to 0.01s - test_webhook: unknown GET path returns 404 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
205 lines
7.5 KiB
Python
205 lines
7.5 KiB
Python
"""Unit: forge sidecar dispatch, op log, queue relay, socket server."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
import json
|
|
import socket
|
|
import tempfile
|
|
import threading
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
from bot_bottle.orchestrator.sidecar import (
|
|
ForgeSidecar,
|
|
OpLog,
|
|
_jsonable,
|
|
drain_done_events,
|
|
serve,
|
|
write_done_event,
|
|
)
|
|
|
|
from ._fakes import FakeForge
|
|
|
|
|
|
class SidecarDispatchTest(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
|
|
self.forge = FakeForge()
|
|
self.log = OpLog(self.tmp / "ops.jsonl", now=lambda: "T")
|
|
self.queue = self.tmp / "queue"
|
|
self.sc = ForgeSidecar(
|
|
forge=self.forge, op_log=self.log, queue_dir=self.queue,
|
|
run_key=("o", "r", 17),
|
|
)
|
|
|
|
def test_read_pr_ok_and_logged(self):
|
|
resp = self.sc.dispatch("read_pr", {"number": 5})
|
|
self.assertTrue(resp["ok"])
|
|
self.assertEqual(5, resp["result"]["number"])
|
|
self.assertEqual([("read_pr", 5, "ok")],
|
|
[(o["op"], o["target"], o["detail"]) for o in self.log.read()])
|
|
|
|
def test_post_comment_writes_and_logs(self):
|
|
resp = self.sc.dispatch("post_comment", {"number": 17, "body": "done"})
|
|
self.assertTrue(resp["ok"])
|
|
self.assertEqual([(17, "done")], self.forge.comments)
|
|
|
|
def test_scope_denied_write_returns_error_and_audits_rejection(self):
|
|
self.forge.scope_denied.add(999)
|
|
resp = self.sc.dispatch("post_comment", {"number": 999, "body": "x"})
|
|
self.assertFalse(resp["ok"])
|
|
self.assertIn("denied", resp["error"])
|
|
# The rejection is recorded in the op log, not just the allows.
|
|
self.assertIn("error", self.log.read()[-1]["detail"])
|
|
self.assertEqual([], self.forge.comments)
|
|
|
|
def test_signal_done_queues_event(self):
|
|
resp = self.sc.dispatch("signal_done", {"status": "success", "summary": "ok"})
|
|
self.assertTrue(resp["ok"])
|
|
events = drain_done_events(self.queue)
|
|
self.assertEqual(1, len(events))
|
|
self.assertEqual(("o", "r", 17, "success"),
|
|
(events[0]["owner"], events[0]["repo"],
|
|
events[0]["issue_number"], events[0]["status"]))
|
|
|
|
def test_unknown_method(self):
|
|
resp = self.sc.dispatch("delete_repo", {})
|
|
self.assertFalse(resp["ok"])
|
|
|
|
|
|
class JsonableTest(unittest.TestCase):
|
|
def test_plain_value_passthrough(self):
|
|
self.assertEqual(42, _jsonable(42))
|
|
self.assertEqual("s", _jsonable("s"))
|
|
|
|
def test_dataclass_converted_to_dict(self):
|
|
@dataclasses.dataclass
|
|
class Thing:
|
|
x: int
|
|
y: str = "hi"
|
|
self.assertEqual({"x": 99, "y": "hi"}, _jsonable(Thing(x=99)))
|
|
|
|
def test_list_recursed(self):
|
|
self.assertEqual([1, 2, 3], _jsonable([1, 2, 3]))
|
|
|
|
def test_list_of_dataclasses(self):
|
|
@dataclasses.dataclass
|
|
class Item:
|
|
v: int
|
|
result = _jsonable([Item(v=1), Item(v=2)])
|
|
self.assertEqual([{"v": 1}, {"v": 2}], result)
|
|
|
|
|
|
class QueueTest(unittest.TestCase):
|
|
def test_drain_removes_events(self):
|
|
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
|
|
write_done_event(tmp, {"owner": "o", "repo": "r", "issue_number": 1})
|
|
self.assertEqual(1, len(drain_done_events(tmp)))
|
|
self.assertEqual([], drain_done_events(tmp)) # drained
|
|
|
|
def test_drain_missing_dir(self):
|
|
self.assertEqual([], drain_done_events(Path("/nonexistent/queue")))
|
|
|
|
def test_drain_skips_corrupted_file(self):
|
|
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
|
|
(tmp / "done-bad.json").write_text("not json", encoding="utf-8")
|
|
events = drain_done_events(tmp)
|
|
self.assertEqual([], events)
|
|
# The corrupted file is removed by the finally block.
|
|
self.assertFalse((tmp / "done-bad.json").exists())
|
|
|
|
|
|
class OpLogReadTest(unittest.TestCase):
|
|
def test_read_missing_file_returns_empty(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
log = OpLog(Path(tmp) / "sub" / "ops.jsonl")
|
|
# File not written yet — read() should return [].
|
|
self.assertEqual([], log.read())
|
|
|
|
|
|
class SocketServerTest(unittest.TestCase):
|
|
def _make_server(self, tmp: Path):
|
|
sock = tmp / "s.sock"
|
|
if len(str(sock)) > 100:
|
|
self.skipTest("temp socket path too long for AF_UNIX")
|
|
sidecar = ForgeSidecar(
|
|
forge=FakeForge(), op_log=OpLog(tmp / "ops.jsonl"),
|
|
queue_dir=tmp / "q", run_key=("o", "r", 1),
|
|
)
|
|
return serve(sidecar, sock), sock
|
|
|
|
def test_round_trip_over_unix_socket(self):
|
|
tmp = tempfile.mkdtemp()
|
|
sock = Path(tmp) / "s.sock"
|
|
if len(str(sock)) > 100: # AF_UNIX path limit; skip on long tmp paths
|
|
self.skipTest("temp socket path too long for AF_UNIX")
|
|
sidecar = ForgeSidecar(
|
|
forge=FakeForge(), op_log=OpLog(Path(tmp) / "ops.jsonl"),
|
|
queue_dir=Path(tmp) / "q", run_key=("o", "r", 1),
|
|
)
|
|
srv = serve(sidecar, sock)
|
|
t = threading.Thread(target=srv.handle_request, daemon=True)
|
|
t.start()
|
|
try:
|
|
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
client.connect(str(sock))
|
|
client.sendall(b'{"method": "read_issue", "params": {"number": 3}}\n')
|
|
line = client.makefile().readline()
|
|
client.close()
|
|
finally:
|
|
t.join(timeout=5)
|
|
srv.server_close()
|
|
resp = json.loads(line)
|
|
self.assertTrue(resp["ok"])
|
|
self.assertEqual(3, resp["result"]["number"])
|
|
|
|
|
|
def test_handler_invalid_json_returns_error(self):
|
|
tmp = Path(tempfile.mkdtemp())
|
|
srv, sock = self._make_server(tmp)
|
|
t = threading.Thread(target=srv.handle_request, daemon=True)
|
|
t.start()
|
|
try:
|
|
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
client.connect(str(sock))
|
|
client.sendall(b"not valid json!\n")
|
|
line = client.makefile().readline()
|
|
client.close()
|
|
finally:
|
|
t.join(timeout=5)
|
|
srv.server_close()
|
|
resp = json.loads(line)
|
|
self.assertFalse(resp["ok"])
|
|
self.assertIn("invalid json", resp["error"])
|
|
|
|
def test_handler_empty_line_closes_silently(self):
|
|
tmp = Path(tempfile.mkdtemp())
|
|
srv, sock = self._make_server(tmp)
|
|
t = threading.Thread(target=srv.handle_request, daemon=True)
|
|
t.start()
|
|
try:
|
|
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
client.connect(str(sock))
|
|
client.close() # immediate EOF -> readline() returns b""
|
|
finally:
|
|
t.join(timeout=5)
|
|
srv.server_close()
|
|
|
|
def test_serve_removes_existing_socket_path(self):
|
|
tmp = Path(tempfile.mkdtemp())
|
|
sock = tmp / "existing.sock"
|
|
if len(str(sock)) > 100:
|
|
self.skipTest("temp socket path too long for AF_UNIX")
|
|
sock.touch() # pre-existing file at socket path
|
|
sidecar = ForgeSidecar(
|
|
forge=FakeForge(), op_log=OpLog(tmp / "ops.jsonl"),
|
|
queue_dir=tmp / "q", run_key=("o", "r", 1),
|
|
)
|
|
srv = serve(sidecar, sock) # should unlink the pre-existing file
|
|
srv.server_close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|