Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 155 additions & 95 deletions src/iceberg/avro/avro_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,149 @@ Result<std::unique_ptr<AvroInputStream>> CreateInputStream(const ReaderOptions&
return std::make_unique<AvroInputStream>(file, buffer_size);
}

} // namespace
// Abstract base class for Avro read backends.
class AvroReadBackend {
public:
virtual ~AvroReadBackend() = default;
virtual Result<::avro::ValidSchema> Init(
std::unique_ptr<AvroInputStream> input_stream) = 0;
virtual Status InitWithSchema(const ::avro::ValidSchema& file_schema,
const std::optional<Split>& split) = 0;
virtual void InitReadContext(const ::avro::ValidSchema& reader_schema) = 0;
virtual bool HasMore() = 0;
virtual Status DecodeNext(const SchemaProjection& projection, const Schema& read_schema,
::arrow::ArrayBuilder* builder) = 0;
virtual bool IsPastSync(int64_t split_end) const = 0;
virtual const ::avro::Metadata& GetMetadata() const = 0;
virtual const ::avro::ValidSchema& GetReaderSchema() const = 0;
virtual void Close() = 0;
virtual bool Closed() const = 0;
};

// Backend implementation using direct Avro decoder.
class DirectDecoderBackend : public AvroReadBackend {
public:
Result<::avro::ValidSchema> Init(
std::unique_ptr<AvroInputStream> input_stream) override {
reader_ = std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
return reader_->dataSchema();
}

Status InitWithSchema(const ::avro::ValidSchema& file_schema,
const std::optional<Split>& split) override {
reader_->init(file_schema);
if (split) {
reader_->sync(split->offset);
}
return {};
}

void InitReadContext(const ::avro::ValidSchema&) override {}

bool HasMore() override { return reader_->hasMore(); }

Status DecodeNext(const SchemaProjection& projection, const Schema& read_schema,
::arrow::ArrayBuilder* builder) override {
reader_->decr();
return DecodeAvroToBuilder(GetReaderSchema().root(), reader_->decoder(), projection,
read_schema, builder, decode_context_);
}

bool IsPastSync(int64_t split_end) const override {
return reader_->pastSync(split_end);
}

const ::avro::Metadata& GetMetadata() const override { return reader_->metadata(); }

const ::avro::ValidSchema& GetReaderSchema() const override {
return reader_->readerSchema();
}

void Close() override {
if (reader_) {
reader_->close();
reader_.reset();
}
}

bool Closed() const override { return reader_ == nullptr; }

private:
std::unique_ptr<::avro::DataFileReaderBase> reader_;
// Decode context for reusing scratch buffers
DecodeContext decode_context_;
};

// Backend implementation using avro::GenericDatum.
class GenericDatumBackend : public AvroReadBackend {
public:
Result<::avro::ValidSchema> Init(
std::unique_ptr<AvroInputStream> input_stream) override {
reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
std::move(input_stream));
return reader_->dataSchema();
}

Status InitWithSchema(const ::avro::ValidSchema& /*file_schema*/,
const std::optional<Split>& split) override {
if (split) {
reader_->sync(split->offset);
}
return {};
}

void InitReadContext(const ::avro::ValidSchema& reader_schema) override {
datum_ = std::make_unique<::avro::GenericDatum>(reader_schema);
}

bool HasMore() override {
has_more_ = reader_->read(*datum_);
return has_more_;
}

Status DecodeNext(const SchemaProjection& projection, const Schema& read_schema,
::arrow::ArrayBuilder* builder) override {
return AppendDatumToBuilder(GetReaderSchema().root(), *datum_, projection,
read_schema, builder);
}

bool IsPastSync(int64_t split_end) const override {
return reader_->pastSync(split_end);
}

const ::avro::Metadata& GetMetadata() const override { return reader_->metadata(); }

const ::avro::ValidSchema& GetReaderSchema() const override {
return reader_->readerSchema();
}

void Close() override {
if (reader_) {
reader_->close();
reader_.reset();
}
}

bool Closed() const override { return reader_ == nullptr; }

private:
std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_;
// Reusable GenericDatum for reading records
std::unique_ptr<::avro::GenericDatum> datum_;
// Cached result from HasMore()
bool has_more_ = false;
};

// A stateful context to keep track of the reading progress.
struct ReadContext {
// The arrow schema to build the record batch.
std::shared_ptr<::arrow::Schema> arrow_schema_;
// The builder to build the record batch.
std::shared_ptr<::arrow::ArrayBuilder> builder_;
// GenericDatum for GenericDatum-based decoding (only used if direct decoder is
// disabled)
std::unique_ptr<::avro::GenericDatum> datum_;
// Decode context for reusing scratch buffers (only used if direct decoder is
// enabled)
DecodeContext decode_context_;
};

} // namespace

