test(egress): cover blocked git push fail-fast
This commit is contained in:
@@ -4,7 +4,14 @@ These tests target `egress_addon_core` — the host-importable
|
||||
half of the addon. The mitmproxy hook wrapper in
|
||||
`egress_addon.py` is container-only and is not exercised here."""
|
||||
|
||||
import http.server
|
||||
import subprocess
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlsplit
|
||||
|
||||
from bot_bottle.egress_addon_core import (
|
||||
Decision,
|
||||
@@ -326,5 +333,88 @@ class TestIsGitPushRequest(unittest.TestCase):
|
||||
self.assertFalse(is_git_push_request("/", ""))
|
||||
|
||||
|
||||
class TestGitPushBlockFailFast(unittest.TestCase):
|
||||
def test_real_git_push_fails_fast_when_egress_blocks_receive_pack(self):
|
||||
"""A real git client should see egress's HTTPS-push 403 and exit.
|
||||
|
||||
The local server stands in for the egress proxy response after
|
||||
CONNECT/TLS interception; git smart-HTTP uses the same paths over
|
||||
plain HTTP here, which keeps this regression test hermetic.
|
||||
"""
|
||||
|
||||
seen_paths: list[str] = []
|
||||
|
||||
class Handler(http.server.BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
self._handle()
|
||||
|
||||
def do_POST(self):
|
||||
self._handle()
|
||||
|
||||
def _handle(self):
|
||||
parsed = urlsplit(self.path)
|
||||
seen_paths.append(self.path)
|
||||
if is_git_push_request(parsed.path, parsed.query):
|
||||
body = (
|
||||
b"egress: git push over HTTPS is not supported; "
|
||||
b"use the bottle.git SSH path (gitleaks-scanned by "
|
||||
b"git-gate's pre-receive hook)."
|
||||
)
|
||||
self.send_response(403)
|
||||
self.send_header("Content-Type", "text/plain; charset=utf-8")
|
||||
self.send_header("Content-Length", str(len(body)))
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
return
|
||||
self.send_response(404)
|
||||
self.send_header("Content-Length", "0")
|
||||
self.end_headers()
|
||||
|
||||
def log_message(self, _fmt, *_args):
|
||||
pass
|
||||
|
||||
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), Handler)
|
||||
thread = threading.Thread(target=server.serve_forever, daemon=True)
|
||||
thread.start()
|
||||
self.addCleanup(server.shutdown)
|
||||
self.addCleanup(server.server_close)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
repo = Path(tmp) / "repo"
|
||||
repo.mkdir()
|
||||
subprocess.run(["git", "init"], cwd=repo, check=True,
|
||||
capture_output=True, text=True)
|
||||
subprocess.run(["git", "config", "user.name", "test"],
|
||||
cwd=repo, check=True)
|
||||
subprocess.run(["git", "config", "user.email", "test@example.invalid"],
|
||||
cwd=repo, check=True)
|
||||
(repo / "README.md").write_text("test\n")
|
||||
subprocess.run(["git", "add", "README.md"], cwd=repo, check=True)
|
||||
subprocess.run(["git", "commit", "-m", "test"],
|
||||
cwd=repo, check=True, capture_output=True, text=True)
|
||||
remote = f"http://127.0.0.1:{server.server_port}/owner/repo.git"
|
||||
subprocess.run(["git", "remote", "add", "origin", remote],
|
||||
cwd=repo, check=True)
|
||||
|
||||
started = time.monotonic()
|
||||
result = subprocess.run(
|
||||
["git", "push", "origin", "HEAD:refs/heads/main"],
|
||||
cwd=repo,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5,
|
||||
check=False,
|
||||
)
|
||||
elapsed = time.monotonic() - started
|
||||
|
||||
self.assertNotEqual(0, result.returncode)
|
||||
self.assertLess(elapsed, 5)
|
||||
self.assertTrue(
|
||||
any("service=git-receive-pack" in p for p in seen_paths),
|
||||
f"git did not request receive-pack capabilities; saw {seen_paths!r}",
|
||||
)
|
||||
self.assertIn("403", result.stderr)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user