From 3c49045c933f3f628e9ca1441779f2610352fbe1 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 30 Dec 2025 17:12:12 +0800 Subject: [PATCH] refactor(avro): use polymorphism for reader/writer backends Refactor AvroReader and AvroWriter to use the strategy pattern for handling different encoding/decoding backends. This replaces the previous if/else logic with dedicated backend implementations (Direct vs GenericDatum), improving code structure and maintainability. --- src/iceberg/avro/avro_reader.cc | 250 ++++++++++++++++++++------------ src/iceberg/avro/avro_writer.cc | 165 +++++++++++++-------- 2 files changed, 262 insertions(+), 153 deletions(-) diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc index 964f6d1d4..aff906257 100644 --- a/src/iceberg/avro/avro_reader.cc +++ b/src/iceberg/avro/avro_reader.cc @@ -59,7 +59,138 @@ Result> CreateInputStream(const ReaderOptions& return std::make_unique(file, buffer_size); } -} // namespace +// Abstract base class for Avro read backends. +class AvroReadBackend { + public: + virtual ~AvroReadBackend() = default; + virtual Result<::avro::ValidSchema> Init( + std::unique_ptr input_stream) = 0; + virtual Status InitWithSchema(const ::avro::ValidSchema& file_schema, + const std::optional& split) = 0; + virtual void InitReadContext(const ::avro::ValidSchema& reader_schema) = 0; + virtual bool HasMore() = 0; + virtual Status DecodeNext(const SchemaProjection& projection, const Schema& read_schema, + ::arrow::ArrayBuilder* builder) = 0; + virtual bool IsPastSync(int64_t split_end) const = 0; + virtual const ::avro::Metadata& GetMetadata() const = 0; + virtual const ::avro::ValidSchema& GetReaderSchema() const = 0; + virtual void Close() = 0; + virtual bool Closed() const = 0; +}; + +// Backend implementation using direct Avro decoder. +class DirectDecoderBackend : public AvroReadBackend { + public: + Result<::avro::ValidSchema> Init( + std::unique_ptr input_stream) override { + reader_ = std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream)); + return reader_->dataSchema(); + } + + Status InitWithSchema(const ::avro::ValidSchema& file_schema, + const std::optional& split) override { + reader_->init(file_schema); + if (split) { + reader_->sync(split->offset); + } + return {}; + } + + void InitReadContext(const ::avro::ValidSchema&) override {} + + bool HasMore() override { return reader_->hasMore(); } + + Status DecodeNext(const SchemaProjection& projection, const Schema& read_schema, + ::arrow::ArrayBuilder* builder) override { + reader_->decr(); + return DecodeAvroToBuilder(GetReaderSchema().root(), reader_->decoder(), projection, + read_schema, builder, decode_context_); + } + + bool IsPastSync(int64_t split_end) const override { + return reader_->pastSync(split_end); + } + + const ::avro::Metadata& GetMetadata() const override { return reader_->metadata(); } + + const ::avro::ValidSchema& GetReaderSchema() const override { + return reader_->readerSchema(); + } + + void Close() override { + if (reader_) { + reader_->close(); + reader_.reset(); + } + } + + bool Closed() const override { return reader_ == nullptr; } + + private: + std::unique_ptr<::avro::DataFileReaderBase> reader_; + // Decode context for reusing scratch buffers + DecodeContext decode_context_; +}; + +// Backend implementation using avro::GenericDatum. +class GenericDatumBackend : public AvroReadBackend { + public: + Result<::avro::ValidSchema> Init( + std::unique_ptr input_stream) override { + reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>( + std::move(input_stream)); + return reader_->dataSchema(); + } + + Status InitWithSchema(const ::avro::ValidSchema& /*file_schema*/, + const std::optional& split) override { + if (split) { + reader_->sync(split->offset); + } + return {}; + } + + void InitReadContext(const ::avro::ValidSchema& reader_schema) override { + datum_ = std::make_unique<::avro::GenericDatum>(reader_schema); + } + + bool HasMore() override { + has_more_ = reader_->read(*datum_); + return has_more_; + } + + Status DecodeNext(const SchemaProjection& projection, const Schema& read_schema, + ::arrow::ArrayBuilder* builder) override { + return AppendDatumToBuilder(GetReaderSchema().root(), *datum_, projection, + read_schema, builder); + } + + bool IsPastSync(int64_t split_end) const override { + return reader_->pastSync(split_end); + } + + const ::avro::Metadata& GetMetadata() const override { return reader_->metadata(); } + + const ::avro::ValidSchema& GetReaderSchema() const override { + return reader_->readerSchema(); + } + + void Close() override { + if (reader_) { + reader_->close(); + reader_.reset(); + } + } + + bool Closed() const override { return reader_ == nullptr; } + + private: + std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_; + // Reusable GenericDatum for reading records + std::unique_ptr<::avro::GenericDatum> datum_; + // Cached result from HasMore() + bool has_more_ = false; +}; // A stateful context to keep track of the reading progress. struct ReadContext { @@ -67,14 +198,10 @@ struct ReadContext { std::shared_ptr<::arrow::Schema> arrow_schema_; // The builder to build the record batch. std::shared_ptr<::arrow::ArrayBuilder> builder_; - // GenericDatum for GenericDatum-based decoding (only used if direct decoder is - // disabled) - std::unique_ptr<::avro::GenericDatum> datum_; - // Decode context for reusing scratch buffers (only used if direct decoder is - // enabled) - DecodeContext decode_context_; }; +} // namespace + // TODO(gang.wu): collect basic reader metrics class AvroReader::Impl { public: @@ -85,7 +212,6 @@ class AvroReader::Impl { } batch_size_ = options.properties->Get(ReaderProperties::kBatchSize); - use_direct_decoder_ = options.properties->Get(ReaderProperties::kAvroSkipDatum); read_schema_ = options.projection; // Open the input stream and adapt to the avro interface. @@ -94,22 +220,15 @@ class AvroReader::Impl { CreateInputStream(options, options.properties->Get(ReaderProperties::kAvroBufferSize))); - ::avro::ValidSchema file_schema; - - if (use_direct_decoder_) { - // Create base reader for direct decoder access - auto base_reader = - std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream)); - file_schema = base_reader->dataSchema(); - base_reader_ = std::move(base_reader); + // Create the appropriate backend based on configuration + if (options.properties->Get(ReaderProperties::kAvroSkipDatum)) { + backend_ = std::make_unique(); } else { - // Create DataFileReader for GenericDatum-based decoding - auto datum_reader = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>( - std::move(input_stream)); - file_schema = datum_reader->dataSchema(); - datum_reader_ = std::move(datum_reader); + backend_ = std::make_unique(); } + ICEBERG_ASSIGN_OR_RAISE(auto file_schema, backend_->Init(std::move(input_stream))); + // Validate field ids in the file schema. HasIdVisitor has_id_visitor; ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema)); @@ -132,23 +251,13 @@ class AvroReader::Impl { } // Project the read schema on top of the file schema. - // TODO(gangwu): support pruning source fields ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_, file_schema.root(), /*prune_source=*/false)); - if (use_direct_decoder_) { - // Initialize the base reader with the file schema - base_reader_->init(file_schema); - if (options.split) { - base_reader_->sync(options.split->offset); - split_end_ = options.split->offset + options.split->length; - } - } else { - // The datum reader is already initialized during construction - if (options.split) { - datum_reader_->sync(options.split->offset); - split_end_ = options.split->offset + options.split->length; - } + ICEBERG_RETURN_UNEXPECTED(backend_->InitWithSchema(file_schema, options.split)); + + if (options.split) { + split_end_ = options.split->offset + options.split->length; } return {}; @@ -163,34 +272,18 @@ class AvroReader::Impl { if (IsPastSync()) { break; } - - if (use_direct_decoder_) { - // Direct decoder: decode Avro to Arrow without GenericDatum - if (!base_reader_->hasMore()) { - break; - } - base_reader_->decr(); - - ICEBERG_RETURN_UNEXPECTED(DecodeAvroToBuilder( - GetReaderSchema().root(), base_reader_->decoder(), projection_, *read_schema_, - context_->builder_.get(), context_->decode_context_)); - } else { - // GenericDatum-based decoding: decode via GenericDatum intermediate - if (!datum_reader_->read(*context_->datum_)) { - break; - } - - ICEBERG_RETURN_UNEXPECTED( - AppendDatumToBuilder(GetReaderSchema().root(), *context_->datum_, projection_, - *read_schema_, context_->builder_.get())); + if (!backend_->HasMore()) { + break; } + ICEBERG_RETURN_UNEXPECTED( + backend_->DecodeNext(projection_, *read_schema_, context_->builder_.get())); } return ConvertBuilderToArrowArray(); } Status Close() { - CloseReader(); + backend_->Close(); context_.reset(); return {}; } @@ -209,12 +302,11 @@ class AvroReader::Impl { } Result> Metadata() { - if ((use_direct_decoder_ && !base_reader_) || - (!use_direct_decoder_ && !datum_reader_)) { + if (backend_->Closed()) { return Invalid("Reader is not opened"); } - const auto& metadata = GetReaderMetadata(); + const auto& metadata = backend_->GetMetadata(); std::unordered_map metadata_map; metadata_map.reserve(metadata.size()); @@ -247,11 +339,7 @@ class AvroReader::Impl { builder_result.status().message()); } context_->builder_ = builder_result.MoveValueUnsafe(); - - // Initialize GenericDatum for GenericDatum-based decoding - if (!use_direct_decoder_) { - context_->datum_ = std::make_unique<::avro::GenericDatum>(GetReaderSchema()); - } + backend_->InitReadContext(backend_->GetReaderSchema()); return {}; } @@ -281,48 +369,20 @@ class AvroReader::Impl { if (!split_end_) { return false; } - return use_direct_decoder_ ? base_reader_->pastSync(split_end_.value()) - : datum_reader_->pastSync(split_end_.value()); - } - - const ::avro::Metadata& GetReaderMetadata() const { - return use_direct_decoder_ ? base_reader_->metadata() : datum_reader_->metadata(); - } - - void CloseReader() { - if (use_direct_decoder_) { - if (base_reader_) { - base_reader_->close(); - base_reader_.reset(); - } - } else { - if (datum_reader_) { - datum_reader_->close(); - datum_reader_.reset(); - } - } - } - - const ::avro::ValidSchema& GetReaderSchema() const { - return use_direct_decoder_ ? base_reader_->readerSchema() - : datum_reader_->readerSchema(); + return backend_->IsPastSync(split_end_.value()); } private: // Max number of rows in the record batch to read. int64_t batch_size_{}; - // Whether to use direct decoder (true) or GenericDatum-based decoder (false). - bool use_direct_decoder_{true}; // The end of the split to read and used to terminate the reading. std::optional split_end_; // The schema to read. std::shared_ptr<::iceberg::Schema> read_schema_; // The projection result to apply to the read schema. SchemaProjection projection_; - // The avro reader base - provides direct access to decoder for direct decoding. - std::unique_ptr<::avro::DataFileReaderBase> base_reader_; - // The datum reader for GenericDatum-based decoding. - std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> datum_reader_; + // The read backend to read data into Arrow. + std::unique_ptr backend_; // The context to keep track of the reading progress. std::unique_ptr context_; }; diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index 0c640231b..b426a756c 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -52,6 +52,95 @@ Result> CreateOutputStream(const WriterOptions return std::make_unique(output, buffer_size); } +// Abstract base class for Avro write backends. +class AvroWriteBackend { + public: + virtual ~AvroWriteBackend() = default; + virtual Status Init(std::unique_ptr output_stream, + const ::avro::ValidSchema& avro_schema, int64_t sync_interval, + const std::map>& metadata) = 0; + virtual Status WriteRow(const Schema& write_schema, const ::arrow::Array& array, + int64_t row_index) = 0; + virtual void Close() = 0; + virtual bool Closed() const = 0; +}; + +// Backend implementation using direct Avro encoder. +class DirectEncoderBackend : public AvroWriteBackend { + public: + Status Init(std::unique_ptr output_stream, + const ::avro::ValidSchema& avro_schema, int64_t sync_interval, + const std::map>& metadata) override { + writer_ = std::make_unique<::avro::DataFileWriterBase>(std::move(output_stream), + avro_schema, sync_interval, + ::avro::NULL_CODEC, metadata); + avro_root_node_ = avro_schema.root(); + return {}; + } + + Status WriteRow(const Schema& write_schema, const ::arrow::Array& array, + int64_t row_index) override { + ICEBERG_RETURN_UNEXPECTED(EncodeArrowToAvro(avro_root_node_, writer_->encoder(), + write_schema, array, row_index, + encode_ctx_)); + writer_->incr(); + return {}; + } + + void Close() override { + if (writer_) { + writer_->close(); + writer_.reset(); + } + } + + bool Closed() const override { return writer_ == nullptr; } + + private: + // Root node of the Avro schema + ::avro::NodePtr avro_root_node_; + // The avro writer using direct encoder + std::unique_ptr<::avro::DataFileWriterBase> writer_; + // Encode context for reusing scratch buffers + EncodeContext encode_ctx_; +}; + +// Backend implementation using avro::GenericDatum as the intermediate representation. +class GenericDatumBackend : public AvroWriteBackend { + public: + Status Init(std::unique_ptr output_stream, + const ::avro::ValidSchema& avro_schema, int64_t sync_interval, + const std::map>& metadata) override { + writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>( + std::move(output_stream), avro_schema, sync_interval, ::avro::NULL_CODEC, + metadata); + datum_ = std::make_unique<::avro::GenericDatum>(avro_schema); + return {}; + } + + Status WriteRow(const Schema& /*write_schema*/, const ::arrow::Array& array, + int64_t row_index) override { + ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(array, row_index, datum_.get())); + writer_->write(*datum_); + return {}; + } + + void Close() override { + if (writer_) { + writer_->close(); + writer_.reset(); + } + } + + bool Closed() const override { return writer_ == nullptr; } + + private: + // The avro writer to write the data into a datum + std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_; + // Reusable Avro datum for writing individual records + std::unique_ptr<::avro::GenericDatum> datum_; +}; + } // namespace class AvroWriter::Impl { @@ -64,7 +153,6 @@ class AvroWriter::Impl { Status Open(const WriterOptions& options) { write_schema_ = options.schema; - use_direct_encoder_ = options.properties->Get(WriterProperties::kAvroSkipDatum); ::avro::NodePtr root; ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root)); @@ -82,6 +170,7 @@ class AvroWriter::Impl { CreateOutputStream(options, options.properties->Get(WriterProperties::kAvroBufferSize))); arrow_output_stream_ = output_stream->arrow_output_stream(); + std::map> metadata; for (const auto& [key, value] : options.metadata) { std::vector vec; @@ -90,22 +179,17 @@ class AvroWriter::Impl { metadata.emplace(key, std::move(vec)); } - if (use_direct_encoder_) { - // Skip avro::GenericDatum by using encoder provided by DataFileWriterBase. - writer_base_ = std::make_unique<::avro::DataFileWriterBase>( - std::move(output_stream), *avro_schema_, - options.properties->Get(WriterProperties::kAvroSyncInterval), - ::avro::NULL_CODEC /*codec*/, metadata); - avro_root_node_ = avro_schema_->root(); + // Create the appropriate backend based on configuration + if (options.properties->Get(WriterProperties::kAvroSkipDatum)) { + backend_ = std::make_unique(); } else { - // Everything via avro::GenericDatum. - writer_datum_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>( - std::move(output_stream), *avro_schema_, - options.properties->Get(WriterProperties::kAvroSyncInterval), - ::avro::NULL_CODEC /*codec*/, metadata); - datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_); + backend_ = std::make_unique(); } + ICEBERG_RETURN_UNEXPECTED(backend_->Init( + std::move(output_stream), *avro_schema_, + options.properties->Get(WriterProperties::kAvroSyncInterval), metadata)); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_)); return {}; } @@ -114,45 +198,23 @@ class AvroWriter::Impl { ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result, ::arrow::ImportArray(data, &arrow_schema_)); - if (use_direct_encoder_) { - for (int64_t i = 0; i < result->length(); i++) { - ICEBERG_RETURN_UNEXPECTED( - EncodeArrowToAvro(avro_root_node_, writer_base_->encoder(), *write_schema_, - *result, i, encode_ctx_)); - writer_base_->incr(); - } - } else { - for (int64_t i = 0; i < result->length(); i++) { - ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i, datum_.get())); - writer_datum_->write(*datum_); - } + for (int64_t i = 0; i < result->length(); i++) { + ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*write_schema_, *result, i)); } return {}; } Status Close() { - if (use_direct_encoder_) { - if (writer_base_ != nullptr) { - writer_base_->close(); - writer_base_.reset(); - ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, arrow_output_stream_->Tell()); - ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close()); - } - } else { - if (writer_datum_ != nullptr) { - writer_datum_->close(); - writer_datum_.reset(); - ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, arrow_output_stream_->Tell()); - ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close()); - } + if (!backend_->Closed()) { + backend_->Close(); + ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, arrow_output_stream_->Tell()); + ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close()); } return {}; } - bool Closed() const { - return use_direct_encoder_ ? writer_base_ == nullptr : writer_datum_ == nullptr; - } + bool Closed() const { return backend_->Closed(); } Result length() { if (Closed()) { @@ -174,21 +236,8 @@ class AvroWriter::Impl { ArrowSchema arrow_schema_; // Total length of the written Avro file. int64_t total_bytes_ = 0; - - // Flag to determine which encoder to use - bool use_direct_encoder_ = true; - - // [Encoder path] Root node of the Avro schema - ::avro::NodePtr avro_root_node_; - // [Encoder path] The avro writer using direct encoder - std::unique_ptr<::avro::DataFileWriterBase> writer_base_; - // [Encoder path] Encode context for reusing scratch buffers - EncodeContext encode_ctx_; - - // [GenericDatum path] The avro writer to write the data into a datum - std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_datum_; - // [GenericDatum path] Reusable Avro datum for writing individual records - std::unique_ptr<::avro::GenericDatum> datum_; + // The write backend to write data. + std::unique_ptr backend_; }; AvroWriter::~AvroWriter() = default;