Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -430,7 +431,9 @@ private void setVertexNonChainedOutputsConfig(
private void connectToFinishedUpStreamVertex(JobVertexBuildContext jobVertexBuildContext) {
Map<Integer, OperatorChainInfo> chainInfos = jobVertexBuildContext.getChainInfosInOrder();
for (OperatorChainInfo chainInfo : chainInfos.values()) {
List<StreamEdge> transitiveInEdges = chainInfo.getTransitiveInEdges();
List<StreamEdge> transitiveInEdges =
getTransitiveInEdgesInOrder(
chainInfo.getTransitiveInEdges(), jobVertexBuildContext);
for (StreamEdge transitiveInEdge : transitiveInEdges) {
NonChainedOutput output =
intermediateOutputsCaches
Expand All @@ -447,6 +450,55 @@ private void connectToFinishedUpStreamVertex(JobVertexBuildContext jobVertexBuil
}
}

private List<StreamEdge> getTransitiveInEdgesInOrder(
List<StreamEdge> transitiveInEdges, JobVertexBuildContext jobVertexBuildContext) {
final List<StreamEdge> transitiveInEdgesInOrder =
transitiveInEdges.stream()
.sorted(
Comparator.comparing(
inEdge -> getStartNodeId(inEdge.getSourceId())))
.collect(Collectors.toList());
final List<StreamEdge> uidTransitiveInEdges =
transitiveInEdgesInOrder.stream()
.filter(this::hasUidBackedUpstream)
.sorted(
Comparator.comparing(
inEdge ->
getStartNodeJobVertexId(
inEdge, jobVertexBuildContext)))
.collect(Collectors.toList());

if (uidTransitiveInEdges.size() < 2) {
return transitiveInEdgesInOrder;
}

int uidTransitiveInEdgeIndex = 0;
for (int transitiveInEdgeIndex = 0;
transitiveInEdgeIndex < transitiveInEdgesInOrder.size();
transitiveInEdgeIndex++) {
if (hasUidBackedUpstream(transitiveInEdgesInOrder.get(transitiveInEdgeIndex))) {
transitiveInEdgesInOrder.set(
transitiveInEdgeIndex,
uidTransitiveInEdges.get(uidTransitiveInEdgeIndex++));
}
}
return transitiveInEdgesInOrder;
}

private boolean hasUidBackedUpstream(StreamEdge inEdge) {
return streamGraph
.getStreamNode(getStartNodeId(inEdge.getSourceId()))
.getTransformationUID()
!= null;
}

private JobVertexID getStartNodeJobVertexId(
StreamEdge inEdge, JobVertexBuildContext jobVertexBuildContext) {
return new JobVertexID(
Preconditions.checkNotNull(
jobVertexBuildContext.getHash(getStartNodeId(inEdge.getSourceId()))));
}

private void recordCreatedJobVerticesInfo(JobVertexBuildContext jobVertexBuildContext) {
Map<Integer, OperatorChainInfo> chainInfos = jobVertexBuildContext.getChainInfosInOrder();
for (OperatorChainInfo chainInfo : chainInfos.values()) {
Expand All @@ -473,11 +525,17 @@ private void createOperatorChainInfos(
final Map<Integer, OperatorChainInfo> chainEntryPoints =
buildAndGetChainEntryPoints(streamNodes, jobVertexBuildContext);

chainEntryPoints.values().stream()
.filter(
chainInfo ->
streamGraph
.getStreamNode(chainInfo.getStartNodeId())
.getTransformationUID()
!= null)
.forEach(chainInfo -> generateHashesByStreamNodeId(chainInfo.getStartNodeId()));
final Collection<OperatorChainInfo> initialEntryPoints =
chainEntryPoints.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue)
.collect(Collectors.toList());
StreamingJobGraphGenerator.getInitialEntryPoints(
chainEntryPoints.values(), jobVertexBuildContext);

for (OperatorChainInfo info : initialEntryPoints) {
// We use generateHashesByStreamNodeId to subscribe the visited stream node id and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,10 +607,7 @@ private void setChaining() {
final Map<Integer, OperatorChainInfo> chainEntryPoints =
buildChainedInputsAndGetHeadInputs();
final Collection<OperatorChainInfo> initialEntryPoints =
chainEntryPoints.entrySet().stream()
.sorted(Comparator.comparing(Map.Entry::getKey))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
getInitialEntryPoints(chainEntryPoints.values(), jobVertexBuildContext);

// iterate over a copy of the values, because this map gets concurrently modified
for (OperatorChainInfo info : initialEntryPoints) {
Expand All @@ -626,6 +623,57 @@ private void setChaining() {
}
}

static Collection<OperatorChainInfo> getInitialEntryPoints(
Collection<OperatorChainInfo> chainEntryPoints,
JobVertexBuildContext jobVertexBuildContext) {
final List<OperatorChainInfo> initialEntryPoints =
chainEntryPoints.stream()
.sorted(Comparator.comparing(OperatorChainInfo::getStartNodeId))
.collect(Collectors.toList());
final List<OperatorChainInfo> uidEntryPoints =
initialEntryPoints.stream()
.filter(
chainInfo ->
hasTransformationUid(
chainInfo, jobVertexBuildContext.getStreamGraph()))
.sorted(
Comparator.comparing(
chainInfo ->
getStartNodeJobVertexId(
chainInfo, jobVertexBuildContext)))
.collect(Collectors.toList());

if (uidEntryPoints.size() < 2) {
return initialEntryPoints;
}

int uidEntryPointIndex = 0;
for (int entryPointIndex = 0;
entryPointIndex < initialEntryPoints.size();
entryPointIndex++) {
if (hasTransformationUid(
initialEntryPoints.get(entryPointIndex),
jobVertexBuildContext.getStreamGraph())) {
// Input gates are restored by position, so align source traversal with the stable
// JobVertexID for UIDed heads while keeping the other heads in their legacy
// positions.
initialEntryPoints.set(entryPointIndex, uidEntryPoints.get(uidEntryPointIndex++));
}
}
return initialEntryPoints;
}

private static boolean hasTransformationUid(
OperatorChainInfo chainInfo, StreamGraph streamGraph) {
return streamGraph.getStreamNode(chainInfo.getStartNodeId()).getTransformationUID() != null;
}

private static JobVertexID getStartNodeJobVertexId(
OperatorChainInfo chainInfo, JobVertexBuildContext jobVertexBuildContext) {
return new JobVertexID(
checkNotNull(jobVertexBuildContext.getHash(chainInfo.getStartNodeId())));
}

public static List<StreamEdge> createChain(
final Integer currentNodeId,
final int chainIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,24 @@ void testDeterministicUnionOrder() {
}
}

@Test
void testStableUnionInputOrderWithOperatorUids() {
assertThat(getInputSourceNames(getUidUnionJobGraph(false)))
.isEqualTo(getInputSourceNames(getUidUnionJobGraph(true)));
}

@Test
void testStableUnionInputOrderWithOperatorUidsAndUnrelatedSource() {
assertThat(getMultiInputSourceNames(getUidUnionJobGraphWithUnrelatedSource(false)))
.isEqualTo(getMultiInputSourceNames(getUidUnionJobGraphWithUnrelatedSource(true)));
}

@Test
void testUidHashUnionInputOrderKeepsDeclarationOrder() {
assertThat(getInputSourceNames(getUidHashUnionJobGraph()))
.containsExactly("Source: source-b", "Source: source-a");
}

private JobGraph getUnionJobGraph(StreamExecutionEnvironment env) {

createSource(env, 1)
Expand All @@ -1202,6 +1220,87 @@ private DataStream<Integer> createSource(StreamExecutionEnvironment env, int ind
return env.fromData(index).name("source" + index).map(i -> i).name("map" + index);
}

private JobGraph getUidUnionJobGraph(boolean reverseSources) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
env.disableOperatorChaining();

DataStream<Integer> firstSource =
createUidSource(
env, reverseSources ? "source-b" : "source-a", reverseSources ? 2 : 1);
DataStream<Integer> secondSource =
createUidSource(
env, reverseSources ? "source-a" : "source-b", reverseSources ? 1 : 2);

firstSource.union(secondSource).sinkTo(new DiscardingSink<>()).name("sink").uid("sink");

return createJobGraph(env.getStreamGraph());
}

private DataStream<Integer> createUidSource(
StreamExecutionEnvironment env, String sourceName, int value) {
return env.fromData(value).name(sourceName).uid(sourceName);
}

private JobGraph getUidUnionJobGraphWithUnrelatedSource(boolean reverseSources) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
env.disableOperatorChaining();

env.fromData(0)
.name("unrelated-source")
.sinkTo(new DiscardingSink<>())
.name("unrelated-sink");

DataStream<Integer> firstSource =
createUidSource(
env, reverseSources ? "source-b" : "source-a", reverseSources ? 2 : 1);
DataStream<Integer> secondSource =
createUidSource(
env, reverseSources ? "source-a" : "source-b", reverseSources ? 1 : 2);

firstSource.union(secondSource).sinkTo(new DiscardingSink<>()).name("sink").uid("sink");

return createJobGraph(env.getStreamGraph());
}

private JobGraph getUidHashUnionJobGraph() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
env.disableOperatorChaining();

DataStream<Integer> firstSource =
createUidHashSource(env, "source-b", 2, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
DataStream<Integer> secondSource =
createUidHashSource(env, "source-a", 1, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");

firstSource.union(secondSource).sinkTo(new DiscardingSink<>()).name("sink").uid("sink");

return createJobGraph(env.getStreamGraph());
}

private DataStream<Integer> createUidHashSource(
StreamExecutionEnvironment env, String sourceName, int value, String uidHash) {
return env.fromData(value).name(sourceName).setUidHash(uidHash);
}

private List<String> getInputSourceNames(JobGraph jobGraph) {
JobVertex jobSink = Iterables.getLast(jobGraph.getVerticesSortedTopologicallyFromSources());
return getInputSourceNames(jobSink);
}

private List<String> getMultiInputSourceNames(JobGraph jobGraph) {
JobVertex jobSink =
jobGraph.getVerticesSortedTopologicallyFromSources().stream()
.filter(jobVertex -> jobVertex.getInputs().size() > 1)
.findFirst()
.orElseThrow(() -> new AssertionError("Expected a multi-input sink"));
return getInputSourceNames(jobSink);
}

private List<String> getInputSourceNames(JobVertex jobSink) {
return jobSink.getInputs().stream()
.map(edge -> edge.getSource().getProducer().getName())
.collect(Collectors.toList());
}

@Test
void testNotSupportInputSelectableOperatorIfCheckpointing() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down