Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion api/src/main/java/org/apache/iceberg/util/ByteBuffers.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

public class ByteBuffers {

private ByteBuffers() {}

public static byte[] toByteArray(ByteBuffer buffer) {
if (buffer == null) {
return null;
Expand Down Expand Up @@ -73,5 +75,64 @@ public static ByteBuffer copy(ByteBuffer buffer) {
return ByteBuffer.wrap(copyArray);
}

private ByteBuffers() {}
public static void writeByte(ByteBuffer buffer, int value, int offset) {
buffer.put(buffer.position() + offset, (byte) (value & 0xFF));
}

public static void writeLittleEndianUnsigned(ByteBuffer buffer, int value, int offset, int size) {
int base = buffer.position() + offset;
switch (size) {
case 4:
buffer.putInt(base, value);
return;
case 3:
buffer.putShort(base, (short) (value & 0xFFFF));
buffer.put(base + 2, (byte) ((value >> 16) & 0xFF));
return;
case 2:
buffer.putShort(base, (short) (value & 0xFFFF));
return;
case 1:
buffer.put(base, (byte) (value & 0xFF));
return;
}

throw new IllegalArgumentException("Invalid size: " + size);
}

public static byte readLittleEndianInt8(ByteBuffer buffer, int offset) {
return buffer.get(buffer.position() + offset);
}

public static short readLittleEndianInt16(ByteBuffer buffer, int offset) {
return buffer.getShort(buffer.position() + offset);
}

public static int readByte(ByteBuffer buffer, int offset) {
return buffer.get(buffer.position() + offset) & 0xFF;
}

public static int readLittleEndianUnsigned(ByteBuffer buffer, int offset, int size) {
int base = buffer.position() + offset;
switch (size) {
case 4:
return buffer.getInt(base);
case 3:
return (((int) buffer.getShort(base)) & 0xFFFF) | ((buffer.get(base + 2) & 0xFF) << 16);
case 2:
return ((int) buffer.getShort(base)) & 0xFFFF;
case 1:
return buffer.get(base) & 0xFF;
}

throw new IllegalArgumentException("Invalid size: " + size);
}

public static int readLittleEndianInt32(ByteBuffer buffer, int offset) {
return buffer.getInt(buffer.position() + offset);
}

public static long readLittleEndianInt64(ByteBuffer buffer, int offset) {
return buffer.getLong(buffer.position() + offset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteOrder;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.ByteBuffers;

class SerializedArray implements VariantArray, SerializedValue {
private static final int HEADER_SIZE = 1;
Expand Down Expand Up @@ -55,7 +56,7 @@ private SerializedArray(VariantMetadata metadata, ByteBuffer value, int header)
this.value = value;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1;
int numElements = VariantUtil.readLittleEndianUnsigned(value, HEADER_SIZE, numElementsSize);
int numElements = ByteBuffers.readLittleEndianUnsigned(value, HEADER_SIZE, numElementsSize);
this.offsetListOffset = HEADER_SIZE + numElementsSize;
this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
this.array = new VariantValue[numElements];
Expand All @@ -70,10 +71,10 @@ public int numElements() {
public VariantValue get(int index) {
if (null == array[index]) {
int offset =
VariantUtil.readLittleEndianUnsigned(
ByteBuffers.readLittleEndianUnsigned(
value, offsetListOffset + (offsetSize * index), offsetSize);
int next =
VariantUtil.readLittleEndianUnsigned(
ByteBuffers.readLittleEndianUnsigned(
value, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
array[index] =
VariantValue.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteOrder;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.ByteBuffers;

class SerializedMetadata implements VariantMetadata, Serialized {
private static final int HEADER_SIZE = 1;
Expand All @@ -42,7 +43,7 @@ static SerializedMetadata from(byte[] bytes) {
static SerializedMetadata from(ByteBuffer metadata) {
Preconditions.checkArgument(
metadata.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
int header = VariantUtil.readByte(metadata, 0);
int header = ByteBuffers.readByte(metadata, 0);
int version = header & VERSION_MASK;
Preconditions.checkArgument(SUPPORTED_VERSION == version, "Unsupported version: %s", version);
return new SerializedMetadata(metadata, header);
Expand All @@ -58,13 +59,13 @@ static SerializedMetadata from(ByteBuffer metadata) {
private SerializedMetadata(ByteBuffer metadata, int header) {
this.isSorted = (header & SORTED_STRINGS) == SORTED_STRINGS;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
int dictSize = VariantUtil.readLittleEndianUnsigned(metadata, HEADER_SIZE, offsetSize);
int dictSize = ByteBuffers.readLittleEndianUnsigned(metadata, HEADER_SIZE, offsetSize);
this.dict = new String[dictSize];
this.offsetListOffset = HEADER_SIZE + offsetSize;
this.dataOffset = offsetListOffset + ((1 + dictSize) * offsetSize);
int endOffset =
dataOffset
+ VariantUtil.readLittleEndianUnsigned(
+ ByteBuffers.readLittleEndianUnsigned(
metadata, offsetListOffset + (offsetSize * dictSize), offsetSize);
if (endOffset < metadata.limit()) {
this.metadata = VariantUtil.slice(metadata, 0, endOffset);
Expand Down Expand Up @@ -106,10 +107,10 @@ public int id(String name) {
public String get(int index) {
if (null == dict[index]) {
int offset =
VariantUtil.readLittleEndianUnsigned(
ByteBuffers.readLittleEndianUnsigned(
metadata, offsetListOffset + (offsetSize * index), offsetSize);
int next =
VariantUtil.readLittleEndianUnsigned(
ByteBuffers.readLittleEndianUnsigned(
metadata, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
dict[index] = VariantUtil.readString(metadata, dataOffset + offset, next - offset);
}
Expand All @@ -129,7 +130,7 @@ public int sizeInBytes() {
@Override
public int writeTo(ByteBuffer buffer, int offset) {
ByteBuffer value = buffer();
VariantUtil.writeBufferAbsolute(buffer, offset, value);
buffer.put(offset, value, value.position(), value.remaining());
return value.remaining();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ByteBuffers;

class SerializedObject implements VariantObject, SerializedValue {
private static final int HEADER_SIZE = 1;
Expand Down Expand Up @@ -67,7 +68,7 @@ private SerializedObject(VariantMetadata metadata, ByteBuffer value, int header)
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
this.fieldIdSize = 1 + ((header & FIELD_ID_SIZE_MASK) >> FIELD_ID_SIZE_SHIFT);
int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1;
int numElements = VariantUtil.readLittleEndianUnsigned(value, HEADER_SIZE, numElementsSize);
int numElements = ByteBuffers.readLittleEndianUnsigned(value, HEADER_SIZE, numElementsSize);
this.fieldIdListOffset = HEADER_SIZE + numElementsSize;
this.fieldIds = new Integer[numElements];
this.offsetListOffset = fieldIdListOffset + (numElements * fieldIdSize);
Expand All @@ -86,14 +87,14 @@ private void initOffsetsAndLengths(int numElements) {
Map<Integer, Integer> offsetToLength = Maps.newHashMap();
for (int index = 0; index < numElements; index += 1) {
offsets[index] =
VariantUtil.readLittleEndianUnsigned(
ByteBuffers.readLittleEndianUnsigned(
value, offsetListOffset + (index * offsetSize), offsetSize);

offsetToLength.put(offsets[index], 0);
}

int dataLength =
VariantUtil.readLittleEndianUnsigned(
ByteBuffers.readLittleEndianUnsigned(
value, offsetListOffset + (numElements * offsetSize), offsetSize);
offsetToLength.put(dataLength, 0);

Expand Down Expand Up @@ -163,7 +164,7 @@ public String next() {
private int id(int index) {
if (null == fieldIds[index]) {
fieldIds[index] =
VariantUtil.readLittleEndianUnsigned(
ByteBuffers.readLittleEndianUnsigned(
value, fieldIdListOffset + (index * fieldIdSize), fieldIdSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.UUIDUtil;

class SerializedPrimitive implements VariantPrimitive<Object>, SerializedValue {
Expand Down Expand Up @@ -61,52 +62,52 @@ private Object read() {
case BOOLEAN_FALSE:
return false;
case INT8:
return VariantUtil.readLittleEndianInt8(value, PRIMITIVE_OFFSET);
return ByteBuffers.readLittleEndianInt8(value, PRIMITIVE_OFFSET);
case INT16:
return VariantUtil.readLittleEndianInt16(value, PRIMITIVE_OFFSET);
return ByteBuffers.readLittleEndianInt16(value, PRIMITIVE_OFFSET);
case INT32:
case DATE:
return VariantUtil.readLittleEndianInt32(value, PRIMITIVE_OFFSET);
return ByteBuffers.readLittleEndianInt32(value, PRIMITIVE_OFFSET);
case INT64:
case TIMESTAMPTZ:
case TIMESTAMPNTZ:
case TIME:
case TIMESTAMPTZ_NANOS:
case TIMESTAMPNTZ_NANOS:
return VariantUtil.readLittleEndianInt64(value, PRIMITIVE_OFFSET);
return ByteBuffers.readLittleEndianInt64(value, PRIMITIVE_OFFSET);
case FLOAT:
return VariantUtil.readFloat(value, PRIMITIVE_OFFSET);
case DOUBLE:
return VariantUtil.readDouble(value, PRIMITIVE_OFFSET);
case DECIMAL4:
{
int scale = VariantUtil.readByte(value, PRIMITIVE_OFFSET);
int unscaled = VariantUtil.readLittleEndianInt32(value, PRIMITIVE_OFFSET + 1);
int scale = ByteBuffers.readByte(value, PRIMITIVE_OFFSET);
int unscaled = ByteBuffers.readLittleEndianInt32(value, PRIMITIVE_OFFSET + 1);
return new BigDecimal(BigInteger.valueOf(unscaled), scale);
}
case DECIMAL8:
{
int scale = VariantUtil.readByte(value, PRIMITIVE_OFFSET);
long unscaled = VariantUtil.readLittleEndianInt64(value, PRIMITIVE_OFFSET + 1);
int scale = ByteBuffers.readByte(value, PRIMITIVE_OFFSET);
long unscaled = ByteBuffers.readLittleEndianInt64(value, PRIMITIVE_OFFSET + 1);
return new BigDecimal(BigInteger.valueOf(unscaled), scale);
}
case DECIMAL16:
{
int scale = VariantUtil.readByte(value, PRIMITIVE_OFFSET);
int scale = ByteBuffers.readByte(value, PRIMITIVE_OFFSET);
byte[] unscaled = new byte[16];
for (int i = 0; i < 16; i += 1) {
unscaled[i] = (byte) VariantUtil.readByte(value, PRIMITIVE_OFFSET + 16 - i);
unscaled[i] = (byte) ByteBuffers.readByte(value, PRIMITIVE_OFFSET + 16 - i);
}
return new BigDecimal(new BigInteger(unscaled), scale);
}
case BINARY:
{
int size = VariantUtil.readLittleEndianInt32(value, PRIMITIVE_OFFSET);
int size = ByteBuffers.readLittleEndianInt32(value, PRIMITIVE_OFFSET);
return VariantUtil.slice(value, PRIMITIVE_OFFSET + 4, size);
}
case STRING:
{
int size = VariantUtil.readLittleEndianInt32(value, PRIMITIVE_OFFSET);
int size = ByteBuffers.readLittleEndianInt32(value, PRIMITIVE_OFFSET);
return VariantUtil.readString(value, PRIMITIVE_OFFSET + 4, size);
}
case UUID:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ default int sizeInBytes() {
@Override
default int writeTo(ByteBuffer buffer, int offset) {
ByteBuffer value = buffer();
VariantUtil.writeBufferAbsolute(buffer, offset, value);
buffer.put(offset, value, value.position(), value.remaining());
return value.remaining();
}
}
73 changes: 0 additions & 73 deletions api/src/main/java/org/apache/iceberg/variants/VariantUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class VariantUtil {
private static final int BASIC_TYPE_MASK = 0b11;
Expand All @@ -33,78 +32,6 @@ class VariantUtil {

private VariantUtil() {}

/** A hacky absolute put for ByteBuffer */
static int writeBufferAbsolute(ByteBuffer buffer, int offset, ByteBuffer toCopy) {
int originalPosition = buffer.position();
buffer.position(offset);
ByteBuffer copy = toCopy.duplicate();
buffer.put(copy); // duplicate so toCopy is not modified
buffer.position(originalPosition);
Preconditions.checkArgument(copy.remaining() <= 0, "Not fully written");
return toCopy.remaining();
}

static void writeByte(ByteBuffer buffer, int value, int offset) {
buffer.put(buffer.position() + offset, (byte) (value & 0xFF));
}

static void writeLittleEndianUnsigned(ByteBuffer buffer, int value, int offset, int size) {
int base = buffer.position() + offset;
switch (size) {
case 4:
buffer.putInt(base, value);
return;
case 3:
buffer.putShort(base, (short) (value & 0xFFFF));
buffer.put(base + 2, (byte) ((value >> 16) & 0xFF));
return;
case 2:
buffer.putShort(base, (short) (value & 0xFFFF));
return;
case 1:
buffer.put(base, (byte) (value & 0xFF));
return;
}

throw new IllegalArgumentException("Invalid size: " + size);
}

static byte readLittleEndianInt8(ByteBuffer buffer, int offset) {
return buffer.get(buffer.position() + offset);
}

static short readLittleEndianInt16(ByteBuffer buffer, int offset) {
return buffer.getShort(buffer.position() + offset);
}

static int readByte(ByteBuffer buffer, int offset) {
return buffer.get(buffer.position() + offset) & 0xFF;
}

static int readLittleEndianUnsigned(ByteBuffer buffer, int offset, int size) {
int base = buffer.position() + offset;
switch (size) {
case 4:
return buffer.getInt(base);
case 3:
return (((int) buffer.getShort(base)) & 0xFFFF) | ((buffer.get(base + 2) & 0xFF) << 16);
case 2:
return ((int) buffer.getShort(base)) & 0xFFFF;
case 1:
return buffer.get(base) & 0xFF;
}

throw new IllegalArgumentException("Invalid size: " + size);
}

static int readLittleEndianInt32(ByteBuffer buffer, int offset) {
return buffer.getInt(buffer.position() + offset);
}

static long readLittleEndianInt64(ByteBuffer buffer, int offset) {
return buffer.getLong(buffer.position() + offset);
}

static float readFloat(ByteBuffer buffer, int offset) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should readFloat and readDouble be moved as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so because they are only used here. We can always move them later, but these weren't shared originally because we didn't have other parts of the codebase that needed them.

return buffer.getFloat(buffer.position() + offset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.variants;

import java.nio.ByteBuffer;
import org.apache.iceberg.util.ByteBuffers;

/** A variant value. */
public interface VariantValue {
Expand Down Expand Up @@ -61,7 +62,7 @@ default VariantArray asArray() {
}

static VariantValue from(VariantMetadata metadata, ByteBuffer value) {
int header = VariantUtil.readByte(value, 0);
int header = ByteBuffers.readByte(value, 0);
BasicType basicType = VariantUtil.basicType(header);
switch (basicType) {
case PRIMITIVE:
Expand Down
Loading
Loading