From f289b6382cbf6290c45bcfc326d9ea83939e609f Mon Sep 17 00:00:00 2001 From: didericis Date: Thu, 25 Jun 2026 22:04:27 -0400 Subject: [PATCH] test(egress): ratchet egress_addon_core coverage to >=90% Third per-module ratchet under ADR 0004. Add a parsing/serialization suite for the egress engine's core: - route validation rejections: payload/route shape, host, auth pairing, git block, every matches sub-field (paths/methods/headers type + regex-compile + unknown-key), and the dlp block (detector type/name, outbound_on_match, unknown key) - a full valid route round-trips; detectors:false disables - parse_config log-level validation + load_config invalid-YAML - route_to_yaml_dict: minimal/auth/git/dlp/matches with default-omission - evaluate_matches: exact/prefix/regex paths, method filter, exact + regex header matching (match and non-match) egress_addon_core.py: 84% -> 99%. The two remaining missed statements are defensive guards (an unreachable separator-return and a no-matching-path-type fallthrough). Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01NkwFXLFff9PYPy4wgVBJp9 --- tests/unit/test_egress_core_parsing.py | 297 +++++++++++++++++++++++++ 1 file changed, 297 insertions(+) create mode 100644 tests/unit/test_egress_core_parsing.py diff --git a/tests/unit/test_egress_core_parsing.py b/tests/unit/test_egress_core_parsing.py new file mode 100644 index 0000000..cd45632 --- /dev/null +++ b/tests/unit/test_egress_core_parsing.py @@ -0,0 +1,297 @@ +"""Unit: egress_addon_core route parsing, serialization, and match +evaluation error/edge branches (coverage ratchet, ADR 0004). + +Complements test_egress_addon_core.py — focuses on the validation +rejections, the Route->YAML serializer, and evaluate_matches.""" + +from __future__ import annotations + +import unittest + +from bot_bottle.egress_addon_core import ( + HeaderMatch, + MatchEntry, + PathMatch, + Route, + evaluate_matches, + load_config, + parse_config, + parse_routes, + route_to_yaml_dict, +) + + +def _route(d: dict[str, object]) -> Route: + return parse_routes({"routes": [d]})[0] + + +class TestRouteValidationErrors(unittest.TestCase): + def _bad(self, d: dict[str, object]) -> None: + with self.assertRaises(ValueError): + parse_routes({"routes": [d]}) + + # routes-payload shape + def test_payload_not_dict(self) -> None: + with self.assertRaises(ValueError): + parse_routes(["nope"]) + + def test_routes_not_list(self) -> None: + with self.assertRaises(ValueError): + parse_routes({"routes": "nope"}) + + def test_route_not_dict(self) -> None: + with self.assertRaises(ValueError): + parse_routes({"routes": ["nope"]}) + + def test_host_missing(self) -> None: + self._bad({}) + + def test_unknown_route_key(self) -> None: + self._bad({"host": "h", "bogus": 1}) + + # auth + def test_auth_scheme_without_token_env(self) -> None: + self._bad({"host": "h", "auth_scheme": "Bearer"}) + + def test_auth_scheme_wrong_type(self) -> None: + self._bad({"host": "h", "auth_scheme": 5, "token_env": "T"}) + + # git + def test_git_not_dict(self) -> None: + self._bad({"host": "h", "git": "yes"}) + + def test_git_fetch_not_bool(self) -> None: + self._bad({"host": "h", "git": {"fetch": "yes"}}) + + def test_git_unknown_key(self) -> None: + self._bad({"host": "h", "git": {"fetch": True, "push": True}}) + + # matches: paths + def test_matches_not_list(self) -> None: + self._bad({"host": "h", "matches": "x"}) + + def test_match_entry_not_dict(self) -> None: + self._bad({"host": "h", "matches": ["x"]}) + + def test_paths_not_list(self) -> None: + self._bad({"host": "h", "matches": [{"paths": "x"}]}) + + def test_path_not_dict(self) -> None: + self._bad({"host": "h", "matches": [{"paths": ["x"]}]}) + + def test_path_bad_type(self) -> None: + self._bad({"host": "h", "matches": [{"paths": [{"type": "bogus", "value": "/x"}]}]}) + + def test_path_empty_value(self) -> None: + self._bad({"host": "h", "matches": [{"paths": [{"value": ""}]}]}) + + def test_path_value_missing_slash(self) -> None: + self._bad({"host": "h", "matches": [{"paths": [{"type": "prefix", "value": "x"}]}]}) + + def test_path_bad_regex(self) -> None: + self._bad({"host": "h", "matches": [{"paths": [{"type": "regex", "value": "("}]}]}) + + def test_path_unknown_key(self) -> None: + self._bad({"host": "h", "matches": [{"paths": [{"value": "/x", "z": 1}]}]}) + + # matches: methods + def test_methods_not_list(self) -> None: + self._bad({"host": "h", "matches": [{"methods": "GET"}]}) + + def test_method_not_string(self) -> None: + self._bad({"host": "h", "matches": [{"methods": [5]}]}) + + def test_method_invalid(self) -> None: + self._bad({"host": "h", "matches": [{"methods": ["FETCH"]}]}) + + # matches: headers + def test_headers_not_list(self) -> None: + self._bad({"host": "h", "matches": [{"headers": "x"}]}) + + def test_header_not_dict(self) -> None: + self._bad({"host": "h", "matches": [{"headers": ["x"]}]}) + + def test_header_name_empty(self) -> None: + self._bad({"host": "h", "matches": [{"headers": [{"name": "", "value": "v"}]}]}) + + def test_header_value_not_string(self) -> None: + self._bad({"host": "h", "matches": [{"headers": [{"name": "X", "value": 1}]}]}) + + def test_header_bad_type(self) -> None: + self._bad({"host": "h", "matches": [{"headers": [{"name": "X", "value": "v", "type": "z"}]}]}) + + def test_header_bad_regex(self) -> None: + self._bad({"host": "h", "matches": [{"headers": [{"name": "X", "value": "(", "type": "regex"}]}]}) + + def test_header_unknown_key(self) -> None: + self._bad({"host": "h", "matches": [{"headers": [{"name": "X", "value": "v", "z": 1}]}]}) + + # dlp + def test_dlp_not_dict(self) -> None: + self._bad({"host": "h", "dlp": "x"}) + + def test_dlp_detectors_wrong_type(self) -> None: + self._bad({"host": "h", "dlp": {"outbound_detectors": "x"}}) + + def test_dlp_detector_name_invalid(self) -> None: + self._bad({"host": "h", "dlp": {"outbound_detectors": ["bogus"]}}) + + def test_dlp_detector_item_not_string(self) -> None: + self._bad({"host": "h", "dlp": {"outbound_detectors": [5]}}) + + def test_dlp_on_match_invalid(self) -> None: + self._bad({"host": "h", "dlp": {"outbound_on_match": "maybe"}}) + + def test_dlp_unknown_key(self) -> None: + self._bad({"host": "h", "dlp": {"bogus": 1}}) + + +class TestRouteValidAccepts(unittest.TestCase): + def test_full_route_parses(self) -> None: + r = _route({ + "host": "api.example.com", + "auth_scheme": "Bearer", + "token_env": "TOK", + "matches": [{ + "paths": [{"type": "exact", "value": "/v1"}], + "methods": ["get", "post"], + "headers": [{"name": "X-Env", "value": "prod"}], + }], + "git": {"fetch": True}, + "dlp": { + "outbound_detectors": ["token_patterns"], + "inbound_detectors": ["naive_injection_detection"], + "outbound_on_match": "block", + }, + }) + self.assertEqual("api.example.com", r.host) + self.assertEqual(("GET", "POST"), r.matches[0].methods) + self.assertTrue(r.git_fetch) + self.assertEqual("block", r.outbound_on_match) + + def test_dlp_detectors_false_disables(self) -> None: + r = _route({"host": "h", "dlp": {"outbound_detectors": False}}) + self.assertEqual((), r.outbound_detectors) + + +class TestParseConfig(unittest.TestCase): + def test_log_must_be_valid_level(self) -> None: + with self.assertRaises(ValueError): + parse_config({"log": 5, "routes": []}) + + def test_log_true_rejected(self) -> None: + with self.assertRaises(ValueError): + parse_config({"log": True, "routes": []}) + + def test_top_level_not_dict(self) -> None: + with self.assertRaises(ValueError): + parse_config(["x"]) + + def test_load_config_invalid_yaml(self) -> None: + with self.assertRaises(ValueError): + load_config("routes: [unterminated\n") + + +class TestRouteToYamlDict(unittest.TestCase): + def test_minimal(self) -> None: + self.assertEqual({"host": "h"}, route_to_yaml_dict(Route(host="h"))) + + def test_auth_fields(self) -> None: + d = route_to_yaml_dict(Route(host="h", auth_scheme="Bearer", token_env="T")) + self.assertEqual("Bearer", d["auth_scheme"]) + self.assertEqual("T", d["token_env"]) + + def test_git_fetch(self) -> None: + d = route_to_yaml_dict(Route(host="h", git_fetch=True)) + self.assertEqual({"fetch": True}, d["git"]) + + def test_dlp_fields(self) -> None: + d = route_to_yaml_dict(Route( + host="h", + outbound_detectors=("token_patterns",), + inbound_detectors=("naive_injection_detection",), + outbound_on_match="redact", + )) + self.assertEqual( + { + "outbound_detectors": ["token_patterns"], + "inbound_detectors": ["naive_injection_detection"], + "outbound_on_match": "redact", + }, + d["dlp"], + ) + + def test_matches_serialization_omits_defaults(self) -> None: + route = Route(host="h", matches=(MatchEntry( + paths=( + PathMatch(type="prefix", value="/p"), # default type -> omitted + PathMatch(type="exact", value="/e"), # non-default -> kept + ), + methods=("GET",), + headers=( + HeaderMatch(name="X", value="v"), # exact -> omitted + HeaderMatch(name="Y", value="r", type="regex"), # regex -> kept + ), + ),)) + d = route_to_yaml_dict(route) + matches = d["matches"] + assert isinstance(matches, list) + entry = matches[0] + self.assertEqual( + [{"value": "/p"}, {"value": "/e", "type": "exact"}], + entry["paths"], + ) + self.assertEqual(["GET"], entry["methods"]) + self.assertEqual( + [{"name": "X", "value": "v"}, {"name": "Y", "value": "r", "type": "regex"}], + entry["headers"], + ) + + +class TestEvaluateMatches(unittest.TestCase): + def _route_with(self, entry: MatchEntry) -> Route: + return Route(host="h", matches=(entry,)) + + def test_empty_matches_allows_all(self) -> None: + self.assertTrue(evaluate_matches(Route(host="h"), "/anything", "GET")) + + def test_exact_path(self) -> None: + r = self._route_with(MatchEntry(paths=(PathMatch("exact", "/a"),))) + self.assertTrue(evaluate_matches(r, "/a", "GET")) + self.assertFalse(evaluate_matches(r, "/a/b", "GET")) + + def test_prefix_path_boundary(self) -> None: + r = self._route_with(MatchEntry(paths=(PathMatch("prefix", "/a"),))) + self.assertTrue(evaluate_matches(r, "/a/b", "GET")) + self.assertFalse(evaluate_matches(r, "/ab", "GET")) + + def test_regex_path(self) -> None: + import re + r = self._route_with(MatchEntry( + paths=(PathMatch("regex", r"/v\d+", compiled=re.compile(r"/v\d+")),), + )) + self.assertTrue(evaluate_matches(r, "/v1", "GET")) + self.assertFalse(evaluate_matches(r, "/x", "GET")) + + def test_method_filter(self) -> None: + r = self._route_with(MatchEntry(methods=("POST",))) + self.assertTrue(evaluate_matches(r, "/x", "post")) + self.assertFalse(evaluate_matches(r, "/x", "GET")) + + def test_header_exact(self) -> None: + r = self._route_with(MatchEntry(headers=(HeaderMatch("X-Env", "prod"),))) + self.assertTrue(evaluate_matches(r, "/x", "GET", {"x-env": "prod"})) + self.assertFalse(evaluate_matches(r, "/x", "GET", {"x-env": "dev"})) + self.assertFalse(evaluate_matches(r, "/x", "GET", {})) + + def test_header_regex(self) -> None: + import re + r = self._route_with(MatchEntry( + headers=(HeaderMatch("X-Env", r"pr.*", type="regex", compiled=re.compile(r"pr.*")),), + )) + self.assertTrue(evaluate_matches(r, "/x", "GET", {"x-env": "prod"})) + self.assertFalse(evaluate_matches(r, "/x", "GET", {"x-env": "dev"})) + + +if __name__ == "__main__": + unittest.main() -- 2.52.0