fix(lint): resolve pylint and pyright issues on egress-log-option
- egress.py: extract _render_match_entry helper to reduce nesting depth - egress_addon_core.py: make request_method/request_headers keyword-only to satisfy too-many-positional-arguments; wrap long lazy import lines - egress_addon.py: remove unused Route import; add pylint disable for import-error on sidecar-only mitmproxy/egress_addon_core imports - dlp_detectors.py: remove dead _min_distance function (superseded by _closest_pair) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -147,22 +147,6 @@ JAILBREAK_PHRASES: tuple[re.Pattern[str], ...] = (
|
|||||||
PROXIMITY_CHARS = 500
|
PROXIMITY_CHARS = 500
|
||||||
|
|
||||||
|
|
||||||
def _min_distance(
|
|
||||||
a_matches: list[re.Match[str]],
|
|
||||||
b_matches: list[re.Match[str]],
|
|
||||||
) -> int | None:
|
|
||||||
"""Smallest char distance between any pair of matches."""
|
|
||||||
if not a_matches or not b_matches:
|
|
||||||
return None
|
|
||||||
best = None
|
|
||||||
for a in a_matches:
|
|
||||||
for b in b_matches:
|
|
||||||
gap = max(0, max(a.start(), b.start()) - min(a.end(), b.end()))
|
|
||||||
if best is None or gap < best:
|
|
||||||
best = gap
|
|
||||||
return best
|
|
||||||
|
|
||||||
|
|
||||||
def _closest_pair(
|
def _closest_pair(
|
||||||
a_matches: list[re.Match[str]],
|
a_matches: list[re.Match[str]],
|
||||||
b_matches: list[re.Match[str]],
|
b_matches: list[re.Match[str]],
|
||||||
|
|||||||
+33
-32
@@ -189,6 +189,37 @@ def _route_to_yaml_fields(r: Route) -> dict[str, object]:
|
|||||||
return fields
|
return fields
|
||||||
|
|
||||||
|
|
||||||
|
def _render_match_entry(entry: dict[str, object]) -> list[str]:
|
||||||
|
lines: list[str] = []
|
||||||
|
first_key = True
|
||||||
|
if "paths" in entry:
|
||||||
|
lines.append(" - paths:")
|
||||||
|
first_key = False
|
||||||
|
for pd in entry["paths"]: # type: ignore[union-attr]
|
||||||
|
pd_dict: dict[str, str] = pd # type: ignore[assignment]
|
||||||
|
if "type" in pd_dict:
|
||||||
|
lines.append(f' - type: "{pd_dict["type"]}"')
|
||||||
|
lines.append(f' value: "{pd_dict["value"]}"')
|
||||||
|
else:
|
||||||
|
lines.append(f' - value: "{pd_dict["value"]}"')
|
||||||
|
if "methods" in entry:
|
||||||
|
methods_str = ", ".join(f'"{m}"' for m in entry["methods"]) # type: ignore[union-attr]
|
||||||
|
prefix = " - " if first_key else " "
|
||||||
|
lines.append(f'{prefix}methods: [{methods_str}]')
|
||||||
|
first_key = False
|
||||||
|
if "headers" in entry:
|
||||||
|
prefix = " - " if first_key else " "
|
||||||
|
lines.append(f"{prefix}headers:")
|
||||||
|
first_key = False
|
||||||
|
for hd in entry["headers"]: # type: ignore[union-attr]
|
||||||
|
hd_dict: dict[str, str] = hd # type: ignore[assignment]
|
||||||
|
lines.append(f' - name: "{hd_dict["name"]}"')
|
||||||
|
lines.append(f' value: "{hd_dict["value"]}"')
|
||||||
|
if first_key:
|
||||||
|
lines.append(" - {}")
|
||||||
|
return lines
|
||||||
|
|
||||||
|
|
||||||
def egress_render_routes(
|
def egress_render_routes(
|
||||||
routes: tuple[EgressRoute, ...],
|
routes: tuple[EgressRoute, ...],
|
||||||
*,
|
*,
|
||||||
@@ -209,38 +240,8 @@ def egress_render_routes(
|
|||||||
lines.append(f' token_env: "{f["token_env"]}"')
|
lines.append(f' token_env: "{f["token_env"]}"')
|
||||||
if "matches" in f:
|
if "matches" in f:
|
||||||
lines.append(" matches:")
|
lines.append(" matches:")
|
||||||
for entry in f["matches"]: # type: ignore
|
for entry in f["matches"]: # type: ignore[union-attr]
|
||||||
entry_dict: dict[str, object] = entry # type: ignore
|
lines.extend(_render_match_entry(entry)) # type: ignore[arg-type]
|
||||||
first_key = True
|
|
||||||
if "paths" in entry_dict:
|
|
||||||
lines.append(" - paths:")
|
|
||||||
first_key = False
|
|
||||||
for pd in entry_dict["paths"]: # type: ignore
|
|
||||||
pd_dict: dict[str, str] = pd # type: ignore
|
|
||||||
if "type" in pd_dict:
|
|
||||||
lines.append(f' - type: "{pd_dict["type"]}"')
|
|
||||||
lines.append(f' value: "{pd_dict["value"]}"')
|
|
||||||
else:
|
|
||||||
lines.append(f' - value: "{pd_dict["value"]}"')
|
|
||||||
if "methods" in entry_dict:
|
|
||||||
methods_str = ", ".join(
|
|
||||||
f'"{m}"' for m in entry_dict["methods"] # type: ignore
|
|
||||||
)
|
|
||||||
prefix = " - " if first_key else " "
|
|
||||||
lines.append(f'{prefix}methods: [{methods_str}]')
|
|
||||||
first_key = False
|
|
||||||
if "headers" in entry_dict:
|
|
||||||
prefix = " - " if first_key else " "
|
|
||||||
lines.append(f"{prefix}headers:")
|
|
||||||
first_key = False
|
|
||||||
for hd in entry_dict["headers"]: # type: ignore
|
|
||||||
hd_dict: dict[str, str] = hd # type: ignore
|
|
||||||
lines.append(f' - name: "{hd_dict["name"]}"')
|
|
||||||
lines.append(f' value: "{hd_dict["value"]}"')
|
|
||||||
if "type" in hd_dict:
|
|
||||||
lines.append(f' type: "{hd_dict["type"]}"')
|
|
||||||
if first_key:
|
|
||||||
lines.append(" - {}")
|
|
||||||
if "dlp" in f:
|
if "dlp" in f:
|
||||||
dlp_dict: dict[str, object] = f["dlp"] # type: ignore
|
dlp_dict: dict[str, object] = f["dlp"] # type: ignore
|
||||||
lines.append(" dlp:")
|
lines.append(" dlp:")
|
||||||
|
|||||||
@@ -12,13 +12,12 @@ import signal
|
|||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from mitmproxy import http # type: ignore[import-not-found]
|
from mitmproxy import http # type: ignore[import-not-found] # pylint: disable=import-error
|
||||||
|
|
||||||
from egress_addon_core import ( # type: ignore[import-not-found]
|
from egress_addon_core import ( # type: ignore[import-not-found] # pylint: disable=import-error
|
||||||
LOG_BLOCKS,
|
LOG_BLOCKS,
|
||||||
LOG_FULL,
|
LOG_FULL,
|
||||||
Config,
|
Config,
|
||||||
Route,
|
|
||||||
decide,
|
decide,
|
||||||
is_git_push_request,
|
is_git_push_request,
|
||||||
load_config,
|
load_config,
|
||||||
|
|||||||
@@ -470,6 +470,7 @@ def decide(
|
|||||||
request_host: str,
|
request_host: str,
|
||||||
request_path: str,
|
request_path: str,
|
||||||
environ: typing.Mapping[str, str],
|
environ: typing.Mapping[str, str],
|
||||||
|
*,
|
||||||
request_method: str = "GET",
|
request_method: str = "GET",
|
||||||
request_headers: typing.Mapping[str, str] | None = None,
|
request_headers: typing.Mapping[str, str] | None = None,
|
||||||
) -> Decision:
|
) -> Decision:
|
||||||
@@ -537,9 +538,13 @@ def scan_outbound(
|
|||||||
# Lazy import to avoid circular deps and keep dlp_detectors optional
|
# Lazy import to avoid circular deps and keep dlp_detectors optional
|
||||||
# at import time (the sidecar copies it flat alongside this file).
|
# at import time (the sidecar copies it flat alongside this file).
|
||||||
try:
|
try:
|
||||||
from dlp_detectors import scan_token_patterns, scan_known_secrets # type: ignore[import-not-found]
|
from dlp_detectors import ( # type: ignore[import-not-found]
|
||||||
|
scan_token_patterns, scan_known_secrets,
|
||||||
|
)
|
||||||
except ImportError: # pragma: no cover - host-side path
|
except ImportError: # pragma: no cover - host-side path
|
||||||
from .dlp_detectors import scan_token_patterns, scan_known_secrets # type: ignore[import-not-found]
|
from .dlp_detectors import ( # type: ignore[import-not-found]
|
||||||
|
scan_token_patterns, scan_known_secrets,
|
||||||
|
)
|
||||||
|
|
||||||
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
|
text = body if isinstance(body, str) else body.decode("utf-8", errors="replace")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user