test(supervise): update edge cases for sqlite storage
lint / lint (push) Successful in 1m55s
test / unit (pull_request) Successful in 53s
test / integration (pull_request) Successful in 20s
test / coverage (pull_request) Successful in 1m10s

This commit is contained in:
2026-07-01 16:57:45 +00:00
parent 08918f9a8a
commit f1b8bbdfa1
+51 -38
View File
@@ -4,7 +4,6 @@ fallback paths."""
from __future__ import annotations from __future__ import annotations
import os
import tempfile import tempfile
import time import time
import unittest import unittest
@@ -13,13 +12,16 @@ from unittest.mock import patch
from bot_bottle import supervise from bot_bottle import supervise
from bot_bottle.supervise import ( from bot_bottle.supervise import (
AuditEntry,
Proposal, Proposal,
STATUS_APPROVED,
TOOL_EGRESS_ALLOW, TOOL_EGRESS_ALLOW,
list_pending_proposals, list_pending_proposals,
read_audit_entries, read_audit_entries,
read_proposal, read_proposal,
read_response, read_response,
wait_for_response, wait_for_response,
write_audit_entry,
) )
@@ -40,29 +42,29 @@ class TestPathHelpers(unittest.TestCase):
def test_queue_dir_for_slug(self) -> None: def test_queue_dir_for_slug(self) -> None:
self.assertIn("slug", str(supervise.queue_dir_for_slug("slug"))) self.assertIn("slug", str(supervise.queue_dir_for_slug("slug")))
def test_id_from_non_proposal_filename(self) -> None: def test_queue_db_path_for_slug_dir(self) -> None:
self.assertIsNone(supervise._id_from_proposal_filename(Path("x.response.json"))) self.assertEqual(
Path("/tmp/queue/supervise.db"),
supervise.queue_db_path(Path("/tmp/queue")),
)
class TestReadMalformed(unittest.TestCase): class TestReadMalformed(unittest.TestCase):
def test_read_proposal_non_dict(self) -> None: def test_read_proposal_missing_row(self) -> None:
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
(Path(d) / "p.proposal.json").write_text("[]") with self.assertRaises(FileNotFoundError):
with self.assertRaises(ValueError):
read_proposal(Path(d), "p") read_proposal(Path(d), "p")
def test_read_response_non_dict(self) -> None: def test_read_response_missing_row(self) -> None:
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
(Path(d) / "p.response.json").write_text("[]") with self.assertRaises(FileNotFoundError):
with self.assertRaises(ValueError):
read_response(Path(d), "p") read_response(Path(d), "p")
def test_list_pending_skips_malformed(self) -> None: def test_list_pending_ignores_legacy_json_files(self) -> None:
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
qd = Path(d) qd = Path(d)
(qd / "bad.proposal.json").write_text("{ not json") (qd / "bad.proposal.json").write_text("{ not json")
(qd / "arr.proposal.json").write_text("[]") (qd / "arr.proposal.json").write_text("[]")
(qd / "incomplete.proposal.json").write_text("{}") # from_dict raises
supervise.write_proposal(qd, _proposal()) # one valid supervise.write_proposal(qd, _proposal()) # one valid
pending = list_pending_proposals(qd) pending = list_pending_proposals(qd)
self.assertEqual(1, len(pending)) self.assertEqual(1, len(pending))
@@ -73,18 +75,21 @@ class TestReadMalformed(unittest.TestCase):
qd = Path(d) qd = Path(d)
p = _proposal() p = _proposal()
supervise.write_proposal(qd, p) supervise.write_proposal(qd, p)
(qd / f"{p.id}.response.json").write_text("{}") # response exists -> skipped supervise.write_response(qd, supervise.Response(
proposal_id=p.id,
status=STATUS_APPROVED,
notes="",
))
self.assertEqual([], list_pending_proposals(qd)) self.assertEqual([], list_pending_proposals(qd))
class TestWaitForResponse(unittest.TestCase): class TestWaitForResponse(unittest.TestCase):
def test_malformed_response_then_timeout(self) -> None: def test_missing_response_times_out(self) -> None:
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
(Path(d) / "p.response.json").write_text("{ not json")
with self.assertRaises(TimeoutError): with self.assertRaises(TimeoutError):
wait_for_response(Path(d), "p", deadline=time.monotonic()) wait_for_response(Path(d), "p", deadline=time.monotonic())
def test_incomplete_response_then_timeout(self) -> None: def test_legacy_response_file_does_not_count(self) -> None:
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
(Path(d) / "p.response.json").write_text("{}") # dict but from_dict raises (Path(d) / "p.response.json").write_text("{}") # dict but from_dict raises
with self.assertRaises(TimeoutError): with self.assertRaises(TimeoutError):
@@ -97,35 +102,43 @@ class TestReadAuditEntries(unittest.TestCase):
patch.dict("os.environ", {"HOME": home}): patch.dict("os.environ", {"HOME": home}):
self.assertEqual([], read_audit_entries("egress", "nope")) self.assertEqual([], read_audit_entries("egress", "nope"))
def test_skips_malformed_lines(self) -> None: def test_reads_entries_from_db(self) -> None:
with tempfile.TemporaryDirectory() as home, \ with tempfile.TemporaryDirectory() as home, \
patch.dict("os.environ", {"HOME": home}): patch.dict("os.environ", {"HOME": home}):
path = supervise.audit_log_path("egress", "slug") write_audit_entry(AuditEntry(
path.parent.mkdir(parents=True, exist_ok=True) timestamp="t",
valid = ( bottle_slug="slug",
'{"timestamp": "t", "bottle_slug": "slug", "component": "egress",' component="egress",
' "operator_action": "approve", "operator_notes": "",' operator_action="approve",
' "justification": "", "diff": ""}' operator_notes="",
) justification="",
path.write_text( diff="",
"\n" # blank line skipped ))
"{ not json\n" # JSONDecodeError skipped write_audit_entry(AuditEntry(
"[]\n" # not a dict skipped timestamp="t",
"{}\n" # missing fields -> ValueError skipped bottle_slug="other",
+ valid + "\n" component="egress",
) operator_action="reject",
operator_notes="",
justification="",
diff="",
))
entries = read_audit_entries("egress", "slug") entries = read_audit_entries("egress", "slug")
self.assertEqual(1, len(entries)) self.assertEqual(1, len(entries))
self.assertEqual("approve", entries[0].operator_action) self.assertEqual("approve", entries[0].operator_action)
def test_legacy_audit_log_file_does_not_count(self) -> None:
class TestFlockFallback(unittest.TestCase): with tempfile.TemporaryDirectory() as home, \
def test_flock_on_closed_fd_is_swallowed(self) -> None: patch.dict("os.environ", {"HOME": home}):
# flock on a closed fd raises OSError(EBADF), which the helpers swallow. path = supervise.audit_log_path("egress", "slug")
fd = os.open(os.devnull, os.O_RDONLY) path.parent.mkdir(parents=True, exist_ok=True)
os.close(fd) path.write_text(
supervise._try_flock(fd) '{"timestamp": "t", "bottle_slug": "slug", "component": "egress",'
supervise._try_funlock(fd) ' "operator_action": "approve", "operator_notes": "",'
' "justification": "", "diff": ""}\n'
)
entries = read_audit_entries("egress", "slug")
self.assertEqual([], entries)
if __name__ == "__main__": if __name__ == "__main__":