Skip to content

Commit f8b32e7

Browse files
duanmengmeta-codesync[bot]
authored andcommitted
feat: Add listRowsFast in RowContainer (facebookincubator#15698)
Summary: Adds an optional `useListRowIndex` flag and listRowsFast method in `RowContainer`. When enabled, RowContainer maintains a row-pointer index (`rowPointers_`) and uses `listRowsFast()` to iterate rows without scanning allocation ranges or checking free/probe flags. It is intended to be used in SortBuffer and SortInputSpiller to improve performance. Pull Request resolved: facebookincubator#15698 Reviewed By: Yuhta Differential Revision: D88497347 Pulled By: zacw7 fbshipit-source-id: 4cec8eeb6b49fd7a80e97933cb0dd2da038a8a70
1 parent 35b8a49 commit f8b32e7

12 files changed

+118
-42
lines changed

velox/exec/GroupingSet.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,7 @@ bool GroupingSet::getOutputWithSpill(
11021102
false,
11031103
false,
11041104
false,
1105+
false,
11051106
pool_);
11061107

11071108
initializeAggregates(aggregates_, *mergeRows_, false);
@@ -1437,6 +1438,7 @@ void GroupingSet::abandonPartialAggregation() {
14371438
false,
14381439
false,
14391440
false,
1441+
false,
14401442
pool_);
14411443
initializeAggregates(aggregates_, *intermediateRows_, true);
14421444
table_.reset();

velox/exec/HashTable.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ HashTable<ignoreNullKeys>::HashTable(
7575
isJoinBuild,
7676
hasProbedFlag,
7777
hashMode_ != HashMode::kHash,
78+
/*useListRowIndex=*/false,
7879
pool);
7980
nextOffset_ = rows_->nextOffset();
8081
}

