PRD 0015: pipelock block remediation #21

Merged
didericis merged 5 commits from prd-0015-pipelock-block into main 2026-05-25 05:15:16 -04:00
2 changed files with 91 additions and 10 deletions
Showing only changes of commit 5a6c4be342 - Show all commits
+23 -7
View File
@@ -27,6 +27,11 @@ from ..backend.docker.cred_proxy_apply import (
apply_routes_change,
fetch_current_routes,
)
from ..backend.docker.pipelock_apply import (
PipelockApplyError,
apply_allowlist_change,
fetch_current_allowlist,
)
from ..log import info
from ..supervise import (
ACTION_OPERATOR_EDIT,
@@ -39,6 +44,7 @@ from ..supervise import (
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_CRED_PROXY_BLOCK,
TOOL_PIPELOCK_BLOCK,
list_pending_proposals,
render_diff,
write_audit_entry,
@@ -47,6 +53,12 @@ from ..supervise import (
from ._common import PROG
# Errors any remediation engine may raise. Caught by the TUI key
# handlers and surfaced in the status line so a failed apply keeps
# the proposal pending rather than crashing curses.
ApplyError = (CredProxyApplyError, PipelockApplyError)
# --- Discovery -------------------------------------------------------------
@@ -129,9 +141,13 @@ def approve(
diff_before, diff_after = apply_routes_change(
qp.proposal.bottle_slug, file_to_apply,
)
# pipelock-block + capability-block remediation lands in PRDs
# 0015 + 0016; for 0014 they remain no-op approvals and the
# audit diff stays empty.
elif qp.proposal.tool == TOOL_PIPELOCK_BLOCK:
diff_before, diff_after = apply_allowlist_change(
qp.proposal.bottle_slug, file_to_apply,
)
# capability-block remediation lands in PRD 0016; until then
# it stays a no-op approval and the audit (none for capability)
# is skipped.
response = Response(
proposal_id=qp.proposal.id,
@@ -310,7 +326,7 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
try:
approve(qp)
status_line = f"approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
except CredProxyApplyError as e:
except ApplyError as e:
status_line = f"apply failed: {e}"
elif key == ord("m"):
edited = _modify(stdscr, qp)
@@ -320,7 +336,7 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
try:
approve(qp, final_file=edited, notes="operator modified before approving")
status_line = f"modified+approved {qp.proposal.tool} for [{qp.proposal.bottle_slug}]"
except CredProxyApplyError as e:
except ApplyError as e:
status_line = f"apply failed: {e}"
elif key == ord("r"):
reason = _prompt(stdscr, "reject reason: ")
@@ -403,7 +419,7 @@ def _detail_view(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> None:
elif key == ord("a"):
try:
approve(qp)
except CredProxyApplyError:
except ApplyError:
pass # Status surfaces back in the list view's render.
return
elif key == ord("m"):
@@ -411,7 +427,7 @@ def _detail_view(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> None:
if edited is not None:
try:
approve(qp, final_file=edited, notes="operator modified before approving")
except CredProxyApplyError:
except ApplyError:
pass
return
elif key == ord("r"):
+68 -3
View File
@@ -17,6 +17,7 @@ from pathlib import Path
from claude_bottle import supervise
from claude_bottle.backend.docker.cred_proxy_apply import CredProxyApplyError
from claude_bottle.backend.docker.pipelock_apply import PipelockApplyError
from claude_bottle.cli import dashboard
from claude_bottle.supervise import (
Proposal,
@@ -115,15 +116,20 @@ class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
self._original_apply = dashboard.apply_routes_change
# Default stub: succeed with deterministic before/after so the
self._original_apply_routes = dashboard.apply_routes_change
self._original_apply_allowlist = dashboard.apply_allowlist_change
# Default stubs: succeed with deterministic before/after so the
# audit log shows a non-empty diff.
dashboard.apply_routes_change = lambda slug, content: (
'{"routes": []}\n', content,
)
dashboard.apply_allowlist_change = lambda slug, content: (
"old.example\n", content,
)
def tearDown(self):
dashboard.apply_routes_change = self._original_apply
dashboard.apply_routes_change = self._original_apply_routes
dashboard.apply_allowlist_change = self._original_apply_allowlist
self._teardown_fake_home()
def _enqueue(self, tool: str = TOOL_CRED_PROXY_BLOCK):
@@ -268,6 +274,65 @@ class TestCredProxyApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual("", entries[0].diff)
class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
"""PRD 0015 Phase 2: approve() on a pipelock-block proposal
must call apply_allowlist_change and surface its failures."""
def setUp(self):
self._setup_fake_home()
self._original = dashboard.apply_allowlist_change
def tearDown(self):
dashboard.apply_allowlist_change = self._original
self._teardown_fake_home()
def _enqueue_pipelock(self, proposed: str = "host.example\n"):
p = Proposal.new(
bottle_slug="dev", tool=TOOL_PIPELOCK_BLOCK,
proposed_file=proposed,
justification="need new host",
current_file_hash=sha256_hex(proposed),
now=FIXED,
)
qdir = supervise.queue_dir_for_slug("dev")
qdir.mkdir(parents=True, exist_ok=True)
supervise.write_proposal(qdir, p)
return dashboard.QueuedProposal(proposal=p, queue_dir=qdir)
def test_pipelock_block_calls_apply_with_proposed_file(self):
calls = []
dashboard.apply_allowlist_change = lambda slug, content: (
calls.append((slug, content)) or ("before", content)
)
qp = self._enqueue_pipelock("new.example\n")
dashboard.approve(qp)
self.assertEqual([("dev", "new.example\n")], calls)
def test_apply_failure_blocks_response_and_audit(self):
dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw(
PipelockApplyError("docker exec failed")
)
qp = self._enqueue_pipelock()
with self.assertRaises(PipelockApplyError):
dashboard.approve(qp)
self.assertEqual(
[qp.proposal.id],
[p.id for p in supervise.list_pending_proposals(qp.queue_dir)],
)
self.assertEqual([], read_audit_entries("pipelock", "dev"))
def test_real_diff_lands_in_audit(self):
dashboard.apply_allowlist_change = lambda slug, content: (
"old.example\n",
"old.example\nnew.example\n",
)
qp = self._enqueue_pipelock("old.example\nnew.example\n")
dashboard.approve(qp)
entries = read_audit_entries("pipelock", "dev")
self.assertEqual(1, len(entries))
self.assertIn("+new.example", entries[0].diff)
class TestOperatorEditRoutes(_FakeHomeMixin, unittest.TestCase):
"""PRD 0014 Phase 4: operator-initiated routes edit (not gated
on a pending proposal)."""