From 3e1cefd7f781e29b8fdbe533dfbb85716d5428de Mon Sep 17 00:00:00 2001 From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com> Date: Tue, 16 Jun 2026 15:12:10 -0700 Subject: [PATCH 1/2] [FLINK-37663][connector-base] Add test reproducing FutureCompletingBlockingQueue lost wakeup A freed slot signals a putter that wakeUpPuttingThread() already released instead of the genuinely waiting putter; the test reproduces the stall and fails without the fix. Its @VisibleForTesting accessor is recorded in the architecture violation store, like the existing take(). Generated-by: Claude Code (Claude Opus 4.8) --- .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e | 1 + .../FutureCompletingBlockingQueue.java | 10 ++++ .../FutureCompletingBlockingQueueTest.java | 54 +++++++++++++++++++ 3 files changed, 65 insertions(+) diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e index 4425343714287..11cbe707c435f 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e @@ -187,6 +187,7 @@ Method calls method in (HybridSourceSplitEnumerator.java:450) Method checks instanceof in (HybridSourceSplitEnumerator.java:442) Method is annotated with in (SplitFetcherManager.java:0) +Method is annotated with in (FutureCompletingBlockingQueue.java:0) Method is annotated with in (FutureCompletingBlockingQueue.java:0) Method is annotated with in (FromElementsGeneratorFunction.java:0) Method is annotated with in (IndexLookupGeneratorFunction.java:0) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java index a341c52b9d289..aaf90f4b5eed7 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java @@ -307,6 +307,16 @@ public int remainingCapacity() { } } + @VisibleForTesting + int getNumberOfQueuedPutters() { + lock.lock(); + try { + return notFull.size(); + } finally { + lock.unlock(); + } + } + /** * Gracefully wakes up the thread with the given {@code threadIndex} if it is blocked in adding * an element. to the queue. If the thread is blocked in {@link #put(int, Object)} it will diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java index a19e07346e08d..65e7ff38b9239 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java @@ -20,14 +20,18 @@ import org.apache.flink.connector.base.source.reader.SourceReaderOptions; import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.testutils.CommonTestUtils; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import java.util.ArrayList; import java.util.BitSet; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; @@ -102,6 +106,56 @@ void testWakeUpPut() throws InterruptedException { assertThat(latch.getCount()).isEqualTo(0); } + /** + * A putter gracefully woken via {@link FutureCompletingBlockingQueue#wakeUpPuttingThread(int)} + * must not prevent another, genuinely waiting putter from being signalled once a slot frees up. + * See FLINK-37663. + */ + @Test + @Timeout(value = 60, unit = TimeUnit.SECONDS) + void testWakeUpDoesNotStrandAnotherPutter() throws Exception { + final FutureCompletingBlockingQueue queue = new FutureCompletingBlockingQueue<>(1); + + queue.put(2, 0); + + final AtomicBoolean wokenPutterResult = new AtomicBoolean(true); + final Thread wokenPutter = + new Thread(() -> wokenPutterResult.set(putUnchecked(queue, 0, 1)), "wokenPutter"); + final CountDownLatch genuinePutterDone = new CountDownLatch(1); + final Thread genuinePutter = + new Thread( + () -> { + putUnchecked(queue, 1, 2); + genuinePutterDone.countDown(); + }, + "genuinePutter"); + + // Block putter 0 before putter 1 so putter 0 is at the head of the wait queue. + wokenPutter.start(); + CommonTestUtils.waitUntilCondition(() -> queue.getNumberOfQueuedPutters() == 1); + genuinePutter.start(); + CommonTestUtils.waitUntilCondition(() -> queue.getNumberOfQueuedPutters() == 2); + + queue.wakeUpPuttingThread(0); + wokenPutter.join(); + assertThat(wokenPutterResult).isFalse(); + + queue.poll(); + + assertThat(genuinePutterDone.await(10, TimeUnit.SECONDS)) + .as("A still-waiting putter must be signalled when a slot frees up") + .isTrue(); + } + + private static boolean putUnchecked( + FutureCompletingBlockingQueue queue, int threadIndex, int value) { + try { + return queue.put(threadIndex, value); + } catch (InterruptedException e) { + return fail("Putting thread interrupted unexpectedly."); + } + } + @Test void testConcurrency() throws InterruptedException { FutureCompletingBlockingQueue queue = new FutureCompletingBlockingQueue<>(5); From 5f1e759cbffc5913f26245c335ba1325decdce44 Mon Sep 17 00:00:00 2001 From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com> Date: Tue, 16 Jun 2026 15:14:42 -0700 Subject: [PATCH 2/2] [FLINK-37663][connector-base] Remove stale put conditions to fix lost wakeup Remove a blocked putter's condition from the notFull queue when its await() returns, so signalNextPutter() no longer signals a putter that has stopped waiting after a graceful wakeUpPuttingThread() or a spurious wakeup. Generated-by: Claude Code (Claude Opus 4.8) --- .../synchronization/FutureCompletingBlockingQueue.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java index aaf90f4b5eed7..799fd0c4489b9 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java @@ -373,7 +373,13 @@ private void waitOnPut(int fetcherIndex) throws InterruptedException { maybeCreateCondition(fetcherIndex); Condition cond = putConditionAndFlags[fetcherIndex].condition(); notFull.add(cond); - cond.await(); + try { + cond.await(); + } finally { + // drop the condition once the thread stops waiting, so a later signalNextPutter() + // does not signal a putter that is no longer waiting + notFull.remove(cond); + } } @GuardedBy("lock")