From 07da4366ad9a13408a055fff8bca2387dae809b1 Mon Sep 17 00:00:00 2001 From: didericis Date: Wed, 13 May 2026 16:29:10 -0400 Subject: [PATCH] test(cred_proxy): integration tests for header inject + strip (PRD 0010) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drives DockerCredProxy.start through the production code path against a fake upstream container running on the same egress network. The "agent" is a curl container on the bottle's internal network — same access topology the agent uses in production. Covers PRD 0010 success criteria: - SC3 (the request reaches upstream, header round-trip works) - SC6 (inbound Authorization stripped; the proxy injects the configured token even when the agent tries to smuggle one in) - partial SC2 (cred-proxy reachable by the alias from the internal network) - 404 for unconfigured routes Live-network tests against real Anthropic / GitHub / Gitea / npm upstreams (SC4 and SC5 specifically) are deferred — the fake-upstream shape covers the routing + header layer that's actually under test here. --- tests/integration/_fake_upstream.py | 91 ++++++ tests/integration/test_cred_proxy_sidecar.py | 295 +++++++++++++++++++ 2 files changed, 386 insertions(+) create mode 100644 tests/integration/_fake_upstream.py create mode 100644 tests/integration/test_cred_proxy_sidecar.py diff --git a/tests/integration/_fake_upstream.py b/tests/integration/_fake_upstream.py new file mode 100644 index 0000000..f5c2264 --- /dev/null +++ b/tests/integration/_fake_upstream.py @@ -0,0 +1,91 @@ +"""A capture-and-echo HTTP server used as a fake upstream behind the +cred-proxy in integration tests. + +Captures the last request's method, path, and headers under +/__last_request (as JSON). Returns a fixed 200 OK with a deterministic +body for every other path. Tests probe /__last_request to assert on +header injection (PRD 0010 SC3/SC6). + +Stdlib-only; runs inside a python:alpine container with a single +bind-mount. +""" + +from __future__ import annotations + +import http.server +import json +import os +import socketserver +import sys +import threading + + +_lock = threading.Lock() +_last_request: dict[str, object] = {} + + +class Handler(http.server.BaseHTTPRequestHandler): + def log_message(self, format: str, *args: object) -> None: + # Quiet — the test reads the capture endpoint, not stderr. + return + + def _capture_and_respond(self) -> None: + # Skip capturing the inspection endpoints so the test's own + # query to /__last_request doesn't overwrite the request it + # came in to inspect. + if not self.path.startswith("/__"): + with _lock: + global _last_request + _last_request = { + "method": self.command, + "path": self.path, + "headers": [[k, v] for k, v in self.headers.items()], + } + if self.path == "/__last_request": + body = json.dumps(_last_request, indent=2).encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + return + if self.path == "/__sse": + # SSE-style streaming response. Used by the no-buffering + # test: three events with short flushes between them. + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.send_header("Cache-Control", "no-cache") + self.end_headers() + for i in range(3): + self.wfile.write(f"data: event-{i}\n\n".encode("utf-8")) + self.wfile.flush() + return + body = b'{"upstream":"fake","ok":true}\n' + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_GET(self) -> None: self._capture_and_respond() + def do_POST(self) -> None: self._capture_and_respond() + def do_PUT(self) -> None: self._capture_and_respond() + def do_DELETE(self) -> None: self._capture_and_respond() + def do_PATCH(self) -> None: self._capture_and_respond() + + +class FakeServer(socketserver.ThreadingMixIn, http.server.HTTPServer): + allow_reuse_address = True + daemon_threads = True + + +def main() -> None: + port = int(os.environ.get("FAKE_UPSTREAM_PORT", "8080")) + server = FakeServer(("0.0.0.0", port), Handler) + sys.stderr.write(f"fake-upstream listening on :{port}\n") + sys.stderr.flush() + server.serve_forever() + + +if __name__ == "__main__": + main() diff --git a/tests/integration/test_cred_proxy_sidecar.py b/tests/integration/test_cred_proxy_sidecar.py new file mode 100644 index 0000000..a121780 --- /dev/null +++ b/tests/integration/test_cred_proxy_sidecar.py @@ -0,0 +1,295 @@ +"""Integration: drive `DockerCredProxy.prepare` → `.start` against a +fake upstream container, then verify header injection / strip-and- +replace at the wire level (PRD 0010 SC2, SC3, SC6). + +Topology mirrors production: a per-bottle internal docker network (no +default gateway) for the agent ↔ cred-proxy leg, and an egress network +for cred-proxy ↔ upstream. The "agent" is a curl container on the +internal net; the "upstream" is the fake-upstream container on the +egress net. cred-proxy straddles both. +""" + +from __future__ import annotations + +import dataclasses +import json +import os +import shutil +import subprocess +import tempfile +import unittest +from pathlib import Path + +from claude_bottle.backend.docker.cred_proxy import ( + CRED_PROXY_HOSTNAME, + CRED_PROXY_PORT, + DockerCredProxy, + build_cred_proxy_image, + cred_proxy_container_name, +) +from claude_bottle.backend.docker.network import ( + network_create_egress, + network_create_internal, + network_remove, +) +from claude_bottle.cred_proxy import CredProxy +from claude_bottle.manifest import Manifest +from tests._docker import skip_unless_docker + + +CURL_IMAGE = "curlimages/curl:latest" +FAKE_UPSTREAM_IMAGE = "python:3.13-alpine" +FAKE_UPSTREAM_HOST = "fake-upstream" +FAKE_UPSTREAM_PORT = "8080" + + +def _bottle(tokens): + return Manifest.from_json_obj({ + "bottles": {"dev": {"tokens": tokens}}, + "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}}, + }).bottles["dev"] + + +class _StubCredProxy(CredProxy): + """CredProxy.prepare's render uses the Kind defaults, but the + integration test needs the cred-proxy to forward to the fake + upstream — not api.anthropic.com / github.com / npmjs.org. We + pass a one-route plan in directly via DockerCredProxy.start + rather than going through the manifest path.""" + + def start(self, plan): raise NotImplementedError + def stop(self, target): return None + + +def _make_routes_json(upstream_host: str, upstream_port: str) -> str: + payload = { + "routes": [ + { + "path": "/fake/", + "upstream": f"http://{upstream_host}:{upstream_port}", + "auth_scheme": "Bearer", + "token_env": "CRED_PROXY_TOKEN_0", + }, + ], + } + return json.dumps(payload, indent=2) + "\n" + + +@skip_unless_docker() +class TestCredProxySidecar(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Pre-pull the probe + fake-upstream base images so per-test + # retries don't race the registry. Skip if pulls fail (the + # canary suite separately probes registry health). + for image in (CURL_IMAGE, FAKE_UPSTREAM_IMAGE): + r = subprocess.run( + ["docker", "pull", image], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + ) + if r.returncode != 0: + raise unittest.SkipTest(f"could not pull {image}") + build_cred_proxy_image() + + def setUp(self): + self.slug = f"cb-test-cp-{os.getpid()}" + self.proxy_name = "" + self.fake_name = f"fake-upstream-{self.slug}" + self.internal_net = "" + self.egress_net = "" + self.work_dir = Path(tempfile.mkdtemp()) + + def tearDown(self): + for name in (self.proxy_name, self.fake_name): + if name: + subprocess.run( + ["docker", "rm", "-f", name], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + ) + for n in (self.internal_net, self.egress_net): + if n: + network_remove(n) + shutil.rmtree(self.work_dir, ignore_errors=True) + + def _bring_up_fake_upstream(self) -> None: + """Run the fake-upstream container on the egress network with + the host stable name `fake-upstream`. Bind-mount the script + from tests/integration/.""" + repo_dir = str(Path(__file__).resolve().parent.parent.parent) + script = "tests/integration/_fake_upstream.py" + r = subprocess.run( + [ + "docker", "run", "-d", + "--name", self.fake_name, + "--hostname", FAKE_UPSTREAM_HOST, + "--network", self.egress_net, + "--network-alias", FAKE_UPSTREAM_HOST, + "-v", f"{repo_dir}/{script}:/srv.py:ro", + "-e", f"FAKE_UPSTREAM_PORT={FAKE_UPSTREAM_PORT}", + FAKE_UPSTREAM_IMAGE, + "python3", "/srv.py", + ], + capture_output=True, text=True, check=False, + ) + if r.returncode != 0: + self.fail(f"failed to start fake-upstream: {r.stderr}") + + def _start_cred_proxy_via_production_code(self) -> str: + """Run DockerCredProxy.start with a plan that points at the + fake upstream. We bypass the manifest path (which fixes + upstreams by Kind) by handing .start an already-rendered + routes.json.""" + from claude_bottle.cred_proxy import ( + CredProxyPlan, + CredProxyUpstream, + ) + routes_path = self.work_dir / "routes.json" + routes_path.write_text(_make_routes_json(FAKE_UPSTREAM_HOST, FAKE_UPSTREAM_PORT)) + routes_path.chmod(0o600) + plan = CredProxyPlan( + slug=self.slug, + routes_path=routes_path, + upstreams=(CredProxyUpstream( + kind="fake", + path="/fake/", + upstream=f"http://{FAKE_UPSTREAM_HOST}:{FAKE_UPSTREAM_PORT}", + auth_scheme="Bearer", + token_env="CRED_PROXY_TOKEN_0", + token_ref="TEST_TOKEN", + ),), + token_env_map={"CRED_PROXY_TOKEN_0": "TEST_TOKEN"}, + internal_network=self.internal_net, + egress_network=self.egress_net, + ) + # Inject the host-side TEST_TOKEN into our process env so the + # production resolver picks it up. + os.environ["TEST_TOKEN"] = "real-token-injected-by-proxy" + try: + return DockerCredProxy().start(plan) + finally: + os.environ.pop("TEST_TOKEN", None) + + def _curl_via_internal_net(self, path: str, *extra: str) -> str: + """Run a sibling curl container on the internal network — same + access topology the agent uses in production — to hit the + cred-proxy. Returns stdout.""" + r = subprocess.run( + [ + "docker", "run", "--rm", + "--network", self.internal_net, + CURL_IMAGE, + "-s", "--max-time", "10", + "--retry", "20", "--retry-delay", "1", "--retry-connrefused", + *extra, + f"http://{CRED_PROXY_HOSTNAME}:{CRED_PROXY_PORT}{path}", + ], + capture_output=True, text=True, timeout=60, check=False, + ) + self.assertEqual(0, r.returncode, + f"curl failed: stdout={r.stdout!r} stderr={r.stderr!r}") + return r.stdout + + def _query_fake_capture(self) -> dict: + """Read the fake upstream's /__last_request endpoint to see + what headers it received.""" + r = subprocess.run( + [ + "docker", "run", "--rm", + "--network", self.egress_net, + CURL_IMAGE, + "-s", "--max-time", "10", + "--retry", "5", "--retry-delay", "1", "--retry-connrefused", + f"http://{FAKE_UPSTREAM_HOST}:{FAKE_UPSTREAM_PORT}/__last_request", + ], + capture_output=True, text=True, timeout=30, check=False, + ) + self.assertEqual(0, r.returncode, f"capture query failed: {r.stderr}") + return json.loads(r.stdout) + + @unittest.skipIf( + os.environ.get("GITEA_ACTIONS") == "true", + "skipped under act_runner: docker socket mount topology breaks " + "in-process visibility of networks created on the host daemon", + ) + def test_end_to_end_header_injection_and_strip(self): + """Full bring-up via the production DockerCredProxy code path, + then send a request from a sibling curl container with the + agent's `Authorization` header. The fake upstream's capture + must show: + - the agent's Authorization was stripped (no `stolen` token) + - the cred-proxy injected `Bearer real-token-injected-by-proxy` + - the request reached the upstream at all + """ + self.internal_net = network_create_internal(self.slug) + self.egress_net = network_create_egress(self.slug) + self._bring_up_fake_upstream() + self.proxy_name = self._start_cred_proxy_via_production_code() + self.assertEqual(cred_proxy_container_name(self.slug), self.proxy_name) + + # Agent → cred-proxy with a smuggled Authorization header. + body = self._curl_via_internal_net( + "/fake/v1/messages", + "-H", "Authorization: Bearer stolen-by-prompt-injection", + "-X", "POST", + "-H", "Content-Type: application/json", + "--data-binary", '{"hello":"world"}', + ) + # The fake upstream responds with a fixed body. + self.assertIn('"upstream":"fake"', body) + + # Now ask the fake upstream what headers it actually saw. + captured = self._query_fake_capture() + self.assertEqual("POST", captured["method"]) + self.assertEqual("/v1/messages", captured["path"], + "the /fake/ prefix should be stripped before forwarding") + + headers = {k.lower(): v for k, v in captured["headers"]} + self.assertEqual( + "Bearer real-token-injected-by-proxy", + headers.get("authorization"), + "cred-proxy must strip the inbound Authorization and inject " + "the configured value", + ) + self.assertNotIn("stolen", headers.get("authorization", ""), + "the agent's smuggled token must NOT reach upstream") + self.assertEqual( + FAKE_UPSTREAM_HOST, + headers.get("host"), + "Host header should point at the upstream, not the proxy", + ) + + @unittest.skipIf( + os.environ.get("GITEA_ACTIONS") == "true", + "skipped under act_runner: docker socket mount topology breaks " + "in-process visibility of networks created on the host daemon", + ) + def test_unknown_path_returns_404(self): + """An agent reaching for an unconfigured route gets a 404, + not a silent forward to anywhere.""" + self.internal_net = network_create_internal(self.slug) + self.egress_net = network_create_egress(self.slug) + self._bring_up_fake_upstream() + self.proxy_name = self._start_cred_proxy_via_production_code() + + r = subprocess.run( + [ + "docker", "run", "--rm", + "--network", self.internal_net, + CURL_IMAGE, + "-s", "-o", "/dev/null", "-w", "%{http_code}", + "--max-time", "10", + "--retry", "20", "--retry-delay", "1", "--retry-connrefused", + f"http://{CRED_PROXY_HOSTNAME}:{CRED_PROXY_PORT}/not-a-route", + ], + capture_output=True, text=True, timeout=60, check=False, + ) + self.assertEqual(0, r.returncode) + self.assertEqual("404", r.stdout.strip()) + + +if __name__ == "__main__": + unittest.main()