Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1559,7 +1559,6 @@ class BeamModulePlugin implements Plugin<Project> {
"NonCanonicalType",
"Slf4jFormatShouldBeConst",
"Slf4jSignOnlyFormat",
"StaticAssignmentInConstructor",
"ThreadPriorityCheck",
"TimeUnitConversionChecker",
"UndefinedEquals",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
Expand Down Expand Up @@ -306,7 +307,7 @@ public void boundedSourceEvaluatorClosesReader() throws Exception {
evaluator.finishBundle();
CommittedBundle<Long> committed = output.commit(Instant.now());
assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L)));
assertThat(TestSource.readerClosed, is(true));
assertThat(TestSource.readerClosed.get(), is(true));
}

@Test
Expand All @@ -326,7 +327,7 @@ public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
evaluator.finishBundle();
CommittedBundle<Long> committed = output.commit(Instant.now());
assertThat(committed.getElements(), emptyIterable());
assertThat(TestSource.readerClosed, is(true));
assertThat(TestSource.readerClosed.get(), is(true));
}

@Test
Expand All @@ -336,7 +337,7 @@ public void cleanupShutsDownExecutor() {
}

private static class TestSource<T> extends OffsetBasedSource<T> {
private static boolean readerClosed;
private static final AtomicBoolean readerClosed = new AtomicBoolean(false);
private final Coder<T> coder;
private final T[] elems;
private final int firstSplitIndex;
Expand All @@ -352,7 +353,7 @@ public TestSource(Coder<T> coder, int firstSplitIndex, T... elems) {
this.elems = elems;
this.coder = coder;
this.firstSplitIndex = firstSplitIndex;
readerClosed = false;
readerClosed.set(false);

subrangesCompleted = new CountDownLatch(2);
}
Expand Down Expand Up @@ -449,7 +450,7 @@ public T getCurrent() throws NoSuchElementException {

@Override
public void close() throws IOException {
TestSource.readerClosed = true;
TestSource.readerClosed.set(true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ public void evaluatorReusesReaderAndClosesAtTheEnd() throws Exception {
} while (!Iterables.isEmpty(residual.getElements()));

verify(output, times(numElements)).add(any());
assertThat(TestUnboundedSource.readerCreatedCount, equalTo(1));
assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
assertThat(TestUnboundedSource.READER_CREATED_COUNT.get(), equalTo(1));
assertThat(TestUnboundedSource.READER_CLOSED_COUNT.get(), equalTo(1));
}

@Test
Expand Down Expand Up @@ -382,7 +382,7 @@ public void evaluatorClosesReaderAndResumesFromCheckpoint() throws Exception {
secondEvaluator.processElement(Iterables.getOnlyElement(residual.getElements()));
secondEvaluator.finishBundle();

assertThat(TestUnboundedSource.readerClosedCount, equalTo(2));
assertThat(TestUnboundedSource.READER_CLOSED_COUNT.get(), equalTo(2));
assertThat(
Iterables.getOnlyElement(residual.getElements()).getValue().getCheckpoint().isFinalized(),
is(true));
Expand Down Expand Up @@ -421,12 +421,12 @@ public void evaluatorThrowsInCloseRethrows() throws Exception {

@Test // before this was throwing a NPE
public void emptySource() throws Exception {
TestUnboundedSource.readerClosedCount = 0;
TestUnboundedSource.READER_CLOSED_COUNT.set(0);
final TestUnboundedSource<String> source = new TestUnboundedSource<>(StringUtf8Coder.of());
source.advanceWatermarkToInfinity = true;
processElement(source);
assertEquals(1, TestUnboundedSource.readerClosedCount);
TestUnboundedSource.readerClosedCount = 0; // reset
assertEquals(1, TestUnboundedSource.READER_CLOSED_COUNT.get());
TestUnboundedSource.READER_CLOSED_COUNT.set(0); // reset
}

@Test(expected = IOException.class)
Expand Down Expand Up @@ -472,7 +472,7 @@ private void processElement(final TestUnboundedSource<String> source) throws Exc
final WindowedValue<UnboundedSourceShard<String, TestCheckpointMark>> value =
WindowedValues.of(
shard, BoundedWindow.TIMESTAMP_MAX_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
TestUnboundedSource.readerClosedCount = 0;
TestUnboundedSource.READER_CLOSED_COUNT.set(0);
evaluator.processElement(value);
}

Expand All @@ -492,11 +492,15 @@ public Instant apply(Long input) {
}

private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
private static int getWatermarkCalls = 0;

static int readerCreatedCount;
static int readerClosedCount;
static int readerAdvancedCount;
private static final java.util.concurrent.atomic.AtomicInteger getWatermarkCalls =
new java.util.concurrent.atomic.AtomicInteger(0);

static final java.util.concurrent.atomic.AtomicInteger READER_CREATED_COUNT =
new java.util.concurrent.atomic.AtomicInteger(0);
static final java.util.concurrent.atomic.AtomicInteger READER_CLOSED_COUNT =
new java.util.concurrent.atomic.AtomicInteger(0);
static final java.util.concurrent.atomic.AtomicInteger READER_ADVANCED_COUNT =
new java.util.concurrent.atomic.AtomicInteger(0);
private final Coder<T> coder;
private final List<T> elems;
private boolean dedupes = false;
Expand All @@ -508,9 +512,9 @@ public TestUnboundedSource(Coder<T> coder, T... elems) {
}

private TestUnboundedSource(Coder<T> coder, boolean throwOnClose, List<T> elems) {
readerCreatedCount = 0;
readerClosedCount = 0;
readerAdvancedCount = 0;
READER_CREATED_COUNT.set(0);
READER_CLOSED_COUNT.set(0);
READER_ADVANCED_COUNT.set(0);
this.coder = coder;
this.elems = elems;
this.throwOnClose = throwOnClose;
Expand All @@ -528,7 +532,7 @@ public UnboundedSource.UnboundedReader<T> createReader(
checkState(
checkpointMark == null || checkpointMark.decoded,
"Cannot resume from a checkpoint that has not been decoded");
readerCreatedCount++;
READER_CREATED_COUNT.incrementAndGet();
return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index);
}

Expand Down Expand Up @@ -568,7 +572,7 @@ public boolean start() throws IOException {

@Override
public boolean advance() throws IOException {
readerAdvancedCount++;
READER_ADVANCED_COUNT.incrementAndGet();
if (index + 1 < elems.size()) {
index++;
return true;
Expand All @@ -578,11 +582,11 @@ public boolean advance() throws IOException {

@Override
public Instant getWatermark() {
getWatermarkCalls++;
getWatermarkCalls.incrementAndGet();
if (index + 1 == elems.size() && TestUnboundedSource.this.advanceWatermarkToInfinity) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
} else {
return new Instant(index + getWatermarkCalls);
return new Instant(index + getWatermarkCalls.get());
}
}

Expand Down Expand Up @@ -618,7 +622,7 @@ public byte[] getCurrentRecordId() {
@Override
public void close() throws IOException {
try {
readerClosedCount++;
READER_CLOSED_COUNT.incrementAndGet();
// Enforce the AutoCloseable contract. Close is not idempotent.
assertThat(closed, is(false));
if (throwOnClose) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4732,17 +4732,18 @@ public void run() {

private static class FakeSlowDoFn extends DoFn<String, String> {

private static FakeClock clock; // A static variable keeps this DoFn serializable.
private static final AtomicReference<FakeClock> clock =
new AtomicReference<>(); // A static variable keeps this DoFn serializable.
private final Duration sleep;

FakeSlowDoFn(FakeClock clock, Duration sleep) {
FakeSlowDoFn.clock = clock;
FakeSlowDoFn.clock.set(clock);
this.sleep = sleep;
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
clock.sleep(sleep);
clock.get().sleep(sleep);
c.output(c.element());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,43 @@
import com.codahale.metrics.MetricRegistry;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.runners.spark.structuredstreaming.metrics.WithMetricsSupport;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.spark.metrics.sink.Sink;

/** An in-memory {@link Sink} implementation for tests. */
public class InMemoryMetrics implements Sink {

private static WithMetricsSupport extendedMetricsRegistry;
private static MetricRegistry internalMetricRegistry;
private static final AtomicReference<WithMetricsSupport> extendedMetricsRegistry =
new AtomicReference<>();
private static final AtomicReference<MetricRegistry> internalMetricRegistry =
new AtomicReference<>();

// Constructor for Spark 3.1
@SuppressWarnings("UnusedParameters")
public InMemoryMetrics(
final Properties properties,
final MetricRegistry metricRegistry,
final org.apache.spark.SecurityManager securityMgr) {
extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
internalMetricRegistry = metricRegistry;
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
internalMetricRegistry.set(metricRegistry);
}

// Constructor for Spark >= 3.2
@SuppressWarnings("UnusedParameters")
public InMemoryMetrics(final Properties properties, final MetricRegistry metricRegistry) {
extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
internalMetricRegistry = metricRegistry;
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
internalMetricRegistry.set(metricRegistry);
}

@SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"}) // because of getGauges
public static <T> T valueOf(final String name) {
// this might fail in case we have multiple aggregators with the same suffix after
// the last dot, but it should be good enough for tests.
if (extendedMetricsRegistry != null) {
Collection<Gauge> matches =
extendedMetricsRegistry.getGauges((n, m) -> n.endsWith(name)).values();
WithMetricsSupport extended = extendedMetricsRegistry.get();
if (extended != null) {
Collection<Gauge> matches = extended.getGauges((n, m) -> n.endsWith(name)).values();
return matches.isEmpty() ? null : (T) Iterables.getOnlyElement(matches).getValue();
} else {
return null;
Expand All @@ -64,8 +67,9 @@ public static <T> T valueOf(final String name) {

@SuppressWarnings("WeakerAccess")
public static void clearAll() {
if (internalMetricRegistry != null) {
internalMetricRegistry.removeMatching(MetricFilter.ALL);
MetricRegistry internal = internalMetricRegistry.get();
if (internal != null) {
internal.removeMatching(MetricFilter.ALL);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,43 @@
import com.codahale.metrics.MetricRegistry;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.spark.metrics.sink.Sink;

/** An in-memory {@link Sink} implementation for tests. */
public class InMemoryMetrics implements Sink {

private static WithMetricsSupport extendedMetricsRegistry;
private static MetricRegistry internalMetricRegistry;
private static final AtomicReference<WithMetricsSupport> extendedMetricsRegistry =
new AtomicReference<>();
private static final AtomicReference<MetricRegistry> internalMetricRegistry =
new AtomicReference<>();

// Constructor for Spark 3.1
@SuppressWarnings("UnusedParameters")
public InMemoryMetrics(
final Properties properties,
final MetricRegistry metricRegistry,
final org.apache.spark.SecurityManager securityMgr) {
extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
internalMetricRegistry = metricRegistry;
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
internalMetricRegistry.set(metricRegistry);
}

// Constructor for Spark >= 3.2
@SuppressWarnings("UnusedParameters")
public InMemoryMetrics(final Properties properties, final MetricRegistry metricRegistry) {
extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
internalMetricRegistry = metricRegistry;
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
internalMetricRegistry.set(metricRegistry);
}

@SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"})
public static <T> T valueOf(final String name) {
// this might fail in case we have multiple aggregators with the same suffix after
// the last dot, but it should be good enough for tests.
if (extendedMetricsRegistry != null) {
Collection<Gauge> matches =
extendedMetricsRegistry.getGauges((n, m) -> n.endsWith(name)).values();
WithMetricsSupport extended = extendedMetricsRegistry.get();
if (extended != null) {
Collection<Gauge> matches = extended.getGauges((n, m) -> n.endsWith(name)).values();
return matches.isEmpty() ? null : (T) Iterables.getOnlyElement(matches).getValue();
} else {
return null;
Expand All @@ -64,8 +67,9 @@ public static <T> T valueOf(final String name) {

@SuppressWarnings("WeakerAccess")
public static void clearAll() {
if (internalMetricRegistry != null) {
internalMetricRegistry.removeMatching(MetricFilter.ALL);
MetricRegistry internal = internalMetricRegistry.get();
if (internal != null) {
internal.removeMatching(MetricFilter.ALL);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
* tests requiring a different context have to be forked using separate test classes.
*/
@SuppressWarnings({
"rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"StaticAssignmentInConstructor" // used for testing purposes
})
@RunWith(Enclosed.class)
public class SparkRunnerKryoRegistratorTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;

/** Helper for creating connection and test tables on hive database via JDBC driver. */
class HiveDatabaseTestHelper {
private static Connection con;
private static Statement stmt;
private static final AtomicReference<Connection> con = new AtomicReference<>();
private static final AtomicReference<Statement> stmt = new AtomicReference<>();

HiveDatabaseTestHelper(
String hiveHost,
Expand All @@ -36,24 +37,24 @@ class HiveDatabaseTestHelper {
String hivePassword)
throws Exception {
String hiveUrl = String.format("jdbc:hive2://%s:%s/%s", hiveHost, hivePort, hiveDatabase);
con = DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword);
stmt = con.createStatement();
con.set(DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword));
stmt.set(con.get().createStatement());
}

/** Create hive table. */
String createHiveTable(String testIdentifier) throws Exception {
String tableName = DatabaseTestHelper.getTestTableName(testIdentifier);
stmt.execute(" CREATE TABLE IF NOT EXISTS " + tableName + " (id STRING)");
stmt.get().execute(" CREATE TABLE IF NOT EXISTS " + tableName + " (id STRING)");
return tableName;
}

/** Delete hive table. */
void dropHiveTable(String tableName) throws SQLException {
stmt.execute(" DROP TABLE " + tableName);
stmt.get().execute(" DROP TABLE " + tableName);
}

void closeConnection() throws Exception {
stmt.close();
con.close();
stmt.get().close();
con.get().close();
}
}
Loading
Loading