Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 142 additions & 16 deletions cpp/src/common/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,15 @@ int Tablet::init() {
case BLOB:
case TEXT:
case STRING: {
value_matrix_[c].string_data =
static_cast<common::String*>(common::mem_alloc(
sizeof(String) * max_row_num_, common::MOD_TABLET));
if (value_matrix_[c].string_data == nullptr) return E_OOM;
auto* sc = static_cast<StringColumn*>(common::mem_alloc(
sizeof(StringColumn), common::MOD_TABLET));
if (sc == nullptr) return E_OOM;
new (sc) StringColumn();
// 8 bytes/row is a conservative initial estimate for short
// string columns (e.g. device IDs, tags). The buffer grows
// automatically on demand via mem_realloc.
sc->init(max_row_num_, max_row_num_ * 8);
value_matrix_[c].string_col = sc;
break;
}
default:
Expand Down Expand Up @@ -150,7 +155,8 @@ void Tablet::destroy() {
case BLOB:
case TEXT:
case STRING:
common::mem_free(value_matrix_[c].string_data);
value_matrix_[c].string_col->destroy();
common::mem_free(value_matrix_[c].string_col);
break;
default:
break;
Expand Down Expand Up @@ -240,17 +246,51 @@ int Tablet::set_column_values(uint32_t schema_index, const void* data,
return E_TYPE_NOT_SUPPORTED;
}

std::memcpy(dst, data, count * elem_size);
if (bitmap == nullptr) {
// All valid: bulk copy + mark all as non-null
std::memcpy(dst, data, count * elem_size);
bitmaps_[schema_index].clear_all();
} else {
// Bulk copy all data (null positions will have garbage but won't be
// read).
std::memcpy(dst, data, count * elem_size);
char* tsfile_bm = bitmaps_[schema_index].get_bitmap();
uint32_t bm_bytes = (count + 7) / 8;
std::memcpy(tsfile_bm, bitmap, bm_bytes);
}
cur_row_size_ = std::max(count, cur_row_size_);
return E_OK;
}

int Tablet::set_column_string_values(uint32_t schema_index,
const int32_t* offsets, const char* data,
const uint8_t* bitmap, uint32_t count) {
if (err_code_ != E_OK) {
return err_code_;
}
if (UNLIKELY(schema_index >= schema_vec_->size())) {
return E_OUT_OF_RANGE;
}
if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_))) {
return E_OUT_OF_RANGE;
}

StringColumn* sc = value_matrix_[schema_index].string_col;
if (sc == nullptr) {
return E_INVALID_ARG;
}

uint32_t total_bytes = static_cast<uint32_t>(offsets[count]);
if (total_bytes > sc->buf_capacity) {
sc->buf_capacity = total_bytes;
sc->buffer = (char*)mem_realloc(sc->buffer, sc->buf_capacity);
}

// bitmap uses TsFile convention (1=null, 0=valid), same as
// internal BitMap, so copy directly.
if (total_bytes > 0) {
std::memcpy(sc->buffer, data, total_bytes);
}
std::memcpy(sc->offsets, offsets, (count + 1) * sizeof(int32_t));
sc->buf_used = total_bytes;

