feat(egress-proxy): block HTTPS git push + restore role provisioner
Two related fixes on top of PR #29's chunk-2 cutover: 1. Universal HTTPS git-push block in the egress-proxy addon (`is_git_push_request` in egress_proxy_addon_core, called from the mitmproxy request hook before route matching). 403s any `/git-receive-pack` or `info/refs?service=git-receive-pack` — defense in depth so git-gate (PRD 0008) remains the only outbound path for writes, gitleaks-scanned by its pre-receive. Replicates cred-proxy's `is_git_push_request` behavior. 2. Restored agent-side role provisioner. Brings back `Role` on EgressProxyRoute (manifest + runtime) with three roles — `anthropic-base-url`, `npm-registry`, `tea-login`. Singleton constraint on the first two carries over from cred-proxy. `git-insteadof` is intentionally absent (option 1 above handles the push-bypass concern, and the canonical-URL rewrite has no function when egress-proxy is on HTTPS_PROXY). The provisioner (`backend/docker/provision/egress_proxy.py`): - `~/.npmrc` registry= the canonical upstream URL. - `~/.config/tea/config.yml` logins[] entry per tea-login route. - `ANTHROPIC_BASE_URL` env set in prepare.py based on the anthropic-base-url role (was a token_ref="CLAUDE_CODE_OAUTH_TOKEN" check in this PR's earlier draft — the role marker is cleaner and matches the cred-proxy precedent the user wants kept). All three dotfile values point at canonical upstream URLs; the agent's HTTPS_PROXY=egress-proxy routes them through the proxy automatically. Tests: 11 new role-validation tests, 11 new provisioner-render tests, the chunk-1 manifest fixture exercise role=anthropic-base-url. 400 tests pass (was 376). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -10,6 +10,7 @@ from claude_bottle.egress_proxy_addon_core import (
|
||||
Decision,
|
||||
Route,
|
||||
decide,
|
||||
is_git_push_request,
|
||||
load_routes,
|
||||
match_route,
|
||||
parse_routes,
|
||||
@@ -245,5 +246,52 @@ class TestDecisionDefaults(unittest.TestCase):
|
||||
self.assertIsNone(d.inject_authorization)
|
||||
|
||||
|
||||
# --- is_git_push_request ------------------------------------------------
|
||||
|
||||
|
||||
class TestIsGitPushRequest(unittest.TestCase):
|
||||
def test_post_git_receive_pack_endpoint(self):
|
||||
# The POST that carries the actual push payload.
|
||||
self.assertTrue(is_git_push_request("/owner/repo.git/git-receive-pack", ""))
|
||||
|
||||
def test_info_refs_with_receive_pack_service(self):
|
||||
# The capability advertisement GET that precedes a push.
|
||||
self.assertTrue(is_git_push_request(
|
||||
"/owner/repo.git/info/refs",
|
||||
"service=git-receive-pack",
|
||||
))
|
||||
|
||||
def test_info_refs_with_extra_query_params(self):
|
||||
# service= may appear with other params in any order.
|
||||
self.assertTrue(is_git_push_request(
|
||||
"/owner/repo.git/info/refs",
|
||||
"foo=bar&service=git-receive-pack&z=1",
|
||||
))
|
||||
self.assertTrue(is_git_push_request(
|
||||
"/owner/repo.git/info/refs",
|
||||
"service=git-receive-pack&foo=bar",
|
||||
))
|
||||
|
||||
def test_fetch_endpoints_not_blocked(self):
|
||||
# `service=git-upload-pack` is fetch; never blocked.
|
||||
self.assertFalse(is_git_push_request(
|
||||
"/owner/repo.git/info/refs",
|
||||
"service=git-upload-pack",
|
||||
))
|
||||
self.assertFalse(is_git_push_request(
|
||||
"/owner/repo.git/git-upload-pack", "",
|
||||
))
|
||||
|
||||
def test_info_refs_without_service_not_blocked(self):
|
||||
# Bare info/refs (no query) defaults to git-upload-pack on
|
||||
# the server side; not push.
|
||||
self.assertFalse(is_git_push_request("/x/info/refs", ""))
|
||||
|
||||
def test_unrelated_paths_not_blocked(self):
|
||||
self.assertFalse(is_git_push_request("/repos/owner/repo", ""))
|
||||
self.assertFalse(is_git_push_request("/v1/messages", ""))
|
||||
self.assertFalse(is_git_push_request("/", ""))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -128,6 +128,64 @@ class TestAuth(unittest.TestCase):
|
||||
}])
|
||||
|
||||
|
||||
class TestRole(unittest.TestCase):
|
||||
def test_omitted_means_no_roles(self):
|
||||
b = _bottle([{"host": "x.example"}])
|
||||
self.assertEqual((), b.egress_proxy.routes[0].Role)
|
||||
|
||||
def test_string_normalizes_to_tuple(self):
|
||||
b = _bottle([{"host": "api.anthropic.com",
|
||||
"role": "anthropic-base-url",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "T"}}])
|
||||
self.assertEqual(("anthropic-base-url",),
|
||||
b.egress_proxy.routes[0].Role)
|
||||
|
||||
def test_list_supported(self):
|
||||
b = _bottle([{"host": "registry.npmjs.org",
|
||||
"role": ["npm-registry"]}])
|
||||
self.assertEqual(("npm-registry",), b.egress_proxy.routes[0].Role)
|
||||
|
||||
def test_unknown_role_rejected(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{"host": "x.example", "role": "git-insteadof"}])
|
||||
|
||||
def test_non_string_role_rejected(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{"host": "x.example", "role": 42}])
|
||||
|
||||
def test_list_with_non_string_item_rejected(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{"host": "x.example", "role": ["npm-registry", 42]}])
|
||||
|
||||
def test_singleton_anthropic_base_url_enforced(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([
|
||||
{"host": "api.anthropic.com", "role": "anthropic-base-url",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "T1"}},
|
||||
{"host": "api2.anthropic.example",
|
||||
"role": "anthropic-base-url",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "T2"}},
|
||||
])
|
||||
|
||||
def test_singleton_npm_registry_enforced(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([
|
||||
{"host": "registry.npmjs.org", "role": "npm-registry"},
|
||||
{"host": "npm.example", "role": "npm-registry"},
|
||||
])
|
||||
|
||||
def test_tea_login_is_not_singleton(self):
|
||||
# Multiple Gitea instances on one bottle is a legitimate
|
||||
# dev setup; tea-login lays one logins[] entry per route.
|
||||
b = _bottle([
|
||||
{"host": "gitea.example", "role": "tea-login",
|
||||
"auth": {"scheme": "token", "token_ref": "T1"}},
|
||||
{"host": "other-gitea.example", "role": "tea-login",
|
||||
"auth": {"scheme": "token", "token_ref": "T2"}},
|
||||
])
|
||||
self.assertEqual(2, len(b.egress_proxy.routes))
|
||||
|
||||
|
||||
class TestRouteValidation(unittest.TestCase):
|
||||
def test_duplicate_hosts_rejected(self):
|
||||
# Routes match by exact host; duplicates leave the choice
|
||||
|
||||
@@ -25,6 +25,7 @@ _BOTTLE_DEV = """
|
||||
egress_proxy:
|
||||
routes:
|
||||
- host: api.anthropic.com
|
||||
role: anthropic-base-url
|
||||
auth:
|
||||
scheme: Bearer
|
||||
token_ref: CLAUDE_CODE_OAUTH_TOKEN
|
||||
@@ -92,6 +93,7 @@ class TestBottleFileParses(_ResolveCase):
|
||||
self.assertEqual("api.anthropic.com", routes[0].Host)
|
||||
self.assertEqual("Bearer", routes[0].AuthScheme)
|
||||
self.assertEqual("CLAUDE_CODE_OAUTH_TOKEN", routes[0].TokenRef)
|
||||
self.assertEqual(("anthropic-base-url",), routes[0].Role)
|
||||
self.assertEqual(["example.com"], list(m.bottles["dev"].egress.allowlist))
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,109 @@
|
||||
"""Unit: agent-side provisioning for egress-proxy roles (PRD 0017).
|
||||
|
||||
Each role drives one dotfile / env-var rewrite at bottle bring-up.
|
||||
HTTPS_PROXY routes the canonical URL through egress-proxy, which
|
||||
injects auth and DLP-scans on the upstream leg."""
|
||||
|
||||
import unittest
|
||||
|
||||
from claude_bottle.backend.docker.provision.egress_proxy import (
|
||||
render_npmrc,
|
||||
render_tea_config,
|
||||
)
|
||||
from claude_bottle.egress_proxy import egress_proxy_routes_for_bottle
|
||||
from claude_bottle.manifest import Manifest
|
||||
|
||||
|
||||
def _routes(manifest_routes):
|
||||
m = Manifest.from_json_obj({
|
||||
"bottles": {"dev": {"egress_proxy": {"routes": manifest_routes}}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
})
|
||||
return egress_proxy_routes_for_bottle(m.bottles["dev"])
|
||||
|
||||
|
||||
# --- npmrc -----------------------------------------------------------
|
||||
|
||||
|
||||
class TestRenderNpmrc(unittest.TestCase):
|
||||
def test_canonical_upstream_url(self):
|
||||
routes = _routes([
|
||||
{"host": "registry.npmjs.org", "role": "npm-registry",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "NPM_TOKEN"}},
|
||||
])
|
||||
self.assertEqual(
|
||||
"registry=https://registry.npmjs.org/\n",
|
||||
render_npmrc(routes),
|
||||
)
|
||||
|
||||
def test_empty_when_no_npm_role(self):
|
||||
routes = _routes([
|
||||
{"host": "api.github.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "GH"}},
|
||||
])
|
||||
self.assertEqual("", render_npmrc(routes))
|
||||
|
||||
def test_no_routes_empty(self):
|
||||
self.assertEqual("", render_npmrc(()))
|
||||
|
||||
def test_no_auth_token_in_npmrc(self):
|
||||
# The proxy injects auth; the npmrc must carry no secret —
|
||||
# not even `:always-auth=true` lines that would prompt npm
|
||||
# to wait for credentials. Just the registry URL.
|
||||
routes = _routes([
|
||||
{"host": "registry.npmjs.org", "role": "npm-registry",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "NPM_TOKEN"}},
|
||||
])
|
||||
out = render_npmrc(routes)
|
||||
self.assertNotIn("_authToken", out)
|
||||
self.assertNotIn("NPM_TOKEN", out)
|
||||
|
||||
|
||||
# --- tea config ------------------------------------------------------
|
||||
|
||||
|
||||
class TestRenderTeaConfig(unittest.TestCase):
|
||||
def test_single_login(self):
|
||||
routes = _routes([
|
||||
{"host": "gitea.dideric.is", "role": "tea-login",
|
||||
"auth": {"scheme": "token", "token_ref": "GITEA_TOKEN"}},
|
||||
])
|
||||
out = render_tea_config(routes)
|
||||
self.assertIn("- name: gitea.dideric.is", out)
|
||||
self.assertIn("url: https://gitea.dideric.is", out)
|
||||
self.assertIn("token: egress-proxy-placeholder", out)
|
||||
|
||||
def test_multiple_logins_each_get_own_entry(self):
|
||||
routes = _routes([
|
||||
{"host": "gitea.a.example", "role": "tea-login",
|
||||
"auth": {"scheme": "token", "token_ref": "T_A"}},
|
||||
{"host": "gitea.b.example", "role": "tea-login",
|
||||
"auth": {"scheme": "token", "token_ref": "T_B"}},
|
||||
])
|
||||
out = render_tea_config(routes)
|
||||
self.assertIn("- name: gitea.a.example", out)
|
||||
self.assertIn("- name: gitea.b.example", out)
|
||||
|
||||
def test_empty_when_no_tea_role(self):
|
||||
routes = _routes([
|
||||
{"host": "api.github.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "GH"}},
|
||||
])
|
||||
self.assertEqual("", render_tea_config(routes))
|
||||
|
||||
def test_no_routes_empty(self):
|
||||
self.assertEqual("", render_tea_config(()))
|
||||
|
||||
def test_no_real_token_in_config(self):
|
||||
routes = _routes([
|
||||
{"host": "gitea.dideric.is", "role": "tea-login",
|
||||
"auth": {"scheme": "token", "token_ref": "GITEA_TOKEN"}},
|
||||
])
|
||||
out = render_tea_config(routes)
|
||||
# GITEA_TOKEN is just the env var name, not the value —
|
||||
# placeholder-only is the SC.
|
||||
self.assertIn("egress-proxy-placeholder", out)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user