Merge pull request 'PRD 0007: SSH egress gate' (#10) from ssh-egress-gate into main
test / unit (push) Successful in 11s
test / integration (push) Successful in 13s

This commit was merged in pull request #10.
This commit is contained in:
2026-05-12 16:21:11 -04:00
17 changed files with 759 additions and 150 deletions
+8 -2
View File
@@ -29,6 +29,7 @@ from .provision import git as _git
from .provision import prompt as _prompt
from .provision import skills as _skills
from .provision import ssh as _ssh
from .ssh_gate import DockerSSHGate
class DockerBottleBackend(BottleBackend["DockerBottlePlan", "DockerBottleCleanupPlan"]):
@@ -39,13 +40,18 @@ class DockerBottleBackend(BottleBackend["DockerBottlePlan", "DockerBottleCleanup
def __init__(self) -> None:
self._proxy = DockerPipelockProxy()
self._gate = DockerSSHGate()
def _resolve_plan(self, spec: BottleSpec, *, stage_dir: Path) -> DockerBottlePlan:
return _prepare.resolve_plan(spec, stage_dir=stage_dir, proxy=self._proxy)
return _prepare.resolve_plan(
spec, stage_dir=stage_dir, proxy=self._proxy, gate=self._gate
)
@contextmanager
def launch(self, plan: DockerBottlePlan) -> Generator[DockerBottle, None, None]:
with _launch.launch(plan, proxy=self._proxy, provision=self.provision) as bottle:
with _launch.launch(
plan, proxy=self._proxy, gate=self._gate, provision=self.provision
) as bottle:
yield bottle
def provision_ca(self, plan: DockerBottlePlan, target: str) -> None:
@@ -14,6 +14,7 @@ from pathlib import Path
from ...log import info
from ...manifest import Agent, Bottle
from ...pipelock import PipelockProxyPlan, pipelock_effective_allowlist
from ...ssh_gate import SSHGatePlan
from .. import BottlePlan
@@ -49,6 +50,7 @@ class DockerBottlePlan(BottlePlan):
forwarded_env: dict[str, str] = field(repr=False)
prompt_file: Path
proxy_plan: PipelockProxyPlan
gate_plan: SSHGatePlan
allowlist_summary: str
use_runsc: bool
@@ -90,6 +92,12 @@ class DockerBottlePlan(BottlePlan):
info(f"bottle : {v.agent.bottle}")
if v.ssh_hosts:
info(f" ssh hosts : {', '.join(v.ssh_hosts)}")
gate_lines = [
f"{u.bottle_host_alias} -> {u.upstream_host}:{u.upstream_port} "
f"(listen {u.listen_port})"
for u in self.gate_plan.upstreams
]
info(f" ssh gate : {'; '.join(gate_lines)}")
else:
info(" ssh hosts : (none)")
info(f" egress : {self.allowlist_summary}")
@@ -115,6 +123,14 @@ class DockerBottlePlan(BottlePlan):
"env_names": v.env_names,
"skills": list(v.agent.skills),
"ssh_hosts": v.ssh_hosts,
"ssh_gate": [
{
"host": u.bottle_host_alias,
"upstream": f"{u.upstream_host}:{u.upstream_port}",
"listen_port": u.listen_port,
}
for u in self.gate_plan.upstreams
],
"egress": {
"host_count": len(hosts),
"hosts": hosts,
+17
View File
@@ -24,6 +24,7 @@ from .bottle import DockerBottle
from .bottle_plan import DockerBottlePlan
from .pipelock import DockerPipelockProxy, pipelock_proxy_url, pipelock_tls_init
from .provision.ca import AGENT_CA_BUNDLE, AGENT_CA_PATH
from .ssh_gate import DockerSSHGate
# Where the repo root lives, for `docker build` context. Computed once.
@@ -35,6 +36,7 @@ def launch(
plan: DockerBottlePlan,
*,
proxy: DockerPipelockProxy,
gate: DockerSSHGate,
provision: Callable[[DockerBottlePlan, str], str | None],
) -> Generator[DockerBottle, None, None]:
"""Build, launch, and provision a Docker bottle. Teardown on exit.
@@ -85,6 +87,21 @@ def launch(
pipelock_name = proxy.start(plan.proxy_plan)
stack.callback(proxy.stop, pipelock_name)
# SSH egress gate (PRD 0007). One sidecar per agent, only
# brought up when the bottle has ssh entries. Lives on the
# same internal + egress networks pipelock straddles; the
# agent dials it by container name (DNS works on --internal,
# confirmed by the PRD 0007 spike).
if plan.gate_plan.upstreams:
gate_plan = dataclasses.replace(
plan.gate_plan,
internal_network=internal_network,
egress_network=egress_network,
)
plan = dataclasses.replace(plan, gate_plan=gate_plan)
gate_name = gate.start(plan.gate_plan)
stack.callback(gate.stop, gate_name)
container = _run_agent_container(plan, internal_network)
stack.callback(docker_mod.force_remove_container, container)
-4
View File
@@ -37,10 +37,6 @@ def pipelock_proxy_url(slug: str) -> str:
return f"http://{pipelock_container_name(slug)}:{PIPELOCK_PORT}"
def pipelock_proxy_host_port(slug: str) -> str:
return f"{pipelock_container_name(slug)}:{PIPELOCK_PORT}"
def pipelock_tls_init(stage_dir: Path) -> tuple[Path, Path]:
"""Generate a fresh per-bottle CA via a one-shot pipelock container.
+4
View File
@@ -20,6 +20,7 @@ from .. import BottleSpec
from . import util as docker_mod
from .bottle_plan import DockerBottlePlan
from .pipelock import DockerPipelockProxy
from .ssh_gate import DockerSSHGate
def resolve_plan(
@@ -27,6 +28,7 @@ def resolve_plan(
*,
stage_dir: Path,
proxy: DockerPipelockProxy,
gate: DockerSSHGate,
) -> DockerBottlePlan:
"""Resolve Docker-specific names and write scratch files. Trusts
that the agent and its skills/SSH keys are present — validation
@@ -78,6 +80,7 @@ def resolve_plan(
prompt_file.chmod(0o600)
proxy_plan = proxy.prepare(bottle, slug, stage_dir)
gate_plan = gate.prepare(bottle, slug, stage_dir)
resolved = resolve_env(manifest, spec.agent_name)
# Everything that should reach the bottle by-name (so its value
# never lands on argv or in env_file) goes into one dict. The
@@ -105,6 +108,7 @@ def resolve_plan(
forwarded_env=forwarded_env,
prompt_file=prompt_file,
proxy_plan=proxy_plan,
gate_plan=gate_plan,
allowlist_summary=allowlist_summary,
use_runsc=use_runsc,
)
+30 -24
View File
@@ -17,11 +17,11 @@ from __future__ import annotations
import os
import subprocess
from ....log import info
from ....log import die, info
from ....util import expand_tilde
from .. import util as docker_mod
from ..bottle_plan import DockerBottlePlan
from ..pipelock import pipelock_proxy_host_port
from ..ssh_gate import ssh_gate_host
def provision_ssh(plan: DockerBottlePlan, target: str) -> None:
@@ -61,13 +61,23 @@ def provision_ssh(plan: DockerBottlePlan, target: str) -> None:
return
container = target
proxy_host_port = pipelock_proxy_host_port(plan.slug)
gate_target = ssh_gate_host(plan.slug)
container_home = os.environ.get("CLAUDE_BOTTLE_CONTAINER_HOME", "/home/node")
container_ssh = f"{container_home}/.ssh"
agent_socket = "/run/claude-bottle-agent.sock"
public_socket = "/run/claude-bottle-agent-public.sock"
keys_dir = "/root/.claude-bottle-keys"
# Per-entry listen ports come off the gate plan (PRD 0007).
# Indexed by the bottle.ssh entry's Host alias so each ssh_config
# block knows which port its forwarder lives on.
upstreams_by_alias = {u.bottle_host_alias: u for u in plan.gate_plan.upstreams}
if set(upstreams_by_alias) != {e.Host for e in bottle.ssh}:
die(
"ssh-gate upstream table is out of sync with bottle.ssh; "
"this is an internal bug"
)
# ~/.ssh for node (700, owned by node).
docker_mod.docker_exec_root(container, ["mkdir", "-p", container_ssh])
docker_mod.docker_exec_root(container, ["chown", "node:node", container_ssh])
@@ -85,16 +95,15 @@ def provision_ssh(plan: DockerBottlePlan, target: str) -> None:
known_hosts_file.write_text("")
known_hosts_file.chmod(0o600)
proxy_host, _, proxy_port = proxy_host_port.partition(":")
container_key_paths: list[str] = []
for entry in bottle.ssh:
name = entry.Host
key = expand_tilde(entry.IdentityFile)
hostname = entry.Hostname
user = entry.User
port = entry.Port
known_host_key = entry.KnownHostKey
upstream = upstreams_by_alias[name]
listen_port = upstream.listen_port
key_basename = os.path.basename(key)
container_key_path = f"{keys_dir}/{key_basename}"
@@ -110,35 +119,32 @@ def provision_ssh(plan: DockerBottlePlan, target: str) -> None:
container_key_paths.append(container_key_path)
# ProxyCommand tunnels SSH through pipelock via HTTP
# CONNECT. %h / %p expand to this block's HostName /
# Port. socat's PROXY: mode does CONNECT host:port to
# the proxy.
# Each Host block points at the gate container + its
# per-entry listen port. HostKeyAlias makes ssh validate
# the host key against `hostname` (the real upstream
# name) instead of the gate container; CheckHostIP=no
# skips the resolved-IP lookup, which would also point at
# the gate.
block = (
f"Host {name}\n"
f" HostName {hostname}\n"
f" HostName {gate_target}\n"
f" User {user}\n"
f" Port {port}\n"
f" Port {listen_port}\n"
f" IdentityAgent {public_socket}\n"
f" ProxyCommand socat - PROXY:{proxy_host}:%h:%p,proxyport={proxy_port}\n"
f" HostKeyAlias {hostname}\n"
f" CheckHostIP no\n"
f"\n"
)
with config_file.open("a") as f:
f.write(block)
if known_host_key:
entries_to_write: list[str] = []
if port == "22":
entries_to_write.append(f"{name} {known_host_key}\n")
if hostname != name:
entries_to_write.append(f"{hostname} {known_host_key}\n")
else:
entries_to_write.append(f"[{name}]:{port} {known_host_key}\n")
if hostname != name:
entries_to_write.append(f"[{hostname}]:{port} {known_host_key}\n")
# HostKeyAlias makes ssh look up known_hosts under
# `hostname` (the upstream's real name / IP literal),
# not the gate container. One unambiguous entry per
# ssh entry.
with known_hosts_file.open("a") as f:
for e in entries_to_write:
f.write(e)
f.write(f"{hostname} {known_host_key}\n")
# Boot the agent, load each key, delete the key files, then
# start the root-owned socat forwarder. One docker exec so the
+159
View File
@@ -0,0 +1,159 @@
"""DockerSSHGate — the Docker-specific lifecycle for the per-agent
SSH egress gate sidecar (PRD 0007). Inherits the platform-agnostic
prepare step (upstream allocation + entrypoint render) from
`SSHGate`."""
from __future__ import annotations
import os
import subprocess
from ...log import die, info, warn
from ...ssh_gate import SSHGate, SSHGatePlan
# alpine/socat pinned by digest. The image is `alpine` + `socat`
# pre-installed; PRD 0007 requires the gate image to be
# self-sufficient at boot (no apk pulls) because the agent-facing
# leg sits on the `--internal` network.
#
# The CLAUDE_BOTTLE_SSH_GATE_IMAGE environment variable, when present,
# replaces the pinned reference. It is read once, when this module is
# imported, and its value is used as-is (an empty value is not
# replaced by the default).
SSH_GATE_IMAGE = os.environ.get(
    "CLAUDE_BOTTLE_SSH_GATE_IMAGE",
    "alpine/socat@sha256:a26f4bcee25ad4a4096ce91e596c0a2fffcbb51f7fd198dd87a5c86eae66f0e1",
)

# In-container path the entrypoint script lands at after `docker cp`.
# Root path keeps the cp simple — no intermediate directories to
# create. The same path is handed to `/bin/sh` as the container's
# command, so the two uses must stay in sync.
SSH_GATE_ENTRYPOINT_IN_CONTAINER = "/ssh-gate-entrypoint.sh"
def ssh_gate_container_name(slug: str) -> str:
    """Return the canonical Docker container name of the ssh-gate
    sidecar for the bottle identified by `slug`."""
    return "claude-bottle-ssh-gate-{}".format(slug)
def ssh_gate_host(slug: str) -> str:
    """Return the hostname the agent's ssh client should dial to reach
    the gate.

    This is the gate's container name: Docker's embedded DNS resolves
    it on the `--internal` network (verified by the PRD 0007 DNS
    spike)."""
    gate_container = ssh_gate_container_name(slug)
    return gate_container
class DockerSSHGate(SSHGate):
    """Brings the SSH gate sidecar up and down via Docker.

    Every Docker interaction shells out to the `docker` CLI and never
    raises on a non-zero exit: `start` reports failures through `die`,
    `stop` reports them through `warn`."""

    @staticmethod
    def _docker(*args: str) -> subprocess.CompletedProcess[str]:
        """Run `docker <args>` and return the completed process.

        Output is captured rather than echoed and a non-zero exit does
        not raise, so each caller decides how to report a failure and
        can quote Docker's own stderr when it does."""
        return subprocess.run(
            ["docker", *args],
            capture_output=True,
            text=True,
            check=False,
        )

    @staticmethod
    def _stderr_suffix(result: subprocess.CompletedProcess[str]) -> str:
        """Return `": <stderr>"` for `result`, or `""` when Docker
        printed nothing to stderr, ready to append to an error message."""
        detail = (result.stderr or "").strip()
        return f": {detail}" if detail else ""

    def start(self, plan: SSHGatePlan) -> str:
        """Boot the gate sidecar:

          1. `docker create` on the internal network with the
             canonical name, `--entrypoint /bin/sh`, and the
             in-container entrypoint path as the CMD.
          2. `docker cp` the entrypoint script in.
          3. Attach to the per-agent egress network so socat can dial
             upstream.
          4. `docker start`.

        If any step after `docker create` fails, the half-built
        container is force-removed before the failure is reported via
        `die`, together with Docker's stderr when it printed any.

        Returns the container name (the target passed to `.stop`)."""
        if not plan.upstreams:
            die("DockerSSHGate.start called with no upstreams; caller should skip")
        if not plan.internal_network or not plan.egress_network:
            die(
                "DockerSSHGate.start: internal_network / egress_network must be "
                "populated on the plan before start"
            )
        if not plan.entrypoint_script.is_file():
            die(
                f"ssh-gate entrypoint script missing at {plan.entrypoint_script}; "
                f"SSHGate.prepare must run first"
            )

        name = ssh_gate_container_name(plan.slug)
        info(f"starting ssh-gate sidecar {name} on network {plan.internal_network}")

        created = self._docker(
            "create",
            "--name", name,
            "--network", plan.internal_network,
            "--entrypoint", "/bin/sh",
            SSH_GATE_IMAGE,
            SSH_GATE_ENTRYPOINT_IN_CONTAINER,
        )
        if created.returncode != 0:
            # Nothing to roll back: the container was never created.
            die(
                f"failed to create ssh-gate sidecar {name}"
                + self._stderr_suffix(created)
            )

        copied = self._docker(
            "cp",
            str(plan.entrypoint_script),
            f"{name}:{SSH_GATE_ENTRYPOINT_IN_CONTAINER}",
        )
        if copied.returncode != 0:
            self._docker("rm", "-f", name)
            die(
                f"failed to copy ssh-gate entrypoint into {name}: "
                f"{copied.stderr.strip()}"
            )

        attached = self._docker("network", "connect", plan.egress_network, name)
        if attached.returncode != 0:
            self._docker("rm", "-f", name)
            die(
                f"failed to attach ssh-gate sidecar {name} to egress network "
                f"{plan.egress_network}"
                + self._stderr_suffix(attached)
            )

        started = self._docker("start", name)
        if started.returncode != 0:
            self._docker("rm", "-f", name)
            die(
                f"failed to start ssh-gate sidecar {name}"
                + self._stderr_suffix(started)
            )

        return name

    def stop(self, target: str) -> None:
        """Idempotent: missing container is success. `target` is the
        container name returned by `.start`.

        A failed removal is only warned about (with the manual cleanup
        command) so teardown of the remaining resources can continue."""
        if self._docker("inspect", target).returncode != 0:
            # `docker inspect` found nothing by that name: already gone.
            return
        if self._docker("rm", "-f", target).returncode != 0:
            warn(
                f"failed to remove ssh-gate sidecar {target}; "
                f"clean up with 'docker rm -f {target}'"
            )
+4 -36
View File
@@ -18,7 +18,6 @@ from pathlib import Path
from typing import cast
from .manifest import Bottle
from .util import is_ipv4_literal
# Baked-in default allowlist for hosts Claude Code itself needs.
DEFAULT_ALLOWLIST: tuple[str, ...] = (
@@ -40,30 +39,17 @@ def pipelock_bottle_allowlist(bottle: Bottle) -> list[str]:
return list(bottle.egress.allowlist)
def pipelock_bottle_ssh_hostnames(bottle: Bottle) -> list[str]:
return [e.Hostname for e in bottle.ssh if e.Hostname]
def pipelock_bottle_ssh_trusted_domains(bottle: Bottle) -> list[str]:
return [h for h in pipelock_bottle_ssh_hostnames(bottle) if not is_ipv4_literal(h)]
def pipelock_bottle_ssh_ip_cidrs(bottle: Bottle) -> list[str]:
return [f"{h}/32" for h in pipelock_bottle_ssh_hostnames(bottle) if is_ipv4_literal(h)]
def pipelock_effective_allowlist(bottle: Bottle) -> list[str]:
"""Deduplicated union of: baked-in defaults, bottle.egress.allowlist,
bottle.ssh[].Hostname. Sorted for stability."""
"""Deduplicated union of: baked-in defaults, bottle.egress.allowlist.
Sorted for stability. Per PRD 0007, bottle.ssh entries do NOT
contribute here — SSH traffic flows through the per-agent ssh-gate
sidecar, not pipelock."""
seen: dict[str, None] = {}
for h in DEFAULT_ALLOWLIST:
seen.setdefault(h, None)
for h in pipelock_bottle_allowlist(bottle):
if h:
seen.setdefault(h, None)
for h in pipelock_bottle_ssh_hostnames(bottle):
if h:
seen.setdefault(h, None)
return sorted(seen.keys())
@@ -116,12 +102,6 @@ def pipelock_build_config(
"api_allowlist": pipelock_effective_allowlist(bottle),
"forward_proxy": {"enabled": True},
}
trusted = pipelock_bottle_ssh_trusted_domains(bottle)
if trusted:
cfg["trusted_domains"] = trusted
ip_cidrs = pipelock_bottle_ssh_ip_cidrs(bottle)
if ip_cidrs:
cfg["ssrf"] = {"ip_allowlist": ip_cidrs}
cfg["dlp"] = {"include_defaults": True, "scan_env": True}
# Body-scan enforcement is a separate pipelock section (each DLP
# "surface" — body, MCP, response — has its own action). Pipelock's
@@ -163,18 +143,6 @@ def pipelock_render_yaml(cfg: dict[str, object]) -> str:
fp = cast(dict[str, object], cfg["forward_proxy"])
lines.append(f" enabled: {_bool(fp['enabled'])}")
lines.append("")
if "trusted_domains" in cfg:
lines.append("trusted_domains:")
for td in cast(list[str], cfg["trusted_domains"]):
lines.append(f' - "{td}"')
lines.append("")
if "ssrf" in cfg:
lines.append("ssrf:")
ssrf = cast(dict[str, object], cfg["ssrf"])
lines.append(" ip_allowlist:")
for cidr in cast(list[str], ssrf["ip_allowlist"]):
lines.append(f' - "{cidr}"')
lines.append("")
lines.append("dlp:")
dlp = cast(dict[str, object], cfg["dlp"])
lines.append(f" include_defaults: {_bool(dlp['include_defaults'])}")
+144
View File
@@ -0,0 +1,144 @@
"""Per-agent SSH egress gate (PRD 0007).
A second per-agent sidecar that does plain TCP forwarding from a set
of static listen ports to the SSH hosts declared in `bottle.ssh`.
The agent's ssh client points each `Host` block at the gate
container + a per-entry listen port; pipelock stops seeing SSH
traffic entirely.
This module defines the abstract gate (`SSHGate`) and the plan
dataclass (`SSHGatePlan`) consumed by its `start`. The sidecar's
start/stop lifecycle is backend-specific and lives on concrete
subclasses (see `claude_bottle/backend/docker/ssh_gate.py`)."""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from .log import die
from .manifest import Bottle
# Default port when an ssh entry has no `Port` field. Matches OpenSSH.
# Kept as an int because it is used as the gate's listen port and as
# the key for duplicate-port detection.
_DEFAULT_SSH_PORT = 22
@dataclass(frozen=True)
class SSHGateUpstream:
    """One forwarder rule on the gate: listen locally on `listen_port`,
    forward each connection to `upstream_host:upstream_port`. The
    `bottle_host_alias` is the `Host` value from the manifest entry,
    kept for diagnostics + so the ssh provisioner can correlate
    upstreams with their alias.

    `listen_port` mirrors the upstream port. That choice lets git
    URLs that bake the upstream port into the remote (e.g.
    `ssh://git@host:30009/repo.git`) work without rewriting: OpenSSH
    treats a URL-supplied port as overriding the config's `Port`
    directive, so the gate must be reachable on the same port the URL
    names. Two ssh entries that share an upstream port are a config
    error and rejected at prepare time."""

    # Port the gate's socat listener binds inside the gate container.
    listen_port: int
    # Host the forwarder dials; filled from the ssh entry's `Hostname`.
    upstream_host: str
    # Port the forwarder dials on `upstream_host`. Stored as a string
    # and interpolated as-is into the rendered socat command.
    upstream_port: str
    # The ssh entry's `Host` alias (the name used in ~/.ssh/config).
    bottle_host_alias: str
@dataclass(frozen=True)
class SSHGatePlan:
    """Output of SSHGate.prepare; consumed by .start when the sidecar
    needs to be brought up.

    `upstreams` + `slug` + `entrypoint_script` are filled in at
    prepare time (host-side, side-effect-free on docker). The network
    fields are populated by the backend's launch step via
    `dataclasses.replace` once those networks exist. Empty defaults
    are sentinels meaning "not yet set"; `.start` validates that
    they are populated."""

    # Per-bottle identifier; the backend derives the sidecar name from it.
    slug: str
    # Host-side path of the rendered entrypoint script.
    entrypoint_script: Path
    # One forwarder rule per ssh entry; empty when the bottle has none.
    upstreams: tuple[SSHGateUpstream, ...]
    # Agent-facing network name; "" until the launch step fills it in.
    internal_network: str = ""
    # Outbound network name; "" until the launch step fills it in.
    egress_network: str = ""
def ssh_gate_upstreams_for_bottle(bottle: Bottle) -> tuple[SSHGateUpstream, ...]:
    """Build the gate's upstream table from `bottle.ssh`.

    Each ssh entry's listen port equals its upstream port so
    URL-supplied ports (which override `~/.ssh/config`'s `Port`
    directive) still reach the gate. An entry without a `Port` uses
    the OpenSSH default for both the listen side and the upstream
    side.

    Dies on two entries sharing an upstream port — the gate is a
    single container with a flat port space, so each listener has to
    be unique.

    Returns one `SSHGateUpstream` per entry, in manifest order."""
    seen_ports: dict[int, str] = {}
    upstreams: list[SSHGateUpstream] = []
    for e in bottle.ssh:
        port = int(e.Port) if e.Port else _DEFAULT_SSH_PORT
        if port in seen_ports:
            die(
                f"ssh entries '{seen_ports[port]}' and '{e.Host}' share upstream port "
                f"{port}; the per-agent ssh gate can only forward one upstream "
                f"per port. Change one of the upstream Ports in claude-bottle.json."
            )
        seen_ports[port] = e.Host
        upstreams.append(
            SSHGateUpstream(
                listen_port=port,
                upstream_host=e.Hostname,
                # Use the resolved port, not the raw manifest field: a
                # missing `Port` would otherwise leave this empty and
                # render an unusable `TCP:<host>:` socat target.
                upstream_port=str(port),
                bottle_host_alias=e.Host,
            )
        )
    return tuple(upstreams)
def ssh_gate_render_entrypoint(upstreams: tuple[SSHGateUpstream, ...]) -> str:
    """Render the gate's entrypoint script as text.

    The script is a shebang plus `set -eu`, then one backgrounded
    `socat TCP-LISTEN` forwarder per upstream (in the order given),
    then a final `wait`. Posix sh only, no bash-isms (alpine's sh is
    busybox ash). If any one socat dies, the others keep running
    until the container is removed — matches the v1 no-restart policy
    from the PRD.

    Every line, including the last, ends with a newline."""
    forwarders = [
        f"socat TCP-LISTEN:{rule.listen_port},reuseaddr,fork "
        f"TCP:{rule.upstream_host}:{rule.upstream_port} &"
        for rule in upstreams
    ]
    script = ["#!/bin/sh", "set -eu", *forwarders, "wait"]
    return "".join(f"{line}\n" for line in script)
class SSHGate(ABC):
    """Abstract per-agent SSH egress gate.

    The host-side prepare step (building the upstream table and
    rendering the entrypoint script) is implemented here; bringing
    the sidecar up and tearing it down is backend-specific and left
    to concrete subclasses."""

    def prepare(self, bottle: Bottle, slug: str, stage_dir: Path) -> SSHGatePlan:
        """Derive the forwarder rules from `bottle.ssh` and stage the
        entrypoint script (mode 600) as
        `stage_dir / "ssh_gate_entrypoint.sh"`. Host-side only; no
        docker subprocess is spawned.

        The plan returned here is not yet startable: the launch step
        has to set `internal_network` / `egress_network` through
        `dataclasses.replace` before handing it to `.start`."""
        rules = ssh_gate_upstreams_for_bottle(bottle)
        entrypoint = stage_dir / "ssh_gate_entrypoint.sh"
        rendered = ssh_gate_render_entrypoint(rules)
        entrypoint.write_text(rendered)
        entrypoint.chmod(0o600)
        return SSHGatePlan(
            slug=slug,
            entrypoint_script=entrypoint,
            upstreams=rules,
        )

    @abstractmethod
    def start(self, plan: SSHGatePlan) -> str:
        """Launch the gate sidecar described by `plan` and return a
        target string identifying the running instance; that same
        string is what `.stop` expects. Backend-specific."""

    @abstractmethod
    def stop(self, target: str) -> None:
        """Remove the gate sidecar named by `target` (as returned by
        `.start`). Must be idempotent: a target that no longer exists
        counts as success. Backend-specific."""
-13
View File
@@ -6,7 +6,6 @@ level deeper, under their backend package."""
from __future__ import annotations
import os
import re
def expand_tilde(path: str) -> str:
@@ -17,15 +16,3 @@ def expand_tilde(path: str) -> str:
home = os.environ.get("HOME", "")
return home + path[1:]
return path
_IPV4_RE = re.compile(r"^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$")
def is_ipv4_literal(s: str) -> bool:
"""True iff `s` looks like a dotted-quad IPv4 literal. Does not
validate octet ranges; consumers that care about that should run
a stricter check. Empty input returns False."""
if not s:
return False
return bool(_IPV4_RE.match(s))
+201
View File
@@ -0,0 +1,201 @@
# PRD 0007: SSH egress gate
- **Status:** Draft
- **Author:** didericis
- **Created:** 2026-05-12
## Summary
Per-agent TCP-forwarder sidecar built from `bottle.ssh` entries; SSH stops
going through pipelock; pipelock keeps full TLS interception with no
SSH carve-outs.
## Problem
`git fetch` over SSH from inside an implementer-agent bottle is broken
on `main`. The error surfaced after PRD 0006 enabled pipelock's
native `tls_interception`:
```
kex_exchange_identification: Connection closed by remote host
Connection closed by UNKNOWN port 65535
fatal: Could not read from remote repository.
```
The agent's ssh client tunnels through pipelock via a `ProxyCommand
socat - PROXY:pipelock:%h:%p` and pipelock now bumps that CONNECT
tunnel. SSH sends its banner instead of a TLS ClientHello; pipelock's
SNI gate rejects it; the tunnel closes mid-kex. Every bottle with an
`ssh` entry hits this — including the implementer agent used by the
free-agent workflow, which can't pull or push.
## Goals / Success Criteria
Integration test: spin up a bottle with an SSH entry, exec `git
fetch` against a real-ish SSH host from inside the agent, observe
exit 0. This is the same signal that's broken today; flipping it
back to green is the test.
## Non-goals
- Pluggable forwarder backend. One TCP forwarder image is baked in;
abstracting over haproxy / nginx-stream / etc. is deferred.
- SSH-protocol awareness. The gate stays at L4. No SSH-version
sniffing, no kex inspection, no per-key gating beyond what ssh
itself enforces inside the agent.
- Replacing pipelock for anything else. HTTPS / HTTP traffic
continues to flow through pipelock unchanged. This PRD adds a
sidecar; it doesn't displace one.
- Connection rate limits or quotas. No per-host or per-agent rate
limiting on the gate; future PRD if it ever matters.
## Scope
### In scope
- **Gate sidecar lifecycle.** `DockerSSHGate` class with
`prepare` / `start` / `stop`, mirroring `DockerPipelockProxy`'s
shape and network attachment story.
- **ssh provisioner rewrite.** `provision/ssh.py` drops the socat
`ProxyCommand`; `~/.ssh/config` points each `Host` at the gate
container and the per-host listen port.
- **Pipelock carve-out removal.** Strip
`pipelock_bottle_ssh_trusted_domains`,
`pipelock_bottle_ssh_ip_cidrs`, and the related code paths in
`pipelock_build_config` + tests. After this PRD, pipelock has no
knowledge of `bottle.ssh`.
- **Plan rendering / dry-run.** `bottle_plan.py` and the y/N
preflight surface the new gate sidecar (name, listen ports,
upstream targets).
### Out of scope
- SSH key generation / rotation. Bottle keys are still
user-supplied via `IdentityFile`; the gate doesn't manage key
material.
- Per-host audit logging. The gate is dumb TCP forwarding; no
in-band visibility into SSH session content. (Connection-level
logs from socat are a nice-to-have, not a goal.)
- Non-Docker backends. Implementation lands for Docker only; the
`BottleBackend` abstraction can grow the hook but other backends
are deferred.
- Manifest schema changes. `bottle.ssh` stays exactly as it is
today; this PRD is internals-only.
## Proposed Design
### New services / components
Mirror the pipelock layout:
- **`claude_bottle/ssh_gate.py`** (new): abstract `SSHGate` +
`SSHGatePlan` dataclass. `prepare` is host-side / side-effect-free
on docker; renders the forwarder config under `stage_dir`.
- **`claude_bottle/backend/docker/ssh_gate.py`** (new):
`DockerSSHGate` concrete subclass — `start` does `docker create`
on the internal network, copies the config in, attaches the
egress network, `docker start`. `stop` is idempotent `docker rm
-f`. Container name: `claude-bottle-ssh-gate-<slug>`.
Forwarder image: `alpine/socat`, pinned by digest. Must be
self-sufficient at boot (no apk/apt pulls on first run) because
the gate's agent-facing leg sits on the `--internal` network and
has no internet at startup. One socat process per ssh entry,
multiplexed inside the same gate container via an entrypoint
script that backgrounds N socat invocations:
```
socat TCP-LISTEN:<port_i>,reuseaddr,fork TCP:<Hostname_i>:<Port_i>
```
Listen ports mirror the upstream port (entry `Port`, default 22).
That choice is load-bearing: OpenSSH treats a URL-supplied port
(e.g. `ssh://git@host:30009/repo.git`) as overriding the config's
`Port` directive, so the gate has to be reachable on the same port
the URL names — otherwise git fetch hits "connection refused" on
the URL's port even though the config block points elsewhere. Two
ssh entries sharing an upstream port are a config error and
rejected at prepare time. One container, N listeners, N upstreams.
### Existing code touched
- **`claude_bottle/backend/docker/provision/ssh.py`**: drop the
`ProxyCommand socat - PROXY:...` plumbing and the
`pipelock_proxy_host_port` import. The rendered `~/.ssh/config`
block per entry becomes:
```
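Host <name>
HostName <gate-container>
User <user>
Port <listen-port>
IdentityAgent <public-socket>
HostKeyAlias <Hostname>
CheckHostIP no
```
`known_hosts` gets one entry per ssh entry, keyed off the upstream
`<Hostname>`: `HostKeyAlias` makes OpenSSH validate the host key
against the real upstream name instead of the gate container, and
`CheckHostIP no` skips the resolved-IP lookup, which would also
point at the gate. Strict host-key checking therefore still matches.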
- **`claude_bottle/pipelock.py`**: delete
`pipelock_bottle_ssh_hostnames`, `pipelock_bottle_ssh_trusted_domains`,
`pipelock_bottle_ssh_ip_cidrs`, and the calls into them from
`pipelock_effective_allowlist` and `pipelock_build_config`. The
effective allowlist becomes the union of the baked-in defaults and `bottle.egress.allowlist`.
- **`claude_bottle/backend/docker/backend.py`**: instantiate
`DockerSSHGate` alongside `DockerPipelockProxy`; thread its
`prepare` / `start` / `stop` through `resolve_plan` / `launch`.
- **`claude_bottle/backend/docker/launch.py`**: add gate start /
stop to the `ExitStack` in the right order — gate must be up
before `provision_ssh` runs so the agent can dial it on first
boot.
- **`claude_bottle/backend/docker/bottle_plan.py`**: new
`SSHGatePlan` field on `DockerBottlePlan`; preflight rendering
surfaces the gate sidecar (name, per-entry listen ports,
upstream `Hostname:Port` targets).
- **Tests**: update `tests/fixtures.py` callers; rewrite
`tests/unit/test_pipelock_yaml.py::TestBuildConfig::test_ssh_shape`
to assert pipelock no longer reflects ssh entries; add unit
tests for `SSHGate.prepare` + render shape; add an integration
test in `tests/integration/` for the `git fetch` round-trip.
### Data model changes
None. `bottle.ssh` schema is unchanged; one new internal plan
dataclass (`SSHGatePlan`) under `claude_bottle/ssh_gate.py`.
### External dependencies
- `alpine/socat` image, pinned by digest (declared next to the
`PIPELOCK_IMAGE` constant). No new Python packages.
## Open questions
- Network topology: does the gate need its own per-agent egress
bridge, or can it share pipelock's egress network? Sharing is
simpler; per-gate isolates failure modes. Decide during
implementation; default to "share pipelock's egress network"
unless a concrete reason emerges.
- Socat container restart policy: a single socat that crashes
takes one upstream offline; do we want a wrapper that restarts
individual listeners, or just rely on `docker restart`? Default
to no-restart for v1 (matches pipelock).
- Connection-level audit log: socat's `-v` mode logs every
connect/close. Worth piping into the bottle's stderr stream, or
is that noise? Default off, reconsider if debugging gets hard.
- ~~Docker DNS for the `<gate-container>` hostname inside the
agent: works via Docker's embedded resolver on user-defined
networks. Verify on the `--internal` network specifically before
implementation.~~ **Resolved.** Spike confirmed: a container on
a `--internal` user-defined network resolves another
container's name via the embedded resolver at 127.0.0.11 and
reaches it over TCP, while egress to the public internet
remains blocked. The PRD's design assumption holds.
## References
- PRD 0001: per-agent egress proxy via pipelock — the parent
topology this PRD slots into.
- PRD 0006: pipelock native TLS interception — the change that
surfaced this regression by making pipelock incompatible with
SSH-over-CONNECT.
- `claude_bottle/backend/docker/provision/ssh.py` — current SSH
provisioning that this PRD rewrites.
- `claude_bottle/pipelock.py` — current pipelock config builder
that carries the `bottle.ssh`-derived fields this PRD removes.
+1
View File
@@ -80,6 +80,7 @@ class TestDryRunPlan(unittest.TestCase):
"runsc isn't available on the CI runner")
self.assertEqual([], plan["skills"])
self.assertEqual([], plan["ssh_hosts"])
self.assertEqual([], plan["ssh_gate"])
self.assertEqual(False, plan["remote_control"])
self.assertEqual(0, plan["prompt"]["length"])
+13 -2
View File
@@ -1,8 +1,8 @@
"""Integration: the cleanup primitives the start-flow trap depends on
are idempotent. The original orphan-network bug was a trap-ordering
issue; the fix moved the install earlier. The trap is only safe if
network_remove and PipelockProxy.stop are no-ops against missing
resources."""
network_remove, PipelockProxy.stop, and SSHGate.stop are no-ops
against missing resources."""
import os
import subprocess
@@ -17,6 +17,10 @@ from claude_bottle.backend.docker.pipelock import (
DockerPipelockProxy,
pipelock_container_name,
)
from claude_bottle.backend.docker.ssh_gate import (
DockerSSHGate,
ssh_gate_container_name,
)
from tests._docker import skip_unless_docker
@@ -75,6 +79,13 @@ class TestOrphanCleanup(unittest.TestCase):
# Should not raise.
DockerPipelockProxy().stop(pipelock_container_name(f"missing-{self.slug}"))
def test_ssh_gate_stop_missing_sidecar(self):
# Same trap-safety requirement for the gate (PRD 0007). The
# launch ExitStack calls gate.stop on every error path; if
# the container was never created (early failure), stop must
# still no-op.
DockerSSHGate().stop(ssh_gate_container_name(f"missing-{self.slug}"))
if __name__ == "__main__":
unittest.main()
+10 -24
View File
@@ -1,20 +1,12 @@
"""Unit: pipelock_effective_allowlist — the union of baked-in defaults,
bottle.egress.allowlist, and bottle.ssh[].Hostname. Plus a small check
that IPv4 hostnames pick up the /32 suffix when classified as CIDRs.
The lower-level one-line helpers (pipelock_bottle_allowlist,
pipelock_bottle_ssh_hostnames, pipelock_bottle_ssh_trusted_domains)
are exercised end-to-end by test_union_and_dedup, so they don't get
their own tests."""
"""Unit: pipelock_effective_allowlist — the union of baked-in defaults
and bottle.egress.allowlist. Per PRD 0007, bottle.ssh entries do NOT
contribute (SSH traffic goes through the per-agent ssh-gate, not
pipelock)."""
import unittest
from claude_bottle.manifest import Manifest
from claude_bottle.pipelock import (
pipelock_bottle_ssh_ip_cidrs,
pipelock_effective_allowlist,
)
from tests.fixtures import fixture_with_ssh
from claude_bottle.pipelock import pipelock_effective_allowlist
class TestEffectiveAllowlist(unittest.TestCase):
@@ -36,20 +28,14 @@ class TestEffectiveAllowlist(unittest.TestCase):
eff = pipelock_effective_allowlist(manifest.bottles["dev"])
self.assertIn("api.anthropic.com", eff, "baked default present")
self.assertIn("registry.npmjs.org", eff, "egress.allowlist present")
self.assertIn("100.78.141.42", eff, "ssh ipv4 hostname present")
self.assertIn("github.com", eff, "ssh hostname present")
# PRD 0007: ssh hostnames must not contribute to pipelock's
# allowlist anymore — they're routed through the ssh-gate
# sidecar, which is on its own egress path.
self.assertNotIn("100.78.141.42", eff)
self.assertNotIn("github.com", eff)
self.assertEqual(len(eff), len(set(eff)), "deduplicated")
self.assertEqual(eff, sorted(eff), "sorted")
class TestSSHIPCidrs(unittest.TestCase):
    """Checks pipelock_bottle_ssh_ip_cidrs against the ssh fixture's
    "dev" bottle."""

    def test_ipv4_hostname_gets_32_suffix(self):
        dev_bottle = fixture_with_ssh().bottles["dev"]
        ip_cidrs = pipelock_bottle_ssh_ip_cidrs(dev_bottle)
        self.assertIn("100.78.141.42/32", ip_cidrs)
        # Hostname-typed entries don't end up here, with or without
        # a CIDR suffix.
        for stray in ("github.com", "github.com/32"):
            self.assertNotIn(stray, ip_cidrs)
if __name__ == "__main__":
unittest.main()
-33
View File
@@ -1,33 +0,0 @@
"""Unit: is_ipv4_literal — the classifier that decides whether
bottle.ssh[].Hostname goes into pipelock's ssrf.ip_allowlist (IPv4
literal) or trusted_domains (hostname)."""
import unittest
from claude_bottle.util import is_ipv4_literal
class TestIPv4Classify(unittest.TestCase):
    """Runs is_ipv4_literal over known IPv4 literals and known
    non-literals, one subTest per input."""

    def test_positive(self):
        # Each of these is expected to classify as an IPv4 literal.
        literals = (
            "127.0.0.1",
            "10.0.0.5",
            "100.78.141.42",
            "0.0.0.0",
            "255.255.255.255",
        )
        for ip in literals:
            with self.subTest(ip=ip):
                self.assertTrue(is_ipv4_literal(ip), ip)

    def test_negative(self):
        # Hostnames, short/long dotted forms, IPv6, and the empty
        # string are all expected to be rejected.
        non_literals = (
            "github.com",
            "gitea.dideric.is",
            "100.78.141",
            "100.78.141.42.5",
            "::1",
            "fe80::1",
            "localhost",
            "",
            "1.2.3.4.example.com",
        )
        for hn in non_literals:
            with self.subTest(hn=hn):
                self.assertFalse(is_ipv4_literal(hn), hn)
if __name__ == "__main__":
unittest.main()
+15 -12
View File
@@ -34,23 +34,25 @@ class TestBuildConfig(unittest.TestCase):
# Baked defaults always present.
self.assertIn("api.anthropic.com", cast(list[str], cfg["api_allowlist"]))
self.assertIn("raw.githubusercontent.com", cast(list[str], cfg["api_allowlist"]))
# No SSH entries → no trusted_domains, no ssrf.
# PRD 0007: pipelock has no SSH carve-outs at all — neither
# trusted_domains nor ssrf are ever emitted from bottle data
# in v1.
self.assertNotIn("trusted_domains", cfg)
self.assertNotIn("ssrf", cfg)
# Without CA paths, the tls_interception block is omitted —
# pipelock falls back to its built-in default of `enabled: false`.
self.assertNotIn("tls_interception", cfg)
def test_ssh_shape(self):
def test_ssh_entries_do_not_leak_into_pipelock(self):
# PRD 0007: bottle.ssh routes through the ssh-gate sidecar,
# so pipelock's config must not reflect those hostnames or
# IPs in any of its blocks.
cfg = pipelock_build_config(fixture_with_ssh().bottles["dev"])
self.assertIn("github.com", cast(list[str], cfg["trusted_domains"]))
self.assertNotIn("100.78.141.42", cast(list[str], cfg["trusted_domains"]))
self.assertIn(
"100.78.141.42/32",
cast(dict[str, Any], cfg["ssrf"])["ip_allowlist"],
)
# Strict mode: IPv4 host is also in the api_allowlist union.
self.assertIn("100.78.141.42", cast(list[str], cfg["api_allowlist"]))
allow = cast(list[str], cfg["api_allowlist"])
self.assertNotIn("github.com", allow)
self.assertNotIn("100.78.141.42", allow)
self.assertNotIn("trusted_domains", cfg)
self.assertNotIn("ssrf", cfg)
def test_tls_interception_block_emitted_when_paths_supplied(self):
# PRD 0006: paths flow in via DockerPipelockProxy's in-container
@@ -95,12 +97,13 @@ class TestRenderAndWrite(unittest.TestCase):
for required in (
"api_allowlist:",
"forward_proxy:",
"trusted_domains:",
"ssrf:",
"dlp:",
"request_body_scanning:",
):
self.assertIn(required, text)
# PRD 0007: no ssh carve-outs in the rendered yaml.
self.assertNotIn("trusted_domains:", text)
self.assertNotIn("ssrf:", text)
def test_prepare_writes_file_at_mode_600(self):
plan = DockerPipelockProxy().prepare(
+137
View File
@@ -0,0 +1,137 @@
"""Unit: SSHGate prepare shape + entrypoint render."""
import os
import stat
import tempfile
import unittest
from pathlib import Path
from claude_bottle.manifest import Manifest
from claude_bottle.ssh_gate import (
SSHGate,
SSHGatePlan,
SSHGateUpstream,
ssh_gate_render_entrypoint,
ssh_gate_upstreams_for_bottle,
)
from tests.fixtures import fixture_minimal, fixture_with_ssh
class _StubGate(SSHGate):
    """Minimal concrete SSHGate so the tests can call `prepare`.

    The backend-specific start/stop hooks are not exercised by these
    tests; both simply raise NotImplementedError if called.
    """

    def start(self, plan: SSHGatePlan) -> str:
        raise NotImplementedError

    def stop(self, target: str) -> None:
        raise NotImplementedError
class TestUpstreamAssignment(unittest.TestCase):
    """ssh_gate_upstreams_for_bottle: how bottle.ssh entries become
    gate upstreams."""

    def test_listen_port_matches_upstream_port(self):
        # Critical: URLs like ssh://git@host:30009/... override the
        # config Port directive, so the gate must listen on the same
        # port the URL names.
        dev_bottle = fixture_with_ssh().bottles["dev"]
        assigned = ssh_gate_upstreams_for_bottle(dev_bottle)
        self.assertEqual(2, len(assigned))
        # Fixture: tailscale-gitea -> 100.78.141.42:30009,
        # github -> github.com:22.
        gitea_upstream = assigned[0]
        github_upstream = assigned[1]
        self.assertEqual(30009, gitea_upstream.listen_port)
        self.assertEqual(22, github_upstream.listen_port)

    def test_upstream_fields_mirror_ssh_entry(self):
        dev_bottle = fixture_with_ssh().bottles["dev"]
        head = ssh_gate_upstreams_for_bottle(dev_bottle)[0]
        self.assertEqual("tailscale-gitea", head.bottle_host_alias)
        self.assertEqual("100.78.141.42", head.upstream_host)
        self.assertEqual("30009", head.upstream_port)

    def test_empty_bottle_yields_empty_upstreams(self):
        dev_bottle = fixture_minimal().bottles["dev"]
        self.assertEqual((), ssh_gate_upstreams_for_bottle(dev_bottle))

    def test_duplicate_upstream_port_is_rejected(self):
        # Two entries on the same upstream port can't both have a
        # listener — the gate is one container with a flat port
        # space. Surface as a clear config error.
        clashing_entries = [
            {"Host": alias, "IdentityFile": "/dev/null",
             "Hostname": hostname, "User": "git", "Port": 22}
            for alias, hostname in (
                ("a", "host-a.example"),
                ("b", "host-b.example"),
            )
        ]
        manifest = Manifest.from_json_obj({
            "bottles": {"dev": {"ssh": clashing_entries}},
            "agents": {"demo": {"skills": [], "prompt": "", "bottle": "dev"}},
        })
        with self.assertRaises(SystemExit):
            ssh_gate_upstreams_for_bottle(manifest.bottles["dev"])
class TestEntrypointRender(unittest.TestCase):
    """ssh_gate_render_entrypoint: shape of the generated shell script."""

    def test_one_socat_line_per_upstream(self):
        rendered = ssh_gate_render_entrypoint((
            SSHGateUpstream(30009, "gitea.example", "30009", "gitea"),
            SSHGateUpstream(22, "github.com", "22", "gh"),
        ))
        self.assertIn("#!/bin/sh", rendered)
        expected_lines = (
            "socat TCP-LISTEN:30009,reuseaddr,fork TCP:gitea.example:30009 &",
            "socat TCP-LISTEN:22,reuseaddr,fork TCP:github.com:22 &",
        )
        for line in expected_lines:
            self.assertIn(line, rendered)
        # wait blocks the entrypoint so PID 1 stays alive while sockets
        # are open.
        self.assertTrue(rendered.rstrip().endswith("wait"))

    def test_empty_upstreams_still_has_wait(self):
        # Defensive: a no-upstream gate is a no-op, but render must
        # still produce a valid shell script.
        rendered = ssh_gate_render_entrypoint(())
        for fragment in ("#!/bin/sh", "wait"):
            self.assertIn(fragment, rendered)
class TestPrepare(unittest.TestCase):
    """SSHGate.prepare: the staged entrypoint file and the returned plan."""

    def setUp(self):
        # Fresh staging directory per test; removed again in tearDown.
        self.stage = Path(tempfile.mkdtemp())

    def tearDown(self):
        from shutil import rmtree
        rmtree(self.stage, ignore_errors=True)

    def _prepare(self, make_manifest):
        """Call prepare on a stub gate for the "dev" bottle of the
        manifest built by `make_manifest`, passing "demo" and the
        per-test stage directory."""
        return _StubGate().prepare(
            make_manifest().bottles["dev"], "demo", self.stage
        )

    def test_prepare_writes_entrypoint_mode_600(self):
        plan = self._prepare(fixture_with_ssh)
        self.assertEqual(
            self.stage / "ssh_gate_entrypoint.sh", plan.entrypoint_script
        )
        # Compare permission bits only.
        mode_bits = os.stat(plan.entrypoint_script).st_mode & 0o777
        self.assertEqual(0o600, mode_bits)

    def test_prepare_plan_carries_upstreams_and_slug(self):
        plan = self._prepare(fixture_with_ssh)
        self.assertEqual("demo", plan.slug)
        self.assertEqual(2, len(plan.upstreams))
        # Both network fields are expected to be empty strings on the
        # plan returned by prepare.
        self.assertEqual("", plan.internal_network)
        self.assertEqual("", plan.egress_network)

    def test_prepare_with_no_ssh_writes_minimal_script(self):
        plan = self._prepare(fixture_minimal)
        self.assertEqual((), plan.upstreams)
        script_text = plan.entrypoint_script.read_text()
        self.assertNotIn("socat", script_text)
        self.assertIn("wait", script_text)
if __name__ == "__main__":
unittest.main()