fix: escape quotes/newlines in YAML and gitconfig emitters
lint / lint (push) Successful in 1m46s
test / unit (pull_request) Successful in 30s
test / integration (pull_request) Successful in 16s

Closes #258.

`egress_render_routes` and `_render_match_entry` now pass all manifest
strings (host, auth_scheme, token_env, path/header values) through
`_yaml_str_escape` before interpolating into double-quoted YAML scalars,
preventing stray `"` or newlines from corrupting routes.yaml.

`git_gate_render_gitconfig` now calls `_gitconfig_validate_value` on
each Upstream value (and the derived alias) before writing the
`insteadOf` line, rejecting any value containing a newline that would
inject arbitrary gitconfig keys.
This commit is contained in:
2026-06-25 07:42:31 +00:00
parent fd6b14fb32
commit 43e8c5244c
4 changed files with 141 additions and 10 deletions
+21 -10
View File
@@ -210,6 +210,17 @@ def egress_token_env_map(
return out return out
def _yaml_str_escape(s: str) -> str:
"""Escape a string for use inside a YAML double-quoted scalar."""
return (
s.replace("\\", "\\\\")
.replace('"', '\\"')
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t")
)
def _route_to_yaml_fields(r: Route) -> dict[str, object]: def _route_to_yaml_fields(r: Route) -> dict[str, object]:
fields: dict[str, object] = {"host": r.host} fields: dict[str, object] = {"host": r.host}
if r.auth_scheme and r.token_env: if r.auth_scheme and r.token_env:
@@ -272,12 +283,12 @@ def _render_match_entry(entry: dict[str, object]) -> list[str]:
for pd in entry["paths"]: # type: ignore[union-attr] for pd in entry["paths"]: # type: ignore[union-attr]
pd_dict: dict[str, str] = pd # type: ignore[assignment] pd_dict: dict[str, str] = pd # type: ignore[assignment]
if "type" in pd_dict: if "type" in pd_dict:
lines.append(f' - type: "{pd_dict["type"]}"') lines.append(f' - type: "{_yaml_str_escape(pd_dict["type"])}"')
lines.append(f' value: "{pd_dict["value"]}"') lines.append(f' value: "{_yaml_str_escape(pd_dict["value"])}"')
else: else:
lines.append(f' - value: "{pd_dict["value"]}"') lines.append(f' - value: "{_yaml_str_escape(pd_dict["value"])}"')
if "methods" in entry: if "methods" in entry:
methods_str = ", ".join(f'"{m}"' for m in entry["methods"]) # type: ignore[union-attr] methods_str = ", ".join(f'"{_yaml_str_escape(m)}"' for m in entry["methods"]) # type: ignore[union-attr]
prefix = " - " if first_key else " " prefix = " - " if first_key else " "
lines.append(f'{prefix}methods: [{methods_str}]') lines.append(f'{prefix}methods: [{methods_str}]')
first_key = False first_key = False
@@ -287,8 +298,8 @@ def _render_match_entry(entry: dict[str, object]) -> list[str]:
first_key = False first_key = False
for hd in entry["headers"]: # type: ignore[union-attr] for hd in entry["headers"]: # type: ignore[union-attr]
hd_dict: dict[str, str] = hd # type: ignore[assignment] hd_dict: dict[str, str] = hd # type: ignore[assignment]
lines.append(f' - name: "{hd_dict["name"]}"') lines.append(f' - name: "{_yaml_str_escape(hd_dict["name"])}"')
lines.append(f' value: "{hd_dict["value"]}"') lines.append(f' value: "{_yaml_str_escape(hd_dict["value"])}"')
if first_key: if first_key:
lines.append(" - {}") lines.append(" - {}")
return lines return lines
@@ -308,10 +319,10 @@ def egress_render_routes(
return "\n".join(lines) + "\n" return "\n".join(lines) + "\n"
for r in routes: for r in routes:
f = _route_to_yaml_fields(r) f = _route_to_yaml_fields(r)
lines.append(f' - host: "{f["host"]}"') lines.append(f' - host: "{_yaml_str_escape(str(f["host"]))}"')
if "auth_scheme" in f: if "auth_scheme" in f:
lines.append(f' auth_scheme: "{f["auth_scheme"]}"') lines.append(f' auth_scheme: "{_yaml_str_escape(str(f["auth_scheme"]))}"')
lines.append(f' token_env: "{f["token_env"]}"') lines.append(f' token_env: "{_yaml_str_escape(str(f["token_env"]))}"')
if "matches" in f: if "matches" in f:
lines.append(" matches:") lines.append(" matches:")
for entry in f["matches"]: # type: ignore[union-attr] for entry in f["matches"]: # type: ignore[union-attr]
@@ -331,7 +342,7 @@ def egress_render_routes(
items_str = ", ".join(f'"{x}"' for x in dv) items_str = ", ".join(f'"{x}"' for x in dv)
lines.append(f" {dk}: [{items_str}]") lines.append(f" {dk}: [{items_str}]")
elif isinstance(dv, str): elif isinstance(dv, str):
lines.append(f' {dk}: "{dv}"') lines.append(f' {dk}: "{_yaml_str_escape(dv)}"')
return "\n".join(lines) + "\n" return "\n".join(lines) + "\n"
+11
View File
@@ -112,6 +112,15 @@ def git_gate_upstreams_for_bottle(bottle: ManifestBottle) -> tuple[GitGateUpstre
) )
def _gitconfig_validate_value(field: str, value: str) -> None:
"""Raise ValueError if value contains characters that break gitconfig line syntax."""
if "\n" in value or "\r" in value:
raise ValueError(
f"git-gate: {field} contains a newline, which would inject "
f"arbitrary gitconfig keys; rejecting manifest entry"
)
def git_gate_render_gitconfig( def git_gate_render_gitconfig(
entries: tuple[ManifestGitEntry, ...], gate_host: str, *, scheme: str = "git", entries: tuple[ManifestGitEntry, ...], gate_host: str, *, scheme: str = "git",
) -> str: ) -> str:
@@ -136,6 +145,7 @@ def git_gate_render_gitconfig(
"# fetch-from-upstream-before-every-upload-pack via access-hook).\n", "# fetch-from-upstream-before-every-upload-pack via access-hook).\n",
] ]
for entry in entries: for entry in entries:
_gitconfig_validate_value(f"repos[{entry.Name!r}].url", entry.Upstream)
out.append(f'[url "{scheme}://{gate_host}/{entry.Name}.git"]\n') out.append(f'[url "{scheme}://{gate_host}/{entry.Name}.git"]\n')
out.append(f"\tinsteadOf = {entry.Upstream}\n") out.append(f"\tinsteadOf = {entry.Upstream}\n")
if entry.RemoteKey and entry.RemoteKey != entry.UpstreamHost: if entry.RemoteKey and entry.RemoteKey != entry.UpstreamHost:
@@ -148,6 +158,7 @@ def git_gate_render_gitconfig(
f"ssh://{entry.UpstreamUser}@{entry.RemoteKey}{port}/" f"ssh://{entry.UpstreamUser}@{entry.RemoteKey}{port}/"
f"{entry.UpstreamPath}" f"{entry.UpstreamPath}"
) )
_gitconfig_validate_value(f"repos[{entry.Name!r}].url (resolved alias)", alias)
out.append(f"\tinsteadOf = {alias}\n") out.append(f"\tinsteadOf = {alias}\n")
return "".join(out) return "".join(out)
+71
View File
@@ -10,6 +10,7 @@ from bot_bottle.egress import (
Egress, Egress,
EgressPlan, EgressPlan,
EgressRoute, EgressRoute,
_yaml_str_escape,
egress_agent_env_entries, egress_agent_env_entries,
egress_manifest_routes, egress_manifest_routes,
egress_render_routes, egress_render_routes,
@@ -419,6 +420,76 @@ class TestRenderRoutes(unittest.TestCase):
self.assertEqual(LOG_BLOCKS, cfg.log) self.assertEqual(LOG_BLOCKS, cfg.log)
class TestYamlStrEscape(unittest.TestCase):
"""_yaml_str_escape produces safe YAML double-quoted scalar content."""
def test_plain_string_unchanged(self):
self.assertEqual("api.example.com", _yaml_str_escape("api.example.com"))
def test_double_quote_escaped(self):
self.assertEqual('\\"', _yaml_str_escape('"'))
def test_backslash_escaped(self):
self.assertEqual("\\\\", _yaml_str_escape("\\"))
def test_newline_escaped(self):
self.assertEqual("\\n", _yaml_str_escape("\n"))
def test_carriage_return_escaped(self):
self.assertEqual("\\r", _yaml_str_escape("\r"))
def test_tab_escaped(self):
self.assertEqual("\\t", _yaml_str_escape("\t"))
def test_combined(self):
self.assertEqual('\\"\\n\\\\', _yaml_str_escape('"\n\\'))
class TestRenderRoutesEscaping(unittest.TestCase):
"""Stray quotes/newlines in manifest strings do not corrupt routes.yaml."""
@staticmethod
def _parsed(routes) -> list[dict]: # type: ignore
return parse_yaml_subset(egress_render_routes(routes))["routes"] # type: ignore
def test_host_with_double_quote_round_trips(self):
routes = (EgressRoute(host='bad"host.example'),)
parsed = self._parsed(routes)
self.assertEqual('bad"host.example', parsed[0]["host"])
def test_host_with_newline_round_trips(self):
routes = (EgressRoute(host="host\nextra.example"),)
parsed = self._parsed(routes)
self.assertEqual("host\nextra.example", parsed[0]["host"])
def test_auth_scheme_with_double_quote_round_trips(self):
routes = (EgressRoute(
host="api.example",
auth_scheme='Bear"er',
token_env="EGRESS_TOKEN_0",
),)
parsed = self._parsed(routes)
self.assertEqual('Bear"er', parsed[0]["auth_scheme"])
def test_path_value_with_double_quote_round_trips(self):
from bot_bottle.egress_addon_core import PathMatch, MatchEntry
routes = (EgressRoute(
host="api.example",
matches=(MatchEntry(paths=(PathMatch(type="prefix", value='/v1/"quoted"/'),)),),
),)
parsed = self._parsed(routes)
self.assertEqual('/v1/"quoted"/', parsed[0]["matches"][0]["paths"][0]["value"])
def test_header_value_with_double_quote_round_trips(self):
from bot_bottle.egress_addon_core import HeaderMatch, MatchEntry
routes = (EgressRoute(
host="api.example",
matches=(MatchEntry(headers=(HeaderMatch(name="x-h", value='val"ue'),)),),
),)
parsed = self._parsed(routes)
self.assertEqual('val"ue', parsed[0]["matches"][0]["headers"][0]["value"])
class TestResolveTokenValues(unittest.TestCase): class TestResolveTokenValues(unittest.TestCase):
def test_reads_host_env(self): def test_reads_host_env(self):
out = egress_resolve_token_values( out = egress_resolve_token_values(
+38
View File
@@ -8,6 +8,7 @@ import unittest
from bot_bottle.git_gate import ( from bot_bottle.git_gate import (
GIT_GATE_HOSTNAME, GIT_GATE_HOSTNAME,
_gitconfig_validate_value,
git_gate_render_gitconfig, git_gate_render_gitconfig,
) )
from bot_bottle.manifest import ManifestIndex from bot_bottle.manifest import ManifestIndex
@@ -90,5 +91,42 @@ class TestGitGateGitconfigRender(unittest.TestCase):
self.assertNotIn("gitea.dideric.is", out) self.assertNotIn("gitea.dideric.is", out)
class TestGitconfigValidateValue(unittest.TestCase):
"""_gitconfig_validate_value rejects values that would inject gitconfig keys."""
def test_normal_url_passes(self):
_gitconfig_validate_value("url", "ssh://git@github.com/owner/repo.git")
def test_newline_in_url_raises(self):
with self.assertRaises(ValueError):
_gitconfig_validate_value("url", "ssh://git@github.com/owner/\nrepo.git")
def test_carriage_return_in_url_raises(self):
with self.assertRaises(ValueError):
_gitconfig_validate_value("url", "ssh://git@github.com/\rrepo.git")
def test_error_message_names_field(self):
with self.assertRaises(ValueError, msg="error should name the field") as ctx:
_gitconfig_validate_value("repos['bad'].url", "ssh://host/\npath")
self.assertIn("repos['bad'].url", str(ctx.exception))
class TestGitconfigRenderRejectsNewlineInUpstream(unittest.TestCase):
"""git_gate_render_gitconfig raises on Upstream values with newlines."""
def test_newline_in_upstream_raises(self):
m = ManifestIndex.from_json_obj({
"bottles": {"dev": {"git-gate": {"repos": {
"evil": {
"url": "ssh://git@github.com/owner/\nfake-key = injected\nrepo.git",
"key": {"provider": "static", "path": "/dev/null"},
},
}}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
with self.assertRaises(ValueError):
git_gate_render_gitconfig(m.bottles["dev"].git, GIT_GATE_HOSTNAME)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()