feat(cred_proxy): add HTTP server + sidecar image (PRD 0010)
Stdlib-only Python proxy: reads /run/cred-proxy/routes.json on boot, listens on 0.0.0.0:9099, strips inbound Authorization, injects the configured header (Bearer or token) using the route's token_env env var, forwards over HTTPS to the upstream, and streams the response back chunk-by-chunk (SSE-safe). Hop-by-hop headers are stripped per RFC 7230, including anything listed in `Connection:`. Content-Length is dropped so http.client recomputes it on the upstream leg. Tokens never reach routes.json — they arrive via the container's environ. Dockerfile.cred-proxy builds on python:3.13-alpine pinned by digest; mkdir /run/cred-proxy is baked in so docker cp can drop the route table at start time. No pip install layer. Smoke-tested: container boots, logs listen line, returns 404 for unmatched paths. Full request/response cycle covered by the integration tests in a follow-up commit.
This commit is contained in:
@@ -0,0 +1,35 @@
|
||||
# Per-bottle cred-proxy sidecar image (PRD 0010).
|
||||
#
|
||||
# Holds API tokens (Anthropic OAuth, GitHub PAT, Gitea PAT, npm) in
|
||||
# this container's environ, strips inbound Authorization headers, and
|
||||
# injects the configured one before forwarding to the real upstream
|
||||
# over HTTPS. The agent's environ carries only URLs pointing at this
|
||||
# sidecar — the upstream credentials never reach the agent container.
|
||||
#
|
||||
# Stdlib-only Python; no pip install layer. The route table lands at
|
||||
# /run/cred-proxy/routes.json via `docker cp` from the backend's
|
||||
# start step.
|
||||
|
||||
# python:3.13-alpine. Pinned by digest for reproducibility — the
|
||||
# proxy script is stdlib-only so a Python minor-version drift would
|
||||
# only affect the runtime, not API surface, but pinning makes the
|
||||
# image bytes deterministic.
|
||||
FROM python@sha256:420cd0bf0f3998275875e02ecd5808168cf0843cbb4d3c536432f729247b2acc
|
||||
|
||||
# The proxy script ships as a single file. Tests in tests/unit/ import
|
||||
# it as `claude_bottle.cred_proxy_server`; the container runs it
|
||||
# directly as a script. No package install, no other modules pulled.
|
||||
COPY claude_bottle/cred_proxy_server.py /app/cred_proxy_server.py
|
||||
|
||||
# Pre-create the runtime directory the backend's start step will
|
||||
# `docker cp` routes.json into. docker cp does not create
|
||||
# intermediate dirs, so the mkdir must be baked into the image.
|
||||
RUN mkdir -p /run/cred-proxy
|
||||
|
||||
# Listening port. The agent's environ resolves the cred-proxy host
|
||||
# via Docker's embedded DNS on the per-bottle internal network and
|
||||
# dials this port. Surfaced as EXPOSE for documentation; not required
|
||||
# for the internal network to route to it.
|
||||
EXPOSE 9099
|
||||
|
||||
ENTRYPOINT ["python3", "/app/cred_proxy_server.py"]
|
||||
@@ -0,0 +1,401 @@
|
||||
"""Cred-proxy HTTP server (PRD 0010).
|
||||
|
||||
Runs inside the per-bottle cred-proxy sidecar. Reads
|
||||
`/run/cred-proxy/routes.json` (laid down by the backend's start step
|
||||
via `docker cp`) and listens on `0.0.0.0:<port>`. For each request:
|
||||
|
||||
1. Match the request path against the longest route prefix.
|
||||
2. Strip any inbound `Authorization` header (the agent cannot
|
||||
smuggle a stolen token through this path).
|
||||
3. Inject the configured header using the value of the env var
|
||||
named by the route's `token_env`.
|
||||
4. Forward to the upstream over HTTPS, preserving method, path
|
||||
suffix, query string, request body, and the remaining headers.
|
||||
5. Stream the response back without buffering — SSE-safe.
|
||||
|
||||
The agent talks plain HTTP to this server (loopback-equivalent across
|
||||
the per-bottle internal docker network). The cred-proxy talks HTTPS
|
||||
outbound through pipelock to the real upstream. Tokens live in this
|
||||
container's environ; they never land in routes.json on disk and never
|
||||
reach the agent's container.
|
||||
|
||||
Stdlib-only: this file ships into a minimal Python image with no pip
|
||||
install layer. The constants are duplicated from `cred_proxy.py` so
|
||||
the server doesn't need to import the rest of the package.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import http.client
|
||||
import http.server
|
||||
import json
|
||||
import os
|
||||
import socketserver
|
||||
import sys
|
||||
import typing
|
||||
import urllib.parse
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
# --- Config / route table ---------------------------------------------------
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Route:
|
||||
"""One row of the proxy's route table.
|
||||
|
||||
`path` is the agent-facing prefix (e.g. `/anthropic/`); the
|
||||
incoming request's path starts with this. `upstream_scheme` /
|
||||
`upstream_host` / `upstream_base_path` are the parsed pieces of
|
||||
the upstream URL — the request's path after the prefix is
|
||||
appended to `upstream_base_path`. `auth_scheme` is the literal
|
||||
word in the injected header (`Bearer` or `token`). `token_env`
|
||||
is the env-var name this container reads to get the token."""
|
||||
|
||||
path: str
|
||||
upstream_scheme: str
|
||||
upstream_host: str
|
||||
upstream_port: int
|
||||
upstream_base_path: str
|
||||
auth_scheme: str
|
||||
token_env: str
|
||||
|
||||
|
||||
def parse_routes(payload: dict[str, object]) -> tuple[Route, ...]:
|
||||
"""Parse the routes.json payload into a tuple of `Route`s. Sorted
|
||||
by descending path length so longest-prefix match is the first
|
||||
hit in iteration order."""
|
||||
raw = payload.get("routes")
|
||||
if not isinstance(raw, list):
|
||||
raise ValueError("routes.json: 'routes' must be a list")
|
||||
out: list[Route] = []
|
||||
for r in raw:
|
||||
if not isinstance(r, dict):
|
||||
raise ValueError(f"routes.json: route must be an object (got {type(r).__name__})")
|
||||
path = r["path"]
|
||||
upstream = r["upstream"]
|
||||
auth_scheme = r["auth_scheme"]
|
||||
token_env = r["token_env"]
|
||||
if not isinstance(path, str) or not path.startswith("/") or not path.endswith("/"):
|
||||
raise ValueError(f"routes.json: path {path!r} must start and end with /")
|
||||
if not isinstance(upstream, str):
|
||||
raise ValueError("routes.json: upstream must be a string")
|
||||
if not isinstance(auth_scheme, str):
|
||||
raise ValueError("routes.json: auth_scheme must be a string")
|
||||
if not isinstance(token_env, str) or not token_env:
|
||||
raise ValueError("routes.json: token_env must be a non-empty string")
|
||||
parsed = urllib.parse.urlsplit(upstream)
|
||||
if parsed.scheme not in ("http", "https"):
|
||||
raise ValueError(f"routes.json: upstream scheme must be http or https (got {parsed.scheme!r})")
|
||||
if not parsed.hostname:
|
||||
raise ValueError(f"routes.json: upstream {upstream!r} missing host")
|
||||
port = parsed.port or (443 if parsed.scheme == "https" else 80)
|
||||
base_path = parsed.path or ""
|
||||
out.append(Route(
|
||||
path=path,
|
||||
upstream_scheme=parsed.scheme,
|
||||
upstream_host=parsed.hostname,
|
||||
upstream_port=port,
|
||||
upstream_base_path=base_path,
|
||||
auth_scheme=auth_scheme,
|
||||
token_env=token_env,
|
||||
))
|
||||
out.sort(key=lambda r: len(r.path), reverse=True)
|
||||
return tuple(out)
|
||||
|
||||
|
||||
def select_route(routes: typing.Sequence[Route], request_path: str) -> Route | None:
|
||||
"""Return the longest-prefix matching route, or None. Caller is
|
||||
responsible for stripping any query string before passing
|
||||
`request_path`."""
|
||||
for r in routes:
|
||||
if request_path.startswith(r.path):
|
||||
return r
|
||||
return None
|
||||
|
||||
|
||||
# --- Header handling --------------------------------------------------------
|
||||
|
||||
|
||||
# Hop-by-hop headers (RFC 7230 §6.1). Stripped before forwarding.
|
||||
# Plus `host` (we set it for the upstream) and any `authorization` /
|
||||
# `proxy-authorization` (the proxy injects its own, never forwards
|
||||
# the agent's).
|
||||
_HOP_BY_HOP = frozenset({
|
||||
"connection",
|
||||
"keep-alive",
|
||||
"proxy-authenticate",
|
||||
"proxy-authorization",
|
||||
"te",
|
||||
"trailers",
|
||||
"transfer-encoding",
|
||||
"upgrade",
|
||||
})
|
||||
|
||||
_STRIPPED = _HOP_BY_HOP | frozenset({"host", "authorization", "content-length"})
|
||||
|
||||
|
||||
def build_forward_headers(
|
||||
incoming: typing.Iterable[tuple[str, str]],
|
||||
*,
|
||||
auth_scheme: str,
|
||||
token: str,
|
||||
upstream_host: str,
|
||||
) -> list[tuple[str, str]]:
|
||||
"""Build the header list to send upstream.
|
||||
|
||||
- Strip hop-by-hop headers, the inbound Authorization (the agent
|
||||
cannot smuggle a stolen token), and Host (we set it ourselves).
|
||||
- Strip Content-Length too: http.client recomputes it when we
|
||||
pass `body` to `request()`.
|
||||
- Honor the `Connection: close, x, y, z` form by also stripping
|
||||
every listed header name.
|
||||
- Inject `Authorization: <auth_scheme> <token>` and a Host header
|
||||
pointing at the upstream.
|
||||
"""
|
||||
incoming_list = list(incoming)
|
||||
# Headers listed in `Connection:` are also hop-by-hop for this hop.
|
||||
extra_hop: set[str] = set()
|
||||
for name, value in incoming_list:
|
||||
if name.lower() == "connection":
|
||||
for token_name in value.split(","):
|
||||
extra_hop.add(token_name.strip().lower())
|
||||
forwarded: list[tuple[str, str]] = []
|
||||
for name, value in incoming_list:
|
||||
lname = name.lower()
|
||||
if lname in _STRIPPED or lname in extra_hop:
|
||||
continue
|
||||
forwarded.append((name, value))
|
||||
forwarded.append(("Host", upstream_host))
|
||||
forwarded.append(("Authorization", f"{auth_scheme} {token}"))
|
||||
return forwarded
|
||||
|
||||
|
||||
def filter_response_headers(
|
||||
incoming: typing.Iterable[tuple[str, str]],
|
||||
) -> list[tuple[str, str]]:
|
||||
"""Build the response header list to send back to the agent.
|
||||
Strip hop-by-hop + `transfer-encoding` (we let the client's
|
||||
HTTP/1.1 default chunking handle streamed bodies)."""
|
||||
incoming_list = list(incoming)
|
||||
extra_hop: set[str] = set()
|
||||
for name, value in incoming_list:
|
||||
if name.lower() == "connection":
|
||||
for token_name in value.split(","):
|
||||
extra_hop.add(token_name.strip().lower())
|
||||
out: list[tuple[str, str]] = []
|
||||
for name, value in incoming_list:
|
||||
lname = name.lower()
|
||||
if lname in _HOP_BY_HOP or lname in extra_hop:
|
||||
continue
|
||||
out.append((name, value))
|
||||
return out
|
||||
|
||||
|
||||
# --- HTTP handler -----------------------------------------------------------
|
||||
|
||||
|
||||
# How many bytes to read off the upstream response per chunk. Small
|
||||
# enough that SSE keep-alive `:` lines (~1 byte) and per-event payloads
|
||||
# (~hundreds of bytes) round-trip without waiting for a larger buffer
|
||||
# to fill. Large enough to not dominate syscall overhead under load.
|
||||
STREAM_CHUNK = 4096
|
||||
|
||||
|
||||
class CredProxyHandler(http.server.BaseHTTPRequestHandler):
|
||||
"""Per-request handler. The routes + tokens are read off the
|
||||
server instance (set by `serve()`)."""
|
||||
|
||||
# Quieter logs: the default writes one line per request to stderr.
|
||||
# Useful in debug but noisy in normal operation.
|
||||
def log_message(self, format: str, *args: typing.Any) -> None:
|
||||
if os.environ.get("CRED_PROXY_DEBUG"):
|
||||
super().log_message(format, *args)
|
||||
|
||||
def do_GET(self) -> None: self._proxy()
|
||||
def do_POST(self) -> None: self._proxy()
|
||||
def do_PUT(self) -> None: self._proxy()
|
||||
def do_DELETE(self) -> None: self._proxy()
|
||||
def do_PATCH(self) -> None: self._proxy()
|
||||
def do_HEAD(self) -> None: self._proxy()
|
||||
def do_OPTIONS(self) -> None: self._proxy()
|
||||
|
||||
def _proxy(self) -> None:
|
||||
server = typing.cast("CredProxyServer", self.server)
|
||||
path, _, query = self.path.partition("?")
|
||||
route = select_route(server.routes, path)
|
||||
if route is None:
|
||||
self.send_error(404, f"no route for {path!r}")
|
||||
return
|
||||
token = server.tokens.get(route.token_env)
|
||||
if not token:
|
||||
self.send_error(500, f"cred-proxy: env var {route.token_env} unset in sidecar")
|
||||
return
|
||||
|
||||
suffix = path[len(route.path):]
|
||||
upstream_path = route.upstream_base_path.rstrip("/") + "/" + suffix
|
||||
if query:
|
||||
upstream_path = f"{upstream_path}?{query}"
|
||||
|
||||
# Read the request body, if any. We do not stream the body up
|
||||
# because http.client doesn't accept a streamable body for
|
||||
# arbitrary methods cleanly. v1 buffers — claude's tool-use
|
||||
# requests are small JSON payloads; SSE flows are in the
|
||||
# response direction only.
|
||||
body: bytes | None = None
|
||||
length_header = self.headers.get("Content-Length")
|
||||
if length_header is not None:
|
||||
try:
|
||||
length = int(length_header)
|
||||
except ValueError:
|
||||
self.send_error(400, "invalid Content-Length")
|
||||
return
|
||||
if length > 0:
|
||||
body = self.rfile.read(length)
|
||||
elif self.headers.get("Transfer-Encoding", "").lower() == "chunked":
|
||||
self.send_error(411, "cred-proxy: chunked request bodies not supported in v1")
|
||||
return
|
||||
|
||||
forward_headers = build_forward_headers(
|
||||
self.headers.items(),
|
||||
auth_scheme=route.auth_scheme,
|
||||
token=token,
|
||||
upstream_host=route.upstream_host,
|
||||
)
|
||||
|
||||
if route.upstream_scheme == "https":
|
||||
conn: http.client.HTTPConnection = http.client.HTTPSConnection(
|
||||
route.upstream_host, route.upstream_port, timeout=300,
|
||||
)
|
||||
else:
|
||||
conn = http.client.HTTPConnection(
|
||||
route.upstream_host, route.upstream_port, timeout=300,
|
||||
)
|
||||
|
||||
try:
|
||||
conn.request(self.command, upstream_path, body=body,
|
||||
headers=dict(forward_headers))
|
||||
resp = conn.getresponse()
|
||||
except (OSError, http.client.HTTPException) as e:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
self.send_error(502, f"upstream connection failed: {e}")
|
||||
return
|
||||
|
||||
try:
|
||||
self._stream_response(resp)
|
||||
finally:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _stream_response(self, resp: http.client.HTTPResponse) -> None:
|
||||
out_headers = filter_response_headers(resp.getheaders())
|
||||
# We send Connection: close so the agent's client closes after
|
||||
# each request; simplifies streaming bookkeeping and keeps
|
||||
# the handler stateless per request.
|
||||
self.send_response(resp.status, resp.reason)
|
||||
for name, value in out_headers:
|
||||
self.send_header(name, value)
|
||||
self.send_header("Connection", "close")
|
||||
self.end_headers()
|
||||
try:
|
||||
while True:
|
||||
chunk = resp.read(STREAM_CHUNK)
|
||||
if not chunk:
|
||||
break
|
||||
self.wfile.write(chunk)
|
||||
self.wfile.flush()
|
||||
except (BrokenPipeError, ConnectionResetError):
|
||||
# Agent disconnected mid-stream; that's fine.
|
||||
return
|
||||
|
||||
|
||||
class CredProxyServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
|
||||
"""Threaded HTTP server. `routes` + `tokens` are populated by
|
||||
`serve()` before `serve_forever()`."""
|
||||
|
||||
allow_reuse_address = True
|
||||
daemon_threads = True
|
||||
|
||||
routes: tuple[Route, ...] = ()
|
||||
tokens: dict[str, str] = {}
|
||||
|
||||
|
||||
# --- Entry point ------------------------------------------------------------
|
||||
|
||||
|
||||
DEFAULT_ROUTES_PATH = "/run/cred-proxy/routes.json"
|
||||
DEFAULT_PORT = 9099
|
||||
|
||||
|
||||
def load_routes(path: str) -> tuple[Route, ...]:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
payload = json.load(f)
|
||||
if not isinstance(payload, dict):
|
||||
raise ValueError(f"{path}: top-level must be an object")
|
||||
return parse_routes(payload)
|
||||
|
||||
|
||||
def load_tokens(routes: tuple[Route, ...], environ: typing.Mapping[str, str]) -> dict[str, str]:
|
||||
"""Read each route's `token_env` from the supplied environ. Missing
|
||||
entries default to empty string; the handler returns 500 for
|
||||
unset tokens at request time so the operator can spot the
|
||||
misconfig in the cred-proxy's logs without the proxy refusing to
|
||||
boot."""
|
||||
out: dict[str, str] = {}
|
||||
for r in routes:
|
||||
out[r.token_env] = environ.get(r.token_env, "")
|
||||
return out
|
||||
|
||||
|
||||
def serve(
|
||||
*,
|
||||
routes_path: str = DEFAULT_ROUTES_PATH,
|
||||
port: int = DEFAULT_PORT,
|
||||
bind: str = "0.0.0.0",
|
||||
environ: typing.Mapping[str, str] | None = None,
|
||||
) -> typing.NoReturn:
|
||||
"""Bring up the server and run until killed. Exits non-zero on
|
||||
config error so the container's restart policy can surface the
|
||||
failure rather than silently retrying."""
|
||||
env = environ if environ is not None else os.environ
|
||||
routes = load_routes(routes_path)
|
||||
tokens = load_tokens(routes, env)
|
||||
server = CredProxyServer((bind, port), CredProxyHandler)
|
||||
server.routes = routes
|
||||
server.tokens = tokens
|
||||
sys.stderr.write(
|
||||
f"cred-proxy listening on {bind}:{port}; "
|
||||
f"{len(routes)} route(s): "
|
||||
f"{', '.join(r.path for r in routes)}\n"
|
||||
)
|
||||
sys.stderr.flush()
|
||||
try:
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
server.server_close()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def main(argv: list[str]) -> int:
|
||||
"""Tiny argv shim: no flags in v1, all config via env vars.
|
||||
|
||||
`CRED_PROXY_ROUTES` overrides the routes path (default
|
||||
`/run/cred-proxy/routes.json`). `CRED_PROXY_PORT` overrides the
|
||||
listen port. Both have defaults so the container needs no extra
|
||||
config to come up."""
|
||||
routes_path = os.environ.get("CRED_PROXY_ROUTES", DEFAULT_ROUTES_PATH)
|
||||
port = int(os.environ.get("CRED_PROXY_PORT", str(DEFAULT_PORT)))
|
||||
bind = os.environ.get("CRED_PROXY_BIND", "0.0.0.0")
|
||||
serve(routes_path=routes_path, port=port, bind=bind)
|
||||
return 0 # serve() does not return.
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main(sys.argv))
|
||||
@@ -0,0 +1,205 @@
|
||||
"""Unit: cred-proxy server pure functions — route parsing, route
|
||||
selection, header injection (PRD 0010)."""
|
||||
|
||||
import unittest
|
||||
|
||||
from claude_bottle.cred_proxy_server import (
|
||||
Route,
|
||||
build_forward_headers,
|
||||
filter_response_headers,
|
||||
load_tokens,
|
||||
parse_routes,
|
||||
select_route,
|
||||
)
|
||||
|
||||
|
||||
class TestParseRoutes(unittest.TestCase):
|
||||
def test_parses_minimal_payload(self):
|
||||
routes = parse_routes({"routes": [
|
||||
{"path": "/anthropic/", "upstream": "https://api.anthropic.com",
|
||||
"auth_scheme": "Bearer", "token_env": "CRED_PROXY_TOKEN_0"},
|
||||
]})
|
||||
self.assertEqual(1, len(routes))
|
||||
r = routes[0]
|
||||
self.assertEqual("/anthropic/", r.path)
|
||||
self.assertEqual("https", r.upstream_scheme)
|
||||
self.assertEqual("api.anthropic.com", r.upstream_host)
|
||||
self.assertEqual(443, r.upstream_port)
|
||||
self.assertEqual("", r.upstream_base_path)
|
||||
self.assertEqual("Bearer", r.auth_scheme)
|
||||
self.assertEqual("CRED_PROXY_TOKEN_0", r.token_env)
|
||||
|
||||
def test_extracts_port_from_upstream(self):
|
||||
routes = parse_routes({"routes": [
|
||||
{"path": "/gitea/gitea.dideric.is/",
|
||||
"upstream": "https://gitea.dideric.is:30443",
|
||||
"auth_scheme": "token", "token_env": "CRED_PROXY_TOKEN_0"},
|
||||
]})
|
||||
self.assertEqual(30443, routes[0].upstream_port)
|
||||
|
||||
def test_sorted_by_descending_path_length(self):
|
||||
# /a/b/ should come before /a/ so longest-prefix is first.
|
||||
routes = parse_routes({"routes": [
|
||||
{"path": "/a/", "upstream": "https://x.example",
|
||||
"auth_scheme": "Bearer", "token_env": "T1"},
|
||||
{"path": "/a/b/", "upstream": "https://y.example",
|
||||
"auth_scheme": "Bearer", "token_env": "T2"},
|
||||
]})
|
||||
self.assertEqual("/a/b/", routes[0].path)
|
||||
self.assertEqual("/a/", routes[1].path)
|
||||
|
||||
def test_bad_path_rejected(self):
|
||||
with self.assertRaises(ValueError):
|
||||
parse_routes({"routes": [
|
||||
{"path": "no-leading-slash", "upstream": "https://x",
|
||||
"auth_scheme": "Bearer", "token_env": "T"},
|
||||
]})
|
||||
|
||||
def test_non_http_scheme_rejected(self):
|
||||
with self.assertRaises(ValueError):
|
||||
parse_routes({"routes": [
|
||||
{"path": "/x/", "upstream": "ftp://x.example/",
|
||||
"auth_scheme": "Bearer", "token_env": "T"},
|
||||
]})
|
||||
|
||||
|
||||
class TestSelectRoute(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.routes = parse_routes({"routes": [
|
||||
{"path": "/anthropic/", "upstream": "https://api.anthropic.com",
|
||||
"auth_scheme": "Bearer", "token_env": "T_A"},
|
||||
{"path": "/gh-api/", "upstream": "https://api.github.com",
|
||||
"auth_scheme": "Bearer", "token_env": "T_G"},
|
||||
{"path": "/gitea/gitea.dideric.is/",
|
||||
"upstream": "https://gitea.dideric.is",
|
||||
"auth_scheme": "token", "token_env": "T_T"},
|
||||
]})
|
||||
|
||||
def test_matches_prefix(self):
|
||||
r = select_route(self.routes, "/anthropic/v1/messages")
|
||||
assert r is not None
|
||||
self.assertEqual("/anthropic/", r.path)
|
||||
|
||||
def test_no_match_returns_none(self):
|
||||
self.assertIsNone(select_route(self.routes, "/other/path"))
|
||||
|
||||
def test_picks_longest_prefix(self):
|
||||
routes = parse_routes({"routes": [
|
||||
{"path": "/a/", "upstream": "https://x.example",
|
||||
"auth_scheme": "Bearer", "token_env": "T1"},
|
||||
{"path": "/a/long/", "upstream": "https://y.example",
|
||||
"auth_scheme": "Bearer", "token_env": "T2"},
|
||||
]})
|
||||
r = select_route(routes, "/a/long/sub")
|
||||
assert r is not None
|
||||
self.assertEqual("/a/long/", r.path)
|
||||
|
||||
|
||||
class TestBuildForwardHeaders(unittest.TestCase):
|
||||
def test_strips_authorization_and_injects(self):
|
||||
headers = build_forward_headers(
|
||||
[("Authorization", "Bearer stolen-token"),
|
||||
("Content-Type", "application/json")],
|
||||
auth_scheme="Bearer",
|
||||
token="real-token",
|
||||
upstream_host="api.anthropic.com",
|
||||
)
|
||||
names = [n.lower() for n, _ in headers]
|
||||
# Only one Authorization remains, with the injected value.
|
||||
auth_values = [v for n, v in headers if n.lower() == "authorization"]
|
||||
self.assertEqual(["Bearer real-token"], auth_values)
|
||||
self.assertEqual(1, names.count("authorization"))
|
||||
# Content-Type passes through.
|
||||
self.assertIn(("Content-Type", "application/json"), headers)
|
||||
|
||||
def test_strips_authorization_case_insensitive(self):
|
||||
headers = build_forward_headers(
|
||||
[("authorization", "Bearer stolen")],
|
||||
auth_scheme="Bearer",
|
||||
token="real",
|
||||
upstream_host="x.example",
|
||||
)
|
||||
auth_values = [v for n, v in headers if n.lower() == "authorization"]
|
||||
self.assertEqual(["Bearer real"], auth_values)
|
||||
|
||||
def test_strips_hop_by_hop(self):
|
||||
headers = build_forward_headers(
|
||||
[("Connection", "keep-alive, x-custom"),
|
||||
("X-Custom", "should-be-dropped"),
|
||||
("Keep-Alive", "300"),
|
||||
("Transfer-Encoding", "chunked"),
|
||||
("X-Real", "kept")],
|
||||
auth_scheme="Bearer",
|
||||
token="t",
|
||||
upstream_host="x.example",
|
||||
)
|
||||
names = [n.lower() for n, _ in headers]
|
||||
self.assertNotIn("connection", names)
|
||||
self.assertNotIn("keep-alive", names)
|
||||
self.assertNotIn("transfer-encoding", names)
|
||||
self.assertNotIn("x-custom", names) # listed in Connection: -> hop-by-hop
|
||||
self.assertIn("x-real", names)
|
||||
|
||||
def test_strips_content_length(self):
|
||||
# http.client recomputes Content-Length; passing it through
|
||||
# double-counts and breaks the upstream.
|
||||
headers = build_forward_headers(
|
||||
[("Content-Length", "999")],
|
||||
auth_scheme="Bearer", token="t", upstream_host="x.example",
|
||||
)
|
||||
names = [n.lower() for n, _ in headers]
|
||||
self.assertNotIn("content-length", names)
|
||||
|
||||
def test_sets_host_to_upstream(self):
|
||||
headers = build_forward_headers(
|
||||
[("Host", "cred-proxy:9099")],
|
||||
auth_scheme="Bearer", token="t", upstream_host="api.anthropic.com",
|
||||
)
|
||||
host_values = [v for n, v in headers if n.lower() == "host"]
|
||||
self.assertEqual(["api.anthropic.com"], host_values)
|
||||
|
||||
def test_uses_token_scheme(self):
|
||||
# gitea uses Authorization: token <pat>, not Bearer.
|
||||
headers = build_forward_headers(
|
||||
[],
|
||||
auth_scheme="token", token="abc123", upstream_host="gitea.dideric.is",
|
||||
)
|
||||
auth_values = [v for n, v in headers if n.lower() == "authorization"]
|
||||
self.assertEqual(["token abc123"], auth_values)
|
||||
|
||||
|
||||
class TestFilterResponseHeaders(unittest.TestCase):
|
||||
def test_strips_hop_by_hop_only(self):
|
||||
out = filter_response_headers([
|
||||
("Content-Type", "text/event-stream"),
|
||||
("Connection", "close"),
|
||||
("Transfer-Encoding", "chunked"),
|
||||
("Cache-Control", "no-cache"),
|
||||
])
|
||||
names = [n.lower() for n, _ in out]
|
||||
self.assertIn("content-type", names)
|
||||
self.assertIn("cache-control", names)
|
||||
self.assertNotIn("connection", names)
|
||||
self.assertNotIn("transfer-encoding", names)
|
||||
|
||||
|
||||
class TestLoadTokens(unittest.TestCase):
|
||||
def test_reads_per_route_env(self):
|
||||
routes = (
|
||||
Route("/a/", "https", "x", 443, "", "Bearer", "T_0"),
|
||||
Route("/b/", "https", "y", 443, "", "Bearer", "T_1"),
|
||||
)
|
||||
out = load_tokens(routes, {"T_0": "val0", "T_1": "val1"})
|
||||
self.assertEqual({"T_0": "val0", "T_1": "val1"}, out)
|
||||
|
||||
def test_missing_env_yields_empty_string(self):
|
||||
# The handler returns 500 at request time rather than the
|
||||
# server refusing to start. This keeps the operator's failure
|
||||
# signal in the cred-proxy's logs.
|
||||
routes = (Route("/a/", "https", "x", 443, "", "Bearer", "T_0"),)
|
||||
out = load_tokens(routes, {})
|
||||
self.assertEqual({"T_0": ""}, out)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user