DLP hot-path perf + manifest load_for_agent split #310

Merged
didericis-claude merged 2 commits from dlp-perf-manifest-cleanup into main 2026-06-26 23:03:41 -04:00
3 changed files with 91 additions and 32 deletions
+38 -10
View File
@@ -126,8 +126,30 @@ def redact_tokens(
# Known secrets detector # Known secrets detector
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Encoded-variant cache. Provisioned secrets are stable for the life of the
# proxy, but `_encoded_variants` is on the per-request hot path — it runs for
# every secret on every redaction and known-secret scan (host, path, each
# header, body). Deriving the variant set is relatively expensive (gzip +
# nine encodings), so memoize it per distinct secret. The proxy process
# already holds these values in `os.environ`, so caching them here adds no
# new exposure.
_VARIANT_CACHE: dict[str, tuple[str, ...]] = {}
def _encoded_variants(secret: str) -> list[str]: def _encoded_variants(secret: str) -> list[str]:
"""Return the secret plus common encoded variants for exfil detection.""" """Return the secret plus common encoded variants for exfil detection.
The variant set is computed once per distinct secret and cached; callers
get a fresh list so they can't mutate the shared cached tuple."""
cached = _VARIANT_CACHE.get(secret)
if cached is None:
cached = _compute_encoded_variants(secret)
_VARIANT_CACHE[secret] = cached
return list(cached)
def _compute_encoded_variants(secret: str) -> tuple[str, ...]:
"""Derive the secret plus its encoded variants (uncached)."""
seen: set[str] = {secret} seen: set[str] = {secret}
variants: list[str] = [secret] variants: list[str] = [secret]
@@ -161,7 +183,7 @@ def _encoded_variants(secret: str) -> list[str]:
# gzip + base64 (deterministic: mtime=0); recognisable by H4sI prefix # gzip + base64 (deterministic: mtime=0); recognisable by H4sI prefix
_add(base64.b64encode(gzip.compress(secret_bytes, mtime=0)).decode("ascii")) _add(base64.b64encode(gzip.compress(secret_bytes, mtime=0)).decode("ascii"))
return variants return tuple(variants)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -187,18 +209,24 @@ def _alnum_projection(text: str) -> str:
def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None: def _find_partial_window(secret_alnum: str, text_alnum: str, min_len: int) -> int | None:
"""Return the position in text_alnum where any min_len-char window of """Return the earliest position in text_alnum holding a min_len-char window
secret_alnum first appears, or None. that also appears in secret_alnum, or None.
Slides a window of width min_len across secret_alnum and searches for The secret's set of min_len-grams is small (bounded by the secret length),
each window in text_alnum. The first hit position is returned. so building it once and sweeping the text a single time is O(len(text))
rather than the O(len(secret) * len(text)) of repeated substring searches —
which matters because this runs per provisioned secret on every request
body. Coverage is unchanged: a hit still means at least min_len consecutive
alphanumeric characters of the secret leaked into the text.
""" """
if len(secret_alnum) < min_len or len(text_alnum) < min_len: if len(secret_alnum) < min_len or len(text_alnum) < min_len:
return None return None
for i in range(len(secret_alnum) - min_len + 1): secret_grams = {
window = secret_alnum[i:i + min_len] secret_alnum[i:i + min_len]
pos = text_alnum.find(window) for i in range(len(secret_alnum) - min_len + 1)
if pos >= 0: }
for pos in range(len(text_alnum) - min_len + 1):
if text_alnum[pos:pos + min_len] in secret_grams:
return pos return pos
return None return None
+42 -22
View File
@@ -213,6 +213,20 @@ def _merge_git_user(
) )
def _manifest_with_merged_git_user(
    agent: "ManifestAgent", raw_bottle: "ManifestBottle"
) -> "Manifest":
    """Assemble the one-agent / one-bottle Manifest for `agent`.

    The agent's git-gate.user is overlaid onto the bottle's via
    `_merge_git_user` (agent wins on non-empty, per-field). When the merge
    leaves the bottle's git_user unchanged the bottle object is reused as-is;
    otherwise a copy carrying the merged value is used. Shared by the eager
    and lazy load_for_agent paths.
    """
    effective_user = _merge_git_user(agent.git_user, raw_bottle.git_user)
    if effective_user == raw_bottle.git_user:
        # Nothing to overlay: keep the original bottle object.
        return Manifest(agent=agent, bottle=raw_bottle)
    return Manifest(
        agent=agent,
        bottle=replace(raw_bottle, git_user=effective_user),
    )
