diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index aad27b869863..98f596141eed 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.remoteChannel;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import com.google.api.services.dataflow.model.MapTask;
import com.google.auto.value.AutoValue;
@@ -119,6 +120,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
+import org.checkerframework.checker.initialization.qual.UnknownInitialization;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -129,9 +132,6 @@
*
*
Implements a Streaming Dataflow worker.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
@Internal
public final class StreamingDataflowWorker {
@@ -189,7 +189,7 @@ public final class StreamingDataflowWorker {
private final StreamingWorkerStatusReporter workerStatusReporter;
private final int numCommitThreads;
private final Supplier clock;
- private final GrpcDispatcherClient dispatcherClient;
+ private final @Nullable GrpcDispatcherClient dispatcherClient;
private final ExecutorService harnessSwitchExecutor;
private final long clientId;
private final WindmillServerStub windmillServer;
@@ -271,7 +271,7 @@ private StreamingDataflowWorker(
streamingWorkScheduler,
getDataMetricTracker,
memoryMonitor,
- this.dispatcherClient);
+ checkNotNull(this.dispatcherClient));
} else {
harnessFactoryOutput =
createSingleSourceWorkerHarness(
@@ -330,6 +330,8 @@ private StreamingDataflowWorker(
}
private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
+ @UnderInitialization()
+ StreamingDataflowWorker this, // Use receiver parameter syntax to allow annotation.
long clientId,
DataflowWorkerHarnessOptions options,
WindmillServerStub windmillServer,
@@ -345,6 +347,7 @@ private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
GetDataClient getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker);
HeartbeatSender heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData);
+ @SuppressWarnings("methodref.receiver.bound")
WorkCommitter workCommitter =
StreamingApplianceWorkCommitter.create(windmillServer::commitWork, this::onCompleteCommit);
GetWorkSender getWorkSender = GetWorkSender.forAppliance(() -> windmillServer.getWork(request));
@@ -355,7 +358,7 @@ private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
.setStreamingWorkScheduler(streamingWorkScheduler)
.setWorkCommitter(workCommitter)
.setGetDataClient(getDataClient)
- .setComputationStateFetcher(this.computationStateCache::get)
+ .setComputationStateFetcher(checkNotNull(this.computationStateCache)::get)
.setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
.setHeartbeatSender(heartbeatSender)
.setGetWorkSender(getWorkSender)
@@ -368,6 +371,8 @@ private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
}
private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHarness(
+ @UnknownInitialization()
+ StreamingDataflowWorker this, // Use receiver parameter syntax to allow annotation.
long clientId,
DataflowWorkerHarnessOptions options,
GrpcWindmillStreamFactory windmillStreamFactory,
@@ -376,7 +381,8 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
MemoryMonitor memoryMonitor,
GrpcDispatcherClient dispatcherClient) {
WeightedSemaphore maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
- ChannelCache channelCache = createChannelCache(options, configFetcher);
+ ChannelCache channelCache = createChannelCache(options, checkNotNull(configFetcher));
+ @SuppressWarnings("methodref.receiver.bound")
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
FanOutStreamingEngineWorkerHarness.create(
createJobHeader(options, clientId),
@@ -391,7 +397,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
processingContext,
drainMode,
getWorkStreamLatencies) ->
- computationStateCache
+ checkNotNull(computationStateCache)
.get(processingContext.computationId())
.ifPresent(
computationState -> {
@@ -407,7 +413,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
}),
ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache),
GetWorkBudgetDistributors.distributeEvenly(),
- Preconditions.checkNotNull(dispatcherClient),
+ checkNotNull(dispatcherClient),
commitWorkStream ->
StreamingEngineWorkCommitter.builder()
// Share the commitByteSemaphore across all created workCommitters.
@@ -433,6 +439,8 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
}
private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
+ @UnknownInitialization()
+ StreamingDataflowWorker this, // Use receiver parameter syntax to allow annotation.
long clientId,
DataflowWorkerHarnessOptions options,
WindmillServerStub windmillServer,
@@ -454,7 +462,11 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
HeartbeatSender heartbeatSender =
createStreamingEngineHeartbeatSender(
- options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle());
+ options,
+ windmillServer,
+ getDataStreamPool,
+ checkNotNull(configFetcher).getGlobalConfigHandle());
+ @SuppressWarnings("methodref.receiver.bound")
WorkCommitter workCommitter =
StreamingEngineWorkCommitter.builder()
.setCommitWorkStreamFactory(
@@ -476,7 +488,7 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
.setStreamingWorkScheduler(streamingWorkScheduler)
.setWorkCommitter(workCommitter)
.setGetDataClient(getDataClient)
- .setComputationStateFetcher(this.computationStateCache::get)
+ .setComputationStateFetcher(checkNotNull(this.computationStateCache)::get)
.setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
.setHeartbeatSender(heartbeatSender)
.setGetWorkSender(getWorkSender)
@@ -489,17 +501,20 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
}
private void switchStreamingWorkerHarness(ConnectivityType connectivityType) {
- if ((connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH
+ if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DEFAULT) {
+ return;
+ }
+ boolean directPath = connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH;
+ if ((directPath
&& this.streamingWorkerHarness.get() instanceof FanOutStreamingEngineWorkerHarness)
- || (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH
- && streamingWorkerHarness.get() instanceof SingleSourceWorkerHarness)) {
+ || (!directPath && streamingWorkerHarness.get() instanceof SingleSourceWorkerHarness)) {
return;
}
// Stop the current status pages before switching the harness.
this.statusPages.get().stop();
LOG.debug("Stopped StreamingWorkerStatusPages before switching connectivity type.");
- StreamingWorkerHarnessFactoryOutput newHarnessFactoryOutput = null;
- if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
+ StreamingWorkerHarnessFactoryOutput newHarnessFactoryOutput;
+ if (directPath) {
// If dataflow experiment `enable_windmill_service_direct_path` is not set for
// the job, do not switch to FanOutStreamingEngineWorkerHarness. This is because
// `enable_windmill_service_direct_path` is tied to SDK version and is only
@@ -524,11 +539,11 @@ private void switchStreamingWorkerHarness(ConnectivityType connectivityType) {
this.streamingWorkScheduler,
this.getDataMetricTracker,
this.memoryMonitor.memoryMonitor(),
- this.dispatcherClient);
+ checkNotNull(this.dispatcherClient));
this.streamingWorkerHarness.set(newHarnessFactoryOutput.streamingWorkerHarness());
streamingWorkerHarness.get().start();
LOG.debug("Started FanOutStreamingEngineWorkerHarness");
- } else if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH) {
+ } else {
LOG.info("Switching connectivity type from DIRECTPATH to CLOUDPATH");
LOG.debug("Shutting down FanOutStreamingEngineWorkerHarness");
streamingWorkerHarness.get().shutdown();