diff --git a/claude_bottle/supervise.py b/claude_bottle/supervise.py new file mode 100644 index 0000000..f0ed4c2 --- /dev/null +++ b/claude_bottle/supervise.py @@ -0,0 +1,499 @@ +"""Per-bottle supervise plane (PRD 0013). + +The supervise plane is the per-bottle MCP sidecar plus its host-side +queue/audit support. The sidecar (claude_bottle.supervise_server) +sits on the bottle's internal network and exposes three MCP tools the +agent calls when it hits a stuck-recovery category: + + * cred-proxy-block — agent proposes a new routes.json + * pipelock-block — agent proposes a new pipelock allowlist + * capability-block — agent proposes a new agent Dockerfile + +Each tool call: the agent passes the full proposed file plus a +justification text. The sidecar validates the proposal syntactically, +writes it to the host's per-bottle queue dir, and holds the tool-call +connection open. The operator's TUI dashboard +(claude_bottle.cli.dashboard) sees the proposal, accepts +approve / modify / reject, and writes a response file alongside the +proposal. The sidecar sees the response and returns `{status, notes}` +to the agent. + +This module defines the host-side library: dataclasses for the queue +file shapes, queue read/write helpers, the audit log writer, and the +diff renderer. The in-container sidecar lives in +claude_bottle/supervise_server.py; the Docker lifecycle in +claude_bottle/backend/docker/supervise.py. + +For 0013 the supervisor's approval handlers are deliberately no-ops: +on approval the audit log is written and the response file is +delivered to the agent, but no host-side config change happens. The +remediation engines that wire real config changes land in PRDs 0014, +0015, and 0016. +""" + +from __future__ import annotations + +import dataclasses +import difflib +import hashlib +import json +import os +import time +import uuid +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path + + +SUPERVISE_HOSTNAME = "supervise" +SUPERVISE_PORT = 9100 + +TOOL_CRED_PROXY_BLOCK = "cred-proxy-block" +TOOL_PIPELOCK_BLOCK = "pipelock-block" +TOOL_CAPABILITY_BLOCK = "capability-block" +TOOLS: tuple[str, ...] = ( + TOOL_CRED_PROXY_BLOCK, + TOOL_PIPELOCK_BLOCK, + TOOL_CAPABILITY_BLOCK, +) + +# capability-block has no on-disk config the operator edits in place +# (the Dockerfile is rebuilt, not patched), so it has no audit log +# here — those changes are captured by git history + the rebuild +# record laid down in PRD 0016. +COMPONENT_FOR_TOOL: dict[str, str] = { + TOOL_CRED_PROXY_BLOCK: "cred-proxy", + TOOL_PIPELOCK_BLOCK: "pipelock", +} + +STATUS_APPROVED = "approved" +STATUS_MODIFIED = "modified" +STATUS_REJECTED = "rejected" +STATUSES: tuple[str, ...] = (STATUS_APPROVED, STATUS_MODIFIED, STATUS_REJECTED) + +# Operator-initiated audit entries (no tool call). PRD 0014's +# `routes edit ` and PRD 0015's `pipelock edit ` +# verbs write entries with this action. +ACTION_OPERATOR_EDIT = "operator-edit" + +QUEUE_DIR_IN_CONTAINER = "/run/supervise/queue" +CURRENT_CONFIG_DIR_IN_AGENT = "/etc/claude-bottle/current-config" + +DEFAULT_POLL_INTERVAL_SEC = 0.5 + + +# --- Paths ----------------------------------------------------------------- + + +def claude_bottle_root() -> Path: + return Path.home() / ".claude-bottle" + + +def queue_dir_for_slug(slug: str) -> Path: + return claude_bottle_root() / "queue" / slug + + +def audit_dir() -> Path: + return claude_bottle_root() / "audit" + + +def audit_log_path(component: str, slug: str) -> Path: + return audit_dir() / f"{component}-{slug}.log" + + +# --- Dataclasses ----------------------------------------------------------- + + +@dataclass(frozen=True) +class Proposal: + """One pending tool-call from the agent. The sidecar writes one + of these to the queue dir on a tool call; the operator's TUI + reads them; the sidecar polls for a matching Response.""" + + id: str + bottle_slug: str + tool: str + proposed_file: str + justification: str + arrival_timestamp: str + current_file_hash: str + + @classmethod + def new( + cls, + *, + bottle_slug: str, + tool: str, + proposed_file: str, + justification: str, + current_file_hash: str, + now: datetime | None = None, + ) -> "Proposal": + ts = (now or datetime.now(timezone.utc)).isoformat() + return cls( + id=str(uuid.uuid4()), + bottle_slug=bottle_slug, + tool=tool, + proposed_file=proposed_file, + justification=justification, + arrival_timestamp=ts, + current_file_hash=current_file_hash, + ) + + def to_dict(self) -> dict[str, object]: + return dataclasses.asdict(self) + + @classmethod + def from_dict(cls, raw: dict[str, object]) -> "Proposal": + tool = _require_str(raw, "tool") + if tool not in TOOLS: + raise ValueError(f"tool must be one of {TOOLS}; got {tool!r}") + return cls( + id=_require_str(raw, "id"), + bottle_slug=_require_str(raw, "bottle_slug"), + tool=tool, + proposed_file=_require_str(raw, "proposed_file"), + justification=_require_str(raw, "justification"), + arrival_timestamp=_require_str(raw, "arrival_timestamp"), + current_file_hash=_require_str(raw, "current_file_hash"), + ) + + +@dataclass(frozen=True) +class Response: + """The operator's decision on a proposal. The TUI writes one of + these to the queue dir; the sidecar reads it and returns the + `{status, notes}` pair to the agent's tool call. + + `final_file` carries the file content the supervisor will + actually apply: for `approved`, equal to the proposal's + `proposed_file`; for `modified`, the operator's edited version + (the audit diff is current → final_file, not current → + proposed_file); for `rejected`, None.""" + + proposal_id: str + status: str + notes: str + final_file: str | None = None + + def to_dict(self) -> dict[str, object]: + return dataclasses.asdict(self) + + @classmethod + def from_dict(cls, raw: dict[str, object]) -> "Response": + status = _require_str(raw, "status") + if status not in STATUSES: + raise ValueError( + f"response status must be one of {STATUSES}; got {status!r}" + ) + final = raw.get("final_file") + if final is not None and not isinstance(final, str): + raise ValueError( + f"final_file must be a string or null; got {type(final).__name__}" + ) + return cls( + proposal_id=_require_str(raw, "proposal_id"), + status=status, + notes=_require_str(raw, "notes"), + final_file=final, + ) + + +@dataclass(frozen=True) +class AuditEntry: + """One row of the per-bottle audit log. JSON-Lines, append-only.""" + + timestamp: str + bottle_slug: str + component: str + operator_action: str + operator_notes: str + justification: str + diff: str + + def to_dict(self) -> dict[str, object]: + return dataclasses.asdict(self) + + +# --- Queue I/O ------------------------------------------------------------- + + +def _proposal_filename(proposal_id: str) -> str: + return f"{proposal_id}.proposal.json" + + +def _response_filename(proposal_id: str) -> str: + return f"{proposal_id}.response.json" + + +def _id_from_proposal_filename(path: Path) -> str | None: + name = path.name + if not name.endswith(".proposal.json"): + return None + return name[: -len(".proposal.json")] + + +def write_proposal(queue_dir: Path, proposal: Proposal) -> Path: + """Persist `proposal` as JSON in the queue dir, mode 0o600. + Directory is created if missing.""" + queue_dir.mkdir(parents=True, exist_ok=True) + path = queue_dir / _proposal_filename(proposal.id) + payload = json.dumps(proposal.to_dict(), indent=2) + "\n" + _atomic_write(path, payload, mode=0o600) + return path + + +def read_proposal(queue_dir: Path, proposal_id: str) -> Proposal: + path = queue_dir / _proposal_filename(proposal_id) + with path.open() as f: + raw = json.load(f) + if not isinstance(raw, dict): + raise ValueError(f"{path}: top-level must be an object") + return Proposal.from_dict(raw) + + +def list_pending_proposals(queue_dir: Path) -> list[Proposal]: + """All proposals in `queue_dir` that do not yet have a matching + response file. Sorted by `arrival_timestamp` so the operator + sees the queue FIFO.""" + if not queue_dir.is_dir(): + return [] + out: list[Proposal] = [] + for path in sorted(queue_dir.glob("*.proposal.json")): + proposal_id = _id_from_proposal_filename(path) + if proposal_id is None: + continue + if (queue_dir / _response_filename(proposal_id)).exists(): + continue + try: + with path.open() as f: + raw = json.load(f) + except (OSError, json.JSONDecodeError): + continue + if not isinstance(raw, dict): + continue + try: + out.append(Proposal.from_dict(raw)) + except (KeyError, ValueError): + continue + out.sort(key=lambda p: p.arrival_timestamp) + return out + + +def write_response(queue_dir: Path, response: Response) -> Path: + queue_dir.mkdir(parents=True, exist_ok=True) + path = queue_dir / _response_filename(response.proposal_id) + payload = json.dumps(response.to_dict(), indent=2) + "\n" + _atomic_write(path, payload, mode=0o600) + return path + + +def read_response(queue_dir: Path, proposal_id: str) -> Response: + path = queue_dir / _response_filename(proposal_id) + with path.open() as f: + raw = json.load(f) + if not isinstance(raw, dict): + raise ValueError(f"{path}: top-level must be an object") + return Response.from_dict(raw) + + +def wait_for_response( + queue_dir: Path, + proposal_id: str, + *, + poll_interval: float = DEFAULT_POLL_INTERVAL_SEC, + deadline: float | None = None, +) -> Response: + """Block until a response file appears for `proposal_id`, then + return it. `deadline` is an absolute time.monotonic() value after + which the wait raises TimeoutError. None waits forever — the + natural shape, since the operator's response time is unbounded. + + Polls the filesystem so the implementation stays portable and + stdlib-only.""" + path = queue_dir / _response_filename(proposal_id) + while True: + if path.exists(): + try: + with path.open() as f: + raw = json.load(f) + except (OSError, json.JSONDecodeError): + raw = None + if isinstance(raw, dict): + try: + return Response.from_dict(raw) + except (KeyError, ValueError): + pass + if deadline is not None and time.monotonic() >= deadline: + raise TimeoutError(f"no response for proposal {proposal_id!r}") + time.sleep(poll_interval) + + +def archive_proposal(queue_dir: Path, proposal_id: str) -> None: + """Move both proposal and response files to `/processed/`. + Idempotent — missing files are silently skipped.""" + processed = queue_dir / "processed" + processed.mkdir(parents=True, exist_ok=True) + for name in (_proposal_filename(proposal_id), _response_filename(proposal_id)): + src = queue_dir / name + if src.exists(): + src.rename(processed / name) + + +# --- Audit log ------------------------------------------------------------- + + +def write_audit_entry(entry: AuditEntry) -> Path: + """Append `entry` as one JSON-Lines record to the per-bottle + audit log. Acquires an advisory exclusive lock so concurrent + writers don't interleave bytes.""" + path = audit_log_path(entry.component, entry.bottle_slug) + path.parent.mkdir(parents=True, exist_ok=True) + line = json.dumps(entry.to_dict(), sort_keys=False) + "\n" + fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o600) + try: + _try_flock(fd) + try: + os.write(fd, line.encode("utf-8")) + finally: + _try_funlock(fd) + finally: + os.close(fd) + return path + + +def read_audit_entries(component: str, slug: str) -> list[AuditEntry]: + """Load all audit entries for the given component+slug. Empty + list if the log doesn't exist.""" + path = audit_log_path(component, slug) + if not path.is_file(): + return [] + out: list[AuditEntry] = [] + with path.open() as f: + for raw_line in f: + raw_line = raw_line.strip() + if not raw_line: + continue + try: + raw = json.loads(raw_line) + except json.JSONDecodeError: + continue + if not isinstance(raw, dict): + continue + try: + out.append(AuditEntry( + timestamp=_require_str(raw, "timestamp"), + bottle_slug=_require_str(raw, "bottle_slug"), + component=_require_str(raw, "component"), + operator_action=_require_str(raw, "operator_action"), + operator_notes=_require_str(raw, "operator_notes"), + justification=_require_str(raw, "justification"), + diff=_require_str(raw, "diff"), + )) + except ValueError: + continue + return out + + +# --- Diff rendering -------------------------------------------------------- + + +def render_diff(before: str, after: str, *, label: str = "config") -> str: + """Unified diff suitable for the audit log + TUI. Empty diff (no + changes) renders as the empty string.""" + diff = difflib.unified_diff( + before.splitlines(keepends=True), + after.splitlines(keepends=True), + fromfile=f"{label} (current)", + tofile=f"{label} (proposed)", + lineterm="", + ) + parts = list(diff) + if not parts: + return "" + return "".join(p if p.endswith("\n") else p + "\n" for p in parts).rstrip("\n") + + +def sha256_hex(content: str) -> str: + return hashlib.sha256(content.encode("utf-8")).hexdigest() + + +# --- Helpers --------------------------------------------------------------- + + +def _require_str(raw: dict[str, object], key: str) -> str: + value = raw.get(key) + if not isinstance(value, str): + raise ValueError(f"missing or non-string field {key!r}") + return value + + +def _atomic_write(path: Path, content: str, *, mode: int) -> None: + """Atomic: write to a sibling tmp file, fsync, rename.""" + tmp = path.with_suffix(path.suffix + ".tmp") + fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode) + try: + os.write(fd, content.encode("utf-8")) + os.fsync(fd) + finally: + os.close(fd) + os.replace(tmp, path) + + +try: + import fcntl as _fcntl + + def _try_flock(fd: int) -> None: + try: + _fcntl.flock(fd, _fcntl.LOCK_EX) + except OSError: + pass + + def _try_funlock(fd: int) -> None: + try: + _fcntl.flock(fd, _fcntl.LOCK_UN) + except OSError: + pass +except ImportError: # pragma: no cover — Windows path + def _try_flock(fd: int) -> None: + return None + + def _try_funlock(fd: int) -> None: + return None + + +__all__ = [ + "ACTION_OPERATOR_EDIT", + "AuditEntry", + "COMPONENT_FOR_TOOL", + "CURRENT_CONFIG_DIR_IN_AGENT", + "DEFAULT_POLL_INTERVAL_SEC", + "Proposal", + "QUEUE_DIR_IN_CONTAINER", + "Response", + "STATUSES", + "STATUS_APPROVED", + "STATUS_MODIFIED", + "STATUS_REJECTED", + "SUPERVISE_HOSTNAME", + "SUPERVISE_PORT", + "TOOLS", + "TOOL_CAPABILITY_BLOCK", + "TOOL_CRED_PROXY_BLOCK", + "TOOL_PIPELOCK_BLOCK", + "archive_proposal", + "audit_dir", + "audit_log_path", + "claude_bottle_root", + "list_pending_proposals", + "queue_dir_for_slug", + "read_audit_entries", + "read_proposal", + "read_response", + "render_diff", + "sha256_hex", + "wait_for_response", + "write_audit_entry", + "write_proposal", + "write_response", +] diff --git a/tests/unit/test_supervise.py b/tests/unit/test_supervise.py new file mode 100644 index 0000000..700ba48 --- /dev/null +++ b/tests/unit/test_supervise.py @@ -0,0 +1,328 @@ +"""Unit: supervise queue + audit log + diff helpers (PRD 0013).""" + +import json +import tempfile +import threading +import time +import unittest +from datetime import datetime, timezone +from pathlib import Path + +from claude_bottle import supervise +from claude_bottle.supervise import ( + AuditEntry, + Proposal, + Response, + STATUS_APPROVED, + STATUS_MODIFIED, + STATUS_REJECTED, + TOOL_CAPABILITY_BLOCK, + TOOL_CRED_PROXY_BLOCK, + TOOL_PIPELOCK_BLOCK, + archive_proposal, + audit_log_path, + list_pending_proposals, + read_audit_entries, + read_proposal, + read_response, + render_diff, + sha256_hex, + wait_for_response, + write_audit_entry, + write_proposal, + write_response, +) + + +FIXED_TS = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc) + + +def _proposal(tool: str = TOOL_CRED_PROXY_BLOCK, proposed: str = "{}", justification: str = "need a route") -> Proposal: + return Proposal.new( + bottle_slug="dev", + tool=tool, + proposed_file=proposed, + justification=justification, + current_file_hash=sha256_hex("{}"), + now=FIXED_TS, + ) + + +class TestProposalRoundtrip(unittest.TestCase): + def test_new_stamps_uuid_and_iso_timestamp(self): + p = _proposal() + self.assertTrue(p.id) + self.assertEqual("2026-05-25T12:00:00+00:00", p.arrival_timestamp) + self.assertEqual("dev", p.bottle_slug) + self.assertEqual(TOOL_CRED_PROXY_BLOCK, p.tool) + + def test_to_from_dict_roundtrip(self): + p = _proposal() + self.assertEqual(p, Proposal.from_dict(p.to_dict())) + + def test_from_dict_rejects_unknown_tool(self): + raw = _proposal().to_dict() + raw["tool"] = "not-a-real-tool" + with self.assertRaises(ValueError): + Proposal.from_dict(raw) + + def test_from_dict_rejects_missing_field(self): + raw = _proposal().to_dict() + del raw["justification"] + with self.assertRaises(ValueError): + Proposal.from_dict(raw) + + +class TestResponseRoundtrip(unittest.TestCase): + def test_to_from_dict_approved(self): + r = Response(proposal_id="abc", status=STATUS_APPROVED, notes="lgtm") + self.assertEqual(r, Response.from_dict(r.to_dict())) + + def test_to_from_dict_modified_with_final_file(self): + r = Response( + proposal_id="abc", + status=STATUS_MODIFIED, + notes="tweaked the upstream", + final_file='{"routes": []}\n', + ) + self.assertEqual(r, Response.from_dict(r.to_dict())) + + def test_rejects_unknown_status(self): + with self.assertRaises(ValueError): + Response.from_dict({ + "proposal_id": "abc", + "status": "maybe", + "notes": "", + "final_file": None, + }) + + def test_rejects_non_string_final_file(self): + with self.assertRaises(ValueError): + Response.from_dict({ + "proposal_id": "abc", + "status": STATUS_APPROVED, + "notes": "", + "final_file": 123, + }) + + +class TestQueueIO(unittest.TestCase): + def setUp(self): + self._tmp = tempfile.TemporaryDirectory(prefix="claude-bottle-supervise-test.") + self.queue_dir = Path(self._tmp.name) + + def tearDown(self): + self._tmp.cleanup() + + def test_write_and_read_proposal(self): + p = _proposal() + path = write_proposal(self.queue_dir, p) + self.assertTrue(path.exists()) + self.assertEqual(0o600, path.stat().st_mode & 0o777) + loaded = read_proposal(self.queue_dir, p.id) + self.assertEqual(p, loaded) + + def test_list_pending_excludes_responded(self): + a = _proposal(justification="first") + b = _proposal(justification="second") + write_proposal(self.queue_dir, a) + write_proposal(self.queue_dir, b) + write_response(self.queue_dir, Response( + proposal_id=a.id, status=STATUS_APPROVED, notes="", + )) + pending = list_pending_proposals(self.queue_dir) + self.assertEqual([b.id], [p.id for p in pending]) + + def test_list_pending_returns_empty_for_missing_dir(self): + self.assertEqual([], list_pending_proposals(self.queue_dir / "nope")) + + def test_list_pending_sorted_by_arrival(self): + # Fabricate two with explicit timestamps. + a = Proposal.new( + bottle_slug="dev", tool=TOOL_CRED_PROXY_BLOCK, + proposed_file="{}", justification="early", + current_file_hash="x", + now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc), + ) + b = Proposal.new( + bottle_slug="dev", tool=TOOL_CRED_PROXY_BLOCK, + proposed_file="{}", justification="late", + current_file_hash="x", + now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc), + ) + # Write in reverse order. + write_proposal(self.queue_dir, b) + write_proposal(self.queue_dir, a) + ordered = list_pending_proposals(self.queue_dir) + self.assertEqual([a.id, b.id], [p.id for p in ordered]) + + def test_write_and_read_response(self): + r = Response(proposal_id="xyz", status=STATUS_REJECTED, notes="no") + write_response(self.queue_dir, r) + self.assertEqual(r, read_response(self.queue_dir, "xyz")) + + def test_wait_for_response_returns_when_file_appears(self): + p = _proposal() + write_proposal(self.queue_dir, p) + + def write_after_delay(): + time.sleep(0.05) + write_response(self.queue_dir, Response( + proposal_id=p.id, status=STATUS_APPROVED, notes="ok", + )) + + t = threading.Thread(target=write_after_delay) + t.start() + try: + r = wait_for_response(self.queue_dir, p.id, poll_interval=0.01) + finally: + t.join() + self.assertEqual(STATUS_APPROVED, r.status) + self.assertEqual("ok", r.notes) + + def test_wait_for_response_times_out(self): + deadline = time.monotonic() + 0.05 + with self.assertRaises(TimeoutError): + wait_for_response( + self.queue_dir, "never", + poll_interval=0.01, deadline=deadline, + ) + + def test_archive_proposal_moves_both_files(self): + p = _proposal() + write_proposal(self.queue_dir, p) + write_response(self.queue_dir, Response( + proposal_id=p.id, status=STATUS_APPROVED, notes="", + )) + archive_proposal(self.queue_dir, p.id) + self.assertFalse((self.queue_dir / f"{p.id}.proposal.json").exists()) + self.assertFalse((self.queue_dir / f"{p.id}.response.json").exists()) + self.assertTrue((self.queue_dir / "processed" / f"{p.id}.proposal.json").exists()) + self.assertTrue((self.queue_dir / "processed" / f"{p.id}.response.json").exists()) + + def test_archive_is_idempotent_on_missing_files(self): + # Should not raise. + archive_proposal(self.queue_dir, "nope") + + +class TestAuditLog(unittest.TestCase): + def setUp(self): + self._tmp = tempfile.TemporaryDirectory(prefix="claude-bottle-supervise-audit.") + self._home_patch = self._patch_home(Path(self._tmp.name)) + + def tearDown(self): + self._home_patch() + self._tmp.cleanup() + + def _patch_home(self, fake_home: Path): + original = supervise.claude_bottle_root + + def fake_root() -> Path: + return fake_home / ".claude-bottle" + + supervise.claude_bottle_root = fake_root # type: ignore[assignment] + return lambda: setattr(supervise, "claude_bottle_root", original) + + def test_write_then_read_single_entry(self): + e = AuditEntry( + timestamp="2026-05-25T12:00:00+00:00", + bottle_slug="dev", + component="cred-proxy", + operator_action=STATUS_APPROVED, + operator_notes="lgtm", + justification="agent needed gh-api token", + diff="--- before\n+++ after\n", + ) + path = write_audit_entry(e) + self.assertEqual(0o600, path.stat().st_mode & 0o777) + loaded = read_audit_entries("cred-proxy", "dev") + self.assertEqual([e], loaded) + + def test_appends_one_line_per_entry(self): + for i in range(3): + write_audit_entry(AuditEntry( + timestamp=f"2026-05-25T12:00:0{i}+00:00", + bottle_slug="dev", + component="pipelock", + operator_action=STATUS_APPROVED, + operator_notes=f"n{i}", + justification="", + diff="", + )) + path = audit_log_path("pipelock", "dev") + with path.open() as f: + lines = [line for line in f if line.strip()] + self.assertEqual(3, len(lines)) + for line in lines: + self.assertTrue(json.loads(line)) # each line is valid JSON + + def test_separate_logs_per_component_slug(self): + write_audit_entry(AuditEntry( + timestamp="t", + bottle_slug="dev", + component="cred-proxy", + operator_action=STATUS_APPROVED, + operator_notes="", + justification="", + diff="", + )) + write_audit_entry(AuditEntry( + timestamp="t", + bottle_slug="dev", + component="pipelock", + operator_action=STATUS_APPROVED, + operator_notes="", + justification="", + diff="", + )) + write_audit_entry(AuditEntry( + timestamp="t", + bottle_slug="other", + component="cred-proxy", + operator_action=STATUS_REJECTED, + operator_notes="", + justification="", + diff="", + )) + self.assertEqual(1, len(read_audit_entries("cred-proxy", "dev"))) + self.assertEqual(1, len(read_audit_entries("pipelock", "dev"))) + self.assertEqual(1, len(read_audit_entries("cred-proxy", "other"))) + + def test_read_audit_entries_missing_log_returns_empty(self): + self.assertEqual([], read_audit_entries("cred-proxy", "no-such-bottle")) + + +class TestDiffAndHash(unittest.TestCase): + def test_render_diff_returns_empty_when_unchanged(self): + self.assertEqual("", render_diff("a\nb\n", "a\nb\n")) + + def test_render_diff_shows_changes(self): + diff = render_diff("a\nb\nc\n", "a\nB\nc\n", label="routes.json") + self.assertIn("routes.json (current)", diff) + self.assertIn("routes.json (proposed)", diff) + self.assertIn("-b", diff) + self.assertIn("+B", diff) + + def test_sha256_hex_is_deterministic_and_hex(self): + h1 = sha256_hex("hello") + h2 = sha256_hex("hello") + self.assertEqual(h1, h2) + self.assertEqual(64, len(h1)) + int(h1, 16) # parses as hex + + +class TestToolConstants(unittest.TestCase): + def test_tools_tuple_matches_individual_constants(self): + self.assertEqual( + (TOOL_CRED_PROXY_BLOCK, TOOL_PIPELOCK_BLOCK, TOOL_CAPABILITY_BLOCK), + supervise.TOOLS, + ) + + def test_component_map_covers_two_remediation_tools_only(self): + self.assertIn(TOOL_CRED_PROXY_BLOCK, supervise.COMPONENT_FOR_TOOL) + self.assertIn(TOOL_PIPELOCK_BLOCK, supervise.COMPONENT_FOR_TOOL) + self.assertNotIn(TOOL_CAPABILITY_BLOCK, supervise.COMPONENT_FOR_TOOL) + + +if __name__ == "__main__": + unittest.main()