fix: resolve all pyright errors in pipelock.py

- Add cast import and use for dict/list access from object types
- Cast after isinstance checks in helper functions (_required_dict, _required_str_list)
- Cast dict and list values extracted from cfg in pipelock_render_yaml
- Fix list comprehension type issue by casting to list[object] first
- Fixes 14 pyright errors in YAML rendering code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-06-03 23:37:23 -04:00
parent b032ff746d
commit 73a4fbe0a7
+27 -32
View File
@@ -19,6 +19,7 @@ from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import cast
from .egress import EgressRoute, egress_routes_for_bottle
from .supervise import SUPERVISE_HOSTNAME
@@ -259,7 +260,7 @@ def _required_dict(
value = obj.get(key)
if not isinstance(value, dict):
raise _pipelock_render_error(section, key, "a mapping")
return value
return cast(dict[str, object], value)
def _required_bool(obj: dict[str, object], section: str, key: str) -> bool:
@@ -289,9 +290,12 @@ def _required_str_list(
key: str,
) -> list[str]:
value = obj.get(key)
if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
if not isinstance(value, list):
raise _pipelock_render_error(section, key, "a list of strings")
return value
value_list = cast(list[object], value)
if not all(isinstance(v, str) for v in value_list):
raise _pipelock_render_error(section, key, "a list of strings")
return cast(list[str], value)
def _optional_str_list(
@@ -407,49 +411,42 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
lines: list[str] = []
lines.append(f"version: {cfg['version']}")
lines.append(f"mode: {cfg['mode']}")
lines.append(f"enforce: {_bool(cfg['enforce'])}")
lines.append(f"enforce: {_bool(cast(bool, cfg['enforce']))}")
lines.append("")
lines.append("api_allowlist:")
api_allowlist = cfg["api_allowlist"]
assert isinstance(api_allowlist, list)
api_allowlist = cast(list[str], cfg["api_allowlist"])
for h in api_allowlist:
lines.append(f' - "{h}"')
lines.append("")
if "seed_phrase_detection" in cfg:
lines.append("seed_phrase_detection:")
spd = cfg["seed_phrase_detection"]
assert isinstance(spd, dict)
lines.append(f" enabled: {_bool(spd['enabled'])}")
spd = cast(dict[str, object], cfg["seed_phrase_detection"])
lines.append(f" enabled: {_bool(cast(bool, spd['enabled']))}")
lines.append("")
lines.append("forward_proxy:")
fp = cfg["forward_proxy"]
assert isinstance(fp, dict)
lines.append(f" enabled: {_bool(fp['enabled'])}")
fp = cast(dict[str, object], cfg["forward_proxy"])
lines.append(f" enabled: {_bool(cast(bool, fp['enabled']))}")
lines.append("")
lines.append("dlp:")
dlp = cfg["dlp"]
assert isinstance(dlp, dict)
lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}")
lines.append(f" scan_env: {_bool(dlp['scan_env'])}")
dlp = cast(dict[str, object], cfg["dlp"])
lines.append(f" include_defaults: {_bool(cast(bool, dlp['include_defaults']))}")
lines.append(f" scan_env: {_bool(cast(bool, dlp['scan_env']))}")
lines.append("")
lines.append("request_body_scanning:")
rbs = cfg["request_body_scanning"]
assert isinstance(rbs, dict)
lines.append(f' action: "{rbs["action"]}"')
rbs = cast(dict[str, object], cfg["request_body_scanning"])
lines.append(f' action: "{cast(str, rbs["action"])}"')
if "scan_headers" in rbs:
lines.append(f" scan_headers: {_bool(rbs['scan_headers'])}")
lines.append(f" scan_headers: {_bool(cast(bool, rbs['scan_headers']))}")
if "header_mode" in rbs:
lines.append(f' header_mode: "{rbs["header_mode"]}"')
lines.append(f' header_mode: "{cast(str, rbs["header_mode"])}"')
if "tls_interception" in cfg:
lines.append("")
lines.append("tls_interception:")
tls = cfg["tls_interception"]
assert isinstance(tls, dict)
lines.append(f" enabled: {_bool(tls['enabled'])}")
lines.append(f' ca_cert: "{tls["ca_cert"]}"')
lines.append(f' ca_key: "{tls["ca_key"]}"')
passthrough = tls["passthrough_domains"]
assert isinstance(passthrough, list)
tls = cast(dict[str, object], cfg["tls_interception"])
lines.append(f" enabled: {_bool(cast(bool, tls['enabled']))}")
lines.append(f' ca_cert: "{cast(str, tls["ca_cert"])}"')
lines.append(f' ca_key: "{cast(str, tls["ca_key"])}"')
passthrough = cast(list[str], tls["passthrough_domains"])
if passthrough:
lines.append(" passthrough_domains:")
for d in passthrough:
@@ -457,11 +454,9 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
if "ssrf" in cfg:
lines.append("")
lines.append("ssrf:")
ssrf = cfg["ssrf"]
assert isinstance(ssrf, dict)
ssrf = cast(dict[str, object], cfg["ssrf"])
lines.append(" ip_allowlist:")
ip_allowlist = ssrf["ip_allowlist"]
assert isinstance(ip_allowlist, list)
ip_allowlist = cast(list[str], ssrf["ip_allowlist"])
for ip in ip_allowlist:
lines.append(f' - "{ip}"')
return "\n".join(lines) + "\n"