Skip to content

Commit 8a914f9

Browse files
Junjie Wang and facebook-github-bot
authored and committed
feat: optimize array_agg to avoid copy
Summary: The existing fast path requires a single source vector and a single contiguous byte range in order to avoid a copy. We can apply zero-copy under a more general condition: it suffices that each group covers one contiguous byte range. The reason we see fragmented ranges within a single input batch is that the query plan applies a filter before the native join containing this streaming-aggregation operator, so execution falls into the branch where not all rows are selected. Reviewed By: Yuhta, xiaoxmeng. Differential Revision: D88194739
1 parent 8a466df commit 8a914f9

File tree

1 file changed

+77
-29
lines changed

1 file changed

+77
-29
lines changed

velox/functions/prestosql/aggregates/ArrayAggAggregate.cpp

Lines changed: 77 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -128,46 +128,94 @@ class ArrayAggAggregate : public exec::Aggregate {
128128
if (clusteredInput_) {
129129
bool singleSource{true};
130130
VectorPtr* currentSource{nullptr};
131-
std::vector<BaseVector::CopyRange> ranges;
131+
bool contiguousElementsPerGroup = true;
132+
// check if we can get a contiguous range for each group
133+
std::vector<BaseVector::CopyRange> rangeMap;
134+
rangeMap.resize(numGroups, {0, 0, 0});
132135
for (int32_t i = 0; i < numGroups; ++i) {
133136
auto* accumulator = value<ClusteredAccumulator>(groups[i]);
134-
resultOffsets[i] = arrayOffset;
135-
vector_size_t arraySize = 0;
137+
if (accumulator->sources.empty()) {
138+
continue;
139+
}
136140
for (auto& source : accumulator->sources) {
137-
if (currentSource && currentSource->get() != source.vector.get()) {
138-
elements->copyRanges(currentSource->get(), ranges);
139-
ranges.clear();
140-
}
141-
if (!currentSource || currentSource->get() != source.vector.get()) {
142-
singleSource = currentSource == nullptr;
141+
if (!currentSource) {
143142
currentSource = &source.vector;
144-
ranges.push_back({source.offset, arrayOffset, source.size});
145-
} else if (
146-
ranges.back().sourceIndex + ranges.back().count ==
147-
source.offset) {
148-
ranges.back().count += source.size;
149143
} else {
150-
VELOX_DCHECK_LT(
151-
ranges.back().sourceIndex + ranges.back().count, source.offset);
152-
ranges.push_back({source.offset, arrayOffset, source.size});
144+
singleSource =
145+
singleSource && (currentSource->get() == source.vector.get());
153146
}
154-
arrayOffset += source.size;
155-
arraySize += source.size;
156147
}
157-
resultSizes[i] = arraySize;
158-
if (arraySize == 0) {
159-
vector->setNull(i, true);
148+
if (singleSource) {
149+
vector_size_t offset = accumulator->sources[0].offset;
150+
vector_size_t size = accumulator->sources[0].size;
151+
for (auto i = 1; i < accumulator->sources.size(); ++i) {
152+
if (accumulator->sources[i].offset !=
153+
accumulator->sources[i - 1].offset +
154+
accumulator->sources[i - 1].size) {
155+
contiguousElementsPerGroup = false;
156+
break;
157+
} else {
158+
size += accumulator->sources[i].size;
159+
}
160+
}
161+
if (contiguousElementsPerGroup) {
162+
rangeMap[i] = {offset, 0, size};
163+
}
160164
} else {
161-
clearNull(rawNulls, i);
165+
contiguousElementsPerGroup = false;
166+
break;
167+
}
168+
}
169+
// fallback to copy if we can't get a contiguous range for each group
170+
std::vector<BaseVector::CopyRange> ranges;
171+
if (currentSource != nullptr && !contiguousElementsPerGroup) {
172+
currentSource = nullptr;
173+
for (int32_t i = 0; i < numGroups; ++i) {
174+
auto* accumulator = value<ClusteredAccumulator>(groups[i]);
175+
resultOffsets[i] = arrayOffset;
176+
vector_size_t arraySize = 0;
177+
for (auto& source : accumulator->sources) {
178+
if (currentSource && currentSource->get() != source.vector.get()) {
179+
elements->copyRanges(currentSource->get(), ranges);
180+
ranges.clear();
181+
}
182+
if (!currentSource || currentSource->get() != source.vector.get()) {
183+
currentSource = &source.vector;
184+
ranges.push_back({source.offset, arrayOffset, source.size});
185+
} else if (
186+
ranges.back().sourceIndex + ranges.back().count ==
187+
source.offset) {
188+
ranges.back().count += source.size;
189+
} else {
190+
VELOX_DCHECK_LT(
191+
ranges.back().sourceIndex + ranges.back().count,
192+
source.offset);
193+
ranges.push_back({source.offset, arrayOffset, source.size});
194+
}
195+
arrayOffset += source.size;
196+
arraySize += source.size;
197+
}
198+
resultSizes[i] = arraySize;
199+
if (arraySize == 0) {
200+
vector->setNull(i, true);
201+
} else {
202+
clearNull(rawNulls, i);
203+
}
162204
}
163205
}
164206
if (currentSource != nullptr) {
165-
VELOX_DCHECK(!ranges.empty());
166-
if (singleSource && ranges.size() == 1) {
167-
VELOX_CHECK_GE(currentSource->get()->size(), numElements);
168-
VELOX_CHECK_EQ(ranges[0].count, numElements);
169-
elements = currentSource->get()->slice(
170-
ranges[0].sourceIndex, ranges[0].count);
207+
VELOX_DCHECK(contiguousElementsPerGroup || !ranges.empty());
208+
if (singleSource && contiguousElementsPerGroup) {
209+
elements = BaseVector::loadedVectorShared(*currentSource);
210+
for (int32_t i = 0; i < numGroups; ++i) {
211+
if (rangeMap[i].count > 0) {
212+
resultOffsets[i] = rangeMap[i].sourceIndex;
213+
resultSizes[i] = rangeMap[i].count;
214+
clearNull(rawNulls, i);
215+
} else {
216+
vector->setNull(i, true);
217+
}
218+
}
171219
} else {
172220
elements->copyRanges(currentSource->get(), ranges);
173221
}

0 commit comments

Comments
 (0)