diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java index 2b3e47116c..ef9a386314 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java @@ -773,7 +773,7 @@ private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) { return new NullIntIterator(); } return new RLEIntIterator(new RunLengthBitPackingHybridDecoder( - BytesUtils.getWidthFromMaxInt(maxLevel), bytes.toInputStream())); + BytesUtils.getWidthFromMaxInt(maxLevel), bytes.toByteBuffer())); } catch (IOException e) { throw new ParquetDecodingException("could not read levels in page for col " + path, e); } @@ -832,11 +832,7 @@ public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) { @Override int nextInt() { - try { - return delegate.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return delegate.readInt(); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java index 53fafc55dc..77e3784392 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java @@ -19,6 +19,7 @@ package org.apache.parquet.column.values.dictionary; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.Dictionary; @@ -52,12 +53,13 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in); LOG.debug("bit width {}", bitWidth); - decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); + ByteBuffer buf = in.slice(in.available()); + decoder = new RunLengthBitPackingHybridDecoder(bitWidth, buf); } else { - decoder = new RunLengthBitPackingHybridDecoder(1, in) { + decoder = new RunLengthBitPackingHybridDecoder(1, ByteBuffer.allocate(0)) { @Override - public int readInt() throws IOException { - throw new IOException("Attempt to read from empty page"); + public int readInt() { + throw new ParquetDecodingException("Attempt to read from empty page"); } }; } @@ -65,64 +67,36 @@ public int readInt() throws IOException { @Override public int readValueDictionaryId() { - try { - return decoder.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return decoder.readInt(); } @Override public Binary readBytes() { - try { - return dictionary.decodeToBinary(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToBinary(decoder.readInt()); } @Override public float readFloat() { - try { - return dictionary.decodeToFloat(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToFloat(decoder.readInt()); } @Override public double readDouble() { - try { - return dictionary.decodeToDouble(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToDouble(decoder.readInt()); } @Override public int readInteger() { - try { - return dictionary.decodeToInt(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToInt(decoder.readInt()); } @Override public long readLong() { - try { - return dictionary.decodeToLong(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToLong(decoder.readInt()); } @Override public void skip() { - try { - decoder.readInt(); // Type does not matter as we are just skipping dictionary keys - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + decoder.readInt(); // Type does not matter as we are just skipping dictionary keys } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java index e55b276b29..407cd154e1 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java @@ -18,9 +18,8 @@ */ package org.apache.parquet.column.values.rle; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.bitpacking.BytePacker; @@ -42,23 +41,34 @@ private static enum MODE { private final int bitWidth; private final BytePacker packer; - private final InputStream in; + private final ByteBuffer buffer; private MODE mode; private int currentCount; private int currentValue; private int[] currentBuffer; + private int currentBufferLength; - public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) { + // Reusable buffers to avoid per-run allocation in PACKED mode + private int[] packedValuesBuffer = new int[0]; + private byte[] packedBytesBuffer = new byte[0]; + + public RunLengthBitPackingHybridDecoder(int bitWidth, ByteBuffer buffer) { LOG.debug("decoding bitWidth {}", bitWidth); Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); this.bitWidth = bitWidth; this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); - this.in = in; + this.buffer = buffer.order(ByteOrder.LITTLE_ENDIAN); } - public int readInt() throws IOException { + /** + * Reads the next int value from the RLE/Bit-Packing hybrid stream. + * + * @return the next decoded integer value + * @throws ParquetDecodingException if a decoding error occurs + */ + public int readInt() { if (currentCount == 0) { readNext(); } @@ -69,7 +79,7 @@ public int readInt() throws IOException { result = currentValue; break; case PACKED: - result = currentBuffer[currentBuffer.length - 1 - currentCount]; + result = currentBuffer[currentBufferLength - 1 - currentCount]; break; default: throw new ParquetDecodingException("not a valid mode " + mode); @@ -77,30 +87,45 @@ public int readInt() throws IOException { return result; } - private void readNext() throws IOException { - Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream."); - final int header = BytesUtils.readUnsignedVarInt(in); + private void readNext() { + Preconditions.checkArgument(buffer.hasRemaining(), "Reading past RLE/BitPacking stream."); + final int header = BytesUtils.readUnsignedVarInt(buffer); mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED; switch (mode) { case RLE: currentCount = header >>> 1; LOG.debug("reading {} values RLE", currentCount); - currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(in, bitWidth); + currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(buffer, bitWidth); break; case PACKED: int numGroups = header >>> 1; currentCount = numGroups * 8; + currentBufferLength = currentCount; LOG.debug("reading {} values BIT PACKED", currentCount); - currentBuffer = new int[currentCount]; // TODO: reuse a buffer - byte[] bytes = new byte[numGroups * bitWidth]; + if (packedValuesBuffer.length < currentCount) { + packedValuesBuffer = new int[currentCount]; + } + currentBuffer = packedValuesBuffer; + int bytesRequired = numGroups * bitWidth; + if (packedBytesBuffer.length < bytesRequired) { + packedBytesBuffer = new byte[bytesRequired]; + } // At the end of the file RLE data though, there might not be that many bytes left. int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0); - bytesToRead = Math.min(bytesToRead, in.available()); - new DataInputStream(in).readFully(bytes, 0, bytesToRead); - for (int valueIndex = 0, byteIndex = 0; - valueIndex < currentCount; - valueIndex += 8, byteIndex += bitWidth) { - packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex); + bytesToRead = Math.min(bytesToRead, buffer.remaining()); + buffer.get(packedBytesBuffer, 0, bytesToRead); + // Unpack 32 values (4 groups) at a time when possible — symmetric to the encoder's + // pack32Values fast path. Falls back to unpack8Values for any residual groups. + int groupIdx = 0; + int byteIndex = 0; + final int step32 = bitWidth * 4; + while (groupIdx + 4 <= numGroups) { + packer.unpack32Values(packedBytesBuffer, byteIndex, currentBuffer, groupIdx * 8); + groupIdx += 4; + byteIndex += step32; + } + for (; groupIdx < numGroups; groupIdx++, byteIndex += bitWidth) { + packer.unpack8Values(packedBytesBuffer, byteIndex, currentBuffer, groupIdx * 8); } break; default: diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java index e33824bff1..7dfeb28088 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java @@ -74,6 +74,11 @@ public class RunLengthBitPackingHybridEncoder implements AutoCloseable { */ private final byte[] packBuffer; + /** + * Buffer four 8-value groups so we can use the packer's 32-value fast path. + */ + private final int[] bitPackedValuesBuffer; + /** * Previous value written, used to detect repeated values */ @@ -98,6 +103,8 @@ public class RunLengthBitPackingHybridEncoder implements AutoCloseable { */ private int bitPackedGroupCount; + private int numBitPackedValues; + /** * A "pointer" to a single byte in baos, * which we use as our bit-packed-header. It's really @@ -125,7 +132,8 @@ public RunLengthBitPackingHybridEncoder( this.bitWidth = bitWidth; this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator); - this.packBuffer = new byte[bitWidth]; + this.packBuffer = new byte[bitWidth * 4]; + this.bitPackedValuesBuffer = new int[32]; this.bufferedValues = new int[8]; this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); reset(false); @@ -139,6 +147,7 @@ private void reset(boolean resetBaos) { this.numBufferedValues = 0; this.repeatCount = 0; this.bitPackedGroupCount = 0; + this.numBitPackedValues = 0; this.bitPackedRunHeaderPointer = -1; this.toBytesCalled = false; } @@ -196,8 +205,9 @@ private void writeOrAppendBitPackedRun() throws IOException { bitPackedRunHeaderPointer = baos.getCurrentIndex(); } - packer.pack8Values(bufferedValues, 0, packBuffer, 0); - baos.write(packBuffer); + System.arraycopy(bufferedValues, 0, bitPackedValuesBuffer, numBitPackedValues, 8); + numBitPackedValues += 8; + flushBitPackedValuesIfFull(); // empty the buffer, they've all been written numBufferedValues = 0; @@ -209,6 +219,34 @@ private void writeOrAppendBitPackedRun() throws IOException { ++bitPackedGroupCount; } + private void flushBitPackedValuesIfFull() { + if (numBitPackedValues == bitPackedValuesBuffer.length) { + packer.pack32Values(bitPackedValuesBuffer, 0, packBuffer, 0); + baos.write(packBuffer, 0, bitWidth * 4); + numBitPackedValues = 0; + } + } + + private void flushBitPackedValues() { + if (numBitPackedValues == 0) { + return; + } + + if (numBitPackedValues == bitPackedValuesBuffer.length) { + packer.pack32Values(bitPackedValuesBuffer, 0, packBuffer, 0); + baos.write(packBuffer, 0, bitWidth * 4); + } else { + int outPos = 0; + for (int inPos = 0; inPos < numBitPackedValues; inPos += 8) { + packer.pack8Values(bitPackedValuesBuffer, inPos, packBuffer, outPos); + outPos += bitWidth; + } + baos.write(packBuffer, 0, outPos); + } + + numBitPackedValues = 0; + } + /** * If we are currently writing a bit-packed-run, update the * bit-packed-header and consider this run to be over @@ -221,6 +259,8 @@ private void endPreviousBitPackedRun() { return; } + flushBitPackedValues(); + // create bit-packed-header, which needs to fit in 1 byte byte bitPackHeader = (byte) ((bitPackedGroupCount << 1) | 1); diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java index 0bd5a18d2b..8050662f14 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java @@ -19,10 +19,10 @@ package org.apache.parquet.column.values.rle; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.ParquetDecodingException; /** * This ValuesReader does all the reading in {@link #initFromPage} @@ -39,7 +39,8 @@ public RunLengthBitPackingHybridValuesReader(int bitWidth) { @Override public void initFromPage(int valueCountL, ByteBufferInputStream stream) throws IOException { int length = BytesUtils.readIntLittleEndian(stream); - this.decoder = new RunLengthBitPackingHybridDecoder(bitWidth, stream.sliceStream(length)); + ByteBuffer buf = stream.slice(length); + this.decoder = new RunLengthBitPackingHybridDecoder(bitWidth, buf); // 4 is for the length which is stored as 4 bytes little endian updateNextOffset(length + 4); @@ -47,11 +48,7 @@ public void initFromPage(int valueCountL, ByteBufferInputStream stream) throws I @Override public int readInteger() { - try { - return decoder.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return decoder.readInt(); } @Override diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java index 01a4c96e85..b06672cee9 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import java.nio.ByteBuffer; -import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.junit.Test; @@ -57,9 +56,8 @@ private void doIntegrationTest(int bitWidth) throws Exception { encoder.writeInt((int) (17 % modValue)); } ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer(); - ByteBufferInputStream in = ByteBufferInputStream.wrap(encodedBytes); - RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); + RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, encodedBytes); for (int i = 0; i < 100; i++) { assertEquals(i % modValue, decoder.readInt()); diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java index 93a6c8deb4..b38ea671e0 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.parquet.bytes.BytesUtils; @@ -290,12 +291,12 @@ public void testGroupBoundary() throws Exception { // bit width 2. bytes[0] = (1 << 1) | 1; bytes[1] = (1 << 0) | (2 << 2) | (3 << 4); - ByteArrayInputStream stream = new ByteArrayInputStream(bytes); - RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, stream); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, buffer); assertEquals(decoder.readInt(), 1); assertEquals(decoder.readInt(), 2); assertEquals(decoder.readInt(), 3); - assertEquals(stream.available(), 0); + assertEquals(buffer.remaining(), 0); } private static List unpack(int bitWidth, int numValues, ByteArrayInputStream is) throws Exception { diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java index b8373a898d..b791eb3f32 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java @@ -141,6 +141,33 @@ public static int readIntLittleEndianPaddedOnBitWidth(InputStream in, int bitWid } } + /** + * Reads a little-endian int padded to the byte count for the given bit width from a ByteBuffer. + * The buffer must be in {@link java.nio.ByteOrder#LITTLE_ENDIAN} order for 2-byte and 4-byte reads. + * + * @param in a ByteBuffer in LITTLE_ENDIAN order + * @param bitWidth the bit width determining how many bytes to read + * @return the value read + */ + public static int readIntLittleEndianPaddedOnBitWidth(ByteBuffer in, int bitWidth) { + int bytesWidth = paddedByteCountFromBits(bitWidth); + switch (bytesWidth) { + case 0: + return 0; + case 1: + return in.get() & 0xFF; + case 2: + return in.getShort() & 0xFFFF; + case 3: + return (in.get() & 0xFF) | ((in.get() & 0xFF) << 8) | ((in.get() & 0xFF) << 16); + case 4: + return in.getInt(); + default: + throw new IllegalArgumentException( + String.format("Encountered bitWidth (%d) that requires more than 4 bytes", bitWidth)); + } + } + public static void writeIntLittleEndianOnOneByte(OutputStream out, int v) throws IOException { out.write((v >>> 0) & 0xFF); } @@ -210,6 +237,23 @@ public static int readUnsignedVarInt(InputStream in) throws IOException { return value | (b << i); } + /** + * Reads an unsigned variable-length integer (varint) from a ByteBuffer. + * + * @param in a ByteBuffer + * @return the unsigned varint value + */ + public static int readUnsignedVarInt(ByteBuffer in) { + int value = 0; + int i = 0; + int b; + while (((b = in.get() & 0xFF) & 0x80) != 0) { + value |= (b & 0x7F) << i; + i += 7; + } + return value | (b << i); + } + /** * uses a trick mentioned in https://developers.google.com/protocol-buffers/docs/encoding to read zigZag encoded data *