chore: reduce lint and type-check noise #187

Merged
didericis merged 32 commits from feat/linting-and-type-fixes into main 2026-06-04 12:24:39 -04:00
11 changed files with 31 additions and 29 deletions
Showing only changes of commit a430bac1bf - Show all commits
+3 -1
View File
@@ -5,6 +5,8 @@ from __future__ import annotations
import subprocess
from typing import Callable
from typing import cast
from ...agent_provider import PromptMode, prompt_args
from .. import Bottle, ExecResult
@@ -36,7 +38,7 @@ class DockerBottle(Bottle):
) -> list[str]:
full_argv = list(argv)
full_argv.extend(
prompt_args(self._agent_prompt_mode, self.prompt_path, argv=full_argv)
prompt_args(cast(PromptMode, self._agent_prompt_mode), self.prompt_path, argv=full_argv)
)
cmd = ["docker", "exec"]
if tty:
+3 -3
View File
@@ -19,7 +19,7 @@ from __future__ import annotations
import subprocess
import sys
from typing import Mapping
from typing import Mapping, cast
from ...agent_provider import PromptMode, prompt_args
from .. import Bottle, ExecResult
@@ -93,9 +93,9 @@ class SmolmachinesBottle(Bottle):
agent_tail = ["env", *_env_assignments_for("node", self._guest_env),
self.agent_command]
provider_prompt_args = prompt_args(
self._agent_prompt_mode, self.prompt_path, argv=argv,
cast(PromptMode, self._agent_prompt_mode), self.prompt_path, argv=argv,
)
if self._agent_prompt_mode == "read_prompt_file":
if cast(PromptMode, self._agent_prompt_mode) == "read_prompt_file":
agent_tail += argv
agent_tail += provider_prompt_args
else:
@@ -42,7 +42,7 @@ import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Generator, Iterator
from typing import Generator
from ...log import die
@@ -42,6 +42,7 @@ import subprocess
import sys
import termios
import threading
from types import FrameType
# How long to wait after the main exec starts before pushing the
@@ -123,7 +124,7 @@ def main(argv: list[str]) -> int:
machine = argv[0]
inner = argv[2:]
def sync(*_args: int) -> None:
def sync(_signum: int, _frame: FrameType | None) -> None:
size = _read_winsize()
if size is None:
return
+2 -2
View File
@@ -52,7 +52,7 @@ class SmolvmError(RuntimeError):
pack failed, etc.). Carries the captured stderr for the
operator-facing log line."""
def __init__(self, argv: Sequence[str], result: subprocess.CompletedProcess):
def __init__(self, argv: Sequence[str], result: subprocess.CompletedProcess[str]):
self.argv = list(argv)
self.returncode = result.returncode
self.stdout = result.stdout
@@ -65,7 +65,7 @@ class SmolvmError(RuntimeError):
def _smolvm(*args: str, env: Mapping[str, str] | None = None,
check: bool = True) -> subprocess.CompletedProcess:
check: bool = True) -> subprocess.CompletedProcess[str]:
"""One subprocess call into the smolvm CLI. `check=True`
raises SmolvmError on non-zero; `check=False` returns the
CompletedProcess for the caller to inspect."""
+5 -5
View File
@@ -354,7 +354,7 @@ def _try_init_green() -> int:
return 0
def _main_loop(stdscr: "curses._CursesWindow") -> None:
def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
curses.curs_set(0)
stdscr.timeout(_REFRESH_INTERVAL_MS)
green_attr = _try_init_green()
@@ -434,7 +434,7 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
def _render(
stdscr: "curses._CursesWindow",
stdscr: "curses._CursesWindow", # type: ignore
pending: list[QueuedProposal],
selected: int,
status_line: str,
@@ -488,7 +488,7 @@ def _render(
def _detail_view(
stdscr: "curses._CursesWindow",
stdscr: "curses._CursesWindow", # type: ignore
qp: QueuedProposal,
*,
green_attr: int = 0,
@@ -539,7 +539,7 @@ def _detail_view(
return
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore
"""Suspend curses, open $EDITOR on the proposed file, return edited content."""
suffix = _suffix_for_tool(qp.proposal.tool)
curses.endwin()
@@ -550,7 +550,7 @@ def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
return edited
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore
"""One-line input at the bottom of the screen."""
curses.curs_set(1)
h, _ = stdscr.getmaxyx()
+7 -8
View File
@@ -13,7 +13,7 @@ from __future__ import annotations
import curses
import os
import sys
from typing import Optional
from typing import Any, Optional
def filter_select(
@@ -39,7 +39,7 @@ def filter_select(
return None
try:
result = _run_picker(items, title=title, tty_fd=tty_fd)
result = _run_picker(items, title=title, tty_fd=tty_fd.fileno())
finally:
tty_fd.close()
@@ -59,11 +59,10 @@ _KEY_ENTER_ALT = 10
_CANCEL_KEYS = frozenset([_KEY_ESC, _KEY_CTRL_C, _KEY_CTRL_D, ord("q")])
def _run_picker(items: list[str], *, title: str, tty_fd) -> Optional[str]:
def _run_picker(items: list[str], *, title: str, tty_fd: int) -> Optional[str]:
"""Drive a curses session on *tty_fd* and return the picked item."""
# newterm lets us run curses on an arbitrary fd rather than the
# process's controlling tty / stdout — crucial when stdout is piped.
old_term = os.environ.get("TERM", "xterm-256color")
os.environ.setdefault("TERM", "xterm-256color")
# Save / restore the real stdin/stdout so curses newterm can use tty_fd.
@@ -72,7 +71,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd) -> Optional[str]:
try:
import io
tty_text = io.TextIOWrapper(tty_fd, write_through=True)
tty_text = io.TextIOWrapper(io.FileIO(tty_fd, mode='r+'), write_through=True)
sys.__stdin__ = tty_text # type: ignore[assignment]
sys.__stdout__ = tty_text # type: ignore[assignment]
@@ -99,7 +98,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd) -> Optional[str]:
return result
def _picker_loop(screen, items: list[str], *, title: str) -> Optional[str]:
def _picker_loop(screen: Any, items: list[str], *, title: str) -> Optional[str]:
query = ""
cursor = 0
@@ -158,7 +157,7 @@ def _filter_items(items: list[str], query: str) -> list[str]:
return [i for i in items if q in i.lower()]
def _render(screen, filtered: list[str], cursor: int, *, query: str, title: str) -> None:
def _render(screen: Any, filtered: list[str], cursor: int, *, query: str, title: str) -> None:
screen.erase()
rows, cols = screen.getmaxyx()
min_rows = 5
@@ -212,7 +211,7 @@ def _render(screen, filtered: list[str], cursor: int, *, query: str, title: str)
screen.refresh()
def _addstr_safe(screen, row: int, col: int, text: str, attr: int = curses.A_NORMAL) -> None:
def _addstr_safe(screen: Any, row: int, col: int, text: str, attr: int = curses.A_NORMAL) -> None:
try:
screen.addstr(row, col, text, attr)
except curses.error:
+1 -1
View File
@@ -144,7 +144,7 @@ class ClaudeAgentProvider(AgentProvider):
prompt (drives `--append-system-prompt-file`); the file is
copied either way so the path always exists."""
prompt_path = _prompt_path(plan.guest_home)
bottle.cp_in(str(plan.prompt_file), prompt_path)
bottle.cp_in(str(plan.prompt_file), prompt_path) # type: ignore
bottle.exec(
f"chown node:node {prompt_path} && chmod 600 {prompt_path}",
user="root",
+1 -1
View File
@@ -189,7 +189,7 @@ class CodexAgentProvider(AgentProvider):
instructions in <path>.` bootstrap (see `prompt_args`); the
file is copied either way so the path always exists."""
prompt_path = _prompt_path(plan.guest_home)
bottle.cp_in(str(plan.prompt_file), prompt_path)
bottle.cp_in(str(plan.prompt_file), prompt_path) # type: ignore
bottle.exec(
f"chown node:node {prompt_path} && chmod 600 {prompt_path}",
user="root",
+4 -4
View File
@@ -78,8 +78,8 @@ class GitHttpHandler(BaseHTTPRequestHandler):
"REMOTE_ADDR": self.client_address[0],
"REMOTE_PORT": str(self.client_address[1]),
"REMOTE_USER": "",
"SERVER_NAME": self.server.server_name,
"SERVER_PORT": str(self.server.server_port),
"SERVER_NAME": self.server.server_name, # type: ignore
"SERVER_PORT": str(self.server.server_port), # type: ignore
"SERVER_PROTOCOL": self.request_version,
})
for header, variable in (
@@ -157,8 +157,8 @@ class GitHttpHandler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(body)
def log_message(self, fmt: str, *args: object) -> None:
sys.stdout.write(fmt % args + "\n")
def log_message(self, format: str, *args: object) -> None: # type: ignore
sys.stdout.write(format % args + "\n")
sys.stdout.flush()
+2 -2
View File
@@ -138,7 +138,7 @@ def _pump(name: str, stream: IO[bytes]) -> None:
sys.stdout.flush()
def _spawn(spec: _DaemonSpec) -> subprocess.Popen:
def _spawn(spec: _DaemonSpec) -> subprocess.Popen[bytes]:
proc = subprocess.Popen(
list(spec.argv),
stdout=subprocess.PIPE,
@@ -158,7 +158,7 @@ class _Supervisor:
def __init__(self, specs: Sequence[_DaemonSpec]):
self.specs = tuple(specs)
self.procs: list[tuple[_DaemonSpec, subprocess.Popen]] = []
self.procs: list[tuple[_DaemonSpec, subprocess.Popen[bytes]]] = []
self.shutdown_at: float | None = None
# Names of children that have been logged as having exited
# so we only log each death once across watch-loop ticks.