def _resolve_effective_bottle_eager( def _resolve_effective_bottle_eager(
agent_name: str, agent_name: str,
agent: "ManifestAgent", agent: "ManifestAgent",
@@ -468,24 +482,33 @@ class ManifestIndex:
Always raises ManifestError if the agent is unknown or invalid. Always raises ManifestError if the agent is unknown or invalid.
Backends call this at preflight inside _validate.""" Backends call this at preflight inside _validate."""
effective_bottle_names: tuple[str, ...] = bottle_names or () effective_bottle_names: tuple[str, ...] = bottle_names or ()
if self.home_md is None: if self.home_md is None:
# Eager manifest (from_json_obj): data already parsed; filter to return self._load_for_agent_eager(agent_name, effective_bottle_names)
# the one requested agent and its bottle so the returned Manifest return self._load_for_agent_lazy(agent_name, effective_bottle_names)
# always holds exactly one agent and one bottle regardless of path.
if agent_name not in self.agents:
available = ", ".join(sorted(self.agents.keys())) or "(none)"
raise ManifestError(
f"agent '{agent_name}' not defined. Available: {available}"
)
agent = self.agents[agent_name]
raw_bottle = _resolve_effective_bottle_eager(
agent_name, agent, effective_bottle_names, self.bottles
)
merged = _merge_git_user(agent.git_user, raw_bottle.git_user)
bottle = raw_bottle if merged == raw_bottle.git_user else replace(raw_bottle, git_user=merged)
return Manifest(agent=agent, bottle=bottle)
def _load_for_agent_eager(
    self, agent_name: str, bottle_names: tuple[str, ...]
) -> "Manifest":
    """Eager path (from_json_obj): the data is already parsed.

    Narrows the index down to the requested agent and its effective bottle
    so the returned Manifest holds exactly one agent and one bottle, the
    same shape the other load path produces.

    Raises ManifestError when `agent_name` is not a key of self.agents; the
    message lists the sorted known agent names, or "(none)" if there are none.
    """
    if agent_name not in self.agents:
        known_names = sorted(self.agents.keys())
        available = ", ".join(known_names) or "(none)"
        raise ManifestError(
            f"agent '{agent_name}' not defined. Available: {available}"
        )
    agent = self.agents[agent_name]
    return _manifest_with_merged_git_user(
        agent,
        _resolve_effective_bottle_eager(
            agent_name, agent, bottle_names, self.bottles
        ),
    )
def _load_for_agent_lazy(
self, agent_name: str, bottle_names: tuple[str, ...]
) -> "Manifest":
"""Lazy path (resolve/from_md_dirs): read and parse the agent file and
its bottle chain from disk for the first time here."""
assert self.home_md is not None # guaranteed by load_for_agent dispatch
from .manifest_loader import scan_agent_names from .manifest_loader import scan_agent_names
from .manifest_schema import validate_agent_frontmatter_keys from .manifest_schema import validate_agent_frontmatter_keys
from .yaml_subset import YamlSubsetError, parse_frontmatter from .yaml_subset import YamlSubsetError, parse_frontmatter
@@ -517,11 +540,10 @@ class ManifestIndex:
agent_bottle = fm.get("bottle") or "" agent_bottle = fm.get("bottle") or ""
bottles_dir = self.home_md / "bottles" bottles_dir = self.home_md / "bottles"
raw_bottle = _resolve_effective_bottle_lazy( raw_bottle = _resolve_effective_bottle_lazy(
agent_name, str(agent_bottle), effective_bottle_names, bottles_dir agent_name, str(agent_bottle), bottle_names, bottles_dir
) )
effective_bottle_name = ( effective_bottle_name = (
effective_bottle_names[-1] if effective_bottle_names bottle_names[-1] if bottle_names else str(agent_bottle)
else str(agent_bottle)
) )
# Build and validate the full ManifestAgent. # Build and validate the full ManifestAgent.
@@ -539,9 +561,7 @@ class ManifestIndex:
known = {effective_bottle_name} if effective_bottle_name else set() known = {effective_bottle_name} if effective_bottle_name else set()
agent = ManifestAgent.from_dict(agent_name, agent_dict, known) agent = ManifestAgent.from_dict(agent_name, agent_dict, known)
merged_user = _merge_git_user(agent.git_user, raw_bottle.git_user) return _manifest_with_merged_git_user(agent, raw_bottle)
bottle = raw_bottle if merged_user == raw_bottle.git_user else replace(raw_bottle, git_user=merged_user)
return Manifest(agent=agent, bottle=bottle)
def has_agent(self, name: str) -> bool:
    """Return True when `name` is a key of self.agents, else False."""
    return name in self.agents
+11
View File
@@ -281,6 +281,17 @@ class TestEncodedVariants(unittest.TestCase):
v = self._variants() v = self._variants()
self.assertEqual(len(v), len(set(v))) self.assertEqual(len(v), len(set(v)))
def test_repeated_calls_equal(self):
    # Memoization must be invisible: two back-to-back calls yield equal output.
    first_call = self._variants()
    second_call = self._variants()
    self.assertEqual(first_call, second_call)
def test_returns_fresh_list_each_call(self):
    # The cached variant set must never be handed out by reference: a caller
    # that mutates its own result must not affect what the next caller sees.
    tampered = self._variants()
    tampered.append("MUTATED")
    untouched = self._variants()
    self.assertNotIn("MUTATED", untouched)
class TestUnicodeNormalization(unittest.TestCase): class TestUnicodeNormalization(unittest.TestCase):
def test_fullwidth_chars_normalized(self): def test_fullwidth_chars_normalized(self):