if (bitmap == nullptr) {
bitmaps_[schema_index].clear_all();
} else {
char* tsfile_bm = bitmaps_[schema_index].get_bitmap();
uint32_t bm_bytes = (count + 7) / 8;
std::memcpy(tsfile_bm, bitmap, bm_bytes);
Expand Down Expand Up @@ -292,9 +332,10 @@ void* Tablet::get_value(int row_index, uint32_t schema_index,
double* double_values = column_values.double_data;
return &double_values[row_index];
}
case TEXT:
case BLOB:
case STRING: {
auto string_values = column_values.string_data;
return &string_values[row_index];
return &column_values.string_col->get_string_view(row_index);
}
default:
return nullptr;
Expand All @@ -304,8 +345,8 @@ void* Tablet::get_value(int row_index, uint32_t schema_index,
// Specialization of process_val for common::String values: stores `str` for
// row `row_index` of column `schema_index`, then clears that row's bit in the
// column's bitmap so the cell is treated as non-null.
// NOTE(review): this span contains two stores — one through
// string_data[...].dup_from(...) and one through string_col->append(...).
// They look like the before/after sides of the same change; confirm which
// statement belongs in the final file.
template <>
void Tablet::process_val(uint32_t row_index, uint32_t schema_index,
common::String str) {
value_matrix_[schema_index].string_data[row_index].dup_from(str,
page_arena_);
value_matrix_[schema_index].string_col->append(row_index, str.buf_,
str.len_);
bitmaps_[schema_index].clear(row_index); /* mark as non-null */
}

Expand Down Expand Up @@ -450,6 +491,91 @@ void Tablet::set_column_categories(
}
}

// Rewind every string-typed column (STRING, TEXT, BLOB) by calling reset()
// on its StringColumn. Columns of any other data type are left untouched.
void Tablet::reset_string_columns() {
    const size_t column_total = schema_vec_->size();
    for (size_t col = 0; col < column_total; col++) {
        switch (schema_vec_->at(col).data_type_) {
            case STRING:
            case TEXT:
            case BLOB:
                value_matrix_[col].string_col->reset();
                break;
            default:
                // Not a string column: nothing to reset.
                break;
        }
    }
}

// Find all row indices where the device ID changes. A device ID is the
// composite key formed by all id columns (e.g. region + sensor_id). Row i
// is a boundary when at least one id column differs between row i-1 and row i.
//
// Example (2 id columns: region, sensor_id):
//   row 0: "A", "s1"
//   row 1: "A", "s2"   <- boundary: sensor_id changed
//   row 2: "B", "s1"   <- boundary: region changed
//   row 3: "B", "s1"
//   row 4: "B", "s2"   <- boundary: sensor_id changed
//   result: [1, 2, 4]
//
// Boundaries are computed in one shot at flush time rather than maintained
// incrementally during add_value / set_column_*. The inner loop is a memcmp
// scan over each id column's contiguous offsets/buffer pair.
//
// Returns the boundary row indices in ascending order; the result is empty
// when the tablet holds at most one row or no id column ever changes.
std::vector<uint32_t> Tablet::find_all_device_boundaries() const {
    const uint32_t row_count = get_cur_row_size();
    if (row_count <= 1) return {};

    // One bit per row; bit i set means "row i starts a new device".
    const uint32_t nwords = (row_count + 63) / 64;
    std::vector<uint64_t> boundary(nwords, 0);

    uint32_t boundary_count = 0;
    // Rows 1..row_count-1 are the only candidates, so once that many bits
    // are set every remaining comparison is redundant.
    const uint32_t max_boundaries = row_count - 1;
    for (auto it = id_column_indexes_.rbegin(); it != id_column_indexes_.rend();
         ++it) {
        const StringColumn& sc = *value_matrix_[*it].string_col;
        const int32_t* off = sc.offsets;
        const char* buf = sc.buffer;
        for (uint32_t i = 1; i < row_count; i++) {
            // Already marked by a previously scanned id column.
            if (boundary[i >> 6] & (1ULL << (i & 63))) continue;
            int32_t len_a = off[i] - off[i - 1];
            int32_t len_b = off[i + 1] - off[i];
            if (len_a != len_b ||
                (len_a > 0 && memcmp(buf + off[i - 1], buf + off[i],
                                     static_cast<uint32_t>(len_a)) != 0)) {
                boundary[i >> 6] |= (1ULL << (i & 63));
                if (++boundary_count >= max_boundaries) break;
            }
        }
        if (boundary_count >= max_boundaries) break;
    }

    // Sweep the bitmap word by word, extracting set bit positions in order.
    // Each word covers 64 consecutive rows: word w covers rows [w*64, w*64+63].
    //
    // For each word we use two standard bit tricks:
    //   __builtin_ctzll(bits) - count trailing zeros = index of lowest set bit
    //   bits &= bits - 1      - clear the lowest set bit
    //
    // Example: w=1, bits=0b...00010100 (bits 2 and 4 set)
    //   iter 1: ctzll=2 -> idx=1*64+2=66, bits becomes 0b...00010000
    //   iter 2: ctzll=4 -> idx=1*64+4=68, bits becomes 0b...00000000 -> exit
    //
    // Guards: idx>0 because row 0 can never be a boundary (no predecessor);
    // idx<row_count trims padding bits in the last word when row_count%64 != 0.
    std::vector<uint32_t> result;
    // boundary_count is exactly the number of set bits, so one allocation
    // is enough for the whole sweep.
    result.reserve(boundary_count);
    for (uint32_t w = 0; w < nwords; w++) {
        uint64_t bits = boundary[w];
        while (bits) {
            uint32_t bit = __builtin_ctzll(bits);
            uint32_t idx = w * 64 + bit;
            if (idx > 0 && idx < row_count) {
                result.push_back(idx);
            }
            bits &= bits - 1;
        }
    }
    return result;
}

std::shared_ptr<IDeviceID> Tablet::get_device_id(int i) const {
std::vector<std::string*> id_array;
id_array.push_back(new std::string(insert_target_name_));
Expand Down
80 changes: 78 additions & 2 deletions cpp/src/common/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,79 @@ class TabletColIterator;
* with their associated metadata such as column names and types.
*/
class Tablet {
// Arrow-style string column: offsets + contiguous buffer.
// string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i]
//
// Offsets are stored as int32_t, so a single column can address at most
// 0x7FFFFFFF bytes of string data; append() refuses writes beyond that.
struct StringColumn {
    int32_t* offsets;       // length: max_rows + 1 (Arrow-compatible)
    char* buffer;           // contiguous string data
    uint32_t buf_capacity;  // allocated buffer size
    uint32_t buf_used;      // bytes written so far

    StringColumn()
        : offsets(nullptr), buffer(nullptr), buf_capacity(0), buf_used(0) {}

    // Allocate the offsets array (max_rows + 1 entries) and an initial data
    // buffer of init_buf_capacity bytes.
    // Returns true on success. On allocation failure everything allocated
    // so far is released, the column is left empty (null pointers, zero
    // sizes) and false is returned. Callers that ignore the result keep
    // compiling, but should check it before using the column.
    bool init(uint32_t max_rows, uint32_t init_buf_capacity) {
        buffer = nullptr;
        buf_capacity = 0;
        buf_used = 0;
        offsets = static_cast<int32_t*>(common::mem_alloc(
            sizeof(int32_t) * (max_rows + 1), common::MOD_DEFAULT));
        if (offsets == nullptr) {
            return false;
        }
        offsets[0] = 0;
        buffer = static_cast<char*>(
            common::mem_alloc(init_buf_capacity, common::MOD_DEFAULT));
        // A null result is only an error when bytes were actually requested.
        if (buffer == nullptr && init_buf_capacity > 0) {
            destroy();
            return false;
        }
        buf_capacity = (buffer != nullptr) ? init_buf_capacity : 0;
        return true;
    }

    // Release both allocations and return to the empty state. Safe to call
    // more than once.
    void destroy() {
        if (offsets) common::mem_free(offsets);
        offsets = nullptr;
        if (buffer) common::mem_free(buffer);
        buffer = nullptr;
        buf_capacity = buf_used = 0;
    }

    // Forget all written bytes but keep the allocations for reuse.
    void reset() {
        buf_used = 0;
        if (offsets) offsets[0] = 0;
    }

    // Append len bytes from data as the value of `row`, writing offsets[row]
    // and offsets[row + 1]. The caller must keep row below the max_rows
    // passed to init().
    // Returns true on success. Returns false, leaving the column unchanged,
    // when the column is uninitialized, the total would exceed the int32
    // offset range, or the buffer cannot be grown.
    bool append(uint32_t row, const char* data, uint32_t len) {
        if (offsets == nullptr) {
            return false;
        }
        // Largest byte count representable by an int32_t offset.
        const uint64_t max_bytes = 0x7FFFFFFFULL;
        // 64-bit arithmetic so buf_used + len cannot wrap around.
        const uint64_t needed = static_cast<uint64_t>(buf_used) + len;
        if (needed > max_bytes) {
            return false;
        }
        // Grow buffer if needed
        if (needed > buf_capacity) {
            uint64_t grown = static_cast<uint64_t>(buf_capacity) * 2 + len;
            if (grown > max_bytes) grown = max_bytes;
            const uint32_t new_capacity = static_cast<uint32_t>(grown);
            // Keep the old pointer until the new one is known to be valid.
            // NOTE(review): assumes mem_realloc leaves the original block
            // intact on failure, like realloc — confirm.
            char* resized =
                (buffer == nullptr)
                    ? static_cast<char*>(common::mem_alloc(
                          new_capacity, common::MOD_DEFAULT))
                    : static_cast<char*>(
                          common::mem_realloc(buffer, new_capacity));
            if (resized == nullptr) {
                return false;
            }
            buffer = resized;
            buf_capacity = new_capacity;
        }
        // Skip the copy for empty strings: data and buffer may both be null.
        if (len > 0) {
            memcpy(buffer + buf_used, data, len);
        }
        offsets[row] = static_cast<int32_t>(buf_used);
        offsets[row + 1] = static_cast<int32_t>(needed);
        buf_used = static_cast<uint32_t>(needed);
        return true;
    }

    // Pointer to the first byte of row's string (not NUL-terminated).
    const char* get_str(uint32_t row) const {
        return buffer + offsets[row];
    }
    // Length in bytes of row's string.
    uint32_t get_len(uint32_t row) const {
        return static_cast<uint32_t>(offsets[row + 1] - offsets[row]);
    }
    // Return a String view for a given row. The returned reference is
    // valid until the next call to get_string_view on this column.
    common::String& get_string_view(uint32_t row) {
        view_cache_.buf_ = buffer + offsets[row];
        view_cache_.len_ =
            static_cast<uint32_t>(offsets[row + 1] - offsets[row]);
        return view_cache_;
    }

   private:
    // Scratch object handed out by get_string_view().
    common::String view_cache_;
};

// Storage handle for one column of the tablet's value matrix. All members
// share a single pointer slot (union); the code that reads or writes a column
// selects the member from that column's data type.
// NOTE(review): both string_data and string_col are listed here. They look
// like the old and new string representations from the same change — confirm
// which one belongs in the final file.
struct ValueMatrixEntry {
union {
int32_t* int32_data;
int64_t* int64_data;
float* float_data;
double* double_data;
bool* bool_data;
common::String* string_data;
StringColumn* string_col;  // offsets + contiguous buffer string column
};
};

Expand Down Expand Up @@ -220,6 +285,16 @@ class Tablet {
void set_column_categories(
const std::vector<common::ColumnCategory>& column_categories);
std::shared_ptr<IDeviceID> get_device_id(int i) const;
std::vector<uint32_t> find_all_device_boundaries() const;

// Bulk copy string column data (offsets + data buffer).
// offsets has count+1 entries and must start from 0 (offsets[0] == 0).
// bitmap follows TsFile convention (bit=1 means null, nullptr means all
// valid). Callers using Arrow convention (bit=1 means valid) must invert
// before calling.
int set_column_string_values(uint32_t schema_index, const int32_t* offsets,
const char* data, const uint8_t* bitmap,
uint32_t count);
/**
* @brief Template function to add a value of type T to the specified row
* and column by name.
Expand Down Expand Up @@ -253,6 +328,8 @@ class Tablet {
schema_map_ = schema_map;
}

void reset_string_columns();

friend class TabletColIterator;
friend class TsFileWriter;
friend struct MeasurementNamesFromTablet;
Expand All @@ -265,7 +342,6 @@ class Tablet {
private:
template <typename T>
void process_val(uint32_t row_index, uint32_t schema_index, T val);
common::PageArena page_arena_{common::MOD_TABLET};
uint32_t max_row_num_;
uint32_t cur_row_size_;
std::string insert_target_name_;
Expand Down
Loading
Loading