[FLINK-37663][connector-base] Fix lost wakeup in FutureCompletingBlockingQueue #28463
Merged
MartijnVisser merged 2 commits on Jun 17, 2026
…ockingQueue lost wakeup A freed slot signals a putter that wakeUpPuttingThread() already released instead of the genuinely waiting putter; the test reproduces the stall and fails without the fix. Its @VisibleForTesting accessor is recorded in the architecture violation store, like the existing take(). Generated-by: Claude Code (Claude Opus 4.8)
… wakeup Remove a blocked putter's condition from the notFull queue when its await() returns, so signalNextPutter() no longer signals a putter that has stopped waiting after a graceful wakeUpPuttingThread() or a spurious wakeup. Generated-by: Claude Code (Claude Opus 4.8)
cca0d9d to
71031be
CI has passed. There is some issue with the comment reflecting pass.
What is the purpose of the change
`FutureCompletingBlockingQueue` (the split-fetcher to source-reader handover) has a lost-wakeup bug. A putter that blocks in `put()` registers its `Condition` in the internal `notFull` queue, but that condition is not always removed when the putter stops waiting:
- when it is woken by `wakeUpPuttingThread(int)` (and returns `false` without enqueuing), and
- when `await()` returns spuriously (permitted by the `Condition` contract) and it re-blocks, re-adding its condition.
The leftover (stale or duplicated) entry makes a later `signalNextPutter()`, fired when a consumer frees a slot, signal a thread that is no longer waiting. A genuinely waiting putter is then never woken, so a split fetcher can stall indefinitely.
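The stale-entry failure described above can be illustrated with a small single-threaded model. This is only a sketch under stated assumptions: the class `StaleConditionSketch` and its helper are hypothetical names, and the helper merely mimics the "poll the head of `notFull` and signal it" behaviour attributed to `signalNextPutter()`; it is not the Flink implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical model of the lost wakeup; not the Flink class. */
public class StaleConditionSketch {

    /**
     * Assumed shape of the signalling step: take the head of the
     * notFull queue and signal it. Returns the condition that was signalled.
     */
    public static Condition signalNextPutter(ReentrantLock lock, Queue<Condition> notFull) {
        lock.lock();
        try {
            Condition next = notFull.poll();
            if (next != null) {
                // Signalling a condition nobody is awaiting is a silent no-op.
                next.signal();
            }
            return next;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Queue<Condition> notFull = new ArrayDeque<>();

        Condition putterA = lock.newCondition(); // already woken, no longer waiting
        Condition putterB = lock.newCondition(); // imagined to be still blocked in put()

        notFull.add(putterA); // stale entry left behind by the woken putter
        notFull.add(putterB);

        // A consumer frees one slot and signals "the next putter".
        Condition signalled = signalNextPutter(lock, notFull);

        System.out.println("signalled the stale entry: " + (signalled == putterA));
        System.out.println("waiting putter still queued: " + notFull.contains(putterB));
    }
}
```

In this model the single freed slot is spent on the stale entry, and the condition of the putter that is still waiting stays in the queue without being signalled.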
This supersedes the stale #26446 by @herbherbherb, which first diagnosed this issue and proposed removing the condition inside `wakeUpPuttingThread`. That covers the graceful-wakeup case but not the duplicate-condition case (`remove()` deletes only one occurrence), so the lost wakeup can still occur; this PR instead removes the condition on every `await()` return.
Brief change log
- `FutureCompletingBlockingQueue#waitOnPut`: remove the putter's condition from `notFull` once `await()` returns (in a `finally`), restoring the invariant that `notFull` only holds conditions of currently waiting putters. The removal is `O(number of concurrently blocked putters)` and is off the per-record path (it runs only when a putter unblocks).
- A `@VisibleForTesting` accessor (`getNumberOfQueuedPutters()`) so the regression test can deterministically sequence the two contending putters without depending on `Thread.State`. Like the existing `@VisibleForTesting take()` in the same class, this trips the blanket connector "depend only on public API" ArchUnit rule (because `@VisibleForTesting` is `@Internal`), so it is recorded in the `flink-architecture-tests-production` violation store alongside the existing entry.
- `FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter`, a deterministic regression test (first commit) that reproduces the stranded-putter stall and fails on the current code. The 10s latch wait is the functional assertion; the method-level `@Timeout(60s)` is a hang backstop for this deadlock-regression test.
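The change-log items can be sketched together in a heavily simplified, self-contained model. Everything here is an assumption made for illustration: `MiniHandoverQueue`, its fields, and `secondPutterIsWoken()` are hypothetical and are not the Flink class or its test. Only the `try`/`finally` removal in `waitOnPut` and the `getNumberOfQueuedPutters()` accessor mirror what the change log describes.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified model of the handover queue; not the Flink class. */
public class MiniHandoverQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<T> queue = new ArrayDeque<>();
    private final Queue<Condition> notFull = new ArrayDeque<>();
    private final Condition[] conditions;
    private final boolean[] wakeUpFlags;
    private final int capacity;

    public MiniHandoverQueue(int capacity, int putters) {
        this.capacity = capacity;
        this.conditions = new Condition[putters];
        this.wakeUpFlags = new boolean[putters];
        for (int i = 0; i < putters; i++) {
            conditions[i] = lock.newCondition();
        }
    }

    /** Returns false if the putter was woken up before it could enqueue. */
    public boolean put(int index, T element) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.size() >= capacity) {
                if (wakeUpFlags[index]) {
                    wakeUpFlags[index] = false;
                    return false;
                }
                waitOnPut(index);
            }
            queue.add(element);
            return true;
        } finally {
            lock.unlock();
        }
    }

    private void waitOnPut(int index) throws InterruptedException {
        Condition cond = conditions[index];
        notFull.add(cond);
        try {
            cond.await();
        } finally {
            // The fix: whatever made await() return (signal, graceful wake-up,
            // spurious wake-up, interrupt), this putter is no longer waiting.
            notFull.remove(cond);
        }
    }

    public T poll() {
        lock.lock();
        try {
            int sizeBefore = queue.size();
            T element = queue.poll();
            if (sizeBefore == capacity && !notFull.isEmpty()) {
                notFull.poll().signal(); // a slot was freed: signal the next putter
            }
            return element;
        } finally {
            lock.unlock();
        }
    }

    public void wakeUpPuttingThread(int index) {
        lock.lock();
        try {
            wakeUpFlags[index] = true;
            conditions[index].signal();
        } finally {
            lock.unlock();
        }
    }

    /** Test-only accessor, used to sequence the putters deterministically. */
    public int getNumberOfQueuedPutters() {
        lock.lock();
        try {
            return notFull.size();
        } finally {
            lock.unlock();
        }
    }

    /** Two-putter scenario; returns true if the still-waiting putter is woken. */
    public static boolean secondPutterIsWoken() throws InterruptedException {
        MiniHandoverQueue<String> q = new MiniHandoverQueue<>(1, 2);
        q.put(0, "x"); // fill the single slot
        CountDownLatch secondDone = new CountDownLatch(1);
        Thread first = new Thread(() -> {
            try { q.put(0, "a"); } catch (InterruptedException ignored) { }
        });
        Thread second = new Thread(() -> {
            try { if (q.put(1, "b")) secondDone.countDown(); } catch (InterruptedException ignored) { }
        });
        first.setDaemon(true);
        second.setDaemon(true);
        first.start();
        while (q.getNumberOfQueuedPutters() < 1) Thread.sleep(1);
        second.start();
        while (q.getNumberOfQueuedPutters() < 2) Thread.sleep(1);
        q.wakeUpPuttingThread(0); // graceful wake-up of the first putter
        first.join();
        q.poll(); // frees one slot; must reach the second putter
        return secondDone.await(10, TimeUnit.SECONDS);
    }
}
```

In this model, deleting the `finally` block leaves the first putter's condition at the head of `notFull`, so the `poll()` at the end signals it instead of the second putter and the latch wait times out. Polling the accessor rather than inspecting `Thread.State` works because an entry is only visible to another lock holder once its putter has released the lock inside `await()`.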
The change is split into two commits: the first adds the failing reproduction test together with the small `@VisibleForTesting` accessor it needs to sequence the putters deterministically; the second applies the fix. (The accessor is grouped with the test it enables rather than with the behavioural change.)
Verifying this change
This change added tests and can be verified as follows:
- `FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter` blocks two putters, wakes the first gracefully, then frees one slot and asserts the second (still-waiting) putter is signalled. It is split into a reproduction commit and a fix commit for red-green verification: it fails on the code before the fix (the still-waiting putter is stranded, the latch times out) and passes after the fix.
- … (`FutureCompletingBlockingQueueTest`, `SplitFetcherTest`, `SplitFetcherManagerTest`, `SourceReaderBaseTest`).
- … dependency): on the unfixed code Fray deterministically reproduces the deadlock; with the one-line change from PR #26446 ("[FLINK-37663] [connector-base] Address potential sync issue in FutureCompletingBlockingQueue wakeup") it still deadlocks (duplicate-condition case); with this fix it ran 5000 explored interleavings with no deadlock.
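The duplicate-condition case mentioned in the last item rests on a property of `java.util.Collection#remove(Object)`: it removes a single matching element. The sketch below assumes the `notFull` queue behaves like a `java.util.ArrayDeque` and uses a plain `Object` as a stand-in for a putter's `Condition`; the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical illustration of why one remove() call leaves a duplicate behind. */
public class DuplicateEntrySketch {

    /** Calls remove(cond) once and counts how many entries for cond are left. */
    public static int leftoverAfterSingleRemove(Queue<Object> notFull, Object cond) {
        notFull.remove(cond); // removes only the first matching entry
        int leftover = 0;
        for (Object queued : notFull) {
            if (queued == cond) {
                leftover++;
            }
        }
        return leftover;
    }

    public static void main(String[] args) {
        Object cond = new Object(); // stand-in for one putter's Condition
        Queue<Object> notFull = new ArrayDeque<>();
        notFull.add(cond); // added when the putter first blocks
        notFull.add(cond); // added again after a spurious await() return (pre-fix behaviour)

        System.out.println(leftoverAfterSingleRemove(notFull, cond)); // prints 1
    }
}
```

One stale entry survives the single removal, which is consistent with the report that the one-line change still deadlocks in the duplicate-condition case.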
Does this pull request potentially affect one of the following parts:
- `@Public(Evolving)`: no (`FutureCompletingBlockingQueue` is `@Internal`)
Documentation
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Claude Opus 4.8)