diff --git a/quickwit/quickwit-parquet-engine/src/merge/mod.rs b/quickwit/quickwit-parquet-engine/src/merge/mod.rs index 008c456208a..fcc353992a9 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/mod.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/mod.rs @@ -24,6 +24,7 @@ mod merge_order; pub mod metadata_aggregation; pub mod policy; mod schema; +pub mod streaming; mod writer; #[cfg(test)] diff --git a/quickwit/quickwit-parquet-engine/src/merge/streaming.rs b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs new file mode 100644 index 00000000000..609233ed6c4 --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs @@ -0,0 +1,2245 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Streaming column-major merge engine with page-bounded body cols. +//! +//! Architecture (Husky multi-input → multi-output sorted merge): +//! +//! 1. **Phase 0 (async): drain sort cols** from each input. With Husky +//! column ordering, sort cols + `sorted_series` are the prefix of +//! each row group's body bytes, so we can stop the decoder after +//! those are fully decoded. The remaining body col pages stay +//! un-read in the input stream, ready for phase 3. +//! 2. **Phase 1: compute merge order** via the existing k-way merge +//! on `(sorted_series, timestamp_secs)` from the per-input sort +//! col [`RecordBatch`]es. Produces a run-length-encoded merge +//! plan over input row positions. +//! 
3. **Phase 2: compute output boundaries** with the caller's +//! `num_outputs`, splitting at `sorted_series` transitions so each +//! output file's key range is non-overlapping with adjacent files. +//! 4. **Phase 3 (blocking + block_on bridges): streaming write**. +//! All output writers are alive for the duration. For each column +//! in Husky order, every output's col K is written in turn: +//! - Sort col / `sorted_series`: applied via `take` from the +//! already-buffered phase 0 data. +//! - Body col: each output page is assembled via +//! [`arrow::compute::interleave`] from input page slices, with +//! decoders advanced page-by-page via `Handle::block_on` from +//! inside a sync iterator. Pages flush to the writer's sink as +//! [`SerializedColumnWriter`]'s page-size threshold trips — +//! memory stays bounded by the in-flight output page plus a +//! small number of in-flight input pages. +//! +//! After all M outputs' col K is done, every input decoder is at the +//! start of col K+1 in its single row group. Move to col K+1. +//! +//! ## Single-RG inputs assumption +//! +//! PR-6b.2 only handles **single-row-group inputs**. With multi-RG +//! inputs the body bytes interleave with successive RGs' sort cols +//! (`sort_cols_RG0`, `body_cols_RG0`, `sort_cols_RG1`, ...), so +//! draining sort cols from RG1 onwards requires either consuming + +//! discarding body cols of RG0 from the stream or buffering them. +//! Neither fits the page-bounded contract; multi-RG-input streaming +//! lands in a follow-up. Today's real inputs are: (a) post-PR-3 +//! single-RG ingest splits, or (b) PR-5's legacy adapter that +//! presents arbitrary multi-RG splits as one synthetic RG. Both +//! satisfy the assumption. +//! +//! 
[`SerializedColumnWriter`]: parquet::file::writer::SerializedColumnWriter + +#![allow(dead_code)] + +use std::collections::{HashMap, HashSet}; +use std::ops::Range; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use anyhow::{Context, Result, anyhow, bail}; +use arrow::array::{Array, ArrayRef, RecordBatch, new_null_array}; +use arrow::compute::interleave; +use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef}; +use parquet::file::metadata::ParquetMetaData; +use tokio::runtime::Handle; +use tracing::info; +use ulid::Ulid; + +use super::merge_order::{MergeRun, compute_merge_order, compute_output_boundaries}; +use super::schema::{align_inputs_to_union_schema, optimize_output_batch}; +use super::writer::{ + apply_merge_permutation, build_merge_kv_metadata, build_sorting_columns, + resolve_sort_field_names, verify_sort_order, +}; +use super::{InputMetadata, MergeConfig, MergeOutputFile}; +use crate::row_keys; +use crate::sort_fields::{ + equivalent_schemas_for_compaction, is_timestamp_column_name, parse_sort_fields, +}; +use crate::sorted_series::SORTED_SERIES_COLUMN; +use crate::split::TAG_SERVICE; +use crate::storage::page_decoder::{DecodedPage, StreamDecoder}; +use crate::storage::split_writer::{extract_metric_names, extract_time_range}; +use crate::storage::streaming_writer::StreamingParquetWriter; +use crate::storage::{ + ColumnPageStream, PARQUET_META_NUM_MERGE_OPS, PARQUET_META_RG_PARTITION_PREFIX_LEN, + PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START, +}; +use crate::zonemap::{self, ZonemapOptions}; + +/// Output page size in rows for body-col assembly. Each call to the +/// sync iterator passed to [`write_next_column_arrays`] yields one +/// `ArrayRef` of up to this many rows; the parquet writer flushes +/// physical pages independently as encoded bytes cross +/// `data_page_size_limit`. 
1024 keeps assembled arrays small enough +/// to bound per-output memory but large enough to amortise per-page +/// fixed costs. +/// +/// [`write_next_column_arrays`]: crate::storage::streaming_writer::RowGroupBuilder::write_next_column_arrays +const OUTPUT_PAGE_ROWS: usize = 1024; + +/// Streaming N-input → M-output column-major merge. +/// +/// See module docs for the four phases. Returns one +/// [`MergeOutputFile`] per output file produced (zero-row outputs are +/// dropped). Caller's `config.num_outputs` is the upper bound on the +/// number of files; fewer are returned when there are not enough +/// `sorted_series` transitions to split at. +pub async fn streaming_merge_sorted_parquet_files( + inputs: Vec>, + output_dir: &Path, + config: &MergeConfig, +) -> Result> { + if inputs.is_empty() { + bail!("merge requires at least one input"); + } + if config.num_outputs == 0 { + bail!("num_outputs must be at least 1"); + } + + // Validate that all inputs are single-RG (or zero-RG, which means + // the file has no data). PR-6b.2 simplification — see module docs. + for (idx, stream) in inputs.iter().enumerate() { + let num_rgs = stream.metadata().num_row_groups(); + if num_rgs > 1 { + bail!( + "streaming merge requires single-row-group inputs in PR-6b.2 (input {idx} has \ + {num_rgs} row groups); multi-RG metric-aligned inputs land in a follow-up. \ + Legacy multi-RG (rg_partition_prefix_len=0) inputs must go through the \ + PR-5 adapter, which presents them as a single synthetic row group." + ); + } + } + + let input_meta = extract_and_validate_input_metadata(&inputs)?; + + info!( + num_inputs = inputs.len(), + num_outputs = config.num_outputs, + sort_fields = %input_meta.sort_fields, + "starting streaming sorted parquet merge" + ); + + let output_dir = output_dir.to_path_buf(); + let writer_config = config.writer_config.clone(); + let num_outputs = config.num_outputs; + + // Move everything onto a blocking task. 
Inside, the decoders need to + // make async I/O calls (page fetches over the network); we drive + // those via `handle.block_on(...)` from inside sync iterators that + // feed the parquet writer's column-write methods. The writer is + // sync; this single-task pattern avoids the lifetime complexity of + // moving borrowed `RowGroupBuilder`s across tokio tasks. + let result = tokio::task::spawn_blocking(move || -> Result> { + let handle = Handle::current(); + + let mut inputs = inputs; + let mut decoders_state = build_input_decoders_state(&mut inputs)?; + + // Phase 0 + let sort_col_batches = + drain_sort_cols_all_inputs(&handle, &mut decoders_state, &input_meta.sort_fields)?; + + if sort_col_batches.iter().all(|b| b.num_rows() == 0) { + info!("all inputs empty, producing no output"); + return Ok(Vec::new()); + } + + // Phase 1: align inputs to a union sort-col schema so the merge-order + // comparator sees uniformly-typed `sorted_series` + `timestamp_secs`. + let (sort_union_schema, aligned_sort_batches) = + align_inputs_to_union_schema(&sort_col_batches, &input_meta.sort_fields)?; + let merge_order = compute_merge_order(&aligned_sort_batches, &input_meta.sort_fields)?; + + // Phase 2: split merge order into M outputs at sorted_series boundaries. + let boundaries = + compute_output_boundaries(&merge_order, &aligned_sort_batches, num_outputs)?; + + let total_rows: usize = aligned_sort_batches.iter().map(|b| b.num_rows()).sum(); + info!( + total_rows, + num_outputs = boundaries.len(), + "streaming merge order computed" + ); + + // Pre-compute per-input row → (output_idx, output_position) destination map. + // Used by every column write to slice take/interleave indices per page. 
+ let destinations = + build_input_row_destinations(&aligned_sort_batches, &merge_order, &boundaries); + + // Phase 3 + let outputs = write_streaming_outputs( + &handle, + &mut decoders_state, + &aligned_sort_batches, + &sort_union_schema, + &merge_order, + &boundaries, + &destinations, + &input_meta, + &writer_config, + &output_dir, + )?; + + // MC-1: total row count preserved. + let output_total: usize = outputs.iter().map(|o| o.num_rows).sum(); + quickwit_dst::check_invariant!( + quickwit_dst::invariants::InvariantId::MC1, + output_total == total_rows, + ": streaming merge input rows={}, output rows={}", + total_rows, + output_total, + ); + + Ok(outputs) + }) + .await + .context("streaming merge blocking task panicked")??; + + Ok(result) +} + +/// Per-input state held across phase 0 → phase 3 inside the blocking +/// task. The decoder borrows mutably from its input stream; both live +/// here so the borrow checker is happy with one struct per input. +struct InputDecoderState { + /// Owned stream (so the decoder's `&mut dyn ColumnPageStream` borrow + /// is anchored to a stable address inside this struct). + stream: Box, + metadata: Arc, + /// Arrow schema of this input (from parquet → arrow conversion). + arrow_schema: SchemaRef, +} + +/// Build per-input state. The streams are moved in from the caller. +fn build_input_decoders_state( + inputs: &mut Vec>, +) -> Result> { + let mut states = Vec::with_capacity(inputs.len()); + for stream in inputs.drain(..) { + let metadata = Arc::clone(stream.metadata()); + let parquet_schema = metadata.file_metadata().schema_descr(); + let arrow_schema = parquet::arrow::parquet_to_arrow_schema(parquet_schema, None) + .context("converting parquet schema → arrow")?; + states.push(InputDecoderState { + stream, + metadata, + arrow_schema: Arc::new(arrow_schema), + }); + } + Ok(states) +} + +/// Extract sort schema, window, and merge-ops metadata from each +/// input stream and validate consistency across inputs. 
Reads +/// `qh.*` KVs from [`ColumnPageStream::metadata`]. +fn extract_and_validate_input_metadata( + inputs: &[Box], +) -> Result { + let mut consensus_sort_fields: Option = None; + let mut consensus_window_start: Option> = None; + let mut consensus_window_duration: Option = None; + let mut consensus_prefix_len: Option = None; + let mut max_merge_ops: u32 = 0; + + for (idx, stream) in inputs.iter().enumerate() { + let metadata = stream.metadata(); + let kv_metadata = metadata.file_metadata().key_value_metadata(); + + let find_kv = |key: &str| -> Option { + kv_metadata.and_then(|kvs| { + kvs.iter() + .find(|kv| kv.key == key) + .and_then(|kv| kv.value.clone()) + }) + }; + + let file_sort_fields = match find_kv(PARQUET_META_SORT_FIELDS) { + Some(s) => s, + None => bail!( + "input {idx} is missing {} metadata", + PARQUET_META_SORT_FIELDS, + ), + }; + + match &consensus_sort_fields { + Some(expected) => { + let expected_schema = parse_sort_fields(expected)?; + let file_schema = parse_sort_fields(&file_sort_fields).with_context(|| { + format!("parsing sort schema from input {idx}: '{file_sort_fields}'") + })?; + if !equivalent_schemas_for_compaction(&expected_schema, &file_schema) { + bail!( + "sort schema mismatch in input {idx}: expected '{expected}', found \ + '{file_sort_fields}'", + ); + } + } + None => { + parse_sort_fields(&file_sort_fields).with_context(|| { + format!("parsing sort schema from input {idx}: '{file_sort_fields}'") + })?; + consensus_sort_fields = Some(file_sort_fields.clone()); + } + } + + let file_window_start = find_kv(PARQUET_META_WINDOW_START) + .map(|s| s.parse::()) + .transpose() + .with_context(|| format!("parsing window_start from input {idx}"))?; + match &consensus_window_start { + Some(expected) if file_window_start != *expected => { + bail!( + "window_start mismatch in input {idx}: expected {:?}, found {:?}", + expected, + file_window_start, + ); + } + Some(_) => {} + None => consensus_window_start = Some(file_window_start), + } + + 
let file_window_duration = find_kv(PARQUET_META_WINDOW_DURATION) + .map(|s| s.parse::()) + .transpose() + .with_context(|| format!("parsing window_duration from input {idx}"))? + .unwrap_or(0); + match &consensus_window_duration { + Some(expected) if file_window_duration != *expected => { + bail!( + "window_duration_secs mismatch in input {idx}: expected {expected}, found \ + {file_window_duration}", + ); + } + Some(_) => {} + None => consensus_window_duration = Some(file_window_duration), + } + + let file_merge_ops = find_kv(PARQUET_META_NUM_MERGE_OPS) + .map(|s| s.parse::()) + .transpose() + .with_context(|| format!("parsing num_merge_ops from input {idx}"))? + .unwrap_or(0); + if file_merge_ops > max_merge_ops { + max_merge_ops = file_merge_ops; + } + + let file_prefix_len = find_kv(PARQUET_META_RG_PARTITION_PREFIX_LEN) + .map(|s| s.parse::()) + .transpose() + .with_context(|| format!("parsing rg_partition_prefix_len from input {idx}"))? + .unwrap_or(0); + match &consensus_prefix_len { + Some(expected) if file_prefix_len != *expected => { + bail!( + "rg_partition_prefix_len mismatch in input {idx}: expected {expected}, found \ + {file_prefix_len}", + ); + } + Some(_) => {} + None => consensus_prefix_len = Some(file_prefix_len), + } + } + + let sort_fields = match consensus_sort_fields { + Some(s) => s, + None => bail!("at least one input is required"), + }; + + Ok(InputMetadata { + sort_fields, + window_start_secs: consensus_window_start.unwrap_or(None), + window_duration_secs: consensus_window_duration.unwrap_or(0), + num_merge_ops: max_merge_ops + 1, + rg_partition_prefix_len: consensus_prefix_len.unwrap_or(0), + }) +} + +// ============================================================================ +// Phase 0: drain sort cols from each input +// ============================================================================ + +/// Drive each input's decoder via `block_on` until its sort cols + +/// `sorted_series` are fully decoded for the (single) row group. 
+/// Returns one [`RecordBatch`] per input with just those columns; the +/// rest of each input's body bytes stay un-read in the stream, ready +/// for phase 3 to consume page-by-page. +fn drain_sort_cols_all_inputs( + handle: &Handle, + decoders_state: &mut [InputDecoderState], + sort_fields_str: &str, +) -> Result> { + let mut batches = Vec::with_capacity(decoders_state.len()); + for (idx, state) in decoders_state.iter_mut().enumerate() { + let batch = drain_sort_cols_one_input(handle, state, sort_fields_str, idx)?; + if batch.num_columns() > 0 && batch.schema().index_of(SORTED_SERIES_COLUMN).is_err() { + bail!( + "input {idx} is missing the '{}' column required for merge", + SORTED_SERIES_COLUMN, + ); + } + batches.push(batch); + } + Ok(batches) +} + +fn drain_sort_cols_one_input( + handle: &Handle, + state: &mut InputDecoderState, + sort_fields_str: &str, + input_idx: usize, +) -> Result { + if state.metadata.num_row_groups() == 0 { + // Empty input — no rows to drain. Return a zero-row batch with the + // sort cols' fields preserved so downstream merge order code sees a + // uniform schema across inputs. + return empty_sort_col_record_batch(state, sort_fields_str); + } + let sort_field_schema = parse_sort_fields(sort_fields_str)?; + + // The set of column names we treat as "sort columns" for drain + // purposes: every sort-schema column name that is present in this + // input's arrow schema, plus `sorted_series` (always required). + let sort_col_names: HashSet = + sort_col_names_for_input(&sort_field_schema, state.arrow_schema.as_ref()); + + // Map each sort col name → its parquet leaf column index. The + // page decoder reports pages by parquet column index (matches arrow + // top-level field index when there are no nested types). 
+ let parquet_schema = state.metadata.file_metadata().schema_descr(); + let mut sort_col_parquet_indices: HashMap = HashMap::new(); + for (col_idx, col) in parquet_schema.columns().iter().enumerate() { + // For flat schemas (one leaf per top-level field), the parquet + // column index equals the arrow top-level field index. We + // match by name: parquet `column_path` root → arrow field name. + let name = col.path().parts()[0].to_string(); + if sort_col_names.contains(&name) { + sort_col_parquet_indices.insert(col_idx, name); + } + } + + if sort_col_parquet_indices.is_empty() { + // No sort cols present in this input — return an empty batch + // with the input's arrow schema. Downstream merge order check + // will catch the missing `sorted_series`. + return Ok(RecordBatch::new_empty(Arc::clone(&state.arrow_schema))); + } + + // Target row count per sort col (from row group's column chunk metadata). + let rg_meta = state.metadata.row_group(0); + let mut target_rows_per_col: HashMap = HashMap::new(); + for &col_idx in sort_col_parquet_indices.keys() { + target_rows_per_col.insert(col_idx, rg_meta.column(col_idx).num_values() as usize); + } + + // Drain pages into per-col buffers until all sort cols are fully + // decoded. With Husky storage ordering, sort col pages come before + // any body col page within the row group, so we should never see a + // non-sort page while sort cols are incomplete. 
+ let mut per_col_pages: HashMap> = HashMap::new(); + let mut rows_done_per_col: HashMap = + sort_col_parquet_indices.keys().map(|&i| (i, 0)).collect(); + let mut sort_cols_finished = 0usize; + let sort_col_target = sort_col_parquet_indices.len(); + + let mut decoder = StreamDecoder::new(&mut *state.stream); + + while sort_cols_finished < sort_col_target { + let decoded = handle + .block_on(decoder.decode_next_page()) + .with_context(|| format!("decoding sort col page (input {input_idx})"))?; + let page = match decoded { + Some(p) => p, + None => bail!( + "stream ended before sort cols fully drained for input {input_idx}: \ + {sort_cols_finished}/{sort_col_target} cols complete", + ), + }; + + if !sort_col_parquet_indices.contains_key(&page.col_idx) { + bail!( + "input {input_idx} returned a non-sort page (col {}) before all sort cols were \ + drained — this violates Husky storage ordering", + page.col_idx, + ); + } + if page.rg_idx != 0 { + bail!( + "input {input_idx} returned a page from rg {} during sort col drain — only single-\ + RG inputs are supported in PR-6b.2", + page.rg_idx, + ); + } + + let array_len = page.array.len(); + let rows_done = rows_done_per_col.get_mut(&page.col_idx).unwrap(); + *rows_done += array_len; + per_col_pages + .entry(page.col_idx) + .or_default() + .push(page.array); + + if *rows_done == target_rows_per_col[&page.col_idx] { + sort_cols_finished += 1; + } else if *rows_done > target_rows_per_col[&page.col_idx] { + bail!( + "input {input_idx} col {} decoded more rows ({}) than expected ({})", + page.col_idx, + rows_done, + target_rows_per_col[&page.col_idx], + ); + } + } + + // Build a RecordBatch holding just the sort cols. Field order + // matches the arrow schema's order (so downstream consumers see + // the same field order whether or not body cols are present). 
+ let mut fields: Vec> = Vec::new(); + let mut columns: Vec = Vec::new(); + for (field_idx, field) in state.arrow_schema.fields().iter().enumerate() { + let Some(_name) = sort_col_parquet_indices.get(&field_idx) else { + continue; + }; + let pages = per_col_pages.remove(&field_idx).expect("col drained"); + let concatenated = concat_arrays(&pages).with_context(|| { + format!( + "concatenating sort col '{}' pages for input {input_idx}", + field.name(), + ) + })?; + fields.push(Arc::clone(field)); + columns.push(concatenated); + } + + let schema = Arc::new(ArrowSchema::new(fields)); + RecordBatch::try_new(schema, columns) + .with_context(|| format!("building sort col record batch for input {input_idx}")) +} + +/// Set of column names treated as "sort cols" for phase 0 drain. +fn sort_col_names_for_input( + sort_field_schema: &quickwit_proto::sortschema::SortSchema, + arrow_schema: &ArrowSchema, +) -> HashSet { + let mut names: HashSet = HashSet::new(); + for sf in &sort_field_schema.column { + if arrow_schema.field_with_name(&sf.name).is_ok() { + names.insert(sf.name.clone()); + } + // Legacy schemas may declare `timestamp` but the column is named + // `timestamp_secs`. The merge order code already handles this + // alias; we want both candidates drained whichever matches. + if is_timestamp_column_name(&sf.name) + && arrow_schema.field_with_name("timestamp_secs").is_ok() + { + names.insert("timestamp_secs".to_string()); + } + } + if arrow_schema.field_with_name(SORTED_SERIES_COLUMN).is_ok() { + names.insert(SORTED_SERIES_COLUMN.to_string()); + } + names +} + +/// Build a zero-row RecordBatch with the input's sort cols + `sorted_series`. +/// Used when an input file has zero rows (no row groups) so that downstream +/// k-way merge sees a consistent schema shape across inputs. 
+fn empty_sort_col_record_batch( + state: &InputDecoderState, + sort_fields_str: &str, +) -> Result { + let sort_field_schema = parse_sort_fields(sort_fields_str)?; + let sort_col_names = sort_col_names_for_input(&sort_field_schema, state.arrow_schema.as_ref()); + let mut fields: Vec> = Vec::new(); + let mut columns: Vec = Vec::new(); + for field in state.arrow_schema.fields() { + if !sort_col_names.contains(field.name()) { + continue; + } + fields.push(Arc::clone(field)); + columns.push(new_null_array(field.data_type(), 0)); + } + let schema = Arc::new(ArrowSchema::new(fields)); + RecordBatch::try_new(schema, columns).context("building empty sort col record batch") +} + +fn concat_arrays(arrays: &[ArrayRef]) -> Result { + if arrays.len() == 1 { + return Ok(Arc::clone(&arrays[0])); + } + let refs: Vec<&dyn Array> = arrays.iter().map(|a| a.as_ref()).collect(); + Ok(arrow::compute::concat(&refs)?) +} + +// ============================================================================ +// Pre-compute input row → output destination map +// ============================================================================ + +/// `destinations[input_idx][input_row] = Some((output_idx, output_pos))` +/// if that input row contributes to output `output_idx` at position +/// `output_pos` within that output's row range. `None` means the row +/// is not in any output (only possible for rows beyond the merge +/// plan's coverage; shouldn't happen with our merge order). +#[derive(Debug)] +struct InputRowDestinations { + /// One Vec per input. Length = input's sort-col row count. + per_input: Vec>>, + /// Total rows per output index (cumulative writer "expected" rows). 
+ rows_per_output: Vec, +} + +fn build_input_row_destinations( + aligned_sort_batches: &[RecordBatch], + merge_order: &[MergeRun], + boundaries: &[Range], +) -> InputRowDestinations { + let mut per_input: Vec>> = aligned_sort_batches + .iter() + .map(|b| vec![None; b.num_rows()]) + .collect(); + let mut rows_per_output: Vec = vec![0; boundaries.len()]; + + for (out_idx, boundary) in boundaries.iter().enumerate() { + let runs = &merge_order[boundary.clone()]; + for run in runs { + for r in 0..run.row_count { + let input_row = run.start_row + r; + per_input[run.input_index][input_row] = Some((out_idx, rows_per_output[out_idx])); + rows_per_output[out_idx] += 1; + } + } + } + + InputRowDestinations { + per_input, + rows_per_output, + } +} + +// ============================================================================ +// Phase 3: streaming write with one writer per output +// ============================================================================ + +/// Per-output state owned across phase 3 (writer + bookkeeping). +/// The row group lives in a parallel Vec so its borrow into `writer` +/// is tracked by the compiler instead of through a `'static` +/// transmute. +struct OutputWriterStorage { + output_idx: usize, + output_path: PathBuf, + writer: StreamingParquetWriter, + /// Service-name set built during the body col write of "service" + /// (or empty if no service col). + service_names: HashSet, + /// Per-output total row count = sum of merge runs in this output's boundary. + num_rows: usize, +} + +#[allow(clippy::too_many_arguments)] +fn write_streaming_outputs( + handle: &Handle, + decoders_state: &mut [InputDecoderState], + aligned_sort_batches: &[RecordBatch], + sort_union_schema: &SchemaRef, + merge_order: &[MergeRun], + boundaries: &[Range], + destinations: &InputRowDestinations, + input_meta: &InputMetadata, + writer_config: &crate::storage::ParquetWriterConfig, + output_dir: &Path, +) -> Result> { + // 1. 
Build the union schema across full input arrow schemas (so the + // output covers every column that appears in any input). The + // sort union schema covers only sort cols. + let input_arrow_schemas: Vec = decoders_state + .iter() + .map(|s| Arc::clone(&s.arrow_schema)) + .collect(); + let (full_union_schema, _aligned_full_placeholder) = + build_full_union_schema_from_arrow_schemas(&input_arrow_schemas, &input_meta.sort_fields)?; + + // 2. Build per-output metadata (KV entries, row keys, zonemaps) up front + // from sort col data — these are what the schema + writer props depend on. + let per_output_static = boundaries + .iter() + .enumerate() + .map(|(out_idx, boundary)| { + build_per_output_static( + out_idx, + boundary, + aligned_sort_batches, + sort_union_schema, + merge_order, + input_meta, + ) + }) + .collect::>>()?; + + // 3. Decide per-output schema: optimise based on each output's sort col data + // (which determines metric_name cardinality, etc.). Body cols stay as + // declared by the union schema; we don't probe their cardinality here + // since we haven't read them yet. This is a slight regression vs. the + // non-streaming engine — it would dict-encode low-cardinality string + // body cols too. PR-6c.2 or later can revisit by gathering body-col + // cardinality during the streaming pass. + let per_output_schemas: Vec = per_output_static + .iter() + .map(|s| derive_output_schema(&full_union_schema, &s.sort_optimised)) + .collect::>>()?; + + // 4. Open M writers, one per output. Writers + bookkeeping live in + // `writer_states`; the row group borrows mutably from each writer + // and is held in a parallel `row_groups` Vec for the col loop. 
+ let mut writer_states: Vec = Vec::with_capacity(boundaries.len()); + for (out_idx, (schema, static_meta)) in per_output_schemas + .iter() + .zip(per_output_static.iter()) + .enumerate() + { + if destinations.rows_per_output[out_idx] == 0 { + continue; + } + writer_states.push(open_output_writer( + out_idx, + output_dir, + Arc::clone(schema), + static_meta, + input_meta, + writer_config, + )?); + } + + // Snapshot the (output_idx, num_rows) for each storage entry BEFORE + // calling `start_row_group`, which borrows `writer_states` mutably + // for the rest of phase 3's col loop. + let writer_index_view = writer_states_index_view(&writer_states); + let num_storages = writer_states.len(); + + let mut row_groups: Vec> = + writer_states + .iter_mut() + .map(|s| { + s.writer + .start_row_group() + .with_context(|| format!("opening row group for output {}", s.output_idx)) + }) + .collect::>>()?; + + // Service names are collected into a separate Vec> + // parallel to `row_groups`; we can't write into `writer_states` here + // because it is already borrowed mutably by `row_groups`. We merge + // these back into `writer_states` after dropping the row groups. + let mut service_names_per_output: Vec> = + (0..num_storages).map(|_| HashSet::new()).collect(); + write_all_columns( + handle, + &mut row_groups, + &mut service_names_per_output, + &writer_index_view, + decoders_state, + aligned_sort_batches, + sort_union_schema, + merge_order, + boundaries, + destinations, + &per_output_schemas, + )?; + + // 6. Finish all row groups (drops the borrows on writers). + for rg in row_groups { + rg.finish().context("finishing row group")?; + } + + // 7. Merge collected service names + close writers + build MergeOutputFiles. 
+ let mut outputs = Vec::with_capacity(writer_states.len()); + for (mut state, services) in writer_states + .into_iter() + .zip(service_names_per_output.into_iter()) + { + state.service_names.extend(services); + outputs.push(finalize_output_writer(state, &per_output_static)?); + } + Ok(outputs) +} + +/// Static per-output state computed once from sort col data. Holds +/// the per-output sort-col-only batch (used for metadata extraction) +/// and the per-output schema-optimisation hints. +struct PerOutputStatic { + /// Sort-cols-only batch in output sort order — used by row_keys / + /// zonemap / metric_names / time_range extractors. + sort_optimised: RecordBatch, + row_keys_proto: Option>, + zonemap_regexes: HashMap, + metric_names: HashSet, + time_range: crate::split::TimeRange, + /// Number of rows that go into this output. + num_rows: usize, +} + +fn build_per_output_static( + out_idx: usize, + boundary: &Range, + aligned_sort_batches: &[RecordBatch], + sort_union_schema: &SchemaRef, + merge_order: &[MergeRun], + input_meta: &InputMetadata, +) -> Result { + let runs = &merge_order[boundary.clone()]; + let sort_batch = apply_merge_permutation(aligned_sort_batches, sort_union_schema, runs) + .with_context(|| format!("applying merge permutation for output {out_idx} sort cols"))?; + let num_rows = sort_batch.num_rows(); + + // MC-3 sort order on the sort-col-only batch (same check the + // non-streaming engine does, just restricted to columns we have). + verify_sort_order(&sort_batch, &input_meta.sort_fields); + let sort_optimised = optimize_output_batch(&sort_batch); + + let row_keys_proto = row_keys::extract_row_keys(&input_meta.sort_fields, &sort_optimised) + .with_context(|| format!("extracting row keys for output {out_idx}"))? 
+ .map(|rk| row_keys::encode_row_keys_proto(&rk)); + + let zonemap_opts = ZonemapOptions::default(); + let zonemap_regexes = + zonemap::extract_zonemap_regexes(&input_meta.sort_fields, &sort_optimised, &zonemap_opts) + .with_context(|| format!("extracting zonemap regexes for output {out_idx}"))?; + + let metric_names = extract_metric_names(&sort_optimised) + .with_context(|| format!("extracting metric names for output {out_idx}"))?; + let time_range = extract_time_range(&sort_optimised) + .with_context(|| format!("extracting time range for output {out_idx}"))?; + + Ok(PerOutputStatic { + sort_optimised, + row_keys_proto, + zonemap_regexes, + metric_names, + time_range, + num_rows, + }) +} + +/// Build the full union schema across all inputs' arrow schemas +/// (NOT just sort cols). Reuses the same algorithm as +/// [`align_inputs_to_union_schema`] but takes pre-extracted arrow +/// schemas — phase 3 doesn't have full input batches. +fn build_full_union_schema_from_arrow_schemas( + arrow_schemas: &[SchemaRef], + sort_fields_str: &str, +) -> Result<(SchemaRef, ())> { + // Build zero-row batches with the right schemas; that lets us + // reuse `align_inputs_to_union_schema`'s field-merge / Husky-order + // logic unchanged. + let empty_batches: Vec = arrow_schemas + .iter() + .map(|s| RecordBatch::new_empty(Arc::clone(s))) + .collect(); + let (schema, _) = align_inputs_to_union_schema(&empty_batches, sort_fields_str)?; + Ok((schema, ())) +} + +/// Compute the per-output schema. For PR-6b.2 we use the +/// (string-normalised) union schema as the output schema directly — +/// fields stay Utf8/LargeUtf8 rather than being re-dict-encoded. +/// Reason: streaming-decoded input arrays come out of the page +/// decoder as plain `StringArray`/`BinaryArray` (not Dictionary), and +/// dict re-encoding per output page would add a per-page CPU cost we +/// don't want to take in the page-bounded path. 
/// Re-introducing
/// dict-encoded output strings can be done later by tracking
/// cardinality during the streaming pass — call site is here.
///
/// We do still want to drop columns that are all-null *for this
/// output* (e.g., a column only present in inputs that don't
/// contribute any rows to this output's range). Detect this from the
/// `sort_optimised` schema for sort cols; for body cols, leave them
/// in the union for now (PR-6c.2 will track per-output body-col
/// presence).
fn derive_output_schema(
    full_union_schema: &SchemaRef,
    sort_optimised: &RecordBatch,
) -> Result<SchemaRef> {
    let sort_schema = sort_optimised.schema();
    let mut fields: Vec<Arc<Field>> = Vec::with_capacity(full_union_schema.fields().len());
    for field in full_union_schema.fields() {
        // If sort_optimised dropped this field entirely, the field was
        // all-null in this output's sort cols — drop from output too.
        // (Body cols are always retained.)
        //
        // NOTE(review): this condition is a tautology — `field` comes
        // from `full_union_schema`, so the second `index_of` is always
        // `Ok` and no field is ever dropped. Distinguishing "dropped
        // sort col" from "body col" would require the sort *union*
        // schema, which this function does not receive. Confirm intent.
        let is_sort_field = sort_schema.index_of(field.name()).is_ok();
        if is_sort_field || full_union_schema.index_of(field.name()).is_ok() {
            fields.push(Arc::clone(field));
        }
    }
    Ok(Arc::new(ArrowSchema::new(fields)))
}

