Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.utils.GameConstants;
import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
Expand Down Expand Up @@ -189,26 +190,25 @@ public interface Options extends LeaderBoard.Options {
* Create a map of information that describes how to write pipeline output to BigQuery. This map
* is used to write information about team score sums.
*/
protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
configureWindowedWrite() {
Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
new HashMap<>();
Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
tableConfigure.put(
"team", new WriteWindowedToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
"team", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
tableConfigure.put(
"total_score",
new WriteWindowedToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
tableConfigure.put(
"window_start",
new WriteWindowedToBigQuery.FieldInfo<>(
new WriteToBigQuery.FieldInfo<>(
"STRING",
(c, w) -> {
IntervalWindow window = (IntervalWindow) w;
return GameConstants.DATE_TIME_FORMATTER.print(window.start());
}));
tableConfigure.put(
"processing_time",
new WriteWindowedToBigQuery.FieldInfo<>(
new WriteToBigQuery.FieldInfo<>(
"STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
return tableConfigure;
}
Expand All @@ -217,20 +217,19 @@ public interface Options extends LeaderBoard.Options {
* Create a map of information that describes how to write pipeline output to BigQuery. This map
* is used to write information about mean user session time.
*/
protected static Map<String, WriteWindowedToBigQuery.FieldInfo<Double>>
configureSessionWindowWrite() {
protected static Map<String, WriteToBigQuery.FieldInfo<Double>> configureSessionWindowWrite() {

Map<String, WriteWindowedToBigQuery.FieldInfo<Double>> tableConfigure = new HashMap<>();
Map<String, WriteToBigQuery.FieldInfo<Double>> tableConfigure = new HashMap<>();
tableConfigure.put(
"window_start",
new WriteWindowedToBigQuery.FieldInfo<>(
new WriteToBigQuery.FieldInfo<>(
"STRING",
(c, w) -> {
IntervalWindow window = (IntervalWindow) w;
return GameConstants.DATE_TIME_FORMATTER.print(window.start());
}));
tableConfigure.put(
"mean_duration", new WriteWindowedToBigQuery.FieldInfo<>("FLOAT", (c, w) -> c.element()));
"mean_duration", new WriteToBigQuery.FieldInfo<>("FLOAT", (c, w) -> c.element()));
return tableConfigure;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,32 +130,30 @@ public interface Options extends ExampleOptions, StreamingOptions {
* Create a map of information that describes how to write pipeline output to BigQuery. This map
* is used to write team score sums and includes event timing information.
*/
protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
configureWindowedTableWrite() {

Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
new HashMap<>();
Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
tableConfigure.put(
"team", new WriteWindowedToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
"team", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
tableConfigure.put(
"total_score",
new WriteWindowedToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
tableConfigure.put(
"window_start",
new WriteWindowedToBigQuery.FieldInfo<>(
new WriteToBigQuery.FieldInfo<>(
"STRING",
(c, w) -> {
IntervalWindow window = (IntervalWindow) w;
return GameConstants.DATE_TIME_FORMATTER.print(window.start());
}));
tableConfigure.put(
"processing_time",
new WriteWindowedToBigQuery.FieldInfo<>(
new WriteToBigQuery.FieldInfo<>(
"STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
tableConfigure.put(
"timing",
new WriteWindowedToBigQuery.FieldInfo<>(
"STRING", (c, w) -> c.pane().getTiming().toString()));
new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.pane().getTiming().toString()));
return tableConfigure;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,12 @@ public interface Options extends LeaderBoard.Options {
*/
private static Map<String, FieldInfo<KV<String, Integer>>> configureCompleteWindowedTableWrite() {

Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
new HashMap<>();
tableConfigure.put(
"team", new WriteWindowedToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
tableConfigure.put(
"total_score",
new WriteWindowedToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
Map<String, FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
tableConfigure.put("team", new FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
tableConfigure.put("total_score", new FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
tableConfigure.put(
"processing_time",
new WriteWindowedToBigQuery.FieldInfo<>(
new FieldInfo<>(
"STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
return tableConfigure;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Duration;
Expand Down Expand Up @@ -68,8 +69,9 @@ public void after() {
private void add(long... instants) {
for (final long instant : instants) {
System.out.println("ADD " + instant);
Sessions.AssignContext context =
windowFn.new AssignContext() {
WindowFn<Object, IntervalWindow> wf = windowFn;
WindowFn<Object, IntervalWindow>.AssignContext context =
wf.new AssignContext() {
@Override
public Object element() {
return (Object) instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,7 @@ public FlinkPortablePipelineTranslator.Executor translate(
return context;
}

private void urnNotFound(
String id,
RunnerApi.Pipeline pipeline,
FlinkStreamingPortablePipelineTranslator.TranslationContext context) {
private void urnNotFound(String id, RunnerApi.Pipeline pipeline, TranslationContext context) {
throw new IllegalArgumentException(
String.format(
"Unknown type of URN %s for PTransform with id %s.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.runners.jobsubmission.JobInvoker;
import org.apache.beam.runners.jobsubmission.JobServerDriver;
import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -210,7 +211,7 @@ private void stopJobService() throws InterruptedException {
}
}

private class DetachedJobInvokerFactory implements FlinkJobServerDriver.JobInvokerFactory {
private class DetachedJobInvokerFactory implements JobServerDriver.JobInvokerFactory {

private CountDownLatch latch = new CountDownLatch(1);
private volatile PortablePipelineRunner actualPipelineRunner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,7 @@ public FlinkPortablePipelineTranslator.Executor translate(
return context;
}

private void urnNotFound(
String id,
RunnerApi.Pipeline pipeline,
FlinkStreamingPortablePipelineTranslator.TranslationContext context) {
private void urnNotFound(String id, RunnerApi.Pipeline pipeline, TranslationContext context) {
throw new IllegalArgumentException(
String.format(
"Unknown type of URN %s for PTransform with id %s.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ public void testCounter() {

@Test
public void testGauge() {
FlinkMetricContainer.FlinkGauge flinkGauge =
new FlinkMetricContainer.FlinkGauge(GaugeResult.empty());
FlinkMetricContainerBase.FlinkGauge flinkGauge =
new FlinkMetricContainerBase.FlinkGauge(GaugeResult.empty());
when(metricGroup.gauge(eq("namespace.name"), any())).thenReturn(flinkGauge);

MetricsContainer step = container.getMetricsContainer("step");
Expand Down Expand Up @@ -249,8 +249,8 @@ public boolean matches(FlinkDistributionGauge argument) {

@Test
public void testDistribution() {
FlinkMetricContainer.FlinkDistributionGauge flinkGauge =
new FlinkMetricContainer.FlinkDistributionGauge(DistributionResult.IDENTITY_ELEMENT);
FlinkDistributionGauge flinkGauge =
new FlinkDistributionGauge(DistributionResult.IDENTITY_ELEMENT);
when(metricGroup.gauge(eq("namespace.name"), any())).thenReturn(flinkGauge);

MetricsContainer step = container.getMetricsContainer("step");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void readerShouldRereadEvictedBatches() throws IOException, ExecutionExce
verify(base, times(1)).read(null, null);
CachingShuffleBatchReader.BatchRange range =
new CachingShuffleBatchReader.BatchRange(null, null);
CachingShuffleBatchReader.Batch batch = reader.cache.get(range);
ShuffleBatchReader.Batch batch = reader.cache.get(range);
assertThat(batch, notNullValue());
reader.cache.invalidateAll();
read = reader.read(null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public final class ClassicBundleManagerTest {

private FutureCollector<String> mockFutureCollector;
private ClassicBundleManager<String> bundleManager;
private ClassicBundleManager.BundleProgressListener<String> bundleProgressListener;
private BundleManager.BundleProgressListener<String> bundleProgressListener;
private Scheduler<KeyedTimerData<Void>> mockScheduler;

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.google.protobuf.Message;
import org.checkerframework.checker.nullness.qual.Nullable;

public class DummyRateLimitPolicy implements GoogleAdsV23.RateLimitPolicy<GoogleAdsError> {
public class DummyRateLimitPolicy implements GoogleAdsIO.RateLimitPolicy<GoogleAdsError> {
@Override
public void onBeforeRequest(@Nullable String developerToken, String customerId, Message request)
throws InterruptedException {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
Expand Down Expand Up @@ -355,7 +354,7 @@ public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
// from older Beam versions.
// See https://github.com/apache/beam/issues/30534.
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new BadRecordErrorHandler.DefaultErrorHandler<>());
builder.setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>());
} else {
byte[] badRecordRouter = configRow.getBytes("bad_record_router");
builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter));
Expand Down Expand Up @@ -923,7 +922,7 @@ public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
// from older Beam versions.
// See https://github.com/apache/beam/issues/30534.
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new BadRecordErrorHandler.DefaultErrorHandler<>());
builder.setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>());
} else {
byte[] badRecordRouter = configRow.getBytes("bad_record_router");
builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
Expand Down Expand Up @@ -128,7 +129,7 @@ public static KV<Table, List<TableRow>> decodeQueryResult(String queryResult) th

// Longs tend to get converted back to Integers due to JSON serialization. Convert them back.
public static TableRow convertNumbers(TableRow tableRow) {
for (TableRow.Entry<?, Object> entry : tableRow.entrySet()) {
for (Map.Entry<?, Object> entry : tableRow.entrySet()) {
if (entry.getValue() instanceof Integer) {
entry.setValue(Long.valueOf((Integer) entry.getValue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import static org.junit.Assert.assertEquals;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponseException;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.GrpcStatusCode;
Expand Down Expand Up @@ -810,7 +810,7 @@ Map<TableRow, List<TableDataInsertAllResponse.InsertErrors>> getInsertErrors() {
void throwNotFound(@FormatString String format, Object... args) throws IOException {
throw new IOException(
String.format(format, args),
new GoogleJsonResponseException.Builder(404, String.format(format, args), new HttpHeaders())
new HttpResponseException.Builder(404, String.format(format, args), new HttpHeaders())
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ public void testBatchableMutationFilterFn_cells() {
BatchableMutationFilterFn testFn =
new BatchableMutationFilterFn(null, null, 10000000, 3 * CELLS_PER_KEY, 1000);

BatchableMutationFilterFn.ProcessContext mockProcessContext =
DoFn<MutationGroup, MutationGroup>.ProcessContext mockProcessContext =
Mockito.mock(ProcessContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());

Expand Down Expand Up @@ -1195,7 +1195,7 @@ public void testBatchableMutationFilterFn_rows() {

BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null, null, 1000, 1000, 3);

BatchableMutationFilterFn.ProcessContext mockProcessContext =
DoFn<MutationGroup, MutationGroup>.ProcessContext mockProcessContext =
Mockito.mock(ProcessContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());

Expand Down Expand Up @@ -1246,7 +1246,7 @@ public void testBatchableMutationFilterFn_batchingDisabled() {

BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null, null, 0, 0, 0);

BatchableMutationFilterFn.ProcessContext mockProcessContext =
DoFn<MutationGroup, MutationGroup>.ProcessContext mockProcessContext =
Mockito.mock(ProcessContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());

Expand Down Expand Up @@ -1280,9 +1280,9 @@ public void testGatherSortAndBatchFn() throws Exception {
100, // groupingFactor
null);

GatherSortCreateBatchesFn.ProcessContext mockProcessContext =
DoFn<MutationGroup, Iterable<MutationGroup>>.ProcessContext mockProcessContext =
Mockito.mock(ProcessContext.class);
GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext =
DoFn<MutationGroup, Iterable<MutationGroup>>.FinishBundleContext mockFinishBundleContext =
Mockito.mock(FinishBundleContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());

Expand Down Expand Up @@ -1355,9 +1355,9 @@ public void testGatherBundleAndSortFn_flushOversizedBundle() throws Exception {
3, // groupingFactor
null);

GatherSortCreateBatchesFn.ProcessContext mockProcessContext =
DoFn<MutationGroup, Iterable<MutationGroup>>.ProcessContext mockProcessContext =
Mockito.mock(ProcessContext.class);
GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext =
DoFn<MutationGroup, Iterable<MutationGroup>>.FinishBundleContext mockFinishBundleContext =
Mockito.mock(FinishBundleContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
OutputReceiver<Iterable<MutationGroup>> mockOutputReceiver = mock(OutputReceiver.class);
Expand Down Expand Up @@ -1485,9 +1485,9 @@ public void testBatchFn_rows() throws Exception {
}

private void testAndVerifyBatches(GatherSortCreateBatchesFn testFn) throws Exception {
GatherSortCreateBatchesFn.ProcessContext mockProcessContext =
DoFn<MutationGroup, Iterable<MutationGroup>>.ProcessContext mockProcessContext =
Mockito.mock(ProcessContext.class);
GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext =
DoFn<MutationGroup, Iterable<MutationGroup>>.FinishBundleContext mockFinishBundleContext =
Mockito.mock(FinishBundleContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());

Expand Down
Loading