velox/exec/RowContainer.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,17 @@ RowContainer::RowContainer(
136136
bool isJoinBuild,
137137
bool hasProbedFlag,
138138
bool hasNormalizedKeys,
139+
bool useListRowIndex,
139140
memory::MemoryPool* pool)
140141
: keyTypes_(keyTypes),
141142
nullableKeys_(nullableKeys),
142143
isJoinBuild_(isJoinBuild),
143144
hasNormalizedKeys_(hasNormalizedKeys),
145+
useListRowIndex_(useListRowIndex),
144146
stringAllocator_(std::make_unique<HashStringAllocator>(pool)),
145147
accumulators_(accumulators),
146-
rows_(pool) {
148+
rows_(pool),
149+
rowPointers_(StlAllocator<char*>(stringAllocator_.get())) {
147150
// Compute the layout of the payload row. The row has keys, null flags,
148151
// accumulators, dependent fields. All fields are fixed width. If variable
149152
// width data is referenced, this is done with StringView(for VARCHAR) and
@@ -279,6 +282,10 @@ char* RowContainer::newRow() {
279282
if (normalizedKeySize_) {
280283
++numRowsWithNormalizedKey_;
281284
}
285+
286+
if (useListRowIndex_) {
287+
rowPointers_.push_back(row);
288+
}
282289
}
283290
return initializeRow(row, false /* reuse */);
284291
}
@@ -964,6 +971,8 @@ void RowContainer::clear() {
964971
hasDuplicateRows_ = false;
965972

966973
rows_.clear();
974+
rowPointers_.clear();
975+
rowPointers_.shrink_to_fit();
967976
stringAllocator_->clear();
968977
numRows_ = 0;
969978
numRowsWithNormalizedKey_ = 0;
@@ -1025,7 +1034,8 @@ std::optional<int64_t> RowContainer::estimateRowSize() const {
10251034
}
10261035
int64_t freeBytes = rows_.freeBytes() + fixedRowSize_ * numFreeRows_;
10271036
int64_t usedSize = rows_.allocatedBytes() - freeBytes +
1028-
stringAllocator_->retainedSize() - stringAllocator_->freeSpace();
1037+
stringAllocator_->retainedSize() - stringAllocator_->freeSpace() -
1038+
rowPointers_.capacity() * sizeof(char*);
10291039
int64_t rowSize = usedSize / numRows_;
10301040
VELOX_CHECK_GT(
10311041
rowSize, 0, "Estimated row size of the RowContainer must be positive.");

velox/exec/RowContainer.h

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ struct RowContainerIterator {
8383
char* rowBegin{nullptr};
8484
/// First byte after the end of the range containing 'currentRow'.
8585
char* endOfRun{nullptr};
86+
/// Cursor of the list row operation.
87+
int32_t listRowCursor{0};
8688

8789
/// Returns the current row, skipping a possible normalized key below the
8890
/// first byte of row.
@@ -273,6 +275,21 @@ class RowContainer {
273275
const std::vector<TypePtr>& keyTypes,
274276
const std::vector<TypePtr>& dependentTypes,
275277
memory::MemoryPool* pool)
278+
: RowContainer(
279+
keyTypes,
280+
dependentTypes,
281+
/*useListRowIndex=*/false,
282+
pool) {}
283+
284+
/// If 'useListRowIndex' is true, the container maintains an internal array of
285+
/// row pointers so that listRowsFast() can return rows without scanning
286+
/// underlying allocations or checking free/probe flags. It is intended to be
287+
/// used in SortBuffer and SortInputSpiller to improve performance.
288+
RowContainer(
289+
const std::vector<TypePtr>& keyTypes,
290+
const std::vector<TypePtr>& dependentTypes,
291+
bool useListRowIndex,
292+
memory::MemoryPool* pool)
276293
: RowContainer(
277294
keyTypes,
278295
true, // nullableKeys
@@ -282,6 +299,7 @@ class RowContainer {
282299
false, // isJoinBuild
283300
false, // hasProbedFlag
284301
false, // hasNormalizedKey
302+
useListRowIndex,
285303
pool) {}
286304

287305
~RowContainer();
@@ -313,6 +331,7 @@ class RowContainer {
313331
bool isJoinBuild,
314332
bool hasProbedFlag,
315333
bool hasNormalizedKey,
334+
bool useListRowIndex,
316335
memory::MemoryPool* pool);
317336

318337
/// Allocates a new row and initializes possible aggregates to null.
@@ -637,6 +656,20 @@ class RowContainer {
637656
return count;
638657
}
639658

659+
/// Fast path for `listRows` that returns `rowPointers_` directly. Used by
660+
/// `SortBuffer` and `SortInputSpiller`, so it skips checking the free and
661+
/// probe flags.
662+
int32_t listRowsFast(RowContainerIterator* iter, int32_t maxRows, char** rows)
663+
const {
664+
int32_t count = 0;
665+
while (count < maxRows && iter->listRowCursor < rowPointers_.size()) {
666+
char* row = rowPointers_[iter->listRowCursor];
667+
rows[count++] = row;
668+
++iter->listRowCursor;
669+
}
670+
return count;
671+
}
672+
640673
/// Extracts up to 'maxRows' rows starting at the position of 'iter'. A
641674
/// default constructed or reset iter starts at the beginning. Returns the
642675
/// number of rows written to 'rows'. Returns 0 when at end. Stops after the
@@ -651,6 +684,9 @@ class RowContainer {
651684

652685
int32_t listRows(RowContainerIterator* iter, int32_t maxRows, char** rows)
653686
const {
687+
if (useListRowIndex_) {
688+
return listRowsFast(iter, maxRows, rows);
689+
}
654690
return listRows<ProbeType::kAll>(iter, maxRows, kUnlimited, rows);
655691
}
656692

@@ -791,6 +827,10 @@ class RowContainer {
791827
return 0;
792828
}
793829

830+
const std::vector<char*, StlAllocator<char*>>& testingRowPointers() const {
831+
return rowPointers_;
832+
}
833+
794834
memory::MemoryPool* pool() const {
795835
return stringAllocator_->pool();
796836
}
@@ -1471,7 +1511,8 @@ class RowContainer {
14711511
const bool isJoinBuild_;
14721512
// True if normalized keys are enabled in initial state.
14731513
const bool hasNormalizedKeys_;
1474-
1514+
// True if use 'listRowsFast'.
1515+
const bool useListRowIndex_;
14751516
const std::unique_ptr<HashStringAllocator> stringAllocator_;
14761517

14771518
// Indicates if we can add new row to this row container. It is set to false
@@ -1527,6 +1568,7 @@ class RowContainer {
15271568
uint64_t numFreeRows_ = 0;
15281569

15291570
memory::AllocationPool rows_;
1571+
std::vector<char*, StlAllocator<char*>> rowPointers_;
15301572

15311573
int alignment_ = 1;
15321574

velox/exec/SortBuffer.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ SortBuffer::SortBuffer(
7474
}
7575

7676
data_ = std::make_unique<RowContainer>(
77-
sortedColumnTypes, nonSortedColumnTypes, pool_);
77+
sortedColumnTypes, nonSortedColumnTypes, /*useListRowIndex=*/true, pool_);
7878
spillerStoreType_ =
7979
ROW(std::move(sortedSpillColumnNames), std::move(sortedSpillColumnTypes));
8080
}
@@ -128,6 +128,7 @@ void SortBuffer::noMoreInput() {
128128
updateEstimatedOutputRowSize();
129129
// Sort the pointers to the rows in RowContainer (data_) instead of sorting
130130
// the rows.
131+
// TODO: Reuse 'RowContainer::rowPointers_'.
131132
sortedRows_.resize(numInputRows_);
132133
RowContainerIterator iter;
133134
data_->listRows(&iter, numInputRows_, sortedRows_.data());

velox/exec/Spiller.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,9 @@ bool SpillerBase::fillSpillRuns(RowContainerIterator* iterator) {
101101

102102
uint64_t totalRows{0};
103103
for (;;) {
104-
const auto numRows = container_->listRows(
105-
iterator, rows.size(), RowContainer::kUnlimited, rows.data());
104+
// TODO: Reuse 'RowContainer::rowPointers_'.
105+
const auto numRows =
106+
container_->listRows(iterator, rows.size(), rows.data());
106107
if (numRows == 0) {
107108
lastRun = true;
108109
break;

velox/exec/StreamingAggregation.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ std::unique_ptr<RowContainer> StreamingAggregation::makeRowContainer(
425425
false,
426426
false,
427427
false,
428+
false,
428429
pool());
429430
}
430431

velox/exec/tests/AggregateSpillBenchmarkBase.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@ std::unique_ptr<RowContainer> makeRowContainer(
3636
true, // nullableKeys
3737
std::vector<Accumulator>{},
3838
dependentTypes,
39-
false, // hasNext
40-
false, // isJoinBuild
41-
false, // hasProbedFlag
42-
false, // hasNormalizedKey
39+
/*hasNext=*/false,
40+
/*isJoinBuild=*/false,
41+
/*hasProbedFlag=*/false,
42+
/*hasNormalizedKey=*/false,
43+
/*useListRowIndex=*/false,
4344
pool.get());
4445
}
4546

velox/exec/tests/AggregationTest.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ class AggregationTest : public OperatorTestBase {
384384
false,
385385
true,
386386
true,
387+
false,
387388
pool_.get());
388389
}
389390

@@ -4028,6 +4029,7 @@ TEST_F(AggregationTest, destroyAfterPartialInitialization) {
40284029
false, // isJoinBuild
40294030
false, // hasProbedFlag
40304031
false, // hasNormalizedKeys
4032+
false, // useListRowIndex
40314033
pool());
40324034
const auto rowColumn = rows.columnAt(0);
40334035
agg.setOffsets(

0 commit comments

Comments
 (0)