/// Open the streaming parquet writer for one output: build the merge
/// KV metadata and sorting columns from the static per-output state,
/// create the file under `output_dir`, and wrap it all in an
/// [`OutputWriterStorage`].
fn open_output_writer(
    out_idx: usize,
    output_dir: &Path,
    schema: SchemaRef,
    static_meta: &PerOutputStatic,
    input_meta: &InputMetadata,
    writer_config: &crate::storage::ParquetWriterConfig,
) -> Result<OutputWriterStorage> {
    let output_prefix_len = input_meta.rg_partition_prefix_len;
    let kv_entries = build_merge_kv_metadata(
        input_meta,
        &static_meta.row_keys_proto,
        &static_meta.zonemap_regexes,
        output_prefix_len,
    );
    let sorting_cols = build_sorting_columns(&static_meta.sort_optimised, &input_meta.sort_fields)?;
    let sort_field_names = resolve_sort_field_names(&input_meta.sort_fields)?;

    let props = writer_config.to_writer_properties_with_metadata(
        &schema,
        sorting_cols,
        Some(kv_entries),
        &sort_field_names,
    );

    // Ulid gives a collision-free, sortable filename per output.
    let output_filename = format!("merge_output_{}.parquet", Ulid::new());
    let output_path = output_dir.join(&output_filename);
    let file = std::fs::File::create(&output_path)
        .with_context(|| format!("creating output file: {}", output_path.display()))?;
    let writer = StreamingParquetWriter::try_new(file, Arc::clone(&schema), props)
        .with_context(|| format!("opening streaming writer for output {out_idx}"))?;

    Ok(OutputWriterStorage {
        output_idx: out_idx,
        output_path,
        writer,
        // Populated later, while streaming the "service" body col.
        service_names: HashSet::new(),
        num_rows: static_meta.num_rows,
    })
}

