Skip to content

[BP-2.3][FLINK-37663][connector-base] Fix lost wakeup in FutureCompletingBlockingQueue#28467

Open
MartijnVisser wants to merge 2 commits into
apache:release-2.3from
MartijnVisser:backport-FLINK-37663-release-2.3
Open

[BP-2.3][FLINK-37663][connector-base] Fix lost wakeup in FutureCompletingBlockingQueue#28467
MartijnVisser wants to merge 2 commits into
apache:release-2.3from
MartijnVisser:backport-FLINK-37663-release-2.3

Conversation

@MartijnVisser

Copy link
Copy Markdown
Contributor

What is the purpose of the change

Backport of #28463 to release-2.3.

FutureCompletingBlockingQueue (the split-fetcher to source-reader handover) has a lost-wakeup bug. A putter that blocks in put() registers its Condition in the internal notFull queue, but that condition is not always removed when the putter stops waiting:

  • when the putter is gracefully woken via wakeUpPuttingThread(int) (and returns false without enqueuing), and
  • when the putter's await() returns spuriously (permitted by the Condition contract) and it re-blocks, re-adding its condition.

The leftover (stale or duplicated) entry makes a later signalNextPutter(), fired when a consumer frees a slot, signal a thread that is no longer waiting. A genuinely waiting putter is then never woken, so a split fetcher can stall indefinitely.

Brief change log

  • FutureCompletingBlockingQueue#waitOnPut: remove the putter's condition from notFull once await() returns (in a finally), restoring the invariant that notFull only holds conditions of currently waiting putters.
  • Added a @VisibleForTesting accessor (getNumberOfQueuedPutters()) so the regression test can deterministically sequence the two contending putters. Like the existing @VisibleForTesting take() in the same class, this trips the blanket connector "depend only on public API" ArchUnit rule, so it is recorded in the flink-architecture-tests-production violation store alongside the existing entry.
  • Added FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter, a deterministic regression test that reproduces the stranded-putter stall and fails on the unfixed code.

Verifying this change

This change added tests and can be verified as follows:

  • FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter blocks two putters, wakes the first gracefully, then frees one slot and asserts the second (still-waiting) putter is signalled. It fails on the code before the fix (the still-waiting putter is stranded, the latch times out) and passes after the fix.
  • Existing tests in the module remain green.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no (FutureCompletingBlockingQueue is @Internal)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?
  • Yes (Claude Code, Claude Opus 4.8)

Generated-by: Claude Code (Claude Opus 4.8)

…ockingQueue lost wakeup

A freed slot signals a putter that wakeUpPuttingThread() already released instead of the genuinely waiting putter; the test reproduces the stall and fails without the fix. Its @VisibleForTesting accessor is recorded in the architecture violation store, like the existing take().

Generated-by: Claude Code (Claude Opus 4.8)
… wakeup

Remove a blocked putter's condition from the notFull queue when its await() returns, so signalNextPutter() no longer signals a putter that has stopped waiting after a graceful wakeUpPuttingThread() or a spurious wakeup.

Generated-by: Claude Code (Claude Opus 4.8)
@flinkbot

flinkbot commented Jun 17, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@spuru9 spuru9 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean Backport

@raminqaf raminqaf left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@github-actions github-actions Bot added target:release-2.3 community-reviewed-LGTM Applied if there are 2 non-committer approves on a PR. (The submitter cannot approve their own PR.) labels Jun 17, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed-LGTM Applied if there are 2 non-committer approves on a PR. (The submitter cannot approve their own PR.) target:release-2.3

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants