diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java index e43d5a76e0681..f855e316508d5 100644 --- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java +++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java @@ -100,10 +100,14 @@ public DeserializationSchema createRuntimeDecoder( int[][] projections) { producedDataType = Projection.of(projections).project(producedDataType); final RowType rowType = (RowType) producedDataType.getLogicalType(); + // When no explicit schema is provided, pass null so that the + // writer schema from the registry is used for deserialization. + // This avoids schema resolution failures when Avro types (e.g. + // enums) are lossy-converted through Flink's type system. final Schema schema = schemaString .map(s -> getAvroSchema(s, rowType)) - .orElse(AvroSchemaConverter.convertToSchema(rowType)); + .orElse(null); final TypeInformation rowDataTypeInfo = context.createTypeInformation(producedDataType); return new AvroRowDataDeserializationSchema( diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java index 6ace72b199ed8..1443a3d99dbfb 100644 --- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java +++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java @@ -116,7 +116,7 @@ void testDeserializationSchema() { final AvroRowDataDeserializationSchema expectedDeser = new AvroRowDataDeserializationSchema( ConfluentRegistryAvroDeserializationSchema.forGeneric( - AvroSchemaConverter.convertToSchema(ROW_TYPE), REGISTRY_URL), + null, REGISTRY_URL), AvroToRowDataConverters.createRowConverter(ROW_TYPE), InternalTypeInfo.of(ROW_TYPE)); diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java index a5046ac23c360..2ca514c51cbc5 100644 --- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java +++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java @@ -38,13 +38,18 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.EncoderFactory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Random; import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord; @@ -56,6 +61,22 @@ * schema registry avro. */ class RegistryAvroRowDataSeDeSchemaTest { + private static final String ENUM_SUBJECT = "enum-record-value"; + + private static final Schema ENUM_RECORD_SCHEMA = + new Schema.Parser() + .parse( + "{\"namespace\": \"org.apache.flink.formats.avro.generated\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"EnumRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"name\", \"type\": \"string\"},\n" + + " {\"name\": \"color\", \"type\": [\"null\"," + + " {\"type\": \"enum\", \"name\": \"Colors\"," + + " \"symbols\": [\"RED\", \"GREEN\", \"BLUE\"]}]}\n" + + " ]\n" + + "}"); + private static final Schema ADDRESS_SCHEMA = Address.getClassSchema(); private static final Schema ADDRESS_SCHEMA_COMPATIBLE = @@ -90,6 +111,7 @@ void before() { @AfterEach void after() throws IOException, RestClientException { client.deleteSubject(SUBJECT); + client.deleteSubject(ENUM_SUBJECT); } @Test @@ -129,6 +151,30 @@ void testRowDataReadWithNonRegistryAvro() throws Exception { .hasCause(new IOException("Unknown data format. Magic number does not match")); } + @Test + void testRowDataReadWithEnumFieldAndNullReaderSchema() throws Exception { + DataType dataType = AvroSchemaConverter.convertToDataType(ENUM_RECORD_SCHEMA.toString()); + RowType rowType = (RowType) dataType.getLogicalType(); + + int schemaId = client.register(ENUM_SUBJECT, ENUM_RECORD_SCHEMA); + GenericRecord record = new GenericData.Record(ENUM_RECORD_SCHEMA); + record.put("name", "Alice"); + record.put( + "color", + new GenericData.EnumSymbol( + ENUM_RECORD_SCHEMA.getField("color").schema().getTypes().get(1), "RED")); + byte[] serialized = serializeWithRegistryFormat(record, ENUM_RECORD_SCHEMA, schemaId); + + AvroRowDataDeserializationSchema deserializer = + getDeserializationSchemaForSubject(rowType, null, ENUM_SUBJECT); + deserializer.open(null); + + RowData result = deserializer.deserialize(serialized); + assertThat(result.getArity()).isEqualTo(2); + assertThat(result.getString(0).toString()).isEqualTo("Alice"); + assertThat(result.getString(1).toString()).isEqualTo("RED"); + } + private void testRowDataWriteReadWithSchema(Schema schema) throws Exception { DataType dataType = AvroSchemaConverter.convertToDataType(schema.toString()); RowType rowType = (RowType) dataType.getLogicalType(); @@ -162,8 +208,13 @@ private void testRowDataWriteReadWithSchema(Schema schema) throws Exception { private static AvroRowDataSerializationSchema getSerializationSchema( RowType rowType, Schema avroSchema) { + return getSerializationSchemaForSubject(rowType, avroSchema, SUBJECT); + } + + private static AvroRowDataSerializationSchema getSerializationSchemaForSubject( + RowType rowType, Schema avroSchema, String subject) { ConfluentSchemaRegistryCoder registryCoder = - new ConfluentSchemaRegistryCoder(SUBJECT, client); + new ConfluentSchemaRegistryCoder(subject, client); return new AvroRowDataSerializationSchema( rowType, new RegistryAvroSerializationSchema( @@ -173,8 +224,13 @@ private static AvroRowDataSerializationSchema getSerializationSchema( private static AvroRowDataDeserializationSchema getDeserializationSchema( RowType rowType, Schema avroSchema) { + return getDeserializationSchemaForSubject(rowType, avroSchema, SUBJECT); + } + + private static AvroRowDataDeserializationSchema getDeserializationSchemaForSubject( + RowType rowType, Schema avroSchema, String subject) { ConfluentSchemaRegistryCoder registryCoder = - new ConfluentSchemaRegistryCoder(SUBJECT, client); + new ConfluentSchemaRegistryCoder(subject, client); return new AvroRowDataDeserializationSchema( new RegistryAvroDeserializationSchema( GenericRecord.class, avroSchema, () -> registryCoder), @@ -182,6 +238,17 @@ private static AvroRowDataDeserializationSchema getDeserializationSchema( InternalTypeInfo.of(rowType)); } + private static byte[] serializeWithRegistryFormat( + GenericRecord record, Schema schema, int schemaId) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(0); + out.write(ByteBuffer.allocate(4).putInt(schemaId).array()); + org.apache.avro.io.Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); + new GenericDatumWriter<>(schema).write(record, encoder); + encoder.flush(); + return out.toByteArray(); + } + private static RowData address2RowData(Address address) { GenericRowData rowData = new GenericRowData(5); rowData.setField(0, address.getNum()); diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java index 226f47a339b21..43752f5c8d7d1 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java @@ -172,7 +172,11 @@ public T deserialize(@Nullable byte[] message) throws IOException { // read record checkAvroInitialized(); inputStream.setBuffer(message); - Schema readerSchema = getReaderSchema(); + Schema readerSchema = + Preconditions.checkNotNull( + getReaderSchema(), + "Reader schema is required for non-registry deserialization. " + + "Use RegistryAvroDeserializationSchema for registry-based deserialization."); GenericDatumReader datumReader = getDatumReader(); datumReader.setSchema(readerSchema); @@ -198,16 +202,18 @@ void checkAvroInitialized() throws IOException { this.datumReader = new SpecificDatumReader<>(specificData); this.reader = AvroFactory.extractAvroSpecificSchema(recordClazz, specificData); } else { - this.reader = new Schema.Parser().parse(schemaString); + if (schemaString != null) { + this.reader = new Schema.Parser().parse(schemaString); + } GenericData genericData = new GenericData(cl); this.datumReader = new GenericDatumReader<>(null, this.reader, genericData); } this.inputStream = new MutableByteArrayInputStream(); - if (encoding == AvroEncoding.JSON) { + if (encoding == AvroEncoding.JSON && getReaderSchema() != null) { this.decoder = DecoderFactory.get().jsonDecoder(getReaderSchema(), inputStream); - } else { + } else if (encoding != AvroEncoding.JSON) { this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); } } @@ -223,7 +229,12 @@ public TypeInformation getProducedType() { if (SpecificRecord.class.isAssignableFrom(recordClazz)) { return new AvroTypeInfo(recordClazz); } else { - return (TypeInformation) new GenericRecordAvroTypeInfo(this.reader); + return (TypeInformation) + new GenericRecordAvroTypeInfo( + Preconditions.checkNotNull( + this.reader, + "Reader schema is required to derive TypeInformation. " + + "When using RegistryAvroDeserializationSchema, provide TypeInformation explicitly.")); } } diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java index 0310582f6cae2..0e2366f8ced9d 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java @@ -100,7 +100,7 @@ public T deserialize(@Nullable byte[] message) throws IOException { GenericDatumReader datumReader = getDatumReader(); datumReader.setSchema(writerSchema); - datumReader.setExpected(readerSchema); + datumReader.setExpected(readerSchema != null ? readerSchema : writerSchema); if (getEncoding() == AvroEncoding.JSON) { ((JsonDecoder) getDecoder()).configure(getInputStream());