/// Index view used inside the col loop to find the writer's
/// `output_idx` and `num_rows` without needing a mutable borrow on
/// `writer_states` (which is already mutably borrowed by `row_groups`).
fn writer_states_index_view(writer_states: &[OutputWriterStorage]) -> Vec<(usize, usize)> {
    writer_states
        .iter()
        .map(|s| (s.output_idx, s.num_rows))
        .collect()
}

/// Phase 3 driver: stream every column of every output, in Husky
/// column order, interleaving in-memory sort cols with page-bounded
/// streamed body cols.
#[allow(clippy::too_many_arguments)]
fn write_all_columns(
    handle: &Handle,
    row_groups: &mut [crate::storage::streaming_writer::RowGroupBuilder<'_, std::fs::File>],
    service_names_per_output: &mut [HashSet<String>],
    writer_index_view: &[(usize, usize)],
    decoders_state: &mut [InputDecoderState],
    aligned_sort_batches: &[RecordBatch],
    sort_union_schema: &SchemaRef,
    merge_order: &[MergeRun],
    boundaries: &[Range<usize>],
    destinations: &InputRowDestinations,
    per_output_schemas: &[SchemaRef],
) -> Result<()> {
    // Iterate cols in the union (per-output) schema order. We need
    // ONE pass through Husky-ordered cols of an output to feed its
    // writer. The writer's expected col order is each output's own
    // schema. All outputs share Husky col order with possible
    // per-output drops of all-null columns.
    //
    // Strategy: process the FULL union schema's cols in order. For
    // each col K in the union schema, for each output:
    //   - If output's schema includes col K: write col K's data
    //     (sort col → from buffer, body col → from decoder).
    //   - Else: skip (col was dropped as all-null for this output).
    // This keeps decoder advancement in lockstep across outputs: a
    // body col K's pages are consumed once across all outputs in turn.

    // We need the parent union schema to drive iteration. Recompute
    // here (cheap) from the per-output schemas + sort union schema.
    let parent_union_schema = build_parent_union_schema(per_output_schemas);

    // For each union-schema col K:
    for parent_col_idx in 0..parent_union_schema.fields().len() {
        let parent_field = parent_union_schema.field(parent_col_idx);
        let parent_name = parent_field.name();

        // Is this a sort col (in memory) or a body col (streamed)?
        let is_sort_col = sort_union_schema.index_of(parent_name).is_ok();

        if is_sort_col {
            write_sort_col_for_all_outputs(
                row_groups,
                writer_index_view,
                parent_name,
                aligned_sort_batches,
                sort_union_schema,
                merge_order,
                boundaries,
                destinations,
                per_output_schemas,
            )?;
        } else {
            write_body_col_for_all_outputs(
                handle,
                row_groups,
                service_names_per_output,
                writer_index_view,
                decoders_state,
                parent_name,
                destinations,
                per_output_schemas,
            )?;
        }
    }

    Ok(())
}

/// Pick the driver schema for the col loop: the per-output schema
/// with the most fields.
fn build_parent_union_schema(per_output_schemas: &[SchemaRef]) -> SchemaRef {
    // All per-output schemas have the same column order (Husky), with
    // some cols possibly missing per output (all-null drops). Pick the
    // schema with the most fields as the parent driver, ties broken by
    // first occurrence. This is safe because all schemas share Husky
    // ordering as a subsequence.
    //
    // NOTE(review): "most fields" is only a true superset when drops
    // nest across outputs; two outputs dropping *different* cols would
    // leave some col absent from the driver schema. Confirm the
    // all-null-drop invariant guarantees this can't happen.
    let mut best = Arc::clone(&per_output_schemas[0]);
    for s in per_output_schemas.iter().skip(1) {
        if s.fields().len() > best.fields().len() {
            best = Arc::clone(s);
        }
    }
    best
}

/// Write one sort col (already fully in memory) to every output's
/// current row group, paged into `OUTPUT_PAGE_ROWS` chunks.
#[allow(clippy::too_many_arguments)]
fn write_sort_col_for_all_outputs(
    row_groups: &mut [crate::storage::streaming_writer::RowGroupBuilder<'_, std::fs::File>],
    writer_index_view: &[(usize, usize)],
    col_name: &str,
    aligned_sort_batches: &[RecordBatch],
    sort_union_schema: &SchemaRef,
    merge_order: &[MergeRun],
    boundaries: &[Range<usize>],
    destinations: &InputRowDestinations,
    per_output_schemas: &[SchemaRef],
) -> Result<()> {
    // Kept for signature parity with the body-col path; unused here.
    let _ = sort_union_schema;

    // `storage_idx` indexes writers: outputs with zero rows have no
    // writer, so they are skipped WITHOUT bumping it.
    let mut storage_idx = 0;
    for (out_idx, boundary) in boundaries.iter().enumerate() {
        if destinations.rows_per_output[out_idx] == 0 {
            continue;
        }
        debug_assert_eq!(writer_index_view[storage_idx].0, out_idx);

        // Drop this col if the output's schema doesn't include it.
        let out_schema = &per_output_schemas[out_idx];
        if out_schema.index_of(col_name).is_err() {
            storage_idx += 1;
            continue;
        }

        let runs = &merge_order[boundary.clone()];
        let arrays = build_sort_col_pages_for_output(col_name, aligned_sort_batches, runs)?;
        row_groups[storage_idx]
            .write_next_column_arrays(arrays.into_iter())
            .with_context(|| format!("writing sort col '{col_name}' to output {out_idx}"))?;
        storage_idx += 1;
    }
    Ok(())
}

/// Build per-output-page arrays for one sort col. The col is already
/// in memory across all inputs (`aligned_sort_batches`); for this
/// output we walk its merge runs and split the take result into
/// `OUTPUT_PAGE_ROWS`-sized chunks.
fn build_sort_col_pages_for_output(
    col_name: &str,
    aligned_sort_batches: &[RecordBatch],
    runs: &[MergeRun],
) -> Result<Vec<ArrayRef>> {
    // Collect references to each input's column array.
    let mut input_arrays: Vec<&dyn Array> = Vec::with_capacity(aligned_sort_batches.len());
    for batch in aligned_sort_batches {
        let idx = batch.schema().index_of(col_name).map_err(|_| {
            anyhow!("input is missing sort col '{col_name}' that the union schema expected",)
        })?;
        input_arrays.push(batch.column(idx).as_ref());
    }

    // Expand the run-length-encoded merge plan into explicit
    // (input, row) pairs for interleave.
    let mut indices: Vec<(usize, usize)> =
        Vec::with_capacity(runs.iter().map(|r| r.row_count).sum());
    for run in runs {
        for r in 0..run.row_count {
            indices.push((run.input_index, run.start_row + r));
        }
    }

    // Split into OUTPUT_PAGE_ROWS-sized chunks; each chunk → one
    // arrow::interleave call → one ArrayRef.
    let mut pages = Vec::with_capacity(indices.len().div_ceil(OUTPUT_PAGE_ROWS));
    for chunk in indices.chunks(OUTPUT_PAGE_ROWS) {
        let arr = interleave(&input_arrays, chunk)
            .with_context(|| format!("interleaving sort col '{col_name}' pages"))?;
        pages.push(arr);
    }
    Ok(pages)
}

/// Write one streamed body col to every output, in output order.
/// Pages are assembled on the fly by [`BodyColOutputPageAssembler`]
/// so memory stays bounded by page size, not column-chunk size.
#[allow(clippy::too_many_arguments)]
fn write_body_col_for_all_outputs(
    handle: &Handle,
    row_groups: &mut [crate::storage::streaming_writer::RowGroupBuilder<'_, std::fs::File>],
    service_names_per_output: &mut [HashSet<String>],
    writer_index_view: &[(usize, usize)],
    decoders_state: &mut [InputDecoderState],
    col_name: &str,
    destinations: &InputRowDestinations,
    per_output_schemas: &[SchemaRef],
) -> Result<()> {
    // Find this col's per-input parquet leaf index (one per input).
    // Inputs whose schema doesn't have this col contribute null rows
    // and don't advance their decoder for this col.
    let mut input_col_indices: Vec<Option<usize>> = Vec::with_capacity(decoders_state.len());
    let mut input_target_rows: Vec<usize> = Vec::with_capacity(decoders_state.len());
    for state in decoders_state.iter() {
        match state.arrow_schema.index_of(col_name) {
            Ok(idx) => {
                input_col_indices.push(Some(idx));
                // A single row group per input is assumed (row_group(0)).
                let rg = state.metadata.row_group(0);
                input_target_rows.push(rg.column(idx).num_values() as usize);
            }
            Err(_) => {
                input_col_indices.push(None);
                input_target_rows.push(state.metadata.row_group(0).num_rows() as usize);
            }
        }
    }
    // NOTE(review): `input_target_rows` appears to be built but never
    // read anywhere in this function — confirm and remove if dead.

    // For each output sequentially: build output pages, feed to writer.
    let mut storage_idx = 0;
    for (out_idx, &row_count) in destinations.rows_per_output.iter().enumerate() {
        if row_count == 0 {
            continue;
        }
        debug_assert_eq!(writer_index_view[storage_idx].0, out_idx);

        let out_schema = &per_output_schemas[out_idx];
        if out_schema.index_of(col_name).is_err() {
            storage_idx += 1;
            continue;
        }
        let out_field_idx = out_schema.index_of(col_name)?;
        let out_field = out_schema.field(out_field_idx);

        // Build per-output assembler. Feeds one output page per
        // `Iterator::next()` call.
        let assembler = BodyColOutputPageAssembler::new(
            handle,
            decoders_state,
            &input_col_indices,
            destinations,
            out_idx,
            col_name,
            out_field,
        );

        // Track service names while streaming the service col.
        let track_service = col_name == "service";

        // Drive the assembler via a sync iterator. We must `?`-propagate
        // assembly errors out of the iterator; collect into a Vec and
        // return on first error.
+ let pages: Vec = assembler + .into_iter() + .collect::>>() + .with_context(|| format!("assembling body col '{col_name}' for output {out_idx}"))?; + + if track_service { + for page in &pages { + collect_service_names_from_page( + page.as_ref(), + &mut service_names_per_output[storage_idx], + )?; + } + } + + row_groups[storage_idx] + .write_next_column_arrays(pages.into_iter()) + .with_context(|| format!("writing body col '{col_name}' to output {out_idx}"))?; + storage_idx += 1; + } + + Ok(()) +} + +/// Per-page service name collector. Used during the streaming write +/// of the "service" body col to populate per-output service_names. +fn collect_service_names_from_page(arr: &dyn Array, out: &mut HashSet) -> Result<()> { + use arrow::array::AsArray; + use arrow::datatypes::{Int8Type, Int16Type, Int32Type, Int64Type}; + + fn extend_from_strings(strings: &arrow::array::StringArray, out: &mut HashSet) { + for i in 0..strings.len() { + if strings.is_valid(i) { + out.insert(strings.value(i).to_string()); + } + } + } + + match arr.data_type() { + DataType::Utf8 => { + let strings = arr + .as_any() + .downcast_ref::() + .ok_or_else(|| anyhow!("expected StringArray for service col page"))?; + extend_from_strings(strings, out); + } + DataType::LargeUtf8 => { + let strings = arr + .as_any() + .downcast_ref::() + .ok_or_else(|| anyhow!("expected LargeStringArray for service col page"))?; + for i in 0..strings.len() { + if strings.is_valid(i) { + out.insert(strings.value(i).to_string()); + } + } + } + DataType::Dictionary(key_type, value_type) + if matches!(value_type.as_ref(), DataType::Utf8) => + { + // Extract the dictionary's values that are referenced by + // valid (non-null) keys. 
+ match key_type.as_ref() { + DataType::Int8 => { + let dict = arr.as_dictionary::(); + if let Some(strings) = dict + .values() + .as_any() + .downcast_ref::() + { + for i in 0..dict.len() { + if dict.is_valid(i) { + let key = dict.keys().value(i) as usize; + if key < strings.len() && strings.is_valid(key) { + out.insert(strings.value(key).to_string()); + } + } + } + } + } + DataType::Int16 => { + let dict = arr.as_dictionary::(); + if let Some(strings) = dict + .values() + .as_any() + .downcast_ref::() + { + for i in 0..dict.len() { + if dict.is_valid(i) { + let key = dict.keys().value(i) as usize; + if key < strings.len() && strings.is_valid(key) { + out.insert(strings.value(key).to_string()); + } + } + } + } + } + DataType::Int32 => { + let dict = arr.as_dictionary::(); + if let Some(strings) = dict + .values() + .as_any() + .downcast_ref::() + { + for i in 0..dict.len() { + if dict.is_valid(i) { + let key = dict.keys().value(i) as usize; + if key < strings.len() && strings.is_valid(key) { + out.insert(strings.value(key).to_string()); + } + } + } + } + } + DataType::Int64 => { + let dict = arr.as_dictionary::(); + if let Some(strings) = dict + .values() + .as_any() + .downcast_ref::() + { + for i in 0..dict.len() { + if dict.is_valid(i) { + let key = dict.keys().value(i) as usize; + if key < strings.len() && strings.is_valid(key) { + out.insert(strings.value(key).to_string()); + } + } + } + } + } + _ => {} + } + } + _ => { + // Skip non-string types — service col is expected to be + // string-like; if it isn't, just don't collect names. + } + } + Ok(()) +} + +// ============================================================================ +// Body col output page assembler — the page-bounded streaming core +// ============================================================================ + +/// Assembles output pages for one (output_idx, body_col) by: +/// 1. 
///    Walking the destinations table forward through this output's
///    row range, accumulating `(input_idx, input_row)` index pairs.
/// 2. When the index buffer hits `OUTPUT_PAGE_ROWS`, advancing each
///    contributing input's decoder until its decoded pages cover the
///    needed input rows, then calling `arrow::compute::interleave`.
/// 3. Emitting one `ArrayRef` per iter step until the row range is
///    exhausted; then `Ok(None)`.
///
/// Memory per `next()` call: one in-progress output page (P rows) +
/// up to ~2 in-flight decoded pages per input (kept until all their
/// rows are consumed). Bounded by page sizes, not column-chunk sizes.
struct BodyColOutputPageAssembler<'a> {
    handle: &'a Handle,
    decoders_state: &'a mut [InputDecoderState],
    /// Per-input parquet leaf index of this col; `None` when the
    /// input lacks the col entirely (null contributions).
    input_col_indices: &'a [Option<usize>],
    destinations: &'a InputRowDestinations,
    out_idx: usize,
    col_name: &'a str,
    out_field: &'a Field,
    /// Cursor into `destinations.per_input[*]` — next input row index per input.
    input_cursors: Vec<usize>,
    /// Per-input decoded page cache. Pages are kept in order; the
    /// front page's `row_start` is the smallest input row we still
    /// have decoded.
    // NOTE(review): element type reconstructed from usage
    // (`.row_start`, `.array`, `.col_idx`) — confirm the page type name.
    page_cache: Vec<Vec<DecodedPage>>,
    /// Total rows written so far for this output's col.
    rows_emitted: usize,
    /// Total rows expected = destinations.rows_per_output[out_idx].
    expected_rows: usize,
    /// EOF flag (returns None on subsequent calls once true).
    done: bool,
}

impl<'a> BodyColOutputPageAssembler<'a> {
    /// Construct an assembler positioned at the start of this
    /// output's row range, with empty per-input page caches.
    #[allow(clippy::too_many_arguments)]
    fn new(
        handle: &'a Handle,
        decoders_state: &'a mut [InputDecoderState],
        input_col_indices: &'a [Option<usize>],
        destinations: &'a InputRowDestinations,
        out_idx: usize,
        col_name: &'a str,
        out_field: &'a Field,
    ) -> Self {
        let num_inputs = decoders_state.len();
        let mut page_cache: Vec<Vec<DecodedPage>> = Vec::with_capacity(num_inputs);
        for _ in 0..num_inputs {
            page_cache.push(Vec::new());
        }
        Self {
            handle,
            decoders_state,
            input_col_indices,
            destinations,
            out_idx,
            col_name,
            out_field,
            input_cursors: vec![0; num_inputs],
            page_cache,
            rows_emitted: 0,
            expected_rows: destinations.rows_per_output[out_idx],
            done: false,
        }
    }

    /// Wrap into the sync iterator that drives page assembly.
    fn into_iter(self) -> BodyColOutputPageIter<'a> {
        BodyColOutputPageIter { inner: self }
    }
}

/// Iterator adapter over the assembler: each `next()` yields one
/// output page (or the first assembly error), fusing afterwards.
struct BodyColOutputPageIter<'a> {
    inner: BodyColOutputPageAssembler<'a>,
}

impl Iterator for BodyColOutputPageIter<'_> {
    type Item = Result<ArrayRef>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.inner.done || self.inner.rows_emitted >= self.inner.expected_rows {
            self.inner.done = true;
            return None;
        }
        match assemble_one_output_page(&mut self.inner) {
            Ok(Some(arr)) => Some(Ok(arr)),
            Ok(None) => {
                self.inner.done = true;
                None
            }
            // Fuse after the first error so a retry cannot observe
            // half-advanced decoder state.
            Err(e) => {
                self.inner.done = true;
                Some(Err(e))
            }
        }
    }
}

/// Assemble one output page (≤ `OUTPUT_PAGE_ROWS` rows) for the
/// assembler's (output, col); returns `Ok(None)` once the output's
/// row range is exhausted.
fn assemble_one_output_page(s: &mut BodyColOutputPageAssembler) -> Result<Option<ArrayRef>> {
    let remaining = s.expected_rows - s.rows_emitted;
    if remaining == 0 {
        return Ok(None);
    }
    let page_size = remaining.min(OUTPUT_PAGE_ROWS);

    // Walk this output's row positions and figure out which (input, input_row)
    // contributes each one. We use the per-input destinations table: for
    // input i, find the next input_row whose destination is (out_idx, *).
    // Since `destinations.per_input[i]` is in input order and outputs are
    // strictly increasing by sort key, the rows that go to this output are
    // a contiguous slice in input i's row order.
    //
    // For each output position 0..page_size, we need (input_idx, input_row).
    // Walk input cursors and pick the next row going to this output.

    // Collect (input_idx, input_row) indices for this output page.
    let mut indices_per_input: Vec<Vec<usize>> = vec![Vec::new(); s.decoders_state.len()];
    let mut interleave_indices: Vec<(usize, usize)> = Vec::with_capacity(page_size);
    let mut total_picked = 0usize;

    while total_picked < page_size {
        // Look across all inputs for the next contribution to this output.
        // Per the merge order, within each input the rows assigned to this
        // output are a contiguous slice; once we've advanced cursor past
        // them, no more rows from this input contribute. We collect ALL
        // rows from one input up to a per-input limit determined by the
        // merge order, but the simplest correct approach is to walk in
        // merge-order globally. We don't have the merge order indexed by
        // output here, so re-derive by scanning the destinations table.
        //
        // Better: pre-compute per-output, per-input row ranges. Each input
        // contributes a contiguous half-open range `[lo_i..hi_i)` to this
        // output (possibly empty). We could compute these ranges once and
        // reuse. For now, lazy approach: scan forward from cursor on each
        // input, picking the next row that maps to (out_idx, *).
        //
        // PERF(review): this rescans from each input's cursor for every
        // target position (cursors only move after the page completes),
        // so worst case is O(page_rows × total_input_rows) per page.
        //
        // The ORDER in which we pick across inputs must match the merge
        // plan's output position. We have output positions in destinations:
        // `destinations.per_input[i][r] = Some((out_idx, pos))`. The merged
        // output picks rows in order of increasing `pos`.
        //
        // For one output page, the positions we want are
        // `s.rows_emitted..s.rows_emitted + page_size`. For each position
        // p in that range, find (input_idx, input_row) such that
        // destinations.per_input[input_idx][input_row] == Some((out_idx, p)).
        let target_pos = s.rows_emitted + total_picked;
        let mut found = false;
        for (input_idx, dests) in s.destinations.per_input.iter().enumerate() {
            let cursor = s.input_cursors[input_idx];
            for (input_row, dest) in dests.iter().enumerate().skip(cursor) {
                match dest {
                    Some((o, p)) if *o == s.out_idx => {
                        if *p == target_pos {
                            interleave_indices.push((input_idx, input_row));
                            indices_per_input[input_idx].push(input_row);
                            // Don't advance the cursor past this row yet —
                            // we may need rows from input i in this page
                            // with positions ahead. We bump it after the
                            // whole page is collected.
                            found = true;
                            break;
                        }
                    }
                    _ => {}
                }
                if found {
                    break;
                }
            }
            if found {
                break;
            }
        }
        if !found {
            // Shouldn't happen — every output position should be reachable.
            bail!(
                "merge plan inconsistency: output {} position {target_pos} not found in any input",
                s.out_idx,
            );
        }
        total_picked += 1;
    }

    // Now ensure each input's decoder has decoded pages covering all
    // `indices_per_input[i]` rows. Advance decoders as needed.
    for (input_idx, input_rows) in indices_per_input.iter().enumerate() {
        if input_rows.is_empty() {
            continue;
        }
        let col_parquet_idx = match s.input_col_indices[input_idx] {
            Some(c) => c,
            None => {
                // This input lacks this col entirely — null contributions.
                // We'll handle null-filling in the interleave step below.
                continue;
            }
        };
        // Rows within an input are picked in ascending order, so the max
        // is the last row the decoder must have covered.
        let max_needed_row = *input_rows.iter().max().expect("non-empty");
        advance_decoder_to_row(
            s.handle,
            &mut s.decoders_state[input_idx],
            &mut s.page_cache[input_idx],
            col_parquet_idx,
            max_needed_row,
        )?;
    }

    // Build the per-(input, row) value array by:
    // 1. Concatenating each input's cached pages into one ArrayRef (they
    //    cover a contiguous input row range from cache_start to cursor_max).
    // 2. Computing local indices = input_row - cache_start.
    // 3. Calling arrow::compute::interleave across N input arrays.
    //
    // For inputs without this col, we substitute a single null page of the
    // out_field's type.
    let mut input_array_refs: Vec<ArrayRef> = Vec::with_capacity(s.decoders_state.len());
    let mut input_cache_starts: Vec<usize> = Vec::with_capacity(s.decoders_state.len());

    for input_idx in 0..s.decoders_state.len() {
        match s.input_col_indices[input_idx] {
            Some(_) => {
                let pages = &s.page_cache[input_idx];
                if pages.is_empty() {
                    // No pages decoded for this input (no rows from this input go to this output).
                    // Use a zero-row placeholder; we won't index into it.
                    input_array_refs.push(new_null_array(s.out_field.data_type(), 0));
                    input_cache_starts.push(0);
                } else {
                    let cache_start = pages[0].row_start;
                    let arrays: Vec<&dyn Array> = pages.iter().map(|p| p.array.as_ref()).collect();
                    let concatenated = arrow::compute::concat(&arrays).with_context(|| {
                        format!(
                            "concatenating cached pages for input {input_idx} col '{}'",
                            s.col_name,
                        )
                    })?;
                    input_array_refs.push(concatenated);
                    input_cache_starts.push(cache_start);
                }
            }
            None => {
                // Null-fill array of the right length. The max needed local
                // index from this input is the largest index we'd reference;
                // since we don't actually reference rows from this input (we'd
                // need an alternate "null contribution" mechanism), we leave
                // it as a 1-row null array and route indices to position 0.
                let null_arr = new_null_array(s.out_field.data_type(), 1);
                input_array_refs.push(null_arr);
                input_cache_starts.push(0);
            }
        }
    }

    // Rebase global input rows to cache-local offsets; col-less inputs
    // always hit slot 0 of their 1-row null array.
    let interleave_local: Vec<(usize, usize)> = interleave_indices
        .iter()
        .map(|&(i_idx, i_row)| match s.input_col_indices[i_idx] {
            Some(_) => (i_idx, i_row - input_cache_starts[i_idx]),
            None => (i_idx, 0),
        })
        .collect();

    let array_refs_ref: Vec<&dyn Array> = input_array_refs.iter().map(|a| a.as_ref()).collect();
    let assembled = interleave(&array_refs_ref, &interleave_local).with_context(|| {
        format!(
            "interleaving body col '{}' for output {}",
            s.col_name, s.out_idx,
        )
    })?;

    // Bump input cursors past rows we just consumed and drop pages
    // whose rows are fully consumed.
    for (input_idx, input_rows) in indices_per_input.iter().enumerate() {
        if input_rows.is_empty() {
            continue;
        }
        let max_row = *input_rows.iter().max().expect("non-empty");
        s.input_cursors[input_idx] = max_row + 1;

        // Drop pages whose last row is < cursor.
        if s.input_col_indices[input_idx].is_some() {
            let pages = &mut s.page_cache[input_idx];
            while let Some(front) = pages.first() {
                let front_end = front.row_start + front.array.len();
                if front_end <= s.input_cursors[input_idx] {
                    pages.remove(0);
                } else {
                    break;
                }
            }
        }
    }

    s.rows_emitted += page_size;
    Ok(Some(assembled))
}

