feat(ssh-gate): add abstract SSHGate + plan dataclass

First piece of PRD 0007: the per-agent SSH egress gate that will
let pipelock stop seeing SSH traffic. This commit only lands the
backend-agnostic surface — the SSHGate ABC, SSHGatePlan, the
listen-port assignment (BASE_LISTEN_PORT + index), and the
entrypoint-script renderer. Backend wiring lands in follow-up
commits.
This commit is contained in:
2026-05-12 15:56:52 -04:00
parent b2927b1483
commit f7fb691626
2 changed files with 238 additions and 0 deletions
+124
View File
@@ -0,0 +1,124 @@
"""Per-agent SSH egress gate (PRD 0007).
A second per-agent sidecar that does plain TCP forwarding from a set
of static listen ports to the SSH hosts declared in `bottle.ssh`.
The agent's ssh client points each `Host` block at the gate
container + a per-entry listen port; pipelock stops seeing SSH
traffic entirely.
This module defines the abstract gate (`SSHGate`) and the plan
dataclass (`SSHGatePlan`) consumed by its `start`. The sidecar's
start/stop lifecycle is backend-specific and lives on concrete
subclasses (see `claude_bottle/backend/docker/ssh_gate.py`)."""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from .manifest import Bottle
# First listen port on the gate; entry i listens on BASE_LISTEN_PORT + i.
# Picked high enough to avoid colliding with anything an alpine image
# might pre-bind. The port space is per-gate (gate is per-agent) so
# collisions across bottles aren't possible.
BASE_LISTEN_PORT = 30000
@dataclass(frozen=True)
class SSHGateUpstream:
"""One forwarder rule on the gate: listen locally on `listen_port`,
forward each connection to `upstream_host:upstream_port`. The
`bottle_host_alias` is the `Host` value from the manifest entry,
kept for diagnostics + so the ssh provisioner can correlate
upstreams with their alias."""
listen_port: int
upstream_host: str
upstream_port: str
bottle_host_alias: str
@dataclass(frozen=True)
class SSHGatePlan:
"""Output of SSHGate.prepare; consumed by .start when the sidecar
needs to be brought up.
`upstreams` + `slug` + `entrypoint_script` are filled in at
prepare time (host-side, side-effect-free on docker). The network
fields are populated by the backend's launch step via
`dataclasses.replace` once those networks exist. Empty defaults
are sentinels meaning "not yet set"; `.start` validates that
they are populated."""
slug: str
entrypoint_script: Path
upstreams: tuple[SSHGateUpstream, ...]
internal_network: str = ""
egress_network: str = ""
def ssh_gate_upstreams_for_bottle(bottle: Bottle) -> tuple[SSHGateUpstream, ...]:
"""Deterministic assignment of listen ports to bottle.ssh entries:
BASE_LISTEN_PORT + index. Order matches `bottle.ssh` so a manifest
re-order yields a different port mapping (intentional — the
provisioner reads the same tuple)."""
return tuple(
SSHGateUpstream(
listen_port=BASE_LISTEN_PORT + i,
upstream_host=e.Hostname,
upstream_port=e.Port,
bottle_host_alias=e.Host,
)
for i, e in enumerate(bottle.ssh)
)
def ssh_gate_render_entrypoint(upstreams: tuple[SSHGateUpstream, ...]) -> str:
"""Render the gate's entrypoint script: one `socat TCP-LISTEN`
per upstream, all backgrounded, then `wait`. Posix sh, no bash-isms
(alpine's sh is busybox ash). If any one socat dies, the others
keep running until the container is removed — matches the v1
no-restart policy from the PRD."""
lines = ["#!/bin/sh", "set -eu"]
for u in upstreams:
lines.append(
f"socat TCP-LISTEN:{u.listen_port},reuseaddr,fork "
f"TCP:{u.upstream_host}:{u.upstream_port} &"
)
lines.append("wait")
return "\n".join(lines) + "\n"
class SSHGate(ABC):
"""The per-agent SSH egress gate. Encapsulates the host-side
prepare step (upstream allocation + entrypoint render); the
sidecar's start/stop lifecycle is backend-specific and lives on
concrete subclasses."""
def prepare(self, bottle: Bottle, slug: str, stage_dir: Path) -> SSHGatePlan:
"""Compute the upstream table from `bottle.ssh` and write the
entrypoint script (mode 600) under `stage_dir`. Pure host-side,
no docker subprocess.
Returned plan is incomplete: the launch step must fill
`internal_network` / `egress_network` via `dataclasses.replace`
before passing the plan to `.start`."""
upstreams = ssh_gate_upstreams_for_bottle(bottle)
script = stage_dir / "ssh_gate_entrypoint.sh"
script.write_text(ssh_gate_render_entrypoint(upstreams))
script.chmod(0o600)
return SSHGatePlan(slug=slug, entrypoint_script=script, upstreams=upstreams)
@abstractmethod
def start(self, plan: SSHGatePlan) -> str:
"""Bring up the gate sidecar according to `plan`. Returns the
target string identifying the running instance — the same
value to pass to `.stop`. Backend-specific."""
@abstractmethod
def stop(self, target: str) -> None:
"""Tear down the gate sidecar identified by `target` (the
value `.start` returned). Idempotent: a missing target is
success. Backend-specific."""
+114
View File
@@ -0,0 +1,114 @@
"""Unit: SSHGate prepare shape + entrypoint render."""
import os
import stat
import tempfile
import unittest
from pathlib import Path
from claude_bottle.ssh_gate import (
BASE_LISTEN_PORT,
SSHGate,
SSHGatePlan,
SSHGateUpstream,
ssh_gate_render_entrypoint,
ssh_gate_upstreams_for_bottle,
)
from tests.fixtures import fixture_minimal, fixture_with_ssh
class _StubGate(SSHGate):
"""Concrete subclass for testing the abstract `prepare`. The
backend-specific start/stop aren't exercised here."""
def start(self, plan: SSHGatePlan) -> str:
raise NotImplementedError
def stop(self, target: str) -> None:
raise NotImplementedError
class TestUpstreamAssignment(unittest.TestCase):
def test_indexed_listen_ports(self):
bottle = fixture_with_ssh().bottles["dev"]
upstreams = ssh_gate_upstreams_for_bottle(bottle)
self.assertEqual(2, len(upstreams))
self.assertEqual(BASE_LISTEN_PORT, upstreams[0].listen_port)
self.assertEqual(BASE_LISTEN_PORT + 1, upstreams[1].listen_port)
def test_upstream_fields_mirror_ssh_entry(self):
bottle = fixture_with_ssh().bottles["dev"]
first = ssh_gate_upstreams_for_bottle(bottle)[0]
# The fixture's first ssh entry: tailscale-gitea / 100.78.141.42:30009.
self.assertEqual("tailscale-gitea", first.bottle_host_alias)
self.assertEqual("100.78.141.42", first.upstream_host)
self.assertEqual("30009", first.upstream_port)
def test_empty_bottle_yields_empty_upstreams(self):
bottle = fixture_minimal().bottles["dev"]
self.assertEqual((), ssh_gate_upstreams_for_bottle(bottle))
class TestEntrypointRender(unittest.TestCase):
def test_one_socat_line_per_upstream(self):
upstreams = (
SSHGateUpstream(30000, "gitea.example", "22", "gitea"),
SSHGateUpstream(30001, "github.com", "22", "gh"),
)
script = ssh_gate_render_entrypoint(upstreams)
self.assertIn("#!/bin/sh", script)
self.assertIn(
"socat TCP-LISTEN:30000,reuseaddr,fork TCP:gitea.example:22 &", script
)
self.assertIn(
"socat TCP-LISTEN:30001,reuseaddr,fork TCP:github.com:22 &", script
)
# wait blocks the entrypoint so PID 1 stays alive while sockets
# are open.
self.assertTrue(script.rstrip().endswith("wait"))
def test_empty_upstreams_still_has_wait(self):
# Defensive: a no-upstream gate is a no-op, but render must
# still produce a valid shell script.
script = ssh_gate_render_entrypoint(())
self.assertIn("#!/bin/sh", script)
self.assertIn("wait", script)
class TestPrepare(unittest.TestCase):
def setUp(self):
self.stage = Path(tempfile.mkdtemp())
def tearDown(self):
import shutil
shutil.rmtree(self.stage, ignore_errors=True)
def test_prepare_writes_entrypoint_mode_600(self):
plan = _StubGate().prepare(
fixture_with_ssh().bottles["dev"], "demo", self.stage
)
self.assertEqual(self.stage / "ssh_gate_entrypoint.sh", plan.entrypoint_script)
self.assertEqual(0o600, os.stat(plan.entrypoint_script).st_mode & 0o777)
def test_prepare_plan_carries_upstreams_and_slug(self):
plan = _StubGate().prepare(
fixture_with_ssh().bottles["dev"], "demo", self.stage
)
self.assertEqual("demo", plan.slug)
self.assertEqual(2, len(plan.upstreams))
self.assertEqual("", plan.internal_network)
self.assertEqual("", plan.egress_network)
def test_prepare_with_no_ssh_writes_minimal_script(self):
plan = _StubGate().prepare(
fixture_minimal().bottles["dev"], "demo", self.stage
)
self.assertEqual((), plan.upstreams)
content = plan.entrypoint_script.read_text()
self.assertNotIn("socat", content)
self.assertIn("wait", content)
if __name__ == "__main__":
unittest.main()