test: add coverage for orchestrator + gitea client (diff gate 77% → 98%)
Three new unit test modules: - tests/unit/test_contrib_gitea_client.py — GiteaClient (urllib mocked) and GiteaForge delegation - tests/unit/orchestrator/test_main.py — __main__ run/status commands - tests/unit/orchestrator/test_bootstrap.py — _token, BotBottleStateStore, _to_forge_state/_to_record, make_forge, make_sidecar, build Augments to existing suites: - test_events: non-"created" comment action ignored - test_lifecycle: _iso_now callable, untracked-issue comment ignored, untracked-PR closed ignored (covers _find_by_pr return-None path) - test_runner: destroy command, _default_run via subprocess mock - test_sidecar: _jsonable dataclass/list branches, OpLog.read on missing file, drain_done_events on corrupted file, socket _Handler invalid-JSON and empty-line paths, serve() with pre-existing socket path - test_watchdog: _loop body covered by patching _TICK_SECS to 0.01s - test_webhook: unknown GET path returns 404 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,179 @@
|
|||||||
|
"""Unit: BotBottleStateStore, _token, conversions, make_forge/make_sidecar, build."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
from bot_bottle.orchestrator.bootstrap import (
|
||||||
|
BotBottleStateStore,
|
||||||
|
_to_forge_state,
|
||||||
|
_to_record,
|
||||||
|
_token,
|
||||||
|
build,
|
||||||
|
make_forge,
|
||||||
|
make_sidecar,
|
||||||
|
)
|
||||||
|
from bot_bottle.orchestrator.config import Config
|
||||||
|
from bot_bottle.orchestrator.model import RunRecord
|
||||||
|
|
||||||
|
|
||||||
|
def _config(tmp: str) -> Config:
|
||||||
|
return Config(
|
||||||
|
forge_org="org",
|
||||||
|
gitea_api="http://g/api/v1",
|
||||||
|
watchdog_timeout_secs=1800,
|
||||||
|
webhook_host="127.0.0.1",
|
||||||
|
webhook_port=0,
|
||||||
|
bot_bottle_cli="cli.py",
|
||||||
|
queue_dir=Path(tmp) / "q",
|
||||||
|
sidecar_socket=Path(tmp) / "s.sock",
|
||||||
|
db_path=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _record(**kw) -> RunRecord:
|
||||||
|
defaults: dict[str, object] = dict(
|
||||||
|
owner="o", repo="r", issue_number=1, slug="s1", agent_name="a",
|
||||||
|
bottle_names=["claude"], backend_name="docker", agent_git_user="bot",
|
||||||
|
pr_number=5, status="running", last_checkin_at="2026-01-01T00:00:00+00:00",
|
||||||
|
)
|
||||||
|
defaults.update(kw)
|
||||||
|
return RunRecord(**defaults) # type: ignore[arg-type]
|
||||||
|
|
||||||
|
|
||||||
|
class TokenTest(unittest.TestCase):
|
||||||
|
def test_gitea_token_env(self):
|
||||||
|
with patch.dict(os.environ, {"GITEA_TOKEN": "tok123"}):
|
||||||
|
self.assertEqual("tok123", _token())
|
||||||
|
|
||||||
|
def test_forge_gitea_token_fallback(self):
|
||||||
|
clean = {k: v for k, v in os.environ.items()
|
||||||
|
if k not in ("GITEA_TOKEN", "FORGE_GITEA_TOKEN")}
|
||||||
|
with patch.dict(os.environ, {**clean, "FORGE_GITEA_TOKEN": "tok456"}, clear=True):
|
||||||
|
self.assertEqual("tok456", _token())
|
||||||
|
|
||||||
|
def test_missing_token_raises(self):
|
||||||
|
clean = {k: v for k, v in os.environ.items()
|
||||||
|
if k not in ("GITEA_TOKEN", "FORGE_GITEA_TOKEN")}
|
||||||
|
with patch.dict(os.environ, clean, clear=True):
|
||||||
|
with self.assertRaises(RuntimeError):
|
||||||
|
_token()
|
||||||
|
|
||||||
|
|
||||||
|
class ConversionRoundTripTest(unittest.TestCase):
|
||||||
|
def test_record_survives_forge_state_roundtrip(self):
|
||||||
|
rec = _record()
|
||||||
|
result = _to_record(_to_forge_state(rec))
|
||||||
|
self.assertEqual(rec.owner, result.owner)
|
||||||
|
self.assertEqual(rec.repo, result.repo)
|
||||||
|
self.assertEqual(rec.issue_number, result.issue_number)
|
||||||
|
self.assertEqual(rec.slug, result.slug)
|
||||||
|
self.assertEqual(rec.agent_name, result.agent_name)
|
||||||
|
self.assertEqual(rec.bottle_names, result.bottle_names)
|
||||||
|
self.assertEqual(rec.backend_name, result.backend_name)
|
||||||
|
self.assertEqual(rec.agent_git_user, result.agent_git_user)
|
||||||
|
self.assertEqual(rec.pr_number, result.pr_number)
|
||||||
|
self.assertEqual(rec.status, result.status)
|
||||||
|
self.assertEqual(rec.last_checkin_at, result.last_checkin_at)
|
||||||
|
|
||||||
|
def test_none_pr_number_preserved(self):
|
||||||
|
rec = _record(pr_number=None)
|
||||||
|
result = _to_record(_to_forge_state(rec))
|
||||||
|
self.assertIsNone(result.pr_number)
|
||||||
|
|
||||||
|
|
||||||
|
class BotBottleStateStoreTest(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.store = BotBottleStateStore(None)
|
||||||
|
|
||||||
|
def test_upsert_and_get(self):
|
||||||
|
self.store.upsert(_record())
|
||||||
|
got = self.store.get("o", "r", 1)
|
||||||
|
assert got is not None
|
||||||
|
self.assertEqual("s1", got.slug)
|
||||||
|
|
||||||
|
def test_get_missing(self):
|
||||||
|
self.assertIsNone(self.store.get("o", "r", 99))
|
||||||
|
|
||||||
|
def test_upsert_replaces(self):
|
||||||
|
self.store.upsert(_record())
|
||||||
|
self.store.upsert(_record(slug="new-slug"))
|
||||||
|
got = self.store.get("o", "r", 1)
|
||||||
|
assert got is not None
|
||||||
|
self.assertEqual("new-slug", got.slug)
|
||||||
|
|
||||||
|
def test_delete(self):
|
||||||
|
self.store.upsert(_record())
|
||||||
|
self.store.delete("o", "r", 1)
|
||||||
|
self.assertIsNone(self.store.get("o", "r", 1))
|
||||||
|
|
||||||
|
def test_all_returns_all_records(self):
|
||||||
|
self.store.upsert(_record(issue_number=1, slug="s1"))
|
||||||
|
self.store.upsert(_record(issue_number=2, slug="s2"))
|
||||||
|
recs = self.store.all()
|
||||||
|
self.assertEqual(2, len(recs))
|
||||||
|
slugs = {r.slug for r in recs}
|
||||||
|
self.assertEqual({"s1", "s2"}, slugs)
|
||||||
|
|
||||||
|
def test_all_empty(self):
|
||||||
|
self.assertEqual([], self.store.all())
|
||||||
|
|
||||||
|
def test_bottle_names_preserved(self):
|
||||||
|
self.store.upsert(_record(bottle_names=["claude", "dev"]))
|
||||||
|
got = self.store.get("o", "r", 1)
|
||||||
|
assert got is not None
|
||||||
|
self.assertEqual(["claude", "dev"], got.bottle_names)
|
||||||
|
|
||||||
|
|
||||||
|
class MakeForgeTest(unittest.TestCase):
|
||||||
|
def test_returns_gitea_forge(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
config = _config(tmp)
|
||||||
|
with patch.dict(os.environ, {"GITEA_TOKEN": "tok"}):
|
||||||
|
forge = make_forge(config, "owner", "repo")
|
||||||
|
from bot_bottle.contrib.gitea.client import GiteaForge
|
||||||
|
self.assertIsInstance(forge, GiteaForge)
|
||||||
|
|
||||||
|
|
||||||
|
class MakeSidecarTest(unittest.TestCase):
|
||||||
|
def test_returns_forge_sidecar(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
config = _config(tmp)
|
||||||
|
with patch.dict(os.environ, {"GITEA_TOKEN": "tok"}):
|
||||||
|
sidecar = make_sidecar(config, "owner", "repo", 1, [])
|
||||||
|
from bot_bottle.orchestrator.sidecar import ForgeSidecar
|
||||||
|
self.assertIsInstance(sidecar, ForgeSidecar)
|
||||||
|
|
||||||
|
|
||||||
|
class BuildTest(unittest.TestCase):
|
||||||
|
def test_returns_server_watchdog_orchestrator(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
config = _config(tmp)
|
||||||
|
with patch.dict(os.environ, {"GITEA_TOKEN": "tok"}):
|
||||||
|
server, watchdog, orch = build(config)
|
||||||
|
server.server_close()
|
||||||
|
|
||||||
|
from bot_bottle.orchestrator.lifecycle import Orchestrator
|
||||||
|
from bot_bottle.orchestrator.watchdog import Watchdog
|
||||||
|
from bot_bottle.orchestrator.webhook import WebhookServer
|
||||||
|
self.assertIsInstance(server, WebhookServer)
|
||||||
|
self.assertIsInstance(watchdog, Watchdog)
|
||||||
|
self.assertIsInstance(orch, Orchestrator)
|
||||||
|
|
||||||
|
def test_server_binds_to_configured_host(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
config = _config(tmp)
|
||||||
|
with patch.dict(os.environ, {"GITEA_TOKEN": "tok"}):
|
||||||
|
server, _, _ = build(config)
|
||||||
|
host, port = server.server_address
|
||||||
|
server.server_close()
|
||||||
|
self.assertEqual("127.0.0.1", host)
|
||||||
|
self.assertGreater(port, 0)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
@@ -56,6 +56,10 @@ class ParseEventTest(unittest.TestCase):
|
|||||||
def test_pull_request_non_closed_ignored(self):
|
def test_pull_request_non_closed_ignored(self):
|
||||||
self.assertIsNone(parse_event("pull_request", {**_REPO, "action": "opened"}))
|
self.assertIsNone(parse_event("pull_request", {**_REPO, "action": "opened"}))
|
||||||
|
|
||||||
|
def test_comment_non_created_action_ignored(self):
|
||||||
|
payload = {**_REPO, "action": "edited", "issue": {}, "comment": {}}
|
||||||
|
self.assertIsNone(parse_event("issue_comment", payload))
|
||||||
|
|
||||||
def test_unknown_kind_ignored(self):
|
def test_unknown_kind_ignored(self):
|
||||||
self.assertIsNone(parse_event("push", {**_REPO}))
|
self.assertIsNone(parse_event("push", {**_REPO}))
|
||||||
|
|
||||||
|
|||||||
@@ -135,6 +135,29 @@ class LifecycleTest(unittest.TestCase):
|
|||||||
self.assertIn(("destroy", "impl-didericis-bot-bottle-17"), self.runner.calls)
|
self.assertIn(("destroy", "impl-didericis-bot-bottle-17"), self.runner.calls)
|
||||||
self.assertIsNone(self._record())
|
self.assertIsNone(self._record())
|
||||||
|
|
||||||
|
def test_comment_on_untracked_issue_ignored(self):
|
||||||
|
# No record in store and is_pull=False -> _route_comment returns None.
|
||||||
|
self.orch.handle(CommentCreated(
|
||||||
|
owner="didericis", repo="bot-bottle", issue_number=99,
|
||||||
|
comment_id=1, author="reviewer", body="hi", is_pull=False,
|
||||||
|
))
|
||||||
|
self.assertEqual([], self.runner.calls)
|
||||||
|
|
||||||
|
def test_pr_closed_untracked_pr_ignored(self):
|
||||||
|
# _find_by_pr finds nothing -> _on_pr_closed exits early.
|
||||||
|
self.orch.handle(PullRequestClosed(
|
||||||
|
owner="didericis", repo="bot-bottle", pr_number=999, merged=True,
|
||||||
|
))
|
||||||
|
self.assertEqual([], self.runner.calls)
|
||||||
|
|
||||||
|
|
||||||
|
class IsoNowTest(unittest.TestCase):
|
||||||
|
def test_returns_iso_string(self):
|
||||||
|
from bot_bottle.orchestrator.lifecycle import _iso_now
|
||||||
|
ts = _iso_now()
|
||||||
|
self.assertIsInstance(ts, str)
|
||||||
|
self.assertIn("T", ts)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -0,0 +1,88 @@
|
|||||||
|
"""Unit: __main__ CLI entry points (run and status commands)."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import io
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
from bot_bottle.orchestrator.__main__ import main
|
||||||
|
from bot_bottle.orchestrator.config import Config
|
||||||
|
from bot_bottle.orchestrator.model import RunRecord
|
||||||
|
|
||||||
|
|
||||||
|
def _config() -> Config:
|
||||||
|
return Config.from_env({"HOME": "/tmp"})
|
||||||
|
|
||||||
|
|
||||||
|
class MainRunTest(unittest.TestCase):
|
||||||
|
def test_run_delegates_to_bootstrap(self):
|
||||||
|
config = _config()
|
||||||
|
with patch.object(Config, "from_env", return_value=config), \
|
||||||
|
patch("bot_bottle.orchestrator.bootstrap.run") as mock_run:
|
||||||
|
rc = main(["run"])
|
||||||
|
self.assertEqual(0, rc)
|
||||||
|
mock_run.assert_called_once_with(config)
|
||||||
|
|
||||||
|
def test_run_prints_listen_address_to_stderr(self):
|
||||||
|
config = _config()
|
||||||
|
err = io.StringIO()
|
||||||
|
with patch.object(Config, "from_env", return_value=config), \
|
||||||
|
patch("bot_bottle.orchestrator.bootstrap.run"), \
|
||||||
|
patch("sys.stderr", err):
|
||||||
|
main(["run"])
|
||||||
|
self.assertIn(str(config.webhook_port), err.getvalue())
|
||||||
|
|
||||||
|
|
||||||
|
class MainStatusTest(unittest.TestCase):
|
||||||
|
def test_status_empty_store(self):
|
||||||
|
config = _config()
|
||||||
|
with patch.object(Config, "from_env", return_value=config), \
|
||||||
|
patch("bot_bottle.orchestrator.bootstrap.BotBottleStateStore") as MockStore:
|
||||||
|
MockStore.return_value.all.return_value = []
|
||||||
|
rc = main(["status"])
|
||||||
|
self.assertEqual(0, rc)
|
||||||
|
|
||||||
|
def test_status_prints_records(self):
|
||||||
|
config = _config()
|
||||||
|
rec = RunRecord(
|
||||||
|
owner="o", repo="r", issue_number=1, slug="my-slug",
|
||||||
|
agent_name="a", pr_number=7, status="frozen",
|
||||||
|
)
|
||||||
|
out = io.StringIO()
|
||||||
|
with patch.object(Config, "from_env", return_value=config), \
|
||||||
|
patch("bot_bottle.orchestrator.bootstrap.BotBottleStateStore") as MockStore, \
|
||||||
|
patch("sys.stdout", out):
|
||||||
|
MockStore.return_value.all.return_value = [rec]
|
||||||
|
rc = main(["status"])
|
||||||
|
self.assertEqual(0, rc)
|
||||||
|
self.assertIn("my-slug", out.getvalue())
|
||||||
|
self.assertIn("PR#7", out.getvalue())
|
||||||
|
|
||||||
|
def test_status_no_pr_prints_dash(self):
|
||||||
|
config = _config()
|
||||||
|
rec = RunRecord(
|
||||||
|
owner="o", repo="r", issue_number=2, slug="s2",
|
||||||
|
agent_name="a", pr_number=None, status="running",
|
||||||
|
)
|
||||||
|
out = io.StringIO()
|
||||||
|
with patch.object(Config, "from_env", return_value=config), \
|
||||||
|
patch("bot_bottle.orchestrator.bootstrap.BotBottleStateStore") as MockStore, \
|
||||||
|
patch("sys.stdout", out):
|
||||||
|
MockStore.return_value.all.return_value = [rec]
|
||||||
|
main(["status"])
|
||||||
|
self.assertIn("-", out.getvalue())
|
||||||
|
|
||||||
|
|
||||||
|
class MainArgparseTest(unittest.TestCase):
|
||||||
|
def test_no_command_exits(self):
|
||||||
|
with self.assertRaises(SystemExit):
|
||||||
|
main([])
|
||||||
|
|
||||||
|
def test_unknown_command_exits(self):
|
||||||
|
with self.assertRaises(SystemExit):
|
||||||
|
main(["bogus"])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
@@ -60,6 +60,22 @@ class SubprocessRunnerTest(unittest.TestCase):
|
|||||||
["/py", "/x/cli.py", "resume", "slug-1", "--headless", "--prompt",
|
["/py", "/x/cli.py", "resume", "slug-1", "--headless", "--prompt",
|
||||||
"address review"], self.argvs[0])
|
"address review"], self.argvs[0])
|
||||||
|
|
||||||
|
def test_destroy_calls_cleanup(self):
|
||||||
|
code = self.runner.destroy("slug-7")
|
||||||
|
self.assertEqual(0, code)
|
||||||
|
self.assertEqual(["/py", "/x/cli.py", "cleanup", "slug-7"], self.argvs[0])
|
||||||
|
|
||||||
|
|
||||||
|
class DefaultRunTest(unittest.TestCase):
|
||||||
|
def test_calls_subprocess_and_returns_code(self):
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
from bot_bottle.orchestrator.runner import _default_run
|
||||||
|
with patch("subprocess.run") as mock_run:
|
||||||
|
mock_run.return_value = MagicMock(returncode=42)
|
||||||
|
code = _default_run(["echo", "hi"], {"PATH": "/bin"})
|
||||||
|
self.assertEqual(42, code)
|
||||||
|
mock_run.assert_called_once_with(["echo", "hi"], env={"PATH": "/bin"}, check=False)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import dataclasses
|
||||||
import json
|
import json
|
||||||
import socket
|
import socket
|
||||||
import tempfile
|
import tempfile
|
||||||
@@ -12,6 +13,7 @@ from pathlib import Path
|
|||||||
from bot_bottle.orchestrator.sidecar import (
|
from bot_bottle.orchestrator.sidecar import (
|
||||||
ForgeSidecar,
|
ForgeSidecar,
|
||||||
OpLog,
|
OpLog,
|
||||||
|
_jsonable,
|
||||||
drain_done_events,
|
drain_done_events,
|
||||||
serve,
|
serve,
|
||||||
write_done_event,
|
write_done_event,
|
||||||
@@ -66,6 +68,29 @@ class SidecarDispatchTest(unittest.TestCase):
|
|||||||
self.assertFalse(resp["ok"])
|
self.assertFalse(resp["ok"])
|
||||||
|
|
||||||
|
|
||||||
|
class JsonableTest(unittest.TestCase):
|
||||||
|
def test_plain_value_passthrough(self):
|
||||||
|
self.assertEqual(42, _jsonable(42))
|
||||||
|
self.assertEqual("s", _jsonable("s"))
|
||||||
|
|
||||||
|
def test_dataclass_converted_to_dict(self):
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Thing:
|
||||||
|
x: int
|
||||||
|
y: str = "hi"
|
||||||
|
self.assertEqual({"x": 99, "y": "hi"}, _jsonable(Thing(x=99)))
|
||||||
|
|
||||||
|
def test_list_recursed(self):
|
||||||
|
self.assertEqual([1, 2, 3], _jsonable([1, 2, 3]))
|
||||||
|
|
||||||
|
def test_list_of_dataclasses(self):
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Item:
|
||||||
|
v: int
|
||||||
|
result = _jsonable([Item(v=1), Item(v=2)])
|
||||||
|
self.assertEqual([{"v": 1}, {"v": 2}], result)
|
||||||
|
|
||||||
|
|
||||||
class QueueTest(unittest.TestCase):
|
class QueueTest(unittest.TestCase):
|
||||||
def test_drain_removes_events(self):
|
def test_drain_removes_events(self):
|
||||||
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
|
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
|
||||||
@@ -76,8 +101,34 @@ class QueueTest(unittest.TestCase):
|
|||||||
def test_drain_missing_dir(self):
|
def test_drain_missing_dir(self):
|
||||||
self.assertEqual([], drain_done_events(Path("/nonexistent/queue")))
|
self.assertEqual([], drain_done_events(Path("/nonexistent/queue")))
|
||||||
|
|
||||||
|
def test_drain_skips_corrupted_file(self):
|
||||||
|
tmp = Path(self.enterContext(tempfile.TemporaryDirectory())) # pylint: disable=consider-using-with
|
||||||
|
(tmp / "done-bad.json").write_text("not json", encoding="utf-8")
|
||||||
|
events = drain_done_events(tmp)
|
||||||
|
self.assertEqual([], events)
|
||||||
|
# The corrupted file is removed by the finally block.
|
||||||
|
self.assertFalse((tmp / "done-bad.json").exists())
|
||||||
|
|
||||||
|
|
||||||
|
class OpLogReadTest(unittest.TestCase):
|
||||||
|
def test_read_missing_file_returns_empty(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
log = OpLog(Path(tmp) / "sub" / "ops.jsonl")
|
||||||
|
# File not written yet — read() should return [].
|
||||||
|
self.assertEqual([], log.read())
|
||||||
|
|
||||||
|
|
||||||
class SocketServerTest(unittest.TestCase):
|
class SocketServerTest(unittest.TestCase):
|
||||||
|
def _make_server(self, tmp: Path):
|
||||||
|
sock = tmp / "s.sock"
|
||||||
|
if len(str(sock)) > 100:
|
||||||
|
self.skipTest("temp socket path too long for AF_UNIX")
|
||||||
|
sidecar = ForgeSidecar(
|
||||||
|
forge=FakeForge(), op_log=OpLog(tmp / "ops.jsonl"),
|
||||||
|
queue_dir=tmp / "q", run_key=("o", "r", 1),
|
||||||
|
)
|
||||||
|
return serve(sidecar, sock), sock
|
||||||
|
|
||||||
def test_round_trip_over_unix_socket(self):
|
def test_round_trip_over_unix_socket(self):
|
||||||
tmp = tempfile.mkdtemp()
|
tmp = tempfile.mkdtemp()
|
||||||
sock = Path(tmp) / "s.sock"
|
sock = Path(tmp) / "s.sock"
|
||||||
@@ -104,5 +155,50 @@ class SocketServerTest(unittest.TestCase):
|
|||||||
self.assertEqual(3, resp["result"]["number"])
|
self.assertEqual(3, resp["result"]["number"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_handler_invalid_json_returns_error(self):
|
||||||
|
tmp = Path(tempfile.mkdtemp())
|
||||||
|
srv, sock = self._make_server(tmp)
|
||||||
|
t = threading.Thread(target=srv.handle_request, daemon=True)
|
||||||
|
t.start()
|
||||||
|
try:
|
||||||
|
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
client.connect(str(sock))
|
||||||
|
client.sendall(b"not valid json!\n")
|
||||||
|
line = client.makefile().readline()
|
||||||
|
client.close()
|
||||||
|
finally:
|
||||||
|
t.join(timeout=5)
|
||||||
|
srv.server_close()
|
||||||
|
resp = json.loads(line)
|
||||||
|
self.assertFalse(resp["ok"])
|
||||||
|
self.assertIn("invalid json", resp["error"])
|
||||||
|
|
||||||
|
def test_handler_empty_line_closes_silently(self):
|
||||||
|
tmp = Path(tempfile.mkdtemp())
|
||||||
|
srv, sock = self._make_server(tmp)
|
||||||
|
t = threading.Thread(target=srv.handle_request, daemon=True)
|
||||||
|
t.start()
|
||||||
|
try:
|
||||||
|
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
client.connect(str(sock))
|
||||||
|
client.close() # immediate EOF -> readline() returns b""
|
||||||
|
finally:
|
||||||
|
t.join(timeout=5)
|
||||||
|
srv.server_close()
|
||||||
|
|
||||||
|
def test_serve_removes_existing_socket_path(self):
|
||||||
|
tmp = Path(tempfile.mkdtemp())
|
||||||
|
sock = tmp / "existing.sock"
|
||||||
|
if len(str(sock)) > 100:
|
||||||
|
self.skipTest("temp socket path too long for AF_UNIX")
|
||||||
|
sock.touch() # pre-existing file at socket path
|
||||||
|
sidecar = ForgeSidecar(
|
||||||
|
forge=FakeForge(), op_log=OpLog(tmp / "ops.jsonl"),
|
||||||
|
queue_dir=tmp / "q", run_key=("o", "r", 1),
|
||||||
|
)
|
||||||
|
srv = serve(sidecar, sock) # should unlink the pre-existing file
|
||||||
|
srv.server_close()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -2,7 +2,9 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
import unittest
|
import unittest
|
||||||
|
import unittest.mock
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from bot_bottle.orchestrator.model import STATUS_FROZEN, STATUS_RUNNING, RunRecord
|
from bot_bottle.orchestrator.model import STATUS_FROZEN, STATUS_RUNNING, RunRecord
|
||||||
@@ -61,6 +63,16 @@ class WatchdogSweepTest(unittest.TestCase):
|
|||||||
self.wd.start()
|
self.wd.start()
|
||||||
self.wd.stop()
|
self.wd.stop()
|
||||||
|
|
||||||
|
def test_loop_sweeps_stale_record(self):
|
||||||
|
# Patch tick to near-zero so the loop iterates quickly.
|
||||||
|
stale = (_NOW - timedelta(hours=1)).isoformat()
|
||||||
|
self.store.upsert(_record(5, STATUS_RUNNING, stale))
|
||||||
|
with unittest.mock.patch("bot_bottle.orchestrator.watchdog._TICK_SECS", 0.01):
|
||||||
|
self.wd.start()
|
||||||
|
time.sleep(0.05) # enough for several iterations at 0.01s tick
|
||||||
|
self.wd.stop()
|
||||||
|
self.assertEqual(STATUS_FROZEN, self.store.get("o", "r", 5).status)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -150,6 +150,12 @@ class WebhookServerTest(unittest.TestCase):
|
|||||||
self._get("/provenance?owner=x&repo=y&issue=1")
|
self._get("/provenance?owner=x&repo=y&issue=1")
|
||||||
self.assertEqual(404, ctx.exception.code)
|
self.assertEqual(404, ctx.exception.code)
|
||||||
|
|
||||||
|
def test_unknown_get_path_404(self):
|
||||||
|
self._serve()
|
||||||
|
with self.assertRaises(HTTPError) as ctx:
|
||||||
|
self._get("/nope")
|
||||||
|
self.assertEqual(404, ctx.exception.code)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -0,0 +1,158 @@
|
|||||||
|
"""Unit: GiteaClient and GiteaForge (urllib mocked — no network)."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import unittest
|
||||||
|
import urllib.error
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
from bot_bottle.contrib.gitea.client import GiteaClient, GiteaForge
|
||||||
|
|
||||||
|
|
||||||
|
def _client() -> GiteaClient:
|
||||||
|
return GiteaClient(api_url="http://g/api/v1", owner="o", repo="r", token="tok")
|
||||||
|
|
||||||
|
|
||||||
|
def _mock_response(body: bytes) -> MagicMock:
|
||||||
|
resp = MagicMock()
|
||||||
|
resp.read.return_value = body
|
||||||
|
resp.__enter__ = lambda s: s
|
||||||
|
resp.__exit__ = MagicMock(return_value=False)
|
||||||
|
return resp
|
||||||
|
|
||||||
|
|
||||||
|
class GiteaClientTest(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.client = _client()
|
||||||
|
|
||||||
|
def test_request_returns_parsed_json(self):
|
||||||
|
payload = {"number": 42}
|
||||||
|
resp = _mock_response(json.dumps(payload).encode())
|
||||||
|
with patch("urllib.request.urlopen", return_value=resp):
|
||||||
|
result = self.client._request("GET", "/repos/o/r/issues/42")
|
||||||
|
self.assertEqual(payload, result)
|
||||||
|
|
||||||
|
def test_request_empty_body_returns_none(self):
|
||||||
|
resp = _mock_response(b"")
|
||||||
|
with patch("urllib.request.urlopen", return_value=resp):
|
||||||
|
result = self.client._request("POST", "/some/path", {"x": 1})
|
||||||
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
def test_is_org_member_true_on_200(self):
|
||||||
|
mock_resp = MagicMock()
|
||||||
|
mock_resp.close = MagicMock()
|
||||||
|
with patch("urllib.request.urlopen", return_value=mock_resp):
|
||||||
|
self.assertTrue(self.client.is_org_member("myorg", "alice"))
|
||||||
|
|
||||||
|
def test_is_org_member_false_on_http_error(self):
|
||||||
|
with patch(
|
||||||
|
"urllib.request.urlopen",
|
||||||
|
side_effect=urllib.error.HTTPError("url", 404, "Not Found", {}, None),
|
||||||
|
):
|
||||||
|
self.assertFalse(self.client.is_org_member("myorg", "nobody"))
|
||||||
|
|
||||||
|
def test_get_issue(self):
|
||||||
|
resp = _mock_response(json.dumps({"number": 1}).encode())
|
||||||
|
with patch("urllib.request.urlopen", return_value=resp):
|
||||||
|
result = self.client.get_issue(1)
|
||||||
|
self.assertEqual(1, result["number"])
|
||||||
|
|
||||||
|
def test_get_pull(self):
|
||||||
|
resp = _mock_response(json.dumps({"number": 7, "merged": False}).encode())
|
||||||
|
with patch("urllib.request.urlopen", return_value=resp):
|
||||||
|
result = self.client.get_pull(7)
|
||||||
|
self.assertEqual(7, result["number"])
|
||||||
|
|
||||||
|
def test_list_comments(self):
|
||||||
|
resp = _mock_response(json.dumps([{"id": 1, "body": "hi"}]).encode())
|
||||||
|
with patch("urllib.request.urlopen", return_value=resp):
|
||||||
|
result = self.client.list_comments(1)
|
||||||
|
self.assertEqual(1, len(result))
|
||||||
|
self.assertEqual(1, result[0]["id"])
|
||||||
|
|
||||||
|
def test_create_comment(self):
|
||||||
|
resp = _mock_response(b"")
|
||||||
|
with patch("urllib.request.urlopen", return_value=resp) as mock_open:
|
||||||
|
self.client.create_comment(1, "hello")
|
||||||
|
mock_open.assert_called_once()
|
||||||
|
|
||||||
|
def test_update_issue(self):
|
||||||
|
resp = _mock_response(b"")
|
||||||
|
with patch("urllib.request.urlopen", return_value=resp) as mock_open:
|
||||||
|
self.client.update_issue(1, "new body")
|
||||||
|
mock_open.assert_called_once()
|
||||||
|
|
||||||
|
def test_request_builds_correct_url(self):
|
||||||
|
captured: list[object] = []
|
||||||
|
|
||||||
|
def fake_urlopen(req, timeout):
|
||||||
|
captured.append(req)
|
||||||
|
return _mock_response(b"{}")
|
||||||
|
|
||||||
|
with patch("urllib.request.urlopen", side_effect=fake_urlopen):
|
||||||
|
self.client.get_issue(5)
|
||||||
|
|
||||||
|
import urllib.request as ureq
|
||||||
|
req = captured[0]
|
||||||
|
assert isinstance(req, ureq.Request)
|
||||||
|
self.assertIn("/issues/5", req.full_url)
|
||||||
|
|
||||||
|
def test_request_sends_auth_header(self):
|
||||||
|
captured: list[object] = []
|
||||||
|
|
||||||
|
def fake_urlopen(req, timeout):
|
||||||
|
captured.append(req)
|
||||||
|
return _mock_response(b"{}")
|
||||||
|
|
||||||
|
with patch("urllib.request.urlopen", side_effect=fake_urlopen):
|
||||||
|
self.client.get_issue(1)
|
||||||
|
|
||||||
|
import urllib.request as ureq
|
||||||
|
req = captured[0]
|
||||||
|
assert isinstance(req, ureq.Request)
|
||||||
|
self.assertEqual("token tok", req.get_header("Authorization"))
|
||||||
|
|
||||||
|
|
||||||
|
class GiteaForgeTest(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.client = MagicMock(spec=GiteaClient)
|
||||||
|
self.forge = GiteaForge(self.client)
|
||||||
|
|
||||||
|
def test_is_org_member_delegates(self):
|
||||||
|
self.client.is_org_member.return_value = True
|
||||||
|
self.assertTrue(self.forge.is_org_member("org", "alice"))
|
||||||
|
self.client.is_org_member.assert_called_once_with("org", "alice")
|
||||||
|
|
||||||
|
def test_is_org_member_false(self):
|
||||||
|
self.client.is_org_member.return_value = False
|
||||||
|
self.assertFalse(self.forge.is_org_member("org", "outsider"))
|
||||||
|
|
||||||
|
def test_read_issue_delegates(self):
|
||||||
|
self.client.get_issue.return_value = {"number": 3}
|
||||||
|
self.assertEqual({"number": 3}, self.forge.read_issue(3))
|
||||||
|
self.client.get_issue.assert_called_once_with(3)
|
||||||
|
|
||||||
|
def test_read_pr_delegates(self):
|
||||||
|
self.client.get_pull.return_value = {"number": 5, "merged": False}
|
||||||
|
result = self.forge.read_pr(5)
|
||||||
|
self.assertEqual(5, result["number"])
|
||||||
|
self.client.get_pull.assert_called_once_with(5)
|
||||||
|
|
||||||
|
def test_read_comments_delegates(self):
|
||||||
|
self.client.list_comments.return_value = [{"id": 1}]
|
||||||
|
comments = self.forge.read_comments(1)
|
||||||
|
self.assertEqual([{"id": 1}], comments)
|
||||||
|
self.client.list_comments.assert_called_once_with(1)
|
||||||
|
|
||||||
|
def test_post_comment_delegates(self):
|
||||||
|
self.forge.post_comment(1, "looks good")
|
||||||
|
self.client.create_comment.assert_called_once_with(1, "looks good")
|
||||||
|
|
||||||
|
def test_update_description_delegates(self):
|
||||||
|
self.forge.update_description(1, "updated body")
|
||||||
|
self.client.update_issue.assert_called_once_with(1, "updated body")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user