Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 137 additions & 5 deletions internal/proxy/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net"
"os"
"path/filepath"
"sync"

"github.com/nemirovsky/sluice/internal/vault"
"golang.org/x/crypto/ssh"
Expand Down Expand Up @@ -263,8 +264,33 @@ func sshHandleChannel(newChan ssh.NewChannel, dst ssh.Conn) {
// from upstream finish, each signaling on this channel.
upstreamDone := make(chan struct{}, 3)

// Forward per-channel requests bidirectionally.
go sshForwardChannelRequests(srcReqs, dstChan)
// Track agent-to-upstream requests that are mid-flight. Each request
// the agent sends has to be forwarded to upstream, awaited for a
// reply (when WantReply is true), and replied to on the agent side
// before sluice may close srcChan. Without this barrier, a fast
// upstream that replies + writes data + sends exit-status + closes
// in one burst lets sluice drain all three upstream-to-agent
// goroutines and close srcChan while this forwarder is still
// mid-reply for the original exec request. The agent then observes
// SSH_MSG_CHANNEL_CLOSE before its SendRequest("exec", true, ...)
// receives a SUCCESS/FAILURE on ch.msg, and the gossh client
// surfaces the closed ch.msg as io.EOF.
//
// sync.WaitGroup is the wrong primitive here because Add and Wait
// are not safe to call concurrently when the counter is at zero
// (Go runtime panics with "sync: WaitGroup misuse"). The forwarder
// goroutine ranges over srcReqs and could enter a new iteration at
// any moment, racing the main goroutine's drain. We use a mutex +
// cond + draining flag instead: once draining is set, the forwarder
// rejects further requests so Wait() can converge.
barrier := &inflightBarrier{}
barrier.cond = sync.NewCond(&barrier.mu)

// Forward per-channel requests bidirectionally. The agent-to-upstream
// loop reports each request via barrier so sluice's pre-close
// drain knows when none are pending. The upstream-to-agent loop
// signals upstreamDone when dstReqs closes.
go sshForwardAgentRequests(srcReqs, dstChan, barrier)
Comment thread
nnemirovsky marked this conversation as resolved.
go func() {
sshForwardChannelRequests(dstReqs, srcChan)
upstreamDone <- struct{}{}
Expand Down Expand Up @@ -304,10 +330,24 @@ func sshHandleChannel(newChan ssh.NewChannel, dst ssh.Conn) {
<-upstreamDone
<-upstreamDone

// Also drain any agent-to-upstream request that is mid-flight. A
// pending WantReply=true request is waiting on dst.SendRequest to
// return, after which it still has to call req.Reply on the agent
// side. Closing srcChan before that reply is written would let the
// agent see channel-close before the SUCCESS/FAILURE message on
// ch.msg, which gossh surfaces as io.EOF from
// session.SendRequest("exec", true, ...).
//
// Drain sets a draining flag (so the forwarder rejects any further
// request without bumping the counter) and waits on the cond for
// the current iteration, if any, to finish.
barrier.drain()

Comment thread
nnemirovsky marked this conversation as resolved.
// Now that exit-status has been forwarded (the dstReqs goroutine
// has finished), signal stdout EOF to the agent and close the
// channel. The agent's session.Wait() now sees the documented
// order: data, exit-status, EOF, close.
// has finished) and every pending agent-side reply has been
// written, signal stdout EOF to the agent and close the channel.
// The agent's session.Wait() now sees the documented order:
// data, exit-status, EOF, close.
_ = srcChan.CloseWrite()
_ = srcChan.Close()
_ = dstChan.Close()
Expand All @@ -328,3 +368,95 @@ func sshForwardChannelRequests(reqs <-chan *ssh.Request, dst ssh.Channel) {
}
}
}

// inflightBarrier coordinates the agent-to-upstream request forwarder
// with sshHandleChannel's pre-close drain. The forwarder brackets each
// request between enter() (before forwarding to upstream) and leave()
// (after replying to the agent). Once the upstream side has fully shut
// down, sshHandleChannel calls drain(): from that point every enter()
// returns false — so the forwarder never waits on a dead upstream —
// and drain blocks until the handlers already in flight have all
// called leave.
//
// A plain sync.WaitGroup cannot express this safely: its Add and Wait
// must not race while the counter sits at zero, yet the forwarder loop
// may begin a new iteration at any instant, concurrently with the main
// goroutine's Wait, and the Go runtime panics on that interleaving.
// Guarding a counter plus a draining flag with a mutex and a cond
// sidesteps the interleaving entirely.
type inflightBarrier struct {
	mu       sync.Mutex
	cond     *sync.Cond
	pending  int
	draining bool
}

// enter marks the start of one request handler. A false return means
// drain has already begun; the caller must NOT forward the request to
// the (possibly closed) upstream channel.
func (b *inflightBarrier) enter() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if !b.draining {
		b.pending++
		return true
	}
	return false
}

