Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,17 @@
<version>${gson.version}</version>
</dependency>

<!-- Apache Arrow dependencies -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${apache.arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-compression</artifactId>
<version>${apache.arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions query/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
<artifactId>arrow-vector</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-compression</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>junit</groupId>
Expand Down
33 changes: 28 additions & 5 deletions query/src/main/java/tech/ydb/query/impl/SessionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import tech.ydb.core.utils.URITools;
import tech.ydb.core.utils.UpdatableOptional;
import tech.ydb.proto.ValueProtos;
import tech.ydb.proto.formats.YdbFormats;
import tech.ydb.proto.query.YdbQuery;
import tech.ydb.query.QuerySession;
import tech.ydb.query.QueryStream;
import tech.ydb.query.QueryTransaction;
import tech.ydb.query.result.QueryInfo;
import tech.ydb.query.result.QueryStats;
import tech.ydb.query.settings.ApacheArrowFormat;
import tech.ydb.query.settings.AttachSessionSettings;
import tech.ydb.query.settings.BeginTransactionSettings;
import tech.ydb.query.settings.CommitTransactionSettings;
Expand Down Expand Up @@ -204,18 +206,34 @@ private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) {
}
}

private static YdbFormats.ArrowFormatSettings mapApacheArrowFormat(ApacheArrowFormat mode) {
YdbFormats.ArrowFormatSettings.CompressionCodec.Builder codecBuilder = YdbFormats.ArrowFormatSettings
.CompressionCodec.newBuilder();

switch (mode.getCodec()) {
case ZSTD:
codecBuilder.setType(YdbFormats.ArrowFormatSettings.CompressionCodec.Type.TYPE_ZSTD)
.setLevel(mode.getCompressionLevel());
break;
case LZ4_FRAME:
codecBuilder.setType(YdbFormats.ArrowFormatSettings.CompressionCodec.Type.TYPE_LZ4_FRAME);
break;
case NONE:
default:
codecBuilder.setType(YdbFormats.ArrowFormatSettings.CompressionCodec.Type.TYPE_NONE);
break;
}

return YdbFormats.ArrowFormatSettings.newBuilder().setCompressionCodec(codecBuilder).build();
}

