Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 295d65e4ef | |||
| 0f5d484151 |
@@ -8,7 +8,6 @@ on:
|
|||||||
- '**.py'
|
- '**.py'
|
||||||
- '.pylintrc'
|
- '.pylintrc'
|
||||||
- 'pyrightconfig.json'
|
- 'pyrightconfig.json'
|
||||||
workflow_dispatch:
|
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
update-badges:
|
update-badges:
|
||||||
|
|||||||
@@ -419,8 +419,7 @@ disable=raw-checker-failed,
|
|||||||
too-many-instance-attributes,
|
too-many-instance-attributes,
|
||||||
duplicate-code,
|
duplicate-code,
|
||||||
import-outside-toplevel,
|
import-outside-toplevel,
|
||||||
too-few-public-methods,
|
too-few-public-methods
|
||||||
unnecessary-ellipsis
|
|
||||||
|
|
||||||
# Enable the message, report, category or checker with the given id(s). You can
|
# Enable the message, report, category or checker with the given id(s). You can
|
||||||
# either give multiple identifier separated by comma (,) or put this option
|
# either give multiple identifier separated by comma (,) or put this option
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ follow-up tracked separately)."""
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import fcntl
|
import fcntl
|
||||||
|
import io
|
||||||
import signal
|
import signal
|
||||||
import struct
|
import struct
|
||||||
import subprocess
|
import subprocess
|
||||||
@@ -68,7 +69,11 @@ def _read_winsize() -> tuple[int, int] | None:
|
|||||||
- tmux respawn-pane: tmux sets all three to the pane's PTY.
|
- tmux respawn-pane: tmux sets all three to the pane's PTY.
|
||||||
- non-TTY (someone piped stdin in tests): none are; the
|
- non-TTY (someone piped stdin in tests): none are; the
|
||||||
sync just no-ops, which is the right behavior."""
|
sync just no-ops, which is the right behavior."""
|
||||||
for fd in (sys.stdin.fileno(), sys.stdout.fileno(), sys.stderr.fileno()):
|
for default_fd, stream in enumerate((sys.stdin, sys.stdout, sys.stderr)):
|
||||||
|
try:
|
||||||
|
fd = stream.fileno()
|
||||||
|
except (AttributeError, io.UnsupportedOperation, OSError):
|
||||||
|
fd = default_fd
|
||||||
try:
|
try:
|
||||||
data = fcntl.ioctl(fd, termios.TIOCGWINSZ, b"\x00" * 8)
|
data = fcntl.ioctl(fd, termios.TIOCGWINSZ, b"\x00" * 8)
|
||||||
except OSError:
|
except OSError:
|
||||||
|
|||||||
@@ -42,7 +42,8 @@ def filter_select(
|
|||||||
# Use os.dup() to duplicate the fd so the original file object
|
# Use os.dup() to duplicate the fd so the original file object
|
||||||
# and FileIO in _run_picker each manage independent copies,
|
# and FileIO in _run_picker each manage independent copies,
|
||||||
# preventing double-close errors.
|
# preventing double-close errors.
|
||||||
fd_dup = os.dup(tty_fd.fileno())
|
import os as _os
|
||||||
|
fd_dup = _os.dup(tty_fd.fileno())
|
||||||
return _run_picker(items, title=title, tty_fd=fd_dup)
|
return _run_picker(items, title=title, tty_fd=fd_dup)
|
||||||
finally:
|
finally:
|
||||||
tty_fd.close()
|
tty_fd.close()
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ from ...agent_provider import (
|
|||||||
AgentProvisionFile,
|
AgentProvisionFile,
|
||||||
AgentProvisionPlan,
|
AgentProvisionPlan,
|
||||||
)
|
)
|
||||||
from ...codex_auth import codex_host_access_token, write_codex_dummy_auth_file
|
from .codex_auth import codex_host_access_token, write_codex_dummy_auth_file
|
||||||
from ...egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
|
from ...egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
|
||||||
from ...log import die, info, warn
|
from ...log import die, info, warn
|
||||||
|
|
||||||
|
|||||||
@@ -15,8 +15,8 @@ from datetime import datetime, timezone
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import cast
|
from typing import cast
|
||||||
|
|
||||||
from .log import die
|
from bot_bottle.log import die
|
||||||
from .util import expand_tilde
|
from bot_bottle.util import expand_tilde
|
||||||
|
|
||||||
|
|
||||||
def codex_auth_path(host_env: dict[str, str] | None = None) -> Path:
|
def codex_auth_path(host_env: dict[str, str] | None = None) -> Path:
|
||||||
@@ -153,7 +153,9 @@ def _dummy_jwt_from_host(
|
|||||||
return _dummy_jwt(now, exp_ts=exp_ts)
|
return _dummy_jwt(now, exp_ts=exp_ts)
|
||||||
if not isinstance(payload, dict):
|
if not isinstance(payload, dict):
|
||||||
return _dummy_jwt(now, exp_ts=exp_ts)
|
return _dummy_jwt(now, exp_ts=exp_ts)
|
||||||
return _encode_dummy_jwt(_redact_jwt_payload(cast(dict[str, object], payload), now=now, exp_ts=exp_ts))
|
return _encode_dummy_jwt(
|
||||||
|
_redact_jwt_payload(cast(dict[str, object], payload), now=now, exp_ts=exp_ts)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _encode_dummy_jwt(payload: dict[str, object]) -> str:
|
def _encode_dummy_jwt(payload: dict[str, object]) -> str:
|
||||||
@@ -141,15 +141,13 @@ def egress_manifest_routes(
|
|||||||
routes are merged."""
|
routes are merged."""
|
||||||
out: list[EgressRoute] = []
|
out: list[EgressRoute] = []
|
||||||
for r in bottle.egress.routes:
|
for r in bottle.egress.routes:
|
||||||
tls_pt = r.Pipelock.Config.get("tls_passthrough", False)
|
|
||||||
tls_passthrough = tls_pt if isinstance(tls_pt, bool) else False
|
|
||||||
out.append(EgressRoute(
|
out.append(EgressRoute(
|
||||||
host=r.Host,
|
host=r.Host,
|
||||||
path_allowlist=r.PathAllowlist,
|
path_allowlist=r.PathAllowlist,
|
||||||
auth_scheme=r.AuthScheme,
|
auth_scheme=r.AuthScheme,
|
||||||
token_ref=r.TokenRef,
|
token_ref=r.TokenRef,
|
||||||
roles=r.Role,
|
roles=r.Role,
|
||||||
tls_passthrough=tls_passthrough,
|
tls_passthrough=r.Pipelock.TlsPassthrough,
|
||||||
))
|
))
|
||||||
return tuple(out)
|
return tuple(out)
|
||||||
|
|
||||||
|
|||||||
@@ -161,7 +161,7 @@ class Agent:
|
|||||||
git_raw = d.get("git-gate")
|
git_raw = d.get("git-gate")
|
||||||
if git_raw is not None:
|
if git_raw is not None:
|
||||||
gd = as_json_object(git_raw, f"agent '{name}' git-gate")
|
gd = as_json_object(git_raw, f"agent '{name}' git-gate")
|
||||||
for k in gd:
|
for k in gd.keys():
|
||||||
if k != "user":
|
if k != "user":
|
||||||
raise ManifestError(
|
raise ManifestError(
|
||||||
f"agent '{name}' git-gate.{k} is not allowed at the "
|
f"agent '{name}' git-gate.{k} is not allowed at the "
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import ipaddress
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from typing import cast
|
from typing import cast
|
||||||
|
|
||||||
@@ -42,18 +43,17 @@ def validate_egress_routes(
|
|||||||
class PipelockRoutePolicy:
|
class PipelockRoutePolicy:
|
||||||
"""Per-route pipelock policy overrides.
|
"""Per-route pipelock policy overrides.
|
||||||
|
|
||||||
Stores raw pipelock configuration that's passed through to the
|
`TlsPassthrough` adds the route host to pipelock's
|
||||||
pipelock sidecar. Pipelock validates all config options, so
|
`tls_interception.passthrough_domains`, so pipelock still enforces
|
||||||
bot-bottle forwards manifest settings without coercion or strict
|
the hostname allowlist but does not MITM/decrypt request bodies or
|
||||||
validation. Supported options include:
|
headers for that host.
|
||||||
|
|
||||||
- `tls_passthrough`: bool — skip TLS MITM for this host
|
`SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF
|
||||||
- `ssrf_ip_allowlist`: list of CIDR/IP — allow private destinations
|
allowlist for private/internal destinations behind this route.
|
||||||
- `skip_scan_for_extensions`: list of file extensions to skip DLP
|
|
||||||
scanning for (e.g., [".whl", ".tar.gz"])
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
Config: dict[str, object] = field(default_factory=dict)
|
TlsPassthrough: bool = False
|
||||||
|
SsrfIpAllowlist: tuple[str, ...] = ()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_dict(
|
def from_dict(
|
||||||
@@ -61,7 +61,44 @@ class PipelockRoutePolicy:
|
|||||||
) -> "PipelockRoutePolicy":
|
) -> "PipelockRoutePolicy":
|
||||||
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
|
label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock"
|
||||||
d = as_json_object(raw, label)
|
d = as_json_object(raw, label)
|
||||||
return cls(Config=d)
|
for k in d:
|
||||||
|
if k not in ("tls_passthrough", "ssrf_ip_allowlist"):
|
||||||
|
raise ManifestError(
|
||||||
|
f"{label} has unknown key {k!r}; "
|
||||||
|
f"only 'tls_passthrough' and 'ssrf_ip_allowlist' "
|
||||||
|
f"are accepted"
|
||||||
|
)
|
||||||
|
tls_passthrough_raw = d.get("tls_passthrough", False)
|
||||||
|
if not isinstance(tls_passthrough_raw, bool):
|
||||||
|
raise ManifestError(
|
||||||
|
f"{label}.tls_passthrough must be a boolean "
|
||||||
|
f"(was {type(tls_passthrough_raw).__name__})"
|
||||||
|
)
|
||||||
|
ssrf_raw = d.get("ssrf_ip_allowlist", [])
|
||||||
|
if not isinstance(ssrf_raw, list):
|
||||||
|
raise ManifestError(
|
||||||
|
f"{label}.ssrf_ip_allowlist must be an array "
|
||||||
|
f"(was {type(ssrf_raw).__name__})"
|
||||||
|
)
|
||||||
|
ssrf_ip_allowlist: list[str] = []
|
||||||
|
for j, item in enumerate(ssrf_raw):
|
||||||
|
if not isinstance(item, str) or not item:
|
||||||
|
raise ManifestError(
|
||||||
|
f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty "
|
||||||
|
f"string (was {type(item).__name__})"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
ipaddress.ip_network(item, strict=False)
|
||||||
|
except ValueError as e:
|
||||||
|
raise ManifestError(
|
||||||
|
f"{label}.ssrf_ip_allowlist[{j}] must be an IP address "
|
||||||
|
f"or CIDR (was {item!r}): {e}"
|
||||||
|
) from e
|
||||||
|
ssrf_ip_allowlist.append(item)
|
||||||
|
return cls(
|
||||||
|
TlsPassthrough=tls_passthrough_raw,
|
||||||
|
SsrfIpAllowlist=tuple(ssrf_ip_allowlist),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
|
|||||||
@@ -246,7 +246,7 @@ class GitUser:
|
|||||||
@classmethod
|
@classmethod
|
||||||
def from_dict(cls, bottle_name: str, raw: object) -> "GitUser":
|
def from_dict(cls, bottle_name: str, raw: object) -> "GitUser":
|
||||||
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate.user")
|
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate.user")
|
||||||
for k in d:
|
for k in d.keys():
|
||||||
if k not in {"name", "email"}:
|
if k not in {"name", "email"}:
|
||||||
raise ManifestError(
|
raise ManifestError(
|
||||||
f"bottle '{bottle_name}' git-gate.user has unknown key {k!r}; "
|
f"bottle '{bottle_name}' git-gate.user has unknown key {k!r}; "
|
||||||
@@ -281,7 +281,7 @@ def parse_git_gate_config(
|
|||||||
raw: object,
|
raw: object,
|
||||||
) -> tuple[tuple[GitEntry, ...], GitUser]:
|
) -> tuple[tuple[GitEntry, ...], GitUser]:
|
||||||
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate")
|
d = as_json_object(raw, f"bottle '{bottle_name}' git-gate")
|
||||||
for k in d:
|
for k in d.keys():
|
||||||
if k not in {"user", "repos"}:
|
if k not in {"user", "repos"}:
|
||||||
raise ManifestError(
|
raise ManifestError(
|
||||||
f"bottle '{bottle_name}' git-gate has unknown key {k!r}; "
|
f"bottle '{bottle_name}' git-gate has unknown key {k!r}; "
|
||||||
|
|||||||
+2
-14
@@ -132,11 +132,8 @@ def pipelock_effective_ssrf_ip_allowlist(
|
|||||||
"""
|
"""
|
||||||
seen: dict[str, None] = {ip: None for ip in extra}
|
seen: dict[str, None] = {ip: None for ip in extra}
|
||||||
for route in bottle.egress.routes:
|
for route in bottle.egress.routes:
|
||||||
ssrf_raw = route.Pipelock.Config.get("ssrf_ip_allowlist", [])
|
for ip in route.Pipelock.SsrfIpAllowlist:
|
||||||
if isinstance(ssrf_raw, list):
|
seen.setdefault(ip, None)
|
||||||
for ip in ssrf_raw:
|
|
||||||
if isinstance(ip, str):
|
|
||||||
seen.setdefault(ip, None)
|
|
||||||
return sorted(seen.keys())
|
return sorted(seen.keys())
|
||||||
|
|
||||||
|
|
||||||
@@ -223,15 +220,6 @@ def pipelock_build_config(
|
|||||||
)
|
)
|
||||||
if effective_ssrf_ip_allowlist:
|
if effective_ssrf_ip_allowlist:
|
||||||
cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist}
|
cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist}
|
||||||
|
|
||||||
# Merge per-route pipelock config (e.g., response_body_scanning settings).
|
|
||||||
# Routes can specify arbitrary pipelock options that apply globally.
|
|
||||||
for route in bottle.egress.routes:
|
|
||||||
for key, value in route.Pipelock.Config.items():
|
|
||||||
if key not in ("tls_passthrough", "ssrf_ip_allowlist"):
|
|
||||||
if key not in cfg:
|
|
||||||
cfg[key] = value
|
|
||||||
|
|
||||||
return cfg
|
return cfg
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import unittest
|
|||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from bot_bottle.codex_auth import (
|
from bot_bottle.contrib.codex.codex_auth import (
|
||||||
codex_auth_path,
|
codex_auth_path,
|
||||||
codex_dummy_auth_json,
|
codex_dummy_auth_json,
|
||||||
codex_host_access_token,
|
codex_host_access_token,
|
||||||
|
|||||||
@@ -225,7 +225,7 @@ class TestPipelockPolicy(unittest.TestCase):
|
|||||||
"host": "api.openai.com",
|
"host": "api.openai.com",
|
||||||
"pipelock": {"tls_passthrough": True},
|
"pipelock": {"tls_passthrough": True},
|
||||||
}])
|
}])
|
||||||
self.assertTrue(b.egress.routes[0].Pipelock.Config["tls_passthrough"])
|
self.assertTrue(b.egress.routes[0].Pipelock.TlsPassthrough)
|
||||||
|
|
||||||
def test_ssrf_ip_allowlist_route_policy(self):
|
def test_ssrf_ip_allowlist_route_policy(self):
|
||||||
b = _bottle([{
|
b = _bottle([{
|
||||||
@@ -233,28 +233,44 @@ class TestPipelockPolicy(unittest.TestCase):
|
|||||||
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]},
|
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]},
|
||||||
}])
|
}])
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
["100.78.141.42/32"],
|
("100.78.141.42/32",),
|
||||||
b.egress.routes[0].Pipelock.Config["ssrf_ip_allowlist"],
|
b.egress.routes[0].Pipelock.SsrfIpAllowlist,
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_skip_scan_for_extensions_route_policy(self):
|
def test_tls_passthrough_defaults_false(self):
|
||||||
b = _bottle([{
|
|
||||||
"host": "files.pythonhosted.org",
|
|
||||||
"pipelock": {"skip_scan_for_extensions": [".whl", ".tar.gz"]},
|
|
||||||
}])
|
|
||||||
self.assertEqual(
|
|
||||||
[".whl", ".tar.gz"],
|
|
||||||
b.egress.routes[0].Pipelock.Config["skip_scan_for_extensions"],
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_empty_config_when_pipelock_omitted(self):
|
|
||||||
b = _bottle([{"host": "api.openai.com"}])
|
b = _bottle([{"host": "api.openai.com"}])
|
||||||
self.assertEqual({}, b.egress.routes[0].Pipelock.Config)
|
self.assertFalse(b.egress.routes[0].Pipelock.TlsPassthrough)
|
||||||
|
self.assertEqual((), b.egress.routes[0].Pipelock.SsrfIpAllowlist)
|
||||||
|
|
||||||
def test_pipelock_policy_must_be_object(self):
|
def test_pipelock_policy_must_be_object(self):
|
||||||
with self.assertRaises(ManifestError):
|
with self.assertRaises(ManifestError):
|
||||||
_bottle([{"host": "x.example", "pipelock": True}])
|
_bottle([{"host": "x.example", "pipelock": True}])
|
||||||
|
|
||||||
|
def test_tls_passthrough_must_be_bool(self):
|
||||||
|
with self.assertRaises(ManifestError):
|
||||||
|
_bottle([{
|
||||||
|
"host": "x.example",
|
||||||
|
"pipelock": {"tls_passthrough": "yes"},
|
||||||
|
}])
|
||||||
|
|
||||||
|
def test_ssrf_ip_allowlist_must_be_array(self):
|
||||||
|
with self.assertRaises(ManifestError):
|
||||||
|
_bottle([{
|
||||||
|
"host": "x.example",
|
||||||
|
"pipelock": {"ssrf_ip_allowlist": "100.78.141.42/32"},
|
||||||
|
}])
|
||||||
|
|
||||||
|
def test_ssrf_ip_allowlist_items_must_be_cidr_or_ip(self):
|
||||||
|
with self.assertRaises(ManifestError):
|
||||||
|
_bottle([{
|
||||||
|
"host": "x.example",
|
||||||
|
"pipelock": {"ssrf_ip_allowlist": ["not-an-ip"]},
|
||||||
|
}])
|
||||||
|
|
||||||
|
def test_unknown_pipelock_key_rejected(self):
|
||||||
|
with self.assertRaises(ManifestError):
|
||||||
|
_bottle([{"host": "x.example", "pipelock": {"wat": True}}])
|
||||||
|
|
||||||
|
|
||||||
class TestRouteValidation(unittest.TestCase):
|
class TestRouteValidation(unittest.TestCase):
|
||||||
def test_duplicate_hosts_rejected(self):
|
def test_duplicate_hosts_rejected(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user