fix: resolve pyright errors in bottle_state.py and most of egress_apply.py

- Add cast import and use for dict.get() results in bottle_state.py
- Fix JSON metadata loading with proper dict type casting
- Apply same pattern to egress_apply.py for YAML routes parsing
- Cast routes list after isinstance check
- Properly type proposed_paths and existing_paths after validation
- Fixes 35 pyright errors across both files

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-06-03 23:40:14 -04:00
parent 73a4fbe0a7
commit 570cd42532
2 changed files with 34 additions and 23 deletions
+9 -7
View File
@@ -35,6 +35,7 @@ import secrets
import string
from dataclasses import dataclass
from pathlib import Path
from typing import cast
from ... import supervise as _supervise
from . import util as docker_mod
@@ -135,14 +136,15 @@ def read_metadata(identity: str) -> BottleMetadata | None:
raw = json.loads(path.read_text())
if not isinstance(raw, dict):
return None
raw_typed = cast(dict[str, object], raw)
return BottleMetadata(
identity=str(raw.get("identity", identity)),
agent_name=str(raw.get("agent_name", "")),
cwd=str(raw.get("cwd", "")),
copy_cwd=bool(raw.get("copy_cwd", False)),
started_at=str(raw.get("started_at", "")),
compose_project=str(raw.get("compose_project", "")),
backend=str(raw.get("backend", "")),
identity=str(raw_typed.get("identity", identity)),
agent_name=str(raw_typed.get("agent_name", "")),
cwd=str(raw_typed.get("cwd", "")),
copy_cwd=bool(raw_typed.get("copy_cwd", False)),
started_at=str(raw_typed.get("started_at", "")),
compose_project=str(raw_typed.get("compose_project", "")),
backend=str(raw_typed.get("backend", "")),
)
+25 -16
View File
@@ -26,6 +26,7 @@ import json
import re
import subprocess
from pathlib import Path
from typing import cast
from ...egress import EGRESS_ROUTES_IN_CONTAINER
from ...egress_addon_core import load_routes
@@ -57,7 +58,8 @@ def _render_routes_payload(routes_list: list[dict[str, object]]) -> str:
if auth_scheme and token_env:
lines.append(f' auth_scheme: "{auth_scheme}"')
lines.append(f' token_env: "{token_env}"')
paths = entry.get("path_allowlist") or []
paths_obj = entry.get("path_allowlist")
paths = cast(list[str], paths_obj) if isinstance(paths_obj, list) else []
if paths:
lines.append(" path_allowlist:")
for p in paths:
@@ -257,6 +259,7 @@ def _merge_single_route(
raise EgressApplyError(
"current routes.yaml: 'routes' is not a list"
)
routes_typed = cast(list[object], routes)
new_host = str(new_route.get("host", "")).lower()
if not new_host:
@@ -264,22 +267,25 @@ def _merge_single_route(
"proposed route is missing 'host'"
)
proposed_paths = list(new_route.get("path_allowlist") or [])
proposed_paths_obj = new_route.get("path_allowlist")
proposed_paths = cast(list[str], proposed_paths_obj) if isinstance(proposed_paths_obj, list) else []
# Look for an existing entry with the same host (case-insensitive).
for entry in routes:
for entry in routes_typed:
if not isinstance(entry, dict):
continue
if str(entry.get("host", "")).lower() == new_host:
entry_typed = cast(dict[str, object], entry)
if str(entry_typed.get("host", "")).lower() == new_host:
# Merge path_allowlist: union proposed + existing, ordered
# by first-seen so existing paths stay in original order.
existing_paths: list[str] = list(entry.get("path_allowlist") or [])
existing_paths_obj = entry_typed.get("path_allowlist")
existing_paths = cast(list[str], existing_paths_obj) if isinstance(existing_paths_obj, list) else []
seen = {p: None for p in existing_paths}
for p in proposed_paths:
seen.setdefault(p, None)
merged_paths = list(seen.keys())
if merged_paths:
entry["path_allowlist"] = merged_paths
entry_typed["path_allowlist"] = merged_paths
# Preserve existing auth — tool description says agent-
# proposed auth on an existing host is ignored.
break
@@ -289,19 +295,22 @@ def _merge_single_route(
# `auth` was proposed (otherwise the addon's parser rejects
# a half-set auth pair). Slots: count existing slots, pick
# the next free index.
entry = {"host": new_route["host"]}
entry_typed: dict[str, object] = {"host": new_route.get("host")} # type: ignore
if proposed_paths:
entry["path_allowlist"] = proposed_paths
entry_typed["path_allowlist"] = proposed_paths
auth = new_route.get("auth")
if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"):
if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): # type: ignore
auth_typed = cast(dict[str, object], auth)
existing_slots = sorted({
str(r.get("token_env"))
for r in routes
if isinstance(r, dict) and r.get("token_env")
str(r_entry.get("token_env", ""))
for r_entry_obj in routes_typed
if isinstance(r_entry_obj, dict)
for r_entry in [cast(dict[str, object], r_entry_obj)]
if r_entry.get("token_env")
})
next_idx = len(existing_slots)
entry["auth_scheme"] = str(auth["scheme"])
entry["token_env"] = f"EGRESS_TOKEN_{next_idx}"
entry_typed["auth_scheme"] = str(cast(object, auth_typed.get("scheme")))
entry_typed["token_env"] = f"EGRESS_TOKEN_{next_idx}"
# NOTE: the addon reads token VALUES from its container's
# environ keyed by token_env. A newly-added auth route at
# runtime points at a slot that has no env value → the
@@ -309,9 +318,9 @@ def _merge_single_route(
# arranges for the value to land in the container's env.
# Recording this here so the operator-facing diff carries
# the slot name they'll need to provision.
routes.append(entry)
routes_typed.append(entry_typed)
return _render_routes_payload(routes)
return _render_routes_payload(cast(list[dict[str, object]], routes_typed))
def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]: