Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion kv/raft_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,30 @@ package kv

import (
"context"
"time"

"github.com/bootjp/elastickv/internal/monoclock"
"github.com/bootjp/elastickv/internal/raftengine"
"github.com/cockroachdb/errors"
)

// verifyLeaderTimeout caps how long the no-context verifyLeaderEngine path
// is willing to wait for a single ReadIndex round-trip. Without this bound,
// callers that hold context.Background() (LeaderProxy.Commit/Abort,
// Coordinate.VerifyLeader, ShardedCoordinator.VerifyLeader[ForKey], and
// the S3/SQS/admin /healthz/leader handlers) blocked indefinitely whenever
// ReadIndex completion stalled, and a single transient stall accumulated
// callers permanently — Engine.run's Ready loop walks pendingReads O(N)
// per tick, so the pending-read queue feeds back on itself once it grows.
//
// 5s deliberately matches leaderForwardTimeout: a verify that takes longer
// than a single forward RPC is useless as a freshness check, and the
// proxy's verify-then-forward path stays within its 5s retry budget.
//
// See PR #745 / incident 2026-05-08 for the goroutine-pile production
// failure this bound prevents.
const verifyLeaderTimeout = 5 * time.Second

func engineForGroup(g *ShardGroup) raftengine.Engine {
if g == nil {
return nil
Expand Down Expand Up @@ -41,7 +59,9 @@ func verifyLeaderEngineCtx(ctx context.Context, engine raftengine.LeaderView) er
}

// verifyLeaderEngine verifies leadership without a caller-supplied context.
// It bounds the underlying ReadIndex wait with verifyLeaderTimeout so that
// background-context callers can never block indefinitely on a stalled
// ReadIndex; a stall surfaces as context.DeadlineExceeded instead.
func verifyLeaderEngine(engine raftengine.LeaderView) error {
	boundedCtx, stop := context.WithTimeout(context.Background(), verifyLeaderTimeout)
	defer stop()

	return verifyLeaderEngineCtx(boundedCtx, engine)
}

func linearizableReadEngineCtx(ctx context.Context, engine raftengine.LeaderView) (uint64, error) {
Expand Down
73 changes: 73 additions & 0 deletions kv/raft_engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package kv

import (
"context"
"testing"
"time"

"github.com/bootjp/elastickv/internal/raftengine"
"github.com/cockroachdb/errors"
)

// blockingLeaderView is a LeaderView test double whose VerifyLeader blocks
// until its context is cancelled, modelling the production pathology where
// ReadIndex stalls because heartbeat acks fail to land. LinearizableRead
// blocks the same way and returns cleanly on cancel; State and Leader
// return fixed leader-shaped values, which is enough for the callers
// exercised by these tests.
type blockingLeaderView struct{}

// State always reports leadership so verify paths proceed to ReadIndex.
func (blockingLeaderView) State() raftengine.State { return raftengine.StateLeader }

// Leader identifies this node as its own leader.
func (blockingLeaderView) Leader() raftengine.LeaderInfo { return raftengine.LeaderInfo{ID: "self"} }

// VerifyLeader blocks until ctx fires, then reports ctx's error —
// the "ReadIndex never completes" stall under test.
func (blockingLeaderView) VerifyLeader(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// LinearizableRead blocks like VerifyLeader and yields no index on cancel.
func (blockingLeaderView) LinearizableRead(ctx context.Context) (uint64, error) {
	<-ctx.Done()
	return 0, ctx.Err()
}

// TestVerifyLeaderEngine_BoundsBlockingReadIndex pins the regression: if a
// stalled ReadIndex used to return only when the underlying ctx fired, but
// callers passed context.Background(), the goroutine pinned forever. After
// the 2026-05-08 incident this must complete within roughly
// verifyLeaderTimeout, surfacing context.DeadlineExceeded.
//
// Skipped under -short because the whole point is to wait for the deadline
// to fire; the no-skip path adds verifyLeaderTimeout (5s) to every default
// `make test` run.
func TestVerifyLeaderEngine_BoundsBlockingReadIndex(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("skipping: blocks for verifyLeaderTimeout (5s)")
}

start := time.Now()
err := verifyLeaderEngine(blockingLeaderView{})
elapsed := time.Since(start)

if err == nil {
t.Fatalf("verifyLeaderEngine(blocking) returned nil; expected DeadlineExceeded")
}
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("verifyLeaderEngine(blocking) err = %v; want DeadlineExceeded", err)
}
// Lower bound: confirm the engine actually held the call until the
// deadline fired, not that some other error path returned
// immediately. Without this, a future regression that returned
// DeadlineExceeded before doing any work (e.g. a misplaced ctx
// check before the engine call) would silently pass.
//
// Tolerate a 200ms early-return slack so a slow CI scheduler that
// trips ctx.Done() a hair before the wall clock catches up does
// not flake.
const slack = 200 * time.Millisecond
if elapsed+slack < verifyLeaderTimeout {
t.Fatalf("verifyLeaderEngine(blocking) returned too early after %s; want >= %s (-%s slack)", elapsed, verifyLeaderTimeout, slack)
}
// Upper bound: prove the call returned at all. Generous so a slow
// CI host does not flake.
if elapsed > 2*verifyLeaderTimeout {
t.Fatalf("verifyLeaderEngine(blocking) returned after %s; want <= 2x verifyLeaderTimeout (%s)", elapsed, verifyLeaderTimeout)
}
Comment on lines +70 to +72
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To ensure that the test is actually exercising the timeout logic and not returning early for some other reason, consider asserting a minimum elapsed time. This confirms that the blockingLeaderView mock correctly held the call until the context deadline was reached.

	if elapsed < verifyLeaderTimeout {
		t.Fatalf("verifyLeaderEngine(blocking) returned too early after %s; want >= %s", elapsed, verifyLeaderTimeout)
	}
	if elapsed > 2*verifyLeaderTimeout {
		t.Fatalf("verifyLeaderEngine(blocking) returned after %s; want <= 2x verifyLeaderTimeout (%s)", elapsed, verifyLeaderTimeout)
	}

}
Loading