Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 43e8c5244c |
@@ -1,9 +0,0 @@
|
|||||||
[run]
|
|
||||||
branch = True
|
|
||||||
source = .
|
|
||||||
|
|
||||||
[report]
|
|
||||||
omit =
|
|
||||||
bot_bottle/egress_addon.py
|
|
||||||
bot_bottle/cli/tui.py
|
|
||||||
tests/*
|
|
||||||
@@ -39,14 +39,8 @@ jobs:
|
|||||||
with:
|
with:
|
||||||
python-version: "3.12"
|
python-version: "3.12"
|
||||||
|
|
||||||
- name: Install dev requirements
|
|
||||||
run: python3 -m pip install -r requirements-dev.txt
|
|
||||||
|
|
||||||
- name: Run unit tests
|
- name: Run unit tests
|
||||||
run: python3 -m coverage run -m unittest discover -t . -s tests/unit -v
|
run: python3 -m unittest discover -t . -s tests/unit -v
|
||||||
|
|
||||||
- name: Report unit coverage
|
|
||||||
run: python3 -m coverage report -m
|
|
||||||
|
|
||||||
integration:
|
integration:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
|||||||
@@ -22,4 +22,3 @@ venv/
|
|||||||
.pytest_cache/
|
.pytest_cache/
|
||||||
.mypy_cache/
|
.mypy_cache/
|
||||||
.ruff_cache/
|
.ruff_cache/
|
||||||
.coverage
|
|
||||||
|
|||||||
@@ -68,11 +68,6 @@ def build_image(ref: str, context: str, *, dockerfile: str = "") -> None:
|
|||||||
_ensure_builder_dns()
|
_ensure_builder_dns()
|
||||||
args = [_CONTAINER, "build", "-t", ref, "--dns", dns_server()]
|
args = [_CONTAINER, "build", "-t", ref, "--dns", dns_server()]
|
||||||
if dockerfile:
|
if dockerfile:
|
||||||
# `container build` resolves -f relative to the current working
|
|
||||||
# directory, not the build context. Anchor a relative Dockerfile to
|
|
||||||
# the context so builds work from any cwd.
|
|
||||||
if not os.path.isabs(dockerfile):
|
|
||||||
dockerfile = os.path.join(context, dockerfile)
|
|
||||||
args.extend(["-f", dockerfile])
|
args.extend(["-f", dockerfile])
|
||||||
args.append(context)
|
args.append(context)
|
||||||
subprocess.run(args, check=True)
|
subprocess.run(args, check=True)
|
||||||
|
|||||||
@@ -2,8 +2,9 @@
|
|||||||
act on them (approve / modify / reject).
|
act on them (approve / modify / reject).
|
||||||
|
|
||||||
Curses-based TUI; modify-then-approve shells out to $EDITOR. The
|
Curses-based TUI; modify-then-approve shells out to $EDITOR. The
|
||||||
Egress proposals are queued for operator review as full routes.yaml
|
approval handler wires to PRD 0016 (capability-block), which rebuilds
|
||||||
updates.
|
the bottle Dockerfile. Egress proposals are queued for operator review
|
||||||
|
as full routes.yaml updates.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
@@ -21,6 +22,10 @@ from pathlib import Path
|
|||||||
|
|
||||||
from .. import supervise as _supervise
|
from .. import supervise as _supervise
|
||||||
from ..bottle_state import read_metadata
|
from ..bottle_state import read_metadata
|
||||||
|
# from ..backend.docker.capability_apply import (
|
||||||
|
# CapabilityApplyError,
|
||||||
|
# apply_capability_change,
|
||||||
|
# )
|
||||||
from ..backend.docker.egress_apply import (
|
from ..backend.docker.egress_apply import (
|
||||||
EgressApplyError,
|
EgressApplyError,
|
||||||
applicator as _docker_applicator,
|
applicator as _docker_applicator,
|
||||||
@@ -33,6 +38,10 @@ from ..backend.smolmachines.egress_apply import (
|
|||||||
)
|
)
|
||||||
from ..log import Die, error, info
|
from ..log import Die, error, info
|
||||||
|
|
||||||
|
|
||||||
|
class CapabilityApplyError(RuntimeError):
|
||||||
|
"""Placeholder while capability_apply is disabled."""
|
||||||
|
|
||||||
from ..supervise import (
|
from ..supervise import (
|
||||||
COMPONENT_FOR_TOOL,
|
COMPONENT_FOR_TOOL,
|
||||||
AuditEntry,
|
AuditEntry,
|
||||||
@@ -41,10 +50,12 @@ from ..supervise import (
|
|||||||
STATUS_APPROVED,
|
STATUS_APPROVED,
|
||||||
STATUS_MODIFIED,
|
STATUS_MODIFIED,
|
||||||
STATUS_REJECTED,
|
STATUS_REJECTED,
|
||||||
|
TOOL_CAPABILITY_BLOCK,
|
||||||
TOOL_EGRESS_ALLOW,
|
TOOL_EGRESS_ALLOW,
|
||||||
TOOL_EGRESS_BLOCK,
|
TOOL_EGRESS_BLOCK,
|
||||||
TOOL_GITLEAKS_ALLOW,
|
TOOL_GITLEAKS_ALLOW,
|
||||||
TOOL_EGRESS_TOKEN_ALLOW,
|
TOOL_EGRESS_TOKEN_ALLOW,
|
||||||
|
archive_proposal,
|
||||||
list_pending_proposals,
|
list_pending_proposals,
|
||||||
render_diff,
|
render_diff,
|
||||||
write_audit_entry,
|
write_audit_entry,
|
||||||
@@ -72,7 +83,7 @@ class QueuedProposal:
|
|||||||
# Errors any remediation engine may raise. Caught by the TUI key
|
# Errors any remediation engine may raise. Caught by the TUI key
|
||||||
# handlers and surfaced in the status line so a failed apply keeps
|
# handlers and surfaced in the status line so a failed apply keeps
|
||||||
# the proposal pending rather than crashing curses.
|
# the proposal pending rather than crashing curses.
|
||||||
ApplyError = (EgressApplyError,)
|
ApplyError = (CapabilityApplyError, EgressApplyError)
|
||||||
|
|
||||||
|
|
||||||
def apply_routes_change(slug: str, content: str) -> tuple[str, str]:
|
def apply_routes_change(slug: str, content: str) -> tuple[str, str]:
|
||||||
@@ -132,6 +143,8 @@ def _detail_lines(
|
|||||||
|
|
||||||
|
|
||||||
def _suffix_for_tool(tool: str) -> str:
|
def _suffix_for_tool(tool: str) -> str:
|
||||||
|
if tool == TOOL_CAPABILITY_BLOCK:
|
||||||
|
return ".dockerfile"
|
||||||
if tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
|
if tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
|
||||||
return ".yaml"
|
return ".yaml"
|
||||||
if tool in (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW):
|
if tool in (TOOL_GITLEAKS_ALLOW, TOOL_EGRESS_TOKEN_ALLOW):
|
||||||
@@ -153,6 +166,17 @@ def approve(
|
|||||||
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
|
file_to_apply = final_file if final_file is not None else qp.proposal.proposed_file
|
||||||
|
|
||||||
diff_before, diff_after = "", ""
|
diff_before, diff_after = "", ""
|
||||||
|
# if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
|
||||||
|
# _meta = read_metadata(qp.proposal.bottle_slug)
|
||||||
|
# if _meta is not None and not _meta.compose_project:
|
||||||
|
# raise CapabilityApplyError(
|
||||||
|
# "capability-block remediation is not supported for smolmachines "
|
||||||
|
# "bottles. Reject this proposal or handle the capability change "
|
||||||
|
# "manually, then restart the bottle."
|
||||||
|
# )
|
||||||
|
# diff_before, diff_after = apply_capability_change(
|
||||||
|
# qp.proposal.bottle_slug, file_to_apply,
|
||||||
|
# )
|
||||||
if qp.proposal.tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
|
if qp.proposal.tool in (TOOL_EGRESS_ALLOW, TOOL_EGRESS_BLOCK):
|
||||||
diff_before, diff_after = apply_routes_change(
|
diff_before, diff_after = apply_routes_change(
|
||||||
qp.proposal.bottle_slug,
|
qp.proposal.bottle_slug,
|
||||||
@@ -170,6 +194,9 @@ def approve(
|
|||||||
qp, action=status, notes=notes,
|
qp, action=status, notes=notes,
|
||||||
diff_before=diff_before, diff_after=diff_after,
|
diff_before=diff_before, diff_after=diff_after,
|
||||||
)
|
)
|
||||||
|
if qp.proposal.tool == TOOL_CAPABILITY_BLOCK:
|
||||||
|
archive_proposal(qp.queue_dir, qp.proposal.id)
|
||||||
|
|
||||||
|
|
||||||
def reject(qp: QueuedProposal, *, reason: str) -> None:
|
def reject(qp: QueuedProposal, *, reason: str) -> None:
|
||||||
"""Write a rejection response and an audit entry."""
|
"""Write a rejection response and an audit entry."""
|
||||||
@@ -319,7 +346,7 @@ def _list_once() -> int:
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def _try_init_green() -> int: # pragma: no cover
|
def _try_init_green() -> int:
|
||||||
"""Initialise a green color pair and return its attr, or 0."""
|
"""Initialise a green color pair and return its attr, or 0."""
|
||||||
try:
|
try:
|
||||||
curses.start_color()
|
curses.start_color()
|
||||||
@@ -330,7 +357,7 @@ def _try_init_green() -> int: # pragma: no cover
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore # pragma: no cover
|
def _main_loop(stdscr: "curses._CursesWindow") -> None: # type: ignore
|
||||||
curses.curs_set(0)
|
curses.curs_set(0)
|
||||||
stdscr.timeout(_REFRESH_INTERVAL_MS)
|
stdscr.timeout(_REFRESH_INTERVAL_MS)
|
||||||
green_attr = _try_init_green()
|
green_attr = _try_init_green()
|
||||||
@@ -420,7 +447,7 @@ def _render(
|
|||||||
status_line: str,
|
status_line: str,
|
||||||
*,
|
*,
|
||||||
green_attr: int = 0, # noqa: F841 — unused, but required by interface
|
green_attr: int = 0, # noqa: F841 — unused, but required by interface
|
||||||
) -> None: # pragma: no cover
|
) -> None:
|
||||||
stdscr.erase()
|
stdscr.erase()
|
||||||
h, w = stdscr.getmaxyx()
|
h, w = stdscr.getmaxyx()
|
||||||
header = f"bot-bottle supervise ({len(pending)} pending)"
|
header = f"bot-bottle supervise ({len(pending)} pending)"
|
||||||
@@ -471,7 +498,7 @@ def _detail_view(
|
|||||||
qp: QueuedProposal,
|
qp: QueuedProposal,
|
||||||
*,
|
*,
|
||||||
green_attr: int = 0,
|
green_attr: int = 0,
|
||||||
) -> None: # pragma: no cover
|
) -> None:
|
||||||
"""Render the full proposal. Scrollable. Press q to return."""
|
"""Render the full proposal. Scrollable. Press q to return."""
|
||||||
lines = _detail_lines(qp, green_attr=green_attr)
|
lines = _detail_lines(qp, green_attr=green_attr)
|
||||||
offset = 0
|
offset = 0
|
||||||
@@ -523,7 +550,7 @@ def _detail_view(
|
|||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore # pragma: no cover
|
def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None: # type: ignore
|
||||||
"""Suspend curses, open $EDITOR on the proposed file, return edited content."""
|
"""Suspend curses, open $EDITOR on the proposed file, return edited content."""
|
||||||
suffix = _suffix_for_tool(qp.proposal.tool)
|
suffix = _suffix_for_tool(qp.proposal.tool)
|
||||||
curses.endwin()
|
curses.endwin()
|
||||||
@@ -534,7 +561,7 @@ def _modify(stdscr: "curses._CursesWindow", qp: QueuedProposal) -> str | None:
|
|||||||
return edited
|
return edited
|
||||||
|
|
||||||
|
|
||||||
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore # pragma: no cover
|
def _prompt(stdscr: "curses._CursesWindow", label: str) -> str: # type: ignore
|
||||||
"""One-line input at the bottom of the screen."""
|
"""One-line input at the bottom of the screen."""
|
||||||
curses.curs_set(1)
|
curses.curs_set(1)
|
||||||
h, _ = stdscr.getmaxyx()
|
h, _ = stdscr.getmaxyx()
|
||||||
|
|||||||
+21
-10
@@ -210,6 +210,17 @@ def egress_token_env_map(
|
|||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _yaml_str_escape(s: str) -> str:
|
||||||
|
"""Escape a string for use inside a YAML double-quoted scalar."""
|
||||||
|
return (
|
||||||
|
s.replace("\\", "\\\\")
|
||||||
|
.replace('"', '\\"')
|
||||||
|
.replace("\n", "\\n")
|
||||||
|
.replace("\r", "\\r")
|
||||||
|
.replace("\t", "\\t")
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _route_to_yaml_fields(r: Route) -> dict[str, object]:
|
def _route_to_yaml_fields(r: Route) -> dict[str, object]:
|
||||||
fields: dict[str, object] = {"host": r.host}
|
fields: dict[str, object] = {"host": r.host}
|
||||||
if r.auth_scheme and r.token_env:
|
if r.auth_scheme and r.token_env:
|
||||||
@@ -272,12 +283,12 @@ def _render_match_entry(entry: dict[str, object]) -> list[str]:
|
|||||||
for pd in entry["paths"]: # type: ignore[union-attr]
|
for pd in entry["paths"]: # type: ignore[union-attr]
|
||||||
pd_dict: dict[str, str] = pd # type: ignore[assignment]
|
pd_dict: dict[str, str] = pd # type: ignore[assignment]
|
||||||
if "type" in pd_dict:
|
if "type" in pd_dict:
|
||||||
lines.append(f' - type: "{pd_dict["type"]}"')
|
lines.append(f' - type: "{_yaml_str_escape(pd_dict["type"])}"')
|
||||||
lines.append(f' value: "{pd_dict["value"]}"')
|
lines.append(f' value: "{_yaml_str_escape(pd_dict["value"])}"')
|
||||||
else:
|
else:
|
||||||
lines.append(f' - value: "{pd_dict["value"]}"')
|
lines.append(f' - value: "{_yaml_str_escape(pd_dict["value"])}"')
|
||||||
if "methods" in entry:
|
if "methods" in entry:
|
||||||
methods_str = ", ".join(f'"{m}"' for m in entry["methods"]) # type: ignore[union-attr]
|
methods_str = ", ".join(f'"{_yaml_str_escape(m)}"' for m in entry["methods"]) # type: ignore[union-attr]
|
||||||
prefix = " - " if first_key else " "
|
prefix = " - " if first_key else " "
|
||||||
lines.append(f'{prefix}methods: [{methods_str}]')
|
lines.append(f'{prefix}methods: [{methods_str}]')
|
||||||
first_key = False
|
first_key = False
|
||||||
@@ -287,8 +298,8 @@ def _render_match_entry(entry: dict[str, object]) -> list[str]:
|
|||||||
first_key = False
|
first_key = False
|
||||||
for hd in entry["headers"]: # type: ignore[union-attr]
|
for hd in entry["headers"]: # type: ignore[union-attr]
|
||||||
hd_dict: dict[str, str] = hd # type: ignore[assignment]
|
hd_dict: dict[str, str] = hd # type: ignore[assignment]
|
||||||
lines.append(f' - name: "{hd_dict["name"]}"')
|
lines.append(f' - name: "{_yaml_str_escape(hd_dict["name"])}"')
|
||||||
lines.append(f' value: "{hd_dict["value"]}"')
|
lines.append(f' value: "{_yaml_str_escape(hd_dict["value"])}"')
|
||||||
if first_key:
|
if first_key:
|
||||||
lines.append(" - {}")
|
lines.append(" - {}")
|
||||||
return lines
|
return lines
|
||||||
@@ -308,10 +319,10 @@ def egress_render_routes(
|
|||||||
return "\n".join(lines) + "\n"
|
return "\n".join(lines) + "\n"
|
||||||
for r in routes:
|
for r in routes:
|
||||||
f = _route_to_yaml_fields(r)
|
f = _route_to_yaml_fields(r)
|
||||||
lines.append(f' - host: "{f["host"]}"')
|
lines.append(f' - host: "{_yaml_str_escape(str(f["host"]))}"')
|
||||||
if "auth_scheme" in f:
|
if "auth_scheme" in f:
|
||||||
lines.append(f' auth_scheme: "{f["auth_scheme"]}"')
|
lines.append(f' auth_scheme: "{_yaml_str_escape(str(f["auth_scheme"]))}"')
|
||||||
lines.append(f' token_env: "{f["token_env"]}"')
|
lines.append(f' token_env: "{_yaml_str_escape(str(f["token_env"]))}"')
|
||||||
if "matches" in f:
|
if "matches" in f:
|
||||||
lines.append(" matches:")
|
lines.append(" matches:")
|
||||||
for entry in f["matches"]: # type: ignore[union-attr]
|
for entry in f["matches"]: # type: ignore[union-attr]
|
||||||
@@ -331,7 +342,7 @@ def egress_render_routes(
|
|||||||
items_str = ", ".join(f'"{x}"' for x in dv)
|
items_str = ", ".join(f'"{x}"' for x in dv)
|
||||||
lines.append(f" {dk}: [{items_str}]")
|
lines.append(f" {dk}: [{items_str}]")
|
||||||
elif isinstance(dv, str):
|
elif isinstance(dv, str):
|
||||||
lines.append(f' {dk}: "{dv}"')
|
lines.append(f' {dk}: "{_yaml_str_escape(dv)}"')
|
||||||
return "\n".join(lines) + "\n"
|
return "\n".join(lines) + "\n"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -112,6 +112,15 @@ def git_gate_upstreams_for_bottle(bottle: ManifestBottle) -> tuple[GitGateUpstre
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _gitconfig_validate_value(field: str, value: str) -> None:
|
||||||
|
"""Raise ValueError if value contains characters that break gitconfig line syntax."""
|
||||||
|
if "\n" in value or "\r" in value:
|
||||||
|
raise ValueError(
|
||||||
|
f"git-gate: {field} contains a newline, which would inject "
|
||||||
|
f"arbitrary gitconfig keys; rejecting manifest entry"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def git_gate_render_gitconfig(
|
def git_gate_render_gitconfig(
|
||||||
entries: tuple[ManifestGitEntry, ...], gate_host: str, *, scheme: str = "git",
|
entries: tuple[ManifestGitEntry, ...], gate_host: str, *, scheme: str = "git",
|
||||||
) -> str:
|
) -> str:
|
||||||
@@ -136,6 +145,7 @@ def git_gate_render_gitconfig(
|
|||||||
"# fetch-from-upstream-before-every-upload-pack via access-hook).\n",
|
"# fetch-from-upstream-before-every-upload-pack via access-hook).\n",
|
||||||
]
|
]
|
||||||
for entry in entries:
|
for entry in entries:
|
||||||
|
_gitconfig_validate_value(f"repos[{entry.Name!r}].url", entry.Upstream)
|
||||||
out.append(f'[url "{scheme}://{gate_host}/{entry.Name}.git"]\n')
|
out.append(f'[url "{scheme}://{gate_host}/{entry.Name}.git"]\n')
|
||||||
out.append(f"\tinsteadOf = {entry.Upstream}\n")
|
out.append(f"\tinsteadOf = {entry.Upstream}\n")
|
||||||
if entry.RemoteKey and entry.RemoteKey != entry.UpstreamHost:
|
if entry.RemoteKey and entry.RemoteKey != entry.UpstreamHost:
|
||||||
@@ -148,6 +158,7 @@ def git_gate_render_gitconfig(
|
|||||||
f"ssh://{entry.UpstreamUser}@{entry.RemoteKey}{port}/"
|
f"ssh://{entry.UpstreamUser}@{entry.RemoteKey}{port}/"
|
||||||
f"{entry.UpstreamPath}"
|
f"{entry.UpstreamPath}"
|
||||||
)
|
)
|
||||||
|
_gitconfig_validate_value(f"repos[{entry.Name!r}].url (resolved alias)", alias)
|
||||||
out.append(f"\tinsteadOf = {alias}\n")
|
out.append(f"\tinsteadOf = {alias}\n")
|
||||||
return "".join(out)
|
return "".join(out)
|
||||||
|
|
||||||
|
|||||||
@@ -4,4 +4,3 @@
|
|||||||
|
|
||||||
pylint>=3.0.0
|
pylint>=3.0.0
|
||||||
pyright>=1.1.300
|
pyright>=1.1.300
|
||||||
coverage>=7.0.0
|
|
||||||
|
|||||||
@@ -92,9 +92,9 @@ class TestSandboxEscape(unittest.TestCase):
|
|||||||
"on PATH: curl -sSL https://smolmachines.com/install.sh | sh"
|
"on PATH: curl -sSL https://smolmachines.com/install.sh | sh"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Throwaway static key for the git-gate fixture. It need not
|
# Throwaway "identity file" for the git-gate's `identity` field.
|
||||||
# be a real SSH key: test 5 reaches gitleaks before any SSH
|
# It need not be a real SSH key: test 5 reaches gitleaks before
|
||||||
# attempt anyway.
|
# any SSH attempt anyway.
|
||||||
fd, kp = tempfile.mkstemp(prefix="sandbox-test-key.")
|
fd, kp = tempfile.mkstemp(prefix="sandbox-test-key.")
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
cls._key_path = Path(kp)
|
cls._key_path = Path(kp)
|
||||||
@@ -123,10 +123,7 @@ class TestSandboxEscape(unittest.TestCase):
|
|||||||
"git-gate": {"repos": {
|
"git-gate": {"repos": {
|
||||||
"throwaway": {
|
"throwaway": {
|
||||||
"url": "ssh://git@unreachable.invalid:22/throwaway.git",
|
"url": "ssh://git@unreachable.invalid:22/throwaway.git",
|
||||||
"key": {
|
"identity": str(cls._key_path),
|
||||||
"provider": "static",
|
|
||||||
"path": str(cls._key_path),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}},
|
}},
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -198,7 +198,6 @@ class TestSmolmachinesLaunch(unittest.TestCase):
|
|||||||
# connect fails, which is the property chunk 3 will
|
# connect fails, which is the property chunk 3 will
|
||||||
# preserve once egress is actually running.
|
# preserve once egress is actually running.
|
||||||
r = self.bottle.exec(
|
r = self.bottle.exec(
|
||||||
"env -u HTTPS_PROXY -u HTTP_PROXY -u https_proxy -u http_proxy "
|
|
||||||
f"curl -s --show-error --max-time 3 http://{self.plan.bundle_ip}:9099 "
|
f"curl -s --show-error --max-time 3 http://{self.plan.bundle_ip}:9099 "
|
||||||
"2>&1 || true"
|
"2>&1 || true"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ from bot_bottle.egress import (
|
|||||||
Egress,
|
Egress,
|
||||||
EgressPlan,
|
EgressPlan,
|
||||||
EgressRoute,
|
EgressRoute,
|
||||||
|
_yaml_str_escape,
|
||||||
egress_agent_env_entries,
|
egress_agent_env_entries,
|
||||||
egress_manifest_routes,
|
egress_manifest_routes,
|
||||||
egress_render_routes,
|
egress_render_routes,
|
||||||
@@ -419,6 +420,76 @@ class TestRenderRoutes(unittest.TestCase):
|
|||||||
self.assertEqual(LOG_BLOCKS, cfg.log)
|
self.assertEqual(LOG_BLOCKS, cfg.log)
|
||||||
|
|
||||||
|
|
||||||
|
class TestYamlStrEscape(unittest.TestCase):
|
||||||
|
"""_yaml_str_escape produces safe YAML double-quoted scalar content."""
|
||||||
|
|
||||||
|
def test_plain_string_unchanged(self):
|
||||||
|
self.assertEqual("api.example.com", _yaml_str_escape("api.example.com"))
|
||||||
|
|
||||||
|
def test_double_quote_escaped(self):
|
||||||
|
self.assertEqual('\\"', _yaml_str_escape('"'))
|
||||||
|
|
||||||
|
def test_backslash_escaped(self):
|
||||||
|
self.assertEqual("\\\\", _yaml_str_escape("\\"))
|
||||||
|
|
||||||
|
def test_newline_escaped(self):
|
||||||
|
self.assertEqual("\\n", _yaml_str_escape("\n"))
|
||||||
|
|
||||||
|
def test_carriage_return_escaped(self):
|
||||||
|
self.assertEqual("\\r", _yaml_str_escape("\r"))
|
||||||
|
|
||||||
|
def test_tab_escaped(self):
|
||||||
|
self.assertEqual("\\t", _yaml_str_escape("\t"))
|
||||||
|
|
||||||
|
def test_combined(self):
|
||||||
|
self.assertEqual('\\"\\n\\\\', _yaml_str_escape('"\n\\'))
|
||||||
|
|
||||||
|
|
||||||
|
class TestRenderRoutesEscaping(unittest.TestCase):
|
||||||
|
"""Stray quotes/newlines in manifest strings do not corrupt routes.yaml."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parsed(routes) -> list[dict]: # type: ignore
|
||||||
|
return parse_yaml_subset(egress_render_routes(routes))["routes"] # type: ignore
|
||||||
|
|
||||||
|
def test_host_with_double_quote_round_trips(self):
|
||||||
|
routes = (EgressRoute(host='bad"host.example'),)
|
||||||
|
parsed = self._parsed(routes)
|
||||||
|
self.assertEqual('bad"host.example', parsed[0]["host"])
|
||||||
|
|
||||||
|
def test_host_with_newline_round_trips(self):
|
||||||
|
routes = (EgressRoute(host="host\nextra.example"),)
|
||||||
|
parsed = self._parsed(routes)
|
||||||
|
self.assertEqual("host\nextra.example", parsed[0]["host"])
|
||||||
|
|
||||||
|
def test_auth_scheme_with_double_quote_round_trips(self):
|
||||||
|
routes = (EgressRoute(
|
||||||
|
host="api.example",
|
||||||
|
auth_scheme='Bear"er',
|
||||||
|
token_env="EGRESS_TOKEN_0",
|
||||||
|
),)
|
||||||
|
parsed = self._parsed(routes)
|
||||||
|
self.assertEqual('Bear"er', parsed[0]["auth_scheme"])
|
||||||
|
|
||||||
|
def test_path_value_with_double_quote_round_trips(self):
|
||||||
|
from bot_bottle.egress_addon_core import PathMatch, MatchEntry
|
||||||
|
routes = (EgressRoute(
|
||||||
|
host="api.example",
|
||||||
|
matches=(MatchEntry(paths=(PathMatch(type="prefix", value='/v1/"quoted"/'),)),),
|
||||||
|
),)
|
||||||
|
parsed = self._parsed(routes)
|
||||||
|
self.assertEqual('/v1/"quoted"/', parsed[0]["matches"][0]["paths"][0]["value"])
|
||||||
|
|
||||||
|
def test_header_value_with_double_quote_round_trips(self):
|
||||||
|
from bot_bottle.egress_addon_core import HeaderMatch, MatchEntry
|
||||||
|
routes = (EgressRoute(
|
||||||
|
host="api.example",
|
||||||
|
matches=(MatchEntry(headers=(HeaderMatch(name="x-h", value='val"ue'),)),),
|
||||||
|
),)
|
||||||
|
parsed = self._parsed(routes)
|
||||||
|
self.assertEqual('val"ue', parsed[0]["matches"][0]["headers"][0]["value"])
|
||||||
|
|
||||||
|
|
||||||
class TestResolveTokenValues(unittest.TestCase):
|
class TestResolveTokenValues(unittest.TestCase):
|
||||||
def test_reads_host_env(self):
|
def test_reads_host_env(self):
|
||||||
out = egress_resolve_token_values(
|
out = egress_resolve_token_values(
|
||||||
|
|||||||
@@ -73,33 +73,6 @@ resolver #2
|
|||||||
)
|
)
|
||||||
self.assertTrue(run.call_args_list[-1].kwargs["check"])
|
self.assertTrue(run.call_args_list[-1].kwargs["check"])
|
||||||
|
|
||||||
def test_build_image_anchors_relative_dockerfile_to_context(self):
|
|
||||||
status = util.subprocess.CompletedProcess(
|
|
||||||
args=[],
|
|
||||||
returncode=0,
|
|
||||||
stdout=(
|
|
||||||
'[{"status":{"state":"running"},'
|
|
||||||
'"configuration":{"dns":{"nameservers":["9.9.9.9"]}}}]'
|
|
||||||
),
|
|
||||||
stderr="",
|
|
||||||
)
|
|
||||||
with patch.object(util.subprocess, "run", return_value=status) as run, \
|
|
||||||
patch.object(util.os, "environ", {
|
|
||||||
"BOT_BOTTLE_MACOS_CONTAINER_DNS": "9.9.9.9",
|
|
||||||
}):
|
|
||||||
util.build_image(
|
|
||||||
"bot-bottle-sidecars:latest",
|
|
||||||
"/repo",
|
|
||||||
dockerfile="Dockerfile.sidecars",
|
|
||||||
)
|
|
||||||
self.assertEqual(
|
|
||||||
[
|
|
||||||
"container", "build", "-t", "bot-bottle-sidecars:latest",
|
|
||||||
"--dns", "9.9.9.9", "-f", "/repo/Dockerfile.sidecars", "/repo",
|
|
||||||
],
|
|
||||||
run.call_args_list[-1].args[0],
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_commit_container_execs_tar_and_builds_image(self):
|
def test_commit_container_execs_tar_and_builds_image(self):
|
||||||
# stderr is bytes because subprocess.run uses stderr=PIPE without text=True
|
# stderr is bytes because subprocess.run uses stderr=PIPE without text=True
|
||||||
completed = util.subprocess.CompletedProcess(
|
completed = util.subprocess.CompletedProcess(
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import unittest
|
|||||||
|
|
||||||
from bot_bottle.git_gate import (
|
from bot_bottle.git_gate import (
|
||||||
GIT_GATE_HOSTNAME,
|
GIT_GATE_HOSTNAME,
|
||||||
|
_gitconfig_validate_value,
|
||||||
git_gate_render_gitconfig,
|
git_gate_render_gitconfig,
|
||||||
)
|
)
|
||||||
from bot_bottle.manifest import ManifestIndex
|
from bot_bottle.manifest import ManifestIndex
|
||||||
@@ -90,5 +91,42 @@ class TestGitGateGitconfigRender(unittest.TestCase):
|
|||||||
self.assertNotIn("gitea.dideric.is", out)
|
self.assertNotIn("gitea.dideric.is", out)
|
||||||
|
|
||||||
|
|
||||||
|
class TestGitconfigValidateValue(unittest.TestCase):
|
||||||
|
"""_gitconfig_validate_value rejects values that would inject gitconfig keys."""
|
||||||
|
|
||||||
|
def test_normal_url_passes(self):
|
||||||
|
_gitconfig_validate_value("url", "ssh://git@github.com/owner/repo.git")
|
||||||
|
|
||||||
|
def test_newline_in_url_raises(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
_gitconfig_validate_value("url", "ssh://git@github.com/owner/\nrepo.git")
|
||||||
|
|
||||||
|
def test_carriage_return_in_url_raises(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
_gitconfig_validate_value("url", "ssh://git@github.com/\rrepo.git")
|
||||||
|
|
||||||
|
def test_error_message_names_field(self):
|
||||||
|
with self.assertRaises(ValueError, msg="error should name the field") as ctx:
|
||||||
|
_gitconfig_validate_value("repos['bad'].url", "ssh://host/\npath")
|
||||||
|
self.assertIn("repos['bad'].url", str(ctx.exception))
|
||||||
|
|
||||||
|
|
||||||
|
class TestGitconfigRenderRejectsNewlineInUpstream(unittest.TestCase):
|
||||||
|
"""git_gate_render_gitconfig raises on Upstream values with newlines."""
|
||||||
|
|
||||||
|
def test_newline_in_upstream_raises(self):
|
||||||
|
m = ManifestIndex.from_json_obj({
|
||||||
|
"bottles": {"dev": {"git-gate": {"repos": {
|
||||||
|
"evil": {
|
||||||
|
"url": "ssh://git@github.com/owner/\nfake-key = injected\nrepo.git",
|
||||||
|
"key": {"provider": "static", "path": "/dev/null"},
|
||||||
|
},
|
||||||
|
}}}},
|
||||||
|
"agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
|
||||||
|
})
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
git_gate_render_gitconfig(m.bottles["dev"].git, GIT_GATE_HOSTNAME)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -20,7 +20,6 @@ import supervise as _sv # noqa: E402 # type: ignore
|
|||||||
|
|
||||||
from bot_bottle import supervise_server # noqa: E402
|
from bot_bottle import supervise_server # noqa: E402
|
||||||
from bot_bottle.supervise_server import (
|
from bot_bottle.supervise_server import (
|
||||||
ERR_INTERNAL,
|
|
||||||
ERR_INVALID_PARAMS,
|
ERR_INVALID_PARAMS,
|
||||||
ERR_INVALID_REQUEST,
|
ERR_INVALID_REQUEST,
|
||||||
ERR_METHOD_NOT_FOUND,
|
ERR_METHOD_NOT_FOUND,
|
||||||
@@ -30,9 +29,7 @@ from bot_bottle.supervise_server import (
|
|||||||
PROPOSED_FILE_FIELD,
|
PROPOSED_FILE_FIELD,
|
||||||
ServerConfig,
|
ServerConfig,
|
||||||
TOOL_DEFINITIONS,
|
TOOL_DEFINITIONS,
|
||||||
_RpcClientError,
|
|
||||||
_RpcError,
|
_RpcError,
|
||||||
_RpcInternalError,
|
|
||||||
_response_timeout_from_env,
|
_response_timeout_from_env,
|
||||||
format_response_text,
|
format_response_text,
|
||||||
handle_initialize,
|
handle_initialize,
|
||||||
@@ -50,15 +47,15 @@ from bot_bottle.supervise_server import (
|
|||||||
|
|
||||||
|
|
||||||
class TestValidation(unittest.TestCase):
|
class TestValidation(unittest.TestCase):
|
||||||
|
def test_capability_block_accepts_anything_nonempty(self):
|
||||||
|
validate_proposed_file(
|
||||||
|
_sv.TOOL_CAPABILITY_BLOCK,
|
||||||
|
"FROM python:3.13\nRUN apk add git\n",
|
||||||
|
)
|
||||||
|
|
||||||
def test_empty_proposed_file_rejected_for_tools_with_file_field(self):
|
def test_empty_proposed_file_rejected_for_tools_with_file_field(self):
|
||||||
with self.assertRaises(_RpcError):
|
with self.assertRaises(_RpcError):
|
||||||
validate_proposed_file(_sv.TOOL_EGRESS_ALLOW, " \n\t")
|
validate_proposed_file(_sv.TOOL_CAPABILITY_BLOCK, " \n\t")
|
||||||
|
|
||||||
def test_capability_block_rejected_as_unknown_tool(self):
|
|
||||||
with self.assertRaises(_RpcError) as cm:
|
|
||||||
validate_proposed_file("capability-block", "FROM python:3.13\n")
|
|
||||||
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
|
|
||||||
self.assertIn("unknown tool", cm.exception.message)
|
|
||||||
|
|
||||||
def test_egress_routes_yaml_is_validated(self):
|
def test_egress_routes_yaml_is_validated(self):
|
||||||
validate_proposed_file(
|
validate_proposed_file(
|
||||||
@@ -80,65 +77,6 @@ class TestValidation(unittest.TestCase):
|
|||||||
self.assertIn("must not change egress logging", cm.exception.message)
|
self.assertIn("must not change egress logging", cm.exception.message)
|
||||||
|
|
||||||
|
|
||||||
# --- Error taxonomy --------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
class TestRpcErrorTaxonomy(unittest.TestCase):
|
|
||||||
def test_rpc_client_error_is_rpc_error(self):
|
|
||||||
e = _RpcClientError(ERR_INVALID_PARAMS, "bad param")
|
|
||||||
self.assertIsInstance(e, _RpcError)
|
|
||||||
self.assertEqual(ERR_INVALID_PARAMS, e.code)
|
|
||||||
self.assertEqual("bad param", e.message)
|
|
||||||
|
|
||||||
def test_rpc_internal_error_is_rpc_error(self):
|
|
||||||
e = _RpcInternalError("disk full")
|
|
||||||
self.assertIsInstance(e, _RpcError)
|
|
||||||
self.assertEqual(ERR_INTERNAL, e.code)
|
|
||||||
self.assertEqual("disk full", e.message)
|
|
||||||
|
|
||||||
def test_rpc_internal_error_preserves_cause(self):
|
|
||||||
cause = OSError("no space left on device")
|
|
||||||
try:
|
|
||||||
raise _RpcInternalError("failed to write") from cause
|
|
||||||
except _RpcInternalError as e:
|
|
||||||
self.assertIs(cause, e.__cause__)
|
|
||||||
|
|
||||||
def test_parse_error_is_client_error(self):
|
|
||||||
with self.assertRaises(_RpcClientError):
|
|
||||||
parse_jsonrpc(b"{bad json")
|
|
||||||
|
|
||||||
def test_validation_error_is_client_error(self):
|
|
||||||
with self.assertRaises(_RpcClientError):
|
|
||||||
validate_proposed_file(_sv.TOOL_EGRESS_ALLOW, "routes: nope\n")
|
|
||||||
|
|
||||||
def test_unknown_tool_in_tools_call_is_client_error(self):
|
|
||||||
config = ServerConfig(bottle_slug="dev", queue_dir=Path("/unused"))
|
|
||||||
with self.assertRaises(_RpcClientError) as cm:
|
|
||||||
handle_tools_call({"name": "no-such-tool", "arguments": {}}, config)
|
|
||||||
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
|
|
||||||
|
|
||||||
|
|
||||||
class TestRpcInternalErrorOnIoFailure(unittest.TestCase):
|
|
||||||
def test_write_proposal_os_error_raises_internal(self):
|
|
||||||
config = ServerConfig(
|
|
||||||
bottle_slug="dev",
|
|
||||||
queue_dir=Path("/dev/null/cannot-exist"),
|
|
||||||
)
|
|
||||||
with self.assertRaises(_RpcInternalError) as cm:
|
|
||||||
handle_tools_call(
|
|
||||||
{
|
|
||||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
|
||||||
"arguments": {
|
|
||||||
"routes_yaml": "routes:\n - host: example.com\n",
|
|
||||||
"justification": "x",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
config,
|
|
||||||
)
|
|
||||||
self.assertEqual(ERR_INTERNAL, cm.exception.code)
|
|
||||||
self.assertIsNotNone(cm.exception.__cause__)
|
|
||||||
|
|
||||||
|
|
||||||
# --- JSON-RPC parsing ------------------------------------------------------
|
# --- JSON-RPC parsing ------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
@@ -219,6 +157,7 @@ class TestHandleToolsList(unittest.TestCase):
|
|||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
sorted([
|
sorted([
|
||||||
_sv.TOOL_EGRESS_ALLOW,
|
_sv.TOOL_EGRESS_ALLOW,
|
||||||
|
_sv.TOOL_CAPABILITY_BLOCK,
|
||||||
_sv.TOOL_EGRESS_BLOCK,
|
_sv.TOOL_EGRESS_BLOCK,
|
||||||
_sv.TOOL_LIST_EGRESS_ROUTES,
|
_sv.TOOL_LIST_EGRESS_ROUTES,
|
||||||
]),
|
]),
|
||||||
@@ -294,10 +233,10 @@ class TestHandleToolsCall(unittest.TestCase):
|
|||||||
try:
|
try:
|
||||||
result = handle_tools_call(
|
result = handle_tools_call(
|
||||||
{
|
{
|
||||||
"name": _sv.TOOL_EGRESS_BLOCK,
|
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||||
"arguments": {
|
"arguments": {
|
||||||
"routes_yaml": "routes:\n - host: example.com\n",
|
"dockerfile": "FROM python:3.13\n",
|
||||||
"justification": "need example.com",
|
"justification": "need git",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
self.config,
|
self.config,
|
||||||
@@ -334,9 +273,9 @@ class TestHandleToolsCall(unittest.TestCase):
|
|||||||
try:
|
try:
|
||||||
result = handle_tools_call(
|
result = handle_tools_call(
|
||||||
{
|
{
|
||||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||||
"arguments": {
|
"arguments": {
|
||||||
"routes_yaml": "routes:\n - host: example.com\n",
|
"dockerfile": "FROM python:3.13\n",
|
||||||
"justification": "needed for tests",
|
"justification": "needed for tests",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -358,52 +297,20 @@ class TestHandleToolsCall(unittest.TestCase):
|
|||||||
with self.assertRaises(_RpcError):
|
with self.assertRaises(_RpcError):
|
||||||
handle_tools_call(
|
handle_tools_call(
|
||||||
{
|
{
|
||||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||||
"arguments": {"routes_yaml": "routes:\n - host: example.com\n"},
|
"arguments": {"dockerfile": "FROM python:3.13\n"},
|
||||||
},
|
},
|
||||||
self.config,
|
self.config,
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_missing_name_raises(self):
|
|
||||||
with self.assertRaises(_RpcError) as cm:
|
|
||||||
handle_tools_call({"arguments": {}}, self.config)
|
|
||||||
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
|
|
||||||
|
|
||||||
def test_arguments_must_be_object(self):
|
|
||||||
with self.assertRaises(_RpcError) as cm:
|
|
||||||
handle_tools_call(
|
|
||||||
{
|
|
||||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
|
||||||
"arguments": [],
|
|
||||||
},
|
|
||||||
self.config,
|
|
||||||
)
|
|
||||||
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
|
|
||||||
self.assertIn("must be an object", cm.exception.message)
|
|
||||||
|
|
||||||
def test_capability_block_call_raises_unknown_tool(self):
|
|
||||||
with self.assertRaises(_RpcError) as cm:
|
|
||||||
handle_tools_call(
|
|
||||||
{
|
|
||||||
"name": "capability-block",
|
|
||||||
"arguments": {
|
|
||||||
"dockerfile": "FROM python:3.13\n",
|
|
||||||
"justification": "need git",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
self.config,
|
|
||||||
)
|
|
||||||
self.assertEqual(ERR_INVALID_PARAMS, cm.exception.code)
|
|
||||||
self.assertIn("unknown tool", cm.exception.message)
|
|
||||||
|
|
||||||
def test_archives_proposal_after_response(self):
|
def test_archives_proposal_after_response(self):
|
||||||
responder = self._respond_when_proposal_appears(_sv.STATUS_APPROVED)
|
responder = self._respond_when_proposal_appears(_sv.STATUS_APPROVED)
|
||||||
try:
|
try:
|
||||||
handle_tools_call(
|
handle_tools_call(
|
||||||
{
|
{
|
||||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||||
"arguments": {
|
"arguments": {
|
||||||
"routes_yaml": "routes:\n - host: example.com\n",
|
"dockerfile": "FROM python:3.13\n",
|
||||||
"justification": "x",
|
"justification": "x",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -425,10 +332,10 @@ class TestHandleToolsCall(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
result = handle_tools_call(
|
result = handle_tools_call(
|
||||||
{
|
{
|
||||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
"name": _sv.TOOL_CAPABILITY_BLOCK,
|
||||||
"arguments": {
|
"arguments": {
|
||||||
"routes_yaml": "routes:\n - host: example.com\n",
|
"dockerfile": "FROM python:3.13\n",
|
||||||
"justification": "need egress",
|
"justification": "need a capability",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
config,
|
config,
|
||||||
@@ -443,31 +350,6 @@ class TestHandleToolsCall(unittest.TestCase):
|
|||||||
|
|
||||||
|
|
||||||
class TestHandleListEgressRoutes(unittest.TestCase):
|
class TestHandleListEgressRoutes(unittest.TestCase):
|
||||||
def test_success_returns_body_text(self):
|
|
||||||
class _Resp:
|
|
||||||
def __enter__(self):
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __exit__(self, exc_type: type[BaseException] | None, exc: BaseException | None, tb: object) -> bool:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def read(self):
|
|
||||||
return b"[{\"host\": \"example.com\"}]"
|
|
||||||
|
|
||||||
class _Opener:
|
|
||||||
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore
|
|
||||||
return _Resp()
|
|
||||||
|
|
||||||
with patch.object(supervise_server.urllib.request, "build_opener", return_value=_Opener()):
|
|
||||||
result = handle_list_egress_routes(
|
|
||||||
{},
|
|
||||||
ServerConfig(bottle_slug="dev", queue_dir=Path("/unused")),
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertFalse(result["isError"]) # type: ignore[index]
|
|
||||||
text = result["content"][0]["text"] # type: ignore[index]
|
|
||||||
self.assertIn("example.com", text)
|
|
||||||
|
|
||||||
def test_url_error_returns_tool_error(self):
|
def test_url_error_returns_tool_error(self):
|
||||||
class _Opener:
|
class _Opener:
|
||||||
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore
|
def open(self, *args, **kwargs): # noqa: ANN001, ANN002, ANN003 # type: ignore
|
||||||
@@ -527,13 +409,6 @@ class TestFormatResponseText(unittest.TestCase):
|
|||||||
self.assertIn("the operator modified", text.lower())
|
self.assertIn("the operator modified", text.lower())
|
||||||
|
|
||||||
|
|
||||||
class TestFormatPendingResponseText(unittest.TestCase):
|
|
||||||
def test_formats_timeout_message(self):
|
|
||||||
text = supervise_server.format_pending_response_text(12.5)
|
|
||||||
self.assertIn("status: pending", text)
|
|
||||||
self.assertIn("12.5s", text)
|
|
||||||
|
|
||||||
|
|
||||||
# --- End-to-end HTTP sanity ------------------------------------------------
|
# --- End-to-end HTTP sanity ------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
@@ -584,7 +459,7 @@ class TestHttpEndToEnd(unittest.TestCase):
|
|||||||
self.assertEqual("2.0", result["jsonrpc"])
|
self.assertEqual("2.0", result["jsonrpc"])
|
||||||
self.assertEqual(1, result["id"])
|
self.assertEqual(1, result["id"])
|
||||||
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
|
names = [t["name"] for t in result["result"]["tools"]] # type: ignore[index]
|
||||||
self.assertNotIn("capability-block", names)
|
self.assertIn(_sv.TOOL_CAPABILITY_BLOCK, names)
|
||||||
self.assertIn(_sv.TOOL_EGRESS_ALLOW, names)
|
self.assertIn(_sv.TOOL_EGRESS_ALLOW, names)
|
||||||
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
|
self.assertIn(_sv.TOOL_EGRESS_BLOCK, names)
|
||||||
|
|
||||||
@@ -594,26 +469,6 @@ class TestHttpEndToEnd(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
self.assertEqual(ERR_METHOD_NOT_FOUND, result["error"]["code"]) # type: ignore[index]
|
self.assertEqual(ERR_METHOD_NOT_FOUND, result["error"]["code"]) # type: ignore[index]
|
||||||
|
|
||||||
def test_internal_error_returns_err_internal_over_http(self):
|
|
||||||
with patch.object(
|
|
||||||
supervise_server._sv, "write_proposal",
|
|
||||||
side_effect=OSError("disk full"),
|
|
||||||
):
|
|
||||||
result = self._post_jsonrpc({
|
|
||||||
"jsonrpc": "2.0",
|
|
||||||
"id": 99,
|
|
||||||
"method": "tools/call",
|
|
||||||
"params": {
|
|
||||||
"name": _sv.TOOL_EGRESS_ALLOW,
|
|
||||||
"arguments": {
|
|
||||||
"routes_yaml": "routes:\n - host: example.com\n",
|
|
||||||
"justification": "x",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
})
|
|
||||||
self.assertIn("error", result)
|
|
||||||
self.assertEqual(ERR_INTERNAL, result["error"]["code"]) # type: ignore[index]
|
|
||||||
|
|
||||||
def test_health_endpoint(self):
|
def test_health_endpoint(self):
|
||||||
conn = http.client.HTTPConnection("127.0.0.1", self.port, timeout=5)
|
conn = http.client.HTTPConnection("127.0.0.1", self.port, timeout=5)
|
||||||
try:
|
try:
|
||||||
|
|||||||
Reference in New Issue
Block a user