diff --git a/bot_bottle/pipelock.py b/bot_bottle/pipelock.py index b5b119e..c9ea82d 100644 --- a/bot_bottle/pipelock.py +++ b/bot_bottle/pipelock.py @@ -19,6 +19,7 @@ from __future__ import annotations from dataclasses import dataclass from pathlib import Path +from typing import cast from .egress import EgressRoute, egress_routes_for_bottle from .supervise import SUPERVISE_HOSTNAME @@ -259,7 +260,7 @@ def _required_dict( value = obj.get(key) if not isinstance(value, dict): raise _pipelock_render_error(section, key, "a mapping") - return value + return cast(dict[str, object], value) def _required_bool(obj: dict[str, object], section: str, key: str) -> bool: @@ -289,9 +290,12 @@ def _required_str_list( key: str, ) -> list[str]: value = obj.get(key) - if not isinstance(value, list) or not all(isinstance(v, str) for v in value): + if not isinstance(value, list): raise _pipelock_render_error(section, key, "a list of strings") - return value + value_list = cast(list[object], value) + if not all(isinstance(v, str) for v in value_list): + raise _pipelock_render_error(section, key, "a list of strings") + return cast(list[str], value) def _optional_str_list( @@ -407,49 +411,42 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str: lines: list[str] = [] lines.append(f"version: {cfg['version']}") lines.append(f"mode: {cfg['mode']}") - lines.append(f"enforce: {_bool(cfg['enforce'])}") + lines.append(f"enforce: {_bool(cast(bool, cfg['enforce']))}") lines.append("") lines.append("api_allowlist:") - api_allowlist = cfg["api_allowlist"] - assert isinstance(api_allowlist, list) + api_allowlist = cast(list[str], cfg["api_allowlist"]) for h in api_allowlist: lines.append(f' - "{h}"') lines.append("") if "seed_phrase_detection" in cfg: lines.append("seed_phrase_detection:") - spd = cfg["seed_phrase_detection"] - assert isinstance(spd, dict) - lines.append(f" enabled: {_bool(spd['enabled'])}") + spd = cast(dict[str, object], cfg["seed_phrase_detection"]) + lines.append(f" enabled: {_bool(cast(bool, spd['enabled']))}") lines.append("") lines.append("forward_proxy:") - fp = cfg["forward_proxy"] - assert isinstance(fp, dict) - lines.append(f" enabled: {_bool(fp['enabled'])}") + fp = cast(dict[str, object], cfg["forward_proxy"]) + lines.append(f" enabled: {_bool(cast(bool, fp['enabled']))}") lines.append("") lines.append("dlp:") - dlp = cfg["dlp"] - assert isinstance(dlp, dict) - lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}") - lines.append(f" scan_env: {_bool(dlp['scan_env'])}") + dlp = cast(dict[str, object], cfg["dlp"]) + lines.append(f" include_defaults: {_bool(cast(bool, dlp['include_defaults']))}") + lines.append(f" scan_env: {_bool(cast(bool, dlp['scan_env']))}") lines.append("") lines.append("request_body_scanning:") - rbs = cfg["request_body_scanning"] - assert isinstance(rbs, dict) - lines.append(f' action: "{rbs["action"]}"') + rbs = cast(dict[str, object], cfg["request_body_scanning"]) + lines.append(f' action: "{cast(str, rbs["action"])}"') if "scan_headers" in rbs: - lines.append(f" scan_headers: {_bool(rbs['scan_headers'])}") + lines.append(f" scan_headers: {_bool(cast(bool, rbs['scan_headers']))}") if "header_mode" in rbs: - lines.append(f' header_mode: "{rbs["header_mode"]}"') + lines.append(f' header_mode: "{cast(str, rbs["header_mode"])}"') if "tls_interception" in cfg: lines.append("") lines.append("tls_interception:") - tls = cfg["tls_interception"] - assert isinstance(tls, dict) - lines.append(f" enabled: {_bool(tls['enabled'])}") - lines.append(f' ca_cert: "{tls["ca_cert"]}"') - lines.append(f' ca_key: "{tls["ca_key"]}"') - passthrough = tls["passthrough_domains"] - assert isinstance(passthrough, list) + tls = cast(dict[str, object], cfg["tls_interception"]) + lines.append(f" enabled: {_bool(cast(bool, tls['enabled']))}") + lines.append(f' ca_cert: "{cast(str, tls["ca_cert"])}"') + lines.append(f' ca_key: "{cast(str, tls["ca_key"])}"') + passthrough = cast(list[str], tls["passthrough_domains"]) if passthrough: lines.append(" passthrough_domains:") for d in passthrough: @@ -457,11 +454,9 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str: if "ssrf" in cfg: lines.append("") lines.append("ssrf:") - ssrf = cfg["ssrf"] - assert isinstance(ssrf, dict) + ssrf = cast(dict[str, object], cfg["ssrf"]) lines.append(" ip_allowlist:") - ip_allowlist = ssrf["ip_allowlist"] - assert isinstance(ip_allowlist, list) + ip_allowlist = cast(list[str], ssrf["ip_allowlist"]) for ip in ip_allowlist: lines.append(f' - "{ip}"') return "\n".join(lines) + "\n"