Compare commits

..

2 Commits

Author SHA1 Message Date
didericis-claude df469b2f47 docs: add role and git.fetch to egress route fields table
Both fields were missing from the reference table added in the preceding
commit — `role` is visible in examples/bottles/claude.md and `git.fetch`
is documented in PRD 0052 but neither appeared in the README table.
2026-06-22 18:31:32 +00:00
didericis d1d9e7a105 docs: document egress matches, dlp fields, and detector defaults
lint / lint (push) Successful in 1m32s
2026-06-19 21:58:20 -04:00
25 changed files with 187 additions and 682 deletions
+26 -2
View File
@@ -14,7 +14,7 @@
## Features
- **Per-bottle egress allowlist** — TLS-bumped HTTP/HTTPS chokepoint with a per-manifest host allowlist and request-body DLP scanner; DoH and arbitrary hosts blocked by default.
- **Per-bottle egress allowlist** — TLS-bumped HTTP/HTTPS chokepoint with a per-manifest host allowlist; per-route path/method/header `matches` filtering; outbound DLP scanning for known tokens and secrets, inbound DLP scanning for prompt-injection attempts; DoH and arbitrary hosts blocked by default.
- **Tokens the agent never sees** — host secrets live in a sidecar; the agent dials `http://sidecar:9099/<path>` and the proxy strips inbound `Authorization` and injects the real token before forwarding. `printenv` in the agent shows proxy URLs only.
- **Gitleaks-scanned push (git-gate)** — `bottle.git` remotes route through a per-bottle `git daemon` that gitleaks-scans incoming refs pre-receive and forwards clean refs upstream over SSH. The agent never holds the upstream credential.
- **Manifest-scoped skills + secrets** — each bottle declares its skills, env, git identity, remotes, and egress routes; unknown keys die at load.
@@ -106,8 +106,15 @@ egress:
routes:
- host: gitea.dideric.is
auth:
scheme: token
scheme: token # Bearer | token
token_ref: BOT_BOTTLE_GITEA_TOKEN
matches: # optional — restrict to specific paths/methods/headers
- paths:
- {type: prefix, value: /api/v1/}
methods: [GET, POST, PATCH, DELETE]
dlp: # optional — per-route detector overrides (default: all on)
outbound_detectors: [token_patterns, known_secrets]
inbound_detectors: false # disable response scanning for this host
---
The `gitea-dev` bottle. Provider auth via the inherited Claude route;
@@ -126,6 +133,23 @@ skills:
You help maintain Gitea-hosted projects.
````
**Egress route fields:**
| Field | Required | Description |
|---|---|---|
| `host` | yes | Hostname to allowlist. One entry per host. |
| `role` | no | Provider-specific role string (e.g. `claude_code_oauth`). Wires built-in auth flows; set by provider templates, not manually. |
| `auth.scheme` | when `auth` present | `Bearer` or `token`. Injected by the proxy; the agent never sees the value. |
| `auth.token_ref` | when `auth` present | Env-var name holding the secret on the host. |
| `matches` | no | Array of `{paths, methods, headers}` filters. A request must match at least one entry (if any are given) to be forwarded. |
| `matches[].paths` | no | Array of `{type, value}`. `type` is `prefix` (default), `exact`, or `regex`. |
| `matches[].methods` | no | Array of HTTP method strings, e.g. `[GET, POST]`. |
| `matches[].headers` | no | Array of `{name, value, type}`. `type` is `exact` (default) or `regex`. |
| `dlp` | no | Per-route DLP overrides. Omit to use defaults (all detectors on). |
| `dlp.outbound_detectors` | no | `false` disables outbound scanning; list restricts to named detectors (`token_patterns`, `known_secrets`). |
| `dlp.inbound_detectors` | no | `false` disables inbound scanning; list restricts to named detectors (`naive_injection_detection`). |
| `git.fetch` | no | `true` permits smart HTTP clone/fetch (`git-upload-pack`) for this host. Push (`git-receive-pack`) remains blocked. |
More examples in `examples/`. Full design lives under `docs/prds/`; the trust-boundary rationale is in `docs/prds/0011-per-file-md-manifest.md`.
## Trademarks
+17 -5
View File
@@ -45,7 +45,7 @@ from ..agent_provider import AgentProvisionPlan, get_provider, build_agent_provi
from ..egress import EgressPlan
from ..git_gate import GitGatePlan
from ..log import die, info
from ..manifest import Manifest
from ..manifest import ManifestGitEntry, Manifest
from ..supervise import SupervisePlan
from ..util import expand_tilde
from ..env import resolve_env, ResolvedEnv
@@ -356,14 +356,16 @@ class BottleBackend(ABC, Generic[PlanT, CleanupT]):
pass
def _validate(self, spec: BottleSpec) -> None:
"""Cross-backend pre-launch checks. Confirms the agent exists
and the named skills are present on the host. Subclasses with
additional preconditions should override and call
`super()._validate(spec)` first."""
"""Cross-backend pre-launch checks. Confirms the agent exists,
the named skills are present on the host, and every git
IdentityFile resolves. Subclasses with additional preconditions
should override and call `super()._validate(spec)` first."""
manifest = spec.manifest
manifest.require_agent(spec.agent_name)
agent = manifest.agents[spec.agent_name]
bottle = manifest.bottle_for(spec.agent_name)
self._validate_skills(agent.skills)
self._validate_git_entries(bottle.git)
self._validate_agent_provider_dockerfile(spec)
def _validate_skills(self, skills: Sequence[str]) -> None:
@@ -378,6 +380,16 @@ class BottleBackend(ABC, Generic[PlanT, CleanupT]):
f"Create it under ~/.claude/skills/, then re-run."
)
def _validate_git_entries(self, entries: Sequence[ManifestGitEntry]) -> None:
"""Each entry's IdentityFile must exist on the host (after
expanding leading ~) — the git-gate copies it in at start time
to authenticate the upstream push (PRD 0008). Shape is already
enforced by Manifest validation; this only checks presence."""
for entry in entries:
key = expand_tilde(entry.IdentityFile)
if not os.path.isfile(key):
die(f"git upstream key file not found for '{entry.Name}': {key}")
def _validate_agent_provider_dockerfile(self, spec: BottleSpec) -> None:
bottle = spec.manifest.bottle_for(spec.agent_name)
dockerfile = bottle.agent_provider.dockerfile
+2 -12
View File
@@ -33,18 +33,8 @@ from . import BottleSpec
def mint_slug(spec: BottleSpec) -> str:
"""Return the bottle identity: the recorded identity for a resume,
or a freshly minted one for a new start.
When a label is provided it becomes the full slug (no random suffix),
so two launches with the same label collide by design. When no label
is given the identity is minted with a random suffix to avoid
collisions between anonymous launches of the same agent."""
if spec.identity:
return spec.identity
if spec.label:
from .docker import util as docker_mod
return docker_mod.slugify(spec.label)
return bottle_identity(spec.agent_name)
or a freshly minted one for a new start."""
return spec.identity or bottle_identity(spec.agent_name)
def write_launch_metadata(
+16 -5
View File
@@ -12,11 +12,22 @@ import shlex
# uses true/24-bit colors for its own chrome, which would otherwise bypass
# the palette entirely.
_COLORS: dict[str, tuple[int, str, int, str, str]] = {
"red": (9, "#e74c3c", 1, "#c0392b", "#200808"),
"green": (10, "#2ecc71", 2, "#27ae60", "#082008"),
"yellow": (11, "#f1c40f", 3, "#d4ac0d", "#201808"),
"blue": (12, "#3498db", 4, "#2471a3", "#080820"),
"magenta": (13, "#9b59b6", 5, "#7d3c98", "#160820"),
"black": (0, "#2d2d2d", 8, "#5c5c5c", "#0a0a0a"),
"red": (1, "#c0392b", 9, "#e74c3c", "#1a0707"),
"green": (2, "#27ae60", 10, "#2ecc71", "#071a09"),
"yellow": (3, "#d4ac0d", 11, "#f1c40f", "#1a1507"),
"blue": (4, "#2471a3", 12, "#3498db", "#07071a"),
"magenta": (5, "#7d3c98", 13, "#9b59b6", "#12071a"),
"cyan": (6, "#148f77", 14, "#1abc9c", "#071a1a"),
"white": (7, "#bdc3c7", 15, "#ecf0f1", "#111111"),
"bright-black": (8, "#5c5c5c", 0, "#2d2d2d", "#111111"),
"bright-red": (9, "#e74c3c", 1, "#c0392b", "#200808"),
"bright-green": (10, "#2ecc71", 2, "#27ae60", "#082008"),
"bright-yellow": (11, "#f1c40f", 3, "#d4ac0d", "#201808"),
"bright-blue": (12, "#3498db", 4, "#2471a3", "#080820"),
"bright-magenta": (13, "#9b59b6", 5, "#7d3c98", "#160820"),
"bright-cyan": (14, "#1abc9c", 6, "#148f77", "#082020"),
"bright-white": (15, "#ecf0f1", 7, "#bdc3c7", "#151515"),
}
# OSC 104 resets all indexed palette entries; OSC 111 resets default background.
+16 -5
View File
@@ -11,11 +11,22 @@ from ..manifest import Manifest
from ._common import PROG, USER_CWD
_ANSI_COLOR_CODES: dict[str, str] = {
"red": "\033[91m",
"green": "\033[92m",
"yellow": "\033[93m",
"blue": "\033[94m",
"magenta": "\033[95m",
"black": "\033[30m",
"red": "\033[31m",
"green": "\033[32m",
"yellow": "\033[33m",
"blue": "\033[34m",
"magenta": "\033[35m",
"cyan": "\033[36m",
"white": "\033[37m",
"bright-black": "\033[90m",
"bright-red": "\033[91m",
"bright-green": "\033[92m",
"bright-yellow": "\033[93m",
"bright-blue": "\033[94m",
"bright-magenta": "\033[95m",
"bright-cyan": "\033[96m",
"bright-white": "\033[97m",
}
_ANSI_RESET = "\033[0m"
-18
View File
@@ -20,11 +20,9 @@ from ..agent_provider import runtime_for
from ..backend import (
Bottle,
BottleSpec,
enumerate_active_agents,
get_bottle_backend,
known_backend_names,
)
from ..backend.docker import util as docker_mod
from ..backend.docker.bottle_plan import DockerBottlePlan
from ..bottle_state import (
cleanup_state,
@@ -76,7 +74,6 @@ def cmd_start(argv: list[str]) -> int:
backend_name: str | None = args.backend
label, color = tui.name_color_modal(default_label=agent_name)
label, color = _resolve_unique_label(label, color)
spec = BottleSpec(
manifest=manifest,
@@ -194,21 +191,6 @@ def _identity_from_plan(plan: object) -> str:
return getattr(plan, "slug", "")
def _resolve_unique_label(label: str, color: str) -> tuple[str, str]:
"""Re-prompt with a disclaimer until the label's slug is not already
in use among running bottles. Passes through unchanged when no
collision is found on the first check."""
while True:
slug_candidate = docker_mod.slugify(label)
active_slugs = {a.slug for a in enumerate_active_agents()}
if slug_candidate not in active_slugs:
return label, color
label, color = tui.name_color_modal(
default_label=label,
disclaimer=f'"{label}" is already in use',
)
def _text_prompt_yes() -> bool:
"""Default `prompt_yes` for CLI use: reads y/N from the
controlling tty via stderr prompt + tty-line read."""
+7 -36
View File
@@ -40,7 +40,6 @@ from ..supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_GITLEAKS_ALLOW,
archive_proposal,
list_pending_proposals,
render_diff,
@@ -116,8 +115,6 @@ def _detail_lines(
def _suffix_for_tool(tool: str) -> str:
if tool == TOOL_CAPABILITY_BLOCK:
return ".dockerfile"
if tool == TOOL_GITLEAKS_ALLOW:
return ".txt"
return ".txt"
@@ -157,7 +154,7 @@ def approve(
qp, action=status, notes=notes,
diff_before=diff_before, diff_after=diff_after,
)
if qp.proposal.tool in (TOOL_CAPABILITY_BLOCK, TOOL_GITLEAKS_ALLOW):
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
archive_proposal(qp.queue_dir, qp.proposal.id)
@@ -173,23 +170,6 @@ def reject(qp: QueuedProposal, *, reason: str) -> None:
_write_audit(qp, action=STATUS_REJECTED, notes=reason, diff_before="", diff_after="")
def _approve_from_tui(
stdscr: "curses._CursesWindow", # type: ignore
qp: QueuedProposal,
*,
final_file: str | None = None,
notes: str = "",
) -> str:
"""Approve from curses, prompting for any tool-specific audit note."""
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW and final_file is None:
notes = _prompt(stdscr, "allow reason (test fixture/false positive): ")
if not notes:
return "approve aborted (empty reason)"
approve(qp, final_file=final_file, notes=notes)
verb = "modified+approved" if final_file is not None else "approved"
return _approval_status(qp, verb)
def _write_audit(
qp: QueuedProposal,
*,
@@ -373,22 +353,18 @@ def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
_detail_view(stdscr, qp, green_attr=green_attr)
elif key == ord("a"):
try:
status_line = _approve_from_tui(stdscr, qp)
approve(qp)
status_line = _approval_status(qp, "approved")
except ApplyError as e:
status_line = f"apply failed: {e}"
elif key == ord("m"):
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW:
status_line = "modify unavailable for gitleaks-allow"
continue
edited = _modify(stdscr, qp)
if edited is None:
status_line = "modify aborted (no change)"
else:
try:
status_line = _approve_from_tui(
stdscr, qp, final_file=edited,
notes="operator modified before approving",
)
approve(qp, final_file=edited, notes="operator modified before approving")
status_line = _approval_status(qp, "modified+approved")
except ApplyError as e:
status_line = f"apply failed: {e}"
elif key == ord("r"):
@@ -486,20 +462,15 @@ def _detail_view(
offset = max(0, len(lines) - 1)
elif key == ord("a"):
try:
_approve_from_tui(stdscr, qp)
approve(qp)
except ApplyError:
pass
return
elif key == ord("m"):
if qp.proposal.tool == TOOL_GITLEAKS_ALLOW:
return
edited = _modify(stdscr, qp)
if edited is not None:
try:
_approve_from_tui(
stdscr, qp, final_file=edited,
notes="operator modified before approving",
)
approve(qp, final_file=edited, notes="operator modified before approving")
except ApplyError:
pass
return
+19 -19
View File
@@ -226,15 +226,20 @@ def _addstr_safe(screen: Any, row: int, col: int, text: str, attr: int = curses.
# ---------------------------------------------------------------------------
_ANSI_COLORS = [
"red", "green", "yellow", "blue", "magenta",
"red", "green", "blue", "yellow", "magenta", "cyan", "white", "black",
"bright-red", "bright-green", "bright-blue", "bright-yellow",
"bright-magenta", "bright-cyan", "bright-white", "bright-black",
]
_CURSES_COLOR_MAP: dict[str, int] = {
"black": curses.COLOR_BLACK,
"red": curses.COLOR_RED,
"green": curses.COLOR_GREEN,
"yellow": curses.COLOR_YELLOW,
"blue": curses.COLOR_BLUE,
"magenta": curses.COLOR_MAGENTA,
"cyan": curses.COLOR_CYAN,
"white": curses.COLOR_WHITE,
}
_COLOR_NONE = "(none)"
@@ -243,15 +248,11 @@ _COLOR_NONE = "(none)"
def name_color_modal(
default_label: str,
*,
disclaimer: str = "",
tty_path: str = "/dev/tty",
) -> tuple[str, str]:
"""Present a two-step curses modal: first edit the agent label,
then optionally pick a color.
``disclaimer`` is shown below the input field use it to surface
an error from a previous attempt (e.g. name already in use).
Returns ``(label, color)`` where ``color`` is one of the 16 ANSI
color name strings or ``""`` for no color. Falls back to
``(default_label, "")`` on any error (terminal too small, not a tty).
@@ -263,14 +264,14 @@ def name_color_modal(
try:
fd_dup = os.dup(tty_fd.fileno())
return _run_name_color(default_label, tty_fd=fd_dup, disclaimer=disclaimer)
return _run_name_color(default_label, tty_fd=fd_dup)
except Exception: # noqa: BLE001 # pylint: disable=broad-exception-caught
return default_label, ""
finally:
tty_fd.close()
def _run_name_color(default_label: str, *, tty_fd: int, disclaimer: str = "") -> tuple[str, str]:
def _run_name_color(default_label: str, *, tty_fd: int) -> tuple[str, str]:
import io
orig_stdin = sys.__stdin__
orig_stdout = sys.__stdout__
@@ -285,7 +286,7 @@ def _run_name_color(default_label: str, *, tty_fd: int, disclaimer: str = "") ->
curses.cbreak()
screen.keypad(True)
try:
label = _label_step(screen, default_label, disclaimer=disclaimer)
label = _label_step(screen, default_label)
color = _color_step(screen, label)
finally:
screen.keypad(False)
@@ -298,14 +299,14 @@ def _run_name_color(default_label: str, *, tty_fd: int, disclaimer: str = "") ->
return label, color
def _label_step(screen: Any, default_label: str, *, disclaimer: str = "") -> str:
def _label_step(screen: Any, default_label: str) -> str:
"""Step 1: edit the label. First printable key replaces the
pre-fill; subsequent keys append. Enter confirms."""
text = default_label
replaced = False # True once the user has typed their first char
while True:
_render_label(screen, text, disclaimer=disclaimer)
_render_label(screen, text)
try:
key = screen.getch()
except KeyboardInterrupt:
@@ -329,7 +330,7 @@ def _label_step(screen: Any, default_label: str, *, disclaimer: str = "") -> str
text += chr(key)
def _render_label(screen: Any, text: str, *, disclaimer: str = "") -> None:
def _render_label(screen: Any, text: str) -> None:
screen.erase()
rows, cols = screen.getmaxyx()
sep = "" * min(cols - 1, 40)
@@ -337,12 +338,8 @@ def _render_label(screen: Any, text: str, *, disclaimer: str = "") -> None:
_addstr_safe(screen, 1, 0, sep)
_addstr_safe(screen, 2, 0, text[:cols - 1], curses.A_REVERSE)
_addstr_safe(screen, 3, 0, sep)
row = 4
if disclaimer and rows > row + 1:
_addstr_safe(screen, row, 0, disclaimer[:cols - 1], curses.A_BOLD)
row += 1
if rows > row + 1:
_addstr_safe(screen, row, 0, "[any key] edit [Enter] confirm", curses.A_DIM)
if rows > 5:
_addstr_safe(screen, 5, 0, "[any key] edit [Enter] confirm", curses.A_DIM)
screen.refresh()
@@ -382,10 +379,13 @@ def _init_color_pairs() -> dict[str, int]:
curses.use_default_colors()
pair_idx = 2 # pair 1 reserved for other uses
for name in _ANSI_COLORS:
fg = _CURSES_COLOR_MAP.get(name, curses.COLOR_WHITE)
base = name.replace("bright-", "")
fg = _CURSES_COLOR_MAP.get(base, curses.COLOR_WHITE)
try:
curses.init_pair(pair_idx, fg, -1)
attr = curses.color_pair(pair_idx) | curses.A_BOLD
attr = curses.color_pair(pair_idx)
if name.startswith("bright-"):
attr |= curses.A_BOLD
attrs[name] = attr
pair_idx += 1
except curses.error:
+32 -10
View File
@@ -42,19 +42,41 @@ def _prompt_path(guest_home: str) -> str:
_STATUS_LINE_COLORS = {
"red": "\033[91m",
"green": "\033[92m",
"yellow": "\033[93m",
"blue": "\033[94m",
"magenta": "\033[95m",
"black": "\033[30m",
"red": "\033[31m",
"green": "\033[32m",
"yellow": "\033[33m",
"blue": "\033[34m",
"magenta": "\033[35m",
"cyan": "\033[36m",
"white": "\033[37m",
"bright-black": "\033[90m",
"bright-red": "\033[91m",
"bright-green": "\033[92m",
"bright-yellow": "\033[93m",
"bright-blue": "\033[94m",
"bright-magenta": "\033[95m",
"bright-cyan": "\033[96m",
"bright-white": "\033[97m",
}
_CLAUDE_THEME_COLORS = {
"red": "redBright",
"green": "greenBright",
"yellow": "yellowBright",
"blue": "blueBright",
"magenta": "magentaBright",
"black": "black",
"red": "red",
"green": "green",
"yellow": "yellow",
"blue": "blue",
"magenta": "magenta",
"cyan": "cyan",
"white": "white",
"bright-black": "blackBright",
"bright-red": "redBright",
"bright-green": "greenBright",
"bright-yellow": "yellowBright",
"bright-blue": "blueBright",
"bright-magenta": "magentaBright",
"bright-cyan": "cyanBright",
"bright-white": "whiteBright",
}
-161
View File
@@ -247,164 +247,6 @@ cat > "$refs_file"
zero=0000000000000000000000000000000000000000
supervise_gitleaks_allow() {
log_opts=$1
ref=$2
report_file=$(mktemp)
if ! gitleaks git \
--log-opts="$log_opts" \
--no-banner \
--redact \
--ignore-gitleaks-allow \
--report-format=json \
--report-path="$report_file" \
--exit-code 0 \
1>&2; then
rm -f "$report_file"
echo "git-gate: gitleaks inline-suppression scan failed for $ref" >&2
return 1
fi
proposal_id=$(
GITLEAKS_ALLOW_REF="$ref" python3 - "$report_file" <<'PY'
import datetime
import hashlib
import json
import os
import sys
import uuid
from pathlib import Path
report_path = Path(sys.argv[1])
queue_dir = os.environ.get("SUPERVISE_QUEUE_DIR", "")
slug = os.environ.get("SUPERVISE_BOTTLE_SLUG", "")
if not queue_dir or not slug:
sys.exit(2)
try:
raw = json.loads(report_path.read_text() or "[]")
except json.JSONDecodeError:
sys.exit(3)
if not isinstance(raw, list):
sys.exit(3)
if not raw:
sys.exit(0)
ref = os.environ.get("GITLEAKS_ALLOW_REF", "")
lines = [
"gitleaks inline suppression requires supervisor approval",
f"ref: {ref}",
"",
]
for i, finding in enumerate(raw, 1):
if not isinstance(finding, dict):
continue
file_path = finding.get("File", "")
line_no = finding.get("StartLine", finding.get("Line", ""))
rule_id = finding.get("RuleID", "")
commit = finding.get("Commit", "")
line = finding.get("Line", "")
lines.extend([
f"finding {i}:",
f" file: {file_path}",
f" line: {line_no}",
f" rule: {rule_id}",
f" commit: {commit}",
f" code: {line}",
"",
])
payload = "\n".join(lines).rstrip() + "\n"
proposal_id = str(uuid.uuid4())
proposal = {
"id": proposal_id,
"bottle_slug": slug,
"tool": "gitleaks-allow",
"proposed_file": payload,
"justification": (
"git-gate found gitleaks findings hidden by # gitleaks:allow; "
"approve only for dummy test fixtures or confirmed false positives"
),
"arrival_timestamp": datetime.datetime.now(
datetime.timezone.utc
).isoformat(),
"current_file_hash": hashlib.sha256(payload.encode("utf-8")).hexdigest(),
}
queue = Path(queue_dir)
queue.mkdir(parents=True, exist_ok=True)
path = queue / f"{proposal_id}.proposal.json"
tmp = path.with_suffix(path.suffix + ".tmp")
with tmp.open("w", encoding="utf-8") as f:
json.dump(proposal, f, indent=2)
f.write("\n")
os.chmod(tmp, 0o600)
os.replace(tmp, path)
print(proposal_id)
PY
)
rc=$?
rm -f "$report_file"
if [ "$rc" -eq 0 ] && [ -z "$proposal_id" ]; then
return 0
fi
if [ "$rc" -ne 0 ]; then
echo "git-gate: cannot route # gitleaks:allow finding to supervisor; refusing push" >&2
return 1
fi
queue_dir=${SUPERVISE_QUEUE_DIR:-}
response_file="$queue_dir/${proposal_id}.response.json"
timeout=${SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS:-300}
case "$timeout" in
''|*[!0-9]*)
echo "git-gate: invalid SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS=$timeout" >&2
return 1
;;
esac
echo "git-gate: queued # gitleaks:allow supervisor approval $proposal_id" >&2
echo "git-gate: approve with './cli.py supervise' to continue this push" >&2
waited=0
while [ "$waited" -lt "$timeout" ]; do
if [ -f "$response_file" ]; then
status=$(python3 - "$response_file" <<'PY'
import json
import sys
try:
with open(sys.argv[1], encoding="utf-8") as f:
raw = json.load(f)
except (OSError, json.JSONDecodeError):
sys.exit(1)
status = raw.get("status")
if not isinstance(status, str):
sys.exit(1)
print(status)
PY
) || status=""
case "$status" in
approved|modified)
mkdir -p "$queue_dir/processed"
mv -f "$queue_dir/${proposal_id}.proposal.json" "$queue_dir/processed/" 2>/dev/null || true
mv -f "$queue_dir/${proposal_id}.response.json" "$queue_dir/processed/" 2>/dev/null || true
echo "git-gate: supervisor approved # gitleaks:allow for $ref" >&2
return 0
;;
rejected)
echo "git-gate: supervisor rejected # gitleaks:allow for $ref" >&2
return 1
;;
*)
echo "git-gate: invalid supervisor response for # gitleaks:allow" >&2
return 1
;;
esac
fi
sleep 1
waited=$((waited + 1))
done
echo "git-gate: supervisor approval timed out for # gitleaks:allow; refusing push" >&2
return 1
}
# Phase 1: gitleaks scan each ref's incoming commits.
while IFS=' ' read -r old new ref; do
[ -z "$ref" ] && continue
@@ -426,9 +268,6 @@ while IFS=' ' read -r old new ref; do
echo "git-gate: gitleaks rejected push to $ref" >&2
exit 1
fi
if ! supervise_gitleaks_allow "$log_opts" "$ref"; then
exit 1
fi
done < "$refs_file"
# Phase 2: forward each ref to the upstream (`origin`, configured
+18 -66
View File
@@ -5,20 +5,16 @@ from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .manifest import ManifestBottle
from .manifest import ManifestBottle, ManifestGitEntry
from .manifest_egress import ManifestEgressConfig
def resolve_bottles(raws: dict[str, dict[str, object]]) -> dict[str, ManifestBottle]:
"""Apply `extends:` chains and return resolved ManifestBottle objects."""
cache: dict[str, ManifestBottle] = {}
# Per-bottle effective git-gate.repos, as raw dicts keyed by repo name.
# Threaded alongside `cache` so a child can field-merge against its
# parent's repos without reconstructing them from parsed entries.
repos_cache: dict[str, dict[str, object]] = {}
for name in raws:
if name not in cache:
_resolve_one_bottle(name, raws, cache, repos_cache, ())
_resolve_one_bottle(name, raws, cache, ())
return cache
@@ -26,7 +22,6 @@ def _resolve_one_bottle(
name: str,
raws: dict[str, dict[str, object]],
cache: dict[str, ManifestBottle],
repos_cache: dict[str, dict[str, object]],
seen: tuple[str, ...],
) -> ManifestBottle:
from .manifest import ManifestBottle, ManifestError
@@ -46,7 +41,6 @@ def _resolve_one_bottle(
if parent_name_raw is None:
bottle = ManifestBottle.from_dict(name, child_raw)
cache[name] = bottle
repos_cache[name] = _resolve_repos_raw({}, child_raw)
return bottle
if not isinstance(parent_name_raw, str):
@@ -66,33 +60,20 @@ def _resolve_one_bottle(
f"bottle '{name}' extends '{parent_name}' which is not "
f"defined. Available bottles: {avail}"
)
parent = _resolve_one_bottle(
parent_name, raws, cache, repos_cache, seen + (name,)
)
merged_repos_raw = _resolve_repos_raw(repos_cache[parent_name], child_raw)
bottle = _merge_bottles(parent, child_raw, merged_repos_raw, name)
parent = _resolve_one_bottle(parent_name, raws, cache, seen + (name,))
bottle = _merge_bottles(parent, child_raw, name)
cache[name] = bottle
repos_cache[name] = merged_repos_raw
return bottle
def _merge_bottles(
parent: ManifestBottle,
child_raw: dict[str, object],
merged_repos_raw: dict[str, object],
name: str,
) -> ManifestBottle:
"""Apply PRD 0025 merge rules."""
from .manifest import ManifestBottle, ManifestGitUser
from .manifest_egress import validate_egress_routes
from .manifest_util import as_json_object
# git-gate.repos: when the child declares repos, inject the already
# name-merged repo set (computed by _resolve_repos_raw) so the child
# parses with the full inherited+overridden list (issue #237).
if _child_declares_git_gate_repos(child_raw):
git_raw = as_json_object(child_raw.get("git-gate", {}), "child git-gate")
child_raw = {**child_raw, "git-gate": {**git_raw, "repos": merged_repos_raw}}
# Parse the child's declared fields into a ManifestBottle (with the
# usual defaults for anything missing). Validation runs the same
@@ -111,11 +92,11 @@ def _merge_bottles(
email=child.git_user.email or parent.git_user.email,
)
# git-gate.repos: when declared, child.git already holds the merged
# set (an explicit empty dict clears parent, leaving child.git empty).
# When omitted, the parent's entries are inherited verbatim.
# git-gate.repos: missing means inherit; an explicit empty object
# clears; otherwise parent and child merge by UpstreamHost with
# child entries replacing duplicate hosts.
if _child_declares_git_gate_repos(child_raw):
merged_git = child.git
merged_git = _merge_git_remotes(parent.git, child.git) if child.git else ()
else:
merged_git = parent.git
@@ -149,45 +130,6 @@ def _merge_bottles(
)
def _resolve_repos_raw(
parent_repos: dict[str, object],
child_raw: dict[str, object],
) -> dict[str, object]:
"""Compute a bottle's effective git-gate.repos as raw dicts.
Repos are keyed by name. When the child omits git-gate.repos it
inherits the parent's set verbatim; an explicit empty dict clears it.
Otherwise parent and child unite by name, with same-name entries
field-merged (parent fields are defaults, child fields win)."""
from .manifest_util import as_json_object
if not _child_declares_git_gate_repos(child_raw):
return parent_repos
child_repos = _declared_repos_raw(child_raw)
if not child_repos:
return {}
# Parent entries keep their order; child-only names are appended.
names = list(parent_repos) + [n for n in child_repos if n not in parent_repos]
return {
name: {
**as_json_object(parent_repos.get(name, {}), "parent git-gate repo"),
**as_json_object(child_repos.get(name, {}), "child git-gate repo"),
}
for name in names
}
def _declared_repos_raw(child_raw: dict[str, object]) -> dict[str, object]:
"""Return the child's explicitly declared git-gate.repos as raw dicts,
or an empty dict when none are declared."""
from .manifest_util import as_json_object
if not _child_declares_git_gate_repos(child_raw):
return {}
git_raw = as_json_object(child_raw.get("git-gate", {}), "child git-gate")
return as_json_object(git_raw.get("repos", {}), "child git-gate.repos")
def _child_declares_git_gate_repos(child_raw: dict[str, object]) -> bool:
from .manifest_util import as_json_object
@@ -198,6 +140,16 @@ def _child_declares_git_gate_repos(child_raw: dict[str, object]) -> bool:
return "repos" in git_obj
def _merge_git_remotes(
parent: tuple[ManifestGitEntry, ...],
child: tuple[ManifestGitEntry, ...],
) -> tuple[ManifestGitEntry, ...]:
by_host = {entry.UpstreamHost: entry for entry in parent}
for entry in child:
by_host[entry.UpstreamHost] = entry
return tuple(by_host.values())
def _merge_egress(
parent: ManifestEgressConfig,
child: ManifestEgressConfig,
-2
View File
@@ -49,11 +49,9 @@ SUPERVISE_HOSTNAME = "supervise"
SUPERVISE_PORT = 9100
TOOL_CAPABILITY_BLOCK = "capability-block"
TOOL_GITLEAKS_ALLOW = "gitleaks-allow"
TOOL_LIST_EGRESS_ROUTES = "list-egress-routes"
TOOLS: tuple[str, ...] = (
TOOL_CAPABILITY_BLOCK,
TOOL_GITLEAKS_ALLOW,
TOOL_LIST_EGRESS_ROUTES,
)
@@ -1,101 +0,0 @@
# PRD prd-new: Gitleaks inline suppression supervision
- **Status:** Active
- **Author:** codex
- **Created:** 2026-06-10
- **Issue:** #208
## Summary
Route every `# gitleaks:allow` inline suppression through the existing
supervisor approval queue before git-gate forwards a push upstream. The
annotation remains available for test fixtures and confirmed false positives,
but it no longer lets an agent silently bypass the gate's secret scan.
## Problem
Gitleaks supports inline suppression comments. With the default behaviour,
a pushed line that contains a secret-like value and `# gitleaks:allow` is
ignored by gitleaks and accepted by git-gate. That is convenient for dummy
credentials in tests, but it also gives an agent a one-line bypass for a real
secret. Once the commit lands, the diff alone cannot prove whether the
suppression was legitimate.
## Goals / Success Criteria
1. Git-gate continues to run the normal gitleaks scan for every incoming ref.
2. After the normal scan passes, git-gate runs a second scan with
`--ignore-gitleaks-allow` and a JSON report so suppressed findings become
visible.
3. If that second scan reports no suppressed findings, the push proceeds
unchanged.
4. If it reports suppressed findings, git-gate creates a `gitleaks-allow`
supervisor proposal containing the ref, file path, line number, rule,
commit, and flagged line for each finding.
5. The push proceeds only when the supervisor explicitly approves the
proposal; rejection, malformed responses, missing supervisor configuration,
and timeout all refuse the push.
6. The supervisor TUI requires a reason when approving a `gitleaks-allow`
proposal, so the audit trail records whether the approval was for a test
fixture or a false positive.
## Non-goals
- Replacing gitleaks or changing the main secret-detection rule set.
- Removing support for `# gitleaks:allow`.
- Automatically classifying fixture files or false positives.
- Adding new supervisor transport or authentication mechanisms.
## Design
### Git-gate flow
`git_gate_render_hook()` emits a `supervise_gitleaks_allow` shell helper.
For each incoming ref, git-gate first runs the existing gitleaks command. If
that scan passes, it runs:
```sh
gitleaks git \
--log-opts="$log_opts" \
--no-banner \
--redact \
--ignore-gitleaks-allow \
--report-format=json \
--report-path="$report_file" \
--exit-code 0
```
The second pass keeps the push path non-interactive while producing a report
of findings that would otherwise have been hidden by inline suppression.
### Supervisor proposal
When the JSON report contains findings, an embedded Python helper writes a
proposal into `SUPERVISE_QUEUE_DIR` using the existing proposal schema. The
proposal uses:
- `tool: "gitleaks-allow"`
- a text payload with the ref and each finding's file, line, rule, commit,
and redacted code line
- a justification that tells the operator to approve only dummy test fixtures
or confirmed false positives
Git-gate then waits for `<proposal-id>.response.json` for
`SUPERVISE_GITLEAKS_ALLOW_TIMEOUT_SECONDS`, defaulting to 300 seconds.
`approved` and `modified` responses allow the push; `rejected`, invalid
responses, invalid timeout configuration, or timeout refuse it.
### Supervisor UI
`TOOL_GITLEAKS_ALLOW` is added to the supervisor tool registry. The curses
supervisor renders the proposal as text and allows approval or rejection.
Modification is unavailable for this proposal type because there is no file
patch to apply. Approval from the TUI prompts for a non-empty reason and
writes that reason to the response/audit path.
### Tests
Unit tests assert that the rendered git-gate hook includes the second gitleaks
pass, supervisor queue fields, and fail-closed messages. Supervisor tests cover
the new tool constant, proposal archiving, and the required TUI approval
reason.
+6 -1
View File
@@ -5,10 +5,15 @@ agent_provider:
egress:
routes:
- host: api.anthropic.com
role: claude_code_oauth
role: claude_code_oauth # wires Claude Code OAuth; do not change
auth:
scheme: Bearer
token_ref: BOT_BOTTLE_CLAUDE_OAUTH_TOKEN
# dlp is omitted → all detectors on by default (token_patterns,
# known_secrets outbound; naive_injection_detection inbound).
# To disable inbound scanning for this route:
# dlp:
# inbound_detectors: false
---
Common Claude provider boundary. Drop this file into
+4 -3
View File
@@ -92,9 +92,10 @@ class TestSandboxEscape(unittest.TestCase):
"on PATH: curl -sSL https://smolmachines.com/install.sh | sh"
)
# Throwaway "identity file" for the git-gate's `identity` field.
# It need not be a real SSH key: test 5 reaches gitleaks before
# any SSH attempt anyway.
# Throwaway "identity file" so the manifest's _validate_git_entries
# passes (it only checks `os.path.isfile`, not that the content is
# a real SSH key). Test 5 reaches gitleaks before any SSH attempt
# anyway.
fd, kp = tempfile.mkstemp(prefix="sandbox-test-key.")
os.close(fd)
cls._key_path = Path(kp)
+1 -1
View File
@@ -74,7 +74,7 @@ class TestAgentProviderRuntime(unittest.TestCase):
instance_name="bot-bottle-test",
prompt_file=prompt_file,
label="review-api",
color="cyan",
color="bright-cyan",
)
prompt = prompt_file.read_text()
config = Path(tmp, "codex-config.toml").read_text()
-32
View File
@@ -16,7 +16,6 @@ from bot_bottle import bottle_state
from bot_bottle import supervise
from bot_bottle.backend import BottleSpec
from bot_bottle.backend.docker import DockerBottleBackend
from bot_bottle.backend.resolve_common import mint_slug
from bot_bottle.backend.smolmachines import SmolmachinesBottleBackend
from bot_bottle.manifest import Manifest
@@ -116,36 +115,5 @@ class TestSmolmachinesPrepare(_FakeStateMixin, unittest.TestCase):
)
class TestMintSlug(unittest.TestCase):
def _spec(self, *, label: str = "", identity: str = "") -> BottleSpec:
manifest = _manifest()
return BottleSpec(
manifest=manifest,
agent_name="demo",
copy_cwd=False,
user_cwd="/tmp",
label=label,
identity=identity,
)
def test_no_label_uses_agent_name_with_random_suffix(self) -> None:
slug = mint_slug(self._spec(label=""))
self.assertTrue(slug.startswith("demo-"), slug)
# random suffix present — slug is longer than just "demo"
self.assertGreater(len(slug), len("demo-"))
def test_label_becomes_exact_slug(self) -> None:
slug = mint_slug(self._spec(label="my-run"))
self.assertEqual("my-run", slug)
def test_label_with_spaces_slugified_no_suffix(self) -> None:
slug = mint_slug(self._spec(label="My Feature Run"))
self.assertEqual("my-feature-run", slug)
def test_identity_takes_precedence_over_label(self) -> None:
slug = mint_slug(self._spec(label="my-run", identity="fixed-id"))
self.assertEqual("fixed-id", slug)
if __name__ == "__main__":
unittest.main()
+11 -8
View File
@@ -11,14 +11,14 @@ class TestPalettePrintf(unittest.TestCase):
def test_known_color_returns_printf(self):
cmd = palette_printf("red")
self.assertTrue(cmd.startswith("printf '"))
self.assertIn("\\033]4;9;", cmd) # bright-red slot
self.assertIn("\\033]4;1;", cmd) # normal-red slot
self.assertIn("\\033]4;1;", cmd) # normal red
self.assertIn("\\033]4;9;", cmd) # bright red
self.assertIn("\\033]11;", cmd) # default background tint
def test_color_sets_both_palette_slots(self):
cmd = palette_printf("blue")
self.assertIn("\\033]4;12;", cmd) # bright-blue slot
self.assertIn("\\033]4;4;", cmd) # normal-blue slot
def test_bright_variant_sets_both_slots(self):
cmd = palette_printf("bright-blue")
self.assertIn("\\033]4;12;", cmd) # bright-blue
self.assertIn("\\033]4;4;", cmd) # blue
def test_unknown_color_returns_empty(self):
self.assertEqual("", palette_printf(""))
@@ -26,7 +26,10 @@ class TestPalettePrintf(unittest.TestCase):
def test_all_named_colors_produce_output(self):
colors = [
"red", "green", "yellow", "blue", "magenta",
"black", "red", "green", "yellow",
"blue", "magenta", "cyan", "white",
"bright-black", "bright-red", "bright-green", "bright-yellow",
"bright-blue", "bright-magenta", "bright-cyan", "bright-white",
]
for color in colors:
with self.subTest(color=color):
@@ -62,7 +65,7 @@ class TestExecShellScript(unittest.TestCase):
self.assertFalse(agent_part.startswith("exec "))
def test_title_and_color_both_appear(self):
script = exec_shell_script(self._ARGV, terminal_title="bot", terminal_color="magenta")
script = exec_shell_script(self._ARGV, terminal_title="bot", terminal_color="cyan")
assert script is not None
self.assertIn("bot", script)
self.assertIn("\\033]4;", script)
-59
View File
@@ -14,7 +14,6 @@ from unittest.mock import MagicMock, patch
import bot_bottle.cli.start as start_mod
import bot_bottle.cli.tui as tui_mod
from bot_bottle.backend import ActiveAgent
def _make_manifest(agent_names: list[str]):
@@ -134,63 +133,5 @@ class TestCmdStartSelector(unittest.TestCase):
self._launch_mock.assert_not_called()
def _active_agent(slug: str) -> ActiveAgent:
return ActiveAgent(
backend_name="docker",
slug=slug,
agent_name="demo",
started_at="2026-01-01T00:00:00+00:00",
services=(),
)
class TestCmdStartLabelCollision(unittest.TestCase):
"""cmd_start re-prompts when the label's slug is already running."""
def setUp(self):
self._manifest = _make_manifest(["researcher"])
patch("bot_bottle.cli.start.Manifest.resolve", return_value=self._manifest).start()
self._launch_mock = patch(
"bot_bottle.cli.start._launch_bottle", return_value=0,
).start()
self.addCleanup(patch.stopall)
def test_no_collision_proceeds_without_reprompt(self):
with (
patch.object(tui_mod, "name_color_modal", return_value=("researcher", "")) as modal,
patch("bot_bottle.cli.start.enumerate_active_agents", return_value=[]),
):
rc = start_mod.cmd_start(["researcher"])
self.assertEqual(0, rc)
modal.assert_called_once()
self._launch_mock.assert_called_once()
def test_collision_reprompts_with_disclaimer(self):
collision_agent = _active_agent("researcher")
call_count = 0
def _modal(default_label: str, *, disclaimer: str = "", **_kw: object) -> tuple[str, str]:
nonlocal call_count
call_count += 1
if call_count == 1:
return "researcher", ""
return "researcher-2", ""
with (
patch.object(tui_mod, "name_color_modal", side_effect=_modal) as modal,
patch(
"bot_bottle.cli.start.enumerate_active_agents",
side_effect=[[collision_agent], []],
),
):
rc = start_mod.cmd_start(["researcher"])
self.assertEqual(0, rc)
self.assertEqual(2, modal.call_count)
second_call_kwargs = modal.call_args_list[1][1]
self.assertIn("researcher", second_call_kwargs.get("disclaimer", ""))
self.assertIn("already in use", second_call_kwargs.get("disclaimer", ""))
if __name__ == "__main__":
unittest.main()
+3 -3
View File
@@ -276,7 +276,7 @@ class TestClaudeUiProvision(unittest.TestCase):
instance_name="bot-bottle-demo-abc12",
prompt_file=prompt_file,
label="research-ui",
color="blue",
color="bright-cyan",
)
settings = json.loads((state_dir / "claude-settings.json").read_text())
statusline = (state_dir / "claude-statusline.sh").read_text()
@@ -288,9 +288,9 @@ class TestClaudeUiProvision(unittest.TestCase):
self.assertEqual("~/.claude/statusline.sh", settings["statusLine"]["command"])
self.assertEqual("custom:bot-bottle-research-ui", settings["theme"])
self.assertIn("research-ui", statusline)
self.assertIn("\x1b[94m", statusline)
self.assertIn("\x1b[96m", statusline)
self.assertEqual("dark", theme["base"])
self.assertEqual("ansi:blueBright", theme["overrides"]["claude"])
self.assertEqual("ansi:cyanBright", theme["overrides"]["claude"])
def test_runs_verify_commands(self):
provision = AgentProvisionPlan(
+1 -1
View File
@@ -158,7 +158,7 @@ class TestCodexProvisionPrompt(unittest.TestCase):
instance_name="bot-bottle-demo-abc12",
prompt_file=prompt_file,
label="research-ui",
color="cyan",
color="bright-cyan",
)
config = (state_dir / "codex-config.toml").read_text()
prompt_text = prompt_file.read_text()
-24
View File
@@ -199,30 +199,6 @@ class TestHookRender(unittest.TestCase):
self.assertIn('set -- "$@" --push-option="$opt"', hook)
self.assertIn('git push "$@" origin "$refspec"', hook)
def test_inline_gitleaks_allow_routes_to_supervisor(self):
hook = git_gate_render_hook()
# First gitleaks runs normally; only if that passes does the
# hook ask gitleaks to ignore inline allow comments and report
# the suppressed findings for human approval.
self.assertIn("--ignore-gitleaks-allow", hook)
self.assertIn("--report-format=json", hook)
self.assertIn('"tool": "gitleaks-allow"', hook)
self.assertIn("SUPERVISE_QUEUE_DIR", hook)
self.assertIn("SUPERVISE_BOTTLE_SLUG", hook)
self.assertIn("supervisor approved # gitleaks:allow", hook)
self.assertIn("supervisor rejected # gitleaks:allow", hook)
def test_inline_gitleaks_allow_fails_closed_without_supervisor(self):
hook = git_gate_render_hook()
self.assertIn(
"cannot route # gitleaks:allow finding to supervisor; refusing push",
hook,
)
self.assertIn(
"supervisor approval timed out for # gitleaks:allow; refusing push",
hook,
)
class TestAccessHookRender(unittest.TestCase):
def test_access_hook_refreshes_origin_on_upload_pack(self):
+8 -81
View File
@@ -113,8 +113,8 @@ class TestExtendsEnvMerge(unittest.TestCase):
class TestExtendsGitMerge(unittest.TestCase):
"""git-gate.user overlays by field; git-gate.repos merges by name,
with same-name child entries merging field-by-field (child wins)."""
"""git-gate.user overlays by field; git-gate.repos merges by upstream
host, with child entries replacing duplicate hosts."""
_GIT_ENTRY_A = {"url": "ssh://git@host-a/a.git", "key": {"provider": "static", "path": "/dev/null"}}
_GIT_ENTRY_B = {"url": "ssh://git@host-b/b.git", "key": {"provider": "static", "path": "/dev/null"}}
@@ -130,21 +130,19 @@ class TestExtendsGitMerge(unittest.TestCase):
names = [e.Name for e in m.bottles["child"].git]
self.assertEqual(["a", "b"], names)
def test_child_git_repo_different_name_same_host_coexists(self):
# Repos are keyed by Name, not UpstreamHost: two repos with
# different names on the same host both survive the merge.
same_host_b = {"url": "ssh://git@host-a/b.git", "key": {"provider": "static", "path": "/dev/null"}}
def test_child_git_repo_replaces_same_host(self):
replacement = {"url": "ssh://git@host-a/replacement.git", "key": {"provider": "static", "path": "/dev/null"}}
m = _build(
base={"git-gate": {"repos": {"a": self._GIT_ENTRY_A}}},
child={
"extends": "base",
"git-gate": {"repos": {"a2": same_host_b}},
"git-gate": {"repos": {"a2": replacement}},
},
)
entries = m.bottles["child"].git
self.assertEqual(2, len(entries))
names = {e.Name for e in entries}
self.assertEqual({"a", "a2"}, names)
self.assertEqual(1, len(entries))
self.assertEqual("a2", entries[0].Name)
self.assertEqual("replacement.git", entries[0].UpstreamPath)
def test_child_omits_git_gate_inherits_full_list(self):
m = _build(
@@ -166,77 +164,6 @@ class TestExtendsGitMerge(unittest.TestCase):
)
self.assertEqual((), m.bottles["child"].git)
def test_child_same_name_repo_merges_key_field(self):
# Issue #237: child repo with same name as parent should merge
# field-by-field. Child overrides only `key`; parent's url and
# host_key are preserved.
parent_entry = {
"url": "ssh://git@host-a/repo.git",
"host_key": "ecdsa-sha2-nistp256 AAAA",
"key": {"provider": "static", "path": "/keys/id_rsa"},
}
m = _build(
base={"git-gate": {"repos": {"repo": parent_entry}}},
child={
"extends": "base",
"git-gate": {"repos": {"repo": {
"key": {"provider": "gitea", "forge_token_env": "GITEA_TOKEN"},
}}},
},
)
entries = m.bottles["child"].git
self.assertEqual(1, len(entries))
e = entries[0]
self.assertEqual("repo", e.Name)
self.assertEqual("ssh://git@host-a/repo.git", e.Upstream)
self.assertEqual("ecdsa-sha2-nistp256 AAAA", e.KnownHostKey)
self.assertEqual("gitea", e.Key.provider)
self.assertEqual("GITEA_TOKEN", e.Key.forge_token_env)
def test_child_same_name_repo_overrides_url(self):
# Child can override url on a same-name repo; other parent fields
# fall through.
parent_entry = {
"url": "ssh://git@host-a/old.git",
"key": {"provider": "static", "path": "/keys/id_rsa"},
}
m = _build(
base={"git-gate": {"repos": {"repo": parent_entry}}},
child={
"extends": "base",
"git-gate": {"repos": {"repo": {
"url": "ssh://git@host-b/new.git",
"key": {"provider": "static", "path": "/keys/id_rsa"},
}}},
},
)
entries = m.bottles["child"].git
self.assertEqual(1, len(entries))
self.assertEqual("ssh://git@host-b/new.git", entries[0].Upstream)
def test_child_same_name_plus_new_repo(self):
# Same-name repo is field-merged; a distinct new name in child
# is appended.
parent_entry = {
"url": "ssh://git@host-a/repo.git",
"key": {"provider": "static", "path": "/keys/id_rsa"},
}
m = _build(
base={"git-gate": {"repos": {"repo": parent_entry}}},
child={
"extends": "base",
"git-gate": {"repos": {
"repo": {"key": {"provider": "gitea", "forge_token_env": "TOK"}},
"other": self._GIT_ENTRY_B,
}},
},
)
child = m.bottles["child"]
names = {e.Name for e in child.git}
self.assertEqual({"repo", "other"}, names)
repo_entry = next(e for e in child.git if e.Name == "repo")
self.assertEqual("gitea", repo_entry.Key.provider)
def test_child_git_user_inherits_parent_repos(self):
m = _build(
base={"git-gate": {"repos": {"a": self._GIT_ENTRY_A}}},
-2
View File
@@ -17,7 +17,6 @@ from bot_bottle.supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_GITLEAKS_ALLOW,
archive_proposal,
audit_log_path,
list_pending_proposals,
@@ -319,7 +318,6 @@ class TestToolConstants(unittest.TestCase):
self.assertEqual(
(
TOOL_CAPABILITY_BLOCK,
TOOL_GITLEAKS_ALLOW,
supervise.TOOL_LIST_EGRESS_ROUTES,
),
supervise.TOOLS,
-25
View File
@@ -12,7 +12,6 @@ import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import patch
from bot_bottle import supervise
from bot_bottle.cli import supervise as supervise_cli
@@ -22,7 +21,6 @@ from bot_bottle.supervise import (
STATUS_MODIFIED,
STATUS_REJECTED,
TOOL_CAPABILITY_BLOCK,
TOOL_GITLEAKS_ALLOW,
read_audit_entries,
read_response,
sha256_hex,
@@ -35,7 +33,6 @@ FIXED = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)
def _proposal(slug: str = "dev", tool: str = TOOL_CAPABILITY_BLOCK) -> Proposal:
payloads = {
TOOL_CAPABILITY_BLOCK: "FROM python:3.13\n",
TOOL_GITLEAKS_ALLOW: "file: tests/test_fixture.py\nline: 3\n",
}
payload = payloads.get(tool, "")
return Proposal.new(
@@ -157,28 +154,6 @@ class TestApproveReject(_FakeHomeMixin, unittest.TestCase):
supervise_cli.approve(qp)
self.assertEqual([], read_audit_entries("egress", "dev"))
def test_approve_archives_gitleaks_allow(self):
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
supervise_cli.approve(qp, notes="dummy fixture")
resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
self.assertEqual(STATUS_APPROVED, resp.status)
self.assertEqual("dummy fixture", resp.notes)
def test_tui_gitleaks_allow_requires_reason(self):
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
with patch.object(supervise_cli, "_prompt", return_value=""):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertEqual("approve aborted (empty reason)", status)
self.assertFalse((qp.queue_dir / "processed").exists())
def test_tui_gitleaks_allow_writes_reason(self):
qp = self._enqueue(tool=TOOL_GITLEAKS_ALLOW)
with patch.object(supervise_cli, "_prompt", return_value="test fixture"):
status = supervise_cli._approve_from_tui(None, qp) # type: ignore[arg-type]
self.assertIn("approved gitleaks-allow", status)
resp = read_response(qp.queue_dir / "processed", qp.proposal.id)
self.assertEqual("test fixture", resp.notes)
# class TestCapabilityApplyWiring(_FakeHomeMixin, unittest.TestCase):
# # DISABLED — capability_apply functionality is currently commented out.