
Reading pages with large statistics with an IndexedPageReader fails #198

@tjwilson90

Description

IndexedPageReader puts a hard limit of 1 MB on the size of the page headers it can deserialize: https://github.com/jorgecarleitao/parquet2/blob/main/src/read/page/indexed_reader.rs#L63

If a page contains a value larger than 512 KB and is written out with statistics, the page header will exceed 1 MB, since the min and max statistics each carry a copy of the value.
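
Back-of-the-envelope arithmetic behind the 512 KB threshold (a sketch, assuming the min and max statistics each embed the full value and ignoring Thrift framing overhead):

fn main() {
    // Hard cap used by IndexedPageReader (see the link above).
    let header_limit: usize = 1024 * 1024;
    // The repro below writes a single value of roughly 3 MB.
    let value_len = "foo".repeat(1_000_000).len();
    // With one row, min == max == the value, so the statistics alone are twice the value's size.
    let stats_len = 2 * value_len;
    // The page header cannot fit under the 1 MB cap.
    assert!(stats_len > header_limit);
}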

This is not a problem with an unfiltered PageReader, whose limit scales with the size of the page: https://github.com/jorgecarleitao/arrow2/blob/main/src/io/parquet/read/row_group.rs#L240
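
Concretely, and if I read the arrow2 code correctly, the same data reads back fine when the page-index path is skipped, i.e. by passing None instead of the filtered pages (a sketch reusing the bindings from the repro below):

let mut iter = read::to_deserializer(
    columns,
    field,
    target_group.num_rows(),
    None, // chunk size
    None, // no filtered pages -> plain PageReader, whose header limit scales with the page size
)?;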

This would not be much of an issue if statistics could be disabled per field, but parquet2 can only write statistics for all pages or for none of them: https://github.com/jorgecarleitao/parquet2/blob/main/src/write/indexes/serialize.rs#L47
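
As far as I can tell, the only workaround today is the global flag, which drops statistics for every column (the same WriteOptions as in the repro, with the flag flipped):

let options = WriteOptions {
    write_statistics: false, // disables statistics for all fields, not just the one with oversized values
    compression: CompressionOptions::Uncompressed,
    version: Version::V2,
};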

Repro

use arrow2::array::Utf8Array;
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::io::parquet::read;
use arrow2::io::parquet::write::{
    transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
};
use parquet2::indexes;
use parquet2::indexes::Interval;
use std::error::Error;
use std::fs::File;

#[test]
fn write_large_statistics() -> Result<(), Box<dyn Error>> {
    // A single row whose value is ~3 MB ("foo" repeated 1,000,000 times).
    let array = Utf8Array::<i32>::from_slice(["foo".repeat(1_000_000)]);
    let field = Field::new("strings", DataType::Utf8, false);
    let schema = Schema::from(vec![field.clone()]);
    let chunk = Chunk::new(vec![array.boxed()]);

    // Statistics are enabled globally; WriteOptions has no per-field control.
    let options = WriteOptions {
        write_statistics: true,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
    };

    let iter = vec![Ok(chunk)];

    let encodings = schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
        .collect();

    let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?;

    let path = "large_statistics.parquet";
    let mut file = File::create(path)?;
    let mut writer = FileWriter::try_new(&mut file, schema, options)?;
    for group in row_groups {
        writer.write(group?)?;
    }
    writer.end(None)?;

    // Read the file back through the page-index path.
    let mut reader = File::open(path)?;
    let metadata = read::read_metadata(&mut reader)?;
    let target_group = &metadata.row_groups[0];
    let intervals = vec![Interval {
        start: 0,
        length: target_group.num_rows(),
    }];
    let locations = read::read_pages_locations(&mut reader, target_group.columns())?;
    let columns = read::read_columns(&mut reader, target_group.columns(), &field.name)?;
    let field_pages = read::get_field_pages(target_group.columns(), &locations, &field.name);
    let filtered_pages = field_pages
        .into_iter()
        .map(|field_page| indexes::select_pages(&intervals, field_page, target_group.num_rows()))
        .collect::<Result<Vec<_>, _>>()?;

    // Passing the filtered pages routes decoding through IndexedPageReader; this is where the failure occurs.
    let mut iter = read::to_deserializer(
        columns,
        field,
        target_group.num_rows(),
        None,
        Some(filtered_pages),
    )?;
    let array = iter.next().unwrap()?;
    println!("{:?}", array);
    Ok(())
}