GrpcReadStream<YdbQuery.ExecuteQueryResponsePart> createGrpcStream(
String query, YdbQuery.TransactionControl tx, Params prms, ExecuteQuerySettings settings
) {
ValueProtos.ResultSet.Format format = settings.isUseApacheArrowFormat() ?
ValueProtos.ResultSet.Format.FORMAT_ARROW :
ValueProtos.ResultSet.Format.FORMAT_VALUE;

YdbQuery.ExecuteQueryRequest.Builder request = YdbQuery.ExecuteQueryRequest.newBuilder()
.setSessionId(sessionId)
.setExecMode(mapExecMode(settings.getExecMode()))
.setStatsMode(mapStatsMode(settings.getStatsMode()))
.setResultSetFormat(format)
.setConcurrentResultSets(settings.isConcurrentResultSets())
.setQueryContent(YdbQuery.QueryContent.newBuilder()
.setSyntax(YdbQuery.Syntax.SYNTAX_YQL_V1)
Expand All @@ -224,6 +242,11 @@ GrpcReadStream<YdbQuery.ExecuteQueryResponsePart> createGrpcStream(
)
.putAllParameters(prms.toPb());

if (settings.getApacheArrowFormat() != null) {
request.setResultSetFormat(ValueProtos.ResultSet.Format.FORMAT_ARROW)
.setArrowFormatSettings(mapApacheArrowFormat(settings.getApacheArrowFormat()));
}

String resourcePool = settings.getResourcePool();
if (resourcePool != null && !resourcePool.isEmpty()) {
request.setPoolId(resourcePool);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package tech.ydb.query.result.arrow;

import io.grpc.ExperimentalApi;
import org.apache.arrow.compression.CommonsCompressionFactory;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;

/**
*
* @author Aleksandr Gorshenin
*/
@ExperimentalApi("ApacheArrow support is experimental and API may change without notice")
public abstract class ApacheArrowCompressedPartsHandler extends ApacheArrowPartsHandler {
public ApacheArrowCompressedPartsHandler(RootAllocator allocator) {
super(allocator);
}

@Override
protected VectorLoader createLoader(VectorSchemaRoot vsr) {
return new VectorLoader(vsr, CommonsCompressionFactory.INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
* @author Aleksandr Gorshenin
*/
@ExperimentalApi("ApacheArrow support is experimental and API may change without notice")
public abstract class ArrowPartsHandler implements QueryStream.PartsHandler {
public abstract class ApacheArrowPartsHandler implements QueryStream.PartsHandler {
private final RootAllocator allocator;

public ArrowPartsHandler(RootAllocator allocator) {
public ApacheArrowPartsHandler(RootAllocator allocator) {
this.allocator = allocator;
}

Expand All @@ -42,18 +42,22 @@ public void onNextRawPart(long index, ValueProtos.ResultSet rs) {
Schema schema = readApacheArrowSchema(rs.getArrowFormatMeta().getSchema());
try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator)) {
loadApacheArrowVector(vsr, rs.getData());
onNextPart(new ArrowQueryResultPart(index, vsr, rs.getColumnsList(), rs.getTruncated()));
onNextPart(new ApacheArrowQueryResultPart(index, vsr, rs.getColumnsList(), rs.getTruncated()));
}
} catch (IOException ex) {
throw new RuntimeException("Cannot read ApacheArrow vector", ex);
}
}

protected VectorLoader createLoader(VectorSchemaRoot vsr) {
return new VectorLoader(vsr);
}

private void loadApacheArrowVector(VectorSchemaRoot vsr, ByteString bytes) throws IOException {
try (InputStream is = bytes.newInput()) {
try (ReadChannel channel = new ReadChannel(Channels.newChannel(is))) {
try (ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(channel, allocator)) {
VectorLoader loader = new VectorLoader(vsr);
VectorLoader loader = createLoader(vsr);
loader.load(batch);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@
* @author Aleksandr Gorshenin
*/
@ExperimentalApi("ApacheArrow support is experimental and API may change without notice")
public class ArrowQueryResultPart extends QueryResultPart {
private final ArrowResultSetReader resultSetReader;
public class ApacheArrowQueryResultPart extends QueryResultPart {
private final ApacheArrowResultSetReader resultSetReader;

public ArrowQueryResultPart(long index, VectorSchemaRoot vsr, List<ValueProtos.Column> columns, boolean truncated) {
public ApacheArrowQueryResultPart(long index, VectorSchemaRoot vsr, List<ValueProtos.Column> columns,
boolean isTruncated) {
super(index, null);

ArrowValueReader<?>[] readers = new ArrowValueReader<?>[columns.size()];
ApacheArrowValueReader<?>[] readers = new ApacheArrowValueReader<?>[columns.size()];
for (int idx = 0; idx < columns.size(); idx += 1) {
ValueProtos.Column column = columns.get(idx);
Type type = validateType(column.getType());
boolean optional = column.getType().hasOptionalType();
readers[idx] = ArrowValueReader.createReader(vsr.getVector(column.getName()), type, optional);
readers[idx] = ApacheArrowValueReader.createReader(vsr.getVector(column.getName()), type, optional);
}

this.resultSetReader = new ArrowResultSetReader(vsr, readers, truncated);
this.resultSetReader = new ApacheArrowResultSetReader(vsr, readers, isTruncated);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
* @author Aleksandr Gorshenin
*/
@ExperimentalApi("ApacheArrow support is experimental and API may change without notice")
public class ArrowResultSetReader implements ResultSetReader {
public class ApacheArrowResultSetReader implements ResultSetReader {
private final boolean isTruncated;
private final VectorSchemaRoot vsr;
private final ArrowValueReader<?>[] readers;
private final ApacheArrowValueReader<?>[] readers;
private final Map<String, Integer> columnIndexes;
private int rowIndex = -1; // before first

public ArrowResultSetReader(VectorSchemaRoot vsr, ArrowValueReader<?>[] readers, boolean isTruncated) {
public ApacheArrowResultSetReader(VectorSchemaRoot vsr, ApacheArrowValueReader<?>[] readers, boolean isTruncated) {
this.vsr = vsr;
this.readers = readers;
this.columnIndexes = Maps.newHashMapWithExpectedSize(readers.length);
Expand Down Expand Up @@ -100,7 +100,7 @@ public ValueReader getColumn(int index) {
if (index < 0 || index >= readers.length) {
throw new IllegalArgumentException("Column index: " + index + ", columns count: " + readers.length);
}
ArrowValueReader<?> reader = readers[index];
ApacheArrowValueReader<?> reader = readers[index];
reader.setRowIndex(rowIndex);
return reader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@
* @param <T> type of FieldVector
*/
@ExperimentalApi("ApacheArrow support is experimental and API may change without notice")
public abstract class ArrowValueReader<T extends FieldVector> implements ValueReader {
public abstract class ApacheArrowValueReader<T extends FieldVector> implements ValueReader {
protected final Type type;
protected final boolean isNullable;
protected final T vector;
protected int rowIndex;

protected ArrowValueReader(T vector, Type type, boolean isNullable) {
protected ApacheArrowValueReader(T vector, Type type, boolean isNullable) {
this.vector = vector;
this.type = type;
this.isNullable = isNullable;
Expand All @@ -62,7 +62,7 @@ protected RuntimeException error(String method) {
return new IllegalStateException("cannot call " + method + ", actual type: " + getType());
}

protected abstract ArrowValueReader<T> toNotNull();
protected abstract ApacheArrowValueReader<T> toNotNull();
protected abstract Value<?> getNotNullValue();
protected abstract String getNotNullValueAsString();

Expand Down Expand Up @@ -102,7 +102,7 @@ public ValueReader getOptionalItem() {
if (vector.isNull(rowIndex)) {
return null;
}
ArrowValueReader<T> notNull = toNotNull();
ApacheArrowValueReader<T> notNull = toNotNull();
notNull.setRowIndex(rowIndex);
return notNull;
}
Expand Down Expand Up @@ -348,7 +348,7 @@ public ValueReader getVariantItem() {
throw error("getVariantItem");
}

private static class UInt1VectorReader extends ArrowValueReader<UInt1Vector> {
private static class UInt1VectorReader extends ApacheArrowValueReader<UInt1Vector> {
UInt1VectorReader(UInt1Vector vector, Type type, boolean isNullable) {
super(vector, type, isNullable);
}
Expand Down Expand Up @@ -398,7 +398,7 @@ public String getNotNullValueAsString() {
}
}

private static class UInt2VectorReader extends ArrowValueReader<UInt2Vector> {
private static class UInt2VectorReader extends ApacheArrowValueReader<UInt2Vector> {
UInt2VectorReader(UInt2Vector vector, Type type, boolean isNullable) {
super(vector, type, isNullable);
}
Expand Down Expand Up @@ -448,7 +448,7 @@ public String getNotNullValueAsString() {
}
}

private static class UInt4VectorReader extends ArrowValueReader<UInt4Vector> {
private static class UInt4VectorReader extends ApacheArrowValueReader<UInt4Vector> {
UInt4VectorReader(UInt4Vector vector, Type type, boolean isNullable) {
super(vector, type, isNullable);
}
Expand Down Expand Up @@ -497,7 +497,7 @@ public String getNotNullValueAsString() {
}
}

private static class UInt8VectorReader extends ArrowValueReader<UInt8Vector> {
private static class UInt8VectorReader extends ApacheArrowValueReader<UInt8Vector> {
UInt8VectorReader(UInt8Vector vector, Type type, boolean isNullable) {
super(vector, type, isNullable);
}
Expand Down Expand Up @@ -549,7 +549,7 @@ public String getNotNullValueAsString() {
}
}

private static class TinyIntVectorReader extends ArrowValueReader<TinyIntVector> {
private static class TinyIntVectorReader extends ApacheArrowValueReader<TinyIntVector> {
TinyIntVectorReader(TinyIntVector vector, Type type, boolean isNullable) {
super(vector, type, isNullable);
}
Expand Down Expand Up @@ -584,7 +584,7 @@ public String getNotNullValueAsString() {
}
}

private static class SmallIntVectorReader extends ArrowValueReader<SmallIntVector> {
private static class SmallIntVectorReader extends ApacheArrowValueReader<SmallIntVector> {
SmallIntVectorReader(SmallIntVector vector, Type type, boolean isNullable) {
super(vector, type, isNullable);
}
Expand Down Expand Up @@ -619,7 +619,7 @@ public String getNotNullValueAsString() {
}
}

private static class IntVectorReader extends ArrowValueReader<IntVector> {
private static class IntVectorReader extends ApacheArrowValueReader<IntVector> {
IntVectorReader(IntVector vector, Type type, boolean isNullable) {
super(vector, type, isNullable);
}
Expand Down Expand Up @@ -668,7 +668,7 @@ public String getNotNullValueAsString() {
}
}

private static class BigIntVectorReader extends ArrowValueReader<BigIntVector> {
private static class BigIntVectorReader extends ApacheArrowValueReader<BigIntVector> {
BigIntVectorReader(BigIntVector vector, Type type, boolean isNullable) {
super(vector, type, isNullable);
}
Expand Down Expand Up @@ -762,7 +762,7 @@ public String getNotNullValueAsString() {
}
}

private static class FloatVectorReader extends ArrowValueReader<Float4Vector> {
private static class FloatVectorReader extends ApacheArrowValueReader<Float4Vector> {
FloatVectorReader(Float4Vector vector, Type type, boolean isNullable) {
super(vector, type, isNullable);
}
Expand Down Expand Up @@ -797,7 +797,7 @@ public String getNotNullValueAsString() {
}
}

private static class DoubleVectorReader extends ArrowValueReader<Float8Vector> {
private static class DoubleVectorReader extends ApacheArrowValueReader<Float8Vector> {
DoubleVectorReader(Float8Vector vector, Type type, boolean isNullable) {
super(vector, type, isNullable);
}
Expand Down Expand Up @@ -832,7 +832,7 @@ public String getNotNullValueAsString() {
}
}

private static class VarCharVectorReader extends ArrowValueReader<VarCharVector> {
private static class VarCharVectorReader extends ApacheArrowValueReader<VarCharVector> {
VarCharVectorReader(VarCharVector vector, Type type, boolean isNullable) {
super(vector, type, isNullable);
}
Expand Down Expand Up @@ -895,7 +895,7 @@ public String getNotNullValueAsString() {
}
}

private static class VarBinaryVectorReader extends ArrowValueReader<VarBinaryVector> {
private static class VarBinaryVectorReader extends ApacheArrowValueReader<VarBinaryVector> {
VarBinaryVectorReader(VarBinaryVector vector, Type type, boolean isNullable) {
super(vector, type, isNullable);
}
Expand Down Expand Up @@ -952,7 +952,7 @@ public String getNotNullValueAsString() {
}
}

private static class FixedSizeBinaryVectorReader extends ArrowValueReader<FixedSizeBinaryVector> {
private static class FixedSizeBinaryVectorReader extends ApacheArrowValueReader<FixedSizeBinaryVector> {
FixedSizeBinaryVectorReader(FixedSizeBinaryVector vector, Type type, boolean isNullable) {
super(vector, type, isNullable);
}
Expand Down Expand Up @@ -1013,7 +1013,7 @@ public String getNotNullValueAsString() {
}
}

public static ArrowValueReader<?> createReader(FieldVector vector, Type type, boolean optional) {
public static ApacheArrowValueReader<?> createReader(FieldVector vector, Type type, boolean optional) {
switch (vector.getClass().getSimpleName()) {
case "UInt1Vector": return new UInt1VectorReader((UInt1Vector) vector, type, optional);
case "UInt2Vector": return new UInt2VectorReader((UInt2Vector) vector, type, optional);
Expand Down
Loading
Loading