Files
bot-bottle/claude_bottle/egress_proxy_addon_core.py
T
didericis f807ed1149
test / unit (pull_request) Successful in 17s
test / integration (pull_request) Successful in 1m5s
fix(egress-proxy): force traffic through pipelock + block unallowlisted hosts
Two issues stopping the bottle's egress allowlist from being
enforced:

1. mitmproxy was bypassing pipelock. We set HTTPS_PROXY=pipelock
   in the egress-proxy container's env, but mitmproxy is a proxy
   *server* — it does NOT honor HTTP(S)_PROXY env vars on its
   outbound side the way HTTP-client libraries do. All
   post-MITM traffic was going direct to the upstream, never
   touching pipelock's hostname allowlist or DLP scanner.

   Fix: use mitmproxy's `--mode upstream:URL` flag. The Dockerfile
   entrypoint now reads a new `EGRESS_PROXY_UPSTREAM_PROXY` env
   (set by `DockerEgressProxy.start` to the pipelock URL when
   pipelock is in the topology) and switches mitmdump to
   upstream-proxy mode. Standalone runs of the image without the
   env still get `--mode regular@9099` direct-to-upstream — useful
   for unit-test boots. Confirmed in the boot log: "HTTP(S) proxy
   (upstream mode) listening at *:9099."

2. egress-proxy was forwarding unrecognized hosts. The addon's
   `decide()` returned `Decision(action="forward")` whenever no
   route matched the request host, deferring to pipelock to gate.
   With #1 broken, pipelock wasn't gating either; even with #1
   fixed, defense-in-depth wants both layers enforcing.

   Fix: no-route-match → 403 with a "host not in allowlist"
   reason. The egress allowlist is now strictly the set of hosts
   declared in `bottle.egress_proxy.routes`; bare-pass routes
   (host with no auth, no path_allowlist) cover the passthrough
   case for hosts that only need to be reachable. `path_allowlist` enforcement
   on matched routes is unchanged.

Test updated: `test_no_matching_route_forwards` →
`test_no_matching_route_blocks`. 364 unit tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 16:38:18 -04:00

255 lines
8.8 KiB
Python

"""Pure logic for the egress-proxy mitmproxy addon (PRD 0017).
Split out of `egress_proxy_addon.py` so the host's unit tests can
exercise the parse + decision functions without depending on the
`mitmproxy` package. The companion module wraps these with the
`mitmproxy.http.HTTPFlow` API and is loaded inside the sidecar
container.
Stdlib only: this file ships into the egress-proxy image, where the
container's Python is whatever mitmproxy itself runs on.
"""
from __future__ import annotations

import json
import typing
from dataclasses import dataclass
from urllib.parse import unquote
@dataclass(frozen=True)
class Route:
    """A single entry in the egress-proxy route table.

    Instances are immutable (frozen dataclass). The credential-injection
    pair (`auth_scheme`, `token_env`) is expected to be either fully set
    or fully empty; when set, the addon replaces the inbound
    Authorization header with
    `<auth_scheme> <value-of-environ[token_env]>`.
    """

    # Hostname to match: the request's `Host` header (or SNI hostname).
    host: str
    # Absolute path prefixes the request path must start with. An empty
    # tuple means the route places no constraint on the path.
    path_allowlist: tuple[str, ...] = ()
    # Scheme word placed in front of the token (e.g. "Bearer"); "" = no auth.
    auth_scheme: str = ""
    # Name of the environment variable that holds the token; "" = no auth.
    token_env: str = ""
@dataclass(frozen=True)
class Decision:
    """Outcome produced by `decide()` for one request.

    A decision either forwards the request (optionally carrying an
    Authorization header value to inject) or blocks it (carrying a
    human-readable reason to surface to the agent).
    """

    # Either "forward" or "block".
    action: str
    # Explanation shown to the agent; meaningful for "block" decisions.
    reason: str = ""
    # Full Authorization header value to set on the request, or None to
    # leave the header alone.
    inject_authorization: str | None = None
def parse_routes(payload: object) -> tuple[Route, ...]:
    """Convert an already JSON-decoded routes payload into `Route`s.

    Expected shape::

        {
          "routes": [
            {
              "host": "api.github.com",
              "path_allowlist": ["/repos/x/", "/users/x"],  # optional
              "auth_scheme": "Bearer",                       # optional
              "token_env": "EGRESS_PROXY_TOKEN_0"            # optional
            },
            ...
          ]
        }

    Returns the routes as a tuple, in file order.

    Raises `ValueError` if the top level is not an object, if `routes`
    is not a list, or if any single entry is malformed. What to do on
    failure (keep the previous table, refuse to start) is left to the
    caller.
    """
    if not isinstance(payload, dict):
        raise ValueError("routes payload: top-level must be an object")

    entries = payload.get("routes")
    if not isinstance(entries, list):
        raise ValueError("routes payload: 'routes' must be a list")

    # Entries are validated in order; the first malformed one aborts the
    # whole parse.
    return tuple(_parse_one(pos, entry) for pos, entry in enumerate(entries))
def _parse_one(idx: int, raw: object) -> Route:
    """Validate one raw route entry and build the `Route` for it.

    `idx` is the entry's position in the routes list and is used only to
    label error messages. Raises `ValueError` on the first problem found;
    checks run in this order: entry type, `host`, `path_allowlist`,
    `auth_scheme`, `token_env`, then the both-or-neither auth pair rule.
    """
    label = f"route[{idx}]"
    if not isinstance(raw, dict):
        raise ValueError(f"{label}: must be an object (got {type(raw).__name__})")

    host = raw.get("host")
    if not (isinstance(host, str) and host):
        raise ValueError(f"{label}: 'host' must be a non-empty string")

    # Every later message carries the host so a bad row is easy to find.
    where = f"{label} ({host})"

    allow_raw = raw.get("path_allowlist", [])
    if not isinstance(allow_raw, list):
        raise ValueError(f"{where}: 'path_allowlist' must be a list")
    for pos, prefix in enumerate(allow_raw):
        if not isinstance(prefix, str):
            raise ValueError(
                f"{where}: path_allowlist[{pos}] must be a string"
            )
        if not prefix.startswith("/"):
            raise ValueError(
                f"{where}: path_allowlist[{pos}] {prefix!r} must be an "
                "absolute path prefix starting with '/'"
            )

    scheme = raw.get("auth_scheme", "")
    env_name = raw.get("token_env", "")
    for key, value in (("auth_scheme", scheme), ("token_env", env_name)):
        if not isinstance(value, str):
            raise ValueError(f"{where}: {key!r} must be a string")

    # The manifest's 'auth' renders to this pair atomically, so a half-set
    # pair means the renderer or a hand-edited file is broken.
    if (not scheme) != (not env_name):
        raise ValueError(
            f"{where}: 'auth_scheme' and 'token_env' must be both "
            f"set or both empty (got auth_scheme={scheme!r}, "
            f"token_env={env_name!r})"
        )

    return Route(
        host=host,
        path_allowlist=tuple(allow_raw),
        auth_scheme=scheme,
        token_env=env_name,
    )
def load_routes(text: str) -> tuple[Route, ...]:
    """Decode JSON `text` and parse it into routes.

    Malformed JSON is re-raised as a plain `ValueError` (with the decoder
    error chained as the cause), so callers can treat decode failures and
    shape failures from `parse_routes` the same way.
    """
    try:
        decoded = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ValueError(f"routes payload: invalid JSON: {exc}") from exc
    else:
        return parse_routes(decoded)
def is_git_push_request(path: str, query: str) -> bool:
    """Return True if the request is a git smart-HTTP push.

    git push over HTTPS hits two endpoints:

        GET  <repo>/info/refs?service=git-receive-pack   (capabilities)
        POST <repo>/git-receive-pack                     (the push)

    Fetches use `service=git-upload-pack` / `/git-upload-pack` and are
    unaffected. Egress-proxy refuses HTTPS push because git-gate's
    pre-receive gitleaks scan is the gate for outbound git data; routing
    push through egress-proxy would bypass that. Use the bottle.git SSH
    path if you need to push.

    Universal across routes — the block fires even when no egress_proxy
    route matches the host. A bare-pass route (host with no auth, no
    path_allowlist) would otherwise let push through to pipelock +
    upstream untouched.

    `path` and each query key/value are percent-decoded before they are
    compared, so an encoded spelling such as `/git-receive%2Dpack` or
    `service=git-receive%2Dpack` is recognized as a push as well.

    Args:
        path: request path, without the query string.
        query: raw query string, `&`-separated `key=value` pairs.
    """
    # Compare what the upstream server would see after URL-decoding,
    # not the raw bytes the client chose to send.
    decoded_path = unquote(path)
    if decoded_path.endswith("/git-receive-pack"):
        return True
    if decoded_path.endswith("/info/refs"):
        # Query string is parsed leniently — `service=git-receive-pack`
        # may appear with other params in any order.
        for pair in query.split("&"):
            key, _, value = pair.partition("=")
            if unquote(key) == "service" and unquote(value) == "git-receive-pack":
                return True
    return False
