Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 99ba532783 |
@@ -4,5 +4,6 @@ source = .
|
||||
|
||||
[report]
|
||||
omit =
|
||||
bot_bottle/egress_addon.py
|
||||
bot_bottle/cli/tui.py
|
||||
tests/*
|
||||
|
||||
@@ -0,0 +1,247 @@
|
||||
# PRD prd-new: Egress control plane — metering, budgets, and forced cutoff
|
||||
|
||||
- **Status:** Draft
|
||||
- **Author:** didericis
|
||||
- **Created:** 2026-06-25
|
||||
- **Issue:** #251
|
||||
|
||||
## Summary
|
||||
|
||||
Add an **out-of-band egress enforcement & observability plane**: meter every
|
||||
agent's token usage at the egress proxy, decrement budgets without the agent's
|
||||
cooperation, and forcibly cut a bottle's egress when a budget is exhausted —
|
||||
either automatically or on command from a host-level dashboard. The trigger
|
||||
(usage threshold) and the action (route-drop / freeze / kill) both live in the
|
||||
egress plane and run with no agent in the loop. This is distinct from the
|
||||
supervise sidecar (PRD 0013), which is agent-initiated and therefore cannot
|
||||
enforce a cost cutoff on a runaway agent. State (usage ledger, budgets, audit)
|
||||
moves into a host-level SQLite database behind a thin repository API, the first
|
||||
SQL store in an otherwise flat-file repo.
|
||||
|
||||
## Problem
|
||||
|
||||
bot-bottle can't currently do two things the cost-overrun case demands:
|
||||
|
||||
1. **Forced egress shutdown on limit.** When an agent crosses a token
|
||||
threshold, kill its egress automatically — no human in the loop.
|
||||
2. **Remote (host-level) management.** Drive agents from a single surface:
|
||||
see usage, cut egress, stop bottles, to prevent cost overruns.
|
||||
|
||||
The existing supervise sidecar (PRD 0013) is **entirely agent-initiated**: every
|
||||
action begins with the agent voluntarily calling an MCP tool and an operator
|
||||
approving it. A runaway or expensive agent — exactly the cost-overrun case —
|
||||
will never call `egress-block` on itself. Supervision is therefore a
|
||||
**collaborative recovery** mechanism, not an **enforcement** mechanism; making
|
||||
it mandatory (#249) would not deliver forced cost-cutoff.
|
||||
|
||||
The requirement forces a distinction the current design blurs:
|
||||
|
||||
- **Plane A — enforcement / observability (this PRD).** System → infrastructure.
|
||||
Meter usage, cut egress on threshold or command, account for cost.
|
||||
Out-of-band; independent of the agent. **Unconditional** — an enforcement
|
||||
plane you can opt out of isn't enforcement.
|
||||
- **Plane B — agent-facing recovery (the existing supervise sidecar).**
|
||||
Agent → operator, approval-gated. Useful interactively; meaningless for a
|
||||
headless agent with no operator watching its queue. Remains optional.
|
||||
|
||||
This PRD builds Plane A. It reframes the "always-on control" invariant of #249
|
||||
as "the egress control plane is always present" — a more defensible property
|
||||
than "every agent runs the agent-facing supervisor." Unsupervised
|
||||
(headless/CI/ephemeral) agents stay first-class: still subject to the mandatory
|
||||
meter + kill switch, they simply lack the agent-facing proposal tools they
|
||||
couldn't use anyway.
|
||||
|
||||
## Goals / Success Criteria
|
||||
|
||||
- The egress proxy meters every request to a metered API host (e.g.
|
||||
`api.anthropic.com`) and records authoritative token usage per bottle and per
|
||||
agent provider, with no agent cooperation.
|
||||
- A budget can be set at four scopes with deterministic precedence
|
||||
(**agent → bottle → parent bottle → global host budget**); the
|
||||
most-specific applicable budget governs.
|
||||
- When usage crosses a budget, the bottle's configured **cutoff policy**
|
||||
(`cutoff` | `freeze` | `kill`) fires automatically, executed host-side on the
|
||||
egress plane — never via the supervise queue.
|
||||
- An operator can, from a single **host-level TUI dashboard**, see live per-bottle
|
||||
usage against budget and command a cutoff/stop on demand.
|
||||
- Host budgets, default cutoff policy, and per-provider limits are declared in a
|
||||
new host-level `~/.bot-bottle/settings.yml`, parseable by `yaml_subset.py`.
|
||||
- All usage, budget state, and enforcement actions persist in a host-level
|
||||
SQLite DB behind a thin repository API, so the store can later be swapped for
|
||||
a cross-host cloud service.
|
||||
|
||||
## Non-goals
|
||||
|
||||
- **Remote control / cross-host control plane.** Web + mobile remote control,
|
||||
cross-host budgets, and the authn/transport they require are explicitly
|
||||
deferred. v1 is a **host-only TUI** with no remote surface.
|
||||
- **Dollar-denominated budgets.** Budgets are token counts keyed by agent
|
||||
provider, not currency. Price tables are out of scope.
|
||||
- **Migrating existing flat-file state into SQLite.** Resume `metadata.json`,
|
||||
transcripts, Dockerfile overrides, the supervise queue, and audit logs stay on
|
||||
the filesystem. Only the *new* metering/budget/enforcement ledger is SQL.
|
||||
- **Making the supervise sidecar (Plane B) mandatory.** Out of scope here; this
|
||||
PRD is the answer to "what should be unconditional" (Plane A), leaving #249's
|
||||
Plane-B question open.
|
||||
- **Per-request hard pre-send blocking as the primary mechanism.** The gate is
|
||||
budget-crossing detected at/after metering; a pre-flight estimator (below) is a
|
||||
refinement, not the core enforcement path.
|
||||
|
||||
## Design
|
||||
|
||||
### Two measurements: gate vs. account
|
||||
|
||||
There are two distinct needs, and they want different signals:
|
||||
|
||||
- **Account (authoritative).** Decrement the real budget from the API
|
||||
**response**, which already carries authoritative usage (Anthropic
|
||||
`input_tokens` / `output_tokens`, OpenAI `usage`). The egress addon already
|
||||
has a `response(flow)` hook (`bot_bottle/egress_addon.py:460`), so the real
|
||||
number is available with no extra network call. **Caveat:** agent traffic is
|
||||
mostly streaming SSE, so the response path must tail the stream for the final
|
||||
usage event rather than parse a single JSON body — scoped explicitly as work.
|
||||
- **Gate (estimate).** To block *before* sending, only the request is available,
|
||||
so an estimator / provider `count_tokens` endpoint is the only option.
|
||||
|
||||
Calling `count_tokens` for accounting would be both less accurate *and* an extra
|
||||
metered egress call per request, so accounting uses response `usage` and the
|
||||
estimator is reserved for the optional pre-flight gate.
|
||||
|
||||
### `count_tokens` on agent providers
|
||||
|
||||
Add an abstract `count_tokens(request) -> int` to the `AgentProvider`
|
||||
abstraction (`bot_bottle/agent_provider.py`):
|
||||
|
||||
- **Default** is a good-enough stdlib estimator. Prefer stdlib only; a small
|
||||
pip dependency *for the sidecar* is acceptable for the fallback if stdlib
|
||||
proves too inaccurate (this does not relax the package's stdlib-first stance —
|
||||
it would be a sidecar-only dep, like the bundle already carries).
|
||||
- **Built-in `claude`** uses Anthropic's token-counting endpoint;
|
||||
**built-in `codex`** uses OpenAI's. These are exact for the gate but cost a
|
||||
metered call, so they are gate-only; accounting still comes from the response.
|
||||
|
||||
### Budgets and precedence
|
||||
|
||||
Budgets are token counts keyed by **agent provider name** (the same names
|
||||
bottles already use). Four scopes, most-specific wins:
|
||||
|
||||
```
|
||||
agent → bottle → parent bottle → global (host)
|
||||
```
|
||||
|
||||
The global host budget is the highest-priority feature to ship (the cross-host
|
||||
control plane will eventually consume it); per-agent and per-bottle budgets
|
||||
override it for finer control. A budget can also be supplied **at bottle
|
||||
launch** (`--budget` or equivalent), overriding the settings.yml defaults for
|
||||
that run. Enforcement evaluates the effective budget as the
|
||||
nearest-defined scope at decrement time.
|
||||
|
||||
### `~/.bot-bottle/settings.yml`
|
||||
|
||||
New **host-level** settings file (the `~/.bot-bottle/` root, *not* the per-repo
|
||||
`.bot-bottle/` — host budgets must not be committed per-repo). Parsed by
|
||||
`yaml_subset.py`, so it must stay within that bounded subset (flat mappings,
|
||||
scalars; no anchors, no multi-line block scalars). Shape:
|
||||
|
||||
```yaml
|
||||
budget:
|
||||
claude: 5000000 # token budget keyed by agent provider
|
||||
codex: 2000000
|
||||
shutdown: cutoff # default cutoff policy: cutoff | freeze | kill
|
||||
```
|
||||
|
||||
### Forced cutoff and cutoff policy
|
||||
|
||||
On budget exhaustion (or an operator command), the configured per-bottle cutoff
|
||||
policy fires. The three policies map onto primitives that already exist:
|
||||
|
||||
- **`cutoff`** (default) — drop the bottle's `routes.yaml` to empty and reload
|
||||
(or isolate the bottle from the egress network); the agent/bottle keeps
|
||||
running but can no longer reach metered hosts. This is the route-drop already
|
||||
available on the egress plane (`bot_bottle/backend/egress_apply.py`).
|
||||
- **`freeze`** — commit/snapshot state, then kill the agent/bottle; resumable
|
||||
later via `bot_bottle/backend/freeze.py`.
|
||||
- **`kill`** — tear the bottle down without saving state (backend teardown).
|
||||
|
||||
The trigger lives in the metering path and the action in the egress/backend
|
||||
plane; **neither touches the supervise proposal queue** (design constraint from
|
||||
#251).
|
||||
|
||||
### Host-level SQLite store
|
||||
|
||||
**Decision: introduce SQLite now, narrowly.**
|
||||
|
||||
- **The dependency objection doesn't apply.** `sqlite3` is in the Python stdlib,
|
||||
so it does not break the AGENTS.md stdlib-first / no-runtime-pip stance — same
|
||||
category as the hand-rolled `yaml_subset.py`, except the stdlib already ships
|
||||
the whole engine.
|
||||
- **It fits the problem.** A *global* token budget decremented concurrently by N
|
||||
egress sidecars (today `~/.bot-bottle/` already has `state/`, `audit/`,
|
||||
`queue/` written by parallel bottles) is a read-modify-write race. Over JSON
|
||||
that means hand-rolled file locking; SQLite gives atomic transactions + WAL for
|
||||
free. The per-agent/per-bottle precedence rollup plus "sum across all bottles"
|
||||
is a `GROUP BY`, not an N-directory rescan.
|
||||
- **It rehearses the cloud swap.** "Wrap operations in an API so we can swap to a
|
||||
cloud service" maps directly onto a thin repository/DAO over SQLite → Postgres
|
||||
later. A JSON-file store is a worse rehearsal than SQL.
|
||||
|
||||
**Costs (real but bounded):** a new paradigm in a flat-file repo needs a
|
||||
`schema_version` table + idempotent startup migrations; SQLite serializes
|
||||
writers, so WAL mode + `busy_timeout` are required (a non-issue at a handful of
|
||||
bottles); test fixtures need temp DBs.
|
||||
|
||||
**Scope of the store:** one DB at `~/.bot-bottle/bot-bottle.db` behind a thin
|
||||
repository API. Only the **new** metering/budget/enforcement-audit ledger lives
|
||||
there. Existing per-bottle blobs (resume `metadata.json`, transcripts,
|
||||
Dockerfile overrides, supervise queue) stay on the filesystem — migrating them
|
||||
now is churn for no benefit and they lack the concurrency/aggregation problem.
|
||||
|
||||
### Host-level controller + dashboard
|
||||
|
||||
A single **host-level controller** owns the meter, budget evaluation, and the
|
||||
cutoff actions across all bottles (cf. `bot_bottle/cli/supervise.py`'s
|
||||
cross-bottle view), rather than a per-bottle daemon. v1 ships one host-level
|
||||
**TUI dashboard** that reads live usage-vs-budget from the SQLite store and
|
||||
offers on-demand cutoff/stop. The existing supervisor UI should eventually fold
|
||||
into this same dashboard; this PRD lays the host-level surface it will move to.
|
||||
|
||||
## Implementation chunks
|
||||
|
||||
Ordered, individually mergeable:
|
||||
|
||||
1. **SQLite repository foundation.** `~/.bot-bottle/bot-bottle.db`, schema +
|
||||
`schema_version` migrations, WAL + `busy_timeout`, thin repository API,
|
||||
temp-DB test fixtures. No behavior wired yet.
|
||||
2. **Metering at the egress proxy.** Parse authoritative response `usage`
|
||||
(including SSE final-usage tailing) in the egress addon `response` hook;
|
||||
write per-bottle / per-provider usage rows to the ledger.
|
||||
3. **`settings.yml` + budget model.** Host-level `~/.bot-bottle/settings.yml`
|
||||
parsed by `yaml_subset.py`; budget precedence (agent → bottle → parent →
|
||||
global) and the `--budget` launch flag.
|
||||
4. **Forced cutoff + cutoff policy.** Wire the threshold trigger to the
|
||||
`cutoff` / `freeze` / `kill` primitives on the egress/backend plane; record
|
||||
enforcement actions to the audit ledger.
|
||||
5. **Host-level TUI dashboard.** Live usage-vs-budget view + on-demand
|
||||
cutoff/stop, reading the store.
|
||||
6. **`count_tokens` pre-flight gate (optional refinement).** Abstract method +
|
||||
stdlib estimator default; Anthropic/OpenAI endpoints for built-in
|
||||
claude/codex; optional pre-send block.
|
||||
|
||||
## Open questions
|
||||
|
||||
- **SSE usage tailing robustness.** Buffering streamed responses to extract the
|
||||
final usage event without breaking the agent's own stream consumption — how
|
||||
much of the body must the addon hold, and what's the failure mode if the
|
||||
stream is interrupted mid-flight?
|
||||
- **Crossing mid-request.** A single response can push usage past budget only
|
||||
*after* it's already been delivered. Is post-hoc cutoff (next request blocked)
|
||||
sufficient, or is a pre-flight estimator gate (chunk 6) required for v1?
|
||||
- **Provider name ↔ metered host mapping.** How does the proxy attribute a
|
||||
flow to an agent-provider budget key — by destination host, by bottle
|
||||
identity, or both?
|
||||
- **Parent-bottle budget semantics.** For `bottle extends` (PRD 0025 / 0065)
|
||||
chains, does "parent bottle" mean the manifest parent, the launching bottle,
|
||||
or the full ancestry summed?
|
||||
- **Dashboard ↔ controller transport (even host-only).** In-process, a local
|
||||
socket, or polling the SQLite store directly? Picks the seam the future remote
|
||||
control plane will extend.
|
||||
@@ -1,525 +0,0 @@
|
||||
"""Unit: EgressAddon request/response decision flow (issue #286).
|
||||
|
||||
`egress_addon.py` is the sidecar-only mitmproxy adapter that wires the
|
||||
host-importable decision logic in `egress_addon_core` into mitmproxy's
|
||||
request/response hooks. The core logic is exercised directly by
|
||||
`test_egress_addon_core.py`; the redaction logging by
|
||||
`test_egress_addon_log_redaction.py`. This file covers the adapter glue
|
||||
itself — `request()`, `response()`, `websocket_message()`, introspection,
|
||||
auth injection, git push/fetch blocking and the outbound-DLP policy
|
||||
branches — so `bot_bottle/egress_addon.py` no longer has to be omitted
|
||||
from coverage.
|
||||
|
||||
mitmproxy is not installed on the host, so we pre-populate `sys.modules`
|
||||
with the minimum stubs needed to import the adapter (a `mitmproxy.http`
|
||||
module exposing a `Response` with `.make`, plus the flat
|
||||
`egress_addon_core` name the sidecar uses)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
import types
|
||||
import unittest
|
||||
from io import StringIO
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Stub flow objects (mirror the slice of mitmproxy's API the adapter uses)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _Headers:
|
||||
"""Case-insensitive header map covering the subset of mitmproxy's
|
||||
Headers API the adapter touches: items/get/pop/__setitem__/dict()."""
|
||||
|
||||
def __init__(self, d: dict[str, str] | None = None) -> None:
|
||||
self._d: dict[str, str] = dict(d or {})
|
||||
|
||||
def _find(self, key: str) -> str | None:
|
||||
return next((k for k in self._d if k.lower() == key.lower()), None)
|
||||
|
||||
def items(self) -> list[tuple[str, str]]:
|
||||
return list(self._d.items())
|
||||
|
||||
def keys(self) -> list[str]:
|
||||
return list(self._d.keys())
|
||||
|
||||
def __iter__(self) -> Any:
|
||||
return iter(self._d)
|
||||
|
||||
def __getitem__(self, key: str) -> str:
|
||||
k = self._find(key)
|
||||
if k is None:
|
||||
raise KeyError(key)
|
||||
return self._d[k]
|
||||
|
||||
def __setitem__(self, key: str, value: str) -> None:
|
||||
self._d[self._find(key) or key] = value
|
||||
|
||||
def __contains__(self, key: str) -> bool:
|
||||
return self._find(key) is not None
|
||||
|
||||
def get(self, key: str, default: str | None = None) -> str | None:
|
||||
k = self._find(key)
|
||||
return self._d[k] if k is not None else default
|
||||
|
||||
def pop(self, key: str, default: str | None = None) -> str | None:
|
||||
k = self._find(key)
|
||||
return self._d.pop(k) if k is not None else default
|
||||
|
||||
|
||||
class _Response:
|
||||
def __init__(
|
||||
self,
|
||||
status_code: int = 200,
|
||||
headers: dict[str, str] | None = None,
|
||||
content: bytes | str = b"",
|
||||
) -> None:
|
||||
self.status_code = status_code
|
||||
self.headers = _Headers(headers)
|
||||
self._body = (
|
||||
content if isinstance(content, str)
|
||||
else content.decode("utf-8", "replace")
|
||||
)
|
||||
|
||||
def get_text(self, *, strict: bool = True) -> str:
|
||||
del strict
|
||||
return self._body
|
||||
|
||||
@classmethod
|
||||
def make(
|
||||
cls,
|
||||
status_code: int = 200,
|
||||
content: bytes | str = b"",
|
||||
headers: dict[str, str] | None = None,
|
||||
) -> "_Response":
|
||||
return cls(status_code, headers, content)
|
||||
|
||||
|
||||
class _Request:
|
||||
def __init__(
|
||||
self,
|
||||
host: str = "api.example.com",
|
||||
method: str = "GET",
|
||||
path: str = "/v1/messages",
|
||||
headers: dict[str, str] | None = None,
|
||||
body: str = "",
|
||||
) -> None:
|
||||
self.pretty_host = host
|
||||
self.method = method
|
||||
self.path = path
|
||||
self.headers = _Headers(headers)
|
||||
self._body = body
|
||||
|
||||
def get_text(self, *, strict: bool = True) -> str:
|
||||
del strict
|
||||
return self._body
|
||||
|
||||
@property
|
||||
def text(self) -> str:
|
||||
return self._body
|
||||
|
||||
@text.setter
|
||||
def text(self, value: str) -> None:
|
||||
self._body = value
|
||||
|
||||
|
||||
class _Flow:
|
||||
def __init__(
|
||||
self,
|
||||
request: _Request | None = None,
|
||||
response: _Response | None = None,
|
||||
) -> None:
|
||||
self.request = request or _Request()
|
||||
self.response = response
|
||||
self.websocket: Any = None
|
||||
self.killed = False
|
||||
|
||||
def kill(self) -> None:
|
||||
self.killed = True
|
||||
|
||||
|
||||
class _Message:
|
||||
def __init__(self, content: bytes, from_client: bool) -> None:
|
||||
self.content = content
|
||||
self.from_client = from_client
|
||||
|
||||
|
||||
class _WebSocketData:
|
||||
def __init__(self, messages: list[_Message]) -> None:
|
||||
self.messages = messages
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sidecar-import shims — must run before importing egress_addon
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _ensure_shims() -> None:
|
||||
mm = sys.modules.get("mitmproxy")
|
||||
if mm is None:
|
||||
mm = types.ModuleType("mitmproxy")
|
||||
sys.modules["mitmproxy"] = mm
|
||||
mh = sys.modules.get("mitmproxy.http")
|
||||
if mh is None:
|
||||
mh = types.ModuleType("mitmproxy.http")
|
||||
sys.modules["mitmproxy.http"] = mh
|
||||
setattr(mm, "http", mh)
|
||||
# Other egress_addon tests may have registered an empty mitmproxy.http;
|
||||
# make sure the Response/HTTPFlow attrs the request flow needs exist.
|
||||
if not hasattr(mh, "Response"):
|
||||
setattr(mh, "Response", _Response)
|
||||
if not hasattr(mh, "HTTPFlow"):
|
||||
setattr(mh, "HTTPFlow", object)
|
||||
if "egress_addon_core" not in sys.modules:
|
||||
import bot_bottle.egress_addon_core as _core
|
||||
sys.modules["egress_addon_core"] = _core
|
||||
|
||||
|
||||
_ensure_shims()
|
||||
|
||||
import bot_bottle.egress_addon as _ea_mod # noqa: E402 (after shims)
|
||||
from bot_bottle.egress_addon import EgressAddon # noqa: E402 (after shims)
|
||||
from bot_bottle.egress_addon_core import ( # noqa: E402
|
||||
Config,
|
||||
LOG_BLOCKS,
|
||||
Route,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
_OPENAI_KEY = "sk-" + "A" * 48
|
||||
|
||||
|
||||
def _addon(config: Config) -> EgressAddon:
|
||||
"""Bare EgressAddon with a supplied config and no supervise wiring."""
|
||||
a: EgressAddon = EgressAddon.__new__(EgressAddon)
|
||||
a.config = config
|
||||
a.safe_tokens = set()
|
||||
a._supervise_queue_dir = ""
|
||||
a._supervise_slug = ""
|
||||
a._token_allow_timeout = 300.0
|
||||
a.routes_path = "/nonexistent/routes.yaml"
|
||||
return a
|
||||
|
||||
|
||||
def _run_request(addon: EgressAddon, flow: _Flow) -> None:
|
||||
asyncio.run(addon.request(flow)) # type: ignore[arg-type]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Introspection endpoint
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestIntrospection(unittest.TestCase):
|
||||
def test_allowlist_endpoint_lists_routes(self) -> None:
|
||||
addon = _addon(Config(routes=(Route(host="api.example.com"),)))
|
||||
flow = _Flow(_Request(host="_egress.local", path="/allowlist"))
|
||||
_run_request(addon, flow)
|
||||
assert flow.response is not None
|
||||
self.assertEqual(200, flow.response.status_code)
|
||||
payload = json.loads(flow.response.get_text())
|
||||
self.assertEqual(["api.example.com"], [r["host"] for r in payload["routes"]])
|
||||
|
||||
def test_unknown_endpoint_404(self) -> None:
|
||||
addon = _addon(Config(routes=()))
|
||||
flow = _Flow(_Request(host="_egress.local", path="/nope"))
|
||||
_run_request(addon, flow)
|
||||
assert flow.response is not None
|
||||
self.assertEqual(404, flow.response.status_code)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Allowlist enforcement
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAllowlist(unittest.TestCase):
|
||||
def test_unlisted_host_blocked_403(self) -> None:
|
||||
addon = _addon(Config(routes=(Route(host="allowed.example.com"),)))
|
||||
flow = _Flow(_Request(host="evil.example.com"))
|
||||
_run_request(addon, flow)
|
||||
assert flow.response is not None
|
||||
self.assertEqual(403, flow.response.status_code)
|
||||
self.assertIn("allowlist", flow.response.get_text())
|
||||
|
||||
def test_listed_host_forwarded_no_response_written(self) -> None:
|
||||
addon = _addon(Config(routes=(Route(host="api.example.com"),)))
|
||||
flow = _Flow(_Request(host="api.example.com"))
|
||||
_run_request(addon, flow)
|
||||
# forward == adapter leaves flow.response untouched for the upstream
|
||||
self.assertIsNone(flow.response)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Authorization stripping + injection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAuthInjection(unittest.TestCase):
|
||||
def test_agent_authorization_stripped_and_real_token_injected(self) -> None:
|
||||
route = Route(host="api.example.com", auth_scheme="Bearer", token_env="EGRESS_TOKEN_0")
|
||||
addon = _addon(Config(routes=(route,)))
|
||||
flow = _Flow(_Request(host="api.example.com", headers={"authorization": "Bearer agent-faked"}))
|
||||
with patch.dict("os.environ", {"EGRESS_TOKEN_0": "real-sidecar-token"}):
|
||||
_run_request(addon, flow)
|
||||
self.assertEqual("Bearer real-sidecar-token", flow.request.headers.get("authorization"))
|
||||
self.assertIsNone(flow.response)
|
||||
|
||||
def test_auth_route_with_unset_env_blocks(self) -> None:
|
||||
route = Route(
|
||||
host="api.example.com", auth_scheme="Bearer", token_env="EGRESS_TOKEN_MISSING",
|
||||
)
|
||||
addon = _addon(Config(routes=(route,)))
|
||||
flow = _Flow(_Request(host="api.example.com"))
|
||||
with patch.dict("os.environ", {}, clear=False):
|
||||
import os
|
||||
os.environ.pop("EGRESS_TOKEN_MISSING", None)
|
||||
_run_request(addon, flow)
|
||||
assert flow.response is not None
|
||||
self.assertEqual(403, flow.response.status_code)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# git push / fetch over HTTPS
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGitOverHttps(unittest.TestCase):
|
||||
def test_git_push_blocked(self) -> None:
|
||||
addon = _addon(Config(routes=(Route(host="git.example.com"),)))
|
||||
flow = _Flow(_Request(
|
||||
host="git.example.com",
|
||||
method="POST",
|
||||
path="/repo.git/git-receive-pack",
|
||||
))
|
||||
_run_request(addon, flow)
|
||||
assert flow.response is not None
|
||||
self.assertEqual(403, flow.response.status_code)
|
||||
self.assertIn("git push over HTTPS", flow.response.get_text())
|
||||
|
||||
def test_git_fetch_blocked_on_non_fetch_route(self) -> None:
|
||||
addon = _addon(Config(routes=(Route(host="git.example.com"),)))
|
||||
flow = _Flow(_Request(
|
||||
host="git.example.com",
|
||||
path="/repo.git/info/refs",
|
||||
))
|
||||
flow.request.path = "/repo.git/info/refs?service=git-upload-pack"
|
||||
_run_request(addon, flow)
|
||||
assert flow.response is not None
|
||||
self.assertEqual(403, flow.response.status_code)
|
||||
|
||||
def test_git_fetch_allowed_on_fetch_route(self) -> None:
|
||||
addon = _addon(Config(routes=(Route(host="git.example.com", git_fetch=True),)))
|
||||
flow = _Flow(_Request(
|
||||
host="git.example.com",
|
||||
path="/repo.git/info/refs?service=git-upload-pack",
|
||||
))
|
||||
_run_request(addon, flow)
|
||||
self.assertIsNone(flow.response)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Outbound DLP policy branches
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestOutboundDlpPolicy(unittest.TestCase):
|
||||
def test_block_policy_hard_403(self) -> None:
|
||||
route = Route(host="api.example.com", outbound_on_match="block")
|
||||
addon = _addon(Config(routes=(route,)))
|
||||
flow = _Flow(_Request(host="api.example.com", method="POST", body=f"key={_OPENAI_KEY}"))
|
||||
_run_request(addon, flow)
|
||||
assert flow.response is not None
|
||||
self.assertEqual(403, flow.response.status_code)
|
||||
self.assertIn("DLP", flow.response.get_text())
|
||||
|
||||
def test_redact_policy_scrubs_and_forwards(self) -> None:
|
||||
route = Route(host="api.example.com", outbound_on_match="redact")
|
||||
addon = _addon(Config(routes=(route,)))
|
||||
flow = _Flow(_Request(host="api.example.com", method="POST", body=f"key={_OPENAI_KEY}"))
|
||||
_run_request(addon, flow)
|
||||
self.assertIsNone(flow.response) # forwarded
|
||||
self.assertNotIn(_OPENAI_KEY, flow.request.get_text())
|
||||
|
||||
def test_supervise_default_without_wiring_blocks(self) -> None:
|
||||
# outbound_on_match unset -> supervise default; no supervise queue wired
|
||||
# -> fail closed with a hard 403.
|
||||
route = Route(host="api.example.com")
|
||||
addon = _addon(Config(routes=(route,)))
|
||||
flow = _Flow(_Request(host="api.example.com", method="POST", body=f"key={_OPENAI_KEY}"))
|
||||
_run_request(addon, flow)
|
||||
assert flow.response is not None
|
||||
self.assertEqual(403, flow.response.status_code)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Outbound DLP supervise branch (operator approval round-trip)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _fake_sv(response_status: str | None) -> types.SimpleNamespace:
|
||||
"""Stand-in for the `supervise` module the adapter queues proposals to.
|
||||
|
||||
`response_status` of None models a timeout (read_response never returns a
|
||||
decision); a status string models the operator's eventual answer."""
|
||||
def _new_proposal(**_kw: Any) -> Any:
|
||||
return types.SimpleNamespace(id="prop-1")
|
||||
|
||||
def _sha256_hex(_payload: Any) -> str:
|
||||
return "hash"
|
||||
|
||||
def _noop(_a: Any, _b: Any) -> None:
|
||||
return None
|
||||
|
||||
def _read_response(_qd: Any, _pid: Any) -> Any:
|
||||
if response_status is None:
|
||||
raise OSError("not written yet") # forces poll -> timeout
|
||||
return types.SimpleNamespace(status=response_status)
|
||||
|
||||
ns = types.SimpleNamespace()
|
||||
ns.STATUS_APPROVED = "approved"
|
||||
ns.STATUS_MODIFIED = "modified"
|
||||
ns.TOOL_EGRESS_TOKEN_ALLOW = "egress_token_allow"
|
||||
ns.Proposal = types.SimpleNamespace(new=_new_proposal)
|
||||
ns.sha256_hex = _sha256_hex
|
||||
ns.write_proposal = _noop
|
||||
ns.archive_proposal = _noop
|
||||
ns.read_response = _read_response
|
||||
return ns
|
||||
|
||||
|
||||
class TestSuperviseBranch(unittest.TestCase):
|
||||
def _supervised_addon(self) -> EgressAddon:
|
||||
addon = _addon(Config(routes=(Route(host="api.example.com"),)))
|
||||
addon._supervise_queue_dir = "/tmp/egress-queue"
|
||||
addon._supervise_slug = "test-bottle"
|
||||
addon._token_allow_timeout = 0.05
|
||||
return addon
|
||||
|
||||
def test_operator_approval_allows_token_and_forwards(self) -> None:
|
||||
addon = self._supervised_addon()
|
||||
flow = _Flow(_Request(host="api.example.com", method="POST", body=f"k={_OPENAI_KEY}"))
|
||||
with patch.object(_ea_mod, "_sv", _fake_sv("approved")):
|
||||
_run_request(addon, flow)
|
||||
self.assertIsNone(flow.response) # forwarded after approval
|
||||
self.assertIn(_OPENAI_KEY, addon.safe_tokens)
|
||||
|
||||
def test_operator_rejection_blocks(self) -> None:
|
||||
addon = self._supervised_addon()
|
||||
flow = _Flow(_Request(host="api.example.com", method="POST", body=f"k={_OPENAI_KEY}"))
|
||||
with patch.object(_ea_mod, "_sv", _fake_sv("rejected")):
|
||||
_run_request(addon, flow)
|
||||
assert flow.response is not None
|
||||
self.assertEqual(403, flow.response.status_code)
|
||||
self.assertIn("rejected", flow.response.get_text())
|
||||
|
||||
def test_supervise_timeout_blocks(self) -> None:
|
||||
addon = self._supervised_addon()
|
||||
flow = _Flow(_Request(host="api.example.com", method="POST", body=f"k={_OPENAI_KEY}"))
|
||||
with patch.object(_ea_mod, "_sv", _fake_sv(None)):
|
||||
_run_request(addon, flow)
|
||||
assert flow.response is not None
|
||||
self.assertEqual(403, flow.response.status_code)
|
||||
self.assertIn("timed out", flow.response.get_text())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Inbound DLP on responses
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestInboundResponseScan(unittest.TestCase):
|
||||
def test_clean_response_untouched(self) -> None:
|
||||
route = Route(host="api.example.com")
|
||||
addon = _addon(Config(routes=(route,)))
|
||||
flow = _Flow(
|
||||
_Request(host="api.example.com"),
|
||||
_Response(200, content='{"ok": true}'),
|
||||
)
|
||||
addon.response(flow) # type: ignore[arg-type]
|
||||
assert flow.response is not None
|
||||
self.assertEqual(200, flow.response.status_code)
|
||||
|
||||
def test_response_for_unlisted_host_is_noop(self) -> None:
|
||||
addon = _addon(Config(routes=()))
|
||||
flow = _Flow(_Request(host="api.example.com"), _Response(200, content="x"))
|
||||
addon.response(flow) # type: ignore[arg-type]
|
||||
assert flow.response is not None
|
||||
self.assertEqual(200, flow.response.status_code)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WebSocket frame scanning
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestWebSocket(unittest.TestCase):
|
||||
def test_outbound_frame_with_token_kills_connection(self) -> None:
|
||||
route = Route(host="api.example.com")
|
||||
addon = _addon(Config(routes=(route,)))
|
||||
flow = _Flow(_Request(host="api.example.com"))
|
||||
flow.websocket = _WebSocketData([_Message(f"k={_OPENAI_KEY}".encode(), from_client=True)])
|
||||
addon.websocket_message(flow) # type: ignore[arg-type]
|
||||
self.assertTrue(flow.killed)
|
||||
|
||||
def test_clean_outbound_frame_passes(self) -> None:
|
||||
route = Route(host="api.example.com")
|
||||
addon = _addon(Config(routes=(route,)))
|
||||
flow = _Flow(_Request(host="api.example.com"))
|
||||
flow.websocket = _WebSocketData([_Message(b"hello world", from_client=True)])
|
||||
addon.websocket_message(flow) # type: ignore[arg-type]
|
||||
self.assertFalse(flow.killed)
|
||||
|
||||
def test_unlisted_host_websocket_is_noop(self) -> None:
|
||||
addon = _addon(Config(routes=()))
|
||||
flow = _Flow(_Request(host="api.example.com"))
|
||||
flow.websocket = _WebSocketData([_Message(f"k={_OPENAI_KEY}".encode(), from_client=True)])
|
||||
addon.websocket_message(flow) # type: ignore[arg-type]
|
||||
self.assertFalse(flow.killed)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _block logging + config reload via the real file path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBlockLoggingAndReload(unittest.TestCase):
|
||||
def test_block_emits_json_log_when_enabled(self) -> None:
|
||||
addon = _addon(Config(routes=(Route(host="allowed.example.com"),), log=LOG_BLOCKS))
|
||||
flow = _Flow(_Request(host="evil.example.com"))
|
||||
buf = StringIO()
|
||||
with patch("sys.stderr", buf):
|
||||
_run_request(addon, flow)
|
||||
logged = [json.loads(line) for line in buf.getvalue().splitlines() if line.strip()]
|
||||
self.assertTrue(any(e.get("event") == "egress_block" for e in logged))
|
||||
|
||||
def test_init_loads_routes_from_file(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
routes = Path(d) / "routes.yaml"
|
||||
routes.write_text("routes:\n - host: api.example.com\n", encoding="utf-8")
|
||||
with patch.dict("os.environ", {"EGRESS_ROUTES": str(routes)}):
|
||||
addon = EgressAddon()
|
||||
self.assertEqual(("api.example.com",), tuple(r.host for r in addon.config.routes))
|
||||
|
||||
def test_init_missing_routes_file_is_empty_config(self) -> None:
|
||||
with patch.dict("os.environ", {"EGRESS_ROUTES": "/no/such/routes.yaml"}):
|
||||
buf = StringIO()
|
||||
with patch("sys.stderr", buf):
|
||||
addon = EgressAddon()
|
||||
self.assertEqual((), addon.config.routes)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user