diff --git a/bot_bottle/cli/supervise.py b/bot_bottle/cli/supervise.py index 9812cf5..da15cbe 100644 --- a/bot_bottle/cli/supervise.py +++ b/bot_bottle/cli/supervise.py @@ -40,6 +40,7 @@ from ..supervise import ( STATUS_MODIFIED, STATUS_REJECTED, TOOL_CAPABILITY_BLOCK, + TOOL_GITLEAKS_ALLOW, archive_proposal, list_pending_proposals, render_diff, @@ -115,6 +116,8 @@ def _detail_lines( def _suffix_for_tool(tool: str) -> str: if tool == TOOL_CAPABILITY_BLOCK: return ".dockerfile" + if tool == TOOL_GITLEAKS_ALLOW: + return ".txt" return ".txt" @@ -154,7 +157,7 @@ def approve( qp, action=status, notes=notes, diff_before=diff_before, diff_after=diff_after, ) - if qp.proposal.tool == TOOL_CAPABILITY_BLOCK: + if qp.proposal.tool in (TOOL_CAPABILITY_BLOCK, TOOL_GITLEAKS_ALLOW): archive_proposal(qp.queue_dir, qp.proposal.id) @@ -170,6 +173,23 @@ def reject(qp: QueuedProposal, *, reason: str) -> None: _write_audit(qp, action=STATUS_REJECTED, notes=reason, diff_before="", diff_after="") +def _approve_from_tui( + stdscr: "curses._CursesWindow", # type: ignore + qp: QueuedProposal, + *, + final_file: str | None = None, + notes: str = "", +) -> str: + """Approve from curses, prompting for any tool-specific audit note.""" + if qp.proposal.tool == TOOL_GITLEAKS_ALLOW and final_file is None: + notes = _prompt(stdscr, "allow reason (test fixture/false positive): ") + if not notes: + return "approve aborted (empty reason)" + approve(qp, final_file=final_file, notes=notes) + verb = "modified+approved" if final_file is not None else "approved" + return _approval_status(qp, verb) + + def _write_audit( qp: QueuedProposal, *, @@ -353,18 +373,22 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore _detail_view(stdscr, qp, green_attr=green_attr) elif key == ord("a"): try: - approve(qp) - status_line = _approval_status(qp, "approved") + status_line = _approve_from_tui(stdscr, qp) except ApplyError as e: status_line = f"apply failed: {e}" elif key == ord("m"): + if qp.proposal.tool == TOOL_GITLEAKS_ALLOW: + status_line = "modify unavailable for gitleaks-allow" + continue edited = _modify(stdscr, qp) if edited is None: status_line = "modify aborted (no change)" else: try: - approve(qp, final_file=edited, notes="operator modified before approving") - status_line = _approval_status(qp, "modified+approved") + status_line = _approve_from_tui( + stdscr, qp, final_file=edited, + notes="operator modified before approving", + ) except ApplyError as e: status_line = f"apply failed: {e}" elif key == ord("r"): @@ -462,15 +486,20 @@ def _detail_view( offset = max(0, len(lines) - 1) elif key == ord("a"): try: - approve(qp) + _approve_from_tui(stdscr, qp) except ApplyError: pass return elif key == ord("m"): + if qp.proposal.tool == TOOL_GITLEAKS_ALLOW: + return edited = _modify(stdscr, qp) if edited is not None: try: - approve(qp, final_file=edited, notes="operator modified before approving") + _approve_from_tui( + stdscr, qp, final_file=edited, + notes="operator modified before approving", + ) except ApplyError: pass return diff --git a/bot_bottle/git_gate.py b/bot_bottle/git_gate.py index c2241f4..239499c 100644 --- a/bot_bottle/git_gate.py +++ b/bot_bottle/git_gate.py @@ -247,6 +247,164 @@ cat > "$refs_file" zero=0000000000000000000000000000000000000000 +supervise_gitleaks_allow() { + log_opts=$1 + ref=$2 + report_file=$(mktemp) + if ! gitleaks git \ + --log-opts="$log_opts" \ + --no-banner \ + --redact \ + --ignore-gitleaks-allow \ + --report-format=json \ + --report-path="$report_file" \ + --exit-code 0 \ + 1>&2; then + rm -f "$report_file" + echo "git-gate: gitleaks inline-suppression scan failed for $ref" >&2 + return 1 + fi + + proposal_id=$( + GITLEAKS_ALLOW_REF="$ref" python3 - "$report_file" <<'PY' +import datetime +import hashlib +import json +import os +import sys +import uuid +from pathlib import Path + +report_path = Path(sys.argv[1]) +queue_dir = os.environ.get("SUPERVISE_QUEUE_DIR", "") +slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "") +if not queue_dir or not slug: + sys.exit(2) + +try: + raw = json.loads(report_path.read_text() or "[]") +except json.JSONDecodeError: + sys.exit(3) +if not isinstance(raw, list): + sys.exit(3) +if not raw: + sys.exit(0) + +ref = os.environ.get("GITLEAKS_ALLOW_REF", "") +lines = [ + "gitleaks inline suppression requires supervisor approval", + f"ref: {ref}", + "", +] +for i, finding in enumerate(raw, 1): + if not isinstance(finding, dict): + continue + file_path = finding.get("File", "") + line_no = finding.get("StartLine", finding.get("Line", "")) + rule_id = finding.get("RuleID", "") + commit = finding.get("Commit", "") + line = finding.get("Line", "") + lines.extend([ + f"finding {i}:", + f" file: {file_path}", + f" line: {line_no}", + f" rule: {rule_id}", + f" commit: {commit}", + f" code: {line}", + "", + ]) + +payload = "\n".join(lines).rstrip() + "\n" +proposal_id = str(uuid.uuid4()) +proposal = { + "id": proposal_id, + "bottle_slug": slug, + "tool": "gitleaks-allow", + "proposed_file": payload, + "justification": ( + "git-gate found gitleaks findings hidden by # gitleaks:allow; " + "approve only for dummy test fixtures or confirmed false positives" + ), + "arrival_timestamp": datetime.datetime.now( + datetime.timezone.utc + ).isoformat(), + "current_file_hash": hashlib.sha256(payload.encode("utf-8")).hexdigest(), +} +queue = Path(queue_dir) +queue.mkdir(parents=True, exist_ok=True) +path = queue / f"{proposal_id}.proposal.json" +tmp = path.with_suffix(path.suffix + ".tmp") +with tmp.open("w", encoding="utf-8") as f: + json.dump(proposal, f, indent=2) + f.write("\n") +os.chmod(tmp, 0o600) +os.replace(tmp, path) +print(proposal_id) +PY + ) + rc=$? + rm -f "$report_file" + if [ "$rc" -eq 0 ] && [ -z "$proposal_id" ]; then + return 0 + fi + if [ "$rc" -ne 0 ]; then + echo "git-gate: cannot route # gitleaks:allow finding to supervisor; refusing push" >&2 + return 1 + fi + + queue_dir=${SUPERVISE_QUEUE_DIR:-} + response_file="$queue_dir/${proposal_id}.response.json" + timeout=${SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS:-300} + case "$timeout" in + ''|*[!0-9]*) + echo "git-gate: invalid SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS=$timeout" >&2 + return 1 + ;; + esac + echo "git-gate: queued # gitleaks:allow supervisor approval $proposal_id" >&2 + echo "git-gate: approve with './cli.py supervise' to continue this push" >&2 + waited=0 + while [ "$waited" -lt "$timeout" ]; do + if [ -f "$response_file" ]; then + status=$(python3 - "$response_file" <<'PY' +import json +import sys +try: + with open(sys.argv[1], encoding="utf-8") as f: + raw = json.load(f) +except (OSError, json.JSONDecodeError): + sys.exit(1) +status = raw.get("status") +if not isinstance(status, str): + sys.exit(1) +print(status) +PY + ) || status="" + case "$status" in + approved|modified) + mkdir -p "$queue_dir/processed" + mv -f "$queue_dir/${proposal_id}.proposal.json" "$queue_dir/processed/" 2>/dev/null || true + mv -f "$queue_dir/${proposal_id}.response.json" "$queue_dir/processed/" 2>/dev/null || true + echo "git-gate: supervisor approved # gitleaks:allow for $ref" >&2 + return 0 + ;; + rejected) + echo "git-gate: supervisor rejected # gitleaks:allow for $ref" >&2 + return 1 + ;; + *) + echo "git-gate: invalid supervisor response for # gitleaks:allow" >&2 + return 1 + ;; + esac + fi + sleep 1 + waited=$((waited + 1)) + done + echo "git-gate: supervisor approval timed out for # gitleaks:allow; refusing push" >&2 + return 1 +} + # Phase 1: gitleaks scan each ref's incoming commits. while IFS=' ' read -r old new ref; do [ -z "$ref" ] && continue @@ -268,6 +426,9 @@ while IFS=' ' read -r old new ref; do echo "git-gate: gitleaks rejected push to $ref" >&2 exit 1 fi + if ! supervise_gitleaks_allow "$log_opts" "$ref"; then + exit 1 + fi done < "$refs_file" # Phase 2: forward each ref to the upstream (`origin`, configured diff --git a/bot_bottle/supervise.py b/bot_bottle/supervise.py index df01a67..3c9b202 100644 --- a/bot_bottle/supervise.py +++ b/bot_bottle/supervise.py @@ -49,9 +49,11 @@ SUPERVISE_HOSTNAME = "supervise" SUPERVISE_PORT = 9100 TOOL_CAPABILITY_BLOCK = "capability-block" +TOOL_GITLEAKS_ALLOW = "gitleaks-allow" TOOL_LIST_EGRESS_ROUTES = "list-egress-routes" TOOLS: tuple[str, ...] = ( TOOL_CAPABILITY_BLOCK, + TOOL_GITLEAKS_ALLOW, TOOL_LIST_EGRESS_ROUTES, ) diff --git a/tests/unit/test_git_gate.py b/tests/unit/test_git_gate.py index 29ade69..abe2413 100644 --- a/tests/unit/test_git_gate.py +++ b/tests/unit/test_git_gate.py @@ -199,6 +199,30 @@ class TestHookRender(unittest.TestCase): self.assertIn('set -- "$@" --push-option="$opt"', hook) self.assertIn('git push "$@" origin "$refspec"', hook) + def test_inline_gitleaks_allow_routes_to_supervisor(self): + hook = git_gate_render_hook() + # First gitleaks runs normally; only if that passes does the + # hook ask gitleaks to ignore inline allow comments and report + # the suppressed findings for human approval. + self.assertIn("--ignore-gitleaks-allow", hook) + self.assertIn("--report-format=json", hook) + self.assertIn('"tool": "gitleaks-allow"', hook) + self.assertIn("SUPERVISE_QUEUE_DIR", hook) + self.assertIn("SUPERVISE_BOTTLE_SLUG", hook) + self.assertIn("supervisor approved # gitleaks:allow", hook) + self.assertIn("supervisor rejected # gitleaks:allow", hook) + + def test_inline_gitleaks_allow_fails_closed_without_supervisor(self): + hook = git_gate_render_hook() + self.assertIn( + "cannot route # gitleaks:allow finding to supervisor; refusing push", + hook, + ) + self.assertIn( + "supervisor approval timed out for # gitleaks:allow; refusing push", + hook, + ) + class TestAccessHookRender(unittest.TestCase): def test_access_hook_refreshes_origin_on_upload_pack(self): diff --git a/tests/unit/test_supervise.py b/tests/unit/test_supervise.py index 8eeb8db..01bc2c2 100644 --- a/tests/unit/test_supervise.py +++ b/tests/unit/test_supervise.py @@ -17,6 +17,7 @@ from bot_bottle.supervise import ( STATUS_MODIFIED, STATUS_REJECTED, TOOL_CAPABILITY_BLOCK, + TOOL_GITLEAKS_ALLOW, archive_proposal, audit_log_path, list_pending_proposals, @@ -318,6 +319,7 @@ class TestToolConstants(unittest.TestCase): self.assertEqual( ( TOOL_CAPABILITY_BLOCK, + TOOL_GITLEAKS_ALLOW, supervise.TOOL_LIST_EGRESS_ROUTES, ), supervise.TOOLS, diff --git a/tests/unit/test_supervise_cli.py b/tests/unit/test_supervise_cli.py index f7ad6d4..69240bd 100644 --- a/tests/unit/test_supervise_cli.py +++ b/tests/unit/test_supervise_cli.py @@ -12,6 +12,7 @@ import tempfile import unittest from datetime import datetime, timezone from pathlib import Path +from unittest.mock import patch from bot_bottle import supervise from bot_bottle.cli import supervise as supervise_cli @@ -21,6 +22,7 @@ from bot_bottle.supervise import ( STATUS_MODIFIED, STATUS_REJECTED, TOOL_CAPABILITY_BLOCK, + TOOL_GITLEAKS_ALLOW, read_audit_entries, read_response, sha256_hex, @@ -33,6 +35,7 @@ FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc) def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal: payloads = { TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n", + TOOL_GITLEAKS_ALLOW: "file: tests/test_fixture.py\nline: 3\n", } payload = payloads.get(tool, "") return Proposal.new( @@ -154,6 +157,28 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): supervise_cli.approve(qp) self.assertEqual([], read_audit_entries("egress", "dev")) + def test_approve_archives_gitleaks_allow(self): + qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW) + supervise_cli.approve(qp, notes="dummy fixture") + resp = read_response(qp.queue_dir / "processed", qp.proposal.id) + self.assertEqual(STATUS_APPROVED, resp.status) + self.assertEqual("dummy fixture", resp.notes) + + def test_tui_gitleaks_allow_requires_reason(self): + qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW) + with patch.object(supervise_cli, "_prompt", return_value=""): + status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type] + self.assertEqual("approve aborted (empty reason)", status) + self.assertFalse((qp.queue_dir / "processed").exists()) + + def test_tui_gitleaks_allow_writes_reason(self): + qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW) + with patch.object(supervise_cli, "_prompt", return_value="test fixture"): + status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type] + self.assertIn("approved gitleaks-allow", status) + resp = read_response(qp.queue_dir / "processed", qp.proposal.id) + self.assertEqual("test fixture", resp.notes) + # class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase): # # DISABLED — capability_apply functionality is currently commented out.