diff --git a/bot_bottle/backend/docker/bottle_state.py b/bot_bottle/backend/docker/bottle_state.py index d258974..f0e8497 100644 --- a/bot_bottle/backend/docker/bottle_state.py +++ b/bot_bottle/backend/docker/bottle_state.py @@ -35,6 +35,7 @@ import secrets import string from dataclasses import dataclass from pathlib import Path +from typing import cast from ... import supervise as _supervise from . import util as docker_mod @@ -135,14 +136,15 @@ def read_metadata(identity: str) -> BottleMetadata | None: raw = json.loads(path.read_text()) if not isinstance(raw, dict): return None + raw_typed = cast(dict[str, object], raw) return BottleMetadata( - identity=str(raw.get("identity", identity)), - agent_name=str(raw.get("agent_name", "")), - cwd=str(raw.get("cwd", "")), - copy_cwd=bool(raw.get("copy_cwd", False)), - started_at=str(raw.get("started_at", "")), - compose_project=str(raw.get("compose_project", "")), - backend=str(raw.get("backend", "")), + identity=str(raw_typed.get("identity", identity)), + agent_name=str(raw_typed.get("agent_name", "")), + cwd=str(raw_typed.get("cwd", "")), + copy_cwd=bool(raw_typed.get("copy_cwd", False)), + started_at=str(raw_typed.get("started_at", "")), + compose_project=str(raw_typed.get("compose_project", "")), + backend=str(raw_typed.get("backend", "")), ) diff --git a/bot_bottle/backend/docker/egress_apply.py b/bot_bottle/backend/docker/egress_apply.py index 1cc73b0..80eb507 100644 --- a/bot_bottle/backend/docker/egress_apply.py +++ b/bot_bottle/backend/docker/egress_apply.py @@ -26,6 +26,7 @@ import json import re import subprocess from pathlib import Path +from typing import cast from ...egress import EGRESS_ROUTES_IN_CONTAINER from ...egress_addon_core import load_routes @@ -57,7 +58,8 @@ def _render_routes_payload(routes_list: list[dict[str, object]]) -> str: if auth_scheme and token_env: lines.append(f' auth_scheme: "{auth_scheme}"') lines.append(f' token_env: "{token_env}"') - paths = entry.get("path_allowlist") or [] + paths_obj = entry.get("path_allowlist") + paths = cast(list[str], paths_obj) if isinstance(paths_obj, list) else [] if paths: lines.append(" path_allowlist:") for p in paths: @@ -257,6 +259,7 @@ def _merge_single_route( raise EgressApplyError( "current routes.yaml: 'routes' is not a list" ) + routes_typed = cast(list[object], routes) new_host = str(new_route.get("host", "")).lower() if not new_host: @@ -264,22 +267,25 @@ def _merge_single_route( "proposed route is missing 'host'" ) - proposed_paths = list(new_route.get("path_allowlist") or []) + proposed_paths_obj = new_route.get("path_allowlist") + proposed_paths = cast(list[str], proposed_paths_obj) if isinstance(proposed_paths_obj, list) else [] # Look for an existing entry with the same host (case-insensitive). - for entry in routes: + for entry in routes_typed: if not isinstance(entry, dict): continue - if str(entry.get("host", "")).lower() == new_host: + entry_typed = cast(dict[str, object], entry) + if str(entry_typed.get("host", "")).lower() == new_host: # Merge path_allowlist: union proposed + existing, ordered # by first-seen so existing paths stay in original order. - existing_paths: list[str] = list(entry.get("path_allowlist") or []) + existing_paths_obj = entry_typed.get("path_allowlist") + existing_paths = cast(list[str], existing_paths_obj) if isinstance(existing_paths_obj, list) else [] seen = {p: None for p in existing_paths} for p in proposed_paths: seen.setdefault(p, None) merged_paths = list(seen.keys()) if merged_paths: - entry["path_allowlist"] = merged_paths + entry_typed["path_allowlist"] = merged_paths # Preserve existing auth — tool description says agent- # proposed auth on an existing host is ignored. break @@ -289,19 +295,22 @@ def _merge_single_route( # `auth` was proposed (otherwise the addon's parser rejects # a half-set auth pair). Slots: count existing slots, pick # the next free index. - entry = {"host": new_route["host"]} + entry_typed: dict[str, object] = {"host": new_route.get("host")} # type: ignore if proposed_paths: - entry["path_allowlist"] = proposed_paths + entry_typed["path_allowlist"] = proposed_paths auth = new_route.get("auth") - if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): + if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): # type: ignore + auth_typed = cast(dict[str, object], auth) existing_slots = sorted({ - str(r.get("token_env")) - for r in routes - if isinstance(r, dict) and r.get("token_env") + str(r_entry.get("token_env", "")) + for r_entry_obj in routes_typed + if isinstance(r_entry_obj, dict) + for r_entry in [cast(dict[str, object], r_entry_obj)] + if r_entry.get("token_env") }) next_idx = len(existing_slots) - entry["auth_scheme"] = str(auth["scheme"]) - entry["token_env"] = f"EGRESS_TOKEN_{next_idx}" + entry_typed["auth_scheme"] = str(cast(object, auth_typed.get("scheme"))) + entry_typed["token_env"] = f"EGRESS_TOKEN_{next_idx}" # NOTE: the addon reads token VALUES from its container's # environ keyed by token_env. A newly-added auth route at # runtime points at a slot that has no env value → the @@ -309,9 +318,9 @@ def _merge_single_route( # arranges for the value to land in the container's env. # Recording this here so the operator-facing diff carries # the slot name they'll need to provision. - routes.append(entry) + routes_typed.append(entry_typed) - return _render_routes_payload(routes) + return _render_routes_payload(cast(list[dict[str, object]], routes_typed)) def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]: