From f7fb691626b70ae2ff250709f4aad3a4836f612c Mon Sep 17 00:00:00 2001 From: didericis Date: Tue, 12 May 2026 15:56:52 -0400 Subject: [PATCH] feat(ssh-gate): add abstract SSHGate + plan dataclass MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First piece of PRD 0007: the per-agent SSH egress gate that will let pipelock stop seeing SSH traffic. This commit only lands the backend-agnostic surface — the SSHGate ABC, SSHGatePlan, the listen-port assignment (BASE_LISTEN_PORT + index), and the entrypoint-script renderer. Backend wiring lands in follow-up commits. --- claude_bottle/ssh_gate.py | 124 ++++++++++++++++++++++++++++++++++++ tests/unit/test_ssh_gate.py | 114 +++++++++++++++++++++++++++++++++ 2 files changed, 238 insertions(+) create mode 100644 claude_bottle/ssh_gate.py create mode 100644 tests/unit/test_ssh_gate.py diff --git a/claude_bottle/ssh_gate.py b/claude_bottle/ssh_gate.py new file mode 100644 index 0000000..a531d7f --- /dev/null +++ b/claude_bottle/ssh_gate.py @@ -0,0 +1,124 @@ +"""Per-agent SSH egress gate (PRD 0007). + +A second per-agent sidecar that does plain TCP forwarding from a set +of static listen ports to the SSH hosts declared in `bottle.ssh`. +The agent's ssh client points each `Host` block at the gate +container + a per-entry listen port; pipelock stops seeing SSH +traffic entirely. + +This module defines the abstract gate (`SSHGate`) and the plan +dataclass (`SSHGatePlan`) consumed by its `start`. The sidecar's +start/stop lifecycle is backend-specific and lives on concrete +subclasses (see `claude_bottle/backend/docker/ssh_gate.py`).""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from pathlib import Path + +from .manifest import Bottle + +# First listen port on the gate; entry i listens on BASE_LISTEN_PORT + i. +# Picked high enough to avoid colliding with anything an alpine image +# might pre-bind. The port space is per-gate (gate is per-agent) so +# collisions across bottles aren't possible. +BASE_LISTEN_PORT = 30000 + + +@dataclass(frozen=True) +class SSHGateUpstream: + """One forwarder rule on the gate: listen locally on `listen_port`, + forward each connection to `upstream_host:upstream_port`. The + `bottle_host_alias` is the `Host` value from the manifest entry, + kept for diagnostics + so the ssh provisioner can correlate + upstreams with their alias.""" + + listen_port: int + upstream_host: str + upstream_port: str + bottle_host_alias: str + + +@dataclass(frozen=True) +class SSHGatePlan: + """Output of SSHGate.prepare; consumed by .start when the sidecar + needs to be brought up. + + `upstreams` + `slug` + `entrypoint_script` are filled in at + prepare time (host-side, side-effect-free on docker). The network + fields are populated by the backend's launch step via + `dataclasses.replace` once those networks exist. Empty defaults + are sentinels meaning "not yet set"; `.start` validates that + they are populated.""" + + slug: str + entrypoint_script: Path + upstreams: tuple[SSHGateUpstream, ...] + internal_network: str = "" + egress_network: str = "" + + +def ssh_gate_upstreams_for_bottle(bottle: Bottle) -> tuple[SSHGateUpstream, ...]: + """Deterministic assignment of listen ports to bottle.ssh entries: + BASE_LISTEN_PORT + index. Order matches `bottle.ssh` so a manifest + re-order yields a different port mapping (intentional — the + provisioner reads the same tuple).""" + return tuple( + SSHGateUpstream( + listen_port=BASE_LISTEN_PORT + i, + upstream_host=e.Hostname, + upstream_port=e.Port, + bottle_host_alias=e.Host, + ) + for i, e in enumerate(bottle.ssh) + ) + + +def ssh_gate_render_entrypoint(upstreams: tuple[SSHGateUpstream, ...]) -> str: + """Render the gate's entrypoint script: one `socat TCP-LISTEN` + per upstream, all backgrounded, then `wait`. Posix sh, no bash-isms + (alpine's sh is busybox ash). If any one socat dies, the others + keep running until the container is removed — matches the v1 + no-restart policy from the PRD.""" + lines = ["#!/bin/sh", "set -eu"] + for u in upstreams: + lines.append( + f"socat TCP-LISTEN:{u.listen_port},reuseaddr,fork " + f"TCP:{u.upstream_host}:{u.upstream_port} &" + ) + lines.append("wait") + return "\n".join(lines) + "\n" + + +class SSHGate(ABC): + """The per-agent SSH egress gate. Encapsulates the host-side + prepare step (upstream allocation + entrypoint render); the + sidecar's start/stop lifecycle is backend-specific and lives on + concrete subclasses.""" + + def prepare(self, bottle: Bottle, slug: str, stage_dir: Path) -> SSHGatePlan: + """Compute the upstream table from `bottle.ssh` and write the + entrypoint script (mode 600) under `stage_dir`. Pure host-side, + no docker subprocess. + + Returned plan is incomplete: the launch step must fill + `internal_network` / `egress_network` via `dataclasses.replace` + before passing the plan to `.start`.""" + upstreams = ssh_gate_upstreams_for_bottle(bottle) + script = stage_dir / "ssh_gate_entrypoint.sh" + script.write_text(ssh_gate_render_entrypoint(upstreams)) + script.chmod(0o600) + return SSHGatePlan(slug=slug, entrypoint_script=script, upstreams=upstreams) + + @abstractmethod + def start(self, plan: SSHGatePlan) -> str: + """Bring up the gate sidecar according to `plan`. Returns the + target string identifying the running instance — the same + value to pass to `.stop`. Backend-specific.""" + + @abstractmethod + def stop(self, target: str) -> None: + """Tear down the gate sidecar identified by `target` (the + value `.start` returned). Idempotent: a missing target is + success. Backend-specific.""" diff --git a/tests/unit/test_ssh_gate.py b/tests/unit/test_ssh_gate.py new file mode 100644 index 0000000..4c8d89a --- /dev/null +++ b/tests/unit/test_ssh_gate.py @@ -0,0 +1,114 @@ +"""Unit: SSHGate prepare shape + entrypoint render.""" + +import os +import stat +import tempfile +import unittest +from pathlib import Path + +from claude_bottle.ssh_gate import ( + BASE_LISTEN_PORT, + SSHGate, + SSHGatePlan, + SSHGateUpstream, + ssh_gate_render_entrypoint, + ssh_gate_upstreams_for_bottle, +) +from tests.fixtures import fixture_minimal, fixture_with_ssh + + +class _StubGate(SSHGate): + """Concrete subclass for testing the abstract `prepare`. The + backend-specific start/stop aren't exercised here.""" + + def start(self, plan: SSHGatePlan) -> str: + raise NotImplementedError + + def stop(self, target: str) -> None: + raise NotImplementedError + + +class TestUpstreamAssignment(unittest.TestCase): + def test_indexed_listen_ports(self): + bottle = fixture_with_ssh().bottles["dev"] + upstreams = ssh_gate_upstreams_for_bottle(bottle) + self.assertEqual(2, len(upstreams)) + self.assertEqual(BASE_LISTEN_PORT, upstreams[0].listen_port) + self.assertEqual(BASE_LISTEN_PORT + 1, upstreams[1].listen_port) + + def test_upstream_fields_mirror_ssh_entry(self): + bottle = fixture_with_ssh().bottles["dev"] + first = ssh_gate_upstreams_for_bottle(bottle)[0] + # The fixture's first ssh entry: tailscale-gitea / 100.78.141.42:30009. + self.assertEqual("tailscale-gitea", first.bottle_host_alias) + self.assertEqual("100.78.141.42", first.upstream_host) + self.assertEqual("30009", first.upstream_port) + + def test_empty_bottle_yields_empty_upstreams(self): + bottle = fixture_minimal().bottles["dev"] + self.assertEqual((), ssh_gate_upstreams_for_bottle(bottle)) + + +class TestEntrypointRender(unittest.TestCase): + def test_one_socat_line_per_upstream(self): + upstreams = ( + SSHGateUpstream(30000, "gitea.example", "22", "gitea"), + SSHGateUpstream(30001, "github.com", "22", "gh"), + ) + script = ssh_gate_render_entrypoint(upstreams) + self.assertIn("#!/bin/sh", script) + self.assertIn( + "socat TCP-LISTEN:30000,reuseaddr,fork TCP:gitea.example:22 &", script + ) + self.assertIn( + "socat TCP-LISTEN:30001,reuseaddr,fork TCP:github.com:22 &", script + ) + # wait blocks the entrypoint so PID 1 stays alive while sockets + # are open. + self.assertTrue(script.rstrip().endswith("wait")) + + def test_empty_upstreams_still_has_wait(self): + # Defensive: a no-upstream gate is a no-op, but render must + # still produce a valid shell script. + script = ssh_gate_render_entrypoint(()) + self.assertIn("#!/bin/sh", script) + self.assertIn("wait", script) + + +class TestPrepare(unittest.TestCase): + def setUp(self): + self.stage = Path(tempfile.mkdtemp()) + + def tearDown(self): + import shutil + + shutil.rmtree(self.stage, ignore_errors=True) + + def test_prepare_writes_entrypoint_mode_600(self): + plan = _StubGate().prepare( + fixture_with_ssh().bottles["dev"], "demo", self.stage + ) + self.assertEqual(self.stage / "ssh_gate_entrypoint.sh", plan.entrypoint_script) + self.assertEqual(0o600, os.stat(plan.entrypoint_script).st_mode & 0o777) + + def test_prepare_plan_carries_upstreams_and_slug(self): + plan = _StubGate().prepare( + fixture_with_ssh().bottles["dev"], "demo", self.stage + ) + self.assertEqual("demo", plan.slug) + self.assertEqual(2, len(plan.upstreams)) + self.assertEqual("", plan.internal_network) + self.assertEqual("", plan.egress_network) + + def test_prepare_with_no_ssh_writes_minimal_script(self): + plan = _StubGate().prepare( + fixture_minimal().bottles["dev"], "demo", self.stage + ) + self.assertEqual((), plan.upstreams) + content = plan.entrypoint_script.read_text() + self.assertNotIn("socat", content) + self.assertIn("wait", content) + + +if __name__ == "__main__": + unittest.main()