Files
bot-bottle/tests/unit/orchestrator/test_sidecar.py
T
didericis-claude 57290da1e8
lint / lint (push) Failing after 2m5s
test / unit (pull_request) Successful in 53s
test / integration (pull_request) Successful in 24s
test / coverage (pull_request) Successful in 1m12s
test: add coverage for orchestrator + gitea client (diff gate 77% → 98%)
Three new unit test modules:
- tests/unit/test_contrib_gitea_client.py — GiteaClient (urllib mocked)
  and GiteaForge delegation
- tests/unit/orchestrator/test_main.py — __main__ run/status commands
- tests/unit/orchestrator/test_bootstrap.py — _token, BotBottleStateStore,
  _to_forge_state/_to_record, make_forge, make_sidecar, build

Augmentations to existing suites:
- test_events: non-"created" comment action ignored
- test_lifecycle: _iso_now callable, untracked-issue comment ignored,
  untracked-PR closed ignored (covers _find_by_pr return-None path)
- test_runner: destroy command, _default_run via subprocess mock
- test_sidecar: _jsonable dataclass/list branches, OpLog.read on missing
  file, drain_done_events on corrupted file, socket _Handler invalid-JSON
  and empty-line paths, serve() with pre-existing socket path
- test_watchdog: _loop body covered by patching _TICK_SECS to 0.01s
- test_webhook: unknown GET path returns 404

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 19:35:30 +00:00

205 lines
7.5 KiB
Python

"""Unit: forge sidecar dispatch, op log, queue relay, socket server."""
from __future__ import annotations
import dataclasses
import json
import socket
import tempfile
import threading
import unittest
from pathlib import Path
from bot_bottle.orchestrator.sidecar import (
ForgeSidecar,
OpLog,
_jsonable,
drain_done_events,
serve,
write_done_event,
)
from ._fakes import FakeForge
class SidecarDispatchTest(unittest.TestCase):
    """Dispatch behaviour of ``ForgeSidecar`` when backed by a ``FakeForge``."""

    def setUp(self):
        """Build a sidecar whose op log and done queue live in a temp dir."""
        tmp_ctx = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with
        self.tmp = Path(self.enterContext(tmp_ctx))
        self.forge = FakeForge()
        # Fixed clock so logged entries do not depend on wall time.
        self.log = OpLog(self.tmp / "ops.jsonl", now=lambda: "T")
        self.queue = self.tmp / "queue"
        self.sc = ForgeSidecar(
            forge=self.forge,
            op_log=self.log,
            queue_dir=self.queue,
            run_key=("o", "r", 17),
        )

    def test_read_pr_ok_and_logged(self):
        """A successful read returns the PR and leaves one "ok" log entry."""
        reply = self.sc.dispatch("read_pr", {"number": 5})
        self.assertTrue(reply["ok"])
        self.assertEqual(5, reply["result"]["number"])
        logged = [
            (entry["op"], entry["target"], entry["detail"])
            for entry in self.log.read()
        ]
        self.assertEqual([("read_pr", 5, "ok")], logged)

    def test_post_comment_writes_and_logs(self):
        """Posting a comment succeeds and reaches the forge."""
        reply = self.sc.dispatch("post_comment", {"number": 17, "body": "done"})
        self.assertTrue(reply["ok"])
        self.assertEqual([(17, "done")], self.forge.comments)

    def test_scope_denied_write_returns_error_and_audits_rejection(self):
        """A scope-denied write fails, is audited, and never hits the forge."""
        self.forge.scope_denied.add(999)
        reply = self.sc.dispatch("post_comment", {"number": 999, "body": "x"})
        self.assertFalse(reply["ok"])
        self.assertIn("denied", reply["error"])
        # Rejections must show up in the op log too, not only allowed ops.
        last_entry = self.log.read()[-1]
        self.assertIn("error", last_entry["detail"])
        self.assertEqual([], self.forge.comments)

    def test_signal_done_queues_event(self):
        """``signal_done`` drops exactly one event carrying the run key."""
        reply = self.sc.dispatch(
            "signal_done", {"status": "success", "summary": "ok"}
        )
        self.assertTrue(reply["ok"])
        events = drain_done_events(self.queue)
        self.assertEqual(1, len(events))
        event = events[0]
        observed = tuple(
            event[key] for key in ("owner", "repo", "issue_number", "status")
        )
        self.assertEqual(("o", "r", 17, "success"), observed)

    def test_unknown_method(self):
        """An unrecognised method name yields a not-ok response."""
        reply = self.sc.dispatch("delete_repo", {})
        self.assertFalse(reply["ok"])
class JsonableTest(unittest.TestCase):
    """Conversions performed by ``_jsonable`` on scalars, dataclasses, lists."""

    def test_plain_value_passthrough(self):
        """Ints and strings come back equal to what went in."""
        for value in (42, "s"):
            self.assertEqual(value, _jsonable(value))

    def test_dataclass_converted_to_dict(self):
        """A dataclass instance becomes a dict of its fields, defaults included."""

        @dataclasses.dataclass
        class Sample:
            x: int
            y: str = "hi"

        converted = _jsonable(Sample(x=99))
        self.assertEqual({"x": 99, "y": "hi"}, converted)

    def test_list_recursed(self):
        """A list of plain values converts to an equal list."""
        numbers = [1, 2, 3]
        self.assertEqual([1, 2, 3], _jsonable(numbers))

    def test_list_of_dataclasses(self):
        """Each dataclass element inside a list is converted to a dict."""

        @dataclasses.dataclass
        class Entry:
            v: int

        entries = [Entry(v=n) for n in (1, 2)]
        self.assertEqual([{"v": 1}, {"v": 2}], _jsonable(entries))
