9cd583fbbb
Finishes PRD 0017. The `cred-proxy-block` MCP tool is renamed and
its remediation apply path is repointed at egress-proxy.
- `claude_bottle/supervise.py` — `TOOL_CRED_PROXY_BLOCK` →
`TOOL_EGRESS_PROXY_BLOCK`; `COMPONENT_FOR_TOOL` maps the new
tool ID to `egress-proxy` for audit-log routing.
- `claude_bottle/supervise_server.py` — tool definition renamed
+ description rewritten: "Call when egress-proxy refused your
HTTPS request ... Read the current routes.yaml from /etc/
claude-bottle/current-config/routes.yaml, compose a modified
version, pass the full new file plus a justification." The
syntactic validator dispatches on the new tool ID.
- `claude_bottle/backend/docker/egress_proxy_apply.py` — renamed
from `cred_proxy_apply.py`. Reads routes.yaml from
/etc/egress-proxy/routes.yaml via `docker exec cat`; validates
via `egress_proxy_addon_core.load_routes` (so both sides use
the same parser); writes via `docker cp`; SIGHUPs egress-proxy
with `docker kill --signal HUP`. `EgressProxyApplyError`
replaces `CredProxyApplyError`.
- `claude_bottle/cli/dashboard.py` — wires the new apply +
`discover_egress_proxy_slugs` helper; the operator-initiated
`routes edit <bottle>` verb now writes to egress-proxy with
`.yaml` suffix. Stale follow-up comment about path-aware
filtering removed — PRD 0017 settled that question.
- `tests/integration/test_supervise_sidecar.py` — restores the
approval round-trip test (chunk 2 had switched it to a reject
path because no cred-proxy existed). Approval stubs
`apply_routes_change` so the test focuses on the supervise
queue/response plumbing rather than docker-exec into a real
egress-proxy sidecar (that's covered separately).
- `tests/unit/test_egress_proxy_apply.py` — rewritten against
the new validator; covers JSON shape, missing routes key,
partial-auth-pair rejection (the addon-core parser catches
these before SIGHUP).
- PRDs 0010 + 0014 — status headers updated to
Superseded / Retargeted with a callout block pointing at PRD
0017's migration section. Historical text preserved.
384 unit + integration tests pass.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
589 lines
24 KiB
Python
589 lines
24 KiB
Python
"""Unit: dashboard headless paths (PRD 0013 phase 4, PRD 0014).
|
|
|
|
The curses TUI itself isn't exercised here — these tests cover the
|
|
discovery + approve/reject + audit-write paths that the TUI's key
|
|
handlers call into.
|
|
|
|
apply_routes_change is stubbed at the dashboard module level so the
|
|
tests don't need a running cred-proxy sidecar; the real docker
|
|
exec/cp/SIGHUP plumbing is covered by the integration test.
|
|
"""
|
|
|
|
import os
|
|
import tempfile
|
|
import unittest
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from claude_bottle import supervise
|
|
from claude_bottle.backend.docker.capability_apply import CapabilityApplyError
|
|
from claude_bottle.backend.docker.egress_proxy_apply import EgressProxyApplyError
|
|
from claude_bottle.backend.docker.pipelock_apply import PipelockApplyError
|
|
from claude_bottle.cli import dashboard
|
|
from claude_bottle.supervise import (
|
|
Proposal,
|
|
STATUS_APPROVED,
|
|
STATUS_MODIFIED,
|
|
STATUS_REJECTED,
|
|
TOOL_CAPABILITY_BLOCK,
|
|
TOOL_EGRESS_PROXY_BLOCK,
|
|
TOOL_PIPELOCK_BLOCK,
|
|
read_audit_entries,
|
|
read_response,
|
|
sha256_hex,
|
|
)
|
|
|
|
|
|
FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
|
|
|
|
|
|
def _proposal(slug: str = "dev", tool: str = TOOL_EGRESS_PROXY_BLOCK) -> Proposal:
|
|
# Per-tool payload shape: cred-proxy gets routes.json, pipelock
|
|
# gets a failed URL (PR #25 follow-up), capability gets a
|
|
# Dockerfile-ish blob. Match the production dispatch in
|
|
# PROPOSED_FILE_FIELD.
|
|
payloads = {
|
|
TOOL_EGRESS_PROXY_BLOCK: '{"routes": []}\n',
|
|
TOOL_PIPELOCK_BLOCK: "https://example.com/path",
|
|
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
|
|
}
|
|
payload = payloads.get(tool, "")
|
|
return Proposal.new(
|
|
bottle_slug=slug, tool=tool,
|
|
proposed_file=payload,
|
|
justification=f"needed for {slug}",
|
|
current_file_hash=sha256_hex(payload),
|
|
now=FIXED,
|
|
)
|
|
|
|
|
|
class _FakeHomeMixin:
|
|
"""Patch supervise.claude_bottle_root to a temp dir for the test."""
|
|
|
|
def _setup_fake_home(self):
|
|
self._tmp = tempfile.TemporaryDirectory(prefix="dashboard-test.")
|
|
original = supervise.claude_bottle_root
|
|
|
|
def fake_root() -> Path:
|
|
return Path(self._tmp.name) / ".claude-bottle"
|
|
|
|
supervise.claude_bottle_root = fake_root # type: ignore[assignment]
|
|
self._restore_home = lambda: setattr(supervise, "claude_bottle_root", original)
|
|
|
|
def _teardown_fake_home(self):
|
|
self._restore_home()
|
|
self._tmp.cleanup()
|
|
|
|
|
|
class TestDiscoverPending(_FakeHomeMixin, unittest.TestCase):
|
|
def setUp(self):
|
|
self._setup_fake_home()
|
|
|
|
def tearDown(self):
|
|
self._teardown_fake_home()
|
|
|
|
def test_empty_when_no_queues(self):
|
|
self.assertEqual([], dashboard.discover_pending())
|
|
|
|
def test_walks_all_slug_subdirs(self):
|
|
for slug in ("dev", "api"):
|
|
qdir = supervise.queue_dir_for_slug(slug)
|
|
qdir.mkdir(parents=True)
|
|
supervise.write_proposal(qdir, _proposal(slug=slug))
|
|
pending = dashboard.discover_pending()
|
|
self.assertEqual({"dev", "api"}, {qp.proposal.bottle_slug for qp in pending})
|
|
|
|
def test_sorted_by_arrival_across_bottles(self):
|
|
early = Proposal.new(
|
|
bottle_slug="api", tool=TOOL_EGRESS_PROXY_BLOCK,
|
|
proposed_file="{}", justification="early",
|
|
current_file_hash="h",
|
|
now=datetime(2026, 5, 25, 10, 0, 0, tzinfo=timezone.utc),
|
|
)
|
|
late = Proposal.new(
|
|
bottle_slug="dev", tool=TOOL_EGRESS_PROXY_BLOCK,
|
|
proposed_file="{}", justification="late",
|
|
current_file_hash="h",
|
|
now=datetime(2026, 5, 25, 14, 0, 0, tzinfo=timezone.utc),
|
|
)
|
|
for p in (late, early):
|
|
qdir = supervise.queue_dir_for_slug(p.bottle_slug)
|
|
qdir.mkdir(parents=True, exist_ok=True)
|
|
supervise.write_proposal(qdir, p)
|
|
pending = dashboard.discover_pending()
|
|
self.assertEqual([early.id, late.id], [qp.proposal.id for qp in pending])
|
|
|
|
def test_excludes_already_responded(self):
|
|
p = _proposal()
|
|
qdir = supervise.queue_dir_for_slug("dev")
|
|
qdir.mkdir(parents=True)
|
|
supervise.write_proposal(qdir, p)
|
|
supervise.write_response(qdir, supervise.Response(
|
|
proposal_id=p.id, status=STATUS_APPROVED, notes="",
|
|
))
|
|
self.assertEqual([], dashboard.discover_pending())
|
|
|
|
|
|
class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
|
|
def setUp(self):
|
|
self._setup_fake_home()
|
|
self._original_apply_routes = dashboard.apply_routes_change
|
|
self._original_apply_allowlist = dashboard.apply_allowlist_change
|
|
self._original_fetch_allowlist = dashboard.fetch_current_allowlist
|
|
self._original_apply_capability = dashboard.apply_capability_change
|
|
# Default stubs: succeed with deterministic before/after so the
|
|
# audit log shows a non-empty diff.
|
|
dashboard.apply_routes_change = lambda slug, content: (
|
|
'{"routes": []}\n', content,
|
|
)
|
|
dashboard.apply_allowlist_change = lambda slug, content: (
|
|
"old.example\n", content,
|
|
)
|
|
dashboard.fetch_current_allowlist = lambda slug: "old.example\n"
|
|
dashboard.apply_capability_change = lambda slug, content: (
|
|
"FROM old\n", content,
|
|
)
|
|
|
|
def tearDown(self):
|
|
dashboard.apply_routes_change = self._original_apply_routes
|
|
dashboard.apply_allowlist_change = self._original_apply_allowlist
|
|
dashboard.fetch_current_allowlist = self._original_fetch_allowlist
|
|
dashboard.apply_capability_change = self._original_apply_capability
|
|
self._teardown_fake_home()
|
|
|
|
def _enqueue(self, tool: str = TOOL_EGRESS_PROXY_BLOCK):
|
|
p = _proposal(tool=tool)
|
|
qdir = supervise.queue_dir_for_slug("dev")
|
|
qdir.mkdir(parents=True, exist_ok=True)
|
|
supervise.write_proposal(qdir, p)
|
|
return dashboard.QueuedProposal(proposal=p, queue_dir=qdir)
|
|
|
|
def test_approve_writes_response_and_audit(self):
|
|
qp = self._enqueue()
|
|
dashboard.approve(qp)
|
|
resp = read_response(qp.queue_dir, qp.proposal.id)
|
|
self.assertEqual(STATUS_APPROVED, resp.status)
|
|
self.assertIsNone(resp.final_file)
|
|
entries = read_audit_entries("egress-proxy", "dev")
|
|
self.assertEqual(1, len(entries))
|
|
self.assertEqual("approved", entries[0].operator_action)
|
|
|
|
def test_approve_with_final_file_marks_modified(self):
|
|
qp = self._enqueue()
|
|
dashboard.approve(qp, final_file='{"routes": [{"path": "/x/"}]}\n', notes="tweaked")
|
|
resp = read_response(qp.queue_dir, qp.proposal.id)
|
|
self.assertEqual(STATUS_MODIFIED, resp.status)
|
|
self.assertEqual('{"routes": [{"path": "/x/"}]}\n', resp.final_file)
|
|
self.assertEqual("tweaked", resp.notes)
|
|
entries = read_audit_entries("egress-proxy", "dev")
|
|
self.assertEqual("modified", entries[0].operator_action)
|
|
|
|
def test_reject_writes_rejection(self):
|
|
qp = self._enqueue()
|
|
dashboard.reject(qp, reason="nope")
|
|
resp = read_response(qp.queue_dir, qp.proposal.id)
|
|
self.assertEqual(STATUS_REJECTED, resp.status)
|
|
self.assertEqual("nope", resp.notes)
|
|
entries = read_audit_entries("egress-proxy", "dev")
|
|
self.assertEqual("rejected", entries[0].operator_action)
|
|
self.assertEqual("nope", entries[0].operator_notes)
|
|
|
|
def test_capability_block_skips_audit_log(self):
|
|
qp = self._enqueue(tool=TOOL_CAPABILITY_BLOCK)
|
|
dashboard.approve(qp)
|
|
# No audit log for capability-block (per PRD 0013 / 0016).
|
|
# cred-proxy and pipelock logs both empty.
|
|
self.assertEqual([], read_audit_entries("egress-proxy", "dev"))
|
|
self.assertEqual([], read_audit_entries("pipelock", "dev"))
|
|
|
|
def test_pipelock_audit_distinct_from_egress_proxy(self):
|
|
qp = self._enqueue(tool=TOOL_PIPELOCK_BLOCK)
|
|
dashboard.approve(qp)
|
|
self.assertEqual(1, len(read_audit_entries("pipelock", "dev")))
|
|
self.assertEqual(0, len(read_audit_entries("egress-proxy", "dev")))
|
|
|
|
|
|
class TestEgressProxyApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
|
"""PRD 0014 Phase 3: approve() on a egress-proxy-block proposal
|
|
must call apply_routes_change with the right args and surface
|
|
its failures."""
|
|
|
|
def setUp(self):
|
|
self._setup_fake_home()
|
|
self._original_apply = dashboard.apply_routes_change
|
|
|
|
def tearDown(self):
|
|
dashboard.apply_routes_change = self._original_apply
|
|
self._teardown_fake_home()
|
|
|
|
def _enqueue_egress_proxy(self, proposed: str = '{"routes": []}\n'):
|
|
p = Proposal.new(
|
|
bottle_slug="dev", tool=TOOL_EGRESS_PROXY_BLOCK,
|
|
proposed_file=proposed,
|
|
justification="need a route",
|
|
current_file_hash=sha256_hex(proposed),
|
|
now=FIXED,
|
|
)
|
|
qdir = supervise.queue_dir_for_slug("dev")
|
|
qdir.mkdir(parents=True, exist_ok=True)
|
|
supervise.write_proposal(qdir, p)
|
|
return dashboard.QueuedProposal(proposal=p, queue_dir=qdir)
|
|
|
|
def test_egress_proxy_block_calls_apply_with_proposed_file(self):
|
|
calls = []
|
|
dashboard.apply_routes_change = lambda slug, content: (
|
|
calls.append((slug, content)) or ("before", content)
|
|
)
|
|
qp = self._enqueue_egress_proxy(proposed='{"routes": [{"path": "/new/"}]}\n')
|
|
dashboard.approve(qp)
|
|
self.assertEqual(1, len(calls))
|
|
slug, content = calls[0]
|
|
self.assertEqual("dev", slug)
|
|
self.assertEqual('{"routes": [{"path": "/new/"}]}\n', content)
|
|
|
|
def test_modify_passes_final_file_to_apply(self):
|
|
calls = []
|
|
dashboard.apply_routes_change = lambda slug, content: (
|
|
calls.append(content) or ("before", content)
|
|
)
|
|
qp = self._enqueue_egress_proxy()
|
|
dashboard.approve(qp, final_file='{"routes": [{"path": "/edited/"}]}\n', notes="tweaked")
|
|
self.assertEqual(['{"routes": [{"path": "/edited/"}]}\n'], calls)
|
|
|
|
def test_apply_failure_blocks_response_and_audit(self):
|
|
dashboard.apply_routes_change = lambda slug, content: (_ for _ in ()).throw(
|
|
EgressProxyApplyError("docker exec failed")
|
|
)
|
|
qp = self._enqueue_egress_proxy()
|
|
with self.assertRaises(EgressProxyApplyError):
|
|
dashboard.approve(qp)
|
|
# No response file (proposal stays pending).
|
|
self.assertEqual(
|
|
[qp.proposal.id],
|
|
[p.id for p in supervise.list_pending_proposals(qp.queue_dir)],
|
|
)
|
|
# No audit entry.
|
|
self.assertEqual([], read_audit_entries("egress-proxy", "dev"))
|
|
|
|
def test_real_diff_lands_in_audit(self):
|
|
dashboard.apply_routes_change = lambda slug, content: (
|
|
'{"routes": []}\n', # before
|
|
'{"routes": [{"path": "/new/"}]}\n', # after
|
|
)
|
|
qp = self._enqueue_egress_proxy(proposed='{"routes": [{"path": "/new/"}]}\n')
|
|
dashboard.approve(qp)
|
|
entries = read_audit_entries("egress-proxy", "dev")
|
|
self.assertEqual(1, len(entries))
|
|
self.assertIn('+{"routes": [{"path": "/new/"}]}', entries[0].diff)
|
|
self.assertIn('-{"routes": []}', entries[0].diff)
|
|
|
|
def test_reject_does_not_call_apply(self):
|
|
called = []
|
|
dashboard.apply_routes_change = lambda slug, content: (
|
|
called.append(True) or ("", content)
|
|
)
|
|
qp = self._enqueue_egress_proxy()
|
|
dashboard.reject(qp, reason="no thanks")
|
|
self.assertEqual([], called)
|
|
# Reject still writes a response + audit entry with empty diff.
|
|
resp = read_response(qp.queue_dir, qp.proposal.id)
|
|
self.assertEqual(STATUS_REJECTED, resp.status)
|
|
entries = read_audit_entries("egress-proxy", "dev")
|
|
self.assertEqual(1, len(entries))
|
|
self.assertEqual("", entries[0].diff)
|
|
|
|
|
|
class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
|
"""PRD 0015 Phase 2 + PR #25 follow-up: approve() on a
|
|
pipelock-block proposal carries the failed URL; the dashboard
|
|
extracts the host, merges it into the running allowlist, and
|
|
calls apply_allowlist_change with the merged content."""
|
|
|
|
def setUp(self):
|
|
self._setup_fake_home()
|
|
self._original_apply = dashboard.apply_allowlist_change
|
|
self._original_fetch = dashboard.fetch_current_allowlist
|
|
|
|
def tearDown(self):
|
|
dashboard.apply_allowlist_change = self._original_apply
|
|
dashboard.fetch_current_allowlist = self._original_fetch
|
|
self._teardown_fake_home()
|
|
|
|
def _enqueue_pipelock(self, failed_url: str = "https://api.github.com/repos/foo/bar"):
|
|
p = Proposal.new(
|
|
bottle_slug="dev", tool=TOOL_PIPELOCK_BLOCK,
|
|
proposed_file=failed_url,
|
|
justification="need to read PR metadata",
|
|
current_file_hash=sha256_hex(failed_url),
|
|
now=FIXED,
|
|
)
|
|
qdir = supervise.queue_dir_for_slug("dev")
|
|
qdir.mkdir(parents=True, exist_ok=True)
|
|
supervise.write_proposal(qdir, p)
|
|
return dashboard.QueuedProposal(proposal=p, queue_dir=qdir)
|
|
|
|
def test_url_host_merged_into_current_allowlist(self):
|
|
dashboard.fetch_current_allowlist = lambda slug: "existing.example\n"
|
|
applied = []
|
|
dashboard.apply_allowlist_change = lambda slug, content: (
|
|
applied.append((slug, content))
|
|
or ("existing.example\n", content)
|
|
)
|
|
qp = self._enqueue_pipelock("https://api.github.com/repos/foo/bar")
|
|
dashboard.approve(qp)
|
|
# apply_allowlist_change was called with the merged content:
|
|
# existing host + the URL's host (no path, since pipelock is
|
|
# hostname-only).
|
|
self.assertEqual(1, len(applied))
|
|
slug, content = applied[0]
|
|
self.assertEqual("dev", slug)
|
|
self.assertIn("existing.example", content)
|
|
self.assertIn("api.github.com", content)
|
|
self.assertNotIn("/repos/foo/bar", content) # path stripped
|
|
|
|
def test_host_already_in_allowlist_is_idempotent(self):
|
|
dashboard.fetch_current_allowlist = lambda slug: "api.github.com\n"
|
|
applied = []
|
|
dashboard.apply_allowlist_change = lambda slug, content: (
|
|
applied.append(content)
|
|
or ("api.github.com\n", content)
|
|
)
|
|
qp = self._enqueue_pipelock("https://api.github.com/some/path")
|
|
dashboard.approve(qp)
|
|
# Still applied, but the content is unchanged from current —
|
|
# before/after diff is empty.
|
|
self.assertEqual(1, len(applied))
|
|
self.assertEqual("api.github.com\n", applied[0])
|
|
|
|
def test_apply_failure_blocks_response_and_audit(self):
|
|
dashboard.fetch_current_allowlist = lambda slug: "existing.example\n"
|
|
dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw(
|
|
PipelockApplyError("docker exec failed")
|
|
)
|
|
qp = self._enqueue_pipelock()
|
|
with self.assertRaises(PipelockApplyError):
|
|
dashboard.approve(qp)
|
|
self.assertEqual(
|
|
[qp.proposal.id],
|
|
[p.id for p in supervise.list_pending_proposals(qp.queue_dir)],
|
|
)
|
|
self.assertEqual([], read_audit_entries("pipelock", "dev"))
|
|
|
|
def test_url_without_host_raises(self):
|
|
dashboard.fetch_current_allowlist = lambda slug: ""
|
|
# supervise_server's validator would catch this; if a broken
|
|
# URL ever makes it through, the dashboard surfaces it too.
|
|
qp = self._enqueue_pipelock("https:///nohost")
|
|
with self.assertRaises(PipelockApplyError):
|
|
dashboard.approve(qp)
|
|
|
|
|
|
class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
|
"""PRD 0016 Phase 3: approve() on a capability-block proposal
|
|
calls apply_capability_change, archives the proposal afterward
|
|
(sidecar is gone so it can't archive itself), and writes no
|
|
audit entry (capability-block has none per PRD 0013)."""
|
|
|
|
def setUp(self):
|
|
self._setup_fake_home()
|
|
self._original = dashboard.apply_capability_change
|
|
|
|
def tearDown(self):
|
|
dashboard.apply_capability_change = self._original
|
|
self._teardown_fake_home()
|
|
|
|
def _enqueue_capability(self, proposed: str = "FROM python:3.13\nRUN apk add ripgrep\n"):
|
|
p = Proposal.new(
|
|
bottle_slug="dev", tool=TOOL_CAPABILITY_BLOCK,
|
|
proposed_file=proposed,
|
|
justification="need ripgrep",
|
|
current_file_hash=sha256_hex(proposed),
|
|
now=FIXED,
|
|
)
|
|
qdir = supervise.queue_dir_for_slug("dev")
|
|
qdir.mkdir(parents=True, exist_ok=True)
|
|
supervise.write_proposal(qdir, p)
|
|
return dashboard.QueuedProposal(proposal=p, queue_dir=qdir)
|
|
|
|
def test_capability_block_calls_apply_with_proposed_file(self):
|
|
calls = []
|
|
dashboard.apply_capability_change = lambda slug, content: (
|
|
calls.append((slug, content)) or ("FROM old\n", content)
|
|
)
|
|
qp = self._enqueue_capability("FROM bookworm\n")
|
|
dashboard.approve(qp)
|
|
self.assertEqual([("dev", "FROM bookworm\n")], calls)
|
|
|
|
def test_apply_failure_blocks_response_and_keeps_pending(self):
|
|
dashboard.apply_capability_change = lambda slug, content: (_ for _ in ()).throw(
|
|
CapabilityApplyError("teardown failed")
|
|
)
|
|
qp = self._enqueue_capability()
|
|
with self.assertRaises(CapabilityApplyError):
|
|
dashboard.approve(qp)
|
|
self.assertEqual(
|
|
[qp.proposal.id],
|
|
[p.id for p in supervise.list_pending_proposals(qp.queue_dir)],
|
|
)
|
|
|
|
def test_no_audit_log_for_capability(self):
|
|
dashboard.apply_capability_change = lambda slug, content: ("FROM old\n", content)
|
|
qp = self._enqueue_capability()
|
|
dashboard.approve(qp)
|
|
# capability-block has no audit log per PRD 0013 — its record
|
|
# lives in the per-bottle Dockerfile + transcript state.
|
|
self.assertEqual([], read_audit_entries("egress-proxy", "dev"))
|
|
self.assertEqual([], read_audit_entries("pipelock", "dev"))
|
|
|
|
def test_proposal_archived_after_apply(self):
|
|
dashboard.apply_capability_change = lambda slug, content: ("FROM old\n", content)
|
|
qp = self._enqueue_capability()
|
|
dashboard.approve(qp)
|
|
# Sidecar would normally archive after delivering the response,
|
|
# but it's gone by then. The dashboard archives so
|
|
# discover_pending stops surfacing the resolved proposal.
|
|
self.assertEqual([], supervise.list_pending_proposals(qp.queue_dir))
|
|
processed = list((qp.queue_dir / "processed").glob("*.json"))
|
|
self.assertEqual(2, len(processed))
|
|
|
|
|
|
class TestOperatorEditRoutes(_FakeHomeMixin, unittest.TestCase):
|
|
"""PRD 0014 Phase 4: operator-initiated routes edit (not gated
|
|
on a pending proposal)."""
|
|
|
|
def setUp(self):
|
|
self._setup_fake_home()
|
|
self._original_apply = dashboard.apply_routes_change
|
|
|
|
def tearDown(self):
|
|
dashboard.apply_routes_change = self._original_apply
|
|
self._teardown_fake_home()
|
|
|
|
def test_writes_audit_with_operator_edit_action(self):
|
|
dashboard.apply_routes_change = lambda slug, content: (
|
|
'{"routes": []}\n', content,
|
|
)
|
|
dashboard.operator_edit_routes("dev", '{"routes": [{"path": "/x/"}]}\n')
|
|
entries = read_audit_entries("egress-proxy", "dev")
|
|
self.assertEqual(1, len(entries))
|
|
self.assertEqual(supervise.ACTION_OPERATOR_EDIT, entries[0].operator_action)
|
|
self.assertEqual("", entries[0].justification)
|
|
self.assertIn("+", entries[0].diff)
|
|
|
|
def test_failure_does_not_write_audit(self):
|
|
dashboard.apply_routes_change = lambda slug, content: (_ for _ in ()).throw(
|
|
EgressProxyApplyError("nope")
|
|
)
|
|
with self.assertRaises(EgressProxyApplyError):
|
|
dashboard.operator_edit_routes("dev", '{"routes": []}\n')
|
|
self.assertEqual([], read_audit_entries("egress-proxy", "dev"))
|
|
|
|
|
|
class TestDiscoverEgressProxySlugs(unittest.TestCase):
|
|
"""Slug-extraction parsing — exercises only the parsing path; the
|
|
docker ps invocation itself is environment-dependent (and tested
|
|
implicitly by the integration test)."""
|
|
|
|
def test_returns_empty_when_docker_unavailable(self):
|
|
# Force a failure by setting PATH to a dir with no docker
|
|
# binary. The discover helper swallows the non-zero rc.
|
|
import os
|
|
original = os.environ.get("PATH", "")
|
|
os.environ["PATH"] = "/nonexistent-no-docker-here"
|
|
try:
|
|
self.assertEqual([], dashboard.discover_egress_proxy_slugs())
|
|
self.assertEqual([], dashboard.discover_pipelock_slugs())
|
|
finally:
|
|
os.environ["PATH"] = original
|
|
|
|
|
|
class TestOperatorEditAllowlist(_FakeHomeMixin, unittest.TestCase):
|
|
"""PRD 0015 Phase 3: operator-initiated pipelock allowlist edit."""
|
|
|
|
def setUp(self):
|
|
self._setup_fake_home()
|
|
self._original = dashboard.apply_allowlist_change
|
|
|
|
def tearDown(self):
|
|
dashboard.apply_allowlist_change = self._original
|
|
self._teardown_fake_home()
|
|
|
|
def test_writes_audit_with_operator_edit_action(self):
|
|
dashboard.apply_allowlist_change = lambda slug, content: (
|
|
"old.example\n", content,
|
|
)
|
|
dashboard.operator_edit_allowlist("dev", "old.example\nnew.example\n")
|
|
entries = read_audit_entries("pipelock", "dev")
|
|
self.assertEqual(1, len(entries))
|
|
self.assertEqual(supervise.ACTION_OPERATOR_EDIT, entries[0].operator_action)
|
|
self.assertIn("+new.example", entries[0].diff)
|
|
|
|
def test_failure_does_not_write_audit(self):
|
|
dashboard.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw(
|
|
PipelockApplyError("nope")
|
|
)
|
|
with self.assertRaises(PipelockApplyError):
|
|
dashboard.operator_edit_allowlist("dev", "x.example\n")
|
|
self.assertEqual([], read_audit_entries("pipelock", "dev"))
|
|
|
|
|
|
class TestEditInEditor(unittest.TestCase):
|
|
def test_runs_editor_returns_edited_content(self):
|
|
# Fake "editor" is /bin/sh -c 'cat <<EOF > $1 ... EOF'
|
|
original_editor = os.environ.get("EDITOR")
|
|
try:
|
|
# Use a fake editor that overwrites the file with a known
|
|
# marker. EDITOR is split with shlex equivalence by
|
|
# subprocess.run when invoked as a list — keep it as a
|
|
# single program path that takes the file as argv[1].
|
|
os.environ["EDITOR"] = (
|
|
"/bin/sh -c 'printf %s \"edited\" > \"$0\"'"
|
|
)
|
|
# subprocess.run with the str as the first list element
|
|
# would try to find a binary literally named "/bin/sh -c ..."
|
|
# — that won't work. Use shell mode trick: wrap in a script.
|
|
# Easier: build a tiny helper script.
|
|
with tempfile.NamedTemporaryFile(
|
|
mode="w", suffix=".sh", delete=False, prefix="fake-editor.",
|
|
) as script:
|
|
script.write('#!/bin/sh\nprintf "%s" "edited" > "$1"\n')
|
|
editor_script = script.name
|
|
os.chmod(editor_script, 0o755)
|
|
os.environ["EDITOR"] = editor_script
|
|
try:
|
|
result = dashboard.edit_in_editor("original")
|
|
self.assertEqual("edited", result)
|
|
finally:
|
|
os.unlink(editor_script)
|
|
finally:
|
|
if original_editor is None:
|
|
os.environ.pop("EDITOR", None)
|
|
else:
|
|
os.environ["EDITOR"] = original_editor
|
|
|
|
def test_returns_none_when_unchanged(self):
|
|
original_editor = os.environ.get("EDITOR")
|
|
try:
|
|
# No-op editor: touch the file (leaves it unchanged).
|
|
with tempfile.NamedTemporaryFile(
|
|
mode="w", suffix=".sh", delete=False, prefix="noop-editor.",
|
|
) as script:
|
|
script.write('#!/bin/sh\n: $1\n')
|
|
editor_script = script.name
|
|
os.chmod(editor_script, 0o755)
|
|
os.environ["EDITOR"] = editor_script
|
|
try:
|
|
result = dashboard.edit_in_editor("original")
|
|
self.assertIsNone(result)
|
|
finally:
|
|
os.unlink(editor_script)
|
|
finally:
|
|
if original_editor is None:
|
|
os.environ.pop("EDITOR", None)
|
|
else:
|
|
os.environ["EDITOR"] = original_editor
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|