// TODO(gang.wu): collect basic reader metrics
class AvroReader::Impl {
public:
Expand All @@ -85,7 +212,6 @@ class AvroReader::Impl {
}

batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
use_direct_decoder_ = options.properties->Get(ReaderProperties::kAvroSkipDatum);
read_schema_ = options.projection;

// Open the input stream and adapt to the avro interface.
Expand All @@ -94,22 +220,15 @@ class AvroReader::Impl {
CreateInputStream(options,
options.properties->Get(ReaderProperties::kAvroBufferSize)));

::avro::ValidSchema file_schema;

if (use_direct_decoder_) {
// Create base reader for direct decoder access
auto base_reader =
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
file_schema = base_reader->dataSchema();
base_reader_ = std::move(base_reader);
// Create the appropriate backend based on configuration
if (options.properties->Get(ReaderProperties::kAvroSkipDatum)) {
backend_ = std::make_unique<DirectDecoderBackend>();
} else {
// Create DataFileReader<GenericDatum> for GenericDatum-based decoding
auto datum_reader = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
std::move(input_stream));
file_schema = datum_reader->dataSchema();
datum_reader_ = std::move(datum_reader);
backend_ = std::make_unique<GenericDatumBackend>();
}

ICEBERG_ASSIGN_OR_RAISE(auto file_schema, backend_->Init(std::move(input_stream)));

// Validate field ids in the file schema.
HasIdVisitor has_id_visitor;
ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema));
Expand All @@ -132,23 +251,13 @@ class AvroReader::Impl {
}

// Project the read schema on top of the file schema.
// TODO(gangwu): support pruning source fields
ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_, file_schema.root(),
/*prune_source=*/false));

if (use_direct_decoder_) {
// Initialize the base reader with the file schema
base_reader_->init(file_schema);
if (options.split) {
base_reader_->sync(options.split->offset);
split_end_ = options.split->offset + options.split->length;
}
} else {
// The datum reader is already initialized during construction
if (options.split) {
datum_reader_->sync(options.split->offset);
split_end_ = options.split->offset + options.split->length;
}
ICEBERG_RETURN_UNEXPECTED(backend_->InitWithSchema(file_schema, options.split));

if (options.split) {
split_end_ = options.split->offset + options.split->length;
}

return {};
Expand All @@ -163,34 +272,18 @@ class AvroReader::Impl {
if (IsPastSync()) {
break;
}

if (use_direct_decoder_) {
// Direct decoder: decode Avro to Arrow without GenericDatum
if (!base_reader_->hasMore()) {
break;
}
base_reader_->decr();

ICEBERG_RETURN_UNEXPECTED(DecodeAvroToBuilder(
GetReaderSchema().root(), base_reader_->decoder(), projection_, *read_schema_,
context_->builder_.get(), context_->decode_context_));
} else {
// GenericDatum-based decoding: decode via GenericDatum intermediate
if (!datum_reader_->read(*context_->datum_)) {
break;
}

ICEBERG_RETURN_UNEXPECTED(
AppendDatumToBuilder(GetReaderSchema().root(), *context_->datum_, projection_,
*read_schema_, context_->builder_.get()));
if (!backend_->HasMore()) {
break;
}
ICEBERG_RETURN_UNEXPECTED(
backend_->DecodeNext(projection_, *read_schema_, context_->builder_.get()));
}

return ConvertBuilderToArrowArray();
}

Status Close() {
CloseReader();
backend_->Close();
context_.reset();
return {};
}
Expand All @@ -209,12 +302,11 @@ class AvroReader::Impl {
}

Result<std::unordered_map<std::string, std::string>> Metadata() {
if ((use_direct_decoder_ && !base_reader_) ||
(!use_direct_decoder_ && !datum_reader_)) {
if (backend_->Closed()) {
return Invalid("Reader is not opened");
}

const auto& metadata = GetReaderMetadata();
const auto& metadata = backend_->GetMetadata();
std::unordered_map<std::string, std::string> metadata_map;
metadata_map.reserve(metadata.size());

Expand Down Expand Up @@ -247,11 +339,7 @@ class AvroReader::Impl {
builder_result.status().message());
}
context_->builder_ = builder_result.MoveValueUnsafe();

// Initialize GenericDatum for GenericDatum-based decoding
if (!use_direct_decoder_) {
context_->datum_ = std::make_unique<::avro::GenericDatum>(GetReaderSchema());
}
backend_->InitReadContext(backend_->GetReaderSchema());

return {};
}
Expand Down Expand Up @@ -281,48 +369,20 @@ class AvroReader::Impl {
if (!split_end_) {
return false;
}
return use_direct_decoder_ ? base_reader_->pastSync(split_end_.value())
: datum_reader_->pastSync(split_end_.value());
}

const ::avro::Metadata& GetReaderMetadata() const {
return use_direct_decoder_ ? base_reader_->metadata() : datum_reader_->metadata();
}

void CloseReader() {
if (use_direct_decoder_) {
if (base_reader_) {
base_reader_->close();
base_reader_.reset();
}
} else {
if (datum_reader_) {
datum_reader_->close();
datum_reader_.reset();
}
}
}

const ::avro::ValidSchema& GetReaderSchema() const {
return use_direct_decoder_ ? base_reader_->readerSchema()
: datum_reader_->readerSchema();
return backend_->IsPastSync(split_end_.value());
}

private:
// Max number of rows in the record batch to read.
int64_t batch_size_{};
// Whether to use direct decoder (true) or GenericDatum-based decoder (false).
bool use_direct_decoder_{true};
// The end of the split to read and used to terminate the reading.
std::optional<int64_t> split_end_;
// The schema to read.
std::shared_ptr<::iceberg::Schema> read_schema_;
// The projection result to apply to the read schema.
SchemaProjection projection_;
// The avro reader base - provides direct access to decoder for direct decoding.
std::unique_ptr<::avro::DataFileReaderBase> base_reader_;
// The datum reader for GenericDatum-based decoding.
std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> datum_reader_;
// The read backend to read data into Arrow.
std::unique_ptr<AvroReadBackend> backend_;
// The context to keep track of the reading progress.
std::unique_ptr<ReadContext> context_;
};
Expand Down
Loading
Loading