Supervise gitleaks inline allow exceptions
This commit is contained in:
@@ -40,6 +40,7 @@ from ..supervise import (
|
|||||||
STATUS_MODIFIED,
|
STATUS_MODIFIED,
|
||||||
STATUS_REJECTED,
|
STATUS_REJECTED,
|
||||||
TOOL_CAPABILITY_BLOCK,
|
TOOL_CAPABILITY_BLOCK,
|
||||||
|
TOOL_GITLEAKS_ALLOW,
|
||||||
archive_proposal,
|
archive_proposal,
|
||||||
list_pending_proposals,
|
list_pending_proposals,
|
||||||
render_diff,
|
render_diff,
|
||||||
@@ -115,6 +116,8 @@ def _detail_lines(
|
|||||||
def _suffix_for_tool(tool: str) -> str:
|
def _suffix_for_tool(tool: str) -> str:
|
||||||
if tool == TOOL_CAPABILITY_BLOCK:
|
if tool == TOOL_CAPABILITY_BLOCK:
|
||||||
return ".dockerfile"
|
return ".dockerfile"
|
||||||
|
if tool == TOOL_GITLEAKS_ALLOW:
|
||||||
|
return ".txt"
|
||||||
return ".txt"
|
return ".txt"
|
||||||
|
|
||||||
|
|
||||||
@@ -154,7 +157,7 @@ def approve(
|
|||||||
qp, action=status, notes=notes,
|
qp, action=status, notes=notes,
|
||||||
diff_before=diff_before, diff_after=diff_after,
|
diff_before=diff_before, diff_after=diff_after,
|
||||||
)
|
)
|
||||||
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
|
if qp.proposal.tool in (TOOL_CAPABILITY_BLOCK, TOOL_GITLEAKS_ALLOW):
|
||||||
archive_proposal(qp.queue_dir, qp.proposal.id)
|
archive_proposal(qp.queue_dir, qp.proposal.id)
|
||||||
|
|
||||||
|
|
||||||
@@ -170,6 +173,23 @@ def reject(qp: QueuedProposal, *, reason: str) -> None:
|
|||||||
_write_audit(qp, action=STATUS_REJECTED, notes=reason, diff_before="", diff_after="")
|
_write_audit(qp, action=STATUS_REJECTED, notes=reason, diff_before="", diff_after="")
|
||||||
|
|
||||||
|
|
||||||
|
def _approve_from_tui(
|
||||||
|
stdscr: "curses._CursesWindow", # type: ignore
|
||||||
|
qp: QueuedProposal,
|
||||||
|
*,
|
||||||
|
final_file: str | None = None,
|
||||||
|
notes: str = "",
|
||||||
|
) -> str:
|
||||||
|
"""Approve from curses, prompting for any tool-specific audit note."""
|
||||||
|
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW and final_file is None:
|
||||||
|
notes = _prompt(stdscr, "allow reason (test fixture/false positive): ")
|
||||||
|
if not notes:
|
||||||
|
return "approve aborted (empty reason)"
|
||||||
|
approve(qp, final_file=final_file, notes=notes)
|
||||||
|
verb = "modified+approved" if final_file is not None else "approved"
|
||||||
|
return _approval_status(qp, verb)
|
||||||
|
|
||||||
|
|
||||||
def _write_audit(
|
def _write_audit(
|
||||||
qp: QueuedProposal,
|
qp: QueuedProposal,
|
||||||
*,
|
*,
|
||||||
@@ -353,18 +373,22 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
|
|||||||
_detail_view(stdscr, qp, green_attr=green_attr)
|
_detail_view(stdscr, qp, green_attr=green_attr)
|
||||||
elif key == ord("a"):
|
elif key == ord("a"):
|
||||||
try:
|
try:
|
||||||
approve(qp)
|
status_line = _approve_from_tui(stdscr, qp)
|
||||||
status_line = _approval_status(qp, "approved")
|
|
||||||
except ApplyError as e:
|
except ApplyError as e:
|
||||||
status_line = f"apply failed: {e}"
|
status_line = f"apply failed: {e}"
|
||||||
elif key == ord("m"):
|
elif key == ord("m"):
|
||||||
|
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW:
|
||||||
|
status_line = "modify unavailable for gitleaks-allow"
|
||||||
|
continue
|
||||||
edited = _modify(stdscr, qp)
|
edited = _modify(stdscr, qp)
|
||||||
if edited is None:
|
if edited is None:
|
||||||
status_line = "modify aborted (no change)"
|
status_line = "modify aborted (no change)"
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
approve(qp, final_file=edited, notes="operator modified before approving")
|
status_line = _approve_from_tui(
|
||||||
status_line = _approval_status(qp, "modified+approved")
|
stdscr, qp, final_file=edited,
|
||||||
|
notes="operator modified before approving",
|
||||||
|
)
|
||||||
except ApplyError as e:
|
except ApplyError as e:
|
||||||
status_line = f"apply failed: {e}"
|
status_line = f"apply failed: {e}"
|
||||||
elif key == ord("r"):
|
elif key == ord("r"):
|
||||||
@@ -462,15 +486,20 @@ def _detail_view(
|
|||||||
offset = max(0, len(lines) - 1)
|
offset = max(0, len(lines) - 1)
|
||||||
elif key == ord("a"):
|
elif key == ord("a"):
|
||||||
try:
|
try:
|
||||||
approve(qp)
|
_approve_from_tui(stdscr, qp)
|
||||||
except ApplyError:
|
except ApplyError:
|
||||||
pass
|
pass
|
||||||
return
|
return
|
||||||
elif key == ord("m"):
|
elif key == ord("m"):
|
||||||
|
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW:
|
||||||
|
return
|
||||||
edited = _modify(stdscr, qp)
|
edited = _modify(stdscr, qp)
|
||||||
if edited is not None:
|
if edited is not None:
|
||||||
try:
|
try:
|
||||||
approve(qp, final_file=edited, notes="operator modified before approving")
|
_approve_from_tui(
|
||||||
|
stdscr, qp, final_file=edited,
|
||||||
|
notes="operator modified before approving",
|
||||||
|
)
|
||||||
except ApplyError:
|
except ApplyError:
|
||||||
pass
|
pass
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -247,6 +247,164 @@ cat > "$refs_file"
|
|||||||
|
|
||||||
zero=0000000000000000000000000000000000000000
|
zero=0000000000000000000000000000000000000000
|
||||||
|
|
||||||
|
supervise_gitleaks_allow() {
|
||||||
|
log_opts=$1
|
||||||
|
ref=$2
|
||||||
|
report_file=$(mktemp)
|
||||||
|
if ! gitleaks git \
|
||||||
|
--log-opts="$log_opts" \
|
||||||
|
--no-banner \
|
||||||
|
--redact \
|
||||||
|
--ignore-gitleaks-allow \
|
||||||
|
--report-format=json \
|
||||||
|
--report-path="$report_file" \
|
||||||
|
--exit-code 0 \
|
||||||
|
1>&2; then
|
||||||
|
rm -f "$report_file"
|
||||||
|
echo "git-gate: gitleaks inline-suppression scan failed for $ref" >&2
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
proposal_id=$(
|
||||||
|
GITLEAKS_ALLOW_REF="$ref" python3 - "$report_file" <<'PY'
|
||||||
|
import datetime
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import uuid
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
report_path = Path(sys.argv[1])
|
||||||
|
queue_dir = os.environ.get("SUPERVISE_QUEUE_DIR", "")
|
||||||
|
slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "")
|
||||||
|
if not queue_dir or not slug:
|
||||||
|
sys.exit(2)
|
||||||
|
|
||||||
|
try:
|
||||||
|
raw = json.loads(report_path.read_text() or "[]")
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
sys.exit(3)
|
||||||
|
if not isinstance(raw, list):
|
||||||
|
sys.exit(3)
|
||||||
|
if not raw:
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
ref = os.environ.get("GITLEAKS_ALLOW_REF", "")
|
||||||
|
lines = [
|
||||||
|
"gitleaks inline suppression requires supervisor approval",
|
||||||
|
f"ref: {ref}",
|
||||||
|
"",
|
||||||
|
]
|
||||||
|
for i, finding in enumerate(raw, 1):
|
||||||
|
if not isinstance(finding, dict):
|
||||||
|
continue
|
||||||
|
file_path = finding.get("File", "")
|
||||||
|
line_no = finding.get("StartLine", finding.get("Line", ""))
|
||||||
|
rule_id = finding.get("RuleID", "")
|
||||||
|
commit = finding.get("Commit", "")
|
||||||
|
line = finding.get("Line", "")
|
||||||
|
lines.extend([
|
||||||
|
f"finding {i}:",
|
||||||
|
f" file: {file_path}",
|
||||||
|
f" line: {line_no}",
|
||||||
|
f" rule: {rule_id}",
|
||||||
|
f" commit: {commit}",
|
||||||
|
f" code: {line}",
|
||||||
|
"",
|
||||||
|
])
|
||||||
|
|
||||||
|
payload = "\n".join(lines).rstrip() + "\n"
|
||||||
|
proposal_id = str(uuid.uuid4())
|
||||||
|
proposal = {
|
||||||
|
"id": proposal_id,
|
||||||
|
"bottle_slug": slug,
|
||||||
|
"tool": "gitleaks-allow",
|
||||||
|
"proposed_file": payload,
|
||||||
|
"justification": (
|
||||||
|
"git-gate found gitleaks findings hidden by # gitleaks:allow; "
|
||||||
|
"approve only for dummy test fixtures or confirmed false positives"
|
||||||
|
),
|
||||||
|
"arrival_timestamp": datetime.datetime.now(
|
||||||
|
datetime.timezone.utc
|
||||||
|
).isoformat(),
|
||||||
|
"current_file_hash": hashlib.sha256(payload.encode("utf-8")).hexdigest(),
|
||||||
|
}
|
||||||
|
queue = Path(queue_dir)
|
||||||
|
queue.mkdir(parents=True, exist_ok=True)
|
||||||
|
path = queue / f"{proposal_id}.proposal.json"
|
||||||
|
tmp = path.with_suffix(path.suffix + ".tmp")
|
||||||
|
with tmp.open("w", encoding="utf-8") as f:
|
||||||
|
json.dump(proposal, f, indent=2)
|
||||||
|
f.write("\n")
|
||||||
|
os.chmod(tmp, 0o600)
|
||||||
|
os.replace(tmp, path)
|
||||||
|
print(proposal_id)
|
||||||
|
PY
|
||||||
|
)
|
||||||
|
rc=$?
|
||||||
|
rm -f "$report_file"
|
||||||
|
if [ "$rc" -eq 0 ] && [ -z "$proposal_id" ]; then
|
||||||
|
return 0
|
||||||
|
fi
|
||||||
|
if [ "$rc" -ne 0 ]; then
|
||||||
|
echo "git-gate: cannot route # gitleaks:allow finding to supervisor; refusing push" >&2
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
queue_dir=${SUPERVISE_QUEUE_DIR:-}
|
||||||
|
response_file="$queue_dir/${proposal_id}.response.json"
|
||||||
|
timeout=${SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS:-300}
|
||||||
|
case "$timeout" in
|
||||||
|
''|*[!0-9]*)
|
||||||
|
echo "git-gate: invalid SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS=$timeout" >&2
|
||||||
|
return 1
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
echo "git-gate: queued # gitleaks:allow supervisor approval $proposal_id" >&2
|
||||||
|
echo "git-gate: approve with './cli.py supervise' to continue this push" >&2
|
||||||
|
waited=0
|
||||||
|
while [ "$waited" -lt "$timeout" ]; do
|
||||||
|
if [ -f "$response_file" ]; then
|
||||||
|
status=$(python3 - "$response_file" <<'PY'
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
try:
|
||||||
|
with open(sys.argv[1], encoding="utf-8") as f:
|
||||||
|
raw = json.load(f)
|
||||||
|
except (OSError, json.JSONDecodeError):
|
||||||
|
sys.exit(1)
|
||||||
|
status = raw.get("status")
|
||||||
|
if not isinstance(status, str):
|
||||||
|
sys.exit(1)
|
||||||
|
print(status)
|
||||||
|
PY
|
||||||
|
) || status=""
|
||||||
|
case "$status" in
|
||||||
|
approved|modified)
|
||||||
|
mkdir -p "$queue_dir/processed"
|
||||||
|
mv -f "$queue_dir/${proposal_id}.proposal.json" "$queue_dir/processed/" 2>/dev/null || true
|
||||||
|
mv -f "$queue_dir/${proposal_id}.response.json" "$queue_dir/processed/" 2>/dev/null || true
|
||||||
|
echo "git-gate: supervisor approved # gitleaks:allow for $ref" >&2
|
||||||
|
return 0
|
||||||
|
;;
|
||||||
|
rejected)
|
||||||
|
echo "git-gate: supervisor rejected # gitleaks:allow for $ref" >&2
|
||||||
|
return 1
|
||||||
|
;;
|
||||||
|
*)
|
||||||
|
echo "git-gate: invalid supervisor response for # gitleaks:allow" >&2
|
||||||
|
return 1
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
fi
|
||||||
|
sleep 1
|
||||||
|
waited=$((waited + 1))
|
||||||
|
done
|
||||||
|
echo "git-gate: supervisor approval timed out for # gitleaks:allow; refusing push" >&2
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
# Phase 1: gitleaks scan each ref's incoming commits.
|
# Phase 1: gitleaks scan each ref's incoming commits.
|
||||||
while IFS=' ' read -r old new ref; do
|
while IFS=' ' read -r old new ref; do
|
||||||
[ -z "$ref" ] && continue
|
[ -z "$ref" ] && continue
|
||||||
@@ -268,6 +426,9 @@ while IFS=' ' read -r old new ref; do
|
|||||||
echo "git-gate: gitleaks rejected push to $ref" >&2
|
echo "git-gate: gitleaks rejected push to $ref" >&2
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
if ! supervise_gitleaks_allow "$log_opts" "$ref"; then
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
done < "$refs_file"
|
done < "$refs_file"
|
||||||
|
|
||||||
# Phase 2: forward each ref to the upstream (`origin`, configured
|
# Phase 2: forward each ref to the upstream (`origin`, configured
|
||||||
|
|||||||
@@ -49,9 +49,11 @@ SUPERVISE_HOSTNAME = "supervise"
|
|||||||
SUPERVISE_PORT = 9100
|
SUPERVISE_PORT = 9100
|
||||||
|
|
||||||
TOOL_CAPABILITY_BLOCK = "capability-block"
|
TOOL_CAPABILITY_BLOCK = "capability-block"
|
||||||
|
TOOL_GITLEAKS_ALLOW = "gitleaks-allow"
|
||||||
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
|
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
|
||||||
TOOLS: tuple[str, ...] = (
|
TOOLS: tuple[str, ...] = (
|
||||||
TOOL_CAPABILITY_BLOCK,
|
TOOL_CAPABILITY_BLOCK,
|
||||||
|
TOOL_GITLEAKS_ALLOW,
|
||||||
TOOL_LIST_EGRESS_ROUTES,
|
TOOL_LIST_EGRESS_ROUTES,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -199,6 +199,30 @@ class TestHookRender(unittest.TestCase):
|
|||||||
self.assertIn('set -- "$@" --push-option="$opt"', hook)
|
self.assertIn('set -- "$@" --push-option="$opt"', hook)
|
||||||
self.assertIn('git push "$@" origin "$refspec"', hook)
|
self.assertIn('git push "$@" origin "$refspec"', hook)
|
||||||
|
|
||||||
|
def test_inline_gitleaks_allow_routes_to_supervisor(self):
|
||||||
|
hook = git_gate_render_hook()
|
||||||
|
# First gitleaks runs normally; only if that passes does the
|
||||||
|
# hook ask gitleaks to ignore inline allow comments and report
|
||||||
|
# the suppressed findings for human approval.
|
||||||
|
self.assertIn("--ignore-gitleaks-allow", hook)
|
||||||
|
self.assertIn("--report-format=json", hook)
|
||||||
|
self.assertIn('"tool": "gitleaks-allow"', hook)
|
||||||
|
self.assertIn("SUPERVISE_QUEUE_DIR", hook)
|
||||||
|
self.assertIn("SUPERVISE_BOTTLE_SLUG", hook)
|
||||||
|
self.assertIn("supervisor approved # gitleaks:allow", hook)
|
||||||
|
self.assertIn("supervisor rejected # gitleaks:allow", hook)
|
||||||
|
|
||||||
|
def test_inline_gitleaks_allow_fails_closed_without_supervisor(self):
|
||||||
|
hook = git_gate_render_hook()
|
||||||
|
self.assertIn(
|
||||||
|
"cannot route # gitleaks:allow finding to supervisor; refusing push",
|
||||||
|
hook,
|
||||||
|
)
|
||||||
|
self.assertIn(
|
||||||
|
"supervisor approval timed out for # gitleaks:allow; refusing push",
|
||||||
|
hook,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestAccessHookRender(unittest.TestCase):
|
class TestAccessHookRender(unittest.TestCase):
|
||||||
def test_access_hook_refreshes_origin_on_upload_pack(self):
|
def test_access_hook_refreshes_origin_on_upload_pack(self):
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ from bot_bottle.supervise import (
|
|||||||
STATUS_MODIFIED,
|
STATUS_MODIFIED,
|
||||||
STATUS_REJECTED,
|
STATUS_REJECTED,
|
||||||
TOOL_CAPABILITY_BLOCK,
|
TOOL_CAPABILITY_BLOCK,
|
||||||
|
TOOL_GITLEAKS_ALLOW,
|
||||||
archive_proposal,
|
archive_proposal,
|
||||||
audit_log_path,
|
audit_log_path,
|
||||||
list_pending_proposals,
|
list_pending_proposals,
|
||||||
@@ -318,6 +319,7 @@ class TestToolConstants(unittest.TestCase):
|
|||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
(
|
(
|
||||||
TOOL_CAPABILITY_BLOCK,
|
TOOL_CAPABILITY_BLOCK,
|
||||||
|
TOOL_GITLEAKS_ALLOW,
|
||||||
supervise.TOOL_LIST_EGRESS_ROUTES,
|
supervise.TOOL_LIST_EGRESS_ROUTES,
|
||||||
),
|
),
|
||||||
supervise.TOOLS,
|
supervise.TOOLS,
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import tempfile
|
|||||||
import unittest
|
import unittest
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
from bot_bottle import supervise
|
from bot_bottle import supervise
|
||||||
from bot_bottle.cli import supervise as supervise_cli
|
from bot_bottle.cli import supervise as supervise_cli
|
||||||
@@ -21,6 +22,7 @@ from bot_bottle.supervise import (
|
|||||||
STATUS_MODIFIED,
|
STATUS_MODIFIED,
|
||||||
STATUS_REJECTED,
|
STATUS_REJECTED,
|
||||||
TOOL_CAPABILITY_BLOCK,
|
TOOL_CAPABILITY_BLOCK,
|
||||||
|
TOOL_GITLEAKS_ALLOW,
|
||||||
read_audit_entries,
|
read_audit_entries,
|
||||||
read_response,
|
read_response,
|
||||||
sha256_hex,
|
sha256_hex,
|
||||||
@@ -33,6 +35,7 @@ FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
|
|||||||
def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
|
def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
|
||||||
payloads = {
|
payloads = {
|
||||||
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
|
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
|
||||||
|
TOOL_GITLEAKS_ALLOW: "file: tests/test_fixture.py\nline: 3\n",
|
||||||
}
|
}
|
||||||
payload = payloads.get(tool, "")
|
payload = payloads.get(tool, "")
|
||||||
return Proposal.new(
|
return Proposal.new(
|
||||||
@@ -154,6 +157,28 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
|
|||||||
supervise_cli.approve(qp)
|
supervise_cli.approve(qp)
|
||||||
self.assertEqual([], read_audit_entries("egress", "dev"))
|
self.assertEqual([], read_audit_entries("egress", "dev"))
|
||||||
|
|
||||||
|
def test_approve_archives_gitleaks_allow(self):
|
||||||
|
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
|
||||||
|
supervise_cli.approve(qp, notes="dummy fixture")
|
||||||
|
resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
|
||||||
|
self.assertEqual(STATUS_APPROVED, resp.status)
|
||||||
|
self.assertEqual("dummy fixture", resp.notes)
|
||||||
|
|
||||||
|
def test_tui_gitleaks_allow_requires_reason(self):
|
||||||
|
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
|
||||||
|
with patch.object(supervise_cli, "_prompt", return_value=""):
|
||||||
|
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
|
||||||
|
self.assertEqual("approve aborted (empty reason)", status)
|
||||||
|
self.assertFalse((qp.queue_dir / "processed").exists())
|
||||||
|
|
||||||
|
def test_tui_gitleaks_allow_writes_reason(self):
|
||||||
|
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
|
||||||
|
with patch.object(supervise_cli, "_prompt", return_value="test fixture"):
|
||||||
|
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
|
||||||
|
self.assertIn("approved gitleaks-allow", status)
|
||||||
|
resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
|
||||||
|
self.assertEqual("test fixture", resp.notes)
|
||||||
|
|
||||||
|
|
||||||
# class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
# class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||||
# # DISABLED — capability_apply functionality is currently commented out.
|
# # DISABLED — capability_apply functionality is currently commented out.
|
||||||
|
|||||||
Reference in New Issue
Block a user