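This diff enables Error Prone's BadImport check: "BadImport" is dropped from the check list in BeamModulePlugin (by all appearances the list of disabled checks, since the rest of the diff fixes exactly the violations BadImport reports), and each violation is fixed by removing the nested-class import and qualifying the name with its enclosing class at the use site. The affected simple names (Type, Key, Enum, Provider, Value) say nothing about their origin on their own.

A minimal sketch of the pattern, reusing Avro's Schema.Type from later in this diff; the class name BadImportExample is illustrative, not part of the change:

```java
import org.apache.avro.Schema; // import only the enclosing class

public final class BadImportExample {
  // Before (flagged by BadImport):
  //   import org.apache.avro.Schema.Type;
  //   ...
  //   Schema s = Schema.create(Type.STRING); // "Type" of what?
  // After (the style this diff applies everywhere):
  static Schema stringSchema() {
    return Schema.create(Schema.Type.STRING); // origin is explicit at the use site
  }
}
```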
@@ -1534,7 +1534,6 @@ class BeamModulePlugin implements Plugin<Project> {
"AutoValueImmutableFields",
"AutoValueImmutableFields",
"AutoValueSubclassLeaked",
"BadImport",
"BigDecimalEquals",
"ComparableType",
"DoNotMockAutoValue",
@@ -134,7 +134,6 @@
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.State;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer.Type;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WatermarkHold;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse;
@@ -1643,7 +1642,7 @@ private Timer buildWatermarkTimer(String tagPrefix, long timestampMillis, boolean delete) {
     Timer.Builder builder =
         Timer.newBuilder()
             .setTag(ByteString.copyFromUtf8(tagPrefix + ":" + timestampMillis))
-            .setType(Type.WATERMARK)
+            .setType(Timer.Type.WATERMARK)
             .setStateFamily("MergeWindows");
     if (!delete) {
       builder.setTimestamp(timestampMillis * 1000);
@@ -50,7 +50,6 @@
 import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
 import org.apache.beam.runners.fnexecution.data.GrpcDataService;
 import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
-import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory.Provider;
 import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
 import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
@@ -190,7 +189,7 @@ public void createsMultipleEnvironmentOfSingleType() throws Exception {
         .thenReturn(envFactoryB);
     when(environmentProviderFactoryB.getServerFactory()).thenReturn(serverFactory);

-    Map<String, Provider> environmentFactoryProviderMap =
+    Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap =
         ImmutableMap.of(
             environmentA.getUrn(), environmentProviderFactoryA,
             environmentB.getUrn(), environmentProviderFactoryB);
@@ -231,7 +230,7 @@ public void createsMultipleEnvironmentsWithSdkWorkerParallelism() throws Exception {
         .thenReturn(envFactoryA);
     when(environmentProviderFactoryA.getServerFactory()).thenReturn(serverFactory);

-    Map<String, Provider> environmentFactoryProviderMap =
+    Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap =
         ImmutableMap.of(environmentA.getUrn(), environmentProviderFactoryA);

     PortablePipelineOptions portableOptions =
@@ -309,7 +308,7 @@ public void creatingMultipleEnvironmentFromMultipleTypes() throws Exception {
         .thenReturn(envFactoryB);
     when(environmentProviderFactoryB.getServerFactory()).thenReturn(serverFactory);

-    Map<String, Provider> environmentFactoryProviderMap =
+    Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap =
         ImmutableMap.of(
             environmentA.getUrn(), environmentProviderFactoryA,
             environmentB.getUrn(), environmentProviderFactoryB);
@@ -338,7 +337,7 @@ public void expiresEnvironment() throws Exception {
         .thenReturn(envFactoryA);
     when(environmentProviderFactoryA.getServerFactory()).thenReturn(serverFactory);

-    Map<String, Provider> environmentFactoryProviderMap =
+    Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap =
         ImmutableMap.of(environmentA.getUrn(), environmentProviderFactoryA);

     PortablePipelineOptions portableOptions =
@@ -29,7 +29,6 @@
 import org.apache.beam.model.jobmanagement.v1.JobApi;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
-import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobStateEvent;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
@@ -112,16 +111,16 @@ public void onSuccess(PortablePipelineResult pipelineResult) {

     switch (state) {
       case DONE:
-        setState(Enum.DONE);
+        setState(JobState.Enum.DONE);
         break;
       case RUNNING:
-        setState(Enum.RUNNING);
+        setState(JobState.Enum.RUNNING);
         break;
       case CANCELLED:
-        setState(Enum.CANCELLED);
+        setState(JobState.Enum.CANCELLED);
         break;
       case FAILED:
-        setState(Enum.FAILED);
+        setState(JobState.Enum.FAILED);
         break;
       default:
         setState(JobState.Enum.UNSPECIFIED);
@@ -257,7 +256,7 @@ private synchronized void sendMessage(JobMessage message) {
     }
   }

-  public static Boolean isTerminated(Enum state) {
+  public static Boolean isTerminated(JobState.Enum state) {
     switch (state) {
       case DONE:
       case FAILED:
@@ -19,7 +19,6 @@

 import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderFactory.invoke;
 import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderFactory.invokeIfNotNull;
-import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderFactory.newInstance;
 import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.match;
 import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.replace;
 import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
@@ -417,7 +416,8 @@ private static <T> Expression serializeOneOfField(
   private static <T> Expression deserializeOneOfField(Expression in, Encoder<T> enc, int idx) {
     GetStructField field = new GetStructField(in, idx, Option.empty());
     Expression litNull = lit(null, TUPLE2_TYPE);
-    Expression newTuple = newInstance(Tuple2.class, TUPLE2_TYPE, lit(idx), deserialize(field, enc));
+    Expression newTuple =
+        EncoderFactory.newInstance(Tuple2.class, TUPLE2_TYPE, lit(idx), deserialize(field, enc));
     return new If(new IsNull(field), litNull, newTuple);
   }

@@ -19,8 +19,6 @@

 import java.util.concurrent.ExecutionException;
 import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.runners.spark.util.SideInputStorage.Key;
-import org.apache.beam.runners.spark.util.SideInputStorage.Value;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
@@ -57,13 +55,13 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Override
   public <T> @Nullable T get(PCollectionView<T> view, BoundedWindow window) {
     @SuppressWarnings("unchecked")
-    final Cache<Key<T>, Value<T>> materializedCasted =
+    final Cache<SideInputStorage.Key<T>, SideInputStorage.Value<T>> materializedCasted =
         (Cache) SideInputStorage.getMaterializedSideInputs();

-    Key<T> sideInputKey = new Key<>(view, window);
+    SideInputStorage.Key<T> sideInputKey = new SideInputStorage.Key<>(view, window);

     try {
-      Value<T> cachedResult =
+      SideInputStorage.Value<T> cachedResult =
           materializedCasted.get(
               sideInputKey,
               () -> {
@@ -73,7 +71,7 @@ private CachedSideInputReader(SideInputReader delegate) {
                     sideInputKey,
                     SizeEstimator.estimate(result));

-                return new Value<>(result);
+                return new SideInputStorage.Value<>(result);
               });
       return cachedResult.getValue();
     } catch (ExecutionException e) {
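The CachedSideInputReader hunks above qualify SideInputStorage.Key and SideInputStorage.Value around a Guava cache lookup. For readers unfamiliar with the compute-if-absent idiom in play, here is a standalone sketch against stock Guava (Beam uses its vendored copy); CacheExample and expensiveLookup are made-up names:

```java
import java.util.concurrent.ExecutionException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public final class CacheExample {
  private static final Cache<String, String> CACHE =
      CacheBuilder.newBuilder().maximumSize(100).build();

  static String getOrLoad(String key) throws ExecutionException {
    // Cache.get(key, loader) computes and caches the value on a miss --
    // the same pattern the reader above uses with SideInputStorage.Key/Value.
    return CACHE.get(key, () -> expensiveLookup(key));
  }

  private static String expensiveLookup(String key) {
    return "value-for-" + key;
  }
}
```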
@@ -21,12 +21,12 @@

 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientInterceptor;
 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata;
-import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata.Key;
 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.MetadataUtils;

 /** A {@link ClientInterceptor} that attaches a provided SDK Harness ID to outgoing messages. */
 public class AddHarnessIdInterceptor {
-  private static final Key<String> ID_KEY = Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER);
+  private static final Metadata.Key<String> ID_KEY =
+      Metadata.Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER);

   public static ClientInterceptor create(String harnessId) {
     checkArgument(harnessId != null, "harnessId must not be null");
@@ -20,7 +20,6 @@
 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Context;
 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Contexts;
 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata;
-import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata.Key;
 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ServerCall;
 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ServerCall.Listener;
 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ServerCallHandler;
@@ -36,8 +35,8 @@
 })
 public class GrpcContextHeaderAccessorProvider {

-  private static final Key<String> WORKER_ID_KEY =
-      Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER);
+  private static final Metadata.Key<String> WORKER_ID_KEY =
+      Metadata.Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER);
   private static final Context.Key<String> SDK_WORKER_CONTEXT_KEY = Context.key("worker_id");
   private static final GrpcHeaderAccessor HEADER_ACCESSOR = new GrpcHeaderAccessor();

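The two interceptor hunks above make the same change for gRPC metadata, where a bare Key import is genuinely ambiguous: GrpcContextHeaderAccessorProvider uses both Metadata.Key and Context.Key in one file. A sketch of the attach-header idiom against stock io.grpc (Beam uses the vendored v1p69p0 copy); WorkerIdHeader and attachWorkerId are illustrative names:

```java
import io.grpc.ClientInterceptor;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;

public final class WorkerIdHeader {
  // Qualified Metadata.Key makes clear this is a gRPC header key, not a Context.Key.
  private static final Metadata.Key<String> WORKER_ID_KEY =
      Metadata.Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER);

  /** Returns an interceptor that attaches the given worker id to every outgoing call. */
  public static ClientInterceptor attachWorkerId(String workerId) {
    Metadata headers = new Metadata();
    headers.put(WORKER_ID_KEY, workerId);
    return MetadataUtils.newAttachHeadersInterceptor(headers);
  }
}
```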
@@ -56,7 +56,6 @@
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
@@ -238,15 +237,17 @@ public static TypeWithNullability create(org.apache.avro.Schema avroSchema) {
   }

   TypeWithNullability(org.apache.avro.Schema avroSchema) {
-    if (avroSchema.getType() == Type.UNION) {
+    if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) {
       List<org.apache.avro.Schema> types = avroSchema.getTypes();

       // optional fields in AVRO have form of:
       // {"name": "foo", "type": ["null", "something"]}

       // don't need recursion because nested unions aren't supported in AVRO
       List<org.apache.avro.Schema> nonNullTypes =
-          types.stream().filter(x -> x.getType() != Type.NULL).collect(Collectors.toList());
+          types.stream()
+              .filter(x -> x.getType() != org.apache.avro.Schema.Type.NULL)
+              .collect(Collectors.toList());

       if (nonNullTypes.size() == types.size() || nonNullTypes.isEmpty()) {
         // union without `null` or all 'null' union, keep as is.
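The TypeWithNullability hunk above applies the qualified-name style inside Avro's union handling, where an optional field is encoded as a ["null", X] union. A standalone sketch of that unwrapping logic against plain org.apache.avro; UnionUtil and unwrapNullable are illustrative names, not Beam's:

```java
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;

public final class UnionUtil {
  /** Unwraps ["null", X] unions to X; leaves every other schema untouched. */
  static Schema unwrapNullable(Schema schema) {
    if (schema.getType() == Schema.Type.UNION) {
      List<Schema> nonNull =
          schema.getTypes().stream()
              .filter(s -> s.getType() != Schema.Type.NULL)
              .collect(Collectors.toList());
      // Only a union with exactly one non-null branch is an optional field;
      // no recursion is needed because Avro forbids nested unions.
      if (nonNull.size() == 1) {
        return nonNull.get(0);
      }
    }
    return schema;
  }
}
```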
@@ -303,7 +304,7 @@ && checkNotNull(fieldType.getLogicalType())

   /** Create a {@link FixedBytesField} from an AVRO type. */
   public static @Nullable FixedBytesField fromAvroType(org.apache.avro.Schema type) {
-    if (type.getType().equals(Type.FIXED)) {
+    if (type.getType().equals(org.apache.avro.Schema.Type.FIXED)) {
       return new FixedBytesField(type.getFixedSize());
     } else {
       return null;
@@ -672,7 +673,9 @@ public static <T> SerializableFunction<Row, T> getFromRowFunction(Class<T> clazz) {
   public static @Nullable <T> Schema getSchema(
       Class<T> clazz, org.apache.avro.@Nullable Schema schema) {
     if (schema != null) {
-      return schema.getType().equals(Type.RECORD) ? toBeamSchema(schema) : null;
+      return schema.getType().equals(org.apache.avro.Schema.Type.RECORD)
+          ? toBeamSchema(schema)
+          : null;
     }
     if (GenericRecord.class.equals(clazz)) {
       throw new IllegalArgumentException("No schema provided for getSchema(GenericRecord)");
@@ -1118,44 +1121,45 @@ private static org.apache.avro.Schema getFieldSchema(
       case BYTE:
       case INT16:
       case INT32:
-        baseType = org.apache.avro.Schema.create(Type.INT);
+        baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT);
         break;

       case INT64:
-        baseType = org.apache.avro.Schema.create(Type.LONG);
+        baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG);
         break;

       case DECIMAL:
         baseType =
             LogicalTypes.decimal(Integer.MAX_VALUE)
-                .addToSchema(org.apache.avro.Schema.create(Type.BYTES));
+                .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES));
         break;

       case FLOAT:
-        baseType = org.apache.avro.Schema.create(Type.FLOAT);
+        baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT);
         break;

       case DOUBLE:
-        baseType = org.apache.avro.Schema.create(Type.DOUBLE);
+        baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE);
         break;

       case STRING:
-        baseType = org.apache.avro.Schema.create(Type.STRING);
+        baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING);
         break;

       case DATETIME:
         // TODO: There is a desire to move Beam schema DATETIME to a micros representation. When
         // this is done, this logical type needs to be changed.
         baseType =
-            LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG));
+            LogicalTypes.timestampMillis()
+                .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG));
         break;

       case BOOLEAN:
-        baseType = org.apache.avro.Schema.create(Type.BOOLEAN);
+        baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN);
         break;

       case BYTES:
-        baseType = org.apache.avro.Schema.create(Type.BYTES);
+        baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES);
         break;

       case LOGICAL_TYPE:
@@ -1167,7 +1171,7 @@ private static org.apache.avro.Schema getFieldSchema(
           baseType = fixedBytesField.toAvroType("fixed", namespace + "." + fieldName);
         } else if (VariableBytes.IDENTIFIER.equals(identifier)) {
           // treat VARBINARY as bytes as that is what avro supports
-          baseType = org.apache.avro.Schema.create(Type.BYTES);
+          baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES);
         } else if (FixedString.IDENTIFIER.equals(identifier)
             || "CHAR".equals(identifier)
             || "NCHAR".equals(identifier)) {
@@ -1190,19 +1194,24 @@ private static org.apache.avro.Schema getFieldSchema(
                   .map(x -> getFieldSchema(x.getType(), x.getName(), namespace))
                   .collect(Collectors.toList()));
         } else if ("DATE".equals(identifier) || SqlTypes.DATE.getIdentifier().equals(identifier)) {
-          baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
+          baseType =
+              LogicalTypes.date()
+                  .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT));
         } else if ("TIME".equals(identifier)) {
-          baseType = LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
+          baseType =
+              LogicalTypes.timeMillis()
+                  .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT));
         } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
           baseType =
-              LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
+              LogicalTypes.timestampMicros()
+                  .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG));
         } else if (Timestamp.IDENTIFIER.equals(identifier)) {
           int precision = checkNotNull(logicalType.getArgument());
           if (precision != 9) {
             throw new RuntimeException(
                 "Timestamp logical type precision not supported:" + precision);
           }
-          baseType = org.apache.avro.Schema.create(Type.LONG);
+          baseType = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG);
           baseType.addProp("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE);
         } else {
           throw new RuntimeException(
@@ -1242,10 +1251,11 @@ private static org.apache.avro.Schema getFieldSchema(
   private static final Map<org.apache.avro.Schema, Function<Number, ? extends Number>>
       NUMERIC_CONVERTERS =
           ImmutableMap.of(
-              org.apache.avro.Schema.create(Type.INT), Number::intValue,
-              org.apache.avro.Schema.create(Type.LONG), Number::longValue,
-              org.apache.avro.Schema.create(Type.FLOAT), Number::floatValue,
-              org.apache.avro.Schema.create(Type.DOUBLE), Number::doubleValue);
+              org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), Number::intValue,
+              org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), Number::longValue,
+              org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT), Number::floatValue,
+              org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE),
+              Number::doubleValue);

   /** Convert a value from Beam Row to a value used for Avro GenericRecord. */
   private static @Nullable Object genericFromBeamField(
@@ -1290,10 +1300,10 @@ private static org.apache.avro.Schema getFieldSchema(
         return result;

       case DATETIME:
-        if (typeWithNullability.type.getType() == Type.INT) {
+        if (typeWithNullability.type.getType() == org.apache.avro.Schema.Type.INT) {
           ReadableInstant instant = (ReadableInstant) value;
           return (int) Days.daysBetween(Instant.EPOCH, instant).getDays();
-        } else if (typeWithNullability.type.getType() == Type.LONG) {
+        } else if (typeWithNullability.type.getType() == org.apache.avro.Schema.Type.LONG) {
           ReadableInstant instant = (ReadableInstant) value;
           return (long) instant.getMillis();
         } else {
@@ -1418,7 +1428,7 @@ private static Object convertLogicalType(

     // TODO: Remove this workaround once Avro is upgraded to 1.12+ where timestamp-nanos
     if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.type.getProp("logicalType"))) {
-      if (type.type.getType() == Type.LONG) {
+      if (type.type.getType() == org.apache.avro.Schema.Type.LONG) {
         Long nanos = (Long) value;
         // Check if Beam expects Timestamp logical type
         if (fieldType.getTypeName() == TypeName.LOGICAL_TYPE