class QueueTest(unittest.TestCase):
    """Behaviour of the done-event queue helpers on a queue directory."""

    def _queue_dir(self) -> Path:
        """Return a temp directory that is removed when the test finishes."""
        holder = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with
        return Path(self.enterContext(holder))

    def test_drain_removes_events(self):
        """A written event is returned once; a second drain finds nothing."""
        queue_dir = self._queue_dir()
        write_done_event(queue_dir, {"owner": "o", "repo": "r", "issue_number": 1})
        first_pass = drain_done_events(queue_dir)
        self.assertEqual(1, len(first_pass))
        second_pass = drain_done_events(queue_dir)
        self.assertEqual([], second_pass)  # already drained

    def test_drain_missing_dir(self):
        """Draining a directory that does not exist yields an empty list."""
        missing = Path("/nonexistent/queue")
        self.assertEqual([], drain_done_events(missing))

    def test_drain_skips_corrupted_file(self):
        """An unparseable event file produces no event and is deleted."""
        queue_dir = self._queue_dir()
        broken = queue_dir / "done-bad.json"
        broken.write_text("not json", encoding="utf-8")
        self.assertEqual([], drain_done_events(queue_dir))
        # Draining is expected to remove the file even though it was unreadable.
        self.assertFalse(broken.exists())
class OpLogReadTest(unittest.TestCase):
    """Reading an ``OpLog`` whose backing file has never been created."""

    def test_read_missing_file_returns_empty(self):
        """``read()`` on a path that was never written returns an empty list."""
        with tempfile.TemporaryDirectory() as workdir:
            # Neither the "sub" directory nor the log file exists yet.
            unwritten = Path(workdir, "sub", "ops.jsonl")
            entries = OpLog(unwritten).read()
            self.assertEqual([], entries)
class SocketServerTest(unittest.TestCase):
    """Exercise ``serve`` and its request handler over a real AF_UNIX socket.

    Each test works inside a temporary directory registered through
    ``enterContext``, so the directory and the socket file in it are removed
    when the test ends. Servers and worker threads are released through
    ``addCleanup`` so they are torn down even when an assertion fails.
    """

    def _tmp_dir(self) -> Path:
        """Return a fresh temp directory that is deleted at test teardown."""
        return Path(self.enterContext(tempfile.TemporaryDirectory()))  # pylint: disable=consider-using-with

    def _make_server(self, tmp: Path, name: str = "s.sock"):
        """Serve a ``FakeForge``-backed sidecar from ``tmp / name``.

        Skips the calling test when the socket path is longer than 100
        characters (AF_UNIX path limit guard). The server is closed
        automatically at teardown.

        Returns:
            ``(server, socket_path)``.
        """
        sock = tmp / name
        if len(str(sock)) > 100:  # AF_UNIX path limit; skip on long tmp paths
            self.skipTest("temp socket path too long for AF_UNIX")
        sidecar = ForgeSidecar(
            forge=FakeForge(), op_log=OpLog(tmp / "ops.jsonl"),
            queue_dir=tmp / "q", run_key=("o", "r", 1),
        )
        srv = serve(sidecar, sock)
        self.addCleanup(srv.server_close)
        return srv, sock

    def _serve_one_request(self):
        """Start a server that handles exactly one connection in a thread.

        Returns:
            ``(socket_path, worker_thread)``.
        """
        srv, sock = self._make_server(self._tmp_dir())
        worker = threading.Thread(target=srv.handle_request, daemon=True)
        worker.start()
        # Cleanups run LIFO: the thread is joined before the server is closed,
        # and the temp directory is removed last.
        self.addCleanup(worker.join, timeout=5)
        return sock, worker

    def _request_line(self, payload: bytes) -> str:
        """Send ``payload`` to a one-shot server and return the reply line."""
        sock, _ = self._serve_one_request()
        # Context managers close the client and its reader even on failure.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
            client.connect(str(sock))
            client.sendall(payload)
            with client.makefile() as reader:
                return reader.readline()

    def test_round_trip_over_unix_socket(self):
        """A valid JSON request gets an ok response with the requested issue."""
        line = self._request_line(
            b'{"method": "read_issue", "params": {"number": 3}}\n'
        )
        resp = json.loads(line)
        self.assertTrue(resp["ok"])
        self.assertEqual(3, resp["result"]["number"])

    def test_handler_invalid_json_returns_error(self):
        """A line that is not JSON gets a not-ok "invalid json" response."""
        line = self._request_line(b"not valid json!\n")
        resp = json.loads(line)
        self.assertFalse(resp["ok"])
        self.assertIn("invalid json", resp["error"])

    def test_handler_empty_line_closes_silently(self):
        """A client that connects and sends nothing does not hang the server."""
        sock, worker = self._serve_one_request()
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
            client.connect(str(sock))
        # The client is now closed: the server side sees an immediate EOF.
        worker.join(timeout=5)
        self.assertFalse(worker.is_alive(), "handler did not return on EOF")

    def test_serve_removes_existing_socket_path(self):
        """``serve`` succeeds when a regular file already occupies the path."""
        tmp = self._tmp_dir()
        stale = tmp / "existing.sock"
        stale.touch()  # pre-existing file at socket path
        _, sock = self._make_server(tmp, "existing.sock")
        # The stale regular file must have been replaced by the bound socket.
        self.assertTrue(sock.is_socket())
# Run this module's tests through unittest's runner when executed directly.
if __name__ == "__main__":
    unittest.main()