Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ Method <org.apache.flink.connector.base.source.hybrid.HybridSource$HybridSourceB
Method <org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext, int, int, int)> calls method <org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits.signalIntermediateNoMoreSplits(int)> in (HybridSourceSplitEnumerator.java:450)
Method <org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext, int, int, int)> checks instanceof <org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits> in (HybridSourceSplitEnumerator.java:442)
Method <org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.getNumAliveFetchers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitFetcherManager.java:0)
Method <org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.getNumberOfQueuedPutters()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FutureCompletingBlockingQueue.java:0)
Method <org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.take()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FutureCompletingBlockingQueue.java:0)
Method <org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction.getSerializer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FromElementsGeneratorFunction.java:0)
Method <org.apache.flink.connector.datagen.functions.IndexLookupGeneratorFunction.getSerializer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (IndexLookupGeneratorFunction.java:0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,16 @@ public int remainingCapacity() {
}
}

@VisibleForTesting
int getNumberOfQueuedPutters() {
lock.lock();
try {
return notFull.size();
} finally {
lock.unlock();
}
}

/**
* Gracefully wakes up the thread with the given {@code threadIndex} if it is blocked in adding
* an element. to the queue. If the thread is blocked in {@link #put(int, Object)} it will
Expand Down Expand Up @@ -363,7 +373,13 @@ private void waitOnPut(int fetcherIndex) throws InterruptedException {
maybeCreateCondition(fetcherIndex);
Condition cond = putConditionAndFlags[fetcherIndex].condition();
notFull.add(cond);
cond.await();
try {
cond.await();
} finally {
// drop the condition once the thread stops waiting, so a later signalNextPutter()
// does not signal a putter that is no longer waiting
notFull.remove(cond);
}
}

@GuardedBy("lock")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@

import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.testutils.CommonTestUtils;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -102,6 +106,56 @@ void testWakeUpPut() throws InterruptedException {
assertThat(latch.getCount()).isEqualTo(0);
}

/**
* A putter gracefully woken via {@link FutureCompletingBlockingQueue#wakeUpPuttingThread(int)}
* must not prevent another, genuinely waiting putter from being signalled once a slot frees up.
* See FLINK-37663.
*/
@Test
@Timeout(value = 60, unit = TimeUnit.SECONDS)
void testWakeUpDoesNotStrandAnotherPutter() throws Exception {
final FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(1);

queue.put(2, 0);

final AtomicBoolean wokenPutterResult = new AtomicBoolean(true);
final Thread wokenPutter =
new Thread(() -> wokenPutterResult.set(putUnchecked(queue, 0, 1)), "wokenPutter");
final CountDownLatch genuinePutterDone = new CountDownLatch(1);
final Thread genuinePutter =
new Thread(
() -> {
putUnchecked(queue, 1, 2);
genuinePutterDone.countDown();
},
"genuinePutter");

// Block putter 0 before putter 1 so putter 0 is at the head of the wait queue.
wokenPutter.start();
CommonTestUtils.waitUntilCondition(() -> queue.getNumberOfQueuedPutters() == 1);
genuinePutter.start();
CommonTestUtils.waitUntilCondition(() -> queue.getNumberOfQueuedPutters() == 2);

queue.wakeUpPuttingThread(0);
wokenPutter.join();
assertThat(wokenPutterResult).isFalse();

queue.poll();

assertThat(genuinePutterDone.await(10, TimeUnit.SECONDS))
.as("A still-waiting putter must be signalled when a slot frees up")
.isTrue();
}

private static boolean putUnchecked(
FutureCompletingBlockingQueue<Integer> queue, int threadIndex, int value) {
try {
return queue.put(threadIndex, value);
} catch (InterruptedException e) {
return fail("Putting thread interrupted unexpectedly.");
}
}

@Test
void testConcurrency() throws InterruptedException {
FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(5);
Expand Down