// leave balances one successful enter. When the last in-flight handler
// leaves while a drain is waiting, the waiter is woken.
func (b *inflightBarrier) leave() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending--
	if b.draining && b.pending == 0 {
		b.cond.Broadcast()
	}
}

// drain shuts the barrier: every later enter is refused, and the call
// blocks until each handler already inside has called leave.
func (b *inflightBarrier) drain() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.draining = true
	for b.pending > 0 {
		b.cond.Wait()
	}
}

// sshForwardAgentRequests forwards per-channel requests from the agent
// to upstream, mirroring sshForwardChannelRequests for the opposite
// direction. It brackets every forwarded request with barrier.enter /
// barrier.leave so sshHandleChannel's pre-close drain can wait until
// the reply on the agent side (req.Reply on srcChan) has been fully
// written before it closes srcChan. Without that ordering, an agent
// blocked in session.SendRequest("exec", WantReply=true, ...) can
// observe SSH_MSG_CHANNEL_CLOSE before the CHANNEL_REQUEST_SUCCESS
// reply lands on ch.msg — gossh surfaces a closed ch.msg as io.EOF,
// so `session.Output("cmd")` fails with EOF even though the upstream
// replied successfully.
//
// Requests arriving after drain has begun are refused rather than
// forwarded: the upstream channel is closing, so a reply from it would
// never arrive. Replying false to a WantReply request unblocks any
// agent caller waiting on ch.msg.
func sshForwardAgentRequests(reqs <-chan *ssh.Request, dst ssh.Channel, barrier *inflightBarrier) {
	forward := func(req *ssh.Request) {
		defer barrier.leave()
		ok, err := dst.SendRequest(req.Type, req.WantReply, req.Payload)
		if err != nil {
			// Upstream send failed; report failure to the agent below.
			ok = false
		}
		if req.WantReply {
			_ = req.Reply(ok, nil)
		}
	}
	for req := range reqs {
		if !barrier.enter() {
			// Drain has started: reject without touching upstream.
			if req.WantReply {
				_ = req.Reply(false, nil)
			}
			continue
		}
		forward(req)
	}
}
190 changes: 190 additions & 0 deletions internal/proxy/ssh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,196 @@ func TestResolveHostKeyCallbackNoKnownHosts(t *testing.T) {
}
}

// TestSSHJumpHost_BurstCloseDoesNotDropExecReply is a focused regression
// test for the race where an upstream that replies + writes data + sends
// exit-status + closes in one burst (no sleep between exit-status and
// close) caused sluice to close srcChan before the agent-to-upstream
// forwarder finished writing the SUCCESS reply for the agent's
// session.SendRequest("exec", true, ...). gossh closes ch.msg on
// SSH_MSG_CHANNEL_CLOSE, so the blocked SendRequest returns io.EOF and
// session.Output(...) fails with "exec command via SSH: EOF".
//
// startTestSSHServer (used by other tests) papers over the race with a
// 50ms sleep before returning from the channel handler. This test
// spins up its own burst-close server with no such sleep, so the race
// is deterministically triggered without the inflightBarrier fix.
func TestSSHJumpHost_BurstCloseDoesNotDropExecReply(t *testing.T) {
pubKey, privPEM := generateTestSSHKey(t)
dir := t.TempDir()
store, err := vault.NewStore(dir)
if err != nil {
t.Fatal(err)
}
if _, err := store.Add("ssh_key", string(privPEM)); err != nil {
t.Fatal(err)
}

sshServer := startBurstCloseSSHServer(t, pubKey)
defer func() { _ = sshServer.Close() }()

proxyHostKey, err := GenerateSSHHostKey()
if err != nil {
t.Fatal(err)
}

binding := vault.Binding{
Credential: "ssh_key",
Template: "testuser",
Protocols: []string{"ssh"},
}

jumpHost := NewSSHJumpHost(store, proxyHostKey)
jumpHost.HostKeyCallback = ssh.InsecureIgnoreHostKey()

// Run the test many times in a single process to maximize the
// chance the close race fires if the fix regresses. Each iteration
// runs through a fresh proxy connection + fresh agent session.
const iterations = 50
for i := 0; i < iterations; i++ {
agentConn, proxyConn := tcpConnPair(t)

ready := make(chan error, 1)
errCh := make(chan error, 1)
go func() {
errCh <- jumpHost.HandleConnection(proxyConn, []string{sshServer.Addr().String()}, sshServer.Addr().String(), binding, ready)
}()

if setupErr := <-ready; setupErr != nil {
t.Fatalf("iter %d: handler setup: %v", i, setupErr)
}

agentSSH, agentChans, agentReqs, err := ssh.NewClientConn(agentConn, "proxy", &ssh.ClientConfig{
User: "ignored",
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
})
if err != nil {
t.Fatalf("iter %d: agent SSH handshake: %v", i, err)
}

client := ssh.NewClient(agentSSH, agentChans, agentReqs)
session, err := client.NewSession()
if err != nil {
t.Fatalf("iter %d: open session: %v", i, err)
}

output, err := session.Output("whoami")
if err != nil {
t.Fatalf("iter %d: exec: %v (this is the EOF symptom of the close race)", i, err)
}
if string(output) != "ssh-injection-ok\n" {
t.Errorf("iter %d: expected 'ssh-injection-ok', got %q", i, string(output))
}
_ = session.Close()
_ = client.Close()
Comment thread
nnemirovsky marked this conversation as resolved.
_ = agentSSH.Close()
_ = agentConn.Close()

// Wait for HandleConnection to return so a leaked handler
// goroutine (or a connection that fails to teardown after
// close) surfaces as a test timeout rather than as silent
// resource exhaustion on the next iteration. HandleConnection
// returns nil on graceful agent disconnect; a non-nil error
// here would mean the teardown path produced an unexpected
// failure that a future regression could mask.
select {
case handlerErr := <-errCh:
if handlerErr != nil {
t.Fatalf("iter %d: HandleConnection returned error: %v", i, handlerErr)
}
case <-time.After(5 * time.Second):
t.Fatalf("iter %d: HandleConnection did not return within 5s after close", i)
}
}
}

