diff --git a/bot_bottle/egress.py b/bot_bottle/egress.py index 13718d6..8d29a00 100644 --- a/bot_bottle/egress.py +++ b/bot_bottle/egress.py @@ -210,6 +210,17 @@ def egress_token_env_map( return out +def _yaml_str_escape(s: str) -> str: + """Escape a string for use inside a YAML double-quoted scalar.""" + return ( + s.replace("\\", "\\\\") + .replace('"', '\\"') + .replace("\n", "\\n") + .replace("\r", "\\r") + .replace("\t", "\\t") + ) + + def _route_to_yaml_fields(r: Route) -> dict[str, object]: fields: dict[str, object] = {"host": r.host} if r.auth_scheme and r.token_env: @@ -272,12 +283,12 @@ def _render_match_entry(entry: dict[str, object]) -> list[str]: for pd in entry["paths"]: # type: ignore[union-attr] pd_dict: dict[str, str] = pd # type: ignore[assignment] if "type" in pd_dict: - lines.append(f' - type: "{pd_dict["type"]}"') - lines.append(f' value: "{pd_dict["value"]}"') + lines.append(f' - type: "{_yaml_str_escape(pd_dict["type"])}"') + lines.append(f' value: "{_yaml_str_escape(pd_dict["value"])}"') else: - lines.append(f' - value: "{pd_dict["value"]}"') + lines.append(f' - value: "{_yaml_str_escape(pd_dict["value"])}"') if "methods" in entry: - methods_str = ", ".join(f'"{m}"' for m in entry["methods"]) # type: ignore[union-attr] + methods_str = ", ".join(f'"{_yaml_str_escape(m)}"' for m in entry["methods"]) # type: ignore[union-attr] prefix = " - " if first_key else " " lines.append(f'{prefix}methods: [{methods_str}]') first_key = False @@ -287,8 +298,8 @@ def _render_match_entry(entry: dict[str, object]) -> list[str]: first_key = False for hd in entry["headers"]: # type: ignore[union-attr] hd_dict: dict[str, str] = hd # type: ignore[assignment] - lines.append(f' - name: "{hd_dict["name"]}"') - lines.append(f' value: "{hd_dict["value"]}"') + lines.append(f' - name: "{_yaml_str_escape(hd_dict["name"])}"') + lines.append(f' value: "{_yaml_str_escape(hd_dict["value"])}"') if first_key: lines.append(" - {}") return lines @@ -308,10 +319,10 @@ def egress_render_routes( return "\n".join(lines) + "\n" for r in routes: f = _route_to_yaml_fields(r) - lines.append(f' - host: "{f["host"]}"') + lines.append(f' - host: "{_yaml_str_escape(str(f["host"]))}"') if "auth_scheme" in f: - lines.append(f' auth_scheme: "{f["auth_scheme"]}"') - lines.append(f' token_env: "{f["token_env"]}"') + lines.append(f' auth_scheme: "{_yaml_str_escape(str(f["auth_scheme"]))}"') + lines.append(f' token_env: "{_yaml_str_escape(str(f["token_env"]))}"') if "matches" in f: lines.append(" matches:") for entry in f["matches"]: # type: ignore[union-attr] @@ -331,7 +342,7 @@ def egress_render_routes( items_str = ", ".join(f'"{x}"' for x in dv) lines.append(f" {dk}: [{items_str}]") elif isinstance(dv, str): - lines.append(f' {dk}: "{dv}"') + lines.append(f' {dk}: "{_yaml_str_escape(dv)}"') return "\n".join(lines) + "\n" diff --git a/bot_bottle/git_gate.py b/bot_bottle/git_gate.py index acbf4d4..532c3ff 100644 --- a/bot_bottle/git_gate.py +++ b/bot_bottle/git_gate.py @@ -112,6 +112,15 @@ def git_gate_upstreams_for_bottle(bottle: ManifestBottle) -> tuple[GitGateUpstre ) +def _gitconfig_validate_value(field: str, value: str) -> None: + """Raise ValueError if value contains characters that break gitconfig line syntax.""" + if "\n" in value or "\r" in value: + raise ValueError( + f"git-gate: {field} contains a newline, which would inject " + f"arbitrary gitconfig keys; rejecting manifest entry" + ) + + def git_gate_render_gitconfig( entries: tuple[ManifestGitEntry, ...], gate_host: str, *, scheme: str = "git", ) -> str: @@ -136,6 +145,7 @@ def git_gate_render_gitconfig( "# fetch-from-upstream-before-every-upload-pack via access-hook).\n", ] for entry in entries: + _gitconfig_validate_value(f"repos[{entry.Name!r}].url", entry.Upstream) out.append(f'[url "{scheme}://{gate_host}/{entry.Name}.git"]\n') out.append(f"\tinsteadOf = {entry.Upstream}\n") if entry.RemoteKey and entry.RemoteKey != entry.UpstreamHost: @@ -148,6 +158,7 @@ def git_gate_render_gitconfig( f"ssh://{entry.UpstreamUser}@{entry.RemoteKey}{port}/" f"{entry.UpstreamPath}" ) + _gitconfig_validate_value(f"repos[{entry.Name!r}].url (resolved alias)", alias) out.append(f"\tinsteadOf = {alias}\n") return "".join(out) diff --git a/tests/unit/test_egress.py b/tests/unit/test_egress.py index ce982d9..b225ee3 100644 --- a/tests/unit/test_egress.py +++ b/tests/unit/test_egress.py @@ -10,6 +10,7 @@ from bot_bottle.egress import ( Egress, EgressPlan, EgressRoute, + _yaml_str_escape, egress_agent_env_entries, egress_manifest_routes, egress_render_routes, @@ -419,6 +420,76 @@ class TestRenderRoutes(unittest.TestCase): self.assertEqual(LOG_BLOCKS, cfg.log) +class TestYamlStrEscape(unittest.TestCase): + """_yaml_str_escape produces safe YAML double-quoted scalar content.""" + + def test_plain_string_unchanged(self): + self.assertEqual("api.example.com", _yaml_str_escape("api.example.com")) + + def test_double_quote_escaped(self): + self.assertEqual('\\"', _yaml_str_escape('"')) + + def test_backslash_escaped(self): + self.assertEqual("\\\\", _yaml_str_escape("\\")) + + def test_newline_escaped(self): + self.assertEqual("\\n", _yaml_str_escape("\n")) + + def test_carriage_return_escaped(self): + self.assertEqual("\\r", _yaml_str_escape("\r")) + + def test_tab_escaped(self): + self.assertEqual("\\t", _yaml_str_escape("\t")) + + def test_combined(self): + self.assertEqual('\\"\\n\\\\', _yaml_str_escape('"\n\\')) + + +class TestRenderRoutesEscaping(unittest.TestCase): + """Stray quotes/newlines in manifest strings do not corrupt routes.yaml.""" + + @staticmethod + def _parsed(routes) -> list[dict]: # type: ignore + return parse_yaml_subset(egress_render_routes(routes))["routes"] # type: ignore + + def test_host_with_double_quote_round_trips(self): + routes = (EgressRoute(host='bad"host.example'),) + parsed = self._parsed(routes) + self.assertEqual('bad"host.example', parsed[0]["host"]) + + def test_host_with_newline_round_trips(self): + routes = (EgressRoute(host="host\nextra.example"),) + parsed = self._parsed(routes) + self.assertEqual("host\nextra.example", parsed[0]["host"]) + + def test_auth_scheme_with_double_quote_round_trips(self): + routes = (EgressRoute( + host="api.example", + auth_scheme='Bear"er', + token_env="EGRESS_TOKEN_0", + ),) + parsed = self._parsed(routes) + self.assertEqual('Bear"er', parsed[0]["auth_scheme"]) + + def test_path_value_with_double_quote_round_trips(self): + from bot_bottle.egress_addon_core import PathMatch, MatchEntry + routes = (EgressRoute( + host="api.example", + matches=(MatchEntry(paths=(PathMatch(type="prefix", value='/v1/"quoted"/'),)),), + ),) + parsed = self._parsed(routes) + self.assertEqual('/v1/"quoted"/', parsed[0]["matches"][0]["paths"][0]["value"]) + + def test_header_value_with_double_quote_round_trips(self): + from bot_bottle.egress_addon_core import HeaderMatch, MatchEntry + routes = (EgressRoute( + host="api.example", + matches=(MatchEntry(headers=(HeaderMatch(name="x-h", value='val"ue'),)),), + ),) + parsed = self._parsed(routes) + self.assertEqual('val"ue', parsed[0]["matches"][0]["headers"][0]["value"]) + + class TestResolveTokenValues(unittest.TestCase): def test_reads_host_env(self): out = egress_resolve_token_values( diff --git a/tests/unit/test_provision_git.py b/tests/unit/test_provision_git.py index 6023333..6c1d3c4 100644 --- a/tests/unit/test_provision_git.py +++ b/tests/unit/test_provision_git.py @@ -8,6 +8,7 @@ import unittest from bot_bottle.git_gate import ( GIT_GATE_HOSTNAME, + _gitconfig_validate_value, git_gate_render_gitconfig, ) from bot_bottle.manifest import ManifestIndex @@ -90,5 +91,42 @@ class TestGitGateGitconfigRender(unittest.TestCase): self.assertNotIn("gitea.dideric.is", out) +class TestGitconfigValidateValue(unittest.TestCase): + """_gitconfig_validate_value rejects values that would inject gitconfig keys.""" + + def test_normal_url_passes(self): + _gitconfig_validate_value("url", "ssh://git@github.com/owner/repo.git") + + def test_newline_in_url_raises(self): + with self.assertRaises(ValueError): + _gitconfig_validate_value("url", "ssh://git@github.com/owner/\nrepo.git") + + def test_carriage_return_in_url_raises(self): + with self.assertRaises(ValueError): + _gitconfig_validate_value("url", "ssh://git@github.com/\rrepo.git") + + def test_error_message_names_field(self): + with self.assertRaises(ValueError, msg="error should name the field") as ctx: + _gitconfig_validate_value("repos['bad'].url", "ssh://host/\npath") + self.assertIn("repos['bad'].url", str(ctx.exception)) + + +class TestGitconfigRenderRejectsNewlineInUpstream(unittest.TestCase): + """git_gate_render_gitconfig raises on Upstream values with newlines.""" + + def test_newline_in_upstream_raises(self): + m = ManifestIndex.from_json_obj({ + "bottles": {"dev": {"git-gate": {"repos": { + "evil": { + "url": "ssh://git@github.com/owner/\nfake-key = injected\nrepo.git", + "key": {"provider": "static", "path": "/dev/null"}, + }, + }}}}, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + }) + with self.assertRaises(ValueError): + git_gate_render_gitconfig(m.bottles["dev"].git, GIT_GATE_HOSTNAME) + + if __name__ == "__main__": unittest.main()