egress: require opt-in for HTTPS git fetch
test / unit (pull_request) Successful in 42s
test / integration (pull_request) Successful in 27s
lint / lint (push) Successful in 1m53s
test / unit (push) Successful in 41s
test / integration (push) Successful in 23s
Update Quality Badges / update-badges (push) Successful in 1m35s

This commit was merged in pull request #227.
This commit is contained in:
2026-06-10 07:00:01 +00:00
parent acb9cd67c6
commit 3f04567290
8 changed files with 240 additions and 7 deletions
+14
View File
@@ -86,6 +86,11 @@ class TestManifestRouteLift(unittest.TestCase):
self.assertEqual(("token_patterns",), r.outbound_detectors)
self.assertEqual((), r.inbound_detectors)
def test_git_fetch_policy_lifted(self):
b = _bottle([{"host": "github.com", "git": {"fetch": True}}])
routes = egress_manifest_routes(b)
self.assertTrue(routes[0].git_fetch)
class TestSlotAssignment(unittest.TestCase):
"""Slot assignment happens in egress_routes_for_bottle."""
@@ -324,6 +329,15 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual(("token_patterns",), addon_routes[0].outbound_detectors)
self.assertEqual((), addon_routes[0].inbound_detectors)
def test_git_fetch_policy_round_trips(self):
from bot_bottle.egress_addon_core import load_routes
b = _bottle([{"host": "github.com", "git": {"fetch": True}}])
routes = egress_routes_for_bottle(b)
rendered = egress_render_routes(routes)
self.assertEqual({"fetch": True}, self._parsed(routes)[0]["git"])
addon_routes = load_routes(rendered)
self.assertTrue(addon_routes[0].git_fetch)
def test_log_zero_omitted_from_render(self):
b = _bottle([{"host": "x.example"}])
routes = egress_routes_for_bottle(b)
+78 -2
View File
@@ -25,7 +25,9 @@ from bot_bottle.egress_addon_core import (
build_inbound_scan_text,
build_outbound_scan_text,
decide,
decide_git_fetch,
evaluate_matches,
is_git_fetch_request,
is_git_push_request,
load_config,
load_routes,
@@ -67,6 +69,31 @@ class TestParseRoutes(unittest.TestCase):
self.assertEqual("Bearer", r.auth_scheme)
self.assertEqual("EGRESS_TOKEN_0", r.token_env)
def test_git_fetch_defaults_false(self):
routes = parse_routes({"routes": [{"host": "github.com"}]})
self.assertFalse(routes[0].git_fetch)
def test_git_fetch_true(self):
routes = parse_routes({"routes": [{
"host": "github.com",
"git": {"fetch": True},
}]})
self.assertTrue(routes[0].git_fetch)
def test_git_fetch_must_be_boolean(self):
with self.assertRaises(ValueError):
parse_routes({"routes": [{
"host": "github.com",
"git": {"fetch": "yes"},
}]})
def test_unknown_git_key_rejected(self):
with self.assertRaises(ValueError):
parse_routes({"routes": [{
"host": "github.com",
"git": {"push": True},
}]})
def test_order_preserved(self):
routes = parse_routes({"routes": [
{"host": "a.example"},
@@ -604,6 +631,24 @@ class TestDecisionDefaults(unittest.TestCase):
self.assertIsNone(d.inject_authorization)
class TestDecideGitFetch(unittest.TestCase):
def test_blocks_when_host_not_allowlisted(self):
d = decide_git_fetch((), "github.com")
self.assertEqual("block", d.action)
self.assertIn("git fetch/clone over HTTPS", d.reason)
def test_blocks_when_route_does_not_opt_in(self):
d = decide_git_fetch((Route(host="github.com"),), "github.com")
self.assertEqual("block", d.action)
def test_forwards_when_route_opts_in(self):
d = decide_git_fetch(
(Route(host="github.com", git_fetch=True),),
"github.com",
)
self.assertEqual("forward", d.action)
# --- scan_outbound -------------------------------------------------------
@@ -620,7 +665,7 @@ class TestScanOutboundBody(unittest.TestCase):
self.assertIn("OpenAI API key", result.reason)
# --- is_git_push_request ------------------------------------------------
# --- HTTPS Git request detection ----------------------------------------
class TestIsGitPushRequest(unittest.TestCase):
@@ -643,7 +688,7 @@ class TestIsGitPushRequest(unittest.TestCase):
"service=git-receive-pack&foo=bar",
))
def test_fetch_endpoints_not_blocked(self):
def test_fetch_endpoints_are_not_push(self):
self.assertFalse(is_git_push_request(
"/owner/repo.git/info/refs",
"service=git-upload-pack",
@@ -661,6 +706,37 @@ class TestIsGitPushRequest(unittest.TestCase):
self.assertFalse(is_git_push_request("/", ""))
class TestIsGitFetchRequest(unittest.TestCase):
def test_post_git_upload_pack_endpoint(self):
self.assertTrue(is_git_fetch_request("/owner/repo.git/git-upload-pack", ""))
def test_info_refs_with_upload_pack_service(self):
self.assertTrue(is_git_fetch_request(
"/owner/repo.git/info/refs",
"service=git-upload-pack",
))
def test_info_refs_with_extra_query_params(self):
self.assertTrue(is_git_fetch_request(
"/owner/repo.git/info/refs",
"foo=bar&service=git-upload-pack&z=1",
))
def test_push_endpoints_are_not_fetch(self):
self.assertFalse(is_git_fetch_request(
"/owner/repo.git/info/refs",
"service=git-receive-pack",
))
self.assertFalse(is_git_fetch_request(
"/owner/repo.git/git-receive-pack", "",
))
def test_unrelated_paths_not_fetch(self):
self.assertFalse(is_git_fetch_request("/repos/owner/repo", ""))
self.assertFalse(is_git_fetch_request("/v1/messages", ""))
self.assertFalse(is_git_fetch_request("/", ""))
class TestGitPushBlockFailFast(unittest.TestCase):
def test_real_git_push_fails_fast_when_egress_blocks_receive_pack(self):
seen_paths: list[str] = []
+28 -1
View File
@@ -2,7 +2,8 @@
The route shape uses Gateway API HTTPRoute match vocabulary:
`host` (required), optional `matches` (paths/methods/headers),
optional nested `auth: { scheme, token_ref }`, optional `dlp`.
optional nested `auth: { scheme, token_ref }`, optional `dlp`,
optional `git: { fetch: true }`.
Validation rules per PRD 0017/0053: empty `auth: {}` is an error,
partial `auth` is an error, auth omission means unauthenticated."""
@@ -302,6 +303,32 @@ class TestDlp(unittest.TestCase):
}}])
class TestGitPolicy(unittest.TestCase):
def test_omitted_means_https_git_fetch_disabled(self):
b = _bottle([{"host": "github.com"}])
self.assertFalse(b.egress.routes[0].GitFetch)
def test_fetch_true_allowed(self):
b = _bottle([{"host": "github.com", "git": {"fetch": True}}])
self.assertTrue(b.egress.routes[0].GitFetch)
def test_fetch_false_allowed(self):
b = _bottle([{"host": "github.com", "git": {"fetch": False}}])
self.assertFalse(b.egress.routes[0].GitFetch)
def test_git_must_be_object(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "github.com", "git": True}])
def test_fetch_must_be_boolean(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "github.com", "git": {"fetch": "yes"}}])
def test_unknown_git_key_rejected(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "github.com", "git": {"push": True}}])
class TestAuth(unittest.TestCase):
def test_omitted_means_no_auth(self):
b = _bottle([{"host": "github.com"}])