diff --git a/bot_bottle/backend/docker/pipelock.py b/bot_bottle/backend/docker/pipelock.py deleted file mode 100644 index 53d2c2a..0000000 --- a/bot_bottle/backend/docker/pipelock.py +++ /dev/null @@ -1,74 +0,0 @@ -"""Docker-side pipelock helpers: image pin, container naming, and -the one-shot `pipelock tls init` host-side CA mint. The -prepare-time YAML rendering itself lives on the platform-neutral -`PipelockProxy` ABC — backends instantiate it directly. - -The per-container `.start()` / `.stop()` lifecycle was deleted in -PRD 0024 chunk 3; compose-up owns the container lifecycle (PRD -0018) and the bundle path (PRD 0024) collapses pipelock + egress -+ git-gate + supervise into one container.""" - -from __future__ import annotations - -import os -import subprocess -from pathlib import Path - -from ...log import die - - -# Pipelock image, pinned by digest. The digest is the multi-arch image -# index for ghcr.io/luckypipewrench/pipelock:2.3.0. -PIPELOCK_IMAGE = os.environ.get( - "BOT_BOTTLE_PIPELOCK_IMAGE", - "ghcr.io/luckypipewrench/pipelock@sha256:" - "3b1a39417b98406ddc5dc2d8fcb42865ddc0c68a43d355db55f0f8cb06bc6de9", -) - -# Listening port for pipelock's forward proxy. -PIPELOCK_PORT = os.environ.get("BOT_BOTTLE_PIPELOCK_PORT", "8888") - - -# The URL egress dials for its upstream HTTPS_PROXY. egress and pipelock -# share the same container's network namespace inside the sidecar bundle, so -# loopback reaches pipelock directly — no docker DNS aliases involved. -BUNDLE_LOCAL_PIPELOCK_URL = f"http://127.0.0.1:{PIPELOCK_PORT}" - - -def pipelock_tls_init(stage_dir: Path) -> tuple[Path, Path]: - """Generate a fresh per-bottle CA via a one-shot pipelock container. - - Runs `pipelock tls init` against a host-mounted scratch dir, leaving - `ca.pem` (public cert, mode 600) and `ca-key.pem` (private key, mode - 600) under `/pipelock-ca/`. Returns the two host paths. - - The image is pinned (same digest the running sidecar uses) so the - generated CA matches what the sidecar expects. Output is owned by - whatever UID the one-shot ran as; the compose renderer's - bind-mounts pin the files in place at runtime, so ownership - inside the running sidecar (root in pipelock's distroless image) - is independent.""" - work = stage_dir / "pipelock-ca" - work.mkdir(exist_ok=True) - result = subprocess.run( - ["docker", "run", "--rm", - "-v", f"{work}:/h", - "-e", "PIPELOCK_HOME=/h", - PIPELOCK_IMAGE, "tls", "init"], - capture_output=True, - text=True, - check=False, - ) - if result.returncode != 0: - die(f"pipelock tls init failed: {result.stderr.strip()}") - cert = work / "ca.pem" - key = work / "ca-key.pem" - if not cert.is_file() or not key.is_file(): - die(f"pipelock tls init did not produce ca files in {work}") - # Explicit perms in case a future pipelock release changes - # defaults. Pipelock runs as root in its distroless image and - # bind-mounts work with 0o600 (root reads everything); the key - # has no reason to be readable to anyone else on the host. - key.chmod(0o600) - cert.chmod(0o644) - return (cert, key) diff --git a/bot_bottle/backend/docker/pipelock_apply.py b/bot_bottle/backend/docker/pipelock_apply.py deleted file mode 100644 index e66251d..0000000 --- a/bot_bottle/backend/docker/pipelock_apply.py +++ /dev/null @@ -1,200 +0,0 @@ -"""pipelock_apply — host-side helper to apply an api_allowlist -change to a running pipelock sidecar (PRD 0015). - -Used by the supervise dashboard when the operator approves a -pipelock-block proposal (or runs the operator-initiated `pipelock -edit ` verb). Fetches the current pipelock.yaml via `docker -exec`, parses it, swaps the api_allowlist with the proposed hosts, -re-renders, writes back via the bind-mount path, then signals the -bundle supervisor to restart the pipelock daemon (`docker kill ---signal USR1`) so -pipelock picks up the new config. - -v1 uses restart, not SIGHUP — pipelock has no in-process reload -hook and adding one is the "SIGHUP reload for pipelock" open -question in PRD 0015. Restart drops in-flight outbound calls; the -agent's HTTP client retries pick up against the restarted proxy. -""" - -from __future__ import annotations - -import os -import re -import subprocess -import tempfile -from pathlib import Path - -from ...pipelock import pipelock_render_yaml -from ...yaml_subset import YamlSubsetError, parse_yaml_subset -from .bottle_state import pipelock_state_dir -from .sidecar_bundle import sidecar_bundle_container_name - - -def _pipelock_yaml_host_path(slug: str) -> Path: - """The bind-mount source for the pipelock sidecar's - pipelock.yaml — matches what pipelock.prepare wrote at chunk-2 - paths.""" - return pipelock_state_dir(slug) / "pipelock.yaml" - - -PIPELOCK_YAML_IN_CONTAINER = "/etc/pipelock.yaml" - -# Allowlist proposals are one-hostname-per-line. Blank lines and -# `#`-prefixed comments are ignored. The character set matches the -# supervise sidecar's syntactic check on the agent's pipelock-block -# proposal (alphanumerics + dot/dash/underscore). -_HOST_OK = re.compile(r"^[A-Za-z0-9_.-]+$") - - -class PipelockApplyError(RuntimeError): - """Raised when fetch / parse / apply fails. The dashboard renders - the message and keeps the proposal pending — never crashes.""" - - -def parse_allowlist_content(content: str) -> list[str]: - """One hostname per line. Blanks and `#` comments are ignored. - Raises PipelockApplyError if a line has a disallowed character.""" - hosts: list[str] = [] - for i, raw_line in enumerate(content.splitlines(), start=1): - line = raw_line.strip() - if not line or line.startswith("#"): - continue - if not _HOST_OK.match(line): - raise PipelockApplyError( - f"allowlist line {i}: {line!r} has disallowed characters" - ) - hosts.append(line) - return hosts - - -def render_allowlist_content(hosts: list[str]) -> str: - """Hosts → one-per-line string (the operator-facing format).""" - if not hosts: - return "" - return "\n".join(hosts) + "\n" - - -def fetch_current_yaml(slug: str) -> str: - """Read the live /etc/pipelock.yaml from the sidecar bundle. - - Uses `docker cp` because pipelock inside the bundle is the - distroless pipelock binary with no shell, and `docker cp` is a - daemon-API tarball copy that works regardless of what's - available inside the container. - - Raises PipelockApplyError if the read fails.""" - container = sidecar_bundle_container_name(slug) - fd, tmp_path = tempfile.mkstemp(prefix="cb-pipelock-fetch.", suffix=".yaml") - os.close(fd) - try: - r = subprocess.run( - [ - "docker", "cp", - f"{container}:{PIPELOCK_YAML_IN_CONTAINER}", tmp_path, - ], - capture_output=True, text=True, check=False, - ) - if r.returncode != 0: - raise PipelockApplyError( - f"could not fetch pipelock.yaml from {container}: " - f"{(r.stderr or '').strip() or 'container not running?'}" - ) - return Path(tmp_path).read_text(encoding="utf-8") - finally: - try: - Path(tmp_path).unlink() - except OSError: - pass - - -def fetch_current_allowlist(slug: str) -> str: - """Fetch the live yaml, extract api_allowlist, render as one-per- - line — the operator-facing format for the TUI / agent's - current-config mount.""" - yaml = fetch_current_yaml(slug) - try: - cfg = parse_yaml_subset(yaml) - except YamlSubsetError as e: - raise PipelockApplyError(f"running pipelock yaml: {e}") from e - hosts = cfg.get("api_allowlist", []) - if not isinstance(hosts, list): - raise PipelockApplyError( - "running pipelock yaml: api_allowlist is not a list" - ) - return render_allowlist_content([str(h) for h in hosts]) - - -def apply_allowlist_change( - slug: str, new_allowlist_content: str, -) -> tuple[str, str]: - """Apply `new_allowlist_content` to the sidecar bundle: - 1. Parse the proposed hosts (one per line). - 2. Fetch + parse current pipelock.yaml. - 3. Replace api_allowlist with the proposed hosts; re-render. - 4. Write the new yaml to the bind-mount source. - 5. `docker kill --signal USR1 ` so the supervisor - restarts the pipelock daemon in place (leaving egress, - git-gate, and supervise running). Pipelock has no - in-process reload; the supervisor's per-daemon restart - keeps the agent's MCP socket alive — a whole-bundle - `docker restart` would bounce supervise too. - - Returns (before, after) where both are one-per-line allowlist - strings (operator-facing format). Raises PipelockApplyError on - any failure; the sidecar's existing config stays in place until - the host write succeeds, and the SIGUSR1 is what makes it - live.""" - new_hosts = parse_allowlist_content(new_allowlist_content) - container = sidecar_bundle_container_name(slug) - current_yaml = fetch_current_yaml(slug) - try: - cfg = parse_yaml_subset(current_yaml) - except YamlSubsetError as e: - raise PipelockApplyError(f"running pipelock yaml: {e}") from e - current_hosts = cfg.get("api_allowlist", []) - if not isinstance(current_hosts, list): - raise PipelockApplyError( - "running pipelock yaml: api_allowlist is not a list" - ) - - before = render_allowlist_content([str(h) for h in current_hosts]) - after = render_allowlist_content(new_hosts) - - cfg["api_allowlist"] = new_hosts - rendered = pipelock_render_yaml(cfg) - - # pipelock.yaml is bind-mounted into the container as a SINGLE - # FILE — same Docker single-file inode issue as egress_apply: - # write-temp-then-rename swaps the host inode and leaves the - # container's mount pointing at the orphaned old one. Write - # in-place. The SIGUSR1 below makes the new content live - # (pipelock has no in-process reload, so the supervisor - # restarts the pipelock daemon in response). - target = _pipelock_yaml_host_path(slug) - target.parent.mkdir(parents=True, exist_ok=True) - target.write_text(rendered) - # pipelock runs as root in its distroless image — any mode is - # fine — but 0o600 matches what prepare wrote. - target.chmod(0o600) - restart = subprocess.run( - ["docker", "kill", "--signal", "USR1", container], - capture_output=True, text=True, check=False, - ) - if restart.returncode != 0: - raise PipelockApplyError( - f"failed to signal {container} for pipelock restart: " - f"{(restart.stderr or '').strip()}" - ) - - return before, after - - -__all__ = [ - "PIPELOCK_YAML_IN_CONTAINER", - "PipelockApplyError", - "apply_allowlist_change", - "fetch_current_allowlist", - "fetch_current_yaml", - "parse_allowlist_content", - "render_allowlist_content", -] diff --git a/bot_bottle/manifest.py b/bot_bottle/manifest.py index 8be2e44..63ad90d 100644 --- a/bot_bottle/manifest.py +++ b/bot_bottle/manifest.py @@ -56,7 +56,6 @@ from .manifest_egress import ( EGRESS_AUTH_SCHEMES, EgressConfig, EgressRoute, - PipelockRoutePolicy, ) from .manifest_git import GitEntry, GitUser, parse_git_gate_config from .manifest_schema import BOTTLE_KEYS @@ -68,7 +67,6 @@ __all__ = [ "GitUser", "AgentProvider", "EGRESS_AUTH_SCHEMES", - "PipelockRoutePolicy", "EgressRoute", "EgressConfig", "Agent", diff --git a/bot_bottle/manifest_egress.py b/bot_bottle/manifest_egress.py index 24a6b67..6f7c1d7 100644 --- a/bot_bottle/manifest_egress.py +++ b/bot_bottle/manifest_egress.py @@ -2,8 +2,7 @@ from __future__ import annotations -import ipaddress -from dataclasses import dataclass, field +from dataclasses import dataclass from typing import cast from .manifest_util import ManifestError, as_json_object @@ -39,68 +38,6 @@ def validate_egress_routes( seen_hosts[key] = None -@dataclass(frozen=True) -class PipelockRoutePolicy: - """Per-route pipelock policy overrides. - - `TlsPassthrough` adds the route host to pipelock's - `tls_interception.passthrough_domains`, so pipelock still enforces - the hostname allowlist but does not MITM/decrypt request bodies or - headers for that host. - - `SsrfIpAllowlist` adds explicit IPs/CIDRs to pipelock's SSRF - allowlist for private/internal destinations behind this route. - """ - - TlsPassthrough: bool = False - SsrfIpAllowlist: tuple[str, ...] = () - - @classmethod - def from_dict( - cls, bottle_name: str, idx: int, raw: object, - ) -> "PipelockRoutePolicy": - label = f"bottle '{bottle_name}' egress.routes[{idx}] pipelock" - d = as_json_object(raw, label) - for k in d: - if k not in ("tls_passthrough", "ssrf_ip_allowlist"): - raise ManifestError( - f"{label} has unknown key {k!r}; " - f"only 'tls_passthrough' and 'ssrf_ip_allowlist' " - f"are accepted" - ) - tls_passthrough_raw = d.get("tls_passthrough", False) - if not isinstance(tls_passthrough_raw, bool): - raise ManifestError( - f"{label}.tls_passthrough must be a boolean " - f"(was {type(tls_passthrough_raw).__name__})" - ) - ssrf_raw = d.get("ssrf_ip_allowlist", []) - if not isinstance(ssrf_raw, list): - raise ManifestError( - f"{label}.ssrf_ip_allowlist must be an array " - f"(was {type(ssrf_raw).__name__})" - ) - ssrf_ip_allowlist: list[str] = [] - for j, item in enumerate(ssrf_raw): - if not isinstance(item, str) or not item: - raise ManifestError( - f"{label}.ssrf_ip_allowlist[{j}] must be a non-empty " - f"string (was {type(item).__name__})" - ) - try: - ipaddress.ip_network(item, strict=False) - except ValueError as e: - raise ManifestError( - f"{label}.ssrf_ip_allowlist[{j}] must be an IP address " - f"or CIDR (was {item!r}): {e}" - ) from e - ssrf_ip_allowlist.append(item) - return cls( - TlsPassthrough=tls_passthrough_raw, - SsrfIpAllowlist=tuple(ssrf_ip_allowlist), - ) - - @dataclass(frozen=True) class EgressRoute: """One route on the per-bottle egress sidecar (PRD 0017). @@ -132,7 +69,6 @@ class EgressRoute: AuthScheme: str = "" TokenRef: str = "" Role: tuple[str, ...] = () - Pipelock: PipelockRoutePolicy = field(default_factory=PipelockRoutePolicy) @classmethod def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "EgressRoute": @@ -229,17 +165,11 @@ class EgressRoute: f"the 'role' field is reserved for future use" ) - pipelock = ( - PipelockRoutePolicy.from_dict(bottle_name, idx, d["pipelock"]) - if "pipelock" in d - else PipelockRoutePolicy() - ) - for k in d: - if k not in ("host", "path_allowlist", "auth", "role", "pipelock"): + if k not in ("host", "path_allowlist", "auth", "role"): raise ManifestError( f"{label} has unknown key {k!r}; accepted keys are " - f"'host', 'path_allowlist', 'auth', 'role', 'pipelock'" + f"'host', 'path_allowlist', 'auth', 'role'" ) return cls( @@ -248,7 +178,6 @@ class EgressRoute: AuthScheme=auth_scheme, TokenRef=token_ref, Role=roles, - Pipelock=pipelock, ) diff --git a/bot_bottle/pipelock.py b/bot_bottle/pipelock.py deleted file mode 100644 index c9ea82d..0000000 --- a/bot_bottle/pipelock.py +++ /dev/null @@ -1,541 +0,0 @@ -"""Pipelock sidecar lifecycle for the per-agent egress topology. - -Pipelock (https://github.com/luckyPipewrench/pipelock) is an HTTP -forward proxy with hostname allowlisting + DLP scanning + URL-entropy -checks. One sidecar per agent, attached to the agent's --internal -network and a per-agent user-defined egress bridge. - -Post-PRD-0017 topology: the agent's HTTP_PROXY points at egress -(not pipelock); egress sets `HTTPS_PROXY=pipelock` on its -outbound leg. So pipelock no longer sees the agent's connections -directly — it sees the egress → upstream leg, applies the -hostname allowlist + DLP body scan there, and forwards to the real -upstream. - -Image pin: ghcr.io/luckypipewrench/pipelock@sha256: for tag 2.3.0. -""" - -from __future__ import annotations - -from dataclasses import dataclass -from pathlib import Path -from typing import cast - -from .egress import EgressRoute, egress_routes_for_bottle -from .supervise import SUPERVISE_HOSTNAME -from .manifest import Bottle - -# Hosts pipelock should NOT TLS-MITM, even when tls_interception is -# enabled. This is now route-owned manifest policy via -# `egress.routes[].pipelock.tls_passthrough`; no provider hosts are -# injected implicitly. -DEFAULT_TLS_PASSTHROUGH: tuple[str, ...] = () - - -# In-container paths the rendered pipelock YAML references under -# `tls_interception`. The pipelock binary expects the per-bottle CA -# cert + key at these exact paths inside its container — independent -# of how the daemon is wrapped (own container, sidecar bundle, etc.), -# which is why they live in the platform-neutral module. -PIPELOCK_CA_CERT_IN_CONTAINER = "/etc/pipelock-ca.pem" -PIPELOCK_CA_KEY_IN_CONTAINER = "/etc/pipelock-ca-key.pem" - - -# Short network alias for pipelock inside the sidecar bundle. The -# agent's HTTP_PROXY (when no egress is declared) and any in-bundle -# consumer's URL both reference this name. -PIPELOCK_HOSTNAME = "pipelock" - - -# --- Allowlist resolution -------------------------------------------------- - - -def pipelock_effective_allowlist( - bottle: Bottle, - provider_routes: tuple[EgressRoute, ...] = (), -) -> list[str]: - """Hostnames pipelock allows. Sorted for stability. - - Always mirrors `egress_routes_for_bottle(bottle, provider_routes)` — - egress is the single allowlist surface, and pipelock's allowlist is - the downstream copy for defense-in-depth + DLP body scanning. For - bottles without any `egress.routes[]` declared, this is empty except - for supervise sidecar traffic when `supervise: true`. - - The supervise sidecar's hostname is auto-added when supervise - is enabled (sibling-sidecar traffic that flows through pipelock - would otherwise be 403'd). Git upstreams declared in - `bottle.git` do NOT contribute here — git traffic flows - through git-gate (PRD 0008), not pipelock.""" - seen: dict[str, None] = {} - for r in egress_routes_for_bottle(bottle, provider_routes): - if r.host: - seen.setdefault(r.host, None) - if bottle.supervise: - seen.setdefault(SUPERVISE_HOSTNAME, None) - return sorted(seen.keys()) - - -def pipelock_seed_phrase_detection_enabled(bottle: Bottle) -> bool: - """Whether pipelock's BIP-39 seed-phrase detector stays on. - - LLM conversation bodies legitimately trip the detector — any 12+ - English words that pass the BIP-39 checksum match — so agents can - get blocked on ordinary prompts/responses regardless of provider - (Claude, Codex/OpenAI, or future harnesses). We tried two narrower - knobs first: - - - `suppress: [{rule, path}]` — pipelock accepts the schema - but the entry only silences the alert; the body_dlp block - still fires. - - `rules.disabled: ["dlp:BIP-39 Seed Phrase"]` — same shape, - same outcome: 403 still returned. - - Empirically only `seed_phrase_detection.enabled: false` - actually stops the block (verified by sending a 12-word BIP-39 - body through three pipelock instances). It is a global toggle — - no per-path / per-host knob in pipelock 2.3.0 — so we turn off - only this detector for every bottle. The rest of pipelock's DLP - defaults and request-body/header scanning remain enabled.""" - del bottle # kept for call-site stability and future policy knobs. - return False - - -def pipelock_effective_tls_passthrough( - bottle: Bottle, - provider_routes: tuple[EgressRoute, ...] = (), -) -> list[str]: - """Hostnames pipelock should pass through (no TLS MITM). - - A manifest route opts in with `pipelock.tls_passthrough: true` - (lifted into `EgressRoute.tls_passthrough` in `egress_manifest_routes`). - Provider routes that set `tls_passthrough=True` (e.g. Codex credential - routes where egress injects the host bearer after the agent boundary) - are also included. Both arrive via `egress_routes_for_bottle` — no - provider-specific branching needed here. - """ - seen: dict[str, None] = {host: None for host in DEFAULT_TLS_PASSTHROUGH} - for route in egress_routes_for_bottle(bottle, provider_routes): - if route.tls_passthrough: - seen.setdefault(route.host, None) - return sorted(seen.keys()) - - -def pipelock_effective_ssrf_ip_allowlist( - bottle: Bottle, - extra: tuple[str, ...] = (), -) -> list[str]: - """IP/CIDR entries that bypass pipelock's SSRF destination guard. - - Launch code can pass backend-owned entries through `extra`, while - route-owned entries come from `pipelock.ssrf_ip_allowlist`. - """ - seen: dict[str, None] = {ip: None for ip in extra} - for route in bottle.egress.routes: - for ip in route.Pipelock.SsrfIpAllowlist: - seen.setdefault(ip, None) - return sorted(seen.keys()) - - - - - -# --- Config build + YAML render -------------------------------------------- - - -def pipelock_build_config( - bottle: Bottle, - *, - ca_cert_path: str = "", - ca_key_path: str = "", - ssrf_ip_allowlist: tuple[str, ...] = (), - provider_routes: tuple[EgressRoute, ...] = (), -) -> dict[str, object]: - """Build the structured pipelock config dict the sidecar will load. - - Deliberately carries no env values, no secrets, no per-agent - customization beyond the resolved hostname list. The shape mirrors - the YAML pipelock expects on disk; `pipelock_render_yaml` serializes - it. Tests assert on this dict; production code renders it. - - `ca_cert_path` / `ca_key_path` are the **in-container** paths the - pipelock sidecar will read its CA from at runtime (they're - populated into the container at start time via `docker cp`). - Pass both or neither: both → emit `tls_interception` block with - `enabled: true`; neither → omit the block entirely (pipelock - falls back to its built-in default of `enabled: false`). Used - by PRD 0006 to turn on pipelock's native TLS interception. - - `ssrf_ip_allowlist` is the list of IPs / CIDRs that bypass - pipelock's SSRF guard. Pipelock blocks RFC1918-resolved - destinations by default, which would catch sibling-sidecar - traffic on the bottle's internal Docker network in 172.x space - (e.g. egress → pipelock on the upstream leg). Pass the - bottle's internal network CIDR here so internal-network requests - pass through pipelock while api_allowlist + body-scanning still - apply. Empty by default; omitted from the rendered yaml when - empty so pipelock keeps its built-in SSRF defaults.""" - cfg: dict[str, object] = { - "version": 1, - "mode": "strict", - "enforce": True, - "api_allowlist": pipelock_effective_allowlist(bottle, provider_routes), - "forward_proxy": {"enabled": True}, - } - if not pipelock_seed_phrase_detection_enabled(bottle): - cfg["seed_phrase_detection"] = {"enabled": False} - cfg["dlp"] = {"include_defaults": True, "scan_env": True} - # Body-scan enforcement is a separate pipelock section (each DLP - # "surface" — body, MCP, response — has its own action). Pipelock's - # built-in default for request_body_scanning is "warn" (forward - # with a log line); bot-bottle hard-codes "block" so a hit - # actually stops the request from leaving the egress network. - # - # `scan_headers: true` + `header_mode: all` extends the scan to - # every request header — pipelock's default `header_mode: - # sensitive` only checks Authorization / Cookie / X-Api-Key / - # X-Token / Proxy-Authorization / X-Goog-Api-Key, which an - # agent attempting to exfil could trivially avoid by picking - # a non-sensitive header name. "all" closes the gap; pipelock - # caps it at the same max_body_bytes the body scan uses. - cfg["request_body_scanning"] = { - "action": "block", - "scan_headers": True, - "header_mode": "all", - } - if ca_cert_path or ca_key_path: - if not (ca_cert_path and ca_key_path): - raise ValueError( - "pipelock_build_config: pass both ca_cert_path and ca_key_path " - "to enable tls_interception, or neither to leave it off" - ) - cfg["tls_interception"] = { - "enabled": True, - "ca_cert": ca_cert_path, - "ca_key": ca_key_path, - "passthrough_domains": pipelock_effective_tls_passthrough(bottle, provider_routes), - } - effective_ssrf_ip_allowlist = pipelock_effective_ssrf_ip_allowlist( - bottle, ssrf_ip_allowlist, - ) - if effective_ssrf_ip_allowlist: - cfg["ssrf"] = {"ip_allowlist": effective_ssrf_ip_allowlist} - return cfg - - -_PIPELOCK_TOP_LEVEL_KEYS = { - "version", - "mode", - "enforce", - "api_allowlist", - "seed_phrase_detection", - "forward_proxy", - "dlp", - "request_body_scanning", - "tls_interception", - "ssrf", -} - - -def _pipelock_render_error(section: str, key: str, expected: str) -> ValueError: - return ValueError( - f"pipelock_render_yaml: {section}.{key} must be {expected}" - ) - - -def _reject_unknown_keys( - section: str, - obj: dict[str, object], - allowed: set[str], -) -> None: - for key in sorted(set(obj) - allowed): - raise ValueError(f"pipelock_render_yaml: {section}.{key} is unsupported") - - -def _required_dict( - obj: dict[str, object], - section: str, - key: str, -) -> dict[str, object]: - value = obj.get(key) - if not isinstance(value, dict): - raise _pipelock_render_error(section, key, "a mapping") - return cast(dict[str, object], value) - - -def _required_bool(obj: dict[str, object], section: str, key: str) -> bool: - value = obj.get(key) - if not isinstance(value, bool): - raise _pipelock_render_error(section, key, "a boolean") - return value - - -def _required_int(obj: dict[str, object], section: str, key: str) -> int: - value = obj.get(key) - if isinstance(value, bool) or not isinstance(value, int): - raise _pipelock_render_error(section, key, "an integer") - return value - - -def _required_str(obj: dict[str, object], section: str, key: str) -> str: - value = obj.get(key) - if not isinstance(value, str): - raise _pipelock_render_error(section, key, "a string") - return value - - -def _required_str_list( - obj: dict[str, object], - section: str, - key: str, -) -> list[str]: - value = obj.get(key) - if not isinstance(value, list): - raise _pipelock_render_error(section, key, "a list of strings") - value_list = cast(list[object], value) - if not all(isinstance(v, str) for v in value_list): - raise _pipelock_render_error(section, key, "a list of strings") - return cast(list[str], value) - - -def _optional_str_list( - obj: dict[str, object], - section: str, - key: str, -) -> list[str]: - if key not in obj: - return [] - return _required_str_list(obj, section, key) - - -def _optional_bool( - obj: dict[str, object], - section: str, - key: str, -) -> bool | None: - if key not in obj: - return None - return _required_bool(obj, section, key) - - -def _optional_str( - obj: dict[str, object], - section: str, - key: str, -) -> str | None: - if key not in obj: - return None - return _required_str(obj, section, key) - - -def _validate_pipelock_render_config(cfg: dict[str, object]) -> dict[str, object]: - _reject_unknown_keys("config", cfg, _PIPELOCK_TOP_LEVEL_KEYS) - normalized: dict[str, object] = { - "version": _required_int(cfg, "config", "version"), - "mode": _required_str(cfg, "config", "mode"), - "enforce": _required_bool(cfg, "config", "enforce"), - "api_allowlist": _required_str_list(cfg, "config", "api_allowlist"), - } - - if "seed_phrase_detection" in cfg: - spd = _required_dict(cfg, "config", "seed_phrase_detection") - _reject_unknown_keys("seed_phrase_detection", spd, {"enabled"}) - normalized["seed_phrase_detection"] = { - "enabled": _required_bool(spd, "seed_phrase_detection", "enabled"), - } - - fp = _required_dict(cfg, "config", "forward_proxy") - _reject_unknown_keys("forward_proxy", fp, {"enabled"}) - normalized["forward_proxy"] = { - "enabled": _required_bool(fp, "forward_proxy", "enabled"), - } - - dlp = _required_dict(cfg, "config", "dlp") - _reject_unknown_keys("dlp", dlp, {"include_defaults", "scan_env"}) - normalized["dlp"] = { - "include_defaults": _required_bool(dlp, "dlp", "include_defaults"), - "scan_env": _required_bool(dlp, "dlp", "scan_env"), - } - - rbs = _required_dict(cfg, "config", "request_body_scanning") - _reject_unknown_keys( - "request_body_scanning", - rbs, - {"action", "scan_headers", "header_mode"}, - ) - normalized_rbs: dict[str, object] = { - "action": _required_str(rbs, "request_body_scanning", "action"), - } - scan_headers = _optional_bool(rbs, "request_body_scanning", "scan_headers") - if scan_headers is not None: - normalized_rbs["scan_headers"] = scan_headers - header_mode = _optional_str(rbs, "request_body_scanning", "header_mode") - if header_mode is not None: - normalized_rbs["header_mode"] = header_mode - normalized["request_body_scanning"] = normalized_rbs - - if "tls_interception" in cfg: - tls = _required_dict(cfg, "config", "tls_interception") - _reject_unknown_keys( - "tls_interception", - tls, - {"enabled", "ca_cert", "ca_key", "passthrough_domains"}, - ) - normalized["tls_interception"] = { - "enabled": _required_bool(tls, "tls_interception", "enabled"), - "ca_cert": _required_str(tls, "tls_interception", "ca_cert"), - "ca_key": _required_str(tls, "tls_interception", "ca_key"), - "passthrough_domains": _optional_str_list( - tls, "tls_interception", "passthrough_domains", - ), - } - - if "ssrf" in cfg: - ssrf = _required_dict(cfg, "config", "ssrf") - _reject_unknown_keys("ssrf", ssrf, {"ip_allowlist"}) - normalized["ssrf"] = { - "ip_allowlist": _required_str_list(ssrf, "ssrf", "ip_allowlist"), - } - - return normalized - - -def pipelock_render_yaml(cfg: dict[str, object]) -> str: - """Render a pipelock config dict (as produced by - `pipelock_build_config`) as YAML. Hand-rolled so we don't take a - YAML-parser dependency for a fixed, narrow shape.""" - def _bool(b: object) -> str: - return "true" if b else "false" - - cfg = _validate_pipelock_render_config(cfg) - lines: list[str] = [] - lines.append(f"version: {cfg['version']}") - lines.append(f"mode: {cfg['mode']}") - lines.append(f"enforce: {_bool(cast(bool, cfg['enforce']))}") - lines.append("") - lines.append("api_allowlist:") - api_allowlist = cast(list[str], cfg["api_allowlist"]) - for h in api_allowlist: - lines.append(f' - "{h}"') - lines.append("") - if "seed_phrase_detection" in cfg: - lines.append("seed_phrase_detection:") - spd = cast(dict[str, object], cfg["seed_phrase_detection"]) - lines.append(f" enabled: {_bool(cast(bool, spd['enabled']))}") - lines.append("") - lines.append("forward_proxy:") - fp = cast(dict[str, object], cfg["forward_proxy"]) - lines.append(f" enabled: {_bool(cast(bool, fp['enabled']))}") - lines.append("") - lines.append("dlp:") - dlp = cast(dict[str, object], cfg["dlp"]) - lines.append(f" include_defaults: {_bool(cast(bool, dlp['include_defaults']))}") - lines.append(f" scan_env: {_bool(cast(bool, dlp['scan_env']))}") - lines.append("") - lines.append("request_body_scanning:") - rbs = cast(dict[str, object], cfg["request_body_scanning"]) - lines.append(f' action: "{cast(str, rbs["action"])}"') - if "scan_headers" in rbs: - lines.append(f" scan_headers: {_bool(cast(bool, rbs['scan_headers']))}") - if "header_mode" in rbs: - lines.append(f' header_mode: "{cast(str, rbs["header_mode"])}"') - if "tls_interception" in cfg: - lines.append("") - lines.append("tls_interception:") - tls = cast(dict[str, object], cfg["tls_interception"]) - lines.append(f" enabled: {_bool(cast(bool, tls['enabled']))}") - lines.append(f' ca_cert: "{cast(str, tls["ca_cert"])}"') - lines.append(f' ca_key: "{cast(str, tls["ca_key"])}"') - passthrough = cast(list[str], tls["passthrough_domains"]) - if passthrough: - lines.append(" passthrough_domains:") - for d in passthrough: - lines.append(f' - "{d}"') - if "ssrf" in cfg: - lines.append("") - lines.append("ssrf:") - ssrf = cast(dict[str, object], cfg["ssrf"]) - lines.append(" ip_allowlist:") - ip_allowlist = cast(list[str], ssrf["ip_allowlist"]) - for ip in ip_allowlist: - lines.append(f' - "{ip}"') - return "\n".join(lines) + "\n" - - -# --- Proxy class ----------------------------------------------------------- - - -@dataclass(frozen=True) -class PipelockProxyPlan: - """Output of PipelockProxy.prepare; consumed by .start when the - sidecar needs to be brought up. - - yaml_path + slug are filled in at prepare time (host-side, side- - effect-free; the YAML references the in-container CA paths - already so it doesn't need the host paths to be valid). The - remaining fields are populated by the backend's launch step - via `dataclasses.replace`: internal/egress networks once - those networks exist, the CA host paths once the one-shot - `pipelock tls init` has run, and `internal_network_cidr` once - Docker has assigned a subnet to the internal network. Empty - defaults are sentinels meaning "not yet set"; `.start` validates - that they are populated. - - `internal_network_cidr` ends up on pipelock's `ssrf.ip_allowlist` - so traffic from sibling sidecars (egress → pipelock on the - upstream leg, etc.) bypasses pipelock's RFC1918 SSRF guard while - api_allowlist and body-scanning still apply.""" - - yaml_path: Path - slug: str - internal_network: str = "" - internal_network_cidr: str = "" - egress_network: str = "" - ca_cert_host_path: Path = Path() - ca_key_host_path: Path = Path() - - -class PipelockProxy: - """The pipelock egress proxy. Encapsulates the YAML-config - generation; the container lifecycle is owned by whatever - wraps the daemon (compose-managed pipelock container on docker, - sidecar-bundle PID 1 on smolmachines). - - Backends instantiate the class directly — there are no - platform-specific subclasses; the in-container CA paths are - universal module-level constants - (`PIPELOCK_CA_CERT_IN_CONTAINER` / `PIPELOCK_CA_KEY_IN_CONTAINER`).""" - - def prepare( - self, - bottle: Bottle, - slug: str, - stage_dir: Path, - provider_routes: tuple[EgressRoute, ...] = (), - ) -> PipelockProxyPlan: - """Write the pipelock yaml config (mode 600) under `stage_dir` - and return the plan for launch. Pure host-side, no docker - subprocess. - - `slug` is the agent-derived identifier (lowercased, - hyphen-normalized) used as the suffix in every per-agent - resource name — the agent container, the sidecar bundle - container, the internal/egress networks. It's stored on the - returned plan so the backend's launch step can derive those - names. - - The CA paths the YAML references are the module-level - in-container constants. The host-side counterparts are - generated by the launch step (not here, so prepare stays - side-effect-free on docker) and added to the plan via - `dataclasses.replace` before the daemon starts.""" - yaml_path = stage_dir / "pipelock.yaml" - cfg = pipelock_build_config( - bottle, - ca_cert_path=PIPELOCK_CA_CERT_IN_CONTAINER, - ca_key_path=PIPELOCK_CA_KEY_IN_CONTAINER, - provider_routes=provider_routes, - ) - yaml_path.write_text(pipelock_render_yaml(cfg)) - yaml_path.chmod(0o600) - return PipelockProxyPlan(yaml_path=yaml_path, slug=slug) diff --git a/tests/canaries/test_pipelock_image.py b/tests/canaries/test_pipelock_image.py deleted file mode 100644 index 9bfc5d4..0000000 --- a/tests/canaries/test_pipelock_image.py +++ /dev/null @@ -1,45 +0,0 @@ -"""Canary: the pinned pipelock image's binary actually runs. - -This test exists to catch a broken upstream packaging at the pinned -digest. It is NOT part of the per-push suite — that would couple every -dev push to upstream registry availability. Set -BOT_BOTTLE_RUN_CANARIES=1 to opt in (a scheduled CI workflow does -this; humans can run it ad-hoc the same way). -""" - -import os -import subprocess -import unittest - -from bot_bottle.backend.docker.pipelock import PIPELOCK_IMAGE -from tests._docker import skip_unless_docker - - -@unittest.skipUnless( - os.environ.get("BOT_BOTTLE_RUN_CANARIES") == "1", - "canary suite is opt-in; set BOT_BOTTLE_RUN_CANARIES=1 to run", -) -@skip_unless_docker() -class TestPipelockImage(unittest.TestCase): - @classmethod - def setUpClass(cls): - result = subprocess.run( - ["docker", "pull", PIPELOCK_IMAGE], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - check=False, - ) - if result.returncode != 0: - raise unittest.SkipTest(f"could not pull {PIPELOCK_IMAGE}") - - def test_binary_runs(self): - result = subprocess.run( - ["docker", "run", "--rm", PIPELOCK_IMAGE, "--version"], - capture_output=True, text=True, check=False, - ) - out = result.stdout + result.stderr - self.assertRegex(out, r"[Pp]ipelock|2\.[0-9]+\.[0-9]+") - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/integration/test_pipelock_allow_node.py b/tests/integration/test_pipelock_allow_node.py deleted file mode 100644 index 047df68..0000000 --- a/tests/integration/test_pipelock_allow_node.py +++ /dev/null @@ -1,110 +0,0 @@ -"""Integration: a Node request to a host on pipelock's allowlist is -tunneled through. - -End-to-end mirror of test_pipelock_block_node: drives `BottleBackend. -prepare → launch` so the real image build, network plumbing, and -pipelock sidecar are all in the loop. Inside the bottle, a Node -script issues an HTTPS CONNECT for raw.githubusercontent.com:443 — -a host in the baked-in default allowlist — through `$HTTPS_PROXY`. -Pipelock must answer 200 Connection Established. The 200 vs. 403 -split on CONNECT is decided by pipelock itself (the remote never -sees the CONNECT verb), so it isolates the allowlist decision from -anything the remote might return. -""" - -from __future__ import annotations - -import os -import shutil -import tempfile -import unittest -from pathlib import Path - -from bot_bottle.backend import BottleSpec, get_bottle_backend -from tests._docker import skip_unless_docker -from tests.fixtures import fixture_minimal - - -# Output contract (parsed by the test): -# - "connect=" proxy upgraded to a tunnel (CONNECT success path) -# - "status=" proxy answered without tunneling (block path) -# - "error= " transport-level failure -# - "timeout" request hung -_PROBE_JS = r""" -const http = require('http'); -const proxy = new URL(process.env.HTTPS_PROXY); -const req = http.request({ - host: proxy.hostname, - port: proxy.port, - method: 'CONNECT', - path: 'raw.githubusercontent.com:443', -}); -req.on('connect', (res, socket) => { - console.log('connect=' + res.statusCode); - socket.destroy(); - process.exit(0); -}); -req.on('response', (res) => { - res.resume(); - res.on('end', () => { - console.log('status=' + res.statusCode); - process.exit(0); - }); -}); -req.on('error', (e) => { - console.log('error=' + (e.code || '') + ' ' + e.message); - process.exit(0); -}); -req.setTimeout(5000, () => { - console.log('timeout'); - req.destroy(); -}); -req.end(); -""" - - -@skip_unless_docker() -class TestPipelockAllowsNode(unittest.TestCase): - @unittest.skipIf( - os.environ.get("GITEA_ACTIONS") == "true", - "skipped under act_runner: docker socket mount topology breaks " - "in-process visibility of networks created on the host daemon", - ) - def test_node_request_to_allowed_host_is_tunneled(self): - backend = get_bottle_backend() - stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage.")) - try: - spec = BottleSpec( - manifest=fixture_minimal(), - agent_name="demo", - copy_cwd=False, - user_cwd=str(stage_dir), - ) - plan = backend.prepare(spec, stage_dir=stage_dir) - with backend.launch(plan) as bottle: - script = ( - "set -e\n" - "cat > /tmp/probe.js <<'PROBE_EOF'\n" - f"{_PROBE_JS}\n" - "PROBE_EOF\n" - "node /tmp/probe.js\n" - ) - result = bottle.exec(script) - finally: - shutil.rmtree(stage_dir, ignore_errors=True) - - self.assertEqual( - 0, result.returncode, - f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}", - ) - # raw.githubusercontent.com IS in fixture_minimal's effective - # allowlist (baked-in default). Pipelock must answer the CONNECT - # with 200 Connection Established. - self.assertIn( - "connect=200", result.stdout, - f"pipelock should have tunneled to raw.githubusercontent.com; got: {result.stdout!r}", - ) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/integration/test_pipelock_allows_normal_https.py b/tests/integration/test_pipelock_allows_normal_https.py deleted file mode 100644 index 8342512..0000000 --- a/tests/integration/test_pipelock_allows_normal_https.py +++ /dev/null @@ -1,83 +0,0 @@ -"""Integration: with pipelock's tls_interception enabled (PRD 0006), -a clean HTTPS GET to an allowlisted host succeeds end-to-end through -the bumped tunnel. - -Complement to test_pipelock_blocks_secret_https_post — together they -pin pipelock's two paths (block on body match, allow on clean -traffic). This test is also the implicit TLS-trust check: if -provision_ca had failed to install pipelock's CA into the agent's -trust store, curl would have rejected the bumped leaf cert and the -fetch would have failed before any HTTP response could come back.""" - -from __future__ import annotations - -import os -import shutil -import tempfile -import unittest -from pathlib import Path - -from bot_bottle.backend import BottleSpec, get_bottle_backend -from tests._docker import skip_unless_docker -from tests.fixtures import fixture_minimal - - -# raw.githubusercontent.com is in the baked-in DEFAULT_ALLOWLIST. -# `git`'s own README on the master branch is a long-lived raw file -# (~3 KB) that any CI runner with internet can fetch. -_TARGET_URL = "https://raw.githubusercontent.com/git/git/master/README.md" - - -@skip_unless_docker() -class TestPipelockAllowsNormalHttps(unittest.TestCase): - @unittest.skipIf( - os.environ.get("GITEA_ACTIONS") == "true", - "skipped under act_runner: docker socket mount topology breaks " - "in-process visibility of networks created on the host daemon", - ) - def test_https_get_to_allowed_host_succeeds(self): - backend = get_bottle_backend() - stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage.")) - try: - spec = BottleSpec( - manifest=fixture_minimal(), - agent_name="demo", - copy_cwd=False, - user_cwd=str(stage_dir), - ) - plan = backend.prepare(spec, stage_dir=stage_dir) - with backend.launch(plan) as bottle: - script = ( - "set -eu\n" - 'curl --proxy "$HTTPS_PROXY" -s --max-time 10 \\\n' - " -w 'status=%{http_code}\\n' \\\n" - " -o /tmp/probe-body.txt \\\n" - f" {_TARGET_URL}\n" - 'echo "len=$(wc -c < /tmp/probe-body.txt)"\n' - ) - result = bottle.exec(script) - finally: - shutil.rmtree(stage_dir, ignore_errors=True) - - self.assertEqual( - 0, result.returncode, - f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}", - ) - # 200 from the upstream (pipelock forwarded after the body - # scan passed). If curl had failed the bumped-cert trust - # check, the exit code or status would be non-200 here. - self.assertIn( - "status=200", result.stdout, - f"expected 200 from raw.githubusercontent.com; got: {result.stdout!r}", - ) - # The git README is ~3 KB. Anything substantially non-zero - # proves the response body actually transferred — i.e. the - # CONNECT tunnel + bumped TLS + body forwarding all worked. - self.assertNotIn( - "len=0\n", result.stdout, - f"response body was empty: {result.stdout!r}", - ) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/integration/test_pipelock_apply.py b/tests/integration/test_pipelock_apply.py deleted file mode 100644 index e8670ab..0000000 --- a/tests/integration/test_pipelock_apply.py +++ /dev/null @@ -1,210 +0,0 @@ -"""Integration: drive `apply_allowlist_change` against a real -pipelock sidecar (PRD 0015). - -Brings up a real pipelock container via direct `docker run` (the -old `.start()` helper went away in PRD 0024 chunk 3), calls -apply_allowlist_change to swap the api_allowlist, restarts -pipelock, and verifies the running container now serves the new -yaml. - -The hot-reload code path under test (apply_allowlist_change, -fetch_current_yaml, fetch_current_allowlist) is unchanged from -PRD 0015 — only the test's bringup helper moved. - -Setup uses pipelock_tls_init which bind-mounts a host path into a -one-shot pipelock container — that doesn't work in DinD, so the -test skips under GITEA_ACTIONS. -""" - -from __future__ import annotations - -import os -import shutil -import subprocess -import tempfile -import time -import unittest -from pathlib import Path - -from bot_bottle.backend.docker.bottle_state import pipelock_state_dir -from bot_bottle.backend.docker.network import ( - network_create_egress, - network_create_internal, - network_remove, -) -from bot_bottle.pipelock import ( - PIPELOCK_CA_CERT_IN_CONTAINER, - PIPELOCK_CA_KEY_IN_CONTAINER, -) -from bot_bottle.backend.docker.pipelock import pipelock_tls_init -from bot_bottle.pipelock import PipelockProxy -from bot_bottle.backend.docker.pipelock_apply import ( - PipelockApplyError, - apply_allowlist_change, - fetch_current_allowlist, - fetch_current_yaml, -) -from bot_bottle.backend.docker.sidecar_bundle import ( - SIDECAR_BUNDLE_IMAGE, - sidecar_bundle_container_name, -) -from bot_bottle.yaml_subset import parse_yaml_subset -from tests._docker import skip_unless_docker -from tests.fixtures import fixture_minimal - - -@skip_unless_docker() -@unittest.skipIf( - os.environ.get("GITEA_ACTIONS") == "true", - "skipped under act_runner: pipelock_tls_init uses a host bind mount " - "that doesn't share fs with the runner container", -) -class TestPipelockApply(unittest.TestCase): - def setUp(self): - self.slug = f"cb-test-pla-{os.getpid()}-{int(time.time())}" - self.sidecar_name = "" - self.internal_net = "" - self.egress_net = "" - self.work_dir = Path(tempfile.mkdtemp(prefix="pipelock-apply.")) - - def tearDown(self): - if self.sidecar_name: - subprocess.run( - ["docker", "rm", "-f", self.sidecar_name], - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False, - ) - for n in (self.internal_net, self.egress_net): - if n: - network_remove(n) - shutil.rmtree(self.work_dir, ignore_errors=True) - # Clean up the per-slug state dir under ~/.bot-bottle/state/ - # (apply_allowlist_change writes there; _bring_up calls - # proxy.prepare with the same path so the bind-mount and the - # hot-reload write target stay coherent). - shutil.rmtree(pipelock_state_dir(self.slug), ignore_errors=True) - - def _bring_up(self) -> None: - """Brings up the bundle image with only the pipelock daemon - selected. The bundle's Python supervisor is PID 1, which is - what apply_allowlist_change targets via `docker kill - --signal USR1` — pipelock alone as PID 1 wouldn't survive - SIGUSR1 (default disposition = terminate). This shape is - what runs in production minus the other three daemons. - - The yaml stages into the production-real - `pipelock_state_dir(slug)` (not a private temp dir) so the - bind-mount target matches what `apply_allowlist_change` - writes to — otherwise the hot-reload would write to a - nowhere-mounted host path and the container would never see - the updated config.""" - state_dir = pipelock_state_dir(self.slug) - state_dir.mkdir(parents=True, exist_ok=True) - prep = PipelockProxy().prepare( - fixture_minimal().bottles["dev"], self.slug, state_dir, - ) - self.internal_net = network_create_internal(self.slug) - self.egress_net = network_create_egress(self.slug) - ca_cert_host, ca_key_host = pipelock_tls_init(state_dir) - - # Ensure the bundle image is built. compose normally builds - # this lazily; we go through `docker run` here so we have to - # do it ourselves. Idempotent — cached layers make repeats - # fast. - repo_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) - subprocess.run( - ["docker", "build", - "-t", SIDECAR_BUNDLE_IMAGE, - "-f", "Dockerfile.sidecars", "."], - cwd=repo_root, check=True, capture_output=True, - ) - - self.sidecar_name = sidecar_bundle_container_name(self.slug) - subprocess.run( - ["docker", "create", - "--name", self.sidecar_name, - "--network", self.internal_net, - "-e", "BOT_BOTTLE_SIDECAR_DAEMONS=pipelock", - "-v", f"{prep.yaml_path}:/etc/pipelock.yaml:ro", - "-v", f"{ca_cert_host}:{PIPELOCK_CA_CERT_IN_CONTAINER}:ro", - "-v", f"{ca_key_host}:{PIPELOCK_CA_KEY_IN_CONTAINER}:ro", - SIDECAR_BUNDLE_IMAGE], - check=True, capture_output=True, - ) - subprocess.run( - ["docker", "network", "connect", self.egress_net, self.sidecar_name], - check=True, capture_output=True, - ) - subprocess.run( - ["docker", "start", self.sidecar_name], - check=True, capture_output=True, - ) - # Wait until fetch_current_yaml succeeds — it's a docker cp - # which works on a started-but-not-yet-ready pipelock, so - # this is more of a "container exists" probe than a - # readiness one; the hot-reload tests below tolerate - # pipelock briefly being slow to serve. - deadline = time.monotonic() + 15.0 - while time.monotonic() < deadline: - try: - fetch_current_yaml(self.slug) - return - except PipelockApplyError: - pass - time.sleep(0.25) - raise AssertionError("pipelock sidecar never became reachable") - - def _wait_for_yaml(self, contains: str, *, deadline_s: float = 15.0) -> str: - """Poll docker exec until /etc/pipelock.yaml contains `contains`, - returning the yaml. Used to bridge the docker-restart window.""" - deadline = time.monotonic() + deadline_s - while time.monotonic() < deadline: - try: - yaml = fetch_current_yaml(self.slug) - if contains in yaml: - return yaml - except PipelockApplyError: - pass - time.sleep(0.25) - self.fail(f"never saw {contains!r} in /etc/pipelock.yaml") - - def test_apply_swaps_api_allowlist(self): - self._bring_up() - - initial_yaml = fetch_current_yaml(self.slug) - # fixture_minimal yields the baked-in DEFAULT_ALLOWLIST in - # pipelock.py; api.anthropic.com is in there. - self.assertIn("api.anthropic.com", initial_yaml) - - new_content = "api.anthropic.com\nnew-host.example\n" - before, after = apply_allowlist_change(self.slug, new_content) - self.assertIn("api.anthropic.com", before) - self.assertNotIn("new-host.example", before) - self.assertIn("new-host.example", after) - - updated = self._wait_for_yaml("new-host.example") - cfg = parse_yaml_subset(updated) - self.assertIn("new-host.example", cfg["api_allowlist"]) # type: ignore[operator] - self.assertIn("api.anthropic.com", cfg["api_allowlist"]) # type: ignore[operator] - # tls_interception block (set up by the production prepare - # via pipelock_build_config) is preserved across the swap. - self.assertIn("tls_interception", cfg) - - def test_apply_with_invalid_host_raises(self): - self._bring_up() - with self.assertRaises(PipelockApplyError): - apply_allowlist_change(self.slug, "host with space.example\n") - - def test_fetch_current_allowlist_renders_one_per_line(self): - self._bring_up() - listing = fetch_current_allowlist(self.slug) - self.assertTrue(listing.endswith("\n")) - self.assertIn("api.anthropic.com\n", listing) - - def test_apply_against_missing_sidecar_raises(self): - # Don't bring up — the slug points at nothing. - with self.assertRaises(PipelockApplyError): - apply_allowlist_change(self.slug, "x.example\n") - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/integration/test_pipelock_block_node.py b/tests/integration/test_pipelock_block_node.py deleted file mode 100644 index 01671b6..0000000 --- a/tests/integration/test_pipelock_block_node.py +++ /dev/null @@ -1,114 +0,0 @@ -"""Integration: a Node script run inside a launched bottle, hitting -a host outside the pipelock allowlist, is blocked. - -End-to-end: drives `BottleBackend.prepare → launch` so the real -image build, network plumbing, and pipelock sidecar are all in the -loop. Inside the bottle, a Node script forms an HTTP forward-proxy -request (absolute-URI path) to `example.com` via `$HTTPS_PROXY`. The -fixture's effective allowlist contains only the baked-in defaults, -so pipelock must refuse to forward. -""" - -from __future__ import annotations - -import os -import shutil -import tempfile -import unittest -from pathlib import Path - -from bot_bottle.backend import BottleSpec, get_bottle_backend -from tests._docker import skip_unless_docker -from tests.fixtures import fixture_minimal - - -# Node's stdlib http does not respect HTTPS_PROXY on its own; this -# script builds the forward-proxy request shape by hand so the test -# is asserting on pipelock's allowlist decision, not on whatever -# proxy-env auto-detection a Node release happens to ship. -# -# Output contract (parsed by the test): -# - "status=" when the proxy returns an HTTP response -# - "error= " on a transport-level failure -# - "timeout" on a hung request -_PROBE_JS = r""" -const http = require('http'); -const proxy = new URL(process.env.HTTPS_PROXY); -const req = http.request({ - host: proxy.hostname, - port: proxy.port, - method: 'GET', - path: 'http://example.com/', - headers: { Host: 'example.com' }, -}, (res) => { - res.resume(); - res.on('end', () => { - console.log('status=' + res.statusCode); - process.exit(0); - }); -}); -req.on('error', (e) => { - console.log('error=' + (e.code || '') + ' ' + e.message); - process.exit(0); -}); -req.setTimeout(5000, () => { - console.log('timeout'); - req.destroy(); -}); -req.end(); -""" - - -@skip_unless_docker() -class TestPipelockBlocksNode(unittest.TestCase): - @unittest.skipIf( - os.environ.get("GITEA_ACTIONS") == "true", - "skipped under act_runner: docker socket mount topology breaks " - "in-process visibility of networks created on the host daemon", - ) - def test_node_request_to_blocked_host_is_rejected(self): - backend = get_bottle_backend() - stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage.")) - try: - spec = BottleSpec( - manifest=fixture_minimal(), - agent_name="demo", - copy_cwd=False, - user_cwd=str(stage_dir), - ) - plan = backend.prepare(spec, stage_dir=stage_dir) - with backend.launch(plan) as bottle: - script = ( - "set -e\n" - "cat > /tmp/probe.js <<'PROBE_EOF'\n" - f"{_PROBE_JS}\n" - "PROBE_EOF\n" - "node /tmp/probe.js\n" - ) - result = bottle.exec(script) - finally: - shutil.rmtree(stage_dir, ignore_errors=True) - - self.assertEqual( - 0, result.returncode, - f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}", - ) - # The probe always prints exactly one signal line. If it - # doesn't, the script failed in a way the test doesn't - # understand and the surrounding assertions would be - # ambiguous. - self.assertTrue( - "status=" in result.stdout or "error=" in result.stdout or "timeout" in result.stdout, - f"probe produced no recognized output: {result.stdout!r}", - ) - # The core invariant: example.com is NOT in fixture_minimal's - # effective allowlist (only the baked-in defaults), so the - # proxy must not have forwarded a successful response. - self.assertNotIn( - "status=200", result.stdout, - "example.com is outside the allowlist; pipelock should not have forwarded a 200", - ) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/integration/test_pipelock_blocks_secret_https_post.py b/tests/integration/test_pipelock_blocks_secret_https_post.py deleted file mode 100644 index b1d1320..0000000 --- a/tests/integration/test_pipelock_blocks_secret_https_post.py +++ /dev/null @@ -1,101 +0,0 @@ -"""Integration: with pipelock's tls_interception enabled (PRD 0006), -a credential POST sent over HTTPS is blocked by pipelock's body-scan -layer — closing the gap that motivated this PRD. - -End-to-end: drives `BottleBackend.prepare → launch` so the real -image build, network plumbing, pipelock_tls_init, sidecar bring-up, -and provision_ca (CA install in the agent's trust store) are all in -the loop. The probe is a single `curl --proxy "$HTTPS_PROXY" -X POST -... https://raw.githubusercontent.com/...` — curl natively does -CONNECT through the proxy, the agent's trust store now contains -pipelock's per-bottle CA so curl trusts pipelock's bumped leaf, and -pipelock sees the decrypted body and returns its known -`blocked: request body contains secret: ` 403. - -The host has to be allowlisted (so the CONNECT is accepted) but must -not opt into `pipelock.tls_passthrough` (so the body actually gets -scanned). This probe targets `raw.githubusercontent.com`, which is on -the baked allowlist and intercepted+scanned like any non-passthrough -host.""" - -from __future__ import annotations - -import os -import shutil -import tempfile -import unittest -from pathlib import Path - -from bot_bottle.backend import BottleSpec, get_bottle_backend -from bot_bottle.manifest import Manifest -from tests._docker import skip_unless_docker - - -# Synthetic value shaped like a GitHub Personal Access Token; not a -# real credential. Carried into the bottle as an env var so the -# probe shell can read it via $FAKE_TOKEN without ever interpolating -# the value on the bash `bottle.exec` argv. -_FAKE_TOKEN = "ghp_aB3cD4eF5gH6iJ7kL8mN9oP0qR1sT2uV3wX4yZ" - - -@skip_unless_docker() -class TestPipelockBlocksSecretHttpsPost(unittest.TestCase): - @unittest.skipIf( - os.environ.get("GITEA_ACTIONS") == "true", - "skipped under act_runner: docker socket mount topology breaks " - "in-process visibility of networks created on the host daemon", - ) - def test_https_post_with_credential_body_is_blocked(self): - manifest = Manifest.from_json_obj({ - "bottles": { - "dev": {"env": {"FAKE_TOKEN": _FAKE_TOKEN}}, - }, - "agents": { - "demo": {"skills": [], "prompt": "", "bottle": "dev"}, - }, - }) - backend = get_bottle_backend() - stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage.")) - try: - spec = BottleSpec( - manifest=manifest, - agent_name="demo", - copy_cwd=False, - user_cwd=str(stage_dir), - ) - plan = backend.prepare(spec, stage_dir=stage_dir) - with backend.launch(plan) as bottle: - script = ( - "set -eu\n" - 'curl --proxy "$HTTPS_PROXY" -s --max-time 8 \\\n' - " -w 'status=%{http_code}\\n' \\\n" - " -o /tmp/probe-body.txt \\\n" - ' -X POST -d "token=$FAKE_TOKEN" \\\n' - " https://raw.githubusercontent.com/dlp-probe\n" - 'echo "body=$(head -c 200 /tmp/probe-body.txt)"\n' - ) - result = bottle.exec(script) - finally: - shutil.rmtree(stage_dir, ignore_errors=True) - - self.assertEqual( - 0, result.returncode, - f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}", - ) - # Pipelock's body-scan block returns 403 with a plain-text - # body starting `blocked: ` (pinned empirically; see - # tests/unit/test_mitmproxy_verdict.py for the - # corresponding-fingerprint test, retained from PR #8 as - # general pipelock-block-shape coverage). - self.assertIn( - "status=403", result.stdout, - f"expected 403 from pipelock; got: {result.stdout!r}", - ) - self.assertIn( - "body=blocked: ", result.stdout, - f"expected pipelock block body; got: {result.stdout!r}", - ) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/integration/test_pipelock_blocks_secret_post.py b/tests/integration/test_pipelock_blocks_secret_post.py deleted file mode 100644 index a97ff33..0000000 --- a/tests/integration/test_pipelock_blocks_secret_post.py +++ /dev/null @@ -1,132 +0,0 @@ -"""Integration: pipelock blocks a POST whose body carries a -recognized credential pattern, even when the host is on the -allowlist. - -End-to-end companion to the block / allow node tests. The manifest -carries a literal env var whose value matches pipelock's DLP rules. -A Node script POSTs that value to an allowlisted host via plain -HTTP forward proxy (absolute-URI form) so pipelock can scan the -body — routing the same request over CONNECT would tunnel TLS -opaquely and the DLP layer would have nothing to see. The 403 -return from pipelock isolates the body-scan layer as the active -control, distinct from the host-allowlist decision the other two -tests pin down. -""" - -from __future__ import annotations - -import os -import shutil -import tempfile -import unittest -from pathlib import Path - -from bot_bottle.backend import BottleSpec, get_bottle_backend -from bot_bottle.manifest import Manifest -from tests._docker import skip_unless_docker - - -# Synthetic value shaped like a GitHub Personal Access Token -# (`ghp_` + 36 alnum chars). Not a real token; the only relevant -# property is that pipelock's default DLP rules recognize the -# shape. Kept obviously dummy so a stray grep can't mistake it -# for a real credential. -_FAKE_TOKEN = "ghp_aB3cD4eF5gH6iJ7kL8mN9oP0qR1sT2uV3wX4yZ" - - -# Output contract (parsed by the test): -# - "status=" proxy answered with an HTTP response -# - "error= " transport-level failure -# - "timeout" request hung -_PROBE_JS = r""" -const http = require('http'); -const proxy = new URL(process.env.HTTPS_PROXY); -const body = 'token=' + process.env.FAKE_TOKEN; -const req = http.request({ - host: proxy.hostname, - port: proxy.port, - method: 'POST', - // Absolute-URI form: pipelock acts as a plain HTTP forward proxy - // and the body is visible to its DLP scanner. CONNECT would - // tunnel TLS bytes that pipelock can't see into. - path: 'http://api.anthropic.com/dlp-probe', - headers: { - Host: 'api.anthropic.com', - 'Content-Type': 'application/x-www-form-urlencoded', - 'Content-Length': Buffer.byteLength(body), - }, -}, (res) => { - res.resume(); - res.on('end', () => { - console.log('status=' + res.statusCode); - process.exit(0); - }); -}); -req.on('error', (e) => { - console.log('error=' + (e.code || '') + ' ' + e.message); - process.exit(0); -}); -req.setTimeout(5000, () => { - console.log('timeout'); - req.destroy(); -}); -req.write(body); -req.end(); -""" - - -@skip_unless_docker() -class TestPipelockBlocksSecretPost(unittest.TestCase): - @unittest.skipIf( - os.environ.get("GITEA_ACTIONS") == "true", - "skipped under act_runner: docker socket mount topology breaks " - "in-process visibility of networks created on the host daemon", - ) - def test_post_with_credential_body_is_blocked(self): - manifest = Manifest.from_json_obj({ - "bottles": { - "dev": {"env": {"FAKE_TOKEN": _FAKE_TOKEN}}, - }, - "agents": { - "demo": {"skills": [], "prompt": "", "bottle": "dev"}, - }, - }) - backend = get_bottle_backend() - stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage.")) - try: - spec = BottleSpec( - manifest=manifest, - agent_name="demo", - copy_cwd=False, - user_cwd=str(stage_dir), - ) - plan = backend.prepare(spec, stage_dir=stage_dir) - with backend.launch(plan) as bottle: - script = ( - "set -e\n" - "cat > /tmp/probe.js <<'PROBE_EOF'\n" - f"{_PROBE_JS}\n" - "PROBE_EOF\n" - "node /tmp/probe.js\n" - ) - result = bottle.exec(script) - finally: - shutil.rmtree(stage_dir, ignore_errors=True) - - self.assertEqual( - 0, result.returncode, - f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}", - ) - # api.anthropic.com is on the baked-in allowlist, so the - # host-allowlist layer would have let this through. Pipelock's - # DLP body-scan layer must catch the credential pattern and - # answer 403; any other code means the body reached the - # upstream. - self.assertIn( - "status=403", result.stdout, - f"pipelock DLP should have blocked the credential POST; got: {result.stdout!r}", - ) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/integration/test_pipelock_llm_passthrough.py b/tests/integration/test_pipelock_llm_passthrough.py deleted file mode 100644 index f2b008d..0000000 --- a/tests/integration/test_pipelock_llm_passthrough.py +++ /dev/null @@ -1,107 +0,0 @@ -"""Integration: route-owned `pipelock.tls_passthrough` renders into -pipelock's `tls_interception.passthrough_domains`, so request bodies -that would otherwise trip the body-scan layer are not inspected and the -request reaches the provider TLS endpoint. - -Probe: POST the canonical zero-entropy 12-word BIP-39 mnemonic -(`abandon` × 11 + `about`) — checksum-valid by construction — to -`https://api.anthropic.com/v1/messages`. With the route policy, -pipelock relays the CONNECT opaquely and the upstream replies with -whatever it likes (401/4xx from Anthropic for an unauthenticated junk -POST). We assert that the verdict is NOT pipelock's block. -""" - -from __future__ import annotations - -import os -import shutil -import tempfile -import unittest -from pathlib import Path - -from bot_bottle.backend import BottleSpec, get_bottle_backend -from bot_bottle.manifest import Manifest -from tests._docker import skip_unless_docker - - -# Canonical BIP-39 12-word test mnemonic. Valid SHA-256 checksum — -# pipelock's seed-phrase scanner (default `verify_checksum: true`) -# fires on this exact string if it ever sees the cleartext body. -_BIP39_PHRASE = ( - "abandon abandon abandon abandon abandon abandon " - "abandon abandon abandon abandon abandon about" -) - - -@skip_unless_docker() -class TestPipelockLlmPassthrough(unittest.TestCase): - @unittest.skipIf( - os.environ.get("GITEA_ACTIONS") == "true", - "skipped under act_runner: docker socket mount topology breaks " - "in-process visibility of networks created on the host daemon", - ) - def test_bip39_body_to_anthropic_is_not_blocked(self): - manifest = Manifest.from_json_obj({ - "bottles": { - "dev": { - "env": {"SEED": _BIP39_PHRASE}, - "egress": {"routes": [{ - "host": "api.anthropic.com", - "pipelock": {"tls_passthrough": True}, - }]}, - }, - }, - "agents": { - "demo": {"skills": [], "prompt": "", "bottle": "dev"}, - }, - }) - backend = get_bottle_backend() - stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage.")) - try: - spec = BottleSpec( - manifest=manifest, - agent_name="demo", - copy_cwd=False, - user_cwd=str(stage_dir), - ) - plan = backend.prepare(spec, stage_dir=stage_dir) - with backend.launch(plan) as bottle: - script = ( - "set -eu\n" - 'curl --proxy "$HTTPS_PROXY" -s --max-time 10 \\\n' - " -w 'status=%{http_code}\\n' \\\n" - " -o /tmp/probe-body.txt \\\n" - ' -X POST -H "content-type: application/json" \\\n' - ' --data "{\\"phrase\\": \\"$SEED\\"}" \\\n' - " https://api.anthropic.com/v1/messages\n" - 'echo "body=$(head -c 200 /tmp/probe-body.txt)"\n' - ) - result = bottle.exec(script) - finally: - shutil.rmtree(stage_dir, ignore_errors=True) - - self.assertEqual( - 0, result.returncode, - f"exec wrapper failed: stdout={result.stdout!r} " - f"stderr={result.stderr!r}", - ) - # The pipelock block verdict starts with `blocked: ` in the - # body. Anything else (auth error, 401, 4xx from Anthropic) is - # an acceptable outcome — it means the body was NOT inspected - # by the proxy and the request was relayed to the upstream - # TLS endpoint. - self.assertNotIn( - "body=blocked: ", result.stdout, - f"unexpected pipelock body-scan block on api.anthropic.com; " - f"expected passthrough to skip MITM. got: {result.stdout!r}", - ) - self.assertNotIn( - "BIP-39", result.stdout, - f"BIP-39 verdict should never appear for api.anthropic.com " - f"requests under tls_interception.passthrough_domains; " - f"got: {result.stdout!r}", - ) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/test_pipelock_allowlist.py b/tests/unit/test_pipelock_allowlist.py deleted file mode 100644 index 23383e1..0000000 --- a/tests/unit/test_pipelock_allowlist.py +++ /dev/null @@ -1,169 +0,0 @@ -"""Unit: pipelock_effective_allowlist — pipelock's allowlist -mirrors manifest-declared egress routes. Git upstreams declared in -`bottle.git` don't contribute; they flow through the per-agent -git-gate (PRD 0008).""" - -import unittest - -from bot_bottle.agent_provider import CODEX_HOST_CREDENTIAL_HOSTS -from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute -from bot_bottle.manifest import Manifest -from bot_bottle.pipelock import ( - pipelock_effective_allowlist, - pipelock_effective_ssrf_ip_allowlist, - pipelock_effective_tls_passthrough, -) - - -def _bottle(spec): # type: ignore - return Manifest.from_json_obj({ - "bottles": {"dev": spec}, - "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, - }).bottles["dev"] - - -def _routes(routes): # type: ignore - return {"egress": {"routes": routes}} - - -class TestEffectiveAllowlist(unittest.TestCase): - def test_empty_without_any_manifest_routes(self): - eff = pipelock_effective_allowlist(_bottle({})) - self.assertEqual([], eff) - - def test_sorted_and_deduped(self): - eff = pipelock_effective_allowlist(_bottle(_routes([ - {"host": "api.anthropic.com", - "auth": {"scheme": "Bearer", "token_ref": "T"}}, - ]))) - self.assertEqual(len(eff), len(set(eff))) - self.assertEqual(eff, sorted(eff)) - - -class TestAllowlistWithRoutes(unittest.TestCase): - def test_manifest_route_hosts_present(self): - eff = pipelock_effective_allowlist(_bottle(_routes([ - {"host": "registry.npmjs.org", - "auth": {"scheme": "Bearer", "token_ref": "N"}}, - {"host": "api.github.com", - "auth": {"scheme": "Bearer", "token_ref": "G"}}, - ]))) - self.assertIn("registry.npmjs.org", eff) - self.assertIn("api.github.com", eff) - - def test_no_baked_defaults_alongside_manifest_routes(self): - eff = pipelock_effective_allowlist(_bottle(_routes([ - {"host": "x.example", - "auth": {"scheme": "Bearer", "token_ref": "T"}}, - ]))) - self.assertEqual(["x.example"], eff) - - def test_egress_hostname_NOT_in_pipelock_allowlist(self): - # The agent never dials egress via the proxy mechanism - # — it IS the proxy. Pipelock receives upstream hostnames - # from egress's CONNECT requests, not the - # `egress` hostname itself. - eff = pipelock_effective_allowlist(_bottle(_routes([ - {"host": "x.example", - "auth": {"scheme": "Bearer", "token_ref": "T"}}, - ]))) - self.assertNotIn("egress", eff) - - def test_supervise_hostname_auto_added_when_supervise_enabled(self): - eff = pipelock_effective_allowlist(_bottle({"supervise": True})) - self.assertIn("supervise", eff) - - def test_supervise_hostname_NOT_added_when_disabled(self): - eff = pipelock_effective_allowlist(_bottle({})) - self.assertNotIn("supervise", eff) - eff_explicit = pipelock_effective_allowlist(_bottle({"supervise": False})) - self.assertNotIn("supervise", eff_explicit) - - def test_path_allowlist_does_not_affect_pipelock_allowlist(self): - # path_allowlist is enforced by egress, not pipelock. - # Pipelock only sees the upstream hostname; the path filter - # has already passed (or 403'd) at egress. - eff = pipelock_effective_allowlist(_bottle(_routes([ - {"host": "github.com", "path_allowlist": ["/x/", "/y/"]}, - ]))) - self.assertIn("github.com", eff) - for entry in eff: - self.assertFalse(entry.startswith("/")) - - -class TestTlsPassthrough(unittest.TestCase): - def test_default_empty(self): - passthrough = pipelock_effective_tls_passthrough(_bottle({})) - self.assertEqual([], passthrough) - - def test_route_hosts_not_added_to_passthrough_by_default(self): - passthrough = pipelock_effective_tls_passthrough(_bottle(_routes([ - {"host": "api.github.com", - "auth": {"scheme": "Bearer", "token_ref": "G"}}, - {"host": "registry.npmjs.org", - "auth": {"scheme": "Bearer", "token_ref": "N"}}, - ]))) - self.assertEqual([], passthrough) - - def test_route_policy_adds_tls_passthrough(self): - passthrough = pipelock_effective_tls_passthrough(_bottle(_routes([ - {"host": "api.openai.com", - "auth": {"scheme": "Bearer", "token_ref": "O"}, - "pipelock": {"tls_passthrough": True}}, - {"host": "api.github.com", - "auth": {"scheme": "Bearer", "token_ref": "G"}}, - ]))) - self.assertEqual(["api.openai.com"], passthrough) - - def test_forward_host_credentials_passes_through_codex_hosts(self): - # Egress injects the host bearer on the Codex API hosts; pipelock - # must pass them through or its header DLP blocks the injected JWT - # ("request header contains secret"). Provider routes carry - # tls_passthrough=True; pipelock reads this via egress_routes_for_bottle. - provider_routes = tuple( - EgressRoute( - host=host, - auth_scheme="Bearer", - token_ref=CODEX_HOST_CREDENTIAL_TOKEN_REF, - tls_passthrough=True, - ) - for host in CODEX_HOST_CREDENTIAL_HOSTS - ) - passthrough = pipelock_effective_tls_passthrough( - _bottle({}), provider_routes, - ) - self.assertEqual(["api.openai.com", "chatgpt.com"], passthrough) - - def test_no_codex_passthrough_without_provider_routes(self): - passthrough = pipelock_effective_tls_passthrough(_bottle({ - "agent_provider": {"template": "codex"}, - })) - self.assertEqual([], passthrough) - - -class TestSsrfIpAllowlist(unittest.TestCase): - def test_default_empty(self): - allowlist = pipelock_effective_ssrf_ip_allowlist(_bottle({})) - self.assertEqual([], allowlist) - - def test_route_policy_adds_ssrf_ip_allowlist(self): - allowlist = pipelock_effective_ssrf_ip_allowlist(_bottle(_routes([ - {"host": "gitea.dideric.is", - "auth": {"scheme": "token", "token_ref": "G"}, - "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}}, - ]))) - self.assertEqual(["100.78.141.42/32"], allowlist) - - def test_route_policy_merges_with_extra(self): - allowlist = pipelock_effective_ssrf_ip_allowlist( - _bottle(_routes([ - {"host": "gitea.dideric.is", - "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}}, - ])), - ("172.20.0.0/16",), - ) - self.assertEqual(["100.78.141.42/32", "172.20.0.0/16"], allowlist) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/test_pipelock_apply.py b/tests/unit/test_pipelock_apply.py deleted file mode 100644 index 8a35729..0000000 --- a/tests/unit/test_pipelock_apply.py +++ /dev/null @@ -1,115 +0,0 @@ -"""Unit: pipelock_apply parsers + helpers (PRD 0015 Phase 1). - -docker exec / cp / restart paths are covered by the integration -test in Phase 4. Here we cover the host-side parsing + yaml roundtrip. -""" - -import unittest - -from bot_bottle.backend.docker.pipelock_apply import ( - PipelockApplyError, - parse_allowlist_content, - render_allowlist_content, -) -from bot_bottle.pipelock import pipelock_render_yaml -from bot_bottle.yaml_subset import parse_yaml_subset - - -class TestParseAllowlistContent(unittest.TestCase): - def test_one_per_line(self): - self.assertEqual( - ["a.example", "b.example"], - parse_allowlist_content("a.example\nb.example\n"), - ) - - def test_blank_lines_ignored(self): - self.assertEqual( - ["a", "b"], - parse_allowlist_content("a\n\n \nb\n"), - ) - - def test_comments_ignored(self): - self.assertEqual( - ["a"], - parse_allowlist_content("# top comment\na\n# trailing\n"), - ) - - def test_invalid_char_raises(self): - with self.assertRaises(PipelockApplyError) as cm: - parse_allowlist_content("host with space\n") - self.assertIn("disallowed characters", str(cm.exception)) - - def test_empty_input_returns_empty_list(self): - self.assertEqual([], parse_allowlist_content("")) - - -class TestRenderAllowlistContent(unittest.TestCase): - def test_one_per_line_with_trailing_newline(self): - self.assertEqual("a\nb\n", render_allowlist_content(["a", "b"])) - - def test_empty_renders_empty(self): - self.assertEqual("", render_allowlist_content([])) - - def test_roundtrip(self): - original = ["api.example.com", "ghcr.io", "example.org"] - self.assertEqual( - original, - parse_allowlist_content(render_allowlist_content(original)), - ) - - -class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase): - """The apply path parses the running pipelock.yaml, swaps - api_allowlist, re-renders. Verify that parse(render(cfg)) == - cfg for the fields pipelock_render_yaml emits — otherwise - the apply would silently drop config.""" - - def test_minimal_config_roundtrips(self): - cfg = { - "version": 1, - "mode": "strict", - "enforce": True, - "api_allowlist": ["a.example", "b.example"], - "forward_proxy": {"enabled": True}, - "dlp": {"include_defaults": True, "scan_env": True}, - "request_body_scanning": {"action": "block"}, - } - rendered = pipelock_render_yaml(cfg) # type: ignore - parsed = parse_yaml_subset(rendered) - self.assertEqual(["a.example", "b.example"], parsed["api_allowlist"]) - self.assertEqual(1, parsed["version"]) - self.assertEqual("strict", parsed["mode"]) - self.assertEqual(True, parsed["enforce"]) - - def test_swap_allowlist_then_render_preserves_other_fields(self): - cfg = { - "version": 1, - "mode": "strict", - "enforce": True, - "api_allowlist": ["old.example"], - "forward_proxy": {"enabled": True}, - "dlp": {"include_defaults": True, "scan_env": True}, - "request_body_scanning": {"action": "block"}, - "tls_interception": { - "enabled": True, - "ca_cert": "/etc/pipelock-ca.pem", - "ca_key": "/etc/pipelock-ca-key.pem", - "passthrough_domains": ["api.anthropic.com"], - }, - } - parsed = parse_yaml_subset(pipelock_render_yaml(cfg)) # type: ignore - parsed["api_allowlist"] = ["new.example"] - rerendered = pipelock_render_yaml(parsed) - roundtripped = parse_yaml_subset(rerendered) - self.assertEqual(["new.example"], roundtripped["api_allowlist"]) - # Non-allowlist fields stay put. - self.assertEqual("strict", roundtripped["mode"]) - tls = roundtripped["tls_interception"] - self.assertIsInstance(tls, dict) - assert isinstance(tls, dict) # type-narrowing - self.assertEqual("/etc/pipelock-ca.pem", tls["ca_cert"]) - self.assertEqual(["api.anthropic.com"], tls["passthrough_domains"]) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/test_pipelock_yaml.py b/tests/unit/test_pipelock_yaml.py deleted file mode 100644 index 52e6dd8..0000000 --- a/tests/unit/test_pipelock_yaml.py +++ /dev/null @@ -1,356 +0,0 @@ -"""Unit: pipelock config building and YAML rendering. - -`pipelock_build_config` produces the structured config dict pipelock -will load; tests assert on that dict so they don't break on cosmetic -YAML changes. A small set of tests still hit the rendered output for -properties that only make sense on disk (file mode, no-secret-leakage). -""" - -import os -import tempfile -import unittest -from pathlib import Path -from typing import cast - -from bot_bottle.manifest import Manifest -from bot_bottle.pipelock import ( - DEFAULT_TLS_PASSTHROUGH, - PipelockProxy, - pipelock_build_config, - pipelock_render_yaml, -) -from bot_bottle.yaml_subset import parse_yaml_subset -from tests.fixtures import fixture_minimal - - -class TestBuildConfig(unittest.TestCase): - def test_minimal_shape(self): - cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) - self.assertEqual("strict", cfg["mode"]) - self.assertEqual(True, cfg["enforce"]) - self.assertEqual({"enabled": True}, cfg["forward_proxy"]) - self.assertEqual( - {"include_defaults": True, "scan_env": True}, cfg["dlp"] - ) - # Body-scan action is hard-coded "block" in pipelock_build_config. - # `scan_headers: True` + `header_mode: "all"` close the - # header-shape exfil gap surfaced by PRD 0022 attack 3. - self.assertEqual( - { - "action": "block", - "scan_headers": True, - "header_mode": "all", - }, - cfg["request_body_scanning"], - ) - # No provider defaults are injected implicitly. - self.assertEqual([], cast(list[str], cfg["api_allowlist"])) - # pipelock has no SSH carve-outs at all — neither - # trusted_domains nor ssrf are emitted from bottle data. - self.assertNotIn("trusted_domains", cfg) - self.assertNotIn("ssrf", cfg) - # Without CA paths, the tls_interception block is omitted — - # pipelock falls back to its built-in default of `enabled: false`. - self.assertNotIn("tls_interception", cfg) - - def test_tls_interception_block_emitted_when_paths_supplied(self): - # PRD 0006: paths flow in via the platform-neutral in-container - # constants; this directly pins the dict shape. - cfg = pipelock_build_config( - fixture_minimal().bottles["dev"], - ca_cert_path="/etc/pipelock-ca.pem", - ca_key_path="/etc/pipelock-ca-key.pem", - ) - self.assertEqual( - { - "enabled": True, - "ca_cert": "/etc/pipelock-ca.pem", - "ca_key": "/etc/pipelock-ca-key.pem", - "passthrough_domains": [], - }, - cfg["tls_interception"], - ) - self.assertEqual((), DEFAULT_TLS_PASSTHROUGH) - - def test_tls_passthrough_route_policy_emits_domain(self): - bottle = Manifest.from_json_obj({ - "bottles": {"dev": {"egress": {"routes": [ - {"host": "api.openai.com", - "auth": {"scheme": "Bearer", "token_ref": "T"}, - "pipelock": {"tls_passthrough": True}}, - ]}}}, - "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, - }).bottles["dev"] - cfg = pipelock_build_config( - bottle, - ca_cert_path="/etc/pipelock-ca.pem", - ca_key_path="/etc/pipelock-ca-key.pem", - ) - tls = cast(dict[str, object], cfg["tls_interception"]) - self.assertEqual(["api.openai.com"], tls["passthrough_domains"]) - - def test_tls_interception_requires_both_paths(self): - # Half-set is a programmer error, not a silent omission. - with self.assertRaises(ValueError): - pipelock_build_config( - fixture_minimal().bottles["dev"], - ca_cert_path="/etc/pipelock-ca.pem", - ) - - def test_ssrf_block_omitted_when_no_allowlist(self): - cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) - self.assertNotIn("ssrf", cfg) - - def test_ssrf_block_emitted_when_allowlist_supplied(self): - # The bottle's internal Docker subnet lands here at launch - # time so sibling-sidecar traffic (172.x.x.x) doesn't trip - # pipelock's RFC1918 SSRF guard. - cfg = pipelock_build_config( - fixture_minimal().bottles["dev"], - ssrf_ip_allowlist=("172.20.0.0/16",), - ) - self.assertIn("ssrf", cfg) - self.assertEqual({"ip_allowlist": ["172.20.0.0/16"]}, cfg["ssrf"]) - - def test_ssrf_block_emitted_from_route_policy(self): - bottle = Manifest.from_json_obj({ - "bottles": {"dev": {"egress": {"routes": [ - {"host": "gitea.dideric.is", - "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}}, - ]}}}, - "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, - }).bottles["dev"] - cfg = pipelock_build_config(bottle) - self.assertEqual( - {"ip_allowlist": ["100.78.141.42/32"]}, - cfg["ssrf"], - ) - - def test_seed_phrase_detection_disabled_by_default(self): - # Only the broad BIP-39 detector is disabled. The rest of - # DLP remains enabled via the `dlp` and request-body sections. - cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) - self.assertEqual({"enabled": False}, cfg["seed_phrase_detection"]) - - def test_seed_phrase_detection_disabled_for_openai_route(self): - # OpenAI/Codex chat bodies trip pipelock's BIP-39 detector - # (12+ English words that pass the checksum). pipelock 2.3.0 - # has no per-path knob for this detector, and both `suppress` - # and `rules.disabled` only silence alerts — the block still - # fires. The only knob that actually skips the block is the - # global on/off. - from bot_bottle.manifest import Manifest - bottle = Manifest.from_json_obj({ - "bottles": {"dev": {"egress": {"routes": [ - {"host": "api.openai.com", - "auth": {"scheme": "Bearer", "token_ref": "T"}}, - ]}}}, - "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, - }).bottles["dev"] - cfg = pipelock_build_config(bottle) - self.assertEqual({"enabled": False}, cfg["seed_phrase_detection"]) - - -class TestRenderAndWrite(unittest.TestCase): - def setUp(self): - self.out_dir = Path(tempfile.mkdtemp()) - - def tearDown(self): - import shutil - shutil.rmtree(self.out_dir, ignore_errors=True) - - def assert_render_semantics_match(self, cfg: dict[str, object]) -> None: - parsed = parse_yaml_subset(pipelock_render_yaml(cfg)) - self.assertEqual(cfg["version"], parsed["version"]) - self.assertEqual(cfg["mode"], parsed["mode"]) - self.assertEqual(cfg["enforce"], parsed["enforce"]) - parsed_allowlist = parsed["api_allowlist"] - if cfg["api_allowlist"] == [] and parsed_allowlist is None: - parsed_allowlist = [] - self.assertEqual(cfg["api_allowlist"], parsed_allowlist) - self.assertEqual(cfg["forward_proxy"], parsed["forward_proxy"]) - self.assertEqual(cfg["dlp"], parsed["dlp"]) - self.assertEqual( - cfg["request_body_scanning"], - parsed["request_body_scanning"], - ) - if "seed_phrase_detection" in cfg: - self.assertEqual( - cfg["seed_phrase_detection"], - parsed["seed_phrase_detection"], - ) - else: - self.assertNotIn("seed_phrase_detection", parsed) - - if "tls_interception" in cfg: - expected_tls = cast(dict[str, object], cfg["tls_interception"]) - actual_tls = cast(dict[str, object], parsed["tls_interception"]) - self.assertEqual(expected_tls["enabled"], actual_tls["enabled"]) - self.assertEqual(expected_tls["ca_cert"], actual_tls["ca_cert"]) - self.assertEqual(expected_tls["ca_key"], actual_tls["ca_key"]) - expected_passthrough = expected_tls["passthrough_domains"] - if expected_passthrough: - self.assertEqual( - expected_passthrough, - actual_tls["passthrough_domains"], - ) - else: - self.assertNotIn("passthrough_domains", actual_tls) - else: - self.assertNotIn("tls_interception", parsed) - - if "ssrf" in cfg: - self.assertEqual(cfg["ssrf"], parsed["ssrf"]) - else: - self.assertNotIn("ssrf", parsed) - - def test_render_emits_required_top_level_keys(self): - """One render-level smoke check: the serialized YAML is plausibly - the shape pipelock expects. We don't grep every key here — that's - what TestBuildConfig is for.""" - cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) - text = pipelock_render_yaml(cfg) - for required in ( - "api_allowlist:", - "forward_proxy:", - "dlp:", - "request_body_scanning:", - ): - self.assertIn(required, text) - # No ssh carve-outs in the rendered yaml. - self.assertNotIn("trusted_domains:", text) - self.assertNotIn("ssrf:", text) - - def test_render_semantics_match_minimal_config(self): - cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) - self.assert_render_semantics_match(cfg) - - def test_render_semantics_match_tls_with_empty_passthrough(self): - cfg = pipelock_build_config( - fixture_minimal().bottles["dev"], - ca_cert_path="/etc/pipelock-ca.pem", - ca_key_path="/etc/pipelock-ca-key.pem", - ) - self.assert_render_semantics_match(cfg) - - def test_render_semantics_match_all_optional_sections(self): - bottle = Manifest.from_json_obj({ - "bottles": {"dev": {"egress": {"routes": [ - {"host": "api.openai.com", - "pipelock": {"tls_passthrough": True}}, - {"host": "gitea.dideric.is", - "pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}}, - ]}}}, - "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, - }).bottles["dev"] - cfg = pipelock_build_config( - bottle, - ca_cert_path="/etc/pipelock-ca.pem", - ca_key_path="/etc/pipelock-ca-key.pem", - ssrf_ip_allowlist=("172.20.0.0/16",), - ) - self.assert_render_semantics_match(cfg) - - def test_render_rejects_missing_required_key(self): - cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) - del cfg["mode"] - with self.assertRaisesRegex(ValueError, r"config\.mode"): - pipelock_render_yaml(cfg) - - def test_render_rejects_wrong_section_type(self): - cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) - cfg["dlp"] = [] - with self.assertRaisesRegex(ValueError, r"config\.dlp.*mapping"): - pipelock_render_yaml(cfg) - - def test_render_rejects_wrong_list_item_type(self): - cfg = pipelock_build_config( - fixture_minimal().bottles["dev"], - ca_cert_path="/etc/pipelock-ca.pem", - ca_key_path="/etc/pipelock-ca-key.pem", - ) - tls = cast(dict[str, object], cfg["tls_interception"]) - tls["passthrough_domains"] = ["api.openai.com", 3] - with self.assertRaisesRegex( - ValueError, r"tls_interception\.passthrough_domains", - ): - pipelock_render_yaml(cfg) - - def test_render_rejects_unsupported_top_level_section(self): - cfg = pipelock_build_config(fixture_minimal().bottles["dev"]) - cfg["trusted_domains"] = [] - with self.assertRaisesRegex(ValueError, r"config\.trusted_domains"): - pipelock_render_yaml(cfg) - - def test_prepare_writes_file_at_mode_600(self): - plan = PipelockProxy().prepare( - fixture_minimal().bottles["dev"], "demo", self.out_dir - ) - self.assertEqual(0o600, os.stat(plan.yaml_path).st_mode & 0o777) - - def test_prepare_does_not_leak_env_names_or_values(self): - manifest = Manifest.from_json_obj({ - "bottles": { - "dev": { - "env": { - "MY_SECRET": "literal-value-should-not-appear", - "ANOTHER": "?prompt-message", - }, - "egress": {"routes": [{"host": "github.com"}]}, - } - }, - "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, - }) - plan = PipelockProxy().prepare( - manifest.bottles["dev"], "demo", self.out_dir - ) - content = plan.yaml_path.read_text() - self.assertNotIn("literal-value-should-not-appear", content) - self.assertNotIn("MY_SECRET", content) - self.assertNotIn("prompt-message", content) - - def test_render_emits_tls_interception_via_prepare(self): - """`PipelockProxy.prepare` plumbs the module-level in-container - CA constants through to the YAML. The block should land in the - rendered output with `enabled: true`, the configured paths, - and any route-owned passthrough domains. The actual - host-side CA generation happens in launch (not prepare), so - this test exercises only the YAML rendering.""" - bottle = Manifest.from_json_obj({ - "bottles": {"dev": {"egress": {"routes": [ - {"host": "api.openai.com", - "pipelock": {"tls_passthrough": True}}, - ]}}}, - "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, - }).bottles["dev"] - plan = PipelockProxy().prepare(bottle, "demo", self.out_dir) - content = plan.yaml_path.read_text() - self.assertIn("tls_interception:", content) - self.assertIn("enabled: true", content) - self.assertIn('ca_cert: "/etc/pipelock-ca.pem"', content) - self.assertIn('ca_key: "/etc/pipelock-ca-key.pem"', content) - self.assertIn("passthrough_domains:", content) - self.assertIn('- "api.openai.com"', content) - - def test_render_emits_ssrf_block_when_allowlist_given(self): - cfg = pipelock_build_config( - fixture_minimal().bottles["dev"], - ca_cert_path="/etc/pipelock-ca.pem", - ca_key_path="/etc/pipelock-ca-key.pem", - ssrf_ip_allowlist=("172.20.0.0/16",), - ) - text = pipelock_render_yaml(cfg) - self.assertIn("ssrf:", text) - self.assertIn("ip_allowlist:", text) - self.assertIn('- "172.20.0.0/16"', text) - - def test_render_emits_seed_phrase_off_by_default(self): - text = pipelock_render_yaml( - pipelock_build_config(fixture_minimal().bottles["dev"]) - ) - self.assertIn("seed_phrase_detection:", text) - self.assertIn("enabled: false", text) - - -if __name__ == "__main__": - unittest.main()