Files
bot-bottle/bot_bottle/cli/tui.py
T
didericis 82b8dffc54
lint / lint (push) Successful in 1m26s
test / unit (pull_request) Successful in 33s
test / integration (pull_request) Successful in 42s
fix: remove tty_fd.close() to prevent 'Bad file descriptor' error
The issue: filter_select() opens a file object and passes its file
descriptor to _run_picker(). Inside _run_picker(), a FileIO object is
created from that same fd number. When filter_select() then calls
tty_fd.close(), it closes the underlying fd. But FileIO still has a
reference to that fd number, causing 'Bad file descriptor' errors.

Solution: Don't explicitly close tty_fd. Let it be garbage collected,
which naturally closes the fd. This works because FileIO will also
attempt to close it, but by that time both objects reference the same
closed fd through the file object's lifecycle.

The fd is properly closed by the time the function returns.

Fixes agent startup failure.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-04 12:11:29 -04:00

219 lines
6.5 KiB
Python

"""tui.py — minimal curses filter-select picker for CLI prompts.
Exposed surface:
filter_select(items, *, title="", tty_path="/dev/tty") -> str | None
Opens /dev/tty directly so the picker works even when stdout/stdin are
redirected. Returns the selected item or None on cancel.
"""
from __future__ import annotations
import curses
import os
import sys
from typing import Any, Optional
def filter_select(
items: list[str],
*,
title: str = "",
tty_path: str = "/dev/tty",
) -> Optional[str]:
"""Render a filter-select picker over *items*.
Returns the selected item string, or ``None`` if the user cancelled
(Esc / ``q`` / Ctrl-C / Ctrl-D) or if the terminal is too small.
The picker opens *tty_path* directly so it works even when
stdout/stdin are redirected.
"""
if not items:
return None
try:
tty_fd = open(tty_path, "r+b", buffering=0)
except OSError:
return None
# Note: Don't close tty_fd here. FileIO in _run_picker wraps the same
# file descriptor and manages its lifecycle. Closing tty_fd would close
# the underlying fd, causing "Bad file descriptor" errors when FileIO
# tries to use it. Let the file object be closed by garbage collection.
result = _run_picker(items, title=title, tty_fd=tty_fd.fileno())
return result
# ---------------------------------------------------------------------------
# Internal implementation
# ---------------------------------------------------------------------------
_KEY_ESC = 27
_KEY_CTRL_C = 3
_KEY_CTRL_D = 4
_KEY_BACKSPACE_WIN = 8
_KEY_ENTER_ALT = 10
_CANCEL_KEYS = frozenset([_KEY_ESC, _KEY_CTRL_C, _KEY_CTRL_D, ord("q")])
def _run_picker(items: list[str], *, title: str, tty_fd: int) -> Optional[str]:
"""Drive a curses session on *tty_fd* and return the picked item."""
# newterm lets us run curses on an arbitrary fd rather than the
# process's controlling tty / stdout — crucial when stdout is piped.
os.environ.setdefault("TERM", "xterm-256color")
# Save / restore the real stdin/stdout so curses newterm can use tty_fd.
orig_stdin = sys.__stdin__
orig_stdout = sys.__stdout__
try:
import io
tty_text = io.TextIOWrapper(io.FileIO(tty_fd, mode='r+'), write_through=True)
sys.__stdin__ = tty_text # type: ignore[assignment]
sys.__stdout__ = tty_text # type: ignore[assignment]
# curses.wrapper calls initscr which honours sys.__stdin__ / __stdout__
# on some builds; use newterm where available.
screen = curses.initscr()
curses.noecho()
curses.cbreak()
screen.keypad(True)
try:
result = _picker_loop(screen, items, title=title)
finally:
screen.keypad(False)
curses.nocbreak()
curses.echo()
curses.endwin()
except Exception: # noqa: W0718 — curses can raise many error types
return None
finally:
sys.__stdin__ = orig_stdin # type: ignore[assignment]
sys.__stdout__ = orig_stdout # type: ignore[assignment]
return result
def _picker_loop(screen: Any, items: list[str], *, title: str) -> Optional[str]:
query = ""
cursor = 0
while True:
filtered = _filter_items(items, query)
# Clamp cursor into the visible list.
if not filtered:
cursor = 0
elif cursor >= len(filtered):
cursor = len(filtered) - 1
try:
_render(screen, filtered, cursor, query=query, title=title)
except curses.error:
# Terminal too small or write error — bail out.
return None
try:
key = screen.getch()
except KeyboardInterrupt:
return None
if key in _CANCEL_KEYS:
return None
if key in (curses.KEY_ENTER, _KEY_ENTER_ALT, ord("\r")):
return filtered[cursor] if filtered else None
if key in (curses.KEY_UP, ord("k")):
if cursor > 0:
cursor -= 1
elif key in (curses.KEY_DOWN, ord("j")):
if cursor < len(filtered) - 1:
cursor += 1
elif key in (curses.KEY_BACKSPACE, _KEY_BACKSPACE_WIN, 127):
query = query[:-1]
# After narrowing the filter, keep cursor in range.
new_filtered = _filter_items(items, query)
if cursor >= len(new_filtered):
cursor = max(0, len(new_filtered) - 1)
elif 32 <= key <= 126:
# Printable ASCII — append to query and reset cursor so the
# top of the newly-filtered list is selected.
query += chr(key)
cursor = 0
def _filter_items(items: list[str], query: str) -> list[str]:
if not query:
return list(items)
q = query.lower()
return [i for i in items if q in i.lower()]
def _render(screen: Any, filtered: list[str], cursor: int, *, query: str, title: str) -> None:
screen.erase()
rows, cols = screen.getmaxyx()
min_rows = 5
if rows < min_rows:
raise curses.error("terminal too small")
row = 0
if title and row < rows - 1:
_addstr_safe(screen, row, 0, title[:cols - 1], curses.A_BOLD)
row += 1
filter_label = f"Filter: {query}"
if row < rows - 1:
_addstr_safe(screen, row, 0, filter_label[:cols - 1])
row += 1
sep = "" * min(cols - 1, 40)
if row < rows - 1:
_addstr_safe(screen, row, 0, sep)
row += 1
list_start = row
# Reserve two rows for separator + help line at bottom.
list_rows = rows - list_start - 2
if list_rows < 1:
return
# Scroll window: keep cursor visible.
scroll = max(0, cursor - list_rows + 1)
visible = filtered[scroll: scroll + list_rows]
for idx, item in enumerate(visible):
abs_idx = scroll + idx
attr = curses.A_REVERSE if abs_idx == cursor else curses.A_NORMAL
prefix = "> " if abs_idx == cursor else " "
line = (prefix + item)[:cols - 1]
if row < rows - 1:
_addstr_safe(screen, row, 0, line, attr)
row += 1
if row < rows - 1:
_addstr_safe(screen, row, 0, sep)
row += 1
help_line = "[↑↓/jk] move [Enter] select [Esc/q] cancel"
if row < rows:
_addstr_safe(screen, min(rows - 1, row), 0, help_line[:cols - 1])
screen.refresh()
def _addstr_safe(screen: Any, row: int, col: int, text: str, attr: int = curses.A_NORMAL) -> None:
try:
screen.addstr(row, col, text, attr)
except curses.error:
pass