diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index afe62ab11..4088a6927 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -98,10 +98,15 @@ int Tablet::init() { case BLOB: case TEXT: case STRING: { - value_matrix_[c].string_data = - static_cast(common::mem_alloc( - sizeof(String) * max_row_num_, common::MOD_TABLET)); - if (value_matrix_[c].string_data == nullptr) return E_OOM; + auto* sc = static_cast(common::mem_alloc( + sizeof(StringColumn), common::MOD_TABLET)); + if (sc == nullptr) return E_OOM; + new (sc) StringColumn(); + // 8 bytes/row is a conservative initial estimate for short + // string columns (e.g. device IDs, tags). The buffer grows + // automatically on demand via mem_realloc. + sc->init(max_row_num_, max_row_num_ * 8); + value_matrix_[c].string_col = sc; break; } default: @@ -150,7 +155,8 @@ void Tablet::destroy() { case BLOB: case TEXT: case STRING: - common::mem_free(value_matrix_[c].string_data); + value_matrix_[c].string_col->destroy(); + common::mem_free(value_matrix_[c].string_col); break; default: break; @@ -240,17 +246,51 @@ int Tablet::set_column_values(uint32_t schema_index, const void* data, return E_TYPE_NOT_SUPPORTED; } + std::memcpy(dst, data, count * elem_size); if (bitmap == nullptr) { - // All valid: bulk copy + mark all as non-null - std::memcpy(dst, data, count * elem_size); bitmaps_[schema_index].clear_all(); } else { - // Bulk copy all data (null positions will have garbage but won't be - // read). - std::memcpy(dst, data, count * elem_size); + char* tsfile_bm = bitmaps_[schema_index].get_bitmap(); + uint32_t bm_bytes = (count + 7) / 8; + std::memcpy(tsfile_bm, bitmap, bm_bytes); + } + cur_row_size_ = std::max(count, cur_row_size_); + return E_OK; +} + +int Tablet::set_column_string_values(uint32_t schema_index, + const int32_t* offsets, const char* data, + const uint8_t* bitmap, uint32_t count) { + if (err_code_ != E_OK) { + return err_code_; + } + if (UNLIKELY(schema_index >= schema_vec_->size())) { + return E_OUT_OF_RANGE; + } + if (UNLIKELY(count > static_cast(max_row_num_))) { + return E_OUT_OF_RANGE; + } + + StringColumn* sc = value_matrix_[schema_index].string_col; + if (sc == nullptr) { + return E_INVALID_ARG; + } + + uint32_t total_bytes = static_cast(offsets[count]); + if (total_bytes > sc->buf_capacity) { + sc->buf_capacity = total_bytes; + sc->buffer = (char*)mem_realloc(sc->buffer, sc->buf_capacity); + } - // bitmap uses TsFile convention (1=null, 0=valid), same as - // internal BitMap, so copy directly. + if (total_bytes > 0) { + std::memcpy(sc->buffer, data, total_bytes); + } + std::memcpy(sc->offsets, offsets, (count + 1) * sizeof(int32_t)); + sc->buf_used = total_bytes; + + if (bitmap == nullptr) { + bitmaps_[schema_index].clear_all(); + } else { char* tsfile_bm = bitmaps_[schema_index].get_bitmap(); uint32_t bm_bytes = (count + 7) / 8; std::memcpy(tsfile_bm, bitmap, bm_bytes); @@ -292,9 +332,10 @@ void* Tablet::get_value(int row_index, uint32_t schema_index, double* double_values = column_values.double_data; return &double_values[row_index]; } + case TEXT: + case BLOB: case STRING: { - auto string_values = column_values.string_data; - return &string_values[row_index]; + return &column_values.string_col->get_string_view(row_index); } default: return nullptr; @@ -304,8 +345,8 @@ void* Tablet::get_value(int row_index, uint32_t schema_index, template <> void Tablet::process_val(uint32_t row_index, uint32_t schema_index, common::String str) { - value_matrix_[schema_index].string_data[row_index].dup_from(str, - page_arena_); + value_matrix_[schema_index].string_col->append(row_index, str.buf_, + str.len_); bitmaps_[schema_index].clear(row_index); /* mark as non-null */ } @@ -450,6 +491,91 @@ void Tablet::set_column_categories( } } +void Tablet::reset_string_columns() { + size_t schema_count = schema_vec_->size(); + for (size_t c = 0; c < schema_count; c++) { + const MeasurementSchema& schema = schema_vec_->at(c); + if (schema.data_type_ == STRING || schema.data_type_ == TEXT || + schema.data_type_ == BLOB) { + value_matrix_[c].string_col->reset(); + } + } +} + +// Find all row indices where the device ID changes. A device ID is the +// composite key formed by all id columns (e.g. region + sensor_id). Row i +// is a boundary when at least one id column differs between row i-1 and row i. +// +// Example (2 id columns: region, sensor_id): +// row 0: "A", "s1" +// row 1: "A", "s2" <- boundary: sensor_id changed +// row 2: "B", "s1" <- boundary: region changed +// row 3: "B", "s1" +// row 4: "B", "s2" <- boundary: sensor_id changed +// result: [1, 2, 4] +// +// Boundaries are computed in one shot at flush time rather than maintained +// incrementally during add_value / set_column_*. The total work is similar +// either way, but batch computation here is far more CPU-friendly: the inner +// loop is a tight memcmp scan over contiguous buffers with good cache +// locality, and the CPU can pipeline comparisons without the branch overhead +// and cache thrashing of per-row bookkeeping spread across the write path. +std::vector Tablet::find_all_device_boundaries() const { + const uint32_t row_count = get_cur_row_size(); + if (row_count <= 1) return {}; + + const uint32_t nwords = (row_count + 63) / 64; + std::vector boundary(nwords, 0); + + uint32_t boundary_count = 0; + const uint32_t max_boundaries = row_count - 1; + for (auto it = id_column_indexes_.rbegin(); it != id_column_indexes_.rend(); + ++it) { + const StringColumn& sc = *value_matrix_[*it].string_col; + const int32_t* off = sc.offsets; + const char* buf = sc.buffer; + for (uint32_t i = 1; i < row_count; i++) { + if (boundary[i >> 6] & (1ULL << (i & 63))) continue; + int32_t len_a = off[i] - off[i - 1]; + int32_t len_b = off[i + 1] - off[i]; + if (len_a != len_b || + (len_a > 0 && memcmp(buf + off[i - 1], buf + off[i], + static_cast(len_a)) != 0)) { + boundary[i >> 6] |= (1ULL << (i & 63)); + if (++boundary_count >= max_boundaries) break; + } + } + if (boundary_count >= max_boundaries) break; + } + + // Sweep the bitmap word by word, extracting set bit positions in order. + // Each word covers 64 consecutive rows: word w covers rows [w*64, w*64+63]. + // + // For each word we use two standard bit tricks: + // __builtin_ctzll(bits) — count trailing zeros = index of lowest set bit + // bits &= bits - 1 — clear the lowest set bit + // + // Example: w=1, bits=0b...00010100 (bits 2 and 4 set) + // iter 1: ctzll=2 → idx=1*64+2=66, bits becomes 0b...00010000 + // iter 2: ctzll=4 → idx=1*64+4=68, bits becomes 0b...00000000 → exit + // + // Guards: idx>0 because row 0 can never be a boundary (no predecessor); + // idx result; + for (uint32_t w = 0; w < nwords; w++) { + uint64_t bits = boundary[w]; + while (bits) { + uint32_t bit = __builtin_ctzll(bits); + uint32_t idx = w * 64 + bit; + if (idx > 0 && idx < row_count) { + result.push_back(idx); + } + bits &= bits - 1; + } + } + return result; +} + std::shared_ptr Tablet::get_device_id(int i) const { std::vector id_array; id_array.push_back(new std::string(insert_target_name_)); diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index e47aa5c42..beedacc04 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -46,6 +46,71 @@ class TabletColIterator; * with their associated metadata such as column names and types. */ class Tablet { + // Arrow-style string column: offsets + contiguous buffer. + // string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i] + struct StringColumn { + int32_t* offsets; // length: max_rows + 1 (Arrow-compatible) + char* buffer; // contiguous string data + uint32_t buf_capacity; // allocated buffer size + uint32_t buf_used; // bytes written so far + + StringColumn() + : offsets(nullptr), buffer(nullptr), buf_capacity(0), buf_used(0) {} + + void init(uint32_t max_rows, uint32_t init_buf_capacity) { + offsets = (int32_t*)common::mem_alloc( + sizeof(int32_t) * (max_rows + 1), common::MOD_DEFAULT); + offsets[0] = 0; + buf_capacity = init_buf_capacity; + buffer = + (char*)common::mem_alloc(buf_capacity, common::MOD_DEFAULT); + buf_used = 0; + } + + void destroy() { + if (offsets) common::mem_free(offsets); + offsets = nullptr; + if (buffer) common::mem_free(buffer); + buffer = nullptr; + buf_capacity = buf_used = 0; + } + + void reset() { + buf_used = 0; + if (offsets) offsets[0] = 0; + } + + void append(uint32_t row, const char* data, uint32_t len) { + // Grow buffer if needed + if (buf_used + len > buf_capacity) { + buf_capacity = buf_capacity * 2 + len; + buffer = (char*)common::mem_realloc(buffer, buf_capacity); + } + memcpy(buffer + buf_used, data, len); + offsets[row] = static_cast(buf_used); + offsets[row + 1] = static_cast(buf_used + len); + buf_used += len; + } + + const char* get_str(uint32_t row) const { + return buffer + offsets[row]; + } + uint32_t get_len(uint32_t row) const { + return static_cast(offsets[row + 1] - offsets[row]); + } + // Return a String view for a given row. The returned reference is + // valid until the next call to get_string_view on this column. + common::String& get_string_view(uint32_t row) { + view_cache_.buf_ = buffer + offsets[row]; + view_cache_.len_ = + static_cast(offsets[row + 1] - offsets[row]); + return view_cache_; + } + + private: + common::String view_cache_; + }; + struct ValueMatrixEntry { union { int32_t* int32_data; @@ -53,7 +118,7 @@ class Tablet { float* float_data; double* double_data; bool* bool_data; - common::String* string_data; + StringColumn* string_col; }; }; @@ -220,6 +285,16 @@ class Tablet { void set_column_categories( const std::vector& column_categories); std::shared_ptr get_device_id(int i) const; + std::vector find_all_device_boundaries() const; + + // Bulk copy string column data (offsets + data buffer). + // offsets has count+1 entries and must start from 0 (offsets[0] == 0). + // bitmap follows TsFile convention (bit=1 means null, nullptr means all + // valid). Callers using Arrow convention (bit=1 means valid) must invert + // before calling. + int set_column_string_values(uint32_t schema_index, const int32_t* offsets, + const char* data, const uint8_t* bitmap, + uint32_t count); /** * @brief Template function to add a value of type T to the specified row * and column by name. @@ -253,6 +328,8 @@ class Tablet { schema_map_ = schema_map; } + void reset_string_columns(); + friend class TabletColIterator; friend class TsFileWriter; friend struct MeasurementNamesFromTablet; @@ -265,7 +342,6 @@ class Tablet { private: template void process_val(uint32_t row_index, uint32_t schema_index, T val); - common::PageArena page_arena_{common::MOD_TABLET}; uint32_t max_row_num_; uint32_t cur_row_size_; std::string insert_target_name_; diff --git a/cpp/src/cwrapper/arrow_c.cc b/cpp/src/cwrapper/arrow_c.cc index 6f56cfc6a..931c17de7 100644 --- a/cpp/src/cwrapper/arrow_c.cc +++ b/cpp/src/cwrapper/arrow_c.cc @@ -714,6 +714,43 @@ int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, return common::E_OK; } +// Allocate and return a TsFile null bitmap (bit=1=null) by inverting an Arrow +// validity bitmap (bit=1=valid). bit_offset is the Arrow array's offset field; +// bits [bit_offset, bit_offset+n_rows) are extracted and inverted. +// Returns nullptr if validity is nullptr (all rows valid, no allocation needed) +// or on OOM. Caller must mem_free the result. +// To distinguish OOM from "no validity": OOM only when validity!=nullptr && +// result==nullptr. +static uint8_t* InvertArrowBitmap(const uint8_t* validity, int64_t bit_offset, + uint32_t n_rows) { + if (validity == nullptr) { + return nullptr; + } + uint32_t bm_bytes = (n_rows + 7) / 8; + uint8_t* null_bm = + static_cast(common::mem_alloc(bm_bytes, common::MOD_TSBLOCK)); + if (null_bm == nullptr) { + return nullptr; + } + if (bit_offset == 0) { + // Fast path: byte-level invert when there is no bit misalignment. + for (uint32_t b = 0; b < bm_bytes; b++) { + null_bm[b] = ~validity[b]; + } + } else { + // Sliced array: extract one bit at a time starting at bit_offset. + std::memset(null_bm, 0, bm_bytes); + for (uint32_t i = 0; i < n_rows; i++) { + int64_t src = bit_offset + i; + uint8_t valid = (validity[src / 8] >> (src % 8)) & 1; + if (!valid) { + null_bm[i / 8] |= static_cast(1u << (i % 8)); + } + } + } + return null_bm; +} + // Check if Arrow row is valid (non-null) based on validity bitmap static bool ArrowIsValid(const ArrowArray* arr, int64_t row) { if (arr->null_count == 0 || arr->buffers[0] == nullptr) return true; @@ -814,6 +851,13 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, const ArrowArray* col_arr = in_array->children[data_col_indices[ci]]; common::TSDataType dtype = read_modes[ci]; uint32_t tcol = static_cast(ci); + // ArrowArray::offset is non-zero when the array is a slice of a larger + // buffer — for example, when Python pandas/PyArrow passes a column that + // was created via slice(), take(), or filter() without a copy, or when + // RecordBatch::Slice() is used to split a batch. In those cases the + // underlying buffer starts at element 0 of the original allocation, so + // all buffer accesses (data, offsets, validity bitmap) must be shifted + // by `off` before reading the `length` visible elements. int64_t off = col_arr->offset; const uint8_t* validity = @@ -837,26 +881,21 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, case common::INT64: case common::FLOAT: case common::DOUBLE: { - // Invert Arrow bitmap (1=valid) to TsFile bitmap (1=null) - const uint8_t* null_bm = nullptr; - uint8_t* inverted_bm = nullptr; - if (validity != nullptr) { - uint32_t bm_bytes = (static_cast(n_rows) + 7) / 8; - inverted_bm = static_cast( - common::mem_alloc(bm_bytes, common::MOD_TSBLOCK)); - if (inverted_bm == nullptr) { - delete tablet; - return common::E_OOM; - } - for (uint32_t b = 0; b < bm_bytes; b++) { - inverted_bm[b] = ~validity[b]; - } - null_bm = inverted_bm; + size_t elem_size = + (dtype == common::INT64 || dtype == common::DOUBLE) ? 8 : 4; + const void* data = + static_cast(col_arr->buffers[1]) + + off * elem_size; + uint8_t* null_bm = InvertArrowBitmap( + validity, off, static_cast(n_rows)); + if (validity != nullptr && null_bm == nullptr) { + delete tablet; + return common::E_OOM; } - tablet->set_column_values(tcol, col_arr->buffers[1], null_bm, + tablet->set_column_values(tcol, data, null_bm, static_cast(n_rows)); - if (inverted_bm != nullptr) { - common::mem_free(inverted_bm); + if (null_bm != nullptr) { + common::mem_free(null_bm); } break; } @@ -877,16 +916,45 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, case common::TEXT: case common::STRING: case common::BLOB: { - const int32_t* offsets = - static_cast(col_arr->buffers[1]); - const char* data = + // set_column_string_values requires offsets[0] == 0. + // When off > 0 (sliced Arrow array), normalize here: shift + // offsets down by base and advance the data pointer + // accordingly. + const int32_t* raw_offsets = + static_cast(col_arr->buffers[1]) + off; + const char* raw_data = static_cast(col_arr->buffers[2]); - for (int64_t r = 0; r < n_rows; r++) { - if (!ArrowIsValid(col_arr, r)) continue; - int32_t start = offsets[off + r]; - int32_t len = offsets[off + r + 1] - start; - tablet->add_value(static_cast(r), tcol, - common::String(data + start, len)); + uint32_t nrows = static_cast(n_rows); + const int32_t* offsets = raw_offsets; + const char* data = raw_data; + int32_t* norm_offsets = nullptr; + if (off > 0) { + int32_t base = raw_offsets[0]; + norm_offsets = static_cast(common::mem_alloc( + (nrows + 1) * sizeof(int32_t), common::MOD_TSBLOCK)); + if (norm_offsets == nullptr) { + delete tablet; + return common::E_OOM; + } + for (uint32_t i = 0; i <= nrows; i++) { + norm_offsets[i] = raw_offsets[i] - base; + } + offsets = norm_offsets; + data = raw_data + base; + } + uint8_t* null_bm = InvertArrowBitmap(validity, off, nrows); + if (validity != nullptr && null_bm == nullptr) { + common::mem_free(norm_offsets); + delete tablet; + return common::E_OOM; + } + tablet->set_column_string_values(tcol, offsets, data, null_bm, + nrows); + if (null_bm != nullptr) { + common::mem_free(null_bm); + } + if (norm_offsets != nullptr) { + common::mem_free(norm_offsets); } break; } diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 786325db5..657fcabc2 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -1412,6 +1412,9 @@ int TsFileWriter::write_table(Tablet& tablet) { } } record_count_since_last_flush_ += tablet.cur_row_size_; + // Reset string column buffers so the tablet can be reused for the next + // batch without accumulating memory across writes. + tablet.reset_string_columns(); ret = check_memory_size_and_may_flush_chunks(); return ret; } @@ -1419,10 +1422,10 @@ int TsFileWriter::write_table(Tablet& tablet) { std::vector, int>> TsFileWriter::split_tablet_by_device(const Tablet& tablet) { std::vector, int>> result; - std::shared_ptr last_device_id = - std::make_shared("last_device_id"); + if (tablet.id_column_indexes_.empty()) { - result.emplace_back(std::move(last_device_id), 0); + auto sentinel = std::make_shared("last_device_id"); + result.emplace_back(std::move(sentinel), 0); std::vector id_array; id_array.push_back(new std::string(tablet.insert_target_name_)); auto res = std::make_shared(id_array); @@ -1431,14 +1434,22 @@ TsFileWriter::split_tablet_by_device(const Tablet& tablet) { return result; } - for (uint32_t i = 0; i < tablet.get_cur_row_size(); i++) { - std::shared_ptr cur_device_id(tablet.get_device_id(i)); - if (*cur_device_id != *last_device_id) { - result.emplace_back(std::move(last_device_id), i); - last_device_id = std::move(cur_device_id); - } + const uint32_t row_count = tablet.get_cur_row_size(); + if (row_count == 0) return result; + + auto sentinel = std::make_shared("last_device_id"); + result.emplace_back(std::move(sentinel), 0); + + auto boundaries = tablet.find_all_device_boundaries(); + + uint32_t seg_start = 0; + for (uint32_t b : boundaries) { + std::shared_ptr dev_id(tablet.get_device_id(seg_start)); + result.emplace_back(std::move(dev_id), b); + seg_start = b; } - result.emplace_back(std::move(last_device_id), tablet.get_cur_row_size()); + std::shared_ptr last_id(tablet.get_device_id(seg_start)); + result.emplace_back(std::move(last_id), row_count); return result; } @@ -1474,7 +1485,7 @@ int TsFileWriter::write_column(ChunkWriter* chunk_writer, const Tablet& tablet, col_notnull_bitmap, start_idx, end_idx); } else if (data_type == common::STRING) { ret = - write_typed_column(chunk_writer, timestamps, col_values.string_data, + write_typed_column(chunk_writer, timestamps, col_values.string_col, col_notnull_bitmap, start_idx, end_idx); } else { ASSERT(false); @@ -1539,8 +1550,8 @@ int TsFileWriter::value_write_column(ValueChunkWriter* value_chunk_writer, case common::TEXT: case common::BLOB: ret = write_typed_column(value_chunk_writer, timestamps, - (common::String*)col_values.string_data, - col_notnull_bitmap, start_idx, end_idx); + col_values.string_col, col_notnull_bitmap, + start_idx, end_idx); break; default: ret = E_NOT_SUPPORT; @@ -1618,10 +1629,22 @@ int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx) { - DO_WRITE_TYPED_COLUMN(); + int ret = E_OK; + for (uint32_t r = start_idx; r < end_idx; r++) { + if (LIKELY(!col_notnull_bitmap.test(r))) { + common::String val( + string_col->buffer + string_col->offsets[r], + static_cast(string_col->offsets[r + 1] - + string_col->offsets[r])); + if (RET_FAIL(chunk_writer->write(timestamps[r], val))) { + return ret; + } + } + } + return ret; } int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, @@ -1661,10 +1684,26 @@ int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx) { - DO_VALUE_WRITE_TYPED_COLUMN(); + int ret = E_OK; + for (uint32_t r = start_idx; r < end_idx; r++) { + common::String val(string_col->buffer + string_col->offsets[r], + static_cast(string_col->offsets[r + 1] - + string_col->offsets[r])); + if (LIKELY(col_notnull_bitmap.test(r))) { + if (RET_FAIL(value_chunk_writer->write(timestamps[r], val, true))) { + return ret; + } + } else { + if (RET_FAIL( + value_chunk_writer->write(timestamps[r], val, false))) { + return ret; + } + } + } + return ret; } // TODO make sure ret is meaningful to SDK user diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index ff7cdbac2..01028e2e2 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -145,7 +145,7 @@ class TsFileWriter { common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); int write_typed_column(ChunkWriter* chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); @@ -206,7 +206,8 @@ class TsFileWriter { common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); int write_typed_column(ValueChunkWriter* value_chunk_writer, - int64_t* timestamps, common::String* col_values, + int64_t* timestamps, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); diff --git a/cpp/test/common/tsblock/arrow_tsblock_test.cc b/cpp/test/common/tsblock/arrow_tsblock_test.cc index 123efb59f..348c18a4a 100644 --- a/cpp/test/common/tsblock/arrow_tsblock_test.cc +++ b/cpp/test/common/tsblock/arrow_tsblock_test.cc @@ -20,6 +20,7 @@ #include +#include "common/tablet.h" #include "common/tsblock/tsblock.h" #include "cwrapper/tsfile_cwrapper.h" #include "utils/db_utils.h" @@ -34,9 +35,13 @@ using ArrowSchema = ::ArrowSchema; #define ARROW_FLAG_NULLABLE 2 #define ARROW_FLAG_MAP_KEYS_SORTED 4 -// Function declaration (defined in arrow_c.cc) +// Function declarations (defined in arrow_c.cc) int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, ArrowSchema* out_schema); +int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, + const ArrowSchema* in_schema, + const storage::TableSchema* reg_schema, + storage::Tablet** out_tablet, int time_col_index); } // namespace arrow static void VerifyArrowSchema( @@ -332,3 +337,152 @@ TEST(ArrowTsBlockTest, TsBlock_EdgeCases) { } } } + +// Test ArrowStructToTablet with sliced Arrow arrays (offset > 0). +// Full arrays have 5 rows; offset=2 on every child means only rows [2..4] +// (3 rows) are consumed. Row index 3 in the full array (local index 1 in the +// slice) carries a null in the INT32 column. +TEST(ArrowStructToTabletTest, SlicedArray_WithOffset) { + // --- timestamps (int64, no nulls) --- + int64_t ts_data[5] = {1000, 1001, 1002, 1003, 1004}; + const void* ts_bufs[2] = {nullptr, ts_data}; + ArrowArray ts_arr = {}; + ts_arr.length = 3; + ts_arr.offset = 2; + ts_arr.null_count = 0; + ts_arr.n_buffers = 2; + ts_arr.buffers = ts_bufs; + + ArrowSchema ts_schema = {}; + ts_schema.format = "l"; + ts_schema.name = "time"; + ts_schema.flags = ARROW_FLAG_NULLABLE; + + // --- INT32 column: values [100..104], row 3 (global) = local row 1 null + // Arrow validity bitmap: bit=1 means valid. + // bits 0,1,2,4=valid, bit 3=null → byte 0 = 0b00010111 = 0x17 + int32_t int_data[5] = {100, 101, 102, 103, 104}; + uint8_t int_validity[1] = {0x17}; + const void* int_bufs[2] = {int_validity, int_data}; + ArrowArray int_arr = {}; + int_arr.length = 3; + int_arr.offset = 2; + int_arr.null_count = 1; + int_arr.n_buffers = 2; + int_arr.buffers = int_bufs; + + ArrowSchema int_schema = {}; + int_schema.format = "i"; + int_schema.name = "int_col"; + int_schema.flags = ARROW_FLAG_NULLABLE; + + // --- DOUBLE column: values [10.0..14.0], no nulls --- + double dbl_data[5] = {10.0, 11.0, 12.0, 13.0, 14.0}; + const void* dbl_bufs[2] = {nullptr, dbl_data}; + ArrowArray dbl_arr = {}; + dbl_arr.length = 3; + dbl_arr.offset = 2; + dbl_arr.null_count = 0; + dbl_arr.n_buffers = 2; + dbl_arr.buffers = dbl_bufs; + + ArrowSchema dbl_schema = {}; + dbl_schema.format = "g"; + dbl_schema.name = "dbl_col"; + dbl_schema.flags = ARROW_FLAG_NULLABLE; + + // --- UTF-8 string column: "str0".."str4", no nulls --- + // With offset=2, the slice covers "str2","str3","str4". + const char str_chars[] = "str0str1str2str3str4"; + int32_t str_offs[6] = {0, 4, 8, 12, 16, 20}; + const void* str_bufs[3] = {nullptr, str_offs, str_chars}; + ArrowArray str_arr = {}; + str_arr.length = 3; + str_arr.offset = 2; + str_arr.null_count = 0; + str_arr.n_buffers = 3; + str_arr.buffers = str_bufs; + + ArrowSchema str_schema = {}; + str_schema.format = "u"; + str_schema.name = "str_col"; + str_schema.flags = ARROW_FLAG_NULLABLE; + + // --- parent struct array --- + ArrowArray* children[4] = {&ts_arr, &int_arr, &dbl_arr, &str_arr}; + ArrowArray parent = {}; + parent.length = 3; + parent.n_buffers = 0; + parent.n_children = 4; + parent.children = children; + + ArrowSchema* child_schemas[4] = {&ts_schema, &int_schema, &dbl_schema, + &str_schema}; + ArrowSchema parent_schema = {}; + parent_schema.format = "+s"; + parent_schema.n_children = 4; + parent_schema.children = child_schemas; + + storage::Tablet* tablet = nullptr; + // time_col_index=0 → timestamp from ts_arr; data cols are int, dbl, str + int ret = arrow::ArrowStructToTablet("test_table", &parent, &parent_schema, + nullptr, &tablet, 0); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tablet, nullptr); + + EXPECT_EQ(tablet->get_cur_row_size(), 3u); + + common::TSDataType dtype; + void* v; + + // INT32 col (schema_index=0): local rows 0,1,2 → 102, null, 104 + v = tablet->get_value(0, 0, dtype); + ASSERT_NE(v, nullptr); + EXPECT_EQ(*static_cast(v), 102); + + v = tablet->get_value(1, 0, dtype); + EXPECT_EQ(v, nullptr); // row 3 in original data is null + + v = tablet->get_value(2, 0, dtype); + ASSERT_NE(v, nullptr); + EXPECT_EQ(*static_cast(v), 104); + + // DOUBLE col (schema_index=1): local rows 0,1,2 → 12.0, 13.0, 14.0 + v = tablet->get_value(0, 1, dtype); + ASSERT_NE(v, nullptr); + EXPECT_DOUBLE_EQ(*static_cast(v), 12.0); + + v = tablet->get_value(1, 1, dtype); + ASSERT_NE(v, nullptr); + EXPECT_DOUBLE_EQ(*static_cast(v), 13.0); + + v = tablet->get_value(2, 1, dtype); + ASSERT_NE(v, nullptr); + EXPECT_DOUBLE_EQ(*static_cast(v), 14.0); + + // STRING col (schema_index=2): local rows 0,1,2 → "str2","str3","str4" + // Arrow "u" maps to common::TEXT; offset normalization in arrow_c.cc + // ensures offsets[0]==0 before calling set_column_string_values. + v = tablet->get_value(0, 2, dtype); + ASSERT_NE(v, nullptr); + { + common::String* s = static_cast(v); + EXPECT_EQ(std::string(s->buf_, s->len_), "str2"); + } + + v = tablet->get_value(1, 2, dtype); + ASSERT_NE(v, nullptr); + { + common::String* s = static_cast(v); + EXPECT_EQ(std::string(s->buf_, s->len_), "str3"); + } + + v = tablet->get_value(2, 2, dtype); + ASSERT_NE(v, nullptr); + { + common::String* s = static_cast(v); + EXPECT_EQ(std::string(s->buf_, s->len_), "str4"); + } + + delete tablet; +}