chore: delete pipelock files and strip from manifest layer
lint / lint (push) Failing after 1m36s
test / unit (pull_request) Failing after 33s
test / integration (pull_request) Failing after 18s

- Delete bot_bottle/pipelock.py, backend/docker/pipelock.py,
  backend/docker/pipelock_apply.py
- Delete all pipelock unit/integration/canary tests
- Remove PipelockRoutePolicy from manifest_egress.py; drop the
  Pipelock field from EgressRoute and the 'pipelock' key from
  EgressRoute.from_dict
- Remove PipelockRoutePolicy re-export from manifest.py __all__
This commit is contained in:
2026-06-04 21:11:14 +00:00
parent c94a2542bd
commit 9eb5eef676
16 changed files with 3 additions and 2433 deletions
-45
View File
@@ -1,45 +0,0 @@
"""Canary: the pinned pipelock image's binary actually runs.
This test exists to catch a broken upstream packaging at the pinned
digest. It is NOT part of the per-push suite — that would couple every
dev push to upstream registry availability. Set
BOT_BOTTLE_RUN_CANARIES=1 to opt in (a scheduled CI workflow does
this; humans can run it ad-hoc the same way).
"""
import os
import subprocess
import unittest
from bot_bottle.backend.docker.pipelock import PIPELOCK_IMAGE
from tests._docker import skip_unless_docker
@unittest.skipUnless(
os.environ.get("BOT_BOTTLE_RUN_CANARIES") == "1",
"canary suite is opt-in; set BOT_BOTTLE_RUN_CANARIES=1 to run",
)
@skip_unless_docker()
class TestPipelockImage(unittest.TestCase):
@classmethod
def setUpClass(cls):
result = subprocess.run(
["docker", "pull", PIPELOCK_IMAGE],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
if result.returncode != 0:
raise unittest.SkipTest(f"could not pull {PIPELOCK_IMAGE}")
def test_binary_runs(self):
result = subprocess.run(
["docker", "run", "--rm", PIPELOCK_IMAGE, "--version"],
capture_output=True, text=True, check=False,
)
out = result.stdout + result.stderr
self.assertRegex(out, r"[Pp]ipelock|2\.[0-9]+\.[0-9]+")
if __name__ == "__main__":
unittest.main()
@@ -1,110 +0,0 @@
"""Integration: a Node request to a host on pipelock's allowlist is
tunneled through.
End-to-end mirror of test_pipelock_block_node: drives `BottleBackend.
prepare → launch` so the real image build, network plumbing, and
pipelock sidecar are all in the loop. Inside the bottle, a Node
script issues an HTTPS CONNECT for raw.githubusercontent.com:443 —
a host in the baked-in default allowlist — through `$HTTPS_PROXY`.
Pipelock must answer 200 Connection Established. The 200 vs. 403
split on CONNECT is decided by pipelock itself (the remote never
sees the CONNECT verb), so it isolates the allowlist decision from
anything the remote might return.
"""
from __future__ import annotations
import os
import shutil
import tempfile
import unittest
from pathlib import Path
from bot_bottle.backend import BottleSpec, get_bottle_backend
from tests._docker import skip_unless_docker
from tests.fixtures import fixture_minimal
# Output contract (parsed by the test):
# - "connect=<code>" proxy upgraded to a tunnel (CONNECT success path)
# - "status=<code>" proxy answered without tunneling (block path)
# - "error=<code> <message>" transport-level failure
# - "timeout" request hung
_PROBE_JS = r"""
const http = require('http');
const proxy = new URL(process.env.HTTPS_PROXY);
const req = http.request({
host: proxy.hostname,
port: proxy.port,
method: 'CONNECT',
path: 'raw.githubusercontent.com:443',
});
req.on('connect', (res, socket) => {
console.log('connect=' + res.statusCode);
socket.destroy();
process.exit(0);
});
req.on('response', (res) => {
res.resume();
res.on('end', () => {
console.log('status=' + res.statusCode);
process.exit(0);
});
});
req.on('error', (e) => {
console.log('error=' + (e.code || '') + ' ' + e.message);
process.exit(0);
});
req.setTimeout(5000, () => {
console.log('timeout');
req.destroy();
});
req.end();
"""
@skip_unless_docker()
class TestPipelockAllowsNode(unittest.TestCase):
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: docker socket mount topology breaks "
"in-process visibility of networks created on the host daemon",
)
def test_node_request_to_allowed_host_is_tunneled(self):
backend = get_bottle_backend()
stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage."))
try:
spec = BottleSpec(
manifest=fixture_minimal(),
agent_name="demo",
copy_cwd=False,
user_cwd=str(stage_dir),
)
plan = backend.prepare(spec, stage_dir=stage_dir)
with backend.launch(plan) as bottle:
script = (
"set -e\n"
"cat > /tmp/probe.js <<'PROBE_EOF'\n"
f"{_PROBE_JS}\n"
"PROBE_EOF\n"
"node /tmp/probe.js\n"
)
result = bottle.exec(script)
finally:
shutil.rmtree(stage_dir, ignore_errors=True)
self.assertEqual(
0, result.returncode,
f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}",
)
# raw.githubusercontent.com IS in fixture_minimal's effective
# allowlist (baked-in default). Pipelock must answer the CONNECT
# with 200 Connection Established.
self.assertIn(
"connect=200", result.stdout,
f"pipelock should have tunneled to raw.githubusercontent.com; got: {result.stdout!r}",
)
if __name__ == "__main__":
unittest.main()
@@ -1,83 +0,0 @@
"""Integration: with pipelock's tls_interception enabled (PRD 0006),
a clean HTTPS GET to an allowlisted host succeeds end-to-end through
the bumped tunnel.
Complement to test_pipelock_blocks_secret_https_post — together they
pin pipelock's two paths (block on body match, allow on clean
traffic). This test is also the implicit TLS-trust check: if
provision_ca had failed to install pipelock's CA into the agent's
trust store, curl would have rejected the bumped leaf cert and the
fetch would have failed before any HTTP response could come back."""
from __future__ import annotations
import os
import shutil
import tempfile
import unittest
from pathlib import Path
from bot_bottle.backend import BottleSpec, get_bottle_backend
from tests._docker import skip_unless_docker
from tests.fixtures import fixture_minimal
# raw.githubusercontent.com is in the baked-in DEFAULT_ALLOWLIST.
# `git`'s own README on the master branch is a long-lived raw file
# (~3 KB) that any CI runner with internet can fetch.
_TARGET_URL = "https://raw.githubusercontent.com/git/git/master/README.md"
@skip_unless_docker()
class TestPipelockAllowsNormalHttps(unittest.TestCase):
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: docker socket mount topology breaks "
"in-process visibility of networks created on the host daemon",
)
def test_https_get_to_allowed_host_succeeds(self):
backend = get_bottle_backend()
stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage."))
try:
spec = BottleSpec(
manifest=fixture_minimal(),
agent_name="demo",
copy_cwd=False,
user_cwd=str(stage_dir),
)
plan = backend.prepare(spec, stage_dir=stage_dir)
with backend.launch(plan) as bottle:
script = (
"set -eu\n"
'curl --proxy "$HTTPS_PROXY" -s --max-time 10 \\\n'
" -w 'status=%{http_code}\\n' \\\n"
" -o /tmp/probe-body.txt \\\n"
f" {_TARGET_URL}\n"
'echo "len=$(wc -c < /tmp/probe-body.txt)"\n'
)
result = bottle.exec(script)
finally:
shutil.rmtree(stage_dir, ignore_errors=True)
self.assertEqual(
0, result.returncode,
f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}",
)
# 200 from the upstream (pipelock forwarded after the body
# scan passed). If curl had failed the bumped-cert trust
# check, the exit code or status would be non-200 here.
self.assertIn(
"status=200", result.stdout,
f"expected 200 from raw.githubusercontent.com; got: {result.stdout!r}",
)
# The git README is ~3 KB. Anything substantially non-zero
# proves the response body actually transferred — i.e. the
# CONNECT tunnel + bumped TLS + body forwarding all worked.
self.assertNotIn(
"len=0\n", result.stdout,
f"response body was empty: {result.stdout!r}",
)
if __name__ == "__main__":
unittest.main()
-210
View File
@@ -1,210 +0,0 @@
"""Integration: drive `apply_allowlist_change` against a real
pipelock sidecar (PRD 0015).
Brings up a real pipelock container via direct `docker run` (the
old `.start()` helper went away in PRD 0024 chunk 3), calls
apply_allowlist_change to swap the api_allowlist, restarts
pipelock, and verifies the running container now serves the new
yaml.
The hot-reload code path under test (apply_allowlist_change,
fetch_current_yaml, fetch_current_allowlist) is unchanged from
PRD 0015 — only the test's bringup helper moved.
Setup uses pipelock_tls_init which bind-mounts a host path into a
one-shot pipelock container — that doesn't work in DinD, so the
test skips under GITEA_ACTIONS.
"""
from __future__ import annotations
import os
import shutil
import subprocess
import tempfile
import time
import unittest
from pathlib import Path
from bot_bottle.backend.docker.bottle_state import pipelock_state_dir
from bot_bottle.backend.docker.network import (
network_create_egress,
network_create_internal,
network_remove,
)
from bot_bottle.pipelock import (
PIPELOCK_CA_CERT_IN_CONTAINER,
PIPELOCK_CA_KEY_IN_CONTAINER,
)
from bot_bottle.backend.docker.pipelock import pipelock_tls_init
from bot_bottle.pipelock import PipelockProxy
from bot_bottle.backend.docker.pipelock_apply import (
PipelockApplyError,
apply_allowlist_change,
fetch_current_allowlist,
fetch_current_yaml,
)
from bot_bottle.backend.docker.sidecar_bundle import (
SIDECAR_BUNDLE_IMAGE,
sidecar_bundle_container_name,
)
from bot_bottle.yaml_subset import parse_yaml_subset
from tests._docker import skip_unless_docker
from tests.fixtures import fixture_minimal
@skip_unless_docker()
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: pipelock_tls_init uses a host bind mount "
"that doesn't share fs with the runner container",
)
class TestPipelockApply(unittest.TestCase):
def setUp(self):
self.slug = f"cb-test-pla-{os.getpid()}-{int(time.time())}"
self.sidecar_name = ""
self.internal_net = ""
self.egress_net = ""
self.work_dir = Path(tempfile.mkdtemp(prefix="pipelock-apply."))
def tearDown(self):
if self.sidecar_name:
subprocess.run(
["docker", "rm", "-f", self.sidecar_name],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False,
)
for n in (self.internal_net, self.egress_net):
if n:
network_remove(n)
shutil.rmtree(self.work_dir, ignore_errors=True)
# Clean up the per-slug state dir under ~/.bot-bottle/state/
# (apply_allowlist_change writes there; _bring_up calls
# proxy.prepare with the same path so the bind-mount and the
# hot-reload write target stay coherent).
shutil.rmtree(pipelock_state_dir(self.slug), ignore_errors=True)
def _bring_up(self) -> None:
"""Brings up the bundle image with only the pipelock daemon
selected. The bundle's Python supervisor is PID 1, which is
what apply_allowlist_change targets via `docker kill
--signal USR1` — pipelock alone as PID 1 wouldn't survive
SIGUSR1 (default disposition = terminate). This shape is
what runs in production minus the other three daemons.
The yaml stages into the production-real
`pipelock_state_dir(slug)` (not a private temp dir) so the
bind-mount target matches what `apply_allowlist_change`
writes to — otherwise the hot-reload would write to a
nowhere-mounted host path and the container would never see
the updated config."""
state_dir = pipelock_state_dir(self.slug)
state_dir.mkdir(parents=True, exist_ok=True)
prep = PipelockProxy().prepare(
fixture_minimal().bottles["dev"], self.slug, state_dir,
)
self.internal_net = network_create_internal(self.slug)
self.egress_net = network_create_egress(self.slug)
ca_cert_host, ca_key_host = pipelock_tls_init(state_dir)
# Ensure the bundle image is built. compose normally builds
# this lazily; we go through `docker run` here so we have to
# do it ourselves. Idempotent — cached layers make repeats
# fast.
repo_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
subprocess.run(
["docker", "build",
"-t", SIDECAR_BUNDLE_IMAGE,
"-f", "Dockerfile.sidecars", "."],
cwd=repo_root, check=True, capture_output=True,
)
self.sidecar_name = sidecar_bundle_container_name(self.slug)
subprocess.run(
["docker", "create",
"--name", self.sidecar_name,
"--network", self.internal_net,
"-e", "BOT_BOTTLE_SIDECAR_DAEMONS=pipelock",
"-v", f"{prep.yaml_path}:/etc/pipelock.yaml:ro",
"-v", f"{ca_cert_host}:{PIPELOCK_CA_CERT_IN_CONTAINER}:ro",
"-v", f"{ca_key_host}:{PIPELOCK_CA_KEY_IN_CONTAINER}:ro",
SIDECAR_BUNDLE_IMAGE],
check=True, capture_output=True,
)
subprocess.run(
["docker", "network", "connect", self.egress_net, self.sidecar_name],
check=True, capture_output=True,
)
subprocess.run(
["docker", "start", self.sidecar_name],
check=True, capture_output=True,
)
# Wait until fetch_current_yaml succeeds — it's a docker cp
# which works on a started-but-not-yet-ready pipelock, so
# this is more of a "container exists" probe than a
# readiness one; the hot-reload tests below tolerate
# pipelock briefly being slow to serve.
deadline = time.monotonic() + 15.0
while time.monotonic() < deadline:
try:
fetch_current_yaml(self.slug)
return
except PipelockApplyError:
pass
time.sleep(0.25)
raise AssertionError("pipelock sidecar never became reachable")
def _wait_for_yaml(self, contains: str, *, deadline_s: float = 15.0) -> str:
"""Poll docker exec until /etc/pipelock.yaml contains `contains`,
returning the yaml. Used to bridge the docker-restart window."""
deadline = time.monotonic() + deadline_s
while time.monotonic() < deadline:
try:
yaml = fetch_current_yaml(self.slug)
if contains in yaml:
return yaml
except PipelockApplyError:
pass
time.sleep(0.25)
self.fail(f"never saw {contains!r} in /etc/pipelock.yaml")
def test_apply_swaps_api_allowlist(self):
self._bring_up()
initial_yaml = fetch_current_yaml(self.slug)
# fixture_minimal yields the baked-in DEFAULT_ALLOWLIST in
# pipelock.py; api.anthropic.com is in there.
self.assertIn("api.anthropic.com", initial_yaml)
new_content = "api.anthropic.com\nnew-host.example\n"
before, after = apply_allowlist_change(self.slug, new_content)
self.assertIn("api.anthropic.com", before)
self.assertNotIn("new-host.example", before)
self.assertIn("new-host.example", after)
updated = self._wait_for_yaml("new-host.example")
cfg = parse_yaml_subset(updated)
self.assertIn("new-host.example", cfg["api_allowlist"]) # type: ignore[operator]
self.assertIn("api.anthropic.com", cfg["api_allowlist"]) # type: ignore[operator]
# tls_interception block (set up by the production prepare
# via pipelock_build_config) is preserved across the swap.
self.assertIn("tls_interception", cfg)
def test_apply_with_invalid_host_raises(self):
self._bring_up()
with self.assertRaises(PipelockApplyError):
apply_allowlist_change(self.slug, "host with space.example\n")
def test_fetch_current_allowlist_renders_one_per_line(self):
self._bring_up()
listing = fetch_current_allowlist(self.slug)
self.assertTrue(listing.endswith("\n"))
self.assertIn("api.anthropic.com\n", listing)
def test_apply_against_missing_sidecar_raises(self):
# Don't bring up — the slug points at nothing.
with self.assertRaises(PipelockApplyError):
apply_allowlist_change(self.slug, "x.example\n")
if __name__ == "__main__":
unittest.main()
@@ -1,114 +0,0 @@
"""Integration: a Node script run inside a launched bottle, hitting
a host outside the pipelock allowlist, is blocked.
End-to-end: drives `BottleBackend.prepare → launch` so the real
image build, network plumbing, and pipelock sidecar are all in the
loop. Inside the bottle, a Node script forms an HTTP forward-proxy
request (absolute-URI path) to `example.com` via `$HTTPS_PROXY`. The
fixture's effective allowlist contains only the baked-in defaults,
so pipelock must refuse to forward.
"""
from __future__ import annotations
import os
import shutil
import tempfile
import unittest
from pathlib import Path
from bot_bottle.backend import BottleSpec, get_bottle_backend
from tests._docker import skip_unless_docker
from tests.fixtures import fixture_minimal
# Node's stdlib http does not respect HTTPS_PROXY on its own; this
# script builds the forward-proxy request shape by hand so the test
# is asserting on pipelock's allowlist decision, not on whatever
# proxy-env auto-detection a Node release happens to ship.
#
# Output contract (parsed by the test):
# - "status=<code>" when the proxy returns an HTTP response
# - "error=<code> <message>" on a transport-level failure
# - "timeout" on a hung request
_PROBE_JS = r"""
const http = require('http');
const proxy = new URL(process.env.HTTPS_PROXY);
const req = http.request({
host: proxy.hostname,
port: proxy.port,
method: 'GET',
path: 'http://example.com/',
headers: { Host: 'example.com' },
}, (res) => {
res.resume();
res.on('end', () => {
console.log('status=' + res.statusCode);
process.exit(0);
});
});
req.on('error', (e) => {
console.log('error=' + (e.code || '') + ' ' + e.message);
process.exit(0);
});
req.setTimeout(5000, () => {
console.log('timeout');
req.destroy();
});
req.end();
"""
@skip_unless_docker()
class TestPipelockBlocksNode(unittest.TestCase):
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: docker socket mount topology breaks "
"in-process visibility of networks created on the host daemon",
)
def test_node_request_to_blocked_host_is_rejected(self):
backend = get_bottle_backend()
stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage."))
try:
spec = BottleSpec(
manifest=fixture_minimal(),
agent_name="demo",
copy_cwd=False,
user_cwd=str(stage_dir),
)
plan = backend.prepare(spec, stage_dir=stage_dir)
with backend.launch(plan) as bottle:
script = (
"set -e\n"
"cat > /tmp/probe.js <<'PROBE_EOF'\n"
f"{_PROBE_JS}\n"
"PROBE_EOF\n"
"node /tmp/probe.js\n"
)
result = bottle.exec(script)
finally:
shutil.rmtree(stage_dir, ignore_errors=True)
self.assertEqual(
0, result.returncode,
f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}",
)
# The probe always prints exactly one signal line. If it
# doesn't, the script failed in a way the test doesn't
# understand and the surrounding assertions would be
# ambiguous.
self.assertTrue(
"status=" in result.stdout or "error=" in result.stdout or "timeout" in result.stdout,
f"probe produced no recognized output: {result.stdout!r}",
)
# The core invariant: example.com is NOT in fixture_minimal's
# effective allowlist (only the baked-in defaults), so the
# proxy must not have forwarded a successful response.
self.assertNotIn(
"status=200", result.stdout,
"example.com is outside the allowlist; pipelock should not have forwarded a 200",
)
if __name__ == "__main__":
unittest.main()
@@ -1,101 +0,0 @@
"""Integration: with pipelock's tls_interception enabled (PRD 0006),
a credential POST sent over HTTPS is blocked by pipelock's body-scan
layer — closing the gap that motivated this PRD.
End-to-end: drives `BottleBackend.prepare → launch` so the real
image build, network plumbing, pipelock_tls_init, sidecar bring-up,
and provision_ca (CA install in the agent's trust store) are all in
the loop. The probe is a single `curl --proxy "$HTTPS_PROXY" -X POST
... https://raw.githubusercontent.com/...` — curl natively does
CONNECT through the proxy, the agent's trust store now contains
pipelock's per-bottle CA so curl trusts pipelock's bumped leaf, and
pipelock sees the decrypted body and returns its known
`blocked: request body contains secret: <pattern>` 403.
The host has to be allowlisted (so the CONNECT is accepted) but must
not opt into `pipelock.tls_passthrough` (so the body actually gets
scanned). This probe targets `raw.githubusercontent.com`, which is on
the baked allowlist and intercepted+scanned like any non-passthrough
host."""
from __future__ import annotations
import os
import shutil
import tempfile
import unittest
from pathlib import Path
from bot_bottle.backend import BottleSpec, get_bottle_backend
from bot_bottle.manifest import Manifest
from tests._docker import skip_unless_docker
# Synthetic value shaped like a GitHub Personal Access Token; not a
# real credential. Carried into the bottle as an env var so the
# probe shell can read it via $FAKE_TOKEN without ever interpolating
# the value on the bash `bottle.exec` argv.
_FAKE_TOKEN = "ghp_aB3cD4eF5gH6iJ7kL8mN9oP0qR1sT2uV3wX4yZ"
@skip_unless_docker()
class TestPipelockBlocksSecretHttpsPost(unittest.TestCase):
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: docker socket mount topology breaks "
"in-process visibility of networks created on the host daemon",
)
def test_https_post_with_credential_body_is_blocked(self):
manifest = Manifest.from_json_obj({
"bottles": {
"dev": {"env": {"FAKE_TOKEN": _FAKE_TOKEN}},
},
"agents": {
"demo": {"skills": [], "prompt": "", "bottle": "dev"},
},
})
backend = get_bottle_backend()
stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage."))
try:
spec = BottleSpec(
manifest=manifest,
agent_name="demo",
copy_cwd=False,
user_cwd=str(stage_dir),
)
plan = backend.prepare(spec, stage_dir=stage_dir)
with backend.launch(plan) as bottle:
script = (
"set -eu\n"
'curl --proxy "$HTTPS_PROXY" -s --max-time 8 \\\n'
" -w 'status=%{http_code}\\n' \\\n"
" -o /tmp/probe-body.txt \\\n"
' -X POST -d "token=$FAKE_TOKEN" \\\n'
" https://raw.githubusercontent.com/dlp-probe\n"
'echo "body=$(head -c 200 /tmp/probe-body.txt)"\n'
)
result = bottle.exec(script)
finally:
shutil.rmtree(stage_dir, ignore_errors=True)
self.assertEqual(
0, result.returncode,
f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}",
)
# Pipelock's body-scan block returns 403 with a plain-text
# body starting `blocked: ` (pinned empirically; see
# tests/unit/test_mitmproxy_verdict.py for the
# corresponding-fingerprint test, retained from PR #8 as
# general pipelock-block-shape coverage).
self.assertIn(
"status=403", result.stdout,
f"expected 403 from pipelock; got: {result.stdout!r}",
)
self.assertIn(
"body=blocked: ", result.stdout,
f"expected pipelock block body; got: {result.stdout!r}",
)
if __name__ == "__main__":
unittest.main()
@@ -1,132 +0,0 @@
"""Integration: pipelock blocks a POST whose body carries a
recognized credential pattern, even when the host is on the
allowlist.
End-to-end companion to the block / allow node tests. The manifest
carries a literal env var whose value matches pipelock's DLP rules.
A Node script POSTs that value to an allowlisted host via plain
HTTP forward proxy (absolute-URI form) so pipelock can scan the
body — routing the same request over CONNECT would tunnel TLS
opaquely and the DLP layer would have nothing to see. The 403
return from pipelock isolates the body-scan layer as the active
control, distinct from the host-allowlist decision the other two
tests pin down.
"""
from __future__ import annotations
import os
import shutil
import tempfile
import unittest
from pathlib import Path
from bot_bottle.backend import BottleSpec, get_bottle_backend
from bot_bottle.manifest import Manifest
from tests._docker import skip_unless_docker
# Synthetic value shaped like a GitHub Personal Access Token
# (`ghp_` + 36 alnum chars). Not a real token; the only relevant
# property is that pipelock's default DLP rules recognize the
# shape. Kept obviously dummy so a stray grep can't mistake it
# for a real credential.
_FAKE_TOKEN = "ghp_aB3cD4eF5gH6iJ7kL8mN9oP0qR1sT2uV3wX4yZ"
# Output contract (parsed by the test):
# - "status=<code>" proxy answered with an HTTP response
# - "error=<code> <message>" transport-level failure
# - "timeout" request hung
_PROBE_JS = r"""
const http = require('http');
const proxy = new URL(process.env.HTTPS_PROXY);
const body = 'token=' + process.env.FAKE_TOKEN;
const req = http.request({
host: proxy.hostname,
port: proxy.port,
method: 'POST',
// Absolute-URI form: pipelock acts as a plain HTTP forward proxy
// and the body is visible to its DLP scanner. CONNECT would
// tunnel TLS bytes that pipelock can't see into.
path: 'http://api.anthropic.com/dlp-probe',
headers: {
Host: 'api.anthropic.com',
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Length': Buffer.byteLength(body),
},
}, (res) => {
res.resume();
res.on('end', () => {
console.log('status=' + res.statusCode);
process.exit(0);
});
});
req.on('error', (e) => {
console.log('error=' + (e.code || '') + ' ' + e.message);
process.exit(0);
});
req.setTimeout(5000, () => {
console.log('timeout');
req.destroy();
});
req.write(body);
req.end();
"""
@skip_unless_docker()
class TestPipelockBlocksSecretPost(unittest.TestCase):
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: docker socket mount topology breaks "
"in-process visibility of networks created on the host daemon",
)
def test_post_with_credential_body_is_blocked(self):
manifest = Manifest.from_json_obj({
"bottles": {
"dev": {"env": {"FAKE_TOKEN": _FAKE_TOKEN}},
},
"agents": {
"demo": {"skills": [], "prompt": "", "bottle": "dev"},
},
})
backend = get_bottle_backend()
stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage."))
try:
spec = BottleSpec(
manifest=manifest,
agent_name="demo",
copy_cwd=False,
user_cwd=str(stage_dir),
)
plan = backend.prepare(spec, stage_dir=stage_dir)
with backend.launch(plan) as bottle:
script = (
"set -e\n"
"cat > /tmp/probe.js <<'PROBE_EOF'\n"
f"{_PROBE_JS}\n"
"PROBE_EOF\n"
"node /tmp/probe.js\n"
)
result = bottle.exec(script)
finally:
shutil.rmtree(stage_dir, ignore_errors=True)
self.assertEqual(
0, result.returncode,
f"exec wrapper failed: stdout={result.stdout!r} stderr={result.stderr!r}",
)
# api.anthropic.com is on the baked-in allowlist, so the
# host-allowlist layer would have let this through. Pipelock's
# DLP body-scan layer must catch the credential pattern and
# answer 403; any other code means the body reached the
# upstream.
self.assertIn(
"status=403", result.stdout,
f"pipelock DLP should have blocked the credential POST; got: {result.stdout!r}",
)
if __name__ == "__main__":
unittest.main()
@@ -1,107 +0,0 @@
"""Integration: route-owned `pipelock.tls_passthrough` renders into
pipelock's `tls_interception.passthrough_domains`, so request bodies
that would otherwise trip the body-scan layer are not inspected and the
request reaches the provider TLS endpoint.
Probe: POST the canonical zero-entropy 12-word BIP-39 mnemonic
(`abandon` × 11 + `about`) — checksum-valid by construction — to
`https://api.anthropic.com/v1/messages`. With the route policy,
pipelock relays the CONNECT opaquely and the upstream replies with
whatever it likes (401/4xx from Anthropic for an unauthenticated junk
POST). We assert that the verdict is NOT pipelock's block.
"""
from __future__ import annotations
import os
import shutil
import tempfile
import unittest
from pathlib import Path
from bot_bottle.backend import BottleSpec, get_bottle_backend
from bot_bottle.manifest import Manifest
from tests._docker import skip_unless_docker
# Canonical BIP-39 12-word test mnemonic. Valid SHA-256 checksum —
# pipelock's seed-phrase scanner (default `verify_checksum: true`)
# fires on this exact string if it ever sees the cleartext body.
_BIP39_PHRASE = (
"abandon abandon abandon abandon abandon abandon "
"abandon abandon abandon abandon abandon about"
)
@skip_unless_docker()
class TestPipelockLlmPassthrough(unittest.TestCase):
@unittest.skipIf(
os.environ.get("GITEA_ACTIONS") == "true",
"skipped under act_runner: docker socket mount topology breaks "
"in-process visibility of networks created on the host daemon",
)
def test_bip39_body_to_anthropic_is_not_blocked(self):
manifest = Manifest.from_json_obj({
"bottles": {
"dev": {
"env": {"SEED": _BIP39_PHRASE},
"egress": {"routes": [{
"host": "api.anthropic.com",
"pipelock": {"tls_passthrough": True},
}]},
},
},
"agents": {
"demo": {"skills": [], "prompt": "", "bottle": "dev"},
},
})
backend = get_bottle_backend()
stage_dir = Path(tempfile.mkdtemp(prefix="cb-test-stage."))
try:
spec = BottleSpec(
manifest=manifest,
agent_name="demo",
copy_cwd=False,
user_cwd=str(stage_dir),
)
plan = backend.prepare(spec, stage_dir=stage_dir)
with backend.launch(plan) as bottle:
script = (
"set -eu\n"
'curl --proxy "$HTTPS_PROXY" -s --max-time 10 \\\n'
" -w 'status=%{http_code}\\n' \\\n"
" -o /tmp/probe-body.txt \\\n"
' -X POST -H "content-type: application/json" \\\n'
' --data "{\\"phrase\\": \\"$SEED\\"}" \\\n'
" https://api.anthropic.com/v1/messages\n"
'echo "body=$(head -c 200 /tmp/probe-body.txt)"\n'
)
result = bottle.exec(script)
finally:
shutil.rmtree(stage_dir, ignore_errors=True)
self.assertEqual(
0, result.returncode,
f"exec wrapper failed: stdout={result.stdout!r} "
f"stderr={result.stderr!r}",
)
# The pipelock block verdict starts with `blocked: ` in the
# body. Anything else (auth error, 401, 4xx from Anthropic) is
# an acceptable outcome — it means the body was NOT inspected
# by the proxy and the request was relayed to the upstream
# TLS endpoint.
self.assertNotIn(
"body=blocked: ", result.stdout,
f"unexpected pipelock body-scan block on api.anthropic.com; "
f"expected passthrough to skip MITM. got: {result.stdout!r}",
)
self.assertNotIn(
"BIP-39", result.stdout,
f"BIP-39 verdict should never appear for api.anthropic.com "
f"requests under tls_interception.passthrough_domains; "
f"got: {result.stdout!r}",
)
if __name__ == "__main__":
unittest.main()
-169
View File
@@ -1,169 +0,0 @@
"""Unit: pipelock_effective_allowlist — pipelock's allowlist
mirrors manifest-declared egress routes. Git upstreams declared in
`bottle.git` don't contribute; they flow through the per-agent
git-gate (PRD 0008)."""
import unittest
from bot_bottle.agent_provider import CODEX_HOST_CREDENTIAL_HOSTS
from bot_bottle.egress import CODEX_HOST_CREDENTIAL_TOKEN_REF, EgressRoute
from bot_bottle.manifest import Manifest
from bot_bottle.pipelock import (
pipelock_effective_allowlist,
pipelock_effective_ssrf_ip_allowlist,
pipelock_effective_tls_passthrough,
)
def _bottle(spec): # type: ignore
return Manifest.from_json_obj({
"bottles": {"dev": spec},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
def _routes(routes): # type: ignore
return {"egress": {"routes": routes}}
class TestEffectiveAllowlist(unittest.TestCase):
def test_empty_without_any_manifest_routes(self):
eff = pipelock_effective_allowlist(_bottle({}))
self.assertEqual([], eff)
def test_sorted_and_deduped(self):
eff = pipelock_effective_allowlist(_bottle(_routes([
{"host": "api.anthropic.com",
"auth": {"scheme": "Bearer", "token_ref": "T"}},
])))
self.assertEqual(len(eff), len(set(eff)))
self.assertEqual(eff, sorted(eff))
class TestAllowlistWithRoutes(unittest.TestCase):
def test_manifest_route_hosts_present(self):
eff = pipelock_effective_allowlist(_bottle(_routes([
{"host": "registry.npmjs.org",
"auth": {"scheme": "Bearer", "token_ref": "N"}},
{"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "G"}},
])))
self.assertIn("registry.npmjs.org", eff)
self.assertIn("api.github.com", eff)
def test_no_baked_defaults_alongside_manifest_routes(self):
eff = pipelock_effective_allowlist(_bottle(_routes([
{"host": "x.example",
"auth": {"scheme": "Bearer", "token_ref": "T"}},
])))
self.assertEqual(["x.example"], eff)
def test_egress_hostname_NOT_in_pipelock_allowlist(self):
# The agent never dials egress via the proxy mechanism
# — it IS the proxy. Pipelock receives upstream hostnames
# from egress's CONNECT requests, not the
# `egress` hostname itself.
eff = pipelock_effective_allowlist(_bottle(_routes([
{"host": "x.example",
"auth": {"scheme": "Bearer", "token_ref": "T"}},
])))
self.assertNotIn("egress", eff)
def test_supervise_hostname_auto_added_when_supervise_enabled(self):
eff = pipelock_effective_allowlist(_bottle({"supervise": True}))
self.assertIn("supervise", eff)
def test_supervise_hostname_NOT_added_when_disabled(self):
eff = pipelock_effective_allowlist(_bottle({}))
self.assertNotIn("supervise", eff)
eff_explicit = pipelock_effective_allowlist(_bottle({"supervise": False}))
self.assertNotIn("supervise", eff_explicit)
def test_path_allowlist_does_not_affect_pipelock_allowlist(self):
# path_allowlist is enforced by egress, not pipelock.
# Pipelock only sees the upstream hostname; the path filter
# has already passed (or 403'd) at egress.
eff = pipelock_effective_allowlist(_bottle(_routes([
{"host": "github.com", "path_allowlist": ["/x/", "/y/"]},
])))
self.assertIn("github.com", eff)
for entry in eff:
self.assertFalse(entry.startswith("/"))
class TestTlsPassthrough(unittest.TestCase):
def test_default_empty(self):
passthrough = pipelock_effective_tls_passthrough(_bottle({}))
self.assertEqual([], passthrough)
def test_route_hosts_not_added_to_passthrough_by_default(self):
passthrough = pipelock_effective_tls_passthrough(_bottle(_routes([
{"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "G"}},
{"host": "registry.npmjs.org",
"auth": {"scheme": "Bearer", "token_ref": "N"}},
])))
self.assertEqual([], passthrough)
def test_route_policy_adds_tls_passthrough(self):
passthrough = pipelock_effective_tls_passthrough(_bottle(_routes([
{"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": "O"},
"pipelock": {"tls_passthrough": True}},
{"host": "api.github.com",
"auth": {"scheme": "Bearer", "token_ref": "G"}},
])))
self.assertEqual(["api.openai.com"], passthrough)
def test_forward_host_credentials_passes_through_codex_hosts(self):
# Egress injects the host bearer on the Codex API hosts; pipelock
# must pass them through or its header DLP blocks the injected JWT
# ("request header contains secret"). Provider routes carry
# tls_passthrough=True; pipelock reads this via egress_routes_for_bottle.
provider_routes = tuple(
EgressRoute(
host=host,
auth_scheme="Bearer",
token_ref=CODEX_HOST_CREDENTIAL_TOKEN_REF,
tls_passthrough=True,
)
for host in CODEX_HOST_CREDENTIAL_HOSTS
)
passthrough = pipelock_effective_tls_passthrough(
_bottle({}), provider_routes,
)
self.assertEqual(["api.openai.com", "chatgpt.com"], passthrough)
def test_no_codex_passthrough_without_provider_routes(self):
passthrough = pipelock_effective_tls_passthrough(_bottle({
"agent_provider": {"template": "codex"},
}))
self.assertEqual([], passthrough)
class TestSsrfIpAllowlist(unittest.TestCase):
def test_default_empty(self):
allowlist = pipelock_effective_ssrf_ip_allowlist(_bottle({}))
self.assertEqual([], allowlist)
def test_route_policy_adds_ssrf_ip_allowlist(self):
allowlist = pipelock_effective_ssrf_ip_allowlist(_bottle(_routes([
{"host": "gitea.dideric.is",
"auth": {"scheme": "token", "token_ref": "G"},
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}},
])))
self.assertEqual(["100.78.141.42/32"], allowlist)
def test_route_policy_merges_with_extra(self):
allowlist = pipelock_effective_ssrf_ip_allowlist(
_bottle(_routes([
{"host": "gitea.dideric.is",
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}},
])),
("172.20.0.0/16",),
)
self.assertEqual(["100.78.141.42/32", "172.20.0.0/16"], allowlist)
if __name__ == "__main__":
unittest.main()
-115
View File
@@ -1,115 +0,0 @@
"""Unit: pipelock_apply parsers + helpers (PRD 0015 Phase 1).
docker exec / cp / restart paths are covered by the integration
test in Phase 4. Here we cover the host-side parsing + yaml roundtrip.
"""
import unittest
from bot_bottle.backend.docker.pipelock_apply import (
PipelockApplyError,
parse_allowlist_content,
render_allowlist_content,
)
from bot_bottle.pipelock import pipelock_render_yaml
from bot_bottle.yaml_subset import parse_yaml_subset
class TestParseAllowlistContent(unittest.TestCase):
def test_one_per_line(self):
self.assertEqual(
["a.example", "b.example"],
parse_allowlist_content("a.example\nb.example\n"),
)
def test_blank_lines_ignored(self):
self.assertEqual(
["a", "b"],
parse_allowlist_content("a\n\n \nb\n"),
)
def test_comments_ignored(self):
self.assertEqual(
["a"],
parse_allowlist_content("# top comment\na\n# trailing\n"),
)
def test_invalid_char_raises(self):
with self.assertRaises(PipelockApplyError) as cm:
parse_allowlist_content("host with space\n")
self.assertIn("disallowed characters", str(cm.exception))
def test_empty_input_returns_empty_list(self):
self.assertEqual([], parse_allowlist_content(""))
class TestRenderAllowlistContent(unittest.TestCase):
def test_one_per_line_with_trailing_newline(self):
self.assertEqual("a\nb\n", render_allowlist_content(["a", "b"]))
def test_empty_renders_empty(self):
self.assertEqual("", render_allowlist_content([]))
def test_roundtrip(self):
original = ["api.example.com", "ghcr.io", "example.org"]
self.assertEqual(
original,
parse_allowlist_content(render_allowlist_content(original)),
)
class TestYamlRoundtripPreservesPipelockFields(unittest.TestCase):
"""The apply path parses the running pipelock.yaml, swaps
api_allowlist, re-renders. Verify that parse(render(cfg)) ==
cfg for the fields pipelock_render_yaml emits — otherwise
the apply would silently drop config."""
def test_minimal_config_roundtrips(self):
cfg = {
"version": 1,
"mode": "strict",
"enforce": True,
"api_allowlist": ["a.example", "b.example"],
"forward_proxy": {"enabled": True},
"dlp": {"include_defaults": True, "scan_env": True},
"request_body_scanning": {"action": "block"},
}
rendered = pipelock_render_yaml(cfg) # type: ignore
parsed = parse_yaml_subset(rendered)
self.assertEqual(["a.example", "b.example"], parsed["api_allowlist"])
self.assertEqual(1, parsed["version"])
self.assertEqual("strict", parsed["mode"])
self.assertEqual(True, parsed["enforce"])
def test_swap_allowlist_then_render_preserves_other_fields(self):
cfg = {
"version": 1,
"mode": "strict",
"enforce": True,
"api_allowlist": ["old.example"],
"forward_proxy": {"enabled": True},
"dlp": {"include_defaults": True, "scan_env": True},
"request_body_scanning": {"action": "block"},
"tls_interception": {
"enabled": True,
"ca_cert": "/etc/pipelock-ca.pem",
"ca_key": "/etc/pipelock-ca-key.pem",
"passthrough_domains": ["api.anthropic.com"],
},
}
parsed = parse_yaml_subset(pipelock_render_yaml(cfg)) # type: ignore
parsed["api_allowlist"] = ["new.example"]
rerendered = pipelock_render_yaml(parsed)
roundtripped = parse_yaml_subset(rerendered)
self.assertEqual(["new.example"], roundtripped["api_allowlist"])
# Non-allowlist fields stay put.
self.assertEqual("strict", roundtripped["mode"])
tls = roundtripped["tls_interception"]
self.assertIsInstance(tls, dict)
assert isinstance(tls, dict) # type-narrowing
self.assertEqual("/etc/pipelock-ca.pem", tls["ca_cert"])
self.assertEqual(["api.anthropic.com"], tls["passthrough_domains"])
if __name__ == "__main__":
unittest.main()
-356
View File
@@ -1,356 +0,0 @@
"""Unit: pipelock config building and YAML rendering.
`pipelock_build_config` produces the structured config dict pipelock
will load; tests assert on that dict so they don't break on cosmetic
YAML changes. A small set of tests still hit the rendered output for
properties that only make sense on disk (file mode, no-secret-leakage).
"""
import os
import tempfile
import unittest
from pathlib import Path
from typing import cast
from bot_bottle.manifest import Manifest
from bot_bottle.pipelock import (
DEFAULT_TLS_PASSTHROUGH,
PipelockProxy,
pipelock_build_config,
pipelock_render_yaml,
)
from bot_bottle.yaml_subset import parse_yaml_subset
from tests.fixtures import fixture_minimal
class TestBuildConfig(unittest.TestCase):
def test_minimal_shape(self):
cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
self.assertEqual("strict", cfg["mode"])
self.assertEqual(True, cfg["enforce"])
self.assertEqual({"enabled": True}, cfg["forward_proxy"])
self.assertEqual(
{"include_defaults": True, "scan_env": True}, cfg["dlp"]
)
# Body-scan action is hard-coded "block" in pipelock_build_config.
# `scan_headers: True` + `header_mode: "all"` close the
# header-shape exfil gap surfaced by PRD 0022 attack 3.
self.assertEqual(
{
"action": "block",
"scan_headers": True,
"header_mode": "all",
},
cfg["request_body_scanning"],
)
# No provider defaults are injected implicitly.
self.assertEqual([], cast(list[str], cfg["api_allowlist"]))
# pipelock has no SSH carve-outs at all — neither
# trusted_domains nor ssrf are emitted from bottle data.
self.assertNotIn("trusted_domains", cfg)
self.assertNotIn("ssrf", cfg)
# Without CA paths, the tls_interception block is omitted —
# pipelock falls back to its built-in default of `enabled: false`.
self.assertNotIn("tls_interception", cfg)
def test_tls_interception_block_emitted_when_paths_supplied(self):
# PRD 0006: paths flow in via the platform-neutral in-container
# constants; this directly pins the dict shape.
cfg = pipelock_build_config(
fixture_minimal().bottles["dev"],
ca_cert_path="/etc/pipelock-ca.pem",
ca_key_path="/etc/pipelock-ca-key.pem",
)
self.assertEqual(
{
"enabled": True,
"ca_cert": "/etc/pipelock-ca.pem",
"ca_key": "/etc/pipelock-ca-key.pem",
"passthrough_domains": [],
},
cfg["tls_interception"],
)
self.assertEqual((), DEFAULT_TLS_PASSTHROUGH)
def test_tls_passthrough_route_policy_emits_domain(self):
bottle = Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"routes": [
{"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": "T"},
"pipelock": {"tls_passthrough": True}},
]}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
cfg = pipelock_build_config(
bottle,
ca_cert_path="/etc/pipelock-ca.pem",
ca_key_path="/etc/pipelock-ca-key.pem",
)
tls = cast(dict[str, object], cfg["tls_interception"])
self.assertEqual(["api.openai.com"], tls["passthrough_domains"])
def test_tls_interception_requires_both_paths(self):
# Half-set is a programmer error, not a silent omission.
with self.assertRaises(ValueError):
pipelock_build_config(
fixture_minimal().bottles["dev"],
ca_cert_path="/etc/pipelock-ca.pem",
)
def test_ssrf_block_omitted_when_no_allowlist(self):
cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
self.assertNotIn("ssrf", cfg)
def test_ssrf_block_emitted_when_allowlist_supplied(self):
# The bottle's internal Docker subnet lands here at launch
# time so sibling-sidecar traffic (172.x.x.x) doesn't trip
# pipelock's RFC1918 SSRF guard.
cfg = pipelock_build_config(
fixture_minimal().bottles["dev"],
ssrf_ip_allowlist=("172.20.0.0/16",),
)
self.assertIn("ssrf", cfg)
self.assertEqual({"ip_allowlist": ["172.20.0.0/16"]}, cfg["ssrf"])
def test_ssrf_block_emitted_from_route_policy(self):
bottle = Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"routes": [
{"host": "gitea.dideric.is",
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}},
]}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
cfg = pipelock_build_config(bottle)
self.assertEqual(
{"ip_allowlist": ["100.78.141.42/32"]},
cfg["ssrf"],
)
def test_seed_phrase_detection_disabled_by_default(self):
# Only the broad BIP-39 detector is disabled. The rest of
# DLP remains enabled via the `dlp` and request-body sections.
cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
self.assertEqual({"enabled": False}, cfg["seed_phrase_detection"])
def test_seed_phrase_detection_disabled_for_openai_route(self):
# OpenAI/Codex chat bodies trip pipelock's BIP-39 detector
# (12+ English words that pass the checksum). pipelock 2.3.0
# has no per-path knob for this detector, and both `suppress`
# and `rules.disabled` only silence alerts — the block still
# fires. The only knob that actually skips the block is the
# global on/off.
from bot_bottle.manifest import Manifest
bottle = Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"routes": [
{"host": "api.openai.com",
"auth": {"scheme": "Bearer", "token_ref": "T"}},
]}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
cfg = pipelock_build_config(bottle)
self.assertEqual({"enabled": False}, cfg["seed_phrase_detection"])
class TestRenderAndWrite(unittest.TestCase):
def setUp(self):
self.out_dir = Path(tempfile.mkdtemp())
def tearDown(self):
import shutil
shutil.rmtree(self.out_dir, ignore_errors=True)
def assert_render_semantics_match(self, cfg: dict[str, object]) -> None:
parsed = parse_yaml_subset(pipelock_render_yaml(cfg))
self.assertEqual(cfg["version"], parsed["version"])
self.assertEqual(cfg["mode"], parsed["mode"])
self.assertEqual(cfg["enforce"], parsed["enforce"])
parsed_allowlist = parsed["api_allowlist"]
if cfg["api_allowlist"] == [] and parsed_allowlist is None:
parsed_allowlist = []
self.assertEqual(cfg["api_allowlist"], parsed_allowlist)
self.assertEqual(cfg["forward_proxy"], parsed["forward_proxy"])
self.assertEqual(cfg["dlp"], parsed["dlp"])
self.assertEqual(
cfg["request_body_scanning"],
parsed["request_body_scanning"],
)
if "seed_phrase_detection" in cfg:
self.assertEqual(
cfg["seed_phrase_detection"],
parsed["seed_phrase_detection"],
)
else:
self.assertNotIn("seed_phrase_detection", parsed)
if "tls_interception" in cfg:
expected_tls = cast(dict[str, object], cfg["tls_interception"])
actual_tls = cast(dict[str, object], parsed["tls_interception"])
self.assertEqual(expected_tls["enabled"], actual_tls["enabled"])
self.assertEqual(expected_tls["ca_cert"], actual_tls["ca_cert"])
self.assertEqual(expected_tls["ca_key"], actual_tls["ca_key"])
expected_passthrough = expected_tls["passthrough_domains"]
if expected_passthrough:
self.assertEqual(
expected_passthrough,
actual_tls["passthrough_domains"],
)
else:
self.assertNotIn("passthrough_domains", actual_tls)
else:
self.assertNotIn("tls_interception", parsed)
if "ssrf" in cfg:
self.assertEqual(cfg["ssrf"], parsed["ssrf"])
else:
self.assertNotIn("ssrf", parsed)
def test_render_emits_required_top_level_keys(self):
"""One render-level smoke check: the serialized YAML is plausibly
the shape pipelock expects. We don't grep every key here — that's
what TestBuildConfig is for."""
cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
text = pipelock_render_yaml(cfg)
for required in (
"api_allowlist:",
"forward_proxy:",
"dlp:",
"request_body_scanning:",
):
self.assertIn(required, text)
# No ssh carve-outs in the rendered yaml.
self.assertNotIn("trusted_domains:", text)
self.assertNotIn("ssrf:", text)
def test_render_semantics_match_minimal_config(self):
cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
self.assert_render_semantics_match(cfg)
def test_render_semantics_match_tls_with_empty_passthrough(self):
cfg = pipelock_build_config(
fixture_minimal().bottles["dev"],
ca_cert_path="/etc/pipelock-ca.pem",
ca_key_path="/etc/pipelock-ca-key.pem",
)
self.assert_render_semantics_match(cfg)
def test_render_semantics_match_all_optional_sections(self):
bottle = Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"routes": [
{"host": "api.openai.com",
"pipelock": {"tls_passthrough": True}},
{"host": "gitea.dideric.is",
"pipelock": {"ssrf_ip_allowlist": ["100.78.141.42/32"]}},
]}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
cfg = pipelock_build_config(
bottle,
ca_cert_path="/etc/pipelock-ca.pem",
ca_key_path="/etc/pipelock-ca-key.pem",
ssrf_ip_allowlist=("172.20.0.0/16",),
)
self.assert_render_semantics_match(cfg)
def test_render_rejects_missing_required_key(self):
cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
del cfg["mode"]
with self.assertRaisesRegex(ValueError, r"config\.mode"):
pipelock_render_yaml(cfg)
def test_render_rejects_wrong_section_type(self):
cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
cfg["dlp"] = []
with self.assertRaisesRegex(ValueError, r"config\.dlp.*mapping"):
pipelock_render_yaml(cfg)
def test_render_rejects_wrong_list_item_type(self):
cfg = pipelock_build_config(
fixture_minimal().bottles["dev"],
ca_cert_path="/etc/pipelock-ca.pem",
ca_key_path="/etc/pipelock-ca-key.pem",
)
tls = cast(dict[str, object], cfg["tls_interception"])
tls["passthrough_domains"] = ["api.openai.com", 3]
with self.assertRaisesRegex(
ValueError, r"tls_interception\.passthrough_domains",
):
pipelock_render_yaml(cfg)
def test_render_rejects_unsupported_top_level_section(self):
cfg = pipelock_build_config(fixture_minimal().bottles["dev"])
cfg["trusted_domains"] = []
with self.assertRaisesRegex(ValueError, r"config\.trusted_domains"):
pipelock_render_yaml(cfg)
def test_prepare_writes_file_at_mode_600(self):
plan = PipelockProxy().prepare(
fixture_minimal().bottles["dev"], "demo", self.out_dir
)
self.assertEqual(0o600, os.stat(plan.yaml_path).st_mode & 0o777)
def test_prepare_does_not_leak_env_names_or_values(self):
manifest = Manifest.from_json_obj({
"bottles": {
"dev": {
"env": {
"MY_SECRET": "literal-value-should-not-appear",
"ANOTHER": "?prompt-message",
},
"egress": {"routes": [{"host": "github.com"}]},
}
},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
})
plan = PipelockProxy().prepare(
manifest.bottles["dev"], "demo", self.out_dir
)
content = plan.yaml_path.read_text()
self.assertNotIn("literal-value-should-not-appear", content)
self.assertNotIn("MY_SECRET", content)
self.assertNotIn("prompt-message", content)
def test_render_emits_tls_interception_via_prepare(self):
"""`PipelockProxy.prepare` plumbs the module-level in-container
CA constants through to the YAML. The block should land in the
rendered output with `enabled: true`, the configured paths,
and any route-owned passthrough domains. The actual
host-side CA generation happens in launch (not prepare), so
this test exercises only the YAML rendering."""
bottle = Manifest.from_json_obj({
"bottles": {"dev": {"egress": {"routes": [
{"host": "api.openai.com",
"pipelock": {"tls_passthrough": True}},
]}}},
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
}).bottles["dev"]
plan = PipelockProxy().prepare(bottle, "demo", self.out_dir)
content = plan.yaml_path.read_text()
self.assertIn("tls_interception:", content)
self.assertIn("enabled: true", content)
self.assertIn('ca_cert: "/etc/pipelock-ca.pem"', content)
self.assertIn('ca_key: "/etc/pipelock-ca-key.pem"', content)
self.assertIn("passthrough_domains:", content)
self.assertIn('- "api.openai.com"', content)
def test_render_emits_ssrf_block_when_allowlist_given(self):
cfg = pipelock_build_config(
fixture_minimal().bottles["dev"],
ca_cert_path="/etc/pipelock-ca.pem",
ca_key_path="/etc/pipelock-ca-key.pem",
ssrf_ip_allowlist=("172.20.0.0/16",),
)
text = pipelock_render_yaml(cfg)
self.assertIn("ssrf:", text)
self.assertIn("ip_allowlist:", text)
self.assertIn('- "172.20.0.0/16"', text)
def test_render_emits_seed_phrase_off_by_default(self):
text = pipelock_render_yaml(
pipelock_build_config(fixture_minimal().bottles["dev"])
)
self.assertIn("seed_phrase_detection:", text)
self.assertIn("enabled: false", text)
if __name__ == "__main__":
unittest.main()