Compare commits

..

5 Commits

Author SHA1 Message Date
didericis-claude dee3600400 test: update PipelockRoutePolicy tests for Config dict design
lint / lint (push) Successful in 1m29s
test / unit (pull_request) Successful in 37s
test / integration (pull_request) Successful in 49s
Replace typed-attribute assertions (TlsPassthrough, SsrfIpAllowlist)
with Config dict lookups, drop the four strict-validation tests that
were intentionally removed in the refactor, and add a
skip_scan_for_extensions test to cover the PR's stated new feature.
2026-06-04 17:22:44 +00:00
didericis d90b04d343 feat: add generic pipelock config merging for future extensibility
lint / lint (push) Failing after 1m26s
test / unit (pull_request) Failing after 36s
test / integration (pull_request) Successful in 44s
- Merge arbitrary pipelock settings from routes into global config
- Allows routes to configure new pipelock options without code changes
- Special-case tls_passthrough and ssrf_ip_allowlist (already aggregated)

Note: Pipelock doesn't currently support per-path/per-host response
scanning rules or response size limits, so response_body_scanning config
is not yet usable. For now, use tls_passthrough for binary download hosts.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-04 13:14:59 -04:00
didericis 8601c686f3 feat: forward pipelock config dict instead of parsing individual fields
lint / lint (push) Failing after 1m32s
test / unit (pull_request) Failing after 37s
test / integration (pull_request) Successful in 42s
- Change PipelockRoutePolicy to store raw pipelock config dict instead
  of individual coerced fields (TlsPassthrough, SsrfIpAllowlist)
- Update pipelock.py and egress.py to extract values from Config dict
- Simplifies manifest validation: pipelock handles its own schema
- Enables new pipelock options like skip_scan_for_extensions without
  updating bot-bottle code

This allows bottles to configure pipelock directly, e.g.:

  pipelock:
    skip_scan_for_extensions: [".whl", ".tar.gz"]

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-04 13:04:12 -04:00
didericis f114c861b4 fix: resolve pylint and pyright linting issues
lint / lint (push) Successful in 1m43s
test / unit (push) Successful in 42s
test / integration (push) Successful in 59s
- Remove .keys() iteration in favor of direct dictionary iteration
- Remove redundant os module reimport in tui.py
- Disable unnecessary-ellipsis rule in pylintrc to avoid conflict with pyright's
  Protocol type requirements

