fix(cred_proxy): close git-push bypass + route through pipelock (PRD 0010)
Three coupled fixes that close a documented bypass of git-gate's gitleaks pre-receive hook: 1. cred-proxy refuses git smart-HTTP push at runtime. Any path ending in /git-receive-pack or /info/refs?service=git-receive-pack returns 403 with a pointer at the bottle.git SSH path. Fetch (upload-pack) is still allowed — the bypass we're closing is push, where gitleaks is the load-bearing scanner. Hard guarantee. 2. The provisioner suppresses the cred-proxy `~/.gitconfig` insteadOf rewrite for any host already declared in bottle.git. git-gate is the canonical git path there; we don't write a competing rule that would let `git clone https://<host>/...` succeed in ways that confuse on push. Defense in depth — (1) is the hard guarantee. 3. cred-proxy routes its outbound HTTPS through pipelock. The sidecar's environ now sets HTTPS_PROXY=<pipelock-url>, and the image's entrypoint runs `update-ca-certificates` over the per-bottle pipelock CA (docker cp'd into /usr/local/share/ca-certificates/pipelock.crt before start) so the proxy's HTTPS client trusts pipelock's bumped certs. Consequence: pipelock's allowlist + body scanner now sit in the cred-proxy egress path the same way they sit in front of direct agent traffic. The cred-proxy upstream hosts (api.github.com, github.com, gitea hosts, registry.npmjs.org) come OFF pipelock's passthrough_domains. Only api.anthropic.com remains on passthrough (LLM body content legitimately trips DLP). PRD 0010 updated to reflect all three. Tests adjusted: the "cred-proxy hosts go on passthrough" assertion in test_pipelock_allowlist flips to "they don't", a new TestIsGitPushRequest exercises the smart-HTTP refusal predicate, and the gitconfig renderer tests cover the per-host suppression matrix.
This commit is contained in:
@@ -7,6 +7,7 @@ from claude_bottle.cred_proxy_server import (
|
||||
Route,
|
||||
build_forward_headers,
|
||||
filter_response_headers,
|
||||
is_git_push_request,
|
||||
load_tokens,
|
||||
parse_routes,
|
||||
select_route,
|
||||
@@ -183,6 +184,49 @@ class TestFilterResponseHeaders(unittest.TestCase):
|
||||
self.assertNotIn("transfer-encoding", names)
|
||||
|
||||
|
||||
class TestIsGitPushRequest(unittest.TestCase):
|
||||
"""git push over HTTPS goes through /info/refs?service=git-receive-pack
|
||||
(capabilities probe) then POST /git-receive-pack (the push body).
|
||||
Fetches use /git-upload-pack and are not blocked — the bypass we're
|
||||
closing is push, since git-gate's gitleaks pre-receive is the scanner
|
||||
for outbound git data."""
|
||||
|
||||
def test_push_capabilities_probe_blocked(self):
|
||||
self.assertTrue(is_git_push_request(
|
||||
"/gh-git/owner/repo.git/info/refs",
|
||||
"service=git-receive-pack",
|
||||
))
|
||||
|
||||
def test_push_body_blocked(self):
|
||||
self.assertTrue(is_git_push_request(
|
||||
"/gh-git/owner/repo.git/git-receive-pack", "",
|
||||
))
|
||||
|
||||
def test_fetch_capabilities_allowed(self):
|
||||
self.assertFalse(is_git_push_request(
|
||||
"/gh-git/owner/repo.git/info/refs",
|
||||
"service=git-upload-pack",
|
||||
))
|
||||
|
||||
def test_fetch_body_allowed(self):
|
||||
self.assertFalse(is_git_push_request(
|
||||
"/gh-git/owner/repo.git/git-upload-pack", "",
|
||||
))
|
||||
|
||||
def test_rest_api_allowed(self):
|
||||
# tea/gh-style REST calls hit /api/v1/... — unrelated.
|
||||
self.assertFalse(is_git_push_request(
|
||||
"/gitea/gitea.dideric.is/api/v1/repos/x/y", "",
|
||||
))
|
||||
|
||||
def test_push_with_extra_query_params(self):
|
||||
# `service` may appear with other params in any order.
|
||||
self.assertTrue(is_git_push_request(
|
||||
"/gh-git/owner/repo.git/info/refs",
|
||||
"trace=1&service=git-receive-pack",
|
||||
))
|
||||
|
||||
|
||||
class TestLoadTokens(unittest.TestCase):
|
||||
def test_reads_per_route_env(self):
|
||||
routes = (
|
||||
|
||||
@@ -4,6 +4,7 @@ The full docker lifecycle is exercised by integration tests; here we
|
||||
cover the pure helpers and the validation checks `.start` runs
|
||||
before touching docker."""
|
||||
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
@@ -26,6 +27,8 @@ def _empty_plan(**overrides):
|
||||
"token_env_map": {},
|
||||
"internal_network": "",
|
||||
"egress_network": "",
|
||||
"pipelock_ca_host_path": Path(),
|
||||
"pipelock_proxy_url": "",
|
||||
}
|
||||
base.update(overrides)
|
||||
return CredProxyPlan(**base)
|
||||
@@ -77,6 +80,26 @@ class TestStartGuards(unittest.TestCase):
|
||||
routes_path=Path("/tmp/cred-proxy-test-does-not-exist.json"),
|
||||
))
|
||||
|
||||
def test_pipelock_url_without_ca_dies(self):
|
||||
# URL set + CA path empty/missing is a wiring bug: either both
|
||||
# populated (production) or both empty (test escape hatch).
|
||||
upstream = CredProxyUpstream(
|
||||
kind="anthropic", path="/anthropic/",
|
||||
upstream="https://api.anthropic.com",
|
||||
auth_scheme="Bearer", token_env="CRED_PROXY_TOKEN_0",
|
||||
token_ref="T",
|
||||
)
|
||||
with tempfile.NamedTemporaryFile() as routes:
|
||||
with self.assertRaises(Die):
|
||||
self.proxy.start(_empty_plan(
|
||||
upstreams=(upstream,),
|
||||
internal_network="net-x",
|
||||
egress_network="egress-x",
|
||||
routes_path=Path(routes.name),
|
||||
pipelock_proxy_url="http://pipelock:8888",
|
||||
pipelock_ca_host_path=Path("/tmp/cred-proxy-no-ca.pem"),
|
||||
))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -92,9 +92,13 @@ class TestTlsPassthrough(unittest.TestCase):
|
||||
passthrough = pipelock_effective_tls_passthrough(_bottle({}))
|
||||
self.assertEqual(["api.anthropic.com"], passthrough)
|
||||
|
||||
def test_token_hosts_added_to_passthrough(self):
|
||||
# cred-proxy validates upstream certs with the real CA bundle;
|
||||
# pipelock must not MITM these or the handshake fails.
|
||||
def test_token_hosts_NOT_added_to_passthrough(self):
|
||||
# cred-proxy now trusts pipelock's per-bottle CA (loaded into
|
||||
# its container's trust store via docker cp + update-ca-
|
||||
# certificates at start time), so pipelock can MITM the
|
||||
# cred-proxy -> upstream leg and body-scan it. Auto-adding
|
||||
# cred-proxy hosts to passthrough would silently disable that
|
||||
# second scanner for github / gitea / npm.
|
||||
passthrough = pipelock_effective_tls_passthrough(_bottle({
|
||||
"tokens": [
|
||||
{"Kind": "github", "TokenRef": "G"},
|
||||
@@ -103,10 +107,7 @@ class TestTlsPassthrough(unittest.TestCase):
|
||||
"Url": "https://gitea.dideric.is"},
|
||||
],
|
||||
}))
|
||||
for host in ("api.anthropic.com", "api.github.com", "github.com",
|
||||
"registry.npmjs.org", "gitea.dideric.is"):
|
||||
self.assertIn(host, passthrough)
|
||||
self.assertEqual(passthrough, sorted(passthrough), "sorted")
|
||||
self.assertEqual(["api.anthropic.com"], passthrough)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -83,6 +83,40 @@ class TestRenderGitconfig(unittest.TestCase):
|
||||
self.assertIn("gitea.dideric.is/", out)
|
||||
self.assertIn("gitea.example.com/", out)
|
||||
|
||||
def test_github_suppressed_when_git_gate_covers_host(self):
|
||||
# When bottle.git brokers github.com over SSH, git-gate is the
|
||||
# canonical git path. The cred-proxy https://github.com/
|
||||
# rewrite would let the agent push over HTTPS — bypassing
|
||||
# gitleaks. Suppress it.
|
||||
out = render_cred_proxy_gitconfig(
|
||||
_upstreams([{"Kind": "github", "TokenRef": "GH"}]),
|
||||
{"github.com"},
|
||||
)
|
||||
self.assertEqual("", out)
|
||||
|
||||
def test_gitea_suppressed_when_git_gate_covers_host(self):
|
||||
out = render_cred_proxy_gitconfig(
|
||||
_upstreams([{"Kind": "gitea", "TokenRef": "T",
|
||||
"Url": "https://gitea.dideric.is"}]),
|
||||
{"gitea.dideric.is"},
|
||||
)
|
||||
self.assertEqual("", out)
|
||||
|
||||
def test_partial_suppression_keeps_other_giteas(self):
|
||||
# Two gitea instances; git-gate brokers one. The other still
|
||||
# gets the cred-proxy rewrite.
|
||||
out = render_cred_proxy_gitconfig(
|
||||
_upstreams([
|
||||
{"Kind": "gitea", "TokenRef": "T1",
|
||||
"Url": "https://gitea.dideric.is"},
|
||||
{"Kind": "gitea", "TokenRef": "T2",
|
||||
"Url": "https://gitea.example.com"},
|
||||
]),
|
||||
{"gitea.dideric.is"},
|
||||
)
|
||||
self.assertNotIn("gitea.dideric.is/", out)
|
||||
self.assertIn("gitea.example.com/", out)
|
||||
|
||||
|
||||
class TestRenderTeaConfig(unittest.TestCase):
|
||||
def test_empty_when_no_gitea(self):
|
||||
|
||||
Reference in New Issue
Block a user