diff --git a/internal/proxy/ssh.go b/internal/proxy/ssh.go
index d464565..d327892 100644
--- a/internal/proxy/ssh.go
+++ b/internal/proxy/ssh.go
@@ -11,6 +11,7 @@ import (
 	"net"
 	"os"
 	"path/filepath"
+	"sync"
 
 	"github.com/nemirovsky/sluice/internal/vault"
 	"golang.org/x/crypto/ssh"
@@ -263,8 +264,33 @@ func sshHandleChannel(newChan ssh.NewChannel, dst ssh.Conn) {
 	// from upstream finish, each signaling on this channel.
 	upstreamDone := make(chan struct{}, 3)
 
-	// Forward per-channel requests bidirectionally.
-	go sshForwardChannelRequests(srcReqs, dstChan)
+	// Track agent-to-upstream requests that are mid-flight. Each request
+	// the agent sends has to be forwarded to upstream, its reply awaited
+	// (when WantReply is true), and the reply written back to the agent
+	// before sluice may close srcChan. Without this barrier, a fast
+	// upstream that replies + writes data + sends exit-status + closes
+	// in one burst lets sluice drain all three upstream-to-agent
+	// goroutines and close srcChan while this forwarder is still
+	// mid-reply for the original exec request. The agent then observes
+	// SSH_MSG_CHANNEL_CLOSE before its SendRequest("exec", true, ...)
+	// receives a SUCCESS/FAILURE on ch.msg, and the gossh client
+	// surfaces the closed ch.msg as io.EOF.
+	//
+	// sync.WaitGroup is the wrong primitive here because Add and Wait
+	// are not safe to call concurrently when the counter is at zero
+	// (Go runtime panics with "sync: WaitGroup misuse"). The forwarder
+	// goroutine ranges over srcReqs and could enter a new iteration at
+	// any moment, racing the main goroutine's drain. We use a mutex +
+	// cond + draining flag instead: once draining is set, the forwarder
+	// rejects further requests so drain() can return.
+	barrier := &inflightBarrier{}
+	barrier.cond = sync.NewCond(&barrier.mu)
+
+	// Forward per-channel requests bidirectionally. The agent-to-upstream
+	// loop reports each request via barrier so sluice's pre-close
+	// drain knows when none are pending. The upstream-to-agent loop
+	// signals upstreamDone when dstReqs closes.
+	go sshForwardAgentRequests(srcReqs, dstChan, barrier)
 	go func() {
 		sshForwardChannelRequests(dstReqs, srcChan)
 		upstreamDone <- struct{}{}
@@ -304,10 +330,24 @@ func sshHandleChannel(newChan ssh.NewChannel, dst ssh.Conn) {
 	<-upstreamDone
 	<-upstreamDone
 
+	// Also drain any agent-to-upstream request that is mid-flight. A
+	// pending WantReply=true request is waiting on dst.SendRequest to
+	// return, after which it still has to call req.Reply on the agent
+	// side. Closing srcChan before that reply is written would let the
+	// agent see channel-close before the SUCCESS/FAILURE message on
+	// ch.msg, which gossh surfaces as io.EOF from
+	// session.SendRequest("exec", true, ...).
+	//
+	// Drain sets a draining flag (so the forwarder rejects any further
+	// request without bumping the counter) and waits on the cond for
+	// the current iteration, if any, to finish.
+	barrier.drain()
+
 	// Now that exit-status has been forwarded (the dstReqs goroutine
-	// has finished), signal stdout EOF to the agent and close the
-	// channel. The agent's session.Wait() now sees the documented
-	// order: data, exit-status, EOF, close.
+	// has finished) and every pending agent-side reply has been
+	// written, signal stdout EOF to the agent and close the channel.
+	// The agent's session.Wait() now sees the documented order:
+	// data, exit-status, EOF, close.
 	_ = srcChan.CloseWrite()
 	_ = srcChan.Close()
 	_ = dstChan.Close()
@@ -328,3 +368,95 @@ func sshForwardChannelRequests(reqs <-chan *ssh.Request, dst ssh.Channel) {
 		}
 	}
 }
+
+// inflightBarrier serializes the agent-to-upstream request forwarder
+// with sshHandleChannel's pre-close drain. The forwarder calls enter()
+// before forwarding a request to upstream and leave() after replying to
+// the agent. sshHandleChannel calls drain() once the upstream side has
+// fully closed: drain sets the draining flag (so any further enter()
+// returns false and the forwarder rejects the request without waiting
+// on a closed upstream) and blocks until count reaches zero.
+//
+// The mutex+cond pattern avoids the Add/Wait race that a sync.WaitGroup
+// would have: with a WaitGroup the forwarder's loop could call Add(1)
+// at the same instant sshHandleChannel called Wait() with the counter
+// at zero, and the Go runtime panics on that interleaving.
+type inflightBarrier struct {
+	mu       sync.Mutex
+	cond     *sync.Cond
+	count    int
+	draining bool
+}
+
+// enter reports the start of a request handler. Returns false if drain
+// has already begun, in which case the caller must NOT proceed to
+// forward the request to a possibly-closed upstream.
+func (b *inflightBarrier) enter() bool {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	if b.draining {
+		return false
+	}
+	b.count++
+	return true
+}
+
+// leave matches a successful enter. When the counter reaches zero
+// during draining, the waiter is signaled.
+func (b *inflightBarrier) leave() {
+	b.mu.Lock()
+	b.count--
+	if b.count == 0 && b.draining {
+		b.cond.Broadcast()
+	}
+	b.mu.Unlock()
+}
+
+// drain sets the draining flag (locking out new enters) and blocks
+// until any currently in-flight handlers call leave.
+func (b *inflightBarrier) drain() {
+	b.mu.Lock()
+	b.draining = true
+	for b.count > 0 {
+		b.cond.Wait()
+	}
+	b.mu.Unlock()
+}
+
+// sshForwardAgentRequests is the agent-to-upstream variant of
+// sshForwardChannelRequests. It coordinates with sshHandleChannel's
+// pre-close drain via inflightBarrier so the reply on the agent
+// direction (req.Reply on srcChan) is fully written before sluice
+// closes srcChan. Otherwise an agent that called
+// session.SendRequest("exec", WantReply=true, ...) can observe
+// SSH_MSG_CHANNEL_CLOSE before its ch.msg receives the
+// CHANNEL_REQUEST_SUCCESS reply — gossh surfaces a closed ch.msg as
+// io.EOF, and `session.Output("cmd")` fails with EOF even though the
+// upstream replied successfully.
+//
+// When drain has already begun, the request is rejected without being
+// forwarded to upstream: the upstream channel is closing, so any reply
+// from upstream would never arrive. Replying false to the agent on a
+// WantReply request unblocks any caller waiting on ch.msg.
+func sshForwardAgentRequests(reqs <-chan *ssh.Request, dst ssh.Channel, barrier *inflightBarrier) {
+	for req := range reqs {
+		if !barrier.enter() {
+			if req.WantReply {
+				_ = req.Reply(false, nil)
+			}
+			continue
+		}
+		ok, err := dst.SendRequest(req.Type, req.WantReply, req.Payload)
+		if err != nil {
+			if req.WantReply {
+				_ = req.Reply(false, nil)
+			}
+			barrier.leave()
+			continue
+		}
+		if req.WantReply {
+			_ = req.Reply(ok, nil)
+		}
+		barrier.leave()
+	}
+}
diff --git a/internal/proxy/ssh_test.go b/internal/proxy/ssh_test.go
index f2df9f8..35c8f2c 100644
--- a/internal/proxy/ssh_test.go
+++ b/internal/proxy/ssh_test.go
@@ -406,6 +406,196 @@ func TestResolveHostKeyCallbackNoKnownHosts(t *testing.T) {
 	}
 }
 
+// TestSSHJumpHost_BurstCloseDoesNotDropExecReply is a focused regression
+// test for the race where an upstream that replies + writes data + sends
+// exit-status + closes in one burst (no sleep between exit-status and
+// close) caused sluice to close srcChan before the agent-to-upstream
+// forwarder finished writing the SUCCESS reply for the agent's
+// session.SendRequest("exec", true, ...). gossh closes ch.msg on
+// SSH_MSG_CHANNEL_CLOSE, so the blocked SendRequest returns io.EOF and
+// session.Output(...) fails with "exec command via SSH: EOF".
+//
+// startTestSSHServer (used by other tests) papers over the race with a
+// 50ms sleep before returning from the channel handler. This test
+// spins up its own burst-close server with no such sleep, so the race
+// is reliably triggered without the inflightBarrier fix.
+func TestSSHJumpHost_BurstCloseDoesNotDropExecReply(t *testing.T) {
+	pubKey, privPEM := generateTestSSHKey(t)
+	dir := t.TempDir()
+	store, err := vault.NewStore(dir)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if _, err := store.Add("ssh_key", string(privPEM)); err != nil {
+		t.Fatal(err)
+	}
+
+	sshServer := startBurstCloseSSHServer(t, pubKey)
+	defer func() { _ = sshServer.Close() }()
+
+	proxyHostKey, err := GenerateSSHHostKey()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	binding := vault.Binding{
+		Credential: "ssh_key",
+		Template:   "testuser",
+		Protocols:  []string{"ssh"},
+	}
+
+	jumpHost := NewSSHJumpHost(store, proxyHostKey)
+	jumpHost.HostKeyCallback = ssh.InsecureIgnoreHostKey()
+
+	// Run the test many times in a single process to maximize the
+	// chance the close race fires if the fix regresses. Each iteration
+	// runs through a fresh proxy connection + fresh agent session.
+	const iterations = 50
+	for i := 0; i < iterations; i++ {
+		agentConn, proxyConn := tcpConnPair(t)
+
+		ready := make(chan error, 1)
+		errCh := make(chan error, 1)
+		go func() {
+			errCh <- jumpHost.HandleConnection(proxyConn, []string{sshServer.Addr().String()}, sshServer.Addr().String(), binding, ready)
+		}()
+
+		if setupErr := <-ready; setupErr != nil {
+			t.Fatalf("iter %d: handler setup: %v", i, setupErr)
+		}
+
+		agentSSH, agentChans, agentReqs, err := ssh.NewClientConn(agentConn, "proxy", &ssh.ClientConfig{
+			User:            "ignored",
+			HostKeyCallback: ssh.InsecureIgnoreHostKey(),
+		})
+		if err != nil {
+			t.Fatalf("iter %d: agent SSH handshake: %v", i, err)
+		}
+
+		client := ssh.NewClient(agentSSH, agentChans, agentReqs)
+		session, err := client.NewSession()
+		if err != nil {
+			t.Fatalf("iter %d: open session: %v", i, err)
+		}
+
+		output, err := session.Output("whoami")
+		if err != nil {
+			t.Fatalf("iter %d: exec: %v (this is the EOF symptom of the close race)", i, err)
+		}
+		if string(output) != "ssh-injection-ok\n" {
+			t.Errorf("iter %d: expected 'ssh-injection-ok', got %q", i, string(output))
+		}
+		_ = session.Close()
+		_ = client.Close()
+		_ = agentSSH.Close()
+		_ = agentConn.Close()
+
+		// Wait for HandleConnection to return so a leaked handler
+		// goroutine (or a connection that fails to tear down after
+		// close) surfaces as a test timeout rather than as silent
+		// resource exhaustion on the next iteration. HandleConnection
+		// returns nil on graceful agent disconnect; a non-nil error
+		// here would mean the teardown path produced an unexpected
+		// failure that a future regression could mask.
+		select {
+		case handlerErr := <-errCh:
+			if handlerErr != nil {
+				t.Fatalf("iter %d: HandleConnection returned error: %v", i, handlerErr)
+			}
+		case <-time.After(5 * time.Second):
+			t.Fatalf("iter %d: HandleConnection did not return within 5s after close", i)
+		}
+	}
+}
+
+// startBurstCloseSSHServer starts a test SSH server that, on the first exec
+// request, replies + writes output + sends exit-status + closes the
+// channel with no delay between exit-status and Close. The lack of any
+// sleep is intentional: it reliably triggers the close race
+// in sluice's SSH jump host when the inflightBarrier fix is absent.
+func startBurstCloseSSHServer(t *testing.T, authorizedKey ssh.PublicKey) net.Listener {
+	t.Helper()
+
+	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
+	if err != nil {
+		t.Fatal(err)
+	}
+	hostSigner, err := ssh.NewSignerFromKey(key)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	config := &ssh.ServerConfig{
+		PublicKeyCallback: func(_ ssh.ConnMetadata, pubKey ssh.PublicKey) (*ssh.Permissions, error) {
+			if bytes.Equal(pubKey.Marshal(), authorizedKey.Marshal()) {
+				return &ssh.Permissions{}, nil
+			}
+			return nil, fmt.Errorf("unknown public key")
+		},
+	}
+	config.AddHostKey(hostSigner)
+
+	ln, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	go func() {
+		for {
+			conn, err := ln.Accept()
+			if err != nil {
+				return
+			}
+			go func(c net.Conn) {
+				sshConn, chans, reqs, err := ssh.NewServerConn(c, config)
+				if err != nil {
+					_ = c.Close()
+					return
+				}
+				defer func() { _ = sshConn.Close() }()
+				go ssh.DiscardRequests(reqs)
+				for newChan := range chans {
+					if newChan.ChannelType() != "session" {
+						_ = newChan.Reject(ssh.UnknownChannelType, "unsupported")
+						continue
+					}
+					ch, chReqs, err := newChan.Accept()
+					if err != nil {
+						continue
+					}
+					go func(ch ssh.Channel, reqs <-chan *ssh.Request) {
+						// Defer close so a request loop that exits without
+						// hitting the exec path (early agent close,
+						// non-exec request only) still releases the
+						// server-side channel.
+						defer func() { _ = ch.Close() }()
+						for req := range reqs {
+							if req.Type != "exec" {
+								if req.WantReply {
+									_ = req.Reply(false, nil)
+								}
+								continue
+							}
+							if req.WantReply {
+								_ = req.Reply(true, nil)
+							}
+							_, _ = ch.Write([]byte("ssh-injection-ok\n"))
+							_, _ = ch.SendRequest("exit-status", false, ssh.Marshal(struct{ Status uint32 }{0}))
+							_ = ch.CloseWrite()
+							// NO time.Sleep here. This is the whole point of
+							// the test: close immediately after exit-status
+							// to maximally tighten the race window in
+							// sluice's sshHandleChannel.
+							return
+						}
+					}(ch, chReqs)
+				}
+			}(conn)
+		}
+	}()
+	return ln
+}
+
 // TestGenerateSSHHostKey tests SSH host key generation.
 func TestGenerateSSHHostKey(t *testing.T) {
 	signer, err := GenerateSSHHostKey()