/// Drive `state.stream`'s decoder forward via `block_on` until the
/// cached pages for `col_parquet_idx` cover up through `target_row`
/// (inclusive). Stops as soon as the latest cached page ends past
/// `target_row`.
fn advance_decoder_to_row(
    handle: &Handle,
    state: &mut InputDecoderState,
    page_cache: &mut Vec<DecodedPage>,
    col_parquet_idx: usize,
    target_row: usize,
) -> Result<()> {
    // If cache already covers target_row, nothing to do.
    if let Some(last) = page_cache.last() {
        let last_end = last.row_start + last.array.len();
        if target_row < last_end {
            return Ok(());
        }
    }

    // NOTE(review): a fresh StreamDecoder is built per call — assumes
    // all decode progress lives in `state.stream`, not in the decoder
    // itself. Confirm against `StreamDecoder::new`'s contract.
    let mut decoder = StreamDecoder::new(&mut *state.stream);
    loop {
        let decoded = handle
            .block_on(decoder.decode_next_page())
            .context("decoding body col page")?;
        let page = match decoded {
            Some(p) => p,
            None => bail!(
                "stream EOF while advancing to row {target_row} for parquet col {col_parquet_idx}",
            ),
        };
        // Husky ordering delivers pages strictly col-by-col; a page for
        // any other col means the input violates the expected layout.
        if page.col_idx != col_parquet_idx {
            bail!(
                "expected col {col_parquet_idx} page, got col {} — Husky col ordering violated",
                page.col_idx,
            );
        }
        let end = page.row_start + page.array.len();
        page_cache.push(page);
        if target_row < end {
            return Ok(());
        }
    }
}

