7f43f64c24
The issue: Both the original file object (tty_fd) and the FileIO object created in _run_picker() were managing the same file descriptor. When both tried to close it (or during garbage collection), we got 'Bad file descriptor' errors. The solution: Use os.dup() to create an independent copy of the fd that FileIO can own exclusively. The original file object closes its copy, and FileIO closes its independent copy, preventing conflicts. This properly separates fd ownership between the two objects. Fixes the 'Exception ignored while finalizing file' errors on agent startup. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
222 lines
6.5 KiB
Python
222 lines
6.5 KiB
Python
"""tui.py — minimal curses filter-select picker for CLI prompts.
|
|
|
|
Exposed surface:
|
|
|
|
filter_select(items, *, title="", tty_path="/dev/tty") -> str | None
|
|
|
|
Opens /dev/tty directly so the picker works even when stdout/stdin are
|
|
redirected. Returns the selected item or None on cancel.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import curses
|
|
import os
|
|
import sys
|
|
from typing import Any, Optional
|
|
|
|
|
|
def filter_select(
    items: list[str],
    *,
    title: str = "",
    tty_path: str = "/dev/tty",
) -> Optional[str]:
    """Render a filter-select picker over *items*.

    The picker opens *tty_path* directly so it works even when
    stdout/stdin are redirected.

    Args:
        items: Candidate strings to choose from.
        title: Optional heading drawn above the filter line.
        tty_path: Terminal device to open for the picker UI.

    Returns:
        The selected item string, or ``None`` when *items* is empty,
        *tty_path* cannot be opened, the user cancelled
        (Esc / ``q`` / Ctrl-C / Ctrl-D), or the terminal is too small.
    """
    if not items:
        return None

    try:
        tty_file = open(tty_path, "r+b", buffering=0)
    except OSError:
        # No usable terminal (e.g. no controlling tty): treat as a cancel.
        return None

    with tty_file:
        # Hand _run_picker a duplicate descriptor so that the file object
        # here and the FileIO created inside _run_picker each close their
        # own copy, preventing double-close errors.
        fd_dup = os.dup(tty_file.fileno())
        return _run_picker(items, title=title, tty_fd=fd_dup)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal implementation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Raw key codes compared against the value returned by ``screen.getch()``.
_KEY_ESC = 27            # ASCII ESC
_KEY_CTRL_C = 3          # ASCII ETX (Ctrl-C)
_KEY_CTRL_D = 4          # ASCII EOT (Ctrl-D)
_KEY_BACKSPACE_WIN = 8   # ASCII BS (Ctrl-H)
_KEY_ENTER_ALT = 10      # ASCII LF ("\n")

# Any of these keys aborts the picker without a selection.
_CANCEL_KEYS = frozenset((_KEY_ESC, _KEY_CTRL_C, _KEY_CTRL_D, ord("q")))
|
|
|
|
|
|
def _run_picker(items: list[str], *, title: str, tty_fd: int) -> Optional[str]:
    """Drive a curses session on *tty_fd* and return the picked item.

    Takes ownership of *tty_fd*: it is wrapped in a file object that is
    closed before this function returns, so the caller must not close or
    reuse the descriptor afterwards.

    Args:
        items: Candidate strings shown in the picker.
        title: Optional heading passed through to the picker loop.
        tty_fd: Open read/write descriptor for the terminal.

    Returns:
        The picked item, or ``None`` if the user cancelled or any
        ``Exception`` was raised while setting up or running curses.
    """
    import io

    # curses needs a terminal type; supply a default only if TERM is unset.
    os.environ.setdefault("TERM", "xterm-256color")

    # sys.__stdin__ / sys.__stdout__ are pointed at the tty for the duration
    # of the session and restored afterwards.
    # NOTE(review): this relies on curses.initscr() consulting these objects
    # when configuring the terminal; the underlying C initscr() may still use
    # fds 0/1 on some builds — confirm on the target platforms.
    orig_stdin = sys.__stdin__
    orig_stdout = sys.__stdout__
    tty_text = None

    try:
        tty_text = io.TextIOWrapper(io.FileIO(tty_fd, mode="r+"), write_through=True)
        sys.__stdin__ = tty_text  # type: ignore[assignment]
        sys.__stdout__ = tty_text  # type: ignore[assignment]

        screen = curses.initscr()
        try:
            # Mode changes live inside the try so that a failure in any of
            # them still reaches endwin() and restores the terminal.
            curses.noecho()
            curses.cbreak()
            screen.keypad(True)
            return _picker_loop(screen, items, title=title)
        finally:
            try:
                screen.keypad(False)
                curses.nocbreak()
                curses.echo()
            finally:
                # Always leave curses mode, even if a mode reset failed.
                curses.endwin()
    except Exception:  # noqa: W0718 — curses can raise many error types
        return None
    finally:
        sys.__stdin__ = orig_stdin  # type: ignore[assignment]
        sys.__stdout__ = orig_stdout  # type: ignore[assignment]
        if tty_text is not None:
            # Release the descriptor deterministically instead of leaving it
            # to the garbage collector / interpreter finalization.
            try:
                tty_text.close()
            except (OSError, ValueError):
                pass
|
|
|
|
|
|
def _picker_loop(screen: Any, items: list[str], *, title: str) -> Optional[str]:
    """Run the interactive render / read-key loop until select or cancel.

    Each iteration filters *items* by the current query, clamps the cursor
    into the filtered list, redraws the screen and handles one key press.

    Key handling order matters: cancel keys and the ``j`` / ``k`` movement
    keys are checked before the printable-ASCII branch, so ``q``, ``j`` and
    ``k`` can never be typed into the filter query.

    Args:
        screen: Curses window used for drawing and reading keys.
        items: Full list of candidates.
        title: Optional heading forwarded to ``_render``.

    Returns:
        The highlighted item when Enter is pressed (``None`` if nothing
        matches the filter), or ``None`` on cancel, ``KeyboardInterrupt``
        while waiting for a key, or a ``curses.error`` during rendering.
    """
    confirm_keys = (curses.KEY_ENTER, _KEY_ENTER_ALT, ord("\r"))
    up_keys = (curses.KEY_UP, ord("k"))
    down_keys = (curses.KEY_DOWN, ord("j"))
    erase_keys = (curses.KEY_BACKSPACE, _KEY_BACKSPACE_WIN, 127)

    query = ""
    cursor = 0

    while True:
        filtered = _filter_items(items, query)

        # Clamp cursor into the visible list (0 when nothing matches).
        # This runs after every query change, so branches below do not
        # need to re-filter just to fix up the cursor.
        cursor = max(0, min(cursor, len(filtered) - 1))

        try:
            _render(screen, filtered, cursor, query=query, title=title)
        except curses.error:
            # Terminal too small or write error — bail out.
            return None

        try:
            key = screen.getch()
        except KeyboardInterrupt:
            return None

        if key in _CANCEL_KEYS:
            return None

        if key in confirm_keys:
            return filtered[cursor] if filtered else None

        if key in up_keys:
            if cursor > 0:
                cursor -= 1
        elif key in down_keys:
            if cursor < len(filtered) - 1:
                cursor += 1
        elif key in erase_keys:
            # Dropping a character can only widen the match list, so the
            # cursor stays valid; the clamp above covers the rest.
            query = query[:-1]
        elif 32 <= key <= 126:
            # Printable ASCII — append to query and reset cursor so the
            # top of the newly-filtered list is selected.
            query += chr(key)
            cursor = 0
        # Any other key (e.g. a resize event) just triggers a redraw.
|
|
|
|
|
|
def _filter_items(items: list[str], query: str) -> list[str]:
    """Return the entries of *items* that contain *query*, ignoring case.

    An empty query matches everything. The result is always a new list in
    the same order as *items*; the input list is never returned directly.
    """
    if not query:
        return list(items)

    needle = query.lower()
    matches: list[str] = []
    for candidate in items:
        if needle in candidate.lower():
            matches.append(candidate)
    return matches
|
|
|
|
|
|
def _render(screen: Any, filtered: list[str], cursor: int, *, query: str, title: str) -> None:
    """Redraw the whole picker: header, visible slice of the list, footer.

    Layout, top to bottom: optional title, ``Filter: <query>`` line,
    separator, the list rows, separator, help line.

    Args:
        screen: Curses window to draw on.
        filtered: Items matching the current query.
        cursor: Index into *filtered* of the highlighted item.
        query: Current filter text.
        title: Optional heading; when empty no title row is drawn.

    Raises:
        curses.error: If the window has too few rows to show the header,
            at least one list row and the footer.
    """
    screen.erase()
    rows, cols = screen.getmaxyx()

    # Header is (title) + filter line + separator; footer is separator +
    # help line. The title row is counted here so that a short window is
    # reported as too small instead of silently drawing no list at all.
    header_rows = 3 if title else 2
    footer_rows = 2
    list_rows = rows - header_rows - footer_rows
    if list_rows < 1:
        raise curses.error("terminal too small")

    # Leave the last column unused; never let the width go negative, which
    # would turn the slices below into "drop the last character".
    width = max(cols - 1, 0)
    sep = "─" * min(width, 40)

    row = 0
    if title:
        _addstr_safe(screen, row, 0, title[:width], curses.A_BOLD)
        row += 1

    _addstr_safe(screen, row, 0, f"Filter: {query}"[:width])
    row += 1

    _addstr_safe(screen, row, 0, sep)
    row += 1

    # Scroll window: keep cursor visible.
    scroll = max(0, cursor - list_rows + 1)
    visible = filtered[scroll:scroll + list_rows]

    for abs_idx, item in enumerate(visible, start=scroll):
        selected = abs_idx == cursor
        attr = curses.A_REVERSE if selected else curses.A_NORMAL
        # Both prefixes are two columns wide so item text does not shift
        # sideways when the highlight moves.
        prefix = "> " if selected else "  "
        _addstr_safe(screen, row, 0, (prefix + item)[:width], attr)
        row += 1

    _addstr_safe(screen, row, 0, sep)
    row += 1

    help_line = "[↑↓/jk] move [Enter] select [Esc/q] cancel"
    _addstr_safe(screen, row, 0, help_line[:width])

    screen.refresh()
|
|
|
|
|
|
def _addstr_safe(screen: Any, row: int, col: int, text: str, attr: int = curses.A_NORMAL) -> None:
    """Write *text* at (*row*, *col*) on *screen*, ignoring ``curses.error``.

    Drawing is best-effort: a failed write is dropped silently so that the
    caller can keep rendering the remaining rows.
    """
    try:
        screen.addstr(row, col, text, attr)
    except curses.error:
        # Deliberately swallowed; nothing useful can be done about it here.
        return
|