Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,50 @@ public DataGeneratorSource(
rateLimiterStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
}

/**
 * Instantiates a new {@code DataGeneratorSource} driven by an explicit {@code
 * NumberSequenceSource}: each number produced by {@code numberSource} is fed as the index into
 * {@code generatorFunction} to produce one output element.
 *
 * @param generatorFunction The {@code GeneratorFunction} that maps a {@code Long} index to an
 *     element of type {@code OUT}.
 * @param numberSource The source supplying the sequence of indices (and, via its splits, the
 *     partitioning of the generated data).
 * @param rateLimiterStrategy The strategy for rate limiting the emission of elements.
 * @param typeInfo The type of the produced data points.
 */
public DataGeneratorSource(
GeneratorFunction<Long, OUT> generatorFunction,
NumberSequenceSource numberSource,
RateLimiterStrategy rateLimiterStrategy,
TypeInformation<OUT> typeInfo) {
this(
new GeneratorSourceReaderFactory<>(generatorFunction, rateLimiterStrategy),
generatorFunction,
typeInfo,
numberSource);
// Strip non-serializable enclosing-scope references captured by the strategy's closure.
// The delegate constructor cleans the generator function itself, so only the strategy
// needs cleaning here.
ClosureCleaner.clean(
rateLimiterStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
}

/**
 * Instantiates a {@code DataGeneratorSource} producing {@code count} elements (indices {@code 0}
 * to {@code count - 1}) with a caller-supplied reader factory. Package-private: intended for
 * internal/test wiring where the default {@code GeneratorSourceReaderFactory} is not wanted.
 *
 * @param sourceReaderFactory factory creating the readers that evaluate {@code generatorFunction}
 * @param generatorFunction maps a {@code Long} index to an element of type {@code OUT}
 * @param count number of elements to generate; non-positive values fall back to a
 *     {@code NumberSequenceSource(0, 0)} (treated as a no-op source per the inline note —
 *     NOTE(review): confirm that this range really yields zero elements rather than one)
 * @param typeInfo the type of the produced data points
 */
DataGeneratorSource(
SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
GeneratorFunction<Long, OUT> generatorFunction,
long count,
TypeInformation<OUT> typeInfo) {
this(
sourceReaderFactory,
generatorFunction,
typeInfo,
// a noop source (0 elements) is used in Table tests
new NumberSequenceSource(0, count > 0 ? count - 1 : 0));
}

DataGeneratorSource(
SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
GeneratorFunction<Long, OUT> generatorFunction,
TypeInformation<OUT> typeInfo,
NumberSequenceSource numberSource) {
this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
this.generatorFunction = checkNotNull(generatorFunction);
this.typeInfo = checkNotNull(typeInfo);
long to = count > 0 ? count - 1 : 0; // a noop source (0 elements) is used in Table tests
this.numberSource = new NumberSequenceSource(0, to);
this.numberSource = numberSource;
ClosureCleaner.clean(
generatorFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
ClosureCleaner.clean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.CheckpointingOptions;
Expand Down Expand Up @@ -171,16 +172,10 @@ private StreamExecutionEnvironment getUnalignedCheckpointEnv(@Nullable String re
}

private static JobClient createMultiOutputDAG(StreamExecutionEnvironment env) throws Exception {
DataGeneratorSource<Long> source =
new DataGeneratorSource<>(
index -> index,
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(5000),
Types.LONG);

int sourceParallelism = getRandomParallelism();
DataStream<Long> sourceStream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Data Generator")
env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Data Generator")
.setParallelism(sourceParallelism);

sourceStream
Expand Down Expand Up @@ -208,25 +203,13 @@ private static JobClient createMultiOutputDAG(StreamExecutionEnvironment env) th

private static JobClient createMultiInputDAG(StreamExecutionEnvironment env) throws Exception {
int source1Parallelism = getRandomParallelism();
DataGeneratorSource<Long> source1 =
new DataGeneratorSource<>(
index -> index,
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(5000),
Types.LONG);
DataStream<Long> sourceStream1 =
env.fromSource(source1, WatermarkStrategy.noWatermarks(), "Source 1")
env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Source 1")
.setParallelism(source1Parallelism);

int source2Parallelism = getRandomParallelism();
DataGeneratorSource<Long> source2 =
new DataGeneratorSource<>(
index -> index,
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(5000),
Types.LONG);
DataStream<Long> sourceStream2 =
env.fromSource(source2, WatermarkStrategy.noWatermarks(), "Source 2")
env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Source 2")
.setParallelism(source2Parallelism);

// Keep the same parallelism to ensure the ForwardPartitioner will be used.
Expand All @@ -245,16 +228,10 @@ private static JobClient createMultiInputDAG(StreamExecutionEnvironment env) thr

private static JobClient createRescalePartitionerDAG(StreamExecutionEnvironment env)
throws Exception {
DataGeneratorSource<Long> source =
new DataGeneratorSource<>(
index -> index,
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(5000),
Types.LONG);

int sourceParallelism = getRandomParallelism();
DataStream<Long> sourceStream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Data Generator")
env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Data Generator")
.setParallelism(sourceParallelism);

sourceStream
Expand Down Expand Up @@ -284,25 +261,13 @@ private static JobClient createMixedComplexityDAG(StreamExecutionEnvironment env
throws Exception {
// Multi-input part
int source1Parallelism = getRandomParallelism();
DataGeneratorSource<Long> source1 =
new DataGeneratorSource<>(
index -> index,
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(5000),
Types.LONG);
DataStream<Long> sourceStream1 =
env.fromSource(source1, WatermarkStrategy.noWatermarks(), "Source 1")
env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Source 1")
.setParallelism(source1Parallelism);

int source2Parallelism = getRandomParallelism();
DataGeneratorSource<Long> source2 =
new DataGeneratorSource<>(
index -> index,
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(5000),
Types.LONG);
DataStream<Long> sourceStream2 =
env.fromSource(source2, WatermarkStrategy.noWatermarks(), "Source 2")
env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Source 2")
.setParallelism(source2Parallelism);

// Keep the same parallelism to ensure the ForwardPartitioner will be used.
Expand Down Expand Up @@ -349,27 +314,15 @@ private static JobClient createMixedComplexityDAG(StreamExecutionEnvironment env
private static JobClient createPartEmptyHashExchangeDAG(StreamExecutionEnvironment env)
throws Exception {
int source1Parallelism = getRandomParallelism();
DataGeneratorSource<Long> source1 =
new DataGeneratorSource<>(
index -> index,
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(5000),
Types.LONG);
DataStream<Long> sourceStream1 =
env.fromSource(source1, WatermarkStrategy.noWatermarks(), "Source 1")
env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Source 1")
.setParallelism(source1Parallelism);

int source2Parallelism = getRandomParallelism();
DataGeneratorSource<Long> source2 =
new DataGeneratorSource<>(
index -> index,
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(5000),
Types.LONG);

// Filter all records to simulate empty state exchange
DataStream<Long> sourceStream2 =
env.fromSource(source2, WatermarkStrategy.noWatermarks(), "Source 2")
env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Source 2")
.setParallelism(source2Parallelism)
.filter(value -> false)
.setParallelism(source2Parallelism);
Expand All @@ -392,6 +345,20 @@ private static int getRandomParallelism() {
return RANDOM.nextInt(MAX_SLOTS) + 1;
}

/**
 * Creates the data generator source shared by all test DAG builders: an effectively unbounded
 * sequence of {@code Long} values (the generator is the identity function on the index),
 * rate-limited to 5000 elements per second.
 */
private static DataGeneratorSource<Long> createSource() {
// The trailing {} makes this an anonymous DataGeneratorSource subclass — presumably needed
// for generic type extraction of <Long>; TODO(review): confirm it is actually required.
return new DataGeneratorSource<>(
index -> index,
// Anonymous NumberSequenceSource that ignores the requested split count and always
// partitions the range into MAX_SLOTS splits, so the number of splits is stable
// regardless of the (randomized) source parallelism used by the tests.
new NumberSequenceSource(0, Long.MAX_VALUE - 1) {
@Override
protected List<NumberSequenceSplit> splitNumberRange(
long from, long to, int numSplitsIgnored) {
return super.splitNumberRange(from, to, MAX_SLOTS);
}
},
RateLimiterStrategy.perSecond(5000),
Types.LONG) {};
}

/** A simple CoMapFunction that sleeps for 1ms for each element. */
private static class SleepingCoMap implements CoMapFunction<Long, Long, Long> {
@Override
Expand Down