/// Close one output's writer and assemble its [`MergeOutputFile`]
/// manifest entry from the pre-computed static per-output metadata.
fn finalize_output_writer(
    state: OutputWriterStorage,
    per_output_static: &[PerOutputStatic],
) -> Result<MergeOutputFile> {
    let OutputWriterStorage {
        output_idx,
        output_path,
        writer,
        service_names,
        num_rows,
    } = state;

    let _metadata = writer
        .close()
        .with_context(|| format!("closing writer for output {output_idx}"))?;

    // File size is read back from disk after close so it reflects the
    // final footer bytes.
    let size_bytes = std::fs::metadata(&output_path)
        .with_context(|| format!("stat output file: {}", output_path.display()))?
        .len();

    let static_meta = &per_output_static[output_idx];

    let mut low_cardinality_tags: HashMap<String, HashSet<String>> = HashMap::new();
    if !service_names.is_empty() {
        low_cardinality_tags.insert(TAG_SERVICE.to_string(), service_names);
    }

    Ok(MergeOutputFile {
        path: output_path,
        num_rows,
        // The streaming writer emits exactly one row group per output.
        num_row_groups: 1,
        size_bytes,
        row_keys_proto: static_meta.row_keys_proto.clone(),
        zonemap_regexes: static_meta.zonemap_regexes.clone(),
        metric_names: static_meta.metric_names.clone(),
        time_range: static_meta.time_range,
        low_cardinality_tags,
    })
}

// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::sync::Arc;

    use arrow::array::{
        ArrayRef, BinaryArray, DictionaryArray, Float64Array, Int64Array, StringArray, UInt8Array,
        UInt64Array,
    };
    use arrow::datatypes::{DataType, Field, Int32Type, Schema as ArrowSchema};
    use bytes::Bytes;
    use parquet::arrow::ArrowWriter;
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use tempfile::TempDir;
    use tokio::io::AsyncRead;

    use super::*;
    use crate::storage::page_decoder::StreamDecoder;
    use crate::storage::streaming_reader::{RemoteByteSource, StreamingParquetReader};
    use crate::storage::{Compression, ParquetWriterConfig};

    // -------- Fixtures --------

    /// Build a sorted metrics RecordBatch with `num_rows` rows in
    /// **Husky column order**: sort cols (metric_name, timestamp_secs)
    /// → sorted_series → remaining body cols lexicographic
    /// (metric_type, service, timeseries_id, value). All rows share
    /// the single metric_name "cpu.usage". `sorted_series` is monotonic
    /// from `start_series_idx`. `service` carries nulls every 5th row.
    fn make_sorted_batch(num_rows: usize, start_series_idx: u64) -> RecordBatch {
        let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        let schema = Arc::new(ArrowSchema::new(vec![
            // sort cols (in sort schema order)
            Field::new("metric_name", dict_type.clone(), false),
            Field::new("timestamp_secs", DataType::UInt64, false),
            // sorted_series marker
            Field::new("sorted_series", DataType::Binary, false),
            // body cols lexicographic
            Field::new("metric_type", DataType::UInt8, false),
            Field::new("service", dict_type, true),
            Field::new("timeseries_id", DataType::Int64, false),
            Field::new("value", DataType::Float64, false),
        ]));

        // Every row references dict slot 0 ("cpu.usage"); slot 1 exists
        // but is never referenced.
        let metric_keys: Vec<i32> = (0..num_rows as i32).map(|_| 0).collect();
        let metric_values = StringArray::from(vec!["cpu.usage", "memory.used"]);
        let metric_name: ArrayRef = Arc::new(
            DictionaryArray::<Int32Type>::try_new(
                arrow::array::Int32Array::from(metric_keys),
                Arc::new(metric_values),
            )
            .expect("test dict array"),
        );
        let metric_type: ArrayRef = Arc::new(UInt8Array::from(vec![0u8; num_rows]));
        // Timestamps descend to match the `-timestamp_secs` sort field.
        let timestamps: Vec<u64> = (0..num_rows as u64)
            .map(|i| 1_700_000_000 + (num_rows as u64 - i))
            .collect();
        let timestamp_secs: ArrayRef = Arc::new(UInt64Array::from(timestamps));
        let values: Vec<f64> = (0..num_rows).map(|i| i as f64).collect();
        let value: ArrayRef = Arc::new(Float64Array::from(values));
        let tsids: Vec<i64> = (0..num_rows as i64).map(|i| 1000 + i).collect();
        let timeseries_id: ArrayRef = Arc::new(Int64Array::from(tsids));
        // Nulls on every 5th row; otherwise cycle through three services.
        let svc_keys: Vec<Option<i32>> = (0..num_rows as i32)
            .map(|i| if i % 5 == 0 { None } else { Some(i % 3) })
            .collect();
        let svc_values = StringArray::from(vec!["api", "db", "cache"]);
        let service: ArrayRef = Arc::new(
            DictionaryArray::<Int32Type>::try_new(
                arrow::array::Int32Array::from(svc_keys),
                Arc::new(svc_values),
            )
            .expect("test dict array"),
        );
        // Big-endian encoding keeps byte order == numeric order, so the
        // binary col sorts the same way as the series ids.
        let mut series_bytes: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
        for i in 0..num_rows as u64 {
            let id = start_series_idx + i;
            series_bytes.push(id.to_be_bytes().to_vec());
        }
        let series_refs: Vec<&[u8]> = series_bytes.iter().map(|v| v.as_slice()).collect();
        let sorted_series: ArrayRef = Arc::new(BinaryArray::from(series_refs));

        RecordBatch::try_new(
            schema,
            vec![
                metric_name,
                timestamp_secs,
                sorted_series,
                metric_type,
                service,
                timeseries_id,
                value,
            ],
        )
        .expect("test batch")
    }

    /// Write a fixture parquet file with the standard `qh.*` KVs that the
    /// streaming merge engine validates.
    fn write_input_parquet(batches: &[RecordBatch], extra_kvs: &[(&str, &str)]) -> Bytes {
        let schema = batches[0].schema();
        let cfg = ParquetWriterConfig {
            compression: Compression::Snappy,
            ..ParquetWriterConfig::default()
        };
        let sort_fields = "metric_name|-timestamp_secs/V2";
        let sort_field_names = vec!["metric_name".to_string(), "timestamp_secs".to_string()];
        let mut kvs = vec![
            KeyValue::new(
                PARQUET_META_SORT_FIELDS.to_string(),
                sort_fields.to_string(),
            ),
            KeyValue::new(
                PARQUET_META_WINDOW_START.to_string(),
                "1700000000".to_string(),
            ),
            KeyValue::new(PARQUET_META_WINDOW_DURATION.to_string(), "60".to_string()),
            KeyValue::new(PARQUET_META_NUM_MERGE_OPS.to_string(), "0".to_string()),
        ];
        for (k, v) in extra_kvs {
            kvs.push(KeyValue::new(k.to_string(), v.to_string()));
        }
        let sorting_cols = vec![
            parquet::file::metadata::SortingColumn {
                column_idx: schema.index_of("metric_name").expect("test schema") as i32,
                descending: false,
                nulls_first: false,
            },
            parquet::file::metadata::SortingColumn {
                column_idx: schema.index_of("timestamp_secs").expect("test schema") as i32,
                descending: true,
                nulls_first: false,
            },
        ];
        let props: WriterProperties = cfg.to_writer_properties_with_metadata(
            &schema,
            sorting_cols,
            Some(kvs),
            &sort_field_names,
        );
        let mut buf: Vec<u8> = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props)).expect("arrow writer");
        for b in
        batches {
            writer.write(b).expect("test write");
        }
        writer.close().expect("test close");
        Bytes::from(buf)
    }

    // -------- In-memory byte source --------

    /// Byte source backed by a single in-memory buffer — lets the
    /// streaming reader run without any real remote storage.
    struct InMemorySource {
        bytes: Bytes,
    }

    #[async_trait::async_trait]
    impl RemoteByteSource for InMemorySource {
        async fn file_size(&self, _path: &std::path::Path) -> std::io::Result<u64> {
            Ok(self.bytes.len() as u64)
        }
        async fn get_slice(
            &self,
            _path: &std::path::Path,
            range: std::ops::Range<u64>,
        ) -> std::io::Result<Bytes> {
            Ok(self.bytes.slice(range.start as usize..range.end as usize))
        }
        async fn get_slice_stream(
            &self,
            _path: &std::path::Path,
            range: std::ops::Range<u64>,
        ) -> std::io::Result<Box<dyn AsyncRead + Send + Unpin>> {
            let slice = self.bytes.slice(range.start as usize..range.end as usize);
            Ok(Box::new(std::io::Cursor::new(slice.to_vec())))
        }
    }

    /// Open a streaming reader over in-memory parquet bytes.
    // NOTE(review): the boxed return type was reconstructed — confirm
    // the exact trait object `streaming_merge_sorted_parquet_files`
    // expects for its inputs.
    async fn open_stream(bytes: Bytes) -> Box<dyn AsyncRead + Send + Unpin> {
        let source = Arc::new(InMemorySource { bytes });
        let reader = StreamingParquetReader::try_open(source, PathBuf::from("test.parquet"))
            .await
            .expect("open reader");
        Box::new(reader)
    }

    /// Read an output parquet file back into a single concatenated RecordBatch.
    fn read_output_to_record_batch(path: &Path) -> RecordBatch {
        let bytes = std::fs::read(path).expect("read output");
        let builder = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(
            Bytes::from(bytes),
        )
        .expect("read output builder");
        let schema = builder.schema().clone();
        let reader = builder.build().expect("read output build");
        let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>().expect("read output");
        if batches.is_empty() {
            RecordBatch::new_empty(schema)
        } else {
            arrow::compute::concat_batches(&schema, &batches).expect("concat")
        }
    }

    /// Minimal merge config: `num_outputs` outputs, Snappy compression.
    fn merge_config(num_outputs: usize) -> MergeConfig {
        MergeConfig {
            num_outputs,
            writer_config: ParquetWriterConfig {
                compression: Compression::Snappy,
                ..ParquetWriterConfig::default()
            },
        }
    }

    // -------- Tests --------

    /// Two inputs → one output: row count and sort order preserved.
    #[tokio::test]
    async fn test_two_inputs_simple_merge() {
        // Disjoint, adjacent series ranges: [0, 50) and [50, 100).
        let batch_a = make_sorted_batch(50, 0);
        let batch_b = make_sorted_batch(50, 50);
        let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]);
        let bytes_b = write_input_parquet(std::slice::from_ref(&batch_b), &[]);

        let inputs: Vec<Box<dyn AsyncRead + Send + Unpin>> =
            vec![open_stream(bytes_a).await, open_stream(bytes_b).await];

        let tmp = TempDir::new().expect("tmpdir");
        let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1))
            .await
            .expect("merge");
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].num_rows, 100);

        // Re-read the output and check `sorted_series` is ascending —
        // big-endian encoding makes byte order == numeric order.
        let merged = read_output_to_record_batch(&outputs[0].path);
        assert_eq!(merged.num_rows(), 100);
        let ss_array = merged.column(merged.schema().index_of("sorted_series").expect("col"));
        let ss = ss_array
            .as_any()
            .downcast_ref::<BinaryArray>()
            .expect("binary");
        for i in 0..ss_array.len().saturating_sub(1) {
            assert!(
                ss.value(i) <= ss.value(i + 1),
                "row {i}: sorted_series not ascending",
            );
        }
    }

    /// Single-metric_name input + num_outputs=1 → output is single row
