diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index fe0f1ab8bc91..32c40cde57c6 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1534,7 +1534,6 @@ class BeamModulePlugin implements Plugin { "AutoValueImmutableFields", "AutoValueImmutableFields", "AutoValueSubclassLeaked", - "BadImport", "BigDecimalEquals", "ComparableType", "DoNotMockAutoValue", diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index d11c6c374333..d46ea6e0d701 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -134,7 +134,6 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.State; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer.Type; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WatermarkHold; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse; @@ -1643,7 +1642,7 @@ private Timer buildWatermarkTimer(String tagPrefix, long timestampMillis, boolea Timer.Builder builder = Timer.newBuilder() .setTag(ByteString.copyFromUtf8(tagPrefix + ":" + timestampMillis)) - .setType(Type.WATERMARK) + .setType(Timer.Type.WATERMARK) .setStateFamily("MergeWindows"); if (!delete) { builder.setTimestamp(timestampMillis * 1000); diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java index eb06bff2237e..94e081d02684 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java @@ -50,7 +50,6 @@ import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService; import org.apache.beam.runners.fnexecution.data.GrpcDataService; import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; -import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory.Provider; import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; @@ -190,7 +189,7 @@ public void createsMultipleEnvironmentOfSingleType() throws Exception { .thenReturn(envFactoryB); when(environmentProviderFactoryB.getServerFactory()).thenReturn(serverFactory); - Map environmentFactoryProviderMap = + Map environmentFactoryProviderMap = ImmutableMap.of( environmentA.getUrn(), environmentProviderFactoryA, environmentB.getUrn(), environmentProviderFactoryB); @@ -231,7 +230,7 @@ public void createsMultipleEnvironmentsWithSdkWorkerParallelism() throws Excepti .thenReturn(envFactoryA); when(environmentProviderFactoryA.getServerFactory()).thenReturn(serverFactory); - Map environmentFactoryProviderMap = + Map environmentFactoryProviderMap = ImmutableMap.of(environmentA.getUrn(), environmentProviderFactoryA); PortablePipelineOptions portableOptions = @@ -309,7 +308,7 @@ public void creatingMultipleEnvironmentFromMultipleTypes() throws Exception { .thenReturn(envFactoryB); when(environmentProviderFactoryB.getServerFactory()).thenReturn(serverFactory); - Map environmentFactoryProviderMap = + Map environmentFactoryProviderMap = ImmutableMap.of( environmentA.getUrn(), environmentProviderFactoryA, environmentB.getUrn(), environmentProviderFactoryB); @@ -338,7 +337,7 @@ public void expiresEnvironment() throws Exception { .thenReturn(envFactoryA); when(environmentProviderFactoryA.getServerFactory()).thenReturn(serverFactory); - Map environmentFactoryProviderMap = + Map environmentFactoryProviderMap = ImmutableMap.of(environmentA.getUrn(), environmentProviderFactoryA); PortablePipelineOptions portableOptions = diff --git a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvocation.java b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvocation.java index 8032549ab606..a642383378de 100644 --- a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvocation.java +++ b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvocation.java @@ -29,7 +29,6 @@ import org.apache.beam.model.jobmanagement.v1.JobApi; import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; -import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum; import org.apache.beam.model.jobmanagement.v1.JobApi.JobStateEvent; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; @@ -112,16 +111,16 @@ public void onSuccess(PortablePipelineResult pipelineResult) { switch (state) { case DONE: - setState(Enum.DONE); + setState(JobState.Enum.DONE); break; case RUNNING: - setState(Enum.RUNNING); + setState(JobState.Enum.RUNNING); break; case CANCELLED: - setState(Enum.CANCELLED); + setState(JobState.Enum.CANCELLED); break; case FAILED: - setState(Enum.FAILED); + setState(JobState.Enum.FAILED); break; default: setState(JobState.Enum.UNSPECIFIED); @@ -257,7 +256,7 @@ private synchronized void sendMessage(JobMessage message) { } } - public static Boolean isTerminated(Enum state) { + public static Boolean isTerminated(JobState.Enum state) { switch (state) { case DONE: case FAILED: diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 7ea277740b62..df5ad41bdc3c 100644 --- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -19,7 +19,6 @@ import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderFactory.invoke; import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderFactory.invokeIfNotNull; -import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderFactory.newInstance; import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.match; import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.replace; import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf; @@ -417,7 +416,8 @@ private static Expression serializeOneOfField( private static Expression deserializeOneOfField(Expression in, Encoder enc, int idx) { GetStructField field = new GetStructField(in, idx, Option.empty()); Expression litNull = lit(null, TUPLE2_TYPE); - Expression newTuple = newInstance(Tuple2.class, TUPLE2_TYPE, lit(idx), deserialize(field, enc)); + Expression newTuple = + EncoderFactory.newInstance(Tuple2.class, TUPLE2_TYPE, lit(idx), deserialize(field, enc)); return new If(new IsNull(field), litNull, newTuple); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java index a18aa2c25d72..74c213af391b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java @@ -19,8 +19,6 @@ import java.util.concurrent.ExecutionException; import org.apache.beam.runners.core.SideInputReader; -import org.apache.beam.runners.spark.util.SideInputStorage.Key; -import org.apache.beam.runners.spark.util.SideInputStorage.Value; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; @@ -57,13 +55,13 @@ private CachedSideInputReader(SideInputReader delegate) { @Override public @Nullable T get(PCollectionView view, BoundedWindow window) { @SuppressWarnings("unchecked") - final Cache, Value> materializedCasted = + final Cache, SideInputStorage.Value> materializedCasted = (Cache) SideInputStorage.getMaterializedSideInputs(); - Key sideInputKey = new Key<>(view, window); + SideInputStorage.Key sideInputKey = new SideInputStorage.Key<>(view, window); try { - Value cachedResult = + SideInputStorage.Value cachedResult = materializedCasted.get( sideInputKey, () -> { @@ -73,7 +71,7 @@ private CachedSideInputReader(SideInputReader delegate) { sideInputKey, SizeEstimator.estimate(result)); - return new Value<>(result); + return new SideInputStorage.Value<>(result); }); return cachedResult.getValue(); } catch (ExecutionException e) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/channel/AddHarnessIdInterceptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/channel/AddHarnessIdInterceptor.java index d079a80a948b..bc603367ec74 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/channel/AddHarnessIdInterceptor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/channel/AddHarnessIdInterceptor.java @@ -21,12 +21,12 @@ import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientInterceptor; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata; -import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata.Key; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.MetadataUtils; /** A {@link ClientInterceptor} that attaches a provided SDK Harness ID to outgoing messages. */ public class AddHarnessIdInterceptor { - private static final Key ID_KEY = Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key ID_KEY = + Metadata.Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER); public static ClientInterceptor create(String harnessId) { checkArgument(harnessId != null, "harnessId must not be null"); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/server/GrpcContextHeaderAccessorProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/server/GrpcContextHeaderAccessorProvider.java index 6288ceba4cd1..1061cb045f68 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/server/GrpcContextHeaderAccessorProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/server/GrpcContextHeaderAccessorProvider.java @@ -20,7 +20,6 @@ import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Context; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Contexts; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata; -import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata.Key; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ServerCall; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ServerCall.Listener; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ServerCallHandler; @@ -36,8 +35,8 @@ }) public class GrpcContextHeaderAccessorProvider { - private static final Key WORKER_ID_KEY = - Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key WORKER_ID_KEY = + Metadata.Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER); private static final Context.Key SDK_WORKER_CONTEXT_KEY = Context.key("worker_id"); private static final GrpcHeaderAccessor HEADER_ACCESSOR = new GrpcHeaderAccessor(); diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index 882e46208a96..cee0b62bfe11 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -56,7 +56,6 @@ import org.apache.avro.Conversions; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; @@ -238,7 +237,7 @@ public static TypeWithNullability create(org.apache.avro.Schema avroSchema) { } TypeWithNullability(org.apache.avro.Schema avroSchema) { - if (avroSchema.getType() == Type.UNION) { + if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) { List types = avroSchema.getTypes(); // optional fields in AVRO have form of: @@ -246,7 +245,9 @@ public static TypeWithNullability create(org.apache.avro.Schema avroSchema) { // don't need recursion because nested unions aren't supported in AVRO List nonNullTypes = - types.stream().filter(x -> x.getType() != Type.NULL).collect(Collectors.toList()); + types.stream() + .filter(x -> x.getType() != org.apache.avro.Schema.Type.NULL) + .collect(Collectors.toList()); if (nonNullTypes.size() == types.size() || nonNullTypes.isEmpty()) { // union without `null` or all 'null' union, keep as is. @@ -303,7 +304,7 @@ && checkNotNull(fieldType.getLogicalType()) /** Create a {@link FixedBytesField} from an AVRO type. */ public static @Nullable FixedBytesField fromAvroType(org.apache.avro.Schema type) { - if (type.getType().equals(Type.FIXED)) { + if (type.getType().equals(org.apache.avro.Schema.Type.FIXED)) { return new FixedBytesField(type.getFixedSize()); } else { return null; @@ -672,7 +673,9 @@ public static SerializableFunction getFromRowFunction(Class clazz public static @Nullable Schema getSchema( Class clazz, org.apache.avro.@Nullable Schema schema) { if (schema != null) { - return schema.getType().equals(Type.RECORD) ? toBeamSchema(schema) : null; + return schema.getType().equals(org.apache.avro.Schema.Type.RECORD) + ? toBeamSchema(schema) + : null; } if (GenericRecord.class.equals(clazz)) { throw new IllegalArgumentException("No schema provided for getSchema(GenericRecord)"); @@ -1118,44 +1121,45 @@ private static org.apache.avro.Schema getFieldSchema( case BYTE: case INT16: case INT32: - baseType = org.apache.avro.Schema.create(Type.INT); + baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT); break; case INT64: - baseType = org.apache.avro.Schema.create(Type.LONG); + baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG); break; case DECIMAL: baseType = LogicalTypes.decimal(Integer.MAX_VALUE) - .addToSchema(org.apache.avro.Schema.create(Type.BYTES)); + .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)); break; case FLOAT: - baseType = org.apache.avro.Schema.create(Type.FLOAT); + baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT); break; case DOUBLE: - baseType = org.apache.avro.Schema.create(Type.DOUBLE); + baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE); break; case STRING: - baseType = org.apache.avro.Schema.create(Type.STRING); + baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING); break; case DATETIME: // TODO: There is a desire to move Beam schema DATETIME to a micros representation. When // this is done, this logical type needs to be changed. baseType = - LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG)); + LogicalTypes.timestampMillis() + .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)); break; case BOOLEAN: - baseType = org.apache.avro.Schema.create(Type.BOOLEAN); + baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN); break; case BYTES: - baseType = org.apache.avro.Schema.create(Type.BYTES); + baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES); break; case LOGICAL_TYPE: @@ -1167,7 +1171,7 @@ private static org.apache.avro.Schema getFieldSchema( baseType = fixedBytesField.toAvroType("fixed", namespace + "." + fieldName); } else if (VariableBytes.IDENTIFIER.equals(identifier)) { // treat VARBINARY as bytes as that is what avro supports - baseType = org.apache.avro.Schema.create(Type.BYTES); + baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES); } else if (FixedString.IDENTIFIER.equals(identifier) || "CHAR".equals(identifier) || "NCHAR".equals(identifier)) { @@ -1190,19 +1194,24 @@ private static org.apache.avro.Schema getFieldSchema( .map(x -> getFieldSchema(x.getType(), x.getName(), namespace)) .collect(Collectors.toList())); } else if ("DATE".equals(identifier) || SqlTypes.DATE.getIdentifier().equals(identifier)) { - baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT)); + baseType = + LogicalTypes.date() + .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)); } else if ("TIME".equals(identifier)) { - baseType = LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT)); + baseType = + LogicalTypes.timeMillis() + .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)); } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) { baseType = - LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)); + LogicalTypes.timestampMicros() + .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)); } else if (Timestamp.IDENTIFIER.equals(identifier)) { int precision = checkNotNull(logicalType.getArgument()); if (precision != 9) { throw new RuntimeException( "Timestamp logical type precision not supported:" + precision); } - baseType = org.apache.avro.Schema.create(Type.LONG); + baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG); baseType.addProp("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE); } else { throw new RuntimeException( @@ -1242,10 +1251,11 @@ private static org.apache.avro.Schema getFieldSchema( private static final Map> NUMERIC_CONVERTERS = ImmutableMap.of( - org.apache.avro.Schema.create(Type.INT), Number::intValue, - org.apache.avro.Schema.create(Type.LONG), Number::longValue, - org.apache.avro.Schema.create(Type.FLOAT), Number::floatValue, - org.apache.avro.Schema.create(Type.DOUBLE), Number::doubleValue); + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), Number::intValue, + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), Number::longValue, + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT), Number::floatValue, + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE), + Number::doubleValue); /** Convert a value from Beam Row to a vlue used for Avro GenericRecord. */ private static @Nullable Object genericFromBeamField( @@ -1290,10 +1300,10 @@ private static org.apache.avro.Schema getFieldSchema( return result; case DATETIME: - if (typeWithNullability.type.getType() == Type.INT) { + if (typeWithNullability.type.getType() == org.apache.avro.Schema.Type.INT) { ReadableInstant instant = (ReadableInstant) value; return (int) Days.daysBetween(Instant.EPOCH, instant).getDays(); - } else if (typeWithNullability.type.getType() == Type.LONG) { + } else if (typeWithNullability.type.getType() == org.apache.avro.Schema.Type.LONG) { ReadableInstant instant = (ReadableInstant) value; return (long) instant.getMillis(); } else { @@ -1418,7 +1428,7 @@ private static Object convertLogicalType( // TODO: Remove this workaround once Avro is upgraded to 1.12+ where timestamp-nanos if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.type.getProp("logicalType"))) { - if (type.type.getType() == Type.LONG) { + if (type.type.getType() == org.apache.avro.Schema.Type.LONG) { Long nanos = (Long) value; // Check if Beam expects Timestamp logical type if (fieldType.getTypeName() == TypeName.LOGICAL_TYPE diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java index d087ed0a20bc..9e0519ae4eb9 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java @@ -36,7 +36,6 @@ import org.apache.avro.Conversions; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; @@ -84,7 +83,7 @@ public class AvroUtilsTest { private static final org.apache.avro.Schema NULL_SCHEMA = - org.apache.avro.Schema.create(Type.NULL); + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL); private static final String VERSION_AVRO = org.apache.avro.Schema.class.getPackage().getImplementationVersion(); @@ -296,40 +295,47 @@ public void avroToBeamRoundTrip( public void testUnwrapNullableSchema() { org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createUnion( - org.apache.avro.Schema.create(Type.NULL), org.apache.avro.Schema.create(Type.STRING)); + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL), + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)); AvroUtils.TypeWithNullability typeWithNullability = new AvroUtils.TypeWithNullability(avroSchema); assertTrue(typeWithNullability.nullable); - assertEquals(org.apache.avro.Schema.create(Type.STRING), typeWithNullability.type); + assertEquals( + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), + typeWithNullability.type); } @Test public void testUnwrapNullableSchemaReordered() { org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createUnion( - org.apache.avro.Schema.create(Type.STRING), org.apache.avro.Schema.create(Type.NULL)); + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL)); AvroUtils.TypeWithNullability typeWithNullability = new AvroUtils.TypeWithNullability(avroSchema); assertTrue(typeWithNullability.nullable); - assertEquals(org.apache.avro.Schema.create(Type.STRING), typeWithNullability.type); + assertEquals( + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), + typeWithNullability.type); } @Test public void testUnwrapNullableSchemaToUnion() { org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createUnion( - org.apache.avro.Schema.create(Type.STRING), - org.apache.avro.Schema.create(Type.LONG), - org.apache.avro.Schema.create(Type.NULL)); + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL)); AvroUtils.TypeWithNullability typeWithNullability = new AvroUtils.TypeWithNullability(avroSchema); assertTrue(typeWithNullability.nullable); assertEquals( org.apache.avro.Schema.createUnion( - org.apache.avro.Schema.create(Type.STRING), org.apache.avro.Schema.create(Type.LONG)), + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)), typeWithNullability.type); } @@ -339,7 +345,8 @@ public void testNullableArrayFieldToBeamArrayField() { new org.apache.avro.Schema.Field( "arrayField", ReflectData.makeNullable( - org.apache.avro.Schema.createArray(org.apache.avro.Schema.create(Type.INT))), + org.apache.avro.Schema.createArray( + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT))), "", (Object) null); @@ -357,7 +364,8 @@ public void testNullableBeamArrayFieldToAvroField() { new org.apache.avro.Schema.Field( "arrayField", ReflectData.makeNullable( - org.apache.avro.Schema.createArray(org.apache.avro.Schema.create(Type.INT))), + org.apache.avro.Schema.createArray( + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT))), "", (Object) null); @@ -369,10 +377,16 @@ private static List getAvroSubSchemaFields() { List fields = Lists.newArrayList(); fields.add( new org.apache.avro.Schema.Field( - "bool", org.apache.avro.Schema.create(Type.BOOLEAN), "", (Object) null)); + "bool", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN), + "", + (Object) null)); fields.add( new org.apache.avro.Schema.Field( - "int", org.apache.avro.Schema.create(Type.INT), "", (Object) null)); + "int", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), + "", + (Object) null)); return fields; } @@ -385,36 +399,58 @@ private static org.apache.avro.Schema getAvroSchema() { List fields = Lists.newArrayList(); fields.add( new org.apache.avro.Schema.Field( - "bool", org.apache.avro.Schema.create(Type.BOOLEAN), "", (Object) null)); + "bool", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN), + "", + (Object) null)); fields.add( new org.apache.avro.Schema.Field( - "int", org.apache.avro.Schema.create(Type.INT), "", (Object) null)); + "int", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), + "", + (Object) null)); fields.add( new org.apache.avro.Schema.Field( - "long", org.apache.avro.Schema.create(Type.LONG), "", (Object) null)); + "long", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), + "", + (Object) null)); fields.add( new org.apache.avro.Schema.Field( - "float", org.apache.avro.Schema.create(Type.FLOAT), "", (Object) null)); + "float", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT), + "", + (Object) null)); fields.add( new org.apache.avro.Schema.Field( - "double", org.apache.avro.Schema.create(Type.DOUBLE), "", (Object) null)); + "double", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE), + "", + (Object) null)); fields.add( new org.apache.avro.Schema.Field( - "string", org.apache.avro.Schema.create(Type.STRING), "", (Object) null)); + "string", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), + "", + (Object) null)); fields.add( new org.apache.avro.Schema.Field( - "bytes", org.apache.avro.Schema.create(Type.BYTES), "", (Object) null)); + "bytes", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES), + "", + (Object) null)); fields.add( new org.apache.avro.Schema.Field( "decimal", LogicalTypes.decimal(Integer.MAX_VALUE) - .addToSchema(org.apache.avro.Schema.create(Type.BYTES)), + .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)), "", (Object) null)); fields.add( new org.apache.avro.Schema.Field( "timestampMillis", - LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG)), + LogicalTypes.timestampMillis() + .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)), "", (Object) null)); fields.add(new org.apache.avro.Schema.Field("row", getAvroSubSchema("row"), "", (Object) null)); @@ -489,7 +525,7 @@ private static GenericRecord getGenericRecord() { LogicalType decimalType = LogicalTypes.decimal(Integer.MAX_VALUE) - .addToSchema(org.apache.avro.Schema.create(Type.BYTES)) + .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) .getLogicalType(); ByteBuffer encodedDecimal = new Conversions.DecimalConversion().toBytes(BIG_DECIMAL, null, decimalType); @@ -693,21 +729,24 @@ public void testNullableFieldInAvroSchema() { fields.add( new org.apache.avro.Schema.Field( "int", - ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT)), + ReflectData.makeNullable( + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)), "", (Object) null)); fields.add( new org.apache.avro.Schema.Field( "array", org.apache.avro.Schema.createArray( - ReflectData.makeNullable(org.apache.avro.Schema.create(Type.BYTES))), + ReflectData.makeNullable( + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES))), "", (Object) null)); fields.add( new org.apache.avro.Schema.Field( "map", org.apache.avro.Schema.createMap( - ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT))), + ReflectData.makeNullable( + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT))), "", (Object) null)); fields.add( @@ -766,21 +805,24 @@ public void testNullableFieldsInBeamSchema() { fields.add( new org.apache.avro.Schema.Field( "int", - ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT)), + ReflectData.makeNullable( + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)), "", (Object) null)); fields.add( new org.apache.avro.Schema.Field( "array", org.apache.avro.Schema.createArray( - ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT))), + ReflectData.makeNullable( + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT))), "", (Object) null)); fields.add( new org.apache.avro.Schema.Field( "map", org.apache.avro.Schema.createMap( - ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT))), + ReflectData.makeNullable( + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT))), "", (Object) null)); org.apache.avro.Schema avroSchema = @@ -813,8 +855,8 @@ public void testUnionFieldInAvroSchema() { List fields = Lists.newArrayList(); List unionFields = Lists.newArrayList(); - unionFields.add(org.apache.avro.Schema.create(Type.INT)); - unionFields.add(org.apache.avro.Schema.create(Type.STRING)); + unionFields.add(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)); + unionFields.add(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)); fields.add( new org.apache.avro.Schema.Field( @@ -841,8 +883,8 @@ public void testUnionFieldInBeamSchema() { List fields = Lists.newArrayList(); List unionFields = Lists.newArrayList(); - unionFields.add(org.apache.avro.Schema.create(Type.INT)); - unionFields.add(org.apache.avro.Schema.create(Type.STRING)); + unionFields.add(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)); + unionFields.add(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)); fields.add( new org.apache.avro.Schema.Field( "union", org.apache.avro.Schema.createUnion(unionFields), "", (Object) null)); @@ -1133,7 +1175,8 @@ public void testBeamTimestampLogicalTypeToAvro() { fields.add( new org.apache.avro.Schema.Field( "timestampMicrosLT", - LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)), + LogicalTypes.timestampMicros() + .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)), "", (Object) null)); org.apache.avro.Schema avroSchema = diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FullJoin.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FullJoin.java index 63312a735e33..774df463d0f3 100644 --- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FullJoin.java +++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FullJoin.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor; import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction; import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector; -import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join.Type; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -161,7 +160,7 @@ public UsingBuilder by( public Join.WindowByBuilder using( BinaryFunctor, Optional, OutputT> joinFunc, @Nullable TypeDescriptor outputType) { - return new Join.Builder<>(name, Type.FULL) + return new Join.Builder<>(name, Join.Type.FULL) .of(left, right) .by(leftKeyExtractor, rightKeyExtractor, keyType) .using( diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/RightJoin.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/RightJoin.java index c35657029333..d72cb37107f4 100644 --- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/RightJoin.java +++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/RightJoin.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor; import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction; import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector; -import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join.Type; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -161,7 +160,7 @@ public UsingBuilder by( public Join.WindowByBuilder using( BinaryFunctor, RightT, OutputT> joinFunc, @Nullable TypeDescriptor outputType) { - return new Join.Builder<>(name, Type.RIGHT) + return new Join.Builder<>(name, Join.Type.RIGHT) .of(left, right) .by(leftKeyExtractor, rightKeyExtractor, keyType) .using( diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/SingleJvmAccumulatorProviderTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/SingleJvmAccumulatorProviderTest.java index 4872b8e3b782..adfbb8a97144 100644 --- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/SingleJvmAccumulatorProviderTest.java +++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/SingleJvmAccumulatorProviderTest.java @@ -22,7 +22,6 @@ import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.Counter; import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.Histogram; import org.apache.beam.sdk.extensions.euphoria.core.testkit.accumulators.SingleJvmAccumulatorProvider; -import org.apache.beam.sdk.extensions.euphoria.core.testkit.accumulators.SingleJvmAccumulatorProvider.Factory; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -40,7 +39,8 @@ public class SingleJvmAccumulatorProviderTest { private static final String TEST_COUNTER_NAME = "test-counter"; private static final String TEST_HISTOGRAM_NAME = "test-histogram"; - private Factory accFactory = Factory.get(); + private SingleJvmAccumulatorProvider.Factory accFactory = + SingleJvmAccumulatorProvider.Factory.get(); @Test public void testBasicAccumulatorsFunction() { diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/collector/SingleValueCollectorTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/collector/SingleValueCollectorTest.java index 6d868344c46a..47e3fe28a6ad 100644 --- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/collector/SingleValueCollectorTest.java +++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/collector/SingleValueCollectorTest.java @@ -22,7 +22,6 @@ import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.Counter; import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.Histogram; import org.apache.beam.sdk.extensions.euphoria.core.testkit.accumulators.SingleJvmAccumulatorProvider; -import org.apache.beam.sdk.extensions.euphoria.core.testkit.accumulators.SingleJvmAccumulatorProvider.Factory; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -39,7 +38,8 @@ public class SingleValueCollectorTest { private static final String TEST_COUNTER_NAME = "test-counter"; private static final String TEST_HISTOGRAM_NAME = "test-histogram"; - private final Factory accumulatorFactory = SingleJvmAccumulatorProvider.Factory.get(); + private final SingleJvmAccumulatorProvider.Factory accumulatorFactory = + SingleJvmAccumulatorProvider.Factory.get(); @Test public void testBasicAccumulatorsAccess() { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index 46a014f8196b..058b64f91532 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -43,7 +43,6 @@ import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -324,7 +323,7 @@ static TableRow convertGenericRecordToTableRow(GenericRecord record) { } private static @Nullable Object getTypedCellValue(String name, Schema schema, Object v) { - Type type = schema.getType(); + Schema.Type type = schema.getType(); switch (type) { case ARRAY: return convertRepeatedField(name, schema.getElementType(), v); @@ -376,7 +375,7 @@ private static Object convertRequiredField(String name, Schema schema, Object v) // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery // INTEGER type maps to an Avro LONG type. checkNotNull(v, "REQUIRED field %s should not be null", name); - Type type = schema.getType(); + Schema.Type type = schema.getType(); LogicalType logicalType = schema.getLogicalType(); switch (type) { case BOOLEAN: @@ -472,7 +471,7 @@ private static Object convertRequiredField(String name, Schema schema, Object v) private static @Nullable Object convertNullableField(String name, Schema union, Object v) { // NULLABLE fields are represented as an Avro Union of the corresponding type and "null". verify( - union.getType() == Type.UNION, + union.getType() == Schema.Type.UNION, "Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s", union.getType(), name); @@ -484,7 +483,7 @@ private static Object convertRequiredField(String name, Schema schema, Object v) union); Schema type = union.getTypes().get(GenericData.get().resolveUnion(union, v)); - if (type.getType() == Type.NULL) { + if (type.getType() == Schema.Type.NULL) { return null; } else { return convertRequiredField(name, type, v); @@ -583,7 +582,7 @@ static TableSchema fromGenericAvroSchema(Schema schema) { static TableSchema fromGenericAvroSchema(Schema schema, Boolean useAvroLogicalTypes) { verify( - schema.getType() == Type.RECORD, + schema.getType() == Schema.Type.RECORD, "Expected Avro schema type RECORD, not %s", schema.getType()); @@ -602,7 +601,7 @@ private static TableFieldSchema fromAvroFieldSchema( case UNION: List types = fieldSchema.getTypes(); verify( - types.size() == 2 && types.get(0).getType() == Type.NULL, + types.size() == 2 && types.get(0).getType() == Schema.Type.NULL, "Avro union field %s should be of null and another type, not %s", avrofield.name(), fieldSchema); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 41bf06d7af23..c44e840b7231 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -29,7 +29,7 @@ import com.google.cloud.bigquery.storage.v1.Exceptions; import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.cloud.bigquery.storage.v1.TableSchema; -import com.google.cloud.bigquery.storage.v1.WriteStream.Type; +import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.protobuf.ByteString; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors.Descriptor; @@ -397,7 +397,7 @@ String getOrCreateStreamName() throws Exception { if (!useDefaultStream) { this.streamName = Preconditions.checkStateNotNull(maybeWriteStreamService) - .createWriteStream(tableUrn, Type.PENDING) + .createWriteStream(tableUrn, WriteStream.Type.PENDING) .getName(); this.currentOffset = 0; } else { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 03a5924cacb3..fd3d14e11804 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -29,7 +29,7 @@ import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException; import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.cloud.bigquery.storage.v1.TableSchema; -import com.google.cloud.bigquery.storage.v1.WriteStream.Type; +import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.protobuf.ByteString; import com.google.protobuf.DescriptorProtos; import io.grpc.Status; @@ -395,7 +395,10 @@ String getOrCreateStream( // In a buffered stream, data is only visible up to the offset to which it was flushed. CreateTableHelpers.createTableWrapper( () -> { - stream.set(writeStreamService.createWriteStream(tableId, Type.BUFFERED).getName()); + stream.set( + writeStreamService + .createWriteStream(tableId, WriteStream.Type.BUFFERED) + .getName()); return null; }, tryCreateTable); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index ab5ae80065a4..640de93bd083 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -33,7 +33,6 @@ import com.google.protobuf.DescriptorProtos.DescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label; -import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type; import com.google.protobuf.DescriptorProtos.FileDescriptorProto; import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors.Descriptor; @@ -645,20 +644,22 @@ static SchemaInformation fromTableSchema( } } - static final Map PRIMITIVE_TYPES_BQ_TO_PROTO = - ImmutableMap.builder() - .put(TableFieldSchema.Type.INT64, Type.TYPE_INT64) - .put(TableFieldSchema.Type.DOUBLE, Type.TYPE_DOUBLE) - .put(TableFieldSchema.Type.STRING, Type.TYPE_STRING) - .put(TableFieldSchema.Type.BOOL, Type.TYPE_BOOL) - .put(TableFieldSchema.Type.BYTES, Type.TYPE_BYTES) - .put(TableFieldSchema.Type.NUMERIC, Type.TYPE_BYTES) - .put(TableFieldSchema.Type.BIGNUMERIC, Type.TYPE_BYTES) - .put(TableFieldSchema.Type.GEOGRAPHY, Type.TYPE_STRING) // Pass through the JSON encoding. - .put(TableFieldSchema.Type.DATE, Type.TYPE_INT32) - .put(TableFieldSchema.Type.TIME, Type.TYPE_INT64) - .put(TableFieldSchema.Type.DATETIME, Type.TYPE_INT64) - .put(TableFieldSchema.Type.JSON, Type.TYPE_STRING) + static final Map PRIMITIVE_TYPES_BQ_TO_PROTO = + ImmutableMap.builder() + .put(TableFieldSchema.Type.INT64, FieldDescriptorProto.Type.TYPE_INT64) + .put(TableFieldSchema.Type.DOUBLE, FieldDescriptorProto.Type.TYPE_DOUBLE) + .put(TableFieldSchema.Type.STRING, FieldDescriptorProto.Type.TYPE_STRING) + .put(TableFieldSchema.Type.BOOL, FieldDescriptorProto.Type.TYPE_BOOL) + .put(TableFieldSchema.Type.BYTES, FieldDescriptorProto.Type.TYPE_BYTES) + .put(TableFieldSchema.Type.NUMERIC, FieldDescriptorProto.Type.TYPE_BYTES) + .put(TableFieldSchema.Type.BIGNUMERIC, FieldDescriptorProto.Type.TYPE_BYTES) + .put( + TableFieldSchema.Type.GEOGRAPHY, + FieldDescriptorProto.Type.TYPE_STRING) // Pass through the JSON encoding. + .put(TableFieldSchema.Type.DATE, FieldDescriptorProto.Type.TYPE_INT32) + .put(TableFieldSchema.Type.TIME, FieldDescriptorProto.Type.TYPE_INT64) + .put(TableFieldSchema.Type.DATETIME, FieldDescriptorProto.Type.TYPE_INT64) + .put(TableFieldSchema.Type.JSON, FieldDescriptorProto.Type.TYPE_STRING) .build(); static final Map @@ -1044,14 +1045,16 @@ private static DescriptorProto descriptorSchemaFromTableFieldSchemas( FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder(); fieldDescriptorBuilder = fieldDescriptorBuilder.setName(StorageApiCDC.CHANGE_TYPE_COLUMN); fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(i++); - fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_STRING); + fieldDescriptorBuilder = + fieldDescriptorBuilder.setType(FieldDescriptorProto.Type.TYPE_STRING); fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL); descriptorBuilder.addField(fieldDescriptorBuilder.build()); fieldDescriptorBuilder = FieldDescriptorProto.newBuilder(); fieldDescriptorBuilder = fieldDescriptorBuilder.setName(StorageApiCDC.CHANGE_SQN_COLUMN); fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(i++); - fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_STRING); + fieldDescriptorBuilder = + fieldDescriptorBuilder.setType(FieldDescriptorProto.Type.TYPE_STRING); fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL); descriptorBuilder.addField(fieldDescriptorBuilder.build()); } @@ -1090,7 +1093,9 @@ private static void fieldDescriptorFromTableField( fieldSchema.getFieldsList(), respectRequired, false); descriptorBuilder.addNestedType(nested); fieldDescriptorBuilder = - fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName()); + fieldDescriptorBuilder + .setType(FieldDescriptorProto.Type.TYPE_MESSAGE) + .setTypeName(nested.getName()); break; case TIMESTAMP: if (fieldSchema.getTimestampPrecision().getValue() == PICOSECOND_PRECISION) { @@ -1103,16 +1108,18 @@ private static void fieldDescriptorFromTableField( } fieldDescriptorBuilder = fieldDescriptorBuilder - .setType(Type.TYPE_MESSAGE) + .setType(FieldDescriptorProto.Type.TYPE_MESSAGE) .setTypeName(TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName()); } else { // Microsecond precision - use simple INT64 - fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64); + fieldDescriptorBuilder = + fieldDescriptorBuilder.setType(FieldDescriptorProto.Type.TYPE_INT64); } break; default: - @Nullable Type type = PRIMITIVE_TYPES_BQ_TO_PROTO.get(fieldSchema.getType()); + FieldDescriptorProto.@Nullable Type type = + PRIMITIVE_TYPES_BQ_TO_PROTO.get(fieldSchema.getType()); if (type == null) { throw new UnsupportedOperationException( "Converting BigQuery type " + fieldSchema.getType() + " to Beam type is unsupported"); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index e9b1e25a7afc..6a975d44bcd8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -40,7 +40,6 @@ import com.google.cloud.bigquery.storage.v1.FlushRowsResponse; import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.cloud.bigquery.storage.v1.WriteStream; -import com.google.cloud.bigquery.storage.v1.WriteStream.Type; import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.FormatString; import com.google.protobuf.ByteString; @@ -120,13 +119,13 @@ public Entry(TableRow tableRow, UpdateType updateType, long sqn) { final String streamName; final List stream; final TableContainer tableContainer; - final Type type; + final WriteStream.Type type; long nextFlushPosition; boolean finalized; TableSchema currentSchema; @Nullable TableSchema updatedSchema = null; - Stream(String streamName, TableContainer tableContainer, Type type) { + Stream(String streamName, TableContainer tableContainer, WriteStream.Type type) { this.streamName = streamName; this.stream = Lists.newArrayList(); this.tableContainer = tableContainer; @@ -171,13 +170,13 @@ void appendRows(long position, List rowsToAppend) { + stream.size()); } stream.addAll(rowsToAppend); - if (type == Type.COMMITTED) { + if (type == WriteStream.Type.COMMITTED) { rowsToAppend.forEach(this::applyEntry); } } void flush(long position) { - Preconditions.checkState(type == Type.BUFFERED); + Preconditions.checkState(type == WriteStream.Type.BUFFERED); Preconditions.checkState(!finalized); if (position >= stream.size()) { throw new RuntimeException(""); @@ -204,7 +203,7 @@ void commit() { if (!finalized) { throw new RuntimeException("Can't commit unfinalized stream."); } - Preconditions.checkState(type == Type.PENDING); + Preconditions.checkState(type == WriteStream.Type.PENDING); stream.forEach(this::applyEntry); } } @@ -356,7 +355,8 @@ public void createTable(Table table) throws IOException { tableReference.getProjectId(), tableReference.getDatasetId(), BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId())); - writeStreams.put(streamName, new Stream(streamName, tableContainer, Type.COMMITTED)); + writeStreams.put( + streamName, new Stream(streamName, tableContainer, WriteStream.Type.COMMITTED)); return tableContainer; }); @@ -566,7 +566,8 @@ public Table patchTableDescription( } @Override - public WriteStream createWriteStream(String tableUrn, Type type) throws InterruptedException { + public WriteStream createWriteStream(String tableUrn, WriteStream.Type type) + throws InterruptedException { try { TableReference tableReference = BigQueryHelpers.parseTableUrn(BigQueryHelpers.stripPartitionDecorator(tableUrn)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java index 9698aaff1d73..8a02c285e090 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java @@ -26,7 +26,6 @@ import com.google.protobuf.DescriptorProtos.DescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label; -import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import java.math.BigDecimal; @@ -145,77 +144,77 @@ enum TestEnum { FieldDescriptorProto.newBuilder() .setName("bytesvalue") .setNumber(1) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("bytebuffervalue") .setNumber(2) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("intvalue") .setNumber(3) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("longvalue") .setNumber(4) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("floatvalue") .setNumber(5) - .setType(Type.TYPE_DOUBLE) + .setType(FieldDescriptorProto.Type.TYPE_DOUBLE) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("doublevalue") .setNumber(6) - .setType(Type.TYPE_DOUBLE) + .setType(FieldDescriptorProto.Type.TYPE_DOUBLE) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("stringvalue") .setNumber(7) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("booleanvalue") .setNumber(8) - .setType(Type.TYPE_BOOL) + .setType(FieldDescriptorProto.Type.TYPE_BOOL) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("arrayvalue") .setNumber(9) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_REPEATED) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("enumvalue") .setNumber(10) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("fixedvalue") .setNumber(11) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_REQUIRED) .build()) .build(); @@ -226,70 +225,70 @@ enum TestEnum { FieldDescriptorProto.newBuilder() .setName("numericvalue") .setNumber(1) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("bignumericvalue") .setNumber(2) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("datevalue") .setNumber(3) - .setType(Type.TYPE_INT32) + .setType(FieldDescriptorProto.Type.TYPE_INT32) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timemicrosvalue") .setNumber(4) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timemillisvalue") .setNumber(5) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampmicrosvalue") .setNumber(6) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampmillisvalue") .setNumber(7) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("localtimestampmicrosvalue") .setNumber(8) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("localtimestampmillisvalue") .setNumber(9) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("uuidvalue") .setNumber(10) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_OPTIONAL) .build()) .build(); @@ -518,11 +517,11 @@ void validateDescriptorAgainstSchema(Schema originalSchema, DescriptorProto sche AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(originalSchema), true, false); - Map types = + Map types = descriptor.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); - Map expectedTypes = + Map expectedTypes = schemaProto.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); @@ -563,12 +562,12 @@ public void testNestedFromSchema() { AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(NESTED_SCHEMA), true, false); - Map expectedBaseTypes = + Map expectedBaseTypes = BASE_SCHEMA_PROTO.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); - Map types = + Map types = descriptor.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); @@ -587,19 +586,19 @@ public void testNestedFromSchema() { descriptor.getNestedTypeList().stream() .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity())); assertEquals(2, nestedTypes.size()); - assertEquals(Type.TYPE_MESSAGE, types.get("nested")); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, types.get("nested")); assertEquals(Label.LABEL_OPTIONAL, typeLabels.get("nested")); String nestedTypeName1 = typeNames.get("nested"); - Map nestedTypes1 = + Map nestedTypes1 = nestedTypes.get(nestedTypeName1).getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); assertEquals(expectedBaseTypes, nestedTypes1); - assertEquals(Type.TYPE_MESSAGE, types.get("nestedarray")); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, types.get("nestedarray")); assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedarray")); String nestedTypeName2 = typeNames.get("nestedarray"); - Map nestedTypes2 = + Map nestedTypes2 = nestedTypes.get(nestedTypeName2).getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); @@ -808,7 +807,7 @@ public void testDescriptorFromSchemaTimestampNanos() { assertEquals(1, descriptor.getFieldCount()); FieldDescriptorProto field = descriptor.getField(0); assertEquals("timestampnanosvalue", field.getName()); - assertEquals(Type.TYPE_MESSAGE, field.getType()); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, field.getType()); assertEquals("TimestampPicos", field.getTypeName()); // Verify nested TimestampPicos type exists diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java index c546a7ca5d77..b4f2fa6aa241 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java @@ -26,7 +26,6 @@ import com.google.protobuf.DescriptorProtos.DescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label; -import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.DynamicMessage; @@ -105,133 +104,133 @@ public class BeamRowToStorageApiProtoTest { FieldDescriptorProto.newBuilder() .setName("bytevalue") .setNumber(1) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("int16value") .setNumber(2) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_REQUIRED) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("int32value") .setNumber(3) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("int64value") .setNumber(4) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("decimalvalue") .setNumber(5) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("floatvalue") .setNumber(6) - .setType(Type.TYPE_DOUBLE) + .setType(FieldDescriptorProto.Type.TYPE_DOUBLE) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("doublevalue") .setNumber(7) - .setType(Type.TYPE_DOUBLE) + .setType(FieldDescriptorProto.Type.TYPE_DOUBLE) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("stringvalue") .setNumber(8) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("datetimevalue") .setNumber(9) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("booleanvalue") .setNumber(10) - .setType(Type.TYPE_BOOL) + .setType(FieldDescriptorProto.Type.TYPE_BOOL) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("bytesvalue") .setNumber(11) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("arrayvalue") .setNumber(12) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_REPEATED) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("arraynullvalue") .setNumber(13) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_REPEATED) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("iterablevalue") .setNumber(14) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_REPEATED) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("sqldatevalue") .setNumber(15) - .setType(Type.TYPE_INT32) + .setType(FieldDescriptorProto.Type.TYPE_INT32) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("sqltimevalue") .setNumber(16) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("sqldatetimevalue") .setNumber(17) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("sqltimestampvalue") .setNumber(18) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("enumvalue") .setNumber(19) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_OPTIONAL) .build()) .build(); @@ -315,11 +314,11 @@ public void testDescriptorFromSchema() { DescriptorProto descriptor = TableRowToStorageApiProto.descriptorSchemaFromTableSchema( BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(BASE_SCHEMA), true, false); - Map types = + Map types = descriptor.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); - Map expectedTypes = + Map expectedTypes = BASE_SCHEMA_PROTO.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); @@ -347,12 +346,12 @@ public void testNestedFromSchema() { DescriptorProto descriptor = TableRowToStorageApiProto.descriptorSchemaFromTableSchema( BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema((NESTED_SCHEMA)), true, false); - Map expectedBaseTypes = + Map expectedBaseTypes = BASE_SCHEMA_PROTO.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); - Map types = + Map types = descriptor.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); @@ -371,34 +370,34 @@ public void testNestedFromSchema() { descriptor.getNestedTypeList().stream() .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity())); assertEquals(4, nestedTypes.size()); - assertEquals(Type.TYPE_MESSAGE, types.get("nested")); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, types.get("nested")); assertEquals(Label.LABEL_OPTIONAL, typeLabels.get("nested")); String nestedTypeName1 = typeNames.get("nested"); - Map nestedTypes1 = + Map nestedTypes1 = nestedTypes.get(nestedTypeName1).getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); assertEquals(expectedBaseTypes, nestedTypes1); - assertEquals(Type.TYPE_MESSAGE, types.get("nestedarray")); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, types.get("nestedarray")); assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedarray")); String nestedTypeName2 = typeNames.get("nestedarray"); - Map nestedTypes2 = + Map nestedTypes2 = nestedTypes.get(nestedTypeName2).getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); assertEquals(expectedBaseTypes, nestedTypes2); - assertEquals(Type.TYPE_MESSAGE, types.get("nestediterable")); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, types.get("nestediterable")); assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestediterable")); String nestedTypeName3 = typeNames.get("nestediterable"); - Map nestedTypes3 = + Map nestedTypes3 = nestedTypes.get(nestedTypeName3).getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); assertEquals(expectedBaseTypes, nestedTypes3); - assertEquals(Type.TYPE_MESSAGE, types.get("nestedmap")); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, types.get("nestedmap")); assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedmap")); String nestedTypeName4 = typeNames.get("nestedmap"); // expects 2 fields in the nested map, key and value @@ -408,7 +407,7 @@ public void testNestedFromSchema() { assertTrue(stream.get().anyMatch(fdp -> fdp.getName().equals("key"))); assertTrue(stream.get().anyMatch(fdp -> fdp.getName().equals("value"))); - Map nestedTypes4 = + Map nestedTypes4 = nestedTypes.get(nestedTypeName4).getNestedTypeList().stream() .flatMap(vdesc -> vdesc.getFieldList().stream()) .collect( @@ -434,7 +433,7 @@ public void testParticularMapsFromSchemas() { true, false); - Map types = + Map types = descriptor.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); @@ -452,7 +451,7 @@ public void testParticularMapsFromSchemas() { .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity())); assertEquals(2, nestedTypes.size()); - assertEquals(Type.TYPE_MESSAGE, types.get("nestedmultimap")); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, types.get("nestedmultimap")); assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedmultimap")); String nestedMultiMapName = typeNames.get("nestedmultimap"); // expects 2 fields for the nested array of maps, key and value @@ -469,7 +468,7 @@ public void testParticularMapsFromSchemas() { .count() == 1); - assertEquals(Type.TYPE_MESSAGE, types.get("nestedmapnullable")); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, types.get("nestedmapnullable")); // even though the field is marked as optional in the row we will should see repeated in proto assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedmapnullable")); String nestedMapNullableName = typeNames.get("nestedmapnullable"); @@ -641,7 +640,7 @@ public void testTimestampNanosDescriptor() throws Exception { FieldDescriptorProto field = descriptor.getField(0); assertEquals("timestampnanos", field.getName()); - assertEquals(Type.TYPE_MESSAGE, field.getType()); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, field.getType()); assertEquals("TimestampPicos", field.getTypeName()); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index a5d6ac68ce66..624683e4756d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -19,7 +19,6 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString; import static org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.TYPE_MAP_PROTO_CONVERTERS; -import static org.apache.beam.sdk.io.gcp.bigquery.WriteTables.ResultCoder.INSTANCE; import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider.BigQueryFileLoadsSchemaTransform; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; @@ -3006,7 +3005,7 @@ public void testWriteTables() throws Exception { PCollection> writeTablesOutput = writeTablesInput .apply(writeTables) - .setCoder(KvCoder.of(StringUtf8Coder.of(), INSTANCE)) + .setCoder(KvCoder.of(StringUtf8Coder.of(), new WriteTables.ResultCoder())) .apply( ParDo.of( new DoFn< @@ -3115,7 +3114,9 @@ public void testWriteRename() throws Exception { Create.of( ImmutableList.of( (Iterable>) tempTablesElement)) - .withCoder(IterableCoder.of(KvCoder.of(TableDestinationCoder.of(), INSTANCE)))) + .withCoder( + IterableCoder.of( + KvCoder.of(TableDestinationCoder.of(), WriteTables.ResultCoder.INSTANCE)))) .apply(writeRename); p.run().waitUntilFinish(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java index ea3bb29e0815..27f7b2cf5249 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java @@ -37,7 +37,6 @@ import com.google.protobuf.DescriptorProtos.DescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label; -import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Descriptors.FieldDescriptor; @@ -203,196 +202,196 @@ public class TableRowToStorageApiProtoTest { FieldDescriptorProto.newBuilder() .setName("stringvalue") .setNumber(1) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("f") .setNumber(2) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("bytesvalue") .setNumber(3) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("int64value") .setNumber(4) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("intvalue") .setNumber(5) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("float64value") .setNumber(6) - .setType(Type.TYPE_DOUBLE) + .setType(FieldDescriptorProto.Type.TYPE_DOUBLE) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("floatvalue") .setNumber(7) - .setType(Type.TYPE_DOUBLE) + .setType(FieldDescriptorProto.Type.TYPE_DOUBLE) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("boolvalue") .setNumber(8) - .setType(Type.TYPE_BOOL) + .setType(FieldDescriptorProto.Type.TYPE_BOOL) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("booleanvalue") .setNumber(9) - .setType(Type.TYPE_BOOL) + .setType(FieldDescriptorProto.Type.TYPE_BOOL) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvalue") .setNumber(10) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timevalue") .setNumber(11) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("datetimevalue") .setNumber(12) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("datevalue") .setNumber(13) - .setType(Type.TYPE_INT32) + .setType(FieldDescriptorProto.Type.TYPE_INT32) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("numericvalue") .setNumber(14) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("bignumericvalue") .setNumber(15) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("numericvalue2") .setNumber(16) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("bignumericvalue2") .setNumber(17) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("arrayvalue") .setNumber(18) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_REPEATED) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampisovalue") .setNumber(19) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampisovalueoffsethh") .setNumber(20) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluelong") .setNumber(21) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluespace") .setNumber(22) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluespaceutc") .setNumber(23) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluezoneregion") .setNumber(24) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluespacemilli") .setNumber(25) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluespacetrailingzero") .setNumber(26) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("datetimevaluespace") .setNumber(27) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluemaximum") .setNumber(28) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( @@ -400,7 +399,7 @@ public class TableRowToStorageApiProtoTest { .setName( BigQuerySchemaUtil.generatePlaceholderFieldName("123_illegalprotofieldname")) .setNumber(29) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_OPTIONAL) .setOptions( DescriptorProtos.FieldOptions.newBuilder() @@ -412,7 +411,7 @@ public class TableRowToStorageApiProtoTest { FieldDescriptorProto.newBuilder() .setName("timestamppicosvalue") .setNumber(30) - .setType(Type.TYPE_MESSAGE) + .setType(FieldDescriptorProto.Type.TYPE_MESSAGE) .setLabel(Label.LABEL_OPTIONAL) .setTypeName("TimestampPicos") .build()) @@ -579,189 +578,189 @@ public class TableRowToStorageApiProtoTest { FieldDescriptorProto.newBuilder() .setName("stringvalue") .setNumber(1) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("bytesvalue") .setNumber(2) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("int64value") .setNumber(3) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("intvalue") .setNumber(4) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("float64value") .setNumber(5) - .setType(Type.TYPE_DOUBLE) + .setType(FieldDescriptorProto.Type.TYPE_DOUBLE) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("floatvalue") .setNumber(6) - .setType(Type.TYPE_DOUBLE) + .setType(FieldDescriptorProto.Type.TYPE_DOUBLE) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("boolvalue") .setNumber(7) - .setType(Type.TYPE_BOOL) + .setType(FieldDescriptorProto.Type.TYPE_BOOL) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("booleanvalue") .setNumber(8) - .setType(Type.TYPE_BOOL) + .setType(FieldDescriptorProto.Type.TYPE_BOOL) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvalue") .setNumber(9) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timevalue") .setNumber(10) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("datetimevalue") .setNumber(11) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("datevalue") .setNumber(2) - .setType(Type.TYPE_INT32) + .setType(FieldDescriptorProto.Type.TYPE_INT32) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("numericvalue") .setNumber(13) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("bignumericvalue") .setNumber(14) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("numericvalue2") .setNumber(15) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("bignumericvalue2") .setNumber(16) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("arrayvalue") .setNumber(17) - .setType(Type.TYPE_BYTES) + .setType(FieldDescriptorProto.Type.TYPE_BYTES) .setLabel(Label.LABEL_REPEATED) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampisovalue") .setNumber(18) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampisovalueoffsethh") .setNumber(19) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluelong") .setNumber(20) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluespace") .setNumber(21) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluespaceutc") .setNumber(22) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluezoneregion") .setNumber(23) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluespacemilli") .setNumber(24) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluespacetrailingzero") .setNumber(25) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("datetimevaluespace") .setNumber(26) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("timestampvaluemaximum") .setNumber(27) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( @@ -769,7 +768,7 @@ public class TableRowToStorageApiProtoTest { .setName( BigQuerySchemaUtil.generatePlaceholderFieldName("123_illegalprotofieldname")) .setNumber(28) - .setType(Type.TYPE_STRING) + .setType(FieldDescriptorProto.Type.TYPE_STRING) .setLabel(Label.LABEL_OPTIONAL) .setOptions( DescriptorProtos.FieldOptions.newBuilder() @@ -781,7 +780,7 @@ public class TableRowToStorageApiProtoTest { FieldDescriptorProto.newBuilder() .setName("timestamppicosvalue") .setNumber(29) - .setType(Type.TYPE_MESSAGE) + .setType(FieldDescriptorProto.Type.TYPE_MESSAGE) .setLabel(Label.LABEL_OPTIONAL) .setTypeName("TimestampPicos") .build()) @@ -997,11 +996,11 @@ public class TableRowToStorageApiProtoTest { public void testDescriptorFromTableSchema() throws Exception { DescriptorProto descriptor = TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA, true, false); - Map types = + Map types = descriptor.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); - Map expectedTypes = + Map expectedTypes = BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); @@ -1031,16 +1030,16 @@ public void testDescriptorFromTableSchema() throws Exception { public void testNestedFromTableSchema() throws Exception { DescriptorProto descriptor = TableRowToStorageApiProto.descriptorSchemaFromTableSchema(NESTED_TABLE_SCHEMA, true, false); - Map expectedBaseTypes = + Map expectedBaseTypes = BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); - Map expectedBaseTypesNoF = + Map expectedBaseTypesNoF = BASE_TABLE_SCHEMA_NO_F_PROTO.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); - Map types = + Map types = descriptor.getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); @@ -1054,32 +1053,32 @@ public void testNestedFromTableSchema() throws Exception { descriptor.getNestedTypeList().stream() .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity())); assertEquals(4, nestedTypes.size()); - assertEquals(Type.TYPE_MESSAGE, types.get("nestedvalue1")); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, types.get("nestedvalue1")); String nestedTypeName1 = typeNames.get("nestedvalue1"); - Map nestedTypes1 = + Map nestedTypes1 = nestedTypes.get(nestedTypeName1).getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); assertEquals(expectedBaseTypes, nestedTypes1); - assertEquals(Type.TYPE_MESSAGE, types.get("nestedvalue2")); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, types.get("nestedvalue2")); String nestedTypeName2 = typeNames.get("nestedvalue2"); - Map nestedTypes2 = + Map nestedTypes2 = nestedTypes.get(nestedTypeName2).getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); assertEquals(expectedBaseTypes, nestedTypes2); - assertEquals(Type.TYPE_MESSAGE, types.get("nestedvaluenof1")); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, types.get("nestedvaluenof1")); String nestedTypeNameNoF1 = typeNames.get("nestedvaluenof1"); - Map nestedTypesNoF1 = + Map nestedTypesNoF1 = nestedTypes.get(nestedTypeNameNoF1).getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); assertEquals(expectedBaseTypesNoF, nestedTypesNoF1); - assertEquals(Type.TYPE_MESSAGE, types.get("nestedvaluenof2")); + assertEquals(FieldDescriptorProto.Type.TYPE_MESSAGE, types.get("nestedvaluenof2")); String nestedTypeNameNoF2 = typeNames.get("nestedvaluenof2"); - Map nestedTypesNoF2 = + Map nestedTypesNoF2 = nestedTypes.get(nestedTypeNameNoF2).getFieldList().stream() .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); @@ -1184,13 +1183,13 @@ public void testNestedFromTableSchema() throws Exception { FieldDescriptorProto.newBuilder() .setName("seconds") .setNumber(1) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL)) .addField( FieldDescriptorProto.newBuilder() .setName("picoseconds") .setNumber(2) - .setType(Type.TYPE_INT64) + .setType(FieldDescriptorProto.Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL)) .build(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableSharedClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableSharedClientTest.java index 0b99e0aab5c7..3cdba0d9f25a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableSharedClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableSharedClientTest.java @@ -31,7 +31,7 @@ import com.google.bigtable.v2.PingAndWarmResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; -import com.google.cloud.bigtable.data.v2.BigtableDataSettings.Builder; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider; import com.google.protobuf.ByteString; import com.google.rpc.Code; @@ -225,10 +225,12 @@ public void processElement( /** Overrides the default settings to ensure 1 channel per client. */ public static class ClientSettingsOverride - implements BiFunction { + implements BiFunction< + BigtableDataSettings.Builder, PipelineOptions, BigtableDataSettings.Builder> { @Override - public Builder apply(Builder builder, PipelineOptions pipelineOptions) { + public BigtableDataSettings.Builder apply( + BigtableDataSettings.Builder builder, PipelineOptions pipelineOptions) { InstantiatingGrpcChannelProvider oldTransport = (InstantiatingGrpcChannelProvider) builder.stubSettings().getTransportChannelProvider(); diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeDataType.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeDataType.java index e15194b4356d..447b6f6be553 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeDataType.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeDataType.java @@ -19,7 +19,6 @@ import java.io.Serializable; import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonSubTypes; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonSubTypes.Type; import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeDate; import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeDateTime; @@ -50,31 +49,31 @@ /** Interface for data types to provide SQLs for themselves. */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") @JsonSubTypes({ - @Type(value = SnowflakeDate.class, name = "date"), - @Type(value = SnowflakeDateTime.class, name = "datetime"), - @Type(value = SnowflakeTime.class, name = "time"), - @Type(value = SnowflakeTimestamp.class, name = "timestamp"), - @Type(value = SnowflakeTimestampLTZ.class, name = "timestamp_ltz"), - @Type(value = SnowflakeTimestampNTZ.class, name = "timestamp_ntz"), - @Type(value = SnowflakeTimestampTZ.class, name = "timestamp_tz"), - @Type(value = SnowflakeBoolean.class, name = "boolean"), - @Type(value = SnowflakeDecimal.class, name = "decimal"), - @Type(value = SnowflakeDouble.class, name = "double"), - @Type(value = SnowflakeFloat.class, name = "float"), - @Type(value = SnowflakeInteger.class, name = "integer"), - @Type(value = SnowflakeNumber.class, name = "number"), - @Type(value = SnowflakeNumeric.class, name = "numeric"), - @Type(value = SnowflakeReal.class, name = "real"), - @Type(value = SnowflakeArray.class, name = "array"), - @Type(value = SnowflakeObject.class, name = "object"), - @Type(value = SnowflakeVariant.class, name = "variant"), - @Type(value = SnowflakeBinary.class, name = "binary"), - @Type(value = SnowflakeChar.class, name = "char"), - @Type(value = SnowflakeString.class, name = "string"), - @Type(value = SnowflakeText.class, name = "text"), - @Type(value = SnowflakeVarBinary.class, name = "varbinary"), - @Type(value = SnowflakeVarchar.class, name = "varchar"), - @Type(value = SnowflakeGeography.class, name = "geography"), + @JsonSubTypes.Type(value = SnowflakeDate.class, name = "date"), + @JsonSubTypes.Type(value = SnowflakeDateTime.class, name = "datetime"), + @JsonSubTypes.Type(value = SnowflakeTime.class, name = "time"), + @JsonSubTypes.Type(value = SnowflakeTimestamp.class, name = "timestamp"), + @JsonSubTypes.Type(value = SnowflakeTimestampLTZ.class, name = "timestamp_ltz"), + @JsonSubTypes.Type(value = SnowflakeTimestampNTZ.class, name = "timestamp_ntz"), + @JsonSubTypes.Type(value = SnowflakeTimestampTZ.class, name = "timestamp_tz"), + @JsonSubTypes.Type(value = SnowflakeBoolean.class, name = "boolean"), + @JsonSubTypes.Type(value = SnowflakeDecimal.class, name = "decimal"), + @JsonSubTypes.Type(value = SnowflakeDouble.class, name = "double"), + @JsonSubTypes.Type(value = SnowflakeFloat.class, name = "float"), + @JsonSubTypes.Type(value = SnowflakeInteger.class, name = "integer"), + @JsonSubTypes.Type(value = SnowflakeNumber.class, name = "number"), + @JsonSubTypes.Type(value = SnowflakeNumeric.class, name = "numeric"), + @JsonSubTypes.Type(value = SnowflakeReal.class, name = "real"), + @JsonSubTypes.Type(value = SnowflakeArray.class, name = "array"), + @JsonSubTypes.Type(value = SnowflakeObject.class, name = "object"), + @JsonSubTypes.Type(value = SnowflakeVariant.class, name = "variant"), + @JsonSubTypes.Type(value = SnowflakeBinary.class, name = "binary"), + @JsonSubTypes.Type(value = SnowflakeChar.class, name = "char"), + @JsonSubTypes.Type(value = SnowflakeString.class, name = "string"), + @JsonSubTypes.Type(value = SnowflakeText.class, name = "text"), + @JsonSubTypes.Type(value = SnowflakeVarBinary.class, name = "varbinary"), + @JsonSubTypes.Type(value = SnowflakeVarchar.class, name = "varchar"), + @JsonSubTypes.Type(value = SnowflakeGeography.class, name = "geography"), }) public interface SnowflakeDataType extends Serializable { String sql();