// startBurstCloseSSHServer is a test SSH server that, on the first exec
// request, replies + writes output + sends exit-status + closes the
// channel with no delay between exit-status and Close. The lack of any
// sleep is intentional: it deterministically triggers the close race
// in sluice's SSH jump host when the inflightBarrier fix is absent.
func startBurstCloseSSHServer(t *testing.T, authorizedKey ssh.PublicKey) net.Listener {
t.Helper()

key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
t.Fatal(err)
}
hostSigner, err := ssh.NewSignerFromKey(key)
if err != nil {
t.Fatal(err)
}

config := &ssh.ServerConfig{
PublicKeyCallback: func(_ ssh.ConnMetadata, pubKey ssh.PublicKey) (*ssh.Permissions, error) {
if bytes.Equal(pubKey.Marshal(), authorizedKey.Marshal()) {
return &ssh.Permissions{}, nil
}
return nil, fmt.Errorf("unknown public key")
},
}
config.AddHostKey(hostSigner)

ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}

go func() {
for {
conn, err := ln.Accept()
if err != nil {
return
}
go func(c net.Conn) {
sshConn, chans, reqs, err := ssh.NewServerConn(c, config)
if err != nil {
_ = c.Close()
return
}
defer func() { _ = sshConn.Close() }()
go ssh.DiscardRequests(reqs)
for newChan := range chans {
if newChan.ChannelType() != "session" {
_ = newChan.Reject(ssh.UnknownChannelType, "unsupported")
continue
}
ch, chReqs, err := newChan.Accept()
if err != nil {
continue
}
go func(ch ssh.Channel, reqs <-chan *ssh.Request) {
// Defer close so a request loop that exits without
// hitting the exec path (early agent close,
// non-exec request only) still releases the
// server-side channel.
defer func() { _ = ch.Close() }()
for req := range reqs {
if req.Type != "exec" {
if req.WantReply {
_ = req.Reply(false, nil)
}
continue
}
Comment thread
nnemirovsky marked this conversation as resolved.
if req.WantReply {
_ = req.Reply(true, nil)
}
_, _ = ch.Write([]byte("ssh-injection-ok\n"))
_, _ = ch.SendRequest("exit-status", false, ssh.Marshal(struct{ Status uint32 }{0}))
_ = ch.CloseWrite()
// NO time.Sleep here. This is the whole point of
// the test: close immediately after exit-status
// to maximally tighten the race window in
// sluice's sshHandleChannel.
return
}
}(ch, chReqs)
}
}(conn)
}
}()
return ln
}

// TestGenerateSSHHostKey tests SSH host key generation.
func TestGenerateSSHHostKey(t *testing.T) {
signer, err := GenerateSSHHostKey()
Expand Down
Loading