feat(egress-proxy): add mitmproxy-based sidecar core (PRD 0017 chunk 1)
Lands the new egress-proxy artifact alongside cred-proxy. Chunk 2
wires the agent's HTTP_PROXY to it and removes cred-proxy.
- `Dockerfile.egress-proxy` — mitmproxy 11.1.3 base, COPY addon
files flat to /app, mkdir routes dir at /etc/egress-proxy/.
Digest pin deferred to chunk 2.
- `egress_proxy_addon_core.py` — pure-logic parse + decide
(host-importable; 21 unit tests).
- `egress_proxy_addon.py` — mitmproxy hook wrapper, container-only
(boot + SIGHUP reload, strip-Authorization + decide + 403/inject).
- `egress_proxy.py` — host helpers: manifest lift, routes.yaml
render (JSON content), token-env-map, Plan + abstract class.
- `backend/docker/egress_proxy.py` — `DockerEgressProxy` start/stop
mirroring `DockerCredProxy`; not yet called from launch.py.
- `manifest.py` — new `EgressProxyRoute` + `EgressProxyConfig` types
with the nested `auth: { scheme, token_ref }` block per PRD;
`bottle.egress_proxy` added to the bottle key set alongside
`cred_proxy` (chunk 2 hard-fails on the latter).
All 427 unit tests pass. Image builds; `docker run` boots mitmdump
and the addon loads routes from a mounted routes.yaml.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,47 @@
|
||||
# Per-bottle egress-proxy sidecar image (PRD 0017).
|
||||
#
|
||||
# Replaces cred-proxy (PRD 0010). Sits on the agent's HTTP_PROXY /
|
||||
# HTTPS_PROXY path (wiring lands in chunk 2) and owns three jobs:
|
||||
# 1. MITM HTTPS using the per-bottle CA (chunk 2 moves the CA
|
||||
# generation from pipelock).
|
||||
# 2. Enforce manifest-declared path_allowlist per route.
|
||||
# 3. Inject Authorization headers for routes that declare an auth
|
||||
# block.
|
||||
#
|
||||
# Chunk 1 of PRD 0017 ships this image and the addon. Wiring it
|
||||
# into the bottle launch (and the per-bottle CA + the pipelock
|
||||
# upstream proxy) is chunk 2.
|
||||
|
||||
# mitmproxy base image. mitmdump + addon API are already there; we
|
||||
# only need to drop our addon in. TODO(chunk-2): pin by digest.
|
||||
FROM mitmproxy/mitmproxy:11.1.3
|
||||
|
||||
USER root
|
||||
|
||||
# The addon ships as two files. `_core.py` is pure-logic, importable
|
||||
# both inside the container and from the host's tests; `_addon.py` is
|
||||
# the mitmproxy hook wrapper. Both land flat in /app/ so mitmdump's
|
||||
# loader finds them as top-level sibling modules.
|
||||
COPY claude_bottle/egress_proxy_addon_core.py /app/egress_proxy_addon_core.py
|
||||
COPY claude_bottle/egress_proxy_addon.py /app/egress_proxy_addon.py
|
||||
|
||||
# Pre-create the runtime directory the backend's start step will
|
||||
# `docker cp` routes.yaml into. docker cp does not create
|
||||
# intermediate dirs, so the mkdir must be baked into the image.
|
||||
# Ownership lets the unprivileged mitmproxy user read the file.
|
||||
RUN mkdir -p /etc/egress-proxy \
|
||||
&& chown -R mitmproxy:mitmproxy /etc/egress-proxy /app
|
||||
|
||||
USER mitmproxy
|
||||
|
||||
# Listening port. Agents will dial egress-proxy on this port via
|
||||
# their HTTP_PROXY env (chunk 2). Surfaced as EXPOSE for
|
||||
# documentation; not required for the internal network to route to it.
|
||||
EXPOSE 9099
|
||||
|
||||
# --mode regular@9099: standard HTTP/HTTPS forward proxy on :9099.
|
||||
# -s /app/egress_proxy_addon.py: loads our addon, which reads the
|
||||
# route table from /etc/egress-proxy/routes.yaml.
|
||||
# (Upstream-trust + CA-cert hooks land in chunk 2 when the per-bottle
|
||||
# pipelock CA wiring moves over from cred-proxy.)
|
||||
ENTRYPOINT ["mitmdump", "--mode", "regular@9099", "-s", "/app/egress_proxy_addon.py"]
|
||||
@@ -0,0 +1,216 @@
|
||||
"""DockerEgressProxy — the Docker-specific lifecycle for the
|
||||
per-bottle egress-proxy sidecar (PRD 0017). Inherits the platform-
|
||||
agnostic prepare step (route lift + routes.yaml render + token-env
|
||||
map derivation) from `EgressProxy`.
|
||||
|
||||
Chunk 1 of the PRD: the lifecycle is implemented but not yet called
|
||||
from `launch.py`. Tests build the image and exercise start/stop
|
||||
directly. Chunk 2 wires this in alongside the cred-proxy removal."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from ...egress_proxy import (
|
||||
EGRESS_PROXY_HOSTNAME,
|
||||
EGRESS_PROXY_ROUTES_IN_CONTAINER,
|
||||
EgressProxy,
|
||||
EgressProxyPlan,
|
||||
egress_proxy_resolve_token_values,
|
||||
)
|
||||
from ...log import die, info, warn
|
||||
from . import util as docker_mod
|
||||
|
||||
|
||||
EGRESS_PROXY_IMAGE = os.environ.get(
|
||||
"CLAUDE_BOTTLE_EGRESS_PROXY_IMAGE",
|
||||
"claude-bottle-egress-proxy:latest",
|
||||
)
|
||||
|
||||
EGRESS_PROXY_DOCKERFILE = "Dockerfile.egress-proxy"
|
||||
|
||||
# Listening port inside the sidecar. The agent's HTTP_PROXY env var
|
||||
# (chunk 2) will resolve to `http://egress-proxy:<port>`.
|
||||
EGRESS_PROXY_PORT = int(os.environ.get("CLAUDE_BOTTLE_EGRESS_PROXY_PORT", "9099"))
|
||||
|
||||
# Repo root, for `docker build` context. Resolved from this file's
|
||||
# location: claude_bottle/backend/docker/egress_proxy.py → repo root.
|
||||
_REPO_DIR = str(Path(__file__).resolve().parent.parent.parent.parent)
|
||||
|
||||
|
||||
def egress_proxy_container_name(slug: str) -> str:
|
||||
return f"claude-bottle-egress-proxy-{slug}"
|
||||
|
||||
|
||||
def egress_proxy_url() -> str:
|
||||
"""Base URL the agent will dial via HTTP_PROXY (chunk 2). Stable
|
||||
across bottles because the sidecar attaches `--network-alias
|
||||
egress-proxy` on the internal network; the container name (which
|
||||
carries the slug) is not referenced by agent-side config."""
|
||||
return f"http://{EGRESS_PROXY_HOSTNAME}:{EGRESS_PROXY_PORT}"
|
||||
|
||||
|
||||
def build_egress_proxy_image() -> None:
|
||||
"""Build the egress-proxy image from `Dockerfile.egress-proxy`.
|
||||
Called by `DockerEgressProxy.start`; exposed at module level so
|
||||
integration tests can build it without running the full launch
|
||||
pipeline."""
|
||||
docker_mod.build_image(
|
||||
EGRESS_PROXY_IMAGE, _REPO_DIR, dockerfile=EGRESS_PROXY_DOCKERFILE,
|
||||
)
|
||||
|
||||
|
||||
class DockerEgressProxy(EgressProxy):
|
||||
"""Brings the egress-proxy sidecar up and down via Docker."""
|
||||
|
||||
def start(self, plan: EgressProxyPlan) -> str:
|
||||
"""Boot the egress-proxy sidecar:
|
||||
1. Resolve every host TokenRef env var into a concrete
|
||||
value. Fails early if any are unset.
|
||||
2. Build the egress-proxy image (no-op when cache is hot).
|
||||
3. `docker create` on the internal network with
|
||||
`--network-alias egress-proxy` and one `-e EGRESS_PROXY_TOKEN_N`
|
||||
flag per token slot. The values arrive via subprocess env, so
|
||||
they never land on argv.
|
||||
4. `docker cp` the routes.yaml into the container.
|
||||
5. Attach to the per-agent egress network so the proxy can
|
||||
reach pipelock (chunk 2 turns this into the pipelock leg
|
||||
via HTTPS_PROXY).
|
||||
6. `docker start`.
|
||||
Returns the container name (the target passed to `.stop`)."""
|
||||
if not plan.routes:
|
||||
die("DockerEgressProxy.start called with no routes; caller should skip")
|
||||
if not plan.internal_network or not plan.egress_network:
|
||||
die(
|
||||
"DockerEgressProxy.start: internal_network / egress_network must be "
|
||||
"populated on the plan before start"
|
||||
)
|
||||
if not plan.routes_path.is_file():
|
||||
die(
|
||||
f"egress-proxy routes file missing at {plan.routes_path}; "
|
||||
f"EgressProxy.prepare must run first"
|
||||
)
|
||||
|
||||
# Resolve host env vars into concrete values. Must happen at
|
||||
# start time (not prepare) — the values flow into the sidecar's
|
||||
# environ via subprocess env. The plan never holds them.
|
||||
token_values = egress_proxy_resolve_token_values(
|
||||
plan.token_env_map, dict(os.environ),
|
||||
)
|
||||
|
||||
build_egress_proxy_image()
|
||||
|
||||
name = egress_proxy_container_name(plan.slug)
|
||||
info(f"starting egress-proxy sidecar {name} on network {plan.internal_network}")
|
||||
|
||||
create_args = [
|
||||
"docker", "create",
|
||||
"--name", name,
|
||||
"--network", plan.internal_network,
|
||||
"--network-alias", EGRESS_PROXY_HOSTNAME,
|
||||
]
|
||||
if plan.pipelock_proxy_url:
|
||||
# Route egress-proxy's outbound HTTPS through pipelock so
|
||||
# the egress allowlist + DLP body scanner apply to its
|
||||
# traffic on the egress-proxy → upstream leg. Wiring lands
|
||||
# in chunk 2.
|
||||
create_args.extend([
|
||||
"-e", f"HTTPS_PROXY={plan.pipelock_proxy_url}",
|
||||
"-e", f"HTTP_PROXY={plan.pipelock_proxy_url}",
|
||||
"-e", "NO_PROXY=localhost,127.0.0.1",
|
||||
])
|
||||
# One -e flag per token slot; values arrive via subprocess env.
|
||||
# docker create with `-e NAME` (no =VALUE) reads NAME from the
|
||||
# current process env at create time. We pass `env=child_env`
|
||||
# to subprocess.run so the value comes from token_values, not
|
||||
# the host's os.environ directly — keeps the resolver in one
|
||||
# place and lets egress_proxy_resolve_token_values surface
|
||||
# missing-env errors with a clear hint.
|
||||
for token_env in sorted(plan.token_env_map.keys()):
|
||||
create_args.extend(["-e", token_env])
|
||||
create_args.append(EGRESS_PROXY_IMAGE)
|
||||
|
||||
child_env: dict[str, str] = {**os.environ, **token_values}
|
||||
|
||||
create_result = subprocess.run(
|
||||
create_args, capture_output=True, text=True, env=child_env, check=False,
|
||||
)
|
||||
if create_result.returncode != 0:
|
||||
die(
|
||||
f"failed to create egress-proxy sidecar {name}: "
|
||||
f"{create_result.stderr.strip()}"
|
||||
)
|
||||
|
||||
cp_result = subprocess.run(
|
||||
["docker", "cp", str(plan.routes_path),
|
||||
f"{name}:{EGRESS_PROXY_ROUTES_IN_CONTAINER}"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
if cp_result.returncode != 0:
|
||||
subprocess.run(
|
||||
["docker", "rm", "-f", name],
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
check=False,
|
||||
)
|
||||
die(
|
||||
f"failed to copy routes.yaml into {name}: "
|
||||
f"{cp_result.stderr.strip()}"
|
||||
)
|
||||
|
||||
connect_result = subprocess.run(
|
||||
["docker", "network", "connect", plan.egress_network, name],
|
||||
capture_output=True, text=True, check=False,
|
||||
)
|
||||
if connect_result.returncode != 0:
|
||||
subprocess.run(
|
||||
["docker", "rm", "-f", name],
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
check=False,
|
||||
)
|
||||
die(
|
||||
f"failed to attach egress-proxy sidecar {name} to egress network "
|
||||
f"{plan.egress_network}: {connect_result.stderr.strip()}"
|
||||
)
|
||||
|
||||
start_result = subprocess.run(
|
||||
["docker", "start", name], capture_output=True, text=True, check=False,
|
||||
)
|
||||
if start_result.returncode != 0:
|
||||
subprocess.run(
|
||||
["docker", "rm", "-f", name],
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
check=False,
|
||||
)
|
||||
die(
|
||||
f"failed to start egress-proxy sidecar {name}: "
|
||||
f"{start_result.stderr.strip()}"
|
||||
)
|
||||
|
||||
return name
|
||||
|
||||
def stop(self, target: str) -> None:
|
||||
"""Idempotent: missing container is success. `target` is the
|
||||
container name returned by `.start`."""
|
||||
if subprocess.run(
|
||||
["docker", "inspect", target],
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
check=False,
|
||||
).returncode == 0:
|
||||
if subprocess.run(
|
||||
["docker", "rm", "-f", target],
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
check=False,
|
||||
).returncode != 0:
|
||||
warn(
|
||||
f"failed to remove egress-proxy sidecar {target}; "
|
||||
f"clean up with 'docker rm -f {target}'"
|
||||
)
|
||||
@@ -0,0 +1,271 @@
|
||||
"""Per-bottle egress proxy (PRD 0017).
|
||||
|
||||
Replaces the cred-proxy sidecar (PRD 0010) with a mitmproxy-based
|
||||
sidecar that becomes the agent's `HTTP_PROXY` / `HTTPS_PROXY`. It
|
||||
owns three jobs:
|
||||
|
||||
1. MITM the agent's HTTPS with the per-bottle CA (moved from
|
||||
pipelock).
|
||||
2. Enforce manifest-declared `path_allowlist` per route.
|
||||
3. Inject `Authorization` headers for routes that declare an
|
||||
`auth` block, the same way cred-proxy does today.
|
||||
|
||||
This module defines the abstract proxy (`EgressProxy`), its plan
|
||||
dataclass (`EgressProxyPlan`), and the resolved per-route shape
|
||||
(`EgressProxyRoute`). The sidecar's start/stop lifecycle is backend-
|
||||
specific and lives on concrete subclasses (see
|
||||
`claude_bottle/backend/docker/egress_proxy.py`).
|
||||
|
||||
Chunk 1 of the PRD: this module + the mitmproxy addon + the Docker
|
||||
lifecycle land alongside the existing cred-proxy code. Chunk 2 wires
|
||||
the agent's `HTTP_PROXY` over to egress-proxy and removes cred-proxy.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from .log import die
|
||||
from .manifest import Bottle
|
||||
|
||||
|
||||
# DNS name agents will dial for the per-bottle egress-proxy sidecar.
|
||||
# Backend-agnostic by contract: every concrete backend (Docker today,
|
||||
# others later) attaches this name to its sidecar on the bottle's
|
||||
# internal network. The agent's `HTTP_PROXY` env var resolves to
|
||||
# `http://egress-proxy:<port>` once chunk 2 cuts over.
|
||||
EGRESS_PROXY_HOSTNAME = "egress-proxy"
|
||||
|
||||
# In-container path the addon reads. Pre-created in
|
||||
# `Dockerfile.egress-proxy` so `docker cp` can drop the file directly.
|
||||
# `.yaml` extension per PRD 0017 — content is JSON (valid YAML) so
|
||||
# both sides can use stdlib `json`.
|
||||
EGRESS_PROXY_ROUTES_IN_CONTAINER = "/etc/egress-proxy/routes.yaml"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class EgressProxyRoute:
|
||||
"""One resolved route on the egress-proxy sidecar.
|
||||
|
||||
`host` matches the request's hostname (case-insensitive). The
|
||||
optional `path_allowlist` constrains the URL path; empty tuple
|
||||
means no path-level filtering. The `auth_scheme` / `token_env` /
|
||||
`token_ref` triple is the credential-injection config; empty
|
||||
strings mean "no auth injection" (the manifest's nested `auth`
|
||||
block was omitted).
|
||||
|
||||
`token_env` is the env-var slot inside the egress-proxy container
|
||||
(e.g. `EGRESS_PROXY_TOKEN_0`); `token_ref` is the host env var
|
||||
the CLI reads at launch and forwards into the container's environ
|
||||
under `token_env`. Routes that share a `token_ref` coalesce to
|
||||
one `token_env` slot."""
|
||||
|
||||
host: str
|
||||
path_allowlist: tuple[str, ...] = ()
|
||||
auth_scheme: str = ""
|
||||
token_env: str = ""
|
||||
token_ref: str = ""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class EgressProxyPlan:
|
||||
"""Output of EgressProxy.prepare; consumed by .start.
|
||||
|
||||
The slug + routes_path + routes + token_env_map fields are
|
||||
filled at prepare time (host-side, side-effect-free on docker).
|
||||
The network + pipelock fields are populated by the backend's
|
||||
launch step via `dataclasses.replace` once those resources
|
||||
exist. Empty defaults are sentinels meaning "not yet set";
|
||||
`.start` validates that they are populated.
|
||||
|
||||
`token_env_map` is `{<token_env in container>: <token_ref on host>}`.
|
||||
The backend's start step reads `os.environ[token_ref]` and
|
||||
forwards the value into the egress-proxy container's environ
|
||||
under `token_env`. The plan itself never holds token values —
|
||||
secrets never land in a dataclass that might be logged.
|
||||
|
||||
`pipelock_proxy_url` is the URL egress-proxy sets as `HTTPS_PROXY`
|
||||
in its environ so outbound HTTPS traverses pipelock — keeping
|
||||
pipelock's hostname allowlist + DLP body scanner on the
|
||||
egress-proxy → upstream leg.
|
||||
"""
|
||||
|
||||
slug: str
|
||||
routes_path: Path
|
||||
routes: tuple[EgressProxyRoute, ...]
|
||||
token_env_map: dict[str, str]
|
||||
internal_network: str = ""
|
||||
egress_network: str = ""
|
||||
pipelock_proxy_url: str = ""
|
||||
|
||||
|
||||
def egress_proxy_routes_for_bottle(
|
||||
bottle: Bottle,
|
||||
) -> tuple[EgressProxyRoute, ...]:
|
||||
"""Lift each `bottle.egress_proxy.routes[]` manifest entry into a
|
||||
resolved EgressProxyRoute. Order is preserved so route lookup at
|
||||
the proxy is stable.
|
||||
|
||||
Token-env slots are assigned per distinct `token_ref`: the first
|
||||
authenticated route with `token_ref` "GH_PAT" gets
|
||||
`EGRESS_PROXY_TOKEN_0`; a second route with the same `token_ref`
|
||||
shares slot 0. Unauthenticated routes (`auth` omitted) contribute
|
||||
no slot."""
|
||||
out: list[EgressProxyRoute] = []
|
||||
slot_for_token: dict[str, str] = {}
|
||||
for r in bottle.egress_proxy.routes:
|
||||
if r.AuthScheme and r.TokenRef:
|
||||
token_env = slot_for_token.get(r.TokenRef)
|
||||
if token_env is None:
|
||||
token_env = f"EGRESS_PROXY_TOKEN_{len(slot_for_token)}"
|
||||
slot_for_token[r.TokenRef] = token_env
|
||||
out.append(EgressProxyRoute(
|
||||
host=r.Host,
|
||||
path_allowlist=r.PathAllowlist,
|
||||
auth_scheme=r.AuthScheme,
|
||||
token_env=token_env,
|
||||
token_ref=r.TokenRef,
|
||||
))
|
||||
else:
|
||||
out.append(EgressProxyRoute(
|
||||
host=r.Host,
|
||||
path_allowlist=r.PathAllowlist,
|
||||
))
|
||||
return tuple(out)
|
||||
|
||||
|
||||
def egress_proxy_token_env_map(
|
||||
routes: tuple[EgressProxyRoute, ...],
|
||||
) -> dict[str, str]:
|
||||
"""Collapse the route list into `{token_env: token_ref}` for the
|
||||
authenticated routes. Routes without `auth` contribute no entry.
|
||||
|
||||
Conflict detection: two routes that share a `token_env` slot but
|
||||
name different `token_ref` host vars is a programming error in
|
||||
`egress_proxy_routes_for_bottle`; surface it as a die rather than
|
||||
silently picking one."""
|
||||
out: dict[str, str] = {}
|
||||
for r in routes:
|
||||
if not r.token_env:
|
||||
continue
|
||||
existing = out.get(r.token_env)
|
||||
if existing is not None and existing != r.token_ref:
|
||||
die(
|
||||
f"egress-proxy plan conflict: {r.token_env} maps to both "
|
||||
f"{existing!r} and {r.token_ref!r}. Two routes sharing a "
|
||||
f"token slot must reference the same host env var."
|
||||
)
|
||||
out[r.token_env] = r.token_ref
|
||||
return out
|
||||
|
||||
|
||||
def egress_proxy_render_routes(
|
||||
routes: tuple[EgressProxyRoute, ...],
|
||||
) -> str:
|
||||
"""Serialize the route table for the addon to read.
|
||||
|
||||
JSON content (valid YAML), no token values, no host env-var
|
||||
names — the only thing the addon needs at runtime is the host →
|
||||
path_allowlist + auth_scheme + in-container env-var mapping. The
|
||||
actual token values arrive via the container's environ.
|
||||
|
||||
Authenticated routes carry `auth_scheme` + `token_env`;
|
||||
unauthenticated routes omit both keys (the addon's parser
|
||||
enforces both-or-neither)."""
|
||||
payload_routes: list[dict[str, object]] = []
|
||||
for r in routes:
|
||||
entry: dict[str, object] = {"host": r.host}
|
||||
if r.path_allowlist:
|
||||
entry["path_allowlist"] = list(r.path_allowlist)
|
||||
if r.auth_scheme and r.token_env:
|
||||
entry["auth_scheme"] = r.auth_scheme
|
||||
entry["token_env"] = r.token_env
|
||||
payload_routes.append(entry)
|
||||
payload = {"routes": payload_routes}
|
||||
return json.dumps(payload, indent=2, sort_keys=False) + "\n"
|
||||
|
||||
|
||||
def egress_proxy_resolve_token_values(
|
||||
token_env_map: dict[str, str],
|
||||
host_env: dict[str, str],
|
||||
) -> dict[str, str]:
|
||||
"""Read `host_env[TokenRef]` for each entry in `token_env_map` and
|
||||
return `{token_env: <value>}`. Dies (with a pointer at the missing
|
||||
var name) if any TokenRef is unset.
|
||||
|
||||
Pure function: takes the host env as an argument so tests can pass
|
||||
a sealed mapping without touching `os.environ`."""
|
||||
out: dict[str, str] = {}
|
||||
for token_env, token_ref in token_env_map.items():
|
||||
value = host_env.get(token_ref)
|
||||
if value is None:
|
||||
die(
|
||||
f"egress-proxy: host env var '{token_ref}' is unset. Set it "
|
||||
f"before launching, or remove the corresponding auth block "
|
||||
f"from bottle.egress_proxy.routes."
|
||||
)
|
||||
if not value:
|
||||
die(
|
||||
f"egress-proxy: host env var '{token_ref}' is empty. The "
|
||||
f"egress-proxy will not inject an empty token; set it to "
|
||||
f"the real value or remove the route's auth block."
|
||||
)
|
||||
out[token_env] = value
|
||||
return out
|
||||
|
||||
|
||||
class EgressProxy(ABC):
|
||||
"""The per-bottle egress proxy. Encapsulates the host-side prepare
|
||||
(route lift + routes.yaml render + token-env-map derivation); the
|
||||
sidecar's start/stop lifecycle is backend-specific and lives on
|
||||
concrete subclasses."""
|
||||
|
||||
def prepare(self, bottle: Bottle, slug: str, stage_dir: Path) -> EgressProxyPlan:
|
||||
"""Lift `bottle.egress_proxy.routes` into resolved routes,
|
||||
render the routes file (mode 600) under `stage_dir`, and
|
||||
return the plan. Pure host-side, no docker subprocess. The
|
||||
token-env map records the mapping the launch step uses to
|
||||
forward values from the host's environ into the sidecar's
|
||||
environ.
|
||||
|
||||
Returned plan is incomplete: the launch step must fill
|
||||
`internal_network` / `egress_network` / `pipelock_proxy_url`
|
||||
via `dataclasses.replace` before passing it to `.start`."""
|
||||
routes = egress_proxy_routes_for_bottle(bottle)
|
||||
routes_path = stage_dir / "egress_proxy_routes.yaml"
|
||||
routes_path.write_text(egress_proxy_render_routes(routes))
|
||||
routes_path.chmod(0o600)
|
||||
return EgressProxyPlan(
|
||||
slug=slug,
|
||||
routes_path=routes_path,
|
||||
routes=routes,
|
||||
token_env_map=egress_proxy_token_env_map(routes),
|
||||
)
|
||||
|
||||
@abstractmethod
|
||||
def start(self, plan: EgressProxyPlan) -> str:
|
||||
"""Bring up the egress-proxy sidecar according to `plan`.
|
||||
Returns the target string identifying the running instance —
|
||||
the same value to pass to `.stop`. Backend-specific."""
|
||||
|
||||
@abstractmethod
|
||||
def stop(self, target: str) -> None:
|
||||
"""Tear down the egress-proxy sidecar identified by `target`
|
||||
(the value `.start` returned). Idempotent: a missing target
|
||||
is success. Backend-specific."""
|
||||
|
||||
|
||||
__all__ = [
|
||||
"EGRESS_PROXY_HOSTNAME",
|
||||
"EGRESS_PROXY_ROUTES_IN_CONTAINER",
|
||||
"EgressProxy",
|
||||
"EgressProxyPlan",
|
||||
"EgressProxyRoute",
|
||||
"egress_proxy_render_routes",
|
||||
"egress_proxy_resolve_token_values",
|
||||
"egress_proxy_routes_for_bottle",
|
||||
"egress_proxy_token_env_map",
|
||||
]
|
||||
@@ -0,0 +1,116 @@
|
||||
"""mitmproxy addon entrypoint for the egress-proxy sidecar (PRD 0017).
|
||||
|
||||
Loaded by `mitmdump -s /app/egress_proxy_addon.py` inside the
|
||||
egress-proxy container. Wraps the pure logic from
|
||||
`egress_proxy_addon_core` with mitmproxy's HTTPFlow API:
|
||||
|
||||
- At startup, read `EGRESS_PROXY_ROUTES` (default
|
||||
`/etc/egress-proxy/routes.yaml`, JSON content) → routes table.
|
||||
- SIGHUP re-reads the file and atomically swaps the in-memory
|
||||
table. A parse error keeps the old table in place — better to
|
||||
keep serving the old config than to leave the proxy with no
|
||||
routes after a typo.
|
||||
- On each `request`: strip the inbound Authorization header, then
|
||||
consult `decide()` for forward / block / inject-auth and apply
|
||||
the decision to the flow.
|
||||
|
||||
This file imports `mitmproxy` and is never imported on the host —
|
||||
mitmproxy is a container-only dependency. The host's tests target
|
||||
`egress_proxy_addon_core`.
|
||||
|
||||
Dockerfile.egress-proxy copies both this file and
|
||||
`egress_proxy_addon_core.py` flat into `/app/`; the absolute import
|
||||
below works because mitmdump runs with `/app` on its sys.path. The
|
||||
parallel file in the package source tree (claude_bottle/) is the
|
||||
build input — not a module the host imports."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from mitmproxy import http # type: ignore[import-not-found]
|
||||
|
||||
# Absolute import (NOT `from .egress_proxy_addon_core`) — the
|
||||
# container drops both files flat into /app/ so they are sibling
|
||||
# top-level modules to mitmdump's loader, not a package.
|
||||
from egress_proxy_addon_core import Route, decide, load_routes # type: ignore[import-not-found]
|
||||
|
||||
|
||||
DEFAULT_ROUTES_PATH = "/etc/egress-proxy/routes.yaml"
|
||||
|
||||
|
||||
class EgressProxyAddon:
|
||||
"""The mitmproxy addon. One instance per `mitmdump` process; the
|
||||
request hook is invoked on every CONNECT-decapsulated HTTP/HTTPS
|
||||
request the agent makes."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.routes_path = os.environ.get("EGRESS_PROXY_ROUTES", DEFAULT_ROUTES_PATH)
|
||||
self.routes: tuple[Route, ...] = ()
|
||||
self._reload(initial=True)
|
||||
self._install_sighup()
|
||||
|
||||
def _reload(self, *, initial: bool = False) -> None:
|
||||
try:
|
||||
text = Path(self.routes_path).read_text(encoding="utf-8")
|
||||
new_routes = load_routes(text)
|
||||
except (OSError, ValueError) as e:
|
||||
tag = "boot" if initial else "SIGHUP"
|
||||
sys.stderr.write(
|
||||
f"egress-proxy: {tag} load failed: {e}\n"
|
||||
)
|
||||
if initial:
|
||||
# No baseline to fall back on; serve nothing rather
|
||||
# than masquerade as a proxy with a route table the
|
||||
# operator never declared.
|
||||
self.routes = ()
|
||||
return
|
||||
self.routes = new_routes
|
||||
sys.stderr.write(
|
||||
f"egress-proxy: loaded {len(self.routes)} route(s): "
|
||||
f"{', '.join(r.host for r in self.routes)}\n"
|
||||
)
|
||||
|
||||
def _install_sighup(self) -> None:
|
||||
if not hasattr(signal, "SIGHUP"):
|
||||
return
|
||||
|
||||
def handler(signum: int, frame: object) -> None:
|
||||
del signum, frame
|
||||
self._reload()
|
||||
|
||||
signal.signal(signal.SIGHUP, handler)
|
||||
|
||||
# mitmproxy's addon API: this method name + signature is how
|
||||
# mitmdump discovers the request hook.
|
||||
def request(self, flow: http.HTTPFlow) -> None:
|
||||
# Inbound Authorization is always stripped — the agent cannot
|
||||
# smuggle a stolen token through the proxy. If the matched
|
||||
# route declares an auth pair, a fresh header is injected
|
||||
# below.
|
||||
flow.request.headers.pop("authorization", None)
|
||||
|
||||
request_path = flow.request.path.split("?", 1)[0]
|
||||
decision = decide(
|
||||
self.routes,
|
||||
flow.request.pretty_host,
|
||||
request_path,
|
||||
os.environ,
|
||||
)
|
||||
|
||||
if decision.action == "block":
|
||||
flow.response = http.Response.make(
|
||||
403,
|
||||
decision.reason.encode("utf-8"),
|
||||
{"Content-Type": "text/plain; charset=utf-8"},
|
||||
)
|
||||
return
|
||||
|
||||
if decision.inject_authorization is not None:
|
||||
flow.request.headers["authorization"] = decision.inject_authorization
|
||||
|
||||
|
||||
addons = [EgressProxyAddon()]
|
||||
@@ -0,0 +1,213 @@
|
||||
"""Pure logic for the egress-proxy mitmproxy addon (PRD 0017).
|
||||
|
||||
Split out of `egress_proxy_addon.py` so the host's unit tests can
|
||||
exercise the parse + decision functions without depending on the
|
||||
`mitmproxy` package. The companion module wraps these with the
|
||||
`mitmproxy.http.HTTPFlow` API and is loaded inside the sidecar
|
||||
container.
|
||||
|
||||
Stdlib only: this file ships into the egress-proxy image, where the
|
||||
container's Python is whatever mitmproxy itself runs on.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import typing
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Route:
|
||||
"""One row of the egress-proxy route table.
|
||||
|
||||
`host` is the request's `Host` header (or SNI hostname) to match
|
||||
against. `path_allowlist` is an optional tuple of absolute path
|
||||
prefixes the request path must start with; empty tuple means no
|
||||
path constraint. `auth_scheme` and `token_env` together form the
|
||||
credential-injection pair (both set or both empty); a non-empty
|
||||
pair tells the addon to overwrite the inbound Authorization with
|
||||
`<auth_scheme> <value-of-environ[token_env]>`.
|
||||
"""
|
||||
|
||||
host: str
|
||||
path_allowlist: tuple[str, ...] = ()
|
||||
auth_scheme: str = ""
|
||||
token_env: str = ""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Decision:
|
||||
"""The result of `decide()`. Either forward (with optional
|
||||
`inject_authorization` header) or block (with a `reason` to surface
|
||||
to the agent)."""
|
||||
|
||||
action: str # "forward" or "block"
|
||||
reason: str = ""
|
||||
inject_authorization: str | None = None
|
||||
|
||||
|
||||
def parse_routes(payload: object) -> tuple[Route, ...]:
|
||||
"""Parse the routes-file payload (already JSON-decoded) into a
|
||||
tuple of `Route`s. Raises `ValueError` on any malformed entry —
|
||||
the caller decides whether to keep the old table or refuse to
|
||||
start.
|
||||
|
||||
Schema:
|
||||
{
|
||||
"routes": [
|
||||
{
|
||||
"host": "api.github.com",
|
||||
"path_allowlist": ["/repos/x/", "/users/x"], # optional
|
||||
"auth_scheme": "Bearer", # optional
|
||||
"token_env": "EGRESS_PROXY_TOKEN_0" # optional
|
||||
},
|
||||
...
|
||||
]
|
||||
}
|
||||
"""
|
||||
if not isinstance(payload, dict):
|
||||
raise ValueError("routes payload: top-level must be an object")
|
||||
raw = payload.get("routes")
|
||||
if not isinstance(raw, list):
|
||||
raise ValueError("routes payload: 'routes' must be a list")
|
||||
out: list[Route] = []
|
||||
for i, r in enumerate(raw):
|
||||
out.append(_parse_one(i, r))
|
||||
return tuple(out)
|
||||
|
||||
|
||||
def _parse_one(idx: int, raw: object) -> Route:
|
||||
label = f"route[{idx}]"
|
||||
if not isinstance(raw, dict):
|
||||
raise ValueError(f"{label}: must be an object (got {type(raw).__name__})")
|
||||
host = raw.get("host")
|
||||
if not isinstance(host, str) or not host:
|
||||
raise ValueError(f"{label}: 'host' must be a non-empty string")
|
||||
|
||||
path_allow_raw = raw.get("path_allowlist", [])
|
||||
if not isinstance(path_allow_raw, list):
|
||||
raise ValueError(f"{label} ({host}): 'path_allowlist' must be a list")
|
||||
prefixes: list[str] = []
|
||||
for j, p in enumerate(path_allow_raw):
|
||||
if not isinstance(p, str):
|
||||
raise ValueError(
|
||||
f"{label} ({host}): path_allowlist[{j}] must be a string"
|
||||
)
|
||||
if not p.startswith("/"):
|
||||
raise ValueError(
|
||||
f"{label} ({host}): path_allowlist[{j}] {p!r} must be an "
|
||||
f"absolute path prefix starting with '/'"
|
||||
)
|
||||
prefixes.append(p)
|
||||
|
||||
auth_scheme = raw.get("auth_scheme", "")
|
||||
token_env = raw.get("token_env", "")
|
||||
if not isinstance(auth_scheme, str):
|
||||
raise ValueError(f"{label} ({host}): 'auth_scheme' must be a string")
|
||||
if not isinstance(token_env, str):
|
||||
raise ValueError(f"{label} ({host}): 'token_env' must be a string")
|
||||
# Both-or-neither: 'auth' on the manifest side renders to this
|
||||
# pair atomically. A partial pair here means the renderer or a
|
||||
# hand-edited file is broken.
|
||||
if bool(auth_scheme) != bool(token_env):
|
||||
raise ValueError(
|
||||
f"{label} ({host}): 'auth_scheme' and 'token_env' must be both "
|
||||
f"set or both empty (got auth_scheme={auth_scheme!r}, "
|
||||
f"token_env={token_env!r})"
|
||||
)
|
||||
|
||||
return Route(
|
||||
host=host,
|
||||
path_allowlist=tuple(prefixes),
|
||||
auth_scheme=auth_scheme,
|
||||
token_env=token_env,
|
||||
)
|
||||
|
||||
|
||||
def load_routes(text: str) -> tuple[Route, ...]:
|
||||
"""Convenience: parse JSON text → routes. Raises `ValueError` for
|
||||
both decode and shape errors so callers handle them uniformly."""
|
||||
try:
|
||||
payload = json.loads(text)
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError(f"routes payload: invalid JSON: {e}") from e
|
||||
return parse_routes(payload)
|
||||
|
||||
|
||||
def match_route(
|
||||
routes: typing.Sequence[Route],
|
||||
request_host: str,
|
||||
) -> Route | None:
|
||||
"""Return the first route whose `host` matches `request_host`.
|
||||
|
||||
Exact match in v1 — globs / wildcards are a follow-up (per PRD
|
||||
0017 open questions). Hostname comparison is case-insensitive
|
||||
because DNS names are case-insensitive."""
|
||||
target = request_host.lower()
|
||||
for r in routes:
|
||||
if r.host.lower() == target:
|
||||
return r
|
||||
return None
|
||||
|
||||
|
||||
def decide(
|
||||
routes: typing.Sequence[Route],
|
||||
request_host: str,
|
||||
request_path: str,
|
||||
environ: typing.Mapping[str, str],
|
||||
) -> Decision:
|
||||
"""Pure decision: given a route table + request host + path + env,
|
||||
return what the addon should do with the request.
|
||||
|
||||
- No matching route → forward unchanged. Pipelock will
|
||||
hostname-gate it downstream; egress-proxy does not need to
|
||||
decide on hosts it doesn't recognise.
|
||||
- Matching route with `path_allowlist` set, request path doesn't
|
||||
start with any of the allowed prefixes → block with a clear
|
||||
reason.
|
||||
- Matching route with an auth pair → forward + inject
|
||||
Authorization. Token comes from `environ[route.token_env]`;
|
||||
missing/empty values 500 (route declared auth but the secret
|
||||
isn't here — operator misconfig).
|
||||
"""
|
||||
route = match_route(routes, request_host)
|
||||
if route is None:
|
||||
return Decision(action="forward")
|
||||
|
||||
if route.path_allowlist:
|
||||
if not any(request_path.startswith(p) for p in route.path_allowlist):
|
||||
return Decision(
|
||||
action="block",
|
||||
reason=(
|
||||
f"egress-proxy: path {request_path!r} not in "
|
||||
f"path_allowlist for {route.host!r}"
|
||||
),
|
||||
)
|
||||
|
||||
if route.auth_scheme and route.token_env:
|
||||
token = environ.get(route.token_env, "")
|
||||
if not token:
|
||||
return Decision(
|
||||
action="block",
|
||||
reason=(
|
||||
f"egress-proxy: route for {route.host!r} declared auth "
|
||||
f"but env var {route.token_env!r} is unset"
|
||||
),
|
||||
)
|
||||
return Decision(
|
||||
action="forward",
|
||||
inject_authorization=f"{route.auth_scheme} {token}",
|
||||
)
|
||||
|
||||
return Decision(action="forward")
|
||||
|
||||
|
||||
__all__ = [
|
||||
"Decision",
|
||||
"Route",
|
||||
"decide",
|
||||
"load_routes",
|
||||
"match_route",
|
||||
"parse_routes",
|
||||
]
|
||||
+194
-7
@@ -12,10 +12,11 @@ the system prompt, for bottles the body is human documentation
|
||||
(ignored by the parser).
|
||||
|
||||
Bottle schema (frontmatter):
|
||||
env: { <NAME>: <env-entry>, ... }
|
||||
git: [ <git-entry>, ... ]
|
||||
cred_proxy: { routes: [ <route>, ... ] }
|
||||
egress: { allowlist: [ <hostname>, ... ] }
|
||||
env: { <NAME>: <env-entry>, ... }
|
||||
git: [ <git-entry>, ... ]
|
||||
cred_proxy: { routes: [ <route>, ... ] } # superseded by egress_proxy (PRD 0017)
|
||||
egress_proxy: { routes: [ <egress-route>, ... ] }
|
||||
egress: { allowlist: [ <hostname>, ... ] }
|
||||
|
||||
Agent schema (frontmatter):
|
||||
bottle: <bottle-name> # required
|
||||
@@ -272,6 +273,158 @@ class CredProxyConfig:
|
||||
return cls(routes=routes)
|
||||
|
||||
|
||||
# Auth schemes for the egress-proxy route's optional `auth` block.
|
||||
# Same values cred-proxy accepts today; `token` sidesteps the Gitea
|
||||
# token-not-Bearer quirk (go-gitea/gitea#16734).
|
||||
EGRESS_PROXY_AUTH_SCHEMES = ("Bearer", "token")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class EgressProxyRoute:
|
||||
"""One route on the per-bottle egress-proxy sidecar (PRD 0017).
|
||||
|
||||
`Host` matches the request's hostname (case-insensitive). The
|
||||
optional `PathAllowlist` constrains the URL path to a set of
|
||||
prefixes; empty tuple means no path-level filtering. The optional
|
||||
`AuthScheme` / `TokenRef` pair drives credential injection:
|
||||
when set, the proxy strips any inbound Authorization and injects
|
||||
`<AuthScheme> <value-of-host-env-named-by-TokenRef>`. When the
|
||||
manifest's `auth` block is omitted both fields are empty strings —
|
||||
no Authorization is written, no token forwarded.
|
||||
|
||||
Validation rules (enforced in `from_dict`):
|
||||
- `host` required, non-empty.
|
||||
- `path_allowlist` optional, list of absolute path prefixes.
|
||||
- `auth` optional. If present, MUST carry both `scheme` and
|
||||
`token_ref` as non-empty strings; an empty `auth: {}` is an
|
||||
error rather than a synonym for "no auth" (omit `auth` for
|
||||
that case).
|
||||
"""
|
||||
|
||||
Host: str
|
||||
PathAllowlist: tuple[str, ...] = ()
|
||||
AuthScheme: str = ""
|
||||
TokenRef: str = ""
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, bottle_name: str, idx: int, raw: object) -> "EgressProxyRoute":
|
||||
label = f"bottle '{bottle_name}' egress_proxy.routes[{idx}]"
|
||||
d = _as_json_object(raw, label)
|
||||
host = d.get("host")
|
||||
if not isinstance(host, str) or not host:
|
||||
die(f"{label} missing required string field 'host'")
|
||||
|
||||
path_allow_raw = d.get("path_allowlist")
|
||||
prefixes: tuple[str, ...] = ()
|
||||
if path_allow_raw is not None:
|
||||
if not isinstance(path_allow_raw, list):
|
||||
die(
|
||||
f"{label} path_allowlist must be an array "
|
||||
f"(was {type(path_allow_raw).__name__})"
|
||||
)
|
||||
path_list = cast(list[object], path_allow_raw)
|
||||
collected: list[str] = []
|
||||
for j, p in enumerate(path_list):
|
||||
if not isinstance(p, str):
|
||||
die(
|
||||
f"{label} path_allowlist[{j}] must be a string "
|
||||
f"(was {type(p).__name__})"
|
||||
)
|
||||
if not p.startswith("/"):
|
||||
die(
|
||||
f"{label} path_allowlist[{j}] {p!r} must be an "
|
||||
f"absolute path prefix starting with '/'"
|
||||
)
|
||||
collected.append(p)
|
||||
prefixes = tuple(collected)
|
||||
|
||||
auth_scheme = ""
|
||||
token_ref = ""
|
||||
if "auth" in d:
|
||||
auth_raw = d.get("auth")
|
||||
auth_d = _as_json_object(auth_raw, f"{label} auth")
|
||||
if not auth_d:
|
||||
die(
|
||||
f"{label} auth is empty ({{}}); omit the 'auth' key "
|
||||
f"entirely if this route is unauthenticated. Otherwise "
|
||||
f"both 'scheme' and 'token_ref' are required."
|
||||
)
|
||||
auth_scheme_raw = auth_d.get("scheme")
|
||||
if not isinstance(auth_scheme_raw, str) or not auth_scheme_raw:
|
||||
die(
|
||||
f"{label} auth.scheme is required when 'auth' is set "
|
||||
f"(non-empty string)"
|
||||
)
|
||||
if auth_scheme_raw not in EGRESS_PROXY_AUTH_SCHEMES:
|
||||
die(
|
||||
f"{label} auth.scheme {auth_scheme_raw!r} is not one of "
|
||||
f"{', '.join(EGRESS_PROXY_AUTH_SCHEMES)}"
|
||||
)
|
||||
token_ref_raw = auth_d.get("token_ref")
|
||||
if not isinstance(token_ref_raw, str) or not token_ref_raw:
|
||||
die(
|
||||
f"{label} auth.token_ref is required when 'auth' is set "
|
||||
f"(name of the host env var holding the token value)"
|
||||
)
|
||||
for k in auth_d:
|
||||
if k not in ("scheme", "token_ref"):
|
||||
die(
|
||||
f"{label} auth has unknown key {k!r}; "
|
||||
f"only 'scheme' and 'token_ref' are accepted"
|
||||
)
|
||||
auth_scheme = auth_scheme_raw
|
||||
token_ref = token_ref_raw
|
||||
|
||||
for k in d:
|
||||
if k not in ("host", "path_allowlist", "auth"):
|
||||
die(
|
||||
f"{label} has unknown key {k!r}; accepted keys are "
|
||||
f"'host', 'path_allowlist', 'auth'"
|
||||
)
|
||||
|
||||
return cls(
|
||||
Host=host,
|
||||
PathAllowlist=prefixes,
|
||||
AuthScheme=auth_scheme,
|
||||
TokenRef=token_ref,
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class EgressProxyConfig:
|
||||
"""Per-bottle egress-proxy configuration. Today this is just the
|
||||
route table; the nesting under `egress_proxy:` leaves room for
|
||||
per-bottle proxy settings (port override, log level, etc.) in
|
||||
follow-ups."""
|
||||
|
||||
routes: tuple[EgressProxyRoute, ...] = ()
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, bottle_name: str, raw: object) -> "EgressProxyConfig":
|
||||
d = _as_json_object(raw, f"bottle '{bottle_name}' egress_proxy")
|
||||
routes_raw = d.get("routes")
|
||||
routes: tuple[EgressProxyRoute, ...] = ()
|
||||
if routes_raw is not None:
|
||||
if not isinstance(routes_raw, list):
|
||||
die(
|
||||
f"bottle '{bottle_name}' egress_proxy.routes must be an array "
|
||||
f"(was {type(routes_raw).__name__})"
|
||||
)
|
||||
routes_list = cast(list[object], routes_raw)
|
||||
routes = tuple(
|
||||
EgressProxyRoute.from_dict(bottle_name, i, entry)
|
||||
for i, entry in enumerate(routes_list)
|
||||
)
|
||||
_validate_egress_proxy_routes(bottle_name, routes)
|
||||
for k in d:
|
||||
if k != "routes":
|
||||
die(
|
||||
f"bottle '{bottle_name}' egress_proxy has unknown key {k!r}; "
|
||||
f"only 'routes' is accepted"
|
||||
)
|
||||
return cls(routes=routes)
|
||||
|
||||
|
||||
DLP_ACTIONS = ("block", "warn")
|
||||
|
||||
|
||||
@@ -328,6 +481,7 @@ class Bottle:
|
||||
env: Mapping[str, str] = field(default_factory=_empty_str_dict)
|
||||
git: tuple[GitEntry, ...] = ()
|
||||
cred_proxy: CredProxyConfig = field(default_factory=CredProxyConfig)
|
||||
egress_proxy: EgressProxyConfig = field(default_factory=EgressProxyConfig)
|
||||
egress: BottleEgress = field(default_factory=BottleEgress)
|
||||
# Opt-in per-bottle stuck-recovery sidecar (PRD 0013). When true,
|
||||
# the launch step brings up a supervise sidecar that exposes three
|
||||
@@ -396,6 +550,12 @@ class Bottle:
|
||||
else CredProxyConfig()
|
||||
)
|
||||
|
||||
egress_proxy = (
|
||||
EgressProxyConfig.from_dict(name, d["egress_proxy"])
|
||||
if "egress_proxy" in d
|
||||
else EgressProxyConfig()
|
||||
)
|
||||
|
||||
egress_raw = d.get("egress")
|
||||
egress = (
|
||||
BottleEgress.from_dict(name, egress_raw)
|
||||
@@ -411,8 +571,8 @@ class Bottle:
|
||||
)
|
||||
|
||||
return cls(
|
||||
env=env, git=git, cred_proxy=cred_proxy, egress=egress,
|
||||
supervise=supervise_raw,
|
||||
env=env, git=git, cred_proxy=cred_proxy, egress_proxy=egress_proxy,
|
||||
egress=egress, supervise=supervise_raw,
|
||||
)
|
||||
|
||||
|
||||
@@ -740,6 +900,31 @@ def _validate_cred_proxy_routes(
|
||||
)
|
||||
|
||||
|
||||
def _validate_egress_proxy_routes(
|
||||
bottle_name: str,
|
||||
routes: tuple[EgressProxyRoute, ...],
|
||||
) -> None:
|
||||
"""Cross-validation for `bottle.egress_proxy.routes`:
|
||||
|
||||
- Hosts must be unique within the bottle. The proxy matches by
|
||||
exact-host (v1, prefix matching is on path_allowlist only);
|
||||
duplicate hosts leave the route choice ambiguous.
|
||||
|
||||
No cross-validation against `bottle.git` is performed. git-gate
|
||||
(SSH push/fetch) and egress-proxy (HTTPS) broker different
|
||||
protocols; declaring both for the same host is a legitimate
|
||||
dev setup."""
|
||||
seen_hosts: dict[str, None] = {}
|
||||
for r in routes:
|
||||
key = r.Host.lower()
|
||||
if key in seen_hosts:
|
||||
die(
|
||||
f"bottle '{bottle_name}' egress_proxy.routes has duplicate host "
|
||||
f"{r.Host!r}; each host must be unique on the proxy."
|
||||
)
|
||||
seen_hosts[key] = None
|
||||
|
||||
|
||||
def _validate_unique_git_names(bottle_name: str, git: tuple[GitEntry, ...]) -> None:
|
||||
seen: dict[str, None] = {}
|
||||
for g in git:
|
||||
@@ -764,7 +949,9 @@ _FILENAME_RX = re.compile(r"^[a-z][a-z0-9-]*$")
|
||||
# Frontmatter keys we accept on each entity. Anything not in these
|
||||
# sets dies with a "did you mean" pointer — typos shouldn't silently
|
||||
# ghost into an empty config.
|
||||
_BOTTLE_KEYS = frozenset({"env", "git", "cred_proxy", "egress", "supervise"})
|
||||
_BOTTLE_KEYS = frozenset(
|
||||
{"env", "git", "cred_proxy", "egress_proxy", "egress", "supervise"}
|
||||
)
|
||||
_AGENT_KEYS_REQUIRED = frozenset({"bottle"})
|
||||
_AGENT_KEYS_OPTIONAL = frozenset({"skills"})
|
||||
# Claude Code subagent fields claude-bottle ignores at launch but
|
||||
|
||||
@@ -0,0 +1,185 @@
|
||||
"""Unit: EgressProxy route lift + routes.yaml render + token
|
||||
resolution (PRD 0017)."""
|
||||
|
||||
import json
|
||||
import unittest
|
||||
|
||||
from claude_bottle.egress_proxy import (
|
||||
egress_proxy_render_routes,
|
||||
egress_proxy_resolve_token_values,
|
||||
egress_proxy_routes_for_bottle,
|
||||
egress_proxy_token_env_map,
|
||||
)
|
||||
from claude_bottle.log import Die
|
||||
from claude_bottle.manifest import Manifest
|
||||
|
||||
|
||||
def _bottle(routes):
|
||||
return Manifest.from_json_obj({
|
||||
"bottles": {"dev": {"egress_proxy": {"routes": routes}}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
}).bottles["dev"]
|
||||
|
||||
|
||||
class TestRoutesForBottle(unittest.TestCase):
|
||||
def test_authenticated_route_gets_slot(self):
|
||||
b = _bottle([{
|
||||
"host": "api.github.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"},
|
||||
}])
|
||||
routes = egress_proxy_routes_for_bottle(b)
|
||||
self.assertEqual(1, len(routes))
|
||||
r = routes[0]
|
||||
self.assertEqual("api.github.com", r.host)
|
||||
self.assertEqual("Bearer", r.auth_scheme)
|
||||
self.assertEqual("EGRESS_PROXY_TOKEN_0", r.token_env)
|
||||
self.assertEqual("GH_PAT", r.token_ref)
|
||||
self.assertEqual((), r.path_allowlist)
|
||||
|
||||
def test_unauthenticated_route_has_empty_auth_fields(self):
|
||||
b = _bottle([{"host": "github.com", "path_allowlist": ["/x/"]}])
|
||||
routes = egress_proxy_routes_for_bottle(b)
|
||||
r = routes[0]
|
||||
self.assertEqual("", r.auth_scheme)
|
||||
self.assertEqual("", r.token_env)
|
||||
self.assertEqual("", r.token_ref)
|
||||
self.assertEqual(("/x/",), r.path_allowlist)
|
||||
|
||||
def test_shared_token_ref_collapses_to_one_slot(self):
|
||||
b = _bottle([
|
||||
{"host": "api.github.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"}},
|
||||
{"host": "github.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"}},
|
||||
])
|
||||
routes = egress_proxy_routes_for_bottle(b)
|
||||
slots = {r.token_env for r in routes}
|
||||
self.assertEqual({"EGRESS_PROXY_TOKEN_0"}, slots)
|
||||
|
||||
def test_distinct_token_refs_get_distinct_slots(self):
|
||||
b = _bottle([
|
||||
{"host": "a.example",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "T1"}},
|
||||
{"host": "b.example",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "T2"}},
|
||||
])
|
||||
routes = egress_proxy_routes_for_bottle(b)
|
||||
slots = [r.token_env for r in routes]
|
||||
self.assertEqual(["EGRESS_PROXY_TOKEN_0", "EGRESS_PROXY_TOKEN_1"], slots)
|
||||
|
||||
def test_unauthenticated_routes_dont_consume_slots(self):
|
||||
# A bare-pass route between two authenticated routes mustn't
|
||||
# skip a slot number — slot 0 + slot 1 stay tight.
|
||||
b = _bottle([
|
||||
{"host": "a.example",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "T1"}},
|
||||
{"host": "passthrough.example"},
|
||||
{"host": "b.example",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "T2"}},
|
||||
])
|
||||
routes = egress_proxy_routes_for_bottle(b)
|
||||
authed = [r.token_env for r in routes if r.token_env]
|
||||
self.assertEqual(["EGRESS_PROXY_TOKEN_0", "EGRESS_PROXY_TOKEN_1"], authed)
|
||||
self.assertEqual("", routes[1].token_env)
|
||||
|
||||
|
||||
class TestTokenEnvMap(unittest.TestCase):
|
||||
def test_only_authenticated_routes_contribute(self):
|
||||
b = _bottle([
|
||||
{"host": "a.example",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "T1"}},
|
||||
{"host": "passthrough.example"},
|
||||
])
|
||||
routes = egress_proxy_routes_for_bottle(b)
|
||||
m = egress_proxy_token_env_map(routes)
|
||||
self.assertEqual({"EGRESS_PROXY_TOKEN_0": "T1"}, m)
|
||||
|
||||
def test_no_routes_empty(self):
|
||||
self.assertEqual({}, egress_proxy_token_env_map(()))
|
||||
|
||||
|
||||
class TestRenderRoutes(unittest.TestCase):
|
||||
def test_authenticated_route_serialised_with_auth_fields(self):
|
||||
b = _bottle([{
|
||||
"host": "api.github.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"},
|
||||
"path_allowlist": ["/repos/x/"],
|
||||
}])
|
||||
routes = egress_proxy_routes_for_bottle(b)
|
||||
payload = json.loads(egress_proxy_render_routes(routes))
|
||||
self.assertEqual(
|
||||
[{
|
||||
"host": "api.github.com",
|
||||
"path_allowlist": ["/repos/x/"],
|
||||
"auth_scheme": "Bearer",
|
||||
"token_env": "EGRESS_PROXY_TOKEN_0",
|
||||
}],
|
||||
payload["routes"],
|
||||
)
|
||||
|
||||
def test_unauthenticated_route_omits_auth_fields(self):
|
||||
# auth_scheme + token_env keys are absent when the route was
|
||||
# declared without an `auth` block — the addon's parser
|
||||
# enforces both-or-neither, so emitting empty strings would
|
||||
# round-trip as a partial pair and crash.
|
||||
b = _bottle([{"host": "github.com", "path_allowlist": ["/x/"]}])
|
||||
routes = egress_proxy_routes_for_bottle(b)
|
||||
payload = json.loads(egress_proxy_render_routes(routes))
|
||||
entry = payload["routes"][0]
|
||||
self.assertNotIn("auth_scheme", entry)
|
||||
self.assertNotIn("token_env", entry)
|
||||
|
||||
def test_no_path_allowlist_omits_field(self):
|
||||
b = _bottle([{
|
||||
"host": "api.anthropic.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "CL"},
|
||||
}])
|
||||
routes = egress_proxy_routes_for_bottle(b)
|
||||
payload = json.loads(egress_proxy_render_routes(routes))
|
||||
self.assertNotIn("path_allowlist", payload["routes"][0])
|
||||
|
||||
def test_round_trip_through_addon_core(self):
|
||||
# Render here → parse in the addon must succeed for every
|
||||
# combination the manifest can produce.
|
||||
from claude_bottle.egress_proxy_addon_core import load_routes
|
||||
b = _bottle([
|
||||
{"host": "api.github.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"},
|
||||
"path_allowlist": ["/repos/x/"]},
|
||||
{"host": "github.com", "path_allowlist": ["/x/"]},
|
||||
{"host": "api.anthropic.com"},
|
||||
])
|
||||
routes = egress_proxy_routes_for_bottle(b)
|
||||
addon_routes = load_routes(egress_proxy_render_routes(routes))
|
||||
self.assertEqual(3, len(addon_routes))
|
||||
self.assertEqual("Bearer", addon_routes[0].auth_scheme)
|
||||
self.assertEqual("EGRESS_PROXY_TOKEN_0", addon_routes[0].token_env)
|
||||
self.assertEqual("", addon_routes[1].auth_scheme)
|
||||
self.assertEqual("", addon_routes[2].auth_scheme)
|
||||
|
||||
|
||||
class TestResolveTokenValues(unittest.TestCase):
|
||||
def test_reads_host_env(self):
|
||||
out = egress_proxy_resolve_token_values(
|
||||
{"EGRESS_PROXY_TOKEN_0": "GH_PAT"},
|
||||
{"GH_PAT": "the-value"},
|
||||
)
|
||||
self.assertEqual({"EGRESS_PROXY_TOKEN_0": "the-value"}, out)
|
||||
|
||||
def test_missing_token_ref_dies(self):
|
||||
with self.assertRaises(Die):
|
||||
egress_proxy_resolve_token_values(
|
||||
{"EGRESS_PROXY_TOKEN_0": "GH_PAT"},
|
||||
{},
|
||||
)
|
||||
|
||||
def test_empty_token_ref_dies(self):
|
||||
with self.assertRaises(Die):
|
||||
egress_proxy_resolve_token_values(
|
||||
{"EGRESS_PROXY_TOKEN_0": "GH_PAT"},
|
||||
{"GH_PAT": ""},
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,249 @@
|
||||
"""Unit: pure-logic core of the egress-proxy mitmproxy addon (PRD 0017).
|
||||
|
||||
These tests target `egress_proxy_addon_core` — the host-importable
|
||||
half of the addon. The mitmproxy hook wrapper in
|
||||
`egress_proxy_addon.py` is container-only and is not exercised here."""
|
||||
|
||||
import unittest
|
||||
|
||||
from claude_bottle.egress_proxy_addon_core import (
|
||||
Decision,
|
||||
Route,
|
||||
decide,
|
||||
load_routes,
|
||||
match_route,
|
||||
parse_routes,
|
||||
)
|
||||
|
||||
|
||||
# --- parse_routes --------------------------------------------------------
|
||||
|
||||
|
||||
class TestParseRoutes(unittest.TestCase):
|
||||
def test_minimal_route(self):
|
||||
routes = parse_routes({"routes": [{"host": "api.github.com"}]})
|
||||
self.assertEqual(1, len(routes))
|
||||
self.assertEqual("api.github.com", routes[0].host)
|
||||
self.assertEqual((), routes[0].path_allowlist)
|
||||
self.assertEqual("", routes[0].auth_scheme)
|
||||
self.assertEqual("", routes[0].token_env)
|
||||
|
||||
def test_full_route(self):
|
||||
routes = parse_routes({"routes": [{
|
||||
"host": "api.github.com",
|
||||
"path_allowlist": ["/repos/x/", "/users/x"],
|
||||
"auth_scheme": "Bearer",
|
||||
"token_env": "EGRESS_PROXY_TOKEN_0",
|
||||
}]})
|
||||
r = routes[0]
|
||||
self.assertEqual(("/repos/x/", "/users/x"), r.path_allowlist)
|
||||
self.assertEqual("Bearer", r.auth_scheme)
|
||||
self.assertEqual("EGRESS_PROXY_TOKEN_0", r.token_env)
|
||||
|
||||
def test_order_preserved(self):
|
||||
# Host match is exact (not longest-prefix), but the file order
|
||||
# is preserved anyway so the operator's mental model matches
|
||||
# what the proxy sees.
|
||||
routes = parse_routes({"routes": [
|
||||
{"host": "a.example"},
|
||||
{"host": "b.example"},
|
||||
{"host": "c.example"},
|
||||
]})
|
||||
self.assertEqual(
|
||||
["a.example", "b.example", "c.example"],
|
||||
[r.host for r in routes],
|
||||
)
|
||||
|
||||
def test_partial_auth_pair_rejected(self):
|
||||
# auth_scheme without token_env is a renderer bug (the manifest's
|
||||
# `auth: { scheme, token_ref }` block writes both at once).
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
parse_routes({"routes": [{
|
||||
"host": "x.example",
|
||||
"auth_scheme": "Bearer",
|
||||
}]})
|
||||
self.assertIn("both set or both empty", str(cm.exception))
|
||||
|
||||
def test_partial_auth_other_direction_rejected(self):
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
parse_routes({"routes": [{
|
||||
"host": "x.example",
|
||||
"token_env": "EGRESS_PROXY_TOKEN_0",
|
||||
}]})
|
||||
self.assertIn("both set or both empty", str(cm.exception))
|
||||
|
||||
def test_path_allowlist_must_be_absolute(self):
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
parse_routes({"routes": [{
|
||||
"host": "x.example",
|
||||
"path_allowlist": ["no-leading-slash/"],
|
||||
}]})
|
||||
self.assertIn("absolute path prefix", str(cm.exception))
|
||||
|
||||
def test_path_allowlist_items_must_be_strings(self):
|
||||
with self.assertRaises(ValueError):
|
||||
parse_routes({"routes": [{
|
||||
"host": "x.example",
|
||||
"path_allowlist": [42],
|
||||
}]})
|
||||
|
||||
def test_top_level_must_be_object(self):
|
||||
with self.assertRaises(ValueError):
|
||||
parse_routes(["not", "an", "object"])
|
||||
|
||||
def test_routes_must_be_list(self):
|
||||
with self.assertRaises(ValueError):
|
||||
parse_routes({"routes": "not a list"})
|
||||
|
||||
def test_route_must_have_host(self):
|
||||
with self.assertRaises(ValueError):
|
||||
parse_routes({"routes": [{}]})
|
||||
|
||||
|
||||
# --- load_routes ---------------------------------------------------------
|
||||
|
||||
|
||||
class TestLoadRoutes(unittest.TestCase):
|
||||
def test_json_text_round_trip(self):
|
||||
routes = load_routes('{"routes":[{"host":"api.example"}]}')
|
||||
self.assertEqual(1, len(routes))
|
||||
self.assertEqual("api.example", routes[0].host)
|
||||
|
||||
def test_invalid_json_raises_value_error(self):
|
||||
# Both decode and schema errors land as ValueError so callers
|
||||
# have a single except clause.
|
||||
with self.assertRaises(ValueError):
|
||||
load_routes("not json at all")
|
||||
|
||||
|
||||
# --- match_route ---------------------------------------------------------
|
||||
|
||||
|
||||
class TestMatchRoute(unittest.TestCase):
|
||||
ROUTES = (
|
||||
Route(host="api.github.com"),
|
||||
Route(host="github.com", path_allowlist=("/x/",)),
|
||||
)
|
||||
|
||||
def test_exact_match(self):
|
||||
r = match_route(self.ROUTES, "api.github.com")
|
||||
self.assertIsNotNone(r)
|
||||
self.assertEqual("api.github.com", r.host)
|
||||
|
||||
def test_case_insensitive(self):
|
||||
# DNS hostnames are case-insensitive per RFC 1035; mitmproxy
|
||||
# surfaces the host as the agent wrote it, which may include
|
||||
# uppercase. Lookup must normalise.
|
||||
r = match_route(self.ROUTES, "API.GitHub.COM")
|
||||
self.assertIsNotNone(r)
|
||||
self.assertEqual("api.github.com", r.host)
|
||||
|
||||
def test_no_match_returns_none(self):
|
||||
self.assertIsNone(match_route(self.ROUTES, "elsewhere.example"))
|
||||
|
||||
def test_no_substring_or_prefix_matching(self):
|
||||
# api.github.com is in the table; github.com is too. Some
|
||||
# other-host shouldn't be matched via a "ends with" check.
|
||||
self.assertIsNone(match_route(self.ROUTES, "evil.api.github.com"))
|
||||
|
||||
|
||||
# --- decide --------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDecide(unittest.TestCase):
|
||||
def test_no_matching_route_forwards(self):
|
||||
# Hostnames the operator didn't declare are not the
|
||||
# egress-proxy's concern; pipelock's hostname allowlist gates
|
||||
# them downstream.
|
||||
d = decide((), "elsewhere.example", "/anything", {})
|
||||
self.assertEqual("forward", d.action)
|
||||
self.assertIsNone(d.inject_authorization)
|
||||
|
||||
def test_path_allowlist_match_forwards(self):
|
||||
d = decide(
|
||||
(Route(host="github.com", path_allowlist=("/didericis/",)),),
|
||||
"github.com", "/didericis/repo", {},
|
||||
)
|
||||
self.assertEqual("forward", d.action)
|
||||
|
||||
def test_path_allowlist_miss_blocks(self):
|
||||
d = decide(
|
||||
(Route(host="github.com", path_allowlist=("/didericis/",)),),
|
||||
"github.com", "/somebody-else/secret", {},
|
||||
)
|
||||
self.assertEqual("block", d.action)
|
||||
self.assertIn("path_allowlist", d.reason)
|
||||
self.assertIn("'github.com'", d.reason)
|
||||
|
||||
def test_empty_path_allowlist_means_no_constraint(self):
|
||||
# Bare-pass route: declared but no path filtering.
|
||||
d = decide(
|
||||
(Route(host="api.anthropic.com"),),
|
||||
"api.anthropic.com", "/v1/messages", {},
|
||||
)
|
||||
self.assertEqual("forward", d.action)
|
||||
|
||||
def test_auth_injection_uses_environ_value(self):
|
||||
d = decide(
|
||||
(Route(host="api.github.com", auth_scheme="Bearer",
|
||||
token_env="EGRESS_PROXY_TOKEN_0"),),
|
||||
"api.github.com", "/repos/x", {"EGRESS_PROXY_TOKEN_0": "the-token"},
|
||||
)
|
||||
self.assertEqual("forward", d.action)
|
||||
self.assertEqual("Bearer the-token", d.inject_authorization)
|
||||
|
||||
def test_auth_with_missing_token_env_blocks(self):
|
||||
# The route declared auth but the secret isn't in the
|
||||
# container's env — operator misconfig at start-time, blocked
|
||||
# with a clear reason rather than forwarding an unauthenticated
|
||||
# request the upstream would reject.
|
||||
d = decide(
|
||||
(Route(host="api.github.com", auth_scheme="Bearer",
|
||||
token_env="EGRESS_PROXY_TOKEN_0"),),
|
||||
"api.github.com", "/repos/x", {},
|
||||
)
|
||||
self.assertEqual("block", d.action)
|
||||
self.assertIn("EGRESS_PROXY_TOKEN_0", d.reason)
|
||||
|
||||
def test_auth_with_empty_token_env_blocks(self):
|
||||
# Empty env var is treated the same as unset — we don't inject
|
||||
# a literal "Bearer " (blank token) which would burn the
|
||||
# upstream rate limit with a 401.
|
||||
d = decide(
|
||||
(Route(host="api.github.com", auth_scheme="Bearer",
|
||||
token_env="EGRESS_PROXY_TOKEN_0"),),
|
||||
"api.github.com", "/repos/x", {"EGRESS_PROXY_TOKEN_0": ""},
|
||||
)
|
||||
self.assertEqual("block", d.action)
|
||||
|
||||
def test_unauthenticated_route_skips_injection(self):
|
||||
d = decide(
|
||||
(Route(host="github.com", path_allowlist=("/x/",)),),
|
||||
"github.com", "/x/repo", {"GH_PAT": "should-not-appear"},
|
||||
)
|
||||
self.assertEqual("forward", d.action)
|
||||
self.assertIsNone(d.inject_authorization)
|
||||
|
||||
def test_token_token_scheme(self):
|
||||
# Gitea uses `Authorization: token <pat>` (sidesteps
|
||||
# go-gitea/gitea#16734). The addon is scheme-agnostic.
|
||||
d = decide(
|
||||
(Route(host="git.example", auth_scheme="token",
|
||||
token_env="EGRESS_PROXY_TOKEN_0"),),
|
||||
"git.example", "/api/v1/repos", {"EGRESS_PROXY_TOKEN_0": "abc"},
|
||||
)
|
||||
self.assertEqual("token abc", d.inject_authorization)
|
||||
|
||||
|
||||
# --- Decision dataclass --------------------------------------------------
|
||||
|
||||
|
||||
class TestDecisionDefaults(unittest.TestCase):
|
||||
def test_forward_default_has_no_reason_or_inject(self):
|
||||
d = Decision(action="forward")
|
||||
self.assertEqual("", d.reason)
|
||||
self.assertIsNone(d.inject_authorization)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,173 @@
|
||||
"""Unit: manifest parsing for `bottle.egress_proxy.routes[]` (PRD 0017).
|
||||
|
||||
The route shape is new: `host` (required), optional `path_allowlist`,
|
||||
optional nested `auth: { scheme, token_ref }`. Validation rules per
|
||||
the PRD: empty `auth: {}` is an error, partial `auth` is an error,
|
||||
auth omission means unauthenticated."""
|
||||
|
||||
import unittest
|
||||
|
||||
from claude_bottle.log import Die
|
||||
from claude_bottle.manifest import EgressProxyRoute, Manifest
|
||||
|
||||
|
||||
def _bottle(routes):
|
||||
return Manifest.from_json_obj({
|
||||
"bottles": {"dev": {"egress_proxy": {"routes": routes}}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
}).bottles["dev"]
|
||||
|
||||
|
||||
class TestMinimalRoute(unittest.TestCase):
|
||||
def test_host_only(self):
|
||||
b = _bottle([{"host": "api.example.com"}])
|
||||
self.assertEqual(1, len(b.egress_proxy.routes))
|
||||
r = b.egress_proxy.routes[0]
|
||||
self.assertEqual("api.example.com", r.Host)
|
||||
self.assertEqual((), r.PathAllowlist)
|
||||
self.assertEqual("", r.AuthScheme)
|
||||
self.assertEqual("", r.TokenRef)
|
||||
|
||||
def test_host_required(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{}])
|
||||
|
||||
def test_host_must_be_non_empty(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{"host": ""}])
|
||||
|
||||
def test_unknown_top_level_key_dies(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{"host": "x.example", "wat": "yes"}])
|
||||
|
||||
|
||||
class TestPathAllowlist(unittest.TestCase):
|
||||
def test_optional(self):
|
||||
b = _bottle([{"host": "x.example"}])
|
||||
self.assertEqual((), b.egress_proxy.routes[0].PathAllowlist)
|
||||
|
||||
def test_must_be_array(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{"host": "x.example", "path_allowlist": "/x/"}])
|
||||
|
||||
def test_items_must_be_strings(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{"host": "x.example", "path_allowlist": [42]}])
|
||||
|
||||
def test_items_must_be_absolute_paths(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{"host": "x.example", "path_allowlist": ["nope/"]}])
|
||||
|
||||
def test_full_list(self):
|
||||
b = _bottle([{
|
||||
"host": "github.com",
|
||||
"path_allowlist": ["/didericis/", "/users/didericis"],
|
||||
}])
|
||||
self.assertEqual(
|
||||
("/didericis/", "/users/didericis"),
|
||||
b.egress_proxy.routes[0].PathAllowlist,
|
||||
)
|
||||
|
||||
|
||||
class TestAuth(unittest.TestCase):
|
||||
def test_omitted_means_no_auth(self):
|
||||
b = _bottle([{"host": "github.com"}])
|
||||
r = b.egress_proxy.routes[0]
|
||||
self.assertEqual("", r.AuthScheme)
|
||||
self.assertEqual("", r.TokenRef)
|
||||
|
||||
def test_full_auth(self):
|
||||
b = _bottle([{
|
||||
"host": "api.github.com",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "GH_PAT"},
|
||||
}])
|
||||
r = b.egress_proxy.routes[0]
|
||||
self.assertEqual("Bearer", r.AuthScheme)
|
||||
self.assertEqual("GH_PAT", r.TokenRef)
|
||||
|
||||
def test_empty_auth_block_rejected(self):
|
||||
# Per PRD 0017: `auth: {}` is an error, not a synonym for
|
||||
# "no auth" — that's what omission is for.
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{"host": "x.example", "auth": {}}])
|
||||
|
||||
def test_missing_scheme_rejected(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{
|
||||
"host": "x.example",
|
||||
"auth": {"token_ref": "T"},
|
||||
}])
|
||||
|
||||
def test_missing_token_ref_rejected(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{
|
||||
"host": "x.example",
|
||||
"auth": {"scheme": "Bearer"},
|
||||
}])
|
||||
|
||||
def test_unknown_scheme_rejected(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{
|
||||
"host": "x.example",
|
||||
"auth": {"scheme": "Basic", "token_ref": "T"},
|
||||
}])
|
||||
|
||||
def test_token_scheme_allowed(self):
|
||||
# Gitea quirk: `Authorization: token <pat>` (not Bearer).
|
||||
b = _bottle([{
|
||||
"host": "git.example",
|
||||
"auth": {"scheme": "token", "token_ref": "GITEA_PAT"},
|
||||
}])
|
||||
self.assertEqual("token", b.egress_proxy.routes[0].AuthScheme)
|
||||
|
||||
def test_unknown_auth_key_rejected(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([{
|
||||
"host": "x.example",
|
||||
"auth": {"scheme": "Bearer", "token_ref": "T", "extra": "no"},
|
||||
}])
|
||||
|
||||
|
||||
class TestRouteValidation(unittest.TestCase):
|
||||
def test_duplicate_hosts_rejected(self):
|
||||
# Routes match by exact host; duplicates leave the choice
|
||||
# ambiguous, so we reject them up front rather than picking
|
||||
# the first/last silently.
|
||||
with self.assertRaises(Die):
|
||||
_bottle([
|
||||
{"host": "github.com"},
|
||||
{"host": "github.com", "path_allowlist": ["/x/"]},
|
||||
])
|
||||
|
||||
def test_duplicate_host_case_insensitive(self):
|
||||
with self.assertRaises(Die):
|
||||
_bottle([
|
||||
{"host": "GitHub.com"},
|
||||
{"host": "github.com"},
|
||||
])
|
||||
|
||||
def test_empty_routes_allowed(self):
|
||||
b = _bottle([])
|
||||
self.assertEqual((), b.egress_proxy.routes)
|
||||
|
||||
def test_no_egress_proxy_block_means_empty(self):
|
||||
# The bottle dataclass defaults to an empty EgressProxyConfig.
|
||||
b = Manifest.from_json_obj({
|
||||
"bottles": {"dev": {}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||
}).bottles["dev"]
|
||||
self.assertEqual((), b.egress_proxy.routes)
|
||||
|
||||
|
||||
class TestConfigShape(unittest.TestCase):
|
||||
def test_unknown_egress_proxy_key_rejected(self):
|
||||
with self.assertRaises(Die):
|
||||
Manifest.from_json_obj({
|
||||
"bottles": {"dev": {"egress_proxy": {"wat": []}}},
|
||||
"agents": {"demo": {"skills": [], "prompt": "",
|
||||
"bottle": "dev"}},
|
||||
})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user