feat(supervise): host-side queue + audit log primitives (PRD 0013)

Phase 1 of PRD 0013. Adds claude_bottle/supervise.py with:

- Proposal / Response / AuditEntry dataclasses
- Per-bottle queue dir under ~/.claude-bottle/queue/<slug>/
- write/read/list/archive proposal helpers + wait_for_response
- Audit log writer (JSON-Lines under ~/.claude-bottle/audit/)
- Unified-diff rendering + sha256 helper for stale-proposal detection

Stdlib-only; in-container code (Phase 2) and Docker lifecycle
(Phase 3) follow. Tests cover queue, audit, and diff/hash helpers.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 03:56:50 -04:00
parent 578363bea3
commit 2e06090464
2 changed files with 827 additions and 0 deletions
+499
View File
@@ -0,0 +1,499 @@
"""Per-bottle supervise plane (PRD 0013).
The supervise plane is the per-bottle MCP sidecar plus its host-side
queue/audit support. The sidecar (claude_bottle.supervise_server)
sits on the bottle's internal network and exposes three MCP tools the
agent calls when it hits a stuck-recovery category:
* cred-proxy-block — agent proposes a new routes.json
* pipelock-block — agent proposes a new pipelock allowlist
* capability-block — agent proposes a new agent Dockerfile
Each tool call: the agent passes the full proposed file plus a
justification text. The sidecar validates the proposal syntactically,
writes it to the host's per-bottle queue dir, and holds the tool-call
connection open. The operator's TUI dashboard
(claude_bottle.cli.dashboard) sees the proposal, accepts
approve / modify / reject, and writes a response file alongside the
proposal. The sidecar sees the response and returns `{status, notes}`
to the agent.
This module defines the host-side library: dataclasses for the queue
file shapes, queue read/write helpers, the audit log writer, and the
diff renderer. The in-container sidecar lives in
claude_bottle/supervise_server.py; the Docker lifecycle in
claude_bottle/backend/docker/supervise.py.
For 0013 the supervisor's approval handlers are deliberately no-ops:
on approval the audit log is written and the response file is
delivered to the agent, but no host-side config change happens. The
remediation engines that wire real config changes land in PRDs 0014,
0015, and 0016.
"""
from __future__ import annotations
import dataclasses
import difflib
import hashlib
import json
import os
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
SUPERVISE_HOSTNAME = "supervise"
SUPERVISE_PORT = 9100
TOOL_CRED_PROXY_BLOCK = "cred-proxy-block"
TOOL_PIPELOCK_BLOCK = "pipelock-block"
TOOL_CAPABILITY_BLOCK = "capability-block"
TOOLS: tuple[str, ...] = (
TOOL_CRED_PROXY_BLOCK,
TOOL_PIPELOCK_BLOCK,
TOOL_CAPABILITY_BLOCK,
)
# capability-block has no on-disk config the operator edits in place
# (the Dockerfile is rebuilt, not patched), so it has no audit log
# here — those changes are captured by git history + the rebuild
# record laid down in PRD 0016.
COMPONENT_FOR_TOOL: dict[str, str] = {
TOOL_CRED_PROXY_BLOCK: "cred-proxy",
TOOL_PIPELOCK_BLOCK: "pipelock",
}
STATUS_APPROVED = "approved"
STATUS_MODIFIED = "modified"
STATUS_REJECTED = "rejected"
STATUSES: tuple[str, ...] = (STATUS_APPROVED, STATUS_MODIFIED, STATUS_REJECTED)
# Operator-initiated audit entries (no tool call). PRD 0014's
# `routes edit <bottle>` and PRD 0015's `pipelock edit <bottle>`
# verbs write entries with this action.
ACTION_OPERATOR_EDIT = "operator-edit"
QUEUE_DIR_IN_CONTAINER = "/run/supervise/queue"
CURRENT_CONFIG_DIR_IN_AGENT = "/etc/claude-bottle/current-config"
DEFAULT_POLL_INTERVAL_SEC = 0.5
# --- Paths -----------------------------------------------------------------
def claude_bottle_root() -> Path:
return Path.home() / ".claude-bottle"
def queue_dir_for_slug(slug: str) -> Path:
return claude_bottle_root() / "queue" / slug
def audit_dir() -> Path:
return claude_bottle_root() / "audit"
def audit_log_path(component: str, slug: str) -> Path:
return audit_dir() / f"{component}-{slug}.log"
# --- Dataclasses -----------------------------------------------------------
@dataclass(frozen=True)
class Proposal:
"""One pending tool-call from the agent. The sidecar writes one
of these to the queue dir on a tool call; the operator's TUI
reads them; the sidecar polls for a matching Response."""
id: str
bottle_slug: str
tool: str
proposed_file: str
justification: str
arrival_timestamp: str
current_file_hash: str
@classmethod
def new(
cls,
*,
bottle_slug: str,
tool: str,
proposed_file: str,
justification: str,
current_file_hash: str,
now: datetime | None = None,
) -> "Proposal":
ts = (now or datetime.now(timezone.utc)).isoformat()
return cls(
id=str(uuid.uuid4()),
bottle_slug=bottle_slug,
tool=tool,
proposed_file=proposed_file,
justification=justification,
arrival_timestamp=ts,
current_file_hash=current_file_hash,
)
def to_dict(self) -> dict[str, object]:
return dataclasses.asdict(self)
@classmethod
def from_dict(cls, raw: dict[str, object]) -> "Proposal":
tool = _require_str(raw, "tool")
if tool not in TOOLS:
raise ValueError(f"tool must be one of {TOOLS}; got {tool!r}")
return cls(
id=_require_str(raw, "id"),
bottle_slug=_require_str(raw, "bottle_slug"),
tool=tool,
proposed_file=_require_str(raw, "proposed_file"),
justification=_require_str(raw, "justification"),
arrival_timestamp=_require_str(raw, "arrival_timestamp"),
current_file_hash=_require_str(raw, "current_file_hash"),
)
@dataclass(frozen=True)
class Response:
"""The operator's decision on a proposal. The TUI writes one of
these to the queue dir; the sidecar reads it and returns the
`{status, notes}` pair to the agent's tool call.
`final_file` carries the file content the supervisor will
actually apply: for `approved`, equal to the proposal's
`proposed_file`; for `modified`, the operator's edited version
(the audit diff is current → final_file, not current →
proposed_file); for `rejected`, None."""
proposal_id: str
status: str
notes: str
final_file: str | None = None
def to_dict(self) -> dict[str, object]:
return dataclasses.asdict(self)
@classmethod
def from_dict(cls, raw: dict[str, object]) -> "Response":
status = _require_str(raw, "status")
if status not in STATUSES:
raise ValueError(
f"response status must be one of {STATUSES}; got {status!r}"
)
final = raw.get("final_file")
if final is not None and not isinstance(final, str):
raise ValueError(
f"final_file must be a string or null; got {type(final).__name__}"
)
return cls(
proposal_id=_require_str(raw, "proposal_id"),
status=status,
notes=_require_str(raw, "notes"),
final_file=final,
)
@dataclass(frozen=True)
class AuditEntry:
"""One row of the per-bottle audit log. JSON-Lines, append-only."""
timestamp: str
bottle_slug: str
component: str
operator_action: str
operator_notes: str
justification: str
diff: str
def to_dict(self) -> dict[str, object]:
return dataclasses.asdict(self)
# --- Queue I/O -------------------------------------------------------------
def _proposal_filename(proposal_id: str) -> str:
return f"{proposal_id}.proposal.json"
def _response_filename(proposal_id: str) -> str:
return f"{proposal_id}.response.json"
def _id_from_proposal_filename(path: Path) -> str | None:
name = path.name
if not name.endswith(".proposal.json"):
return None
return name[: -len(".proposal.json")]
def write_proposal(queue_dir: Path, proposal: Proposal) -> Path:
"""Persist `proposal` as JSON in the queue dir, mode 0o600.
Directory is created if missing."""
queue_dir.mkdir(parents=True, exist_ok=True)
path = queue_dir / _proposal_filename(proposal.id)
payload = json.dumps(proposal.to_dict(), indent=2) + "\n"
_atomic_write(path, payload, mode=0o600)
return path
def read_proposal(queue_dir: Path, proposal_id: str) -> Proposal:
path = queue_dir / _proposal_filename(proposal_id)
with path.open() as f:
raw = json.load(f)
if not isinstance(raw, dict):
raise ValueError(f"{path}: top-level must be an object")
return Proposal.from_dict(raw)
def list_pending_proposals(queue_dir: Path) -> list[Proposal]:
"""All proposals in `queue_dir` that do not yet have a matching
response file. Sorted by `arrival_timestamp` so the operator
sees the queue FIFO."""
if not queue_dir.is_dir():
return []
out: list[Proposal] = []
for path in sorted(queue_dir.glob("*.proposal.json")):
proposal_id = _id_from_proposal_filename(path)
if proposal_id is None:
continue
if (queue_dir / _response_filename(proposal_id)).exists():
continue
try:
with path.open() as f:
raw = json.load(f)
except (OSError, json.JSONDecodeError):
continue
if not isinstance(raw, dict):
continue
try:
out.append(Proposal.from_dict(raw))
except (KeyError, ValueError):
continue
out.sort(key=lambda p: p.arrival_timestamp)
return out
def write_response(queue_dir: Path, response: Response) -> Path:
queue_dir.mkdir(parents=True, exist_ok=True)
path = queue_dir / _response_filename(response.proposal_id)
payload = json.dumps(response.to_dict(), indent=2) + "\n"
_atomic_write(path, payload, mode=0o600)
return path
def read_response(queue_dir: Path, proposal_id: str) -> Response:
path = queue_dir / _response_filename(proposal_id)
with path.open() as f:
raw = json.load(f)
if not isinstance(raw, dict):
raise ValueError(f"{path}: top-level must be an object")
return Response.from_dict(raw)
def wait_for_response(
queue_dir: Path,
proposal_id: str,
*,
poll_interval: float = DEFAULT_POLL_INTERVAL_SEC,
deadline: float | None = None,
) -> Response:
"""Block until a response file appears for `proposal_id`, then
return it. `deadline` is an absolute time.monotonic() value after
which the wait raises TimeoutError. None waits forever — the
natural shape, since the operator's response time is unbounded.
Polls the filesystem so the implementation stays portable and
stdlib-only."""
path = queue_dir / _response_filename(proposal_id)
while True:
if path.exists():
try:
with path.open() as f:
raw = json.load(f)
except (OSError, json.JSONDecodeError):
raw = None
if isinstance(raw, dict):
try:
return Response.from_dict(raw)
except (KeyError, ValueError):
pass
if deadline is not None and time.monotonic() >= deadline:
raise TimeoutError(f"no response for proposal {proposal_id!r}")
time.sleep(poll_interval)
def archive_proposal(queue_dir: Path, proposal_id: str) -> None:
"""Move both proposal and response files to `<queue_dir>/processed/`.
Idempotent — missing files are silently skipped."""
processed = queue_dir / "processed"
processed.mkdir(parents=True, exist_ok=True)
for name in (_proposal_filename(proposal_id), _response_filename(proposal_id)):
src = queue_dir / name
if src.exists():
src.rename(processed / name)
# --- Audit log -------------------------------------------------------------
def write_audit_entry(entry: AuditEntry) -> Path:
"""Append `entry` as one JSON-Lines record to the per-bottle
audit log. Acquires an advisory exclusive lock so concurrent
writers don't interleave bytes."""
path = audit_log_path(entry.component, entry.bottle_slug)
path.parent.mkdir(parents=True, exist_ok=True)
line = json.dumps(entry.to_dict(), sort_keys=False) + "\n"
fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o600)
try:
_try_flock(fd)
try:
os.write(fd, line.encode("utf-8"))
finally:
_try_funlock(fd)
finally:
os.close(fd)
return path
def read_audit_entries(component: str, slug: str) -> list[AuditEntry]:
"""Load all audit entries for the given component+slug. Empty
list if the log doesn't exist."""
path = audit_log_path(component, slug)
if not path.is_file():
return []
out: list[AuditEntry] = []
with path.open() as f:
for raw_line in f:
raw_line = raw_line.strip()
if not raw_line:
continue
try:
raw = json.loads(raw_line)
except json.JSONDecodeError:
continue
if not isinstance(raw, dict):
continue
try:
out.append(AuditEntry(
timestamp=_require_str(raw, "timestamp"),
bottle_slug=_require_str(raw, "bottle_slug"),
component=_require_str(raw, "component"),
operator_action=_require_str(raw, "operator_action"),
operator_notes=_require_str(raw, "operator_notes"),
justification=_require_str(raw, "justification"),
diff=_require_str(raw, "diff"),
))
except ValueError:
continue
return out
# --- Diff rendering --------------------------------------------------------
def render_diff(before: str, after: str, *, label: str = "config") -> str:
"""Unified diff suitable for the audit log + TUI. Empty diff (no
changes) renders as the empty string."""
diff = difflib.unified_diff(
before.splitlines(keepends=True),
after.splitlines(keepends=True),
fromfile=f"{label} (current)",
tofile=f"{label} (proposed)",
lineterm="",
)
parts = list(diff)
if not parts:
return ""
return "".join(p if p.endswith("\n") else p + "\n" for p in parts).rstrip("\n")
def sha256_hex(content: str) -> str:
return hashlib.sha256(content.encode("utf-8")).hexdigest()
# --- Helpers ---------------------------------------------------------------
def _require_str(raw: dict[str, object], key: str) -> str:
value = raw.get(key)
if not isinstance(value, str):
raise ValueError(f"missing or non-string field {key!r}")
return value
def _atomic_write(path: Path, content: str, *, mode: int) -> None:
"""Atomic: write to a sibling tmp file, fsync, rename."""
tmp = path.with_suffix(path.suffix + ".tmp")
fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
try:
os.write(fd, content.encode("utf-8"))
os.fsync(fd)
finally:
os.close(fd)
os.replace(tmp, path)
try:
import fcntl as _fcntl
def _try_flock(fd: int) -> None:
try:
_fcntl.flock(fd, _fcntl.LOCK_EX)
except OSError:
pass
def _try_funlock(fd: int) -> None:
try:
_fcntl.flock(fd, _fcntl.LOCK_UN)
except OSError:
pass
except ImportError: # pragma: no cover — Windows path
def _try_flock(fd: int) -> None:
return None
def _try_funlock(fd: int) -> None:
return None
__all__ = [
"ACTION_OPERATOR_EDIT",
"AuditEntry",
"COMPONENT_FOR_TOOL",
"CURRENT_CONFIG_DIR_IN_AGENT",
"DEFAULT_POLL_INTERVAL_SEC",
"Proposal",
"QUEUE_DIR_IN_CONTAINER",
"Response",
"STATUSES",
"STATUS_APPROVED",
"STATUS_MODIFIED",
"STATUS_REJECTED",
"SUPERVISE_HOSTNAME",
"SUPERVISE_PORT",
"TOOLS",
"TOOL_CAPABILITY_BLOCK",
"TOOL_CRED_PROXY_BLOCK",
"TOOL_PIPELOCK_BLOCK",
"archive_proposal",
"audit_dir",
"audit_log_path",
"claude_bottle_root",
"list_pending_proposals",
"queue_dir_for_slug",
"read_audit_entries",
"read_proposal",
"read_response",
"render_diff",
"sha256_hex",
"wait_for_response",
"write_audit_entry",
"write_proposal",
"write_response",
]
+328
View File
@@ -0,0 +1,328 @@
"""Unit: supervise queue + audit log + diff helpers (PRD 0013)."""
import json
import tempfile
import threading
import time
import unittest
from datetime import datetime, timezone
from pathlib import Path
from claude_bottle import supervise
from claude_bottle.supervise import (
AuditEntry,
Proposal,
Response,
STATUS_APPROVED,
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_CRED_PROXY_BLOCK,
TOOL_PIPELOCK_BLOCK,
archive_proposal,
audit_log_path,
list_pending_proposals,
read_audit_entries,
read_proposal,
read_response,
render_diff,
sha256_hex,
wait_for_response,
write_audit_entry,
write_proposal,
write_response,
)
FIXED_TS = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(tool: str = TOOL_CRED_PROXY_BLOCK, proposed: str = "{}", justification: str = "need a route") -> Proposal:
return Proposal.new(
bottle_slug="dev",
tool=tool,
proposed_file=proposed,
justification=justification,
current_file_hash=sha256_hex("{}"),
now=FIXED_TS,
)
class TestProposalRoundtrip(unittest.TestCase):
def test_new_stamps_uuid_and_iso_timestamp(self):
p = _proposal()
self.assertTrue(p.id)
self.assertEqual("2026-05-25T12:00:00+00:00", p.arrival_timestamp)
self.assertEqual("dev", p.bottle_slug)
self.assertEqual(TOOL_CRED_PROXY_BLOCK, p.tool)
def test_to_from_dict_roundtrip(self):
p = _proposal()
self.assertEqual(p, Proposal.from_dict(p.to_dict()))
def test_from_dict_rejects_unknown_tool(self):
raw = _proposal().to_dict()
raw["tool"] = "not-a-real-tool"
with self.assertRaises(ValueError):
Proposal.from_dict(raw)
def test_from_dict_rejects_missing_field(self):
raw = _proposal().to_dict()
del raw["justification"]
with self.assertRaises(ValueError):
Proposal.from_dict(raw)
class TestResponseRoundtrip(unittest.TestCase):
def test_to_from_dict_approved(self):
r = Response(proposal_id="abc", status=STATUS_APPROVED, notes="lgtm")
self.assertEqual(r, Response.from_dict(r.to_dict()))
def test_to_from_dict_modified_with_final_file(self):
r = Response(
proposal_id="abc",
status=STATUS_MODIFIED,
notes="tweaked the upstream",
final_file='{"routes": []}\n',
)
self.assertEqual(r, Response.from_dict(r.to_dict()))
def test_rejects_unknown_status(self):
with self.assertRaises(ValueError):
Response.from_dict({
"proposal_id": "abc",
"status": "maybe",
"notes": "",
"final_file": None,
})
def test_rejects_non_string_final_file(self):
with self.assertRaises(ValueError):
Response.from_dict({
"proposal_id": "abc",
"status": STATUS_APPROVED,
"notes": "",
"final_file": 123,
})
class TestQueueIO(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="claude-bottle-supervise-test.")
self.queue_dir = Path(self._tmp.name)
def tearDown(self):
self._tmp.cleanup()
def test_write_and_read_proposal(self):
p = _proposal()
path = write_proposal(self.queue_dir, p)
self.assertTrue(path.exists())
self.assertEqual(0o600, path.stat().st_mode & 0o777)
loaded = read_proposal(self.queue_dir, p.id)
self.assertEqual(p, loaded)
def test_list_pending_excludes_responded(self):
a = _proposal(justification="first")
b = _proposal(justification="second")
write_proposal(self.queue_dir, a)
write_proposal(self.queue_dir, b)
write_response(self.queue_dir, Response(
proposal_id=a.id, status=STATUS_APPROVED, notes="",
))
pending = list_pending_proposals(self.queue_dir)
self.assertEqual([b.id], [p.id for p in pending])
def test_list_pending_returns_empty_for_missing_dir(self):
self.assertEqual([], list_pending_proposals(self.queue_dir / "nope"))
def test_list_pending_sorted_by_arrival(self):
# Fabricate two with explicit timestamps.
a = Proposal.new(
bottle_slug="dev", tool=TOOL_CRED_PROXY_BLOCK,
proposed_file="{}", justification="early",
current_file_hash="x",
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
)
b = Proposal.new(
bottle_slug="dev", tool=TOOL_CRED_PROXY_BLOCK,
proposed_file="{}", justification="late",
current_file_hash="x",
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
)
# Write in reverse order.
write_proposal(self.queue_dir, b)
write_proposal(self.queue_dir, a)
ordered = list_pending_proposals(self.queue_dir)
self.assertEqual([a.id, b.id], [p.id for p in ordered])
def test_write_and_read_response(self):
r = Response(proposal_id="xyz", status=STATUS_REJECTED, notes="no")
write_response(self.queue_dir, r)
self.assertEqual(r, read_response(self.queue_dir, "xyz"))
def test_wait_for_response_returns_when_file_appears(self):
p = _proposal()
write_proposal(self.queue_dir, p)
def write_after_delay():
time.sleep(0.05)
write_response(self.queue_dir, Response(
proposal_id=p.id, status=STATUS_APPROVED, notes="ok",
))
t = threading.Thread(target=write_after_delay)
t.start()
try:
r = wait_for_response(self.queue_dir, p.id, poll_interval=0.01)
finally:
t.join()
self.assertEqual(STATUS_APPROVED, r.status)
self.assertEqual("ok", r.notes)
def test_wait_for_response_times_out(self):
deadline = time.monotonic() + 0.05
with self.assertRaises(TimeoutError):
wait_for_response(
self.queue_dir, "never",
poll_interval=0.01, deadline=deadline,
)
def test_archive_proposal_moves_both_files(self):
p = _proposal()
write_proposal(self.queue_dir, p)
write_response(self.queue_dir, Response(
proposal_id=p.id, status=STATUS_APPROVED, notes="",
))
archive_proposal(self.queue_dir, p.id)
self.assertFalse((self.queue_dir / f"{p.id}.proposal.json").exists())
self.assertFalse((self.queue_dir / f"{p.id}.response.json").exists())
self.assertTrue((self.queue_dir / "processed" / f"{p.id}.proposal.json").exists())
self.assertTrue((self.queue_dir / "processed" / f"{p.id}.response.json").exists())
def test_archive_is_idempotent_on_missing_files(self):
# Should not raise.
archive_proposal(self.queue_dir, "nope")
class TestAuditLog(unittest.TestCase):
def setUp(self):
self._tmp = tempfile.TemporaryDirectory(prefix="claude-bottle-supervise-audit.")
self._home_patch = self._patch_home(Path(self._tmp.name))
def tearDown(self):
self._home_patch()
self._tmp.cleanup()
def _patch_home(self, fake_home: Path):
original = supervise.claude_bottle_root
def fake_root() -> Path:
return fake_home / ".claude-bottle"
supervise.claude_bottle_root = fake_root # type: ignore[assignment]
return lambda: setattr(supervise, "claude_bottle_root", original)
def test_write_then_read_single_entry(self):
e = AuditEntry(
timestamp="2026-05-25T12:00:00+00:00",
bottle_slug="dev",
component="cred-proxy",
operator_action=STATUS_APPROVED,
operator_notes="lgtm",
justification="agent needed gh-api token",
diff="--- before\n+++ after\n",
)
path = write_audit_entry(e)
self.assertEqual(0o600, path.stat().st_mode & 0o777)
loaded = read_audit_entries("cred-proxy", "dev")
self.assertEqual([e], loaded)
def test_appends_one_line_per_entry(self):
for i in range(3):
write_audit_entry(AuditEntry(
timestamp=f"2026-05-25T12:00:0{i}+00:00",
bottle_slug="dev",
component="pipelock",
operator_action=STATUS_APPROVED,
operator_notes=f"n{i}",
justification="",
diff="",
))
path = audit_log_path("pipelock", "dev")
with path.open() as f:
lines = [line for line in f if line.strip()]
self.assertEqual(3, len(lines))
for line in lines:
self.assertTrue(json.loads(line)) # each line is valid JSON
def test_separate_logs_per_component_slug(self):
write_audit_entry(AuditEntry(
timestamp="t",
bottle_slug="dev",
component="cred-proxy",
operator_action=STATUS_APPROVED,
operator_notes="",
justification="",
diff="",
))
write_audit_entry(AuditEntry(
timestamp="t",
bottle_slug="dev",
component="pipelock",
operator_action=STATUS_APPROVED,
operator_notes="",
justification="",
diff="",
))
write_audit_entry(AuditEntry(
timestamp="t",
bottle_slug="other",
component="cred-proxy",
operator_action=STATUS_REJECTED,
operator_notes="",
justification="",
diff="",
))
self.assertEqual(1, len(read_audit_entries("cred-proxy", "dev")))
self.assertEqual(1, len(read_audit_entries("pipelock", "dev")))
self.assertEqual(1, len(read_audit_entries("cred-proxy", "other")))
def test_read_audit_entries_missing_log_returns_empty(self):
self.assertEqual([], read_audit_entries("cred-proxy", "no-such-bottle"))
class TestDiffAndHash(unittest.TestCase):
def test_render_diff_returns_empty_when_unchanged(self):
self.assertEqual("", render_diff("a\nb\n", "a\nb\n"))
def test_render_diff_shows_changes(self):
diff = render_diff("a\nb\nc\n", "a\nB\nc\n", label="routes.json")
self.assertIn("routes.json (current)", diff)
self.assertIn("routes.json (proposed)", diff)
self.assertIn("-b", diff)
self.assertIn("+B", diff)
def test_sha256_hex_is_deterministic_and_hex(self):
h1 = sha256_hex("hello")
h2 = sha256_hex("hello")
self.assertEqual(h1, h2)
self.assertEqual(64, len(h1))
int(h1, 16) # parses as hex
class TestToolConstants(unittest.TestCase):
def test_tools_tuple_matches_individual_constants(self):
self.assertEqual(
(TOOL_CRED_PROXY_BLOCK, TOOL_PIPELOCK_BLOCK, TOOL_CAPABILITY_BLOCK),
supervise.TOOLS,
)
def test_component_map_covers_two_remediation_tools_only(self):
self.assertIn(TOOL_CRED_PROXY_BLOCK, supervise.COMPONENT_FOR_TOOL)
self.assertIn(TOOL_PIPELOCK_BLOCK, supervise.COMPONENT_FOR_TOOL)
self.assertNotIn(TOOL_CAPABILITY_BLOCK, supervise.COMPONENT_FOR_TOOL)
if __name__ == "__main__":
unittest.main()