pyright: 0 errors
pylint: 9.93/10

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-04 12:40:36 -04:00
didericis 544a024e22 ci: add update-badges workflow with dispatch trigger
- Runs on push to main when Python files change
- Can be manually triggered via workflow_dispatch
- Executes pylint and pyright to extract quality scores
- Updates README.md badges with current metrics
- Auto-commits changes with [skip ci] to prevent loops

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-04 12:33:11 -04:00
13 changed files with 55 additions and 100 deletions
+1
View File
@@ -8,6 +8,7 @@ on:
- '**.py'
- '.pylintrc'
- 'pyrightconfig.json'
workflow_dispatch:
jobs:
update-badges:
+2 -1
View File
@@ -419,7 +419,8 @@ disable=raw-checker-failed,
too-many-instance-attributes,
duplicate-code,
import-outside-toplevel,
too-few-public-methods
too-few-public-methods,
unnecessary-ellipsis
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
@@ -36,7 +36,6 @@ follow-up tracked separately)."""
from __future__ import annotations
import fcntl
import io
import signal
import struct
import subprocess
@@ -69,11 +68,7 @@ def _read_winsize() -> tuple[int, int] | None:
- tmux respawn-pane: tmux sets all three to the pane's PTY.
- non-TTY (someone piped stdin in tests): none are; the
sync just no-ops, which is the right behavior."""
for default_fd, stream in enumerate((sys.stdin, sys.stdout, sys.stderr)):
try:
fd = stream.fileno()
except (AttributeError, io.UnsupportedOperation, OSError):
fd = default_fd
for fd in (sys.stdin.fileno(), sys.stdout.fileno(), sys.stderr.fileno()):
try:
data = fcntl.ioctl(fd, termios.TIOCGWINSZ, b"\x00" * 8)
except OSError:
+1 -2
View File
@@ -42,8 +42,7 @@ def filter_select(
# Use os.dup() to duplicate the fd so the original file object
# and FileIO in _run_picker each manage independent copies,
# preventing double-close errors.
import os as _os
fd_dup = _os.dup(tty_fd.fileno())
fd_dup = os.dup(tty_fd.fileno())
return _run_picker(items, title=title, tty_fd=fd_dup)
finally:
tty_fd.close()
@@ -15,8 +15,8 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import cast
from bot_bottle.log import die
from bot_bottle.util import expand_tilde
from .log import die
from .util import expand_tilde
def codex_auth_path(host_env: dict[str, str] | None = None) -> Path:
@@ -153,9 +153,7 @@ def _dummy_jwt_from_host(
return _dummy_jwt(now, exp_ts=exp_ts)
if not isinstance(payload, dict):
return _dummy_jwt(now, exp_ts=exp_ts)
return _encode_dummy_jwt(
_redact_jwt_payload(cast(dict[str, object], payload), now=now, exp_ts=exp_ts)
)
return _encode_dummy_jwt(_redact_jwt_payload(cast(dict[str, object], payload), now=now, exp_ts=exp_ts))
def _encode_dummy_jwt(payload: dict[str, object]) -> str:
+1 -1
View File
@@ -23,7 +23,7 @@ from ...agent_provider import (
AgentProvisionFile,
AgentProvisionPlan,
)
from .codex_auth import codex_host_access_token, write_codex_dummy_auth_file
from ...codex_auth import codex_host_access_token, write_codex_dummy_auth_file
from ...egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
from ...log import die, info, warn
+3 -1
View File
@@ -141,13 +141,15 @@ def egress_manifest_routes(
routes are merged."""
out: list[EgressRoute] = []
for r in bottle.egress.routes:
tls_pt = r.Pipelock.Config.get("tls_passthrough", False)
tls_passthrough = tls_pt if isinstance(tls_pt, bool) else False
out.append(EgressRoute(
host=r.Host,
path_allowlist=r.PathAllowlist,
auth_scheme=r.AuthScheme,
token_ref=r.TokenRef,
roles=r.Role,
tls_passthrough=r.Pipelock.TlsPassthrough,
tls_passthrough=tls_passthrough,
))
return tuple(out)
+1 -1
View File
@@ -161,7 +161,7 @@ class Agent:
git_raw = d.get("git-gate")
if git_raw is not None:
gd = as_json_object(git_raw, f"agent '{name}' git-gate")
for k in gd.keys():
for k in gd:
if k != "user":
raise ManifestError(
f"agent '{name}' git-gate.{k} is not allowed at the "
+10 -47
View File
@@ -2,7 +2,6 @@
from __future__ import annotations
import ipaddress
from dataclasses import dataclass, field
from typing import cast
@@ -43,17 +42,18 @@ def validate_egress_routes(
class PipelockRoutePolicy:
"""Per-route pipelock policy overrides.
`TlsPassthrough` adds the route host to pipelock's
`tls_interception.passthrough_domains`, so pipelock still enforces
the hostname allowlist but does not MITM/decrypt request bodies or
headers for that host.
Stores raw pipelock configuration that's passed through to the
pipelock sidecar. Pipelock validates all config options, so
bot-bottle forwards manifest settings without coercion or strict
validation. Supported options include:
`SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF
allowlist for private/internal destinations behind this route.
- `tls_passthrough`: bool — skip TLS MITM for this host
- `ssrf_ip_allowlist`: list of CIDR/IP — allow private destinations
- `skip_scan_for_extensions`: list of file extensions to skip DLP
scanning for (e.g., [".whl", ".tar.gz"])
"""
TlsPassthrough: bool = False
SsrfIpAllowlist: tuple[str, ...] = ()
Config: dict[str, object] = field(default_factory=dict)
@classmethod
def from_dict(
@@ -61,44 +61,7 @@ class PipelockRoutePolicy:
) -> "PipelockRoutePolicy":
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
d = as_json_object(raw, label)
for k in d:
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
raise ManifestError(
f"{label} has unknown key {k!r}; "
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
f"are accepted"
)
tls_passthrough_raw = d.get("tls_passthrough", False)
if not isinstance(tls_passthrough_raw, bool):
raise ManifestError(
f"{label}.tls_passthrough must be a boolean "
f"(was {type(tls_passthrough_raw).__name__})"
)
ssrf_raw = d.get("ssrf_ip_allowlist", [])
if not isinstance(ssrf_raw, list):
raise ManifestError(
f"{label}.ssrf_ip_allowlist must be an array "
f"(was {type(ssrf_raw).__name__})"
)
ssrf_ip_allowlist: list[str] = []
for j, item in enumerate(ssrf_raw):
if not isinstance(item, str) or not item:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
f"string (was {type(item).__name__})"
)
try:
ipaddress.ip_network(item, strict=False)
except ValueError as e:
raise ManifestError(
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
f"or CIDR (was {item!r}): {e}"
) from e
ssrf_ip_allowlist.append(item)
return cls(
TlsPassthrough=tls_passthrough_raw,
SsrfIpAllowlist=tuple(ssrf_ip_allowlist),
)
return cls(Config=d)
@dataclass(frozen=True)
+2 -2
View File
@@ -246,7 +246,7 @@ class GitUser:
@classmethod
def from_dict(cls, bottle_name: str, raw: object) -> "GitUser":
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate.user")
for k in d.keys():
for k in d:
if k not in {"name", "email"}:
raise ManifestError(
f"bottle '{bottle_name}' git-gate.user has unknown key {k!r}; "
@@ -281,7 +281,7 @@ def parse_git_gate_config(
raw: object,
) -> tuple[tuple[GitEntry, ...], GitUser]:
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate")
for k in d.keys():
for k in d:
if k not in {"user", "repos"}:
raise ManifestError(
f"bottle '{bottle_name}' git-gate has unknown key {k!r}; "
+14 -2
View File
@@ -132,8 +132,11 @@ def pipelock_effective_ssrf_ip_allowlist(
"""
seen: dict[str, None] = {ip: None for ip in extra}
for route in bottle.egress.routes:
for ip in route.Pipelock.SsrfIpAllowlist:
seen.setdefault(ip, None)
ssrf_raw = route.Pipelock.Config.get("ssrf_ip_allowlist", [])
if isinstance(ssrf_raw, list):
for ip in ssrf_raw:
if isinstance(ip, str):
seen.setdefault(ip, None)
return sorted(seen.keys())
@@ -220,6 +223,15 @@ def pipelock_build_config(
)
if effective_ssrf_ip_allowlist:
cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist}
# Merge per-route pipelock config (e.g., response_body_scanning settings).
# Routes can specify arbitrary pipelock options that apply globally.
for route in bottle.egress.routes:
for key, value in route.Pipelock.Config.items():
if key not in ("tls_passthrough", "ssrf_ip_allowlist"):
if key not in cfg:
cfg[key] = value
return cfg
+1 -1
View File
@@ -9,7 +9,7 @@ import unittest
from datetime import datetime, timezone
from pathlib import Path
from bot_bottle.contrib.codex.codex_auth import (
from bot_bottle.codex_auth import (
codex_auth_path,
codex_dummy_auth_json,
codex_host_access_token,
+15 -31
View File
@@ -225,7 +225,7 @@ class TestPipelockPolicy(unittest.TestCase):
"host": "api.openai.com",
"pipelock": {"tls_passthrough": True},
}])
self.assertTrue(b.egress.routes[0].Pipelock.TlsPassthrough)
self.assertTrue(b.egress.routes[0].Pipelock.Config["tls_passthrough"])
def test_ssrf_ip_allowlist_route_policy(self):
b = _bottle([{
@@ -233,44 +233,28 @@ class TestPipelockPolicy(unittest.TestCase):
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]},
}])
self.assertEqual(
("100.78.141.42/32",),
b.egress.routes[0].Pipelock.SsrfIpAllowlist,
["100.78.141.42/32"],
b.egress.routes[0].Pipelock.Config["ssrf_ip_allowlist"],
)
def test_tls_passthrough_defaults_false(self):
def test_skip_scan_for_extensions_route_policy(self):
b = _bottle([{
"host": "files.pythonhosted.org",
"pipelock": {"skip_scan_for_extensions": [".whl", ".tar.gz"]},
}])
self.assertEqual(
[".whl", ".tar.gz"],
b.egress.routes[0].Pipelock.Config["skip_scan_for_extensions"],
)
def test_empty_config_when_pipelock_omitted(self):
b = _bottle([{"host": "api.openai.com"}])
self.assertFalse(b.egress.routes[0].Pipelock.TlsPassthrough)
self.assertEqual((), b.egress.routes[0].Pipelock.SsrfIpAllowlist)
self.assertEqual({}, b.egress.routes[0].Pipelock.Config)
def test_pipelock_policy_must_be_object(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": True}])
def test_tls_passthrough_must_be_bool(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"tls_passthrough": "yes"},
}])
def test_ssrf_ip_allowlist_must_be_array(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": "100.78.141.42/32"},
}])
def test_ssrf_ip_allowlist_items_must_be_cidr_or_ip(self):
with self.assertRaises(ManifestError):
_bottle([{
"host": "x.example",
"pipelock": {"ssrf_ip_allowlist": ["not-an-ip"]},
}])
def test_unknown_pipelock_key_rejected(self):
with self.assertRaises(ManifestError):
_bottle([{"host": "x.example", "pipelock": {"wat": True}}])
class TestRouteValidation(unittest.TestCase):
def test_duplicate_hosts_rejected(self):