group. + #[tokio::test] + async fn test_output_is_single_row_group() { + let batch_a = make_sorted_batch(200, 0); + let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]); + let inputs: Vec> = vec![open_stream(bytes_a).await]; + + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect("merge"); + assert_eq!(outputs.len(), 1); + + let bytes = std::fs::read(&outputs[0].path).expect("read"); + let reader = SerializedFileReader::new(Bytes::from(bytes)).expect("ser reader"); + assert_eq!( + reader.metadata().num_row_groups(), + 1, + "single-metric_name single-output merge must produce single row group", + ); + } + + /// N inputs → M outputs: total row count preserved (MC-1). + #[tokio::test] + async fn test_total_rows_preserved() { + let batch_a = make_sorted_batch(75, 0); + let batch_b = make_sorted_batch(50, 100); + let batch_c = make_sorted_batch(25, 200); + let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]); + let bytes_b = write_input_parquet(std::slice::from_ref(&batch_b), &[]); + let bytes_c = write_input_parquet(std::slice::from_ref(&batch_c), &[]); + + let inputs: Vec> = vec![ + open_stream(bytes_a).await, + open_stream(bytes_b).await, + open_stream(bytes_c).await, + ]; + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(2)) + .await + .expect("merge"); + + let total: usize = outputs.iter().map(|o| o.num_rows).sum(); + assert_eq!(total, 150); + } + + /// Sort schema mismatch across inputs is rejected. 
+ #[tokio::test] + async fn test_sort_schema_mismatch_rejected() { + let batch_a = make_sorted_batch(20, 0); + let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]); + + let cfg = ParquetWriterConfig { + compression: Compression::Snappy, + ..ParquetWriterConfig::default() + }; + let kvs = vec![ + KeyValue::new( + PARQUET_META_SORT_FIELDS.to_string(), + "service|-timestamp_secs/V2".to_string(), + ), + KeyValue::new( + PARQUET_META_WINDOW_START.to_string(), + "1700000000".to_string(), + ), + KeyValue::new(PARQUET_META_WINDOW_DURATION.to_string(), "60".to_string()), + KeyValue::new(PARQUET_META_NUM_MERGE_OPS.to_string(), "0".to_string()), + ]; + let props: WriterProperties = cfg.to_writer_properties_with_metadata( + &batch_a.schema(), + Vec::new(), + Some(kvs), + &["service".to_string(), "timestamp_secs".to_string()], + ); + let mut buf: Vec = Vec::new(); + let mut writer = + ArrowWriter::try_new(&mut buf, batch_a.schema(), Some(props)).expect("arrow writer"); + writer.write(&batch_a).expect("write"); + writer.close().expect("close"); + let bytes_b = Bytes::from(buf); + + let inputs: Vec> = + vec![open_stream(bytes_a).await, open_stream(bytes_b).await]; + let tmp = TempDir::new().expect("tmpdir"); + let err = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect_err("must reject mismatched sort schema"); + let s = err.to_string(); + assert!( + s.contains("sort schema mismatch"), + "expected 'sort schema mismatch', got: {s}", + ); + } + + /// qh.* KV metadata is propagated to the output; num_merge_ops increments. 
+ #[tokio::test] + async fn test_kv_metadata_propagated_to_output() { + let batch_a = make_sorted_batch(40, 0); + let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]); + let inputs: Vec> = vec![open_stream(bytes_a).await]; + + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect("merge"); + let bytes = std::fs::read(&outputs[0].path).expect("read"); + let reader = SerializedFileReader::new(Bytes::from(bytes)).expect("ser reader"); + let kvs = reader + .metadata() + .file_metadata() + .key_value_metadata() + .cloned() + .unwrap_or_default(); + let find = |k: &str| -> Option { + kvs.iter() + .find(|kv| kv.key == k) + .and_then(|kv| kv.value.clone()) + }; + assert_eq!( + find(PARQUET_META_SORT_FIELDS).as_deref(), + Some("metric_name|-timestamp_secs/V2"), + ); + assert_eq!( + find(PARQUET_META_WINDOW_START).as_deref(), + Some("1700000000") + ); + assert_eq!(find(PARQUET_META_WINDOW_DURATION).as_deref(), Some("60")); + assert_eq!( + find(PARQUET_META_NUM_MERGE_OPS).as_deref(), + Some("1"), + "num_merge_ops must increment by 1 over input's max", + ); + } + + /// All-empty inputs produce no output. + #[tokio::test] + async fn test_all_empty_inputs_no_output() { + let empty = make_sorted_batch(0, 0); + let bytes = write_input_parquet(std::slice::from_ref(&empty), &[]); + let inputs: Vec> = vec![open_stream(bytes).await]; + + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect("merge"); + assert!(outputs.is_empty()); + } + + /// The streaming engine's output can be drained back via the new + /// page-bounded decoder. End-to-end sanity check. 
+ #[tokio::test] + async fn test_output_drainable_by_stream_decoder() { + let batch_a = make_sorted_batch(40, 0); + let batch_b = make_sorted_batch(40, 40); + let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]); + let bytes_b = write_input_parquet(std::slice::from_ref(&batch_b), &[]); + + let inputs: Vec> = + vec![open_stream(bytes_a).await, open_stream(bytes_b).await]; + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect("merge"); + + let bytes = std::fs::read(&outputs[0].path).expect("read"); + let mut output_stream = open_stream(Bytes::from(bytes)).await; + let mut decoder = StreamDecoder::new(&mut *output_stream); + let mut total_decoded = 0usize; + while let Some(page) = decoder.decode_next_page().await.expect("decode") { + // Count only sort col 0 (col_idx 0) pages to get a row count. + if page.col_idx == 0 { + total_decoded += page.array.len(); + } + } + assert_eq!(total_decoded, 80); + } + + /// Page-bounded contract sanity: with a row group large enough to + /// require many parquet pages per col, body col writes go through + /// the page-by-page assembler instead of materialising column + /// chunks. We can't directly observe peak memory from a test, but + /// we *can* assert that the merge completes correctly with input + /// data whose body cols span many pages, and that the output is + /// itself multi-page (no whole-column buffering happened on the + /// output side either). + #[tokio::test] + async fn test_body_col_streams_many_pages_per_column_chunk() { + // Force multiple pages per column chunk by setting a small + // data_page_row_count_limit. With 8000 rows and a 1000-row + // page limit, the output value col chunk must span ≥ 8 pages. 
+ let batch = make_sorted_batch(8000, 0); + let bytes = write_input_parquet(std::slice::from_ref(&batch), &[]); + let inputs: Vec> = vec![open_stream(bytes).await]; + + let writer_config = ParquetWriterConfig { + compression: Compression::Snappy, + data_page_row_count_limit: 1000, + ..ParquetWriterConfig::default() + }; + let config = MergeConfig { + num_outputs: 1, + writer_config, + }; + + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &config) + .await + .expect("merge"); + assert_eq!(outputs.len(), 1); + assert_eq!(outputs[0].num_rows, 8000); + + // Verify the output is itself multi-page-per-column (which is + // what page-bounded writing should produce, given the default + // data_page_size). Read via the page-bounded decoder and count + // pages for the value column. + let out_bytes = std::fs::read(&outputs[0].path).expect("read"); + let mut output_stream = open_stream(Bytes::from(out_bytes)).await; + // Find the "value" col index in the output's arrow schema BEFORE + // borrowing output_stream mutably for the decoder. + let arrow_schema = parquet::arrow::parquet_to_arrow_schema( + output_stream.metadata().file_metadata().schema_descr(), + None, + ) + .expect("arrow schema"); + let value_col_idx = arrow_schema.index_of("value").expect("value col"); + let mut decoder = StreamDecoder::new(&mut *output_stream); + + let mut value_pages = 0; + while let Some(page) = decoder.decode_next_page().await.expect("decode") { + if page.col_idx == value_col_idx { + value_pages += 1; + } + } + assert!( + value_pages >= 2, + "expected output 'value' col to span multiple pages (got {value_pages}); body col \ + writes should respect data_page_size", + ); + } + + /// Multi-RG input is rejected (PR-6b.2 simplification). + #[tokio::test] + async fn test_multi_rg_input_rejected() { + // Force a 2-RG file by writing two batches with row_group_size = 1 + // small enough to trip RG rollover. 
+ let batch_a = make_sorted_batch(50, 0); + let batch_b = make_sorted_batch(50, 50); + + let cfg = ParquetWriterConfig { + compression: Compression::Snappy, + row_group_size: 50, // force one RG per 50-row batch + ..ParquetWriterConfig::default() + }; + let kvs = vec![ + KeyValue::new( + PARQUET_META_SORT_FIELDS.to_string(), + "metric_name|-timestamp_secs/V2".to_string(), + ), + KeyValue::new( + PARQUET_META_WINDOW_START.to_string(), + "1700000000".to_string(), + ), + KeyValue::new(PARQUET_META_WINDOW_DURATION.to_string(), "60".to_string()), + KeyValue::new(PARQUET_META_NUM_MERGE_OPS.to_string(), "0".to_string()), + ]; + let props: WriterProperties = cfg.to_writer_properties_with_metadata( + &batch_a.schema(), + Vec::new(), + Some(kvs), + &["metric_name".to_string(), "timestamp_secs".to_string()], + ); + let mut buf: Vec = Vec::new(); + let mut writer = + ArrowWriter::try_new(&mut buf, batch_a.schema(), Some(props)).expect("arrow writer"); + writer.write(&batch_a).expect("write"); + writer.write(&batch_b).expect("write"); + writer.close().expect("close"); + let bytes = Bytes::from(buf); + + let inputs: Vec> = vec![open_stream(bytes).await]; + let tmp = TempDir::new().expect("tmpdir"); + let err = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect_err("multi-RG input must be rejected"); + let s = err.to_string(); + assert!( + s.contains("single-row-group"), + "expected 'single-row-group' error, got: {s}", + ); + } +} diff --git a/quickwit/quickwit-parquet-engine/src/merge/writer.rs b/quickwit/quickwit-parquet-engine/src/merge/writer.rs index 3ac908dab3c..071726a3a7d 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/writer.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/writer.rs @@ -185,7 +185,7 @@ pub fn write_merge_outputs( /// /// Takes the relevant row ranges from each input according to the merge runs, /// concatenates into a single batch, and applies the permutation via `take`. 
-fn apply_merge_permutation( +pub(super) fn apply_merge_permutation( inputs: &[RecordBatch], union_schema: &SchemaRef, runs: &[MergeRun], @@ -254,7 +254,7 @@ fn predict_num_row_groups(num_rows: usize, row_group_size: usize) -> usize { /// `qh.rg_partition_prefix_len` KV — caller computes this based on /// whether the file is going to be single-RG (preserve input prefix) /// or multi-RG (must be 0). -fn build_merge_kv_metadata( +pub(super) fn build_merge_kv_metadata( input_meta: &InputMetadata, row_keys_proto: &Option>, zonemap_regexes: &std::collections::HashMap, @@ -324,7 +324,10 @@ fn build_merge_kv_metadata( } /// Build `SortingColumn` entries for Parquet file metadata. -fn build_sorting_columns(batch: &RecordBatch, sort_fields_str: &str) -> Result> { +pub(super) fn build_sorting_columns( + batch: &RecordBatch, + sort_fields_str: &str, +) -> Result> { let sort_schema = parse_sort_fields(sort_fields_str)?; let schema = batch.schema(); @@ -347,7 +350,7 @@ fn build_sorting_columns(batch: &RecordBatch, sort_fields_str: &str) -> Result Result> { +pub(super) fn resolve_sort_field_names(sort_fields_str: &str) -> Result> { let sort_schema = parse_sort_fields(sort_fields_str)?; Ok(sort_schema .column @@ -361,7 +364,7 @@ fn resolve_sort_field_names(sort_fields_str: &str) -> Result> { /// /// Checks that sorted_series values are non-decreasing, and within equal /// sorted_series values, timestamp_secs respects the schema's sort direction. 
-fn verify_sort_order(batch: &RecordBatch, sort_fields_str: &str) { +pub(super) fn verify_sort_order(batch: &RecordBatch, sort_fields_str: &str) { if batch.num_rows() <= 1 { return; } diff --git a/quickwit/quickwit-parquet-engine/src/storage/inspect.rs b/quickwit/quickwit-parquet-engine/src/storage/inspect.rs index 5172aa742bf..8a981b1e04b 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/inspect.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/inspect.rs @@ -405,7 +405,9 @@ type PageStat = (Option, Option, Option); fn per_page_stats(index: &ColumnIndexMetaData) -> Vec { fn primitive(idx: &PrimitiveColumnIndex, to_string: F) -> Vec - where F: Fn(&T) -> String { + where + F: Fn(&T) -> String, + { let num_pages = idx.num_pages() as usize; let mins: Vec> = idx.min_values_iter().collect(); let maxs: Vec> = idx.max_values_iter().collect(); diff --git a/quickwit/quickwit-parquet-engine/src/storage/streaming_writer.rs b/quickwit/quickwit-parquet-engine/src/storage/streaming_writer.rs index 550c03ce318..c8f22091560 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/streaming_writer.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/streaming_writer.rs @@ -283,10 +283,7 @@ impl<'a, W: Write + Send> RowGroupBuilder<'a, W> { /// is never opened. /// /// [`SerializedColumnWriter::write_batch`]: parquet::file::writer::SerializedColumnWriter - pub(crate) fn write_next_column_arrays( - &mut self, - arrays: I, - ) -> Result<(), ParquetWriteError> + pub(crate) fn write_next_column_arrays(&mut self, arrays: I) -> Result<(), ParquetWriteError> where I: IntoIterator, {