def match_route(
    routes: typing.Sequence[Route],
    request_host: str,
) -> Route | None:
    """Find the first route whose `host` equals `request_host`.

    Matching is exact in v1 (globs / wildcards are a follow-up per the
    PRD 0017 open questions) and case-insensitive, because DNS names are
    case-insensitive. Returns `None` when no route matches.
    """
    wanted = request_host.lower()
    # Generator + next() stops at the first hit, so table order decides
    # which of several same-host routes wins.
    return next(
        (candidate for candidate in routes if candidate.host.lower() == wanted),
        None,
    )
def _path_has_parent_segment(request_path: str) -> bool:
    """Return True if the path part of `request_path` has a `..` segment.

    Any query string is dropped first and the remainder is
    percent-decoded, so `%2e%2e` and `..%2f` spellings are caught too.
    """
    path_only, _, _ = request_path.partition("?")
    return ".." in unquote(path_only).split("/")


def decide(
    routes: typing.Sequence[Route],
    request_host: str,
    request_path: str,
    environ: typing.Mapping[str, str],
) -> Decision:
    """Pure decision: given a route table + request host + path + env,
    return what the addon should do with the request.

    - No matching route → BLOCK. The route table is the bottle's
      egress allowlist; defense-in-depth complements pipelock's
      hostname gate on the downstream leg. A bottle that wants a
      host reachable from the agent must declare a route for it
      (bare-pass route — no `auth`, no `path_allowlist` — is fine
      for hosts that just need passthrough).
    - Matching route with `path_allowlist` set and a request path that
      contains a `..` segment (literal or percent-encoded) → block.
      A raw prefix test cannot be trusted on such a path: it may start
      with an allowed prefix and still resolve outside it upstream.
    - Matching route with `path_allowlist` set, request path doesn't
      start with any of the allowed prefixes → block with a clear
      reason.
    - Matching route with an auth pair → forward + inject
      Authorization. Token comes from `environ[route.token_env]`;
      missing/empty values block (route declared auth but the secret
      isn't here — operator misconfig).
    - Otherwise → forward unchanged.

    Args:
        routes: the route table, searched in order.
        request_host: host of the request being evaluated.
        request_path: path of the request being evaluated.
        environ: mapping the auth token is looked up in.

    Returns:
        A `Decision` with `action` "forward" or "block".
    """
    route = match_route(routes, request_host)
    if route is None:
        return Decision(
            action="block",
            reason=(
                f"egress-proxy: host {request_host!r} is not in the "
                "bottle's egress_proxy.routes allowlist. Declare a "
                "route for it or remove the request."
            ),
        )

    if route.path_allowlist:
        # Fail closed on dot-dot segments before the prefix test, e.g.
        # "/repos/x/../../user" starts with "/repos/x/" but is not under it.
        if _path_has_parent_segment(request_path):
            return Decision(
                action="block",
                reason=(
                    f"egress-proxy: path {request_path!r} contains a '..' "
                    "segment and cannot be checked against "
                    f"path_allowlist for {route.host!r}"
                ),
            )
        # str.startswith accepts a tuple of candidate prefixes.
        if not request_path.startswith(tuple(route.path_allowlist)):
            return Decision(
                action="block",
                reason=(
                    f"egress-proxy: path {request_path!r} not in "
                    f"path_allowlist for {route.host!r}"
                ),
            )

    if route.auth_scheme and route.token_env:
        token = environ.get(route.token_env, "")
        if not token:
            return Decision(
                action="block",
                reason=(
                    f"egress-proxy: route for {route.host!r} declared auth "
                    f"but env var {route.token_env!r} is unset"
                ),
            )
        return Decision(
            action="forward",
            inject_authorization=f"{route.auth_scheme} {token}",
        )

    return Decision(action="forward")
# Public API of this module, in alphabetical order. `_parse_one` is a
# private helper and is deliberately not listed.
__all__ = [
    "Decision",
    "Route",
    "decide",
    "is_git_push_request",
    "load_routes",
    "match_route",
    "parse_routes",
]