diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index fe0f1ab8bc91..3143ecdd3efc 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1559,7 +1559,6 @@ class BeamModulePlugin implements Plugin { "NonCanonicalType", "Slf4jFormatShouldBeConst", "Slf4jSignOnlyFormat", - "StaticAssignmentInConstructor", "ThreadPriorityCheck", "TimeUnitConversionChecker", "UndefinedEquals", diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index 12f8eae152fc..0a8ec4f21b50 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; @@ -306,7 +307,7 @@ public void boundedSourceEvaluatorClosesReader() throws Exception { evaluator.finishBundle(); CommittedBundle committed = output.commit(Instant.now()); assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L))); - assertThat(TestSource.readerClosed, is(true)); + assertThat(TestSource.readerClosed.get(), is(true)); } @Test @@ -326,7 +327,7 @@ public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception { evaluator.finishBundle(); CommittedBundle committed = output.commit(Instant.now()); assertThat(committed.getElements(), emptyIterable()); - assertThat(TestSource.readerClosed, is(true)); + assertThat(TestSource.readerClosed.get(), is(true)); } @Test @@ -336,7 +337,7 @@ public void cleanupShutsDownExecutor() { } private static class TestSource extends OffsetBasedSource { - private static boolean readerClosed; + private static final AtomicBoolean readerClosed = new AtomicBoolean(false); private final Coder coder; private final T[] elems; private final int firstSplitIndex; @@ -352,7 +353,7 @@ public TestSource(Coder coder, int firstSplitIndex, T... elems) { this.elems = elems; this.coder = coder; this.firstSplitIndex = firstSplitIndex; - readerClosed = false; + readerClosed.set(false); subrangesCompleted = new CountDownLatch(2); } @@ -449,7 +450,7 @@ public T getCurrent() throws NoSuchElementException { @Override public void close() throws IOException { - TestSource.readerClosed = true; + TestSource.readerClosed.set(true); } } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 5413a694e92b..ca577aeb034d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -339,8 +339,8 @@ public void evaluatorReusesReaderAndClosesAtTheEnd() throws Exception { } while (!Iterables.isEmpty(residual.getElements())); verify(output, times(numElements)).add(any()); - assertThat(TestUnboundedSource.readerCreatedCount, equalTo(1)); - assertThat(TestUnboundedSource.readerClosedCount, equalTo(1)); + assertThat(TestUnboundedSource.READER_CREATED_COUNT.get(), equalTo(1)); + assertThat(TestUnboundedSource.READER_CLOSED_COUNT.get(), equalTo(1)); } @Test @@ -382,7 +382,7 @@ public void evaluatorClosesReaderAndResumesFromCheckpoint() throws Exception { secondEvaluator.processElement(Iterables.getOnlyElement(residual.getElements())); secondEvaluator.finishBundle(); - assertThat(TestUnboundedSource.readerClosedCount, equalTo(2)); + assertThat(TestUnboundedSource.READER_CLOSED_COUNT.get(), equalTo(2)); assertThat( Iterables.getOnlyElement(residual.getElements()).getValue().getCheckpoint().isFinalized(), is(true)); @@ -421,12 +421,12 @@ public void evaluatorThrowsInCloseRethrows() throws Exception { @Test // before this was throwing a NPE public void emptySource() throws Exception { - TestUnboundedSource.readerClosedCount = 0; + TestUnboundedSource.READER_CLOSED_COUNT.set(0); final TestUnboundedSource source = new TestUnboundedSource<>(StringUtf8Coder.of()); source.advanceWatermarkToInfinity = true; processElement(source); - assertEquals(1, TestUnboundedSource.readerClosedCount); - TestUnboundedSource.readerClosedCount = 0; // reset + assertEquals(1, TestUnboundedSource.READER_CLOSED_COUNT.get()); + TestUnboundedSource.READER_CLOSED_COUNT.set(0); // reset } @Test(expected = IOException.class) @@ -472,7 +472,7 @@ private void processElement(final TestUnboundedSource source) throws Exc final WindowedValue> value = WindowedValues.of( shard, BoundedWindow.TIMESTAMP_MAX_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); - TestUnboundedSource.readerClosedCount = 0; + TestUnboundedSource.READER_CLOSED_COUNT.set(0); evaluator.processElement(value); } @@ -492,11 +492,15 @@ public Instant apply(Long input) { } private static class TestUnboundedSource extends UnboundedSource { - private static int getWatermarkCalls = 0; - - static int readerCreatedCount; - static int readerClosedCount; - static int readerAdvancedCount; + private static final java.util.concurrent.atomic.AtomicInteger getWatermarkCalls = + new java.util.concurrent.atomic.AtomicInteger(0); + + static final java.util.concurrent.atomic.AtomicInteger READER_CREATED_COUNT = + new java.util.concurrent.atomic.AtomicInteger(0); + static final java.util.concurrent.atomic.AtomicInteger READER_CLOSED_COUNT = + new java.util.concurrent.atomic.AtomicInteger(0); + static final java.util.concurrent.atomic.AtomicInteger READER_ADVANCED_COUNT = + new java.util.concurrent.atomic.AtomicInteger(0); private final Coder coder; private final List elems; private boolean dedupes = false; @@ -508,9 +512,9 @@ public TestUnboundedSource(Coder coder, T... elems) { } private TestUnboundedSource(Coder coder, boolean throwOnClose, List elems) { - readerCreatedCount = 0; - readerClosedCount = 0; - readerAdvancedCount = 0; + READER_CREATED_COUNT.set(0); + READER_CLOSED_COUNT.set(0); + READER_ADVANCED_COUNT.set(0); this.coder = coder; this.elems = elems; this.throwOnClose = throwOnClose; @@ -528,7 +532,7 @@ public UnboundedSource.UnboundedReader createReader( checkState( checkpointMark == null || checkpointMark.decoded, "Cannot resume from a checkpoint that has not been decoded"); - readerCreatedCount++; + READER_CREATED_COUNT.incrementAndGet(); return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index); } @@ -568,7 +572,7 @@ public boolean start() throws IOException { @Override public boolean advance() throws IOException { - readerAdvancedCount++; + READER_ADVANCED_COUNT.incrementAndGet(); if (index + 1 < elems.size()) { index++; return true; @@ -578,11 +582,11 @@ public boolean advance() throws IOException { @Override public Instant getWatermark() { - getWatermarkCalls++; + getWatermarkCalls.incrementAndGet(); if (index + 1 == elems.size() && TestUnboundedSource.this.advanceWatermarkToInfinity) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } else { - return new Instant(index + getWatermarkCalls); + return new Instant(index + getWatermarkCalls.get()); } } @@ -618,7 +622,7 @@ public byte[] getCurrentRecordId() { @Override public void close() throws IOException { try { - readerClosedCount++; + READER_CLOSED_COUNT.incrementAndGet(); // Enforce the AutoCloseable contract. Close is not idempotent. assertThat(closed, is(false)); if (throwOnClose) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index d11c6c374333..1bee9d004ed5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -4732,17 +4732,18 @@ public void run() { private static class FakeSlowDoFn extends DoFn { - private static FakeClock clock; // A static variable keeps this DoFn serializable. + private static final AtomicReference clock = + new AtomicReference<>(); // A static variable keeps this DoFn serializable. private final Duration sleep; FakeSlowDoFn(FakeClock clock, Duration sleep) { - FakeSlowDoFn.clock = clock; + FakeSlowDoFn.clock.set(clock); this.sleep = sleep; } @ProcessElement public void processElement(ProcessContext c) throws Exception { - clock.sleep(sleep); + clock.get().sleep(sleep); c.output(c.element()); } } diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java index 69f6abee1f67..69df5768e5d0 100644 --- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java +++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java @@ -22,6 +22,7 @@ import com.codahale.metrics.MetricRegistry; import java.util.Collection; import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.runners.spark.structuredstreaming.metrics.WithMetricsSupport; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.spark.metrics.sink.Sink; @@ -29,8 +30,10 @@ /** An in-memory {@link Sink} implementation for tests. */ public class InMemoryMetrics implements Sink { - private static WithMetricsSupport extendedMetricsRegistry; - private static MetricRegistry internalMetricRegistry; + private static final AtomicReference extendedMetricsRegistry = + new AtomicReference<>(); + private static final AtomicReference internalMetricRegistry = + new AtomicReference<>(); // Constructor for Spark 3.1 @SuppressWarnings("UnusedParameters") @@ -38,24 +41,24 @@ public InMemoryMetrics( final Properties properties, final MetricRegistry metricRegistry, final org.apache.spark.SecurityManager securityMgr) { - extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry); - internalMetricRegistry = metricRegistry; + extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry)); + internalMetricRegistry.set(metricRegistry); } // Constructor for Spark >= 3.2 @SuppressWarnings("UnusedParameters") public InMemoryMetrics(final Properties properties, final MetricRegistry metricRegistry) { - extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry); - internalMetricRegistry = metricRegistry; + extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry)); + internalMetricRegistry.set(metricRegistry); } @SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"}) // because of getGauges public static T valueOf(final String name) { // this might fail in case we have multiple aggregators with the same suffix after // the last dot, but it should be good enough for tests. - if (extendedMetricsRegistry != null) { - Collection matches = - extendedMetricsRegistry.getGauges((n, m) -> n.endsWith(name)).values(); + WithMetricsSupport extended = extendedMetricsRegistry.get(); + if (extended != null) { + Collection matches = extended.getGauges((n, m) -> n.endsWith(name)).values(); return matches.isEmpty() ? null : (T) Iterables.getOnlyElement(matches).getValue(); } else { return null; @@ -64,8 +67,9 @@ public static T valueOf(final String name) { @SuppressWarnings("WeakerAccess") public static void clearAll() { - if (internalMetricRegistry != null) { - internalMetricRegistry.removeMatching(MetricFilter.ALL); + MetricRegistry internal = internalMetricRegistry.get(); + if (internal != null) { + internal.removeMatching(MetricFilter.ALL); } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java index db040bbfcc45..2c12c42bbf7a 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java @@ -22,6 +22,7 @@ import com.codahale.metrics.MetricRegistry; import java.util.Collection; import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.runners.spark.metrics.WithMetricsSupport; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.spark.metrics.sink.Sink; @@ -29,8 +30,10 @@ /** An in-memory {@link Sink} implementation for tests. */ public class InMemoryMetrics implements Sink { - private static WithMetricsSupport extendedMetricsRegistry; - private static MetricRegistry internalMetricRegistry; + private static final AtomicReference extendedMetricsRegistry = + new AtomicReference<>(); + private static final AtomicReference internalMetricRegistry = + new AtomicReference<>(); // Constructor for Spark 3.1 @SuppressWarnings("UnusedParameters") @@ -38,24 +41,24 @@ public InMemoryMetrics( final Properties properties, final MetricRegistry metricRegistry, final org.apache.spark.SecurityManager securityMgr) { - extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry); - internalMetricRegistry = metricRegistry; + extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry)); + internalMetricRegistry.set(metricRegistry); } // Constructor for Spark >= 3.2 @SuppressWarnings("UnusedParameters") public InMemoryMetrics(final Properties properties, final MetricRegistry metricRegistry) { - extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry); - internalMetricRegistry = metricRegistry; + extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry)); + internalMetricRegistry.set(metricRegistry); } @SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"}) public static T valueOf(final String name) { // this might fail in case we have multiple aggregators with the same suffix after // the last dot, but it should be good enough for tests. - if (extendedMetricsRegistry != null) { - Collection matches = - extendedMetricsRegistry.getGauges((n, m) -> n.endsWith(name)).values(); + WithMetricsSupport extended = extendedMetricsRegistry.get(); + if (extended != null) { + Collection matches = extended.getGauges((n, m) -> n.endsWith(name)).values(); return matches.isEmpty() ? null : (T) Iterables.getOnlyElement(matches).getValue(); } else { return null; @@ -64,8 +67,9 @@ public static T valueOf(final String name) { @SuppressWarnings("WeakerAccess") public static void clearAll() { - if (internalMetricRegistry != null) { - internalMetricRegistry.removeMatching(MetricFilter.ALL); + MetricRegistry internal = internalMetricRegistry.get(); + if (internal != null) { + internal.removeMatching(MetricFilter.ALL); } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java index 9cb82f27722d..ddd0e74d1c9e 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java @@ -38,7 +38,8 @@ * tests requiring a different context have to be forked using separate test classes. */ @SuppressWarnings({ - "rawtypes" // TODO(https://github.com/apache/beam/issues/20447) + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "StaticAssignmentInConstructor" // used for testing purposes }) @RunWith(Enclosed.class) public class SparkRunnerKryoRegistratorTest { diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java index fb83c0060f49..616ad10edb49 100644 --- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java +++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java @@ -21,12 +21,13 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; +import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.io.common.DatabaseTestHelper; /** Helper for creating connection and test tables on hive database via JDBC driver. */ class HiveDatabaseTestHelper { - private static Connection con; - private static Statement stmt; + private static final AtomicReference con = new AtomicReference<>(); + private static final AtomicReference stmt = new AtomicReference<>(); HiveDatabaseTestHelper( String hiveHost, @@ -36,24 +37,24 @@ class HiveDatabaseTestHelper { String hivePassword) throws Exception { String hiveUrl = String.format("jdbc:hive2://%s:%s/%s", hiveHost, hivePort, hiveDatabase); - con = DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword); - stmt = con.createStatement(); + con.set(DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword)); + stmt.set(con.get().createStatement()); } /** Create hive table. */ String createHiveTable(String testIdentifier) throws Exception { String tableName = DatabaseTestHelper.getTestTableName(testIdentifier); - stmt.execute(" CREATE TABLE IF NOT EXISTS " + tableName + " (id STRING)"); + stmt.get().execute(" CREATE TABLE IF NOT EXISTS " + tableName + " (id STRING)"); return tableName; } /** Delete hive table. */ void dropHiveTable(String tableName) throws SQLException { - stmt.execute(" DROP TABLE " + tableName); + stmt.get().execute(" DROP TABLE " + tableName); } void closeConnection() throws Exception { - stmt.close(); - con.close(); + stmt.get().close(); + con.get().close(); } } diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 7f3b394d7f6a..b3233f866172 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -69,6 +69,7 @@ import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -1197,17 +1198,14 @@ public Message apply(String value, Session session) { private static class TextMessageMapperWithErrorCounter implements SerializableBiFunction { - private static int errorCounter; + private static final AtomicInteger errorCounter = new AtomicInteger(0); - TextMessageMapperWithErrorCounter() { - errorCounter = 0; - } + TextMessageMapperWithErrorCounter() {} @Override public Message apply(String value, Session session) { try { - if (errorCounter == 0) { - errorCounter++; + if (errorCounter.getAndIncrement() == 0) { throw new JMSException("Error!!"); } TextMessage msg = session.createTextMessage(); diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java index 32e494496bec..338a316b1b9b 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java @@ -19,19 +19,17 @@ import java.io.Serializable; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import net.snowflake.client.jdbc.SnowflakeSQLException; /** Fake implementation of Snowflake warehouse used in test code. */ public class FakeSnowflakeDatabase implements Serializable { - private static Map> tables = new HashMap<>(); + private static final Map> tables = new ConcurrentHashMap<>(); - private FakeSnowflakeDatabase() { - tables = new HashMap<>(); - } + private FakeSnowflakeDatabase() {} public static void createTable(String table) { FakeSnowflakeDatabase.tables.put(table, Collections.emptyList()); @@ -72,7 +70,7 @@ public static void createTableWithElements(String table, List rows) { } public static void clean() { - FakeSnowflakeDatabase.tables = new HashMap<>(); + tables.clear(); } public static void truncateTable(String table) {